Skip to content

Pipe: Adjusted some loggers of metrics to avoid unnecessary warns & Include "lastEvent" into pipe's event count metrics & Removed the "userConflict" judgment to data sync failure caused by METADATA_ERROR #12758

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,19 +111,15 @@ public void register(final IoTDBConfigRegionExtractor extractor) {

public void thawRate(final String pipeID) {
if (!remainingTimeOperatorMap.containsKey(pipeID)) {
LOGGER.warn(
"Failed to thaw pipe remaining time rate, RemainingTimeOperator({}) does not exist",
pipeID);
// The configNode may have no pipe task after "startPipe"
return;
}
remainingTimeOperatorMap.get(pipeID).thawRate(true);
}

public void freezeRate(final String pipeID) {
if (!remainingTimeOperatorMap.containsKey(pipeID)) {
LOGGER.warn(
"Failed to freeze pipe remaining time rate, RemainingTimeOperator({}) does not exist",
pipeID);
// The configNode may have no pipe task after "stopPipe"
return;
}
remainingTimeOperatorMap.get(pipeID).freezeRate(true);
Expand All @@ -147,7 +143,7 @@ public void markRegionCommit(final String pipeID, final boolean isDataRegion) {
}
final PipeConfigNodeRemainingTimeOperator operator = remainingTimeOperatorMap.get(pipeID);
if (Objects.isNull(operator)) {
LOGGER.warn(
LOGGER.info(
"Failed to mark pipe region commit, RemainingTimeOperator({}) does not exist", pipeID);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public void markConfigEvent(final String taskID) {
}
final Rate rate = configRateMap.get(taskID);
if (rate == null) {
LOGGER.warn(
LOGGER.info(
"Failed to mark pipe config region write plan event, PipeConfigNodeSubtask({}) does not exist",
taskID);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
import org.apache.iotdb.db.pipe.extractor.schemaregion.IoTDBSchemaRegionExtractor;
import org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask;
import org.apache.iotdb.db.pipe.task.subtask.processor.PipeProcessorSubtask;
import org.apache.iotdb.metrics.AbstractMetricService;
import org.apache.iotdb.metrics.metricsets.IMetricSet;
import org.apache.iotdb.metrics.utils.MetricLevel;
Expand Down Expand Up @@ -123,6 +124,17 @@ public void register(final IoTDBDataRegionExtractor extractor) {
}
}

public void register(final PipeProcessorSubtask processorSubtask) {
// The metric is global thus the regionId is omitted
final String pipeID = processorSubtask.getPipeName() + "_" + processorSubtask.getCreationTime();
remainingEventAndTimeOperatorMap
.computeIfAbsent(pipeID, k -> new PipeDataNodeRemainingEventAndTimeOperator())
.register(processorSubtask);
if (Objects.nonNull(metricService)) {
createMetrics(pipeID);
}
}

public void register(
final PipeConnectorSubtask connectorSubtask, final String pipeName, final long creationTime) {
// The metric is global thus the regionId is omitted
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
import org.apache.iotdb.db.pipe.extractor.schemaregion.IoTDBSchemaRegionExtractor;
import org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask;
import org.apache.iotdb.db.pipe.task.subtask.processor.PipeProcessorSubtask;
import org.apache.iotdb.pipe.api.event.Event;

import com.codahale.metrics.Clock;
Expand All @@ -40,6 +41,8 @@
class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOperator {
private final Set<IoTDBDataRegionExtractor> dataRegionExtractors =
Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Set<PipeProcessorSubtask> dataRegionProcessors =
Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Set<PipeConnectorSubtask> dataRegionConnectors =
Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Set<IoTDBSchemaRegionExtractor> schemaRegionExtractors =
Expand All @@ -57,6 +60,10 @@ long getRemainingEvents() {
.map(IoTDBDataRegionExtractor::getEventCount)
.reduce(Integer::sum)
.orElse(0)
+ dataRegionProcessors.stream()
.map(processorSubtask -> processorSubtask.getEventCount(false))
.reduce(Integer::sum)
.orElse(0)
+ dataRegionConnectors.stream()
.map(connectorSubtask -> connectorSubtask.getEventCount(pipeName))
.reduce(Integer::sum)
Expand Down Expand Up @@ -84,6 +91,10 @@ long getRemainingEvents() {
.map(IoTDBDataRegionExtractor::getEventCount)
.reduce(Integer::sum)
.orElse(0)
+ dataRegionProcessors.stream()
.map(processorSubtask -> processorSubtask.getEventCount(true))
.reduce(Integer::sum)
.orElse(0)
+ dataRegionConnectors.stream()
.map(connectorSubtask -> connectorSubtask.getEventCount(pipeName))
.reduce(Integer::sum)
Expand Down Expand Up @@ -156,6 +167,11 @@ void register(final IoTDBDataRegionExtractor extractor) {
dataRegionExtractors.add(extractor);
}

void register(final PipeProcessorSubtask processorSubtask) {
setNameAndCreationTime(processorSubtask.getPipeName(), processorSubtask.getCreationTime());
dataRegionProcessors.add(processorSubtask);
}

void register(
final PipeConnectorSubtask connectorSubtask, final String pipeName, final long creationTime) {
setNameAndCreationTime(pipeName, creationTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ public void markTabletEvent(final String taskID) {
}
final Rate rate = tabletRateMap.get(taskID);
if (rate == null) {
LOGGER.warn(
LOGGER.info(
"Failed to mark pipe data region connector tablet event, PipeConnectorSubtask({}) does not exist",
taskID);
return;
Expand All @@ -294,7 +294,7 @@ public void markTsFileEvent(final String taskID) {
}
final Rate rate = tsFileRateMap.get(taskID);
if (rate == null) {
LOGGER.warn(
LOGGER.info(
"Failed to mark pipe data region connector tsfile event, PipeConnectorSubtask({}) does not exist",
taskID);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ public void markTabletEvent(final String taskID) {
}
final Rate rate = tabletRateMap.get(taskID);
if (rate == null) {
LOGGER.warn(
LOGGER.info(
"Failed to mark pipe data region extractor tablet event, IoTDBDataRegionExtractor({}) does not exist",
taskID);
return;
Expand All @@ -330,7 +330,7 @@ public void markTsFileEvent(final String taskID) {
}
final Rate rate = tsFileRateMap.get(taskID);
if (rate == null) {
LOGGER.warn(
LOGGER.info(
"Failed to mark pipe data region extractor tsfile event, IoTDBDataRegionExtractor({}) does not exist",
taskID);
return;
Expand All @@ -344,7 +344,7 @@ public void markPipeHeartbeatEvent(final String taskID) {
}
final Rate rate = pipeHeartbeatRateMap.get(taskID);
if (rate == null) {
LOGGER.warn(
LOGGER.info(
"Failed to mark pipe data region extractor heartbeat event, IoTDBDataRegionExtractor({}) does not exist",
taskID);
return;
Expand All @@ -359,7 +359,7 @@ public void setRecentProcessedTsFileEpochState(
}
final Gauge gauge = recentProcessedTsFileEpochStateMap.get(taskID);
if (gauge == null) {
LOGGER.warn(
LOGGER.info(
"Failed to set recent processed tsfile epoch state, PipeRealtimeDataRegionExtractor({}) does not exist",
taskID);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,20 +55,20 @@ public class PipeProcessorMetrics implements IMetricSet {
//////////////////////////// bindTo & unbindFrom (metric framework) ////////////////////////////

@Override
public void bindTo(AbstractMetricService metricService) {
public void bindTo(final AbstractMetricService metricService) {
this.metricService = metricService;
ImmutableSet<String> taskIDs = ImmutableSet.copyOf(processorMap.keySet());
for (String taskID : taskIDs) {
final ImmutableSet<String> taskIDs = ImmutableSet.copyOf(processorMap.keySet());
for (final String taskID : taskIDs) {
createMetrics(taskID);
}
}

private void createMetrics(String taskID) {
private void createMetrics(final String taskID) {
createRate(taskID);
}

private void createRate(String taskID) {
PipeProcessorSubtask processor = processorMap.get(taskID);
private void createRate(final String taskID) {
final PipeProcessorSubtask processor = processorMap.get(taskID);
// process event rate
tabletRateMap.put(
taskID,
Expand Down Expand Up @@ -106,22 +106,22 @@ private void createRate(String taskID) {
}

@Override
public void unbindFrom(AbstractMetricService metricService) {
ImmutableSet<String> taskIDs = ImmutableSet.copyOf(processorMap.keySet());
for (String taskID : taskIDs) {
public void unbindFrom(final AbstractMetricService metricService) {
final ImmutableSet<String> taskIDs = ImmutableSet.copyOf(processorMap.keySet());
for (final String taskID : taskIDs) {
deregister(taskID);
}
if (!processorMap.isEmpty()) {
LOGGER.warn("Failed to unbind from pipe processor metrics, processor map not empty");
}
}

private void removeMetrics(String taskID) {
private void removeMetrics(final String taskID) {
removeAutoGauge(taskID);
removeRate(taskID);
}

private void removeAutoGauge(String taskID) {
private void removeAutoGauge(final String taskID) {
PipeProcessorSubtask processor = processorMap.get(taskID);
// pending event count
metricService.remove(
Expand Down Expand Up @@ -153,7 +153,7 @@ private void removeAutoGauge(String taskID) {
String.valueOf(processor.getCreationTime()));
}

private void removeRate(String taskID) {
private void removeRate(final String taskID) {
PipeProcessorSubtask processor = processorMap.get(taskID);
// process event rate
metricService.remove(
Expand Down Expand Up @@ -190,15 +190,15 @@ private void removeRate(String taskID) {

//////////////////////////// register & deregister (pipe integration) ////////////////////////////

public void register(@NonNull PipeProcessorSubtask pipeProcessorSubtask) {
String taskID = pipeProcessorSubtask.getTaskID();
public void register(@NonNull final PipeProcessorSubtask pipeProcessorSubtask) {
final String taskID = pipeProcessorSubtask.getTaskID();
processorMap.putIfAbsent(taskID, pipeProcessorSubtask);
if (Objects.nonNull(metricService)) {
createMetrics(taskID);
}
}

public void deregister(String taskID) {
public void deregister(final String taskID) {
if (!processorMap.containsKey(taskID)) {
LOGGER.warn(
"Failed to deregister pipe processor metrics, PipeProcessorSubtask({}) does not exist",
Expand All @@ -211,41 +211,41 @@ public void deregister(String taskID) {
processorMap.remove(taskID);
}

public void markTabletEvent(String taskID) {
public void markTabletEvent(final String taskID) {
if (Objects.isNull(metricService)) {
return;
}
Rate rate = tabletRateMap.get(taskID);
final Rate rate = tabletRateMap.get(taskID);
if (rate == null) {
LOGGER.warn(
LOGGER.info(
"Failed to mark pipe processor tablet event, PipeProcessorSubtask({}) does not exist",
taskID);
return;
}
rate.mark();
}

public void markTsFileEvent(String taskID) {
public void markTsFileEvent(final String taskID) {
if (Objects.isNull(metricService)) {
return;
}
Rate rate = tsFileRateMap.get(taskID);
final Rate rate = tsFileRateMap.get(taskID);
if (rate == null) {
LOGGER.warn(
LOGGER.info(
"Failed to mark pipe processor tsfile event, PipeProcessorSubtask({}) does not exist",
taskID);
return;
}
rate.mark();
}

public void markPipeHeartbeatEvent(String taskID) {
public void markPipeHeartbeatEvent(final String taskID) {
if (Objects.isNull(metricService)) {
return;
}
Rate rate = pipeHeartbeatRateMap.get(taskID);
final Rate rate = pipeHeartbeatRateMap.get(taskID);
if (rate == null) {
LOGGER.warn(
LOGGER.info(
"Failed to mark pipe processor heartbeat event, PipeProcessorSubtask({}) does not exist",
taskID);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public void markSchemaEvent(final String taskID) {
}
final Rate rate = schemaRateMap.get(taskID);
if (rate == null) {
LOGGER.warn(
LOGGER.info(
"Failed to mark pipe schema region write plan event, PipeConnectorSubtask({}) does not exist",
taskID);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,10 @@ private TSStatus visitInsertBase(
} else if (context.getCode() == TSStatusCode.OUT_OF_TTL.getStatusCode()) {
return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
} else if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()) {
if (context.getMessage().contains(DataTypeMismatchException.REGISTERED_TYPE_STRING)
&& config.isEnablePartialInsert()) {
return new TSStatus(
TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
}
return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
} else if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()
&& (context.getMessage().contains(DataTypeMismatchException.REGISTERED_TYPE_STRING)
&& config.isEnablePartialInsert())) {
return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
}
return visitStatement(insertBaseStatement, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

public class PipeConnectorSubtask extends PipeAbstractConnectorSubtask {
Expand Down Expand Up @@ -233,15 +234,18 @@ public int getConnectorIndex() {
}

public int getTsFileInsertionEventCount() {
return inputPendingQueue.getTsFileInsertionEventCount();
return inputPendingQueue.getTsFileInsertionEventCount()
+ (lastEvent instanceof TsFileInsertionEvent ? 1 : 0);
}

public int getTabletInsertionEventCount() {
return inputPendingQueue.getTabletInsertionEventCount();
return inputPendingQueue.getTabletInsertionEventCount()
+ (lastEvent instanceof TabletInsertionEvent ? 1 : 0);
}

public int getPipeHeartbeatEventCount() {
return inputPendingQueue.getPipeHeartbeatEventCount();
return inputPendingQueue.getPipeHeartbeatEventCount()
+ (lastEvent instanceof PipeHeartbeatEvent ? 1 : 0);
}

public int getAsyncConnectorRetryEventQueueSize() {
Expand All @@ -262,7 +266,7 @@ public int getEventCount(final String pipeName) {
count.incrementAndGet();
}
});
} catch (Exception e) {
} catch (final Exception e) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"Exception occurred when counting event of pipe {}, root cause: {}",
Expand All @@ -271,10 +275,14 @@ public int getEventCount(final String pipeName) {
e);
}
}
// Avoid potential NPE in "getPipeName"
final EnrichedEvent event =
lastEvent instanceof EnrichedEvent ? (EnrichedEvent) lastEvent : null;
return count.get()
+ (outputPipeConnector instanceof IoTDBDataRegionAsyncConnector
? ((IoTDBDataRegionAsyncConnector) outputPipeConnector).getRetryEventCount(pipeName)
: 0);
: 0)
+ (Objects.nonNull(event) && pipeName.equals(event.getPipeName()) ? 1 : 0);
}

//////////////////////////// Error report ////////////////////////////
Expand Down
Loading
Loading