Skip to content

Commit 3ab4e8a

Browse files
authored
perf(cluster): improve the performance of calculating slots (#323)
1 parent 1caeb51 commit 3ab4e8a

File tree

8 files changed

+58
-153
lines changed

8 files changed

+58
-153
lines changed

lib/command.js

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ var Promise = require('bluebird');
55
var fbuffer = require('flexbuffer');
66
var utils = require('./utils');
77
var commands = require('redis-commands');
8+
var calculateSlot = require('cluster-key-slot');
89

910
/**
1011
* Command instance
@@ -84,7 +85,7 @@ Command.prototype.getSlot = function () {
8485
if (typeof this._slot === 'undefined') {
8586
var key = this.getKeys()[0];
8687
if (key) {
87-
this.slot = utils.calcSlot(key);
88+
this.slot = calculateSlot(key);
8889
} else {
8990
this.slot = null;
9091
}

lib/pipeline.js

+27-21
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ var Commander = require('./commander');
44
var Command = require('./command');
55
var fbuffer = require('flexbuffer');
66
var Promise = require('bluebird');
7-
var utils = require('./utils');
87
var util = require('util');
98
var commands = require('redis-commands');
9+
var calculateSlot = require('cluster-key-slot');
1010

1111
function Pipeline(redis) {
1212
Commander.call(this);
@@ -210,25 +210,33 @@ Pipeline.prototype.exec = function (callback) {
210210
if (_.isEmpty(this._queue)) {
211211
this.resolve([]);
212212
}
213-
var pipelineSlot;
214-
// Check whether scripts exists and get a sampleKey.
215-
var scripts = [];
216-
for (var i = 0; i < this._queue.length; ++i) {
217-
var item = this._queue[i];
218-
if (this.isCluster) {
219-
var keys = item.getKeys();
220-
for (var j = 0; j < keys.length; ++j) {
221-
var slot = utils.calcSlot(keys[j]);
222-
if (typeof pipelineSlot === 'undefined') {
223-
pipelineSlot = slot;
224-
}
225-
if (pipelineSlot !== slot) {
226-
this.reject(new Error('All keys in the pipeline should belong to the same slot(expect "' +
227-
keys[j] + '" belongs to slot ' + pipelineSlot + ').'));
228-
return this.promise;
229-
}
213+
var pipelineSlot, i;
214+
if (this.isCluster) {
215+
// List of the first key for each command
216+
var sampleKeys = [];
217+
for (i = 0; i < this._queue.length; i++) {
218+
var keys = this._queue[i].getKeys();
219+
if (keys.length) {
220+
sampleKeys.push(keys[0]);
221+
}
222+
}
223+
224+
if (sampleKeys.length) {
225+
pipelineSlot = calculateSlot.generateMulti(sampleKeys);
226+
if (pipelineSlot < 0) {
227+
this.reject(new Error('All keys in the pipeline should belong to the same slot'));
228+
return this.promise;
230229
}
230+
} else {
231+
// Send the pipeline to a random node
232+
pipelineSlot = Math.random() * 16384 | 0;
231233
}
234+
}
235+
236+
// Check whether scripts exists
237+
var scripts = [];
238+
for (i = 0; i < this._queue.length; ++i) {
239+
var item = this._queue[i];
232240
if (this.isCluster && item.isCustomCommand) {
233241
this.reject(new Error('Sending custom commands in pipeline is not supported in Cluster mode.'));
234242
return this.promise;
@@ -242,14 +250,12 @@ Pipeline.prototype.exec = function (callback) {
242250
}
243251
scripts.push(script);
244252
}
245-
if (this.isCluster && typeof pipelineSlot === 'undefined') {
246-
pipelineSlot = Math.random() * 16384 | 0;
247-
}
248253

249254
var _this = this;
250255
if (!scripts.length) {
251256
return execPipeline();
252257
}
258+
253259
return this.redis.script('exists', scripts.map(function (item) {
254260
return item.sha;
255261
})).then(function (results) {

lib/utils/crc.js

-91
This file was deleted.

lib/utils/index.js

-18
Original file line numberDiff line numberDiff line change
@@ -213,24 +213,6 @@ exports.toArg = function (arg) {
213213
return String(arg);
214214
};
215215

216-
var crc16 = require('./crc');
217-
/**
218-
* Calculate slot by key
219-
*
220-
* @param {string} key
221-
* @return {number}
222-
*/
223-
exports.calcSlot = function (key) {
224-
var s = key.indexOf('{');
225-
if (s !== -1) {
226-
var e = key.indexOf('}', s + 2);
227-
if (e !== -1) {
228-
key = key.slice(s + 1, e);
229-
}
230-
}
231-
return crc16(key) & 16383;
232-
};
233-
234216
/**
235217
* Optimize error stack
236218
*

package.json

+1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
"license": "MIT",
2424
"dependencies": {
2525
"bluebird": "^3.3.4",
26+
"cluster-key-slot": "^1.0.5",
2627
"debug": "^2.2.0",
2728
"double-ended-queue": "^2.1.0-0",
2829
"flexbuffer": "0.0.6",

test/functional/cluster.js

+11-10
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
'use strict';
22

33
var utils = require('../../lib/utils');
4+
var calculateSlot = require('cluster-key-slot');
45
var Promise = require('bluebird');
56

67
describe('cluster', function () {
@@ -325,7 +326,7 @@ describe('cluster', function () {
325326
if (argv[0] === 'get' && argv[1] === 'foo') {
326327
expect(moved).to.eql(false);
327328
moved = true;
328-
return new Error('MOVED ' + utils.calcSlot('foo') + ' 127.0.0.1:30001');
329+
return new Error('MOVED ' + calculateSlot('foo') + ' 127.0.0.1:30001');
329330
}
330331
});
331332

@@ -345,7 +346,7 @@ describe('cluster', function () {
345346
];
346347
}
347348
if (argv[0] === 'get' && argv[1] === 'foo') {
348-
return new Error('MOVED ' + utils.calcSlot('foo') + ' 127.0.0.1:30002');
349+
return new Error('MOVED ' + calculateSlot('foo') + ' 127.0.0.1:30002');
349350
}
350351
});
351352
var node2 = new MockServer(30002, function (argv) {
@@ -397,7 +398,7 @@ describe('cluster', function () {
397398
if (argv[0] === 'get' && argv[1] === 'foo') {
398399
expect(moved).to.eql(false);
399400
moved = true;
400-
return new Error('MOVED ' + utils.calcSlot('foo') + ' 127.0.0.1:30001');
401+
return new Error('MOVED ' + calculateSlot('foo') + ' 127.0.0.1:30001');
401402
}
402403
});
403404

@@ -439,7 +440,7 @@ describe('cluster', function () {
439440
disconnect([node1, node2], done);
440441
});
441442
} else {
442-
return new Error('ASK ' + utils.calcSlot('foo') + ' 127.0.0.1:30001');
443+
return new Error('ASK ' + calculateSlot('foo') + ' 127.0.0.1:30001');
443444
}
444445
}
445446
});
@@ -530,7 +531,7 @@ describe('cluster', function () {
530531
];
531532
} else if (argv[0] === 'get' && argv[1] === 'foo') {
532533
redirectTimes += 1;
533-
return new Error('ASK ' + utils.calcSlot('foo') + ' 127.0.0.1:30001');
534+
return new Error('ASK ' + calculateSlot('foo') + ' 127.0.0.1:30001');
534535
}
535536
};
536537
var node1 = new MockServer(30001, argvHandler);
@@ -650,7 +651,7 @@ describe('cluster', function () {
650651
expect(moved).to.eql(false);
651652
moved = true;
652653
}
653-
return new Error('MOVED ' + utils.calcSlot('foo') + ' 127.0.0.1:30001');
654+
return new Error('MOVED ' + calculateSlot('foo') + ' 127.0.0.1:30001');
654655
}
655656
});
656657

@@ -693,7 +694,7 @@ describe('cluster', function () {
693694
return slotTable;
694695
}
695696
if (argv[1] === 'foo') {
696-
return new Error('ASK ' + utils.calcSlot('foo') + ' 127.0.0.1:30001');
697+
return new Error('ASK ' + calculateSlot('foo') + ' 127.0.0.1:30001');
697698
}
698699
});
699700

@@ -755,7 +756,7 @@ describe('cluster', function () {
755756
return slotTable;
756757
}
757758
if (argv[0] === 'get' && argv[1] === 'foo') {
758-
return new Error('MOVED ' + utils.calcSlot('foo') + ' 127.0.0.1:30001');
759+
return new Error('MOVED ' + calculateSlot('foo') + ' 127.0.0.1:30001');
759760
}
760761
});
761762

@@ -836,7 +837,7 @@ describe('cluster', function () {
836837
}
837838
if (argv[0] === 'get' && argv[1] === 'foo') {
838839
moved = true;
839-
return new Error('MOVED ' + utils.calcSlot('foo') + ' 127.0.0.1:30001');
840+
return new Error('MOVED ' + calculateSlot('foo') + ' 127.0.0.1:30001');
840841
}
841842
if (argv[0] === 'exec') {
842843
return new Error('EXECABORT Transaction discarded because of previous errors.');
@@ -889,7 +890,7 @@ describe('cluster', function () {
889890
return slotTable;
890891
}
891892
if (argv[0] === 'get' && argv[1] === 'foo') {
892-
return new Error('ASK ' + utils.calcSlot('foo') + ' 127.0.0.1:30001');
893+
return new Error('ASK ' + calculateSlot('foo') + ' 127.0.0.1:30001');
893894
}
894895
if (argv[0] === 'exec') {
895896
return new Error('EXECABORT Transaction discarded because of previous errors.');

test/unit/command.js

+16
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,22 @@ describe('Command', function () {
7474
});
7575
});
7676

77+
describe('#getSlot()', function () {
78+
it('should return correctly', function () {
79+
expectSlot('123', 5970);
80+
expectSlot('ab{c', 4619);
81+
expectSlot('ab{c}2', 7365);
82+
expectSlot('ab{{c}2', 2150);
83+
expectSlot('ab{qq}{c}2', 5598);
84+
expectSlot('ab}', 11817);
85+
expectSlot('encoding', 3060);
86+
87+
function expectSlot(key, slot) {
88+
expect(new Command('get', [key]).getSlot()).to.eql(slot);
89+
}
90+
});
91+
});
92+
7793
describe('.checkFlag()', function () {
7894
it('should return correct result', function () {
7995
expect(Command.checkFlag('VALID_IN_SUBSCRIBER_MODE', 'ping')).to.eql(true);

test/unit/utils.js

+1-12
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
'use strict';
22

33
var utils = require('../../lib/utils');
4+
45
describe('utils', function () {
56
describe('.bufferEqual', function () {
67
it('should return correctly', function () {
@@ -97,18 +98,6 @@ describe('utils', function () {
9798
});
9899
});
99100

100-
describe('.calcSlot', function () {
101-
it('should return correctly', function () {
102-
expect(utils.calcSlot('123')).to.eql(5970);
103-
expect(utils.calcSlot('ab{c')).to.eql(4619);
104-
expect(utils.calcSlot('ab{c}2')).to.eql(7365);
105-
expect(utils.calcSlot('ab{{c}2')).to.eql(2150);
106-
expect(utils.calcSlot('ab{qq}{c}2')).to.eql(5598);
107-
expect(utils.calcSlot('ab}')).to.eql(11817);
108-
expect(utils.calcSlot('encoding')).to.eql(3060);
109-
});
110-
});
111-
112101
describe('.toArg', function () {
113102
it('should return correctly', function () {
114103
expect(utils.toArg(null)).to.eql('');

0 commit comments

Comments
 (0)