CommencerCommencer gratuitement

Encore plus de @tasks

Pour poursuivre la mise en place de votre workflow, vous devez ajouter une nouvelle étape pour analyser et enregistrer les modifications du fichier téléchargé. Le Dag process_sales est défini et la tâche pull_file y est déjà ajoutée. Dans ce cas, la fonction Python est déjà définie pour vous, parse_file(inputfile, outputfile).

Notez que, lors de l’implémentation de tâches Airflow, il arrive souvent que vous ne connaissiez pas en détail chaque étape qui vous est fournie. Tant que vous savez encapsuler ces étapes dans la structure d’Airflow, vous pourrez mettre en œuvre le workflow souhaité.

Cet exercice fait partie du cours

Introduction à Apache Airflow en Python

Afficher le cours

Instructions

  • Créez une tâche Airflow en utilisant la méthode parse_file.
  • Appelez la tâche avec les arguments nécessaires.

Exercice interactif pratique

Essayez cet exercice en complétant cet exemple de code.

@dag(dag_id='process_sales')
def process_sales():
    # Decorate parse_file as a task
    ____
    def parse_file(inputfile: str, outputfile: str):
        with open(inputfile) as infile:
            data = json.load(infile)
            with open(outputfile, 'w') as outfile:
                json.dump(data, outfile)

    pull_file('http://dataserver/sales.json', 'latestsales.json')
    # Call the parse_file task
    ____('latestsales.json', 'latestsales_parsed.json')

process_sales()
Modifier et exécuter le code