Merge pull request #7777 from sfc-gh-xwang/feature/main/eligible-wiggle
Make storage wiggler support SS_MIN_AGE
This commit is contained in:
commit
8ecee1992b
|
@ -292,6 +292,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
|||
init( DD_TEAM_ZERO_SERVER_LEFT_LOG_DELAY, 120 ); if( randomize && BUGGIFY ) DD_TEAM_ZERO_SERVER_LEFT_LOG_DELAY = 5;
|
||||
init( DD_STORAGE_WIGGLE_PAUSE_THRESHOLD, 10 ); if( randomize && BUGGIFY ) DD_STORAGE_WIGGLE_PAUSE_THRESHOLD = 1000;
|
||||
init( DD_STORAGE_WIGGLE_STUCK_THRESHOLD, 20 );
|
||||
init( DD_STORAGE_WIGGLE_MIN_SS_AGE_SEC, isSimulated ? 2 : 21 * 60 * 60 * 24 ); if(randomize && BUGGIFY) DD_STORAGE_WIGGLE_MIN_SS_AGE_SEC = isSimulated ? 0: 120;
|
||||
init( DD_TENANT_AWARENESS_ENABLED, false );
|
||||
init( TENANT_CACHE_LIST_REFRESH_INTERVAL, 2.0 );
|
||||
|
||||
|
|
|
@ -234,6 +234,8 @@ public:
|
|||
int DD_TEAM_ZERO_SERVER_LEFT_LOG_DELAY;
|
||||
int DD_STORAGE_WIGGLE_PAUSE_THRESHOLD; // How many unhealthy relocations are ongoing will pause storage wiggle
|
||||
int DD_STORAGE_WIGGLE_STUCK_THRESHOLD; // How many times bestTeamStuck accumulate will pause storage wiggle
|
||||
int64_t
|
||||
DD_STORAGE_WIGGLE_MIN_SS_AGE_SEC; // Minimal age of a correct-configured server before it's chosen to be wiggled
|
||||
bool DD_TENANT_AWARENESS_ENABLED;
|
||||
int TENANT_CACHE_LIST_REFRESH_INTERVAL; // How often the TenantCache is refreshed
|
||||
|
||||
|
|
|
@ -2830,56 +2830,37 @@ public:
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<UID> getNextWigglingServerID(DDTeamCollection* teamCollection) {
|
||||
state Optional<Value> localityKey;
|
||||
state Optional<Value> localityValue;
|
||||
|
||||
// NOTE: a normal \xff/conf change made through `changeConfig` now causes DD to throw `movekeys_conflict()`,
|
||||
// after which a new DD is recruited, so we only need to read the current configuration once
|
||||
if (teamCollection->configuration.perpetualStorageWiggleLocality != "0") {
|
||||
// parsing format is like "datahall:0"
|
||||
std::string& localityKeyValue = teamCollection->configuration.perpetualStorageWiggleLocality;
|
||||
ASSERT(isValidPerpetualStorageWiggleLocality(localityKeyValue));
|
||||
// get key and value from perpetual_storage_wiggle_locality.
|
||||
int split = localityKeyValue.find(':');
|
||||
localityKey = Optional<Value>(ValueRef((uint8_t*)localityKeyValue.c_str(), split));
|
||||
localityValue = Optional<Value>(
|
||||
ValueRef((uint8_t*)localityKeyValue.c_str() + split + 1, localityKeyValue.size() - split - 1));
|
||||
}
|
||||
|
||||
ACTOR static Future<UID> getNextWigglingServerID(Reference<StorageWiggler> wiggler,
|
||||
Optional<Value> localityKey = Optional<Value>(),
|
||||
Optional<Value> localityValue = Optional<Value>(),
|
||||
DDTeamCollection* teamCollection = nullptr) {
|
||||
loop {
|
||||
// wait until the wiggle queue is not empty
|
||||
if (teamCollection->storageWiggler->empty()) {
|
||||
wait(teamCollection->storageWiggler->nonEmpty.onChange());
|
||||
state Optional<UID> id = wiggler->getNextServerId();
|
||||
if (!id.present()) {
|
||||
wait(wiggler->onCheck());
|
||||
continue;
|
||||
}
|
||||
|
||||
// if perpetual_storage_wiggle_locality has value and not 0(disabled).
|
||||
if (localityKey.present()) {
|
||||
// Whether the selected server matches the locality
|
||||
auto id = teamCollection->storageWiggler->getNextServerId();
|
||||
if (!id.present())
|
||||
continue;
|
||||
auto server = teamCollection->server_info.at(id.get());
|
||||
|
||||
// TraceEvent("PerpetualLocality").detail("Server", server->getLastKnownInterface().locality.get(localityKey)).detail("Desire", localityValue);
|
||||
if (server->getLastKnownInterface().locality.get(localityKey.get()) == localityValue) {
|
||||
return id.get();
|
||||
} else {
|
||||
if (teamCollection->storageWiggler->empty()) {
|
||||
// None of the entries in wiggle queue matches the given locality.
|
||||
TraceEvent("PerpetualStorageWiggleEmptyQueue", teamCollection->distributorId)
|
||||
.detail("WriteValue", "No process matched the given perpetualStorageWiggleLocality")
|
||||
.detail("PerpetualStorageWiggleLocality",
|
||||
teamCollection->configuration.perpetualStorageWiggleLocality);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
auto id = teamCollection->storageWiggler->getNextServerId();
|
||||
if (!id.present())
|
||||
continue;
|
||||
return id.get();
|
||||
|
||||
if (wiggler->empty()) {
|
||||
// None of the entries in wiggle queue matches the given locality.
|
||||
TraceEvent("PerpetualStorageWiggleEmptyQueue", teamCollection->distributorId)
|
||||
.detail("WriteValue", "No process matched the given perpetualStorageWiggleLocality")
|
||||
.detail("PerpetualStorageWiggleLocality",
|
||||
teamCollection->configuration.perpetualStorageWiggleLocality);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
return id.get();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3538,7 +3519,23 @@ Future<UID> DDTeamCollection::getClusterId() {
|
|||
}
|
||||
|
||||
Future<UID> DDTeamCollection::getNextWigglingServerID() {
|
||||
return DDTeamCollectionImpl::getNextWigglingServerID(this);
|
||||
Optional<Value> localityKey;
|
||||
Optional<Value> localityValue;
|
||||
|
||||
// NOTE: a normal \xff/conf change made through `changeConfig` now causes DD to throw `movekeys_conflict()`,
|
||||
// after which a new DD is recruited, so we only need to read the current configuration once
|
||||
if (configuration.perpetualStorageWiggleLocality != "0") {
|
||||
// parsing format is like "datahall:0"
|
||||
std::string& localityKeyValue = configuration.perpetualStorageWiggleLocality;
|
||||
ASSERT(isValidPerpetualStorageWiggleLocality(localityKeyValue));
|
||||
// get key and value from perpetual_storage_wiggle_locality.
|
||||
int split = localityKeyValue.find(':');
|
||||
localityKey = Optional<Value>(ValueRef((uint8_t*)localityKeyValue.c_str(), split));
|
||||
localityValue = Optional<Value>(
|
||||
ValueRef((uint8_t*)localityKeyValue.c_str() + split + 1, localityKeyValue.size() - split - 1));
|
||||
}
|
||||
|
||||
return DDTeamCollectionImpl::getNextWigglingServerID(storageWiggler, localityKey, localityValue, this);
|
||||
}
|
||||
|
||||
Future<Void> DDTeamCollection::readStorageWiggleMap() {
|
||||
|
@ -5872,3 +5869,50 @@ TEST_CASE("/DataDistribution/GetTeam/DeprioritizeWigglePausedTeam") {
|
|||
wait(DDTeamCollectionUnitTest::GetTeam_DeprioritizeWigglePausedTeam());
|
||||
return Void();
|
||||
}
|
||||
|
||||
// Unit test for the minimum-age gating of the storage wiggler: exercises
// StorageWiggler::getNextServerId() directly, then the waiting loop in
// DDTeamCollectionImpl::getNextWigglingServerID() with no locality filter and no team collection.
TEST_CASE("/DataDistribution/StorageWiggler/NextIdWithMinAge") {
|
||||
state StorageWiggler wiggler(nullptr);
|
||||
state double startTime = now();
|
||||
// Server 1: creation time is 5 seconds later than (startTime - DD_STORAGE_WIGGLE_MIN_SS_AGE_SEC),
// so it only passes the age check once more than 5 seconds have elapsed since startTime.
wiggler.addServer(UID(1, 0),
|
||||
StorageMetadataType(startTime - SERVER_KNOBS->DD_STORAGE_WIGGLE_MIN_SS_AGE_SEC + 5.0,
|
||||
KeyValueStoreType::SSD_BTREE_V2));
|
||||
// Servers 2 and 3 pass `true` as the third StorageMetadataType argument
// (presumably the wrongConfigured flag -- TODO confirm against the constructor).
wiggler.addServer(UID(2, 0),
|
||||
StorageMetadataType(
|
||||
startTime + SERVER_KNOBS->DD_STORAGE_WIGGLE_MIN_SS_AGE_SEC, KeyValueStoreType::MEMORY, true));
|
||||
wiggler.addServer(UID(3, 0), StorageMetadataType(startTime - 5.0, KeyValueStoreType::SSD_ROCKSDB_V1, true));
|
||||
// Server 4: creation time is 1 second earlier than (startTime - DD_STORAGE_WIGGLE_MIN_SS_AGE_SEC).
wiggler.addServer(UID(4, 0),
|
||||
StorageMetadataType(startTime - SERVER_KNOBS->DD_STORAGE_WIGGLE_MIN_SS_AGE_SEC - 1.0,
|
||||
KeyValueStoreType::SSD_BTREE_V2));
|
||||
// Expected results of four successive getNextServerId() calls; the last one yields no server.
std::vector<Optional<UID>> correctResult{ UID(3, 0), UID(2, 0), UID(4, 0), Optional<UID>() };
|
||||
for (int i = 0; i < 4; ++i) {
|
||||
auto id = wiggler.getNextServerId();
|
||||
ASSERT(id == correctResult[i]);
|
||||
}
|
||||
std::cout << "Finish Initial Check. Start test getNextWigglingServerID() loop...\n";
|
||||
// test the getNextWigglingServerID() loop
|
||||
// Only server 1 is left in the queue; the actor is expected to keep waiting until it can return it.
state UID id = wait(DDTeamCollectionImpl::getNextWigglingServerID(Reference<StorageWiggler>::addRef(&wiggler)));
|
||||
ASSERT(id == UID(1, 0));
|
||||
|
||||
std::cout << "Test after addServer() ...\n";
|
||||
state Future<UID> nextFuture =
|
||||
DDTeamCollectionImpl::getNextWigglingServerID(Reference<StorageWiggler>::addRef(&wiggler));
|
||||
startTime = now();
|
||||
// Server 5 is added with a creation time in the future and without the third constructor argument.
StorageMetadataType metadata(startTime + SERVER_KNOBS->DD_STORAGE_WIGGLE_MIN_SS_AGE_SEC + 100.0,
|
||||
KeyValueStoreType::SSD_BTREE_V2);
|
||||
wiggler.addServer(UID(5, 0), metadata);
|
||||
// The pending future may only be ready here if server 5 already satisfies the minimum age.
ASSERT(!nextFuture.isReady() || now() - metadata.createdTime >= SERVER_KNOBS->DD_STORAGE_WIGGLE_MIN_SS_AGE_SEC);
|
||||
|
||||
std::cout << "Test after updateServer() ...\n";
|
||||
StorageWiggler* ptr = &wiggler;
|
||||
// NOTE(review): trigger(fn, future) presumably runs the lambda once delay(5.0) completes -- confirm.
// The lambda rewrites server 5's creation time to (now() - DD_STORAGE_WIGGLE_MIN_SS_AGE_SEC).
wait(trigger(
|
||||
[ptr]() {
|
||||
ptr->updateMetadata(UID(5, 0),
|
||||
StorageMetadataType(now() - SERVER_KNOBS->DD_STORAGE_WIGGLE_MIN_SS_AGE_SEC,
|
||||
KeyValueStoreType::SSD_BTREE_V2));
|
||||
},
|
||||
delay(5.0)));
|
||||
// The future started before the metadata update is expected to resolve to server 5.
wait(store(id, nextFuture));
|
||||
|
||||
ASSERT(id == UID(5, 0));
|
||||
return Void();
|
||||
}
|
|
@ -103,13 +103,16 @@ void DataMove::validateShard(const DDShardInfo& shard, KeyRangeRef range, int pr
|
|||
}
|
||||
}
|
||||
|
||||
// Future a caller waits on before re-checking the wiggle queue, so that it does not busy-loop:
// a MIN_ON_CHECK_DELAY_SEC delay combined via `||` with the pqCanCheck trigger.
Future<Void> StorageWiggler::onCheck() const {
|
||||
return delay(MIN_ON_CHECK_DELAY_SEC) || pqCanCheck.onTrigger();
|
||||
}
|
||||
|
||||
// add server to wiggling queue
|
||||
void StorageWiggler::addServer(const UID& serverId, const StorageMetadataType& metadata) {
|
||||
// std::cout << "size: " << pq_handles.size() << " add " << serverId.toString() << " DC: "
|
||||
// << teamCollection->isPrimary() << std::endl;
|
||||
ASSERT(!pq_handles.count(serverId));
|
||||
pq_handles[serverId] = wiggle_pq.emplace(metadata, serverId);
|
||||
nonEmpty.set(true);
|
||||
}
|
||||
|
||||
void StorageWiggler::removeServer(const UID& serverId) {
|
||||
|
@ -120,7 +123,6 @@ void StorageWiggler::removeServer(const UID& serverId) {
|
|||
pq_handles.erase(serverId);
|
||||
wiggle_pq.erase(handle);
|
||||
}
|
||||
nonEmpty.set(!wiggle_pq.empty());
|
||||
}
|
||||
|
||||
void StorageWiggler::updateMetadata(const UID& serverId, const StorageMetadataType& metadata) {
|
||||
|
@ -131,11 +133,19 @@ void StorageWiggler::updateMetadata(const UID& serverId, const StorageMetadataTy
|
|||
return;
|
||||
}
|
||||
wiggle_pq.update(handle, std::make_pair(metadata, serverId));
|
||||
pqCanCheck.trigger();
|
||||
}
|
||||
|
||||
// Whether a server may be chosen for wiggle: true when its metadata is flagged wrongConfigured,
// or when the gap between now() and metadata.createdTime is strictly greater than
// DD_STORAGE_WIGGLE_MIN_SS_AGE_SEC. serverId is not consulted by this check.
bool StorageWiggler::eligible(const UID& serverId, const StorageMetadataType& metadata) const {
|
||||
return metadata.wrongConfigured || (now() - metadata.createdTime > SERVER_KNOBS->DD_STORAGE_WIGGLE_MIN_SS_AGE_SEC);
|
||||
}
|
||||
|
||||
Optional<UID> StorageWiggler::getNextServerId() {
|
||||
if (!wiggle_pq.empty()) {
|
||||
auto [metadata, id] = wiggle_pq.top();
|
||||
if (!eligible(id, metadata)) {
|
||||
return {};
|
||||
}
|
||||
wiggle_pq.pop();
|
||||
pq_handles.erase(id);
|
||||
return Optional<UID>(id);
|
||||
|
|
|
@ -482,8 +482,14 @@ struct StorageWiggleMetrics {
|
|||
};
|
||||
|
||||
struct StorageWiggler : ReferenceCounted<StorageWiggler> {
|
||||
static constexpr double MIN_ON_CHECK_DELAY_SEC = 5.0;
|
||||
|
||||
private:
|
||||
mutable Debouncer pqCanCheck{ MIN_ON_CHECK_DELAY_SEC };
|
||||
|
||||
public:
|
||||
enum State : uint8_t { INVALID = 0, RUN = 1, PAUSE = 2 };
|
||||
AsyncVar<bool> nonEmpty;
|
||||
|
||||
DDTeamCollection const* teamCollection;
|
||||
StorageWiggleMetrics metrics;
|
||||
// data structures
|
||||
|
@ -496,7 +502,7 @@ struct StorageWiggler : ReferenceCounted<StorageWiggler> {
|
|||
State wiggleState = State::INVALID;
|
||||
double lastStateChangeTs = 0.0; // timestamp describes when did the state change
|
||||
|
||||
explicit StorageWiggler(DDTeamCollection* collection) : nonEmpty(false), teamCollection(collection){};
|
||||
explicit StorageWiggler(DDTeamCollection* collection) : teamCollection(collection){};
|
||||
// add server to wiggling queue
|
||||
void addServer(const UID& serverId, const StorageMetadataType& metadata);
|
||||
// remove server from wiggling queue
|
||||
|
@ -505,8 +511,14 @@ struct StorageWiggler : ReferenceCounted<StorageWiggler> {
|
|||
void updateMetadata(const UID& serverId, const StorageMetadataType& metadata);
|
||||
bool contains(const UID& serverId) const { return pq_handles.count(serverId) > 0; }
|
||||
bool empty() const { return wiggle_pq.empty(); }
|
||||
Optional<UID> getNextServerId();
|
||||
|
||||
// It is guaranteed that when a.metadata >= b.metadata, !eligible(a) implies !eligible(b)
|
||||
bool eligible(const UID& serverId, const StorageMetadataType& metadata) const;
|
||||
|
||||
// try to return the next storage server that is eligible for wiggle
|
||||
Optional<UID> getNextServerId();
|
||||
// next check time to avoid busy loop
|
||||
Future<Void> onCheck() const;
|
||||
State getWiggleState() const { return wiggleState; }
|
||||
void setWiggleState(State s) {
|
||||
if (wiggleState != s) {
|
||||
|
|
Loading…
Reference in New Issue