|
| 1 | +'use strict'; |
| 2 | + |
| 3 | +var EventEmitter = require('events').EventEmitter |
| 4 | + , retry = require('retry'); |
| 5 | + |
| 6 | +/** |
| 7 | + * A net.Stream connection pool. |
| 8 | + * |
| 9 | + * @constructor |
| 10 | + * @param {Number} limit size of the connection pool |
| 11 | + * @param {Function} builder stream factory |
| 12 | + * @api public |
| 13 | + */ |
| 14 | + |
| 15 | +function Manager(limit, builder) { |
| 16 | + this.limit = +limit || 20; // defaults to 20 connections max |
| 17 | + this.pool = []; |
| 18 | + this.pending = 0; |
| 19 | + this.generator = null; |
| 20 | + this.retries = 5; |
| 21 | + |
| 22 | + // some stats that can be used for metrics |
| 23 | + this.metrics = { |
| 24 | + allocations: 0 |
| 25 | + , releases: 0 |
| 26 | + }; |
| 27 | + |
| 28 | + if (builder) this.factory(builder); |
| 29 | + EventEmitter.call(this); |
| 30 | +} |
| 31 | + |
| 32 | +Manager.prototype = new EventEmitter(); |
| 33 | +Manager.prototype.constructor = Manager; |
| 34 | + |
| 35 | +/** |
| 36 | + * Add a stream generator so we can generate streams for the pool. |
| 37 | + * |
| 38 | + * @param {Function} builder |
| 39 | + * @api public |
| 40 | + */ |
| 41 | + |
| 42 | +Manager.prototype.factory = function factory(builder) { |
| 43 | + if (typeof builder !== 'function') { |
| 44 | + throw new Error('The #factory requires a function'); |
| 45 | + } |
| 46 | + |
| 47 | + this.generator = builder; |
| 48 | +}; |
| 49 | + |
| 50 | +/** |
| 51 | + * Start listening to events that could influence the state of the connection. |
| 52 | + * |
| 53 | + * @param {net.Connection} net |
| 54 | + * @api private |
| 55 | + */ |
| 56 | + |
| 57 | +Manager.prototype.listen = function listen(net) { |
| 58 | + if (!net) return; |
| 59 | + |
| 60 | + var self = this; |
| 61 | + |
| 62 | + /** |
| 63 | + * Simple helper function that allows us to automatically remove the |
| 64 | + * connection from the pool when we are unable to connect using it. |
| 65 | + * |
| 66 | + * @param {Error} err optional error |
| 67 | + * @api private |
| 68 | + */ |
| 69 | + |
| 70 | + function regenerate(err) { |
| 71 | + net.destroySoon(); |
| 72 | + |
| 73 | + self.remove(net); |
| 74 | + net.removeListener('error', regenerate); |
| 75 | + net.removeListener('end', regenerate); |
| 76 | + |
| 77 | + if (err) self.emit('error', err); |
| 78 | + } |
| 79 | + |
| 80 | + // listen for events that would mess up the connection |
| 81 | + net.on('error', regenerate) |
| 82 | + .on('end', regenerate); |
| 83 | +}; |
| 84 | + |
| 85 | +/** |
| 86 | + * A fault tolerant connection allocation wrapper. |
| 87 | + * |
| 88 | + * @param {Function} fn |
| 89 | + * @api private |
| 90 | + */ |
| 91 | + |
| 92 | +Manager.prototype.pull = function pull(fn) { |
| 93 | + var operation = retry.operation({ |
| 94 | + retries: this.retries |
| 95 | + , factor: 3 |
| 96 | + , minTimeout: 1 * 1000 |
| 97 | + , maxTimeout: 60 * 1000 |
| 98 | + , randomize: true |
| 99 | + }) |
| 100 | + , self = this; |
| 101 | + |
| 102 | + /** |
| 103 | + * Small wrapper around pulling a connection |
| 104 | + * |
| 105 | + * @param {Error} err |
| 106 | + * @api private |
| 107 | + */ |
| 108 | + |
| 109 | + function allocate(err) { |
| 110 | + if (operation.retry(err)) return; |
| 111 | + |
| 112 | + fn.apply(fn, arguments); |
| 113 | + } |
| 114 | + |
| 115 | + operation.attempt(function attempt() { |
| 116 | + self.allocate(allocate); |
| 117 | + }); |
| 118 | +}; |
| 119 | + |
| 120 | +/** |
| 121 | + * Allocate a new connection from the connection pool, this can be done async |
| 122 | + * that's why we use a error first callback pattern. |
| 123 | + * |
| 124 | + * @param {Function} fn |
| 125 | + * @api public |
| 126 | + */ |
| 127 | + |
| 128 | +Manager.prototype.allocate = function allocate(fn) { |
| 129 | + if (!this.generator) return fn(new Error('Specify a stream #factory')); |
| 130 | + |
| 131 | + /** |
| 132 | + * Small helper function that allows us to correctly call the callback with |
| 133 | + * the correct arguments when we generate a new connection as the connection |
| 134 | + * should be emitting 'connect' befor we can use it. But it can also emit |
| 135 | + * error if it fails to connect. |
| 136 | + * |
| 137 | + * @param {Error} err |
| 138 | + * @api private |
| 139 | + */ |
| 140 | + |
| 141 | + function either(err) { |
| 142 | + this.removeListener('error', either); |
| 143 | + this.removeListener('connect', either); |
| 144 | + |
| 145 | + // add to the pool |
| 146 | + self.pool.push(this); |
| 147 | + self.pending--; |
| 148 | + |
| 149 | + fn(err, this); |
| 150 | + } |
| 151 | + |
| 152 | + var probabilities = [] |
| 153 | + , self = this |
| 154 | + , total, i, probability, connection; |
| 155 | + |
| 156 | + i = total = this.pool.length; |
| 157 | + |
| 158 | + // increase the allocation metric |
| 159 | + this.metrics.allocations++; |
| 160 | + |
| 161 | + // check the current pool if we already have a few connections available, so |
| 162 | + // we don't have to generate a new connection |
| 163 | + while (i--) { |
| 164 | + connection = this.pool[i]; |
| 165 | + probability = this.isAvailable(connection); |
| 166 | + |
| 167 | + // we are sure this connection works |
| 168 | + if (probability === 100) return fn(undefined, connection); |
| 169 | + |
| 170 | + // no accurate match, add it to the queue as we can get the most likely |
| 171 | + // available connection |
| 172 | + probabilities.push({ |
| 173 | + probability: probability |
| 174 | + , connection: connection |
| 175 | + }); |
| 176 | + } |
| 177 | + |
| 178 | + // we didn't find a confident match, see if we are allowed to generate a fresh |
| 179 | + // connection |
| 180 | + if ((this.pool.length + this.pending) < this.limit) { |
| 181 | + // determin if the function expects a callback or not, this can be done by |
| 182 | + // checking the length of the given function, as the amount of args accepted |
| 183 | + // equals the length.. |
| 184 | + if (this.generator.length === 0) { |
| 185 | + connection = this.generator(); |
| 186 | + |
| 187 | + if (connection) { |
| 188 | + this.pending++; |
| 189 | + this.listen(connection); |
| 190 | + return connection.on('error', either).on('connect', either); |
| 191 | + } |
| 192 | + } else { |
| 193 | + return this.generator(function generate(err, connection) { |
| 194 | + if (err) return fn(err); |
| 195 | + if (!connection) return fn(new Error('The #factory failed to generate a stream')); |
| 196 | + |
| 197 | + self.pending++; |
| 198 | + self.listen(connection); |
| 199 | + return connection.on('error', either).on('connect', either); |
| 200 | + }); |
| 201 | + } |
| 202 | + } |
| 203 | + |
| 204 | + // o, dear, we got issues.. we didn't find a valid connection and we cannot |
| 205 | + // create more.. so we are going to check if we might have semi valid |
| 206 | + // connection by sorting the probabilities array and see if it has |
| 207 | + // a probability above 60 |
| 208 | + probability = probabilities.sort(function sort(a, b) { |
| 209 | + return a.probability - b.probability; |
| 210 | + }).pop(); |
| 211 | + |
| 212 | + if (probability && probability.probability >= 60) { |
| 213 | + return fn(undefined, probability.connection); |
| 214 | + } |
| 215 | + |
| 216 | + // well, that didn't work out, so assume failure |
| 217 | + fn(new Error('The connection pool is full')); |
| 218 | +}; |
| 219 | + |
| 220 | +/** |
| 221 | + * Check if a connection is available for writing. |
| 222 | + * |
| 223 | + * @param {net.Connection} net |
| 224 | + * @param {Boolean} ignore ignore closed or dead connections |
| 225 | + * @returns {Number} probability that his connection is available or will be |
| 226 | + * @api private |
| 227 | + */ |
| 228 | + |
| 229 | +Manager.prototype.isAvailable = function isAvailable(net, ignore) { |
| 230 | + var readyState = net.readyState |
| 231 | + , writable = readyState === 'open' || readyState === 'writeOnly' |
| 232 | + , writePending = net._pendingWriteReqs || 0 |
| 233 | + , writeQueue = net._writeQueue || [] |
| 234 | + , writes = writeQueue.length || writePending; |
| 235 | + |
| 236 | + // if the stream is writable and we don't have anything pending we are 100% |
| 237 | + // sure that this stream is available for writing |
| 238 | + if (writable && writes === 0) return 100; |
| 239 | + |
| 240 | + // the connection is already closed or has been destroyed, why on earth are we |
| 241 | + // getting it then, remove it from the pool and return 0 |
| 242 | + if (readyState === 'closed' || net.destroyed) { |
| 243 | + this.remove(net); |
| 244 | + return 0; |
| 245 | + } |
| 246 | + |
| 247 | + // if the stream isn't writable we aren't that sure.. |
| 248 | + if (!writable) return 0; |
| 249 | + |
| 250 | + // the connection is still opening, so we can write to it in the future |
| 251 | + if (readyState === 'opening') return 70; |
| 252 | + |
| 253 | + // we have some writes, so we are going to substract that amount from our 100 |
| 254 | + if (writes < 100) return 100 - writes; |
| 255 | + |
| 256 | + // we didn't find any relaiable states of the stream, so we are going to |
| 257 | + // assume something random, because we have no clue, so generate a random |
| 258 | + // number between 0 - 70 |
| 259 | + return Math.floor(Math.random() * 50); |
| 260 | +}; |
| 261 | + |
| 262 | +/** |
| 263 | + * Release the connection from the connection pool. |
| 264 | + * |
| 265 | + * @param {Stream} net |
| 266 | + * @param {Boolean} hard destroySoon or destroy |
| 267 | + * @returns {Boolean} was the removal successful |
| 268 | + * @api private |
| 269 | + */ |
| 270 | + |
| 271 | +Manager.prototype.release = function release(net, hard) { |
| 272 | + var index = this.pool.indexOf(net); |
| 273 | + |
| 274 | + // no match |
| 275 | + if (index === -1) return false; |
| 276 | + |
| 277 | + // check if the stream is still open |
| 278 | + if (net) { |
| 279 | + if (!hard) net.destroySoon(); |
| 280 | + else net.destroy(); |
| 281 | + |
| 282 | + // remove it from the pool |
| 283 | + this.pool.splice(net, 1); |
| 284 | + |
| 285 | + // increase the releases metric |
| 286 | + this.metrics.releases++; |
| 287 | + } |
| 288 | + |
| 289 | + return true; |
| 290 | +}; |
| 291 | + |
| 292 | +// alias remove to release |
| 293 | +Manager.prototype.remove = Manager.prototype.release; |
| 294 | + |
| 295 | +/** |
| 296 | + * Free dead connections from the pool. |
| 297 | + * |
| 298 | + * @param {Number} keep the amount of connection to keep open |
| 299 | + * @param {Boolean} hard destroy all connections instead of destroySoon |
| 300 | + * @api public |
| 301 | + */ |
| 302 | + |
| 303 | +Manager.prototype.free = function free(keep, hard) { |
| 304 | + // default to 0 if no arguments are supplied |
| 305 | + keep = +keep || 0; |
| 306 | + |
| 307 | + // create a back-up of the pool as we will be removing items from the array |
| 308 | + // and this could cause memory / socket leaks as we are unable to close some |
| 309 | + // connections in the array as the index has moved. |
| 310 | + var pool = this.pool.slice(0) |
| 311 | + , saved = 0; |
| 312 | + |
| 313 | + for (var i = 0, length = pool.length; i < length; i++) { |
| 314 | + var connection = pool[i] |
| 315 | + , probability = this.isAvailable(connection); |
| 316 | + |
| 317 | + // this is still a healthy connection, so try we probably just want to keep it |
| 318 | + if (keep && saved < keep && probability === 100) { |
| 319 | + saved++; |
| 320 | + continue; |
| 321 | + } |
| 322 | + |
| 323 | + this.release(connection, hard); |
| 324 | + } |
| 325 | + |
| 326 | + // clear the back-up |
| 327 | + pool.length = 0; |
| 328 | + |
| 329 | + // see how much connections are still available |
| 330 | + this.emit('free', saved, this.pool.length); |
| 331 | +}; |
| 332 | + |
| 333 | +/** |
| 334 | + * Close the connection pool. |
| 335 | + * |
| 336 | + * @param {Boolean} hard destroy all connections |
| 337 | + * @api public |
| 338 | + */ |
| 339 | + |
| 340 | +Manager.prototype.end = function end(hard) { |
| 341 | + this.free(0, hard); |
| 342 | + |
| 343 | + return this.emit('end'); |
| 344 | +}; |
| 345 | + |
| 346 | +module.exports = Manager; |
0 commit comments