Skip to content

Commit f35fec9

Browse files
committed
http2: detect hung client connections by confirming stream resets
Consider the case of an unresponsive client connection, where the server has stopped responding. We send an infinite sequence of requests to the connection in sequence, each with a timeout. Each request counts against the concurrency limit for the connection while active, but when a request times out we send a RST_STREAM and free up the concurrency slot it was using. We continue to try to send requests to the connection forever (or until the kernel closes the underlying TCP connection, or until ReadIdleTimeout/WriteByteTimeout results in us closing the connection). Defend against this scenario by counting a canceled request against the connection concurrency limit until we confirm the server is responding. Specifically: Track the number of in-flight request cancellations in cc.pendingResets. This total counts against the connection concurrency limit. When sending a RST_STREAM for a canceled request, increment cc.pendingResets. Send a PING frame to the server, unless a PING is already in flight. When receiving a PING response, set cc.pendingResets to 0. A hung connection will be used for at most SETTINGS_MAX_CONCURRENT_STREAMS requests. When StrictMaxConcurrentStreams is false, we will create a new connection after reaching the concurrency limit for a hung one. When StrictMaxConcurrentStreams is true, we will continue to wait for the existing connection until some timeout closes it or it becomes responsive again. For golang/go#59690 Change-Id: I0151f9a594af14b32bcb6005a239fa19eb103704 Reviewed-on: https://go-review.googlesource.com/c/net/+/617655 LUCI-TryBot-Result: Go LUCI <[email protected]> Reviewed-by: Brad Fitzpatrick <[email protected]> Reviewed-by: Jonathan Amsterdam <[email protected]> Reviewed-by: Carlos Amedee <[email protected]>
1 parent e883dae commit f35fec9

File tree

3 files changed

+198
-7
lines changed

3 files changed

+198
-7
lines changed

http2/http2_test.go

+8
Original file line numberDiff line numberDiff line change
@@ -283,3 +283,11 @@ func TestNoUnicodeStrings(t *testing.T) {
283283
t.Fatal(err)
284284
}
285285
}
286+
287+
// must returns v if err is nil, or panics otherwise.
288+
func must[T any](v T, err error) T {
289+
if err != nil {
290+
panic(err)
291+
}
292+
return v
293+
}

http2/transport.go

