Skip to content

Commit f591f9d

Browse files
authored
update versions and fixes memleak in UnboundedProcessor (#1106)
This PR updates dependencies and makes minor modifications to UnboundedProcessor due to repeating failures of UnboundedProcessorJCStreassTest, which started reproducing some unspotted issues. Motivation: UnboundedProcessor is a critical component in the RSocket-Java ecosystem and must work properly. After analysis of its internal state machine, it was spotted that sometimes: The request may not be delivered due to natural concurrency The terminal signal may not be delivered since it checks for demand which might be consumed already (due to natural concurrency) The final value could be delivered violating reactive-streams spec Modifications: This PR adds a minimal set of changes, preserving old implementation but eliminating the mentioned bugs --------- Signed-off-by: Oleh Dokuka <[email protected]>
1 parent cb811cf commit f591f9d

File tree

5 files changed

+370
-52
lines changed

5 files changed

+370
-52
lines changed

build.gradle

+4-4
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,10 @@ subprojects {
3333
apply plugin: 'com.github.sherter.google-java-format'
3434
apply plugin: 'com.github.vlsi.gradle-extensions'
3535

36-
ext['reactor-bom.version'] = '2020.0.32'
37-
ext['logback.version'] = '1.2.10'
38-
ext['netty-bom.version'] = '4.1.93.Final'
39-
ext['netty-boringssl.version'] = '2.0.61.Final'
36+
ext['reactor-bom.version'] = '2020.0.39'
37+
ext['logback.version'] = '1.3.14'
38+
ext['netty-bom.version'] = '4.1.106.Final'
39+
ext['netty-boringssl.version'] = '2.0.62.Final'
4040
ext['hdrhistogram.version'] = '2.1.12'
4141
ext['mockito.version'] = '4.11.0'
4242
ext['slf4j.version'] = '1.7.36'

rsocket-core/build.gradle

+3-2
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,14 @@ dependencies {
4141
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
4242

4343
jcstressImplementation(project(":rsocket-test"))
44+
jcstressImplementation 'org.slf4j:slf4j-api'
4445
jcstressImplementation "ch.qos.logback:logback-classic"
4546
jcstressImplementation 'io.projectreactor:reactor-test'
4647
}
4748

4849
jcstress {
49-
mode = 'quick' //quick, default, tough
50-
jcstressDependency = "org.openjdk.jcstress:jcstress-core:0.15"
50+
mode = 'sanity' //sanity, quick, default, tough
51+
jcstressDependency = "org.openjdk.jcstress:jcstress-core:0.16"
5152
}
5253

5354
jar {

rsocket-core/src/jcstress/java/io/rsocket/internal/UnboundedProcessorStressTest.java

+51-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
import io.netty.buffer.ByteBuf;
44
import io.netty.buffer.UnpooledByteBufAllocator;
55
import io.rsocket.core.StressSubscriber;
6+
import io.rsocket.utils.FastLogger;
7+
import java.util.Arrays;
8+
import java.util.ConcurrentModificationException;
69
import org.openjdk.jcstress.annotations.Actor;
710
import org.openjdk.jcstress.annotations.Arbiter;
811
import org.openjdk.jcstress.annotations.Expect;
@@ -14,14 +17,17 @@
1417
import org.openjdk.jcstress.infra.results.L_Result;
1518
import reactor.core.Fuseable;
1619
import reactor.core.publisher.Hooks;
20+
import reactor.util.Logger;
1721

1822
public abstract class UnboundedProcessorStressTest {
1923

2024
static {
2125
Hooks.onErrorDropped(t -> {});
2226
}
2327

24-
final UnboundedProcessor unboundedProcessor = new UnboundedProcessor();
28+
final Logger logger = new FastLogger(getClass().getName());
29+
30+
final UnboundedProcessor unboundedProcessor = new UnboundedProcessor(logger);
2531

2632
@JCStressTest
2733
@Outcome(
@@ -145,6 +151,8 @@ public void arbiter(LLL_Result r) {
145151
stressSubscriber.values.forEach(ByteBuf::release);
146152

147153
r.r3 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();
154+
155+
checkOutcomes(this, r.toString(), logger);
148156
}
149157
}
150158

@@ -270,6 +278,8 @@ public void arbiter(LLL_Result r) {
270278
stressSubscriber.values.forEach(ByteBuf::release);
271279

272280
r.r3 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();
281+
282+
checkOutcomes(this, r.toString(), logger);
273283
}
274284
}
275285

@@ -375,6 +385,8 @@ public void arbiter(LLL_Result r) {
375385
stressSubscriber.values.forEach(ByteBuf::release);
376386

377387
r.r3 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();
388+
389+
checkOutcomes(this, r.toString(), logger);
378390
}
379391
}
380392

@@ -476,6 +488,8 @@ public void arbiter(LLL_Result r) {
476488
stressSubscriber.values.forEach(ByteBuf::release);
477489

478490
r.r3 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();
491+
492+
checkOutcomes(this, r.toString(), logger);
479493
}
480494
}
481495

@@ -578,6 +592,8 @@ public void arbiter(LLL_Result r) {
578592
stressSubscriber.values.forEach(ByteBuf::release);
579593

580594
r.r3 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();
595+
596+
checkOutcomes(this, r.toString(), logger);
581597
}
582598
}
583599

