From ae2916aea91194679ac62d80d23f425003ad3e5f Mon Sep 17 00:00:00 2001 From: Gabriel Erzse Date: Mon, 19 Feb 2024 17:19:51 +0200 Subject: [PATCH 1/3] Add support for blocking XREAD and XREADGROUP Issues #237 and #255. --- .../CoreCommands/CoreCommandBuilder.cs | 75 ++++ src/NRedisStack/CoreCommands/CoreCommands.cs | 110 ++++++ .../DataTypes/RedisStreamEntries.cs | 25 ++ .../DataTypes/StreamSpecialIds.cs | 22 ++ .../CoreCommands/Literals/CommandArgs.cs | 4 + .../CoreCommands/Literals/Commands.cs | 2 + src/NRedisStack/ResponseParser.cs | 51 +++ .../Core Commands/CoreTests.cs | 338 ++++++++++++++++++ 8 files changed, 627 insertions(+) create mode 100644 src/NRedisStack/CoreCommands/DataTypes/RedisStreamEntries.cs create mode 100644 src/NRedisStack/CoreCommands/DataTypes/StreamSpecialIds.cs diff --git a/src/NRedisStack/CoreCommands/CoreCommandBuilder.cs b/src/NRedisStack/CoreCommands/CoreCommandBuilder.cs index 5f8f62e1..a343f3b2 100644 --- a/src/NRedisStack/CoreCommands/CoreCommandBuilder.cs +++ b/src/NRedisStack/CoreCommands/CoreCommandBuilder.cs @@ -109,6 +109,81 @@ public static SerializedCommand BRPopLPush(RedisKey source, RedisKey destination return new SerializedCommand(RedisCoreCommands.BRPOPLPUSH, args); } + public static SerializedCommand XRead(RedisKey[] keys, RedisValue[] positions, int? count, int? timeoutMilliseconds) + { + if (keys.Length == 0) + { + throw new ArgumentException("At least one key must be provided."); + } + + if (keys.Length != positions.Length) + { + throw new ArgumentException("The number of keys and positions must be the same."); + } + + List args = new List(); + + if (count != null) + { + args.Add(CoreArgs.COUNT); + args.Add(count); + } + + if (timeoutMilliseconds != null) + { + args.Add(CoreArgs.BLOCK); + args.Add(timeoutMilliseconds); + } + + args.Add(CoreArgs.STREAMS); + args.AddRange(keys.Cast()); + args.AddRange(positions.Cast()); + + return new SerializedCommand(RedisCoreCommands.XREAD, args); + } + + public static SerializedCommand XReadGroup(RedisValue groupName, RedisValue consumerName, RedisKey[] keys, RedisValue[] positions, int? count, int? timeoutMilliseconds, bool? noAcknowledge) + { + if (keys.Length == 0) + { + throw new ArgumentException("At least one key must be provided."); + } + + if (keys.Length != positions.Length) + { + throw new ArgumentException("The number of keys and positions must be the same."); + } + + List args = new List(); + + args.Add(CoreArgs.GROUP); + args.Add(groupName); + args.Add(consumerName); + + if (count != null) + { + args.Add(CoreArgs.COUNT); + args.Add(count); + } + + if (timeoutMilliseconds != null) + { + args.Add(CoreArgs.BLOCK); + args.Add(timeoutMilliseconds); + } + + if (noAcknowledge != null && noAcknowledge.Value) + { + args.Add(CoreArgs.NOACK); + } + + args.Add(CoreArgs.STREAMS); + args.AddRange(keys.Cast()); + args.AddRange(positions.Cast()); + + return new SerializedCommand(RedisCoreCommands.XREADGROUP, args); + } + private static SerializedCommand BlockingCommandWithKeysAndTimeout(String command, RedisKey[] keys, double timeout) { if (keys.Length == 0) diff --git a/src/NRedisStack/CoreCommands/CoreCommands.cs b/src/NRedisStack/CoreCommands/CoreCommands.cs index 514f3176..7a2a5ae4 100644 --- a/src/NRedisStack/CoreCommands/CoreCommands.cs +++ b/src/NRedisStack/CoreCommands/CoreCommands.cs @@ -370,5 +370,115 @@ public static bool ClientSetInfo(this IDatabase db, SetInfoAttr attr, string val var command = CoreCommandBuilder.BRPopLPush(source, destination, timeout); return db.Execute(command).ToRedisValue(); } + + /// + /// The XREAD command. + /// + /// Read data from one or multiple streams, only returning entries with an ID greater than an ID provided by the caller. + /// + /// The class where this extension method is applied. + /// Keys of the streams where to read from. + /// The positions from which to begin reading for each stream. See + /// for special Ids that can be used. + /// The maximum number of messages to return from each stream. + /// Amount of time in milliseconds to block in case all the streams are empty. + /// If not provided, or set to null then the read does not block. If set to 0 then it blocks indefinitely. + /// A value of for each stream, or null if the command times out + /// on the server. + /// + /// This is the blocking alternative for . + /// + /// + public static RedisStreamEntries[]? XRead(this IDatabase db, RedisKey[] keys, RedisValue[] positions, int? count = null, int? timeoutMilliseconds = null) + { + var command = CoreCommandBuilder.XRead(keys, positions, count, timeoutMilliseconds); + return db.Execute(command).ToRedisStreamEntries(); + } + + /// + /// Syntactic sugar for , + /// where only one stream is being read from. + /// + /// The class where this extension method is applied. + /// Key of the stream where to read from. + /// The position from which to begin reading. See + /// for special Ids that can be used. + /// The maximum number of messages to return from each stream. + /// Amount of time in milliseconds to block in case all the streams are empty. + /// If not provided, or set to null then the read does not block. If set to 0 then it blocks indefinitely. + /// A list with the data read from the stream, of null if the command + /// times out on the server. + /// + /// This is the blocking alternative for . + /// + /// + public static StreamEntry[]? XRead(this IDatabase db, RedisKey key, RedisValue position, int? count = null, int? timeoutMilliseconds = null) + { + var result = XRead(db, new[] { key }, new[] { position }, count, timeoutMilliseconds); + if (result == null || result.Length == 0) + { + return null; + } + return result[0].Entries; + } + + /// + /// The XREADGROUP command. + /// + /// Read new or historical messages in one or several streams, for a consumer in a consumer group. + /// + /// The class where this extension method is applied. + /// The consumer group name. + /// The name of the consumer in the consumer group. + /// Keys of the streams where to read from. + /// The positions from which to begin reading for each stream. See + /// for special Ids that can be used. + /// The maximum number of messages to return from each stream. + /// Amount of time in milliseconds to block in case all the streams are empty. + /// If not provided, or set to null then the read does not block. If set to 0 then it blocks indefinitely. + /// If set to true then inform the server that it should not wait for ACK for the + /// messages it sends to this read call. + /// A value of for each stream, or null if the command times out + /// on the server. + /// + /// This is the blocking alternative for . + /// + /// + public static RedisStreamEntries[]? XReadGroup(this IDatabase db, RedisValue groupName, RedisValue consumerName, RedisKey[] keys, RedisValue[] positions, int? count = null, int? timeoutMilliseconds = null, bool? noAck = null) + { + var command = CoreCommandBuilder.XReadGroup(groupName, consumerName, keys, positions, count, timeoutMilliseconds, noAck); + return db.Execute(command).ToRedisStreamEntries(); + } + + /// + /// Syntactic sugar for , + /// where only one stream is being read from. + /// + /// The class where this extension method is applied. + /// The consumer group name. + /// The name of the consumer in the consumer group. + /// Key of the stream where to read from. + /// The position from which to begin reading. See + /// for special Ids that can be used. + /// The maximum number of messages to return from each stream. + /// Amount of time in milliseconds to block in case all the streams are empty. + /// If not provided, or set to null then the read does not block. If set to 0 then it blocks indefinitely. + /// If set to true then inform the server that it should not wait for ACK for the + /// messages it sends to this read call. + /// A list with the data read from the stream, of null if the command + /// times out on the server. + /// + /// This is the blocking alternative for . + /// + /// + public static StreamEntry[]? XReadGroup(this IDatabase db, RedisValue groupName, RedisValue consumerName, RedisKey key, RedisValue position, int? count = null, int? timeoutMilliseconds = null, bool? noAck = null) + { + var result = XReadGroup(db, groupName, consumerName, new[] { key }, new[] { position }, count, timeoutMilliseconds, noAck); + if (result == null || result.Length == 0) + { + return null; + } + return result[0].Entries; + } } } diff --git a/src/NRedisStack/CoreCommands/DataTypes/RedisStreamEntries.cs b/src/NRedisStack/CoreCommands/DataTypes/RedisStreamEntries.cs new file mode 100644 index 00000000..e472ec7b --- /dev/null +++ b/src/NRedisStack/CoreCommands/DataTypes/RedisStreamEntries.cs @@ -0,0 +1,25 @@ +using StackExchange.Redis; + +namespace NRedisStack.Core.DataTypes; + +/// +/// Holds the key and the entries for a Redis Stream, as returned by, for example, the XREAD or the XREADGROUP commands. +/// +public readonly struct RedisStreamEntries +{ + internal RedisStreamEntries(RedisKey key, StreamEntry[] entries) + { + Key = key; + Entries = entries; + } + + /// + /// The key for the stream. + /// + public RedisKey Key { get; } + + /// + /// An array of entries contained within the stream. + /// + public StreamEntry[] Entries { get; } +} diff --git a/src/NRedisStack/CoreCommands/DataTypes/StreamSpecialIds.cs b/src/NRedisStack/CoreCommands/DataTypes/StreamSpecialIds.cs new file mode 100644 index 00000000..b10b2bc5 --- /dev/null +++ b/src/NRedisStack/CoreCommands/DataTypes/StreamSpecialIds.cs @@ -0,0 +1,22 @@ +namespace NRedisStack.Core.DataTypes; + +/// +/// Constants for special stream Ids, to be used, for example, with the XREAD and XREADGROUP commands +/// +public class StreamSpecialIds +{ + /// + /// Smallest incomplete ID, can be used for reading from the very first message in a stream. + /// + public const string AllMessagesId = "0"; + + /// + /// For receiving only new messages that arrive after blocking on a read. + /// + public const string NewMessagesId = "$"; + + /// + /// For receiving only messages that were never delivered to any other consumer. + /// + public const string UndeliveredMessagesId = ">"; +} \ No newline at end of file diff --git a/src/NRedisStack/CoreCommands/Literals/CommandArgs.cs b/src/NRedisStack/CoreCommands/Literals/CommandArgs.cs index 9611cefd..edb8142f 100644 --- a/src/NRedisStack/CoreCommands/Literals/CommandArgs.cs +++ b/src/NRedisStack/CoreCommands/Literals/CommandArgs.cs @@ -2,11 +2,15 @@ namespace NRedisStack.Core.Literals { internal static class CoreArgs { + public const string BLOCK = "BLOCK"; public const string COUNT = "COUNT"; + public const string GROUP = "GROUP"; public const string LEFT = "LEFT"; public const string MAX = "MAX"; public const string MIN = "MIN"; + public const string NOACK = "NOACK"; public const string RIGHT = "RIGHT"; + public const string STREAMS = "STREAMS"; public const string lib_name = "LIB-NAME"; public const string lib_ver = "LIB-VER"; } diff --git a/src/NRedisStack/CoreCommands/Literals/Commands.cs b/src/NRedisStack/CoreCommands/Literals/Commands.cs index 7e969144..2b312449 100644 --- a/src/NRedisStack/CoreCommands/Literals/Commands.cs +++ b/src/NRedisStack/CoreCommands/Literals/Commands.cs @@ -15,5 +15,7 @@ internal static class RedisCoreCommands public const string BZPOPMIN = "BZPOPMIN"; public const string CLIENT = "CLIENT"; public const string SETINFO = "SETINFO"; + public const string XREAD = "XREAD"; + public const string XREADGROUP = "XREADGROUP"; } } diff --git a/src/NRedisStack/ResponseParser.cs b/src/NRedisStack/ResponseParser.cs index 58546d37..0be0b169 100644 --- a/src/NRedisStack/ResponseParser.cs +++ b/src/NRedisStack/ResponseParser.cs @@ -802,5 +802,56 @@ public static Dictionary[] ToDictionarys(this RedisResult r return new Tuple>(resultKey, values); } + + public static RedisStreamEntries[]? ToRedisStreamEntries(this RedisResult result) + { + if (result.IsNull) + { + return null; + } + + var resultArray = (RedisResult[])result!; + RedisStreamEntries[] redisStreamEntries = new RedisStreamEntries[resultArray.Length]; + for (int i = 0; i < resultArray.Length; i++) + { + RedisResult[] streamResultArray = (RedisResult[])resultArray[i]!; + RedisKey streamKey = streamResultArray[0].ToRedisKey(); + StreamEntry[] streamEntries = ParseStreamEntries(streamResultArray[1].ToArray()); + redisStreamEntries[i] = new RedisStreamEntries(streamKey, streamEntries); + } + + return redisStreamEntries; + } + + private static StreamEntry[] ParseStreamEntries(IReadOnlyList results) + { + int count = results.Count; + StreamEntry[] streamEntries = new StreamEntry[count]; + + for (int i = 0; i < count; i++) + { + RedisResult[] streamEntryArray = (RedisResult[])results[i]!; + RedisValue key = streamEntryArray[0].ToRedisValue(); + NameValueEntry[] nameValueEntries = ParseNameValueEntries(streamEntryArray[1].ToArray()); + streamEntries[i] = new StreamEntry(key, nameValueEntries); + } + + return streamEntries; + } + + private static NameValueEntry[] ParseNameValueEntries(IReadOnlyList redisResults) + { + int count = redisResults.Count / 2; + var nameValueEntries = new NameValueEntry[count]; + + for (int i = 0; i < count; i++) + { + nameValueEntries[i] = new NameValueEntry( + redisResults[2 * i].ToRedisValue(), + redisResults[2 * i + 1].ToRedisValue()); + } + + return nameValueEntries; + } } } \ No newline at end of file diff --git a/tests/NRedisStack.Tests/Core Commands/CoreTests.cs b/tests/NRedisStack.Tests/Core Commands/CoreTests.cs index dfad0471..126d10b4 100644 --- a/tests/NRedisStack.Tests/Core Commands/CoreTests.cs +++ b/tests/NRedisStack.Tests/Core Commands/CoreTests.cs @@ -656,4 +656,342 @@ public void TestBRPopLPush() Assert.Equal(3, db.ListLength("list-two")); Assert.Equal("b", db.ListLeftPop("list-two")); } + + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "5.0.0")] + public void TestXRead() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + db.StreamAdd("my-stream", "a", 1); + db.StreamAdd("my-stream", "b", 7); + + var result = db.XRead("my-stream", StreamSpecialIds.AllMessagesId, + count: 1, timeoutMilliseconds: 1000); + + Assert.NotNull(result); + Assert.Single(result); + + StreamEntry streamEntry = result![0]; + var lastKey = streamEntry.Id; + Assert.Single(streamEntry.Values); + Assert.Equal("a", streamEntry.Values[0].Name); + Assert.Equal(1, streamEntry.Values[0].Value); + + result = db.XRead("my-stream", lastKey, count: 1, timeoutMilliseconds: 1000); + + Assert.NotNull(result); + Assert.Single(result); + + streamEntry = result![0]; + Assert.Single(streamEntry.Values); + Assert.Equal("b", streamEntry.Values[0].Name); + Assert.Equal(7, streamEntry.Values[0].Value); + } + + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "5.0.0")] + public void TestXReadMultipleStreams() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + db.StreamAdd("stream-one", "a", 1); + db.StreamAdd("stream-one", "b", 7); + db.StreamAdd("stream-two", "c", "foo"); + db.StreamAdd("stream-two", "d", "bar"); + + var result = db.XRead(new RedisKey[] { "stream-one", "stream-two" }, + new RedisValue[] { StreamSpecialIds.AllMessagesId, StreamSpecialIds.AllMessagesId }, + count: 1, timeoutMilliseconds: 1000); + + Assert.NotNull(result); + Assert.Equal(2, result!.Length); + + Assert.Single(result![0].Entries); + var lastKeyOne = result![0].Entries[0].Id; + Assert.Single(result![0].Entries[0].Values); + Assert.Equal("a", result![0].Entries[0].Values[0].Name); + Assert.Equal(1, result![0].Entries[0].Values[0].Value); + + Assert.Single(result![1].Entries); + var lastKeyTwo = result![1].Entries[0].Id; + Assert.Single(result![1].Entries[0].Values); + Assert.Equal("c", result![1].Entries[0].Values[0].Name); + Assert.Equal("foo", result![1].Entries[0].Values[0].Value); + + result = db.XRead(new RedisKey[] { "stream-one", "stream-two" }, + new RedisValue[] { lastKeyOne, lastKeyTwo }, + count: 1, timeoutMilliseconds: 1000); + + Assert.NotNull(result); + Assert.Equal(2, result!.Length); + + Assert.Single(result![0].Entries); + Assert.Single(result![0].Entries[0].Values); + Assert.Equal("b", result![0].Entries[0].Values[0].Name); + Assert.Equal(7, result![0].Entries[0].Values[0].Value); + + Assert.Single(result![1].Entries); + Assert.Single(result![1].Entries[0].Values); + Assert.Equal("d", result![1].Entries[0].Values[0].Name); + Assert.Equal("bar", result![1].Entries[0].Values[0].Value); + } + + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "5.0.0")] + public void TestXReadOnlyNewMessages() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + db.StreamAdd("my-stream", "a", 1); + + // Reading only new messages will yield null, because we don't add any and the read times out. + var result = db.XRead("my-stream", StreamSpecialIds.NewMessagesId, + count: 1, timeoutMilliseconds: 500); + + Assert.Null(result); + } + + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "5.0.0")] + public void TestXReadNoKeysProvided() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + Assert.Throws(() => db.XRead(Array.Empty(), + new RedisValue[] { StreamSpecialIds.NewMessagesId })); + } + + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "5.0.0")] + public void TestXReadMismatchedKeysAndPositionsCountsProvided() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + Assert.Throws(() => db.XRead(new RedisKey[] { "my-stream" }, + new RedisValue[] { StreamSpecialIds.NewMessagesId, StreamSpecialIds.NewMessagesId })); + } + + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "5.0.0")] + public void TestXReadGroup() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + var groupCreationResult = db.StreamCreateConsumerGroup("my-stream", "my-group"); + Assert.True(groupCreationResult); + + db.StreamAdd("my-stream", "a", 1); + db.StreamAdd("my-stream", "b", 7); + db.StreamAdd("my-stream", "c", 11); + db.StreamAdd("my-stream", "d", 12); + + // Read one message by each consumer. + var result = db.XReadGroup("my-group", "consumer-a", + "my-stream", StreamSpecialIds.UndeliveredMessagesId, count: 1, timeoutMilliseconds: 1000); + + Assert.NotNull(result); + Assert.Single(result); + + var consumerAIdOne = result![0].Id; + Assert.Single(result[0].Values); + Assert.Equal("a", result![0].Values[0].Name); + Assert.Equal(1, result![0].Values[0].Value); + + result = db.XReadGroup("my-group", "consumer-b", + "my-stream", StreamSpecialIds.UndeliveredMessagesId, count: 1, timeoutMilliseconds: 1000); + + Assert.NotNull(result); + Assert.Single(result); + + var consumerBIdOne = result![0].Id; + Assert.Single(result[0].Values); + Assert.Equal("b", result![0].Values[0].Name); + Assert.Equal(7, result![0].Values[0].Value); + + // Read another message from each consumer, don't ACK anything. + result = db.XReadGroup("my-group", "consumer-a", + "my-stream", StreamSpecialIds.UndeliveredMessagesId, count: 1, timeoutMilliseconds: 1000); + + Assert.NotNull(result); + Assert.Single(result); + + var consumerAIdTwo = result![0].Id; + Assert.Single(result![0].Values); + Assert.Equal("c", result![0].Values[0].Name); + Assert.Equal(11, result![0].Values[0].Value); + + result = db.XReadGroup("my-group", "consumer-b", + "my-stream", StreamSpecialIds.UndeliveredMessagesId, count: 1, timeoutMilliseconds: 1000); + + Assert.NotNull(result); + Assert.Single(result); + + var consumerBIdTwo = result![0].Id; + Assert.Single(result![0].Values); + Assert.Equal("d", result![0].Values[0].Name); + Assert.Equal(12, result![0].Values[0].Value); + + // Since we didn't ACK anything, the pending messages can be re-read with the right ID. + result = db.XReadGroup("my-group", "consumer-a", + "my-stream", StreamSpecialIds.AllMessagesId, count: 1, timeoutMilliseconds: 1000); + + Assert.NotNull(result); + Assert.Single(result); + + Assert.Single(result![0].Values); + Assert.Equal("a", result![0].Values[0].Name); + Assert.Equal(1, result![0].Values[0].Value); + + result = db.XReadGroup("my-group", "consumer-b", + "my-stream", StreamSpecialIds.AllMessagesId, count: 1, timeoutMilliseconds: 1000); + + Assert.NotNull(result); + Assert.Single(result); + + Assert.Single(result![0].Values); + Assert.Equal("b", result![0].Values[0].Name); + Assert.Equal(7, result![0].Values[0].Value); + + // ACK the messages. + var ackedMessagesCount = db.StreamAcknowledge("my-stream", "my-group", + new[] { consumerAIdOne, consumerAIdTwo, consumerBIdOne, consumerBIdTwo }); + Assert.Equal(4, ackedMessagesCount); + + // After ACK we don't see anything pending. + result = db.XReadGroup("my-group", "consumer-a", + "my-stream", StreamSpecialIds.AllMessagesId, count: 1, timeoutMilliseconds: 1000); + + Assert.NotNull(result); + Assert.Empty(result); + + result = db.XReadGroup("my-group", "consumer-b", + "my-stream", StreamSpecialIds.AllMessagesId, count: 1, timeoutMilliseconds: 1000); + + Assert.NotNull(result); + Assert.Empty(result); + } + + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "5.0.0")] + public void TestXReadGroupNoAck() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + var groupCreationResult = db.StreamCreateConsumerGroup("my-stream", "my-group"); + Assert.True(groupCreationResult); + + db.StreamAdd("my-stream", "a", 1); + db.StreamAdd("my-stream", "b", 7); + db.StreamAdd("my-stream", "c", 11); + db.StreamAdd("my-stream", "d", 12); + + var result = db.XReadGroup("my-group", "consumer-a", + "my-stream", StreamSpecialIds.UndeliveredMessagesId, + count: 1, timeoutMilliseconds: 1000, noAck: true); + + Assert.NotNull(result); + Assert.Single(result); + + Assert.Single(result![0].Values); + Assert.Equal("a", result![0].Values[0].Name); + Assert.Equal(1, result![0].Values[0].Value); + + // We don't see anything pending because of the NOACK. + result = db.XReadGroup("my-group", "consumer-a", + "my-stream", StreamSpecialIds.AllMessagesId, count: 1, timeoutMilliseconds: 1000); + + Assert.NotNull(result); + Assert.Empty(result); + } + + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "5.0.0")] + public void TestXReadGroupMultipleStreams() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + var groupCreationResult = db.StreamCreateConsumerGroup("stream-one", "my-group"); + Assert.True(groupCreationResult); + + groupCreationResult = db.StreamCreateConsumerGroup("stream-two", "my-group"); + Assert.True(groupCreationResult); + + db.StreamAdd("stream-one", "a", 1); + db.StreamAdd("stream-two", "b", 7); + db.StreamAdd("stream-one", "c", 11); + db.StreamAdd("stream-two", "d", 17); + + var result = db.XReadGroup("my-group", "consumer-a", + new RedisKey[] { "stream-one", "stream-two" }, + new RedisValue[] { StreamSpecialIds.UndeliveredMessagesId, StreamSpecialIds.UndeliveredMessagesId }, + count: 1, timeoutMilliseconds: 1000); + + Assert.NotNull(result); + Assert.Equal(2, result!.Length); + + Assert.Single(result![0].Entries); + Assert.Single(result![0].Entries[0].Values); + Assert.Equal("a", result![0].Entries[0].Values[0].Name); + Assert.Equal(1, result![0].Entries[0].Values[0].Value); + + Assert.Single(result![1].Entries); + Assert.Single(result![1].Entries[0].Values); + Assert.Equal("b", result![1].Entries[0].Values[0].Name); + Assert.Equal(7, result![1].Entries[0].Values[0].Value); + + result = db.XReadGroup("my-group", "consumer-b", + new RedisKey[] { "stream-one", "stream-two" }, + new RedisValue[] { StreamSpecialIds.UndeliveredMessagesId, StreamSpecialIds.UndeliveredMessagesId }, + count: 1, timeoutMilliseconds: 1000); + + Assert.NotNull(result); + Assert.Equal(2, result!.Length); + + Assert.Single(result![0].Entries); + Assert.Single(result![0].Entries[0].Values); + Assert.Equal("c", result![0].Entries[0].Values[0].Name); + Assert.Equal(11, result![0].Entries[0].Values[0].Value); + + Assert.Single(result![1].Entries); + Assert.Single(result![1].Entries[0].Values); + Assert.Equal("d", result![1].Entries[0].Values[0].Name); + Assert.Equal(17, result![1].Entries[0].Values[0].Value); + } + + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "5.0.0")] + public void TestXReadGroupNull() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + var groupCreationResult = db.StreamCreateConsumerGroup("my-stream", "my-group"); + Assert.True(groupCreationResult); + + var result = db.XReadGroup("my-group", "consumer-a", + "my-stream", StreamSpecialIds.UndeliveredMessagesId, + count: 1, timeoutMilliseconds: 500); + + Assert.Null(result); + } + + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "5.0.0")] + public void TestXReadGroupNoKeysProvided() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + Assert.Throws(() => db.XReadGroup("my-group", "consumer", + Array.Empty(), new RedisValue[] { StreamSpecialIds.NewMessagesId })); + } + + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "5.0.0")] + public void TestXReadGroupMismatchedKeysAndPositionsCountsProvided() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + Assert.Throws(() => db.XReadGroup("my-group", "consumer", + new RedisKey[] { "my-stream" }, new RedisValue[] { StreamSpecialIds.NewMessagesId, StreamSpecialIds.NewMessagesId })); + } } \ No newline at end of file From 49150d1b43c62487622bf358be53e85f33bd2e14 Mon Sep 17 00:00:00 2001 From: Gabriel Erzse Date: Mon, 26 Feb 2024 12:40:11 +0200 Subject: [PATCH 2/3] Extend unit test coverage --- tests/NRedisStack.Tests/Core Commands/CoreTests.cs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/NRedisStack.Tests/Core Commands/CoreTests.cs b/tests/NRedisStack.Tests/Core Commands/CoreTests.cs index 126d10b4..2acc3bcb 100644 --- a/tests/NRedisStack.Tests/Core Commands/CoreTests.cs +++ b/tests/NRedisStack.Tests/Core Commands/CoreTests.cs @@ -707,12 +707,14 @@ public void TestXReadMultipleStreams() Assert.NotNull(result); Assert.Equal(2, result!.Length); + Assert.Equal("stream-one", result![0].Key); Assert.Single(result![0].Entries); var lastKeyOne = result![0].Entries[0].Id; Assert.Single(result![0].Entries[0].Values); Assert.Equal("a", result![0].Entries[0].Values[0].Name); Assert.Equal(1, result![0].Entries[0].Values[0].Value); + Assert.Equal("stream-two", result![1].Key); Assert.Single(result![1].Entries); var lastKeyTwo = result![1].Entries[0].Id; Assert.Single(result![1].Entries[0].Values); From 97af530a5018f62d3c4111b3074ded90c3dd4648 Mon Sep 17 00:00:00 2001 From: Gabriel Erzse Date: Mon, 26 Feb 2024 14:21:29 +0200 Subject: [PATCH 3/3] Async blocking commands Issues #232, #233, #234, #235, #237, #248, #249, #250, #251, #255 Add the async versions for the blocking commands: BZPOPMIN, BZPOPMAX, BZMPOP, BLMOVE, BLMPOP, BLPOP, BRPOP, BRPOPLPUSH, XREAD and XREADGROUP. --- src/NRedisStack/CoreCommands/CoreCommands.cs | 2 +- .../CoreCommands/CoreCommandsAsync.cs | 460 +++++++++++ .../Core Commands/CoreTests.cs | 756 +++++++++++++++++- 3 files changed, 1208 insertions(+), 10 deletions(-) diff --git a/src/NRedisStack/CoreCommands/CoreCommands.cs b/src/NRedisStack/CoreCommands/CoreCommands.cs index 7a2a5ae4..d8f5f361 100644 --- a/src/NRedisStack/CoreCommands/CoreCommands.cs +++ b/src/NRedisStack/CoreCommands/CoreCommands.cs @@ -313,7 +313,7 @@ public static bool ClientSetInfo(this IDatabase db, SetInfoAttr attr, string val } /// - /// Syntactic sugar for , + /// Syntactic sugar for , /// where only one key is used. /// /// The class where this extension method is applied. diff --git a/src/NRedisStack/CoreCommands/CoreCommandsAsync.cs b/src/NRedisStack/CoreCommands/CoreCommandsAsync.cs index c091159f..8ab5eb99 100644 --- a/src/NRedisStack/CoreCommands/CoreCommandsAsync.cs +++ b/src/NRedisStack/CoreCommands/CoreCommandsAsync.cs @@ -1,4 +1,5 @@ using NRedisStack.Core; +using NRedisStack.Core.DataTypes; using StackExchange.Redis; namespace NRedisStack { @@ -21,5 +22,464 @@ public static async Task ClientSetInfoAsync(this IDatabaseAsync db, SetInf } return (await db.ExecuteAsync(CoreCommandBuilder.ClientSetInfo(attr, value))).OKtoBoolean(); } + + /// + /// The BZMPOP command. + ///

