Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async blocking commands #257

Merged
merged 4 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions src/NRedisStack/CoreCommands/CoreCommandBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,81 @@ public static SerializedCommand BRPopLPush(RedisKey source, RedisKey destination
return new SerializedCommand(RedisCoreCommands.BRPOPLPUSH, args);
}

public static SerializedCommand XRead(RedisKey[] keys, RedisValue[] positions, int? count, int? timeoutMilliseconds)
{
if (keys.Length == 0)
{
throw new ArgumentException("At least one key must be provided.");
}

if (keys.Length != positions.Length)
{
throw new ArgumentException("The number of keys and positions must be the same.");
}

List<object> args = new List<object>();

if (count != null)
{
args.Add(CoreArgs.COUNT);
args.Add(count);
}

if (timeoutMilliseconds != null)
{
args.Add(CoreArgs.BLOCK);
args.Add(timeoutMilliseconds);
}

args.Add(CoreArgs.STREAMS);
args.AddRange(keys.Cast<object>());
args.AddRange(positions.Cast<object>());

return new SerializedCommand(RedisCoreCommands.XREAD, args);
}

public static SerializedCommand XReadGroup(RedisValue groupName, RedisValue consumerName, RedisKey[] keys, RedisValue[] positions, int? count, int? timeoutMilliseconds, bool? noAcknowledge)
{
if (keys.Length == 0)
{
throw new ArgumentException("At least one key must be provided.");
}

if (keys.Length != positions.Length)
{
throw new ArgumentException("The number of keys and positions must be the same.");
}

List<object> args = new List<object>();

args.Add(CoreArgs.GROUP);
args.Add(groupName);
args.Add(consumerName);

if (count != null)
{
args.Add(CoreArgs.COUNT);
args.Add(count);
}

if (timeoutMilliseconds != null)
{
args.Add(CoreArgs.BLOCK);
args.Add(timeoutMilliseconds);
}

if (noAcknowledge != null && noAcknowledge.Value)
{
args.Add(CoreArgs.NOACK);
}

args.Add(CoreArgs.STREAMS);
args.AddRange(keys.Cast<object>());
args.AddRange(positions.Cast<object>());

return new SerializedCommand(RedisCoreCommands.XREADGROUP, args);
}

