Skip to content

Commit d8d3174

Browse files
aoprisanalexandru
authored andcommitted
Issue monix#42 - switch operator WIP
1 parent 3aa3eed commit d8d3174

File tree

5 files changed

+176
-4
lines changed

5 files changed

+176
-4
lines changed

shared/src/main/scala/monifu/concurrent/cancelables/RefCountCancelable.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,14 @@ final class RefCountCancelable private (onCancel: () => Unit) extends BooleanCan
3131
state.get.isCanceled
3232

3333
@tailrec
34-
def acquire(): Cancelable = {
34+
def acquire(): BooleanCancelable = {
3535
val oldState = state.get
3636
if (oldState.isCanceled)
3737
BooleanCancelable.alreadyCanceled
3838
else if (!state.compareAndSet(oldState, oldState.copy(activeCounter = oldState.activeCounter + 1)))
3939
acquire()
4040
else
41-
Cancelable {
41+
BooleanCancelable {
4242
val newState = state.transformAndGet(s => s.copy(activeCounter = s.activeCounter - 1))
4343
if (newState.activeCounter == 0 && newState.isCanceled)
4444
onCancel()

shared/src/main/scala/monifu/reactive/Observable.scala

+7
Original file line numberDiff line numberDiff line change
@@ -1168,6 +1168,13 @@ trait Observable[+T] { self =>
11681168
def lift[U](f: Observable[T] => Observable[U]): Observable[U] =
11691169
f(self)
11701170

1171+
1172+
/**
1173+
*
1174+
*/
1175+
def switch[U](implicit ev: T <:< Observable[U]): Observable[U] =
1176+
operators.switch(self)
1177+
11711178
/**
11721179
* Returns the first generated result as a Future and then cancels
11731180
* the subscription.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Copyright (c) 2014-2015 Alexandru Nedelcu
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package monifu.reactive.operators
18+
19+
import monifu.concurrent.cancelables.{BooleanCancelable, RefCountCancelable}
20+
import monifu.reactive.Ack.{Cancel, Continue}
21+
import monifu.reactive._
22+
import monifu.reactive.internals._
23+
import monifu.reactive.observers.SynchronousObserver
24+
import scala.concurrent.Future
25+
26+
object switch {
27+
/**
28+
* Implementation for [[Observable.concat]].
29+
*/
30+
def apply[T, U](source: Observable[T])(implicit ev: T <:< Observable[U]): Observable[U] =
31+
Observable.create { subscriber:Subscriber[U] =>
32+
implicit val s = subscriber.scheduler
33+
val observerU = subscriber.observer
34+
35+
source.unsafeSubscribe(new SynchronousObserver[T] { self =>
36+
// Global subscription, is canceled by the downstream
37+
// observer and if canceled all streaming is supposed to stop
38+
private[this] val upstream = BooleanCancelable()
39+
40+
// MUST BE synchronized by `self`
41+
private[this] var ack: Future[Ack] = Continue
42+
// To be accessed only in `self.onNext`
43+
private[this] var current: BooleanCancelable = null
44+
45+
private[this] val refCount = RefCountCancelable {
46+
self.synchronized {
47+
ack.onContinueSignalComplete(observerU)
48+
}
49+
}
50+
51+
def onNext(childObservable: T) = {
52+
if (upstream.isCanceled) Cancel else {
53+
// canceling current observable in order to
54+
// start the new stream
55+
val activeSubscription = refCount.acquire()
56+
if (current != null) current.cancel()
57+
current = activeSubscription
58+
59+
childObservable.unsafeSubscribe(new Observer[U] {
60+
def onNext(elem: U) = self.synchronized {
61+
if (upstream.isCanceled || activeSubscription.isCanceled)
62+
Cancel
63+
else {
64+
ack = ack.onContinueStreamOnNext(observerU, elem)
65+
ack.ifCanceledDoCancel(upstream)
66+
ack
67+
}
68+
}
69+
70+
def onError(ex: Throwable): Unit =
71+
self.onError(ex)
72+
73+
def onComplete(): Unit = {
74+
// NOTE: we aren't sending this onComplete signal downstream to our observerU
75+
// we will eventually do that after all of them are complete
76+
activeSubscription.cancel()
77+
}
78+
})
79+
80+
Continue
81+
}
82+
}
83+
84+
def onError(ex: Throwable): Unit =
85+
self.synchronized {
86+
if (!upstream.isCanceled) {
87+
upstream.cancel()
88+
ack.onContinueSignalError(observerU, ex)
89+
}
90+
}
91+
92+
def onComplete(): Unit = {
93+
refCount.cancel()
94+
}
95+
})
96+
}
97+
}

shared/src/test/scala/monifu/reactive/operators/MergeManySuite.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ object MergeManySuite extends BaseOperatorSuite {
3131

3232
def observableInError(sourceCount: Int, ex: Throwable) = Some {
3333
val o = createObservableEndingInError(Observable.range(0, sourceCount), ex)
34-
.flatMap(i => Observable.fromIterable(Seq(i, i, i, i)))
34+
.mergeMap(i => Observable.fromIterable(Seq(i, i, i, i)))
3535
Sample(o, count(sourceCount), sum(sourceCount), Zero, Zero)
3636
}
3737

@@ -40,7 +40,7 @@ object MergeManySuite extends BaseOperatorSuite {
4040
}
4141

4242
def brokenUserCodeObservable(sourceCount: Int, ex: Throwable) = Some {
43-
val o = Observable.range(0, sourceCount).flatMap { i =>
43+
val o = Observable.range(0, sourceCount).mergeMap { i =>
4444
if (i == sourceCount-1)
4545
throw ex
4646
else
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Copyright (c) 2014-2015 Alexandru Nedelcu
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package monifu.reactive.operators
18+
19+
import monifu.reactive.{Ack, Observer, Observable}
20+
import scala.concurrent.Future
21+
import scala.concurrent.duration._
22+
import scala.language.postfixOps
23+
24+
object SwitchSuite extends BaseOperatorSuite {
25+
def createChild() = {
26+
Observable.interval(1.second).take(2) ++
27+
Observable.interval(1.second).drop(3)
28+
}
29+
30+
def observable(sourceCount: Int) = Some {
31+
val o = Observable.interval(2.seconds)
32+
.take(sourceCount)
33+
.map(i => (if (i < sourceCount-1) createChild() else Observable.interval(1.second)).take(sourceCount))
34+
.switch
35+
36+
val count = (sourceCount - 1) * 2 + sourceCount
37+
val sum = (sourceCount - 1) + (1 until sourceCount).sum
38+
Sample(o, count, sum, waitFirst, waitNext)
39+
}
40+
41+
def waitFirst = 0.seconds
42+
def waitNext = 1.second
43+
44+
def observableInError(sourceCount: Int, ex: Throwable) = Some {
45+
def createChild(i: Long) = {
46+
if (i < sourceCount-1)
47+
(Observable.interval(1.second).take(2) ++
48+
Observable.interval(1.second).drop(3)).doOnCanceled("CANCELED")
49+
else
50+
Observable.interval(1.second)
51+
.take(sourceCount)
52+
}
53+
54+
val source = Observable.interval(2.seconds).take(sourceCount)
55+
val endingInError = createObservableEndingInError(source, ex)
56+
val o = endingInError
57+
.map(i => createChild(i))
58+
.switch
59+
60+
val count = (sourceCount - 1) * 2
61+
val sum = sourceCount - 1
62+
63+
Sample(o, count, sum, waitFirst, waitNext)
64+
}
65+
66+
def brokenUserCodeObservable(sourceCount: Int, ex: Throwable) =
67+
None
68+
}

0 commit comments

Comments
 (0)