
Data-aware scheduling with Assets

1. Data-aware scheduling with Assets

Cron schedules are great when we know exactly when data arrives. But what if the data doesn't arrive on schedule?

2. The timing problem

Picture two pipelines. The first, Pipeline A, loads raw data from an API. The second, Pipeline B, builds a dashboard from that data. As shown in the diagram, if both run on cron, Pipeline B can start before Pipeline A finishes writing the data, so the dashboard is built from stale or incomplete data. We could add a buffer and schedule the consumer an hour later, but that's fragile. If the producer runs slow one day, the buffer isn't enough. If it runs fast, the fresh data sits unused until the buffer expires. What we really want is for the consumer to run when the data is actually available.
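To make the buffer trade-off concrete, here is a small standard-library sketch. It is not Airflow code. The 02:00 start time, the one-hour buffer, and the function name fixed_buffer_outcome are all assumptions chosen for illustration.

```python
from datetime import datetime, timedelta

# Assumed schedule: producer starts at 02:00, consumer one hour later.
PRODUCER_START = datetime(2025, 1, 1, 2, 0)
CONSUMER_START = PRODUCER_START + timedelta(hours=1)


def fixed_buffer_outcome(producer_minutes: int) -> str:
    """Describe what the consumer sees for a given producer run time."""
    finished = PRODUCER_START + timedelta(minutes=producer_minutes)
    if finished > CONSUMER_START:
        # The consumer has already started when the write completes.
        return "stale: consumer started before the data was written"
    idle_minutes = int((CONSUMER_START - finished).total_seconds() // 60)
    return f"ok, but data sat idle for {idle_minutes} min"


for minutes in (20, 60, 90):
    print(minutes, fixed_buffer_outcome(minutes))
```

A fast 20-minute run leaves the data idle for 40 minutes, while a slow 90-minute run means the consumer has already read stale data. No fixed buffer handles both cases.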

3. What is an Asset?

Assets solve this. An Asset is a reference to a piece of data, identified by a unique name. We import Asset from airflow.sdk and create one by passing that name as a string. We can optionally attach a URI when the asset represents a concrete data entity, such as an S3 file or a database table. The key idea is that an Asset is a lightweight reference: Airflow does not read or write the data itself. It only tracks when the asset was last updated, along with other metadata.

4. Producers and consumers

Here is how assets connect producers and consumers. When a task completes and updates an asset, Airflow records an asset event. Other Dags can set their schedule to one or more assets. When the asset event arrives, Airflow automatically triggers those consumer Dags.
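The event flow can be pictured with a toy model in plain Python. This is only an illustration of the idea, not how Airflow implements it. The class ToyScheduler and its methods are invented for this sketch.

```python
from collections import defaultdict
from datetime import datetime, timezone


class ToyScheduler:
    """Toy stand-in for the scheduler's asset bookkeeping."""

    def __init__(self):
        self.consumers = defaultdict(list)  # asset name -> consumer Dag ids
        self.events = []                    # recorded (asset name, timestamp) pairs
        self.triggered = []                 # Dag ids triggered so far

    def schedule_on(self, dag_id: str, asset_name: str) -> None:
        # A consumer Dag declares that it depends on an asset.
        self.consumers[asset_name].append(dag_id)

    def task_succeeded(self, outlets: list[str]) -> None:
        # A producer task finished: record one event per outlet asset
        # and trigger every Dag scheduled on that asset.
        for asset_name in outlets:
            self.events.append((asset_name, datetime.now(timezone.utc)))
            self.triggered.extend(self.consumers[asset_name])


scheduler = ToyScheduler()
scheduler.schedule_on("dashboard", "sales_data")
scheduler.task_succeeded(outlets=["sales_data"])
print(scheduler.triggered)
```

The producer side only reports which assets it updated. The mapping from asset to consumers lives entirely in the scheduler.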

5. Producer: signaling with outlets

On the producer side, we start with the asset we defined earlier. Then we create a task and add the asset to its outlets list. The outlets parameter tells Airflow: this task produces this data. When write_sales completes successfully, Airflow records an asset event, signaling that the sales data is fresh. The producer does not need to know about any consumer. It just declares what it produces, and Airflow handles the rest.

6. Consumer: scheduling on an asset

On the consumer side, we define the same asset with the same URI. Then, instead of a cron expression, we pass the asset reference in a list to the Dag's schedule parameter. This tells Airflow: trigger this Dag automatically whenever the sales data asset is updated. There is no cron and no polling, just a direct data dependency. When the producer task completes and the asset event fires, Airflow starts the consumer Dag.

7. Conditional scheduling

We can also combine multiple assets with conditional operators. First, we define two assets: sales and inventory. The full_report Dag uses the AND operator, so it waits until both sales and inventory have been updated before triggering. Only when both asset events arrive does the Dag run. The quick_refresh Dag uses the OR operator. It triggers when any one of the assets updates. So if sales data arrives but inventory hasn't changed yet, the Dag runs anyway. This gives us fine-grained control over multi-source pipelines.

8. Verifying assets with the CLI

After running our producer Dag, we can verify the asset was registered using the Airflow CLI. The "airflow assets list" command lists all registered assets, including their names and URIs. To check when an asset was last updated, we use "airflow assets details" with the asset name. This shows metadata including the updated_at timestamp. You can also see your assets and their update events in the Airflow UI under the Assets tab. You'll use both the CLI and the UI in the upcoming exercises.

9. Let's practice!

Let's connect some pipelines with Assets.
