AtomicOps:Resolve review comments
This commit is contained in:
parent
96989e0fb6
commit
0ccded1929
|
@ -360,7 +360,7 @@ struct RestoreSendMutationVectorVersionedRequest : TimedRequest {
|
|||
|
||||
std::string toString() {
|
||||
std::stringstream ss;
|
||||
ss << "fileIndex" << fileIndex << " prevVersion:" << prevVersion << " version:" << version
|
||||
ss << "fileIndex:" << fileIndex << " prevVersion:" << prevVersion << " version:" << version
|
||||
<< " isRangeFile:" << isRangeFile << " mutations.size:" << mutations.size();
|
||||
return ss.str();
|
||||
}
|
||||
|
|
|
@ -134,7 +134,7 @@ void handleSetApplierKeyRangeVectorRequest(const RestoreSetApplierKeyRangeVector
|
|||
if (self->rangeToApplier.empty()) {
|
||||
self->rangeToApplier = req.rangeToApplier;
|
||||
} else {
|
||||
ASSERT_WE_THINK(self->rangeToApplier == req.rangeToApplier);
|
||||
ASSERT(self->rangeToApplier == req.rangeToApplier);
|
||||
}
|
||||
req.reply.send(RestoreCommonReply(self->id()));
|
||||
}
|
||||
|
|
|
@ -51,7 +51,7 @@ struct AtomicOpsWorkload : TestWorkload {
|
|||
lbsum = 0;
|
||||
ubsum = 0;
|
||||
|
||||
int64_t randNum = sharedRandomNumber / 10;
|
||||
int64_t randNum = sharedRandomNumber / 10;
|
||||
if(opType == -1)
|
||||
opType = randNum % 8;
|
||||
|
||||
|
@ -123,7 +123,6 @@ struct AtomicOpsWorkload : TestWorkload {
|
|||
virtual void getMetrics( vector<PerfMetric>& m ) {
|
||||
}
|
||||
|
||||
// Key logKey( int group ) { return StringRef(format("log%08x%08x%08x",group,clientId,opNum++));}
|
||||
std::pair<Key, Key> logDebugKey(int group) {
|
||||
Key logKey(format("log%08x%08x%08x", group, clientId, opNum));
|
||||
Key debugKey(format("debug%08x%08x%08x", group, clientId, opNum));
|
||||
|
@ -207,47 +206,62 @@ struct AtomicOpsWorkload : TestWorkload {
|
|||
}
|
||||
|
||||
ACTOR Future<Void> dumpLogKV(Database cx, int g) {
|
||||
ReadYourWritesTransaction tr(cx);
|
||||
Key begin(format("log%08x", g));
|
||||
Standalone<RangeResultRef> log = wait(tr.getRange(KeyRangeRef(begin, strinc(begin)), CLIENT_KNOBS->TOO_MANY));
|
||||
uint64_t sum = 0;
|
||||
for (auto& kv : log) {
|
||||
uint64_t intValue = 0;
|
||||
memcpy(&intValue, kv.value.begin(), kv.value.size());
|
||||
sum += intValue;
|
||||
TraceEvent("AtomicOpLog")
|
||||
.detail("Key", kv.key)
|
||||
.detail("Val", kv.value)
|
||||
.detail("IntValue", intValue)
|
||||
.detail("CurSum", sum);
|
||||
try {
|
||||
state ReadYourWritesTransaction tr(cx);
|
||||
Key begin(format("log%08x", g));
|
||||
Standalone<RangeResultRef> log = wait(tr.getRange(KeyRangeRef(begin, strinc(begin)), CLIENT_KNOBS->TOO_MANY));
|
||||
uint64_t sum = 0;
|
||||
for (auto& kv : log) {
|
||||
uint64_t intValue = 0;
|
||||
memcpy(&intValue, kv.value.begin(), kv.value.size());
|
||||
sum += intValue;
|
||||
TraceEvent("AtomicOpLog")
|
||||
.detail("Key", kv.key)
|
||||
.detail("Val", kv.value)
|
||||
.detail("IntValue", intValue)
|
||||
.detail("CurSum", sum);
|
||||
}
|
||||
} catch( Error &e ) {
|
||||
TraceEvent("DumpLogKVError").detail("Error", e.what());
|
||||
wait( tr.onError(e) );
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> dumpDebugKV(Database cx, int g) {
|
||||
ReadYourWritesTransaction tr(cx);
|
||||
Key begin(format("debug%08x", g));
|
||||
Standalone<RangeResultRef> log = wait(tr.getRange(KeyRangeRef(begin, strinc(begin)), CLIENT_KNOBS->TOO_MANY));
|
||||
for (auto& kv : log) {
|
||||
TraceEvent("AtomicOpDebug").detail("Key", kv.key).detail("Val", kv.value);
|
||||
try {
|
||||
state ReadYourWritesTransaction tr(cx);
|
||||
Key begin(format("debug%08x", g));
|
||||
Standalone<RangeResultRef> log = wait(tr.getRange(KeyRangeRef(begin, strinc(begin)), CLIENT_KNOBS->TOO_MANY));
|
||||
for (auto& kv : log) {
|
||||
TraceEvent("AtomicOpDebug").detail("Key", kv.key).detail("Val", kv.value);
|
||||
}
|
||||
} catch( Error &e ) {
|
||||
TraceEvent("DumpDebugKVError").detail("Error", e.what());
|
||||
wait( tr.onError(e) );
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> dumpOpsKV(Database cx, int g) {
|
||||
ReadYourWritesTransaction tr(cx);
|
||||
Key begin(format("ops%08x", g));
|
||||
Standalone<RangeResultRef> ops = wait(tr.getRange(KeyRangeRef(begin, strinc(begin)), CLIENT_KNOBS->TOO_MANY));
|
||||
uint64_t sum = 0;
|
||||
for (auto& kv : ops) {
|
||||
uint64_t intValue = 0;
|
||||
memcpy(&intValue, kv.value.begin(), kv.value.size());
|
||||
sum += intValue;
|
||||
TraceEvent("AtomicOpOps")
|
||||
.detail("Key", kv.key)
|
||||
.detail("Val", kv.value)
|
||||
.detail("IntVal", intValue)
|
||||
.detail("CurSum", sum);
|
||||
try {
|
||||
state ReadYourWritesTransaction tr(cx);
|
||||
Key begin(format("ops%08x", g));
|
||||
Standalone<RangeResultRef> ops = wait(tr.getRange(KeyRangeRef(begin, strinc(begin)), CLIENT_KNOBS->TOO_MANY));
|
||||
uint64_t sum = 0;
|
||||
for (auto& kv : ops) {
|
||||
uint64_t intValue = 0;
|
||||
memcpy(&intValue, kv.value.begin(), kv.value.size());
|
||||
sum += intValue;
|
||||
TraceEvent("AtomicOpOps")
|
||||
.detail("Key", kv.key)
|
||||
.detail("Val", kv.value)
|
||||
.detail("IntVal", intValue)
|
||||
.detail("CurSum", sum);
|
||||
}
|
||||
} catch( Error &e ) {
|
||||
TraceEvent("DumpOpsKVError").detail("Error", e.what());
|
||||
wait( tr.onError(e) );
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
@ -259,6 +273,7 @@ struct AtomicOpsWorkload : TestWorkload {
|
|||
Key begin(format("debug%08x", g));
|
||||
Standalone<RangeResultRef> debuglog =
|
||||
wait(tr1.getRange(KeyRangeRef(begin, strinc(begin)), CLIENT_KNOBS->TOO_MANY));
|
||||
ASSERT(!debuglog.more);
|
||||
for (auto& kv : debuglog) {
|
||||
records[kv.value] = kv.key;
|
||||
}
|
||||
|
@ -268,6 +283,7 @@ struct AtomicOpsWorkload : TestWorkload {
|
|||
state std::map<Key, int64_t> logVal; // debugKey, log's value
|
||||
Key begin(format("log%08x", g));
|
||||
Standalone<RangeResultRef> log = wait(tr2.getRange(KeyRangeRef(begin, strinc(begin)), CLIENT_KNOBS->TOO_MANY));
|
||||
ASSERT(!log.more);
|
||||
for (auto& kv : log) {
|
||||
uint64_t intValue = 0;
|
||||
memcpy(&intValue, kv.value.begin(), kv.value.size());
|
||||
|
@ -279,6 +295,7 @@ struct AtomicOpsWorkload : TestWorkload {
|
|||
state std::map<Key, int64_t> opsVal; // ops key, ops value
|
||||
Key begin(format("ops%08x", g));
|
||||
Standalone<RangeResultRef> ops = wait(tr3.getRange(KeyRangeRef(begin, strinc(begin)), CLIENT_KNOBS->TOO_MANY));
|
||||
ASSERT(!ops.more);
|
||||
// Validate if ops' key value is consistent with logs' key value
|
||||
for (auto& kv : ops) {
|
||||
bool inRecord = records.find(kv.key) != records.end();
|
||||
|
@ -303,11 +320,9 @@ struct AtomicOpsWorkload : TestWorkload {
|
|||
|
||||
// Validate if there is any ops key missing
|
||||
for (auto& kv : records) {
|
||||
uint64_t intValue = opsVal[kv.first];
|
||||
if (intValue <= 0) {
|
||||
if (opsVal.find(kv.first) == opsVal.end()) {
|
||||
TraceEvent(SevError, "MissingOpsKey2")
|
||||
.detail("OpsKey", kv.first)
|
||||
.detail("OpsVal", intValue)
|
||||
.detail("DebugKey", kv.second);
|
||||
}
|
||||
}
|
||||
|
@ -376,7 +391,7 @@ struct AtomicOpsWorkload : TestWorkload {
|
|||
.detail("OpsResultStr", printable(opsResultStr))
|
||||
.detail("Size", opsResultStr.size())
|
||||
.detail("LowerBoundSum", self->lbsum)
|
||||
.detail("UperBoundSum", self->ubsum);
|
||||
.detail("UpperBoundSum", self->ubsum);
|
||||
wait(self->dumpLogKV(cx, g));
|
||||
wait(self->dumpDebugKV(cx, g));
|
||||
wait(self->dumpOpsKV(cx, g));
|
||||
|
|
Loading…
Reference in New Issue