This repository was archived by the owner on Apr 20, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
Copy pathretrywhen.js
88 lines (75 loc) · 2.48 KB
/
retrywhen.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
function repeat(value) {
return {
'@@iterator': function () {
return {
next: function () {
return { done: false, value: value };
}
};
}
};
}
var RetryWhenObservable = (function(__super__) {
function createDisposable(state) {
return {
isDisposed: false,
dispose: function () {
if (!this.isDisposed) {
this.isDisposed = true;
state.isDisposed = true;
}
}
};
}
function RetryWhenObservable(source, notifier) {
this.source = source;
this._notifier = notifier;
__super__.call(this);
}
inherits(RetryWhenObservable, __super__);
RetryWhenObservable.prototype.subscribeCore = function (o) {
var exceptions = new Subject(),
notifier = new Subject(),
handled = this._notifier(exceptions),
notificationDisposable = handled.subscribe(notifier);
var e = this.source['@@iterator']();
var state = { isDisposed: false },
lastError,
subscription = new SerialDisposable();
var cancelable = currentThreadScheduler.scheduleRecursive(null, function (_, recurse) {
if (state.isDisposed) { return; }
var currentItem = e.next();
if (currentItem.done) {
if (lastError) {
o.onError(lastError);
} else {
o.onCompleted();
}
return;
}
// Check if promise
var currentValue = currentItem.value;
isPromise(currentValue) && (currentValue = observableFromPromise(currentValue));
var outer = new SingleAssignmentDisposable();
var inner = new SingleAssignmentDisposable();
subscription.setDisposable(new BinaryDisposable(inner, outer));
outer.setDisposable(currentValue.subscribe(
function(x) { o.onNext(x); },
function (exn) {
inner.setDisposable(notifier.subscribe(recurse, function(ex) {
o.onError(ex);
}, function() {
o.onCompleted();
}));
exceptions.onNext(exn);
outer.dispose();
},
function() { o.onCompleted(); }));
});
return new NAryDisposable([notificationDisposable, subscription, cancelable, createDisposable(state)]);
};
return RetryWhenObservable;
}(ObservableBase));
observableProto.retryWhen = function (notifier) {
return new RetryWhenObservable(repeat(this), notifier);
};