@@ -2,16 +2,16 @@ import {EventEmitter} from 'events'
2
2
import ClusterAllFailedError from '../errors/ClusterAllFailedError'
3
3
import { defaults , noop } from '../utils/lodash'
4
4
import ConnectionPool from './ConnectionPool'
5
- import { NodeKey , IRedisOptions , normalizeNodeOptions , NodeRole } from './util'
5
+ import { NodeKey , IRedisOptions , normalizeNodeOptions , NodeRole , getUniqueHostnamesFromOptions } from './util'
6
6
import ClusterSubscriber from './ClusterSubscriber'
7
7
import DelayQueue from './DelayQueue'
8
8
import ScanStream from '../ScanStream'
9
- import { AbortError } from 'redis-errors'
9
+ import { AbortError , RedisError } from 'redis-errors'
10
10
import * as asCallback from 'standard-as-callback'
11
11
import * as PromiseContainer from '../promiseContainer'
12
12
import { CallbackFunction } from '../types' ;
13
13
import { IClusterOptions , DEFAULT_CLUSTER_OPTIONS } from './ClusterOptions'
14
- import { sample , CONNECTION_CLOSED_ERROR_MSG , shuffle , timeout } from '../utils'
14
+ import { sample , CONNECTION_CLOSED_ERROR_MSG , shuffle , timeout , zipMap } from '../utils'
15
15
import * as commands from 'redis-commands'
16
16
17
17
const Deque = require ( 'denque' )
@@ -30,7 +30,7 @@ type ClusterStatus = 'end' | 'close' | 'wait' | 'connecting' | 'connect' | 'read
30
30
*/
31
31
class Cluster extends EventEmitter {
32
32
private options : IClusterOptions
33
- private startupNodes : IRedisOptions [ ]
33
+ private startupNodes : Array < string | number | object >
34
34
private connectionPool : ConnectionPool
35
35
private slots : Array < NodeKey [ ] > = [ ]
36
36
private manuallyClosing : boolean
@@ -43,6 +43,18 @@ class Cluster extends EventEmitter {
43
43
private status : ClusterStatus
44
44
private isRefreshing : boolean = false
45
45
46
+ /**
47
+ * Every time Cluster#connect() is called, this value will be
48
+ * auto-incrementing. The purpose of this value is used for
49
+ * discarding previous connect attampts when creating a new
50
+ * connection.
51
+ *
52
+ * @private
53
+ * @type {number }
54
+ * @memberof Cluster
55
+ */
56
+ private connectionEpoch : number = 0
57
+
46
58
/**
47
59
* Creates an instance of Cluster.
48
60
*
@@ -54,7 +66,7 @@ class Cluster extends EventEmitter {
54
66
super ( )
55
67
Commander . call ( this )
56
68
57
- this . startupNodes = normalizeNodeOptions ( startupNodes )
69
+ this . startupNodes = startupNodes
58
70
this . options = defaults ( this . options , options , DEFAULT_CLUSTER_OPTIONS )
59
71
60
72
// validate options
@@ -117,59 +129,68 @@ class Cluster extends EventEmitter {
117
129
reject ( new Error ( 'Redis is already connecting/connected' ) )
118
130
return
119
131
}
132
+ const epoch = ++ this . connectionEpoch
120
133
this . setStatus ( 'connecting' )
121
134
122
- if ( ! Array . isArray ( this . startupNodes ) || this . startupNodes . length === 0 ) {
123
- throw new Error ( '`startupNodes` should contain at least one node.' )
124
- }
125
-
126
- this . connectionPool . reset ( this . startupNodes )
127
-
128
- function readyHandler ( ) {
129
- this . setStatus ( 'ready' )
130
- this . retryAttempts = 0
131
- this . executeOfflineCommands ( )
132
- this . resetNodesRefreshInterval ( )
133
- resolve ( )
134
- }
135
+ this . resolveStartupNodeHostnames ( ) . then ( ( nodes ) => {
136
+ if ( this . connectionEpoch !== epoch ) {
137
+ debug ( 'discard connecting after resolving startup nodes because epoch not match: %d != %d' , epoch , this . connectionEpoch )
138
+ reject ( new RedisError ( 'Connection is discarded because a new connection is made' ) )
139
+ return
140
+ }
141
+ if ( this . status !== 'connecting' ) {
142
+ debug ( 'discard connecting after resolving startup nodes because the status changed to %s' , this . status )
143
+ reject ( new RedisError ( 'Connection is aborted' ) )
144
+ return
145
+ }
146
+ this . connectionPool . reset ( nodes )
147
+
148
+ function readyHandler ( ) {
149
+ this . setStatus ( 'ready' )
150
+ this . retryAttempts = 0
151
+ this . executeOfflineCommands ( )
152
+ this . resetNodesRefreshInterval ( )
153
+ resolve ( )
154
+ }
135
155
136
- let closeListener : ( ) => void
137
- const refreshListener = ( ) => {
138
- this . removeListener ( 'close' , closeListener )
139
- this . manuallyClosing = false
140
- this . setStatus ( 'connect' )
141
- if ( this . options . enableReadyCheck ) {
142
- this . readyCheck ( ( err , fail ) => {
143
- if ( err || fail ) {
144
- debug ( 'Ready check failed (%s). Reconnecting...' , err || fail )
145
- if ( this . status === 'connect' ) {
146
- this . disconnect ( true )
156
+ let closeListener : ( ) => void
157
+ const refreshListener = ( ) => {
158
+ this . removeListener ( 'close' , closeListener )
159
+ this . manuallyClosing = false
160
+ this . setStatus ( 'connect' )
161
+ if ( this . options . enableReadyCheck ) {
162
+ this . readyCheck ( ( err , fail ) => {
163
+ if ( err || fail ) {
164
+ debug ( 'Ready check failed (%s). Reconnecting...' , err || fail )
165
+ if ( this . status === 'connect' ) {
166
+ this . disconnect ( true )
167
+ }
168
+ } else {
169
+ readyHandler . call ( this )
147
170
}
148
- } else {
149
- readyHandler . call ( this )
150
- }
151
- } )
152
- } else {
153
- readyHandler . call ( this )
171
+ } )
172
+ } else {
173
+ readyHandler . call ( this )
174
+ }
154
175
}
155
- }
156
176
157
- closeListener = function ( ) {
158
- this . removeListener ( 'refresh' , refreshListener )
159
- reject ( new Error ( 'None of startup nodes is available' ) )
160
- }
177
+ closeListener = function ( ) {
178
+ this . removeListener ( 'refresh' , refreshListener )
179
+ reject ( new Error ( 'None of startup nodes is available' ) )
180
+ }
161
181
162
- this . once ( 'refresh' , refreshListener )
163
- this . once ( 'close' , closeListener )
164
- this . once ( 'close' , this . handleCloseEvent . bind ( this ) )
182
+ this . once ( 'refresh' , refreshListener )
183
+ this . once ( 'close' , closeListener )
184
+ this . once ( 'close' , this . handleCloseEvent . bind ( this ) )
165
185
166
- this . refreshSlotsCache ( function ( err ) {
167
- if ( err && err . message === 'Failed to refresh slots cache.' ) {
168
- Redis . prototype . silentEmit . call ( this , 'error' , err )
169
- this . connectionPool . reset ( [ ] )
170
- }
171
- } . bind ( this ) )
172
- this . subscriber . start ( )
186
+ this . refreshSlotsCache ( function ( err ) {
187
+ if ( err && err . message === 'Failed to refresh slots cache.' ) {
188
+ Redis . prototype . silentEmit . call ( this , 'error' , err )
189
+ this . connectionPool . reset ( [ ] )
190
+ }
191
+ } . bind ( this ) )
192
+ this . subscriber . start ( )
193
+ } ) . catch ( reject )
173
194
} )
174
195
}
175
196
@@ -639,6 +660,51 @@ class Cluster extends EventEmitter {
639
660
}
640
661
} )
641
662
}
663
+
664
+ private dnsLookup ( hostname : string ) : Promise < string > {
665
+ return new Promise ( ( resolve , reject ) => {
666
+ this . options . dnsLookup ( hostname , ( err , address ) => {
667
+ if ( err ) {
668
+ debug ( 'failed to resolve hostname %s to IP: %s' , hostname , err . message )
669
+ reject ( err )
670
+ } else {
671
+ debug ( 'resolved hostname %s to IP %s' , hostname , address )
672
+ resolve ( address )
673
+ }
674
+ } )
675
+ } ) ;
676
+ }
677
+
678
+ /**
679
+ * Normalize startup nodes, and resolving hostnames to IPs.
680
+ *
681
+ * This process happens every time when #connect() is called since
682
+ * #startupNodes and DNS records may chanage.
683
+ *
684
+ * @private
685
+ * @returns {Promise<IRedisOptions[]> }
686
+ */
687
+ private resolveStartupNodeHostnames ( ) : Promise < IRedisOptions [ ] > {
688
+ if ( ! Array . isArray ( this . startupNodes ) || this . startupNodes . length === 0 ) {
689
+ return Promise . reject ( new Error ( '`startupNodes` should contain at least one node.' ) )
690
+ }
691
+ const startupNodes = normalizeNodeOptions ( this . startupNodes )
692
+
693
+ const hostnames = getUniqueHostnamesFromOptions ( startupNodes )
694
+ if ( hostnames . length === 0 ) {
695
+ return Promise . resolve ( startupNodes )
696
+ }
697
+
698
+ return Promise . all ( hostnames . map ( ( hostname ) => this . dnsLookup ( hostname ) ) ) . then ( ( ips ) => {
699
+ const hostnameToIP = zipMap ( hostnames , ips )
700
+
701
+ return startupNodes . map ( ( node ) => (
702
+ hostnameToIP . has ( node . host )
703
+ ? Object . assign ( { } , node , { host : hostnameToIP . get ( node . host ) } )
704
+ : node
705
+ ) )
706
+ } )
707
+ }
642
708
}
643
709
644
710
Object . getOwnPropertyNames ( Commander . prototype ) . forEach ( name => {
0 commit comments