diff --git a/index.js b/index.js index 77329e4..6eadcaa 100644 --- a/index.js +++ b/index.js @@ -1,105 +1,105 @@ -var Transform = require('readable-stream').Transform; -var inherits = require('inherits'); -var cyclist = require('cyclist'); -var util = require('util'); - -var ParallelTransform = function(maxParallel, opts, ontransform) { - if (!(this instanceof ParallelTransform)) return new ParallelTransform(maxParallel, opts, ontransform); +const {Transform} = require('stream'); +const cyclist = require('cyclist'); + +class ParallelTransform extends Transform { + constructor(maxParallel, opts, ontransform) { + if (typeof maxParallel === 'function') { + ontransform = maxParallel; + opts = null; + maxParallel = 1; + } + if (typeof opts === 'function') { + ontransform = opts; + opts = null; + } - if (typeof maxParallel === 'function') { - ontransform = maxParallel; - opts = null; - maxParallel = 1; + if (!opts) opts = {}; + if (!opts.highWaterMark) opts.highWaterMark = Math.max(maxParallel, 16); + if (opts.objectMode !== false) opts.objectMode = true; + + super(opts); + + this._maxParallel = maxParallel; + this._ontransform = ontransform; + this._destroyed = false; + this._flushed = false; + this._ordered = opts.ordered !== false; + this._buffer = this._ordered ? cyclist(maxParallel) : []; + this._top = 0; + this._bottom = 0; + this._ondrain = null; } - if (typeof opts === 'function') { - ontransform = opts; - opts = null; - } - - if (!opts) opts = {}; - if (!opts.highWaterMark) opts.highWaterMark = Math.max(maxParallel, 16); - if (opts.objectMode !== false) opts.objectMode = true; - - Transform.call(this, opts); - this._maxParallel = maxParallel; - this._ontransform = ontransform; - this._destroyed = false; - this._flushed = false; - this._ordered = opts.ordered !== false; - this._buffer = this._ordered ? cyclist(maxParallel) : []; - this._top = 0; - this._bottom = 0; - this._ondrain = null; -}; - -inherits(ParallelTransform, Transform); + destroy() { + if (this._destroyed) return; + this._destroyed = true; + this.emit('close'); + } -ParallelTransform.prototype.destroy = function() { - if (this._destroyed) return; - this._destroyed = true; - this.emit('close'); -}; + _transform (chunk, enc, callback) { + const self = this; + const pos = this._top++; + + this._ontransform(chunk, function(err, data) { + if (self._destroyed) return; + if (err) { + self.emit('error', err); + self.push(null); + self.destroy(); + return; + } + if (self._ordered) { + self._buffer.put(pos, (data === undefined || data === null) ? null : data); + } + else { + self._buffer.push(data); + } + self._drain(); + }); + + if (this._top - this._bottom < this._maxParallel) return callback(); + this._ondrain = callback; + } -ParallelTransform.prototype._transform = function(chunk, enc, callback) { - var self = this; - var pos = this._top++; + _flush (callback) { + this._flushed = true; + this._ondrain = callback; + this._drain(); + } - this._ontransform(chunk, function(err, data) { - if (self._destroyed) return; - if (err) { - self.emit('error', err); - self.push(null); - self.destroy(); - return; - } - if (self._ordered) { - self._buffer.put(pos, (data === undefined || data === null) ? null : data); + _drain () { + let data; + if (this._ordered) { + while (this._buffer.get(this._bottom) !== undefined) { + data = this._buffer.del(this._bottom++); + if (data === null) continue; + this.push(data); + } } else { - self._buffer.push(data); + while (this._buffer.length > 0) { + data = this._buffer.pop(); + this._bottom++; + if (data === null) continue; + this.push(data); + } } - self._drain(); - }); - - if (this._top - this._bottom < this._maxParallel) return callback(); - this._ondrain = callback; -}; -ParallelTransform.prototype._flush = function(callback) { - this._flushed = true; - this._ondrain = callback; - this._drain(); -}; + if (!this._drained() || !this._ondrain) return; -ParallelTransform.prototype._drain = function() { - if (this._ordered) { - while (this._buffer.get(this._bottom) !== undefined) { - var data = this._buffer.del(this._bottom++); - if (data === null) continue; - this.push(data); - } + const ondrain = this._ondrain; + this._ondrain = null; + ondrain(); } - else { - while (this._buffer.length > 0) { - var data = this._buffer.pop(); - this._bottom++; - if (data === null) continue; - this.push(data); - } - } - - if (!this._drained() || !this._ondrain) return; - - var ondrain = this._ondrain; - this._ondrain = null; - ondrain(); -}; + _drained () { + let diff = this._top - this._bottom; + return this._flushed ? !diff : diff < this._maxParallel; + }; +} -ParallelTransform.prototype._drained = function() { - var diff = this._top - this._bottom; - return this._flushed ? !diff : diff < this._maxParallel; +module.exports = function parallelTransform(maxParallel, opts, ontransform) { + return new ParallelTransform(maxParallel, opts, ontransform); }; -module.exports = ParallelTransform; +module.exports.ParallelTransform = ParallelTransform; diff --git a/package.json b/package.json index 5c3c570..605ad44 100644 --- a/package.json +++ b/package.json @@ -13,8 +13,6 @@ ], "author": "Mathias Buus Madsen ", "dependencies": { - "cyclist": "~0.2.2", - "inherits": "^2.0.3", - "readable-stream": "^2.1.5" + "cyclist": "~0.2.2" } }