Skip to content

Commit 40715df

Browse files
authored
Make RetryWhenMainSubscriber#onError's calls serial (#2499, Fixes #2488)
Since `#onError` is called by multiple `Publisher`s, we cannot use `FAIL_FAST` emit strategy and instead should retry non-serialized access.
1 parent 4495c59 commit 40715df

File tree

2 files changed

+17
-2
lines changed

2 files changed

+17
-2
lines changed

reactor-core/src/main/java/reactor/core/publisher/FluxRetryWhen.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -186,9 +186,9 @@ public void onError(Throwable t) {
186186
produced(p);
187187
}
188188

189-
otherArbiter.request(1);
190-
191189
signaller.emitNext(this, Sinks.EmitFailureHandler.FAIL_FAST);
190+
// request after signalling, otherwise it may race
191+
otherArbiter.request(1);
192192
}
193193

194194
@Override

reactor-core/src/test/java/reactor/core/publisher/FluxRetryWhenTest.java

+15
Original file line numberDiff line numberDiff line change
@@ -1119,4 +1119,19 @@ public void retryWhenWithThrowableFunction() {
11191119
.verifyComplete();
11201120
}
11211121

1122+
@Test
1123+
void gh2488() {
1124+
for (int i = 0; i < 1_000; i++) {
1125+
AtomicInteger sourceHelper = new AtomicInteger();
1126+
Flux.just("hello")
1127+
.doOnNext(m -> {
1128+
if (sourceHelper.getAndIncrement() < 9) {
1129+
throw new RuntimeException("Boom!");
1130+
}
1131+
})
1132+
.retryWhen(Retry.fixedDelay(10, Duration.ofNanos(1)))
1133+
.blockFirst(Duration.ofSeconds(1));
1134+
}
1135+
}
1136+
11221137
}

0 commit comments

Comments
 (0)