Implementando uma Trigger Rule
Depois de criar um workflow, você percebe que o Dag se beneficiaria de enviar atualizações quando pelo menos uma tarefa falhar. Você decide implementar uma tarefa que faça a verificação de one failed no seu Dag para alertar caso qualquer tarefa no Dag falhe.
Todas as outras tarefas já foram definidas e os objetos task e dag já foram importados para você.
Este exercício faz parte do curso
Introdução ao Apache Airflow em Python
Instruções do exercício
- Importe a biblioteca apropriada para usar trigger rules.
- Adicione o atributo de trigger rule apropriado à tarefa
notify_on_failure. - Defina o atributo para que a tarefa seja acionada quando uma ou mais tarefas upstream falharem.
- Defina
notify_on_failurecomo dependência downstream das duas tarefas de transformação.
Exercício interativo prático
Experimente este exercício completando este código de exemplo.
# Import TriggerRule
from airflow.utils.____ import ____
@dag(schedule="@daily", start_date=datetime(2026, 5, 1))
def etl_pipeline():
# Trigger notify_on_failure when any upstream task fails
@task(____=TriggerRule.____)
def notify_on_failure(**context) -> None:
dag_id = context["dag"].dag_id
run_id = context["run_id"]
print(f"ALERT: A task failed in DAG '{dag_id}', run '{run_id}'. Sending notification...")
# Set notify_on_failure downstream of both transform tasks
[transform_users(), transform_orders()] ____ notify_on_failure()
etl_pipeline()