Define a @task.branch
After learning about the power of conditional logic within Airflow, you wish to test out the @task.branch decorator. You'd like to run a different code path if the current execution date represents a new year (i.e., 2026 vs 2025).
The Dag is defined for you, along with the tasks in question. Your current task is to implement the @task.branch.
Diese Übung ist Teil des Kurses
Einführung in Apache Airflow mit Python
Anleitung zur Übung
- Add the appropriate Airflow date template variables as parameters to
year_checkto compare the current and previous execution year. - Assign
current_yearandprevious_yearby slicing the first 4 characters from each date parameter. - Set the dependencies on
current_year_taskandnew_year_task.
Interaktive Übung
Vervollständige den Beispielcode, um diese Übung erfolgreich abzuschließen.
@dag(start_date=datetime(2026,5,1), schedule='@monthly')
def process_yearly_expenses():
# Create a function to determine if years are different
@task.branch
def year_check(____, ____):
current_year = int(____[0:4])
previous_year = int(____[0:4])
if current_year == previous_year:
return 'current_year_task'
else:
return 'new_year_task'
# Define the dependencies
branch_task __ current_year_task
____ >> new_year_task