ComeçarComece de graça

Definindo um DAG

Nos exercícios anteriores, você aplicou as três etapas do processo de ETL:

  • Extract: extrair a tabela film do PostgreSQL para o pandas.
  • Transform: dividir a coluna rental_rate do DataFrame film.
  • Load: carregar o DataFrame film em um data warehouse PostgreSQL.

As funções extract_film_to_pandas(), transform_rental_rate() e load_dataframe_to_film() estão definidas no seu ambiente de trabalho. Neste exercício, você vai adicionar uma tarefa de ETL a um DAG existente. O DAG a ser estendido e a tarefa a aguardar estão definidos no seu ambiente como dag e wait_for_table, respectivamente.

Este exercício faz parte do curso

Introdução à Engenharia de Dados

Ver curso

Instruções do exercício

  • Complete a função etl() usando as funções definidas na descrição do exercício.
  • Garanta que etl_task use o callable etl.
  • Configure a dependência upstream correta. Note que etl_task deve aguardar a conclusão de wait_for_table.
  • O código de exemplo inclui uma execução de teste. Isso significa que o pipeline de ETL será executado quando você rodar o código.

Exercício interativo prático

Experimente este exercício completando este código de exemplo.

# Define the ETL function
def etl():
    film_df = ____()
    film_df = ____(____)
    ____(____)

# Define the ETL task using PythonOperator
etl_task = PythonOperator(task_id='etl_film',
                          python_callable=____,
                          dag=dag)

# Set the upstream to wait_for_table and sample run etl()
etl_task.____(wait_for_table)
etl()
Editar e executar o código