CommencerCommencer gratuitement

Définir un DAG

Dans les exercices précédents, vous avez appliqué les trois étapes du processus ETL :

  • Extract : extraire la table PostgreSQL film dans pandas.
  • Transform : scinder la colonne rental_rate du DataFrame film.
  • Load : charger le DataFrame film dans un entrepôt de données PostgreSQL.

Les fonctions extract_film_to_pandas(), transform_rental_rate() et load_dataframe_to_film() sont définies dans votre espace de travail. Dans cet exercice, vous allez ajouter une tâche ETL à un DAG existant. Le DAG à étendre et la tâche à attendre sont définis dans votre espace de travail sous les noms dag et wait_for_table respectivement.

Cet exercice fait partie du cours

Introduction au data engineering

Afficher le cours

Instructions

  • Complétez la fonction etl() en utilisant les fonctions décrites dans l’énoncé de l’exercice.
  • Assurez-vous que etl_task utilise l’appelable etl.
  • Configurez la bonne dépendance amont. Notez que etl_task doit attendre la fin de wait_for_table.
  • Le code fourni contient une exécution d’exemple. Cela signifie que le pipeline ETL s’exécute lorsque vous lancez le code.

Exercice interactif pratique

Essayez cet exercice en complétant cet exemple de code.

# Define the ETL function
def etl():
    film_df = ____()
    film_df = ____(____)
    ____(____)

# Define the ETL task using PythonOperator
etl_task = PythonOperator(task_id='etl_film',
                          python_callable=____,
                          dag=dag)

# Set the upstream to wait_for_table and sample run etl()
etl_task.____(wait_for_table)
etl()
Modifier et exécuter le code