Definir un BranchPythonOperator
Después de conocer el poder de la lógica condicional en Airflow, quieres probar el Operador BranchPython. Te gustaría ejecutar una ruta de código diferente si la fecha de ejecución actual representa un nuevo año (es decir, 2020 frente a 2019).
El DAG se define para ti, junto con las tareas en cuestión. Tu tarea actual es implementar el BranchPythonOperator.
Este ejercicio forma parte del curso
Introducción a Apache Airflow en Python
Instrucciones de ejercicio
- En la función
year_check
, configura el código para determinar si el año de la fecha de ejecución actual es diferente de la fecha de ejecución anterior (es decir, si el año es diferente entre las variables de plantilla Airflow correspondientes). - Termina el
BranchPythonOperator
añadiendo los argumentos adecuados. - Establece las dependencias en
current_year_task
ynew_year_task
.
Ejercicio interactivo práctico
Pruebe este ejercicio completando este código de muestra.
# Create a function to determine if years are different
def year_check(**kwargs):
current_year = int(kwargs[____][0:4])
previous_year = int(____[____][0:4])
if current_year == previous_year:
return 'current_year_task'
else:
return 'new_year_task'
# Define the BranchPythonOperator
branch_task = BranchPythonOperator(task_id='branch_task', dag=branch_dag,
python_callable=____, ____=True)
# Define the dependencies
branch_task ____ current_year_task
branch_task ____ new_year_task