+ /// Removes and returns up to entries from the first non-empty sorted set in + /// . If none of the sets contain elements, the call blocks on the server until elements + /// become available, or the given expires. A of 0 + /// means to wait indefinitely server-side. Returns null if the server timeout expires. + ///

+ /// When using this, pay attention to the timeout configured in the client, on the + /// , which by default can be too small: + /// + /// ConfigurationOptions configurationOptions = new ConfigurationOptions(); + /// configurationOptions.SyncTimeout = 120000; // set a meaningful value here + /// configurationOptions.EndPoints.Add("localhost"); + /// ConnectionMultiplexer redis = ConnectionMultiplexer.Connect(configurationOptions); + /// + /// If the connection multiplexer timeout expires in the client, a StackExchange.Redis.RedisTimeoutException + /// is thrown. + ///

+ /// This is an extension method added to the class, for convenience. + ///

+ /// The class where this extension method is applied. + /// Server-side timeout for the wait. A value of 0 means to wait indefinitely. + /// The keys to check. + /// Specify from which end of the sorted set to pop values. If set to MinMaxModifier.Min + /// then the minimum elements will be popped, otherwise the maximum values. + /// The maximum number of records to pop out. If set to null then the server default + /// will be used. + /// A collection of sorted set entries paired with their scores, together with the key they were popped + /// from, or null if the server timeout expires. + /// + public static async Task>?> BZMPopAsync(this IDatabase db, double timeout, RedisKey[] keys, MinMaxModifier minMaxModifier, long? count = null) + { + var command = CoreCommandBuilder.BZMPop(timeout, keys, minMaxModifier, count); + return (await db.ExecuteAsync(command)).ToSortedSetPopResults(); + } + + /// + /// Syntactic sugar for + /// , + /// where only one key is used. + /// + /// The class where this extension method is applied. + /// Server-side timeout for the wait. A value of 0 means to wait indefinitely. + /// The key to check. + /// Specify from which end of the sorted set to pop values. If set to MinMaxModifier.Min + /// then the minimum elements will be popped, otherwise the maximum values. + /// The maximum number of records to pop out. If set to null then the server default + /// will be used. + /// A collection of sorted set entries paired with their scores, together with the key they were popped + /// from, or null if the server timeout expires. + /// + public static async Task>?> BZMPopAsync(this IDatabase db, double timeout, RedisKey key, MinMaxModifier minMaxModifier, long? count = null) + { + return await BZMPopAsync(db, timeout, new[] { key }, minMaxModifier, count); + } + + /// + /// The BZPOPMIN command. + ///

