diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index 589aa9f84..a0c5f5f11 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -53,6 +53,12 @@ class Client extends EventEmitter { encoding: this.connectionParameters.client_encoding || 'utf8', }) this.queryQueue = [] + + // Client.sentQueryQueue is the queue of queries that have been sent on the wire + this.sentQueryQueue = [] + // Client.pipelining can be set to true to enable experimental pipelining mode + this.pipelining = false + this.binary = c.binary || defaults.binary this.processID = null this.secretKey = null @@ -286,6 +292,7 @@ class Client extends EventEmitter { const { activeQuery } = this this.activeQuery = null this.readyForQuery = true + this.handshakeDone = true if (activeQuery) { activeQuery.handleReadyForQuery(this.connection) } @@ -472,20 +479,36 @@ class Client extends EventEmitter { } _pulseQueryQueue() { + + if (!this.handshakeDone) { + return + } + + while ((this.pipelining && !this.blocked) || (this.activeQuery === null && this.sentQueryQueue.length === 0)) { + var query = this.queryQueue.shift() + if (!query) break + + const queryError = query.submit(this.connection) + if (queryError) { + process.nextTick(() => { + this.activeQuery.handleError(queryError, this.connection) + this.readyForQuery = true + this._pulseQueryQueue() + }) + } + this.blocked = query.blocking + this.sentQueryQueue.push(query) + if (query.name) { + console.log(`we store that ${query.name} has been submitted`) + this.connection.submittedNamedStatements[query.name] = query.text + } + } + if (this.readyForQuery === true) { - this.activeQuery = this.queryQueue.shift() + this.activeQuery = this.sentQueryQueue.shift() if (this.activeQuery) { this.readyForQuery = false this.hasExecuted = true - - const queryError = this.activeQuery.submit(this.connection) - if (queryError) { - process.nextTick(() => { - this.activeQuery.handleError(queryError, this.connection) - this.readyForQuery = true - this._pulseQueryQueue() - }) - } } else if (this.hasExecuted) { this.activeQuery = null this.emit('drain') diff --git a/packages/pg/lib/connection.js b/packages/pg/lib/connection.js index ebb2f099d..5dd1ee8ca 100644 --- a/packages/pg/lib/connection.js +++ b/packages/pg/lib/connection.js @@ -19,6 +19,8 @@ class Connection extends EventEmitter { this._keepAliveInitialDelayMillis = config.keepAliveInitialDelayMillis this.lastBuffer = false this.parsedStatements = {} + // to track preparation of statements submitted to server + this.submittedNamedStatements = {} this.ssl = config.ssl || false this._ending = false this._emitMessage = false diff --git a/packages/pg/lib/query.js b/packages/pg/lib/query.js index c0dfedd1e..5c15af088 100644 --- a/packages/pg/lib/query.js +++ b/packages/pg/lib/query.js @@ -160,6 +160,12 @@ class Query extends EventEmitter { } hasBeenParsed(connection) { + if (connection.submittedNamedStatements[this.name]) { + console.log(`-----------------------------------`) + console.log(`query.hasBeenParsed : This statement has already been prepared`) + console.log(`-----------------------------------`) + return true + } return this.name && connection.parsedStatements[this.name] } diff --git a/packages/pg/test/unit/client/simple-query-tests.js b/packages/pg/test/unit/client/simple-query-tests.js index 2c3ea5e4e..bc61d2d44 100644 --- a/packages/pg/test/unit/client/simple-query-tests.js +++ b/packages/pg/test/unit/client/simple-query-tests.js @@ -60,6 +60,38 @@ test('executing query', function () { }) }) + test("multiple in the queue, pipelining mode", function () { + var client = helper.client() + client.pipelining = true + var connection = client.connection + var queries = connection.queries + client.query('one') + client.query('two') + client.query('three') + assert.empty(queries) + + test("after one ready for query", function () { + connection.emit('readyForQuery') + assert.lengthIs(queries, 3) + assert.equal(queries[0], "one") + }) + + test('after two ready for query', function () { + connection.emit('readyForQuery') + assert.lengthIs(queries, 3) + }) + + test("after a bunch more", function () { + connection.emit('readyForQuery') + connection.emit('readyForQuery') + connection.emit('readyForQuery') + assert.lengthIs(queries, 3) + assert.equal(queries[0], "one") + assert.equal(queries[1], 'two') + assert.equal(queries[2], 'three') + }) + }) + test('query event binding and flow', function () { var client = helper.client() var con = client.connection