Skip to content

Commit b668c7f

Browse files
seanmonstar and dswij authored
fix: streams awaiting capacity lockout (#730) (#734)
This PR changes the assign-capacity queue to prioritize streams that are send-ready. This is necessary to prevent a lockout when streams aren't able to proceed while waiting for connection capacity, but there is none. Closes hyperium/hyper#3338 Co-authored-by: dswij <[email protected]>
1 parent 0f412d8 commit b668c7f

File tree

3 files changed

+106
-3
lines changed

3 files changed

+106
-3
lines changed

src/error.rs

+1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ pub struct Error {
2525
#[derive(Debug)]
2626
enum Kind {
2727
/// A RST_STREAM frame was received or sent.
28+
#[allow(dead_code)]
2829
Reset(StreamId, Reason, Initiator),
2930

3031
/// A GO_AWAY frame was received or sent.

src/proto/streams/prioritize.rs

+10-1
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,15 @@ impl Prioritize {
184184
stream.requested_send_capacity =
185185
cmp::min(stream.buffered_send_data, WindowSize::MAX as usize) as WindowSize;
186186

187-
self.try_assign_capacity(stream);
187+
// `try_assign_capacity` will queue the stream to `pending_capacity` if the capacity
188+
// cannot be assigned at the time it is called.
189+
//
190+
// Streams over the max concurrent count will still call `send_data` so we should be
191+
// careful not to put it into `pending_capacity` as it will starve the connection
192+
// capacity for other streams
193+
if !stream.is_pending_open {
194+
self.try_assign_capacity(stream);
195+
}
188196
}
189197

190198
if frame.is_end_stream() {
@@ -522,6 +530,7 @@ impl Prioritize {
522530
loop {
523531
if let Some(mut stream) = self.pop_pending_open(store, counts) {
524532
self.pending_send.push_front(&mut stream);
533+
self.try_assign_capacity(&mut stream);
525534
}
526535

527536
match self.pop_frame(buffer, store, max_frame_len, counts) {

tests/h2-tests/tests/prioritization.rs

+95-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
use futures::future::join;
2-
use futures::{FutureExt, StreamExt};
1+
use futures::future::{join, select};
2+
use futures::{pin_mut, FutureExt, StreamExt};
3+
34
use h2_support::prelude::*;
45
use h2_support::DEFAULT_WINDOW_SIZE;
56
use std::task::Context;
@@ -408,3 +409,95 @@ async fn send_data_receive_window_update() {
408409

409410
join(mock, h2).await;
410411
}
412+
413+
#[tokio::test]
414+
async fn stream_count_over_max_stream_limit_does_not_starve_capacity() {
415+
use tokio::sync::oneshot;
416+
417+
h2_support::trace_init!();
418+
419+
let (io, mut srv) = mock::new();
420+
421+
let (tx, rx) = oneshot::channel();
422+
423+
let srv = async move {
424+
let _ = srv
425+
.assert_client_handshake_with_settings(
426+
frames::settings()
427+
// super tiny server
428+
.max_concurrent_streams(1),
429+
)
430+
.await;
431+
srv.recv_frame(frames::headers(1).request("POST", "http://example.com/"))
432+
.await;
433+
434+
srv.recv_frame(frames::data(1, vec![0; 16384])).await;
435+
srv.recv_frame(frames::data(1, vec![0; 16384])).await;
436+
srv.recv_frame(frames::data(1, vec![0; 16384])).await;
437+
srv.recv_frame(frames::data(1, vec![0; 16383]).eos()).await;
438+
srv.send_frame(frames::headers(1).response(200).eos()).await;
439+
440+
// All of these connection capacities should be assigned to stream 3
441+
srv.send_frame(frames::window_update(0, 16384)).await;
442+
srv.send_frame(frames::window_update(0, 16384)).await;
443+
srv.send_frame(frames::window_update(0, 16384)).await;
444+
srv.send_frame(frames::window_update(0, 16383)).await;
445+
446+
// StreamId(3) should be able to send all of its request with the conn capacity
447+
srv.recv_frame(frames::headers(3).request("POST", "http://example.com/"))
448+
.await;
449+
srv.recv_frame(frames::data(3, vec![0; 16384])).await;
450+
srv.recv_frame(frames::data(3, vec![0; 16384])).await;
451+
srv.recv_frame(frames::data(3, vec![0; 16384])).await;
452+
srv.recv_frame(frames::data(3, vec![0; 16383]).eos()).await;
453+
srv.send_frame(frames::headers(3).response(200).eos()).await;
454+
455+
// Then all the future stream is guaranteed to be send-able by induction
456+
tx.send(()).unwrap();
457+
};
458+
459+
fn request() -> Request<()> {
460+
Request::builder()
461+
.method(Method::POST)
462+
.uri("http://example.com/")
463+
.body(())
464+
.unwrap()
465+
}
466+
467+
let client = async move {
468+
let (mut client, mut conn) = client::Builder::new()
469+
.handshake::<_, Bytes>(io)
470+
.await
471+
.expect("handshake");
472+
473+
let (req1, mut send1) = client.send_request(request(), false).unwrap();
474+
let (req2, mut send2) = client.send_request(request(), false).unwrap();
475+
476+
// Use up the connection window.
477+
send1.send_data(vec![0; 65535].into(), true).unwrap();
478+
// Queue up for more connection window.
479+
send2.send_data(vec![0; 65535].into(), true).unwrap();
480+
481+
// Queue up more pending open streams
482+
for _ in 0..5 {
483+
let (_, mut send) = client.send_request(request(), false).unwrap();
484+
send.send_data(vec![0; 65535].into(), true).unwrap();
485+
}
486+
487+
let response = conn.drive(req1).await.unwrap();
488+
assert_eq!(response.status(), StatusCode::OK);
489+
490+
let response = conn.drive(req2).await.unwrap();
491+
assert_eq!(response.status(), StatusCode::OK);
492+
493+
let _ = rx.await;
494+
};
495+
496+
let task = join(srv, client);
497+
pin_mut!(task);
498+
499+
let t = tokio::time::sleep(Duration::from_secs(5)).map(|_| panic!("time out"));
500+
pin_mut!(t);
501+
502+
select(task, t).await;
503+
}

0 commit comments

Comments (0)