File tree 1 file changed +31
-0
lines changed
reactor-core/src/test/java/reactor/core/publisher
1 file changed +31
-0
lines changed Original file line number Diff line number Diff line change @@ -374,6 +374,37 @@ public void twoGroupsLongAsyncMergeHidden() {
374
374
.assertComplete ();
375
375
}
376
376
377
+ @ Test
378
+ public void twoGroupsLongAsyncMergeHidden2 () {
379
+ ForkJoinPool forkJoinPool = new ForkJoinPool ();
380
+
381
+ for (int j = 0 ; j < 100 ; j ++) {
382
+ AssertSubscriber <Long > ts = AssertSubscriber .create ();
383
+ AtomicLong dropped = new AtomicLong ();
384
+
385
+ Hooks .onNextDropped (__ -> dropped .incrementAndGet ());
386
+ try {
387
+ final int total = 100_000 ;
388
+ Flux .range (0 , total )
389
+ .groupBy (i -> (i / 2d ) * 2d , 42 )
390
+ .flatMap (it -> it .take (1 )
391
+ .hide (), 2 )
392
+ .publishOn (Schedulers .fromExecutorService (forkJoinPool ), 2 )
393
+ .count ()
394
+ .subscribe (ts );
395
+
396
+ ts .await (Duration .ofSeconds (50 ));
397
+
398
+ ts .assertValues (total - dropped .get ())
399
+ .assertNoError ()
400
+ .assertComplete ();
401
+
402
+ } finally {
403
+ Hooks .resetOnNextDropped ();
404
+ }
405
+ }
406
+ }
407
+
377
408
@ Test
378
409
public void twoGroupsConsumeWithSubscribe () {
379
410
ForkJoinPool forkJoinPool = new ForkJoinPool ();
You can’t perform that action at this time.
0 commit comments