Merge pull request #8075 from liquid-helium/validate-data-consistency

Validate data consistency
This commit is contained in:
Jingyu Zhou 2022-10-13 09:20:01 -07:00 committed by GitHub
commit e4752309a1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 1061 additions and 8 deletions

View File

@ -2356,6 +2356,21 @@ ACTOR Future<Void> forceRecovery(Reference<IClusterConnectionRecord> clusterFile
}
}
ACTOR Future<UID> auditStorage(Reference<IClusterConnectionRecord> clusterFile, KeyRange range, AuditType type) {
state Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface(new AsyncVar<Optional<ClusterInterface>>);
state Future<Void> leaderMon = monitorLeader<ClusterInterface>(clusterFile, clusterInterface);
loop {
while (!clusterInterface->get().present()) {
wait(clusterInterface->onChange());
}
UID auditId = wait(clusterInterface->get().get().triggerAudit.getReply(TriggerAuditRequest(type, range)));
TraceEvent(SevDebug, "ManagementAPIAuditStorageEnd").detail("AuditID", auditId);
return auditId;
}
}
ACTOR Future<Void> waitForPrimaryDC(Database cx, StringRef dcId) {
state ReadYourWritesTransaction tr(cx);

View File

@ -752,6 +752,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( FETCH_KEYS_PARALLELISM_FULL, 6 );
init( FETCH_KEYS_LOWER_PRIORITY, 0 );
init( SERVE_FETCH_CHECKPOINT_PARALLELISM, 4 );
init( SERVE_AUDIT_STORAGE_PARALLELISM, 2 );
init( CHANGE_FEED_DISK_READS_PARALLELISM, 1000 ); if( randomize && BUGGIFY ) CHANGE_FEED_DISK_READS_PARALLELISM = 20;
init( BUGGIFY_BLOCK_BYTES, 10000 );
init( STORAGE_RECOVERY_VERSION_LAG_LIMIT, 2 * MAX_READ_TRANSACTION_LIFE_VERSIONS );

View File

@ -286,6 +286,41 @@ const KeyRangeRef writeConflictRangeKeysRange = KeyRangeRef("\xff\xff/transactio
const KeyRef clusterIdKey = "\xff/clusterId"_sr;
const KeyRangeRef auditRange = KeyRangeRef("\xff/audit/"_sr, "\xff/audit0"_sr);
const KeyRef auditPrefix = auditRange.begin;
const Key auditRangeKey(const AuditType type, const UID& auditId, const KeyRef& key) {
BinaryWriter wr(Unversioned());
wr.serializeBytes(auditPrefix);
wr << static_cast<uint8_t>(type);
wr.serializeBytes("/"_sr);
wr << auditId;
wr.serializeBytes("/"_sr);
wr.serializeBytes(key);
return wr.toValue();
}
const Key auditRangePrefix(const AuditType type, const UID& auditId) {
BinaryWriter wr(Unversioned());
wr.serializeBytes(auditPrefix);
wr << static_cast<uint8_t>(type);
wr.serializeBytes("/"_sr);
wr << auditId;
wr.serializeBytes("/"_sr);
return wr.toValue();
}
const Value auditStorageStateValue(const AuditStorageState& auditStorageState) {
return ObjectWriter::toValue(auditStorageState, IncludeVersion());
}
AuditStorageState decodeAuditStorageState(const ValueRef& value) {
AuditStorageState auditState;
ObjectReader reader(value.begin(), IncludeVersion());
reader.deserialize(auditState);
return auditState;
}
const KeyRef checkpointPrefix = "\xff/checkpoint/"_sr;
const Key checkpointKeyFor(UID checkpointID) {

View File

@ -0,0 +1,111 @@
/*
* Audit.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FDBCLIENT_AUDIT_H
#define FDBCLIENT_AUDIT_H
#pragma once
#include "fdbclient/FDBTypes.h"
#include "fdbrpc/fdbrpc.h"
enum class AuditPhase : uint8_t {
Invalid = 0,
Running = 1,
Complete = 2,
Error = 3,
Failed = 4,
};
enum class AuditType : uint8_t {
Invalid = 0,
ValidateHA = 1,
};
struct AuditStorageState {
constexpr static FileIdentifier file_identifier = 13804340;
AuditStorageState() = default;
AuditStorageState(UID id, AuditType type) : id(id), type(static_cast<uint8_t>(type)) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, id, type, phase, error);
}
void setType(AuditType type) { this->type = static_cast<uint8_t>(this->type); }
AuditType getType() const { return static_cast<AuditType>(this->type); }
void setPhase(AuditPhase phase) { this->phase = static_cast<uint8_t>(phase); }
AuditPhase getPhase() const { return static_cast<AuditPhase>(this->phase); }
UID id;
uint8_t type;
uint8_t phase;
std::string error;
};
struct AuditStorageRequest {
constexpr static FileIdentifier file_identifier = 13804341;
AuditStorageRequest() = default;
AuditStorageRequest(UID id, KeyRange range, AuditType type)
: id(id), range(range), type(static_cast<uint8_t>(type)) {}
void setType(AuditType type) { this->type = static_cast<uint8_t>(this->type); }
AuditType getType() const { return static_cast<AuditType>(this->type); }
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, id, range, type, targetServers, reply);
}
UID id;
KeyRange range;
uint8_t type;
std::vector<UID> targetServers;
ReplyPromise<AuditStorageState> reply;
};
// Triggers an audit of the specific type, an audit id is returned if an audit is scheduled successfully.
// If there is an running audit, the corresponding id will be returned, unless force is true;
// When force is set, the ongoing audit will be cancelled, and a new audit will be scheduled.
struct TriggerAuditRequest {
constexpr static FileIdentifier file_identifier = 1384445;
TriggerAuditRequest() = default;
TriggerAuditRequest(AuditType type, KeyRange range)
: type(static_cast<uint8_t>(type)), range(range), force(false), async(false) {}
void setType(AuditType type) { this->type = static_cast<uint8_t>(this->type); }
AuditType getType() const { return static_cast<AuditType>(this->type); }
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, type, range, force, async, reply);
}
uint8_t type;
KeyRange range;
bool force;
bool async;
ReplyPromise<UID> reply;
};
#endif

View File

@ -0,0 +1,34 @@
/*
* AuditUtils.actor.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FDBCLIENT_AUDITUTILS_ACTOR_H
#define FDBCLIENT_AUDITUTILS_ACTOR_H
#pragma once
#include "fdbclient/Audit.h"
#include "fdbclient/FDBTypes.h"
#include "fdbrpc/fdbrpc.h"
#include "flow/actorcompiler.h" // has to be last include
ACTOR Future<Void> persistAuditStorageState(Key key, AuditStorageState auditState);
#include "flow/unactorcompiler.h"
#endif

View File

@ -40,6 +40,7 @@ struct ClusterInterface {
RequestStream<struct MoveShardRequest> moveShard;
RequestStream<struct RepairSystemDataRequest> repairSystemData;
RequestStream<struct SplitShardRequest> splitShard;
RequestStream<struct TriggerAuditRequest> triggerAudit;
bool operator==(ClusterInterface const& r) const { return id() == r.id(); }
bool operator!=(ClusterInterface const& r) const { return id() != r.id(); }
@ -51,7 +52,7 @@ struct ClusterInterface {
databaseStatus.getFuture().isReady() || ping.getFuture().isReady() ||
getClientWorkers.getFuture().isReady() || forceRecovery.getFuture().isReady() ||
moveShard.getFuture().isReady() || repairSystemData.getFuture().isReady() ||
splitShard.getFuture().isReady();
splitShard.getFuture().isReady() || triggerAudit.getFuture().isReady();
}
void initEndpoints() {
@ -64,6 +65,7 @@ struct ClusterInterface {
moveShard.getEndpoint(TaskPriority::ClusterController);
repairSystemData.getEndpoint(TaskPriority::ClusterController);
splitShard.getEndpoint(TaskPriority::ClusterController);
triggerAudit.getEndpoint(TaskPriority::ClusterController);
}
template <class Ar>
@ -77,7 +79,8 @@ struct ClusterInterface {
forceRecovery,
moveShard,
repairSystemData,
splitShard);
splitShard,
triggerAudit);
}
};

View File

@ -138,6 +138,9 @@ ACTOR Future<int> setDDMode(Database cx, int mode);
ACTOR Future<Void> forceRecovery(Reference<IClusterConnectionRecord> clusterFile, Standalone<StringRef> dcId);
// Start an audit on range of the specific type.
ACTOR Future<UID> auditStorage(Reference<IClusterConnectionRecord> clusterFile, KeyRange range, AuditType type);
ACTOR Future<Void> printHealthyZone(Database cx);
ACTOR Future<bool> clearHealthyZone(Database cx, bool printWarning = false, bool clearSSFailureZoneString = false);
ACTOR Future<bool> setHealthyZone(Database cx, StringRef zoneId, double seconds, bool printWarning = false);

View File

@ -703,6 +703,7 @@ public:
int FETCH_KEYS_PARALLELISM_FULL;
int FETCH_KEYS_LOWER_PRIORITY;
int SERVE_FETCH_CHECKPOINT_PARALLELISM;
int SERVE_AUDIT_STORAGE_PARALLELISM;
int CHANGE_FEED_DISK_READS_PARALLELISM;
int BUGGIFY_BLOCK_BYTES;
int64_t STORAGE_RECOVERY_VERSION_LAG_LIMIT;

View File

@ -22,6 +22,7 @@
#define FDBCLIENT_STORAGESERVERINTERFACE_H
#pragma once
#include "fdbclient/Audit.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/StorageCheckpoint.h"
#include "fdbclient/StorageServerShard.h"
@ -120,8 +121,8 @@ struct StorageServerInterface {
RequestStream<struct GetCheckpointRequest> checkpoint;
RequestStream<struct FetchCheckpointRequest> fetchCheckpoint;
RequestStream<struct FetchCheckpointKeyValuesRequest> fetchCheckpointKeyValues;
RequestStream<struct UpdateCommitCostRequest> updateCommitCostRequest;
RequestStream<struct AuditStorageRequest> auditStorage;
private:
bool acceptingRequests;
@ -194,6 +195,8 @@ public:
getValue.getEndpoint().getAdjustedEndpoint(21));
updateCommitCostRequest =
RequestStream<struct UpdateCommitCostRequest>(getValue.getEndpoint().getAdjustedEndpoint(22));
auditStorage =
RequestStream<struct AuditStorageRequest>(getValue.getEndpoint().getAdjustedEndpoint(23));
}
} else {
ASSERT(Ar::isDeserializing);
@ -245,6 +248,7 @@ public:
streams.push_back(fetchCheckpoint.getReceiver());
streams.push_back(fetchCheckpointKeyValues.getReceiver());
streams.push_back(updateCommitCostRequest.getReceiver());
streams.push_back(auditStorage.getReceiver());
FlowTransport::transport().addEndpoints(streams);
}
};

View File

@ -94,6 +94,13 @@ void decodeKeyServersValue(RangeResult result,
extern const KeyRef clusterIdKey;
extern const KeyRangeRef auditRange;
extern const KeyRef auditPrefix;
const Key auditRangeKey(const AuditType type, const UID& auditId, const KeyRef& key);
const Key auditRangePrefix(const AuditType type, const UID& auditId);
const Value auditStorageStateValue(const AuditStorageState& auditStorageState);
AuditStorageState decodeAuditStorageState(const ValueRef& value);
// "\xff/checkpoint/[[UID]] := [[CheckpointMetaData]]"
extern const KeyRef checkpointPrefix;
const Key checkpointKeyFor(UID checkpointID);

View File

@ -2013,6 +2013,53 @@ ACTOR Future<Void> handleForcedRecoveries(ClusterControllerData* self, ClusterCo
}
}
ACTOR Future<Void> triggerAuditStorage(ClusterControllerData* self, TriggerAuditRequest req) {
TraceEvent(SevInfo, "CCTriggerAuditStorageBegin", self->id)
.detail("Range", req.range)
.detail("AuditType", req.type);
state UID auditId;
try {
while (self->db.serverInfo->get().recoveryState < RecoveryState::ACCEPTING_COMMITS ||
!self->db.serverInfo->get().distributor.present()) {
wait(self->db.serverInfo->onChange());
}
TriggerAuditRequest fReq(req.getType(), req.range);
UID auditId_ = wait(self->db.serverInfo->get().distributor.get().triggerAudit.getReply(fReq));
auditId = auditId_;
TraceEvent(SevDebug, "CCTriggerAuditStorageEnd", self->id)
.detail("AuditID", auditId)
.detail("Range", req.range)
.detail("AuditType", req.type);
if (!req.reply.isSet()) {
req.reply.send(auditId);
}
} catch (Error& e) {
TraceEvent(SevDebug, "CCTriggerAuditStorageError", self->id)
.errorUnsuppressed(e)
.detail("AuditID", auditId)
.detail("Range", req.range)
.detail("AuditType", req.type);
if (!req.reply.isSet()) {
req.reply.sendError(audit_storage_failed());
}
}
return Void();
}
ACTOR Future<Void> handleTriggerAuditStorage(ClusterControllerData* self, ClusterControllerFullInterface interf) {
loop {
TriggerAuditRequest req = waitNext(interf.clientInterface.triggerAudit.getFuture());
TraceEvent(SevDebug, "TriggerAuditStorageReceived", self->id)
.detail("ClusterControllerDcId", self->clusterControllerDcId)
.detail("Range", req.range)
.detail("AuditType", req.type);
self->addActor.send(triggerAuditStorage(self, req));
}
}
struct SingletonRecruitThrottler {
double lastRecruitStart;
@ -2782,6 +2829,7 @@ ACTOR Future<Void> clusterControllerCore(ClusterControllerFullInterface interf,
self.addActor.send(updatedChangedDatacenters(&self));
self.addActor.send(updateDatacenterVersionDifference(&self));
self.addActor.send(handleForcedRecoveries(&self, interf));
self.addActor.send(handleTriggerAuditStorage(&self, interf));
self.addActor.send(monitorDataDistributor(&self));
self.addActor.send(monitorRatekeeper(&self));
self.addActor.send(monitorBlobManager(&self));

View File

@ -106,6 +106,62 @@ class DDTxnProcessorImpl {
return IDDTxnProcessor::SourceServers{ std::vector<UID>(servers.begin(), servers.end()), completeSources };
}
ACTOR static Future<std::vector<IDDTxnProcessor::DDRangeLocations>> getSourceServerInterfacesForRange(
Database cx,
KeyRangeRef range) {
state std::vector<IDDTxnProcessor::DDRangeLocations> res;
state Transaction tr(cx);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
loop {
res.clear();
try {
state RangeResult shards = wait(krmGetRanges(&tr,
keyServersPrefix,
range,
SERVER_KNOBS->MOVE_SHARD_KRM_ROW_LIMIT,
SERVER_KNOBS->MOVE_SHARD_KRM_BYTE_LIMIT));
ASSERT(!shards.empty());
state RangeResult UIDtoTagMap = wait(tr.getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY));
ASSERT(!UIDtoTagMap.more && UIDtoTagMap.size() < CLIENT_KNOBS->TOO_MANY);
state int i = 0;
for (i = 0; i < shards.size() - 1; ++i) {
state std::vector<UID> src;
std::vector<UID> dest;
UID srcId, destId;
decodeKeyServersValue(UIDtoTagMap, shards[i].value, src, dest, srcId, destId);
std::vector<Future<Optional<Value>>> serverListEntries;
for (int j = 0; j < src.size(); ++j) {
serverListEntries.push_back(tr.get(serverListKeyFor(src[j])));
}
std::vector<Optional<Value>> serverListValues = wait(getAll(serverListEntries));
IDDTxnProcessor::DDRangeLocations current(KeyRangeRef(shards[i].key, shards[i + 1].key));
for (int j = 0; j < serverListValues.size(); ++j) {
if (!serverListValues[j].present()) {
TraceEvent(SevWarnAlways, "GetSourceServerInterfacesMissing")
.detail("StorageServer", src[j])
.detail("Range", KeyRangeRef(shards[i].key, shards[i + 1].key));
continue;
}
StorageServerInterface ssi = decodeServerListValue(serverListValues[j].get());
current.servers[ssi.locality.describeDcId()].push_back(ssi);
}
res.push_back(current);
}
break;
} catch (Error& e) {
TraceEvent(SevWarnAlways, "GetSourceServerInterfacesError").errorUnsuppressed(e).detail("Range", range);
wait(tr.onError(e));
}
}
return res;
}
// set the system key space
ACTOR static Future<Void> updateReplicaKeys(Database cx,
std::vector<Optional<Key>> primaryDcId,
@ -537,6 +593,11 @@ Future<IDDTxnProcessor::SourceServers> DDTxnProcessor::getSourceServersForRange(
return DDTxnProcessorImpl::getSourceServersForRange(cx, range);
}
Future<std::vector<IDDTxnProcessor::DDRangeLocations>> DDTxnProcessor::getSourceServerInterfacesForRange(
const KeyRangeRef range) {
return DDTxnProcessorImpl::getSourceServerInterfacesForRange(cx, range);
}
Future<ServerWorkerInfos> DDTxnProcessor::getServerListAndProcessClasses() {
return DDTxnProcessorImpl::getServerListAndProcessClasses(cx);
}

View File

@ -21,6 +21,7 @@
#include <set>
#include <string>
#include "fdbclient/Audit.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/FDBOptions.g.h"
#include "fdbclient/FDBTypes.h"
@ -52,6 +53,17 @@
#include "fdbserver/DDSharedContext.h"
#include "flow/actorcompiler.h" // This must be the last #include.
struct DDAudit {
DDAudit(UID id, KeyRange range, AuditType type)
: id(id), range(range), type(type), auditMap(AuditPhase::Invalid, allKeys.end), actors(true) {}
const UID id;
KeyRange range;
const AuditType type;
KeyRangeMap<AuditPhase> auditMap;
ActorCollection actors;
};
void DataMove::validateShard(const DDShardInfo& shard, KeyRangeRef range, int priority) {
if (!valid) {
if (shard.hasDest && shard.destId != anonymousShardId) {
@ -276,6 +288,10 @@ public:
StorageQuotaInfo storageQuotaInfo;
Promise<Void> initialized;
std::unordered_map<AuditType, std::vector<std::shared_ptr<DDAudit>>> audits;
DataDistributor(Reference<AsyncVar<ServerDBInfo> const> const& db, UID id, Reference<DDSharedContext> context)
: dbInfo(db), context(context), ddId(id), txnProcessor(nullptr),
initialDDEventHolder(makeReference<EventCacheHolder>("InitialDD")),
@ -577,6 +593,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
state bool ddIsTenantAware = SERVER_KNOBS->DD_TENANT_AWARENESS_ENABLED;
loop {
trackerCancelled = false;
self->initialized = Promise<Void>();
// Stored outside of data distribution tracker to avoid slow tasks
// when tracker is cancelled
@ -594,7 +611,6 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
state PromiseStream<GetTopKMetricsRequest> getTopKShardMetrics;
state Reference<AsyncVar<bool>> processingUnhealthy(new AsyncVar<bool>(false));
state Reference<AsyncVar<bool>> processingWiggle(new AsyncVar<bool>(false));
state Promise<Void> readyToStart;
state Optional<Reference<TenantCache>> ddTenantCache;
if (ddIsTenantAware) {
@ -636,7 +652,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
getTopKShardMetrics.getFuture(),
getShardMetricsList,
getAverageShardBytes.getFuture(),
readyToStart,
self->initialized,
anyZeroHealthyTeams,
self->ddId,
&shards,
@ -688,7 +704,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
self->configuration,
self->primaryDcId,
self->configuration.usableRegions > 1 ? self->remoteDcIds : std::vector<Optional<Key>>(),
readyToStart.getFuture(),
self->initialized.getFuture(),
zeroHealthyTeams[0],
IsPrimary::True,
processingUnhealthy,
@ -709,7 +725,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
self->configuration,
self->remoteDcIds,
Optional<std::vector<Optional<Key>>>(),
readyToStart.getFuture() && remoteRecovered(self->dbInfo),
self->initialized.getFuture() && remoteRecovered(self->dbInfo),
zeroHealthyTeams[1],
IsPrimary::False,
processingUnhealthy,
@ -1327,6 +1343,157 @@ ACTOR Future<Void> ddGetMetrics(GetDataDistributorMetricsRequest req,
return Void();
}
ACTOR Future<Void> auditStorage(Reference<DataDistributor> self, TriggerAuditRequest req);
ACTOR Future<Void> scheduleAuditForRange(Reference<DataDistributor> self,
std::shared_ptr<DDAudit> audit,
KeyRange range);
ACTOR Future<Void> doAuditOnStorageServer(Reference<DataDistributor> self,
std::shared_ptr<DDAudit> audit,
StorageServerInterface ssi,
AuditStorageRequest req);
ACTOR Future<Void> auditStorage(Reference<DataDistributor> self, TriggerAuditRequest req) {
// TODO(heliu): Load running audit, and create one if no audit is running.
state std::shared_ptr<DDAudit> audit;
auto it = self->audits.find(req.getType());
if (it != self->audits.end() && !it->second.empty()) {
ASSERT_EQ(it->second.size(), 1);
auto& currentAudit = it->second.front();
if (currentAudit->range.contains(req.range)) {
audit = it->second.front();
} else {
req.reply.sendError(audit_storage_exceeded_request_limit());
return Void();
}
} else {
const UID auditId = deterministicRandom()->randomUniqueID();
audit = std::make_shared<DDAudit>(auditId, req.range, req.getType());
self->audits[req.getType()].push_back(audit);
audit->actors.add(scheduleAuditForRange(self, audit, req.range));
TraceEvent(SevDebug, "DDAuditStorageBegin", audit->id).detail("Range", req.range).detail("AuditType", req.type);
}
if (req.async && !req.reply.isSet()) {
req.reply.send(audit->id);
}
try {
wait(audit->actors.getResult());
TraceEvent(SevDebug, "DDAuditStorageEnd", audit->id).detail("Range", req.range).detail("AuditType", req.type);
// TODO(heliu): Set the audit result, and clear auditId.
if (!req.async && !req.reply.isSet()) {
TraceEvent(SevDebug, "DDAuditStorageReply", audit->id)
.detail("Range", req.range)
.detail("AuditType", req.type);
req.reply.send(audit->id);
}
} catch (Error& e) {
TraceEvent(SevWarnAlways, "DDAuditStorageOperationError", audit->id)
.errorUnsuppressed(e)
.detail("Range", req.range)
.detail("AuditType", req.type);
}
return Void();
}
ACTOR Future<Void> scheduleAuditForRange(Reference<DataDistributor> self,
std::shared_ptr<DDAudit> audit,
KeyRange range) {
TraceEvent(SevDebug, "DDScheduleAuditForRangeBegin", audit->id)
.detail("Range", range)
.detail("AuditType", audit->type);
// TODO(heliu): Load the audit map for `range`.
state Key begin = range.begin;
state KeyRange currentRange = range;
while (begin < range.end) {
currentRange = KeyRangeRef(begin, range.end);
// Find the first keyrange that hasn't been validated.
auto f = audit->auditMap.intersectingRanges(currentRange);
for (auto it = f.begin(); it != f.end(); ++it) {
if (it->value() != AuditPhase::Invalid && it->value() != AuditPhase::Failed) {
begin = it->range().end;
currentRange = KeyRangeRef(it->range().end, currentRange.end);
} else {
currentRange = KeyRangeRef(it->range().begin, it->range().end) & currentRange;
break;
}
}
try {
state std::vector<IDDTxnProcessor::DDRangeLocations> rangeLocations =
wait(self->txnProcessor->getSourceServerInterfacesForRange(currentRange));
state int i = 0;
for (i = 0; i < rangeLocations.size(); ++i) {
AuditStorageRequest req(audit->id, rangeLocations[i].range, audit->type);
if (audit->type == AuditType::ValidateHA && rangeLocations[i].servers.size() >= 2) {
auto it = rangeLocations[i].servers.begin();
const int idx = deterministicRandom()->randomInt(0, it->second.size());
StorageServerInterface& targetServer = it->second[idx];
++it;
for (; it != rangeLocations[i].servers.end(); ++it) {
const int idx = deterministicRandom()->randomInt(0, it->second.size());
req.targetServers.push_back(it->second[idx].id());
}
audit->actors.add(doAuditOnStorageServer(self, audit, targetServer, req));
}
begin = rangeLocations[i].range.end;
wait(delay(0.01));
}
} catch (Error& e) {
TraceEvent(SevWarnAlways, "DDScheduleAuditRangeError", audit->id)
.errorUnsuppressed(e)
.detail("Range", range);
if (e.code() == error_code_actor_cancelled) {
throw e;
}
}
}
return Void();
}
ACTOR Future<Void> doAuditOnStorageServer(Reference<DataDistributor> self,
std::shared_ptr<DDAudit> audit,
StorageServerInterface ssi,
AuditStorageRequest req) {
TraceEvent(SevDebug, "DDDoAuditOnStorageServerBegin", req.id)
.detail("Range", req.range)
.detail("AuditType", req.type)
.detail("StorageServer", ssi.toString())
.detail("TargetServers", describe(req.targetServers));
try {
audit->auditMap.insert(req.range, AuditPhase::Running);
ErrorOr<AuditStorageState> vResult = wait(ssi.auditStorage.getReplyUnlessFailedFor(
req, /*sustainedFailureDuration=*/2.0, /*sustainedFailureSlope=*/0));
if (vResult.isError()) {
throw vResult.getError();
}
TraceEvent e(vResult.get().error.empty() ? SevInfo : SevWarnAlways, "DDAuditStorageState", req.id);
e.detail("Range", req.range);
e.detail("StorageServer", ssi.toString());
if (!vResult.get().error.empty()) {
e.detail("ErrorMessage", vResult.get().error);
}
} catch (Error& e) {
TraceEvent(SevWarn, "DDDoAuditOnStorageServerError", req.id)
.errorUnsuppressed(e)
.detail("Range", req.range)
.detail("StorageServer", ssi.toString())
.detail("TargetServers", describe(req.targetServers));
if (e.code() != error_code_actor_cancelled) {
audit->auditMap.insert(req.range, AuditPhase::Failed);
audit->actors.add(scheduleAuditForRange(self, audit, req.range));
}
}
return Void();
}
ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncVar<ServerDBInfo> const> db) {
state Reference<DDSharedContext> context(new DDSharedContext(di.id()));
state Reference<DataDistributor> self(new DataDistributor(db, di.id(), context));
@ -1393,6 +1560,9 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncV
when(GetStorageWigglerStateRequest req = waitNext(di.storageWigglerState.getFuture())) {
req.reply.send(getStorageWigglerStates(self));
}
when(TriggerAuditRequest req = waitNext(di.triggerAudit.getFuture())) {
actors.add(auditStorage(self, req));
}
}
} catch (Error& err) {
if (normalDataDistributorErrors().count(err.code()) == 0) {

View File

@ -1023,6 +1023,10 @@ public:
writeBatch->Put(metadataShard->cf,
getShardMappingKey(lastKey, shardMappingPrefix),
nextShard == nullptr ? "" : nextShard->physicalShard->id);
TraceEvent(SevDebug, "ShardedRocksDB", this->logId)
.detail("Action", "PersistRangeMappingEnd")
.detail("NextShardKey", lastKey)
.detail("Value", nextShard == nullptr ? "" : nextShard->physicalShard->id);
dirtyShards->insert(metadataShard.get());
}

View File

@ -44,11 +44,25 @@ public:
struct SourceServers {
std::vector<UID> srcServers, completeSources; // the same as RelocateData.src, RelocateData.completeSources;
};
struct DDRangeLocations {
DDRangeLocations() = default;
DDRangeLocations(KeyRangeRef range) : range(range) {}
// A map of dcId : list of servers
std::map<std::string, std::vector<StorageServerInterface>> servers;
KeyRange range;
};
virtual Database context() const = 0;
virtual bool isMocked() const = 0;
// get the source server list and complete source server list for range
virtual Future<SourceServers> getSourceServersForRange(const KeyRangeRef range) { return SourceServers{}; };
virtual Future<std::vector<DDRangeLocations>> getSourceServerInterfacesForRange(const KeyRangeRef range) {
return std::vector<DDRangeLocations>();
}
// get the storage server list and Process class, only throw transaction non-retryable exceptions
virtual Future<ServerWorkerInfos> getServerListAndProcessClasses() = 0;
@ -142,6 +156,9 @@ public:
Future<SourceServers> getSourceServersForRange(const KeyRangeRef range) override;
Future<std::vector<IDDTxnProcessor::DDRangeLocations>> getSourceServerInterfacesForRange(
const KeyRangeRef range) override;
// Call NativeAPI implementation directly
Future<ServerWorkerInfos> getServerListAndProcessClasses() override;

View File

@ -37,6 +37,7 @@ struct DataDistributorInterface {
RequestStream<struct GetDataDistributorMetricsRequest> dataDistributorMetrics;
RequestStream<struct DistributorSplitRangeRequest> distributorSplitRange;
RequestStream<struct GetStorageWigglerStateRequest> storageWigglerState;
RequestStream<struct TriggerAuditRequest> triggerAudit;
DataDistributorInterface() {}
explicit DataDistributorInterface(const struct LocalityData& l, UID id) : locality(l), myId(id) {}
@ -58,7 +59,8 @@ struct DataDistributorInterface {
distributorExclCheckReq,
dataDistributorMetrics,
distributorSplitRange,
storageWigglerState);
storageWigglerState,
triggerAudit);
}
};

