Add gRPC stream API gateway implementation #11713

Closed
Tracked by #11231
npepinpe opened this issue Feb 16, 2023 · 5 comments · Fixed by #12310
Assignees
Labels
kind/toil Categorizes an issue or PR as general maintenance, i.e. cleanup, refactoring, etc. version:8.3.0-alpha1 Marks an issue as being completely or in parts released in 8.3.0-alpha1 version:8.3.0 Marks an issue as being completely or in parts released in 8.3.0

Comments

@npepinpe
Member

npepinpe commented Feb 16, 2023

Description

When a client sends a job stream call (as defined in #11708), the gateway should keep track of these in an in-memory register.

This will be a long-lived stream observer. It's important that the gateway properly keeps track of it, including when it is closed (gracefully or not, including cancellations), and that it is only made available to receive jobs when it is ready.
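
As a rough illustration of such a registry, here is a minimal sketch. `JobStreamRegistry` and its methods are hypothetical names chosen for this example, not the gateway's actual classes. It assumes a stream is only registered once it is ready, and that the same `remove` call is wired to every termination path (graceful close, error, cancellation), which is why `remove` is idempotent.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical in-memory registry of open job streams (illustrative names only).
final class JobStreamRegistry<J> {
  private final Map<Long, Consumer<J>> streams = new ConcurrentHashMap<>();
  private final AtomicLong nextId = new AtomicLong();

  /** Registers a stream that is ready to receive jobs and returns its handle. */
  long add(Consumer<J> observer) {
    long id = nextId.incrementAndGet();
    streams.put(id, observer);
    return id;
  }

  /** Removes a stream; returns false if it was already removed. */
  boolean remove(long id) {
    return streams.remove(id) != null;
  }

  /** Pushes a job to the stream if it is still registered. */
  boolean push(long id, J job) {
    Consumer<J> observer = streams.get(id);
    if (observer == null) {
      return false; // closed or cancelled in the meantime
    }
    observer.accept(job);
    return true;
  }

  int size() {
    return streams.size();
  }
}
```

With grpc-java, one plausible place to call `remove` would be the cancel handler of the server-side stream observer (`ServerCallStreamObserver#setOnCancelHandler`), though the wiring is left out here.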

See the prototype for an example.

Clients which are logically equivalent based on their activation properties should be grouped together using some unique ID. This unique ID will be used to communicate with the broker when it comes to adding/removing workers, and pushing jobs.
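
The grouping could look roughly like the sketch below. The fields of `ActivationProperties` are assumptions made for the example (the real set is whatever #11708 defines), and `LogicalClients` is a hypothetical name. Equal properties map to the same randomly generated ID, and the group reports when its last client leaves so that brokers only need to be told then.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical activation properties; a record gives value-based equals/hashCode.
record ActivationProperties(String jobType, String worker, long timeoutMillis) {}

// Groups logically equivalent clients under one shared ID (illustrative only).
final class LogicalClients {
  private final Map<ActivationProperties, UUID> idsByProperties = new HashMap<>();
  private final Map<UUID, Set<Long>> clientsById = new HashMap<>();

  /** Returns the shared ID for these properties, creating it for the first client. */
  UUID add(long clientId, ActivationProperties properties) {
    UUID id = idsByProperties.computeIfAbsent(properties, p -> UUID.randomUUID());
    clientsById.computeIfAbsent(id, k -> new HashSet<>()).add(clientId);
    return id;
  }

  /** Returns true when the last client of the group was removed. */
  boolean remove(long clientId, ActivationProperties properties) {
    UUID id = idsByProperties.get(properties);
    if (id == null) {
      return false; // unknown group
    }
    Set<Long> clients = clientsById.get(id);
    clients.remove(clientId);
    if (!clients.isEmpty()) {
      return false; // other clients still share this ID
    }
    clientsById.remove(id);
    idsByProperties.remove(properties);
    return true;
  }
}
```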

When a new logical client is added, the gateway should send an AddWorker request to all brokers. If a broker responds that the request is invalid, the error should be propagated to the client. Any other error should be retried.

Note: We may want to limit this, of course. In theory, though, if a broker never replies successfully to a correct request, the broker is likely dead and will soon be removed from the topology.
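
A simplified, synchronous sketch of that error handling follows. Everything here is hypothetical: the `AddWorkerResult` enum, the `transport` function standing in for the real request, and the `maxAttempts` bound, which is only there because the note above suggests a limit may be wanted. An invalid request surfaces as an exception for the caller to propagate to the client; any other failure is retried.

```java
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical outcome of a single AddWorker request.
enum AddWorkerResult { OK, INVALID, FAILED }

final class InvalidRequestException extends RuntimeException {
  InvalidRequestException(String message) {
    super(message);
  }
}

final class AddWorkerSender {
  // (brokerId, workerId) -> result; stands in for the real transport call.
  private final BiFunction<String, String, AddWorkerResult> transport;
  private final int maxAttempts; // assumption: retries are bounded

  AddWorkerSender(BiFunction<String, String, AddWorkerResult> transport, int maxAttempts) {
    this.transport = transport;
    this.maxAttempts = maxAttempts;
  }

  /** Sends AddWorker to every broker and returns how many acknowledged it. */
  int addWorker(String workerId, List<String> brokers) {
    int acknowledged = 0;
    for (String broker : brokers) {
      for (int attempt = 1; attempt <= maxAttempts; attempt++) {
        AddWorkerResult result = transport.apply(broker, workerId);
        if (result == AddWorkerResult.INVALID) {
          // Not retried: the caller propagates this to the client.
          throw new InvalidRequestException("broker " + broker + " rejected worker " + workerId);
        }
        if (result == AddWorkerResult.OK) {
          acknowledged++;
          break;
        }
        // FAILED: loop around and retry.
      }
    }
    return acknowledged;
  }
}
```

With an empty broker list the method simply returns 0 without raising anything.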

If there are no brokers, the client is not cancelled or closed.

When a new broker is added to the topology, an AddWorker request should be sent for all workers.

Note: We can look into batching this as an optimization in a further iteration.

When a broker is removed from the topology, nothing is to be done.

When the gateway is shutting down, it should send a RemoveWorkers request with its own member ID to all brokers. These requests are not retried, of course; it's just a best-effort attempt to be polite.
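
The topology rules above can be sketched as a small state machine. The class name, the string-encoded messages, and the `transport` callback are all invented for this example; retries are left out to keep the lifecycle rules visible.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.BiConsumer;

// Hypothetical sketch of how the gateway reacts to worker and topology changes.
final class GatewayStreamLifecycle {
  private final String memberId;
  private final BiConsumer<String, String> transport; // (brokerId, message)
  private final Set<String> brokers = new LinkedHashSet<>();
  private final Set<String> workers = new LinkedHashSet<>();

  GatewayStreamLifecycle(String memberId, BiConsumer<String, String> transport) {
    this.memberId = memberId;
    this.transport = transport;
  }

  /** New logical client: tell every known broker. With no brokers, nothing happens. */
  void onWorkerAdded(String workerId) {
    workers.add(workerId);
    brokers.forEach(broker -> transport.accept(broker, "AddWorker(" + workerId + ")"));
  }

  /** New broker: it knows none of our workers yet, so register all of them. */
  void onBrokerAdded(String brokerId) {
    brokers.add(brokerId);
    workers.forEach(worker -> transport.accept(brokerId, "AddWorker(" + worker + ")"));
  }

  /** Removed broker: forget it locally, send nothing. */
  void onBrokerRemoved(String brokerId) {
    brokers.remove(brokerId);
  }

  /** Shutdown: best effort, failures are ignored and never retried. */
  void close() {
    for (String broker : brokers) {
      try {
        transport.accept(broker, "RemoveWorkers(" + memberId + ")");
      } catch (RuntimeException e) {
        // Ignored on purpose: the gateway is going away anyway.
      }
    }
  }
}
```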

@deepthidevaki
Contributor

deepthidevaki commented Mar 24, 2023

A first version of stream-management support for the gateway is available in PR #12116. For easy tracking, I'm listing the pending tasks here:

  • Aggregate multiple client streams with the same metadata and streamType into a single stream. This should be done in the transport, transparently to the consumers of the API ClientStreamer#add(streamType, metadata, consumer). (#12253)
  • Revisit how to handle node restarts. Currently, it relies fully on MembershipService to notify when a node is added or removed. However, there are some edge cases which we have to handle. For example, if the broker restarts too fast, the gateway may not detect that the broker is dead, and as a result will not notify when the broker is restarted.
  • Optimal handling of retries. We might have to stop retrying add/remove requests to a server at some point. Currently, they are retried indefinitely. It might also be good to retry with a backoff.
  • Integrate the transport stream management with the gateway so that the implementation of the gRPC API (Add gRPC job stream API #11708) can use it.
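
For the retry point, one common option is a capped exponential backoff. The sketch below is only an illustration of that idea; `Backoff` is a hypothetical helper, and the thread does not say which policy, if any, was eventually chosen.

```java
import java.time.Duration;

// Hypothetical capped exponential backoff: initial, 2x, 4x, ... up to max.
final class Backoff {
  private final Duration initial;
  private final Duration max;

  Backoff(Duration initial, Duration max) {
    this.initial = initial;
    this.max = max;
  }

  /** Delay to wait before the given attempt; attempts are counted from 1. */
  Duration delayFor(int attempt) {
    Duration delay = initial;
    // Stop doubling as soon as the cap is reached, which also avoids overflow.
    for (int i = 1; i < attempt && delay.compareTo(max) < 0; i++) {
      delay = delay.multipliedBy(2);
    }
    return delay.compareTo(max) > 0 ? max : delay;
  }
}
```

A retry loop would pass the attempt number to `delayFor` and hand the result to whatever scheduling mechanism is in use.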

@npepinpe
Member Author

Can we add these points to the open questions in the epic issue? We should evaluate the cost of not doing these now, and the benefits of adding them in the second phase. If we're not sure about the benefits, then identify scenarios which will help us evaluate these.

Except the last one of course, that we just have to do 😄

@deepthidevaki
Contributor

Can we add these points to the open questions in the epic issue?

I expect that most of these will be done in the scope of this issue. Let's first discuss what is necessary and what can be postponed, and then move them to open questions. I will not close this issue until then, so we won't lose track of it.

ghost pushed a commit that referenced this issue Mar 29, 2023
12116: Add simple gateway stream management r=deepthidevaki a=deepthidevaki

## Description

- When a client stream is added, the stream manager opens a stream with all servers. 
- When a server is added, it is most likely a server that was restarted, so all previously opened streams would be unknown to it. All existing client streams are therefore registered with this server.
- When a client stream is added, it is immediately added to an in-memory registry and the operation is considered successful. Adding the stream to the servers is done asynchronously. Each attempt to add a stream to a server is retried indefinitely until the server acknowledges it.
- A placeholder interface for `ClientStreamConsumer` is also added. This is supposed to be used to push jobs back to the client.

Other changes included:
- To allow easier unit testing, all implementations that require an actor accept a `ConcurrencyControl` interface instead of directly extending `Actor`. To do this, we had to add some missing methods such as `call` and `runDelayed` to the interface. Adding `runDelayed` resulted in conflicts with other classes that implement `runDelayed` with a different return type, so `runDelayed` in `ConcurrencyControl` is renamed to `schedule`. This change resulted in a lot of noise in this PR, because all usages of `ActorControl#runDelayed` had to be renamed. f7bbb00
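
To illustrate why depending on an interface helps testing, here is a small sketch. `Scheduling` is a hypothetical, much smaller stand-in and not the real `ConcurrencyControl` interface; `ManualScheduling` and `RetryingComponent` are likewise invented for the example. The test double queues work and ignores delays, so a retry loop can be driven deterministically without an actor scheduler.

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-in for the scheduling interface a component depends on.
interface Scheduling {
  void run(Runnable action);

  void schedule(Duration delay, Runnable action);
}

// Test double: queues everything and runs it on demand, ignoring delays.
final class ManualScheduling implements Scheduling {
  private final Queue<Runnable> pending = new ArrayDeque<>();

  @Override
  public void run(Runnable action) {
    pending.add(action);
  }

  @Override
  public void schedule(Duration delay, Runnable action) {
    pending.add(action);
  }

  /** Runs queued actions, including ones queued while running; returns the count. */
  int runAll() {
    int executed = 0;
    Runnable next;
    while ((next = pending.poll()) != null) {
      next.run();
      executed++;
    }
    return executed;
  }
}

// Component under test: retries through the injected interface only.
final class RetryingComponent {
  private final Scheduling scheduling;
  private int attempts;
  private boolean done;

  RetryingComponent(Scheduling scheduling) {
    this.scheduling = scheduling;
  }

  void start(int failuresBeforeSuccess) {
    scheduling.run(() -> attempt(failuresBeforeSuccess));
  }

  private void attempt(int remainingFailures) {
    attempts++;
    if (remainingFailures == 0) {
      done = true;
      return;
    }
    scheduling.schedule(Duration.ofSeconds(1), () -> attempt(remainingFailures - 1));
  }

  int attempts() {
    return attempts;
  }

  boolean isDone() {
    return done;
  }
}
```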

Out of scope for this PR:
- Aggregating multiple client streams with same metadata and streamType to a single stream.
- Detecting or repairing missing streams in client vs server. Currently, it fully relies on MembershipService to notify when a node is added or removed. However, there are some edge-cases which we have to handle. 
- Optimal handling of retries. We might have to stop retrying sending add/remove request to a server at some point. Currently, it is retried indefinitely. It might also be good to retry with a backoff.

## Related issues

relates #11713 

Can close the issue after some of the above out of scope items are also implemented.


Co-authored-by: Deepthi Devaki Akkoorath <[email protected]>
@deepthidevaki
Contributor

Another edge case to consider: if a client stream is removed while there is an ongoing "AddRequest" retry, it is possible that the stream is added to the broker after it has been removed.

  • Allow cancelling ongoing retries
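
One possible shape for this, sketched under assumptions: the retry loop re-checks before every attempt whether the stream is still registered, so removing the stream effectively cancels the retry. `CancellableAddRetry`, the `send` predicate, and the `maxAttempts` bound are hypothetical, and the loop is synchronous only to keep the example short.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

// Hypothetical sketch: an add retry that stops once the stream has been removed.
final class CancellableAddRetry {
  private final Set<String> active = ConcurrentHashMap.newKeySet();

  void add(String streamId) {
    active.add(streamId);
  }

  void remove(String streamId) {
    active.remove(streamId);
  }

  /**
   * Retries until the send succeeds, the attempts run out, or the stream is removed.
   * Returns true only if the broker acknowledged the add.
   */
  boolean retryAdd(String streamId, Predicate<String> send, int maxAttempts) {
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      if (!active.contains(streamId)) {
        return false; // removed in the meantime: do not add it to the broker
      }
      if (send.test(streamId)) {
        return true;
      }
    }
    return false;
  }
}
```

This narrows the window but does not close it entirely: a request already in flight when the stream is removed could still be acknowledged, so a follow-up remove request may be needed in that case.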

@deepthidevaki
Contributor

  • Re-visit how to handle node restarts. Currently, it fully relies on MembershipService to notify when a node is added or removed. However, there are some edge-cases which we have to handle. For example, if the broker restarts too fast, gateway may not detect the broker is dead, and as a result will not notify when the broker is restarted.

This is not a problem. When a node is restarted, its incarnation number changes. SWIM detects this and posts a MEMBER_REMOVED and a MEMBER_ADDED event. So we will always get notified of node restarts, and we will re-send AddStream requests.

https://github.com/camunda/zeebe/blob/c055d58fc86afa675e27f7c8b7e6ce6110a29592/atomix/cluster/src/main/java/io/atomix/cluster/protocol/SwimMembershipProtocol.java#L251

    // If the term has been increased, update the member and record a gossip event.
    else if (member.incarnationNumber() > swimMember.getIncarnationNumber()) {
      // If the member's version has changed, remove the old member and add the new member.
      if (!Objects.equals(member.version(), swimMember.version())) {
        members.remove(member.id());
        post(new GroupMembershipEvent(GroupMembershipEvent.Type.MEMBER_REMOVED, swimMember.copy()));
        ....
        LOGGER.debug("{} - Evicted member for new version {}", localMember.id(), swimMember);
        post(new GroupMembershipEvent(GroupMembershipEvent.Type.MEMBER_ADDED, swimMember.copy()));
        recordUpdate(swimMember.copy());
      }

@ghost ghost closed this as completed in d58828d Apr 11, 2023
@remcowesterhoud remcowesterhoud added version:8.3.0-alpha1 Marks an issue as being completely or in parts released in 8.3.0-alpha1 and removed version:8.3.0-alpha1 Marks an issue as being completely or in parts released in 8.3.0-alpha1 labels May 3, 2023
@megglos megglos added the version:8.3.0 Marks an issue as being completely or in parts released in 8.3.0 label Oct 5, 2023
This issue was closed.
4 participants