@@ -701,6 +717,8 @@ public void arbiter(LLL_Result r) {
701717
stressSubscriber.values.forEach(ByteBuf::release);
702718

703719
r.r3 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();
720+
721+
checkOutcomes(this, r.toString(), logger);
704722
}
705723
}
706724

@@ -781,6 +799,8 @@ public void arbiter(LLL_Result r) {
781799
stressSubscriber.values.forEach(ByteBuf::release);
782800

783801
r.r3 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();
802+
803+
checkOutcomes(this, r.toString(), logger);
784804
}
785805
}
786806

@@ -837,9 +857,15 @@ public void arbiter(LLL_Result r) {
837857
+ stressSubscriber.onErrorCalls * 2
838858
+ stressSubscriber.droppedErrors.size() * 3;
839859

860+
if (stressSubscriber.concurrentOnNext || stressSubscriber.concurrentOnComplete) {
861+
throw new ConcurrentModificationException("boo");
862+
}
863+
840864
stressSubscriber.values.forEach(ByteBuf::release);
841865

842866
r.r3 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();
867+
868+
checkOutcomes(this, r.toString(), logger);
843869
}
844870
}
845871

@@ -892,6 +918,8 @@ public void arbiter(LLL_Result r) {
892918
stressSubscriber.values.forEach(ByteBuf::release);
893919

894920
r.r3 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();
921+
922+
checkOutcomes(this, r.toString(), logger);
895923
}
896924
}
897925

@@ -1107,6 +1135,8 @@ public void arbiter(LLLL_Result r) {
11071135
stressSubscriber.values.forEach(ByteBuf::release);
11081136

11091137
r.r4 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();
1138+
1139+
checkOutcomes(this, r.toString(), logger);
11101140
}
11111141
}
11121142

@@ -1238,6 +1268,8 @@ public void arbiter(LLLL_Result r) {
12381268
stressSubscriber.values.forEach(ByteBuf::release);
12391269

12401270
r.r4 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();
1271+
1272+
checkOutcomes(this, r.toString(), logger);
12411273
}
12421274
}
12431275

@@ -1390,6 +1422,8 @@ public void arbiter(LLLL_Result r) {
13901422
stressSubscriber.values.forEach(ByteBuf::release);
13911423

13921424
r.r4 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();
1425+
1426+
checkOutcomes(this, r.toString(), logger);
13931427
}
13941428
}
13951429

@@ -1522,6 +1556,8 @@ public void arbiter(LLLL_Result r) {
15221556
stressSubscriber.values.forEach(ByteBuf::release);
15231557

15241558
r.r4 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();
1559+
1560+
checkOutcomes(this, r.toString(), logger);
15251561
}
15261562
}
15271563

@@ -1587,6 +1623,8 @@ public void arbiter(LLLL_Result r) {
15871623
stressSubscriber.values.forEach(ByteBuf::release);
15881624

15891625
r.r4 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();
1626+
1627+
checkOutcomes(this, r.toString(), logger);
15901628
}
15911629
}
15921630

@@ -1652,6 +1690,8 @@ public void arbiter(LLLL_Result r) {
16521690
stressSubscriber.values.forEach(ByteBuf::release);
16531691

16541692
r.r4 = byteBuf1.refCnt() + byteBuf2.refCnt() + byteBuf3.refCnt() + byteBuf4.refCnt();
1693+
1694+
checkOutcomes(this, r.toString(), logger);
16551695
}
16561696
}
16571697

