Implementare una funzione di callback
Ti è stato appena assegnato il compito di aggiungere callback di errore ai Dag creati dal tuo team. Per iniziare, vuoi aggiungere un semplice callback di errore che scriva un messaggio nel registro di audit quando sales_etl_dag fallisce.
Gli oggetti dag e task sono già importati e i task get_sales_data e process_sales_data sono stati creati.
Questo esercizio fa parte del corso
Introduzione ad Apache Airflow in Python
Istruzioni dell'esercizio
- Crea una funzione di callback chiamata
alert_on_failure. - Definisci la funzione in modo che accetti qualsiasi oggetto le venga passato da Airflow.
- Specifica un callback di errore usando la funzione
alert_on_failure.
esercizio interattivo pratico
Prova questo esercizio completando questo codice di esempio.
# Create the callback function
def ____(____):
dag_id = context["dag"].dag_id
task_id = context["task_instance"].task_id
print(f"Task {task_id} in Dag {dag_id} has failed.")
# Specify the Dag with a failure callback
@dag(dag_id='sales_etl_dag',
____=alert_on_failure
)
def sales_etl_dag():
get_sales_data() >> process_sales_data()
sales_etl_dag()