File tree 2 files changed +52
-2
lines changed
main/java/reactor/core/publisher
test/java/reactor/core/publisher
2 files changed +52
-2
lines changed Original file line number Diff line number Diff line change @@ -515,6 +515,7 @@ public K key() {
515
515
volatile boolean outputFused ;
516
516
517
517
int produced ;
518
+ boolean isFirstRequest = true ;
518
519
519
520
UnicastGroupedFlux (K key ,
520
521
Queue <V > queue ,
@@ -572,7 +573,16 @@ void drainRegular(Subscriber<? super V> a) {
572
573
if (e != 0 ) {
573
574
GroupByMain <?, K , V > main = parent ;
574
575
if (main != null ) {
575
- main .s .request (e );
576
+ if (this .isFirstRequest ) {
577
+ this .isFirstRequest = false ;
578
+ long toRequest = e - 1 ;
579
+
580
+ if (toRequest > 0 ) {
581
+ main .s .request (toRequest );
582
+ }
583
+ } else {
584
+ main .s .request (e );
585
+ }
576
586
}
577
587
if (r != Long .MAX_VALUE ) {
578
588
REQUESTED .addAndGet (this , -e );
@@ -751,7 +761,16 @@ void tryReplenish() {
751
761
produced = 0 ;
752
762
GroupByMain <?, K , V > main = parent ;
753
763
if (main != null ) {
754
- main .s .request (p );
764
+ if (this .isFirstRequest ) {
765
+ this .isFirstRequest = false ;
766
+ p --;
767
+
768
+ if (p > 0 ) {
769
+ main .s .request (p );
770
+ }
771
+ } else {
772
+ main .s .request (p );
773
+ }
755
774
}
756
775
}
757
776
}
Original file line number Diff line number Diff line change @@ -374,6 +374,37 @@ public void twoGroupsLongAsyncMergeHidden() {
374
374
.assertComplete ();
375
375
}
376
376
377
+ @ Test
378
+ public void twoGroupsLongAsyncMergeHidden2 () {
379
+ ForkJoinPool forkJoinPool = new ForkJoinPool ();
380
+
381
+ for (int j = 0 ; j < 100 ; j ++) {
382
+ AssertSubscriber <Long > ts = AssertSubscriber .create ();
383
+ AtomicLong dropped = new AtomicLong ();
384
+
385
+ Hooks .onNextDropped (__ -> dropped .incrementAndGet ());
386
+ try {
387
+ final int total = 100_000 ;
388
+ Flux .range (0 , total )
389
+ .groupBy (i -> (i / 2d ) * 2d , 42 )
390
+ .flatMap (it -> it .take (1 )
391
+ .hide (), 2 )
392
+ .publishOn (Schedulers .fromExecutorService (forkJoinPool ), 2 )
393
+ .count ()
394
+ .subscribe (ts );
395
+
396
+ ts .await (Duration .ofSeconds (50 ));
397
+
398
+ ts .assertValues (total - dropped .get ())
399
+ .assertNoError ()
400
+ .assertComplete ();
401
+
402
+ } finally {
403
+ Hooks .resetOnNextDropped ();
404
+ }
405
+ }
406
+ }
407
+
377
408
@ Test
378
409
public void twoGroupsConsumeWithSubscribe () {
379
410
ForkJoinPool forkJoinPool = new ForkJoinPool ();
You can’t perform that action at this time.
0 commit comments