Skip to content

Commit 2d5babb

Browse files
authored
Time-series insertion filters that allow ignoring close-by samples (#303)
* timeseries ignore parameters with builder * adding remaining builder methods and unit tests #302 * remove unused import and make tscommandbuilder public * add version checks for tests
1 parent e913dfc commit 2d5babb

14 files changed

+544
-117
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
using NRedisStack.DataTypes;
2+
using NRedisStack.Literals.Enums;
3+
4+
namespace NRedisStack
5+
{
6+
public class TsBaseParams
7+
{
8+
protected IList<object> parameters;
9+
10+
internal TsBaseParams()
11+
{
12+
this.parameters = new List<object>();
13+
}
14+
15+
internal TsBaseParams(IList<object> parameters)
16+
{
17+
this.parameters = parameters;
18+
}
19+
20+
internal object[] ToArray(string key)
21+
{
22+
parameters.Insert(0, key);
23+
return parameters.ToArray();
24+
}
25+
}
26+
27+
public class TsCreateParams : TsBaseParams
28+
{
29+
internal TsCreateParams(IList<object> parameters) : base(parameters) { }
30+
31+
internal TsCreateParams(long? retentionTime, IReadOnlyCollection<TimeSeriesLabel>? labels, bool? uncompressed,
32+
long? chunkSizeBytes, TsDuplicatePolicy? policy)
33+
{
34+
parameters.AddRetentionTime(retentionTime);
35+
parameters.AddChunkSize(chunkSizeBytes);
36+
parameters.AddLabels(labels);
37+
parameters.AddUncompressed(uncompressed);
38+
parameters.AddDuplicatePolicy(policy);
39+
}
40+
}
41+
42+
public class TsAlterParams : TsBaseParams
43+
{
44+
internal TsAlterParams(IList<object> parameters) : base(parameters) { }
45+
46+
internal TsAlterParams(long? retentionTime, long? chunkSizeBytes, TsDuplicatePolicy? policy, IReadOnlyCollection<TimeSeriesLabel>? labels)
47+
{
48+
parameters.AddRetentionTime(retentionTime);
49+
parameters.AddChunkSize(chunkSizeBytes);
50+
parameters.AddDuplicatePolicy(policy);
51+
parameters.AddLabels(labels);
52+
}
53+
}
54+
55+
public class TsAddParams : TsBaseParams
56+
{
57+
internal TsAddParams(IList<object> parameters) : base(parameters) { }
58+
59+
internal TsAddParams(TimeStamp timestamp, double value, long? retentionTime, IReadOnlyCollection<TimeSeriesLabel>? labels, bool? uncompressed, long? chunkSizeBytes, TsDuplicatePolicy? policy)
60+
{
61+
parameters.Add(timestamp.Value);
62+
parameters.Add(value);
63+
parameters.AddRetentionTime(retentionTime);
64+
parameters.AddChunkSize(chunkSizeBytes);
65+
parameters.AddLabels(labels);
66+
parameters.AddUncompressed(uncompressed);
67+
parameters.AddOnDuplicate(policy);
68+
}
69+
}
70+
71+
public class TsIncrByParams : TsBaseParams
72+
{
73+
internal TsIncrByParams(IList<object> parameters) : base(parameters) { }
74+
75+
internal TsIncrByParams(double value, TimeStamp? timestampMaybe, long? retentionTime,
76+
IReadOnlyCollection<TimeSeriesLabel>? labels, bool? uncompressed, long? chunkSizeBytes)
77+
{
78+
parameters.Add(value);
79+
if (timestampMaybe is { } timestamp) parameters.AddTimeStamp(timestamp);
80+
parameters.AddRetentionTime(retentionTime);
81+
parameters.AddChunkSize(chunkSizeBytes);
82+
if (labels != null) parameters.AddLabels(labels);
83+
parameters.AddUncompressed(uncompressed);
84+
}
85+
}
86+
87+
public class TsDecrByParams : TsIncrByParams
88+
{
89+
internal TsDecrByParams(IList<object> parameters) : base(parameters) { }
90+
91+
internal TsDecrByParams(double value, TimeStamp? timestampMaybe, long? retentionTime, IReadOnlyCollection<TimeSeriesLabel>? labels, bool? uncompressed, long? chunkSizeBytes)
92+
: base(value, timestampMaybe, retentionTime, labels, uncompressed, chunkSizeBytes)
93+
{ }
94+
}
95+
96+
}

src/NRedisStack/TimeSeries/Literals/CommandArgs.cs

+1
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,6 @@ internal class TimeSeriesArgs
2323
public const string DEBUG = "DEBUG";
2424
public const string BUCKETTIMESTAMP = "BUCKETTIMESTAMP";
2525
public const string EMPTY = "EMPTY";
26+
public const String IGNORE = "IGNORE";
2627
}
2728
}

