
TimedOut error when has_broken = true #167

Closed
mdecimus opened this issue May 31, 2023 · 8 comments

Comments

@mdecimus
Hi,

I found what seems to be a bug: when multiple connections return true from has_broken(), all subsequent calls to get() fail with a TimedOut error. This code reproduces the problem:

use std::{convert::Infallible, time::Duration};

struct ConnectionManager;
struct Connection;

#[async_trait::async_trait]
impl bb8::ManageConnection for ConnectionManager {
    type Connection = Connection;
    type Error = Infallible;

    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
        Ok(Connection)
    }

    async fn is_valid(&self, _: &mut Self::Connection) -> Result<(), Self::Error> {
        Ok(())
    }

    // Every connection always reports itself as broken.
    fn has_broken(&self, _: &mut Self::Connection) -> bool {
        true
    }
}

#[tokio::test]
async fn pool() {
    let pool = bb8::Pool::builder()
        .max_size(5)
        .connection_timeout(Duration::from_secs(10))
        .build(ConnectionManager)
        .await
        .unwrap();

    let mut futures = Vec::new();

    for _ in 0..10 {
        let pool = pool.clone();
        futures.push(tokio::spawn(async move {
            let conn = pool.get().await.unwrap();
            drop(conn);
        }));
    }

    for future in futures {
        future.await.unwrap();
    }
}

@mdecimus
Author

I also found that is_valid is never called before a connection is checked out of the pool:

use std::time::Duration;

struct ConnectionManager;
struct Connection;

#[async_trait::async_trait]
impl bb8::ManageConnection for ConnectionManager {
    type Connection = Connection;
    type Error = ();

    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
        println!("New connection");
        Ok(Connection)
    }

    // is_valid always fails; "Test connection" would appear in the output if it were called.
    async fn is_valid(&self, _: &mut Self::Connection) -> Result<(), Self::Error> {
        println!("Test connection");
        Err(())
    }

    fn has_broken(&self, _: &mut Self::Connection) -> bool {
        false
    }
}

#[tokio::test]
async fn pool() {
    let pool = bb8::Pool::builder()
        .max_size(5)
        .connection_timeout(Duration::from_secs(10))
        .test_on_check_out(true)
        .build(ConnectionManager)
        .await
        .unwrap();

    let mut futures = Vec::new();

    for _ in 0..10 {
        let pool = pool.clone();
        futures.push(tokio::spawn(async move {
            let conn = pool.get().await.unwrap();
            println!("Got connection");
            tokio::time::sleep(Duration::from_millis(100)).await;
            drop(conn);
        }));
    }

    for future in futures {
        future.await.unwrap();
    }
}

The output is:

New connection
New connection
New connection
New connection
New connection
Got connection
Got connection
Got connection
Got connection
Got connection
Got connection
Got connection
Got connection
Got connection
Got connection

@mdecimus
Author

Another update: this seems to be a concurrency bug. Waiting a few milliseconds before spawning the next concurrent get() request makes everything work as expected:

    for _ in 0..10 {
        let pool = pool.clone();
        futures.push(tokio::spawn(async move {
            let conn = pool.get().await.unwrap();
            println!("Got connection");
            drop(conn);
        }));
        tokio::time::sleep(Duration::from_millis(100)).await;
    }

@djc
Owner

djc commented Jun 2, 2023

Is there a regression here? If so, I suspect #153 might be at fault...

@mdecimus
Author

mdecimus commented Jun 2, 2023

I am not sure; I started using bb8 this week, probably after #153 had been merged.

@couchand
Contributor

Hi @mdecimus, thanks for the bug report! Your findings on is_valid are expected; see the documentation on Builder::test_on_check_out. The documentation for the ManageConnection trait could use more information about what each method is used for.

@djc I've confirmed that this bug still reproduces before #153 was merged; it fails identically as far back as 709137f, the arbitrary old commit I picked to check.

It's interesting: there's definitely a timing aspect here. I can trigger or avoid the issue with this variation of the loop:

    let pause_on = 5;
    let delay = 2;
    for i in 0..6 {
        if i == pause_on { tokio::time::sleep(Duration::from_millis(delay)).await; }
        let pool = pool.clone();
        futures.push(tokio::spawn(async move {
            let conn = pool.get().await.unwrap();
            drop(conn);
        }));
    }

This test passes with a pause_on value from 0 to 5, but not 6, no matter how high delay goes. It passes with a delay of 2 but not 1 (on my machine). Moving the delay inside the async block never makes the test pass, no matter where it goes, and adding a delay just before dropping can make an otherwise-passing test fail.

@xortive
Contributor

xortive commented Mar 21, 2024

I did some debugging based on the test case provided in the issue.

  • The spawned task in the test case's loop runs to its first await point for each iteration, such that
    10 calls are made to PoolInternals::approvals. After this, PoolInternals::pending_conns is equal to 5,
    corresponding to the 5 allowed approvals for the pool_size of 5.
    At this point, the test case's tasks are waiting on SharedPool::notify.
  • The spawn_replenishing_approvals tasks now run, of which there are only 5. They all create a connection and notify their waiters.
  • Each spawned task from the test immediately drops the connection, which calls PoolInner::put_back. Since these connections always return true for has_broken(),
    they are not returned to the pool, but we don't create any new connections to replace them.
  • The last 5 spawned tasks are left waiting for the notification that a connection is available in the pool, which never comes, so they time out (a minimal sketch of this lost-wakeup pattern follows the list).
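
For reference, here is a minimal, standalone sketch of the lost-wakeup pattern described above. It uses tokio::sync::Notify in isolation rather than bb8's internals; the timeout, sleep, and task structure are only there to keep the illustration simple and are not taken from the pool's code:

use std::{sync::Arc, time::Duration};

use tokio::sync::Notify;

#[tokio::main]
async fn main() {
    let notify = Arc::new(Notify::new());

    // Stands in for a pool waiter blocked inside get().
    let waiter = {
        let notify = notify.clone();
        tokio::spawn(async move {
            tokio::time::timeout(Duration::from_secs(1), notify.notified())
                .await
                .expect("timed out: nobody notified the waiter");
            println!("waiter woken");
        })
    };

    // Give the waiter a moment to register with the Notify.
    tokio::time::sleep(Duration::from_millis(10)).await;

    // Stands in for the put-back path of a broken connection. Comment this
    // call out and the waiter above hits its timeout, mirroring the TimedOut
    // error reported in this issue.
    notify.notify_waiters();

    waiter.await.unwrap();
}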

I'm not quite sure how best to solve this; I'd be happy to write up a PR if you have any ideas, @djc.

@xortive
Contributor

xortive commented Mar 22, 2024

Adding a call to self.inner.notify.notify_waiters() in PoolInner::put_back might be a solution:

    /// Return connection back in to the pool
    pub(crate) fn put_back(&self, mut conn: Conn<M::Connection>, state: ConnectionState) {
        debug_assert!(
            !matches!(state, ConnectionState::Extracted),
            "handled in caller"
        );

        let mut locked = self.inner.internals.lock();
        match (state, self.inner.manager.has_broken(&mut conn.conn)) {
            (ConnectionState::Present, false) => locked.put(conn, None, self.inner.clone()),
            (_, _) => {
                let approvals = locked.dropped(1, &self.inner.statics);
                self.spawn_replenishing_approvals(approvals);
                self.inner.notify.notify_waiters();
            }
        }
    }

This only notifies active waiters; they wake up and see that there is now an available slot. The provided test case passes with this change.

It might be a good idea to do this in Reaper::run too?
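
As an aside on "only notifies active waiters": unlike notify_one(), notify_waiters() does not store a permit, so it wakes only tasks that are already awaiting notified(). A tiny standalone example of that difference (plain tokio::sync::Notify, not bb8 code):

use tokio::sync::Notify;

#[tokio::main]
async fn main() {
    let notify = Notify::new();

    // Nobody is waiting yet, so this wakes no one and stores nothing;
    // a later notified().await would still block.
    notify.notify_waiters();

    // notify_one() stores a single permit when there are no waiters, so the
    // next notified().await completes immediately by consuming that permit.
    notify.notify_one();
    notify.notified().await;
    println!("consumed the permit stored by notify_one()");
}

That property matches the description above: tasks currently parked in get() wake up and re-check the pool.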

@djc
Owner

djc commented Mar 22, 2024

@xortive thanks for looking into this! Would you be able to submit a PR with both a regression unit test and your proposed fix? Would make it a bit easier for me to review in context.
