Más @tasks
Para seguir implementando tu flujo de trabajo, debes añadir otro paso para analizar y guardar los cambios del archivo descargado. El Dag process_sales está definido y ya tiene añadida la tarea pull_file. En este caso, la función de Python ya está definida para ti: parse_file(inputfile, outputfile).
Ten en cuenta que, a menudo, cuando implementes tareas de Airflow, no necesariamente entenderás los pasos individuales que te den. Mientras entiendas cómo encapsular los pasos dentro de la estructura de Airflow, podrás implementar el flujo de trabajo deseado.
Este ejercicio forma parte del curso
Introducción a Apache Airflow en Python
Instrucciones del ejercicio
- Crea una tarea de Airflow usando el método
parse_file. - Llama a la tarea con los argumentos necesarios.
Ejercicio interactivo práctico
Prueba este ejercicio y completa el código de muestra.
@dag(dag_id='process_sales')
def process_sales():
# Decorate parse_file as a task
____
def parse_file(inputfile: str, outputfile: str):
with open(inputfile) as infile:
data = json.load(infile)
with open(outputfile, 'w') as outfile:
json.dump(data, outfile)
pull_file('http://dataserver/sales.json', 'latestsales.json')
# Call the parse_file task
____('latestsales.json', 'latestsales_parsed.json')
process_sales()