Inizia subitoInizia gratis

Implementare una funzione di callback

Ti è stato appena assegnato il compito di aggiungere callback di errore ai Dag creati dal tuo team. Per iniziare, vuoi aggiungere un semplice callback di errore che scriva un messaggio nel registro di audit quando sales_etl_dag fallisce.

Gli oggetti dag e task sono già importati e i task get_sales_data e process_sales_data sono stati creati.

Questo esercizio fa parte del corso

Introduzione ad Apache Airflow in Python

Visualizza corso

Istruzioni dell'esercizio

  • Crea una funzione di callback chiamata alert_on_failure.
  • Definisci la funzione in modo che accetti qualsiasi oggetto le venga passato da Airflow.
  • Specifica un callback di errore usando la funzione alert_on_failure.

esercizio interattivo pratico

Prova questo esercizio completando questo codice di esempio.

# Create the callback function
def ____(____):
  dag_id = context["dag"].dag_id
  task_id = context["task_instance"].task_id
  print(f"Task {task_id} in Dag {dag_id} has failed.")
  
# Specify the Dag with a failure callback
@dag(dag_id='sales_etl_dag',
     ____=alert_on_failure
)
def sales_etl_dag():
  get_sales_data() >> process_sales_data()
  
sales_etl_dag()
Modifica ed esegui il codice