
Use Weak for PooledConnection's Pool ref? #72

Closed
kardeiz opened this issue Jun 5, 2020 · 12 comments · Fixed by #107

Comments

@kardeiz

kardeiz commented Jun 5, 2020

Would you be open to a PR that changes the Pool reference on PooledConnection to Weak<SharedPool>? You can take a look here: master...kardeiz:local.

This would be useful for using the PooledConnection in some async contexts (for example, I'm working on a sync adapter for bb8 where I'm sending the PooledConnection from another thread).
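A minimal sketch of the idea, assuming a toy pool that hands out `u32` values in place of real connections. The `SharedPool`, `Pool`, and `PooledConnection` types here are hypothetical stand-ins written for illustration, not bb8's actual definitions. The guard holds a `Weak<SharedPool>`, so it carries no lifetime parameter and returns its connection on drop only if the pool still exists.

```rust
use std::sync::{Arc, Mutex, Weak};

// Hypothetical stand-ins; bb8's real SharedPool and PooledConnection differ.
struct SharedPool {
    idle: Mutex<Vec<u32>>,
}

struct Pool {
    inner: Arc<SharedPool>,
}

// No lifetime parameter: the guard only holds a weak handle to the pool.
struct PooledConnection {
    pool: Weak<SharedPool>,
    conn: Option<u32>,
}

impl Pool {
    fn new(conns: Vec<u32>) -> Self {
        Pool { inner: Arc::new(SharedPool { idle: Mutex::new(conns) }) }
    }

    // Take an idle connection, if any, and wrap it in a guard.
    fn get(&self) -> Option<PooledConnection> {
        let conn = self.inner.idle.lock().unwrap().pop()?;
        Some(PooledConnection { pool: Arc::downgrade(&self.inner), conn: Some(conn) })
    }

    fn idle_count(&self) -> usize {
        self.inner.idle.lock().unwrap().len()
    }
}

impl Drop for PooledConnection {
    fn drop(&mut self) {
        // Return the connection only if the pool is still alive;
        // otherwise the connection is simply dropped with the guard.
        if let (Some(shared), Some(conn)) = (self.pool.upgrade(), self.conn.take()) {
            shared.idle.lock().unwrap().push(conn);
        }
    }
}
```

With this shape, a guard can outlive the pool it came from; dropping it after the pool is gone does nothing beyond discarding the connection.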

kardeiz changed the title from "Use Weak for PooledConnection's Pool ref" to "Use Weak for PooledConnection's Pool ref?" on Jun 5, 2020
@djc
Owner

djc commented Jun 5, 2020

In principle, the lifetime is there on purpose to make it harder to "leak" pooled connections. Why don't you keep a reference to the pool itself in each thread and keep PooledConnections scoped to the thread they originated in?
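To illustrate the point about the lifetime, here is a sketch of a guard that borrows the pool. It assumes a toy pool of `u32` values; the types are hypothetical and are not bb8's. Because `PooledConnection<'a>` holds `&'a Pool`, the compiler rejects any attempt to keep the guard alive longer than the pool or to move it into a context that requires `'static`.

```rust
use std::sync::Mutex;

// Hypothetical toy pool, not bb8's real types.
struct Pool {
    idle: Mutex<Vec<u32>>,
}

// The guard borrows the pool, so it cannot outlive it.
struct PooledConnection<'a> {
    pool: &'a Pool,
    conn: Option<u32>,
}

impl Pool {
    fn new(conns: Vec<u32>) -> Self {
        Pool { idle: Mutex::new(conns) }
    }

    fn get(&self) -> Option<PooledConnection<'_>> {
        let conn = self.idle.lock().unwrap().pop()?;
        Some(PooledConnection { pool: self, conn: Some(conn) })
    }

    fn idle_count(&self) -> usize {
        self.idle.lock().unwrap().len()
    }
}

impl Drop for PooledConnection<'_> {
    fn drop(&mut self) {
        // The borrow guarantees the pool is still there to take the connection back.
        if let Some(conn) = self.conn.take() {
            self.pool.idle.lock().unwrap().push(conn);
        }
    }
}
```

The trade-off is the one discussed in this thread: the borrow makes leaks harder, but the guard cannot be sent to a thread or task that requires `'static` data.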

@kardeiz
Author

kardeiz commented Jun 5, 2020

Hi @djc, thanks for the response! I think your suggestion won't work (at least the way I have set things up), because the thread originating the PooledConnection is not the thread using the PooledConnection.

I'll paste my relevant code below, which works with my branch of bb8. It is based loosely on the sync/blocking interfaces of reqwest and ldap3. I'm not sure if it is sound, but it seems to work fine in some small testing. However, I understand your rationale for having pool be a ref, so feel free to close this if you like. Thanks!

impl<M: crate::ManageConnection> Pool<M> {
    pub fn new(
        builder: bb8::Builder<ManageConnectionContainer<M>>,
        manager: M,
    ) -> Result<Self, BuildError<M::Error>> {
        use futures::future::{Future, TryFutureExt};

        let (tx, mut rx) = mpsc::unbounded_channel::<
            oneshot::Sender<
                Result<
                    bb8::PooledConnection<ManageConnectionContainer<M>>,
                    bb8::RunError<M::Error>,
                >,
            >,
        >();
        let (tx_b, rx_b) = oneshot::channel::<Result<(), BuildError<M::Error>>>();

        let handle = std::thread::Builder::new()
            .name("bb8-sync-runtime".into())
            .spawn(move || {
                let mut rt = match runtime::Builder::new()
                    .basic_scheduler()
                    .enable_all()
                    .build()
                    .map_err(BuildError::TokioIo)
                {
                    Ok(rt) => rt,
                    Err(e) => {
                        tx_b.send(Err(e)).unwrap();
                        return;
                    }
                };

                rt.block_on(async move {
                    let pool = match builder
                        .build(ManageConnectionContainer(manager))
                        .map_err(BuildError::ManagedConnectionError)
                        .await
                    {
                        Ok(pool) => pool,
                        Err(e) => {
                            tx_b.send(Err(e)).unwrap();
                            return;
                        }
                    };

                    tx_b.send(Ok(())).unwrap();

                    while let Some(mut tx_get) = rx.recv().await {
                        let pool = pool.clone();
                        tokio::spawn(async move {
                            use std::task::Poll;

                            let fut = pool.get();

                            futures::pin_mut!(fut);

                            let res =
                                futures::future::poll_fn(|ctx| match fut.as_mut().poll(ctx) {
                                    Poll::Ready(val) => Poll::Ready(Some(val)),
                                    Poll::Pending => {
                                        futures::ready!(tx_get.poll_closed(ctx));
                                        Poll::Ready(None)
                                    }
                                })
                                .await;

                            if let Some(res) = res {
                                let _ = tx_get.send(res);
                            }
                        });
                    }
                });
            })
            .unwrap();

        futures::executor::block_on(async move { rx_b.await.unwrap() })?;

        Ok(Pool { tx, handle })
    }

    pub fn get(
        &self,
    ) -> Result<bb8::PooledConnection<ManageConnectionContainer<M>>, bb8::RunError<M::Error>> {
        let (tx, rx) = oneshot::channel();

        self.tx.send(tx).ok().unwrap();

        futures::executor::block_on(async move { rx.await.unwrap() })
    }
}

@djc
Owner

djc commented Jun 6, 2020

What I was asking is, why do you need to get the PooledConnection out of the pool on a different thread than the thread where you use it?

@kardeiz
Author

kardeiz commented Jun 6, 2020

@djc In my example, it is because the Tokio runtime is working in a background thread. Even if I store the Pool outside of that thread, I will need to send it across to the Tokio thread and send back a PooledConnection, which won't work because of the 'static lifetime requirement.
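The `'static` requirement can be seen with plain `std::thread::spawn`, which requires both the closure and its return value to be `'static`. The sketch below assumes a toy pool of `u32` values and hypothetical names (`Shared`, `OwnedConnection`, `get_owned`, `fetch_on_background_thread`). A guard that owns an `Arc` to the shared state can be created on one thread and handed back to another; a guard that borrows the pool could not be returned this way.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical shared pool state, not bb8's real types.
struct Shared {
    idle: Mutex<Vec<u32>>,
}

// Owns a strong reference to the pool state, so it is 'static and Send.
struct OwnedConnection {
    shared: Arc<Shared>,
    conn: Option<u32>,
}

impl Drop for OwnedConnection {
    fn drop(&mut self) {
        if let Some(conn) = self.conn.take() {
            self.shared.idle.lock().unwrap().push(conn);
        }
    }
}

fn get_owned(shared: &Arc<Shared>) -> Option<OwnedConnection> {
    let conn = shared.idle.lock().unwrap().pop()?;
    Some(OwnedConnection { shared: Arc::clone(shared), conn: Some(conn) })
}

// Acquire the connection on a background thread and return it to the caller.
fn fetch_on_background_thread(shared: &Arc<Shared>) -> Option<OwnedConnection> {
    let shared = Arc::clone(shared);
    thread::spawn(move || get_owned(&shared))
        .join()
        .expect("background thread panicked")
}
```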

@djc
Owner

djc commented Jun 6, 2020

So you're saying the thread where you want to use the pooled connection is not managed by tokio, therefore you cannot get a connection from the pool directly on the thread where you want to use it?

@kardeiz
Author

kardeiz commented Jun 6, 2020

Correct. In the code above, I am using combined oneshot/mpsc channels to get the PooledConnection back to the current thread. This is similar to how reqwest provides its blocking API.
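The request/reply pattern can be sketched with the standard library alone. This is a simplified stand-in for the code above: `std::sync::mpsc` channels replace both the Tokio mpsc and oneshot channels, the worker hands out counter values instead of pooled connections, and the names `Handle` and `spawn_worker` are made up for illustration. Each caller sends a fresh reply channel to the worker and blocks until the worker answers on it.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical handle held by the blocking side.
struct Handle {
    requests: mpsc::Sender<mpsc::Sender<u32>>,
}

// Start a worker thread that answers each request on the reply channel it receives.
fn spawn_worker() -> Handle {
    let (tx, rx) = mpsc::channel::<mpsc::Sender<u32>>();
    thread::spawn(move || {
        let mut next_id = 0u32;
        // The loop ends once every request sender has been dropped.
        for reply in rx {
            next_id += 1;
            // The caller may have given up waiting; ignore a closed reply channel.
            let _ = reply.send(next_id);
        }
    });
    Handle { requests: tx }
}

impl Handle {
    // Blocking call: send a one-use reply channel, then wait for the answer.
    fn get(&self) -> Option<u32> {
        let (reply_tx, reply_rx) = mpsc::channel();
        self.requests.send(reply_tx).ok()?;
        reply_rx.recv().ok()
    }
}
```

Whatever travels back over the reply channel has to be `'static`, which is why a connection guard that borrows the pool does not fit this pattern.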

@djc
Owner

djc commented Jun 6, 2020

Isn't there some async pooling solution for whatever runtime is managing your thread?

@kardeiz
Author

kardeiz commented Jun 8, 2020

I don't think I understand the question.

@djc
Owner

djc commented Jun 8, 2020

Why don't you run your worker thread/task (the one that needs the database connection) in the context of the tokio runtime?

@davidpdrsn
Contributor

I'm also interested in something like this. My use-case is that I have a trait along the lines of

#[async_trait]
pub trait FromRequest {
    async fn from_request(req: &mut http::Request<hyper::Body>) -> Result<Self, Error>;
}

I would like to implement it for pooled connections in such a way that it gets the pool from the request extensions and then grabs a connection from there. However, since the connection has a reference back to the pool, I cannot return it.

Making the trait generic over the lifetime of the request wouldn't work, since it's borrowed mutably, which leads to multiple mutable borrows when using the trait.

If the pooled connection were fully owned, however, everything should work.
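A rough synchronous sketch of that last point follows. It uses a hypothetical `Request` type whose `pool` field stands in for the request extensions, a toy pool of `u32` values, and a non-async `FromRequest` so that it needs no external crates. Because `OwnedConnection` holds its own `Arc` to the pool, the value returned from `from_request` does not borrow from `req`, so the extractor can be called repeatedly on the same request.

```rust
use std::sync::{Arc, Mutex};

type SharedPool = Arc<Mutex<Vec<u32>>>;

// Hypothetical request type; `pool` stands in for a request extension.
struct Request {
    pool: SharedPool,
}

// Fully owned guard: no reference back into the request or the pool.
struct OwnedConnection {
    pool: SharedPool,
    conn: Option<u32>,
}

impl Drop for OwnedConnection {
    fn drop(&mut self) {
        if let Some(conn) = self.conn.take() {
            self.pool.lock().unwrap().push(conn);
        }
    }
}

// Simplified, synchronous variant of the trait above.
trait FromRequest: Sized {
    fn from_request(req: &mut Request) -> Option<Self>;
}

impl FromRequest for OwnedConnection {
    fn from_request(req: &mut Request) -> Option<Self> {
        // Clone the handle so the result is independent of the `&mut` borrow.
        let pool = Arc::clone(&req.pool);
        let conn = pool.lock().unwrap().pop()?;
        Some(OwnedConnection { pool, conn: Some(conn) })
    }
}
```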

@djc
Owner

djc commented Jun 21, 2021

I guess I'd be open to an additional API for this. Something along the lines of a get_weak() method on the Pool, which clearly documents the risk of leaking the connection and explicitly points to get() as the preferred alternative.

@davidpdrsn
Contributor

I've submitted #107 to address this 👀

djc closed this as completed in #107 on Aug 11, 2021