diff --git a/fdbcli/LocationMetadataCommand.actor.cpp b/fdbcli/LocationMetadataCommand.actor.cpp index 63c7e092bf..bf7d63e450 100644 --- a/fdbcli/LocationMetadataCommand.actor.cpp +++ b/fdbcli/LocationMetadataCommand.actor.cpp @@ -182,8 +182,8 @@ ACTOR Future printServerShards(Database cx, UID serverId) { KeyRangeRef currentRange(serverShards[i].key, serverShards[i + 1].key); UID shardId; bool assigned, emptyRange; - EnablePhysicalShardMove enablePSM = EnablePhysicalShardMove::False; - decodeServerKeysValue(serverShards[i].value, assigned, emptyRange, enablePSM, shardId); + DataMoveType dataMoveType = DataMoveType::LOGICAL; + decodeServerKeysValue(serverShards[i].value, assigned, emptyRange, dataMoveType, shardId); printf("Range: %s, ShardID: %s, Assigned: %s\n", Traceable::toString(currentRange).c_str(), shardId.toString().c_str(), diff --git a/fdbclient/FileBackupAgent.actor.cpp b/fdbclient/FileBackupAgent.actor.cpp index 10b7e14afe..ee655519c7 100644 --- a/fdbclient/FileBackupAgent.actor.cpp +++ b/fdbclient/FileBackupAgent.actor.cpp @@ -3375,10 +3375,7 @@ struct StartFullBackupTaskFunc : BackupTaskFuncBase { tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::LOCK_AWARE); partitionedLog = config.partitionedLogEnabled().get(tr); - state Future startVersionFuture = tr->getReadVersion(); - wait(success(partitionedLog) && success(startVersionFuture)); - - Params.beginVersion().set(task, startVersionFuture.get()); + wait(success(partitionedLog)); break; } catch (Error& e) { wait(tr->onError(e)); @@ -3393,6 +3390,23 @@ struct StartFullBackupTaskFunc : BackupTaskFuncBase { // the flag was not set before. wait(success(ManagementAPI::changeConfig(cx.getReference(), "backup_worker_enabled:=1", true))); backupWorkerEnabled = true; + // the user is responsible for manually disabling backup worker after the last backup is done + } + + // Get start version after backup worker are enabled + loop { + try { + tr->reset(); + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr->setOption(FDBTransactionOptions::LOCK_AWARE); + + state Future startVersionFuture = tr->getReadVersion(); + wait(success(startVersionFuture)); + Params.beginVersion().set(task, startVersionFuture.get()); + break; + } catch (Error& e) { + wait(tr->onError(e)); + } } // Set the "backupStartedKey" and wait for all backup worker started diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index 678407c277..63c0371c4e 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -183,7 +183,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( MAX_LARGE_SHARD_BYTES, 1000000000 ); // 1G init( SHARD_ENCODE_LOCATION_METADATA, false ); if( randomize && BUGGIFY ) SHARD_ENCODE_LOCATION_METADATA = true; init( ENABLE_DD_PHYSICAL_SHARD, false ); // EXPERIMENTAL; If true, SHARD_ENCODE_LOCATION_METADATA must be true; When true, optimization of data move between DCs is disabled - init( DD_PHYSICAL_SHARD_MOVE_PROBABILITY, 0.0 ); if( isSimulated ) DD_PHYSICAL_SHARD_MOVE_PROBABILITY = 0.5; + init( DD_PHYSICAL_SHARD_MOVE_PROBABILITY, 0.0 ); if( isSimulated ) DD_PHYSICAL_SHARD_MOVE_PROBABILITY = 0.5; + init( ENABLE_PHYSICAL_SHARD_MOVE_EXPERIMENT, false ); if( isSimulated ) ENABLE_PHYSICAL_SHARD_MOVE_EXPERIMENT = deterministicRandom()->coinflip(); init( MAX_PHYSICAL_SHARD_BYTES, 10000000 ); // 10 MB; for ENABLE_DD_PHYSICAL_SHARD; smaller leads to larger number of physicalShard per storage server init( PHYSICAL_SHARD_METRICS_DELAY, 300.0 ); // 300 seconds; for ENABLE_DD_PHYSICAL_SHARD init( ANONYMOUS_PHYSICAL_SHARD_TRANSITION_TIME, 600.0 ); if( randomize && BUGGIFY ) ANONYMOUS_PHYSICAL_SHARD_TRANSITION_TIME = 0.0; // 600 seconds; for ENABLE_DD_PHYSICAL_SHARD @@ -517,7 +518,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( ROCKSDB_COMPACTION_READAHEAD_SIZE, 32768 ); // 32 KB, performs bigger reads when doing compaction. init( ROCKSDB_BLOCK_SIZE, 32768 ); // 32 KB, size of the block in rocksdb cache. init( ENABLE_SHARDED_ROCKSDB, false ); - init( ROCKSDB_WRITE_BUFFER_SIZE, isSimulated ? 16 << 20 : 64 << 20 ); // 64 MB + init( ROCKSDB_WRITE_BUFFER_SIZE, isSimulated ? 256 << 10 : 64 << 20 ); // 64 MB init( ROCKSDB_MAX_WRITE_BUFFER_NUMBER, 6 ); // RocksDB default. init( ROCKSDB_MIN_WRITE_BUFFER_NUMBER_TO_MERGE, 2 ); // RocksDB default. init( ROCKSDB_LEVEL0_FILENUM_COMPACTION_TRIGGER, 2 ); // RocksDB default. @@ -553,14 +554,16 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init (SHARDED_ROCKSDB_VALIDATE_MAPPING_RATIO, 0.01 ); if (isSimulated) SHARDED_ROCKSDB_VALIDATE_MAPPING_RATIO = deterministicRandom()->random01(); init (SHARD_METADATA_SCAN_BYTES_LIMIT, 10485760 ); // 10MB init (ROCKSDB_MAX_MANIFEST_FILE_SIZE, 100 << 20 ); if (isSimulated) ROCKSDB_MAX_MANIFEST_FILE_SIZE = 500 << 20; // 500MB in simulation - init (SHARDED_ROCKSDB_MAX_WRITE_BUFFER_NUMBER, 6 ); // RocksDB default. init (SHARDED_ROCKSDB_AVERAGE_FILE_SIZE, 8 << 20 ); // 8MB init (SHARDED_ROCKSDB_COMPACTION_PERIOD, isSimulated? 3600 : 2592000 ); // 30d init (SHARDED_ROCKSDB_COMPACTION_ACTOR_DELAY, 3600 ); // 1h - init (SHARDED_ROCKSDB_COMPACTION_SHARD_LIMIT, 1 ); - init( SHARDED_ROCKSDB_WRITE_BUFFER_SIZE, isSimulated? 128 << 20 : 1 << 30 ); // 1G - init( SHARDED_ROCKSDB_CF_WRITE_BUFFER_SIZE, isSimulated? 16 << 20 : 64 << 20 ); // 64M, RocksDB default. - init( SHARDED_ROCKSDB_TARGET_FILE_SIZE_BASE, 16777216 ); // 16MB, RocksDB default. + init (SHARDED_ROCKSDB_COMPACTION_SHARD_LIMIT, -1 ); + init( SHARDED_ROCKSDB_WRITE_BUFFER_SIZE, 16 << 20 ); // 16MB + init( SHARDED_ROCKSDB_TOTAL_WRITE_BUFFER_SIZE, 1 << 30 ); // 1GB + init( SHARDED_ROCKSDB_MEMTABLE_BUDGET, 64 << 20); // 64MB + init( SHARDED_ROCKSDB_MAX_WRITE_BUFFER_NUMBER, 6 ); // RocksDB default. + init( SHARDED_ROCKSDB_TARGET_FILE_SIZE_BASE, 16 << 20); // 16MB + init( SHARDED_ROCKSDB_TARGET_FILE_SIZE_MULTIPLIER, 1 ); // RocksDB default. // Leader election bool longLeaderElection = randomize && BUGGIFY; @@ -674,7 +677,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( BACKUP_TIMEOUT, 0.4 ); init( BACKUP_NOOP_POP_DELAY, 5.0 ); init( BACKUP_FILE_BLOCK_BYTES, 1024 * 1024 ); - init( BACKUP_LOCK_BYTES, 3e9 ); if(randomize && BUGGIFY) BACKUP_LOCK_BYTES = deterministicRandom()->randomInt(1024, 4096) * 256 * 1024; + init( BACKUP_LOCK_BYTES, 3e9 ); if(randomize && BUGGIFY) BACKUP_LOCK_BYTES = deterministicRandom()->randomInt(1024, 4096) * 4096; init( BACKUP_UPLOAD_DELAY, 10.0 ); if(randomize && BUGGIFY) BACKUP_UPLOAD_DELAY = deterministicRandom()->random01() * 60; //Cluster Controller diff --git a/fdbclient/SystemData.cpp b/fdbclient/SystemData.cpp index 16441f0d27..7182c55e79 100644 --- a/fdbclient/SystemData.cpp +++ b/fdbclient/SystemData.cpp @@ -518,7 +518,7 @@ const ValueRef serverKeysTrue = "1"_sr, // compatible with what was serverKeysTr const UID newDataMoveId(const uint64_t physicalShardId, AssignEmptyRange assignEmptyRange, - EnablePhysicalShardMove enablePSM, + const DataMoveType type, UnassignShard unassignShard) { uint64_t split = 0; if (assignEmptyRange) { @@ -528,11 +528,8 @@ const UID newDataMoveId(const uint64_t physicalShardId, } else { do { split = deterministicRandom()->randomUInt64(); - if (enablePSM) { - split |= 1U; - } else { - split &= ~1U; - } + // Set the lowest 8 bits + split = ((~0xFF) & split) | static_cast(type); } while (split == anonymousShardId.second() || split == 0 || split == emptyShardId); } return UID(physicalShardId, split); @@ -574,8 +571,8 @@ std::pair serverKeysDecodeServerBegin(const KeyRef& key) { bool serverHasKey(ValueRef storedValue) { UID shardId; bool assigned, emptyRange; - EnablePhysicalShardMove enablePSM = EnablePhysicalShardMove::False; - decodeServerKeysValue(storedValue, assigned, emptyRange, enablePSM, shardId); + DataMoveType dataMoveType = DataMoveType::LOGICAL; + decodeServerKeysValue(storedValue, assigned, emptyRange, dataMoveType, shardId); return assigned; } @@ -589,12 +586,21 @@ const Value serverKeysValue(const UID& id) { return wr.toValue(); } +void decodeDataMoveId(const UID& id, bool& assigned, bool& emptyRange, DataMoveType& dataMoveType) { + dataMoveType = DataMoveType::LOGICAL; + assigned = id.second() != 0LL; + emptyRange = id.second() == emptyShardId; + if (assigned && !emptyRange && id != anonymousShardId) { + dataMoveType = static_cast(0xFF & id.second()); + } +} + void decodeServerKeysValue(const ValueRef& value, bool& assigned, bool& emptyRange, - EnablePhysicalShardMove& enablePSM, + DataMoveType& dataMoveType, UID& id) { - enablePSM = EnablePhysicalShardMove::False; + dataMoveType = DataMoveType::LOGICAL; if (value.size() == 0) { assigned = false; emptyRange = false; @@ -615,18 +621,10 @@ void decodeServerKeysValue(const ValueRef& value, BinaryReader rd(value, IncludeVersion()); ASSERT(rd.protocolVersion().hasShardEncodeLocationMetaData()); rd >> id; - assigned = id.second() != 0; - emptyRange = id.second() == emptyShardId; - if (id.second() & 1U) { - enablePSM = EnablePhysicalShardMove::True; - } + decodeDataMoveId(id, assigned, emptyRange, dataMoveType); } } -bool physicalShardMoveEnabled(const UID& dataMoveId) { - return (dataMoveId.second() & 1U); -} - const KeyRef cacheKeysPrefix = "\xff\x02/cacheKeys/"_sr; const Key cacheKeysKey(uint16_t idx, const KeyRef& key) { @@ -2071,3 +2069,21 @@ TEST_CASE("noSim/SystemData/compat/KeyServers") { return Void(); } + +TEST_CASE("noSim/SystemData/DataMoveId") { + printf("testing data move ID encoding/decoding\n"); + const uint64_t physicalShardId = deterministicRandom()->randomUInt64(); + const DataMoveType type = + static_cast(deterministicRandom()->randomInt(0, static_cast(DataMoveType::NUMBER_OF_TYPES))); + const UID dataMoveId = newDataMoveId(physicalShardId, AssignEmptyRange(false), type, UnassignShard(false)); + + bool assigned, emptyRange; + DataMoveType decodeType; + decodeDataMoveId(dataMoveId, assigned, emptyRange, decodeType); + + ASSERT(type == decodeType); + + printf("testing data move ID encoding/decoding complete\n"); + + return Void(); +} diff --git a/fdbclient/include/fdbclient/ServerKnobs.h b/fdbclient/include/fdbclient/ServerKnobs.h index 58d2fb3e86..eedc129d63 100644 --- a/fdbclient/include/fdbclient/ServerKnobs.h +++ b/fdbclient/include/fdbclient/ServerKnobs.h @@ -211,6 +211,7 @@ public: bool SHARD_ENCODE_LOCATION_METADATA; // If true, location metadata will contain shard ID. bool ENABLE_DD_PHYSICAL_SHARD; // EXPERIMENTAL; If true, SHARD_ENCODE_LOCATION_METADATA must be true. double DD_PHYSICAL_SHARD_MOVE_PROBABILITY; // Percentage of physical shard move, in the range of [0, 1]. + bool ENABLE_PHYSICAL_SHARD_MOVE_EXPERIMENT; int64_t MAX_PHYSICAL_SHARD_BYTES; double PHYSICAL_SHARD_METRICS_DELAY; double ANONYMOUS_PHYSICAL_SHARD_TRANSITION_TIME; @@ -507,14 +508,16 @@ public: double SHARDED_ROCKSDB_VALIDATE_MAPPING_RATIO; int SHARD_METADATA_SCAN_BYTES_LIMIT; int ROCKSDB_MAX_MANIFEST_FILE_SIZE; - int SHARDED_ROCKSDB_MAX_WRITE_BUFFER_NUMBER; int SHARDED_ROCKSDB_AVERAGE_FILE_SIZE; double SHARDED_ROCKSDB_COMPACTION_PERIOD; double SHARDED_ROCKSDB_COMPACTION_ACTOR_DELAY; int SHARDED_ROCKSDB_COMPACTION_SHARD_LIMIT; int64_t SHARDED_ROCKSDB_WRITE_BUFFER_SIZE; - int64_t SHARDED_ROCKSDB_CF_WRITE_BUFFER_SIZE; + int64_t SHARDED_ROCKSDB_TOTAL_WRITE_BUFFER_SIZE; + int64_t SHARDED_ROCKSDB_MEMTABLE_BUDGET; + int64_t SHARDED_ROCKSDB_MAX_WRITE_BUFFER_NUMBER; int SHARDED_ROCKSDB_TARGET_FILE_SIZE_BASE; + int SHARDED_ROCKSDB_TARGET_FILE_SIZE_MULTIPLIER; // Leader election int MAX_NOTIFICATIONS; diff --git a/fdbclient/include/fdbclient/SystemData.h b/fdbclient/include/fdbclient/SystemData.h index 42ba2405c7..9090e99c89 100644 --- a/fdbclient/include/fdbclient/SystemData.h +++ b/fdbclient/include/fdbclient/SystemData.h @@ -37,6 +37,13 @@ FDB_BOOLEAN_PARAM(AssignEmptyRange); FDB_BOOLEAN_PARAM(UnassignShard); FDB_BOOLEAN_PARAM(EnablePhysicalShardMove); +enum class DataMoveType : uint8_t { + LOGICAL = 0, + PHYSICAL = 1, + PHYSICAL_EXP = 2, + NUMBER_OF_TYPES = 3, +}; + // SystemKey is just a Key but with a special type so that instances of it can be found easily throughput the code base // and in simulation constructions will verify that no SystemKey is a direct prefix of any other. struct SystemKey : Key { @@ -163,7 +170,7 @@ extern const KeyRef serverKeysPrefix; extern const ValueRef serverKeysTrue, serverKeysTrueEmptyRange, serverKeysFalse; const UID newDataMoveId(const uint64_t physicalShardId, AssignEmptyRange assignEmptyRange, - EnablePhysicalShardMove enablePSM = EnablePhysicalShardMove::False, + const DataMoveType type, UnassignShard unassignShard = UnassignShard::False); const Key serverKeysKey(UID serverID, const KeyRef& keys); const Key serverKeysPrefixFor(UID serverID); @@ -171,12 +178,12 @@ UID serverKeysDecodeServer(const KeyRef& key); std::pair serverKeysDecodeServerBegin(const KeyRef& key); bool serverHasKey(ValueRef storedValue); const Value serverKeysValue(const UID& id); +void decodeDataMoveId(const UID& id, bool& assigned, bool& emptyRange, DataMoveType& dataMoveType); void decodeServerKeysValue(const ValueRef& value, bool& assigned, bool& emptyRange, - EnablePhysicalShardMove& enablePSM, + DataMoveType& dataMoveType, UID& id); -bool physicalShardMoveEnabled(const UID& dataMoveId); extern const KeyRangeRef conflictingKeysRange; extern const ValueRef conflictingKeysTrue, conflictingKeysFalse; diff --git a/fdbserver/BackupWorker.actor.cpp b/fdbserver/BackupWorker.actor.cpp index cbed292db6..fa959dedbb 100644 --- a/fdbserver/BackupWorker.actor.cpp +++ b/fdbserver/BackupWorker.actor.cpp @@ -372,10 +372,10 @@ struct BackupData { } // keep track of each arena and accumulate their sizes - int64_t bytes = 0; - for (int i = 0; i < num; i++) { + int64_t bytes = messages[0].bytes; + for (int i = 1; i < num; i++) { const Arena& a = messages[i].arena; - const Arena& b = messages[i + 1].arena; + const Arena& b = messages[i - 1].arena; if (!a.sameArena(b)) { bytes += messages[i].bytes; TraceEvent(SevDebugMemory, "BackupWorkerMemory", myId).detail("Release", messages[i].bytes); @@ -878,25 +878,19 @@ ACTOR Future uploadData(BackupData* self) { int lastVersionIndex = 0; Version lastVersion = invalidVersion; - if (self->messages.empty()) { - // Even though messages is empty, we still want to advance popVersion. - if (!self->endVersion.present()) { - popVersion = std::max(popVersion, self->minKnownCommittedVersion); + for (auto& message : self->messages) { + // message may be prefetched in peek; uncommitted message should not be uploaded. + const Version version = message.getVersion(); + if (version > self->maxPopVersion()) { + break; } - } else { - for (auto& message : self->messages) { - // message may be prefetched in peek; uncommitted message should not be uploaded. - const Version version = message.getVersion(); - if (version > self->maxPopVersion()) - break; - if (version > popVersion) { - lastVersionIndex = numMsg; - lastVersion = popVersion; - popVersion = version; - } - message.collectCipherDetailIfEncrypted(cipherDetails); - numMsg++; + if (version > popVersion) { + lastVersionIndex = numMsg; + lastVersion = popVersion; + popVersion = version; } + message.collectCipherDetailIfEncrypted(cipherDetails); + numMsg++; } if (self->pullFinished()) { popVersion = self->endVersion.get(); @@ -938,7 +932,6 @@ ACTOR Future uploadData(BackupData* self) { } if (self->allMessageSaved()) { - self->eraseMessages(self->messages.size()); return Void(); } @@ -1138,6 +1131,7 @@ ACTOR Future backupWorker(BackupInterface interf, TraceEvent("BackupWorkerWaitKey", self.myId).detail("Present", present).detail("ExitEarly", self.exitEarly); pull = self.exitEarly ? Void() : monitorBackupKeyOrPullData(&self, present); + addActor.send(pull); done = self.exitEarly ? Void() : uploadData(&self); loop choose { diff --git a/fdbserver/DDRelocationQueue.actor.cpp b/fdbserver/DDRelocationQueue.actor.cpp index ebc05b1343..450dcd5f6c 100644 --- a/fdbserver/DDRelocationQueue.actor.cpp +++ b/fdbserver/DDRelocationQueue.actor.cpp @@ -112,6 +112,13 @@ std::pair buildP return std::make_pair(&reasonPriority, &priorityReason); } +DataMoveType getDataMoveType(const UID& dataMoveId) { + bool assigned, emptyRange; + DataMoveType dataMoveType; + decodeDataMoveId(dataMoveId, assigned, emptyRange, dataMoveType); + return dataMoveType; +} + int dataMovementPriority(DataMovementReason reason) { auto [reasonPriority, _] = buildPriorityMappings(); return reasonPriority->at(reason); @@ -957,6 +964,18 @@ void DDQueue::launchQueuedWork(RelocateData launchData, const DDEnabledState* dd launchQueuedWork(combined, ddEnabledState); } +DataMoveType newDataMoveType() { + DataMoveType type = DataMoveType::LOGICAL; + if (deterministicRandom()->random01() < SERVER_KNOBS->DD_PHYSICAL_SHARD_MOVE_PROBABILITY) { + type = DataMoveType::PHYSICAL; + } + if (type != DataMoveType::PHYSICAL && SERVER_KNOBS->ENABLE_PHYSICAL_SHARD_MOVE_EXPERIMENT) { + type = DataMoveType::PHYSICAL_EXP; + } + + return type; +} + // For each relocateData rd in the queue, check if there exist inflight relocate data whose keyrange is overlapped // with rd. If there exist, cancel them by cancelling their actors and reducing the src servers' busyness of those // canceled inflight relocateData. Launch the relocation for the rd. @@ -1069,11 +1088,8 @@ void DDQueue::launchQueuedWork(std::set if (SERVER_KNOBS->ENABLE_DD_PHYSICAL_SHARD) { rrs.dataMoveId = UID(); } else { - const bool enabled = - deterministicRandom()->random01() < SERVER_KNOBS->DD_PHYSICAL_SHARD_MOVE_PROBABILITY; - rrs.dataMoveId = newDataMoveId(deterministicRandom()->randomUInt64(), - AssignEmptyRange::False, - EnablePhysicalShardMove(enabled)); + rrs.dataMoveId = newDataMoveId( + deterministicRandom()->randomUInt64(), AssignEmptyRange::False, newDataMoveType()); TraceEvent(SevInfo, "NewDataMoveWithRandomDestID") .detail("DataMoveID", rrs.dataMoveId.toString()) .detail("Range", rrs.keys) @@ -1658,10 +1674,7 @@ ACTOR Future dataDistributionRelocator(DDQueue* self, } else { self->moveCreateNewPhysicalShard++; } - const bool enabled = - deterministicRandom()->random01() < SERVER_KNOBS->DD_PHYSICAL_SHARD_MOVE_PROBABILITY; - rd.dataMoveId = newDataMoveId( - physicalShardIDCandidate, AssignEmptyRange::False, EnablePhysicalShardMove(enabled)); + rd.dataMoveId = newDataMoveId(physicalShardIDCandidate, AssignEmptyRange::False, newDataMoveType()); TraceEvent(SevInfo, "NewDataMoveWithPhysicalShard") .detail("DataMoveID", rd.dataMoveId.toString()) .detail("Reason", rd.reason.toString()) @@ -1905,7 +1918,7 @@ ACTOR Future dataDistributionRelocator(DDQueue* self, .detail("Reason", rd.reason.toString()) .detail("DataMoveReason", static_cast(rd.dmReason)) .detail("DataMoveID", rd.dataMoveId) - .detail("PhysicalShardMove", physicalShardMoveEnabled(rd.dataMoveId)); + .detail("DataMoveType", getDataMoveType(rd.dataMoveId)); if (now() - startTime > 600) { TraceEvent(SevWarnAlways, "RelocateShardTooLong") .detail("Duration", now() - startTime) diff --git a/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp b/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp index 07b4da077a..dd6b82d2c7 100644 --- a/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp +++ b/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp @@ -679,15 +679,22 @@ rocksdb::WALRecoveryMode getWalRecoveryMode() { rocksdb::ColumnFamilyOptions getCFOptions() { rocksdb::ColumnFamilyOptions options; - options.memtable_max_range_deletions = SERVER_KNOBS->ROCKSDB_MEMTABLE_MAX_RANGE_DELETIONS; - options.disable_auto_compactions = SERVER_KNOBS->ROCKSDB_DISABLE_AUTO_COMPACTIONS; - options.level_compaction_dynamic_level_bytes = SERVER_KNOBS->ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES; - options.OptimizeLevelStyleCompaction(SERVER_KNOBS->ROCKSDB_MEMTABLE_BYTES); + + if (SERVER_KNOBS->ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES) { + options.level_compaction_dynamic_level_bytes = SERVER_KNOBS->ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES; + options.OptimizeLevelStyleCompaction(SERVER_KNOBS->SHARDED_ROCKSDB_MEMTABLE_BUDGET); + } + options.write_buffer_size = SERVER_KNOBS->SHARDED_ROCKSDB_WRITE_BUFFER_SIZE; + options.max_write_buffer_number = SERVER_KNOBS->SHARDED_ROCKSDB_MAX_WRITE_BUFFER_NUMBER; + options.target_file_size_base = SERVER_KNOBS->SHARDED_ROCKSDB_TARGET_FILE_SIZE_BASE; + options.target_file_size_multiplier = SERVER_KNOBS->SHARDED_ROCKSDB_TARGET_FILE_SIZE_MULTIPLIER; + if (SERVER_KNOBS->ROCKSDB_PERIODIC_COMPACTION_SECONDS > 0) { options.periodic_compaction_seconds = SERVER_KNOBS->ROCKSDB_PERIODIC_COMPACTION_SECONDS; } + options.disable_auto_compactions = SERVER_KNOBS->ROCKSDB_DISABLE_AUTO_COMPACTIONS; options.paranoid_file_checks = SERVER_KNOBS->ROCKSDB_PARANOID_FILE_CHECKS; - + options.memtable_max_range_deletions = SERVER_KNOBS->ROCKSDB_MEMTABLE_MAX_RANGE_DELETIONS; options.disable_auto_compactions = SERVER_KNOBS->ROCKSDB_DISABLE_AUTO_COMPACTIONS; if (SERVER_KNOBS->SHARD_SOFT_PENDING_COMPACT_BYTES_LIMIT > 0) { options.soft_pending_compaction_bytes_limit = SERVER_KNOBS->SHARD_SOFT_PENDING_COMPACT_BYTES_LIMIT; @@ -754,19 +761,16 @@ rocksdb::ColumnFamilyOptions getCFOptions() { return options; } -rocksdb::Options getOptions() { - rocksdb::Options options; +rocksdb::DBOptions getOptions() { + rocksdb::DBOptions options; options.avoid_unnecessary_blocking_io = true; options.create_if_missing = true; options.atomic_flush = SERVER_KNOBS->ROCKSDB_ATOMIC_FLUSH; - options.memtable_max_range_deletions = SERVER_KNOBS->ROCKSDB_MEMTABLE_MAX_RANGE_DELETIONS; if (SERVER_KNOBS->ROCKSDB_BACKGROUND_PARALLELISM > 0) { options.IncreaseParallelism(SERVER_KNOBS->ROCKSDB_BACKGROUND_PARALLELISM); } options.wal_recovery_mode = getWalRecoveryMode(); - options.target_file_size_base = SERVER_KNOBS->SHARDED_ROCKSDB_TARGET_FILE_SIZE_BASE; - options.target_file_size_multiplier = SERVER_KNOBS->ROCKSDB_TARGET_FILE_SIZE_MULTIPLIER; options.max_open_files = SERVER_KNOBS->ROCKSDB_MAX_OPEN_FILES; options.delete_obsolete_files_period_micros = SERVER_KNOBS->ROCKSDB_DELETE_OBSOLETE_FILE_PERIOD * 1000000; options.max_total_wal_size = SERVER_KNOBS->ROCKSDB_MAX_TOTAL_WAL_SIZE; @@ -788,8 +792,7 @@ rocksdb::Options getOptions() { options.WAL_ttl_seconds = SERVER_KNOBS->ROCKSDB_WAL_TTL_SECONDS; options.WAL_size_limit_MB = SERVER_KNOBS->ROCKSDB_WAL_SIZE_LIMIT_MB; - options.db_write_buffer_size = SERVER_KNOBS->SHARDED_ROCKSDB_WRITE_BUFFER_SIZE; - options.write_buffer_size = SERVER_KNOBS->SHARDED_ROCKSDB_CF_WRITE_BUFFER_SIZE; + options.db_write_buffer_size = SERVER_KNOBS->SHARDED_ROCKSDB_TOTAL_WRITE_BUFFER_SIZE; options.statistics = rocksdb::CreateDBStatistics(); options.statistics->set_stats_level(rocksdb::kExceptHistogramOrTimers); options.db_log_dir = g_network->isSimulated() ? "" : SERVER_KNOBS->LOG_DIRECTORY; @@ -802,7 +805,6 @@ rocksdb::Options getOptions() { options.skip_stats_update_on_db_open = SERVER_KNOBS->ROCKSDB_SKIP_STATS_UPDATE_ON_OPEN; options.skip_checking_sst_file_sizes_on_db_open = SERVER_KNOBS->ROCKSDB_SKIP_FILE_SIZE_CHECK_ON_OPEN; options.max_manifest_file_size = SERVER_KNOBS->ROCKSDB_MAX_MANIFEST_FILE_SIZE; - options.max_write_buffer_number = SERVER_KNOBS->SHARDED_ROCKSDB_MAX_WRITE_BUFFER_NUMBER; return options; } @@ -1191,7 +1193,7 @@ class ShardManager { public: ShardManager(std::string path, UID logId, - const rocksdb::Options& options, + const rocksdb::DBOptions& options, std::shared_ptr errorListener, std::shared_ptr eventListener, Counters* cc) @@ -1859,7 +1861,7 @@ public: logRocksDBError(s, "Close"); return; } - s = rocksdb::DestroyDB(path, dbOptions); + s = rocksdb::DestroyDB(path, rocksdb::Options(dbOptions, getCFOptions())); if (!s.ok()) { logRocksDBError(s, "DestroyDB"); } @@ -1946,7 +1948,7 @@ public: private: const std::string path; const UID logId; - rocksdb::Options dbOptions; + rocksdb::DBOptions dbOptions; rocksdb::ColumnFamilyOptions cfOptions; rocksdb::DB* db = nullptr; std::unordered_map> physicalShards; @@ -4108,7 +4110,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore { } std::shared_ptr rState; - rocksdb::Options dbOptions; + rocksdb::DBOptions dbOptions; std::shared_ptr errorListener; std::shared_ptr eventListener; ShardManager shardManager; diff --git a/fdbserver/MoveKeys.actor.cpp b/fdbserver/MoveKeys.actor.cpp index 6b4750ea2f..058cd6aab6 100644 --- a/fdbserver/MoveKeys.actor.cpp +++ b/fdbserver/MoveKeys.actor.cpp @@ -51,6 +51,13 @@ struct Shard { UID id; }; +bool shouldCreateCheckpoint(const UID& dataMoveId) { + bool assigned, emptyRange; + DataMoveType type; + decodeDataMoveId(dataMoveId, assigned, emptyRange, type); + return type == DataMoveType::PHYSICAL || type == DataMoveType::PHYSICAL_EXP; +} + // Unassigns keyrange `range` from server `ssId`, except ranges in `shards`. // Note: krmSetRangeCoalescing() doesn't work in this case since each shard is assigned an ID. ACTOR Future unassignServerKeys(Transaction* tr, UID ssId, KeyRange range, std::vector shards, UID logId) { @@ -164,7 +171,7 @@ ACTOR Future unassignServerKeys(Transaction* tr, UID ssId, KeyRange range, } ACTOR Future deleteCheckpoints(Transaction* tr, std::set checkpointIds, UID dataMoveId) { - if (!physicalShardMoveEnabled(dataMoveId)) { + if (!shouldCreateCheckpoint(dataMoveId)) { return Void(); } TraceEvent(SevDebug, "DataMoveDeleteCheckpoints", dataMoveId).detail("Checkpoints", describe(checkpointIds)); @@ -408,8 +415,8 @@ ACTOR Future validateRangeAssignment(Database occ, for (int i = 0; i < readResult.size() - 1; i++) { UID shardId; bool assigned, emptyRange; - EnablePhysicalShardMove enablePSM = EnablePhysicalShardMove::False; - decodeServerKeysValue(readResult[i].value, assigned, emptyRange, enablePSM, shardId); + DataMoveType dataMoveType = DataMoveType::LOGICAL; + decodeServerKeysValue(readResult[i].value, assigned, emptyRange, dataMoveType, shardId); if (!assigned) { TraceEvent(SevError, "ValidateRangeAssignmentCorruptionDetected") .detail("DataMoveID", dataMoveId) @@ -418,7 +425,7 @@ ACTOR Future validateRangeAssignment(Database occ, .detail("ErrorMessage", "KeyServers has range but ServerKeys does not have") .detail("CurrentEmptyRange", emptyRange) .detail("CurrentAssignment", assigned) - .detail("EnablePSM", enablePSM) + .detail("DataMoveType", static_cast(dataMoveType)) .detail("ServerID", ssid) .detail("ShardID", shardId); allCorrect = false; @@ -1797,7 +1804,7 @@ ACTOR static Future startMoveShards(Database occ, dataMove.src.insert(src.begin(), src.end()); - if (physicalShardMoveEnabled(dataMoveId)) { + if (shouldCreateCheckpoint(dataMoveId)) { const UID checkpointId = UID(deterministicRandom()->randomUInt64(), srcId.first()); CheckpointMetaData checkpoint(std::vector{ rangeIntersectKeys }, DataMoveRocksCF, @@ -2519,8 +2526,8 @@ ACTOR Future canRemoveStorageServer(Reference t // than one result UID shardId; bool assigned, emptyRange; - EnablePhysicalShardMove enablePSM = EnablePhysicalShardMove::False; - decodeServerKeysValue(keys[0].value, assigned, emptyRange, enablePSM, shardId); + DataMoveType dataMoveType = DataMoveType::LOGICAL; + decodeServerKeysValue(keys[0].value, assigned, emptyRange, dataMoveType, shardId); TraceEvent(SevVerbose, "CanRemoveStorageServer") .detail("ServerID", serverID) .detail("Key1", keys[0].key) @@ -2757,8 +2764,8 @@ ACTOR Future removeKeysFromFailedServer(Database cx, } } - const UID shardId = - newDataMoveId(deterministicRandom()->randomUInt64(), AssignEmptyRange::True); + const UID shardId = newDataMoveId( + deterministicRandom()->randomUInt64(), AssignEmptyRange::True, DataMoveType::LOGICAL); // Assign the shard to teamForDroppedRange in keyServer space. if (SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA) { diff --git a/fdbserver/SimulatedCluster.actor.cpp b/fdbserver/SimulatedCluster.actor.cpp index bf2b1fa4c5..9d2003dfec 100644 --- a/fdbserver/SimulatedCluster.actor.cpp +++ b/fdbserver/SimulatedCluster.actor.cpp @@ -2221,8 +2221,7 @@ void SimulationConfig::setProcessesPerMachine(const TestConfig& testConfig) { void SimulationConfig::setTss(const TestConfig& testConfig) { int tssCount = 0; // TODO: Support TSS in SHARD_ENCODE_LOCATION_METADATA mode. - if (!testConfig.simpleConfig && !testConfig.disableTss && !SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA && - deterministicRandom()->random01() < 0.25) { + if (!testConfig.simpleConfig && !testConfig.disableTss && deterministicRandom()->random01() < 0.25) { // 1 or 2 tss tssCount = deterministicRandom()->randomInt(1, 3); } diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index 0c5d3e8c70..2c77982db6 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -1622,7 +1622,7 @@ void commitMessages(TLogData* self, Reference logData, Version version, Version poppedVersion(Reference self, Tag tag) { auto tagData = self->getTagData(tag); if (!tagData) { - if (tag == txsTag || tag.locality == tagLocalityTxs) { + if (tag == txsTag || tag.locality == tagLocalityTxs || tag.locality == tagLocalityLogRouter) { return 0; } return std::max(self->recoveredAt + 1, self->recoveryTxnVersion); diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index da11452822..ece8ecc918 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -8936,27 +8936,12 @@ void AddingShard::addMutation(Version version, ASSERT(false); } -ACTOR Future updateMoveInShardMetaData(StorageServer* data, MoveInShard* shard) { - state double startTime = now(); - if (g_network->isSimulated()) { - Optional pm = wait(data->storage.readValue(persistMoveInShardKey(shard->id()))); - if (!pm.present()) { - TraceEvent(SevError, "UpdatedMoveInShardMetaDataNotFound", data->thisServerID) - .detail("Shard", shard->toString()) - .detail("ShardKey", persistMoveInShardKey(shard->id())) - .detail("DurableVersion", data->durableVersion.get()); - } - } - +void updateMoveInShardMetaData(StorageServer* data, MoveInShard* shard) { data->storage.writeKeyValue(KeyValueRef(persistMoveInShardKey(shard->id()), moveInShardValue(*shard->meta))); - wait(data->durableVersion.whenAtLeast(data->storageVersion() + 1)); TraceEvent(shard->logSev, "UpdatedMoveInShardMetaData", data->thisServerID) .detail("Shard", shard->toString()) .detail("ShardKey", persistMoveInShardKey(shard->id())) - .detail("DurableVersion", data->durableVersion.get()) - .detail("DurationSecs", now() - startTime); - - return Void(); + .detail("DurableVersion", data->durableVersion.get()); } void changeServerKeysWithPhysicalShards(StorageServer* data, @@ -9091,7 +9076,7 @@ ACTOR Future fetchShardCheckpoint(StorageServer* data, MoveInShard* moveIn moveInShard->meta->checkpoints = std::move(localRecords); moveInShard->setPhase(MoveInPhase::Ingesting); - wait(updateMoveInShardMetaData(data, moveInShard)); + updateMoveInShardMetaData(data, moveInShard); TraceEvent(SevInfo, "FetchShardCheckpointsEnd", data->thisServerID).detail("MoveInShard", moveInShard->toString()); return Void(); @@ -9114,7 +9099,7 @@ ACTOR Future fetchShardIngestCheckpoint(StorageServer* data, MoveInShard* .detail("Checkpoints", describe(moveInShard->checkpoints())); if (e.code() == error_code_failed_to_restore_checkpoint && !moveInShard->failed()) { moveInShard->setPhase(MoveInPhase::Fetching); - wait(updateMoveInShardMetaData(data, moveInShard)); + updateMoveInShardMetaData(data, moveInShard); return Void(); } throw err; @@ -9160,7 +9145,7 @@ ACTOR Future fetchShardIngestCheckpoint(StorageServer* data, MoveInShard* } moveInShard->setPhase(MoveInPhase::ApplyingUpdates); - wait(updateMoveInShardMetaData(data, moveInShard)); + updateMoveInShardMetaData(data, moveInShard); moveInShard->fetchComplete.send(Void()); @@ -9281,7 +9266,7 @@ ACTOR Future fetchShardApplyUpdates(StorageServer* data, double duration = now() - startTime; const int64_t totalBytes = getTotalFetchedBytes(moveInShard->checkpoints()); - TraceEvent(SevInfo, "IngestShardStats", data->thisServerID) + TraceEvent(moveInShard->logSev, "FetchShardApplyUpdatesStats", data->thisServerID) .detail("MoveInShard", moveInShard->toString()) .detail("Duration", duration) .detail("TotalBytes", totalBytes) @@ -10534,7 +10519,10 @@ private: // We can also ignore clearRanges, because they are always accompanied by such a pair of sets with the same // keys startKey = m.param1; - decodeServerKeysValue(m.param2, nowAssigned, emptyRange, enablePSM, dataMoveId); + DataMoveType dataMoveType = DataMoveType::LOGICAL; + decodeServerKeysValue(m.param2, nowAssigned, emptyRange, dataMoveType, dataMoveId); + enablePSM = EnablePhysicalShardMove(dataMoveType == DataMoveType::PHYSICAL || + (dataMoveType == DataMoveType::PHYSICAL_EXP && data->isTss())); processedStartKey = true; } else if (m.type == MutationRef::SetValue && m.param1 == lastEpochEndPrivateKey) { // lastEpochEnd transactions are guaranteed by the master to be alone in their own batch (version) diff --git a/fdbserver/tester.actor.cpp b/fdbserver/tester.actor.cpp index 4eb1df393f..3275892486 100644 --- a/fdbserver/tester.actor.cpp +++ b/fdbserver/tester.actor.cpp @@ -2032,6 +2032,7 @@ ACTOR Future runTests(Reference includes; state int teamSize = 1; state KeyRangeRef currentRange = KeyRangeRef("TestKeyA"_sr, "TestKeyF"_sr); - wait(store(teamA, - self->moveShard(self, - cx, - newDataMoveId(deterministicRandom()->randomUInt64(), - AssignEmptyRange::False, - EnablePhysicalShardMove::True), - currentRange, - teamSize, - includes, - excludes))); + wait(store( + teamA, + self->moveShard( + self, + cx, + newDataMoveId(deterministicRandom()->randomUInt64(), AssignEmptyRange::False, DataMoveType::PHYSICAL), + currentRange, + teamSize, + includes, + excludes))); TraceEvent(SevDebug, "TestMovedRange1").detail("Range", currentRange).detail("Team", describe(teamA)); excludes.insert(teamA.begin(), teamA.end()); @@ -119,7 +119,7 @@ struct PhysicalShardMoveWorkLoad : TestWorkload { wait(store(teamA, self->moveShard(self, cx, - newDataMoveId(sh0, AssignEmptyRange::False, EnablePhysicalShardMove::True), + newDataMoveId(sh0, AssignEmptyRange::False, DataMoveType::PHYSICAL), currentRange, teamSize, includes,