Merge branch 'main' of https://github.com/apple/foundationdb into disable-physical-shard-move

This commit is contained in:
He Liu 2023-05-30 15:27:57 -07:00
commit aeb7f2bd4d
18 changed files with 255 additions and 122 deletions

View File

@ -22,8 +22,8 @@ set(RocksDB_CMAKE_ARGS
-DFAIL_ON_WARNINGS=OFF
-DWITH_GFLAGS=OFF
-DWITH_TESTS=OFF
-DWITH_TOOLS=OFF
-DWITH_CORE_TOOLS=OFF
-DWITH_TOOLS=${ROCKSDB_TOOLS}
-DWITH_CORE_TOOLS=${ROCKSDB_TOOLS}
-DWITH_BENCHMARK_TOOLS=OFF
-DWITH_BZ2=OFF
-DWITH_LZ4=ON

View File

@ -157,6 +157,7 @@ set(PORTABLE_ROCKSDB ON CACHE BOOL "Compile RocksDB in portable mode") # Set thi
set(ROCKSDB_SSE42 OFF CACHE BOOL "Compile RocksDB with SSE42 enabled")
set(ROCKSDB_AVX ${USE_AVX} CACHE BOOL "Compile RocksDB with AVX enabled")
set(ROCKSDB_AVX2 OFF CACHE BOOL "Compile RocksDB with AVX2 enabled")
set(ROCKSDB_TOOLS OFF CACHE BOOL "Compile RocksDB tools")
set(WITH_LIBURING OFF CACHE BOOL "Build with liburing enabled") # Set this to ON to include liburing
# RocksDB is currently enabled by default for GCC but does not build with the latest
# Clang.

View File

@ -63,7 +63,7 @@ ACTOR Future<bool> getAuditStatusCommandActor(Database cx, std::vector<StringRef
}
const UID id = UID::fromString(tokens[3].toString());
AuditStorageState res = wait(getAuditState(cx, type, id));
printf("Audit result is:\n%s", res.toStringForCLI().c_str());
printf("Audit result is:\n%s", res.toString().c_str());
} else if (tokencmp(tokens[2], "recent")) {
int count = CLIENT_KNOBS->TOO_MANY;
if (tokens.size() == 4) {

View File

@ -119,26 +119,28 @@ ACTOR Future<std::vector<AuditStorageState>> getAuditStates(Database cx,
state std::vector<AuditStorageState> auditStates;
state Key readBegin;
state Key readEnd;
state RangeResult res;
state Reverse reverse = newFirst ? Reverse::True : Reverse::False;
if (num.present() && num.get() == 0) {
return auditStates;
}
loop {
try {
readBegin = auditKeyRange(auditType).begin;
readEnd = auditKeyRange(auditType).end;
auditStates.clear();
while (true) {
res.clear();
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
KeyRangeRef rangeToRead(readBegin, readEnd);
if (num.present()) {
wait(store(res, tr.getRange(rangeToRead, num.get(), Snapshot::False, reverse)));
} else {
wait(store(res, tr.getRange(rangeToRead, GetRangeLimits(), Snapshot::False, reverse)));
}
state RangeResult res = wait(tr.getRange(rangeToRead,
num.present() ? GetRangeLimits(num.get()) : GetRangeLimits(),
Snapshot::False,
reverse));
for (int i = 0; i < res.size(); ++i) {
auditStates.push_back(decodeAuditStorageState(res[i].value));
if (num.present() && auditStates.size() == num.get()) {
return auditStates; // since res.more is not reliable when GetRangeLimits is set to 1
}
}
if (!res.more) {
break;
@ -251,7 +253,6 @@ ACTOR static Future<Void> checkMoveKeysLock(Transaction* tr,
TraceEvent(SevDebug, "ConflictWithPreviousOwner");
throw movekeys_conflict(); // need a new name
}
// Take the lock
if (isWrite) {
BinaryWriter wrMyOwner(Unversioned());
@ -267,7 +268,6 @@ ACTOR static Future<Void> checkMoveKeysLock(Transaction* tr,
.detail("MyOwner", lock.myOwner.toString())
.detail("Writer", lastWriter.toString());
}
return Void();
} else if (currentOwner == lock.myOwner) {
if (isWrite) {
@ -278,7 +278,6 @@ ACTOR static Future<Void> checkMoveKeysLock(Transaction* tr,
// Make this transaction self-conflicting so the database will not execute it twice with the same write key
tr->makeSelfConflicting();
}
return Void();
} else {
CODE_PROBE(true, "checkMoveKeysLock: Conflict with new owner");
@ -287,6 +286,37 @@ ACTOR static Future<Void> checkMoveKeysLock(Transaction* tr,
}
}
// Overwrite the persisted metadata for one audit (keyed by audit type + id) with
// the given auditState, in a standard FDB retry loop. Before writing, verifies the
// move-keys lock (and that DD is enabled) so a stale data distributor cannot clobber
// the state owned by a newer one; checkMoveKeysLock throws in that case and the
// error propagates out via tr.onError / the caller.
// Returns: Void once the commit succeeds.
// Throws: whatever tr.onError rethrows as non-retryable (e.g. movekeys_conflict,
// actor_cancelled).
ACTOR Future<Void> updateAuditState(Database cx, AuditStorageState auditState, MoveKeyLockInfo lock, bool ddEnabled) {
state Transaction tr(cx);
loop {
try {
// System-keyspace write: needs immediate priority and system-key access.
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
// isWrite=true: take/refresh the lock in the same transaction as the write.
wait(checkMoveKeysLock(&tr, lock, ddEnabled, true));
// Persist audit result
tr.set(auditKey(auditState.getType(), auditState.id), auditStorageStateValue(auditState));
wait(tr.commit());
TraceEvent(SevDebug, "AuditUtilUpdateAuditState", auditState.id)
.detail("AuditID", auditState.id)
.detail("AuditType", auditState.getType())
.detail("AuditPhase", auditState.getPhase())
.detail("AuditKey", auditKey(auditState.getType(), auditState.id));
break;
} catch (Error& e) {
TraceEvent(SevDebug, "AuditUtilUpdateAuditStateError", auditState.id)
.errorUnsuppressed(e)
.detail("AuditID", auditState.id)
.detail("AuditType", auditState.getType())
.detail("AuditPhase", auditState.getPhase())
.detail("AuditKey", auditKey(auditState.getType(), auditState.id));
// Retries retryable errors with backoff; rethrows fatal ones.
wait(tr.onError(e));
}
}
return Void();
}
ACTOR Future<UID> persistNewAuditState(Database cx,
AuditStorageState auditState,
MoveKeyLockInfo lock,
@ -333,7 +363,6 @@ ACTOR Future<UID> persistNewAuditState(Database cx,
TraceEvent(SevDebug, "AuditUtilPersistedNewAuditState", auditId)
.detail("AuditKey", auditKey(auditState.getType(), auditId));
break;
} catch (Error& e) {
TraceEvent(SevDebug, "AuditUtilPersistedNewAuditStateError", auditId)
.errorUnsuppressed(e)
@ -384,7 +413,6 @@ ACTOR Future<Void> persistAuditState(Database cx,
.detail("AuditKey", auditKey(auditState.getType(), auditState.id))
.detail("Context", context);
break;
} catch (Error& e) {
TraceEvent(SevDebug, "AuditUtilPersistAuditStateError", auditState.id)
.errorUnsuppressed(e)
@ -415,7 +443,6 @@ ACTOR Future<AuditStorageState> getAuditState(Database cx, AuditType type, UID i
.detail("AuditType", type)
.detail("AuditKey", auditKey(type, id));
break;
} catch (Error& e) {
TraceEvent(SevDebug, "AuditUtilReadAuditStateError", id)
.errorUnsuppressed(e)
@ -493,12 +520,15 @@ ACTOR Future<Void> persistAuditStateByRange(Database cx, AuditStorageState audit
if (ddAuditState.getPhase() != AuditPhase::Running) {
throw audit_storage_failed();
}
ASSERT(ddAuditState.ddId.isValid());
if (ddAuditState.ddId != auditState.ddId) {
throw audit_storage_failed(); // a new dd starts and this audit task is outdated
}
wait(krmSetRange(&tr,
auditRangeBasedProgressPrefixFor(auditState.getType(), auditState.id),
auditState.range,
auditStorageStateValue(auditState)));
break;
} catch (Error& e) {
wait(tr.onError(e));
}
@ -525,7 +555,6 @@ ACTOR Future<std::vector<AuditStorageState>> getAuditStateByRange(Database cx,
CLIENT_KNOBS->KRM_GET_RANGE_LIMIT_BYTES));
auditStates = res_;
break;
} catch (Error& e) {
TraceEvent(SevDebug, "AuditUtilGetAuditStateForRangeError").errorUnsuppressed(e).detail("AuditID", auditId);
wait(tr.onError(e));
@ -567,13 +596,16 @@ ACTOR Future<Void> persistAuditStateByServer(Database cx, AuditStorageState audi
if (ddAuditState.getPhase() != AuditPhase::Running) {
throw audit_storage_failed();
}
ASSERT(ddAuditState.ddId.isValid());
if (ddAuditState.ddId != auditState.ddId) {
throw audit_storage_failed(); // a new dd starts and this audit task is outdated
}
wait(krmSetRange(
&tr,
auditServerBasedProgressPrefixFor(auditState.getType(), auditState.id, auditState.auditServerId),
auditState.range,
auditStorageStateValue(auditState)));
break;
} catch (Error& e) {
wait(tr.onError(e));
}
@ -601,7 +633,6 @@ ACTOR Future<std::vector<AuditStorageState>> getAuditStateByServer(Database cx,
CLIENT_KNOBS->KRM_GET_RANGE_LIMIT_BYTES));
auditStates = res_;
break;
} catch (Error& e) {
TraceEvent(SevDebug, "AuditUtilGetAuditStateForRangeError").errorUnsuppressed(e).detail("AuditID", auditId);
wait(tr.onError(e));

View File

@ -26,7 +26,9 @@
#include "fdbclient/NativeAPI.actor.h"
#include <ctime>
#include <climits>
#include "fdbrpc/simulator.h"
#include "flow/IAsyncFile.h"
#include "flow/flow.h"
#include "flow/genericactors.actor.h"
#include "flow/Hash3.h"
#include <numeric>
@ -361,8 +363,10 @@ struct BackupRangeTaskFunc : TaskFuncBase {
if ((!prevAdjacent || !nextAdjacent) &&
rangeCount > ((prevAdjacent || nextAdjacent) ? CLIENT_KNOBS->BACKUP_MAP_KEY_UPPER_LIMIT
: CLIENT_KNOBS->BACKUP_MAP_KEY_LOWER_LIMIT)) {
CODE_PROBE(true, "range insert delayed because too versionMap is too large");
: CLIENT_KNOBS->BACKUP_MAP_KEY_LOWER_LIMIT) &&
(!g_network->isSimulated() ||
(isBuggifyEnabled(BuggifyType::General) && !g_simulator->speedUpSimulation))) {
CODE_PROBE(true, "range insert delayed because versionMap is too large");
if (rangeCount > CLIENT_KNOBS->BACKUP_MAP_KEY_UPPER_LIMIT)
TraceEvent(SevWarnAlways, "DBA_KeyRangeMapTooLarge").log();

View File

@ -180,7 +180,6 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( SHARD_ENCODE_LOCATION_METADATA, false ); if( randomize && BUGGIFY ) SHARD_ENCODE_LOCATION_METADATA = true;
init( ENABLE_DD_PHYSICAL_SHARD, false ); // EXPERIMENTAL; If true, SHARD_ENCODE_LOCATION_METADATA must be true; When true, optimization of data move between DCs is disabled
init( ENABLE_DD_PHYSICAL_SHARD_MOVE, false ); if( isSimulated ) ENABLE_DD_PHYSICAL_SHARD_MOVE = deterministicRandom()->coinflip();
init( DD_PHYSICAL_SHARD_MOVE_PROBABILITY, 0.0 ); if( isSimulated ) DD_PHYSICAL_SHARD_MOVE_PROBABILITY = 0.5;
init( MAX_PHYSICAL_SHARD_BYTES, 10000000 ); // 10 MB; for ENABLE_DD_PHYSICAL_SHARD; smaller leads to larger number of physicalShard per storage server
init( PHYSICAL_SHARD_METRICS_DELAY, 300.0 ); // 300 seconds; for ENABLE_DD_PHYSICAL_SHARD
@ -755,6 +754,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( TARGET_BYTES_PER_STORAGE_SERVER, 1000e6 ); if( smallStorageTarget ) TARGET_BYTES_PER_STORAGE_SERVER = 3000e3;
init( SPRING_BYTES_STORAGE_SERVER, 100e6 ); if( smallStorageTarget ) SPRING_BYTES_STORAGE_SERVER = 300e3;
init( AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES, 800e6 ); if( smallStorageTarget ) AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES = 2500e3;
init( AUTO_TAG_THROTTLE_SPRING_BYTES_STORAGE_SERVER, 200e6 ); if( smallStorageTarget ) AUTO_TAG_THROTTLE_SPRING_BYTES_STORAGE_SERVER = 500e3;
init( TARGET_BYTES_PER_STORAGE_SERVER_BATCH, 750e6 ); if( smallStorageTarget ) TARGET_BYTES_PER_STORAGE_SERVER_BATCH = 1500e3;
init( SPRING_BYTES_STORAGE_SERVER_BATCH, 100e6 ); if( smallStorageTarget ) SPRING_BYTES_STORAGE_SERVER_BATCH = 150e3;
init( STORAGE_HARD_LIMIT_BYTES, 1500e6 ); if( smallStorageTarget ) STORAGE_HARD_LIMIT_BYTES = 4500e3;
@ -833,13 +833,13 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( GLOBAL_TAG_THROTTLING, true ); if(isSimulated) GLOBAL_TAG_THROTTLING = deterministicRandom()->coinflip();
init( ENFORCE_TAG_THROTTLING_ON_PROXIES, GLOBAL_TAG_THROTTLING );
init( GLOBAL_TAG_THROTTLING_MIN_RATE, 1.0 );
// 60 seconds was chosen as a default value to ensure that
// 10 seconds was chosen as a default value to ensure that
// the global tag throttler does not react too drastically to
// changes in workload. To make the global tag throttler more reactive,
// lower this knob. To make global tag throttler more smooth, raise this knob.
// Setting this knob lower than TAG_MEASUREMENT_INTERVAL can cause erratic
// behaviour and is not recommended.
init( GLOBAL_TAG_THROTTLING_FOLDING_TIME, 60.0 );
init( GLOBAL_TAG_THROTTLING_FOLDING_TIME, 10.0 );
init( GLOBAL_TAG_THROTTLING_MAX_TAGS_TRACKED, 10 );
init( GLOBAL_TAG_THROTTLING_TAG_EXPIRE_AFTER, 240.0 );
init( GLOBAL_TAG_THROTTLING_PROXY_LOGGING_INTERVAL, 60.0 );
@ -878,8 +878,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( SERVE_AUDIT_STORAGE_PARALLELISM, 1 );
init( PERSIST_FINISH_AUDIT_COUNT, 10 ); if ( isSimulated ) PERSIST_FINISH_AUDIT_COUNT = 1;
init( AUDIT_RETRY_COUNT_MAX, 100 ); if ( isSimulated ) AUDIT_RETRY_COUNT_MAX = 10;
init( SS_AUDIT_AUTO_PROCEED_COUNT_MAX, 5 );
init( CONCURRENT_AUDIT_TASK_COUNT_MAX, 50 ); if ( isSimulated ) CONCURRENT_AUDIT_TASK_COUNT_MAX = deterministicRandom()->randomInt(1, CONCURRENT_AUDIT_TASK_COUNT_MAX+1);
init( CONCURRENT_AUDIT_TASK_COUNT_MAX, 10 ); if ( isSimulated ) CONCURRENT_AUDIT_TASK_COUNT_MAX = deterministicRandom()->randomInt(1, CONCURRENT_AUDIT_TASK_COUNT_MAX+1);
init( BUGGIFY_BLOCK_BYTES, 10000 );
init( STORAGE_RECOVERY_VERSION_LAG_LIMIT, 2 * MAX_READ_TRANSACTION_LIFE_VERSIONS );
init( STORAGE_COMMIT_BYTES, 10000000 ); if( randomize && BUGGIFY ) STORAGE_COMMIT_BYTES = 2000000;
@ -919,7 +918,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( WAIT_METRICS_WRONG_SHARD_CHANCE, isSimulated ? 1.0 : 0.1 );
init( MIN_TAG_READ_PAGES_RATE, 100 ); if( randomize && BUGGIFY ) MIN_TAG_READ_PAGES_RATE = 0;
init( MIN_TAG_WRITE_PAGES_RATE, 100 ); if( randomize && BUGGIFY ) MIN_TAG_WRITE_PAGES_RATE = 0;
init( TAG_MEASUREMENT_INTERVAL, 30.0 ); if( randomize && BUGGIFY ) TAG_MEASUREMENT_INTERVAL = 4.0;
init( TAG_MEASUREMENT_INTERVAL, 5.0 ); if( randomize && BUGGIFY ) TAG_MEASUREMENT_INTERVAL = 10.0;
init( PREFIX_COMPRESS_KVS_MEM_SNAPSHOTS, true ); if( randomize && BUGGIFY ) PREFIX_COMPRESS_KVS_MEM_SNAPSHOTS = false;
init( REPORT_DD_METRICS, true );
init( DD_METRICS_REPORT_INTERVAL, 30.0 );

View File

@ -45,27 +45,26 @@ enum class AuditType : uint8_t {
struct AuditStorageState {
constexpr static FileIdentifier file_identifier = 13804340;
AuditStorageState() : type(0), auditServerId(UID()), phase(0) {}
AuditStorageState() : type(0), auditServerId(UID()), phase(0), ddId(UID()) {}
AuditStorageState(UID id, UID auditServerId, AuditType type)
: id(id), auditServerId(auditServerId), type(static_cast<uint8_t>(type)), phase(0) {}
: id(id), auditServerId(auditServerId), type(static_cast<uint8_t>(type)), phase(0), ddId(UID()) {}
AuditStorageState(UID id, KeyRange range, AuditType type)
: id(id), auditServerId(UID()), range(range), type(static_cast<uint8_t>(type)), phase(0) {}
: id(id), auditServerId(UID()), range(range), type(static_cast<uint8_t>(type)), phase(0), ddId(UID()) {}
AuditStorageState(UID id, AuditType type)
: id(id), auditServerId(UID()), type(static_cast<uint8_t>(type)), phase(0) {}
: id(id), auditServerId(UID()), type(static_cast<uint8_t>(type)), phase(0), ddId(UID()) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, id, auditServerId, range, type, phase, error);
serializer(ar, id, auditServerId, range, type, phase, error, ddId);
}
void setType(AuditType type) { this->type = static_cast<uint8_t>(type); }
AuditType getType() const { return static_cast<AuditType>(this->type); }
inline void setType(AuditType type) { this->type = static_cast<uint8_t>(type); }
inline AuditType getType() const { return static_cast<AuditType>(this->type); }
void setPhase(AuditPhase phase) { this->phase = static_cast<uint8_t>(phase); }
AuditPhase getPhase() const { return static_cast<AuditPhase>(this->phase); }
inline void setPhase(AuditPhase phase) { this->phase = static_cast<uint8_t>(phase); }
inline AuditPhase getPhase() const { return static_cast<AuditPhase>(this->phase); }
// for fdbcli get_audit_status
std::string toStringForCLI() const {
std::string toString() const {
std::string res = "AuditStorageState: [ID]: " + id.toString() +
", [Range]: " + Traceable<KeyRangeRef>::toString(range) +
", [Type]: " + std::to_string(type) + ", [Phase]: " + std::to_string(phase);
@ -76,21 +75,15 @@ struct AuditStorageState {
return res;
}
// for traceevent
std::string toString() const {
std::string res = "AuditStorageState: [ID]: " + id.toString() +
", [Range]: " + Traceable<KeyRangeRef>::toString(range) +
", [Type]: " + std::to_string(type) + ", [Phase]: " + std::to_string(phase) +
", [AuditServerID]: " + auditServerId.toString();
if (!error.empty()) {
res += "[Error]: " + error;
}
return res;
}
UID id;
UID auditServerId;
UID ddId; // ddId indicates this audit is managed by which dd
// ddId is used to check if dd has changed
// When a new dd starts in the middle of an ongoing audit,
// The ongoing audit's ddId gets updated
// When SS updates the progress, it checks ddId
// If the ddId is updated, SS Audit actors of the old dd will stop themselves
// New dd will issue new requests to SSes to continue the remaining work
UID auditServerId; // UID of SS who is working on this audit task
KeyRange range;
uint8_t type;
uint8_t phase;
@ -105,15 +98,16 @@ struct AuditStorageRequest {
AuditStorageRequest(UID id, KeyRange range, AuditType type)
: id(id), range(range), type(static_cast<uint8_t>(type)) {}
void setType(AuditType type) { this->type = static_cast<uint8_t>(this->type); }
AuditType getType() const { return static_cast<AuditType>(this->type); }
inline void setType(AuditType type) { this->type = static_cast<uint8_t>(this->type); }
inline AuditType getType() const { return static_cast<AuditType>(this->type); }
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, id, range, type, targetServers, reply);
serializer(ar, id, range, type, targetServers, reply, ddId);
}
UID id;
UID ddId; // UID of DD who claims the audit
KeyRange range;
uint8_t type;
std::vector<UID> targetServers;

View File

@ -66,5 +66,6 @@ ACTOR Future<Void> clearAuditMetadataForType(Database cx,
UID maxAuditIdToClear,
int numFinishAuditToKeep);
ACTOR Future<bool> checkStorageServerRemoved(Database cx, UID ssid);
ACTOR Future<Void> updateAuditState(Database cx, AuditStorageState auditState, MoveKeyLockInfo lock, bool ddEnabled);
#include "flow/unactorcompiler.h"
#endif

View File

@ -829,7 +829,7 @@ struct RangeResultRef : VectorRef<KeyValueRef> {
serializer(ar, ((VectorRef<KeyValueRef>&)*this), more, readThrough, readToBegin, readThroughEnd);
}
int logicalSize() const {
int64_t logicalSize() const {
return VectorRef<KeyValueRef>::expectedSize() - VectorRef<KeyValueRef>::size() * sizeof(KeyValueRef);
}

View File

@ -204,7 +204,6 @@ public:
bool SHARD_ENCODE_LOCATION_METADATA; // If true, location metadata will contain shard ID.
bool ENABLE_DD_PHYSICAL_SHARD; // EXPERIMENTAL; If true, SHARD_ENCODE_LOCATION_METADATA must be true.
bool ENABLE_DD_PHYSICAL_SHARD_MOVE; // Enable physical shard move.
double DD_PHYSICAL_SHARD_MOVE_PROBABILITY; // Percentage of physical shard move, in the range of [0, 1].
int64_t MAX_PHYSICAL_SHARD_BYTES;
double PHYSICAL_SHARD_METRICS_DELAY;
@ -711,6 +710,7 @@ public:
int64_t TARGET_BYTES_PER_STORAGE_SERVER;
int64_t SPRING_BYTES_STORAGE_SERVER;
int64_t AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES;
int64_t AUTO_TAG_THROTTLE_SPRING_BYTES_STORAGE_SERVER;
int64_t TARGET_BYTES_PER_STORAGE_SERVER_BATCH;
int64_t SPRING_BYTES_STORAGE_SERVER_BATCH;
int64_t STORAGE_HARD_LIMIT_BYTES;
@ -865,7 +865,6 @@ public:
int SERVE_AUDIT_STORAGE_PARALLELISM;
int PERSIST_FINISH_AUDIT_COUNT; // Num of persist complete/failed audits for each type
int AUDIT_RETRY_COUNT_MAX;
int SS_AUDIT_AUTO_PROCEED_COUNT_MAX;
int CONCURRENT_AUDIT_TASK_COUNT_MAX;
int BUGGIFY_BLOCK_BYTES;
int64_t STORAGE_RECOVERY_VERSION_LAG_LIMIT;

View File

@ -1047,7 +1047,7 @@ void DDQueue::launchQueuedWork(std::set<RelocateData, std::greater<RelocateData>
rrs.dataMoveId = UID();
} else {
const bool enabled =
deterministicRandom()->random01() < SERVER_KNOBS->DD_PHYSICAL_SHARD_MOVE_PROBABILITY;
deterministicRandom()->random01() <= SERVER_KNOBS->DD_PHYSICAL_SHARD_MOVE_PROBABILITY;
rrs.dataMoveId = newDataMoveId(deterministicRandom()->randomUInt64(),
AssignEmptyRange::False,
EnablePhysicalShardMove(enabled));
@ -1635,7 +1635,7 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueue* self,
self->moveCreateNewPhysicalShard++;
}
const bool enabled =
deterministicRandom()->random01() < SERVER_KNOBS->DD_PHYSICAL_SHARD_MOVE_PROBABILITY;
deterministicRandom()->random01() <= SERVER_KNOBS->DD_PHYSICAL_SHARD_MOVE_PROBABILITY;
rd.dataMoveId = newDataMoveId(
physicalShardIDCandidate, AssignEmptyRange::False, EnablePhysicalShardMove(enabled));
TraceEvent(SevInfo, "NewDataMoveWithPhysicalShard")

View File

@ -96,8 +96,8 @@ struct DDAudit {
int64_t overallIssuedDoAuditCount;
int64_t overallCompleteDoAuditCount;
void setAuditRunActor(Future<Void> actor) { auditActor = actor; }
Future<Void> getAuditRunActor() { return auditActor; }
inline void setAuditRunActor(Future<Void> actor) { auditActor = actor; }
inline Future<Void> getAuditRunActor() { return auditActor; }
// auditActor and actors are guaranteed to deliver a cancel signal
void cancel() {
@ -771,15 +771,49 @@ void cancelAllAuditsInAuditMap(Reference<DataDistributor> self) {
return;
}
void resumeStorageAudits(Reference<DataDistributor> self) {
ACTOR Future<Void> resumeStorageAudits(Reference<DataDistributor> self) {
ASSERT(!self->auditInitialized.getFuture().isReady());
if (self->initData->auditStates.empty()) {
self->auditInitialized.send(Void());
TraceEvent(SevVerbose, "AuditStorageResumeEmptyDone", self->ddId);
return;
return Void();
}
cancelAllAuditsInAuditMap(self); // cancel existing audits
// Update metadata
state int retryCount = 0;
loop {
try {
std::vector<Future<Void>> fs;
state MoveKeyLockInfo lockInfo;
lockInfo.myOwner = self->lock.myOwner;
lockInfo.prevOwner = self->lock.prevOwner;
lockInfo.prevWrite = self->lock.prevWrite;
for (const auto& auditState : self->initData->auditStates) {
// Only running audit will be resumed
if (auditState.getPhase() == AuditPhase::Running) {
AuditStorageState toUpdate = auditState;
toUpdate.ddId = self->ddId;
fs.push_back(updateAuditState(
self->txnProcessor->context(), toUpdate, lockInfo, self->context->isDDEnabled()));
}
}
wait(waitForAll(fs));
break;
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled || e.code() == error_code_movekeys_conflict) {
throw e;
}
if (retryCount > 50) {
TraceEvent(SevWarnAlways, "ResumeAuditStorageUnableUpdateMetadata", self->ddId).errorUnsuppressed(e);
return Void();
}
retryCount++;
}
}
// Following is atomic
// Cancel existing audits and restore
cancelAllAuditsInAuditMap(self);
std::unordered_map<AuditType, std::vector<AuditStorageState>> restoredAudits;
for (const auto& auditState : self->initData->auditStates) {
restoredAudits[auditState.getType()].push_back(auditState);
@ -843,7 +877,7 @@ void resumeStorageAudits(Reference<DataDistributor> self) {
self->auditInitialized.send(Void());
TraceEvent(SevDebug, "AuditStorageResumeDone", self->ddId);
return;
return Void();
}
// Periodically check and log the physicalShard status; clean up empty physicalShard;
@ -1001,7 +1035,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
anyZeroHealthyTeams = zeroHealthyTeams[0];
}
resumeStorageAudits(self);
actors.push_back(resumeStorageAudits(self));
actors.push_back(self->pollMoveKeysLock());
@ -1753,6 +1787,7 @@ ACTOR Future<Void> auditStorageCore(Reference<DataDistributor> self,
try {
ASSERT(audit != nullptr);
ASSERT(audit->coreState.ddId == self->ddId);
loadAndDispatchAudit(self, audit, audit->coreState.range);
TraceEvent(SevInfo, "DDAuditStorageCoreScheduled", self->ddId)
.detail("Context", context)
@ -1931,6 +1966,7 @@ void runAuditStorage(Reference<DataDistributor> self,
ASSERT(auditState.id.isValid());
ASSERT(!auditState.range.empty());
ASSERT(auditState.getPhase() == AuditPhase::Running);
auditState.ddId = self->ddId; // make sure any existing audit state claims the current DD
std::shared_ptr<DDAudit> audit = std::make_shared<DDAudit>(auditState);
audit->retryCount = retryCount;
TraceEvent(SevDebug, "DDRunAuditStorage", self->ddId)
@ -1997,6 +2033,7 @@ ACTOR Future<UID> launchAudit(Reference<DataDistributor> self, KeyRange auditRan
auditState.setType(auditType);
auditState.range = auditRange;
auditState.setPhase(AuditPhase::Running);
auditState.ddId = self->ddId; // persist ddId to new ddAudit metadata
TraceEvent(SevVerbose, "DDAuditStorageLaunchPersistNewAuditIDBefore", self->ddId)
.detail("AuditType", auditType)
.detail("Range", auditRange);
@ -2227,6 +2264,7 @@ ACTOR Future<Void> scheduleAuditStorageShardOnServer(Reference<DataDistributor>
// We always issue exactly one audit task (for the remaining part) when schedule
ASSERT(issueDoAuditCount == 0);
issueDoAuditCount++;
req.ddId = self->ddId; // send this ddid to SS
audit->actors.add(doAuditOnStorageServer(self, audit, ssi, req));
}
}
@ -2447,6 +2485,7 @@ ACTOR Future<Void> scheduleAuditOnRange(Reference<DataDistributor> self,
SERVER_KNOBS->CONCURRENT_AUDIT_TASK_COUNT_MAX - 1);
}
issueDoAuditCount++;
req.ddId = self->ddId; // send this ddid to SS
audit->actors.add(doAuditOnStorageServer(self, audit, targetServer, req));
}
}
@ -2532,7 +2571,8 @@ ACTOR Future<Void> doAuditOnStorageServer(Reference<DataDistributor> self,
self->remainingBudgetForAuditTasks[auditType].set(self->remainingBudgetForAuditTasks[auditType].get() + 1);
ASSERT(self->remainingBudgetForAuditTasks[auditType].get() <= SERVER_KNOBS->CONCURRENT_AUDIT_TASK_COUNT_MAX);
if (e.code() == error_code_actor_cancelled) {
if (e.code() == error_code_actor_cancelled || e.code() == error_code_not_implemented ||
e.code() == error_code_audit_storage_exceeded_request_limit) {
throw e;
} else if (e.code() == error_code_audit_storage_error) {
audit->foundError = true;

View File

@ -534,7 +534,7 @@ public:
Future<Void> tryUpdateAutoThrottling(StorageQueueInfo const& ss) {
auto& ssInfo = ssInfos[ss.id];
ssInfo.throttlingRatio = ss.getTagThrottlingRatio(SERVER_KNOBS->AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES,
SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER);
SERVER_KNOBS->AUTO_TAG_THROTTLE_SPRING_BYTES_STORAGE_SERVER);
ssInfo.zoneId = ss.locality.zoneId();
auto& tagToThroughputCounters = throughput[ss.id];

View File

@ -1953,9 +1953,7 @@ ACTOR static Future<Void> finishMoveShards(Database occ,
wait(waitForAll(actors));
if (range.end == dataMove.ranges.front().end) {
if (SERVER_KNOBS->ENABLE_DD_PHYSICAL_SHARD_MOVE) {
wait(deleteCheckpoints(&tr, dataMove.checkpoints, dataMoveId));
}
wait(deleteCheckpoints(&tr, dataMove.checkpoints, dataMoveId));
tr.clear(dataMoveKeyFor(dataMoveId));
complete = true;
TraceEvent(sevDm, "FinishMoveShardsDeleteMetaData", dataMoveId)
@ -2707,9 +2705,7 @@ ACTOR Future<Void> cleanUpDataMoveCore(Database occ,
}
if (range.end == dataMove.ranges.front().end) {
if (SERVER_KNOBS->ENABLE_DD_PHYSICAL_SHARD_MOVE) {
wait(deleteCheckpoints(&tr, dataMove.checkpoints, dataMoveId));
}
wait(deleteCheckpoints(&tr, dataMove.checkpoints, dataMoveId));
tr.clear(dataMoveKeyFor(dataMoveId));
complete = true;
TraceEvent(sevDm, "CleanUpDataMoveDeleteMetaData", dataMoveId)

View File

@ -1391,11 +1391,13 @@ UpdateCommitCostRequest StorageQueueInfo::refreshCommitCost(double elapsed) {
Optional<double> StorageQueueInfo::getTagThrottlingRatio(int64_t storageTargetBytes, int64_t storageSpringBytes) const {
auto const storageQueue = getStorageQueueBytes();
if (storageQueue < storageTargetBytes - storageSpringBytes) {
return {};
// TODO: Remove duplicate calculation from Ratekeeper::updateRate
double inverseResult = std::min(
2.0, (storageQueue - storageTargetBytes + storageSpringBytes) / static_cast<double>(storageSpringBytes));
if (inverseResult > 0) {
return 1.0 / inverseResult;
} else {
return std::max(
0.0, static_cast<double>((storageTargetBytes + storageSpringBytes) - storageQueue) / storageSpringBytes);
return {};
}
}

View File

@ -997,7 +997,7 @@ public:
std::unordered_map<UID, std::shared_ptr<MoveInShard>> moveInShards;
bool shardAware; // True if the storage server is aware of the physical shards.
Future<Void> auditSSShardInfoActor;
std::unordered_map<AuditType, std::pair<UID, ActorCollection>> auditTasks;
// Histograms
struct FetchKeysHistograms {
@ -4899,7 +4899,8 @@ Key constructMappedKey(KeyValueRef* keyValue, std::vector<Optional<Tuple>>& vec,
ACTOR Future<Void> validateRangeAgainstServer(StorageServer* data,
AuditStorageState auditState,
Version version,
StorageServerInterface remoteServer) {
StorageServerInterface remoteServer,
UID ddId) {
TraceEvent(SevInfo, "ValidateRangeAgainstServerBegin", data->thisServerID)
.detail("AuditID", auditState.id)
.detail("Range", auditState.range)
@ -4911,6 +4912,10 @@ ACTOR Future<Void> validateRangeAgainstServer(StorageServer* data,
state int validatedKeys = 0;
state std::string error;
state int64_t cumulatedValidatedKeysNum = 0;
state Reference<IRateControl> rateLimiter =
Reference<IRateControl>(new SpeedLimit(CLIENT_KNOBS->CONSISTENCY_CHECK_RATE_LIMIT_MAX, 1));
state int64_t remoteReadBytes = 0;
loop {
try {
std::vector<Future<ErrorOr<GetKeyValuesReply>>> fs;
@ -4956,6 +4961,7 @@ ACTOR Future<Void> validateRangeAgainstServer(StorageServer* data,
}
const GetKeyValuesReply &remote = reps[0].get(), local = reps[1].get();
remoteReadBytes = remote.data.expectedSize();
Key lastKey = range.begin;
auditState.range = range;
@ -5029,8 +5035,13 @@ ACTOR Future<Void> validateRangeAgainstServer(StorageServer* data,
range = KeyRangeRef(keyAfter(lastKey), range.end);
auditState.range = KeyRangeRef(originBegin, range.begin);
auditState.setPhase(AuditPhase::Complete);
ASSERT(ddId.isValid());
auditState.ddId = ddId; // used to compare req.ddId with existing persisted ddId
wait(persistAuditStateByRange(data->cx, auditState));
}
wait(rateLimiter->getAllowance(remoteReadBytes)); // RateKeeping
} catch (Error& e) {
TraceEvent(SevWarn, "ValidateRangeAgainstServerFailed", data->thisServerID)
.errorUnsuppressed(e)
@ -5048,6 +5059,8 @@ ACTOR Future<Void> validateRangeAgainstServer(StorageServer* data,
.detail("ErrorMessage", error)
.detail("RemoteServer", remoteServer.toString());
auditState.setPhase(AuditPhase::Error);
ASSERT(ddId.isValid());
auditState.ddId = ddId; // used to compare req.ddId with existing persisted ddId
wait(persistAuditStateByRange(data->cx, auditState));
throw audit_storage_error();
}
@ -5061,7 +5074,10 @@ ACTOR Future<Void> validateRangeAgainstServer(StorageServer* data,
return Void();
}
ACTOR Future<Void> validateRangeShard(StorageServer* data, AuditStorageState auditState, std::vector<UID> candidates) {
ACTOR Future<Void> validateRangeShard(StorageServer* data,
AuditStorageState auditState,
std::vector<UID> candidates,
UID ddId) {
TraceEvent(SevDebug, "ServeValidateRangeShardBegin", data->thisServerID)
.detail("Range", auditState.range)
.detail("Servers", describe(candidates));
@ -5122,7 +5138,7 @@ ACTOR Future<Void> validateRangeShard(StorageServer* data, AuditStorageState aud
}
if (remoteServer != nullptr) {
wait(validateRangeAgainstServer(data, auditState, version, *remoteServer));
wait(validateRangeAgainstServer(data, auditState, version, *remoteServer, ddId));
} else {
TraceEvent(SevWarn, "ServeValidateRangeShardRemoteNotFound", data->thisServerID)
.detail("Range", auditState.range)
@ -5135,7 +5151,8 @@ ACTOR Future<Void> validateRangeShard(StorageServer* data, AuditStorageState aud
ACTOR Future<Void> validateRangeAgainstServers(StorageServer* data,
AuditStorageState auditState,
std::vector<UID> targetServers) {
std::vector<UID> targetServers,
UID ddId) {
TraceEvent(SevDebug, "ValidateRangeAgainstServersBegin", data->thisServerID)
.detail("AuditID", auditState.id)
.detail("Range", auditState.range)
@ -5172,7 +5189,7 @@ ACTOR Future<Void> validateRangeAgainstServers(StorageServer* data,
.detail("Range", auditState.range);
throw audit_storage_failed();
}
fs.push_back(validateRangeAgainstServer(data, auditState, version, decodeServerListValue(v.get())));
fs.push_back(validateRangeAgainstServer(data, auditState, version, decodeServerListValue(v.get()), ddId));
}
wait(waitForAll(fs));
@ -5332,9 +5349,15 @@ struct AuditGetServerKeysRes {
Version readAtVersion;
UID serverId;
std::vector<KeyRange> ownRanges;
int64_t readBytes;
AuditGetServerKeysRes() = default;
AuditGetServerKeysRes(KeyRange completeRange, Version readAtVersion, UID serverId, std::vector<KeyRange> ownRanges)
: completeRange(completeRange), readAtVersion(readAtVersion), serverId(serverId), ownRanges(ownRanges) {}
AuditGetServerKeysRes(KeyRange completeRange,
Version readAtVersion,
UID serverId,
std::vector<KeyRange> ownRanges,
int64_t readBytes)
: completeRange(completeRange), readAtVersion(readAtVersion), serverId(serverId), ownRanges(ownRanges),
readBytes(readBytes) {}
};
// Given an input server, get ranges within the input range via the input transaction
@ -5382,7 +5405,7 @@ ACTOR Future<AuditGetServerKeysRes> getThisServerKeysFromServerKeys(UID serverID
.detail("ReadAtVersion", readAtVersion)
.detail("CompleteRange", completeRange)
.detail("ResultSize", ownRanges.size());
res = AuditGetServerKeysRes(completeRange, readAtVersion, serverID, ownRanges);
res = AuditGetServerKeysRes(completeRange, readAtVersion, serverID, ownRanges, readResult.logicalSize());
} catch (Error& e) {
TraceEvent(SevDebug, "AuditStorageGetThisServerKeysError", serverID)
@ -5397,12 +5420,15 @@ ACTOR Future<AuditGetServerKeysRes> getThisServerKeysFromServerKeys(UID serverID
struct AuditGetKeyServersRes {
KeyRange completeRange;
Version readAtVersion;
int64_t readBytes;
std::unordered_map<UID, std::vector<KeyRange>> rangeOwnershipMap;
AuditGetKeyServersRes() = default;
AuditGetKeyServersRes(KeyRange completeRange,
Version readAtVersion,
std::unordered_map<UID, std::vector<KeyRange>> rangeOwnershipMap)
: completeRange(completeRange), readAtVersion(readAtVersion), rangeOwnershipMap(rangeOwnershipMap) {}
std::unordered_map<UID, std::vector<KeyRange>> rangeOwnershipMap,
int64_t readBytes)
: completeRange(completeRange), readAtVersion(readAtVersion), rangeOwnershipMap(rangeOwnershipMap),
readBytes(readBytes) {}
};
// Given an input server, get ranges within the input range via the input transaction
@ -5462,7 +5488,7 @@ ACTOR Future<AuditGetKeyServersRes> getShardMapFromKeyServers(UID auditServerId,
.detail("AtVersion", readAtVersion)
.detail("ShardsInAnonymousPhysicalShardCount", shardsInAnonymousPhysicalShardCount)
.detail("TotalShardsCount", totalShardsCount);
res = AuditGetKeyServersRes(completeRange, readAtVersion, serverOwnRanges);
res = AuditGetKeyServersRes(completeRange, readAtVersion, serverOwnRanges, readResult.logicalSize());
} catch (Error& e) {
TraceEvent(SevDebug, "AuditStorageGetThisServerKeysFromKeyServersError", auditServerId)
@ -5502,11 +5528,11 @@ ACTOR Future<Void> auditStorageStorageServerShardQ(StorageServer* data, AuditSto
state Key rangeToReadBegin = req.range.begin;
state KeyRangeRef rangeToRead;
state int retryCount = 0;
state int storageAutoProceedCount = 0;
// storageAutoProceedCount is guard to make sure that audit at SS does not run too long
// by itself without being notified by DD
state int64_t cumulatedValidatedLocalShardsNum = 0;
state int64_t cumulatedValidatedServerKeysNum = 0;
state Reference<IRateControl> rateLimiter =
Reference<IRateControl>(new SpeedLimit(CLIENT_KNOBS->CONSISTENCY_CHECK_RATE_LIMIT_MAX, 1));
state int64_t remoteReadBytes = 0;
try {
while (true) {
@ -5538,12 +5564,14 @@ ACTOR Future<Void> auditStorageStorageServerShardQ(StorageServer* data, AuditSto
serverKeyCompleteRange = serverKeyRes.completeRange;
serverKeyReadAtVersion = serverKeyRes.readAtVersion;
ownRangesSeenByServerKey = serverKeyRes.ownRanges;
remoteReadBytes = serverKeyRes.readBytes;
// We want to do transactional read at a version newer than data->version
while (serverKeyReadAtVersion < localShardInfoReadAtVersion) {
if (retryCount >= SERVER_KNOBS->AUDIT_RETRY_COUNT_MAX) {
failureReason = "Read serverKeys retry count exceeds the max";
throw audit_storage_failed();
}
wait(rateLimiter->getAllowance(remoteReadBytes)); // RateKeeping
retryCount++;
wait(delay(0.5));
tr.reset();
@ -5552,6 +5580,7 @@ ACTOR Future<Void> auditStorageStorageServerShardQ(StorageServer* data, AuditSto
serverKeyCompleteRange = serverKeyRes.completeRange;
serverKeyReadAtVersion = serverKeyRes.readAtVersion;
ownRangesSeenByServerKey = serverKeyRes.ownRanges;
remoteReadBytes = serverKeyRes.readBytes;
} // retry until serverKeyReadAtVersion is as larger as localShardInfoReadAtVersion
ASSERT(serverKeyReadAtVersion >= localShardInfoReadAtVersion);
try {
@ -5651,6 +5680,8 @@ ACTOR Future<Void> auditStorageStorageServerShardQ(StorageServer* data, AuditSto
.detail("AuditServer", data->thisServerID);
res.range = claimRange;
res.setPhase(AuditPhase::Error);
ASSERT(req.ddId.isValid());
res.ddId = req.ddId; // used to compare req.ddId with existing persisted ddId
wait(persistAuditStateByServer(data->cx, res));
req.reply.sendError(audit_storage_error());
break;
@ -5658,6 +5689,8 @@ ACTOR Future<Void> auditStorageStorageServerShardQ(StorageServer* data, AuditSto
// Expand persisted complete range
res.range = Standalone(KeyRangeRef(req.range.begin, claimRange.end));
res.setPhase(AuditPhase::Complete);
ASSERT(req.ddId.isValid());
res.ddId = req.ddId; // used to compare req.ddId with existing persisted ddId
wait(persistAuditStateByServer(data->cx, res));
TraceEvent(SevInfo, "AuditStorageSsShardDone", data->thisServerID)
.detail("AuditId", req.id)
@ -5665,16 +5698,14 @@ ACTOR Future<Void> auditStorageStorageServerShardQ(StorageServer* data, AuditSto
.detail("AuditServer", data->thisServerID)
.detail("CompleteRange", res.range);
if (claimRange.end < rangeToRead.end) {
if (storageAutoProceedCount > SERVER_KNOBS->SS_AUDIT_AUTO_PROCEED_COUNT_MAX) {
throw audit_storage_failed();
}
rangeToReadBegin = claimRange.end;
storageAutoProceedCount++;
} else { // complete
req.reply.send(res);
break;
}
}
wait(rateLimiter->getAllowance(remoteReadBytes)); // RateKeeping
}
} catch (Error& e) {
TraceEvent(SevInfo, "AuditStorageSsShardFailed", data->thisServerID)
@ -5726,11 +5757,11 @@ ACTOR Future<Void> auditStorageLocationMetadataQ(StorageServer* data, AuditStora
state Transaction tr(data->cx);
state Key rangeToReadBegin = req.range.begin;
state KeyRangeRef rangeToRead;
state int storageAutoProceedCount = 0;
// storageAutoProceedCount is guard to make sure that audit at SS does not run too long
// by itself without being notified by DD
state int64_t cumulatedValidatedServerKeysNum = 0;
state int64_t cumulatedValidatedKeyServersNum = 0;
state Reference<IRateControl> rateLimiter =
Reference<IRateControl>(new SpeedLimit(CLIENT_KNOBS->CONSISTENCY_CHECK_RATE_LIMIT_MAX, 1));
state int64_t remoteReadBytes = 0;
try {
while (true) {
@ -5741,6 +5772,7 @@ ACTOR Future<Void> auditStorageLocationMetadataQ(StorageServer* data, AuditStora
mapFromKeyServers.clear();
serverKeyResMap.clear();
mapFromKeyServersRaw.clear();
remoteReadBytes = 0;
rangeToRead = KeyRangeRef(rangeToReadBegin, req.range.end);
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
@ -5749,6 +5781,7 @@ ACTOR Future<Void> auditStorageLocationMetadataQ(StorageServer* data, AuditStora
completeRangeByKeyServer = keyServerRes.completeRange;
readAtVersion = keyServerRes.readAtVersion;
mapFromKeyServersRaw = keyServerRes.rangeOwnershipMap;
remoteReadBytes += keyServerRes.readBytes;
// Use ssid of mapFromKeyServersRaw to read ServerKeys
for (auto& [ssid, _] : mapFromKeyServersRaw) {
actors.push_back(store(serverKeyResMap[ssid], getThisServerKeysFromServerKeys(ssid, &tr, rangeToRead)));
@ -5768,6 +5801,7 @@ ACTOR Future<Void> auditStorageLocationMetadataQ(StorageServer* data, AuditStora
ASSERT(!overlappingRange.empty());
claimRange = overlappingRange;
ASSERT(readAtVersion == serverKeyRes.readAtVersion);
remoteReadBytes += serverKeyRes.readBytes;
}
// Use claimRange to get mapFromServerKeys and mapFromKeyServers to compare
int64_t numValidatedServerKeys = 0;
@ -5887,6 +5921,8 @@ ACTOR Future<Void> auditStorageLocationMetadataQ(StorageServer* data, AuditStora
.detail("ClaimRange", claimRange);
res.range = claimRange;
res.setPhase(AuditPhase::Error);
ASSERT(req.ddId.isValid());
res.ddId = req.ddId; // used to compare req.ddId with existing persisted ddId
wait(persistAuditStateByRange(data->cx, res));
req.reply.sendError(audit_storage_error());
break;
@ -5894,6 +5930,8 @@ ACTOR Future<Void> auditStorageLocationMetadataQ(StorageServer* data, AuditStora
// Expand persisted complete range
res.range = Standalone(KeyRangeRef(req.range.begin, claimRange.end));
res.setPhase(AuditPhase::Complete);
ASSERT(req.ddId.isValid());
res.ddId = req.ddId; // used to compare req.ddId with existing persisted ddId
wait(persistAuditStateByRange(data->cx, res));
TraceEvent(SevInfo, "AuditStorageShardLocMetadataDone", data->thisServerID)
.detail("AuditId", req.id)
@ -5902,16 +5940,13 @@ ACTOR Future<Void> auditStorageLocationMetadataQ(StorageServer* data, AuditStora
.detail("AuditServerId", data->thisServerID)
.detail("CompleteRange", res.range);
if (claimRange.end < rangeToRead.end) {
if (storageAutoProceedCount > SERVER_KNOBS->SS_AUDIT_AUTO_PROCEED_COUNT_MAX) {
throw audit_storage_failed();
}
rangeToReadBegin = claimRange.end;
storageAutoProceedCount++;
} else { // complete
req.reply.send(res);
break;
}
}
wait(rateLimiter->getAllowance(remoteReadBytes)); // Rate Keeping
}
} catch (Error& e) {
@ -5968,7 +6003,8 @@ ACTOR Future<Void> auditStorageQ(StorageServer* data, AuditStorageRequest req) {
fs.push_back(validateRangeShard(
data,
AuditStorageState(res.id, KeyRangeRef(shards[i].key, shards[i + 1].key), res.getType()),
src));
src,
req.ddId));
begin = shards[i + 1].key;
}
} catch (Error& e) {
@ -5976,7 +6012,7 @@ ACTOR Future<Void> auditStorageQ(StorageServer* data, AuditStorageRequest req) {
}
}
} else {
fs.push_back(validateRangeAgainstServers(data, res, req.targetServers));
fs.push_back(validateRangeAgainstServers(data, res, req.targetServers, req.ddId));
}
wait(waitForAll(fs));
@ -9435,6 +9471,8 @@ ACTOR Future<Void> cleanUpMoveInShard(StorageServer* data, Version version, Move
}
wait(data->durableVersion.whenAtLeast(mLV.version + 1));
data->moveInShards.erase(moveInShard->id());
return Void();
}
@ -13510,20 +13548,43 @@ ACTOR Future<Void> storageServerCore(StorageServer* self, StorageServerInterface
self->actors.add(fetchCheckpointKeyValuesQ(self, req));
}
when(AuditStorageRequest req = waitNext(ssi.auditStorage.getFuture())) {
// A SS can run one ValidateStorageServerShard at a time
// We do not have this limitation on other audit types
if (req.getType() == AuditType::ValidateStorageServerShard) {
if (self->auditSSShardInfoActor.isValid() && !self->auditSSShardInfoActor.isReady()) {
TraceEvent(SevWarn, "ExistRunningAuditStorageForServerShard")
.detail("NewAuditId", req.id)
.detail("NewAuditType", req.getType());
self->auditSSShardInfoActor.cancel();
} // New audit immediately starts and existing one gets cancelled
self->auditSSShardInfoActor = auditStorageStorageServerShardQ(self, req);
// Check existing audit task states
if (self->auditTasks.contains(req.getType())) {
if (req.id != self->auditTasks[req.getType()].first) {
// Any task of past audit must be ready
if (!self->auditTasks[req.getType()].second.getResult().isReady()) {
req.reply.sendError(audit_storage_exceeded_request_limit());
TraceEvent(SevWarnAlways, "ExistSSAuditWithDifferentId") // unexpected
.detail("NewAuditId", req.id)
.detail("NewAuditType", req.getType());
continue;
}
} else if (req.getType() == AuditType::ValidateStorageServerShard &&
!self->auditTasks[req.getType()].second.getResult().isReady()) {
// Only one ValidateStorageServerShard is allowed to run at a time
TraceEvent(SevWarn, "ExistSSAuditForServerShardWithSameId")
.detail("AuditId", req.id)
.detail("AuditType", req.getType());
self->auditTasks[req.getType()].second.clear(true);
}
}
// Prepare for the new audit task
if (!self->auditTasks.contains(req.getType()) ||
self->auditTasks[req.getType()].second.getResult().isReady()) {
ASSERT(req.id.isValid());
self->auditTasks[req.getType()] = std::make_pair(req.id, ActorCollection(true));
}
// Start the new audit task
if (req.getType() == AuditType::ValidateHA) {
self->auditTasks[req.getType()].second.add(auditStorageQ(self, req));
} else if (req.getType() == AuditType::ValidateReplica) {
self->auditTasks[req.getType()].second.add(auditStorageQ(self, req));
} else if (req.getType() == AuditType::ValidateLocationMetadata) {
self->actors.add(auditStorageLocationMetadataQ(self, req));
self->auditTasks[req.getType()].second.add(auditStorageLocationMetadataQ(self, req));
} else if (req.getType() == AuditType::ValidateStorageServerShard) {
self->auditTasks[req.getType()].second.add(auditStorageStorageServerShardQ(self, req));
} else {
self->actors.add(auditStorageQ(self, req));
req.reply.sendError(not_implemented());
}
}
when(wait(updateProcessStatsTimer)) {
@ -13886,6 +13947,8 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
self.ssLock->halt();
self.moveInShards.clear();
state Error err = e;
if (storageServerTerminated(self, persistentData, err)) {
ssCore.cancel();

View File

@ -101,7 +101,6 @@ struct PhysicalShardMoveWorkLoad : TestWorkload {
newDataMoveId(deterministicRandom()->randomUInt64(),
AssignEmptyRange::False,
EnablePhysicalShardMove::True),
// EnablePhysicalShardMove::False),
KeyRangeRef("TestKeyA"_sr, "TestKeyF"_sr),
teamSize,
includes,

View File

@ -207,6 +207,10 @@ struct ValidateStorage : TestWorkload {
.detail("AuditIDA", auditIdA)
.detail("AuditIDB", auditIdB);
}
std::vector<AuditStorageState> res = wait(getAuditStates(cx, type, /*newFirst=*/true, 1));
if (res.size() != 1) {
TraceEvent(SevError, "TestGetAuditStatesError").detail("ActualResSize", res.size());
}
return Void();
}