Get startedGet started for free

Defining a DAG

In the previous exercises you applied the three steps in the ETL process:

  • Extract: Extract the film PostgreSQL table into pandas.
  • Transform: Split the rental_rate column of the film DataFrame.
  • Load: Load a the film DataFrame into a PostgreSQL data warehouse.

The functions extract_film_to_pandas(), transform_rental_rate() and load_dataframe_to_film() are defined in your workspace. In this exercise, you'll add an ETL task to an existing DAG. The DAG to extend and the task to wait for are defined in your workspace are defined as dag and wait_for_table respectively.

This exercise is part of the course

Introduction to Data Engineering

View Course

Exercise instructions

  • Complete the etl() function by making use of the functions defined in the exercise description.
  • Make sure etl_task uses the etl callable.
  • Set up the correct upstream dependency. Note that etl_task should wait for wait_for_table to be finished.
  • The sample code contains a sample run. This means the ETL pipeline runs when you run the code.

Hands-on interactive exercise

Have a go at this exercise by completing this sample code.

# Define the ETL function
def etl():
    film_df = ____()
    film_df = ____(____)
    ____(____)

# Define the ETL task using PythonOperator
etl_task = PythonOperator(task_id='etl_film',
                          python_callable=____,
                          dag=dag)

# Set the upstream to wait_for_table and sample run etl()
etl_task.____(wait_for_table)
etl()
Edit and Run Code