Mendefinisikan BranchPythonOperator
Setelah mempelajari kekuatan logika bersyarat di Airflow, Anda ingin mencoba BranchPythonOperator. Anda ingin menjalankan jalur kode yang berbeda jika tanggal eksekusi saat ini menandai tahun baru (misalnya, 2020 vs 2019).
DAG sudah didefinisikan untuk Anda, beserta tugas-tugas terkait. Tugas Anda saat ini adalah mengimplementasikan BranchPythonOperator.
Latihan ini adalah bagian dari kursus
Pengantar Apache Airflow dengan Python
Petunjuk latihan
- Dalam fungsi
year_check, konfigurasikan kode untuk menentukan apakah tahun dari tanggal eksekusi saat ini berbeda dengan tanggal eksekusi sebelumnya (yaitu, apakah tahunnya berbeda antara variabel template Airflow yang sesuai). - Selesaikan
BranchPythonOperatordengan menambahkan argumen yang tepat. - Atur dependensi pada
current_year_taskdannew_year_task.
Latihan interaktif praktis
Cobalah latihan ini dengan menyelesaikan kode contoh berikut.
# Create a function to determine if years are different
def year_check(**kwargs):
current_year = int(kwargs[____][0:4])
previous_year = int(____[____][0:4])
if current_year == previous_year:
return 'current_year_task'
else:
return 'new_year_task'
# Define the BranchPythonOperator
branch_task = BranchPythonOperator(task_id='branch_task', dag=branch_dag,
python_callable=____, ____=True)
# Define the dependencies
branch_task ____ current_year_task
branch_task ____ new_year_task