Bir DAG Tanımlamak
Önceki egzersizlerde ETL sürecinin üç adımını uyguladın:
- Extract:
filmPostgreSQL tablosunupandas'a aktar. - Transform:
filmDataFrame'indekirental_ratesütununu böl. - Load:
filmDataFrame'ini bir PostgreSQL veri ambarına yükle.
extract_film_to_pandas(), transform_rental_rate() ve load_dataframe_to_film() fonksiyonları çalışma alanında tanımlı. Bu egzersizde, mevcut bir DAG'e bir ETL görevi ekleyeceksin. Genişleteceğin DAG ve beklemesi gereken görev çalışma alanında sırasıyla dag ve wait_for_table olarak tanımlı.
Bu egzersiz
Data Engineering'e Giriş
kursunun bir parçasıdırEgzersiz talimatları
- Egzersiz açıklamasında tanımlanan fonksiyonları kullanarak
etl()fonksiyonunu tamamla. etl_task'inetlcallable'ını kullandığından emin ol.- Doğru upstream bağımlılığını kur.
etl_task'inwait_for_tabletamamlanana kadar beklemesi gerektiğini unutma. - Örnek kod bir örnek çalışma içerir. Bu, kodu çalıştırdığında ETL hattının da çalışacağı anlamına gelir.
Uygulamalı interaktif egzersiz
Bu örnek kodu tamamlayarak bu egzersizi bitirin.
# Define the ETL function
def etl():
film_df = ____()
film_df = ____(____)
____(____)
# Define the ETL task using PythonOperator
etl_task = PythonOperator(task_id='etl_film',
python_callable=____,
dag=dag)
# Set the upstream to wait_for_table and sample run etl()
etl_task.____(wait_for_table)
etl()