Skip to content

Commit 165ca59

Browse files
tneelydjc
authored andcommitted
Add Pool::add
Fixes #212 This adds `Pool::add`, which allows for externally created connections to be added and managed by the pool. If the pool is at maximum capacity when this method is called, it will return the input connection as part of the Err response. I considered allowing `Pool:add` to ignore `max_size` when adding to the pool, but felt it could lead to confusion if the pool is allowed to exceed its capacity in this specific case. This change required making PoolInternals::approvals visible within the crate to get the approval needed to add a new connection. The alternative would have required defining a new pub(crate) method for this specific use case, which feels worse. I'm open to suggestions on how to more cleanly integrate this change into the package.
1 parent aef0076 commit 165ca59

File tree

5 files changed

+107
-2
lines changed

5 files changed

+107
-2
lines changed

bb8/src/api.rs

+35
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,14 @@ impl<M: ManageConnection> Pool<M> {
7575
pub fn state(&self) -> State {
7676
self.inner.state()
7777
}
78+
79+
/// Adds a connection to the pool.
80+
///
81+
/// If the connection is broken, or the pool is at capacity, the
82+
/// connection is not added and instead returned to the caller in Err.
83+
pub fn add(&self, conn: M::Connection) -> Result<(), AddError<M::Connection>> {
84+
self.inner.try_put(conn)
85+
}
7886
}
7987

8088
/// Information about the state of a `Pool`.
@@ -526,6 +534,33 @@ where
526534
}
527535
}
528536

537+
/// Error type returned by `Pool::add(conn)`
538+
#[derive(Debug, Clone, PartialEq, Eq)]
539+
pub enum AddError<C> {
540+
/// The connection was broken before it could be added.
541+
Broken(C),
542+
/// Unable to add the connection to the pool due to insufficient capacity.
543+
NoCapacity(C),
544+
}
545+
546+
impl<E: error::Error + 'static> fmt::Display for AddError<E> {
547+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
548+
match *self {
549+
AddError::Broken(_) => write!(f, "The connection was broken before it could be added"),
550+
AddError::NoCapacity(_) => write!(
551+
f,
552+
"Unable to add the connection to the pool due to insufficient capacity"
553+
),
554+
}
555+
}
556+
}
557+
558+
impl<E: error::Error + 'static> error::Error for AddError<E> {
559+
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
560+
None
561+
}
562+
}
563+
529564
/// A trait to receive errors generated by connection management that aren't
530565
/// tied to any particular caller.
531566
pub trait ErrorSink<E>: fmt::Debug + Send + Sync + 'static {

bb8/src/inner.rs

+12-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ use futures_util::TryFutureExt;
99
use tokio::spawn;
1010
use tokio::time::{interval_at, sleep, timeout, Interval};
1111

12-
use crate::api::{Builder, ConnectionState, ManageConnection, PooledConnection, RunError, State};
12+
use crate::api::{
13+
AddError, Builder, ConnectionState, ManageConnection, PooledConnection, RunError, State,
14+
};
1315
use crate::internals::{Approval, ApprovalIter, Conn, SharedPool, StatsGetKind, StatsKind};
1416

1517
pub(crate) struct PoolInner<M>
@@ -161,6 +163,15 @@ where
161163
}
162164
}
163165

166+
/// Adds an external connection to the pool if there is capacity for it.
167+
pub(crate) fn try_put(&self, mut conn: M::Connection) -> Result<(), AddError<M::Connection>> {
168+
if self.inner.manager.has_broken(&mut conn) {
169+
Err(AddError::Broken(conn))
170+
} else {
171+
self.inner.try_put(conn).map_err(AddError::NoCapacity)
172+
}
173+
}
174+
164175
/// Returns information about the current state of the pool.
165176
pub(crate) fn state(&self) -> State {
166177
self.inner

bb8/src/internals.rs

+11
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,17 @@ where
4747
(conn, approvals)
4848
}
4949

50+
pub(crate) fn try_put(self: &Arc<Self>, conn: M::Connection) -> Result<(), M::Connection> {
51+
let mut locked = self.internals.lock();
52+
let mut approvals = locked.approvals(&self.statics, 1);
53+
let Some(approval) = approvals.next() else {
54+
return Err(conn);
55+
};
56+
let conn = Conn::new(conn);
57+
locked.put(conn, Some(approval), self.clone());
58+
Ok(())
59+
}
60+
5061
pub(crate) fn reap(&self) -> ApprovalIter {
5162
let mut locked = self.internals.lock();
5263
let (iter, closed_idle_timeout, closed_max_lifetime) = locked.reap(&self.statics);

bb8/src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535

3636
mod api;
3737
pub use api::{
38-
Builder, CustomizeConnection, ErrorSink, ManageConnection, NopErrorSink, Pool,
38+
AddError, Builder, CustomizeConnection, ErrorSink, ManageConnection, NopErrorSink, Pool,
3939
PooledConnection, QueueStrategy, RunError, State, Statistics,
4040
};
4141

bb8/tests/test.rs

+48
Original file line numberDiff line numberDiff line change
@@ -1020,3 +1020,51 @@ async fn test_statistics_connections_created() {
10201020

10211021
assert_eq!(pool.state().statistics.connections_created, 1);
10221022
}
1023+
1024+
#[tokio::test]
1025+
async fn test_can_use_added_connections() {
1026+
let pool = Pool::builder()
1027+
.connection_timeout(Duration::from_millis(1))
1028+
.build_unchecked(NthConnectionFailManager::<FakeConnection>::new(0));
1029+
1030+
// Assert pool can't replenish connections on its own
1031+
let res = pool.get().await;
1032+
assert_eq!(res.unwrap_err(), RunError::TimedOut);
1033+
1034+
pool.add(FakeConnection).unwrap();
1035+
let res = pool.get().await;
1036+
assert!(res.is_ok());
1037+
}
1038+
1039+
#[tokio::test]
1040+
async fn test_add_ok_until_max_size() {
1041+
let pool = Pool::builder()
1042+
.min_idle(1)
1043+
.max_size(3)
1044+
.build(OkManager::<FakeConnection>::new())
1045+
.await
1046+
.unwrap();
1047+
1048+
for _ in 0..2 {
1049+
let conn = pool.dedicated_connection().await.unwrap();
1050+
pool.add(conn).unwrap();
1051+
}
1052+
1053+
let conn = pool.dedicated_connection().await.unwrap();
1054+
let res = pool.add(conn);
1055+
assert!(matches!(res, Err(AddError::NoCapacity(_))));
1056+
}
1057+
1058+
#[tokio::test]
1059+
async fn test_add_checks_broken_connections() {
1060+
let pool = Pool::builder()
1061+
.min_idle(1)
1062+
.max_size(3)
1063+
.build(BrokenConnectionManager::<FakeConnection>::default())
1064+
.await
1065+
.unwrap();
1066+
1067+
let conn = pool.dedicated_connection().await.unwrap();
1068+
let res = pool.add(conn);
1069+
assert!(matches!(res, Err(AddError::Broken(_))));
1070+
}

0 commit comments

Comments
 (0)