Merge commit '7c89cd705faee52d5d78e6c77665cb7cc4502f58' into redwood-commit-overlap

This commit is contained in:
Steve Atherton 2022-10-07 11:58:07 -07:00
commit 8ccdc91b5e
55 changed files with 493 additions and 211 deletions

View File

@ -198,6 +198,10 @@ void ApiWorkload::clearTenantData(TTaskFct cont, std::optional<int> tenantId) {
void ApiWorkload::clearData(TTaskFct cont) {
execTransaction(
[this](auto ctx) {
// Make this self-conflicting, so that if we're retrying on timeouts
// once we get a successful commit all previous attempts are no
// longer in-flight.
ctx->tx().addReadConflictRange(keyPrefix, keyPrefix + fdb::Key(1, '\xff'));
ctx->tx().clearRange(keyPrefix, keyPrefix + fdb::Key(1, '\xff'));
ctx->commit();
},

View File

@ -180,24 +180,13 @@ public:
if (databaseCreateErrorInjected && canBeInjectedDatabaseCreateError(err.code())) {
// Failed to create a database because of failure injection
// Restart by recreating the transaction in a valid database
auto thisRef = std::static_pointer_cast<TransactionContextBase>(shared_from_this());
scheduler->schedule([thisRef]() {
fdb::Database db = thisRef->executor->selectDatabase();
thisRef->fdbDb.atomic_store(db);
if (thisRef->transactional) {
if (thisRef->tenantName) {
fdb::Tenant tenant = db.openTenant(*thisRef->tenantName);
thisRef->fdbTx.atomic_store(tenant.createTransaction());
} else {
thisRef->fdbTx.atomic_store(db.createTransaction());
}
}
thisRef->restartTransaction();
});
recreateAndRestartTransaction();
} else if (transactional) {
onErrorArg = err;
onErrorFuture = tx().onError(err);
handleOnErrorFuture();
} else if (err.retryable()) {
restartTransaction();
} else {
transactionFailed(err);
}
@ -262,6 +251,23 @@ protected:
startFct(shared_from_this());
}
void recreateAndRestartTransaction() {
auto thisRef = std::static_pointer_cast<TransactionContextBase>(shared_from_this());
scheduler->schedule([thisRef]() {
fdb::Database db = thisRef->executor->selectDatabase();
thisRef->fdbDb.atomic_store(db);
if (thisRef->transactional) {
if (thisRef->tenantName) {
fdb::Tenant tenant = db.openTenant(*thisRef->tenantName);
thisRef->fdbTx.atomic_store(tenant.createTransaction());
} else {
thisRef->fdbTx.atomic_store(db.createTransaction());
}
}
thisRef->restartTransaction();
});
}
// Checks if a transaction can be retried. Fails the transaction if the check fails
bool canRetry(fdb::Error lastErr) {
ASSERT(txState == TxState::ON_ERROR);
@ -671,11 +677,23 @@ public:
try {
std::shared_ptr<ITransactionContext> ctx;
if (options.blockOnFutures) {
ctx = std::make_shared<BlockingTransactionContext>(
this, startFct, cont, scheduler, options.transactionRetryLimit, bgBasePath, tenantName, true);
ctx = std::make_shared<BlockingTransactionContext>(this,
startFct,
cont,
scheduler,
options.transactionRetryLimit,
bgBasePath,
tenantName,
transactional);
} else {
ctx = std::make_shared<AsyncTransactionContext>(
this, startFct, cont, scheduler, options.transactionRetryLimit, bgBasePath, tenantName, true);
ctx = std::make_shared<AsyncTransactionContext>(this,
startFct,
cont,
scheduler,
options.transactionRetryLimit,
bgBasePath,
tenantName,
transactional);
}
startFct(ctx);
} catch (...) {

View File

@ -662,6 +662,13 @@ public:
void clearRange(KeyRef begin, KeyRef end) {
native::fdb_transaction_clear_range(tr.get(), begin.data(), intSize(begin), end.data(), intSize(end));
}
void addReadConflictRange(KeyRef begin, KeyRef end) {
if (auto err = Error(native::fdb_transaction_add_conflict_range(
tr.get(), begin.data(), intSize(begin), end.data(), intSize(end), FDB_CONFLICT_RANGE_TYPE_READ))) {
throwError("fdb_transaction_add_conflict_range returned error: ", err);
}
}
};
class Tenant final {

View File

@ -4606,7 +4606,7 @@ struct StartFullRestoreTaskFunc : RestoreTaskFuncBase {
.detail("RestoreVersion", restoreVersion)
.detail("Dest", destVersion);
if (destVersion <= restoreVersion) {
CODE_PROBE(true, "Forcing restored cluster to higher version");
CODE_PROBE(true, "Forcing restored cluster to higher version", probe::decoration::rare);
tr->set(minRequiredCommitVersionKey, BinaryWriter::toValue(restoreVersion + 1, Unversioned()));
wait(tr->commit());
} else {

View File

@ -1461,7 +1461,10 @@ Optional<TenantName> MultiVersionTransaction::getTenant() {
// Waits for the specified duration and signals the assignment variable with a timed out error
// This will be canceled if a new timeout is set, in which case the tsav will not be signaled.
ACTOR Future<Void> timeoutImpl(Reference<ThreadSingleAssignmentVar<Void>> tsav, double duration) {
wait(delay(duration));
state double endTime = now() + duration;
while (now() < endTime) {
wait(delayUntil(std::min(endTime + 0.0001, now() + CLIENT_KNOBS->TRANSACTION_TIMEOUT_DELAY_INTERVAL)));
}
tsav->trySendError(transaction_timed_out());
return Void();
@ -1501,14 +1504,17 @@ void MultiVersionTransaction::setTimeout(Optional<StringRef> value) {
{ // lock scope
ThreadSpinLockHolder holder(timeoutLock);
Reference<ThreadSingleAssignmentVar<Void>> tsav = timeoutTsav;
ThreadFuture<Void> newTimeout = onMainThread([transactionStartTime, tsav, timeoutDuration]() {
return timeoutImpl(tsav, timeoutDuration - std::max(0.0, now() - transactionStartTime));
});
prevTimeout = currentTimeout;
currentTimeout = newTimeout;
if (timeoutDuration > 0) {
Reference<ThreadSingleAssignmentVar<Void>> tsav = timeoutTsav;
ThreadFuture<Void> newTimeout = onMainThread([transactionStartTime, tsav, timeoutDuration]() {
return timeoutImpl(tsav, timeoutDuration - std::max(0.0, now() - transactionStartTime));
});
currentTimeout = newTimeout;
} else {
currentTimeout = ThreadFuture<Void>();
}
}
// Cancel the previous timeout now that we have a new one. This means that changing the timeout
@ -1577,6 +1583,9 @@ void MultiVersionTransaction::reset() {
MultiVersionTransaction::~MultiVersionTransaction() {
timeoutTsav->trySendError(transaction_cancelled());
if (currentTimeout.isValid()) {
currentTimeout.cancel();
}
}
bool MultiVersionTransaction::isValid() {

View File

@ -2710,7 +2710,7 @@ bool DatabaseContext::isCurrentGrvProxy(UID proxyId) const {
if (proxy.id() == proxyId)
return true;
}
CODE_PROBE(true, "stale GRV proxy detected");
CODE_PROBE(true, "stale GRV proxy detected", probe::decoration::rare);
return false;
}
@ -3734,7 +3734,7 @@ ACTOR Future<Version> watchValue(Database cx, Reference<const WatchParameters> p
} else if (e.code() == error_code_watch_cancelled || e.code() == error_code_process_behind) {
// clang-format off
CODE_PROBE(e.code() == error_code_watch_cancelled, "Too many watches on the storage server, poll for changes instead");
CODE_PROBE(e.code() == error_code_process_behind, "The storage servers are all behind");
CODE_PROBE(e.code() == error_code_process_behind, "The storage servers are all behind", probe::decoration::rare);
// clang-format on
wait(delay(CLIENT_KNOBS->WATCH_POLLING_TIME, parameters->taskID));
} else if (e.code() == error_code_timed_out) { // The storage server occasionally times out watches in case
@ -5660,18 +5660,18 @@ Future<Void> Transaction::getRangeStream(PromiseStream<RangeResult>& results,
KeySelector b = begin;
if (b.orEqual) {
CODE_PROBE(true, "Native stream begin orEqual==true");
CODE_PROBE(true, "Native stream begin orEqual==true", probe::decoration::rare);
b.removeOrEqual(b.arena());
}
KeySelector e = end;
if (e.orEqual) {
CODE_PROBE(true, "Native stream end orEqual==true");
CODE_PROBE(true, "Native stream end orEqual==true", probe::decoration::rare);
e.removeOrEqual(e.arena());
}
if (b.offset >= e.offset && b.getKey() >= e.getKey()) {
CODE_PROBE(true, "Native stream range inverted");
CODE_PROBE(true, "Native stream range inverted", probe::decoration::rare);
results.sendError(end_of_stream());
return Void();
}
@ -9754,7 +9754,7 @@ ACTOR Future<Void> getChangeFeedStreamActor(Reference<DatabaseContext> db,
results->storageData.clear();
if (e.code() == error_code_change_feed_popped) {
++db->feedNonRetriableErrors;
CODE_PROBE(true, "getChangeFeedStreamActor got popped");
CODE_PROBE(true, "getChangeFeedStreamActor got popped", probe::decoration::rare);
results->mutations.sendError(e);
results->refresh.sendError(e);
} else {

View File

@ -1214,7 +1214,7 @@ public:
// isolation support. But it is not default and is rarely used. So we disallow it until we have thorough test
// coverage for it.)
if (snapshot) {
CODE_PROBE(true, "getMappedRange not supported for snapshot.");
CODE_PROBE(true, "getMappedRange not supported for snapshot.", probe::decoration::rare);
throw unsupported_operation();
}
// For now, getMappedRange requires read-your-writes being NOT disabled. But the support of RYW is limited
@ -1223,7 +1223,7 @@ public:
// which returns the written value transparently. In another word, it makes sure not break RYW semantics without
// actually implementing reading from the writes.
if (ryw->options.readYourWritesDisabled) {
CODE_PROBE(true, "getMappedRange not supported for read-your-writes disabled.");
CODE_PROBE(true, "getMappedRange not supported for read-your-writes disabled.", probe::decoration::rare);
throw unsupported_operation();
}
@ -1331,6 +1331,11 @@ public:
ACTOR static void simulateTimeoutInFlightCommit(ReadYourWritesTransaction* ryw_) {
state Reference<ReadYourWritesTransaction> ryw = Reference<ReadYourWritesTransaction>::addRef(ryw_);
ASSERT(ryw->options.timeoutInSeconds > 0);
// An actual in-flight commit (i.e. one that's past the point where cancelling the transaction would stop it)
// would already have a read version. We need to get a read version too, otherwise committing a conflicting
// transaction may not ensure this transaction is no longer in-flight, since this transaction could get a read
// version _after_.
wait(success(ryw->getReadVersion()));
if (!ryw->resetPromise.isSet())
ryw->resetPromise.sendError(transaction_timed_out());
wait(delay(deterministicRandom()->random01() * 5));
@ -1649,7 +1654,7 @@ Future<RangeResult> ReadYourWritesTransaction::getRange(KeySelector begin,
// This optimization prevents nullptr operations from being added to the conflict range
if (limits.isReached()) {
CODE_PROBE(true, "RYW range read limit 0");
CODE_PROBE(true, "RYW range read limit 0", probe::decoration::rare);
return RangeResult();
}
@ -1663,7 +1668,7 @@ Future<RangeResult> ReadYourWritesTransaction::getRange(KeySelector begin,
end.removeOrEqual(end.arena());
if (begin.offset >= end.offset && begin.getKey() >= end.getKey()) {
CODE_PROBE(true, "RYW range inverted");
CODE_PROBE(true, "RYW range inverted", probe::decoration::rare);
return RangeResult();
}

View File

@ -116,7 +116,7 @@ public:
static Future<Void> deleteFile(std::string filename, bool mustBeDurable) {
::deleteFile(filename);
if (mustBeDurable) {
CODE_PROBE(true, "deleteFile and fsync parent dir");
CODE_PROBE(true, "deleteFile and fsync parent dir", probe::decoration::rare);
return async_fsync_parent(filename);
} else
return Void();

View File

@ -360,7 +360,7 @@ public:
//(e.g. to simulate power failure)
Future<Void> kill() {
TraceEvent("AsyncFileNonDurable_Kill", id).detail("Filename", filename);
CODE_PROBE(true, "AsyncFileNonDurable was killed");
CODE_PROBE(true, "AsyncFileNonDurable was killed", probe::decoration::rare);
return sync(this, false);
}
@ -404,7 +404,7 @@ private:
TraceEvent("AsyncFileNonDurable_KilledFileOperation", self->id)
.detail("In", context)
.detail("Filename", self->filename);
CODE_PROBE(true, "AsyncFileNonDurable operation killed");
CODE_PROBE(true, "AsyncFileNonDurable operation killed", probe::decoration::rare);
throw io_error().asInjectedFault();
}
@ -603,13 +603,13 @@ private:
.detail("HasGarbage", garbage)
.detail("Side", side)
.detail("Filename", self->filename);
CODE_PROBE(true, "AsyncFileNonDurable bad write");
CODE_PROBE(true, "AsyncFileNonDurable bad write", probe::decoration::rare);
} else {
TraceEvent("AsyncFileNonDurable_DroppedWrite", self->id)
.detail("Offset", offset + writeOffset + pageOffset)
.detail("Length", sectorLength)
.detail("Filename", self->filename);
CODE_PROBE(true, "AsyncFileNonDurable dropped write");
CODE_PROBE(true, "AsyncFileNonDurable dropped write", probe::decoration::rare);
}
pageOffset += sectorLength;
@ -689,7 +689,7 @@ private:
wait(self->file->truncate(size));
else {
TraceEvent("AsyncFileNonDurable_DroppedTruncate", self->id).detail("Size", size);
CODE_PROBE(true, "AsyncFileNonDurable dropped truncate");
CODE_PROBE(true, "AsyncFileNonDurable dropped truncate", probe::decoration::rare);
}
return Void();
@ -753,7 +753,7 @@ private:
// temporary file and then renamed to the correct location once sync is called. By not calling sync, we
// simulate a failure to fsync the directory storing the file
if (self->hasBeenSynced && writeDurable && deterministicRandom()->random01() < 0.5) {
CODE_PROBE(true, "AsyncFileNonDurable kill was durable and synced");
CODE_PROBE(true, "AsyncFileNonDurable kill was durable and synced", probe::decoration::rare);
wait(success(errorOr(self->file->sync())));
}

View File

@ -1654,7 +1654,8 @@ public:
CODE_PROBE(kt == FailDisk,
"Simulated machine was killed with a failed disk",
probe::context::sim2,
probe::assert::simOnly);
probe::assert::simOnly,
probe::decoration::rare);
if (kt == KillInstantly) {
TraceEvent(SevWarn, "FailMachine")
@ -2126,8 +2127,11 @@ public:
.detail("KillTypeMin", ktMin)
.detail("KilledDC", kt == ktMin);
CODE_PROBE(
kt != ktMin, "DataCenter kill was rejected by killMachine", probe::context::sim2, probe::assert::simOnly);
CODE_PROBE(kt != ktMin,
"DataCenter kill was rejected by killMachine",
probe::context::sim2,
probe::assert::simOnly,
probe::decoration::rare);
CODE_PROBE((kt == ktMin) && (kt == RebootAndDelete),
"Datacenter kill Resulted in a reboot and delete",
probe::context::sim2,

View File

@ -644,7 +644,7 @@ private:
if (!initialCommit)
txnStateStore->set(KeyValueRef(m.param1, m.param2));
confChange = true;
CODE_PROBE(true, "Setting version epoch");
CODE_PROBE(true, "Setting version epoch", probe::decoration::rare);
}
void checkSetWriteRecoverKey(MutationRef m) {

View File

@ -1558,7 +1558,7 @@ ACTOR Future<Void> reevaluateInitialSplit(Reference<BlobManagerData> bmData,
ForcedPurgeState purgeState = wait(getForcePurgedState(&tr->getTransaction(), granuleRange));
if (purgeState != ForcedPurgeState::NonePurged) {
CODE_PROBE(true, "Initial Split Re-evaluate stopped because of force purge");
CODE_PROBE(true, "Initial Split Re-evaluate stopped because of force purge", probe::decoration::rare);
TraceEvent("GranuleSplitReEvalCancelledForcePurge", bmData->id)
.detail("Epoch", bmData->epoch)
.detail("GranuleRange", granuleRange);
@ -1579,7 +1579,7 @@ ACTOR Future<Void> reevaluateInitialSplit(Reference<BlobManagerData> bmData,
KeyRange range = blobGranuleFileKeyRangeFor(granuleID);
RangeResult granuleFiles = wait(tr->getRange(range, 1));
if (!granuleFiles.empty()) {
CODE_PROBE(true, "split too big was eventually solved by another worker");
CODE_PROBE(true, "split too big was eventually solved by another worker", probe::decoration::rare);
if (BM_DEBUG) {
fmt::print("BM {0} re-evaluating initial split [{1} - {2}) too big: solved by another worker\n",
bmData->epoch,
@ -1637,7 +1637,7 @@ ACTOR Future<Void> reevaluateInitialSplit(Reference<BlobManagerData> bmData,
RangeResult existingRanges = wait(
krmGetRanges(tr, blobGranuleMappingKeys.begin, granuleRange, 3, GetRangeLimits::BYTE_LIMIT_UNLIMITED));
if (existingRanges.size() > 2 || existingRanges.more) {
CODE_PROBE(true, "split too big was already re-split");
CODE_PROBE(true, "split too big was already re-split", probe::decoration::rare);
if (BM_DEBUG) {
fmt::print("BM {0} re-evaluating initial split [{1} - {2}) too big: already split\n",
bmData->epoch,
@ -2077,7 +2077,7 @@ ACTOR Future<bool> forceGranuleFlush(Reference<BlobManagerData> bmData,
try {
ForcedPurgeState purgeState = wait(getForcePurgedState(&tr, keyRange));
if (purgeState != ForcedPurgeState::NonePurged) {
CODE_PROBE(true, "Granule flush stopped because of force purge");
CODE_PROBE(true, "Granule flush stopped because of force purge", probe::decoration::rare);
TraceEvent("GranuleFlushCancelledForcePurge", bmData->id)
.detail("Epoch", bmData->epoch)
.detail("KeyRange", keyRange);
@ -2225,7 +2225,7 @@ ACTOR Future<std::pair<UID, Version>> persistMergeGranulesStart(Reference<BlobMa
ForcedPurgeState purgeState = wait(getForcePurgedState(&tr->getTransaction(), mergeRange));
if (purgeState != ForcedPurgeState::NonePurged) {
CODE_PROBE(true, "Merge start stopped because of force purge");
CODE_PROBE(true, "Merge start stopped because of force purge", probe::decoration::rare);
TraceEvent("GranuleMergeStartCancelledForcePurge", bmData->id)
.detail("Epoch", bmData->epoch)
.detail("GranuleRange", mergeRange);
@ -2311,7 +2311,7 @@ ACTOR Future<bool> persistMergeGranulesDone(Reference<BlobManagerData> bmData,
}
}
if (tmpWorkerId == UID()) {
CODE_PROBE(true, "All workers dead right now");
CODE_PROBE(true, "All workers dead right now", probe::decoration::rare);
while (bmData->workersById.empty()) {
wait(bmData->recruitingStream.onChange() || bmData->foundBlobWorkers.getFuture());
}
@ -2564,7 +2564,9 @@ static void attemptStartMerge(Reference<BlobManagerData> bmData,
auto reCheckMergeCandidates = bmData->mergeCandidates.intersectingRanges(mergeRange);
for (auto it : reCheckMergeCandidates) {
if (!it->cvalue().mergeEligible()) {
CODE_PROBE(true, " granule no longer merge candidate after checking metrics, aborting merge");
CODE_PROBE(true,
"granule no longer merge candidate after checking metrics, aborting merge",
probe::decoration::rare);
return;
}
}

View File

@ -659,7 +659,7 @@ ACTOR Future<Void> updateGranuleSplitState(Transaction* tr,
CODE_PROBE(true, "Granule split stopping change feed");
}
} else if (BW_DEBUG) {
CODE_PROBE(true, "Out of order granule split state updates ignored");
CODE_PROBE(true, "Out of order granule split state updates ignored", probe::decoration::rare);
fmt::print("Ignoring granule {0} split state from {1} {2} -> {3}\n",
currentGranuleID.toString(),
parentGranuleID.toString(),
@ -2650,7 +2650,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
// queue too many files in parallel, and slow down change feed consuming to let file writing
// catch up
CODE_PROBE(true, "Granule processing long tail of old change feed");
CODE_PROBE(true, "Granule processing long tail of old change feed", probe::decoration::rare);
if (inFlightFiles.size() > 10 && inFlightFiles.front().version <= metadata->knownCommittedVersion) {
if (BW_DEBUG) {
fmt::print("[{0} - {1}) Waiting on delta file b/c old change feed\n",
@ -2731,7 +2731,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
// FIXME: better way to fix this?
bool isForcePurging = wait(checkFileNotFoundForcePurgeRace(bwData, metadata->keyRange));
if (isForcePurging) {
CODE_PROBE(true, "Granule got file not found from force purge");
CODE_PROBE(true, "Granule got file not found from force purge", probe::decoration::rare);
TraceEvent("GranuleFileUpdaterFileNotFoundForcePurge", bwData->id)
.error(e2)
.detail("KeyRange", metadata->keyRange)
@ -3550,7 +3550,7 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
// We can get change feed cancelled from whenAtLeast. This means the change feed may
// retry, or may be cancelled. Wait a bit and try again to see
if (e.code() == error_code_change_feed_popped) {
CODE_PROBE(true, "Change feed popped while read waiting");
CODE_PROBE(true, "Change feed popped while read waiting", probe::decoration::rare);
throw wrong_shard_server();
}
if (e.code() != error_code_change_feed_cancelled) {
@ -3574,7 +3574,9 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
Version emptyVersion = metadata->activeCFData.get()->popVersion - 1;
if (req.readVersion > metadata->durableDeltaVersion.get() &&
emptyVersion > metadata->bufferedDeltaVersion) {
CODE_PROBE(true, "feed popped for read but granule updater didn't notice yet");
CODE_PROBE(true,
"feed popped for read but granule updater didn't notice yet",
probe::decoration::rare);
// FIXME: could try to cancel the actor here somehow, but it should find out eventually
throw wrong_shard_server();
}
@ -3789,7 +3791,7 @@ ACTOR Future<Void> handleBlobGranuleFileRequest(Reference<BlobWorkerData> bwData
when(wait(doBlobGranuleFileRequest(bwData, req))) {}
when(wait(delay(SERVER_KNOBS->BLOB_WORKER_REQUEST_TIMEOUT))) {
if (!req.reply.isSet()) {
CODE_PROBE(true, "Blob Worker request timeout hit");
CODE_PROBE(true, "Blob Worker request timeout hit", probe::decoration::rare);
if (BW_DEBUG) {
fmt::print("BW {0} request [{1} - {2}) @ {3} timed out, sending WSS\n",
bwData->id.toString().substr(0, 5),
@ -3878,7 +3880,7 @@ ACTOR Future<GranuleStartState> openGranule(Reference<BlobWorkerData> bwData, As
// if it's the first snapshot of a new granule, history won't be present
if (info.history.present()) {
if (info.granuleID != info.history.get().value.granuleID) {
CODE_PROBE(true, "Blob Worker re-opening granule after merge+resplit");
CODE_PROBE(true, "Blob Worker re-opening granule after merge+resplit", probe::decoration::rare);
// The only case this can happen is when a granule was merged into a larger granule,
// then split back out to the same one. Validate that this is a new granule that was
// split previously. Just check lock based on epoch, since seqno is intentionally

View File

@ -368,7 +368,9 @@ ACTOR Future<Void> clusterWatchDatabase(ClusterControllerData* cluster,
CODE_PROBE(err.code() == error_code_grv_proxy_failed, "Terminated due to GRV proxy failure");
CODE_PROBE(err.code() == error_code_resolver_failed, "Terminated due to resolver failure");
CODE_PROBE(err.code() == error_code_backup_worker_failed, "Terminated due to backup worker failure");
CODE_PROBE(err.code() == error_code_operation_failed, "Terminated due to failed operation");
CODE_PROBE(err.code() == error_code_operation_failed,
"Terminated due to failed operation",
probe::decoration::rare);
CODE_PROBE(err.code() == error_code_restart_cluster_controller,
"Terminated due to cluster-controller restart.");
@ -1308,7 +1310,7 @@ ACTOR Future<Void> registerWorker(RegisterWorkerRequest req,
}
checkOutstandingRequests(self);
} else {
CODE_PROBE(true, "Received an old worker registration request.");
CODE_PROBE(true, "Received an old worker registration request.", probe::decoration::rare);
}
// For each singleton
@ -2621,23 +2623,31 @@ ACTOR Future<Void> workerHealthMonitor(ClusterControllerData* self) {
self->degradationInfo = self->getDegradationInfo();
// Compare `self->degradedServers` with `self->excludedDegradedServers` and remove those that have
// Compare `self->degradationInfo` with `self->excludedDegradedServers` and remove those that have
// recovered.
for (auto it = self->excludedDegradedServers.begin(); it != self->excludedDegradedServers.end();) {
if (self->degradationInfo.degradedServers.find(*it) == self->degradationInfo.degradedServers.end()) {
if (self->degradationInfo.degradedServers.find(*it) == self->degradationInfo.degradedServers.end() &&
self->degradationInfo.disconnectedServers.find(*it) ==
self->degradationInfo.disconnectedServers.end()) {
self->excludedDegradedServers.erase(it++);
} else {
++it;
}
}
if (!self->degradationInfo.degradedServers.empty() || self->degradationInfo.degradedSatellite) {
if (!self->degradationInfo.degradedServers.empty() || !self->degradationInfo.disconnectedServers.empty() ||
self->degradationInfo.degradedSatellite) {
std::string degradedServerString;
for (const auto& server : self->degradationInfo.degradedServers) {
degradedServerString += server.toString() + " ";
}
std::string disconnectedServerString;
for (const auto& server : self->degradationInfo.disconnectedServers) {
disconnectedServerString += server.toString() + " ";
}
TraceEvent("ClusterControllerHealthMonitor")
.detail("DegradedServers", degradedServerString)
.detail("DisconnectedServers", disconnectedServerString)
.detail("DegradedSatellite", self->degradationInfo.degradedSatellite);
// Check if the cluster controller should trigger a recovery to exclude any degraded servers from
@ -2647,6 +2657,8 @@ ACTOR Future<Void> workerHealthMonitor(ClusterControllerData* self) {
if (self->recentRecoveryCountDueToHealth() < SERVER_KNOBS->CC_MAX_HEALTH_RECOVERY_COUNT) {
self->recentHealthTriggeredRecoveryTime.push(now());
self->excludedDegradedServers = self->degradationInfo.degradedServers;
self->excludedDegradedServers.insert(self->degradationInfo.disconnectedServers.begin(),
self->degradationInfo.disconnectedServers.end());
TraceEvent("DegradedServerDetectedAndTriggerRecovery")
.detail("RecentRecoveryCountDueToHealth", self->recentRecoveryCountDueToHealth());
self->db.forceMasterFailure.trigger();
@ -2997,6 +3009,8 @@ TEST_CASE("/fdbserver/clustercontroller/updateWorkerHealth") {
req.address = workerAddress;
req.degradedPeers.push_back(badPeer1);
req.degradedPeers.push_back(badPeer2);
req.disconnectedPeers.push_back(badPeer1);
req.disconnectedPeers.push_back(badPeer2);
data.updateWorkerHealth(req);
ASSERT(data.workerHealth.find(workerAddress) != data.workerHealth.end());
auto& health = data.workerHealth[workerAddress];
@ -3005,6 +3019,11 @@ TEST_CASE("/fdbserver/clustercontroller/updateWorkerHealth") {
ASSERT_EQ(health.degradedPeers[badPeer1].startTime, health.degradedPeers[badPeer1].lastRefreshTime);
ASSERT(health.degradedPeers.find(badPeer2) != health.degradedPeers.end());
ASSERT_EQ(health.degradedPeers[badPeer2].startTime, health.degradedPeers[badPeer2].lastRefreshTime);
ASSERT_EQ(health.disconnectedPeers.size(), 2);
ASSERT(health.disconnectedPeers.find(badPeer1) != health.disconnectedPeers.end());
ASSERT_EQ(health.disconnectedPeers[badPeer1].startTime, health.disconnectedPeers[badPeer1].lastRefreshTime);
ASSERT(health.disconnectedPeers.find(badPeer2) != health.disconnectedPeers.end());
ASSERT_EQ(health.disconnectedPeers[badPeer2].startTime, health.disconnectedPeers[badPeer2].lastRefreshTime);
}
// Create a `UpdateWorkerHealthRequest` with two bad peers, one from the previous test and a new one.
@ -3019,6 +3038,8 @@ TEST_CASE("/fdbserver/clustercontroller/updateWorkerHealth") {
req.address = workerAddress;
req.degradedPeers.push_back(badPeer1);
req.degradedPeers.push_back(badPeer3);
req.disconnectedPeers.push_back(badPeer1);
req.disconnectedPeers.push_back(badPeer3);
data.updateWorkerHealth(req);
ASSERT(data.workerHealth.find(workerAddress) != data.workerHealth.end());
auto& health = data.workerHealth[workerAddress];
@ -3030,6 +3051,15 @@ TEST_CASE("/fdbserver/clustercontroller/updateWorkerHealth") {
ASSERT_EQ(health.degradedPeers[badPeer2].startTime, health.degradedPeers[badPeer1].startTime);
ASSERT(health.degradedPeers.find(badPeer3) != health.degradedPeers.end());
ASSERT_EQ(health.degradedPeers[badPeer3].startTime, health.degradedPeers[badPeer3].lastRefreshTime);
ASSERT_EQ(health.disconnectedPeers.size(), 3);
ASSERT(health.disconnectedPeers.find(badPeer1) != health.disconnectedPeers.end());
ASSERT_LT(health.disconnectedPeers[badPeer1].startTime, health.disconnectedPeers[badPeer1].lastRefreshTime);
ASSERT(health.disconnectedPeers.find(badPeer2) != health.disconnectedPeers.end());
ASSERT_EQ(health.disconnectedPeers[badPeer2].startTime, health.disconnectedPeers[badPeer2].lastRefreshTime);
ASSERT_EQ(health.disconnectedPeers[badPeer2].startTime, health.disconnectedPeers[badPeer1].startTime);
ASSERT(health.disconnectedPeers.find(badPeer3) != health.disconnectedPeers.end());
ASSERT_EQ(health.disconnectedPeers[badPeer3].startTime, health.disconnectedPeers[badPeer3].lastRefreshTime);
previousStartTime = health.degradedPeers[badPeer3].startTime;
previousRefreshTime = health.degradedPeers[badPeer3].lastRefreshTime;
}
@ -3047,20 +3077,10 @@ TEST_CASE("/fdbserver/clustercontroller/updateWorkerHealth") {
ASSERT(health.degradedPeers.find(badPeer3) != health.degradedPeers.end());
ASSERT_EQ(health.degradedPeers[badPeer3].startTime, previousStartTime);
ASSERT_EQ(health.degradedPeers[badPeer3].lastRefreshTime, previousRefreshTime);
}
// Create a `UpdateWorkerHealthRequest` with disconnected peers, which should update the bad peer's lastRefreshTime.
{
wait(delay(0.001));
UpdateWorkerHealthRequest req;
req.address = workerAddress;
req.disconnectedPeers.push_back(badPeer3);
data.updateWorkerHealth(req);
ASSERT(data.workerHealth.find(workerAddress) != data.workerHealth.end());
auto& health = data.workerHealth[workerAddress];
ASSERT_EQ(health.degradedPeers.size(), 3);
ASSERT(health.degradedPeers.find(badPeer3) != health.degradedPeers.end());
ASSERT_LT(health.degradedPeers[badPeer3].startTime, health.degradedPeers[badPeer3].lastRefreshTime);
ASSERT_EQ(health.disconnectedPeers.size(), 3);
ASSERT(health.disconnectedPeers.find(badPeer3) != health.disconnectedPeers.end());
ASSERT_EQ(health.disconnectedPeers[badPeer3].startTime, previousStartTime);
ASSERT_EQ(health.disconnectedPeers[badPeer3].lastRefreshTime, previousRefreshTime);
}
return Void();
@ -3076,11 +3096,14 @@ TEST_CASE("/fdbserver/clustercontroller/updateRecoveredWorkers") {
NetworkAddress worker2(IPAddress(0x11111111), 1);
NetworkAddress badPeer1(IPAddress(0x02020202), 1);
NetworkAddress badPeer2(IPAddress(0x03030303), 1);
NetworkAddress disconnectedPeer3(IPAddress(0x04040404), 1);
// Create following test scenario:
// worker1 -> badPeer1 active
// worker1 -> badPeer2 recovered
// worker1 -> disconnectedPeer3 active
// worker2 -> badPeer2 recovered
// worker2 -> disconnectedPeer3 recovered
data.workerHealth[worker1].degradedPeers[badPeer1] = {
now() - SERVER_KNOBS->CC_DEGRADED_LINK_EXPIRATION_INTERVAL - 1, now()
};
@ -3088,16 +3111,25 @@ TEST_CASE("/fdbserver/clustercontroller/updateRecoveredWorkers") {
now() - SERVER_KNOBS->CC_DEGRADED_LINK_EXPIRATION_INTERVAL - 1,
now() - SERVER_KNOBS->CC_DEGRADED_LINK_EXPIRATION_INTERVAL - 1
};
data.workerHealth[worker1].degradedPeers[disconnectedPeer3] = {
now() - SERVER_KNOBS->CC_DEGRADED_LINK_EXPIRATION_INTERVAL - 1, now()
};
data.workerHealth[worker2].degradedPeers[badPeer2] = {
now() - SERVER_KNOBS->CC_DEGRADED_LINK_EXPIRATION_INTERVAL - 1,
now() - SERVER_KNOBS->CC_DEGRADED_LINK_EXPIRATION_INTERVAL - 1
};
data.workerHealth[worker2].degradedPeers[disconnectedPeer3] = {
now() - SERVER_KNOBS->CC_DEGRADED_LINK_EXPIRATION_INTERVAL - 1,
now() - SERVER_KNOBS->CC_DEGRADED_LINK_EXPIRATION_INTERVAL - 1
};
data.updateRecoveredWorkers();
ASSERT_EQ(data.workerHealth.size(), 1);
ASSERT(data.workerHealth.find(worker1) != data.workerHealth.end());
ASSERT(data.workerHealth[worker1].degradedPeers.find(badPeer1) != data.workerHealth[worker1].degradedPeers.end());
ASSERT(data.workerHealth[worker1].degradedPeers.find(badPeer2) == data.workerHealth[worker1].degradedPeers.end());
ASSERT(data.workerHealth[worker1].degradedPeers.find(disconnectedPeer3) !=
data.workerHealth[worker1].degradedPeers.end());
ASSERT(data.workerHealth.find(worker2) == data.workerHealth.end());
return Void();
@ -3119,6 +3151,7 @@ TEST_CASE("/fdbserver/clustercontroller/getDegradationInfo") {
// cluster controller.
{
data.workerHealth[worker].degradedPeers[badPeer1] = { now(), now() };
data.workerHealth[worker].disconnectedPeers[badPeer2] = { now(), now() };
ASSERT(data.getDegradationInfo().degradedServers.empty());
data.workerHealth.clear();
}
@ -3131,6 +3164,19 @@ TEST_CASE("/fdbserver/clustercontroller/getDegradationInfo") {
auto degradationInfo = data.getDegradationInfo();
ASSERT(degradationInfo.degradedServers.size() == 1);
ASSERT(degradationInfo.degradedServers.find(badPeer1) != degradationInfo.degradedServers.end());
ASSERT(degradationInfo.disconnectedServers.empty());
data.workerHealth.clear();
}
// Test that when there is only one reported disconnected link, getDegradationInfo can return correct
// degraded server.
{
data.workerHealth[worker].disconnectedPeers[badPeer1] = { now() - SERVER_KNOBS->CC_MIN_DEGRADATION_INTERVAL - 1,
now() };
auto degradationInfo = data.getDegradationInfo();
ASSERT(degradationInfo.disconnectedServers.size() == 1);
ASSERT(degradationInfo.disconnectedServers.find(badPeer1) != degradationInfo.disconnectedServers.end());
ASSERT(degradationInfo.degradedServers.empty());
data.workerHealth.clear();
}
@ -3140,16 +3186,25 @@ TEST_CASE("/fdbserver/clustercontroller/getDegradationInfo") {
now() };
data.workerHealth[badPeer1].degradedPeers[worker] = { now() - SERVER_KNOBS->CC_MIN_DEGRADATION_INTERVAL - 1,
now() };
data.workerHealth[worker].disconnectedPeers[badPeer2] = { now() - SERVER_KNOBS->CC_MIN_DEGRADATION_INTERVAL - 1,
now() };
data.workerHealth[badPeer2].disconnectedPeers[worker] = { now() - SERVER_KNOBS->CC_MIN_DEGRADATION_INTERVAL - 1,
now() };
auto degradationInfo = data.getDegradationInfo();
ASSERT(degradationInfo.degradedServers.size() == 1);
ASSERT(degradationInfo.degradedServers.find(worker) != degradationInfo.degradedServers.end() ||
degradationInfo.degradedServers.find(badPeer1) != degradationInfo.degradedServers.end());
ASSERT(degradationInfo.disconnectedServers.size() == 1);
ASSERT(degradationInfo.disconnectedServers.find(worker) != degradationInfo.disconnectedServers.end() ||
degradationInfo.disconnectedServers.find(badPeer2) != degradationInfo.disconnectedServers.end());
data.workerHealth.clear();
}
// Test that if B complains A and C complains A, A is selected as degraded server instead of B or C.
{
ASSERT(SERVER_KNOBS->CC_DEGRADED_PEER_DEGREE_TO_EXCLUDE < 4);
// test for both degraded peers and disconnected peers.
data.workerHealth[worker].degradedPeers[badPeer1] = { now() - SERVER_KNOBS->CC_MIN_DEGRADATION_INTERVAL - 1,
now() };
data.workerHealth[badPeer1].degradedPeers[worker] = { now() - SERVER_KNOBS->CC_MIN_DEGRADATION_INTERVAL - 1,
@ -3158,9 +3213,19 @@ TEST_CASE("/fdbserver/clustercontroller/getDegradationInfo") {
now() };
data.workerHealth[badPeer2].degradedPeers[worker] = { now() - SERVER_KNOBS->CC_MIN_DEGRADATION_INTERVAL - 1,
now() };
data.workerHealth[worker].disconnectedPeers[badPeer3] = { now() - SERVER_KNOBS->CC_MIN_DEGRADATION_INTERVAL - 1,
now() };
data.workerHealth[badPeer3].disconnectedPeers[worker] = { now() - SERVER_KNOBS->CC_MIN_DEGRADATION_INTERVAL - 1,
now() };
data.workerHealth[worker].disconnectedPeers[badPeer4] = { now() - SERVER_KNOBS->CC_MIN_DEGRADATION_INTERVAL - 1,
now() };
data.workerHealth[badPeer4].disconnectedPeers[worker] = { now() - SERVER_KNOBS->CC_MIN_DEGRADATION_INTERVAL - 1,
now() };
auto degradationInfo = data.getDegradationInfo();
ASSERT(degradationInfo.degradedServers.size() == 1);
ASSERT(degradationInfo.degradedServers.find(worker) != degradationInfo.degradedServers.end());
ASSERT(degradationInfo.disconnectedServers.size() == 1);
ASSERT(degradationInfo.disconnectedServers.find(worker) != degradationInfo.disconnectedServers.end());
data.workerHealth.clear();
}
@ -3179,6 +3244,23 @@ TEST_CASE("/fdbserver/clustercontroller/getDegradationInfo") {
data.workerHealth.clear();
}
// Test that CC_DEGRADED_PEER_DEGREE_TO_EXCLUDE doesn't affect disconnectedServers calculation.
{
ASSERT(SERVER_KNOBS->CC_DEGRADED_PEER_DEGREE_TO_EXCLUDE < 4);
data.workerHealth[badPeer1].disconnectedPeers[worker] = { now() - SERVER_KNOBS->CC_MIN_DEGRADATION_INTERVAL - 1,
now() };
data.workerHealth[badPeer2].disconnectedPeers[worker] = { now() - SERVER_KNOBS->CC_MIN_DEGRADATION_INTERVAL - 1,
now() };
data.workerHealth[badPeer3].disconnectedPeers[worker] = { now() - SERVER_KNOBS->CC_MIN_DEGRADATION_INTERVAL - 1,
now() };
data.workerHealth[badPeer4].disconnectedPeers[worker] = { now() - SERVER_KNOBS->CC_MIN_DEGRADATION_INTERVAL - 1,
now() };
ASSERT(data.getDegradationInfo().disconnectedServers.size() == 1);
ASSERT(data.getDegradationInfo().disconnectedServers.find(worker) !=
data.getDegradationInfo().disconnectedServers.end());
data.workerHealth.clear();
}
// Test that if the degradation is reported both ways between A and other 4 servers, no degraded server is
// returned.
{
@ -3300,40 +3382,65 @@ TEST_CASE("/fdbserver/clustercontroller/shouldTriggerRecoveryDueToDegradedServer
data.degradationInfo.degradedServers.insert(master);
ASSERT(data.shouldTriggerRecoveryDueToDegradedServers());
data.degradationInfo.degradedServers.clear();
data.degradationInfo.disconnectedServers.insert(master);
ASSERT(data.shouldTriggerRecoveryDueToDegradedServers());
data.degradationInfo.disconnectedServers.clear();
// Trigger recovery when primary TLog is degraded.
data.degradationInfo.degradedServers.insert(tlog);
ASSERT(data.shouldTriggerRecoveryDueToDegradedServers());
data.degradationInfo.degradedServers.clear();
data.degradationInfo.disconnectedServers.insert(tlog);
ASSERT(data.shouldTriggerRecoveryDueToDegradedServers());
data.degradationInfo.disconnectedServers.clear();
// No recovery when satellite Tlog is degraded.
data.degradationInfo.degradedServers.insert(satelliteTlog);
ASSERT(!data.shouldTriggerRecoveryDueToDegradedServers());
data.degradationInfo.degradedServers.clear();
data.degradationInfo.disconnectedServers.insert(satelliteTlog);
ASSERT(!data.shouldTriggerRecoveryDueToDegradedServers());
data.degradationInfo.disconnectedServers.clear();
// No recovery when remote tlog is degraded.
data.degradationInfo.degradedServers.insert(remoteTlog);
ASSERT(!data.shouldTriggerRecoveryDueToDegradedServers());
data.degradationInfo.degradedServers.clear();
data.degradationInfo.disconnectedServers.insert(remoteTlog);
ASSERT(!data.shouldTriggerRecoveryDueToDegradedServers());
data.degradationInfo.disconnectedServers.clear();
// No recovery when log router is degraded.
data.degradationInfo.degradedServers.insert(logRouter);
ASSERT(!data.shouldTriggerRecoveryDueToDegradedServers());
data.degradationInfo.degradedServers.clear();
data.degradationInfo.disconnectedServers.insert(logRouter);
ASSERT(!data.shouldTriggerRecoveryDueToDegradedServers());
data.degradationInfo.disconnectedServers.clear();
// No recovery when backup worker is degraded.
data.degradationInfo.degradedServers.insert(backup);
ASSERT(!data.shouldTriggerRecoveryDueToDegradedServers());
data.degradationInfo.degradedServers.clear();
data.degradationInfo.disconnectedServers.insert(backup);
ASSERT(!data.shouldTriggerRecoveryDueToDegradedServers());
data.degradationInfo.disconnectedServers.clear();
// Trigger recovery when proxy is degraded.
data.degradationInfo.degradedServers.insert(proxy);
ASSERT(data.shouldTriggerRecoveryDueToDegradedServers());
data.degradationInfo.degradedServers.clear();
data.degradationInfo.disconnectedServers.insert(proxy);
ASSERT(data.shouldTriggerRecoveryDueToDegradedServers());
data.degradationInfo.disconnectedServers.clear();
// Trigger recovery when resolver is degraded.
data.degradationInfo.degradedServers.insert(resolver);
ASSERT(data.shouldTriggerRecoveryDueToDegradedServers());
data.degradationInfo.degradedServers.clear();
data.degradationInfo.disconnectedServers.insert(resolver);
ASSERT(data.shouldTriggerRecoveryDueToDegradedServers());
data.degradationInfo.disconnectedServers.clear();
return Void();
}
@ -3414,6 +3521,9 @@ TEST_CASE("/fdbserver/clustercontroller/shouldTriggerFailoverDueToDegradedServer
data.degradationInfo.degradedServers.insert(master);
ASSERT(!data.shouldTriggerFailoverDueToDegradedServers());
data.degradationInfo.degradedServers.clear();
data.degradationInfo.disconnectedServers.insert(master);
ASSERT(!data.shouldTriggerFailoverDueToDegradedServers());
data.degradationInfo.disconnectedServers.clear();
// Trigger failover when enough servers in the txn system are degraded.
data.degradationInfo.degradedServers.insert(master);
@ -3422,6 +3532,13 @@ TEST_CASE("/fdbserver/clustercontroller/shouldTriggerFailoverDueToDegradedServer
data.degradationInfo.degradedServers.insert(proxy2);
data.degradationInfo.degradedServers.insert(resolver);
ASSERT(data.shouldTriggerFailoverDueToDegradedServers());
data.degradationInfo.degradedServers.clear();
data.degradationInfo.disconnectedServers.insert(master);
data.degradationInfo.disconnectedServers.insert(tlog);
data.degradationInfo.disconnectedServers.insert(proxy);
data.degradationInfo.disconnectedServers.insert(proxy2);
data.degradationInfo.disconnectedServers.insert(resolver);
ASSERT(data.shouldTriggerFailoverDueToDegradedServers());
// No failover when usable region is 1.
data.db.config.usableRegions = 1;
@ -3432,6 +3549,9 @@ TEST_CASE("/fdbserver/clustercontroller/shouldTriggerFailoverDueToDegradedServer
data.degradationInfo.degradedServers.insert(remoteTlog);
ASSERT(!data.shouldTriggerFailoverDueToDegradedServers());
data.degradationInfo.degradedServers.clear();
data.degradationInfo.disconnectedServers.insert(remoteTlog);
ASSERT(!data.shouldTriggerFailoverDueToDegradedServers());
data.degradationInfo.disconnectedServers.clear();
// No failover when some are not from transaction system
data.degradationInfo.degradedServers.insert(NetworkAddress(IPAddress(0x13131313), 1));
@ -3452,6 +3572,9 @@ TEST_CASE("/fdbserver/clustercontroller/shouldTriggerFailoverDueToDegradedServer
data.degradationInfo.degradedServers.insert(remoteTlog);
ASSERT(!data.shouldTriggerFailoverDueToDegradedServers());
data.degradationInfo.degradedServers.clear();
data.degradationInfo.disconnectedServers.insert(remoteTlog);
ASSERT(!data.shouldTriggerFailoverDueToDegradedServers());
data.degradationInfo.disconnectedServers.clear();
return Void();
}

View File

@ -751,7 +751,7 @@ ACTOR Future<Void> updateLogsValue(Reference<ClusterRecoveryData> self, Database
}
if (!found) {
CODE_PROBE(true, "old master attempted to change logsKey");
CODE_PROBE(true, "old master attempted to change logsKey", probe::decoration::rare);
return Void();
}
@ -830,7 +830,7 @@ ACTOR Future<Void> updateRegistration(Reference<ClusterRecoveryData> self, Refer
std::vector<UID>()));
} else {
// The cluster should enter the accepting commits phase soon, and then we will register again
CODE_PROBE(true, "cstate is updated but we aren't accepting commits yet");
CODE_PROBE(true, "cstate is updated but we aren't accepting commits yet", probe::decoration::rare);
}
}
}

View File

@ -230,7 +230,7 @@ class ConfigNodeImpl {
// Handle a very rare case where a ConfigNode loses data between
// responding with a committed version and responding to the
// subsequent get changes request.
CODE_PROBE(true, "ConfigNode data loss occurred on a minority of coordinators");
CODE_PROBE(true, "ConfigNode data loss occurred on a minority of coordinators", probe::decoration::rare);
req.reply.sendError(process_behind()); // Reuse the process_behind error
return Void();
}

View File

@ -323,7 +323,8 @@ struct MovableCoordinatedStateImpl {
Value oldQuorumState = wait(cs.read());
if (oldQuorumState != self->lastCSValue.get()) {
CODE_PROBE(true, "Quorum change aborted by concurrent write to old coordination state");
CODE_PROBE(
true, "Quorum change aborted by concurrent write to old coordination state", probe::decoration::rare);
TraceEvent("QuorumChangeAbortedByConcurrency").log();
throw coordinated_state_conflict();
}

View File

@ -1934,7 +1934,7 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueue* self,
throw error;
}
} else {
CODE_PROBE(true, "move to removed server");
CODE_PROBE(true, "move to removed server", probe::decoration::rare);
healthyDestinations.addDataInFlightToTeam(-metrics.bytes);
auto readLoad = metrics.bytesReadPerKSecond;
auto& destinationRef = healthyDestinations;

View File

@ -1375,7 +1375,7 @@ ACTOR Future<Void> fetchTopKShardMetrics(DataDistributionTracker* self, GetTopKM
when(wait(g_network->isSimulated() && BUGGIFY_WITH_PROB(0.01) ? Never()
: fetchTopKShardMetrics_impl(self, req))) {}
when(wait(delay(SERVER_KNOBS->DD_SHARD_METRICS_TIMEOUT))) {
CODE_PROBE(true, "TopK DD_SHARD_METRICS_TIMEOUT");
CODE_PROBE(true, "TopK DD_SHARD_METRICS_TIMEOUT", probe::decoration::rare);
req.reply.send(GetTopKMetricsReply());
}
}

View File

@ -1321,7 +1321,7 @@ public:
}
}
if (addedNewBadTeam && self->badTeamRemover.isReady()) {
CODE_PROBE(true, "Server locality change created bad teams");
CODE_PROBE(true, "Server locality change created bad teams", probe::decoration::rare);
self->doBuildTeams = true;
self->badTeamRemover = removeBadTeams(self);
self->addActor.send(self->badTeamRemover);
@ -5920,4 +5920,4 @@ TEST_CASE("/DataDistribution/StorageWiggler/NextIdWithTSS") {
ASSERT(now() - startTime < SERVER_KNOBS->DD_STORAGE_WIGGLE_MIN_SS_AGE_SEC + 150.0);
ASSERT(id == UID(2, 0));
return Void();
}
}

View File

@ -917,7 +917,7 @@ ACTOR Future<std::map<NetworkAddress, std::pair<WorkerInterface, std::string>>>
configuration.storageTeamSize - 1) -
storageFailures;
if (*storageFaultTolerance < 0) {
CODE_PROBE(true, "Too many failed storage servers to complete snapshot");
CODE_PROBE(true, "Too many failed storage servers to complete snapshot", probe::decoration::rare);
throw snap_storage_failed();
}
// tlogs
@ -938,7 +938,7 @@ ACTOR Future<std::map<NetworkAddress, std::pair<WorkerInterface, std::string>>>
// get coordinators
Optional<Value> coordinators = wait(tr.get(coordinatorsKey));
if (!coordinators.present()) {
CODE_PROBE(true, "Failed to read the coordinatorsKey");
CODE_PROBE(true, "Failed to read the coordinatorsKey", probe::decoration::rare);
throw operation_failed();
}
ClusterConnectionString ccs(coordinators.get().toString());
@ -1487,4 +1487,4 @@ TEST_CASE("/DataDistribution/Initialization/ResumeFromShard") {
self->shardsAffectedByTeamFailure->setCheckMode(ShardsAffectedByTeamFailure::CheckMode::ForceCheck);
self->shardsAffectedByTeamFailure->check();
return Void();
}
}

View File

@ -375,7 +375,7 @@ bool LogPushData::writeTransactionInfo(int location, uint32_t subseq) {
// parent->child.
SpanContextMessage contextMessage;
if (spanContext.isSampled()) {
CODE_PROBE(true, "Converting OTELSpanContextMessage to traced SpanContextMessage");
CODE_PROBE(true, "Converting OTELSpanContextMessage to traced SpanContextMessage", probe::decoration::rare);
contextMessage = SpanContextMessage(UID(spanContext.traceID.first(), spanContext.traceID.second()));
} else {
CODE_PROBE(true, "Converting OTELSpanContextMessage to untraced SpanContextMessage");
@ -400,4 +400,4 @@ void LogPushData::setMutations(uint32_t totalMutations, VectorRef<StringRef> mut
BinaryWriter& wr = messagesWriter[i];
wr.serializeBytes(mutations[i].substr(header));
}
}
}

View File

@ -652,7 +652,7 @@ ACTOR static Future<Void> startMoveKeys(Database occ,
// Attempt to move onto a server that isn't in serverList (removed or never added to the
// database) This can happen (why?) and is handled by the data distribution algorithm
// FIXME: Answer why this can happen?
CODE_PROBE(true, "start move keys moving to a removed server");
CODE_PROBE(true, "start move keys moving to a removed server", probe::decoration::rare);
throw move_to_removed_server();
}
}
@ -846,7 +846,7 @@ ACTOR Future<Void> checkFetchingState(Database cx,
for (int s = 0; s < serverListValues.size(); s++) {
if (!serverListValues[s].present()) {
// FIXME: Is this the right behavior? dataMovementComplete will never be sent!
CODE_PROBE(true, "check fetching state moved to removed server");
CODE_PROBE(true, "check fetching state moved to removed server", probe::decoration::rare);
throw move_to_removed_server();
}
auto si = decodeServerListValue(serverListValues[s].get());

View File

@ -98,7 +98,7 @@ TraceEvent debugTagsAndMessageEnabled(const char* context, Version version, Stri
SpanContextMessage scm;
br >> scm;
} else if (OTELSpanContextMessage::startsOTELSpanContextMessage(mutationType)) {
CODE_PROBE(true, "MutationTracking reading OTELSpanContextMessage");
CODE_PROBE(true, "MutationTracking reading OTELSpanContextMessage", probe::decoration::rare);
BinaryReader br(mutationData, AssumeVersion(rdr.protocolVersion()));
OTELSpanContextMessage scm;
br >> scm;

View File

@ -182,7 +182,7 @@ private:
Standalone<StringRef> h = wait(self->queue->readNext(sizeof(uint32_t)));
if (h.size() != sizeof(uint32_t)) {
if (h.size()) {
CODE_PROBE(true, "Zero fill within size field");
CODE_PROBE(true, "Zero fill within size field", probe::decoration::rare);
int payloadSize = 0;
memcpy(&payloadSize, h.begin(), h.size());
zeroFillSize = sizeof(uint32_t) - h.size(); // zero fill the size itself
@ -488,7 +488,7 @@ ACTOR Future<Void> tLogLock(TLogData* self, ReplyPromise<TLogLockResult> reply,
CODE_PROBE(true, "TLog stopped by recovering master");
CODE_PROBE(logData->stopped, "LogData already stopped");
CODE_PROBE(!logData->stopped, "LogData not yet stopped");
CODE_PROBE(!logData->stopped, "LogData not yet stopped", probe::decoration::rare);
TraceEvent("TLogStop", logData->logId)
.detail("Ver", stopVersion)
@ -1026,7 +1026,7 @@ Future<Void> tLogPeekMessages(PromiseType replyPromise,
}
if (sequenceData.isSet()) {
if (sequenceData.getFuture().get() != rep.end) {
CODE_PROBE(true, "tlog peek second attempt ended at a different version");
CODE_PROBE(true, "tlog peek second attempt ended at a different version", probe::decoration::rare);
replyPromise.sendError(operation_obsolete());
return Void();
}
@ -1099,7 +1099,7 @@ Future<Void> tLogPeekMessages(PromiseType replyPromise,
auto& sequenceData = trackerData.sequence_version[sequence + 1];
if (sequenceData.isSet()) {
if (sequenceData.getFuture().get() != reply.end) {
CODE_PROBE(true, "tlog peek second attempt ended at a different version (2)");
CODE_PROBE(true, "tlog peek second attempt ended at a different version (2)", probe::decoration::rare);
replyPromise.sendError(operation_obsolete());
return Void();
}
@ -1462,7 +1462,8 @@ ACTOR Future<Void> restorePersistentState(TLogData* self, LocalityData locality)
if (!fFormat.get().present()) {
RangeResult v = wait(self->persistentData->readRange(KeyRangeRef(StringRef(), "\xff"_sr), 1));
if (!v.size()) {
CODE_PROBE(true, "The DB is completely empty, so it was never initialized. Delete it.");
CODE_PROBE(
true, "The DB is completely empty, so it was never initialized. Delete it.", probe::decoration::rare);
throw worker_removed();
} else {
// This should never happen
@ -1548,7 +1549,7 @@ ACTOR Future<Void> restorePersistentState(TLogData* self, LocalityData locality)
try {
loop {
if (allRemoved.isReady()) {
CODE_PROBE(true, "all tlogs removed during queue recovery");
CODE_PROBE(true, "all tlogs removed during queue recovery", probe::decoration::rare);
throw worker_removed();
}
choose {

View File

@ -148,7 +148,7 @@ private:
Standalone<StringRef> h = wait(self->queue->readNext(sizeof(uint32_t)));
if (h.size() != sizeof(uint32_t)) {
if (h.size()) {
CODE_PROBE(true, "Zero fill within size field");
CODE_PROBE(true, "Zero fill within size field", probe::decoration::rare);
int payloadSize = 0;
memcpy(&payloadSize, h.begin(), h.size());
zeroFillSize = sizeof(uint32_t) - h.size(); // zero fill the size itself
@ -162,7 +162,7 @@ private:
Standalone<StringRef> e = wait(self->queue->readNext(payloadSize + 1));
if (e.size() != payloadSize + 1) {
CODE_PROBE(true, "Zero fill within payload");
CODE_PROBE(true, "Zero fill within payload", probe::decoration::rare);
zeroFillSize = payloadSize + 1 - e.size();
break;
}
@ -176,7 +176,7 @@ private:
}
}
if (zeroFillSize) {
CODE_PROBE(true, "Fixing a partial commit at the end of the tlog queue");
CODE_PROBE(true, "Fixing a partial commit at the end of the tlog queue", probe::decoration::rare);
for (int i = 0; i < zeroFillSize; i++)
self->queue->push(StringRef((const uint8_t*)"", 1));
}
@ -631,7 +631,7 @@ ACTOR Future<Void> tLogLock(TLogData* self, ReplyPromise<TLogLockResult> reply,
CODE_PROBE(true, "TLog stopped by recovering master");
CODE_PROBE(logData->stopped, "logData already stopped");
CODE_PROBE(!logData->stopped, "logData not yet stopped");
CODE_PROBE(!logData->stopped, "logData not yet stopped", probe::decoration::rare);
TraceEvent("TLogStop", logData->logId)
.detail("Ver", stopVersion)
@ -1317,7 +1317,7 @@ Future<Void> tLogPeekMessages(PromiseType replyPromise,
}
if (sequenceData.isSet()) {
if (sequenceData.getFuture().get().first != rep.end) {
CODE_PROBE(true, "tlog peek second attempt ended at a different version");
CODE_PROBE(true, "tlog peek second attempt ended at a different version", probe::decoration::rare);
replyPromise.sendError(operation_obsolete());
return Void();
}
@ -1415,7 +1415,7 @@ Future<Void> tLogPeekMessages(PromiseType replyPromise,
if (sequenceData.isSet()) {
trackerData.duplicatePeeks++;
if (sequenceData.getFuture().get().first != reply.end) {
CODE_PROBE(true, "tlog peek second attempt ended at a different version (2)");
CODE_PROBE(true, "tlog peek second attempt ended at a different version (2)", probe::decoration::rare);
replyPromise.sendError(operation_obsolete());
return Void();
}
@ -1522,7 +1522,7 @@ ACTOR Future<Void> doQueueCommit(TLogData* self,
.detail("LogId", logData->logId)
.detail("Version", it->version.get())
.detail("QueueVer", it->queueCommittedVersion.get());
CODE_PROBE(true, "A TLog was replaced before having a chance to commit its queue");
CODE_PROBE(true, "A TLog was replaced before having a chance to commit its queue", probe::decoration::rare);
it->queueCommittedVersion.set(it->version.get());
}
return Void();
@ -1982,7 +1982,7 @@ ACTOR Future<Void> serveTLogInterface(TLogData* self,
when(TLogCommitRequest req = waitNext(tli.commit.getFuture())) {
//TraceEvent("TLogCommitReq", logData->logId).detail("Ver", req.version).detail("PrevVer", req.prevVersion).detail("LogVer", logData->version.get());
ASSERT(logData->isPrimary);
CODE_PROBE(logData->stopped, "TLogCommitRequest while stopped");
CODE_PROBE(logData->stopped, "TLogCommitRequest while stopped", probe::decoration::rare);
if (!logData->stopped)
logData->addActor.send(tLogCommit(self, req, logData, warningCollectorInput));
else
@ -2323,7 +2323,8 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
if (!fFormat.get().present()) {
RangeResult v = wait(self->persistentData->readRange(KeyRangeRef(StringRef(), "\xff"_sr), 1));
if (!v.size()) {
CODE_PROBE(true, "The DB is completely empty, so it was never initialized. Delete it.");
CODE_PROBE(
true, "The DB is completely empty, so it was never initialized. Delete it.", probe::decoration::rare);
throw worker_removed();
} else {
// This should never happen
@ -2463,7 +2464,7 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
try {
loop {
if (allRemoved.isReady()) {
CODE_PROBE(true, "all tlogs removed during queue recovery");
CODE_PROBE(true, "all tlogs removed during queue recovery", probe::decoration::rare);
throw worker_removed();
}
choose {

View File

@ -156,7 +156,7 @@ private:
Standalone<StringRef> h = wait(self->queue->readNext(sizeof(uint32_t)));
if (h.size() != sizeof(uint32_t)) {
if (h.size()) {
CODE_PROBE(true, "Zero fill within size field");
CODE_PROBE(true, "Zero fill within size field", probe::decoration::rare);
int payloadSize = 0;
memcpy(&payloadSize, h.begin(), h.size());
zeroFillSize = sizeof(uint32_t) - h.size(); // zero fill the size itself
@ -733,7 +733,7 @@ ACTOR Future<Void> tLogLock(TLogData* self, ReplyPromise<TLogLockResult> reply,
CODE_PROBE(true, "TLog stopped by recovering master");
CODE_PROBE(logData->stopped, "logData already stopped");
CODE_PROBE(!logData->stopped, "logData not yet stopped");
CODE_PROBE(!logData->stopped, "logData not yet stopped", probe::decoration::rare);
TraceEvent("TLogStop", logData->logId)
.detail("Ver", stopVersion)
@ -1655,7 +1655,7 @@ Future<Void> tLogPeekMessages(PromiseType replyPromise,
}
if (sequenceData.isSet()) {
if (sequenceData.getFuture().get().first != rep.end) {
CODE_PROBE(true, "tlog peek second attempt ended at a different version");
CODE_PROBE(true, "tlog peek second attempt ended at a different version", probe::decoration::rare);
replyPromise.sendError(operation_obsolete());
return Void();
}
@ -1843,7 +1843,7 @@ Future<Void> tLogPeekMessages(PromiseType replyPromise,
if (sequenceData.isSet()) {
trackerData.duplicatePeeks++;
if (sequenceData.getFuture().get().first != reply.end) {
CODE_PROBE(true, "tlog peek second attempt ended at a different version (2)");
CODE_PROBE(true, "tlog peek second attempt ended at a different version (2)", probe::decoration::rare);
replyPromise.sendError(operation_obsolete());
return Void();
}
@ -1905,7 +1905,7 @@ ACTOR Future<Void> watchDegraded(TLogData* self) {
wait(lowPriorityDelay(SERVER_KNOBS->TLOG_DEGRADED_DURATION));
TraceEvent(SevWarnAlways, "TLogDegraded", self->dbgid).log();
CODE_PROBE(true, "TLog degraded");
CODE_PROBE(true, "TLog degraded", probe::decoration::rare);
self->degraded->set(true);
return Void();
}
@ -1963,7 +1963,7 @@ ACTOR Future<Void> doQueueCommit(TLogData* self,
.detail("LogId", logData->logId)
.detail("Version", it->version.get())
.detail("QueueVer", it->queueCommittedVersion.get());
CODE_PROBE(true, "A TLog was replaced before having a chance to commit its queue");
CODE_PROBE(true, "A TLog was replaced before having a chance to commit its queue", probe::decoration::rare);
it->queueCommittedVersion.set(it->version.get());
}
return Void();
@ -2426,7 +2426,7 @@ ACTOR Future<Void> serveTLogInterface(TLogData* self,
when(TLogCommitRequest req = waitNext(tli.commit.getFuture())) {
//TraceEvent("TLogCommitReq", logData->logId).detail("Ver", req.version).detail("PrevVer", req.prevVersion).detail("LogVer", logData->version.get());
ASSERT(logData->isPrimary);
CODE_PROBE(logData->stopped, "TLogCommitRequest while stopped");
CODE_PROBE(logData->stopped, "TLogCommitRequest while stopped", probe::decoration::rare);
if (!logData->stopped)
logData->addActor.send(tLogCommit(self, req, logData, warningCollectorInput));
else
@ -2790,7 +2790,8 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
if (!fFormat.get().present()) {
RangeResult v = wait(self->persistentData->readRange(KeyRangeRef(StringRef(), "\xff"_sr), 1));
if (!v.size()) {
CODE_PROBE(true, "The DB is completely empty, so it was never initialized. Delete it.");
CODE_PROBE(
true, "The DB is completely empty, so it was never initialized. Delete it.", probe::decoration::rare);
throw worker_removed();
} else {
// This should never happen
@ -2938,7 +2939,7 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
throw end_of_stream();
loop {
if (allRemoved.isReady()) {
CODE_PROBE(true, "all tlogs removed during queue recovery");
CODE_PROBE(true, "all tlogs removed during queue recovery", probe::decoration::rare);
throw worker_removed();
}
choose {

View File

@ -99,7 +99,7 @@ Optional<double> RkTagThrottleCollection::autoThrottleTag(UID id,
itr = autoThrottledTags.try_emplace(tag).first;
initializeTag(tag);
} else if (itr->second.limits.expiration <= now()) {
CODE_PROBE(true, "Re-throttling expired tag that hasn't been cleaned up");
CODE_PROBE(true, "Re-throttling expired tag that hasn't been cleaned up", probe::decoration::rare);
present = false;
itr->second = RkTagThrottleData();
}

View File

@ -908,7 +908,7 @@ ACTOR Future<Void> simulatedMachine(ClusterConnectionString connStr,
CODE_PROBE(bootCount >= 1, "Simulated machine rebooted");
CODE_PROBE(bootCount >= 2, "Simulated machine rebooted twice");
CODE_PROBE(bootCount >= 3, "Simulated machine rebooted three times");
CODE_PROBE(bootCount >= 3, "Simulated machine rebooted three times", probe::decoration::rare);
++bootCount;
TraceEvent("SimulatedMachineStart", randomId)
@ -1056,7 +1056,7 @@ ACTOR Future<Void> simulatedMachine(ClusterConnectionString connStr,
avail.pop_back();
if (myFolders != toRebootFrom) {
CODE_PROBE(true, "Simulated machine swapped data folders");
CODE_PROBE(true, "Simulated machine swapped data folders", probe::decoration::rare);
TraceEvent("SimulatedMachineFolderSwap", randomId)
.detail("OldFolder0", myFolders[0])
.detail("NewFolder0", toRebootFrom[0])

View File

@ -90,7 +90,7 @@ void StorageServerMetrics::notify(KeyRef key, StorageMetrics& metrics) {
if (g_network->isSimulated()) {
CODE_PROBE(metrics.bytesPerKSecond != 0, "ShardNotifyMetrics bytes");
CODE_PROBE(metrics.iosPerKSecond != 0, "ShardNotifyMetrics ios");
CODE_PROBE(metrics.bytesReadPerKSecond != 0, "ShardNotifyMetrics bytesRead");
CODE_PROBE(metrics.bytesReadPerKSecond != 0, "ShardNotifyMetrics bytesRead", probe::decoration::rare);
}
double expire = now() + SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL;

View File

@ -157,7 +157,7 @@ private:
Standalone<StringRef> h = wait(self->queue->readNext(sizeof(uint32_t)));
if (h.size() != sizeof(uint32_t)) {
if (h.size()) {
CODE_PROBE(true, "Zero fill within size field");
CODE_PROBE(true, "Zero fill within size field", probe::decoration::rare);
int payloadSize = 0;
memcpy(&payloadSize, h.begin(), h.size());
zeroFillSize = sizeof(uint32_t) - h.size(); // zero fill the size itself
@ -2189,7 +2189,7 @@ ACTOR Future<Void> doQueueCommit(TLogData* self,
.detail("LogId", logData->logId)
.detail("Version", it->version.get())
.detail("QueueVer", it->queueCommittedVersion.get());
CODE_PROBE(true, "A TLog was replaced before having a chance to commit its queue");
CODE_PROBE(true, "A TLog was replaced before having a chance to commit its queue", probe::decoration::rare);
it->queueCommittedVersion.set(it->version.get());
}
return Void();

View File

@ -2968,9 +2968,8 @@ public:
workerHealth[req.address].degradedPeers[degradedPeer] = { currentTime, currentTime };
}
// TODO(zhewu): add disconnected peers in worker health.
for (const auto& degradedPeer : req.disconnectedPeers) {
workerHealth[req.address].degradedPeers[degradedPeer] = { currentTime, currentTime };
workerHealth[req.address].disconnectedPeers[degradedPeer] = { currentTime, currentTime };
}
return;
@ -2980,23 +2979,24 @@ public:
auto& health = workerHealth[req.address];
auto updateDegradedPeer = [&health, currentTime](const NetworkAddress& degradedPeer) {
auto it = health.degradedPeers.find(degradedPeer);
if (it == health.degradedPeers.end()) {
health.degradedPeers[degradedPeer] = { currentTime, currentTime };
return;
}
it->second.lastRefreshTime = currentTime;
};
// Update the worker's degradedPeers.
for (const auto& peer : req.degradedPeers) {
updateDegradedPeer(peer);
auto it = health.degradedPeers.find(peer);
if (it == health.degradedPeers.end()) {
health.degradedPeers[peer] = { currentTime, currentTime };
continue;
}
it->second.lastRefreshTime = currentTime;
}
// TODO(zhewu): add disconnected peers in worker health.
// Update the worker's disconnectedPeers.
for (const auto& peer : req.disconnectedPeers) {
updateDegradedPeer(peer);
auto it = health.disconnectedPeers.find(peer);
if (it == health.disconnectedPeers.end()) {
health.disconnectedPeers[peer] = { currentTime, currentTime };
continue;
}
it->second.lastRefreshTime = currentTime;
}
}
@ -3012,10 +3012,18 @@ public:
++it;
}
}
for (auto it = health.disconnectedPeers.begin(); it != health.disconnectedPeers.end();) {
if (currentTime - it->second.lastRefreshTime > SERVER_KNOBS->CC_DEGRADED_LINK_EXPIRATION_INTERVAL) {
TraceEvent("WorkerPeerHealthRecovered").detail("Worker", workerAddress).detail("Peer", it->first);
health.disconnectedPeers.erase(it++);
} else {
++it;
}
}
}
for (auto it = workerHealth.begin(); it != workerHealth.end();) {
if (it->second.degradedPeers.empty()) {
if (it->second.degradedPeers.empty() && it->second.disconnectedPeers.empty()) {
TraceEvent("WorkerAllPeerHealthRecovered").detail("Worker", it->first);
workerHealth.erase(it++);
} else {
@ -3028,6 +3036,8 @@ public:
std::unordered_set<NetworkAddress>
degradedServers; // The servers that the cluster controller is considered as degraded. The servers in this
// list are not excluded unless they are added to `excludedDegradedServers`.
std::unordered_set<NetworkAddress>
disconnectedServers; // Similar to the above list, but the servers experiencing connection issue.
bool degradedSatellite = false; // Indicates that the entire satellite DC is degraded.
};
@ -3038,6 +3048,7 @@ public:
// Build a map keyed by measured degraded peer. This map gives the info that who complains a particular server.
std::unordered_map<NetworkAddress, std::unordered_set<NetworkAddress>> degradedLinkDst2Src;
std::unordered_map<NetworkAddress, std::unordered_set<NetworkAddress>> disconnectedLinkDst2Src;
double currentTime = now();
for (const auto& [server, health] : workerHealth) {
for (const auto& [degradedPeer, times] : health.degradedPeers) {
@ -3047,6 +3058,13 @@ public:
}
degradedLinkDst2Src[degradedPeer].insert(server);
}
for (const auto& [disconnectedPeer, times] : health.disconnectedPeers) {
if (currentTime - times.startTime < SERVER_KNOBS->CC_MIN_DEGRADATION_INTERVAL) {
// This degraded link is not long enough to be considered as degraded.
continue;
}
disconnectedLinkDst2Src[disconnectedPeer].insert(server);
}
}
// Sort degraded peers based on the number of workers complaining about it.
@ -3056,6 +3074,12 @@ public:
}
std::sort(count2DegradedPeer.begin(), count2DegradedPeer.end(), std::greater<>());
std::vector<std::pair<int, NetworkAddress>> count2DisconnectedPeer;
for (const auto& [disconnectedPeer, complainers] : disconnectedLinkDst2Src) {
count2DisconnectedPeer.push_back({ complainers.size(), disconnectedPeer });
}
std::sort(count2DisconnectedPeer.begin(), count2DisconnectedPeer.end(), std::greater<>());
// Go through all reported degraded peers by decreasing order of the number of complainers. For a particular
// degraded peer, if a complainer has already be considered as degraded, we skip the current examine degraded
// peer since there has been one endpoint on the link between degradedPeer and complainer considered as
@ -3084,9 +3108,25 @@ public:
}
}
DegradationInfo currentDegradationInfo;
for (const auto& [complainerCount, badServer] : count2DisconnectedPeer) {
for (const auto& complainer : disconnectedLinkDst2Src[badServer]) {
if (currentDegradationInfo.disconnectedServers.find(complainer) ==
currentDegradationInfo.disconnectedServers.end()) {
currentDegradationInfo.disconnectedServers.insert(badServer);
break;
}
}
if (SERVER_KNOBS->CC_ENABLE_ENTIRE_SATELLITE_MONITORING &&
addressInDbAndPrimarySatelliteDc(badServer, db.serverInfo) &&
complainerCount >= SERVER_KNOBS->CC_SATELLITE_DEGRADATION_MIN_COMPLAINER) {
++satelliteBadServerCount;
}
}
// For degraded server that are complained by more than SERVER_KNOBS->CC_DEGRADED_PEER_DEGREE_TO_EXCLUDE, we
// don't know if it is a hot server, or the network is bad. We remove from the returned degraded server list.
DegradationInfo currentDegradationInfo;
for (const auto& badServer : currentDegradedServers) {
if (degradedLinkDst2Src[badServer].size() <= SERVER_KNOBS->CC_DEGRADED_PEER_DEGREE_TO_EXCLUDE) {
currentDegradationInfo.degradedServers.insert(badServer);
@ -3104,43 +3144,48 @@ public:
// Whether the transaction system (in primary DC if in HA setting) contains degraded servers.
bool transactionSystemContainsDegradedServers() {
const ServerDBInfo dbi = db.serverInfo->get();
for (const auto& excludedServer : degradationInfo.degradedServers) {
if (dbi.master.addresses().contains(excludedServer)) {
return true;
}
for (auto& logSet : dbi.logSystemConfig.tLogs) {
if (!logSet.isLocal || logSet.locality == tagLocalitySatellite) {
continue;
const ServerDBInfo& dbi = db.serverInfo->get();
auto transactionWorkerInList = [&dbi](const std::unordered_set<NetworkAddress>& serverList) -> bool {
for (const auto& server : serverList) {
if (dbi.master.addresses().contains(server)) {
return true;
}
for (const auto& tlog : logSet.tLogs) {
if (tlog.present() && tlog.interf().addresses().contains(excludedServer)) {
for (const auto& logSet : dbi.logSystemConfig.tLogs) {
if (!logSet.isLocal || logSet.locality == tagLocalitySatellite) {
continue;
}
for (const auto& tlog : logSet.tLogs) {
if (tlog.present() && tlog.interf().addresses().contains(server)) {
return true;
}
}
}
for (const auto& proxy : dbi.client.grvProxies) {
if (proxy.addresses().contains(server)) {
return true;
}
}
for (const auto& proxy : dbi.client.commitProxies) {
if (proxy.addresses().contains(server)) {
return true;
}
}
for (const auto& resolver : dbi.resolvers) {
if (resolver.addresses().contains(server)) {
return true;
}
}
}
for (auto& proxy : dbi.client.grvProxies) {
if (proxy.addresses().contains(excludedServer)) {
return true;
}
}
return false;
};
for (auto& proxy : dbi.client.commitProxies) {
if (proxy.addresses().contains(excludedServer)) {
return true;
}
}
for (auto& resolver : dbi.resolvers) {
if (resolver.addresses().contains(excludedServer)) {
return true;
}
}
}
return false;
return transactionWorkerInList(degradationInfo.degradedServers) ||
transactionWorkerInList(degradationInfo.disconnectedServers);
}
// Whether transaction system in the remote DC, e.g. log router and tlogs in the remote DC, contains degraded
@ -3156,6 +3201,12 @@ public:
}
}
for (const auto& excludedServer : degradationInfo.disconnectedServers) {
if (addressInDbAndRemoteDc(excludedServer, db.serverInfo)) {
return true;
}
}
return false;
}
@ -3188,7 +3239,8 @@ public:
// Returns true when the cluster controller should trigger a recovery due to degraded servers used in the
// transaction system in the primary data center.
bool shouldTriggerRecoveryDueToDegradedServers() {
if (degradationInfo.degradedServers.size() > SERVER_KNOBS->CC_MAX_EXCLUSION_DUE_TO_HEALTH) {
if (degradationInfo.degradedServers.size() + degradationInfo.disconnectedServers.size() >
SERVER_KNOBS->CC_MAX_EXCLUSION_DUE_TO_HEALTH) {
return false;
}
@ -3227,8 +3279,9 @@ public:
return true;
}
if (degradationInfo.degradedServers.size() < SERVER_KNOBS->CC_FAILOVER_DUE_TO_HEALTH_MIN_DEGRADATION ||
degradationInfo.degradedServers.size() > SERVER_KNOBS->CC_FAILOVER_DUE_TO_HEALTH_MAX_DEGRADATION) {
int degradedServerSize = degradationInfo.degradedServers.size() + degradationInfo.disconnectedServers.size();
if (degradedServerSize < SERVER_KNOBS->CC_FAILOVER_DUE_TO_HEALTH_MIN_DEGRADATION ||
degradedServerSize > SERVER_KNOBS->CC_FAILOVER_DUE_TO_HEALTH_MAX_DEGRADATION) {
return false;
}
@ -3319,6 +3372,7 @@ public:
double lastRefreshTime = 0;
};
std::unordered_map<NetworkAddress, DegradedTimes> degradedPeers;
std::unordered_map<NetworkAddress, DegradedTimes> disconnectedPeers;
// TODO(zhewu): Include disk and CPU signals.
};

View File

@ -160,7 +160,9 @@ ACTOR Future<Void> getVersion(Reference<MasterData> self, GetCommitVersionReques
return Void();
}
CODE_PROBE(proxyItr->second.latestRequestNum.get() < req.requestNum - 1, "Commit version request queued up");
CODE_PROBE(proxyItr->second.latestRequestNum.get() < req.requestNum - 1,
"Commit version request queued up",
probe::decoration::rare);
wait(proxyItr->second.latestRequestNum.whenAtLeast(req.requestNum - 1));
auto itr = proxyItr->second.replies.find(req.requestNum);
@ -169,7 +171,8 @@ ACTOR Future<Void> getVersion(Reference<MasterData> self, GetCommitVersionReques
req.reply.send(itr->second);
} else if (req.requestNum <= proxyItr->second.latestRequestNum.get()) {
CODE_PROBE(true,
"Old request for previously acknowledged sequence - may be impossible with current FlowTransport");
"Old request for previously acknowledged sequence - may be impossible with current FlowTransport",
probe::decoration::rare);
ASSERT(req.requestNum <
proxyItr->second.latestRequestNum.get()); // The latest request can never be acknowledged
req.reply.send(Never());
@ -442,11 +445,20 @@ ACTOR Future<Void> masterServer(MasterInterface mi,
addActor.getFuture().pop();
}
CODE_PROBE(err.code() == error_code_tlog_failed, "Master: terminated due to tLog failure");
CODE_PROBE(err.code() == error_code_commit_proxy_failed, "Master: terminated due to commit proxy failure");
CODE_PROBE(err.code() == error_code_grv_proxy_failed, "Master: terminated due to GRV proxy failure");
CODE_PROBE(err.code() == error_code_resolver_failed, "Master: terminated due to resolver failure");
CODE_PROBE(err.code() == error_code_backup_worker_failed, "Master: terminated due to backup worker failure");
CODE_PROBE(
err.code() == error_code_tlog_failed, "Master: terminated due to tLog failure", probe::decoration::rare);
CODE_PROBE(err.code() == error_code_commit_proxy_failed,
"Master: terminated due to commit proxy failure",
probe::decoration::rare);
CODE_PROBE(err.code() == error_code_grv_proxy_failed,
"Master: terminated due to GRV proxy failure",
probe::decoration::rare);
CODE_PROBE(err.code() == error_code_resolver_failed,
"Master: terminated due to resolver failure",
probe::decoration::rare);
CODE_PROBE(err.code() == error_code_backup_worker_failed,
"Master: terminated due to backup worker failure",
probe::decoration::rare);
if (normalMasterErrors().count(err.code())) {
TraceEvent("MasterTerminated", mi.id()).error(err);

View File

@ -2011,7 +2011,8 @@ ACTOR Future<Version> watchWaitForValueChange(StorageServer* data, SpanContext p
options.debugID = metadata->debugID;
CODE_PROBE(latest >= minVersion && latest < data->data().latestVersion,
"Starting watch loop with latestVersion > data->version");
"Starting watch loop with latestVersion > data->version",
probe::decoration::rare);
GetValueRequest getReq(
span.context, TenantInfo(), metadata->key, latest, metadata->tags, options, VersionVector());
state Future<Void> getValue = getValueQ(
@ -4558,7 +4559,7 @@ ACTOR Future<Void> getMappedKeyValuesQ(StorageServer* data, GetMappedKeyValuesRe
// end the last actual key returned must be from this shard. A begin offset of 1 is also OK because then either
// begin is past end or equal to end (so the result is definitely empty)
if ((offset1 && offset1 != 1) || (offset2 && offset2 != 1)) {
CODE_PROBE(true, "wrong_shard_server due to offset in getMappedKeyValuesQ");
CODE_PROBE(true, "wrong_shard_server due to offset in getMappedKeyValuesQ", probe::decoration::rare);
// We could detect when offset1 takes us off the beginning of the database or offset2 takes us off the end,
// and return a clipped range rather than an error (since that is what the NativeAPI.getRange will do anyway
// via its "slow path"), but we would have to add some flags to the response to encode whether we went off
@ -4754,7 +4755,7 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe
// end the last actual key returned must be from this shard. A begin offset of 1 is also OK because then either
// begin is past end or equal to end (so the result is definitely empty)
if ((offset1 && offset1 != 1) || (offset2 && offset2 != 1)) {
CODE_PROBE(true, "wrong_shard_server due to offset in rangeStream");
CODE_PROBE(true, "wrong_shard_server due to offset in rangeStream", probe::decoration::rare);
// We could detect when offset1 takes us off the beginning of the database or offset2 takes us off the end,
// and return a clipped range rather than an error (since that is what the NativeAPI.getRange will do anyway
// via its "slow path"), but we would have to add some flags to the response to encode whether we went off
@ -6131,7 +6132,8 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
if (!existing) {
CODE_PROBE(cleanupPending,
"Fetch change feed which is cleanup pending. This means there was a move away and a move back, "
"this will remake the metadata");
"this will remake the metadata",
probe::decoration::rare);
changeFeedInfo = Reference<ChangeFeedInfo>(new ChangeFeedInfo());
changeFeedInfo->range = cfEntry.range;
@ -6182,7 +6184,9 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
}
if (changeFeedInfo->destroyed) {
CODE_PROBE(true, "Change feed fetched and destroyed by other fetch while fetching metadata");
CODE_PROBE(true,
"Change feed fetched and destroyed by other fetch while fetching metadata",
probe::decoration::rare);
continue;
}
@ -6240,7 +6244,7 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
// isn't in the fetched response. In that case, the feed must have been destroyed between lastMetadataVersion
// and fetchedMetadataVersion
if (lastMetadataVersion >= fetchedMetadataVersion) {
CODE_PROBE(true, "Change Feed fetched higher metadata version before moved away");
CODE_PROBE(true, "Change Feed fetched higher metadata version before moved away", probe::decoration::rare);
continue;
}
@ -9865,7 +9869,7 @@ ACTOR Future<Void> metricsCore(StorageServer* self, StorageServerInterface ssi)
}
when(ReadHotSubRangeRequest req = waitNext(ssi.getReadHotRanges.getFuture())) {
if (!self->isReadable(req.keys)) {
CODE_PROBE(true, "readHotSubRanges immediate wrong_shard_server()");
CODE_PROBE(true, "readHotSubRanges immediate wrong_shard_server()", probe::decoration::rare);
self->sendErrorWithPenalty(req.reply, wrong_shard_server(), self->getPenalty());
} else {
self->metrics.getReadHotRanges(req);
@ -10089,7 +10093,8 @@ ACTOR Future<Void> serveWatchValueRequestsImpl(StorageServer* self, FutureStream
self->sendErrorWithPenalty(req.reply, e, self->getPenalty());
break;
}
CODE_PROBE(true, "Reading a watched key failed with transaction_too_old case 5");
CODE_PROBE(
true, "Reading a watched key failed with transaction_too_old case 5", probe::decoration::rare);
}
}
}

View File

@ -245,7 +245,7 @@ ACTOR Future<Void> handleIOErrors(Future<Void> actor, IClosable* store, UID id,
CODE_PROBE(true, "Worker terminated with file_not_found error");
return Void();
} else if (e.getError().code() == error_code_lock_file_failure) {
CODE_PROBE(true, "Unable to lock file");
CODE_PROBE(true, "Unable to lock file", probe::decoration::rare);
throw please_reboot_kv_store();
}
throw e.getError();
@ -1055,6 +1055,7 @@ ACTOR Future<Void> healthMonitor(Reference<AsyncVar<Optional<ClusterControllerFu
if (disconnectedPeer || degradedPeer) {
TraceEvent("HealthMonitorDetectDegradedPeer")
.detail("Peer", address)
.detail("Satellite", true)
.detail("Elapsed", now() - peer->lastLoggedTime)
.detail("Disconnected", disconnectedPeer)
.detail("MinLatency", peer->pingLatencies.min())
@ -2214,7 +2215,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
// from the same reqId. To keep epoch safety between different managers, instead of restarting the
// same manager id at the same epoch, we should just tell it the original request succeeded, and let
// it realize this manager died via failure detection and start a new one.
CODE_PROBE(true, "Recruited while formerly the same blob manager.");
CODE_PROBE(true, "Recruited while formerly the same blob manager.", probe::decoration::rare);
} else {
// TODO: it'd be more optimal to halt the last manager if present here, but it will figure it out
// via the epoch check

View File

@ -451,7 +451,7 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
throw;
}
ASSERT(e.code() == error_code_blob_granule_transaction_too_old);
CODE_PROBE(true, "BGV verified too old after purge");
CODE_PROBE(true, "BGV verified too old after purge", probe::decoration::rare);
}
}
}
@ -730,7 +730,8 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
if (!foundAnyHistoryForRange) {
// if range never existed in blob, and was doing the initial snapshot, it could have a
// change feed but not a history entry/snapshot
CODE_PROBE(true, "not failing test for leaked feed with no history");
CODE_PROBE(
true, "not failing test for leaked feed with no history", probe::decoration::rare);
fmt::print("Not failing test b/c feed never had history!\n");
}
return !foundAnyHistoryForRange;

View File

@ -329,7 +329,8 @@ struct EncryptKeyProxyTestWorkload : TestWorkload {
}
Future<Void> start(Database const& cx) override {
CODE_PROBE(true, "Testing");
// TODO: Enable this workload in testing
CODE_PROBE(true, "Running EncryptKeyProxyTest", probe::decoration::rare);
if (!enableTest) {
return Void();
}
@ -343,4 +344,4 @@ struct EncryptKeyProxyTestWorkload : TestWorkload {
std::atomic<int> EncryptKeyProxyTestWorkload::seed = 0;
WorkloadFactory<EncryptKeyProxyTestWorkload> EncryptKeyProxyTestWorkloadFactory("EncryptKeyProxyTest");
WorkloadFactory<EncryptKeyProxyTestWorkload> EncryptKeyProxyTestWorkloadFactory("EncryptKeyProxyTest");

View File

@ -134,7 +134,7 @@ bool TDMetricCollection::canLog(int level) const {
void TDMetricCollection::checkRoll(uint64_t t, int64_t usedBytes) {
currentTimeBytes += usedBytes;
if (currentTimeBytes > 1e6) {
CODE_PROBE(true, "metrics were rolled");
CODE_PROBE(true, "metrics were rolled", probe::decoration::rare);
currentTimeBytes = 0;
rollTimes.push_back(t);
for (auto& it : metricMap)

View File

@ -114,6 +114,25 @@ constexpr auto noSim = !simOnly;
} // namespace assert
namespace decoration {
// Code probes that currently (as of 9/25/2022) are not expected to show up in a 250k-test Joshua run
// are marked as "rare." This indicates a testing bug, and these probes should either be removed or testing
// coverage should be improved to hit them. Ideally, then, we should remove uses of this annotation in the
// long-term. However, this annotation has been added to prevent further regressions in code coverage, so that
// we can detect changes that fail to hit non-rare code probes.
//
// This should also hopefully help with debugging, because if a code probe is marked as rare, it means that this
// is a case not likely hit in simulation, and it may be a case that is more prone to buggy behaviour.
struct Rare {
constexpr static AnnotationType type = AnnotationType::Decoration;
void trace(struct ICodeProbe const*, BaseTraceEvent& evt, bool) const { evt.detail("Rare", true); }
};
constexpr Rare rare;
} // namespace decoration
namespace func {
struct Deduplicate {

View File

@ -1,4 +1,5 @@
storageEngineExcludeTypes=3
[configuration]
storageEngineExcludeTypes=[3]
[[test]]
testTitle = 'SubmitBackup'

View File

@ -1,5 +1,6 @@
[configuration]
extraMachineCountDC = 2
storageEngineExcludeTypes = [3]
[[test]]
testTitle = 'CloggedConfigureDatabaseTest'

View File

@ -1,3 +1,6 @@
[configuration]
storageEngineExcludeTypes = [3]
[[test]]
testTitle='CloggedConfigureDatabaseTest'
clearAfterTest=false

View File

@ -1,4 +1,4 @@
storageEngineExcludeTypes=[3, 4, 5]
storageEngineExcludeTypes=3,4,5
;Take snap and do cycle test
testTitle=SnapCyclePre

View File

@ -1,4 +1,4 @@
storageEngineExcludeTypes=[4, 5]
storageEngineExcludeTypes=4,5
buggify=off
testTitle=SnapCycleRestore

View File

@ -1,4 +1,4 @@
storageEngineExcludeTypes=[3, 4, 5]
storageEngineExcludeTypes=3,4,5
logAntiQuorum = 0

View File

@ -1,4 +1,4 @@
storageEngineExcludeTypes=[4, 5]
storageEngineExcludeTypes=4,5
testTitle=RestoreBackup
simBackupAgents=BackupToFile

View File

@ -1,4 +1,4 @@
storageEngineExcludeTypes=[3, 4, 5]
storageEngineExcludeTypes=3,4,5
;write 1000 Keys ending with even numbers
testTitle=SnapTestPre

View File

@ -1,4 +1,4 @@
storageEngineExcludeTypes=[4, 5]
storageEngineExcludeTypes=4,5
buggify=off

View File

@ -1,4 +1,4 @@
storageEngineExcludeTypes=[3, 4, 5]
storageEngineExcludeTypes=3,4,5
;write 1000 Keys ending with even numbers
testTitle=SnapTestPre

View File

@ -1,4 +1,4 @@
storageEngineExcludeTypes=[4, 5]
storageEngineExcludeTypes=4,5
buggify=off

View File

@ -1,4 +1,4 @@
storageEngineExcludeTypes=[3, 4, 5]
storageEngineExcludeTypes=3,4,5
;write 1000 Keys ending with even number
testTitle=SnapSimplePre

View File

@ -1,4 +1,4 @@
storageEngineExcludeTypes=[4, 5]
storageEngineExcludeTypes=4,5
buggify=off

View File

@ -1,3 +1,6 @@
[configuration]
storageEngineExcludeTypes = [3]
[[knobs]]
enable_version_vector = true
enable_version_vector_tlog_unicast = true

View File

@ -1,3 +1,6 @@
[configuration]
storageEngineExcludeTypes = [3]
[[knobs]]
enable_version_vector = false
enable_version_vector_tlog_unicast = false

View File

@ -3,7 +3,7 @@ extraMachineCountDC = 2
maxTLogVersion=6
disableHostname=true
disableEncryption=true
storageEngineExcludeTypes=[4]
storageEngineExcludeTypes=[3,4]
[[test]]
testTitle = 'CloggedConfigureDatabaseTest'