Several minor improvements for ShardedRocksDB (#10520)
* Terminate DD if SHARD_ENCODE_LOCATION_METADATA is not enabled and storage_engine_type is ShardedRocksDB. * Fixed Error in non-main thread. * Minor improvements.
This commit is contained in:
parent
37689af3f2
commit
6337125712
|
@ -1066,7 +1066,7 @@ void DDQueue::launchQueuedWork(std::set<RelocateData, std::greater<RelocateData>
|
|||
rrs.dataMoveId = UID();
|
||||
} else {
|
||||
const bool enabled =
|
||||
deterministicRandom()->random01() <= SERVER_KNOBS->DD_PHYSICAL_SHARD_MOVE_PROBABILITY;
|
||||
deterministicRandom()->random01() < SERVER_KNOBS->DD_PHYSICAL_SHARD_MOVE_PROBABILITY;
|
||||
rrs.dataMoveId = newDataMoveId(deterministicRandom()->randomUInt64(),
|
||||
AssignEmptyRange::False,
|
||||
EnablePhysicalShardMove(enabled));
|
||||
|
@ -1655,7 +1655,7 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueue* self,
|
|||
self->moveCreateNewPhysicalShard++;
|
||||
}
|
||||
const bool enabled =
|
||||
deterministicRandom()->random01() <= SERVER_KNOBS->DD_PHYSICAL_SHARD_MOVE_PROBABILITY;
|
||||
deterministicRandom()->random01() < SERVER_KNOBS->DD_PHYSICAL_SHARD_MOVE_PROBABILITY;
|
||||
rd.dataMoveId = newDataMoveId(
|
||||
physicalShardIDCandidate, AssignEmptyRange::False, EnablePhysicalShardMove(enabled));
|
||||
TraceEvent(SevInfo, "NewDataMoveWithPhysicalShard")
|
||||
|
|
|
@ -2948,6 +2948,12 @@ public:
|
|||
state bool isTss = server->getLastKnownInterface().isTss();
|
||||
// Update server's storeType, especially when it was created
|
||||
wait(server->updateStoreType());
|
||||
if (server->getStoreType() == KeyValueStoreType::SSD_SHARDED_ROCKSDB &&
|
||||
!SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA) {
|
||||
TraceEvent(SevError, "PhysicalShardNotEnabledForShardedRocks", self->getDistributorId())
|
||||
.detail("StorageServer", server->getId());
|
||||
throw internal_error();
|
||||
}
|
||||
state StorageMetadataType data(
|
||||
StorageMetadataType::currentTime(),
|
||||
server->getStoreType(),
|
||||
|
|
|
@ -413,6 +413,13 @@ public:
|
|||
.setMaxFieldLength(-1)
|
||||
.detail("Conf", self->configuration.toString());
|
||||
|
||||
if (self->configuration.storageServerStoreType == KeyValueStoreType::SSD_SHARDED_ROCKSDB &&
|
||||
!SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA) {
|
||||
TraceEvent(SevError, "PhysicalShardNotEnabledForShardedRocks", self->ddId)
|
||||
.detail("EnableServerKnob", "SHARD_ENCODE_LOCATION_METADATA");
|
||||
throw internal_error();
|
||||
}
|
||||
|
||||
wait(self->updateReplicaKeys());
|
||||
TraceEvent("DDInitUpdatedReplicaKeys", self->ddId).log();
|
||||
|
||||
|
|
|
@ -111,6 +111,14 @@ std::string getErrorReason(BackgroundErrorReason reason) {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> forwardError(Future<int> input) {
|
||||
int errorCode = wait(input);
|
||||
if (errorCode == error_code_success) {
|
||||
return Never();
|
||||
}
|
||||
throw Error::fromCode(errorCode);
|
||||
}
|
||||
|
||||
// Background error handling is tested with Chaos test.
|
||||
// TODO: Test background error in simulation. RocksDB doesn't use flow IO in simulation, which limits our ability to
|
||||
// inject IO errors. We could implement rocksdb::FileSystem using flow IO to unblock simulation. Also, trace event is
|
||||
|
@ -135,14 +143,14 @@ public:
|
|||
// https://github.com/facebook/rocksdb/blob/2e09a54c4fb82e88bcaa3e7cfa8ccbbbbf3635d5/db/error_handler.cc#L138.
|
||||
// All background errors will be treated as storage engine failure. Send the error to storage server.
|
||||
if (bg_error->IsIOError()) {
|
||||
errorPromise.sendError(io_error());
|
||||
errorPromise.send(error_code_io_error);
|
||||
} else if (bg_error->IsCorruption()) {
|
||||
errorPromise.sendError(file_corrupt());
|
||||
errorPromise.send(error_code_file_corrupt);
|
||||
} else {
|
||||
errorPromise.sendError(unknown_error());
|
||||
errorPromise.send(error_code_unknown_error);
|
||||
}
|
||||
}
|
||||
Future<Void> getFuture() {
|
||||
Future<int> getFuture() {
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
return errorPromise.getFuture();
|
||||
}
|
||||
|
@ -150,11 +158,11 @@ public:
|
|||
std::unique_lock<std::mutex> lock(mutex);
|
||||
if (!errorPromise.isValid())
|
||||
return;
|
||||
errorPromise.send(Never());
|
||||
errorPromise.send(error_code_success);
|
||||
}
|
||||
|
||||
private:
|
||||
ThreadReturnPromise<Void> errorPromise;
|
||||
ThreadReturnPromise<int> errorPromise;
|
||||
std::mutex mutex;
|
||||
};
|
||||
|
||||
|
@ -3206,7 +3214,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
|
|||
fetchSemaphore(SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_SOFT_MAX),
|
||||
numReadWaiters(SERVER_KNOBS->ROCKSDB_READ_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_READ_QUEUE_SOFT_MAX),
|
||||
numFetchWaiters(SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_SOFT_MAX),
|
||||
errorListener(std::make_shared<RocksDBErrorListener>()), errorFuture(errorListener->getFuture()),
|
||||
errorListener(std::make_shared<RocksDBErrorListener>()), errorFuture(forwardError(errorListener->getFuture())),
|
||||
dbOptions(getOptions()), shardManager(path, id, dbOptions, errorListener, &counters),
|
||||
rocksDBMetrics(std::make_shared<RocksDBMetrics>(id, dbOptions.statistics)) {
|
||||
// In simluation, run the reader/writer threads as Coro threads (i.e. in the network thread. The storage
|
||||
|
|
|
@ -1655,7 +1655,7 @@ public:
|
|||
|
||||
newestAvailableVersion.insert(allKeys, invalidVersion);
|
||||
newestDirtyVersion.insert(allKeys, invalidVersion);
|
||||
if (SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA && storage->shardAware()) {
|
||||
if (storage->shardAware()) {
|
||||
addShard(ShardInfo::newShard(this, StorageServerShard::notAssigned(allKeys)));
|
||||
} else {
|
||||
addShard(ShardInfo::newNotAssigned(allKeys));
|
||||
|
@ -9605,10 +9605,10 @@ ACTOR Future<Void> fetchShard(StorageServer* data, MoveInShard* moveInShard) {
|
|||
} else if (phase == MoveInPhase::ApplyingUpdates) {
|
||||
wait(fetchShardApplyUpdates(data, moveInShard, moveInUpdates));
|
||||
} else if (phase == MoveInPhase::Complete) {
|
||||
wait(cleanUpMoveInShard(data, data->data().getLatestVersion(), moveInShard));
|
||||
data->actors.add(cleanUpMoveInShard(data, data->data().getLatestVersion(), moveInShard));
|
||||
break;
|
||||
} else if (phase == MoveInPhase::Error || phase == MoveInPhase::Cancel) {
|
||||
wait(cleanUpMoveInShard(data, data->data().getLatestVersion(), moveInShard));
|
||||
data->actors.add(cleanUpMoveInShard(data, data->data().getLatestVersion(), moveInShard));
|
||||
break;
|
||||
}
|
||||
} catch (Error& e) {
|
||||
|
@ -12837,9 +12837,6 @@ ByteSampleInfo isKeyValueInSample(const KeyRef key, int64_t totalKvSize) {
|
|||
(double)info.size / (key.size() + SERVER_KNOBS->BYTE_SAMPLING_OVERHEAD) / SERVER_KNOBS->BYTE_SAMPLING_FACTOR;
|
||||
// MIN_BYTE_SAMPLING_PROBABILITY is 0.99 only for testing
|
||||
// MIN_BYTE_SAMPLING_PROBABILITY is 0 for other cases
|
||||
if (SERVER_KNOBS->MIN_BYTE_SAMPLING_PROBABILITY != 0) {
|
||||
ASSERT(SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA);
|
||||
}
|
||||
info.probability = std::clamp(info.probability, SERVER_KNOBS->MIN_BYTE_SAMPLING_PROBABILITY, 1.0);
|
||||
info.inSample = a / ((1 << 30) * 4.0) < info.probability;
|
||||
info.sampledSize = info.size / info.probability;
|
||||
|
@ -13971,7 +13968,7 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
|
|||
Reference<AsyncVar<ServerDBInfo> const> db,
|
||||
std::string folder) {
|
||||
state StorageServer self(persistentData, db, ssi);
|
||||
self.shardAware = SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA && persistentData->shardAware();
|
||||
self.shardAware = persistentData->shardAware();
|
||||
state Future<Void> ssCore;
|
||||
self.initialClusterVersion = startVersion;
|
||||
if (ssi.isTss()) {
|
||||
|
|
Loading…
Reference in New Issue