+ /// Removes and returns the entry with the smallest score from the first non-empty sorted set in + /// . If none of the sets contain elements, the call blocks on the server until elements + /// become available, or the given expires. A of 0 + /// means to wait indefinitely server-side. Returns null if the server timeout expires. + ///

+ /// When using this, pay attention to the timeout configured in the client, on the + /// , which by default can be too small: + /// + /// ConfigurationOptions configurationOptions = new ConfigurationOptions(); + /// configurationOptions.SyncTimeout = 120000; // set a meaningful value here + /// configurationOptions.EndPoints.Add("localhost"); + /// ConnectionMultiplexer redis = ConnectionMultiplexer.Connect(configurationOptions); + /// + /// If the connection multiplexer timeout expires in the client, a StackExchange.Redis.RedisTimeoutException + /// is thrown. + ///

+ /// This is an extension method added to the class, for convenience. + ///

+ /// The class where this extension method is applied. + /// The keys to check. + /// Server-side timeout for the wait. A value of 0 means to wait indefinitely. + /// A sorted set entry paired with its score, together with the key it was popped from, or null + /// if the server timeout expires. + /// + public static async Task?> BZPopMinAsync(this IDatabase db, RedisKey[] keys, double timeout) + { + var command = CoreCommandBuilder.BZPopMin(keys, timeout); + return (await db.ExecuteAsync(command)).ToSortedSetPopResult(); + } + + /// + /// Syntactic sugar for , + /// where only one key is used. + /// + /// The class where this extension method is applied. + /// The key to check. + /// Server-side timeout for the wait. A value of 0 means to wait indefinitely. + /// A sorted set entry paired with its score, together with the key it was popped from, or null + /// if the server timeout expires. + /// + public static async Task?> BZPopMinAsync(this IDatabase db, RedisKey key, double timeout) + { + return await BZPopMinAsync(db, new[] { key }, timeout); + } + + + /// + /// The BZPOPMAX command. + ///

