Merge pull request #4934 from sfc-gh-jslocum/tss_quarantine

Added TSS Quarantine and fixed TSS mapping bug in CommitProxy
This commit is contained in:
Josh Slocum 2021-06-11 11:52:42 -07:00 committed by GitHub
commit 65c49fc1e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 330 additions and 64 deletions

View File

@ -24,6 +24,7 @@
#include "fdbclient/IClientApi.h"
#include "fdbclient/MultiVersionTransaction.h"
#include "fdbclient/Status.h"
#include "fdbclient/KeyBackedTypes.h"
#include "fdbclient/StatusClient.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/GlobalConfig.actor.h"
@ -685,6 +686,12 @@ void initHelp() {
CommandHelp("triggerddteaminfolog",
"trigger the data distributor teams logging",
"Trigger the data distributor to log detailed information about its teams.");
helpMap["tssq"] =
CommandHelp("tssq start|stop <StorageUID>",
"start/stop tss quarantine",
"Toggles Quarantine mode for a Testing Storage Server. Quarantine will happen automatically if the "
"TSS is detected to have incorrect data, but can also be initiated manually. You can also remove a "
"TSS from quarantine once your investigation is finished, which will destroy the TSS process.");
hiddenCommands.insert("expensive_data_check");
hiddenCommands.insert("datadistribution");
@ -1884,6 +1891,75 @@ ACTOR Future<Void> triggerDDTeamInfoLog(Database db) {
}
}
ACTOR Future<Void> tssQuarantineList(Database db) {
state ReadYourWritesTransaction tr(db);
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
RangeResult result = wait(tr.getRange(tssQuarantineKeys, CLIENT_KNOBS->TOO_MANY));
// shouldn't have many quarantined TSSes
ASSERT(!result.more);
printf("Found %d quarantined TSS processes%s\n", result.size(), result.size() == 0 ? "." : ":");
for (auto& it : result) {
printf(" %s\n", decodeTssQuarantineKey(it.key).toString().c_str());
}
return Void();
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
ACTOR Future<bool> tssQuarantine(Database db, bool enable, UID tssId) {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(db);
state KeyBackedMap<UID, UID> tssMapDB = KeyBackedMap<UID, UID>(tssMappingKeys.begin);
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
// Do some validation first to make sure the command is valid
Optional<Value> serverListValue = wait(tr->get(serverListKeyFor(tssId)));
if (!serverListValue.present()) {
printf("No TSS %s found in cluster!\n", tssId.toString().c_str());
return false;
}
state StorageServerInterface ssi = decodeServerListValue(serverListValue.get());
if (!ssi.isTss()) {
printf("Cannot quarantine Non-TSS storage ID %s!\n", tssId.toString().c_str());
return false;
}
Optional<Value> currentQuarantineValue = wait(tr->get(tssQuarantineKeyFor(tssId)));
if (enable && currentQuarantineValue.present()) {
printf("TSS %s already in quarantine, doing nothing.\n", tssId.toString().c_str());
return false;
} else if (!enable && !currentQuarantineValue.present()) {
printf("TSS %s is not in quarantine, cannot remove from quarantine!.\n", tssId.toString().c_str());
return false;
}
if (enable) {
tr->set(tssQuarantineKeyFor(tssId), LiteralStringRef(""));
// remove server from TSS mapping when quarantine is enabled
tssMapDB.erase(tr, ssi.tssPairID.get());
} else {
tr->clear(tssQuarantineKeyFor(tssId));
}
wait(tr->commit());
break;
} catch (Error& e) {
wait(tr->onError(e));
}
}
printf("Successfully %s TSS %s\n", enable ? "quarantined" : "removed", tssId.toString().c_str());
return true;
}
ACTOR Future<Void> timeWarning(double when, const char* msg) {
wait(delay(when));
fputs(msg, stderr);
@ -3437,6 +3513,31 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
continue;
}
if (tokencmp(tokens[0], "tssq")) {
if (tokens.size() == 2) {
if (tokens[1] != LiteralStringRef("list")) {
printUsage(tokens[0]);
is_error = true;
} else {
wait(tssQuarantineList(db));
}
}
if (tokens.size() == 3) {
if ((tokens[1] != LiteralStringRef("start") && tokens[1] != LiteralStringRef("stop")) ||
(tokens[2].size() != 32) || !std::all_of(tokens[2].begin(), tokens[2].end(), &isxdigit)) {
printUsage(tokens[0]);
is_error = true;
} else {
bool enable = tokens[1] == LiteralStringRef("start");
UID tssId = UID::fromString(tokens[2].toString());
bool err = wait(tssQuarantine(db, enable, tssId));
if (err)
is_error = true;
}
}
continue;
}
if (tokencmp(tokens[0], "configure")) {
bool err = wait(configure(db, tokens, db->getConnectionFile(), &linenoise, warn));
if (err)

View File

@ -97,6 +97,7 @@ void ClientKnobs::initialize(bool randomize) {
init( DETAILED_HEALTH_METRICS_MAX_STALENESS, 5.0 );
init( MID_SHARD_SIZE_MAX_STALENESS, 10.0 );
init( TAG_ENCODE_KEY_SERVERS, false ); if( randomize && BUGGIFY ) TAG_ENCODE_KEY_SERVERS = true;
init( QUARANTINE_TSS_ON_MISMATCH, true ); if( randomize && BUGGIFY ) QUARANTINE_TSS_ON_MISMATCH = false; // if true, a tss mismatch will put the offending tss in quarantine. If false, it will just be killed
//KeyRangeMap
init( KRM_GET_RANGE_LIMIT, 1e5 ); if( randomize && BUGGIFY ) KRM_GET_RANGE_LIMIT = 10;

View File

@ -90,6 +90,7 @@ public:
double DETAILED_HEALTH_METRICS_MAX_STALENESS;
double MID_SHARD_SIZE_MAX_STALENESS;
bool TAG_ENCODE_KEY_SERVERS;
bool QUARANTINE_TSS_ON_MISMATCH;
// KeyRangeMap
int KRM_GET_RANGE_LIMIT;

View File

@ -838,11 +838,12 @@ ACTOR static Future<Void> handleTssMismatches(DatabaseContext* cx) {
}
}
if (found) {
TraceEvent(SevWarnAlways, "TSS_KillMismatch").detail("TSSID", tssID.toString());
TEST(true); // killing TSS because it got mismatch
state bool quarantine = CLIENT_KNOBS->QUARANTINE_TSS_ON_MISMATCH;
TraceEvent(SevWarnAlways, quarantine ? "TSS_QuarantineMismatch" : "TSS_KillMismatch")
.detail("TSSID", tssID.toString());
TEST(quarantine); // Quarantining TSS because it got mismatch
TEST(!quarantine); // Killing TSS because it got mismatch
// TODO we could write something to the system keyspace and then have DD listen to that keyspace and then DD
// do exactly this, so why not just cut out the middle man (or the middle system keys, as it were)
tr = makeReference<ReadYourWritesTransaction>(Database(Reference<DatabaseContext>::addRef(cx)));
state int tries = 0;
loop {
@ -850,7 +851,11 @@ ACTOR static Future<Void> handleTssMismatches(DatabaseContext* cx) {
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->clear(serverTagKeyFor(tssID));
if (quarantine) {
tr->set(tssQuarantineKeyFor(tssID), LiteralStringRef(""));
} else {
tr->clear(serverTagKeyFor(tssID));
}
tssMapDB.erase(tr, tssPairID);
wait(tr->commit());
@ -861,16 +866,15 @@ ACTOR static Future<Void> handleTssMismatches(DatabaseContext* cx) {
}
tries++;
if (tries > 10) {
// Give up on trying to kill the tss, it'll get another mismatch or a human will investigate
// eventually
TraceEvent("TSS_KillMismatchGaveUp").detail("TSSID", tssID.toString());
// Give up, it'll get another mismatch or a human will investigate eventually
TraceEvent("TSS_MismatchGaveUp").detail("TSSID", tssID.toString());
break;
}
}
// clear out txn so that the extra DatabaseContext ref gets decref'd and we can free cx
tr = makeReference<ReadYourWritesTransaction>();
} else {
TEST(true); // Not killing TSS with mismatch because it's already gone
TEST(true); // Not handling TSS with mismatch because it's already gone
}
}
}
@ -5878,21 +5882,20 @@ Future<Void> DatabaseContext::createSnapshot(StringRef uid, StringRef snapshot_c
}
ACTOR Future<Void> setPerpetualStorageWiggle(Database cx, bool enable, bool lock_aware) {
state ReadYourWritesTransaction tr(cx);
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
if(lock_aware) {
state ReadYourWritesTransaction tr(cx);
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
if (lock_aware) {
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
}
tr.set(perpetualStorageWiggleKey, enable ? LiteralStringRef("1") : LiteralStringRef("0"));
wait(tr.commit());
break;
}
catch (Error& e) {
wait(tr.onError(e));
}
}
return Void();
tr.set(perpetualStorageWiggleKey, enable ? LiteralStringRef("1") : LiteralStringRef("0"));
wait(tr.commit());
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
return Void();
}

