commit
adcedb9301
|
@ -82,17 +82,30 @@ ShardSizeBounds ShardSizeBounds::shardSizeBoundsBeforeTrack() {
|
|||
.opsReadPerKSecond = StorageMetrics::infinity } };
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
std::set<int> const& normalDDQueueErrors() {
|
||||
static std::set<int> s{ error_code_movekeys_conflict,
|
||||
error_code_broken_promise,
|
||||
error_code_data_move_cancelled,
|
||||
error_code_data_move_dest_team_not_found };
|
||||
return s;
|
||||
}
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
enum class DDAuditContext : uint8_t {
|
||||
Invalid = 0,
|
||||
Resume = 1,
|
||||
Launch = 2,
|
||||
Retry = 3,
|
||||
INVALID = 0,
|
||||
RESUME = 1,
|
||||
LAUNCH = 2,
|
||||
RETRY = 3,
|
||||
};
|
||||
|
||||
struct DDAudit {
|
||||
DDAudit(AuditStorageState coreState)
|
||||
: coreState(coreState), actors(true), foundError(false), auditStorageAnyChildFailed(false), retryCount(0),
|
||||
cancelled(false), overallCompleteDoAuditCount(0), overallIssuedDoAuditCount(0), overallSkippedDoAuditCount(0),
|
||||
remainingBudgetForAuditTasks(SERVER_KNOBS->CONCURRENT_AUDIT_TASK_COUNT_MAX), context(0) {}
|
||||
cancelled(false), overallCompleteDoAuditCount(0), overallIssuedDoAuditCount(0),
|
||||
remainingBudgetForAuditTasks(SERVER_KNOBS->CONCURRENT_AUDIT_TASK_COUNT_MAX), context(DDAuditContext::INVALID) {}
|
||||
|
||||
AuditStorageState coreState;
|
||||
ActorCollection actors;
|
||||
|
@ -105,14 +118,14 @@ struct DDAudit {
|
|||
int64_t overallCompleteDoAuditCount;
|
||||
int64_t overallSkippedDoAuditCount;
|
||||
AsyncVar<int> remainingBudgetForAuditTasks;
|
||||
uint8_t context;
|
||||
DDAuditContext context;
|
||||
std::unordered_map<UID, bool> serverProgressFinishMap; // dedicated to ssshard
|
||||
|
||||
inline void setAuditRunActor(Future<Void> actor) { auditActor = actor; }
|
||||
inline Future<Void> getAuditRunActor() { return auditActor; }
|
||||
|
||||
inline void setDDAuditContext(DDAuditContext context) { this->context = static_cast<uint8_t>(context); }
|
||||
inline DDAuditContext getDDAuditContext() const { return static_cast<DDAuditContext>(this->context); }
|
||||
inline void setDDAuditContext(DDAuditContext context_) { this->context = context_; }
|
||||
inline DDAuditContext getDDAuditContext() const { return context; }
|
||||
|
||||
// auditActor and actors are guaranteed to deliver a cancel signal
|
||||
void cancel() {
|
||||
|
@ -308,17 +321,6 @@ ACTOR Future<Void> debugCheckCoalescing(Database cx) {
|
|||
}
|
||||
}
|
||||
|
||||
static std::set<int> const& normalDDQueueErrors() {
|
||||
static std::set<int> s;
|
||||
if (s.empty()) {
|
||||
s.insert(error_code_movekeys_conflict);
|
||||
s.insert(error_code_broken_promise);
|
||||
s.insert(error_code_data_move_cancelled);
|
||||
s.insert(error_code_data_move_dest_team_not_found);
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
||||
struct DataDistributor;
|
||||
void runAuditStorage(Reference<DataDistributor> self,
|
||||
AuditStorageState auditStates,
|
||||
|
@ -455,7 +457,7 @@ public:
|
|||
// instance in DD audits map
|
||||
continue;
|
||||
}
|
||||
runAuditStorage(self, auditState, 0, DDAuditContext::Resume);
|
||||
runAuditStorage(self, auditState, 0, DDAuditContext::RESUME);
|
||||
TraceEvent(SevInfo, "AuditStorageResumed", self->ddId)
|
||||
.detail("AuditID", auditState.id)
|
||||
.detail("AuditType", auditState.getType())
|
||||
|
@ -522,14 +524,20 @@ public:
|
|||
// AuditStorage does not rely on DatabaseConfiguration
|
||||
// AuditStorage read neccessary info purely from system key space
|
||||
if (!self->auditStorageInitStarted) {
|
||||
// Avoid multiple initAuditStorages
|
||||
self->addActor.send(self->initAuditStorage(self));
|
||||
// AuditStorage currently does not support DDMockTxnProcessor
|
||||
if (!self->txnProcessor->isMocked()) {
|
||||
// Avoid multiple initAuditStorages
|
||||
self->addActor.send(self->initAuditStorage(self));
|
||||
}
|
||||
}
|
||||
// It is possible that an audit request arrives and then DDMode
|
||||
// is set to 2 at this point
|
||||
// No polling MoveKeyLock is running
|
||||
// So, we need to check MoveKeyLock when waitUntilDataDistributorExitSecurityMode
|
||||
wait(waitUntilDataDistributorExitSecurityMode(self)); // Trap DDMode == 2
|
||||
if (!self->txnProcessor->isMocked()) {
|
||||
// AuditStorage currently does not suport DDMockTxnProcessor
|
||||
wait(waitUntilDataDistributorExitSecurityMode(self)); // Trap DDMode == 2
|
||||
}
|
||||
// It is possible DDMode begins with 2 and passes
|
||||
// waitDataDistributorEnabled and then set to 0 before
|
||||
// waitUntilDataDistributorExitSecurityMode. For this case,
|
||||
|
@ -1991,7 +1999,7 @@ ACTOR Future<Void> auditStorageCore(Reference<DataDistributor> self,
|
|||
// Erase the old audit from map and spawn a new audit inherit from the old audit
|
||||
removeAuditFromAuditMap(self, audit->coreState.getType(),
|
||||
audit->coreState.id); // remove audit
|
||||
runAuditStorage(self, audit->coreState, audit->retryCount, DDAuditContext::Retry);
|
||||
runAuditStorage(self, audit->coreState, audit->retryCount, DDAuditContext::RETRY);
|
||||
} else {
|
||||
try {
|
||||
audit->coreState.setPhase(AuditPhase::Failed);
|
||||
|
@ -2164,7 +2172,7 @@ ACTOR Future<UID> launchAudit(Reference<DataDistributor> self,
|
|||
// At this point, the new audit is already in the audit map
|
||||
return auditID;
|
||||
}
|
||||
runAuditStorage(self, auditState, 0, DDAuditContext::Launch);
|
||||
runAuditStorage(self, auditState, 0, DDAuditContext::LAUNCH);
|
||||
}
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_actor_cancelled) {
|
||||
|
|
|
@ -1828,6 +1828,13 @@ SimulationStorageEngine chooseSimulationStorageEngine(const TestConfig& testConf
|
|||
if (testConfig.storageEngineType.present()) {
|
||||
reason = "ConfigureSpecified"_sr;
|
||||
result = testConfig.storageEngineType.get();
|
||||
if (testConfig.excludedStorageEngineType(result) ||
|
||||
std::find(std::begin(SIMULATION_STORAGE_ENGINE), std::end(SIMULATION_STORAGE_ENGINE), result) ==
|
||||
std::end(SIMULATION_STORAGE_ENGINE)) {
|
||||
|
||||
TraceEvent(SevError, "StorageEngineNotSupported").detail("StorageEngineType", result);
|
||||
ASSERT(false);
|
||||
}
|
||||
} else {
|
||||
constexpr auto NUM_RETRIES = 1000;
|
||||
for (auto _ = 0; _ < NUM_RETRIES; ++_) {
|
||||
|
|
|
@ -42,6 +42,15 @@ ACTOR Future<bool> IssueConfigurationChange(Database cx, std::string config, boo
|
|||
wait(delay(5.0)); // wait for read window
|
||||
return true;
|
||||
}
|
||||
|
||||
constexpr bool hasRocksDB =
|
||||
#ifdef WITH_ROCKSDB
|
||||
true
|
||||
#else
|
||||
false
|
||||
#endif
|
||||
;
|
||||
|
||||
} // namespace
|
||||
|
||||
struct PerpetualWiggleStorageMigrationWorkload : public TestWorkload {
|
||||
|
@ -65,6 +74,10 @@ struct PerpetualWiggleStorageMigrationWorkload : public TestWorkload {
|
|||
Future<bool> check(Database const& cx) override { return true; };
|
||||
|
||||
ACTOR static Future<Void> _start(PerpetualWiggleStorageMigrationWorkload* self, Database cx) {
|
||||
if (!hasRocksDB) {
|
||||
// RocksDB, which is required by this test, is not supported
|
||||
return Void();
|
||||
}
|
||||
state std::vector<StorageServerInterface> storageServers = wait(getStorageServers(cx));
|
||||
// The test should have enough storage servers to exclude.
|
||||
ASSERT(storageServers.size() > 3);
|
||||
|
@ -227,4 +240,4 @@ struct PerpetualWiggleStorageMigrationWorkload : public TestWorkload {
|
|||
void getMetrics(std::vector<PerfMetric>& m) override { return; }
|
||||
};
|
||||
|
||||
WorkloadFactory<PerpetualWiggleStorageMigrationWorkload> PerpetualWiggleStorageMigrationWorkload;
|
||||
WorkloadFactory<PerpetualWiggleStorageMigrationWorkload> PerpetualWiggleStorageMigrationWorkload;
|
||||
|
|
Loading…
Reference in New Issue