Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/add max result size #3406

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/pg/lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class Client extends EventEmitter {
keepAlive: c.keepAlive || false,
keepAliveInitialDelayMillis: c.keepAliveInitialDelayMillis || 0,
encoding: this.connectionParameters.client_encoding || 'utf8',
maxResultSize: c.maxResultSize,
})
this.queryQueue = []
this.binary = c.binary || defaults.binary
Expand Down
48 changes: 48 additions & 0 deletions packages/pg/lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ class Connection extends EventEmitter {
this.ssl = config.ssl || false
this._ending = false
this._emitMessage = false
this._maxResultSize = config.maxResultSize
this._currentResultSize = 0
var self = this
this.on('newListener', function (eventName) {
if (eventName === 'message') {
Expand Down Expand Up @@ -108,6 +110,17 @@ class Connection extends EventEmitter {
}

attachListeners(stream) {
var self = this
// Use the appropriate implementation based on whether maxResultSize is enabled
if (self._maxResultSize && self._maxResultSize > 0) {
this._attachListenersWithSizeLimit(stream)
} else {
this._attachListenersStandard(stream)
}
}

// Original implementation with no overhead
_attachListenersStandard(stream) {
parse(stream, (msg) => {
var eventName = msg.name === 'error' ? 'errorMessage' : msg.name
if (this._emitMessage) {
Expand All @@ -117,6 +130,41 @@ class Connection extends EventEmitter {
})
}

// Implementation with size limiting logic
_attachListenersWithSizeLimit(stream) {
parse(stream, (msg) => {
var eventName = msg.name === 'error' ? 'errorMessage' : msg.name

// Only track data row messages for result size
if (msg.name === 'dataRow') {
// Approximate size by using message length
const msgSize = msg.length || 1024 // Default to 1KB if we don't have lenght info
this._currentResultSize += msgSize

// Check if we've exceeded the max result size
if (this._currentResultSize > this._maxResultSize) {
const error = new Error('Query result size exceeded the configured limit')
error.code = 'RESULT_SIZE_EXCEEDED'
error.resultSize = this._currentResultSize
error.maxResultSize = this._maxResultSize
this.emit('errorMessage', error)
this.end() // Terminate the connection
return
}
}

// Reset counter on query completion
if (msg.name === 'readyForQuery') {
this._currentResultSize = 0
}

if (this._emitMessage) {
this.emit('message', msg)
}
this.emit(eventName, msg)
})
}

requestSsl() {
this.stream.write(serialize.requestSsl())
}
Expand Down
2 changes: 2 additions & 0 deletions packages/pg/lib/defaults.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ module.exports = {
keepalives: 1,

keepalives_idle: 0,
// maxResultSize limit of a request before erroring out
maxResultSize: undefined,
}

var pgTypes = require('pg-types')
Expand Down
6 changes: 6 additions & 0 deletions packages/pg/lib/native/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ var Client = (module.exports = function (config) {
types: this._types,
})

// Store maxResultSize configuration
this._maxResultSize = config.maxResultSize

this._queryQueue = []
this._ending = false
this._connecting = false
Expand Down Expand Up @@ -100,6 +103,9 @@ Client.prototype._connect = function (cb) {
// set internal states to connected
self._connected = true

// Add a reference to the client for error bubbling
self.native.connection = self

// handle connection errors from the native layer
self.native.on('error', function (err) {
self._queryable = false
Expand Down
94 changes: 94 additions & 0 deletions packages/pg/lib/native/query.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,17 @@ NativeQuery.prototype.handleError = function (err) {
err[normalizedFieldName] = fields[key]
}
}

// For maxResultSize exceeded errors, make sure we emit the error to the client too
if (err.code === 'RESULT_SIZE_EXCEEDED') {
if (this.native && this.native.connection) {
// Need to emit the error on the client/connection level too
process.nextTick(() => {
this.native.connection.emit('error', err)
})
}
}

if (this.callback) {
this.callback(err)
} else {
Expand Down Expand Up @@ -89,6 +100,9 @@ NativeQuery.prototype.submit = function (client) {
this.native = client.native
client.native.arrayMode = this._arrayMode

// Get the maxResultSize from the client if it's set
this._maxResultSize = client._maxResultSize

var after = function (err, rows, results) {
client.native.arrayMode = false
setImmediate(function () {
Expand All @@ -100,6 +114,30 @@ NativeQuery.prototype.submit = function (client) {
return self.handleError(err)
}

// Check the result size if maxResultSize is configured
if (self._maxResultSize) {
// Calculate result size (rough approximation)
let resultSize = 0

// For multiple result sets
if (results.length > 1) {
for (let i = 0; i < rows.length; i++) {
resultSize += self._calculateResultSize(rows[i])
}
} else if (rows.length > 0) {
resultSize = self._calculateResultSize(rows)
}

// If the size limit is exceeded, generate an error
if (resultSize > self._maxResultSize) {
const error = new Error('Query result size exceeded the configured limit')
error.code = 'RESULT_SIZE_EXCEEDED'
error.resultSize = resultSize
error.maxResultSize = self._maxResultSize
return self.handleError(error)
}
}

// emit row events for each row in the result
if (self._emitRowEvents) {
if (results.length > 1) {
Expand Down Expand Up @@ -166,3 +204,59 @@ NativeQuery.prototype.submit = function (client) {
client.native.query(this.text, after)
}
}

// Helper method to estimate the size of a result set
NativeQuery.prototype._calculateResultSize = function (rows) {
let size = 0

// For empty results, return 0
if (!rows || rows.length === 0) {
return 0
}

// For array mode, calculate differently
if (this._arrayMode) {
// Just use a rough approximation based on number of rows
return rows.length * 100
}

// For each row, approximate its size
for (let i = 0; i < rows.length; i++) {
const row = rows[i]

// Add base row size
size += 24 // Overhead per row

// Add size of each column
for (const key in row) {
if (Object.prototype.hasOwnProperty.call(row, key)) {
const value = row[key]

// Add key size
size += key.length * 2 // Assume 2 bytes per character

// Add value size based on type
if (value === null || value === undefined) {
size += 8
} else if (typeof value === 'string') {
size += value.length * 2 // Assume 2 bytes per character
} else if (typeof value === 'number') {
size += 8
} else if (typeof value === 'boolean') {
size += 4
} else if (value instanceof Date) {
size += 8
} else if (Buffer.isBuffer(value)) {
size += value.length
} else if (Array.isArray(value)) {
size += 16 + value.length * 8
} else {
// For objects, use a rough estimate
size += 32 + JSON.stringify(value).length * 2
}
}
}
}

return size
}
131 changes: 131 additions & 0 deletions packages/pg/test/integration/client/max-result-size-tests.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
'use strict'
const helper = require('../test-helper')
const pg = helper.pg
const assert = require('assert')

process.on('unhandledRejection', function (e) {
console.error(e, e.stack)
process.exit(1)
})

const suite = new helper.Suite()

suite.test('maxResultSize limit triggers error', (cb) => {
// Check if we're running with the native client
const isNative = helper.args.native
console.log(isNative ? 'Testing with native client' : 'Testing with JavaScript client')

// Create a pool with a very small result size limit
const pool = new pg.Pool({
maxResultSize: 100, // Very small limit (100 bytes)
...helper.args,
})

let sizeExceededErrorSeen = false

pool.on('error', (err) => {
console.log('Pool error:', err.message, err.code)
})

pool
.connect()
.then((client) => {
// Set up client error listener for error events
client.on('error', (err) => {
console.log('Client error event:', err.message, err.code)

// If we get any size exceeded error, mark it
if (err.code === 'RESULT_SIZE_EXCEEDED' || err.message === 'Query result size exceeded the configured limit') {
sizeExceededErrorSeen = true
}
})

return client
.query('CREATE TEMP TABLE large_result_test(id SERIAL, data TEXT)')
.then(() => {
// Insert rows that will exceed the size limit when queried
const insertPromises = []
for (let i = 0; i < 20; i++) {
// Each row will have enough data to eventually exceed our limit
const data = 'x'.repeat(50)
insertPromises.push(client.query('INSERT INTO large_result_test(data) VALUES($1)', [data]))
}
return Promise.all(insertPromises)
})
.then(() => {
console.log('Running query that should exceed size limit...')

return client
.query('SELECT * FROM large_result_test')
.then(() => {
throw new Error('Query should have failed due to size limit')
})
.catch((err) => {
console.log('Query error caught:', err.message, err.code)

// Both implementations should throw an error with this code
assert.equal(err.code, 'RESULT_SIZE_EXCEEDED', 'Error should have RESULT_SIZE_EXCEEDED code')

// Give time for error events to propagate
return new Promise((resolve) => setTimeout(resolve, 100)).then(() => {
// Verify we saw the error event
assert(sizeExceededErrorSeen, 'Should have seen the size exceeded error event')

return client.query('DROP TABLE IF EXISTS large_result_test').catch(() => {
/* ignore cleanup errors */
})
})
})
})
.then(() => {
client.release()
pool.end(cb)
})
.catch((err) => {
console.error('Test error:', err.message)
client.release()
pool.end(() => cb(err))
})
})
.catch((err) => {
console.error('Connection error:', err.message)
pool.end(() => cb(err))
})
})

suite.test('results under maxResultSize limit work correctly', (cb) => {
// Create a pool with a reasonably large limit
const pool = new pg.Pool({
maxResultSize: 10 * 1024, // 10KB is plenty for small results
...helper.args,
})

pool
.connect()
.then((client) => {
return client
.query('CREATE TEMP TABLE small_result_test(id SERIAL, data TEXT)')
.then(() => {
return client.query('INSERT INTO small_result_test(data) VALUES($1)', ['small_data'])
})
.then(() => {
return client.query('SELECT * FROM small_result_test').then((result) => {
assert.equal(result.rows.length, 1, 'Should get 1 row')
assert.equal(result.rows[0].data, 'small_data', 'Data should match')

return client.query('DROP TABLE small_result_test')
})
})
.then(() => {
client.release()
pool.end(cb)
})
.catch((err) => {
client.release()
pool.end(() => cb(err))
})
})
.catch((err) => {
pool.end(() => cb(err))
})
})