Commit 4217c52
transport: fix logical race in flow control
Remove the add and cancel methods of quotaPool. Their use is not required, and leads to logical races when used concurrently from multiple goroutines. Rename the reset method to add.

The typical way that a goroutine claims quota is to call the add method and then to select on the channel returned by the acquire method. If two goroutines are both trying to claim quota from a single quotaPool, the second call to the add method can happen before the first attempt to read from the channel. When that happens, the second goroutine to attempt the channel read will end up waiting for a very long time, in spite of its efforts to prepare the channel for reading.

The quotaPool will always behave correctly when any positive quota is on the channel rather than stored in the struct field. In the opposite case, when positive quota is only in the struct field and not on the channel, users of the quotaPool can fail to access available quota. Err on the side of storing any positive quota in the channel.

This includes a reproducer for #632, which fails on many runs with this package at v1.0.4. The frequency of the test failures depends on how stressed the server is, since it's now effectively checking for weird interleavings of goroutines. It passes reliably with these changes to the transport package.

The behavior described in #734 (an RPC with a streaming response hangs unexpectedly) matches what I've seen in my programs, and what I see in the test case added here. If it's a logical flow control bug, this change may well fix it.

Updates #632
Updates #734
1 parent cc3363f commit 4217c52
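The fix hinges on one invariant: positive quota must live on the pool's buffered channel, where any waiter selecting on acquire can see it, rather than in the struct field, where only a later add call can surface it. The control.go hunk below is truncated before the body of the renamed method, so here is a self-contained sketch reconstructing the pattern the commit message describes. It is an illustration of the invariant, not necessarily the verbatim code of this commit.

package transport

import "sync"

// quotaPool tracks flow-control quota. Invariant after this change:
// whenever the pool's balance is positive, that balance sits buffered
// on the channel c, where any goroutine selecting on acquire() can
// claim it.
type quotaPool struct {
    c chan int // buffered with capacity 1

    mu    sync.Mutex
    quota int // pending adjustment; positive balances move to c promptly
}

// newQuotaPool creates a pool with an initial balance of q.
func newQuotaPool(q int) *quotaPool {
    qb := &quotaPool{c: make(chan int, 1)}
    if q > 0 {
        qb.c <- q // positive quota goes straight onto the channel
    } else {
        qb.quota = q
    }
    return qb
}

// add drains whatever is pending on the channel, applies the (possibly
// negative) adjustment v, and re-sends the balance if it is positive.
// Draining and re-sending under one lock means concurrent callers can
// no longer strand a positive balance in the struct field where their
// subsequent channel reads will never find it.
func (qb *quotaPool) add(v int) {
    qb.mu.Lock()
    defer qb.mu.Unlock()
    select {
    case n := <-qb.c:
        qb.quota += n
    default:
    }
    qb.quota += v
    if qb.quota <= 0 {
        return
    }
    select {
    case qb.c <- qb.quota:
        qb.quota = 0
    default:
    }
}

// acquire returns the channel on which available quota is delivered.
func (qb *quotaPool) acquire() <-chan int {
    return qb.c
}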

File tree

4 files changed: +125 −38 lines

test/end2end_test.go

+119
@@ -3188,6 +3188,124 @@ func TestServerCredsDispatch(t *testing.T) {
     }
 }
 
+func TestFlowControlIssue632(t *testing.T) {
+    // Test for a regression of https://github.com/grpc/grpc-go/issues/632,
+    // and other flow control bugs.
+
+    defer leakCheck(t)()
+
+    const (
+        itemCount   = 100
+        itemSize    = 1 << 10
+        recvCount   = 2
+        maxFailures = 3
+
+        totalTimeout   = 30 * time.Second
+        requestTimeout = time.Second
+    )
+
+    requestCount := 10000
+    if raceMode {
+        requestCount = 1000
+    }
+
+    lis, err := net.Listen("tcp", ":0")
+    if err != nil {
+        t.Fatalf("Failed to listen: %v", err)
+    }
+    defer lis.Close()
+
+    s := grpc.NewServer()
+    testpb.RegisterTestServiceServer(s, &issue632server{
+        itemCount: itemCount,
+        itemSize:  itemSize,
+    })
+    defer s.Stop()
+
+    go s.Serve(lis)
+
+    ctx := context.Background()
+    ctx, _ = context.WithTimeout(ctx, totalTimeout)
+    defer func() {
+        if ctx.Err() == context.DeadlineExceeded {
+            t.Fatalf("test timed out")
+        }
+    }()
+
+    cc, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure())
+    if err != nil {
+        t.Fatalf("grpc.Dial(%q) = %v", lis.Addr().String(), err)
+    }
+    defer cc.Close()
+    cl := testpb.NewTestServiceClient(cc)
+
+    failures := 0
+    for i := 0; i < requestCount; i++ {
+        ctx, cancel := context.WithTimeout(ctx, requestTimeout)
+        output, err := cl.StreamingOutputCall(ctx, &testpb.StreamingOutputCallRequest{})
+        if err != nil {
+            t.Fatalf("StreamingOutputCall; err = %q", err)
+        }
+
+        j := 0
+    loop:
+        for ; j < recvCount; j++ {
+            _, err := output.Recv()
+            if err != nil {
+                if err == io.EOF {
+                    break loop
+                }
+                switch grpc.Code(err) {
+                case codes.DeadlineExceeded:
+                    break loop
+                default:
+                    t.Fatalf("Recv; err = %q", err)
+                }
+            }
+        }
+        cancel()
+        <-ctx.Done()
+
+        if j < recvCount {
+            t.Errorf("got %d responses to request %d", j, i)
+            failures++
+            if failures >= maxFailures {
+                // Continue past the first failure to see if the connection is
+                // entirely broken, or if only a single RPC was affected.
+                break
+            }
+        }
+    }
+}
+
+type issue632server struct {
+    testpb.TestServiceServer
+
+    itemSize  int
+    itemCount int
+}
+
+func (s *issue632server) StreamingOutputCall(req *testpb.StreamingOutputCallRequest, srv testpb.TestService_StreamingOutputCallServer) error {
+    for i := 0; i < s.itemCount; i++ {
+        err := srv.Send(&testpb.StreamingOutputCallResponse{
+            Payload: &testpb.Payload{
+                // Sending a large stream of data which the client rejects
+                // helps to trigger some types of flow control bugs.
+                //
+                // Reallocating memory here is inefficient, but the stress it
+                // puts on the GC leads to more frequent flow control
+                // failures. The GC likely causes more variety in the
+                // goroutine scheduling orders.
+                Body: bytes.Repeat([]byte("a"), s.itemSize),
+            },
+        })
+        if err != nil {
+            return err
+        }
+    }
+    return nil
+}
+
 // interestingGoroutines returns all goroutines we care about for the purpose
 // of leak checking. It excludes testing or runtime ones.
 func interestingGoroutines() (gs []string) {
@@ -3208,6 +3326,7 @@ func interestingGoroutines() (gs []string) {
         strings.Contains(stack, "testing.tRunner(") ||
         strings.Contains(stack, "runtime.goexit") ||
         strings.Contains(stack, "created by runtime.gc") ||
+        strings.Contains(stack, "created by runtime/trace.Start") ||
         strings.Contains(stack, "created by google3/base/go/log.init") ||
         strings.Contains(stack, "interestingGoroutines") ||
         strings.Contains(stack, "runtime.MHeap_Scavenger") ||

transport/control.go

+2 −28
@@ -111,35 +111,9 @@ func newQuotaPool(q int) *quotaPool {
     return qb
 }
 
-// add adds n to the available quota and tries to send it on acquire.
-func (qb *quotaPool) add(n int) {
-    qb.mu.Lock()
-    defer qb.mu.Unlock()
-    qb.quota += n
-    if qb.quota <= 0 {
-        return
-    }
-    select {
-    case qb.c <- qb.quota:
-        qb.quota = 0
-    default:
-    }
-}
-
-// cancel cancels the pending quota sent on acquire, if any.
-func (qb *quotaPool) cancel() {
-    qb.mu.Lock()
-    defer qb.mu.Unlock()
-    select {
-    case n := <-qb.c:
-        qb.quota += n
-    default:
-    }
-}
-
-// reset cancels the pending quota sent on acquired, incremented by v and sends
+// add cancels the pending quota sent on acquire, increments it by v, and sends
 // it back on acquire.
-func (qb *quotaPool) reset(v int) {
+func (qb *quotaPool) add(v int) {
     qb.mu.Lock()
     defer qb.mu.Unlock()
     select {
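With cancel gone, add is the only way to put quota back. The caller pattern in Write (visible in the next two files) becomes: wait for a grant on acquire, consume what you need, and return the surplus with add. A minimal sketch against the quotaPool sketch above; claimSendQuota and want are illustrative names, not identifiers from this commit.

// claimSendQuota blocks until the pool delivers a grant, keeps up to
// want of it, and immediately returns any surplus to the pool. add is
// now the single re-crediting entry point, replacing cancel.
func claimSendQuota(qb *quotaPool, want int) int {
    got := <-qb.acquire()
    if got > want {
        qb.add(got - want) // give back what this write won't use
        got = want
    }
    return got
}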

transport/http2_client.go

+3 −6
@@ -367,7 +367,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
     }
     t.mu.Unlock()
     if reset {
-        t.streamsQuota.reset(-1)
+        t.streamsQuota.add(-1)
     }
 
     // HPACK encodes various headers. Note that once WriteField(...) is
@@ -614,9 +614,6 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
     // Wait until the transport has some quota to send the data.
     tq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, t.sendQuotaPool.acquire())
     if err != nil {
-        if _, ok := err.(StreamError); ok || err == io.EOF {
-            t.sendQuotaPool.cancel()
-        }
         return err
     }
     if sq < size {
@@ -1035,13 +1032,13 @@ func (t *http2Client) applySettings(ss []http2.Setting) {
         t.maxStreams = int(s.Val)
         t.mu.Unlock()
         if reset {
-            t.streamsQuota.reset(int(s.Val) - ms)
+            t.streamsQuota.add(int(s.Val) - ms)
         }
     case http2.SettingInitialWindowSize:
         t.mu.Lock()
         for _, stream := range t.activeStreams {
             // Adjust the sending quota for each stream.
-            stream.sendQuotaPool.reset(int(s.Val - t.streamSendQuota))
+            stream.sendQuotaPool.add(int(s.Val - t.streamSendQuota))
         }
         t.streamSendQuota = s.Val
         t.mu.Unlock()
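Note that add accepts negative values: in applySettings, the delta int(s.Val - t.streamSendQuota) is negative when a SETTINGS frame shrinks the window, and the pool simply carries a deficit until later credits make the balance positive again. A toy walk-through against the quotaPool sketch above; the concrete numbers are invented for illustration.

qb := newQuotaPool(100) // 100 units sit buffered on the channel
qb.add(-150)            // window shrank: drains the 100, leaves a deficit of 50
qb.add(60)              // a credit of 60 nets +10, which moves onto the channel
fmt.Println(<-qb.acquire()) // prints 10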

transport/http2_server.go

+1 −4
@@ -636,9 +636,6 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
     // Wait until the transport has some quota to send the data.
     tq, err := wait(s.ctx, nil, nil, t.shutdownChan, t.sendQuotaPool.acquire())
     if err != nil {
-        if _, ok := err.(StreamError); ok {
-            t.sendQuotaPool.cancel()
-        }
         return err
     }
     if sq < size {
@@ -706,7 +703,7 @@ func (t *http2Server) applySettings(ss []http2.Setting) {
     t.mu.Lock()
     defer t.mu.Unlock()
     for _, stream := range t.activeStreams {
-        stream.sendQuotaPool.reset(int(s.Val - t.streamSendQuota))
+        stream.sendQuotaPool.add(int(s.Val - t.streamSendQuota))
     }
     t.streamSendQuota = s.Val
 }
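The deleted cancel calls on both Write error paths are what the commit message means by "their use is not required": a writer that gives up waiting can simply return, because any quota already on (or later sent to) the channel stays claimable by other writers. A sketch of that abandon-safe wait, standing in for the transport's richer wait helper; tryClaim is a hypothetical name and a context import is assumed.

// tryClaim waits for a quota grant until ctx expires. On timeout it just
// returns; no cleanup is needed, because unclaimed quota remains buffered
// on the pool's channel for whichever writer selects on acquire next.
func tryClaim(ctx context.Context, qb *quotaPool) (int, error) {
    select {
    case q := <-qb.acquire():
        return q, nil
    case <-ctx.Done():
        return 0, ctx.Err()
    }
}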
