Skip to content

Commit 13a5bc4

Browse files
authored
fix(cluster): robust solution for pub/sub in cluster (#697)
* fix(cluster): robust solution for pub/sub in cluster Previously (v3 & v4.0.0), ioredis reuse the existing connection for subscription, which will cause problem when executing commands on the reused connection. From now on, a specialized connection will be created when any subscription has made. This solves the problem above perfectly. Close #696
1 parent 63067c3 commit 13a5bc4

14 files changed

+280
-153
lines changed

lib/cluster/ClusterSubscriber.ts

+122
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
import {EventEmitter} from 'events'
2+
import ConnectionPool from './ConnectionPool'
3+
import {sample, noop} from '../utils/lodash'
4+
import {getNodeKey} from './util'
5+
6+
const Redis = require('../redis')
7+
const debug = require('../utils/debug')('ioredis:cluster:subscriber')
8+
9+
const SUBSCRIBER_CONNECTION_NAME = 'ioredisClusterSubscriber'
10+
11+
export default class ClusterSubscriber {
12+
private started: boolean = false
13+
private subscriber: any = null
14+
private lastActiveSubscriber: any
15+
16+
constructor (private connectionPool: ConnectionPool, private emitter: EventEmitter) {
17+
this.connectionPool.on('-node', (_, key: string) => {
18+
if (!this.started || !this.subscriber) {
19+
return
20+
}
21+
if (getNodeKey(this.subscriber.options) === key) {
22+
debug('subscriber has left, selecting a new one...')
23+
this.selectSubscriber()
24+
}
25+
})
26+
this.connectionPool.on('+node', () => {
27+
if (!this.started || this.subscriber) {
28+
return
29+
}
30+
debug('a new node is discovered and there is no subscriber, selecting a new one...')
31+
this.selectSubscriber()
32+
})
33+
}
34+
35+
getInstance (): any {
36+
return this.subscriber
37+
}
38+
39+
private selectSubscriber () {
40+
const lastActiveSubscriber = this.lastActiveSubscriber
41+
42+
// Disconnect the previous subscriber even if there
43+
// will not be a new one.
44+
if (lastActiveSubscriber) {
45+
lastActiveSubscriber.disconnect()
46+
}
47+
48+
const sampleNode = sample(this.connectionPool.getNodes())
49+
if (!sampleNode) {
50+
debug('selecting subscriber failed since there is no node discovered in the cluster yet')
51+
this.subscriber = null
52+
return
53+
}
54+
55+
const {port, host} = sampleNode.options
56+
debug('selected a subscriber %s:%s', host, port)
57+
58+
// Create a specialized Redis connection for the subscription.
59+
// Note that auto reconnection is enabled here.
60+
// `enableReadyCheck` is disabled because subscription is allowed
61+
// when redis is loading data from the disk.
62+
this.subscriber = new Redis({
63+
port,
64+
host,
65+
enableReadyCheck: false,
66+
connectionName: SUBSCRIBER_CONNECTION_NAME,
67+
lazyConnect: true
68+
})
69+
70+
// Re-subscribe previous channels
71+
var previousChannels = { subscribe: [], psubscribe: [] }
72+
if (lastActiveSubscriber) {
73+
const condition = lastActiveSubscriber.condition || lastActiveSubscriber.prevCondition
74+
if (condition && condition.subscriber) {
75+
previousChannels.subscribe = condition.subscriber.channels('subscribe')
76+
previousChannels.psubscribe = condition.subscriber.channels('psubscribe')
77+
}
78+
}
79+
if (previousChannels.subscribe.length || previousChannels.psubscribe.length) {
80+
var pending = 0
81+
for (const type of ['subscribe', 'psubscribe']) {
82+
var channels = previousChannels[type]
83+
if (channels.length) {
84+
pending += 1
85+
debug('%s %d channels', type, channels.length)
86+
this.subscriber[type](channels).then(() => {
87+
if (!--pending) {
88+
this.lastActiveSubscriber = this.subscriber
89+
}
90+
}).catch(noop)
91+
}
92+
}
93+
} else {
94+
this.lastActiveSubscriber = this.subscriber
95+
}
96+
for (const event of ['message', 'messageBuffer']) {
97+
this.subscriber.on(event, (arg1, arg2) => {
98+
this.emitter.emit(event, arg1, arg2)
99+
})
100+
}
101+
for (const event of ['pmessage', 'pmessageBuffer']) {
102+
this.subscriber.on(event, (arg1, arg2, arg3) => {
103+
this.emitter.emit(event, arg1, arg2, arg3)
104+
})
105+
}
106+
}
107+
108+
start (): void {
109+
this.started = true
110+
this.selectSubscriber()
111+
debug('started')
112+
}
113+
114+
stop (): void {
115+
this.started = false
116+
if (this.subscriber) {
117+
this.subscriber.disconnect()
118+
this.subscriber = null
119+
}
120+
debug('stopped')
121+
}
122+
}

lib/cluster/ConnectionPool.ts

+36-45
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,13 @@
11
import {parseURL} from '../utils'
22
import {EventEmitter} from 'events'
3-
import {noop, defaults} from '../utils/lodash'
3+
import {noop, defaults, values} from '../utils/lodash'
4+
import {IRedisOptions, getNodeKey} from './util'
45

56
const Redis = require('../redis')
67
const debug = require('../utils/debug')('ioredis:cluster:connectionPool')
78

89
type NODE_TYPE = 'all' | 'master' | 'slave'
910

10-
interface IRedisOptions {
11-
[key: string]: any
12-
}
13-
14-
interface IRedisOptionsWithKey extends IRedisOptions {
15-
key: string
16-
}
17-
1811
export default class ConnectionPool extends EventEmitter {
1912
// master + slave = all
2013
private nodes: {[key in NODE_TYPE]: {[key: string]: any}} = {
@@ -29,6 +22,10 @@ export default class ConnectionPool extends EventEmitter {
2922
super()
3023
}
3124

25+
public getNodes(role: 'all' | 'master' | 'slave' = 'all'): any[] {
26+
return values(this.nodes[role])
27+
}
28+
3229
/**
3330
* Find or create a connection to the node
3431
*
@@ -37,33 +34,34 @@ export default class ConnectionPool extends EventEmitter {
3734
* @returns {*}
3835
* @memberof ConnectionPool
3936
*/
40-
public findOrCreate (node: IRedisOptions, readOnly: boolean = false): any {
41-
setKey(node)
37+
public findOrCreate(node: IRedisOptions, readOnly: boolean = false): any {
38+
fillDefaultOptions(node)
39+
const key = getNodeKey(node)
4240
readOnly = Boolean(readOnly)
4341

44-
if (this.specifiedOptions[node.key]) {
45-
Object.assign(node, this.specifiedOptions[node.key])
42+
if (this.specifiedOptions[key]) {
43+
Object.assign(node, this.specifiedOptions[key])
4644
} else {
47-
this.specifiedOptions[node.key] = node
45+
this.specifiedOptions[key] = node
4846
}
4947

5048
let redis
51-
if (this.nodes.all[node.key]) {
52-
redis = this.nodes.all[node.key]
49+
if (this.nodes.all[key]) {
50+
redis = this.nodes.all[key]
5351
if (redis.options.readOnly !== readOnly) {
5452
redis.options.readOnly = readOnly
55-
debug('Change role of %s to %s', node.key, readOnly ? 'slave' : 'master')
53+
debug('Change role of %s to %s', key, readOnly ? 'slave' : 'master')
5654
redis[readOnly ? 'readonly' : 'readwrite']().catch(noop)
5755
if (readOnly) {
58-
delete this.nodes.master[node.key]
59-
this.nodes.slave[node.key] = redis
56+
delete this.nodes.master[key]
57+
this.nodes.slave[key] = redis
6058
} else {
61-
delete this.nodes.slave[node.key]
62-
this.nodes.master[node.key] = redis
59+
delete this.nodes.slave[key]
60+
this.nodes.master[key] = redis
6361
}
6462
}
6563
} else {
66-
debug('Connecting to %s as %s', node.key, readOnly ? 'slave' : 'master')
64+
debug('Connecting to %s as %s', key, readOnly ? 'slave' : 'master')
6765
redis = new Redis(defaults({
6866
// Never try to reconnect when a node is lose,
6967
// instead, waiting for a `MOVED` error and
@@ -75,23 +73,23 @@ export default class ConnectionPool extends EventEmitter {
7573
enableOfflineQueue: true,
7674
readOnly: readOnly
7775
}, node, this.redisOptions, { lazyConnect: true }))
78-
this.nodes.all[node.key] = redis
79-
this.nodes[readOnly ? 'slave' : 'master'][node.key] = redis
76+
this.nodes.all[key] = redis
77+
this.nodes[readOnly ? 'slave' : 'master'][key] = redis
8078

8179
redis.once('end', () => {
82-
delete this.nodes.all[node.key]
83-
delete this.nodes.master[node.key]
84-
delete this.nodes.slave[node.key]
85-
this.emit('-node', redis)
80+
delete this.nodes.all[key]
81+
delete this.nodes.master[key]
82+
delete this.nodes.slave[key]
83+
this.emit('-node', redis, key)
8684
if (!Object.keys(this.nodes.all).length) {
8785
this.emit('drain')
8886
}
8987
})
9088

91-
this.emit('+node', redis)
89+
this.emit('+node', redis, key)
9290

9391
redis.on('error', function (error) {
94-
this.emit('nodeError', error)
92+
this.emit('nodeError', error, key)
9593
})
9694
}
9795

@@ -105,14 +103,15 @@ export default class ConnectionPool extends EventEmitter {
105103
* @param {(Array<string | number | object>)} nodes
106104
* @memberof ConnectionPool
107105
*/
108-
public reset (nodes: Array<string | number | object>): void {
106+
public reset(nodes: Array<string | number | object>): void {
107+
debug('Reset with %O', nodes);
109108
const newNodes = {}
110109
nodes.forEach((node) => {
111-
const options: {port?: number | string, db?: number, key?: string} = {}
110+
const options: IRedisOptions = {}
112111
if (typeof node === 'object') {
113-
defaults(options, node)
112+
Object.assign(options, node)
114113
} else if (typeof node === 'string') {
115-
defaults(options, parseURL(node))
114+
Object.assign(options, parseURL(node))
116115
} else if (typeof node === 'number') {
117116
options.port = node
118117
} else {
@@ -123,8 +122,8 @@ export default class ConnectionPool extends EventEmitter {
123122
}
124123
delete options.db
125124

126-
setKey(options)
127-
newNodes[options.key] = options
125+
fillDefaultOptions(options)
126+
newNodes[getNodeKey(options)] = options
128127
}, this)
129128

130129
Object.keys(this.nodes.all).forEach((key) => {
@@ -140,15 +139,7 @@ export default class ConnectionPool extends EventEmitter {
140139
}
141140
}
142141

143-
/**
144-
* Set key property
145-
*
146-
* @private
147-
*/
148-
function setKey(node: IRedisOptions): IRedisOptionsWithKey {
149-
node = node || {}
142+
function fillDefaultOptions(node: IRedisOptions): void {
150143
node.port = node.port || 6379
151144
node.host = node.host || '127.0.0.1'
152-
node.key = node.key || node.host + ':' + node.port
153-
return <IRedisOptionsWithKey>node
154145
}

0 commit comments

Comments
 (0)