Définir un DAG
Dans les exercices précédents, vous avez appliqué les trois étapes du processus ETL:
- Extrait: Extrayez le tableau
film
PostgreSQL danspandas
. - Transformer: Fractionnez la colonne
rental_rate
du DataFramefilm
. - Chargement: Chargez un DataFrame
film
dans un entrepôt de données PostgreSQL.
Les fonctions extract_film_to_pandas()
, transform_rental_rate()
et load_dataframe_to_film()
sont définies dans votre espace de travail. Dans cet exercice, vous ajouterez une tâche ETL à une tâche existante DAG. Le site DAG à étendre et la tâche à attendre sont définis dans votre espace de travail sous les noms dag
et wait_for_table
respectivement.
Cet exercice fait partie du cours
Introduction à l'ingénierie des données
Instructions
- Complétez la fonction
etl()
en utilisant les fonctions définies dans la description de l'exercice. - Assurez-vous que
etl_task
utilise l'appelableetl
. - Mettez en place les dépendances correctes en amont. Notez que
etl_task
doit attendre quewait_for_table
ait terminé. - L'exemple de code contient un exemple d'exécution. Cela signifie que le pipeline ETL s'exécute lorsque vous exécutez le code.
Exercice interactif pratique
Essayez cet exercice en complétant cet exemple de code.
# Define the ETL function
def etl():
film_df = ____()
film_df = ____(____)
____(____)
# Define the ETL task using PythonOperator
etl_task = PythonOperator(task_id='etl_film',
python_callable=____,
dag=dag)
# Set the upstream to wait_for_table and sample run etl()
etl_task.____(wait_for_table)
etl()