CommencerCommencer gratuitement

Implémenter une règle de déclenchement

Après avoir créé un workflow, vous réalisez que le Dag gagnerait à fournir des mises à jour lorsqu’au moins une tâche échoue. Vous décidez d’ajouter une tâche qui applique le contrôle one failed sur votre Dag afin de vous alerter si une tâche du Dag échoue.

Toutes les autres tâches ont été définies et les objets task et dag sont déjà importés pour vous.

Cet exercice fait partie du cours

Introduction à Apache Airflow en Python

Afficher le cours

Instructions

  • Importez la bibliothèque appropriée pour utiliser les règles de déclenchement (trigger rules).
  • Ajoutez l’attribut de règle de déclenchement approprié à la tâche notify_on_failure.
  • Définissez l’attribut pour que la tâche se déclenche lorsqu’une ou plusieurs tâches amont échouent.
  • Définissez notify_on_failure comme dépendance aval des deux tâches de transformation.

Exercice interactif pratique

Essayez cet exercice en complétant cet exemple de code.

# Import TriggerRule
from airflow.utils.____ import ____

@dag(schedule="@daily", start_date=datetime(2026, 5, 1))
def etl_pipeline():

    # Trigger notify_on_failure when any upstream task fails
    @task(____=TriggerRule.____)
    def notify_on_failure(**context) -> None:
        dag_id = context["dag"].dag_id
        run_id = context["run_id"]
        print(f"ALERT: A task failed in DAG '{dag_id}', run '{run_id}'. Sending notification...")

    # Set notify_on_failure downstream of both transform tasks
    [transform_users(), transform_orders()] ____ notify_on_failure()
    
etl_pipeline()
Modifier et exécuter le code