Fix a version related issue.

This commit is contained in:
negoyal 2020-05-07 12:33:14 -07:00
parent f4d30f8dce
commit 90cf28d341
1 changed files with 59 additions and 60 deletions

View File

@ -170,6 +170,7 @@ public:
// The following are in rough order from newest to oldest
// TODO double check which ones we need for storageCache servers
Version lastTLogVersion, lastVersionWithData;
Version peekVersion; // version to peek the log at
NotifiedVersion version; // current version i.e. the max version that can be read from the cache
NotifiedVersion desiredOldestVersion; // oldestVersion can be increased to this after compaction
NotifiedVersion oldestVersion; // Min version that might be read from the cache
@ -234,7 +235,7 @@ public:
: versionedData(FastAllocPTree<KeyRef>{std::make_shared<int>(0)}),
thisServerID(thisServerID), index(index), logProtocol(0), db(db),
cacheRangeChangeCounter(0),
lastTLogVersion(0), lastVersionWithData(0),
lastTLogVersion(0), lastVersionWithData(0), peekVersion(0),
compactionInProgress(Void()),
fetchKeysParallelismLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM_BYTES),
debug_inApplyUpdate(false), debug_lastValidateTime(0),
@ -1725,8 +1726,7 @@ ACTOR Future<Void> pullAsyncData( StorageCacheData *data ) {
}
when( wait( dbInfoChange ) ) {
if( data->logSystem ) {
TraceEvent(SevDebug, "PullAsyncData", data->thisServerID).detail("DataVersion", data->version.get());
cursor = data->logSystem->peekSingle( data->thisServerID, data->version.get() + 1, cacheTag, std::vector<std::pair<Version,Tag>>()) ;
cursor = data->logSystem->peekSingle( data->thisServerID, data->peekVersion, cacheTag, std::vector<std::pair<Version,Tag>>()) ;
} else
cursor = Reference<ILogSystem::IPeekCursor>();
dbInfoChange = data->db->onChange();
@ -1841,7 +1841,6 @@ ACTOR Future<Void> pullAsyncData( StorageCacheData *data ) {
else {
MutationRef msg;
reader >> msg;
//fprintf(stderr, "%lld : %s\n", cursor->version().version, msg.toString().c_str());
if (ver != invalidVersion) // This change belongs to a version < minVersion
{
@ -1872,8 +1871,10 @@ ACTOR Future<Void> pullAsyncData( StorageCacheData *data ) {
break;
}
}
else
TraceEvent(SevError, "DiscardingPeekedData", data->thisServerID).detail("Mutation", msg.toString()).detail("Version", cursor->version().toString());
else {
TraceEvent(SevError, "DiscardingPeekedData", data->thisServerID).detail("Mutation", msg.toString()).detail("CursorVersion", cloneCursor2->version().version).
detail("DataVersion", data->version.get());
}
tagAt = cursor->version().version + 1;
}
@ -1901,6 +1902,7 @@ ACTOR Future<Void> pullAsyncData( StorageCacheData *data ) {
//data->noRecentUpdates.set(false);
//data->lastUpdate = now();
data->version.set( ver ); // Triggers replies to waiting gets for new version(s)
data->peekVersion = ver + 1;
// TODO double check
//setDataVersion(data->thisServerID, data->version.get());
@ -1940,6 +1942,55 @@ ACTOR Future<Void> pullAsyncData( StorageCacheData *data ) {
}
}
// Fetch metadata mutation from the database to establish cache ranges and apply them
ACTOR Future<Void> storageCacheStartUpWarmup(StorageCacheData* self) {
state Transaction tr(self->cx);
state Value trueValue = storageCacheValue(std::vector<uint16_t>{ 0 });
state Value falseValue = storageCacheValue(std::vector<uint16_t>{});
state MutationRef privatized;
state Version readVersion;
try {
loop {
tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE);
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
try {
Standalone<RangeResultRef> range = wait(tr.getRange(storageCacheKeys, CLIENT_KNOBS->TOO_MANY));
ASSERT(!range.more);
readVersion = tr.getReadVersion().get();
bool currCached = false;
KeyRef begin, end;
for (const auto& kv : range) {
// These booleans have to flip consistently
ASSERT(currCached == (kv.value == falseValue));
if (kv.value == trueValue) {
begin = kv.key;
privatized.param1 = begin.withPrefix(systemKeys.begin);
privatized.param2 = serverKeysTrue;
//TraceEvent(SevDebug, "SCStartupFetch", self->thisServerID).detail("BeginKey", begin.substr(storageCacheKeys.begin.size()));
applyMutation(self->updater, self, privatized, readVersion);
currCached = true;
} else {
currCached = false;
end = kv.key;
privatized.param1 = begin.withPrefix(systemKeys.begin);
privatized.param2 = serverKeysFalse;
//TraceEvent(SevDebug, "SCStartupFetch", self->thisServerID).detail("EndKey", end.substr(storageCacheKeys.begin.size()));
applyMutation(self->updater, self, privatized, readVersion);
}
}
self->peekVersion = readVersion + 1;
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
} catch (Error& e) {
TraceEvent(SevError, "SCStartUpFailed").error(e);
throw;
}
return Void();
}
ACTOR Future<Void> watchInterface(StorageCacheData* self, StorageServerInterface ssi) {
state Transaction tr(self->cx);
state Key storageKey = storageCacheServerKey(ssi.id());
@ -1972,11 +2023,6 @@ ACTOR Future<Void> storageCacheServer(StorageServerInterface ssi, uint16_t id, R
state Future<Void> dbInfoChange = Void();
state StorageCacheUpdater updater(self.lastVersionWithData);
self.updater = &updater;
state Transaction tr(self.cx);
state Value trueValue = storageCacheValue(std::vector<uint16_t>{ 0 });
state Value falseValue = storageCacheValue(std::vector<uint16_t>{});
state MutationRef privatized;
state Version readVersion;
//TraceEvent("StorageCache_CacheServerInterface", self.thisServerID).detail("UID", ssi.uniqueID).detail("IsCacheServer", ssi.isCacheServer);
@ -1986,55 +2032,8 @@ ACTOR Future<Void> storageCacheServer(StorageServerInterface ssi, uint16_t id, R
actors.add(waitFailureServer(ssi.waitFailure.getFuture()));
actors.add(traceCounters("CacheMetrics", self.thisServerID, SERVER_KNOBS->STORAGE_LOGGING_DELAY, &self.counters.cc, self.thisServerID.toString() + "/CacheMetrics"));
TraceEvent(SevDebug, "SCStartUpPostWarmup", self.thisServerID).detail("DataVersionBefore", self.version.get());
// Fetch metadata mutation from the database to establish cache ranges and apply them
try {
loop {
tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE);
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
try {
Standalone<RangeResultRef> range = wait(tr.getRange(storageCacheKeys, CLIENT_KNOBS->TOO_MANY));
ASSERT(!range.more);
readVersion = tr.getReadVersion().get();
bool currCached = false;
KeyRef begin, end;
for (const auto& kv : range) {
// These booleans have to flip consistently
ASSERT(currCached == (kv.value == falseValue));
if (kv.value == trueValue) {
//begin = kv.key.substr(storageCacheKeys.begin.size());
begin = kv.key;
privatized.param1 = begin.withPrefix(systemKeys.begin);
privatized.param2 = serverKeysTrue;
TraceEvent(SevDebug, "SCStartupFetch", self.thisServerID).detail("BeginKey", begin.substr(storageCacheKeys.begin.size()));
applyMutation(self.updater, &self, privatized, readVersion);
currCached = true;
} else {
currCached = false;
//end = kv.key.substr(storageCacheKeys.begin.size());
end = kv.key;
privatized.param1 = begin.withPrefix(systemKeys.begin);
privatized.param2 = serverKeysFalse;
TraceEvent(SevDebug, "SCStartupFetch", self.thisServerID).detail("EndKey", end.substr(storageCacheKeys.begin.size()));
applyMutation(self.updater, &self, privatized, readVersion);
//KeyRangeRef cachedRange{begin, end};
//TraceEvent(SevDebug, "SCStartupWarmup", self.thisServerID).detail("BeginKey", begin).detail("EndKey", end);
//cacheWarmup(&self, cachedRange, TRUE, updater.currentVersion-1);
}
}
// FIXME: Some tests start failing when I set the version as below. Commenting out makes the tests pass. Ideas?
self.version.set(readVersion);
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
} catch (Error& e) {
TraceEvent(SevError, "SCFetchCachedRangesFailed").error(e);
throw;
}
TraceEvent(SevDebug, "SCStartUpPostWarmup", self.thisServerID).detail("DataVersionAfter", self.version.get());
// fetch already cached ranges from the database and apply them before proceeding
wait( storageCacheStartUpWarmup(&self) );
//compactCache actor will periodically compact the cache when certain version condition is met
actors.add(compactCache(&self));