CommencerCommencer gratuitement

Définir un BranchPythonOperator

Après avoir découvert la puissance de la logique conditionnelle dans Airflow, vous souhaitez tester l'opérateur BranchPython. Vous souhaitez exécuter un chemin de code différent si la date d'exécution actuelle correspond à une nouvelle année (c'est-à-dire 2020 contre 2019).

Le site DAG est défini pour vous, ainsi que les tâches en question. Votre tâche actuelle est d'implémenter le BranchPythonOperator.

Cet exercice fait partie du cours

Introduction à Apache Airflow en Python

Afficher le cours

Instructions

  • Dans la fonction year_check, configurez le code pour déterminer si l'année de la date d'exécution actuelle est différente de la date d'exécution précédente (c'est-à-dire si l'année est différente entre les variables appropriées du modèle Airflow).
  • Terminez le site BranchPythonOperator en ajoutant les arguments appropriés.
  • Définissez les dépendances sur current_year_task et new_year_task.

Exercice interactif pratique

Essayez cet exercice en complétant cet exemple de code.

# Create a function to determine if years are different
def year_check(**kwargs):
    current_year = int(kwargs[____][0:4])
    previous_year = int(____[____][0:4])
    if current_year == previous_year:
        return 'current_year_task'
    else:
        return 'new_year_task'

# Define the BranchPythonOperator
branch_task = BranchPythonOperator(task_id='branch_task', dag=branch_dag,
                                   python_callable=____, ____=True)
# Define the dependencies
branch_task ____ current_year_task
branch_task ____ new_year_task
Modifier et exécuter le code