Mendefinisikan DAG
Pada latihan sebelumnya, Anda telah menyelesaikan tahap extract, transform, dan load secara terpisah. Sekarang semuanya digabungkan dalam satu fungsi etl() yang rapi dan dapat Anda temukan di konsol.
Fungsi etl() mengekstrak data kursus dan rating mentah dari basis data terkait, membersihkan data korup dan mengisi nilai yang hilang, menghitung rata-rata rating per kursus dan membuat rekomendasi berdasarkan aturan keputusan untuk menghasilkan rekomendasi, dan akhirnya memuat rekomendasi ke dalam basis data.
Seperti yang mungkin Anda ingat dari video, etl() menerima satu argumen: db_engines. Anda dapat meneruskannya ke task menggunakan op_kwargs di PythonOperator. Anda dapat memberinya sebuah dictionary yang akan diisikan sebagai kwargs pada callable tersebut.
Latihan ini adalah bagian dari kursus
Pengantar Data Engineering
Petunjuk latihan
- Lengkapi definisi DAG agar berjalan setiap hari. Pastikan menggunakan notasi cron.
- Lengkapi
PythonOperator()dengan meneruskan argumen yang benar. Selainetl,db_enginesjuga tersedia di ruang kerja Anda.
Latihan interaktif praktis
Cobalah latihan ini dengan menyelesaikan kode contoh berikut.
# Define the DAG so it runs on a daily basis
dag = DAG(dag_id="recommendations",
schedule_interval="____")
# Make sure `etl()` is called in the operator. Pass the correct kwargs.
task_recommendations = PythonOperator(
task_id="recommendations_task",
python_callable=____,
op_kwargs={"____": ____},
)