@@ -1678,6 +1718,16 @@ public void subscribe2() {
16781718
@Arbiter
16791719
public void arbiter(L_Result r) {
16801720
r.r1 = stressSubscriber1.onErrorCalls + stressSubscriber2.onErrorCalls;
1721+
1722+
checkOutcomes(this, r.toString(), logger);
1723+
}
1724+
}
1725+
1726+
static void checkOutcomes(Object instance, String result, Logger logger) {
1727+
if (Arrays.stream(instance.getClass().getDeclaredAnnotationsByType(Outcome.class))
1728+
.flatMap(o -> Arrays.stream(o.id()))
1729+
.noneMatch(s -> s.equalsIgnoreCase(result))) {
1730+
throw new RuntimeException(result + " " + logger);
16811731
}
16821732
}
16831733
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
package io.rsocket.utils;
2+
3+
import java.util.ArrayList;
4+
import java.util.Arrays;
5+
import java.util.Comparator;
6+
import java.util.List;
7+
import java.util.Map;
8+
import java.util.concurrent.ConcurrentHashMap;
9+
import java.util.regex.Matcher;
10+
import java.util.regex.Pattern;
11+
import java.util.stream.Collectors;
12+
import reactor.util.Logger;
13+
14+
/**
15+
* Implementation of {@link Logger} which is based on the {@link ThreadLocal} based queue which
16+
* collects all the events on the per-thread basis. </br> Such logger is designed to have all events
17+
* stored during the stress-test run and then sorted and printed out once all the Threads completed
18+
* execution (inside the {@link org.openjdk.jcstress.annotations.Arbiter} annotated method. </br>
19+
* Note, this implementation only supports trace-level logs and ignores all others, it is intended
20+
* to be used by {@link reactor.core.publisher.StateLogger}.
21+
*/
22+
public class FastLogger implements Logger {
23+
24+
final Map<Thread, List<String>> queues = new ConcurrentHashMap<>();
25+
26+
final ThreadLocal<List<String>> logsQueueLocal =
27+
ThreadLocal.withInitial(
28+
() -> {
29+
final ArrayList<String> logs = new ArrayList<>(100);
30+
queues.put(Thread.currentThread(), logs);
31+
return logs;
32+
});
33+
34+
private final String name;
35+
36+
public FastLogger(String name) {
37+
this.name = name;
38+
}
39+
40+
@Override
41+
public String toString() {
42+
return queues
43+
.values()
44+
.stream()
45+
.flatMap(List::stream)
46+
.sorted(
47+
Comparator.comparingLong(
48+
s -> {
49+
Pattern pattern = Pattern.compile("\\[(.*?)]");
50+
Matcher matcher = pattern.matcher(s);
51+
matcher.find();
52+
return Long.parseLong(matcher.group(1));
53+
}))
54+
.collect(Collectors.joining("\n"));
55+
}
56+
57+
@Override
58+
public String getName() {
59+
return this.name;
60+
}
61+
62+
@Override
63+
public boolean isTraceEnabled() {
64+
return true;
65+
}
66+
67+
@Override
68+
public void trace(String msg) {
69+
logsQueueLocal.get().add(String.format("[%s] %s", System.nanoTime(), msg));
70+
}
71+
72+
@Override
73+
public void trace(String format, Object... arguments) {
74+
trace(String.format(format, arguments));
75+
}
76+
77+
@Override
78+
public void trace(String msg, Throwable t) {
79+
trace(String.format("%s, %s", msg, Arrays.toString(t.getStackTrace())));
80+
}
81+
82+
@Override
83+
public boolean isDebugEnabled() {
84+
return false;
85+
}
86+
87+
@Override
88+
public void debug(String msg) {}
89+
90+
@Override
91+
public void debug(String format, Object... arguments) {}
92+
93+
@Override
94+
public void debug(String msg, Throwable t) {}
95+
96+
@Override
97+
public boolean isInfoEnabled() {
98+
return false;
99+
}
100+
101+
@Override
102+
public void info(String msg) {}
103+
104+
@Override
105+
public void info(String format, Object... arguments) {}
106+
107+
@Override
108+
public void info(String msg, Throwable t) {}
109+
110+
@Override
111+
public boolean isWarnEnabled() {
112+
return false;
113+
}
114+
115+
@Override
116+
public void warn(String msg) {}
117+
118+
@Override
119+
public void warn(String format, Object... arguments) {}
120+
121+
@Override
122+
public void warn(String msg, Throwable t) {}
123+
124+
@Override
125+
public boolean isErrorEnabled() {
126+
return false;
127+
}
128+
129+
@Override
130+
public void error(String msg) {}
131+
132+
@Override
133+
public void error(String format, Object... arguments) {}
134+
135+
@Override
136+
public void error(String msg, Throwable t) {}
137+
}

0 commit comments

Comments
 (0)