Implementare una Trigger Rule
Dopo aver creato un workflow, ti rendi conto che il Dag trarrebbe vantaggio dal fornire degli aggiornamenti quando almeno un task fallisce. Decidi quindi di aggiungere un task che implementi il controllo one failed nel tuo Dag per avvisarti se qualsiasi task nel Dag fallisce.
Tutti gli altri task sono già stati definiti e gli oggetti task e dag sono già importati per te.
Questo esercizio fa parte del corso
Introduzione ad Apache Airflow in Python
Istruzioni dell'esercizio
- Importa la libreria appropriata per usare le trigger rules.
- Aggiungi l'attributo di trigger rule corretto al task
notify_on_failure. - Imposta l'attributo in modo che il task venga attivato quando uno o più task upstream falliscono.
- Imposta
notify_on_failurecome dipendenza downstream dei due task di trasformazione.
Esercizio pratico interattivo
Prova a risolvere questo esercizio completando il codice di esempio.
# Import TriggerRule
from airflow.utils.____ import ____
@dag(schedule="@daily", start_date=datetime(2026, 5, 1))
def etl_pipeline():
# Trigger notify_on_failure when any upstream task fails
@task(____=TriggerRule.____)
def notify_on_failure(**context) -> None:
dag_id = context["dag"].dag_id
run_id = context["run_id"]
print(f"ALERT: A task failed in DAG '{dag_id}', run '{run_id}'. Sending notification...")
# Set notify_on_failure downstream of both transform tasks
[transform_users(), transform_orders()] ____ notify_on_failure()
etl_pipeline()