Définir un DAG
Dans les exercices précédents, vous avez appliqué les trois étapes du processus ETL:
- Extrait: Extrayez le tableau
filmPostgreSQL danspandas. - Transformer: Fractionnez la colonne
rental_ratedu DataFramefilm. - Chargement: Chargez un DataFrame
filmdans un entrepôt de données PostgreSQL.
Les fonctions extract_film_to_pandas(), transform_rental_rate() et load_dataframe_to_film() sont définies dans votre espace de travail. Dans cet exercice, vous ajouterez une tâche ETL à une tâche existante DAG. Le site DAG à étendre et la tâche à attendre sont définis dans votre espace de travail sous les noms dag et wait_for_table respectivement.
Cet exercice fait partie du cours
Introduction à l'ingénierie des données
Instructions
- Complétez la fonction
etl()en utilisant les fonctions définies dans la description de l'exercice. - Assurez-vous que
etl_taskutilise l'appelableetl. - Mettez en place les dépendances correctes en amont. Notez que
etl_taskdoit attendre quewait_for_tableait terminé. - L'exemple de code contient un exemple d'exécution. Cela signifie que le pipeline ETL s'exécute lorsque vous exécutez le code.
Exercice interactif pratique
Essayez cet exercice en complétant cet exemple de code.
# Define the ETL function
def etl():
film_df = ____()
film_df = ____(____)
____(____)
# Define the ETL task using PythonOperator
etl_task = PythonOperator(task_id='etl_film',
python_callable=____,
dag=dag)
# Set the upstream to wait_for_table and sample run etl()
etl_task.____(wait_for_table)
etl()