
Serverless data workflow

1. Serverless data workflow

Whoo - welcome to Lesson 3!

2. A look back

In the previous lesson, we used streaming data from vehicles to notify Cody of speeders using SMS. Along with texts, Cody wants a daily report of speeders as well.

3. Current partitioning

Firehose writes data to S3, partitioning objects by year, then month, then day, then hour - for example, under a key prefix like 2023/05/14/06/.

4. Filtering out speeders

As each record comes in, we will read the data, filter it down to the speeders, and write the resulting dataframe to a separate folder.

5. Aggregating by day

Next, we will write another lambda that will run at the end of the day. It will read all the speeder records and aggregate them to one object per day. Cody will download that file as a report.

6. Open recordReaderS3

Open the Lambda code editor to recordReaderS3.

7. Editing recordReaderS3

Import the datetime and pytz modules. We will use pytz to define Cody's timezone. Dealing with time is - unfortunately - very important in data engineering.

8. Editing recordReaderS3

In the record_created_handler() function, filter for the Vehicle Numbers that exceeded the speed threshold. Use the time, generated in California's timezone, as part of the object key in S3, and write the object.
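
A minimal sketch of how that handler might look - the speed column name, the threshold value, and the speeders/ key layout are assumptions for illustration:

    import io
    from datetime import datetime

    import boto3
    import pandas as pd
    import pytz

    s3 = boto3.client("s3")
    SPEED_LIMIT = 45  # assumed threshold

    def record_created_handler(event, context):
        # Read the newly written object named in the S3 event
        bucket = event["Records"][0]["s3"]["bucket"]["name"]
        key = event["Records"][0]["s3"]["object"]["key"]
        obj = s3.get_object(Bucket=bucket, Key=key)
        df = pd.read_csv(io.BytesIO(obj["Body"].read()))

        # Keep only the vehicles over the speed threshold
        speeders = df[df["speed"] > SPEED_LIMIT]

        # Build an object key from the current time in California
        now = datetime.now(pytz.timezone("America/Los_Angeles"))
        out_key = f"speeders/{now.strftime('%Y/%m/%d/%H%M%S')}.csv"
        s3.put_object(Bucket=bucket, Key=out_key,
                      Body=speeders.to_csv(index=False))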

9. Writing speeders by date and time

As vehicles send data, speeders will get filtered out into a CSV in the speeders folder.

10. speederAggregator

recordReaderS3 executes frequently - on every write to S3. It needs to be lightweight to prevent failures. Reading a day's worth of files and aggregating them isn't lightweight, so we will make another function for this that runs at the end of the day - speederAggregator.

11. Create speederAggregator

Create a new Lambda function called speederAggregator.

12. Add AWS data wrangler layer

Add the AWS Data Wrangler layer, like we did in the previous video.

13. speederAggregator Resources

This Lambda will read multiple files, store them in memory, aggregate them, and write to S3. It will take longer to execute and will need more RAM. In the function configuration, increase the memory to 512 MB and the execution timeout to 15 seconds. Remember to be careful with these - Lambda bills by memory and execution time.

14. Create the timed trigger

We want this function to run once, at the end of the day. Add a trigger and select EventBridge. Create a new rule, dailyMidnight, and use a schedule expression. We want the job to run at 6:50 AM UTC - 11:50 PM in California. We specify this using a cron expression. Until the function is ready, disable the trigger, and click Add!

15. Cron expressions

Cron is a process that runs on most systems and is very useful for data engineering. Cron expressions specify how often that process executes code. Cron expressions in AWS follow this format. Our expression specifies a run at minute 50 of the 6th hour, every day of every month, weekdays and weekends, every year.
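
Reconstructed from that description, the dailyMidnight schedule expression is:

    cron(50 6 * * ? *)

The six fields are minutes, hours, day-of-month, month, day-of-week, and year. AWS requires a ? in day-of-week when day-of-month is *, so the expression still matches every day of the week.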

16. speederAggregator callback

Now for the code. Our objects are named with the California timezone. Let's continue using it as we get today's date.
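
A quick sketch of that lookup, reusing the America/Los_Angeles zone from recordReaderS3 (the speeders/ key layout is an assumption):

    from datetime import datetime

    import pytz

    california = pytz.timezone("America/Los_Angeles")
    today = datetime.now(california).strftime("%Y/%m/%d")
    prefix = f"speeders/{today}"  # assumed layout written by recordReaderS3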

17. speederAggregator callback

We use that as the prefix to list_objects_v2(). Then, as usual, we loop through the response, aggregating the CSVs into a single dataframe.
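
That "usual" loop might look roughly like this - the bucket name is a placeholder:

    import io

    import boto3
    import pandas as pd

    s3 = boto3.client("s3")
    response = s3.list_objects_v2(Bucket="vehicle-data-bucket", Prefix=prefix)

    # Download each CSV under the prefix and stack them into one dataframe
    frames = []
    for obj in response.get("Contents", []):
        body = s3.get_object(Bucket="vehicle-data-bucket", Key=obj["Key"])["Body"]
        frames.append(pd.read_csv(io.BytesIO(body.read())))

    speeders = pd.concat(frames, ignore_index=True)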

18. awswrangler package

You've seen me load multiple objects from S3 several times by now - it's very verbose. The AWS Data Wrangler layer we added has a module to make this easier. Let's use it to load the CSVs for the relevant date. We import the awswrangler module as wr. Create a boto3 session using our AWS key, secret, and region. We call the wr.s3.read_csv() function, passing a prefix that includes the filter date, the boto3 session, and the delimiter.
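
A sketch of those calls - the credentials, region, and bucket name are placeholders:

    import awswrangler as wr
    import boto3

    session = boto3.Session(
        aws_access_key_id="<AWS_KEY_ID>",
        aws_secret_access_key="<AWS_SECRET>",
        region_name="us-east-1",  # placeholder region
    )

    # Reads every CSV under the date prefix into a single dataframe;
    # sep is passed through to pandas as the delimiter.
    speeders = wr.s3.read_csv(
        path=f"s3://vehicle-data-bucket/{prefix}",
        boto3_session=session,
        sep=",",
    )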

19. Old way vs awswrangler

Compared to the old way with all the looping,

20. Old way vs awswrangler

awswrangler saves us quite a few lines of code!

21. Writing aggregate speeders file

Writing CSVs to S3 with wrangler is easier as well, using the to_csv() method.
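
Again as a sketch, with an assumed report path:

    # Write the aggregated dataframe back to S3 as a single object;
    # flatten the slashes in the date so the report lands in one folder.
    wr.s3.to_csv(
        df=speeders,
        path=f"s3://vehicle-data-bucket/reports/{today.replace('/', '-')}.csv",
        boto3_session=session,
        index=False,
    )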

22. Save and test

Save and test the function. Use the hello-world sample event. We're not using any event data in the function - it is just triggered by time.

23. Enable the trigger

Enable the trigger. Now, every night it will generate a report of speeders.

24. Review

Let's review! When Firehose writes data to S3, we use the recordReaderS3 Lambda to create a derivative dataset of speeders.

25. Review

We use a Lambda time trigger to generate a daily report of speeders.

26. Review

To schedule the job, we use cron expressions. And we simplified reading and writing CSVs to S3 using AWS Data Wrangler!

27. Let's practice!

Let's practice our new data engineering skills!