A transformational Lambda
Welcome to Chapter 3!
In chapter 1, we used Firehose to collect data from multiple sources and store it.
In chapter 2, we used Lambda to act on that data.
Now, it's time to work with data as it's moving through the Firehose stream.
In chapters 1 and 2, we built a Lambda that reads data from S3 after it's been written by Firehose.
We can accomplish a lot that way, but it isn't always the best approach. When we process data after it lands in S3, we use a Lambda fired on object write; a Lambda transform instead manipulates data mid-stream. This has several consequences. With the S3 approach, processing starts a bit later than with a Lambda transform. With S3, we have to store the raw data before processing it, whereas a Lambda transform modifies the data before it's stored. Finally, a Lambda transform works with any destination of your Firehose stream, not just S3. As a data engineer, you'll have to decide which approach fits each situation, but both are equally important. In this lesson, we'll learn how to transform data as it moves through a Firehose stream, before it reaches S3.
The data we receive through the Firehose stream contains a timestamp column. This column holds only a time, but no date. Remember that VIN is our vehicle ID.
We will create a Lambda function that generates a full date/time string as the data moves through the Firehose stream.
Firehose transformational Lambdas accept data in a specific format and return it in a specific format. Each time our Lambda is invoked, it receives a list of records to process. Each record has an ID, an arrival timestamp, and the data, encoded in base64.
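As a sketch, the incoming event might look like the dictionary below. The record ID, timestamp, and payload values are invented for illustration, and the timestamp field name follows the `approximateArrivalTimestamp` key used by Firehose.

```python
import base64

# A minimal sketch of the event shape a Firehose transformational Lambda
# receives. The record ID, timestamp, and payload below are made up.
sample_event = {
    "records": [
        {
            "recordId": "49546986683135544286507457936321625675700192471156785154",
            "approximateArrivalTimestamp": 1510772160000,
            # Payloads arrive base64-encoded:
            "data": base64.b64encode(b"VIN123,08:15:00").decode("utf-8"),
        }
    ]
}
```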
Base64 converts any kind of data (images, strings, emoji) into letters and numbers, because letters and numbers are easy to transfer over the web. In firehose_put_record(), boto3 takes care of the encoding for us, but in a transformational Lambda, we have to do it ourselves.
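A quick round trip with Python's standard base64 module shows the pattern we'll use throughout (the payload string is illustrative):

```python
import base64

payload = "VIN123,08:15:00"                  # an illustrative payload string
payload_bytes = payload.encode("utf-8")      # base64 operates on bytes
encoded = base64.b64encode(payload_bytes)    # -> b'VklOMTIzLDA4OjE1OjAw'
decoded = base64.b64decode(encoded).decode("utf-8")  # back to the original
```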
To process the records, we import the base64 and datetime modules. We define a timestamp_converter() function to convert a time to a date/time string: we get today's date as a string, combine the time from the record with today's date, convert it to a datetime object, and return it as a formatted date/time string.
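The converter described above might be sketched like this; the exact format strings are assumptions, not necessarily the course's:

```python
from datetime import datetime

def timestamp_converter(time_str):
    """Combine a time-only string such as '08:15:00' with today's date
    and return a full date/time string (formats are assumptions)."""
    today = datetime.now().strftime("%Y-%m-%d")
    combined = datetime.strptime(f"{today} {time_str}", "%Y-%m-%d %H:%M:%S")
    return combined.strftime("%Y-%m-%d %H:%M:%S")
```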
In our lambda_handler(), we create a list to hold the processed records. For each record, we decode the data payload using b64decode(), which gives us our data as bytes. We use the bytes object's decode() method to get a regular string. Next, we split the payload string into a list of elements and use the timestamp_converter() function we wrote earlier to convert the timestamp, the second element, into a full date/time string.
Next, we convert our payload list back to a string, encode it back to bytes, and then to base64. We append a dictionary to our output list. The dictionary contains a recordId key, the original ID of the record from the stream; a result key, set to "Ok"; and a data key, with our base64-encoded modified payload. Once we have processed all incoming records, we return a dictionary with a "records" key containing the list of modified payloads.
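Putting the pieces together, a minimal handler might look like the sketch below. The comma-separated field layout (time in the second field) and the stand-in converter are simplifying assumptions.

```python
import base64
from datetime import date

def timestamp_converter(time_str):
    # Minimal stand-in converter: prefix today's date onto the time string.
    return f"{date.today().isoformat()} {time_str}"

def lambda_handler(event, context):
    output = []
    for record in event["records"]:
        # base64 -> bytes -> str
        payload = base64.b64decode(record["data"]).decode("utf-8")
        fields = payload.split(",")
        fields[1] = timestamp_converter(fields[1])  # time -> full date/time
        new_payload = ",".join(fields)
        output.append({
            "recordId": record["recordId"],  # must echo the incoming ID
            "result": "Ok",
            "data": base64.b64encode(new_payload.encode("utf-8")).decode("utf-8"),
        })
    return {"records": output}
```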
The key components to remember are: iterate over the records, decode each payload, modify it, re-encode it, put it into a dictionary with recordId, result, and data keys, and return a dictionary containing the results under the "records" key.
Now we have the code and the test event. Let's create the Lambda in AWS!
First, we head over to the AWS console, then Lambda, and create a new function. Let's call it timeStampTransformer. Next, we add a test event that contains two records; we can use the kinesis-firehose template. Then, we add in the code for the Lambda function. We also need to raise the timeout to over one minute, which is a requirement for Firehose transformational Lambdas. Now, we test the function. All good! Next, let's make a few tweaks to our stream. First, we have to give the Firehose role permission to execute our Lambda. We do this by granting it the AWSLambdaFullAccess policy. Then, under "Transform source records", we select our Lambda function, and we're good to go! Now we can start writing to the Firehose stream. When we navigate to the incoming data in S3, we see that Firehose has written our records with the full date/time string!
Let's practice!