Skip to content

Commit 2ff31b3

Browse files
authored
Fix groupBy initial request off by one (#2450)
This commit fixes the following misalignment: 1. drainLoop in GroupByMain does s.request(e) when e groups are produced to the downstream. Each group at the moment has at least 1 element enqueued into it. 2. When UnicastGroupedFlux consumes elements for the first time, it requests the first element and does main.s.request(e) where e covers the first element 3. That said that GroupByMain fulfilled the demand for the first element and then UnicastGroupedFlux does the same for the second time. It leads that for each group we have 1 extra demand which then ends up with overflow. 4. To avoid that, this PR adds isFirstRequest check which allows removing that redundant demand for the first element on the first requestN Signed-off-by: Oleh Dokuka [email protected]
1 parent d367f08 commit 2ff31b3

File tree

2 files changed

+52
-2
lines changed

2 files changed

+52
-2
lines changed

reactor-core/src/main/java/reactor/core/publisher/FluxGroupBy.java

+21-2
Original file line numberDiff line numberDiff line change
@@ -508,6 +508,7 @@ public K key() {
508508
volatile boolean outputFused;
509509

510510
int produced;
511+
boolean isFirstRequest = true;
511512

512513
UnicastGroupedFlux(K key,
513514
Queue<V> queue,
@@ -565,7 +566,16 @@ void drainRegular(Subscriber<? super V> a) {
565566
if (e != 0) {
566567
GroupByMain<?, K, V> main = parent;
567568
if (main != null) {
568-
main.s.request(e);
569+
if (this.isFirstRequest) {
570+
this.isFirstRequest = false;
571+
long toRequest = e - 1;
572+
573+
if (toRequest > 0) {
574+
main.s.request(toRequest);
575+
}
576+
} else {
577+
main.s.request(e);
578+
}
569579
}
570580
if (r != Long.MAX_VALUE) {
571581
REQUESTED.addAndGet(this, -e);
@@ -744,7 +754,16 @@ void tryReplenish() {
744754
produced = 0;
745755
GroupByMain<?, K, V> main = parent;
746756
if (main != null) {
747-
main.s.request(p);
757+
if (this.isFirstRequest) {
758+
this.isFirstRequest = false;
759+
p--;
760+
761+
if (p > 0) {
762+
main.s.request(p);
763+
}
764+
} else {
765+
main.s.request(p);
766+
}
748767
}
749768
}
750769
}

reactor-core/src/test/java/reactor/core/publisher/FluxGroupByTest.java

+31
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,37 @@ public void twoGroupsLongAsyncMergeHidden() {
374374
.assertComplete();
375375
}
376376

377+
@Test
378+
public void twoGroupsLongAsyncMergeHidden2() {
379+
ForkJoinPool forkJoinPool = new ForkJoinPool();
380+
381+
for (int j = 0; j < 100; j++) {
382+
AssertSubscriber<Long> ts = AssertSubscriber.create();
383+
AtomicLong dropped = new AtomicLong();
384+
385+
Hooks.onNextDropped(__ -> dropped.incrementAndGet());
386+
try {
387+
final int total = 100_000;
388+
Flux.range(0, total)
389+
.groupBy(i -> (i / 2d) * 2d, 42)
390+
.flatMap(it -> it.take(1)
391+
.hide(), 2)
392+
.publishOn(Schedulers.fromExecutorService(forkJoinPool), 2)
393+
.count()
394+
.subscribe(ts);
395+
396+
ts.await(Duration.ofSeconds(50));
397+
398+
ts.assertValues(total - dropped.get())
399+
.assertNoError()
400+
.assertComplete();
401+
402+
} finally {
403+
Hooks.resetOnNextDropped();
404+
}
405+
}
406+
}
407+
377408
@Test
378409
public void twoGroupsConsumeWithSubscribe() {
379410
ForkJoinPool forkJoinPool = new ForkJoinPool();

0 commit comments

Comments
 (0)