Definición de un DAG
En los ejercicios anteriores has aplicado los tres pasos del proceso ETL:
- Extract (Extraer): Extrae la tabla
film
PostgreSQL enpandas
. - Transform (Transformar): Divide la columna
rental_rate
del DataFramefilm
. - Load (Cargar): Carga un DataFrame de
film
en un almacén de datos PostgreSQL.
Las funciones extract_film_to_pandas()
, transform_rental_rate()
y load_dataframe_to_film()
están definidas en tu espacio de trabajo. En este ejercicio, añadirás una tarea ETL a un DAG existente. El DAG a ampliar y la tarea a esperar se definen en tu espacio de trabajo como dag
y wait_for_table
respectivamente.
Este ejercicio forma parte del curso
Introducción a la ingeniería de datos
Instrucciones del ejercicio
- Completa la función
etl()
utilizando las funciones definidas en la descripción del ejercicio. - Asegúrate de que
etl_task
utiliza la llamadaetl
. - Establece la dependencia upstream correcta. Ten en cuenta que
etl_task
debe esperar a que terminewait_for_table
. - El código de ejemplo contiene una ejecución de muestra. Esto significa que se ejecuta la canalización ETL cuando tú ejecutas el código.
Ejercicio interactivo práctico
Prueba este ejercicio y completa el código de muestra.
# Define the ETL function
def etl():
film_df = ____()
film_df = ____(____)
____(____)
# Define the ETL task using PythonOperator
etl_task = PythonOperator(task_id='etl_film',
python_callable=____,
dag=dag)
# Set the upstream to wait_for_table and sample run etl()
etl_task.____(wait_for_table)
etl()