Skip to content

Commit 3285006

Browse files
authored
Statement timeout + replica imbalance fix (#122)
* Statement timeout * send error message too * Correct error messages * Fix replica inbalance * disable stmt timeout by default * Redundant mark_bad * revert healthcheck delay * tests * set it to 0 * reload config again
1 parent 52303cc commit 3285006

File tree

6 files changed

+77
-7
lines changed

6 files changed

+77
-7
lines changed

.circleci/pgcat.toml

+3
Original file line numberDiff line numberDiff line change
@@ -91,11 +91,13 @@ password = "sharding_user"
9191
# The maximum number of connection from a single Pgcat process to any database in the cluster
9292
# is the sum of pool_size across all users.
9393
pool_size = 9
94+
statement_timeout = 0
9495

9596
[pools.sharded_db.users.1]
9697
username = "other_user"
9798
password = "other_user"
9899
pool_size = 21
100+
statement_timeout = 30000
99101

100102
# Shard 0
101103
[pools.sharded_db.shards.0]
@@ -133,6 +135,7 @@ sharding_function = "pg_bigint_hash"
133135
username = "simple_user"
134136
password = "simple_user"
135137
pool_size = 5
138+
statement_timeout = 30000
136139

137140
[pools.simple_db.shards.0]
138141
servers = [

.circleci/run_tests.sh

+12
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,18 @@ psql -U sharding_user -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_te
6666
# Replica/primary selection & more sharding tests
6767
psql -U sharding_user -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_test_primary_replica.sql > /dev/null
6868

69+
# Statement timeout tests
70+
sed -i 's/statement_timeout = 0/statement_timeout = 100/' .circleci/pgcat.toml
71+
kill -SIGHUP $(pgrep pgcat) # Reload config
72+
sleep 0.2
73+
74+
# This should timeout
75+
(! psql -U sharding_user -e -h 127.0.0.1 -p 6432 -c 'select pg_sleep(0.5)')
76+
77+
# Disable statement timeout
78+
sed -i 's/statement_timeout = 100/statement_timeout = 0/' .circleci/pgcat.toml
79+
kill -SIGHUP $(pgrep pgcat) # Reload config again
80+
6981
#
7082
# ActiveRecord tests
7183
#

pgcat.toml

+5
Original file line numberDiff line numberDiff line change
@@ -92,10 +92,14 @@ password = "sharding_user"
9292
# is the sum of pool_size across all users.
9393
pool_size = 9
9494

95+
# Maximum query duration. Dangerous, but protetcts against DBs that died and a non-obvious way.
96+
statement_timeout = 0
97+
9598
[pools.sharded_db.users.1]
9699
username = "other_user"
97100
password = "other_user"
98101
pool_size = 21
102+
statement_timeout = 15000
99103

100104
# Shard 0
101105
[pools.sharded_db.shards.0]
@@ -133,6 +137,7 @@ sharding_function = "pg_bigint_hash"
133137
username = "simple_user"
134138
password = "simple_user"
135139
pool_size = 5
140+
statement_timeout = 0
136141

137142
[pools.simple_db.shards.0]
138143
servers = [

src/client.rs

+44-7
Original file line numberDiff line numberDiff line change
@@ -499,7 +499,7 @@ where
499499
// The query router determines where the query is going to go,
500500
// e.g. primary, replica, which shard.
501501
let mut query_router = QueryRouter::new();
502-
let mut round_robin = 0;
502+
let mut round_robin = rand::random();
503503

504504
// Our custom protocol loop.
505505
// We expect the client to either start a transaction with regular queries
@@ -970,17 +970,54 @@ where
970970
}
971971

972972
async fn receive_server_message(
973-
&self,
973+
&mut self,
974974
server: &mut Server,
975975
address: &Address,
976976
shard: usize,
977977
pool: &ConnectionPool,
978978
) -> Result<BytesMut, Error> {
979-
match server.recv().await {
980-
Ok(message) => Ok(message),
981-
Err(err) => {
982-
pool.ban(address, shard, self.process_id);
983-
Err(err)
979+
if pool.settings.user.statement_timeout > 0 {
980+
match tokio::time::timeout(
981+
tokio::time::Duration::from_millis(pool.settings.user.statement_timeout),
982+
server.recv(),
983+
)
984+
.await
985+
{
986+
Ok(result) => match result {
987+
Ok(message) => Ok(message),
988+
Err(err) => {
989+
pool.ban(address, shard, self.process_id);
990+
error_response_terminal(
991+
&mut self.write,
992+
&format!("error receiving data from server: {:?}", err),
993+
)
994+
.await?;
995+
Err(err)
996+
}
997+
},
998+
Err(_) => {
999+
error!(
1000+
"Statement timeout while talking to {:?} with user {}",
1001+
address, pool.settings.user.username
1002+
);
1003+
server.mark_bad();
1004+
pool.ban(address, shard, self.process_id);
1005+
error_response_terminal(&mut self.write, "pool statement timeout").await?;
1006+
Err(Error::StatementTimeout)
1007+
}
1008+
}
1009+
} else {
1010+
match server.recv().await {
1011+
Ok(message) => Ok(message),
1012+
Err(err) => {
1013+
pool.ban(address, shard, self.process_id);
1014+
error_response_terminal(
1015+
&mut self.write,
1016+
&format!("error receiving data from server: {:?}", err),
1017+
)
1018+
.await?;
1019+
Err(err)
1020+
}
9841021
}
9851022
}
9861023
}

src/config.rs

+12
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ pub struct User {
100100
pub username: String,
101101
pub password: String,
102102
pub pool_size: u32,
103+
pub statement_timeout: u64,
103104
}
104105

105106
impl Default for User {
@@ -108,6 +109,7 @@ impl Default for User {
108109
username: String::from("postgres"),
109110
password: String::new(),
110111
pool_size: 15,
112+
statement_timeout: 0,
111113
}
112114
}
113115
}
@@ -332,6 +334,7 @@ impl Config {
332334
};
333335

334336
for (pool_name, pool_config) in &self.pools {
337+
// TODO: Make this output prettier (maybe a table?)
335338
info!("--- Settings for pool {} ---", pool_name);
336339
info!(
337340
"Pool size from all users: {}",
@@ -346,8 +349,17 @@ impl Config {
346349
info!("Sharding function: {}", pool_config.sharding_function);
347350
info!("Primary reads: {}", pool_config.primary_reads_enabled);
348351
info!("Query router: {}", pool_config.query_parser_enabled);
352+
353+
// TODO: Make this prettier.
349354
info!("Number of shards: {}", pool_config.shards.len());
350355
info!("Number of users: {}", pool_config.users.len());
356+
357+
for user in &pool_config.users {
358+
info!(
359+
"{} pool size: {}, statement timeout: {}",
360+
user.1.username, user.1.pool_size, user.1.statement_timeout
361+
);
362+
}
351363
}
352364
}
353365
}

src/errors.rs

+1
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,5 @@ pub enum Error {
1111
AllServersDown,
1212
ClientError,
1313
TlsError,
14+
StatementTimeout,
1415
}

0 commit comments

Comments
 (0)