Definir um BranchPythonOperator
Depois de aprender sobre o poder da lógica condicional no Airflow, você deseja testar o BranchPythonOperator. Você gostaria de executar um caminho de código diferente se a data de execução atual representar um novo ano (ou seja, 2020 vs. 2019).
O site DAG é definido para você, juntamente com as tarefas em questão. Sua tarefa atual é implementar o BranchPythonOperator.
Este exercício faz parte do curso
Introdução ao Apache Airflow em Python
Instruções de exercício
- Na função
year_check
, configure o código para determinar se o ano da data de execução atual é diferente da data de execução anterior (ou seja, se o ano é diferente entre as variáveis de modelo Airflow apropriadas). - Conclua o site
BranchPythonOperator
adicionando os argumentos apropriados. - Defina as dependências em
current_year_task
enew_year_task
.
Exercício interativo prático
Experimente este exercício preenchendo este código de exemplo.
# Create a function to determine if years are different
def year_check(**kwargs):
current_year = int(kwargs[____][0:4])
previous_year = int(____[____][0:4])
if current_year == previous_year:
return 'current_year_task'
else:
return 'new_year_task'
# Define the BranchPythonOperator
branch_task = BranchPythonOperator(task_id='branch_task', dag=branch_dag,
python_callable=____, ____=True)
# Define the dependencies
branch_task ____ current_year_task
branch_task ____ new_year_task