Merge remote-tracking branch 'apple-upstream/main'

This commit is contained in:
Sreenath Bodagala 2023-11-14 16:30:12 +00:00
commit 03f85e2e9b
15 changed files with 178 additions and 131 deletions

View File

@ -182,8 +182,8 @@ ACTOR Future<Void> printServerShards(Database cx, UID serverId) {
KeyRangeRef currentRange(serverShards[i].key, serverShards[i + 1].key);
UID shardId;
bool assigned, emptyRange;
EnablePhysicalShardMove enablePSM = EnablePhysicalShardMove::False;
decodeServerKeysValue(serverShards[i].value, assigned, emptyRange, enablePSM, shardId);
DataMoveType dataMoveType = DataMoveType::LOGICAL;
decodeServerKeysValue(serverShards[i].value, assigned, emptyRange, dataMoveType, shardId);
printf("Range: %s, ShardID: %s, Assigned: %s\n",
Traceable<KeyRangeRef>::toString(currentRange).c_str(),
shardId.toString().c_str(),

View File

@ -3375,10 +3375,7 @@ struct StartFullBackupTaskFunc : BackupTaskFuncBase {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
partitionedLog = config.partitionedLogEnabled().get(tr);
state Future<Version> startVersionFuture = tr->getReadVersion();
wait(success(partitionedLog) && success(startVersionFuture));
Params.beginVersion().set(task, startVersionFuture.get());
wait(success(partitionedLog));
break;
} catch (Error& e) {
wait(tr->onError(e));
@ -3393,6 +3390,23 @@ struct StartFullBackupTaskFunc : BackupTaskFuncBase {
// the flag was not set before.
wait(success(ManagementAPI::changeConfig(cx.getReference(), "backup_worker_enabled:=1", true)));
backupWorkerEnabled = true;
// the user is responsible for manually disabling backup worker after the last backup is done
}
// Get start version after backup worker are enabled
loop {
try {
tr->reset();
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
state Future<Version> startVersionFuture = tr->getReadVersion();
wait(success(startVersionFuture));
Params.beginVersion().set(task, startVersionFuture.get());
break;
} catch (Error& e) {
wait(tr->onError(e));
}
}
// Set the "backupStartedKey" and wait for all backup worker started

View File

@ -183,7 +183,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( MAX_LARGE_SHARD_BYTES, 1000000000 ); // 1G
init( SHARD_ENCODE_LOCATION_METADATA, false ); if( randomize && BUGGIFY ) SHARD_ENCODE_LOCATION_METADATA = true;
init( ENABLE_DD_PHYSICAL_SHARD, false ); // EXPERIMENTAL; If true, SHARD_ENCODE_LOCATION_METADATA must be true; When true, optimization of data move between DCs is disabled
init( DD_PHYSICAL_SHARD_MOVE_PROBABILITY, 0.0 ); if( isSimulated ) DD_PHYSICAL_SHARD_MOVE_PROBABILITY = 0.5;
init( DD_PHYSICAL_SHARD_MOVE_PROBABILITY, 0.0 ); if( isSimulated ) DD_PHYSICAL_SHARD_MOVE_PROBABILITY = 0.5;
init( ENABLE_PHYSICAL_SHARD_MOVE_EXPERIMENT, false ); if( isSimulated ) ENABLE_PHYSICAL_SHARD_MOVE_EXPERIMENT = deterministicRandom()->coinflip();
init( MAX_PHYSICAL_SHARD_BYTES, 10000000 ); // 10 MB; for ENABLE_DD_PHYSICAL_SHARD; smaller leads to larger number of physicalShard per storage server
init( PHYSICAL_SHARD_METRICS_DELAY, 300.0 ); // 300 seconds; for ENABLE_DD_PHYSICAL_SHARD
init( ANONYMOUS_PHYSICAL_SHARD_TRANSITION_TIME, 600.0 ); if( randomize && BUGGIFY ) ANONYMOUS_PHYSICAL_SHARD_TRANSITION_TIME = 0.0; // 600 seconds; for ENABLE_DD_PHYSICAL_SHARD
@ -517,7 +518,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( ROCKSDB_COMPACTION_READAHEAD_SIZE, 32768 ); // 32 KB, performs bigger reads when doing compaction.
init( ROCKSDB_BLOCK_SIZE, 32768 ); // 32 KB, size of the block in rocksdb cache.
init( ENABLE_SHARDED_ROCKSDB, false );
init( ROCKSDB_WRITE_BUFFER_SIZE, isSimulated ? 16 << 20 : 64 << 20 ); // 64 MB
init( ROCKSDB_WRITE_BUFFER_SIZE, isSimulated ? 256 << 10 : 64 << 20 ); // 64 MB
init( ROCKSDB_MAX_WRITE_BUFFER_NUMBER, 6 ); // RocksDB default.
init( ROCKSDB_MIN_WRITE_BUFFER_NUMBER_TO_MERGE, 2 ); // RocksDB default.
init( ROCKSDB_LEVEL0_FILENUM_COMPACTION_TRIGGER, 2 ); // RocksDB default.
@ -553,14 +554,16 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init (SHARDED_ROCKSDB_VALIDATE_MAPPING_RATIO, 0.01 ); if (isSimulated) SHARDED_ROCKSDB_VALIDATE_MAPPING_RATIO = deterministicRandom()->random01();
init (SHARD_METADATA_SCAN_BYTES_LIMIT, 10485760 ); // 10MB
init (ROCKSDB_MAX_MANIFEST_FILE_SIZE, 100 << 20 ); if (isSimulated) ROCKSDB_MAX_MANIFEST_FILE_SIZE = 500 << 20; // 500MB in simulation
init (SHARDED_ROCKSDB_MAX_WRITE_BUFFER_NUMBER, 6 ); // RocksDB default.
init (SHARDED_ROCKSDB_AVERAGE_FILE_SIZE, 8 << 20 ); // 8MB
init (SHARDED_ROCKSDB_COMPACTION_PERIOD, isSimulated? 3600 : 2592000 ); // 30d
init (SHARDED_ROCKSDB_COMPACTION_ACTOR_DELAY, 3600 ); // 1h
init (SHARDED_ROCKSDB_COMPACTION_SHARD_LIMIT, 1 );
init( SHARDED_ROCKSDB_WRITE_BUFFER_SIZE, isSimulated? 128 << 20 : 1 << 30 ); // 1G
init( SHARDED_ROCKSDB_CF_WRITE_BUFFER_SIZE, isSimulated? 16 << 20 : 64 << 20 ); // 64M, RocksDB default.
init( SHARDED_ROCKSDB_TARGET_FILE_SIZE_BASE, 16777216 ); // 16MB, RocksDB default.
init (SHARDED_ROCKSDB_COMPACTION_SHARD_LIMIT, -1 );
init( SHARDED_ROCKSDB_WRITE_BUFFER_SIZE, 16 << 20 ); // 16MB
init( SHARDED_ROCKSDB_TOTAL_WRITE_BUFFER_SIZE, 1 << 30 ); // 1GB
init( SHARDED_ROCKSDB_MEMTABLE_BUDGET, 64 << 20); // 64MB
init( SHARDED_ROCKSDB_MAX_WRITE_BUFFER_NUMBER, 6 ); // RocksDB default.
init( SHARDED_ROCKSDB_TARGET_FILE_SIZE_BASE, 16 << 20); // 16MB
init( SHARDED_ROCKSDB_TARGET_FILE_SIZE_MULTIPLIER, 1 ); // RocksDB default.
// Leader election
bool longLeaderElection = randomize && BUGGIFY;
@ -674,7 +677,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( BACKUP_TIMEOUT, 0.4 );
init( BACKUP_NOOP_POP_DELAY, 5.0 );
init( BACKUP_FILE_BLOCK_BYTES, 1024 * 1024 );
init( BACKUP_LOCK_BYTES, 3e9 ); if(randomize && BUGGIFY) BACKUP_LOCK_BYTES = deterministicRandom()->randomInt(1024, 4096) * 256 * 1024;
init( BACKUP_LOCK_BYTES, 3e9 ); if(randomize && BUGGIFY) BACKUP_LOCK_BYTES = deterministicRandom()->randomInt(1024, 4096) * 4096;
init( BACKUP_UPLOAD_DELAY, 10.0 ); if(randomize && BUGGIFY) BACKUP_UPLOAD_DELAY = deterministicRandom()->random01() * 60;
//Cluster Controller

View File

@ -518,7 +518,7 @@ const ValueRef serverKeysTrue = "1"_sr, // compatible with what was serverKeysTr
const UID newDataMoveId(const uint64_t physicalShardId,
AssignEmptyRange assignEmptyRange,
EnablePhysicalShardMove enablePSM,
const DataMoveType type,
UnassignShard unassignShard) {
uint64_t split = 0;
if (assignEmptyRange) {
@ -528,11 +528,8 @@ const UID newDataMoveId(const uint64_t physicalShardId,
} else {
do {
split = deterministicRandom()->randomUInt64();
if (enablePSM) {
split |= 1U;
} else {
split &= ~1U;
}
// Set the lowest 8 bits
split = ((~0xFF) & split) | static_cast<uint64_t>(type);
} while (split == anonymousShardId.second() || split == 0 || split == emptyShardId);
}
return UID(physicalShardId, split);
@ -574,8 +571,8 @@ std::pair<UID, Key> serverKeysDecodeServerBegin(const KeyRef& key) {
bool serverHasKey(ValueRef storedValue) {
UID shardId;
bool assigned, emptyRange;
EnablePhysicalShardMove enablePSM = EnablePhysicalShardMove::False;
decodeServerKeysValue(storedValue, assigned, emptyRange, enablePSM, shardId);
DataMoveType dataMoveType = DataMoveType::LOGICAL;
decodeServerKeysValue(storedValue, assigned, emptyRange, dataMoveType, shardId);
return assigned;
}
@ -589,12 +586,21 @@ const Value serverKeysValue(const UID& id) {
return wr.toValue();
}
void decodeDataMoveId(const UID& id, bool& assigned, bool& emptyRange, DataMoveType& dataMoveType) {
dataMoveType = DataMoveType::LOGICAL;
assigned = id.second() != 0LL;
emptyRange = id.second() == emptyShardId;
if (assigned && !emptyRange && id != anonymousShardId) {
dataMoveType = static_cast<DataMoveType>(0xFF & id.second());
}
}
void decodeServerKeysValue(const ValueRef& value,
bool& assigned,
bool& emptyRange,
EnablePhysicalShardMove& enablePSM,
DataMoveType& dataMoveType,
UID& id) {
enablePSM = EnablePhysicalShardMove::False;
dataMoveType = DataMoveType::LOGICAL;
if (value.size() == 0) {
assigned = false;
emptyRange = false;
@ -615,18 +621,10 @@ void decodeServerKeysValue(const ValueRef& value,
BinaryReader rd(value, IncludeVersion());
ASSERT(rd.protocolVersion().hasShardEncodeLocationMetaData());
rd >> id;
assigned = id.second() != 0;
emptyRange = id.second() == emptyShardId;
if (id.second() & 1U) {
enablePSM = EnablePhysicalShardMove::True;
}
decodeDataMoveId(id, assigned, emptyRange, dataMoveType);
}
}
bool physicalShardMoveEnabled(const UID& dataMoveId) {
return (dataMoveId.second() & 1U);
}
const KeyRef cacheKeysPrefix = "\xff\x02/cacheKeys/"_sr;
const Key cacheKeysKey(uint16_t idx, const KeyRef& key) {
@ -2071,3 +2069,21 @@ TEST_CASE("noSim/SystemData/compat/KeyServers") {
return Void();
}
TEST_CASE("noSim/SystemData/DataMoveId") {
printf("testing data move ID encoding/decoding\n");
const uint64_t physicalShardId = deterministicRandom()->randomUInt64();
const DataMoveType type =
static_cast<DataMoveType>(deterministicRandom()->randomInt(0, static_cast<int>(DataMoveType::NUMBER_OF_TYPES)));
const UID dataMoveId = newDataMoveId(physicalShardId, AssignEmptyRange(false), type, UnassignShard(false));
bool assigned, emptyRange;
DataMoveType decodeType;
decodeDataMoveId(dataMoveId, assigned, emptyRange, decodeType);
ASSERT(type == decodeType);
printf("testing data move ID encoding/decoding complete\n");
return Void();
}

View File

@ -211,6 +211,7 @@ public:
bool SHARD_ENCODE_LOCATION_METADATA; // If true, location metadata will contain shard ID.
bool ENABLE_DD_PHYSICAL_SHARD; // EXPERIMENTAL; If true, SHARD_ENCODE_LOCATION_METADATA must be true.
double DD_PHYSICAL_SHARD_MOVE_PROBABILITY; // Percentage of physical shard move, in the range of [0, 1].
bool ENABLE_PHYSICAL_SHARD_MOVE_EXPERIMENT;
int64_t MAX_PHYSICAL_SHARD_BYTES;
double PHYSICAL_SHARD_METRICS_DELAY;
double ANONYMOUS_PHYSICAL_SHARD_TRANSITION_TIME;
@ -507,14 +508,16 @@ public:
double SHARDED_ROCKSDB_VALIDATE_MAPPING_RATIO;
int SHARD_METADATA_SCAN_BYTES_LIMIT;
int ROCKSDB_MAX_MANIFEST_FILE_SIZE;
int SHARDED_ROCKSDB_MAX_WRITE_BUFFER_NUMBER;
int SHARDED_ROCKSDB_AVERAGE_FILE_SIZE;
double SHARDED_ROCKSDB_COMPACTION_PERIOD;
double SHARDED_ROCKSDB_COMPACTION_ACTOR_DELAY;
int SHARDED_ROCKSDB_COMPACTION_SHARD_LIMIT;
int64_t SHARDED_ROCKSDB_WRITE_BUFFER_SIZE;
int64_t SHARDED_ROCKSDB_CF_WRITE_BUFFER_SIZE;
int64_t SHARDED_ROCKSDB_TOTAL_WRITE_BUFFER_SIZE;
int64_t SHARDED_ROCKSDB_MEMTABLE_BUDGET;
int64_t SHARDED_ROCKSDB_MAX_WRITE_BUFFER_NUMBER;
int SHARDED_ROCKSDB_TARGET_FILE_SIZE_BASE;
int SHARDED_ROCKSDB_TARGET_FILE_SIZE_MULTIPLIER;
// Leader election
int MAX_NOTIFICATIONS;

View File

@ -37,6 +37,13 @@ FDB_BOOLEAN_PARAM(AssignEmptyRange);
FDB_BOOLEAN_PARAM(UnassignShard);
FDB_BOOLEAN_PARAM(EnablePhysicalShardMove);
enum class DataMoveType : uint8_t {
LOGICAL = 0,
PHYSICAL = 1,
PHYSICAL_EXP = 2,
NUMBER_OF_TYPES = 3,
};
// SystemKey is just a Key but with a special type so that instances of it can be found easily throughput the code base
// and in simulation constructions will verify that no SystemKey is a direct prefix of any other.
struct SystemKey : Key {
@ -163,7 +170,7 @@ extern const KeyRef serverKeysPrefix;
extern const ValueRef serverKeysTrue, serverKeysTrueEmptyRange, serverKeysFalse;
const UID newDataMoveId(const uint64_t physicalShardId,
AssignEmptyRange assignEmptyRange,
EnablePhysicalShardMove enablePSM = EnablePhysicalShardMove::False,
const DataMoveType type,
UnassignShard unassignShard = UnassignShard::False);
const Key serverKeysKey(UID serverID, const KeyRef& keys);
const Key serverKeysPrefixFor(UID serverID);
@ -171,12 +178,12 @@ UID serverKeysDecodeServer(const KeyRef& key);
std::pair<UID, Key> serverKeysDecodeServerBegin(const KeyRef& key);
bool serverHasKey(ValueRef storedValue);
const Value serverKeysValue(const UID& id);
void decodeDataMoveId(const UID& id, bool& assigned, bool& emptyRange, DataMoveType& dataMoveType);
void decodeServerKeysValue(const ValueRef& value,
bool& assigned,
bool& emptyRange,
EnablePhysicalShardMove& enablePSM,
DataMoveType& dataMoveType,
UID& id);
bool physicalShardMoveEnabled(const UID& dataMoveId);
extern const KeyRangeRef conflictingKeysRange;
extern const ValueRef conflictingKeysTrue, conflictingKeysFalse;

View File

@ -372,10 +372,10 @@ struct BackupData {
}
// keep track of each arena and accumulate their sizes
int64_t bytes = 0;
for (int i = 0; i < num; i++) {
int64_t bytes = messages[0].bytes;
for (int i = 1; i < num; i++) {
const Arena& a = messages[i].arena;
const Arena& b = messages[i + 1].arena;
const Arena& b = messages[i - 1].arena;
if (!a.sameArena(b)) {
bytes += messages[i].bytes;
TraceEvent(SevDebugMemory, "BackupWorkerMemory", myId).detail("Release", messages[i].bytes);
@ -878,25 +878,19 @@ ACTOR Future<Void> uploadData(BackupData* self) {
int lastVersionIndex = 0;
Version lastVersion = invalidVersion;
if (self->messages.empty()) {
// Even though messages is empty, we still want to advance popVersion.
if (!self->endVersion.present()) {
popVersion = std::max(popVersion, self->minKnownCommittedVersion);
for (auto& message : self->messages) {
// message may be prefetched in peek; uncommitted message should not be uploaded.
const Version version = message.getVersion();
if (version > self->maxPopVersion()) {
break;
}
} else {
for (auto& message : self->messages) {
// message may be prefetched in peek; uncommitted message should not be uploaded.
const Version version = message.getVersion();
if (version > self->maxPopVersion())
break;
if (version > popVersion) {
lastVersionIndex = numMsg;
lastVersion = popVersion;
popVersion = version;
}
message.collectCipherDetailIfEncrypted(cipherDetails);
numMsg++;
if (version > popVersion) {
lastVersionIndex = numMsg;
lastVersion = popVersion;
popVersion = version;
}
message.collectCipherDetailIfEncrypted(cipherDetails);
numMsg++;
}
if (self->pullFinished()) {
popVersion = self->endVersion.get();
@ -938,7 +932,6 @@ ACTOR Future<Void> uploadData(BackupData* self) {
}
if (self->allMessageSaved()) {
self->eraseMessages(self->messages.size());
return Void();
}
@ -1138,6 +1131,7 @@ ACTOR Future<Void> backupWorker(BackupInterface interf,
TraceEvent("BackupWorkerWaitKey", self.myId).detail("Present", present).detail("ExitEarly", self.exitEarly);
pull = self.exitEarly ? Void() : monitorBackupKeyOrPullData(&self, present);
addActor.send(pull);
done = self.exitEarly ? Void() : uploadData(&self);
loop choose {

View File

@ -112,6 +112,13 @@ std::pair<const DmReasonPriorityMapping*, const PriorityDmReasonMapping*> buildP
return std::make_pair(&reasonPriority, &priorityReason);
}
DataMoveType getDataMoveType(const UID& dataMoveId) {
bool assigned, emptyRange;
DataMoveType dataMoveType;
decodeDataMoveId(dataMoveId, assigned, emptyRange, dataMoveType);
return dataMoveType;
}
int dataMovementPriority(DataMovementReason reason) {
auto [reasonPriority, _] = buildPriorityMappings();
return reasonPriority->at(reason);
@ -957,6 +964,18 @@ void DDQueue::launchQueuedWork(RelocateData launchData, const DDEnabledState* dd
launchQueuedWork(combined, ddEnabledState);
}
DataMoveType newDataMoveType() {
DataMoveType type = DataMoveType::LOGICAL;
if (deterministicRandom()->random01() < SERVER_KNOBS->DD_PHYSICAL_SHARD_MOVE_PROBABILITY) {
type = DataMoveType::PHYSICAL;
}
if (type != DataMoveType::PHYSICAL && SERVER_KNOBS->ENABLE_PHYSICAL_SHARD_MOVE_EXPERIMENT) {
type = DataMoveType::PHYSICAL_EXP;
}
return type;
}
// For each relocateData rd in the queue, check if there exist inflight relocate data whose keyrange is overlapped
// with rd. If there exist, cancel them by cancelling their actors and reducing the src servers' busyness of those
// canceled inflight relocateData. Launch the relocation for the rd.
@ -1069,11 +1088,8 @@ void DDQueue::launchQueuedWork(std::set<RelocateData, std::greater<RelocateData>
if (SERVER_KNOBS->ENABLE_DD_PHYSICAL_SHARD) {
rrs.dataMoveId = UID();
} else {
const bool enabled =
deterministicRandom()->random01() < SERVER_KNOBS->DD_PHYSICAL_SHARD_MOVE_PROBABILITY;
rrs.dataMoveId = newDataMoveId(deterministicRandom()->randomUInt64(),
AssignEmptyRange::False,
EnablePhysicalShardMove(enabled));
rrs.dataMoveId = newDataMoveId(
deterministicRandom()->randomUInt64(), AssignEmptyRange::False, newDataMoveType());
TraceEvent(SevInfo, "NewDataMoveWithRandomDestID")
.detail("DataMoveID", rrs.dataMoveId.toString())
.detail("Range", rrs.keys)
@ -1658,10 +1674,7 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueue* self,
} else {
self->moveCreateNewPhysicalShard++;
}
const bool enabled =
deterministicRandom()->random01() < SERVER_KNOBS->DD_PHYSICAL_SHARD_MOVE_PROBABILITY;
rd.dataMoveId = newDataMoveId(
physicalShardIDCandidate, AssignEmptyRange::False, EnablePhysicalShardMove(enabled));
rd.dataMoveId = newDataMoveId(physicalShardIDCandidate, AssignEmptyRange::False, newDataMoveType());
TraceEvent(SevInfo, "NewDataMoveWithPhysicalShard")
.detail("DataMoveID", rd.dataMoveId.toString())
.detail("Reason", rd.reason.toString())
@ -1905,7 +1918,7 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueue* self,
.detail("Reason", rd.reason.toString())
.detail("DataMoveReason", static_cast<int>(rd.dmReason))
.detail("DataMoveID", rd.dataMoveId)
.detail("PhysicalShardMove", physicalShardMoveEnabled(rd.dataMoveId));
.detail("DataMoveType", getDataMoveType(rd.dataMoveId));
if (now() - startTime > 600) {
TraceEvent(SevWarnAlways, "RelocateShardTooLong")
.detail("Duration", now() - startTime)

View File

@ -679,15 +679,22 @@ rocksdb::WALRecoveryMode getWalRecoveryMode() {
rocksdb::ColumnFamilyOptions getCFOptions() {
rocksdb::ColumnFamilyOptions options;
options.memtable_max_range_deletions = SERVER_KNOBS->ROCKSDB_MEMTABLE_MAX_RANGE_DELETIONS;
options.disable_auto_compactions = SERVER_KNOBS->ROCKSDB_DISABLE_AUTO_COMPACTIONS;
options.level_compaction_dynamic_level_bytes = SERVER_KNOBS->ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES;
options.OptimizeLevelStyleCompaction(SERVER_KNOBS->ROCKSDB_MEMTABLE_BYTES);
if (SERVER_KNOBS->ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES) {
options.level_compaction_dynamic_level_bytes = SERVER_KNOBS->ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES;
options.OptimizeLevelStyleCompaction(SERVER_KNOBS->SHARDED_ROCKSDB_MEMTABLE_BUDGET);
}
options.write_buffer_size = SERVER_KNOBS->SHARDED_ROCKSDB_WRITE_BUFFER_SIZE;
options.max_write_buffer_number = SERVER_KNOBS->SHARDED_ROCKSDB_MAX_WRITE_BUFFER_NUMBER;
options.target_file_size_base = SERVER_KNOBS->SHARDED_ROCKSDB_TARGET_FILE_SIZE_BASE;
options.target_file_size_multiplier = SERVER_KNOBS->SHARDED_ROCKSDB_TARGET_FILE_SIZE_MULTIPLIER;
if (SERVER_KNOBS->ROCKSDB_PERIODIC_COMPACTION_SECONDS > 0) {
options.periodic_compaction_seconds = SERVER_KNOBS->ROCKSDB_PERIODIC_COMPACTION_SECONDS;
}
options.disable_auto_compactions = SERVER_KNOBS->ROCKSDB_DISABLE_AUTO_COMPACTIONS;
options.paranoid_file_checks = SERVER_KNOBS->ROCKSDB_PARANOID_FILE_CHECKS;
options.memtable_max_range_deletions = SERVER_KNOBS->ROCKSDB_MEMTABLE_MAX_RANGE_DELETIONS;
options.disable_auto_compactions = SERVER_KNOBS->ROCKSDB_DISABLE_AUTO_COMPACTIONS;
if (SERVER_KNOBS->SHARD_SOFT_PENDING_COMPACT_BYTES_LIMIT > 0) {
options.soft_pending_compaction_bytes_limit = SERVER_KNOBS->SHARD_SOFT_PENDING_COMPACT_BYTES_LIMIT;
@ -754,19 +761,16 @@ rocksdb::ColumnFamilyOptions getCFOptions() {
return options;
}
rocksdb::Options getOptions() {
rocksdb::Options options;
rocksdb::DBOptions getOptions() {
rocksdb::DBOptions options;
options.avoid_unnecessary_blocking_io = true;
options.create_if_missing = true;
options.atomic_flush = SERVER_KNOBS->ROCKSDB_ATOMIC_FLUSH;
options.memtable_max_range_deletions = SERVER_KNOBS->ROCKSDB_MEMTABLE_MAX_RANGE_DELETIONS;
if (SERVER_KNOBS->ROCKSDB_BACKGROUND_PARALLELISM > 0) {
options.IncreaseParallelism(SERVER_KNOBS->ROCKSDB_BACKGROUND_PARALLELISM);
}
options.wal_recovery_mode = getWalRecoveryMode();
options.target_file_size_base = SERVER_KNOBS->SHARDED_ROCKSDB_TARGET_FILE_SIZE_BASE;
options.target_file_size_multiplier = SERVER_KNOBS->ROCKSDB_TARGET_FILE_SIZE_MULTIPLIER;
options.max_open_files = SERVER_KNOBS->ROCKSDB_MAX_OPEN_FILES;
options.delete_obsolete_files_period_micros = SERVER_KNOBS->ROCKSDB_DELETE_OBSOLETE_FILE_PERIOD * 1000000;
options.max_total_wal_size = SERVER_KNOBS->ROCKSDB_MAX_TOTAL_WAL_SIZE;
@ -788,8 +792,7 @@ rocksdb::Options getOptions() {
options.WAL_ttl_seconds = SERVER_KNOBS->ROCKSDB_WAL_TTL_SECONDS;
options.WAL_size_limit_MB = SERVER_KNOBS->ROCKSDB_WAL_SIZE_LIMIT_MB;
options.db_write_buffer_size = SERVER_KNOBS->SHARDED_ROCKSDB_WRITE_BUFFER_SIZE;
options.write_buffer_size = SERVER_KNOBS->SHARDED_ROCKSDB_CF_WRITE_BUFFER_SIZE;
options.db_write_buffer_size = SERVER_KNOBS->SHARDED_ROCKSDB_TOTAL_WRITE_BUFFER_SIZE;
options.statistics = rocksdb::CreateDBStatistics();
options.statistics->set_stats_level(rocksdb::kExceptHistogramOrTimers);
options.db_log_dir = g_network->isSimulated() ? "" : SERVER_KNOBS->LOG_DIRECTORY;
@ -802,7 +805,6 @@ rocksdb::Options getOptions() {
options.skip_stats_update_on_db_open = SERVER_KNOBS->ROCKSDB_SKIP_STATS_UPDATE_ON_OPEN;
options.skip_checking_sst_file_sizes_on_db_open = SERVER_KNOBS->ROCKSDB_SKIP_FILE_SIZE_CHECK_ON_OPEN;
options.max_manifest_file_size = SERVER_KNOBS->ROCKSDB_MAX_MANIFEST_FILE_SIZE;
options.max_write_buffer_number = SERVER_KNOBS->SHARDED_ROCKSDB_MAX_WRITE_BUFFER_NUMBER;
return options;
}
@ -1191,7 +1193,7 @@ class ShardManager {
public:
ShardManager(std::string path,
UID logId,
const rocksdb::Options& options,
const rocksdb::DBOptions& options,
std::shared_ptr<RocksDBErrorListener> errorListener,
std::shared_ptr<RocksDBEventListener> eventListener,
Counters* cc)
@ -1859,7 +1861,7 @@ public:
logRocksDBError(s, "Close");
return;
}
s = rocksdb::DestroyDB(path, dbOptions);
s = rocksdb::DestroyDB(path, rocksdb::Options(dbOptions, getCFOptions()));
if (!s.ok()) {
logRocksDBError(s, "DestroyDB");
}
@ -1946,7 +1948,7 @@ public:
private:
const std::string path;
const UID logId;
rocksdb::Options dbOptions;
rocksdb::DBOptions dbOptions;
rocksdb::ColumnFamilyOptions cfOptions;
rocksdb::DB* db = nullptr;
std::unordered_map<std::string, std::shared_ptr<PhysicalShard>> physicalShards;
@ -4108,7 +4110,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
}
std::shared_ptr<ShardedRocksDBState> rState;
rocksdb::Options dbOptions;
rocksdb::DBOptions dbOptions;
std::shared_ptr<RocksDBErrorListener> errorListener;
std::shared_ptr<RocksDBEventListener> eventListener;
ShardManager shardManager;

View File

@ -51,6 +51,13 @@ struct Shard {
UID id;
};
bool shouldCreateCheckpoint(const UID& dataMoveId) {
bool assigned, emptyRange;
DataMoveType type;
decodeDataMoveId(dataMoveId, assigned, emptyRange, type);
return type == DataMoveType::PHYSICAL || type == DataMoveType::PHYSICAL_EXP;
}
// Unassigns keyrange `range` from server `ssId`, except ranges in `shards`.
// Note: krmSetRangeCoalescing() doesn't work in this case since each shard is assigned an ID.
ACTOR Future<Void> unassignServerKeys(Transaction* tr, UID ssId, KeyRange range, std::vector<Shard> shards, UID logId) {
@ -164,7 +171,7 @@ ACTOR Future<Void> unassignServerKeys(Transaction* tr, UID ssId, KeyRange range,
}
ACTOR Future<Void> deleteCheckpoints(Transaction* tr, std::set<UID> checkpointIds, UID dataMoveId) {
if (!physicalShardMoveEnabled(dataMoveId)) {
if (!shouldCreateCheckpoint(dataMoveId)) {
return Void();
}
TraceEvent(SevDebug, "DataMoveDeleteCheckpoints", dataMoveId).detail("Checkpoints", describe(checkpointIds));
@ -408,8 +415,8 @@ ACTOR Future<bool> validateRangeAssignment(Database occ,
for (int i = 0; i < readResult.size() - 1; i++) {
UID shardId;
bool assigned, emptyRange;
EnablePhysicalShardMove enablePSM = EnablePhysicalShardMove::False;
decodeServerKeysValue(readResult[i].value, assigned, emptyRange, enablePSM, shardId);
DataMoveType dataMoveType = DataMoveType::LOGICAL;
decodeServerKeysValue(readResult[i].value, assigned, emptyRange, dataMoveType, shardId);
if (!assigned) {
TraceEvent(SevError, "ValidateRangeAssignmentCorruptionDetected")
.detail("DataMoveID", dataMoveId)
@ -418,7 +425,7 @@ ACTOR Future<bool> validateRangeAssignment(Database occ,
.detail("ErrorMessage", "KeyServers has range but ServerKeys does not have")
.detail("CurrentEmptyRange", emptyRange)
.detail("CurrentAssignment", assigned)
.detail("EnablePSM", enablePSM)
.detail("DataMoveType", static_cast<uint8_t>(dataMoveType))
.detail("ServerID", ssid)
.detail("ShardID", shardId);
allCorrect = false;
@ -1797,7 +1804,7 @@ ACTOR static Future<Void> startMoveShards(Database occ,
dataMove.src.insert(src.begin(), src.end());
if (physicalShardMoveEnabled(dataMoveId)) {
if (shouldCreateCheckpoint(dataMoveId)) {
const UID checkpointId = UID(deterministicRandom()->randomUInt64(), srcId.first());
CheckpointMetaData checkpoint(std::vector<KeyRange>{ rangeIntersectKeys },
DataMoveRocksCF,
@ -2519,8 +2526,8 @@ ACTOR Future<bool> canRemoveStorageServer(Reference<ReadYourWritesTransaction> t
// than one result
UID shardId;
bool assigned, emptyRange;
EnablePhysicalShardMove enablePSM = EnablePhysicalShardMove::False;
decodeServerKeysValue(keys[0].value, assigned, emptyRange, enablePSM, shardId);
DataMoveType dataMoveType = DataMoveType::LOGICAL;
decodeServerKeysValue(keys[0].value, assigned, emptyRange, dataMoveType, shardId);
TraceEvent(SevVerbose, "CanRemoveStorageServer")
.detail("ServerID", serverID)
.detail("Key1", keys[0].key)
@ -2757,8 +2764,8 @@ ACTOR Future<Void> removeKeysFromFailedServer(Database cx,
}
}
const UID shardId =
newDataMoveId(deterministicRandom()->randomUInt64(), AssignEmptyRange::True);
const UID shardId = newDataMoveId(
deterministicRandom()->randomUInt64(), AssignEmptyRange::True, DataMoveType::LOGICAL);
// Assign the shard to teamForDroppedRange in keyServer space.
if (SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA) {

View File

@ -2221,8 +2221,7 @@ void SimulationConfig::setProcessesPerMachine(const TestConfig& testConfig) {
void SimulationConfig::setTss(const TestConfig& testConfig) {
int tssCount = 0;
// TODO: Support TSS in SHARD_ENCODE_LOCATION_METADATA mode.
if (!testConfig.simpleConfig && !testConfig.disableTss && !SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA &&
deterministicRandom()->random01() < 0.25) {
if (!testConfig.simpleConfig && !testConfig.disableTss && deterministicRandom()->random01() < 0.25) {
// 1 or 2 tss
tssCount = deterministicRandom()->randomInt(1, 3);
}

View File

@ -1622,7 +1622,7 @@ void commitMessages(TLogData* self, Reference<LogData> logData, Version version,
Version poppedVersion(Reference<LogData> self, Tag tag) {
auto tagData = self->getTagData(tag);
if (!tagData) {
if (tag == txsTag || tag.locality == tagLocalityTxs) {
if (tag == txsTag || tag.locality == tagLocalityTxs || tag.locality == tagLocalityLogRouter) {
return 0;
}
return std::max(self->recoveredAt + 1, self->recoveryTxnVersion);

View File

@ -8936,27 +8936,12 @@ void AddingShard::addMutation(Version version,
ASSERT(false);
}
ACTOR Future<Void> updateMoveInShardMetaData(StorageServer* data, MoveInShard* shard) {
state double startTime = now();
if (g_network->isSimulated()) {
Optional<Value> pm = wait(data->storage.readValue(persistMoveInShardKey(shard->id())));
if (!pm.present()) {
TraceEvent(SevError, "UpdatedMoveInShardMetaDataNotFound", data->thisServerID)
.detail("Shard", shard->toString())
.detail("ShardKey", persistMoveInShardKey(shard->id()))
.detail("DurableVersion", data->durableVersion.get());
}
}
void updateMoveInShardMetaData(StorageServer* data, MoveInShard* shard) {
data->storage.writeKeyValue(KeyValueRef(persistMoveInShardKey(shard->id()), moveInShardValue(*shard->meta)));
wait(data->durableVersion.whenAtLeast(data->storageVersion() + 1));
TraceEvent(shard->logSev, "UpdatedMoveInShardMetaData", data->thisServerID)
.detail("Shard", shard->toString())
.detail("ShardKey", persistMoveInShardKey(shard->id()))
.detail("DurableVersion", data->durableVersion.get())
.detail("DurationSecs", now() - startTime);
return Void();
.detail("DurableVersion", data->durableVersion.get());
}
void changeServerKeysWithPhysicalShards(StorageServer* data,
@ -9091,7 +9076,7 @@ ACTOR Future<Void> fetchShardCheckpoint(StorageServer* data, MoveInShard* moveIn
moveInShard->meta->checkpoints = std::move(localRecords);
moveInShard->setPhase(MoveInPhase::Ingesting);
wait(updateMoveInShardMetaData(data, moveInShard));
updateMoveInShardMetaData(data, moveInShard);
TraceEvent(SevInfo, "FetchShardCheckpointsEnd", data->thisServerID).detail("MoveInShard", moveInShard->toString());
return Void();
@ -9114,7 +9099,7 @@ ACTOR Future<Void> fetchShardIngestCheckpoint(StorageServer* data, MoveInShard*
.detail("Checkpoints", describe(moveInShard->checkpoints()));
if (e.code() == error_code_failed_to_restore_checkpoint && !moveInShard->failed()) {
moveInShard->setPhase(MoveInPhase::Fetching);
wait(updateMoveInShardMetaData(data, moveInShard));
updateMoveInShardMetaData(data, moveInShard);
return Void();
}
throw err;
@ -9160,7 +9145,7 @@ ACTOR Future<Void> fetchShardIngestCheckpoint(StorageServer* data, MoveInShard*
}
moveInShard->setPhase(MoveInPhase::ApplyingUpdates);
wait(updateMoveInShardMetaData(data, moveInShard));
updateMoveInShardMetaData(data, moveInShard);
moveInShard->fetchComplete.send(Void());
@ -9281,7 +9266,7 @@ ACTOR Future<Void> fetchShardApplyUpdates(StorageServer* data,
double duration = now() - startTime;
const int64_t totalBytes = getTotalFetchedBytes(moveInShard->checkpoints());
TraceEvent(SevInfo, "IngestShardStats", data->thisServerID)
TraceEvent(moveInShard->logSev, "FetchShardApplyUpdatesStats", data->thisServerID)
.detail("MoveInShard", moveInShard->toString())
.detail("Duration", duration)
.detail("TotalBytes", totalBytes)
@ -10534,7 +10519,10 @@ private:
// We can also ignore clearRanges, because they are always accompanied by such a pair of sets with the same
// keys
startKey = m.param1;
decodeServerKeysValue(m.param2, nowAssigned, emptyRange, enablePSM, dataMoveId);
DataMoveType dataMoveType = DataMoveType::LOGICAL;
decodeServerKeysValue(m.param2, nowAssigned, emptyRange, dataMoveType, dataMoveId);
enablePSM = EnablePhysicalShardMove(dataMoveType == DataMoveType::PHYSICAL ||
(dataMoveType == DataMoveType::PHYSICAL_EXP && data->isTss()));
processedStartKey = true;
} else if (m.type == MutationRef::SetValue && m.param1 == lastEpochEndPrivateKey) {
// lastEpochEnd transactions are guaranteed by the master to be alone in their own batch (version)

View File

@ -2032,6 +2032,7 @@ ACTOR Future<Void> runTests(Reference<AsyncVar<Optional<struct ClusterController
state ISimulator::BackupAgentType simDrAgents = ISimulator::BackupAgentType::NoBackupAgents;
state bool enableDD = false;
state TesterConsistencyScanState consistencyScanState;
if (tests.empty())
useDB = true;
for (auto iter = tests.begin(); iter != tests.end(); ++iter) {

View File

@ -96,16 +96,16 @@ struct PhysicalShardMoveWorkLoad : TestWorkload {
state std::unordered_set<UID> includes;
state int teamSize = 1;
state KeyRangeRef currentRange = KeyRangeRef("TestKeyA"_sr, "TestKeyF"_sr);
wait(store(teamA,
self->moveShard(self,
cx,
newDataMoveId(deterministicRandom()->randomUInt64(),
AssignEmptyRange::False,
EnablePhysicalShardMove::True),
currentRange,
teamSize,
includes,
excludes)));
wait(store(
teamA,
self->moveShard(
self,
cx,
newDataMoveId(deterministicRandom()->randomUInt64(), AssignEmptyRange::False, DataMoveType::PHYSICAL),
currentRange,
teamSize,
includes,
excludes)));
TraceEvent(SevDebug, "TestMovedRange1").detail("Range", currentRange).detail("Team", describe(teamA));
excludes.insert(teamA.begin(), teamA.end());
@ -119,7 +119,7 @@ struct PhysicalShardMoveWorkLoad : TestWorkload {
wait(store(teamA,
self->moveShard(self,
cx,
newDataMoveId(sh0, AssignEmptyRange::False, EnablePhysicalShardMove::True),
newDataMoveId(sh0, AssignEmptyRange::False, DataMoveType::PHYSICAL),
currentRange,
teamSize,
includes,