
Putting it all together

1. Putting it all together

So we've now covered the full extent of an ETL pipeline: we've extracted data from databases, transformed the data to fit our needs, and loaded it into a target database, the data warehouse. This kind of batch ETL needs to run at a specific time, and perhaps only after some other tasks have completed. It's time to put everything together.

2. The ETL function

First of all, it's nice to have your ETL behavior encapsulated in a clean `etl()` function. Let's say we have an `extract_table_to_df()` function, which extracts a PostgreSQL table into a pandas DataFrame. Then we could have one or more transformation functions that take a pandas DataFrame and transform it by putting the data into a format more suitable for analysis. Such a function could be called `split_columns_transform()`, for example. Last but not least, a `load_df_into_dwh()` function loads the transformed data into a PostgreSQL database. We can define the resulting `etl()` function as follows: the result of `extract_table_to_df()` is used as input for the transform function, and the output of the transform is then used as input for `load_df_into_dwh()`.
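A minimal sketch of what this could look like, assuming the three helper functions exist as described in the text; their exact signatures, the table name "film", and the `db_engines` argument are hypothetical placeholders:

```python
def etl(db_engines):
    # Extract: read a PostgreSQL table into a pandas DataFrame
    # (the table name and db_engines dict are illustrative placeholders)
    df = extract_table_to_df("film", db_engines["store"])
    # Transform: put the data into a format better suited for analysis
    df = split_columns_transform(df)
    # Load: write the transformed DataFrame into the data warehouse
    load_df_into_dwh(df, "film", db_engines["dwh"])
```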

3. Airflow refresher

Now that we have a Python function that describes the full ETL, we need to make sure this function runs at a specific time. Before we go into the specifics, let's look at a small recap of Airflow. Apache Airflow is a workflow scheduler written in Python. It represents workflows as directed acyclic graphs, or DAGs, defined in Python objects. DAGs lend themselves perfectly to managing workflows, since tasks in a DAG can depend on one another. An operator represents a unit of work in Airflow, and Airflow has many of them built in. As we saw earlier, you can use a BashOperator to run a bash script, for example. There are plenty of other operators as well, and alternatively, you can write a custom operator.
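As a refresher, a minimal DAG with a single BashOperator task might look like the sketch below. The dag_id, schedule, start date, and bash command are all illustrative placeholders, and the imports follow the Airflow 1.x module layout:

```python
from datetime import datetime
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator

# Illustrative DAG: runs a single bash command every day at midnight
dag = DAG(
    dag_id="example_dag",
    schedule_interval="0 0 * * *",
    start_date=datetime(2023, 1, 1),
)

cleanup_task = BashOperator(
    task_id="cleanup",
    bash_command="echo 'running cleanup script'",
    dag=dag,
)
```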

4. Scheduling with DAGs in Airflow

So the first thing we need to do is create the DAG itself. In this code sample, we keep it simple and create a DAG object with the id 'sample'. The second argument, `schedule_interval`, defines when the DAG needs to run. There are multiple ways of defining the interval, but the most common one is a cron expression. That is a string representing a set of times, made up of 5 fields separated by spaces. The leftmost field describes the minute, then the hour, day of the month, month, and lastly, day of the week. Going into detail would take us too far, but there are several great resources for learning cron expressions online, for example the website https://crontab.guru. The DAG in the code sample runs at minute 0 of every hour.
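A sketch of that DAG definition, again using the Airflow 1.x imports; the start_date is an added assumption, since Airflow needs one before it will schedule any runs:

```python
from datetime import datetime
from airflow.models import DAG

# "0 * * * *" means: at minute 0 of every hour, every day
dag = DAG(
    dag_id="sample",
    schedule_interval="0 * * * *",
    start_date=datetime(2023, 1, 1),
)
```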

5. The DAG definition file

Having created the DAG, it's time to set the ETL in motion. The `etl()` function we defined earlier is a Python function, so it makes sense to use the PythonOperator from the python_operator submodule of Airflow. Looking at the documentation of the PythonOperator, we can see that it expects a Python callable; in our case, this is the `etl()` function we defined before. It also expects two other parameters we're going to pass: `task_id` and `dag`. These parameters are standard for all operators: they define the identifier of this task and the DAG it belongs to. We fill in the DAG we created earlier. We can then set upstream or downstream dependencies between tasks using the `.set_upstream()` or `.set_downstream()` methods. By using `.set_upstream()` in the example, `etl_task` will run only after `wait_for_this_task` has completed.
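Putting that together, a sketch of the task definition could look like this. `wait_for_this_task` stands in for some earlier task in the same DAG and is not defined here:

```python
from airflow.operators.python_operator import PythonOperator

# Wrap the etl() function in a task that belongs to the "sample" DAG
etl_task = PythonOperator(
    task_id="etl_task",
    python_callable=etl,
    dag=dag,
)

# etl_task will only run after wait_for_this_task has completed
etl_task.set_upstream(wait_for_this_task)
```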

6. The DAG definition file

Once you have this DAG definition and some tasks that relate to it, you can write it into a Python file and place it in Airflow's DAG folder. The Airflow service detects the DAG and shows it in the interface.

7. Airflow UI

The Airflow UI will look something like the following screenshot. Note the task id and schedule interval in the interface.

8. Let's practice!

That's all for now. Let's do some exercises.