CommencerCommencer gratuitement

Définir un @task.branch

Après avoir découvert la puissance de la logique conditionnelle dans Airflow, vous souhaitez tester le décorateur @task.branch. Vous voulez exécuter un chemin de code différent si la date d’exécution actuelle correspond à une nouvelle année (c.-à-d. 2026 vs 2025).

Le Dag est déjà défini, ainsi que les tâches concernées. Votre tâche actuelle consiste à implémenter le @task.branch.

Cet exercice fait partie du cours

Introduction à Apache Airflow en Python

Afficher le cours

Instructions

  • Ajoutez les variables de modèle de date Airflow appropriées comme paramètres de year_check pour comparer l’année d’exécution actuelle et la précédente.
  • Assignez current_year et previous_year en découpant les 4 premiers caractères de chaque paramètre de date.
  • Définissez les dépendances sur current_year_task et new_year_task.

Exercice interactif pratique

Essayez cet exercice en complétant cet exemple de code.

@dag(start_date=datetime(2026,5,1), schedule='@monthly')
def process_yearly_expenses():
  # Create a function to determine if years are different
  @task.branch
  def year_check(____, ____):
      current_year = int(____[0:4])
      previous_year = int(____[0:4])
      if current_year == previous_year:
          return 'current_year_task'
      else:
          return 'new_year_task'

  # Define the dependencies
  branch_task __ current_year_task
  ____ >> new_year_task
Modifier et exécuter le code