Definição de um DAG
Nos exercícios anteriores, você aplicou as três etapas do processo ETL:
- Extrair: Extraia a tabela
film
PostgreSQL parapandas
. - Transformar: Divida a coluna
rental_rate
do DataFramefilm
. - Carregar: Carregue o
film
DataFrame em um data warehouse do PostgreSQL.
As funções extract_film_to_pandas()
, transform_rental_rate()
e load_dataframe_to_film()
estão definidas em seu espaço de trabalho. Neste exercício, você adicionará uma tarefa ETL a um DAG existente. O DAG a ser estendido e a tarefa a ser aguardada são definidos em seu espaço de trabalho como dag
e wait_for_table
, respectivamente.
Este exercício faz parte do curso
Introdução à engenharia de dados
Instruções do exercício
- Complete a função
etl()
usando as funções definidas na descrição do exercício. - Certifique-se de que o site
etl_task
use o callableetl
. - Configure a dependência correta do upstream. Observe que
etl_task
deve aguardar a conclusão dewait_for_table
. - O código de amostra contém uma execução de amostra. Isso significa que o pipeline ETL é executado quando você executa o código.
Exercício interativo prático
Experimente este exercício completando este código de exemplo.
# Define the ETL function
def etl():
film_df = ____()
film_df = ____(____)
____(____)
# Define the ETL task using PythonOperator
etl_task = PythonOperator(task_id='etl_film',
python_callable=____,
dag=dag)
# Set the upstream to wait_for_table and sample run etl()
etl_task.____(wait_for_table)
etl()