Definindo um DAG
Nos exercícios anteriores, você aplicou as três etapas do processo de ETL:
- Extract: extrair a tabela
filmdo PostgreSQL para opandas. - Transform: dividir a coluna
rental_ratedo DataFramefilm. - Load: carregar o DataFrame
filmem um data warehouse PostgreSQL.
As funções extract_film_to_pandas(), transform_rental_rate() e load_dataframe_to_film() estão definidas no seu ambiente de trabalho. Neste exercício, você vai adicionar uma tarefa de ETL a um DAG existente. O DAG a ser estendido e a tarefa a aguardar estão definidos no seu ambiente como dag e wait_for_table, respectivamente.
Este exercício faz parte do curso
Introdução à Engenharia de Dados
Instruções do exercício
- Complete a função
etl()usando as funções definidas na descrição do exercício. - Garanta que
etl_taskuse o callableetl. - Configure a dependência upstream correta. Note que
etl_taskdeve aguardar a conclusão dewait_for_table. - O código de exemplo inclui uma execução de teste. Isso significa que o pipeline de ETL será executado quando você rodar o código.
Exercício interativo prático
Experimente este exercício completando este código de exemplo.
# Define the ETL function
def etl():
film_df = ____()
film_df = ____(____)
____(____)
# Define the ETL task using PythonOperator
etl_task = PythonOperator(task_id='etl_film',
python_callable=____,
dag=dag)
# Set the upstream to wait_for_table and sample run etl()
etl_task.____(wait_for_table)
etl()