+64-7
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,14 @@ type ClientConn struct {
364364
readIdleTimeout time.Duration
365365
pingTimeout time.Duration
366366

367+
// pendingResets is the number of RST_STREAM frames we have sent to the peer,
368+
// without confirming that the peer has received them. When we send a RST_STREAM,
369+
// we bundle it with a PING frame, unless a PING is already in flight. We count
370+
// the reset stream against the connection's concurrency limit until we get
371+
// a PING response. This limits the number of requests we'll try to send to a
372+
// completely unresponsive connection.
373+
pendingResets int
374+
367375
// reqHeaderMu is a 1-element semaphore channel controlling access to sending new requests.
368376
// Write to reqHeaderMu to lock it, read from it to unlock.
369377
// Lock reqmu BEFORE mu or wmu.
@@ -960,7 +968,7 @@ func (cc *ClientConn) State() ClientConnState {
960968
return ClientConnState{
961969
Closed: cc.closed,
962970
Closing: cc.closing || cc.singleUse || cc.doNotReuse || cc.goAway != nil,
963-
StreamsActive: len(cc.streams),
971+
StreamsActive: len(cc.streams) + cc.pendingResets,
964972
StreamsReserved: cc.streamsReserved,
965973
StreamsPending: cc.pendingRequests,
966974
LastIdle: cc.lastIdle,
@@ -992,7 +1000,13 @@ func (cc *ClientConn) idleStateLocked() (st clientConnIdleState) {
9921000
// writing it.
9931001
maxConcurrentOkay = true
9941002
} else {
995-
maxConcurrentOkay = int64(len(cc.streams)+cc.streamsReserved+1) <= int64(cc.maxConcurrentStreams)
1003+
// We can take a new request if the total of
1004+
// - active streams;
1005+
// - reservation slots for new streams; and
1006+
// - streams for which we have sent a RST_STREAM and a PING,
1007+
// but received no subsequent frame
1008+
// is less than the concurrency limit.
1009+
maxConcurrentOkay = cc.currentRequestCountLocked() < int(cc.maxConcurrentStreams)
9961010
}
9971011

9981012
st.canTakeNewRequest = cc.goAway == nil && !cc.closed && !cc.closing && maxConcurrentOkay &&
@@ -1002,6 +1016,12 @@ func (cc *ClientConn) idleStateLocked() (st clientConnIdleState) {
10021016
return
10031017
}
10041018

1019+
// currentRequestCountLocked reports the number of concurrency slots currently in use,
1020+
// including active streams, reserved slots, and reset streams waiting for acknowledgement.
1021+
func (cc *ClientConn) currentRequestCountLocked() int {
1022+
return len(cc.streams) + cc.streamsReserved + cc.pendingResets
1023+
}
1024+
10051025
func (cc *ClientConn) canTakeNewRequestLocked() bool {
10061026
st := cc.idleStateLocked()
10071027
return st.canTakeNewRequest
@@ -1578,6 +1598,7 @@ func (cs *clientStream) cleanupWriteRequest(err error) {
15781598
cs.reqBodyClosed = make(chan struct{})
15791599
}
15801600
bodyClosed := cs.reqBodyClosed
1601+
closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives() || cc.goAway != nil
15811602
cc.mu.Unlock()
15821603
if mustCloseBody {
15831604
cs.reqBody.Close()
@@ -1602,16 +1623,40 @@ func (cs *clientStream) cleanupWriteRequest(err error) {
16021623
if cs.sentHeaders {
16031624
if se, ok := err.(StreamError); ok {
16041625
if se.Cause != errFromPeer {
1605-
cc.writeStreamReset(cs.ID, se.Code, err)
1626+
cc.writeStreamReset(cs.ID, se.Code, false, err)
16061627
}
16071628
} else {
1608-
cc.writeStreamReset(cs.ID, ErrCodeCancel, err)
1629+
// We're cancelling an in-flight request.
1630+
//
1631+
// This could be due to the server becoming unresponsive.
1632+
// To avoid sending too many requests on a dead connection,
1633+
// we let the request continue to consume a concurrency slot
1634+
// until we can confirm the server is still responding.
1635+
// We do this by sending a PING frame along with the RST_STREAM
1636+
// (unless a ping is already in flight).
1637+
//
1638+
// For simplicity, we don't bother tracking the PING payload:
1639+
// We reset cc.pendingResets any time we receive a PING ACK.
1640+
//
1641+
// We skip this if the conn is going to be closed on idle,
1642+
// because it's short lived and will probably be closed before
1643+
// we get the ping response.
1644+
ping := false
1645+
if !closeOnIdle {
1646+
cc.mu.Lock()
1647+
if cc.pendingResets == 0 {
1648+
ping = true
1649+
}
1650+
cc.pendingResets++
1651+
cc.mu.Unlock()
1652+
}
1653+
cc.writeStreamReset(cs.ID, ErrCodeCancel, ping, err)
16091654
}
16101655
}
16111656
cs.bufPipe.CloseWithError(err) // no-op if already closed
16121657
} else {
16131658
if cs.sentHeaders && !cs.sentEndStream {
1614-
cc.writeStreamReset(cs.ID, ErrCodeNo, nil)
1659+
cc.writeStreamReset(cs.ID, ErrCodeNo, false, nil)
16151660
}
16161661
cs.bufPipe.CloseWithError(errRequestCanceled)
16171662
}
@@ -1638,7 +1683,7 @@ func (cc *ClientConn) awaitOpenSlotForStreamLocked(cs *clientStream) error {
16381683
return errClientConnUnusable
16391684
}
16401685
cc.lastIdle = time.Time{}
1641-
if int64(len(cc.streams)) < int64(cc.maxConcurrentStreams) {
1686+
if cc.currentRequestCountLocked() < int(cc.maxConcurrentStreams) {
16421687
return nil
16431688
}
16441689
cc.pendingRequests++
@@ -3065,6 +3110,11 @@ func (rl *clientConnReadLoop) processPing(f *PingFrame) error {
30653110
close(c)
30663111
delete(cc.pings, f.Data)
30673112
}
3113+
if cc.pendingResets > 0 {
3114+
// See clientStream.cleanupWriteRequest.
3115+
cc.pendingResets = 0
3116+
cc.cond.Broadcast()
3117+
}
30683118
return nil
30693119
}
30703120
cc := rl.cc
@@ -3087,13 +3137,20 @@ func (rl *clientConnReadLoop) processPushPromise(f *PushPromiseFrame) error {
30873137
return ConnectionError(ErrCodeProtocol)
30883138
}
30893139

3090-
func (cc *ClientConn) writeStreamReset(streamID uint32, code ErrCode, err error) {
3140+
// writeStreamReset sends a RST_STREAM frame.
3141+
// When ping is true, it also sends a PING frame with a random payload.
3142+
func (cc *ClientConn) writeStreamReset(streamID uint32, code ErrCode, ping bool, err error) {
30913143
// TODO: map err to more interesting error codes, once the
30923144
// HTTP community comes up with some. But currently for
30933145
// RST_STREAM there's no equivalent to GOAWAY frame's debug
30943146
// data, and the error codes are all pretty vague ("cancel").
30953147
cc.wmu.Lock()
30963148
cc.fr.WriteRSTStream(streamID, code)
3149+
if ping {
3150+
var payload [8]byte
3151+
rand.Read(payload[:])
3152+
cc.fr.WritePing(false, payload)
3153+
}
30973154
cc.bw.Flush()
30983155
cc.wmu.Unlock()
30993156
}

http2/transport_test.go

+126
Original file line numberDiff line numberDiff line change
@@ -2559,6 +2559,9 @@ func testTransportReturnsUnusedFlowControl(t *testing.T, oneDataFrame bool) {
25592559
}
25602560
return true
25612561
},
2562+
func(f *PingFrame) bool {
2563+
return true
2564+
},
25622565
func(f *WindowUpdateFrame) bool {
25632566
if !oneDataFrame && !sentAdditionalData {
25642567
t.Fatalf("Got WindowUpdateFrame, don't expect one yet")
@@ -5512,3 +5515,126 @@ func TestTransport1xxLimits(t *testing.T) {
55125515
})
55135516
}
55145517
}
5518+
5519+
func TestTransportSendPingWithReset(t *testing.T) {
5520+
tc := newTestClientConn(t, func(tr *Transport) {
5521+
tr.StrictMaxConcurrentStreams = true
5522+
})
5523+
5524+
const maxConcurrent = 3
5525+
tc.greet(Setting{SettingMaxConcurrentStreams, maxConcurrent})
5526+
5527+
// Start several requests.
5528+
var rts []*testRoundTrip
5529+
for i := 0; i < maxConcurrent+1; i++ {
5530+
req := must(http.NewRequest("GET", "https://dummy.tld/", nil))
5531+
rt := tc.roundTrip(req)
5532+
if i >= maxConcurrent {
5533+
tc.wantIdle()
5534+
continue
5535+
}
5536+
tc.wantFrameType(FrameHeaders)
5537+
tc.writeHeaders(HeadersFrameParam{
5538+
StreamID: rt.streamID(),
5539+
EndHeaders: true,
5540+
BlockFragment: tc.makeHeaderBlockFragment(
5541+
":status", "200",
5542+
),
5543+
})
5544+
rt.wantStatus(200)
5545+
rts = append(rts, rt)
5546+
}
5547+
5548+
// Cancel one request. We send a PING frame along with the RST_STREAM.
5549+
rts[0].response().Body.Close()
5550+
tc.wantRSTStream(rts[0].streamID(), ErrCodeCancel)
5551+
pf := readFrame[*PingFrame](t, tc)
5552+
tc.wantIdle()
5553+
5554+
// Cancel another request. No PING frame, since one is in flight.
5555+
rts[1].response().Body.Close()
5556+
tc.wantRSTStream(rts[1].streamID(), ErrCodeCancel)
5557+
tc.wantIdle()
5558+
5559+
// Respond to the PING.
5560+
// This finalizes the previous resets, and allows the pending request to be sent.
5561+
tc.writePing(true, pf.Data)
5562+
tc.wantFrameType(FrameHeaders)
5563+
tc.wantIdle()
5564+
5565+
// Cancel the last request. We send another PING, since none are in flight.
5566+
rts[2].response().Body.Close()
5567+
tc.wantRSTStream(rts[2].streamID(), ErrCodeCancel)
5568+
tc.wantFrameType(FramePing)
5569+
tc.wantIdle()
5570+
}
5571+
5572+
func TestTransportConnBecomesUnresponsive(t *testing.T) {
5573+
// We send a number of requests in series to an unresponsive connection.
5574+
// Each request is canceled or times out without a response.
5575+
// Eventually, we open a new connection rather than trying to use the old one.
5576+
tt := newTestTransport(t)
5577+
5578+
const maxConcurrent = 3
5579+
5580+
t.Logf("first request opens a new connection and succeeds")
5581+
req1 := must(http.NewRequest("GET", "https://dummy.tld/", nil))
5582+
rt1 := tt.roundTrip(req1)
5583+
tc1 := tt.getConn()
5584+
tc1.wantFrameType(FrameSettings)
5585+
tc1.wantFrameType(FrameWindowUpdate)
5586+
hf1 := readFrame[*HeadersFrame](t, tc1)
5587+
tc1.writeSettings(Setting{SettingMaxConcurrentStreams, maxConcurrent})
5588+
tc1.wantFrameType(FrameSettings) // ack
5589+
tc1.writeHeaders(HeadersFrameParam{
5590+
StreamID: hf1.StreamID,
5591+
EndHeaders: true,
5592+
EndStream: true,
5593+
BlockFragment: tc1.makeHeaderBlockFragment(
5594+
":status", "200",
5595+
),
5596+
})
5597+
rt1.wantStatus(200)
5598+
rt1.response().Body.Close()
5599+
5600+
// Send more requests.
5601+
// None receive a response.
5602+
// Each is canceled.
5603+
for i := 0; i < maxConcurrent; i++ {
5604+
t.Logf("request %v receives no response and is canceled", i)
5605+
ctx, cancel := context.WithCancel(context.Background())
5606+
req := must(http.NewRequestWithContext(ctx, "GET", "https://dummy.tld/", nil))
5607+
tt.roundTrip(req)
5608+
if tt.hasConn() {
5609+
t.Fatalf("new connection created; expect existing conn to be reused")
5610+
}
5611+
tc1.wantFrameType(FrameHeaders)
5612+
cancel()
5613+
tc1.wantFrameType(FrameRSTStream)
5614+
if i == 0 {
5615+
tc1.wantFrameType(FramePing)
5616+
}
5617+
tc1.wantIdle()
5618+
}
5619+
5620+
// The conn has hit its concurrency limit.
5621+
// The next request is sent on a new conn.
5622+
req2 := must(http.NewRequest("GET", "https://dummy.tld/", nil))
5623+
rt2 := tt.roundTrip(req2)
5624+
tc2 := tt.getConn()
5625+
tc2.wantFrameType(FrameSettings)
5626+
tc2.wantFrameType(FrameWindowUpdate)
5627+
hf := readFrame[*HeadersFrame](t, tc2)
5628+
tc2.writeSettings(Setting{SettingMaxConcurrentStreams, maxConcurrent})
5629+
tc2.wantFrameType(FrameSettings) // ack
5630+
tc2.writeHeaders(HeadersFrameParam{
5631+
StreamID: hf.StreamID,
5632+
EndHeaders: true,
5633+
EndStream: true,
5634+
BlockFragment: tc2.makeHeaderBlockFragment(
5635+
":status", "200",
5636+
),
5637+
})
5638+
rt2.wantStatus(200)
5639+
rt2.response().Body.Close()
5640+
}

0 commit comments

Comments (0)