Orchestration with DAGs
1. Orchestration with DAGs
As your pipelines grow in scope, you'll likely need more automation, and there's a good chance you'll eventually end up with quite a number of tasks. When there are numerous tasks, it's extremely important to start thinking about how they are coordinated and scheduled in relation to one another. You wouldn't want, for example, a task to accidentally perform some operation at the wrong time, out of order with other logic in your pipeline. Uncoordinated operations like these could seriously impact the reliability and integrity of your pipeline, among other things.
So, how do you get around this? Well, if you know there are tasks that depend on other tasks, you can link these tasks and specify the order in which they should execute. This keeps your operations in sync, all happening in an intentional fashion. After all, this is a module on orchestration.
We've already run across one such example. Maybe the automated transformations you're performing depend on an automated ingestion process, and those transformations can only happen after new data has been loaded into your account. And maybe this is true for multiple tables and multiple sources of data. In a scenario like this, it makes a lot of sense to link tasks together to coordinate these events. Some fancy terms for the linking of tasks include task graph or directed acyclic graph, also known as a DAG. You might hear me use these terms interchangeably.
Let's go ahead and create a DAG in Snowflake using SQL. We'll link together a couple of tasks in our pipeline. And once again, if you want to do this in Python, you absolutely can using Snowflake's Python APIs. Now's a good time to pause the video if you need to log into your Snowflake account.
Our email analysts want a daily email that looks at a rolling window of time for sales in Hamburg.
They specifically want to look at the last seven days of sales on a daily basis. Let's create a stored procedure that sends an email with the seven most recent entries in the daily sales Hamburg T table. We'll automate the stored procedure with a task and make sure it executes after our task that calls the orders header sproc stored procedure.
Navigate to the DAG email integration SQL file in the Module 5 folder. Copy its contents, open a new SQL worksheet, and paste the contents in there. Let's take a look at what we have here.
First, we create an email notification integration. This object allows us to send email notifications to the recipients specified in the ALLOWED_RECIPIENTS parameter. If you want to see this email integration in action, you should add your email address here.
Next, we create our stored procedure to send the sales report. I write the stored procedure in Python and use Snowpark for Python to retrieve the last seven days of data in the daily sales Hamburg T table. I then format the email content and send the email by invoking the system function SYSTEM$SEND_EMAIL. I make sure to specify the email notification integration object, the email address for the recipient, an email subject line, and the email content. This is another place where you should insert your email address if you want to see the email come into your inbox.
I then define a new task called send last seven days report. It calls the stored procedure above. But the important thing is the AFTER keyword here, which specifies that this task should run after the process orders header sproc task. This bit of SQL effectively chains together these two tasks, making process orders header sproc a dependency for the weekly sales report task. This chaining creates a DAG. Run everything up to and including this block of SQL.
OK, great. Now let's start the tasks. I'm going to start the weekly sales report task first using ALTER TASK, the name of the task, and then RESUME.
I'll do the same for the root task, process orders header sproc, and I'll then start the overall DAG by executing the process orders header sproc task directly. EXECUTE TASK manually triggers the execution of the specified task immediately. This is typically used when you want to run a task outside of its normal schedule, for testing purposes, as an example.
OK, we should be receiving an email with the information specified in the email integration. Let's check. There it is. The email has a title of weekly sales report for Hamburg and has this nice table showing the seven most recent entries in the daily sales Hamburg T table. To send this email at a particular cadence, you would modify the schedule for the process orders header sproc task, since the schedule for a DAG is set on its root task. For now, we're executing it immediately for the purposes of showing off the email and the DAG.
Let's quickly navigate to the tasks section in the raw POS schema. Click on the send last seven days report task. Click on Graph above. Snowflake provides us with this neat graph view of the chained tasks. This is incredibly useful if you want a visual representation of task dependencies. In practice, you can imagine having tens or hundreds of tasks, and this part of the UI would do a great job of helping you get oriented if you're trying to understand how certain tasks are coordinated. On the right, you can take a look at some other details, too. If you wanted to, you could kick off the DAG from here as well, but we won't do that for now.
OK, great. You just created a DAG. Let's not forget to stop the tasks. Go back to the worksheet and run the final two lines. Run them in sequential order. The entire DAG should now be suspended.
OK, there you have it. You sequenced tasks and linked them together to form a DAG. You also learned how to kick off the entire sequence of events by kicking off the root task.
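To make the order of operations from this walkthrough easier to follow, here is a minimal sketch that lists the statements in sequence. It is not the contents of the course's SQL file. The underscore spellings of the task names are my best reading of the spoken names, and the procedure name send_sales_report and warehouse name my_warehouse are hypothetical placeholders. The order of the two SUSPEND statements (root first) is also an assumption, since the video only says to run the final two lines in order.

```python
def dag_lifecycle_statements(root_task, child_task, procedure, warehouse):
    """Return the SQL statements for one create/run/suspend cycle, in order."""
    return [
        # Chain the child to the root: AFTER makes the root task a predecessor.
        f"CREATE OR REPLACE TASK {child_task} "
        f"WAREHOUSE = {warehouse} "
        f"AFTER {root_task} "
        f"AS CALL {procedure}()",
        # New tasks start out suspended, so resume the child, then the root.
        f"ALTER TASK {child_task} RESUME",
        f"ALTER TASK {root_task} RESUME",
        # Run the whole DAG right now instead of waiting for the schedule.
        f"EXECUTE TASK {root_task}",
        # Clean up. Suspending the root first is an assumption (see above).
        f"ALTER TASK {root_task} SUSPEND",
        f"ALTER TASK {child_task} SUSPEND",
    ]


statements = dag_lifecycle_statements(
    root_task="process_orders_header_sproc",      # assumed spelling
    child_task="send_last_seven_days_report",     # assumed spelling
    procedure="send_sales_report",                # hypothetical name
    warehouse="my_warehouse",                     # hypothetical name
)

for statement in statements:
    print(statement + ";")
```

You could paste the printed statements into a worksheet or pass them one at a time to whatever Snowflake client you use. The point of the sketch is the ordering: define the dependency, resume child then root, execute the root, then suspend.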
What I love about DAGs is that beyond a single sequence of events, you can imagine how you might scale this to parallelize multiple sequences of events that aren't dependent on each other. Perhaps you have tens or hundreds of different processes all happening within your data pipeline, and they aren't necessarily blocking each other. You can use DAGs to parallelize these sequences and optimize your overall execution time. And finally, an incredibly helpful thing about DAGs is that if something goes wrong, the nature of the graph makes it easy to spot where in the sequence an error occurred, helping you debug faster.
OK, let's recap what you learned. You learned that you can chain tasks using the AFTER keyword in a task's definition. You learned that a sequence of two or more chained tasks constitutes a directed acyclic graph, also called a DAG or a task graph. And you learned how to define a DAG using SQL in Snowflake. Coming up, let's recap all that you learned in this module.
2. Let's practice!
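If you want to practice reasoning about the parallel branches mentioned above, here is a small sketch using Python's standard graphlib module (Python 3.9+). It only models a dependency graph locally and does not talk to Snowflake or reproduce its scheduler. All task names in it are hypothetical. Each "wave" it prints is a group of tasks whose predecessors have all finished, so in principle they could run at the same time.

```python
from graphlib import TopologicalSorter

# Hypothetical task graph: each task maps to the set of tasks it runs AFTER.
# Two independent branches: an orders branch and a customers branch.
task_graph = {
    "transform_orders": {"ingest_orders"},
    "send_sales_report": {"transform_orders"},
    "transform_customers": {"ingest_customers"},
}


def parallel_waves(graph):
    """Group tasks into waves; tasks within a wave do not block each other."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only for stable output
        waves.append(ready)
        sorter.done(*ready)  # mark the wave finished to unlock successors
    return waves


waves = parallel_waves(task_graph)
for number, wave in enumerate(waves, start=1):
    print(f"wave {number}: {', '.join(wave)}")
```

Here the two ingest tasks land in the first wave and the two transform tasks in the second, while the report task waits for its own branch only. The "acyclic" part of the name matters too: if a task ended up depending on itself through a chain of predecessors, prepare() would raise an error because no valid order exists.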