Définir le DAG
Dans les exercices précédents, vous avez réalisé séparément les phases d'extraction, de transformation et de chargement. Tout cela est rassemblé dans une fonction etl()
que vous pouvez découvrir dans la console.
La fonction etl()
extrait les données brutes sur les cours et les évaluations des bases de données pertinentes, nettoie les données corrompues et complète les valeurs manquantes, calcule l'évaluation moyenne par cours et crée des recommandations basées sur les règles de décision pour la production de recommandations, et enfin charge les recommandations dans une base de données.
Comme vous vous en souvenez peut-être dans la vidéo, etl()
accepte un seul argument : db_engines
. Vous pouvez transmettre cette information à la tâche en utilisant op_kwargs
dans la page PythonOperator
. Vous pouvez lui passer un dictionnaire qui sera rempli comme kwargs dans le callable.
Cet exercice fait partie du cours
Introduction à l'ingénierie des données
Instructions
- Complétez la définition de DAG, afin qu'il fonctionne quotidiennement. Veillez à utiliser la notation cron.
- Complétez le site
PythonOperator()
en passant les arguments corrects. Outreetl
,db_engines
est également disponible dans votre espace de travail.
Exercice interactif pratique
Essayez cet exercice en complétant cet exemple de code.
# Define the DAG so it runs on a daily basis
dag = DAG(dag_id="recommendations",
schedule_interval="____")
# Make sure `etl()` is called in the operator. Pass the correct kwargs.
task_recommendations = PythonOperator(
task_id="recommendations_task",
python_callable=____,
op_kwargs={"____": ____},
)