Mendefinisikan sebuah @task.branch
Setelah mempelajari kekuatan logika kondisional di Airflow, Anda ingin mencoba dekorator @task.branch. Anda ingin menjalankan jalur kode yang berbeda jika tanggal eksekusi saat ini menandai tahun baru (misalnya, 2026 vs 2025).
Dag sudah didefinisikan untuk Anda, beserta tugas-tugasnya. Tugas Anda saat ini adalah mengimplementasikan @task.branch.
Latihan ini adalah bagian dari kursus
Pengantar Apache Airflow dengan Python
Petunjuk latihan
- Tambahkan variabel templat tanggal Airflow yang sesuai sebagai parameter ke
year_checkuntuk membandingkan tahun eksekusi saat ini dan sebelumnya. - Tetapkan
current_yeardanprevious_yeardengan mengambil 4 karakter pertama dari masing-masing parameter tanggal. - Atur dependensi pada
current_year_taskdannew_year_task.
Latihan interaktif praktis
Cobalah latihan ini dengan menyelesaikan kode contoh berikut.
@dag(start_date=datetime(2026,5,1), schedule='@monthly')
def process_yearly_expenses():
# Create a function to determine if years are different
@task.branch
def year_check(____, ____):
current_year = int(____[0:4])
previous_year = int(____[0:4])
if current_year == previous_year:
return 'current_year_task'
else:
return 'new_year_task'
# Define the dependencies
branch_task __ current_year_task
____ >> new_year_task