View File

@ -348,6 +348,22 @@ uint16_t cacheChangeKeyDecodeIndex(const KeyRef& key) {
const KeyRangeRef tssMappingKeys(LiteralStringRef("\xff/tss/"), LiteralStringRef("\xff/tss0"));
const KeyRangeRef tssQuarantineKeys(LiteralStringRef("\xff/tssQ/"), LiteralStringRef("\xff/tssQ0"));
const Key tssQuarantineKeyFor(UID serverID) {
BinaryWriter wr(Unversioned());
wr.serializeBytes(tssQuarantineKeys.begin);
wr << serverID;
return wr.toValue();
}
UID decodeTssQuarantineKey(KeyRef const& key) {
UID serverID;
BinaryReader rd(key.removePrefix(tssQuarantineKeys.begin), Unversioned());
rd >> serverID;
return serverID;
}
const KeyRangeRef serverTagKeys(LiteralStringRef("\xff/serverTag/"), LiteralStringRef("\xff/serverTag0"));
const KeyRef serverTagPrefix = serverTagKeys.begin;

View File

@ -114,9 +114,16 @@ extern const KeyRef cacheChangePrefix;
const Key cacheChangeKeyFor(uint16_t idx);
uint16_t cacheChangeKeyDecodeIndex(const KeyRef& key);
// "\xff/tss/[[serverId]]" := "[[tssId]]"
// "\xff/tss/[[serverId]]" := "[[tssId]]"
extern const KeyRangeRef tssMappingKeys;
// "\xff/tssQ/[[serverId]]" := ""
// For quarantining a misbehaving TSS.
extern const KeyRangeRef tssQuarantineKeys;
const Key tssQuarantineKeyFor(UID serverID);
UID decodeTssQuarantineKey(KeyRef const&);
// "\xff/serverTag/[[serverID]]" = "[[Tag]]"
// Provides the Tag for the given serverID. Used to access a
// storage server's corresponding TLog in order to apply mutations.

View File

@ -52,7 +52,7 @@ void applyMetadataMutations(SpanID const& spanContext,
Arena& arena,
VectorRef<MutationRef> const& mutations,
IKeyValueStore* txnStateStore,
LogPushData* toCommit,
LogPushData* toCommit, // non-null if these mutations were part of a new commit handled by this commit proxy
bool& confChange,
Reference<ILogSystem> logSystem,
Version popVersion,
@ -66,7 +66,8 @@ void applyMetadataMutations(SpanID const& spanContext,
std::map<UID, Reference<StorageInfo>>* storageCache,
std::map<Tag, Version>* tag_popped,
std::unordered_map<UID, StorageServerInterface>* tssMapping,
bool initialCommit) {
bool initialCommit // true if the mutations were already written to the txnStateStore as part of recovery
) {
// std::map<keyRef, vector<uint16_t>> cacheRangeInfo;
std::map<KeyRef, MutationRef> cachedRangeInfo;
@ -245,24 +246,45 @@ void applyMetadataMutations(SpanID const& spanContext,
}
}
} else if (m.param1.startsWith(tssMappingKeys.begin)) {
// Normally uses key backed map, so have to use same unpacking code here.
UID ssId = Codec<UID>::unpack(Tuple::unpack(m.param1.removePrefix(tssMappingKeys.begin)));
UID tssId = Codec<UID>::unpack(Tuple::unpack(m.param2));
if (!initialCommit) {
txnStateStore->set(KeyValueRef(m.param1, m.param2));
if (tssMapping) {
// Normally uses key backed map, so have to use same unpacking code here.
UID ssId = Codec<UID>::unpack(Tuple::unpack(m.param1.removePrefix(tssMappingKeys.begin)));
UID tssId = Codec<UID>::unpack(Tuple::unpack(m.param2));
}
if (tssMapping) {
tssMappingToAdd.push_back(std::pair(ssId, tssId));
}
tssMappingToAdd.push_back(std::pair(ssId, tssId));
if (toCommit) {
// send private mutation to SS that it now has a TSS pair
MutationRef privatized = m;
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
// send private mutation to SS that it now has a TSS pair
if (toCommit) {
MutationRef privatized = m;
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
Optional<Value> tagV = txnStateStore->readValue(serverTagKeyFor(ssId)).get();
if (tagV.present()) {
toCommit->addTag(decodeServerTagValue(tagV.get()));
toCommit->writeTypedMessage(privatized);
}
}
} else if (m.param1.startsWith(tssQuarantineKeys.begin)) {
if (!initialCommit) {
txnStateStore->set(KeyValueRef(m.param1, m.param2));
Optional<Value> tagV = txnStateStore->readValue(serverTagKeyFor(ssId)).get();
if (tagV.present()) {
toCommit->addTag(decodeServerTagValue(tagV.get()));
toCommit->writeTypedMessage(privatized);
if (toCommit) {
UID tssId = decodeTssQuarantineKey(m.param1);
Optional<Value> ssiV = txnStateStore->readValue(serverListKeyFor(tssId)).get();
if (ssiV.present()) {
StorageServerInterface ssi = decodeServerListValue(ssiV.get());
if (ssi.isTss()) {
Optional<Value> tagV =
txnStateStore->readValue(serverTagKeyFor(ssi.tssPairID.get())).get();
if (tagV.present()) {
MutationRef privatized = m;
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
toCommit->addTag(decodeServerTagValue(tagV.get()));
toCommit->writeTypedMessage(privatized);
}
}
}
}
@ -510,15 +532,52 @@ void applyMetadataMutations(SpanID const& spanContext,
txnStateStore->clear(range & serverTagHistoryKeys);
}
if (tssMappingKeys.intersects(range)) {
KeyRangeRef rangeToClear = range & tssMappingKeys;
ASSERT(rangeToClear.singleKeyRange());
// Normally uses key backed map, so have to use same unpacking code here.
UID ssId = Codec<UID>::unpack(Tuple::unpack(m.param1.removePrefix(tssMappingKeys.begin)));
if (!initialCommit) {
KeyRangeRef rangeToClear = range & tssMappingKeys;
txnStateStore->clear(rangeToClear);
}
if (tssMapping) {
tssMapping->erase(ssId);
}
if (toCommit) {
// send private mutation to SS to notify that it no longer has a tss pair
Optional<Value> tagV = txnStateStore->readValue(serverTagKeyFor(ssId)).get();
if (tagV.present()) {
MutationRef privatized = m;
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
toCommit->addTag(decodeServerTagValue(tagV.get()));
toCommit->writeTypedMessage(privatized);
}
}
}
if (tssQuarantineKeys.intersects(range)) {
if (!initialCommit) {
KeyRangeRef rangeToClear = range & tssQuarantineKeys;
ASSERT(rangeToClear.singleKeyRange());
txnStateStore->clear(rangeToClear);
if (tssMapping) {
// Normally uses key backed map, so have to use same unpacking code here.
UID ssId =
Codec<UID>::unpack(Tuple::unpack(rangeToClear.begin.removePrefix(tssMappingKeys.begin)));
tssMapping->erase(ssId);
if (toCommit) {
UID tssId = decodeTssQuarantineKey(m.param1);
Optional<Value> ssiV = txnStateStore->readValue(serverListKeyFor(tssId)).get();
if (ssiV.present()) {
StorageServerInterface ssi = decodeServerListValue(ssiV.get());
if (ssi.isTss()) {
Optional<Value> tagV =
txnStateStore->readValue(serverTagKeyFor(ssi.tssPairID.get())).get();
if (tagV.present()) {
MutationRef privatized = m;
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
toCommit->addTag(decodeServerTagValue(tagV.get()));
toCommit->writeTypedMessage(privatized);
}
}
}
}
}
}

View File

@ -1439,9 +1439,9 @@ void maybeAddTssMapping(GetKeyServerLocationsReply& reply,
if (!included.count(ssId)) {
auto mappingItr = commitData->tssMapping.find(ssId);
if (mappingItr != commitData->tssMapping.end()) {
included.insert(ssId);
reply.resultsTssMapping.push_back(*mappingItr);
}
included.insert(ssId);
}
}

View File

@ -1253,7 +1253,14 @@ ACTOR Future<Void> removeStorageServer(Database cx,
TraceEvent(SevError, "TSSIdentityMappingEnabled");
tssMapDB.erase(tr, serverID);
} else if (tssPairID.present()) {
// remove the TSS from the mapping
tssMapDB.erase(tr, tssPairID.get());
// remove the TSS from quarantine, if it was in quarantine
Key tssQuarantineKey = tssQuarantineKeyFor(serverID);
Optional<Value> tssInQuarantine = wait(tr->get(tssQuarantineKey));
if (tssInQuarantine.present()) {
tr->clear(tssQuarantineKeyFor(serverID));
}
}
retry = true;

View File

@ -180,6 +180,7 @@ struct StorageServerDisk {
void makeNewStorageServerDurable();
bool makeVersionMutationsDurable(Version& prevStorageVersion, Version newStorageVersion, int64_t& bytesLeft);
void makeVersionDurable(Version version);
void makeTssQuarantineDurable();
Future<bool> restoreDurableState();
void changeLogProtocol(Version version, ProtocolVersion protocol);
@ -540,6 +541,22 @@ public:
}
}
// If a TSS is "in quarantine", it means it has incorrect data. It is effectively in a "zombie" state where it
// rejects all read requests and ignores all non-private mutations and data movements, but otherwise is still part
// of the cluster. The purpose of this state is to "freeze" the TSS state after a mismatch so a human operator can
// investigate, but preventing a new storage process from replacing the TSS on the worker. It will still get removed
// from the cluster if it falls behind on the mutation stream, or if its tss pair gets removed and its tag is no
// longer valid.
bool isTSSInQuarantine() { return tssPairID.present() && tssInQuarantine; }
void startTssQuarantine() {
if (!tssInQuarantine) {
// persist quarantine so it's still quarantined if rebooted
storage.makeTssQuarantineDurable();
}
tssInQuarantine = true;
}
StorageServerDisk storage;
KeyRangeMap<Reference<ShardInfo>> shards;
@ -585,6 +602,8 @@ public:
Optional<UID> tssPairID; // if this server is a tss, this is the id of its (ss) pair
Optional<UID> ssPairID; // if this server is an ss, this is the id of its (tss) pair
Optional<double> tssFaultInjectTime;
bool tssInQuarantine;
Key sk;
Reference<AsyncVar<ServerDBInfo>> db;
Database cx;
@ -787,7 +806,7 @@ public:
primaryLocality(tagLocalityInvalid), updateEagerReads(0), shardChangeCounter(0),
fetchKeysParallelismLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM_BYTES), shuttingDown(false),
debug_inApplyUpdate(false), debug_lastValidateTime(0), watchBytes(0), numWatches(0), logProtocol(0),
counters(this), tag(invalidTag), maxQueryQueue(0), thisServerID(ssi.id()),
counters(this), tag(invalidTag), maxQueryQueue(0), thisServerID(ssi.id()), tssInQuarantine(false),
readQueueSizeMetric(LiteralStringRef("StorageServer.ReadQueueSize")), behind(false), versionBehind(false),
byteSampleClears(false, LiteralStringRef("\xff\xff\xff")), noRecentUpdates(false), lastUpdate(now()),
poppedAllAfter(std::numeric_limits<Version>::max()), cpuUsage(0.0), diskUsage(0.0) {
@ -914,10 +933,10 @@ public:
template <class Request>
bool shouldRead(const Request& request) {
auto rate = currentRate();
if (rate < SERVER_KNOBS->STORAGE_DURABILITY_LAG_REJECT_THRESHOLD &&
deterministicRandom()->random01() >
std::max(SERVER_KNOBS->STORAGE_DURABILITY_LAG_MIN_RATE,
rate / SERVER_KNOBS->STORAGE_DURABILITY_LAG_REJECT_THRESHOLD)) {
if (isTSSInQuarantine() || (rate < SERVER_KNOBS->STORAGE_DURABILITY_LAG_REJECT_THRESHOLD &&
deterministicRandom()->random01() >
std::max(SERVER_KNOBS->STORAGE_DURABILITY_LAG_MIN_RATE,
rate / SERVER_KNOBS->STORAGE_DURABILITY_LAG_REJECT_THRESHOLD))) {
sendErrorWithPenalty(request.reply, server_overloaded(), getPenalty());
++counters.readsRejected;
return false;
@ -3188,6 +3207,7 @@ static const KeyRangeRef persistFormatReadableRange(LiteralStringRef("Foundation
LiteralStringRef("FoundationDB/StorageServer/1/5"));
static const KeyRef persistID = LiteralStringRef(PERSIST_PREFIX "ID");
static const KeyRef persistTssPairID = LiteralStringRef(PERSIST_PREFIX "tssPairID");
static const KeyRef persistTssQuarantine = LiteralStringRef(PERSIST_PREFIX "tssQ");
// (Potentially) change with the durable version or when fetchKeys completes
static const KeyRef persistVersion = LiteralStringRef(PERSIST_PREFIX "Version");
@ -3264,12 +3284,16 @@ private:
ASSERT(m.type == MutationRef::SetValue && m.param1.startsWith(data->sk));
KeyRangeRef keys(startKey.removePrefix(data->sk), m.param1.removePrefix(data->sk));
// add changes in shard assignment to the mutation log
setAssignedStatus(data, keys, nowAssigned);
// ignore data movements for tss in quarantine
if (!data->isTSSInQuarantine()) {
// add changes in shard assignment to the mutation log
setAssignedStatus(data, keys, nowAssigned);
// The changes for version have already been received (and are being processed now). We need to fetch
// the data for change.version-1 (changes from versions < change.version)
changeServerKeys(data, keys, nowAssigned, currentVersion - 1, CSK_UPDATE);
}
// The changes for version have already been received (and are being processed now). We need
// to fetch the data for change.version-1 (changes from versions < change.version)
changeServerKeys(data, keys, nowAssigned, currentVersion - 1, CSK_UPDATE);
processedStartKey = false;
} else if (m.type == MutationRef::SetValue && m.param1.startsWith(data->sk)) {
// Because of the implementation of the krm* functions, we expect changes in pairs, [begin,end)
@ -3307,7 +3331,8 @@ private:
bool matchesThisServer = serverTagKey == data->thisServerID;
bool matchesTssPair = data->isTss() ? serverTagKey == data->tssPairID.get() : false;
if ((m.type == MutationRef::SetValue && !data->isTss() && !matchesThisServer) ||
(m.type == MutationRef::ClearRange && (matchesThisServer || (data->isTss() && matchesTssPair)))) {
(m.type == MutationRef::ClearRange &&
((!data->isTSSInQuarantine() && matchesThisServer) || (data->isTss() && matchesTssPair)))) {
throw worker_removed();
}
if (!data->isTss() && m.type == MutationRef::ClearRange && data->ssPairID.present() &&
@ -3323,12 +3348,32 @@ private:
data->primaryLocality = BinaryReader::fromStringRef<int8_t>(m.param2, Unversioned());
auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion());
data->addMutationToMutationLog(mLV, MutationRef(MutationRef::SetValue, persistPrimaryLocality, m.param2));
} else if (m.type == MutationRef::SetValue && m.param1.substr(1).startsWith(tssMappingKeys.begin)) {
} else if (m.param1.substr(1).startsWith(tssMappingKeys.begin) &&
(m.type == MutationRef::SetValue || m.type == MutationRef::ClearRange)) {
if (!data->isTss()) {
UID ssId = Codec<UID>::unpack(Tuple::unpack(m.param1.substr(1).removePrefix(tssMappingKeys.begin)));
UID tssId = Codec<UID>::unpack(Tuple::unpack(m.param2));
ASSERT(ssId == data->thisServerID);
data->setSSWithTssPair(tssId);
if (m.type == MutationRef::SetValue) {
UID tssId = Codec<UID>::unpack(Tuple::unpack(m.param2));
data->setSSWithTssPair(tssId);
} else {
data->clearSSWithTssPair();
}
}
} else if (m.param1.substr(1).startsWith(tssQuarantineKeys.begin) &&
(m.type == MutationRef::SetValue || m.type == MutationRef::ClearRange)) {
if (data->isTss()) {
UID ssId = decodeTssQuarantineKey(m.param1.substr(1));
ASSERT(ssId == data->thisServerID);
if (m.type == MutationRef::SetValue) {
TEST(true); // Putting TSS in quarantine
TraceEvent(SevWarn, "TSSQuarantineStart", data->thisServerID);
data->startTssQuarantine();
} else {
TraceEvent(SevWarn, "TSSQuarantineStop", data->thisServerID);
// dipose of this TSS
throw worker_removed();
}
}
} else {
ASSERT(false); // Unknown private mutation
@ -3579,14 +3624,22 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
Span span("SS:update"_loc, { spanContext });
span.addTag("key"_sr, msg.param1);
// Drop non-private mutations if TSS fault injection is enabled in simulation, or if this is a TSS in
// quarantine.
if (g_network->isSimulated() && data->isTss() &&
g_simulator.tssMode == ISimulator::TSSMode::EnabledDropMutations &&
data->tssFaultInjectTime.present() && data->tssFaultInjectTime.get() < now() &&
(msg.type == MutationRef::SetValue || msg.type == MutationRef::ClearRange) && msg.param1.size() &&
msg.param1[0] != 0xff && deterministicRandom()->random01() < 0.05) {
(msg.type == MutationRef::SetValue || msg.type == MutationRef::ClearRange) &&
(msg.param1.size() < 2 || msg.param1[0] != 0xff || msg.param1[1] != 0xff) &&
deterministicRandom()->random01() < 0.05) {
TraceEvent(SevWarnAlways, "TSSInjectDropMutation", data->thisServerID)
.detail("Mutation", msg.toString())
.detail("Version", cloneCursor2->version().toString());
} else if (data->isTSSInQuarantine() &&
(msg.param1.size() < 2 || msg.param1[0] != 0xff || msg.param1[1] != 0xff)) {
TraceEvent("TSSQuarantineDropMutation", data->thisServerID)
.suppressFor(10.0)
.detail("Version", cloneCursor2->version().toString());
} else if (ver != invalidVersion) { // This change belongs to a version < minVersion
DEBUG_MUTATION("SSPeek", ver, msg).detail("ServerID", data->thisServerID);
if (ver == 1) {
@ -3945,6 +3998,11 @@ void StorageServerDisk::makeVersionDurable(Version version) {
// .detail("ToVersion", version);
}
// Update data->storage to persist tss quarantine state
void StorageServerDisk::makeTssQuarantineDurable() {
storage->set(KeyValueRef(persistTssQuarantine, LiteralStringRef("1")));
}
void StorageServerDisk::changeLogProtocol(Version version, ProtocolVersion protocol) {
data->addMutationToMutationLogOrStorage(
version,
@ -4061,6 +4119,7 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
state Future<Optional<Value>> fFormat = storage->readValue(persistFormat.key);
state Future<Optional<Value>> fID = storage->readValue(persistID);
state Future<Optional<Value>> ftssPairID = storage->readValue(persistTssPairID);
state Future<Optional<Value>> fTssQuarantine = storage->readValue(persistTssQuarantine);
state Future<Optional<Value>> fVersion = storage->readValue(persistVersion);
state Future<Optional<Value>> fLogProtocol = storage->readValue(persistLogProtocol);
state Future<Optional<Value>> fPrimaryLocality = storage->readValue(persistPrimaryLocality);
@ -4073,7 +4132,7 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
restoreByteSample(data, storage, byteSampleSampleRecovered, startByteSampleRestore.getFuture());
TraceEvent("ReadingDurableState", data->thisServerID);
wait(waitForAll(std::vector{ fFormat, fID, ftssPairID, fVersion, fLogProtocol, fPrimaryLocality }));
wait(waitForAll(std::vector{ fFormat, fID, ftssPairID, fTssQuarantine, fVersion, fLogProtocol, fPrimaryLocality }));
wait(waitForAll(std::vector{ fShardAssigned, fShardAvailable }));
wait(byteSampleSampleRecovered.getFuture());
TraceEvent("RestoringDurableState", data->thisServerID);
@ -4097,6 +4156,14 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
data->setTssPair(BinaryReader::fromStringRef<UID>(ftssPairID.get().get(), Unversioned()));
}
// It's a bit sketchy to rely on an untrusted storage engine to persist its quarantine state when the quarantine
// state means the storage engine already had a durability or correctness error, but it should get re-quarantined
// very quickly because of a mismatch if it starts trying to do things again
if (fTssQuarantine.get().present()) {
TEST(true); // TSS restarted while quarantined
data->tssInQuarantine = true;
}
data->sk = serverKeysPrefixFor((data->tssPairID.present()) ? data->tssPairID.get() : data->thisServerID)
.withPrefix(systemKeys.begin); // FFFF/serverKeys/[this server]/
@ -4118,6 +4185,7 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
? allKeys.end
: available[availableLoc + 1].key.removePrefix(persistShardAvailableKeys.begin));
ASSERT(!keys.empty());
bool nowAvailable = available[availableLoc].value != LiteralStringRef("0");
/*if(nowAvailable)
TraceEvent("AvailableShard", data->thisServerID).detail("RangeBegin", keys.begin).detail("RangeEnd", keys.end);*/
@ -4796,6 +4864,7 @@ bool storageServerTerminated(StorageServer& self, IKeyValueStore* persistentData
if (e.code() == error_code_please_reboot) {
// do nothing.
} else if (e.code() == error_code_worker_removed || e.code() == error_code_recruitment_failed) {
// SOMEDAY: could close instead of dispose if tss in quarantine gets removed so it could still be investigated?
persistentData->dispose();
} else {
persistentData->close();
@ -5026,7 +5095,9 @@ ACTOR Future<Void> replaceTSSInterface(StorageServer* self, StorageServerInterfa
tr->set(serverListKeyFor(ssi.id()), serverListValue(ssi));
// add itself back to tss mapping
tssMapDB.set(tr, self->tssPairID.get(), ssi.id());
if (!self->isTSSInQuarantine()) {
tssMapDB.set(tr, self->tssPairID.get(), ssi.id());
}
wait(tr->commit());
self->tag = myTag;