+ /// Removes and returns the entry with the highest score from the first non-empty sorted set in + /// . If none of the sets contain elements, the call blocks on the server until elements + /// become available, or the given expires. A of 0 + /// means to wait indefinitely server-side. Returns null if the server timeout expires. + ///

+ /// When using this, pay attention to the timeout configured in the client, on the + /// , which by default can be too small: + /// + /// ConfigurationOptions configurationOptions = new ConfigurationOptions(); + /// configurationOptions.SyncTimeout = 120000; // set a meaningful value here + /// configurationOptions.EndPoints.Add("localhost"); + /// ConnectionMultiplexer redis = ConnectionMultiplexer.Connect(configurationOptions); + /// + /// If the connection multiplexer timeout expires in the client, a StackExchange.Redis.RedisTimeoutException + /// is thrown. + ///

+ /// This is an extension method added to the class, for convenience. + ///

+ /// The class where this extension method is applied. + /// The keys to check. + /// Server-side timeout for the wait. A value of 0 means to wait indefinitely. + /// A sorted set entry paired with its score, together with the key it was popped from, or null + /// if the server timeout expires. + /// + public static async Task?> BZPopMaxAsync(this IDatabase db, RedisKey[] keys, double timeout) + { + var command = CoreCommandBuilder.BZPopMax(keys, timeout); + return (await db.ExecuteAsync(command)).ToSortedSetPopResult(); + } + + /// + /// Syntactic sugar for , + /// where only one key is used. + /// + /// The class where this extension method is applied. + /// The key to check. + /// Server-side timeout for the wait. A value of 0 means to wait indefinitely. + /// A sorted set entry paired with its score, together with the key it was popped from, or null + /// if the server timeout expires. + /// + public static async Task?> BZPopMaxAsync(this IDatabase db, RedisKey key, double timeout) + { + return await BZPopMaxAsync(db, new[] { key }, timeout); + } + + /// + /// The BLMPOP command. + ///

+ /// Removes and returns up to entries from the first non-empty list in + /// . If none of the lists contain elements, the call blocks on the server until elements + /// become available, or the given expires. A of 0 + /// means to wait indefinitely server-side. Returns null if the server timeout expires. + ///

+ /// When using this, pay attention to the timeout configured in the client, on the + /// , which by default can be too small: + /// + /// ConfigurationOptions configurationOptions = new ConfigurationOptions(); + /// configurationOptions.SyncTimeout = 120000; // set a meaningful value here + /// configurationOptions.EndPoints.Add("localhost"); + /// ConnectionMultiplexer redis = ConnectionMultiplexer.Connect(configurationOptions); + /// + /// If the connection multiplexer timeout expires in the client, a StackExchange.Redis.RedisTimeoutException + /// is thrown. + ///

+ /// This is an extension method added to the class, for convenience. + ///

+ /// The class where this extension method is applied. + /// Server-side timeout for the wait. A value of 0 means to wait indefinitely. + /// The keys to check. + /// Specify from which end of the list to pop values: left or right. + /// The maximum number of records to pop. If set to null then the server default + /// will be used. + /// A collection of values, together with the key they were popped from, or null if the + /// server timeout expires. + /// + public static async Task>?> BLMPopAsync(this IDatabase db, double timeout, RedisKey[] keys, ListSide listSide, long? count = null) + { + var command = CoreCommandBuilder.BLMPop(timeout, keys, listSide, count); + return (await db.ExecuteAsync(command)).ToListPopResults(); + } + + /// + /// Syntactic sugar for + /// , + /// where only one key is used. + /// + /// The class where this extension method is applied. + /// Server-side timeout for the wait. A value of 0 means to wait indefinitely. + /// The key to check. + /// Specify from which end of the list to pop values: left or right. + /// The maximum number of records to pop. If set to null then the server default + /// will be used. + /// A collection of values, together with the key they were popped from, or null if the + /// server timeout expires. + /// + public static async Task>?> BLMPopAsync(this IDatabase db, double timeout, RedisKey key, ListSide listSide, long? count = null) + { + return await BLMPopAsync(db, timeout, new[] { key }, listSide, count); + } + + /// + /// The BLPOP command. + ///

+ /// Removes and returns an entry from the head (left side) of the first non-empty list in . + /// If none of the lists contain elements, the call blocks on the server until elements + /// become available, or the given expires. A of 0 + /// means to wait indefinitely server-side. Returns null if the server timeout expires. + ///

+ /// When using this, pay attention to the timeout configured in the client, on the + /// , which by default can be too small: + /// + /// ConfigurationOptions configurationOptions = new ConfigurationOptions(); + /// configurationOptions.SyncTimeout = 120000; // set a meaningful value here + /// configurationOptions.EndPoints.Add("localhost"); + /// ConnectionMultiplexer redis = ConnectionMultiplexer.Connect(configurationOptions); + /// + /// If the connection multiplexer timeout expires in the client, a StackExchange.Redis.RedisTimeoutException + /// is thrown. + ///

+ /// This is an extension method added to the class, for convenience. + ///

+ /// The class where this extension method is applied. + /// The keys to check. + /// Server-side timeout for the wait. A value of 0 means to wait indefinitely. + /// A value, together with the key it was popped from, or null if the server timeout + /// expires. + /// + public static async Task?> BLPopAsync(this IDatabase db, RedisKey[] keys, double timeout) + { + var command = CoreCommandBuilder.BLPop(keys, timeout); + return (await db.ExecuteAsync(command)).ToListPopResult(); + } + + /// + /// Syntactic sugar for , + /// where only one key is used. + /// + /// The class where this extension method is applied. + /// The key to check. + /// Server-side timeout for the wait. A value of 0 means to wait indefinitely. + /// A value, together with the key it was popped from, or null if the server timeout + /// expires. + /// + public static async Task?> BLPopAsync(this IDatabase db, RedisKey key, double timeout) + { + return await BLPopAsync(db, new[] { key }, timeout); + } + + /// + /// The BRPOP command. + ///

