Skip to content

simplify gossip #1792

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 6 additions & 90 deletions src/Proto.Cluster/Gossip/GossipActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,49 +98,11 @@ private Task OnGossipRequest(IContext context, GossipRequest gossipRequest)
}

ReceiveState(context, gossipRequest.State);

context.Respond(new GossipResponse());

//it's OK, we might not just yet be aware of this member yet....

// if (!context.Cluster().MemberList.ContainsMemberId(gossipRequest.MemberId))
// {
// Logger.LogWarning("Got gossip request from unknown member {MemberId}", gossipRequest.MemberId);
//
// // Nothing to send, do not provide sender or state payload
// context.Respond(new GossipResponse());
// return Task.CompletedTask;
// }

var memberState = _internal.GetMemberStateDelta(gossipRequest.MemberId);

if (!memberState.HasState)
{
if (Logger.IsEnabled(LogLevel.Debug))
{
Logger.LogDebug("Got gossip request from member {MemberId}, but no state was found",
gossipRequest.MemberId);
}

// Nothing to send, do not provide sender or state payload
context.Respond(new GossipResponse());

return Task.CompletedTask;
}

context.Respond(new GossipResponse
{
State = memberState.State.Clone() //ensure we have a copy and not state that might mutate
});


return Task.CompletedTask;

//this code is broken
//
// context.RequestReenter<GossipResponseAck>(context.Sender!, new GossipResponse
// {
// State = memberState.State.Clone(), //ensure we have a copy and not state that might mutate
// }, task => ReenterAfterResponseAck(context, task, memberState), CancellationTokens.WithTimeout(_gossipRequestTimeout));
//
// return Task.CompletedTask;
}

private void ReceiveState(IContext context, GossipState remoteState)
Expand Down Expand Up @@ -189,7 +151,7 @@ private void SendGossipForMember(IContext context, Member member, InstanceLogger
//a short timeout is massively important, we cannot afford hanging around waiting for timeout, blocking other gossips from getting through

// This will return a GossipResponse, but since we need could need to get the sender, we do not unpack it from the MessageEnvelope
context.RequestReenter<object>(pid, new GossipRequest
context.RequestReenter<GossipResponse>(pid, new GossipRequest
{
MemberId = context.System.Id,
State = memberStateDelta.State.Clone() //ensure we have a copy and not send state that might mutate
Expand All @@ -199,29 +161,14 @@ private void SendGossipForMember(IContext context, Member member, InstanceLogger
);
}

private async Task GossipReenterAfterSend(IContext context, Task<object> task, MemberStateDelta delta)
private async Task GossipReenterAfterSend(IContext context, Task<GossipResponse> task, MemberStateDelta delta)
{
var logger = context.Logger();

try
{
await task;
var envelope = task.Result as MessageEnvelope;

if (envelope?.Message is GossipResponse response)
{
delta.CommitOffsets();

if (response.State is not null)
{
ReceiveState(context, response.State!);

if (envelope.Sender is not null)
{
context.Send(envelope.Sender, new GossipResponseAck());
}
}
}
delta.CommitOffsets();
}
catch (DeadLetterException)
{
Expand All @@ -244,35 +191,4 @@ private async Task GossipReenterAfterSend(IContext context, Task<object> task, M
Logger.LogError(x, "GossipReenterAfterSend failed");
}
}

private async Task ReenterAfterResponseAck(IContext context, Task<GossipResponseAck> task, MemberStateDelta delta)
{
var logger = context.Logger();

try
{
await task;
delta.CommitOffsets();
}
catch (DeadLetterException)
{
logger?.LogWarning("DeadLetter");
Logger.LogWarning("DeadLetter in ReenterAfterResponseAck");
}
catch (OperationCanceledException)
{
logger?.LogWarning("Timeout");
Logger.LogWarning("Timeout in ReenterAfterResponseAck");
}
catch (TimeoutException)
{
logger?.LogWarning("Timeout");
Logger.LogWarning("Timeout in ReenterAfterResponseAck");
}
catch (Exception x)
{
logger?.LogError(x, "ReenterAfterResponseAck failed");
Logger.LogError(x, "ReenterAfterResponseAck failed");
}
}
}
3 changes: 0 additions & 3 deletions src/Proto.Cluster/GossipContracts.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@ message GossipResponse {
GossipState state = 1;
}

message GossipResponseAck { // Or just use empty GossipResponse?
}

//two GossipState objects can be merged
//key + member_id gets it's own entry, if collision, highest version is selected
message GossipState {
Expand Down