Skip to content

Commit 6795b1e

Browse files
author
Ramon Snir
committed
feat(cluster): add the option for a custom node selector in scaleReads
#250
1 parent 95c6857 commit 6795b1e

File tree

3 files changed

+80
-14
lines changed

3 files changed

+80
-14
lines changed

README.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -671,10 +671,11 @@ but a few so that if one is unreachable the client will try the next one, and th
671671

672672
A typical redis cluster contains three or more masters and several slaves for each master. It's possible to scale out redis cluster by sending read queries to slaves and write queries to masters by setting the `scaleReads` option.
673673

674-
`scaleReads` is "master" by default, which means ioredis will never send any queries to slaves. There are other two available options:
674+
`scaleReads` is "master" by default, which means ioredis will never send any queries to slaves. There are other three available options:
675675

676676
1. "all": Send write queries to masters and read queries to masters or slaves randomly.
677677
2. "slave": Send write queries to masters and read queries to slaves.
678+
3. a custom `function(nodes, command): node`: Will choose the custom function to select to which node to send read queries (write queries keep being sent to master). The first node in `nodes` is always the master serving the relevant slots. If the function returns an array of nodes, a random node of that list will be selected.
678679

679680
For example:
680681

lib/cluster/index.js

+25-9
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,10 @@ function Cluster(startupNodes, options) {
4040
this.options = _.defaults(this.options, options, Cluster.defaultOptions);
4141

4242
// validate options
43-
if (['all', 'master', 'slave'].indexOf(this.options.scaleReads) === -1) {
43+
if (typeof this.options.scaleReads !== 'function' &&
44+
['all', 'master', 'slave'].indexOf(this.options.scaleReads) === -1) {
4445
throw new Error('Invalid option scaleReads "' + this.options.scaleReads +
45-
'". Expected "all", "master" or "slave"');
46+
'". Expected "all", "master", "slave" or a custom function');
4647
}
4748

4849
if (!Array.isArray(startupNodes) || startupNodes.length === 0) {
@@ -453,15 +454,30 @@ Cluster.prototype.sendCommand = function (command, stream, node) {
453454
if (!random) {
454455
if (typeof targetSlot === 'number' && _this.slots[targetSlot]) {
455456
var nodeKeys = _this.slots[targetSlot];
456-
var key;
457-
if (to === 'all') {
458-
key = utils.sample(nodeKeys);
459-
} else if (to === 'slave' && nodeKeys.length > 1) {
460-
key = utils.sample(nodeKeys, 1);
457+
if (typeof to === 'function') {
458+
var nodes =
459+
nodeKeys
460+
.map(function(key) {
461+
return _this.connectionPool.nodes.all[key];
462+
});
463+
redis = to(nodes, command);
464+
if (Array.isArray(redis)) {
465+
redis = utils.sample(redis);
466+
}
467+
if (!redis) {
468+
redis = nodes[0];
469+
}
461470
} else {
462-
key = nodeKeys[0];
471+
var key;
472+
if (to === 'all') {
473+
key = utils.sample(nodeKeys);
474+
} else if (to === 'slave' && nodeKeys.length > 1) {
475+
key = utils.sample(nodeKeys, 1);
476+
} else {
477+
key = nodeKeys[0];
478+
}
479+
redis = _this.connectionPool.nodes.all[key];
463480
}
464-
redis = _this.connectionPool.nodes.all[key];
465481
}
466482
if (asking) {
467483
redis = _this.connectionPool.nodes.all[asking];

test/functional/cluster.js

+53-4
Original file line numberDiff line numberDiff line change
@@ -940,7 +940,7 @@ describe('cluster', function () {
940940
function handler(port, argv) {
941941
if (argv[0] === 'cluster' && argv[1] === 'slots') {
942942
return [
943-
[0, 16381, ['127.0.0.1', 30001], ['127.0.0.1', 30003]],
943+
[0, 16381, ['127.0.0.1', 30001], ['127.0.0.1', 30003], ['127.0.0.1', 30004]],
944944
[16382, 16383, ['127.0.0.1', 30002]]
945945
];
946946
}
@@ -949,10 +949,11 @@ describe('cluster', function () {
949949
this.node1 = new MockServer(30001, handler.bind(null, 30001));
950950
this.node2 = new MockServer(30002, handler.bind(null, 30002));
951951
this.node3 = new MockServer(30003, handler.bind(null, 30003));
952+
this.node4 = new MockServer(30004, handler.bind(null, 30004));
952953
});
953954

954955
afterEach(function (done) {
955-
disconnect([this.node1, this.node2, this.node3], done);
956+
disconnect([this.node1, this.node2, this.node3, this.node4], done);
956957
});
957958

958959
context('master', function () {
@@ -977,7 +978,7 @@ describe('cluster', function () {
977978
});
978979
cluster.on('ready', function () {
979980
stub(utils, 'sample', function (array, from) {
980-
expect(array).to.eql(['127.0.0.1:30001', '127.0.0.1:30003']);
981+
expect(array).to.eql(['127.0.0.1:30001', '127.0.0.1:30003', '127.0.0.1:30004']);
981982
expect(from).to.eql(1);
982983
return '127.0.0.1:30003';
983984
});
@@ -1006,14 +1007,62 @@ describe('cluster', function () {
10061007
});
10071008
});
10081009

1010+
context('custom', function () {
1011+
it('should send to selected slave', function (done) {
1012+
var cluster = new Redis.Cluster([{ host: '127.0.0.1', port: '30001' }], {
1013+
scaleReads: function(node, command) {
1014+
if (command.name === 'get') {
1015+
return node[1];
1016+
} else {
1017+
return node[2];
1018+
}
1019+
}
1020+
});
1021+
cluster.on('ready', function () {
1022+
stub(utils, 'sample', function (array, from) {
1023+
expect(array).to.eql(['127.0.0.1:30001', '127.0.0.1:30003', '127.0.0.1:30004']);
1024+
expect(from).to.eql(1);
1025+
return '127.0.0.1:30003';
1026+
});
1027+
cluster.hgetall('foo', function (err, res) {
1028+
utils.sample.restore();
1029+
expect(res).to.eql(30004);
1030+
cluster.disconnect();
1031+
done();
1032+
});
1033+
});
1034+
});
1035+
1036+
it('should send writes to masters', function (done) {
1037+
var cluster = new Redis.Cluster([{ host: '127.0.0.1', port: '30001' }], {
1038+
scaleReads: function(node, command) {
1039+
if (command.name === 'get') {
1040+
return node[1];
1041+
} else {
1042+
return node[2];
1043+
}
1044+
}
1045+
});
1046+
cluster.on('ready', function () {
1047+
stub(utils, 'sample').throws('sample is called');
1048+
cluster.set('foo', 'bar', function (err, res) {
1049+
utils.sample.restore();
1050+
expect(res).to.eql(30001);
1051+
cluster.disconnect();
1052+
done();
1053+
});
1054+
});
1055+
});
1056+
});
1057+
10091058
context('all', function () {
10101059
it('should send reads to all nodes randomly', function (done) {
10111060
var cluster = new Redis.Cluster([{ host: '127.0.0.1', port: '30001' }], {
10121061
scaleReads: 'all'
10131062
});
10141063
cluster.on('ready', function () {
10151064
stub(utils, 'sample', function (array, from) {
1016-
expect(array).to.eql(['127.0.0.1:30001', '127.0.0.1:30003']);
1065+
expect(array).to.eql(['127.0.0.1:30001', '127.0.0.1:30003', '127.0.0.1:30004']);
10171066
expect(from).to.eql(undefined);
10181067
return '127.0.0.1:30003';
10191068
});

0 commit comments

Comments
 (0)