From dba369a40ec45da3535851c5ff2aeedefe714095 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Tue, 2 Aug 2022 17:51:28 -0400 Subject: [PATCH 01/20] Initial commit for graceful shutdown --- Cargo.lock | 10 ++++++++++ Cargo.toml | 1 + src/client.rs | 8 ++++++++ src/main.rs | 29 ++++++++++++++++++++--------- 4 files changed, 39 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ddab730d..e92207c0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -422,6 +422,7 @@ dependencies = [ "tokio", "tokio-rustls", "toml", + "wg", ] [[package]] @@ -884,6 +885,15 @@ dependencies = [ "untrusted", ] +[[package]] +name = "wg" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f0193600355622d5de21a3deadfffa0dc6b3aa3c40e02d5a476495c67841a76" +dependencies = [ + "parking_lot", +] + [[package]] name = "winapi" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index 8bdeab67..c399d946 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,3 +31,4 @@ base64 = "0.13" stringprep = "0.1" tokio-rustls = "0.23" rustls-pemfile = "1" +wg = "0.1.0" \ No newline at end of file diff --git a/src/client.rs b/src/client.rs index 1775ad22..e00184f5 100644 --- a/src/client.rs +++ b/src/client.rs @@ -18,6 +18,10 @@ use crate::tls::Tls; use tokio_rustls::server::TlsStream; +use std::sync::atomic::{AtomicBool, Ordering}; + +pub static SHUTTING_DOWN: AtomicBool = AtomicBool::new(false); + /// Type of connection received from client. enum ClientConnectionType { Startup, @@ -462,6 +466,10 @@ where self.transaction_mode ); + if SHUTTING_DOWN.load(Ordering::Relaxed) { + return Ok(()); + } + // Read a complete message from the client, which normally would be // either a `Q` (query) or `P` (prepare, extended protocol). 
// We can parse it here before grabbing a server from the pool, diff --git a/src/main.rs b/src/main.rs index 3622398c..43200320 100644 --- a/src/main.rs +++ b/src/main.rs @@ -36,11 +36,10 @@ extern crate tokio; extern crate tokio_rustls; extern crate toml; -use log::{debug, error, info}; +use log::{debug, error, info, warn}; use parking_lot::Mutex; use tokio::net::TcpListener; use tokio::{ - signal, signal::unix::{signal as unix_signal, SignalKind}, sync::mpsc, }; @@ -66,6 +65,9 @@ use config::{get_config, reload_config}; use pool::{ClientServerMap, ConnectionPool}; use stats::{Collector, Reporter, REPORTER}; +use std::sync::atomic::Ordering; +use wg::WaitGroup; + use crate::config::VERSION; #[tokio::main(worker_threads = 4)] @@ -139,6 +141,10 @@ async fn main() { info!("Waiting for clients"); + let wg = WaitGroup::new(); + + let t_wg = wg.clone(); + // Client connection loop. tokio::task::spawn(async move { loop { @@ -152,6 +158,11 @@ async fn main() { } }; + if client::SHUTTING_DOWN.load(Ordering::Relaxed) { + break; + } + let t_wg = t_wg.add(1); + // Handle client. tokio::task::spawn(async move { let start = chrono::offset::Utc::now().naive_utc(); @@ -171,6 +182,7 @@ async fn main() { debug!("Client disconnected with error {:?}", err); } }; + t_wg.done(); }); } }); @@ -214,15 +226,14 @@ async fn main() { }); } - // Exit on Ctrl-C (SIGINT) and SIGTERM. - let mut term_signal = unix_signal(SignalKind::terminate()).unwrap(); + // initiate graceful shutdown sequence on sig int + let mut stream = unix_signal(SignalKind::interrupt()).unwrap(); - tokio::select! { - _ = signal::ctrl_c() => (), - _ = term_signal.recv() => (), - }; + stream.recv().await; + client::SHUTTING_DOWN.store(true, Ordering::Relaxed); + warn!("Got SIGINT, waiting for client connection drain now"); + wg.wait(); - info!("Shutting down..."); } /// Format chrono::Duration to be more human-friendly. 
From 72c6622a1ef3be1e2aabd50039b3922226a888e3 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Tue, 2 Aug 2022 17:54:03 -0400 Subject: [PATCH 02/20] fmt --- src/main.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main.rs b/src/main.rs index 43200320..cdd7d7ab 100644 --- a/src/main.rs +++ b/src/main.rs @@ -233,7 +233,6 @@ async fn main() { client::SHUTTING_DOWN.store(true, Ordering::Relaxed); warn!("Got SIGINT, waiting for client connection drain now"); wg.wait(); - } /// Format chrono::Duration to be more human-friendly. From aa593edd10260817f2ae6e2334cdd05c98c946c8 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Thu, 4 Aug 2022 20:32:20 -0400 Subject: [PATCH 03/20] Add .vscode to gitignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index a4b78411..3c654539 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ .idea /target *.deb +.vscode \ No newline at end of file From 029429ddfdb99eee1a965431e7df6868e4c034b0 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Thu, 4 Aug 2022 20:47:32 -0400 Subject: [PATCH 04/20] Updates shutdown logic to use channels --- Cargo.lock | 10 -------- Cargo.toml | 3 +-- src/client.rs | 70 ++++++++++++++++++++++++++++++++++++++++----------- src/main.rs | 66 +++++++++++++++++++++++++++++++++++------------- 4 files changed, 105 insertions(+), 44 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e92207c0..ddab730d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -422,7 +422,6 @@ dependencies = [ "tokio", "tokio-rustls", "toml", - "wg", ] [[package]] @@ -885,15 +884,6 @@ dependencies = [ "untrusted", ] -[[package]] -name = "wg" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f0193600355622d5de21a3deadfffa0dc6b3aa3c40e02d5a476495c67841a76" -dependencies = [ - "parking_lot", -] - [[package]] name = "winapi" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index c399d946..2f7295a8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,5 
+30,4 @@ sha2 = "0.10" base64 = "0.13" stringprep = "0.1" tokio-rustls = "0.23" -rustls-pemfile = "1" -wg = "0.1.0" \ No newline at end of file +rustls-pemfile = "1" \ No newline at end of file diff --git a/src/client.rs b/src/client.rs index e00184f5..e47a50a3 100644 --- a/src/client.rs +++ b/src/client.rs @@ -4,6 +4,7 @@ use log::{debug, error, info, trace}; use std::collections::HashMap; use tokio::io::{split, AsyncReadExt, BufReader, ReadHalf, WriteHalf}; use tokio::net::TcpStream; +use tokio::sync::broadcast::Receiver; use crate::admin::{generate_server_info_for_admin, handle_admin}; use crate::config::get_config; @@ -18,10 +19,6 @@ use crate::tls::Tls; use tokio_rustls::server::TlsStream; -use std::sync::atomic::{AtomicBool, Ordering}; - -pub static SHUTTING_DOWN: AtomicBool = AtomicBool::new(false); - /// Type of connection received from client. enum ClientConnectionType { Startup, @@ -77,12 +74,15 @@ pub struct Client { last_server_id: Option, target_pool: ConnectionPool, + + shutdown_event_receiver: Receiver<()>, } /// Client entrypoint. pub async fn client_entrypoint( mut stream: TcpStream, client_server_map: ClientServerMap, + shutdown_event_receiver: Receiver<()>, ) -> Result<(), Error> { // Figure out if the client wants TLS or not. let addr = stream.peer_addr().unwrap(); @@ -101,7 +101,7 @@ pub async fn client_entrypoint( write_all(&mut stream, yes).await?; // Negotiate TLS. - match startup_tls(stream, client_server_map).await { + match startup_tls(stream, client_server_map, shutdown_event_receiver).await { Ok(mut client) => { info!("Client {:?} connected (TLS)", addr); @@ -125,7 +125,16 @@ pub async fn client_entrypoint( let (read, write) = split(stream); // Continue with regular startup. 
- match Client::startup(read, write, addr, bytes, client_server_map).await { + match Client::startup( + read, + write, + addr, + bytes, + client_server_map, + shutdown_event_receiver, + ) + .await + { Ok(mut client) => { info!("Client {:?} connected (plain)", addr); @@ -146,7 +155,16 @@ pub async fn client_entrypoint( let (read, write) = split(stream); // Continue with regular startup. - match Client::startup(read, write, addr, bytes, client_server_map).await { + match Client::startup( + read, + write, + addr, + bytes, + client_server_map, + shutdown_event_receiver, + ) + .await + { Ok(mut client) => { info!("Client {:?} connected (plain)", addr); @@ -161,7 +179,16 @@ pub async fn client_entrypoint( let (read, write) = split(stream); // Continue with cancel query request. - match Client::cancel(read, write, addr, bytes, client_server_map).await { + match Client::cancel( + read, + write, + addr, + bytes, + client_server_map, + shutdown_event_receiver, + ) + .await + { Ok(mut client) => { info!("Client {:?} issued a cancel query request", addr); @@ -218,6 +245,7 @@ where pub async fn startup_tls( stream: TcpStream, client_server_map: ClientServerMap, + shutdown_event_receiver: Receiver<()>, ) -> Result>, WriteHalf>>, Error> { // Negotiate TLS. let tls = Tls::new()?; @@ -241,7 +269,15 @@ pub async fn startup_tls( Ok((ClientConnectionType::Startup, bytes)) => { let (read, write) = split(stream); - Client::startup(read, write, addr, bytes, client_server_map).await + Client::startup( + read, + write, + addr, + bytes, + client_server_map, + shutdown_event_receiver, + ) + .await } // Bad Postgres client. @@ -262,6 +298,7 @@ where addr: std::net::SocketAddr, bytes: BytesMut, // The rest of the startup message. 
client_server_map: ClientServerMap, + shutdown_event_receiver: Receiver<()>, ) -> Result, Error> { let config = get_config(); let stats = get_reporter(); @@ -388,6 +425,7 @@ where last_address_id: None, last_server_id: None, target_pool: target_pool, + shutdown_event_receiver: shutdown_event_receiver, }); } @@ -398,6 +436,7 @@ where addr: std::net::SocketAddr, mut bytes: BytesMut, // The rest of the startup message. client_server_map: ClientServerMap, + shutdown_event_receiver: Receiver<()>, ) -> Result, Error> { let process_id = bytes.get_i32(); let secret_key = bytes.get_i32(); @@ -417,6 +456,7 @@ where last_address_id: None, last_server_id: None, target_pool: ConnectionPool::default(), + shutdown_event_receiver: shutdown_event_receiver, }); } @@ -466,16 +506,18 @@ where self.transaction_mode ); - if SHUTTING_DOWN.load(Ordering::Relaxed) { - return Ok(()); - } - // Read a complete message from the client, which normally would be // either a `Q` (query) or `P` (prepare, extended protocol). // We can parse it here before grabbing a server from the pool, // in case the client is sending some custom protocol messages, e.g. // SET SHARDING KEY TO 'bigint'; - let mut message = read_message(&mut self.read).await?; + + let mut message = tokio::select! { + _ = self.shutdown_event_receiver.recv() => { + return Ok(()) + }, + message_result = read_message(&mut self.read) => message_result? + }; // Get a pool instance referenced by the most up-to-date // pointer. 
This ensures we always read the latest config diff --git a/src/main.rs b/src/main.rs index cdd7d7ab..a4ca8b93 100644 --- a/src/main.rs +++ b/src/main.rs @@ -46,6 +46,7 @@ use tokio::{ use std::collections::HashMap; use std::sync::Arc; +use tokio::sync::broadcast; mod admin; mod client; @@ -65,9 +66,6 @@ use config::{get_config, reload_config}; use pool::{ClientServerMap, ConnectionPool}; use stats::{Collector, Reporter, REPORTER}; -use std::sync::atomic::Ordering; -use wg::WaitGroup; - use crate::config::VERSION; #[tokio::main(worker_threads = 4)] @@ -141,33 +139,51 @@ async fn main() { info!("Waiting for clients"); - let wg = WaitGroup::new(); + let (shutdown_event_tx, mut shutdown_event_rx) = broadcast::channel::<()>(1); - let t_wg = wg.clone(); + let shutdown_event_tx_clone = shutdown_event_tx.clone(); // Client connection loop. tokio::task::spawn(async move { + // Creates event subscriber for shutdown event, this is dropped when shutdown event is broadcast + let mut listener_rx = shutdown_event_tx_clone.subscribe(); loop { let client_server_map = client_server_map.clone(); - let (socket, addr) = match listener.accept().await { - Ok((socket, addr)) => (socket, addr), - Err(err) => { - error!("{:?}", err); - continue; + // Listen for shutdown event and client connection at the same time + let (socket, addr) = tokio::select! { + _ = listener_rx.recv() => { + break; + } + + listener_response = listener.accept() => { + match listener_response { + Ok((socket, addr)) => (socket, addr), + Err(err) => { + error!("{:?}", err); + continue; + } + } } }; - if client::SHUTTING_DOWN.load(Ordering::Relaxed) { - break; - } - let t_wg = t_wg.add(1); + // Used to signal shutdown + let client_shutdown_handler_rx = shutdown_event_tx_clone.subscribe(); + + // Used to signal that the task has completed + let dummy_tx = shutdown_event_tx_clone.clone(); // Handle client. 
tokio::task::spawn(async move { let start = chrono::offset::Utc::now().naive_utc(); - match client::client_entrypoint(socket, client_server_map).await { + match client::client_entrypoint( + socket, + client_server_map, + client_shutdown_handler_rx, + ) + .await + { Ok(_) => { let duration = chrono::offset::Utc::now().naive_utc() - start; @@ -182,7 +198,8 @@ async fn main() { debug!("Client disconnected with error {:?}", err); } }; - t_wg.done(); + // Drop this transmitter so receiver knows that the task is completed + drop(dummy_tx); }); } }); @@ -230,9 +247,22 @@ async fn main() { let mut stream = unix_signal(SignalKind::interrupt()).unwrap(); stream.recv().await; - client::SHUTTING_DOWN.store(true, Ordering::Relaxed); warn!("Got SIGINT, waiting for client connection drain now"); - wg.wait(); + + // Broadcast that client tasks need to finish + shutdown_event_tx.send(()).unwrap(); + // Closes transmitter + drop(shutdown_event_tx); + + loop { + match shutdown_event_rx.recv().await { + // The first event the receiver gets is from the initial broadcast, so we ignore that + Ok(_) => {} + // Expect to receive a closed error when all transmitters are closed. Which means all clients have completed their work + Err(_) => break, + }; + } + } /// Format chrono::Duration to be more human-friendly. From 67a81f3f3be6fa01d3a78e579de8973ab1a06d30 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Thu, 4 Aug 2022 20:47:55 -0400 Subject: [PATCH 05/20] fmt --- src/main.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main.rs b/src/main.rs index a4ca8b93..dccf25c1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -262,7 +262,6 @@ async fn main() { Err(_) => break, }; } - } /// Format chrono::Duration to be more human-friendly. 
From d667b1715f75f5881f48268f428927496dac85a9 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Thu, 4 Aug 2022 20:54:47 -0400 Subject: [PATCH 06/20] fmt --- src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main.rs b/src/main.rs index dccf25c1..7ce93930 100644 --- a/src/main.rs +++ b/src/main.rs @@ -243,7 +243,7 @@ async fn main() { }); } - // initiate graceful shutdown sequence on sig int + // Initiate graceful shutdown sequence on sig int let mut stream = unix_signal(SignalKind::interrupt()).unwrap(); stream.recv().await; From 324b5391492201a991af44264a44017b54ef969c Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Fri, 5 Aug 2022 19:02:07 -0400 Subject: [PATCH 07/20] Adds shutdown timeout --- src/config.rs | 7 +++++++ src/main.rs | 22 ++++++++++++++-------- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/src/config.rs b/src/config.rs index f1138f98..2a214260 100644 --- a/src/config.rs +++ b/src/config.rs @@ -119,6 +119,7 @@ pub struct General { pub port: i16, pub connect_timeout: u64, pub healthcheck_timeout: u64, + pub shutdown_timeout: u64, pub ban_time: i64, pub autoreload: bool, pub tls_certificate: Option, @@ -134,6 +135,7 @@ impl Default for General { port: 5432, connect_timeout: 5000, healthcheck_timeout: 1000, + shutdown_timeout: 60000, ban_time: 60, autoreload: false, tls_certificate: None, @@ -273,6 +275,10 @@ impl From<&Config> for std::collections::HashMap { "healthcheck_timeout".to_string(), config.general.healthcheck_timeout.to_string(), ), + ( + "shutdown_timeout".to_string(), + config.general.shutdown_timeout.to_string(), + ), ("ban_time".to_string(), config.general.ban_time.to_string()), ]; @@ -290,6 +296,7 @@ impl Config { self.general.healthcheck_timeout ); info!("Connection timeout: {}ms", self.general.connect_timeout); + info!("Shutdown timeout: {}ms", self.general.shutdown_timeout); match self.general.tls_certificate.clone() { Some(tls_certificate) => { info!("TLS certificate: {}", 
tls_certificate); diff --git a/src/main.rs b/src/main.rs index 7ce93930..7de65f26 100644 --- a/src/main.rs +++ b/src/main.rs @@ -247,21 +247,27 @@ async fn main() { let mut stream = unix_signal(SignalKind::interrupt()).unwrap(); stream.recv().await; - warn!("Got SIGINT, waiting for client connection drain now"); + debug!("Got SIGINT, waiting for client connection drain now"); // Broadcast that client tasks need to finish shutdown_event_tx.send(()).unwrap(); // Closes transmitter drop(shutdown_event_tx); - + loop { - match shutdown_event_rx.recv().await { - // The first event the receiver gets is from the initial broadcast, so we ignore that - Ok(_) => {} - // Expect to receive a closed error when all transmitters are closed. Which means all clients have completed their work - Err(_) => break, - }; + match tokio::time::timeout(tokio::time::Duration::from_millis(config.general.shutdown_timeout), shutdown_event_rx.recv()).await { + Ok(res) => match res { + Ok(_) => {}, + Err(_) => break, + }, + Err(_) => { + println!("Timed out while waiting for clients to shutdown"); + break + } + } } + + debug!("Shutting down..."); } /// Format chrono::Duration to be more human-friendly. From 6ed4eb1bd354f3546232131b000bb906bc8f3a59 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Fri, 5 Aug 2022 19:02:51 -0400 Subject: [PATCH 08/20] Fmt and updates tomls --- .circleci/pgcat.toml | 3 +++ examples/docker/pgcat.toml | 3 +++ pgcat.toml | 3 +++ src/main.rs | 13 +++++++++---- 4 files changed, 18 insertions(+), 4 deletions(-) diff --git a/.circleci/pgcat.toml b/.circleci/pgcat.toml index 1c0c0104..b44e3d2e 100644 --- a/.circleci/pgcat.toml +++ b/.circleci/pgcat.toml @@ -17,6 +17,9 @@ connect_timeout = 100 # How much time to give the health check query to return with a result (ms). healthcheck_timeout = 100 +# How much time to give clients during shutdown before forcibly killing client connections (ms). 
+shutdown_timeout = 5000 + # For how long to ban a server if it fails a health check (seconds). ban_time = 60 # Seconds diff --git a/examples/docker/pgcat.toml b/examples/docker/pgcat.toml index 874f737a..40a54928 100644 --- a/examples/docker/pgcat.toml +++ b/examples/docker/pgcat.toml @@ -17,6 +17,9 @@ connect_timeout = 5000 # How much time to give `SELECT 1` health check query to return with a result (ms). healthcheck_timeout = 1000 +# How much time to give clients during shutdown before forcibly killing client connections (ms). +shutdown_timeout = 60000 + # For how long to ban a server if it fails a health check (seconds). ban_time = 60 # seconds diff --git a/pgcat.toml b/pgcat.toml index a1937e6c..3d9d7df3 100644 --- a/pgcat.toml +++ b/pgcat.toml @@ -17,6 +17,9 @@ connect_timeout = 5000 # How much time to give `SELECT 1` health check query to return with a result (ms). healthcheck_timeout = 1000 +# How much time to give clients during shutdown before forcibly killing client connections (ms). +shutdown_timeout = 60000 + # For how long to ban a server if it fails a health check (seconds). 
ban_time = 60 # seconds diff --git a/src/main.rs b/src/main.rs index 7de65f26..75df65eb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -253,16 +253,21 @@ async fn main() { shutdown_event_tx.send(()).unwrap(); // Closes transmitter drop(shutdown_event_tx); - + loop { - match tokio::time::timeout(tokio::time::Duration::from_millis(config.general.shutdown_timeout), shutdown_event_rx.recv()).await { + match tokio::time::timeout( + tokio::time::Duration::from_millis(config.general.shutdown_timeout), + shutdown_event_rx.recv(), + ) + .await + { Ok(res) => match res { - Ok(_) => {}, + Ok(_) => {} Err(_) => break, }, Err(_) => { println!("Timed out while waiting for clients to shutdown"); - break + break; } } } From f667e7aec8194b472a2850a3e14e1858a3174e98 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Fri, 5 Aug 2022 19:03:09 -0400 Subject: [PATCH 09/20] Updates readme --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 5b54a7fc..c03982d2 100644 --- a/README.md +++ b/README.md @@ -47,6 +47,7 @@ psql -h 127.0.0.1 -p 6432 -c 'SELECT 1' | `pool_mode` | The pool mode to use, i.e. `session` or `transaction`. | `transaction` | | `connect_timeout` | Maximum time to establish a connection to a server (milliseconds). If reached, the server is banned and the next target is attempted. | `5000` | | `healthcheck_timeout` | Maximum time to pass a health check (`SELECT 1`, milliseconds). If reached, the server is banned and the next target is attempted. | `1000` | +| `shutdown_timeout` | Maximum time to give clients during shutdown before forcibly killing client connections (ms). | `60000` | | `ban_time` | Ban time for a server (seconds). It won't be allowed to serve transactions until the ban expires; failover targets will be used instead. 
| `60` | | | | | | **`user`** | | | @@ -250,6 +251,7 @@ The config can be reloaded by sending a `kill -s SIGHUP` to the process or by qu | `pool_mode` | no | | `connect_timeout` | yes | | `healthcheck_timeout` | no | +| `shutdown_timeout` | no | | `ban_time` | no | | `user` | yes | | `shards` | yes | From 78a1d70a105ac865436f0cffafcc78265e242da5 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Fri, 5 Aug 2022 21:37:18 -0400 Subject: [PATCH 10/20] fmt and updates log levels --- Cargo.toml | 2 +- src/main.rs | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 2f7295a8..8bdeab67 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,4 +30,4 @@ sha2 = "0.10" base64 = "0.13" stringprep = "0.1" tokio-rustls = "0.23" -rustls-pemfile = "1" \ No newline at end of file +rustls-pemfile = "1" diff --git a/src/main.rs b/src/main.rs index 75df65eb..aa1e9857 100644 --- a/src/main.rs +++ b/src/main.rs @@ -36,7 +36,7 @@ extern crate tokio; extern crate tokio_rustls; extern crate toml; -use log::{debug, error, info, warn}; +use log::{debug, error, info}; use parking_lot::Mutex; use tokio::net::TcpListener; use tokio::{ @@ -153,6 +153,7 @@ async fn main() { // Listen for shutdown event and client connection at the same time let (socket, addr) = tokio::select! 
{ _ = listener_rx.recv() => { + // Exits client connection loop which drops listener, listener_rx and shutdown_event_tx_clone break; } @@ -247,7 +248,7 @@ async fn main() { let mut stream = unix_signal(SignalKind::interrupt()).unwrap(); stream.recv().await; - debug!("Got SIGINT, waiting for client connection drain now"); + info!("Got SIGINT, waiting for client connection drain now"); // Broadcast that client tasks need to finish shutdown_event_tx.send(()).unwrap(); @@ -266,13 +267,13 @@ async fn main() { Err(_) => break, }, Err(_) => { - println!("Timed out while waiting for clients to shutdown"); + info!("Timed out while waiting for clients to shutdown"); break; } } } - debug!("Shutting down..."); + info!("Shutting down..."); } /// Format chrono::Duration to be more human-friendly. From 3737f063a67f46f2be56b62bc9bc6e3db7a8fc96 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Fri, 5 Aug 2022 22:43:25 -0400 Subject: [PATCH 11/20] Update python tests to test shutdown --- .circleci/run_tests.sh | 7 +- tests/python/requirements.txt | 1 + tests/python/tests.py | 144 +++++++++++++++++++++++++++++++++- 3 files changed, 145 insertions(+), 7 deletions(-) diff --git a/.circleci/run_tests.sh b/.circleci/run_tests.sh index a6fb7909..ab3649c9 100644 --- a/.circleci/run_tests.sh +++ b/.circleci/run_tests.sh @@ -74,12 +74,13 @@ cd /home/circleci/project # # Python tests +# These tests will start and stop the pgcat server so it will need to be restarted after the tests # -cd tests/python && \ - pip install -r requirements.txt && \ - python tests.py +pip install -r tests/python/requirements.txt && \ + python tests/python/tests.py cd /home/circleci/project +start_pgcat "info" # Admin tests export PGPASSWORD=admin_pass diff --git a/tests/python/requirements.txt b/tests/python/requirements.txt index d7661d4d..eebd9c90 100644 --- a/tests/python/requirements.txt +++ b/tests/python/requirements.txt @@ -1 +1,2 @@ psycopg2==2.9.3 +psutil==5.9.1 \ No newline at end of file diff --git 
a/tests/python/tests.py b/tests/python/tests.py index 06d27dce..3ff99a09 100644 --- a/tests/python/tests.py +++ b/tests/python/tests.py @@ -1,22 +1,158 @@ +from typing import Tuple import psycopg2 +import psutil +import os +import signal +import subprocess +from threading import Thread +import time -def test_normal_db_access(): - conn = psycopg2.connect("postgres://sharding_user:sharding_user@127.0.0.1:6432/sharded_db?application_name=testing_pgcat") +SHUTDOWN_TIMEOUT = 5 + +PGCAT_HOST = "127.0.0.1" +PGCAT_PORT = "6432" + + +def pgcat_start(): + pg_cat_send_signal(signal.SIGTERM) + pgcat_start_command = "./target/debug/pgcat .circleci/pgcat.toml" + subprocess.Popen(pgcat_start_command.split()) + + +def pg_cat_send_signal(signal: signal.Signals): + for proc in psutil.process_iter(["pid", "name"]): + if "pgcat" == proc.name(): + os.kill(proc.pid, signal) + + +def connect_normal_db( + autocommit: bool = False, +) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]: + conn = psycopg2.connect( + f"postgres://sharding_user:sharding_user@{PGCAT_HOST}:{PGCAT_PORT}/sharded_db?application_name=testing_pgcat" + ) + conn.autocommit = autocommit cur = conn.cursor() + return (conn, cur) + + +def cleanup_conn(conn: psycopg2.extensions.connection, cur: psycopg2.extensions.cursor): + cur.close() + conn.close() + + +def test_normal_db_access(): + conn, cur = connect_normal_db() cur.execute("SELECT 1") res = cur.fetchall() print(res) + cleanup_conn(conn, cur) def test_admin_db_access(): - conn = psycopg2.connect("postgres://user:pass@127.0.0.1:6432/pgcat") - conn.autocommit = True # BEGIN/COMMIT is not supported by admin db + conn = psycopg2.connect( + f"postgres://admin_user:admin_pass@{PGCAT_HOST}:{PGCAT_PORT}/pgcat" + ) + conn.autocommit = True # BEGIN/COMMIT is not supported by admin db cur = conn.cursor() cur.execute("SHOW POOLS") res = cur.fetchall() print(res) + cleanup_conn(conn, cur) + + +def test_shutdown_logic(): + + ##### NO ACTIVE QUERIES SIGINT HANDLING 
##### + # Start pgcat + server = Thread(target=pgcat_start) + server.start() + + # Wait for server to fully start up + time.sleep(2) + + # Create client connection and send query (not in transaction) + conn, cur = connect_normal_db(True) + + cur.execute("BEGIN;") + cur.execute("SELECT 1;") + cur.execute("COMMIT;") + + # Send sigint to pgcat + pg_cat_send_signal(signal.SIGINT) + time.sleep(1) + + # Check that any new queries fail after sigint since server should close with no active transactions + try: + cur.execute("SELECT 1;") + except psycopg2.OperationalError as e: + pass + else: + # Fail if query execution succeeded + raise Exception("Server not closed after sigint") + cleanup_conn(conn, cur) + + ##### HANDLE TRANSACTION WITH SIGINT ##### + # Start pgcat + server = Thread(target=pgcat_start) + server.start() + + # Wait for server to fully start up + time.sleep(2) + + # Create client connection and begin transaction + conn, cur = connect_normal_db(True) + + cur.execute("BEGIN;") + cur.execute("SELECT 1;") + + # Send sigint to pgcat while still in transaction + pg_cat_send_signal(signal.SIGINT) + time.sleep(1) + + # Check that any new queries succeed after sigint since server should still allow transaction to complete + try: + cur.execute("SELECT 1;") + except psycopg2.OperationalError as e: + # Fail if query fails since server closed + raise Exception("Server closed while in transaction", e.pgerror) + + cleanup_conn(conn, cur) + + ##### HANDLE SHUTDOWN TIMEOUT WITH SIGINT ##### + # Start pgcat + server = Thread(target=pgcat_start) + server.start() + + # Wait for server to fully start up + time.sleep(3) + + # Create client connection and begin transaction, which should prevent server shutdown unless shutdown timeout is reached + conn, cur = connect_normal_db(True) + + cur.execute("BEGIN;") + cur.execute("SELECT 1;") + + # Send sigint to pgcat while still in transaction + pg_cat_send_signal(signal.SIGINT) + + # pgcat shutdown timeout is set to SHUTDOWN_TIMEOUT 
seconds, so we sleep for SHUTDOWN_TIMEOUT + 1 seconds + time.sleep(SHUTDOWN_TIMEOUT + 1) + + # Check that any new queries succeed after sigint since server should still allow transaction to complete + try: + cur.execute("SELECT 1;") + except psycopg2.OperationalError as e: + pass + else: + # Fail if query execution succeeded + raise Exception("Server not closed after sigint and expected timeout") + + cleanup_conn(conn, cur) + test_normal_db_access() test_admin_db_access() +test_shutdown_logic() From cec0f1fd525d7a942f673adedbf1420d632e7f7d Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Fri, 5 Aug 2022 23:34:50 -0400 Subject: [PATCH 12/20] merge changes --- .circleci/run_tests.sh | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/.circleci/run_tests.sh b/.circleci/run_tests.sh index c2a62504..fa9b7c41 100644 --- a/.circleci/run_tests.sh +++ b/.circleci/run_tests.sh @@ -76,8 +76,9 @@ cd ../.. # Python tests # These tests will start and stop the pgcat server so it will need to be restarted after the tests # -pip install -r tests/python/requirements.txt && \ - python tests/python/tests.py +pip3 install -r tests/python/requirements.txt && \ + python3 tests/python/tests.py +cd /home/circleci/project start_pgcat "info" From 863ba8c6bc7d9b25b0771959e32ef5b794ff8d37 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Mon, 8 Aug 2022 12:57:12 -0400 Subject: [PATCH 13/20] Rename listener rx and update bash to be in line with master --- .circleci/run_tests.sh | 1 - src/main.rs | 6 +++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/.circleci/run_tests.sh b/.circleci/run_tests.sh index fa9b7c41..da03813f 100644 --- a/.circleci/run_tests.sh +++ b/.circleci/run_tests.sh @@ -78,7 +78,6 @@ cd ../.. 
# pip3 install -r tests/python/requirements.txt && \ python3 tests/python/tests.py -cd /home/circleci/project start_pgcat "info" diff --git a/src/main.rs b/src/main.rs index aa1e9857..cc233789 100644 --- a/src/main.rs +++ b/src/main.rs @@ -146,14 +146,14 @@ async fn main() { // Client connection loop. tokio::task::spawn(async move { // Creates event subscriber for shutdown event, this is dropped when shutdown event is broadcast - let mut listener_rx = shutdown_event_tx_clone.subscribe(); + let mut listener_shutdown_event_rx = shutdown_event_tx_clone.subscribe(); loop { let client_server_map = client_server_map.clone(); // Listen for shutdown event and client connection at the same time let (socket, addr) = tokio::select! { - _ = listener_rx.recv() => { - // Exits client connection loop which drops listener, listener_rx and shutdown_event_tx_clone + _ = listener_shutdown_event_rx.recv() => { + // Exits client connection loop which drops listener, listener_shutdown_event_rx and shutdown_event_tx_clone break; } From 335ad621ace7e32f16f8a7c6acdf8c551c71bec6 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Mon, 8 Aug 2022 14:32:54 -0400 Subject: [PATCH 14/20] Update python test bash script ordering --- .circleci/run_tests.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.circleci/run_tests.sh b/.circleci/run_tests.sh index da03813f..431f2d60 100644 --- a/.circleci/run_tests.sh +++ b/.circleci/run_tests.sh @@ -76,8 +76,8 @@ cd ../.. 
# Python tests # These tests will start and stop the pgcat server so it will need to be restarted after the tests # -pip3 install -r tests/python/requirements.txt && \ - python3 tests/python/tests.py +pip3 install -r tests/python/requirements.txt +python3 tests/python/tests.py start_pgcat "info" From df08b7f694ad58d56afc5fe8e6217383875d7065 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Mon, 8 Aug 2022 17:11:14 -0400 Subject: [PATCH 15/20] Adds error response message before shutdown --- src/client.rs | 1 + src/messages.rs | 27 ++++++++++++++++++++++++--- 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/src/client.rs b/src/client.rs index e47a50a3..cc912191 100644 --- a/src/client.rs +++ b/src/client.rs @@ -514,6 +514,7 @@ where let mut message = tokio::select! { _ = self.shutdown_event_receiver.recv() => { + error_response_terminal(&mut self.write, &format!("terminating connection due to administrator command")).await?; return Ok(()) }, message_result = read_message(&mut self.read) => message_result? diff --git a/src/messages.rs b/src/messages.rs index ba22a579..27f4dcac 100644 --- a/src/messages.rs +++ b/src/messages.rs @@ -264,6 +264,29 @@ where /// Tell the client we are ready for the next query and no rollback is necessary. /// Docs on error codes: . pub async fn error_response(stream: &mut S, message: &str) -> Result<(), Error> +where + S: tokio::io::AsyncWrite + std::marker::Unpin, +{ + error_response_terminal(stream, message).await?; + + // Ready for query, no rollback needed (I = idle). + let mut ready_for_query = BytesMut::new(); + + ready_for_query.put_u8(b'Z'); + ready_for_query.put_i32(5); + ready_for_query.put_u8(b'I'); + + // Compose the two message reply. + let mut res = BytesMut::with_capacity(ready_for_query.len() + 5); + res.put(ready_for_query); + + Ok(write_all_half(stream, res).await?) +} + +/// Send a custom error message to the client. +/// Tell the client we are ready for the next query and no rollback is necessary. 
+/// Docs on error codes: <https://www.postgresql.org/docs/12/errcodes-appendix.html>. +pub async fn error_response_terminal<S>(stream: &mut S, message: &str) -> Result<(), Error> where S: tokio::io::AsyncWrite + std::marker::Unpin, { @@ -296,13 +319,11 @@ where ready_for_query.put_u8(b'I'); // Compose the two message reply. - let mut res = BytesMut::with_capacity(error.len() + ready_for_query.len() + 5); + let mut res = BytesMut::with_capacity(error.len()); res.put_u8(b'E'); res.put_i32(error.len() as i32 + 4); - res.put(error); - res.put(ready_for_query); Ok(write_all_half(stream, res).await?) } From 35d334623312ae5184975b7515d1171ad7b3a479 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Mon, 8 Aug 2022 17:14:30 -0400 Subject: [PATCH 16/20] Add details on shutdown event loop --- src/main.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main.rs b/src/main.rs index cc233789..8b7be77e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -255,6 +255,8 @@ async fn main() { // Closes transmitter drop(shutdown_event_tx); + // This is in a loop because the first event that the receiver receives will be the shutdown event + // This is not what we are waiting for instead, we want the receiver to send an error once all senders are closed which is reached after the shutdown event is received loop { match tokio::time::timeout( tokio::time::Duration::from_millis(config.general.shutdown_timeout), From ab4e6699f737aef1b2eb89d6f8f519c8944c27f7 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Mon, 8 Aug 2022 17:23:05 -0400 Subject: [PATCH 17/20] Fixes response length for error --- src/messages.rs | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/src/messages.rs b/src/messages.rs index 27f4dcac..d87e3587 100644 --- a/src/messages.rs +++ b/src/messages.rs @@ -311,15 +311,8 @@ where // No more fields follow. error.put_u8(0); - // Ready for query, no rollback needed (I = idle).
- let mut ready_for_query = BytesMut::new(); - - ready_for_query.put_u8(b'Z'); - ready_for_query.put_i32(5); - ready_for_query.put_u8(b'I'); - // Compose the two message reply. - let mut res = BytesMut::with_capacity(error.len()); + let mut res = BytesMut::with_capacity(error.len() + 5); res.put_u8(b'E'); res.put_i32(error.len() as i32 + 4); From 72d7492f59a16c3ba8b06a7096c1fa3a9eec3bba Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Mon, 8 Aug 2022 17:43:38 -0400 Subject: [PATCH 18/20] Adds handler for sigterm --- src/main.rs | 61 +++++++++++++++++++++++++++++------------------------ 1 file changed, 33 insertions(+), 28 deletions(-) diff --git a/src/main.rs b/src/main.rs index 8b7be77e..5e5c9248 100644 --- a/src/main.rs +++ b/src/main.rs @@ -244,35 +244,40 @@ async fn main() { }); } - // Initiate graceful shutdown sequence on sig int - let mut stream = unix_signal(SignalKind::interrupt()).unwrap(); - - stream.recv().await; - info!("Got SIGINT, waiting for client connection drain now"); - - // Broadcast that client tasks need to finish - shutdown_event_tx.send(()).unwrap(); - // Closes transmitter - drop(shutdown_event_tx); - - // This is in a loop because the first event that the receiver receives will be the shutdown event - // This is not what we are waiting for instead, we want the receiver to send an error once all senders are closed which is reached after the shutdown event is received - loop { - match tokio::time::timeout( - tokio::time::Duration::from_millis(config.general.shutdown_timeout), - shutdown_event_rx.recv(), - ) - .await - { - Ok(res) => match res { - Ok(_) => {} - Err(_) => break, - }, - Err(_) => { - info!("Timed out while waiting for clients to shutdown"); - break; + let mut term_signal = unix_signal(SignalKind::terminate()).unwrap(); + let mut interrupt_signal = unix_signal(SignalKind::interrupt()).unwrap(); + + tokio::select! 
{ + // Initiate graceful shutdown sequence on sig int + _ = interrupt_signal.recv() => { + info!("Got SIGINT, waiting for client connection drain now"); + + // Broadcast that client tasks need to finish + shutdown_event_tx.send(()).unwrap(); + // Closes transmitter + drop(shutdown_event_tx); + + // This is in a loop because the first event that the receiver receives will be the shutdown event + // This is not what we are waiting for instead, we want the receiver to send an error once all senders are closed which is reached after the shutdown event is received + loop { + match tokio::time::timeout( + tokio::time::Duration::from_millis(config.general.shutdown_timeout), + shutdown_event_rx.recv(), + ) + .await + { + Ok(res) => match res { + Ok(_) => {} + Err(_) => break, + }, + Err(_) => { + info!("Timed out while waiting for clients to shutdown"); + break; + } + } } - } + }, + _ = term_signal.recv() => (), } info!("Shutting down..."); From e1620ddc9576d15def4acf66b90a699e37255e41 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Mon, 8 Aug 2022 18:51:59 -0400 Subject: [PATCH 19/20] Uses ready for query function and fixes number of bytes --- src/messages.rs | 32 ++++++-------------------------- 1 file changed, 6 insertions(+), 26 deletions(-) diff --git a/src/messages.rs b/src/messages.rs index d87e3587..cac1dbbb 100644 --- a/src/messages.rs +++ b/src/messages.rs @@ -98,7 +98,7 @@ pub async fn ready_for_query<S>(stream: &mut S) -> Result<(), Error> where S: tokio::io::AsyncWrite + std::marker::Unpin, { - let mut bytes = BytesMut::with_capacity(5); + let mut bytes = BytesMut::with_capacity(mem::size_of::<u8>() + mem::size_of::<i32>() + mem::size_of::<u8>()); bytes.put_u8(b'Z'); bytes.put_i32(5); @@ -252,12 +252,8 @@ where res.put_i32(len); res.put_slice(&set_complete[..]); - // ReadyForQuery (idle) - res.put_u8(b'Z'); - res.put_i32(5); - res.put_u8(b'I'); - - write_all_half(stream, res).await + write_all_half(stream, res).await?; + ready_for_query(stream).await } /// Send a custom
error message to the client. @@ -268,19 +264,7 @@ where S: tokio::io::AsyncWrite + std::marker::Unpin, { error_response_terminal(stream, message).await?; - - // Ready for query, no rollback needed (I = idle). - let mut ready_for_query = BytesMut::new(); - - ready_for_query.put_u8(b'Z'); - ready_for_query.put_i32(5); - ready_for_query.put_u8(b'I'); - - // Compose the two message reply. - let mut res = BytesMut::with_capacity(ready_for_query.len() + 5); - res.put(ready_for_query); - - Ok(write_all_half(stream, res).await?) + ready_for_query(stream).await } /// Send a custom error message to the client. @@ -380,12 +364,8 @@ where // CommandComplete res.put(command_complete("SELECT 1")); - // ReadyForQuery - res.put_u8(b'Z'); - res.put_i32(5); - res.put_u8(b'I'); - - write_all_half(stream, res).await + write_all_half(stream, res).await?; + ready_for_query(stream).await } pub fn row_description(columns: &Vec<(&str, DataType)>) -> BytesMut { From b018f41a9d4711076774e8fee337c0a0f4b36019 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Mon, 8 Aug 2022 18:53:04 -0400 Subject: [PATCH 20/20] fmt --- src/messages.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/messages.rs b/src/messages.rs index cac1dbbb..113e1ed5 100644 --- a/src/messages.rs +++ b/src/messages.rs @@ -98,7 +98,9 @@ pub async fn ready_for_query<S>(stream: &mut S) -> Result<(), Error> where S: tokio::io::AsyncWrite + std::marker::Unpin, { - let mut bytes = BytesMut::with_capacity(mem::size_of::<u8>() + mem::size_of::<i32>() + mem::size_of::<u8>()); + let mut bytes = BytesMut::with_capacity( + mem::size_of::<u8>() + mem::size_of::<i32>() + mem::size_of::<u8>(), + ); bytes.put_u8(b'Z'); bytes.put_i32(5);