Definición de un DAG
En los ejercicios anteriores has aplicado los tres pasos del proceso ETL:
- Extract (Extraer): Extrae la tabla
film
PostgreSQL enpandas
. - Transform (Transformar): Divide la columna
rental_rate
del DataFramefilm
. - Load (Cargar): Carga un DataFrame de
film
en un almacén de datos PostgreSQL.
Las funciones extract_film_to_pandas()
, transform_rental_rate()
y load_dataframe_to_film()
están definidas en tu espacio de trabajo. En este ejercicio, añadirás una tarea ETL a un DAG existente. El DAG a ampliar y la tarea a esperar se definen en tu espacio de trabajo como dag
y wait_for_table
respectivamente.
Este ejercicio forma parte del curso
Introducción a la ingeniería de datos
Instrucciones de ejercicio
- Completa la función
etl()
utilizando las funciones definidas en la descripción del ejercicio. - Asegúrate de que
etl_task
utiliza la llamadaetl
. - Establece la dependencia upstream correcta. Ten en cuenta que
etl_task
debe esperar a que terminewait_for_table
. - El código de ejemplo contiene una ejecución de muestra. Esto significa que se ejecuta la canalización ETL cuando tú ejecutas el código.
Ejercicio interactivo práctico
Pruebe este ejercicio completando este código de muestra.
# Define the ETL function
def etl():
film_df = ____()
film_df = ____(____)
____(____)
# Define the ETL task using PythonOperator
etl_task = PythonOperator(task_id='etl_film',
python_callable=____,
dag=dag)
# Set the upstream to wait_for_table and sample run etl()
etl_task.____(wait_for_table)
etl()