Merge commit '7c89cd705faee52d5d78e6c77665cb7cc4502f58' into storageserver-pml

This commit is contained in:
Steve Atherton 2022-10-07 11:39:42 -07:00
commit b7ce834d28
54 changed files with 252 additions and 145 deletions

View File

@ -198,6 +198,10 @@ void ApiWorkload::clearTenantData(TTaskFct cont, std::optional<int> tenantId) {
void ApiWorkload::clearData(TTaskFct cont) { void ApiWorkload::clearData(TTaskFct cont) {
execTransaction( execTransaction(
[this](auto ctx) { [this](auto ctx) {
// Make this self-conflicting, so that if we're retrying on timeouts
// once we get a successful commit all previous attempts are no
// longer in-flight.
ctx->tx().addReadConflictRange(keyPrefix, keyPrefix + fdb::Key(1, '\xff'));
ctx->tx().clearRange(keyPrefix, keyPrefix + fdb::Key(1, '\xff')); ctx->tx().clearRange(keyPrefix, keyPrefix + fdb::Key(1, '\xff'));
ctx->commit(); ctx->commit();
}, },

View File

@ -180,24 +180,13 @@ public:
if (databaseCreateErrorInjected && canBeInjectedDatabaseCreateError(err.code())) { if (databaseCreateErrorInjected && canBeInjectedDatabaseCreateError(err.code())) {
// Failed to create a database because of failure injection // Failed to create a database because of failure injection
// Restart by recreating the transaction in a valid database // Restart by recreating the transaction in a valid database
auto thisRef = std::static_pointer_cast<TransactionContextBase>(shared_from_this()); recreateAndRestartTransaction();
scheduler->schedule([thisRef]() {
fdb::Database db = thisRef->executor->selectDatabase();
thisRef->fdbDb.atomic_store(db);
if (thisRef->transactional) {
if (thisRef->tenantName) {
fdb::Tenant tenant = db.openTenant(*thisRef->tenantName);
thisRef->fdbTx.atomic_store(tenant.createTransaction());
} else {
thisRef->fdbTx.atomic_store(db.createTransaction());
}
}
thisRef->restartTransaction();
});
} else if (transactional) { } else if (transactional) {
onErrorArg = err; onErrorArg = err;
onErrorFuture = tx().onError(err); onErrorFuture = tx().onError(err);
handleOnErrorFuture(); handleOnErrorFuture();
} else if (err.retryable()) {
restartTransaction();
} else { } else {
transactionFailed(err); transactionFailed(err);
} }
@ -262,6 +251,23 @@ protected:
startFct(shared_from_this()); startFct(shared_from_this());
} }
// Re-selects a database for this context, rebuilds the transaction object when the
// context is transactional, and then restarts the transaction.
// None of this runs inline: the whole sequence is handed to the scheduler as a task.
void recreateAndRestartTransaction() {
// The scheduled lambda captures a shared_ptr to this context by value, so the
// context stays alive at least until the task has run.
auto thisRef = std::static_pointer_cast<TransactionContextBase>(shared_from_this());
scheduler->schedule([thisRef]() {
// Ask the executor for a database and install it as this context's database.
// NOTE(review): atomic_store suggests fdbDb/fdbTx may be read from other threads - confirm.
fdb::Database db = thisRef->executor->selectDatabase();
thisRef->fdbDb.atomic_store(db);
if (thisRef->transactional) {
if (thisRef->tenantName) {
// A tenant name is set: create the new transaction through that tenant.
fdb::Tenant tenant = db.openTenant(*thisRef->tenantName);
thisRef->fdbTx.atomic_store(tenant.createTransaction());
} else {
// No tenant: create the new transaction directly on the database.
thisRef->fdbTx.atomic_store(db.createTransaction());
}
}
// Restart only after the new database (and transaction, if any) have been stored.
thisRef->restartTransaction();
});
}
// Checks if a transaction can be retried. Fails the transaction if the check fails // Checks if a transaction can be retried. Fails the transaction if the check fails
bool canRetry(fdb::Error lastErr) { bool canRetry(fdb::Error lastErr) {
ASSERT(txState == TxState::ON_ERROR); ASSERT(txState == TxState::ON_ERROR);
@ -671,11 +677,23 @@ public:
try { try {
std::shared_ptr<ITransactionContext> ctx; std::shared_ptr<ITransactionContext> ctx;
if (options.blockOnFutures) { if (options.blockOnFutures) {
ctx = std::make_shared<BlockingTransactionContext>( ctx = std::make_shared<BlockingTransactionContext>(this,
this, startFct, cont, scheduler, options.transactionRetryLimit, bgBasePath, tenantName, true); startFct,
cont,
scheduler,
options.transactionRetryLimit,
bgBasePath,
tenantName,
transactional);
} else { } else {
ctx = std::make_shared<AsyncTransactionContext>( ctx = std::make_shared<AsyncTransactionContext>(this,
this, startFct, cont, scheduler, options.transactionRetryLimit, bgBasePath, tenantName, true); startFct,
cont,
scheduler,
options.transactionRetryLimit,
bgBasePath,
tenantName,
transactional);
} }
startFct(ctx); startFct(ctx);
} catch (...) { } catch (...) {

View File

@ -662,6 +662,13 @@ public:
void clearRange(KeyRef begin, KeyRef end) { void clearRange(KeyRef begin, KeyRef end) {
native::fdb_transaction_clear_range(tr.get(), begin.data(), intSize(begin), end.data(), intSize(end)); native::fdb_transaction_clear_range(tr.get(), begin.data(), intSize(begin), end.data(), intSize(end));
} }
// Adds a read conflict range delimited by `begin` and `end` to this transaction by
// calling native::fdb_transaction_add_conflict_range with FDB_CONFLICT_RANGE_TYPE_READ.
// If the Error built from the native return value tests true, it is passed to
// throwError together with a message naming the failing native call.
// NOTE(review): presumably `end` is exclusive, as in the other range calls - confirm against the C API docs.
void addReadConflictRange(KeyRef begin, KeyRef end) {
if (auto err = Error(native::fdb_transaction_add_conflict_range(
tr.get(), begin.data(), intSize(begin), end.data(), intSize(end), FDB_CONFLICT_RANGE_TYPE_READ))) {
throwError("fdb_transaction_add_conflict_range returned error: ", err);
}
}
}; };
class Tenant final { class Tenant final {

View File

@ -4606,7 +4606,7 @@ struct StartFullRestoreTaskFunc : RestoreTaskFuncBase {
.detail("RestoreVersion", restoreVersion) .detail("RestoreVersion", restoreVersion)
.detail("Dest", destVersion); .detail("Dest", destVersion);
if (destVersion <= restoreVersion) { if (destVersion <= restoreVersion) {
CODE_PROBE(true, "Forcing restored cluster to higher version"); CODE_PROBE(true, "Forcing restored cluster to higher version", probe::decoration::rare);
tr->set(minRequiredCommitVersionKey, BinaryWriter::toValue(restoreVersion + 1, Unversioned())); tr->set(minRequiredCommitVersionKey, BinaryWriter::toValue(restoreVersion + 1, Unversioned()));
wait(tr->commit()); wait(tr->commit());
} else { } else {

View File

@ -1461,7 +1461,10 @@ Optional<TenantName> MultiVersionTransaction::getTenant() {
// Waits for the specified duration and signals the assignment variable with a timed out error // Waits for the specified duration and signals the assignment variable with a timed out error
// This will be canceled if a new timeout is set, in which case the tsav will not be signaled. // This will be canceled if a new timeout is set, in which case the tsav will not be signaled.
ACTOR Future<Void> timeoutImpl(Reference<ThreadSingleAssignmentVar<Void>> tsav, double duration) { ACTOR Future<Void> timeoutImpl(Reference<ThreadSingleAssignmentVar<Void>> tsav, double duration) {
wait(delay(duration)); state double endTime = now() + duration;
while (now() < endTime) {
wait(delayUntil(std::min(endTime + 0.0001, now() + CLIENT_KNOBS->TRANSACTION_TIMEOUT_DELAY_INTERVAL)));
}
tsav->trySendError(transaction_timed_out()); tsav->trySendError(transaction_timed_out());
return Void(); return Void();
@ -1501,14 +1504,17 @@ void MultiVersionTransaction::setTimeout(Optional<StringRef> value) {
{ // lock scope { // lock scope
ThreadSpinLockHolder holder(timeoutLock); ThreadSpinLockHolder holder(timeoutLock);
Reference<ThreadSingleAssignmentVar<Void>> tsav = timeoutTsav;
ThreadFuture<Void> newTimeout = onMainThread([transactionStartTime, tsav, timeoutDuration]() {
return timeoutImpl(tsav, timeoutDuration - std::max(0.0, now() - transactionStartTime));
});
prevTimeout = currentTimeout; prevTimeout = currentTimeout;
currentTimeout = newTimeout;
if (timeoutDuration > 0) {
Reference<ThreadSingleAssignmentVar<Void>> tsav = timeoutTsav;
ThreadFuture<Void> newTimeout = onMainThread([transactionStartTime, tsav, timeoutDuration]() {
return timeoutImpl(tsav, timeoutDuration - std::max(0.0, now() - transactionStartTime));
});
currentTimeout = newTimeout;
} else {
currentTimeout = ThreadFuture<Void>();
}
} }
// Cancel the previous timeout now that we have a new one. This means that changing the timeout // Cancel the previous timeout now that we have a new one. This means that changing the timeout
@ -1577,6 +1583,9 @@ void MultiVersionTransaction::reset() {
MultiVersionTransaction::~MultiVersionTransaction() { MultiVersionTransaction::~MultiVersionTransaction() {
timeoutTsav->trySendError(transaction_cancelled()); timeoutTsav->trySendError(transaction_cancelled());
if (currentTimeout.isValid()) {
currentTimeout.cancel();
}
} }
bool MultiVersionTransaction::isValid() { bool MultiVersionTransaction::isValid() {

View File

@ -2710,7 +2710,7 @@ bool DatabaseContext::isCurrentGrvProxy(UID proxyId) const {
if (proxy.id() == proxyId) if (proxy.id() == proxyId)
return true; return true;
} }
CODE_PROBE(true, "stale GRV proxy detected"); CODE_PROBE(true, "stale GRV proxy detected", probe::decoration::rare);
return false; return false;
} }
@ -3734,7 +3734,7 @@ ACTOR Future<Version> watchValue(Database cx, Reference<const WatchParameters> p
} else if (e.code() == error_code_watch_cancelled || e.code() == error_code_process_behind) { } else if (e.code() == error_code_watch_cancelled || e.code() == error_code_process_behind) {
// clang-format off // clang-format off
CODE_PROBE(e.code() == error_code_watch_cancelled, "Too many watches on the storage server, poll for changes instead"); CODE_PROBE(e.code() == error_code_watch_cancelled, "Too many watches on the storage server, poll for changes instead");
CODE_PROBE(e.code() == error_code_process_behind, "The storage servers are all behind"); CODE_PROBE(e.code() == error_code_process_behind, "The storage servers are all behind", probe::decoration::rare);
// clang-format on // clang-format on
wait(delay(CLIENT_KNOBS->WATCH_POLLING_TIME, parameters->taskID)); wait(delay(CLIENT_KNOBS->WATCH_POLLING_TIME, parameters->taskID));
} else if (e.code() == error_code_timed_out) { // The storage server occasionally times out watches in case } else if (e.code() == error_code_timed_out) { // The storage server occasionally times out watches in case
@ -5661,18 +5661,18 @@ Future<Void> Transaction::getRangeStream(PromiseStream<RangeResult>& results,
KeySelector b = begin; KeySelector b = begin;
if (b.orEqual) { if (b.orEqual) {
CODE_PROBE(true, "Native stream begin orEqual==true"); CODE_PROBE(true, "Native stream begin orEqual==true", probe::decoration::rare);
b.removeOrEqual(b.arena()); b.removeOrEqual(b.arena());
} }
KeySelector e = end; KeySelector e = end;
if (e.orEqual) { if (e.orEqual) {
CODE_PROBE(true, "Native stream end orEqual==true"); CODE_PROBE(true, "Native stream end orEqual==true", probe::decoration::rare);
e.removeOrEqual(e.arena()); e.removeOrEqual(e.arena());
} }
if (b.offset >= e.offset && b.getKey() >= e.getKey()) { if (b.offset >= e.offset && b.getKey() >= e.getKey()) {
CODE_PROBE(true, "Native stream range inverted"); CODE_PROBE(true, "Native stream range inverted", probe::decoration::rare);
results.sendError(end_of_stream()); results.sendError(end_of_stream());
return Void(); return Void();
} }
@ -9755,7 +9755,7 @@ ACTOR Future<Void> getChangeFeedStreamActor(Reference<DatabaseContext> db,
results->storageData.clear(); results->storageData.clear();
if (e.code() == error_code_change_feed_popped) { if (e.code() == error_code_change_feed_popped) {
++db->feedNonRetriableErrors; ++db->feedNonRetriableErrors;
CODE_PROBE(true, "getChangeFeedStreamActor got popped"); CODE_PROBE(true, "getChangeFeedStreamActor got popped", probe::decoration::rare);
results->mutations.sendError(e); results->mutations.sendError(e);
results->refresh.sendError(e); results->refresh.sendError(e);
} else { } else {

View File

@ -1214,7 +1214,7 @@ public:
// isolation support. But it is not default and is rarely used. So we disallow it until we have thorough test // isolation support. But it is not default and is rarely used. So we disallow it until we have thorough test
// coverage for it.) // coverage for it.)
if (snapshot) { if (snapshot) {
CODE_PROBE(true, "getMappedRange not supported for snapshot."); CODE_PROBE(true, "getMappedRange not supported for snapshot.", probe::decoration::rare);
throw unsupported_operation(); throw unsupported_operation();
} }
// For now, getMappedRange requires read-your-writes being NOT disabled. But the support of RYW is limited // For now, getMappedRange requires read-your-writes being NOT disabled. But the support of RYW is limited
@ -1223,7 +1223,7 @@ public:
// which returns the written value transparently. In another word, it makes sure not break RYW semantics without // which returns the written value transparently. In another word, it makes sure not break RYW semantics without
// actually implementing reading from the writes. // actually implementing reading from the writes.
if (ryw->options.readYourWritesDisabled) { if (ryw->options.readYourWritesDisabled) {
CODE_PROBE(true, "getMappedRange not supported for read-your-writes disabled."); CODE_PROBE(true, "getMappedRange not supported for read-your-writes disabled.", probe::decoration::rare);
throw unsupported_operation(); throw unsupported_operation();
} }
@ -1331,6 +1331,11 @@ public:
ACTOR static void simulateTimeoutInFlightCommit(ReadYourWritesTransaction* ryw_) { ACTOR static void simulateTimeoutInFlightCommit(ReadYourWritesTransaction* ryw_) {
state Reference<ReadYourWritesTransaction> ryw = Reference<ReadYourWritesTransaction>::addRef(ryw_); state Reference<ReadYourWritesTransaction> ryw = Reference<ReadYourWritesTransaction>::addRef(ryw_);
ASSERT(ryw->options.timeoutInSeconds > 0); ASSERT(ryw->options.timeoutInSeconds > 0);
// An actual in-flight commit (i.e. one that's past the point where cancelling the transaction would stop it)
// would already have a read version. We need to get a read version too, otherwise committing a conflicting
// transaction may not ensure this transaction is no longer in-flight, since this transaction could get a read
// version _after_.
wait(success(ryw->getReadVersion()));
if (!ryw->resetPromise.isSet()) if (!ryw->resetPromise.isSet())
ryw->resetPromise.sendError(transaction_timed_out()); ryw->resetPromise.sendError(transaction_timed_out());
wait(delay(deterministicRandom()->random01() * 5)); wait(delay(deterministicRandom()->random01() * 5));
@ -1649,7 +1654,7 @@ Future<RangeResult> ReadYourWritesTransaction::getRange(KeySelector begin,
// This optimization prevents nullptr operations from being added to the conflict range // This optimization prevents nullptr operations from being added to the conflict range
if (limits.isReached()) { if (limits.isReached()) {
CODE_PROBE(true, "RYW range read limit 0"); CODE_PROBE(true, "RYW range read limit 0", probe::decoration::rare);
return RangeResult(); return RangeResult();
} }
@ -1663,7 +1668,7 @@ Future<RangeResult> ReadYourWritesTransaction::getRange(KeySelector begin,
end.removeOrEqual(end.arena()); end.removeOrEqual(end.arena());
if (begin.offset >= end.offset && begin.getKey() >= end.getKey()) { if (begin.offset >= end.offset && begin.getKey() >= end.getKey()) {
CODE_PROBE(true, "RYW range inverted"); CODE_PROBE(true, "RYW range inverted", probe::decoration::rare);
return RangeResult(); return RangeResult();
} }

View File

@ -116,7 +116,7 @@ public:
static Future<Void> deleteFile(std::string filename, bool mustBeDurable) { static Future<Void> deleteFile(std::string filename, bool mustBeDurable) {
::deleteFile(filename); ::deleteFile(filename);
if (mustBeDurable) { if (mustBeDurable) {
CODE_PROBE(true, "deleteFile and fsync parent dir"); CODE_PROBE(true, "deleteFile and fsync parent dir", probe::decoration::rare);
return async_fsync_parent(filename); return async_fsync_parent(filename);
} else } else
return Void(); return Void();

View File

@ -360,7 +360,7 @@ public:
//(e.g. to simulate power failure) //(e.g. to simulate power failure)
Future<Void> kill() { Future<Void> kill() {
TraceEvent("AsyncFileNonDurable_Kill", id).detail("Filename", filename); TraceEvent("AsyncFileNonDurable_Kill", id).detail("Filename", filename);
CODE_PROBE(true, "AsyncFileNonDurable was killed"); CODE_PROBE(true, "AsyncFileNonDurable was killed", probe::decoration::rare);
return sync(this, false); return sync(this, false);
} }
@ -404,7 +404,7 @@ private:
TraceEvent("AsyncFileNonDurable_KilledFileOperation", self->id) TraceEvent("AsyncFileNonDurable_KilledFileOperation", self->id)
.detail("In", context) .detail("In", context)
.detail("Filename", self->filename); .detail("Filename", self->filename);
CODE_PROBE(true, "AsyncFileNonDurable operation killed"); CODE_PROBE(true, "AsyncFileNonDurable operation killed", probe::decoration::rare);
throw io_error().asInjectedFault(); throw io_error().asInjectedFault();
} }
@ -603,13 +603,13 @@ private:
.detail("HasGarbage", garbage) .detail("HasGarbage", garbage)
.detail("Side", side) .detail("Side", side)
.detail("Filename", self->filename); .detail("Filename", self->filename);
CODE_PROBE(true, "AsyncFileNonDurable bad write"); CODE_PROBE(true, "AsyncFileNonDurable bad write", probe::decoration::rare);
} else { } else {
TraceEvent("AsyncFileNonDurable_DroppedWrite", self->id) TraceEvent("AsyncFileNonDurable_DroppedWrite", self->id)
.detail("Offset", offset + writeOffset + pageOffset) .detail("Offset", offset + writeOffset + pageOffset)
.detail("Length", sectorLength) .detail("Length", sectorLength)
.detail("Filename", self->filename); .detail("Filename", self->filename);
CODE_PROBE(true, "AsyncFileNonDurable dropped write"); CODE_PROBE(true, "AsyncFileNonDurable dropped write", probe::decoration::rare);
} }
pageOffset += sectorLength; pageOffset += sectorLength;
@ -689,7 +689,7 @@ private:
wait(self->file->truncate(size)); wait(self->file->truncate(size));
else { else {
TraceEvent("AsyncFileNonDurable_DroppedTruncate", self->id).detail("Size", size); TraceEvent("AsyncFileNonDurable_DroppedTruncate", self->id).detail("Size", size);
CODE_PROBE(true, "AsyncFileNonDurable dropped truncate"); CODE_PROBE(true, "AsyncFileNonDurable dropped truncate", probe::decoration::rare);
} }
return Void(); return Void();
@ -753,7 +753,7 @@ private:
// temporary file and then renamed to the correct location once sync is called. By not calling sync, we // temporary file and then renamed to the correct location once sync is called. By not calling sync, we
// simulate a failure to fsync the directory storing the file // simulate a failure to fsync the directory storing the file
if (self->hasBeenSynced && writeDurable && deterministicRandom()->random01() < 0.5) { if (self->hasBeenSynced && writeDurable && deterministicRandom()->random01() < 0.5) {
CODE_PROBE(true, "AsyncFileNonDurable kill was durable and synced"); CODE_PROBE(true, "AsyncFileNonDurable kill was durable and synced", probe::decoration::rare);
wait(success(errorOr(self->file->sync()))); wait(success(errorOr(self->file->sync())));
} }

View File

@ -1654,7 +1654,8 @@ public:
CODE_PROBE(kt == FailDisk, CODE_PROBE(kt == FailDisk,
"Simulated machine was killed with a failed disk", "Simulated machine was killed with a failed disk",
probe::context::sim2, probe::context::sim2,
probe::assert::simOnly); probe::assert::simOnly,
probe::decoration::rare);
if (kt == KillInstantly) { if (kt == KillInstantly) {
TraceEvent(SevWarn, "FailMachine") TraceEvent(SevWarn, "FailMachine")
@ -2126,8 +2127,11 @@ public:
.detail("KillTypeMin", ktMin) .detail("KillTypeMin", ktMin)
.detail("KilledDC", kt == ktMin); .detail("KilledDC", kt == ktMin);
CODE_PROBE( CODE_PROBE(kt != ktMin,
kt != ktMin, "DataCenter kill was rejected by killMachine", probe::context::sim2, probe::assert::simOnly); "DataCenter kill was rejected by killMachine",
probe::context::sim2,
probe::assert::simOnly,
probe::decoration::rare);
CODE_PROBE((kt == ktMin) && (kt == RebootAndDelete), CODE_PROBE((kt == ktMin) && (kt == RebootAndDelete),
"Datacenter kill Resulted in a reboot and delete", "Datacenter kill Resulted in a reboot and delete",
probe::context::sim2, probe::context::sim2,

View File

@ -644,7 +644,7 @@ private:
if (!initialCommit) if (!initialCommit)
txnStateStore->set(KeyValueRef(m.param1, m.param2)); txnStateStore->set(KeyValueRef(m.param1, m.param2));
confChange = true; confChange = true;
CODE_PROBE(true, "Setting version epoch"); CODE_PROBE(true, "Setting version epoch", probe::decoration::rare);
} }
void checkSetWriteRecoverKey(MutationRef m) { void checkSetWriteRecoverKey(MutationRef m) {

View File

@ -1558,7 +1558,7 @@ ACTOR Future<Void> reevaluateInitialSplit(Reference<BlobManagerData> bmData,
ForcedPurgeState purgeState = wait(getForcePurgedState(&tr->getTransaction(), granuleRange)); ForcedPurgeState purgeState = wait(getForcePurgedState(&tr->getTransaction(), granuleRange));
if (purgeState != ForcedPurgeState::NonePurged) { if (purgeState != ForcedPurgeState::NonePurged) {
CODE_PROBE(true, "Initial Split Re-evaluate stopped because of force purge"); CODE_PROBE(true, "Initial Split Re-evaluate stopped because of force purge", probe::decoration::rare);
TraceEvent("GranuleSplitReEvalCancelledForcePurge", bmData->id) TraceEvent("GranuleSplitReEvalCancelledForcePurge", bmData->id)
.detail("Epoch", bmData->epoch) .detail("Epoch", bmData->epoch)
.detail("GranuleRange", granuleRange); .detail("GranuleRange", granuleRange);
@ -1579,7 +1579,7 @@ ACTOR Future<Void> reevaluateInitialSplit(Reference<BlobManagerData> bmData,
KeyRange range = blobGranuleFileKeyRangeFor(granuleID); KeyRange range = blobGranuleFileKeyRangeFor(granuleID);
RangeResult granuleFiles = wait(tr->getRange(range, 1)); RangeResult granuleFiles = wait(tr->getRange(range, 1));
if (!granuleFiles.empty()) { if (!granuleFiles.empty()) {
CODE_PROBE(true, "split too big was eventually solved by another worker"); CODE_PROBE(true, "split too big was eventually solved by another worker", probe::decoration::rare);
if (BM_DEBUG) { if (BM_DEBUG) {
fmt::print("BM {0} re-evaluating initial split [{1} - {2}) too big: solved by another worker\n", fmt::print("BM {0} re-evaluating initial split [{1} - {2}) too big: solved by another worker\n",
bmData->epoch, bmData->epoch,
@ -1637,7 +1637,7 @@ ACTOR Future<Void> reevaluateInitialSplit(Reference<BlobManagerData> bmData,
RangeResult existingRanges = wait( RangeResult existingRanges = wait(
krmGetRanges(tr, blobGranuleMappingKeys.begin, granuleRange, 3, GetRangeLimits::BYTE_LIMIT_UNLIMITED)); krmGetRanges(tr, blobGranuleMappingKeys.begin, granuleRange, 3, GetRangeLimits::BYTE_LIMIT_UNLIMITED));
if (existingRanges.size() > 2 || existingRanges.more) { if (existingRanges.size() > 2 || existingRanges.more) {
CODE_PROBE(true, "split too big was already re-split"); CODE_PROBE(true, "split too big was already re-split", probe::decoration::rare);
if (BM_DEBUG) { if (BM_DEBUG) {
fmt::print("BM {0} re-evaluating initial split [{1} - {2}) too big: already split\n", fmt::print("BM {0} re-evaluating initial split [{1} - {2}) too big: already split\n",
bmData->epoch, bmData->epoch,
@ -2077,7 +2077,7 @@ ACTOR Future<bool> forceGranuleFlush(Reference<BlobManagerData> bmData,
try { try {
ForcedPurgeState purgeState = wait(getForcePurgedState(&tr, keyRange)); ForcedPurgeState purgeState = wait(getForcePurgedState(&tr, keyRange));
if (purgeState != ForcedPurgeState::NonePurged) { if (purgeState != ForcedPurgeState::NonePurged) {
CODE_PROBE(true, "Granule flush stopped because of force purge"); CODE_PROBE(true, "Granule flush stopped because of force purge", probe::decoration::rare);
TraceEvent("GranuleFlushCancelledForcePurge", bmData->id) TraceEvent("GranuleFlushCancelledForcePurge", bmData->id)
.detail("Epoch", bmData->epoch) .detail("Epoch", bmData->epoch)
.detail("KeyRange", keyRange); .detail("KeyRange", keyRange);
@ -2225,7 +2225,7 @@ ACTOR Future<std::pair<UID, Version>> persistMergeGranulesStart(Reference<BlobMa
ForcedPurgeState purgeState = wait(getForcePurgedState(&tr->getTransaction(), mergeRange)); ForcedPurgeState purgeState = wait(getForcePurgedState(&tr->getTransaction(), mergeRange));
if (purgeState != ForcedPurgeState::NonePurged) { if (purgeState != ForcedPurgeState::NonePurged) {
CODE_PROBE(true, "Merge start stopped because of force purge"); CODE_PROBE(true, "Merge start stopped because of force purge", probe::decoration::rare);
TraceEvent("GranuleMergeStartCancelledForcePurge", bmData->id) TraceEvent("GranuleMergeStartCancelledForcePurge", bmData->id)
.detail("Epoch", bmData->epoch) .detail("Epoch", bmData->epoch)
.detail("GranuleRange", mergeRange); .detail("GranuleRange", mergeRange);
@ -2311,7 +2311,7 @@ ACTOR Future<bool> persistMergeGranulesDone(Reference<BlobManagerData> bmData,
} }
} }
if (tmpWorkerId == UID()) { if (tmpWorkerId == UID()) {
CODE_PROBE(true, "All workers dead right now"); CODE_PROBE(true, "All workers dead right now", probe::decoration::rare);
while (bmData->workersById.empty()) { while (bmData->workersById.empty()) {
wait(bmData->recruitingStream.onChange() || bmData->foundBlobWorkers.getFuture()); wait(bmData->recruitingStream.onChange() || bmData->foundBlobWorkers.getFuture());
} }
@ -2564,7 +2564,9 @@ static void attemptStartMerge(Reference<BlobManagerData> bmData,
auto reCheckMergeCandidates = bmData->mergeCandidates.intersectingRanges(mergeRange); auto reCheckMergeCandidates = bmData->mergeCandidates.intersectingRanges(mergeRange);
for (auto it : reCheckMergeCandidates) { for (auto it : reCheckMergeCandidates) {
if (!it->cvalue().mergeEligible()) { if (!it->cvalue().mergeEligible()) {
CODE_PROBE(true, " granule no longer merge candidate after checking metrics, aborting merge"); CODE_PROBE(true,
"granule no longer merge candidate after checking metrics, aborting merge",
probe::decoration::rare);
return; return;
} }
} }

View File

@ -659,7 +659,7 @@ ACTOR Future<Void> updateGranuleSplitState(Transaction* tr,
CODE_PROBE(true, "Granule split stopping change feed"); CODE_PROBE(true, "Granule split stopping change feed");
} }
} else if (BW_DEBUG) { } else if (BW_DEBUG) {
CODE_PROBE(true, "Out of order granule split state updates ignored"); CODE_PROBE(true, "Out of order granule split state updates ignored", probe::decoration::rare);
fmt::print("Ignoring granule {0} split state from {1} {2} -> {3}\n", fmt::print("Ignoring granule {0} split state from {1} {2} -> {3}\n",
currentGranuleID.toString(), currentGranuleID.toString(),
parentGranuleID.toString(), parentGranuleID.toString(),
@ -2650,7 +2650,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
// queue too many files in parallel, and slow down change feed consuming to let file writing // queue too many files in parallel, and slow down change feed consuming to let file writing
// catch up // catch up
CODE_PROBE(true, "Granule processing long tail of old change feed"); CODE_PROBE(true, "Granule processing long tail of old change feed", probe::decoration::rare);
if (inFlightFiles.size() > 10 && inFlightFiles.front().version <= metadata->knownCommittedVersion) { if (inFlightFiles.size() > 10 && inFlightFiles.front().version <= metadata->knownCommittedVersion) {
if (BW_DEBUG) { if (BW_DEBUG) {
fmt::print("[{0} - {1}) Waiting on delta file b/c old change feed\n", fmt::print("[{0} - {1}) Waiting on delta file b/c old change feed\n",
@ -2731,7 +2731,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
// FIXME: better way to fix this? // FIXME: better way to fix this?
bool isForcePurging = wait(checkFileNotFoundForcePurgeRace(bwData, metadata->keyRange)); bool isForcePurging = wait(checkFileNotFoundForcePurgeRace(bwData, metadata->keyRange));
if (isForcePurging) { if (isForcePurging) {
CODE_PROBE(true, "Granule got file not found from force purge"); CODE_PROBE(true, "Granule got file not found from force purge", probe::decoration::rare);
TraceEvent("GranuleFileUpdaterFileNotFoundForcePurge", bwData->id) TraceEvent("GranuleFileUpdaterFileNotFoundForcePurge", bwData->id)
.error(e2) .error(e2)
.detail("KeyRange", metadata->keyRange) .detail("KeyRange", metadata->keyRange)
@ -3550,7 +3550,7 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
// We can get change feed cancelled from whenAtLeast. This means the change feed may // We can get change feed cancelled from whenAtLeast. This means the change feed may
// retry, or may be cancelled. Wait a bit and try again to see // retry, or may be cancelled. Wait a bit and try again to see
if (e.code() == error_code_change_feed_popped) { if (e.code() == error_code_change_feed_popped) {
CODE_PROBE(true, "Change feed popped while read waiting"); CODE_PROBE(true, "Change feed popped while read waiting", probe::decoration::rare);
throw wrong_shard_server(); throw wrong_shard_server();
} }
if (e.code() != error_code_change_feed_cancelled) { if (e.code() != error_code_change_feed_cancelled) {
@ -3574,7 +3574,9 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
Version emptyVersion = metadata->activeCFData.get()->popVersion - 1; Version emptyVersion = metadata->activeCFData.get()->popVersion - 1;
if (req.readVersion > metadata->durableDeltaVersion.get() && if (req.readVersion > metadata->durableDeltaVersion.get() &&
emptyVersion > metadata->bufferedDeltaVersion) { emptyVersion > metadata->bufferedDeltaVersion) {
CODE_PROBE(true, "feed popped for read but granule updater didn't notice yet"); CODE_PROBE(true,
"feed popped for read but granule updater didn't notice yet",
probe::decoration::rare);
// FIXME: could try to cancel the actor here somehow, but it should find out eventually // FIXME: could try to cancel the actor here somehow, but it should find out eventually
throw wrong_shard_server(); throw wrong_shard_server();
} }
@ -3789,7 +3791,7 @@ ACTOR Future<Void> handleBlobGranuleFileRequest(Reference<BlobWorkerData> bwData
when(wait(doBlobGranuleFileRequest(bwData, req))) {} when(wait(doBlobGranuleFileRequest(bwData, req))) {}
when(wait(delay(SERVER_KNOBS->BLOB_WORKER_REQUEST_TIMEOUT))) { when(wait(delay(SERVER_KNOBS->BLOB_WORKER_REQUEST_TIMEOUT))) {
if (!req.reply.isSet()) { if (!req.reply.isSet()) {
CODE_PROBE(true, "Blob Worker request timeout hit"); CODE_PROBE(true, "Blob Worker request timeout hit", probe::decoration::rare);
if (BW_DEBUG) { if (BW_DEBUG) {
fmt::print("BW {0} request [{1} - {2}) @ {3} timed out, sending WSS\n", fmt::print("BW {0} request [{1} - {2}) @ {3} timed out, sending WSS\n",
bwData->id.toString().substr(0, 5), bwData->id.toString().substr(0, 5),
@ -3878,7 +3880,7 @@ ACTOR Future<GranuleStartState> openGranule(Reference<BlobWorkerData> bwData, As
// if it's the first snapshot of a new granule, history won't be present // if it's the first snapshot of a new granule, history won't be present
if (info.history.present()) { if (info.history.present()) {
if (info.granuleID != info.history.get().value.granuleID) { if (info.granuleID != info.history.get().value.granuleID) {
CODE_PROBE(true, "Blob Worker re-opening granule after merge+resplit"); CODE_PROBE(true, "Blob Worker re-opening granule after merge+resplit", probe::decoration::rare);
// The only case this can happen is when a granule was merged into a larger granule, // The only case this can happen is when a granule was merged into a larger granule,
// then split back out to the same one. Validate that this is a new granule that was // then split back out to the same one. Validate that this is a new granule that was
// split previously. Just check lock based on epoch, since seqno is intentionally // split previously. Just check lock based on epoch, since seqno is intentionally

View File

@ -368,7 +368,9 @@ ACTOR Future<Void> clusterWatchDatabase(ClusterControllerData* cluster,
CODE_PROBE(err.code() == error_code_grv_proxy_failed, "Terminated due to GRV proxy failure"); CODE_PROBE(err.code() == error_code_grv_proxy_failed, "Terminated due to GRV proxy failure");
CODE_PROBE(err.code() == error_code_resolver_failed, "Terminated due to resolver failure"); CODE_PROBE(err.code() == error_code_resolver_failed, "Terminated due to resolver failure");
CODE_PROBE(err.code() == error_code_backup_worker_failed, "Terminated due to backup worker failure"); CODE_PROBE(err.code() == error_code_backup_worker_failed, "Terminated due to backup worker failure");
CODE_PROBE(err.code() == error_code_operation_failed, "Terminated due to failed operation"); CODE_PROBE(err.code() == error_code_operation_failed,
"Terminated due to failed operation",
probe::decoration::rare);
CODE_PROBE(err.code() == error_code_restart_cluster_controller, CODE_PROBE(err.code() == error_code_restart_cluster_controller,
"Terminated due to cluster-controller restart."); "Terminated due to cluster-controller restart.");
@ -1308,7 +1310,7 @@ ACTOR Future<Void> registerWorker(RegisterWorkerRequest req,
} }
checkOutstandingRequests(self); checkOutstandingRequests(self);
} else { } else {
CODE_PROBE(true, "Received an old worker registration request."); CODE_PROBE(true, "Received an old worker registration request.", probe::decoration::rare);
} }
// For each singleton // For each singleton

View File

@ -751,7 +751,7 @@ ACTOR Future<Void> updateLogsValue(Reference<ClusterRecoveryData> self, Database
} }
if (!found) { if (!found) {
CODE_PROBE(true, "old master attempted to change logsKey"); CODE_PROBE(true, "old master attempted to change logsKey", probe::decoration::rare);
return Void(); return Void();
} }
@ -830,7 +830,7 @@ ACTOR Future<Void> updateRegistration(Reference<ClusterRecoveryData> self, Refer
std::vector<UID>())); std::vector<UID>()));
} else { } else {
// The cluster should enter the accepting commits phase soon, and then we will register again // The cluster should enter the accepting commits phase soon, and then we will register again
CODE_PROBE(true, "cstate is updated but we aren't accepting commits yet"); CODE_PROBE(true, "cstate is updated but we aren't accepting commits yet", probe::decoration::rare);
} }
} }
} }

View File

@ -230,7 +230,7 @@ class ConfigNodeImpl {
// Handle a very rare case where a ConfigNode loses data between // Handle a very rare case where a ConfigNode loses data between
// responding with a committed version and responding to the // responding with a committed version and responding to the
// subsequent get changes request. // subsequent get changes request.
CODE_PROBE(true, "ConfigNode data loss occurred on a minority of coordinators"); CODE_PROBE(true, "ConfigNode data loss occurred on a minority of coordinators", probe::decoration::rare);
req.reply.sendError(process_behind()); // Reuse the process_behind error req.reply.sendError(process_behind()); // Reuse the process_behind error
return Void(); return Void();
} }

View File

@ -323,7 +323,8 @@ struct MovableCoordinatedStateImpl {
Value oldQuorumState = wait(cs.read()); Value oldQuorumState = wait(cs.read());
if (oldQuorumState != self->lastCSValue.get()) { if (oldQuorumState != self->lastCSValue.get()) {
CODE_PROBE(true, "Quorum change aborted by concurrent write to old coordination state"); CODE_PROBE(
true, "Quorum change aborted by concurrent write to old coordination state", probe::decoration::rare);
TraceEvent("QuorumChangeAbortedByConcurrency").log(); TraceEvent("QuorumChangeAbortedByConcurrency").log();
throw coordinated_state_conflict(); throw coordinated_state_conflict();
} }

View File

@ -1934,7 +1934,7 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueue* self,
throw error; throw error;
} }
} else { } else {
CODE_PROBE(true, "move to removed server"); CODE_PROBE(true, "move to removed server", probe::decoration::rare);
healthyDestinations.addDataInFlightToTeam(-metrics.bytes); healthyDestinations.addDataInFlightToTeam(-metrics.bytes);
auto readLoad = metrics.bytesReadPerKSecond; auto readLoad = metrics.bytesReadPerKSecond;
auto& destinationRef = healthyDestinations; auto& destinationRef = healthyDestinations;

View File

@ -1375,7 +1375,7 @@ ACTOR Future<Void> fetchTopKShardMetrics(DataDistributionTracker* self, GetTopKM
when(wait(g_network->isSimulated() && BUGGIFY_WITH_PROB(0.01) ? Never() when(wait(g_network->isSimulated() && BUGGIFY_WITH_PROB(0.01) ? Never()
: fetchTopKShardMetrics_impl(self, req))) {} : fetchTopKShardMetrics_impl(self, req))) {}
when(wait(delay(SERVER_KNOBS->DD_SHARD_METRICS_TIMEOUT))) { when(wait(delay(SERVER_KNOBS->DD_SHARD_METRICS_TIMEOUT))) {
CODE_PROBE(true, "TopK DD_SHARD_METRICS_TIMEOUT"); CODE_PROBE(true, "TopK DD_SHARD_METRICS_TIMEOUT", probe::decoration::rare);
req.reply.send(GetTopKMetricsReply()); req.reply.send(GetTopKMetricsReply());
} }
} }

View File

@ -1321,7 +1321,7 @@ public:
} }
} }
if (addedNewBadTeam && self->badTeamRemover.isReady()) { if (addedNewBadTeam && self->badTeamRemover.isReady()) {
CODE_PROBE(true, "Server locality change created bad teams"); CODE_PROBE(true, "Server locality change created bad teams", probe::decoration::rare);
self->doBuildTeams = true; self->doBuildTeams = true;
self->badTeamRemover = removeBadTeams(self); self->badTeamRemover = removeBadTeams(self);
self->addActor.send(self->badTeamRemover); self->addActor.send(self->badTeamRemover);

View File

@ -917,7 +917,7 @@ ACTOR Future<std::map<NetworkAddress, std::pair<WorkerInterface, std::string>>>
configuration.storageTeamSize - 1) - configuration.storageTeamSize - 1) -
storageFailures; storageFailures;
if (*storageFaultTolerance < 0) { if (*storageFaultTolerance < 0) {
CODE_PROBE(true, "Too many failed storage servers to complete snapshot"); CODE_PROBE(true, "Too many failed storage servers to complete snapshot", probe::decoration::rare);
throw snap_storage_failed(); throw snap_storage_failed();
} }
// tlogs // tlogs
@ -938,7 +938,7 @@ ACTOR Future<std::map<NetworkAddress, std::pair<WorkerInterface, std::string>>>
// get coordinators // get coordinators
Optional<Value> coordinators = wait(tr.get(coordinatorsKey)); Optional<Value> coordinators = wait(tr.get(coordinatorsKey));
if (!coordinators.present()) { if (!coordinators.present()) {
CODE_PROBE(true, "Failed to read the coordinatorsKey"); CODE_PROBE(true, "Failed to read the coordinatorsKey", probe::decoration::rare);
throw operation_failed(); throw operation_failed();
} }
ClusterConnectionString ccs(coordinators.get().toString()); ClusterConnectionString ccs(coordinators.get().toString());

View File

@ -375,7 +375,7 @@ bool LogPushData::writeTransactionInfo(int location, uint32_t subseq) {
// parent->child. // parent->child.
SpanContextMessage contextMessage; SpanContextMessage contextMessage;
if (spanContext.isSampled()) { if (spanContext.isSampled()) {
CODE_PROBE(true, "Converting OTELSpanContextMessage to traced SpanContextMessage"); CODE_PROBE(true, "Converting OTELSpanContextMessage to traced SpanContextMessage", probe::decoration::rare);
contextMessage = SpanContextMessage(UID(spanContext.traceID.first(), spanContext.traceID.second())); contextMessage = SpanContextMessage(UID(spanContext.traceID.first(), spanContext.traceID.second()));
} else { } else {
CODE_PROBE(true, "Converting OTELSpanContextMessage to untraced SpanContextMessage"); CODE_PROBE(true, "Converting OTELSpanContextMessage to untraced SpanContextMessage");

View File

@ -652,7 +652,7 @@ ACTOR static Future<Void> startMoveKeys(Database occ,
// Attempt to move onto a server that isn't in serverList (removed or never added to the // Attempt to move onto a server that isn't in serverList (removed or never added to the
// database) This can happen (why?) and is handled by the data distribution algorithm // database) This can happen (why?) and is handled by the data distribution algorithm
// FIXME: Answer why this can happen? // FIXME: Answer why this can happen?
CODE_PROBE(true, "start move keys moving to a removed server"); CODE_PROBE(true, "start move keys moving to a removed server", probe::decoration::rare);
throw move_to_removed_server(); throw move_to_removed_server();
} }
} }
@ -846,7 +846,7 @@ ACTOR Future<Void> checkFetchingState(Database cx,
for (int s = 0; s < serverListValues.size(); s++) { for (int s = 0; s < serverListValues.size(); s++) {
if (!serverListValues[s].present()) { if (!serverListValues[s].present()) {
// FIXME: Is this the right behavior? dataMovementComplete will never be sent! // FIXME: Is this the right behavior? dataMovementComplete will never be sent!
CODE_PROBE(true, "check fetching state moved to removed server"); CODE_PROBE(true, "check fetching state moved to removed server", probe::decoration::rare);
throw move_to_removed_server(); throw move_to_removed_server();
} }
auto si = decodeServerListValue(serverListValues[s].get()); auto si = decodeServerListValue(serverListValues[s].get());

View File

@ -98,7 +98,7 @@ TraceEvent debugTagsAndMessageEnabled(const char* context, Version version, Stri
SpanContextMessage scm; SpanContextMessage scm;
br >> scm; br >> scm;
} else if (OTELSpanContextMessage::startsOTELSpanContextMessage(mutationType)) { } else if (OTELSpanContextMessage::startsOTELSpanContextMessage(mutationType)) {
CODE_PROBE(true, "MutationTracking reading OTELSpanContextMessage"); CODE_PROBE(true, "MutationTracking reading OTELSpanContextMessage", probe::decoration::rare);
BinaryReader br(mutationData, AssumeVersion(rdr.protocolVersion())); BinaryReader br(mutationData, AssumeVersion(rdr.protocolVersion()));
OTELSpanContextMessage scm; OTELSpanContextMessage scm;
br >> scm; br >> scm;

View File

@ -182,7 +182,7 @@ private:
Standalone<StringRef> h = wait(self->queue->readNext(sizeof(uint32_t))); Standalone<StringRef> h = wait(self->queue->readNext(sizeof(uint32_t)));
if (h.size() != sizeof(uint32_t)) { if (h.size() != sizeof(uint32_t)) {
if (h.size()) { if (h.size()) {
CODE_PROBE(true, "Zero fill within size field"); CODE_PROBE(true, "Zero fill within size field", probe::decoration::rare);
int payloadSize = 0; int payloadSize = 0;
memcpy(&payloadSize, h.begin(), h.size()); memcpy(&payloadSize, h.begin(), h.size());
zeroFillSize = sizeof(uint32_t) - h.size(); // zero fill the size itself zeroFillSize = sizeof(uint32_t) - h.size(); // zero fill the size itself
@ -488,7 +488,7 @@ ACTOR Future<Void> tLogLock(TLogData* self, ReplyPromise<TLogLockResult> reply,
CODE_PROBE(true, "TLog stopped by recovering master"); CODE_PROBE(true, "TLog stopped by recovering master");
CODE_PROBE(logData->stopped, "LogData already stopped"); CODE_PROBE(logData->stopped, "LogData already stopped");
CODE_PROBE(!logData->stopped, "LogData not yet stopped"); CODE_PROBE(!logData->stopped, "LogData not yet stopped", probe::decoration::rare);
TraceEvent("TLogStop", logData->logId) TraceEvent("TLogStop", logData->logId)
.detail("Ver", stopVersion) .detail("Ver", stopVersion)
@ -1026,7 +1026,7 @@ Future<Void> tLogPeekMessages(PromiseType replyPromise,
} }
if (sequenceData.isSet()) { if (sequenceData.isSet()) {
if (sequenceData.getFuture().get() != rep.end) { if (sequenceData.getFuture().get() != rep.end) {
CODE_PROBE(true, "tlog peek second attempt ended at a different version"); CODE_PROBE(true, "tlog peek second attempt ended at a different version", probe::decoration::rare);
replyPromise.sendError(operation_obsolete()); replyPromise.sendError(operation_obsolete());
return Void(); return Void();
} }
@ -1099,7 +1099,7 @@ Future<Void> tLogPeekMessages(PromiseType replyPromise,
auto& sequenceData = trackerData.sequence_version[sequence + 1]; auto& sequenceData = trackerData.sequence_version[sequence + 1];
if (sequenceData.isSet()) { if (sequenceData.isSet()) {
if (sequenceData.getFuture().get() != reply.end) { if (sequenceData.getFuture().get() != reply.end) {
CODE_PROBE(true, "tlog peek second attempt ended at a different version (2)"); CODE_PROBE(true, "tlog peek second attempt ended at a different version (2)", probe::decoration::rare);
replyPromise.sendError(operation_obsolete()); replyPromise.sendError(operation_obsolete());
return Void(); return Void();
} }
@ -1462,7 +1462,8 @@ ACTOR Future<Void> restorePersistentState(TLogData* self, LocalityData locality)
if (!fFormat.get().present()) { if (!fFormat.get().present()) {
RangeResult v = wait(self->persistentData->readRange(KeyRangeRef(StringRef(), "\xff"_sr), 1)); RangeResult v = wait(self->persistentData->readRange(KeyRangeRef(StringRef(), "\xff"_sr), 1));
if (!v.size()) { if (!v.size()) {
CODE_PROBE(true, "The DB is completely empty, so it was never initialized. Delete it."); CODE_PROBE(
true, "The DB is completely empty, so it was never initialized. Delete it.", probe::decoration::rare);
throw worker_removed(); throw worker_removed();
} else { } else {
// This should never happen // This should never happen
@ -1548,7 +1549,7 @@ ACTOR Future<Void> restorePersistentState(TLogData* self, LocalityData locality)
try { try {
loop { loop {
if (allRemoved.isReady()) { if (allRemoved.isReady()) {
CODE_PROBE(true, "all tlogs removed during queue recovery"); CODE_PROBE(true, "all tlogs removed during queue recovery", probe::decoration::rare);
throw worker_removed(); throw worker_removed();
} }
choose { choose {

View File

@ -148,7 +148,7 @@ private:
Standalone<StringRef> h = wait(self->queue->readNext(sizeof(uint32_t))); Standalone<StringRef> h = wait(self->queue->readNext(sizeof(uint32_t)));
if (h.size() != sizeof(uint32_t)) { if (h.size() != sizeof(uint32_t)) {
if (h.size()) { if (h.size()) {
CODE_PROBE(true, "Zero fill within size field"); CODE_PROBE(true, "Zero fill within size field", probe::decoration::rare);
int payloadSize = 0; int payloadSize = 0;
memcpy(&payloadSize, h.begin(), h.size()); memcpy(&payloadSize, h.begin(), h.size());
zeroFillSize = sizeof(uint32_t) - h.size(); // zero fill the size itself zeroFillSize = sizeof(uint32_t) - h.size(); // zero fill the size itself
@ -162,7 +162,7 @@ private:
Standalone<StringRef> e = wait(self->queue->readNext(payloadSize + 1)); Standalone<StringRef> e = wait(self->queue->readNext(payloadSize + 1));
if (e.size() != payloadSize + 1) { if (e.size() != payloadSize + 1) {
CODE_PROBE(true, "Zero fill within payload"); CODE_PROBE(true, "Zero fill within payload", probe::decoration::rare);
zeroFillSize = payloadSize + 1 - e.size(); zeroFillSize = payloadSize + 1 - e.size();
break; break;
} }
@ -176,7 +176,7 @@ private:
} }
} }
if (zeroFillSize) { if (zeroFillSize) {
CODE_PROBE(true, "Fixing a partial commit at the end of the tlog queue"); CODE_PROBE(true, "Fixing a partial commit at the end of the tlog queue", probe::decoration::rare);
for (int i = 0; i < zeroFillSize; i++) for (int i = 0; i < zeroFillSize; i++)
self->queue->push(StringRef((const uint8_t*)"", 1)); self->queue->push(StringRef((const uint8_t*)"", 1));
} }
@ -631,7 +631,7 @@ ACTOR Future<Void> tLogLock(TLogData* self, ReplyPromise<TLogLockResult> reply,
CODE_PROBE(true, "TLog stopped by recovering master"); CODE_PROBE(true, "TLog stopped by recovering master");
CODE_PROBE(logData->stopped, "logData already stopped"); CODE_PROBE(logData->stopped, "logData already stopped");
CODE_PROBE(!logData->stopped, "logData not yet stopped"); CODE_PROBE(!logData->stopped, "logData not yet stopped", probe::decoration::rare);
TraceEvent("TLogStop", logData->logId) TraceEvent("TLogStop", logData->logId)
.detail("Ver", stopVersion) .detail("Ver", stopVersion)
@ -1317,7 +1317,7 @@ Future<Void> tLogPeekMessages(PromiseType replyPromise,
} }
if (sequenceData.isSet()) { if (sequenceData.isSet()) {
if (sequenceData.getFuture().get().first != rep.end) { if (sequenceData.getFuture().get().first != rep.end) {
CODE_PROBE(true, "tlog peek second attempt ended at a different version"); CODE_PROBE(true, "tlog peek second attempt ended at a different version", probe::decoration::rare);
replyPromise.sendError(operation_obsolete()); replyPromise.sendError(operation_obsolete());
return Void(); return Void();
} }
@ -1415,7 +1415,7 @@ Future<Void> tLogPeekMessages(PromiseType replyPromise,
if (sequenceData.isSet()) { if (sequenceData.isSet()) {
trackerData.duplicatePeeks++; trackerData.duplicatePeeks++;
if (sequenceData.getFuture().get().first != reply.end) { if (sequenceData.getFuture().get().first != reply.end) {
CODE_PROBE(true, "tlog peek second attempt ended at a different version (2)"); CODE_PROBE(true, "tlog peek second attempt ended at a different version (2)", probe::decoration::rare);
replyPromise.sendError(operation_obsolete()); replyPromise.sendError(operation_obsolete());
return Void(); return Void();
} }
@ -1522,7 +1522,7 @@ ACTOR Future<Void> doQueueCommit(TLogData* self,
.detail("LogId", logData->logId) .detail("LogId", logData->logId)
.detail("Version", it->version.get()) .detail("Version", it->version.get())
.detail("QueueVer", it->queueCommittedVersion.get()); .detail("QueueVer", it->queueCommittedVersion.get());
CODE_PROBE(true, "A TLog was replaced before having a chance to commit its queue"); CODE_PROBE(true, "A TLog was replaced before having a chance to commit its queue", probe::decoration::rare);
it->queueCommittedVersion.set(it->version.get()); it->queueCommittedVersion.set(it->version.get());
} }
return Void(); return Void();
@ -1983,7 +1983,7 @@ ACTOR Future<Void> serveTLogInterface(TLogData* self,
when(TLogCommitRequest req = waitNext(tli.commit.getFuture())) { when(TLogCommitRequest req = waitNext(tli.commit.getFuture())) {
//TraceEvent("TLogCommitReq", logData->logId).detail("Ver", req.version).detail("PrevVer", req.prevVersion).detail("LogVer", logData->version.get()); //TraceEvent("TLogCommitReq", logData->logId).detail("Ver", req.version).detail("PrevVer", req.prevVersion).detail("LogVer", logData->version.get());
ASSERT(logData->isPrimary); ASSERT(logData->isPrimary);
CODE_PROBE(logData->stopped, "TLogCommitRequest while stopped"); CODE_PROBE(logData->stopped, "TLogCommitRequest while stopped", probe::decoration::rare);
if (!logData->stopped) if (!logData->stopped)
logData->addActor.send(tLogCommit(self, req, logData, warningCollectorInput)); logData->addActor.send(tLogCommit(self, req, logData, warningCollectorInput));
else else
@ -2325,7 +2325,8 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
if (!fFormat.get().present()) { if (!fFormat.get().present()) {
RangeResult v = wait(self->persistentData->readRange(KeyRangeRef(StringRef(), "\xff"_sr), 1)); RangeResult v = wait(self->persistentData->readRange(KeyRangeRef(StringRef(), "\xff"_sr), 1));
if (!v.size()) { if (!v.size()) {
CODE_PROBE(true, "The DB is completely empty, so it was never initialized. Delete it."); CODE_PROBE(
true, "The DB is completely empty, so it was never initialized. Delete it.", probe::decoration::rare);
throw worker_removed(); throw worker_removed();
} else { } else {
// This should never happen // This should never happen
@ -2465,7 +2466,7 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
try { try {
loop { loop {
if (allRemoved.isReady()) { if (allRemoved.isReady()) {
CODE_PROBE(true, "all tlogs removed during queue recovery"); CODE_PROBE(true, "all tlogs removed during queue recovery", probe::decoration::rare);
throw worker_removed(); throw worker_removed();
} }
choose { choose {

View File

@ -156,7 +156,7 @@ private:
Standalone<StringRef> h = wait(self->queue->readNext(sizeof(uint32_t))); Standalone<StringRef> h = wait(self->queue->readNext(sizeof(uint32_t)));
if (h.size() != sizeof(uint32_t)) { if (h.size() != sizeof(uint32_t)) {
if (h.size()) { if (h.size()) {
CODE_PROBE(true, "Zero fill within size field"); CODE_PROBE(true, "Zero fill within size field", probe::decoration::rare);
int payloadSize = 0; int payloadSize = 0;
memcpy(&payloadSize, h.begin(), h.size()); memcpy(&payloadSize, h.begin(), h.size());
zeroFillSize = sizeof(uint32_t) - h.size(); // zero fill the size itself zeroFillSize = sizeof(uint32_t) - h.size(); // zero fill the size itself
@ -733,7 +733,7 @@ ACTOR Future<Void> tLogLock(TLogData* self, ReplyPromise<TLogLockResult> reply,
CODE_PROBE(true, "TLog stopped by recovering master"); CODE_PROBE(true, "TLog stopped by recovering master");
CODE_PROBE(logData->stopped, "logData already stopped"); CODE_PROBE(logData->stopped, "logData already stopped");
CODE_PROBE(!logData->stopped, "logData not yet stopped"); CODE_PROBE(!logData->stopped, "logData not yet stopped", probe::decoration::rare);
TraceEvent("TLogStop", logData->logId) TraceEvent("TLogStop", logData->logId)
.detail("Ver", stopVersion) .detail("Ver", stopVersion)
@ -1655,7 +1655,7 @@ Future<Void> tLogPeekMessages(PromiseType replyPromise,
} }
if (sequenceData.isSet()) { if (sequenceData.isSet()) {
if (sequenceData.getFuture().get().first != rep.end) { if (sequenceData.getFuture().get().first != rep.end) {
CODE_PROBE(true, "tlog peek second attempt ended at a different version"); CODE_PROBE(true, "tlog peek second attempt ended at a different version", probe::decoration::rare);
replyPromise.sendError(operation_obsolete()); replyPromise.sendError(operation_obsolete());
return Void(); return Void();
} }
@ -1843,7 +1843,7 @@ Future<Void> tLogPeekMessages(PromiseType replyPromise,
if (sequenceData.isSet()) { if (sequenceData.isSet()) {
trackerData.duplicatePeeks++; trackerData.duplicatePeeks++;
if (sequenceData.getFuture().get().first != reply.end) { if (sequenceData.getFuture().get().first != reply.end) {
CODE_PROBE(true, "tlog peek second attempt ended at a different version (2)"); CODE_PROBE(true, "tlog peek second attempt ended at a different version (2)", probe::decoration::rare);
replyPromise.sendError(operation_obsolete()); replyPromise.sendError(operation_obsolete());
return Void(); return Void();
} }
@ -1905,7 +1905,7 @@ ACTOR Future<Void> watchDegraded(TLogData* self) {
wait(lowPriorityDelay(SERVER_KNOBS->TLOG_DEGRADED_DURATION)); wait(lowPriorityDelay(SERVER_KNOBS->TLOG_DEGRADED_DURATION));
TraceEvent(SevWarnAlways, "TLogDegraded", self->dbgid).log(); TraceEvent(SevWarnAlways, "TLogDegraded", self->dbgid).log();
CODE_PROBE(true, "TLog degraded"); CODE_PROBE(true, "TLog degraded", probe::decoration::rare);
self->degraded->set(true); self->degraded->set(true);
return Void(); return Void();
} }
@ -1963,7 +1963,7 @@ ACTOR Future<Void> doQueueCommit(TLogData* self,
.detail("LogId", logData->logId) .detail("LogId", logData->logId)
.detail("Version", it->version.get()) .detail("Version", it->version.get())
.detail("QueueVer", it->queueCommittedVersion.get()); .detail("QueueVer", it->queueCommittedVersion.get());
CODE_PROBE(true, "A TLog was replaced before having a chance to commit its queue"); CODE_PROBE(true, "A TLog was replaced before having a chance to commit its queue", probe::decoration::rare);
it->queueCommittedVersion.set(it->version.get()); it->queueCommittedVersion.set(it->version.get());
} }
return Void(); return Void();
@ -2427,7 +2427,7 @@ ACTOR Future<Void> serveTLogInterface(TLogData* self,
when(TLogCommitRequest req = waitNext(tli.commit.getFuture())) { when(TLogCommitRequest req = waitNext(tli.commit.getFuture())) {
//TraceEvent("TLogCommitReq", logData->logId).detail("Ver", req.version).detail("PrevVer", req.prevVersion).detail("LogVer", logData->version.get()); //TraceEvent("TLogCommitReq", logData->logId).detail("Ver", req.version).detail("PrevVer", req.prevVersion).detail("LogVer", logData->version.get());
ASSERT(logData->isPrimary); ASSERT(logData->isPrimary);
CODE_PROBE(logData->stopped, "TLogCommitRequest while stopped"); CODE_PROBE(logData->stopped, "TLogCommitRequest while stopped", probe::decoration::rare);
if (!logData->stopped) if (!logData->stopped)
logData->addActor.send(tLogCommit(self, req, logData, warningCollectorInput)); logData->addActor.send(tLogCommit(self, req, logData, warningCollectorInput));
else else
@ -2792,7 +2792,8 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
if (!fFormat.get().present()) { if (!fFormat.get().present()) {
RangeResult v = wait(self->persistentData->readRange(KeyRangeRef(StringRef(), "\xff"_sr), 1)); RangeResult v = wait(self->persistentData->readRange(KeyRangeRef(StringRef(), "\xff"_sr), 1));
if (!v.size()) { if (!v.size()) {
CODE_PROBE(true, "The DB is completely empty, so it was never initialized. Delete it."); CODE_PROBE(
true, "The DB is completely empty, so it was never initialized. Delete it.", probe::decoration::rare);
throw worker_removed(); throw worker_removed();
} else { } else {
// This should never happen // This should never happen
@ -2940,7 +2941,7 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
throw end_of_stream(); throw end_of_stream();
loop { loop {
if (allRemoved.isReady()) { if (allRemoved.isReady()) {
CODE_PROBE(true, "all tlogs removed during queue recovery"); CODE_PROBE(true, "all tlogs removed during queue recovery", probe::decoration::rare);
throw worker_removed(); throw worker_removed();
} }
choose { choose {

View File

@ -99,7 +99,7 @@ Optional<double> RkTagThrottleCollection::autoThrottleTag(UID id,
itr = autoThrottledTags.try_emplace(tag).first; itr = autoThrottledTags.try_emplace(tag).first;
initializeTag(tag); initializeTag(tag);
} else if (itr->second.limits.expiration <= now()) { } else if (itr->second.limits.expiration <= now()) {
CODE_PROBE(true, "Re-throttling expired tag that hasn't been cleaned up"); CODE_PROBE(true, "Re-throttling expired tag that hasn't been cleaned up", probe::decoration::rare);
present = false; present = false;
itr->second = RkTagThrottleData(); itr->second = RkTagThrottleData();
} }

View File

@ -908,7 +908,7 @@ ACTOR Future<Void> simulatedMachine(ClusterConnectionString connStr,
CODE_PROBE(bootCount >= 1, "Simulated machine rebooted"); CODE_PROBE(bootCount >= 1, "Simulated machine rebooted");
CODE_PROBE(bootCount >= 2, "Simulated machine rebooted twice"); CODE_PROBE(bootCount >= 2, "Simulated machine rebooted twice");
CODE_PROBE(bootCount >= 3, "Simulated machine rebooted three times"); CODE_PROBE(bootCount >= 3, "Simulated machine rebooted three times", probe::decoration::rare);
++bootCount; ++bootCount;
TraceEvent("SimulatedMachineStart", randomId) TraceEvent("SimulatedMachineStart", randomId)
@ -1056,7 +1056,7 @@ ACTOR Future<Void> simulatedMachine(ClusterConnectionString connStr,
avail.pop_back(); avail.pop_back();
if (myFolders != toRebootFrom) { if (myFolders != toRebootFrom) {
CODE_PROBE(true, "Simulated machine swapped data folders"); CODE_PROBE(true, "Simulated machine swapped data folders", probe::decoration::rare);
TraceEvent("SimulatedMachineFolderSwap", randomId) TraceEvent("SimulatedMachineFolderSwap", randomId)
.detail("OldFolder0", myFolders[0]) .detail("OldFolder0", myFolders[0])
.detail("NewFolder0", toRebootFrom[0]) .detail("NewFolder0", toRebootFrom[0])

View File

@ -90,7 +90,7 @@ void StorageServerMetrics::notify(KeyRef key, StorageMetrics& metrics) {
if (g_network->isSimulated()) { if (g_network->isSimulated()) {
CODE_PROBE(metrics.bytesPerKSecond != 0, "ShardNotifyMetrics bytes"); CODE_PROBE(metrics.bytesPerKSecond != 0, "ShardNotifyMetrics bytes");
CODE_PROBE(metrics.iosPerKSecond != 0, "ShardNotifyMetrics ios"); CODE_PROBE(metrics.iosPerKSecond != 0, "ShardNotifyMetrics ios");
CODE_PROBE(metrics.bytesReadPerKSecond != 0, "ShardNotifyMetrics bytesRead"); CODE_PROBE(metrics.bytesReadPerKSecond != 0, "ShardNotifyMetrics bytesRead", probe::decoration::rare);
} }
double expire = now() + SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL; double expire = now() + SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL;

View File

@ -157,7 +157,7 @@ private:
Standalone<StringRef> h = wait(self->queue->readNext(sizeof(uint32_t))); Standalone<StringRef> h = wait(self->queue->readNext(sizeof(uint32_t)));
if (h.size() != sizeof(uint32_t)) { if (h.size() != sizeof(uint32_t)) {
if (h.size()) { if (h.size()) {
CODE_PROBE(true, "Zero fill within size field"); CODE_PROBE(true, "Zero fill within size field", probe::decoration::rare);
int payloadSize = 0; int payloadSize = 0;
memcpy(&payloadSize, h.begin(), h.size()); memcpy(&payloadSize, h.begin(), h.size());
zeroFillSize = sizeof(uint32_t) - h.size(); // zero fill the size itself zeroFillSize = sizeof(uint32_t) - h.size(); // zero fill the size itself
@ -2189,7 +2189,7 @@ ACTOR Future<Void> doQueueCommit(TLogData* self,
.detail("LogId", logData->logId) .detail("LogId", logData->logId)
.detail("Version", it->version.get()) .detail("Version", it->version.get())
.detail("QueueVer", it->queueCommittedVersion.get()); .detail("QueueVer", it->queueCommittedVersion.get());
CODE_PROBE(true, "A TLog was replaced before having a chance to commit its queue"); CODE_PROBE(true, "A TLog was replaced before having a chance to commit its queue", probe::decoration::rare);
it->queueCommittedVersion.set(it->version.get()); it->queueCommittedVersion.set(it->version.get());
} }
return Void(); return Void();

View File

@ -160,7 +160,9 @@ ACTOR Future<Void> getVersion(Reference<MasterData> self, GetCommitVersionReques
return Void(); return Void();
} }
CODE_PROBE(proxyItr->second.latestRequestNum.get() < req.requestNum - 1, "Commit version request queued up"); CODE_PROBE(proxyItr->second.latestRequestNum.get() < req.requestNum - 1,
"Commit version request queued up",
probe::decoration::rare);
wait(proxyItr->second.latestRequestNum.whenAtLeast(req.requestNum - 1)); wait(proxyItr->second.latestRequestNum.whenAtLeast(req.requestNum - 1));
auto itr = proxyItr->second.replies.find(req.requestNum); auto itr = proxyItr->second.replies.find(req.requestNum);
@ -169,7 +171,8 @@ ACTOR Future<Void> getVersion(Reference<MasterData> self, GetCommitVersionReques
req.reply.send(itr->second); req.reply.send(itr->second);
} else if (req.requestNum <= proxyItr->second.latestRequestNum.get()) { } else if (req.requestNum <= proxyItr->second.latestRequestNum.get()) {
CODE_PROBE(true, CODE_PROBE(true,
"Old request for previously acknowledged sequence - may be impossible with current FlowTransport"); "Old request for previously acknowledged sequence - may be impossible with current FlowTransport",
probe::decoration::rare);
ASSERT(req.requestNum < ASSERT(req.requestNum <
proxyItr->second.latestRequestNum.get()); // The latest request can never be acknowledged proxyItr->second.latestRequestNum.get()); // The latest request can never be acknowledged
req.reply.send(Never()); req.reply.send(Never());
@ -442,11 +445,20 @@ ACTOR Future<Void> masterServer(MasterInterface mi,
addActor.getFuture().pop(); addActor.getFuture().pop();
} }
CODE_PROBE(err.code() == error_code_tlog_failed, "Master: terminated due to tLog failure"); CODE_PROBE(
CODE_PROBE(err.code() == error_code_commit_proxy_failed, "Master: terminated due to commit proxy failure"); err.code() == error_code_tlog_failed, "Master: terminated due to tLog failure", probe::decoration::rare);
CODE_PROBE(err.code() == error_code_grv_proxy_failed, "Master: terminated due to GRV proxy failure"); CODE_PROBE(err.code() == error_code_commit_proxy_failed,
CODE_PROBE(err.code() == error_code_resolver_failed, "Master: terminated due to resolver failure"); "Master: terminated due to commit proxy failure",
CODE_PROBE(err.code() == error_code_backup_worker_failed, "Master: terminated due to backup worker failure"); probe::decoration::rare);
CODE_PROBE(err.code() == error_code_grv_proxy_failed,
"Master: terminated due to GRV proxy failure",
probe::decoration::rare);
CODE_PROBE(err.code() == error_code_resolver_failed,
"Master: terminated due to resolver failure",
probe::decoration::rare);
CODE_PROBE(err.code() == error_code_backup_worker_failed,
"Master: terminated due to backup worker failure",
probe::decoration::rare);
if (normalMasterErrors().count(err.code())) { if (normalMasterErrors().count(err.code())) {
TraceEvent("MasterTerminated", mi.id()).error(err); TraceEvent("MasterTerminated", mi.id()).error(err);

View File

@ -2022,7 +2022,8 @@ ACTOR Future<Version> watchWaitForValueChange(StorageServer* data, SpanContext p
options.debugID = metadata->debugID; options.debugID = metadata->debugID;
CODE_PROBE(latest >= minVersion && latest < data->data().latestVersion, CODE_PROBE(latest >= minVersion && latest < data->data().latestVersion,
"Starting watch loop with latestVersion > data->version"); "Starting watch loop with latestVersion > data->version",
probe::decoration::rare);
GetValueRequest getReq( GetValueRequest getReq(
span.context, TenantInfo(), metadata->key, latest, metadata->tags, options, VersionVector()); span.context, TenantInfo(), metadata->key, latest, metadata->tags, options, VersionVector());
state Future<Void> getValue = getValueQ( state Future<Void> getValue = getValueQ(
@ -4573,7 +4574,7 @@ ACTOR Future<Void> getMappedKeyValuesQ(StorageServer* data, GetMappedKeyValuesRe
// end the last actual key returned must be from this shard. A begin offset of 1 is also OK because then either // end the last actual key returned must be from this shard. A begin offset of 1 is also OK because then either
// begin is past end or equal to end (so the result is definitely empty) // begin is past end or equal to end (so the result is definitely empty)
if ((offset1 && offset1 != 1) || (offset2 && offset2 != 1)) { if ((offset1 && offset1 != 1) || (offset2 && offset2 != 1)) {
CODE_PROBE(true, "wrong_shard_server due to offset in getMappedKeyValuesQ"); CODE_PROBE(true, "wrong_shard_server due to offset in getMappedKeyValuesQ", probe::decoration::rare);
// We could detect when offset1 takes us off the beginning of the database or offset2 takes us off the end, // We could detect when offset1 takes us off the beginning of the database or offset2 takes us off the end,
// and return a clipped range rather than an error (since that is what the NativeAPI.getRange will do anyway // and return a clipped range rather than an error (since that is what the NativeAPI.getRange will do anyway
// via its "slow path"), but we would have to add some flags to the response to encode whether we went off // via its "slow path"), but we would have to add some flags to the response to encode whether we went off
@ -4772,7 +4773,7 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe
// end the last actual key returned must be from this shard. A begin offset of 1 is also OK because then either // end the last actual key returned must be from this shard. A begin offset of 1 is also OK because then either
// begin is past end or equal to end (so the result is definitely empty) // begin is past end or equal to end (so the result is definitely empty)
if ((offset1 && offset1 != 1) || (offset2 && offset2 != 1)) { if ((offset1 && offset1 != 1) || (offset2 && offset2 != 1)) {
CODE_PROBE(true, "wrong_shard_server due to offset in rangeStream"); CODE_PROBE(true, "wrong_shard_server due to offset in rangeStream", probe::decoration::rare);
// We could detect when offset1 takes us off the beginning of the database or offset2 takes us off the end, // We could detect when offset1 takes us off the beginning of the database or offset2 takes us off the end,
// and return a clipped range rather than an error (since that is what the NativeAPI.getRange will do anyway // and return a clipped range rather than an error (since that is what the NativeAPI.getRange will do anyway
// via its "slow path"), but we would have to add some flags to the response to encode whether we went off // via its "slow path"), but we would have to add some flags to the response to encode whether we went off
@ -6152,7 +6153,8 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
if (!existing) { if (!existing) {
CODE_PROBE(cleanupPending, CODE_PROBE(cleanupPending,
"Fetch change feed which is cleanup pending. This means there was a move away and a move back, " "Fetch change feed which is cleanup pending. This means there was a move away and a move back, "
"this will remake the metadata"); "this will remake the metadata",
probe::decoration::rare);
changeFeedInfo = Reference<ChangeFeedInfo>(new ChangeFeedInfo()); changeFeedInfo = Reference<ChangeFeedInfo>(new ChangeFeedInfo());
changeFeedInfo->range = cfEntry.range; changeFeedInfo->range = cfEntry.range;
@ -6203,7 +6205,9 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
} }
if (changeFeedInfo->destroyed) { if (changeFeedInfo->destroyed) {
CODE_PROBE(true, "Change feed fetched and destroyed by other fetch while fetching metadata"); CODE_PROBE(true,
"Change feed fetched and destroyed by other fetch while fetching metadata",
probe::decoration::rare);
continue; continue;
} }
@ -6261,7 +6265,7 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
// isn't in the fetched response. In that case, the feed must have been destroyed between lastMetadataVersion // isn't in the fetched response. In that case, the feed must have been destroyed between lastMetadataVersion
// and fetchedMetadataVersion // and fetchedMetadataVersion
if (lastMetadataVersion >= fetchedMetadataVersion) { if (lastMetadataVersion >= fetchedMetadataVersion) {
CODE_PROBE(true, "Change Feed fetched higher metadata version before moved away"); CODE_PROBE(true, "Change Feed fetched higher metadata version before moved away", probe::decoration::rare);
continue; continue;
} }
@ -9898,7 +9902,7 @@ ACTOR Future<Void> metricsCore(StorageServer* self, StorageServerInterface ssi)
} }
when(ReadHotSubRangeRequest req = waitNext(ssi.getReadHotRanges.getFuture())) { when(ReadHotSubRangeRequest req = waitNext(ssi.getReadHotRanges.getFuture())) {
if (!self->isReadable(req.keys)) { if (!self->isReadable(req.keys)) {
CODE_PROBE(true, "readHotSubRanges immediate wrong_shard_server()"); CODE_PROBE(true, "readHotSubRanges immediate wrong_shard_server()", probe::decoration::rare);
self->sendErrorWithPenalty(req.reply, wrong_shard_server(), self->getPenalty()); self->sendErrorWithPenalty(req.reply, wrong_shard_server(), self->getPenalty());
} else { } else {
self->metrics.getReadHotRanges(req); self->metrics.getReadHotRanges(req);
@ -10122,7 +10126,8 @@ ACTOR Future<Void> serveWatchValueRequestsImpl(StorageServer* self, FutureStream
self->sendErrorWithPenalty(req.reply, e, self->getPenalty()); self->sendErrorWithPenalty(req.reply, e, self->getPenalty());
break; break;
} }
CODE_PROBE(true, "Reading a watched key failed with transaction_too_old case 5"); CODE_PROBE(
true, "Reading a watched key failed with transaction_too_old case 5", probe::decoration::rare);
} }
} }
} }

View File

@ -245,7 +245,7 @@ ACTOR Future<Void> handleIOErrors(Future<Void> actor, IClosable* store, UID id,
CODE_PROBE(true, "Worker terminated with file_not_found error"); CODE_PROBE(true, "Worker terminated with file_not_found error");
return Void(); return Void();
} else if (e.getError().code() == error_code_lock_file_failure) { } else if (e.getError().code() == error_code_lock_file_failure) {
CODE_PROBE(true, "Unable to lock file"); CODE_PROBE(true, "Unable to lock file", probe::decoration::rare);
throw please_reboot_kv_store(); throw please_reboot_kv_store();
} }
throw e.getError(); throw e.getError();
@ -1055,6 +1055,7 @@ ACTOR Future<Void> healthMonitor(Reference<AsyncVar<Optional<ClusterControllerFu
if (disconnectedPeer || degradedPeer) { if (disconnectedPeer || degradedPeer) {
TraceEvent("HealthMonitorDetectDegradedPeer") TraceEvent("HealthMonitorDetectDegradedPeer")
.detail("Peer", address) .detail("Peer", address)
.detail("Satellite", true)
.detail("Elapsed", now() - peer->lastLoggedTime) .detail("Elapsed", now() - peer->lastLoggedTime)
.detail("Disconnected", disconnectedPeer) .detail("Disconnected", disconnectedPeer)
.detail("MinLatency", peer->pingLatencies.min()) .detail("MinLatency", peer->pingLatencies.min())
@ -2214,7 +2215,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
// from the same reqId. To keep epoch safety between different managers, instead of restarting the // from the same reqId. To keep epoch safety between different managers, instead of restarting the
// same manager id at the same epoch, we should just tell it the original request succeeded, and let // same manager id at the same epoch, we should just tell it the original request succeeded, and let
// it realize this manager died via failure detection and start a new one. // it realize this manager died via failure detection and start a new one.
CODE_PROBE(true, "Recruited while formerly the same blob manager."); CODE_PROBE(true, "Recruited while formerly the same blob manager.", probe::decoration::rare);
} else { } else {
// TODO: it'd be more optimal to halt the last manager if present here, but it will figure it out // TODO: it'd be more optimal to halt the last manager if present here, but it will figure it out
// via the epoch check // via the epoch check

View File

@ -451,7 +451,7 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
throw; throw;
} }
ASSERT(e.code() == error_code_blob_granule_transaction_too_old); ASSERT(e.code() == error_code_blob_granule_transaction_too_old);
CODE_PROBE(true, "BGV verified too old after purge"); CODE_PROBE(true, "BGV verified too old after purge", probe::decoration::rare);
} }
} }
} }
@ -730,7 +730,8 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
if (!foundAnyHistoryForRange) { if (!foundAnyHistoryForRange) {
// if range never existed in blob, and was doing the initial snapshot, it could have a // if range never existed in blob, and was doing the initial snapshot, it could have a
// change feed but not a history entry/snapshot // change feed but not a history entry/snapshot
CODE_PROBE(true, "not failing test for leaked feed with no history"); CODE_PROBE(
true, "not failing test for leaked feed with no history", probe::decoration::rare);
fmt::print("Not failing test b/c feed never had history!\n"); fmt::print("Not failing test b/c feed never had history!\n");
} }
return !foundAnyHistoryForRange; return !foundAnyHistoryForRange;

View File

@ -329,7 +329,8 @@ struct EncryptKeyProxyTestWorkload : TestWorkload {
} }
Future<Void> start(Database const& cx) override { Future<Void> start(Database const& cx) override {
CODE_PROBE(true, "Testing"); // TODO: Enable this workload in testing
CODE_PROBE(true, "Running EncryptKeyProxyTest", probe::decoration::rare);
if (!enableTest) { if (!enableTest) {
return Void(); return Void();
} }

View File

@ -134,7 +134,7 @@ bool TDMetricCollection::canLog(int level) const {
void TDMetricCollection::checkRoll(uint64_t t, int64_t usedBytes) { void TDMetricCollection::checkRoll(uint64_t t, int64_t usedBytes) {
currentTimeBytes += usedBytes; currentTimeBytes += usedBytes;
if (currentTimeBytes > 1e6) { if (currentTimeBytes > 1e6) {
CODE_PROBE(true, "metrics were rolled"); CODE_PROBE(true, "metrics were rolled", probe::decoration::rare);
currentTimeBytes = 0; currentTimeBytes = 0;
rollTimes.push_back(t); rollTimes.push_back(t);
for (auto& it : metricMap) for (auto& it : metricMap)

View File

@ -114,6 +114,25 @@ constexpr auto noSim = !simOnly;
} // namespace assert } // namespace assert
namespace decoration {
// Code probes that currently (as of 9/25/2022) are not expected to show up in a 250k-test Joshua run
// are marked as "rare." This indicates a testing bug, and these probes should either be removed or testing
// coverage should be improved to hit them. Ideally, then, we should remove uses of this annotation in the
// long-term. However, this annotation has been added to prevent further regressions in code coverage, so that
// we can detect changes that fail to hit non-rare code probes.
//
// This should also hopefully help with debugging, because if a code probe is marked as rare, it means that this
// is a case not likely hit in simulation, and it may be a case that is more prone to buggy behaviour.
struct Rare {
constexpr static AnnotationType type = AnnotationType::Decoration;
void trace(struct ICodeProbe const*, BaseTraceEvent& evt, bool) const { evt.detail("Rare", true); }
};
constexpr Rare rare;
} // namespace decoration
namespace func { namespace func {
struct Deduplicate { struct Deduplicate {

View File

@ -1,4 +1,5 @@
storageEngineExcludeTypes=3 [configuration]
storageEngineExcludeTypes=[3]
[[test]] [[test]]
testTitle = 'SubmitBackup' testTitle = 'SubmitBackup'

View File

@ -1,5 +1,6 @@
[configuration] [configuration]
extraMachineCountDC = 2 extraMachineCountDC = 2
storageEngineExcludeTypes = [3]
[[test]] [[test]]
testTitle = 'CloggedConfigureDatabaseTest' testTitle = 'CloggedConfigureDatabaseTest'

View File

@ -1,3 +1,6 @@
[configuration]
storageEngineExcludeTypes = [3]
[[test]] [[test]]
testTitle='CloggedConfigureDatabaseTest' testTitle='CloggedConfigureDatabaseTest'
clearAfterTest=false clearAfterTest=false

View File

@ -1,4 +1,4 @@
storageEngineExcludeTypes=[3, 4, 5] storageEngineExcludeTypes=3,4,5
;Take snap and do cycle test ;Take snap and do cycle test
testTitle=SnapCyclePre testTitle=SnapCyclePre

View File

@ -1,4 +1,4 @@
storageEngineExcludeTypes=[4, 5] storageEngineExcludeTypes=4,5
buggify=off buggify=off
testTitle=SnapCycleRestore testTitle=SnapCycleRestore

View File

@ -1,4 +1,4 @@
storageEngineExcludeTypes=[3, 4, 5] storageEngineExcludeTypes=3,4,5
logAntiQuorum = 0 logAntiQuorum = 0

View File

@ -1,4 +1,4 @@
storageEngineExcludeTypes=[4, 5] storageEngineExcludeTypes=4,5
testTitle=RestoreBackup testTitle=RestoreBackup
simBackupAgents=BackupToFile simBackupAgents=BackupToFile

View File

@ -1,4 +1,4 @@
storageEngineExcludeTypes=[3, 4, 5] storageEngineExcludeTypes=3,4,5
;write 1000 Keys ending with even numbers ;write 1000 Keys ending with even numbers
testTitle=SnapTestPre testTitle=SnapTestPre

View File

@ -1,4 +1,4 @@
storageEngineExcludeTypes=[4, 5] storageEngineExcludeTypes=4,5
buggify=off buggify=off

View File

@ -1,4 +1,4 @@
storageEngineExcludeTypes=[3, 4, 5] storageEngineExcludeTypes=3,4,5
;write 1000 Keys ending with even numbers ;write 1000 Keys ending with even numbers
testTitle=SnapTestPre testTitle=SnapTestPre

View File

@ -1,4 +1,4 @@
storageEngineExcludeTypes=[4, 5] storageEngineExcludeTypes=4,5
buggify=off buggify=off

View File

@ -1,4 +1,4 @@
storageEngineExcludeTypes=[3, 4, 5] storageEngineExcludeTypes=3,4,5
;write 1000 Keys ending with even number ;write 1000 Keys ending with even number
testTitle=SnapSimplePre testTitle=SnapSimplePre

View File

@ -1,4 +1,4 @@
storageEngineExcludeTypes=[4, 5] storageEngineExcludeTypes=4,5
buggify=off buggify=off

View File

@ -1,3 +1,6 @@
[configuration]
storageEngineExcludeTypes = [3]
[[knobs]] [[knobs]]
enable_version_vector = true enable_version_vector = true
enable_version_vector_tlog_unicast = true enable_version_vector_tlog_unicast = true

View File

@ -1,3 +1,6 @@
[configuration]
storageEngineExcludeTypes = [3]
[[knobs]] [[knobs]]
enable_version_vector = false enable_version_vector = false
enable_version_vector_tlog_unicast = false enable_version_vector_tlog_unicast = false

View File

@ -3,7 +3,7 @@ extraMachineCountDC = 2
maxTLogVersion=6 maxTLogVersion=6
disableHostname=true disableHostname=true
disableEncryption=true disableEncryption=true
storageEngineExcludeTypes=[4] storageEngineExcludeTypes=[3,4]
[[test]] [[test]]
testTitle = 'CloggedConfigureDatabaseTest' testTitle = 'CloggedConfigureDatabaseTest'