+ /// Removes and returns an entry from the tail (right side) of the first non-empty list in . + /// If none of the lists contain elements, the call blocks on the server until elements + /// become available, or the given expires. A of 0 + /// means to wait indefinitely server-side. Returns null if the server timeout expires. + ///

+ /// When using this, pay attention to the timeout configured in the client, on the + /// , which by default can be too small: + /// + /// ConfigurationOptions configurationOptions = new ConfigurationOptions(); + /// configurationOptions.SyncTimeout = 120000; // set a meaningful value here + /// configurationOptions.EndPoints.Add("localhost"); + /// ConnectionMultiplexer redis = ConnectionMultiplexer.Connect(configurationOptions); + /// + /// If the connection multiplexer timeout expires in the client, a StackExchange.Redis.RedisTimeoutException + /// is thrown. + ///

+ /// This is an extension method added to the class, for convenience. + ///

+ /// The class where this extension method is applied. + /// The keys to check. + /// Server-side timeout for the wait. A value of 0 means to wait indefinitely. + /// A value, together with the key it was popped from, or null if the server timeout + /// expires. + /// + public static async Task?> BRPopAsync(this IDatabase db, RedisKey[] keys, double timeout) + { + var command = CoreCommandBuilder.BRPop(keys, timeout); + return (await db.ExecuteAsync(command)).ToListPopResult(); + } + + /// + /// Syntactic sugar for , + /// where only one key is used. + /// + /// The class where this extension method is applied. + /// The key to check. + /// Server-side timeout for the wait. A value of 0 means to wait indefinitely. + /// A value, together with the key it was popped from, or null if the server timeout + /// expires. + /// + public static async Task?> BRPopAsync(this IDatabase db, RedisKey key, double timeout) + { + return await BRPopAsync(db, new[] { key }, timeout); + } + + /// + /// The BLMOVE command. + ///

+ /// Atomically returns and removes the first or last element of the list stored at + /// (depending on the value of ), and pushes the element as the first or last + /// element of the list stored at (depending on the value of + /// ). + ///

+ /// This is an extension method added to the class, for convenience. + ///

+ /// The class where this extension method is applied. + /// The key of the source list. + /// The key of the destination list. + /// What side of the list to remove from. + /// What side of the list to move to. + /// Server-side timeout for the wait. A value of 0 means to wait indefinitely. + /// The element being popped and pushed, or null if the server timeout expires. + /// + public static async Task BLMoveAsync(this IDatabase db, RedisKey source, RedisKey destination, ListSide sourceSide, ListSide destinationSide, double timeout) + { + var command = CoreCommandBuilder.BLMove(source, destination, sourceSide, destinationSide, timeout); + return (await db.ExecuteAsync(command)).ToRedisValue(); + } + + /// + /// The BRPOPLPUSH command. + ///

+ /// Atomically returns and removes the last element (tail) of the list stored at source, and pushes the element + /// at the first element (head) of the list stored at destination. + ///

+ /// This is an extension method added to the class, for convenience. + ///

