Définir un DAG
Dans les exercices précédents, vous avez appliqué les trois étapes du processus ETL :
- Extract : extraire la table PostgreSQL
filmdanspandas. - Transform : scinder la colonne
rental_ratedu DataFramefilm. - Load : charger le DataFrame
filmdans un entrepôt de données PostgreSQL.
Les fonctions extract_film_to_pandas(), transform_rental_rate() et load_dataframe_to_film() sont définies dans votre espace de travail. Dans cet exercice, vous allez ajouter une tâche ETL à un DAG existant. Le DAG à étendre et la tâche à attendre sont définis dans votre espace de travail sous les noms dag et wait_for_table respectivement.
Cet exercice fait partie du cours
Introduction au data engineering
Instructions
- Complétez la fonction
etl()en utilisant les fonctions décrites dans l’énoncé de l’exercice. - Assurez-vous que
etl_taskutilise l’appelableetl. - Configurez la bonne dépendance amont. Notez que
etl_taskdoit attendre la fin dewait_for_table. - Le code fourni contient une exécution d’exemple. Cela signifie que le pipeline ETL s’exécute lorsque vous lancez le code.
Exercice interactif pratique
Essayez cet exercice en complétant cet exemple de code.
# Define the ETL function
def etl():
film_df = ____()
film_df = ____(____)
____(____)
# Define the ETL task using PythonOperator
etl_task = PythonOperator(task_id='etl_film',
python_callable=____,
dag=dag)
# Set the upstream to wait_for_table and sample run etl()
etl_task.____(wait_for_table)
etl()