Skip to content

Commit c9d841c

Browse files
authored
Fixed TopicActor problem with nullref when loading subscribers (#1716)
* fixed problem with nullref when loading subscribers * test for default registration * limit cluster size to 1 for the empty kv store
1 parent fba274d commit c9d841c

File tree

5 files changed

+76
-17
lines changed

5 files changed

+76
-17
lines changed

src/Proto.Actor/Utils/EmptyKeyValueStore.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ namespace Proto.Utils;
1414
/// <typeparam name="T"></typeparam>
1515
public class EmptyKeyValueStore<T> : IKeyValueStore<T>
1616
{
17-
public Task<T> GetAsync(string id, CancellationToken ct) => Task.FromResult(default(T));
17+
public Task<T?> GetAsync(string id, CancellationToken ct) => Task.FromResult(default(T));
1818

1919
public Task SetAsync(string id, T state, CancellationToken ct) => Task.CompletedTask;
2020

src/Proto.Actor/Utils/IKeyValueStore.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public interface IKeyValueStore<T>
2222
/// <param name="id">Key</param>
2323
/// <param name="ct"></param>
2424
/// <returns></returns>
25-
Task<T> GetAsync(string id, CancellationToken ct);
25+
Task<T?> GetAsync(string id, CancellationToken ct);
2626

2727
/// <summary>
2828
/// Set the value for the given key.

src/Proto.Cluster/PubSub/TopicActor.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ private async Task<Subscribers> LoadSubscriptions(string topic)
221221
//TODO: cancellation token config?
222222
var state = await _subscriptionStore.GetAsync(topic, CancellationToken.None);
223223
Logger.LogDebug("Topic {Topic} loaded subscriptions {Subscriptions}", _topic, state);
224-
return state;
224+
return state ?? new Subscribers();
225225
}
226226
catch (Exception e)
227227
{

tests/Proto.Cluster.PubSub.Tests/PubSubClusterFixture.cs

+33-14
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ public record Response;
1818

1919
public class PubSubClusterFixture : BaseInMemoryClusterFixture
2020
{
21+
private readonly bool _useDefaultTopicRegistration;
2122
public const string SubscriberKind = "Subscriber";
2223
public const string TimeoutSubscriberKind = "TimeoutSubscriber";
2324

@@ -28,26 +29,34 @@ public class PubSubClusterFixture : BaseInMemoryClusterFixture
2829

2930
public ITestOutputHelper? Output;
3031

31-
public PubSubClusterFixture() : base(3, config =>
32+
public PubSubClusterFixture() : this(3, false) {} // xunit requires single, public, parameterless constructor
33+
34+
internal PubSubClusterFixture(int clusterSize, bool useDefaultTopicRegistration) : base(clusterSize, config =>
3235
config
3336
.WithActorRequestTimeout(TimeSpan.FromSeconds(1))
3437
.WithPubSubConfig(PubSubConfig.Setup()
3538
.WithSubscriberTimeout(TimeSpan.FromSeconds(2))
3639
)
37-
)
38-
{
39-
}
40+
) => _useDefaultTopicRegistration = useDefaultTopicRegistration;
4041

4142
private readonly InMemorySubscribersStore _subscribersStore = new();
4243

4344
public Task<Subscribers> GetSubscribersForTopic(string topic) => _subscribersStore.GetAsync(topic, CancellationToken.None);
4445

45-
protected override ClusterKind[] ClusterKinds => new[]
46-
{
47-
new ClusterKind(TopicActor.Kind, Props.FromProducer(() => new TopicActor(_subscribersStore))),
48-
new ClusterKind(SubscriberKind, SubscriberProps()),
49-
new ClusterKind(TimeoutSubscriberKind, TimeoutSubscriberProps())
50-
};
46+
protected override ClusterKind[] ClusterKinds {
47+
get {
48+
var kinds = new List<ClusterKind>
49+
{
50+
new(SubscriberKind, SubscriberProps()),
51+
new(TimeoutSubscriberKind, TimeoutSubscriberProps())
52+
};
53+
54+
if(!_useDefaultTopicRegistration)
55+
kinds.Add(new ClusterKind(TopicActor.Kind, Props.FromProducer(() => new TopicActor(_subscribersStore))));
56+
57+
return kinds.ToArray();
58+
}
59+
}
5160

5261
private Props SubscriberProps()
5362
{
@@ -136,29 +145,39 @@ public async Task UnsubscribeAllFrom(string topic, string[] subscriberIds)
136145

137146
public async Task SubscribeTo(string topic, string identity, string kind = PubSubClusterFixture.SubscriberKind)
138147
{
139-
var subRes = await RandomMember().Subscribe(topic, identity, kind);
148+
var subRes = await RandomMember().Subscribe(topic, identity, kind, CancellationTokens.FromSeconds(5));
140149
if (subRes == null)
141150
Output?.WriteLine($"{kind}/{identity} failed to subscribe due to timeout");
151+
subRes.Should().NotBeNull("subscribing should not time out");
142152
}
143153

144154
public async Task UnsubscribeFrom(string topic, string identity, string kind = PubSubClusterFixture.SubscriberKind)
145155
{
146-
var unsubRes = await RandomMember().Unsubscribe(topic, identity, kind);
156+
var unsubRes = await RandomMember().Unsubscribe(topic, identity, kind, CancellationTokens.FromSeconds(5));
147157
if (unsubRes == null)
148158
Output?.WriteLine($"{kind}/{identity} failed to subscribe due to timeout");
159+
unsubRes.Should().NotBeNull("unsubscribing should not time out");
149160
}
150161

151162
public Task<PublishResponse?> PublishData(string topic, int data, CancellationToken cancel = default)
152-
=> RandomMember()
163+
{
164+
if(cancel == default) cancel = CancellationTokens.FromSeconds(5);
165+
166+
return RandomMember()
153167
.Publisher()
154168
.Publish(topic, new DataPublished(data), cancel);
169+
}
155170

156171
public Task<PublishResponse?> PublishDataBatch(string topic, int[] data, CancellationToken cancel = default)
157-
=> RandomMember()
172+
{
173+
if (cancel == default) cancel = CancellationTokens.FromSeconds(5);
174+
175+
return RandomMember()
158176
.Publisher()
159177
.PublishBatch(
160178
topic,
161179
data.Select(d => new DataPublished(d)).ToArray(),
162180
cancel
163181
);
182+
}
164183
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// -----------------------------------------------------------------------
2+
// <copyright file = "PubSubDefaultTopicRegistrationTests.cs" company = "Asynkron AB">
3+
// Copyright (C) 2015-2022 Asynkron AB All rights reserved
4+
// </copyright>
5+
// -----------------------------------------------------------------------
6+
using FluentAssertions;
7+
using Xunit;
8+
9+
namespace Proto.Cluster.PubSub.Tests;
10+
11+
[Collection("PubSub")] // The CI is just to slow to run cluster fixture based tests in parallel
12+
public class PubSubDefaultTopicRegistrationTests : IAsyncLifetime
13+
{
14+
private readonly PubSubClusterFixture _fixture;
15+
16+
public PubSubDefaultTopicRegistrationTests() => _fixture = new PubSubClusterFixture(clusterSize: 1, useDefaultTopicRegistration: true);
17+
18+
public Task InitializeAsync() => _fixture.InitializeAsync();
19+
20+
[Fact]
21+
public async Task Pub_sub_works_with_default_topic_registration()
22+
{
23+
var subscriberIds = _fixture.SubscriberIds("topic-default", 20);
24+
const string topic = "topic-default-registration";
25+
const int numMessages = 100;
26+
27+
await _fixture.SubscribeAllTo(topic, subscriberIds);
28+
29+
for (var i = 0; i < numMessages; i++)
30+
{
31+
var response = await _fixture.PublishData(topic, i);
32+
response.Should().NotBeNull("publishing should not time out");
33+
response!.Status.Should().Be(PublishStatus.Ok);
34+
}
35+
36+
await _fixture.VerifyAllSubscribersGotAllTheData(subscriberIds, numMessages);
37+
}
38+
39+
public Task DisposeAsync() => _fixture.DisposeAsync();
40+
}

0 commit comments

Comments
 (0)