Move role UIDs for MutationTracking TraceEvents from various inconsistent detail fields into the TraceEvent UID field.
This commit is contained in:
parent
faa4154a56
commit
c73e861074
|
@ -939,8 +939,7 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
|
|||
pProxyCommitData->singleKeyMutationEvent->log();
|
||||
}
|
||||
|
||||
DEBUG_MUTATION("ProxyCommit", self->commitVersion, m)
|
||||
.detail("Dbgid", pProxyCommitData->dbgid)
|
||||
DEBUG_MUTATION("ProxyCommit", self->commitVersion, m, pProxyCommitData->dbgid)
|
||||
.detail("To", tags);
|
||||
self->toCommit.addTags(tags);
|
||||
if (pProxyCommitData->cacheInfo[m.param1]) {
|
||||
|
@ -954,8 +953,7 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
|
|||
++firstRange;
|
||||
if (firstRange == ranges.end()) {
|
||||
// Fast path
|
||||
DEBUG_MUTATION("ProxyCommit", self->commitVersion, m)
|
||||
.detail("Dbgid", pProxyCommitData->dbgid)
|
||||
DEBUG_MUTATION("ProxyCommit", self->commitVersion, m, pProxyCommitData->dbgid)
|
||||
.detail("To", ranges.begin().value().tags);
|
||||
|
||||
ranges.begin().value().populateTags();
|
||||
|
@ -991,8 +989,7 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
|
|||
trCost->get().clearIdxCosts.pop_front();
|
||||
}
|
||||
}
|
||||
DEBUG_MUTATION("ProxyCommit", self->commitVersion, m)
|
||||
.detail("Dbgid", pProxyCommitData->dbgid)
|
||||
DEBUG_MUTATION("ProxyCommit", self->commitVersion, m, pProxyCommitData->dbgid)
|
||||
.detail("To", allSources);
|
||||
|
||||
self->toCommit.addTags(allSources);
|
||||
|
@ -1520,7 +1517,7 @@ ACTOR static Future<Void> rejoinServer(CommitProxyInterface proxy, ProxyCommitDa
|
|||
// We can't respond to these requests until we have valid txnStateStore
|
||||
wait(commitData->validState.getFuture());
|
||||
|
||||
TraceEvent("ProxyReadyForReads", proxy.id());
|
||||
TraceEvent("ProxyReadyForReads", proxy.id()).log();
|
||||
|
||||
loop {
|
||||
GetStorageServerRejoinInfoRequest req = waitNext(proxy.getStorageServerRejoinInfo.getFuture());
|
||||
|
|
|
@ -111,8 +111,7 @@ void ILogSystem::ServerPeekCursor::nextMessage() {
|
|||
}
|
||||
|
||||
messageAndTags.loadFromArena(&rd, &messageVersion.sub);
|
||||
DEBUG_TAGS_AND_MESSAGE("ServerPeekCursor", messageVersion.version, messageAndTags.getRawMessage())
|
||||
.detail("CursorID", this->randomID);
|
||||
DEBUG_TAGS_AND_MESSAGE("ServerPeekCursor", messageVersion.version, messageAndTags.getRawMessage(), this->randomID);
|
||||
// Rewind and consume the header so that reader() starts from the message.
|
||||
rd.rewind();
|
||||
rd.readBytes(messageAndTags.getHeaderSize());
|
||||
|
|
|
@ -31,32 +31,32 @@
|
|||
// Track any of these keys in simulation via enabling MUTATION_TRACKING_ENABLED and setting the keys here.
|
||||
std::vector<KeyRef> debugKeys = { ""_sr, "\xff\xff"_sr };
|
||||
|
||||
TraceEvent debugMutationEnabled(const char* context, Version version, MutationRef const& mutation) {
|
||||
TraceEvent debugMutationEnabled(const char* context, Version version, MutationRef const& mutation, UID id) {
|
||||
if (std::any_of(debugKeys.begin(), debugKeys.end(), [&mutation](const KeyRef& debugKey) {
|
||||
return ((mutation.type == mutation.ClearRange || mutation.type == mutation.DebugKeyRange) &&
|
||||
mutation.param1 <= debugKey && mutation.param2 > debugKey) ||
|
||||
mutation.param1 == debugKey;
|
||||
})) {
|
||||
|
||||
TraceEvent event("MutationTracking");
|
||||
TraceEvent event("MutationTracking", id);
|
||||
event.detail("At", context).detail("Version", version).detail("Mutation", mutation);
|
||||
return event;
|
||||
}
|
||||
return TraceEvent();
|
||||
}
|
||||
|
||||
TraceEvent debugKeyRangeEnabled(const char* context, Version version, KeyRangeRef const& keys) {
|
||||
TraceEvent debugKeyRangeEnabled(const char* context, Version version, KeyRangeRef const& keys, UID id) {
|
||||
if (std::any_of(
|
||||
debugKeys.begin(), debugKeys.end(), [&keys](const KeyRef& debugKey) { return keys.contains(debugKey); })) {
|
||||
|
||||
TraceEvent event("MutationTracking");
|
||||
TraceEvent event("MutationTracking", id);
|
||||
event.detail("At", context).detail("Version", version).detail("Mutation", MutationRef(MutationRef::DebugKeyRange, keys.begin, keys.end));
|
||||
return event;
|
||||
}
|
||||
return TraceEvent();
|
||||
}
|
||||
|
||||
TraceEvent debugTagsAndMessageEnabled(const char* context, Version version, StringRef commitBlob) {
|
||||
TraceEvent debugTagsAndMessageEnabled(const char* context, Version version, StringRef commitBlob, UID id) {
|
||||
BinaryReader rdr(commitBlob, AssumeVersion(g_network->protocolVersion()));
|
||||
while (!rdr.empty()) {
|
||||
if (*(int32_t*)rdr.peekBytes(4) == VERSION_HEADER) {
|
||||
|
@ -86,7 +86,7 @@ TraceEvent debugTagsAndMessageEnabled(const char* context, Version version, Stri
|
|||
MutationRef m;
|
||||
BinaryReader br(mutationData, AssumeVersion(rdr.protocolVersion()));
|
||||
br >> m;
|
||||
TraceEvent event = debugMutation(context, version, m);
|
||||
TraceEvent event = debugMutation(context, version, m, id);
|
||||
if (event.isEnabled()) {
|
||||
event.detail("MessageTags", msg.tags);
|
||||
return event;
|
||||
|
@ -97,23 +97,23 @@ TraceEvent debugTagsAndMessageEnabled(const char* context, Version version, Stri
|
|||
}
|
||||
|
||||
#if MUTATION_TRACKING_ENABLED
|
||||
TraceEvent debugMutation(const char* context, Version version, MutationRef const& mutation) {
|
||||
return debugMutationEnabled(context, version, mutation);
|
||||
TraceEvent debugMutation(const char* context, Version version, MutationRef const& mutation, UID id) {
|
||||
return debugMutationEnabled(context, version, mutation, id);
|
||||
}
|
||||
TraceEvent debugKeyRange(const char* context, Version version, KeyRangeRef const& keys) {
|
||||
return debugKeyRangeEnabled(context, version, keys);
|
||||
TraceEvent debugKeyRange(const char* context, Version version, KeyRangeRef const& keys, UID id) {
|
||||
return debugKeyRangeEnabled(context, version, keys, id);
|
||||
}
|
||||
TraceEvent debugTagsAndMessage(const char* context, Version version, StringRef commitBlob) {
|
||||
return debugTagsAndMessageEnabled(context, version, commitBlob);
|
||||
TraceEvent debugTagsAndMessage(const char* context, Version version, StringRef commitBlob, UID id) {
|
||||
return debugTagsAndMessageEnabled(context, version, commitBlob, id);
|
||||
}
|
||||
#else
|
||||
TraceEvent debugMutation(const char* context, Version version, MutationRef const& mutation) {
|
||||
TraceEvent debugMutation(const char* context, Version version, MutationRef const& mutation, UID id) {
|
||||
return TraceEvent();
|
||||
}
|
||||
TraceEvent debugKeyRange(const char* context, Version version, KeyRangeRef const& keys) {
|
||||
TraceEvent debugKeyRange(const char* context, Version version, KeyRangeRef const& keys, UID id) {
|
||||
return TraceEvent();
|
||||
}
|
||||
TraceEvent debugTagsAndMessage(const char* context, Version version, StringRef commitBlob) {
|
||||
TraceEvent debugTagsAndMessage(const char* context, Version version, StringRef commitBlob, UID id) {
|
||||
return TraceEvent();
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -28,19 +28,18 @@
|
|||
#define MUTATION_TRACKING_ENABLED 0
|
||||
// The keys to track are defined in the .cpp file to limit recompilation.
|
||||
|
||||
#define DEBUG_MUTATION(context, version, mutation) MUTATION_TRACKING_ENABLED&& debugMutation(context, version, mutation)
|
||||
TraceEvent debugMutation(const char* context, Version version, MutationRef const& mutation);
|
||||
#define DEBUG_MUTATION(...) MUTATION_TRACKING_ENABLED && debugMutation(__VA_ARGS__)
|
||||
TraceEvent debugMutation(const char* context, Version version, MutationRef const& mutation, UID id = UID());
|
||||
|
||||
// debugKeyRange and debugTagsAndMessage only log the *first* occurrence of a key in their range/commit.
|
||||
// TODO: Create a TraceEventGroup that forwards all calls to each element of a vector<TraceEvent>,
|
||||
// to allow "multiple" TraceEvents to be returned.
|
||||
|
||||
#define DEBUG_KEY_RANGE(context, version, keys) MUTATION_TRACKING_ENABLED&& debugKeyRange(context, version, keys)
|
||||
TraceEvent debugKeyRange(const char* context, Version version, KeyRangeRef const& keys);
|
||||
#define DEBUG_KEY_RANGE(...) MUTATION_TRACKING_ENABLED && debugKeyRange(__VA_ARGS__)
|
||||
TraceEvent debugKeyRange(const char* context, Version version, KeyRangeRef const& keys, UID id = UID());
|
||||
|
||||
#define DEBUG_TAGS_AND_MESSAGE(context, version, commitBlob) \
|
||||
MUTATION_TRACKING_ENABLED&& debugTagsAndMessage(context, version, commitBlob)
|
||||
TraceEvent debugTagsAndMessage(const char* context, Version version, StringRef commitBlob);
|
||||
#define DEBUG_TAGS_AND_MESSAGE(...) MUTATION_TRACKING_ENABLED && debugTagsAndMessage(__VA_ARGS__)
|
||||
TraceEvent debugTagsAndMessage(const char* context, Version version, StringRef commitBlob, UID id = UID());
|
||||
|
||||
// TODO: Version Tracking. If the bug is in handling a version rather than a key, then it'd be good to be able to log
|
||||
// each time that version is handled within simulation. A similar set of functions should be implemented.
|
||||
|
|
|
@ -1419,9 +1419,8 @@ void commitMessages(TLogData* self,
|
|||
block.reserve(block.arena(), std::max<int64_t>(SERVER_KNOBS->TLOG_MESSAGE_BLOCK_BYTES, msgSize));
|
||||
}
|
||||
|
||||
DEBUG_TAGS_AND_MESSAGE("TLogCommitMessages", version, msg.getRawMessage())
|
||||
.detail("UID", self->dbgid)
|
||||
.detail("LogId", logData->logId);
|
||||
DEBUG_TAGS_AND_MESSAGE("TLogCommitMessages", version, msg.getRawMessage(), logData->logId)
|
||||
.detail("DebugID", self->dbgid);
|
||||
block.append(block.arena(), msg.message.begin(), msg.message.size());
|
||||
for (auto tag : msg.tags) {
|
||||
if (logData->locality == tagLocalitySatellite) {
|
||||
|
@ -1552,8 +1551,7 @@ void peekMessagesFromMemory(Reference<LogData> self,
|
|||
messages << it->second.toStringRef();
|
||||
void* data = messages.getData();
|
||||
DEBUG_TAGS_AND_MESSAGE(
|
||||
"TLogPeek", currentVersion, StringRef((uint8_t*)data + offset, messages.getLength() - offset))
|
||||
.detail("LogId", self->logId)
|
||||
"TLogPeek", currentVersion, StringRef((uint8_t*)data + offset, messages.getLength() - offset), self->logId)
|
||||
.detail("PeekTag", req.tag);
|
||||
}
|
||||
}
|
||||
|
@ -1823,9 +1821,8 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
|
|||
wait(parseMessagesForTag(entry.messages, req.tag, logData->logRouterTags));
|
||||
for (const StringRef& msg : rawMessages) {
|
||||
messages.serializeBytes(msg);
|
||||
DEBUG_TAGS_AND_MESSAGE("TLogPeekFromDisk", entry.version, msg)
|
||||
.detail("UID", self->dbgid)
|
||||
.detail("LogId", logData->logId)
|
||||
DEBUG_TAGS_AND_MESSAGE("TLogPeekFromDisk", entry.version, msg, logData->logId)
|
||||
.detail("DebugID", self->dbgid)
|
||||
.detail("PeekTag", req.tag);
|
||||
}
|
||||
|
||||
|
|
|
@ -1267,14 +1267,16 @@ ACTOR Future<Void> getValueQ(StorageServer* data, GetValueRequest req) {
|
|||
|
||||
DEBUG_MUTATION("ShardGetValue",
|
||||
version,
|
||||
MutationRef(MutationRef::DebugKey, req.key, v.present() ? v.get() : LiteralStringRef("<null>")));
|
||||
MutationRef(MutationRef::DebugKey, req.key, v.present() ? v.get() : LiteralStringRef("<null>")),
|
||||
data->thisServerID);
|
||||
DEBUG_MUTATION("ShardGetPath",
|
||||
version,
|
||||
MutationRef(MutationRef::DebugKey,
|
||||
req.key,
|
||||
path == 0 ? LiteralStringRef("0")
|
||||
: path == 1 ? LiteralStringRef("1")
|
||||
: LiteralStringRef("2")));
|
||||
: LiteralStringRef("2")),
|
||||
data->thisServerID);
|
||||
|
||||
/*
|
||||
StorageMetrics m;
|
||||
|
@ -1382,7 +1384,8 @@ ACTOR Future<Version> watchWaitForValueChange(StorageServer* data, SpanID parent
|
|||
latest,
|
||||
MutationRef(MutationRef::DebugKey,
|
||||
metadata->key,
|
||||
reply.value.present() ? StringRef(reply.value.get()) : LiteralStringRef("<null>")));
|
||||
reply.value.present() ? StringRef(reply.value.get()) : LiteralStringRef("<null>")),
|
||||
data->thisServerID);
|
||||
|
||||
if (metadata->debugID.present())
|
||||
g_traceBatch.addEvent(
|
||||
|
@ -2835,7 +2838,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
|
|||
wait(data->coreStarted.getFuture() && delay(0));
|
||||
|
||||
try {
|
||||
DEBUG_KEY_RANGE("fetchKeysBegin", data->version.get(), shard->keys);
|
||||
DEBUG_KEY_RANGE("fetchKeysBegin", data->version.get(), shard->keys, data->thisServerID);
|
||||
|
||||
TraceEvent(SevDebug, interval.begin(), data->thisServerID)
|
||||
.detail("KeyBegin", shard->keys.begin)
|
||||
|
@ -2926,10 +2929,12 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
|
|||
.detail("Last", this_block.size() ? this_block.end()[-1].key : std::string())
|
||||
.detail("Version", fetchVersion)
|
||||
.detail("More", this_block.more);
|
||||
DEBUG_KEY_RANGE("fetchRange", fetchVersion, keys);
|
||||
|
||||
DEBUG_KEY_RANGE("fetchRange", fetchVersion, keys, data->thisServerID);
|
||||
if(MUTATION_TRACKING_ENABLED) {
|
||||
for (auto k = this_block.begin(); k != this_block.end(); ++k)
|
||||
DEBUG_MUTATION("fetch", fetchVersion, MutationRef(MutationRef::SetValue, k->key, k->value));
|
||||
for (auto k = this_block.begin(); k != this_block.end(); ++k) {
|
||||
DEBUG_MUTATION("fetch", fetchVersion, MutationRef(MutationRef::SetValue, k->key, k->value), data->thisServerID);
|
||||
}
|
||||
}
|
||||
|
||||
metricReporter.addFetchedBytes(expectedBlockSize, this_block.size());
|
||||
|
@ -3094,8 +3099,9 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
|
|||
ASSERT(b->version >= checkv);
|
||||
checkv = b->version;
|
||||
if(MUTATION_TRACKING_ENABLED) {
|
||||
for (auto& m : b->mutations)
|
||||
DEBUG_MUTATION("fetchKeysFinalCommitInject", batch->changes[0].version, m);
|
||||
for (auto& m : b->mutations) {
|
||||
DEBUG_MUTATION("fetchKeysFinalCommitInject", batch->changes[0].version, m, data->thisServerID);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3222,7 +3228,7 @@ void changeServerKeys(StorageServer* data,
|
|||
validate(data);
|
||||
|
||||
// TODO(alexmiller): Figure out how to selectively enable spammy data distribution events.
|
||||
DEBUG_KEY_RANGE(nowAssigned ? "KeysAssigned" : "KeysUnassigned", version, keys);
|
||||
DEBUG_KEY_RANGE(nowAssigned ? "KeysAssigned" : "KeysUnassigned", version, keys, data->thisServerID);
|
||||
|
||||
bool isDifferent = false;
|
||||
auto existingShards = data->shards.intersectingRanges(keys);
|
||||
|
@ -3330,7 +3336,7 @@ void changeServerKeys(StorageServer* data,
|
|||
|
||||
void rollback(StorageServer* data, Version rollbackVersion, Version nextVersion) {
|
||||
TEST(true); // call to shard rollback
|
||||
DEBUG_KEY_RANGE("Rollback", rollbackVersion, allKeys);
|
||||
DEBUG_KEY_RANGE("Rollback", rollbackVersion, allKeys, data->thisServerID);
|
||||
|
||||
// We used to do a complicated dance to roll back in MVCC history. It's much simpler, and more testable,
|
||||
// to simply restart the storage server actor and restore from the persistent disk state, and then roll
|
||||
|
@ -3355,8 +3361,7 @@ void StorageServer::addMutation(Version version,
|
|||
return;
|
||||
}
|
||||
expanded = addMutationToMutationLog(mLog, expanded);
|
||||
DEBUG_MUTATION("applyMutation", version, expanded)
|
||||
.detail("UID", thisServerID)
|
||||
DEBUG_MUTATION("applyMutation", version, expanded, thisServerID)
|
||||
.detail("ShardBegin", shard.begin)
|
||||
.detail("ShardEnd", shard.end);
|
||||
applyMutation(this, expanded, mLog.arena(), mutableData());
|
||||
|
@ -3427,7 +3432,7 @@ public:
|
|||
} else {
|
||||
// FIXME: enable when DEBUG_MUTATION is active
|
||||
// for(auto m = changes[c].mutations.begin(); m; ++m) {
|
||||
// DEBUG_MUTATION("SSUpdateMutation", changes[c].version, *m);
|
||||
// DEBUG_MUTATION("SSUpdateMutation", changes[c].version, *m, data->thisServerID);
|
||||
//}
|
||||
|
||||
splitMutation(data, data->shards, m, ver);
|
||||
|
@ -3834,11 +3839,13 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
|
|||
.suppressFor(10.0)
|
||||
.detail("Version", cloneCursor2->version().toString());
|
||||
} else if (ver != invalidVersion) { // This change belongs to a version < minVersion
|
||||
DEBUG_MUTATION("SSPeek", ver, msg).detail("ServerID", data->thisServerID);
|
||||
DEBUG_MUTATION("SSPeek", ver, msg, data->thisServerID);
|
||||
if (ver == 1) {
|
||||
//TraceEvent("SSPeekMutation", data->thisServerID).log();
|
||||
// The following trace event may produce a value with special characters
|
||||
TraceEvent("SSPeekMutation", data->thisServerID).detail("Mutation", msg).detail("Version", cloneCursor2->version().toString());
|
||||
TraceEvent("SSPeekMutation", data->thisServerID)
|
||||
.detail("Mutation", msg)
|
||||
.detail("Version", cloneCursor2->version().toString());
|
||||
}
|
||||
|
||||
updater.applyMutation(data, msg, ver);
|
||||
|
@ -4158,7 +4165,7 @@ void StorageServerDisk::writeMutations(const VectorRef<MutationRef>& mutations,
|
|||
Version debugVersion,
|
||||
const char* debugContext) {
|
||||
for (const auto& m : mutations) {
|
||||
DEBUG_MUTATION(debugContext, debugVersion, m).detail("UID", data->thisServerID);
|
||||
DEBUG_MUTATION(debugContext, debugVersion, m, data->thisServerID);
|
||||
if (m.type == MutationRef::SetValue) {
|
||||
storage->set(KeyValueRef(m.param1, m.param2));
|
||||
} else if (m.type == MutationRef::ClearRange) {
|
||||
|
|
Loading…
Reference in New Issue