-
Notifications
You must be signed in to change notification settings - Fork 565
/
Copy pathstreaming-api-connection.js
343 lines (310 loc) · 12.5 KB
/
streaming-api-connection.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
var EventEmitter = require('events').EventEmitter;
var util = require('util');
var helpers = require('./helpers')
var Parser = require('./parser');
var request = require('request');
var STATUS_CODES_TO_ABORT_ON = require('./settings').STATUS_CODES_TO_ABORT_ON
var StreamingAPIConnection = function (reqOpts, twitOptions) {
this.reqOpts = reqOpts
this.twitOptions = twitOptions
this._twitter_time_minus_local_time_ms = 0
EventEmitter.call(this)
}
util.inherits(StreamingAPIConnection, EventEmitter)
/**
* Resets the connection.
* - clears request, response, parser
* - removes scheduled reconnect handle (if one was scheduled)
* - stops the stall abort timeout handle (if one was scheduled)
*/
StreamingAPIConnection.prototype._resetConnection = function () {
if (this.request) {
// clear our reference to the `request` instance
this.request.removeAllListeners();
this.request.destroy();
}
if (this.response) {
// clear our reference to the http.IncomingMessage instance
this.response.removeAllListeners();
this.response.destroy();
}
if (this.parser) {
this.parser.removeAllListeners()
}
// ensure a scheduled reconnect does not occur (if one was scheduled)
// this can happen if we get a close event before .stop() is called
clearTimeout(this._scheduledReconnect)
delete this._scheduledReconnect
// clear our stall abort timeout
this._stopStallAbortTimeout()
}
/**
* Resets the parameters used in determining the next reconnect time
*/
StreamingAPIConnection.prototype._resetRetryParams = function () {
// delay for next reconnection attempt
this._connectInterval = 0
// flag indicating whether we used a 0-delay reconnect
this._usedFirstReconnect = false
}
StreamingAPIConnection.prototype._startPersistentConnection = function () {
var self = this;
self._resetConnection();
self._setupParser();
self._resetStallAbortTimeout();
self._setOauthTimestamp();
this.reqOpts.encoding = 'utf8'
self.request = request.post(this.reqOpts);
self.emit('connect', self.request);
self.request.on('response', function (response) {
self._updateOauthTimestampOffsetFromResponse(response)
// reset our reconnection attempt flag so next attempt goes through with 0 delay
// if we get a transport-level error
self._usedFirstReconnect = false;
// start a stall abort timeout handle
self._resetStallAbortTimeout();
self.response = response
if (STATUS_CODES_TO_ABORT_ON.indexOf(self.response.statusCode) !== -1) {
// We got a status code telling us we should abort the connection.
// Read the body from the response and return an error to the user.
var body = '';
self.request.on('data', function (chunk) {
body += chunk;
})
self.request.on('end', function () {
try {
body = JSON.parse(body)
} catch (jsonDecodeError) {
// Twitter may send an HTML body
// if non-JSON text was returned, we'll just attach it to the error as-is
}
// surface the error to the user
var error = helpers.makeTwitError('Bad Twitter streaming request: ' + self.response.statusCode)
error.statusCode = response ? response.statusCode: null;
helpers.attachBodyInfoToError(error, body)
self.emit('error', error);
// stop the stream explicitly so we don't reconnect
self.stop()
body = null;
});
self.request.on('error', function (err) {
var twitErr = helpers.makeTwitError(err.message);
twitErr.statusCode = self.response.statusCode;
helpers.attachBodyInfoToError(twitErr, body);
self.emit('parser-error', twitErr);
});
} else if (self.response.statusCode === 420) {
// close the connection forcibly so a reconnect is scheduled by `self.onClose()`
self._scheduleReconnect();
} else {
// We got an OK status code - the response should be valid.
// Read the body from the response and return to the user.
//pass all response data to parser
self.request.on('data', function(data) {
self._connectInterval = 0
self._resetStallAbortTimeout();
self.parser.parse(data);
})
self.response.on('close', self._onClose.bind(self))
self.response.on('error', function (err) {
// expose response errors on twit instance
self.emit('error', err);
})
// connected without an error response from Twitter, emit `connected` event
// this must be emitted after all its event handlers are bound
// so the reference to `self.response` is not interfered-with by the user until it is emitted
self.emit('connected', self.response);
}
});
self.request.on('close', self._onClose.bind(self));
self.request.on('error', function (err) { self._scheduleReconnect.bind(self) });
return self;
}
/**
* Handle when the request or response closes.
* Schedule a reconnect according to Twitter's reconnect guidelines
*
*/
StreamingAPIConnection.prototype._onClose = function () {
var self = this;
self._stopStallAbortTimeout();
if (self._scheduledReconnect) {
// if we already have a reconnect scheduled, don't schedule another one.
// this race condition can happen if the http.ClientRequest and http.IncomingMessage both emit `close`
return
}
self._scheduleReconnect();
}
/**
* Kick off the http request, and persist the connection
*
*/
StreamingAPIConnection.prototype.start = function () {
this._resetRetryParams();
this._startPersistentConnection();
return this;
}
/**
* Abort the http request, stop scheduled reconnect (if one was scheduled) and clear state
*
*/
StreamingAPIConnection.prototype.stop = function () {
// clear connection variables and timeout handles
this._resetConnection();
this._resetRetryParams();
return this;
}
/**
* Stop and restart the stall abort timer (called when new data is received)
*
* If we go 90s without receiving data from twitter, we abort the request & reconnect.
*/
StreamingAPIConnection.prototype._resetStallAbortTimeout = function () {
var self = this;
// stop the previous stall abort timer
self._stopStallAbortTimeout();
//start a new 90s timeout to trigger a close & reconnect if no data received
self._stallAbortTimeout = setTimeout(function () {
self._scheduleReconnect()
}, 90000);
return this;
}
/**
* Stop stall timeout
*
*/
StreamingAPIConnection.prototype._stopStallAbortTimeout = function () {
clearTimeout(this._stallAbortTimeout);
// mark the timer as `null` so it is clear via introspection that the timeout is not scheduled
delete this._stallAbortTimeout;
return this;
}
/**
* Computes the next time a reconnect should occur (based on the last HTTP response received)
* and starts a timeout handle to begin reconnecting after `self._connectInterval` passes.
*
* @return {Undefined}
*/
StreamingAPIConnection.prototype._scheduleReconnect = function () {
var self = this;
if (self.response && self.response.statusCode === 420) {
// we are being rate limited
// start with a 1 minute wait and double each attempt
if (!self._connectInterval) {
self._connectInterval = 60000;
} else {
self._connectInterval *= 2;
}
} else if (self.response && String(self.response.statusCode).charAt(0) === '5') {
// twitter 5xx errors
// start with a 5s wait, double each attempt up to 320s
if (!self._connectInterval) {
self._connectInterval = 5000;
} else if (self._connectInterval < 320000) {
self._connectInterval *= 2;
} else {
self._connectInterval = 320000;
}
} else {
// we did not get an HTTP response from our last connection attempt.
// DNS/TCP error, or a stall in the stream (and stall timer closed the connection)
if (!self._usedFirstReconnect) {
// first reconnection attempt on a valid connection should occur immediately
self._connectInterval = 0;
self._usedFirstReconnect = true;
} else if (self._connectInterval < 16000) {
// linearly increase delay by 250ms up to 16s
self._connectInterval += 250;
} else {
// cap out reconnect interval at 16s
self._connectInterval = 16000;
}
}
// schedule the reconnect
self._scheduledReconnect = setTimeout(function () {
self._startPersistentConnection();
}, self._connectInterval);
self.emit('reconnect', self.request, self.response, self._connectInterval);
}
StreamingAPIConnection.prototype._setupParser = function () {
var self = this
self.parser = new Parser()
// handle twitter objects as they come in - emit the generic `message` event
// along with the specific event corresponding to the message
self.parser.on('element', function (msg) {
self.emit('message', msg)
if (msg.delete) { self.emit('delete', msg) }
else if (msg.disconnect) { self._handleDisconnect(msg) }
else if (msg.limit) { self.emit('limit', msg) }
else if (msg.scrub_geo) { self.emit('scrub_geo', msg) }
else if (msg.warning) { self.emit('warning', msg) }
else if (msg.status_withheld) { self.emit('status_withheld', msg) }
else if (msg.user_withheld) { self.emit('user_withheld', msg) }
else if (msg.friends || msg.friends_str) { self.emit('friends', msg) }
else if (msg.direct_message) { self.emit('direct_message', msg) }
else if (msg.event) {
self.emit('user_event', msg)
// reference: https://dev.twitter.com/docs/streaming-apis/messages#User_stream_messages
var ev = msg.event
if (ev === 'blocked') { self.emit('blocked', msg) }
else if (ev === 'unblocked') { self.emit('unblocked', msg) }
else if (ev === 'favorite') { self.emit('favorite', msg) }
else if (ev === 'unfavorite') { self.emit('unfavorite', msg) }
else if (ev === 'follow') { self.emit('follow', msg) }
else if (ev === 'unfollow') { self.emit('unfollow', msg) }
else if (ev === 'mute') { self.emit('mute', msg) }
else if (ev === 'unmute') { self.emit('unmute', msg) }
else if (ev === 'user_update') { self.emit('user_update', msg) }
else if (ev === 'list_created') { self.emit('list_created', msg) }
else if (ev === 'list_destroyed') { self.emit('list_destroyed', msg) }
else if (ev === 'list_updated') { self.emit('list_updated', msg) }
else if (ev === 'list_member_added') { self.emit('list_member_added', msg) }
else if (ev === 'list_member_removed') { self.emit('list_member_removed', msg) }
else if (ev === 'list_user_subscribed') { self.emit('list_user_subscribed', msg) }
else if (ev === 'list_user_unsubscribed') { self.emit('list_user_unsubscribed', msg) }
else if (ev === 'quoted_tweet') { self.emit('quoted_tweet', msg) }
else if (ev === 'favorited_retweet') { self.emit('favorited_retweet', msg) }
else if (ev === 'retweeted_retweet') { self.emit('retweeted_retweet', msg) }
else { self.emit('unknown_user_event', msg) }
} else { self.emit('tweet', msg) }
})
self.parser.on('error', function (err) {
self.emit('parser-error', err)
});
self.parser.on('connection-limit-exceeded', function (err) {
self.emit('error', err);
})
}
StreamingAPIConnection.prototype._handleDisconnect = function (twitterMsg) {
this.emit('disconnect', twitterMsg);
this.stop();
}
/**
* Call whenever an http request is about to be made to update
* our local timestamp (used for Oauth) to be Twitter's server time.
*
*/
StreamingAPIConnection.prototype._setOauthTimestamp = function () {
var self = this;
if (self.reqOpts.oauth) {
var oauth_ts = Date.now() + self._twitter_time_minus_local_time_ms;
self.reqOpts.oauth.timestamp = Math.floor(oauth_ts/1000).toString();
}
}
/**
* Call whenever an http response is received from Twitter,
* to set our local timestamp offset from Twitter's server time.
* This is used to set the Oauth timestamp for our next http request
* to Twitter (by calling _setOauthTimestamp).
*
* @param {http.IncomingResponse} resp http response received from Twitter.
*/
StreamingAPIConnection.prototype._updateOauthTimestampOffsetFromResponse = function (resp) {
if (resp && resp.headers && resp.headers.date &&
new Date(resp.headers.date).toString() !== 'Invalid Date'
) {
var twitterTimeMs = new Date(resp.headers.date).getTime()
this._twitter_time_minus_local_time_ms = twitterTimeMs - Date.now();
}
}
module.exports = StreamingAPIConnection