diff --git a/fdbcli/AuditStorageCommand.actor.cpp b/fdbcli/AuditStorageCommand.actor.cpp new file mode 100644 index 0000000000..2748c2e89e --- /dev/null +++ b/fdbcli/AuditStorageCommand.actor.cpp @@ -0,0 +1,72 @@ +/* + * AuditStorageCommand.actor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2023 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fdbcli/fdbcli.actor.h" + +#include "fdbclient/IClientApi.h" + +#include "fdbclient/ManagementAPI.actor.h" +#include "fdbclient/Audit.h" + +#include "flow/Arena.h" +#include "flow/FastRef.h" +#include "flow/ThreadHelper.actor.h" +#include "flow/actorcompiler.h" // This must be the last #include. + +namespace fdb_cli { + +ACTOR Future auditStorageCommandActor(Reference clusterFile, + std::vector tokens) { + if (tokens.size() < 2) { + printUsage(tokens[0]); + return UID(); + } + + AuditType type = AuditType::Invalid; + if (tokencmp(tokens[1], "ha")) { + type = AuditType::ValidateHA; + } else { + printUsage(tokens[0]); + return UID(); + } + + Key begin, end; + if (tokens.size() == 2) { + begin = allKeys.begin; + end = allKeys.end; + } else if (tokens.size() == 3) { + begin = tokens[2]; + } else if (tokens.size() == 4) { + begin = tokens[2]; + end = tokens[3]; + } else { + printUsage(tokens[0]); + return UID(); + } + + UID auditId = wait(auditStorage(clusterFile, KeyRangeRef(begin, end), type, true)); + return auditId; +} + +CommandFactory auditStorageFactory("audit_storage", + CommandHelp("audit_storage [BeginKey] [EndKey]", + "Start an audit storage", + "Trigger an audit storage, the auditID is returned.\n")); +} // namespace fdb_cli diff --git a/fdbcli/GetAuditStatusCommand.actor.cpp b/fdbcli/GetAuditStatusCommand.actor.cpp new file mode 100644 index 0000000000..f8ade0fdfb --- /dev/null +++ b/fdbcli/GetAuditStatusCommand.actor.cpp @@ -0,0 +1,67 @@ +/* + * GetAuditStatusCommand.actor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2023 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fdbcli/fdbcli.actor.h" +#include "fdbclient/Audit.h" +#include "fdbclient/AuditUtils.actor.h" +#include "fdbclient/IClientApi.h" +#include "flow/Arena.h" +#include "flow/FastRef.h" +#include "flow/ThreadHelper.actor.h" + +#include "flow/actorcompiler.h" // This must be the last #include. + +namespace fdb_cli { + +ACTOR Future getAuditStatusCommandActor(Database cx, std::vector tokens) { + if (tokens.size() != 4) { + printUsage(tokens[0]); + return false; + } + + AuditType type = AuditType::Invalid; + if (tokencmp(tokens[1], "ha")) { + type = AuditType::ValidateHA; + } else { + printUsage(tokens[0]); + return false; + } + + if (tokencmp(tokens[2], "id")) { + const UID id = UID::fromString(tokens[3].toString()); + AuditStorageState res = wait(getAuditState(cx, type, id)); + printf("Audit result is:\n%s", res.toString().c_str()); + } else if (tokencmp(tokens[2], "recent")) { + const int count = std::stoi(tokens[3].toString()); + std::vector res = wait(getLatestAuditStates(cx, type, count)); + for (const auto& it : res) { + printf("Audit result is:\n%s\n", it.toString().c_str()); + } + } + return true; +} + +CommandFactory getAuditStatusFactory( + "get_audit_status", + CommandHelp("get_audit_status [ARGs]", + "Retrieve audit storage results of the specific type", + "Fetch audit result with an ID: get_audit_status [Type] id [ID];\n" + "Fetch most recent audit results: get_audit_status [Type] recent [Count].\n")); +} // namespace fdb_cli diff --git a/fdbcli/fdbcli.actor.cpp b/fdbcli/fdbcli.actor.cpp index 369d1f8de0..6c29fa9e50 100644 --- a/fdbcli/fdbcli.actor.cpp +++ b/fdbcli/fdbcli.actor.cpp @@ -1635,6 +1635,24 @@ ACTOR Future cli(CLIOptions opt, LineNoise* plinenoise, Reference fileConfigureCommandActor(Reference db, std::string filePath, bool isNewDatabase, bool force); +// Trigger audit storage +ACTOR Future auditStorageCommandActor(Reference clusterFile, + std::vector tokens); +// Retrieve audit storage status +ACTOR Future getAuditStatusCommandActor(Database cx, std::vector tokens); // force_recovery_with_data_loss command ACTOR Future forceRecoveryWithDataLossCommandActor(Reference db, std::vector tokens); // include command diff --git a/fdbclient/AuditUtils.actor.cpp b/fdbclient/AuditUtils.actor.cpp new file mode 100644 index 0000000000..235859462a --- /dev/null +++ b/fdbclient/AuditUtils.actor.cpp @@ -0,0 +1,175 @@ +/* + * AuditUtils.actor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fdbclient/AuditUtils.actor.h" + +#include "fdbclient/Audit.h" +#include "fdbclient/FDBTypes.h" +#include "fdbclient/NativeAPI.actor.h" +#include "fdbclient/ClientKnobs.h" + +#include "flow/actorcompiler.h" // has to be last include + +ACTOR static Future> getLatestAuditStatesImpl(Transaction* tr, AuditType type, int num) { + state std::vector auditStates; + + loop { + auditStates.clear(); + try { + RangeResult res = wait(tr->getRange(auditKeyRange(type), num, Snapshot::False, Reverse::True)); + for (int i = 0; i < res.size(); ++i) { + auditStates.push_back(decodeAuditStorageState(res[i].value)); + } + break; + } catch (Error& e) { + wait(tr->onError(e)); + } + } + + return auditStates; +} + +ACTOR Future persistNewAuditState(Database cx, AuditStorageState auditState) { + ASSERT(!auditState.id.isValid()); + state Transaction tr(cx); + state UID auditId; + + loop { + try { + std::vector auditStates = wait(getLatestAuditStatesImpl(&tr, auditState.getType(), 1)); + uint64_t nextId = 1; + if (!auditStates.empty()) { + nextId = auditStates.front().id.first() + 1; + } + auditId = UID(nextId, 0LL); + auditState.id = auditId; + tr.set(auditKey(auditState.getType(), auditId), auditStorageStateValue(auditState)); + wait(tr.commit()); + TraceEvent("PersistedNewAuditState", auditId).detail("AuditKey", auditKey(auditState.getType(), auditId)); + break; + } catch (Error& e) { + wait(tr.onError(e)); + } + } + + return auditId; +} + +ACTOR Future persistAuditState(Database cx, AuditStorageState auditState) { + state Transaction tr(cx); + + loop { + try { + tr.set(auditKey(auditState.getType(), auditState.id), auditStorageStateValue(auditState)); + wait(tr.commit()); + break; + } catch (Error& e) { + wait(tr.onError(e)); + } + } + + return Void(); +} + +ACTOR Future getAuditState(Database cx, AuditType type, UID id) { + state Transaction tr(cx); + state Optional res; + + loop { + try { + Optional res_ = wait(tr.get(auditKey(type, id))); + res = res_; + TraceEvent("ReadAuditState", id).detail("AuditKey", auditKey(type, id)); + break; + } catch (Error& e) { + wait(tr.onError(e)); + } + } + + if (!res.present()) { + throw key_not_found(); + } + + return decodeAuditStorageState(res.get()); +} + +ACTOR Future> getLatestAuditStates(Database cx, AuditType type, int num) { + Transaction tr(cx); + std::vector auditStates = wait(getLatestAuditStatesImpl(&tr, type, num)); + return auditStates; +} + +ACTOR Future persistAuditStateMap(Database cx, AuditStorageState auditState) { + state Transaction tr(cx); + + loop { + try { + wait(krmSetRange( + &tr, auditRangePrefixFor(auditState.id), auditState.range, auditStorageStateValue(auditState))); + break; + } catch (Error& e) { + wait(tr.onError(e)); + } + } + + return Void(); +} + +ACTOR Future> getAuditStateForRange(Database cx, UID id, KeyRange range) { + state RangeResult auditStates; + state Transaction tr(cx); + + loop { + try { + RangeResult res_ = wait(krmGetRanges(&tr, + auditRangePrefixFor(id), + range, + CLIENT_KNOBS->KRM_GET_RANGE_LIMIT, + CLIENT_KNOBS->KRM_GET_RANGE_LIMIT_BYTES)); + auditStates = res_; + break; + } catch (Error& e) { + wait(tr.onError(e)); + } + } + + std::vector res; + for (int i = 0; i < auditStates.size() - 1; ++i) { + KeyRange currentRange = KeyRangeRef(auditStates[i].key, auditStates[i + 1].key); + AuditStorageState auditState; + if (!auditStates[i].value.empty()) { + AuditStorageState auditState = decodeAuditStorageState(auditStates[i].value); + } + auditState.range = currentRange; + res.push_back(auditState); + } + + return res; +} + +StringRef auditTypeToString(const AuditType type) { + switch (type) { + case AuditType::Invalid: + return "Invalid"_sr; + case AuditType::ValidateHA: + return "ValidateHA"_sr; + } + return "Invalid"_sr; +} \ No newline at end of file diff --git a/fdbclient/ManagementAPI.actor.cpp b/fdbclient/ManagementAPI.actor.cpp index 6d7bf5f3a0..b4aba8b999 100644 --- a/fdbclient/ManagementAPI.actor.cpp +++ b/fdbclient/ManagementAPI.actor.cpp @@ -2535,16 +2535,22 @@ ACTOR Future forceRecovery(Reference clusterFile } } -ACTOR Future auditStorage(Reference clusterFile, KeyRange range, AuditType type) { +ACTOR Future auditStorage(Reference clusterFile, + KeyRange range, + AuditType type, + bool async) { state Reference>> clusterInterface(new AsyncVar>); state Future leaderMon = monitorLeader(clusterFile, clusterInterface); + TraceEvent(SevDebug, "ManagementAPIAuditStorageBegin"); loop { while (!clusterInterface->get().present()) { wait(clusterInterface->onChange()); } - UID auditId = wait(clusterInterface->get().get().triggerAudit.getReply(TriggerAuditRequest(type, range))); + TriggerAuditRequest req(type, range); + req.async = async; + UID auditId = wait(clusterInterface->get().get().triggerAudit.getReply(req)); TraceEvent(SevDebug, "ManagementAPIAuditStorageEnd").detail("AuditID", auditId); return auditId; } diff --git a/fdbclient/SystemData.cpp b/fdbclient/SystemData.cpp index 1bc1374dce..c8a32faa02 100644 --- a/fdbclient/SystemData.cpp +++ b/fdbclient/SystemData.cpp @@ -283,25 +283,40 @@ const KeyRangeRef readConflictRangeKeysRange = const KeyRangeRef writeConflictRangeKeysRange = KeyRangeRef("\xff\xff/transaction/write_conflict_range/"_sr, "\xff\xff/transaction/write_conflict_range/\xff\xff"_sr); -const KeyRangeRef auditRange = KeyRangeRef("\xff/audit/"_sr, "\xff/audit0"_sr); -const KeyRef auditPrefix = auditRange.begin; +const KeyRangeRef auditKeys = KeyRangeRef("\xff/audits/"_sr, "\xff/audits0"_sr); +const KeyRef auditPrefix = auditKeys.begin; +const KeyRangeRef auditRanges = KeyRangeRef("\xff/auditRanges/"_sr, "\xff/auditRanges0"_sr); +const KeyRef auditRangePrefix = auditRanges.begin; -const Key auditRangeKey(const AuditType type, const UID& auditId, const KeyRef& key) { +const Key auditKey(const AuditType type, const UID& auditId) { BinaryWriter wr(Unversioned()); wr.serializeBytes(auditPrefix); wr << static_cast(type); wr.serializeBytes("/"_sr); + wr << bigEndian64(auditId.first()); + return wr.toValue(); +} + +const KeyRange auditKeyRange(const AuditType type) { + BinaryWriter wr(Unversioned()); + wr.serializeBytes(auditPrefix); + wr << static_cast(type); + wr.serializeBytes("/"_sr); + return prefixRange(wr.toValue()); +} + +const Key auditRangeKey(const UID& auditId, const KeyRef& key) { + BinaryWriter wr(Unversioned()); + wr.serializeBytes(auditRangePrefix); wr << auditId; wr.serializeBytes("/"_sr); wr.serializeBytes(key); return wr.toValue(); } -const Key auditRangePrefix(const AuditType type, const UID& auditId) { +const Key auditRangePrefixFor(const UID& auditId) { BinaryWriter wr(Unversioned()); wr.serializeBytes(auditPrefix); - wr << static_cast(type); - wr.serializeBytes("/"_sr); wr << auditId; wr.serializeBytes("/"_sr); return wr.toValue(); diff --git a/fdbclient/include/fdbclient/Audit.h b/fdbclient/include/fdbclient/Audit.h index f23e33da9e..c0d2bf20ad 100644 --- a/fdbclient/include/fdbclient/Audit.h +++ b/fdbclient/include/fdbclient/Audit.h @@ -41,21 +41,35 @@ enum class AuditType : uint8_t { struct AuditStorageState { constexpr static FileIdentifier file_identifier = 13804340; - AuditStorageState() = default; - AuditStorageState(UID id, AuditType type) : id(id), type(static_cast(type)) {} + AuditStorageState() : type(0), phase(0) {} + AuditStorageState(UID id, AuditType type) : id(id), type(static_cast(type)), phase(0) {} + AuditStorageState(UID id, KeyRange range, AuditType type) + : id(id), range(range), type(static_cast(type)), phase(0) {} template void serialize(Ar& ar) { - serializer(ar, id, type, phase, error); + serializer(ar, id, range, type, phase, error); } - void setType(AuditType type) { this->type = static_cast(this->type); } + void setType(AuditType type) { this->type = static_cast(type); } AuditType getType() const { return static_cast(this->type); } void setPhase(AuditPhase phase) { this->phase = static_cast(phase); } AuditPhase getPhase() const { return static_cast(this->phase); } + std::string toString() const { + std::string res = "AuditStorageState: [ID]: " + id.toString() + + "[Range]: " + Traceable::toString(range) + "[Type]: " + std::to_string(type) + + "[Phase]: " + std::to_string(phase); + if (!error.empty()) { + res += "[Error]: " + error; + } + + return res; + } + UID id; + KeyRange range; uint8_t type; uint8_t phase; std::string error; diff --git a/fdbclient/include/fdbclient/AuditUtils.actor.h b/fdbclient/include/fdbclient/AuditUtils.actor.h index 94daa2f5f8..cc255da848 100644 --- a/fdbclient/include/fdbclient/AuditUtils.actor.h +++ b/fdbclient/include/fdbclient/AuditUtils.actor.h @@ -18,17 +18,29 @@ * limitations under the License. */ -#ifndef FDBCLIENT_AUDITUTILS_ACTOR_H +#if defined(NO_INTELLISENSE) && !defined(FDBCLIENT_AUDITUTILS_ACTOR_G_H) +#define FDBCLIENT_AUDITUTILS_ACTOR_G_H +#include "fdbclient/AuditUtils.actor.g.h" +#elif !defined(FDBCLIENT_AUDITUTILS_ACTOR_H) #define FDBCLIENT_AUDITUTILS_ACTOR_H #pragma once #include "fdbclient/Audit.h" #include "fdbclient/FDBTypes.h" +#include "fdbclient/NativeAPI.actor.h" #include "fdbrpc/fdbrpc.h" #include "flow/actorcompiler.h" // has to be last include -ACTOR Future persistAuditStorageState(Key key, AuditStorageState auditState); +ACTOR Future persistNewAuditState(Database cx, AuditStorageState auditState); +ACTOR Future persistAuditState(Database cx, AuditStorageState auditState); +ACTOR Future getAuditState(Database cx, AuditType type, UID id); +ACTOR Future> getLatestAuditStates(Database cx, AuditType type, int num); + +ACTOR Future persistAuditStateMap(Database cx, AuditStorageState auditState); +ACTOR Future> getAuditStateForRange(Database cx, UID id, KeyRange range); + +StringRef auditTypeToString(const AuditType type); #include "flow/unactorcompiler.h" #endif diff --git a/fdbclient/include/fdbclient/ManagementAPI.actor.h b/fdbclient/include/fdbclient/ManagementAPI.actor.h index 42b8c15c82..7a149dff3b 100644 --- a/fdbclient/include/fdbclient/ManagementAPI.actor.h +++ b/fdbclient/include/fdbclient/ManagementAPI.actor.h @@ -139,7 +139,10 @@ ACTOR Future setDDMode(Database cx, int mode); ACTOR Future forceRecovery(Reference clusterFile, Standalone dcId); // Start an audit on range of the specific type. -ACTOR Future auditStorage(Reference clusterFile, KeyRange range, AuditType type); +ACTOR Future auditStorage(Reference clusterFile, + KeyRange range, + AuditType type, + bool async = false); ACTOR Future printHealthyZone(Database cx); ACTOR Future clearHealthyZone(Database cx, bool printWarning = false, bool clearSSFailureZoneString = false); diff --git a/fdbclient/include/fdbclient/SystemData.h b/fdbclient/include/fdbclient/SystemData.h index 2d2fa3b12b..766ca3af93 100644 --- a/fdbclient/include/fdbclient/SystemData.h +++ b/fdbclient/include/fdbclient/SystemData.h @@ -92,10 +92,16 @@ void decodeKeyServersValue(RangeResult result, UID& destID, bool missingIsError = true); -extern const KeyRangeRef auditRange; +extern const KeyRangeRef auditKeys; extern const KeyRef auditPrefix; -const Key auditRangeKey(const AuditType type, const UID& auditId, const KeyRef& key); -const Key auditRangePrefix(const AuditType type, const UID& auditId); +extern const KeyRangeRef auditRanges; +extern const KeyRef auditRangePrefix; + +const Key auditKey(const AuditType type, const UID& auditId); +const KeyRange auditKeyRange(const AuditType type); +const Key auditRangeKey(const UID& auditId, const KeyRef& key); +const Key auditRangePrefixFor(const UID& auditId); + const Value auditStorageStateValue(const AuditStorageState& auditStorageState); AuditStorageState decodeAuditStorageState(const ValueRef& value); diff --git a/fdbserver/DDTxnProcessor.actor.cpp b/fdbserver/DDTxnProcessor.actor.cpp index 404782717d..e47261118b 100644 --- a/fdbserver/DDTxnProcessor.actor.cpp +++ b/fdbserver/DDTxnProcessor.actor.cpp @@ -256,6 +256,8 @@ class DDTxnProcessorImpl { numDataMoves = 0; server_dc.clear(); result->allServers.clear(); + result->dataMoveMap = KeyRangeMap>(std::make_shared()); + result->auditStates.clear(); tss_servers.clear(); team_cache.clear(); succeeded = false; @@ -343,6 +345,12 @@ class DDTxnProcessorImpl { ++numDataMoves; } + RangeResult ads = wait(tr.getRange(auditKeys, CLIENT_KNOBS->TOO_MANY)); + ASSERT(!ads.more && ads.size() < CLIENT_KNOBS->TOO_MANY); + for (int i = 0; i < ads.size(); ++i) { + result->auditStates.push_back(decodeAuditStorageState(ads[i].value)); + } + succeeded = true; break; diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index c0731df4b8..11329f3a62 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -22,6 +22,7 @@ #include #include "fdbclient/Audit.h" +#include "fdbclient/AuditUtils.actor.h" #include "fdbclient/DatabaseContext.h" #include "fdbclient/FDBOptions.g.h" #include "fdbclient/FDBTypes.h" @@ -32,26 +33,26 @@ #include "fdbclient/SystemData.h" #include "fdbclient/Tenant.h" #include "fdbrpc/Replication.h" -#include "fdbserver/DataDistribution.actor.h" +#include "fdbserver/DDSharedContext.h" #include "fdbserver/DDTeamCollection.h" +#include "fdbserver/DataDistribution.actor.h" #include "fdbserver/FDBExecHelper.actor.h" #include "fdbserver/IKeyValueStore.h" #include "fdbserver/Knobs.h" #include "fdbserver/QuietDatabase.h" #include "fdbserver/ServerDBInfo.h" -#include "fdbserver/TenantCache.h" #include "fdbserver/TLogInterface.h" +#include "fdbserver/TenantCache.h" #include "fdbserver/WaitFailure.h" #include "fdbserver/workloads/workloads.actor.h" #include "flow/ActorCollection.h" #include "flow/Arena.h" #include "flow/BooleanParam.h" -#include "flow/genericactors.actor.h" -#include "flow/serialize.h" #include "flow/Trace.h" #include "flow/UnitTest.h" -#include "fdbserver/DDSharedContext.h" #include "flow/actorcompiler.h" // This must be the last #include. +#include "flow/genericactors.actor.h" +#include "flow/serialize.h" ShardSizeBounds ShardSizeBounds::shardSizeBoundsBeforeTrack() { return ShardSizeBounds{ @@ -304,6 +305,7 @@ public: Promise initialized; std::unordered_map>> audits; + Future auditInitialized; Optional> ddTenantCache; @@ -498,7 +500,7 @@ public: for (; it != self->initData->dataMoveMap.ranges().end(); ++it) { const DataMoveMetaData& meta = it.value()->meta; if (meta.ranges.empty()) { - TraceEvent(SevWarn, "EmptyDataMoveRange", self->ddId).detail("DataMoveMetaData", meta.toString()); + TraceEvent(SevInfo, "EmptyDataMoveRange", self->ddId).detail("DataMoveMetaData", meta.toString()); continue; } if (it.value()->isCancelled() || (it.value()->valid && !SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA)) { @@ -561,6 +563,19 @@ public: } }; +ACTOR Future resumeAuditStorage(Reference self, AuditStorageState auditStates); +ACTOR Future auditStorage(Reference self, TriggerAuditRequest req); +ACTOR Future loadAndDispatchAuditRange(Reference self, + std::shared_ptr audit, + KeyRange range); +ACTOR Future scheduleAuditForRange(Reference self, + std::shared_ptr audit, + KeyRange range); +ACTOR Future doAuditOnStorageServer(Reference self, + std::shared_ptr audit, + StorageServerInterface ssi, + AuditStorageRequest req); + // Periodically check and log the physicalShard status; clean up empty physicalShard; ACTOR Future monitorPhysicalShardStatus(Reference self) { ASSERT(SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA); @@ -603,6 +618,11 @@ ACTOR Future dataDistribution(Reference self, // When/If this assertion fails, Evan owes Ben a pat on the back for his foresight ASSERT(self->configuration.storageTeamSize > 0); + for (const auto& auditState : self->initData->auditStates) { + TraceEvent("ResumingAuditStorage", self->ddId).detail("AuditID", auditState.id); + self->addActor.send(resumeAuditStorage(self, auditState)); + } + state PromiseStream> getAverageShardBytes; state PromiseStream> getUnhealthyRelocationCount; state PromiseStream getShardMetrics; @@ -1355,55 +1375,160 @@ ACTOR Future ddGetMetrics(GetDataDistributorMetricsRequest req, return Void(); } -ACTOR Future auditStorage(Reference self, TriggerAuditRequest req); -ACTOR Future scheduleAuditForRange(Reference self, - std::shared_ptr audit, - KeyRange range); -ACTOR Future doAuditOnStorageServer(Reference self, - std::shared_ptr audit, - StorageServerInterface ssi, - AuditStorageRequest req); - -ACTOR Future auditStorage(Reference self, TriggerAuditRequest req) { - // TODO(heliu): Load running audit, and create one if no audit is running. - state std::shared_ptr audit; - auto it = self->audits.find(req.getType()); - if (it != self->audits.end() && !it->second.empty()) { - ASSERT_EQ(it->second.size(), 1); - auto& currentAudit = it->second.front(); - if (currentAudit->range.contains(req.range)) { - audit = it->second.front(); - } else { - req.reply.sendError(audit_storage_exceeded_request_limit()); - return Void(); - } - } else { - const UID auditId = deterministicRandom()->randomUniqueID(); - audit = std::make_shared(auditId, req.range, req.getType()); - self->audits[req.getType()].push_back(audit); - audit->actors.add(scheduleAuditForRange(self, audit, req.range)); - TraceEvent(SevDebug, "DDAuditStorageBegin", audit->id).detail("Range", req.range).detail("AuditType", req.type); +ACTOR Future resumeAuditStorage(Reference self, AuditStorageState auditState) { + if (auditState.getPhase() == AuditPhase::Complete) { + return Void(); } - if (req.async && !req.reply.isSet()) { - req.reply.send(audit->id); - } + state std::shared_ptr audit = + std::make_shared(auditState.id, auditState.range, auditState.getType()); + self->audits[auditState.getType()].push_back(audit); + audit->actors.add(loadAndDispatchAuditRange(self, audit, auditState.range)); + TraceEvent(SevDebug, "DDResumAuditStorageBegin", self->ddId) + .detail("AuditID", audit->id) + .detail("Range", auditState.range) + .detail("AuditType", auditState.type); try { wait(audit->actors.getResult()); - TraceEvent(SevDebug, "DDAuditStorageEnd", audit->id).detail("Range", req.range).detail("AuditType", req.type); - // TODO(heliu): Set the audit result, and clear auditId. - if (!req.async && !req.reply.isSet()) { - TraceEvent(SevDebug, "DDAuditStorageReply", audit->id) + TraceEvent(SevDebug, "DDResumeAuditStorageEnd", self->ddId) + .detail("AuditID", audit->id) + .detail("Range", auditState.range) + .detail("AuditType", auditState.type); + auditState.setPhase(AuditPhase::Complete); + wait(persistAuditState(self->txnProcessor->context(), auditState)); + } catch (Error& e) { + TraceEvent(SevInfo, "DDResumeAuditStorageOperationError", self->ddId) + .errorUnsuppressed(e) + .detail("AuditID", audit->id) + .detail("Range", auditState.range) + .detail("AuditType", auditState.type); + if (e.code() == error_code_audit_storage_error) { + auditState.setPhase(AuditPhase::Error); + wait(persistAuditState(self->txnProcessor->context(), auditState)); + } else if (e.code() != error_code_actor_cancelled) { + wait(delay(30)); + self->addActor.send(resumeAuditStorage(self, auditState)); + } + } + + return Void(); +} + +ACTOR Future auditStorage(Reference self, TriggerAuditRequest req) { + state std::shared_ptr audit; + // TODO: store AuditStorageState in DDAudit. + state AuditStorageState auditState; + auditState.setType(req.getType()); + + try { + try { + auto it = self->audits.find(req.getType()); + if (it != self->audits.end() && !it->second.empty()) { + for (auto& currentAudit : it->second) { + if (currentAudit->range.contains(req.range)) { + auditState.id = currentAudit->id; + auditState.range = currentAudit->range; + auditState.setPhase(AuditPhase::Running); + audit = currentAudit; + break; + } + } + if (audit == nullptr) { + req.reply.sendError(audit_storage_exceeded_request_limit()); + return Void(); + } + } else { + auditState.range = req.range; + auditState.setPhase(AuditPhase::Running); + UID auditId = wait(persistNewAuditState(self->txnProcessor->context(), auditState)); + auditState.id = auditId; + audit = std::make_shared(auditId, req.range, req.getType()); + self->audits[req.getType()].push_back(audit); + audit->actors.add(loadAndDispatchAuditRange(self, audit, req.range)); + // Simulate restarting. + if (g_network->isSimulated() && deterministicRandom()->coinflip()) { + // throw operation_failed(); + } + } + + ASSERT(audit != nullptr); + TraceEvent(SevInfo, "DDAuditStorageScheduled", self->ddId) + .detail("AuditID", audit->id) .detail("Range", req.range) .detail("AuditType", req.type); - req.reply.send(audit->id); + + if (req.async && !req.reply.isSet()) { + req.reply.send(audit->id); + } + + wait(audit->actors.getResult()); + TraceEvent(SevInfo, "DDAuditStorageSuccess", self->ddId) + .detail("AuditID", audit->id) + .detail("Range", req.range) + .detail("AuditType", req.type); + auditState.setPhase(AuditPhase::Complete); + wait(persistAuditState(self->txnProcessor->context(), auditState)); + if (!req.async && !req.reply.isSet()) { + req.reply.send(audit->id); + } + } catch (Error& e) { + TraceEvent(SevInfo, "DDAuditStorageError", self->ddId) + .errorUnsuppressed(e) + .detail("AuditID", (audit == nullptr ? UID() : audit->id)) + .detail("Range", req.range) + .detail("AuditType", req.type); + state Error err(e); + if (e.code() == error_code_audit_storage_error) { + auditState.setPhase(AuditPhase::Error); + wait(persistAuditState(self->txnProcessor->context(), auditState)); + } + throw err; } } catch (Error& e) { - TraceEvent(SevWarnAlways, "DDAuditStorageOperationError", audit->id) - .errorUnsuppressed(e) - .detail("Range", req.range) - .detail("AuditType", req.type); + if (!req.async && !req.reply.isSet()) { + if (e.code() == error_code_audit_storage_error) { + req.reply.sendError(e); + } else { + req.reply.sendError(audit_storage_failed()); + } + } + if (e.code() == error_code_actor_cancelled) { + throw e; + } + } + + return Void(); +} + +ACTOR Future loadAndDispatchAuditRange(Reference self, + std::shared_ptr audit, + KeyRange range) { + TraceEvent(SevInfo, "DDLoadAndDispatchAuditRangeBegin", self->ddId) + .detail("AuditID", audit->id) + .detail("Range", range) + .detail("AuditType", audit->type); + state Key begin = range.begin; + state KeyRange currentRange = range; + + while (begin < range.end) { + currentRange = KeyRangeRef(begin, range.end); + std::vector auditStates = + wait(getAuditStateForRange(self->txnProcessor->context(), audit->id, currentRange)); + ASSERT(!auditStates.empty()); + begin = auditStates.back().range.end; + for (const auto& auditState : auditStates) { + const AuditPhase phase = auditState.getPhase(); + audit->auditMap.insert(auditState.range, phase); + if (phase == AuditPhase::Complete) { + continue; + } else if (phase == AuditPhase::Error) { + throw audit_storage_error(); + } else { + audit->actors.add(scheduleAuditForRange(self, audit, auditState.range)); + } + } + wait(delay(0.1)); } return Void(); @@ -1412,28 +1537,15 @@ ACTOR Future auditStorage(Reference self, TriggerAuditReq ACTOR Future scheduleAuditForRange(Reference self, std::shared_ptr audit, KeyRange range) { - TraceEvent(SevDebug, "DDScheduleAuditForRangeBegin", audit->id) + TraceEvent(SevInfo, "DDScheduleAuditForRangeBegin", self->ddId) + .detail("AuditID", audit->id) .detail("Range", range) .detail("AuditType", audit->type); - // TODO(heliu): Load the audit map for `range`. state Key begin = range.begin; state KeyRange currentRange = range; while (begin < range.end) { currentRange = KeyRangeRef(begin, range.end); - - // Find the first keyrange that hasn't been validated. - auto f = audit->auditMap.intersectingRanges(currentRange); - for (auto it = f.begin(); it != f.end(); ++it) { - if (it->value() != AuditPhase::Invalid && it->value() != AuditPhase::Failed) { - begin = it->range().end; - currentRange = KeyRangeRef(it->range().end, currentRange.end); - } else { - currentRange = KeyRangeRef(it->range().begin, it->range().end) & currentRange; - break; - } - } - try { state std::vector rangeLocations = wait(self->txnProcessor->getSourceServerInterfacesForRange(currentRange)); @@ -1456,12 +1568,11 @@ ACTOR Future scheduleAuditForRange(Reference self, wait(delay(0.01)); } } catch (Error& e) { - TraceEvent(SevWarnAlways, "DDScheduleAuditRangeError", audit->id) + TraceEvent(SevInfo, "DDScheduleAuditRangeError", self->ddId) .errorUnsuppressed(e) + .detail("AuditID", audit->id) .detail("Range", range); - if (e.code() == error_code_actor_cancelled) { - throw e; - } + throw e; } } @@ -1472,7 +1583,8 @@ ACTOR Future doAuditOnStorageServer(Reference self, std::shared_ptr audit, StorageServerInterface ssi, AuditStorageRequest req) { - TraceEvent(SevDebug, "DDDoAuditOnStorageServerBegin", req.id) + TraceEvent(SevDebug, "DDDoAuditOnStorageServerBegin", self->ddId) + .detail("AuditID", req.id) .detail("Range", req.range) .detail("AuditType", req.type) .detail("StorageServer", ssi.toString()) @@ -1485,21 +1597,24 @@ ACTOR Future doAuditOnStorageServer(Reference self, if (vResult.isError()) { throw vResult.getError(); } - TraceEvent e(vResult.get().error.empty() ? SevInfo : SevWarnAlways, "DDAuditStorageState", req.id); - e.detail("Range", req.range); - e.detail("StorageServer", ssi.toString()); - if (!vResult.get().error.empty()) { - e.detail("ErrorMessage", vResult.get().error); - } + TraceEvent(SevDebug, "DDDoAuditOnStorageServerEnd", self->ddId) + .detail("AuditID", req.id) + .detail("Range", req.range) + .detail("AuditType", req.type) + .detail("StorageServer", ssi.toString()) + .detail("TargetServers", describe(req.targetServers)); } catch (Error& e) { - TraceEvent(SevWarn, "DDDoAuditOnStorageServerError", req.id) + TraceEvent(SevInfo, "DDDoAuditOnStorageServerError", req.id) .errorUnsuppressed(e) + .detail("AuditID", req.id) .detail("Range", req.range) .detail("StorageServer", ssi.toString()) .detail("TargetServers", describe(req.targetServers)); - if (e.code() != error_code_actor_cancelled) { + if (e.code() == error_code_actor_cancelled || e.code() == error_code_audit_storage_error) { + throw e; + } else { audit->auditMap.insert(req.range, AuditPhase::Failed); - audit->actors.add(scheduleAuditForRange(self, audit, req.range)); + audit->actors.add(loadAndDispatchAuditRange(self, audit, req.range)); } } diff --git a/fdbserver/include/fdbserver/DataDistribution.actor.h b/fdbserver/include/fdbserver/DataDistribution.actor.h index 05dc7d139d..b4f56c25af 100644 --- a/fdbserver/include/fdbserver/DataDistribution.actor.h +++ b/fdbserver/include/fdbserver/DataDistribution.actor.h @@ -489,6 +489,7 @@ struct InitialDataDistribution : ReferenceCounted { std::vector shards; Optional initHealthyZoneValue; // set for maintenance mode KeyRangeMap> dataMoveMap; + std::vector auditStates; }; // Holds the permitted size and IO Bounds for a shard diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index bbbbd7a0e7..9e287eb630 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -48,6 +48,7 @@ #include "fdbclient/Tracing.h" #include "flow/Util.h" #include "fdbclient/Atomic.h" +#include "fdbclient/AuditUtils.actor.h" #include "fdbclient/BlobConnectionProvider.h" #include "fdbclient/BlobGranuleReader.actor.h" #include "fdbclient/CommitProxyInterface.h" @@ -4446,14 +4447,17 @@ Key constructMappedKey(KeyValueRef* keyValue, std::vector>& vec, } ACTOR Future validateRangeAgainstServer(StorageServer* data, - KeyRange range, + AuditStorageState auditState, Version version, StorageServerInterface remoteServer) { TraceEvent(SevInfo, "ValidateRangeAgainstServerBegin", data->thisServerID) - .detail("Range", range) + .detail("AuditID", auditState.id) + .detail("Range", auditState.range) .detail("Version", version) .detail("RemoteServer", remoteServer.toString()); + state KeyRange range = auditState.range; + state Key originBegin = range.begin; state int validatedKeys = 0; state std::string error; loop { @@ -4498,8 +4502,9 @@ ACTOR Future validateRangeAgainstServer(StorageServer* data, } } - GetKeyValuesReply remote = reps[0].get(), local = reps[1].get(); + const GetKeyValuesReply &remote = reps[0].get(), local = reps[1].get(); Key lastKey = range.begin; + auditState.range = range; const int end = std::min(local.data.size(), remote.data.size()); int i = 0; @@ -4507,7 +4512,7 @@ ACTOR Future validateRangeAgainstServer(StorageServer* data, KeyValueRef remoteKV = remote.data[i]; KeyValueRef localKV = local.data[i]; if (!range.contains(remoteKV.key) || !range.contains(localKV.key)) { - TraceEvent(SevDebug, "SSValidateRangeKeyOutOfRange", data->thisServerID) + TraceEvent(SevWarn, "SSValidateRangeKeyOutOfRange", data->thisServerID) .detail("Range", range) .detail("RemoteServer", remoteServer.toString().c_str()) .detail("LocalKey", Traceable::toString(localKV.key).c_str()) @@ -4556,9 +4561,14 @@ ACTOR Future validateRangeAgainstServer(StorageServer* data, break; } - range = KeyRangeRef(keyAfter(lastKey), range.end); + if (i > 0) { + range = KeyRangeRef(keyAfter(lastKey), range.end); + auditState.range = KeyRangeRef(originBegin, range.begin); + auditState.setPhase(AuditPhase::Complete); + wait(persistAuditStateMap(data->cx, auditState)); + } } catch (Error& e) { - TraceEvent(SevWarnAlways, "ValidateRangeAgainstServerError", data->thisServerID) + TraceEvent(SevWarn, "ValidateRangeAgainstServerError", data->thisServerID) .errorUnsuppressed(e) .detail("RemoteServer", remoteServer.toString()) .detail("Range", range) @@ -4573,9 +4583,12 @@ ACTOR Future validateRangeAgainstServer(StorageServer* data, .detail("Version", version) .detail("ErrorMessage", error) .detail("RemoteServer", remoteServer.toString()); + auditState.setPhase(AuditPhase::Error); + wait(persistAuditStateMap(data->cx, auditState)); + throw audit_storage_error(); } - TraceEvent(SevDebug, "ValidateRangeAgainstServerEnd", data->thisServerID) + TraceEvent(SevInfo, "ValidateRangeAgainstServerEnd", data->thisServerID) .detail("Range", range) .detail("Version", version) .detail("ValidatedKeys", validatedKeys) @@ -4584,9 +4597,9 @@ ACTOR Future validateRangeAgainstServer(StorageServer* data, return Void(); } -ACTOR Future validateRangeShard(StorageServer* data, KeyRange range, std::vector candidates) { +ACTOR Future validateRangeShard(StorageServer* data, AuditStorageState auditState, std::vector candidates) { TraceEvent(SevDebug, "ServeValidateRangeShardBegin", data->thisServerID) - .detail("Range", range) + .detail("Range", auditState.range) .detail("Servers", describe(candidates)); state Version version; @@ -4627,7 +4640,7 @@ ACTOR Future validateRangeShard(StorageServer* data, KeyRange range, std:: if (ssis.size() < 2) { TraceEvent(SevWarn, "ServeValidateRangeShardNotHAConfig", data->thisServerID) - .detail("Range", range) + .detail("Range", auditState.range) .detail("Servers", describe(candidates)); return Void(); } @@ -4645,10 +4658,10 @@ ACTOR Future validateRangeShard(StorageServer* data, KeyRange range, std:: } if (remoteServer != nullptr) { - wait(validateRangeAgainstServer(data, range, version, *remoteServer)); + wait(validateRangeAgainstServer(data, auditState, version, *remoteServer)); } else { TraceEvent(SevWarn, "ServeValidateRangeShardRemoteNotFound", data->thisServerID) - .detail("Range", range) + .detail("Range", auditState.range) .detail("Servers", describe(candidates)); throw audit_storage_failed(); } @@ -4656,9 +4669,12 @@ ACTOR Future validateRangeShard(StorageServer* data, KeyRange range, std:: return Void(); } -ACTOR Future validateRangeAgainstServers(StorageServer* data, KeyRange range, std::vector targetServers) { +ACTOR Future validateRangeAgainstServers(StorageServer* data, + AuditStorageState auditState, + std::vector targetServers) { TraceEvent(SevDebug, "ValidateRangeAgainstServersBegin", data->thisServerID) - .detail("Range", range) + .detail("AuditID", auditState.id) + .detail("Range", auditState.range) .detail("TargetServers", describe(targetServers)); state Version version; @@ -4676,10 +4692,8 @@ ACTOR Future validateRangeAgainstServers(StorageServer* data, KeyRange ran } } - std::vector> serverListValues_ = wait(getAll(serverListEntries)); - serverListValues = serverListValues_; - Version version_ = wait(tr.getReadVersion()); - version = version_; + wait(store(serverListValues, getAll(serverListEntries))); + wait(store(version, tr.getReadVersion())); break; } catch (Error& e) { wait(tr.onError(e)); @@ -4689,10 +4703,12 @@ ACTOR Future validateRangeAgainstServers(StorageServer* data, KeyRange ran std::vector> fs; for (const auto& v : serverListValues) { if (!v.present()) { - TraceEvent(SevWarn, "ValidateRangeRemoteServerNotFound", data->thisServerID).detail("Range", range); + TraceEvent(SevWarn, "ValidateRangeRemoteServerNotFound", data->thisServerID) + .detail("AuditID", auditState.id) + .detail("Range", auditState.range); throw audit_storage_failed(); } - fs.push_back(validateRangeAgainstServer(data, range, version, decodeServerListValue(v.get()))); + fs.push_back(validateRangeAgainstServer(data, auditState, version, decodeServerListValue(v.get()))); } wait(waitForAll(fs)); @@ -4704,7 +4720,7 @@ ACTOR Future auditStorageQ(StorageServer* data, AuditStorageRequest req) { wait(data->serveAuditStorageParallelismLock.take(TaskPriority::DefaultYield)); state FlowLock::Releaser holder(data->serveAuditStorageParallelismLock); - TraceEvent(SevInfo, "ServeAuditStorageBegin", data->thisServerID) + TraceEvent(SevInfo, "SSAuditStorageBegin", data->thisServerID) .detail("RequestID", req.id) .detail("Range", req.range) .detail("AuditType", req.type) @@ -4712,6 +4728,7 @@ ACTOR Future auditStorageQ(StorageServer* data, AuditStorageRequest req) { state Key begin = req.range.begin; state std::vector> fs; + state AuditStorageState res(req.id, req.range, req.getType()); try { if (req.targetServers.empty()) { @@ -4735,7 +4752,10 @@ ACTOR Future auditStorageQ(StorageServer* data, AuditStorageRequest req) { std::vector dest; UID srcId, destId; decodeKeyServersValue(UIDtoTagMap, shards[i].value, src, dest, srcId, destId); - fs.push_back(validateRangeShard(data, KeyRangeRef(shards[i].key, shards[i + 1].key), src)); + fs.push_back(validateRangeShard( + data, + AuditStorageState(res.id, KeyRangeRef(shards[i].key, shards[i + 1].key), res.getType()), + src)); begin = shards[i + 1].key; } } catch (Error& e) { @@ -4743,19 +4763,27 @@ ACTOR Future auditStorageQ(StorageServer* data, AuditStorageRequest req) { } } } else { - fs.push_back(validateRangeAgainstServers(data, req.range, req.targetServers)); + fs.push_back(validateRangeAgainstServers(data, res, req.targetServers)); } + wait(waitForAll(fs)); - AuditStorageState res(req.id, req.getType()); res.setPhase(AuditPhase::Complete); req.reply.send(res); } catch (Error& e) { - TraceEvent(SevWarn, "ServeAuditStorageError", data->thisServerID) + TraceEvent(SevInfo, "SSAuditStorageError", data->thisServerID) .errorUnsuppressed(e) .detail("RequestID", req.id) .detail("Range", req.range) - .detail("AuditType", req.type); - req.reply.sendError(audit_storage_failed()); + .detail("AuditType", req.type) + .detail("TargetServers", describe(req.targetServers)); + if (e.code() != error_code_audit_storage_error) { + req.reply.sendError(audit_storage_failed()); + } else { + req.reply.sendError(audit_storage_error()); + } + if (e.code() == error_code_actor_cancelled) { + throw e; + } } return Void(); diff --git a/fdbserver/tester.actor.cpp b/fdbserver/tester.actor.cpp index 23df5447f8..a57498d536 100644 --- a/fdbserver/tester.actor.cpp +++ b/fdbserver/tester.actor.cpp @@ -29,6 +29,8 @@ #include "flow/DeterministicRandom.h" #include "fdbrpc/sim_validation.h" #include "fdbrpc/simulator.h" +#include "fdbclient/Audit.h" +#include "fdbclient/AuditUtils.actor.h" #include "fdbclient/ClusterInterface.h" #include "fdbclient/NativeAPI.actor.h" #include "fdbclient/SystemData.h" @@ -1081,6 +1083,44 @@ ACTOR Future changeConfiguration(Database cx, std::vector return Void(); } +ACTOR Future auditStorageCorrectness(Reference> dbInfo) { + state UID auditId; + TraceEvent("AuditStorageCorrectnessBegin"); + + loop { + try { + TriggerAuditRequest req(AuditType::ValidateHA, allKeys); + req.async = true; + UID auditId_ = wait(dbInfo->get().clusterInterface.clientInterface.triggerAudit.getReply(req)); + auditId = auditId_; + break; + } catch (Error& e) { + TraceEvent(SevWarn, "StartAuditStorageError").errorUnsuppressed(e); + wait(delay(1)); + } + } + + state Database cx = openDBOnServer(dbInfo); + loop { + try { + AuditStorageState auditState = wait(getAuditState(cx, AuditType::ValidateHA, auditId)); + if (auditState.getPhase() != AuditPhase::Complete) { + ASSERT(auditState.getPhase() == AuditPhase::Running); + wait(delay(30)); + } else { + TraceEvent(SevInfo, "AuditStorageResult").detail("AuditStorageState", auditState.toString()); + ASSERT(auditState.getPhase() == AuditPhase::Complete); + break; + } + } catch (Error& e) { + TraceEvent("WaitAuditStorageError").errorUnsuppressed(e).detail("AuditID", auditId); + wait(delay(1)); + } + } + + return Void(); +} + // Runs the consistency check workload, which verifies that the database is in a consistent state ACTOR Future checkConsistency(Database cx, std::vector testers, diff --git a/fdbserver/workloads/ValidateStorage.actor.cpp b/fdbserver/workloads/ValidateStorage.actor.cpp index 9f77973a1c..e48ae1a5ba 100644 --- a/fdbserver/workloads/ValidateStorage.actor.cpp +++ b/fdbserver/workloads/ValidateStorage.actor.cpp @@ -19,6 +19,7 @@ */ #include "fdbclient/Audit.h" +#include "fdbclient/AuditUtils.actor.h" #include "fdbclient/ManagementAPI.actor.h" #include "fdbclient/NativeAPI.actor.h" #include "fdbrpc/simulator.h" @@ -75,6 +76,7 @@ struct ValidateStorage : TestWorkload { { "TestKeyD"_sr, "TestValueD"_sr }, { "TestKeyE"_sr, "TestValueE"_sr }, { "TestKeyF"_sr, "TestValueF"_sr } }); + state UID auditId; Version _ = wait(self->populateData(self, cx, &kvs)); @@ -85,11 +87,32 @@ struct ValidateStorage : TestWorkload { loop { try { - UID auditId = wait(auditStorage(cx->getConnectionRecord(), allKeys, AuditType::ValidateHA)); + Optional auditId_ = wait(timeout( + auditStorage(cx->getConnectionRecord(), allKeys, AuditType::ValidateHA, /*async=*/true), 30)); + if (!auditId_.present()) { + throw audit_storage_failed(); + } + auditId = auditId_.get(); TraceEvent("TestValidateEnd").detail("AuditID", auditId); break; } catch (Error& e) { - TraceEvent("AuditStorageError").errorUnsuppressed(e); + TraceEvent(SevWarn, "StartAuditStorageError").errorUnsuppressed(e); + wait(delay(1)); + } + } + + loop { + try { + AuditStorageState auditState = wait(getAuditState(cx, AuditType::ValidateHA, auditId)); + if (auditState.getPhase() != AuditPhase::Complete) { + ASSERT(auditState.getPhase() == AuditPhase::Running); + wait(delay(30)); + } else { + ASSERT(auditState.getPhase() == AuditPhase::Complete); + break; + } + } catch (Error& e) { + TraceEvent("WaitAuditStorageError").errorUnsuppressed(e).detail("AuditID", auditId); wait(delay(1)); } } @@ -169,7 +192,7 @@ struct ValidateStorage : TestWorkload { try { wait(tr.onError(e)); } catch (Error& e) { - if (e.code() != error_code_audit_storage_failed) { + if (e.code() != error_code_audit_storage_failed && e.code() != error_code_broken_promise) { throw e; } } diff --git a/flow/include/flow/error_definitions.h b/flow/include/flow/error_definitions.h index 38d764dd35..f580c474a9 100755 --- a/flow/include/flow/error_definitions.h +++ b/flow/include/flow/error_definitions.h @@ -136,6 +136,7 @@ ERROR( audit_storage_exceeded_request_limit, 1222, "Exceeded the max number of a ERROR( proxy_tag_throttled, 1223, "Exceeded maximum proxy tag throttling duration" ) ERROR( key_value_store_deadline_exceeded, 1224, "Exceeded maximum time allowed to read or write.") ERROR( storage_quota_exceeded, 1225, "Exceeded the maximum storage quota allocated to the tenant.") +ERROR( audit_storage_error, 1226, "Found data corruption" ) // 15xx Platform errors ERROR( platform_error, 1500, "Platform error" ) diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index aff5ee132a..88ad3232cf 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -210,7 +210,7 @@ if(WITH_PYTHON) add_fdb_test(TEST_FILES fast/WriteDuringReadClean.toml) add_fdb_test(TEST_FILES noSim/RandomUnitTests.toml UNIT) if(WITH_ROCKSDB_EXPERIMENTAL) - add_fdb_test(TEST_FILES fast/ValidateStorage.toml IGNORE) + add_fdb_test(TEST_FILES fast/ValidateStorage.toml) add_fdb_test(TEST_FILES noSim/KeyValueStoreRocksDBTest.toml UNIT) add_fdb_test(TEST_FILES noSim/ShardedRocksDBTest.toml UNIT) add_fdb_test(TEST_FILES fast/PhysicalShardMove.toml) diff --git a/tests/fast/ValidateStorage.toml b/tests/fast/ValidateStorage.toml index fc9ae49ae1..ae3b373076 100644 --- a/tests/fast/ValidateStorage.toml +++ b/tests/fast/ValidateStorage.toml @@ -1,9 +1,8 @@ [configuration] config = 'triple' -storageEngineType = 5 generateFearless = true allowDefaultTenant = false -machineCount = 45 +machineCount = 18 [[knobs]] shard_encode_location_metadata = true