Skip to content

Commit 4fa1b11

Browse files
committed
improve IT & fix bugs in SubscriptionExecutorServiceManager.java
1 parent 668030d commit 4fa1b11

File tree

4 files changed

+20
-6
lines changed

4 files changed

+20
-6
lines changed

integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public class IoTDBSubscriptionITConstant {
2323

2424
public static final long AWAITILITY_POLL_DELAY_SECOND = 1L;
2525
public static final long AWAITILITY_POLL_INTERVAL_SECOND = 2L;
26-
public static final long AWAITILITY_AT_MOST_SECOND = 240L;
26+
public static final long AWAITILITY_AT_MOST_SECOND = 600L;
2727

2828
public static final long SLEEP_NS = 1_000_000_000L;
2929
public static final long POLL_TIMEOUT_MS = 10_000L;

integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java

+13
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,30 @@
2121

2222
import org.apache.iotdb.it.env.MultiEnvFactory;
2323
import org.apache.iotdb.itbase.env.BaseEnv;
24+
import org.apache.iotdb.session.subscription.consumer.SubscriptionExecutorServiceManager;
2425

2526
import org.junit.After;
2627
import org.junit.Before;
28+
import org.junit.Rule;
29+
import org.junit.rules.TestName;
2730

2831
abstract class AbstractSubscriptionDualIT {
2932

3033
protected BaseEnv senderEnv;
3134
protected BaseEnv receiverEnv;
3235

36+
@Rule public TestName testName = new TestName();
37+
3338
@Before
3439
public void setUp() {
40+
// set thread name
41+
Thread.currentThread().setName(String.format("%s - main", testName.getMethodName()));
42+
43+
// set thread pools core size
44+
SubscriptionExecutorServiceManager.setControlFlowExecutorCorePoolSize(1);
45+
SubscriptionExecutorServiceManager.setUpstreamDataFlowExecutorCorePoolSize(1);
46+
SubscriptionExecutorServiceManager.setDownstreamDataFlowExecutorCorePoolSize(1);
47+
3548
MultiEnvFactory.createEnv(2);
3649
senderEnv = MultiEnvFactory.getEnv(0);
3750
receiverEnv = MultiEnvFactory.getEnv(1);

integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -1005,7 +1005,7 @@ private void pollMessagesAndCheck(
10051005
LOGGER.info("consumer {} exiting...", consumers.get(index));
10061006
}
10071007
},
1008-
consumers.get(index).toString());
1008+
String.format("%s - %s", testName.getMethodName(), consumers.get(index).toString()));
10091009
t.start();
10101010
threads.add(t);
10111011
}
@@ -1016,6 +1016,7 @@ private void pollMessagesAndCheck(
10161016
final Statement statement = connection.createStatement()) {
10171017
// Keep retrying if there are execution failures
10181018
Awaitility.await()
1019+
.pollInSameThread()
10191020
.pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND, TimeUnit.SECONDS)
10201021
.pollInterval(
10211022
IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND, TimeUnit.SECONDS)

iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionExecutorServiceManager.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,12 @@
2929
import java.util.concurrent.ScheduledFuture;
3030
import java.util.concurrent.TimeUnit;
3131

32-
final class SubscriptionExecutorServiceManager {
32+
public final class SubscriptionExecutorServiceManager {
3333

3434
private static final Logger LOGGER =
3535
LoggerFactory.getLogger(SubscriptionExecutorServiceManager.class);
3636

37-
private static final long AWAIT_TERMINATION_TIMEOUT_MS = 10_000L;
37+
private static final long AWAIT_TERMINATION_TIMEOUT_MS = 15_000L;
3838

3939
private static final String CONTROL_FLOW_EXECUTOR_NAME = "SubscriptionControlFlowExecutor";
4040
private static final String UPSTREAM_DATA_FLOW_EXECUTOR_NAME =
@@ -172,9 +172,9 @@ boolean isShutdown() {
172172
}
173173

174174
void setCorePoolSize(final int corePoolSize) {
175-
if (!isShutdown()) {
175+
if (isShutdown()) {
176176
synchronized (this) {
177-
if (!isShutdown()) {
177+
if (isShutdown()) {
178178
this.corePoolSize = corePoolSize;
179179
return;
180180
}

0 commit comments

Comments
 (0)