Implementing graceful shutdown #105

Merged: 21 commits, Aug 8, 2022
3 changes: 3 additions & 0 deletions .circleci/pgcat.toml
@@ -17,6 +17,9 @@ connect_timeout = 100
# How much time to give the health check query to return with a result (ms).
healthcheck_timeout = 100

# How much time to give clients during shutdown before forcibly killing client connections (ms).
shutdown_timeout = 5000

# For how long to ban a server if it fails a health check (seconds).
ban_time = 60 # Seconds

8 changes: 4 additions & 4 deletions .circleci/run_tests.sh
@@ -74,12 +74,12 @@ cd ../..

#
# Python tests
# These tests start and stop the pgcat server, so it needs to be restarted after they run
#
cd tests/python
pip3 install -r requirements.txt
python3 tests.py
cd ../..
pip3 install -r tests/python/requirements.txt
python3 tests/python/tests.py

start_pgcat "info"

# Admin tests
export PGPASSWORD=admin_pass
1 change: 1 addition & 0 deletions .gitignore
@@ -1,3 +1,4 @@
.idea
/target
*.deb
.vscode
2 changes: 2 additions & 0 deletions README.md
@@ -47,6 +47,7 @@ psql -h 127.0.0.1 -p 6432 -c 'SELECT 1'
| `pool_mode` | The pool mode to use, i.e. `session` or `transaction`. | `transaction` |
| `connect_timeout` | Maximum time to establish a connection to a server (milliseconds). If reached, the server is banned and the next target is attempted. | `5000` |
| `healthcheck_timeout` | Maximum time to pass a health check (`SELECT 1`, milliseconds). If reached, the server is banned and the next target is attempted. | `1000` |
| `shutdown_timeout` | Maximum time to give clients during shutdown before forcibly killing client connections (ms). | `60000` |
| `ban_time` | Ban time for a server (seconds). It won't be allowed to serve transactions until the ban expires; failover targets will be used instead. | `60` |
| | | |
| **`user`** | | |
@@ -250,6 +251,7 @@ The config can be reloaded by sending a `kill -s SIGHUP` to the process or by qu
| `pool_mode` | no |
| `connect_timeout` | yes |
| `healthcheck_timeout` | no |
| `shutdown_timeout` | no |
| `ban_time` | no |
| `user` | yes |
| `shards` | yes |
3 changes: 3 additions & 0 deletions examples/docker/pgcat.toml
@@ -17,6 +17,9 @@ connect_timeout = 5000
# How much time to give `SELECT 1` health check query to return with a result (ms).
healthcheck_timeout = 1000

# How much time to give clients during shutdown before forcibly killing client connections (ms).
shutdown_timeout = 60000

# For how long to ban a server if it fails a health check (seconds).
ban_time = 60 # seconds

3 changes: 3 additions & 0 deletions pgcat.toml
@@ -17,6 +17,9 @@ connect_timeout = 5000
# How much time to give `SELECT 1` health check query to return with a result (ms).
healthcheck_timeout = 1000

# How much time to give clients during shutdown before forcibly killing client connections (ms).
shutdown_timeout = 60000

# For how long to ban a server if it fails a health check (seconds).
ban_time = 60 # seconds

63 changes: 57 additions & 6 deletions src/client.rs
@@ -4,6 +4,7 @@ use log::{debug, error, info, trace};
use std::collections::HashMap;
use tokio::io::{split, AsyncReadExt, BufReader, ReadHalf, WriteHalf};
use tokio::net::TcpStream;
use tokio::sync::broadcast::Receiver;

use crate::admin::{generate_server_info_for_admin, handle_admin};
use crate::config::get_config;
@@ -73,12 +74,15 @@ pub struct Client<S, T> {
last_server_id: Option<i32>,

target_pool: ConnectionPool,

shutdown_event_receiver: Receiver<()>,
}

