Implementing a callback function
You've recently been assigned to add failure callbacks to the Dags created by your team. To start, you'd like to add a simple failure callback that writes a message to the audit log when the sales_etl_dag fails.
The dag and task objects are already imported and the get_sales_data and process_sales_data tasks have been created.
Bu egzersiz, kursun bir parçasıdır
Python ile Apache Airflow'a Giriş
Egzersiz talimatları
- Create a callback function named
alert_on_failure. - Define the function to accept any objects Airflow passes to it.
- Specify a failure callback using the
alert_on_failurefunction.
Uygulamalı etkileşimli egzersiz
Bu egzersizi bu örnek kodu tamamlayarak deneyin.
# Create the callback function
def ____(____):
dag_id = context["dag"].dag_id
task_id = context["task_instance"].task_id
print(f"Task {task_id} in Dag {dag_id} has failed.")
# Specify the Dag with a failure callback
@dag(dag_id='sales_etl_dag',
____=alert_on_failure
)
def sales_etl_dag():
get_sales_data() >> process_sales_data()
sales_etl_dag()