ComeçarComece de graça

Definição de um DAG

Nos exercícios anteriores, você aplicou as três etapas do processo ETL:

  • Extrair: Extraia a tabela film PostgreSQL para pandas.
  • Transformar: Divida a coluna rental_rate do DataFrame film.
  • Carregar: Carregue o film DataFrame em um data warehouse do PostgreSQL.

As funções extract_film_to_pandas(), transform_rental_rate() e load_dataframe_to_film() estão definidas em seu espaço de trabalho. Neste exercício, você adicionará uma tarefa ETL a um DAG existente. O DAG a ser estendido e a tarefa a ser aguardada são definidos em seu espaço de trabalho como dag e wait_for_table, respectivamente.

Este exercício faz parte do curso

Introdução à engenharia de dados

Ver curso

Instruções do exercício

  • Complete a função etl() usando as funções definidas na descrição do exercício.
  • Certifique-se de que o site etl_task use o callable etl.
  • Configure a dependência correta do upstream. Observe que etl_task deve aguardar a conclusão de wait_for_table.
  • O código de amostra contém uma execução de amostra. Isso significa que o pipeline ETL é executado quando você executa o código.

Exercício interativo prático

Experimente este exercício completando este código de exemplo.

# Define the ETL function
def etl():
    film_df = ____()
    film_df = ____(____)
    ____(____)

# Define the ETL task using PythonOperator
etl_task = PythonOperator(task_id='etl_film',
                          python_callable=____,
                          dag=dag)

# Set the upstream to wait_for_table and sample run etl()
etl_task.____(wait_for_table)
etl()
Editar e executar o código