src/NRedisStack/TimeSeries/TimeSeriesAux.cs

-105
Original file line numberDiff line numberDiff line change
@@ -12,45 +12,6 @@ public static void AddLatest(this IList<object> args, bool latest)
1212
if (latest) args.Add(TimeSeriesArgs.LATEST);
1313
}
1414

15-
public static void AddRetentionTime(this IList<object> args, long? retentionTime)
16-
{
17-
if (retentionTime.HasValue)
18-
{
19-
args.Add(TimeSeriesArgs.RETENTION);
20-
args.Add(retentionTime);
21-
}
22-
}
23-
24-
public static void AddChunkSize(this IList<object> args, long? chunkSize)
25-
{
26-
if (chunkSize.HasValue)
27-
{
28-
args.Add(TimeSeriesArgs.CHUNK_SIZE);
29-
args.Add(chunkSize);
30-
}
31-
}
32-
33-
public static void AddLabels(this IList<object> args, IReadOnlyCollection<TimeSeriesLabel>? labels)
34-
{
35-
if (labels != null)
36-
{
37-
args.Add(TimeSeriesArgs.LABELS);
38-
foreach (var label in labels)
39-
{
40-
args.Add(label.Key);
41-
args.Add(label.Value);
42-
}
43-
}
44-
}
45-
46-
public static void AddUncompressed(this IList<object> args, bool? uncompressed)
47-
{
48-
if (uncompressed.HasValue)
49-
{
50-
args.Add(TimeSeriesArgs.UNCOMPRESSED);
51-
}
52-
}
53-
5415
public static void AddCount(this IList<object> args, long? count)
5516
{
5617
if (count.HasValue)
@@ -60,25 +21,6 @@ public static void AddCount(this IList<object> args, long? count)
6021
}
6122
}
6223

63-
public static void AddDuplicatePolicy(this IList<object> args, TsDuplicatePolicy? policy)
64-
{
65-
if (policy.HasValue)
66-
{
67-
args.Add(TimeSeriesArgs.DUPLICATE_POLICY);
68-
args.Add(policy.Value.AsArg());
69-
}
70-
}
71-
72-
73-
public static void AddOnDuplicate(this IList<object> args, TsDuplicatePolicy? policy)
74-
{
75-
if (policy.HasValue)
76-
{
77-
args.Add(TimeSeriesArgs.ON_DUPLICATE);
78-
args.Add(policy.Value.AsArg());
79-
}
80-
}
81-
8224
public static void AddAlign(this IList<object> args, TimeStamp? alignMaybe)
8325
{
8426
if (alignMaybe is { } align)
@@ -212,53 +154,6 @@ public static void AddRule(this IList<object> args, TimeSeriesRule rule)
212154
args.Add(rule.TimeBucket);
213155
}
214156

