add level for DDBulkLoad except for datadistribution
This commit is contained in:
parent
6ae46b4917
commit
69b4737351
|
@ -202,6 +202,8 @@ void ClientKnobs::initialize(Randomize randomize) {
|
|||
init( ENABLE_REPLICA_CONSISTENCY_CHECK_ON_BACKUP_READS, true );
|
||||
init( CONSISTENCY_CHECK_REQUIRED_REPLICAS, -2 ); // Do consistency check based on all available storage replicas
|
||||
init( BULKLOAD_JOB_HISTORY_COUNT_MAX, 10 ); if (randomize && BUGGIFY) BULKLOAD_JOB_HISTORY_COUNT_MAX = deterministicRandom()->randomInt(1, 10);
|
||||
init( BULKLOAD_VERBOSE_LEVEL, 10 );
|
||||
init( S3CLIENT_VERBOSE_LEVEL, 10 );
|
||||
|
||||
// Configuration
|
||||
init( DEFAULT_AUTO_COMMIT_PROXIES, 3 );
|
||||
|
|
|
@ -2822,7 +2822,9 @@ ACTOR Future<int> setBulkLoadMode(Database cx, int mode) {
|
|||
tr.set(moveKeysLockWriteKey, wrLastWrite.toValue());
|
||||
tr.set(bulkLoadModeKey, wr.toValue());
|
||||
wait(tr.commit());
|
||||
TraceEvent("DDBulkLoadEngineModeKeyChanged").detail("NewMode", mode).detail("OldMode", oldMode);
|
||||
TraceEvent(bulkLoadVerboseEventSev(), "DDBulkLoadEngineModeKeyChanged")
|
||||
.detail("NewMode", mode)
|
||||
.detail("OldMode", oldMode);
|
||||
}
|
||||
return oldMode;
|
||||
} catch (Error& e) {
|
||||
|
@ -2928,7 +2930,7 @@ ACTOR Future<BulkLoadTaskState> getBulkLoadTask(Transaction* tr,
|
|||
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
|
||||
RangeResult result = wait(krmGetRanges(tr, bulkLoadTaskPrefix, range));
|
||||
if (result.size() > 2) {
|
||||
TraceEvent(SevInfo, "GetBulkLoadTaskError")
|
||||
TraceEvent(SevWarn, "GetBulkLoadTaskError")
|
||||
.detail("Reason", "TooManyRanges")
|
||||
.detail("Range", printable(range))
|
||||
.detail("Size", result.size())
|
||||
|
@ -2936,7 +2938,7 @@ ACTOR Future<BulkLoadTaskState> getBulkLoadTask(Transaction* tr,
|
|||
.backtrace();
|
||||
throw bulkload_task_outdated();
|
||||
} else if (result[0].value.empty()) {
|
||||
TraceEvent(SevInfo, "GetBulkLoadTaskError")
|
||||
TraceEvent(SevWarn, "GetBulkLoadTaskError")
|
||||
.detail("Reason", "EmptyValue")
|
||||
.detail("Range", printable(range))
|
||||
.detail("TaskId", taskId.toString())
|
||||
|
@ -2946,7 +2948,7 @@ ACTOR Future<BulkLoadTaskState> getBulkLoadTask(Transaction* tr,
|
|||
ASSERT(result.size() == 2);
|
||||
bulkLoadTaskState = decodeBulkLoadTaskState(result[0].value);
|
||||
if (!bulkLoadTaskState.isValid()) {
|
||||
TraceEvent(SevInfo, "GetBulkLoadTaskError")
|
||||
TraceEvent(SevWarn, "GetBulkLoadTaskError")
|
||||
.detail("Reason", "HasBeenCleared")
|
||||
.detail("Range", printable(range))
|
||||
.detail("TaskId", taskId.toString())
|
||||
|
@ -2956,7 +2958,7 @@ ACTOR Future<BulkLoadTaskState> getBulkLoadTask(Transaction* tr,
|
|||
ASSERT(bulkLoadTaskState.getTaskId().isValid());
|
||||
if (taskId != bulkLoadTaskState.getTaskId()) {
|
||||
// This task is overwritten by a newer task
|
||||
TraceEvent(SevInfo, "GetBulkLoadTaskError")
|
||||
TraceEvent(SevWarn, "GetBulkLoadTaskError")
|
||||
.detail("Reason", "TaskIdMismatch")
|
||||
.detail("Range", printable(range))
|
||||
.detail("TaskId", taskId.toString())
|
||||
|
@ -2968,7 +2970,7 @@ ACTOR Future<BulkLoadTaskState> getBulkLoadTask(Transaction* tr,
|
|||
if (bulkLoadTaskState.getRange() != currentRange) {
|
||||
// This task is partially overwritten by a newer task
|
||||
ASSERT(bulkLoadTaskState.getRange().contains(currentRange));
|
||||
TraceEvent(SevInfo, "GetBulkLoadTaskError")
|
||||
TraceEvent(SevWarn, "GetBulkLoadTaskError")
|
||||
.detail("Reason", "RangeMismatch")
|
||||
.detail("Range", printable(range))
|
||||
.detail("TaskId", taskId.toString())
|
||||
|
@ -2978,7 +2980,7 @@ ACTOR Future<BulkLoadTaskState> getBulkLoadTask(Transaction* tr,
|
|||
throw bulkload_task_outdated();
|
||||
}
|
||||
if (phases.size() > 0 && !bulkLoadTaskState.onAnyPhase(phases)) {
|
||||
TraceEvent(SevInfo, "GetBulkLoadTaskError")
|
||||
TraceEvent(SevWarn, "GetBulkLoadTaskError")
|
||||
.detail("Reason", "PhaseMismatch")
|
||||
.detail("Range", printable(range))
|
||||
.detail("TaskId", taskId.toString())
|
||||
|
@ -3208,7 +3210,7 @@ ACTOR Future<Void> submitBulkLoadJob(Database cx, BulkLoadJobState jobState) {
|
|||
// There is at most one bulkLoad job or bulkDump job at a time globally
|
||||
Optional<BulkDumpState> aliveBulkDumpJob = wait(getSubmittedBulkDumpJob(&tr));
|
||||
if (aliveBulkDumpJob.present()) {
|
||||
TraceEvent(SevInfo, "SubmitBulkLoadJobFailed")
|
||||
TraceEvent(SevWarn, "SubmitBulkLoadJobFailed")
|
||||
.setMaxEventLength(-1)
|
||||
.setMaxFieldLength(-1)
|
||||
.detail("Reason", "Conflict to a running BulkDump job")
|
||||
|
@ -3221,7 +3223,7 @@ ACTOR Future<Void> submitBulkLoadJob(Database cx, BulkLoadJobState jobState) {
|
|||
if (aliveJob.get().getJobId() == jobState.getJobId()) {
|
||||
return Void(); // The job has been submitted.
|
||||
}
|
||||
TraceEvent(SevInfo, "SubmitBulkLoadJobFailed")
|
||||
TraceEvent(SevWarn, "SubmitBulkLoadJobFailed")
|
||||
.setMaxEventLength(-1)
|
||||
.setMaxFieldLength(-1)
|
||||
.detail("Reason", "Conflict to a running BulkLoad job")
|
||||
|
@ -3368,7 +3370,7 @@ ACTOR Future<int> setBulkDumpMode(Database cx, int mode) {
|
|||
tr.set(moveKeysLockWriteKey, wrLastWrite.toValue());
|
||||
tr.set(bulkDumpModeKey, wr.toValue());
|
||||
wait(tr.commit());
|
||||
TraceEvent("DDBulkDumpModeKeyChanged").detail("NewMode", mode).detail("OldMode", oldMode);
|
||||
TraceEvent(SevInfo, "DDBulkDumpModeKeyChanged").detail("NewMode", mode).detail("OldMode", oldMode);
|
||||
}
|
||||
return oldMode;
|
||||
} catch (Error& e) {
|
||||
|
|
|
@ -197,7 +197,7 @@ ACTOR static Future<Void> copyUpFile(Reference<S3BlobStoreEndpoint> endpoint,
|
|||
state int64_t size = fileSize(filepath);
|
||||
|
||||
try {
|
||||
TraceEvent(SevInfo, "S3ClientCopyUpFileStart")
|
||||
TraceEvent(s3VerboseEventSev(), "S3ClientCopyUpFileStart")
|
||||
.detail("Filepath", filepath)
|
||||
.detail("Bucket", bucket)
|
||||
.detail("ObjectName", objectName)
|
||||
|
@ -283,7 +283,7 @@ ACTOR static Future<Void> copyUpFile(Reference<S3BlobStoreEndpoint> endpoint,
|
|||
tags[S3_CHECKSUM_TAG_NAME] = checksum;
|
||||
wait(endpoint->putObjectTags(bucket, objectName, tags));
|
||||
|
||||
TraceEvent(SevInfo, "S3ClientCopyUpFileEnd")
|
||||
TraceEvent(s3PerfEventSev(), "S3ClientCopyUpFileEnd")
|
||||
.detail("Bucket", bucket)
|
||||
.detail("ObjectName", objectName)
|
||||
.detail("FileSize", size)
|
||||
|
@ -331,7 +331,7 @@ ACTOR Future<Void> copyUpDirectory(std::string dirpath, std::string s3url) {
|
|||
state std::string bucket = parameters["bucket"];
|
||||
state std::vector<std::string> files;
|
||||
platform::findFilesRecursively(dirpath, files);
|
||||
TraceEvent("S3ClientUploadDirStart")
|
||||
TraceEvent(s3VerboseEventSev(), "S3ClientUploadDirStart")
|
||||
.detail("Filecount", files.size())
|
||||
.detail("Bucket", bucket)
|
||||
.detail("Resource", resource);
|
||||
|
@ -340,7 +340,7 @@ ACTOR Future<Void> copyUpDirectory(std::string dirpath, std::string s3url) {
|
|||
std::string s3path = resource + "/" + file.substr(dirpath.size() + 1);
|
||||
wait(copyUpFile(endpoint, bucket, s3path, filepath));
|
||||
}
|
||||
TraceEvent("S3ClientUploadDirEnd").detail("Bucket", bucket).detail("Resource", resource);
|
||||
TraceEvent(s3VerboseEventSev(), "S3ClientUploadDirEnd").detail("Bucket", bucket).detail("Resource", resource);
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
@ -351,7 +351,7 @@ ACTOR Future<Void> copyUpBulkDumpFileSet(std::string s3url,
|
|||
S3BlobStoreEndpoint::ParametersT parameters;
|
||||
state Reference<S3BlobStoreEndpoint> endpoint = getEndpoint(s3url, resource, parameters);
|
||||
state std::string bucket = parameters["bucket"];
|
||||
TraceEvent("S3ClientCopyUpBulkDumpFileSetStart")
|
||||
TraceEvent(s3VerboseEventSev(), "S3ClientCopyUpBulkDumpFileSetStart")
|
||||
.detail("Bucket", bucket)
|
||||
.detail("SourceFileSet", sourceFileSet.toString())
|
||||
.detail("DestinationFileSet", destinationFileSet.toString());
|
||||
|
@ -375,7 +375,7 @@ ACTOR Future<Void> copyUpBulkDumpFileSet(std::string s3url,
|
|||
auto destinationByteSamplePath = joinPath(batch_dir, destinationFileSet.getByteSampleFileName());
|
||||
wait(copyUpFile(endpoint, bucket, destinationByteSamplePath, sourceFileSet.getBytesSampleFileFullPath()));
|
||||
}
|
||||
TraceEvent("S3ClientCopyUpBulkDumpFileSetEnd")
|
||||
TraceEvent(s3VerboseEventSev(), "S3ClientCopyUpBulkDumpFileSetEnd")
|
||||
.detail("BatchDir", batch_dir)
|
||||
.detail("NumDeleted", pNumDeleted)
|
||||
.detail("BytesDeleted", pBytesDeleted);
|
||||
|
@ -469,7 +469,7 @@ ACTOR static Future<Void> copyDownFile(Reference<S3BlobStoreEndpoint> endpoint,
|
|||
state std::string expectedChecksum;
|
||||
|
||||
try {
|
||||
TraceEvent(SevInfo, "S3ClientCopyDownFileStart")
|
||||
TraceEvent(s3VerboseEventSev(), "S3ClientCopyDownFileStart")
|
||||
.detail("Bucket", bucket)
|
||||
.detail("Object", objectName)
|
||||
.detail("FilePath", filepath);
|
||||
|
@ -549,7 +549,7 @@ ACTOR static Future<Void> copyDownFile(Reference<S3BlobStoreEndpoint> endpoint,
|
|||
// Close file properly
|
||||
file = Reference<IAsyncFile>();
|
||||
|
||||
TraceEvent(SevInfo, "S3ClientCopyDownFileEnd")
|
||||
TraceEvent(s3VerboseEventSev(), "S3ClientCopyDownFileEnd")
|
||||
.detail("Bucket", bucket)
|
||||
.detail("Object", objectName)
|
||||
.detail("FileSize", fileSize)
|
||||
|
@ -597,7 +597,7 @@ ACTOR Future<Void> copyDownDirectory(std::string s3url, std::string dirpath) {
|
|||
state std::string bucket = parameters["bucket"];
|
||||
S3BlobStoreEndpoint::ListResult items = wait(endpoint->listObjects(bucket, resource));
|
||||
state std::vector<S3BlobStoreEndpoint::ObjectInfo> objects = items.objects;
|
||||
TraceEvent("S3ClientDownDirectoryStart")
|
||||
TraceEvent(s3VerboseEventSev(), "S3ClientDownDirectoryStart")
|
||||
.detail("Filecount", objects.size())
|
||||
.detail("Bucket", bucket)
|
||||
.detail("Resource", resource);
|
||||
|
@ -606,7 +606,7 @@ ACTOR Future<Void> copyDownDirectory(std::string s3url, std::string dirpath) {
|
|||
std::string s3path = object.name;
|
||||
wait(copyDownFile(endpoint, bucket, s3path, filepath));
|
||||
}
|
||||
TraceEvent("S3ClientDownDirectoryEnd").detail("Bucket", bucket).detail("Resource", resource);
|
||||
TraceEvent(s3VerboseEventSev(), "S3ClientDownDirectoryEnd").detail("Bucket", bucket).detail("Resource", resource);
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
|
|
@ -29,8 +29,19 @@
|
|||
#pragma once
|
||||
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
#include "fdbclient/Knobs.h"
|
||||
#include "fdbrpc/fdbrpc.h"
|
||||
|
||||
// For all trace events for bulkload/dump operations
|
||||
inline Severity bulkLoadVerboseEventSev() {
|
||||
return !g_network->isSimulated() && CLIENT_KNOBS->BULKLOAD_VERBOSE_LEVEL >= 10 ? SevInfo : SevDebug;
|
||||
}
|
||||
|
||||
// For all trace events measuring the performance of bulkload/dump operations
|
||||
inline Severity bulkLoadPerfEventSev() {
|
||||
return !g_network->isSimulated() && CLIENT_KNOBS->BULKLOAD_VERBOSE_LEVEL >= 5 ? SevInfo : SevDebug;
|
||||
}
|
||||
|
||||
const std::string bulkLoadJobManifestLineTerminator = "\n";
|
||||
|
||||
// Remove the input prefix from the input str. Throw bulkload_manifest_decode_error if no prefix found in str.
|
||||
|
@ -269,7 +280,7 @@ public:
|
|||
.detail("FileSet", toString());
|
||||
throw bulkload_fileset_invalid_filepath();
|
||||
} else {
|
||||
TraceEvent(SevInfo, "BulkLoadFileSetProvideDataFile")
|
||||
TraceEvent(bulkLoadVerboseEventSev(), "BulkLoadFileSetProvideDataFile")
|
||||
.detail("DataFile", dataFileName)
|
||||
.detail("Relative", getRelativePath())
|
||||
.detail("Folder", getFolder());
|
||||
|
|
|
@ -204,6 +204,9 @@ public:
|
|||
int BULKLOAD_JOB_HISTORY_COUNT_MAX; // the max number of bulk load job history to keep. The oldest job history will
|
||||
// be removed when the count exceeds this value. Set to 0 to disable history.
|
||||
// Do not set the value to a large number, e.g. <= 10.
|
||||
int BULKLOAD_VERBOSE_LEVEL; // Set to 1 to minimize the verbosity. Set to 5 to turn on all events for performance
|
||||
// insights. Set to 10 to turn on all events.
|
||||
int S3CLIENT_VERBOSE_LEVEL; // Similar to BULKLOAD_VERBOSE_LEVEL
|
||||
|
||||
// Configuration
|
||||
int32_t DEFAULT_AUTO_COMMIT_PROXIES;
|
||||
|
|
|
@ -45,6 +45,16 @@
|
|||
// TODO: Handle prefix as a parameter on the URL so can strip the first part
|
||||
// of the resource from the blobstore URL.
|
||||
|
||||
// For all trace events for s3 client operations
|
||||
inline Severity s3VerboseEventSev() {
|
||||
return !g_network->isSimulated() && CLIENT_KNOBS->S3CLIENT_VERBOSE_LEVEL >= 10 ? SevInfo : SevDebug;
|
||||
}
|
||||
|
||||
// For all trace events measuring the performance of s3 client operations
|
||||
inline Severity s3PerfEventSev() {
|
||||
return !g_network->isSimulated() && CLIENT_KNOBS->S3CLIENT_VERBOSE_LEVEL >= 5 ? SevInfo : SevDebug;
|
||||
}
|
||||
|
||||
const std::string BLOBSTORE_PREFIX = "blobstore://";
|
||||
|
||||
// Copy the directory content from the local filesystem up to s3.
|
||||
|
|
|
@ -209,7 +209,7 @@ ACTOR Future<bool> doBytesSamplingOnDataFile(std::string dataFileFullPath, // in
|
|||
retryCount++;
|
||||
}
|
||||
}
|
||||
TraceEvent(SevInfo, "SSBulkLoadTaskSamplingComplete", logId)
|
||||
TraceEvent(bulkLoadVerboseEventSev(), "SSBulkLoadTaskSamplingComplete", logId)
|
||||
.detail("DataFileFullPath", dataFileFullPath)
|
||||
.detail("ByteSampleFileFullPath", byteSampleFileFullPath)
|
||||
.detail("Duration", now() - startTime)
|
||||
|
@ -311,7 +311,7 @@ ACTOR Future<BulkLoadFileSet> bulkLoadDownloadTaskFileSet(BulkLoadTransportMetho
|
|||
} else {
|
||||
UNREACHABLE();
|
||||
}
|
||||
TraceEvent(SevInfo, "SSBulkLoadTaskDownloadFileSet", logId)
|
||||
TraceEvent(bulkLoadVerboseEventSev(), "SSBulkLoadTaskDownloadFileSet", logId)
|
||||
.setMaxEventLength(-1)
|
||||
.setMaxFieldLength(-1)
|
||||
.detail("FromRemoteFileSet", fromRemoteFileSet.toString())
|
||||
|
@ -360,7 +360,7 @@ ACTOR Future<Void> downloadManifestFile(BulkLoadTransportMethod transportMethod,
|
|||
if (!fileExists(abspath(toLocalPath))) {
|
||||
throw retry();
|
||||
}
|
||||
TraceEvent(SevInfo, "BulkLoadDownloadManifestFile", logId)
|
||||
TraceEvent(bulkLoadVerboseEventSev(), "BulkLoadDownloadManifestFile", logId)
|
||||
.detail("FromRemotePath", fromRemotePath)
|
||||
.detail("ToLocalPath", toLocalPath)
|
||||
.detail("Duration", now() - startTime)
|
||||
|
|
|
@ -25,6 +25,7 @@
|
|||
|
||||
#include "fdbserver/DataDistributionTeam.h"
|
||||
#include "flow/ActorCollection.h"
|
||||
#include "flow/Buggify.h"
|
||||
#include "flow/FastRef.h"
|
||||
#include "flow/Trace.h"
|
||||
#include "fdbrpc/sim_validation.h"
|
||||
|
@ -1052,7 +1053,7 @@ void DDQueue::launchQueuedWork(std::set<RelocateData, std::greater<RelocateData>
|
|||
// for unhealthy. Make the bulk load task visible on the global task map
|
||||
bool doBulkLoading = runPendingBulkLoadTaskWithRelocateData(this, rd);
|
||||
if (doBulkLoading) {
|
||||
TraceEvent(SevInfo, "DDBulkLoadTaskLaunchingDataMove", this->distributorId)
|
||||
TraceEvent(bulkLoadVerboseEventSev(), "DDBulkLoadTaskLaunchingDataMove", this->distributorId)
|
||||
.setMaxEventLength(-1)
|
||||
.setMaxFieldLength(-1)
|
||||
.detail("NewDataMoveId", rd.dataMoveId)
|
||||
|
@ -1522,7 +1523,7 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueue* self,
|
|||
rd.bulkLoadTask.get().coreState.getRange(),
|
||||
rd.bulkLoadTask.get().coreState.getTaskId(),
|
||||
{ BulkLoadPhase::Triggered, BulkLoadPhase::Running }));
|
||||
TraceEvent(SevInfo, "DDBulkLoadTaskDataMoveLaunched", self->distributorId)
|
||||
TraceEvent(bulkLoadVerboseEventSev(), "DDBulkLoadTaskDataMoveLaunched", self->distributorId)
|
||||
.setMaxEventLength(-1)
|
||||
.setMaxFieldLength(-1)
|
||||
.detail("BulkLoadTask", rd.bulkLoadTask.get().coreState.toString())
|
||||
|
@ -1555,7 +1556,7 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueue* self,
|
|||
DataMoveType dataMoveType = newDataMoveType(doBulkLoading);
|
||||
rd.dataMoveId = newDataMoveId(
|
||||
deterministicRandom()->randomUInt64(), AssignEmptyRange::False, dataMoveType, rd.dmReason);
|
||||
TraceEvent(SevInfo, "DDBulkLoadTaskNewDataMoveID", self->distributorId)
|
||||
TraceEvent(bulkLoadVerboseEventSev(), "DDBulkLoadTaskNewDataMoveID", self->distributorId)
|
||||
.detail("DataMoveID", rd.dataMoveId.toString())
|
||||
.detail("TrackID", rd.randomId)
|
||||
.detail("Range", rd.keys)
|
||||
|
@ -1652,7 +1653,7 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueue* self,
|
|||
anyHealthy = true;
|
||||
bestTeams.emplace_back(bestTeam.first.get(), bestTeam.second);
|
||||
if (doBulkLoading) {
|
||||
TraceEvent(SevInfo, "DDBulkLoadTaskSelectDestTeam", self->distributorId)
|
||||
TraceEvent(bulkLoadVerboseEventSev(), "DDBulkLoadTaskSelectDestTeam", self->distributorId)
|
||||
.setMaxEventLength(-1)
|
||||
.setMaxFieldLength(-1)
|
||||
.detail("Context", "Restore")
|
||||
|
@ -1730,7 +1731,9 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueue* self,
|
|||
bestTeamReady = fbestTeam.isReady();
|
||||
std::pair<Optional<Reference<IDataDistributionTeam>>, bool> bestTeam = wait(fbestTeam);
|
||||
if (doBulkLoading) {
|
||||
TraceEvent(SevInfo, "DDBulkLoadTaskRelocatorBestTeamReceived", self->distributorId)
|
||||
TraceEvent(bulkLoadVerboseEventSev(),
|
||||
"DDBulkLoadTaskRelocatorBestTeamReceived",
|
||||
self->distributorId)
|
||||
.detail("DataMoveID", rd.dataMoveId)
|
||||
.detail("BulkLoadTask", rd.bulkLoadTask.get().toString())
|
||||
.detail("BestTeamReady", bestTeamReady);
|
||||
|
@ -1832,7 +1835,8 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueue* self,
|
|||
} else {
|
||||
bestTeams.emplace_back(bestTeam.first.get(), bestTeam.second);
|
||||
if (doBulkLoading) {
|
||||
TraceEvent(SevInfo, "DDBulkLoadTaskSelectDestTeam", self->distributorId)
|
||||
TraceEvent(
|
||||
bulkLoadVerboseEventSev(), "DDBulkLoadTaskSelectDestTeam", self->distributorId)
|
||||
.detail("Context", "New")
|
||||
.detail("SrcIds", describe(rd.src))
|
||||
.detail("DestIds", bestTeam.first.get()->getServerIDs())
|
||||
|
@ -2264,7 +2268,8 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueue* self,
|
|||
if (doBulkLoading) {
|
||||
try {
|
||||
self->bulkLoadTaskCollection->terminateTask(rd.bulkLoadTask.get().coreState);
|
||||
TraceEvent(SevInfo, "DDBulkLoadTaskRelocatorComplete", self->distributorId)
|
||||
TraceEvent(
|
||||
bulkLoadVerboseEventSev(), "DDBulkLoadTaskRelocatorComplete", self->distributorId)
|
||||
.setMaxEventLength(-1)
|
||||
.setMaxFieldLength(-1)
|
||||
.detail("Dests", describe(destIds))
|
||||
|
@ -2274,7 +2279,9 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueue* self,
|
|||
if (bulkLoadError.code() != error_code_bulkload_task_outdated) {
|
||||
throw bulkLoadError;
|
||||
}
|
||||
TraceEvent(SevInfo, "DDBulkLoadTaskRelocatorCompleteButOutdated", self->distributorId)
|
||||
TraceEvent(bulkLoadVerboseEventSev(),
|
||||
"DDBulkLoadTaskRelocatorCompleteButOutdated",
|
||||
self->distributorId)
|
||||
.setMaxEventLength(-1)
|
||||
.setMaxFieldLength(-1)
|
||||
.detail("Dests", describe(destIds))
|
||||
|
@ -2284,7 +2291,7 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueue* self,
|
|||
return Void();
|
||||
} else {
|
||||
if (doBulkLoading) {
|
||||
TraceEvent(SevInfo, "DDBulkLoadTaskRelocatorError")
|
||||
TraceEvent(bulkLoadVerboseEventSev(), "DDBulkLoadTaskRelocatorError")
|
||||
.setMaxEventLength(-1)
|
||||
.setMaxFieldLength(-1)
|
||||
.errorUnsuppressed(error)
|
||||
|
|
|
@ -993,8 +993,9 @@ void createShardToBulkLoad(DataDistributionTracker* self,
|
|||
KeyRange keys = bulkLoadTaskState.getRange();
|
||||
ASSERT(!keys.empty());
|
||||
bool issueDataMoveForCancel = cancelledDataMovePriority.present();
|
||||
TraceEvent e(
|
||||
issueDataMoveForCancel ? SevWarnAlways : SevInfo, "DDBulkLoadEngineCreateShardToBulkLoad", self->distributorId);
|
||||
TraceEvent e(issueDataMoveForCancel ? SevWarnAlways : bulkLoadVerboseEventSev(),
|
||||
"DDBulkLoadEngineCreateShardToBulkLoad",
|
||||
self->distributorId);
|
||||
e.detail("TaskId", bulkLoadTaskState.getTaskId());
|
||||
e.detail("BulkLoadRange", keys);
|
||||
// Create shards at the two ends and do not data move for those shards
|
||||
|
|
|
@ -199,7 +199,7 @@ public:
|
|||
// Random selection for load balance
|
||||
ACTOR static Future<Void> getTeamForBulkLoad(DDTeamCollection* self, GetTeamRequest req) {
|
||||
try {
|
||||
TraceEvent(SevInfo, "DDBulkLoadEngineTaskGetTeamReqReceived", self->distributorId)
|
||||
TraceEvent(bulkLoadVerboseEventSev(), "DDBulkLoadEngineTaskGetTeamReqReceived", self->distributorId)
|
||||
.detail("TCReady", self->readyToStart.isReady())
|
||||
.detail("TeamBuilderValid", self->teamBuilder.isValid())
|
||||
.detail("TeamBuilderReady", self->teamBuilder.isValid() ? self->teamBuilder.isReady() : false)
|
||||
|
@ -208,7 +208,7 @@ public:
|
|||
.detail("TeamSize", self->teams.size());
|
||||
wait(self->checkBuildTeams());
|
||||
|
||||
TraceEvent(SevInfo, "DDBulkLoadEngineTaskGetTeamCheckBuildTeamDone", self->distributorId)
|
||||
TraceEvent(bulkLoadVerboseEventSev(), "DDBulkLoadEngineTaskGetTeamCheckBuildTeamDone", self->distributorId)
|
||||
.detail("TCReady", self->readyToStart.isReady())
|
||||
.detail("TeamBuilderValid", self->teamBuilder.isValid())
|
||||
.detail("TeamBuilderReady", self->teamBuilder.isValid() ? self->teamBuilder.isReady() : false)
|
||||
|
@ -275,7 +275,7 @@ public:
|
|||
Optional<Reference<IDataDistributionTeam>> res;
|
||||
if (candidateTeams.size() >= 1) {
|
||||
res = deterministicRandom()->randomChoice(candidateTeams);
|
||||
TraceEvent(SevInfo, "DDBulkLoadEngineTaskGetTeamReply", self->distributorId)
|
||||
TraceEvent(bulkLoadVerboseEventSev(), "DDBulkLoadEngineTaskGetTeamReply", self->distributorId)
|
||||
.detail("TCReady", self->readyToStart.isReady())
|
||||
.detail("SrcIds", describe(req.src))
|
||||
.detail("Primary", self->isPrimary())
|
||||
|
|
|
@ -1123,7 +1123,7 @@ ACTOR Future<std::pair<BulkLoadTaskState, Version>> triggerBulkLoadTask(Referenc
|
|||
bulkLoadTaskStateValue(newBulkLoadTaskState)));
|
||||
wait(tr.commit());
|
||||
Version commitVersion = tr.getCommittedVersion();
|
||||
TraceEvent(SevInfo, "DDBulkLoadTaskPersistTriggerState", self->ddId)
|
||||
TraceEvent(bulkLoadVerboseEventSev(), "DDBulkLoadTaskPersistTriggerState", self->ddId)
|
||||
.setMaxEventLength(-1)
|
||||
.setMaxFieldLength(-1)
|
||||
.detail("CommitVersion", commitVersion)
|
||||
|
@ -1133,7 +1133,7 @@ ACTOR Future<std::pair<BulkLoadTaskState, Version>> triggerBulkLoadTask(Referenc
|
|||
|
||||
} catch (Error& e) {
|
||||
if (e.code() != error_code_actor_cancelled) {
|
||||
TraceEvent(SevInfo, "DDBulkLoadTaskPersistTriggerStateError", self->ddId)
|
||||
TraceEvent(SevWarn, "DDBulkLoadTaskPersistTriggerStateError", self->ddId)
|
||||
.setMaxEventLength(-1)
|
||||
.setMaxFieldLength(-1)
|
||||
.errorUnsuppressed(e)
|
||||
|
@ -1167,7 +1167,7 @@ ACTOR Future<Void> failBulkLoadTask(Reference<DataDistributor> self,
|
|||
&tr, bulkLoadTaskPrefix, bulkLoadTaskState.getRange(), bulkLoadTaskStateValue(bulkLoadTaskState)));
|
||||
wait(tr.commit());
|
||||
Version commitVersion = tr.getCommittedVersion();
|
||||
TraceEvent(SevInfo, "DDBulkLoadTaskPersistErrorState", self->ddId)
|
||||
TraceEvent(bulkLoadVerboseEventSev(), "DDBulkLoadTaskPersistErrorState", self->ddId)
|
||||
.setMaxEventLength(-1)
|
||||
.setMaxFieldLength(-1)
|
||||
.detail("CommitVersion", commitVersion)
|
||||
|
@ -1175,7 +1175,7 @@ ACTOR Future<Void> failBulkLoadTask(Reference<DataDistributor> self,
|
|||
break;
|
||||
} catch (Error& e) {
|
||||
if (e.code() != error_code_actor_cancelled) {
|
||||
TraceEvent(SevInfo, "DDBulkLoadTaskPersistErrorStateError", self->ddId)
|
||||
TraceEvent(SevWarn, "DDBulkLoadTaskPersistErrorStateError", self->ddId)
|
||||
.setMaxEventLength(-1)
|
||||
.setMaxFieldLength(-1)
|
||||
.errorUnsuppressed(e)
|
||||
|
@ -1202,7 +1202,7 @@ ACTOR Future<Void> doBulkLoadTask(Reference<DataDistributor> self, KeyRange rang
|
|||
std::pair<BulkLoadTaskState, Version> triggeredBulkLoadTask_ = wait(triggerBulkLoadTask(self, range, taskId));
|
||||
triggeredBulkLoadTask = triggeredBulkLoadTask_.first;
|
||||
commitVersion = triggeredBulkLoadTask_.second;
|
||||
TraceEvent(SevInfo, "DDBulkLoadTaskDoTask", self->ddId)
|
||||
TraceEvent(bulkLoadVerboseEventSev(), "DDBulkLoadTaskDoTask", self->ddId)
|
||||
.setMaxEventLength(-1)
|
||||
.setMaxFieldLength(-1)
|
||||
.detail("Phase", "Triggered")
|
||||
|
@ -1256,7 +1256,7 @@ ACTOR Future<Void> doBulkLoadTask(Reference<DataDistributor> self, KeyRange rang
|
|||
// sliently exits
|
||||
}
|
||||
} else {
|
||||
TraceEvent(SevInfo, "DDBulkLoadTaskDoTask", self->ddId)
|
||||
TraceEvent(bulkLoadPerfEventSev(), "DDBulkLoadTaskDoTask", self->ddId)
|
||||
.detail("Phase", "Task complete")
|
||||
.detail("Range", range)
|
||||
.detail("TaskID", taskId.toString())
|
||||
|
@ -1294,7 +1294,7 @@ ACTOR Future<Void> eraseBulkLoadTask(Reference<DataDistributor> self, KeyRange t
|
|||
&tr, bulkLoadTaskPrefix, taskRange, normalKeys, bulkLoadTaskStateValue(BulkLoadTaskState())));
|
||||
wait(tr.commit());
|
||||
Version commitVersion = tr.getCommittedVersion();
|
||||
TraceEvent(SevInfo, "DDBulkLoadTaskEraseState", self->ddId)
|
||||
TraceEvent(bulkLoadVerboseEventSev(), "DDBulkLoadTaskEraseState", self->ddId)
|
||||
.detail("CommitVersion", commitVersion)
|
||||
.detail("TaskRange", taskRange)
|
||||
.detail("TaskId", taskId.toString());
|
||||
|
@ -1305,7 +1305,7 @@ ACTOR Future<Void> eraseBulkLoadTask(Reference<DataDistributor> self, KeyRange t
|
|||
// When cancelledDataMovePriority is set, we want to issue a data move. For details, see comments at
|
||||
// cancelledDataMovePriority of BulkLoadTaskState.
|
||||
self->triggerShardBulkLoading.send(BulkLoadShardRequest(bulkLoadTask, cancelledDataMovePriority.get()));
|
||||
TraceEvent(SevInfo, "DDBulkLoadTaskTriggerShardDatamove", self->ddId)
|
||||
TraceEvent(bulkLoadVerboseEventSev(), "DDBulkLoadTaskTriggerShardDatamove", self->ddId)
|
||||
.detail("CommitVersion", commitVersion)
|
||||
.detail("TaskRange", taskRange)
|
||||
.detail("TaskId", taskId.toString());
|
||||
|
@ -1313,7 +1313,7 @@ ACTOR Future<Void> eraseBulkLoadTask(Reference<DataDistributor> self, KeyRange t
|
|||
break;
|
||||
} catch (Error& e) {
|
||||
if (e.code() != error_code_actor_cancelled) {
|
||||
TraceEvent(SevInfo, "DDBulkLoadTaskEraseStateError", self->ddId)
|
||||
TraceEvent(SevWarn, "DDBulkLoadTaskEraseStateError", self->ddId)
|
||||
.errorUnsuppressed(e)
|
||||
.detail("TaskRange", taskRange)
|
||||
.detail("TaskId", taskId.toString());
|
||||
|
@ -1372,14 +1372,14 @@ ACTOR Future<Void> scheduleBulkLoadTasks(Reference<DataDistributor> self) {
|
|||
}
|
||||
wait(self->bulkLoadEngineParallelismLimitor.waitUntilCounterChanged());
|
||||
}
|
||||
TraceEvent(SevInfo, "DDBulkLoadTaskSchedule", self->ddId)
|
||||
TraceEvent(bulkLoadVerboseEventSev(), "DDBulkLoadTaskSchedule", self->ddId)
|
||||
.detail("Range", bulkLoadTaskState.getRange())
|
||||
.detail("TaskID", bulkLoadTaskState.getTaskId())
|
||||
.detail("Phase", bulkLoadTaskState.phase);
|
||||
bulkLoadActors.push_back(
|
||||
doBulkLoadTask(self, bulkLoadTaskState.getRange(), bulkLoadTaskState.getTaskId()));
|
||||
} else if (bulkLoadTaskState.phase == BulkLoadPhase::Acknowledged) {
|
||||
TraceEvent(SevInfo, "DDBulkLoadTaskClearMetadata", self->ddId)
|
||||
TraceEvent(bulkLoadVerboseEventSev(), "DDBulkLoadTaskClearMetadata", self->ddId)
|
||||
.detail("Range", bulkLoadTaskState.getRange())
|
||||
.detail("TaskID", bulkLoadTaskState.getTaskId());
|
||||
// We do one metadata erase at a time to aviod unnecessary transaction conflicts
|
||||
|
@ -1516,7 +1516,7 @@ ACTOR Future<BulkLoadTaskState> bulkLoadJobTriggerTask(Reference<DataDistributor
|
|||
// setBulkLoadSubmissionTransaction shuts down traffic to the range
|
||||
wait(tr.commit());
|
||||
Version commitVersion = tr.getCommittedVersion();
|
||||
TraceEvent(SevInfo, "DDBulkLoadJobExecutorSubmitTask", self->ddId)
|
||||
TraceEvent(bulkLoadVerboseEventSev(), "DDBulkLoadJobExecutorSubmitTask", self->ddId)
|
||||
.setMaxEventLength(-1)
|
||||
.setMaxFieldLength(-1)
|
||||
.detail("JobId", jobId.toString())
|
||||
|
@ -1604,7 +1604,7 @@ ACTOR Future<Void> bulkLoadJobExecuteTask(Reference<DataDistributor> self,
|
|||
wait(store(bulkLoadTask, bulkLoadJobTriggerTask(self, jobId, manifest)));
|
||||
} else {
|
||||
bulkLoadTask = bulkLoadTask_.get();
|
||||
TraceEvent(SevInfo, "DDBulkLoadJobExecutorTaskFound", self->ddId)
|
||||
TraceEvent(bulkLoadVerboseEventSev(), "DDBulkLoadJobExecutorTaskFound", self->ddId)
|
||||
.setMaxEventLength(-1)
|
||||
.setMaxFieldLength(-1)
|
||||
.detail("JobId", jobId.toString())
|
||||
|
@ -1619,7 +1619,7 @@ ACTOR Future<Void> bulkLoadJobExecuteTask(Reference<DataDistributor> self,
|
|||
|
||||
// Step 4: Monitor the bulkload completion
|
||||
wait(bulkLoadJobWaitUntilTaskCompleteOrError(self, jobId, bulkLoadTask));
|
||||
TraceEvent(SevInfo, "DDBulkLoadJobExecutorTaskEnd", self->ddId)
|
||||
TraceEvent(bulkLoadPerfEventSev(), "DDBulkLoadJobExecutorTaskEnd", self->ddId)
|
||||
.setMaxEventLength(-1)
|
||||
.setMaxFieldLength(-1)
|
||||
.detail("JobId", jobId.toString())
|
||||
|
@ -1630,7 +1630,7 @@ ACTOR Future<Void> bulkLoadJobExecuteTask(Reference<DataDistributor> self,
|
|||
if (e.code() == error_code_actor_cancelled) {
|
||||
throw e;
|
||||
}
|
||||
TraceEvent(SevInfo, "DDBulkLoadJobExecutorTaskError", self->ddId)
|
||||
TraceEvent(SevWarn, "DDBulkLoadJobExecutorTaskError", self->ddId)
|
||||
.setMaxEventLength(-1)
|
||||
.setMaxFieldLength(-1)
|
||||
.errorUnsuppressed(e)
|
||||
|
@ -2002,7 +2002,7 @@ ACTOR Future<Void> bulkLoadJobManager(Reference<DataDistributor> self) {
|
|||
state Database cx = self->txnProcessor->context();
|
||||
state Optional<BulkLoadJobState> job = wait(getRunningBulkLoadJob(cx));
|
||||
if (!job.present()) {
|
||||
TraceEvent(SevInfo, "DDBulkLoadJobManagerNoJobExist", self->ddId);
|
||||
TraceEvent(bulkLoadVerboseEventSev(), "DDBulkLoadJobManagerNoJobExist", self->ddId);
|
||||
self->bulkLoadJobManager.reset(); // set to empty
|
||||
return Void();
|
||||
}
|
||||
|
@ -2106,7 +2106,7 @@ ACTOR Future<Void> doBulkDumpTask(Reference<DataDistributor> self,
|
|||
state double beginTime = now();
|
||||
ASSERT(self->bulkDumpParallelismLimitor.canStart());
|
||||
self->bulkDumpParallelismLimitor.incrementTaskCounter();
|
||||
TraceEvent(SevInfo, "DDBulkDumpDoTaskStart", self->ddId)
|
||||
TraceEvent(bulkLoadVerboseEventSev(), "DDBulkDumpDoTaskStart", self->ddId)
|
||||
.setMaxEventLength(-1)
|
||||
.setMaxFieldLength(-1)
|
||||
.detail("TaskId", bulkDumpState.getTaskId())
|
||||
|
@ -2118,7 +2118,7 @@ ACTOR Future<Void> doBulkDumpTask(Reference<DataDistributor> self,
|
|||
if (vResult.isError()) {
|
||||
throw vResult.getError();
|
||||
}
|
||||
TraceEvent(SevInfo, "DDBulkDumpDoTaskComplete", self->ddId)
|
||||
TraceEvent(bulkLoadVerboseEventSev(), "DDBulkDumpDoTaskComplete", self->ddId)
|
||||
.setMaxEventLength(-1)
|
||||
.setMaxFieldLength(-1)
|
||||
.detail("TaskId", bulkDumpState.getTaskId())
|
||||
|
@ -2129,7 +2129,7 @@ ACTOR Future<Void> doBulkDumpTask(Reference<DataDistributor> self,
|
|||
if (e.code() == error_code_actor_cancelled) {
|
||||
throw e;
|
||||
}
|
||||
TraceEvent(SevInfo, "DDBulkDumpDoTaskError", self->ddId)
|
||||
TraceEvent(bulkLoadVerboseEventSev(), "DDBulkDumpDoTaskError", self->ddId)
|
||||
.setMaxEventLength(-1)
|
||||
.setMaxFieldLength(-1)
|
||||
.errorUnsuppressed(e)
|
||||
|
@ -2180,7 +2180,7 @@ ACTOR Future<Void> scheduleBulkDumpJob(Reference<DataDistributor> self) {
|
|||
KeyRangeRef(bulkDumpResult[bulkDumpResultIndex].key, bulkDumpResult[bulkDumpResultIndex + 1].key));
|
||||
bulkDumpState = decodeBulkDumpState(bulkDumpResult[bulkDumpResultIndex].value);
|
||||
if (!bulkDumpState.isValid() || bulkDumpState.getJobId() != jobId) {
|
||||
TraceEvent(SevInfo, "DDBulkDumpJobScheduleJobOutdated", self->ddId)
|
||||
TraceEvent(SevWarn, "DDBulkDumpJobScheduleJobOutdated", self->ddId)
|
||||
.detail("JobId", jobId)
|
||||
.detail("CurrentJob", bulkDumpState.getJobId());
|
||||
throw bulkdump_task_outdated();
|
||||
|
@ -2201,7 +2201,7 @@ ACTOR Future<Void> scheduleBulkDumpJob(Reference<DataDistributor> self) {
|
|||
ASSERT(bulkDumpState.getPhase() == BulkDumpPhase::Submitted);
|
||||
// Partition the job in the unit of shard
|
||||
wait(store(rangeLocations, self->txnProcessor->getSourceServerInterfacesForRange(bulkDumpRange)));
|
||||
TraceEvent(SevInfo, "DDBulkDumpJobScheduleJobPartition", self->ddId)
|
||||
TraceEvent(bulkLoadVerboseEventSev(), "DDBulkDumpJobScheduleJobPartition", self->ddId)
|
||||
.detail("JobId", jobId)
|
||||
.detail("NumShard", rangeLocations.size())
|
||||
.detail("RangeToRead", rangeToRead)
|
||||
|
@ -2223,7 +2223,7 @@ ACTOR Future<Void> scheduleBulkDumpJob(Reference<DataDistributor> self) {
|
|||
SSBulkDumpTask task = getSSBulkDumpTask(rangeLocations[rangeLocationIndex].servers,
|
||||
bulkDumpState.generateRangeTask(taskRange));
|
||||
// Issue task
|
||||
TraceEvent(SevInfo, "DDBulkDumpJobSpawnRange", self->ddId)
|
||||
TraceEvent(bulkLoadVerboseEventSev(), "DDBulkDumpJobSpawnRange", self->ddId)
|
||||
.setMaxEventLength(-1)
|
||||
.setMaxFieldLength(-1)
|
||||
.detail("JobId", jobId)
|
||||
|
@ -2460,7 +2460,7 @@ ACTOR Future<Void> bulkDumpCore(Reference<DataDistributor> self, Future<Void> re
|
|||
if (e.code() == error_code_actor_cancelled) {
|
||||
throw e;
|
||||
}
|
||||
TraceEvent(SevInfo, "DDBulkDumpCoreError", self->ddId).errorUnsuppressed(e);
|
||||
TraceEvent(SevWarn, "DDBulkDumpCoreError", self->ddId).errorUnsuppressed(e);
|
||||
if (e.code() == error_code_movekeys_conflict) {
|
||||
throw e;
|
||||
}
|
||||
|
|
|
@ -1895,7 +1895,8 @@ ACTOR static Future<Void> startMoveShards(Database occ,
|
|||
bulkLoadTaskPrefix,
|
||||
newBulkLoadTaskState.getRange(),
|
||||
bulkLoadTaskStateValue(newBulkLoadTaskState)));
|
||||
TraceEvent(SevInfo, "DDBulkLoadTaskSetRunningStateTransaction", relocationIntervalId)
|
||||
TraceEvent(
|
||||
bulkLoadVerboseEventSev(), "DDBulkLoadTaskSetRunningStateTransaction", relocationIntervalId)
|
||||
.setMaxEventLength(-1)
|
||||
.setMaxFieldLength(-1)
|
||||
.detail("DataMoveID", dataMoveId)
|
||||
|
@ -1925,7 +1926,7 @@ ACTOR static Future<Void> startMoveShards(Database occ,
|
|||
|
||||
if (currentKeys.end == keys.end && bulkLoadTaskState.present()) {
|
||||
Version commitVersion = tr.getCommittedVersion();
|
||||
TraceEvent(SevInfo, "DDBulkLoadTaskPersistRunningState", relocationIntervalId)
|
||||
TraceEvent(bulkLoadVerboseEventSev(), "DDBulkLoadTaskPersistRunningState", relocationIntervalId)
|
||||
.setMaxEventLength(-1)
|
||||
.setMaxFieldLength(-1)
|
||||
.detail("DataMoveID", dataMoveId)
|
||||
|
@ -2346,7 +2347,8 @@ ACTOR static Future<Void> finishMoveShards(Database occ,
|
|||
bulkLoadTaskPrefix,
|
||||
newBulkLoadTaskState.getRange(),
|
||||
bulkLoadTaskStateValue(newBulkLoadTaskState)));
|
||||
TraceEvent(SevInfo, "DDBulkLoadTaskSetCompleteTransaction", relocationIntervalId)
|
||||
TraceEvent(
|
||||
bulkLoadVerboseEventSev(), "DDBulkLoadTaskSetCompleteTransaction", relocationIntervalId)
|
||||
.setMaxEventLength(-1)
|
||||
.setMaxFieldLength(-1)
|
||||
.detail("DataMoveID", dataMoveId)
|
||||
|
@ -2372,7 +2374,8 @@ ACTOR static Future<Void> finishMoveShards(Database occ,
|
|||
|
||||
if (range.end == dataMove.ranges.front().end && bulkLoadTaskState.present()) {
|
||||
Version commitVersion = tr.getCommittedVersion();
|
||||
TraceEvent(SevInfo, "DDBulkLoadTaskPersistCompleteState", relocationIntervalId)
|
||||
TraceEvent(
|
||||
bulkLoadVerboseEventSev(), "DDBulkLoadTaskPersistCompleteState", relocationIntervalId)
|
||||
.setMaxEventLength(-1)
|
||||
.setMaxFieldLength(-1)
|
||||
.detail("DataMoveID", dataMoveId)
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbclient/Knobs.h"
|
||||
#if defined(NO_INTELLISENSE) && !defined(FDBSERVER_DATA_DISTRIBUTION_ACTOR_G_H)
|
||||
#define FDBSERVER_DATA_DISTRIBUTION_ACTOR_G_H
|
||||
#include "fdbserver/DataDistribution.actor.g.h"
|
||||
|
@ -637,7 +638,7 @@ public:
|
|||
}
|
||||
if (it->value().get().completeAck.canBeSet()) {
|
||||
it->value().get().completeAck.sendError(bulkload_task_outdated());
|
||||
TraceEvent(SevInfo, "DDBulkLoadTaskCollectionPublishTaskOverwriteTask", ddId)
|
||||
TraceEvent(bulkLoadVerboseEventSev(), "DDBulkLoadTaskCollectionPublishTaskOverwriteTask", ddId)
|
||||
.setMaxEventLength(-1)
|
||||
.setMaxFieldLength(-1)
|
||||
.detail("NewRange", bulkLoadTaskState.getRange())
|
||||
|
|
|
@ -8745,7 +8745,7 @@ ACTOR Future<BulkLoadFileSet> bulkLoadFetchKeyValueFileToLoad(StorageServer* dat
|
|||
std::string dir,
|
||||
BulkLoadTaskState bulkLoadTaskState) {
|
||||
ASSERT(bulkLoadTaskState.getLoadType() == BulkLoadType::SST);
|
||||
TraceEvent(SevInfo, "SSBulkLoadTaskFetchSSTFile", data->thisServerID)
|
||||
TraceEvent(bulkLoadVerboseEventSev(), "SSBulkLoadTaskFetchSSTFile", data->thisServerID)
|
||||
.setMaxEventLength(-1)
|
||||
.setMaxFieldLength(-1)
|
||||
.detail("BulkLoadTask", bulkLoadTaskState.toString())
|
||||
|
@ -8758,7 +8758,7 @@ ACTOR Future<BulkLoadFileSet> bulkLoadFetchKeyValueFileToLoad(StorageServer* dat
|
|||
// Do not need byte sampling locally in fetchKeys
|
||||
const double duration = now() - fetchStartTime;
|
||||
const int64_t totalBytes = bulkLoadTaskState.getTotalBytes();
|
||||
TraceEvent(SevInfo, "SSBulkLoadTaskFetchSSTFileFetched", data->thisServerID)
|
||||
TraceEvent(bulkLoadVerboseEventSev(), "SSBulkLoadTaskFetchSSTFileFetched", data->thisServerID)
|
||||
.setMaxEventLength(-1)
|
||||
.setMaxFieldLength(-1)
|
||||
.detail("BulkLoadTask", bulkLoadTaskState.toString())
|
||||
|
@ -8829,7 +8829,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
|
|||
{}, SERVER_KNOBS->FETCH_KEYS_LOWER_PRIORITY ? ReadType::FETCH : ReadType::NORMAL, CacheResult::False);
|
||||
|
||||
if (conductBulkLoad) {
|
||||
TraceEvent(SevInfo, "SSBulkLoadTaskFetchKey", data->thisServerID)
|
||||
TraceEvent(bulkLoadVerboseEventSev(), "SSBulkLoadTaskFetchKey", data->thisServerID)
|
||||
.detail("DataMoveId", dataMoveId.toString())
|
||||
.detail("Range", keys)
|
||||
.detail("Phase", "Begin");
|
||||
|
@ -9015,7 +9015,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
|
|||
rangeEnd = keys.end;
|
||||
}
|
||||
} else if (conductBulkLoad) {
|
||||
TraceEvent(SevInfo, "SSBulkLoadTaskFetchKey", data->thisServerID)
|
||||
TraceEvent(bulkLoadVerboseEventSev(), "SSBulkLoadTaskFetchKey", data->thisServerID)
|
||||
.detail("DataMoveId", dataMoveId.toString())
|
||||
.detail("Range", keys)
|
||||
.detail("Phase", "Read task metadata");
|
||||
|
@ -9031,7 +9031,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
|
|||
// getBulkLoadTaskStateFromDataMove get stuck, this fetchKeys is guaranteed to be cancelled.
|
||||
BulkLoadTaskState bulkLoadTaskState = wait(getBulkLoadTaskStateFromDataMove(
|
||||
data->cx, dataMoveId, /*atLeastVersion=*/data->version.get(), data->thisServerID));
|
||||
TraceEvent(SevInfo, "SSBulkLoadTaskFetchKey", data->thisServerID)
|
||||
TraceEvent(bulkLoadVerboseEventSev(), "SSBulkLoadTaskFetchKey", data->thisServerID)
|
||||
.detail("DataMoveId", dataMoveId.toString())
|
||||
.detail("Range", keys)
|
||||
.detail("Phase", "Got task metadata");
|
||||
|
@ -9110,7 +9110,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
|
|||
wait(data->storage.replaceRange(blockRange, blockData));
|
||||
|
||||
if (conductBulkLoad) {
|
||||
TraceEvent(SevInfo, "SSBulkLoadTaskFetchKey", data->thisServerID)
|
||||
TraceEvent(bulkLoadVerboseEventSev(), "SSBulkLoadTaskFetchKey", data->thisServerID)
|
||||
.detail("FKID", interval.pairID)
|
||||
.detail("DataMoveId", dataMoveId.toString())
|
||||
.detail("Range", keys)
|
||||
|
@ -9182,7 +9182,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
|
|||
shard->server->addShard(ShardInfo::newAdding(
|
||||
data, KeyRangeRef(blockBegin, keys.end), shard->reason, shard->getSSBulkLoadMetadata()));
|
||||
if (conductBulkLoad) {
|
||||
TraceEvent(SevInfo, "SSBulkLoadTaskFetchKey", data->thisServerID)
|
||||
TraceEvent(bulkLoadVerboseEventSev(), "SSBulkLoadTaskFetchKey", data->thisServerID)
|
||||
.detail("FKID", interval.pairID)
|
||||
.detail("DataMoveId", dataMoveId.toString())
|
||||
.detail("Range", keys)
|
||||
|
@ -9562,7 +9562,7 @@ ACTOR Future<Void> bulkLoadFetchShardFileToLoad(StorageServer* data,
|
|||
std::string localRoot,
|
||||
BulkLoadTaskState bulkLoadTaskState) {
|
||||
ASSERT(bulkLoadTaskState.getLoadType() == BulkLoadType::SST);
|
||||
TraceEvent(SevInfo, "SSBulkLoadTaskFetchShardFile", data->thisServerID)
|
||||
TraceEvent(bulkLoadVerboseEventSev(), "SSBulkLoadTaskFetchShardFile", data->thisServerID)
|
||||
.setMaxEventLength(-1)
|
||||
.setMaxFieldLength(-1)
|
||||
.detail("BulkLoadTask", bulkLoadTaskState.toString())
|
||||
|
@ -9588,7 +9588,7 @@ ACTOR Future<Void> bulkLoadFetchShardFileToLoad(StorageServer* data,
|
|||
// Download data file and byte sample file from fromRemoteFileSet to toLocalFileSet
|
||||
state BulkLoadFileSet toLocalFileSet = wait(bulkLoadDownloadTaskFileSet(
|
||||
bulkLoadTaskState.getTransportMethod(), fromRemoteFileSet, localRoot, data->thisServerID));
|
||||
TraceEvent(SevInfo, "SSBulkLoadTaskFetchShardSSTFileFetched", data->thisServerID)
|
||||
TraceEvent(bulkLoadVerboseEventSev(), "SSBulkLoadTaskFetchShardSSTFileFetched", data->thisServerID)
|
||||
.setMaxEventLength(-1)
|
||||
.setMaxFieldLength(-1)
|
||||
.detail("BulkLoadTask", bulkLoadTaskState.toString())
|
||||
|
@ -9598,7 +9598,8 @@ ACTOR Future<Void> bulkLoadFetchShardFileToLoad(StorageServer* data,
|
|||
|
||||
// Step 2: Do byte sampling locally if the remote byte sampling file is not valid nor existing
|
||||
if (!toLocalFileSet.hasByteSampleFile()) {
|
||||
TraceEvent(SevInfo, "SSBulkLoadTaskFetchShardSSTFileValidByteSampleNotFound", data->thisServerID)
|
||||
TraceEvent(
|
||||
bulkLoadVerboseEventSev(), "SSBulkLoadTaskFetchShardSSTFileValidByteSampleNotFound", data->thisServerID)
|
||||
.setMaxEventLength(-1)
|
||||
.setMaxFieldLength(-1)
|
||||
.detail("BulkLoadTaskState", bulkLoadTaskState.toString())
|
||||
|
@ -9612,7 +9613,7 @@ ACTOR Future<Void> bulkLoadFetchShardFileToLoad(StorageServer* data,
|
|||
toLocalFileSet.setByteSampleFileName(byteSampleFileName);
|
||||
}
|
||||
}
|
||||
TraceEvent(SevInfo, "SSBulkLoadTaskFetchShardByteSampled", data->thisServerID)
|
||||
TraceEvent(bulkLoadVerboseEventSev(), "SSBulkLoadTaskFetchShardByteSampled", data->thisServerID)
|
||||
.setMaxEventLength(-1)
|
||||
.setMaxFieldLength(-1)
|
||||
.detail("BulkLoadTask", bulkLoadTaskState.toString())
|
||||
|
@ -9653,7 +9654,7 @@ ACTOR Future<Void> bulkLoadFetchShardFileToLoad(StorageServer* data,
|
|||
|
||||
const double duration = now() - fetchStartTime;
|
||||
const int64_t totalBytes = getTotalFetchedBytes(moveInShard->meta->checkpoints);
|
||||
TraceEvent(SevInfo, "SSBulkLoadTaskFetchShardSSTFileBuildMetadata", data->thisServerID)
|
||||
TraceEvent(bulkLoadVerboseEventSev(), "SSBulkLoadTaskFetchShardSSTFileBuildMetadata", data->thisServerID)
|
||||
.setMaxEventLength(-1)
|
||||
.setMaxFieldLength(-1)
|
||||
.detail("BulkLoadTask", bulkLoadTaskState.toString())
|
||||
|
|
|
@ -109,13 +109,13 @@ struct BulkLoading : TestWorkload {
|
|||
loop {
|
||||
try {
|
||||
wait(submitBulkLoadTask(cx, tasks[i]));
|
||||
TraceEvent("BulkLoadingSubmitBulkLoadTask")
|
||||
TraceEvent(bulkLoadVerboseEventSev(), "BulkLoadingSubmitBulkLoadTask")
|
||||
.setMaxEventLength(-1)
|
||||
.setMaxFieldLength(-1)
|
||||
.detail("BulkLoadTaskState", tasks[i].toString());
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
TraceEvent("BulkLoadingSubmitBulkLoadTaskError")
|
||||
TraceEvent(SevWarn, "BulkLoadingSubmitBulkLoadTaskError")
|
||||
.setMaxEventLength(-1)
|
||||
.setMaxFieldLength(-1)
|
||||
.errorUnsuppressed(e)
|
||||
|
@ -133,13 +133,13 @@ struct BulkLoading : TestWorkload {
|
|||
loop {
|
||||
try {
|
||||
wait(finalizeBulkLoadTask(cx, tasks[i].getRange(), tasks[i].getTaskId()));
|
||||
TraceEvent("BulkLoadingAcknowledgeBulkLoadTask")
|
||||
TraceEvent(bulkLoadVerboseEventSev(), "BulkLoadingAcknowledgeBulkLoadTask")
|
||||
.setMaxEventLength(-1)
|
||||
.setMaxFieldLength(-1)
|
||||
.detail("BulkLoadTaskState", tasks[i].toString());
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
TraceEvent("BulkLoadingAcknowledgeBulkLoadTaskError")
|
||||
TraceEvent(SevWarn, "BulkLoadingAcknowledgeBulkLoadTaskError")
|
||||
.setMaxEventLength(-1)
|
||||
.setMaxFieldLength(-1)
|
||||
.errorUnsuppressed(e)
|
||||
|
|
Loading…
Reference in New Issue