From 7ce53ca164d7c08aa07219c5a344f61653be6130 Mon Sep 17 00:00:00 2001 From: Xiaoxi Wang Date: Thu, 5 May 2022 23:53:51 -0700 Subject: [PATCH 1/8] add SkewReadWriteWorkload --- fdbclient/SystemData.cpp | 17 +- fdbclient/SystemData.h | 2 + fdbserver/CMakeLists.txt | 733 +++++++++-------- fdbserver/tester.actor.cpp | 22 +- fdbserver/workloads/ReadWrite.actor.cpp | 71 +- fdbserver/workloads/SkewedReadWrite.actor.cpp | 778 ++++++++++++++++++ fdbserver/workloads/workloads.actor.h | 2 + 7 files changed, 1223 insertions(+), 402 deletions(-) create mode 100644 fdbserver/workloads/SkewedReadWrite.actor.cpp diff --git a/fdbclient/SystemData.cpp b/fdbclient/SystemData.cpp index af9ba32a31..a84eae4f77 100644 --- a/fdbclient/SystemData.cpp +++ b/fdbclient/SystemData.cpp @@ -302,7 +302,8 @@ std::pair>, std::vector> server_id; return server_id; } + +std::pair serverKeysDecodeServerBegin(const KeyRef& key) { + UID server_id; + BinaryReader rd(key.removePrefix(serverKeysPrefix), Unversioned()); + rd >> server_id; + rd.readBytes(1); // skip "/" + std::string bytes; + while (!rd.empty()) { + bytes.push_back((char)*rd.arenaRead(1)); + } + // std::cout << bytes.size() << " " <& serve // Using the serverID as a prefix, then followed by the beginning of the shard range // as the key, the value indicates whether the shard does or does not exist on the server. // These values can be changed as data movement occurs. +extern const KeyRangeRef serverKeysRange; extern const KeyRef serverKeysPrefix; extern const ValueRef serverKeysTrue, serverKeysTrueEmptyRange, serverKeysFalse; const Key serverKeysKey(UID serverID, const KeyRef& keys); const Key serverKeysPrefixFor(UID serverID); UID serverKeysDecodeServer(const KeyRef& key); +std::pair serverKeysDecodeServerBegin(const KeyRef& key); bool serverHasKey(ValueRef storedValue); extern const KeyRangeRef conflictingKeysRange; diff --git a/fdbserver/CMakeLists.txt b/fdbserver/CMakeLists.txt index 01b3cd343e..6c6b991301 100644 --- a/fdbserver/CMakeLists.txt +++ b/fdbserver/CMakeLists.txt @@ -1,388 +1,389 @@ set(FDBSERVER_SRCS - ApplyMetadataMutation.cpp - ApplyMetadataMutation.h - BackupInterface.h - BackupProgress.actor.cpp - BackupProgress.actor.h - BackupWorker.actor.cpp - BlobGranuleServerCommon.actor.cpp - BlobGranuleServerCommon.actor.h - BlobGranuleValidation.actor.cpp - BlobGranuleValidation.actor.h - BlobManager.actor.cpp - BlobManagerInterface.h - BlobWorker.actor.cpp - ClusterController.actor.cpp - ClusterController.actor.h - ClusterRecovery.actor.cpp - ClusterRecovery.actor.h - CommitProxyServer.actor.cpp - ConfigBroadcaster.actor.cpp - ConfigBroadcaster.h - ConfigDatabaseUnitTests.actor.cpp - ConfigFollowerInterface.cpp - ConfigFollowerInterface.h - ConfigNode.actor.cpp - ConfigNode.h - ConflictSet.h - CoordinatedState.actor.cpp - CoordinatedState.h - Coordination.actor.cpp - CoordinationInterface.h - CoroFlow.h - DataDistribution.actor.cpp - DataDistribution.actor.h - DataDistributionQueue.actor.cpp - DataDistributionTracker.actor.cpp - DataDistributorInterface.h - DBCoreState.h - DDTeamCollection.actor.cpp - DDTeamCollection.h - DiskQueue.actor.cpp - EncryptKeyProxy.actor.cpp - EncryptKeyProxyInterface.h - FDBExecHelper.actor.cpp - FDBExecHelper.actor.h - fdbserver.actor.cpp - GrvProxyServer.actor.cpp - IConfigConsumer.cpp - IConfigConsumer.h - IDiskQueue.h - IKeyValueContainer.h - IKeyValueStore.h - IPager.h - KeyValueStoreCompressTestData.actor.cpp - KeyValueStoreMemory.actor.cpp - KeyValueStoreRocksDB.actor.cpp - KeyValueStoreSQLite.actor.cpp - KmsConnector.h - KmsConnectorInterface.h - KnobProtectiveGroups.cpp - KnobProtectiveGroups.h - Knobs.h - LatencyBandConfig.cpp - LatencyBandConfig.h - LeaderElection.actor.cpp - LeaderElection.h - LocalConfiguration.actor.cpp - LocalConfiguration.h - LogProtocolMessage.h - LogRouter.actor.cpp - LogSystem.cpp - LogSystem.h - LogSystemConfig.cpp - LogSystemConfig.h - LogSystemDiskQueueAdapter.actor.cpp - LogSystemDiskQueueAdapter.h - LogSystemPeekCursor.actor.cpp - MasterInterface.h - masterserver.actor.cpp - MetricLogger.actor.cpp - MetricLogger.actor.h - MoveKeys.actor.cpp - MoveKeys.actor.h - MutationTracking.cpp - MutationTracking.h - networktest.actor.cpp - NetworkTest.h - OldTLogServer_4_6.actor.cpp - OldTLogServer_6_0.actor.cpp - OldTLogServer_6_2.actor.cpp - OnDemandStore.actor.cpp - OnDemandStore.h - PaxosConfigConsumer.actor.cpp - PaxosConfigConsumer.h - ProxyCommitData.actor.h - pubsub.actor.cpp - pubsub.h - QuietDatabase.actor.cpp - QuietDatabase.h - RadixTree.h - Ratekeeper.actor.cpp - Ratekeeper.h - RatekeeperInterface.h - RecoveryState.h - RemoteIKeyValueStore.actor.h - RemoteIKeyValueStore.actor.cpp - ResolutionBalancer.actor.cpp - ResolutionBalancer.actor.h - Resolver.actor.cpp - ResolverInterface.h - RestoreApplier.actor.cpp - RestoreApplier.actor.h - RestoreCommon.actor.cpp - RestoreCommon.actor.h - RestoreController.actor.cpp - RestoreController.actor.h - RestoreLoader.actor.cpp - RestoreLoader.actor.h - RestoreRoleCommon.actor.cpp - RestoreRoleCommon.actor.h - RestoreUtil.actor.cpp - RestoreUtil.h - RestoreWorker.actor.cpp - RestoreWorker.actor.h - RestoreWorkerInterface.actor.cpp - RestoreWorkerInterface.actor.h - RkTagThrottleCollection.cpp - RkTagThrottleCollection.h - RocksDBCheckpointUtils.actor.cpp - RocksDBCheckpointUtils.actor.h - RoleLineage.actor.cpp - RoleLineage.actor.h - ServerCheckpoint.actor.cpp - ServerCheckpoint.actor.h - ServerDBInfo.actor.h - ServerDBInfo.h - SigStack.cpp - SimKmsConnector.actor.h - SimKmsConnector.actor.cpp - SimpleConfigConsumer.actor.cpp - SimpleConfigConsumer.h - SimulatedCluster.actor.cpp - SimulatedCluster.h - SkipList.cpp - SpanContextMessage.h - Status.actor.cpp - Status.h - StorageCache.actor.cpp - StorageMetrics.actor.h - StorageMetrics.h - storageserver.actor.cpp - TagPartitionedLogSystem.actor.cpp - TagPartitionedLogSystem.actor.h - TagThrottler.actor.cpp - TagThrottler.h - TCInfo.actor.cpp - TCInfo.h - template_fdb.h - tester.actor.cpp - TesterInterface.actor.h - TLogInterface.h - TLogServer.actor.cpp - TransactionTagCounter.cpp - TransactionTagCounter.h - TSSMappingUtil.actor.cpp - TSSMappingUtil.actor.h - VersionedBTree.actor.cpp - VFSAsync.cpp - VFSAsync.h - WaitFailure.actor.cpp - WaitFailure.h - worker.actor.cpp - WorkerInterface.actor.h - workloads/ApiCorrectness.actor.cpp - workloads/ApiWorkload.actor.cpp - workloads/ApiWorkload.h - workloads/AsyncFile.actor.h - workloads/AsyncFile.cpp - workloads/AsyncFileCorrectness.actor.cpp - workloads/AsyncFileRead.actor.cpp - workloads/AsyncFileWrite.actor.cpp - workloads/AtomicOps.actor.cpp - workloads/AtomicOpsApiCorrectness.actor.cpp - workloads/AtomicRestore.actor.cpp - workloads/AtomicSwitchover.actor.cpp - workloads/BackgroundSelectors.actor.cpp - workloads/BackupAndParallelRestoreCorrectness.actor.cpp - workloads/BackupCorrectness.actor.cpp - workloads/BackupToBlob.actor.cpp - workloads/BackupToDBAbort.actor.cpp - workloads/BackupToDBCorrectness.actor.cpp - workloads/BackupToDBUpgrade.actor.cpp - workloads/BlobGranuleCorrectnessWorkload.actor.cpp - workloads/BlobGranuleVerifier.actor.cpp - workloads/BlobStoreWorkload.h - workloads/BulkLoad.actor.cpp - workloads/BulkSetup.actor.h - workloads/Cache.actor.cpp - workloads/ChangeConfig.actor.cpp - workloads/ChangeFeeds.actor.cpp - workloads/ClearSingleRange.actor.cpp - workloads/ClientTransactionProfileCorrectness.actor.cpp - workloads/ClientWorkload.actor.cpp - workloads/ClogSingleConnection.actor.cpp - workloads/CommitBugCheck.actor.cpp - workloads/ConfigIncrement.actor.cpp - workloads/ConfigureDatabase.actor.cpp - workloads/ConflictRange.actor.cpp - workloads/ConsistencyCheck.actor.cpp - workloads/CpuProfiler.actor.cpp - workloads/Cycle.actor.cpp - workloads/DataDistributionMetrics.actor.cpp - workloads/DataLossRecovery.actor.cpp - workloads/DDBalance.actor.cpp - workloads/DDMetrics.actor.cpp - workloads/DDMetricsExclude.actor.cpp - workloads/DifferentClustersSameRV.actor.cpp - workloads/DiskDurability.actor.cpp - workloads/DiskDurabilityTest.actor.cpp - workloads/DiskFailureInjection.actor.cpp - workloads/DummyWorkload.actor.cpp - workloads/EncryptionOps.actor.cpp - workloads/EncryptKeyProxyTest.actor.cpp - workloads/ExternalWorkload.actor.cpp - workloads/FastTriggeredWatches.actor.cpp - workloads/FileSystem.actor.cpp - workloads/Fuzz.cpp - workloads/FuzzApiCorrectness.actor.cpp - workloads/GetMappedRange.actor.cpp - workloads/GetRangeStream.actor.cpp - workloads/HealthMetricsApi.actor.cpp - workloads/HighContentionPrefixAllocatorWorkload.actor.cpp - workloads/Increment.actor.cpp - workloads/IncrementalBackup.actor.cpp - workloads/IndexScan.actor.cpp - workloads/Inventory.actor.cpp - workloads/KillRegion.actor.cpp - workloads/KVStoreTest.actor.cpp - workloads/LocalRatekeeper.actor.cpp - workloads/LockDatabase.actor.cpp - workloads/LockDatabaseFrequently.actor.cpp - workloads/LogMetrics.actor.cpp - workloads/LowLatency.actor.cpp - workloads/MachineAttrition.actor.cpp - workloads/Mako.actor.cpp - workloads/MemoryKeyValueStore.cpp - workloads/MemoryKeyValueStore.h - workloads/MemoryLifetime.actor.cpp - workloads/MetricLogging.actor.cpp - workloads/MiniCycle.actor.cpp - workloads/MutationLogReaderCorrectness.actor.cpp - workloads/ParallelRestore.actor.cpp - workloads/Performance.actor.cpp - workloads/PhysicalShardMove.actor.cpp - workloads/Ping.actor.cpp - workloads/PopulateTPCC.actor.cpp - workloads/PrivateEndpoints.actor.cpp - workloads/ProtocolVersion.actor.cpp - workloads/PubSubMultiples.actor.cpp - workloads/QueuePush.actor.cpp - workloads/RandomClogging.actor.cpp - workloads/RandomMoveKeys.actor.cpp - workloads/RandomSelector.actor.cpp - workloads/ReadAfterWrite.actor.cpp - workloads/ReadHotDetection.actor.cpp - workloads/ReadWrite.actor.cpp - workloads/RemoveServersSafely.actor.cpp - workloads/ReportConflictingKeys.actor.cpp - workloads/RestoreBackup.actor.cpp - workloads/RestoreFromBlob.actor.cpp - workloads/Rollback.actor.cpp - workloads/RyowCorrectness.actor.cpp - workloads/RYWDisable.actor.cpp - workloads/RYWPerformance.actor.cpp - workloads/SaveAndKill.actor.cpp - workloads/SelectorCorrectness.actor.cpp - workloads/Serializability.actor.cpp - workloads/Sideband.actor.cpp - workloads/SidebandSingle.actor.cpp - workloads/SimpleAtomicAdd.actor.cpp - workloads/SlowTaskWorkload.actor.cpp - workloads/SnapTest.actor.cpp - workloads/SpecialKeySpaceCorrectness.actor.cpp - workloads/StatusWorkload.actor.cpp - workloads/Storefront.actor.cpp - workloads/StreamingRangeRead.actor.cpp - workloads/StreamingRead.actor.cpp - workloads/SubmitBackup.actor.cpp - workloads/SuspendProcesses.actor.cpp - workloads/TagThrottleApi.actor.cpp - workloads/TargetedKill.actor.cpp - workloads/TaskBucketCorrectness.actor.cpp - workloads/TenantManagement.actor.cpp - workloads/ThreadSafety.actor.cpp - workloads/Throttling.actor.cpp - workloads/Throughput.actor.cpp - workloads/TimeKeeperCorrectness.actor.cpp - workloads/TPCC.actor.cpp - workloads/TPCCWorkload.h - workloads/TriggerRecovery.actor.cpp - workloads/UDPWorkload.actor.cpp - workloads/UnitPerf.actor.cpp - workloads/UnitTests.actor.cpp - workloads/Unreadable.actor.cpp - workloads/VersionStamp.actor.cpp - workloads/WatchAndWait.actor.cpp - workloads/Watches.actor.cpp - workloads/WatchesSameKeyCorrectness.actor.cpp - workloads/WorkerErrors.actor.cpp - workloads/workloads.actor.h - workloads/WriteBandwidth.actor.cpp - workloads/WriteDuringRead.actor.cpp - workloads/WriteTagThrottling.actor.cpp -) + ApplyMetadataMutation.cpp + ApplyMetadataMutation.h + BackupInterface.h + BackupProgress.actor.cpp + BackupProgress.actor.h + BackupWorker.actor.cpp + BlobGranuleServerCommon.actor.cpp + BlobGranuleServerCommon.actor.h + BlobGranuleValidation.actor.cpp + BlobGranuleValidation.actor.h + BlobManager.actor.cpp + BlobManagerInterface.h + BlobWorker.actor.cpp + ClusterController.actor.cpp + ClusterController.actor.h + ClusterRecovery.actor.cpp + ClusterRecovery.actor.h + CommitProxyServer.actor.cpp + ConfigBroadcaster.actor.cpp + ConfigBroadcaster.h + ConfigDatabaseUnitTests.actor.cpp + ConfigFollowerInterface.cpp + ConfigFollowerInterface.h + ConfigNode.actor.cpp + ConfigNode.h + ConflictSet.h + CoordinatedState.actor.cpp + CoordinatedState.h + Coordination.actor.cpp + CoordinationInterface.h + CoroFlow.h + DataDistribution.actor.cpp + DataDistribution.actor.h + DataDistributionQueue.actor.cpp + DataDistributionTracker.actor.cpp + DataDistributorInterface.h + DBCoreState.h + DDTeamCollection.actor.cpp + DDTeamCollection.h + DiskQueue.actor.cpp + EncryptKeyProxy.actor.cpp + EncryptKeyProxyInterface.h + FDBExecHelper.actor.cpp + FDBExecHelper.actor.h + fdbserver.actor.cpp + GrvProxyServer.actor.cpp + IConfigConsumer.cpp + IConfigConsumer.h + IDiskQueue.h + IKeyValueContainer.h + IKeyValueStore.h + IPager.h + KeyValueStoreCompressTestData.actor.cpp + KeyValueStoreMemory.actor.cpp + KeyValueStoreRocksDB.actor.cpp + KeyValueStoreSQLite.actor.cpp + KmsConnector.h + KmsConnectorInterface.h + KnobProtectiveGroups.cpp + KnobProtectiveGroups.h + Knobs.h + LatencyBandConfig.cpp + LatencyBandConfig.h + LeaderElection.actor.cpp + LeaderElection.h + LocalConfiguration.actor.cpp + LocalConfiguration.h + LogProtocolMessage.h + LogRouter.actor.cpp + LogSystem.cpp + LogSystem.h + LogSystemConfig.cpp + LogSystemConfig.h + LogSystemDiskQueueAdapter.actor.cpp + LogSystemDiskQueueAdapter.h + LogSystemPeekCursor.actor.cpp + MasterInterface.h + masterserver.actor.cpp + MetricLogger.actor.cpp + MetricLogger.actor.h + MoveKeys.actor.cpp + MoveKeys.actor.h + MutationTracking.cpp + MutationTracking.h + networktest.actor.cpp + NetworkTest.h + OldTLogServer_4_6.actor.cpp + OldTLogServer_6_0.actor.cpp + OldTLogServer_6_2.actor.cpp + OnDemandStore.actor.cpp + OnDemandStore.h + PaxosConfigConsumer.actor.cpp + PaxosConfigConsumer.h + ProxyCommitData.actor.h + pubsub.actor.cpp + pubsub.h + QuietDatabase.actor.cpp + QuietDatabase.h + RadixTree.h + Ratekeeper.actor.cpp + Ratekeeper.h + RatekeeperInterface.h + RecoveryState.h + RemoteIKeyValueStore.actor.h + RemoteIKeyValueStore.actor.cpp + ResolutionBalancer.actor.cpp + ResolutionBalancer.actor.h + Resolver.actor.cpp + ResolverInterface.h + RestoreApplier.actor.cpp + RestoreApplier.actor.h + RestoreCommon.actor.cpp + RestoreCommon.actor.h + RestoreController.actor.cpp + RestoreController.actor.h + RestoreLoader.actor.cpp + RestoreLoader.actor.h + RestoreRoleCommon.actor.cpp + RestoreRoleCommon.actor.h + RestoreUtil.actor.cpp + RestoreUtil.h + RestoreWorker.actor.cpp + RestoreWorker.actor.h + RestoreWorkerInterface.actor.cpp + RestoreWorkerInterface.actor.h + RkTagThrottleCollection.cpp + RkTagThrottleCollection.h + RocksDBCheckpointUtils.actor.cpp + RocksDBCheckpointUtils.actor.h + RoleLineage.actor.cpp + RoleLineage.actor.h + ServerCheckpoint.actor.cpp + ServerCheckpoint.actor.h + ServerDBInfo.actor.h + ServerDBInfo.h + SigStack.cpp + SimKmsConnector.actor.h + SimKmsConnector.actor.cpp + SimpleConfigConsumer.actor.cpp + SimpleConfigConsumer.h + SimulatedCluster.actor.cpp + SimulatedCluster.h + SkipList.cpp + SpanContextMessage.h + Status.actor.cpp + Status.h + StorageCache.actor.cpp + StorageMetrics.actor.h + StorageMetrics.h + storageserver.actor.cpp + TagPartitionedLogSystem.actor.cpp + TagPartitionedLogSystem.actor.h + TagThrottler.actor.cpp + TagThrottler.h + TCInfo.actor.cpp + TCInfo.h + template_fdb.h + tester.actor.cpp + TesterInterface.actor.h + TLogInterface.h + TLogServer.actor.cpp + TransactionTagCounter.cpp + TransactionTagCounter.h + TSSMappingUtil.actor.cpp + TSSMappingUtil.actor.h + VersionedBTree.actor.cpp + VFSAsync.cpp + VFSAsync.h + WaitFailure.actor.cpp + WaitFailure.h + worker.actor.cpp + WorkerInterface.actor.h + workloads/ApiCorrectness.actor.cpp + workloads/ApiWorkload.actor.cpp + workloads/ApiWorkload.h + workloads/AsyncFile.actor.h + workloads/AsyncFile.cpp + workloads/AsyncFileCorrectness.actor.cpp + workloads/AsyncFileRead.actor.cpp + workloads/AsyncFileWrite.actor.cpp + workloads/AtomicOps.actor.cpp + workloads/AtomicOpsApiCorrectness.actor.cpp + workloads/AtomicRestore.actor.cpp + workloads/AtomicSwitchover.actor.cpp + workloads/BackgroundSelectors.actor.cpp + workloads/BackupAndParallelRestoreCorrectness.actor.cpp + workloads/BackupCorrectness.actor.cpp + workloads/BackupToBlob.actor.cpp + workloads/BackupToDBAbort.actor.cpp + workloads/BackupToDBCorrectness.actor.cpp + workloads/BackupToDBUpgrade.actor.cpp + workloads/BlobGranuleCorrectnessWorkload.actor.cpp + workloads/BlobGranuleVerifier.actor.cpp + workloads/BlobStoreWorkload.h + workloads/BulkLoad.actor.cpp + workloads/BulkSetup.actor.h + workloads/Cache.actor.cpp + workloads/ChangeConfig.actor.cpp + workloads/ChangeFeeds.actor.cpp + workloads/ClearSingleRange.actor.cpp + workloads/ClientTransactionProfileCorrectness.actor.cpp + workloads/ClientWorkload.actor.cpp + workloads/ClogSingleConnection.actor.cpp + workloads/CommitBugCheck.actor.cpp + workloads/ConfigIncrement.actor.cpp + workloads/ConfigureDatabase.actor.cpp + workloads/ConflictRange.actor.cpp + workloads/ConsistencyCheck.actor.cpp + workloads/CpuProfiler.actor.cpp + workloads/Cycle.actor.cpp + workloads/DataDistributionMetrics.actor.cpp + workloads/DataLossRecovery.actor.cpp + workloads/DDBalance.actor.cpp + workloads/DDMetrics.actor.cpp + workloads/DDMetricsExclude.actor.cpp + workloads/DifferentClustersSameRV.actor.cpp + workloads/DiskDurability.actor.cpp + workloads/DiskDurabilityTest.actor.cpp + workloads/DiskFailureInjection.actor.cpp + workloads/DummyWorkload.actor.cpp + workloads/EncryptionOps.actor.cpp + workloads/EncryptKeyProxyTest.actor.cpp + workloads/ExternalWorkload.actor.cpp + workloads/FastTriggeredWatches.actor.cpp + workloads/FileSystem.actor.cpp + workloads/Fuzz.cpp + workloads/FuzzApiCorrectness.actor.cpp + workloads/GetMappedRange.actor.cpp + workloads/GetRangeStream.actor.cpp + workloads/HealthMetricsApi.actor.cpp + workloads/HighContentionPrefixAllocatorWorkload.actor.cpp + workloads/Increment.actor.cpp + workloads/IncrementalBackup.actor.cpp + workloads/IndexScan.actor.cpp + workloads/Inventory.actor.cpp + workloads/KillRegion.actor.cpp + workloads/KVStoreTest.actor.cpp + workloads/LocalRatekeeper.actor.cpp + workloads/LockDatabase.actor.cpp + workloads/LockDatabaseFrequently.actor.cpp + workloads/LogMetrics.actor.cpp + workloads/LowLatency.actor.cpp + workloads/MachineAttrition.actor.cpp + workloads/Mako.actor.cpp + workloads/MemoryKeyValueStore.cpp + workloads/MemoryKeyValueStore.h + workloads/MemoryLifetime.actor.cpp + workloads/MetricLogging.actor.cpp + workloads/MiniCycle.actor.cpp + workloads/MutationLogReaderCorrectness.actor.cpp + workloads/ParallelRestore.actor.cpp + workloads/Performance.actor.cpp + workloads/PhysicalShardMove.actor.cpp + workloads/Ping.actor.cpp + workloads/PopulateTPCC.actor.cpp + workloads/PrivateEndpoints.actor.cpp + workloads/ProtocolVersion.actor.cpp + workloads/PubSubMultiples.actor.cpp + workloads/QueuePush.actor.cpp + workloads/RandomClogging.actor.cpp + workloads/RandomMoveKeys.actor.cpp + workloads/RandomSelector.actor.cpp + workloads/ReadAfterWrite.actor.cpp + workloads/ReadHotDetection.actor.cpp + workloads/ReadWrite.actor.cpp + workloads/RemoveServersSafely.actor.cpp + workloads/ReportConflictingKeys.actor.cpp + workloads/RestoreBackup.actor.cpp + workloads/RestoreFromBlob.actor.cpp + workloads/Rollback.actor.cpp + workloads/RyowCorrectness.actor.cpp + workloads/RYWDisable.actor.cpp + workloads/RYWPerformance.actor.cpp + workloads/SaveAndKill.actor.cpp + workloads/SelectorCorrectness.actor.cpp + workloads/Serializability.actor.cpp + workloads/Sideband.actor.cpp + workloads/SidebandSingle.actor.cpp + workloads/SimpleAtomicAdd.actor.cpp + workloads/SkewedReadWrite.actor.cpp + workloads/SlowTaskWorkload.actor.cpp + workloads/SnapTest.actor.cpp + workloads/SpecialKeySpaceCorrectness.actor.cpp + workloads/StatusWorkload.actor.cpp + workloads/Storefront.actor.cpp + workloads/StreamingRangeRead.actor.cpp + workloads/StreamingRead.actor.cpp + workloads/SubmitBackup.actor.cpp + workloads/SuspendProcesses.actor.cpp + workloads/TagThrottleApi.actor.cpp + workloads/TargetedKill.actor.cpp + workloads/TaskBucketCorrectness.actor.cpp + workloads/TenantManagement.actor.cpp + workloads/ThreadSafety.actor.cpp + workloads/Throttling.actor.cpp + workloads/Throughput.actor.cpp + workloads/TimeKeeperCorrectness.actor.cpp + workloads/TPCC.actor.cpp + workloads/TPCCWorkload.h + workloads/TriggerRecovery.actor.cpp + workloads/UDPWorkload.actor.cpp + workloads/UnitPerf.actor.cpp + workloads/UnitTests.actor.cpp + workloads/Unreadable.actor.cpp + workloads/VersionStamp.actor.cpp + workloads/WatchAndWait.actor.cpp + workloads/Watches.actor.cpp + workloads/WatchesSameKeyCorrectness.actor.cpp + workloads/WorkerErrors.actor.cpp + workloads/workloads.actor.h + workloads/WriteBandwidth.actor.cpp + workloads/WriteDuringRead.actor.cpp + workloads/WriteTagThrottling.actor.cpp + ) -if(${COROUTINE_IMPL} STREQUAL libcoro) - list(APPEND FDBSERVER_SRCS CoroFlowCoro.actor.cpp) -else() - list(APPEND FDBSERVER_SRCS CoroFlow.actor.cpp) -endif() +if (${COROUTINE_IMPL} STREQUAL libcoro) + list(APPEND FDBSERVER_SRCS CoroFlowCoro.actor.cpp) +else () + list(APPEND FDBSERVER_SRCS CoroFlow.actor.cpp) +endif () add_library(fdb_sqlite STATIC - sqlite/btree.h - sqlite/hash.h - sqlite/sqlite3.h - sqlite/sqlite3ext.h - sqlite/sqliteInt.h - sqlite/sqliteLimit.h - sqlite/sqlite3.amalgamation.c) + sqlite/btree.h + sqlite/hash.h + sqlite/sqlite3.h + sqlite/sqlite3ext.h + sqlite/sqliteInt.h + sqlite/sqliteLimit.h + sqlite/sqlite3.amalgamation.c) if (WITH_ROCKSDB_EXPERIMENTAL) - add_definitions(-DSSD_ROCKSDB_EXPERIMENTAL) + add_definitions(-DSSD_ROCKSDB_EXPERIMENTAL) - include(CompileRocksDB) - # CompileRocksDB sets `lz4_LIBRARIES` to be the shared lib, we want to link - # statically, so find the static library here. - find_library(lz4_STATIC_LIBRARIES - NAMES liblz4.a REQUIRED) - if (WITH_LIBURING) - find_package(uring) - endif() -endif() + include(CompileRocksDB) + # CompileRocksDB sets `lz4_LIBRARIES` to be the shared lib, we want to link + # statically, so find the static library here. + find_library(lz4_STATIC_LIBRARIES + NAMES liblz4.a REQUIRED) + if (WITH_LIBURING) + find_package(uring) + endif () +endif () # Suppress warnings in sqlite since it's third party -if(NOT WIN32) - target_compile_definitions(fdb_sqlite PRIVATE $<$:NDEBUG>) - target_compile_options(fdb_sqlite BEFORE PRIVATE -w) # disable warnings for third party -endif() +if (NOT WIN32) + target_compile_definitions(fdb_sqlite PRIVATE $<$:NDEBUG>) + target_compile_options(fdb_sqlite BEFORE PRIVATE -w) # disable warnings for third party +endif () file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/workloads) add_flow_target(EXECUTABLE NAME fdbserver SRCS ${FDBSERVER_SRCS}) target_include_directories(fdbserver PRIVATE - ${CMAKE_SOURCE_DIR}/bindings/c - ${CMAKE_BINARY_DIR}/bindings/c - ${CMAKE_CURRENT_BINARY_DIR}/workloads - ${CMAKE_CURRENT_SOURCE_DIR}/workloads) + ${CMAKE_SOURCE_DIR}/bindings/c + ${CMAKE_BINARY_DIR}/bindings/c + ${CMAKE_CURRENT_BINARY_DIR}/workloads + ${CMAKE_CURRENT_SOURCE_DIR}/workloads) if (WITH_ROCKSDB_EXPERIMENTAL) - add_dependencies(fdbserver rocksdb) - if(WITH_LIBURING) - target_include_directories(fdbserver PRIVATE ${ROCKSDB_INCLUDE_DIR} ${uring_INCLUDE_DIR}) - target_link_libraries(fdbserver PRIVATE fdbclient fdb_sqlite ${ROCKSDB_LIBRARIES} ${uring_LIBRARIES} ${lz4_STATIC_LIBRARIES}) - target_compile_definitions(fdbserver PRIVATE BOOST_ASIO_HAS_IO_URING=1 BOOST_ASIO_DISABLE_EPOLL=1) - else() - target_include_directories(fdbserver PRIVATE ${ROCKSDB_INCLUDE_DIR}) - target_link_libraries(fdbserver PRIVATE fdbclient fdb_sqlite ${ROCKSDB_LIBRARIES} ${lz4_STATIC_LIBRARIES}) - target_compile_definitions(fdbserver PRIVATE) - endif() -else() - target_link_libraries(fdbserver PRIVATE fdbclient fdb_sqlite) -endif() + add_dependencies(fdbserver rocksdb) + if (WITH_LIBURING) + target_include_directories(fdbserver PRIVATE ${ROCKSDB_INCLUDE_DIR} ${uring_INCLUDE_DIR}) + target_link_libraries(fdbserver PRIVATE fdbclient fdb_sqlite ${ROCKSDB_LIBRARIES} ${uring_LIBRARIES} ${lz4_STATIC_LIBRARIES}) + target_compile_definitions(fdbserver PRIVATE BOOST_ASIO_HAS_IO_URING=1 BOOST_ASIO_DISABLE_EPOLL=1) + else () + target_include_directories(fdbserver PRIVATE ${ROCKSDB_INCLUDE_DIR}) + target_link_libraries(fdbserver PRIVATE fdbclient fdb_sqlite ${ROCKSDB_LIBRARIES} ${lz4_STATIC_LIBRARIES}) + target_compile_definitions(fdbserver PRIVATE) + endif () +else () + target_link_libraries(fdbserver PRIVATE fdbclient fdb_sqlite) +endif () target_link_libraries(fdbserver PRIVATE toml11_target jemalloc) # target_compile_definitions(fdbserver PRIVATE -DENABLE_SAMPLING) if (GPERFTOOLS_FOUND) - target_link_libraries(fdbserver PRIVATE gperftools) -endif() + target_link_libraries(fdbserver PRIVATE gperftools) +endif () -if(NOT OPEN_FOR_IDE) - if(GENERATE_DEBUG_PACKAGES) - fdb_install(TARGETS fdbserver DESTINATION sbin COMPONENT server) - else() - add_custom_target(prepare_fdbserver_install ALL DEPENDS strip_only_fdbserver) - fdb_install(PROGRAMS ${CMAKE_BINARY_DIR}/packages/bin/fdbserver DESTINATION sbin COMPONENT server) - endif() -endif() +if (NOT OPEN_FOR_IDE) + if (GENERATE_DEBUG_PACKAGES) + fdb_install(TARGETS fdbserver DESTINATION sbin COMPONENT server) + else () + add_custom_target(prepare_fdbserver_install ALL DEPENDS strip_only_fdbserver) + fdb_install(PROGRAMS ${CMAKE_BINARY_DIR}/packages/bin/fdbserver DESTINATION sbin COMPONENT server) + endif () +endif () diff --git a/fdbserver/tester.actor.cpp b/fdbserver/tester.actor.cpp index 64b9924560..d03ea133cd 100644 --- a/fdbserver/tester.actor.cpp +++ b/fdbserver/tester.actor.cpp @@ -99,6 +99,20 @@ Key KVWorkload::keyForIndex(uint64_t index) const { } } +int64_t KVWorkload::indexForKey(const KeyRef& key, bool absent) const { + int idx = 0; + if (nodePrefix > 0) { + ASSERT(keyBytes >= 32); + idx += 16; + } + ASSERT(keyBytes >= 16); + // extract int64_t index, the reverse process of emplaceIndex() + auto end = key.size() - idx - (absent ? 1 : 0); + std::string str((char*)key.begin() + idx, end); + int64_t res = std::stoll(str, nullptr, 16); + return res; +} + Key KVWorkload::keyForIndex(uint64_t index, bool absent) const { int adjustedKeyBytes = (absent) ? (keyBytes + 1) : keyBytes; Key result = makeString(adjustedKeyBytes); @@ -112,8 +126,8 @@ Key KVWorkload::keyForIndex(uint64_t index, bool absent) const { idx += 16; } ASSERT(keyBytes >= 16); - double d = double(index) / nodeCount; - emplaceIndex(data, idx, *(int64_t*)&d); + emplaceIndex(data, idx, (int64_t)index); + // ASSERT(indexForKey(result) == (int64_t)index); // debug assert return result; } @@ -1855,7 +1869,9 @@ ACTOR Future runTests(Reference connRecord, } choose { - when(wait(tests)) { return Void(); } + when(wait(tests)) { + return Void(); + } when(wait(quorum(actors, 1))) { ASSERT(false); throw internal_error(); diff --git a/fdbserver/workloads/ReadWrite.actor.cpp b/fdbserver/workloads/ReadWrite.actor.cpp index 4741a01371..1a5c5e3e56 100644 --- a/fdbserver/workloads/ReadWrite.actor.cpp +++ b/fdbserver/workloads/ReadWrite.actor.cpp @@ -76,57 +76,64 @@ DESCR struct ReadMetric { }; struct ReadWriteWorkload : KVWorkload { + // general test setting + Standalone descriptionString; + bool doSetup, cancelWorkersAtDuration; + double testDuration, transactionsPerSecond, warmingDelay, maxInsertRate, debugInterval, debugTime; + double metricsStart, metricsDuration; + std::vector insertionCountsToMeasure; // measure the speed of sequential insertion when bulkSetup + + // test log setting + bool enableReadLatencyLogging; + double periodicLoggingInterval; + + // use ReadWrite as a ramp up workload + bool rampUpLoad; // indicate this is a ramp up workload + int rampSweepCount; // how many times of ramp up + bool rampTransactionType; // choose transaction type based on client start time + bool rampUpConcurrency; // control client concurrency + + // transaction setting + bool useRYW; + bool batchPriority; + bool rangeReads; // read operations are all single key range read + bool dependentReads; // read operations are issued sequentially + bool inconsistentReads; // read with previous read version + bool adjacentReads; // keys are adjacent within a transaction + bool adjacentWrites; + double alpha; // probability for run TransactionA type + // two type of transaction int readsPerTransactionA, writesPerTransactionA; int readsPerTransactionB, writesPerTransactionB; int extraReadConflictRangesPerTransaction, extraWriteConflictRangesPerTransaction; - double testDuration, transactionsPerSecond, alpha, warmingDelay, loadTime, maxInsertRate, debugInterval, debugTime; - double metricsStart, metricsDuration, clientBegin; std::string valueString; + // hot traffic pattern + double hotKeyFraction, forceHotProbability = 0; // key based hot traffic setting - bool dependentReads; - bool enableReadLatencyLogging; - double periodicLoggingInterval; - bool cancelWorkersAtDuration; - bool inconsistentReads; - bool adjacentReads; - bool adjacentWrites; - bool rampUpLoad; - int rampSweepCount; - double hotKeyFraction, forceHotProbability; - bool rangeReads; - bool useRYW; - bool rampTransactionType; - bool rampUpConcurrency; - bool batchPriority; - - Standalone descriptionString; - + // states of metric Int64MetricHandle totalReadsMetric; Int64MetricHandle totalRetriesMetric; EventMetricHandle transactionSuccessMetric; EventMetricHandle transactionFailureMetric; EventMetricHandle readMetric; - - std::vector> clients; PerfIntCounter aTransactions, bTransactions, retries; ContinuousSample latencies, readLatencies, commitLatencies, GRVLatencies, fullReadLatencies; double readLatencyTotal; int readLatencyCount; - - std::vector insertionCountsToMeasure; - std::vector> ratesAtKeyCounts; - std::vector periodicMetrics; + std::vector> ratesAtKeyCounts; // sequential insertion speed - bool doSetup; + // other internal states + std::vector> clients; + double loadTime, clientBegin; ReadWriteWorkload(WorkloadContext const& wcx) - : KVWorkload(wcx), loadTime(0.0), clientBegin(0), dependentReads(false), adjacentReads(false), - adjacentWrites(false), totalReadsMetric(LiteralStringRef("RWWorkload.TotalReads")), + : KVWorkload(wcx), dependentReads(false), adjacentReads(false), adjacentWrites(false), totalReadsMetric(LiteralStringRef("RWWorkload.TotalReads")), totalRetriesMetric(LiteralStringRef("RWWorkload.TotalRetries")), aTransactions("A Transactions"), - bTransactions("B Transactions"), retries("Retries"), latencies(sampleSize), readLatencies(sampleSize), - commitLatencies(sampleSize), GRVLatencies(sampleSize), fullReadLatencies(sampleSize), readLatencyTotal(0), - readLatencyCount(0) { + bTransactions("B Transactions"), retries("Retries"), + latencies(sampleSize), readLatencies(sampleSize), commitLatencies(sampleSize), GRVLatencies(sampleSize), + fullReadLatencies(sampleSize), readLatencyTotal(0), readLatencyCount(0), loadTime(0.0), + clientBegin(0) { transactionSuccessMetric.init(LiteralStringRef("RWWorkload.SuccessfulTransaction")); transactionFailureMetric.init(LiteralStringRef("RWWorkload.FailedTransaction")); readMetric.init(LiteralStringRef("RWWorkload.Read")); diff --git a/fdbserver/workloads/SkewedReadWrite.actor.cpp b/fdbserver/workloads/SkewedReadWrite.actor.cpp new file mode 100644 index 0000000000..6ea9d9b8aa --- /dev/null +++ b/fdbserver/workloads/SkewedReadWrite.actor.cpp @@ -0,0 +1,778 @@ +/* + * ReadWrite.actor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include "fdbrpc/ContinuousSample.h" +#include "fdbclient/NativeAPI.actor.h" +#include "fdbserver/TesterInterface.actor.h" +#include "fdbserver/WorkerInterface.actor.h" +#include "fdbserver/workloads/workloads.actor.h" +#include "fdbserver/workloads/BulkSetup.actor.h" +#include "fdbclient/ReadYourWrites.h" +#include "flow/TDMetric.actor.h" +#include "fdbclient/RunTransaction.actor.h" +#include "flow/actorcompiler.h" // This must be the last #include. + +const int sampleSize = 10000; +DESCR struct TransactionSuccessMetric { + int64_t totalLatency; // ns + int64_t startLatency; // ns + int64_t commitLatency; // ns + int64_t retries; // count +}; + +DESCR struct TransactionFailureMetric { + int64_t startLatency; // ns + int64_t errorCode; // flow error code +}; + +DESCR struct ReadMetric { + int64_t readLatency; // ns +}; + +struct SkewedReadWriteWorkload : KVWorkload { + // general test setting + Standalone descriptionString; + bool doSetup, cancelWorkersAtDuration; + double testDuration, transactionsPerSecond, warmingDelay, maxInsertRate, debugInterval, debugTime; + double metricsStart, metricsDuration; + std::vector insertionCountsToMeasure; // measure the speed of sequential insertion when bulkSetup + + // test log setting + bool enableReadLatencyLogging; + double periodicLoggingInterval; + + // transaction setting + bool useRYW; + double alpha; // probability for run TransactionA type + // two type of transaction + int readsPerTransactionA, writesPerTransactionA; + int readsPerTransactionB, writesPerTransactionB; + std::string valueString; + + // server based hot traffic setting + int skewRound = 0; // skewDuration = ceil(testDuration / skewRound) + double hotServerFraction = 0, hotServerShardFraction = 1.0; // set > 0 to issue hot key based on shard map + double hotServerReadFrac, hotServerWriteFrac; // hot many traffic goes to hot servers + double hotReadWriteServerOverlap; // the portion of intersection of write and hot server + + // hot server state + typedef std::vector> IndexRangeVec; + // keyForIndex generate key from index. So for a shard range, recording the start and end is enough + std::vector> serverShards; // storage server and the shards it owns + std::map serverInterfaces; + int hotServerCount = 0, currentHotRound = -1; + + // states of metric + Int64MetricHandle totalReadsMetric; + Int64MetricHandle totalRetriesMetric; + EventMetricHandle transactionSuccessMetric; + EventMetricHandle transactionFailureMetric; + EventMetricHandle readMetric; + PerfIntCounter aTransactions, bTransactions, retries; + ContinuousSample latencies, readLatencies, commitLatencies, GRVLatencies, fullReadLatencies; + double readLatencyTotal; + int readLatencyCount; + std::vector periodicMetrics; + std::vector> ratesAtKeyCounts; // sequential insertion speed + + // other internal states + std::vector> clients; + double loadTime, clientBegin; + + SkewedReadWriteWorkload(WorkloadContext const& wcx) + : KVWorkload(wcx), totalReadsMetric(LiteralStringRef("RWWorkload.TotalReads")), + totalRetriesMetric(LiteralStringRef("RWWorkload.TotalRetries")), aTransactions("A Transactions"), + bTransactions("B Transactions"), retries("Retries"), latencies(sampleSize), readLatencies(sampleSize), + commitLatencies(sampleSize), GRVLatencies(sampleSize), fullReadLatencies(sampleSize), readLatencyTotal(0), + readLatencyCount(0), loadTime(0.0), clientBegin(0) { + + transactionSuccessMetric.init(LiteralStringRef("RWWorkload.SuccessfulTransaction")); + transactionFailureMetric.init(LiteralStringRef("RWWorkload.FailedTransaction")); + readMetric.init(LiteralStringRef("RWWorkload.Read")); + + testDuration = getOption(options, LiteralStringRef("testDuration"), 10.0); + transactionsPerSecond = getOption(options, LiteralStringRef("transactionsPerSecond"), 5000.0) / clientCount; + double allowedLatency = getOption(options, LiteralStringRef("allowedLatency"), 0.250); + actorCount = ceil(transactionsPerSecond * allowedLatency); + actorCount = getOption(options, LiteralStringRef("actorCountPerTester"), actorCount); + + readsPerTransactionA = getOption(options, LiteralStringRef("readsPerTransactionA"), 10); + writesPerTransactionA = getOption(options, LiteralStringRef("writesPerTransactionA"), 0); + readsPerTransactionB = getOption(options, LiteralStringRef("readsPerTransactionB"), 1); + writesPerTransactionB = getOption(options, LiteralStringRef("writesPerTransactionB"), 9); + alpha = getOption(options, LiteralStringRef("alpha"), 0.1); + + valueString = std::string(maxValueBytes, '.'); + if (nodePrefix > 0) { + keyBytes += 16; + } + + metricsStart = getOption(options, LiteralStringRef("metricsStart"), 0.0); + metricsDuration = getOption(options, LiteralStringRef("metricsDuration"), testDuration); + if (getOption(options, LiteralStringRef("discardEdgeMeasurements"), true)) { + // discardEdgeMeasurements keeps the metrics from the middle 3/4 of the test + metricsStart += testDuration * 0.125; + metricsDuration *= 0.75; + } + + warmingDelay = getOption(options, LiteralStringRef("warmingDelay"), 0.0); + maxInsertRate = getOption(options, LiteralStringRef("maxInsertRate"), 1e12); + debugInterval = getOption(options, LiteralStringRef("debugInterval"), 0.0); + debugTime = getOption(options, LiteralStringRef("debugTime"), 0.0); + enableReadLatencyLogging = getOption(options, LiteralStringRef("enableReadLatencyLogging"), false); + periodicLoggingInterval = getOption(options, LiteralStringRef("periodicLoggingInterval"), 5.0); + cancelWorkersAtDuration = getOption(options, LiteralStringRef("cancelWorkersAtDuration"), true); + useRYW = getOption(options, LiteralStringRef("useRYW"), false); + doSetup = getOption(options, LiteralStringRef("setup"), true); + descriptionString = getOption(options, LiteralStringRef("description"), LiteralStringRef("SkewedReadWrite")); + + // Validate that keyForIndex() is monotonic + for (int i = 0; i < 30; i++) { + int64_t a = deterministicRandom()->randomInt64(0, nodeCount); + int64_t b = deterministicRandom()->randomInt64(0, nodeCount); + if (a > b) { + std::swap(a, b); + } + ASSERT(a <= b); + ASSERT((keyForIndex(a, false) <= keyForIndex(b, false))); + } + + std::vector insertionCountsToMeasureString = + getOption(options, LiteralStringRef("insertionCountsToMeasure"), std::vector()); + for (int i = 0; i < insertionCountsToMeasureString.size(); i++) { + try { + uint64_t count = boost::lexical_cast(insertionCountsToMeasureString[i]); + insertionCountsToMeasure.push_back(count); + } catch (...) { + } + } + + { + hotServerFraction = getOption(options, "hotServerFraction"_sr, 0.2); + hotServerShardFraction = getOption(options, "hotServerShardFraction"_sr, 1.0); + hotReadWriteServerOverlap = getOption(options, "hotReadWriteServerOverlap"_sr, 0.0); + skewRound = getOption(options, "skewRound"_sr, 1); + hotServerReadFrac = getOption(options, "hotServerReadFrac"_sr, 0.8); + hotServerWriteFrac = getOption(options, "hotServerWriteFrac"_sr, 0.0); + ASSERT((hotServerReadFrac >= hotServerFraction || hotServerWriteFrac >= hotServerFraction) && + skewRound > 0); + } + } + + std::string description() const override { return descriptionString.toString(); } + Future setup(Database const& cx) override { return _setup(cx, this); } + Future start(Database const& cx) override { return _start(cx, this); } + + ACTOR static Future traceDumpWorkers(Reference const> db) { + try { + loop { + choose { + when(wait(db->onChange())) {} + + when(ErrorOr> workerList = + wait(db->get().clusterInterface.getWorkers.tryGetReply(GetWorkersRequest()))) { + if (workerList.present()) { + std::vector>> dumpRequests; + dumpRequests.reserve(workerList.get().size()); + for (int i = 0; i < workerList.get().size(); i++) + dumpRequests.push_back(workerList.get()[i].interf.traceBatchDumpRequest.tryGetReply( + TraceBatchDumpRequest())); + wait(waitForAll(dumpRequests)); + return true; + } + wait(delay(1.0)); + } + } + } + } catch (Error& e) { + TraceEvent(SevError, "FailedToDumpWorkers").error(e); + throw; + } + } + + Future check(Database const& cx) override { + clients.clear(); + + if (!cancelWorkersAtDuration && now() < metricsStart + metricsDuration) + metricsDuration = now() - metricsStart; + + g_traceBatch.dump(); + if (clientId == 0) + return traceDumpWorkers(dbInfo); + else + return true; + } + + void getMetrics(std::vector& m) override { + double duration = metricsDuration; + int reads = + (aTransactions.getValue() * readsPerTransactionA) + (bTransactions.getValue() * readsPerTransactionB); + int writes = + (aTransactions.getValue() * writesPerTransactionA) + (bTransactions.getValue() * writesPerTransactionB); + m.emplace_back("Measured Duration", duration, Averaged::True); + m.emplace_back( + "Transactions/sec", (aTransactions.getValue() + bTransactions.getValue()) / duration, Averaged::False); + m.emplace_back("Operations/sec", ((reads + writes) / duration), Averaged::False); + m.push_back(aTransactions.getMetric()); + m.push_back(bTransactions.getMetric()); + m.push_back(retries.getMetric()); + m.emplace_back("Mean load time (seconds)", loadTime, Averaged::True); + m.emplace_back("Read rows", reads, Averaged::False); + m.emplace_back("Write rows", writes, Averaged::False); + m.emplace_back("Read rows/sec", reads / duration, Averaged::False); + m.emplace_back("Write rows/sec", writes / duration, Averaged::False); + m.emplace_back( + "Bytes read/sec", (reads * (keyBytes + (minValueBytes + maxValueBytes) * 0.5)) / duration, Averaged::False); + m.emplace_back("Bytes written/sec", + (writes * (keyBytes + (minValueBytes + maxValueBytes) * 0.5)) / duration, + Averaged::False); + m.insert(m.end(), periodicMetrics.begin(), periodicMetrics.end()); + + std::vector>::iterator ratesItr = ratesAtKeyCounts.begin(); + for (; ratesItr != ratesAtKeyCounts.end(); ratesItr++) + m.emplace_back(format("%lld keys imported bytes/sec", ratesItr->first), ratesItr->second, Averaged::False); + } + + Value randomValue() { + return StringRef((uint8_t*)valueString.c_str(), + deterministicRandom()->randomInt(minValueBytes, maxValueBytes + 1)); + } + + Standalone operator()(uint64_t n) { return KeyValueRef(keyForIndex(n, false), randomValue()); } + + void debugPrintServerShards() const { + std::cout << std::hex; + for (auto it : this->serverShards) { + std::cout << serverInterfaces.at(it.first).address().toString() << ": ["; + for (auto p : it.second) { + std::cout << "[" << p.first << "," << p.second << "], "; + } + std::cout << "] \n"; + } + } + + // for each boundary except the last one in boundaries, found the first existed key generated from keyForIndex as + // beginIdx, found the last existed key generated from keyForIndex the endIdx. + ACTOR static Future convertKeyBoundaryToIndexShard(Database cx, + SkewedReadWriteWorkload* self, + Standalone> boundaries) { + state IndexRangeVec res; + state int i = 0; + for (; i < boundaries.size() - 1; ++i) { + KeyRangeRef currentShard = KeyRangeRef(boundaries[i], boundaries[i + 1]); + // std::cout << currentShard.toString() << "\n"; + std::vector ranges = wait(runRYWTransaction( + cx, [currentShard](Reference tr) -> Future> { + std::vector> f; + f.push_back(tr->getRange(currentShard, 1, Snapshot::False, Reverse::False)); + f.push_back(tr->getRange(currentShard, 1, Snapshot::False, Reverse::True)); + return getAll(f); + })); + ASSERT(ranges[0].size() == 1 && ranges[1].size() == 1); + res.emplace_back(self->indexForKey(ranges[0][0].key), self->indexForKey(ranges[1][0].key)); + } + + ASSERT(res.size() == boundaries.size() - 1); + return res; + } + + ACTOR static Future updateServerShards(Database cx, SkewedReadWriteWorkload* self) { + state Future serverList = + runRYWTransaction(cx, [](Reference tr) -> Future { + tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); + return tr->getRange(serverListKeys, CLIENT_KNOBS->TOO_MANY); + }); + state RangeResult range = + wait(runRYWTransaction(cx, [](Reference tr) -> Future { + tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); + return tr->getRange(serverKeysRange, CLIENT_KNOBS->TOO_MANY); + })); + wait(success(serverList)); + // decode server interfaces + self->serverInterfaces.clear(); + for (int i = 0; i < serverList.get().size(); i++) { + auto ssi = decodeServerListValue(serverList.get()[i].value); + self->serverInterfaces.emplace(ssi.id(), ssi); + } + // clear self->serverShards + self->serverShards.clear(); + + // leftEdge < workloadBegin < workloadEnd + Key workloadBegin = self->keyForIndex(0), workloadEnd = self->keyForIndex(self->nodeCount); + Key leftEdge(allKeys.begin); + std::vector leftServer; // left server owns the range [leftEdge, workloadBegin) + KeyRangeRef workloadRange(workloadBegin, workloadEnd); + state std::map> beginServers; // begin index to server ID + + for (auto kv = range.begin(); kv != range.end(); kv++) { + if (serverHasKey(kv->value)) { + auto [id, key] = serverKeysDecodeServerBegin(kv->key); + + if (workloadRange.contains(key)) { + beginServers[key].push_back(id); + } else if (workloadBegin > key && key > leftEdge) { // update left boundary + leftEdge = key; + leftServer.clear(); + } + + if (key == leftEdge) { + leftServer.push_back(id); + } + } + } + ASSERT(beginServers.size() == 0 || beginServers.begin()->first >= workloadBegin); + // handle the left boundary + if (beginServers.size() == 0 || beginServers.begin()->first > workloadBegin) { + beginServers[workloadBegin] = leftServer; + } + Standalone> keyBegins; + for (auto p = beginServers.begin(); p != beginServers.end(); ++p) { + keyBegins.push_back(keyBegins.arena(), p->first); + } + // deep count because wait below will destruct workloadEnd + keyBegins.push_back_deep(keyBegins.arena(), workloadEnd); + + IndexRangeVec indexShards = wait(convertKeyBoundaryToIndexShard(cx, self, keyBegins)); + ASSERT(beginServers.size() == indexShards.size()); + // sort shard begin idx + // build self->serverShards, starting from the left shard + std::map serverShards; + int i = 0; + for (auto p = beginServers.begin(); p != beginServers.end(); ++p) { + for (int j = 0; j < p->second.size(); ++j) { + serverShards[p->second[j]].emplace_back(indexShards[i]); + } + ++i; + } + // self->serverShards is ordered by UID + for (auto it : serverShards) { + self->serverShards.emplace_back(it); + } + // if (self->clientId == 0) { + // self->debugPrintServerShards(); + // } + return Void(); + } + + ACTOR static Future tracePeriodically(SkewedReadWriteWorkload* self) { + state double start = now(); + state double elapsed = 0.0; + state int64_t last_ops = 0; + + loop { + elapsed += self->periodicLoggingInterval; + wait(delayUntil(start + elapsed)); + + TraceEvent((self->description() + "_RowReadLatency").c_str()) + .detail("Mean", self->readLatencies.mean()) + .detail("Median", self->readLatencies.median()) + .detail("Percentile5", self->readLatencies.percentile(.05)) + .detail("Percentile95", self->readLatencies.percentile(.95)) + .detail("Percentile99", self->readLatencies.percentile(.99)) + .detail("Percentile99_9", self->readLatencies.percentile(.999)) + .detail("Max", self->readLatencies.max()) + .detail("Count", self->readLatencyCount) + .detail("Elapsed", elapsed); + + TraceEvent((self->description() + "_GRVLatency").c_str()) + .detail("Mean", self->GRVLatencies.mean()) + .detail("Median", self->GRVLatencies.median()) + .detail("Percentile5", self->GRVLatencies.percentile(.05)) + .detail("Percentile95", self->GRVLatencies.percentile(.95)) + .detail("Percentile99", self->GRVLatencies.percentile(.99)) + .detail("Percentile99_9", self->GRVLatencies.percentile(.999)) + .detail("Max", self->GRVLatencies.max()); + + TraceEvent((self->description() + "_CommitLatency").c_str()) + .detail("Mean", self->commitLatencies.mean()) + .detail("Median", self->commitLatencies.median()) + .detail("Percentile5", self->commitLatencies.percentile(.05)) + .detail("Percentile95", self->commitLatencies.percentile(.95)) + .detail("Percentile99", self->commitLatencies.percentile(.99)) + .detail("Percentile99_9", self->commitLatencies.percentile(.999)) + .detail("Max", self->commitLatencies.max()); + + TraceEvent((self->description() + "_TotalLatency").c_str()) + .detail("Mean", self->latencies.mean()) + .detail("Median", self->latencies.median()) + .detail("Percentile5", self->latencies.percentile(.05)) + .detail("Percentile95", self->latencies.percentile(.95)) + .detail("Percentile99", self->latencies.percentile(.99)) + .detail("Percentile99_9", self->latencies.percentile(.999)) + .detail("Max", self->latencies.max()); + + int64_t ops = + (self->aTransactions.getValue() * (self->readsPerTransactionA + self->writesPerTransactionA)) + + (self->bTransactions.getValue() * (self->readsPerTransactionB + self->writesPerTransactionB)); + bool recordBegin = self->shouldRecord(std::max(now() - self->periodicLoggingInterval, self->clientBegin)); + bool recordEnd = self->shouldRecord(now()); + if (recordBegin && recordEnd) { + std::string ts = format("T=%04.0fs:", elapsed); + self->periodicMetrics.emplace_back( + ts + "Operations/sec", (ops - last_ops) / self->periodicLoggingInterval, Averaged::False); + + // if(self->rampUpLoad) { + self->periodicMetrics.emplace_back( + ts + "Mean Latency (ms)", 1000 * self->latencies.mean(), Averaged::True); + self->periodicMetrics.emplace_back( + ts + "Median Latency (ms, averaged)", 1000 * self->latencies.median(), Averaged::True); + self->periodicMetrics.emplace_back( + ts + "5% Latency (ms, averaged)", 1000 * self->latencies.percentile(.05), Averaged::True); + self->periodicMetrics.emplace_back( + ts + "95% Latency (ms, averaged)", 1000 * self->latencies.percentile(.95), Averaged::True); + + self->periodicMetrics.emplace_back( + ts + "Mean Row Read Latency (ms)", 1000 * self->readLatencies.mean(), Averaged::True); + self->periodicMetrics.emplace_back( + ts + "Median Row Read Latency (ms, averaged)", 1000 * self->readLatencies.median(), Averaged::True); + self->periodicMetrics.emplace_back(ts + "5% Row Read Latency (ms, averaged)", + 1000 * self->readLatencies.percentile(.05), + Averaged::True); + self->periodicMetrics.emplace_back(ts + "95% Row Read Latency (ms, averaged)", + 1000 * self->readLatencies.percentile(.95), + Averaged::True); + + self->periodicMetrics.emplace_back( + ts + "Mean Total Read Latency (ms)", 1000 * self->fullReadLatencies.mean(), Averaged::True); + self->periodicMetrics.emplace_back(ts + "Median Total Read Latency (ms, averaged)", + 1000 * self->fullReadLatencies.median(), + Averaged::True); + self->periodicMetrics.emplace_back(ts + "5% Total Read Latency (ms, averaged)", + 1000 * self->fullReadLatencies.percentile(.05), + Averaged::True); + self->periodicMetrics.emplace_back(ts + "95% Total Read Latency (ms, averaged)", + 1000 * self->fullReadLatencies.percentile(.95), + Averaged::True); + + self->periodicMetrics.emplace_back( + ts + "Mean GRV Latency (ms)", 1000 * self->GRVLatencies.mean(), Averaged::True); + self->periodicMetrics.emplace_back( + ts + "Median GRV Latency (ms, averaged)", 1000 * self->GRVLatencies.median(), Averaged::True); + self->periodicMetrics.emplace_back( + ts + "5% GRV Latency (ms, averaged)", 1000 * self->GRVLatencies.percentile(.05), Averaged::True); + self->periodicMetrics.emplace_back( + ts + "95% GRV Latency (ms, averaged)", 1000 * self->GRVLatencies.percentile(.95), Averaged::True); + + self->periodicMetrics.emplace_back( + ts + "Mean Commit Latency (ms)", 1000 * self->commitLatencies.mean(), Averaged::True); + self->periodicMetrics.emplace_back( + ts + "Median Commit Latency (ms, averaged)", 1000 * self->commitLatencies.median(), Averaged::True); + self->periodicMetrics.emplace_back(ts + "5% Commit Latency (ms, averaged)", + 1000 * self->commitLatencies.percentile(.05), + Averaged::True); + self->periodicMetrics.emplace_back(ts + "95% Commit Latency (ms, averaged)", + 1000 * self->commitLatencies.percentile(.95), + Averaged::True); + //} + + self->periodicMetrics.emplace_back( + ts + "Max Latency (ms, averaged)", 1000 * self->latencies.max(), Averaged::True); + self->periodicMetrics.emplace_back( + ts + "Max Row Read Latency (ms, averaged)", 1000 * self->readLatencies.max(), Averaged::True); + self->periodicMetrics.emplace_back( + ts + "Max Total Read Latency (ms, averaged)", 1000 * self->fullReadLatencies.max(), Averaged::True); + self->periodicMetrics.emplace_back( + ts + "Max GRV Latency (ms, averaged)", 1000 * self->GRVLatencies.max(), Averaged::True); + self->periodicMetrics.emplace_back( + ts + "Max Commit Latency (ms, averaged)", 1000 * self->commitLatencies.max(), Averaged::True); + } + last_ops = ops; + + // if(self->rampUpLoad) { + self->latencies.clear(); + self->readLatencies.clear(); + self->fullReadLatencies.clear(); + self->GRVLatencies.clear(); + self->commitLatencies.clear(); + //} + + self->readLatencyTotal = 0.0; + self->readLatencyCount = 0; + } + } + + ACTOR static Future logLatency(Future> f, + ContinuousSample* latencies, + double* totalLatency, + int* latencyCount, + EventMetricHandle readMetric, + bool shouldRecord) { + state double readBegin = now(); + Optional value = wait(f); + + double latency = now() - readBegin; + readMetric->readLatency = latency * 1e9; + readMetric->log(); + + if (shouldRecord) { + *totalLatency += latency; + ++*latencyCount; + latencies->addSample(latency); + } + return Void(); + } + + ACTOR template + Future readOp(Trans* tr, std::vector keys, SkewedReadWriteWorkload* self, bool shouldRecord) { + if (!keys.size()) + return Void(); + + std::vector> readers; + for (int op = 0; op < keys.size(); op++) { + ++self->totalReadsMetric; + readers.push_back(logLatency(tr->get(self->keyForIndex(keys[op])), + &self->readLatencies, + &self->readLatencyTotal, + &self->readLatencyCount, + self->readMetric, + shouldRecord)); + } + + wait(waitForAll(readers)); + return Void(); + } + + ACTOR static Future _setup(Database cx, SkewedReadWriteWorkload* self) { + if (!self->doSetup) + return Void(); + + state Promise loadTime; + state Promise>> ratesAtKeyCounts; + + wait(bulkSetup(cx, + self, + self->nodeCount, + loadTime, + self->insertionCountsToMeasure.empty(), + self->warmingDelay, + self->maxInsertRate, + self->insertionCountsToMeasure, + ratesAtKeyCounts)); + + self->loadTime = loadTime.getFuture().get(); + self->ratesAtKeyCounts = ratesAtKeyCounts.getFuture().get(); + + return Void(); + } + + void startReadWriteClients(Database cx, std::vector>& clients) { + clientBegin = now(); + for (int c = 0; c < actorCount; c++) { + Future worker; + if (useRYW) + worker = + randomReadWriteClient(cx, this, actorCount / transactionsPerSecond, c); + else + worker = randomReadWriteClient(cx, this, actorCount / transactionsPerSecond, c); + clients.push_back(worker); + } + } + + ACTOR static Future _start(Database cx, SkewedReadWriteWorkload* self) { + state std::vector> clients; + if (self->enableReadLatencyLogging) + clients.push_back(tracePeriodically(self)); + + wait(updateServerShards(cx, self)); + for (self->currentHotRound = 0; self->currentHotRound < self->skewRound; ++self->currentHotRound) { + self->setHotServers(); + self->startReadWriteClients(cx, clients); + wait(timeout(waitForAll(clients), self->testDuration / self->skewRound, Void())); + clients.clear(); + wait(delay(5.0) >> updateServerShards(cx, self)); + } + + return Void(); + } + + bool shouldRecord() { return shouldRecord(now()); } + + bool shouldRecord(double checkTime) { + double timeSinceStart = checkTime - clientBegin; + return timeSinceStart >= metricsStart && timeSinceStart < (metricsStart + metricsDuration); + } + + // calculate hot server count + void setHotServers() { + hotServerCount = ceil(hotServerFraction * serverShards.size()); + std::cout << "Choose " << hotServerCount << "/" << serverShards.size() << "/" << serverInterfaces.size() + << " hot servers: ["; + int begin = currentHotRound * hotServerCount; + for (int i = 0; i < hotServerCount; ++i) { + int idx = (begin + i) % serverShards.size(); + std::cout << serverInterfaces.at(serverShards[idx].first).address().toString() << ","; + } + std::cout << "]\n"; + } + + int64_t getRandomKeyFromHotServer(bool hotServerRead = true) { + ASSERT(hotServerCount > 0); + int begin = currentHotRound * hotServerCount; + if (!hotServerRead) { + begin += hotServerCount * (1.0 - hotReadWriteServerOverlap); // calculate non-overlap part offset + } + int idx = deterministicRandom()->randomInt(begin, begin + hotServerCount) % serverShards.size(); + int shardMax = std::min(serverShards[idx].second.size(), + (size_t)ceil(serverShards[idx].second.size() * hotServerShardFraction)); + int shardIdx = deterministicRandom()->randomInt(0, shardMax); + return deterministicRandom()->randomInt64(serverShards[idx].second[shardIdx].first, + serverShards[idx].second[shardIdx].second + 1); + } + + int64_t getRandomKey(uint64_t nodeCount, bool hotServerRead = true) { + auto random = deterministicRandom()->random01(); + if (hotServerFraction > 0) { + if ((hotServerRead && random < hotServerReadFrac) || (!hotServerRead && random < hotServerWriteFrac)) { + return getRandomKeyFromHotServer(hotServerRead); + } + } + return deterministicRandom()->randomInt64(0, nodeCount); + } + + ACTOR template + Future randomReadWriteClient(Database cx, SkewedReadWriteWorkload* self, double delay, int clientIndex) { + state double startTime = now(); + state double lastTime = now(); + state double GRVStartTime; + state UID debugID; + + loop { + wait(poisson(&lastTime, delay)); + + state double tstart = now(); + state bool aTransaction = deterministicRandom()->random01() > self->alpha; + + state std::vector keys; + state std::vector values; + state std::vector extra_ranges; + int reads = aTransaction ? self->readsPerTransactionA : self->readsPerTransactionB; + state int writes = aTransaction ? self->writesPerTransactionA : self->writesPerTransactionB; + for (int op = 0; op < reads; op++) + keys.push_back(self->getRandomKey(self->nodeCount)); + + values.reserve(writes); + for (int op = 0; op < writes; op++) + values.push_back(self->randomValue()); + + state Trans tr(cx); + + if (tstart - self->clientBegin > self->debugTime && + tstart - self->clientBegin <= self->debugTime + self->debugInterval) { + debugID = deterministicRandom()->randomUniqueID(); + tr.debugTransaction(debugID); + g_traceBatch.addEvent("TransactionDebug", debugID.first(), "ReadWrite.randomReadWriteClient.Before"); + } else { + debugID = UID(); + } + + self->transactionSuccessMetric->retries = 0; + self->transactionSuccessMetric->commitLatency = -1; + + loop { + try { + GRVStartTime = now(); + self->transactionFailureMetric->startLatency = -1; + + double grvLatency = now() - GRVStartTime; + self->transactionSuccessMetric->startLatency = grvLatency * 1e9; + self->transactionFailureMetric->startLatency = grvLatency * 1e9; + if (self->shouldRecord()) + self->GRVLatencies.addSample(grvLatency); + + state double readStart = now(); + wait(self->readOp(&tr, keys, self, self->shouldRecord())); + + double readLatency = now() - readStart; + if (self->shouldRecord()) + self->fullReadLatencies.addSample(readLatency); + + if (!writes) + break; + + for (int op = 0; op < writes; op++) + tr.set(self->keyForIndex(self->getRandomKey(self->nodeCount, false), false), values[op]); + + state double commitStart = now(); + wait(tr.commit()); + + double commitLatency = now() - commitStart; + self->transactionSuccessMetric->commitLatency = commitLatency * 1e9; + if (self->shouldRecord()) + self->commitLatencies.addSample(commitLatency); + + break; + } catch (Error& e) { + self->transactionFailureMetric->errorCode = e.code(); + self->transactionFailureMetric->log(); + + wait(tr.onError(e)); + + ++self->transactionSuccessMetric->retries; + ++self->totalRetriesMetric; + + if (self->shouldRecord()) + ++self->retries; + } + } + + if (debugID != UID()) + g_traceBatch.addEvent("TransactionDebug", debugID.first(), "ReadWrite.randomReadWriteClient.After"); + + tr = Trans(); + + double transactionLatency = now() - tstart; + self->transactionSuccessMetric->totalLatency = transactionLatency * 1e9; + self->transactionSuccessMetric->log(); + + if (self->shouldRecord()) { + if (aTransaction) + ++self->aTransactions; + else + ++self->bTransactions; + + self->latencies.addSample(transactionLatency); + } + } + } +}; + +WorkloadFactory SkewedReadWriteWorkloadFactory("SkewedReadWrite"); + +TEST_CASE("/KVWorkload/methods/ParseKeyForIndex") { + auto wk = SkewedReadWriteWorkload(WorkloadContext()); + for (int i = 0; i < 1000; ++i) { + auto idx = deterministicRandom()->randomInt64(0, wk.nodeCount); + Key k = wk.keyForIndex(idx); + auto parse = wk.indexForKey(k); + // std::cout << parse << " " << idx << "\n"; + ASSERT(parse == idx); + } + for (int i = 0; i < 1000; ++i) { + auto idx = deterministicRandom()->randomInt64(0, wk.nodeCount); + Key k = wk.keyForIndex(idx, true); + auto parse = wk.indexForKey(k, true); + ASSERT(parse == idx); + } + return Void(); +} \ No newline at end of file diff --git a/fdbserver/workloads/workloads.actor.h b/fdbserver/workloads/workloads.actor.h index bdbbd5707c..df08911fc7 100644 --- a/fdbserver/workloads/workloads.actor.h +++ b/fdbserver/workloads/workloads.actor.h @@ -131,6 +131,8 @@ struct KVWorkload : TestWorkload { Key getRandomKey(bool absent) const; Key keyForIndex(uint64_t index) const; Key keyForIndex(uint64_t index, bool absent) const; + // the reverse process of keyForIndex() without division. Set absent=true to ignore the last byte in Key + int64_t indexForKey(const KeyRef& key, bool absent = false) const; }; struct IWorkloadFactory : ReferenceCounted { From a94be36e03d499204933a0439bd6fb1619804b1a Mon Sep 17 00:00:00 2001 From: Xiaoxi Wang Date: Fri, 6 May 2022 09:22:58 -0700 Subject: [PATCH 2/8] add test spec file --- fdbserver/tester.actor.cpp | 4 +--- fdbserver/workloads/ReadWrite.actor.cpp | 10 +++++----- tests/CMakeLists.txt | 1 + tests/rare/ReadSkewReadWrite.toml | 24 ++++++++++++++++++++++++ 4 files changed, 31 insertions(+), 8 deletions(-) create mode 100644 tests/rare/ReadSkewReadWrite.toml diff --git a/fdbserver/tester.actor.cpp b/fdbserver/tester.actor.cpp index d03ea133cd..c4380af809 100644 --- a/fdbserver/tester.actor.cpp +++ b/fdbserver/tester.actor.cpp @@ -1869,9 +1869,7 @@ ACTOR Future runTests(Reference connRecord, } choose { - when(wait(tests)) { - return Void(); - } + when(wait(tests)) { return Void(); } when(wait(quorum(actors, 1))) { ASSERT(false); throw internal_error(); diff --git a/fdbserver/workloads/ReadWrite.actor.cpp b/fdbserver/workloads/ReadWrite.actor.cpp index 1a5c5e3e56..f149de94ea 100644 --- a/fdbserver/workloads/ReadWrite.actor.cpp +++ b/fdbserver/workloads/ReadWrite.actor.cpp @@ -128,12 +128,12 @@ struct ReadWriteWorkload : KVWorkload { double loadTime, clientBegin; ReadWriteWorkload(WorkloadContext const& wcx) - : KVWorkload(wcx), dependentReads(false), adjacentReads(false), adjacentWrites(false), totalReadsMetric(LiteralStringRef("RWWorkload.TotalReads")), + : KVWorkload(wcx), dependentReads(false), adjacentReads(false), adjacentWrites(false), + totalReadsMetric(LiteralStringRef("RWWorkload.TotalReads")), totalRetriesMetric(LiteralStringRef("RWWorkload.TotalRetries")), aTransactions("A Transactions"), - bTransactions("B Transactions"), retries("Retries"), - latencies(sampleSize), readLatencies(sampleSize), commitLatencies(sampleSize), GRVLatencies(sampleSize), - fullReadLatencies(sampleSize), readLatencyTotal(0), readLatencyCount(0), loadTime(0.0), - clientBegin(0) { + bTransactions("B Transactions"), retries("Retries"), latencies(sampleSize), readLatencies(sampleSize), + commitLatencies(sampleSize), GRVLatencies(sampleSize), fullReadLatencies(sampleSize), readLatencyTotal(0), + readLatencyCount(0), loadTime(0.0), clientBegin(0) { transactionSuccessMetric.init(LiteralStringRef("RWWorkload.SuccessfulTransaction")); transactionFailureMetric.init(LiteralStringRef("RWWorkload.FailedTransaction")); readMetric.init(LiteralStringRef("RWWorkload.Read")); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 6006da60c1..1b9089078f 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -195,6 +195,7 @@ if(WITH_PYTHON) endif() add_fdb_test(TEST_FILES rare/CheckRelocation.toml) add_fdb_test(TEST_FILES rare/ClogUnclog.toml) + add_fdb_test(TEST_FILES rare/ReadSkewReadWrite.toml) add_fdb_test(TEST_FILES rare/CloggedCycleWithKills.toml) add_fdb_test(TEST_FILES rare/ConfigIncrement.toml) add_fdb_test(TEST_FILES rare/ConfigIncrementWithKills.toml) diff --git a/tests/rare/ReadSkewReadWrite.toml b/tests/rare/ReadSkewReadWrite.toml new file mode 100644 index 0000000000..207ecd6c63 --- /dev/null +++ b/tests/rare/ReadSkewReadWrite.toml @@ -0,0 +1,24 @@ +[[test]] +testTitle = 'SkewedReadWriteTest' +connectionFailuresDisableDuration = 100000 +# waitForQuiescenceBegin= false +# waitForQuiescenceEnd=false +clearAfterTest = true +runSetup = true # false +timeout = 3600.0 + +[[test.workload]] +testName = 'SkewedReadWrite' +transactionsPerSecond = 100 +testDuration = 40.0 +skewRound = 1 +nodeCount = 3000 # 30000000 +valueBytes = 100 +readsPerTransactionA = 8 +writesPerTransactionA = 0 +alpha = 0 +discardEdgeMeasurements = false +hotServerFraction = 0.2 +hotServerReadFrac = 0.8 +# hotServerShardFraction = 0.3 +warmingDelay = 180.0 From 5bb70dda915e5d560428ddc9581f25c73b548ca5 Mon Sep 17 00:00:00 2001 From: Xiaoxi Wang Date: Fri, 6 May 2022 10:39:45 -0700 Subject: [PATCH 3/8] add remainedBytes method --- fdbclient/SystemData.cpp | 10 ++++------ flow/serialize.h | 9 +++++++-- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/fdbclient/SystemData.cpp b/fdbclient/SystemData.cpp index a84eae4f77..73baffd127 100644 --- a/fdbclient/SystemData.cpp +++ b/fdbclient/SystemData.cpp @@ -335,12 +335,10 @@ std::pair serverKeysDecodeServerBegin(const KeyRef& key) { BinaryReader rd(key.removePrefix(serverKeysPrefix), Unversioned()); rd >> server_id; rd.readBytes(1); // skip "/" - std::string bytes; - while (!rd.empty()) { - bytes.push_back((char)*rd.arenaRead(1)); - } - // std::cout << bytes.size() << " " < { From 78f819fb2ab530e2300800eb1235c7794bbbcc6d Mon Sep 17 00:00:00 2001 From: Xiaoxi Wang Date: Fri, 13 May 2022 11:36:03 -0700 Subject: [PATCH 4/8] Update flow/serialize.h Co-authored-by: Trevor Clinkenbeard --- flow/serialize.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow/serialize.h b/flow/serialize.h index d24d3bcaf3..4b331a73fc 100644 --- a/flow/serialize.h +++ b/flow/serialize.h @@ -635,7 +635,7 @@ public: } size_t size() const { return len; } - size_t remainedBytes() const { return end - begin; }; + size_t remainingBytes() const { return end - begin; }; protected: _Reader(const char* begin, const char* end) : begin(begin), end(end), len(end - begin) {} From 8014ac6baf7e7bc7cfda5947feddb662141844f0 Mon Sep 17 00:00:00 2001 From: Xiaoxi Wang Date: Fri, 13 May 2022 12:23:53 -0700 Subject: [PATCH 5/8] CMakeList.txt --- fdbserver/CMakeLists.txt | 1 + tests/CMakeLists.txt | 1 + 2 files changed, 2 insertions(+) diff --git a/fdbserver/CMakeLists.txt b/fdbserver/CMakeLists.txt index f47e003a67..03a51cb662 100644 --- a/fdbserver/CMakeLists.txt +++ b/fdbserver/CMakeLists.txt @@ -281,6 +281,7 @@ set(FDBSERVER_SRCS workloads/Sideband.actor.cpp workloads/SidebandSingle.actor.cpp workloads/SimpleAtomicAdd.actor.cpp + workloads/SkewedReadWrite.actor.cpp workloads/SlowTaskWorkload.actor.cpp workloads/SnapTest.actor.cpp workloads/SpecialKeySpaceCorrectness.actor.cpp diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 1489ecb97e..b409f6ae35 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -212,6 +212,7 @@ if(WITH_PYTHON) add_fdb_test(TEST_FILES rare/LargeApiCorrectnessStatus.toml) add_fdb_test(TEST_FILES rare/RYWDisable.toml) add_fdb_test(TEST_FILES rare/RandomReadWriteTest.toml) + add_fdb_test(TEST_FILES rare/ReadSkewReadWrite.toml) add_fdb_test(TEST_FILES rare/SpecificUnitTests.toml) add_fdb_test(TEST_FILES rare/SwizzledLargeApiCorrectness.toml) add_fdb_test(TEST_FILES rare/RedwoodCorrectnessBTree.toml) From b0c26e93b2ce7763b816a0d9f987a6db8a9c18fd Mon Sep 17 00:00:00 2001 From: Xiaoxi Wang Date: Fri, 13 May 2022 12:55:19 -0700 Subject: [PATCH 6/8] remove size() method --- fdbclient/SystemData.cpp | 4 ++-- flow/serialize.h | 7 ++----- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/fdbclient/SystemData.cpp b/fdbclient/SystemData.cpp index 73baffd127..2768a3e4c1 100644 --- a/fdbclient/SystemData.cpp +++ b/fdbclient/SystemData.cpp @@ -335,8 +335,8 @@ std::pair serverKeysDecodeServerBegin(const KeyRef& key) { BinaryReader rd(key.removePrefix(serverKeysPrefix), Unversioned()); rd >> server_id; rd.readBytes(1); // skip "/" - const auto remainedBytes = rd.remainedBytes(); - KeyRef ref = KeyRef(rd.arenaRead(remainedBytes), remainedBytes); + const auto remainingBytes = rd.remainingBytes(); + KeyRef ref = KeyRef(rd.arenaRead(remainingBytes), remainingBytes); // std::cout << ref.size() << " " << ref.toString() << std::endl; return std::make_pair(server_id, Key(ref)); } diff --git a/flow/serialize.h b/flow/serialize.h index 4b331a73fc..5c218b9bc6 100644 --- a/flow/serialize.h +++ b/flow/serialize.h @@ -634,19 +634,16 @@ public: check = nullptr; } - size_t size() const { return len; } size_t remainingBytes() const { return end - begin; }; protected: - _Reader(const char* begin, const char* end) : begin(begin), end(end), len(end - begin) {} - _Reader(const char* begin, const char* end, const Arena& arena) - : begin(begin), end(end), m_pool(arena), len(end - begin) {} + _Reader(const char* begin, const char* end) : begin(begin), end(end) {} + _Reader(const char* begin, const char* end, const Arena& arena) : begin(begin), end(end), m_pool(arena) {} const char *begin, *end; const char* check = nullptr; Arena m_pool; ProtocolVersion m_protocolVersion; - size_t len; }; class ArenaReader : public _Reader { From 4f3a7b7e7f41f80f04743c6da89e1df4565b4144 Mon Sep 17 00:00:00 2001 From: Xiaoxi Wang Date: Fri, 13 May 2022 15:26:39 -0700 Subject: [PATCH 7/8] refactor ReadWriteWorkload --- fdbserver/CMakeLists.txt | 1 + fdbserver/workloads/ReadWrite.actor.cpp | 572 +++++++----------- fdbserver/workloads/ReadWriteWorkload.actor.h | 171 ++++++ fdbserver/workloads/SkewedReadWrite.actor.cpp | 416 +------------ 4 files changed, 415 insertions(+), 745 deletions(-) create mode 100644 fdbserver/workloads/ReadWriteWorkload.actor.h diff --git a/fdbserver/CMakeLists.txt b/fdbserver/CMakeLists.txt index 03a51cb662..18a9e10b6d 100644 --- a/fdbserver/CMakeLists.txt +++ b/fdbserver/CMakeLists.txt @@ -267,6 +267,7 @@ set(FDBSERVER_SRCS workloads/ReadAfterWrite.actor.cpp workloads/ReadHotDetection.actor.cpp workloads/ReadWrite.actor.cpp + workloads/ReadWriteWorkload.actor.h workloads/RemoveServersSafely.actor.cpp workloads/ReportConflictingKeys.actor.cpp workloads/RestoreBackup.actor.cpp diff --git a/fdbserver/workloads/ReadWrite.actor.cpp b/fdbserver/workloads/ReadWrite.actor.cpp index f149de94ea..475c3a023c 100644 --- a/fdbserver/workloads/ReadWrite.actor.cpp +++ b/fdbserver/workloads/ReadWrite.actor.cpp @@ -28,209 +28,13 @@ #include "fdbserver/WorkerInterface.actor.h" #include "fdbserver/workloads/workloads.actor.h" #include "fdbserver/workloads/BulkSetup.actor.h" +#include "fdbserver/workloads/ReadWriteWorkload.actor.h" #include "fdbclient/ReadYourWrites.h" #include "flow/TDMetric.actor.h" #include "flow/actorcompiler.h" // This must be the last #include. -const int sampleSize = 10000; -static Future nextRV; -static Version lastRV = invalidVersion; - -ACTOR static Future getNextRV(Database db) { - state Transaction tr(db); - loop { - try { - Version v = wait(tr.getReadVersion()); - return v; - } catch (Error& e) { - wait(tr.onError(e)); - } - } -} -static Future getInconsistentReadVersion(Database const& db) { - if (!nextRV.isValid() || nextRV.isReady()) { // if no getNextRV() running - if (nextRV.isValid()) - lastRV = nextRV.get(); - nextRV = getNextRV(db); - } - if (lastRV == invalidVersion) - return nextRV; - else - return lastRV; -} - -DESCR struct TransactionSuccessMetric { - int64_t totalLatency; // ns - int64_t startLatency; // ns - int64_t commitLatency; // ns - int64_t retries; // count -}; - -DESCR struct TransactionFailureMetric { - int64_t startLatency; // ns - int64_t errorCode; // flow error code -}; - -DESCR struct ReadMetric { - int64_t readLatency; // ns -}; - -struct ReadWriteWorkload : KVWorkload { - // general test setting - Standalone descriptionString; - bool doSetup, cancelWorkersAtDuration; - double testDuration, transactionsPerSecond, warmingDelay, maxInsertRate, debugInterval, debugTime; - double metricsStart, metricsDuration; - std::vector insertionCountsToMeasure; // measure the speed of sequential insertion when bulkSetup - - // test log setting - bool enableReadLatencyLogging; - double periodicLoggingInterval; - - // use ReadWrite as a ramp up workload - bool rampUpLoad; // indicate this is a ramp up workload - int rampSweepCount; // how many times of ramp up - bool rampTransactionType; // choose transaction type based on client start time - bool rampUpConcurrency; // control client concurrency - - // transaction setting - bool useRYW; - bool batchPriority; - bool rangeReads; // read operations are all single key range read - bool dependentReads; // read operations are issued sequentially - bool inconsistentReads; // read with previous read version - bool adjacentReads; // keys are adjacent within a transaction - bool adjacentWrites; - double alpha; // probability for run TransactionA type - // two type of transaction - int readsPerTransactionA, writesPerTransactionA; - int readsPerTransactionB, writesPerTransactionB; - int extraReadConflictRangesPerTransaction, extraWriteConflictRangesPerTransaction; - std::string valueString; - // hot traffic pattern - double hotKeyFraction, forceHotProbability = 0; // key based hot traffic setting - - // states of metric - Int64MetricHandle totalReadsMetric; - Int64MetricHandle totalRetriesMetric; - EventMetricHandle transactionSuccessMetric; - EventMetricHandle transactionFailureMetric; - EventMetricHandle readMetric; - PerfIntCounter aTransactions, bTransactions, retries; - ContinuousSample latencies, readLatencies, commitLatencies, GRVLatencies, fullReadLatencies; - double readLatencyTotal; - int readLatencyCount; - std::vector periodicMetrics; - std::vector> ratesAtKeyCounts; // sequential insertion speed - - // other internal states - std::vector> clients; - double loadTime, clientBegin; - - ReadWriteWorkload(WorkloadContext const& wcx) - : KVWorkload(wcx), dependentReads(false), adjacentReads(false), adjacentWrites(false), - totalReadsMetric(LiteralStringRef("RWWorkload.TotalReads")), - totalRetriesMetric(LiteralStringRef("RWWorkload.TotalRetries")), aTransactions("A Transactions"), - bTransactions("B Transactions"), retries("Retries"), latencies(sampleSize), readLatencies(sampleSize), - commitLatencies(sampleSize), GRVLatencies(sampleSize), fullReadLatencies(sampleSize), readLatencyTotal(0), - readLatencyCount(0), loadTime(0.0), clientBegin(0) { - transactionSuccessMetric.init(LiteralStringRef("RWWorkload.SuccessfulTransaction")); - transactionFailureMetric.init(LiteralStringRef("RWWorkload.FailedTransaction")); - readMetric.init(LiteralStringRef("RWWorkload.Read")); - - testDuration = getOption(options, LiteralStringRef("testDuration"), 10.0); - transactionsPerSecond = getOption(options, LiteralStringRef("transactionsPerSecond"), 5000.0) / clientCount; - double allowedLatency = getOption(options, LiteralStringRef("allowedLatency"), 0.250); - actorCount = ceil(transactionsPerSecond * allowedLatency); - actorCount = getOption(options, LiteralStringRef("actorCountPerTester"), actorCount); - - readsPerTransactionA = getOption(options, LiteralStringRef("readsPerTransactionA"), 10); - writesPerTransactionA = getOption(options, LiteralStringRef("writesPerTransactionA"), 0); - readsPerTransactionB = getOption(options, LiteralStringRef("readsPerTransactionB"), 1); - writesPerTransactionB = getOption(options, LiteralStringRef("writesPerTransactionB"), 9); - alpha = getOption(options, LiteralStringRef("alpha"), 0.1); - - extraReadConflictRangesPerTransaction = - getOption(options, LiteralStringRef("extraReadConflictRangesPerTransaction"), 0); - extraWriteConflictRangesPerTransaction = - getOption(options, LiteralStringRef("extraWriteConflictRangesPerTransaction"), 0); - - valueString = std::string(maxValueBytes, '.'); - if (nodePrefix > 0) { - keyBytes += 16; - } - - metricsStart = getOption(options, LiteralStringRef("metricsStart"), 0.0); - metricsDuration = getOption(options, LiteralStringRef("metricsDuration"), testDuration); - if (getOption(options, LiteralStringRef("discardEdgeMeasurements"), true)) { - // discardEdgeMeasurements keeps the metrics from the middle 3/4 of the test - metricsStart += testDuration * 0.125; - metricsDuration *= 0.75; - } - - dependentReads = getOption(options, LiteralStringRef("dependentReads"), false); - warmingDelay = getOption(options, LiteralStringRef("warmingDelay"), 0.0); - maxInsertRate = getOption(options, LiteralStringRef("maxInsertRate"), 1e12); - debugInterval = getOption(options, LiteralStringRef("debugInterval"), 0.0); - debugTime = getOption(options, LiteralStringRef("debugTime"), 0.0); - enableReadLatencyLogging = getOption(options, LiteralStringRef("enableReadLatencyLogging"), false); - periodicLoggingInterval = getOption(options, LiteralStringRef("periodicLoggingInterval"), 5.0); - cancelWorkersAtDuration = getOption(options, LiteralStringRef("cancelWorkersAtDuration"), true); - inconsistentReads = getOption(options, LiteralStringRef("inconsistentReads"), false); - adjacentReads = getOption(options, LiteralStringRef("adjacentReads"), false); - adjacentWrites = getOption(options, LiteralStringRef("adjacentWrites"), false); - rampUpLoad = getOption(options, LiteralStringRef("rampUpLoad"), false); - useRYW = getOption(options, LiteralStringRef("useRYW"), false); - rampSweepCount = getOption(options, LiteralStringRef("rampSweepCount"), 1); - rangeReads = getOption(options, LiteralStringRef("rangeReads"), false); - rampTransactionType = getOption(options, LiteralStringRef("rampTransactionType"), false); - rampUpConcurrency = getOption(options, LiteralStringRef("rampUpConcurrency"), false); - doSetup = getOption(options, LiteralStringRef("setup"), true); - batchPriority = getOption(options, LiteralStringRef("batchPriority"), false); - descriptionString = getOption(options, LiteralStringRef("description"), LiteralStringRef("ReadWrite")); - - if (rampUpConcurrency) - ASSERT(rampSweepCount == 2); // Implementation is hard coded to ramp up and down - - // Validate that keyForIndex() is monotonic - for (int i = 0; i < 30; i++) { - int64_t a = deterministicRandom()->randomInt64(0, nodeCount); - int64_t b = deterministicRandom()->randomInt64(0, nodeCount); - if (a > b) { - std::swap(a, b); - } - ASSERT(a <= b); - ASSERT((keyForIndex(a, false) <= keyForIndex(b, false))); - } - - std::vector insertionCountsToMeasureString = - getOption(options, LiteralStringRef("insertionCountsToMeasure"), std::vector()); - for (int i = 0; i < insertionCountsToMeasureString.size(); i++) { - try { - uint64_t count = boost::lexical_cast(insertionCountsToMeasureString[i]); - insertionCountsToMeasure.push_back(count); - } catch (...) { - } - } - - { - // with P(hotTrafficFraction) an access is directed to one of a fraction - // of hot keys, else it is directed to a disjoint set of cold keys - hotKeyFraction = getOption(options, LiteralStringRef("hotKeyFraction"), 0.0); - double hotTrafficFraction = getOption(options, LiteralStringRef("hotTrafficFraction"), 0.0); - ASSERT(hotKeyFraction >= 0 && hotTrafficFraction <= 1); - ASSERT(hotKeyFraction <= hotTrafficFraction); // hot keys should be actually hot! - // p(Cold key) = (1-FHP) * (1-hkf) - // p(Cold key) = (1-htf) - // solving for FHP gives: - forceHotProbability = (hotTrafficFraction - hotKeyFraction) / (1 - hotKeyFraction); - } - } - - std::string description() const override { return descriptionString.toString(); } - Future setup(Database const& cx) override { return _setup(cx, this); } - Future start(Database const& cx) override { return _start(cx, this); } - +struct ReadWriteCommonImpl { + // trace methods ACTOR static Future traceDumpWorkers(Reference const> db) { try { loop { @@ -257,91 +61,7 @@ struct ReadWriteWorkload : KVWorkload { throw; } } - - Future check(Database const& cx) override { - clients.clear(); - - if (!cancelWorkersAtDuration && now() < metricsStart + metricsDuration) - metricsDuration = now() - metricsStart; - - g_traceBatch.dump(); - if (clientId == 0) - return traceDumpWorkers(dbInfo); - else - return true; - } - - void getMetrics(std::vector& m) override { - double duration = metricsDuration; - int reads = - (aTransactions.getValue() * readsPerTransactionA) + (bTransactions.getValue() * readsPerTransactionB); - int writes = - (aTransactions.getValue() * writesPerTransactionA) + (bTransactions.getValue() * writesPerTransactionB); - m.emplace_back("Measured Duration", duration, Averaged::True); - m.emplace_back( - "Transactions/sec", (aTransactions.getValue() + bTransactions.getValue()) / duration, Averaged::False); - m.emplace_back("Operations/sec", ((reads + writes) / duration), Averaged::False); - m.push_back(aTransactions.getMetric()); - m.push_back(bTransactions.getMetric()); - m.push_back(retries.getMetric()); - m.emplace_back("Mean load time (seconds)", loadTime, Averaged::True); - m.emplace_back("Read rows", reads, Averaged::False); - m.emplace_back("Write rows", writes, Averaged::False); - - if (!rampUpLoad) { - m.emplace_back("Mean Latency (ms)", 1000 * latencies.mean(), Averaged::True); - m.emplace_back("Median Latency (ms, averaged)", 1000 * latencies.median(), Averaged::True); - m.emplace_back("90% Latency (ms, averaged)", 1000 * latencies.percentile(0.90), Averaged::True); - m.emplace_back("98% Latency (ms, averaged)", 1000 * latencies.percentile(0.98), Averaged::True); - m.emplace_back("Max Latency (ms, averaged)", 1000 * latencies.max(), Averaged::True); - - m.emplace_back("Mean Row Read Latency (ms)", 1000 * readLatencies.mean(), Averaged::True); - m.emplace_back("Median Row Read Latency (ms, averaged)", 1000 * readLatencies.median(), Averaged::True); - m.emplace_back("Max Row Read Latency (ms, averaged)", 1000 * readLatencies.max(), Averaged::True); - - m.emplace_back("Mean Total Read Latency (ms)", 1000 * fullReadLatencies.mean(), Averaged::True); - m.emplace_back( - "Median Total Read Latency (ms, averaged)", 1000 * fullReadLatencies.median(), Averaged::True); - m.emplace_back("Max Total Latency (ms, averaged)", 1000 * fullReadLatencies.max(), Averaged::True); - - m.emplace_back("Mean GRV Latency (ms)", 1000 * GRVLatencies.mean(), Averaged::True); - m.emplace_back("Median GRV Latency (ms, averaged)", 1000 * GRVLatencies.median(), Averaged::True); - m.emplace_back("Max GRV Latency (ms, averaged)", 1000 * GRVLatencies.max(), Averaged::True); - - m.emplace_back("Mean Commit Latency (ms)", 1000 * commitLatencies.mean(), Averaged::True); - m.emplace_back("Median Commit Latency (ms, averaged)", 1000 * commitLatencies.median(), Averaged::True); - m.emplace_back("Max Commit Latency (ms, averaged)", 1000 * commitLatencies.max(), Averaged::True); - } - - m.emplace_back("Read rows/sec", reads / duration, Averaged::False); - m.emplace_back("Write rows/sec", writes / duration, Averaged::False); - m.emplace_back( - "Bytes read/sec", (reads * (keyBytes + (minValueBytes + maxValueBytes) * 0.5)) / duration, Averaged::False); - m.emplace_back("Bytes written/sec", - (writes * (keyBytes + (minValueBytes + maxValueBytes) * 0.5)) / duration, - Averaged::False); - m.insert(m.end(), periodicMetrics.begin(), periodicMetrics.end()); - - std::vector>::iterator ratesItr = ratesAtKeyCounts.begin(); - for (; ratesItr != ratesAtKeyCounts.end(); ratesItr++) - m.emplace_back(format("%lld keys imported bytes/sec", ratesItr->first), ratesItr->second, Averaged::False); - } - - Value randomValue() { - return StringRef((uint8_t*)valueString.c_str(), - deterministicRandom()->randomInt(minValueBytes, maxValueBytes + 1)); - } - - Standalone operator()(uint64_t n) { return KeyValueRef(keyForIndex(n, false), randomValue()); } - - template - void setupTransaction(Trans* tr) { - if (batchPriority) { - tr->setOption(FDBTransactionOptions::PRIORITY_BATCH); - } - } - - ACTOR static Future tracePeriodically(ReadWriteWorkload* self) { + ACTOR static Future tracePeriodically(ReadWriteCommon* self) { state double start = now(); state double elapsed = 0.0; state int64_t last_ops = 0; @@ -477,7 +197,6 @@ struct ReadWriteWorkload : KVWorkload { self->readLatencyCount = 0; } } - ACTOR static Future logLatency(Future> f, ContinuousSample* latencies, double* totalLatency, @@ -498,7 +217,6 @@ struct ReadWriteWorkload : KVWorkload { } return Void(); } - ACTOR static Future logLatency(Future f, ContinuousSample* latencies, double* totalLatency, @@ -520,52 +238,7 @@ struct ReadWriteWorkload : KVWorkload { return Void(); } - ACTOR template - Future readOp(Trans* tr, std::vector keys, ReadWriteWorkload* self, bool shouldRecord) { - if (!keys.size()) - return Void(); - if (!self->dependentReads) { - std::vector> readers; - if (self->rangeReads) { - for (int op = 0; op < keys.size(); op++) { - ++self->totalReadsMetric; - readers.push_back(logLatency( - tr->getRange(KeyRangeRef(self->keyForIndex(keys[op]), Key(strinc(self->keyForIndex(keys[op])))), - GetRangeLimits(-1, 80000)), - &self->readLatencies, - &self->readLatencyTotal, - &self->readLatencyCount, - self->readMetric, - shouldRecord)); - } - } else { - for (int op = 0; op < keys.size(); op++) { - ++self->totalReadsMetric; - readers.push_back(logLatency(tr->get(self->keyForIndex(keys[op])), - &self->readLatencies, - &self->readLatencyTotal, - &self->readLatencyCount, - self->readMetric, - shouldRecord)); - } - } - wait(waitForAll(readers)); - } else { - state int op; - for (op = 0; op < keys.size(); op++) { - ++self->totalReadsMetric; - wait(logLatency(tr->get(self->keyForIndex(keys[op])), - &self->readLatencies, - &self->readLatencyTotal, - &self->readLatencyCount, - self->readMetric, - shouldRecord)); - } - } - return Void(); - } - - ACTOR Future _setup(Database cx, ReadWriteWorkload* self) { + ACTOR static Future setup(Database cx, ReadWriteCommon* self) { if (!self->doSetup) return Void(); @@ -587,8 +260,232 @@ struct ReadWriteWorkload : KVWorkload { return Void(); } +}; - ACTOR Future _start(Database cx, ReadWriteWorkload* self) { +Future ReadWriteCommon::tracePeriodically() { + return ReadWriteCommonImpl::tracePeriodically(this); +} + +Future ReadWriteCommon::logLatency(Future> f, bool shouldRecord) { + return ReadWriteCommonImpl::logLatency( + f, &readLatencies, &readLatencyTotal, &readLatencyCount, readMetric, shouldRecord); +} + +Future ReadWriteCommon::logLatency(Future f, bool shouldRecord) { + return ReadWriteCommonImpl::logLatency( + f, &readLatencies, &readLatencyTotal, &readLatencyCount, readMetric, shouldRecord); +} + +Future ReadWriteCommon::setup(Database const& cx) { + return ReadWriteCommonImpl::setup(cx, this); +} + +Future ReadWriteCommon::check(Database const& cx) { + clients.clear(); + + if (!cancelWorkersAtDuration && now() < metricsStart + metricsDuration) + metricsDuration = now() - metricsStart; + + g_traceBatch.dump(); + if (clientId == 0) + return ReadWriteCommonImpl::traceDumpWorkers(dbInfo); + else + return true; +} + +void ReadWriteCommon::getMetrics(std::vector& m) { + double duration = metricsDuration; + int reads = (aTransactions.getValue() * readsPerTransactionA) + (bTransactions.getValue() * readsPerTransactionB); + int writes = + (aTransactions.getValue() * writesPerTransactionA) + (bTransactions.getValue() * writesPerTransactionB); + m.emplace_back("Measured Duration", duration, Averaged::True); + m.emplace_back( + "Transactions/sec", (aTransactions.getValue() + bTransactions.getValue()) / duration, Averaged::False); + m.emplace_back("Operations/sec", ((reads + writes) / duration), Averaged::False); + m.push_back(aTransactions.getMetric()); + m.push_back(bTransactions.getMetric()); + m.push_back(retries.getMetric()); + m.emplace_back("Mean load time (seconds)", loadTime, Averaged::True); + m.emplace_back("Read rows", reads, Averaged::False); + m.emplace_back("Write rows", writes, Averaged::False); + m.emplace_back("Read rows/sec", reads / duration, Averaged::False); + m.emplace_back("Write rows/sec", writes / duration, Averaged::False); + m.emplace_back( + "Bytes read/sec", (reads * (keyBytes + (minValueBytes + maxValueBytes) * 0.5)) / duration, Averaged::False); + m.emplace_back( + "Bytes written/sec", (writes * (keyBytes + (minValueBytes + maxValueBytes) * 0.5)) / duration, Averaged::False); + m.insert(m.end(), periodicMetrics.begin(), periodicMetrics.end()); + + std::vector>::iterator ratesItr = ratesAtKeyCounts.begin(); + for (; ratesItr != ratesAtKeyCounts.end(); ratesItr++) + m.emplace_back(format("%lld keys imported bytes/sec", ratesItr->first), ratesItr->second, Averaged::False); +} + +Value ReadWriteCommon::randomValue() { + return StringRef((uint8_t*)valueString.c_str(), deterministicRandom()->randomInt(minValueBytes, maxValueBytes + 1)); +} + +Standalone ReadWriteCommon::operator()(uint64_t n) { + return KeyValueRef(keyForIndex(n, false), randomValue()); +} + +bool ReadWriteCommon::shouldRecord(double checkTime) { + double timeSinceStart = checkTime - clientBegin; + return timeSinceStart >= metricsStart && timeSinceStart < (metricsStart + metricsDuration); +} + +static Future nextRV; +static Version lastRV = invalidVersion; + +ACTOR static Future getNextRV(Database db) { + state Transaction tr(db); + loop { + try { + Version v = wait(tr.getReadVersion()); + return v; + } catch (Error& e) { + wait(tr.onError(e)); + } + } +} + +static Future getInconsistentReadVersion(Database const& db) { + if (!nextRV.isValid() || nextRV.isReady()) { // if no getNextRV() running + if (nextRV.isValid()) + lastRV = nextRV.get(); + nextRV = getNextRV(db); + } + if (lastRV == invalidVersion) + return nextRV; + else + return lastRV; +} + +struct ReadWriteWorkload : ReadWriteCommon { + // use ReadWrite as a ramp up workload + bool rampUpLoad; // indicate this is a ramp up workload + int rampSweepCount; // how many times of ramp up + bool rampTransactionType; // choose transaction type based on client start time + bool rampUpConcurrency; // control client concurrency + + // transaction setting + bool batchPriority; + bool rangeReads; // read operations are all single key range read + bool dependentReads; // read operations are issued sequentially + bool inconsistentReads; // read with previous read version + bool adjacentReads; // keys are adjacent within a transaction + bool adjacentWrites; + int extraReadConflictRangesPerTransaction, extraWriteConflictRangesPerTransaction; + + // hot traffic pattern + double hotKeyFraction, forceHotProbability = 0; // key based hot traffic setting + + ReadWriteWorkload(WorkloadContext const& wcx) + : ReadWriteCommon(wcx), dependentReads(false), adjacentReads(false), adjacentWrites(false) { + extraReadConflictRangesPerTransaction = + getOption(options, LiteralStringRef("extraReadConflictRangesPerTransaction"), 0); + extraWriteConflictRangesPerTransaction = + getOption(options, LiteralStringRef("extraWriteConflictRangesPerTransaction"), 0); + dependentReads = getOption(options, LiteralStringRef("dependentReads"), false); + inconsistentReads = getOption(options, LiteralStringRef("inconsistentReads"), false); + adjacentReads = getOption(options, LiteralStringRef("adjacentReads"), false); + adjacentWrites = getOption(options, LiteralStringRef("adjacentWrites"), false); + rampUpLoad = getOption(options, LiteralStringRef("rampUpLoad"), false); + rampSweepCount = getOption(options, LiteralStringRef("rampSweepCount"), 1); + rangeReads = getOption(options, LiteralStringRef("rangeReads"), false); + rampTransactionType = getOption(options, LiteralStringRef("rampTransactionType"), false); + rampUpConcurrency = getOption(options, LiteralStringRef("rampUpConcurrency"), false); + batchPriority = getOption(options, LiteralStringRef("batchPriority"), false); + descriptionString = getOption(options, LiteralStringRef("description"), LiteralStringRef("ReadWrite")); + + if (rampUpConcurrency) + ASSERT(rampSweepCount == 2); // Implementation is hard coded to ramp up and down + + { + // with P(hotTrafficFraction) an access is directed to one of a fraction + // of hot keys, else it is directed to a disjoint set of cold keys + hotKeyFraction = getOption(options, LiteralStringRef("hotKeyFraction"), 0.0); + double hotTrafficFraction = getOption(options, LiteralStringRef("hotTrafficFraction"), 0.0); + ASSERT(hotKeyFraction >= 0 && hotTrafficFraction <= 1); + ASSERT(hotKeyFraction <= hotTrafficFraction); // hot keys should be actually hot! + // p(Cold key) = (1-FHP) * (1-hkf) + // p(Cold key) = (1-htf) + // solving for FHP gives: + forceHotProbability = (hotTrafficFraction - hotKeyFraction) / (1 - hotKeyFraction); + } + } + + std::string description() const override { return descriptionString.toString(); } + + template + void setupTransaction(Trans* tr) { + if (batchPriority) { + tr->setOption(FDBTransactionOptions::PRIORITY_BATCH); + } + } + + void getMetrics(std::vector& m) override { + ReadWriteCommon::getMetrics(m); + if (!rampUpLoad) { + m.emplace_back("Mean Latency (ms)", 1000 * latencies.mean(), Averaged::True); + m.emplace_back("Median Latency (ms, averaged)", 1000 * latencies.median(), Averaged::True); + m.emplace_back("90% Latency (ms, averaged)", 1000 * latencies.percentile(0.90), Averaged::True); + m.emplace_back("98% Latency (ms, averaged)", 1000 * latencies.percentile(0.98), Averaged::True); + m.emplace_back("Max Latency (ms, averaged)", 1000 * latencies.max(), Averaged::True); + + m.emplace_back("Mean Row Read Latency (ms)", 1000 * readLatencies.mean(), Averaged::True); + m.emplace_back("Median Row Read Latency (ms, averaged)", 1000 * readLatencies.median(), Averaged::True); + m.emplace_back("Max Row Read Latency (ms, averaged)", 1000 * readLatencies.max(), Averaged::True); + + m.emplace_back("Mean Total Read Latency (ms)", 1000 * fullReadLatencies.mean(), Averaged::True); + m.emplace_back( + "Median Total Read Latency (ms, averaged)", 1000 * fullReadLatencies.median(), Averaged::True); + m.emplace_back("Max Total Latency (ms, averaged)", 1000 * fullReadLatencies.max(), Averaged::True); + + m.emplace_back("Mean GRV Latency (ms)", 1000 * GRVLatencies.mean(), Averaged::True); + m.emplace_back("Median GRV Latency (ms, averaged)", 1000 * GRVLatencies.median(), Averaged::True); + m.emplace_back("Max GRV Latency (ms, averaged)", 1000 * GRVLatencies.max(), Averaged::True); + + m.emplace_back("Mean Commit Latency (ms)", 1000 * commitLatencies.mean(), Averaged::True); + m.emplace_back("Median Commit Latency (ms, averaged)", 1000 * commitLatencies.median(), Averaged::True); + m.emplace_back("Max Commit Latency (ms, averaged)", 1000 * commitLatencies.max(), Averaged::True); + } + } + + Future start(Database const& cx) override { return _start(cx, this); } + + ACTOR template + static Future readOp(Trans* tr, std::vector keys, ReadWriteWorkload* self, bool shouldRecord) { + if (!keys.size()) + return Void(); + if (!self->dependentReads) { + std::vector> readers; + if (self->rangeReads) { + for (int op = 0; op < keys.size(); op++) { + ++self->totalReadsMetric; + readers.push_back(self->logLatency( + tr->getRange(KeyRangeRef(self->keyForIndex(keys[op]), Key(strinc(self->keyForIndex(keys[op])))), + GetRangeLimits(-1, 80000)), + shouldRecord)); + } + } else { + for (int op = 0; op < keys.size(); op++) { + ++self->totalReadsMetric; + readers.push_back(self->logLatency(tr->get(self->keyForIndex(keys[op])), shouldRecord)); + } + } + wait(waitForAll(readers)); + } else { + state int op; + for (op = 0; op < keys.size(); op++) { + ++self->totalReadsMetric; + wait(self->logLatency(tr->get(self->keyForIndex(keys[op])), shouldRecord)); + } + } + return Void(); + } + + ACTOR static Future _start(Database cx, ReadWriteWorkload* self) { // Read one record from the database to warm the cache of keyServers state std::vector keys; keys.push_back(deterministicRandom()->randomInt64(0, self->nodeCount)); @@ -610,7 +507,7 @@ struct ReadWriteWorkload : KVWorkload { std::vector> clients; if (self->enableReadLatencyLogging) - clients.push_back(tracePeriodically(self)); + clients.push_back(self->tracePeriodically()); self->clientBegin = now(); for (int c = 0; c < self->actorCount; c++) { @@ -632,13 +529,6 @@ struct ReadWriteWorkload : KVWorkload { return Void(); } - bool shouldRecord() { return shouldRecord(now()); } - - bool shouldRecord(double checkTime) { - double timeSinceStart = checkTime - clientBegin; - return timeSinceStart >= metricsStart && timeSinceStart < (metricsStart + metricsDuration); - } - int64_t getRandomKey(uint64_t nodeCount) { if (forceHotProbability && deterministicRandom()->random01() < forceHotProbability) return deterministicRandom()->randomInt64(0, nodeCount * hotKeyFraction) / diff --git a/fdbserver/workloads/ReadWriteWorkload.actor.h b/fdbserver/workloads/ReadWriteWorkload.actor.h new file mode 100644 index 0000000000..fe33be0213 --- /dev/null +++ b/fdbserver/workloads/ReadWriteWorkload.actor.h @@ -0,0 +1,171 @@ +/* + * ReadWriteWorkload.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#if defined(NO_INTELLISENSE) && !defined(FDBSERVER_READWRITEWORKLOAD_ACTOR_G_H) +#define FDBSERVER_READWRITEWORKLOAD_ACTOR_G_H +#include "fdbserver/workloads/ReadWriteWorkload.actor.g.h" +#elif !defined(FDBSERVER_READWRITEWORKLOAD_ACTOR_H) +#define FDBSERVER_READWRITEWORKLOAD_ACTOR_H + +#include "fdbserver/workloads/workloads.actor.h" +#include "flow/TDMetric.actor.h" +#include "flow/actorcompiler.h" // This must be the last #include. +DESCR struct TransactionSuccessMetric { + int64_t totalLatency; // ns + int64_t startLatency; // ns + int64_t commitLatency; // ns + int64_t retries; // count +}; + +DESCR struct TransactionFailureMetric { + int64_t startLatency; // ns + int64_t errorCode; // flow error code +}; + +DESCR struct ReadMetric { + int64_t readLatency; // ns +}; + +// Common ReadWrite test settings +struct ReadWriteCommon : KVWorkload { + static constexpr int sampleSize = 10000; + friend struct ReadWriteCommonImpl; + + // general test setting + Standalone descriptionString; + bool doSetup, cancelWorkersAtDuration; + double testDuration, transactionsPerSecond, warmingDelay, maxInsertRate, debugInterval, debugTime; + double metricsStart, metricsDuration; + std::vector insertionCountsToMeasure; // measure the speed of sequential insertion when bulkSetup + + // test log setting + bool enableReadLatencyLogging; + double periodicLoggingInterval; + + // two type of transaction + int readsPerTransactionA, writesPerTransactionA; + int readsPerTransactionB, writesPerTransactionB; + std::string valueString; + double alpha; // probability for run TransactionA type + // transaction setting + bool useRYW; + + // states of metric + Int64MetricHandle totalReadsMetric; + Int64MetricHandle totalRetriesMetric; + EventMetricHandle transactionSuccessMetric; + EventMetricHandle transactionFailureMetric; + EventMetricHandle readMetric; + PerfIntCounter aTransactions, bTransactions, retries; + ContinuousSample latencies, readLatencies, commitLatencies, GRVLatencies, fullReadLatencies; + double readLatencyTotal; + int readLatencyCount; + std::vector periodicMetrics; + std::vector> ratesAtKeyCounts; // sequential insertion speed + + // other internal states + std::vector> clients; + double loadTime, clientBegin; + + explicit ReadWriteCommon(WorkloadContext const& wcx) + : KVWorkload(wcx), totalReadsMetric(LiteralStringRef("ReadWrite.TotalReads")), + totalRetriesMetric(LiteralStringRef("ReadWrite.TotalRetries")), aTransactions("A Transactions"), + bTransactions("B Transactions"), retries("Retries"), latencies(sampleSize), readLatencies(sampleSize), + commitLatencies(sampleSize), GRVLatencies(sampleSize), fullReadLatencies(sampleSize), readLatencyTotal(0), + readLatencyCount(0), loadTime(0.0), clientBegin(0) { + + transactionSuccessMetric.init(LiteralStringRef("ReadWrite.SuccessfulTransaction")); + transactionFailureMetric.init(LiteralStringRef("ReadWrite.FailedTransaction")); + readMetric.init(LiteralStringRef("ReadWrite.Read")); + + testDuration = getOption(options, LiteralStringRef("testDuration"), 10.0); + transactionsPerSecond = getOption(options, LiteralStringRef("transactionsPerSecond"), 5000.0) / clientCount; + double allowedLatency = getOption(options, LiteralStringRef("allowedLatency"), 0.250); + actorCount = ceil(transactionsPerSecond * allowedLatency); + actorCount = getOption(options, LiteralStringRef("actorCountPerTester"), actorCount); + + readsPerTransactionA = getOption(options, LiteralStringRef("readsPerTransactionA"), 10); + writesPerTransactionA = getOption(options, LiteralStringRef("writesPerTransactionA"), 0); + readsPerTransactionB = getOption(options, LiteralStringRef("readsPerTransactionB"), 1); + writesPerTransactionB = getOption(options, LiteralStringRef("writesPerTransactionB"), 9); + alpha = getOption(options, LiteralStringRef("alpha"), 0.1); + + valueString = std::string(maxValueBytes, '.'); + if (nodePrefix > 0) { + keyBytes += 16; + } + + metricsStart = getOption(options, LiteralStringRef("metricsStart"), 0.0); + metricsDuration = getOption(options, LiteralStringRef("metricsDuration"), testDuration); + if (getOption(options, LiteralStringRef("discardEdgeMeasurements"), true)) { + // discardEdgeMeasurements keeps the metrics from the middle 3/4 of the test + metricsStart += testDuration * 0.125; + metricsDuration *= 0.75; + } + + warmingDelay = getOption(options, LiteralStringRef("warmingDelay"), 0.0); + maxInsertRate = getOption(options, LiteralStringRef("maxInsertRate"), 1e12); + debugInterval = getOption(options, LiteralStringRef("debugInterval"), 0.0); + debugTime = getOption(options, LiteralStringRef("debugTime"), 0.0); + enableReadLatencyLogging = getOption(options, LiteralStringRef("enableReadLatencyLogging"), false); + periodicLoggingInterval = getOption(options, LiteralStringRef("periodicLoggingInterval"), 5.0); + cancelWorkersAtDuration = getOption(options, LiteralStringRef("cancelWorkersAtDuration"), true); + + useRYW = getOption(options, LiteralStringRef("useRYW"), false); + doSetup = getOption(options, LiteralStringRef("setup"), true); + + // Validate that keyForIndex() is monotonic + for (int i = 0; i < 30; i++) { + int64_t a = deterministicRandom()->randomInt64(0, nodeCount); + int64_t b = deterministicRandom()->randomInt64(0, nodeCount); + if (a > b) { + std::swap(a, b); + } + ASSERT(a <= b); + ASSERT((keyForIndex(a, false) <= keyForIndex(b, false))); + } + + std::vector insertionCountsToMeasureString = + getOption(options, LiteralStringRef("insertionCountsToMeasure"), std::vector()); + for (int i = 0; i < insertionCountsToMeasureString.size(); i++) { + try { + uint64_t count = boost::lexical_cast(insertionCountsToMeasureString[i]); + insertionCountsToMeasure.push_back(count); + } catch (...) { + } + } + } + + Future tracePeriodically(); + Future logLatency(Future> f, bool shouldRecord); + Future logLatency(Future f, bool shouldRecord); + + Future setup(Database const& cx) override; + Future check(Database const& cx) override; + void getMetrics(std::vector& m) override; + + Standalone operator()(uint64_t n); + bool shouldRecord(double checkTime = now()); + Value randomValue(); +}; + +#include "flow/unactorcompiler.h" +#endif // FDBSERVER_READWRITEWORKLOAD_ACTOR_H diff --git a/fdbserver/workloads/SkewedReadWrite.actor.cpp b/fdbserver/workloads/SkewedReadWrite.actor.cpp index 6ea9d9b8aa..78576f957f 100644 --- a/fdbserver/workloads/SkewedReadWrite.actor.cpp +++ b/fdbserver/workloads/SkewedReadWrite.actor.cpp @@ -28,48 +28,13 @@ #include "fdbserver/WorkerInterface.actor.h" #include "fdbserver/workloads/workloads.actor.h" #include "fdbserver/workloads/BulkSetup.actor.h" +#include "fdbserver/workloads/ReadWriteWorkload.actor.h" #include "fdbclient/ReadYourWrites.h" #include "flow/TDMetric.actor.h" #include "fdbclient/RunTransaction.actor.h" #include "flow/actorcompiler.h" // This must be the last #include. -const int sampleSize = 10000; -DESCR struct TransactionSuccessMetric { - int64_t totalLatency; // ns - int64_t startLatency; // ns - int64_t commitLatency; // ns - int64_t retries; // count -}; - -DESCR struct TransactionFailureMetric { - int64_t startLatency; // ns - int64_t errorCode; // flow error code -}; - -DESCR struct ReadMetric { - int64_t readLatency; // ns -}; - -struct SkewedReadWriteWorkload : KVWorkload { - // general test setting - Standalone descriptionString; - bool doSetup, cancelWorkersAtDuration; - double testDuration, transactionsPerSecond, warmingDelay, maxInsertRate, debugInterval, debugTime; - double metricsStart, metricsDuration; - std::vector insertionCountsToMeasure; // measure the speed of sequential insertion when bulkSetup - - // test log setting - bool enableReadLatencyLogging; - double periodicLoggingInterval; - - // transaction setting - bool useRYW; - double alpha; // probability for run TransactionA type - // two type of transaction - int readsPerTransactionA, writesPerTransactionA; - int readsPerTransactionB, writesPerTransactionB; - std::string valueString; - +struct SkewedReadWriteWorkload : ReadWriteCommon { // server based hot traffic setting int skewRound = 0; // skewDuration = ceil(testDuration / skewRound) double hotServerFraction = 0, hotServerShardFraction = 1.0; // set > 0 to issue hot key based on shard map @@ -83,184 +48,20 @@ struct SkewedReadWriteWorkload : KVWorkload { std::map serverInterfaces; int hotServerCount = 0, currentHotRound = -1; - // states of metric - Int64MetricHandle totalReadsMetric; - Int64MetricHandle totalRetriesMetric; - EventMetricHandle transactionSuccessMetric; - EventMetricHandle transactionFailureMetric; - EventMetricHandle readMetric; - PerfIntCounter aTransactions, bTransactions, retries; - ContinuousSample latencies, readLatencies, commitLatencies, GRVLatencies, fullReadLatencies; - double readLatencyTotal; - int readLatencyCount; - std::vector periodicMetrics; - std::vector> ratesAtKeyCounts; // sequential insertion speed - - // other internal states - std::vector> clients; - double loadTime, clientBegin; - - SkewedReadWriteWorkload(WorkloadContext const& wcx) - : KVWorkload(wcx), totalReadsMetric(LiteralStringRef("RWWorkload.TotalReads")), - totalRetriesMetric(LiteralStringRef("RWWorkload.TotalRetries")), aTransactions("A Transactions"), - bTransactions("B Transactions"), retries("Retries"), latencies(sampleSize), readLatencies(sampleSize), - commitLatencies(sampleSize), GRVLatencies(sampleSize), fullReadLatencies(sampleSize), readLatencyTotal(0), - readLatencyCount(0), loadTime(0.0), clientBegin(0) { - - transactionSuccessMetric.init(LiteralStringRef("RWWorkload.SuccessfulTransaction")); - transactionFailureMetric.init(LiteralStringRef("RWWorkload.FailedTransaction")); - readMetric.init(LiteralStringRef("RWWorkload.Read")); - - testDuration = getOption(options, LiteralStringRef("testDuration"), 10.0); - transactionsPerSecond = getOption(options, LiteralStringRef("transactionsPerSecond"), 5000.0) / clientCount; - double allowedLatency = getOption(options, LiteralStringRef("allowedLatency"), 0.250); - actorCount = ceil(transactionsPerSecond * allowedLatency); - actorCount = getOption(options, LiteralStringRef("actorCountPerTester"), actorCount); - - readsPerTransactionA = getOption(options, LiteralStringRef("readsPerTransactionA"), 10); - writesPerTransactionA = getOption(options, LiteralStringRef("writesPerTransactionA"), 0); - readsPerTransactionB = getOption(options, LiteralStringRef("readsPerTransactionB"), 1); - writesPerTransactionB = getOption(options, LiteralStringRef("writesPerTransactionB"), 9); - alpha = getOption(options, LiteralStringRef("alpha"), 0.1); - - valueString = std::string(maxValueBytes, '.'); - if (nodePrefix > 0) { - keyBytes += 16; - } - - metricsStart = getOption(options, LiteralStringRef("metricsStart"), 0.0); - metricsDuration = getOption(options, LiteralStringRef("metricsDuration"), testDuration); - if (getOption(options, LiteralStringRef("discardEdgeMeasurements"), true)) { - // discardEdgeMeasurements keeps the metrics from the middle 3/4 of the test - metricsStart += testDuration * 0.125; - metricsDuration *= 0.75; - } - - warmingDelay = getOption(options, LiteralStringRef("warmingDelay"), 0.0); - maxInsertRate = getOption(options, LiteralStringRef("maxInsertRate"), 1e12); - debugInterval = getOption(options, LiteralStringRef("debugInterval"), 0.0); - debugTime = getOption(options, LiteralStringRef("debugTime"), 0.0); - enableReadLatencyLogging = getOption(options, LiteralStringRef("enableReadLatencyLogging"), false); - periodicLoggingInterval = getOption(options, LiteralStringRef("periodicLoggingInterval"), 5.0); - cancelWorkersAtDuration = getOption(options, LiteralStringRef("cancelWorkersAtDuration"), true); - useRYW = getOption(options, LiteralStringRef("useRYW"), false); - doSetup = getOption(options, LiteralStringRef("setup"), true); + SkewedReadWriteWorkload(WorkloadContext const& wcx) : ReadWriteCommon(wcx) { descriptionString = getOption(options, LiteralStringRef("description"), LiteralStringRef("SkewedReadWrite")); - - // Validate that keyForIndex() is monotonic - for (int i = 0; i < 30; i++) { - int64_t a = deterministicRandom()->randomInt64(0, nodeCount); - int64_t b = deterministicRandom()->randomInt64(0, nodeCount); - if (a > b) { - std::swap(a, b); - } - ASSERT(a <= b); - ASSERT((keyForIndex(a, false) <= keyForIndex(b, false))); - } - - std::vector insertionCountsToMeasureString = - getOption(options, LiteralStringRef("insertionCountsToMeasure"), std::vector()); - for (int i = 0; i < insertionCountsToMeasureString.size(); i++) { - try { - uint64_t count = boost::lexical_cast(insertionCountsToMeasureString[i]); - insertionCountsToMeasure.push_back(count); - } catch (...) { - } - } - - { - hotServerFraction = getOption(options, "hotServerFraction"_sr, 0.2); - hotServerShardFraction = getOption(options, "hotServerShardFraction"_sr, 1.0); - hotReadWriteServerOverlap = getOption(options, "hotReadWriteServerOverlap"_sr, 0.0); - skewRound = getOption(options, "skewRound"_sr, 1); - hotServerReadFrac = getOption(options, "hotServerReadFrac"_sr, 0.8); - hotServerWriteFrac = getOption(options, "hotServerWriteFrac"_sr, 0.0); - ASSERT((hotServerReadFrac >= hotServerFraction || hotServerWriteFrac >= hotServerFraction) && - skewRound > 0); - } + hotServerFraction = getOption(options, "hotServerFraction"_sr, 0.2); + hotServerShardFraction = getOption(options, "hotServerShardFraction"_sr, 1.0); + hotReadWriteServerOverlap = getOption(options, "hotReadWriteServerOverlap"_sr, 0.0); + skewRound = getOption(options, "skewRound"_sr, 1); + hotServerReadFrac = getOption(options, "hotServerReadFrac"_sr, 0.8); + hotServerWriteFrac = getOption(options, "hotServerWriteFrac"_sr, 0.0); + ASSERT((hotServerReadFrac >= hotServerFraction || hotServerWriteFrac >= hotServerFraction) && skewRound > 0); } std::string description() const override { return descriptionString.toString(); } - Future setup(Database const& cx) override { return _setup(cx, this); } Future start(Database const& cx) override { return _start(cx, this); } - ACTOR static Future traceDumpWorkers(Reference const> db) { - try { - loop { - choose { - when(wait(db->onChange())) {} - - when(ErrorOr> workerList = - wait(db->get().clusterInterface.getWorkers.tryGetReply(GetWorkersRequest()))) { - if (workerList.present()) { - std::vector>> dumpRequests; - dumpRequests.reserve(workerList.get().size()); - for (int i = 0; i < workerList.get().size(); i++) - dumpRequests.push_back(workerList.get()[i].interf.traceBatchDumpRequest.tryGetReply( - TraceBatchDumpRequest())); - wait(waitForAll(dumpRequests)); - return true; - } - wait(delay(1.0)); - } - } - } - } catch (Error& e) { - TraceEvent(SevError, "FailedToDumpWorkers").error(e); - throw; - } - } - - Future check(Database const& cx) override { - clients.clear(); - - if (!cancelWorkersAtDuration && now() < metricsStart + metricsDuration) - metricsDuration = now() - metricsStart; - - g_traceBatch.dump(); - if (clientId == 0) - return traceDumpWorkers(dbInfo); - else - return true; - } - - void getMetrics(std::vector& m) override { - double duration = metricsDuration; - int reads = - (aTransactions.getValue() * readsPerTransactionA) + (bTransactions.getValue() * readsPerTransactionB); - int writes = - (aTransactions.getValue() * writesPerTransactionA) + (bTransactions.getValue() * writesPerTransactionB); - m.emplace_back("Measured Duration", duration, Averaged::True); - m.emplace_back( - "Transactions/sec", (aTransactions.getValue() + bTransactions.getValue()) / duration, Averaged::False); - m.emplace_back("Operations/sec", ((reads + writes) / duration), Averaged::False); - m.push_back(aTransactions.getMetric()); - m.push_back(bTransactions.getMetric()); - m.push_back(retries.getMetric()); - m.emplace_back("Mean load time (seconds)", loadTime, Averaged::True); - m.emplace_back("Read rows", reads, Averaged::False); - m.emplace_back("Write rows", writes, Averaged::False); - m.emplace_back("Read rows/sec", reads / duration, Averaged::False); - m.emplace_back("Write rows/sec", writes / duration, Averaged::False); - m.emplace_back( - "Bytes read/sec", (reads * (keyBytes + (minValueBytes + maxValueBytes) * 0.5)) / duration, Averaged::False); - m.emplace_back("Bytes written/sec", - (writes * (keyBytes + (minValueBytes + maxValueBytes) * 0.5)) / duration, - Averaged::False); - m.insert(m.end(), periodicMetrics.begin(), periodicMetrics.end()); - - std::vector>::iterator ratesItr = ratesAtKeyCounts.begin(); - for (; ratesItr != ratesAtKeyCounts.end(); ratesItr++) - m.emplace_back(format("%lld keys imported bytes/sec", ratesItr->first), ratesItr->second, Averaged::False); - } - - Value randomValue() { - return StringRef((uint8_t*)valueString.c_str(), - deterministicRandom()->randomInt(minValueBytes, maxValueBytes + 1)); - } - - Standalone operator()(uint64_t n) { return KeyValueRef(keyForIndex(n, false), randomValue()); } - void debugPrintServerShards() const { std::cout << std::hex; for (auto it : this->serverShards) { @@ -375,164 +176,6 @@ struct SkewedReadWriteWorkload : KVWorkload { return Void(); } - ACTOR static Future tracePeriodically(SkewedReadWriteWorkload* self) { - state double start = now(); - state double elapsed = 0.0; - state int64_t last_ops = 0; - - loop { - elapsed += self->periodicLoggingInterval; - wait(delayUntil(start + elapsed)); - - TraceEvent((self->description() + "_RowReadLatency").c_str()) - .detail("Mean", self->readLatencies.mean()) - .detail("Median", self->readLatencies.median()) - .detail("Percentile5", self->readLatencies.percentile(.05)) - .detail("Percentile95", self->readLatencies.percentile(.95)) - .detail("Percentile99", self->readLatencies.percentile(.99)) - .detail("Percentile99_9", self->readLatencies.percentile(.999)) - .detail("Max", self->readLatencies.max()) - .detail("Count", self->readLatencyCount) - .detail("Elapsed", elapsed); - - TraceEvent((self->description() + "_GRVLatency").c_str()) - .detail("Mean", self->GRVLatencies.mean()) - .detail("Median", self->GRVLatencies.median()) - .detail("Percentile5", self->GRVLatencies.percentile(.05)) - .detail("Percentile95", self->GRVLatencies.percentile(.95)) - .detail("Percentile99", self->GRVLatencies.percentile(.99)) - .detail("Percentile99_9", self->GRVLatencies.percentile(.999)) - .detail("Max", self->GRVLatencies.max()); - - TraceEvent((self->description() + "_CommitLatency").c_str()) - .detail("Mean", self->commitLatencies.mean()) - .detail("Median", self->commitLatencies.median()) - .detail("Percentile5", self->commitLatencies.percentile(.05)) - .detail("Percentile95", self->commitLatencies.percentile(.95)) - .detail("Percentile99", self->commitLatencies.percentile(.99)) - .detail("Percentile99_9", self->commitLatencies.percentile(.999)) - .detail("Max", self->commitLatencies.max()); - - TraceEvent((self->description() + "_TotalLatency").c_str()) - .detail("Mean", self->latencies.mean()) - .detail("Median", self->latencies.median()) - .detail("Percentile5", self->latencies.percentile(.05)) - .detail("Percentile95", self->latencies.percentile(.95)) - .detail("Percentile99", self->latencies.percentile(.99)) - .detail("Percentile99_9", self->latencies.percentile(.999)) - .detail("Max", self->latencies.max()); - - int64_t ops = - (self->aTransactions.getValue() * (self->readsPerTransactionA + self->writesPerTransactionA)) + - (self->bTransactions.getValue() * (self->readsPerTransactionB + self->writesPerTransactionB)); - bool recordBegin = self->shouldRecord(std::max(now() - self->periodicLoggingInterval, self->clientBegin)); - bool recordEnd = self->shouldRecord(now()); - if (recordBegin && recordEnd) { - std::string ts = format("T=%04.0fs:", elapsed); - self->periodicMetrics.emplace_back( - ts + "Operations/sec", (ops - last_ops) / self->periodicLoggingInterval, Averaged::False); - - // if(self->rampUpLoad) { - self->periodicMetrics.emplace_back( - ts + "Mean Latency (ms)", 1000 * self->latencies.mean(), Averaged::True); - self->periodicMetrics.emplace_back( - ts + "Median Latency (ms, averaged)", 1000 * self->latencies.median(), Averaged::True); - self->periodicMetrics.emplace_back( - ts + "5% Latency (ms, averaged)", 1000 * self->latencies.percentile(.05), Averaged::True); - self->periodicMetrics.emplace_back( - ts + "95% Latency (ms, averaged)", 1000 * self->latencies.percentile(.95), Averaged::True); - - self->periodicMetrics.emplace_back( - ts + "Mean Row Read Latency (ms)", 1000 * self->readLatencies.mean(), Averaged::True); - self->periodicMetrics.emplace_back( - ts + "Median Row Read Latency (ms, averaged)", 1000 * self->readLatencies.median(), Averaged::True); - self->periodicMetrics.emplace_back(ts + "5% Row Read Latency (ms, averaged)", - 1000 * self->readLatencies.percentile(.05), - Averaged::True); - self->periodicMetrics.emplace_back(ts + "95% Row Read Latency (ms, averaged)", - 1000 * self->readLatencies.percentile(.95), - Averaged::True); - - self->periodicMetrics.emplace_back( - ts + "Mean Total Read Latency (ms)", 1000 * self->fullReadLatencies.mean(), Averaged::True); - self->periodicMetrics.emplace_back(ts + "Median Total Read Latency (ms, averaged)", - 1000 * self->fullReadLatencies.median(), - Averaged::True); - self->periodicMetrics.emplace_back(ts + "5% Total Read Latency (ms, averaged)", - 1000 * self->fullReadLatencies.percentile(.05), - Averaged::True); - self->periodicMetrics.emplace_back(ts + "95% Total Read Latency (ms, averaged)", - 1000 * self->fullReadLatencies.percentile(.95), - Averaged::True); - - self->periodicMetrics.emplace_back( - ts + "Mean GRV Latency (ms)", 1000 * self->GRVLatencies.mean(), Averaged::True); - self->periodicMetrics.emplace_back( - ts + "Median GRV Latency (ms, averaged)", 1000 * self->GRVLatencies.median(), Averaged::True); - self->periodicMetrics.emplace_back( - ts + "5% GRV Latency (ms, averaged)", 1000 * self->GRVLatencies.percentile(.05), Averaged::True); - self->periodicMetrics.emplace_back( - ts + "95% GRV Latency (ms, averaged)", 1000 * self->GRVLatencies.percentile(.95), Averaged::True); - - self->periodicMetrics.emplace_back( - ts + "Mean Commit Latency (ms)", 1000 * self->commitLatencies.mean(), Averaged::True); - self->periodicMetrics.emplace_back( - ts + "Median Commit Latency (ms, averaged)", 1000 * self->commitLatencies.median(), Averaged::True); - self->periodicMetrics.emplace_back(ts + "5% Commit Latency (ms, averaged)", - 1000 * self->commitLatencies.percentile(.05), - Averaged::True); - self->periodicMetrics.emplace_back(ts + "95% Commit Latency (ms, averaged)", - 1000 * self->commitLatencies.percentile(.95), - Averaged::True); - //} - - self->periodicMetrics.emplace_back( - ts + "Max Latency (ms, averaged)", 1000 * self->latencies.max(), Averaged::True); - self->periodicMetrics.emplace_back( - ts + "Max Row Read Latency (ms, averaged)", 1000 * self->readLatencies.max(), Averaged::True); - self->periodicMetrics.emplace_back( - ts + "Max Total Read Latency (ms, averaged)", 1000 * self->fullReadLatencies.max(), Averaged::True); - self->periodicMetrics.emplace_back( - ts + "Max GRV Latency (ms, averaged)", 1000 * self->GRVLatencies.max(), Averaged::True); - self->periodicMetrics.emplace_back( - ts + "Max Commit Latency (ms, averaged)", 1000 * self->commitLatencies.max(), Averaged::True); - } - last_ops = ops; - - // if(self->rampUpLoad) { - self->latencies.clear(); - self->readLatencies.clear(); - self->fullReadLatencies.clear(); - self->GRVLatencies.clear(); - self->commitLatencies.clear(); - //} - - self->readLatencyTotal = 0.0; - self->readLatencyCount = 0; - } - } - - ACTOR static Future logLatency(Future> f, - ContinuousSample* latencies, - double* totalLatency, - int* latencyCount, - EventMetricHandle readMetric, - bool shouldRecord) { - state double readBegin = now(); - Optional value = wait(f); - - double latency = now() - readBegin; - readMetric->readLatency = latency * 1e9; - readMetric->log(); - - if (shouldRecord) { - *totalLatency += latency; - ++*latencyCount; - latencies->addSample(latency); - } - return Void(); - } - ACTOR template Future readOp(Trans* tr, std::vector keys, SkewedReadWriteWorkload* self, bool shouldRecord) { if (!keys.size()) @@ -541,41 +184,13 @@ struct SkewedReadWriteWorkload : KVWorkload { std::vector> readers; for (int op = 0; op < keys.size(); op++) { ++self->totalReadsMetric; - readers.push_back(logLatency(tr->get(self->keyForIndex(keys[op])), - &self->readLatencies, - &self->readLatencyTotal, - &self->readLatencyCount, - self->readMetric, - shouldRecord)); + readers.push_back(self->logLatency(tr->get(self->keyForIndex(keys[op])), shouldRecord)); } wait(waitForAll(readers)); return Void(); } - ACTOR static Future _setup(Database cx, SkewedReadWriteWorkload* self) { - if (!self->doSetup) - return Void(); - - state Promise loadTime; - state Promise>> ratesAtKeyCounts; - - wait(bulkSetup(cx, - self, - self->nodeCount, - loadTime, - self->insertionCountsToMeasure.empty(), - self->warmingDelay, - self->maxInsertRate, - self->insertionCountsToMeasure, - ratesAtKeyCounts)); - - self->loadTime = loadTime.getFuture().get(); - self->ratesAtKeyCounts = ratesAtKeyCounts.getFuture().get(); - - return Void(); - } - void startReadWriteClients(Database cx, std::vector>& clients) { clientBegin = now(); for (int c = 0; c < actorCount; c++) { @@ -592,7 +207,7 @@ struct SkewedReadWriteWorkload : KVWorkload { ACTOR static Future _start(Database cx, SkewedReadWriteWorkload* self) { state std::vector> clients; if (self->enableReadLatencyLogging) - clients.push_back(tracePeriodically(self)); + clients.push_back(self->tracePeriodically()); wait(updateServerShards(cx, self)); for (self->currentHotRound = 0; self->currentHotRound < self->skewRound; ++self->currentHotRound) { @@ -606,13 +221,6 @@ struct SkewedReadWriteWorkload : KVWorkload { return Void(); } - bool shouldRecord() { return shouldRecord(now()); } - - bool shouldRecord(double checkTime) { - double timeSinceStart = checkTime - clientBegin; - return timeSinceStart >= metricsStart && timeSinceStart < (metricsStart + metricsDuration); - } - // calculate hot server count void setHotServers() { hotServerCount = ceil(hotServerFraction * serverShards.size()); From ef0d49eb938ee479aa6a2d6b9b112129bd9d0fdd Mon Sep 17 00:00:00 2001 From: Xiaoxi Wang Date: Mon, 16 May 2022 11:13:18 -0700 Subject: [PATCH 8/8] Update tests/CMakeLists.txt --- tests/CMakeLists.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index b409f6ae35..f9695703e6 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -197,7 +197,6 @@ if(WITH_PYTHON) endif() add_fdb_test(TEST_FILES rare/CheckRelocation.toml) add_fdb_test(TEST_FILES rare/ClogUnclog.toml) - add_fdb_test(TEST_FILES rare/ReadSkewReadWrite.toml) add_fdb_test(TEST_FILES rare/CloggedCycleWithKills.toml) add_fdb_test(TEST_FILES rare/ConfigIncrement.toml) add_fdb_test(TEST_FILES rare/ConfigIncrementWithKills.toml)