IniziaInizia gratis

Implementare una Trigger Rule

Dopo aver creato un workflow, ti rendi conto che il Dag trarrebbe vantaggio dal fornire degli aggiornamenti quando almeno un task fallisce. Decidi quindi di aggiungere un task che implementi il controllo one failed nel tuo Dag per avvisarti se qualsiasi task nel Dag fallisce.

Tutti gli altri task sono già stati definiti e gli oggetti task e dag sono già importati per te.

Questo esercizio fa parte del corso

Introduzione ad Apache Airflow in Python

Visualizza il corso

Istruzioni dell'esercizio

  • Importa la libreria appropriata per usare le trigger rules.
  • Aggiungi l'attributo di trigger rule corretto al task notify_on_failure.
  • Imposta l'attributo in modo che il task venga attivato quando uno o più task upstream falliscono.
  • Imposta notify_on_failure come dipendenza downstream dei due task di trasformazione.

Esercizio pratico interattivo

Prova a risolvere questo esercizio completando il codice di esempio.

# Import TriggerRule
from airflow.utils.____ import ____

@dag(schedule="@daily", start_date=datetime(2026, 5, 1))
def etl_pipeline():

    # Trigger notify_on_failure when any upstream task fails
    @task(____=TriggerRule.____)
    def notify_on_failure(**context) -> None:
        dag_id = context["dag"].dag_id
        run_id = context["run_id"]
        print(f"ALERT: A task failed in DAG '{dag_id}', run '{run_id}'. Sending notification...")

    # Set notify_on_failure downstream of both transform tasks
    [transform_users(), transform_orders()] ____ notify_on_failure()
    
etl_pipeline()
Modifica ed esegui il codice