
Airflow Sensors

1. Airflow Sensors

Welcome back! Let's look at a special kind of operator called a sensor.

2. Sensors

A sensor is a type of operator that waits for a certain condition to be true. Examples include waiting for the creation of a file, the upload of a database record, or a specific web response. You can configure how often a sensor checks for its condition. Since sensors are a type of operator, they define tasks just like other operators do. This means we can apply the bitshift dependencies to them as well.

3. Sensor details

All sensors are derived from the airflow.sdk.BaseSensorOperator class. Some arguments are available to all sensors, including mode, poke_interval, and timeout. The mode tells the sensor how to wait between checks and has two options, poke or reschedule. The default is poke, which keeps checking until the condition is met without giving up its worker slot. Reschedule means the sensor gives up the worker slot between checks and waits for a slot to become available for the next one. A worker slot is the capacity to run one task. The poke_interval defines how often, in seconds, to check for the condition, in either mode. It should be at least 1 minute to keep from overloading the Airflow scheduler. The timeout (in seconds) defines how long the sensor waits before the task is marked as failed. Make sure the timeout is significantly shorter than the schedule interval. As sensors are operators, they also accept the normal operator arguments, such as task_id.
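To make the interplay of poke_interval and timeout concrete, here is a small pure-Python model of a sensor in poke mode. It is a conceptual sketch only, not Airflow's implementation: the names wait_for, SensorTimeout, and FakeClock are hypothetical, and the fake clock is there so the demo runs instantly instead of really sleeping.

```python
import time
from typing import Callable


class SensorTimeout(Exception):
    """Raised when the condition is still false once `timeout` seconds have passed."""


def wait_for(
    condition: Callable[[], bool],
    poke_interval: float = 60.0,
    timeout: float = 3600.0,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Check `condition` every `poke_interval` seconds and return the number of checks made."""
    started = clock()
    pokes = 0
    while True:
        pokes += 1
        if condition():
            return pokes
        if clock() - started >= timeout:
            raise SensorTimeout(f"condition still false after {timeout} seconds")
        # In poke mode this wait happens while the task keeps its worker slot.
        sleep(poke_interval)


class FakeClock:
    """Stand-in clock: sleeping just moves the current time forward."""

    def __init__(self) -> None:
        self.now = 0.0

    def time(self) -> float:
        return self.now

    def sleep(self, seconds: float) -> None:
        self.now += seconds


# Demo: the condition turns true on the third check.
demo_clock = FakeClock()
answers = iter([False, False, True])
pokes = wait_for(
    condition=lambda: next(answers),
    poke_interval=300,
    timeout=3000,
    clock=demo_clock.time,
    sleep=demo_clock.sleep,
)
print(pokes, demo_clock.now)  # 3 checks, 600.0 simulated seconds elapsed
```

In reschedule mode the loop would look different: instead of sleeping inside the task, the sensor would end its current run and be started again later, which frees the worker slot in between.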

4. File sensor

A useful sensor is the FileSensor, found in the airflow.providers.standard.sensors.filesystem module. It checks whether a file or directory exists at a certain location in the file system. Here we import the FileSensor class and define a task called file_sensor_task. Note that FileSensor has no decorator form. We must create a new FileSensor object and assign it to a variable name. We can then refer to it like we would other tasks. We set the task_id, then set the filepath argument to salesdata.csv, so the sensor waits for this file to exist before continuing. We set the poke_interval to 300 seconds, so the check runs every 5 minutes until it succeeds. We also add a timeout argument to fail the task if the file has not appeared within 50 minutes. Finally, we use the bitshift syntax to define the sensor's dependencies within our Dag. In this case, we must run init_sales_cleanup, wait for the file_sensor_task to finish, then run generate_report.

5. Other sensors

There are many sensors available within Airflow. The ExternalTaskSensor waits for a task in a separate Dag to complete, which makes it simple to chain workflows together. The HttpSensor requests a web URL and waits for specific content in the response. The SqlSensor runs a SQL query repeatedly until the result meets a condition.

6. When to use sensors?

You may be wondering when to use a sensor instead of an operator or a task. First, use a sensor if you're uncertain when a condition will become true. If something will complete that day but the time can vary, a sensor can keep checking for it. Second, sensors provide more flexibility. Instead of failing an entire Dag immediately, a sensor can continue to check for a condition, which gives you more control over how your Dag behaves. Third, sensors are a good choice if you want to run a check repeatedly without adding cycles to your Dag structure.

7. Let's practice!

We've learned a lot about sensors. Let's practice!
