Einen DAG definieren
In den vorherigen Übungen hast du die drei Schritte des ETL-Prozesses angewendet:
- Extrahieren: Extrahiere die PostgreSQL-Tabelle
filminpandas. - Transformieren: Teile die Spalte
rental_ratedes DataFramesfilmauf. - Laden: Lade den DataFrame
filmin ein PostgreSQL-Data Warehouse.
Die Funktionen extract_film_to_pandas(), transform_rental_rate() und load_dataframe_to_film() sind in deinem Arbeitsbereich definiert. In dieser Übung fügst du einem vorhandenen DAG eine ETL-Aufgabe hinzu. Die zu erweiternde DAG und die Aufgabe, auf die gewartet werden soll, sind in deinem Arbeitsbereich als dag bzw. wait_for_table definiert.
Diese Übung ist Teil des Kurses
Einführung in das Data Engineering
Anleitung zur Übung
- Vervollständige die Funktion
etl(), indem du die Funktionen verwendest, die in der Aufgabenbeschreibung definiert sind. - Stell sicher, dass
etl_taskdie aufrufbare Funktionetlbenutzt. - Richte die richtige Upstream-Abhängigkeit ein. Beachte, dass
etl_taskwarten sollte, biswait_for_tablefertig ist. - Der Beispielcode beinhaltet einen Beispiellauf. Das heißt, die ETL-Pipeline läuft, wenn du den Code ausführst.
Interaktive Übung
Vervollständige den Beispielcode, um diese Übung erfolgreich abzuschließen.
# Define the ETL function
def etl():
film_df = ____()
film_df = ____(____)
____(____)
# Define the ETL task using PythonOperator
etl_task = PythonOperator(task_id='etl_film',
python_callable=____,
dag=dag)
# Set the upstream to wait_for_table and sample run etl()
etl_task.____(wait_for_table)
etl()