Einen DAG definieren
In den vorherigen Übungen hast du die drei Schritte des ETL-Prozesses angewendet:
- Extrahieren: Extrahiere die PostgreSQL-Tabelle
film
inpandas
. - Transformieren: Teile die Spalte
rental_rate
des DataFramesfilm
auf. - Laden: Lade den DataFrame
film
in ein PostgreSQL-Data Warehouse.
Die Funktionen extract_film_to_pandas()
, transform_rental_rate()
und load_dataframe_to_film()
sind in deinem Arbeitsbereich definiert. In dieser Übung fügst du einer vorhandenen DAG eine ETL-Aufgabe hinzu. Die zu erweiternde DAG und die Aufgabe, auf die gewartet werden soll, sind in deinem Arbeitsbereich als dag
bzw. wait_for_table
definiert.
Diese Übung ist Teil des Kurses
Einführung in das Data Engineering
Anleitung zur Übung
- Vervollständige die Funktion
etl()
, indem du die Funktionen verwendest, die in der Aufgabenbeschreibung definiert sind. - Stell sicher, dass
etl_task
die aufrufbare Funktionetl
benutzt. - Richte die richtige Upstream-Abhängigkeit ein. Beachte, dass
etl_task
warten sollte, biswait_for_table
fertig ist. - Der Beispielcode beinhaltet einen Beispiellauf. Das heißt, die ETL-Pipeline läuft, wenn du den Code ausführst.
Interaktive Übung
Vervollständige den Beispielcode, um diese Übung erfolgreich abzuschließen.
# Define the ETL function
def etl():
film_df = ____()
film_df = ____(____)
____(____)
# Define the ETL task using PythonOperator
etl_task = PythonOperator(task_id='etl_film',
python_callable=____,
dag=dag)
# Set the upstream to wait_for_table and sample run etl()
etl_task.____(wait_for_table)
etl()