Definire un DAG
Negli esercizi precedenti hai applicato i tre passaggi del processo ETL:
- Extract: estrai la tabella PostgreSQL
filminpandas. - Transform: dividi la colonna
rental_ratedel DataFramefilm. - Load: carica il DataFrame
filmin un data warehouse PostgreSQL.
Le funzioni extract_film_to_pandas(), transform_rental_rate() e load_dataframe_to_film() sono già definite nel tuo workspace. In questo esercizio, aggiungerai un task ETL a un DAG esistente. Il DAG da estendere e il task da attendere sono definiti nel tuo workspace rispettivamente come dag e wait_for_table.
Questo esercizio fa parte del corso
Introduzione al Data Engineering
Istruzioni dell'esercizio
- Completa la funzione
etl()usando le funzioni definite nella descrizione dell'esercizio. - Assicurati che
etl_taskutilizzi il callableetl. - Imposta la dipendenza a monte corretta. Nota che
etl_taskdeve aspettare chewait_for_tablesia terminato. - Il codice di esempio include un'esecuzione di prova. Questo significa che la pipeline ETL verrà eseguita quando lanci il codice.
Esercizio pratico interattivo
Prova a risolvere questo esercizio completando il codice di esempio.
# Define the ETL function
def etl():
film_df = ____()
film_df = ____(____)
____(____)
# Define the ETL task using PythonOperator
etl_task = PythonOperator(task_id='etl_film',
python_callable=____,
dag=dag)
# Set the upstream to wait_for_table and sample run etl()
etl_task.____(wait_for_table)
etl()