Defining a DAG
In the previous exercises you applied the three steps of the ETL process:
- Extract: extract the `film` PostgreSQL table into pandas.
- Transform: split the `rental_rate` column of the `film` DataFrame.
- Load: load the `film` DataFrame into a PostgreSQL data warehouse.
The functions `extract_film_to_pandas()`, `transform_rental_rate()`, and `load_dataframe_to_film()` are defined in your workspace. In this exercise, you'll add an ETL task to an existing DAG. The DAG to extend and the task to wait for are defined in your workspace as `dag` and `wait_for_table`, respectively.
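Their exact implementations live in the workspace, but a minimal sketch of what they might look like may help, assuming one SQLAlchemy engine for the store database and one for the warehouse. The connection URIs, schema name, and split logic below are illustrative assumptions, not the course's actual code.

# Hypothetical sketch of the three workspace functions
import pandas as pd
import sqlalchemy

store_engine = sqlalchemy.create_engine("postgresql://user:pass@store_host:5432/store")   # assumed URI
warehouse_engine = sqlalchemy.create_engine("postgresql://user:pass@dw_host:5432/dwh")    # assumed URI

def extract_film_to_pandas():
    # Extract: read the film table from the store database into a pandas DataFrame
    return pd.read_sql("SELECT * FROM film", store_engine)

def transform_rental_rate(film_df):
    # Transform: split rental_rate into dollar and cent columns (illustrative split)
    dollars_cents = film_df["rental_rate"].astype(str).str.split(".", expand=True)
    return film_df.assign(rental_rate_dollar=dollars_cents[0],
                          rental_rate_cents=dollars_cents[1])

def load_dataframe_to_film(film_df):
    # Load: write the transformed DataFrame to the data warehouse
    film_df.to_sql("film", warehouse_engine, schema="store", if_exists="replace", index=False)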
This exercise is part of the course *Introduction to Data Engineering*.
Exercise instructions
- Complete the `etl()` function by making use of the functions defined in the exercise description.
- Make sure `etl_task` uses the `etl` callable.
- Set up the correct upstream dependency: `etl_task` should wait for `wait_for_table` to finish (see the dependency-syntax note after this list).
- The sample code contains a sample run, which means the ETL pipeline runs when you run the code.
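As a quick reference for the dependency step: Airflow lets you declare an upstream dependency either with the `set_upstream()` method or with the bit-shift operators. The two lines below are equivalent and assume the `wait_for_table` and `etl_task` objects from this exercise.

# Two equivalent ways to make etl_task run only after wait_for_table completes
etl_task.set_upstream(wait_for_table)   # explicit method call
wait_for_table >> etl_task              # bit-shift shorthand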
Hands-on interactive exercise
Have a go at this exercise by completing this sample code.
# Define the ETL function
def etl():
    film_df = ____()
    film_df = ____(____)
    ____(____)

# Define the ETL task using PythonOperator
etl_task = PythonOperator(task_id='etl_film',
                          python_callable=____,
                          dag=dag)

# Set the upstream dependency to wait_for_table, then do a sample run of etl()
etl_task.____(wait_for_table)
etl()
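For reference, here is one possible way to fill in the blanks, not the official solution. It assumes the workspace already provides `dag`, `wait_for_table`, and the three ETL functions named in the description; the import line is included only for self-containment, and its exact path depends on your Airflow version.

# Possible completion of the sample code above (assumes workspace objects exist)
from airflow.operators.python_operator import PythonOperator  # Airflow 1.x path; Airflow 2+ uses airflow.operators.python

# Define the ETL function: extract, then transform, then load
def etl():
    film_df = extract_film_to_pandas()
    film_df = transform_rental_rate(film_df)
    load_dataframe_to_film(film_df)

# Define the ETL task using PythonOperator, pointing at the etl callable
etl_task = PythonOperator(task_id='etl_film',
                          python_callable=etl,
                          dag=dag)

# Make etl_task wait for wait_for_table to finish
etl_task.set_upstream(wait_for_table)

# Sample run of the ETL pipeline outside the scheduler
etl()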