diff --git a/fdbclient/AuditUtils.actor.cpp b/fdbclient/AuditUtils.actor.cpp
index 98f0118bc4..ba43a34d5f 100644
--- a/fdbclient/AuditUtils.actor.cpp
+++ b/fdbclient/AuditUtils.actor.cpp
@@ -251,7 +251,6 @@ ACTOR static Future<Void> checkMoveKeysLock(Transaction* tr,
 			TraceEvent(SevDebug, "ConflictWithPreviousOwner");
 			throw movekeys_conflict(); // need a new name
 		}
-
 		// Take the lock
 		if (isWrite) {
 			BinaryWriter wrMyOwner(Unversioned());
@@ -267,7 +266,6 @@ ACTOR static Future<Void> checkMoveKeysLock(Transaction* tr,
 			    .detail("MyOwner", lock.myOwner.toString())
 			    .detail("Writer", lastWriter.toString());
 		}
-
 		return Void();
 	} else if (currentOwner == lock.myOwner) {
 		if (isWrite) {
@@ -278,7 +276,6 @@ ACTOR static Future<Void> checkMoveKeysLock(Transaction* tr,
 			// Make this transaction self-conflicting so the database will not execute it twice with the same write key
 			tr->makeSelfConflicting();
 		}
-
 		return Void();
 	} else {
 		CODE_PROBE(true, "checkMoveKeysLock: Conflict with new owner");
@@ -287,6 +284,37 @@
 	}
 }
 
+ACTOR Future<Void> updateAuditState(Database cx, AuditStorageState auditState, MoveKeyLockInfo lock, bool ddEnabled) {
+	state Transaction tr(cx);
+
+	loop {
+		try {
+			tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
+			tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
+			wait(checkMoveKeysLock(&tr, lock, ddEnabled, true));
+			// Persist audit result
+			tr.set(auditKey(auditState.getType(), auditState.id), auditStorageStateValue(auditState));
+			wait(tr.commit());
+			TraceEvent(SevDebug, "AuditUtilUpdateAuditState", auditState.id)
+			    .detail("AuditID", auditState.id)
+			    .detail("AuditType", auditState.getType())
+			    .detail("AuditPhase", auditState.getPhase())
+			    .detail("AuditKey", auditKey(auditState.getType(), auditState.id));
+			break;
+		} catch (Error& e) {
+			TraceEvent(SevDebug, "AuditUtilUpdateAuditStateError", auditState.id)
+			    .errorUnsuppressed(e)
+			    .detail("AuditID", auditState.id)
+			    .detail("AuditType", auditState.getType())
+			    .detail("AuditPhase", auditState.getPhase())
+			    .detail("AuditKey", auditKey(auditState.getType(), auditState.id));
+			wait(tr.onError(e));
+		}
+	}
+
+	return Void();
+}
+
 ACTOR Future<Void> persistNewAuditState(Database cx,
                                         AuditStorageState auditState,
                                         MoveKeyLockInfo lock,
@@ -333,7 +361,6 @@ ACTOR Future<Void> persistNewAuditState(Database cx,
 			TraceEvent(SevDebug, "AuditUtilPersistedNewAuditState", auditId)
 			    .detail("AuditKey", auditKey(auditState.getType(), auditId));
 			break;
-
 		} catch (Error& e) {
 			TraceEvent(SevDebug, "AuditUtilPersistedNewAuditStateError", auditId)
 			    .errorUnsuppressed(e)
@@ -384,7 +411,6 @@ ACTOR Future<Void> persistAuditState(Database cx,
 			    .detail("AuditKey", auditKey(auditState.getType(), auditState.id))
 			    .detail("Context", context);
 			break;
-
 		} catch (Error& e) {
 			TraceEvent(SevDebug, "AuditUtilPersistAuditStateError", auditState.id)
 			    .errorUnsuppressed(e)
@@ -415,7 +441,6 @@ ACTOR Future<AuditStorageState> getAuditState(Database cx, AuditType type, UID i
 			    .detail("AuditType", type)
 			    .detail("AuditKey", auditKey(type, id));
 			break;
-
 		} catch (Error& e) {
 			TraceEvent(SevDebug, "AuditUtilReadAuditStateError", id)
 			    .errorUnsuppressed(e)
@@ -493,12 +518,15 @@ ACTOR Future<Void> persistAuditStateByRange(Database cx, AuditStorageState audit
 			if (ddAuditState.getPhase() != AuditPhase::Running) {
 				throw audit_storage_failed();
 			}
+			ASSERT(ddAuditState.ddId.isValid());
+			if (ddAuditState.ddId != auditState.ddId) {
+				throw audit_storage_failed(); // a new dd starts and this audit task is outdated
+			}
 			wait(krmSetRange(&tr,
 			                 auditRangeBasedProgressPrefixFor(auditState.getType(), auditState.id),
 			                 auditState.range,
 			                 auditStorageStateValue(auditState)));
 			break;
-
 		} catch (Error& e) {
 			wait(tr.onError(e));
 		}
@@ -525,7 +553,6 @@ ACTOR Future<std::vector<AuditStorageState>> getAuditStateByRange(Database cx,
 			                                              CLIENT_KNOBS->KRM_GET_RANGE_LIMIT_BYTES));
 			auditStates = res_;
 			break;
-
 		} catch (Error& e) {
 			TraceEvent(SevDebug, "AuditUtilGetAuditStateForRangeError").errorUnsuppressed(e).detail("AuditID", auditId);
 			wait(tr.onError(e));
@@ -567,13 +594,16 @@ ACTOR Future<Void> persistAuditStateByServer(Database cx, AuditStorageState audi
 			if (ddAuditState.getPhase() != AuditPhase::Running) {
 				throw audit_storage_failed();
 			}
+			ASSERT(ddAuditState.ddId.isValid());
+			if (ddAuditState.ddId != auditState.ddId) {
+				throw audit_storage_failed(); // a new dd starts and this audit task is outdated
+			}
 			wait(krmSetRange(
 			    &tr,
 			    auditServerBasedProgressPrefixFor(auditState.getType(), auditState.id, auditState.auditServerId),
 			    auditState.range,
 			    auditStorageStateValue(auditState)));
 			break;
-
 		} catch (Error& e) {
 			wait(tr.onError(e));
 		}
@@ -601,7 +631,6 @@ ACTOR Future<std::vector<AuditStorageState>> getAuditStateByServer(Database cx,
 			                                              CLIENT_KNOBS->KRM_GET_RANGE_LIMIT_BYTES));
 			auditStates = res_;
 			break;
-
 		} catch (Error& e) {
 			TraceEvent(SevDebug, "AuditUtilGetAuditStateForRangeError").errorUnsuppressed(e).detail("AuditID", auditId);
 			wait(tr.onError(e));
diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp
index bd753fdbc0..fa9566a27f 100644
--- a/fdbclient/NativeAPI.actor.cpp
+++ b/fdbclient/NativeAPI.actor.cpp
@@ -2313,7 +2313,8 @@ void initializeClientTracing(Reference<IClusterConnectionRecord> connRecord, Opt
 		              "trace",
 		              networkOptions.traceLogGroup,
 		              identifier,
-		              networkOptions.tracePartialFileSuffix);
+		              networkOptions.tracePartialFileSuffix,
+		              InitializeTraceMetrics::True);
 
 		TraceEvent("ClientStart")
 		    .detail("SourceVersion", getSourceVersion())
@@ -2328,7 +2329,6 @@ void initializeClientTracing(Reference<IClusterConnectionRecord> connRecord, Opt
 		g_network->initMetrics();
 		FlowTransport::transport().initMetrics();
-		initTraceEventMetrics();
 	}
 
 	// Initialize system monitoring once the local IP is available
diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp
index f95bfe5372..039c5d7160 100644
--- a/fdbclient/ServerKnobs.cpp
+++ b/fdbclient/ServerKnobs.cpp
@@ -180,7 +180,6 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
 	init( SHARD_ENCODE_LOCATION_METADATA, false ); if( randomize && BUGGIFY ) SHARD_ENCODE_LOCATION_METADATA = true;
 	init( ENABLE_DD_PHYSICAL_SHARD, false ); // EXPERIMENTAL; If true, SHARD_ENCODE_LOCATION_METADATA must be true; When true, optimization of data move between DCs is disabled
-	init( ENABLE_DD_PHYSICAL_SHARD_MOVE, false ); if( isSimulated ) ENABLE_DD_PHYSICAL_SHARD_MOVE = deterministicRandom()->coinflip();
 	init( DD_PHYSICAL_SHARD_MOVE_PROBABILITY, 0.5 );
 	init( MAX_PHYSICAL_SHARD_BYTES, 10000000 ); // 10 MB; for ENABLE_DD_PHYSICAL_SHARD; smaller leads to larger number of physicalShard per storage server
 	init( PHYSICAL_SHARD_METRICS_DELAY, 300.0 ); // 300 seconds; for ENABLE_DD_PHYSICAL_SHARD
@@ -755,6 +754,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
 	init( TARGET_BYTES_PER_STORAGE_SERVER, 1000e6 ); if( smallStorageTarget ) TARGET_BYTES_PER_STORAGE_SERVER = 3000e3;
 	init( SPRING_BYTES_STORAGE_SERVER, 100e6 ); if( smallStorageTarget ) SPRING_BYTES_STORAGE_SERVER = 300e3;
 	init( AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES, 800e6 ); if( smallStorageTarget ) AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES = 2500e3;
+	init( AUTO_TAG_THROTTLE_SPRING_BYTES_STORAGE_SERVER, 200e6 ); if( smallStorageTarget ) AUTO_TAG_THROTTLE_SPRING_BYTES_STORAGE_SERVER = 500e3;
 	init( TARGET_BYTES_PER_STORAGE_SERVER_BATCH, 750e6 ); if( smallStorageTarget ) TARGET_BYTES_PER_STORAGE_SERVER_BATCH = 1500e3;
 	init( SPRING_BYTES_STORAGE_SERVER_BATCH, 100e6 ); if( smallStorageTarget ) SPRING_BYTES_STORAGE_SERVER_BATCH = 150e3;
 	init( STORAGE_HARD_LIMIT_BYTES, 1500e6 ); if( smallStorageTarget ) STORAGE_HARD_LIMIT_BYTES = 4500e3;
@@ -833,13 +833,13 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
 	init( GLOBAL_TAG_THROTTLING, true ); if(isSimulated) GLOBAL_TAG_THROTTLING = deterministicRandom()->coinflip();
 	init( ENFORCE_TAG_THROTTLING_ON_PROXIES, GLOBAL_TAG_THROTTLING );
 	init( GLOBAL_TAG_THROTTLING_MIN_RATE, 1.0 );
-	// 60 seconds was chosen as a default value to ensure that
+	// 10 seconds was chosen as a default value to ensure that
 	// the global tag throttler does not react too drastically to
 	// changes in workload. To make the global tag throttler more reactive,
 	// lower this knob. To make global tag throttler more smooth, raise this knob.
 	// Setting this knob lower than TAG_MEASUREMENT_INTERVAL can cause erratic
 	// behaviour and is not recommended.
-	init( GLOBAL_TAG_THROTTLING_FOLDING_TIME, 60.0 );
+	init( GLOBAL_TAG_THROTTLING_FOLDING_TIME, 10.0 );
 	init( GLOBAL_TAG_THROTTLING_MAX_TAGS_TRACKED, 10 );
 	init( GLOBAL_TAG_THROTTLING_TAG_EXPIRE_AFTER, 240.0 );
 	init( GLOBAL_TAG_THROTTLING_PROXY_LOGGING_INTERVAL, 60.0 );
@@ -878,7 +878,6 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
 	init( SERVE_AUDIT_STORAGE_PARALLELISM, 1 );
 	init( PERSIST_FINISH_AUDIT_COUNT, 10 ); if ( isSimulated ) PERSIST_FINISH_AUDIT_COUNT = 1;
 	init( AUDIT_RETRY_COUNT_MAX, 100 ); if ( isSimulated ) AUDIT_RETRY_COUNT_MAX = 10;
-	init( SS_AUDIT_AUTO_PROCEED_COUNT_MAX, 5 );
 	init( CONCURRENT_AUDIT_TASK_COUNT_MAX, 50 ); if ( isSimulated ) CONCURRENT_AUDIT_TASK_COUNT_MAX = deterministicRandom()->randomInt(1, CONCURRENT_AUDIT_TASK_COUNT_MAX+1);
 	init( BUGGIFY_BLOCK_BYTES, 10000 );
 	init( STORAGE_RECOVERY_VERSION_LAG_LIMIT, 2 * MAX_READ_TRANSACTION_LIFE_VERSIONS );
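// Editor's note (not part of the patch): a minimal sketch of why lowering
// GLOBAL_TAG_THROTTLING_FOLDING_TIME from 60s to 10s makes the global tag throttler more
// reactive. The knob acts as an e-folding (exponential smoothing) time constant: after
// roughly `foldingTime` seconds, about 63% of a step change in the measured rate shows up
// in the smoothed value. The smoother below is a stand-in written for illustration, not
// FDB's own Smoother API.
#include <cmath>

struct FoldingSmoother {
	double foldingTime; // seconds; smaller => reacts faster, larger => smoother
	double smoothed = 0.0;

	// Fold a new sample taken `dt` seconds after the previous one into the smoothed estimate.
	void addSample(double sample, double dt) {
		double alpha = 1.0 - std::exp(-dt / foldingTime);
		smoothed += alpha * (sample - smoothed);
	}
};

// With dt = 5s per measurement: foldingTime = 60 folds in only ~8% of a workload change per
// sample, while foldingTime = 10 folds in ~39%, so throttling decisions track a tag's observed
// cost much sooner at the cost of some extra jitter.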
diff --git a/fdbclient/include/fdbclient/Audit.h b/fdbclient/include/fdbclient/Audit.h
index 1d6c9a999e..ce9ae90a6b 100644
--- a/fdbclient/include/fdbclient/Audit.h
+++ b/fdbclient/include/fdbclient/Audit.h
@@ -45,24 +45,24 @@ enum class AuditType : uint8_t {
 struct AuditStorageState {
 	constexpr static FileIdentifier file_identifier = 13804340;
 
-	AuditStorageState() : type(0), auditServerId(UID()), phase(0) {}
+	AuditStorageState() : type(0), auditServerId(UID()), phase(0), ddId(UID()) {}
 	AuditStorageState(UID id, UID auditServerId, AuditType type)
-	  : id(id), auditServerId(auditServerId), type(static_cast<uint8_t>(type)), phase(0) {}
+	  : id(id), auditServerId(auditServerId), type(static_cast<uint8_t>(type)), phase(0), ddId(UID()) {}
 	AuditStorageState(UID id, KeyRange range, AuditType type)
-	  : id(id), auditServerId(UID()), range(range), type(static_cast<uint8_t>(type)), phase(0) {}
+	  : id(id), auditServerId(UID()), range(range), type(static_cast<uint8_t>(type)), phase(0), ddId(UID()) {}
 	AuditStorageState(UID id, AuditType type)
-	  : id(id), auditServerId(UID()), type(static_cast<uint8_t>(type)), phase(0) {}
+	  : id(id), auditServerId(UID()), type(static_cast<uint8_t>(type)), phase(0), ddId(UID()) {}
 
 	template <class Ar>
 	void serialize(Ar& ar) {
-		serializer(ar, id, auditServerId, range, type, phase, error);
+		serializer(ar, id, auditServerId, range, type, phase, error, ddId);
 	}
 
-	void setType(AuditType type) { this->type = static_cast<uint8_t>(type); }
-	AuditType getType() const { return static_cast<AuditType>(this->type); }
+	inline void setType(AuditType type) { this->type = static_cast<uint8_t>(type); }
+	inline AuditType getType() const { return static_cast<AuditType>(this->type); }
 
-	void setPhase(AuditPhase phase) { this->phase = static_cast<uint8_t>(phase); }
-	AuditPhase getPhase() const { return static_cast<AuditPhase>(this->phase); }
+	inline void setPhase(AuditPhase phase) { this->phase = static_cast<uint8_t>(phase); }
+	inline AuditPhase getPhase() const { return static_cast<AuditPhase>(this->phase); }
 
 	// for fdbcli get_audit_status
 	std::string toStringForCLI() const {
@@ -90,7 +90,14 @@ struct AuditStorageState {
 	}
 
 	UID id;
-	UID auditServerId;
+	UID ddId; // ddId indicates which dd this audit is managed by
+	          // ddId is used to check if dd has changed
+	          // When a new dd starts in the middle of an ongoing audit,
+	          // the ongoing audit's ddId gets updated
+	          // When SS updates the progress, it checks ddId
+	          // If the ddId is updated, SS Audit actors of the old dd will stop themselves
+	          // New dd will issue new requests to SSes to continue the remaining work
+	UID auditServerId; // UID of SS who is working on this audit task
 	KeyRange range;
 	uint8_t type;
 	uint8_t phase;
@@ -105,15 +112,16 @@ struct AuditStorageRequest {
 	AuditStorageRequest(UID id, KeyRange range, AuditType type)
 	  : id(id), range(range), type(static_cast<uint8_t>(type)) {}
 
-	void setType(AuditType type) { this->type = static_cast<uint8_t>(this->type); }
-	AuditType getType() const { return static_cast<AuditType>(this->type); }
+	inline void setType(AuditType type) { this->type = static_cast<uint8_t>(this->type); }
+	inline AuditType getType() const { return static_cast<AuditType>(this->type); }
 
 	template <class Ar>
 	void serialize(Ar& ar) {
-		serializer(ar, id, range, type, targetServers, reply);
+		serializer(ar, id, range, type, targetServers, reply, ddId);
 	}
 
 	UID id;
+	UID ddId; // UID of DD who claims the audit
 	KeyRange range;
 	uint8_t type;
 	std::vector<UID> targetServers;
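// Editor's note (not part of the patch): a condensed illustration of how the new ddId field is
// meant to fence off audit work that belongs to a superseded data distributor. Everything except
// the ddId/phase fields is a simplified stand-in for the real AuditUtils functions in this diff.
#include <stdexcept>

struct SimpleAuditState {
	int ddId = 0;  // which DD currently claims this audit (stamped by updateAuditState/launchAudit)
	int phase = 0; // 1 == Running
};

// What persistAuditStateByRange/ByServer now enforce before writing SS progress: the progress
// update carries the ddId the storage server received in its request (req.ddId), and it must
// match the ddId recorded in the top-level audit metadata. If a new DD has taken over and
// re-stamped the metadata, stale updates are rejected and the old DD's SS audit actors stop.
void persistProgress(const SimpleAuditState& persistedTopLevel, const SimpleAuditState& update) {
	if (persistedTopLevel.phase != 1)
		throw std::runtime_error("audit_storage_failed: audit no longer running");
	if (persistedTopLevel.ddId != update.ddId)
		throw std::runtime_error("audit_storage_failed: audit task belongs to an outdated DD");
	// ... persist the per-range / per-server progress here ...
}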
diff --git a/fdbclient/include/fdbclient/AuditUtils.actor.h b/fdbclient/include/fdbclient/AuditUtils.actor.h
index 4b63e7389a..f93f60a26b 100644
--- a/fdbclient/include/fdbclient/AuditUtils.actor.h
+++ b/fdbclient/include/fdbclient/AuditUtils.actor.h
@@ -66,5 +66,6 @@ ACTOR Future<Void> clearAuditMetadataForType(Database cx,
                                              UID maxAuditIdToClear,
                                              int numFinishAuditToKeep);
 ACTOR Future<bool> checkStorageServerRemoved(Database cx, UID ssid);
+ACTOR Future<Void> updateAuditState(Database cx, AuditStorageState auditState, MoveKeyLockInfo lock, bool ddEnabled);
 #include "flow/unactorcompiler.h"
 #endif
diff --git a/fdbclient/include/fdbclient/ServerKnobs.h b/fdbclient/include/fdbclient/ServerKnobs.h
index 0c7909eaec..5eb4b04c5d 100644
--- a/fdbclient/include/fdbclient/ServerKnobs.h
+++ b/fdbclient/include/fdbclient/ServerKnobs.h
@@ -204,7 +204,6 @@ public:
 	bool SHARD_ENCODE_LOCATION_METADATA; // If true, location metadata will contain shard ID.
 	bool ENABLE_DD_PHYSICAL_SHARD; // EXPERIMENTAL; If true, SHARD_ENCODE_LOCATION_METADATA must be true.
-	bool ENABLE_DD_PHYSICAL_SHARD_MOVE; // Enable physical shard move.
 	double DD_PHYSICAL_SHARD_MOVE_PROBABILITY; // Percentage of physical shard move, in the range of [0, 1].
 	int64_t MAX_PHYSICAL_SHARD_BYTES;
 	double PHYSICAL_SHARD_METRICS_DELAY;
@@ -711,6 +710,7 @@ public:
 	int64_t TARGET_BYTES_PER_STORAGE_SERVER;
 	int64_t SPRING_BYTES_STORAGE_SERVER;
 	int64_t AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES;
+	int64_t AUTO_TAG_THROTTLE_SPRING_BYTES_STORAGE_SERVER;
 	int64_t TARGET_BYTES_PER_STORAGE_SERVER_BATCH;
 	int64_t SPRING_BYTES_STORAGE_SERVER_BATCH;
 	int64_t STORAGE_HARD_LIMIT_BYTES;
@@ -865,7 +865,6 @@ public:
 	int SERVE_AUDIT_STORAGE_PARALLELISM;
 	int PERSIST_FINISH_AUDIT_COUNT; // Num of persist complete/failed audits for each type
 	int AUDIT_RETRY_COUNT_MAX;
-	int SS_AUDIT_AUTO_PROCEED_COUNT_MAX;
 	int CONCURRENT_AUDIT_TASK_COUNT_MAX;
 	int BUGGIFY_BLOCK_BYTES;
 	int64_t STORAGE_RECOVERY_VERSION_LAG_LIMIT;
diff --git a/fdbserver/DDRelocationQueue.actor.cpp b/fdbserver/DDRelocationQueue.actor.cpp
index 9c0f957742..819a67c408 100644
--- a/fdbserver/DDRelocationQueue.actor.cpp
+++ b/fdbserver/DDRelocationQueue.actor.cpp
@@ -1047,7 +1047,7 @@ void DDQueue::launchQueuedWork(std::set
 				rrs.dataMoveId = UID();
 			} else {
 				const bool enabled =
-				    deterministicRandom()->random01() < SERVER_KNOBS->DD_PHYSICAL_SHARD_MOVE_PROBABILITY;
+				    deterministicRandom()->random01() <= SERVER_KNOBS->DD_PHYSICAL_SHARD_MOVE_PROBABILITY;
 				rrs.dataMoveId = newDataMoveId(deterministicRandom()->randomUInt64(),
 				                               AssignEmptyRange::False,
 				                               EnablePhysicalShardMove(enabled));
@@ -1635,7 +1635,7 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueue* self,
 					self->moveCreateNewPhysicalShard++;
 				}
 				const bool enabled =
-				    deterministicRandom()->random01() < SERVER_KNOBS->DD_PHYSICAL_SHARD_MOVE_PROBABILITY;
+				    deterministicRandom()->random01() <= SERVER_KNOBS->DD_PHYSICAL_SHARD_MOVE_PROBABILITY;
 				rd.dataMoveId = newDataMoveId(
 				    physicalShardIDCandidate, AssignEmptyRange::False, EnablePhysicalShardMove(enabled));
 				TraceEvent(SevInfo, "NewDataMoveWithPhysicalShard")
diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp
index a433d8d371..9c70181620 100644
--- a/fdbserver/DataDistribution.actor.cpp
+++ b/fdbserver/DataDistribution.actor.cpp
@@ -96,8 +96,8 @@ struct DDAudit {
 	int64_t overallIssuedDoAuditCount;
 	int64_t overallCompleteDoAuditCount;
 
-	void setAuditRunActor(Future<Void> actor) { auditActor = actor; }
-	Future<Void> getAuditRunActor() { return auditActor; }
+	inline void setAuditRunActor(Future<Void> actor) { auditActor = actor; }
+	inline Future<Void> getAuditRunActor() { return auditActor; }
 
 	// auditActor and actors are guaranteed to deliver a cancel signal
 	void cancel() {
@@ -771,15 +771,49 @@ void cancelAllAuditsInAuditMap(Reference<DataDistributor> self) {
 	return;
 }
 
-void resumeStorageAudits(Reference<DataDistributor> self) {
+ACTOR Future<Void> resumeStorageAudits(Reference<DataDistributor> self) {
 	ASSERT(!self->auditInitialized.getFuture().isReady());
 	if (self->initData->auditStates.empty()) {
 		self->auditInitialized.send(Void());
 		TraceEvent(SevVerbose, "AuditStorageResumeEmptyDone", self->ddId);
-		return;
+		return Void();
 	}
-	cancelAllAuditsInAuditMap(self); // cancel existing audits
+	// Update metadata
+	state int retryCount = 0;
+	loop {
+		try {
+			std::vector<Future<Void>> fs;
+			state MoveKeyLockInfo lockInfo;
+			lockInfo.myOwner = self->lock.myOwner;
+			lockInfo.prevOwner = self->lock.prevOwner;
+			lockInfo.prevWrite = self->lock.prevWrite;
+			for (const auto& auditState : self->initData->auditStates) {
+				// Only running audit will be resumed
+				if (auditState.getPhase() == AuditPhase::Running) {
+					AuditStorageState toUpdate = auditState;
+					toUpdate.ddId = self->ddId;
+					fs.push_back(updateAuditState(
+					    self->txnProcessor->context(), toUpdate, lockInfo, self->context->isDDEnabled()));
+				}
+			}
+			wait(waitForAll(fs));
+			break;
+		} catch (Error& e) {
+			if (e.code() == error_code_actor_cancelled || e.code() == error_code_movekeys_conflict) {
+				throw e;
+			}
+			if (retryCount > 50) {
+				TraceEvent(SevWarnAlways, "ResumeAuditStorageUnableUpdateMetadata", self->ddId).errorUnsuppressed(e);
+				return Void();
+			}
+			retryCount++;
+		}
+	}
+
+	// Following is atomic
+	// Cancel existing audits and restore
+	cancelAllAuditsInAuditMap(self);
 	std::unordered_map<AuditType, std::vector<AuditStorageState>> restoredAudits;
 	for (const auto& auditState : self->initData->auditStates) {
 		restoredAudits[auditState.getType()].push_back(auditState);
@@ -843,7 +877,7 @@ void resumeStorageAudits(Reference<DataDistributor> self) {
 	self->auditInitialized.send(Void());
 	TraceEvent(SevDebug, "AuditStorageResumeDone", self->ddId);
-	return;
+	return Void();
 }
 
 // Periodically check and log the physicalShard status; clean up empty physicalShard;
@@ -1001,7 +1035,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
 		anyZeroHealthyTeams = zeroHealthyTeams[0];
 	}
 
-	resumeStorageAudits(self);
+	actors.push_back(resumeStorageAudits(self));
 
 	actors.push_back(self->pollMoveKeysLock());
@@ -1753,6 +1787,7 @@ ACTOR Future<Void> auditStorageCore(Reference<DataDistributor> self,
 	try {
 		ASSERT(audit != nullptr);
+		ASSERT(audit->coreState.ddId == self->ddId);
 		loadAndDispatchAudit(self, audit, audit->coreState.range);
 		TraceEvent(SevInfo, "DDAuditStorageCoreScheduled", self->ddId)
 		    .detail("Context", context)
@@ -1931,6 +1966,7 @@ void runAuditStorage(Reference<DataDistributor> self,
 	ASSERT(auditState.id.isValid());
 	ASSERT(!auditState.range.empty());
 	ASSERT(auditState.getPhase() == AuditPhase::Running);
+	auditState.ddId = self->ddId; // make sure any existing audit state claims the current DD
 	std::shared_ptr<DDAudit> audit = std::make_shared<DDAudit>(auditState);
 	audit->retryCount = retryCount;
 	TraceEvent(SevDebug, "DDRunAuditStorage", self->ddId)
@@ -1997,6 +2033,7 @@ ACTOR Future<UID> launchAudit(Reference<DataDistributor> self, KeyRange auditRan
 			auditState.setType(auditType);
 			auditState.range = auditRange;
 			auditState.setPhase(AuditPhase::Running);
+			auditState.ddId = self->ddId; // persist ddId to new ddAudit metadata
 			TraceEvent(SevVerbose, "DDAuditStorageLaunchPersistNewAuditIDBefore", self->ddId)
 			    .detail("AuditType", auditType)
 			    .detail("Range", auditRange);
@@ -2227,6 +2264,7 @@ ACTOR Future<Void> scheduleAuditStorageShardOnServer(Reference<DataDistributor>
 			// We always issue exactly one audit task (for the remaining part) when schedule
 			ASSERT(issueDoAuditCount == 0);
 			issueDoAuditCount++;
+			req.ddId = self->ddId; // send this ddId to SS
 			audit->actors.add(doAuditOnStorageServer(self, audit, ssi, req));
 		}
 	}
@@ -2447,6 +2485,7 @@ ACTOR Future<Void> scheduleAuditOnRange(Reference<DataDistributor> self,
 					                                   SERVER_KNOBS->CONCURRENT_AUDIT_TASK_COUNT_MAX - 1);
 				}
 				issueDoAuditCount++;
+				req.ddId = self->ddId; // send this ddId to SS
 				audit->actors.add(doAuditOnStorageServer(self, audit, targetServer, req));
 			}
 		}
diff --git a/fdbserver/GlobalTagThrottler.actor.cpp b/fdbserver/GlobalTagThrottler.actor.cpp
index 833665b78f..6e23cf6c0e 100644
--- a/fdbserver/GlobalTagThrottler.actor.cpp
+++ b/fdbserver/GlobalTagThrottler.actor.cpp
@@ -214,7 +214,7 @@ class GlobalTagThrottlerImpl {
 			result += getCurrentCost(id, tag).orDefault(0);
 		}
 		// FIXME: Disabled due to noisy trace events. Fix the noise and reenabled
-		//TraceEvent("GlobalTagThrottler_GetCurrentCost").detail("Tag", printable(tag)).detail("Cost", result);
+		//TraceEvent("GlobalTagThrottler_GetCurrentCost").detail("Tag", tag).detail("Cost", result);
 		return result;
 	}
 
@@ -421,7 +421,7 @@ class GlobalTagThrottlerImpl {
 
 		isBusy = limitingTps.present() && limitingTps.get() < desiredTps;
 
-		te.detail("Tag", printable(tag))
+		te.detail("Tag", tag)
 		    .detail("TargetTps", targetTps)
 		    .detail("AverageTransactionCost", averageTransactionCost)
 		    .detail("LimitingTps", limitingTps)
@@ -447,7 +447,7 @@ public:
 			    "been reached");
 			TraceEvent("GlobalTagThrottler_IgnoringRequests")
 			    .suppressFor(60.0)
-			    .detail("Tag", printable(tag))
+			    .detail("Tag", tag)
 			    .detail("Count", count);
 		} else {
 			tagStatistics[tag].addTransactions(static_cast<double>(count));
@@ -534,7 +534,7 @@ public:
 	Future<Void> tryUpdateAutoThrottling(StorageQueueInfo const& ss) {
 		auto& ssInfo = ssInfos[ss.id];
 		ssInfo.throttlingRatio = ss.getTagThrottlingRatio(SERVER_KNOBS->AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES,
-		                                                  SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER);
+		                                                  SERVER_KNOBS->AUTO_TAG_THROTTLE_SPRING_BYTES_STORAGE_SERVER);
 		ssInfo.zoneId = ss.locality.zoneId();
 
 		auto& tagToThroughputCounters = throughput[ss.id];
diff --git a/fdbserver/GrvProxyTagThrottler.actor.cpp b/fdbserver/GrvProxyTagThrottler.actor.cpp
index 3b375daed1..485dad38b5 100644
--- a/fdbserver/GrvProxyTagThrottler.actor.cpp
+++ b/fdbserver/GrvProxyTagThrottler.actor.cpp
@@ -112,7 +112,7 @@ void GrvProxyTagThrottler::addRequest(GetReadVersionRequest const& req) {
 			TraceEvent(SevWarnAlways, "GrvProxyTagThrottler_MultipleTags")
 			    .suppressFor(60.0)
 			    .detail("NumTags", req.tags.size())
-			    .detail("UsingTag", printable(tag));
+			    .detail("UsingTag", tag);
 		}
 		queues[tag].requests.emplace_back(req);
 	}
diff --git a/fdbserver/MoveKeys.actor.cpp b/fdbserver/MoveKeys.actor.cpp
index 251a13e026..ed8340f5d0 100644
--- a/fdbserver/MoveKeys.actor.cpp
+++ b/fdbserver/MoveKeys.actor.cpp
@@ -1953,9 +1953,7 @@ ACTOR static Future<Void> finishMoveShards(Database occ,
 			wait(waitForAll(actors));
 
 			if (range.end == dataMove.ranges.front().end) {
-				if (SERVER_KNOBS->ENABLE_DD_PHYSICAL_SHARD_MOVE) {
-					wait(deleteCheckpoints(&tr, dataMove.checkpoints, dataMoveId));
-				}
+				wait(deleteCheckpoints(&tr, dataMove.checkpoints, dataMoveId));
 				tr.clear(dataMoveKeyFor(dataMoveId));
 				complete = true;
 				TraceEvent(sevDm, "FinishMoveShardsDeleteMetaData", dataMoveId)
@@ -2707,9 +2705,7 @@ ACTOR Future<Void> cleanUpDataMoveCore(Database occ,
 			}
 
 			if (range.end == dataMove.ranges.front().end) {
-				if (SERVER_KNOBS->ENABLE_DD_PHYSICAL_SHARD_MOVE) {
-					wait(deleteCheckpoints(&tr, dataMove.checkpoints, dataMoveId));
-				}
+				wait(deleteCheckpoints(&tr, dataMove.checkpoints, dataMoveId));
 				tr.clear(dataMoveKeyFor(dataMoveId));
 				complete = true;
 				TraceEvent(sevDm, "CleanUpDataMoveDeleteMetaData", dataMoveId)
diff --git a/fdbserver/Ratekeeper.actor.cpp b/fdbserver/Ratekeeper.actor.cpp
index e1b91d8457..ff20e2b869 100644
--- a/fdbserver/Ratekeeper.actor.cpp
+++ b/fdbserver/Ratekeeper.actor.cpp
@@ -1391,11 +1391,13 @@ UpdateCommitCostRequest StorageQueueInfo::refreshCommitCost(double elapsed) {
 Optional<double> StorageQueueInfo::getTagThrottlingRatio(int64_t storageTargetBytes,
                                                          int64_t storageSpringBytes) const {
 	auto const storageQueue = getStorageQueueBytes();
-	if (storageQueue < storageTargetBytes - storageSpringBytes) {
-		return {};
+	// TODO: Remove duplicate calculation from Ratekeeper::updateRate
+	double inverseResult = std::min(
+	    2.0, (storageQueue - storageTargetBytes + storageSpringBytes) / static_cast<double>(storageSpringBytes));
+	if (inverseResult > 0) {
+		return 1.0 / inverseResult;
 	} else {
-		return std::max(
-		    0.0, static_cast<double>((storageTargetBytes + storageSpringBytes) - storageQueue) / storageSpringBytes);
+		return {};
 	}
 }
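// Editor's note (not part of the patch): a worked example of the rewritten
// StorageQueueInfo::getTagThrottlingRatio above, restated as a free function for illustration.
// Using the knob values introduced by this patch, AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES = 800e6
// (target) and AUTO_TAG_THROTTLE_SPRING_BYTES_STORAGE_SERVER = 200e6 (spring):
//   queue = 500 MB -> (500-800+200)/200 = -0.5 -> no ratio (queue is at least `spring` below target)
//   queue = 700 MB -> (700-800+200)/200 =  0.5 -> ratio 2.0
//   queue = 900 MB -> (900-800+200)/200 =  1.5 -> ratio ~0.67
//   queue = 1.5 GB -> capped at 2.0            -> ratio 0.5 (the minimum the function can return)
// Lower ratios correspond to a more congested storage queue.
#include <algorithm>
#include <cstdint>
#include <optional>

std::optional<double> tagThrottlingRatio(int64_t queueBytes, int64_t targetBytes, int64_t springBytes) {
	double inverse =
	    std::min(2.0, (queueBytes - targetBytes + springBytes) / static_cast<double>(springBytes));
	if (inverse > 0) {
		return 1.0 / inverse; // drops below 1.0 once the queue exceeds the target
	}
	return std::nullopt; // queue comfortably below target: no throttling signal
}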
diff --git a/fdbserver/fdbserver.actor.cpp b/fdbserver/fdbserver.actor.cpp
index e0a1034102..c0e34fff84 100644
--- a/fdbserver/fdbserver.actor.cpp
+++ b/fdbserver/fdbserver.actor.cpp
@@ -2044,8 +2044,16 @@ int main(int argc, char* argv[]) {
 				}
 			}
 
-			openTraceFile(
-			    opts.publicAddresses.address, opts.rollsize, opts.maxLogsSize, opts.logFolder, "trace", opts.logGroup);
+			openTraceFile(opts.publicAddresses.address,
+			              opts.rollsize,
+			              opts.maxLogsSize,
+			              opts.logFolder,
+			              "trace",
+			              opts.logGroup,
+			              /* identifier = */ "",
+			              /* tracePartialFileSuffix = */ "",
+			              InitializeTraceMetrics::True);
+
 			g_network->initTLS();
 			if (!opts.authzPublicKeyFile.empty()) {
 				try {
@@ -2089,7 +2097,6 @@ int main(int argc, char* argv[]) {
 			                                     opts.fileSystemPath);
 			g_network->initMetrics();
 			FlowTransport::transport().initMetrics();
-			initTraceEventMetrics();
 		}
 
 		double start = timer(), startNow = now();
diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp
index 961daf2edd..417ec5e6fa 100644
--- a/fdbserver/storageserver.actor.cpp
+++ b/fdbserver/storageserver.actor.cpp
@@ -4903,7 +4903,8 @@ Key constructMappedKey(KeyValueRef* keyValue, std::vector<Optional<Tuple>>& vec,
 ACTOR Future<Void> validateRangeAgainstServer(StorageServer* data,
                                               AuditStorageState auditState,
                                               Version version,
-                                              StorageServerInterface remoteServer) {
+                                              StorageServerInterface remoteServer,
+                                              UID ddId) {
 	TraceEvent(SevInfo, "ValidateRangeAgainstServerBegin", data->thisServerID)
 	    .detail("AuditID", auditState.id)
 	    .detail("Range", auditState.range)
@@ -5033,6 +5034,8 @@ ACTOR Future<Void> validateRangeAgainstServer(StorageServer* data,
 			range = KeyRangeRef(keyAfter(lastKey), range.end);
 			auditState.range = KeyRangeRef(originBegin, range.begin);
 			auditState.setPhase(AuditPhase::Complete);
+			ASSERT(ddId.isValid());
+			auditState.ddId = ddId; // used to compare req.ddId with existing persisted ddId
 			wait(persistAuditStateByRange(data->cx, auditState));
 		}
 	} catch (Error& e) {
@@ -5052,6 +5055,8 @@ ACTOR Future<Void> validateRangeAgainstServer(StorageServer* data,
 		    .detail("ErrorMessage", error)
 		    .detail("RemoteServer", remoteServer.toString());
 		auditState.setPhase(AuditPhase::Error);
+		ASSERT(ddId.isValid());
+		auditState.ddId = ddId; // used to compare req.ddId with existing persisted ddId
 		wait(persistAuditStateByRange(data->cx, auditState));
 		throw audit_storage_error();
 	}
@@ -5065,7 +5070,10 @@ ACTOR Future<Void> validateRangeAgainstServer(StorageServer* data,
 	return Void();
 }
 
-ACTOR Future<Void> validateRangeShard(StorageServer* data, AuditStorageState auditState, std::vector<UID> candidates) {
+ACTOR Future<Void> validateRangeShard(StorageServer* data,
+                                      AuditStorageState auditState,
+                                      std::vector<UID> candidates,
+                                      UID ddId) {
 	TraceEvent(SevDebug, "ServeValidateRangeShardBegin", data->thisServerID)
 	    .detail("Range", auditState.range)
 	    .detail("Servers", describe(candidates));
@@ -5126,7 +5134,7 @@ ACTOR Future<Void> validateRangeShard(StorageServer* data, AuditStorageState aud
 	}
 
 	if (remoteServer != nullptr) {
-		wait(validateRangeAgainstServer(data, auditState, version, *remoteServer));
+		wait(validateRangeAgainstServer(data, auditState, version, *remoteServer, ddId));
 	} else {
 		TraceEvent(SevWarn, "ServeValidateRangeShardRemoteNotFound", data->thisServerID)
 		    .detail("Range", auditState.range)
@@ -5139,7 +5147,8 @@ ACTOR Future<Void> validateRangeShard(StorageServer* data, AuditStorageState aud
 
 ACTOR Future<Void> validateRangeAgainstServers(StorageServer* data,
                                                AuditStorageState auditState,
-                                               std::vector<UID> targetServers) {
+                                               std::vector<UID> targetServers,
+                                               UID ddId) {
 	TraceEvent(SevDebug, "ValidateRangeAgainstServersBegin", data->thisServerID)
 	    .detail("AuditID", auditState.id)
 	    .detail("Range", auditState.range)
@@ -5176,7 +5185,7 @@ ACTOR Future<Void> validateRangeAgainstServers(StorageServer* data,
 			    .detail("Range", auditState.range);
 			throw audit_storage_failed();
 		}
-		fs.push_back(validateRangeAgainstServer(data, auditState, version, decodeServerListValue(v.get())));
+		fs.push_back(validateRangeAgainstServer(data, auditState, version, decodeServerListValue(v.get()), ddId));
 	}
 
 	wait(waitForAll(fs));
@@ -5506,9 +5515,6 @@ ACTOR Future<Void> auditStorageStorageServerShardQ(StorageServer* data, AuditSto
 	state Key rangeToReadBegin = req.range.begin;
 	state KeyRangeRef rangeToRead;
 	state int retryCount = 0;
-	state int storageAutoProceedCount = 0;
-	// storageAutoProceedCount is guard to make sure that audit at SS does not run too long
-	// by itself without being notified by DD
 	state int64_t cumulatedValidatedLocalShardsNum = 0;
 	state int64_t cumulatedValidatedServerKeysNum = 0;
@@ -5655,6 +5661,8 @@ ACTOR Future<Void> auditStorageStorageServerShardQ(StorageServer* data, AuditSto
 				    .detail("AuditServer", data->thisServerID);
 				res.range = claimRange;
 				res.setPhase(AuditPhase::Error);
+				ASSERT(req.ddId.isValid());
+				res.ddId = req.ddId; // used to compare req.ddId with existing persisted ddId
 				wait(persistAuditStateByServer(data->cx, res));
 				req.reply.sendError(audit_storage_error());
 				break;
@@ -5662,6 +5670,8 @@ ACTOR Future<Void> auditStorageStorageServerShardQ(StorageServer* data, AuditSto
 				// Expand persisted complete range
 				res.range = Standalone(KeyRangeRef(req.range.begin, claimRange.end));
 				res.setPhase(AuditPhase::Complete);
+				ASSERT(req.ddId.isValid());
+				res.ddId = req.ddId; // used to compare req.ddId with existing persisted ddId
 				wait(persistAuditStateByServer(data->cx, res));
 				TraceEvent(SevInfo, "AuditStorageSsShardDone", data->thisServerID)
 				    .detail("AuditId", req.id)
@@ -5669,11 +5679,7 @@ ACTOR Future<Void> auditStorageStorageServerShardQ(StorageServer* data, AuditSto
 				    .detail("AuditServer", data->thisServerID)
 				    .detail("CompleteRange", res.range);
 				if (claimRange.end < rangeToRead.end) {
-					if (storageAutoProceedCount > SERVER_KNOBS->SS_AUDIT_AUTO_PROCEED_COUNT_MAX) {
-						throw audit_storage_failed();
-					}
 					rangeToReadBegin = claimRange.end;
-					storageAutoProceedCount++;
 				} else { // complete
 					req.reply.send(res);
 					break;
@@ -5730,9 +5736,6 @@ ACTOR Future<Void> auditStorageLocationMetadataQ(StorageServer* data, AuditStora
 	state Transaction tr(data->cx);
 	state Key rangeToReadBegin = req.range.begin;
 	state KeyRangeRef rangeToRead;
-	state int storageAutoProceedCount = 0;
-	// storageAutoProceedCount is guard to make sure that audit at SS does not run too long
-	// by itself without being notified by DD
 	state int64_t cumulatedValidatedServerKeysNum = 0;
 	state int64_t cumulatedValidatedKeyServersNum = 0;
@@ -5891,6 +5894,8 @@ ACTOR Future<Void> auditStorageLocationMetadataQ(StorageServer* data, AuditStora
 				    .detail("ClaimRange", claimRange);
 				res.range = claimRange;
 				res.setPhase(AuditPhase::Error);
+				ASSERT(req.ddId.isValid());
+				res.ddId = req.ddId; // used to compare req.ddId with existing persisted ddId
 				wait(persistAuditStateByRange(data->cx, res));
 				req.reply.sendError(audit_storage_error());
 				break;
@@ -5898,6 +5903,8 @@ ACTOR Future<Void> auditStorageLocationMetadataQ(StorageServer* data, AuditStora
 				// Expand persisted complete range
 				res.range = Standalone(KeyRangeRef(req.range.begin, claimRange.end));
 				res.setPhase(AuditPhase::Complete);
+				ASSERT(req.ddId.isValid());
+				res.ddId = req.ddId; // used to compare req.ddId with existing persisted ddId
 				wait(persistAuditStateByRange(data->cx, res));
 				TraceEvent(SevInfo, "AuditStorageShardLocMetadataDone", data->thisServerID)
 				    .detail("AuditId", req.id)
@@ -5906,11 +5913,7 @@ ACTOR Future<Void> auditStorageLocationMetadataQ(StorageServer* data, AuditStora
 				    .detail("AuditServerId", data->thisServerID)
 				    .detail("CompleteRange", res.range);
 				if (claimRange.end < rangeToRead.end) {
-					if (storageAutoProceedCount > SERVER_KNOBS->SS_AUDIT_AUTO_PROCEED_COUNT_MAX) {
-						throw audit_storage_failed();
-					}
 					rangeToReadBegin = claimRange.end;
-					storageAutoProceedCount++;
 				} else { // complete
 					req.reply.send(res);
 					break;
@@ -5972,7 +5975,8 @@ ACTOR Future<Void> auditStorageQ(StorageServer* data, AuditStorageRequest req) {
 				fs.push_back(validateRangeShard(
 				    data,
 				    AuditStorageState(res.id, KeyRangeRef(shards[i].key, shards[i + 1].key), res.getType()),
-				    src));
+				    src,
+				    req.ddId));
 				begin = shards[i + 1].key;
 			}
 		} catch (Error& e) {
 			}
 		}
 	} else {
-		fs.push_back(validateRangeAgainstServers(data, res, req.targetServers));
+		fs.push_back(validateRangeAgainstServers(data, res, req.targetServers, req.ddId));
 	}
 
 	wait(waitForAll(fs));
@@ -9439,6 +9443,8 @@ ACTOR Future<Void> cleanUpMoveInShard(StorageServer* data, Version version, Move
 	}
 	wait(data->durableVersion.whenAtLeast(mLV.version + 1));
 
+	data->moveInShards.erase(moveInShard->id());
+
 	return Void();
 }
@@ -13501,7 +13507,7 @@ ACTOR Future<Void> storageServerCore(StorageServer* self, StorageServerInterface
 		self->busiestWriteTagContext.lastUpdateTime = req.postTime;
 		TraceEvent("BusiestWriteTag", self->thisServerID)
 		    .detail("Elapsed", req.elapsed)
-		    .detail("Tag", printable(req.busiestTag))
+		    .detail("Tag", req.busiestTag)
 		    .detail("TagOps", req.opsSum)
 		    .detail("TagCost", req.costSum)
 		    .detail("TotalCost", req.totalWriteCosts)
@@ -13890,6 +13896,8 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
 
 	self.ssLock->halt();
 
+	self.moveInShards.clear();
+
 	state Error err = e;
 	if (storageServerTerminated(self, persistentData, err)) {
 		ssCore.cancel();
diff --git a/fdbserver/workloads/PhysicalShardMove.actor.cpp b/fdbserver/workloads/PhysicalShardMove.actor.cpp
index 287d542639..05f82a7237 100644
--- a/fdbserver/workloads/PhysicalShardMove.actor.cpp
+++ b/fdbserver/workloads/PhysicalShardMove.actor.cpp
@@ -101,7 +101,6 @@ struct PhysicalShardMoveWorkLoad : TestWorkload {
 		                               newDataMoveId(deterministicRandom()->randomUInt64(),
 		                                             AssignEmptyRange::False,
 		                                             EnablePhysicalShardMove::True),
-		                               // EnablePhysicalShardMove::False),
 		                               KeyRangeRef("TestKeyA"_sr, "TestKeyF"_sr),
 		                               teamSize,
 		                               includes,
diff --git a/flow/Trace.cpp b/flow/Trace.cpp
index 1c681f9a40..1939d740c9 100644
--- a/flow/Trace.cpp
+++ b/flow/Trace.cpp
@@ -170,6 +170,7 @@ public:
 	bool logTraceEventMetrics;
 
 	void initMetrics() {
+		ASSERT(!isOpen());
 		SevErrorNames.init("TraceEvents.SevError"_sr);
 		SevWarnAlwaysNames.init("TraceEvents.SevWarnAlways"_sr);
 		SevWarnNames.init("TraceEvents.SevWarn"_sr);
@@ -430,9 +431,8 @@ public:
 		}
 	}
 
-	void log(int severity, const char* name, UID id, uint64_t event_ts) {
-		if (!logTraceEventMetrics)
-			return;
+	void logMetrics(int severity, const char* name, UID id, uint64_t event_ts) {
+		ASSERT(TraceEvent::isNetworkThread() && logTraceEventMetrics);
 
 		EventMetricHandle* m = nullptr;
 		switch (severity) {
@@ -781,7 +781,8 @@ void openTraceFile(const Optional<NetworkAddress>& na,
                    std::string baseOfBase,
                    std::string logGroup,
                    std::string identifier,
-                   std::string tracePartialFileSuffix) {
+                   std::string tracePartialFileSuffix,
+                   InitializeTraceMetrics initializeTraceMetrics) {
 	if (g_traceLog.isOpen())
 		return;
 
@@ -808,6 +809,10 @@ void openTraceFile(const Optional<NetworkAddress>& na,
 		baseName = format("%s.0.0.0.0.%d", baseOfBase.c_str(), ::getpid());
 	}
 
+	if (initializeTraceMetrics) {
+		g_traceLog.initMetrics();
+	}
+
 	g_traceLog.open(directory,
 	                baseName,
 	                logGroup,
@@ -821,10 +826,6 @@ void openTraceFile(const Optional<NetworkAddress>& na,
 	g_traceBatch.dump();
 }
 
-void initTraceEventMetrics() {
-	g_traceLog.initMetrics();
-}
-
 void closeTraceFile() {
 	g_traceLog.close();
 }
@@ -1340,17 +1341,17 @@ void BaseTraceEvent::log() {
 
 		if (g_traceLog.isOpen()) {
 			// Log Metrics
-			if (g_traceLog.logTraceEventMetrics && isNetworkThread()) {
+			if (isNetworkThread() && g_traceLog.logTraceEventMetrics) {
 				// Get the persistent Event Metric representing this trace event and push the fields (details)
-				// accumulated in *this to it and then log() it. Note that if the event metric is disabled it
-				// won't actually be logged BUT any new fields added to it will be registered. If the event IS
-				// logged, a timestamp will be returned, if not then 0. Either way, pass it through to be used
-				// if possible in the Sev* event metrics.
+				// accumulated in *this to it and then logMetrics() it. Note that if the event metric is
+				// disabled it won't actually be logged BUT any new fields added to it will be registered. If
+				// the event IS logged, a timestamp will be returned, if not then 0. Either way, pass it
+				// through to be used if possible in the Sev* event metrics.
 				uint64_t event_ts =
 				    DynamicEventMetric::getOrCreateInstance(format("TraceEvent.%s", type), StringRef(), true)
 				        ->setFieldsAndLogFrom(tmpEventMetric.get());
-				g_traceLog.log(severity, type, id, event_ts);
+				g_traceLog.logMetrics(severity, type, id, event_ts);
 			}
 		}
 	}
diff --git a/flow/include/flow/BooleanParam.h b/flow/include/flow/BooleanParam.h
index b1c7adb91d..7413c7b58a 100644
--- a/flow/include/flow/BooleanParam.h
+++ b/flow/include/flow/BooleanParam.h
@@ -20,8 +20,6 @@
 
 #pragma once
 
-#include "flow/Trace.h"
-
 class BooleanParam {
 	bool value;
 
@@ -31,11 +29,6 @@ public:
 	constexpr void set(bool value) { this->value = value; }
 };
 
-template <class BooleanParamSub>
-struct Traceable<BooleanParamSub, std::enable_if_t<std::is_base_of_v<BooleanParam, BooleanParamSub>>> : std::true_type {
-	static std::string toString(BooleanParamSub const& value) { return Traceable<bool>::toString(value); }
-};
-
 // Declares a boolean parametr with the desired name. This declaration can be nested inside of a namespace or another
 // class. This macro should not be used directly unless this boolean parameter is going to be defined as a nested class.
 #define FDB_DECLARE_BOOLEAN_PARAM(ParamName) \
diff --git a/flow/include/flow/CodeProbe.h b/flow/include/flow/CodeProbe.h
index bab19ca3cd..bc525465da 100644
--- a/flow/include/flow/CodeProbe.h
+++ b/flow/include/flow/CodeProbe.h
@@ -281,7 +281,7 @@ struct CodeProbeImpl : ICodeProbe {
 private:
 	CodeProbeImpl() { registerProbe(*this); }
 	inline static CodeProbeImpl _instance;
-	unsigned _hitCount = 0;
+	std::atomic<unsigned> _hitCount = 0;
 	Annotations annotations;
 };
diff --git a/flow/include/flow/Trace.h b/flow/include/flow/Trace.h
index 324c52f642..8d43317d23 100644
--- a/flow/include/flow/Trace.h
+++ b/flow/include/flow/Trace.h
@@ -31,6 +31,7 @@
 #include
 #include
 #include
+#include "flow/BooleanParam.h"
 #include "flow/IRandom.h"
 #include "flow/Error.h"
 #include "flow/ITrace.h"
@@ -39,6 +40,8 @@
 #define TRACE_DEFAULT_ROLL_SIZE (10 << 20)
 #define TRACE_DEFAULT_MAX_LOGS_SIZE (10 * TRACE_DEFAULT_ROLL_SIZE)
 
+FDB_BOOLEAN_PARAM(InitializeTraceMetrics);
+
 inline int fastrand() {
 	static int g_seed = 0;
 	g_seed = 214013 * g_seed + 2531011;
@@ -539,8 +542,8 @@ void openTraceFile(const Optional<NetworkAddress>& na,
                    std::string baseOfBase = "trace",
                    std::string logGroup = "default",
                    std::string identifier = "",
-                   std::string tracePartialFileSuffix = "");
-void initTraceEventMetrics();
+                   std::string tracePartialFileSuffix = "",
+                   InitializeTraceMetrics initializeTraceMetrics = InitializeTraceMetrics::False);
 void closeTraceFile();
 bool traceFileIsOpen();
 void flushTraceFileVoid();
diff --git a/flow/include/flow/Traceable.h b/flow/include/flow/Traceable.h
index 96d571ad38..431330d46d 100644
--- a/flow/include/flow/Traceable.h
+++ b/flow/include/flow/Traceable.h
@@ -29,6 +29,8 @@
 #include
 #include
 
+#include "flow/BooleanParam.h"
+
 #define PRINTABLE_COMPRESS_NULLS 0
 
 template
@@ -245,6 +247,11 @@ struct Traceable<std::atomic<T>> : std::true_type {
 	static std::string toString(const std::atomic<T>& value) { return Traceable<T>::toString(value.load()); }
 };
 
+template <class BooleanParamSub>
+struct Traceable<BooleanParamSub, std::enable_if_t<std::is_base_of_v<BooleanParam, BooleanParamSub>>> : std::true_type {
+	static std::string toString(BooleanParamSub const& value) { return Traceable<bool>::toString(value); }
+};
+
 // Adapter to redirect fmt::formatter calls to Traceable for a supported type
 template <class T>
 struct FormatUsingTraceable : fmt::formatter<std::string> {