From d6bb4749a15b17808e843baa2c8359dd9a02c13b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Pollak?= Date: Wed, 25 Aug 2021 18:37:25 -0400 Subject: [PATCH 1/8] Add the pump module and use it on node's without stream.pipeline --- lib/needle.js | 20 +++++--- package-lock.json | 60 +++++++----------------- package.json | 1 + test/socket_cleanup_spec.js | 91 +++++++++++++++++++++++++++++++++++++ 4 files changed, 123 insertions(+), 49 deletions(-) create mode 100644 test/socket_cleanup_spec.js diff --git a/lib/needle.js b/lib/needle.js index 0d0a469de..efcecec00 100644 --- a/lib/needle.js +++ b/lib/needle.js @@ -11,6 +11,7 @@ var fs = require('fs'), url = require('url'), stream = require('stream'), debug = require('debug')('needle'), + pump = require('pump'), // stream.pipeline for older versions of node stringify = require('./querystring').build, multipart = require('./multipart'), auth = require('./auth'), @@ -160,6 +161,13 @@ function get_stream_length(stream, given_length, cb) { }); } +function pump_streams(streams, cb) { + if (stream.pipeline) // v10 or higher + return stream.pipeline.apply(null, streams.concat(cb)); + + pump.apply(null, streams.concat(cb)); +} + ////////////////////////////////////////// // the main act @@ -613,9 +621,10 @@ Needle.prototype.send_request = function(count, method, uri, config, post_data, // And `out` is the stream we finally push the decoded/parsed output to. pipeline.push(out); - // Now, release the kraken! - function pipelineCb(err) { if (err) debug(err) } - stream.pipeline.apply(null, [resp].concat(pipeline).concat(pipelineCb)); + pump_streams([resp].concat(pipeline), function(err) { + if (err) debug(err) + // if (err && err.code == 'ERR_STREAM_PREMATURE_CLOSE') request.destroy(); + }); // If the user has requested and output file, pipe the output stream to it. // In stream mode, we will still get the response stream to play with. @@ -665,7 +674,7 @@ Needle.prototype.send_request = function(count, method, uri, config, post_data, } }) - stream.pipeline(resp, clean_pipe, function(err) { + pump_streams(resp, clean_pipe, function(err) { if (err) debug(err); }); @@ -747,7 +756,6 @@ Needle.prototype.send_request = function(count, method, uri, config, post_data, set_timeout('response', config.response_timeout); } - // console.log(socket); if (!socket.on_socket_end) { socket.on_socket_end = on_socket_end; socket.once('end', function() { process.nextTick(on_socket_end.bind(socket)) }); @@ -756,7 +764,7 @@ Needle.prototype.send_request = function(count, method, uri, config, post_data, if (post_data) { if (is_stream(post_data)) { - stream.pipeline(post_data, request, function(err) { + pump_streams([post_data, request], function(err) { if (err) debug(err); }); } else { diff --git a/package-lock.json b/package-lock.json index 5866e450c..bb7c33366 100644 --- a/package-lock.json +++ b/package-lock.json @@ -62,6 +62,14 @@ "integrity": "sha512-A46qtFgd+g7pDZinpnwiRJtxbC1hpgf0uzP3iG89scHk0AUC7A1TGxf5OiiOUv/JMZR8GOt8hL900hV0bOy5xA==", "dev": true }, + "end-of-stream": { + "version": "1.4.4", + "resolved": "https://registry.npmjs.org/end-of-stream/-/end-of-stream-1.4.4.tgz", + "integrity": "sha512-+uw1inIHVPQoaVuHzRyXd21icM+cnt4CzD5rW+NC1wjOUSTOs+Te7FOv7AhN7vS9x/oIyhLP5PR1H+phQAHu5Q==", + "requires": { + "once": "^1.4.0" + } + }, "escape-string-regexp": { "version": "1.0.5", "resolved": "https://registry.npmjs.org/escape-string-regexp/-/escape-string-regexp-1.0.5.tgz", @@ -148,24 +156,12 @@ "integrity": "sha512-xYuhvQ7I9PDJIGBWev9xm0+SMSed3ZDBAmvVjbFR1ZRLAF+vlXcQu6cRI9uAlj81rzikElRVteehwV7DuX2ZmQ==", "dev": true }, - "json-stringify-safe": { - "version": "5.0.1", - "resolved": "https://registry.npmjs.org/json-stringify-safe/-/json-stringify-safe-5.0.1.tgz", - "integrity": "sha1-Epai1Y/UXxmg9s4B1lcB4sc1tus=", - "dev": true - }, "jsonparse": { "version": "1.3.1", "resolved": "https://registry.npmjs.org/jsonparse/-/jsonparse-1.3.1.tgz", "integrity": "sha1-P02uSpH6wxX3EGL4UhzCOfE2YoA=", "dev": true }, - "lodash": { - "version": "4.17.15", - "resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.15.tgz", - "integrity": "sha512-8xOcRHvCjnocdS5cpwXQXVzmmh5e5+saE2QGoeQmbKmRS6J3VQppPOIt0MnmE+4xlZoumy0GPG0D0MVIQbNA1A==", - "dev": true - }, "lolex": { "version": "1.6.0", "resolved": "https://registry.npmjs.org/lolex/-/lolex-1.6.0.tgz", @@ -243,34 +239,10 @@ "integrity": "sha1-IKMYwwy0X3H+et+/eyHJnBRy7xE=", "dev": true }, - "nock": { - "version": "12.0.3", - "resolved": "https://registry.npmjs.org/nock/-/nock-12.0.3.tgz", - "integrity": "sha512-QNb/j8kbFnKCiyqi9C5DD0jH/FubFGj5rt9NQFONXwQm3IPB0CULECg/eS3AU1KgZb/6SwUa4/DTRKhVxkGABw==", - "dev": true, - "requires": { - "debug": "^4.1.0", - "json-stringify-safe": "^5.0.1", - "lodash": "^4.17.13", - "propagate": "^2.0.0" - }, - "dependencies": { - "debug": { - "version": "4.1.1", - "resolved": "https://registry.npmjs.org/debug/-/debug-4.1.1.tgz", - "integrity": "sha512-pYAIzeRo8J6KPEaJ0VWOh5Pzkbw/RetuzehGM7QRRX5he4fPHx2rdKMB256ehJCkX+XRQm16eZLqLNS8RSZXZw==", - "dev": true, - "requires": { - "ms": "^2.1.1" - } - } - } - }, "once": { "version": "1.4.0", "resolved": "https://registry.npmjs.org/once/-/once-1.4.0.tgz", "integrity": "sha1-WDsap3WWHUsROsF9nFC6753Xa9E=", - "dev": true, "requires": { "wrappy": "1" } @@ -290,11 +262,14 @@ "isarray": "0.0.1" } }, - "propagate": { - "version": "2.0.1", - "resolved": "https://registry.npmjs.org/propagate/-/propagate-2.0.1.tgz", - "integrity": "sha512-vGrhOavPSTz4QVNuBNdcNXePNdNMaO1xj9yBeH1ScQPjk/rhg9sSlCXPhMkFuaNNW/syTvYqsnbIJxMBfRbbag==", - "dev": true + "pump": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/pump/-/pump-3.0.0.tgz", + "integrity": "sha512-LwZy+p3SFs1Pytd/jYct4wpv49HiYCqd9Rlc5ZVdk0V+8Yzv6jR5Blk3TRmPL1ft69TxP0IMZGJ+WPFU2BFhww==", + "requires": { + "end-of-stream": "^1.1.0", + "once": "^1.3.1" + } }, "q": { "version": "1.5.1", @@ -413,8 +388,7 @@ "wrappy": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/wrappy/-/wrappy-1.0.2.tgz", - "integrity": "sha1-tSQ9jz7BqjXxNkYFvA0QNuMKtp8=", - "dev": true + "integrity": "sha1-tSQ9jz7BqjXxNkYFvA0QNuMKtp8=" }, "xml2js": { "version": "0.4.19", diff --git a/package.json b/package.json index 10af1456f..35f9e1d19 100644 --- a/package.json +++ b/package.json @@ -42,6 +42,7 @@ "dependencies": { "debug": "^3.2.6", "iconv-lite": "^0.4.4", + "pump": "^3.0.0", "sax": "^1.2.4" }, "devDependencies": { diff --git a/test/socket_cleanup_spec.js b/test/socket_cleanup_spec.js new file mode 100644 index 000000000..c23f63a45 --- /dev/null +++ b/test/socket_cleanup_spec.js @@ -0,0 +1,91 @@ +var should = require('should'), + needle = require('./../'), + fs = require('fs'), + https = require('https'), + stream = require('stream'); + +describe('socket cleanup', function(){ + + var outFile = 'test/tmp'; + var httpAgent, endFired, closeFired, file, url, readStream, writeStream + + before(function() { + httpAgent = new https.Agent({ + keepAlive : true, + maxSockets : 1 + }); + + endFired = false; + closeFired = false; + file = 'ubuntu-21.04-desktop-amd64.iso'; + url = 'https://releases.ubuntu.com/21.04/' + file; + readStream = needle.get(url, { agent: httpAgent }); + writeStream = fs.createWriteStream(outFile); + }) + + after(function() { + httpAgent.destroy() + fs.unlinkSync(outFile); + }) + + it('should cleanup sockets on ERR_STREAM_PREMATURE_CLOSE (using stream.pipeline)', function(done) { + + if (!stream.pipeline) + return done(); // unsupported + + readStream.on('end', function(e) { + endFired = true; + }); + + readStream.on('close', function(e) { + closeFired = true; + }); + + stream.pipeline(readStream, writeStream, function(err) { + // should.not.exist(err); + // err.should.be.an.Error; + // err.code.should.eql('ERR_STREAM_PREMATURE_CLOSE') + // if (err) readStream.request.destroy(); + }); + + setTimeout(function() { + writeStream.destroy(); + }, 100); + + setTimeout(function() { + var activeSockets = Object.keys(httpAgent.sockets).length; + activeSockets.should.eql(0); + // endFired.should.eql(true); + // closeFired.should.eql(true); + done() + }, 1000) + + }) + + it('should cleanup sockets on ERR_STREAM_PREMATURE_CLOSE (using .pipe)', function(done) { + + readStream.on('end', function(e) { + endFired = true; + }); + + readStream.on('close', function(e) { + closeFired = true; + }); + + readStream.pipe(writeStream); + + setTimeout(function() { + writeStream.destroy(); + }, 100); + + setTimeout(function() { + var activeSockets = Object.keys(httpAgent.sockets).length; + activeSockets.should.eql(0); + // endFired.should.eql(true); + // closeFired.should.eql(true); + done() + }, 1000) + + }) + +}) \ No newline at end of file From 37802c2f3ee48d5f2d2f6b934dd04b0c9a2d4b22 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Pollak?= Date: Wed, 25 Aug 2021 19:27:02 -0400 Subject: [PATCH 2/8] Trying out some things --- lib/needle.js | 15 ++++++-- test/socket_cleanup_spec.js | 74 ++++++++++++++++++++----------------- 2 files changed, 51 insertions(+), 38 deletions(-) diff --git a/lib/needle.js b/lib/needle.js index efcecec00..c4f413f67 100644 --- a/lib/needle.js +++ b/lib/needle.js @@ -19,6 +19,8 @@ var fs = require('fs'), parsers = require('./parsers'), decoder = require('./decoder'); +if (!stream.pipeline) stream.pipeline = pump; + ////////////////////////////////////////// // variabilia @@ -162,10 +164,7 @@ function get_stream_length(stream, given_length, cb) { } function pump_streams(streams, cb) { - if (stream.pipeline) // v10 or higher - return stream.pipeline.apply(null, streams.concat(cb)); - - pump.apply(null, streams.concat(cb)); + return stream.pipeline.apply(null, streams.concat(cb)); } ////////////////////////////////////////// @@ -733,6 +732,14 @@ Needle.prototype.send_request = function(count, method, uri, config, post_data, }); + out.on('error', function(err) { + // had_error(err); + if (err.code == 'ERR_STREAM_DESTROYED') { + request.abort(); + // out.trigger('close'); + } + }) + }); // end request call // unless open_timeout was disabled, set a timeout to abort the request. diff --git a/test/socket_cleanup_spec.js b/test/socket_cleanup_spec.js index c23f63a45..df5dd0b25 100644 --- a/test/socket_cleanup_spec.js +++ b/test/socket_cleanup_spec.js @@ -6,15 +6,23 @@ var should = require('should'), describe('socket cleanup', function(){ + this.timeout(5000); + var outFile = 'test/tmp'; var httpAgent, endFired, closeFired, file, url, readStream, writeStream + function getActiveSockets() { + return Object.keys(httpAgent.sockets).length + } + before(function() { httpAgent = new https.Agent({ keepAlive : true, maxSockets : 1 }); + getActiveSockets().should.eql(0); + endFired = false; closeFired = false; file = 'ubuntu-21.04-desktop-amd64.iso'; @@ -28,11 +36,7 @@ describe('socket cleanup', function(){ fs.unlinkSync(outFile); }) - it('should cleanup sockets on ERR_STREAM_PREMATURE_CLOSE (using stream.pipeline)', function(done) { - - if (!stream.pipeline) - return done(); // unsupported - + it('should cleanup sockets on ERR_STREAM_PREMATURE_CLOSE (using .pipe)', function(done) { readStream.on('end', function(e) { endFired = true; }); @@ -41,51 +45,53 @@ describe('socket cleanup', function(){ closeFired = true; }); - stream.pipeline(readStream, writeStream, function(err) { - // should.not.exist(err); - // err.should.be.an.Error; - // err.code.should.eql('ERR_STREAM_PREMATURE_CLOSE') - // if (err) readStream.request.destroy(); - }); + readStream.pipe(writeStream); setTimeout(function() { writeStream.destroy(); }, 100); setTimeout(function() { - var activeSockets = Object.keys(httpAgent.sockets).length; - activeSockets.should.eql(0); + // done(); // endFired.should.eql(true); // closeFired.should.eql(true); - done() - }, 1000) + setTimeout(function() { + console.log(getActiveSockets()) + getActiveSockets().should.eql(0); + done(); + }, 300); + }, 200) }) - it('should cleanup sockets on ERR_STREAM_PREMATURE_CLOSE (using .pipe)', function(done) { + // it('should cleanup sockets on ERR_STREAM_PREMATURE_CLOSE (using stream.pipeline)', function(done) { - readStream.on('end', function(e) { - endFired = true; - }); + // readStream.on('end', function(e) { + // endFired = true; + // }); - readStream.on('close', function(e) { - closeFired = true; - }); + // readStream.on('close', function(e) { + // closeFired = true; + // }); - readStream.pipe(writeStream); + // stream.pipeline(readStream, writeStream, function(err) { + // // err.code.should.eql('ERR_STREAM_PREMATURE_CLOSE') + // // if (err) readStream.request.destroy(); + // }); - setTimeout(function() { - writeStream.destroy(); - }, 100); + // setTimeout(function() { + // getActiveSockets().should.eql(0); + // writeStream.destroy(); + // }, 3000); - setTimeout(function() { - var activeSockets = Object.keys(httpAgent.sockets).length; - activeSockets.should.eql(0); - // endFired.should.eql(true); - // closeFired.should.eql(true); - done() - }, 1000) + // setTimeout(function() { + // getActiveSockets().should.eql(0); + // // endFired.should.eql(true); + // // closeFired.should.eql(true); + // done(); + // }, 4000) + + // }) - }) }) \ No newline at end of file From adf15a0fcab8537910d95bdd94e38f918d38e5df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Pollak?= Date: Wed, 25 Aug 2021 21:24:01 -0400 Subject: [PATCH 3/8] Remove pump as dependency --- lib/needle.js | 48 ++++++++++++++++++++++++++++++++--------------- package-lock.json | 21 +++------------------ package.json | 1 - 3 files changed, 36 insertions(+), 34 deletions(-) diff --git a/lib/needle.js b/lib/needle.js index c4f413f67..bb6ef07ea 100644 --- a/lib/needle.js +++ b/lib/needle.js @@ -11,7 +11,6 @@ var fs = require('fs'), url = require('url'), stream = require('stream'), debug = require('debug')('needle'), - pump = require('pump'), // stream.pipeline for older versions of node stringify = require('./querystring').build, multipart = require('./multipart'), auth = require('./auth'), @@ -19,8 +18,6 @@ var fs = require('fs'), parsers = require('./parsers'), decoder = require('./decoder'); -if (!stream.pipeline) stream.pipeline = pump; - ////////////////////////////////////////// // variabilia @@ -163,8 +160,20 @@ function get_stream_length(stream, given_length, cb) { }); } + + function pump_streams(streams, cb) { - return stream.pipeline.apply(null, streams.concat(cb)); + if (stream.pipeline) + return stream.pipeline.apply(null, streams.concat(cb)); + + var tmp = streams.shift(); + while (streams.length) { + tmp = tmp.pipe(streams.shift()); + tmp.once('error', function(e) { + cb && cb(e); + cb = null; + }) + } } ////////////////////////////////////////// @@ -459,7 +468,8 @@ Needle.prototype.send_request = function(count, method, uri, config, post_data, uri = modified_uri; } - var timer, + var request, + timer, returned = 0, self = this, request_opts = this.get_request_opts(method, uri, config), @@ -471,6 +481,7 @@ Needle.prototype.send_request = function(count, method, uri, config, post_data, if (timer) clearTimeout(timer); request.removeListener('error', had_error); + out.done = true; if (callback) return callback(err, resp, resp ? resp.body : undefined); @@ -509,7 +520,7 @@ Needle.prototype.send_request = function(count, method, uri, config, post_data, } debug('Making request #' + count, request_opts); - var request = protocol.request(request_opts, function(resp) { + request = protocol.request(request_opts, function(resp) { var headers = resp.headers; debug('Got response', resp.statusCode, headers); @@ -622,7 +633,10 @@ Needle.prototype.send_request = function(count, method, uri, config, post_data, pump_streams([resp].concat(pipeline), function(err) { if (err) debug(err) - // if (err && err.code == 'ERR_STREAM_PREMATURE_CLOSE') request.destroy(); + + // on node v8.x, if an error ocurrs on the receiving end, + // then we want to abort the request to avoid having dangling sockets + if (err && err.message == 'write after end') request.destroy(); }); // If the user has requested and output file, pipe the output stream to it. @@ -673,7 +687,7 @@ Needle.prototype.send_request = function(count, method, uri, config, post_data, } }) - pump_streams(resp, clean_pipe, function(err) { + pump_streams([resp, clean_pipe], function(err) { if (err) debug(err); }); @@ -732,13 +746,12 @@ Needle.prototype.send_request = function(count, method, uri, config, post_data, }); - out.on('error', function(err) { - // had_error(err); - if (err.code == 'ERR_STREAM_DESTROYED') { - request.abort(); - // out.trigger('close'); - } - }) + // out.on('error', function(err) { + // had_error(err); + // if (err.code == 'ERR_STREAM_DESTROYED' || err.code == 'ERR_STREAM_PREMATURE_CLOSE') { + // request.abort(); + // } + // }) }); // end request call @@ -763,6 +776,10 @@ Needle.prototype.send_request = function(count, method, uri, config, post_data, set_timeout('response', config.response_timeout); } + // socket.once('close', function(e) { + // console.log('socket closed!', e); + // }) + if (!socket.on_socket_end) { socket.on_socket_end = on_socket_end; socket.once('end', function() { process.nextTick(on_socket_end.bind(socket)) }); @@ -782,6 +799,7 @@ Needle.prototype.send_request = function(count, method, uri, config, post_data, request.end(); } + out.abort = function() { request.abort() }; // easier access out.request = request; return out; } diff --git a/package-lock.json b/package-lock.json index bb7c33366..8b33e8d5b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -62,14 +62,6 @@ "integrity": "sha512-A46qtFgd+g7pDZinpnwiRJtxbC1hpgf0uzP3iG89scHk0AUC7A1TGxf5OiiOUv/JMZR8GOt8hL900hV0bOy5xA==", "dev": true }, - "end-of-stream": { - "version": "1.4.4", - "resolved": "https://registry.npmjs.org/end-of-stream/-/end-of-stream-1.4.4.tgz", - "integrity": "sha512-+uw1inIHVPQoaVuHzRyXd21icM+cnt4CzD5rW+NC1wjOUSTOs+Te7FOv7AhN7vS9x/oIyhLP5PR1H+phQAHu5Q==", - "requires": { - "once": "^1.4.0" - } - }, "escape-string-regexp": { "version": "1.0.5", "resolved": "https://registry.npmjs.org/escape-string-regexp/-/escape-string-regexp-1.0.5.tgz", @@ -243,6 +235,7 @@ "version": "1.4.0", "resolved": "https://registry.npmjs.org/once/-/once-1.4.0.tgz", "integrity": "sha1-WDsap3WWHUsROsF9nFC6753Xa9E=", + "dev": true, "requires": { "wrappy": "1" } @@ -262,15 +255,6 @@ "isarray": "0.0.1" } }, - "pump": { - "version": "3.0.0", - "resolved": "https://registry.npmjs.org/pump/-/pump-3.0.0.tgz", - "integrity": "sha512-LwZy+p3SFs1Pytd/jYct4wpv49HiYCqd9Rlc5ZVdk0V+8Yzv6jR5Blk3TRmPL1ft69TxP0IMZGJ+WPFU2BFhww==", - "requires": { - "end-of-stream": "^1.1.0", - "once": "^1.3.1" - } - }, "q": { "version": "1.5.1", "resolved": "https://registry.npmjs.org/q/-/q-1.5.1.tgz", @@ -388,7 +372,8 @@ "wrappy": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/wrappy/-/wrappy-1.0.2.tgz", - "integrity": "sha1-tSQ9jz7BqjXxNkYFvA0QNuMKtp8=" + "integrity": "sha1-tSQ9jz7BqjXxNkYFvA0QNuMKtp8=", + "dev": true }, "xml2js": { "version": "0.4.19", diff --git a/package.json b/package.json index 35f9e1d19..10af1456f 100644 --- a/package.json +++ b/package.json @@ -42,7 +42,6 @@ "dependencies": { "debug": "^3.2.6", "iconv-lite": "^0.4.4", - "pump": "^3.0.0", "sax": "^1.2.4" }, "devDependencies": { From bcaac9b8d052b5db004f032b3b592e815277ba91 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Pollak?= Date: Wed, 25 Aug 2021 21:24:45 -0400 Subject: [PATCH 4/8] Readd pump as devDependency --- package-lock.json | 19 +++++++++++++++++++ package.json | 1 + 2 files changed, 20 insertions(+) diff --git a/package-lock.json b/package-lock.json index 8b33e8d5b..4dc4c1f87 100644 --- a/package-lock.json +++ b/package-lock.json @@ -62,6 +62,15 @@ "integrity": "sha512-A46qtFgd+g7pDZinpnwiRJtxbC1hpgf0uzP3iG89scHk0AUC7A1TGxf5OiiOUv/JMZR8GOt8hL900hV0bOy5xA==", "dev": true }, + "end-of-stream": { + "version": "1.4.4", + "resolved": "https://registry.npmjs.org/end-of-stream/-/end-of-stream-1.4.4.tgz", + "integrity": "sha512-+uw1inIHVPQoaVuHzRyXd21icM+cnt4CzD5rW+NC1wjOUSTOs+Te7FOv7AhN7vS9x/oIyhLP5PR1H+phQAHu5Q==", + "dev": true, + "requires": { + "once": "^1.4.0" + } + }, "escape-string-regexp": { "version": "1.0.5", "resolved": "https://registry.npmjs.org/escape-string-regexp/-/escape-string-regexp-1.0.5.tgz", @@ -255,6 +264,16 @@ "isarray": "0.0.1" } }, + "pump": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/pump/-/pump-3.0.0.tgz", + "integrity": "sha512-LwZy+p3SFs1Pytd/jYct4wpv49HiYCqd9Rlc5ZVdk0V+8Yzv6jR5Blk3TRmPL1ft69TxP0IMZGJ+WPFU2BFhww==", + "dev": true, + "requires": { + "end-of-stream": "^1.1.0", + "once": "^1.3.1" + } + }, "q": { "version": "1.5.1", "resolved": "https://registry.npmjs.org/q/-/q-1.5.1.tgz", diff --git a/package.json b/package.json index 10af1456f..775c56d29 100644 --- a/package.json +++ b/package.json @@ -48,6 +48,7 @@ "JSONStream": "^1.3.5", "jschardet": "^1.6.0", "mocha": "^5.2.0", + "pump": "^3.0.0", "q": "^1.5.1", "should": "^13.2.3", "sinon": "^2.3.0", From b6f01cbb63d8453c64d1d94458338f18b6356188 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Pollak?= Date: Wed, 25 Aug 2021 21:26:38 -0400 Subject: [PATCH 5/8] Readd the most important line in the repository --- lib/needle.js | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/needle.js b/lib/needle.js index bb6ef07ea..805af5c64 100644 --- a/lib/needle.js +++ b/lib/needle.js @@ -631,6 +631,7 @@ Needle.prototype.send_request = function(count, method, uri, config, post_data, // And `out` is the stream we finally push the decoded/parsed output to. pipeline.push(out); + // Now, release the kraken! pump_streams([resp].concat(pipeline), function(err) { if (err) debug(err) From 4e850dee5355e9834c7e4136f7ae73a59375dfc8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Pollak?= Date: Thu, 26 Aug 2021 17:39:27 -0400 Subject: [PATCH 6/8] Spec --- test/socket_cleanup_spec.js | 90 +++++++++++++++---------------------- 1 file changed, 36 insertions(+), 54 deletions(-) diff --git a/test/socket_cleanup_spec.js b/test/socket_cleanup_spec.js index df5dd0b25..8515ca626 100644 --- a/test/socket_cleanup_spec.js +++ b/test/socket_cleanup_spec.js @@ -6,10 +6,11 @@ var should = require('should'), describe('socket cleanup', function(){ - this.timeout(5000); - var outFile = 'test/tmp'; - var httpAgent, endFired, closeFired, file, url, readStream, writeStream + var httpAgent, readStream, writeStream + + var file = 'ubuntu-21.04-desktop-amd64.iso', + url = 'https://releases.ubuntu.com/21.04/' + file; function getActiveSockets() { return Object.keys(httpAgent.sockets).length @@ -20,15 +21,6 @@ describe('socket cleanup', function(){ keepAlive : true, maxSockets : 1 }); - - getActiveSockets().should.eql(0); - - endFired = false; - closeFired = false; - file = 'ubuntu-21.04-desktop-amd64.iso'; - url = 'https://releases.ubuntu.com/21.04/' + file; - readStream = needle.get(url, { agent: httpAgent }); - writeStream = fs.createWriteStream(outFile); }) after(function() { @@ -37,61 +29,51 @@ describe('socket cleanup', function(){ }) it('should cleanup sockets on ERR_STREAM_PREMATURE_CLOSE (using .pipe)', function(done) { - readStream.on('end', function(e) { - endFired = true; - }); + getActiveSockets().should.eql(0); - readStream.on('close', function(e) { - closeFired = true; - }); + var resp = needle.get(url, { agent: httpAgent }); + var writable = fs.createWriteStream(outFile); + resp.pipe(writable); - readStream.pipe(writeStream); + writable.on('close', function(e) { + if (!resp.done) resp.abort(); + }) setTimeout(function() { - writeStream.destroy(); - }, 100); + getActiveSockets().should.eql(1); + writable.destroy(); + }, 50); setTimeout(function() { - // done(); - // endFired.should.eql(true); - // closeFired.should.eql(true); - setTimeout(function() { - console.log(getActiveSockets()) - getActiveSockets().should.eql(0); - done(); - - }, 300); - }, 200) + getActiveSockets().should.eql(0); + done(); + }, 500); // takes a bit }) - // it('should cleanup sockets on ERR_STREAM_PREMATURE_CLOSE (using stream.pipeline)', function(done) { - - // readStream.on('end', function(e) { - // endFired = true; - // }); + it('should cleanup sockets on ERR_STREAM_PREMATURE_CLOSE (using stream.pipeline)', function(done) { + if (!stream.pipeline) + return done() - // readStream.on('close', function(e) { - // closeFired = true; - // }); + getActiveSockets().should.eql(0); - // stream.pipeline(readStream, writeStream, function(err) { - // // err.code.should.eql('ERR_STREAM_PREMATURE_CLOSE') - // // if (err) readStream.request.destroy(); - // }); + var resp = needle.get(url, { agent: httpAgent }); + var writable = fs.createWriteStream(outFile); - // setTimeout(function() { - // getActiveSockets().should.eql(0); - // writeStream.destroy(); - // }, 3000); + stream.pipeline(resp, writable, function(err) { + // err.code.should.eql('ERR_STREAM_PREMATURE_CLOSE') + // if (err) resp.request.destroy(); + }); - // setTimeout(function() { - // getActiveSockets().should.eql(0); - // // endFired.should.eql(true); - // // closeFired.should.eql(true); - // done(); - // }, 4000) + setTimeout(function() { + getActiveSockets().should.eql(1); + writable.destroy(); + }, 50); - // }) + setTimeout(function() { + getActiveSockets().should.eql(0); + done(); + }, 1000); // takes a bit + }) }) \ No newline at end of file From af57a281b4285044b1660fe578bfc7f92ac0068e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Pollak?= Date: Thu, 26 Aug 2021 17:49:00 -0400 Subject: [PATCH 7/8] Whitespace --- lib/needle.js | 2 -- 1 file changed, 2 deletions(-) diff --git a/lib/needle.js b/lib/needle.js index 805af5c64..01fa2e64c 100644 --- a/lib/needle.js +++ b/lib/needle.js @@ -160,8 +160,6 @@ function get_stream_length(stream, given_length, cb) { }); } - - function pump_streams(streams, cb) { if (stream.pipeline) return stream.pipeline.apply(null, streams.concat(cb)); From b58b031365ceb5555b7450f531a4f9dd0b5fb34c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Pollak?= Date: Thu, 26 Aug 2021 17:55:06 -0400 Subject: [PATCH 8/8] Update test/socket_cleanup_spec.js --- test/socket_cleanup_spec.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/socket_cleanup_spec.js b/test/socket_cleanup_spec.js index 8515ca626..483b4620e 100644 --- a/test/socket_cleanup_spec.js +++ b/test/socket_cleanup_spec.js @@ -60,7 +60,7 @@ describe('socket cleanup', function(){ var writable = fs.createWriteStream(outFile); stream.pipeline(resp, writable, function(err) { - // err.code.should.eql('ERR_STREAM_PREMATURE_CLOSE') + err.code.should.eql('ERR_STREAM_PREMATURE_CLOSE') // if (err) resp.request.destroy(); });