Get startedGet started for free

Airflow

1. Airflow

Once we have a clear pipeline design, the next step is building the pipeline.

2. Airflow pipeline

We'll use Apache Airflow to illustrate this process. The principles should be applicable to other orchestration tools. Apache Airflow, or Airflow, is a Pythonic framework for orchestrating workflows and tasks. It is commonly used for data automation and ETL tasks.

3. Build supporting functions

Here are the steps for setting up the pipeline. We start by building supporting functions using two types of modules. The first type includes supporting modules: one for the ETL process, another for the forecast process, and a third for general supporting functions. The second type is the abstract layer, containing the DAG callable functions. For example, the following function represents the pipeline's first task - checking if new data is available on the API.

4. Setting the pipeline

Next, we will set the DAG locally and test its functionality. Once it is ready, we will deploy it into production.

5. Setting the DAG

Here is our DAG for the data and forecast refresh.

6. Testing the DAG

We test the DAG and can see a report within the Airflow UI.

7. Building a prototype

Here are some best practices for building an Airflow pipeline. Use a prototype to develop the pipeline components. I recommend setting up a prototype inside a notebook, as it's easier and faster to test and debug pipeline components than inside a DAG.

8. Set the DAG structure

Once the pipeline components are ready, set the DAG skeleton,

9. Add the components

and gradually onboard the DAG's different components. This enables us to test the pipeline functionality.

10. Simulate scenario

Stress test the pipeline functionality by simulating different scenarios. I recommend setting up a script with those tests so that you can seamlessly run them whenever you make changes in the code.

11. Clear messages

Last but not least, it is helpful to set print messages with some context during the function runtime. This simplifies reading logs when debugging is needed.

12. Airflow operators

Airflow's DAG is constructed using operators. Various operators exist for different use cases, such as the Docker operator for executing tasks within containers and the Bash operator for executing Bash scripts.

13. Airflow PythonOperator

We will use the Python operator

14. Airflow BranchPythonOperator

and the branch Python operator. As the name implies, both operators enable the execution of Python code.

15. Airflow BranchPythonOperator

The branch operator enables the dynamic selection of different paths based on the task output.

16. Importing modules

We will start by importing the Airflow modules, the pipeline supporting modules, and other supporting libraries.

17. Setting DAG default arguments

Next, we will define the DAG default arguments,

18. Setting the DAG

and then define the dag using the `DAG` function. We use the Python operator to define the first task, which checks if new data is available on the API. We set the task ID as check_api, define the Python function that checks the API metadata, and set the function input arguments.

19. Setting the DAG

Similarly, we will add the remaining pipeline components, including `check_status`, `data_refresh`, and `no_updates`.

20. Setting the DAG

Lastly, we define the order of execution of the DAG's tasks using the greater-than operation and the task IDs. Note that we use the square brackets to define the paths from a branch operator.

21. Setting the DAG

We can use the Airflow UI to review the DAG's runtime results and logs.

22. Let's practice!

Now it's your turn to set up a DAG for your data and forecast pipelines!

Create Your Free Account

or

By continuing, you accept our Terms of Use, our Privacy Policy and that your data is stored in the USA.