
State API

Israel: Hello, my name is Israel Herraiz, and I work as a strategic cloud engineer at Google. In this video, you will learn about state and timers, two powerful features that you can use in your DoFn to implement stateful transformations. Let's start with the State API. Apache Beam pipelines can aggregate data with two main types of transforms: GroupByKey or Combine. ParDos cannot do aggregations, because ParDos are normally stateless transforms. You can map one element to zero, one, or many elements, but you cannot aggregate elements together. Or can you? For instance, if you want to count the number of messages you have seen per key, is it possible to do that with a ParDo?
Apache Beam comes with additional possibilities for ParDos: stateful transformations. You can have state variables that are reused across elements to do any kind of calculation that requires accumulating state from several different messages. With stateful ParDos, many aggregations can be implemented without using a combiner or a GroupByKey. State is maintained per key, so the input must be made of key-value pairs. For streaming pipelines, the state is also maintained per window. The state can be read and mutated while processing each element. The state is local to each transform and key: for instance, two different keys processed in two different workers cannot share a common state, but all elements with the same key share a common state.
In this ParDo, we are receiving squares and doing an element-by-element transform to circles. We are also calculating the total area of the squares seen so far. To do this calculation, we accumulate the partial results in a state variable. The state variables are passed to the process element method like any other argument, and they can be safely mutated from within that method.
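To make the per-key state idea concrete, here is a plain-Python model (not Apache Beam code) of a ParDo that counts messages per key while still emitting one output per input. The dictionary keyed by `(key, window)` stands in for runner-managed state, and `CountPerKeyModel` and `run` are hypothetical names. In the Beam Python SDK, such state would typically be declared with a spec from `apache_beam.transforms.userstate` (for example `CombiningValueStateSpec`) and received in `process` through `beam.DoFn.StateParam`; the model below only mimics the semantics.

```python
from collections import defaultdict
from typing import Dict, Hashable, Iterable, Iterator, List, Tuple


class CountPerKeyModel:
    """Toy model of a stateful ParDo that counts messages per key.

    NOT Apache Beam code: runner-managed state is modeled as a dict with
    one independent cell per (key, window).
    """

    def __init__(self) -> None:
        # One counter per (key, window); different keys never share a cell.
        self._state: Dict[Tuple[Hashable, Hashable], int] = defaultdict(int)

    def process(
        self, element: Tuple[Hashable, object], window: Hashable = "global"
    ) -> Iterator[Tuple[Hashable, object, int]]:
        key, value = element
        cell = (key, window)
        self._state[cell] += 1  # read and mutate this key's state
        # One-to-one output: each input is emitted with the count seen so far.
        yield key, value, self._state[cell]


def run(elements: Iterable[Tuple[Hashable, object]]) -> List[Tuple[Hashable, object, int]]:
    fn = CountPerKeyModel()
    out: List[Tuple[Hashable, object, int]] = []
    for element in elements:
        out.extend(fn.process(element))
    return out


results = run([("a", "m1"), ("b", "m2"), ("a", "m3")])
print(results)  # [('a', 'm1', 1), ('b', 'm2', 1), ('a', 'm3', 2)]
```

Key `"b"` starts its own count at 1 even though key `"a"` has already been seen, which mirrors the point that different keys do not share state.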
You might be wondering: if you create your own DoFn class, why not use class members as state variables? The reason is reprocessing, for instance because of errors: Dataflow takes care of making sure that state mutations are consistent and safe, and it will discard any mutation from processing that has not been fully completed. If you used normal class members to keep that state, you would have to implement that logic yourself to discard those mutations and keep your state consistent, and that is actually very complex. Finally, in step three, after reaching the limit, the state value is read to produce an output, the triangles, from the accumulated state value. We could have just emitted the state value as output, but in general we can produce any output that requires that value.
In this example, the pipeline is calling an external service to enrich the data being processed. If you are running a large pipeline in Dataflow, calling an external service for every element that you process in the ParDo can be problematic: you could be making millions of calls per second from hundreds of different workers, and it is very easy to overwhelm an external service in that situation. How can you overcome this problem? State variables let you batch the requests by accumulating elements in a buffer and making batched calls to the external service; for example, you can make one call every 10 messages.
Let's see the code for this example. This example is in Python. There are two state variables: buffer state and count state. These are passed to the process method of your DoFn as additional input arguments. The DoFn adds each new element to the buffer state and increases the count. When the count reaches the max buffer size, the call to the external service is made; in this example, we have omitted this call for simplicity. It is important to remember to clear the state once it has been used.
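The buffering pattern described above can be traced with another plain-Python model. This is an assumption-laden sketch, not the Python code from the video: the dicts `buffer_state` and `count_state` stand in for the two per-key state variables, `call_external_service` is a hypothetical stub for the omitted call, and `MAX_BUFFER_SIZE = 3` is chosen only to keep the trace short. In the Beam Python SDK, a buffer and a counter like these are commonly declared with `BagStateSpec` and `CombiningValueStateSpec`.

```python
from typing import Dict, Hashable, Iterator, List, Tuple

MAX_BUFFER_SIZE = 3  # assumption: small so the example is easy to trace


def call_external_service(batch: List[object]) -> List[str]:
    """Hypothetical stand-in for the batched call that the video omits."""
    return [f"enriched:{item}" for item in batch]


class BatchingModel:
    """Toy model of the buffer_state / count_state pattern.

    NOT Apache Beam code: the dicts stand in for per-key state that a
    runner such as Dataflow would manage and make fault tolerant.
    """

    def __init__(self, max_buffer_size: int = MAX_BUFFER_SIZE) -> None:
        self.max_buffer_size = max_buffer_size
        self.buffer_state: Dict[Hashable, List[object]] = {}
        self.count_state: Dict[Hashable, int] = {}
        self.service_calls = 0  # how many batched requests were made

    def process(self, element: Tuple[Hashable, object]) -> Iterator[Tuple[Hashable, str]]:
        key, value = element
        # Add the new element to this key's buffer and increase its count.
        self.buffer_state.setdefault(key, []).append(value)
        self.count_state[key] = self.count_state.get(key, 0) + 1
        if self.count_state[key] >= self.max_buffer_size:
            batch = self.buffer_state[key]
            self.service_calls += 1
            enriched = call_external_service(batch)  # one call per full batch
            # Clear the state once it has been used.
            del self.buffer_state[key]
            del self.count_state[key]
            for item in enriched:
                yield key, item


fn = BatchingModel()
outputs = [out for i in range(7) for out in fn.process(("k", i))]
print(fn.service_calls)  # 2
print(fn.buffer_state)   # {'k': [6]}
```

Seven elements produce only two service calls instead of seven. Element 6, however, stays in the buffer because no further element for key `"k"` arrives to fill it, which is exactly the leftover-state problem raised next.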
The DoFn will not finish entirely unless all of its state has been cleared. This is the same example, but in Java. In Java, you need to annotate the state variables using the @StateId annotation, and they are also passed as additional input arguments to the process method of the DoFn. Again, we have omitted the actual call to the external service for simplicity. Now, both this code and the Python version shown on the previous slide have a problem. For the last messages that we receive, what if the buffer does not reach the max buffer size? The DoFn would keep that state indefinitely, and it would never finish. How can you solve that problem? Let's introduce the concept of timers.
