Mais @tasks
Para continuar implementando seu workflow, você precisa adicionar outra etapa para analisar e salvar as alterações do arquivo baixado. O Dag process_sales está definido e já tem a task pull_file adicionada. Neste caso, a função Python já está definida para você: parse_file(inputfile, outputfile).
Perceba que, ao implementar tasks no Airflow, muitas vezes você não vai entender necessariamente cada etapa que recebe. Enquanto você souber como encapsular essas etapas dentro da estrutura do Airflow, conseguirá implementar o workflow desejado.
Este exercício faz parte do curso
Introdução ao Apache Airflow em Python
Instruções do exercício
- Crie uma task do Airflow usando o método
parse_file. - Chame a task com os argumentos necessários.
Exercício interativo prático
Experimente este exercício completando este código de exemplo.
@dag(dag_id='process_sales')
def process_sales():
# Decorate parse_file as a task
____
def parse_file(inputfile: str, outputfile: str):
with open(inputfile) as infile:
data = json.load(infile)
with open(outputfile, 'w') as outfile:
json.dump(data, outfile)
pull_file('http://dataserver/sales.json', 'latestsales.json')
# Call the parse_file task
____('latestsales.json', 'latestsales_parsed.json')
process_sales()