diff --git a/fdbclient/SystemData.cpp b/fdbclient/SystemData.cpp index 35d25c6364..ff1785c8e7 100644 --- a/fdbclient/SystemData.cpp +++ b/fdbclient/SystemData.cpp @@ -306,6 +306,10 @@ const UID dataDistributionModeLock = UID(6345,3425); // Client status info prefix const KeyRangeRef fdbClientInfoPrefixRange(LiteralStringRef("\xff\x02/fdbClientInfo/"), LiteralStringRef("\xff\x02/fdbClientInfo0")); +// Keyspace to maintain wall clock to version map +const KeyRangeRef timeKeeperPrefixRange(LiteralStringRef("\xff\x02/timeKeeper/map/"), LiteralStringRef("\xff\x02/timeKeeper/map0")); +const KeyRef timeKeeperVersionKey = LiteralStringRef("\xff\x02/timeKeeper/version"); + // Backup Log Mutation constant variables const KeyRef backupEnabledKey = LiteralStringRef("\xff/backupEnabled"); const KeyRangeRef backupLogKeys(LiteralStringRef("\xff\x02/blog/"), LiteralStringRef("\xff\x02/blog0")); diff --git a/fdbclient/SystemData.h b/fdbclient/SystemData.h index 22f92fd55d..9be7dd11c0 100644 --- a/fdbclient/SystemData.h +++ b/fdbclient/SystemData.h @@ -173,6 +173,10 @@ extern const KeyRangeRef applyMutationsKeyVersionCountRange; // FdbClient Info prefix extern const KeyRangeRef fdbClientInfoPrefixRange; +// Keyspace to maintain wall clock to version map +extern const KeyRangeRef timeKeeperPrefixRange; +extern const KeyRef timeKeeperVersionKey; + // Layer status metadata prefix extern const KeyRangeRef layerStatusMetaPrefixRange; diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp index e77c221822..53c120e7b1 100644 --- a/fdbserver/ClusterController.actor.cpp +++ b/fdbserver/ClusterController.actor.cpp @@ -37,6 +37,7 @@ #include "fdbclient/ReadYourWrites.h" #include "fdbrpc/Replication.h" #include "fdbrpc/ReplicationUtils.h" +#include "fdbclient/KeyBackedTypes.h" void failAfter( Future trigger, Endpoint e ); @@ -1356,6 +1357,69 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) { TEST(true); // Received an old worker registration request. } +#define TIME_KEEPER_VERSION LiteralStringRef("1") + +ACTOR Future timeKeeperSetVersion(ClusterControllerData *self) { + try { + loop { + state Reference tr = Reference( + new ReadYourWritesTransaction(self->cx)); + try { + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr->setOption(FDBTransactionOptions::LOCK_AWARE); + tr->set(timeKeeperVersionKey, TIME_KEEPER_VERSION); + Void _ = wait(tr->commit()); + break; + } catch (Error &e) { + Void _ = wait(tr->onError(e)); + } + } + } catch (Error & e) { + TraceEvent(SevWarnAlways, "TimeKeeperSetupVersionFailed").detail("cause", e.what()); + } + + return Void(); +} + +// This actor periodically gets read version and writes it to cluster with current timestamp as key. To avoid running +// out of space, it limits the max number of entries and clears old entries on each update. This mapping is used from +// backup and restore to get the version information for a timestamp. +ACTOR Future timeKeeper(ClusterControllerData *self) { + state KeyBackedMap versionMap(timeKeeperPrefixRange.begin); + + TraceEvent(SevInfo, "TimeKeeperStarted"); + + Void _ = wait(timeKeeperSetVersion(self)); + + loop { + try { + state Reference tr = Reference(new ReadYourWritesTransaction(self->cx)); + try { + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr->setOption(FDBTransactionOptions::LOCK_AWARE); + + Version v = wait(tr->getReadVersion()); + int64_t currentTime = (int64_t)now(); + versionMap.set(tr, currentTime, v); + + int64_t ttl = currentTime - SERVER_KNOBS->TIME_KEEPER_DELAY * SERVER_KNOBS->TIME_KEEPER_MAX_ENTRIES; + if (ttl > 0) { + versionMap.erase(tr, 0, ttl); + } + + Void _ = wait(tr->commit()); + } catch (Error &e) { + Void _ = wait(tr->onError(e)); + } + } catch (Error &e) { + // Failed to update time-version map even after retries, just ignore this iteration + TraceEvent(SevWarn, "TimeKeeperFailed").detail("cause", e.what()); + } + + Void _ = wait(delay(SERVER_KNOBS->TIME_KEEPER_DELAY)); + } +} + ACTOR Future statusServer(FutureStream< StatusRequest> requests, ClusterControllerData *self, ServerCoordinators coordinators) @@ -1551,6 +1615,7 @@ ACTOR Future clusterControllerCore( ClusterControllerFullInterface interf, addActor.send( clusterWatchDatabase( &self, &self.db ) ); // Start the master database addActor.send( self.updateWorkerList.init( self.db.db ) ); addActor.send( statusServer( interf.clientInterface.databaseStatus.getFuture(), &self, coordinators)); + addActor.send( timeKeeper(&self) ); addActor.send( monitorProcessClasses(&self) ); addActor.send( monitorClientTxnInfoConfigs(&self.db) ); //printf("%s: I am the cluster controller\n", g_network->getLocalAddress().toString().c_str()); diff --git a/fdbserver/Knobs.cpp b/fdbserver/Knobs.cpp index decfb89bd4..723f43c064 100644 --- a/fdbserver/Knobs.cpp +++ b/fdbserver/Knobs.cpp @@ -360,6 +360,10 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) { init( STATUS_MIN_TIME_BETWEEN_REQUESTS, 0.0 ); init( CONFIGURATION_ROWS_TO_FETCH, 20000 ); + // Timekeeper + init( TIME_KEEPER_DELAY, 10 ); + init( TIME_KEEPER_MAX_ENTRIES, 3600 * 24 * 30 * 6); if( randomize && BUGGIFY ) { TIME_KEEPER_MAX_ENTRIES = 2; } + if(clientKnobs) clientKnobs->IS_ACCEPTABLE_DELAY = clientKnobs->IS_ACCEPTABLE_DELAY*std::min(MAX_READ_TRANSACTION_LIFE_VERSIONS, MAX_WRITE_TRANSACTION_LIFE_VERSIONS)/(5.0*VERSIONS_PER_SECOND); } diff --git a/fdbserver/Knobs.h b/fdbserver/Knobs.h index e27787a468..eaf2a915de 100644 --- a/fdbserver/Knobs.h +++ b/fdbserver/Knobs.h @@ -302,6 +302,10 @@ public: double STATUS_MIN_TIME_BETWEEN_REQUESTS; int CONFIGURATION_ROWS_TO_FETCH; + // Timekeeper + int64_t TIME_KEEPER_DELAY; + int64_t TIME_KEEPER_MAX_ENTRIES; + ServerKnobs(bool randomize = false, ClientKnobs* clientKnobs = NULL); }; diff --git a/fdbserver/fdbserver.vcxproj b/fdbserver/fdbserver.vcxproj index 7b97f634cd..ec89c2276d 100644 --- a/fdbserver/fdbserver.vcxproj +++ b/fdbserver/fdbserver.vcxproj @@ -92,6 +92,7 @@ + diff --git a/fdbserver/fdbserver.vcxproj.filters b/fdbserver/fdbserver.vcxproj.filters index 38a460e641..486d5387b8 100644 --- a/fdbserver/fdbserver.vcxproj.filters +++ b/fdbserver/fdbserver.vcxproj.filters @@ -237,6 +237,9 @@ workloads + + workloads + workloads diff --git a/fdbserver/workloads/TimeKeeperCorrectness.actor.cpp b/fdbserver/workloads/TimeKeeperCorrectness.actor.cpp new file mode 100644 index 0000000000..6247b8bec4 --- /dev/null +++ b/fdbserver/workloads/TimeKeeperCorrectness.actor.cpp @@ -0,0 +1,144 @@ +/* + * TimeKeeperCorrectness.actor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "workloads.h" +#include "fdbclient/SystemData.h" +#include "fdbclient/KeyBackedTypes.h" +#include "fdbserver/Knobs.h" + +struct TimeKeeperCorrectnessWorkload : TestWorkload { + double testDuration; + std::map inMemTimeKeeper; + + TimeKeeperCorrectnessWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) { + testDuration = getOption( options, LiteralStringRef("testDuration"), 20.0 ); + } + + virtual std::string description() { + return "TimeKeeperCorrectness"; + } + + virtual Future setup(Database const& cx) { + return Void(); + } + + virtual void getMetrics(vector& m) { + } + + ACTOR static Future _start(Database cx, TimeKeeperCorrectnessWorkload *self) { + state Future testCompleted = delay(self->testDuration); + + TraceEvent(SevInfo, "TKCorrectness_start"); + + while (!testCompleted.isReady()) { + state Transaction tr(cx); + + try { + Version v = wait(tr.getReadVersion()); + self->inMemTimeKeeper[(int64_t) now()] = v; + } catch (Error & e) { + Void _ = wait(tr.onError(e)); + } + + // For every sample from Timekeeper collect two samples here. + Void _ = wait(delay(SERVER_KNOBS->TIME_KEEPER_DELAY / 2)); + } + + TraceEvent(SevInfo, "TKCorrectness_completed"); + return Void(); + } + + virtual Future start( Database const& cx ) { + return _start(cx, this); + } + + ACTOR static Future _check(Database cx, TimeKeeperCorrectnessWorkload *self) { + state KeyBackedMap dbTimeKeeper = KeyBackedMap(timeKeeperPrefixRange.begin); + state Reference tr = Reference(new ReadYourWritesTransaction(cx)); + + TraceEvent(SevInfo, "TKCorrectness_checkStart") + .detail("TIME_KEPER_MAX_ENTRIES", SERVER_KNOBS->TIME_KEEPER_MAX_ENTRIES) + .detail("TIME_KEEPER_DELAY", SERVER_KNOBS->TIME_KEEPER_DELAY); + + if (SERVER_KNOBS->TIME_KEEPER_DELAY < 2) { + TraceEvent(SevError, "TKCorrectness_tooSmallDelay").detail("found", SERVER_KNOBS->TIME_KEEPER_DELAY); + return false; + } + + loop { + try { + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr->setOption(FDBTransactionOptions::LOCK_AWARE); + + std::vector> futureItems = wait( + dbTimeKeeper.getRange(tr, ((int64_t) now()) + 1, Optional(), 1)); + if (!futureItems.empty()) { + TraceEvent(SevError, "TKCorrectness_FutureMappings").detail("count", futureItems.empty()); + return false; + } + + std::vector> allItems = wait( + dbTimeKeeper.getRange(tr, 0, Optional(), self->inMemTimeKeeper.size() + 2)); + for (auto item : allItems) { + self->inMemTimeKeeper[item.first] = item.second; + } + + if (allItems.size() > SERVER_KNOBS->TIME_KEEPER_MAX_ENTRIES) { + TraceEvent(SevError, "TKCorrectness_tooManyEntries") + .detail("expected", SERVER_KNOBS->TIME_KEEPER_MAX_ENTRIES) + .detail("found", allItems.size()); + return false; + } + + if (allItems.size() < self->testDuration / SERVER_KNOBS->TIME_KEEPER_DELAY) { + TraceEvent(SevWarnAlways, "TKCorrectness_tooFewEntries") + .detail("expected", self->testDuration / SERVER_KNOBS->TIME_KEEPER_DELAY) + .detail("found", allItems.size()); + } + + bool first = true; + std::pair prevItem; + for (auto item : self->inMemTimeKeeper) { + if (first) { + first = false; + } else if (prevItem.second > item.second) { + TraceEvent(SevError, "TKCorrectness_timeMismatch") + .detail("prevVersion", prevItem.second) + .detail("currentVersion", item.second); + return false; + } + + prevItem = item; + } + + TraceEvent(SevInfo, "TKCorrectness_passed"); + return true; + } catch (Error & e) { + Void _ = wait(tr->onError(e)); + } + } + } + + virtual Future check( Database const& cx ) { + return _check(cx, this); + } +}; + +WorkloadFactory TimeKeeperCorrectnessWorkloadFactory("TimeKeeperCorrectness"); diff --git a/tests/fast/TimeKeeperCorrectness.txt b/tests/fast/TimeKeeperCorrectness.txt new file mode 100644 index 0000000000..0910d7cb8a --- /dev/null +++ b/tests/fast/TimeKeeperCorrectness.txt @@ -0,0 +1,3 @@ +testTitle=TimeKeeperCorrectness + testName=TimeKeeperCorrectness + testDuration=40.0