Skip to content

Commit a8b5501

Browse files
committed
Add support for auth_query based authentication to mirrored servers
The change itself is fairly simple, just piping the auth_hash object to the mirror server pool. The test was created by putting aping existing tests in mirror_spec.rb and auth_query_spec.rb. I also updated the auth of the mirror instance to md5, since the connection failed silently when scram-sha-256 was used.
1 parent b37d105 commit a8b5501

File tree

5 files changed

+80
-2
lines changed

5 files changed

+80
-2
lines changed

src/mirrors.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ pub struct MirroredClient {
1717
database: String,
1818
bytes_rx: Receiver<Bytes>,
1919
disconnect_rx: Receiver<()>,
20+
auth_hash: Arc<RwLock<Option<String>>>,
2021
}
2122

2223
impl MirroredClient {
@@ -39,7 +40,7 @@ impl MirroredClient {
3940
self.user.clone(),
4041
self.database.as_str(),
4142
ClientServerMap::default(),
42-
Arc::new(RwLock::new(None)),
43+
self.auth_hash.clone(),
4344
None,
4445
true,
4546
false,
@@ -125,6 +126,7 @@ impl MirroringManager {
125126
user: User,
126127
database: String,
127128
addresses: Vec<Address>,
129+
auth_hash: Arc<RwLock<Option<String>>>,
128130
) -> MirroringManager {
129131
let mut byte_senders: Vec<Sender<Bytes>> = vec![];
130132
let mut exit_senders: Vec<Sender<()>> = vec![];
@@ -140,6 +142,7 @@ impl MirroringManager {
140142
address: addr,
141143
bytes_rx,
142144
disconnect_rx: exit_rx,
145+
auth_hash: auth_hash.clone(),
143146
};
144147
exit_senders.push(exit_tx);
145148
byte_senders.push(bytes_tx);

src/server.rs

+1
Original file line numberDiff line numberDiff line change
@@ -820,6 +820,7 @@ impl Server {
820820
user.clone(),
821821
database.to_owned(),
822822
address.mirrors.clone(),
823+
auth_hash,
823824
)),
824825
},
825826
cleanup_connections,

tests/docker/docker-compose.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ services:
2424
POSTGRES_USER: postgres
2525
POSTGRES_DB: postgres
2626
POSTGRES_PASSWORD: postgres
27-
POSTGRES_INITDB_ARGS: --auth-local=scram-sha-256 --auth-host=scram-sha-256 --auth=scram-sha-256
27+
POSTGRES_INITDB_ARGS: --auth-local=md5 --auth-host=md5 --auth=md5
2828
command: ["postgres", "-p", "8432", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-c", "pg_stat_statements.max=100000"]
2929
pg4:
3030
image: postgres:14

tests/ruby/helpers/auth_query_helper.rb

+3
Original file line numberDiff line numberDiff line change
@@ -147,8 +147,10 @@ def self.set_up_auth_query_for_user(user:, password:, instance_ports: [ 5432, 10
147147
instance_ports.each do |port|
148148
connection = PG.connect("postgres://postgres:postgres@localhost:#{port}/#{database}")
149149
connection.exec(self.drop_query_auth_function(user)) rescue PG::UndefinedFunction
150+
connection.exec("REVOKE EXECUTE ON FUNCTION pg_stat_statements_reset FROM #{user}") rescue PG::UndefinedObject
150151
connection.exec("DROP ROLE #{user}") rescue PG::UndefinedObject
151152
connection.exec("CREATE ROLE #{user} ENCRYPTED PASSWORD '#{password}' LOGIN;")
153+
connection.exec("GRANT EXECUTE ON FUNCTION pg_stat_statements_reset TO #{user}")
152154
connection.exec(self.create_query_auth_function(user))
153155
connection.close
154156
end
@@ -158,6 +160,7 @@ def self.tear_down_auth_query_for_user(user:, password:, instance_ports: [ 5432,
158160
instance_ports.each do |port|
159161
connection = PG.connect("postgres://postgres:postgres@localhost:#{port}/#{database}")
160162
connection.exec(self.drop_query_auth_function(user)) rescue PG::UndefinedFunction
163+
connection.exec("REVOKE EXECUTE ON FUNCTION pg_stat_statements_reset FROM #{user}")
161164
connection.exec("DROP ROLE #{user}")
162165
connection.close
163166
end

tests/ruby/mirrors_spec.rb

+71
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# frozen_string_literal: true
22
require 'uri'
33
require_relative 'spec_helper'
4+
require_relative 'helpers/auth_query_helper'
45

56
describe "Query Mirroing" do
67
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 10) }
@@ -89,3 +90,73 @@
8990
end
9091
end
9192
end
93+
94+
describe "Query Mirroring with Auth Query" do
95+
let(:pg_user) { { 'username' => 'sharding_user', 'password' => 'sharding_user' } }
96+
let(:config_user) { {'username' => 'md5_auth_user'} }
97+
let(:auth_query_user) { { 'username' => 'md5_auth_user', 'password' => 'hash' } }
98+
let(:mirror_pg) { PgInstance.new(8432, "md5_auth_user", "hash", "shard0") }
99+
let(:mirror_host) { "localhost" }
100+
let(:config) {
101+
{
102+
'general' => {
103+
'auth_query' => "SELECT * FROM public.user_lookup('$1');",
104+
'auth_query_user' => auth_query_user['username'],
105+
'auth_query_password' => auth_query_user['password']
106+
},
107+
}
108+
}
109+
let(:processes) { Helpers::AuthQuery.single_shard_auth_query(
110+
pool_name: "sharded_db",
111+
pg_user: pg_user,
112+
config_user: config_user,
113+
extra_conf: config,
114+
wait_until_ready: false,
115+
)}
116+
117+
before do
118+
pgcat = processes.pgcat
119+
120+
new_configs = processes.pgcat.current_config
121+
new_configs["pools"]["sharded_db"]["shards"]["0"]["mirrors"] = [
122+
[mirror_host, mirror_pg.port.to_i, 0],
123+
[mirror_host, mirror_pg.port.to_i, 1],
124+
]
125+
pgcat.update_config(new_configs)
126+
pgcat.reload_config
127+
128+
Helpers::AuthQuery.set_up_auth_query_for_user(
129+
user: auth_query_user['username'],
130+
password: auth_query_user['password'],
131+
instance_ports: [processes.primary.port, processes.replicas[0].port, mirror_pg.port],
132+
)
133+
134+
pgcat.wait_until_ready(
135+
pgcat.connection_string("sharded_db", auth_query_user['username'], auth_query_user['password'])
136+
)
137+
138+
mirror_pg.reset
139+
end
140+
141+
after do
142+
Helpers::AuthQuery.tear_down_auth_query_for_user(
143+
user: auth_query_user['username'],
144+
password: auth_query_user['password'],
145+
instance_ports: [processes.primary.port, processes.replicas[0].port, mirror_pg.port],
146+
)
147+
end
148+
149+
context "when auth_query is configured" do
150+
it "can mirror a query" do
151+
conn = PG.connect(processes.pgcat.connection_string("sharded_db", auth_query_user['username'], auth_query_user['password']))
152+
runs=5
153+
runs.times { conn.sync_exec("SELECT 1 + 2") }
154+
conn.close
155+
156+
# I'd like to check verify the primary and replica received the queries too, but getting the permissions correct is annoying
157+
# expect((processes.all_databases + processes.replicas).map(&:count_select_1_plus_2).sum).to eq(0)
158+
expect(mirror_pg.count_select_1_plus_2).to eq(runs)
159+
end
160+
end
161+
end
162+

0 commit comments

Comments
 (0)