Skip to content

Commit 29b4c61

Browse files
gerzseshacharPash
andauthored
Async blocking commands (#257)
* Add support for blocking XREAD and XREADGROUP Issues #237 and #255. * Extend unit test coverage * Async blocking commands Issues #232, #233, #234, #235, #237, #248, #249, #250, #251, #255 Add the async versions for the blocking commands: BZPOPMIN, BZPOPMAX, BZMPOP, BLMOVE, BLMPOP, BLPOP, BRPOP, BRPOPLPUSH, XREAD and XREADGROUP. --------- Co-authored-by: Gabriel Erzse <[email protected]> Co-authored-by: shacharPash <[email protected]>
1 parent 3398a8c commit 29b4c61

File tree

9 files changed

+1828
-1
lines changed

9 files changed

+1828
-1
lines changed

src/NRedisStack/CoreCommands/CoreCommandBuilder.cs

+75
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,81 @@ public static SerializedCommand BRPopLPush(RedisKey source, RedisKey destination
109109
return new SerializedCommand(RedisCoreCommands.BRPOPLPUSH, args);
110110
}
111111

112+
public static SerializedCommand XRead(RedisKey[] keys, RedisValue[] positions, int? count, int? timeoutMilliseconds)
113+
{
114+
if (keys.Length == 0)
115+
{
116+
throw new ArgumentException("At least one key must be provided.");
117+
}
118+
119+
if (keys.Length != positions.Length)
120+
{
121+
throw new ArgumentException("The number of keys and positions must be the same.");
122+
}
123+
124+
List<object> args = new List<object>();
125+
126+
if (count != null)
127+
{
128+
args.Add(CoreArgs.COUNT);
129+
args.Add(count);
130+
}
131+
132+
if (timeoutMilliseconds != null)
133+
{
134+
args.Add(CoreArgs.BLOCK);
135+
args.Add(timeoutMilliseconds);
136+
}
137+
138+
args.Add(CoreArgs.STREAMS);
139+
args.AddRange(keys.Cast<object>());
140+
args.AddRange(positions.Cast<object>());
141+
142+
return new SerializedCommand(RedisCoreCommands.XREAD, args);
143+
}
144+
145+
public static SerializedCommand XReadGroup(RedisValue groupName, RedisValue consumerName, RedisKey[] keys, RedisValue[] positions, int? count, int? timeoutMilliseconds, bool? noAcknowledge)
146+
{
147+
if (keys.Length == 0)
148+
{
149+
throw new ArgumentException("At least one key must be provided.");
150+
}
151+
152+
if (keys.Length != positions.Length)
153+
{
154+
throw new ArgumentException("The number of keys and positions must be the same.");
155+
}
156+
157+
List<object> args = new List<object>();
158+
159+
args.Add(CoreArgs.GROUP);
160+
args.Add(groupName);
161+
args.Add(consumerName);
162+
163+
if (count != null)
164+
{
165+
args.Add(CoreArgs.COUNT);
166+
args.Add(count);
167+
}
168+
169+
if (timeoutMilliseconds != null)
170+
{
171+
args.Add(CoreArgs.BLOCK);
172+
args.Add(timeoutMilliseconds);
173+
}
174+
175+
if (noAcknowledge != null && noAcknowledge.Value)
176+
{
177+
args.Add(CoreArgs.NOACK);
178+
}
179+
180+
args.Add(CoreArgs.STREAMS);
181+
args.AddRange(keys.Cast<object>());
182+
args.AddRange(positions.Cast<object>());
183+
184+
return new SerializedCommand(RedisCoreCommands.XREADGROUP, args);
185+
}
186+
112187
private static SerializedCommand BlockingCommandWithKeysAndTimeout(String command, RedisKey[] keys, double timeout)
113188
{
114189
if (keys.Length == 0)

src/NRedisStack/CoreCommands/CoreCommands.cs

+111-1
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@ public static bool ClientSetInfo(this IDatabase db, SetInfoAttr attr, string val
313313
}
314314

315315
/// <summary>
316-
/// Syntactic sugar for <see cref="BLPop(StackExchange.Redis.IDatabase,StackExchange.Redis.RedisKey[],double)"/>,
316+
/// Syntactic sugar for <see cref="BRPop(StackExchange.Redis.IDatabase,StackExchange.Redis.RedisKey[],double)"/>,
317317
/// where only one key is used.
318318
/// </summary>
319319
/// <param name="db">The <see cref="IDatabase"/> class where this extension method is applied.</param>
@@ -370,5 +370,115 @@ public static bool ClientSetInfo(this IDatabase db, SetInfoAttr attr, string val
370370
var command = CoreCommandBuilder.BRPopLPush(source, destination, timeout);
371371
return db.Execute(command).ToRedisValue();
372372
}
373+
374+
/// <summary>
375+
/// The XREAD command.
376+
/// <para/>
377+
/// Read data from one or multiple streams, only returning entries with an ID greater than an ID provided by the caller.
378+
/// </summary>
379+
/// <param name="db">The <see cref="IDatabase"/> class where this extension method is applied.</param>
380+
/// <param name="keys">Keys of the streams where to read from.</param>
381+
/// <param name="positions">The positions from which to begin reading for each stream. See
382+
/// <see cref="NRedisStack.Core.DataTypes.StreamSpecialIds"/> for special Ids that can be used.</param>
383+
/// <param name="count">The maximum number of messages to return from each stream.</param>
384+
/// <param name="timeoutMilliseconds">Amount of time in milliseconds to block in case all the streams are empty.
385+
/// If not provided, or set to <c>null</c> then the read does not block. If set to <c>0</c> then it blocks indefinitely.</param>
386+
/// <returns>A value of <see cref="RedisStreamEntries"/> for each stream, or <c>null</c> if the command times out
387+
/// on the server.</returns>
388+
/// <remarks>
389+
/// <para>This is the blocking alternative for <seealso cref="IDatabase.StreamRead(StackExchange.Redis.StreamPosition[],System.Nullable{int},StackExchange.Redis.CommandFlags)"/>.</para>
390+
/// <para><seealso href="https://redis.io/commands/xread"/></para>
391+
/// </remarks>
392+
public static RedisStreamEntries[]? XRead(this IDatabase db, RedisKey[] keys, RedisValue[] positions, int? count = null, int? timeoutMilliseconds = null)
393+
{
394+
var command = CoreCommandBuilder.XRead(keys, positions, count, timeoutMilliseconds);
395+
return db.Execute(command).ToRedisStreamEntries();
396+
}
397+
398+
/// <summary>
399+
/// Syntactic sugar for <see cref="XRead(StackExchange.Redis.IDatabase,StackExchange.Redis.RedisKey[],StackExchange.Redis.RedisValue[],System.Nullable{int},System.Nullable{int})"/>,
400+
/// where only one stream is being read from.
401+
/// </summary>
402+
/// <param name="db">The <see cref="IDatabase"/> class where this extension method is applied.</param>
403+
/// <param name="key">Key of the stream where to read from.</param>
404+
/// <param name="position">The position from which to begin reading. See
405+
/// <see cref="NRedisStack.Core.DataTypes.StreamSpecialIds"/> for special Ids that can be used.</param>
406+
/// <param name="count">The maximum number of messages to return from each stream.</param>
407+
/// <param name="timeoutMilliseconds">Amount of time in milliseconds to block in case all the streams are empty.
408+
/// If not provided, or set to <c>null</c> then the read does not block. If set to <c>0</c> then it blocks indefinitely.</param>
409+
/// <returns>A <see cref="StreamEntry"/> list with the data read from the stream, of <c>null</c> if the command
410+
/// times out on the server.</returns>
411+
/// <remarks>
412+
/// <para>This is the blocking alternative for <seealso cref="IDatabase.StreamRead(StackExchange.Redis.RedisKey,StackExchange.Redis.RedisValue,System.Nullable{int},StackExchange.Redis.CommandFlags)"/>.</para>
413+
/// <para><seealso href="https://redis.io/commands/xread"/></para>
414+
/// </remarks>
415+
public static StreamEntry[]? XRead(this IDatabase db, RedisKey key, RedisValue position, int? count = null, int? timeoutMilliseconds = null)
416+
{
417+
var result = XRead(db, new[] { key }, new[] { position }, count, timeoutMilliseconds);
418+
if (result == null || result.Length == 0)
419+
{
420+
return null;
421+
}
422+
return result[0].Entries;
423+
}
424+
425+
/// <summary>
426+
/// The XREADGROUP command.
427+
/// <para/>
428+
/// Read new or historical messages in one or several streams, for a consumer in a consumer group.
429+
/// </summary>
430+
/// <param name="db">The <see cref="IDatabase"/> class where this extension method is applied.</param>
431+
/// <param name="groupName">The consumer group name.</param>
432+
/// <param name="consumerName">The name of the consumer in the consumer group.</param>
433+
/// <param name="keys">Keys of the streams where to read from.</param>
434+
/// <param name="positions">The positions from which to begin reading for each stream. See
435+
/// <see cref="NRedisStack.Core.DataTypes.StreamSpecialIds"/> for special Ids that can be used.</param>
436+
/// <param name="count">The maximum number of messages to return from each stream.</param>
437+
/// <param name="timeoutMilliseconds">Amount of time in milliseconds to block in case all the streams are empty.
438+
/// If not provided, or set to <c>null</c> then the read does not block. If set to <c>0</c> then it blocks indefinitely.</param>
439+
/// <param name="noAck">If set to <c>true</c> then inform the server that it should not wait for ACK for the
440+
/// messages it sends to this read call.</param>
441+
/// <returns>A value of <see cref="RedisStreamEntries"/> for each stream, or <c>null</c> if the command times out
442+
/// on the server.</returns>
443+
/// <remarks>
444+
/// <para>This is the blocking alternative for <seealso cref="IDatabase.StreamReadGroup(StackExchange.Redis.StreamPosition[],StackExchange.Redis.RedisValue,StackExchange.Redis.RedisValue,System.Nullable{int},StackExchange.Redis.CommandFlags)"/>.</para>
445+
/// <para><seealso href="https://redis.io/commands/xreadgroup"/></para>
446+
/// </remarks>
447+
public static RedisStreamEntries[]? XReadGroup(this IDatabase db, RedisValue groupName, RedisValue consumerName, RedisKey[] keys, RedisValue[] positions, int? count = null, int? timeoutMilliseconds = null, bool? noAck = null)
448+
{
449+
var command = CoreCommandBuilder.XReadGroup(groupName, consumerName, keys, positions, count, timeoutMilliseconds, noAck);
450+
return db.Execute(command).ToRedisStreamEntries();
451+
}
452+
453+
/// <summary>
454+
/// Syntactic sugar for <see cref="XReadGroup(StackExchange.Redis.IDatabase,StackExchange.Redis.RedisValue,StackExchange.Redis.RedisValue,StackExchange.Redis.RedisKey[],StackExchange.Redis.RedisValue[],System.Nullable{int},System.Nullable{int},System.Nullable{bool})"/>,
455+
/// where only one stream is being read from.
456+
/// </summary>
457+
/// <param name="db">The <see cref="IDatabase"/> class where this extension method is applied.</param>
458+
/// <param name="groupName">The consumer group name.</param>
459+
/// <param name="consumerName">The name of the consumer in the consumer group.</param>
460+
/// <param name="key">Key of the stream where to read from.</param>
461+
/// <param name="position">The position from which to begin reading. See
462+
/// <see cref="NRedisStack.Core.DataTypes.StreamSpecialIds"/> for special Ids that can be used.</param>
463+
/// <param name="count">The maximum number of messages to return from each stream.</param>
464+
/// <param name="timeoutMilliseconds">Amount of time in milliseconds to block in case all the streams are empty.
465+
/// If not provided, or set to <c>null</c> then the read does not block. If set to <c>0</c> then it blocks indefinitely.</param>
466+
/// <param name="noAck">If set to <c>true</c> then inform the server that it should not wait for ACK for the
467+
/// messages it sends to this read call.</param>
468+
/// <returns>A <see cref="StreamEntry"/> list with the data read from the stream, of <c>null</c> if the command
469+
/// times out on the server.</returns>
470+
/// <remarks>
471+
/// <para>This is the blocking alternative for <seealso cref="IDatabase.StreamReadGroup(StackExchange.Redis.RedisKey,StackExchange.Redis.RedisValue,StackExchange.Redis.RedisValue,System.Nullable{StackExchange.Redis.RedisValue},System.Nullable{int},StackExchange.Redis.CommandFlags)"/>.</para>
472+
/// <para><seealso href="https://redis.io/commands/xreadgroup"/></para>
473+
/// </remarks>
474+
public static StreamEntry[]? XReadGroup(this IDatabase db, RedisValue groupName, RedisValue consumerName, RedisKey key, RedisValue position, int? count = null, int? timeoutMilliseconds = null, bool? noAck = null)
475+
{
476+
var result = XReadGroup(db, groupName, consumerName, new[] { key }, new[] { position }, count, timeoutMilliseconds, noAck);
477+
if (result == null || result.Length == 0)
478+
{
479+
return null;
480+
}
481+
return result[0].Entries;
482+
}
373483
}
374484
}

0 commit comments

Comments
 (0)