Bir BranchPythonOperator tanımla
Airflow içinde koşullu mantığın gücünü öğrendikten sonra, BranchPythonOperator'ı denemek istiyorsun. Mevcut yürütme tarihi yeni bir yılı temsil ediyorsa (ör. 2020 ve 2019 gibi) farklı bir kod yolunu çalıştırmak istiyorsun.
DAG ve ilgili görevler senin için tanımlandı. Şu anki görevin BranchPythonOperator'ı uygulamak.
Bu egzersiz
Python ile Apache Airflow'a Giriş
kursunun bir parçasıdırEgzersiz talimatları
year_checkfonksiyonunda, mevcut yürütme tarihinin yılının önceki yürütme tarihinin yılından farklı olup olmadığını belirleyecek şekilde kodu yapılandır.- Uygun argümanları ekleyerek
BranchPythonOperator'ı tamamla. - Bağımlılıkları
current_year_taskvenew_year_tasküzerinde ayarla.
Uygulamalı interaktif egzersiz
Bu örnek kodu tamamlayarak bu egzersizi bitirin.
# Create a function to determine if years are different
def year_check(**kwargs):
current_year = int(kwargs[____][0:4])
previous_year = int(____[____][0:4])
if current_year == previous_year:
return 'current_year_task'
else:
return 'new_year_task'
# Define the BranchPythonOperator
branch_task = BranchPythonOperator(task_id='branch_task', dag=branch_dag,
python_callable=____, ____=True)
# Define the dependencies
branch_task ____ current_year_task
branch_task ____ new_year_task