Mendefinisikan DAG
Pada latihan sebelumnya Anda telah menerapkan tiga langkah dalam proses ETL:
- Extract: Mengekstrak tabel PostgreSQL
filmke dalampandas. - Transform: Memisahkan kolom
rental_ratedari DataFramefilm. - Load: Memuat DataFrame
filmke dalam gudang data PostgreSQL.
Fungsi extract_film_to_pandas(), transform_rental_rate() dan load_dataframe_to_film() telah didefinisikan di ruang kerja Anda. Pada latihan ini, Anda akan menambahkan sebuah tugas ETL ke DAG yang sudah ada. DAG yang akan diperluas dan tugas yang harus ditunggu telah didefinisikan di ruang kerja Anda masing-masing sebagai dag dan wait_for_table.
Latihan ini adalah bagian dari kursus
Pengantar Data Engineering
Petunjuk latihan
- Lengkapi fungsi
etl()dengan memanfaatkan fungsi-fungsi yang didefinisikan dalam deskripsi latihan. - Pastikan
etl_taskmenggunakan callableetl. - Atur dependensi upstream yang benar. Perhatikan bahwa
etl_taskharus menunggu hinggawait_for_tableselesai. - Kode contoh menyertakan contoh eksekusi. Ini berarti pipeline ETL akan berjalan saat Anda menjalankan kodenya.
Latihan interaktif praktis
Cobalah latihan ini dengan menyelesaikan kode contoh berikut.
# Define the ETL function
def etl():
film_df = ____()
film_df = ____(____)
____(____)
# Define the ETL task using PythonOperator
etl_task = PythonOperator(task_id='etl_film',
python_callable=____,
dag=dag)
# Set the upstream to wait_for_table and sample run etl()
etl_task.____(wait_for_table)
etl()