This repository was archived by the owner on Apr 20, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
Copy pathwithlatestfrom.js
108 lines (89 loc) · 3.73 KB
/
withlatestfrom.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
function falseFactory() { return false; }
function argumentsToArray() {
var len = arguments.length, args = new Array(len);
for(var i = 0; i < len; i++) { args[i] = arguments[i]; }
return args;
}
var WithLatestFromObservable = (function(__super__) {
inherits(WithLatestFromObservable, __super__);
function WithLatestFromObservable(source, sources, resultSelector) {
this._s = source;
this._ss = sources;
this._cb = resultSelector;
__super__.call(this);
}
WithLatestFromObservable.prototype.subscribeCore = function (o) {
var len = this._ss.length;
var state = {
hasValue: arrayInitialize(len, falseFactory),
hasValueAll: false,
values: new Array(len)
};
var n = this._ss.length, subscriptions = new Array(n + 1);
for (var i = 0; i < n; i++) {
var other = this._ss[i], sad = new SingleAssignmentDisposable();
isPromise(other) && (other = observableFromPromise(other));
sad.setDisposable(other.subscribe(new WithLatestFromOtherObserver(o, i, state)));
subscriptions[i] = sad;
}
var outerSad = new SingleAssignmentDisposable();
outerSad.setDisposable(this._s.subscribe(new WithLatestFromSourceObserver(o, this._cb, state)));
subscriptions[n] = outerSad;
return new NAryDisposable(subscriptions);
};
return WithLatestFromObservable;
}(ObservableBase));
var WithLatestFromOtherObserver = (function (__super__) {
inherits(WithLatestFromOtherObserver, __super__);
function WithLatestFromOtherObserver(o, i, state) {
this._o = o;
this._i = i;
this._state = state;
__super__.call(this);
}
WithLatestFromOtherObserver.prototype.next = function (x) {
this._state.values[this._i] = x;
this._state.hasValue[this._i] = true;
this._state.hasValueAll = this._state.hasValue.every(identity);
};
WithLatestFromOtherObserver.prototype.error = function (e) {
this._o.onError(e);
};
WithLatestFromOtherObserver.prototype.completed = noop;
return WithLatestFromOtherObserver;
}(AbstractObserver));
var WithLatestFromSourceObserver = (function (__super__) {
inherits(WithLatestFromSourceObserver, __super__);
function WithLatestFromSourceObserver(o, cb, state) {
this._o = o;
this._cb = cb;
this._state = state;
__super__.call(this);
}
WithLatestFromSourceObserver.prototype.next = function (x) {
var allValues = [x].concat(this._state.values);
if (!this._state.hasValueAll) { return; }
var res = tryCatch(this._cb).apply(null, allValues);
if (res === errorObj) { return this._o.onError(res.e); }
this._o.onNext(res);
};
WithLatestFromSourceObserver.prototype.error = function (e) {
this._o.onError(e);
};
WithLatestFromSourceObserver.prototype.completed = function () {
this._o.onCompleted();
};
return WithLatestFromSourceObserver;
}(AbstractObserver));
/**
* Merges the specified observable sequences into one observable sequence by using the selector function only when the (first) source observable sequence produces an element.
* @returns {Observable} An observable sequence containing the result of combining elements of the sources using the specified result selector function.
*/
observableProto.withLatestFrom = function () {
if (arguments.length === 0) { throw new Error('invalid arguments'); }
var len = arguments.length, args = new Array(len);
for(var i = 0; i < len; i++) { args[i] = arguments[i]; }
var resultSelector = isFunction(args[len - 1]) ? args.pop() : argumentsToArray;
Array.isArray(args[0]) && (args = args[0]);
return new WithLatestFromObservable(this, args, resultSelector);
};