16
16
17
17
package monifu .reactive .operators
18
18
19
- import monifu .concurrent .cancelables .{BooleanCancelable , RefCountCancelable }
19
+ import monifu .concurrent .cancelables .{RefCountCancelable , SerialCancelable }
20
20
import monifu .reactive .Ack .{Cancel , Continue }
21
21
import monifu .reactive ._
22
22
import monifu .reactive .internals ._
23
23
import monifu .reactive .observers .SynchronousObserver
24
+ import scala .collection .mutable
24
25
import scala .concurrent .Future
25
26
26
27
object switch {
27
28
/**
28
29
* Implementation for [[Observable.concat ]].
29
30
*/
30
- def apply [T , U ](source : Observable [T ])(implicit ev : T <:< Observable [U ]): Observable [U ] =
31
+ def apply [T , U ](source : Observable [T ], delayErrors : Boolean )
32
+ (implicit ev : T <:< Observable [U ]): Observable [U ] = {
33
+
31
34
Observable .create { subscriber: Subscriber [U ] =>
32
35
implicit val s = subscriber.scheduler
33
36
val observerU = subscriber.observer
34
37
35
38
source.unsafeSubscribe(new SynchronousObserver [T ] { self =>
36
39
// Global subscription, is canceled by the downstream
37
40
// observer and if canceled all streaming is supposed to stop
38
- private [this ] val upstream = BooleanCancelable ()
41
+ private [this ] val upstream = SerialCancelable ()
39
42
40
43
// MUST BE synchronized by `self`
41
44
private [this ] var ack : Future [Ack ] = Continue
42
- // To be accessed only in `self.onNext`
43
- private [this ] var current : BooleanCancelable = null
45
+ // MUST BE synchronized by `self`
46
+ private [this ] val errors = if (delayErrors)
47
+ mutable.ArrayBuffer .empty[Throwable ] else null
44
48
45
49
private [this ] val refCount = RefCountCancelable {
46
50
self.synchronized {
47
- ack.onContinueSignalComplete(observerU)
51
+ if (delayErrors && errors.nonEmpty) {
52
+ ack.onContinueSignalError(observerU, CompositeException (errors))
53
+ errors.clear()
54
+ }
55
+ else {
56
+ ack.onContinueSignalComplete(observerU)
57
+ }
48
58
}
49
59
}
50
60
51
61
def onNext (childObservable : T ) = {
52
62
if (upstream.isCanceled) Cancel else {
53
63
// canceling current observable in order to
54
64
// start the new stream
55
- val activeSubscription = refCount.acquire()
56
- if (current != null ) current.cancel()
57
- current = activeSubscription
65
+ val refID = refCount.acquire()
66
+ upstream := refID
58
67
59
68
childObservable.unsafeSubscribe(new Observer [U ] {
60
69
def onNext (elem : U ) = self.synchronized {
61
- if (upstream.isCanceled || activeSubscription.isCanceled)
62
- Cancel
63
- else {
70
+ if (refID.isCanceled) Cancel else {
64
71
ack = ack.onContinueStreamOnNext(observerU, elem)
65
72
ack.ifCanceledDoCancel(upstream)
66
- ack
67
73
}
68
74
}
69
75
70
- def onError (ex : Throwable ): Unit =
71
- self.onError(ex)
76
+ def onError (ex : Throwable ): Unit = {
77
+ if (delayErrors) self.synchronized {
78
+ errors += ex
79
+ onComplete()
80
+ }
81
+ else {
82
+ self.onError(ex)
83
+ }
84
+ }
72
85
73
86
def onComplete (): Unit = {
74
- // NOTE: we aren't sending this onComplete signal downstream to our observerU
75
- // we will eventually do that after all of them are complete
76
- activeSubscription.cancel()
87
+ // NOTE: we aren't sending this onComplete signal downstream to
88
+ // our observerU we will eventually do that after all of them
89
+ // are complete
90
+ refID.cancel()
77
91
}
78
92
})
79
93
@@ -83,7 +97,11 @@ object switch {
83
97
84
98
def onError (ex : Throwable ): Unit =
85
99
self.synchronized {
86
- if (! upstream.isCanceled) {
100
+ if (delayErrors) {
101
+ errors += ex
102
+ onComplete()
103
+ }
104
+ else if (! upstream.isCanceled) {
87
105
upstream.cancel()
88
106
ack.onContinueSignalError(observerU, ex)
89
107
}
@@ -94,4 +112,5 @@ object switch {
94
112
}
95
113
})
96
114
}
115
+ }
97
116
}
0 commit comments