Skip to content

Commit 0b93c3a

Browse files
authored
simplify gossip (#1792)
1 parent f1aa77f commit 0b93c3a

File tree

2 files changed

+6
-93
lines changed

2 files changed

+6
-93
lines changed

src/Proto.Cluster/Gossip/GossipActor.cs

+6-90
Original file line numberDiff line numberDiff line change
@@ -98,49 +98,11 @@ private Task OnGossipRequest(IContext context, GossipRequest gossipRequest)
9898
}
9999

100100
ReceiveState(context, gossipRequest.State);
101+
102+
context.Respond(new GossipResponse());
101103

102-
//it's OK, we might not just yet be aware of this member yet....
103-
104-
// if (!context.Cluster().MemberList.ContainsMemberId(gossipRequest.MemberId))
105-
// {
106-
// Logger.LogWarning("Got gossip request from unknown member {MemberId}", gossipRequest.MemberId);
107-
//
108-
// // Nothing to send, do not provide sender or state payload
109-
// context.Respond(new GossipResponse());
110-
// return Task.CompletedTask;
111-
// }
112-
113-
var memberState = _internal.GetMemberStateDelta(gossipRequest.MemberId);
114-
115-
if (!memberState.HasState)
116-
{
117-
if (Logger.IsEnabled(LogLevel.Debug))
118-
{
119-
Logger.LogDebug("Got gossip request from member {MemberId}, but no state was found",
120-
gossipRequest.MemberId);
121-
}
122-
123-
// Nothing to send, do not provide sender or state payload
124-
context.Respond(new GossipResponse());
125-
126-
return Task.CompletedTask;
127-
}
128-
129-
context.Respond(new GossipResponse
130-
{
131-
State = memberState.State.Clone() //ensure we have a copy and not state that might mutate
132-
});
133-
104+
134105
return Task.CompletedTask;
135-
136-
//this code is broken
137-
//
138-
// context.RequestReenter<GossipResponseAck>(context.Sender!, new GossipResponse
139-
// {
140-
// State = memberState.State.Clone(), //ensure we have a copy and not state that might mutate
141-
// }, task => ReenterAfterResponseAck(context, task, memberState), CancellationTokens.WithTimeout(_gossipRequestTimeout));
142-
//
143-
// return Task.CompletedTask;
144106
}
145107

146108
private void ReceiveState(IContext context, GossipState remoteState)
@@ -189,7 +151,7 @@ private void SendGossipForMember(IContext context, Member member, InstanceLogger
189151
//a short timeout is massively important, we cannot afford hanging around waiting for timeout, blocking other gossips from getting through
190152

191153
// This will return a GossipResponse, but since we need could need to get the sender, we do not unpack it from the MessageEnvelope
192-
context.RequestReenter<object>(pid, new GossipRequest
154+
context.RequestReenter<GossipResponse>(pid, new GossipRequest
193155
{
194156
MemberId = context.System.Id,
195157
State = memberStateDelta.State.Clone() //ensure we have a copy and not send state that might mutate
@@ -199,29 +161,14 @@ private void SendGossipForMember(IContext context, Member member, InstanceLogger
199161
);
200162
}
201163

202-
private async Task GossipReenterAfterSend(IContext context, Task<object> task, MemberStateDelta delta)
164+
private async Task GossipReenterAfterSend(IContext context, Task<GossipResponse> task, MemberStateDelta delta)
203165
{
204166
var logger = context.Logger();
205167

206168
try
207169
{
208170
await task;
209-
var envelope = task.Result as MessageEnvelope;
210-
211-
if (envelope?.Message is GossipResponse response)
212-
{
213-
delta.CommitOffsets();
214-
215-
if (response.State is not null)
216-
{
217-
ReceiveState(context, response.State!);
218-
219-
if (envelope.Sender is not null)
220-
{
221-
context.Send(envelope.Sender, new GossipResponseAck());
222-
}
223-
}
224-
}
171+
delta.CommitOffsets();
225172
}
226173
catch (DeadLetterException)
227174
{
@@ -244,35 +191,4 @@ private async Task GossipReenterAfterSend(IContext context, Task<object> task, M
244191
Logger.LogError(x, "GossipReenterAfterSend failed");
245192
}
246193
}
247-
248-
private async Task ReenterAfterResponseAck(IContext context, Task<GossipResponseAck> task, MemberStateDelta delta)
249-
{
250-
var logger = context.Logger();
251-
252-
try
253-
{
254-
await task;
255-
delta.CommitOffsets();
256-
}
257-
catch (DeadLetterException)
258-
{
259-
logger?.LogWarning("DeadLetter");
260-
Logger.LogWarning("DeadLetter in ReenterAfterResponseAck");
261-
}
262-
catch (OperationCanceledException)
263-
{
264-
logger?.LogWarning("Timeout");
265-
Logger.LogWarning("Timeout in ReenterAfterResponseAck");
266-
}
267-
catch (TimeoutException)
268-
{
269-
logger?.LogWarning("Timeout");
270-
Logger.LogWarning("Timeout in ReenterAfterResponseAck");
271-
}
272-
catch (Exception x)
273-
{
274-
logger?.LogError(x, "ReenterAfterResponseAck failed");
275-
Logger.LogError(x, "ReenterAfterResponseAck failed");
276-
}
277-
}
278194
}

src/Proto.Cluster/GossipContracts.proto

-3
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,6 @@ message GossipResponse {
1414
GossipState state = 1;
1515
}
1616

17-
message GossipResponseAck { // Or just use empty GossipResponse?
18-
}
19-
2017
//two GossipState objects can be merged
2118
//key + member_id gets it's own entry, if collision, highest version is selected
2219
message GossipState {

0 commit comments

Comments
 (0)