Definición de un DAG
En los ejercicios anteriores has aplicado los tres pasos del proceso ETL:
- Extract (Extraer): Extrae la tabla
filmPostgreSQL enpandas. - Transform (Transformar): Divide la columna
rental_ratedel DataFramefilm. - Load (Cargar): Carga un DataFrame de
filmen un almacén de datos PostgreSQL.
Las funciones extract_film_to_pandas(), transform_rental_rate() y load_dataframe_to_film() están definidas en tu espacio de trabajo. En este ejercicio, añadirás una tarea ETL a un DAG existente. El DAG a ampliar y la tarea a esperar se definen en tu espacio de trabajo como dag y wait_for_table respectivamente.
Este ejercicio forma parte del curso
Introducción a la ingeniería de datos
Instrucciones del ejercicio
- Completa la función
etl()utilizando las funciones definidas en la descripción del ejercicio. - Asegúrate de que
etl_taskutiliza la llamadaetl. - Establece la dependencia upstream correcta. Ten en cuenta que
etl_taskdebe esperar a que terminewait_for_table. - El código de ejemplo contiene una ejecución de muestra. Esto significa que se ejecuta la canalización ETL cuando tú ejecutas el código.
Ejercicio interactivo práctico
Prueba este ejercicio y completa el código de muestra.
# Define the ETL function
def etl():
film_df = ____()
film_df = ____(____)
____(____)
# Define the ETL task using PythonOperator
etl_task = PythonOperator(task_id='etl_film',
python_callable=____,
dag=dag)
# Set the upstream to wait_for_table and sample run etl()
etl_task.____(wait_for_table)
etl()