Add read version and commit version info to getVersion and commit client transaction events.
This commit is contained in:
parent
939a62449f
commit
d6bc5263ad
|
@ -45,11 +45,12 @@ PROTOCOL_VERSION_5_2 = 0x0FDB00A552000001
|
|||
PROTOCOL_VERSION_6_0 = 0x0FDB00A570010001
|
||||
PROTOCOL_VERSION_6_1 = 0x0FDB00B061060001
|
||||
PROTOCOL_VERSION_6_2 = 0x0FDB00B062010001
|
||||
PROTOCOL_VERSION_6_3 = 0x0FDB00B063010001
|
||||
supported_protocol_versions = frozenset([PROTOCOL_VERSION_5_2, PROTOCOL_VERSION_6_0, PROTOCOL_VERSION_6_1,
|
||||
PROTOCOL_VERSION_6_2])
|
||||
PROTOCOL_VERSION_6_2, PROTOCOL_VERSION_6_3])
|
||||
|
||||
|
||||
fdb.api_version(600)
|
||||
fdb.api_version(520)
|
||||
|
||||
BASIC_FORMAT = "%(asctime)s - %(levelname)-8s %(message)s"
|
||||
LOG_PATH = "transaction_profiling_analyzer.log"
|
||||
|
@ -180,7 +181,8 @@ class GetVersionInfo(BaseInfo):
|
|||
self.latency = bb.get_double()
|
||||
if protocol_version >= PROTOCOL_VERSION_6_2:
|
||||
self.transaction_priority_type = bb.get_int()
|
||||
|
||||
if protocol_version >= PROTOCOL_VERSION_6_3:
|
||||
self.read_version = bb.get_long()
|
||||
|
||||
class GetInfo(BaseInfo):
|
||||
def __init__(self, bb):
|
||||
|
@ -204,7 +206,9 @@ class CommitInfo(BaseInfo):
|
|||
self.latency = bb.get_double()
|
||||
self.num_mutations = bb.get_int()
|
||||
self.commit_bytes = bb.get_int()
|
||||
|
||||
|
||||
if protocol_version >= PROTOCOL_VERSION_6_3:
|
||||
self.commit_version = bb.get_long()
|
||||
read_conflict_range = bb.get_key_range_list()
|
||||
if full_output:
|
||||
self.read_conflict_range = read_conflict_range
|
||||
|
|
|
@ -108,6 +108,41 @@ namespace FdbClientLogEvents {
|
|||
}
|
||||
};
|
||||
|
||||
// Version V3 of EventGetVersion starting at 6.3
|
||||
struct EventGetVersion_V3 : public Event {
|
||||
EventGetVersion_V3(double ts, double lat, uint32_t type, Version version) : Event(GET_VERSION_LATENCY, ts), latency(lat), readVersion(version) {
|
||||
if(type == GetReadVersionRequest::PRIORITY_DEFAULT) {
|
||||
priorityType = PRIORITY_DEFAULT;
|
||||
} else if (type == GetReadVersionRequest::PRIORITY_BATCH) {
|
||||
priorityType = PRIORITY_BATCH;
|
||||
} else if (type == GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE){
|
||||
priorityType = PRIORITY_IMMEDIATE;
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
EventGetVersion_V3() { }
|
||||
|
||||
template <typename Ar> Ar& serialize(Ar &ar) {
|
||||
if (!ar.isDeserializing)
|
||||
return serializer(Event::serialize(ar), latency, priorityType, readVersion);
|
||||
else
|
||||
return serializer(ar, latency, priorityType, readVersion);
|
||||
}
|
||||
|
||||
double latency;
|
||||
TrasactionPriorityType priorityType {PRIORITY_END};
|
||||
Version readVersion;
|
||||
|
||||
void logEvent(std::string id, int maxFieldLength) const {
|
||||
TraceEvent("TransactionTrace_GetVersion")
|
||||
.detail("TransactionID", id)
|
||||
.detail("Latency", latency)
|
||||
.detail("PriorityType", priorityType)
|
||||
.detail("ReadVersion", readVersion);
|
||||
}
|
||||
};
|
||||
|
||||
struct EventGet : public Event {
|
||||
EventGet(double ts, double lat, int size, const KeyRef &in_key) : Event(GET_LATENCY, ts), latency(lat), valueSize(size), key(in_key) { }
|
||||
EventGet() { }
|
||||
|
@ -213,6 +248,61 @@ namespace FdbClientLogEvents {
|
|||
}
|
||||
};
|
||||
|
||||
// Version V2 of EventGetVersion starting at 6.3
|
||||
struct EventCommit_V2 : public Event {
|
||||
EventCommit_V2(double ts, double lat, int mut, int bytes, Version version, const CommitTransactionRequest &commit_req)
|
||||
: Event(COMMIT_LATENCY, ts), latency(lat), numMutations(mut), commitBytes(bytes), commitVersion(version), req(commit_req) { }
|
||||
EventCommit_V2() { }
|
||||
|
||||
template <typename Ar> Ar& serialize(Ar &ar) {
|
||||
if (!ar.isDeserializing)
|
||||
return serializer(Event::serialize(ar), latency, numMutations, commitBytes, commitVersion, req.transaction, req.arena);
|
||||
else
|
||||
return serializer(ar, latency, numMutations, commitBytes, commitVersion, req.transaction, req.arena);
|
||||
}
|
||||
|
||||
double latency;
|
||||
int numMutations;
|
||||
int commitBytes;
|
||||
Version commitVersion;
|
||||
CommitTransactionRequest req; // Only CommitTransactionRef and Arena object within CommitTransactionRequest is serialized
|
||||
|
||||
void logEvent(std::string id, int maxFieldLength) const {
|
||||
for (auto &read_range : req.transaction.read_conflict_ranges) {
|
||||
TraceEvent("TransactionTrace_Commit_ReadConflictRange")
|
||||
.setMaxEventLength(-1)
|
||||
.detail("TransactionID", id)
|
||||
.setMaxFieldLength(maxFieldLength)
|
||||
.detail("Begin", read_range.begin)
|
||||
.detail("End", read_range.end);
|
||||
}
|
||||
|
||||
for (auto &write_range : req.transaction.write_conflict_ranges) {
|
||||
TraceEvent("TransactionTrace_Commit_WriteConflictRange")
|
||||
.setMaxEventLength(-1)
|
||||
.detail("TransactionID", id)
|
||||
.setMaxFieldLength(maxFieldLength)
|
||||
.detail("Begin", write_range.begin)
|
||||
.detail("End", write_range.end);
|
||||
}
|
||||
|
||||
for (auto &mutation : req.transaction.mutations) {
|
||||
TraceEvent("TransactionTrace_Commit_Mutation")
|
||||
.setMaxEventLength(-1)
|
||||
.detail("TransactionID", id)
|
||||
.setMaxFieldLength(maxFieldLength)
|
||||
.detail("Mutation", mutation.toString());
|
||||
}
|
||||
|
||||
TraceEvent("TransactionTrace_Commit")
|
||||
.detail("TransactionID", id)
|
||||
.detail("CommitVersion", commitVersion)
|
||||
.detail("Latency", latency)
|
||||
.detail("NumMutations", numMutations)
|
||||
.detail("CommitSizeBytes", commitBytes);
|
||||
}
|
||||
};
|
||||
|
||||
struct EventGetError : public Event {
|
||||
EventGetError(double ts, int err_code, const KeyRef &in_key) : Event(ERROR_GET, ts), errCode(err_code), key(in_key) { }
|
||||
EventGetError() { }
|
||||
|
|
|
@ -2786,7 +2786,7 @@ ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo>
|
|||
cx->commitLatencies.addSample(latency);
|
||||
cx->latencies.addSample(now() - tr->startTime);
|
||||
if (trLogInfo)
|
||||
trLogInfo->addLog(FdbClientLogEvents::EventCommit(startTime, latency, req.transaction.mutations.size(), req.transaction.mutations.expectedSize(), req));
|
||||
trLogInfo->addLog(FdbClientLogEvents::EventCommit_V2(startTime, latency, req.transaction.mutations.size(), req.transaction.mutations.expectedSize(), ci.version, req));
|
||||
return Void();
|
||||
} else {
|
||||
// clear the RYW transaction which contains previous conflicting keys
|
||||
|
@ -3224,7 +3224,7 @@ ACTOR Future<Version> extractReadVersion(DatabaseContext* cx, uint32_t flags, Re
|
|||
double latency = now() - startTime;
|
||||
cx->GRVLatencies.addSample(latency);
|
||||
if (trLogInfo)
|
||||
trLogInfo->addLog(FdbClientLogEvents::EventGetVersion_V2(startTime, latency, flags & GetReadVersionRequest::FLAG_PRIORITY_MASK));
|
||||
trLogInfo->addLog(FdbClientLogEvents::EventGetVersion_V3(startTime, latency, flags & GetReadVersionRequest::FLAG_PRIORITY_MASK, rep.version));
|
||||
if (rep.version == 1 && rep.locked) {
|
||||
throw proxy_memory_limit_exceeded();
|
||||
}
|
||||
|
|
|
@ -39,6 +39,13 @@ namespace ClientLogEventsParser {
|
|||
ASSERT(gv.priorityType >= 0 && gv.priorityType < FdbClientLogEvents::PRIORITY_END);
|
||||
}
|
||||
|
||||
void parseEventGetVersion_V3(BinaryReader &reader) {
|
||||
FdbClientLogEvents::EventGetVersion_V3 gv;
|
||||
reader >> gv;
|
||||
ASSERT(gv.latency < 10000);
|
||||
ASSERT(gv.priorityType >= 0 && gv.priorityType < FdbClientLogEvents::PRIORITY_END && gv.readVersion > 0);
|
||||
}
|
||||
|
||||
void parseEventGet(BinaryReader &reader) {
|
||||
FdbClientLogEvents::EventGet g;
|
||||
reader >> g;
|
||||
|
@ -57,6 +64,12 @@ namespace ClientLogEventsParser {
|
|||
ASSERT(c.latency < 10000 && c.commitBytes < CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT && c.numMutations < 1000000);
|
||||
}
|
||||
|
||||
void parseEventCommit_V2(BinaryReader &reader) {
|
||||
FdbClientLogEvents::EventCommit_V2 c;
|
||||
reader >> c;
|
||||
ASSERT(c.latency < 10000 && c.commitBytes < CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT && c.numMutations < 1000000 && c.commitVersion > 0);
|
||||
}
|
||||
|
||||
void parseEventErrorGet(BinaryReader &reader) {
|
||||
FdbClientLogEvents::EventGetError ge;
|
||||
reader >> ge;
|
||||
|
@ -94,10 +107,19 @@ namespace ClientLogEventsParser {
|
|||
Parser_V2() { parseGetVersion = parseEventGetVersion_V2; }
|
||||
virtual ~Parser_V2() override {}
|
||||
};
|
||||
struct Parser_V3 : ParserBase {
|
||||
Parser_V3() {
|
||||
parseGetVersion = parseEventGetVersion_V3;
|
||||
parseCommit = parseEventCommit_V2;
|
||||
}
|
||||
virtual ~Parser_V3() override {}
|
||||
};
|
||||
|
||||
struct ParserFactory {
|
||||
static std::unique_ptr<ParserBase> getParser(ProtocolVersion version) {
|
||||
if(version.version() >= (uint64_t) 0x0FDB00B062000001LL) {
|
||||
if(version.version() >= (uint64_t) 0x0FDB00B063010001LL) {
|
||||
return std::unique_ptr<ParserBase>(new Parser_V3());
|
||||
} else if(version.version() >= (uint64_t) 0x0FDB00B062000001LL) {
|
||||
return std::unique_ptr<ParserBase>(new Parser_V2());
|
||||
} else {
|
||||
return std::unique_ptr<ParserBase>(new Parser_V1());
|
||||
|
|
Loading…
Reference in New Issue