Defining a DAG

In the previous exercises you applied the three steps in the ETL process:

  • Extract: Extract the film PostgreSQL table into pandas.
  • Transform: Split the rental_rate column of the film DataFrame.
  • Load: Load the film DataFrame into a PostgreSQL data warehouse.

The functions extract_film_to_pandas(), transform_rental_rate() and load_dataframe_to_film() are defined in your workspace. In this exercise, you'll add an ETL task to an existing DAG. The DAG to extend and the task to wait for are defined in your workspace as dag and wait_for_table, respectively.
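The way the three steps chain together can be sketched without a database. This is only an illustration: the `*_stub` functions, the sample rows, and the `rental_rate_dollar` / `rental_rate_cents` column names are hypothetical stand-ins, not the workspace functions, and plain lists of dicts take the place of pandas and PostgreSQL.

```python
# Hypothetical stand-in for the source table (a list of row dicts, not PostgreSQL).
SOURCE_ROWS = [
    {"film_id": 1, "title": "Film A", "rental_rate": "0.99"},
    {"film_id": 2, "title": "Film B", "rental_rate": "4.99"},
]

# Hypothetical stand-in for the data warehouse table.
warehouse = []


def extract_film_stub():
    """Extract: return a copy of the source rows."""
    return [dict(row) for row in SOURCE_ROWS]


def transform_rental_rate_stub(rows):
    """Transform: split rental_rate into dollar and cents parts (assumed column names)."""
    transformed = []
    for row in rows:
        dollar, cents = row["rental_rate"].split(".")
        transformed.append(
            {**row, "rental_rate_dollar": dollar, "rental_rate_cents": cents}
        )
    return transformed


def load_film_stub(rows):
    """Load: append the transformed rows to the stand-in warehouse."""
    warehouse.extend(rows)


def etl_sketch():
    """Chain the steps: each step's output is the next step's input."""
    rows = extract_film_stub()
    rows = transform_rental_rate_stub(rows)
    load_film_stub(rows)


etl_sketch()
print(warehouse[0]["rental_rate_dollar"], warehouse[0]["rental_rate_cents"])
```

The point of the sketch is the shape of the pipeline: extract takes no input, transform takes and returns the data, and load consumes it without returning anything.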

This exercise is part of the course

Introduction to Data Engineering


Exercise instructions

  • Complete the etl() function by making use of the functions defined in the exercise description.
  • Make sure etl_task uses the etl callable.
  • Set up the correct upstream dependency: etl_task should run only after wait_for_table has finished.
  • The sample code contains a sample run. This means the ETL pipeline runs when you run the code.
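What an upstream dependency means can be shown with a toy model. The sketch below does not use Airflow: `ToyTask` and `run_with_dependencies` are hypothetical names invented for illustration, and the real scheduler works differently. It only mirrors the idea that a task with an upstream runs after that upstream has completed.

```python
class ToyTask:
    """Toy stand-in for a task; not Airflow's operator class."""

    def __init__(self, task_id, python_callable):
        self.task_id = task_id
        self.python_callable = python_callable
        self.upstream = []  # tasks that must finish before this one

    def set_upstream(self, other):
        """Record that `other` must run before this task."""
        self.upstream.append(other)


def run_with_dependencies(task, done=None):
    """Run all upstream tasks first, then the task itself; each task runs once."""
    done = [] if done is None else done
    if task.task_id in done:
        return done
    for parent in task.upstream:
        run_with_dependencies(parent, done)
    task.python_callable()
    done.append(task.task_id)
    return done


# Hypothetical toy tasks named after the ones in the exercise.
wait_for_table_toy = ToyTask("wait_for_table", lambda: None)
etl_task_toy = ToyTask("etl_film", lambda: None)
etl_task_toy.set_upstream(wait_for_table_toy)

order = run_with_dependencies(etl_task_toy)
print(order)  # ['wait_for_table', 'etl_film']
```

Note that the dependency is declared on the downstream task: the task that has to wait is the one that names its upstream.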

Hands-on interactive exercise

Try this exercise by completing the sample code below.

# Define the ETL function
def etl():
    film_df = ____()
    film_df = ____(____)
    ____(____)

# Define the ETL task using PythonOperator
etl_task = PythonOperator(task_id='etl_film',
                          python_callable=____,
                          dag=dag)

# Set the upstream to wait_for_table and sample run etl()
etl_task.____(wait_for_table)
etl()