Skip to content

Commit 30fc66a

Browse files
committed
Introduce trySend and tryReceive channel operations as a future replacement for error-prone offer, poll and receiveOrNull
Fixes #974
1 parent dbd7274 commit 30fc66a

File tree

14 files changed

+143
-57
lines changed

14 files changed

+143
-57
lines changed

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

+20-1
Original file line numberDiff line numberDiff line change
@@ -555,6 +555,7 @@ public abstract interface class kotlinx/coroutines/channels/ActorScope : kotlinx
555555

556556
public final class kotlinx/coroutines/channels/ActorScope$DefaultImpls {
557557
public static synthetic fun cancel (Lkotlinx/coroutines/channels/ActorScope;)V
558+
public static fun poll (Lkotlinx/coroutines/channels/ActorScope;)Ljava/lang/Object;
558559
}
559560

560561
public abstract interface class kotlinx/coroutines/channels/BroadcastChannel : kotlinx/coroutines/channels/SendChannel {
@@ -566,6 +567,7 @@ public abstract interface class kotlinx/coroutines/channels/BroadcastChannel : k
566567
public final class kotlinx/coroutines/channels/BroadcastChannel$DefaultImpls {
567568
public static synthetic fun cancel$default (Lkotlinx/coroutines/channels/BroadcastChannel;Ljava/lang/Throwable;ILjava/lang/Object;)Z
568569
public static synthetic fun cancel$default (Lkotlinx/coroutines/channels/BroadcastChannel;Ljava/util/concurrent/CancellationException;ILjava/lang/Object;)V
570+
public static fun offer (Lkotlinx/coroutines/channels/BroadcastChannel;Ljava/lang/Object;)Z
569571
}
570572

571573
public final class kotlinx/coroutines/channels/BroadcastChannelKt {
@@ -598,6 +600,8 @@ public abstract interface class kotlinx/coroutines/channels/Channel : kotlinx/co
598600

599601
public final class kotlinx/coroutines/channels/Channel$DefaultImpls {
600602
public static synthetic fun cancel (Lkotlinx/coroutines/channels/Channel;)V
603+
public static fun offer (Lkotlinx/coroutines/channels/Channel;Ljava/lang/Object;)Z
604+
public static fun poll (Lkotlinx/coroutines/channels/Channel;)Ljava/lang/Object;
601605
}
602606

603607
public final class kotlinx/coroutines/channels/Channel$Factory {
@@ -628,7 +632,7 @@ public final class kotlinx/coroutines/channels/ChannelKt {
628632
public final class kotlinx/coroutines/channels/ChannelResult {
629633
public static final field Companion Lkotlinx/coroutines/channels/ChannelResult$Companion;
630634
public static final synthetic fun box-impl (Ljava/lang/Object;)Lkotlinx/coroutines/channels/ChannelResult;
631-
public static synthetic fun constructor-impl (Ljava/lang/Object;Lkotlin/jvm/internal/DefaultConstructorMarker;)Ljava/lang/Object;
635+
public static fun constructor-impl (Ljava/lang/Object;)Ljava/lang/Object;
632636
public fun equals (Ljava/lang/Object;)Z
633637
public static fun equals-impl (Ljava/lang/Object;Ljava/lang/Object;)Z
634638
public static final fun equals-impl0 (Ljava/lang/Object;Ljava/lang/Object;)Z
@@ -645,6 +649,12 @@ public final class kotlinx/coroutines/channels/ChannelResult {
645649
public final synthetic fun unbox-impl ()Ljava/lang/Object;
646650
}
647651

652+
public final class kotlinx/coroutines/channels/ChannelResult$Companion {
653+
public final fun closed-JP2dKIU (Ljava/lang/Throwable;)Ljava/lang/Object;
654+
public final fun failure-PtdJZtk ()Ljava/lang/Object;
655+
public final fun success-JP2dKIU (Ljava/lang/Object;)Ljava/lang/Object;
656+
}
657+
648658
public final class kotlinx/coroutines/channels/ChannelsKt {
649659
public static final fun all (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
650660
public static final fun any (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
@@ -789,6 +799,7 @@ public final class kotlinx/coroutines/channels/ConflatedBroadcastChannel : kotli
789799
public fun offer (Ljava/lang/Object;)Z
790800
public fun openSubscription ()Lkotlinx/coroutines/channels/ReceiveChannel;
791801
public fun send (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
802+
public fun trySend-JP2dKIU (Ljava/lang/Object;)Ljava/lang/Object;
792803
}
793804

794805
public final class kotlinx/coroutines/channels/ProduceKt {
@@ -804,6 +815,10 @@ public abstract interface class kotlinx/coroutines/channels/ProducerScope : kotl
804815
public abstract fun getChannel ()Lkotlinx/coroutines/channels/SendChannel;
805816
}
806817

818+
public final class kotlinx/coroutines/channels/ProducerScope$DefaultImpls {
819+
public static fun offer (Lkotlinx/coroutines/channels/ProducerScope;Ljava/lang/Object;)Z
820+
}
821+
807822
public abstract interface class kotlinx/coroutines/channels/ReceiveChannel {
808823
public abstract synthetic fun cancel ()V
809824
public abstract synthetic fun cancel (Ljava/lang/Throwable;)Z
@@ -818,12 +833,14 @@ public abstract interface class kotlinx/coroutines/channels/ReceiveChannel {
818833
public abstract fun receive (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
819834
public abstract fun receiveCatching-JP2dKIU (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
820835
public abstract fun receiveOrNull (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
836+
public abstract fun tryReceive-PtdJZtk ()Ljava/lang/Object;
821837
}
822838

823839
public final class kotlinx/coroutines/channels/ReceiveChannel$DefaultImpls {
824840
public static synthetic fun cancel (Lkotlinx/coroutines/channels/ReceiveChannel;)V
825841
public static synthetic fun cancel$default (Lkotlinx/coroutines/channels/ReceiveChannel;Ljava/lang/Throwable;ILjava/lang/Object;)Z
826842
public static synthetic fun cancel$default (Lkotlinx/coroutines/channels/ReceiveChannel;Ljava/util/concurrent/CancellationException;ILjava/lang/Object;)V
843+
public static fun poll (Lkotlinx/coroutines/channels/ReceiveChannel;)Ljava/lang/Object;
827844
}
828845

829846
public abstract interface class kotlinx/coroutines/channels/SendChannel {
@@ -834,10 +851,12 @@ public abstract interface class kotlinx/coroutines/channels/SendChannel {
834851
public abstract fun isFull ()Z
835852
public abstract fun offer (Ljava/lang/Object;)Z
836853
public abstract fun send (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
854+
public abstract fun trySend-JP2dKIU (Ljava/lang/Object;)Ljava/lang/Object;
837855
}
838856

839857
public final class kotlinx/coroutines/channels/SendChannel$DefaultImpls {
840858
public static synthetic fun close$default (Lkotlinx/coroutines/channels/SendChannel;Ljava/lang/Throwable;ILjava/lang/Object;)Z
859+
public static fun offer (Lkotlinx/coroutines/channels/SendChannel;Ljava/lang/Object;)Z
841860
}
842861

843862
public final class kotlinx/coroutines/channels/TickerChannelsKt {

kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt

+32-11
Original file line numberDiff line numberDiff line change
@@ -137,23 +137,42 @@ internal abstract class AbstractSendChannel<E>(
137137
return sendSuspend(element)
138138
}
139139

140-
public final override fun offer(element: E): Boolean {
140+
override fun offer(element: E): Boolean {
141+
try {
142+
return super.offer(element)
143+
} catch (e: Throwable) {
144+
onUndeliveredElement?.callUndeliveredElementCatchingException(element)?.let {
145+
// If it crashes, add send exception as suppressed for better diagnostics
146+
it.addSuppressed(e)
147+
throw it
148+
}
149+
throw e
150+
}
151+
}
152+
153+
public final override fun trySend(element: E): ChannelResult<Unit> {
141154
val result = offerInternal(element)
142155
return when {
143-
result === OFFER_SUCCESS -> true
156+
result === OFFER_SUCCESS -> ChannelResult.success(Unit)
144157
result === OFFER_FAILED -> {
145-
// We should check for closed token on offer as well, otherwise offer won't be linearizable
158+
// We should check for closed token on trySend as well, otherwise trySend won't be linearizable
146159
// in the face of concurrent close()
147160
// See https://github.com/Kotlin/kotlinx.coroutines/issues/359
148-
throw recoverStackTrace(helpCloseAndGetSendException(element, closedForSend ?: return false))
161+
val closedForSend = closedForSend ?: return ChannelResult.failure()
162+
ChannelResult.closed(helpCloseAndGetSendException(closedForSend))
149163
}
150164
result is Closed<*> -> {
151-
throw recoverStackTrace(helpCloseAndGetSendException(element, result))
165+
ChannelResult.closed(helpCloseAndGetSendException(result))
152166
}
153-
else -> error("offerInternal returned $result")
167+
else -> error("trySend returned $result")
154168
}
155169
}
156170

171+
private fun helpCloseAndGetSendException(closed: Closed<*>): Throwable {
172+
helpClose(closed)
173+
return closed.sendException
174+
}
175+
157176
private fun helpCloseAndGetSendException(element: E, closed: Closed<*>): Throwable {
158177
// To ensure linearizablity we must ALWAYS help close the channel when we observe that it was closed
159178
// See https://github.com/Kotlin/kotlinx.coroutines/issues/1419
@@ -632,9 +651,11 @@ internal abstract class AbstractChannel<E>(
632651
}
633652

634653
@Suppress("UNCHECKED_CAST")
635-
public final override fun poll(): E? {
654+
public final override fun tryReceive(): ChannelResult<E> {
636655
val result = pollInternal()
637-
return if (result === POLL_FAILED) null else receiveOrNullResult(result)
656+
if (result === POLL_FAILED) return ChannelResult.failure()
657+
if (result is Closed<*>) return ChannelResult.closed(result.closeCause)
658+
return ChannelResult.success(result as E)
638659
}
639660

640661
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x")
@@ -905,7 +926,7 @@ internal abstract class AbstractChannel<E>(
905926
@JvmField val receiveMode: Int
906927
) : Receive<E>() {
907928
fun resumeValue(value: E): Any? = when (receiveMode) {
908-
RECEIVE_RESULT -> ChannelResult.value(value)
929+
RECEIVE_RESULT -> ChannelResult.success(value)
909930
else -> value
910931
}
911932

@@ -990,7 +1011,7 @@ internal abstract class AbstractChannel<E>(
9901011
@Suppress("UNCHECKED_CAST")
9911012
override fun completeResumeReceive(value: E) {
9921013
block.startCoroutineCancellable(
993-
if (receiveMode == RECEIVE_RESULT) ChannelResult.value(value) else value,
1014+
if (receiveMode == RECEIVE_RESULT) ChannelResult.success(value) else value,
9941015
select.completion,
9951016
resumeOnCancellationFun(value)
9961017
)
@@ -1144,7 +1165,7 @@ internal abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClose
11441165

11451166
@Suppress("NOTHING_TO_INLINE", "UNCHECKED_CAST")
11461167
private inline fun <E> Any?.toResult(): ChannelResult<E> =
1147-
if (this is Closed<*>) ChannelResult.closed(closeCause) else ChannelResult.value(this as E)
1168+
if (this is Closed<*>) ChannelResult.closed(closeCause) else ChannelResult.success(this as E)
11481169

11491170
@Suppress("NOTHING_TO_INLINE")
11501171
private inline fun <E> Closed<*>.toResult(): ChannelResult<E> = ChannelResult.closed(closeCause)

kotlinx-coroutines-core/common/src/channels/Channel.kt

+58-24
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,22 @@ public interface SendChannel<in E> {
8585
* then it calls `onUndeliveredElement` before throwing an exception.
8686
* See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements.
8787
*/
88-
public fun offer(element: E): Boolean
88+
public fun offer(element: E): Boolean {
89+
val result = trySend(element)
90+
if (result.isSuccess) return true
91+
throw recoverStackTrace(result.exceptionOrNull() ?: return false)
92+
}
93+
94+
/**
95+
* Immediately adds the specified [element] to this channel, if this doesn't violate its capacity restrictions,
96+
* and returns the successful result. Otherwise, returns failed or closed result.
97+
* This is synchronous variant of [send], which backs off in situations when `send` suspends or throws.
98+
*
99+
* When `trySend` call returns a non-successful result, it guarantees that the element was not delivered to the consumer, and
100+
* it does not call `onUndeliveredElement` that was installed for this channel.
101+
* See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements.
102+
*/
103+
public fun trySend(element: E): ChannelResult<Unit>
89104

90105
/**
91106
* Closes this channel.
@@ -218,7 +233,7 @@ public interface ReceiveChannel<out E> {
218233
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
219234
@LowPriorityInOverloadResolution
220235
@Deprecated(
221-
message = "Deprecated in favor of receiveOrClosed and receiveOrNull extension",
236+
message = "Deprecated in favor of receiveCatching and receiveOrNull extension",
222237
level = DeprecationLevel.WARNING,
223238
replaceWith = ReplaceWith("receiveOrNull", "kotlinx.coroutines.channels.receiveOrNull")
224239
)
@@ -230,13 +245,13 @@ public interface ReceiveChannel<out E> {
230245
* [closed for `receive`][isClosedForReceive] without a cause. The [select] invocation fails with
231246
* the original [close][SendChannel.close] cause exception if the channel has _failed_.
232247
*
233-
* @suppress **Deprecated**: in favor of onReceiveOrClosed and onReceiveOrNull extension.
248+
* @suppress **Deprecated**: in favor of receiveCatching and onReceiveOrNull extension.
234249
*/
235250
@ObsoleteCoroutinesApi
236251
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
237252
@LowPriorityInOverloadResolution
238253
@Deprecated(
239-
message = "Deprecated in favor of onReceiveOrClosed and onReceiveOrNull extension",
254+
message = "Deprecated in favor of receiveCatching and onReceiveOrNull extension",
240255
level = DeprecationLevel.WARNING,
241256
replaceWith = ReplaceWith("onReceiveOrNull", "kotlinx.coroutines.channels.onReceiveOrNull")
242257
)
@@ -251,7 +266,7 @@ public interface ReceiveChannel<out E> {
251266
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
252267
* function is suspended, this function immediately resumes with a [CancellationException].
253268
* There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
254-
* suspended, it will not resume successfully. The `receiveOrClosed` call can retrieve the element from the channel,
269+
* suspended, it will not resume successfully. The `receiveCatching` call can retrieve the element from the channel,
255270
* but then throw [CancellationException], thus failing to deliver the element.
256271
* See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements.
257272
*
@@ -271,11 +286,22 @@ public interface ReceiveChannel<out E> {
271286
public val onReceiveCatching: SelectClause1<ChannelResult<E>>
272287

273288
/**
274-
* Retrieves and removes an element from this channel if its not empty, or returns `null` if the channel is empty
289+
* Retrieves and removes an element from this channel if it's not empty or returns `null` if the channel is empty
275290
* or is [is closed for `receive`][isClosedForReceive] without a cause.
276291
* It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
277292
*/
278-
public fun poll(): E?
293+
public fun poll(): E? {
294+
val result = tryReceive()
295+
if (result.isSuccess) return result.getOrThrow()
296+
throw recoverStackTrace(result.exceptionOrNull() ?: return null)
297+
}
298+
299+
/**
300+
* Retrieves and removes an element from this channel if it's not empty, returning a [successful][ChannelResult.success]
301+
* result, returns [failed][ChannelResult.failed] result if the channel is empty, and [closed][ChannelResult.closed]
302+
* result if the channel is closed.
303+
*/
304+
public fun tryReceive(): ChannelResult<E>
279305

280306
/**
281307
* Returns a new iterator to receive elements from this channel using a `for` loop.
@@ -315,35 +341,35 @@ public interface ReceiveChannel<out E> {
315341

316342
/**
317343
* A discriminated union of channel operation result.
318-
* It encapsulates successful or failed result of a channel operation, or a failed operation to a closed channel with
344+
* It encapsulates the successful or failed result of a channel operation or a failed operation to a closed channel with
319345
* an optional cause.
320346
*
321-
* Successful result represents a successful operation with value of type [T], for example, result of [Channel.receiveCatching]
322-
* operation or a successfully sent element as a result of [Channel.trySend].
347+
* The successful result represents a successful operation with a value of type [T], for example,
348+
* the result of [Channel.receiveCatching] operation or a successfully sent element as a result of [Channel.trySend].
323349
*
324-
* Failed result represents a failed operation attempt to a channel, but it doesn't necessary indicate that the channel is failed.
350+
* The failed result represents a failed operation attempt to a channel, but it doesn't necessary indicate that the channel is failed.
325351
* E.g. when the channel is full, [Channel.trySend] returns failed result, but the channel itself is not in the failed state.
326352
*
327-
* Closed result represents an operation attempt to a closed channel and also implies that the operation was failed.
353+
* The closed result represents an operation attempt to a closed channel and also implies that the operation has failed.
328354
*/
329355
@Suppress("UNCHECKED_CAST")
330356
public inline class ChannelResult<out T>
331-
internal constructor(private val holder: Any?) {
357+
@PublishedApi internal constructor(private val holder: Any?) {
332358
/**
333359
* Returns `true` if this instance represents a successful
334360
* operation outcome.
335361
*
336-
* In this case [isFailure] and [isClosed] return false.
362+
* In this case [isFailure] and [isClosed] return `false`.
337363
*/
338-
public val isSuccess: Boolean get() = holder !is Closed
364+
public val isSuccess: Boolean get() = holder !is Failed
339365

340366
/**
341-
* Returns true if this instance represents unsuccessful operation.
367+
* Returns `true` if this instance represents unsuccessful operation.
342368
*
343369
* In this case [isSuccess] returns false, but it does not imply
344370
* that the channel is failed or closed.
345371
*
346-
* Example of failed operation without an exception and channel being closed
372+
* Example of a failed operation without an exception and channel being closed
347373
* is [Channel.trySend] attempt to a channel that is full.
348374
*/
349375
public val isFailure: Boolean get() = holder is Failed
@@ -352,7 +378,7 @@ internal constructor(private val holder: Any?) {
352378
* Returns `true` if this instance represents unsuccessful operation
353379
* to a closed or cancelled channel.
354380
*
355-
* In this case [isSuccess] returns false, [isFailure] returns `true`, but it does not imply
381+
* In this case [isSuccess] returns `false`, [isFailure] returns `true`, but it does not imply
356382
* that [exceptionOrNull] returns non-null value.
357383
*
358384
* It can happen if the channel was [closed][Channel.close] normally without an exception.
@@ -374,7 +400,7 @@ internal constructor(private val holder: Any?) {
374400
}
375401

376402
/**
377-
* Returns the encapsulated exception if this instance represents failure or null if it is success
403+
* Returns the encapsulated exception if this instance represents failure or `null` if it is success
378404
* or unsuccessful operation to closed channel.
379405
*/
380406
public fun exceptionOrNull(): Throwable? = (holder as? Closed)?.cause
@@ -389,13 +415,21 @@ internal constructor(private val holder: Any?) {
389415
override fun toString(): String = "Closed($cause)"
390416
}
391417

392-
internal companion object {
393-
@Suppress("NOTHING_TO_INLINE")
394-
internal inline fun <E> value(value: E): ChannelResult<E> =
418+
@Suppress("NOTHING_TO_INLINE")
419+
@InternalCoroutinesApi
420+
public companion object {
421+
private val failed = Failed()
422+
423+
@InternalCoroutinesApi
424+
public fun <E> success(value: E): ChannelResult<E> =
395425
ChannelResult(value)
396426

397-
@Suppress("NOTHING_TO_INLINE")
398-
internal inline fun <E> closed(cause: Throwable?): ChannelResult<E> =
427+
@InternalCoroutinesApi
428+
public fun <E> failure(): ChannelResult<E> =
429+
ChannelResult(failed)
430+
431+
@InternalCoroutinesApi
432+
public fun <E> closed(cause: Throwable?): ChannelResult<E> =
399433
ChannelResult(Closed(cause))
400434
}
401435

0 commit comments

Comments
 (0)