Fix a corruption bug.

negoyal 2020-03-13 18:52:34 -07:00
parent 9354fca2e4
commit 99a5cb0572
7 changed files with 49 additions and 17 deletions

View File

@@ -98,7 +98,7 @@ Future<REPLY_TYPE(Request)> loadBalance(
 	return runAfter(loadBalance(alternatives->locations(), channel, request, taskID, atMostOnce, model),
 	    [ctx](auto res) {
 	        if (res.cached) {
-	            TraceEvent(SevDebug, "NativeCientReqCached");
+	            //TraceEvent(SevDebug, "NativeCientReqCached");
 	            ctx->updateCache.trigger();
 	        }
 	        return res;
@@ -644,9 +644,8 @@ ACTOR Future<Void> monitorCacheList(DatabaseContext* self) {
 	                    std::insert_iterator<std::map<UID, StorageServerInterface>>(
 	                        deletedCacheServers, deletedCacheServers.begin()));
 	hasChanges = !(newCacheServers.empty() && deletedCacheServers.empty());
-	TraceEvent(SevDebug, "MonitorCacheList").detail("AllCacheServers",allCacheServers.size())
-	    .detail("NewCacheServers",newCacheServers.size())
-	    .detail("OldCacheServers",cacheServerMap.size());
+	//TraceEvent(SevDebug, "MonitorCacheList").detail("AllCacheServers",allCacheServers.size())
+	//.detail("NewCacheServers",newCacheServers.size()).detail("OldCacheServers",cacheServerMap.size());
 	if (hasChanges) {
 	    updateLocationCacheWithCaches(self, deletedCacheServers, newCacheServers);
 	}
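
The context above shows the tail of a set-difference computation into std::map through insert iterators (presumably std::set_difference over the previously known and freshly read cache-server maps). A minimal standalone sketch of that pattern, with placeholder type aliases instead of FDB's UID and StorageServerInterface:

    #include <algorithm>
    #include <iterator>
    #include <map>
    #include <string>

    // Placeholder aliases; the real code uses FDB's UID and StorageServerInterface.
    using UID = std::string;
    using Interface = std::string;
    using ServerMap = std::map<UID, Interface>;

    // Keys present in `all` but not in `known` land in `added`, and vice versa for `removed`.
    void diffServerSets(const ServerMap& known, const ServerMap& all, ServerMap& added, ServerMap& removed) {
        auto byKey = [](const ServerMap::value_type& a, const ServerMap::value_type& b) {
            return a.first < b.first; // maps iterate in key order, so comparing keys is enough
        };
        std::set_difference(all.begin(), all.end(), known.begin(), known.end(),
                            std::inserter(added, added.begin()), byKey);
        std::set_difference(known.begin(), known.end(), all.begin(), all.end(),
                            std::inserter(removed, removed.begin()), byKey);
    }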

View File

@@ -1129,6 +1129,7 @@ ReadYourWritesTransaction::ReadYourWritesTransaction(Database const& cx)
     options(tr), deferredError(cx->deferredError) {
 	std::copy(cx.getTransactionDefaults().begin(), cx.getTransactionDefaults().end(),
 	          std::back_inserter(persistentOptions));
+	//debugTransaction( deterministicRandom()->randomUniqueID() );
 	applyPersistentOptions();
 }

View File

@@ -768,7 +768,7 @@ public:
 	void insert(const K& k, const T& t) {
 	    insert( k, t, latestVersion );
 	}
-	void insert(const K& k, const T& t, Version insertAt) {[[]]
+	void insert(const K& k, const T& t, Version insertAt) {
 	    if (PTreeImpl::contains(roots.back().second, latestVersion, k )) PTreeImpl::remove( roots.back().second, latestVersion, k, allocator); // FIXME: Make PTreeImpl::insert do this automatically (see also WriteMap.h FIXME)
 	    PTreeImpl::insert( roots.back().second, latestVersion, MapPair<K,std::pair<T,Version>>(k,std::make_pair(t,insertAt)), allocator);
 	}

View File

@@ -273,6 +273,7 @@ void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRef<MutationRe
 				allTags.insert(decodeServerTagValue(kv.value));
 			}
 		}
+		allTags.insert(cacheTag);
 		if (m.param1 == lastEpochEndKey) {
 			toCommit->addTags(allTags);
@@ -283,6 +284,11 @@ void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRef<MutationRe
 			privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
 			toCommit->addTags(allTags);
 			toCommit->addTypedMessage(privatized);
+			//if (m.param1 == lastEpochEndKey) {
+			//TraceEvent(SevDebug, "SendingPrivateMutation", dbgid).detail("Original", m.toString()).detail("Privatized", privatized.toString());
+			//toCommit->addTag( cacheTag );
+			//toCommit->addTypedMessage(privatized);
+			//}
 		}
 	}
 	else if (m.param1 == minRequiredCommitVersionKey) {
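
The added allTags.insert(cacheTag) is the only functional change in this file: the privatized mutation is now tagged for the storage cache as well as for the tagged storage servers. A hypothetical, heavily simplified sketch of that idea (plain types standing in for Tag, MutationRef and the commit stream; the tag value is purely illustrative):

    #include <set>
    #include <string>
    #include <utility>
    #include <vector>

    using Tag = int;                                  // stand-in for FDB's Tag
    struct Mutation { std::string param1, param2; };  // stand-in for MutationRef

    const Tag cacheTag = -4;  // illustrative sentinel; the real constant lives in the FDB sources

    // Deliver a privatized metadata mutation to every destination tag, always including the
    // cache tag, so storage caches observe the same metadata as the servers they shadow.
    void broadcastPrivatized(const Mutation& privatized, std::set<Tag> allTags,
                             std::vector<std::pair<Tag, Mutation>>& commitStream) {
        allTags.insert(cacheTag);
        for (Tag t : allTags)
            commitStream.emplace_back(t, privatized);
    }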

View File

@@ -493,7 +493,7 @@ ACTOR Future<Void> getValueQ( StorageCacheData* data, GetValueRequest req ) {
 	        ++data->counters.rowsQueried;
 	        resultSize = v.get().size();
 	        data->counters.bytesQueried += resultSize;
-	        //TraceEvent(SevDebug, "SCGetValueQPresent", data->thisServerID).detail("ResultSize",resultSize).detail("Version", version).detail("ReqKey",req.key);
+	        //TraceEvent(SevDebug, "SCGetValueQPresent", data->thisServerID).detail("ResultSize",resultSize).detail("Version", version).detail("ReqKey",req.key).detail("Value",v);
 	    }
 	    if( req.debugID.present() )
@@ -911,20 +911,36 @@ void StorageCacheData::applyMutation( MutationRef const& m, Arena& arena, Storag
 	        // TODO double check if the insert version of the previous clear needs to be preserved for the "left half",
 	        // insert() invalidates prev, so prev.key() is not safe to pass to it by reference
 	        data.insert( KeyRef(prev.key()), ValueOrClearToRef::clearTo( m.param1 ), prev.insertVersion() ); // overwritten by below insert if empty
+	        //TraceEvent(SevDebug, "ApplyMutationClearTo")
+	        //.detail("Key1", prev.key())
+	        //.detail("Key2",m.param1)
+	        //.detail("Version1", prev.insertVersion());
 	        KeyRef nextKey = keyAfter(m.param1, arena);
 	        if ( end != nextKey ) {
 	            ASSERT( end > nextKey );
 	            // TODO double check if it's okay to let go of the the insert version of the "right half"
 	            // FIXME: This copy is technically an asymptotic problem, definitely a waste of memory (copy of keyAfter is a waste, but not asymptotic)
 	            data.insert( nextKey, ValueOrClearToRef::clearTo( KeyRef(arena, end) ) );
+	            //TraceEvent(SevDebug, "ApplyMutationClearTo2")
+	            //.detail("K1", nextKey)
+	            //.detail("K2", end)
+	            //.detail("V", data.latestVersion);
 	        }
 	    }
 	    data.insert( m.param1, ValueOrClearToRef::value(m.param2) );
+	    //TraceEvent(SevDebug, "ApplyMutation")
+	    //	.detail("Key", m.param1)
+	    //	.detail("Value",m.param2)
+	    //	.detail("Version", data.latestVersion);
 	} else if (m.type == MutationRef::ClearRange) {
 	    data.erase( m.param1, m.param2 );
 	    ASSERT( m.param2 > m.param1 );
 	    ASSERT( !data.isClearContaining( data.atLatest(), m.param1 ) );
 	    data.insert( m.param1, ValueOrClearToRef::clearTo(m.param2) );
+	    //TraceEvent(SevDebug, "ApplyMutationClearTo3")
+	    //	.detail("Key21", m.param1)
+	    //	.detail("Key22", m.param2)
+	    //	.detail("V2", data.latestVersion);
 	}
 }
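
For reference, the SetValue branch above splits an existing clear range around the newly written key: the left half of the clear keeps its original insert version and ends at the key, the right half restarts just after it, and then the value itself is inserted. A simplified sketch of that splitting logic using a plain std::map instead of the versioned map (keyAfter is approximated by appending a zero byte):

    #include <map>
    #include <optional>
    #include <string>

    // Each entry is either a stored value or a "clear up to endKey" marker.
    struct Entry {
        std::optional<std::string> value;
        std::optional<std::string> clearTo;
    };

    void applySet(std::map<std::string, Entry>& data, const std::string& key, const std::string& value) {
        auto it = data.upper_bound(key);        // first entry strictly greater than key
        if (it != data.begin()) {
            --it;                               // last entry <= key, like lastLessOrEqual()
            if (it->second.clearTo && *it->second.clearTo > key) {
                std::string end = *it->second.clearTo;
                it->second.clearTo = key;       // left half of the clear now ends at the new key
                std::string nextKey = key + std::string(1, '\0');  // keyAfter(key) stand-in
                if (end != nextKey)
                    data[nextKey] = Entry{ std::nullopt, end };    // right half resumes just after key
            }
        }
        data[key] = Entry{ value, std::nullopt };                  // finally store the value itself
    }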
@@ -1616,8 +1632,8 @@ private:
 	            rollback( data, rollbackVersion, currentVersion );
 	        }
 	    } else {
-	        fprintf(stderr, "SCPrivateCacheMutation: Unknown private mutation\n");
-	        ASSERT(false); // Unknown private mutation
+	        TraceEvent(SevWarn, "SCPrivateCacheMutation: Unknown private mutation");
+	        //ASSERT(false); // Unknown private mutation
 	    }
 	}
 };
@@ -1703,6 +1719,7 @@ ACTOR Future<Void> pullAsyncData( StorageCacheData *data ) {
 	loop{
 	    state uint64_t changeCounter = data->cacheRangeChangeCounter;
 	    bool epochEnd = false;
+	    bool hasPrivateData = false;
 	    bool firstMutation = true;
 	    bool dbgLastMessageWasProtocol = false;
@@ -1725,20 +1742,22 @@ ACTOR Future<Void> pullAsyncData( StorageCacheData *data ) {
 	                MutationRef msg;
 	                cloneReader >> msg;
+	                if (firstMutation && msg.param1.startsWith(systemKeys.end))
+	                    hasPrivateData = true;
+	                firstMutation = false;
 	                if (msg.param1 == lastEpochEndPrivateKey) {
 	                    epochEnd = true;
-	                    ASSERT(firstMutation);
+	                    //ASSERT(firstMutation);
 	                    ASSERT(dbgLastMessageWasProtocol);
 	                }
-	                firstMutation = false;
 	                dbgLastMessageWasProtocol = false;
 	            }
 	        }
 	        // Any fetchKeys which are ready to transition their cacheRanges to the adding,transferred state do so now.
 	        // If there is an epoch end we skip this step, to increase testability and to prevent inserting a version in the middle of a rolled back version range.
-	        while(!epochEnd && !data->readyFetchKeys.empty()) {
+	        while(!hasPrivateData && !epochEnd && !data->readyFetchKeys.empty()) {
 	            auto fk = data->readyFetchKeys.back();
 	            data->readyFetchKeys.pop_back();
 	            fk.send( &fii );
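
The new hasPrivateData flag widens the existing epochEnd guard: fetchKeys completions are now also deferred for any version batch whose first mutation targets the private (system) keyspace, presumably so metadata mutations are applied before cache ranges transition state. A standalone sketch of just that check, with string stand-ins for the real keys and mutations:

    #include <string>
    #include <vector>

    struct Mutation { std::string param1, param2; };  // stand-in for MutationRef

    // Private mutations carry a prefix at or beyond the end of the system keyspace;
    // "\xff\xff" stands in for systemKeys.end here.
    const std::string systemKeysEnd = "\xff\xff";

    // True when the first mutation of the batch targets the private keyspace; the caller then
    // skips readyFetchKeys for this batch, mirroring the `!hasPrivateData` loop condition above.
    bool batchHasPrivateData(const std::vector<Mutation>& batch) {
        return !batch.empty() &&
               batch.front().param1.compare(0, systemKeysEnd.size(), systemKeysEnd) == 0;
    }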

View File

@@ -884,6 +884,7 @@ ACTOR Future<Void> getValueQ( StorageServer* data, GetValueRequest req ) {
 	        ++data->counters.rowsQueried;
 	        resultSize = v.get().size();
 	        data->counters.bytesQueried += resultSize;
+	        //TraceEvent(SevDebug, "SSGetValueQPresent", data->thisServerID).detail("ResultSize",resultSize).detail("Version", version).detail("ReqKey",req.key).detail("Value",v);
 	    }
 	    else {
 	        ++data->counters.emptyQueries;
@@ -1768,6 +1769,7 @@ void applyMutation( StorageServer *self, MutationRef const& m, Arena& arena, Sto
 	self->metrics.notify(m.param1, metrics);
 	if (m.type == MutationRef::SetValue) {
+	    //TraceEvent("ApplyMutation", self->thisServerID).detail("Mutation", m.toString()).detail("Version", data.latestVersion);
 	    auto prev = data.atLatest().lastLessOrEqual(m.param1);
 	    if (prev && prev->isClearTo() && prev->getEndKey() > m.param1) {
 	        ASSERT( prev.key() <= m.param1 );

View File

@@ -115,9 +115,9 @@ struct CycleWorkload : TestWorkload {
 	                tr.set( self->key(r), self->value(r3) );
 	                tr.set( self->key(r2), self->value(r4) );
 	                tr.set( self->key(r3), self->value(r2) );
-	                // TraceEvent("CyclicTestMX").detail("Key", self->key(r).toString()).detail("Value", self->value(r3).toString());
-	                // TraceEvent("CyclicTestMX").detail("Key", self->key(r2).toString()).detail("Value", self->value(r4).toString());
-	                // TraceEvent("CyclicTestMX").detail("Key", self->key(r3).toString()).detail("Value", self->value(r2).toString());
+	                //TraceEvent("CyclicTestMX1").detail("Key", self->key(r).toString()).detail("Value", self->value(r3).toString());
+	                //TraceEvent("CyclicTestMX2").detail("Key", self->key(r2).toString()).detail("Value", self->value(r4).toString());
+	                //TraceEvent("CyclicTestMX3").detail("Key", self->key(r3).toString()).detail("Value", self->value(r2).toString());
 	                wait( tr.commit() );
 	                // TraceEvent("CycleCommit");
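
Assuming, as in the Cycle workload, that value(n) stores the successor index of key(n), the three tr.set calls in the context above rewire r -> r2 -> r3 -> r4 into r -> r3 -> r2 -> r4, which keeps all keys on a single cycle. The same operation on a plain successor array:

    #include <vector>

    // next[n] holds the successor of node n; the vector always encodes one cycle over all nodes.
    void rotateCycle(std::vector<int>& next, int r) {
        int r2 = next[r], r3 = next[r2], r4 = next[r3];
        next[r]  = r3;  // r now skips over r2
        next[r2] = r4;  // r2 now skips over r3
        next[r3] = r2;  // r3 points back at r2, so the structure stays a single cycle
    }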
@@ -161,7 +161,10 @@ struct CycleWorkload : TestWorkload {
 	            return false;
 	        }
 	        int i=0;
-	        for(int c=0; c<nodeCount; c++) {
+	        int iPrev=0;
+	        double d;
+	        int c;
+	        for(c=0; c<nodeCount; c++) {
 	            if (c && !i) {
 	                TraceEvent(SevError, "TestFailure").detail("Reason", "Cycle got shorter").detail("Before", nodeCount).detail("After", c).detail("KeyPrefix", keyPrefix.printable());
 	                logTestData(data);
@@ -172,7 +175,8 @@ struct CycleWorkload : TestWorkload {
 	                logTestData(data);
 	                return false;
 	            }
-	            double d = testKeyToDouble(data[i].value, keyPrefix);
+	            d = testKeyToDouble(data[i].value, keyPrefix);
+	            iPrev = i;
 	            i = (int)d;
 	            if ( i != d || i<0 || i>=nodeCount) {
 	                TraceEvent(SevError, "TestFailure").detail("Reason", "Invalid value").detail("KeyPrefix", keyPrefix.printable());
@@ -181,7 +185,8 @@ struct CycleWorkload : TestWorkload {
 	            }
 	        }
 	        if (i != 0) {
-	            TraceEvent(SevError, "TestFailure").detail("Reason", "Cycle got longer").detail("KeyPrefix", keyPrefix.printable());
+	            TraceEvent(SevError, "TestFailure").detail("Reason", "Cycle got longer").detail("KeyPrefix", keyPrefix.printable()).detail("Key", key(i)).detail("Value", data[i].value).
+	                detail("Iteration", c).detail("Nodecount", nodeCount).detail("Int", i).detail("Double", d).detail("ValuePrev", data[iPrev].value).detail("KeyPrev", data[iPrev].key);
 	            logTestData(data);
 	            return false;
 	        }
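
The verification loop above walks the successor chain nodeCount steps; the hoisted c, d and new iPrev variables let the "Cycle got longer" failure report where the walk arrived from. The same walk over a plain successor array:

    #include <cstdio>
    #include <vector>

    // Returns true when `next` encodes a single cycle over all nodes starting (and ending) at 0.
    bool cycleIsIntact(const std::vector<int>& next) {
        const int nodeCount = (int)next.size();
        int i = 0, iPrev = 0;
        for (int c = 0; c < nodeCount; c++) {
            if (c && !i) { std::printf("cycle got shorter after %d steps\n", c); return false; }
            iPrev = i;
            i = next[i];
            if (i < 0 || i >= nodeCount) { std::printf("invalid successor %d at node %d\n", i, iPrev); return false; }
        }
        if (i != 0) { std::printf("cycle got longer: stopped at %d, came from %d\n", i, iPrev); return false; }
        return true;
    }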