IniziaInizia gratis

Definire un DAG

Negli esercizi precedenti hai applicato i tre passaggi del processo ETL:

  • Extract: estrai la tabella PostgreSQL film in pandas.
  • Transform: dividi la colonna rental_rate del DataFrame film.
  • Load: carica il DataFrame film in un data warehouse PostgreSQL.

Le funzioni extract_film_to_pandas(), transform_rental_rate() e load_dataframe_to_film() sono già definite nel tuo workspace. In questo esercizio, aggiungerai un task ETL a un DAG esistente. Il DAG da estendere e il task da attendere sono definiti nel tuo workspace rispettivamente come dag e wait_for_table.

Questo esercizio fa parte del corso

Introduzione al Data Engineering

Visualizza il corso

Istruzioni dell'esercizio

  • Completa la funzione etl() usando le funzioni definite nella descrizione dell'esercizio.
  • Assicurati che etl_task utilizzi il callable etl.
  • Imposta la dipendenza a monte corretta. Nota che etl_task deve aspettare che wait_for_table sia terminato.
  • Il codice di esempio include un'esecuzione di prova. Questo significa che la pipeline ETL verrà eseguita quando lanci il codice.

Esercizio pratico interattivo

Prova a risolvere questo esercizio completando il codice di esempio.

# Define the ETL function
def etl():
    film_df = ____()
    film_df = ____(____)
    ____(____)

# Define the ETL task using PythonOperator
etl_task = PythonOperator(task_id='etl_film',
                          python_callable=____,
                          dag=dag)

# Set the upstream to wait_for_table and sample run etl()
etl_task.____(wait_for_table)
etl()
Modifica ed esegui il codice