private static SerializedCommand BlockingCommandWithKeysAndTimeout(String command, RedisKey[] keys, double timeout)
{
if (keys.Length == 0)
Expand Down
112 changes: 111 additions & 1 deletion src/NRedisStack/CoreCommands/CoreCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ public static bool ClientSetInfo(this IDatabase db, SetInfoAttr attr, string val
}

/// <summary>
/// Syntactic sugar for <see cref="BLPop(StackExchange.Redis.IDatabase,StackExchange.Redis.RedisKey[],double)"/>,
/// Syntactic sugar for <see cref="BRPop(StackExchange.Redis.IDatabase,StackExchange.Redis.RedisKey[],double)"/>,
/// where only one key is used.
/// </summary>
/// <param name="db">The <see cref="IDatabase"/> class where this extension method is applied.</param>
Expand Down Expand Up @@ -370,5 +370,115 @@ public static bool ClientSetInfo(this IDatabase db, SetInfoAttr attr, string val
var command = CoreCommandBuilder.BRPopLPush(source, destination, timeout);
return db.Execute(command).ToRedisValue();
}

/// <summary>
/// The XREAD command.
/// <para/>
/// Read data from one or multiple streams, only returning entries with an ID greater than an ID provided by the caller.
/// </summary>
/// <param name="db">The <see cref="IDatabase"/> class where this extension method is applied.</param>
/// <param name="keys">Keys of the streams where to read from.</param>
/// <param name="positions">The positions from which to begin reading for each stream. See
/// <see cref="NRedisStack.Core.DataTypes.StreamSpecialIds"/> for special Ids that can be used.</param>
/// <param name="count">The maximum number of messages to return from each stream.</param>
/// <param name="timeoutMilliseconds">Amount of time in milliseconds to block in case all the streams are empty.
/// If not provided, or set to <c>null</c> then the read does not block. If set to <c>0</c> then it blocks indefinitely.</param>
/// <returns>A value of <see cref="RedisStreamEntries"/> for each stream, or <c>null</c> if the command times out
/// on the server.</returns>
/// <remarks>
/// <para>This is the blocking alternative for <seealso cref="IDatabase.StreamRead(StackExchange.Redis.StreamPosition[],System.Nullable{int},StackExchange.Redis.CommandFlags)"/>.</para>
/// <para><seealso href="https://redis.io/commands/xread"/></para>
/// </remarks>
public static RedisStreamEntries[]? XRead(this IDatabase db, RedisKey[] keys, RedisValue[] positions, int? count = null, int? timeoutMilliseconds = null)
{
var command = CoreCommandBuilder.XRead(keys, positions, count, timeoutMilliseconds);
return db.Execute(command).ToRedisStreamEntries();
}

/// <summary>
/// Syntactic sugar for <see cref="XRead(StackExchange.Redis.IDatabase,StackExchange.Redis.RedisKey[],StackExchange.Redis.RedisValue[],System.Nullable{int},System.Nullable{int})"/>,
/// where only one stream is being read from.
/// </summary>
/// <param name="db">The <see cref="IDatabase"/> class where this extension method is applied.</param>
/// <param name="key">Key of the stream where to read from.</param>
/// <param name="position">The position from which to begin reading. See
/// <see cref="NRedisStack.Core.DataTypes.StreamSpecialIds"/> for special Ids that can be used.</param>
/// <param name="count">The maximum number of messages to return from each stream.</param>
/// <param name="timeoutMilliseconds">Amount of time in milliseconds to block in case all the streams are empty.
/// If not provided, or set to <c>null</c> then the read does not block. If set to <c>0</c> then it blocks indefinitely.</param>
/// <returns>A <see cref="StreamEntry"/> list with the data read from the stream, of <c>null</c> if the command
/// times out on the server.</returns>
/// <remarks>
/// <para>This is the blocking alternative for <seealso cref="IDatabase.StreamRead(StackExchange.Redis.RedisKey,StackExchange.Redis.RedisValue,System.Nullable{int},StackExchange.Redis.CommandFlags)"/>.</para>
/// <para><seealso href="https://redis.io/commands/xread"/></para>
/// </remarks>
public static StreamEntry[]? XRead(this IDatabase db, RedisKey key, RedisValue position, int? count = null, int? timeoutMilliseconds = null)
{
var result = XRead(db, new[] { key }, new[] { position }, count, timeoutMilliseconds);
if (result == null || result.Length == 0)
{
return null;
}
return result[0].Entries;
}

/// <summary>
/// The XREADGROUP command.
/// <para/>
/// Read new or historical messages in one or several streams, for a consumer in a consumer group.
/// </summary>
/// <param name="db">The <see cref="IDatabase"/> class where this extension method is applied.</param>
/// <param name="groupName">The consumer group name.</param>
/// <param name="consumerName">The name of the consumer in the consumer group.</param>
/// <param name="keys">Keys of the streams where to read from.</param>
/// <param name="positions">The positions from which to begin reading for each stream. See
/// <see cref="NRedisStack.Core.DataTypes.StreamSpecialIds"/> for special Ids that can be used.</param>
/// <param name="count">The maximum number of messages to return from each stream.</param>
/// <param name="timeoutMilliseconds">Amount of time in milliseconds to block in case all the streams are empty.
/// If not provided, or set to <c>null</c> then the read does not block. If set to <c>0</c> then it blocks indefinitely.</param>
/// <param name="noAck">If set to <c>true</c> then inform the server that it should not wait for ACK for the
/// messages it sends to this read call.</param>
/// <returns>A value of <see cref="RedisStreamEntries"/> for each stream, or <c>null</c> if the command times out
/// on the server.</returns>
/// <remarks>
/// <para>This is the blocking alternative for <seealso cref="IDatabase.StreamReadGroup(StackExchange.Redis.StreamPosition[],StackExchange.Redis.RedisValue,StackExchange.Redis.RedisValue,System.Nullable{int},StackExchange.Redis.CommandFlags)"/>.</para>
/// <para><seealso href="https://redis.io/commands/xreadgroup"/></para>
/// </remarks>
public static RedisStreamEntries[]? XReadGroup(this IDatabase db, RedisValue groupName, RedisValue consumerName, RedisKey[] keys, RedisValue[] positions, int? count = null, int? timeoutMilliseconds = null, bool? noAck = null)
{
var command = CoreCommandBuilder.XReadGroup(groupName, consumerName, keys, positions, count, timeoutMilliseconds, noAck);
return db.Execute(command).ToRedisStreamEntries();
}

/// <summary>
/// Syntactic sugar for <see cref="XReadGroup(StackExchange.Redis.IDatabase,StackExchange.Redis.RedisValue,StackExchange.Redis.RedisValue,StackExchange.Redis.RedisKey[],StackExchange.Redis.RedisValue[],System.Nullable{int},System.Nullable{int},System.Nullable{bool})"/>,
/// where only one stream is being read from.
/// </summary>
/// <param name="db">The <see cref="IDatabase"/> class where this extension method is applied.</param>
/// <param name="groupName">The consumer group name.</param>
/// <param name="consumerName">The name of the consumer in the consumer group.</param>
/// <param name="key">Key of the stream where to read from.</param>
/// <param name="position">The position from which to begin reading. See
/// <see cref="NRedisStack.Core.DataTypes.StreamSpecialIds"/> for special Ids that can be used.</param>
/// <param name="count">The maximum number of messages to return from each stream.</param>
/// <param name="timeoutMilliseconds">Amount of time in milliseconds to block in case all the streams are empty.
/// If not provided, or set to <c>null</c> then the read does not block. If set to <c>0</c> then it blocks indefinitely.</param>
/// <param name="noAck">If set to <c>true</c> then inform the server that it should not wait for ACK for the
/// messages it sends to this read call.</param>
/// <returns>A <see cref="StreamEntry"/> list with the data read from the stream, of <c>null</c> if the command
/// times out on the server.</returns>
/// <remarks>
/// <para>This is the blocking alternative for <seealso cref="IDatabase.StreamReadGroup(StackExchange.Redis.RedisKey,StackExchange.Redis.RedisValue,StackExchange.Redis.RedisValue,System.Nullable{StackExchange.Redis.RedisValue},System.Nullable{int},StackExchange.Redis.CommandFlags)"/>.</para>
/// <para><seealso href="https://redis.io/commands/xreadgroup"/></para>
/// </remarks>
public static StreamEntry[]? XReadGroup(this IDatabase db, RedisValue groupName, RedisValue consumerName, RedisKey key, RedisValue position, int? count = null, int? timeoutMilliseconds = null, bool? noAck = null)
{
var result = XReadGroup(db, groupName, consumerName, new[] { key }, new[] { position }, count, timeoutMilliseconds, noAck);
if (result == null || result.Length == 0)
{
return null;
}
return result[0].Entries;
}
}
}
Loading
Loading