@@ -497,10 +497,12 @@ namespace net_utils
497
497
if (m_state.socket .cancel_write ) {
498
498
m_state.socket .cancel_write = false ;
499
499
m_state.data .write .queue .clear ();
500
+ m_state.data .write .total_bytes = 0 ;
500
501
state_status_check ();
501
502
}
502
503
else if (ec.value ()) {
503
504
m_state.data .write .queue .clear ();
505
+ m_state.data .write .total_bytes = 0 ;
504
506
interrupt ();
505
507
}
506
508
else {
@@ -525,8 +527,11 @@ namespace net_utils
525
527
526
528
start_timer (get_default_timeout (), true );
527
529
}
528
- assert (bytes_transferred == m_state.data .write .queue .back ().size ());
530
+ const std::size_t byte_count = m_state.data .write .queue .back ().size ();
531
+ assert (bytes_transferred == byte_count);
529
532
m_state.data .write .queue .pop_back ();
533
+ m_state.data .write .total_bytes -=
534
+ std::min (m_state.data .write .total_bytes , byte_count);
530
535
m_state.condition .notify_all ();
531
536
start_write ();
532
537
}
@@ -670,8 +675,9 @@ namespace net_utils
670
675
return ;
671
676
if (m_state.timers .throttle .out .wait_expire )
672
677
return ;
673
- if (m_state.socket .wait_write )
674
- return ;
678
+ // \NOTE See on_terminating() comments
679
+ // if (m_state.socket.wait_write)
680
+ // return;
675
681
if (m_state.socket .wait_shutdown )
676
682
return ;
677
683
if (m_state.protocol .wait_init )
@@ -729,8 +735,13 @@ namespace net_utils
729
735
return ;
730
736
if (m_state.timers .throttle .out .wait_expire )
731
737
return ;
732
- if (m_state.socket .wait_write )
733
- return ;
738
+ // Writes cannot be canceled due to `async_write` being a "composed"
739
+ // handler. ASIO has new cancellation routines, not available in 1.66, to
740
+ // handle this situation. The problem is that if cancel is called after an
741
+ // intermediate handler is queued, the op will not check the cancel flag in
742
+ // our code, and will instead queue up another write.
743
+ // if (m_state.socket.wait_write)
744
+ // return;
734
745
if (m_state.socket .wait_shutdown )
735
746
return ;
736
747
if (m_state.protocol .wait_init )
@@ -757,6 +768,8 @@ namespace net_utils
757
768
std::lock_guard<std::mutex> guard (m_state.lock );
758
769
if (m_state.status != status_t ::RUNNING || m_state.socket .wait_handshake )
759
770
return false ;
771
+ if (std::numeric_limits<std::size_t >::max () - m_state.data .write .total_bytes < message.size ())
772
+ return false ;
760
773
761
774
// Wait for the write queue to fall below the max. If it doesn't after a
762
775
// randomized delay, drop the connection.
@@ -774,7 +787,14 @@ namespace net_utils
774
787
std::uniform_int_distribution<>(5000 , 6000 )(rng)
775
788
);
776
789
};
777
- if (m_state.data .write .queue .size () <= ABSTRACT_SERVER_SEND_QUE_MAX_COUNT)
790
+
791
+ // The bytes check intentionally does not include incoming message size.
792
+ // This allows for a soft overflow; a single http response will never fail
793
+ // this check, but multiple responses could. Clients can avoid this case
794
+ // by reading the entire response before making another request. P2P
795
+ // should never hit the MAX_BYTES check (when using default values).
796
+ if (m_state.data .write .queue .size () <= ABSTRACT_SERVER_SEND_QUE_MAX_COUNT &&
797
+ m_state.data .write .total_bytes <= static_cast <shared_state&>(connection_basic::get_state ()).response_soft_limit )
778
798
return true ;
779
799
m_state.data .write .wait_consume = true ;
780
800
bool success = m_state.condition .wait_for (
@@ -783,14 +803,23 @@ namespace net_utils
783
803
[this ]{
784
804
return (
785
805
m_state.status != status_t ::RUNNING ||
786
- m_state.data .write .queue .size () <=
787
- ABSTRACT_SERVER_SEND_QUE_MAX_COUNT
806
+ (
807
+ m_state.data .write .queue .size () <=
808
+ ABSTRACT_SERVER_SEND_QUE_MAX_COUNT &&
809
+ m_state.data .write .total_bytes <=
810
+ static_cast <shared_state&>(connection_basic::get_state ()).response_soft_limit
811
+ )
788
812
);
789
813
}
790
814
);
791
815
m_state.data .write .wait_consume = false ;
792
816
if (!success) {
793
- terminate ();
817
+ // synchronize with intermediate writes on `m_strand`
818
+ auto self = connection<T>::shared_from_this ();
819
+ boost::asio::post (m_strand, [this , self] {
820
+ std::lock_guard<std::mutex> guard (m_state.lock );
821
+ terminate ();
822
+ });
794
823
return false ;
795
824
}
796
825
else
@@ -816,7 +845,9 @@ namespace net_utils
816
845
) {
817
846
if (!wait_consume ())
818
847
return false ;
848
+ const std::size_t byte_count = message.size ();
819
849
m_state.data .write .queue .emplace_front (std::move (message));
850
+ m_state.data .write .total_bytes += byte_count;
820
851
start_write ();
821
852
}
822
853
else {
@@ -826,6 +857,7 @@ namespace net_utils
826
857
m_state.data .write .queue .emplace_front (
827
858
message.take_slice (CHUNK_SIZE)
828
859
);
860
+ m_state.data .write .total_bytes += m_state.data .write .queue .front ().size ();
829
861
start_write ();
830
862
}
831
863
}
@@ -1369,6 +1401,13 @@ namespace net_utils
1369
1401
}
1370
1402
// ---------------------------------------------------------------------------------
1371
1403
template <class t_protocol_handler >
1404
+ void boosted_tcp_server<t_protocol_handler>::set_response_soft_limit(const std::size_t limit)
1405
+ {
1406
+ assert (m_state != nullptr ); // always set in constructor
1407
+ m_state->response_soft_limit = limit;
1408
+ }
1409
+ // ---------------------------------------------------------------------------------
1410
+ template <class t_protocol_handler >
1372
1411
bool boosted_tcp_server<t_protocol_handler>::run_server(size_t threads_count, bool wait, const boost::thread::attributes& attrs)
1373
1412
{
1374
1413
TRY_ENTRY ();
0 commit comments