diff --git a/lib/needle.js b/lib/needle.js index 0d0a469de..01fa2e64c 100644 --- a/lib/needle.js +++ b/lib/needle.js @@ -160,6 +160,20 @@ function get_stream_length(stream, given_length, cb) { }); } +function pump_streams(streams, cb) { + if (stream.pipeline) + return stream.pipeline.apply(null, streams.concat(cb)); + + var tmp = streams.shift(); + while (streams.length) { + tmp = tmp.pipe(streams.shift()); + tmp.once('error', function(e) { + cb && cb(e); + cb = null; + }) + } +} + ////////////////////////////////////////// // the main act @@ -452,7 +466,8 @@ Needle.prototype.send_request = function(count, method, uri, config, post_data, uri = modified_uri; } - var timer, + var request, + timer, returned = 0, self = this, request_opts = this.get_request_opts(method, uri, config), @@ -464,6 +479,7 @@ Needle.prototype.send_request = function(count, method, uri, config, post_data, if (timer) clearTimeout(timer); request.removeListener('error', had_error); + out.done = true; if (callback) return callback(err, resp, resp ? resp.body : undefined); @@ -502,7 +518,7 @@ Needle.prototype.send_request = function(count, method, uri, config, post_data, } debug('Making request #' + count, request_opts); - var request = protocol.request(request_opts, function(resp) { + request = protocol.request(request_opts, function(resp) { var headers = resp.headers; debug('Got response', resp.statusCode, headers); @@ -614,8 +630,13 @@ Needle.prototype.send_request = function(count, method, uri, config, post_data, pipeline.push(out); // Now, release the kraken! - function pipelineCb(err) { if (err) debug(err) } - stream.pipeline.apply(null, [resp].concat(pipeline).concat(pipelineCb)); + pump_streams([resp].concat(pipeline), function(err) { + if (err) debug(err) + + // on node v8.x, if an error ocurrs on the receiving end, + // then we want to abort the request to avoid having dangling sockets + if (err && err.message == 'write after end') request.destroy(); + }); // If the user has requested and output file, pipe the output stream to it. // In stream mode, we will still get the response stream to play with. @@ -665,7 +686,7 @@ Needle.prototype.send_request = function(count, method, uri, config, post_data, } }) - stream.pipeline(resp, clean_pipe, function(err) { + pump_streams([resp, clean_pipe], function(err) { if (err) debug(err); }); @@ -724,6 +745,13 @@ Needle.prototype.send_request = function(count, method, uri, config, post_data, }); + // out.on('error', function(err) { + // had_error(err); + // if (err.code == 'ERR_STREAM_DESTROYED' || err.code == 'ERR_STREAM_PREMATURE_CLOSE') { + // request.abort(); + // } + // }) + }); // end request call // unless open_timeout was disabled, set a timeout to abort the request. @@ -747,7 +775,10 @@ Needle.prototype.send_request = function(count, method, uri, config, post_data, set_timeout('response', config.response_timeout); } - // console.log(socket); + // socket.once('close', function(e) { + // console.log('socket closed!', e); + // }) + if (!socket.on_socket_end) { socket.on_socket_end = on_socket_end; socket.once('end', function() { process.nextTick(on_socket_end.bind(socket)) }); @@ -756,7 +787,7 @@ Needle.prototype.send_request = function(count, method, uri, config, post_data, if (post_data) { if (is_stream(post_data)) { - stream.pipeline(post_data, request, function(err) { + pump_streams([post_data, request], function(err) { if (err) debug(err); }); } else { @@ -767,6 +798,7 @@ Needle.prototype.send_request = function(count, method, uri, config, post_data, request.end(); } + out.abort = function() { request.abort() }; // easier access out.request = request; return out; } diff --git a/package-lock.json b/package-lock.json index 5866e450c..4dc4c1f87 100644 --- a/package-lock.json +++ b/package-lock.json @@ -62,6 +62,15 @@ "integrity": "sha512-A46qtFgd+g7pDZinpnwiRJtxbC1hpgf0uzP3iG89scHk0AUC7A1TGxf5OiiOUv/JMZR8GOt8hL900hV0bOy5xA==", "dev": true }, + "end-of-stream": { + "version": "1.4.4", + "resolved": "https://registry.npmjs.org/end-of-stream/-/end-of-stream-1.4.4.tgz", + "integrity": "sha512-+uw1inIHVPQoaVuHzRyXd21icM+cnt4CzD5rW+NC1wjOUSTOs+Te7FOv7AhN7vS9x/oIyhLP5PR1H+phQAHu5Q==", + "dev": true, + "requires": { + "once": "^1.4.0" + } + }, "escape-string-regexp": { "version": "1.0.5", "resolved": "https://registry.npmjs.org/escape-string-regexp/-/escape-string-regexp-1.0.5.tgz", @@ -148,24 +157,12 @@ "integrity": "sha512-xYuhvQ7I9PDJIGBWev9xm0+SMSed3ZDBAmvVjbFR1ZRLAF+vlXcQu6cRI9uAlj81rzikElRVteehwV7DuX2ZmQ==", "dev": true }, - "json-stringify-safe": { - "version": "5.0.1", - "resolved": "https://registry.npmjs.org/json-stringify-safe/-/json-stringify-safe-5.0.1.tgz", - "integrity": "sha1-Epai1Y/UXxmg9s4B1lcB4sc1tus=", - "dev": true - }, "jsonparse": { "version": "1.3.1", "resolved": "https://registry.npmjs.org/jsonparse/-/jsonparse-1.3.1.tgz", "integrity": "sha1-P02uSpH6wxX3EGL4UhzCOfE2YoA=", "dev": true }, - "lodash": { - "version": "4.17.15", - "resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.15.tgz", - "integrity": "sha512-8xOcRHvCjnocdS5cpwXQXVzmmh5e5+saE2QGoeQmbKmRS6J3VQppPOIt0MnmE+4xlZoumy0GPG0D0MVIQbNA1A==", - "dev": true - }, "lolex": { "version": "1.6.0", "resolved": "https://registry.npmjs.org/lolex/-/lolex-1.6.0.tgz", @@ -243,29 +240,6 @@ "integrity": "sha1-IKMYwwy0X3H+et+/eyHJnBRy7xE=", "dev": true }, - "nock": { - "version": "12.0.3", - "resolved": "https://registry.npmjs.org/nock/-/nock-12.0.3.tgz", - "integrity": "sha512-QNb/j8kbFnKCiyqi9C5DD0jH/FubFGj5rt9NQFONXwQm3IPB0CULECg/eS3AU1KgZb/6SwUa4/DTRKhVxkGABw==", - "dev": true, - "requires": { - "debug": "^4.1.0", - "json-stringify-safe": "^5.0.1", - "lodash": "^4.17.13", - "propagate": "^2.0.0" - }, - "dependencies": { - "debug": { - "version": "4.1.1", - "resolved": "https://registry.npmjs.org/debug/-/debug-4.1.1.tgz", - "integrity": "sha512-pYAIzeRo8J6KPEaJ0VWOh5Pzkbw/RetuzehGM7QRRX5he4fPHx2rdKMB256ehJCkX+XRQm16eZLqLNS8RSZXZw==", - "dev": true, - "requires": { - "ms": "^2.1.1" - } - } - } - }, "once": { "version": "1.4.0", "resolved": "https://registry.npmjs.org/once/-/once-1.4.0.tgz", @@ -290,11 +264,15 @@ "isarray": "0.0.1" } }, - "propagate": { - "version": "2.0.1", - "resolved": "https://registry.npmjs.org/propagate/-/propagate-2.0.1.tgz", - "integrity": "sha512-vGrhOavPSTz4QVNuBNdcNXePNdNMaO1xj9yBeH1ScQPjk/rhg9sSlCXPhMkFuaNNW/syTvYqsnbIJxMBfRbbag==", - "dev": true + "pump": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/pump/-/pump-3.0.0.tgz", + "integrity": "sha512-LwZy+p3SFs1Pytd/jYct4wpv49HiYCqd9Rlc5ZVdk0V+8Yzv6jR5Blk3TRmPL1ft69TxP0IMZGJ+WPFU2BFhww==", + "dev": true, + "requires": { + "end-of-stream": "^1.1.0", + "once": "^1.3.1" + } }, "q": { "version": "1.5.1", diff --git a/package.json b/package.json index 10af1456f..775c56d29 100644 --- a/package.json +++ b/package.json @@ -48,6 +48,7 @@ "JSONStream": "^1.3.5", "jschardet": "^1.6.0", "mocha": "^5.2.0", + "pump": "^3.0.0", "q": "^1.5.1", "should": "^13.2.3", "sinon": "^2.3.0", diff --git a/test/socket_cleanup_spec.js b/test/socket_cleanup_spec.js new file mode 100644 index 000000000..483b4620e --- /dev/null +++ b/test/socket_cleanup_spec.js @@ -0,0 +1,79 @@ +var should = require('should'), + needle = require('./../'), + fs = require('fs'), + https = require('https'), + stream = require('stream'); + +describe('socket cleanup', function(){ + + var outFile = 'test/tmp'; + var httpAgent, readStream, writeStream + + var file = 'ubuntu-21.04-desktop-amd64.iso', + url = 'https://releases.ubuntu.com/21.04/' + file; + + function getActiveSockets() { + return Object.keys(httpAgent.sockets).length + } + + before(function() { + httpAgent = new https.Agent({ + keepAlive : true, + maxSockets : 1 + }); + }) + + after(function() { + httpAgent.destroy() + fs.unlinkSync(outFile); + }) + + it('should cleanup sockets on ERR_STREAM_PREMATURE_CLOSE (using .pipe)', function(done) { + getActiveSockets().should.eql(0); + + var resp = needle.get(url, { agent: httpAgent }); + var writable = fs.createWriteStream(outFile); + resp.pipe(writable); + + writable.on('close', function(e) { + if (!resp.done) resp.abort(); + }) + + setTimeout(function() { + getActiveSockets().should.eql(1); + writable.destroy(); + }, 50); + + setTimeout(function() { + getActiveSockets().should.eql(0); + done(); + }, 500); // takes a bit + }) + + it('should cleanup sockets on ERR_STREAM_PREMATURE_CLOSE (using stream.pipeline)', function(done) { + if (!stream.pipeline) + return done() + + getActiveSockets().should.eql(0); + + var resp = needle.get(url, { agent: httpAgent }); + var writable = fs.createWriteStream(outFile); + + stream.pipeline(resp, writable, function(err) { + err.code.should.eql('ERR_STREAM_PREMATURE_CLOSE') + // if (err) resp.request.destroy(); + }); + + setTimeout(function() { + getActiveSockets().should.eql(1); + writable.destroy(); + }, 50); + + setTimeout(function() { + getActiveSockets().should.eql(0); + done(); + }, 1000); // takes a bit + + }) + +}) \ No newline at end of file