+ /// The class where this extension method is applied. + /// The key of the source list. + /// The key of the destination list. + /// Server-side timeout for the wait. A value of 0 means to wait indefinitely. + /// The element being popped and pushed, or null if the server timeout expires. + /// + public static async Task BRPopLPushAsync(this IDatabase db, RedisKey source, RedisKey destination, double timeout) + { + var command = CoreCommandBuilder.BRPopLPush(source, destination, timeout); + return (await db.ExecuteAsync(command)).ToRedisValue(); + } + + /// + /// The XREAD command. + /// + /// Read data from one or multiple streams, only returning entries with an ID greater than an ID provided by the caller. + /// + /// The class where this extension method is applied. + /// Keys of the streams where to read from. + /// The positions from which to begin reading for each stream. See + /// for special Ids that can be used. + /// The maximum number of messages to return from each stream. + /// Amount of time in milliseconds to block in case all the streams are empty. + /// If not provided, or set to null then the read does not block. If set to 0 then it blocks indefinitely. + /// A value of for each stream, or null if the command times out + /// on the server. + /// + /// This is the blocking alternative for . + /// + /// + public static async Task XReadAsync(this IDatabase db, RedisKey[] keys, RedisValue[] positions, int? count = null, int? timeoutMilliseconds = null) + { + var command = CoreCommandBuilder.XRead(keys, positions, count, timeoutMilliseconds); + return (await db.ExecuteAsync(command)).ToRedisStreamEntries(); + } + + /// + /// Syntactic sugar for , + /// where only one stream is being read from. + /// + /// The class where this extension method is applied. + /// Key of the stream where to read from. + /// The position from which to begin reading. See + /// for special Ids that can be used. + /// The maximum number of messages to return from each stream. + /// Amount of time in milliseconds to block in case all the streams are empty. + /// If not provided, or set to null then the read does not block. If set to 0 then it blocks indefinitely. + /// A list with the data read from the stream, of null if the command + /// times out on the server. + /// + /// This is the blocking alternative for . + /// + /// + public static async Task XReadAsync(this IDatabase db, RedisKey key, RedisValue position, int? count = null, int? timeoutMilliseconds = null) + { + var result = await XReadAsync(db, new[] { key }, new[] { position }, count, timeoutMilliseconds); + if (result == null || result.Length == 0) + { + return null; + } + return result[0].Entries; + } + + /// + /// The XREADGROUP command. + /// + /// Read new or historical messages in one or several streams, for a consumer in a consumer group. + /// + /// The class where this extension method is applied. + /// The consumer group name. + /// The name of the consumer in the consumer group. + /// Keys of the streams where to read from. + /// The positions from which to begin reading for each stream. See + /// for special Ids that can be used. + /// The maximum number of messages to return from each stream. + /// Amount of time in milliseconds to block in case all the streams are empty. + /// If not provided, or set to null then the read does not block. If set to 0 then it blocks indefinitely. + /// If set to true then inform the server that it should not wait for ACK for the + /// messages it sends to this read call. + /// A value of for each stream, or null if the command times out + /// on the server. + /// + /// This is the blocking alternative for . + /// + /// + public static async Task XReadGroupAsync(this IDatabase db, RedisValue groupName, RedisValue consumerName, RedisKey[] keys, RedisValue[] positions, int? count = null, int? timeoutMilliseconds = null, bool? noAck = null) + { + var command = CoreCommandBuilder.XReadGroup(groupName, consumerName, keys, positions, count, timeoutMilliseconds, noAck); + return (await db.ExecuteAsync(command)).ToRedisStreamEntries(); + } + + /// + /// Syntactic sugar for , + /// where only one stream is being read from. + /// + /// The class where this extension method is applied. + /// The consumer group name. + /// The name of the consumer in the consumer group. + /// Key of the stream where to read from. + /// The position from which to begin reading. See + /// for special Ids that can be used. + /// The maximum number of messages to return from each stream. + /// Amount of time in milliseconds to block in case all the streams are empty. + /// If not provided, or set to null then the read does not block. If set to 0 then it blocks indefinitely. + /// If set to true then inform the server that it should not wait for ACK for the + /// messages it sends to this read call. + /// A list with the data read from the stream, of null if the command + /// times out on the server. + /// + /// This is the blocking alternative for . + /// + /// + public static async Task XReadGroupAsync(this IDatabase db, RedisValue groupName, RedisValue consumerName, RedisKey key, RedisValue position, int? count = null, int? timeoutMilliseconds = null, bool? noAck = null) + { + var result = await XReadGroupAsync(db, groupName, consumerName, new[] { key }, new[] { position }, count, timeoutMilliseconds, noAck); + if (result == null || result.Length == 0) + { + return null; + } + return result[0].Entries; + } } } diff --git a/tests/NRedisStack.Tests/Core Commands/CoreTests.cs b/tests/NRedisStack.Tests/Core Commands/CoreTests.cs index 2acc3bcb..45856478 100644 --- a/tests/NRedisStack.Tests/Core Commands/CoreTests.cs +++ b/tests/NRedisStack.Tests/Core Commands/CoreTests.cs @@ -169,6 +169,38 @@ public void TestBZMPop() Assert.Equal("d", resultWithDescendingOrder.Item2[0].Value.ToString()); } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "7.0.0")] + public async Task TestBZMPopAsync() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + var sortedSetKey = "my-set"; + + db.SortedSetAdd(sortedSetKey, "a", 1.5); + db.SortedSetAdd(sortedSetKey, "b", 5.1); + db.SortedSetAdd(sortedSetKey, "c", 3.7); + db.SortedSetAdd(sortedSetKey, "d", 9.4); + db.SortedSetAdd(sortedSetKey, "e", 7.76); + + // Pop two items with Min modifier, which means it will pop the minimum values. + var resultWithDefaultOrder = await db.BZMPopAsync(0, sortedSetKey, MinMaxModifier.Min, 2); + + Assert.NotNull(resultWithDefaultOrder); + Assert.Equal(sortedSetKey, resultWithDefaultOrder!.Item1); + Assert.Equal(2, resultWithDefaultOrder.Item2.Count); + Assert.Equal("a", resultWithDefaultOrder.Item2[0].Value.ToString()); + Assert.Equal("c", resultWithDefaultOrder.Item2[1].Value.ToString()); + + // Pop one more item, with Max modifier, which means it will pop the maximum value. + var resultWithDescendingOrder = await db.BZMPopAsync(0, sortedSetKey, MinMaxModifier.Max, 1); + + Assert.NotNull(resultWithDescendingOrder); + Assert.Equal(sortedSetKey, resultWithDescendingOrder!.Item1); + Assert.Single(resultWithDescendingOrder.Item2); + Assert.Equal("d", resultWithDescendingOrder.Item2[0].Value.ToString()); + } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "7.0.0")] public void TestBZMPopNull() { @@ -181,6 +213,18 @@ public void TestBZMPopNull() Assert.Null(result); } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "7.0.0")] + public async Task TestBZMPopNullAsync() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + // Nothing in the set, and a short server timeout, which yields null. + var result = await db.BZMPopAsync(0.5, "my-set", MinMaxModifier.Min, null); + + Assert.Null(result); + } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "7.0.0")] public void TestBZMPopMultiplexerTimeout() { @@ -196,6 +240,21 @@ public void TestBZMPopMultiplexerTimeout() Assert.Throws(() => db.BZMPop(0, "my-set", MinMaxModifier.Min)); } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "7.0.0")] + public async Task TestBZMPopMultiplexerTimeoutAsync() + { + var configurationOptions = new ConfigurationOptions(); + configurationOptions.SyncTimeout = 1000; + + await using var redis = redisFixture.CustomRedis(configurationOptions, out _); + + var db = redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + // Server would wait forever, but the multiplexer times out in 1 second. + await Assert.ThrowsAsync(async () => await db.BZMPopAsync(0, "my-set", MinMaxModifier.Min)); + } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "7.0.0")] public void TestBZMPopMultipleSets() { @@ -238,6 +297,48 @@ public void TestBZMPopMultipleSets() Assert.Equal("c", result.Item2[1].Value.ToString()); } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "7.0.0")] + public async Task TestBZMPopMultipleSetsAsync() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + db.SortedSetAdd("set-one", "a", 1.5); + db.SortedSetAdd("set-one", "b", 5.1); + db.SortedSetAdd("set-one", "c", 3.7); + db.SortedSetAdd("set-two", "d", 9.4); + db.SortedSetAdd("set-two", "e", 7.76); + + var result = await db.BZMPopAsync(0, "set-two", MinMaxModifier.Max); + + Assert.NotNull(result); + Assert.Equal("set-two", result!.Item1); + Assert.Single(result.Item2); + Assert.Equal("d", result.Item2[0].Value.ToString()); + + result = await db.BZMPopAsync(0, new[] { new RedisKey("set-two"), new RedisKey("set-one") }, MinMaxModifier.Min); + + Assert.NotNull(result); + Assert.Equal("set-two", result!.Item1); + Assert.Single(result.Item2); + Assert.Equal("e", result.Item2[0].Value.ToString()); + + result = await db.BZMPopAsync(0, new[] { new RedisKey("set-two"), new RedisKey("set-one") }, MinMaxModifier.Max); + + Assert.NotNull(result); + Assert.Equal("set-one", result!.Item1); + Assert.Single(result.Item2); + Assert.Equal("b", result.Item2[0].Value.ToString()); + + result = await db.BZMPopAsync(0, "set-one", MinMaxModifier.Min, count: 2); + + Assert.NotNull(result); + Assert.Equal("set-one", result!.Item1); + Assert.Equal(2, result.Item2.Count); + Assert.Equal("a", result.Item2[0].Value.ToString()); + Assert.Equal("c", result.Item2[1].Value.ToString()); + } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "7.0.0")] public void TestBZMPopNoKeysProvided() { @@ -295,6 +396,25 @@ public void TestBZPopMin() Assert.Equal(1.5, result.Item2.Score); } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "5.0.0")] + public async Task TestBZPopMinAsync() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + var sortedSetKey = "my-set"; + + db.SortedSetAdd(sortedSetKey, "a", 1.5); + db.SortedSetAdd(sortedSetKey, "b", 5.1); + + var result = await db.BZPopMinAsync(sortedSetKey, 0); + + Assert.NotNull(result); + Assert.Equal(sortedSetKey, result!.Item1); + Assert.Equal("a", result.Item2.Value.ToString()); + Assert.Equal(1.5, result.Item2.Score); + } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "5.0.0")] public void TestBZPopMinNull() { @@ -307,6 +427,18 @@ public void TestBZPopMinNull() Assert.Null(result); } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "5.0.0")] + public async Task TestBZPopMinNullAsync() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + // Nothing in the set, and a short server timeout, which yields null. + var result = await db.BZPopMinAsync("my-set", 0.5); + + Assert.Null(result); + } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "5.0.0")] public void TestBZPopMinMultipleSets() { @@ -330,6 +462,29 @@ public void TestBZPopMinMultipleSets() Assert.Equal("a", result.Item2.Value.ToString()); } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "5.0.0")] + public async Task TestBZPopMinMultipleSetsAsync() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + db.SortedSetAdd("set-one", "a", 1.5); + db.SortedSetAdd("set-one", "b", 5.1); + db.SortedSetAdd("set-two", "e", 7.76); + + var result = await db.BZPopMinAsync(new[] { new RedisKey("set-two"), new RedisKey("set-one") }, 0); + + Assert.NotNull(result); + Assert.Equal("set-two", result!.Item1); + Assert.Equal("e", result.Item2.Value.ToString()); + + result = await db.BZPopMinAsync(new[] { new RedisKey("set-two"), new RedisKey("set-one") }, 0); + + Assert.NotNull(result); + Assert.Equal("set-one", result!.Item1); + Assert.Equal("a", result.Item2.Value.ToString()); + } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "5.0.0")] public void TestBZPopMax() { @@ -349,6 +504,25 @@ public void TestBZPopMax() Assert.Equal(5.1, result.Item2.Score); } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "5.0.0")] + public async Task TestBZPopMaxAsync() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + var sortedSetKey = "my-set"; + + db.SortedSetAdd(sortedSetKey, "a", 1.5); + db.SortedSetAdd(sortedSetKey, "b", 5.1); + + var result = await db.BZPopMaxAsync(sortedSetKey, 0); + + Assert.NotNull(result); + Assert.Equal(sortedSetKey, result!.Item1); + Assert.Equal("b", result.Item2.Value.ToString()); + Assert.Equal(5.1, result.Item2.Score); + } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "5.0.0")] public void TestBZPopMaxNull() { @@ -361,6 +535,18 @@ public void TestBZPopMaxNull() Assert.Null(result); } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "5.0.0")] + public async Task TestBZPopMaxNullAsync() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + // Nothing in the set, and a short server timeout, which yields null. + var result = await db.BZPopMaxAsync("my-set", 0.5); + + Assert.Null(result); + } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "5.0.0")] public void TestBZPopMaxMultipleSets() { @@ -384,6 +570,29 @@ public void TestBZPopMaxMultipleSets() Assert.Equal("b", result.Item2.Value.ToString()); } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "5.0.0")] + public async Task TestBZPopMaxMultipleSetsAsync() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + db.SortedSetAdd("set-one", "a", 1.5); + db.SortedSetAdd("set-one", "b", 5.1); + db.SortedSetAdd("set-two", "e", 7.76); + + var result = await db.BZPopMaxAsync(new[] { new RedisKey("set-two"), new RedisKey("set-one") }, 0); + + Assert.NotNull(result); + Assert.Equal("set-two", result!.Item1); + Assert.Equal("e", result.Item2.Value.ToString()); + + result = await db.BZPopMaxAsync(new[] { new RedisKey("set-two"), new RedisKey("set-one") }, 0); + + Assert.NotNull(result); + Assert.Equal("set-one", result!.Item1); + Assert.Equal("b", result.Item2.Value.ToString()); + } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "7.0.0")] public void TestBLMPop() { @@ -414,6 +623,36 @@ public void TestBLMPop() Assert.Equal("e", resultWithDescendingOrder.Item2[0].ToString()); } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "7.0.0")] + public async Task TestBLMPopAsync() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + db.ListRightPush("my-list", "a"); + db.ListRightPush("my-list", "b"); + db.ListRightPush("my-list", "c"); + db.ListRightPush("my-list", "d"); + db.ListRightPush("my-list", "e"); + + // Pop two items from the left side. + var resultWithDefaultOrder = await db.BLMPopAsync(0, "my-list", ListSide.Left, 2); + + Assert.NotNull(resultWithDefaultOrder); + Assert.Equal("my-list", resultWithDefaultOrder!.Item1); + Assert.Equal(2, resultWithDefaultOrder.Item2.Count); + Assert.Equal("a", resultWithDefaultOrder.Item2[0].ToString()); + Assert.Equal("b", resultWithDefaultOrder.Item2[1].ToString()); + + // Pop one more item, from the right side. + var resultWithDescendingOrder = await db.BLMPopAsync(0, "my-list", ListSide.Right, 1); + + Assert.NotNull(resultWithDescendingOrder); + Assert.Equal("my-list", resultWithDescendingOrder!.Item1); + Assert.Single(resultWithDescendingOrder.Item2); + Assert.Equal("e", resultWithDescendingOrder.Item2[0].ToString()); + } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "7.0.0")] public void TestBLMPopNull() { @@ -426,6 +665,18 @@ public void TestBLMPopNull() Assert.Null(result); } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "7.0.0")] + public async Task TestBLMPopNullAsync() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + // Nothing in the list, and a short server timeout, which yields null. + var result = await db.BLMPopAsync(0.5, "my-list", ListSide.Left); + + Assert.Null(result); + } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "7.0.0")] public void TestBLMPopMultipleLists() { @@ -468,6 +719,48 @@ public void TestBLMPopMultipleLists() Assert.Equal("b", result.Item2[1].ToString()); } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "7.0.0")] + public async Task TestBLMPopMultipleListsAsync() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + db.ListRightPush("list-one", "a"); + db.ListRightPush("list-one", "b"); + db.ListRightPush("list-one", "c"); + db.ListRightPush("list-two", "d"); + db.ListRightPush("list-two", "e"); + + var result = await db.BLMPopAsync(0, "list-two", ListSide.Right); + + Assert.NotNull(result); + Assert.Equal("list-two", result!.Item1); + Assert.Single(result.Item2); + Assert.Equal("e", result.Item2[0].ToString()); + + result = await db.BLMPopAsync(0, new[] { new RedisKey("list-two"), new RedisKey("list-one") }, ListSide.Left); + + Assert.NotNull(result); + Assert.Equal("list-two", result!.Item1); + Assert.Single(result.Item2); + Assert.Equal("d", result.Item2[0].ToString()); + + result = await db.BLMPopAsync(0, new[] { new RedisKey("list-two"), new RedisKey("list-one") }, ListSide.Right); + + Assert.NotNull(result); + Assert.Equal("list-one", result!.Item1); + Assert.Single(result.Item2); + Assert.Equal("c", result.Item2[0].ToString()); + + result = await db.BLMPopAsync(0, "list-one", ListSide.Left, count: 2); + + Assert.NotNull(result); + Assert.Equal("list-one", result!.Item1); + Assert.Equal(2, result.Item2.Count); + Assert.Equal("a", result.Item2[0].ToString()); + Assert.Equal("b", result.Item2[1].ToString()); + } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "7.0.0")] public void TestBLMPopNoKeysProvided() { @@ -493,6 +786,22 @@ public void TestBLPop() Assert.Equal("a", result.Item2.ToString()); } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "2.0.0")] + public async Task TestBLPopAsync() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + db.ListRightPush("my-list", "a"); + db.ListRightPush("my-list", "b"); + + var result = await db.BLPopAsync("my-list", 0); + + Assert.NotNull(result); + Assert.Equal("my-list", result!.Item1); + Assert.Equal("a", result.Item2.ToString()); + } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "2.0.0")] public void TestBLPopNull() { @@ -505,6 +814,18 @@ public void TestBLPopNull() Assert.Null(result); } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "2.0.0")] + public async Task TestBLPopNullAsync() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + // Nothing in the set, and a short server timeout, which yields null. + var result = await db.BLPopAsync("my-set", 0.5); + + Assert.Null(result); + } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "2.0.0")] public void TestBLPopMultipleLists() { @@ -528,6 +849,29 @@ public void TestBLPopMultipleLists() Assert.Equal("a", result.Item2.ToString()); } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "2.0.0")] + public async Task TestBLPopMultipleListsAsync() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + db.ListRightPush("list-one", "a"); + db.ListRightPush("list-one", "b"); + db.ListRightPush("list-two", "e"); + + var result = await db.BLPopAsync(new[] { new RedisKey("list-two"), new RedisKey("list-one") }, 0); + + Assert.NotNull(result); + Assert.Equal("list-two", result!.Item1); + Assert.Equal("e", result.Item2.ToString()); + + result = await db.BLPopAsync(new[] { new RedisKey("list-two"), new RedisKey("list-one") }, 0); + + Assert.NotNull(result); + Assert.Equal("list-one", result!.Item1); + Assert.Equal("a", result.Item2.ToString()); + } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "2.0.0")] public void TestBRPop() { @@ -544,6 +888,22 @@ public void TestBRPop() Assert.Equal("b", result.Item2.ToString()); } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "2.0.0")] + public async Task TestBRPopAsync() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + db.ListRightPush("my-list", "a"); + db.ListRightPush("my-list", "b"); + + var result = await db.BRPopAsync("my-list", 0); + + Assert.NotNull(result); + Assert.Equal("my-list", result!.Item1); + Assert.Equal("b", result.Item2.ToString()); + } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "2.0.0")] public void TestBRPopNull() { @@ -556,6 +916,18 @@ public void TestBRPopNull() Assert.Null(result); } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "2.0.0")] + public async Task TestBRPopNullAsync() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + // Nothing in the set, and a short server timeout, which yields null. + var result = await db.BRPopAsync("my-set", 0.5); + + Assert.Null(result); + } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "2.0.0")] public void TestBRPopMultipleLists() { @@ -579,6 +951,29 @@ public void TestBRPopMultipleLists() Assert.Equal("b", result.Item2.ToString()); } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "2.0.0")] + public async Task TestBRPopMultipleListsAsync() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + db.ListRightPush("list-one", "a"); + db.ListRightPush("list-one", "b"); + db.ListRightPush("list-two", "e"); + + var result = await db.BRPopAsync(new[] { new RedisKey("list-two"), new RedisKey("list-one") }, 0); + + Assert.NotNull(result); + Assert.Equal("list-two", result!.Item1); + Assert.Equal("e", result.Item2.ToString()); + + result = await db.BRPopAsync(new[] { new RedisKey("list-two"), new RedisKey("list-one") }, 0); + + Assert.NotNull(result); + Assert.Equal("list-one", result!.Item1); + Assert.Equal("b", result.Item2.ToString()); + } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "6.2.0")] public void TestBLMove() { @@ -626,18 +1021,96 @@ public void TestBLMove() result = db.BLMove("list-two", "list-one", ListSide.Right, ListSide.Right, 0); Assert.NotNull(result); - Assert.Equal("d", result!); + Assert.Equal("d", result!); + + Assert.Equal(2, db.ListLength("list-one")); + Assert.Equal("b", db.ListGetByIndex("list-one", 0)); + Assert.Equal("d", db.ListGetByIndex("list-one", 1)); + Assert.Equal(2, db.ListLength("list-two")); + Assert.Equal("a", db.ListGetByIndex("list-two", 0)); + Assert.Equal("c", db.ListGetByIndex("list-two", 1)); + } + + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "6.2.0")] + public async Task TestBLMoveAsync() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + db.ListRightPush("list-one", "a"); + db.ListRightPush("list-one", "b"); + + db.ListRightPush("list-two", "c"); + db.ListRightPush("list-two", "d"); + + var result = await db.BLMoveAsync("list-one", "list-two", ListSide.Right, ListSide.Left, 0); + Assert.NotNull(result); + Assert.Equal("b", result!); + + Assert.Equal(1, db.ListLength("list-one")); + Assert.Equal("a", db.ListGetByIndex("list-one", 0)); + Assert.Equal(3, db.ListLength("list-two")); + Assert.Equal("b", db.ListGetByIndex("list-two", 0)); + Assert.Equal("c", db.ListGetByIndex("list-two", 1)); + Assert.Equal("d", db.ListGetByIndex("list-two", 2)); + + result = await db.BLMoveAsync("list-two", "list-one", ListSide.Left, ListSide.Right, 0); + Assert.NotNull(result); + Assert.Equal("b", result!); + + Assert.Equal(2, db.ListLength("list-one")); + Assert.Equal("a", db.ListGetByIndex("list-one", 0)); + Assert.Equal("b", db.ListGetByIndex("list-one", 1)); + Assert.Equal(2, db.ListLength("list-two")); + Assert.Equal("c", db.ListGetByIndex("list-two", 0)); + Assert.Equal("d", db.ListGetByIndex("list-two", 1)); + + result = await db.BLMoveAsync("list-one", "list-two", ListSide.Left, ListSide.Left, 0); + Assert.NotNull(result); + Assert.Equal("a", result!); + + Assert.Equal(1, db.ListLength("list-one")); + Assert.Equal("b", db.ListGetByIndex("list-one", 0)); + Assert.Equal(3, db.ListLength("list-two")); + Assert.Equal("a", db.ListGetByIndex("list-two", 0)); + Assert.Equal("c", db.ListGetByIndex("list-two", 1)); + Assert.Equal("d", db.ListGetByIndex("list-two", 2)); + + result = await db.BLMoveAsync("list-two", "list-one", ListSide.Right, ListSide.Right, 0); + Assert.NotNull(result); + Assert.Equal("d", result!); + + Assert.Equal(2, db.ListLength("list-one")); + Assert.Equal("b", db.ListGetByIndex("list-one", 0)); + Assert.Equal("d", db.ListGetByIndex("list-one", 1)); + Assert.Equal(2, db.ListLength("list-two")); + Assert.Equal("a", db.ListGetByIndex("list-two", 0)); + Assert.Equal("c", db.ListGetByIndex("list-two", 1)); + } + + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "2.2.0")] + public void TestBRPopLPush() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + db.ListRightPush("list-one", "a"); + db.ListRightPush("list-one", "b"); + + db.ListRightPush("list-two", "c"); + db.ListRightPush("list-two", "d"); + + var result = db.BRPopLPush("list-one", "list-two", 0); + Assert.NotNull(result); + Assert.Equal("b", result!); - Assert.Equal(2, db.ListLength("list-one")); - Assert.Equal("b", db.ListGetByIndex("list-one", 0)); - Assert.Equal("d", db.ListGetByIndex("list-one", 1)); - Assert.Equal(2, db.ListLength("list-two")); - Assert.Equal("a", db.ListGetByIndex("list-two", 0)); - Assert.Equal("c", db.ListGetByIndex("list-two", 1)); + Assert.Equal(1, db.ListLength("list-one")); + Assert.Equal(3, db.ListLength("list-two")); + Assert.Equal("b", db.ListLeftPop("list-two")); } [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "2.2.0")] - public void TestBRPopLPush() + public async Task TestBRPopLPushAsync() { var db = redisFixture.Redis.GetDatabase(null); db.Execute("FLUSHALL"); @@ -648,7 +1121,7 @@ public void TestBRPopLPush() db.ListRightPush("list-two", "c"); db.ListRightPush("list-two", "d"); - var result = db.BRPopLPush("list-one", "list-two", 0); + var result = await db.BRPopLPushAsync("list-one", "list-two", 0); Assert.NotNull(result); Assert.Equal("b", result!); @@ -689,6 +1162,38 @@ public void TestXRead() Assert.Equal(7, streamEntry.Values[0].Value); } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "5.0.0")] + public async Task TestXReadAsync() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + db.StreamAdd("my-stream", "a", 1); + db.StreamAdd("my-stream", "b", 7); + + var result = await db.XReadAsync("my-stream", StreamSpecialIds.AllMessagesId, + count: 1, timeoutMilliseconds: 1000); + + Assert.NotNull(result); + Assert.Single(result); + + StreamEntry streamEntry = result![0]; + var lastKey = streamEntry.Id; + Assert.Single(streamEntry.Values); + Assert.Equal("a", streamEntry.Values[0].Name); + Assert.Equal(1, streamEntry.Values[0].Value); + + result = await db.XReadAsync("my-stream", lastKey, count: 1, timeoutMilliseconds: 1000); + + Assert.NotNull(result); + Assert.Single(result); + + streamEntry = result![0]; + Assert.Single(streamEntry.Values); + Assert.Equal("b", streamEntry.Values[0].Name); + Assert.Equal(7, streamEntry.Values[0].Value); + } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "5.0.0")] public void TestXReadMultipleStreams() { @@ -739,6 +1244,54 @@ public void TestXReadMultipleStreams() Assert.Equal("bar", result![1].Entries[0].Values[0].Value); } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "5.0.0")] + public async Task TestXReadMultipleStreamsAsync() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + db.StreamAdd("stream-one", "a", 1); + db.StreamAdd("stream-one", "b", 7); + db.StreamAdd("stream-two", "c", "foo"); + db.StreamAdd("stream-two", "d", "bar"); + + var result = await db.XReadAsync(new RedisKey[] { "stream-one", "stream-two" }, + new RedisValue[] { StreamSpecialIds.AllMessagesId, StreamSpecialIds.AllMessagesId }, + count: 1, timeoutMilliseconds: 1000); + + Assert.NotNull(result); + Assert.Equal(2, result!.Length); + + Assert.Single(result![0].Entries); + var lastKeyOne = result![0].Entries[0].Id; + Assert.Single(result![0].Entries[0].Values); + Assert.Equal("a", result![0].Entries[0].Values[0].Name); + Assert.Equal(1, result![0].Entries[0].Values[0].Value); + + Assert.Single(result![1].Entries); + var lastKeyTwo = result![1].Entries[0].Id; + Assert.Single(result![1].Entries[0].Values); + Assert.Equal("c", result![1].Entries[0].Values[0].Name); + Assert.Equal("foo", result![1].Entries[0].Values[0].Value); + + result = await db.XReadAsync(new RedisKey[] { "stream-one", "stream-two" }, + new RedisValue[] { lastKeyOne, lastKeyTwo }, + count: 1, timeoutMilliseconds: 1000); + + Assert.NotNull(result); + Assert.Equal(2, result!.Length); + + Assert.Single(result![0].Entries); + Assert.Single(result![0].Entries[0].Values); + Assert.Equal("b", result![0].Entries[0].Values[0].Name); + Assert.Equal(7, result![0].Entries[0].Values[0].Value); + + Assert.Single(result![1].Entries); + Assert.Single(result![1].Entries[0].Values); + Assert.Equal("d", result![1].Entries[0].Values[0].Name); + Assert.Equal("bar", result![1].Entries[0].Values[0].Value); + } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "5.0.0")] public void TestXReadOnlyNewMessages() { @@ -754,6 +1307,21 @@ public void TestXReadOnlyNewMessages() Assert.Null(result); } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "5.0.0")] + public async Task TestXReadOnlyNewMessagesAsync() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + db.StreamAdd("my-stream", "a", 1); + + // Reading only new messages will yield null, because we don't add any and the read times out. + var result = await db.XReadAsync("my-stream", StreamSpecialIds.NewMessagesId, + count: 1, timeoutMilliseconds: 500); + + Assert.Null(result); + } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "5.0.0")] public void TestXReadNoKeysProvided() { @@ -874,6 +1442,106 @@ public void TestXReadGroup() Assert.Empty(result); } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "5.0.0")] + public async Task TestXReadGroupAsync() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + var groupCreationResult = db.StreamCreateConsumerGroup("my-stream", "my-group"); + Assert.True(groupCreationResult); + + db.StreamAdd("my-stream", "a", 1); + db.StreamAdd("my-stream", "b", 7); + db.StreamAdd("my-stream", "c", 11); + db.StreamAdd("my-stream", "d", 12); + + // Read one message by each consumer. + var result = await db.XReadGroupAsync("my-group", "consumer-a", + "my-stream", StreamSpecialIds.UndeliveredMessagesId, count: 1, timeoutMilliseconds: 1000); + + Assert.NotNull(result); + Assert.Single(result); + + var consumerAIdOne = result![0].Id; + Assert.Single(result[0].Values); + Assert.Equal("a", result![0].Values[0].Name); + Assert.Equal(1, result![0].Values[0].Value); + + result = await db.XReadGroupAsync("my-group", "consumer-b", + "my-stream", StreamSpecialIds.UndeliveredMessagesId, count: 1, timeoutMilliseconds: 1000); + + Assert.NotNull(result); + Assert.Single(result); + + var consumerBIdOne = result![0].Id; + Assert.Single(result[0].Values); + Assert.Equal("b", result![0].Values[0].Name); + Assert.Equal(7, result![0].Values[0].Value); + + // Read another message from each consumer, don't ACK anything. + result = await db.XReadGroupAsync("my-group", "consumer-a", + "my-stream", StreamSpecialIds.UndeliveredMessagesId, count: 1, timeoutMilliseconds: 1000); + + Assert.NotNull(result); + Assert.Single(result); + + var consumerAIdTwo = result![0].Id; + Assert.Single(result![0].Values); + Assert.Equal("c", result![0].Values[0].Name); + Assert.Equal(11, result![0].Values[0].Value); + + result = await db.XReadGroupAsync("my-group", "consumer-b", + "my-stream", StreamSpecialIds.UndeliveredMessagesId, count: 1, timeoutMilliseconds: 1000); + + Assert.NotNull(result); + Assert.Single(result); + + var consumerBIdTwo = result![0].Id; + Assert.Single(result![0].Values); + Assert.Equal("d", result![0].Values[0].Name); + Assert.Equal(12, result![0].Values[0].Value); + + // Since we didn't ACK anything, the pending messages can be re-read with the right ID. + result = await db.XReadGroupAsync("my-group", "consumer-a", + "my-stream", StreamSpecialIds.AllMessagesId, count: 1, timeoutMilliseconds: 1000); + + Assert.NotNull(result); + Assert.Single(result); + + Assert.Single(result![0].Values); + Assert.Equal("a", result![0].Values[0].Name); + Assert.Equal(1, result![0].Values[0].Value); + + result = await db.XReadGroupAsync("my-group", "consumer-b", + "my-stream", StreamSpecialIds.AllMessagesId, count: 1, timeoutMilliseconds: 1000); + + Assert.NotNull(result); + Assert.Single(result); + + Assert.Single(result![0].Values); + Assert.Equal("b", result![0].Values[0].Name); + Assert.Equal(7, result![0].Values[0].Value); + + // ACK the messages. + var ackedMessagesCount = db.StreamAcknowledge("my-stream", "my-group", + new[] { consumerAIdOne, consumerAIdTwo, consumerBIdOne, consumerBIdTwo }); + Assert.Equal(4, ackedMessagesCount); + + // After ACK we don't see anything pending. + result = await db.XReadGroupAsync("my-group", "consumer-a", + "my-stream", StreamSpecialIds.AllMessagesId, count: 1, timeoutMilliseconds: 1000); + + Assert.NotNull(result); + Assert.Empty(result); + + result = await db.XReadGroupAsync("my-group", "consumer-b", + "my-stream", StreamSpecialIds.AllMessagesId, count: 1, timeoutMilliseconds: 1000); + + Assert.NotNull(result); + Assert.Empty(result); + } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "5.0.0")] public void TestXReadGroupNoAck() { @@ -961,6 +1629,60 @@ public void TestXReadGroupMultipleStreams() Assert.Equal(17, result![1].Entries[0].Values[0].Value); } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "5.0.0")] + public async Task TestXReadGroupMultipleStreamsAsync() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + var groupCreationResult = db.StreamCreateConsumerGroup("stream-one", "my-group"); + Assert.True(groupCreationResult); + + groupCreationResult = db.StreamCreateConsumerGroup("stream-two", "my-group"); + Assert.True(groupCreationResult); + + db.StreamAdd("stream-one", "a", 1); + db.StreamAdd("stream-two", "b", 7); + db.StreamAdd("stream-one", "c", 11); + db.StreamAdd("stream-two", "d", 17); + + var result = await db.XReadGroupAsync("my-group", "consumer-a", + new RedisKey[] { "stream-one", "stream-two" }, + new RedisValue[] { StreamSpecialIds.UndeliveredMessagesId, StreamSpecialIds.UndeliveredMessagesId }, + count: 1, timeoutMilliseconds: 1000); + + Assert.NotNull(result); + Assert.Equal(2, result!.Length); + + Assert.Single(result![0].Entries); + Assert.Single(result![0].Entries[0].Values); + Assert.Equal("a", result![0].Entries[0].Values[0].Name); + Assert.Equal(1, result![0].Entries[0].Values[0].Value); + + Assert.Single(result![1].Entries); + Assert.Single(result![1].Entries[0].Values); + Assert.Equal("b", result![1].Entries[0].Values[0].Name); + Assert.Equal(7, result![1].Entries[0].Values[0].Value); + + result = await db.XReadGroupAsync("my-group", "consumer-b", + new RedisKey[] { "stream-one", "stream-two" }, + new RedisValue[] { StreamSpecialIds.UndeliveredMessagesId, StreamSpecialIds.UndeliveredMessagesId }, + count: 1, timeoutMilliseconds: 1000); + + Assert.NotNull(result); + Assert.Equal(2, result!.Length); + + Assert.Single(result![0].Entries); + Assert.Single(result![0].Entries[0].Values); + Assert.Equal("c", result![0].Entries[0].Values[0].Name); + Assert.Equal(11, result![0].Entries[0].Values[0].Value); + + Assert.Single(result![1].Entries); + Assert.Single(result![1].Entries[0].Values); + Assert.Equal("d", result![1].Entries[0].Values[0].Name); + Assert.Equal(17, result![1].Entries[0].Values[0].Value); + } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "5.0.0")] public void TestXReadGroupNull() { @@ -977,6 +1699,22 @@ public void TestXReadGroupNull() Assert.Null(result); } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "5.0.0")] + public async Task TestXReadGroupNullAsync() + { + var db = redisFixture.Redis.GetDatabase(null); + db.Execute("FLUSHALL"); + + var groupCreationResult = db.StreamCreateConsumerGroup("my-stream", "my-group"); + Assert.True(groupCreationResult); + + var result = await db.XReadGroupAsync("my-group", "consumer-a", + "my-stream", StreamSpecialIds.UndeliveredMessagesId, + count: 1, timeoutMilliseconds: 500); + + Assert.Null(result); + } + [SkipIfRedis(Is.OSSCluster, Is.Enterprise, Comparison.LessThan, "5.0.0")] public void TestXReadGroupNoKeysProvided() {