1. Introduction to Data Streams
Welcome back.
In this lesson, we'll learn how to subscribe to data streams using MQTT.
2. What is a Data Stream
A data stream is a continuous flow of data that keeps arriving over time.
Examples of data streams include, but are not limited to, Twitter messages, online news articles, video streams such as this video, sensor data from IoT devices, and financial market orders.
3. What is a Data Stream
This course will focus on sensor data from IoT devices.
4. MQTT
We've mentioned MQTT a couple of times already. But what is it exactly? MQTT, or Message Queuing Telemetry Transport, is a publish/subscribe messaging protocol designed to have a small footprint, which makes it well suited to IoT use cases.
An MQTT system consists of clients connecting to a server, also called a Broker.
These clients can publish (or produce) data to a topic, and subscribe to one or more topics to consume data from them.
A topic is a string that the Broker uses to filter messages for each connected client.
Multiple clients can produce data to, or consume data from, the same topic.
During this course, we'll only act as a client subscribing to predefined topics, assuming a producer is available and producing data.
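To make the roles of broker, topic, and client more concrete, here is a toy sketch in plain Python. It is not how a real MQTT broker is implemented and it does no networking: the class ToyBroker, the second topic "datacamp/energy", and the payload value are made up for illustration, and topics are matched by exact string only, whereas real MQTT also supports wildcard subscriptions.

```python
class ToyBroker:
    """In-memory stand-in for an MQTT broker (illustration only, no networking)."""

    def __init__(self):
        # Maps each topic string to the callbacks of the clients subscribed to it.
        self._subscribers = {}

    def subscribe(self, topic, callback):
        """Register a client callback for one topic."""
        self._subscribers.setdefault(topic, []).append(callback)

    def publish(self, topic, payload):
        """Deliver the payload only to clients subscribed to this exact topic."""
        for callback in self._subscribers.get(topic, []):
            callback(topic, payload)


broker = ToyBroker()
received = []

# Two clients consume the same topic; a third listens on a different
# (hypothetical) topic and should not see the message.
broker.subscribe("datacamp/roomtemp", lambda t, p: received.append(("client A", t, p)))
broker.subscribe("datacamp/roomtemp", lambda t, p: received.append(("client B", t, p)))
broker.subscribe("datacamp/energy", lambda t, p: received.append(("client C", t, p)))

# A producer publishes one made-up reading.
broker.publish("datacamp/roomtemp", "21.5")
print(received)
```

Only clients A and B receive the message, because the topic string is what the broker uses to decide who gets what.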
5. Python library
We'll be using the Eclipse Paho MQTT Python client to connect to MQTT Brokers.
The Python modules are available in the package paho.mqtt.
More information and the documentation for the library are available on the project's GitHub page.
6. Single message
In the simplest case, we only want one message from the Broker.
This could be the case for a webpage showing the latest value available.
We can use subscribe.simple() for this. It takes the topic as the first argument and the broker's hostname as the keyword argument `hostname`, and returns a message object.
We can then print the message topic, as well as the message payload.
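A minimal sketch of the single-message case could look like this. The helper names fetch_latest and describe are our own, and the hostname "mqtt.example.com" in the commented call is a placeholder, not a real broker. The library is imported inside the function so that the paho-mqtt package is only needed once a message is actually fetched.

```python
def fetch_latest(topic, hostname):
    """Block until one message arrives on `topic`, then return it."""
    # Requires the paho-mqtt package to be installed.
    import paho.mqtt.subscribe as subscribe

    # hostname is passed by keyword: the second positional
    # parameter of subscribe.simple() is qos, not the hostname.
    return subscribe.simple(topic, hostname=hostname)


def describe(message):
    """Format a message as 'topic: payload' (the payload is a bytes object)."""
    return f"{message.topic}: {message.payload}"


# Example call (needs network access and a reachable broker;
# the hostname below is a placeholder):
# msg = fetch_latest("datacamp/roomtemp", "mqtt.example.com")
# print(describe(msg))
```

The call blocks until the broker delivers one message, so it suits one-off lookups rather than continuous monitoring.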
More often, we'll want to receive a constant stream of data.
7. Callback
We're going to use a concept called a callback to subscribe to a topic. This is an efficient way to wait for things to happen on the other end: instead of repeatedly checking for new data, our program is notified when a message arrives.
We do this by defining a callback function, which will be called whenever a new message is available.
We are free to select any function name. A recommended name is "on_message", and we'll stick to that.
The header for a function used as the callback is defined by the paho.mqtt library and needs to have the following arguments:
client, the client instance for this callback;
userdata, private user data set when creating the instance;
message, an instance of MQTTMessage, which is a class with the properties topic, payload, qos, and retain.
All arguments are mandatory even if we don't use them, since the library calls the function with exactly these arguments.
Within the function, we'll print the topic of the message and the message payload, separated by a colon.
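The callback described above can be sketched as follows. To try it without a broker, we call it by hand with a stand-in object; the SimpleNamespace object and its values are made up for illustration and only mimic the properties of a real MQTTMessage.

```python
from types import SimpleNamespace


def on_message(client, userdata, message):
    """Callback invoked once for every message received."""
    # client and userdata are unused here, but must stay in the
    # signature because the library passes all three arguments.
    print(f"{message.topic}: {message.payload}")


# Local dry run with a stand-in for MQTTMessage (made-up values).
fake = SimpleNamespace(topic="datacamp/roomtemp", payload=b"21.5", qos=0, retain=False)
on_message(None, None, fake)
```

Note that the payload is a bytes object, so it prints with a leading b unless we decode it first.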
8. Callback
We now need to connect our function to an MQTT topic.
We'll import the module "paho.mqtt.subscribe" and use subscribe.callback to connect the function to the topic, with
the function to be called as the first argument,
the topic to subscribe to ("datacamp/roomtemp") as the second argument,
and the server or broker as the keyword argument `hostname`.
The library will then connect to the MQTT broker, and call our function each time a new message is available.
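Putting the pieces together might look like the sketch below. The wrapper function listen is our own name, and the hostname "mqtt.example.com" in the commented call is a placeholder that has to be replaced with a reachable broker. As before, the library is imported inside the function so that paho-mqtt is only required once we actually connect.

```python
def on_message(client, userdata, message):
    """Print the topic and payload of every message received."""
    print(f"{message.topic}: {message.payload}")


def listen(topic, hostname):
    """Subscribe to `topic` and hand every incoming message to on_message."""
    # Requires the paho-mqtt package to be installed.
    import paho.mqtt.subscribe as subscribe

    # This call blocks: the library keeps the connection open and
    # calls on_message each time a new message is available.
    subscribe.callback(on_message, topic, hostname=hostname)


# Example call (needs a reachable broker; the hostname is a placeholder,
# and the program runs until interrupted, for example with Ctrl+C):
# listen("datacamp/roomtemp", "mqtt.example.com")
```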
9. MQTT Subscribe
Let's have a final look at the complete code.
First, we import paho.mqtt.subscribe as subscribe.
Next, we define our function on_message. We then call subscribe.callback with this function as the first argument, the topic name, "datacamp/roomtemp", as the second argument, and the hostname of the MQTT server as the keyword argument "hostname".
Once we run our program, we'll see the following output.
The topic is on the left of the colon and the message payload is on the right. In this case, the payload is a JSON object containing the time, temperature, and humidity from an environmental sensor.
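Since the payload arrives as bytes holding JSON text, we could turn it into a Python dictionary with the standard json module. This is only a sketch: the helper parse_reading is our own, and the sample payload with its key names and values is made up, since the real keys depend on the sensor.

```python
import json


def parse_reading(payload):
    """Decode a bytes payload holding a JSON object into a dict."""
    return json.loads(payload.decode("utf-8"))


# Made-up sample payload; real key names depend on the sensor.
sample = b'{"time": "2024-01-01T12:00:00", "temperature": 21.5, "humidity": 40}'
reading = parse_reading(sample)
print(reading["temperature"], reading["humidity"])
```

Inside a callback, the same helper could be applied to message.payload before printing or storing the values.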
10. Let's practice!
And now, let's try this out. Enjoy!