From bed8ffeb9796cb08792307fe8033b58f68321c94 Mon Sep 17 00:00:00 2001 From: Arnaud Benhamdine <arnaud.benhamdine@gmail.com> Date: Wed, 16 Feb 2022 15:03:06 +0100 Subject: [PATCH 1/4] Sent queries immediatly --- packages/pg/lib/client.js | 39 +++++++++++++++++++++++++++++---------- 1 file changed, 29 insertions(+), 10 deletions(-) diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index 589aa9f84..324d47bdd 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -53,6 +53,12 @@ class Client extends EventEmitter { encoding: this.connectionParameters.client_encoding || 'utf8', }) this.queryQueue = [] + + // Client.sentQueryQueue is the queue of queries that have been sent on the wire + this.sentQueryQueue = [] + // Client.sendImmediately can be set to false to restore the previous behavior + this.sendImmediately = true + this.binary = c.binary || defaults.binary this.processID = null this.secretKey = null @@ -286,6 +292,7 @@ class Client extends EventEmitter { const { activeQuery } = this this.activeQuery = null this.readyForQuery = true + this.handshakeDone = true if (activeQuery) { activeQuery.handleReadyForQuery(this.connection) } @@ -472,20 +479,32 @@ class Client extends EventEmitter { } _pulseQueryQueue() { + + if (!this.handshakeDone) { + return + } + + while ((this.sendImmediately && !this.blocked) || (this.activeQuery === null && this.sentQueryQueue.length === 0)) { + var query = this.queryQueue.shift() + if (!query) break + + const queryError = query.submit(this.connection) + if (queryError) { + process.nextTick(() => { + this.activeQuery.handleError(queryError, this.connection) + this.readyForQuery = true + this._pulseQueryQueue() + }) + } + this.blocked = query.blocking + this.sentQueryQueue.push(query) + } + if (this.readyForQuery === true) { - this.activeQuery = this.queryQueue.shift() + this.activeQuery = this.sentQueryQueue.shift() if (this.activeQuery) { this.readyForQuery = false this.hasExecuted = true - - const queryError = this.activeQuery.submit(this.connection) - if (queryError) { - process.nextTick(() => { - this.activeQuery.handleError(queryError, this.connection) - this.readyForQuery = true - this._pulseQueryQueue() - }) - } } else if (this.hasExecuted) { this.activeQuery = null this.emit('drain') From ba6abd41eeef4feba020b65b1b44ebf55748192b Mon Sep 17 00:00:00 2001 From: Arnaud Benhamdine <arnaud.benhamdine@gmail.com> Date: Wed, 16 Feb 2022 19:04:03 +0100 Subject: [PATCH 2/4] Rename to match postgres docs, change the default to false --- packages/pg/lib/client.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index 324d47bdd..a6bacfe95 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -56,8 +56,8 @@ class Client extends EventEmitter { // Client.sentQueryQueue is the queue of queries that have been sent on the wire this.sentQueryQueue = [] - // Client.sendImmediately can be set to false to restore the previous behavior - this.sendImmediately = true + // Client.pipelining can be set to true to enable experimental pipelining mode + this.pipelining = false this.binary = c.binary || defaults.binary this.processID = null @@ -484,7 +484,7 @@ class Client extends EventEmitter { return } - while ((this.sendImmediately && !this.blocked) || (this.activeQuery === null && this.sentQueryQueue.length === 0)) { + while ((this.pipelining && !this.blocked) || (this.activeQuery === null && this.sentQueryQueue.length === 0)) { var query = this.queryQueue.shift() if (!query) break From 11aaf68b969889ca68ad1e9324ef0b6bb39bb152 Mon Sep 17 00:00:00 2001 From: Arnaud Benhamdine <arnaud.benhamdine@gmail.com> Date: Wed, 16 Feb 2022 19:04:38 +0100 Subject: [PATCH 3/4] Add basic unit test --- .../pg/test/unit/client/simple-query-tests.js | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/packages/pg/test/unit/client/simple-query-tests.js b/packages/pg/test/unit/client/simple-query-tests.js index 2c3ea5e4e..bc61d2d44 100644 --- a/packages/pg/test/unit/client/simple-query-tests.js +++ b/packages/pg/test/unit/client/simple-query-tests.js @@ -60,6 +60,38 @@ test('executing query', function () { }) }) + test("multiple in the queue, pipelining mode", function () { + var client = helper.client() + client.pipelining = true + var connection = client.connection + var queries = connection.queries + client.query('one') + client.query('two') + client.query('three') + assert.empty(queries) + + test("after one ready for query", function () { + connection.emit('readyForQuery') + assert.lengthIs(queries, 3) + assert.equal(queries[0], "one") + }) + + test('after two ready for query', function () { + connection.emit('readyForQuery') + assert.lengthIs(queries, 3) + }) + + test("after a bunch more", function () { + connection.emit('readyForQuery') + connection.emit('readyForQuery') + connection.emit('readyForQuery') + assert.lengthIs(queries, 3) + assert.equal(queries[0], "one") + assert.equal(queries[1], 'two') + assert.equal(queries[2], 'three') + }) + }) + test('query event binding and flow', function () { var client = helper.client() var con = client.connection From a33084f10028a9b4725bd796afd9b06adfe7cc9a Mon Sep 17 00:00:00 2001 From: Arnaud Benhamdine <arnaud.benhamdine@gmail.com> Date: Sat, 19 Feb 2022 21:11:42 +0100 Subject: [PATCH 4/4] Don't prepare named statements twice --- packages/pg/lib/client.js | 4 ++++ packages/pg/lib/connection.js | 2 ++ packages/pg/lib/query.js | 6 ++++++ 3 files changed, 12 insertions(+) diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index a6bacfe95..a0c5f5f11 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -498,6 +498,10 @@ class Client extends EventEmitter { } this.blocked = query.blocking this.sentQueryQueue.push(query) + if (query.name) { + console.log(`we store that ${query.name} has been submitted`) + this.connection.submittedNamedStatements[query.name] = query.text + } } if (this.readyForQuery === true) { diff --git a/packages/pg/lib/connection.js b/packages/pg/lib/connection.js index ebb2f099d..5dd1ee8ca 100644 --- a/packages/pg/lib/connection.js +++ b/packages/pg/lib/connection.js @@ -19,6 +19,8 @@ class Connection extends EventEmitter { this._keepAliveInitialDelayMillis = config.keepAliveInitialDelayMillis this.lastBuffer = false this.parsedStatements = {} + // to track preparation of statements submitted to server + this.submittedNamedStatements = {} this.ssl = config.ssl || false this._ending = false this._emitMessage = false diff --git a/packages/pg/lib/query.js b/packages/pg/lib/query.js index c0dfedd1e..5c15af088 100644 --- a/packages/pg/lib/query.js +++ b/packages/pg/lib/query.js @@ -160,6 +160,12 @@ class Query extends EventEmitter { } hasBeenParsed(connection) { + if (connection.submittedNamedStatements[this.name]) { + console.log(`-----------------------------------`) + console.log(`query.hasBeenParsed : This statement has already been prepared`) + console.log(`-----------------------------------`) + return true + } return this.name && connection.parsedStatements[this.name] }