Add delay to physical shard clean up. (#7989)

Yao Xiao 2022-08-29 11:30:50 -07:00 committed by GitHub
parent 7a056997c7
commit 09f62acd14
3 changed files with 77 additions and 27 deletions
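
This change replaces the immediate cleanUpShards() path with a deferred one: when a physical shard loses its last data shard it is timestamped and queued on pendingDeletionShards, and a background cleaner (emptyShardCleaner) only hands shards to the writer thread for deletion once ROCKSDB_PHYSICAL_SHARD_CLEAN_UP_DELAY has elapsed. The following is a minimal, self-contained C++ sketch of that pattern for orientation only; ShardManagerSketch, markEmpty, and dataShardCount are simplified stand-ins, not names from the fdbserver code.

#include <cassert>
#include <deque>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

struct PhysicalShard {
    std::string id;
    size_t dataShardCount = 0; // the shard is "empty" once this reaches zero
    double deleteTimeSec = 0;  // when the shard became empty and was queued
};

struct ShardManagerSketch {
    std::unordered_map<std::string, std::shared_ptr<PhysicalShard>> physicalShards;
    std::deque<std::string> pendingDeletionShards; // FIFO, oldest deleteTimeSec first

    // Called when the last data shard is removed from a physical shard.
    void markEmpty(const std::shared_ptr<PhysicalShard>& shard, double nowSec) {
        shard->deleteTimeSec = nowSec;
        pendingDeletionShards.push_back(shard->id);
    }

    // Periodically invoked by a background cleaner; returns only shards whose delay has expired.
    std::vector<std::shared_ptr<PhysicalShard>> getPendingDeletionShards(double nowSec, double cleanUpDelay) {
        std::vector<std::shared_ptr<PhysicalShard>> emptyShards;
        while (!pendingDeletionShards.empty()) {
            const std::string id = pendingDeletionShards.front();
            auto it = physicalShards.find(id);
            // The shard may have been reused (received data again) since it was queued.
            if (it == physicalShards.end() || it->second->dataShardCount != 0) {
                pendingDeletionShards.pop_front();
                continue;
            }
            if (nowSec - it->second->deleteTimeSec > cleanUpDelay) {
                pendingDeletionShards.pop_front();
                emptyShards.push_back(it->second);
                physicalShards.erase(it);
            } else {
                break; // the queue is FIFO, so everything behind is even newer
            }
        }
        return emptyShards;
    }
};

int main() {
    ShardManagerSketch mgr;
    auto shard = std::make_shared<PhysicalShard>();
    shard->id = "shard-1";
    mgr.physicalShards[shard->id] = shard;
    mgr.markEmpty(shard, /*nowSec=*/0.0);
    assert(mgr.getPendingDeletionShards(/*nowSec=*/5.0, /*cleanUpDelay=*/300.0).empty());       // too early
    assert(mgr.getPendingDeletionShards(/*nowSec=*/301.0, /*cleanUpDelay=*/300.0).size() == 1); // delay elapsed
    return 0;
}

In the actual change below, expired shards are not deleted inline: the cleaner posts a RemoveShardAction to the existing writer thread, so the delayed clean-up never blocks commits.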

View File

@@ -385,9 +385,9 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( ROCKSDB_PREFIX_LEN, 0 );
init( ROCKSDB_BLOCK_CACHE_SIZE, 0 );
init( ROCKSDB_METRICS_DELAY, 60.0 );
init( ROCKSDB_READ_VALUE_TIMEOUT, 5.0 );
init( ROCKSDB_READ_VALUE_PREFIX_TIMEOUT, 5.0 );
init( ROCKSDB_READ_RANGE_TIMEOUT, 5.0 );
init( ROCKSDB_READ_VALUE_TIMEOUT, isSimulated ? 5.0 : 200.0 );
init( ROCKSDB_READ_VALUE_PREFIX_TIMEOUT, isSimulated ? 5.0 : 200.0 );
init( ROCKSDB_READ_RANGE_TIMEOUT, isSimulated ? 5.0 : 200.0 );
init( ROCKSDB_READ_QUEUE_WAIT, 1.0 );
init( ROCKSDB_READ_QUEUE_HARD_MAX, 1000 );
init( ROCKSDB_READ_QUEUE_SOFT_MAX, 500 );
@@ -421,6 +421,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( ROCKSDB_MAX_TOTAL_WAL_SIZE, 0 ); // RocksDB default.
init( ROCKSDB_MAX_BACKGROUND_JOBS, 2 ); // RocksDB default.
init( ROCKSDB_DELETE_OBSOLETE_FILE_PERIOD, 21600 ); // 6h, RocksDB default.
init( ROCKSDB_PHYSICAL_SHARD_CLEAN_UP_DELAY, isSimulated ? 10.0 : 300.0 ); // Delays shard clean up, must be larger than ROCKSDB_READ_VALUE_TIMEOUT to prevent reading deleted shard.
// Leader election
bool longLeaderElection = randomize && BUGGIFY;

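The comment on the new ROCKSDB_PHYSICAL_SHARD_CLEAN_UP_DELAY knob captures the key invariant: the clean-up delay must exceed the read timeouts, so any read dispatched against a shard just before it became empty has already timed out by the time the shard is actually dropped. A small illustration of that relationship follows; kReadValueTimeout, kPhysicalShardCleanUpDelay, and safeToDrop are hypothetical names mirroring the non-simulated knob defaults above, not part of the codebase.

#include <cassert>

// Hypothetical constants mirroring the non-simulated knob defaults shown above.
constexpr double kReadValueTimeout = 200.0;          // ROCKSDB_READ_VALUE_TIMEOUT
constexpr double kPhysicalShardCleanUpDelay = 300.0; // ROCKSDB_PHYSICAL_SHARD_CLEAN_UP_DELAY

// A read started just before the shard was queued for deletion gives up after
// kReadValueTimeout; deleting only after kPhysicalShardCleanUpDelay therefore
// guarantees no read can still reference the shard when it is dropped.
bool safeToDrop(double nowSec, double deleteTimeSec) {
    static_assert(kPhysicalShardCleanUpDelay > kReadValueTimeout,
                  "the clean-up delay must exceed the longest read timeout");
    return nowSec - deleteTimeSec > kPhysicalShardCleanUpDelay;
}

int main() {
    assert(!safeToDrop(/*nowSec=*/150.0, /*deleteTimeSec=*/0.0)); // a read could still be in flight
    assert(safeToDrop(/*nowSec=*/301.0, /*deleteTimeSec=*/0.0));  // past the delay, safe to drop
    return 0;
}

With the simulated defaults (10.0 versus 5.0) the same inequality holds.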
View File

@@ -343,6 +343,7 @@ public:
int64_t ROCKSDB_MAX_TOTAL_WAL_SIZE;
int64_t ROCKSDB_MAX_BACKGROUND_JOBS;
int64_t ROCKSDB_DELETE_OBSOLETE_FILE_PERIOD;
double ROCKSDB_PHYSICAL_SHARD_CLEAN_UP_DELAY;
// Leader election
int MAX_NOTIFICATIONS;

View File

@@ -73,6 +73,7 @@ namespace {
struct PhysicalShard;
struct DataShard;
struct ReadIterator;
struct ShardedRocksDBKeyValueStore;
using rocksdb::BackgroundErrorReason;
@@ -492,6 +493,7 @@ struct PhysicalShard {
std::shared_ptr<ReadIteratorPool> readIterPool;
bool deletePending = false;
std::atomic<bool> isInitialized;
double deleteTimeSec;
};
int readRangeInDb(PhysicalShard* shard, const KeyRangeRef& range, int rowLimit, int byteLimit, RangeResult* result) {
@@ -632,6 +634,10 @@ public:
.detail("PhysicalShard", handle->GetName());
}
std::set<std::string> unusedShards(columnFamilies.begin(), columnFamilies.end());
unusedShards.erase("kvs-metadata");
unusedShards.erase("default");
KeyRange keyRange = prefixRange(shardMappingPrefix);
while (true) {
RangeResult metadata;
@@ -669,6 +675,7 @@ public:
dataShardMap.insert(range, dataShard.get());
it->second->dataShards[range.begin.toString()] = std::move(dataShard);
activePhysicalShardIds.emplace(name);
unusedShards.erase(name);
}
if (metadata.back().key.removePrefix(shardMappingPrefix) == specialKeys.end) {
@@ -684,7 +691,20 @@ public:
keyRange = KeyRangeRef(metadata.back().key, keyRange.end);
}
}
// TODO: remove unused column families.
for (const auto& name : unusedShards) {
TraceEvent(SevDebug, "UnusedShardName", logId).detail("Name", name);
auto it = physicalShards.find(name);
ASSERT(it != physicalShards.end());
auto shard = it->second;
if (shard->dataShards.size() == 0) {
shard->deleteTimeSec = now();
pendingDeletionShards.push_back(name);
}
}
if (unusedShards.size() > 0) {
TraceEvent("ShardedRocksDB", logId).detail("CleanUpUnusedShards", unusedShards.size());
}
} else {
// DB is opened with default shard.
ASSERT(handles.size() == 1);
@@ -816,6 +836,8 @@ public:
if (existingShard->dataShards.size() == 0) {
TraceEvent(SevDebug, "ShardedRocksDB").detail("EmptyShardId", existingShard->id);
shardIds.push_back(existingShard->id);
existingShard->deleteTimeSec = now();
pendingDeletionShards.push_back(existingShard->id);
activePhysicalShardIds.erase(existingShard->id);
}
continue;
@@ -856,13 +878,22 @@ public:
return shardIds;
}
std::vector<std::shared_ptr<PhysicalShard>> cleanUpShards(const std::vector<std::string>& shardIds) {
std::vector<std::shared_ptr<PhysicalShard>> getPendingDeletionShards(double cleanUpDelay) {
std::vector<std::shared_ptr<PhysicalShard>> emptyShards;
for (const auto& id : shardIds) {
double currentTime = now();
while (!pendingDeletionShards.empty()) {
const auto& id = pendingDeletionShards.front();
auto it = physicalShards.find(id);
if (it != physicalShards.end() && it->second->dataShards.size() == 0) {
if (it == physicalShards.end() || it->second->dataShards.size() != 0) {
pendingDeletionShards.pop_front();
continue;
}
if (currentTime - it->second->deleteTimeSec > cleanUpDelay) {
pendingDeletionShards.pop_front();
emptyShards.push_back(it->second);
physicalShards.erase(it);
physicalShards.erase(id);
} else {
break;
}
}
return emptyShards;
@@ -903,6 +934,7 @@ public:
for (auto it = rangeIterator.begin(); it != rangeIterator.end(); ++it) {
if (it.value() == nullptr) {
TraceEvent(SevDebug, "ShardedRocksDB").detail("ClearNonExistentRange", it.range());
continue;
}
writeBatch->DeleteRange(it.value()->physicalShard->cf, toSlice(range.begin), toSlice(range.end));
@@ -1057,6 +1089,7 @@ private:
std::unique_ptr<std::set<PhysicalShard*>> dirtyShards;
KeyRangeMap<DataShard*> dataShardMap;
std::shared_ptr<PhysicalShard> metadataShard = nullptr;
std::deque<std::string> pendingDeletionShards;
};
class RocksDBMetrics {
@@ -1882,17 +1915,9 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
explicit Reader(UID logId, int threadIndex, std::shared_ptr<RocksDBMetrics> rocksDBMetrics)
: logId(logId), threadIndex(threadIndex), rocksDBMetrics(rocksDBMetrics), sampleStartTime(now()) {
if (g_network->isSimulated()) {
// In simulation, increasing the read operation timeouts to 5 minutes, as some of the tests have
// very high load and single read thread cannot process all the load within the timeouts.
readValueTimeout = 5 * 60;
readValuePrefixTimeout = 5 * 60;
readRangeTimeout = 5 * 60;
} else {
readValueTimeout = SERVER_KNOBS->ROCKSDB_READ_VALUE_TIMEOUT;
readValuePrefixTimeout = SERVER_KNOBS->ROCKSDB_READ_VALUE_PREFIX_TIMEOUT;
readRangeTimeout = SERVER_KNOBS->ROCKSDB_READ_RANGE_TIMEOUT;
}
readValueTimeout = SERVER_KNOBS->ROCKSDB_READ_VALUE_TIMEOUT;
readValuePrefixTimeout = SERVER_KNOBS->ROCKSDB_READ_VALUE_PREFIX_TIMEOUT;
readRangeTimeout = SERVER_KNOBS->ROCKSDB_READ_RANGE_TIMEOUT;
}
void init() override {}
@@ -2202,6 +2227,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
// The metrics future retains a reference to the DB, so stop it before we delete it.
self->metrics.reset();
self->refreshHolder.cancel();
self->cleanUpJob.cancel();
wait(self->readThreads->stop());
auto a = new Writer::CloseAction(&self->shardManager, deleteOnClose);
@@ -2237,6 +2263,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
this->metrics = ShardManager::shardMetricsLogger(this->rState, openFuture, &shardManager) &&
rocksDBAggregatedMetricsLogger(this->rState, openFuture, rocksDBMetrics, &shardManager);
this->refreshHolder = refreshReadIteratorPools(this->rState, openFuture, shardManager.getAllShards());
this->cleanUpJob = emptyShardCleaner(this->rState, openFuture, &shardManager, writeThread);
writeThread->post(a.release());
return openFuture;
}
@@ -2399,6 +2426,34 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
return read(a.release(), &semaphore, readThreads.getPtr(), &counters.failedToAcquire);
}
ACTOR static Future<Void> emptyShardCleaner(std::shared_ptr<ShardedRocksDBState> rState,
Future<Void> openFuture,
ShardManager* shardManager,
Reference<IThreadPool> writeThread) {
state double cleanUpDelay = SERVER_KNOBS->ROCKSDB_PHYSICAL_SHARD_CLEAN_UP_DELAY;
state double cleanUpPeriod = cleanUpDelay * 2;
try {
wait(openFuture);
loop {
wait(delay(cleanUpPeriod));
if (rState->closing) {
break;
}
auto shards = shardManager->getPendingDeletionShards(cleanUpDelay);
auto a = new Writer::RemoveShardAction(shards);
Future<Void> f = a->done.getFuture();
writeThread->post(a);
TraceEvent(SevInfo, "ShardedRocksDB").detail("DeleteEmptyShards", shards.size());
wait(f);
}
} catch (Error& e) {
if (e.code() != error_code_actor_cancelled) {
TraceEvent(SevError, "DeleteEmptyShardsError").errorUnsuppressed(e);
}
}
return Void();
}
StorageBytes getStorageBytes() const override {
uint64_t live = 0;
ASSERT(shardManager.getDb()->GetAggregatedIntProperty(rocksdb::DB::Properties::kLiveSstFilesSize, &live));
@@ -2415,14 +2470,6 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
return shardManager.persistRangeMapping(range, isAdd);
}
Future<Void> cleanUpShardsIfNeeded(const std::vector<std::string>& shardIds) override {
auto shards = shardManager.cleanUpShards(shardIds);
auto a = new Writer::RemoveShardAction(shards);
Future<Void> res = a->done.getFuture();
writeThread->post(a);
return res;
}
// Used for debugging shard mapping issue.
std::vector<std::pair<KeyRange, std::string>> getDataMapping() { return shardManager.getDataMapping(); }
@@ -2445,6 +2492,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
int numFetchWaiters;
Counters counters;
Future<Void> refreshHolder;
Future<Void> cleanUpJob;
};
} // namespace