Introduction to Data streams

1. Introduction to Data streams

Welcome back. In this lesson, we'll learn how to subscribe to data streams using MQTT.

2. What is a Data Stream

Data streams are continuous flows of data. Examples include, but are not limited to, Twitter messages, online news articles, video streams such as this video, sensor data from IoT devices, and financial market orders.

3. What is a Data Stream

This course will focus on sensor data from IoT devices.

4. MQTT

We've mentioned MQTT a couple of times already. But what is it exactly? MQTT, or Message Queuing Telemetry Transport, is a publish/subscribe messaging protocol designed to have a small footprint, which makes it ideal for IoT use cases. An MQTT system consists of clients connecting to a server, also called a Broker. These clients can publish (or produce) data to a topic, and subscribe to (or consume) data from one or multiple topics. A topic is a string that the Broker uses to filter messages for each connected client. Multiple clients can publish to, or subscribe to, the same topic. During this course, we'll only act as a client subscribing to predefined topics, assuming a producer is available and producing data.
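To make the publish/subscribe idea concrete, here is a minimal in-memory sketch. It is a toy model, not the MQTT protocol and not part of any library: the class ToyBroker and the topic "datacamp/outdoor" are made up for illustration, and it only matches topics exactly, whereas real MQTT brokers also support wildcards.

```python
from collections import defaultdict


class ToyBroker:
    """In-memory stand-in for a broker: routes each payload to the subscribers of its topic."""

    def __init__(self):
        # Maps a topic string to the list of callbacks subscribed to it.
        self._subscribers = defaultdict(list)

    def subscribe(self, topic, callback):
        """Register a callback that consumes messages from the given topic."""
        self._subscribers[topic].append(callback)

    def publish(self, topic, payload):
        """Deliver the payload only to callbacks subscribed to this exact topic."""
        for callback in self._subscribers.get(topic, []):
            callback(topic, payload)


broker = ToyBroker()
received = []

# One client consumes "datacamp/roomtemp"; nobody consumes "datacamp/outdoor".
broker.subscribe("datacamp/roomtemp", lambda topic, payload: received.append((topic, payload)))
broker.publish("datacamp/roomtemp", "21.5")
broker.publish("datacamp/outdoor", "7.0")  # hypothetical topic, filtered out

print(received)
```

Only the message published to the subscribed topic reaches the consumer, which is the filtering role the Broker plays.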

5. Python library

We'll be using the Eclipse Paho MQTT Python client to connect to MQTT Brokers. The Python modules are available in the package paho.mqtt. More information and the documentation for the library are available on the project's GitHub page.

6. Single message

In the simplest case, we only want one message from the Broker. This could be the case for a webpage showing the latest value available. We can use subscribe.simple() for this. It takes the topic as the first argument and the broker's hostname as the keyword argument hostname, and it returns a message object. We can then print the message topic, as well as the message payload. More often, though, we'll want to receive a constant stream of data.
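A minimal sketch of the single-message case might look like the following. The helper names format_message and fetch_latest, and the hostname "mqtt.example.com", are assumptions made for illustration; the topic is the one used later in this lesson. The payload is assumed to be UTF-8 text, since the library delivers it as bytes.

```python
def format_message(msg):
    """Return 'topic: payload' for a message; the payload arrives as bytes."""
    return f"{msg.topic}: {msg.payload.decode('utf-8')}"


def fetch_latest(topic, hostname):
    """Block until one message arrives on the topic, then return it formatted."""
    # Imported here so format_message can be tried without the library installed.
    import paho.mqtt.subscribe as subscribe

    # hostname is passed as a keyword argument.
    msg = subscribe.simple(topic, hostname=hostname)
    return format_message(msg)


# Example call (needs a reachable broker; the hostname is a placeholder):
# print(fetch_latest("datacamp/roomtemp", "mqtt.example.com"))
```

The call is left commented out because it blocks until a broker actually delivers a message.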

7. Callback

We're going to use a concept named callback to subscribe to a topic. This is a very efficient way to wait for things to happen on the other end. We do this by defining a callback function, which will be called whenever a new message is available. We are free to select any function name. A recommended name is "on_message", and we'll stick to that. The header for a function used as the callback is defined by the paho.mqtt library and needs to have three arguments: client (the client instance for this callback), userdata (private user data set when creating the instance), and message (an instance of MQTTMessage, which is a class with the properties topic, payload, qos, and retain). All arguments are mandatory even if we don't use them, since the library calls the function with exactly these arguments. Within the function, we'll print the topic of the message and the message payload, separated by a colon.
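As a rough sketch, such a callback could be written as below. To try it without a broker, the example calls it by hand with a stand-in object; the SimpleNamespace message and its payload value are made up for illustration and are not what the library passes in (that would be an MQTTMessage). The payload is assumed to be UTF-8 text.

```python
from types import SimpleNamespace


def on_message(client, userdata, message):
    """Print the topic and the payload of a message, separated by a colon."""
    # client and userdata are unused here but must stay in the header.
    print(f"{message.topic}: {message.payload.decode('utf-8')}")


# Stand-in for a real message, only for trying the function locally.
fake_message = SimpleNamespace(topic="datacamp/roomtemp", payload=b"21.5", qos=0, retain=False)

# The library would make this call for us, with exactly these three arguments.
on_message(None, None, fake_message)
```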

8. Callback

We now need to connect our function to an MQTT topic. We'll import the module "paho.mqtt.subscribe" and use subscribe.callback to connect the function to the topic. It takes the function to be called as the first argument, the topic to subscribe to ("datacamp/roomtemp") as the second argument, and the server or broker as the keyword argument `hostname`. The library will then connect to the MQTT broker and call our function each time a new message is available.
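Put together, the wiring could be sketched like this. The wrapper function listen and the hostname "mqtt.example.com" are assumptions added for illustration; the topic and the subscribe.callback call follow the description above. Note that subscribe.callback blocks while it waits for messages, which is why the call is left commented out.

```python
def on_message(client, userdata, message):
    """Print the topic and the payload of a message, separated by a colon."""
    print(f"{message.topic}: {message.payload.decode('utf-8')}")


def listen(hostname):
    """Subscribe to the room temperature topic and print every message received."""
    # Imported here so on_message can be tried without the library installed.
    import paho.mqtt.subscribe as subscribe

    # Blocks, calling on_message each time a new message is available.
    subscribe.callback(on_message, "datacamp/roomtemp", hostname=hostname)


# Example call (needs a reachable broker; the hostname is a placeholder):
# listen("mqtt.example.com")
```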

9. MQTT Subscribe

Let's have a final look at the complete code. First, we import paho.mqtt.subscribe as subscribe. Next, we define our function on_message. We then pass this function as the first argument to subscribe.callback, the topic name, "datacamp/roomtemp", as the second argument, and the hostname of the MQTT server as the keyword argument "hostname". Once we run our program, we'll see the following output: the topic on the left of the colon, and the message payload on the right, which, in this case, is a JSON object containing the time, temperature and humidity from an environmental sensor.
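Since the payload is a JSON object, a natural next step is to turn it into a Python dictionary. The sketch below assumes the payload arrives as UTF-8 encoded bytes; the helper name parse_reading, the field names, and the sample values are all made up for illustration and may differ from what the actual sensor sends.

```python
import json


def parse_reading(payload):
    """Decode a bytes payload holding a JSON object into a dictionary."""
    return json.loads(payload.decode("utf-8"))


# Made-up sample payload; the real field names and values may differ.
sample_payload = b'{"timestamp": "2018-11-30T18:12:00Z", "temperature": 21.5, "humidity": 48}'

reading = parse_reading(sample_payload)
print(reading["temperature"], reading["humidity"])
```

Inside on_message, the same helper could be applied to message.payload before working with the individual values.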

10. Let's practice!

And now, let's try this out. Enjoy!