File tree 1 file changed +21
-2
lines changed
reactor-core/src/main/java/reactor/core/publisher
1 file changed +21
-2
lines changed Original file line number Diff line number Diff line change @@ -515,6 +515,7 @@ public K key() {
515
515
volatile boolean outputFused ;
516
516
517
517
int produced ;
518
+ boolean isFirstRequest = true ;
518
519
519
520
UnicastGroupedFlux (K key ,
520
521
Queue <V > queue ,
@@ -572,7 +573,16 @@ void drainRegular(Subscriber<? super V> a) {
572
573
if (e != 0 ) {
573
574
GroupByMain <?, K , V > main = parent ;
574
575
if (main != null ) {
575
- main .s .request (e );
576
+ if (this .isFirstRequest ) {
577
+ this .isFirstRequest = false ;
578
+ long toRequest = e - 1 ;
579
+
580
+ if (toRequest > 0 ) {
581
+ main .s .request (toRequest );
582
+ }
583
+ } else {
584
+ main .s .request (e );
585
+ }
576
586
}
577
587
if (r != Long .MAX_VALUE ) {
578
588
REQUESTED .addAndGet (this , -e );
@@ -751,7 +761,16 @@ void tryReplenish() {
751
761
produced = 0 ;
752
762
GroupByMain <?, K , V > main = parent ;
753
763
if (main != null ) {
754
- main .s .request (p );
764
+ if (this .isFirstRequest ) {
765
+ this .isFirstRequest = false ;
766
+ p --;
767
+
768
+ if (p > 0 ) {
769
+ main .s .request (p );
770
+ }
771
+ } else {
772
+ main .s .request (p );
773
+ }
755
774
}
756
775
}
757
776
}
You can’t perform that action at this time.
0 commit comments