Destroy CF handle before closing physical shard. (#8394)

This commit is contained in:
Yao Xiao 2022-10-21 17:18:36 -07:00 committed by GitHub
parent a1eb1d4a48
commit f1875268b8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 53 additions and 26 deletions

View File

@ -49,6 +49,7 @@ static_assert((ROCKSDB_MAJOR == 6 && ROCKSDB_MINOR == 27) ? ROCKSDB_PATCH >= 3 :
"Unsupported rocksdb version. Update the rocksdb to 6.27.3 version");
const std::string rocksDataFolderSuffix = "-data";
const std::string METADATA_SHARD_ID = "kvs-metadata";
const KeyRef shardMappingPrefix("\xff\xff/ShardMapping/"_sr);
// TODO: move constants to a header file.
const StringRef ROCKSDBSTORAGE_HISTOGRAM_GROUP = "RocksDBStorage"_sr;
@ -304,13 +305,12 @@ rocksdb::ReadOptions getReadOptions() {
}
struct ReadIterator {
rocksdb::ColumnFamilyHandle* cf;
uint64_t index; // incrementing counter to uniquely identify read iterator.
bool inUse;
std::shared_ptr<rocksdb::Iterator> iter;
double creationTime;
ReadIterator(rocksdb::ColumnFamilyHandle* cf, uint64_t index, rocksdb::DB* db, rocksdb::ReadOptions& options)
: cf(cf), index(index), inUse(true), creationTime(now()), iter(db->NewIterator(options, cf)) {}
: index(index), inUse(true), creationTime(now()), iter(db->NewIterator(options, cf)) {}
};
/*
@ -475,16 +475,29 @@ struct PhysicalShard {
}
~PhysicalShard() {
if (!deletePending)
return;
logShardEvent(id, ShardOp::CLOSE);
isInitialized.store(false);
readIterPool.reset();
// Destroy CF
// Deleting default column family is not allowed.
if (id == "default") {
return;
}
if (deletePending) {
auto s = db->DropColumnFamily(cf);
if (!s.ok()) {
logRocksDBError(s, "DestroyShard");
logShardEvent(id, ShardOp::DESTROY, SevError, s.ToString());
return;
}
}
auto s = db->DestroyColumnFamilyHandle(cf);
if (!s.ok()) {
logRocksDBError(s, "DestroyCFHandle");
logShardEvent(id, ShardOp::DESTROY, SevError, s.ToString());
return;
}
logShardEvent(id, ShardOp::DESTROY);
}
@ -628,7 +641,7 @@ public:
std::vector<rocksdb::ColumnFamilyDescriptor> descriptors;
bool foundMetadata = false;
for (const auto& name : columnFamilies) {
if (name == "kvs-metadata") {
if (name == METADATA_SHARD_ID) {
foundMetadata = true;
}
descriptors.push_back(rocksdb::ColumnFamilyDescriptor{ name, cfOptions });
@ -652,19 +665,19 @@ public:
TraceEvent(SevInfo, "ShardedRocksInitLoadPhysicalShards", this->logId)
.detail("PhysicalShardCount", handles.size());
std::shared_ptr<PhysicalShard> metadataShard = nullptr;
for (auto handle : handles) {
if (handle->GetName() == "kvs-metadata") {
metadataShard = std::make_shared<PhysicalShard>(db, "kvs-metadata", handle);
} else {
physicalShards[handle->GetName()] = std::make_shared<PhysicalShard>(db, handle->GetName(), handle);
auto shard = std::make_shared<PhysicalShard>(db, handle->GetName(), handle);
if (shard->id == METADATA_SHARD_ID) {
metadataShard = shard;
}
physicalShards[shard->id] = shard;
columnFamilyMap[handle->GetID()] = handle;
TraceEvent(SevVerbose, "ShardedRocksInitPhysicalShard", this->logId)
.detail("PhysicalShard", handle->GetName());
TraceEvent(SevVerbose, "ShardedRocksInitPhysicalShard", this->logId).detail("PhysicalShard", shard->id);
}
std::set<std::string> unusedShards(columnFamilies.begin(), columnFamilies.end());
unusedShards.erase("kvs-metadata");
unusedShards.erase(METADATA_SHARD_ID);
unusedShards.erase("default");
KeyRange keyRange = prefixRange(shardMappingPrefix);
@ -746,9 +759,11 @@ public:
defaultShard->dataShards[specialKeys.begin.toString()] = std::move(dataShard);
physicalShards[defaultShard->id] = defaultShard;
metadataShard = std::make_shared<PhysicalShard>(db, "kvs-metadata");
// Create metadata shard.
auto metadataShard = std::make_shared<PhysicalShard>(db, METADATA_SHARD_ID);
metadataShard->init();
columnFamilyMap[metadataShard->cf->GetID()] = metadataShard->cf;
physicalShards[METADATA_SHARD_ID] = metadataShard;
// Write special key range metadata.
writeBatch = std::make_unique<rocksdb::WriteBatch>();
@ -763,7 +778,6 @@ public:
TraceEvent(SevInfo, "ShardedRocksInitializeMetaDataShard", this->logId)
.detail("MetadataShardCF", metadataShard->cf->GetID());
}
physicalShards["kvs-metadata"] = metadataShard;
writeBatch = std::make_unique<rocksdb::WriteBatch>();
dirtyShards = std::make_unique<std::set<PhysicalShard*>>();
@ -910,6 +924,9 @@ public:
std::vector<std::shared_ptr<PhysicalShard>> getPendingDeletionShards(double cleanUpDelay) {
std::vector<std::shared_ptr<PhysicalShard>> emptyShards;
double currentTime = now();
TraceEvent(SevInfo, "ShardedRocksDB", logId)
.detail("PendingDeletionShardQueueSize", pendingDeletionShards.size());
while (!pendingDeletionShards.empty()) {
const auto& id = pendingDeletionShards.front();
auto it = physicalShards.find(id);
@ -976,6 +993,10 @@ public:
.detail("Info", "RangeToPersist")
.detail("BeginKey", range.begin)
.detail("EndKey", range.end);
auto it = physicalShards.find(METADATA_SHARD_ID);
ASSERT(it != physicalShards.end());
auto metadataShard = it->second;
writeBatch->DeleteRange(metadataShard->cf,
getShardMappingKey(range.begin, shardMappingPrefix),
getShardMappingKey(range.end, shardMappingPrefix));
@ -1043,24 +1064,30 @@ public:
}
void closeAllShards() {
for (auto& [_, shard] : physicalShards) {
shard->readIterPool.reset();
}
columnFamilyMap.clear();
physicalShards.clear();
// Close DB.
auto s = db->Close();
if (!s.ok()) {
logRocksDBError(s, "Close");
return;
}
TraceEvent("ShardedRocksDB", this->logId).detail("Info", "DBClosed");
}
void destroyAllShards() {
closeAllShards();
std::vector<rocksdb::ColumnFamilyDescriptor> cfs;
for (const auto& [key, _] : physicalShards) {
cfs.push_back(rocksdb::ColumnFamilyDescriptor{ key, getCFOptions() });
columnFamilyMap.clear();
for (auto& [_, shard] : physicalShards) {
shard->deletePending = true;
}
auto s = rocksdb::DestroyDB(path, getOptions(), cfs);
physicalShards.clear();
// Close DB.
auto s = db->Close();
if (!s.ok()) {
logRocksDBError(s, "Close");
return;
}
s = rocksdb::DestroyDB(path, getOptions());
if (!s.ok()) {
logRocksDBError(s, "DestroyDB");
}
@ -1121,7 +1148,6 @@ private:
std::unique_ptr<rocksdb::WriteBatch> writeBatch;
std::unique_ptr<std::set<PhysicalShard*>> dirtyShards;
KeyRangeMap<DataShard*> dataShardMap;
std::shared_ptr<PhysicalShard> metadataShard = nullptr;
std::deque<std::string> pendingDeletionShards;
};
@ -2240,6 +2266,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
// TODO: Adapt the simulation framework to not advance time quickly when background reads/writes are
// occurring.
if (g_network->isSimulated()) {
TraceEvent(SevDebug, "ShardedRocksDB").detail("Info", "Use Coro threads in simulation.");
writeThread = CoroThreadPool::createThreadPool();
readThreads = CoroThreadPool::createThreadPool();
} else {