215-
public static List<object> BuildTsCreateArgs(string key, long? retentionTime, IReadOnlyCollection<TimeSeriesLabel>? labels, bool? uncompressed,
216-
long? chunkSizeBytes, TsDuplicatePolicy? policy)
217-
{
218-
var args = new List<object> { key };
219-
args.AddRetentionTime(retentionTime);
220-
args.AddChunkSize(chunkSizeBytes);
221-
args.AddLabels(labels);
222-
args.AddUncompressed(uncompressed);
223-
args.AddDuplicatePolicy(policy);
224-
return args;
225-
}
226-
227-
public static List<object> BuildTsAlterArgs(string key, long? retentionTime, long? chunkSizeBytes,
228-
TsDuplicatePolicy? policy, IReadOnlyCollection<TimeSeriesLabel>? labels)
229-
{
230-
var args = new List<object> { key };
231-
args.AddRetentionTime(retentionTime);
232-
args.AddChunkSize(chunkSizeBytes);
233-
args.AddDuplicatePolicy(policy);
234-
args.AddLabels(labels);
235-
return args;
236-
}
237-
238-
public static List<object> BuildTsAddArgs(string key, TimeStamp timestamp, double value, long? retentionTime,
239-
IReadOnlyCollection<TimeSeriesLabel>? labels, bool? uncompressed, long? chunkSizeBytes, TsDuplicatePolicy? policy)
240-
{
241-
var args = new List<object> { key, timestamp.Value, value };
242-
args.AddRetentionTime(retentionTime);
243-
args.AddChunkSize(chunkSizeBytes);
244-
args.AddLabels(labels);
245-
args.AddUncompressed(uncompressed);
246-
args.AddOnDuplicate(policy);
247-
return args;
248-
}
249-
250-
public static List<object> BuildTsIncrDecrByArgs(string key, double value, TimeStamp? timestampMaybe, long? retentionTime,
251-
IReadOnlyCollection<TimeSeriesLabel>? labels, bool? uncompressed, long? chunkSizeBytes)
252-
{
253-
var args = new List<object> { key, value };
254-
if (timestampMaybe is { } timestamp) args.AddTimeStamp(timestamp);
255-
args.AddRetentionTime(retentionTime);
256-
args.AddChunkSize(chunkSizeBytes);
257-
if (labels != null) args.AddLabels(labels);
258-
args.AddUncompressed(uncompressed);
259-
return args;
260-
}
261-
262157
public static List<object> BuildTsDelArgs(string key, TimeStamp fromTimeStamp, TimeStamp toTimeStamp)
263158
{
264159
var args = new List<object>

src/NRedisStack/TimeSeries/TimeSeriesCommands.cs

+19
Original file line numberDiff line numberDiff line change
@@ -14,25 +14,33 @@ public TimeSeriesCommands(IDatabase db) : base(db)
1414
#region Create
1515

1616
/// <inheritdoc/>
17+
[Obsolete("Please use the other method with TsCreateParams and check related builder TsCreateParamsBuilder to build parameters.")]
1718
public bool Create(string key, long? retentionTime = null, IReadOnlyCollection<TimeSeriesLabel>? labels = null, bool? uncompressed = null, long? chunkSizeBytes = null, TsDuplicatePolicy? duplicatePolicy = null)
1819
{
1920
return _db.Execute(TimeSeriesCommandsBuilder.Create(key, retentionTime, labels,
2021
uncompressed, chunkSizeBytes,
2122
duplicatePolicy)).OKtoBoolean();
2223
}
2324

25+
/// <inheritdoc/>
26+
public bool Create(string key, TsCreateParams parameters) => _db.Execute(TimeSeriesCommandsBuilder.Create(key, parameters)).OKtoBoolean();
27+
2428
#endregion
2529

2630
#region Update
2731

2832
/// <inheritdoc/>
33+
[Obsolete("Please use the other method with TsAlterParams and check related builder TsAlterParamsBuilder to build parameters.")]
2934
public bool Alter(string key, long? retentionTime = null, long? chunkSizeBytes = null, TsDuplicatePolicy? duplicatePolicy = null, IReadOnlyCollection<TimeSeriesLabel>? labels = null)
3035
{
3136
return _db.Execute(TimeSeriesCommandsBuilder.Alter(key, retentionTime, chunkSizeBytes, duplicatePolicy, labels)).OKtoBoolean();
3237
}
3338

39+
/// <inheritdoc/>
40+
public bool Alter(string key, TsAlterParams parameters) => _db.Execute(TimeSeriesCommandsBuilder.Alter(key, parameters)).OKtoBoolean();
3441

3542
/// <inheritdoc/>
43+
[Obsolete("Please use the other method with TsAddParams and check related builder TsAddParamsBuilder to build parameters.")]
3644
public TimeStamp Add(string key, TimeStamp timestamp, double value, long? retentionTime = null,
3745
IReadOnlyCollection<TimeSeriesLabel>? labels = null, bool? uncompressed = null,
3846
long? chunkSizeBytes = null, TsDuplicatePolicy? duplicatePolicy = null)
@@ -41,26 +49,37 @@ public TimeStamp Add(string key, TimeStamp timestamp, double value, long? retent
4149
uncompressed, chunkSizeBytes, duplicatePolicy)).ToTimeStamp();
4250
}
4351

