1. Gather minimalistic incremental data
You did well on those exercises!
2. What is caching?
As we have seen in previous lessons, it's quite simple to gather data from a data stream.
We can save the data to disk once we are done with the data collection, or observation by observation.
Most of the time, the first option is not possible, since the data stream is continuous, and we cannot keep unlimited data in memory.
The second option can create a high load on the disk, especially when dealing with high-frequency data with many thousands of data points per second.
A common method to deal with this is to keep a few observations in memory and only save to disk every X observations. This method is called caching.
The cache acts as a buffer between the data stream and the disk. We can use this to avoid bottlenecks and bundle operations into batches to improve efficiency.
3. Caching
We can implement caching into our data stream as follows.
We first define our cache variable as a list.
Within the callback function, we append each new data point to this list.
We'll then check whether the length of the cache has reached the maximum cache size.
If that's the case, we open the file "data.txt" as f in append mode, write the list to the file using writelines() on the file object, and then clear the cache so it can fill up again.
Lastly, we need to connect our function to the MQTT topic, as we've also done in previous lessons.
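The steps above can be sketched as follows. This is a minimal, self-contained illustration: the maximum cache size of 5, the file location, and the simulated messages are assumptions, and in a real setup the callback would be registered on the MQTT topic as in previous lessons rather than called in a loop.

```python
import os
import tempfile

MAX_CACHE_SIZE = 5  # assumed flush threshold for illustration
DATA_FILE = os.path.join(tempfile.gettempdir(), "data.txt")

# Start with an empty file and an empty cache
open(DATA_FILE, "w").close()
cache = []

def on_message(client, userdata, message):
    """Callback: append each observation to the cache and
    flush the whole cache to disk once it is full."""
    cache.append(str(message) + "\n")
    if len(cache) >= MAX_CACHE_SIZE:
        with open(DATA_FILE, "a") as f:
            f.writelines(cache)  # write the whole batch in one disk operation
        cache.clear()  # empty the cache so it can fill up again

# Simulate 12 incoming messages instead of a live MQTT stream
for i in range(12):
    on_message(None, None, f"observation {i}")
# Two batches of 5 are now on disk; 2 observations remain in the cache
```

Note that clearing the cache after each flush is essential, since otherwise the same observations would be written to disk again on the next flush.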
4. Simplistic data streams
While collecting data from data streams, we often don't have much choice on the data format used, so we need to adapt to different message types. Until now, we've seen full JSON messages containing all information necessary for further analysis.
Yet, sometimes messages are minimalistic and only consist of a device or location and the measurement.
Since the measurements don't contain a timestamp, we need to add one from the context.
5. Observation Timestamp
We should aim for a timestamp that is as close to the moment of data acquisition as possible.
If the message payload contains a timestamp, we should prefer this because the producer is as close as possible to the measurement.
In case this is not available, we need to fall back to when the broker received the message, also called the publication timestamp. This is available as message.timestamp.
The last fallback should be to use the current timestamp of the data collection.
This can be problematic since we could have a delay of many minutes, or even hours, before consuming the message.
6. Observation Timestamp
The timestamp of the MQTTMessage can be accessed by using message.timestamp. Remember that message is the 3rd argument of the callback function header.
The time of consumption can also be added by using datetime.utcnow().
This will store the time, in UTC timezone, of when the message was consumed.
This could then be used to analyze message delays, which can be important for real-time applications.
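As a sketch of the two fallbacks described above, assuming a minimalistic payload with only a device name and a measurement. FakeMessage is a hypothetical stand-in for the MQTT message object, which exposes the payload and the timestamp attribute mentioned in the lesson.

```python
import json
from datetime import datetime

class FakeMessage:
    """Hypothetical stand-in for an MQTT message with .payload and .timestamp."""
    def __init__(self, payload, timestamp):
        self.payload = json.dumps(payload).encode()
        self.timestamp = timestamp  # set when the message was published

# A minimalistic message: no timestamp inside the payload itself
message = FakeMessage({"device": "sensor-42", "temperature": 21.7},
                      timestamp=1693526400.0)

record = json.loads(message.payload)
# Fallback 1: the publication timestamp carried on the message object
record["timestamp"] = message.timestamp
# Fallback 2: the time of consumption, in UTC, useful for delay analysis
record["consumed_at"] = datetime.utcnow()
```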
7. pd.to_datetime()
Once we have the data in pandas, we see a long number instead of a readable timestamp.
This format is called a Unix timestamp, and is defined as the time elapsed since January 1st, 1970.
Pandas understands this format, so we can use pd.to_datetime() to convert the epoch values to a datetime series.
Since our records are in milliseconds, we need to specify unit="ms" to let pandas know about this; otherwise our times would end up in the 1970s, since unit defaults to nanoseconds. The column is now of type datetime64.
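A short sketch of this conversion, using illustrative timestamp values in milliseconds:

```python
import pandas as pd

# Unix timestamps in milliseconds, as they might arrive from the stream
df = pd.DataFrame({"timestamp": [1693526400000, 1693526401000],
                   "temperature": [21.7, 21.9]})

# Without unit="ms", pandas assumes nanoseconds and the dates land in 1970
df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms")

print(df["timestamp"].dtype)  # the column is now a datetime64 type
```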
8. Let's practice!
And now, it's your turn!