Implémenter une règle de déclenchement
Après avoir créé un workflow, vous réalisez que le Dag gagnerait à fournir des mises à jour lorsqu’au moins une tâche échoue. Vous décidez d’ajouter une tâche qui applique le contrôle one failed sur votre Dag afin de vous alerter si une tâche du Dag échoue.
Toutes les autres tâches ont été définies et les objets task et dag sont déjà importés pour vous.
Cet exercice fait partie du cours
Introduction à Apache Airflow en Python
Instructions
- Importez la bibliothèque appropriée pour utiliser les règles de déclenchement (trigger rules).
- Ajoutez l’attribut de règle de déclenchement approprié à la tâche
notify_on_failure. - Définissez l’attribut pour que la tâche se déclenche lorsqu’une ou plusieurs tâches amont échouent.
- Définissez
notify_on_failurecomme dépendance aval des deux tâches de transformation.
Exercice interactif pratique
Essayez cet exercice en complétant cet exemple de code.
# Import TriggerRule
from airflow.utils.____ import ____
@dag(schedule="@daily", start_date=datetime(2026, 5, 1))
def etl_pipeline():
# Trigger notify_on_failure when any upstream task fails
@task(____=TriggerRule.____)
def notify_on_failure(**context) -> None:
dag_id = context["dag"].dag_id
run_id = context["run_id"]
print(f"ALERT: A task failed in DAG '{dag_id}', run '{run_id}'. Sending notification...")
# Set notify_on_failure downstream of both transform tasks
[transform_users(), transform_orders()] ____ notify_on_failure()
etl_pipeline()