diff --git a/fdbclient/AuditUtils.actor.cpp b/fdbclient/AuditUtils.actor.cpp
index 64ff8ba2f3..38529f3688 100644
--- a/fdbclient/AuditUtils.actor.cpp
+++ b/fdbclient/AuditUtils.actor.cpp
@@ -871,7 +871,8 @@ ACTOR Future<std::vector<AuditStorageState>> initAuditMetadata(Database cx,
 			throw e;
 		}
 		if (retryCount > 50) {
-			TraceEvent(SevWarnAlways, "InitAuditMetadataExceedRetryMax", dataDistributorId).errorUnsuppressed(e);
+			TraceEvent(SevWarnAlways, "AuditUtilInitAuditMetadataExceedRetryMax", dataDistributorId)
+			    .errorUnsuppressed(e);
 			break;
 		}
 		try {
@@ -884,3 +885,257 @@ ACTOR Future<std::vector<AuditStorageState>> initAuditMetadata(Database cx,
 	}
 	return auditStatesToResume;
 }
+
+// Check that no pair of ranges in the (sorted) input overlaps with each other
+// This is not part of the consistency check of audit metadata
+// This is used for checking the validity of inputs to rangesSame()
+bool elementsAreExclusiveWithEachOther(std::vector<KeyRange> ranges) {
+	ASSERT(std::is_sorted(ranges.begin(), ranges.end(), KeyRangeRef::ArbitraryOrder()));
+	for (int i = 0; i < ranges.size() - 1; ++i) {
+		if (ranges[i].end > ranges[i + 1].begin) {
+			TraceEvent(SevError, "AuditUtilElementsAreNotExclusiveWithEachOther").detail("Ranges", describe(ranges));
+			return false;
+		}
+	}
+	return true;
+}
+
+// Check if any range is empty in the given list of ranges
+// This is not part of the consistency check of audit metadata
+// This is used for checking the validity of inputs to rangesSame()
+bool noEmptyRangeInRanges(std::vector<KeyRange> ranges) {
+	for (const auto& range : ranges) {
+		if (range.empty()) {
+			return false;
+		}
+	}
+	return true;
+}
+
+// Given a list of ranges, which may overlap with each other,
+// return a list of mutually exclusive ranges covering exactly
+// the same keys as the input list of ranges
+std::vector<KeyRange> coalesceRangeList(std::vector<KeyRange> ranges) {
+	std::sort(ranges.begin(), ranges.end(), [](KeyRange a, KeyRange b) { return a.begin < b.begin; });
+	std::vector<KeyRange> res;
+	for (const auto& range : ranges) {
+		if (res.empty()) {
+			res.push_back(range);
+			continue;
+		}
+		if (range.begin <= res.back().end) {
+			if (range.end > res.back().end) { // update res.back if current range extends the back range
+				KeyRange newBack = Standalone(KeyRangeRef(res.back().begin, range.end));
+				res.pop_back();
+				res.push_back(newBack);
+			}
+		} else {
+			res.push_back(range);
+		}
+	}
+	return res;
+}
+
+// Given two lists of ranges --- rangesA and rangesB --- check whether the two lists cover identical keys
+// If not, return the mismatched pair of ranges from rangesA and rangesB respectively
+Optional<std::pair<KeyRange, KeyRange>> rangesSame(std::vector<KeyRange> rangesA, std::vector<KeyRange> rangesB) {
+	if (g_network->isSimulated()) {
+		ASSERT(noEmptyRangeInRanges(rangesA));
+		ASSERT(noEmptyRangeInRanges(rangesB));
+	}
+	KeyRange emptyRange;
+	if (rangesA.empty() && rangesB.empty()) { // no mismatch
+		return Optional<std::pair<KeyRange, KeyRange>>();
+	} else if (rangesA.empty() && !rangesB.empty()) { // rangesA is empty while rangesB has a range
+		return std::make_pair(emptyRange, rangesB.front());
+	} else if (!rangesA.empty() && rangesB.empty()) { // rangesB is empty while rangesA has a range
+		return std::make_pair(rangesA.front(), emptyRange);
+	}
+	TraceEvent(SevVerbose, "AuditUtilRangesSameBeforeSort").detail("RangesA", rangesA).detail("RangesB", rangesB);
+	// sort in ascending order
+	std::sort(rangesA.begin(), rangesA.end(), [](KeyRange a, KeyRange b) { return a.begin < b.begin; });
+	std::sort(rangesB.begin(), rangesB.end(), [](KeyRange a, KeyRange b) { return a.begin < b.begin; });
+	TraceEvent(SevVerbose, "AuditUtilRangesSameAfterSort").detail("RangesA", rangesA).detail("RangesB", rangesB);
+	if (g_network->isSimulated()) {
+		ASSERT(elementsAreExclusiveWithEachOther(rangesA));
+		ASSERT(elementsAreExclusiveWithEachOther(rangesB));
+	}
+	if (rangesA.front().begin != rangesB.front().begin) { // rangeList heads mismatch
+		return std::make_pair(rangesA.front(), rangesB.front());
+	} else if (rangesA.back().end != rangesB.back().end) { // rangeList backs mismatch
+		return std::make_pair(rangesA.back(), rangesB.back());
+	}
+	int ia = 0;
+	int ib = 0;
+	KeyRangeRef rangeA = rangesA[0];
+	KeyRangeRef rangeB = rangesB[0];
+	KeyRange lastRangeA = Standalone(rangeA);
+	KeyRange lastRangeB = Standalone(rangeB);
+	while (true) {
+		if (rangeA.begin == rangeB.begin) {
+			if (rangeA.end == rangeB.end) {
+				if (rangeA.end == rangesA.back().end) {
+					break;
+				}
+				++ia;
+				++ib;
+				rangeA = rangesA[ia];
+				rangeB = rangesB[ib];
+				lastRangeA = Standalone(rangeA);
+				lastRangeB = Standalone(rangeB);
+			} else if (rangeA.end > rangeB.end) {
+				rangeA = KeyRangeRef(rangeB.end, rangeA.end);
+				++ib;
+				rangeB = rangesB[ib];
+				lastRangeB = Standalone(rangeB);
+			} else {
+				rangeB = KeyRangeRef(rangeA.end, rangeB.end);
+				++ia;
+				rangeA = rangesA[ia];
+				lastRangeA = Standalone(rangeA);
+			}
+		} else {
+			return std::make_pair(lastRangeA, lastRangeB);
+		}
+	}
+	return Optional<std::pair<KeyRange, KeyRange>>();
+}
+
+// Given an input server, get its owned ranges within the input range via the input transaction,
+// from the perspective of the ServerKeys system key space
+// Input: (1) SS id; (2) transaction tr; (3) within range
+// Return AuditGetServerKeysRes, including: (1) the complete range covered by a single range read;
+// (2) the version of the read; (3) the ranges owned by the input SS
+ACTOR Future<AuditGetServerKeysRes> getThisServerKeysFromServerKeys(UID serverID, Transaction* tr, KeyRange range) {
+	state RangeResult readResult;
+	state AuditGetServerKeysRes res;
+
+	try {
+		wait(store(readResult,
+		           krmGetRanges(tr,
+		                        serverKeysPrefixFor(serverID),
+		                        range,
+		                        CLIENT_KNOBS->KRM_GET_RANGE_LIMIT,
+		                        CLIENT_KNOBS->KRM_GET_RANGE_LIMIT_BYTES)));
+		Future<Version> grvF = tr->getReadVersion();
+		if (!grvF.isReady()) {
+			TraceEvent(SevWarnAlways, "AuditUtilReadServerKeysGRVError", serverID);
+			throw audit_storage_cancelled();
+		}
+		Version readAtVersion = grvF.get();
+
+		TraceEvent(SevVerbose, "AuditUtilGetThisServerKeysFromServerKeysReadDone", serverID)
+		    .detail("Range", range)
+		    .detail("Prefix", serverKeysPrefixFor(serverID))
+		    .detail("ResultSize", readResult.size())
+		    .detail("AuditServerID", serverID);
+
+		std::vector<KeyRange> ownRanges;
+		for (int i = 0; i < readResult.size() - 1; ++i) {
+			TraceEvent(SevVerbose, "AuditUtilGetThisServerKeysFromServerKeysAddToResult", serverID)
+			    .detail("ValueIsServerKeysFalse", readResult[i].value == serverKeysFalse)
+			    .detail("ServerHasKey", serverHasKey(readResult[i].value))
+			    .detail("Range", KeyRangeRef(readResult[i].key, readResult[i + 1].key))
+			    .detail("AuditServerID", serverID);
+			if (serverHasKey(readResult[i].value)) {
+				ownRanges.push_back(Standalone(KeyRangeRef(readResult[i].key, readResult[i + 1].key)));
+			}
+		}
+		const KeyRange completeRange = Standalone(KeyRangeRef(range.begin, readResult.back().key));
+		TraceEvent(SevVerbose, "AuditUtilGetThisServerKeysFromServerKeysEnd", serverID)
+		    .detail("AuditServerID", serverID)
+		    .detail("Range", range)
+		    .detail("Prefix", serverKeysPrefixFor(serverID))
+		    .detail("ReadAtVersion", readAtVersion)
+		    .detail("CompleteRange", completeRange)
+		    .detail("ResultSize", ownRanges.size());
+		res = AuditGetServerKeysRes(completeRange, readAtVersion, serverID, ownRanges, readResult.logicalSize());
+
+	} catch (Error& e) {
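+		// Log and rethrow: the caller owns the transaction, so retryable errors
+		// are handled by the caller's retry loop (e.g. via tr->onError())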
+ TraceEvent(SevDebug, "AuditUtilGetThisServerKeysError", serverID) + .errorUnsuppressed(e) + .detail("AduitServerID", serverID); + throw e; + } + + return res; +} + +// Given an input server, get ranges within the input range via the input transaction +// from the perspective of KeyServers system key space +// Input: (1) Audit Server ID (for logging); (2) transaction tr; (3) within range +// Return AuditGetKeyServersRes, including : (1) complete range by a single read range; (2) verison of the read; +// (3) map between SSes and their ranges --- in KeyServers space, a range corresponds to multiple SSes +ACTOR Future getShardMapFromKeyServers(UID auditServerId, Transaction* tr, KeyRange range) { + state AuditGetKeyServersRes res; + state std::vector> actors; + state RangeResult readResult; + state RangeResult UIDtoTagMap; + state int64_t totalShardsCount = 0; + state int64_t shardsInAnonymousPhysicalShardCount = 0; + + try { + // read + actors.push_back(store(readResult, + krmGetRanges(tr, + keyServersPrefix, + range, + CLIENT_KNOBS->KRM_GET_RANGE_LIMIT, + CLIENT_KNOBS->KRM_GET_RANGE_LIMIT_BYTES))); + actors.push_back(store(UIDtoTagMap, tr->getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY))); + wait(waitForAll(actors)); + if (UIDtoTagMap.more || UIDtoTagMap.size() >= CLIENT_KNOBS->TOO_MANY) { + TraceEvent(g_network->isSimulated() ? SevError : SevWarnAlways, + "AuditUtilReadKeyServersReadTagError", + auditServerId); + throw audit_storage_cancelled(); + } + Future grvF = tr->getReadVersion(); + if (!grvF.isReady()) { + TraceEvent(SevWarnAlways, "AuditUtilReadKeyServersGRVError", auditServerId); + throw audit_storage_cancelled(); + } + Version readAtVersion = grvF.get(); + + TraceEvent(SevVerbose, "AuditUtilGetThisServerKeysFromKeyServersReadDone", auditServerId) + .detail("Range", range) + .detail("ResultSize", readResult.size()) + .detail("AduitServerID", auditServerId); + + // produce result + std::unordered_map> serverOwnRanges; + for (int i = 0; i < readResult.size() - 1; ++i) { + std::vector src; + std::vector dest; + UID srcID; + UID destID; + decodeKeyServersValue(UIDtoTagMap, readResult[i].value, src, dest, srcID, destID); + if (srcID == anonymousShardId) { + shardsInAnonymousPhysicalShardCount++; + } + totalShardsCount++; + std::vector servers(src.size() + dest.size()); + std::merge(src.begin(), src.end(), dest.begin(), dest.end(), servers.begin()); + for (auto& ssid : servers) { + serverOwnRanges[ssid].push_back(Standalone(KeyRangeRef(readResult[i].key, readResult[i + 1].key))); + } + } + const KeyRange completeRange = Standalone(KeyRangeRef(range.begin, readResult.back().key)); + TraceEvent(SevInfo, "AuditUtilGetThisServerKeysFromKeyServersEnd", auditServerId) + .detail("Range", range) + .detail("CompleteRange", completeRange) + .detail("AtVersion", readAtVersion) + .detail("ShardsInAnonymousPhysicalShardCount", shardsInAnonymousPhysicalShardCount) + .detail("TotalShardsCount", totalShardsCount); + res = AuditGetKeyServersRes(completeRange, readAtVersion, serverOwnRanges, readResult.logicalSize()); + + } catch (Error& e) { + TraceEvent(SevDebug, "AuditUtilGetThisServerKeysFromKeyServersError", auditServerId) + .errorUnsuppressed(e) + .detail("AuditServerId", auditServerId); + throw e; + } + + return res; +} diff --git a/fdbclient/include/fdbclient/AuditUtils.actor.h b/fdbclient/include/fdbclient/AuditUtils.actor.h index c7e17a47cd..4b2addb214 100644 --- a/fdbclient/include/fdbclient/AuditUtils.actor.h +++ b/fdbclient/include/fdbclient/AuditUtils.actor.h @@ -82,5 +82,41 @@ ACTOR 
 Future<std::vector<AuditStorageState>> initAuditMetadata(Database cx,
                                                          bool ddEnabled,
                                                          UID dataDistributorId,
                                                          int persistFinishAuditCount);
+
+struct AuditGetServerKeysRes {
+	KeyRange completeRange;
+	Version readAtVersion;
+	UID serverId;
+	std::vector<KeyRange> ownRanges;
+	int64_t readBytes;
+	AuditGetServerKeysRes() = default;
+	AuditGetServerKeysRes(KeyRange completeRange,
+	                      Version readAtVersion,
+	                      UID serverId,
+	                      std::vector<KeyRange> ownRanges,
+	                      int64_t readBytes)
+	  : completeRange(completeRange), readAtVersion(readAtVersion), serverId(serverId), ownRanges(ownRanges),
+	    readBytes(readBytes) {}
+};
+
+struct AuditGetKeyServersRes {
+	KeyRange completeRange;
+	Version readAtVersion;
+	int64_t readBytes;
+	std::unordered_map<UID, std::vector<KeyRange>> rangeOwnershipMap;
+	AuditGetKeyServersRes() = default;
+	AuditGetKeyServersRes(KeyRange completeRange,
+	                      Version readAtVersion,
+	                      std::unordered_map<UID, std::vector<KeyRange>> rangeOwnershipMap,
+	                      int64_t readBytes)
+	  : completeRange(completeRange), readAtVersion(readAtVersion), rangeOwnershipMap(rangeOwnershipMap),
+	    readBytes(readBytes) {}
+};
+
+std::vector<KeyRange> coalesceRangeList(std::vector<KeyRange> ranges);
+Optional<std::pair<KeyRange, KeyRange>> rangesSame(std::vector<KeyRange> rangesA, std::vector<KeyRange> rangesB);
+ACTOR Future<AuditGetServerKeysRes> getThisServerKeysFromServerKeys(UID serverID, Transaction* tr, KeyRange range);
+ACTOR Future<AuditGetKeyServersRes> getShardMapFromKeyServers(UID auditServerId, Transaction* tr, KeyRange range);
+
 #include "flow/unactorcompiler.h"
 #endif
diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp
index 19d85fbc2b..90ee9daa97 100644
--- a/fdbserver/DataDistribution.actor.cpp
+++ b/fdbserver/DataDistribution.actor.cpp
@@ -330,17 +330,23 @@ ACTOR Future<Void> auditStorageCore(Reference<DataDistributor> self,
                                     UID auditID,
                                     AuditType auditType,
                                     int currentRetryCount);
-ACTOR Future<UID> launchAudit(Reference<DataDistributor> self, KeyRange auditRange, AuditType auditType);
+ACTOR Future<UID> launchAudit(Reference<DataDistributor> self,
+                              KeyRange auditRange,
+                              AuditType auditType,
+                              KeyValueStoreType auditStorageEngineType);
 ACTOR Future<Void> auditStorage(Reference<DataDistributor> self, TriggerAuditRequest req);
 void loadAndDispatchAudit(Reference<DataDistributor> self, std::shared_ptr<DDAudit> audit);
 ACTOR Future<Void> dispatchAuditStorageServerShard(Reference<DataDistributor> self, std::shared_ptr<DDAudit> audit);
 ACTOR Future<Void> scheduleAuditStorageShardOnServer(Reference<DataDistributor> self,
                                                      std::shared_ptr<DDAudit> audit,
                                                      StorageServerInterface ssi);
-ACTOR Future<Void> scheduleAuditLocationMetadata(Reference<DataDistributor> self,
+ACTOR Future<Void> dispatchAuditStorage(Reference<DataDistributor> self, std::shared_ptr<DDAudit> audit);
+ACTOR Future<Void> dispatchAuditLocationMetadata(Reference<DataDistributor> self,
                                                  std::shared_ptr<DDAudit> audit,
                                                  KeyRange range);
-ACTOR Future<Void> dispatchAuditStorage(Reference<DataDistributor> self, std::shared_ptr<DDAudit> audit);
+ACTOR Future<Void> doAuditLocationMetadata(Reference<DataDistributor> self,
+                                           std::shared_ptr<DDAudit> audit,
+                                           KeyRange auditRange);
 ACTOR Future<Void> scheduleAuditOnRange(Reference<DataDistributor> self,
                                         std::shared_ptr<DDAudit> audit,
                                         KeyRange range);
@@ -2327,7 +2333,7 @@ void loadAndDispatchAudit(Reference<DataDistributor> self, std::shared_ptr<DDAudit> audit) {
 	} else if (audit->coreState.getType() == AuditType::ValidateReplica) {
 		audit->actors.add(dispatchAuditStorage(self, audit));
 	} else if (audit->coreState.getType() == AuditType::ValidateLocationMetadata) {
-		audit->actors.add(scheduleAuditLocationMetadata(self, audit, allKeys));
+		audit->actors.add(dispatchAuditLocationMetadata(self, audit, allKeys));
 	} else if (audit->coreState.getType() == AuditType::ValidateStorageServerShard) {
 		audit->actors.add(dispatchAuditStorageServerShard(self, audit));
 	} else {
@@ -2338,20 +2344,18 @@ void loadAndDispatchAudit(Reference<DataDistributor> self, std::shared_ptr<DDAudit> audit) {
 	return;
 }
 
-ACTOR Future<Void> scheduleAuditLocationMetadata(Reference<DataDistributor> self,
+ACTOR Future<Void> dispatchAuditLocationMetadata(Reference<DataDistributor> self,
                                                  std::shared_ptr<DDAudit> audit,
                                                  KeyRange range) {
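+	// Walk the audit range in slices recorded in the audit metadata, skip the
+	// subranges already marked Complete, and spawn one doAuditLocationMetadata
+	// task per remaining subrange, bounded by remainingBudgetForAuditTasks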
 	state const AuditType auditType = audit->coreState.getType();
 	ASSERT(auditType == AuditType::ValidateLocationMetadata);
-	TraceEvent(SevInfo, "DDScheduleAuditLocationMetadataBegin", self->ddId)
+	TraceEvent(SevInfo, "DDDispatchAuditLocationMetadataBegin", self->ddId)
 	    .detail("AuditID", audit->coreState.id)
 	    .detail("AuditType", auditType);
 	state Key begin = range.begin;
 	state KeyRange currentRange = range;
 	state std::vector<AuditStorageState> auditStates;
 	state int64_t issueDoAuditCount = 0;
-	state AuditStorageRequest req;
-	state std::vector<StorageServerInterface> targetCandidates;
 
 	try {
 		while (begin < range.end) {
@@ -2361,7 +2365,7 @@ ACTOR Future<Void> scheduleAuditLocationMetadata(Reference<DataDistributor> self,
 			           getAuditStateByRange(self->txnProcessor->context(), auditType, audit->coreState.id, currentRange)));
 			ASSERT(!auditStates.empty());
 			begin = auditStates.back().range.end;
-			TraceEvent(SevInfo, "DDScheduleAuditLocationMetadataDispatch", self->ddId)
+			TraceEvent(SevInfo, "DDDispatchAuditLocationMetadataDispatch", self->ddId)
 			    .detail("AuditID", audit->coreState.id)
 			    .detail("CurrentRange", currentRange)
 			    .detail("AuditType", auditType)
@@ -2386,23 +2390,17 @@ ACTOR Future<Void> scheduleAuditLocationMetadata(Reference<DataDistributor> self,
 				audit->remainingBudgetForAuditTasks.set(audit->remainingBudgetForAuditTasks.get() - 1);
 				ASSERT(audit->remainingBudgetForAuditTasks.get() >= 0);
 				TraceEvent(SevDebug, "RemainingBudgetForAuditTasks")
-				    .detail("Loc", "scheduleAuditLocationMetadata")
+				    .detail("Loc", "dispatchAuditLocationMetadata")
 				    .detail("Ops", "Decrease")
 				    .detail("Val", audit->remainingBudgetForAuditTasks.get())
 				    .detail("AuditType", auditType);
-
-				req = AuditStorageRequest(audit->coreState.id, auditStates[i].range, auditType);
 				issueDoAuditCount++;
-				req.ddId = self->ddId; // send this ddid to SS
-				targetCandidates.clear();
-				wait(store(targetCandidates, getStorageServers(self->txnProcessor->context())));
-				StorageServerInterface targetServer = deterministicRandom()->randomChoice(targetCandidates);
-				audit->actors.add(doAuditOnStorageServer(self, audit, targetServer, req));
+				audit->actors.add(doAuditLocationMetadata(self, audit, auditStates[i].range));
 			}
 		}
 		wait(delay(0.1));
 	}
-	TraceEvent(SevInfo, "DDScheduleAuditLocationMetadataEnd", self->ddId)
+	TraceEvent(SevInfo, "DDDispatchAuditLocationMetadataEnd", self->ddId)
 	    .detail("AuditID", audit->coreState.id)
 	    .detail("AuditType", auditType)
 	    .detail("IssuedDoAuditCount", issueDoAuditCount);
@@ -2411,7 +2409,7 @@ ACTOR Future<Void> scheduleAuditLocationMetadata(Reference<DataDistributor> self,
 	if (e.code() == error_code_actor_cancelled) {
 		throw e;
 	}
-	TraceEvent(SevWarn, "DDScheduleAuditLocationMetadataError", self->ddId)
+	TraceEvent(SevWarn, "DDDispatchAuditLocationMetadataError", self->ddId)
 	    .errorUnsuppressed(e)
 	    .detail("AuditID", audit->coreState.id)
 	    .detail("AuditType", auditType);
@@ -2767,7 +2765,7 @@ ACTOR Future<Void> scheduleAuditOnRange(Reference<DataDistributor> self,
 		if (auditType == AuditType::ValidateHA) {
 			if (rangeLocations[rangeLocationIndex].servers.size() < 2) {
 				TraceEvent(SevInfo, "DDScheduleAuditOnRangeEnd", self->ddId)
-				    .detail("Reason", "Single replica, ignore")
+				    .detail("Reason", "Single DC, ignore")
 				    .detail("AuditID", audit->coreState.id)
 				    .detail("AuditRange", audit->coreState.range)
 				    .detail("AuditType", auditType);
@@ -2977,6 +2975,8 @@ ACTOR Future<Void> doAuditOnStorageServer(Reference<DataDistributor> self,
                                           std::shared_ptr<DDAudit> audit,
                                           StorageServerInterface ssi,
                                           AuditStorageRequest req) {
 	state AuditType auditType = req.getType();
+	ASSERT(auditType == AuditType::ValidateHA || auditType == AuditType::ValidateReplica ||
+	       auditType == AuditType::ValidateStorageServerShard);
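+	// ValidateLocationMetadata is no longer dispatched to storage servers: it is
+	// executed by the data distributor itself (see doAuditLocationMetadata), and
+	// storageServerCore now rejects such requests as invalid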
TraceEvent(SevInfo, "DDDoAuditOnStorageServerBegin", self->ddId) .detail("AuditID", req.id) .detail("Range", req.range) @@ -3049,16 +3049,286 @@ ACTOR Future doAuditOnStorageServer(Reference self, } else { ASSERT(req.getType() != AuditType::ValidateStorageServerShard); audit->retryCount++; - if (req.getType() == AuditType::ValidateLocationMetadata) { - audit->actors.add(scheduleAuditLocationMetadata(self, audit, req.range)); - } else { - audit->actors.add(scheduleAuditOnRange(self, audit, req.range)); - } + audit->actors.add(scheduleAuditOnRange(self, audit, req.range)); } } return Void(); } +// Check consistency between KeyServers and ServerKeys system key space +ACTOR Future doAuditLocationMetadata(Reference self, + std::shared_ptr audit, + KeyRange auditRange) { + ASSERT(audit->coreState.getType() == AuditType::ValidateLocationMetadata); + TraceEvent(SevInfo, "DDDoAuditLocationMetadataBegin", self->ddId) + .detail("AuditId", audit->coreState.id) + .detail("AuditRange", auditRange); + state AuditStorageState res(audit->coreState.id, audit->coreState.getType()); // we will set range of audit later + state std::vector> actors; + state std::vector errors; + state AuditGetKeyServersRes keyServerRes; + state std::unordered_map serverKeyResMap; + state Version readAtVersion; + state std::unordered_map> mapFromKeyServersRaw; + // Note that since krmReadRange may not return the value of the entire range at a time + // Given auditRange, a part of the range is returned, thus, only a part of the range is + // able to be compared --- claimRange + // Given claimRange, rangeToRead is decided for reading the remaining range + // At beginning, rangeToRead is auditRange + state KeyRange claimRange; + state KeyRange completeRangeByKeyServer; + // To compare + state std::unordered_map> mapFromServerKeys; + state std::unordered_map> mapFromKeyServers; + + state Transaction tr(self->txnProcessor->context()); + state Key rangeToReadBegin = auditRange.begin; + state KeyRangeRef rangeToRead; + state int64_t cumulatedValidatedServerKeysNum = 0; + state int64_t cumulatedValidatedKeyServersNum = 0; + state Reference rateLimiter = + Reference(new SpeedLimit(SERVER_KNOBS->AUDIT_STORAGE_RATE_PER_SERVER_MAX, 1)); + state int64_t remoteReadBytes = 0; + + try { + loop { + try { + // Read + actors.clear(); + errors.clear(); + mapFromServerKeys.clear(); + mapFromKeyServers.clear(); + serverKeyResMap.clear(); + mapFromKeyServersRaw.clear(); + remoteReadBytes = 0; + + rangeToRead = KeyRangeRef(rangeToReadBegin, auditRange.end); + tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); + tr.setOption(FDBTransactionOptions::LOCK_AWARE); + // Read KeyServers + wait(store(keyServerRes, getShardMapFromKeyServers(self->ddId, &tr, rangeToRead))); + completeRangeByKeyServer = keyServerRes.completeRange; + readAtVersion = keyServerRes.readAtVersion; + mapFromKeyServersRaw = keyServerRes.rangeOwnershipMap; + remoteReadBytes += keyServerRes.readBytes; + // Use ssid of mapFromKeyServersRaw to read ServerKeys + for (auto& [ssid, _] : mapFromKeyServersRaw) { + actors.push_back( + store(serverKeyResMap[ssid], getThisServerKeysFromServerKeys(ssid, &tr, rangeToRead))); + } + wait(waitForAll(actors)); + + // Decide claimRange and check readAtVersion + claimRange = completeRangeByKeyServer; + for (auto& [ssid, serverKeyRes] : serverKeyResMap) { + KeyRange serverKeyCompleteRange = serverKeyRes.completeRange; + TraceEvent(SevVerbose, "DDDoAuditLocationMetadataGetClaimRange", self->ddId) + .detail("ServerId", ssid) + 
.detail("ServerKeyCompleteRange", serverKeyCompleteRange) + .detail("CurrentClaimRange", claimRange); + KeyRange overlappingRange = serverKeyCompleteRange & claimRange; + if (serverKeyCompleteRange.begin != claimRange.begin || overlappingRange.empty() || + readAtVersion != serverKeyRes.readAtVersion) { + TraceEvent(g_network->isSimulated() ? SevError : SevWarnAlways, + "DDDoAuditLocationMetadataReadCheckWrong", + self->ddId) + .detail("ServerKeyCompleteRangeBegin", serverKeyCompleteRange.begin) + .detail("ClaimRangeBegin", claimRange.begin) + .detail("OverlappingRange", overlappingRange) + .detail("ReadAtVersion", readAtVersion) + .detail("ServerKeyResReadAtVersion", serverKeyRes.readAtVersion); + throw audit_storage_cancelled(); + } + claimRange = overlappingRange; + remoteReadBytes += serverKeyRes.readBytes; + } + // Use claimRange to get mapFromServerKeys and mapFromKeyServers to compare + int64_t numValidatedServerKeys = 0; + for (auto& [ssid, serverKeyRes] : serverKeyResMap) { + for (auto& range : serverKeyRes.ownRanges) { + KeyRange overlappingRange = range & claimRange; + if (overlappingRange.empty()) { + continue; + } + TraceEvent(SevVerbose, "DDDoAuditLocationMetadataAddToServerKeyMap", self->ddId) + .detail("RawRange", range) + .detail("ClaimRange", claimRange) + .detail("Range", overlappingRange) + .detail("SSID", ssid); + mapFromServerKeys[ssid].push_back(overlappingRange); + numValidatedServerKeys++; + } + } + cumulatedValidatedServerKeysNum = cumulatedValidatedServerKeysNum + numValidatedServerKeys; + + int64_t numValidatedKeyServers = 0; + for (auto& [ssid, ranges] : mapFromKeyServersRaw) { + std::vector mergedRanges = coalesceRangeList(ranges); + for (auto& range : mergedRanges) { + KeyRange overlappingRange = range & claimRange; + if (overlappingRange.empty()) { + continue; + } + TraceEvent(SevVerbose, "DDDoAuditLocationMetadataAddToKeyServerMap", self->ddId) + .detail("RawRange", range) + .detail("ClaimRange", claimRange) + .detail("Range", overlappingRange) + .detail("SSID", ssid); + mapFromKeyServers[ssid].push_back(overlappingRange); + numValidatedKeyServers++; + } + } + cumulatedValidatedKeyServersNum = cumulatedValidatedKeyServersNum + numValidatedKeyServers; + + // Compare: check if mapFromKeyServers === mapFromServerKeys + // 1. 
check mapFromKeyServers => mapFromServerKeys + for (auto& [ssid, keyServerRanges] : mapFromKeyServers) { + if (!mapFromServerKeys.contains(ssid)) { + std::string error = + format("KeyServers and serverKeys mismatch: Some key in range(%s, %s) exists " + "on Server(%s) in KeyServers but not ServerKeys", + claimRange.toString().c_str(), + claimRange.toString().c_str(), + ssid.toString().c_str()); + errors.push_back(error); + TraceEvent(SevError, "DDDoAuditLocationMetadataError", self->ddId) + .detail("AuditId", audit->coreState.id) + .detail("AuditRange", auditRange) + .detail("ClaimRange", claimRange) + .detail("ErrorMessage", error); + } + std::vector serverKeyRanges = mapFromServerKeys[ssid]; + Optional> anyMismatch = rangesSame(keyServerRanges, serverKeyRanges); + if (anyMismatch.present()) { // mismatch detected + KeyRange mismatchedRangeByKeyServer = anyMismatch.get().first; + KeyRange mismatchedRangeByServerKey = anyMismatch.get().second; + std::string error = + format("KeyServers and serverKeys mismatch on Server(%s): KeyServer: %s; ServerKey: %s", + ssid.toString().c_str(), + mismatchedRangeByKeyServer.toString().c_str(), + mismatchedRangeByServerKey.toString().c_str()); + errors.push_back(error); + TraceEvent(SevError, "DDDoAuditLocationMetadataError", self->ddId) + .detail("AuditId", audit->coreState.id) + .detail("AuditRange", auditRange) + .detail("ClaimRange", claimRange) + .detail("ErrorMessage", error) + .detail("MismatchedRangeByKeyServer", mismatchedRangeByKeyServer) + .detail("MismatchedRangeByServerKey", mismatchedRangeByServerKey); + } + } + // 2. check mapFromServerKeys => mapFromKeyServers + for (auto& [ssid, serverKeyRanges] : mapFromServerKeys) { + if (!mapFromKeyServers.contains(ssid)) { + std::string error = + format("KeyServers and serverKeys mismatch: Some key of range(%s, %s) exists " + "on Server(%s) in ServerKeys but not KeyServers", + claimRange.toString().c_str(), + claimRange.toString().c_str(), + ssid.toString().c_str()); + errors.push_back(error); + TraceEvent(SevError, "DDDoAuditLocationMetadataError", self->ddId) + .detail("AuditId", audit->coreState.id) + .detail("AuditRange", auditRange) + .detail("ClaimRange", claimRange) + .detail("ErrorMessage", error); + } + } + + // Log statistic + TraceEvent(SevInfo, "DDDoAuditLocationMetadataMetadata", self->ddId) + .suppressFor(30.0) + .detail("AuditType", audit->coreState.getType()) + .detail("AuditId", audit->coreState.id) + .detail("AuditRange", auditRange) + .detail("CurrentValidatedServerKeysNum", numValidatedServerKeys) + .detail("CurrentValidatedKeyServersNum", numValidatedServerKeys) + .detail("CurrentValidatedInclusiveRange", claimRange) + .detail("CumulatedValidatedServerKeysNum", cumulatedValidatedServerKeysNum) + .detail("CumulatedValidatedKeyServersNum", cumulatedValidatedKeyServersNum) + .detail("CumulatedValidatedInclusiveRange", KeyRangeRef(auditRange.begin, claimRange.end)); + + // Return result + if (!errors.empty()) { + TraceEvent(SevError, "DDDoAuditLocationMetadataError", self->ddId) + .detail("AuditId", audit->coreState.id) + .detail("AuditRange", auditRange) + .detail("NumErrors", errors.size()) + .detail("Version", readAtVersion) + .detail("ClaimRange", claimRange); + res.range = claimRange; + res.setPhase(AuditPhase::Error); + res.ddId = self->ddId; // used to compare self->ddId with existing persisted ddId + wait(persistAuditStateByRange(self->txnProcessor->context(), res)); + throw audit_storage_error(); + } else { + // Expand persisted complete range + res.range = 
Standalone(KeyRangeRef(auditRange.begin, claimRange.end)); + res.setPhase(AuditPhase::Complete); + res.ddId = self->ddId; // used to compare self->ddId with existing persisted ddId + wait(persistAuditStateByRange(self->txnProcessor->context(), res)); + if (res.range.end < auditRange.end) { + TraceEvent(SevInfo, "DDDoAuditLocationMetadataPartialDone", self->ddId) + .suppressFor(10.0) + .detail("AuditId", audit->coreState.id) + .detail("AuditRange", auditRange) + .detail("Version", readAtVersion) + .detail("CompleteRange", res.range); + rangeToReadBegin = res.range.end; + } else { // complete + TraceEvent(SevInfo, "DDDoAuditLocationMetadataComplete", self->ddId) + .detail("AuditId", audit->coreState.id) + .detail("AuditRange", auditRange) + .detail("CompleteRange", res.range) + .detail("NumValidatedServerKeys", cumulatedValidatedServerKeysNum) + .detail("NumValidatedKeyServers", cumulatedValidatedKeyServersNum); + break; + } + } + } catch (Error& e) { + wait(tr.onError(e)); + } + + wait(rateLimiter->getAllowance(remoteReadBytes)); // Rate Keeping + } + audit->remainingBudgetForAuditTasks.set(audit->remainingBudgetForAuditTasks.get() + 1); + ASSERT(audit->remainingBudgetForAuditTasks.get() <= SERVER_KNOBS->CONCURRENT_AUDIT_TASK_COUNT_MAX); + TraceEvent(SevDebug, "RemainingBudgetForAuditTasks") + .detail("Loc", "doAuditLocationMetadata") + .detail("Ops", "Increase") + .detail("Val", audit->remainingBudgetForAuditTasks.get()) + .detail("AuditType", audit->coreState.getType()); + + } catch (Error& e) { + if (e.code() == error_code_actor_cancelled) { + throw e; + } + TraceEvent(SevInfo, "DDDoAuditLocationMetadataFailed", self->ddId) + .errorUnsuppressed(e) + .detail("AuditId", audit->coreState.id) + .detail("AuditRange", auditRange); + audit->remainingBudgetForAuditTasks.set(audit->remainingBudgetForAuditTasks.get() + 1); + ASSERT(audit->remainingBudgetForAuditTasks.get() <= SERVER_KNOBS->CONCURRENT_AUDIT_TASK_COUNT_MAX); + TraceEvent(SevDebug, "RemainingBudgetForAuditTasks") + .detail("Loc", "doAuditLocationMetadataFailed") + .detail("Ops", "Increase") + .detail("Val", audit->remainingBudgetForAuditTasks.get()) + .detail("AuditType", audit->coreState.getType()); + if (e.code() == error_code_audit_storage_error) { + throw audit_storage_error(); + } else if (e.code() == error_code_audit_storage_cancelled) { + throw audit_storage_cancelled(); + } else if (audit->retryCount >= SERVER_KNOBS->AUDIT_RETRY_COUNT_MAX) { + throw audit_storage_failed(); + } else { + audit->retryCount++; + audit->actors.add(dispatchAuditLocationMetadata(self, audit, auditRange)); + } + } + + return Void(); +} + ACTOR Future dataDistributor_impl(DataDistributorInterface di, Reference self, IsMocked isMocked) { diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 46103be72d..704db54951 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -5000,121 +5000,6 @@ Key constructMappedKey(KeyValueRef* keyValue, std::vector>& vec, return mappedKeyTuple.pack(); } -// Check if any pair of ranges are exclusive with each other -// This is not a part in consistency check of audit metadata -// This is used for checking the validity of inputs to rangesSame() -bool elementsAreExclusiveWithEachOther(std::vector ranges) { - ASSERT(std::is_sorted(ranges.begin(), ranges.end(), KeyRangeRef::ArbitraryOrder())); - for (int i = 0; i < ranges.size() - 1; ++i) { - if (ranges[i].end > ranges[i + 1].begin) { - TraceEvent(SevError, 
"ElementsAreNotExclusiveWithEachOther").detail("Ranges", describe(ranges)); - return false; - } - } - return true; -} - -// Check if any range is empty in the given list of ranges -// This is not a part in consistency check of audit metadata -// This is used for checking the validity of inputs to rangesSame() -bool noEmptyRangeInRanges(std::vector ranges) { - for (const auto& range : ranges) { - if (range.empty()) { - return false; - } - } - return true; -} - -// Given a list of ranges, where ranges can overlap with each other -// Return a list of exclusive ranges which covers the ranges exactly -// the same as the input list of ranges -std::vector coalesceRangeList(std::vector ranges) { - std::sort(ranges.begin(), ranges.end(), [](KeyRange a, KeyRange b) { return a.begin < b.begin; }); - std::vector res; - for (const auto& range : ranges) { - if (res.empty()) { - res.push_back(range); - continue; - } - if (range.begin <= res.back().end) { - if (range.end > res.back().end) { // update res.back if current range extends the back range - KeyRange newBack = Standalone(KeyRangeRef(res.back().begin, range.end)); - res.pop_back(); - res.push_back(newBack); - } - } else { - res.push_back(range); - } - } - return res; -} - -// Given two lists of ranges --- rangesA and rangesB, check if two lists are identical -// If not, return the mismatched two ranges of rangeA and rangeB respectively -Optional> rangesSame(std::vector rangesA, std::vector rangesB) { - if (g_network->isSimulated()) { - ASSERT(noEmptyRangeInRanges(rangesA)); - ASSERT(noEmptyRangeInRanges(rangesB)); - } - KeyRange emptyRange; - if (rangesA.empty() && rangesB.empty()) { // no mismatch - return Optional>(); - } else if (rangesA.empty() && !rangesB.empty()) { // rangesA is empty while rangesB has a range - return std::make_pair(emptyRange, rangesB.front()); - } else if (!rangesA.empty() && rangesB.empty()) { // rangesB is empty while rangesA has a range - return std::make_pair(rangesA.front(), emptyRange); - } - TraceEvent(SevVerbose, "RangesSameBeforeSort").detail("RangesA", rangesA).detail("Rangesb", rangesB); - // sort in ascending order - std::sort(rangesA.begin(), rangesA.end(), [](KeyRange a, KeyRange b) { return a.begin < b.begin; }); - std::sort(rangesB.begin(), rangesB.end(), [](KeyRange a, KeyRange b) { return a.begin < b.begin; }); - TraceEvent(SevVerbose, "RangesSameAfterSort").detail("RangesA", rangesA).detail("Rangesb", rangesB); - if (g_network->isSimulated()) { - ASSERT(elementsAreExclusiveWithEachOther(rangesA)); - ASSERT(elementsAreExclusiveWithEachOther(rangesB)); - } - if (rangesA.front().begin != rangesB.front().begin) { // rangeList heads mismatch - return std::make_pair(rangesA.front(), rangesB.front()); - } else if (rangesA.back().end != rangesB.back().end) { // rangeList backs mismatch - return std::make_pair(rangesA.back(), rangesB.back()); - } - int ia = 0; - int ib = 0; - KeyRangeRef rangeA = rangesA[0]; - KeyRangeRef rangeB = rangesB[0]; - KeyRange lastRangeA = Standalone(rangeA); - KeyRange lastRangeB = Standalone(rangeB); - while (true) { - if (rangeA.begin == rangeB.begin) { - if (rangeA.end == rangeB.end) { - if (rangeA.end == rangesA.back().end) { - break; - } - ++ia; - ++ib; - rangeA = rangesA[ia]; - rangeB = rangesB[ib]; - lastRangeA = Standalone(rangeA); - lastRangeB = Standalone(rangeB); - } else if (rangeA.end > rangeB.end) { - rangeA = KeyRangeRef(rangeB.end, rangeA.end); - ++ib; - rangeB = rangesB[ib]; - lastRangeB = Standalone(rangeB); - } else { - rangeB = KeyRangeRef(rangeA.end, rangeB.end); 
- ++ia; - rangeA = rangesA[ia]; - lastRangeA = Standalone(rangeA); - } - } else { - return std::make_pair(lastRangeA, lastRangeB); - } - } - return Optional>(); -} - struct AuditGetShardInfoRes { Version readAtVersion; UID serverId; @@ -5153,175 +5038,6 @@ AuditGetShardInfoRes getThisServerShardInfo(StorageServer* data, KeyRange range) return AuditGetShardInfoRes(data->version.get(), data->thisServerID, ownRange); } -struct AuditGetServerKeysRes { - KeyRange completeRange; - Version readAtVersion; - UID serverId; - std::vector ownRanges; - int64_t readBytes; - AuditGetServerKeysRes() = default; - AuditGetServerKeysRes(KeyRange completeRange, - Version readAtVersion, - UID serverId, - std::vector ownRanges, - int64_t readBytes) - : completeRange(completeRange), readAtVersion(readAtVersion), serverId(serverId), ownRanges(ownRanges), - readBytes(readBytes) {} -}; - -// Given an input server, get ranges within the input range via the input transaction -// from the perspective of ServerKeys system key space -// Input: (1) SS id; (2) transaction tr; (3) within range -// Return AuditGetServerKeysRes, including: (1) complete range by a single read range; -// (2) verison of the read; (3) ranges of the input SS -ACTOR Future getThisServerKeysFromServerKeys(UID serverID, Transaction* tr, KeyRange range) { - state RangeResult readResult; - state AuditGetServerKeysRes res; - - try { - wait(store(readResult, - krmGetRanges(tr, - serverKeysPrefixFor(serverID), - range, - CLIENT_KNOBS->KRM_GET_RANGE_LIMIT, - CLIENT_KNOBS->KRM_GET_RANGE_LIMIT_BYTES))); - Future grvF = tr->getReadVersion(); - if (!grvF.isReady()) { - TraceEvent(SevWarnAlways, "SSAuditStorageReadServerKeysGRVError", serverID); - throw audit_storage_cancelled(); - } - Version readAtVersion = grvF.get(); - - TraceEvent(SevVerbose, "SSAuditStorageGetThisServerKeysFromServerKeysReadDone", serverID) - .detail("Range", range) - .detail("Prefix", serverKeysPrefixFor(serverID)) - .detail("ResultSize", readResult.size()) - .detail("AduitServerID", serverID); - - std::vector ownRanges; - for (int i = 0; i < readResult.size() - 1; ++i) { - TraceEvent(SevVerbose, "SSAuditStorageGetThisServerKeysFromServerKeysAddToResult", serverID) - .detail("ValueIsServerKeysFalse", readResult[i].value == serverKeysFalse) - .detail("ServerHasKey", serverHasKey(readResult[i].value)) - .detail("Range", KeyRangeRef(readResult[i].key, readResult[i + 1].key)) - .detail("AduitServerID", serverID); - if (serverHasKey(readResult[i].value)) { - KeyRange shardRange; - ownRanges.push_back(Standalone(KeyRangeRef(readResult[i].key, readResult[i + 1].key))); - } - } - const KeyRange completeRange = Standalone(KeyRangeRef(range.begin, readResult.back().key)); - TraceEvent(SevVerbose, "SSAuditStorageGetThisServerKeysFromServerKeysEnd", serverID) - .detail("AduitServerID", serverID) - .detail("Range", range) - .detail("Prefix", serverKeysPrefixFor(serverID)) - .detail("ReadAtVersion", readAtVersion) - .detail("CompleteRange", completeRange) - .detail("ResultSize", ownRanges.size()); - res = AuditGetServerKeysRes(completeRange, readAtVersion, serverID, ownRanges, readResult.logicalSize()); - - } catch (Error& e) { - TraceEvent(SevDebug, "SSAuditStorageGetThisServerKeysError", serverID) - .errorUnsuppressed(e) - .detail("AduitServerID", serverID); - throw e; - } - - return res; -} - -struct AuditGetKeyServersRes { - KeyRange completeRange; - Version readAtVersion; - int64_t readBytes; - std::unordered_map> rangeOwnershipMap; - AuditGetKeyServersRes() = default; - 
AuditGetKeyServersRes(KeyRange completeRange, - Version readAtVersion, - std::unordered_map> rangeOwnershipMap, - int64_t readBytes) - : completeRange(completeRange), readAtVersion(readAtVersion), rangeOwnershipMap(rangeOwnershipMap), - readBytes(readBytes) {} -}; - -// Given an input server, get ranges within the input range via the input transaction -// from the perspective of KeyServers system key space -// Input: (1) Audit Server ID (for logging); (2) transaction tr; (3) within range -// Return AuditGetKeyServersRes, including : (1) complete range by a single read range; (2) verison of the read; -// (3) map between SSes and their ranges --- in KeyServers space, a range corresponds to multiple SSes -ACTOR Future getShardMapFromKeyServers(UID auditServerId, Transaction* tr, KeyRange range) { - state AuditGetKeyServersRes res; - state std::vector> actors; - state RangeResult readResult; - state RangeResult UIDtoTagMap; - state int64_t totalShardsCount = 0; - state int64_t shardsInAnonymousPhysicalShardCount = 0; - - try { - // read - actors.push_back(store(readResult, - krmGetRanges(tr, - keyServersPrefix, - range, - CLIENT_KNOBS->KRM_GET_RANGE_LIMIT, - CLIENT_KNOBS->KRM_GET_RANGE_LIMIT_BYTES))); - actors.push_back(store(UIDtoTagMap, tr->getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY))); - wait(waitForAll(actors)); - if (UIDtoTagMap.more || UIDtoTagMap.size() >= CLIENT_KNOBS->TOO_MANY) { - TraceEvent(g_network->isSimulated() ? SevError : SevWarnAlways, - "SSAuditStorageReadKeyServersReadTagError", - auditServerId); - throw audit_storage_cancelled(); - } - Future grvF = tr->getReadVersion(); - if (!grvF.isReady()) { - TraceEvent(SevWarnAlways, "SSAuditStorageReadKeyServersGRVError", auditServerId); - throw audit_storage_cancelled(); - } - Version readAtVersion = grvF.get(); - - TraceEvent(SevVerbose, "SSAuditStorageGetThisServerKeysFromKeyServersReadDone", auditServerId) - .detail("Range", range) - .detail("ResultSize", readResult.size()) - .detail("AduitServerID", auditServerId); - - // produce result - std::unordered_map> serverOwnRanges; - for (int i = 0; i < readResult.size() - 1; ++i) { - std::vector src; - std::vector dest; - UID srcID; - UID destID; - decodeKeyServersValue(UIDtoTagMap, readResult[i].value, src, dest, srcID, destID); - if (srcID == anonymousShardId) { - shardsInAnonymousPhysicalShardCount++; - } - totalShardsCount++; - std::vector servers(src.size() + dest.size()); - std::merge(src.begin(), src.end(), dest.begin(), dest.end(), servers.begin()); - for (auto& ssid : servers) { - serverOwnRanges[ssid].push_back(Standalone(KeyRangeRef(readResult[i].key, readResult[i + 1].key))); - } - } - const KeyRange completeRange = Standalone(KeyRangeRef(range.begin, readResult.back().key)); - TraceEvent(SevInfo, "SSAuditStorageGetThisServerKeysFromKeyServersEnd", auditServerId) - .detail("Range", range) - .detail("CompleteRange", completeRange) - .detail("AtVersion", readAtVersion) - .detail("ShardsInAnonymousPhysicalShardCount", shardsInAnonymousPhysicalShardCount) - .detail("TotalShardsCount", totalShardsCount); - res = AuditGetKeyServersRes(completeRange, readAtVersion, serverOwnRanges, readResult.logicalSize()); - - } catch (Error& e) { - TraceEvent(SevDebug, "SSAuditStorageGetThisServerKeysFromKeyServersError", auditServerId) - .errorUnsuppressed(e) - .detail("AuditServerId", auditServerId); - throw e; - } - - return res; -} - // Check consistency between StorageServer->shardInfo and ServerKeys system key space ACTOR Future auditStorageServerShardQ(StorageServer* data, 
AuditStorageRequest req) { ASSERT(req.getType() == AuditType::ValidateStorageServerShard); @@ -5665,6 +5381,13 @@ ACTOR Future auditStorageServerShardQ(StorageServer* data, AuditStorageReq rangeToReadBegin = res.range.end; } else { // complete req.reply.send(res); + TraceEvent(SevInfo, "SSAuditStorageSsShardComplete", data->thisServerID) + .detail("AuditId", req.id) + .detail("AuditRange", req.range) + .detail("AuditServer", data->thisServerID) + .detail("CompleteRange", res.range) + .detail("NumValidatedLocalShards", cumulatedValidatedLocalShardsNum) + .detail("NumValidatedServerKeys", cumulatedValidatedServerKeysNum); break; } } @@ -5697,14 +5420,6 @@ ACTOR Future auditStorageServerShardQ(StorageServer* data, AuditStorageReq } } - TraceEvent(SevInfo, "SSAuditStorageSsShardComplete", data->thisServerID) - .detail("AuditId", req.id) - .detail("AuditRange", req.range) - .detail("AuditServer", data->thisServerID) - .detail("CompleteRange", res.range) - .detail("NumValidatedLocalShards", cumulatedValidatedLocalShardsNum) - .detail("NumValidatedServerKeys", cumulatedValidatedServerKeysNum); - // Make sure the history collection is not open due to this audit data->stopTrackShardAssignment(); TraceEvent(SevVerbose, "SSShardAssignmentHistoryRecordStopWhenExit", data->thisServerID).detail("AuditID", req.id); @@ -5712,283 +5427,6 @@ ACTOR Future auditStorageServerShardQ(StorageServer* data, AuditStorageReq return Void(); } -// Check consistency between KeyServers and ServerKeys system key space -ACTOR Future auditShardLocationMetadataQ(StorageServer* data, AuditStorageRequest req) { - ASSERT(req.getType() == AuditType::ValidateLocationMetadata); - wait(data->serveAuditStorageParallelismLock.take(TaskPriority::DefaultYield)); - state FlowLock::Releaser holder(data->serveAuditStorageParallelismLock); - TraceEvent(SevInfo, "SSAuditStorageShardLocMetadataBegin", data->thisServerID) - .detail("AuditId", req.id) - .detail("AuditRange", req.range); - state AuditStorageState res(req.id, req.getType()); // we will set range of audit later - state std::vector> actors; - state std::vector errors; - state AuditGetKeyServersRes keyServerRes; - state std::unordered_map serverKeyResMap; - state Version readAtVersion; - state std::unordered_map> mapFromKeyServersRaw; - // Note that since krmReadRange may not return the value of the entire range at a time - // Given req.range, a part of the range is returned, thus, only a part of the range is - // able to be compared --- claimRange - // Given claimRange, rangeToRead is decided for reading the remaining range - // At beginning, rangeToRead is req.range - state KeyRange claimRange; - state KeyRange completeRangeByKeyServer; - // To compare - state std::unordered_map> mapFromServerKeys; - state std::unordered_map> mapFromKeyServers; - - state Transaction tr(data->cx); - state Key rangeToReadBegin = req.range.begin; - state KeyRangeRef rangeToRead; - state int64_t cumulatedValidatedServerKeysNum = 0; - state int64_t cumulatedValidatedKeyServersNum = 0; - state Reference rateLimiter = - Reference(new SpeedLimit(SERVER_KNOBS->AUDIT_STORAGE_RATE_PER_SERVER_MAX, 1)); - state int64_t remoteReadBytes = 0; - - try { - loop { - try { - // Read - actors.clear(); - errors.clear(); - mapFromServerKeys.clear(); - mapFromKeyServers.clear(); - serverKeyResMap.clear(); - mapFromKeyServersRaw.clear(); - remoteReadBytes = 0; - - rangeToRead = KeyRangeRef(rangeToReadBegin, req.range.end); - tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); - 
-				tr.setOption(FDBTransactionOptions::LOCK_AWARE);
-				// Read KeyServers
-				wait(store(keyServerRes, getShardMapFromKeyServers(data->thisServerID, &tr, rangeToRead)));
-				completeRangeByKeyServer = keyServerRes.completeRange;
-				readAtVersion = keyServerRes.readAtVersion;
-				mapFromKeyServersRaw = keyServerRes.rangeOwnershipMap;
-				remoteReadBytes += keyServerRes.readBytes;
-				// Use ssid of mapFromKeyServersRaw to read ServerKeys
-				for (auto& [ssid, _] : mapFromKeyServersRaw) {
-					actors.push_back(
-					    store(serverKeyResMap[ssid], getThisServerKeysFromServerKeys(ssid, &tr, rangeToRead)));
-				}
-				wait(waitForAll(actors));
-
-				// Decide claimRange and check readAtVersion
-				claimRange = completeRangeByKeyServer;
-				for (auto& [ssid, serverKeyRes] : serverKeyResMap) {
-					KeyRange serverKeyCompleteRange = serverKeyRes.completeRange;
-					TraceEvent(SevVerbose, "SSAuditStorageShardLocMetadataGetClaimRange")
-					    .detail("ServerId", ssid)
-					    .detail("ServerKeyCompleteRange", serverKeyCompleteRange)
-					    .detail("CurrentClaimRange", claimRange);
-					KeyRange overlappingRange = serverKeyCompleteRange & claimRange;
-					if (serverKeyCompleteRange.begin != claimRange.begin || overlappingRange.empty() ||
-					    readAtVersion != serverKeyRes.readAtVersion) {
-						TraceEvent(g_network->isSimulated() ? SevError : SevWarnAlways,
-						           "SSAuditStorageShardLocMetadataReadCheckWrong",
-						           data->thisServerID)
-						    .detail("ServerKeyCompleteRangeBegin", serverKeyCompleteRange.begin)
-						    .detail("ClaimRangeBegin", claimRange.begin)
-						    .detail("OverlappingRange", overlappingRange)
-						    .detail("ReadAtVersion", readAtVersion)
-						    .detail("ServerKeyResReadAtVersion", serverKeyRes.readAtVersion);
-						throw audit_storage_cancelled();
-					}
-					claimRange = overlappingRange;
-					remoteReadBytes += serverKeyRes.readBytes;
-				}
-				// Use claimRange to get mapFromServerKeys and mapFromKeyServers to compare
-				int64_t numValidatedServerKeys = 0;
-				for (auto& [ssid, serverKeyRes] : serverKeyResMap) {
-					for (auto& range : serverKeyRes.ownRanges) {
-						KeyRange overlappingRange = range & claimRange;
-						if (overlappingRange.empty()) {
-							continue;
-						}
-						TraceEvent(SevVerbose, "SSAuditStorageShardLocMetadataAddToServerKeyMap")
-						    .detail("RawRange", range)
-						    .detail("ClaimRange", claimRange)
-						    .detail("Range", overlappingRange)
-						    .detail("SSID", ssid);
-						mapFromServerKeys[ssid].push_back(overlappingRange);
-						numValidatedServerKeys++;
-					}
-				}
-				cumulatedValidatedServerKeysNum = cumulatedValidatedServerKeysNum + numValidatedServerKeys;
-
-				int64_t numValidatedKeyServers = 0;
-				for (auto& [ssid, ranges] : mapFromKeyServersRaw) {
-					std::vector<KeyRange> mergedRanges = coalesceRangeList(ranges);
-					for (auto& range : mergedRanges) {
-						KeyRange overlappingRange = range & claimRange;
-						if (overlappingRange.empty()) {
-							continue;
-						}
-						TraceEvent(SevVerbose, "SSAuditStorageShardLocMetadataAddToKeyServerMap")
-						    .detail("RawRange", range)
-						    .detail("ClaimRange", claimRange)
-						    .detail("Range", overlappingRange)
-						    .detail("SSID", ssid);
-						mapFromKeyServers[ssid].push_back(overlappingRange);
-						numValidatedKeyServers++;
-					}
-				}
-				cumulatedValidatedKeyServersNum = cumulatedValidatedKeyServersNum + numValidatedKeyServers;
-
-				// Compare: check if mapFromKeyServers === mapFromServerKeys
-				// 1. check mapFromKeyServers => mapFromServerKeys
-				for (auto& [ssid, keyServerRanges] : mapFromKeyServers) {
-					if (!mapFromServerKeys.contains(ssid)) {
-						std::string error =
-						    format("KeyServers and serverKeys mismatch: Some key in range(%s, %s) exists "
-						           "on Server(%s) in KeyServers but not ServerKeys",
-						           claimRange.toString().c_str(),
-						           claimRange.toString().c_str(),
-						           ssid.toString().c_str());
-						errors.push_back(error);
-						TraceEvent(SevError, "SSAuditStorageShardLocationMetadataError", data->thisServerID)
-						    .detail("AuditId", req.id)
-						    .detail("AuditRange", req.range)
-						    .detail("ClaimRange", claimRange)
-						    .detail("ErrorMessage", error)
-						    .detail("AuditServerID", data->thisServerID);
-					}
-					std::vector<KeyRange> serverKeyRanges = mapFromServerKeys[ssid];
-					Optional<std::pair<KeyRange, KeyRange>> anyMismatch = rangesSame(keyServerRanges, serverKeyRanges);
-					if (anyMismatch.present()) { // mismatch detected
-						KeyRange mismatchedRangeByKeyServer = anyMismatch.get().first;
-						KeyRange mismatchedRangeByServerKey = anyMismatch.get().second;
-						std::string error =
-						    format("KeyServers and serverKeys mismatch on Server(%s): KeyServer: %s; ServerKey: %s",
-						           ssid.toString().c_str(),
-						           mismatchedRangeByKeyServer.toString().c_str(),
-						           mismatchedRangeByServerKey.toString().c_str());
-						errors.push_back(error);
-						TraceEvent(SevError, "SSAuditStorageShardLocationMetadataError", data->thisServerID)
-						    .detail("AuditId", req.id)
-						    .detail("AuditRange", req.range)
-						    .detail("ClaimRange", claimRange)
-						    .detail("ErrorMessage", error)
-						    .detail("MismatchedRangeByKeyServer", mismatchedRangeByKeyServer)
-						    .detail("MismatchedRangeByServerKey", mismatchedRangeByServerKey)
-						    .detail("AuditServerID", data->thisServerID);
-					}
-				}
-				// 2. check mapFromServerKeys => mapFromKeyServers
-				for (auto& [ssid, serverKeyRanges] : mapFromServerKeys) {
-					if (!mapFromKeyServers.contains(ssid)) {
-						std::string error =
-						    format("KeyServers and serverKeys mismatch: Some key of range(%s, %s) exists "
-						           "on Server(%s) in ServerKeys but not KeyServers",
-						           claimRange.toString().c_str(),
-						           claimRange.toString().c_str(),
-						           ssid.toString().c_str());
-						errors.push_back(error);
-						TraceEvent(SevError, "SSAuditStorageShardLocationMetadataError", data->thisServerID)
-						    .detail("AuditId", req.id)
-						    .detail("AuditRange", req.range)
-						    .detail("ClaimRange", claimRange)
-						    .detail("ErrorMessage", error)
-						    .detail("AuditServerID", data->thisServerID);
-					}
-				}
-
-				// Log statistic
-				TraceEvent(SevInfo, "SSAuditStorageStatisticLocationMetadata", data->thisServerID)
-				    .suppressFor(30.0)
-				    .detail("AuditType", req.getType())
-				    .detail("AuditId", req.id)
-				    .detail("AuditRange", req.range)
-				    .detail("CurrentValidatedServerKeysNum", numValidatedServerKeys)
-				    .detail("CurrentValidatedKeyServersNum", numValidatedServerKeys)
-				    .detail("CurrentValidatedInclusiveRange", claimRange)
-				    .detail("CumulatedValidatedServerKeysNum", cumulatedValidatedServerKeysNum)
-				    .detail("CumulatedValidatedKeyServersNum", cumulatedValidatedKeyServersNum)
-				    .detail("CumulatedValidatedInclusiveRange", KeyRangeRef(req.range.begin, claimRange.end));
-
-				// Return result
-				if (!errors.empty()) {
-					TraceEvent(SevError, "SSAuditStorageShardLocMetadataError", data->thisServerID)
-					    .detail("AuditId", req.id)
-					    .detail("AuditRange", req.range)
-					    .detail("NumErrors", errors.size())
-					    .detail("Version", readAtVersion)
-					    .detail("AuditServerId", data->thisServerID)
-					    .detail("ClaimRange", claimRange);
-					res.range = claimRange;
-					res.setPhase(AuditPhase::Error);
-					if (!req.ddId.isValid()) {
-						TraceEvent(g_network->isSimulated() ? SevError : SevWarnAlways,
-						           "SSAuditStorageShardLocMetadataDDIdInvalid",
-						           data->thisServerID);
-						throw audit_storage_cancelled();
-					}
-					res.ddId = req.ddId; // used to compare req.ddId with existing persisted ddId
-					wait(persistAuditStateByRange(data->cx, res));
-					req.reply.sendError(audit_storage_error());
-					break;
-				} else {
-					// Expand persisted complete range
-					res.range = Standalone(KeyRangeRef(req.range.begin, claimRange.end));
-					res.setPhase(AuditPhase::Complete);
-					if (!req.ddId.isValid()) {
-						TraceEvent(g_network->isSimulated() ? SevError : SevWarnAlways,
-						           "SSAuditStorageShardLocMetadataDDIdInvalid",
-						           data->thisServerID);
-						throw audit_storage_cancelled();
-					}
-					res.ddId = req.ddId; // used to compare req.ddId with existing persisted ddId
-					wait(persistAuditStateByRange(data->cx, res));
-					if (res.range.end < req.range.end) {
-						TraceEvent(SevInfo, "SSAuditStorageShardLocMetadataPartialDone", data->thisServerID)
-						    .suppressFor(10.0)
-						    .detail("AuditId", req.id)
-						    .detail("AuditRange", req.range)
-						    .detail("Version", readAtVersion)
-						    .detail("AuditServerId", data->thisServerID)
-						    .detail("CompleteRange", res.range);
-						rangeToReadBegin = res.range.end;
-					} else { // complete
-						req.reply.send(res);
-						break;
-					}
-				}
-			} catch (Error& e) {
-				wait(tr.onError(e));
-			}
-
-			wait(rateLimiter->getAllowance(remoteReadBytes)); // Rate Keeping
-		}
-
-	} catch (Error& e) {
-		if (e.code() == error_code_actor_cancelled) {
-			return Void(); // sliently exit
-		}
-		TraceEvent(SevInfo, "SSAuditStorageShardLocMetadataFailed", data->thisServerID)
-		    .errorUnsuppressed(e)
-		    .detail("AuditId", req.id)
-		    .detail("AuditRange", req.range)
-		    .detail("AuditServer", data->thisServerID);
-		if (e.code() == error_code_audit_storage_cancelled) {
-			req.reply.sendError(audit_storage_cancelled());
-		} else {
-			req.reply.sendError(audit_storage_failed());
-		}
-	}
-
-	TraceEvent(SevInfo, "SSAuditStorageShardLocMetadataComplete", data->thisServerID)
-	    .detail("AuditId", req.id)
-	    .detail("AuditRange", req.range)
-	    .detail("AuditServer", data->thisServerID)
-	    .detail("CompleteRange", res.range)
-	    .detail("NumValidatedServerKeys", cumulatedValidatedServerKeysNum)
-	    .detail("NumValidatedKeyServers", cumulatedValidatedKeyServersNum);
-
-	return Void();
-}
-
 ACTOR Future<Void> auditStorageShardReplicaQ(StorageServer* data, AuditStorageRequest req) {
 	ASSERT(req.getType() == AuditType::ValidateHA || req.getType() == AuditType::ValidateReplica);
 	wait(data->serveAuditStorageParallelismLock.take(TaskPriority::DefaultYield));
@@ -6305,6 +5743,14 @@ ACTOR Future<Void> auditStorageShardReplicaQ(StorageServer* data, AuditStorageRequest req) {
 			// Expand persisted complete range
 			if (complete) {
 				req.reply.send(res);
+				TraceEvent(SevInfo, "SSAuditStorageShardReplicaComplete", data->thisServerID)
+				    .detail("AuditId", req.id)
+				    .detail("AuditRange", req.range)
+				    .detail("AuditServer", data->thisServerID)
+				    .detail("CompleteRange", res.range)
+				    .detail("CheckTimes", checkTimes)
+				    .detail("NumValidatedKeys", numValidatedKeys)
+				    .detail("ValidatedBytes", validatedBytes);
 				break;
 			} else {
 				TraceEvent(SevInfo, "SSAuditStorageShardReplicaPartialDone", data->thisServerID)
@@ -6339,14 +5785,6 @@ ACTOR Future<Void> auditStorageShardReplicaQ(StorageServer* data, AuditStorageRequest req) {
 			req.reply.sendError(audit_storage_failed());
 		}
 	}
-	TraceEvent(SevInfo, "SSAuditStorageShardReplicaComplete", data->thisServerID)
-	    .detail("AuditId", req.id)
-	    .detail("AuditRange", req.range)
-	    .detail("AuditServer", data->thisServerID)
-	    .detail("CompleteRange", res.range)
-	    .detail("CheckTimes", checkTimes)
-	    .detail("NumValidatedKeys", numValidatedKeys)
-	    .detail("ValidatedBytes", validatedBytes);
 	return Void();
 }
 
@@ -14017,7 +13455,8 @@ ACTOR Future<Void> storageServerCore(StorageServer* self, StorageServerInterface ssi) {
 				}
 				when(AuditStorageRequest req = waitNext(ssi.auditStorage.getFuture())) {
 					// Check req
-					if (!req.id.isValid() || !req.ddId.isValid() || req.range.empty()) {
+					if (!req.id.isValid() || !req.ddId.isValid() || req.range.empty() ||
+					    req.getType() == AuditType::ValidateLocationMetadata) {
 						// ddId is used when persist progress
 						TraceEvent(g_network->isSimulated() ? SevError : SevWarnAlways,
 						           "AuditRequestInvalid") // unexpected
 						    .detail("AuditRange", req.range)
@@ -14033,8 +13472,6 @@ ACTOR Future<Void> storageServerCore(StorageServer* self, StorageServerInterface ssi) {
 						self->actors.add(auditStorageShardReplicaQ(self, req));
 					} else if (req.getType() == AuditType::ValidateReplica) {
 						self->actors.add(auditStorageShardReplicaQ(self, req));
-					} else if (req.getType() == AuditType::ValidateLocationMetadata) {
-						self->actors.add(auditShardLocationMetadataQ(self, req));
 					} else if (req.getType() == AuditType::ValidateStorageServerShard) {
 						self->actors.add(auditStorageServerShardQ(self, req));
 					} else {