Mulai sekarangMulai gratis

Mendefinisikan DAG

Pada latihan sebelumnya Anda telah menerapkan tiga langkah dalam proses ETL:

  • Extract: Mengekstrak tabel PostgreSQL film ke dalam pandas.
  • Transform: Memisahkan kolom rental_rate dari DataFrame film.
  • Load: Memuat DataFrame film ke dalam gudang data PostgreSQL.

Fungsi extract_film_to_pandas(), transform_rental_rate() dan load_dataframe_to_film() telah didefinisikan di ruang kerja Anda. Pada latihan ini, Anda akan menambahkan sebuah tugas ETL ke DAG yang sudah ada. DAG yang akan diperluas dan tugas yang harus ditunggu telah didefinisikan di ruang kerja Anda masing-masing sebagai dag dan wait_for_table.

Latihan ini merupakan bagian dari kursus

Pengantar Data Engineering

Lihat Kursus

Instruksi latihan

  • Lengkapi fungsi etl() dengan memanfaatkan fungsi-fungsi yang didefinisikan dalam deskripsi latihan.
  • Pastikan etl_task menggunakan callable etl.
  • Atur dependensi upstream yang benar. Perhatikan bahwa etl_task harus menunggu hingga wait_for_table selesai.
  • Kode contoh menyertakan contoh eksekusi. Ini berarti pipeline ETL akan berjalan saat Anda menjalankan kodenya.

Latihan interaktif langsung praktik

Cobalah latihan ini dengan melengkapi kode contoh ini.

# Define the ETL function
def etl():
    film_df = ____()
    film_df = ____(____)
    ____(____)

# Define the ETL task using PythonOperator
etl_task = PythonOperator(task_id='etl_film',
                          python_callable=____,
                          dag=dag)

# Set the upstream to wait_for_table and sample run etl()
etl_task.____(wait_for_table)
etl()
Edit dan Jalankan Kode