Ancora @tasks
Per continuare a implementare il tuo workflow, devi aggiungere un altro passaggio per analizzare e salvare le modifiche del file scaricato. Il Dag process_sales è già definito e include il task pull_file. In questo caso, la funzione Python è già pronta per te: parse_file(inputfile, outputfile).
Tieni presente che, quando implementi i task in Airflow, spesso potresti non comprendere nel dettaglio i singoli passaggi forniti. Finché capisci come incapsulare questi passaggi nella struttura di Airflow, sarai in grado di implementare il workflow desiderato.
Questo esercizio fa parte del corso
Introduzione ad Apache Airflow in Python
Istruzioni dell'esercizio
- Crea un task di Airflow usando il metodo
parse_file. - Chiama il task con gli argomenti necessari.
Esercizio pratico interattivo
Prova a risolvere questo esercizio completando il codice di esempio.
@dag(dag_id='process_sales')
def process_sales():
# Decorate parse_file as a task
____
def parse_file(inputfile: str, outputfile: str):
with open(inputfile) as infile:
data = json.load(infile)
with open(outputfile, 'w') as outfile:
json.dump(data, outfile)
pull_file('http://dataserver/sales.json', 'latestsales.json')
# Call the parse_file task
____('latestsales.json', 'latestsales_parsed.json')
process_sales()