Definieren einer DAG
In den vorherigen Übungen hast du die drei Schritte des ETL-Prozesses angewendet:
- Auszug: Extrahiere die Tabelle
film
von PostgreSQL inpandas
. - Verwandeln: Teile die Spalte
rental_rate
desfilm
DataFrame. - Last: Lade einen
film
DataFrame in ein PostgreSQL Data Warehouse.
Die Funktionen extract_film_to_pandas()
, transform_rental_rate()
und load_dataframe_to_film()
sind in deinem Arbeitsbereich definiert. In dieser Übung fügst du eine ETL-Aufgabe zu einer bestehenden DAG hinzu. Die zu erweiternde DAG und die Aufgabe, auf die gewartet werden soll, sind in deinem Arbeitsbereich als dag
bzw. wait_for_table
definiert.
Diese Übung ist Teil des Kurses
Einführung in die Datentechnik
Anleitung zur Übung
- Vervollständige die Funktion
etl()
, indem du die in der Übungsbeschreibung definierten Funktionen verwendest. - Stelle sicher, dass
etl_task
dieetl
callable verwendet. - Richte die richtige Upstream-Abhängigkeit ein. Beachte, dass
etl_task
warten sollte, biswait_for_table
fertig ist. - Der Beispielcode enthält einen Probelauf. Das bedeutet, dass die ETL-Pipeline läuft, wenn du den Code ausführst.
Interaktive Übung
Versuche dich an dieser Übung, indem du diesen Beispielcode vervollständigst.
# Define the ETL function
def etl():
film_df = ____()
film_df = ____(____)
____(____)
# Define the ETL task using PythonOperator
etl_task = PythonOperator(task_id='etl_film',
python_callable=____,
dag=dag)
# Set the upstream to wait_for_table and sample run etl()
etl_task.____(wait_for_table)
etl()