Defina um BranchPythonOperator
Depois de conhecer o poder da lógica condicional no Airflow, você quer testar o BranchPythonOperator. Você deseja executar um caminho de código diferente se a data de execução atual representar um novo ano (por exemplo, 2020 vs 2019).
O DAG já está definido para você, assim como as tarefas em questão. Sua tarefa agora é implementar o BranchPythonOperator.
Este exercício faz parte do curso
Introdução ao Apache Airflow em Python
Instruções do exercício
- Na função
year_check, ajuste o código para verificar se o ano da data de execução atual é diferente do da execução anterior (isto é, se o ano difere entre as variáveis de template apropriadas do Airflow). - Conclua o
BranchPythonOperatoradicionando os argumentos apropriados. - Defina as dependências em
current_year_taskenew_year_task.
Exercício interativo prático
Experimente este exercício completando este código de exemplo.
# Create a function to determine if years are different
def year_check(**kwargs):
current_year = int(kwargs[____][0:4])
previous_year = int(____[____][0:4])
if current_year == previous_year:
return 'current_year_task'
else:
return 'new_year_task'
# Define the BranchPythonOperator
branch_task = BranchPythonOperator(task_id='branch_task', dag=branch_dag,
python_callable=____, ____=True)
# Define the dependencies
branch_task ____ current_year_task
branch_task ____ new_year_task