@@ -831,3 +831,58 @@ async fn test_customize_connection_acquire() {
831
831
let connection_1_or_2 = pool. get ( ) . await . unwrap ( ) ;
832
832
assert ! ( connection_1_or_2. custom_field == 1 || connection_1_or_2. custom_field == 2 ) ;
833
833
}
834
+
835
+ #[ tokio:: test]
836
+ async fn test_broken_connections_dont_starve_pool ( ) {
837
+ use std:: sync:: RwLock ;
838
+ use std:: { convert:: Infallible , time:: Duration } ;
839
+
840
+ #[ derive( Default ) ]
841
+ struct ConnectionManager {
842
+ counter : RwLock < u16 > ,
843
+ }
844
+ #[ derive( Debug ) ]
845
+ struct Connection ;
846
+
847
+ #[ async_trait:: async_trait]
848
+ impl bb8:: ManageConnection for ConnectionManager {
849
+ type Connection = Connection ;
850
+ type Error = Infallible ;
851
+
852
+ async fn connect ( & self ) -> Result < Self :: Connection , Self :: Error > {
853
+ Ok ( Connection )
854
+ }
855
+
856
+ async fn is_valid ( & self , _: & mut Self :: Connection ) -> Result < ( ) , Self :: Error > {
857
+ Ok ( ( ) )
858
+ }
859
+
860
+ fn has_broken ( & self , _: & mut Self :: Connection ) -> bool {
861
+ let mut counter = self . counter . write ( ) . unwrap ( ) ;
862
+ let res = * counter < 5 ;
863
+ * counter += 1 ;
864
+ res
865
+ }
866
+ }
867
+
868
+ let pool = bb8:: Pool :: builder ( )
869
+ . max_size ( 5 )
870
+ . connection_timeout ( Duration :: from_secs ( 10 ) )
871
+ . build ( ConnectionManager :: default ( ) )
872
+ . await
873
+ . unwrap ( ) ;
874
+
875
+ let mut futures = Vec :: new ( ) ;
876
+
877
+ for _ in 0 ..10 {
878
+ let pool = pool. clone ( ) ;
879
+ futures. push ( tokio:: spawn ( async move {
880
+ let conn = pool. get ( ) . await . unwrap ( ) ;
881
+ drop ( conn) ;
882
+ } ) ) ;
883
+ }
884
+
885
+ for future in futures {
886
+ future. await . unwrap ( ) ;
887
+ }
888
+ }
0 commit comments