Définir un BranchPythonOperator
Après avoir découvert la puissance de la logique conditionnelle dans Airflow, vous souhaitez tester le BranchPythonOperator. Vous voulez exécuter un chemin de code différent si la date d’exécution actuelle correspond à une nouvelle année (par exemple, 2020 vs 2019).
Le DAG est déjà défini, ainsi que les tâches concernées. Votre tâche actuelle consiste à implémenter le BranchPythonOperator.
Cet exercice fait partie du cours
Introduction à Apache Airflow en Python
Instructions
- Dans la fonction
year_check, configurez le code pour déterminer si l’année de la date d’exécution actuelle est différente de celle de la date d’exécution précédente (c’est‑à‑dire si l’année diffère entre les variables de template Airflow appropriées). - Terminez le
BranchPythonOperatoren ajoutant les arguments appropriés. - Définissez les dépendances sur
current_year_tasketnew_year_task.
Exercice interactif pratique
Essayez cet exercice en complétant cet exemple de code.
# Create a function to determine if years are different
def year_check(**kwargs):
current_year = int(kwargs[____][0:4])
previous_year = int(____[____][0:4])
if current_year == previous_year:
return 'current_year_task'
else:
return 'new_year_task'
# Define the BranchPythonOperator
branch_task = BranchPythonOperator(task_id='branch_task', dag=branch_dag,
python_callable=____, ____=True)
# Define the dependencies
branch_task ____ current_year_task
branch_task ____ new_year_task