Add fdbcli command to read/write version epoch (#6480)

* Initialize cluster version at wall-clock time

Previously, new clusters would begin at version 0. After this change,
clusters will initialize at a version matching wall-clock time. Instead
of using the Unix epoch (or Windows epoch), FDB clusters will use a new
epoch, defaulting to January 1, 2010, 01:00:00+00:00. In the future,
this base epoch will be modifiable through fdbcli, allowing
administrators to advance the cluster version.
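
For a rough sense of the arithmetic, here is a minimal sketch (not FDB code; the helper names are mine, and the January 2010 default is taken from the description above): with roughly one million versions per second, the expected cluster version is simply the number of microseconds elapsed since the version epoch.

```python
# Sketch only: how a wall-clock-based cluster version relates to the version
# epoch. Assumes ~1,000,000 versions per second and an epoch expressed as
# microseconds since the Unix epoch; names and values are illustrative.
import time
from datetime import datetime, timezone

VERSIONS_PER_SECOND = 1_000_000

# January 1, 2010, 01:00:00+00:00 as microseconds since the Unix epoch.
DEFAULT_VERSION_EPOCH = int(
    datetime(2010, 1, 1, 1, 0, 0, tzinfo=timezone.utc).timestamp() * VERSIONS_PER_SECOND
)

def expected_version(version_epoch_us: int = DEFAULT_VERSION_EPOCH) -> int:
    """Expected cluster version: wall-clock microseconds minus the version epoch."""
    now_us = int(time.time() * VERSIONS_PER_SECOND)
    return now_us - version_epoch_us

print(expected_version())
```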

Basing versions on wall-clock time allows different FDB clusters to share
data without running into version mismatches, since data copied between
clusters embeds versions that the destination cluster must be able to reach.

* Send version epoch to master

* Cleanup

* Update fdbserver/storageserver.actor.cpp

Co-authored-by: A.J. Beamon <aj.beamon@snowflake.com>

* Jump directly to expected version if possible

* Fix initial version issue on storage servers

* Add random recovery offset to start version in simulation

* Type fixes

* Disable reference time by default

Enable on a cluster using the fdbcli command `versionepoch add 0`.

* Use correct recoveryTransactionVersion when recovering

* Allow version epoch to be adjusted forwards (to decrease the version)

* Set version epoch in simulation

* Add quiet database check to ensure small version offset

* Fix initial version issue on storage servers

* Disable reference time by default

Enable on a cluster using the fdbcli command `versionepoch add 0`.

* Add fdbcli command to read/write version epoch

* Cause recovery when version epoch is set

* Handle optional version epoch key

* Add ability to clear the version epoch

This causes version advancement to revert to the old methodology, in which
versions attempt to advance by about a million versions per second instead
of trying to match the clock.
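
For intuition, a hedged sketch of the two advancement modes (the function name is mine; the knob names mirror the server knobs added in this change, and their values here are only illustrative): without a version epoch, the sequencer advances the version by roughly VERSIONS_PER_SECOND times the elapsed wall-clock time; with an epoch set, each increment is nudged toward the expected version, but only within a bounded offset of the normal rate.

```python
# Sketch of the two version-advancement modes; not the fdbserver implementation.
VERSIONS_PER_SECOND = 1_000_000
MAX_READ_TRANSACTION_LIFE_VERSIONS = 5 * VERSIONS_PER_SECOND
MAX_VERSION_RATE_MODIFIER = 0.1
MAX_VERSION_RATE_OFFSET = VERSIONS_PER_SECOND

def next_version(current, elapsed_seconds, now_us, version_epoch_us=None):
    # Normal advancement: about a million versions per second of elapsed time.
    to_add = max(1, min(MAX_READ_TRANSACTION_LIFE_VERSIONS,
                        int(VERSIONS_PER_SECOND * elapsed_seconds)))
    if version_epoch_us is None:
        return current + to_add
    # Reference-time mode: clamp the jump toward the expected version so the
    # effective rate stays within a bounded offset of the normal rate.
    expected = now_us - version_epoch_us
    max_offset = min(int(to_add * MAX_VERSION_RATE_MODIFIER), MAX_VERSION_RATE_OFFSET)
    return max(current + to_add - max_offset,
               min(expected, current + to_add + max_offset))
```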

* Update transaction access

* Modify version epoch to use microseconds instead of seconds

* Modify fdbcli version target API

Move the commands from the `versionepoch` top-level command to a new
`targetversion` top-level command.

* Add fdbcli tests for `targetversion`

* Temporarily disable targetversion cli tests

* Fix version epoch fetch issue

* Fix Arena issue

* Reduce max version jump in simulation to 1,000,000

* Rework fdbcli API

It now requires two commands to fully switch a cluster to using the
version epoch. First, enable the version epoch with `versionepoch
enable` or `versionepoch set <versionepoch>`. At this point, versions
will be given out at a faster or slower rate in an attempt to reach the
expected version. Then, run `versionepoch commit` to perform a one time
jump to the expected version. This is essentially irreversible.
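
A hedged walk-through of that two-command workflow, expressed in the style of the bundled Python fdbcli tests (which still use the older `targetversion` spelling); the `run_fdbcli_command` stand-in below and the example epoch value are assumptions, not part of this change:

```python
# Illustrative only: switching a cluster to the version epoch via fdbcli.
import subprocess

def run_fdbcli_command(command):
    # Minimal stand-in for the test helper: run fdbcli with --exec and return
    # its trimmed output. The fdbcli binary path and cluster file are assumed.
    out = subprocess.run(['fdbcli', '-C', 'fdb.cluster', '--exec', command],
                         capture_output=True, text=True)
    return out.stdout.strip()

print(run_fdbcli_command('versionepoch'))         # current vs. expected version, or "unset"
print(run_fdbcli_command('versionepoch enable'))  # enable with the default epoch (0)
# Or set an explicit epoch in microseconds since the Unix epoch, e.g. 2010-01-01 01:00 UTC:
# print(run_fdbcli_command('versionepoch set 1262307600000000'))
print(run_fdbcli_command('versionepoch commit'))  # one-time, irreversible jump to the expected version
print(run_fdbcli_command('versionepoch get'))     # read back the stored epoch
```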

* Temporarily disable old targetversion tests

* Cleanup

* Move version epoch buggify to sequencer

This will cause some issues with the QuietDatabase check for the version
offset - namely, it won't do anything, since the version epoch is not
being written to the txnStateStore in simulation. This will get fixed in
the future.

Co-authored-by: A.J. Beamon <aj.beamon@snowflake.com>
author Lukas Joswiak, 2022-04-08 12:33:19 -07:00 (committed by GitHub)
parent b742149869
commit 73a7c32982
23 changed files with 447 additions and 21 deletions


@ -260,6 +260,45 @@ def suspend(logger):
assert get_value_from_status_json(False, 'client', 'database_status', 'available')
def extract_version_epoch(cli_output):
return int(cli_output.split("\n")[-1].split(" ")[-1])
@enable_logging()
def targetversion(logger):
version1 = run_fdbcli_command('targetversion getepoch')
assert version1 == "Version epoch is unset"
version2 = int(run_fdbcli_command('getversion'))
logger.debug("read version: {}".format(version2))
assert version2 >= 0
# set the version epoch to the default value
logger.debug("setting version epoch to default")
run_fdbcli_command('targetversion add 0')
# get the version epoch
versionepoch1 = extract_version_epoch(run_fdbcli_command('targetversion getepoch'))
logger.debug("version epoch: {}".format(versionepoch1))
# make sure the version increased
version3 = int(run_fdbcli_command('getversion'))
logger.debug("read version: {}".format(version3))
assert version3 >= version2
# slightly increase the version epoch
versionepoch2 = extract_version_epoch(run_fdbcli_command("targetversion setepoch {}".format(versionepoch1 + 1000000)))
logger.debug("version epoch: {}".format(versionepoch2))
assert versionepoch2 == versionepoch1 + 1000000
# slightly decrease the version epoch
versionepoch3 = extract_version_epoch(run_fdbcli_command("targetversion add {}".format(-1000000)))
logger.debug("version epoch: {}".format(versionepoch3))
assert versionepoch3 == versionepoch2 - 1000000 == versionepoch1
# the versions should still be increasing
version4 = int(run_fdbcli_command('getversion'))
logger.debug("read version: {}".format(version4))
assert version4 >= version3
# clear the version epoch and make sure it is now unset
run_fdbcli_command("targetversion clearepoch")
version5 = run_fdbcli_command('targetversion getepoch')
assert version5 == "Version epoch is unset"
def get_value_from_status_json(retry, *args):
while True:
result = json.loads(run_fdbcli_command('status', 'json'))
@ -685,6 +724,9 @@ if __name__ == '__main__':
throttle()
triggerddteaminfolog()
tenants()
# TODO: similar to advanceversion, this seems to cause some issues, so disable for now
# This must go last, otherwise the version advancement can mess with the other tests
# targetversion()
else:
assert args.process_number > 1, "Process number should be positive"
coordinators()


@ -29,6 +29,7 @@ set(FDBCLI_SRCS
TriggerDDTeamInfoLogCommand.actor.cpp
TssqCommand.actor.cpp
Util.actor.cpp
VersionEpochCommand.actor.cpp
linenoise/linenoise.h)
if(NOT WIN32)


@ -0,0 +1,174 @@
/*
* VersionEpochCommand.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "boost/lexical_cast.hpp"
#include "fdbcli/fdbcli.actor.h"
#include "fdbclient/IClientApi.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "flow/Arena.h"
#include "flow/FastRef.h"
#include "flow/ThreadHelper.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
namespace fdb_cli {
const KeyRef versionEpochSpecialKey = LiteralStringRef("\xff\xff/management/version_epoch");
struct VersionInfo {
int64_t version;
int64_t expectedVersion;
};
ACTOR static Future<Optional<VersionInfo>> getVersionInfo(Reference<IDatabase> db) {
state Reference<ITransaction> tr = db->createTransaction();
loop {
try {
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
state Version rv = wait(safeThreadFutureToFuture(tr->getReadVersion()));
state ThreadFuture<Optional<Value>> versionEpochValFuture = tr->get(versionEpochKey);
Optional<Value> versionEpochVal = wait(safeThreadFutureToFuture(versionEpochValFuture));
if (!versionEpochVal.present()) {
return Optional<VersionInfo>();
}
int64_t versionEpoch = BinaryReader::fromStringRef<int64_t>(versionEpochVal.get(), Unversioned());
int64_t expected = g_network->timer() * CLIENT_KNOBS->CORE_VERSIONSPERSECOND - versionEpoch;
return VersionInfo{ rv, expected };
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
}
ACTOR static Future<Optional<int64_t>> getVersionEpoch(Reference<ITransaction> tr) {
loop {
try {
state ThreadFuture<Optional<Value>> versionEpochValFuture = tr->get(versionEpochSpecialKey);
Optional<Value> versionEpochVal = wait(safeThreadFutureToFuture(versionEpochValFuture));
return versionEpochVal.present() ? boost::lexical_cast<int64_t>(versionEpochVal.get().toString())
: Optional<int64_t>();
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
}
ACTOR Future<bool> versionEpochCommandActor(Reference<IDatabase> db, Database cx, std::vector<StringRef> tokens) {
if (tokens.size() <= 3) {
state Reference<ITransaction> tr = db->createTransaction();
if (tokens.size() == 1) {
Optional<VersionInfo> versionInfo = wait(getVersionInfo(db));
if (versionInfo.present()) {
int64_t diff = versionInfo.get().expectedVersion - versionInfo.get().version;
printf("Version: %" PRId64 "\n", versionInfo.get().version);
printf("Expected: %" PRId64 "\n", versionInfo.get().expectedVersion);
printf("Difference: %" PRId64 " (%.2fs)\n", diff, 1.0 * diff / CLIENT_KNOBS->VERSIONS_PER_SECOND);
} else {
printf("Version epoch is unset\n");
}
return true;
} else if (tokens.size() == 2 && tokencmp(tokens[1], "get")) {
Optional<int64_t> versionEpoch = wait(getVersionEpoch(db->createTransaction()));
if (versionEpoch.present()) {
printf("Current version epoch is %" PRId64 "\n", versionEpoch.get());
} else {
printf("Version epoch is unset\n");
}
return true;
} else if (tokens.size() == 2 && tokencmp(tokens[1], "disable")) {
// Clearing the version epoch means versions will no longer attempt
// to advance at the same rate as the clock. The current version
// will remain unchanged.
loop {
try {
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
Optional<int64_t> versionEpoch = wait(getVersionEpoch(db->createTransaction()));
if (!versionEpoch.present()) {
return true;
} else {
tr->clear(versionEpochSpecialKey);
wait(safeThreadFutureToFuture(tr->commit()));
}
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
} else if ((tokens.size() == 2 && tokencmp(tokens[1], "enable")) ||
(tokens.size() == 3 && tokencmp(tokens[1], "set"))) {
state int64_t v;
if (tokens.size() == 3) {
int n = 0;
if (sscanf(tokens[2].toString().c_str(), "%" SCNd64 "%n", &v, &n) != 1 || n != tokens[2].size()) {
printUsage(tokens[0]);
return false;
}
} else {
v = 0; // default version epoch
}
loop {
try {
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
Optional<int64_t> versionEpoch = wait(getVersionEpoch(tr));
if (!versionEpoch.present() || (versionEpoch.get() != v && tokens.size() == 3)) {
tr->set(versionEpochSpecialKey, BinaryWriter::toValue(v, Unversioned()));
wait(safeThreadFutureToFuture(tr->commit()));
} else {
printf("Version epoch enabled. Run `versionepoch commit` to irreversibly jump to the target "
"version\n");
return true;
}
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
} else if (tokens.size() == 2 && tokencmp(tokens[1], "commit")) {
Optional<VersionInfo> versionInfo = wait(getVersionInfo(db));
if (versionInfo.present()) {
wait(advanceVersion(cx, versionInfo.get().expectedVersion));
} else {
printf("Must set the version epoch before committing it (see `versionepoch enable`)\n");
}
return true;
}
}
printUsage(tokens[0]);
return false;
}
CommandFactory versionEpochFactory(
"versionepoch",
CommandHelp("versionepoch [<enable|commit|set|disable> [EPOCH]]",
"Read or write the version epoch",
"If no arguments are specified, reports the offset between the expected version "
"and the actual version. Otherwise, enables, disables, or commits the version epoch. "
"Setting the version epoch can be irreversible since it can cause a large verison jump. "
"Thus, the version epoch must first by enabled with the enable or set command. This "
"causes a recovery. Once the version epoch has been set, versions may be given out at "
"a faster or slower rate to attempt to match the actual version to the expected version, "
"based on the version epoch. After setting the version, run the commit command to perform "
"a one time jump to the expected version. This is useful when there is a very large gap "
"between the current version and the expected version. Note that once a version jump has "
"occurred, it cannot be undone. Run this command without any arguments to see the current "
"and expected version."));
} // namespace fdb_cli


@ -1646,6 +1646,13 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
continue;
}
if (tokencmp(tokens[0], "versionepoch")) {
bool _result = wait(makeInterruptable(versionEpochCommandActor(db, localDb, tokens)));
if (!_result)
is_error = true;
continue;
}
if (tokencmp(tokens[0], "kill")) {
getTransaction(db, managementTenant, tr, options, intrans);
bool _result = wait(makeInterruptable(killCommandActor(db, tr, tokens, &address_interface)));


@ -210,6 +210,10 @@ ACTOR Future<bool> throttleCommandActor(Reference<IDatabase> db, std::vector<Str
ACTOR Future<bool> triggerddteaminfologCommandActor(Reference<IDatabase> db);
// tssq command
ACTOR Future<bool> tssqCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens);
// versionepoch command
ACTOR Future<bool> versionEpochCommandActor(Reference<IDatabase> db, Database cx, std::vector<StringRef> tokens);
// targetversion command
ACTOR Future<bool> targetVersionCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens);
} // namespace fdb_cli


@ -22,6 +22,7 @@
#define FDBCLIENT_FDBTYPES_H
#include <algorithm>
#include <cinttypes>
#include <set>
#include <string>
#include <vector>


@ -1519,6 +1519,12 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnection
std::make_unique<AdvanceVersionImpl>(
singleKeyRange(LiteralStringRef("min_required_commit_version"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
registerSpecialKeySpaceModule(
SpecialKeySpace::MODULE::MANAGEMENT,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<VersionEpochImpl>(
singleKeyRange(LiteralStringRef("version_epoch"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
registerSpecialKeySpaceModule(
SpecialKeySpace::MODULE::MANAGEMENT,
SpecialKeySpace::IMPLTYPE::READWRITE,


@ -36,6 +36,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( MAX_WRITE_TRANSACTION_LIFE_VERSIONS, 5 * VERSIONS_PER_SECOND ); if (randomize && BUGGIFY) MAX_WRITE_TRANSACTION_LIFE_VERSIONS=std::max<int>(1, 1 * VERSIONS_PER_SECOND);
init( MAX_COMMIT_BATCH_INTERVAL, 2.0 ); if( randomize && BUGGIFY ) MAX_COMMIT_BATCH_INTERVAL = 0.5; // Each commit proxy generates a CommitTransactionBatchRequest at least this often, so that versions always advance smoothly
MAX_COMMIT_BATCH_INTERVAL = std::min(MAX_COMMIT_BATCH_INTERVAL, MAX_READ_TRANSACTION_LIFE_VERSIONS/double(2*VERSIONS_PER_SECOND)); // Ensure that the proxy commits 2 times every MAX_READ_TRANSACTION_LIFE_VERSIONS, otherwise the master will not give out versions fast enough
init( MAX_VERSION_RATE_MODIFIER, 0.1 );
init( MAX_VERSION_RATE_OFFSET, VERSIONS_PER_SECOND ); // If the calculated version is more than this amount away from the expected version, it will be clamped to this value. This prevents huge version jumps.
// TLogs
init( TLOG_TIMEOUT, 0.4 ); //cannot buggify because of availability


@ -39,6 +39,8 @@ public:
int64_t MAX_WRITE_TRANSACTION_LIFE_VERSIONS;
double MAX_COMMIT_BATCH_INTERVAL; // Each commit proxy generates a CommitTransactionBatchRequest at least this
// often, so that versions always advance smoothly
double MAX_VERSION_RATE_MODIFIER;
int64_t MAX_VERSION_RATE_OFFSET;
// TLogs
bool PEEK_USING_STREAMING;


@ -106,6 +106,8 @@ std::unordered_map<std::string, KeyRange> SpecialKeySpace::managementApiCommandT
{ "advanceversion",
singleKeyRange(LiteralStringRef("min_required_commit_version"))
.withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) },
{ "versionepoch",
singleKeyRange(LiteralStringRef("version_epoch")).withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) },
{ "profile",
KeyRangeRef(LiteralStringRef("profiling/"), LiteralStringRef("profiling0"))
.withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) },
@ -1909,6 +1911,42 @@ Future<Optional<std::string>> AdvanceVersionImpl::commit(ReadYourWritesTransacti
return Optional<std::string>();
}
ACTOR static Future<RangeResult> getVersionEpochActor(ReadYourWritesTransaction* ryw, KeyRangeRef kr) {
ryw->getTransaction().setOption(FDBTransactionOptions::LOCK_AWARE);
ryw->getTransaction().setOption(FDBTransactionOptions::RAW_ACCESS);
Optional<Value> val = wait(ryw->getTransaction().get(versionEpochKey));
RangeResult result;
if (val.present()) {
int64_t versionEpoch = BinaryReader::fromStringRef<int64_t>(val.get(), Unversioned());
ValueRef version(result.arena(), boost::lexical_cast<std::string>(versionEpoch));
result.push_back_deep(result.arena(), KeyValueRef(kr.begin, version));
}
return result;
}
VersionEpochImpl::VersionEpochImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {}
Future<RangeResult> VersionEpochImpl::getRange(ReadYourWritesTransaction* ryw,
KeyRangeRef kr,
GetRangeLimits limitsHint) const {
ASSERT(kr == getKeyRange());
return getVersionEpochActor(ryw, kr);
}
Future<Optional<std::string>> VersionEpochImpl::commit(ReadYourWritesTransaction* ryw) {
auto versionEpoch =
ryw->getSpecialKeySpaceWriteMap()[SpecialKeySpace::getManagementApiCommandPrefix("versionepoch")].second;
if (versionEpoch.present()) {
int64_t epoch = BinaryReader::fromStringRef<int64_t>(versionEpoch.get(), Unversioned());
ryw->getTransaction().setOption(FDBTransactionOptions::LOCK_AWARE);
ryw->getTransaction().setOption(FDBTransactionOptions::RAW_ACCESS);
ryw->getTransaction().set(versionEpochKey, BinaryWriter::toValue(epoch, Unversioned()));
} else {
ryw->getTransaction().clear(versionEpochKey);
}
return Optional<std::string>();
}
ClientProfilingImpl::ClientProfilingImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {}
ACTOR static Future<RangeResult> ClientProfilingGetRangeActor(ReadYourWritesTransaction* ryw,


@ -476,6 +476,15 @@ public:
Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;
};
class VersionEpochImpl : public SpecialKeyRangeRWImpl {
public:
explicit VersionEpochImpl(KeyRangeRef kr);
Future<RangeResult> getRange(ReadYourWritesTransaction* ryw,
KeyRangeRef kr,
GetRangeLimits limitsHint) const override;
Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;
};
class ClientProfilingImpl : public SpecialKeyRangeRWImpl {
public:
explicit ClientProfilingImpl(KeyRangeRef kr);


@ -823,6 +823,7 @@ std::vector<std::pair<UID, Version>> decodeBackupStartedValue(const ValueRef& va
const KeyRef coordinatorsKey = LiteralStringRef("\xff/coordinators");
const KeyRef logsKey = LiteralStringRef("\xff/logs");
const KeyRef minRequiredCommitVersionKey = LiteralStringRef("\xff/minRequiredCommitVersion");
const KeyRef versionEpochKey = LiteralStringRef("\xff/versionEpoch");
const KeyRef globalKeysPrefix = LiteralStringRef("\xff/globals");
const KeyRef lastEpochEndKey = LiteralStringRef("\xff/globals/lastEpochEnd");


@ -348,6 +348,11 @@ extern const KeyRef logsKey;
// Used during backup/recovery to restrict version requirements
extern const KeyRef minRequiredCommitVersionKey;
// "\xff/versionEpochKey" = "[[uint64_t]]"
// Defines the base epoch representing version 0. The value itself is the
// number of microseconds since the Unix epoch.
extern const KeyRef versionEpochKey;
const Value logsValue(const std::vector<std::pair<UID, NetworkAddress>>& logs,
const std::vector<std::pair<UID, NetworkAddress>>& oldLogs);
std::pair<std::vector<std::pair<UID, NetworkAddress>>, std::vector<std::pair<UID, NetworkAddress>>> decodeLogsValue(


@ -590,6 +590,18 @@ private:
TEST(true); // Recovering at a higher version.
}
void checkSetVersionEpochKey(MutationRef m) {
if (m.param1 != versionEpochKey) {
return;
}
int64_t versionEpoch = BinaryReader::fromStringRef<int64_t>(m.param2, Unversioned());
TraceEvent("VersionEpoch", dbgid).detail("Epoch", versionEpoch);
if (!initialCommit)
txnStateStore->set(KeyValueRef(m.param1, m.param2));
confChange = true;
TEST(true); // Setting version epoch
}
void checkSetWriteRecoverKey(MutationRef m) {
if (m.param1 != writeRecoveryKey) {
return;
@ -957,6 +969,16 @@ private:
}
}
void checkClearVersionEpochKeys(MutationRef m, KeyRangeRef range) {
if (!range.contains(versionEpochKey)) {
return;
}
if (!initialCommit)
txnStateStore->clear(singleKeyRange(versionEpochKey));
TraceEvent("MutationRequiresRestart", dbgid).detail("M", m);
confChange = true;
}
void checkClearTenantMapPrefix(KeyRangeRef range) {
if (tenantMapKeys.intersects(range)) {
if (tenantMap) {
@ -1119,6 +1141,7 @@ public:
checkSetGlobalKeys(m);
checkSetWriteRecoverKey(m);
checkSetMinRequiredCommitVersionKey(m);
checkSetVersionEpochKey(m);
checkSetTenantMapPrefix(m);
checkSetOtherKeys(m);
} else if (m.type == MutationRef::ClearRange && isSystemKey(m.param2)) {
@ -1135,6 +1158,7 @@ public:
checkClearLogRangesRange(range);
checkClearTssMappingKeys(m, range);
checkClearTssQuarantineKeys(m, range);
checkClearVersionEpochKeys(m, range);
checkClearTenantMapPrefix(range);
checkClearMiscRangeKeys(range);
}


@ -342,6 +342,7 @@ ACTOR Future<Void> newSeedServers(Reference<ClusterRecoveryData> self,
isr.reqId = deterministicRandom()->randomUniqueID();
isr.interfaceId = deterministicRandom()->randomUniqueID();
isr.clusterId = self->clusterId;
isr.initialClusterVersion = self->recoveryTransactionVersion;
ErrorOr<InitializeStorageReply> newServer = wait(recruits.storageServers[idx].storage.tryGetReply(isr));
@ -989,8 +990,12 @@ ACTOR Future<std::vector<Standalone<CommitTransactionRef>>> recruitEverything(
newTLogServers(self, recruits, oldLogSystem, &confChanges));
// Update recovery related information to the newly elected sequencer (master) process.
wait(brokenPromiseToNever(self->masterInterface.updateRecoveryData.getReply(UpdateRecoveryDataRequest(
self->recoveryTransactionVersion, self->lastEpochEnd, self->commitProxies, self->resolvers))));
wait(brokenPromiseToNever(
self->masterInterface.updateRecoveryData.getReply(UpdateRecoveryDataRequest(self->recoveryTransactionVersion,
self->lastEpochEnd,
self->commitProxies,
self->resolvers,
self->versionEpoch))));
return confChanges;
}
@ -1036,6 +1041,14 @@ ACTOR Future<Void> readTransactionSystemState(Reference<ClusterRecoveryData> sel
self->txnStateStore =
keyValueStoreLogSystem(self->txnStateLogAdapter, self->dbgid, self->memoryLimit, false, false, true);
// Version 0 occurs at the version epoch. The version epoch is the number
// of microseconds since the Unix epoch. It can be set through fdbcli.
self->versionEpoch.reset();
Optional<Standalone<StringRef>> versionEpochValue = wait(self->txnStateStore->readValue(versionEpochKey));
if (versionEpochValue.present()) {
self->versionEpoch = BinaryReader::fromStringRef<int64_t>(versionEpochValue.get(), Unversioned());
}
// Versionstamped operations (particularly those applied from DR) define a minimum commit version
// that we may recover to, as they embed the version in user-readable data and require that no
// transactions will be committed at a lower version.
@ -1046,6 +1059,11 @@ ACTOR Future<Void> readTransactionSystemState(Reference<ClusterRecoveryData> sel
if (requiredCommitVersion.present()) {
minRequiredCommitVersion = BinaryReader::fromStringRef<Version>(requiredCommitVersion.get(), Unversioned());
}
if (g_network->isSimulated() && self->versionEpoch.present()) {
minRequiredCommitVersion = std::max(
minRequiredCommitVersion,
static_cast<Version>(g_network->timer() * SERVER_KNOBS->VERSIONS_PER_SECOND - self->versionEpoch.get()));
}
// Recover version info
self->lastEpochEnd = oldLogSystem->getEnd() - 1;
@ -1058,14 +1076,14 @@ ACTOR Future<Void> readTransactionSystemState(Reference<ClusterRecoveryData> sel
self->recoveryTransactionVersion = self->lastEpochEnd + SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT;
}
if (BUGGIFY) {
self->recoveryTransactionVersion +=
deterministicRandom()->randomInt64(0, SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT);
}
if (self->recoveryTransactionVersion < minRequiredCommitVersion)
self->recoveryTransactionVersion = minRequiredCommitVersion;
}
if (BUGGIFY) {
self->recoveryTransactionVersion += deterministicRandom()->randomInt64(0, 10000000);
}
TraceEvent(getRecoveryEventName(ClusterRecoveryEventType::CLUSTER_RECOVERY_RECOVERED_EVENT_NAME).c_str(),
self->dbgid)
.detail("LastEpochEnd", self->lastEpochEnd)


@ -169,6 +169,7 @@ struct ClusterRecoveryData : NonCopyable, ReferenceCounted<ClusterRecoveryData>
AsyncTrigger registrationTrigger;
Version lastEpochEnd, // The last version in the old epoch not (to be) rolled back in this recovery
recoveryTransactionVersion; // The first version in this epoch
Optional<int64_t> versionEpoch; // The epoch which all versions are based off of
double lastCommitTime;
Version liveCommittedVersion; // The largest live committed version reported by commit proxies.
@ -209,6 +210,7 @@ struct ClusterRecoveryData : NonCopyable, ReferenceCounted<ClusterRecoveryData>
std::map<UID, CommitProxyVersionReplies> lastCommitProxyVersionReplies;
UID clusterId;
Version initialClusterVersion = -1;
Standalone<StringRef> dbId;
MasterInterface masterInterface;


@ -157,19 +157,21 @@ struct UpdateRecoveryDataRequest {
Version lastEpochEnd;
std::vector<CommitProxyInterface> commitProxies;
std::vector<ResolverInterface> resolvers;
Optional<int64_t> versionEpoch;
ReplyPromise<Void> reply;
UpdateRecoveryDataRequest() = default;
UpdateRecoveryDataRequest(Version recoveryTransactionVersion,
Version lastEpochEnd,
const std::vector<CommitProxyInterface>& commitProxies,
const std::vector<ResolverInterface>& resolvers)
const std::vector<ResolverInterface>& resolvers,
Optional<int64_t> versionEpoch)
: recoveryTransactionVersion(recoveryTransactionVersion), lastEpochEnd(lastEpochEnd),
commitProxies(commitProxies), resolvers(resolvers) {}
commitProxies(commitProxies), resolvers(resolvers), versionEpoch(versionEpoch) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, recoveryTransactionVersion, lastEpochEnd, commitProxies, resolvers, reply);
serializer(ar, recoveryTransactionVersion, lastEpochEnd, commitProxies, resolvers, versionEpoch, reply);
}
};


@ -598,6 +598,31 @@ ACTOR Future<bool> getStorageServersRecruiting(Database cx, WorkerInterface dist
}
}
// Gets the difference between the expected version (based on the version
// epoch) and the actual version.
ACTOR Future<int64_t> getVersionOffset(Database cx,
WorkerInterface distributorWorker,
Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
loop {
state Transaction tr(cx);
try {
TraceEvent("GetVersionOffset").detail("Stage", "ReadingVersionEpoch");
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
state Version rv = wait(tr.getReadVersion());
Optional<Standalone<StringRef>> versionEpochValue = wait(tr.get(versionEpochKey));
if (!versionEpochValue.present()) {
return 0;
}
int64_t versionEpoch = BinaryReader::fromStringRef<int64_t>(versionEpochValue.get(), Unversioned());
int64_t versionOffset = abs(rv - (g_network->timer() * SERVER_KNOBS->VERSIONS_PER_SECOND - versionEpoch));
return versionOffset;
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
ACTOR Future<Void> repairDeadDatacenter(Database cx,
Reference<AsyncVar<ServerDBInfo> const> dbInfo,
std::string context) {
@ -652,7 +677,8 @@ ACTOR Future<Void> waitForQuietDatabase(Database cx,
int64_t maxTLogQueueGate = 5e6,
int64_t maxStorageServerQueueGate = 5e6,
int64_t maxDataDistributionQueueSize = 0,
int64_t maxPoppedVersionLag = 30e6) {
int64_t maxPoppedVersionLag = 30e6,
int64_t maxVersionOffset = 1e6) {
state Future<Void> reconfig =
reconfigureAfter(cx, 100 + (deterministicRandom()->random01() * 100), dbInfo, "QuietDatabase");
state Future<int64_t> dataInFlight;
@ -662,6 +688,7 @@ ACTOR Future<Void> waitForQuietDatabase(Database cx,
state Future<int64_t> storageQueueSize;
state Future<bool> dataDistributionActive;
state Future<bool> storageServersRecruiting;
state Future<int64_t> versionOffset;
auto traceMessage = "QuietDatabase" + phase + "Begin";
TraceEvent(traceMessage.c_str()).log();
@ -698,10 +725,11 @@ ACTOR Future<Void> waitForQuietDatabase(Database cx,
storageQueueSize = getMaxStorageServerQueueSize(cx, dbInfo);
dataDistributionActive = getDataDistributionActive(cx, distributorWorker);
storageServersRecruiting = getStorageServersRecruiting(cx, distributorWorker, distributorUID);
versionOffset = getVersionOffset(cx, distributorWorker, dbInfo);
wait(success(dataInFlight) && success(tLogQueueInfo) && success(dataDistributionQueueSize) &&
success(teamCollectionValid) && success(storageQueueSize) && success(dataDistributionActive) &&
success(storageServersRecruiting));
success(storageServersRecruiting) && success(versionOffset));
TraceEvent(("QuietDatabase" + phase).c_str())
.detail("DataInFlight", dataInFlight.get())
@ -717,13 +745,17 @@ ACTOR Future<Void> waitForQuietDatabase(Database cx,
.detail("MaxStorageServerQueueGate", maxStorageServerQueueGate)
.detail("DataDistributionActive", dataDistributionActive.get())
.detail("StorageServersRecruiting", storageServersRecruiting.get())
.detail("RecoveryCount", dbInfo->get().recoveryCount)
.detail("VersionOffset", versionOffset.get())
.detail("NumSuccesses", numSuccesses);
maxVersionOffset += dbInfo->get().recoveryCount * SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT;
if (dataInFlight.get() > dataInFlightGate || tLogQueueInfo.get().first > maxTLogQueueGate ||
tLogQueueInfo.get().second > maxPoppedVersionLag ||
dataDistributionQueueSize.get() > maxDataDistributionQueueSize ||
storageQueueSize.get() > maxStorageServerQueueGate || !dataDistributionActive.get() ||
storageServersRecruiting.get() || !teamCollectionValid.get()) {
storageServersRecruiting.get() || versionOffset.get() > maxVersionOffset ||
!teamCollectionValid.get()) {
wait(delay(1.0));
numSuccesses = 0;
@ -779,6 +811,10 @@ ACTOR Future<Void> waitForQuietDatabase(Database cx,
auto key = "NotReady" + std::to_string(notReadyCount++);
evt.detail(key.c_str(), "storageServersRecruiting");
}
if (versionOffset.isReady() && versionOffset.isError()) {
auto key = "NotReady" + std::to_string(notReadyCount++);
evt.detail(key.c_str(), "versionOffset");
}
wait(delay(1.0));
numSuccesses = 0;
}
@ -794,7 +830,8 @@ Future<Void> quietDatabase(Database const& cx,
int64_t maxTLogQueueGate,
int64_t maxStorageServerQueueGate,
int64_t maxDataDistributionQueueSize,
int64_t maxPoppedVersionLag) {
int64_t maxPoppedVersionLag,
int64_t maxVersionOffset) {
return waitForQuietDatabase(cx,
dbInfo,
phase,
@ -802,5 +839,6 @@ Future<Void> quietDatabase(Database const& cx,
maxTLogQueueGate,
maxStorageServerQueueGate,
maxDataDistributionQueueSize,
maxPoppedVersionLag);
maxPoppedVersionLag,
maxVersionOffset);
}


@ -767,11 +767,13 @@ struct InitializeStorageRequest {
Optional<std::pair<UID, Version>>
tssPairIDAndVersion; // Only set if recruiting a tss. Will be the UID and Version of its SS pair.
UID clusterId; // Unique cluster identifier. Only needed at recruitment, will be read from txnStateStore on recovery
Version initialClusterVersion;
ReplyPromise<InitializeStorageReply> reply;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, seedTag, reqId, interfaceId, storeType, reply, tssPairIDAndVersion, clusterId);
serializer(
ar, seedTag, reqId, interfaceId, storeType, reply, tssPairIDAndVersion, clusterId, initialClusterVersion);
}
};
@ -1086,6 +1088,7 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
StorageServerInterface ssi,
Tag seedTag,
UID clusterId,
Version startVersion,
Version tssSeedVersion,
ReplyPromise<InitializeStorageReply> recruitReply,
Reference<AsyncVar<ServerDBInfo> const> db,


@ -18,6 +18,7 @@
* limitations under the License.
*/
#include <algorithm>
#include <iterator>
#include "fdbrpc/sim_validation.h"
@ -47,6 +48,7 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
Version version; // The last version assigned to a proxy by getVersion()
double lastVersionTime;
Optional<Version> referenceVersion;
std::map<UID, CommitProxyVersionReplies> lastCommitProxyVersionReplies;
@ -125,12 +127,36 @@ ACTOR Future<Void> getVersion(Reference<MasterData> self, GetCommitVersionReques
if (BUGGIFY) {
t1 = self->lastVersionTime;
}
rep.prevVersion = self->version;
self->version +=
// Versions should roughly follow wall-clock time, based on the
// system clock of the current machine and an FDB-specific epoch.
// Calculate the expected version and determine whether we need to
// hand out versions faster or slower to stay in sync with the
// clock.
Version toAdd =
std::max<Version>(1,
std::min<Version>(SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS,
SERVER_KNOBS->VERSIONS_PER_SECOND * (t1 - self->lastVersionTime)));
rep.prevVersion = self->version;
if (self->referenceVersion.present()) {
Version expected =
g_network->timer() * SERVER_KNOBS->VERSIONS_PER_SECOND - self->referenceVersion.get();
// Attempt to jump directly to the expected version. But make
// sure that versions are still being handed out at a rate
// around VERSIONS_PER_SECOND. This rate is scaled depending on
// how far off the calculated version is from the expected
// version.
int64_t maxOffset = std::min(static_cast<int64_t>(toAdd * SERVER_KNOBS->MAX_VERSION_RATE_MODIFIER),
SERVER_KNOBS->MAX_VERSION_RATE_OFFSET);
self->version =
std::clamp(expected, self->version + toAdd - maxOffset, self->version + toAdd + maxOffset);
ASSERT_GT(self->version, rep.prevVersion);
} else {
self->version = self->version + toAdd;
}
TEST(self->version - rep.prevVersion == 1); // Minimum possible version gap
bool maxVersionGap = self->version - rep.prevVersion == SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS;
@ -214,7 +240,8 @@ ACTOR Future<Void> updateRecoveryData(Reference<MasterData> self) {
TraceEvent("UpdateRecoveryData", self->dbgid)
.detail("RecoveryTxnVersion", req.recoveryTransactionVersion)
.detail("LastEpochEnd", req.lastEpochEnd)
.detail("NumCommitProxies", req.commitProxies.size());
.detail("NumCommitProxies", req.commitProxies.size())
.detail("VersionEpoch", req.versionEpoch);
if (self->recoveryTransactionVersion == invalidVersion ||
req.recoveryTransactionVersion > self->recoveryTransactionVersion) {
@ -230,6 +257,16 @@ ACTOR Future<Void> updateRecoveryData(Reference<MasterData> self) {
self->lastCommitProxyVersionReplies[p.id()] = CommitProxyVersionReplies();
}
}
if (req.versionEpoch.present()) {
self->referenceVersion = req.versionEpoch.get();
} else if (BUGGIFY) {
// Cannot use a positive version epoch in simulation because of the
// clock starting at 0. A positive version epoch would mean the initial
// cluster version was negative.
// TODO: Increase the size of this interval after fixing the issue
// with restoring ranges with large version gaps.
self->referenceVersion = deterministicRandom()->randomInt64(-1e6, 0);
}
self->resolutionBalancer.setCommitProxies(req.commitProxies);
self->resolutionBalancer.setResolvers(req.resolvers);


@ -795,6 +795,9 @@ public:
Reference<ILogSystem::IPeekCursor> logCursor;
Promise<UID> clusterId;
// The version the cluster starts on. This value is not persisted and may
// not be valid after a recovery.
Version initialClusterVersion = invalidVersion;
UID thisServerID;
Optional<UID> tssPairID; // if this server is a tss, this is the id of its (ss) pair
Optional<UID> ssPairID; // if this server is an ss, this is the id of its (tss) pair
@ -5849,7 +5852,8 @@ void changeServerKeys(StorageServer* data,
data->watches.triggerRange(range.begin, range.end);
} else if (!dataAvailable) {
// SOMEDAY: Avoid restarting adding/transferred shards
if (version == 0) { // bypass fetchkeys; shard is known empty at version 0
// bypass fetchkeys; shard is known empty at initial cluster version
if (version == data->initialClusterVersion - 1) {
TraceEvent("ChangeServerKeysInitialRange", data->thisServerID)
.detail("Begin", range.begin)
.detail("End", range.end);
@ -6734,7 +6738,7 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
.detail("Version", cloneCursor2->version().toString());
} else if (ver != invalidVersion) { // This change belongs to a version < minVersion
DEBUG_MUTATION("SSPeek", ver, msg, data->thisServerID);
if (ver == 1) {
if (ver == data->initialClusterVersion) {
//TraceEvent("SSPeekMutation", data->thisServerID).log();
// The following trace event may produce a value with special characters
TraceEvent("SSPeekMutation", data->thisServerID)
@ -6850,6 +6854,7 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
proposedOldestVersion = std::min(proposedOldestVersion, data->version.get() - 1);
proposedOldestVersion = std::max(proposedOldestVersion, data->oldestVersion.get());
proposedOldestVersion = std::max(proposedOldestVersion, data->desiredOldestVersion.get());
proposedOldestVersion = std::max(proposedOldestVersion, data->initialClusterVersion);
//TraceEvent("StorageServerUpdated", data->thisServerID).detail("Ver", ver).detail("DataVersion", data->version.get())
// .detail("LastTLogVersion", data->lastTLogVersion).detail("NewOldest",
@ -8715,6 +8720,7 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
StorageServerInterface ssi,
Tag seedTag,
UID clusterId,
Version startVersion,
Version tssSeedVersion,
ReplyPromise<InitializeStorageReply> recruitReply,
Reference<AsyncVar<ServerDBInfo> const> db,
@ -8722,6 +8728,7 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
state StorageServer self(persistentData, db, ssi);
state Future<Void> ssCore;
self.clusterId.send(clusterId);
self.initialClusterVersion = startVersion;
if (ssi.isTss()) {
self.setTssPair(ssi.tssPairID.get());
ASSERT(self.isTss());


@ -2183,6 +2183,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
[&req](const auto& p) { return p.second != req.storeType; }) ||
req.seedTag != invalidTag)) {
ASSERT(req.clusterId.isValid());
ASSERT(req.initialClusterVersion >= 0);
LocalLineage _;
getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::Storage;
bool isTss = req.tssPairIDAndVersion.present();
@ -2244,6 +2245,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
recruited,
req.seedTag,
req.clusterId,
req.initialClusterVersion,
isTss ? req.tssPairIDAndVersion.get().second : 0,
storageReady,
dbInfo,


@ -235,7 +235,8 @@ Future<Void> quietDatabase(Database const& cx,
int64_t maxTLogQueueGate = 5e6,
int64_t maxStorageServerQueueGate = 5e6,
int64_t maxDataDistributionQueueSize = 0,
int64_t maxPoppedVersionLag = 30e6);
int64_t maxPoppedVersionLag = 30e6,
int64_t maxVersionOffset = 1e6);
/**
* A utility function for testing error situations. It succeeds if the given test