Een DAG definiëren
In de vorige oefeningen heb je de drie stappen van het ETL-proces toegepast:
- Extract: Haal de PostgreSQL-tabel
filmop inpandas. - Transform: Splits de kolom
rental_ratevan defilm-DataFrame. - Load: Laad de
film-DataFrame in een PostgreSQL-datawarehouse.
De functies extract_film_to_pandas(), transform_rental_rate() en load_dataframe_to_film() zijn beschikbaar in je werkruimte. In deze oefening voeg je een ETL-taak toe aan een bestaande DAG. De uit te breiden DAG en de taak waarop moet worden gewacht, zijn in je werkruimte gedefinieerd als respectievelijk dag en wait_for_table.
Deze oefening maakt deel uit van de cursus
Introductie tot Data Engineering
Oefeninstructies
- Maak de functie
etl()af met de functies die in de oefenbeschrijving staan. - Zorg dat
etl_taskde callableetlgebruikt. - Stel de juiste upstream-afhankelijkheid in. Let op:
etl_taskmoet wachten totwait_for_tableklaar is. - De voorbeeldcode bevat een voorbeelduitvoering. Dit betekent dat de ETL-pijplijn draait wanneer je de code uitvoert.
Praktische interactieve oefening
Probeer deze oefening eens door deze voorbeeldcode in te vullen.
# Define the ETL function
def etl():
film_df = ____()
film_df = ____(____)
____(____)
# Define the ETL task using PythonOperator
etl_task = PythonOperator(task_id='etl_film',
python_callable=____,
dag=dag)
# Set the upstream to wait_for_table and sample run etl()
etl_task.____(wait_for_table)
etl()