Skip to content

Commit f73b22f

Browse files
authored
Handle bad message ordering - make it catchable. Fixes 3174 (#3289)
* Handle bad message ordering - make it catchable. Fixes 3174 * Close client in test * Mess w/ github action settings * update ci config * Remove redundant tests * Update code to use handle error event * Add tests for commandComplete message being out of order * Lint fix * Fix native tests * Fix lint again...airport computer not my friend * Not a native issue
1 parent 92bb9a2 commit f73b22f

File tree

5 files changed

+203
-53
lines changed

5 files changed

+203
-53
lines changed

.github/workflows/ci.yml

+3-4
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ jobs:
2121
- run: yarn install --frozen-lockfile
2222
- run: yarn lint
2323
build:
24-
timeout-minutes: 10
24+
timeout-minutes: 15
2525
needs: lint
2626
services:
2727
postgres:
@@ -44,8 +44,8 @@ jobs:
4444
- '22'
4545
os:
4646
- ubuntu-latest
47-
name: Node.js ${{ matrix.node }} (${{ matrix.os }})
48-
runs-on: ${{ matrix.os }}
47+
name: Node.js ${{ matrix.node }}
48+
runs-on: ubuntu-latest
4949
env:
5050
PGUSER: postgres
5151
PGPASSWORD: postgres
@@ -71,5 +71,4 @@ jobs:
7171
node-version: ${{ matrix.node }}
7272
cache: yarn
7373
- run: yarn install --frozen-lockfile
74-
# TODO(bmc): get ssl tests working in ci
7574
- run: yarn test

packages/pg-native/test/many-connections.js

+1-12
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ var bytes = require('crypto').pseudoRandomBytes
66
describe('many connections', function () {
77
describe('async', function () {
88
var test = function (count, times) {
9-
it('connecting ' + count + ' clients ' + times, function (done) {
9+
it(`connecting ${count} clients ${times} times`, function (done) {
1010
this.timeout(200000)
1111

1212
var connectClient = function (n, cb) {
@@ -38,20 +38,9 @@ describe('many connections', function () {
3838
}
3939

4040
test(1, 1)
41-
test(1, 1)
42-
test(1, 1)
43-
test(5, 5)
4441
test(5, 5)
45-
test(5, 5)
46-
test(5, 5)
47-
test(10, 10)
4842
test(10, 10)
49-
test(10, 10)
50-
test(20, 20)
51-
test(20, 20)
5243
test(20, 20)
5344
test(30, 10)
54-
test(30, 10)
55-
test(30, 10)
5645
})
5746
})

packages/pg/lib/client.js

+11-1
Original file line numberDiff line numberDiff line change
@@ -377,11 +377,21 @@ class Client extends EventEmitter {
377377
}
378378

379379
_handleCommandComplete(msg) {
380+
if (this.activeQuery == null) {
381+
const error = new Error('Received unexpected commandComplete message from backend.')
382+
this._handleErrorEvent(error)
383+
return
384+
}
380385
// delegate commandComplete to active query
381386
this.activeQuery.handleCommandComplete(msg, this.connection)
382387
}
383388

384-
_handleParseComplete(msg) {
389+
_handleParseComplete() {
390+
if (this.activeQuery == null) {
391+
const error = new Error('Received unexpected parseComplete message from backend.')
392+
this._handleErrorEvent(error)
393+
return
394+
}
385395
// if a prepared statement has a name and properly parses
386396
// we track that its already been executed so we don't parse
387397
// it again on the same client

packages/pg/script/create-test-tables.js

+21-36
Original file line numberDiff line numberDiff line change
@@ -31,41 +31,26 @@ var people = [
3131
{ name: 'Zanzabar', age: 260 },
3232
]
3333

34-
var con = new pg.Client({
35-
user: args.user,
36-
password: args.password,
37-
host: args.host,
38-
port: args.port,
39-
database: args.database,
40-
})
41-
42-
con.connect((err) => {
43-
if (err) {
44-
throw err
45-
}
46-
47-
con.query(
48-
'DROP TABLE IF EXISTS person;' + ' CREATE TABLE person (id serial, name varchar(10), age integer)',
49-
(err) => {
50-
if (err) {
51-
throw err
52-
}
53-
54-
console.log('Created table person')
55-
console.log('Filling it with people')
56-
57-
con.query(
58-
'INSERT INTO person (name, age) VALUES' +
59-
people.map((person) => ` ('${person.name}', ${person.age})`).join(','),
60-
(err, result) => {
61-
if (err) {
62-
throw err
63-
}
64-
65-
console.log(`Inserted ${result.rowCount} people`)
66-
con.end()
67-
}
68-
)
69-
}
34+
async function run() {
35+
var con = new pg.Client({
36+
user: args.user,
37+
password: args.password,
38+
host: args.host,
39+
port: args.port,
40+
database: args.database,
41+
})
42+
console.log('creating test dataset')
43+
await con.connect()
44+
await con.query('DROP TABLE IF EXISTS person')
45+
await con.query('CREATE TABLE person (id serial, name varchar(10), age integer)')
46+
await con.query(
47+
'INSERT INTO person (name, age) VALUES' + people.map((person) => ` ('${person.name}', ${person.age})`).join(',')
7048
)
49+
await con.end()
50+
console.log('created test dataset')
51+
}
52+
53+
run().catch((e) => {
54+
console.log('setup failed', e)
55+
process.exit(255)
7156
})
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
const net = require('net')
2+
const buffers = require('../../test-buffers')
3+
const helper = require('../test-helper')
4+
const assert = require('assert')
5+
const cli = require('../../cli')
6+
7+
const suite = new helper.Suite()
8+
9+
const options = {
10+
host: 'localhost',
11+
port: Math.floor(Math.random() * 2000) + 2000,
12+
connectionTimeoutMillis: 2000,
13+
user: 'not',
14+
database: 'existing',
15+
}
16+
17+
const startMockServer = (port, badBuffer, callback) => {
18+
const sockets = new Set()
19+
20+
const server = net.createServer((socket) => {
21+
sockets.add(socket)
22+
socket.once('end', () => sockets.delete(socket))
23+
24+
socket.on('data', (data) => {
25+
// deny request for SSL
26+
if (data.length === 8) {
27+
socket.write(Buffer.from('N', 'utf8'))
28+
return
29+
// consider all authentication requests as good
30+
}
31+
// the initial message coming in has a 0 message type for authentication negotiation
32+
if (!data[0]) {
33+
socket.write(buffers.authenticationOk())
34+
// send ReadyForQuery `timeout` ms after authentication
35+
socket.write(buffers.readyForQuery())
36+
return
37+
// respond with our canned response
38+
}
39+
const code = data.toString('utf8', 0, 1)
40+
switch (code) {
41+
// parse
42+
case 'P':
43+
socket.write(buffers.parseComplete())
44+
socket.write(buffers.bindComplete())
45+
socket.write(buffers.rowDescription())
46+
socket.write(buffers.dataRow())
47+
socket.write(buffers.commandComplete('FOO BAR'))
48+
socket.write(buffers.readyForQuery())
49+
// this message is invalid, but sometimes sent out of order when using proxies or pg-bouncer
50+
setImmediate(() => {
51+
socket.write(badBuffer)
52+
})
53+
break
54+
case 'Q':
55+
socket.write(buffers.rowDescription())
56+
socket.write(buffers.dataRow())
57+
socket.write(buffers.commandComplete('FOO BAR'))
58+
socket.write(buffers.readyForQuery())
59+
// this message is invalid, but sometimes sent out of order when using proxies or pg-bouncer
60+
setImmediate(() => {
61+
socket.write(badBuffer)
62+
})
63+
default:
64+
// console.log('got code', code)
65+
}
66+
})
67+
})
68+
69+
const closeServer = () => {
70+
for (const socket of sockets) {
71+
socket.destroy()
72+
}
73+
return new Promise((resolve) => {
74+
server.close(resolve)
75+
})
76+
}
77+
78+
server.listen(port, options.host, () => callback(closeServer))
79+
}
80+
81+
const delay = (ms) =>
82+
new Promise((resolve) => {
83+
setTimeout(resolve, ms)
84+
})
85+
86+
const testErrorBuffer = (bufferName, errorBuffer) => {
87+
suite.testAsync(`Out of order ${bufferName} on simple query is catchable`, async () => {
88+
const closeServer = await new Promise((resolve, reject) => {
89+
return startMockServer(options.port, errorBuffer, (closeServer) => resolve(closeServer))
90+
})
91+
const client = new helper.Client(options)
92+
await client.connect()
93+
94+
let errorHit = false
95+
client.on('error', () => {
96+
errorHit = true
97+
})
98+
99+
await client.query('SELECT NOW()')
100+
await delay(50)
101+
102+
// the native client only emits a notice message and keeps on its merry way
103+
if (!cli.native) {
104+
assert(errorHit)
105+
// further queries on the client should fail since its in an invalid state
106+
await assert.rejects(() => client.query('SELECTR NOW()'), 'Further queries on the client should reject')
107+
}
108+
109+
await closeServer()
110+
})
111+
112+
suite.testAsync(`Out of order ${bufferName} on extended query is catchable`, async () => {
113+
const closeServer = await new Promise((resolve, reject) => {
114+
return startMockServer(options.port, errorBuffer, (closeServer) => resolve(closeServer))
115+
})
116+
const client = new helper.Client(options)
117+
await client.connect()
118+
119+
let errorHit = false
120+
client.on('error', () => {
121+
errorHit = true
122+
})
123+
124+
await client.query('SELECT $1', ['foo'])
125+
await delay(40)
126+
127+
// the native client only emits a notice message and keeps on its merry way
128+
if (!cli.native) {
129+
assert(errorHit)
130+
// further queries on the client should fail since its in an invalid state
131+
await assert.rejects(() => client.query('SELECTR NOW()'), 'Further queries on the client should reject')
132+
}
133+
134+
await client.end()
135+
136+
await closeServer()
137+
})
138+
139+
suite.testAsync(`Out of order ${bufferName} on pool is catchable`, async () => {
140+
const closeServer = await new Promise((resolve, reject) => {
141+
return startMockServer(options.port, errorBuffer, (closeServer) => resolve(closeServer))
142+
})
143+
const pool = new helper.pg.Pool(options)
144+
145+
let errorHit = false
146+
pool.on('error', () => {
147+
errorHit = true
148+
})
149+
150+
await pool.query('SELECT $1', ['foo'])
151+
await delay(100)
152+
153+
if (!cli.native) {
154+
assert(errorHit)
155+
assert.strictEqual(pool.idleCount, 0, 'Pool should have no idle clients')
156+
assert.strictEqual(pool.totalCount, 0, 'Pool should have no connected clients')
157+
}
158+
159+
await pool.end()
160+
await closeServer()
161+
})
162+
}
163+
164+
if (!helper.args.native) {
165+
testErrorBuffer('parseComplete', buffers.parseComplete())
166+
testErrorBuffer('commandComplete', buffers.commandComplete('f'))
167+
}

0 commit comments

Comments
 (0)