Defining a DAG
In the previous exercises you applied the three steps in the ETL process:
- Extract: Extract the film PostgreSQL table into pandas.
- Transform: Split the rental_rate column of the film DataFrame.
- Load: Load the film DataFrame into a PostgreSQL data warehouse.
The functions extract_film_to_pandas(), transform_rental_rate() and load_dataframe_to_film() are defined in your workspace. In this exercise, you'll add an ETL task to an existing DAG. The DAG to extend and the task to wait for are defined in your workspace as dag and wait_for_table respectively.
This exercise is part of the course Introduction to Data Engineering.
Exercise instructions
- Complete the etl() function by making use of the functions defined in the exercise description.
- Make sure etl_task uses the etl callable.
- Set up the correct upstream dependency. Note that etl_task should wait for wait_for_table to be finished.
- The sample code contains a sample run. This means the ETL pipeline runs when you run the code.
Hands-on interactive exercise
Complete the sample code below to finish this exercise.
# Define the ETL function
def etl():
    film_df = ____()
    film_df = ____(____)
    ____(____)

# Define the ETL task using PythonOperator
etl_task = PythonOperator(task_id='etl_film',
                          python_callable=____,
                          dag=dag)

# Set the upstream to wait_for_table and sample run etl()
etl_task.____(wait_for_table)
etl()
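
One way the template could be completed is sketched below. It is not the reference solution. The three workspace functions are replaced by hypothetical in-memory stand-ins (lists of dicts instead of pandas DataFrames, a module-level list instead of the PostgreSQL warehouse, made-up film rows, and an assumed dollar/cents split for rental_rate), so the sample run works without a database. The Airflow wiring is wrapped in a hypothetical helper, build_etl_task(), whose import path assumes Airflow 2.x; dag and wait_for_table are expected to come from the workspace.

```python
# Hypothetical stand-in for the PostgreSQL data warehouse table.
WAREHOUSE = []


def extract_film_to_pandas():
    """Stand-in extract step: return made-up film rows as a list of dicts."""
    return [
        {"title": "Film A", "rental_rate": "0.99"},
        {"title": "Film B", "rental_rate": "4.99"},
    ]


def transform_rental_rate(film_rows):
    """Stand-in transform step: split rental_rate into dollar and cents parts.

    The exact split used by the real workspace function is an assumption.
    """
    transformed = []
    for row in film_rows:
        dollar, cents = row["rental_rate"].split(".")
        transformed.append(
            {**row, "rental_rate_dollar": dollar, "rental_rate_cents": cents}
        )
    return transformed


def load_dataframe_to_film(film_rows):
    """Stand-in load step: overwrite the in-memory warehouse table."""
    WAREHOUSE.clear()
    WAREHOUSE.extend(film_rows)


# Define the ETL function: extract, then transform, then load.
def etl():
    film_df = extract_film_to_pandas()
    film_df = transform_rental_rate(film_df)
    load_dataframe_to_film(film_df)


def build_etl_task(dag, wait_for_table):
    """Hypothetical helper: attach etl() to an existing DAG (needs Airflow)."""
    # Import path assumed for Airflow 2.x; Airflow 1.x exposed the class
    # from airflow.operators.python_operator instead.
    from airflow.operators.python import PythonOperator

    etl_task = PythonOperator(task_id="etl_film",
                              python_callable=etl,
                              dag=dag)
    # etl_task runs only after wait_for_table has finished.
    etl_task.set_upstream(wait_for_table)
    return etl_task


# Sample run, as in the exercise: executes the pipeline immediately.
etl()
```

In the exercise workspace, calling build_etl_task(dag, wait_for_table) would correspond to the last two template statements. Note that python_callable receives the function object etl, not the result of calling etl().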