Audit should always complete; any failed ranges are retried.

He Liu 2022-09-02 09:11:19 -07:00
parent cade0baf7e
commit 033741daab
9 changed files with 514 additions and 69 deletions
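
The retry behavior described above is handled on the data distributor side: if an audit request to a storage server fails with anything other than actor_cancelled, the range is marked AuditPhase::Error and scheduled again. A minimal sketch of that pattern, simplified from doAuditStorage in the diff below (not the literal actor code; marking the range Complete on success is an assumption):

try {
    auditMap->insert(req.range, AuditPhase::Running);
    AuditStorageState res = wait(ssi.auditStorage.getReply(req));
    auditMap->insert(req.range, AuditPhase::Complete); // assumed: success marks the range complete
} catch (Error& e) {
    if (e.code() != error_code_actor_cancelled) {
        // Mark the range failed and re-schedule it so the audit eventually completes
        // instead of surfacing the error to the requester.
        auditMap->insert(req.range, AuditPhase::Error);
        actors->add(scheduleAuditForRange(req.id, req.range, req.getType(), actors, auditMap));
    }
}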

View File

@ -705,7 +705,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( FETCH_KEYS_LOWER_PRIORITY, 0 );
init( FETCH_CHANGEFEED_PARALLELISM, 4 );
init( SERVE_FETCH_CHECKPOINT_PARALLELISM, 4 );
init( SERVE_VALIDATE_STORAGE_PARALLELISM, 2 );
init( SERVE_AUDIT_STORAGE_PARALLELISM, 2 );
init( BUGGIFY_BLOCK_BYTES, 10000 );
init( STORAGE_RECOVERY_VERSION_LAG_LIMIT, 2 * MAX_READ_TRANSACTION_LIFE_VERSIONS );
init( STORAGE_COMMIT_BYTES, 10000000 ); if( randomize && BUGGIFY ) STORAGE_COMMIT_BYTES = 2000000;

View File

@ -0,0 +1,97 @@
/*
* Audit.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FDBCLIENT_AUDIT_H
#define FDBCLIENT_AUDIT_H
#pragma once
#include "fdbclient/FDBTypes.h"
enum class AuditPhase { Invalid = 0, Running = 1, Complete = 2, Error = 3 };
enum class AuditType { Invalid = 0, ValidateHA = 1 };
struct AuditStorageState {
constexpr static FileIdentifier file_identifier = 13804340;
AuditStorageState() = default;
AuditStorageState(UID id, AuditType type) : id(id), type(static_cast<uint8_t>(type)) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, id, type, phase, error);
}
void setType(AuditType type) { this->type = static_cast<uint8_t>(type); }
AuditType getType() const { return static_cast<AuditType>(this->type); }
void setPhase(AuditPhase phase) { this->phase = static_cast<uint8_t>(phase); }
AuditPhase getPhase() const { return static_cast<AuditPhase>(this->phase); }
UID id;
uint8_t type;
uint8_t phase;
std::string error;
};
struct AuditStorageRequest {
constexpr static FileIdentifier file_identifier = 13804341;
AuditStorageRequest() = default;
AuditStorageRequest(UID id, KeyRange range, AuditType type)
: id(id), range(range), type(static_cast<uint8_t>(type)) {}
void setType(AuditType type) { this->type = static_cast<uint8_t>(type); }
AuditType getType() const { return static_cast<AuditType>(this->type); }
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, id, range, actions, reply);
}
UID id;
KeyRange range;
uint8_t type;
ReplyPromise<AuditStorageState> reply;
};
// Triggers an audit of the specified type; an audit id is returned if the audit is scheduled successfully.
// If an audit is already running, its id is returned instead, unless force is true.
// When force is set, the ongoing audit is cancelled and a new audit is scheduled.
struct TriggerAuditRequest {
constexpr static FileIdentifier file_identifier = 1384445;
TriggerAuditRequest() = default;
TriggerAuditRequest(AuditType type, KeyRange range) : type(static_cast<uint8_t>(type)), range(range), force(false) {}
void setType(AuditType type) { this->type = static_cast<uint8_t>(type); }
AuditType getType() const { return static_cast<AuditType>(this->type); }
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, type, range, force, reply);
}
uint8_t type;
KeyRange range;
bool force;
ReplyPromise<UID> reply;
};
#endif
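
A hedged usage sketch for the request types declared above (not part of the commit; the key ranges are placeholders). The audit type and phase are stored as uint8_t for serialization and accessed through the typed getters and setters:

// Ask for a new HA validation audit over the whole keyspace.
TriggerAuditRequest trigger(AuditType::ValidateHA, allKeys);
trigger.force = false; // keep an already-running audit of this type, if any
ASSERT(trigger.getType() == AuditType::ValidateHA);

// Per-range request handed to a storage server, and the state it reports back.
AuditStorageRequest audit(deterministicRandom()->randomUniqueID(), allKeys, AuditType::ValidateHA);
AuditStorageState progress(audit.id, audit.getType());
progress.setPhase(AuditPhase::Running);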

View File

@ -0,0 +1,326 @@
/*
* DataDistributorClientInterface.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FDBCLIENT_DATADISTRIBUTORCLIENTINTERFACE_H
#define FDBCLIENT_DATADISTRIBUTORCLIENTINTERFACE_H
#pragma once
#include "fdbclient/FDBTypes.h"
#include "fdbrpc/FailureMonitor.h"
#include "fdbclient/Status.h"
#include "fdbclient/CommitProxyInterface.h"
#include "fdbclient/ClientWorkerInterface.h"
#include "fdbclient/ClientVersion.h"
struct DataDistributorClientInterface {
constexpr static FileIdentifier file_identifier = 15988863;
RequestStream<struct OpenDatabaseRequest> openDatabase;
RequestStream<struct FailureMonitoringRequest> failureMonitoring;
RequestStream<struct StatusRequest> databaseStatus;
RequestStream<ReplyPromise<Void>> ping;
RequestStream<struct GetClientWorkersRequest> getClientWorkers;
RequestStream<struct ForceRecoveryRequest> forceRecovery;
RequestStream<struct MoveShardRequest> moveShard;
RequestStream<struct RepairSystemDataRequest> repairSystemData;
RequestStream<struct SplitShardRequest> splitShard;
bool operator==(DataDistributorClientInterface const& r) const { return id() == r.id(); }
bool operator!=(DataDistributorClientInterface const& r) const { return id() != r.id(); }
UID id() const { return openDatabase.getEndpoint().token; }
NetworkAddress address() const { return openDatabase.getEndpoint().getPrimaryAddress(); }
bool hasMessage() const {
return openDatabase.getFuture().isReady() || failureMonitoring.getFuture().isReady() ||
databaseStatus.getFuture().isReady() || ping.getFuture().isReady() ||
getClientWorkers.getFuture().isReady() || forceRecovery.getFuture().isReady() ||
moveShard.getFuture().isReady() || repairSystemData.getFuture().isReady() ||
splitShard.getFuture().isReady();
}
void initEndpoints() {
openDatabase.getEndpoint(TaskPriority::ClusterController);
failureMonitoring.getEndpoint(TaskPriority::FailureMonitor);
databaseStatus.getEndpoint(TaskPriority::ClusterController);
ping.getEndpoint(TaskPriority::ClusterController);
getClientWorkers.getEndpoint(TaskPriority::ClusterController);
forceRecovery.getEndpoint(TaskPriority::ClusterController);
moveShard.getEndpoint(TaskPriority::ClusterController);
repairSystemData.getEndpoint(TaskPriority::ClusterController);
splitShard.getEndpoint(TaskPriority::ClusterController);
}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar,
openDatabase,
failureMonitoring,
databaseStatus,
ping,
getClientWorkers,
forceRecovery,
moveShard,
repairSystemData,
splitShard);
}
};
struct ClusterControllerClientInterface {
constexpr static FileIdentifier file_identifier = 14997695;
ClusterInterface clientInterface;
bool operator==(ClusterControllerClientInterface const& r) const {
return clientInterface.id() == r.clientInterface.id();
}
bool operator!=(ClusterControllerClientInterface const& r) const {
return clientInterface.id() != r.clientInterface.id();
}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, clientInterface);
}
};
template <class T>
struct ItemWithExamples {
T item;
int count;
std::vector<std::pair<NetworkAddress, Key>> examples;
ItemWithExamples() : item{}, count(0) {}
ItemWithExamples(T const& item, int count, std::vector<std::pair<NetworkAddress, Key>> const& examples)
: item(item), count(count), examples(examples) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, item, count, examples);
}
};
struct OpenDatabaseRequest {
constexpr static FileIdentifier file_identifier = 2799502;
// Sent by the native API to the cluster controller to open a database and track client
// info changes. Returns immediately if the current client info id is different from
// knownClientInfoID; otherwise returns when it next changes (or perhaps after a long interval)
int clientCount;
std::vector<ItemWithExamples<Key>> issues;
std::vector<ItemWithExamples<Standalone<ClientVersionRef>>> supportedVersions;
std::vector<ItemWithExamples<Key>> maxProtocolSupported;
UID knownClientInfoID;
ReplyPromise<struct ClientDBInfo> reply;
template <class Ar>
void serialize(Ar& ar) {
if constexpr (!is_fb_function<Ar>) {
ASSERT(ar.protocolVersion().hasOpenDatabase());
}
serializer(ar, clientCount, issues, supportedVersions, maxProtocolSupported, knownClientInfoID, reply);
}
};
struct SystemFailureStatus {
constexpr static FileIdentifier file_identifier = 3194108;
NetworkAddressList addresses;
FailureStatus status;
SystemFailureStatus() {}
SystemFailureStatus(NetworkAddressList const& a, FailureStatus const& s) : addresses(a), status(s) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, addresses, status);
}
};
struct FailureMonitoringReply {
constexpr static FileIdentifier file_identifier = 6820325;
VectorRef<SystemFailureStatus> changes;
Version failureInformationVersion;
bool allOthersFailed; // If true, changes are relative to all servers being failed, otherwise to the version given
// in the request
int clientRequestIntervalMS, // after this many milliseconds, send another request
considerServerFailedTimeoutMS; // after this many additional milliseconds, consider the ClusterController itself
// to be failed
Arena arena;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar,
changes,
failureInformationVersion,
allOthersFailed,
clientRequestIntervalMS,
considerServerFailedTimeoutMS,
arena);
}
};
struct FailureMonitoringRequest {
// Sent by all participants to the cluster controller reply.clientRequestIntervalMS
// ms after receiving the previous reply.
// Provides the controller the self-diagnosed status of the sender, and also
// requests the status of other systems. Failure to timely send one of these implies
// a failed status.
// If !senderStatus.present(), the sender wants to receive the latest failure information
// but doesn't want to be monitored.
// The failureInformationVersion returned in reply should be passed back to the
// next request to facilitate delta compression of the failure information.
constexpr static FileIdentifier file_identifier = 5867851;
Optional<FailureStatus> senderStatus;
Version failureInformationVersion;
NetworkAddressList addresses;
ReplyPromise<struct FailureMonitoringReply> reply;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, senderStatus, failureInformationVersion, addresses, reply);
}
};
struct StatusReply {
constexpr static FileIdentifier file_identifier = 9980504;
StatusObject statusObj;
std::string statusStr;
StatusReply() {}
explicit StatusReply(StatusObject obj)
: statusObj(obj), statusStr(json_spirit::write_string(json_spirit::mValue(obj))) {}
explicit StatusReply(std::string&& text) : statusStr(text) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, statusStr);
if (ar.isDeserializing) {
json_spirit::mValue mv;
if (g_network->isSimulated()) {
mv = readJSONStrictly(statusStr);
} else {
// In non-simulation allow errors because some status data is better than no status data
json_spirit::read_string(statusStr, mv);
}
statusObj = std::move(mv.get_obj());
}
}
};
struct StatusRequest {
constexpr static FileIdentifier file_identifier = 14419140;
ReplyPromise<struct StatusReply> reply;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, reply);
}
};
struct GetClientWorkersRequest {
constexpr static FileIdentifier file_identifier = 10771791;
ReplyPromise<std::vector<ClientWorkerInterface>> reply;
GetClientWorkersRequest() {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, reply);
}
};
struct ForceRecoveryRequest {
constexpr static FileIdentifier file_identifier = 14821350;
Key dcId;
ReplyPromise<Void> reply;
ForceRecoveryRequest() {}
explicit ForceRecoveryRequest(Key dcId) : dcId(dcId) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, dcId, reply);
}
};
// Request to move a keyrange (shard) to a new team represented as addresses.
struct MoveShardRequest {
constexpr static FileIdentifier file_identifier = 2799592;
KeyRange shard;
std::vector<NetworkAddress> addresses;
ReplyPromise<Void> reply;
MoveShardRequest() {}
MoveShardRequest(KeyRange shard, std::vector<NetworkAddress> addresses)
: shard{ std::move(shard) }, addresses{ std::move(addresses) } {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, shard, addresses, reply);
}
};
// Request to trigger a master recovery, and during the following recovery, the system metadata will be
// reconstructed from TLogs, and written to a new SS team.
// This is used when metadata on SSes are lost or corrupted.
struct RepairSystemDataRequest {
constexpr static FileIdentifier file_identifier = 2799593;
ReplyPromise<Void> reply;
RepairSystemDataRequest() {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, reply);
}
};
// Returns the actual shards generated by the SplitShardRequest.
struct SplitShardReply {
constexpr static FileIdentifier file_identifier = 1384440;
std::vector<KeyRange> shards;
SplitShardReply() {}
explicit SplitShardReply(std::vector<KeyRange> shards) : shards{ std::move(shards) } {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, shards);
}
};
// Split keyrange [shard.begin, shard.end) into num shards.
// Split points are chosen as the arithmetically equal division points of the given range.
struct SplitShardRequest {
constexpr static FileIdentifier file_identifier = 1384443;
KeyRange shard;
int num;
ReplyPromise<SplitShardReply> reply;
SplitShardRequest() : num(0) {}
SplitShardRequest(KeyRange shard, int num) : shard{ std::move(shard) }, num(num) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, shard, num, reply);
}
};
#endif
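
A hedged sketch of how a caller might use the split interface above, assuming it runs inside a flow actor with a DataDistributorClientInterface named ddi in scope (the ddi name and the normalKeys range are illustrative, not code from this commit):

// Split the user keyspace into 10 ranges at arithmetically equal division points.
SplitShardRequest req(normalKeys, 10);
SplitShardReply rep = wait(ddi.splitShard.getReply(req));
for (const KeyRange& shard : rep.shards) {
    // rep.shards contains the actual ranges produced by the split.
    TraceEvent("SplitShardResultSketch").detail("Range", shard);
}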

View File

@ -665,7 +665,7 @@ public:
int FETCH_KEYS_LOWER_PRIORITY;
int FETCH_CHANGEFEED_PARALLELISM;
int SERVE_FETCH_CHECKPOINT_PARALLELISM;
int SERVE_VALIDATE_STORAGE_PARALLELISM;
int SERVE_AUDIT_STORAGE_PARALLELISM;
int BUGGIFY_BLOCK_BYTES;
int64_t STORAGE_RECOVERY_VERSION_LAG_LIMIT;
double STORAGE_DURABILITY_LAG_REJECT_THRESHOLD;

View File

@ -1290,18 +1290,18 @@ ACTOR Future<Void> ddGetMetrics(GetDataDistributorMetricsRequest req,
return Void();
}
ACTOR Future<Void> validateStorage(Reference<DataDistributor> self, TriggerAuditRequest req);
ACTOR Future<Void> auditStorage(Reference<DataDistributor> self, TriggerAuditRequest req);
ACTOR Future<Void> scheduleAuditForRange(UID auditId,
KeyRange range,
AuditType type,
Reference<ActorCollection> actors,
Reference<KeyRangeMap<AuditPhase>> auditMap);
ACTOR Future<Void> doAuditStorage(Reference<ActorCollection> actors,
Reference<KeyRangeMap<AuditPhase>> auditMap,
StorageServerInterface ssi,
AuditStorageRequest req);
Reference<KeyRangeMap<AuditPhase>> auditMap,
StorageServerInterface ssi,
AuditStorageRequest req);
ACTOR Future<Void> validateStorage(Reference<DataDistributor> self, TriggerAuditRequest req) {
ACTOR Future<Void> auditStorage(Reference<DataDistributor> self, TriggerAuditRequest req) {
// TODO(heliu): Load running audit, and create one if no audit is running.
state Reference<KeyRangeMap<AuditPhase>> auditMap =
makeReference<KeyRangeMap<AuditPhase>>(AuditPhase::Invalid, allKeys.end);
@ -1311,20 +1311,21 @@ ACTOR Future<Void> validateStorage(Reference<DataDistributor> self, TriggerAudit
Ranges f = auditMap.intersectingRanges(req.range);
for (auto it = f.begin(); it != f.end(); ++it) {
if (it->value() == AuditPhase::Invalid || it->value() == AuditPhase::Error) {
subReq.range = KeyRangeRef(it->range().begin, it->range().end);
actors->add(scheduleAuditForRange(
auditId, KeyRangeRef(it->range().begin, it->range().end), req.getType(), actors, auditMap));
}
}
req.reply.send(auditId);
try {
wait(actors.getResult());
req.reply.send(Void());
// TODO(heliu): Set the audit result, and clear auditId.
} catch (Error& e) {
TraceEvent(SevWarnAlways, "DDValidateStorageError", req.requestId)
TraceEvent(SevWarnAlways, "DDAuditStorageOperationError", auditId)
.errorUnsuppressed(e)
.detail("Range", req.range);
req.reply.sendError(e);
.detail("Range", req.range)
.detail("Type", req.type);
}
return Void();
@ -1335,18 +1336,18 @@ ACTOR Future<Void> scheduleAuditForRange(UID auditId,
AuditType type,
Reference<ActorCollection> actors,
Reference<KeyRangeMap<AuditPhase>> auditMap) {
state Key begin = req.range.begin;
// TODO(heliu): Load the audit map, or make auditStorage a streaming call, to skip audited ranges.
state Key begin = range.begin;
while (begin < req.range.end) {
while (begin < range.end) {
state Transaction tr(cx);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
try {
std::cout << "1" << std::endl;
state RangeResult shards = wait(krmGetRanges(&tr,
keyServersPrefix,
KeyRangeRef(begin, req.end),
KeyRangeRef(begin, range.end),
SERVER_KNOBS->MOVE_SHARD_KRM_ROW_LIMIT,
SERVER_KNOBS->MOVE_SHARD_KRM_BYTE_LIMIT));
ASSERT(!shards.empty());
@ -1356,21 +1357,25 @@ ACTOR Future<Void> scheduleAuditForRange(UID auditId,
state int i = 0;
for (i = 0; i < shards.size() - 1; ++i) {
std::cout << "2" << std::endl;
std::vector<UID> src;
std::vector<UID> dest;
UID srcId, destId;
decodeKeyServersValue(UIDtoTagMap, shards[i].value, src, dest, srcId, destId);
const int idx = deterministicRandom()->randomInt(0, src.size());
Optional<Value> serverListValue = wait(tr.get(serverListKeyFor(src[idx])));
ASSERT(serverListValue.present());
state UID ssId = src[deterministicRandom()->randomInt(0, src.size())];
Optional<Value> serverListValue = wait(tr.get(serverListKeyFor(ssId)));
KeyRangeRef currentRange(shards[i].key, shards[i + 1].key);
if (!serverListValue.present()) {
TraceEvent(SevWarnAlways, "ScheduleAuditForRangeMissingServer", auditId)
.detail("StorageServer", ssId)
.detail("Range", currentRange);
break;
}
const StorageServerInterface ssi = decodeServerListValue(serverListValue.get());
AuditStorageRequest req(auditId,KeyRangeRef(shards[i].key, shards[i + 1].key), type);
const AuditStorageRequest req(auditId, currentRange, type);
actors->add(doAuditStorage(actors, auditMap, ssi, req));
begin = req.range.end;
std::cout << "3" << std::endl;
wait(delay(0.01));
}
} catch (Error& e) {
@ -1383,12 +1388,17 @@ ACTOR Future<Void> scheduleAuditForRange(UID auditId,
}
ACTOR Future<Void> doAuditStorage(Reference<ActorCollection> actors,
Reference<KeyRangeMap<AuditPhase>> auditMap,
StorageServerInterface ssi,
AuditStorageRequest req) {
Reference<KeyRangeMap<AuditPhase>> auditMap,
StorageServerInterface ssi,
AuditStorageRequest req) {
TraceEvent(SevDebug, "DDAuditStorageBegin", req.id)
.detail("Range", req.range)
.detail("Type", req.type)
.detail("StorageServer", ssi.toString());
try {
auditMap->insert(req.range, AuditPhase::Running);
ValidateStorageResult vResult = wait(ssi.validateStorage.getReply(req));
ValidateStorageResult vResult = wait(ssi.auditStorage.getReply(req));
TraceEvent e(vResult.error.empty() ? SevInfo : SevWarnAlways, "DDValidateStorageResult", req.requestId);
e.detail("Range", req.range);
e.detail("StorageServer", ssi.toString());
@ -1401,6 +1411,7 @@ ACTOR Future<Void> doAuditStorage(Reference<ActorCollection> actors,
.detail("Range", req.range)
.detail("StorageServer", ssi.toString());
if (e.code() != error_code_actor_cancelled) {
auditMap->insert(req.range, AuditPhase::Error);
actors->add(scheduleAuditForRange(req.id, req.range, req.getType(), actors, auditMap));
}
}
@ -1476,8 +1487,8 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncV
when(GetStorageWigglerStateRequest req = waitNext(di.storageWigglerState.getFuture())) {
req.reply.send(getStorageWigglerStates(self));
}
when(TriggerAuditRequest req = waitNext(di.validateStorage.getFuture())) {
actors.add(validateStorage(self, req));
when(TriggerAuditRequest req = waitNext(di.auditStorage.getFuture())) {
actors.add(auditStorage(self, req));
}
}
} catch (Error& err) {

View File

@ -1007,7 +1007,7 @@ public:
FlowLock serveFetchCheckpointParallelismLock;
FlowLock serveValidateStorageParallelismLock;
FlowLock serveAuditStorageParallelismLock;
int64_t instanceID;
@ -1186,10 +1186,10 @@ public:
return self->serveFetchCheckpointParallelismLock.waiters();
});
specialCounter(cc, "ServeValidateStorageActive", [self]() {
return self->serveValidateStorageParallelismLock.activePermits();
return self->serveAuditStorageParallelismLock.activePermits();
});
specialCounter(cc, "ServeValidateStorageWaiting", [self]() {
return self->serveValidateStorageParallelismLock.waiters();
return self->serveAuditStorageParallelismLock.waiters();
});
specialCounter(cc, "QueryQueueMax", [self]() { return self->getAndResetMaxQueryQueueSize(); });
specialCounter(cc, "BytesStored", [self]() { return self->metrics.byteSample.getEstimate(allKeys); });
@ -1247,7 +1247,7 @@ public:
fetchChangeFeedParallelismLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM),
fetchKeysBytesBudget(SERVER_KNOBS->STORAGE_FETCH_BYTES), fetchKeysBudgetUsed(false),
serveFetchCheckpointParallelismLock(SERVER_KNOBS->SERVE_FETCH_CHECKPOINT_PARALLELISM),
serveValidateStorageParallelismLock(SERVER_KNOBS->SERVE_VALIDATE_STORAGE_PARALLELISM),
serveAuditStorageParallelismLock(SERVER_KNOBS->SERVE_AUDIT_STORAGE_PARALLELISM),
instanceID(deterministicRandom()->randomUniqueID().first()), shuttingDown(false), behind(false),
versionBehind(false), debug_inApplyUpdate(false), debug_lastValidateTime(0), lastBytesInputEBrake(0),
lastDurableVersionEBrake(0), maxQueryQueue(0), transactionTagCounter(ssi.id()), counters(this),
@ -3604,13 +3604,12 @@ ACTOR Future<Void> validateRangeAgainstServer(StorageServer* data,
.detail("Version", version)
.detail("ErrorMessage", error)
.detail("RemoteServer", remoteServer.toString());
throw validate_storage_error();
}
TraceEvent(SevDebug, "ServeValidateRangeAgainstServerEnd", data->thisServerID)
.detail("Range", range)
.detail("Version", version)
.detail("ValidatedKeys", validatedKeys)
.detail("ValidatedKeys", validatedKeys)
.detail("Servers", remoteServer.toString());
return Void();
@ -3673,49 +3672,49 @@ ACTOR Future<Void> validateRangeShard(StorageServer* data, KeyRange range, std::
return Void();
}
ACTOR Future<Void> validateStorageQ(StorageServer* self, AuditStorageRequest req) {
wait(self->serveValidateStorageParallelismLock.take(TaskPriority::DefaultYield));
state FlowLock::Releaser holder(self->serveValidateStorageParallelismLock);
ACTOR Future<Void> auditStorageQ(StorageServer* self, AuditStorageRequest req) {
wait(self->serveAuditStorageParallelismLock.take(TaskPriority::DefaultYield));
state FlowLock::Releaser holder(self->serveAuditStorageParallelismLock);
TraceEvent("ServeValidateStorageBegin", self->thisServerID)
.detail("RequestID", req.requestId)
.detail("Range", req.range);
TraceEvent("ServeAuditStorageBegin", self->thisServerID)
.detail("RequestID", req.id)
.detail("Range", req.range)
.detail("Type", req.type);
state Transaction tr(self->cx);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
loop {
try {
state RangeResult shards = wait(krmGetRanges(&tr,
keyServersPrefix,
req.range,
SERVER_KNOBS->MOVE_SHARD_KRM_ROW_LIMIT,
SERVER_KNOBS->MOVE_SHARD_KRM_BYTE_LIMIT));
ASSERT(!shards.empty() && !shards.more);
try {
loop {
try {
state RangeResult shards = wait(krmGetRanges(&tr,
keyServersPrefix,
req.range,
SERVER_KNOBS->MOVE_SHARD_KRM_ROW_LIMIT,
SERVER_KNOBS->MOVE_SHARD_KRM_BYTE_LIMIT));
ASSERT(!shards.empty() && !shards.more);
state RangeResult UIDtoTagMap = wait(tr.getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY));
ASSERT(!UIDtoTagMap.more && UIDtoTagMap.size() < CLIENT_KNOBS->TOO_MANY);
state RangeResult UIDtoTagMap = wait(tr.getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY));
ASSERT(!UIDtoTagMap.more && UIDtoTagMap.size() < CLIENT_KNOBS->TOO_MANY);
std::vector<Future<Void>> fs;
for (int i = 0; i < shards.size() - 1; ++i) {
std::vector<UID> src;
std::vector<UID> dest;
UID srcId, destId;
decodeKeyServersValue(UIDtoTagMap, shards[i].value, src, dest, srcId, destId);
fs.push_back(validateRangeShard(self, KeyRangeRef(shards[i].key, shards[i + 1].key), src));
}
std::vector<Future<Void>> fs;
for (int i = 0; i < shards.size() - 1; ++i) {
std::vector<UID> src;
std::vector<UID> dest;
UID srcId, destId;
decodeKeyServersValue(UIDtoTagMap, shards[i].value, src, dest, srcId, destId);
fs.push_back(validateRangeShard(self, KeyRangeRef(shards[i].key, shards[i + 1].key), src));
}
wait(waitForAll(fs));
req.reply.send(ValidateStorageResult(req.requestId));
break;
} catch (Error& e) {
if (e.code() == error_code_validate_storage_error) {
req.reply.sendError(e);
wait(waitForAll(fs));
break;
} catch (Error& e) {
wait(tr.onError(e));
}
wait(tr.onError(e));
}
} catch (Error& e) {
req.reply.sendError(audit_storage_failed());
}
return Void();
@ -10328,8 +10327,8 @@ ACTOR Future<Void> storageServerCore(StorageServer* self, StorageServerInterface
when(FetchCheckpointKeyValuesRequest req = waitNext(ssi.fetchCheckpointKeyValues.getFuture())) {
self->actors.add(fetchCheckpointKeyValuesQ(self, req));
}
when(AuditStorageRequest req = waitNext(ssi.validateStorage.getFuture())) {
self->actors.add(validateStorageQ(self, req));
when(AuditStorageRequest req = waitNext(ssi.auditStorage.getFuture())) {
self->actors.add(auditStorageQ(self, req));
}
when(wait(updateProcessStatsTimer)) {
updateProcessStats(self);

View File

@ -145,7 +145,7 @@ struct ValidateStorage : TestWorkload {
const StorageServerInterface ssi = decodeServerListValue(serverListValue.get());
AuditStorageRequest req(deterministicRandom()->randomUniqueID(),
KeyRangeRef(shards[i].key, shards[i + 1].key));
ValidateStorageResult vResult = wait(ssi.validateStorage.getReply(req));
ValidateStorageResult vResult = wait(ssi.auditStorage.getReply(req));
std::cout << "3" << std::endl;
}

View File

@ -125,7 +125,7 @@ ERROR( invalid_cluster_id, 1217, "Attempted to join cluster with a different clu
ERROR( restart_cluster_controller, 1218, "Restart cluster controller process" )
ERROR( please_reboot_remote_kv_store, 1219, "Need to reboot the storage engine process as it died abnormally")
ERROR( incompatible_software_version, 1220, "Current software does not support database format" )
ERROR( validate_storage_error, 1221, "Validate storage consistency error" )
ERROR( audit_storage_failed, 1221, "Validate storage consistency error" )
// 15xx Platform errors
ERROR( platform_error, 1500, "Platform error" )

View File

@ -0,0 +1,12 @@
[configuration]
config = 'triple'
generateFearless = true
allowDefaultTenant = false
machineCount = 15
[[test]]
testTitle = 'ValidateStorageWorkload'
useDB = true
[[test.workload]]
testName = 'ValidateStorageWorkload'