CommencerCommencer gratuitement

Définir un BranchPythonOperator

Après avoir découvert la puissance de la logique conditionnelle dans Airflow, vous souhaitez tester le BranchPythonOperator. Vous voulez exécuter un chemin de code différent si la date d’exécution actuelle correspond à une nouvelle année (par exemple, 2020 vs 2019).

Le DAG est déjà défini, ainsi que les tâches concernées. Votre tâche actuelle consiste à implémenter le BranchPythonOperator.

Cet exercice fait partie du cours

Introduction à Apache Airflow en Python

Afficher le cours

Instructions

  • Dans la fonction year_check, configurez le code pour déterminer si l’année de la date d’exécution actuelle est différente de celle de la date d’exécution précédente (c’est‑à‑dire si l’année diffère entre les variables de template Airflow appropriées).
  • Terminez le BranchPythonOperator en ajoutant les arguments appropriés.
  • Définissez les dépendances sur current_year_task et new_year_task.

Exercice interactif pratique

Essayez cet exercice en complétant cet exemple de code.

# Create a function to determine if years are different
def year_check(**kwargs):
    current_year = int(kwargs[____][0:4])
    previous_year = int(____[____][0:4])
    if current_year == previous_year:
        return 'current_year_task'
    else:
        return 'new_year_task'

# Define the BranchPythonOperator
branch_task = BranchPythonOperator(task_id='branch_task', dag=branch_dag,
                                   python_callable=____, ____=True)
# Define the dependencies
branch_task ____ current_year_task
branch_task ____ new_year_task
Modifier et exécuter le code