Define a @task.branch
After learning about the power of conditional logic within Airflow, you wish to test out the @task.branch decorator. You'd like to run a different code path if the current execution date represents a new year (i.e., 2026 vs 2025).
The Dag is defined for you, along with the tasks in question. Your current task is to implement the @task.branch.
Bu egzersiz
Python ile Apache Airflow'a Giriş
kursunun bir parçasıdırEgzersiz talimatları
- Add the appropriate Airflow date template variables as parameters to
year_checkto compare the current and previous execution year. - Assign
current_yearandprevious_yearby slicing the first 4 characters from each date parameter. - Set the dependencies on
current_year_taskandnew_year_task.
Uygulamalı interaktif egzersiz
Bu örnek kodu tamamlayarak bu egzersizi bitirin.
@dag(start_date=datetime(2026,5,1), schedule='@monthly')
def process_yearly_expenses():
# Create a function to determine if years are different
@task.branch
def year_check(____, ____):
current_year = int(____[0:4])
previous_year = int(____[0:4])
if current_year == previous_year:
return 'current_year_task'
else:
return 'new_year_task'
# Define the dependencies
branch_task __ current_year_task
____ >> new_year_task