Skip to content

Commit 812e898

Browse files
133tosakarinOneSizeFitsQuorum
authored andcommitted
[fix] Change IoTConsensusService and PipeConsensusService from async to sync (#13077)
hotfix for meituan, not released yet
1 parent ef3738b commit 812e898

File tree

4 files changed

+85
-137
lines changed

4 files changed

+85
-137
lines changed

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ public IoTConsensus(ConsensusConfig config, Registry registry) {
133133
@Override
134134
public synchronized void start() throws IOException {
135135
initAndRecover();
136-
service.initAsyncedServiceImpl(new IoTConsensusRPCServiceProcessor(this));
136+
service.initSyncedServiceImpl(new IoTConsensusRPCServiceProcessor(this));
137137
try {
138138
registerManager.register(service);
139139
} catch (StartupException e) {

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCService.java

+5-15
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,9 @@
2929
import org.apache.iotdb.consensus.iot.thrift.IoTConsensusIService;
3030
import org.apache.iotdb.rpc.ZeroCopyRpcTransportFactory;
3131

32-
import org.apache.thrift.TBaseAsyncProcessor;
3332
import org.slf4j.Logger;
3433
import org.slf4j.LoggerFactory;
3534

36-
import java.lang.reflect.InvocationTargetException;
37-
3835
public class IoTConsensusRPCService extends ThriftService implements IoTConsensusRPCServiceMBean {
3936

4037
private static final Logger logger = LoggerFactory.getLogger(IoTConsensusRPCService.class);
@@ -54,17 +51,15 @@ public ServiceType getID() {
5451
}
5552

5653
@Override
57-
public void initAsyncedServiceImpl(Object iotConsensusRPCServiceProcessor) {
54+
public void initSyncedServiceImpl(Object iotConsensusRPCServiceProcessor) {
5855
this.iotConsensusRPCServiceProcessor =
5956
(IoTConsensusRPCServiceProcessor) iotConsensusRPCServiceProcessor;
60-
super.initAsyncedServiceImpl(this.iotConsensusRPCServiceProcessor);
57+
super.initSyncedServiceImpl(iotConsensusRPCServiceProcessor);
6158
}
6259

6360
@Override
64-
public void initTProcessor()
65-
throws ClassNotFoundException, IllegalAccessException, InstantiationException,
66-
NoSuchMethodException, InvocationTargetException {
67-
processor = new IoTConsensusIService.AsyncProcessor<>(iotConsensusRPCServiceProcessor);
61+
public void initTProcessor() {
62+
processor = new IoTConsensusIService.Processor<>(iotConsensusRPCServiceProcessor);
6863
}
6964

7065
@Override
@@ -73,20 +68,15 @@ public void initThriftServiceThread()
7368
try {
7469
thriftServiceThread =
7570
new ThriftServiceThread(
76-
(TBaseAsyncProcessor<?>) processor,
71+
processor,
7772
getID().getName(),
7873
ThreadName.IOT_CONSENSUS_RPC_PROCESSOR.getName(),
7974
getBindIP(),
8075
getBindPort(),
81-
config.getRpc().getRpcSelectorThreadNum(),
82-
config.getRpc().getRpcMinConcurrentClientNum(),
8376
config.getRpc().getRpcMaxConcurrentClientNum(),
8477
config.getRpc().getThriftServerAwaitTimeForStopService(),
8578
new IoTConsensusRPCServiceHandler(iotConsensusRPCServiceProcessor),
8679
config.getRpc().isRpcThriftCompressionEnabled(),
87-
config.getRpc().getConnectionTimeoutInMs(),
88-
config.getRpc().getThriftMaxFrameSize(),
89-
ThriftServiceThread.ServerType.SELECTOR,
9080
ZeroCopyRpcTransportFactory.INSTANCE);
9181
} catch (RPCServiceException e) {
9282
throw new IllegalAccessException(e.getMessage());

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java

+77-108
Original file line numberDiff line numberDiff line change
@@ -55,14 +55,13 @@
5555
import org.apache.iotdb.rpc.TSStatusCode;
5656

5757
import org.apache.thrift.TException;
58-
import org.apache.thrift.async.AsyncMethodCallback;
5958
import org.slf4j.Logger;
6059
import org.slf4j.LoggerFactory;
6160

6261
import java.util.Collections;
6362
import java.util.stream.Collectors;
6463

65-
public class IoTConsensusRPCServiceProcessor implements IoTConsensusIService.AsyncIface {
64+
public class IoTConsensusRPCServiceProcessor implements IoTConsensusIService.Iface {
6665

6766
private static final Logger LOGGER =
6867
LoggerFactory.getLogger(IoTConsensusRPCServiceProcessor.class);
@@ -74,98 +73,85 @@ public IoTConsensusRPCServiceProcessor(IoTConsensus consensus) {
7473
}
7574

7675
@Override
77-
public void syncLogEntries(
78-
TSyncLogEntriesReq req, AsyncMethodCallback<TSyncLogEntriesRes> resultHandler) {
79-
try {
80-
ConsensusGroupId groupId =
81-
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
82-
IoTConsensusServerImpl impl = consensus.getImpl(groupId);
83-
if (impl == null) {
84-
String message =
85-
String.format(
86-
"unexpected consensusGroupId %s for TSyncLogEntriesReq which size is %s",
87-
groupId, req.getLogEntries().size());
88-
LOGGER.error(message);
89-
TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
90-
status.setMessage(message);
91-
resultHandler.onComplete(new TSyncLogEntriesRes(Collections.singletonList(status)));
92-
return;
93-
}
94-
if (impl.isReadOnly()) {
95-
String message = "fail to sync logEntries because system is read-only.";
96-
LOGGER.error(message);
97-
TSStatus status = new TSStatus(TSStatusCode.SYSTEM_READ_ONLY.getStatusCode());
98-
status.setMessage(message);
99-
resultHandler.onComplete(new TSyncLogEntriesRes(Collections.singletonList(status)));
100-
return;
101-
}
102-
if (!impl.isActive()) {
103-
TSStatus status = new TSStatus(TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode());
104-
status.setMessage("peer is inactive and not ready to receive sync log request");
105-
resultHandler.onComplete(new TSyncLogEntriesRes(Collections.singletonList(status)));
106-
return;
107-
}
108-
BatchIndexedConsensusRequest logEntriesInThisBatch =
109-
new BatchIndexedConsensusRequest(req.peerId);
110-
// We use synchronized to ensure atomicity of executing multiple logs
111-
for (TLogEntry entry : req.getLogEntries()) {
112-
logEntriesInThisBatch.add(
113-
impl.buildIndexedConsensusRequestForRemoteRequest(
114-
entry.getSearchIndex(),
115-
entry.getData().stream()
116-
.map(
117-
entry.isFromWAL()
118-
? IoTConsensusRequest::new
119-
: ByteBufferConsensusRequest::new)
120-
.collect(Collectors.toList())));
121-
}
122-
long buildRequestTime = System.nanoTime();
123-
IConsensusRequest deserializedRequest =
124-
impl.getStateMachine().deserializeRequest(logEntriesInThisBatch);
125-
impl.getIoTConsensusServerMetrics()
126-
.recordDeserializeCost(System.nanoTime() - buildRequestTime);
127-
TSStatus writeStatus =
128-
impl.syncLog(logEntriesInThisBatch.getSourcePeerId(), deserializedRequest);
129-
LOGGER.debug(
130-
"execute TSyncLogEntriesReq for {} with result {}",
131-
req.consensusGroupId,
132-
writeStatus.subStatus);
133-
resultHandler.onComplete(new TSyncLogEntriesRes(writeStatus.subStatus));
134-
} catch (Exception e) {
135-
resultHandler.onError(e);
76+
public TSyncLogEntriesRes syncLogEntries(TSyncLogEntriesReq req) {
77+
ConsensusGroupId groupId =
78+
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
79+
IoTConsensusServerImpl impl = consensus.getImpl(groupId);
80+
if (impl == null) {
81+
String message =
82+
String.format(
83+
"unexpected consensusGroupId %s for TSyncLogEntriesReq which size is %s",
84+
groupId, req.getLogEntries().size());
85+
LOGGER.error(message);
86+
TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
87+
status.setMessage(message);
88+
return new TSyncLogEntriesRes(Collections.singletonList(status));
13689
}
90+
if (impl.isReadOnly()) {
91+
String message = "fail to sync logEntries because system is read-only.";
92+
LOGGER.error(message);
93+
TSStatus status = new TSStatus(TSStatusCode.SYSTEM_READ_ONLY.getStatusCode());
94+
status.setMessage(message);
95+
return new TSyncLogEntriesRes(Collections.singletonList(status));
96+
}
97+
if (!impl.isActive()) {
98+
TSStatus status = new TSStatus(TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode());
99+
status.setMessage("peer is inactive and not ready to receive sync log request");
100+
return new TSyncLogEntriesRes(Collections.singletonList(status));
101+
}
102+
BatchIndexedConsensusRequest logEntriesInThisBatch =
103+
new BatchIndexedConsensusRequest(req.peerId);
104+
// We use synchronized to ensure atomicity of executing multiple logs
105+
for (TLogEntry entry : req.getLogEntries()) {
106+
logEntriesInThisBatch.add(
107+
impl.buildIndexedConsensusRequestForRemoteRequest(
108+
entry.getSearchIndex(),
109+
entry.getData().stream()
110+
.map(
111+
entry.isFromWAL()
112+
? IoTConsensusRequest::new
113+
: ByteBufferConsensusRequest::new)
114+
.collect(Collectors.toList())));
115+
}
116+
long buildRequestTime = System.nanoTime();
117+
IConsensusRequest deserializedRequest =
118+
impl.getStateMachine().deserializeRequest(logEntriesInThisBatch);
119+
impl.getIoTConsensusServerMetrics().recordDeserializeCost(System.nanoTime() - buildRequestTime);
120+
TSStatus writeStatus =
121+
impl.syncLog(logEntriesInThisBatch.getSourcePeerId(), deserializedRequest);
122+
LOGGER.debug(
123+
"execute TSyncLogEntriesReq for {} with result {}",
124+
req.consensusGroupId,
125+
writeStatus.subStatus);
126+
return new TSyncLogEntriesRes(writeStatus.subStatus);
137127
}
138128

139129
@Override
140-
public void inactivatePeer(
141-
TInactivatePeerReq req, AsyncMethodCallback<TInactivatePeerRes> resultHandler)
142-
throws TException {
130+
public TInactivatePeerRes inactivatePeer(TInactivatePeerReq req) throws TException {
143131
if (req.isForDeletionPurpose()) {
144132
KillPoint.setKillPoint(IoTConsensusInactivatePeerKillPoints.BEFORE_INACTIVATE);
145133
}
146134
ConsensusGroupId groupId =
147135
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
148136
IoTConsensusServerImpl impl = consensus.getImpl(groupId);
137+
149138
if (impl == null) {
150139
String message =
151140
String.format("unexpected consensusGroupId %s for inactivatePeer request", groupId);
152141
LOGGER.error(message);
153142
TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
154143
status.setMessage(message);
155-
resultHandler.onComplete(new TInactivatePeerRes(status));
156-
return;
144+
return new TInactivatePeerRes(status);
157145
}
158146
impl.setActive(false);
159-
resultHandler.onComplete(
160-
new TInactivatePeerRes(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
161147
if (req.isForDeletionPurpose()) {
162148
KillPoint.setKillPoint(IoTConsensusInactivatePeerKillPoints.AFTER_INACTIVATE);
163149
}
150+
return new TInactivatePeerRes(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
164151
}
165152

166153
@Override
167-
public void activatePeer(
168-
TActivatePeerReq req, AsyncMethodCallback<TActivatePeerRes> resultHandler) throws TException {
154+
public TActivatePeerRes activatePeer(TActivatePeerReq req) throws TException {
169155
ConsensusGroupId groupId =
170156
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
171157
IoTConsensusServerImpl impl = consensus.getImpl(groupId);
@@ -175,18 +161,15 @@ public void activatePeer(
175161
LOGGER.error(message);
176162
TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
177163
status.setMessage(message);
178-
resultHandler.onComplete(new TActivatePeerRes(status));
179-
return;
164+
return new TActivatePeerRes(status);
180165
}
181166
KillPoint.setKillPoint(DataNodeKillPoints.DESTINATION_ADD_PEER_DONE);
182167
impl.setActive(true);
183-
resultHandler.onComplete(
184-
new TActivatePeerRes(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
168+
return new TActivatePeerRes(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
185169
}
186170

187171
@Override
188-
public void buildSyncLogChannel(
189-
TBuildSyncLogChannelReq req, AsyncMethodCallback<TBuildSyncLogChannelRes> resultHandler)
172+
public TBuildSyncLogChannelRes buildSyncLogChannel(TBuildSyncLogChannelReq req)
190173
throws TException {
191174
ConsensusGroupId groupId =
192175
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
@@ -197,8 +180,7 @@ public void buildSyncLogChannel(
197180
LOGGER.error(message);
198181
TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
199182
status.setMessage(message);
200-
resultHandler.onComplete(new TBuildSyncLogChannelRes(status));
201-
return;
183+
return new TBuildSyncLogChannelRes(status);
202184
}
203185
TSStatus responseStatus;
204186
try {
@@ -208,12 +190,11 @@ public void buildSyncLogChannel(
208190
responseStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
209191
responseStatus.setMessage(e.getMessage());
210192
}
211-
resultHandler.onComplete(new TBuildSyncLogChannelRes(responseStatus));
193+
return new TBuildSyncLogChannelRes(responseStatus);
212194
}
213195

214196
@Override
215-
public void removeSyncLogChannel(
216-
TRemoveSyncLogChannelReq req, AsyncMethodCallback<TRemoveSyncLogChannelRes> resultHandler)
197+
public TRemoveSyncLogChannelRes removeSyncLogChannel(TRemoveSyncLogChannelReq req)
217198
throws TException {
218199
ConsensusGroupId groupId =
219200
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
@@ -224,8 +205,7 @@ public void removeSyncLogChannel(
224205
LOGGER.error(message);
225206
TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
226207
status.setMessage(message);
227-
resultHandler.onComplete(new TRemoveSyncLogChannelRes(status));
228-
return;
208+
return new TRemoveSyncLogChannelRes(status);
229209
}
230210
TSStatus responseStatus;
231211
try {
@@ -235,12 +215,11 @@ public void removeSyncLogChannel(
235215
responseStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
236216
responseStatus.setMessage(e.getMessage());
237217
}
238-
resultHandler.onComplete(new TRemoveSyncLogChannelRes(responseStatus));
218+
return new TRemoveSyncLogChannelRes(responseStatus);
239219
}
240220

241221
@Override
242-
public void waitSyncLogComplete(
243-
TWaitSyncLogCompleteReq req, AsyncMethodCallback<TWaitSyncLogCompleteRes> resultHandler)
222+
public TWaitSyncLogCompleteRes waitSyncLogComplete(TWaitSyncLogCompleteReq req)
244223
throws TException {
245224
ConsensusGroupId groupId =
246225
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
@@ -251,18 +230,15 @@ public void waitSyncLogComplete(
251230
LOGGER.error(message);
252231
TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
253232
status.setMessage(message);
254-
resultHandler.onComplete(new TWaitSyncLogCompleteRes(true, 0, 0));
255-
return;
233+
return new TWaitSyncLogCompleteRes(true, 0, 0);
256234
}
257235
long searchIndex = impl.getSearchIndex();
258236
long safeIndex = impl.getMinSyncIndex();
259-
resultHandler.onComplete(
260-
new TWaitSyncLogCompleteRes(searchIndex == safeIndex, searchIndex, safeIndex));
237+
return new TWaitSyncLogCompleteRes(searchIndex == safeIndex, searchIndex, safeIndex);
261238
}
262239

263240
@Override
264-
public void sendSnapshotFragment(
265-
TSendSnapshotFragmentReq req, AsyncMethodCallback<TSendSnapshotFragmentRes> resultHandler)
241+
public TSendSnapshotFragmentRes sendSnapshotFragment(TSendSnapshotFragmentReq req)
266242
throws TException {
267243
ConsensusGroupId groupId =
268244
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
@@ -273,8 +249,7 @@ public void sendSnapshotFragment(
273249
LOGGER.error(message);
274250
TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
275251
status.setMessage(message);
276-
resultHandler.onComplete(new TSendSnapshotFragmentRes(status));
277-
return;
252+
return new TSendSnapshotFragmentRes(status);
278253
}
279254
TSStatus responseStatus;
280255
try {
@@ -284,12 +259,11 @@ public void sendSnapshotFragment(
284259
responseStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
285260
responseStatus.setMessage(e.getMessage());
286261
}
287-
resultHandler.onComplete(new TSendSnapshotFragmentRes(responseStatus));
262+
return new TSendSnapshotFragmentRes(responseStatus);
288263
}
289264

290265
@Override
291-
public void triggerSnapshotLoad(
292-
TTriggerSnapshotLoadReq req, AsyncMethodCallback<TTriggerSnapshotLoadRes> resultHandler)
266+
public TTriggerSnapshotLoadRes triggerSnapshotLoad(TTriggerSnapshotLoadReq req)
293267
throws TException {
294268
ConsensusGroupId groupId =
295269
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
@@ -300,20 +274,16 @@ public void triggerSnapshotLoad(
300274
LOGGER.error(message);
301275
TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
302276
status.setMessage(message);
303-
resultHandler.onComplete(new TTriggerSnapshotLoadRes(status));
304-
return;
277+
return new TTriggerSnapshotLoadRes(status);
305278
}
306279
impl.loadSnapshot(req.snapshotId);
307280
KillPoint.setKillPoint(DataNodeKillPoints.DESTINATION_ADD_PEER_TRANSITION);
308-
resultHandler.onComplete(
309-
new TTriggerSnapshotLoadRes(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
281+
return new TTriggerSnapshotLoadRes(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
310282
}
311283

312284
@Override
313-
public void cleanupTransferredSnapshot(
314-
TCleanupTransferredSnapshotReq req,
315-
AsyncMethodCallback<TCleanupTransferredSnapshotRes> resultHandler)
316-
throws TException {
285+
public TCleanupTransferredSnapshotRes cleanupTransferredSnapshot(
286+
TCleanupTransferredSnapshotReq req) throws TException {
317287
ConsensusGroupId groupId =
318288
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
319289
IoTConsensusServerImpl impl = consensus.getImpl(groupId);
@@ -323,8 +293,7 @@ public void cleanupTransferredSnapshot(
323293
LOGGER.error(message);
324294
TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
325295
status.setMessage(message);
326-
resultHandler.onComplete(new TCleanupTransferredSnapshotRes(status));
327-
return;
296+
return new TCleanupTransferredSnapshotRes(status);
328297
}
329298
TSStatus responseStatus;
330299
try {
@@ -335,7 +304,7 @@ public void cleanupTransferredSnapshot(
335304
responseStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
336305
responseStatus.setMessage(e.getMessage());
337306
}
338-
resultHandler.onComplete(new TCleanupTransferredSnapshotRes(responseStatus));
307+
return new TCleanupTransferredSnapshotRes(responseStatus);
339308
}
340309

341310
public void handleClientExit() {}

0 commit comments

Comments
 (0)