/// Client entrypoint.
pub async fn client_entrypoint(
mut stream: TcpStream,
client_server_map: ClientServerMap,
shutdown_event_receiver: Receiver<()>,
) -> Result<(), Error> {
// Figure out if the client wants TLS or not.
let addr = stream.peer_addr().unwrap();
@@ -97,7 +101,7 @@ pub async fn client_entrypoint(
write_all(&mut stream, yes).await?;

// Negotiate TLS.
match startup_tls(stream, client_server_map).await {
match startup_tls(stream, client_server_map, shutdown_event_receiver).await {
Ok(mut client) => {
info!("Client {:?} connected (TLS)", addr);

@@ -121,7 +125,16 @@ pub async fn client_entrypoint(
let (read, write) = split(stream);

// Continue with regular startup.
match Client::startup(read, write, addr, bytes, client_server_map).await {
match Client::startup(
read,
write,
addr,
bytes,
client_server_map,
shutdown_event_receiver,
)
.await
{
Ok(mut client) => {
info!("Client {:?} connected (plain)", addr);

@@ -142,7 +155,16 @@ pub async fn client_entrypoint(
let (read, write) = split(stream);

// Continue with regular startup.
match Client::startup(read, write, addr, bytes, client_server_map).await {
match Client::startup(
read,
write,
addr,
bytes,
client_server_map,
shutdown_event_receiver,
)
.await
{
Ok(mut client) => {
info!("Client {:?} connected (plain)", addr);

@@ -157,7 +179,16 @@ pub async fn client_entrypoint(
let (read, write) = split(stream);

// Continue with cancel query request.
match Client::cancel(read, write, addr, bytes, client_server_map).await {
match Client::cancel(
read,
write,
addr,
bytes,
client_server_map,
shutdown_event_receiver,
)
.await
{
Ok(mut client) => {
info!("Client {:?} issued a cancel query request", addr);

@@ -214,6 +245,7 @@ where
pub async fn startup_tls(
stream: TcpStream,
client_server_map: ClientServerMap,
shutdown_event_receiver: Receiver<()>,
) -> Result<Client<ReadHalf<TlsStream<TcpStream>>, WriteHalf<TlsStream<TcpStream>>>, Error> {
// Negotiate TLS.
let tls = Tls::new()?;
@@ -237,7 +269,15 @@ pub async fn startup_tls(
Ok((ClientConnectionType::Startup, bytes)) => {
let (read, write) = split(stream);

Client::startup(read, write, addr, bytes, client_server_map).await
Client::startup(
read,
write,
addr,
bytes,
client_server_map,
shutdown_event_receiver,
)
.await
}

// Bad Postgres client.
@@ -258,6 +298,7 @@ where
addr: std::net::SocketAddr,
bytes: BytesMut, // The rest of the startup message.
client_server_map: ClientServerMap,
shutdown_event_receiver: Receiver<()>,
) -> Result<Client<S, T>, Error> {
let config = get_config();
let stats = get_reporter();
@@ -384,6 +425,7 @@ where
last_address_id: None,
last_server_id: None,
target_pool: target_pool,
shutdown_event_receiver: shutdown_event_receiver,
});
}

Expand All @@ -394,6 +436,7 @@ where
addr: std::net::SocketAddr,
mut bytes: BytesMut, // The rest of the startup message.
client_server_map: ClientServerMap,
shutdown_event_receiver: Receiver<()>,
) -> Result<Client<S, T>, Error> {
let process_id = bytes.get_i32();
let secret_key = bytes.get_i32();
@@ -413,6 +456,7 @@ where
last_address_id: None,
last_server_id: None,
target_pool: ConnectionPool::default(),
shutdown_event_receiver: shutdown_event_receiver,
});
}

@@ -467,7 +511,14 @@ where
// We can parse it here before grabbing a server from the pool,
// in case the client is sending some custom protocol messages, e.g.
// SET SHARDING KEY TO 'bigint';
let mut message = read_message(&mut self.read).await?;

let mut message = tokio::select! {
_ = self.shutdown_event_receiver.recv() => {
error_response_terminal(&mut self.write, &format!("terminating connection due to administrator command")).await?;
return Ok(())
},
message_result = read_message(&mut self.read) => message_result?
};

// Get a pool instance referenced by the most up-to-date
// pointer. This ensures we always read the latest config
7 changes: 7 additions & 0 deletions src/config.rs
@@ -119,6 +119,7 @@ pub struct General {
pub port: i16,
pub connect_timeout: u64,
pub healthcheck_timeout: u64,
pub shutdown_timeout: u64,
pub ban_time: i64,
pub autoreload: bool,
pub tls_certificate: Option<String>,
@@ -134,6 +135,7 @@ impl Default for General {
port: 5432,
connect_timeout: 5000,
healthcheck_timeout: 1000,
shutdown_timeout: 60000,
ban_time: 60,
autoreload: false,
tls_certificate: None,
@@ -273,6 +275,10 @@ impl From<&Config> for std::collections::HashMap<String, String> {
"healthcheck_timeout".to_string(),
config.general.healthcheck_timeout.to_string(),
),
(
"shutdown_timeout".to_string(),
config.general.shutdown_timeout.to_string(),
),
("ban_time".to_string(), config.general.ban_time.to_string()),
];

@@ -290,6 +296,7 @@ impl Config {
self.general.healthcheck_timeout
);
info!("Connection timeout: {}ms", self.general.connect_timeout);
info!("Shutdown timeout: {}ms", self.general.shutdown_timeout);
match self.general.tls_certificate.clone() {
Some(tls_certificate) => {
info!("TLS certificate: {}", tls_certificate);
78 changes: 68 additions & 10 deletions src/main.rs
@@ -40,13 +40,13 @@ use log::{debug, error, info};
use parking_lot::Mutex;
use tokio::net::TcpListener;
use tokio::{
signal,
signal::unix::{signal as unix_signal, SignalKind},
sync::mpsc,
};

use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::broadcast;

mod admin;
mod client;
@@ -139,24 +139,52 @@ async fn main() {

info!("Waiting for clients");

let (shutdown_event_tx, mut shutdown_event_rx) = broadcast::channel::<()>(1);

let shutdown_event_tx_clone = shutdown_event_tx.clone();

// Client connection loop.
tokio::task::spawn(async move {
// Creates event subscriber for shutdown event, this is dropped when shutdown event is broadcast
let mut listener_shutdown_event_rx = shutdown_event_tx_clone.subscribe();
loop {
let client_server_map = client_server_map.clone();

let (socket, addr) = match listener.accept().await {
Ok((socket, addr)) => (socket, addr),
Err(err) => {
error!("{:?}", err);
continue;
// Listen for shutdown event and client connection at the same time
let (socket, addr) = tokio::select! {
_ = listener_shutdown_event_rx.recv() => {
// Exits client connection loop which drops listener, listener_shutdown_event_rx and shutdown_event_tx_clone
break;
}

listener_response = listener.accept() => {
match listener_response {
Ok((socket, addr)) => (socket, addr),
Err(err) => {
error!("{:?}", err);
continue;
}
}
}
};

// Used to signal shutdown
let client_shutdown_handler_rx = shutdown_event_tx_clone.subscribe();

// Used to signal that the task has completed
let dummy_tx = shutdown_event_tx_clone.clone();

// Handle client.
tokio::task::spawn(async move {
let start = chrono::offset::Utc::now().naive_utc();

match client::client_entrypoint(socket, client_server_map).await {
match client::client_entrypoint(
socket,
client_server_map,
client_shutdown_handler_rx,
)
.await
{
Ok(_) => {
let duration = chrono::offset::Utc::now().naive_utc() - start;

@@ -171,6 +199,8 @@
debug!("Client disconnected with error {:?}", err);
}
};
// Drop this transmitter so receiver knows that the task is completed
drop(dummy_tx);
});
}
});
@@ -214,13 +244,41 @@
});
}

// Exit on Ctrl-C (SIGINT) and SIGTERM.
let mut term_signal = unix_signal(SignalKind::terminate()).unwrap();
let mut interrupt_signal = unix_signal(SignalKind::interrupt()).unwrap();

tokio::select! {
_ = signal::ctrl_c() => (),
Review comment (Contributor): Curious why you removed the ctrl-c and the SIGTERM handler? SIGTERM is commonly used in Docker containers to stop a container.

Reply (Contributor, Author): Pgbouncer interprets SIGTERM and SIGINT differently. Since SIGINT is the graceful shutdown, we want to catch and handle that. For SIGTERM, which is immediate shutdown, we don't need to do anything special.

    SIGINT
    Safe shutdown. Same as issuing PAUSE and SHUTDOWN on the console.
    SIGTERM
    Immediate shutdown. Same as issuing SHUTDOWN on the console.

Review comment (Contributor): I'm curious about the reasoning here. In Kubernetes SIGTERM is meant to initiate a graceful shutdown. https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/
// Initiate graceful shutdown sequence on sig int
_ = interrupt_signal.recv() => {
info!("Got SIGINT, waiting for client connection drain now");

// Broadcast that client tasks need to finish
shutdown_event_tx.send(()).unwrap();
// Closes transmitter
drop(shutdown_event_tx);

// This is in a loop because the first event that the receiver receives will be the shutdown event
// This is not what we are waiting for instead, we want the receiver to send an error once all senders are closed which is reached after the shutdown event is received
loop {
match tokio::time::timeout(
tokio::time::Duration::from_millis(config.general.shutdown_timeout),
shutdown_event_rx.recv(),
)
.await
{
Ok(res) => match res {
Ok(_) => {}
Err(_) => break,
},
Err(_) => {
info!("Timed out while waiting for clients to shutdown");
break;
}
}
}
},
_ = term_signal.recv() => (),
};
}

info!("Shutting down...");
}