MulaiMulai sekarang secara gratis

Mendefinisikan DAG

Pada latihan sebelumnya Anda telah menerapkan tiga langkah dalam proses ETL:

  • Extract: Mengekstrak tabel PostgreSQL film ke dalam pandas.
  • Transform: Memisahkan kolom rental_rate dari DataFrame film.
  • Load: Memuat DataFrame film ke dalam gudang data PostgreSQL.

Fungsi extract_film_to_pandas(), transform_rental_rate() dan load_dataframe_to_film() telah didefinisikan di ruang kerja Anda. Pada latihan ini, Anda akan menambahkan sebuah tugas ETL ke DAG yang sudah ada. DAG yang akan diperluas dan tugas yang harus ditunggu telah didefinisikan di ruang kerja Anda masing-masing sebagai dag dan wait_for_table.

Latihan ini adalah bagian dari kursus

Pengantar Data Engineering

Lihat Kursus

Petunjuk latihan

  • Lengkapi fungsi etl() dengan memanfaatkan fungsi-fungsi yang didefinisikan dalam deskripsi latihan.
  • Pastikan etl_task menggunakan callable etl.
  • Atur dependensi upstream yang benar. Perhatikan bahwa etl_task harus menunggu hingga wait_for_table selesai.
  • Kode contoh menyertakan contoh eksekusi. Ini berarti pipeline ETL akan berjalan saat Anda menjalankan kodenya.

Latihan interaktif praktis

Cobalah latihan ini dengan menyelesaikan kode contoh berikut.

# Define the ETL function
def etl():
    film_df = ____()
    film_df = ____(____)
    ____(____)

# Define the ETL task using PythonOperator
etl_task = PythonOperator(task_id='etl_film',
                          python_callable=____,
                          dag=dag)

# Set the upstream to wait_for_table and sample run etl()
etl_task.____(wait_for_table)
etl()
Edit dan Jalankan Kode