Skip to content

Commit 3702d67

Browse files
authored
feat(cluster): add NAT support (#758)
Closes #693, #365
1 parent ab63994 commit 3702d67

File tree

6 files changed

+189
-7
lines changed

6 files changed

+189
-7
lines changed

README.md

+21
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ used in the world's biggest online commerce company [Alibaba](http://www.alibaba
2929
0. Support for ES6 types, such as `Map` and `Set`.
3030
0. Support for GEO commands (Redis 3.2 Unstable).
3131
0. Sophisticated error handling strategy.
32+
0. Support for NAT mapping.
3233

3334
# Links
3435
* [API Documentation](API.md)
@@ -836,6 +837,26 @@ Promise.all(masters.map(function (node) {
836837
});
837838
```
838839

840+
### NAT Mapping
841+
Sometimes the cluster is hosted within a internal network that can only be accessed via a NAT (Network Address Translation) instance. See [Accessing ElastiCache from outside AWS](https://docs.aws.amazon.com/AmazonElastiCache/latest/red-ug/accessing-elasticache.html) as an example.
842+
843+
You can specify nat mapping rules via `natMap` option:
844+
845+
```javascript
846+
const cluster = new Redis.Cluster([{
847+
host: '203.0.113.73',
848+
port: 30001
849+
}], {
850+
natMap: {
851+
'10.0.1.230:30001': {host: '203.0.113.73', port: 30001},
852+
'10.0.1.231:30001': {host: '203.0.113.73', port: 30002},
853+
'10.0.1.232:30001': {host: '203.0.113.73', port: 30003}
854+
}
855+
})
856+
```
857+
858+
This option is also useful when the cluster is running inside a Docker container.
859+
839860
### Transaction and pipeline in Cluster mode
840861
Almost all features that are supported by `Redis` are also supported by `Redis.Cluster`, e.g. custom commands, transaction and pipeline.
841862
However there are some differences when using transaction and pipeline in Cluster mode:

lib/cluster/ClusterOptions.ts

+2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import {NodeRole} from './util'
22
import {lookup} from 'dns'
33

44
export type DNSLookupFunction = (hostname: string, callback: (err: NodeJS.ErrnoException, address: string, family: number) => void) => void
5+
export type NatMap = {[key: string]: {host: string, port: number}}
56

67
/**
78
* Options for Cluster constructor
@@ -116,6 +117,7 @@ export interface IClusterOptions {
116117
* @default require('dns').lookup
117118
*/
118119
dnsLookup?: DNSLookupFunction
120+
natMap?: NatMap
119121
}
120122

121123
export const DEFAULT_CLUSTER_OPTIONS: IClusterOptions = {

lib/cluster/index.ts

+18-7
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import {EventEmitter} from 'events'
22
import ClusterAllFailedError from '../errors/ClusterAllFailedError'
33
import {defaults, noop} from '../utils/lodash'
44
import ConnectionPool from './ConnectionPool'
5-
import {NodeKey, IRedisOptions, normalizeNodeOptions, NodeRole, getUniqueHostnamesFromOptions} from './util'
5+
import {NodeKey, IRedisOptions, normalizeNodeOptions, NodeRole, getUniqueHostnamesFromOptions, nodeKeyToRedisOptions} from './util'
66
import ClusterSubscriber from './ClusterSubscriber'
77
import DelayQueue from './DelayQueue'
88
import ScanStream from '../ScanStream'
@@ -416,6 +416,18 @@ class Cluster extends EventEmitter {
416416
}
417417
}
418418

419+
natMapper(nodeKey: NodeKey | IRedisOptions): IRedisOptions {
420+
if (this.options.natMap && typeof this.options.natMap === 'object') {
421+
const key = typeof nodeKey === 'string' ? nodeKey : `${nodeKey.host}:${nodeKey.port}`
422+
const mapped = this.options.natMap[key]
423+
if (mapped) {
424+
debug('NAT mapping %s -> %O', key, mapped)
425+
return mapped
426+
}
427+
}
428+
return typeof nodeKey === 'string' ? nodeKeyToRedisOptions(nodeKey) : nodeKey
429+
}
430+
419431
sendCommand(command, stream, node) {
420432
if (this.status === 'wait') {
421433
this.connect().catch(noop)
@@ -449,16 +461,15 @@ class Cluster extends EventEmitter {
449461
} else {
450462
_this.slots[slot] = [key]
451463
}
452-
const splitKey = key.split(':')
453-
_this.connectionPool.findOrCreate({host: splitKey[0], port: Number(splitKey[1])})
464+
_this.connectionPool.findOrCreate(_this.natMapper(key))
454465
tryConnection()
455466
_this.refreshSlotsCache()
456467
},
457468
ask: function (slot, key) {
458469
debug('command %s is required to ask %s:%s', command.name, key)
459-
const splitKey = key.split(':')
460-
_this.connectionPool.findOrCreate({host: splitKey[0], port: Number(splitKey[1])})
461-
tryConnection(false, key)
470+
const mapped = _this.natMapper(key)
471+
_this.connectionPool.findOrCreate(mapped)
472+
tryConnection(false, `${mapped.host}:${mapped.port}`)
462473
},
463474
tryagain: partialTry,
464475
clusterDown: partialTry,
@@ -610,7 +621,7 @@ class Cluster extends EventEmitter {
610621

611622
const keys = []
612623
for (let j = 2; j < items.length; j++) {
613-
items[j] = {host: items[j][0], port: items[j][1]}
624+
items[j] = this.natMapper({host: items[j][0], port: items[j][1]})
614625
items[j].readOnly = j !== 2
615626
nodes.push(items[j])
616627
keys.push(items[j].host + ':' + items[j].port)

lib/cluster/util.ts

+11
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,17 @@ export function getNodeKey(node: IRedisOptions): NodeKey {
1818
return node.host + ':' + node.port
1919
}
2020

21+
export function nodeKeyToRedisOptions(nodeKey: NodeKey): IRedisOptions {
22+
const portIndex = nodeKey.lastIndexOf(':')
23+
if (portIndex === -1) {
24+
throw new Error(`Invalid node key ${nodeKey}`)
25+
}
26+
return {
27+
host: nodeKey.slice(0, portIndex),
28+
port: Number(nodeKey.slice(portIndex + 1))
29+
}
30+
}
31+
2132
export function normalizeNodeOptions(nodes: Array<string | number | object>): IRedisOptions[] {
2233
return nodes.map((node) => {
2334
const options: any = {}

test/functional/cluster/nat.js

+127
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
const calculateSlot = require('cluster-key-slot')
2+
3+
describe('NAT', () => {
4+
it('works for normal case', (done) => {
5+
const slotTable = [
6+
[0, 1, ['192.168.1.1', 30001]],
7+
[2, 16383, ['192.168.1.2', 30001]]
8+
]
9+
10+
let cluster
11+
new MockServer(30001, null, slotTable)
12+
new MockServer(30002, ([command, arg]) => {
13+
if (command === 'get' && arg === 'foo') {
14+
cluster.disconnect()
15+
done()
16+
}
17+
}, slotTable)
18+
19+
cluster = new Redis.Cluster([{
20+
host: '127.0.0.1',
21+
port: 30001
22+
}], {
23+
natMap: {
24+
'192.168.1.1:30001': {host: '127.0.0.1', port: 30001},
25+
'192.168.1.2:30001': {host: '127.0.0.1', port: 30002}
26+
}
27+
})
28+
29+
cluster.get('foo')
30+
})
31+
32+
it('works if natMap does not match all the cases', (done) => {
33+
const slotTable = [
34+
[0, 1, ['192.168.1.1', 30001]],
35+
[2, 16383, ['127.0.0.1', 30002]]
36+
]
37+
38+
let cluster
39+
new MockServer(30001, null, slotTable)
40+
new MockServer(30002, ([command, arg]) => {
41+
if (command === 'get' && arg === 'foo') {
42+
cluster.disconnect()
43+
done()
44+
}
45+
}, slotTable)
46+
47+
cluster = new Redis.Cluster([{
48+
host: '127.0.0.1',
49+
port: 30001
50+
}], {
51+
natMap: {
52+
'192.168.1.1:30001': {host: '127.0.0.1', port: 30001}
53+
}
54+
})
55+
56+
cluster.get('foo')
57+
})
58+
59+
it('works for moved', (done) => {
60+
const slotTable = [
61+
[0, 16383, ['192.168.1.1', 30001]]
62+
]
63+
64+
let cluster
65+
new MockServer(30001, ([command, arg]) => {
66+
if (command === 'get' && arg === 'foo') {
67+
return new Error('MOVED ' + calculateSlot('foo') + ' 192.168.1.2:30001');
68+
}
69+
}, slotTable)
70+
new MockServer(30002, ([command, arg]) => {
71+
if (command === 'get' && arg === 'foo') {
72+
cluster.disconnect()
73+
done()
74+
}
75+
}, slotTable)
76+
77+
cluster = new Redis.Cluster([{
78+
host: '127.0.0.1',
79+
port: 30001
80+
}], {
81+
natMap: {
82+
'192.168.1.1:30001': {host: '127.0.0.1', port: 30001},
83+
'192.168.1.2:30001': {host: '127.0.0.1', port: 30002}
84+
}
85+
})
86+
87+
cluster.get('foo')
88+
})
89+
90+
it('works for ask', (done) => {
91+
const slotTable = [
92+
[0, 16383, ['192.168.1.1', 30001]]
93+
]
94+
95+
let cluster
96+
let asked = false
97+
new MockServer(30001, ([command, arg]) => {
98+
if (command === 'get' && arg === 'foo') {
99+
return new Error('ASK ' + calculateSlot('foo') + ' 192.168.1.2:30001');
100+
}
101+
}, slotTable)
102+
new MockServer(30002, ([command, arg]) => {
103+
if (command === 'asking') {
104+
asked = true
105+
}
106+
if (command === 'get' && arg === 'foo') {
107+
if (!asked) {
108+
throw new Error('expected asked to be true')
109+
}
110+
cluster.disconnect()
111+
done()
112+
}
113+
}, slotTable)
114+
115+
cluster = new Redis.Cluster([{
116+
host: '127.0.0.1',
117+
port: 30001
118+
}], {
119+
natMap: {
120+
'192.168.1.1:30001': {host: '127.0.0.1', port: 30001},
121+
'192.168.1.2:30001': {host: '127.0.0.1', port: 30002}
122+
}
123+
})
124+
125+
cluster.get('foo')
126+
})
127+
})

test/unit/cluster.js

+10
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
'use strict';
22

33
var Cluster = require('../../lib/cluster').default;
4+
const {nodeKeyToRedisOptions} = require('../../lib/cluster/util')
45

56
describe('cluster', function () {
67
beforeEach(function () {
@@ -36,3 +37,12 @@ describe('cluster', function () {
3637
});
3738
});
3839
});
40+
41+
describe('nodeKeyToRedisOptions()', () => {
42+
it('returns correct result', () => {
43+
expect(nodeKeyToRedisOptions('127.0.0.1:6379')).to.eql({port: 6379, host: '127.0.0.1'})
44+
expect(nodeKeyToRedisOptions('192.168.1.1:30001')).to.eql({port: 30001, host: '192.168.1.1'})
45+
expect(nodeKeyToRedisOptions('::0:6379')).to.eql({port: 6379, host: '::0'})
46+
expect(nodeKeyToRedisOptions('0:0:6379')).to.eql({port: 6379, host: '0:0'})
47+
})
48+
})

0 commit comments

Comments
 (0)