[manager/dispatcher] Synchronization fixes #2495
Conversation
manager/dispatcher/dispatcher.go (outdated)

@@ -125,8 +125,18 @@ type clusterUpdate struct {

// Dispatcher is responsible for dispatching tasks and tracking agent health.
type Dispatcher struct {
	mu sync.Mutex
	wg sync.WaitGroup
	// Lock to provide mutually exclusive access to dispatcher fields
Start the comment with the name of the fields: // mu blah blah...
manager/dispatcher/dispatcher.go (outdated)

	mu sync.Mutex
	// Flag to indicate shutdown and prevent new operations on the dispatcher.
	// Set by calling Stop().
	shutdown bool
Any reason to avoid using a tombstone channel?
Caught up with @stevvooe offline. I agree that using a tombstone is cleaner, but some of the operations don't use a select {} so using a bool saves some code.
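To illustrate the trade-off being discussed, here is a minimal sketch of the two signalling styles; this is not the actual dispatcher code, and the type and field names are made up:

package example

import (
	"errors"
	"sync"
)

// Tombstone channel: close(stopped) broadcasts shutdown, which is convenient
// for call sites that already sit in a select {}.
type withChannel struct {
	stopped chan struct{}
}

func (w *withChannel) waitForWork(work <-chan int) error {
	select {
	case <-w.stopped:
		return errors.New("dispatcher is stopped")
	case item := <-work:
		_ = item // ... process the item ...
		return nil
	}
}

// Boolean flag: avoids the select {} for call sites that are purely
// sequential, at the cost of taking the mutex to read the flag.
type withBool struct {
	mu       sync.Mutex
	shutdown bool
}

func (w *withBool) running() bool {
	w.mu.Lock()
	defer w.mu.Unlock()
	return !w.shutdown
}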
Codecov Report
@@ Coverage Diff @@
## master #2495 +/- ##
==========================================
+ Coverage 61.17% 62.38% +1.2%
==========================================
Files 49 129 +80
Lines 6898 21323 +14425
==========================================
+ Hits 4220 13302 +9082
- Misses 2243 6586 +4343
- Partials 435 1435 +1000
manager/dispatcher/dispatcher.go (outdated)

	// Set shutdown to true.
	// This will prevent RPCs that start after stop() is called
	// from making progress and essentially puts the dispatcher in drain.
	d.shutdown = true
I don't see any place where shutdown is set back to false. Remember that the Dispatcher is not recreated but reused.
good point! Will address this.
I'm a little worried by the fact that the CI was green...
Yeah, unfortunately the unit test always creates a new dispatcher, I know.
Looking at adding a unit test.
3773677 to 071a9b9
Use an RWMutex instead of a regular one. There's only one case that writes (the actual shutdown) and everything else is reads. It's an easy win for optimization. In addition, this will allow you to eliminate the waitgroup, because you can just RLock the RWMutex for the duration of the method call, and the Stop method won't be able to acquire a write lock until all read locks are released. Make sense?
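For reference, a minimal sketch of the suggested RWMutex approach; this is illustrative only, not the actual Dispatcher code:

package example

import (
	"errors"
	"sync"
)

type dispatcher struct {
	mu       sync.RWMutex
	shutdown bool
}

// Every RPC holds a read lock for its whole duration; many RPCs can run
// concurrently, and no separate WaitGroup is needed.
func (d *dispatcher) heartbeat() error {
	d.mu.RLock()
	defer d.mu.RUnlock()
	if d.shutdown {
		return errors.New("dispatcher is stopped")
	}
	// ... process the heartbeat ...
	return nil
}

// Stop cannot acquire the write lock until all read locks are released,
// so it implicitly waits for in-flight RPCs to finish.
func (d *dispatcher) stop() {
	d.mu.Lock()
	d.shutdown = true
	d.mu.Unlock()
}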
Using an RWMutex was the initial approach. Here's why I decided to use a wait group:
I realized that the shutdown flag is not really needed since we can inspect the dispatcher context to signal shutdown. This code is much simpler and also in line with the two points above.
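Roughly, the resulting shape is the following; a simplified sketch only, the real method and field names may differ:

package example

import "context"

type dispatcher struct {
	ctx    context.Context
	cancel context.CancelFunc
}

// Run installs a cancellable context that doubles as the shutdown signal.
func (d *dispatcher) run(parent context.Context) {
	d.ctx, d.cancel = context.WithCancel(parent)
	// ... start the dispatcher loops ...
}

// isRunning reports whether Stop has been called, by inspecting the context
// instead of a separate shutdown bool.
func (d *dispatcher) isRunning() bool {
	return d.ctx != nil && d.ctx.Err() == nil
}

// Stop cancels the context, so every subsequent isRunning call reports false.
func (d *dispatcher) stop() {
	d.cancel()
}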
manager/dispatcher/dispatcher.go (outdated)

@@ -1137,6 +1157,9 @@ func (d *Dispatcher) getRootCACert() []byte {
// a special boolean field Disconnect which if true indicates that node should
// reconnect to another Manager immediately.
func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_SessionServer) error {
	d.shutdownWait.Add(1)
@anshulpundir here and in a couple of places above, why isn't the check on d.shutdown required?
Actually, never mind, after your most recent comment this is not relevant.
We're using the Dispatcher context to signal shutdown.
Note the order: we always increment the waitgroup first, followed by isRunningLocked(). If an RPC finds the context not cancelled, it will already have incremented the WaitGroup, which makes sure that Stop() waits for this RPC to finish.
I would say that this is slightly clever, so make sure to document somewhere that the ordering is what does the trick. It'll be hard to read later otherwise.
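For the record, the ordering trick looks roughly like this; a simplified sketch, not the actual code:

package example

import (
	"context"
	"errors"
	"sync"
)

type dispatcher struct {
	ctx          context.Context
	cancel       context.CancelFunc
	shutdownWait sync.WaitGroup
}

// Every RPC increments the WaitGroup *before* checking the context. If the
// check passes, Stop cannot finish until this RPC returns; if the context
// was already cancelled, the RPC bails out and releases its count.
func (d *dispatcher) session() error {
	d.shutdownWait.Add(1)
	defer d.shutdownWait.Done()

	if d.ctx.Err() != nil {
		return errors.New("dispatcher is stopped")
	}
	// ... serve the session stream ...
	return nil
}

// Stop cancels the context first, then waits for in-flight RPCs, and only
// afterwards tears down shared state such as the node store. That is what
// keeps Register from reinserting into the store after cleanup.
func (d *dispatcher) stop() {
	d.cancel()
	d.shutdownWait.Wait()
	// ... clean up the node store and other shared state ...
}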
manager/dispatcher/dispatcher.go (outdated)

	d.shutdownWait.Add(1)
	defer d.shutdownWait.Done()

	if !d.isRunning() {
This is the version with no lock, so it can potentially race with the shutdown. Is that a problem?
If not, can you add a comment explaining why it is fine like that?
It's not, because Stop() will wait for Heartbeat() to complete since it has already incremented the waitgroup. Added a comment.
The wait on the WaitGroup is not the first operation in Stop(), so the running check here races with the beginning of the stop.
As I was saying, maybe it is not a problem, but with this change you could have started the stop function and still process an extra heartbeat, because the isRunning check reads the bool with no lock.
with this change you could have started the stop function and still process an extra heartbeat, because the isRunning check reads the bool with no lock

True. However, there is no correctness issue, and the win is that we don't need to take a lock in Heartbeat(), which is the most frequent call on the dispatcher.
Signed-off-by: Anshul Pundir <[email protected]>
LGTM
Alright, LGTM, we're shipping it.
There are some potential races introduced by this PR that cause tests to fail on moby/moby. We may have to fix that: moby/moby#36274
Fixes for synchronizing the dispatcher shutdown with in-progress RPCs. This addresses the case where the Dispatcher.Register() RPC races with Dispatcher.Stop() and reinserts into the dispatcher node store after it has been cleaned up by Stop().
We'll hold off on changing the grace period for agents until we have tested this fix at scale.