Menerapkan Trigger Rule
Setelah membuat sebuah workflow, Anda menyadari Dag akan lebih bermanfaat jika memberikan pembaruan ketika setidaknya satu task gagal. Anda memutuskan untuk menambahkan sebuah task yang menerapkan pemeriksaan one failed pada Dag Anda untuk memberi tahu jika ada task dalam Dag yang gagal.
Semua task lainnya telah didefinisikan dan objek task & dag sudah diimpor untuk Anda.
Latihan ini adalah bagian dari kursus
Pengantar Apache Airflow dengan Python
Petunjuk latihan
- Impor pustaka yang sesuai untuk menggunakan trigger rules.
- Tambahkan atribut trigger rule yang sesuai ke task
notify_on_failure. - Atur atribut tersebut agar task berjalan ketika satu atau lebih task hulu gagal.
- Jadikan
notify_on_failuresebagai dependensi hilir dari dua task transformasi.
Latihan interaktif praktis
Cobalah latihan ini dengan menyelesaikan kode contoh berikut.
# Import TriggerRule
from airflow.utils.____ import ____
@dag(schedule="@daily", start_date=datetime(2026, 5, 1))
def etl_pipeline():
# Trigger notify_on_failure when any upstream task fails
@task(____=TriggerRule.____)
def notify_on_failure(**context) -> None:
dag_id = context["dag"].dag_id
run_id = context["run_id"]
print(f"ALERT: A task failed in DAG '{dag_id}', run '{run_id}'. Sending notification...")
# Set notify_on_failure downstream of both transform tasks
[transform_users(), transform_orders()] ____ notify_on_failure()
etl_pipeline()