Definición de un DAG
En los ejercicios anteriores has aplicado los tres pasos del proceso ETL:
- Extract (Extraer): Extrae la tabla
filmPostgreSQL enpandas. - Transform (Transformar): Divide la columna
rental_ratedel DataFramefilm. - Load (Cargar): Carga un DataFrame de
filmen un almacén de datos PostgreSQL.
Las funciones extract_film_to_pandas(), transform_rental_rate() y load_dataframe_to_film() están definidas en tu espacio de trabajo. En este ejercicio, añadirás una tarea ETL a un DAG existente. El DAG a ampliar y la tarea a esperar se definen en tu espacio de trabajo como dag y wait_for_table respectivamente.
Este ejercicio forma parte del curso
Introducción a la ingeniería de datos
Instrucciones del ejercicio
- Completa la función
etl()utilizando las funciones definidas en la descripción del ejercicio. - Asegúrate de que
etl_taskutiliza la llamadaetl. - Establece la dependencia upstream correcta. Ten en cuenta que
etl_taskdebe esperar a que terminewait_for_table. - El código de ejemplo contiene una ejecución de muestra. Esto significa que se ejecuta la canalización ETL cuando tú ejecutas el código.
ejercicio interactivo práctico
Prueba este ejercicio completando este código de ejemplo.
# Define the ETL function
def etl():
film_df = ____()
film_df = ____(____)
____(____)
# Define the ETL task using PythonOperator
etl_task = PythonOperator(task_id='etl_film',
python_callable=____,
dag=dag)
# Set the upstream to wait_for_table and sample run etl()
etl_task.____(wait_for_table)
etl()