BulkLoadJob Should Not Schedule Completed BulkLoadTask (#12030)

* make bulkload job manager logic clear

* bypass task if the task has been completed

* improve scheduleBulkLoadJob
This commit is contained in:
Zhe Wang 2025-03-14 14:52:33 -07:00 committed by GitHub
parent bc4dec8e5d
commit 6ae46b4917
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 181 additions and 117 deletions

View File

@ -68,6 +68,15 @@ ACTOR Future<Void> printPastBulkLoadJob(Database cx) {
return Void();
}
// Prints the total task count of a bulkload job, or a placeholder line
// when the count has not (yet) been recorded in the job metadata.
void printBulkLoadJobTotalTaskCount(Optional<uint64_t> count) {
	if (!count.present()) {
		fmt::println("Total task count is unknown");
		return;
	}
	fmt::println("Total {} tasks", count.get());
}
ACTOR Future<Void> printBulkLoadJobProgress(Database cx, BulkLoadJobState job) {
state Transaction tr(cx);
state Key readBegin = job.getJobRange().begin;
@ -93,11 +102,7 @@ ACTOR Future<Void> printBulkLoadJobProgress(Database cx, BulkLoadJobState job) {
fmt::println("Submitted {} tasks", submitTaskCount);
fmt::println("Finished {} tasks", completeTaskCount);
fmt::println("Error {} tasks", errorTaskCount);
if (totalTaskCount.present()) {
fmt::println("Total {} tasks", totalTaskCount.get());
} else {
fmt::println("Total task count is unknown");
}
printBulkLoadJobTotalTaskCount(totalTaskCount);
if (bulkLoadTask.phase == BulkLoadPhase::Submitted &&
bulkLoadTask.getJobId() != UID::fromString("00000000-0000-0000-0000-000000000000")) {
fmt::println("Job {} has been cancelled or has completed", jobId.toString());
@ -119,11 +124,7 @@ ACTOR Future<Void> printBulkLoadJobProgress(Database cx, BulkLoadJobState job) {
fmt::println("Submitted {} tasks", submitTaskCount);
fmt::println("Finished {} tasks", completeTaskCount);
fmt::println("Error {} tasks", errorTaskCount);
if (totalTaskCount.present()) {
fmt::println("Total {} tasks", totalTaskCount.get());
} else {
fmt::println("Total task count is unknown");
}
printBulkLoadJobTotalTaskCount(totalTaskCount);
return Void();
}

View File

@ -100,7 +100,8 @@ std::string convertBulkLoadJobPhaseToString(const BulkLoadJobPhase& phase) {
} else if (phase == BulkLoadJobPhase::Cancelled) {
return "Cancelled";
} else {
UNREACHABLE();
TraceEvent(SevError, "UnexpectedBulkLoadJobPhase").detail("Val", phase);
return "";
}
}
@ -154,12 +155,15 @@ std::string getBulkLoadJobRoot(const std::string& root, const UID& jobId) {
}
std::string convertBulkLoadTransportMethodToString(BulkLoadTransportMethod method) {
if (method == BulkLoadTransportMethod::CP) {
return "Local file copy";
if (method == BulkLoadTransportMethod::Invalid) {
return "Invalid";
} else if (method == BulkLoadTransportMethod::CP) {
return "LocalFileCopy";
} else if (method == BulkLoadTransportMethod::BLOBSTORE) {
return "Blob store";
return "BlobStore";
} else {
UNREACHABLE();
TraceEvent(SevError, "UnexpectedBulkLoadTransportMethod").detail("Val", method);
return "";
}
}

View File

@ -3280,11 +3280,8 @@ ACTOR Future<Optional<BulkLoadJobState>> getRunningBulkLoadJob(Database cx) {
if (!jobState.isValid()) {
continue;
}
if (jobState.getPhase() == BulkLoadJobPhase::Complete ||
jobState.getPhase() == BulkLoadJobPhase::Error) {
continue;
}
ASSERT(jobState.getPhase() == BulkLoadJobPhase::Submitted);
// If a job is fully completed, the metadata should be removed from bulkLoadJobKeys
// The metadata is added to bulkload history.
return jobState;
}
beginKey = rangeResult.back().key;

View File

@ -767,7 +767,7 @@ private:
// jobRoot is the root path of the data store used by bulkload/dump functionality.
// Given the job manifest file is stored in the jobFolder, the
// jobFolder = getBulkLoadJobRoot(jobRoot, jobId).
BulkLoadJobPhase phase;
BulkLoadJobPhase phase = BulkLoadJobPhase::Invalid;
double submitTime = 0;
double endTime = 0;
Optional<uint64_t> taskCount;

View File

@ -452,7 +452,7 @@ public:
ParallelismLimitor bulkLoadEngineParallelismLimitor;
std::string bulkLoadFolder;
DDBulkLoadJobManager bulkLoadJobManager;
Optional<DDBulkLoadJobManager> bulkLoadJobManager;
bool bulkDumpEnabled = false;
ParallelismLimitor bulkDumpParallelismLimitor;
@ -1486,7 +1486,7 @@ ACTOR Future<Optional<BulkLoadTaskState>> bulkLoadJobFindTask(Reference<DataDist
KeyRange currentRange = Standalone(KeyRangeRef(result[0].key, result[1].key));
ASSERT(result[0].key != result[1].key);
if (bulkLoadTaskState.getRange() != currentRange) {
TraceEvent(SevError, "DDBulkLoadJobRangeMismatch", logId)
TraceEvent(SevError, "DDBulkLoadJobExecutorFindRangeMismatch", logId)
.setMaxEventLength(-1)
.setMaxFieldLength(-1)
.detail("CurrentRange", currentRange)
@ -1515,12 +1515,14 @@ ACTOR Future<BulkLoadTaskState> bulkLoadJobTriggerTask(Reference<DataDistributor
wait(setBulkLoadSubmissionTransaction(&tr, bulkLoadTask));
// setBulkLoadSubmissionTransaction shuts down traffic to the range
wait(tr.commit());
TraceEvent(SevInfo, "DDBulkLoadJobSubmitTask", self->ddId)
Version commitVersion = tr.getCommittedVersion();
TraceEvent(SevInfo, "DDBulkLoadJobExecutorSubmitTask", self->ddId)
.setMaxEventLength(-1)
.setMaxFieldLength(-1)
.detail("JobId", jobId.toString())
.detail("Manifest", manifest.toString())
.detail("BulkLoadTask", bulkLoadTask.toString());
.detail("BulkLoadTask", bulkLoadTask.toString())
.detail("CommitVersion", commitVersion);
break;
} catch (Error& e) {
wait(tr.onError(e));
@ -1553,7 +1555,7 @@ ACTOR Future<Void> bulkLoadJobWaitUntilTaskCompleteOrError(Reference<DataDistrib
throw bulkload_task_outdated();
}
if (currentTask.phase == BulkLoadPhase::Error) {
TraceEvent(SevWarnAlways, "DDBulkLoadJobUnretrievableError", self->ddId)
TraceEvent(SevWarnAlways, "DDBulkLoadJobExecutorFindUnretrievableError", self->ddId)
.setMaxEventLength(-1)
.setMaxFieldLength(-1)
.detail("JobId", jobId.toString())
@ -1578,7 +1580,8 @@ ACTOR Future<Void> bulkLoadJobExecuteTask(Reference<DataDistributor> self,
std::string jobRoot,
BulkLoadTransportMethod jobTransportMethod,
std::string manifestLocalTempFolder,
BulkLoadJobFileManifestEntry manifestEntry) {
BulkLoadJobFileManifestEntry manifestEntry,
Promise<Void> errorOut) {
state Database cx = self->txnProcessor->context();
state BulkLoadTaskState bulkLoadTask;
state double beginTime = now();
@ -1595,16 +1598,13 @@ ACTOR Future<Void> bulkLoadJobExecuteTask(Reference<DataDistributor> self,
manifestEntry, manifestLocalTempFolder, jobTransportMethod, jobRoot, self->ddId));
// Step 3: Trigger bulkload task which is handled by bulkload task engine
// Discussion: what if another newer job has persisted some task on the range with a different
// job Id? This case should never happen because before the newer job starts, the old job has been
// completed or cancelled.
wait(store(bulkLoadTask, bulkLoadJobTriggerTask(self, jobId, manifest)));
TraceEvent(SevInfo, "DDBulkLoadJobExecuteNewTask", self->ddId)
.setMaxEventLength(-1)
.setMaxFieldLength(-1)
.detail("JobId", jobId.toString())
.detail("Task", bulkLoadTask.toString())
.detail("Duration", now() - beginTime);
} else {
bulkLoadTask = bulkLoadTask_.get();
TraceEvent(SevInfo, "DDBulkLoadJobExecuteTaskFound", self->ddId)
TraceEvent(SevInfo, "DDBulkLoadJobExecutorTaskFound", self->ddId)
.setMaxEventLength(-1)
.setMaxFieldLength(-1)
.detail("JobId", jobId.toString())
@ -1612,51 +1612,57 @@ ACTOR Future<Void> bulkLoadJobExecuteTask(Reference<DataDistributor> self,
.detail("Duration", now() - beginTime);
}
if (g_network->isSimulated() && deterministicRandom()->random01() < 0.1) {
TraceEvent(SevWarnAlways, "DDBulkLoadJobExecutorInjectDDRestart", self->ddId);
throw movekeys_conflict(); // improve code coverage
}
// Step 4: Monitor the bulkload completion
wait(bulkLoadJobWaitUntilTaskCompleteOrError(self, jobId, bulkLoadTask));
TraceEvent(SevInfo, "DDBulkLoadJobExecuteTaskEnd", self->ddId)
TraceEvent(SevInfo, "DDBulkLoadJobExecutorTaskEnd", self->ddId)
.setMaxEventLength(-1)
.setMaxFieldLength(-1)
.detail("JobId", jobId.toString())
.detail("Task", bulkLoadTask.toString())
.detail("Duration", now() - beginTime);
self->bulkLoadParallelismLimitor.decrementTaskCounter();
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
throw e;
}
TraceEvent(SevInfo, "DDBulkLoadJobExecuteTaskError", self->ddId)
TraceEvent(SevInfo, "DDBulkLoadJobExecutorTaskError", self->ddId)
.setMaxEventLength(-1)
.setMaxFieldLength(-1)
.errorUnsuppressed(e)
.detail("JobId", jobId.toString())
.detail("Task", bulkLoadTask.toString())
.detail("Duration", now() - beginTime);
if (e.code() == error_code_movekeys_conflict) {
throw e; // trigger DD restarts, which resets bulkLoadParallelismLimitor
self->bulkLoadParallelismLimitor.decrementTaskCounter();
if (errorOut.canBeSet()) {
errorOut.sendError(e);
}
// Silently exit for any error
// Currently, all errors here come from the bulkload job mechanism.
// BulkLoad task is guaranteed to be completed by the engine given a task metadata is persisted.
}
self->bulkLoadParallelismLimitor.decrementTaskCounter();
return Void();
}
ACTOR Future<Void> persistBulkLoadJobTaskCount(Reference<DataDistributor> self, UID logId) {
ACTOR Future<Void> persistBulkLoadJobTaskCount(Reference<DataDistributor> self) {
state Database cx = self->txnProcessor->context();
state Transaction tr(cx);
state UID jobId = self->bulkLoadJobManager.jobState.getJobId();
state KeyRange jobRange = self->bulkLoadJobManager.jobState.getJobRange();
ASSERT(self->bulkLoadJobManager.jobState.getTaskCount().present());
state uint64_t taskCount = self->bulkLoadJobManager.jobState.getTaskCount().get();
ASSERT(self->bulkLoadJobManager.present() && self->bulkLoadJobManager.get().isValid());
state BulkLoadJobState jobState = self->bulkLoadJobManager.get().jobState;
state UID jobId = jobState.getJobId();
state KeyRange jobRange = jobState.getJobRange();
ASSERT(jobState.getTaskCount().present());
state uint64_t taskCount = jobState.getTaskCount().get();
state BulkLoadJobState currentJobState;
loop {
try {
wait(store(currentJobState, getBulkLoadJob(&tr, jobId)));
if (currentJobState.getTaskCount().present()) {
if (currentJobState.getTaskCount().get() != taskCount) {
TraceEvent(SevError, "DDBulkLoadJobTaskCountMismatch", logId)
TraceEvent(SevError, "DDBulkLoadJobManagerFindTaskCountMismatch", self->ddId)
.setMaxEventLength(-1)
.setMaxFieldLength(-1)
.detail("JobID", jobId.toString())
@ -1671,7 +1677,7 @@ ACTOR Future<Void> persistBulkLoadJobTaskCount(Reference<DataDistributor> self,
wait(krmSetRange(&tr, bulkLoadJobPrefix, jobRange, bulkLoadJobValue(currentJobState)));
wait(tr.commit());
Version commitVersion = tr.getCommittedVersion();
TraceEvent(SevInfo, "DDBulkLoadJobPersistTaskCountToJobMetadata", logId)
TraceEvent(SevInfo, "DDBulkLoadJobManagerPersistTaskCountToJobMetadata", self->ddId)
.detail("JobID", jobId.toString())
.detail("JobRange", jobRange)
.detail("CommitVersion", commitVersion)
@ -1690,7 +1696,8 @@ ACTOR Future<Void> moveErrorBulkLoadJobToHistory(Reference<DataDistributor> self
state Database cx = self->txnProcessor->context();
state Transaction tr(cx);
state BulkLoadJobState currentJobState;
state UID jobId = self->bulkLoadJobManager.jobState.getJobId();
ASSERT(self->bulkLoadJobManager.present() && self->bulkLoadJobManager.get().isValid());
state UID jobId = self->bulkLoadJobManager.get().jobState.getJobId();
loop {
try {
wait(checkMoveKeysLock(&tr, self->context->lock, self->context->ddEnabledState.get()));
@ -1716,18 +1723,19 @@ ACTOR Future<Void> fetchBulkLoadTaskManifestEntryMap(Reference<DataDistributor>
BulkLoadTransportMethod jobTransportMethod,
std::string localJobManifestFilePath,
std::string remoteJobManifestFilePath) {
ASSERT(self->bulkLoadJobManager.isValid() && self->bulkLoadJobManager.manifestEntryMap->empty());
ASSERT(self->bulkLoadJobManager.present() && self->bulkLoadJobManager.get().isValid() &&
self->bulkLoadJobManager.get().manifestEntryMap->empty());
state double beginTime = now();
state KeyRange jobRange = self->bulkLoadJobManager.jobState.getJobRange();
state KeyRange jobRange = self->bulkLoadJobManager.get().jobState.getJobRange();
try {
if (!fileExists(abspath(localJobManifestFilePath))) {
TraceEvent(SevDebug, "DDBulkLoadJobDownloadJobManifest", self->ddId)
TraceEvent(SevDebug, "DDBulkLoadJobManagerDownloadJobManifest", self->ddId)
.detail("JobTransportMethod", jobTransportMethod)
.detail("LocalJobManifestFilePath", localJobManifestFilePath)
.detail("RemoteJobManifestFilePath", remoteJobManifestFilePath);
wait(downloadBulkLoadJobManifestFile(
jobTransportMethod, localJobManifestFilePath, remoteJobManifestFilePath, self->ddId));
TraceEvent(SevInfo, "DDBulkLoadJobManifestDownloaded", self->ddId)
TraceEvent(SevInfo, "DDBulkLoadJobManagerManifestDownloaded", self->ddId)
.detail("JobTransportMethod", jobTransportMethod)
.detail("LocalJobManifestFilePath", localJobManifestFilePath)
.detail("RemoteJobManifestFilePath", remoteJobManifestFilePath)
@ -1737,20 +1745,23 @@ ACTOR Future<Void> fetchBulkLoadTaskManifestEntryMap(Reference<DataDistributor>
// This job manifest file stores all remote manifest filepath per range.
// Here, we want to get all manifest entries of the file with in the range specified by jobRange.
wait(getBulkLoadJobFileManifestEntryFromJobManifestFile(
localJobManifestFilePath, jobRange, self->ddId, /*output=*/self->bulkLoadJobManager.manifestEntryMap));
self->bulkLoadJobManager.jobState.setTaskCount(self->bulkLoadJobManager.manifestEntryMap->size());
TraceEvent(SevInfo, "DDBulkLoadJobManifestMapBuilt", self->ddId)
localJobManifestFilePath,
jobRange,
self->ddId,
/*output=*/self->bulkLoadJobManager.get().manifestEntryMap));
self->bulkLoadJobManager.get().jobState.setTaskCount(self->bulkLoadJobManager.get().manifestEntryMap->size());
TraceEvent(SevInfo, "DDBulkLoadJobManagerManifestMapBuilt", self->ddId)
.detail("JobTransportMethod", jobTransportMethod)
.detail("LocalJobManifestFilePath", localJobManifestFilePath)
.detail("RemoteJobManifestFilePath", remoteJobManifestFilePath)
.detail("TaskCount", self->bulkLoadJobManager.manifestEntryMap->size())
.detail("TaskCount", self->bulkLoadJobManager.get().manifestEntryMap->size())
.detail("Duration", now() - beginTime);
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
throw e;
}
state Error err = e;
TraceEvent(SevWarnAlways, "DDBulkLoadJobManifestUnretrievableError", self->ddId)
TraceEvent(SevWarnAlways, "DDBulkLoadJobManagerFindUnretrievableError", self->ddId)
.errorUnsuppressed(err)
.detail("JobTransportMethod", jobTransportMethod)
.detail("LocalJobManifestFilePath", localJobManifestFilePath)
@ -1762,7 +1773,7 @@ ACTOR Future<Void> fetchBulkLoadTaskManifestEntryMap(Reference<DataDistributor>
". The transport method is " +
convertBulkLoadTransportMethodToString(jobTransportMethod) + ".";
wait(moveErrorBulkLoadJobToHistory(self, errorMessage));
TraceEvent(SevWarnAlways, "DDBulkLoadJobManifestUnretrievableErrorPersisted", self->ddId)
TraceEvent(SevWarnAlways, "DDBulkLoadJobManagerPersistUnretrievableError", self->ddId)
.errorUnsuppressed(err)
.detail("JobTransportMethod", jobTransportMethod)
.detail("LocalJobManifestFilePath", localJobManifestFilePath)
@ -1773,50 +1784,75 @@ ACTOR Future<Void> fetchBulkLoadTaskManifestEntryMap(Reference<DataDistributor>
return Void();
}
ACTOR Future<Void> scheduleBulkLoadJob(Reference<DataDistributor> self, BulkLoadJobState jobState) {
ACTOR Future<Void> scheduleBulkLoadJob(Reference<DataDistributor> self, Promise<Void> errorOut) {
state BulkLoadJobFileManifestEntry manifestEntry;
state Key beginKeyToDispatch = jobState.getJobRange().begin;
ASSERT(self->bulkLoadJobManager.present() && self->bulkLoadJobManager.get().isValid());
state BulkLoadJobState jobState = self->bulkLoadJobManager.get().jobState;
state Key beginKey = jobState.getJobRange().begin;
state std::vector<Future<Void>> actors;
state Database cx = self->txnProcessor->context();
state Transaction tr(cx);
// We load the bulkload task from the job manifest.
// The job manifest is organized in a sorted map. The key is the beginKey of the manifest.
// The value is the manifest. For details, please see comments in getBulkLoadJobManifestData.
loop {
auto it = self->bulkLoadJobManager.manifestEntryMap->find(beginKeyToDispatch);
ASSERT(it != self->bulkLoadJobManager.manifestEntryMap->end());
manifestEntry = it->second;
// Limit parallelism
loop {
if (self->bulkLoadParallelismLimitor.canStart()) {
try {
state RangeResult res =
wait(krmGetRanges(&tr, bulkLoadTaskPrefix, KeyRangeRef(beginKey, jobState.getJobRange().end)));
state int i = 0;
for (; i < res.size() - 1; i++) {
if (!res[i].value.empty()) {
BulkLoadTaskState task = decodeBulkLoadTaskState(res[i].value);
if (task.isValid()) {
if (task.getJobId() != self->bulkLoadJobManager.get().jobState.getJobId()) {
throw bulkload_task_outdated();
}
if (task.onAnyPhase(
{ BulkLoadPhase::Complete, BulkLoadPhase::Acknowledged, BulkLoadPhase::Error })) {
auto it = self->bulkLoadJobManager.get().manifestEntryMap->find(beginKey);
ASSERT(it != self->bulkLoadJobManager.get().manifestEntryMap->end());
manifestEntry = it->second;
beginKey = manifestEntry.getEndKey();
continue; // Bypass completed tasks
}
}
}
// Limit parallelism
loop {
if (self->bulkLoadParallelismLimitor.canStart()) {
break;
}
wait(self->bulkLoadParallelismLimitor.waitUntilCounterChanged());
}
auto it = self->bulkLoadJobManager.get().manifestEntryMap->find(beginKey);
ASSERT(it != self->bulkLoadJobManager.get().manifestEntryMap->end());
manifestEntry = it->second;
actors.push_back(bulkLoadJobExecuteTask(self,
jobState.getJobId(),
jobState.getJobRoot(),
jobState.getTransportMethod(),
self->bulkLoadJobManager.get().manifestLocalTempFolder,
manifestEntry,
errorOut));
beginKey = manifestEntry.getEndKey();
}
if (beginKey == jobState.getJobRange().end) {
// last round
break;
}
wait(self->bulkLoadParallelismLimitor.waitUntilCounterChanged());
}
ASSERT(self->bulkLoadJobManager.jobState.getJobId() == jobState.getJobId());
TraceEvent(SevInfo, "DDBulkLoadDispatchTask", self->ddId)
.setMaxEventLength(-1)
.setMaxFieldLength(-1)
.detail("ManifestEntry", manifestEntry.toString());
actors.push_back(bulkLoadJobExecuteTask(self,
jobState.getJobId(),
jobState.getJobRoot(),
jobState.getTransportMethod(),
self->bulkLoadJobManager.manifestLocalTempFolder,
manifestEntry));
if (manifestEntry.getEndKey() == jobState.getJobRange().end) {
// last round
break;
} else {
beginKeyToDispatch = manifestEntry.getEndKey();
wait(delay(0.1));
} catch (Error& e) {
wait(tr.onError(e));
}
}
wait(waitForAll(actors));
return Void();
}
ACTOR Future<bool> checkBulkLoadTaskCompleteOrError(Reference<DataDistributor> self, BulkLoadJobState jobState) {
ACTOR Future<bool> checkBulkLoadTaskCompleteOrError(Reference<DataDistributor> self) {
state Database cx = self->txnProcessor->context();
state Transaction tr(cx);
ASSERT(self->bulkLoadJobManager.present() && self->bulkLoadJobManager.get().isValid());
state BulkLoadJobState jobState = self->bulkLoadJobManager.get().jobState;
state Key beginKey = jobState.getJobRange().begin;
state Key endKey = jobState.getJobRange().end;
state BulkLoadTaskState existTask;
@ -1838,7 +1874,7 @@ ACTOR Future<bool> checkBulkLoadTaskCompleteOrError(Reference<DataDistributor> s
// When start loading a job, the old job metadata must be cleared at first.
// So, any existing bulkload job id must match the running job id.
if (existTask.getJobId() != jobState.getJobId()) {
TraceEvent(SevError, "DDBulkLoadJobIdMisMatch", self->ddId)
TraceEvent(SevError, "DDBulkLoadJobManagerFindIdMisMatch", self->ddId)
.setMaxEventLength(-1)
.setMaxFieldLength(-1)
.detail("Task", existTask.toString())
@ -1846,7 +1882,7 @@ ACTOR Future<bool> checkBulkLoadTaskCompleteOrError(Reference<DataDistributor> s
ASSERT(false);
}
if (existTask.phase == BulkLoadPhase::Error) {
TraceEvent(SevWarnAlways, "DDBulkLoadJobHasErrorTask", self->ddId)
TraceEvent(SevWarnAlways, "DDBulkLoadJobManagerFindErrorTask", self->ddId)
.setMaxEventLength(-1)
.setMaxFieldLength(-1)
.detail("Task", existTask.toString())
@ -1854,7 +1890,7 @@ ACTOR Future<bool> checkBulkLoadTaskCompleteOrError(Reference<DataDistributor> s
continue;
}
if (existTask.phase != BulkLoadPhase::Complete) {
TraceEvent(SevDebug, "DDBulkLoadTaskIncomplete", self->ddId)
TraceEvent(SevDebug, "DDBulkLoadJobManageFindRunningTask", self->ddId)
.setMaxEventLength(-1)
.setMaxFieldLength(-1)
.detail("ExistTask", existTask.toString())
@ -1870,9 +1906,11 @@ ACTOR Future<bool> checkBulkLoadTaskCompleteOrError(Reference<DataDistributor> s
return true;
}
ACTOR Future<Void> finalizeBulkLoadJob(Reference<DataDistributor> self, BulkLoadJobState jobState) {
ACTOR Future<Void> finalizeBulkLoadJob(Reference<DataDistributor> self) {
state Database cx = self->txnProcessor->context();
state Transaction tr(cx);
ASSERT(self->bulkLoadJobManager.present() && self->bulkLoadJobManager.get().isValid());
state BulkLoadJobState jobState = self->bulkLoadJobManager.get().jobState;
state Key beginKey = jobState.getJobRange().begin;
state Key endKey = jobState.getJobRange().end;
state Optional<Key> lastKey;
@ -1904,7 +1942,7 @@ ACTOR Future<Void> finalizeBulkLoadJob(Reference<DataDistributor> self, BulkLoad
// if no old metadata exists (the old job metadata has been cleared). So, we can stop at this point.
ASSERT(existTask.getJobId() == jobState.getJobId());
if (existTask.phase == BulkLoadPhase::Error) {
TraceEvent(SevWarnAlways, "DDBulkLoadJobStopClearMetadata", self->ddId)
TraceEvent(SevWarnAlways, "DDBulkLoadJobManagerStopClearMetadata", self->ddId)
.setMaxEventLength(-1)
.setMaxFieldLength(-1)
.detail("Job", jobState.toString())
@ -1913,7 +1951,7 @@ ACTOR Future<Void> finalizeBulkLoadJob(Reference<DataDistributor> self, BulkLoad
hasError = true;
} else {
if (existTask.phase != BulkLoadPhase::Complete && existTask.phase != BulkLoadPhase::Acknowledged) {
TraceEvent(SevError, "DDBulkLoadJobWrongTaskPhase", self->ddId)
TraceEvent(SevError, "DDBulkLoadJobManagerWrongTaskPhase", self->ddId)
.setMaxEventLength(-1)
.setMaxFieldLength(-1)
.detail("Task", existTask.toString());
@ -1944,7 +1982,7 @@ ACTOR Future<Void> finalizeBulkLoadJob(Reference<DataDistributor> self, BulkLoad
}
wait(tr.commit());
Version commitVersion = tr.getCommittedVersion();
TraceEvent(SevInfo, "DDBulkLoadJobExecuteTaskFinalized", self->ddId)
TraceEvent(SevInfo, "DDBulkLoadJobManagerFinalizeRange", self->ddId)
.setMaxEventLength(-1)
.setMaxFieldLength(-1)
.detail("JobCompleteRange", jobCompleteRange)
@ -1964,8 +2002,8 @@ ACTOR Future<Void> bulkLoadJobManager(Reference<DataDistributor> self) {
state Database cx = self->txnProcessor->context();
state Optional<BulkLoadJobState> job = wait(getRunningBulkLoadJob(cx));
if (!job.present()) {
TraceEvent(SevInfo, "DDBulkLoadJobNoJobExist", self->ddId);
self->bulkLoadJobManager = DDBulkLoadJobManager(); // set to empty
TraceEvent(SevInfo, "DDBulkLoadJobManagerNoJobExist", self->ddId);
self->bulkLoadJobManager.reset(); // set to empty
return Void();
}
state UID jobId = job.get().getJobId();
@ -1973,16 +2011,21 @@ ACTOR Future<Void> bulkLoadJobManager(Reference<DataDistributor> self) {
state std::string jobRoot = job.get().getJobRoot();
state BulkLoadTransportMethod jobTransportMethod = job.get().getTransportMethod();
// If a bulkload job is not finished, create bulkLoadJobManager and run scheduleBulkLoadJob
if (job.get().getPhase() != BulkLoadJobPhase::Complete && job.get().getPhase() != BulkLoadJobPhase::Error) {
TraceEvent(SevInfo, "DDBulkLoadFoundJobToRun", self->ddId)
.detail("JobId", jobId)
.detail("JobRange", jobRange)
.detail("JobRoot", jobRoot)
.detail("JobTransportMethod", jobTransportMethod);
// Build up bulkLoadJobManager if a new job starts or the bulkLoadJobManager has not been set up.
if (!self->bulkLoadJobManager.present() || self->bulkLoadJobManager.get().jobState.getJobId() != jobId) {
TraceEvent(SevInfo, "DDBulkLoadJobManagerBuild", self->ddId)
.setMaxEventLength(-1)
.setMaxFieldLength(-1)
.detail("OldJob",
self->bulkLoadJobManager.present() ? self->bulkLoadJobManager.get().jobState.toString()
: "No old job")
.detail("NewJobId", jobId)
.detail("NewJobRange", jobRange)
.detail("NewJobRoot", jobRoot)
.detail("NewJobTransportMethod", jobTransportMethod);
// Set up all metadata and information required to run the job.
std::string localFolder = getBulkLoadJobRoot(self->bulkLoadFolder, jobId);
state std::string manifestLocalTempFolder = abspath(joinPath(localFolder, "manifest-temp"));
std::string manifestLocalTempFolder = abspath(joinPath(localFolder, "manifest-temp"));
resetFileFolder(manifestLocalTempFolder);
std::string remoteFolder = getBulkLoadJobRoot(jobRoot, jobId);
std::string jobManifestFileName = getBulkLoadJobManifestFileName();
@ -1991,24 +2034,43 @@ ACTOR Future<Void> bulkLoadJobManager(Reference<DataDistributor> self) {
self->bulkLoadJobManager = DDBulkLoadJobManager(job.get(), manifestLocalTempFolder);
wait(fetchBulkLoadTaskManifestEntryMap(
self, jobTransportMethod, localJobManifestFilePath, remoteJobManifestFilePath));
wait(persistBulkLoadJobTaskCount(self, self->ddId));
wait(persistBulkLoadJobTaskCount(self));
} else {
TraceEvent(SevInfo, "DDBulkLoadJobManagerExist", self->ddId)
.setMaxEventLength(-1)
.setMaxFieldLength(-1)
.detail("Job", self->bulkLoadJobManager.get().jobState.toString());
}
// Check if all bulkload tasks are marked as complete or error.
// If yes, acknowledge complete tasks and leave error tasks there.
// At this point, bulkLoadJobManager must be available.
ASSERT(self->bulkLoadJobManager.present() && self->bulkLoadJobManager.get().isValid());
// Check if all bulkload tasks are marked as complete or error
// If yes, acknowledge complete tasks and leave error tasks there
// We turn on the traffic and shard boundary change only for completed range when bulkload
// on all ranges have been completed or error
state Promise<Void> errorOut; // Capture errors from bulkLoadJobExecuteTask
loop {
bool complete = wait(checkBulkLoadTaskCompleteOrError(self, self->bulkLoadJobManager.jobState));
bool complete = wait(checkBulkLoadTaskCompleteOrError(self));
if (complete) {
TraceEvent(SevInfo, "DDBulkLoadJobAllComplete", self->ddId)
.detail("JobId", jobId)
.detail("JobRange", jobRange);
wait(finalizeBulkLoadJob(self, self->bulkLoadJobManager.jobState));
TraceEvent(SevInfo, "DDBulkLoadJobManagerAllTaskComplete", self->ddId)
.setMaxEventLength(-1)
.setMaxFieldLength(-1)
.detail("Job", self->bulkLoadJobManager.get().jobState.toString());
wait(finalizeBulkLoadJob(self));
break; // end
} else {
wait(scheduleBulkLoadJob(self, self->bulkLoadJobManager.jobState));
TraceEvent(SevInfo, "DDBulkLoadJobJobDispatched", self->ddId)
.detail("JobId", jobId)
.detail("JobRange", jobRange);
std::vector<Future<Void>> actors;
actors.push_back(errorOut.getFuture());
actors.push_back(scheduleBulkLoadJob(self, errorOut));
wait(waitForAny(actors));
// Any error in bulkLoadJobExecuteTask will cause cancellation of scheduleBulkLoadJob
// Note that bulkLoadJobExecuteTask simply does transaction to create and monitor
// the bulkload task. The error is expected to be bulkload_task_outdated error.
TraceEvent(SevInfo, "DDBulkLoadJobManagerTaskDispatched", self->ddId)
.setMaxEventLength(-1)
.setMaxFieldLength(-1)
.detail("Job", self->bulkLoadJobManager.get().jobState.toString());
}
wait(delay(SERVER_KNOBS->DD_BULKLOAD_SCHEDULE_MIN_INTERVAL_SEC));
}
@ -2024,7 +2086,7 @@ ACTOR Future<Void> bulkLoadJobCore(Reference<DataDistributor> self, Future<Void>
if (e.code() == error_code_actor_cancelled) {
throw e;
}
TraceEvent(SevWarn, "DDBulkLoadingCoreError", self->ddId).errorUnsuppressed(e);
TraceEvent(SevWarn, "DDBulkLoadJobManagerError", self->ddId).errorUnsuppressed(e);
if (e.code() == error_code_movekeys_conflict) {
throw e;
}