Merge remote-tracking branch 'origin/main' into main-tag-counter-optimizations
This commit is contained in:
commit
a1b0a6b35e
|
@ -251,7 +251,6 @@ ACTOR static Future<Void> checkMoveKeysLock(Transaction* tr,
|
|||
TraceEvent(SevDebug, "ConflictWithPreviousOwner");
|
||||
throw movekeys_conflict(); // need a new name
|
||||
}
|
||||
|
||||
// Take the lock
|
||||
if (isWrite) {
|
||||
BinaryWriter wrMyOwner(Unversioned());
|
||||
|
@ -267,7 +266,6 @@ ACTOR static Future<Void> checkMoveKeysLock(Transaction* tr,
|
|||
.detail("MyOwner", lock.myOwner.toString())
|
||||
.detail("Writer", lastWriter.toString());
|
||||
}
|
||||
|
||||
return Void();
|
||||
} else if (currentOwner == lock.myOwner) {
|
||||
if (isWrite) {
|
||||
|
@ -278,7 +276,6 @@ ACTOR static Future<Void> checkMoveKeysLock(Transaction* tr,
|
|||
// Make this transaction self-conflicting so the database will not execute it twice with the same write key
|
||||
tr->makeSelfConflicting();
|
||||
}
|
||||
|
||||
return Void();
|
||||
} else {
|
||||
CODE_PROBE(true, "checkMoveKeysLock: Conflict with new owner");
|
||||
|
@ -287,6 +284,37 @@ ACTOR static Future<Void> checkMoveKeysLock(Transaction* tr,
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> updateAuditState(Database cx, AuditStorageState auditState, MoveKeyLockInfo lock, bool ddEnabled) {
|
||||
state Transaction tr(cx);
|
||||
|
||||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
wait(checkMoveKeysLock(&tr, lock, ddEnabled, true));
|
||||
// Persist audit result
|
||||
tr.set(auditKey(auditState.getType(), auditState.id), auditStorageStateValue(auditState));
|
||||
wait(tr.commit());
|
||||
TraceEvent(SevDebug, "AuditUtilUpdateAuditState", auditState.id)
|
||||
.detail("AuditID", auditState.id)
|
||||
.detail("AuditType", auditState.getType())
|
||||
.detail("AuditPhase", auditState.getPhase())
|
||||
.detail("AuditKey", auditKey(auditState.getType(), auditState.id));
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevDebug, "AuditUtilUpdateAuditStateError", auditState.id)
|
||||
.errorUnsuppressed(e)
|
||||
.detail("AuditID", auditState.id)
|
||||
.detail("AuditType", auditState.getType())
|
||||
.detail("AuditPhase", auditState.getPhase())
|
||||
.detail("AuditKey", auditKey(auditState.getType(), auditState.id));
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<UID> persistNewAuditState(Database cx,
|
||||
AuditStorageState auditState,
|
||||
MoveKeyLockInfo lock,
|
||||
|
@ -333,7 +361,6 @@ ACTOR Future<UID> persistNewAuditState(Database cx,
|
|||
TraceEvent(SevDebug, "AuditUtilPersistedNewAuditState", auditId)
|
||||
.detail("AuditKey", auditKey(auditState.getType(), auditId));
|
||||
break;
|
||||
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevDebug, "AuditUtilPersistedNewAuditStateError", auditId)
|
||||
.errorUnsuppressed(e)
|
||||
|
@ -384,7 +411,6 @@ ACTOR Future<Void> persistAuditState(Database cx,
|
|||
.detail("AuditKey", auditKey(auditState.getType(), auditState.id))
|
||||
.detail("Context", context);
|
||||
break;
|
||||
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevDebug, "AuditUtilPersistAuditStateError", auditState.id)
|
||||
.errorUnsuppressed(e)
|
||||
|
@ -415,7 +441,6 @@ ACTOR Future<AuditStorageState> getAuditState(Database cx, AuditType type, UID i
|
|||
.detail("AuditType", type)
|
||||
.detail("AuditKey", auditKey(type, id));
|
||||
break;
|
||||
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevDebug, "AuditUtilReadAuditStateError", id)
|
||||
.errorUnsuppressed(e)
|
||||
|
@ -493,12 +518,15 @@ ACTOR Future<Void> persistAuditStateByRange(Database cx, AuditStorageState audit
|
|||
if (ddAuditState.getPhase() != AuditPhase::Running) {
|
||||
throw audit_storage_failed();
|
||||
}
|
||||
ASSERT(ddAuditState.ddId.isValid());
|
||||
if (ddAuditState.ddId != auditState.ddId) {
|
||||
throw audit_storage_failed(); // a new dd starts and this audit task is outdated
|
||||
}
|
||||
wait(krmSetRange(&tr,
|
||||
auditRangeBasedProgressPrefixFor(auditState.getType(), auditState.id),
|
||||
auditState.range,
|
||||
auditStorageStateValue(auditState)));
|
||||
break;
|
||||
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
|
@ -525,7 +553,6 @@ ACTOR Future<std::vector<AuditStorageState>> getAuditStateByRange(Database cx,
|
|||
CLIENT_KNOBS->KRM_GET_RANGE_LIMIT_BYTES));
|
||||
auditStates = res_;
|
||||
break;
|
||||
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevDebug, "AuditUtilGetAuditStateForRangeError").errorUnsuppressed(e).detail("AuditID", auditId);
|
||||
wait(tr.onError(e));
|
||||
|
@ -567,13 +594,16 @@ ACTOR Future<Void> persistAuditStateByServer(Database cx, AuditStorageState audi
|
|||
if (ddAuditState.getPhase() != AuditPhase::Running) {
|
||||
throw audit_storage_failed();
|
||||
}
|
||||
ASSERT(ddAuditState.ddId.isValid());
|
||||
if (ddAuditState.ddId != auditState.ddId) {
|
||||
throw audit_storage_failed(); // a new dd starts and this audit task is outdated
|
||||
}
|
||||
wait(krmSetRange(
|
||||
&tr,
|
||||
auditServerBasedProgressPrefixFor(auditState.getType(), auditState.id, auditState.auditServerId),
|
||||
auditState.range,
|
||||
auditStorageStateValue(auditState)));
|
||||
break;
|
||||
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
|
@ -601,7 +631,6 @@ ACTOR Future<std::vector<AuditStorageState>> getAuditStateByServer(Database cx,
|
|||
CLIENT_KNOBS->KRM_GET_RANGE_LIMIT_BYTES));
|
||||
auditStates = res_;
|
||||
break;
|
||||
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevDebug, "AuditUtilGetAuditStateForRangeError").errorUnsuppressed(e).detail("AuditID", auditId);
|
||||
wait(tr.onError(e));
|
||||
|
|
|
@ -2313,7 +2313,8 @@ void initializeClientTracing(Reference<IClusterConnectionRecord> connRecord, Opt
|
|||
"trace",
|
||||
networkOptions.traceLogGroup,
|
||||
identifier,
|
||||
networkOptions.tracePartialFileSuffix);
|
||||
networkOptions.tracePartialFileSuffix,
|
||||
InitializeTraceMetrics::True);
|
||||
|
||||
TraceEvent("ClientStart")
|
||||
.detail("SourceVersion", getSourceVersion())
|
||||
|
@ -2328,7 +2329,6 @@ void initializeClientTracing(Reference<IClusterConnectionRecord> connRecord, Opt
|
|||
|
||||
g_network->initMetrics();
|
||||
FlowTransport::transport().initMetrics();
|
||||
initTraceEventMetrics();
|
||||
}
|
||||
|
||||
// Initialize system monitoring once the local IP is available
|
||||
|
|
|
@ -180,7 +180,6 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
|||
|
||||
init( SHARD_ENCODE_LOCATION_METADATA, false ); if( randomize && BUGGIFY ) SHARD_ENCODE_LOCATION_METADATA = true;
|
||||
init( ENABLE_DD_PHYSICAL_SHARD, false ); // EXPERIMENTAL; If true, SHARD_ENCODE_LOCATION_METADATA must be true; When true, optimization of data move between DCs is disabled
|
||||
init( ENABLE_DD_PHYSICAL_SHARD_MOVE, false ); if( isSimulated ) ENABLE_DD_PHYSICAL_SHARD_MOVE = deterministicRandom()->coinflip();
|
||||
init( DD_PHYSICAL_SHARD_MOVE_PROBABILITY, 0.5 );
|
||||
init( MAX_PHYSICAL_SHARD_BYTES, 10000000 ); // 10 MB; for ENABLE_DD_PHYSICAL_SHARD; smaller leads to larger number of physicalShard per storage server
|
||||
init( PHYSICAL_SHARD_METRICS_DELAY, 300.0 ); // 300 seconds; for ENABLE_DD_PHYSICAL_SHARD
|
||||
|
@ -755,6 +754,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
|||
init( TARGET_BYTES_PER_STORAGE_SERVER, 1000e6 ); if( smallStorageTarget ) TARGET_BYTES_PER_STORAGE_SERVER = 3000e3;
|
||||
init( SPRING_BYTES_STORAGE_SERVER, 100e6 ); if( smallStorageTarget ) SPRING_BYTES_STORAGE_SERVER = 300e3;
|
||||
init( AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES, 800e6 ); if( smallStorageTarget ) AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES = 2500e3;
|
||||
init( AUTO_TAG_THROTTLE_SPRING_BYTES_STORAGE_SERVER, 200e6 ); if( smallStorageTarget ) AUTO_TAG_THROTTLE_SPRING_BYTES_STORAGE_SERVER = 500e3;
|
||||
init( TARGET_BYTES_PER_STORAGE_SERVER_BATCH, 750e6 ); if( smallStorageTarget ) TARGET_BYTES_PER_STORAGE_SERVER_BATCH = 1500e3;
|
||||
init( SPRING_BYTES_STORAGE_SERVER_BATCH, 100e6 ); if( smallStorageTarget ) SPRING_BYTES_STORAGE_SERVER_BATCH = 150e3;
|
||||
init( STORAGE_HARD_LIMIT_BYTES, 1500e6 ); if( smallStorageTarget ) STORAGE_HARD_LIMIT_BYTES = 4500e3;
|
||||
|
@ -833,13 +833,13 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
|||
init( GLOBAL_TAG_THROTTLING, true ); if(isSimulated) GLOBAL_TAG_THROTTLING = deterministicRandom()->coinflip();
|
||||
init( ENFORCE_TAG_THROTTLING_ON_PROXIES, GLOBAL_TAG_THROTTLING );
|
||||
init( GLOBAL_TAG_THROTTLING_MIN_RATE, 1.0 );
|
||||
// 60 seconds was chosen as a default value to ensure that
|
||||
// 10 seconds was chosen as a default value to ensure that
|
||||
// the global tag throttler does not react too drastically to
|
||||
// changes in workload. To make the global tag throttler more reactive,
|
||||
// lower this knob. To make global tag throttler more smooth, raise this knob.
|
||||
// Setting this knob lower than TAG_MEASUREMENT_INTERVAL can cause erratic
|
||||
// behaviour and is not recommended.
|
||||
init( GLOBAL_TAG_THROTTLING_FOLDING_TIME, 60.0 );
|
||||
init( GLOBAL_TAG_THROTTLING_FOLDING_TIME, 10.0 );
|
||||
init( GLOBAL_TAG_THROTTLING_MAX_TAGS_TRACKED, 10 );
|
||||
init( GLOBAL_TAG_THROTTLING_TAG_EXPIRE_AFTER, 240.0 );
|
||||
init( GLOBAL_TAG_THROTTLING_PROXY_LOGGING_INTERVAL, 60.0 );
|
||||
|
@ -878,7 +878,6 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
|||
init( SERVE_AUDIT_STORAGE_PARALLELISM, 1 );
|
||||
init( PERSIST_FINISH_AUDIT_COUNT, 10 ); if ( isSimulated ) PERSIST_FINISH_AUDIT_COUNT = 1;
|
||||
init( AUDIT_RETRY_COUNT_MAX, 100 ); if ( isSimulated ) AUDIT_RETRY_COUNT_MAX = 10;
|
||||
init( SS_AUDIT_AUTO_PROCEED_COUNT_MAX, 5 );
|
||||
init( CONCURRENT_AUDIT_TASK_COUNT_MAX, 50 ); if ( isSimulated ) CONCURRENT_AUDIT_TASK_COUNT_MAX = deterministicRandom()->randomInt(1, CONCURRENT_AUDIT_TASK_COUNT_MAX+1);
|
||||
init( BUGGIFY_BLOCK_BYTES, 10000 );
|
||||
init( STORAGE_RECOVERY_VERSION_LAG_LIMIT, 2 * MAX_READ_TRANSACTION_LIFE_VERSIONS );
|
||||
|
|
|
@ -45,24 +45,24 @@ enum class AuditType : uint8_t {
|
|||
struct AuditStorageState {
|
||||
constexpr static FileIdentifier file_identifier = 13804340;
|
||||
|
||||
AuditStorageState() : type(0), auditServerId(UID()), phase(0) {}
|
||||
AuditStorageState() : type(0), auditServerId(UID()), phase(0), ddId(UID()) {}
|
||||
AuditStorageState(UID id, UID auditServerId, AuditType type)
|
||||
: id(id), auditServerId(auditServerId), type(static_cast<uint8_t>(type)), phase(0) {}
|
||||
: id(id), auditServerId(auditServerId), type(static_cast<uint8_t>(type)), phase(0), ddId(UID()) {}
|
||||
AuditStorageState(UID id, KeyRange range, AuditType type)
|
||||
: id(id), auditServerId(UID()), range(range), type(static_cast<uint8_t>(type)), phase(0) {}
|
||||
: id(id), auditServerId(UID()), range(range), type(static_cast<uint8_t>(type)), phase(0), ddId(UID()) {}
|
||||
AuditStorageState(UID id, AuditType type)
|
||||
: id(id), auditServerId(UID()), type(static_cast<uint8_t>(type)), phase(0) {}
|
||||
: id(id), auditServerId(UID()), type(static_cast<uint8_t>(type)), phase(0), ddId(UID()) {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, id, auditServerId, range, type, phase, error);
|
||||
serializer(ar, id, auditServerId, range, type, phase, error, ddId);
|
||||
}
|
||||
|
||||
void setType(AuditType type) { this->type = static_cast<uint8_t>(type); }
|
||||
AuditType getType() const { return static_cast<AuditType>(this->type); }
|
||||
inline void setType(AuditType type) { this->type = static_cast<uint8_t>(type); }
|
||||
inline AuditType getType() const { return static_cast<AuditType>(this->type); }
|
||||
|
||||
void setPhase(AuditPhase phase) { this->phase = static_cast<uint8_t>(phase); }
|
||||
AuditPhase getPhase() const { return static_cast<AuditPhase>(this->phase); }
|
||||
inline void setPhase(AuditPhase phase) { this->phase = static_cast<uint8_t>(phase); }
|
||||
inline AuditPhase getPhase() const { return static_cast<AuditPhase>(this->phase); }
|
||||
|
||||
// for fdbcli get_audit_status
|
||||
std::string toStringForCLI() const {
|
||||
|
@ -90,7 +90,14 @@ struct AuditStorageState {
|
|||
}
|
||||
|
||||
UID id;
|
||||
UID auditServerId;
|
||||
UID ddId; // ddId indicates this audit is managed by which dd
|
||||
// ddId is used to check if dd has changed
|
||||
// When a new dd starts in the middle of an onging audit,
|
||||
// The ongoing audit's ddId gets updated
|
||||
// When SS updates the progress, it checks ddId
|
||||
// If the ddId is updated, SS Audit actors of the old dd will stop themselves
|
||||
// New dd will issue new requests to SSes to continue the remaining work
|
||||
UID auditServerId; // UID of SS who is working on this audit task
|
||||
KeyRange range;
|
||||
uint8_t type;
|
||||
uint8_t phase;
|
||||
|
@ -105,15 +112,16 @@ struct AuditStorageRequest {
|
|||
AuditStorageRequest(UID id, KeyRange range, AuditType type)
|
||||
: id(id), range(range), type(static_cast<uint8_t>(type)) {}
|
||||
|
||||
void setType(AuditType type) { this->type = static_cast<uint8_t>(this->type); }
|
||||
AuditType getType() const { return static_cast<AuditType>(this->type); }
|
||||
inline void setType(AuditType type) { this->type = static_cast<uint8_t>(this->type); }
|
||||
inline AuditType getType() const { return static_cast<AuditType>(this->type); }
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, id, range, type, targetServers, reply);
|
||||
serializer(ar, id, range, type, targetServers, reply, ddId);
|
||||
}
|
||||
|
||||
UID id;
|
||||
UID ddId; // UID of DD who claims the audit
|
||||
KeyRange range;
|
||||
uint8_t type;
|
||||
std::vector<UID> targetServers;
|
||||
|
|
|
@ -66,5 +66,6 @@ ACTOR Future<Void> clearAuditMetadataForType(Database cx,
|
|||
UID maxAuditIdToClear,
|
||||
int numFinishAuditToKeep);
|
||||
ACTOR Future<bool> checkStorageServerRemoved(Database cx, UID ssid);
|
||||
ACTOR Future<Void> updateAuditState(Database cx, AuditStorageState auditState, MoveKeyLockInfo lock, bool ddEnabled);
|
||||
#include "flow/unactorcompiler.h"
|
||||
#endif
|
||||
|
|
|
@ -204,7 +204,6 @@ public:
|
|||
|
||||
bool SHARD_ENCODE_LOCATION_METADATA; // If true, location metadata will contain shard ID.
|
||||
bool ENABLE_DD_PHYSICAL_SHARD; // EXPERIMENTAL; If true, SHARD_ENCODE_LOCATION_METADATA must be true.
|
||||
bool ENABLE_DD_PHYSICAL_SHARD_MOVE; // Enable physical shard move.
|
||||
double DD_PHYSICAL_SHARD_MOVE_PROBABILITY; // Percentage of physical shard move, in the range of [0, 1].
|
||||
int64_t MAX_PHYSICAL_SHARD_BYTES;
|
||||
double PHYSICAL_SHARD_METRICS_DELAY;
|
||||
|
@ -711,6 +710,7 @@ public:
|
|||
int64_t TARGET_BYTES_PER_STORAGE_SERVER;
|
||||
int64_t SPRING_BYTES_STORAGE_SERVER;
|
||||
int64_t AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES;
|
||||
int64_t AUTO_TAG_THROTTLE_SPRING_BYTES_STORAGE_SERVER;
|
||||
int64_t TARGET_BYTES_PER_STORAGE_SERVER_BATCH;
|
||||
int64_t SPRING_BYTES_STORAGE_SERVER_BATCH;
|
||||
int64_t STORAGE_HARD_LIMIT_BYTES;
|
||||
|
@ -865,7 +865,6 @@ public:
|
|||
int SERVE_AUDIT_STORAGE_PARALLELISM;
|
||||
int PERSIST_FINISH_AUDIT_COUNT; // Num of persist complete/failed audits for each type
|
||||
int AUDIT_RETRY_COUNT_MAX;
|
||||
int SS_AUDIT_AUTO_PROCEED_COUNT_MAX;
|
||||
int CONCURRENT_AUDIT_TASK_COUNT_MAX;
|
||||
int BUGGIFY_BLOCK_BYTES;
|
||||
int64_t STORAGE_RECOVERY_VERSION_LAG_LIMIT;
|
||||
|
|
|
@ -1047,7 +1047,7 @@ void DDQueue::launchQueuedWork(std::set<RelocateData, std::greater<RelocateData>
|
|||
rrs.dataMoveId = UID();
|
||||
} else {
|
||||
const bool enabled =
|
||||
deterministicRandom()->random01() < SERVER_KNOBS->DD_PHYSICAL_SHARD_MOVE_PROBABILITY;
|
||||
deterministicRandom()->random01() <= SERVER_KNOBS->DD_PHYSICAL_SHARD_MOVE_PROBABILITY;
|
||||
rrs.dataMoveId = newDataMoveId(deterministicRandom()->randomUInt64(),
|
||||
AssignEmptyRange::False,
|
||||
EnablePhysicalShardMove(enabled));
|
||||
|
@ -1635,7 +1635,7 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueue* self,
|
|||
self->moveCreateNewPhysicalShard++;
|
||||
}
|
||||
const bool enabled =
|
||||
deterministicRandom()->random01() < SERVER_KNOBS->DD_PHYSICAL_SHARD_MOVE_PROBABILITY;
|
||||
deterministicRandom()->random01() <= SERVER_KNOBS->DD_PHYSICAL_SHARD_MOVE_PROBABILITY;
|
||||
rd.dataMoveId = newDataMoveId(
|
||||
physicalShardIDCandidate, AssignEmptyRange::False, EnablePhysicalShardMove(enabled));
|
||||
TraceEvent(SevInfo, "NewDataMoveWithPhysicalShard")
|
||||
|
|
|
@ -96,8 +96,8 @@ struct DDAudit {
|
|||
int64_t overallIssuedDoAuditCount;
|
||||
int64_t overallCompleteDoAuditCount;
|
||||
|
||||
void setAuditRunActor(Future<Void> actor) { auditActor = actor; }
|
||||
Future<Void> getAuditRunActor() { return auditActor; }
|
||||
inline void setAuditRunActor(Future<Void> actor) { auditActor = actor; }
|
||||
inline Future<Void> getAuditRunActor() { return auditActor; }
|
||||
|
||||
// auditActor and actors are guaranteed to deliver a cancel signal
|
||||
void cancel() {
|
||||
|
@ -771,15 +771,49 @@ void cancelAllAuditsInAuditMap(Reference<DataDistributor> self) {
|
|||
return;
|
||||
}
|
||||
|
||||
void resumeStorageAudits(Reference<DataDistributor> self) {
|
||||
ACTOR Future<Void> resumeStorageAudits(Reference<DataDistributor> self) {
|
||||
ASSERT(!self->auditInitialized.getFuture().isReady());
|
||||
if (self->initData->auditStates.empty()) {
|
||||
self->auditInitialized.send(Void());
|
||||
TraceEvent(SevVerbose, "AuditStorageResumeEmptyDone", self->ddId);
|
||||
return;
|
||||
return Void();
|
||||
}
|
||||
cancelAllAuditsInAuditMap(self); // cancel existing audits
|
||||
|
||||
// Update metadata
|
||||
state int retryCount = 0;
|
||||
loop {
|
||||
try {
|
||||
std::vector<Future<Void>> fs;
|
||||
state MoveKeyLockInfo lockInfo;
|
||||
lockInfo.myOwner = self->lock.myOwner;
|
||||
lockInfo.prevOwner = self->lock.prevOwner;
|
||||
lockInfo.prevWrite = self->lock.prevWrite;
|
||||
for (const auto& auditState : self->initData->auditStates) {
|
||||
// Only running audit will be resumed
|
||||
if (auditState.getPhase() == AuditPhase::Running) {
|
||||
AuditStorageState toUpdate = auditState;
|
||||
toUpdate.ddId = self->ddId;
|
||||
fs.push_back(updateAuditState(
|
||||
self->txnProcessor->context(), toUpdate, lockInfo, self->context->isDDEnabled()));
|
||||
}
|
||||
}
|
||||
wait(waitForAll(fs));
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_actor_cancelled || e.code() == error_code_movekeys_conflict) {
|
||||
throw e;
|
||||
}
|
||||
if (retryCount > 50) {
|
||||
TraceEvent(SevWarnAlways, "ResumeAuditStorageUnableUpdateMetadata", self->ddId).errorUnsuppressed(e);
|
||||
return Void();
|
||||
}
|
||||
retryCount++;
|
||||
}
|
||||
}
|
||||
|
||||
// Following is atomic
|
||||
// Cancel existing audits and restore
|
||||
cancelAllAuditsInAuditMap(self);
|
||||
std::unordered_map<AuditType, std::vector<AuditStorageState>> restoredAudits;
|
||||
for (const auto& auditState : self->initData->auditStates) {
|
||||
restoredAudits[auditState.getType()].push_back(auditState);
|
||||
|
@ -843,7 +877,7 @@ void resumeStorageAudits(Reference<DataDistributor> self) {
|
|||
|
||||
self->auditInitialized.send(Void());
|
||||
TraceEvent(SevDebug, "AuditStorageResumeDone", self->ddId);
|
||||
return;
|
||||
return Void();
|
||||
}
|
||||
|
||||
// Periodically check and log the physicalShard status; clean up empty physicalShard;
|
||||
|
@ -1001,7 +1035,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
|
|||
anyZeroHealthyTeams = zeroHealthyTeams[0];
|
||||
}
|
||||
|
||||
resumeStorageAudits(self);
|
||||
actors.push_back(resumeStorageAudits(self));
|
||||
|
||||
actors.push_back(self->pollMoveKeysLock());
|
||||
|
||||
|
@ -1753,6 +1787,7 @@ ACTOR Future<Void> auditStorageCore(Reference<DataDistributor> self,
|
|||
|
||||
try {
|
||||
ASSERT(audit != nullptr);
|
||||
ASSERT(audit->coreState.ddId == self->ddId);
|
||||
loadAndDispatchAudit(self, audit, audit->coreState.range);
|
||||
TraceEvent(SevInfo, "DDAuditStorageCoreScheduled", self->ddId)
|
||||
.detail("Context", context)
|
||||
|
@ -1931,6 +1966,7 @@ void runAuditStorage(Reference<DataDistributor> self,
|
|||
ASSERT(auditState.id.isValid());
|
||||
ASSERT(!auditState.range.empty());
|
||||
ASSERT(auditState.getPhase() == AuditPhase::Running);
|
||||
auditState.ddId = self->ddId; // make sure any existing audit state claims the current DD
|
||||
std::shared_ptr<DDAudit> audit = std::make_shared<DDAudit>(auditState);
|
||||
audit->retryCount = retryCount;
|
||||
TraceEvent(SevDebug, "DDRunAuditStorage", self->ddId)
|
||||
|
@ -1997,6 +2033,7 @@ ACTOR Future<UID> launchAudit(Reference<DataDistributor> self, KeyRange auditRan
|
|||
auditState.setType(auditType);
|
||||
auditState.range = auditRange;
|
||||
auditState.setPhase(AuditPhase::Running);
|
||||
auditState.ddId = self->ddId; // persist ddId to new ddAudit metadata
|
||||
TraceEvent(SevVerbose, "DDAuditStorageLaunchPersistNewAuditIDBefore", self->ddId)
|
||||
.detail("AuditType", auditType)
|
||||
.detail("Range", auditRange);
|
||||
|
@ -2227,6 +2264,7 @@ ACTOR Future<Void> scheduleAuditStorageShardOnServer(Reference<DataDistributor>
|
|||
// We always issue exactly one audit task (for the remaining part) when schedule
|
||||
ASSERT(issueDoAuditCount == 0);
|
||||
issueDoAuditCount++;
|
||||
req.ddId = self->ddId; // send this ddid to SS
|
||||
audit->actors.add(doAuditOnStorageServer(self, audit, ssi, req));
|
||||
}
|
||||
}
|
||||
|
@ -2447,6 +2485,7 @@ ACTOR Future<Void> scheduleAuditOnRange(Reference<DataDistributor> self,
|
|||
SERVER_KNOBS->CONCURRENT_AUDIT_TASK_COUNT_MAX - 1);
|
||||
}
|
||||
issueDoAuditCount++;
|
||||
req.ddId = self->ddId; // send this ddid to SS
|
||||
audit->actors.add(doAuditOnStorageServer(self, audit, targetServer, req));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -214,7 +214,7 @@ class GlobalTagThrottlerImpl {
|
|||
result += getCurrentCost(id, tag).orDefault(0);
|
||||
}
|
||||
// FIXME: Disabled due to noisy trace events. Fix the noise and reenabled
|
||||
//TraceEvent("GlobalTagThrottler_GetCurrentCost").detail("Tag", printable(tag)).detail("Cost", result);
|
||||
//TraceEvent("GlobalTagThrottler_GetCurrentCost").detail("Tag", tag).detail("Cost", result);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
@ -421,7 +421,7 @@ class GlobalTagThrottlerImpl {
|
|||
|
||||
isBusy = limitingTps.present() && limitingTps.get() < desiredTps;
|
||||
|
||||
te.detail("Tag", printable(tag))
|
||||
te.detail("Tag", tag)
|
||||
.detail("TargetTps", targetTps)
|
||||
.detail("AverageTransactionCost", averageTransactionCost)
|
||||
.detail("LimitingTps", limitingTps)
|
||||
|
@ -447,7 +447,7 @@ public:
|
|||
"been reached");
|
||||
TraceEvent("GlobalTagThrottler_IgnoringRequests")
|
||||
.suppressFor(60.0)
|
||||
.detail("Tag", printable(tag))
|
||||
.detail("Tag", tag)
|
||||
.detail("Count", count);
|
||||
} else {
|
||||
tagStatistics[tag].addTransactions(static_cast<double>(count));
|
||||
|
@ -534,7 +534,7 @@ public:
|
|||
Future<Void> tryUpdateAutoThrottling(StorageQueueInfo const& ss) {
|
||||
auto& ssInfo = ssInfos[ss.id];
|
||||
ssInfo.throttlingRatio = ss.getTagThrottlingRatio(SERVER_KNOBS->AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES,
|
||||
SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER);
|
||||
SERVER_KNOBS->AUTO_TAG_THROTTLE_SPRING_BYTES_STORAGE_SERVER);
|
||||
ssInfo.zoneId = ss.locality.zoneId();
|
||||
|
||||
auto& tagToThroughputCounters = throughput[ss.id];
|
||||
|
|
|
@ -112,7 +112,7 @@ void GrvProxyTagThrottler::addRequest(GetReadVersionRequest const& req) {
|
|||
TraceEvent(SevWarnAlways, "GrvProxyTagThrottler_MultipleTags")
|
||||
.suppressFor(60.0)
|
||||
.detail("NumTags", req.tags.size())
|
||||
.detail("UsingTag", printable(tag));
|
||||
.detail("UsingTag", tag);
|
||||
}
|
||||
queues[tag].requests.emplace_back(req);
|
||||
}
|
||||
|
|
|
@ -1953,9 +1953,7 @@ ACTOR static Future<Void> finishMoveShards(Database occ,
|
|||
wait(waitForAll(actors));
|
||||
|
||||
if (range.end == dataMove.ranges.front().end) {
|
||||
if (SERVER_KNOBS->ENABLE_DD_PHYSICAL_SHARD_MOVE) {
|
||||
wait(deleteCheckpoints(&tr, dataMove.checkpoints, dataMoveId));
|
||||
}
|
||||
tr.clear(dataMoveKeyFor(dataMoveId));
|
||||
complete = true;
|
||||
TraceEvent(sevDm, "FinishMoveShardsDeleteMetaData", dataMoveId)
|
||||
|
@ -2707,9 +2705,7 @@ ACTOR Future<Void> cleanUpDataMoveCore(Database occ,
|
|||
}
|
||||
|
||||
if (range.end == dataMove.ranges.front().end) {
|
||||
if (SERVER_KNOBS->ENABLE_DD_PHYSICAL_SHARD_MOVE) {
|
||||
wait(deleteCheckpoints(&tr, dataMove.checkpoints, dataMoveId));
|
||||
}
|
||||
tr.clear(dataMoveKeyFor(dataMoveId));
|
||||
complete = true;
|
||||
TraceEvent(sevDm, "CleanUpDataMoveDeleteMetaData", dataMoveId)
|
||||
|
|
|
@ -1391,11 +1391,13 @@ UpdateCommitCostRequest StorageQueueInfo::refreshCommitCost(double elapsed) {
|
|||
|
||||
Optional<double> StorageQueueInfo::getTagThrottlingRatio(int64_t storageTargetBytes, int64_t storageSpringBytes) const {
|
||||
auto const storageQueue = getStorageQueueBytes();
|
||||
if (storageQueue < storageTargetBytes - storageSpringBytes) {
|
||||
return {};
|
||||
// TODO: Remove duplicate calculation from Ratekeeper::updateRate
|
||||
double inverseResult = std::min(
|
||||
2.0, (storageQueue - storageTargetBytes + storageSpringBytes) / static_cast<double>(storageSpringBytes));
|
||||
if (inverseResult > 0) {
|
||||
return 1.0 / inverseResult;
|
||||
} else {
|
||||
return std::max(
|
||||
0.0, static_cast<double>((storageTargetBytes + storageSpringBytes) - storageQueue) / storageSpringBytes);
|
||||
return {};
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -2044,8 +2044,16 @@ int main(int argc, char* argv[]) {
|
|||
}
|
||||
}
|
||||
|
||||
openTraceFile(
|
||||
opts.publicAddresses.address, opts.rollsize, opts.maxLogsSize, opts.logFolder, "trace", opts.logGroup);
|
||||
openTraceFile(opts.publicAddresses.address,
|
||||
opts.rollsize,
|
||||
opts.maxLogsSize,
|
||||
opts.logFolder,
|
||||
"trace",
|
||||
opts.logGroup,
|
||||
/* identifier = */ "",
|
||||
/* tracePartialFileSuffix = */ "",
|
||||
InitializeTraceMetrics::True);
|
||||
|
||||
g_network->initTLS();
|
||||
if (!opts.authzPublicKeyFile.empty()) {
|
||||
try {
|
||||
|
@ -2089,7 +2097,6 @@ int main(int argc, char* argv[]) {
|
|||
opts.fileSystemPath);
|
||||
g_network->initMetrics();
|
||||
FlowTransport::transport().initMetrics();
|
||||
initTraceEventMetrics();
|
||||
}
|
||||
|
||||
double start = timer(), startNow = now();
|
||||
|
|
|
@ -4903,7 +4903,8 @@ Key constructMappedKey(KeyValueRef* keyValue, std::vector<Optional<Tuple>>& vec,
|
|||
ACTOR Future<Void> validateRangeAgainstServer(StorageServer* data,
|
||||
AuditStorageState auditState,
|
||||
Version version,
|
||||
StorageServerInterface remoteServer) {
|
||||
StorageServerInterface remoteServer,
|
||||
UID ddId) {
|
||||
TraceEvent(SevInfo, "ValidateRangeAgainstServerBegin", data->thisServerID)
|
||||
.detail("AuditID", auditState.id)
|
||||
.detail("Range", auditState.range)
|
||||
|
@ -5033,6 +5034,8 @@ ACTOR Future<Void> validateRangeAgainstServer(StorageServer* data,
|
|||
range = KeyRangeRef(keyAfter(lastKey), range.end);
|
||||
auditState.range = KeyRangeRef(originBegin, range.begin);
|
||||
auditState.setPhase(AuditPhase::Complete);
|
||||
ASSERT(ddId.isValid());
|
||||
auditState.ddId = ddId; // used to compare req.ddId with existing persisted ddId
|
||||
wait(persistAuditStateByRange(data->cx, auditState));
|
||||
}
|
||||
} catch (Error& e) {
|
||||
|
@ -5052,6 +5055,8 @@ ACTOR Future<Void> validateRangeAgainstServer(StorageServer* data,
|
|||
.detail("ErrorMessage", error)
|
||||
.detail("RemoteServer", remoteServer.toString());
|
||||
auditState.setPhase(AuditPhase::Error);
|
||||
ASSERT(ddId.isValid());
|
||||
auditState.ddId = ddId; // used to compare req.ddId with existing persisted ddId
|
||||
wait(persistAuditStateByRange(data->cx, auditState));
|
||||
throw audit_storage_error();
|
||||
}
|
||||
|
@ -5065,7 +5070,10 @@ ACTOR Future<Void> validateRangeAgainstServer(StorageServer* data,
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> validateRangeShard(StorageServer* data, AuditStorageState auditState, std::vector<UID> candidates) {
|
||||
ACTOR Future<Void> validateRangeShard(StorageServer* data,
|
||||
AuditStorageState auditState,
|
||||
std::vector<UID> candidates,
|
||||
UID ddId) {
|
||||
TraceEvent(SevDebug, "ServeValidateRangeShardBegin", data->thisServerID)
|
||||
.detail("Range", auditState.range)
|
||||
.detail("Servers", describe(candidates));
|
||||
|
@ -5126,7 +5134,7 @@ ACTOR Future<Void> validateRangeShard(StorageServer* data, AuditStorageState aud
|
|||
}
|
||||
|
||||
if (remoteServer != nullptr) {
|
||||
wait(validateRangeAgainstServer(data, auditState, version, *remoteServer));
|
||||
wait(validateRangeAgainstServer(data, auditState, version, *remoteServer, ddId));
|
||||
} else {
|
||||
TraceEvent(SevWarn, "ServeValidateRangeShardRemoteNotFound", data->thisServerID)
|
||||
.detail("Range", auditState.range)
|
||||
|
@ -5139,7 +5147,8 @@ ACTOR Future<Void> validateRangeShard(StorageServer* data, AuditStorageState aud
|
|||
|
||||
ACTOR Future<Void> validateRangeAgainstServers(StorageServer* data,
|
||||
AuditStorageState auditState,
|
||||
std::vector<UID> targetServers) {
|
||||
std::vector<UID> targetServers,
|
||||
UID ddId) {
|
||||
TraceEvent(SevDebug, "ValidateRangeAgainstServersBegin", data->thisServerID)
|
||||
.detail("AuditID", auditState.id)
|
||||
.detail("Range", auditState.range)
|
||||
|
@ -5176,7 +5185,7 @@ ACTOR Future<Void> validateRangeAgainstServers(StorageServer* data,
|
|||
.detail("Range", auditState.range);
|
||||
throw audit_storage_failed();
|
||||
}
|
||||
fs.push_back(validateRangeAgainstServer(data, auditState, version, decodeServerListValue(v.get())));
|
||||
fs.push_back(validateRangeAgainstServer(data, auditState, version, decodeServerListValue(v.get()), ddId));
|
||||
}
|
||||
|
||||
wait(waitForAll(fs));
|
||||
|
@ -5506,9 +5515,6 @@ ACTOR Future<Void> auditStorageStorageServerShardQ(StorageServer* data, AuditSto
|
|||
state Key rangeToReadBegin = req.range.begin;
|
||||
state KeyRangeRef rangeToRead;
|
||||
state int retryCount = 0;
|
||||
state int storageAutoProceedCount = 0;
|
||||
// storageAutoProceedCount is guard to make sure that audit at SS does not run too long
|
||||
// by itself without being notified by DD
|
||||
state int64_t cumulatedValidatedLocalShardsNum = 0;
|
||||
state int64_t cumulatedValidatedServerKeysNum = 0;
|
||||
|
||||
|
@ -5655,6 +5661,8 @@ ACTOR Future<Void> auditStorageStorageServerShardQ(StorageServer* data, AuditSto
|
|||
.detail("AuditServer", data->thisServerID);
|
||||
res.range = claimRange;
|
||||
res.setPhase(AuditPhase::Error);
|
||||
ASSERT(req.ddId.isValid());
|
||||
res.ddId = req.ddId; // used to compare req.ddId with existing persisted ddId
|
||||
wait(persistAuditStateByServer(data->cx, res));
|
||||
req.reply.sendError(audit_storage_error());
|
||||
break;
|
||||
|
@ -5662,6 +5670,8 @@ ACTOR Future<Void> auditStorageStorageServerShardQ(StorageServer* data, AuditSto
|
|||
// Expand persisted complete range
|
||||
res.range = Standalone(KeyRangeRef(req.range.begin, claimRange.end));
|
||||
res.setPhase(AuditPhase::Complete);
|
||||
ASSERT(req.ddId.isValid());
|
||||
res.ddId = req.ddId; // used to compare req.ddId with existing persisted ddId
|
||||
wait(persistAuditStateByServer(data->cx, res));
|
||||
TraceEvent(SevInfo, "AuditStorageSsShardDone", data->thisServerID)
|
||||
.detail("AuditId", req.id)
|
||||
|
@ -5669,11 +5679,7 @@ ACTOR Future<Void> auditStorageStorageServerShardQ(StorageServer* data, AuditSto
|
|||
.detail("AuditServer", data->thisServerID)
|
||||
.detail("CompleteRange", res.range);
|
||||
if (claimRange.end < rangeToRead.end) {
|
||||
if (storageAutoProceedCount > SERVER_KNOBS->SS_AUDIT_AUTO_PROCEED_COUNT_MAX) {
|
||||
throw audit_storage_failed();
|
||||
}
|
||||
rangeToReadBegin = claimRange.end;
|
||||
storageAutoProceedCount++;
|
||||
} else { // complete
|
||||
req.reply.send(res);
|
||||
break;
|
||||
|
@ -5730,9 +5736,6 @@ ACTOR Future<Void> auditStorageLocationMetadataQ(StorageServer* data, AuditStora
|
|||
state Transaction tr(data->cx);
|
||||
state Key rangeToReadBegin = req.range.begin;
|
||||
state KeyRangeRef rangeToRead;
|
||||
state int storageAutoProceedCount = 0;
|
||||
// storageAutoProceedCount is guard to make sure that audit at SS does not run too long
|
||||
// by itself without being notified by DD
|
||||
state int64_t cumulatedValidatedServerKeysNum = 0;
|
||||
state int64_t cumulatedValidatedKeyServersNum = 0;
|
||||
|
||||
|
@ -5891,6 +5894,8 @@ ACTOR Future<Void> auditStorageLocationMetadataQ(StorageServer* data, AuditStora
|
|||
.detail("ClaimRange", claimRange);
|
||||
res.range = claimRange;
|
||||
res.setPhase(AuditPhase::Error);
|
||||
ASSERT(req.ddId.isValid());
|
||||
res.ddId = req.ddId; // used to compare req.ddId with existing persisted ddId
|
||||
wait(persistAuditStateByRange(data->cx, res));
|
||||
req.reply.sendError(audit_storage_error());
|
||||
break;
|
||||
|
@ -5898,6 +5903,8 @@ ACTOR Future<Void> auditStorageLocationMetadataQ(StorageServer* data, AuditStora
|
|||
// Expand persisted complete range
|
||||
res.range = Standalone(KeyRangeRef(req.range.begin, claimRange.end));
|
||||
res.setPhase(AuditPhase::Complete);
|
||||
ASSERT(req.ddId.isValid());
|
||||
res.ddId = req.ddId; // used to compare req.ddId with existing persisted ddId
|
||||
wait(persistAuditStateByRange(data->cx, res));
|
||||
TraceEvent(SevInfo, "AuditStorageShardLocMetadataDone", data->thisServerID)
|
||||
.detail("AuditId", req.id)
|
||||
|
@ -5906,11 +5913,7 @@ ACTOR Future<Void> auditStorageLocationMetadataQ(StorageServer* data, AuditStora
|
|||
.detail("AuditServerId", data->thisServerID)
|
||||
.detail("CompleteRange", res.range);
|
||||
if (claimRange.end < rangeToRead.end) {
|
||||
if (storageAutoProceedCount > SERVER_KNOBS->SS_AUDIT_AUTO_PROCEED_COUNT_MAX) {
|
||||
throw audit_storage_failed();
|
||||
}
|
||||
rangeToReadBegin = claimRange.end;
|
||||
storageAutoProceedCount++;
|
||||
} else { // complete
|
||||
req.reply.send(res);
|
||||
break;
|
||||
|
@ -5972,7 +5975,8 @@ ACTOR Future<Void> auditStorageQ(StorageServer* data, AuditStorageRequest req) {
|
|||
fs.push_back(validateRangeShard(
|
||||
data,
|
||||
AuditStorageState(res.id, KeyRangeRef(shards[i].key, shards[i + 1].key), res.getType()),
|
||||
src));
|
||||
src,
|
||||
req.ddId));
|
||||
begin = shards[i + 1].key;
|
||||
}
|
||||
} catch (Error& e) {
|
||||
|
@ -5980,7 +5984,7 @@ ACTOR Future<Void> auditStorageQ(StorageServer* data, AuditStorageRequest req) {
|
|||
}
|
||||
}
|
||||
} else {
|
||||
fs.push_back(validateRangeAgainstServers(data, res, req.targetServers));
|
||||
fs.push_back(validateRangeAgainstServers(data, res, req.targetServers, req.ddId));
|
||||
}
|
||||
|
||||
wait(waitForAll(fs));
|
||||
|
@ -9439,6 +9443,8 @@ ACTOR Future<Void> cleanUpMoveInShard(StorageServer* data, Version version, Move
|
|||
}
|
||||
wait(data->durableVersion.whenAtLeast(mLV.version + 1));
|
||||
|
||||
data->moveInShards.erase(moveInShard->id());
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
@ -13501,7 +13507,7 @@ ACTOR Future<Void> storageServerCore(StorageServer* self, StorageServerInterface
|
|||
self->busiestWriteTagContext.lastUpdateTime = req.postTime;
|
||||
TraceEvent("BusiestWriteTag", self->thisServerID)
|
||||
.detail("Elapsed", req.elapsed)
|
||||
.detail("Tag", printable(req.busiestTag))
|
||||
.detail("Tag", req.busiestTag)
|
||||
.detail("TagOps", req.opsSum)
|
||||
.detail("TagCost", req.costSum)
|
||||
.detail("TotalCost", req.totalWriteCosts)
|
||||
|
@ -13890,6 +13896,8 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
|
|||
|
||||
self.ssLock->halt();
|
||||
|
||||
self.moveInShards.clear();
|
||||
|
||||
state Error err = e;
|
||||
if (storageServerTerminated(self, persistentData, err)) {
|
||||
ssCore.cancel();
|
||||
|
|
|
@ -101,7 +101,6 @@ struct PhysicalShardMoveWorkLoad : TestWorkload {
|
|||
newDataMoveId(deterministicRandom()->randomUInt64(),
|
||||
AssignEmptyRange::False,
|
||||
EnablePhysicalShardMove::True),
|
||||
// EnablePhysicalShardMove::False),
|
||||
KeyRangeRef("TestKeyA"_sr, "TestKeyF"_sr),
|
||||
teamSize,
|
||||
includes,
|
||||
|
|
|
@ -170,6 +170,7 @@ public:
|
|||
bool logTraceEventMetrics;
|
||||
|
||||
void initMetrics() {
|
||||
ASSERT(!isOpen());
|
||||
SevErrorNames.init("TraceEvents.SevError"_sr);
|
||||
SevWarnAlwaysNames.init("TraceEvents.SevWarnAlways"_sr);
|
||||
SevWarnNames.init("TraceEvents.SevWarn"_sr);
|
||||
|
@ -430,9 +431,8 @@ public:
|
|||
}
|
||||
}
|
||||
|
||||
void log(int severity, const char* name, UID id, uint64_t event_ts) {
|
||||
if (!logTraceEventMetrics)
|
||||
return;
|
||||
void logMetrics(int severity, const char* name, UID id, uint64_t event_ts) {
|
||||
ASSERT(TraceEvent::isNetworkThread() && logTraceEventMetrics);
|
||||
|
||||
EventMetricHandle<TraceEventNameID>* m = nullptr;
|
||||
switch (severity) {
|
||||
|
@ -781,7 +781,8 @@ void openTraceFile(const Optional<NetworkAddress>& na,
|
|||
std::string baseOfBase,
|
||||
std::string logGroup,
|
||||
std::string identifier,
|
||||
std::string tracePartialFileSuffix) {
|
||||
std::string tracePartialFileSuffix,
|
||||
InitializeTraceMetrics initializeTraceMetrics) {
|
||||
if (g_traceLog.isOpen())
|
||||
return;
|
||||
|
||||
|
@ -808,6 +809,10 @@ void openTraceFile(const Optional<NetworkAddress>& na,
|
|||
baseName = format("%s.0.0.0.0.%d", baseOfBase.c_str(), ::getpid());
|
||||
}
|
||||
|
||||
if (initializeTraceMetrics) {
|
||||
g_traceLog.initMetrics();
|
||||
}
|
||||
|
||||
g_traceLog.open(directory,
|
||||
baseName,
|
||||
logGroup,
|
||||
|
@ -821,10 +826,6 @@ void openTraceFile(const Optional<NetworkAddress>& na,
|
|||
g_traceBatch.dump();
|
||||
}
|
||||
|
||||
void initTraceEventMetrics() {
|
||||
g_traceLog.initMetrics();
|
||||
}
|
||||
|
||||
void closeTraceFile() {
|
||||
g_traceLog.close();
|
||||
}
|
||||
|
@ -1340,17 +1341,17 @@ void BaseTraceEvent::log() {
|
|||
|
||||
if (g_traceLog.isOpen()) {
|
||||
// Log Metrics
|
||||
if (g_traceLog.logTraceEventMetrics && isNetworkThread()) {
|
||||
if (isNetworkThread() && g_traceLog.logTraceEventMetrics) {
|
||||
// Get the persistent Event Metric representing this trace event and push the fields (details)
|
||||
// accumulated in *this to it and then log() it. Note that if the event metric is disabled it
|
||||
// won't actually be logged BUT any new fields added to it will be registered. If the event IS
|
||||
// logged, a timestamp will be returned, if not then 0. Either way, pass it through to be used
|
||||
// if possible in the Sev* event metrics.
|
||||
// accumulated in *this to it and then logMetrics() it. Note that if the event metric is
|
||||
// disabled it won't actually be logged BUT any new fields added to it will be registered. If
|
||||
// the event IS logged, a timestamp will be returned, if not then 0. Either way, pass it
|
||||
// through to be used if possible in the Sev* event metrics.
|
||||
|
||||
uint64_t event_ts =
|
||||
DynamicEventMetric::getOrCreateInstance(format("TraceEvent.%s", type), StringRef(), true)
|
||||
->setFieldsAndLogFrom(tmpEventMetric.get());
|
||||
g_traceLog.log(severity, type, id, event_ts);
|
||||
g_traceLog.logMetrics(severity, type, id, event_ts);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,8 +20,6 @@
|
|||
|
||||
#pragma once
|
||||
|
||||
#include "flow/Trace.h"
|
||||
|
||||
class BooleanParam {
|
||||
bool value;
|
||||
|
||||
|
@ -31,11 +29,6 @@ public:
|
|||
constexpr void set(bool value) { this->value = value; }
|
||||
};
|
||||
|
||||
template <class BooleanParamSub>
|
||||
struct Traceable<BooleanParamSub, std::enable_if_t<std::is_base_of_v<BooleanParam, BooleanParamSub>>> : std::true_type {
|
||||
static std::string toString(BooleanParamSub const& value) { return Traceable<bool>::toString(value); }
|
||||
};
|
||||
|
||||
// Declares a boolean parametr with the desired name. This declaration can be nested inside of a namespace or another
|
||||
// class. This macro should not be used directly unless this boolean parameter is going to be defined as a nested class.
|
||||
#define FDB_DECLARE_BOOLEAN_PARAM(ParamName) \
|
||||
|
|
|
@ -281,7 +281,7 @@ struct CodeProbeImpl : ICodeProbe {
|
|||
private:
|
||||
CodeProbeImpl() { registerProbe(*this); }
|
||||
inline static CodeProbeImpl _instance;
|
||||
unsigned _hitCount = 0;
|
||||
std::atomic<unsigned> _hitCount = 0;
|
||||
Annotations annotations;
|
||||
};
|
||||
|
||||
|
|
|
@ -31,6 +31,7 @@
|
|||
#include <map>
|
||||
#include <set>
|
||||
#include <type_traits>
|
||||
#include "flow/BooleanParam.h"
|
||||
#include "flow/IRandom.h"
|
||||
#include "flow/Error.h"
|
||||
#include "flow/ITrace.h"
|
||||
|
@ -39,6 +40,8 @@
|
|||
#define TRACE_DEFAULT_ROLL_SIZE (10 << 20)
|
||||
#define TRACE_DEFAULT_MAX_LOGS_SIZE (10 * TRACE_DEFAULT_ROLL_SIZE)
|
||||
|
||||
FDB_BOOLEAN_PARAM(InitializeTraceMetrics);
|
||||
|
||||
inline int fastrand() {
|
||||
static int g_seed = 0;
|
||||
g_seed = 214013 * g_seed + 2531011;
|
||||
|
@ -539,8 +542,8 @@ void openTraceFile(const Optional<NetworkAddress>& na,
|
|||
std::string baseOfBase = "trace",
|
||||
std::string logGroup = "default",
|
||||
std::string identifier = "",
|
||||
std::string tracePartialFileSuffix = "");
|
||||
void initTraceEventMetrics();
|
||||
std::string tracePartialFileSuffix = "",
|
||||
InitializeTraceMetrics initializeTraceMetrics = InitializeTraceMetrics::False);
|
||||
void closeTraceFile();
|
||||
bool traceFileIsOpen();
|
||||
void flushTraceFileVoid();
|
||||
|
|
|
@ -29,6 +29,8 @@
|
|||
#include <type_traits>
|
||||
#include <fmt/format.h>
|
||||
|
||||
#include "flow/BooleanParam.h"
|
||||
|
||||
#define PRINTABLE_COMPRESS_NULLS 0
|
||||
|
||||
template <class IntType>
|
||||
|
@ -245,6 +247,11 @@ struct Traceable<std::atomic<T>> : std::true_type {
|
|||
static std::string toString(const std::atomic<T>& value) { return Traceable<T>::toString(value.load()); }
|
||||
};
|
||||
|
||||
template <class BooleanParamSub>
|
||||
struct Traceable<BooleanParamSub, std::enable_if_t<std::is_base_of_v<BooleanParam, BooleanParamSub>>> : std::true_type {
|
||||
static std::string toString(BooleanParamSub const& value) { return Traceable<bool>::toString(value); }
|
||||
};
|
||||
|
||||
// Adapter to redirect fmt::formatter calls to Traceable for a supported type
|
||||
template <typename T>
|
||||
struct FormatUsingTraceable : fmt::formatter<std::string> {
|
||||
|
|
Loading…
Reference in New Issue