CommencerCommencer gratuitement

Définir un DAG

Dans les exercices précédents, vous avez appliqué les trois étapes du processus ETL:

  • Extrait: Extrayez le tableau film PostgreSQL dans pandas.
  • Transformer: Fractionnez la colonne rental_rate du DataFrame film.
  • Chargement: Chargez un DataFrame film dans un entrepôt de données PostgreSQL.

Les fonctions extract_film_to_pandas(), transform_rental_rate() et load_dataframe_to_film() sont définies dans votre espace de travail. Dans cet exercice, vous ajouterez une tâche ETL à une tâche existante DAG. Le site DAG à étendre et la tâche à attendre sont définis dans votre espace de travail sous les noms dag et wait_for_table respectivement.

Cet exercice fait partie du cours

Introduction à l'ingénierie des données

Afficher le cours

Instructions

  • Complétez la fonction etl() en utilisant les fonctions définies dans la description de l'exercice.
  • Assurez-vous que etl_task utilise l'appelable etl.
  • Mettez en place les dépendances correctes en amont. Notez que etl_task doit attendre que wait_for_table ait terminé.
  • L'exemple de code contient un exemple d'exécution. Cela signifie que le pipeline ETL s'exécute lorsque vous exécutez le code.

Exercice interactif pratique

Essayez cet exercice en complétant cet exemple de code.

# Define the ETL function
def etl():
    film_df = ____()
    film_df = ____(____)
    ____(____)

# Define the ETL task using PythonOperator
etl_task = PythonOperator(task_id='etl_film',
                          python_callable=____,
                          dag=dag)

# Set the upstream to wait_for_table and sample run etl()
etl_task.____(wait_for_table)
etl()
Modifier et exécuter le code