1. Serverless data workflow
Whoo - welcome to Lesson 3!
2. A look back
In the previous lesson, we used streaming data from vehicles to notify Cody of speeders using SMS.
Along with texts, Cody wants a daily report of speeders as well.
3. Current partitioning
Firehose writes data to S3, partitioned by year, then month, then day, then hour (for example, 2021/06/15/08/).
4. Filtering out speeders
As each record comes in, we will read the data, filter out speeders, and write the resulting dataframe to a separate folder.
5. Aggregating by day
Next, we will write another Lambda function that runs at the end of the day. It will read all the speeder records and aggregate them into one object per day.
Cody will download that file as a report.
6. Open recordReaderS3
Open the Lambda code editor to recordReaderS3.
7. Editing recordReaderS3
Import the datetime and pytz modules. We will use pytz to define Cody's timezone. Dealing with time is - unfortunately - very important in data engineering.
8. Editing recordReaderS3
In the record_created_handler() function, filter for the Vehicle Numbers that exceeded the speed threshold.
Use the time, generated in California's time zone, as part of the object key in S3, and write the object.
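Here is a minimal sketch of what that handler might look like; the bucket layout, speed threshold, and column name are assumptions for illustration:

```python
import urllib.parse
from datetime import datetime

import boto3
import pandas as pd
import pytz

s3 = boto3.client('s3')
CA_TZ = pytz.timezone('America/Los_Angeles')
SPEED_LIMIT = 65  # assumed threshold, in mph

def record_created_handler(event, context):
    # The S3 event tells us which object was just written
    record = event['Records'][0]['s3']
    bucket = record['bucket']['name']
    key = urllib.parse.unquote_plus(record['object']['key'])

    # Read the new object into a DataFrame
    body = s3.get_object(Bucket=bucket, Key=key)['Body']
    df = pd.read_csv(body)

    # Keep only the records that exceeded the speed threshold
    speeders = df[df['speed'] > SPEED_LIMIT]  # assumed column name

    if not speeders.empty:
        # Name the object with California time so daily aggregation is easy
        now = datetime.now(CA_TZ)
        out_key = f'speeders/{now:%Y-%m-%d}/{now:%H%M%S%f}.csv'
        s3.put_object(Bucket=bucket, Key=out_key,
                      Body=speeders.to_csv(index=False).encode('utf-8'))
```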
9. Writing speeders by date and time
As vehicles send data, speeders will get filtered out into a CSV in the speeders folder.
10. speederAggregator
recordReaderS3 executes frequently - on every write to S3. It needs to be light to prevent failures.
Reading a day's worth of files and aggregating them isn't light, so we will make another function for this that runs at the end of the day - speederAggregator.
11. Create speederAggregator
Create a new Lambda function called speederAggregator.
12. Add AWS data wrangler layer
Add the AWS Data Wrangler layer, like we did in the previous video.
13. speederAggregator Resources
This Lambda will read multiple files, store them in memory, aggregate them, and write the result to S3. It will take longer to execute and will need more RAM.
In the function configuration, increase the memory to 512 MB and execution timeout to 15 seconds.
Remember to be careful with these - Lambda bills by memory and execution time.
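If you'd rather script this than click through the console, a sketch of the same configuration change with boto3 (values taken from this lesson):

```python
import boto3

lambda_client = boto3.client('lambda')

# Lambda bills by memory and execution time, so keep these modest
lambda_client.update_function_configuration(
    FunctionName='speederAggregator',
    MemorySize=512,  # MB
    Timeout=15,      # seconds
)
```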
14. Create the timed trigger
We want this function to run once at the end of the day.
Add a Trigger and select EventBridge.
Create a new rule, dailyMidnight, and use a schedule expression.
We want the job to run at 6:50 AM UTC - 11:50 PM in California. We specify this using a cron expression.
Until the function is ready, disable the trigger, and click Add!
15. Cron expressions
Cron is a process that runs on most systems, and is very useful for data engineering.
Cron expressions specify how often that process executes code.
Cron expressions in AWS follow this format.
Our expression specifies a run on minute 50, of the 6th hour, of every day, of every month, on weekdays and weekends, in every year.
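As a sketch, here is that same rule expressed with boto3 (the console does this under the hood; the rule name and state match this lesson):

```python
import boto3

events = boto3.client('events')

# Fields: minute, hour (UTC), day-of-month, month, day-of-week, year
events.put_rule(
    Name='dailyMidnight',
    ScheduleExpression='cron(50 6 * * ? *)',
    State='DISABLED',  # keep disabled until the function is ready
)
```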
16. speederAggregator callback
Now for the code. Our objects are named with the California timezone. Let's continue using it as we get today's date.
17. speederAggregator callback
We use that as the prefix for list_objects_v2(). Then, as usual, we loop through the response, aggregating the CSVs into a single dataframe.
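A sketch of that verbose pattern, with the bucket name assumed for illustration:

```python
from datetime import datetime

import boto3
import pandas as pd
import pytz

s3 = boto3.client('s3')
BUCKET = 'sd-vehicle-data'  # assumed bucket name

# Today's date in California time, matching how the objects were named
today = datetime.now(pytz.timezone('America/Los_Angeles')).strftime('%Y-%m-%d')

# List every speeder object written today and stitch them together
response = s3.list_objects_v2(Bucket=BUCKET, Prefix=f'speeders/{today}')
frames = []
for obj in response.get('Contents', []):
    body = s3.get_object(Bucket=BUCKET, Key=obj['Key'])['Body']
    frames.append(pd.read_csv(body))
df = pd.concat(frames)
```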
18. awswrangler package
You've seen me load multiple objects from S3 several times by now; it's very verbose.
The AWS wrangler layer we added has a module to make this easier. Let's use it to load the CSVs for the relevant date.
We import the wrangler module as wr.
We create a boto3 session using our AWS key, secret, and region.
We call the wr.s3.read_csv() function, passing a prefix that includes the filter date, the boto session and the delimiter.
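Roughly, under the same assumptions about the bucket and key layout as above (the credential variables and region are placeholders):

```python
import os
from datetime import datetime

import awswrangler as wr
import boto3
import pytz

session = boto3.Session(
    aws_access_key_id=os.environ['AWS_KEY_ID'],      # our credentials
    aws_secret_access_key=os.environ['AWS_SECRET'],
    region_name='us-west-1',                         # assumed region
)

today = datetime.now(pytz.timezone('America/Los_Angeles')).strftime('%Y-%m-%d')

# One call reads every CSV under the prefix into a single DataFrame
df = wr.s3.read_csv(
    path=f's3://sd-vehicle-data/speeders/{today}',   # assumed bucket name
    boto3_session=session,
    sep=',',
)
```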
19. Old way vs awswrangler
Compared to the old way with all the looping,
20. Old way vs awswrangler
awswrangler saves us quite a few lines of code!
21. Writing aggregate speeders file
Writing CSVs to S3 is easier with wrangler as well, using the to_csv() method.
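Continuing the sketch above (reusing session, df, and today; the report key is an assumption):

```python
import awswrangler as wr

# Write the aggregated report back to S3 as one CSV Cody can download
wr.s3.to_csv(
    df=df,
    path=f's3://sd-vehicle-data/reports/speeders_{today}.csv',
    boto3_session=session,
    index=False,
)
```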
22. Save and test
Save and test the function. Use the hello world sample event. We're not using any event data in the function - it just triggers on a schedule.
23. Enable the trigger
Enable the trigger. Now, every night it will generate a report of speeders.
24. Review
Let's Review!
When Firehose writes data to S3, we use the recordReaderS3 Lambda to create a derivative dataset of speeders.
25. Review
We use a timed Lambda trigger to generate a daily report of speeders.
26. Review
To schedule the job, we use cron expressions.
And we simplified reading and writing CSVs to S3 using AWS Data Wrangler!
27. Let's practice!
Let's practice our new data engineering skills!