Validate Storage part II (#8471)

* Implemented AuditUtils.actor.cpp

Moved AuditUtils to fdbserver/

* Persist AuditStorageState.

* Passed persisted AuditStorageState test.

* Added audit_storage_error to indicate a corruption is caught.

Throw/Send audit_storage_error when there is a data corruption.

Added doAuditStorage() for resuming Audit.

* Load and resume AuditStorage when DD restarts.

* Generate audit id monotonically.

* Fixed minor issue AuditId/Type was not set.

* Adding getLatestAuditStates.

* Improved persisted errors and added AuditStorageCommand.actor.cpp for
fdbcli.

* Added `audit_storage` fdbcli command.

* fmt.

* Fixed null shared_ptr issue.

* Improve audit data.

* Change DDAuditFailed to SevWarn.

* Sev.

* set SERVE_AUDIT_STORAGE_PARALLELISM to 1.

* Moved AuditUtils* to fdbclient/.

* Added getAuditStatus fdbcli command.

* Refactor audit storage fdb cli commands.

* Added auditStorage in sim.

* Cleanup.

* Resolved comments.

* Resolved comments.

* Test disabling audit for sims.

* Cleanup.

Co-authored-by: He Liu <heliu@apple.com>
This commit is contained in:
He Liu 2023-01-15 21:46:14 -08:00 committed by GitHub
parent 329882554e
commit 00203c8732
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 734 additions and 126 deletions

View File

@ -0,0 +1,72 @@
/*
* AuditStorageCommand.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2023 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbcli/fdbcli.actor.h"
#include "fdbclient/IClientApi.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/Audit.h"
#include "flow/Arena.h"
#include "flow/FastRef.h"
#include "flow/ThreadHelper.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
namespace fdb_cli {
ACTOR Future<UID> auditStorageCommandActor(Reference<IClusterConnectionRecord> clusterFile,
std::vector<StringRef> tokens) {
if (tokens.size() < 2) {
printUsage(tokens[0]);
return UID();
}
AuditType type = AuditType::Invalid;
if (tokencmp(tokens[1], "ha")) {
type = AuditType::ValidateHA;
} else {
printUsage(tokens[0]);
return UID();
}
Key begin, end;
if (tokens.size() == 2) {
begin = allKeys.begin;
end = allKeys.end;
} else if (tokens.size() == 3) {
begin = tokens[2];
} else if (tokens.size() == 4) {
begin = tokens[2];
end = tokens[3];
} else {
printUsage(tokens[0]);
return UID();
}
UID auditId = wait(auditStorage(clusterFile, KeyRangeRef(begin, end), type, true));
return auditId;
}
CommandFactory auditStorageFactory("audit_storage",
CommandHelp("audit_storage <ha> [BeginKey] [EndKey]",
"Start an audit storage",
"Trigger an audit storage, the auditID is returned.\n"));
} // namespace fdb_cli

View File

@ -0,0 +1,67 @@
/*
* GetAuditStatusCommand.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2023 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbcli/fdbcli.actor.h"
#include "fdbclient/Audit.h"
#include "fdbclient/AuditUtils.actor.h"
#include "fdbclient/IClientApi.h"
#include "flow/Arena.h"
#include "flow/FastRef.h"
#include "flow/ThreadHelper.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
namespace fdb_cli {
ACTOR Future<bool> getAuditStatusCommandActor(Database cx, std::vector<StringRef> tokens) {
if (tokens.size() != 4) {
printUsage(tokens[0]);
return false;
}
AuditType type = AuditType::Invalid;
if (tokencmp(tokens[1], "ha")) {
type = AuditType::ValidateHA;
} else {
printUsage(tokens[0]);
return false;
}
if (tokencmp(tokens[2], "id")) {
const UID id = UID::fromString(tokens[3].toString());
AuditStorageState res = wait(getAuditState(cx, type, id));
printf("Audit result is:\n%s", res.toString().c_str());
} else if (tokencmp(tokens[2], "recent")) {
const int count = std::stoi(tokens[3].toString());
std::vector<AuditStorageState> res = wait(getLatestAuditStates(cx, type, count));
for (const auto& it : res) {
printf("Audit result is:\n%s\n", it.toString().c_str());
}
}
return true;
}
CommandFactory getAuditStatusFactory(
"get_audit_status",
CommandHelp("get_audit_status <ha> <id|recent> [ARGs]",
"Retrieve audit storage results of the specific type",
"Fetch audit result with an ID: get_audit_status [Type] id [ID];\n"
"Fetch most recent audit results: get_audit_status [Type] recent [Count].\n"));
} // namespace fdb_cli

View File

@ -1635,6 +1635,24 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise, Reference<ClusterCo
continue;
}
if (tokencmp(tokens[0], "audit_storage")) {
UID auditId = wait(makeInterruptable(auditStorageCommandActor(ccf, tokens)));
if (!auditId.isValid()) {
is_error = true;
} else {
printf("Started audit: %s\n", auditId.toString().c_str());
}
continue;
}
if (tokencmp(tokens[0], "get_audit_status")) {
bool _result = wait(makeInterruptable(getAuditStatusCommandActor(localDb, tokens)));
if (!_result) {
is_error = true;
}
continue;
}
if (tokencmp(tokens[0], "force_recovery_with_data_loss")) {
bool _result = wait(makeInterruptable(forceRecoveryWithDataLossCommandActor(db, tokens)));
if (!_result)

View File

@ -183,6 +183,11 @@ ACTOR Future<bool> fileConfigureCommandActor(Reference<IDatabase> db,
std::string filePath,
bool isNewDatabase,
bool force);
// Trigger audit storage
ACTOR Future<UID> auditStorageCommandActor(Reference<IClusterConnectionRecord> clusterFile,
std::vector<StringRef> tokens);
// Retrieve audit storage status
ACTOR Future<bool> getAuditStatusCommandActor(Database cx, std::vector<StringRef> tokens);
// force_recovery_with_data_loss command
ACTOR Future<bool> forceRecoveryWithDataLossCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens);
// include command

View File

@ -0,0 +1,175 @@
/*
* AuditUtils.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/AuditUtils.actor.h"
#include "fdbclient/Audit.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/ClientKnobs.h"
#include "flow/actorcompiler.h" // has to be last include
ACTOR static Future<std::vector<AuditStorageState>> getLatestAuditStatesImpl(Transaction* tr, AuditType type, int num) {
state std::vector<AuditStorageState> auditStates;
loop {
auditStates.clear();
try {
RangeResult res = wait(tr->getRange(auditKeyRange(type), num, Snapshot::False, Reverse::True));
for (int i = 0; i < res.size(); ++i) {
auditStates.push_back(decodeAuditStorageState(res[i].value));
}
break;
} catch (Error& e) {
wait(tr->onError(e));
}
}
return auditStates;
}
ACTOR Future<UID> persistNewAuditState(Database cx, AuditStorageState auditState) {
ASSERT(!auditState.id.isValid());
state Transaction tr(cx);
state UID auditId;
loop {
try {
std::vector<AuditStorageState> auditStates = wait(getLatestAuditStatesImpl(&tr, auditState.getType(), 1));
uint64_t nextId = 1;
if (!auditStates.empty()) {
nextId = auditStates.front().id.first() + 1;
}
auditId = UID(nextId, 0LL);
auditState.id = auditId;
tr.set(auditKey(auditState.getType(), auditId), auditStorageStateValue(auditState));
wait(tr.commit());
TraceEvent("PersistedNewAuditState", auditId).detail("AuditKey", auditKey(auditState.getType(), auditId));
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
return auditId;
}
ACTOR Future<Void> persistAuditState(Database cx, AuditStorageState auditState) {
state Transaction tr(cx);
loop {
try {
tr.set(auditKey(auditState.getType(), auditState.id), auditStorageStateValue(auditState));
wait(tr.commit());
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
return Void();
}
ACTOR Future<AuditStorageState> getAuditState(Database cx, AuditType type, UID id) {
state Transaction tr(cx);
state Optional<Value> res;
loop {
try {
Optional<Value> res_ = wait(tr.get(auditKey(type, id)));
res = res_;
TraceEvent("ReadAuditState", id).detail("AuditKey", auditKey(type, id));
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
if (!res.present()) {
throw key_not_found();
}
return decodeAuditStorageState(res.get());
}
ACTOR Future<std::vector<AuditStorageState>> getLatestAuditStates(Database cx, AuditType type, int num) {
Transaction tr(cx);
std::vector<AuditStorageState> auditStates = wait(getLatestAuditStatesImpl(&tr, type, num));
return auditStates;
}
ACTOR Future<Void> persistAuditStateMap(Database cx, AuditStorageState auditState) {
state Transaction tr(cx);
loop {
try {
wait(krmSetRange(
&tr, auditRangePrefixFor(auditState.id), auditState.range, auditStorageStateValue(auditState)));
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
return Void();
}
ACTOR Future<std::vector<AuditStorageState>> getAuditStateForRange(Database cx, UID id, KeyRange range) {
state RangeResult auditStates;
state Transaction tr(cx);
loop {
try {
RangeResult res_ = wait(krmGetRanges(&tr,
auditRangePrefixFor(id),
range,
CLIENT_KNOBS->KRM_GET_RANGE_LIMIT,
CLIENT_KNOBS->KRM_GET_RANGE_LIMIT_BYTES));
auditStates = res_;
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
std::vector<AuditStorageState> res;
for (int i = 0; i < auditStates.size() - 1; ++i) {
KeyRange currentRange = KeyRangeRef(auditStates[i].key, auditStates[i + 1].key);
AuditStorageState auditState;
if (!auditStates[i].value.empty()) {
AuditStorageState auditState = decodeAuditStorageState(auditStates[i].value);
}
auditState.range = currentRange;
res.push_back(auditState);
}
return res;
}
StringRef auditTypeToString(const AuditType type) {
switch (type) {
case AuditType::Invalid:
return "Invalid"_sr;
case AuditType::ValidateHA:
return "ValidateHA"_sr;
}
return "Invalid"_sr;
}

View File

@ -2535,16 +2535,22 @@ ACTOR Future<Void> forceRecovery(Reference<IClusterConnectionRecord> clusterFile
}
}
ACTOR Future<UID> auditStorage(Reference<IClusterConnectionRecord> clusterFile, KeyRange range, AuditType type) {
ACTOR Future<UID> auditStorage(Reference<IClusterConnectionRecord> clusterFile,
KeyRange range,
AuditType type,
bool async) {
state Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface(new AsyncVar<Optional<ClusterInterface>>);
state Future<Void> leaderMon = monitorLeader<ClusterInterface>(clusterFile, clusterInterface);
TraceEvent(SevDebug, "ManagementAPIAuditStorageBegin");
loop {
while (!clusterInterface->get().present()) {
wait(clusterInterface->onChange());
}
UID auditId = wait(clusterInterface->get().get().triggerAudit.getReply(TriggerAuditRequest(type, range)));
TriggerAuditRequest req(type, range);
req.async = async;
UID auditId = wait(clusterInterface->get().get().triggerAudit.getReply(req));
TraceEvent(SevDebug, "ManagementAPIAuditStorageEnd").detail("AuditID", auditId);
return auditId;
}

View File

@ -283,25 +283,40 @@ const KeyRangeRef readConflictRangeKeysRange =
const KeyRangeRef writeConflictRangeKeysRange = KeyRangeRef("\xff\xff/transaction/write_conflict_range/"_sr,
"\xff\xff/transaction/write_conflict_range/\xff\xff"_sr);
const KeyRangeRef auditRange = KeyRangeRef("\xff/audit/"_sr, "\xff/audit0"_sr);
const KeyRef auditPrefix = auditRange.begin;
const KeyRangeRef auditKeys = KeyRangeRef("\xff/audits/"_sr, "\xff/audits0"_sr);
const KeyRef auditPrefix = auditKeys.begin;
const KeyRangeRef auditRanges = KeyRangeRef("\xff/auditRanges/"_sr, "\xff/auditRanges0"_sr);
const KeyRef auditRangePrefix = auditRanges.begin;
const Key auditRangeKey(const AuditType type, const UID& auditId, const KeyRef& key) {
const Key auditKey(const AuditType type, const UID& auditId) {
BinaryWriter wr(Unversioned());
wr.serializeBytes(auditPrefix);
wr << static_cast<uint8_t>(type);
wr.serializeBytes("/"_sr);
wr << bigEndian64(auditId.first());
return wr.toValue();
}
const KeyRange auditKeyRange(const AuditType type) {
BinaryWriter wr(Unversioned());
wr.serializeBytes(auditPrefix);
wr << static_cast<uint8_t>(type);
wr.serializeBytes("/"_sr);
return prefixRange(wr.toValue());
}
const Key auditRangeKey(const UID& auditId, const KeyRef& key) {
BinaryWriter wr(Unversioned());
wr.serializeBytes(auditRangePrefix);
wr << auditId;
wr.serializeBytes("/"_sr);
wr.serializeBytes(key);
return wr.toValue();
}
const Key auditRangePrefix(const AuditType type, const UID& auditId) {
const Key auditRangePrefixFor(const UID& auditId) {
BinaryWriter wr(Unversioned());
wr.serializeBytes(auditPrefix);
wr << static_cast<uint8_t>(type);
wr.serializeBytes("/"_sr);
wr << auditId;
wr.serializeBytes("/"_sr);
return wr.toValue();

View File

@ -41,21 +41,35 @@ enum class AuditType : uint8_t {
struct AuditStorageState {
constexpr static FileIdentifier file_identifier = 13804340;
AuditStorageState() = default;
AuditStorageState(UID id, AuditType type) : id(id), type(static_cast<uint8_t>(type)) {}
AuditStorageState() : type(0), phase(0) {}
AuditStorageState(UID id, AuditType type) : id(id), type(static_cast<uint8_t>(type)), phase(0) {}
AuditStorageState(UID id, KeyRange range, AuditType type)
: id(id), range(range), type(static_cast<uint8_t>(type)), phase(0) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, id, type, phase, error);
serializer(ar, id, range, type, phase, error);
}
void setType(AuditType type) { this->type = static_cast<uint8_t>(this->type); }
void setType(AuditType type) { this->type = static_cast<uint8_t>(type); }
AuditType getType() const { return static_cast<AuditType>(this->type); }
void setPhase(AuditPhase phase) { this->phase = static_cast<uint8_t>(phase); }
AuditPhase getPhase() const { return static_cast<AuditPhase>(this->phase); }
std::string toString() const {
std::string res = "AuditStorageState: [ID]: " + id.toString() +
"[Range]: " + Traceable<KeyRangeRef>::toString(range) + "[Type]: " + std::to_string(type) +
"[Phase]: " + std::to_string(phase);
if (!error.empty()) {
res += "[Error]: " + error;
}
return res;
}
UID id;
KeyRange range;
uint8_t type;
uint8_t phase;
std::string error;

View File

@ -18,17 +18,29 @@
* limitations under the License.
*/
#ifndef FDBCLIENT_AUDITUTILS_ACTOR_H
#if defined(NO_INTELLISENSE) && !defined(FDBCLIENT_AUDITUTILS_ACTOR_G_H)
#define FDBCLIENT_AUDITUTILS_ACTOR_G_H
#include "fdbclient/AuditUtils.actor.g.h"
#elif !defined(FDBCLIENT_AUDITUTILS_ACTOR_H)
#define FDBCLIENT_AUDITUTILS_ACTOR_H
#pragma once
#include "fdbclient/Audit.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbrpc/fdbrpc.h"
#include "flow/actorcompiler.h" // has to be last include
ACTOR Future<Void> persistAuditStorageState(Key key, AuditStorageState auditState);
ACTOR Future<UID> persistNewAuditState(Database cx, AuditStorageState auditState);
ACTOR Future<Void> persistAuditState(Database cx, AuditStorageState auditState);
ACTOR Future<AuditStorageState> getAuditState(Database cx, AuditType type, UID id);
ACTOR Future<std::vector<AuditStorageState>> getLatestAuditStates(Database cx, AuditType type, int num);
ACTOR Future<Void> persistAuditStateMap(Database cx, AuditStorageState auditState);
ACTOR Future<std::vector<AuditStorageState>> getAuditStateForRange(Database cx, UID id, KeyRange range);
StringRef auditTypeToString(const AuditType type);
#include "flow/unactorcompiler.h"
#endif

View File

@ -139,7 +139,10 @@ ACTOR Future<int> setDDMode(Database cx, int mode);
ACTOR Future<Void> forceRecovery(Reference<IClusterConnectionRecord> clusterFile, Standalone<StringRef> dcId);
// Start an audit on range of the specific type.
ACTOR Future<UID> auditStorage(Reference<IClusterConnectionRecord> clusterFile, KeyRange range, AuditType type);
ACTOR Future<UID> auditStorage(Reference<IClusterConnectionRecord> clusterFile,
KeyRange range,
AuditType type,
bool async = false);
ACTOR Future<Void> printHealthyZone(Database cx);
ACTOR Future<bool> clearHealthyZone(Database cx, bool printWarning = false, bool clearSSFailureZoneString = false);

View File

@ -92,10 +92,16 @@ void decodeKeyServersValue(RangeResult result,
UID& destID,
bool missingIsError = true);
extern const KeyRangeRef auditRange;
extern const KeyRangeRef auditKeys;
extern const KeyRef auditPrefix;
const Key auditRangeKey(const AuditType type, const UID& auditId, const KeyRef& key);
const Key auditRangePrefix(const AuditType type, const UID& auditId);
extern const KeyRangeRef auditRanges;
extern const KeyRef auditRangePrefix;
const Key auditKey(const AuditType type, const UID& auditId);
const KeyRange auditKeyRange(const AuditType type);
const Key auditRangeKey(const UID& auditId, const KeyRef& key);
const Key auditRangePrefixFor(const UID& auditId);
const Value auditStorageStateValue(const AuditStorageState& auditStorageState);
AuditStorageState decodeAuditStorageState(const ValueRef& value);

View File

@ -256,6 +256,8 @@ class DDTxnProcessorImpl {
numDataMoves = 0;
server_dc.clear();
result->allServers.clear();
result->dataMoveMap = KeyRangeMap<std::shared_ptr<DataMove>>(std::make_shared<DataMove>());
result->auditStates.clear();
tss_servers.clear();
team_cache.clear();
succeeded = false;
@ -343,6 +345,12 @@ class DDTxnProcessorImpl {
++numDataMoves;
}
RangeResult ads = wait(tr.getRange(auditKeys, CLIENT_KNOBS->TOO_MANY));
ASSERT(!ads.more && ads.size() < CLIENT_KNOBS->TOO_MANY);
for (int i = 0; i < ads.size(); ++i) {
result->auditStates.push_back(decodeAuditStorageState(ads[i].value));
}
succeeded = true;
break;

View File

@ -22,6 +22,7 @@
#include <string>
#include "fdbclient/Audit.h"
#include "fdbclient/AuditUtils.actor.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/FDBOptions.g.h"
#include "fdbclient/FDBTypes.h"
@ -32,26 +33,26 @@
#include "fdbclient/SystemData.h"
#include "fdbclient/Tenant.h"
#include "fdbrpc/Replication.h"
#include "fdbserver/DataDistribution.actor.h"
#include "fdbserver/DDSharedContext.h"
#include "fdbserver/DDTeamCollection.h"
#include "fdbserver/DataDistribution.actor.h"
#include "fdbserver/FDBExecHelper.actor.h"
#include "fdbserver/IKeyValueStore.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/QuietDatabase.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/TenantCache.h"
#include "fdbserver/TLogInterface.h"
#include "fdbserver/TenantCache.h"
#include "fdbserver/WaitFailure.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "flow/ActorCollection.h"
#include "flow/Arena.h"
#include "flow/BooleanParam.h"
#include "flow/genericactors.actor.h"
#include "flow/serialize.h"
#include "flow/Trace.h"
#include "flow/UnitTest.h"
#include "fdbserver/DDSharedContext.h"
#include "flow/actorcompiler.h" // This must be the last #include.
#include "flow/genericactors.actor.h"
#include "flow/serialize.h"
ShardSizeBounds ShardSizeBounds::shardSizeBoundsBeforeTrack() {
return ShardSizeBounds{
@ -304,6 +305,7 @@ public:
Promise<Void> initialized;
std::unordered_map<AuditType, std::vector<std::shared_ptr<DDAudit>>> audits;
Future<Void> auditInitialized;
Optional<Reference<TenantCache>> ddTenantCache;
@ -498,7 +500,7 @@ public:
for (; it != self->initData->dataMoveMap.ranges().end(); ++it) {
const DataMoveMetaData& meta = it.value()->meta;
if (meta.ranges.empty()) {
TraceEvent(SevWarn, "EmptyDataMoveRange", self->ddId).detail("DataMoveMetaData", meta.toString());
TraceEvent(SevInfo, "EmptyDataMoveRange", self->ddId).detail("DataMoveMetaData", meta.toString());
continue;
}
if (it.value()->isCancelled() || (it.value()->valid && !SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA)) {
@ -561,6 +563,19 @@ public:
}
};
ACTOR Future<Void> resumeAuditStorage(Reference<DataDistributor> self, AuditStorageState auditStates);
ACTOR Future<Void> auditStorage(Reference<DataDistributor> self, TriggerAuditRequest req);
ACTOR Future<Void> loadAndDispatchAuditRange(Reference<DataDistributor> self,
std::shared_ptr<DDAudit> audit,
KeyRange range);
ACTOR Future<Void> scheduleAuditForRange(Reference<DataDistributor> self,
std::shared_ptr<DDAudit> audit,
KeyRange range);
ACTOR Future<Void> doAuditOnStorageServer(Reference<DataDistributor> self,
std::shared_ptr<DDAudit> audit,
StorageServerInterface ssi,
AuditStorageRequest req);
// Periodically check and log the physicalShard status; clean up empty physicalShard;
ACTOR Future<Void> monitorPhysicalShardStatus(Reference<PhysicalShardCollection> self) {
ASSERT(SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA);
@ -603,6 +618,11 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
// When/If this assertion fails, Evan owes Ben a pat on the back for his foresight
ASSERT(self->configuration.storageTeamSize > 0);
for (const auto& auditState : self->initData->auditStates) {
TraceEvent("ResumingAuditStorage", self->ddId).detail("AuditID", auditState.id);
self->addActor.send(resumeAuditStorage(self, auditState));
}
state PromiseStream<Promise<int64_t>> getAverageShardBytes;
state PromiseStream<Promise<int>> getUnhealthyRelocationCount;
state PromiseStream<GetMetricsRequest> getShardMetrics;
@ -1355,55 +1375,160 @@ ACTOR Future<Void> ddGetMetrics(GetDataDistributorMetricsRequest req,
return Void();
}
ACTOR Future<Void> auditStorage(Reference<DataDistributor> self, TriggerAuditRequest req);
ACTOR Future<Void> scheduleAuditForRange(Reference<DataDistributor> self,
std::shared_ptr<DDAudit> audit,
KeyRange range);
ACTOR Future<Void> doAuditOnStorageServer(Reference<DataDistributor> self,
std::shared_ptr<DDAudit> audit,
StorageServerInterface ssi,
AuditStorageRequest req);
ACTOR Future<Void> auditStorage(Reference<DataDistributor> self, TriggerAuditRequest req) {
// TODO(heliu): Load running audit, and create one if no audit is running.
state std::shared_ptr<DDAudit> audit;
auto it = self->audits.find(req.getType());
if (it != self->audits.end() && !it->second.empty()) {
ASSERT_EQ(it->second.size(), 1);
auto& currentAudit = it->second.front();
if (currentAudit->range.contains(req.range)) {
audit = it->second.front();
} else {
req.reply.sendError(audit_storage_exceeded_request_limit());
return Void();
}
} else {
const UID auditId = deterministicRandom()->randomUniqueID();
audit = std::make_shared<DDAudit>(auditId, req.range, req.getType());
self->audits[req.getType()].push_back(audit);
audit->actors.add(scheduleAuditForRange(self, audit, req.range));
TraceEvent(SevDebug, "DDAuditStorageBegin", audit->id).detail("Range", req.range).detail("AuditType", req.type);
ACTOR Future<Void> resumeAuditStorage(Reference<DataDistributor> self, AuditStorageState auditState) {
if (auditState.getPhase() == AuditPhase::Complete) {
return Void();
}
if (req.async && !req.reply.isSet()) {
req.reply.send(audit->id);
}
state std::shared_ptr<DDAudit> audit =
std::make_shared<DDAudit>(auditState.id, auditState.range, auditState.getType());
self->audits[auditState.getType()].push_back(audit);
audit->actors.add(loadAndDispatchAuditRange(self, audit, auditState.range));
TraceEvent(SevDebug, "DDResumAuditStorageBegin", self->ddId)
.detail("AuditID", audit->id)
.detail("Range", auditState.range)
.detail("AuditType", auditState.type);
try {
wait(audit->actors.getResult());
TraceEvent(SevDebug, "DDAuditStorageEnd", audit->id).detail("Range", req.range).detail("AuditType", req.type);
// TODO(heliu): Set the audit result, and clear auditId.
if (!req.async && !req.reply.isSet()) {
TraceEvent(SevDebug, "DDAuditStorageReply", audit->id)
TraceEvent(SevDebug, "DDResumeAuditStorageEnd", self->ddId)
.detail("AuditID", audit->id)
.detail("Range", auditState.range)
.detail("AuditType", auditState.type);
auditState.setPhase(AuditPhase::Complete);
wait(persistAuditState(self->txnProcessor->context(), auditState));
} catch (Error& e) {
TraceEvent(SevInfo, "DDResumeAuditStorageOperationError", self->ddId)
.errorUnsuppressed(e)
.detail("AuditID", audit->id)
.detail("Range", auditState.range)
.detail("AuditType", auditState.type);
if (e.code() == error_code_audit_storage_error) {
auditState.setPhase(AuditPhase::Error);
wait(persistAuditState(self->txnProcessor->context(), auditState));
} else if (e.code() != error_code_actor_cancelled) {
wait(delay(30));
self->addActor.send(resumeAuditStorage(self, auditState));
}
}
return Void();
}
ACTOR Future<Void> auditStorage(Reference<DataDistributor> self, TriggerAuditRequest req) {
state std::shared_ptr<DDAudit> audit;
// TODO: store AuditStorageState in DDAudit.
state AuditStorageState auditState;
auditState.setType(req.getType());
try {
try {
auto it = self->audits.find(req.getType());
if (it != self->audits.end() && !it->second.empty()) {
for (auto& currentAudit : it->second) {
if (currentAudit->range.contains(req.range)) {
auditState.id = currentAudit->id;
auditState.range = currentAudit->range;
auditState.setPhase(AuditPhase::Running);
audit = currentAudit;
break;
}
}
if (audit == nullptr) {
req.reply.sendError(audit_storage_exceeded_request_limit());
return Void();
}
} else {
auditState.range = req.range;
auditState.setPhase(AuditPhase::Running);
UID auditId = wait(persistNewAuditState(self->txnProcessor->context(), auditState));
auditState.id = auditId;
audit = std::make_shared<DDAudit>(auditId, req.range, req.getType());
self->audits[req.getType()].push_back(audit);
audit->actors.add(loadAndDispatchAuditRange(self, audit, req.range));
// Simulate restarting.
if (g_network->isSimulated() && deterministicRandom()->coinflip()) {
// throw operation_failed();
}
}
ASSERT(audit != nullptr);
TraceEvent(SevInfo, "DDAuditStorageScheduled", self->ddId)
.detail("AuditID", audit->id)
.detail("Range", req.range)
.detail("AuditType", req.type);
req.reply.send(audit->id);
if (req.async && !req.reply.isSet()) {
req.reply.send(audit->id);
}
wait(audit->actors.getResult());
TraceEvent(SevInfo, "DDAuditStorageSuccess", self->ddId)
.detail("AuditID", audit->id)
.detail("Range", req.range)
.detail("AuditType", req.type);
auditState.setPhase(AuditPhase::Complete);
wait(persistAuditState(self->txnProcessor->context(), auditState));
if (!req.async && !req.reply.isSet()) {
req.reply.send(audit->id);
}
} catch (Error& e) {
TraceEvent(SevInfo, "DDAuditStorageError", self->ddId)
.errorUnsuppressed(e)
.detail("AuditID", (audit == nullptr ? UID() : audit->id))
.detail("Range", req.range)
.detail("AuditType", req.type);
state Error err(e);
if (e.code() == error_code_audit_storage_error) {
auditState.setPhase(AuditPhase::Error);
wait(persistAuditState(self->txnProcessor->context(), auditState));
}
throw err;
}
} catch (Error& e) {
TraceEvent(SevWarnAlways, "DDAuditStorageOperationError", audit->id)
.errorUnsuppressed(e)
.detail("Range", req.range)
.detail("AuditType", req.type);
if (!req.async && !req.reply.isSet()) {
if (e.code() == error_code_audit_storage_error) {
req.reply.sendError(e);
} else {
req.reply.sendError(audit_storage_failed());
}
}
if (e.code() == error_code_actor_cancelled) {
throw e;
}
}
return Void();
}
ACTOR Future<Void> loadAndDispatchAuditRange(Reference<DataDistributor> self,
std::shared_ptr<DDAudit> audit,
KeyRange range) {
TraceEvent(SevInfo, "DDLoadAndDispatchAuditRangeBegin", self->ddId)
.detail("AuditID", audit->id)
.detail("Range", range)
.detail("AuditType", audit->type);
state Key begin = range.begin;
state KeyRange currentRange = range;
while (begin < range.end) {
currentRange = KeyRangeRef(begin, range.end);
std::vector<AuditStorageState> auditStates =
wait(getAuditStateForRange(self->txnProcessor->context(), audit->id, currentRange));
ASSERT(!auditStates.empty());
begin = auditStates.back().range.end;
for (const auto& auditState : auditStates) {
const AuditPhase phase = auditState.getPhase();
audit->auditMap.insert(auditState.range, phase);
if (phase == AuditPhase::Complete) {
continue;
} else if (phase == AuditPhase::Error) {
throw audit_storage_error();
} else {
audit->actors.add(scheduleAuditForRange(self, audit, auditState.range));
}
}
wait(delay(0.1));
}
return Void();
@ -1412,28 +1537,15 @@ ACTOR Future<Void> auditStorage(Reference<DataDistributor> self, TriggerAuditReq
ACTOR Future<Void> scheduleAuditForRange(Reference<DataDistributor> self,
std::shared_ptr<DDAudit> audit,
KeyRange range) {
TraceEvent(SevDebug, "DDScheduleAuditForRangeBegin", audit->id)
TraceEvent(SevInfo, "DDScheduleAuditForRangeBegin", self->ddId)
.detail("AuditID", audit->id)
.detail("Range", range)
.detail("AuditType", audit->type);
// TODO(heliu): Load the audit map for `range`.
state Key begin = range.begin;
state KeyRange currentRange = range;
while (begin < range.end) {
currentRange = KeyRangeRef(begin, range.end);
// Find the first keyrange that hasn't been validated.
auto f = audit->auditMap.intersectingRanges(currentRange);
for (auto it = f.begin(); it != f.end(); ++it) {
if (it->value() != AuditPhase::Invalid && it->value() != AuditPhase::Failed) {
begin = it->range().end;
currentRange = KeyRangeRef(it->range().end, currentRange.end);
} else {
currentRange = KeyRangeRef(it->range().begin, it->range().end) & currentRange;
break;
}
}
try {
state std::vector<IDDTxnProcessor::DDRangeLocations> rangeLocations =
wait(self->txnProcessor->getSourceServerInterfacesForRange(currentRange));
@ -1456,12 +1568,11 @@ ACTOR Future<Void> scheduleAuditForRange(Reference<DataDistributor> self,
wait(delay(0.01));
}
} catch (Error& e) {
TraceEvent(SevWarnAlways, "DDScheduleAuditRangeError", audit->id)
TraceEvent(SevInfo, "DDScheduleAuditRangeError", self->ddId)
.errorUnsuppressed(e)
.detail("AuditID", audit->id)
.detail("Range", range);
if (e.code() == error_code_actor_cancelled) {
throw e;
}
throw e;
}
}
@ -1472,7 +1583,8 @@ ACTOR Future<Void> doAuditOnStorageServer(Reference<DataDistributor> self,
std::shared_ptr<DDAudit> audit,
StorageServerInterface ssi,
AuditStorageRequest req) {
TraceEvent(SevDebug, "DDDoAuditOnStorageServerBegin", req.id)
TraceEvent(SevDebug, "DDDoAuditOnStorageServerBegin", self->ddId)
.detail("AuditID", req.id)
.detail("Range", req.range)
.detail("AuditType", req.type)
.detail("StorageServer", ssi.toString())
@ -1485,21 +1597,24 @@ ACTOR Future<Void> doAuditOnStorageServer(Reference<DataDistributor> self,
if (vResult.isError()) {
throw vResult.getError();
}
TraceEvent e(vResult.get().error.empty() ? SevInfo : SevWarnAlways, "DDAuditStorageState", req.id);
e.detail("Range", req.range);
e.detail("StorageServer", ssi.toString());
if (!vResult.get().error.empty()) {
e.detail("ErrorMessage", vResult.get().error);
}
TraceEvent(SevDebug, "DDDoAuditOnStorageServerEnd", self->ddId)
.detail("AuditID", req.id)
.detail("Range", req.range)
.detail("AuditType", req.type)
.detail("StorageServer", ssi.toString())
.detail("TargetServers", describe(req.targetServers));
} catch (Error& e) {
TraceEvent(SevWarn, "DDDoAuditOnStorageServerError", req.id)
TraceEvent(SevInfo, "DDDoAuditOnStorageServerError", req.id)
.errorUnsuppressed(e)
.detail("AuditID", req.id)
.detail("Range", req.range)
.detail("StorageServer", ssi.toString())
.detail("TargetServers", describe(req.targetServers));
if (e.code() != error_code_actor_cancelled) {
if (e.code() == error_code_actor_cancelled || e.code() == error_code_audit_storage_error) {
throw e;
} else {
audit->auditMap.insert(req.range, AuditPhase::Failed);
audit->actors.add(scheduleAuditForRange(self, audit, req.range));
audit->actors.add(loadAndDispatchAuditRange(self, audit, req.range));
}
}

View File

@ -489,6 +489,7 @@ struct InitialDataDistribution : ReferenceCounted<InitialDataDistribution> {
std::vector<DDShardInfo> shards;
Optional<Key> initHealthyZoneValue; // set for maintenance mode
KeyRangeMap<std::shared_ptr<DataMove>> dataMoveMap;
std::vector<AuditStorageState> auditStates;
};
// Holds the permitted size and IO Bounds for a shard

View File

@ -48,6 +48,7 @@
#include "fdbclient/Tracing.h"
#include "flow/Util.h"
#include "fdbclient/Atomic.h"
#include "fdbclient/AuditUtils.actor.h"
#include "fdbclient/BlobConnectionProvider.h"
#include "fdbclient/BlobGranuleReader.actor.h"
#include "fdbclient/CommitProxyInterface.h"
@ -4446,14 +4447,17 @@ Key constructMappedKey(KeyValueRef* keyValue, std::vector<Optional<Tuple>>& vec,
}
ACTOR Future<Void> validateRangeAgainstServer(StorageServer* data,
KeyRange range,
AuditStorageState auditState,
Version version,
StorageServerInterface remoteServer) {
TraceEvent(SevInfo, "ValidateRangeAgainstServerBegin", data->thisServerID)
.detail("Range", range)
.detail("AuditID", auditState.id)
.detail("Range", auditState.range)
.detail("Version", version)
.detail("RemoteServer", remoteServer.toString());
state KeyRange range = auditState.range;
state Key originBegin = range.begin;
state int validatedKeys = 0;
state std::string error;
loop {
@ -4498,8 +4502,9 @@ ACTOR Future<Void> validateRangeAgainstServer(StorageServer* data,
}
}
GetKeyValuesReply remote = reps[0].get(), local = reps[1].get();
const GetKeyValuesReply &remote = reps[0].get(), local = reps[1].get();
Key lastKey = range.begin;
auditState.range = range;
const int end = std::min(local.data.size(), remote.data.size());
int i = 0;
@ -4507,7 +4512,7 @@ ACTOR Future<Void> validateRangeAgainstServer(StorageServer* data,
KeyValueRef remoteKV = remote.data[i];
KeyValueRef localKV = local.data[i];
if (!range.contains(remoteKV.key) || !range.contains(localKV.key)) {
TraceEvent(SevDebug, "SSValidateRangeKeyOutOfRange", data->thisServerID)
TraceEvent(SevWarn, "SSValidateRangeKeyOutOfRange", data->thisServerID)
.detail("Range", range)
.detail("RemoteServer", remoteServer.toString().c_str())
.detail("LocalKey", Traceable<StringRef>::toString(localKV.key).c_str())
@ -4556,9 +4561,14 @@ ACTOR Future<Void> validateRangeAgainstServer(StorageServer* data,
break;
}
range = KeyRangeRef(keyAfter(lastKey), range.end);
if (i > 0) {
range = KeyRangeRef(keyAfter(lastKey), range.end);
auditState.range = KeyRangeRef(originBegin, range.begin);
auditState.setPhase(AuditPhase::Complete);
wait(persistAuditStateMap(data->cx, auditState));
}
} catch (Error& e) {
TraceEvent(SevWarnAlways, "ValidateRangeAgainstServerError", data->thisServerID)
TraceEvent(SevWarn, "ValidateRangeAgainstServerError", data->thisServerID)
.errorUnsuppressed(e)
.detail("RemoteServer", remoteServer.toString())
.detail("Range", range)
@ -4573,9 +4583,12 @@ ACTOR Future<Void> validateRangeAgainstServer(StorageServer* data,
.detail("Version", version)
.detail("ErrorMessage", error)
.detail("RemoteServer", remoteServer.toString());
auditState.setPhase(AuditPhase::Error);
wait(persistAuditStateMap(data->cx, auditState));
throw audit_storage_error();
}
TraceEvent(SevDebug, "ValidateRangeAgainstServerEnd", data->thisServerID)
TraceEvent(SevInfo, "ValidateRangeAgainstServerEnd", data->thisServerID)
.detail("Range", range)
.detail("Version", version)
.detail("ValidatedKeys", validatedKeys)
@ -4584,9 +4597,9 @@ ACTOR Future<Void> validateRangeAgainstServer(StorageServer* data,
return Void();
}
ACTOR Future<Void> validateRangeShard(StorageServer* data, KeyRange range, std::vector<UID> candidates) {
ACTOR Future<Void> validateRangeShard(StorageServer* data, AuditStorageState auditState, std::vector<UID> candidates) {
TraceEvent(SevDebug, "ServeValidateRangeShardBegin", data->thisServerID)
.detail("Range", range)
.detail("Range", auditState.range)
.detail("Servers", describe(candidates));
state Version version;
@ -4627,7 +4640,7 @@ ACTOR Future<Void> validateRangeShard(StorageServer* data, KeyRange range, std::
if (ssis.size() < 2) {
TraceEvent(SevWarn, "ServeValidateRangeShardNotHAConfig", data->thisServerID)
.detail("Range", range)
.detail("Range", auditState.range)
.detail("Servers", describe(candidates));
return Void();
}
@ -4645,10 +4658,10 @@ ACTOR Future<Void> validateRangeShard(StorageServer* data, KeyRange range, std::
}
if (remoteServer != nullptr) {
wait(validateRangeAgainstServer(data, range, version, *remoteServer));
wait(validateRangeAgainstServer(data, auditState, version, *remoteServer));
} else {
TraceEvent(SevWarn, "ServeValidateRangeShardRemoteNotFound", data->thisServerID)
.detail("Range", range)
.detail("Range", auditState.range)
.detail("Servers", describe(candidates));
throw audit_storage_failed();
}
@ -4656,9 +4669,12 @@ ACTOR Future<Void> validateRangeShard(StorageServer* data, KeyRange range, std::
return Void();
}
ACTOR Future<Void> validateRangeAgainstServers(StorageServer* data, KeyRange range, std::vector<UID> targetServers) {
ACTOR Future<Void> validateRangeAgainstServers(StorageServer* data,
AuditStorageState auditState,
std::vector<UID> targetServers) {
TraceEvent(SevDebug, "ValidateRangeAgainstServersBegin", data->thisServerID)
.detail("Range", range)
.detail("AuditID", auditState.id)
.detail("Range", auditState.range)
.detail("TargetServers", describe(targetServers));
state Version version;
@ -4676,10 +4692,8 @@ ACTOR Future<Void> validateRangeAgainstServers(StorageServer* data, KeyRange ran
}
}
std::vector<Optional<Value>> serverListValues_ = wait(getAll(serverListEntries));
serverListValues = serverListValues_;
Version version_ = wait(tr.getReadVersion());
version = version_;
wait(store(serverListValues, getAll(serverListEntries)));
wait(store(version, tr.getReadVersion()));
break;
} catch (Error& e) {
wait(tr.onError(e));
@ -4689,10 +4703,12 @@ ACTOR Future<Void> validateRangeAgainstServers(StorageServer* data, KeyRange ran
std::vector<Future<Void>> fs;
for (const auto& v : serverListValues) {
if (!v.present()) {
TraceEvent(SevWarn, "ValidateRangeRemoteServerNotFound", data->thisServerID).detail("Range", range);
TraceEvent(SevWarn, "ValidateRangeRemoteServerNotFound", data->thisServerID)
.detail("AuditID", auditState.id)
.detail("Range", auditState.range);
throw audit_storage_failed();
}
fs.push_back(validateRangeAgainstServer(data, range, version, decodeServerListValue(v.get())));
fs.push_back(validateRangeAgainstServer(data, auditState, version, decodeServerListValue(v.get())));
}
wait(waitForAll(fs));
@ -4704,7 +4720,7 @@ ACTOR Future<Void> auditStorageQ(StorageServer* data, AuditStorageRequest req) {
wait(data->serveAuditStorageParallelismLock.take(TaskPriority::DefaultYield));
state FlowLock::Releaser holder(data->serveAuditStorageParallelismLock);
TraceEvent(SevInfo, "ServeAuditStorageBegin", data->thisServerID)
TraceEvent(SevInfo, "SSAuditStorageBegin", data->thisServerID)
.detail("RequestID", req.id)
.detail("Range", req.range)
.detail("AuditType", req.type)
@ -4712,6 +4728,7 @@ ACTOR Future<Void> auditStorageQ(StorageServer* data, AuditStorageRequest req) {
state Key begin = req.range.begin;
state std::vector<Future<Void>> fs;
state AuditStorageState res(req.id, req.range, req.getType());
try {
if (req.targetServers.empty()) {
@ -4735,7 +4752,10 @@ ACTOR Future<Void> auditStorageQ(StorageServer* data, AuditStorageRequest req) {
std::vector<UID> dest;
UID srcId, destId;
decodeKeyServersValue(UIDtoTagMap, shards[i].value, src, dest, srcId, destId);
fs.push_back(validateRangeShard(data, KeyRangeRef(shards[i].key, shards[i + 1].key), src));
fs.push_back(validateRangeShard(
data,
AuditStorageState(res.id, KeyRangeRef(shards[i].key, shards[i + 1].key), res.getType()),
src));
begin = shards[i + 1].key;
}
} catch (Error& e) {
@ -4743,19 +4763,27 @@ ACTOR Future<Void> auditStorageQ(StorageServer* data, AuditStorageRequest req) {
}
}
} else {
fs.push_back(validateRangeAgainstServers(data, req.range, req.targetServers));
fs.push_back(validateRangeAgainstServers(data, res, req.targetServers));
}
wait(waitForAll(fs));
AuditStorageState res(req.id, req.getType());
res.setPhase(AuditPhase::Complete);
req.reply.send(res);
} catch (Error& e) {
TraceEvent(SevWarn, "ServeAuditStorageError", data->thisServerID)
TraceEvent(SevInfo, "SSAuditStorageError", data->thisServerID)
.errorUnsuppressed(e)
.detail("RequestID", req.id)
.detail("Range", req.range)
.detail("AuditType", req.type);
req.reply.sendError(audit_storage_failed());
.detail("AuditType", req.type)
.detail("TargetServers", describe(req.targetServers));
if (e.code() != error_code_audit_storage_error) {
req.reply.sendError(audit_storage_failed());
} else {
req.reply.sendError(audit_storage_error());
}
if (e.code() == error_code_actor_cancelled) {
throw e;
}
}
return Void();

View File

@ -29,6 +29,8 @@
#include "flow/DeterministicRandom.h"
#include "fdbrpc/sim_validation.h"
#include "fdbrpc/simulator.h"
#include "fdbclient/Audit.h"
#include "fdbclient/AuditUtils.actor.h"
#include "fdbclient/ClusterInterface.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/SystemData.h"
@ -1081,6 +1083,44 @@ ACTOR Future<Void> changeConfiguration(Database cx, std::vector<TesterInterface>
return Void();
}
ACTOR Future<Void> auditStorageCorrectness(Reference<AsyncVar<ServerDBInfo>> dbInfo) {
state UID auditId;
TraceEvent("AuditStorageCorrectnessBegin");
loop {
try {
TriggerAuditRequest req(AuditType::ValidateHA, allKeys);
req.async = true;
UID auditId_ = wait(dbInfo->get().clusterInterface.clientInterface.triggerAudit.getReply(req));
auditId = auditId_;
break;
} catch (Error& e) {
TraceEvent(SevWarn, "StartAuditStorageError").errorUnsuppressed(e);
wait(delay(1));
}
}
state Database cx = openDBOnServer(dbInfo);
loop {
try {
AuditStorageState auditState = wait(getAuditState(cx, AuditType::ValidateHA, auditId));
if (auditState.getPhase() != AuditPhase::Complete) {
ASSERT(auditState.getPhase() == AuditPhase::Running);
wait(delay(30));
} else {
TraceEvent(SevInfo, "AuditStorageResult").detail("AuditStorageState", auditState.toString());
ASSERT(auditState.getPhase() == AuditPhase::Complete);
break;
}
} catch (Error& e) {
TraceEvent("WaitAuditStorageError").errorUnsuppressed(e).detail("AuditID", auditId);
wait(delay(1));
}
}
return Void();
}
// Runs the consistency check workload, which verifies that the database is in a consistent state
ACTOR Future<Void> checkConsistency(Database cx,
std::vector<TesterInterface> testers,

View File

@ -19,6 +19,7 @@
*/
#include "fdbclient/Audit.h"
#include "fdbclient/AuditUtils.actor.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbrpc/simulator.h"
@ -75,6 +76,7 @@ struct ValidateStorage : TestWorkload {
{ "TestKeyD"_sr, "TestValueD"_sr },
{ "TestKeyE"_sr, "TestValueE"_sr },
{ "TestKeyF"_sr, "TestValueF"_sr } });
state UID auditId;
Version _ = wait(self->populateData(self, cx, &kvs));
@ -85,11 +87,32 @@ struct ValidateStorage : TestWorkload {
loop {
try {
UID auditId = wait(auditStorage(cx->getConnectionRecord(), allKeys, AuditType::ValidateHA));
Optional<UID> auditId_ = wait(timeout(
auditStorage(cx->getConnectionRecord(), allKeys, AuditType::ValidateHA, /*async=*/true), 30));
if (!auditId_.present()) {
throw audit_storage_failed();
}
auditId = auditId_.get();
TraceEvent("TestValidateEnd").detail("AuditID", auditId);
break;
} catch (Error& e) {
TraceEvent("AuditStorageError").errorUnsuppressed(e);
TraceEvent(SevWarn, "StartAuditStorageError").errorUnsuppressed(e);
wait(delay(1));
}
}
loop {
try {
AuditStorageState auditState = wait(getAuditState(cx, AuditType::ValidateHA, auditId));
if (auditState.getPhase() != AuditPhase::Complete) {
ASSERT(auditState.getPhase() == AuditPhase::Running);
wait(delay(30));
} else {
ASSERT(auditState.getPhase() == AuditPhase::Complete);
break;
}
} catch (Error& e) {
TraceEvent("WaitAuditStorageError").errorUnsuppressed(e).detail("AuditID", auditId);
wait(delay(1));
}
}
@ -169,7 +192,7 @@ struct ValidateStorage : TestWorkload {
try {
wait(tr.onError(e));
} catch (Error& e) {
if (e.code() != error_code_audit_storage_failed) {
if (e.code() != error_code_audit_storage_failed && e.code() != error_code_broken_promise) {
throw e;
}
}

View File

@ -136,6 +136,7 @@ ERROR( audit_storage_exceeded_request_limit, 1222, "Exceeded the max number of a
ERROR( proxy_tag_throttled, 1223, "Exceeded maximum proxy tag throttling duration" )
ERROR( key_value_store_deadline_exceeded, 1224, "Exceeded maximum time allowed to read or write.")
ERROR( storage_quota_exceeded, 1225, "Exceeded the maximum storage quota allocated to the tenant.")
ERROR( audit_storage_error, 1226, "Found data corruption" )
// 15xx Platform errors
ERROR( platform_error, 1500, "Platform error" )

View File

@ -210,7 +210,7 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES fast/WriteDuringReadClean.toml)
add_fdb_test(TEST_FILES noSim/RandomUnitTests.toml UNIT)
if(WITH_ROCKSDB_EXPERIMENTAL)
add_fdb_test(TEST_FILES fast/ValidateStorage.toml IGNORE)
add_fdb_test(TEST_FILES fast/ValidateStorage.toml)
add_fdb_test(TEST_FILES noSim/KeyValueStoreRocksDBTest.toml UNIT)
add_fdb_test(TEST_FILES noSim/ShardedRocksDBTest.toml UNIT)
add_fdb_test(TEST_FILES fast/PhysicalShardMove.toml)

View File

@ -1,9 +1,8 @@
[configuration]
config = 'triple'
storageEngineType = 5
generateFearless = true
allowDefaultTenant = false
machineCount = 45
machineCount = 18
[[knobs]]
shard_encode_location_metadata = true