Skip to content

Commit 27b408e

Browse files
committed
refactor: rewrite connectors with TypeScript
1 parent 24dabbe commit 27b408e

File tree

8 files changed

+356
-315
lines changed

8 files changed

+356
-315
lines changed

lib/connectors/Connector.ts

+79
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
import {createConnection, TcpNetConnectOpts, IpcNetConnectOpts, Socket} from 'net'
2+
import {connect as createTLSConnection, SecureContextOptions, TLSSocket} from 'tls'
3+
import {CONNECTION_CLOSED_ERROR_MSG} from '../utils/index'
4+
5+
export function isIIpcConnectionOptions (value: any): value is IIpcConnectionOptions {
6+
return value.hasOwnProperty('path')
7+
}
8+
9+
export interface ITcpConnectionOptions extends TcpNetConnectOpts {
10+
tls?: SecureContextOptions
11+
}
12+
13+
export interface IIpcConnectionOptions extends IpcNetConnectOpts {
14+
tls?: SecureContextOptions
15+
}
16+
17+
export type ErrorEmitter = (type: string, err: Error) => void
18+
19+
export default class Connector {
20+
protected connecting: boolean = false
21+
protected stream: Socket | TLSSocket
22+
23+
constructor (protected options: ITcpConnectionOptions | IIpcConnectionOptions) {}
24+
25+
public check (info: any): boolean {
26+
return true
27+
}
28+
29+
public disconnect (): void {
30+
this.connecting = false
31+
if (this.stream) {
32+
this.stream.end()
33+
}
34+
}
35+
36+
public connect (callback: Function, _: ErrorEmitter) {
37+
const {options} = this
38+
this.connecting = true
39+
40+
let connectionOptions: any
41+
if (isIIpcConnectionOptions(options)) {
42+
connectionOptions = {
43+
path: options.path
44+
}
45+
} else {
46+
connectionOptions = {
47+
port: options.port,
48+
host: options.host,
49+
family: options.family
50+
}
51+
}
52+
53+
if (options.tls) {
54+
Object.assign(connectionOptions, options.tls)
55+
}
56+
57+
process.nextTick(() => {
58+
if (!this.connecting) {
59+
callback(new Error(CONNECTION_CLOSED_ERROR_MSG))
60+
return
61+
}
62+
63+
let stream: Socket | TLSSocket
64+
try {
65+
if (options.tls) {
66+
stream = createTLSConnection(connectionOptions)
67+
} else {
68+
stream = createConnection(connectionOptions)
69+
}
70+
} catch (err) {
71+
callback(err)
72+
return
73+
}
74+
75+
this.stream = stream
76+
callback(null, stream)
77+
})
78+
}
79+
}

lib/connectors/SentinelConnector.ts