View File

@ -27,6 +27,7 @@
#include "fdbclient/BlobGranuleCommon.h"
#include "flow/ApiVersion.h"
#include "fmt/format.h"
#include "fdbclient/Audit.h"
#include "fdbclient/CommitTransaction.h"
#include "fdbclient/FDBTypes.h"
#include "fdbrpc/fdbrpc.h"
@ -1015,6 +1016,8 @@ public:
FlowLock serveFetchCheckpointParallelismLock;
FlowLock serveAuditStorageParallelismLock;
int64_t instanceID;
Promise<Void> otherError;
@ -1224,6 +1227,12 @@ public:
specialCounter(cc, "ServeFetchCheckpointWaiting", [self]() {
return self->serveFetchCheckpointParallelismLock.waiters();
});
specialCounter(cc, "ServeValidateStorageActive", [self]() {
return self->serveAuditStorageParallelismLock.activePermits();
});
specialCounter(cc, "ServeValidateStorageWaiting", [self]() {
return self->serveAuditStorageParallelismLock.waiters();
});
specialCounter(
cc, "ChangeFeedDiskReadsActive", [self]() { return self->changeFeedDiskReadsLock.activePermits(); });
specialCounter(
@ -1290,6 +1299,7 @@ public:
changeFeedDiskReadsLock(SERVER_KNOBS->CHANGE_FEED_DISK_READS_PARALLELISM),
fetchKeysBytesBudget(SERVER_KNOBS->STORAGE_FETCH_BYTES), fetchKeysBudgetUsed(false),
serveFetchCheckpointParallelismLock(SERVER_KNOBS->SERVE_FETCH_CHECKPOINT_PARALLELISM),
serveAuditStorageParallelismLock(SERVER_KNOBS->SERVE_AUDIT_STORAGE_PARALLELISM),
instanceID(deterministicRandom()->randomUniqueID().first()), shuttingDown(false), behind(false),
versionBehind(false), debug_inApplyUpdate(false), debug_lastValidateTime(0), lastBytesInputEBrake(0),
lastDurableVersionEBrake(0), maxQueryQueue(0), transactionTagCounter(ssi.id()),
@ -4138,6 +4148,322 @@ Key constructMappedKey(KeyValueRef* keyValue, std::vector<Optional<Tuple>>& vec,
return mappedKeyTuple.pack();
}
ACTOR Future<Void> validateRangeAgainstServer(StorageServer* data,
KeyRange range,
Version version,
StorageServerInterface remoteServer) {
TraceEvent(SevInfo, "ValidateRangeAgainstServerBegin", data->thisServerID)
.detail("Range", range)
.detail("Version", version)
.detail("RemoteServer", remoteServer.toString());
state int validatedKeys = 0;
state std::string error;
loop {
try {
std::vector<Future<ErrorOr<GetKeyValuesReply>>> fs;
int limit = 1e4;
int limitBytes = CLIENT_KNOBS->REPLY_BYTE_LIMIT;
GetKeyValuesRequest req;
req.begin = firstGreaterOrEqual(range.begin);
req.end = firstGreaterOrEqual(range.end);
req.limit = limit;
req.limitBytes = limitBytes;
req.version = version;
req.tags = TagSet();
fs.push_back(remoteServer.getKeyValues.getReplyUnlessFailedFor(req, 2, 0));
GetKeyValuesRequest localReq;
localReq.begin = firstGreaterOrEqual(range.begin);
localReq.end = firstGreaterOrEqual(range.end);
localReq.limit = limit;
localReq.limitBytes = limitBytes;
localReq.version = version;
localReq.tags = TagSet();
data->actors.add(getKeyValuesQ(data, localReq));
fs.push_back(errorOr(localReq.reply.getFuture()));
std::vector<ErrorOr<GetKeyValuesReply>> reps = wait(getAll(fs));
for (int i = 0; i < reps.size(); ++i) {
if (reps[i].isError()) {
TraceEvent(SevWarn, "ValidateRangeGetKeyValuesError", data->thisServerID)
.errorUnsuppressed(reps[i].getError())
.detail("ReplyIndex", i)
.detail("Range", range);
throw reps[i].getError();
}
if (reps[i].get().error.present()) {
TraceEvent(SevWarn, "ValidateRangeGetKeyValuesError", data->thisServerID)
.errorUnsuppressed(reps[i].get().error.get())
.detail("ReplyIndex", i)
.detail("Range", range);
throw reps[i].get().error.get();
}
}
GetKeyValuesReply remote = reps[0].get(), local = reps[1].get();
Key lastKey = range.begin;
const int end = std::min(local.data.size(), remote.data.size());
int i = 0;
for (; i < end; ++i) {
KeyValueRef remoteKV = remote.data[i];
KeyValueRef localKV = local.data[i];
if (!range.contains(remoteKV.key) || !range.contains(localKV.key)) {
TraceEvent(SevDebug, "SSValidateRangeKeyOutOfRange", data->thisServerID)
.detail("Range", range)
.detail("RemoteServer", remoteServer.toString().c_str())
.detail("LocalKey", Traceable<StringRef>::toString(localKV.key).c_str())
.detail("RemoteKey", Traceable<StringRef>::toString(remoteKV.key).c_str());
throw wrong_shard_server();
}
if (remoteKV.key != localKV.key) {
error = format("Key Mismatch: local server (%016llx): %s, remote server(%016llx) %s",
data->thisServerID.first(),
Traceable<StringRef>::toString(localKV.key).c_str(),
remoteServer.uniqueID.first(),
Traceable<StringRef>::toString(remoteKV.key).c_str());
} else if (remoteKV.value != localKV.value) {
error = format("Value Mismatch for Key %s: local server (%016llx): %s, remote server(%016llx) %s",
Traceable<StringRef>::toString(localKV.key).c_str(),
data->thisServerID.first(),
Traceable<StringRef>::toString(localKV.value).c_str(),
remoteServer.uniqueID.first(),
Traceable<StringRef>::toString(remoteKV.value).c_str());
} else {
TraceEvent(SevVerbose, "ValidatedKey", data->thisServerID).detail("Key", localKV.key);
++validatedKeys;
}
lastKey = localKV.key;
}
if (!error.empty()) {
break;
}
if (!local.more && !remote.more && local.data.size() == remote.data.size()) {
break;
} else if (i >= local.data.size() && !local.more && i < remote.data.size()) {
error = format("Missing key(s) form local server (%lld), next key: %s, remote server(%016llx) ",
data->thisServerID.first(),
Traceable<StringRef>::toString(remote.data[i].key).c_str(),
remoteServer.uniqueID.first());
break;
} else if (i >= remote.data.size() && !remote.more && i < local.data.size()) {
error = format("Missing key(s) form remote server (%lld), next local server(%016llx) key: %s",
remoteServer.uniqueID.first(),
data->thisServerID.first(),
Traceable<StringRef>::toString(local.data[i].key).c_str());
break;
}
range = KeyRangeRef(keyAfter(lastKey), range.end);
} catch (Error& e) {
TraceEvent(SevWarnAlways, "ValidateRangeAgainstServerError", data->thisServerID)
.errorUnsuppressed(e)
.detail("RemoteServer", remoteServer.toString())
.detail("Range", range)
.detail("Version", version);
throw e;
}
}
if (!error.empty()) {
TraceEvent(SevError, "ValidateRangeAgainstServerError", data->thisServerID)
.detail("Range", range)
.detail("Version", version)
.detail("ErrorMessage", error)
.detail("RemoteServer", remoteServer.toString());
}
TraceEvent(SevDebug, "ValidateRangeAgainstServerEnd", data->thisServerID)
.detail("Range", range)
.detail("Version", version)
.detail("ValidatedKeys", validatedKeys)
.detail("Servers", remoteServer.toString());
return Void();
}
ACTOR Future<Void> validateRangeShard(StorageServer* data, KeyRange range, std::vector<UID> candidates) {
TraceEvent(SevDebug, "ServeValidateRangeShardBegin", data->thisServerID)
.detail("Range", range)
.detail("Servers", describe(candidates));
state Version version;
state std::vector<Optional<Value>> serverListValues;
state Transaction tr(data->cx);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
loop {
try {
std::vector<Future<Optional<Value>>> serverListEntries;
for (const UID& id : candidates) {
serverListEntries.push_back(tr.get(serverListKeyFor(id)));
}
std::vector<Optional<Value>> serverListValues_ = wait(getAll(serverListEntries));
serverListValues = serverListValues_;
Version version_ = wait(tr.getReadVersion());
version = version_;
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
std::unordered_map<std::string, std::vector<StorageServerInterface>> ssis;
std::string thisDcId;
for (const auto& v : serverListValues) {
if (!v.present()) {
continue;
}
const StorageServerInterface ssi = decodeServerListValue(v.get());
if (ssi.uniqueID == data->thisServerID) {
thisDcId = ssi.locality.describeDcId();
}
ssis[ssi.locality.describeDcId()].push_back(ssi);
}
if (ssis.size() < 2) {
TraceEvent(SevWarn, "ServeValidateRangeShardNotHAConfig", data->thisServerID)
.detail("Range", range)
.detail("Servers", describe(candidates));
return Void();
}
StorageServerInterface* remoteServer = nullptr;
for (auto& [dcId, ssiList] : ssis) {
if (dcId != thisDcId) {
if (ssiList.empty()) {
break;
}
const int idx = deterministicRandom()->randomInt(0, ssiList.size());
remoteServer = &ssiList[idx];
break;
}
}
if (remoteServer != nullptr) {
wait(validateRangeAgainstServer(data, range, version, *remoteServer));
} else {
TraceEvent(SevWarn, "ServeValidateRangeShardRemoteNotFound", data->thisServerID)
.detail("Range", range)
.detail("Servers", describe(candidates));
throw audit_storage_failed();
}
return Void();
}
ACTOR Future<Void> validateRangeAgainstServers(StorageServer* data, KeyRange range, std::vector<UID> targetServers) {
TraceEvent(SevDebug, "ValidateRangeAgainstServersBegin", data->thisServerID)
.detail("Range", range)
.detail("TargetServers", describe(targetServers));
state Version version;
state std::vector<Optional<Value>> serverListValues;
state Transaction tr(data->cx);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
loop {
try {
std::vector<Future<Optional<Value>>> serverListEntries;
for (const UID& id : targetServers) {
if (id != data->thisServerID) {
serverListEntries.push_back(tr.get(serverListKeyFor(id)));
}
}
std::vector<Optional<Value>> serverListValues_ = wait(getAll(serverListEntries));
serverListValues = serverListValues_;
Version version_ = wait(tr.getReadVersion());
version = version_;
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
std::vector<Future<Void>> fs;
for (const auto& v : serverListValues) {
if (!v.present()) {
TraceEvent(SevWarn, "ValidateRangeRemoteServerNotFound", data->thisServerID).detail("Range", range);
throw audit_storage_failed();
}
fs.push_back(validateRangeAgainstServer(data, range, version, decodeServerListValue(v.get())));
}
wait(waitForAll(fs));
return Void();
}
ACTOR Future<Void> auditStorageQ(StorageServer* data, AuditStorageRequest req) {
wait(data->serveAuditStorageParallelismLock.take(TaskPriority::DefaultYield));
state FlowLock::Releaser holder(data->serveAuditStorageParallelismLock);
TraceEvent(SevInfo, "ServeAuditStorageBegin", data->thisServerID)
.detail("RequestID", req.id)
.detail("Range", req.range)
.detail("AuditType", req.type)
.detail("TargetServers", describe(req.targetServers));
state Key begin = req.range.begin;
state std::vector<Future<Void>> fs;
try {
if (req.targetServers.empty()) {
while (begin < req.range.end) {
state Transaction tr(data->cx);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
try {
state RangeResult shards = wait(krmGetRanges(&tr,
keyServersPrefix,
req.range,
SERVER_KNOBS->MOVE_SHARD_KRM_ROW_LIMIT,
SERVER_KNOBS->MOVE_SHARD_KRM_BYTE_LIMIT));
ASSERT(!shards.empty());
state RangeResult UIDtoTagMap = wait(tr.getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY));
ASSERT(!UIDtoTagMap.more && UIDtoTagMap.size() < CLIENT_KNOBS->TOO_MANY);
for (int i = 0; i < shards.size() - 1; ++i) {
std::vector<UID> src;
std::vector<UID> dest;
UID srcId, destId;
decodeKeyServersValue(UIDtoTagMap, shards[i].value, src, dest, srcId, destId);
fs.push_back(validateRangeShard(data, KeyRangeRef(shards[i].key, shards[i + 1].key), src));
begin = shards[i + 1].key;
}
} catch (Error& e) {
wait(tr.onError(e));
}
}
} else {
fs.push_back(validateRangeAgainstServers(data, req.range, req.targetServers));
}
wait(waitForAll(fs));
AuditStorageState res(req.id, req.getType());
res.setPhase(AuditPhase::Complete);
req.reply.send(res);
} catch (Error& e) {
TraceEvent(SevWarn, "ServeAuditStorageError", data->thisServerID)
.errorUnsuppressed(e)
.detail("RequestID", req.id)
.detail("Range", req.range)
.detail("AuditType", req.type);
req.reply.sendError(audit_storage_failed());
}
return Void();
}
TEST_CASE("/fdbserver/storageserver/constructMappedKey") {
Key key = Tuple::makeTuple("key-0"_sr, "key-1"_sr, "key-2"_sr).getDataAsStandalone();
Value value = Tuple::makeTuple("value-0"_sr, "value-1"_sr, "value-2"_sr).getDataAsStandalone();
@ -10351,6 +10677,9 @@ ACTOR Future<Void> storageServerCore(StorageServer* self, StorageServerInterface
when(FetchCheckpointKeyValuesRequest req = waitNext(ssi.fetchCheckpointKeyValues.getFuture())) {
self->actors.add(fetchCheckpointKeyValuesQ(self, req));
}
when(AuditStorageRequest req = waitNext(ssi.auditStorage.getFuture())) {
self->actors.add(auditStorageQ(self, req));
}
when(wait(updateProcessStatsTimer)) {
updateProcessStats(self);
updateProcessStatsTimer = delay(SERVER_KNOBS->FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL);

View File

@ -0,0 +1,189 @@
/*
* ValidateStorage.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/Audit.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbrpc/simulator.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "flow/Error.h"
#include "flow/IRandom.h"
#include "flow/flow.h"
#include <cstdint>
#include <limits>
#include "flow/actorcompiler.h" // This must be the last #include.
namespace {
std::string printValue(const ErrorOr<Optional<Value>>& value) {
if (value.isError()) {
return value.getError().name();
}
return value.get().present() ? value.get().get().toString() : "Value Not Found.";
}
} // namespace
struct ValidateStorage : TestWorkload {
FlowLock startMoveKeysParallelismLock;
FlowLock finishMoveKeysParallelismLock;
FlowLock cleanUpDataMoveParallelismLock;
const bool enabled;
bool pass;
void validationFailed(ErrorOr<Optional<Value>> expectedValue, ErrorOr<Optional<Value>> actualValue) {
TraceEvent(SevError, "TestFailed")
.detail("ExpectedValue", printValue(expectedValue))
.detail("ActualValue", printValue(actualValue));
pass = false;
}
ValidateStorage(WorkloadContext const& wcx) : TestWorkload(wcx), enabled(!clientId), pass(true) {}
std::string description() const override { return "ValidateStorage"; }
Future<Void> setup(Database const& cx) override { return Void(); }
Future<Void> start(Database const& cx) override {
if (!enabled) {
return Void();
}
return _start(this, cx);
}
ACTOR Future<Void> _start(ValidateStorage* self, Database cx) {
TraceEvent("ValidateStorageTestBegin");
state std::map<Key, Value> kvs({ { "TestKeyA"_sr, "TestValueA"_sr },
{ "TestKeyB"_sr, "TestValueB"_sr },
{ "TestKeyC"_sr, "TestValueC"_sr },
{ "TestKeyD"_sr, "TestValueD"_sr },
{ "TestKeyE"_sr, "TestValueE"_sr },
{ "TestKeyF"_sr, "TestValueF"_sr } });
Version _ = wait(self->populateData(self, cx, &kvs));
TraceEvent("TestValueWritten");
wait(self->validateData(self, cx, KeyRangeRef("TestKeyA"_sr, "TestKeyF"_sr)));
TraceEvent("TestValueVerified");
loop {
try {
UID auditId = wait(auditStorage(cx->getConnectionRecord(), allKeys, AuditType::ValidateHA));
TraceEvent("TestValidateEnd").detail("AuditID", auditId);
break;
} catch (Error& e) {
TraceEvent("AuditStorageError").errorUnsuppressed(e);
wait(delay(1));
}
}
return Void();
}
ACTOR Future<Version> populateData(ValidateStorage* self, Database cx, std::map<Key, Value>* kvs) {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
state Version version;
state UID debugID;
loop {
debugID = deterministicRandom()->randomUniqueID();
try {
tr->debugTransaction(debugID);
for (const auto& [key, value] : *kvs) {
tr->set(key, value);
}
wait(tr->commit());
version = tr->getCommittedVersion();
break;
} catch (Error& e) {
TraceEvent("TestCommitError").errorUnsuppressed(e);
wait(tr->onError(e));
}
}
TraceEvent("PopulateTestDataDone")
.detail("CommitVersion", tr->getCommittedVersion())
.detail("DebugID", debugID);
return version;
}
ACTOR Future<Void> validateData(ValidateStorage* self, Database cx, KeyRange range) {
TraceEvent("TestValidateStorageBegin").detail("Range", range);
state Transaction tr(cx);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
loop {
try {
state RangeResult shards =
wait(krmGetRanges(&tr, keyServersPrefix, range, CLIENT_KNOBS->TOO_MANY, CLIENT_KNOBS->TOO_MANY));
ASSERT(!shards.empty() && !shards.more);
state RangeResult UIDtoTagMap = wait(tr.getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY));
ASSERT(!UIDtoTagMap.more && UIDtoTagMap.size() < CLIENT_KNOBS->TOO_MANY);
state int i = 0;
for (i = 0; i < shards.size() - 1; ++i) {
std::vector<UID> src;
std::vector<UID> dest;
UID srcId, destId;
decodeKeyServersValue(UIDtoTagMap, shards[i].value, src, dest, srcId, destId);
const int idx = deterministicRandom()->randomInt(0, src.size());
Optional<Value> serverListValue = wait(tr.get(serverListKeyFor(src[idx])));
ASSERT(serverListValue.present());
const StorageServerInterface ssi = decodeServerListValue(serverListValue.get());
TraceEvent("TestValidateStorageSendingRequest")
.detail("Range", range)
.detail("StorageServer", ssi.toString());
AuditStorageRequest req(deterministicRandom()->randomUniqueID(),
KeyRangeRef(shards[i].key, shards[i + 1].key),
AuditType::ValidateHA);
Optional<AuditStorageState> vResult =
wait(timeout<AuditStorageState>(ssi.auditStorage.getReply(req), 5));
if (!vResult.present()) {
throw audit_storage_failed();
}
}
break;
} catch (Error& e) {
TraceEvent(SevWarnAlways, "TestValidateStorageError").errorUnsuppressed(e).detail("Range", range);
try {
wait(tr.onError(e));
} catch (Error& e) {
if (e.code() != error_code_audit_storage_failed) {
throw e;
}
}
}
}
TraceEvent("TestValidateStorageDone").detail("Range", range);
return Void();
}
Future<bool> check(Database const& cx) override { return true; }
void getMetrics(std::vector<PerfMetric>& m) override {}
};
WorkloadFactory<ValidateStorage> ValidateStorageFactory("ValidateStorageWorkload");

View File

@ -129,6 +129,8 @@ ERROR( invalid_cluster_id, 1217, "Attempted to join cluster with a different clu
ERROR( restart_cluster_controller, 1218, "Restart cluster controller process" )
ERROR( please_reboot_kv_store, 1219, "Need to reboot the storage engine")
ERROR( incompatible_software_version, 1220, "Current software does not support database format" )
ERROR( audit_storage_failed, 1221, "Validate storage consistency operation failed" )
ERROR( audit_storage_exceeded_request_limit, 1222, "Exceeded the max number of allowed concurrent audit storage requests" )
// 15xx Platform errors
ERROR( platform_error, 1500, "Platform error" )

View File

@ -200,6 +200,7 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES fast/WriteDuringRead.toml)
add_fdb_test(TEST_FILES fast/WriteDuringReadClean.toml)
add_fdb_test(TEST_FILES noSim/RandomUnitTests.toml UNIT)
add_fdb_test(TEST_FILES fast/ValidateStorage.toml)
if (WITH_ROCKSDB_EXPERIMENTAL)
add_fdb_test(TEST_FILES noSim/KeyValueStoreRocksDBTest.toml UNIT)
add_fdb_test(TEST_FILES noSim/ShardedRocksDBTest.toml UNIT)

View File

@ -0,0 +1,16 @@
[configuration]
config = 'triple'
storageEngineType = 5
generateFearless = true
allowDefaultTenant = false
machineCount = 45
[[knobs]]
shard_encode_location_metadata = true
[[test]]
testTitle = 'ValidateStorageWorkload'
useDB = true
[[test.workload]]
testName = 'ValidateStorageWorkload'