This repository was archived by the owner on Apr 20, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
Copy pathmergeall.js
73 lines (60 loc) · 2.09 KB
/
mergeall.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/**
 * Observable produced by `mergeAll`: wraps a source sequence and, on
 * subscription, hands the source a MergeAllObserver that forwards into the
 * downstream observer.
 */
var MergeAllObservable = (function (__super__) {
  /**
   * @param {Observable} source The sequence being wrapped; stored on `this.source`.
   */
  function MergeAllObservable(source) {
    this.source = source;
    __super__.call(this);
  }

  inherits(MergeAllObservable, __super__);

  /**
   * Subscribes to the wrapped source.
   * @param {Observer} o The downstream observer handed to the MergeAllObserver.
   * @returns {CompositeDisposable} The group holding the outer subscription; the
   * same group is passed to the MergeAllObserver so it can register more disposables.
   */
  MergeAllObservable.prototype.subscribeCore = function (o) {
    var group = new CompositeDisposable();
    var outerSubscription = new SingleAssignmentDisposable();

    // Register the placeholder first so the group already contains the outer
    // subscription by the time the source starts delivering notifications.
    group.add(outerSubscription);

    var subscription = this.source.subscribe(new MergeAllObserver(o, group));
    outerSubscription.setDisposable(subscription);

    return group;
  };

  return MergeAllObservable;
}(ObservableBase));
/**
 * Observer attached to the outer sequence by MergeAllObservable. Each value it
 * receives is treated as an inner sequence (or a promise, which is converted
 * first) and subscribed to; values from the inner sequences are forwarded to
 * the downstream observer `o`.
 */
var MergeAllObserver = (function (__super__) {
  /**
   * @param {Observer} o Downstream observer receiving the merged notifications.
   * @param {CompositeDisposable} g Group that tracks the active subscriptions.
   */
  function MergeAllObserver(o, g) {
    this.o = o;
    this.g = g;
    this.done = false; // becomes true once the outer sequence has completed
    __super__.call(this);
  }

  inherits(MergeAllObserver, __super__);

  /**
   * Subscribes to one inner sequence and registers its subscription in the group.
   * @param {Observable|Promise} innerSource Inner sequence, or a promise to convert.
   */
  MergeAllObserver.prototype.next = function (innerSource) {
    var innerSubscription = new SingleAssignmentDisposable();
    this.g.add(innerSubscription);

    var inner = isPromise(innerSource) ? observableFromPromise(innerSource) : innerSource;
    innerSubscription.setDisposable(inner.subscribe(new InnerObserver(this, innerSubscription)));
  };

  /** Forwards an outer-sequence error downstream. */
  MergeAllObserver.prototype.error = function (e) {
    this.o.onError(e);
  };

  /**
   * Marks the outer sequence as finished and completes downstream when the
   * group holds exactly one disposable.
   */
  MergeAllObserver.prototype.completed = function () {
    this.done = true;
    // MergeAllObservable.subscribeCore puts the outer subscription into the
    // group, so a length of 1 means no inner subscription is still registered.
    if (this.g.length === 1) {
      this.o.onCompleted();
    }
  };

  /**
   * Observer attached to a single inner sequence.
   * @param {MergeAllObserver} parent Owning outer observer.
   * @param {SingleAssignmentDisposable} sad Disposable registered in the group for this inner subscription.
   */
  function InnerObserver(parent, sad) {
    this.parent = parent;
    this.sad = sad;
    __super__.call(this);
  }

  inherits(InnerObserver, __super__);

  /** Passes an inner value straight through to the downstream observer. */
  InnerObserver.prototype.next = function (x) {
    this.parent.o.onNext(x);
  };

  /** Forwards an inner-sequence error downstream. */
  InnerObserver.prototype.error = function (e) {
    this.parent.o.onError(e);
  };

  /**
   * Unregisters this inner subscription, then completes downstream if the outer
   * sequence is already done and only one disposable is left in the group.
   */
  InnerObserver.prototype.completed = function () {
    var owner = this.parent;
    owner.g.remove(this.sad);
    if (owner.done && owner.g.length === 1) {
      owner.o.onCompleted();
    }
  };

  return MergeAllObserver;
}(AbstractObserver));
/**
 * Flattens an observable sequence whose elements are themselves observable
 * sequences into a single observable sequence.
 * @returns {Observable} An observable sequence carrying the elements of the inner sequences.
 */
observableProto.mergeAll = function () {
  var outerSequence = this;
  return new MergeAllObservable(outerSequence);
};