+267
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
import {createConnection, Socket} from 'net'
2+
import {bind, sample} from '../utils/lodash'
3+
import {CONNECTION_CLOSED_ERROR_MSG, packObject} from '../utils/index'
4+
import Connector, {ITcpConnectionOptions, IIpcConnectionOptions, ErrorEmitter, isIIpcConnectionOptions} from './Connector'
5+
import { TLSSocket } from 'tls';
6+
const debug = require('../utils/debug')('ioredis:SentinelConnector')
7+
8+
let Redis
9+
10+
interface ISentinelSlavesResponse {
11+
port: string,
12+
ip: string,
13+
flags?: string
14+
}
15+
16+
interface ISentinelOptions {
17+
role: 'master' | 'slave'
18+
name: 'string'
19+
sentinels: any[]
20+
sentinelRetryStrategy?: (retryAttempts: number) => number
21+
preferredSlaves?:
22+
((slaves: Array<ISentinelSlavesResponse>) => ISentinelSlavesResponse) |
23+
Array<{port: string, ip: string, prio?: number}> |
24+
{port: string, ip: string, prio?: number}
25+
connectTimeout?: number
26+
}
27+
28+
type NodeCallback<T = void> = (err: Error | null, result?: T) => void
29+
30+
interface ISentinelTcpConnectionOptions extends ITcpConnectionOptions, ISentinelOptions {}
31+
interface ISentinelIpcConnectionOptions extends IIpcConnectionOptions, ISentinelOptions {}
32+
33+
export default class SentinelConnector extends Connector {
34+
private retryAttempts: number
35+
private currentPoint: number = -1
36+
private sentinels: any[]
37+
38+
constructor (protected options: ISentinelTcpConnectionOptions | ISentinelIpcConnectionOptions) {
39+
super(options)
40+
if (this.options.sentinels.length === 0) {
41+
throw new Error('Requires at least one sentinel to connect to.')
42+
}
43+
if (!this.options.name) {
44+
throw new Error('Requires the name of master.')
45+
}
46+
47+
this.sentinels = this.options.sentinels
48+
}
49+
50+
public check (info: {role?: string}): boolean {
51+
const roleMatches: boolean = !info.role || this.options.role === info.role
52+
if (!roleMatches) {
53+
debug('role invalid, expected %s, but got %s', this.options.role, info.role)
54+
}
55+
return roleMatches
56+
}
57+
58+
connect (callback: NodeCallback<Socket | TLSSocket>, eventEmitter: ErrorEmitter): void {
59+
this.connecting = true
60+
this.retryAttempts = 0
61+
62+
let lastError
63+
const _this = this
64+
connectToNext()
65+
66+
function connectToNext() {
67+
_this.currentPoint += 1
68+
if (_this.currentPoint === _this.sentinels.length) {
69+
_this.currentPoint = -1
70+
71+
const retryDelay = typeof _this.options.sentinelRetryStrategy === 'function'
72+
? _this.options.sentinelRetryStrategy(++_this.retryAttempts)
73+
: null
74+
75+
let errorMsg = typeof retryDelay !== 'number'
76+
? 'All sentinels are unreachable and retry is disabled.'
77+
: `All sentinels are unreachable. Retrying from scratch after ${retryDelay}ms.`
78+
79+
if (lastError) {
80+
errorMsg += ` Last error: ${lastError.message}`
81+
}
82+
83+
debug(errorMsg)
84+
85+
const error = new Error(errorMsg)
86+
if (typeof retryDelay === 'number') {
87+
setTimeout(connectToNext, retryDelay)
88+
eventEmitter('error', error)
89+
} else {
90+
callback(error)
91+
}
92+
return
93+
}
94+
95+
const endpoint = _this.sentinels[_this.currentPoint]
96+
_this.resolve(endpoint, function (err, resolved) {
97+
if (!_this.connecting) {
98+
callback(new Error(CONNECTION_CLOSED_ERROR_MSG))
99+
return
100+
}
101+
if (resolved) {
102+
debug('resolved: %s:%s', resolved.host, resolved.port)
103+
_this.stream = createConnection(resolved)
104+
callback(null, _this.stream)
105+
} else {
106+
var endpointAddress = endpoint.host + ':' + endpoint.port
107+
var errorMsg = err
108+
? 'failed to connect to sentinel ' + endpointAddress + ' because ' + err.message
109+
: 'connected to sentinel ' + endpointAddress + ' successfully, but got an invalid reply: ' + resolved
110+
111+
debug(errorMsg)
112+
113+
eventEmitter('sentinelError', new Error(errorMsg))
114+
115+
if (err) {
116+
lastError = err
117+
}
118+
connectToNext()
119+
}
120+
})
121+
}
122+
}
123+
124+
updateSentinels (client, callback: NodeCallback) {
125+
var _this = this
126+
client.sentinel('sentinels', this.options.name, function (err, result) {
127+
if (err) {
128+
client.disconnect()
129+
return callback(err)
130+
}
131+
if (Array.isArray(result)) {
132+
for (var i = 0; i < result.length; ++i) {
133+
var sentinel = packObject(result[i])
134+
var flags = sentinel.flags ? sentinel.flags.split(',') : []
135+
if (flags.indexOf('disconnected') === -1 && sentinel.ip && sentinel.port) {
136+
var endpoint = { host: sentinel.ip, port: parseInt(sentinel.port, 10) }
137+
var isDuplicate = _this.sentinels.some(bind(isSentinelEql, null, endpoint))
138+
if (!isDuplicate) {
139+
debug('adding sentinel %s:%s', endpoint.host, endpoint.port)
140+
_this.sentinels.push(endpoint)
141+
}
142+
}
143+
}
144+
debug('sentinels', _this.sentinels)
145+
}
146+
callback(null)
147+
})
148+
}
149+
150+
resolveMaster (client, callback: NodeCallback<ITcpConnectionOptions>) {
151+
var _this = this
152+
client.sentinel('get-master-addr-by-name', this.options.name, function (err, result) {
153+
if (err) {
154+
client.disconnect()
155+
return callback(err)
156+
}
157+
_this.updateSentinels(client, function (err) {
158+
client.disconnect()
159+
if (err) {
160+
return callback(err)
161+
}
162+
callback(null, Array.isArray(result) ? { host: result[0], port: result[1] } : null)
163+
})
164+
})
165+
}
166+
167+
resolveSlave (client, callback: NodeCallback<ITcpConnectionOptions>) {
168+
client.sentinel('slaves', this.options.name, (err, result) => {
169+
client.disconnect()
170+
if (err) {
171+
return callback(err)
172+
}
173+
let selectedSlave: ISentinelSlavesResponse
174+
if (Array.isArray(result)) {
175+
const availableSlaves: Array<{port: string, ip: string, flags?: string}> = []
176+
for (var i = 0; i < result.length; ++i) {
177+
const slave: ISentinelSlavesResponse = packObject(result[i])
178+
if (slave.flags && !slave.flags.match(/(disconnected|s_down|o_down)/)) {
179+
availableSlaves.push(slave)
180+
}
181+
}
182+
// allow the options to prefer particular slave(s)
183+
let {preferredSlaves} = this.options
184+
if (typeof preferredSlaves === 'function') {
185+
selectedSlave = preferredSlaves(availableSlaves)
186+
} else if (preferredSlaves !== null && typeof preferredSlaves === 'object') {
187+
const preferredSlavesArray = Array.isArray(preferredSlaves)
188+
? preferredSlaves
189+
: [preferredSlaves]
190+
191+
// sort by priority
192+
preferredSlavesArray.sort((a, b) => {
193+
// default the priority to 1
194+
if (!a.prio) {
195+
a.prio = 1
196+
}
197+
if (!b.prio) {
198+
b.prio = 1
199+
}
200+
201+
// lowest priority first
202+
if (a.prio < b.prio) {
203+
return -1
204+
}
205+
if (a.prio > b.prio) {
206+
return 1
207+
}
208+
return 0
209+
})
210+
211+
// loop over preferred slaves and return the first match
212+
for (let p = 0; p < preferredSlavesArray.length; p++) {
213+
for (let a = 0; a < availableSlaves.length; a++) {
214+
const slave = availableSlaves[a]
215+
if (slave.ip === preferredSlavesArray[p].ip) {
216+
if (slave.port === preferredSlavesArray[p].port) {
217+
selectedSlave = slave
218+
break
219+
}
220+
}
221+
}
222+
if (selectedSlave) {
223+
break
224+
}
225+
}
226+
// if none of the preferred slaves are available, a random available slave is returned
227+
}
228+
if (!selectedSlave) {
229+
// get a random available slave
230+
selectedSlave = sample(availableSlaves)
231+
}
232+
}
233+
callback(null, selectedSlave ? {host: selectedSlave.ip, port: Number(selectedSlave.port)} : null)
234+
})
235+
}
236+
237+
resolve (endpoint, callback: NodeCallback<ITcpConnectionOptions>) {
238+
if (typeof Redis === 'undefined') {
239+
Redis = require('../redis')
240+
}
241+
var client = new Redis({
242+
port: endpoint.port || 26379,
243+
host: endpoint.host,
244+
family: endpoint.family || (isIIpcConnectionOptions(this.options) ? undefined : this.options.family),
245+
retryStrategy: null,
246+
enableReadyCheck: false,
247+
connectTimeout: this.options.connectTimeout,
248+
dropBufferSupport: true
249+
})
250+
251+
// ignore the errors since resolve* methods will handle them
252+
client.on('error', noop)
253+
254+
if (this.options.role === 'slave') {
255+
this.resolveSlave(client, callback)
256+
} else {
257+
this.resolveMaster(client, callback)
258+
}
259+
}
260+
}
261+
262+
function noop (): void {}
263+
264+
function isSentinelEql (a, b): boolean {
265+
return ((a.host || '127.0.0.1') === (b.host || '127.0.0.1')) &&
266+
((a.port || 26379) === (b.port || 26379))
267+
}

0 commit comments

Comments
 (0)