Merge pull request #1871 from bnamasivayam/tr-priority-add-client-log
Track the priority of sampled Transaction as part of GetReadVersion e…
This commit is contained in:
commit
e29a6ea280
|
@ -35,6 +35,14 @@ namespace FdbClientLogEvents {
|
|||
EVENTTYPEEND // End of EventType
|
||||
};
|
||||
|
||||
typedef int TrasactionPriorityType;
|
||||
enum {
|
||||
PRIORITY_DEFAULT = 0,
|
||||
PRIORITY_BATCH = 1,
|
||||
PRIORITY_IMMEDIATE = 2,
|
||||
PRIORITY_END
|
||||
};
|
||||
|
||||
struct Event {
|
||||
Event(EventType t, double ts) : type(t), startTs(ts) { }
|
||||
Event() { }
|
||||
|
@ -67,6 +75,39 @@ namespace FdbClientLogEvents {
|
|||
}
|
||||
};
|
||||
|
||||
// Version V2 of EventGetVersion starting at 6.2
|
||||
struct EventGetVersion_V2 : public Event {
|
||||
EventGetVersion_V2(double ts, double lat, uint32_t type) : Event(GET_VERSION_LATENCY, ts), latency(lat) {
|
||||
if(type == GetReadVersionRequest::PRIORITY_DEFAULT) {
|
||||
priorityType = PRIORITY_DEFAULT;
|
||||
} else if (type == GetReadVersionRequest::PRIORITY_BATCH) {
|
||||
priorityType = PRIORITY_BATCH;
|
||||
} else if (type == GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE){
|
||||
priorityType = PRIORITY_IMMEDIATE;
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
EventGetVersion_V2() { }
|
||||
|
||||
template <typename Ar> Ar& serialize(Ar &ar) {
|
||||
if (!ar.isDeserializing)
|
||||
return serializer(Event::serialize(ar), latency, priorityType);
|
||||
else
|
||||
return serializer(ar, latency, priorityType);
|
||||
}
|
||||
|
||||
double latency;
|
||||
TrasactionPriorityType priorityType {PRIORITY_END};
|
||||
|
||||
void logEvent(std::string id, int maxFieldLength) const {
|
||||
TraceEvent("TransactionTrace_GetVersion")
|
||||
.detail("TransactionID", id)
|
||||
.detail("Latency", latency)
|
||||
.detail("PriorityType", priorityType);
|
||||
}
|
||||
};
|
||||
|
||||
struct EventGet : public Event {
|
||||
EventGet(double ts, double lat, int size, const KeyRef &in_key) : Event(GET_LATENCY, ts), latency(lat), valueSize(size), key(in_key) { }
|
||||
EventGet() { }
|
||||
|
|
|
@ -3150,12 +3150,12 @@ ACTOR Future<Void> readVersionBatcher( DatabaseContext *cx, FutureStream< std::p
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Version> extractReadVersion(DatabaseContext* cx, Reference<TransactionLogInfo> trLogInfo, Future<GetReadVersionReply> f, bool lockAware, double startTime, Promise<Optional<Value>> metadataVersion) {
|
||||
ACTOR Future<Version> extractReadVersion(DatabaseContext* cx, uint32_t flags, Reference<TransactionLogInfo> trLogInfo, Future<GetReadVersionReply> f, bool lockAware, double startTime, Promise<Optional<Value>> metadataVersion) {
|
||||
GetReadVersionReply rep = wait(f);
|
||||
double latency = now() - startTime;
|
||||
cx->GRVLatencies.addSample(latency);
|
||||
if (trLogInfo)
|
||||
trLogInfo->addLog(FdbClientLogEvents::EventGetVersion(startTime, latency));
|
||||
trLogInfo->addLog(FdbClientLogEvents::EventGetVersion_V2(startTime, latency, flags & GetReadVersionRequest::FLAG_PRIORITY_MASK));
|
||||
if(rep.locked && !lockAware)
|
||||
throw database_locked();
|
||||
|
||||
|
@ -3180,7 +3180,7 @@ Future<Version> Transaction::getReadVersion(uint32_t flags) {
|
|||
Promise<GetReadVersionReply> p;
|
||||
batcher.stream.send( std::make_pair( p, info.debugID ) );
|
||||
startTime = now();
|
||||
readVersion = extractReadVersion( cx.getPtr(), trLogInfo, p.getFuture(), options.lockAware, startTime, metadataVersion);
|
||||
readVersion = extractReadVersion( cx.getPtr(), flags, trLogInfo, p.getFuture(), options.lockAware, startTime, metadataVersion);
|
||||
}
|
||||
return readVersion;
|
||||
}
|
||||
|
|
|
@ -24,12 +24,95 @@ static const auto trIdStartIndex = sampleTrInfoKey.toString().find('R');
|
|||
static const int trIdFormatSize = 16;
|
||||
|
||||
|
||||
namespace ClientLogEventsParser {
|
||||
|
||||
void parseEventGetVersion(BinaryReader &reader) {
|
||||
FdbClientLogEvents::EventGetVersion gv;
|
||||
reader >> gv;
|
||||
ASSERT(gv.latency < 10000);
|
||||
}
|
||||
|
||||
void parseEventGetVersion_V2(BinaryReader &reader) {
|
||||
FdbClientLogEvents::EventGetVersion_V2 gv;
|
||||
reader >> gv;
|
||||
ASSERT(gv.latency < 10000);
|
||||
ASSERT(gv.priorityType >= 0 && gv.priorityType < FdbClientLogEvents::PRIORITY_END);
|
||||
}
|
||||
|
||||
void parseEventGet(BinaryReader &reader) {
|
||||
FdbClientLogEvents::EventGet g;
|
||||
reader >> g;
|
||||
ASSERT(g.latency < 10000 && g.valueSize < CLIENT_KNOBS->VALUE_SIZE_LIMIT && g.key.size() < CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT);
|
||||
}
|
||||
|
||||
void parseEventGetRange(BinaryReader &reader) {
|
||||
FdbClientLogEvents::EventGetRange gr;
|
||||
reader >> gr;
|
||||
ASSERT(gr.latency < 10000 && gr.rangeSize < 1000000000 && gr.startKey.size() < CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT && gr.endKey.size() < CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT);
|
||||
}
|
||||
|
||||
void parseEventCommit(BinaryReader &reader) {
|
||||
FdbClientLogEvents::EventCommit c;
|
||||
reader >> c;
|
||||
ASSERT(c.latency < 10000 && c.commitBytes < CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT && c.numMutations < 1000000);
|
||||
}
|
||||
|
||||
void parseEventErrorGet(BinaryReader &reader) {
|
||||
FdbClientLogEvents::EventGetError ge;
|
||||
reader >> ge;
|
||||
ASSERT(ge.errCode < 10000 && ge.key.size() < CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT);
|
||||
}
|
||||
|
||||
void parseEventErrorGetRange(BinaryReader &reader) {
|
||||
FdbClientLogEvents::EventGetRangeError gre;
|
||||
reader >> gre;
|
||||
ASSERT(gre.errCode < 10000 && gre.startKey.size() < CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT && gre.endKey.size() < CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT);
|
||||
}
|
||||
|
||||
void parseEventErrorCommit(BinaryReader &reader) {
|
||||
FdbClientLogEvents::EventCommitError ce;
|
||||
reader >> ce;
|
||||
ASSERT(ce.errCode < 10000);
|
||||
}
|
||||
|
||||
struct ParserBase {
|
||||
std::function<void (BinaryReader &)> parseGetVersion = parseEventGetVersion;
|
||||
std::function<void (BinaryReader &)> parseGet = parseEventGet;
|
||||
std::function<void (BinaryReader &)> parseGetRange = parseEventGetRange;
|
||||
std::function<void (BinaryReader &)> parseCommit = parseEventCommit;
|
||||
std::function<void (BinaryReader &)> parseErrorGet = parseEventErrorGet;
|
||||
std::function<void (BinaryReader &)> parseErrorGetRange = parseEventErrorGetRange;
|
||||
std::function<void (BinaryReader &)> parseErrorCommit = parseEventErrorCommit;
|
||||
virtual ~ParserBase() = 0;
|
||||
};
|
||||
ParserBase::~ParserBase() {}
|
||||
|
||||
struct Parser_V1 : ParserBase {
|
||||
virtual ~Parser_V1() override {}
|
||||
};
|
||||
struct Parser_V2 : ParserBase {
|
||||
Parser_V2() { parseGetVersion = parseEventGetVersion_V2; }
|
||||
virtual ~Parser_V2() override {}
|
||||
};
|
||||
|
||||
struct ParserFactory {
|
||||
static std::unique_ptr<ParserBase> getParser(ProtocolVersion version) {
|
||||
if(version.version() >= (uint64_t) 0x0FDB00B062000001LL) {
|
||||
return std::unique_ptr<ParserBase>(new Parser_V2());
|
||||
} else {
|
||||
return std::unique_ptr<ParserBase>(new Parser_V1());
|
||||
}
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
// Checks TransactionInfo format
|
||||
bool checkTxInfoEntryFormat(BinaryReader &reader) {
|
||||
// Check protocol version
|
||||
ProtocolVersion protocolVersion;
|
||||
reader >> protocolVersion;
|
||||
reader.setProtocolVersion(protocolVersion);
|
||||
std::unique_ptr<ClientLogEventsParser::ParserBase> parser = ClientLogEventsParser::ParserFactory::getParser(protocolVersion);
|
||||
|
||||
while (!reader.empty()) {
|
||||
// Get EventType and timestamp
|
||||
|
@ -40,54 +123,26 @@ bool checkTxInfoEntryFormat(BinaryReader &reader) {
|
|||
switch (event)
|
||||
{
|
||||
case FdbClientLogEvents::GET_VERSION_LATENCY:
|
||||
{
|
||||
FdbClientLogEvents::EventGetVersion gv;
|
||||
reader >> gv;
|
||||
ASSERT(gv.latency < 10000);
|
||||
parser->parseGetVersion(reader);
|
||||
break;
|
||||
}
|
||||
case FdbClientLogEvents::GET_LATENCY:
|
||||
{
|
||||
FdbClientLogEvents::EventGet g;
|
||||
reader >> g;
|
||||
ASSERT(g.latency < 10000 && g.valueSize < CLIENT_KNOBS->VALUE_SIZE_LIMIT && g.key.size() < CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT);
|
||||
parser->parseGet(reader);
|
||||
break;
|
||||
}
|
||||
case FdbClientLogEvents::GET_RANGE_LATENCY:
|
||||
{
|
||||
FdbClientLogEvents::EventGetRange gr;
|
||||
reader >> gr;
|
||||
ASSERT(gr.latency < 10000 && gr.rangeSize < 1000000000 && gr.startKey.size() < CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT && gr.endKey.size() < CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT);
|
||||
parser->parseGetRange(reader);
|
||||
break;
|
||||
}
|
||||
case FdbClientLogEvents::COMMIT_LATENCY:
|
||||
{
|
||||
FdbClientLogEvents::EventCommit c;
|
||||
reader >> c;
|
||||
ASSERT(c.latency < 10000 && c.commitBytes < CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT && c.numMutations < 1000000);
|
||||
parser->parseCommit(reader);
|
||||
break;
|
||||
}
|
||||
case FdbClientLogEvents::ERROR_GET:
|
||||
{
|
||||
FdbClientLogEvents::EventGetError ge;
|
||||
reader >> ge;
|
||||
ASSERT(ge.errCode < 10000 && ge.key.size() < CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT);
|
||||
parser->parseErrorGet(reader);
|
||||
break;
|
||||
}
|
||||
case FdbClientLogEvents::ERROR_GET_RANGE:
|
||||
{
|
||||
FdbClientLogEvents::EventGetRangeError gre;
|
||||
reader >> gre;
|
||||
ASSERT(gre.errCode < 10000 && gre.startKey.size() < CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT && gre.endKey.size() < CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT);
|
||||
parser->parseErrorGetRange(reader);
|
||||
break;
|
||||
}
|
||||
case FdbClientLogEvents::ERROR_COMMIT:
|
||||
{
|
||||
FdbClientLogEvents::EventCommitError ce;
|
||||
reader >> ce;
|
||||
ASSERT(ce.errCode < 10000);
|
||||
parser->parseErrorCommit(reader);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
TraceEvent(SevError, "ClientTransactionProfilingUnknownEvent").detail("EventType", event);
|
||||
return false;
|
||||
|
@ -104,11 +159,9 @@ struct ClientTransactionProfileCorrectnessWorkload : TestWorkload {
|
|||
ClientTransactionProfileCorrectnessWorkload(WorkloadContext const& wcx)
|
||||
: TestWorkload(wcx)
|
||||
{
|
||||
if (clientId == 0) {
|
||||
samplingProbability = getOption(options, LiteralStringRef("samplingProbability"), deterministicRandom()->random01() / 10); //rand range 0 - 0.1
|
||||
trInfoSizeLimit = getOption(options, LiteralStringRef("trInfoSizeLimit"), deterministicRandom()->randomInt(100 * 1024, 10 * 1024 * 1024)); // 100 KB - 10 MB
|
||||
TraceEvent(SevInfo, "ClientTransactionProfilingSetup").detail("SamplingProbability", samplingProbability).detail("TrInfoSizeLimit", trInfoSizeLimit);
|
||||
}
|
||||
samplingProbability = getOption(options, LiteralStringRef("samplingProbability"), deterministicRandom()->random01() / 10); //rand range 0 - 0.1
|
||||
trInfoSizeLimit = getOption(options, LiteralStringRef("trInfoSizeLimit"), deterministicRandom()->randomInt(100 * 1024, 10 * 1024 * 1024)); // 100 KB - 10 MB
|
||||
TraceEvent(SevInfo, "ClientTransactionProfilingSetup").detail("ClientId", clientId).detail("SamplingProbability", samplingProbability).detail("TrInfoSizeLimit", trInfoSizeLimit);
|
||||
}
|
||||
|
||||
virtual std::string description() { return "ClientTransactionProfileCorrectness"; }
|
||||
|
|
|
@ -96,7 +96,7 @@ public: // introduced features
|
|||
//
|
||||
// xyzdev
|
||||
// vvvv
|
||||
constexpr ProtocolVersion currentProtocolVersion(0x0FDB00B061070001LL);
|
||||
constexpr ProtocolVersion currentProtocolVersion(0x0FDB00B062000001LL);
|
||||
// This assert is intended to help prevent incrementing the leftmost digits accidentally. It will probably need to
|
||||
// change when we reach version 10.
|
||||
static_assert(currentProtocolVersion.version() < 0x0FDB00B100000000LL, "Unexpected protocol version");
|
||||
|
|
Loading…
Reference in New Issue