Pipe: Smoothed the rate in pipe's remaining time calculations (#12699)

This commit is contained in:
Caideyipi 2024-06-17 11:43:49 +08:00 committed by GitHub
parent d00c7665ba
commit 89af73d208
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
39 changed files with 672 additions and 339 deletions

View File

@ -44,26 +44,28 @@ public class CountPointProcessor implements PipeProcessor {
private PartialPath aggregateSeries;
@Override
public void validate(PipeParameterValidator validator) {
public void validate(final PipeParameterValidator validator) {
validator.validateRequiredAttribute(AGGREGATE_SERIES_KEY);
}
@Override
public void customize(PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration)
public void customize(
final PipeParameters parameters, final PipeProcessorRuntimeConfiguration configuration)
throws Exception {
this.aggregateSeries = new PartialPath(parameters.getString(AGGREGATE_SERIES_KEY));
}
@Override
public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector) {
public void process(
final TabletInsertionEvent tabletInsertionEvent, final EventCollector eventCollector) {
tabletInsertionEvent.processTablet(
(tablet, rowCollector) -> writePointCount.addAndGet(tablet.rowSize));
}
@Override
public void process(Event event, EventCollector eventCollector) throws Exception {
public void process(final Event event, final EventCollector eventCollector) throws Exception {
if (event instanceof PipeHeartbeatEvent) {
Tablet tablet =
final Tablet tablet =
new Tablet(
aggregateSeries.getDevice(),
Collections.singletonList(
@ -73,7 +75,7 @@ public class CountPointProcessor implements PipeProcessor {
tablet.addTimestamp(0, System.currentTimeMillis());
tablet.addValue(aggregateSeries.getMeasurement(), 0, writePointCount.get());
eventCollector.collect(
new PipeRawTabletInsertionEvent(tablet, false, null, null, null, false));
new PipeRawTabletInsertionEvent(tablet, false, null, 0, null, null, false));
}
}

View File

@ -64,6 +64,16 @@ public class PipeConfigNodeTaskAgent extends PipeTaskAgent {
return PipeConfigNodeAgent.runtime().isShutdown();
}
@Override
protected void thawRate(final String pipeName, final long creationTime) {
PipeConfigNodeRemainingTimeMetrics.getInstance().thawRate(pipeName + "_" + creationTime);
}
@Override
protected void freezeRate(final String pipeName, final long creationTime) {
PipeConfigNodeRemainingTimeMetrics.getInstance().freezeRate(pipeName + "_" + creationTime);
}
@Override
protected Map<Integer, PipeTask> buildPipeTasks(final PipeMeta pipeMetaFromConfigNode)
throws IllegalPathException {

View File

@ -90,7 +90,7 @@ public class PipeConfigRegionSnapshotEvent extends PipeSnapshotEvent {
public PipeConfigRegionSnapshotEvent(
final String snapshotPath, final String templateFilePath, final CNSnapshotFileType type) {
this(snapshotPath, templateFilePath, type, null, null, null);
this(snapshotPath, templateFilePath, type, null, 0, null, null);
}
public PipeConfigRegionSnapshotEvent(
@ -98,9 +98,15 @@ public class PipeConfigRegionSnapshotEvent extends PipeSnapshotEvent {
final String templateFilePath,
final CNSnapshotFileType type,
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern) {
super(pipeName, pipeTaskMeta, pattern, PipeConfigNodeSnapshotResourceManager.getInstance());
super(
pipeName,
creationTime,
pipeTaskMeta,
pattern,
PipeConfigNodeSnapshotResourceManager.getInstance());
this.snapshotPath = snapshotPath;
this.templateFilePath = Objects.nonNull(templateFilePath) ? templateFilePath : "";
this.fileType = type;
@ -157,12 +163,13 @@ public class PipeConfigRegionSnapshotEvent extends PipeSnapshotEvent {
@Override
public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern,
final long startTime,
final long endTime) {
return new PipeConfigRegionSnapshotEvent(
snapshotPath, templateFilePath, fileType, pipeName, pipeTaskMeta, pattern);
snapshotPath, templateFilePath, fileType, pipeName, creationTime, pipeTaskMeta, pattern);
}
@Override

View File

@ -41,16 +41,17 @@ public class PipeConfigRegionWritePlanEvent extends PipeWritePlanEvent {
public PipeConfigRegionWritePlanEvent(
final ConfigPhysicalPlan configPhysicalPlan, final boolean isGeneratedByPipe) {
this(configPhysicalPlan, null, null, null, isGeneratedByPipe);
this(configPhysicalPlan, null, 0, null, null, isGeneratedByPipe);
}
public PipeConfigRegionWritePlanEvent(
final ConfigPhysicalPlan configPhysicalPlan,
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern,
final boolean isGeneratedByPipe) {
super(pipeName, pipeTaskMeta, pattern, isGeneratedByPipe);
super(pipeName, creationTime, pipeTaskMeta, pattern, isGeneratedByPipe);
this.configPhysicalPlan = configPhysicalPlan;
}
@ -61,12 +62,13 @@ public class PipeConfigRegionWritePlanEvent extends PipeWritePlanEvent {
@Override
public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern,
final long startTime,
final long endTime) {
return new PipeConfigRegionWritePlanEvent(
configPhysicalPlan, pipeName, pipeTaskMeta, pattern, false);
configPhysicalPlan, pipeName, creationTime, pipeTaskMeta, pattern, false);
}
@Override

View File

@ -19,7 +19,6 @@
package org.apache.iotdb.confignode.manager.pipe.metric;
import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
@ -110,6 +109,26 @@ public class PipeConfigNodeRemainingTimeMetrics implements IMetricSet {
}
}
public void thawRate(final String pipeID) {
if (!remainingTimeOperatorMap.containsKey(pipeID)) {
LOGGER.warn(
"Failed to thaw pipe remaining time rate, RemainingTimeOperator({}) does not exist",
pipeID);
return;
}
remainingTimeOperatorMap.get(pipeID).thawRate(true);
}
public void freezeRate(final String pipeID) {
if (!remainingTimeOperatorMap.containsKey(pipeID)) {
LOGGER.warn(
"Failed to freeze pipe remaining time rate, RemainingTimeOperator({}) does not exist",
pipeID);
return;
}
remainingTimeOperatorMap.get(pipeID).freezeRate(true);
}
public void deregister(final String pipeID) {
if (!remainingTimeOperatorMap.containsKey(pipeID)) {
LOGGER.warn(
@ -122,12 +141,7 @@ public class PipeConfigNodeRemainingTimeMetrics implements IMetricSet {
}
}
public void markRegionCommit(final PipeTaskRuntimeEnvironment pipeTaskRuntimeEnvironment) {
// Filter commit attempt from assigner
final String pipeName = pipeTaskRuntimeEnvironment.getPipeName();
final long creationTime = pipeTaskRuntimeEnvironment.getCreationTime();
final String pipeID = pipeName + "_" + creationTime;
public void markRegionCommit(final String pipeID, final boolean isDataRegion) {
if (Objects.isNull(metricService)) {
return;
}
@ -137,12 +151,6 @@ public class PipeConfigNodeRemainingTimeMetrics implements IMetricSet {
"Failed to mark pipe region commit, RemainingTimeOperator({}) does not exist", pipeID);
return;
}
// Prevent not set pipeName / creation times & potential differences between pipeNames and
// creation times
if (!Objects.equals(pipeName, operator.getPipeName())
|| !Objects.equals(creationTime, operator.getCreationTime())) {
return;
}
operator.markConfigRegionCommit();
}

View File

@ -19,77 +19,72 @@
package org.apache.iotdb.confignode.manager.pipe.metric;
import org.apache.iotdb.commons.enums.PipeRemainingTimeRateAverageTime;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.metric.PipeRemainingOperator;
import org.apache.iotdb.confignode.manager.pipe.execution.PipeConfigNodeSubtask;
import org.apache.iotdb.confignode.manager.pipe.extractor.IoTDBConfigRegionExtractor;
import com.codahale.metrics.Clock;
import com.codahale.metrics.ExponentialMovingAverages;
import com.codahale.metrics.Meter;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
class PipeConfigNodeRemainingTimeOperator {
class PipeConfigNodeRemainingTimeOperator extends PipeRemainingOperator {
private static final long CONFIG_NODE_REMAINING_MAX_SECONDS = 365 * 24 * 60 * 60L; // 1 year
private final Set<IoTDBConfigRegionExtractor> configRegionExtractors =
Collections.newSetFromMap(new ConcurrentHashMap<>());
private final AtomicReference<Meter> configRegionCommitMeter = new AtomicReference<>(null);
private String pipeName;
private long creationTime = 0;
private final ConcurrentMap<IoTDBConfigRegionExtractor, IoTDBConfigRegionExtractor>
configRegionExtractors = new ConcurrentHashMap<>();
private final Meter configRegionCommitMeter =
new Meter(new ExponentialMovingAverages(), Clock.defaultClock());
private double lastConfigRegionCommitSmoothingValue = Long.MIN_VALUE;
//////////////////////////// Tags ////////////////////////////
String getPipeName() {
return pipeName;
}
long getCreationTime() {
return creationTime;
}
private double lastConfigRegionCommitSmoothingValue = Long.MAX_VALUE;
//////////////////////////// Remaining time calculation ////////////////////////////
/**
* This will calculate the estimated remaining time of the given pipe's config region subTask.
* This will calculate the estimated remaining time of the given pipe's {@link
* PipeConfigNodeSubtask}.
*
* @return The estimated remaining time
*/
double getRemainingTime() {
final double pipeRemainingTimeCommitRateSmoothingFactor =
PipeConfig.getInstance().getPipeRemainingTimeCommitRateSmoothingFactor();
final PipeRemainingTimeRateAverageTime pipeRemainingTimeCommitRateAverageTime =
PipeConfig.getInstance().getPipeRemainingTimeCommitRateAverageTime();
// Do not calculate heartbeat event
final long totalConfigRegionWriteEventCount =
configRegionExtractors.keySet().stream()
configRegionExtractors.stream()
.map(IoTDBConfigRegionExtractor::getUnTransferredEventCount)
.reduce(Long::sum)
.orElse(0L);
lastConfigRegionCommitSmoothingValue =
lastConfigRegionCommitSmoothingValue == Long.MIN_VALUE
? configRegionCommitMeter.getOneMinuteRate()
: pipeRemainingTimeCommitRateSmoothingFactor
* configRegionCommitMeter.getOneMinuteRate()
+ (1 - pipeRemainingTimeCommitRateSmoothingFactor)
* lastConfigRegionCommitSmoothingValue;
configRegionCommitMeter.updateAndGet(
meter -> {
if (Objects.nonNull(meter)) {
lastConfigRegionCommitSmoothingValue =
pipeRemainingTimeCommitRateAverageTime.getMeterRate(meter);
}
return meter;
});
final double configRegionRemainingTime;
if (totalConfigRegionWriteEventCount <= 0) {
notifyEmpty();
configRegionRemainingTime = 0;
} else {
notifyNonEmpty();
configRegionRemainingTime =
lastConfigRegionCommitSmoothingValue <= 0
? Double.MAX_VALUE
: totalConfigRegionWriteEventCount / lastConfigRegionCommitSmoothingValue;
}
return configRegionRemainingTime >= CONFIG_NODE_REMAINING_MAX_SECONDS
? CONFIG_NODE_REMAINING_MAX_SECONDS
return configRegionRemainingTime >= REMAINING_MAX_SECONDS
? REMAINING_MAX_SECONDS
: configRegionRemainingTime;
}
@ -97,17 +92,37 @@ class PipeConfigNodeRemainingTimeOperator {
void register(final IoTDBConfigRegionExtractor extractor) {
setNameAndCreationTime(extractor.getPipeName(), extractor.getCreationTime());
configRegionExtractors.put(extractor, extractor);
}
private void setNameAndCreationTime(final String pipeName, final long creationTime) {
this.pipeName = pipeName;
this.creationTime = creationTime;
configRegionExtractors.add(extractor);
}
//////////////////////////// Rate ////////////////////////////
void markConfigRegionCommit() {
configRegionCommitMeter.mark();
configRegionCommitMeter.updateAndGet(
meter -> {
if (Objects.nonNull(meter)) {
meter.mark();
}
return meter;
});
}
//////////////////////////// Switch ////////////////////////////
@Override
public synchronized void thawRate(final boolean isStartPipe) {
super.thawRate(isStartPipe);
// The stopped pipe's rate should only be thawed by "startPipe" command
if (isStopped) {
return;
}
configRegionCommitMeter.compareAndSet(
null, new Meter(new ExponentialMovingAverages(), Clock.defaultClock()));
}
@Override
public synchronized void freezeRate(final boolean isStopPipe) {
super.freezeRate(isStopPipe);
configRegionCommitMeter.set(null);
}
}

View File

@ -32,12 +32,12 @@ public class PipeConfigNodeTaskStage extends PipeTaskStage {
private final PipeConfigNodeSubtask subtask;
public PipeConfigNodeTaskStage(
String pipeName,
long creationTime,
Map<String, String> extractorAttributes,
Map<String, String> processorAttributes,
Map<String, String> connectorAttributes,
PipeTaskMeta pipeTaskMeta) {
final String pipeName,
final long creationTime,
final Map<String, String> extractorAttributes,
final Map<String, String> processorAttributes,
final Map<String, String> connectorAttributes,
final PipeTaskMeta pipeTaskMeta) {
try {
subtask =
@ -48,7 +48,7 @@ public class PipeConfigNodeTaskStage extends PipeTaskStage {
processorAttributes,
connectorAttributes,
pipeTaskMeta);
} catch (Exception e) {
} catch (final Exception e) {
throw new PipeException(
String.format(
"Failed to create subtask for pipe %s, creation time %d", pipeName, creationTime),
@ -63,8 +63,6 @@ public class PipeConfigNodeTaskStage extends PipeTaskStage {
@Override
public void startSubtask() throws PipeException {
// IoTDBConfigRegionExtractor is started by executor because starting
// here may cause deadlock when triggering snapshot
PipeConfigNodeSubtaskExecutor.getInstance().start(subtask.getTaskID());
}

View File

@ -237,6 +237,17 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
});
}
@Override
protected void thawRate(final String pipeName, final long creationTime) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance().thawRate(pipeName + "_" + creationTime);
}
@Override
protected void freezeRate(final String pipeName, final long creationTime) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
.freezeRate(pipeName + "_" + creationTime);
}
@Override
protected boolean dropPipe(final String pipeName, final long creationTime) {
if (!super.dropPipe(pipeName, creationTime)) {

View File

@ -31,7 +31,7 @@ public class UserDefinedEnrichedEvent extends EnrichedEvent {
private final UserDefinedEvent userDefinedEvent;
private final EnrichedEvent enrichedEvent;
public static Event maybeOf(Event event) {
public static Event maybeOf(final Event event) {
return event instanceof UserDefinedEvent
&& ((UserDefinedEvent) event).getSourceEvent() instanceof EnrichedEvent
? new UserDefinedEnrichedEvent(
@ -39,9 +39,11 @@ public class UserDefinedEnrichedEvent extends EnrichedEvent {
: event;
}
private UserDefinedEnrichedEvent(UserDefinedEvent userDefinedEvent, EnrichedEvent enrichedEvent) {
private UserDefinedEnrichedEvent(
final UserDefinedEvent userDefinedEvent, final EnrichedEvent enrichedEvent) {
super(
enrichedEvent.getPipeName(),
enrichedEvent.getCreationTime(),
enrichedEvent.getPipeTaskMeta(),
enrichedEvent.getPipePattern(),
enrichedEvent.getStartTime(),
@ -55,12 +57,12 @@ public class UserDefinedEnrichedEvent extends EnrichedEvent {
}
@Override
public boolean internallyIncreaseResourceReferenceCount(String holderMessage) {
public boolean internallyIncreaseResourceReferenceCount(final String holderMessage) {
return enrichedEvent.internallyIncreaseResourceReferenceCount(holderMessage);
}
@Override
public boolean internallyDecreaseResourceReferenceCount(String holderMessage) {
public boolean internallyDecreaseResourceReferenceCount(final String holderMessage) {
return enrichedEvent.internallyDecreaseResourceReferenceCount(holderMessage);
}
@ -71,13 +73,14 @@ public class UserDefinedEnrichedEvent extends EnrichedEvent {
@Override
public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
String pipeName,
PipeTaskMeta pipeTaskMeta,
PipePattern pattern,
long startTime,
long endTime) {
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern,
final long startTime,
final long endTime) {
return enrichedEvent.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
pipeName, pipeTaskMeta, pattern, startTime, endTime);
pipeName, creationTime, pipeTaskMeta, pattern, startTime, endTime);
}
@Override

View File

@ -38,7 +38,6 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
private static final Logger LOGGER = LoggerFactory.getLogger(PipeHeartbeatEvent.class);
private final String dataRegionId;
private String pipeName;
private long timePublished;
private long timeAssigned;
@ -60,18 +59,19 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
private final boolean shouldPrintMessage;
public PipeHeartbeatEvent(final String dataRegionId, final boolean shouldPrintMessage) {
super(null, null, null, Long.MIN_VALUE, Long.MAX_VALUE);
super(null, 0, null, null, Long.MIN_VALUE, Long.MAX_VALUE);
this.dataRegionId = dataRegionId;
this.shouldPrintMessage = shouldPrintMessage;
}
public PipeHeartbeatEvent(
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final String dataRegionId,
final long timePublished,
final boolean shouldPrintMessage) {
super(pipeName, pipeTaskMeta, null, Long.MIN_VALUE, Long.MAX_VALUE);
super(pipeName, creationTime, pipeTaskMeta, null, Long.MIN_VALUE, Long.MAX_VALUE);
this.dataRegionId = dataRegionId;
this.timePublished = timePublished;
this.shouldPrintMessage = shouldPrintMessage;
@ -100,6 +100,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
@Override
public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern,
final long startTime,
@ -107,7 +108,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
// Should record PipeTaskMeta, for sometimes HeartbeatEvents should report exceptions.
// Here we ignore parameters `pattern`, `startTime`, and `endTime`.
return new PipeHeartbeatEvent(
pipeName, pipeTaskMeta, dataRegionId, timePublished, shouldPrintMessage);
pipeName, creationTime, pipeTaskMeta, dataRegionId, timePublished, shouldPrintMessage);
}
@Override
@ -128,12 +129,6 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
/////////////////////////////// Delay Reporting ///////////////////////////////
public void bindPipeName(final String pipeName) {
if (shouldPrintMessage) {
this.pipeName = pipeName;
}
}
public void onPublished() {
if (shouldPrintMessage) {
timePublished = System.currentTimeMillis();

View File

@ -105,6 +105,7 @@ public class PipeRowCollector implements RowCollector {
tablet,
isAligned,
sourceEvent == null ? null : sourceEvent.getPipeName(),
sourceEvent == null ? 0 : sourceEvent.getCreationTime(),
pipeTaskMeta,
sourceEvent,
false));

View File

@ -78,7 +78,7 @@ public class PipeSchemaRegionSnapshotEvent extends PipeSnapshotEvent {
public PipeSchemaRegionSnapshotEvent(
final String mTreeSnapshotPath, final String tagLogSnapshotPath, final String databaseName) {
this(mTreeSnapshotPath, tagLogSnapshotPath, databaseName, null, null, null);
this(mTreeSnapshotPath, tagLogSnapshotPath, databaseName, null, 0, null, null);
}
public PipeSchemaRegionSnapshotEvent(
@ -86,9 +86,10 @@ public class PipeSchemaRegionSnapshotEvent extends PipeSnapshotEvent {
final String tagLogSnapshotPath,
final String databaseName,
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern) {
super(pipeName, pipeTaskMeta, pattern, PipeResourceManager.snapshot());
super(pipeName, creationTime, pipeTaskMeta, pattern, PipeResourceManager.snapshot());
this.mTreeSnapshotPath = mTreeSnapshotPath;
this.tagLogSnapshotPath = Objects.nonNull(tagLogSnapshotPath) ? tagLogSnapshotPath : "";
this.databaseName = databaseName;
@ -145,12 +146,19 @@ public class PipeSchemaRegionSnapshotEvent extends PipeSnapshotEvent {
@Override
public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern,
final long startTime,
final long endTime) {
return new PipeSchemaRegionSnapshotEvent(
mTreeSnapshotPath, tagLogSnapshotPath, databaseName, pipeName, pipeTaskMeta, pattern);
mTreeSnapshotPath,
tagLogSnapshotPath,
databaseName,
pipeName,
creationTime,
pipeTaskMeta,
pattern);
}
@Override

View File

@ -40,16 +40,17 @@ public class PipeSchemaRegionWritePlanEvent extends PipeWritePlanEvent {
}
public PipeSchemaRegionWritePlanEvent(final PlanNode planNode, final boolean isGeneratedByPipe) {
this(planNode, null, null, null, isGeneratedByPipe);
this(planNode, null, 0, null, null, isGeneratedByPipe);
}
public PipeSchemaRegionWritePlanEvent(
final PlanNode planNode,
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern,
final boolean isGeneratedByPipe) {
super(pipeName, pipeTaskMeta, pattern, isGeneratedByPipe);
super(pipeName, creationTime, pipeTaskMeta, pattern, isGeneratedByPipe);
this.planNode = planNode;
}
@ -60,12 +61,13 @@ public class PipeSchemaRegionWritePlanEvent extends PipeWritePlanEvent {
@Override
public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern,
final long startTime,
final long endTime) {
return new PipeSchemaRegionWritePlanEvent(
planNode, pipeName, pipeTaskMeta, pattern, isGeneratedByPipe);
planNode, pipeName, creationTime, pipeTaskMeta, pattern, isGeneratedByPipe);
}
@Override

View File

@ -68,11 +68,11 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent
private ProgressIndex progressIndex;
public PipeInsertNodeTabletInsertionEvent(
WALEntryHandler walEntryHandler,
PartialPath devicePath,
ProgressIndex progressIndex,
boolean isAligned,
boolean isGeneratedByPipe) {
final WALEntryHandler walEntryHandler,
final PartialPath devicePath,
final ProgressIndex progressIndex,
final boolean isAligned,
final boolean isGeneratedByPipe) {
this(
walEntryHandler,
devicePath,
@ -80,6 +80,7 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent
isAligned,
isGeneratedByPipe,
null,
0,
null,
null,
Long.MIN_VALUE,
@ -87,17 +88,18 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent
}
private PipeInsertNodeTabletInsertionEvent(
WALEntryHandler walEntryHandler,
PartialPath devicePath,
ProgressIndex progressIndex,
boolean isAligned,
boolean isGeneratedByPipe,
String pipeName,
PipeTaskMeta pipeTaskMeta,
PipePattern pattern,
long startTime,
long endTime) {
super(pipeName, pipeTaskMeta, pattern, startTime, endTime);
final WALEntryHandler walEntryHandler,
final PartialPath devicePath,
final ProgressIndex progressIndex,
final boolean isAligned,
final boolean isGeneratedByPipe,
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern,
final long startTime,
final long endTime) {
super(pipeName, creationTime, pipeTaskMeta, pattern, startTime, endTime);
this.walEntryHandler = walEntryHandler;
// Record device path here so there's no need to get it from InsertNode cache later.
this.devicePath = devicePath;
@ -129,11 +131,11 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent
/////////////////////////// EnrichedEvent ///////////////////////////
@Override
public boolean internallyIncreaseResourceReferenceCount(String holderMessage) {
public boolean internallyIncreaseResourceReferenceCount(final String holderMessage) {
try {
PipeResourceManager.wal().pin(walEntryHandler);
return true;
} catch (Exception e) {
} catch (final Exception e) {
LOGGER.warn(
String.format(
"Increase reference count for memtable %d error. Holder Message: %s",
@ -144,7 +146,7 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent
}
@Override
public boolean internallyDecreaseResourceReferenceCount(String holderMessage) {
public boolean internallyDecreaseResourceReferenceCount(final String holderMessage) {
try {
PipeResourceManager.wal().unpin(walEntryHandler);
// Release the containers' memory.
@ -153,7 +155,7 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent
dataContainers = null;
}
return true;
} catch (Exception e) {
} catch (final Exception e) {
LOGGER.warn(
String.format(
"Decrease reference count for memtable %d error. Holder Message: %s",
@ -164,7 +166,7 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent
}
@Override
public void bindProgressIndex(ProgressIndex progressIndex) {
public void bindProgressIndex(final ProgressIndex progressIndex) {
this.progressIndex = progressIndex;
}
@ -175,11 +177,12 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent
@Override
public PipeInsertNodeTabletInsertionEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
String pipeName,
PipeTaskMeta pipeTaskMeta,
PipePattern pattern,
long startTime,
long endTime) {
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern,
final long startTime,
final long endTime) {
return new PipeInsertNodeTabletInsertionEvent(
walEntryHandler,
devicePath,
@ -187,6 +190,7 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent
isAligned,
isGeneratedByPipe,
pipeName,
creationTime,
pipeTaskMeta,
pattern,
startTime,
@ -224,7 +228,7 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent
throw new UnSupportedDataTypeException(
String.format("InsertNode type %s is not supported.", insertNode.getClass().getName()));
}
} catch (Exception e) {
} catch (final Exception e) {
LOGGER.warn(
"Exception occurred when determining the event time of PipeInsertNodeTabletInsertionEvent({}) overlaps with the time range: [{}, {}]. Returning true to ensure data integrity.",
this,
@ -238,7 +242,8 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent
/////////////////////////// TabletInsertionEvent ///////////////////////////
@Override
public Iterable<TabletInsertionEvent> processRowByRow(BiConsumer<Row, RowCollector> consumer) {
public Iterable<TabletInsertionEvent> processRowByRow(
final BiConsumer<Row, RowCollector> consumer) {
return initDataContainers().stream()
.map(tabletInsertionDataContainer -> tabletInsertionDataContainer.processRowByRow(consumer))
.flatMap(Collection::stream)
@ -246,7 +251,8 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent
}
@Override
public Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet, RowCollector> consumer) {
public Iterable<TabletInsertionEvent> processTablet(
final BiConsumer<Tablet, RowCollector> consumer) {
return initDataContainers().stream()
.map(tabletInsertionDataContainer -> tabletInsertionDataContainer.processTablet(consumer))
.flatMap(Collection::stream)
@ -334,7 +340,7 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent
.map(
tablet ->
new PipeRawTabletInsertionEvent(
tablet, isAligned, pipeName, pipeTaskMeta, this, false))
tablet, isAligned, pipeName, creationTime, pipeTaskMeta, this, false))
.filter(event -> !event.hasNoNeedParsingAndIsEmpty())
.collect(Collectors.toList());

View File

@ -52,16 +52,17 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent implements Tablet
private ProgressIndex overridingProgressIndex;
private PipeRawTabletInsertionEvent(
Tablet tablet,
boolean isAligned,
EnrichedEvent sourceEvent,
boolean needToReport,
String pipeName,
PipeTaskMeta pipeTaskMeta,
PipePattern pattern,
long startTime,
long endTime) {
super(pipeName, pipeTaskMeta, pattern, startTime, endTime);
final Tablet tablet,
final boolean isAligned,
final EnrichedEvent sourceEvent,
final boolean needToReport,
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern,
final long startTime,
final long endTime) {
super(pipeName, creationTime, pipeTaskMeta, pattern, startTime, endTime);
this.tablet = Objects.requireNonNull(tablet);
this.isAligned = isAligned;
this.sourceEvent = sourceEvent;
@ -69,18 +70,20 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent implements Tablet
}
public PipeRawTabletInsertionEvent(
Tablet tablet,
boolean isAligned,
String pipeName,
PipeTaskMeta pipeTaskMeta,
EnrichedEvent sourceEvent,
boolean needToReport) {
final Tablet tablet,
final boolean isAligned,
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final EnrichedEvent sourceEvent,
final boolean needToReport) {
this(
tablet,
isAligned,
sourceEvent,
needToReport,
pipeName,
creationTime,
pipeTaskMeta,
null,
Long.MIN_VALUE,
@ -88,28 +91,30 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent implements Tablet
}
@TestOnly
public PipeRawTabletInsertionEvent(Tablet tablet, boolean isAligned) {
this(tablet, isAligned, null, false, null, null, null, Long.MIN_VALUE, Long.MAX_VALUE);
public PipeRawTabletInsertionEvent(final Tablet tablet, final boolean isAligned) {
this(tablet, isAligned, null, false, null, 0, null, null, Long.MIN_VALUE, Long.MAX_VALUE);
}
@TestOnly
public PipeRawTabletInsertionEvent(Tablet tablet, boolean isAligned, PipePattern pattern) {
this(tablet, isAligned, null, false, null, null, pattern, Long.MIN_VALUE, Long.MAX_VALUE);
public PipeRawTabletInsertionEvent(
final Tablet tablet, final boolean isAligned, final PipePattern pattern) {
this(tablet, isAligned, null, false, null, 0, null, pattern, Long.MIN_VALUE, Long.MAX_VALUE);
}
@TestOnly
public PipeRawTabletInsertionEvent(Tablet tablet, long startTime, long endTime) {
this(tablet, false, null, false, null, null, null, startTime, endTime);
public PipeRawTabletInsertionEvent(
final Tablet tablet, final long startTime, final long endTime) {
this(tablet, false, null, false, null, 0, null, null, startTime, endTime);
}
@Override
public boolean internallyIncreaseResourceReferenceCount(String holderMessage) {
public boolean internallyIncreaseResourceReferenceCount(final String holderMessage) {
allocatedMemoryBlock = PipeResourceManager.memory().forceAllocateWithRetry(tablet);
return true;
}
@Override
public boolean internallyDecreaseResourceReferenceCount(String holderMessage) {
public boolean internallyDecreaseResourceReferenceCount(final String holderMessage) {
allocatedMemoryBlock.close();
// Record the deviceId before the memory is released,
@ -130,7 +135,7 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent implements Tablet
}
@Override
public void bindProgressIndex(ProgressIndex overridingProgressIndex) {
public void bindProgressIndex(final ProgressIndex overridingProgressIndex) {
// Normally not all events need to report progress, but if the overridingProgressIndex
// is given, indicating that the progress needs to be reported.
if (Objects.nonNull(overridingProgressIndex)) {
@ -152,17 +157,19 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent implements Tablet
@Override
public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
String pipeName,
PipeTaskMeta pipeTaskMeta,
PipePattern pattern,
long startTime,
long endTime) {
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern,
final long startTime,
final long endTime) {
return new PipeRawTabletInsertionEvent(
tablet,
isAligned,
sourceEvent,
needToReport,
pipeName,
creationTime,
pipeTaskMeta,
pattern,
startTime,
@ -196,7 +203,8 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent implements Tablet
/////////////////////////// TabletInsertionEvent ///////////////////////////
@Override
public Iterable<TabletInsertionEvent> processRowByRow(BiConsumer<Row, RowCollector> consumer) {
public Iterable<TabletInsertionEvent> processRowByRow(
final BiConsumer<Row, RowCollector> consumer) {
if (dataContainer == null) {
dataContainer =
new TabletInsertionDataContainer(pipeTaskMeta, this, tablet, isAligned, pipePattern);
@ -205,7 +213,8 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent implements Tablet
}
@Override
public Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet, RowCollector> consumer) {
public Iterable<TabletInsertionEvent> processTablet(
final BiConsumer<Tablet, RowCollector> consumer) {
if (dataContainer == null) {
dataContainer =
new TabletInsertionDataContainer(pipeTaskMeta, this, tablet, isAligned, pipePattern);
@ -241,7 +250,7 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent implements Tablet
public PipeRawTabletInsertionEvent parseEventWithPatternOrTime() {
return new PipeRawTabletInsertionEvent(
convertToTablet(), isAligned, pipeName, pipeTaskMeta, this, needToReport);
convertToTablet(), isAligned, pipeName, creationTime, pipeTaskMeta, this, needToReport);
}
public boolean hasNoNeedParsingAndIsEmpty() {

View File

@ -38,8 +38,11 @@ public class PipeTerminateEvent extends EnrichedEvent {
private final int dataRegionId;
public PipeTerminateEvent(
final String pipeName, final PipeTaskMeta pipeTaskMeta, final int dataRegionId) {
super(pipeName, pipeTaskMeta, null, Long.MIN_VALUE, Long.MAX_VALUE);
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final int dataRegionId) {
super(pipeName, creationTime, pipeTaskMeta, null, Long.MIN_VALUE, Long.MAX_VALUE);
this.dataRegionId = dataRegionId;
}
@ -61,13 +64,14 @@ public class PipeTerminateEvent extends EnrichedEvent {
@Override
public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern,
final long startTime,
final long endTime) {
// Should record PipeTaskMeta, for the terminateEvent shall report progress to
// notify the pipeTask it's completed.
return new PipeTerminateEvent(pipeName, pipeTaskMeta, dataRegionId);
return new PipeTerminateEvent(pipeName, creationTime, pipeTaskMeta, dataRegionId);
}
@Override

View File

@ -59,7 +59,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
private TsFileInsertionDataContainer dataContainer;
public PipeTsFileInsertionEvent(
TsFileResource resource, boolean isLoaded, boolean isGeneratedByPipe) {
final TsFileResource resource, final boolean isLoaded, final boolean isGeneratedByPipe) {
// The modFile must be copied before the event is assigned to the listening pipes
this(
resource,
@ -67,6 +67,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
isLoaded,
isGeneratedByPipe,
null,
0,
null,
null,
Long.MIN_VALUE,
@ -74,16 +75,17 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
}
public PipeTsFileInsertionEvent(
TsFileResource resource,
boolean isWithMod,
boolean isLoaded,
boolean isGeneratedByPipe,
String pipeName,
PipeTaskMeta pipeTaskMeta,
PipePattern pattern,
long startTime,
long endTime) {
super(pipeName, pipeTaskMeta, pattern, startTime, endTime);
final TsFileResource resource,
final boolean isWithMod,
final boolean isLoaded,
final boolean isGeneratedByPipe,
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern,
final long startTime,
final long endTime) {
super(pipeName, creationTime, pipeTaskMeta, pattern, startTime, endTime);
this.resource = resource;
tsFile = resource.getTsFile();
@ -186,14 +188,14 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
/////////////////////////// EnrichedEvent ///////////////////////////
@Override
public boolean internallyIncreaseResourceReferenceCount(String holderMessage) {
public boolean internallyIncreaseResourceReferenceCount(final String holderMessage) {
try {
tsFile = PipeResourceManager.tsfile().increaseFileReference(tsFile, true, resource);
if (isWithMod) {
modFile = PipeResourceManager.tsfile().increaseFileReference(modFile, false, null);
}
return true;
} catch (Exception e) {
} catch (final Exception e) {
LOGGER.warn(
String.format(
"Increase reference count for TsFile %s or modFile %s error. Holder Message: %s",
@ -204,14 +206,14 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
}
@Override
public boolean internallyDecreaseResourceReferenceCount(String holderMessage) {
public boolean internallyDecreaseResourceReferenceCount(final String holderMessage) {
try {
PipeResourceManager.tsfile().decreaseFileReference(tsFile);
if (isWithMod) {
PipeResourceManager.tsfile().decreaseFileReference(modFile);
}
return true;
} catch (Exception e) {
} catch (final Exception e) {
LOGGER.warn(
String.format(
"Decrease reference count for TsFile %s error. Holder Message: %s",
@ -231,7 +233,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
return MinimumProgressIndex.INSTANCE;
}
return resource.getMaxProgressIndexAfterClose();
} catch (InterruptedException e) {
} catch (final InterruptedException e) {
LOGGER.warn(
String.format(
"Interrupted when waiting for closing TsFile %s.", resource.getTsFilePath()));
@ -242,17 +244,19 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
@Override
public PipeTsFileInsertionEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
String pipeName,
PipeTaskMeta pipeTaskMeta,
PipePattern pattern,
long startTime,
long endTime) {
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern,
final long startTime,
final long endTime) {
return new PipeTsFileInsertionEvent(
resource,
isWithMod,
isLoaded,
isGeneratedByPipe,
pipeName,
creationTime,
pipeTaskMeta,
pattern,
startTime,
@ -305,7 +309,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
return Collections.emptyList();
}
return initDataContainer().toTabletInsertionEvents();
} catch (InterruptedException e) {
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
close();
@ -325,7 +329,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
tsFile, pipePattern, startTime, endTime, pipeTaskMeta, this);
}
return dataContainer;
} catch (IOException e) {
} catch (final IOException e) {
close();
final String errorMsg = String.format("Read TsFile %s error.", resource.getTsFilePath());
@ -334,7 +338,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
}
}
public long count(boolean skipReportOnCommit) throws IOException {
public long count(final boolean skipReportOnCommit) throws IOException {
long count = 0;
if (shouldParseTime()) {

View File

@ -252,6 +252,7 @@ public class TsFileInsertionDataContainer implements AutoCloseable {
tablet,
isAligned,
sourceEvent != null ? sourceEvent.getPipeName() : null,
sourceEvent != null ? sourceEvent.getCreationTime() : 0,
pipeTaskMeta,
sourceEvent,
true);
@ -262,6 +263,7 @@ public class TsFileInsertionDataContainer implements AutoCloseable {
tablet,
isAligned,
sourceEvent != null ? sourceEvent.getPipeName() : null,
sourceEvent != null ? sourceEvent.getCreationTime() : 0,
pipeTaskMeta,
sourceEvent,
false);

View File

@ -58,7 +58,13 @@ public class PipeRealtimeEvent extends EnrichedEvent {
// PipeTaskMeta is used to report the progress of the event, the PipeRealtimeEvent
// is only used in the realtime event extractor, which does not need to report the progress
// of the event, so the pipeTaskMeta is always null.
super(event != null ? event.getPipeName() : null, pipeTaskMeta, pattern, startTime, endTime);
super(
event != null ? event.getPipeName() : null,
event != null ? event.getCreationTime() : 0,
pipeTaskMeta,
pattern,
startTime,
endTime);
this.event = event;
this.tsFileEpoch = tsFileEpoch;
@ -140,13 +146,14 @@ public class PipeRealtimeEvent extends EnrichedEvent {
@Override
public PipeRealtimeEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern,
final long startTime,
final long endTime) {
return new PipeRealtimeEvent(
event.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
pipeName, pipeTaskMeta, pattern, startTime, endTime),
pipeName, creationTime, pipeTaskMeta, pattern, startTime, endTime),
this.tsFileEpoch,
this.device2Measurements,
pipeTaskMeta,

View File

@ -94,6 +94,8 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
private static final long PIPE_MIN_FLUSH_INTERVAL_IN_MS = 2000;
private String pipeName;
private long creationTime;
private PipeTaskMeta pipeTaskMeta;
private ProgressIndex startIndex;
@ -250,6 +252,7 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
(PipeTaskExtractorRuntimeEnvironment) configuration.getRuntimeEnvironment();
pipeName = environment.getPipeName();
creationTime = environment.getCreationTime();
pipeTaskMeta = environment.getPipeTaskMeta();
startIndex = environment.getPipeTaskMeta().getProgressIndex();
@ -563,7 +566,7 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
if (resource == null) {
isTerminateSignalSent = true;
final PipeTerminateEvent terminateEvent =
new PipeTerminateEvent(pipeName, pipeTaskMeta, dataRegionId);
new PipeTerminateEvent(pipeName, creationTime, pipeTaskMeta, dataRegionId);
terminateEvent.increaseReferenceCount(
PipeHistoricalDataRegionTsFileExtractor.class.getName());
return terminateEvent;
@ -576,6 +579,7 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
false,
false,
pipeName,
creationTime,
pipeTaskMeta,
pipePattern,
historicalDataExtractionStartTime,

View File

@ -74,6 +74,7 @@ public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor {
LoggerFactory.getLogger(PipeRealtimeDataRegionExtractor.class);
protected String pipeName;
protected long creationTime;
protected String dataRegionId;
protected PipeTaskMeta pipeTaskMeta;
@ -167,7 +168,7 @@ public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor {
// indexed by the taskID of IoTDBDataRegionExtractor. To avoid PipeRealtimeDataRegionExtractor
// holding a reference to IoTDBDataRegionExtractor, the taskID should be constructed to
// match that of IoTDBDataRegionExtractor.
final long creationTime = environment.getCreationTime();
creationTime = environment.getCreationTime();
taskID = pipeName + "_" + dataRegionId + "_" + creationTime;
pipePattern = PipePattern.parsePipePatternFromSourceParameters(parameters);
@ -387,6 +388,10 @@ public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor {
return pipeName;
}
public final long getCreationTime() {
return creationTime;
}
public final PipeTaskMeta getPipeTaskMeta() {
return pipeTaskMeta;
}

View File

@ -76,6 +76,7 @@ public class PipeDataRegionAssigner implements Closeable {
final PipeRealtimeEvent copiedEvent =
event.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
extractor.getPipeName(),
extractor.getCreationTime(),
extractor.getPipeTaskMeta(),
extractor.getPipePattern(),
extractor.getRealtimeDataExtractionStartTime(),
@ -90,7 +91,6 @@ public class PipeDataRegionAssigner implements Closeable {
extractor.extract(copiedEvent);
if (innerEvent instanceof PipeHeartbeatEvent) {
((PipeHeartbeatEvent) innerEvent).bindPipeName(extractor.getPipeName());
((PipeHeartbeatEvent) innerEvent).onAssigned();
}
});

View File

@ -19,17 +19,12 @@
package org.apache.iotdb.db.pipe.metric;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
import org.apache.iotdb.db.pipe.extractor.schemaregion.IoTDBSchemaRegionExtractor;
import org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask;
import org.apache.iotdb.db.schemaengine.SchemaEngine;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.metrics.AbstractMetricService;
import org.apache.iotdb.metrics.metricsets.IMetricSet;
import org.apache.iotdb.metrics.utils.MetricLevel;
@ -151,6 +146,26 @@ public class PipeDataNodeRemainingEventAndTimeMetrics implements IMetricSet {
}
}
public void thawRate(final String pipeID) {
if (!remainingEventAndTimeOperatorMap.containsKey(pipeID)) {
// In dataNode, the "thawRate" may be called when there are no subtasks, and we call
// "startPipe".
// We thaw it later in "startPipeTask".
return;
}
remainingEventAndTimeOperatorMap.get(pipeID).thawRate(true);
}
public void freezeRate(final String pipeID) {
if (!remainingEventAndTimeOperatorMap.containsKey(pipeID)) {
// In dataNode, the "freezeRate" may be called when there are no subtasks, and we call
// "stopPipe" after calling "startPipe".
// We do nothing because in that case the rate is not thawed initially
return;
}
remainingEventAndTimeOperatorMap.get(pipeID).freezeRate(true);
}
public void deregister(final String pipeID) {
if (!remainingEventAndTimeOperatorMap.containsKey(pipeID)) {
LOGGER.warn(
@ -163,13 +178,7 @@ public class PipeDataNodeRemainingEventAndTimeMetrics implements IMetricSet {
}
}
public void markRegionCommit(final PipeTaskRuntimeEnvironment pipeTaskRuntimeEnvironment) {
// Filter commit attempt from assigner
final String pipeName = pipeTaskRuntimeEnvironment.getPipeName();
final int regionId = pipeTaskRuntimeEnvironment.getRegionId();
final long creationTime = pipeTaskRuntimeEnvironment.getCreationTime();
final String pipeID = pipeName + "_" + creationTime;
public void markRegionCommit(final String pipeID, final boolean isDataRegion) {
if (Objects.isNull(metricService)) {
return;
}
@ -181,19 +190,10 @@ public class PipeDataNodeRemainingEventAndTimeMetrics implements IMetricSet {
pipeID);
return;
}
// Prevent not set pipeName / creation times & potential differences between pipeNames and
// creation times
if (!Objects.equals(pipeName, operator.getPipeName())
|| !Objects.equals(creationTime, operator.getCreationTime())) {
return;
}
// Prevent empty region-ids
if (StorageEngine.getInstance().getAllDataRegionIds().contains(new DataRegionId(regionId))) {
if (isDataRegion) {
operator.markDataRegionCommit();
}
if (SchemaEngine.getInstance().getAllSchemaRegionIds().contains(new SchemaRegionId(regionId))) {
} else {
operator.markSchemaRegionCommit();
}
}

View File

@ -19,61 +19,49 @@
package org.apache.iotdb.db.pipe.metric;
import org.apache.iotdb.commons.enums.PipeRemainingTimeRateAverageTime;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.metric.PipeRemainingOperator;
import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
import org.apache.iotdb.db.pipe.extractor.schemaregion.IoTDBSchemaRegionExtractor;
import org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask;
import org.apache.iotdb.pipe.api.event.Event;
import com.codahale.metrics.Clock;
import com.codahale.metrics.ExponentialMovingAverages;
import com.codahale.metrics.Meter;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
class PipeDataNodeRemainingEventAndTimeOperator {
class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOperator {
private final Set<IoTDBDataRegionExtractor> dataRegionExtractors =
Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Set<PipeConnectorSubtask> dataRegionConnectors =
Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Set<IoTDBSchemaRegionExtractor> schemaRegionExtractors =
Collections.newSetFromMap(new ConcurrentHashMap<>());
private final AtomicReference<Meter> dataRegionCommitMeter = new AtomicReference<>(null);
private final AtomicReference<Meter> schemaRegionCommitMeter = new AtomicReference<>(null);
private static final long DATA_NODE_REMAINING_MAX_SECONDS = 365 * 24 * 60 * 60L; // 1 year
private String pipeName;
private long creationTime = 0;
private final ConcurrentMap<IoTDBDataRegionExtractor, IoTDBDataRegionExtractor>
dataRegionExtractors = new ConcurrentHashMap<>();
private final ConcurrentMap<PipeConnectorSubtask, PipeConnectorSubtask> dataRegionConnectors =
new ConcurrentHashMap<>();
private final ConcurrentMap<IoTDBSchemaRegionExtractor, IoTDBSchemaRegionExtractor>
schemaRegionExtractors = new ConcurrentHashMap<>();
private final Meter dataRegionCommitMeter =
new Meter(new ExponentialMovingAverages(), Clock.defaultClock());
private final Meter schemaRegionCommitMeter =
new Meter(new ExponentialMovingAverages(), Clock.defaultClock());
private double lastDataRegionCommitSmoothingValue = Long.MIN_VALUE;
private double lastSchemaRegionCommitSmoothingValue = Long.MIN_VALUE;
//////////////////////////// Tags ////////////////////////////
String getPipeName() {
return pipeName;
}
long getCreationTime() {
return creationTime;
}
private double lastDataRegionCommitSmoothingValue = Long.MAX_VALUE;
private double lastSchemaRegionCommitSmoothingValue = Long.MAX_VALUE;
//////////////////////////// Remaining event & time calculation ////////////////////////////
long getRemainingEvents() {
return dataRegionExtractors.keySet().stream()
return dataRegionExtractors.stream()
.map(IoTDBDataRegionExtractor::getEventCount)
.reduce(Integer::sum)
.orElse(0)
+ dataRegionConnectors.keySet().stream()
+ dataRegionConnectors.stream()
.map(connectorSubtask -> connectorSubtask.getEventCount(pipeName))
.reduce(Integer::sum)
.orElse(0)
+ schemaRegionExtractors.keySet().stream()
+ schemaRegionExtractors.stream()
.map(IoTDBSchemaRegionExtractor::getUnTransferredEventCount)
.reduce(Long::sum)
.orElse(0L);
@ -82,39 +70,41 @@ class PipeDataNodeRemainingEventAndTimeOperator {
/**
* This will calculate the estimated remaining time of pipe.
*
* <p>Note: The events in pipe assigner are omitted.
* <p>Note: The {@link Event}s in pipe assigner are omitted.
*
* @return The estimated remaining time
*/
double getRemainingTime() {
final double pipeRemainingTimeCommitRateSmoothingFactor =
PipeConfig.getInstance().getPipeRemainingTimeCommitRateSmoothingFactor();
final PipeRemainingTimeRateAverageTime pipeRemainingTimeCommitRateAverageTime =
PipeConfig.getInstance().getPipeRemainingTimeCommitRateAverageTime();
// Do not take heartbeat event into account
final int totalDataRegionWriteEventCount =
dataRegionExtractors.keySet().stream()
dataRegionExtractors.stream()
.map(IoTDBDataRegionExtractor::getEventCount)
.reduce(Integer::sum)
.orElse(0)
+ dataRegionConnectors.keySet().stream()
+ dataRegionConnectors.stream()
.map(connectorSubtask -> connectorSubtask.getEventCount(pipeName))
.reduce(Integer::sum)
.orElse(0)
- dataRegionExtractors.keySet().stream()
- dataRegionExtractors.stream()
.map(IoTDBDataRegionExtractor::getPipeHeartbeatEventCount)
.reduce(Integer::sum)
.orElse(0)
- dataRegionConnectors.keySet().stream()
- dataRegionConnectors.stream()
.map(PipeConnectorSubtask::getPipeHeartbeatEventCount)
.reduce(Integer::sum)
.orElse(0);
lastDataRegionCommitSmoothingValue =
lastDataRegionCommitSmoothingValue == Long.MIN_VALUE
? dataRegionCommitMeter.getOneMinuteRate()
: pipeRemainingTimeCommitRateSmoothingFactor * dataRegionCommitMeter.getOneMinuteRate()
+ (1 - pipeRemainingTimeCommitRateSmoothingFactor)
* lastDataRegionCommitSmoothingValue;
dataRegionCommitMeter.updateAndGet(
meter -> {
if (Objects.nonNull(meter)) {
lastDataRegionCommitSmoothingValue =
pipeRemainingTimeCommitRateAverageTime.getMeterRate(meter);
}
return meter;
});
final double dataRegionRemainingTime;
if (totalDataRegionWriteEventCount <= 0) {
dataRegionRemainingTime = 0;
@ -126,18 +116,19 @@ class PipeDataNodeRemainingEventAndTimeOperator {
}
final long totalSchemaRegionWriteEventCount =
schemaRegionExtractors.keySet().stream()
schemaRegionExtractors.stream()
.map(IoTDBSchemaRegionExtractor::getUnTransferredEventCount)
.reduce(Long::sum)
.orElse(0L);
lastSchemaRegionCommitSmoothingValue =
lastSchemaRegionCommitSmoothingValue == Long.MIN_VALUE
? schemaRegionCommitMeter.getOneMinuteRate()
: pipeRemainingTimeCommitRateSmoothingFactor
* schemaRegionCommitMeter.getOneMinuteRate()
+ (1 - pipeRemainingTimeCommitRateSmoothingFactor)
* lastSchemaRegionCommitSmoothingValue;
schemaRegionCommitMeter.updateAndGet(
meter -> {
if (Objects.nonNull(meter)) {
lastSchemaRegionCommitSmoothingValue =
pipeRemainingTimeCommitRateAverageTime.getMeterRate(meter);
}
return meter;
});
final double schemaRegionRemainingTime;
if (totalSchemaRegionWriteEventCount <= 0) {
schemaRegionRemainingTime = 0;
@ -148,40 +139,77 @@ class PipeDataNodeRemainingEventAndTimeOperator {
: totalSchemaRegionWriteEventCount / lastSchemaRegionCommitSmoothingValue;
}
if (totalDataRegionWriteEventCount + totalSchemaRegionWriteEventCount == 0) {
notifyEmpty();
} else {
notifyNonEmpty();
}
final double result = Math.max(dataRegionRemainingTime, schemaRegionRemainingTime);
return result >= DATA_NODE_REMAINING_MAX_SECONDS ? DATA_NODE_REMAINING_MAX_SECONDS : result;
return result >= REMAINING_MAX_SECONDS ? REMAINING_MAX_SECONDS : result;
}
//////////////////////////// Register & deregister (pipe integration) ////////////////////////////
void register(final IoTDBDataRegionExtractor extractor) {
setNameAndCreationTime(extractor.getPipeName(), extractor.getCreationTime());
dataRegionExtractors.put(extractor, extractor);
dataRegionExtractors.add(extractor);
}
void register(
final PipeConnectorSubtask connectorSubtask, final String pipeName, final long creationTime) {
setNameAndCreationTime(pipeName, creationTime);
dataRegionConnectors.put(connectorSubtask, connectorSubtask);
dataRegionConnectors.add(connectorSubtask);
}
void register(final IoTDBSchemaRegionExtractor extractor) {
setNameAndCreationTime(extractor.getPipeName(), extractor.getCreationTime());
schemaRegionExtractors.put(extractor, extractor);
}
private void setNameAndCreationTime(final String pipeName, final long creationTime) {
this.pipeName = pipeName;
this.creationTime = creationTime;
schemaRegionExtractors.add(extractor);
}
//////////////////////////// Rate ////////////////////////////
void markDataRegionCommit() {
dataRegionCommitMeter.mark();
dataRegionCommitMeter.updateAndGet(
meter -> {
if (Objects.nonNull(meter)) {
meter.mark();
}
return meter;
});
}
void markSchemaRegionCommit() {
schemaRegionCommitMeter.mark();
schemaRegionCommitMeter.updateAndGet(
meter -> {
if (Objects.nonNull(meter)) {
meter.mark();
}
return meter;
});
}
//////////////////////////// Switch ////////////////////////////
// Thread-safe & Idempotent
@Override
public synchronized void thawRate(final boolean isStartPipe) {
super.thawRate(isStartPipe);
// The stopped pipe's rate should only be thawed by "startPipe" command
if (isStopped) {
return;
}
dataRegionCommitMeter.compareAndSet(
null, new Meter(new ExponentialMovingAverages(), Clock.defaultClock()));
schemaRegionCommitMeter.compareAndSet(
null, new Meter(new ExponentialMovingAverages(), Clock.defaultClock()));
}
// Thread-safe & Idempotent
@Override
public synchronized void freezeRate(final boolean isStopPipe) {
super.freezeRate(isStopPipe);
dataRegionCommitMeter.set(null);
schemaRegionCommitMeter.set(null);
}
}

View File

@ -253,7 +253,7 @@ public class TwoStageCountProcessor implements PipeProcessor {
tablet.addValue(outputSeries.getMeasurement(), 0, timestampCountPair.right);
eventCollector.collect(
new PipeRawTabletInsertionEvent(tablet, false, null, null, null, false));
new PipeRawTabletInsertionEvent(tablet, false, null, 0, null, null, false));
PipeCombineHandlerManager.getInstance()
.updateLastCombinedValue(pipeName, creationTime, timestampCountPair);

View File

@ -142,6 +142,7 @@ public class PipeEventCollector implements EventCollector {
new PipeSchemaRegionWritePlanEvent(
planNode,
deleteDataEvent.getPipeName(),
deleteDataEvent.getCreationTime(),
deleteDataEvent.getPipeTaskMeta(),
deleteDataEvent.getPipePattern(),
deleteDataEvent.isGeneratedByPipe()))

View File

@ -95,6 +95,10 @@
<artifactId>metrics-core</artifactId>
<version>1.3.3-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>

View File

@ -22,6 +22,7 @@ package org.apache.iotdb.commons.conf;
import org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProperty;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.enums.HandleSystemErrorStrategy;
import org.apache.iotdb.commons.enums.PipeRemainingTimeRateAverageTime;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
@ -235,7 +236,9 @@ public class CommonConfig {
private float pipeLeaderCacheMemoryUsagePercentage = 0.1F;
private long pipeListeningQueueTransferSnapshotThreshold = 1000;
private int pipeSnapshotExecutionMaxBatchSize = 1000;
private double pipeRemainingTimeCommitRateSmoothingFactor = 0.5;
private long pipeRemainingTimeCommitRateAutoSwitchSeconds = 30;
private PipeRemainingTimeRateAverageTime pipeRemainingTimeCommitRateAverageTime =
PipeRemainingTimeRateAverageTime.MEAN;
private long twoStageAggregateMaxCombinerLiveTimeInMs = 8 * 60 * 1000L; // 8 minutes
private long twoStageAggregateDataRegionInfoCacheTimeInMs = 3 * 60 * 1000L; // 3 minutes
@ -1020,13 +1023,23 @@ public class CommonConfig {
this.pipeSnapshotExecutionMaxBatchSize = pipeSnapshotExecutionMaxBatchSize;
}
public double getPipeRemainingTimeCommitRateSmoothingFactor() {
return pipeRemainingTimeCommitRateSmoothingFactor;
public long getPipeRemainingTimeCommitRateAutoSwitchSeconds() {
return pipeRemainingTimeCommitRateAutoSwitchSeconds;
}
public void setPipeRemainingTimeCommitRateSmoothingFactor(
double pipeRemainingTimeCommitRateSmoothingFactor) {
this.pipeRemainingTimeCommitRateSmoothingFactor = pipeRemainingTimeCommitRateSmoothingFactor;
public void setPipeRemainingTimeCommitRateAutoSwitchSeconds(
long pipeRemainingTimeCommitRateAutoSwitchSeconds) {
this.pipeRemainingTimeCommitRateAutoSwitchSeconds =
pipeRemainingTimeCommitRateAutoSwitchSeconds;
}
public PipeRemainingTimeRateAverageTime getPipeRemainingTimeCommitRateAverageTime() {
return pipeRemainingTimeCommitRateAverageTime;
}
public void setPipeRemainingTimeCommitRateAverageTime(
PipeRemainingTimeRateAverageTime pipeRemainingTimeCommitRateAverageTime) {
this.pipeRemainingTimeCommitRateAverageTime = pipeRemainingTimeCommitRateAverageTime;
}
public double getPipeAllSinksRateLimitBytesPerSecond() {

View File

@ -20,6 +20,7 @@
package org.apache.iotdb.commons.conf;
import org.apache.iotdb.commons.enums.HandleSystemErrorStrategy;
import org.apache.iotdb.commons.enums.PipeRemainingTimeRateAverageTime;
import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
@ -541,11 +542,18 @@ public class CommonDescriptor {
properties.getProperty(
"pipe_snapshot_execution_max_batch_size",
String.valueOf(config.getPipeSnapshotExecutionMaxBatchSize()))));
config.setPipeRemainingTimeCommitRateSmoothingFactor(
Double.parseDouble(
config.setPipeRemainingTimeCommitRateAutoSwitchSeconds(
Long.parseLong(
properties.getProperty(
"pipe_remaining_time_commit_rate_smoothing_factor",
String.valueOf(config.getPipeRemainingTimeCommitRateSmoothingFactor()))));
"pipe_remaining_time_commit_rate_auto_switch_seconds",
String.valueOf(config.getPipeRemainingTimeCommitRateAutoSwitchSeconds()))));
config.setPipeRemainingTimeCommitRateAverageTime(
PipeRemainingTimeRateAverageTime.valueOf(
properties
.getProperty(
"pipe_remaining_time_commit_rate_average_time",
String.valueOf(config.getPipeRemainingTimeCommitRateAverageTime()))
.trim()));
config.setTwoStageAggregateMaxCombinerLiveTimeInMs(
Long.parseLong(

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.commons.enums;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import com.codahale.metrics.Meter;
public enum PipeRemainingTimeRateAverageTime {
ONE_MINUTE,
FIVE_MINUTES,
FIFTEEN_MINUTES,
MEAN;
public double getMeterRate(final Meter meter) {
switch (this) {
case ONE_MINUTE:
return meter.getOneMinuteRate();
case FIVE_MINUTES:
return meter.getFiveMinuteRate();
case FIFTEEN_MINUTES:
return meter.getFifteenMinuteRate();
case MEAN:
return meter.getMeanRate();
default:
throw new UnsupportedOperationException(
String.format(
"The type %s is not supported in average time of pipe remaining time.",
PipeConfig.getInstance().getPipeRemainingTimeCommitRateAverageTime()));
}
}
}

View File

@ -272,6 +272,9 @@ public abstract class PipeTaskAgent {
break;
case STOPPED:
if (Objects.requireNonNull(statusInAgent) == PipeStatus.RUNNING) {
// Only freeze rate for user stopped pipes
// Freeze first to get better results in calculation
freezeRate(pipeStaticMeta.getPipeName(), pipeStaticMeta.getCreationTime());
stopPipe(pipeStaticMeta.getPipeName(), pipeStaticMeta.getCreationTime());
} else {
throw new IllegalStateException(
@ -290,6 +293,10 @@ public abstract class PipeTaskAgent {
}
}
protected abstract void thawRate(final String pipeName, final long creationTime);
protected abstract void freezeRate(final String pipeName, final long creationTime);
public TPushPipeMetaRespExceptionMessage handleDropPipe(final String pipeName) {
acquireWriteLock();
try {
@ -576,9 +583,11 @@ public abstract class PipeTaskAgent {
.getConsensusGroupId2TaskMetaMap()
.values()
.forEach(PipeTaskMeta::clearExceptionMessages);
thawRate(pipeName, creationTime);
}
protected void stopPipe(final String pipeName, final long creationTime) {
private void stopPipe(final String pipeName, final long creationTime) {
final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
if (!checkBeforeStopPipe(existedPipeMeta, pipeName, creationTime)) {
@ -856,6 +865,7 @@ public abstract class PipeTaskAgent {
final PipeTask pipeTask = pipeTaskManager.getPipeTask(pipeStaticMeta, consensusGroupId);
if (pipeTask != null) {
pipeTask.start();
thawRate(pipeStaticMeta.getPipeName(), pipeStaticMeta.getCreationTime());
}
}

View File

@ -21,6 +21,7 @@ package org.apache.iotdb.commons.pipe.config;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.enums.PipeRemainingTimeRateAverageTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -139,8 +140,12 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeSnapshotExecutionMaxBatchSize();
}
public double getPipeRemainingTimeCommitRateSmoothingFactor() {
return COMMON_CONFIG.getPipeRemainingTimeCommitRateSmoothingFactor();
public long getPipeRemainingTimeCommitAutoSwitchSeconds() {
return COMMON_CONFIG.getPipeRemainingTimeCommitRateAutoSwitchSeconds();
}
public PipeRemainingTimeRateAverageTime getPipeRemainingTimeCommitRateAverageTime() {
return COMMON_CONFIG.getPipeRemainingTimeCommitRateAverageTime();
}
/////////////////////////////// Meta Consistency ///////////////////////////////
@ -328,8 +333,10 @@ public class PipeConfig {
getPipeListeningQueueTransferSnapshotThreshold());
LOGGER.info("PipeSnapshotExecutionMaxBatchSize: {}", getPipeSnapshotExecutionMaxBatchSize());
LOGGER.info(
"PipeRemainingTimeCommitRateSmoothingFactor: {}",
getPipeRemainingTimeCommitRateSmoothingFactor());
"PipeRemainingTimeCommitAutoSwitchSeconds: {}",
getPipeRemainingTimeCommitAutoSwitchSeconds());
LOGGER.info(
"PipeRemainingTimeCommitRateAverageTime: {}", getPipeRemainingTimeCommitRateAverageTime());
LOGGER.info("PipeAsyncConnectorSelectorNumber: {}", getPipeAsyncConnectorSelectorNumber());
LOGGER.info("PipeAsyncConnectorMaxClientNumber: {}", getPipeAsyncConnectorMaxClientNumber());

View File

@ -48,6 +48,7 @@ public abstract class EnrichedEvent implements Event {
protected final AtomicBoolean isReleased;
protected final String pipeName;
protected final long creationTime;
protected final PipeTaskMeta pipeTaskMeta;
protected String committerKey;
@ -67,6 +68,7 @@ public abstract class EnrichedEvent implements Event {
protected EnrichedEvent(
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pipePattern,
final long startTime,
@ -74,6 +76,7 @@ public abstract class EnrichedEvent implements Event {
referenceCount = new AtomicInteger(0);
isReleased = new AtomicBoolean(false);
this.pipeName = pipeName;
this.creationTime = creationTime;
this.pipeTaskMeta = pipeTaskMeta;
this.pipePattern = pipePattern;
this.startTime = startTime;
@ -129,7 +132,7 @@ public abstract class EnrichedEvent implements Event {
* {@code false} if the {@link EnrichedEvent} is not controlled by the invoker, which means
* the data stored in the event is not safe to use
*/
public abstract boolean internallyIncreaseResourceReferenceCount(String holderMessage);
public abstract boolean internallyIncreaseResourceReferenceCount(final String holderMessage);
/**
* Decrease the {@link EnrichedEvent#referenceCount} of this {@link EnrichedEvent} by 1. If the
@ -208,7 +211,7 @@ public abstract class EnrichedEvent implements Event {
* @return {@code true} if the {@link EnrichedEvent#referenceCount} is decreased successfully,
* {@code true} otherwise
*/
public abstract boolean internallyDecreaseResourceReferenceCount(String holderMessage);
public abstract boolean internallyDecreaseResourceReferenceCount(final String holderMessage);
protected void reportProgress() {
if (pipeTaskMeta != null) {
@ -245,6 +248,14 @@ public abstract class EnrichedEvent implements Event {
return pipeName;
}
public final long getCreationTime() {
return creationTime;
}
public final boolean isDataRegionEvent() {
return !(this instanceof PipeWritePlanEvent) && !(this instanceof PipeSnapshotEvent);
}
/**
* Get the pattern string of this {@link EnrichedEvent}.
*
@ -291,11 +302,12 @@ public abstract class EnrichedEvent implements Event {
}
public abstract EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
String pipeName,
PipeTaskMeta pipeTaskMeta,
PipePattern pattern,
long startTime,
long endTime);
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern,
final long startTime,
final long endTime);
public PipeTaskMeta getPipeTaskMeta() {
return pipeTaskMeta;
@ -315,7 +327,7 @@ public abstract class EnrichedEvent implements Event {
this.commitId = commitId;
}
public void setRebootTimes(int rebootTimes) {
public void setRebootTimes(final int rebootTimes) {
this.rebootTimes = rebootTimes;
}
@ -345,14 +357,14 @@ public abstract class EnrichedEvent implements Event {
* Used for pipeConsensus. In PipeConsensus, we only need committerKey, commitId and rebootTimes
* to uniquely identify an event
*/
public boolean equalsInPipeConsensus(Object o) {
public boolean equalsInPipeConsensus(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
EnrichedEvent otherEvent = (EnrichedEvent) o;
final EnrichedEvent otherEvent = (EnrichedEvent) o;
return Objects.equals(committerKey, otherEvent.committerKey)
&& commitId == otherEvent.commitId
&& rebootTimes == otherEvent.rebootTimes;

View File

@ -35,10 +35,11 @@ public abstract class PipeSnapshotEvent extends EnrichedEvent implements Seriali
protected PipeSnapshotEvent(
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern,
final PipeSnapshotResourceManager resourceManager) {
super(pipeName, pipeTaskMeta, pattern, Long.MIN_VALUE, Long.MAX_VALUE);
super(pipeName, creationTime, pipeTaskMeta, pattern, Long.MIN_VALUE, Long.MAX_VALUE);
this.resourceManager = resourceManager;
}

View File

@ -31,10 +31,11 @@ public abstract class PipeWritePlanEvent extends EnrichedEvent implements Serial
protected PipeWritePlanEvent(
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern,
final boolean isGeneratedByPipe) {
super(pipeName, pipeTaskMeta, pattern, Long.MIN_VALUE, Long.MAX_VALUE);
super(pipeName, creationTime, pipeTaskMeta, pattern, Long.MIN_VALUE, Long.MAX_VALUE);
this.isGeneratedByPipe = isGeneratedByPipe;
}

View File

@ -34,11 +34,12 @@ public class ProgressReportEvent extends EnrichedEvent {
public ProgressReportEvent(
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pipePattern,
final long startTime,
final long endTime) {
super(pipeName, pipeTaskMeta, pipePattern, startTime, endTime);
super(pipeName, creationTime, pipeTaskMeta, pipePattern, startTime, endTime);
}
@Override
@ -64,11 +65,13 @@ public class ProgressReportEvent extends EnrichedEvent {
@Override
public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern,
final long startTime,
final long endTime) {
return new ProgressReportEvent(pipeName, pipeTaskMeta, pattern, startTime, endTime);
return new ProgressReportEvent(
pipeName, creationTime, pipeTaskMeta, pattern, startTime, endTime);
}
@Override

View File

@ -154,7 +154,12 @@ public abstract class IoTDBNonDataRegionExtractor extends IoTDBExtractor {
historicalEvents
.remove(0)
.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
pipeName, pipeTaskMeta, pipePattern, Long.MIN_VALUE, Long.MAX_VALUE);
pipeName,
creationTime,
pipeTaskMeta,
pipePattern,
Long.MIN_VALUE,
Long.MAX_VALUE);
if (historicalEvents.isEmpty()) {
// We only report progress for the last snapshot event.
@ -180,7 +185,7 @@ public abstract class IoTDBNonDataRegionExtractor extends IoTDBExtractor {
|| (!isForwardingPipeRequests && realtimeEvent.isGeneratedByPipe())) {
final ProgressReportEvent event =
new ProgressReportEvent(
pipeName, pipeTaskMeta, pipePattern, Long.MIN_VALUE, Long.MAX_VALUE);
pipeName, creationTime, pipeTaskMeta, pipePattern, Long.MIN_VALUE, Long.MAX_VALUE);
event.bindProgressIndex(new MetaProgressIndex(iterator.getNextIndex() - 1));
event.increaseReferenceCount(IoTDBNonDataRegionExtractor.class.getName());
return event;
@ -189,7 +194,7 @@ public abstract class IoTDBNonDataRegionExtractor extends IoTDBExtractor {
realtimeEvent =
(PipeWritePlanEvent)
realtimeEvent.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
pipeName, pipeTaskMeta, pipePattern, Long.MIN_VALUE, Long.MAX_VALUE);
pipeName, creationTime, pipeTaskMeta, pipePattern, Long.MIN_VALUE, Long.MAX_VALUE);
realtimeEvent.bindProgressIndex(new MetaProgressIndex(iterator.getNextIndex() - 1));
realtimeEvent.increaseReferenceCount(IoTDBNonDataRegionExtractor.class.getName());
return realtimeEvent;

View File

@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.commons.pipe.metric;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
public abstract class PipeRemainingOperator {
protected static final long REMAINING_MAX_SECONDS = 365 * 24 * 60 * 60L; // 1 year
protected String pipeName;
protected long creationTime = 0;
private long lastEmptyTimeStamp = System.currentTimeMillis();
private long lastNonEmptyTimeStamp = System.currentTimeMillis();
protected boolean isStopped = true;
//////////////////////////// Tags ////////////////////////////
public String getPipeName() {
return pipeName;
}
public long getCreationTime() {
return creationTime;
}
//////////////////////////// Register & deregister (pipe integration) ////////////////////////////
protected void setNameAndCreationTime(final String pipeName, final long creationTime) {
this.pipeName = pipeName;
this.creationTime = creationTime;
}
//////////////////////////// Switch ////////////////////////////
protected void notifyNonEmpty() {
final long pipeRemainingTimeCommitAutoSwitchSeconds =
PipeConfig.getInstance().getPipeRemainingTimeCommitAutoSwitchSeconds();
lastNonEmptyTimeStamp = System.currentTimeMillis();
if (lastNonEmptyTimeStamp - lastEmptyTimeStamp
>= pipeRemainingTimeCommitAutoSwitchSeconds * 1000) {
thawRate(false);
}
}
protected void notifyEmpty() {
final long pipeRemainingTimeCommitAutoSwitchSeconds =
PipeConfig.getInstance().getPipeRemainingTimeCommitAutoSwitchSeconds();
lastEmptyTimeStamp = System.currentTimeMillis();
if (lastEmptyTimeStamp - lastNonEmptyTimeStamp
>= pipeRemainingTimeCommitAutoSwitchSeconds * 1000) {
freezeRate(false);
}
}
public synchronized void thawRate(final boolean isStartPipe) {
if (isStartPipe) {
isStopped = false;
}
}
public synchronized void freezeRate(final boolean isStopPipe) {
if (isStopPipe) {
isStopped = true;
}
}
}

View File

@ -19,7 +19,6 @@
package org.apache.iotdb.commons.pipe.progress;
import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.metric.PipeEventCommitMetrics;
@ -29,7 +28,7 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.BiConsumer;
public class PipeEventCommitManager {
@ -38,7 +37,7 @@ public class PipeEventCommitManager {
// key: pipeName_regionId
private final Map<String, PipeEventCommitter> eventCommitterMap = new ConcurrentHashMap<>();
private Consumer<PipeTaskRuntimeEnvironment> commitRateMarker;
private BiConsumer<String, Boolean> commitRateMarker;
public void register(
final String pipeName,
@ -88,10 +87,26 @@ public class PipeEventCommitManager {
}
public void commit(final EnrichedEvent event, final String committerKey) {
if (committerKey == null
|| event == null
if (event == null
|| !event.needToCommit()
|| event.getCommitId() <= EnrichedEvent.NO_COMMIT_ID) {
|| Objects.isNull(event.getPipeName())
|| event.getCreationTime() == 0) {
return;
}
if (Objects.nonNull(commitRateMarker)) {
try {
commitRateMarker.accept(
event.getPipeName() + '_' + event.getCreationTime(), event.isDataRegionEvent());
} catch (final Exception e) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"Failed to mark commit rate for pipe task: {}, stack trace: {}",
committerKey,
Thread.currentThread().getStackTrace());
}
}
}
if (committerKey == null || event.getCommitId() <= EnrichedEvent.NO_COMMIT_ID) {
return;
}
final PipeEventCommitter committer = eventCommitterMap.get(committerKey);
@ -108,20 +123,6 @@ public class PipeEventCommitManager {
}
committer.commit(event);
if (Objects.nonNull(commitRateMarker)) {
try {
commitRateMarker.accept(
new PipeTaskRuntimeEnvironment(
committer.getPipeName(), committer.getCreationTime(), committer.getRegionId()));
} catch (Exception e) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"Failed to mark commit rate for pipe: {}, stack trace: {}",
committerKey,
Thread.currentThread().getStackTrace());
}
}
}
}
private static String generateCommitterKey(
@ -129,7 +130,7 @@ public class PipeEventCommitManager {
return String.format("%s_%s_%s", pipeName, regionId, creationTime);
}
public void setCommitRateMarker(final Consumer<PipeTaskRuntimeEnvironment> commitRateMarker) {
public void setCommitRateMarker(final BiConsumer<String, Boolean> commitRateMarker) {
this.commitRateMarker = commitRateMarker;
}