Merge pull request #5713 from liquid-helium/clean-sim-test-data-loss

Reproduced user data loss incident, and tested the improved exclude tool
This commit is contained in:
He Liu 2021-10-21 14:14:10 -07:00 committed by GitHub
commit 784576f214
8 changed files with 317 additions and 15 deletions

View File

@@ -164,6 +164,10 @@ If the ``failed`` keyword is specified, the address is marked as failed and adde
For more information on excluding servers, see :ref:`removing-machines-from-a-cluster`.
+ Warning about potential data loss with the ``failed`` option: if a server is the last one in some team(s), excluding it with ``failed`` will lose all data in those team(s); hence ``failed`` should be set only when the server(s) have permanently failed.
+ In the case where all servers of a team have permanently failed, excluding all of them will clean up the corresponding keyrange and fix the invalid metadata. The keyrange will then be assigned to a new team as an empty shard.
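For concreteness, a hypothetical fdbcli session for this scenario might look like the sketch below (the addresses are made up, and exact prompts and output vary by version); `exclude failed` is meant for servers that are permanently gone, and adding `FORCE` skips the safety checks once the operator is certain the data is already lost:

    fdb> exclude failed 10.0.4.1:4500 10.0.4.2:4500
    fdb> exclude FORCE failed 10.0.4.3:4500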
exit
----

View File

@@ -393,5 +393,11 @@ CommandFactory excludeFactory(
"command returns \nimmediately without checking if the exclusions have completed successfully.\n"
"If 'FORCE' is set, the command does not perform safety checks before excluding.\n"
"If 'failed' is set, the transaction log queue is dropped pre-emptively before waiting\n"
- "for data movement to finish and the server cannot be included again."));
+ "for data movement to finish and the server cannot be included again."
+ "\n\nWARNING of potential data loss:\n"
+ "If a to-be-excluded server is the last server of some team(s), and 'failed' is set, the data in the team(s) "
+ "will be lost. 'failed' should be set only if the server(s) have permanently failed.\n"
+ "In the case where all servers of a team have permanently failed, excluding all of them will clean up the "
+ "corresponding keyrange and fix the invalid metadata. The keyrange will then be assigned to a new team "
+ "as an empty shard."));
} // namespace fdb_cli

View File

@@ -170,6 +170,7 @@ set(FDBSERVER_SRCS
workloads/CpuProfiler.actor.cpp
workloads/Cycle.actor.cpp
workloads/DataDistributionMetrics.actor.cpp
+ workloads/DataLossRecovery.actor.cpp
workloads/DDBalance.actor.cpp
workloads/DDMetrics.actor.cpp
workloads/DDMetricsExclude.actor.cpp

View File

@@ -922,14 +922,19 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
// Prefer a healthy team not containing excludeServer.
if (candidates.size() > 0) {
- return teams[deterministicRandom()->randomInt(0, candidates.size())]->getServerIDs();
- }
- // The backup choice is a team with at least one server besides excludeServer, in this
- // case, the team will be possibily relocated to a healthy destination later by DD.
- if (backup.size() > 0) {
- std::vector<UID> res = teams[deterministicRandom()->randomInt(0, backup.size())]->getServerIDs();
- std::remove(res.begin(), res.end(), excludeServer);
+ return teams[candidates[deterministicRandom()->randomInt(0, candidates.size())]]->getServerIDs();
+ } else if (backup.size() > 0) {
+     // The backup choice is a team with at least one server besides excludeServer; in this
+     // case, the team will possibly be relocated to a healthy destination later by DD.
+     std::vector<UID> servers =
+         teams[backup[deterministicRandom()->randomInt(0, backup.size())]]->getServerIDs();
+     std::vector<UID> res;
+     for (const UID& id : servers) {
+         if (id != excludeServer) {
+             res.push_back(id);
+         }
+     }
+     TraceEvent("FoundNonoptimalTeamForDroppedShard", excludeServer).detail("Team", describe(res));
return res;
}
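Two latent bugs in the removed code are worth spelling out: the team vector was indexed with a random number bounded by `candidates.size()` / `backup.size()` rather than through `teams[candidates[i]]` / `teams[backup[i]]`, and `std::remove` was called without a following `erase()`, so `excludeServer` was never actually dropped from the returned team. A minimal standalone sketch of the `std::remove` pitfall (toy values, not FDB code):

    #include <algorithm>
    #include <cassert>
    #include <vector>

    int main() {
        std::vector<int> res = { 1, 2, 3, 2 };

        // std::remove only shifts kept elements forward and returns the new
        // logical end; the container's size is unchanged until erase() runs.
        auto newEnd = std::remove(res.begin(), res.end(), 2);
        assert(res.size() == 4); // still 4 elements; stale values at the tail

        // The erase-remove idiom actually shrinks the vector: { 1, 3 }.
        res.erase(newEnd, res.end());
        assert(res.size() == 2);
        return 0;
    }

The rewritten code above sidesteps the idiom entirely by copying the surviving IDs into a fresh vector, which is harder to get wrong.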

View File

@@ -3263,7 +3263,7 @@ void ShardInfo::addMutation(Version version, MutationRef const& mutation) {
}
}
- enum ChangeServerKeysContext { CSK_UPDATE, CSK_RESTORE };
+ enum ChangeServerKeysContext { CSK_UPDATE, CSK_RESTORE, CSK_ASSIGN_EMPTY };
const char* changeServerKeysContextName[] = { "Update", "Restore" };
void changeServerKeys(StorageServer* data,
@@ -3331,6 +3331,7 @@ void changeServerKeys(StorageServer* data,
auto vr = data->newestAvailableVersion.intersectingRanges(keys);
std::vector<std::pair<KeyRange, Version>> changeNewestAvailable;
std::vector<KeyRange> removeRanges;
+ std::vector<KeyRange> newEmptyRanges;
for (auto r = vr.begin(); r != vr.end(); ++r) {
KeyRangeRef range = keys & r->range();
bool dataAvailable = r->value() == latestVersion || r->value() >= version;
@@ -3341,7 +3342,14 @@ void changeServerKeys(StorageServer* data,
// .detail("NowAssigned", nowAssigned)
// .detail("NewestAvailable", r->value())
// .detail("ShardState0", data->shards[range.begin]->debugDescribeState());
- if (!nowAssigned) {
+ if (context == CSK_ASSIGN_EMPTY && !dataAvailable) {
+     ASSERT(nowAssigned);
+     TraceEvent("ChangeServerKeysAddEmptyRange", data->thisServerID)
+         .detail("Begin", range.begin)
+         .detail("End", range.end);
+     newEmptyRanges.push_back(range);
+     data->addShard(ShardInfo::newReadWrite(range, data));
+ } else if (!nowAssigned) {
if (dataAvailable) {
ASSERT(r->value() ==
latestVersion); // Not that we care, but this used to be checked instead of dataAvailable
@@ -3354,7 +3362,7 @@ void changeServerKeys(StorageServer* data,
} else if (!dataAvailable) {
// SOMEDAY: Avoid restarting adding/transferred shards
if (version == 0) { // bypass fetchkeys; shard is known empty at version 0
TraceEvent("ChangeServerKeysAddEmptyRange", data->thisServerID)
TraceEvent("ChangeServerKeysInitialRange", data->thisServerID)
.detail("Begin", range.begin)
.detail("End", range.end);
changeNewestAvailable.emplace_back(range, latestVersion);
@@ -3388,6 +3396,14 @@ void changeServerKeys(StorageServer* data,
removeDataRange(data, data->addVersionToMutationLog(data->data().getLatestVersion()), data->shards, *r);
setAvailableStatus(data, *r, false);
}
+ // Clear the moving-in empty ranges, and mark them available at latestVersion.
+ for (const auto& range : newEmptyRanges) {
+     MutationRef clearRange(MutationRef::ClearRange, range.begin, range.end);
+     data->addMutation(data->data().getLatestVersion(), clearRange, range, data->updateEagerReads);
+     data->newestAvailableVersion.insert(range, latestVersion);
+     setAvailableStatus(data, range, true);
+ }
validate(data);
}
@@ -3532,8 +3548,8 @@ private:
// the data for change.version-1 (changes from versions < change.version)
// If emptyRange, treat the shard as empty, see removeKeysFromFailedServer() for more details about this
// scenario.
- const Version shardVersion = (emptyRange && nowAssigned) ? 0 : currentVersion - 1;
- changeServerKeys(data, keys, nowAssigned, shardVersion, CSK_UPDATE);
+ const ChangeServerKeysContext context = emptyRange ? CSK_ASSIGN_EMPTY : CSK_UPDATE;
+ changeServerKeys(data, keys, nowAssigned, currentVersion - 1, context);
}
processedStartKey = false;
@@ -5022,7 +5038,7 @@ ACTOR Future<Void> reportStorageServerState(StorageServer* self) {
level = SevWarnAlways;
}
TraceEvent(level, "FetchKeyCurrentStatus")
TraceEvent(level, "FetchKeysCurrentStatus", self->thisServerID)
.detail("Timestamp", now())
.detail("LongestRunningTime", longestRunningFetchKeys.first)
.detail("StartKey", longestRunningFetchKeys.second.begin)

View File

@@ -0,0 +1,256 @@
/*
* DataLossRecovery.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <cstdint>
#include <limits>
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbserver/MoveKeys.actor.h"
#include "fdbserver/QuietDatabase.h"
#include "fdbrpc/simulator.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "flow/Error.h"
#include "flow/IRandom.h"
#include "flow/flow.h"
#include "flow/actorcompiler.h" // This must be the last #include.
namespace {
std::string printValue(const ErrorOr<Optional<Value>>& value) {
if (value.isError()) {
return value.getError().name();
}
return value.get().present() ? value.get().get().toString() : "Value Not Found.";
}
} // namespace
struct DataLossRecoveryWorkload : TestWorkload {
FlowLock startMoveKeysParallelismLock;
FlowLock finishMoveKeysParallelismLock;
const bool enabled;
bool pass;
NetworkAddress addr;
DataLossRecoveryWorkload(WorkloadContext const& wcx)
: TestWorkload(wcx), startMoveKeysParallelismLock(1), finishMoveKeysParallelismLock(1), enabled(!clientId),
pass(true) {}
void validationFailed(ErrorOr<Optional<Value>> expectedValue, ErrorOr<Optional<Value>> actualValue) {
TraceEvent(SevError, "TestFailed")
.detail("ExpectedValue", printValue(expectedValue))
.detail("ActualValue", printValue(actualValue));
pass = false;
}
std::string description() const override { return "DataLossRecovery"; }
Future<Void> setup(Database const& cx) override { return Void(); }
Future<Void> start(Database const& cx) override {
if (!enabled) {
return Void();
}
return _start(this, cx);
}
ACTOR Future<Void> _start(DataLossRecoveryWorkload* self, Database cx) {
state Key key = "TestKey"_sr;
state Key endKey = "TestKey0"_sr;
state Value oldValue = "TestValue"_sr;
state Value newValue = "TestNewValue"_sr;
wait(self->writeAndVerify(self, cx, key, oldValue));
// Move [key, endKey) to team: {address}.
state NetworkAddress address = wait(self->disableDDAndMoveShard(self, cx, KeyRangeRef(key, endKey)));
wait(self->readAndVerify(self, cx, key, oldValue));
// Kill team {address}, and expect read to timeout.
self->killProcess(self, address);
wait(self->readAndVerify(self, cx, key, timed_out()));
// Re-enable DD and exclude the address as failed, so that [key, endKey) will be dropped and moved to a new team.
// Expect read to return 'value not found'.
int ignore = wait(setDDMode(cx, 1));
wait(self->exclude(cx, address));
wait(self->readAndVerify(self, cx, key, Optional<Value>()));
// Write will succeed.
wait(self->writeAndVerify(self, cx, key, newValue));
return Void();
}
ACTOR Future<Void> readAndVerify(DataLossRecoveryWorkload* self,
Database cx,
Key key,
ErrorOr<Optional<Value>> expectedValue) {
state Transaction tr(cx);
loop {
try {
state Optional<Value> res = wait(timeoutError(tr.get(key), 30.0));
const bool equal = !expectedValue.isError() && res == expectedValue.get();
if (!equal) {
self->validationFailed(expectedValue, ErrorOr<Optional<Value>>(res));
}
break;
} catch (Error& e) {
if (expectedValue.isError() && expectedValue.getError().code() == e.code()) {
break;
}
wait(tr.onError(e));
}
}
return Void();
}
ACTOR Future<Void> writeAndVerify(DataLossRecoveryWorkload* self, Database cx, Key key, Optional<Value> value) {
state Transaction tr(cx);
loop {
try {
if (value.present()) {
tr.set(key, value.get());
} else {
tr.clear(key);
}
wait(timeoutError(tr.commit(), 30.0));
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
wait(self->readAndVerify(self, cx, key, value));
return Void();
}
ACTOR Future<Void> exclude(Database cx, NetworkAddress addr) {
state Transaction tr(cx);
state std::vector<AddressExclusion> servers;
servers.push_back(AddressExclusion(addr.ip, addr.port));
loop {
try {
excludeServers(tr, servers, true);
wait(tr.commit());
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
// Wait until all data is moved off the excluded servers.
std::set<NetworkAddress> inProgress = wait(checkForExcludingServers(cx, servers, true));
ASSERT(inProgress.empty());
TraceEvent("ExcludedFailedServer").detail("Address", addr.toString());
return Void();
}
// Move keys to a randomly selected team consisting of a single SS. DD is disabled first so that
// the keys stay on that team (DD would otherwise relocate them) until DD is re-enabled.
// Returns the address of the single SS of the new team.
ACTOR Future<NetworkAddress> disableDDAndMoveShard(DataLossRecoveryWorkload* self, Database cx, KeyRange keys) {
// Disable DD to avoid DD undoing of our move.
state int ignore = wait(setDDMode(cx, 0));
state NetworkAddress addr;
// Pick a random SS as the dest, keys will reside on a single server after the move.
state std::vector<UID> dest;
while (dest.empty()) {
std::vector<StorageServerInterface> interfs = wait(getStorageServers(cx));
if (!interfs.empty()) {
const auto& interf = interfs[deterministicRandom()->randomInt(0, interfs.size())];
if (g_simulator.protectedAddresses.count(interf.address()) == 0) {
dest.push_back(interf.uniqueID);
addr = interf.address();
}
}
}
state UID owner = deterministicRandom()->randomUniqueID();
state DDEnabledState ddEnabledState;
state Transaction tr(cx);
loop {
try {
BinaryWriter wrMyOwner(Unversioned());
wrMyOwner << owner;
tr.set(moveKeysLockOwnerKey, wrMyOwner.toValue());
wait(tr.commit());
MoveKeysLock moveKeysLock;
moveKeysLock.myOwner = owner;
wait(moveKeys(cx,
keys,
dest,
dest,
moveKeysLock,
Promise<Void>(),
&self->startMoveKeysParallelismLock,
&self->finishMoveKeysParallelismLock,
false,
UID(), // for logging only
&ddEnabledState));
break;
} catch (Error& e) {
if (e.code() == error_code_movekeys_conflict) {
// Conflict on moveKeysLocks with the current running DD is expected, just retry.
tr.reset();
} else {
wait(tr.onError(e));
}
}
}
TraceEvent("TestKeyMoved").detail("NewTeam", describe(dest)).detail("Address", addr.toString());
state Transaction validateTr(cx);
loop {
try {
Standalone<VectorRef<const char*>> addresses = wait(validateTr.getAddressesForKey(keys.begin));
// The move function is not what we are testing here; crash the test if the move fails.
ASSERT(addresses.size() == 1);
ASSERT(std::string(addresses[0]) == addr.toString());
break;
} catch (Error& e) {
wait(validateTr.onError(e));
}
}
return addr;
}
void killProcess(DataLossRecoveryWorkload* self, const NetworkAddress& addr) {
ISimulator::ProcessInfo* process = g_simulator.getProcessByAddress(addr);
ASSERT(process->addresses.contains(addr));
g_simulator.killProcess(process, ISimulator::KillInstantly);
TraceEvent("TestTeamKilled").detail("Address", addr);
}
Future<bool> check(Database const& cx) override { return pass; }
void getMetrics(std::vector<PerfMetric>& m) override {}
};
WorkloadFactory<DataLossRecoveryWorkload> DataLossRecoveryWorkloadFactory("DataLossRecovery");

View File

@@ -131,6 +131,7 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES fast/ConstrainedRandomSelector.toml)
add_fdb_test(TEST_FILES fast/CycleAndLock.toml)
add_fdb_test(TEST_FILES fast/CycleTest.toml)
+ add_fdb_test(TEST_FILES fast/DataLossRecovery.toml)
add_fdb_test(TEST_FILES fast/FuzzApiCorrectness.toml)
add_fdb_test(TEST_FILES fast/FuzzApiCorrectnessClean.toml)
add_fdb_test(TEST_FILES fast/IncrementalBackup.toml)

View File

@@ -0,0 +1,13 @@
[configuration]
config = 'triple'
storageEngineType = 0
processesPerMachine = 2
coordinators = 3
machineCount = 45

[[test]]
testTitle = 'DataLossRecovery'
useDB = true

[[test.workload]]
testName = 'DataLossRecovery'
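As a usage sketch (assuming a local build; binary location and flags can differ between releases), the new test can be run under the deterministic simulator with:

    fdbserver -r simulation -f tests/fast/DataLossRecovery.toml -b on

Here `-b on` turns on buggify fault injection; the workload then replays the incident end to end: write a key, pin its shard to a single-server team, kill that server, exclude it as failed, and verify that the keyrange comes back as an empty shard that accepts new writes.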