Redis Streams, Consumers, and Blocking Operations
Redis introduced the streams that mimic the log data structure with version 5.0. Stream is an append-only data structure with a richer set of operations than in a log file. It is one of the most complex data types in Redis since it implements additional blocking operations which allow the clients to wait for the new stream data. This is somewhat similar to the behavior of Redis Pub/Sub or blocking lists but fundamental differences are there when it comes to how the consumers consume the Redis stream data.
As shown in the previous illustration, several advantages over Redis Pub/Sub and blocking lists can be seen. Every new data item is delivered to every consumer. Unlike in the lists removing the list item whenever called to BLPOP or BRPOP, the stream items remain as it is in the stream. XREAD command operates as a blocking and non-blocking candidate on the Redis streams.
The XREAD command can fetch the entries from multiple streams simultaneously while the returned entries have an ID bigger than the last received ID for a given consumer. It can operate in both the blocking and non-blocking manner. In the non-blocking nature, the command behaves very similar to the XRANGE command but with some additional features listed in the following:
- It can fetch the entries starting from the most recent entry that has the greatest ID than any other item in the stream.
- It can read from multiple streams at the same time.
This command has a linear time complexity when the N number of elements are stored in the stream. Hence, with a fixed return count, the time complexity is constant.
The XREAD command follows the following syntax:
XREAD [COUNT number_of_returned_elements] [BLOCK blocking_time_in_milliseconds] STREAMS key [key …] id [id…]
COUNT <number_of_returned_elements>: The number of elements to be returned by the command. It limits the returned rows to a specified number.
BLOCK <blocking_time_in_milliseconds>: The maximum time to wait for a new item to appear in the stream.
The previous two options are optional to the command.
STREAMS <key>: The key of the stream. This is a mandatory option and must be the last option in the command since it accepts the variable length of keys and entry IDs.
<id>: The ID of the stream entry.
Multiple keys can be specified since the command allows you to read from more than one stream. At the same time, multiple IDs can be provided.
This command returns an array reply. Each array item consists of two elements as shown in the following format:
Example 1: Inspect the Weather Data for Two Locations with Non-Blocking XREAD
Let’s assume that we got two streams containing the weather data for LA and NYC. In our weather data publication site, we need to consume from both streams and fetch the latest weather data for these two locations. The XREAD command is the ideal candidate to use in this scenario with its non-blocking variant.
It’s time to create two streams named weather:nyc and weather:la and populate a couple of entries with some field-value pairs as shown in the following:
xadd weather:la * wind 12 humidity 45 temp 22
Both the streams weather:nyc and weather:lc are created successfully and the returned entry IDs are 1658114094434-0 and 1658114110474-0, respectively.
Let’s use the XREAD command to read from both streams at the same time in a non-blocking manner.
xread streams weather:nyc weather:la 0 0
As expected, the output contains the entries from both streams with the ID sequence starting from 0. It is acceptable to specify the incomplete IDs as previously illustrated where both the IDs are 0 which is the millisecond timestamp without the sequence number part. Hence, the previous command can be written as in the following:
xread streams weather:nyc weather:la 0–0 0–0
Let’s add a couple of entries to both streams now.
xadd weather:la * wind 18 humidity 80 temp 5
Since we already have the latest entry IDs for both streams from the previous commands, let’s call the XREAD command again to fetch all the entries with bigger IDs than the ones that we already queried.
xread streams weather:nyc weather:la 1658114094434–0 1658114110474–0
As you could see, the specified IDs are from the previous query. Now, the command call returns all the entries which have greater IDs than the specified ones.
As you can see, the newly added entries are returned from the previous command. Next, what you can do is take the entry IDs returned from the previous command and call the XREAD with those IDs until the returned array is empty.
Example 2: Get the Latest Pizza Promos with Blocking XREAD
There is another variant of the XREAD command which can be used to wait until the publishers publish a new data to the stream without terminating immediately as a non-blocking call. Let’s assume a scenario where the pizza guys want to push the notifications to all the customers regarding the latest promos available. There might be no promos on certain days. Hence, the customers should wait until the new promos are available. It can be achieved with the XREAD command with the block option in place.
Let’s assume that the pizza company is publishing the promo details to a stream called pizzapromos:daily. Hence, we can use the XREAD command to wait until a new promo item is added to the stream.
xread block 50000 streams pizzapromosnew:daily $
In this case, we specify the entry ID as $ which is interpreted as the top entry ID. Hence, the command will query only the new entries added to the stream and not the historical entries.
Since we have not added new entries to the stream, it will timeout after 50000 milliseconds with a nil return as shown in the following:
Now, let’s add an entry to the stream using the XADD while another consumer is waiting for the data with the XREAD command as shown in the following:
As expected, the added entry is consumed by the consumer immediately. From the next call, we need to make sure that we pass the ID that is returned from this command and not the $. If not, we will miss the entries added in between.
If multiple clients are waiting for the same stream, the newly added data is pushed to all of them immediately. The XREAD command is a very useful and recommended command to use in blocking the nature applications.
To summarize, the XREAD command is one of the widely used commands that operate on the Redis streams. It can operate in both blocking and non-blocking ways. As discussed, the non-blocking variant is very much similar to the XRANGE command with a couple of differences. In addition, this command can be used with the block option to wait until the publishers publish a new data to the stream. Overall, the XREAD command is specialized in consuming the data from multiple streams simultaneously. It is a helpful feature that the modern-day applications are looking for.