Encore plus de @tasks
Pour poursuivre la mise en place de votre workflow, vous devez ajouter une nouvelle étape pour analyser et enregistrer les modifications du fichier téléchargé. Le Dag process_sales est défini et la tâche pull_file y est déjà ajoutée. Dans ce cas, la fonction Python est déjà définie pour vous, parse_file(inputfile, outputfile).
Notez que, lors de l’implémentation de tâches Airflow, il arrive souvent que vous ne connaissiez pas en détail chaque étape qui vous est fournie. Tant que vous savez encapsuler ces étapes dans la structure d’Airflow, vous pourrez mettre en œuvre le workflow souhaité.
Cet exercice fait partie du cours
Introduction à Apache Airflow en Python
Instructions
- Créez une tâche Airflow en utilisant la méthode
parse_file. - Appelez la tâche avec les arguments nécessaires.
Exercice interactif pratique
Essayez cet exercice en complétant cet exemple de code.
@dag(dag_id='process_sales')
def process_sales():
# Decorate parse_file as a task
____
def parse_file(inputfile: str, outputfile: str):
with open(inputfile) as infile:
data = json.load(infile)
with open(outputfile, 'w') as outfile:
json.dump(data, outfile)
pull_file('http://dataserver/sales.json', 'latestsales.json')
# Call the parse_file task
____('latestsales.json', 'latestsales_parsed.json')
process_sales()