diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index 4ccaffeac..d0dc2fbd3 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -52,6 +52,7 @@ class Client extends EventEmitter { keepAlive: c.keepAlive || false, keepAliveInitialDelayMillis: c.keepAliveInitialDelayMillis || 0, encoding: this.connectionParameters.client_encoding || 'utf8', + maxResultSize: c.maxResultSize, }) this.queryQueue = [] this.binary = c.binary || defaults.binary diff --git a/packages/pg/lib/connection.js b/packages/pg/lib/connection.js index c426b152c..d08a26c43 100644 --- a/packages/pg/lib/connection.js +++ b/packages/pg/lib/connection.js @@ -27,6 +27,8 @@ class Connection extends EventEmitter { this.ssl = config.ssl || false this._ending = false this._emitMessage = false + this._maxResultSize = config.maxResultSize + this._currentResultSize = 0 var self = this this.on('newListener', function (eventName) { if (eventName === 'message') { @@ -108,6 +110,17 @@ class Connection extends EventEmitter { } attachListeners(stream) { + var self = this + // Use the appropriate implementation based on whether maxResultSize is enabled + if (self._maxResultSize && self._maxResultSize > 0) { + this._attachListenersWithSizeLimit(stream) + } else { + this._attachListenersStandard(stream) + } + } + + // Original implementation with no overhead + _attachListenersStandard(stream) { parse(stream, (msg) => { var eventName = msg.name === 'error' ? 'errorMessage' : msg.name if (this._emitMessage) { @@ -117,6 +130,41 @@ class Connection extends EventEmitter { }) } + // Implementation with size limiting logic + _attachListenersWithSizeLimit(stream) { + parse(stream, (msg) => { + var eventName = msg.name === 'error' ? 
'errorMessage' : msg.name + + // Only track data row messages for result size + if (msg.name === 'dataRow') { + // Approximate size by using message length + const msgSize = msg.length || 1024 // Default to 1KB if we don't have length info + this._currentResultSize += msgSize + + // Check if we've exceeded the max result size + if (this._currentResultSize > this._maxResultSize) { + const error = new Error('Query result size exceeded the configured limit') + error.code = 'RESULT_SIZE_EXCEEDED' + error.resultSize = this._currentResultSize + error.maxResultSize = this._maxResultSize + this.emit('errorMessage', error) + this.end() // Terminate the connection + return + } + } + + // Reset counter on query completion + if (msg.name === 'readyForQuery') { + this._currentResultSize = 0 + } + + if (this._emitMessage) { + this.emit('message', msg) + } + this.emit(eventName, msg) + }) + } + requestSsl() { this.stream.write(serialize.requestSsl()) } diff --git a/packages/pg/lib/defaults.js b/packages/pg/lib/defaults.js index 5c5d997d2..6fc9538b3 100644 --- a/packages/pg/lib/defaults.js +++ b/packages/pg/lib/defaults.js @@ -70,6 +70,8 @@ module.exports = { keepalives: 1, keepalives_idle: 0, + // maxResultSize: maximum size of a query result before erroring out + maxResultSize: undefined, } var pgTypes = require('pg-types') diff --git a/packages/pg/lib/native/client.js b/packages/pg/lib/native/client.js index 6494375f1..fef62ecdc 100644 --- a/packages/pg/lib/native/client.js +++ b/packages/pg/lib/native/client.js @@ -26,6 +26,9 @@ var Client = (module.exports = function (config) { types: this._types, }) + // Store maxResultSize configuration + this._maxResultSize = config.maxResultSize + this._queryQueue = [] this._ending = false this._connecting = false @@ -100,6 +103,9 @@ Client.prototype._connect = function (cb) { // set internal states to connected self._connected = true + // Add a reference to the client for error bubbling + self.native.connection = self + // handle connection errors
from the native layer self.native.on('error', function (err) { self._queryable = false diff --git a/packages/pg/lib/native/query.js b/packages/pg/lib/native/query.js index 0cfed1fda..d1012d77d 100644 --- a/packages/pg/lib/native/query.js +++ b/packages/pg/lib/native/query.js @@ -56,6 +56,17 @@ NativeQuery.prototype.handleError = function (err) { err[normalizedFieldName] = fields[key] } } + + // For maxResultSize exceeded errors, make sure we emit the error to the client too + if (err.code === 'RESULT_SIZE_EXCEEDED') { + if (this.native && this.native.connection) { + // Need to emit the error on the client/connection level too + process.nextTick(() => { + this.native.connection.emit('error', err) + }) + } + } + if (this.callback) { this.callback(err) } else { @@ -89,6 +100,9 @@ NativeQuery.prototype.submit = function (client) { this.native = client.native client.native.arrayMode = this._arrayMode + // Get the maxResultSize from the client if it's set + this._maxResultSize = client._maxResultSize + var after = function (err, rows, results) { client.native.arrayMode = false setImmediate(function () { @@ -100,6 +114,30 @@ NativeQuery.prototype.submit = function (client) { return self.handleError(err) } + // Check the result size if maxResultSize is configured + if (self._maxResultSize) { + // Calculate result size (rough approximation) + let resultSize = 0 + + // For multiple result sets + if (results.length > 1) { + for (let i = 0; i < rows.length; i++) { + resultSize += self._calculateResultSize(rows[i]) + } + } else if (rows.length > 0) { + resultSize = self._calculateResultSize(rows) + } + + // If the size limit is exceeded, generate an error + if (resultSize > self._maxResultSize) { + const error = new Error('Query result size exceeded the configured limit') + error.code = 'RESULT_SIZE_EXCEEDED' + error.resultSize = resultSize + error.maxResultSize = self._maxResultSize + return self.handleError(error) + } + } + // emit row events for each row in the result if 
(self._emitRowEvents) { if (results.length > 1) { @@ -166,3 +204,59 @@ NativeQuery.prototype.submit = function (client) { client.native.query(this.text, after) } } + +// Helper method to estimate the size of a result set +NativeQuery.prototype._calculateResultSize = function (rows) { + let size = 0 + + // For empty results, return 0 + if (!rows || rows.length === 0) { + return 0 + } + + // For array mode, calculate differently + if (this._arrayMode) { + // Just use a rough approximation based on number of rows + return rows.length * 100 + } + + // For each row, approximate its size + for (let i = 0; i < rows.length; i++) { + const row = rows[i] + + // Add base row size + size += 24 // Overhead per row + + // Add size of each column + for (const key in row) { + if (Object.prototype.hasOwnProperty.call(row, key)) { + const value = row[key] + + // Add key size + size += key.length * 2 // Assume 2 bytes per character + + // Add value size based on type + if (value === null || value === undefined) { + size += 8 + } else if (typeof value === 'string') { + size += value.length * 2 // Assume 2 bytes per character + } else if (typeof value === 'number') { + size += 8 + } else if (typeof value === 'boolean') { + size += 4 + } else if (value instanceof Date) { + size += 8 + } else if (Buffer.isBuffer(value)) { + size += value.length + } else if (Array.isArray(value)) { + size += 16 + value.length * 8 + } else { + // For objects, use a rough estimate + size += 32 + JSON.stringify(value).length * 2 + } + } + } + } + + return size +} diff --git a/packages/pg/test/integration/client/max-result-size-tests.js b/packages/pg/test/integration/client/max-result-size-tests.js new file mode 100644 index 000000000..e37ba3a94 --- /dev/null +++ b/packages/pg/test/integration/client/max-result-size-tests.js @@ -0,0 +1,131 @@ +'use strict' +const helper = require('../test-helper') +const pg = helper.pg +const assert = require('assert') + +process.on('unhandledRejection', function (e) { + 
console.error(e, e.stack) + process.exit(1) +}) + +const suite = new helper.Suite() + +suite.test('maxResultSize limit triggers error', (cb) => { + // Check if we're running with the native client + const isNative = helper.args.native + console.log(isNative ? 'Testing with native client' : 'Testing with JavaScript client') + + // Create a pool with a very small result size limit + const pool = new pg.Pool({ + maxResultSize: 100, // Very small limit (100 bytes) + ...helper.args, + }) + + let sizeExceededErrorSeen = false + + pool.on('error', (err) => { + console.log('Pool error:', err.message, err.code) + }) + + pool + .connect() + .then((client) => { + // Set up client error listener for error events + client.on('error', (err) => { + console.log('Client error event:', err.message, err.code) + + // If we get any size exceeded error, mark it + if (err.code === 'RESULT_SIZE_EXCEEDED' || err.message === 'Query result size exceeded the configured limit') { + sizeExceededErrorSeen = true + } + }) + + return client + .query('CREATE TEMP TABLE large_result_test(id SERIAL, data TEXT)') + .then(() => { + // Insert rows that will exceed the size limit when queried + const insertPromises = [] + for (let i = 0; i < 20; i++) { + // Each row will have enough data to eventually exceed our limit + const data = 'x'.repeat(50) + insertPromises.push(client.query('INSERT INTO large_result_test(data) VALUES($1)', [data])) + } + return Promise.all(insertPromises) + }) + .then(() => { + console.log('Running query that should exceed size limit...') + + return client + .query('SELECT * FROM large_result_test') + .then(() => { + throw new Error('Query should have failed due to size limit') + }) + .catch((err) => { + console.log('Query error caught:', err.message, err.code) + + // Both implementations should throw an error with this code + assert.equal(err.code, 'RESULT_SIZE_EXCEEDED', 'Error should have RESULT_SIZE_EXCEEDED code') + + // Give time for error events to propagate + return new 
Promise((resolve) => setTimeout(resolve, 100)).then(() => { + // Verify we saw the error event + assert(sizeExceededErrorSeen, 'Should have seen the size exceeded error event') + + return client.query('DROP TABLE IF EXISTS large_result_test').catch(() => { + /* ignore cleanup errors */ + }) + }) + }) + }) + .then(() => { + client.release() + pool.end(cb) + }) + .catch((err) => { + console.error('Test error:', err.message) + client.release() + pool.end(() => cb(err)) + }) + }) + .catch((err) => { + console.error('Connection error:', err.message) + pool.end(() => cb(err)) + }) +}) + +suite.test('results under maxResultSize limit work correctly', (cb) => { + // Create a pool with a reasonably large limit + const pool = new pg.Pool({ + maxResultSize: 10 * 1024, // 10KB is plenty for small results + ...helper.args, + }) + + pool + .connect() + .then((client) => { + return client + .query('CREATE TEMP TABLE small_result_test(id SERIAL, data TEXT)') + .then(() => { + return client.query('INSERT INTO small_result_test(data) VALUES($1)', ['small_data']) + }) + .then(() => { + return client.query('SELECT * FROM small_result_test').then((result) => { + assert.equal(result.rows.length, 1, 'Should get 1 row') + assert.equal(result.rows[0].data, 'small_data', 'Data should match') + + return client.query('DROP TABLE small_result_test') + }) + }) + .then(() => { + client.release() + pool.end(cb) + }) + .catch((err) => { + client.release() + pool.end(() => cb(err)) + }) + }) + .catch((err) => { + pool.end(() => cb(err)) + }) +})