Définir un @task.branch
Après avoir découvert la puissance de la logique conditionnelle dans Airflow, vous souhaitez tester le décorateur @task.branch. Vous voulez exécuter un chemin de code différent si la date d’exécution actuelle correspond à une nouvelle année (c.-à-d. 2026 vs 2025).
Le Dag est déjà défini, ainsi que les tâches concernées. Votre tâche actuelle consiste à implémenter le @task.branch.
Cet exercice fait partie du cours
Introduction à Apache Airflow en Python
Instructions
- Ajoutez les variables de modèle de date Airflow appropriées comme paramètres de
year_checkpour comparer l’année d’exécution actuelle et la précédente. - Assignez
current_yearetprevious_yearen découpant les 4 premiers caractères de chaque paramètre de date. - Définissez les dépendances sur
current_year_tasketnew_year_task.
Exercice interactif pratique
Essayez cet exercice en complétant cet exemple de code.
@dag(start_date=datetime(2026,5,1), schedule='@monthly')
def process_yearly_expenses():
# Create a function to determine if years are different
@task.branch
def year_check(____, ____):
current_year = int(____[0:4])
previous_year = int(____[0:4])
if current_year == previous_year:
return 'current_year_task'
else:
return 'new_year_task'
# Define the dependencies
branch_task __ current_year_task
____ >> new_year_task