Skip to content

Commit 36dd489

Browse files
authored
Add clean logic for FragmentInstance in case that callback is not added. (#12768)
1 parent 6cbdf31 commit 36dd489

File tree

5 files changed

+30
-9
lines changed

5 files changed

+30
-9
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java

+9-6
Original file line numberDiff line numberDiff line change
@@ -585,11 +585,13 @@ public MPPDataExchangeManager(
585585
ExecutorService executorService,
586586
IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
587587
mppDataExchangeServiceClientManager) {
588-
this.localMemoryManager = Validate.notNull(localMemoryManager);
589-
this.tsBlockSerdeFactory = Validate.notNull(tsBlockSerdeFactory);
590-
this.executorService = Validate.notNull(executorService);
588+
this.localMemoryManager = Validate.notNull(localMemoryManager, "localMemoryManager is null.");
589+
this.tsBlockSerdeFactory =
590+
Validate.notNull(tsBlockSerdeFactory, "tsBlockSerdeFactory is null.");
591+
this.executorService = Validate.notNull(executorService, "executorService is null.");
591592
this.mppDataExchangeServiceClientManager =
592-
Validate.notNull(mppDataExchangeServiceClientManager);
593+
Validate.notNull(
594+
mppDataExchangeServiceClientManager, "mppDataExchangeServiceClientManager is null.");
593595
sourceHandles = new ConcurrentHashMap<>();
594596
shuffleSinkHandles = new ConcurrentHashMap<>();
595597
}
@@ -601,10 +603,11 @@ public MPPDataExchangeServiceImpl getOrCreateMPPDataExchangeServiceImpl() {
601603
return mppDataExchangeService;
602604
}
603605

604-
public void deRegisterFragmentInstanceFromMemoryPool(String queryId, String fragmentInstanceId) {
606+
public void deRegisterFragmentInstanceFromMemoryPool(
607+
String queryId, String fragmentInstanceId, boolean forceDeregister) {
605608
localMemoryManager
606609
.getQueryPool()
607-
.deRegisterFragmentInstanceFromQueryMemoryMap(queryId, fragmentInstanceId);
610+
.deRegisterFragmentInstanceFromQueryMemoryMap(queryId, fragmentInstanceId, forceDeregister);
608611
}
609612

610613
public LocalMemoryManager getLocalMemoryManager() {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ private void initialize(IDriverScheduler scheduler, boolean isExplainAnalyze) {
309309

310310
// release memory
311311
exchangeManager.deRegisterFragmentInstanceFromMemoryPool(
312-
instanceId.getQueryId().getId(), instanceId.getFragmentInstanceId());
312+
instanceId.getQueryId().getId(), instanceId.getFragmentInstanceId(), true);
313313

314314
if (newState.isFailed()) {
315315
scheduler.abortFragmentInstance(instanceId);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java

+10
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ public FragmentInstanceInfo execDataQueryFragmentInstance(
177177
instance.isExplainAnalyze(),
178178
exchangeManager);
179179
} catch (Throwable t) {
180+
clearFIRelatedResources(instanceId);
180181
logger.warn("error when create FragmentInstanceExecution.", t);
181182
stateMachine.failed(t);
182183
return null;
@@ -205,6 +206,14 @@ public FragmentInstanceInfo execDataQueryFragmentInstance(
205206
}
206207
}
207208

209+
private void clearFIRelatedResources(FragmentInstanceId instanceId) {
210+
// close and remove all the handles of the fragment instance
211+
exchangeManager.forceDeregisterFragmentInstance(instanceId.toThrift());
212+
// clear MemoryPool
213+
exchangeManager.deRegisterFragmentInstanceFromMemoryPool(
214+
instanceId.getQueryId().getId(), instanceId.getFragmentInstanceId(), false);
215+
}
216+
208217
private DataNodeQueryContext getOrCreateDataNodeQueryContext(QueryId queryId, int dataNodeFINum) {
209218
return dataNodeQueryContextMap.computeIfAbsent(
210219
queryId, queryId1 -> new DataNodeQueryContext(dataNodeFINum));
@@ -249,6 +258,7 @@ public FragmentInstanceInfo execSchemaQueryFragmentInstance(
249258
false,
250259
exchangeManager);
251260
} catch (Throwable t) {
261+
clearFIRelatedResources(instanceId);
252262
logger.warn("Execute error caused by ", t);
253263
stateMachine.failed(t);
254264
return null;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ public void registerPlanNodeIdToQueryMemoryMap(
181181
* @throws MemoryLeakException throw {@link MemoryLeakException}
182182
*/
183183
public void deRegisterFragmentInstanceFromQueryMemoryMap(
184-
String queryId, String fragmentInstanceId) {
184+
String queryId, String fragmentInstanceId, boolean forceDeregister) {
185185
Map<String, Map<String, Long>> queryRelatedMemory = queryMemoryReservations.get(queryId);
186186
if (queryRelatedMemory != null) {
187187
Map<String, Long> fragmentRelatedMemory = queryRelatedMemory.get(fragmentInstanceId);
@@ -192,6 +192,13 @@ public void deRegisterFragmentInstanceFromQueryMemoryMap(
192192
hasPotentialMemoryLeak =
193193
fragmentRelatedMemory.values().stream().anyMatch(value -> value != 0L);
194194
}
195+
if (!forceDeregister && hasPotentialMemoryLeak) {
196+
// If hasPotentialMemoryLeak is true, it means that LocalSourceChannel/LocalSourceHandles
197+
// have not been closed.
198+
// We should wait for them to be closed. Make sure this method is called again with
199+
// forceDeregister == true, after all LocalSourceChannel/LocalSourceHandles are closed.
200+
return;
201+
}
195202
synchronized (queryMemoryReservations) {
196203
queryRelatedMemory.remove(fragmentInstanceId);
197204
if (queryRelatedMemory.isEmpty()) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,8 @@ private void cleanUpResultHandle() {
365365
.deRegisterFragmentInstanceFromMemoryPool(
366366
fragmentInstanceId.queryId,
367367
FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
368-
fragmentInstanceId));
368+
fragmentInstanceId),
369+
true);
369370
}
370371
}
371372

0 commit comments

Comments
 (0)