52+
/// <inheritdoc/>
53+
public TimeStamp Add(string key, TsAddParams parameters) => _db.Execute(TimeSeriesCommandsBuilder.Add(key, parameters)).ToTimeStamp();
54+
4455
/// <inheritdoc/>
4556
public IReadOnlyList<TimeStamp> MAdd(IReadOnlyCollection<(string key, TimeStamp timestamp, double value)> sequence)
4657
{
4758
return _db.Execute(TimeSeriesCommandsBuilder.MAdd(sequence)).ToTimeStampArray()!;
4859
}
4960

5061
/// <inheritdoc/>
62+
[Obsolete("Please use the other method with TsIncrByParams and check related builder TsIncryByParamsBuilder to build parameters.")]
5163
public TimeStamp IncrBy(string key, double value, TimeStamp? timestamp = null, long? retentionTime = null, IReadOnlyCollection<TimeSeriesLabel>? labels = null, bool? uncompressed = null, long? chunkSizeBytes = null)
5264
{
5365
return _db.Execute(TimeSeriesCommandsBuilder.IncrBy(key, value, timestamp, retentionTime,
5466
labels, uncompressed, chunkSizeBytes)).ToTimeStamp();
5567
}
5668

5769
/// <inheritdoc/>
70+
public TimeStamp IncrBy(string key, TsIncrByParams parameters) => _db.Execute(TimeSeriesCommandsBuilder.IncrBy(key, parameters)).ToTimeStamp();
71+
72+
/// <inheritdoc/>
73+
[Obsolete("Please use the other method with TsDecrByParams and check related builder TsDecryByParamsBuilder to build parameters.")]
5874
public TimeStamp DecrBy(string key, double value, TimeStamp? timestamp = null, long? retentionTime = null, IReadOnlyCollection<TimeSeriesLabel>? labels = null, bool? uncompressed = null, long? chunkSizeBytes = null)
5975
{
6076
return _db.Execute(TimeSeriesCommandsBuilder.DecrBy(key, value, timestamp, retentionTime,
6177
labels, uncompressed, chunkSizeBytes)).ToTimeStamp();
6278
}
6379

