Define un BranchPythonOperator
Después de aprender sobre el poder de la lógica condicional en Airflow, quieres probar el BranchPythonOperator. Te gustaría ejecutar una ruta de código diferente si la fecha de ejecución actual representa un año nuevo (p. ej., 2020 vs 2019).
El DAG está definido por ti, junto con las tareas correspondientes. Tu tarea actual es implementar el BranchPythonOperator.
Este ejercicio forma parte del curso
Introducción a Apache Airflow en Python
Instrucciones del ejercicio
- En la función
year_check, configura el código para determinar si el año de la fecha de ejecución actual es diferente del de la fecha de ejecución anterior (es decir, si el año difiere entre las variables de plantilla de Airflow correspondientes). - Completa el
BranchPythonOperatorañadiendo los argumentos apropiados. - Establece las dependencias en
current_year_taskynew_year_task.
Ejercicio interactivo práctico
Prueba este ejercicio y completa el código de muestra.
# Create a function to determine if years are different
def year_check(**kwargs):
current_year = int(kwargs[____][0:4])
previous_year = int(____[____][0:4])
if current_year == previous_year:
return 'current_year_task'
else:
return 'new_year_task'
# Define the BranchPythonOperator
branch_task = BranchPythonOperator(task_id='branch_task', dag=branch_dag,
python_callable=____, ____=True)
# Define the dependencies
branch_task ____ current_year_task
branch_task ____ new_year_task