Skip to content

Commit bf3fe29

Browse files
imatlopezluin
authored andcommitted
feat: support custom connectors (#906)
1 parent 224df78 commit bf3fe29

13 files changed

+154
-84
lines changed

.eslintrc

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
{
22
"env": {
3-
"node": true
3+
"node": true,
4+
"es6": true
45
},
56
"rules": {
67
"no-const-assign": 2,

.travis.yml

+1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ services:
1212
- redis-server
1313

1414
script:
15+
- npm run build
1516
- npm run test:cov || npm run test:cov || npm run test:cov
1617

1718
env:

examples/basic_operations.js

+1-3
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,4 @@ redis.sadd('set', [1, 3, 5, 7]);
2727
redis.set('key', 100, 'EX', 10);
2828

2929
// Change the server configuration
30-
redis.config('set', 'notify-keyspace-events', 'KEA')
31-
32-
30+
redis.config('set', 'notify-keyspace-events', 'KEA');

examples/custom_connector.js

+53
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
'use strict';
2+
3+
const Redis = require('ioredis');
4+
const MyService = require('path/to/my/service');
5+
6+
// Create a custom connector that fetches sentinels from an external call
7+
class AsyncSentinelConnector extends Redis.SentinelConnector {
8+
constructor(options = {}) {
9+
// Placeholder
10+
options.sentinels = options.sentinels || [{ host: 'localhost', port: 6379 }];
11+
12+
// SentinelConnector saves options as its property
13+
super(options);
14+
}
15+
16+
connect(eventEmitter) {
17+
return MyService.getSentinels().then(sentinels => {
18+
this.options.sentinels = sentinels;
19+
this.sentinelIterator = new Redis.SentinelIterator(sentinels);
20+
return Redis.SentinelConnector.prototype.connect.call(this, eventEmitter);
21+
});
22+
}
23+
}
24+
25+
const redis = new Redis({
26+
connector: new AsyncSentinelConnector()
27+
});
28+
29+
// ioredis supports all Redis commands:
30+
redis.set('foo', 'bar');
31+
redis.get('foo', function (err, result) {
32+
if (err) {
33+
console.error(err);
34+
} else {
35+
console.log(result);
36+
}
37+
});
38+
redis.del('foo');
39+
40+
// Or using a promise if the last argument isn't a function
41+
redis.get('foo').then(function (result) {
42+
console.log(result);
43+
});
44+
45+
// Arguments to commands are flattened, so the following are the same:
46+
redis.sadd('set', 1, 3, 5, 7);
47+
redis.sadd('set', [1, 3, 5, 7]);
48+
49+
// All arguments are passed directly to the redis server:
50+
redis.set('key', 100, 'EX', 10);
51+
52+
// Change the server configuration
53+
redis.config('set', 'notify-keyspace-events', 'KEA');

lib/connectors/AbstractConnector.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,5 +17,5 @@ export default abstract class AbstractConnector {
1717
}
1818
}
1919

20-
public abstract connect (callback: Function, _: ErrorEmitter)
20+
public abstract connect (_: ErrorEmitter): Promise<NetStream>
2121
}

lib/connectors/SentinelConnector/SentinelIterator.ts

+5-9
Original file line numberDiff line numberDiff line change
@@ -5,23 +5,19 @@ function isSentinelEql (a: Partial<ISentinelAddress>, b: Partial<ISentinelAddres
55
((a.port || 26379) === (b.port || 26379))
66
}
77

8-
export default class SentinelIterator {
8+
export default class SentinelIterator implements Iterator<Partial<ISentinelAddress>> {
99
private cursor: number = 0
1010

1111
constructor (private sentinels: Partial<ISentinelAddress>[]) {}
1212

13-
hasNext (): boolean {
14-
return this.cursor < this.sentinels.length
15-
}
16-
17-
next (): Partial<ISentinelAddress> | null {
18-
return this.hasNext() ? this.sentinels[this.cursor++] : null
13+
next () {
14+
const done = this.cursor >= this.sentinels.length
15+
return { done, value: done ? undefined : this.sentinels[this.cursor++] }
1916
}
2017

2118
reset (moveCurrentEndpointToFirst: boolean): void {
2219
if (moveCurrentEndpointToFirst && this.sentinels.length > 1 && this.cursor !== 1) {
23-
const remains = this.sentinels.slice(this.cursor - 1)
24-
this.sentinels = remains.concat(this.sentinels.slice(0, this.cursor - 1))
20+
this.sentinels.unshift(...this.sentinels.splice(this.cursor - 1))
2521
}
2622
this.cursor = 0
2723
}

lib/connectors/SentinelConnector/index.ts

+38-32
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ import {ITcpConnectionOptions, isIIpcConnectionOptions} from '../StandaloneConne
66
import SentinelIterator from './SentinelIterator'
77
import {ISentinelAddress} from './types';
88
import AbstractConnector, { ErrorEmitter } from '../AbstractConnector'
9-
import {NetStream} from '../../types'
9+
import {NetStream, CallbackFunction} from '../../types';
10+
import * as PromiseContainer from '../../promiseContainer';
1011
import Redis from '../../redis'
1112

1213
const debug = Debug('SentinelConnector')
@@ -17,12 +18,13 @@ interface IAddressFromResponse {
1718
flags?: string
1819
}
1920

20-
type NodeCallback<T = void> = (err: Error | null, result?: T) => void
2121
type PreferredSlaves =
2222
((slaves: Array<IAddressFromResponse>) => IAddressFromResponse | null) |
2323
Array<{port: string, ip: string, prio?: number}> |
2424
{port: string, ip: string, prio?: number}
2525

26+
export { ISentinelAddress, SentinelIterator };
27+
2628
export interface ISentinelConnectionOptions extends ITcpConnectionOptions {
2729
role: 'master' | 'slave'
2830
name: string
@@ -39,12 +41,12 @@ export interface ISentinelConnectionOptions extends ITcpConnectionOptions {
3941

4042
export default class SentinelConnector extends AbstractConnector {
4143
private retryAttempts: number
42-
private sentinelIterator: SentinelIterator
44+
protected sentinelIterator: SentinelIterator
4345

4446
constructor (protected options: ISentinelConnectionOptions) {
4547
super()
4648

47-
if (this.options.sentinels.length === 0) {
49+
if (!this.options.sentinels.length) {
4850
throw new Error('Requires at least one sentinel to connect to.')
4951
}
5052
if (!this.options.name) {
@@ -68,19 +70,20 @@ export default class SentinelConnector extends AbstractConnector {
6870
return roleMatches
6971
}
7072

71-
public connect (callback: NodeCallback<NetStream>, eventEmitter: ErrorEmitter): void {
73+
public connect (eventEmitter: ErrorEmitter): Promise<NetStream> {
7274
this.connecting = true
7375
this.retryAttempts = 0
7476

7577
let lastError
76-
const _this = this
77-
connectToNext()
78-
79-
function connectToNext() {
80-
if (!_this.sentinelIterator.hasNext()) {
81-
_this.sentinelIterator.reset(false)
82-
const retryDelay = typeof _this.options.sentinelRetryStrategy === 'function'
83-
? _this.options.sentinelRetryStrategy(++_this.retryAttempts)
78+
const _Promise = PromiseContainer.get();
79+
80+
const connectToNext = () => new _Promise<NetStream>((resolve, reject) => {
81+
const endpoint = this.sentinelIterator.next();
82+
83+
if (endpoint.done) {
84+
this.sentinelIterator.reset(false)
85+
const retryDelay = typeof this.options.sentinelRetryStrategy === 'function'
86+
? this.options.sentinelRetryStrategy(++this.retryAttempts)
8487
: null
8588

8689
let errorMsg = typeof retryDelay !== 'number'
@@ -95,32 +98,33 @@ export default class SentinelConnector extends AbstractConnector {
9598

9699
const error = new Error(errorMsg)
97100
if (typeof retryDelay === 'number') {
98-
setTimeout(connectToNext, retryDelay)
101+
setTimeout(() => {
102+
resolve(connectToNext());
103+
}, retryDelay)
99104
eventEmitter('error', error)
100105
} else {
101-
callback(error)
106+
reject(error)
102107
}
103108
return
104109
}
105110

106-
const endpoint = _this.sentinelIterator.next()
107-
_this.resolve(endpoint, function (err, resolved) {
108-
if (!_this.connecting) {
109-
callback(new Error(CONNECTION_CLOSED_ERROR_MSG))
111+
this.resolve(endpoint.value, (err, resolved) => {
112+
if (!this.connecting) {
113+
reject(new Error(CONNECTION_CLOSED_ERROR_MSG))
110114
return
111115
}
112116
if (resolved) {
113117
debug('resolved: %s:%s', resolved.host, resolved.port)
114-
if (_this.options.enableTLSForSentinelMode && _this.options.tls) {
115-
Object.assign(resolved, _this.options.tls)
116-
_this.stream = createTLSConnection(resolved)
118+
if (this.options.enableTLSForSentinelMode && this.options.tls) {
119+
Object.assign(resolved, this.options.tls)
120+
this.stream = createTLSConnection(resolved)
117121
} else {
118-
_this.stream = createConnection(resolved)
122+
this.stream = createConnection(resolved)
119123
}
120-
_this.sentinelIterator.reset(true)
121-
callback(null, _this.stream)
124+
this.sentinelIterator.reset(true)
125+
resolve(this.stream)
122126
} else {
123-
const endpointAddress = endpoint.host + ':' + endpoint.port
127+
const endpointAddress = endpoint.value.host + ':' + endpoint.value.port
124128
const errorMsg = err
125129
? 'failed to connect to sentinel ' + endpointAddress + ' because ' + err.message
126130
: 'connected to sentinel ' + endpointAddress + ' successfully, but got an invalid reply: ' + resolved
@@ -132,13 +136,15 @@ export default class SentinelConnector extends AbstractConnector {
132136
if (err) {
133137
lastError = err
134138
}
135-
connectToNext()
139+
resolve(connectToNext())
136140
}
137141
})
138-
}
142+
});
143+
144+
return connectToNext();
139145
}
140146

141-
private updateSentinels (client, callback: NodeCallback): void {
147+
private updateSentinels (client, callback: CallbackFunction): void {
142148

143149
if (!this.options.updateSentinels) {
144150
return callback(null)
@@ -167,7 +173,7 @@ export default class SentinelConnector extends AbstractConnector {
167173
})
168174
}
169175

170-
private resolveMaster (client, callback: NodeCallback<ITcpConnectionOptions>): void {
176+
private resolveMaster (client, callback: CallbackFunction<ITcpConnectionOptions>): void {
171177
client.sentinel('get-master-addr-by-name', this.options.name, (err, result) => {
172178
if (err) {
173179
client.disconnect()
@@ -186,7 +192,7 @@ export default class SentinelConnector extends AbstractConnector {
186192
})
187193
}
188194

189-
private resolveSlave (client, callback: NodeCallback<ITcpConnectionOptions | null>): void {
195+
private resolveSlave (client, callback: CallbackFunction<ITcpConnectionOptions | null>): void {
190196
client.sentinel('slaves', this.options.name, (err, result) => {
191197
client.disconnect()
192198
if (err) {
@@ -214,7 +220,7 @@ export default class SentinelConnector extends AbstractConnector {
214220
return this.options.natMap[`${item.host}:${item.port}`] || item
215221
}
216222

217-
private resolve (endpoint, callback: NodeCallback<ITcpConnectionOptions>): void {
223+
private resolve (endpoint, callback: CallbackFunction<ITcpConnectionOptions>): void {
218224
var client = new Redis({
219225
port: endpoint.port || 26379,
220226
host: endpoint.host,

lib/connectors/StandaloneConnector.ts

+21-19
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import {createConnection, TcpNetConnectOpts, IpcNetConnectOpts} from 'net'
22
import {connect as createTLSConnection, SecureContextOptions} from 'tls'
33
import {CONNECTION_CLOSED_ERROR_MSG} from '../utils'
44
import AbstractConnector, {ErrorEmitter} from './AbstractConnector'
5+
import * as PromiseContainer from '../promiseContainer';
56
import {NetStream} from '../types'
67

78
export function isIIpcConnectionOptions (value: any): value is IIpcConnectionOptions {
@@ -21,7 +22,7 @@ export default class StandaloneConnector extends AbstractConnector {
2122
super()
2223
}
2324

24-
public connect (callback: Function, _: ErrorEmitter) {
25+
public connect (_: ErrorEmitter) {
2526
const {options} = this
2627
this.connecting = true
2728

@@ -46,27 +47,28 @@ export default class StandaloneConnector extends AbstractConnector {
4647
if (options.tls) {
4748
Object.assign(connectionOptions, options.tls)
4849
}
50+
51+
const _Promise = PromiseContainer.get();
52+
return new _Promise<NetStream>((resolve, reject) => {
53+
process.nextTick(() => {
54+
if (!this.connecting) {
55+
reject(new Error(CONNECTION_CLOSED_ERROR_MSG))
56+
return
57+
}
4958

50-
process.nextTick(() => {
51-
if (!this.connecting) {
52-
callback(new Error(CONNECTION_CLOSED_ERROR_MSG))
53-
return
54-
}
55-
56-
let stream: NetStream
57-
try {
58-
if (options.tls) {
59-
stream = createTLSConnection(connectionOptions)
60-
} else {
61-
stream = createConnection(connectionOptions)
59+
try {
60+
if (options.tls) {
61+
this.stream = createTLSConnection(connectionOptions)
62+
} else {
63+
this.stream = createConnection(connectionOptions)
64+
}
65+
} catch (err) {
66+
reject(err)
67+
return
6268
}
63-
} catch (err) {
64-
callback(err)
65-
return
66-
}
6769

68-
this.stream = stream
69-
callback(null, stream)
70+
resolve(this.stream)
71+
})
7072
})
7173
}
7274
}

lib/index.ts

+14-5
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,19 @@
11
exports = module.exports = require('./redis').default
22

3-
export {ReplyError} from 'redis-errors'
4-
export const Cluster = require('./cluster').default
5-
export const Command = require('./command').default
6-
export const ScanStream = require('./ScanStream').default
7-
export const Pipeline = require('./pipeline').default
3+
export {default} from './redis';
4+
export {default as Cluster} from './cluster'
5+
export {default as Command} from './command'
6+
export {default as ScanStream} from './ScanStream'
7+
export {default as Pipeline} from './pipeline'
8+
export {default as AbstractConnector} from './connectors/AbstractConnector'
9+
export {default as SentinelConnector, SentinelIterator} from './connectors/SentinelConnector'
10+
11+
// Type Exports
12+
export {ISentinelAddress} from './connectors/SentinelConnector'
13+
export {IRedisOptions} from './redis/RedisOptions'
14+
15+
// No TS typings
16+
export const ReplyError = require('redis-errors').ReplyError
817

918
const PromiseContainer = require('./promiseContainer')
1019
Object.defineProperty(exports, 'Promise', {

lib/redis/RedisOptions.ts

+2
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
import {ISentinelConnectionOptions} from '../connectors/SentinelConnector';
2+
import AbstractConnector from '../connectors/AbstractConnector';
23
import {IClusterOptions} from '../cluster/ClusterOptions';
34
import {ICommanderOptions} from '../commander';
45

56
export type ReconnectOnError = (err: Error) => boolean | 1 | 2;
67

78
export interface IRedisOptions extends Partial<ISentinelConnectionOptions>, Partial<ICommanderOptions>, Partial<IClusterOptions> {
9+
connector?: AbstractConnector,
810
retryStrategy?: (times: number) => number | void | null,
911
keepAlive?: number,
1012
noDelay?: boolean,

lib/redis/event_handler.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ export function connectHandler(self) {
5454
}
5555
} else {
5656
self.serverInfo = info;
57-
if (self.connector.check(info)) {
57+
if (self.options.connector.check(info)) {
5858
exports.readyHandler(self)();
5959
} else {
6060
self.disconnect(true);

0 commit comments

Comments
 (0)