80+
/// <inheritdoc/>
81+
public TimeStamp DecrBy(string key, TsDecrByParams parameters) => _db.Execute(TimeSeriesCommandsBuilder.DecrBy(key, parameters)).ToTimeStamp();
82+
6483
/// <inheritdoc/>
6584
public long Del(string key, TimeStamp fromTimeStamp, TimeStamp toTimeStamp)
6685
{

src/NRedisStack/TimeSeries/TimeSeriesCommandsAsync.cs

+22
Original file line numberDiff line numberDiff line change
@@ -14,30 +14,42 @@ public TimeSeriesCommandsAsync(IDatabaseAsync db)
1414
#region Create
1515

1616
/// <inheritdoc/>
17+
[Obsolete("Please use the other method with TsCreateParams and check related builder TsCreateParamsBuilder to build parameters.")]
1718
public async Task<bool> CreateAsync(string key, long? retentionTime = null, IReadOnlyCollection<TimeSeriesLabel>? labels = null, bool? uncompressed = null, long? chunkSizeBytes = null, TsDuplicatePolicy? duplicatePolicy = null)
1819
{
1920
return (await _db.ExecuteAsync(TimeSeriesCommandsBuilder.Create(key, retentionTime, labels,
2021
uncompressed, chunkSizeBytes,
2122
duplicatePolicy))).OKtoBoolean();
2223
}
2324

25+
/// <inheritdoc/>
26+
public async Task<bool> CreateAsync(string key, TsCreateParams parameters) => (await _db.ExecuteAsync(TimeSeriesCommandsBuilder.Create(key, parameters))).OKtoBoolean();
27+
2428
#endregion
2529

2630
#region Update
2731

2832
/// <inheritdoc/>
33+
[Obsolete("Please use the other method with TsAlterParams and check related builder TsAlterParamsBuilder to build parameters.")]
2934
public async Task<bool> AlterAsync(string key, long? retentionTime = null, long? chunkSizeBytes = null, TsDuplicatePolicy? duplicatePolicy = null, IReadOnlyCollection<TimeSeriesLabel>? labels = null)
3035
{
3136
return (await _db.ExecuteAsync(TimeSeriesCommandsBuilder.Alter(key, retentionTime, chunkSizeBytes, duplicatePolicy, labels))).OKtoBoolean();
3237
}
3338

3439
/// <inheritdoc/>
40+
public async Task<bool> AlterAsync(string key, TsAlterParams parameters) => (await _db.ExecuteAsync(TimeSeriesCommandsBuilder.Alter(key, parameters))).OKtoBoolean();
41+
42+
/// <inheritdoc/>
43+
[Obsolete("Please use the other method with TsAddParams and check related builder TsAddParamsBuilder to build parameters.")]
3544
public async Task<TimeStamp> AddAsync(string key, TimeStamp timestamp, double value, long? retentionTime = null, IReadOnlyCollection<TimeSeriesLabel>? labels = null, bool? uncompressed = null, long? chunkSizeBytes = null, TsDuplicatePolicy? duplicatePolicy = null)
3645
{
3746
return (await _db.ExecuteAsync(TimeSeriesCommandsBuilder.Add(key, timestamp, value, retentionTime, labels,
3847
uncompressed, chunkSizeBytes, duplicatePolicy))).ToTimeStamp();
3948
}
4049

50+
/// <inheritdoc/>
51+
public async Task<TimeStamp> AddAsync(string key, TsAddParams parameters) => (await _db.ExecuteAsync(TimeSeriesCommandsBuilder.Add(key, parameters))).ToTimeStamp();
52+
4153
/// <inheritdoc/>
4254
public async Task<IReadOnlyList<TimeStamp>> MAddAsync(IReadOnlyCollection<(string key, TimeStamp timestamp, double value)> sequence)
4355
{
@@ -51,13 +63,23 @@ public async Task<TimeStamp> IncrByAsync(string key, double value, TimeStamp? ti
5163
labels, uncompressed, chunkSizeBytes))).ToTimeStamp();
5264
}
5365

66+
67+
/// <inheritdoc/>
68+
[Obsolete("Please use the other method with TsIncrByParams and check related builder TsIncryByParamsBuilder to build parameters.")]
69+
public async Task<TimeStamp> IncrByAsync(string key, TsIncrByParams parameters) => (await _db.ExecuteAsync(TimeSeriesCommandsBuilder.IncrBy(key, parameters))).ToTimeStamp();
70+
71+
5472
/// <inheritdoc/>
5573
public async Task<TimeStamp> DecrByAsync(string key, double value, TimeStamp? timestamp = null, long? retentionTime = null, IReadOnlyCollection<TimeSeriesLabel>? labels = null, bool? uncompressed = null, long? chunkSizeBytes = null)
5674
{
5775
return (await _db.ExecuteAsync(TimeSeriesCommandsBuilder.DecrBy(key, value, timestamp, retentionTime,
5876
labels, uncompressed, chunkSizeBytes))).ToTimeStamp();
5977
}
6078

79+
/// <inheritdoc/>
80+
[Obsolete("Please use the other method with TsDecrByParams and check related builder TsDecryByParamsBuilder to build parameters.")]
81+
public async Task<TimeStamp> DecrByAsync(string key, TsDecrByParams parameters) => (await _db.ExecuteAsync(TimeSeriesCommandsBuilder.DecrBy(key, parameters))).ToTimeStamp();
82+
6183
/// <inheritdoc/>
6284
public async Task<long> DelAsync(string key, TimeStamp fromTimeStamp, TimeStamp toTimeStamp)
6385
{

0 commit comments

Comments
 (0)