Add SkewedReadWriteWorkload

This commit is contained in:
Xiaoxi Wang 2022-05-05 23:53:51 -07:00
parent 1c5bf135d5
commit 7ce53ca164
7 changed files with 1223 additions and 402 deletions

View File

@ -302,7 +302,8 @@ std::pair<std::vector<std::pair<UID, NetworkAddress>>, std::vector<std::pair<UID
return std::make_pair(logs, oldLogs);
}
const KeyRef serverKeysPrefix = "\xff/serverKeys/"_sr;
const KeyRangeRef serverKeysRange = KeyRangeRef("\xff/serverKeys/"_sr, "\xff/serverKeys0"_sr);
const KeyRef serverKeysPrefix = serverKeysRange.begin;
const ValueRef serverKeysTrue = "1"_sr, // compatible with what was serverKeysTrue
serverKeysTrueEmptyRange = "3"_sr, // the server treats the range as empty.
serverKeysFalse;
@ -328,6 +329,20 @@ UID serverKeysDecodeServer(const KeyRef& key) {
rd >> server_id;
return server_id;
}
std::pair<UID, Key> serverKeysDecodeServerBegin(const KeyRef& key) {
UID server_id;
BinaryReader rd(key.removePrefix(serverKeysPrefix), Unversioned());
rd >> server_id;
rd.readBytes(1); // skip "/"
std::string bytes;
while (!rd.empty()) {
bytes.push_back((char)*rd.arenaRead(1));
}
// std::cout << bytes.size() << " " <<bytes << std::endl;
return std::make_pair(server_id, Key(bytes));
}
// Reports whether a stored serverKeys value marks the shard as present on the server:
// either serverKeysTrue or serverKeysTrueEmptyRange counts as "has the key".
bool serverHasKey(ValueRef storedValue) {
	if (storedValue == serverKeysTrue) {
		return true;
	}
	return storedValue == serverKeysTrueEmptyRange;
}

View File

@ -99,11 +99,13 @@ void decodeStorageCacheValue(const ValueRef& value, std::vector<uint16_t>& serve
// Using the serverID as a prefix, then followed by the beginning of the shard range
// as the key, the value indicates whether the shard does or does not exist on the server.
// These values can be changed as data movement occurs.
// Range covering every serverKeys entry; serverKeysPrefix is the begin key of this range.
extern const KeyRangeRef serverKeysRange;
extern const KeyRef serverKeysPrefix;
extern const ValueRef serverKeysTrue, serverKeysTrueEmptyRange, serverKeysFalse;
const Key serverKeysKey(UID serverID, const KeyRef& keys);
const Key serverKeysPrefixFor(UID serverID);
UID serverKeysDecodeServer(const KeyRef& key);
// Decodes a serverKeys key into the server ID and the key bytes that follow the "/" separator
// (the beginning of the shard range).
std::pair<UID, Key> serverKeysDecodeServerBegin(const KeyRef& key);
// Returns true when storedValue equals serverKeysTrue or serverKeysTrueEmptyRange.
bool serverHasKey(ValueRef storedValue);
extern const KeyRangeRef conflictingKeysRange;

View File

@ -1,388 +1,389 @@
set(FDBSERVER_SRCS
ApplyMetadataMutation.cpp
ApplyMetadataMutation.h
BackupInterface.h
BackupProgress.actor.cpp
BackupProgress.actor.h
BackupWorker.actor.cpp
BlobGranuleServerCommon.actor.cpp
BlobGranuleServerCommon.actor.h
BlobGranuleValidation.actor.cpp
BlobGranuleValidation.actor.h
BlobManager.actor.cpp
BlobManagerInterface.h
BlobWorker.actor.cpp
ClusterController.actor.cpp
ClusterController.actor.h
ClusterRecovery.actor.cpp
ClusterRecovery.actor.h
CommitProxyServer.actor.cpp
ConfigBroadcaster.actor.cpp
ConfigBroadcaster.h
ConfigDatabaseUnitTests.actor.cpp
ConfigFollowerInterface.cpp
ConfigFollowerInterface.h
ConfigNode.actor.cpp
ConfigNode.h
ConflictSet.h
CoordinatedState.actor.cpp
CoordinatedState.h
Coordination.actor.cpp
CoordinationInterface.h
CoroFlow.h
DataDistribution.actor.cpp
DataDistribution.actor.h
DataDistributionQueue.actor.cpp
DataDistributionTracker.actor.cpp
DataDistributorInterface.h
DBCoreState.h
DDTeamCollection.actor.cpp
DDTeamCollection.h
DiskQueue.actor.cpp
EncryptKeyProxy.actor.cpp
EncryptKeyProxyInterface.h
FDBExecHelper.actor.cpp
FDBExecHelper.actor.h
fdbserver.actor.cpp
GrvProxyServer.actor.cpp
IConfigConsumer.cpp
IConfigConsumer.h
IDiskQueue.h
IKeyValueContainer.h
IKeyValueStore.h
IPager.h
KeyValueStoreCompressTestData.actor.cpp
KeyValueStoreMemory.actor.cpp
KeyValueStoreRocksDB.actor.cpp
KeyValueStoreSQLite.actor.cpp
KmsConnector.h
KmsConnectorInterface.h
KnobProtectiveGroups.cpp
KnobProtectiveGroups.h
Knobs.h
LatencyBandConfig.cpp
LatencyBandConfig.h
LeaderElection.actor.cpp
LeaderElection.h
LocalConfiguration.actor.cpp
LocalConfiguration.h
LogProtocolMessage.h
LogRouter.actor.cpp
LogSystem.cpp
LogSystem.h
LogSystemConfig.cpp
LogSystemConfig.h
LogSystemDiskQueueAdapter.actor.cpp
LogSystemDiskQueueAdapter.h
LogSystemPeekCursor.actor.cpp
MasterInterface.h
masterserver.actor.cpp
MetricLogger.actor.cpp
MetricLogger.actor.h
MoveKeys.actor.cpp
MoveKeys.actor.h
MutationTracking.cpp
MutationTracking.h
networktest.actor.cpp
NetworkTest.h
OldTLogServer_4_6.actor.cpp
OldTLogServer_6_0.actor.cpp
OldTLogServer_6_2.actor.cpp
OnDemandStore.actor.cpp
OnDemandStore.h
PaxosConfigConsumer.actor.cpp
PaxosConfigConsumer.h
ProxyCommitData.actor.h
pubsub.actor.cpp
pubsub.h
QuietDatabase.actor.cpp
QuietDatabase.h
RadixTree.h
Ratekeeper.actor.cpp
Ratekeeper.h
RatekeeperInterface.h
RecoveryState.h
RemoteIKeyValueStore.actor.h
RemoteIKeyValueStore.actor.cpp
ResolutionBalancer.actor.cpp
ResolutionBalancer.actor.h
Resolver.actor.cpp
ResolverInterface.h
RestoreApplier.actor.cpp
RestoreApplier.actor.h
RestoreCommon.actor.cpp
RestoreCommon.actor.h
RestoreController.actor.cpp
RestoreController.actor.h
RestoreLoader.actor.cpp
RestoreLoader.actor.h
RestoreRoleCommon.actor.cpp
RestoreRoleCommon.actor.h
RestoreUtil.actor.cpp
RestoreUtil.h
RestoreWorker.actor.cpp
RestoreWorker.actor.h
RestoreWorkerInterface.actor.cpp
RestoreWorkerInterface.actor.h
RkTagThrottleCollection.cpp
RkTagThrottleCollection.h
RocksDBCheckpointUtils.actor.cpp
RocksDBCheckpointUtils.actor.h
RoleLineage.actor.cpp
RoleLineage.actor.h
ServerCheckpoint.actor.cpp
ServerCheckpoint.actor.h
ServerDBInfo.actor.h
ServerDBInfo.h
SigStack.cpp
SimKmsConnector.actor.h
SimKmsConnector.actor.cpp
SimpleConfigConsumer.actor.cpp
SimpleConfigConsumer.h
SimulatedCluster.actor.cpp
SimulatedCluster.h
SkipList.cpp
SpanContextMessage.h
Status.actor.cpp
Status.h
StorageCache.actor.cpp
StorageMetrics.actor.h
StorageMetrics.h
storageserver.actor.cpp
TagPartitionedLogSystem.actor.cpp
TagPartitionedLogSystem.actor.h
TagThrottler.actor.cpp
TagThrottler.h
TCInfo.actor.cpp
TCInfo.h
template_fdb.h
tester.actor.cpp
TesterInterface.actor.h
TLogInterface.h
TLogServer.actor.cpp
TransactionTagCounter.cpp
TransactionTagCounter.h
TSSMappingUtil.actor.cpp
TSSMappingUtil.actor.h
VersionedBTree.actor.cpp
VFSAsync.cpp
VFSAsync.h
WaitFailure.actor.cpp
WaitFailure.h
worker.actor.cpp
WorkerInterface.actor.h
workloads/ApiCorrectness.actor.cpp
workloads/ApiWorkload.actor.cpp
workloads/ApiWorkload.h
workloads/AsyncFile.actor.h
workloads/AsyncFile.cpp
workloads/AsyncFileCorrectness.actor.cpp
workloads/AsyncFileRead.actor.cpp
workloads/AsyncFileWrite.actor.cpp
workloads/AtomicOps.actor.cpp
workloads/AtomicOpsApiCorrectness.actor.cpp
workloads/AtomicRestore.actor.cpp
workloads/AtomicSwitchover.actor.cpp
workloads/BackgroundSelectors.actor.cpp
workloads/BackupAndParallelRestoreCorrectness.actor.cpp
workloads/BackupCorrectness.actor.cpp
workloads/BackupToBlob.actor.cpp
workloads/BackupToDBAbort.actor.cpp
workloads/BackupToDBCorrectness.actor.cpp
workloads/BackupToDBUpgrade.actor.cpp
workloads/BlobGranuleCorrectnessWorkload.actor.cpp
workloads/BlobGranuleVerifier.actor.cpp
workloads/BlobStoreWorkload.h
workloads/BulkLoad.actor.cpp
workloads/BulkSetup.actor.h
workloads/Cache.actor.cpp
workloads/ChangeConfig.actor.cpp
workloads/ChangeFeeds.actor.cpp
workloads/ClearSingleRange.actor.cpp
workloads/ClientTransactionProfileCorrectness.actor.cpp
workloads/ClientWorkload.actor.cpp
workloads/ClogSingleConnection.actor.cpp
workloads/CommitBugCheck.actor.cpp
workloads/ConfigIncrement.actor.cpp
workloads/ConfigureDatabase.actor.cpp
workloads/ConflictRange.actor.cpp
workloads/ConsistencyCheck.actor.cpp
workloads/CpuProfiler.actor.cpp
workloads/Cycle.actor.cpp
workloads/DataDistributionMetrics.actor.cpp
workloads/DataLossRecovery.actor.cpp
workloads/DDBalance.actor.cpp
workloads/DDMetrics.actor.cpp
workloads/DDMetricsExclude.actor.cpp
workloads/DifferentClustersSameRV.actor.cpp
workloads/DiskDurability.actor.cpp
workloads/DiskDurabilityTest.actor.cpp
workloads/DiskFailureInjection.actor.cpp
workloads/DummyWorkload.actor.cpp
workloads/EncryptionOps.actor.cpp
workloads/EncryptKeyProxyTest.actor.cpp
workloads/ExternalWorkload.actor.cpp
workloads/FastTriggeredWatches.actor.cpp
workloads/FileSystem.actor.cpp
workloads/Fuzz.cpp
workloads/FuzzApiCorrectness.actor.cpp
workloads/GetMappedRange.actor.cpp
workloads/GetRangeStream.actor.cpp
workloads/HealthMetricsApi.actor.cpp
workloads/HighContentionPrefixAllocatorWorkload.actor.cpp
workloads/Increment.actor.cpp
workloads/IncrementalBackup.actor.cpp
workloads/IndexScan.actor.cpp
workloads/Inventory.actor.cpp
workloads/KillRegion.actor.cpp
workloads/KVStoreTest.actor.cpp
workloads/LocalRatekeeper.actor.cpp
workloads/LockDatabase.actor.cpp
workloads/LockDatabaseFrequently.actor.cpp
workloads/LogMetrics.actor.cpp
workloads/LowLatency.actor.cpp
workloads/MachineAttrition.actor.cpp
workloads/Mako.actor.cpp
workloads/MemoryKeyValueStore.cpp
workloads/MemoryKeyValueStore.h
workloads/MemoryLifetime.actor.cpp
workloads/MetricLogging.actor.cpp
workloads/MiniCycle.actor.cpp
workloads/MutationLogReaderCorrectness.actor.cpp
workloads/ParallelRestore.actor.cpp
workloads/Performance.actor.cpp
workloads/PhysicalShardMove.actor.cpp
workloads/Ping.actor.cpp
workloads/PopulateTPCC.actor.cpp
workloads/PrivateEndpoints.actor.cpp
workloads/ProtocolVersion.actor.cpp
workloads/PubSubMultiples.actor.cpp
workloads/QueuePush.actor.cpp
workloads/RandomClogging.actor.cpp
workloads/RandomMoveKeys.actor.cpp
workloads/RandomSelector.actor.cpp
workloads/ReadAfterWrite.actor.cpp
workloads/ReadHotDetection.actor.cpp
workloads/ReadWrite.actor.cpp
workloads/RemoveServersSafely.actor.cpp
workloads/ReportConflictingKeys.actor.cpp
workloads/RestoreBackup.actor.cpp
workloads/RestoreFromBlob.actor.cpp
workloads/Rollback.actor.cpp
workloads/RyowCorrectness.actor.cpp
workloads/RYWDisable.actor.cpp
workloads/RYWPerformance.actor.cpp
workloads/SaveAndKill.actor.cpp
workloads/SelectorCorrectness.actor.cpp
workloads/Serializability.actor.cpp
workloads/Sideband.actor.cpp
workloads/SidebandSingle.actor.cpp
workloads/SimpleAtomicAdd.actor.cpp
workloads/SlowTaskWorkload.actor.cpp
workloads/SnapTest.actor.cpp
workloads/SpecialKeySpaceCorrectness.actor.cpp
workloads/StatusWorkload.actor.cpp
workloads/Storefront.actor.cpp
workloads/StreamingRangeRead.actor.cpp
workloads/StreamingRead.actor.cpp
workloads/SubmitBackup.actor.cpp
workloads/SuspendProcesses.actor.cpp
workloads/TagThrottleApi.actor.cpp
workloads/TargetedKill.actor.cpp
workloads/TaskBucketCorrectness.actor.cpp
workloads/TenantManagement.actor.cpp
workloads/ThreadSafety.actor.cpp
workloads/Throttling.actor.cpp
workloads/Throughput.actor.cpp
workloads/TimeKeeperCorrectness.actor.cpp
workloads/TPCC.actor.cpp
workloads/TPCCWorkload.h
workloads/TriggerRecovery.actor.cpp
workloads/UDPWorkload.actor.cpp
workloads/UnitPerf.actor.cpp
workloads/UnitTests.actor.cpp
workloads/Unreadable.actor.cpp
workloads/VersionStamp.actor.cpp
workloads/WatchAndWait.actor.cpp
workloads/Watches.actor.cpp
workloads/WatchesSameKeyCorrectness.actor.cpp
workloads/WorkerErrors.actor.cpp
workloads/workloads.actor.h
workloads/WriteBandwidth.actor.cpp
workloads/WriteDuringRead.actor.cpp
workloads/WriteTagThrottling.actor.cpp
)
ApplyMetadataMutation.cpp
ApplyMetadataMutation.h
BackupInterface.h
BackupProgress.actor.cpp
BackupProgress.actor.h
BackupWorker.actor.cpp
BlobGranuleServerCommon.actor.cpp
BlobGranuleServerCommon.actor.h
BlobGranuleValidation.actor.cpp
BlobGranuleValidation.actor.h
BlobManager.actor.cpp
BlobManagerInterface.h
BlobWorker.actor.cpp
ClusterController.actor.cpp
ClusterController.actor.h
ClusterRecovery.actor.cpp
ClusterRecovery.actor.h
CommitProxyServer.actor.cpp
ConfigBroadcaster.actor.cpp
ConfigBroadcaster.h
ConfigDatabaseUnitTests.actor.cpp
ConfigFollowerInterface.cpp
ConfigFollowerInterface.h
ConfigNode.actor.cpp
ConfigNode.h
ConflictSet.h
CoordinatedState.actor.cpp
CoordinatedState.h
Coordination.actor.cpp
CoordinationInterface.h
CoroFlow.h
DataDistribution.actor.cpp
DataDistribution.actor.h
DataDistributionQueue.actor.cpp
DataDistributionTracker.actor.cpp
DataDistributorInterface.h
DBCoreState.h
DDTeamCollection.actor.cpp
DDTeamCollection.h
DiskQueue.actor.cpp
EncryptKeyProxy.actor.cpp
EncryptKeyProxyInterface.h
FDBExecHelper.actor.cpp
FDBExecHelper.actor.h
fdbserver.actor.cpp
GrvProxyServer.actor.cpp
IConfigConsumer.cpp
IConfigConsumer.h
IDiskQueue.h
IKeyValueContainer.h
IKeyValueStore.h
IPager.h
KeyValueStoreCompressTestData.actor.cpp
KeyValueStoreMemory.actor.cpp
KeyValueStoreRocksDB.actor.cpp
KeyValueStoreSQLite.actor.cpp
KmsConnector.h
KmsConnectorInterface.h
KnobProtectiveGroups.cpp
KnobProtectiveGroups.h
Knobs.h
LatencyBandConfig.cpp
LatencyBandConfig.h
LeaderElection.actor.cpp
LeaderElection.h
LocalConfiguration.actor.cpp
LocalConfiguration.h
LogProtocolMessage.h
LogRouter.actor.cpp
LogSystem.cpp
LogSystem.h
LogSystemConfig.cpp
LogSystemConfig.h
LogSystemDiskQueueAdapter.actor.cpp
LogSystemDiskQueueAdapter.h
LogSystemPeekCursor.actor.cpp
MasterInterface.h
masterserver.actor.cpp
MetricLogger.actor.cpp
MetricLogger.actor.h
MoveKeys.actor.cpp
MoveKeys.actor.h
MutationTracking.cpp
MutationTracking.h
networktest.actor.cpp
NetworkTest.h
OldTLogServer_4_6.actor.cpp
OldTLogServer_6_0.actor.cpp
OldTLogServer_6_2.actor.cpp
OnDemandStore.actor.cpp
OnDemandStore.h
PaxosConfigConsumer.actor.cpp
PaxosConfigConsumer.h
ProxyCommitData.actor.h
pubsub.actor.cpp
pubsub.h
QuietDatabase.actor.cpp
QuietDatabase.h
RadixTree.h
Ratekeeper.actor.cpp
Ratekeeper.h
RatekeeperInterface.h
RecoveryState.h
RemoteIKeyValueStore.actor.h
RemoteIKeyValueStore.actor.cpp
ResolutionBalancer.actor.cpp
ResolutionBalancer.actor.h
Resolver.actor.cpp
ResolverInterface.h
RestoreApplier.actor.cpp
RestoreApplier.actor.h
RestoreCommon.actor.cpp
RestoreCommon.actor.h
RestoreController.actor.cpp
RestoreController.actor.h
RestoreLoader.actor.cpp
RestoreLoader.actor.h
RestoreRoleCommon.actor.cpp
RestoreRoleCommon.actor.h
RestoreUtil.actor.cpp
RestoreUtil.h
RestoreWorker.actor.cpp
RestoreWorker.actor.h
RestoreWorkerInterface.actor.cpp
RestoreWorkerInterface.actor.h
RkTagThrottleCollection.cpp
RkTagThrottleCollection.h
RocksDBCheckpointUtils.actor.cpp
RocksDBCheckpointUtils.actor.h
RoleLineage.actor.cpp
RoleLineage.actor.h
ServerCheckpoint.actor.cpp
ServerCheckpoint.actor.h
ServerDBInfo.actor.h
ServerDBInfo.h
SigStack.cpp
SimKmsConnector.actor.h
SimKmsConnector.actor.cpp
SimpleConfigConsumer.actor.cpp
SimpleConfigConsumer.h
SimulatedCluster.actor.cpp
SimulatedCluster.h
SkipList.cpp
SpanContextMessage.h
Status.actor.cpp
Status.h
StorageCache.actor.cpp
StorageMetrics.actor.h
StorageMetrics.h
storageserver.actor.cpp
TagPartitionedLogSystem.actor.cpp
TagPartitionedLogSystem.actor.h
TagThrottler.actor.cpp
TagThrottler.h
TCInfo.actor.cpp
TCInfo.h
template_fdb.h
tester.actor.cpp
TesterInterface.actor.h
TLogInterface.h
TLogServer.actor.cpp
TransactionTagCounter.cpp
TransactionTagCounter.h
TSSMappingUtil.actor.cpp
TSSMappingUtil.actor.h
VersionedBTree.actor.cpp
VFSAsync.cpp
VFSAsync.h
WaitFailure.actor.cpp
WaitFailure.h
worker.actor.cpp
WorkerInterface.actor.h
workloads/ApiCorrectness.actor.cpp
workloads/ApiWorkload.actor.cpp
workloads/ApiWorkload.h
workloads/AsyncFile.actor.h
workloads/AsyncFile.cpp
workloads/AsyncFileCorrectness.actor.cpp
workloads/AsyncFileRead.actor.cpp
workloads/AsyncFileWrite.actor.cpp
workloads/AtomicOps.actor.cpp
workloads/AtomicOpsApiCorrectness.actor.cpp
workloads/AtomicRestore.actor.cpp
workloads/AtomicSwitchover.actor.cpp
workloads/BackgroundSelectors.actor.cpp
workloads/BackupAndParallelRestoreCorrectness.actor.cpp
workloads/BackupCorrectness.actor.cpp
workloads/BackupToBlob.actor.cpp
workloads/BackupToDBAbort.actor.cpp
workloads/BackupToDBCorrectness.actor.cpp
workloads/BackupToDBUpgrade.actor.cpp
workloads/BlobGranuleCorrectnessWorkload.actor.cpp
workloads/BlobGranuleVerifier.actor.cpp
workloads/BlobStoreWorkload.h
workloads/BulkLoad.actor.cpp
workloads/BulkSetup.actor.h
workloads/Cache.actor.cpp
workloads/ChangeConfig.actor.cpp
workloads/ChangeFeeds.actor.cpp
workloads/ClearSingleRange.actor.cpp
workloads/ClientTransactionProfileCorrectness.actor.cpp
workloads/ClientWorkload.actor.cpp
workloads/ClogSingleConnection.actor.cpp
workloads/CommitBugCheck.actor.cpp
workloads/ConfigIncrement.actor.cpp
workloads/ConfigureDatabase.actor.cpp
workloads/ConflictRange.actor.cpp
workloads/ConsistencyCheck.actor.cpp
workloads/CpuProfiler.actor.cpp
workloads/Cycle.actor.cpp
workloads/DataDistributionMetrics.actor.cpp
workloads/DataLossRecovery.actor.cpp
workloads/DDBalance.actor.cpp
workloads/DDMetrics.actor.cpp
workloads/DDMetricsExclude.actor.cpp
workloads/DifferentClustersSameRV.actor.cpp
workloads/DiskDurability.actor.cpp
workloads/DiskDurabilityTest.actor.cpp
workloads/DiskFailureInjection.actor.cpp
workloads/DummyWorkload.actor.cpp
workloads/EncryptionOps.actor.cpp
workloads/EncryptKeyProxyTest.actor.cpp
workloads/ExternalWorkload.actor.cpp
workloads/FastTriggeredWatches.actor.cpp
workloads/FileSystem.actor.cpp
workloads/Fuzz.cpp
workloads/FuzzApiCorrectness.actor.cpp
workloads/GetMappedRange.actor.cpp
workloads/GetRangeStream.actor.cpp
workloads/HealthMetricsApi.actor.cpp
workloads/HighContentionPrefixAllocatorWorkload.actor.cpp
workloads/Increment.actor.cpp
workloads/IncrementalBackup.actor.cpp
workloads/IndexScan.actor.cpp
workloads/Inventory.actor.cpp
workloads/KillRegion.actor.cpp
workloads/KVStoreTest.actor.cpp
workloads/LocalRatekeeper.actor.cpp
workloads/LockDatabase.actor.cpp
workloads/LockDatabaseFrequently.actor.cpp
workloads/LogMetrics.actor.cpp
workloads/LowLatency.actor.cpp
workloads/MachineAttrition.actor.cpp
workloads/Mako.actor.cpp
workloads/MemoryKeyValueStore.cpp
workloads/MemoryKeyValueStore.h
workloads/MemoryLifetime.actor.cpp
workloads/MetricLogging.actor.cpp
workloads/MiniCycle.actor.cpp
workloads/MutationLogReaderCorrectness.actor.cpp
workloads/ParallelRestore.actor.cpp
workloads/Performance.actor.cpp
workloads/PhysicalShardMove.actor.cpp
workloads/Ping.actor.cpp
workloads/PopulateTPCC.actor.cpp
workloads/PrivateEndpoints.actor.cpp
workloads/ProtocolVersion.actor.cpp
workloads/PubSubMultiples.actor.cpp
workloads/QueuePush.actor.cpp
workloads/RandomClogging.actor.cpp
workloads/RandomMoveKeys.actor.cpp
workloads/RandomSelector.actor.cpp
workloads/ReadAfterWrite.actor.cpp
workloads/ReadHotDetection.actor.cpp
workloads/ReadWrite.actor.cpp
workloads/RemoveServersSafely.actor.cpp
workloads/ReportConflictingKeys.actor.cpp
workloads/RestoreBackup.actor.cpp
workloads/RestoreFromBlob.actor.cpp
workloads/Rollback.actor.cpp
workloads/RyowCorrectness.actor.cpp
workloads/RYWDisable.actor.cpp
workloads/RYWPerformance.actor.cpp
workloads/SaveAndKill.actor.cpp
workloads/SelectorCorrectness.actor.cpp
workloads/Serializability.actor.cpp
workloads/Sideband.actor.cpp
workloads/SidebandSingle.actor.cpp
workloads/SimpleAtomicAdd.actor.cpp
workloads/SkewedReadWrite.actor.cpp
workloads/SlowTaskWorkload.actor.cpp
workloads/SnapTest.actor.cpp
workloads/SpecialKeySpaceCorrectness.actor.cpp
workloads/StatusWorkload.actor.cpp
workloads/Storefront.actor.cpp
workloads/StreamingRangeRead.actor.cpp
workloads/StreamingRead.actor.cpp
workloads/SubmitBackup.actor.cpp
workloads/SuspendProcesses.actor.cpp
workloads/TagThrottleApi.actor.cpp
workloads/TargetedKill.actor.cpp
workloads/TaskBucketCorrectness.actor.cpp
workloads/TenantManagement.actor.cpp
workloads/ThreadSafety.actor.cpp
workloads/Throttling.actor.cpp
workloads/Throughput.actor.cpp
workloads/TimeKeeperCorrectness.actor.cpp
workloads/TPCC.actor.cpp
workloads/TPCCWorkload.h
workloads/TriggerRecovery.actor.cpp
workloads/UDPWorkload.actor.cpp
workloads/UnitPerf.actor.cpp
workloads/UnitTests.actor.cpp
workloads/Unreadable.actor.cpp
workloads/VersionStamp.actor.cpp
workloads/WatchAndWait.actor.cpp
workloads/Watches.actor.cpp
workloads/WatchesSameKeyCorrectness.actor.cpp
workloads/WorkerErrors.actor.cpp
workloads/workloads.actor.h
workloads/WriteBandwidth.actor.cpp
workloads/WriteDuringRead.actor.cpp
workloads/WriteTagThrottling.actor.cpp
)
if(${COROUTINE_IMPL} STREQUAL libcoro)
list(APPEND FDBSERVER_SRCS CoroFlowCoro.actor.cpp)
else()
list(APPEND FDBSERVER_SRCS CoroFlow.actor.cpp)
endif()
if (${COROUTINE_IMPL} STREQUAL libcoro)
list(APPEND FDBSERVER_SRCS CoroFlowCoro.actor.cpp)
else ()
list(APPEND FDBSERVER_SRCS CoroFlow.actor.cpp)
endif ()
add_library(fdb_sqlite STATIC
sqlite/btree.h
sqlite/hash.h
sqlite/sqlite3.h
sqlite/sqlite3ext.h
sqlite/sqliteInt.h
sqlite/sqliteLimit.h
sqlite/sqlite3.amalgamation.c)
sqlite/btree.h
sqlite/hash.h
sqlite/sqlite3.h
sqlite/sqlite3ext.h
sqlite/sqliteInt.h
sqlite/sqliteLimit.h
sqlite/sqlite3.amalgamation.c)
if (WITH_ROCKSDB_EXPERIMENTAL)
add_definitions(-DSSD_ROCKSDB_EXPERIMENTAL)
add_definitions(-DSSD_ROCKSDB_EXPERIMENTAL)
include(CompileRocksDB)
# CompileRocksDB sets `lz4_LIBRARIES` to be the shared lib, we want to link
# statically, so find the static library here.
find_library(lz4_STATIC_LIBRARIES
NAMES liblz4.a REQUIRED)
if (WITH_LIBURING)
find_package(uring)
endif()
endif()
include(CompileRocksDB)
# CompileRocksDB sets `lz4_LIBRARIES` to be the shared lib, we want to link
# statically, so find the static library here.
find_library(lz4_STATIC_LIBRARIES
NAMES liblz4.a REQUIRED)
if (WITH_LIBURING)
find_package(uring)
endif ()
endif ()
# Suppress warnings in sqlite since it's third party
if(NOT WIN32)
target_compile_definitions(fdb_sqlite PRIVATE $<$<CONFIG:Debug>:NDEBUG>)
target_compile_options(fdb_sqlite BEFORE PRIVATE -w) # disable warnings for third party
endif()
if (NOT WIN32)
target_compile_definitions(fdb_sqlite PRIVATE $<$<CONFIG:Debug>:NDEBUG>)
target_compile_options(fdb_sqlite BEFORE PRIVATE -w) # disable warnings for third party
endif ()
file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/workloads)
add_flow_target(EXECUTABLE NAME fdbserver SRCS ${FDBSERVER_SRCS})
target_include_directories(fdbserver PRIVATE
${CMAKE_SOURCE_DIR}/bindings/c
${CMAKE_BINARY_DIR}/bindings/c
${CMAKE_CURRENT_BINARY_DIR}/workloads
${CMAKE_CURRENT_SOURCE_DIR}/workloads)
${CMAKE_SOURCE_DIR}/bindings/c
${CMAKE_BINARY_DIR}/bindings/c
${CMAKE_CURRENT_BINARY_DIR}/workloads
${CMAKE_CURRENT_SOURCE_DIR}/workloads)
if (WITH_ROCKSDB_EXPERIMENTAL)
add_dependencies(fdbserver rocksdb)
if(WITH_LIBURING)
target_include_directories(fdbserver PRIVATE ${ROCKSDB_INCLUDE_DIR} ${uring_INCLUDE_DIR})
target_link_libraries(fdbserver PRIVATE fdbclient fdb_sqlite ${ROCKSDB_LIBRARIES} ${uring_LIBRARIES} ${lz4_STATIC_LIBRARIES})
target_compile_definitions(fdbserver PRIVATE BOOST_ASIO_HAS_IO_URING=1 BOOST_ASIO_DISABLE_EPOLL=1)
else()
target_include_directories(fdbserver PRIVATE ${ROCKSDB_INCLUDE_DIR})
target_link_libraries(fdbserver PRIVATE fdbclient fdb_sqlite ${ROCKSDB_LIBRARIES} ${lz4_STATIC_LIBRARIES})
target_compile_definitions(fdbserver PRIVATE)
endif()
else()
target_link_libraries(fdbserver PRIVATE fdbclient fdb_sqlite)
endif()
add_dependencies(fdbserver rocksdb)
if (WITH_LIBURING)
target_include_directories(fdbserver PRIVATE ${ROCKSDB_INCLUDE_DIR} ${uring_INCLUDE_DIR})
target_link_libraries(fdbserver PRIVATE fdbclient fdb_sqlite ${ROCKSDB_LIBRARIES} ${uring_LIBRARIES} ${lz4_STATIC_LIBRARIES})
target_compile_definitions(fdbserver PRIVATE BOOST_ASIO_HAS_IO_URING=1 BOOST_ASIO_DISABLE_EPOLL=1)
else ()
target_include_directories(fdbserver PRIVATE ${ROCKSDB_INCLUDE_DIR})
target_link_libraries(fdbserver PRIVATE fdbclient fdb_sqlite ${ROCKSDB_LIBRARIES} ${lz4_STATIC_LIBRARIES})
target_compile_definitions(fdbserver PRIVATE)
endif ()
else ()
target_link_libraries(fdbserver PRIVATE fdbclient fdb_sqlite)
endif ()
target_link_libraries(fdbserver PRIVATE toml11_target jemalloc)
# target_compile_definitions(fdbserver PRIVATE -DENABLE_SAMPLING)
if (GPERFTOOLS_FOUND)
target_link_libraries(fdbserver PRIVATE gperftools)
endif()
target_link_libraries(fdbserver PRIVATE gperftools)
endif ()
if(NOT OPEN_FOR_IDE)
if(GENERATE_DEBUG_PACKAGES)
fdb_install(TARGETS fdbserver DESTINATION sbin COMPONENT server)
else()
add_custom_target(prepare_fdbserver_install ALL DEPENDS strip_only_fdbserver)
fdb_install(PROGRAMS ${CMAKE_BINARY_DIR}/packages/bin/fdbserver DESTINATION sbin COMPONENT server)
endif()
endif()
if (NOT OPEN_FOR_IDE)
if (GENERATE_DEBUG_PACKAGES)
fdb_install(TARGETS fdbserver DESTINATION sbin COMPONENT server)
else ()
add_custom_target(prepare_fdbserver_install ALL DEPENDS strip_only_fdbserver)
fdb_install(PROGRAMS ${CMAKE_BINARY_DIR}/packages/bin/fdbserver DESTINATION sbin COMPONENT server)
endif ()
endif ()

View File

@ -99,6 +99,20 @@ Key KVWorkload::keyForIndex(uint64_t index) const {
}
}
// Recovers the integer index encoded in a workload key; the reverse of emplaceIndex().
//
// key:    a key laid out the way keyForIndex() produces it. When nodePrefix > 0 the
//         first 16 bytes are skipped before the index digits.
// absent: when true, the last byte of the key is excluded from parsing
//         (keyForIndex() makes "absent" keys one byte longer).
//
// Returns the value of the leading hexadecimal digits found at the index offset.
// std::stoll stops at the first non-hex character, so trailing non-hex bytes are ignored.
// NOTE(review): presumably any padding after the index is never a hex digit - confirm
// against keyForIndex()/emplaceIndex().
int64_t KVWorkload::indexForKey(const KeyRef& key, bool absent) const {
	int offset = 0;
	if (nodePrefix > 0) {
		ASSERT(keyBytes >= 32);
		offset += 16;
	}
	ASSERT(keyBytes >= 16);

	// Number of bytes to hand to the parser. Validate it before building the string:
	// a negative int would silently convert to a huge size_t in the std::string
	// constructor, and an empty string would make std::stoll throw.
	const int length = key.size() - offset - (absent ? 1 : 0);
	ASSERT(length > 0);

	// extract int64_t index, the reverse process of emplaceIndex()
	const std::string hexDigits(reinterpret_cast<const char*>(key.begin()) + offset, length);
	return std::stoll(hexDigits, nullptr, 16);
}
Key KVWorkload::keyForIndex(uint64_t index, bool absent) const {
int adjustedKeyBytes = (absent) ? (keyBytes + 1) : keyBytes;
Key result = makeString(adjustedKeyBytes);
@ -112,8 +126,8 @@ Key KVWorkload::keyForIndex(uint64_t index, bool absent) const {
idx += 16;
}
ASSERT(keyBytes >= 16);
double d = double(index) / nodeCount;
emplaceIndex(data, idx, *(int64_t*)&d);
emplaceIndex(data, idx, (int64_t)index);
// ASSERT(indexForKey(result) == (int64_t)index); // debug assert
return result;
}
@ -1855,7 +1869,9 @@ ACTOR Future<Void> runTests(Reference<IClusterConnectionRecord> connRecord,
}
choose {
when(wait(tests)) { return Void(); }
when(wait(tests)) {
return Void();
}
when(wait(quorum(actors, 1))) {
ASSERT(false);
throw internal_error();

View File

@ -76,57 +76,64 @@ DESCR struct ReadMetric {
};
struct ReadWriteWorkload : KVWorkload {
// general test setting
Standalone<StringRef> descriptionString;
bool doSetup, cancelWorkersAtDuration;
double testDuration, transactionsPerSecond, warmingDelay, maxInsertRate, debugInterval, debugTime;
double metricsStart, metricsDuration;
std::vector<uint64_t> insertionCountsToMeasure; // measure the speed of sequential insertion when bulkSetup
// test log setting
bool enableReadLatencyLogging;
double periodicLoggingInterval;
// use ReadWrite as a ramp up workload
bool rampUpLoad; // indicate this is a ramp up workload
int rampSweepCount; // how many times of ramp up
bool rampTransactionType; // choose transaction type based on client start time
bool rampUpConcurrency; // control client concurrency
// transaction setting
bool useRYW;
bool batchPriority;
bool rangeReads; // read operations are all single key range read
bool dependentReads; // read operations are issued sequentially
bool inconsistentReads; // read with previous read version
bool adjacentReads; // keys are adjacent within a transaction
bool adjacentWrites;
double alpha; // probability for run TransactionA type
// two type of transaction
int readsPerTransactionA, writesPerTransactionA;
int readsPerTransactionB, writesPerTransactionB;
int extraReadConflictRangesPerTransaction, extraWriteConflictRangesPerTransaction;
double testDuration, transactionsPerSecond, alpha, warmingDelay, loadTime, maxInsertRate, debugInterval, debugTime;
double metricsStart, metricsDuration, clientBegin;
std::string valueString;
// hot traffic pattern
double hotKeyFraction, forceHotProbability = 0; // key based hot traffic setting
bool dependentReads;
bool enableReadLatencyLogging;
double periodicLoggingInterval;
bool cancelWorkersAtDuration;
bool inconsistentReads;
bool adjacentReads;
bool adjacentWrites;
bool rampUpLoad;
int rampSweepCount;
double hotKeyFraction, forceHotProbability;
bool rangeReads;
bool useRYW;
bool rampTransactionType;
bool rampUpConcurrency;
bool batchPriority;
Standalone<StringRef> descriptionString;
// states of metric
Int64MetricHandle totalReadsMetric;
Int64MetricHandle totalRetriesMetric;
EventMetricHandle<TransactionSuccessMetric> transactionSuccessMetric;
EventMetricHandle<TransactionFailureMetric> transactionFailureMetric;
EventMetricHandle<ReadMetric> readMetric;
std::vector<Future<Void>> clients;
PerfIntCounter aTransactions, bTransactions, retries;
ContinuousSample<double> latencies, readLatencies, commitLatencies, GRVLatencies, fullReadLatencies;
double readLatencyTotal;
int readLatencyCount;
std::vector<uint64_t> insertionCountsToMeasure;
std::vector<std::pair<uint64_t, double>> ratesAtKeyCounts;
std::vector<PerfMetric> periodicMetrics;
std::vector<std::pair<uint64_t, double>> ratesAtKeyCounts; // sequential insertion speed
bool doSetup;
// other internal states
std::vector<Future<Void>> clients;
double loadTime, clientBegin;
ReadWriteWorkload(WorkloadContext const& wcx)
: KVWorkload(wcx), loadTime(0.0), clientBegin(0), dependentReads(false), adjacentReads(false),
adjacentWrites(false), totalReadsMetric(LiteralStringRef("RWWorkload.TotalReads")),
: KVWorkload(wcx), dependentReads(false), adjacentReads(false), adjacentWrites(false), totalReadsMetric(LiteralStringRef("RWWorkload.TotalReads")),
totalRetriesMetric(LiteralStringRef("RWWorkload.TotalRetries")), aTransactions("A Transactions"),
bTransactions("B Transactions"), retries("Retries"), latencies(sampleSize), readLatencies(sampleSize),
commitLatencies(sampleSize), GRVLatencies(sampleSize), fullReadLatencies(sampleSize), readLatencyTotal(0),
readLatencyCount(0) {
bTransactions("B Transactions"), retries("Retries"),
latencies(sampleSize), readLatencies(sampleSize), commitLatencies(sampleSize), GRVLatencies(sampleSize),
fullReadLatencies(sampleSize), readLatencyTotal(0), readLatencyCount(0), loadTime(0.0),
clientBegin(0) {
transactionSuccessMetric.init(LiteralStringRef("RWWorkload.SuccessfulTransaction"));
transactionFailureMetric.init(LiteralStringRef("RWWorkload.FailedTransaction"));
readMetric.init(LiteralStringRef("RWWorkload.Read"));

View File

@ -0,0 +1,778 @@
/*
* SkewedReadWrite.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <boost/lexical_cast.hpp>
#include <utility>
#include <vector>
#include "fdbrpc/ContinuousSample.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbserver/workloads/BulkSetup.actor.h"
#include "fdbclient/ReadYourWrites.h"
#include "flow/TDMetric.actor.h"
#include "fdbclient/RunTransaction.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
// Size argument passed to every ContinuousSample<double> latency sampler of the workload.
const int sampleSize = 10000;
// Event payload for a successful transaction; the workload holds it through
// EventMetricHandle<TransactionSuccessMetric>, initialized as "RWWorkload.SuccessfulTransaction".
// Latencies are in nanoseconds; retries is a count.
DESCR struct TransactionSuccessMetric {
int64_t totalLatency; // ns
int64_t startLatency; // ns
int64_t commitLatency; // ns
int64_t retries; // count
};
// Event payload for a failed transaction; the workload holds it through
// EventMetricHandle<TransactionFailureMetric>, initialized as "RWWorkload.FailedTransaction".
DESCR struct TransactionFailureMetric {
int64_t startLatency; // ns
int64_t errorCode; // flow error code
};
// Event payload carrying the latency of a read; the workload holds it through
// EventMetricHandle<ReadMetric>.
DESCR struct ReadMetric {
int64_t readLatency; // ns
};
struct SkewedReadWriteWorkload : KVWorkload {
// general test setting
Standalone<StringRef> descriptionString;
bool doSetup, cancelWorkersAtDuration;
double testDuration, transactionsPerSecond, warmingDelay, maxInsertRate, debugInterval, debugTime;
double metricsStart, metricsDuration;
std::vector<uint64_t> insertionCountsToMeasure; // passed to bulkSetup to measure the speed of sequential insertion
// test log setting
bool enableReadLatencyLogging;
double periodicLoggingInterval;
// transaction setting
bool useRYW;
double alpha; // a transaction is type A when random01() > alpha, i.e. alpha is the probability of type B
// two types of transaction
int readsPerTransactionA, writesPerTransactionA;
int readsPerTransactionB, writesPerTransactionB;
std::string valueString;
// server based hot traffic setting
int skewRound = 0; // number of hot-server rounds; each round is given testDuration / skewRound
double hotServerFraction = 0, hotServerShardFraction = 1.0; // set hotServerFraction > 0 to issue hot keys based on the shard map
double hotServerReadFrac, hotServerWriteFrac; // how much read / write traffic goes to the hot servers
double hotReadWriteServerOverlap; // fraction of the hot-server window shared by hot reads and hot writes
// hot server state
typedef std::vector<std::pair<int64_t, int64_t>> IndexRangeVec;
// keyForIndex generates a key from an index, so for a shard range recording the first and last index is enough
std::vector<std::pair<UID, IndexRangeVec>> serverShards; // storage server and the shards it owns
std::map<UID, StorageServerInterface> serverInterfaces;
int hotServerCount = 0, currentHotRound = -1;
// states of metric
Int64MetricHandle totalReadsMetric;
Int64MetricHandle totalRetriesMetric;
EventMetricHandle<TransactionSuccessMetric> transactionSuccessMetric;
EventMetricHandle<TransactionFailureMetric> transactionFailureMetric;
EventMetricHandle<ReadMetric> readMetric;
PerfIntCounter aTransactions, bTransactions, retries;
ContinuousSample<double> latencies, readLatencies, commitLatencies, GRVLatencies, fullReadLatencies;
double readLatencyTotal;
int readLatencyCount;
std::vector<PerfMetric> periodicMetrics; // filled by tracePeriodically, appended to the result of getMetrics
std::vector<std::pair<uint64_t, double>> ratesAtKeyCounts; // sequential insertion speed
// other internal states
std::vector<Future<Void>> clients;
double loadTime, clientBegin; // clientBegin is reset to now() by startReadWriteClients
// Builds the workload from the test options.
//
// Initialises the metric handles, counters and latency samples, reads every option
// (falling back to the defaults spelled out below), checks on a few random index pairs
// that keyForIndex() is monotonic, and validates the hot-server settings.
//
// Entries of "insertionCountsToMeasure" that do not parse as uint64_t are skipped, as
// before, but each skipped entry now leaves a SevWarn trace instead of vanishing
// silently.
SkewedReadWriteWorkload(WorkloadContext const& wcx)
  : KVWorkload(wcx), totalReadsMetric(LiteralStringRef("RWWorkload.TotalReads")),
    totalRetriesMetric(LiteralStringRef("RWWorkload.TotalRetries")), aTransactions("A Transactions"),
    bTransactions("B Transactions"), retries("Retries"), latencies(sampleSize), readLatencies(sampleSize),
    commitLatencies(sampleSize), GRVLatencies(sampleSize), fullReadLatencies(sampleSize), readLatencyTotal(0),
    readLatencyCount(0), loadTime(0.0), clientBegin(0) {
    transactionSuccessMetric.init(LiteralStringRef("RWWorkload.SuccessfulTransaction"));
    transactionFailureMetric.init(LiteralStringRef("RWWorkload.FailedTransaction"));
    readMetric.init(LiteralStringRef("RWWorkload.Read"));

    testDuration = getOption(options, LiteralStringRef("testDuration"), 10.0);
    // The configured rate is for the whole test; each client gets an equal share.
    transactionsPerSecond = getOption(options, LiteralStringRef("transactionsPerSecond"), 5000.0) / clientCount;
    double allowedLatency = getOption(options, LiteralStringRef("allowedLatency"), 0.250);
    // Default actor count is derived from rate * allowed latency; "actorCountPerTester" overrides it.
    actorCount = ceil(transactionsPerSecond * allowedLatency);
    actorCount = getOption(options, LiteralStringRef("actorCountPerTester"), actorCount);

    readsPerTransactionA = getOption(options, LiteralStringRef("readsPerTransactionA"), 10);
    writesPerTransactionA = getOption(options, LiteralStringRef("writesPerTransactionA"), 0);
    readsPerTransactionB = getOption(options, LiteralStringRef("readsPerTransactionB"), 1);
    writesPerTransactionB = getOption(options, LiteralStringRef("writesPerTransactionB"), 9);
    alpha = getOption(options, LiteralStringRef("alpha"), 0.1);

    // randomValue() hands out prefixes of this string.
    valueString = std::string(maxValueBytes, '.');
    if (nodePrefix > 0) {
        keyBytes += 16;
    }

    metricsStart = getOption(options, LiteralStringRef("metricsStart"), 0.0);
    metricsDuration = getOption(options, LiteralStringRef("metricsDuration"), testDuration);
    if (getOption(options, LiteralStringRef("discardEdgeMeasurements"), true)) {
        // discardEdgeMeasurements keeps the metrics from the middle 3/4 of the test
        metricsStart += testDuration * 0.125;
        metricsDuration *= 0.75;
    }

    warmingDelay = getOption(options, LiteralStringRef("warmingDelay"), 0.0);
    maxInsertRate = getOption(options, LiteralStringRef("maxInsertRate"), 1e12);
    debugInterval = getOption(options, LiteralStringRef("debugInterval"), 0.0);
    debugTime = getOption(options, LiteralStringRef("debugTime"), 0.0);
    enableReadLatencyLogging = getOption(options, LiteralStringRef("enableReadLatencyLogging"), false);
    periodicLoggingInterval = getOption(options, LiteralStringRef("periodicLoggingInterval"), 5.0);
    cancelWorkersAtDuration = getOption(options, LiteralStringRef("cancelWorkersAtDuration"), true);
    useRYW = getOption(options, LiteralStringRef("useRYW"), false);
    doSetup = getOption(options, LiteralStringRef("setup"), true);
    descriptionString = getOption(options, LiteralStringRef("description"), LiteralStringRef("SkewedReadWrite"));

    // Validate that keyForIndex() is monotonic
    for (int i = 0; i < 30; i++) {
        int64_t a = deterministicRandom()->randomInt64(0, nodeCount);
        int64_t b = deterministicRandom()->randomInt64(0, nodeCount);
        if (a > b) {
            std::swap(a, b);
        }
        ASSERT(a <= b);
        ASSERT((keyForIndex(a, false) <= keyForIndex(b, false)));
    }

    std::vector<std::string> insertionCountsToMeasureString =
        getOption(options, LiteralStringRef("insertionCountsToMeasure"), std::vector<std::string>());
    for (const std::string& countString : insertionCountsToMeasureString) {
        try {
            insertionCountsToMeasure.push_back(boost::lexical_cast<uint64_t>(countString));
        } catch (const boost::bad_lexical_cast&) {
            // Best effort: an unparsable entry is skipped, but a typo in the test spec should be visible.
            TraceEvent(SevWarn, "SkewedReadWriteBadInsertionCount").detail("Value", countString);
        }
    }

    // Hot-server (skew) options.
    hotServerFraction = getOption(options, "hotServerFraction"_sr, 0.2);
    hotServerShardFraction = getOption(options, "hotServerShardFraction"_sr, 1.0);
    hotReadWriteServerOverlap = getOption(options, "hotReadWriteServerOverlap"_sr, 0.0);
    skewRound = getOption(options, "skewRound"_sr, 1);
    hotServerReadFrac = getOption(options, "hotServerReadFrac"_sr, 0.8);
    hotServerWriteFrac = getOption(options, "hotServerWriteFrac"_sr, 0.0);
    // At least one of the read / write hot-traffic shares must reach hotServerFraction,
    // and there must be at least one round.
    ASSERT((hotServerReadFrac >= hotServerFraction || hotServerWriteFrac >= hotServerFraction) &&
           skewRound > 0);
}
// Workload name used in reports and as the prefix of the periodic trace events;
// taken from the "description" option read in the constructor.
std::string description() const override { return descriptionString.toString(); }
// Setup phase: forwards to the _setup actor.
Future<Void> setup(Database const& cx) override { return _setup(cx, this); }
// Run phase: forwards to the _start actor.
Future<Void> start(Database const& cx) override { return _start(cx, this); }
// Fetches the worker list through the cluster interface found in `db` and sends a
// TraceBatchDumpRequest to every worker. Returns true once all of those requests have
// completed; their individual ErrorOr results are not inspected.
// If the getWorkers request itself fails, it is retried after one second. A change of
// `db` while waiting restarts the loop.
// Any Error escaping the loop is traced as FailedToDumpWorkers (SevError) and rethrown.
ACTOR static Future<bool> traceDumpWorkers(Reference<AsyncVar<ServerDBInfo> const> db) {
try {
loop {
choose {
// the server info changed: go around the loop again and re-read db->get()
when(wait(db->onChange())) {}
when(ErrorOr<std::vector<WorkerDetails>> workerList =
wait(db->get().clusterInterface.getWorkers.tryGetReply(GetWorkersRequest()))) {
if (workerList.present()) {
std::vector<Future<ErrorOr<Void>>> dumpRequests;
dumpRequests.reserve(workerList.get().size());
for (int i = 0; i < workerList.get().size(); i++)
dumpRequests.push_back(workerList.get()[i].interf.traceBatchDumpRequest.tryGetReply(
TraceBatchDumpRequest()));
wait(waitForAll(dumpRequests));
return true;
}
// getWorkers failed: back off before asking again
wait(delay(1.0));
}
}
}
} catch (Error& e) {
TraceEvent(SevError, "FailedToDumpWorkers").error(e);
throw;
}
}
// Check phase. Drops the client futures held by the workload, shortens the metrics
// window when the workers were left running and the window has not elapsed yet, and
// dumps the local trace batch. Client 0 additionally asks every worker to dump its
// trace batch; all other clients report success immediately.
Future<bool> check(Database const& cx) override {
    clients.clear();

    if (!cancelWorkersAtDuration) {
        if (now() < metricsStart + metricsDuration) {
            metricsDuration = now() - metricsStart;
        }
    }

    g_traceBatch.dump();

    if (clientId != 0) {
        return true;
    }
    return traceDumpWorkers(dbInfo);
}
void getMetrics(std::vector<PerfMetric>& m) override {
double duration = metricsDuration;
int reads =
(aTransactions.getValue() * readsPerTransactionA) + (bTransactions.getValue() * readsPerTransactionB);
int writes =
(aTransactions.getValue() * writesPerTransactionA) + (bTransactions.getValue() * writesPerTransactionB);
m.emplace_back("Measured Duration", duration, Averaged::True);
m.emplace_back(
"Transactions/sec", (aTransactions.getValue() + bTransactions.getValue()) / duration, Averaged::False);
m.emplace_back("Operations/sec", ((reads + writes) / duration), Averaged::False);
m.push_back(aTransactions.getMetric());
m.push_back(bTransactions.getMetric());
m.push_back(retries.getMetric());
m.emplace_back("Mean load time (seconds)", loadTime, Averaged::True);
m.emplace_back("Read rows", reads, Averaged::False);
m.emplace_back("Write rows", writes, Averaged::False);
m.emplace_back("Read rows/sec", reads / duration, Averaged::False);
m.emplace_back("Write rows/sec", writes / duration, Averaged::False);
m.emplace_back(
"Bytes read/sec", (reads * (keyBytes + (minValueBytes + maxValueBytes) * 0.5)) / duration, Averaged::False);
m.emplace_back("Bytes written/sec",
(writes * (keyBytes + (minValueBytes + maxValueBytes) * 0.5)) / duration,
Averaged::False);
m.insert(m.end(), periodicMetrics.begin(), periodicMetrics.end());
std::vector<std::pair<uint64_t, double>>::iterator ratesItr = ratesAtKeyCounts.begin();
for (; ratesItr != ratesAtKeyCounts.end(); ratesItr++)
m.emplace_back(format("%lld keys imported bytes/sec", ratesItr->first), ratesItr->second, Averaged::False);
}
// Returns a value whose length is drawn uniformly from [minValueBytes, maxValueBytes];
// its bytes are the leading bytes of valueString.
Value randomValue() {
    const int length = deterministicRandom()->randomInt(minValueBytes, maxValueBytes + 1);
    const uint8_t* bytes = reinterpret_cast<const uint8_t*>(valueString.c_str());
    return StringRef(bytes, length);
}
// Produces the key-value pair for index n: the key comes from keyForIndex(n, false) and
// the value from randomValue(). The workload passes itself to bulkSetup in _setup,
// presumably so that bulkSetup can call this to generate the rows it inserts.
Standalone<KeyValueRef> operator()(uint64_t n) { return KeyValueRef(keyForIndex(n, false), randomValue()); }
// Debug helper: prints, for every entry of serverShards, the server's address followed
// by its [begin,end] index ranges in hexadecimal.
// The stream's format flags are restored before returning; previously std::cout was
// left in hex mode, which would affect every integer printed afterwards.
void debugPrintServerShards() const {
    const std::ios_base::fmtflags savedFlags = std::cout.flags();
    std::cout << std::hex;
    for (const auto& [serverId, shards] : this->serverShards) {
        std::cout << serverInterfaces.at(serverId).address().toString() << ": [";
        for (const auto& [beginIdx, endIdx] : shards) {
            std::cout << "[" << beginIdx << "," << endIdx << "], ";
        }
        std::cout << "] \n";
    }
    std::cout.flags(savedFlags);
}
// Converts consecutive key boundaries into index ranges.
// For each boundary except the last one, the shard is [boundaries[i], boundaries[i + 1]).
// Within that shard, the first existing key is read and converted with indexForKey() to
// give the range's first index, and the last existing key likewise gives its last index.
// Every shard must contain at least one key, otherwise the ASSERT below fails.
// Returns one (firstIndex, lastIndex) pair per shard, in boundary order.
ACTOR static Future<IndexRangeVec> convertKeyBoundaryToIndexShard(Database cx,
SkewedReadWriteWorkload* self,
Standalone<VectorRef<KeyRef>> boundaries) {
state IndexRangeVec res;
state int i = 0;
for (; i < boundaries.size() - 1; ++i) {
KeyRangeRef currentShard = KeyRangeRef(boundaries[i], boundaries[i + 1]);
// std::cout << currentShard.toString() << "\n";
// one forward and one reverse range read, each limited to a single row
std::vector<RangeResult> ranges = wait(runRYWTransaction(
cx, [currentShard](Reference<ReadYourWritesTransaction> tr) -> Future<std::vector<RangeResult>> {
std::vector<Future<RangeResult>> f;
f.push_back(tr->getRange(currentShard, 1, Snapshot::False, Reverse::False));
f.push_back(tr->getRange(currentShard, 1, Snapshot::False, Reverse::True));
return getAll(f);
}));
ASSERT(ranges[0].size() == 1 && ranges[1].size() == 1);
res.emplace_back(self->indexForKey(ranges[0][0].key), self->indexForKey(ranges[1][0].key));
}
ASSERT(res.size() == boundaries.size() - 1);
return res;
}
// Rebuilds self->serverInterfaces and self->serverShards from the system keyspace.
// Steps:
//   1. read serverListKeys and serverKeysRange (both need READ_SYSTEM_KEYS);
//   2. decode the storage server interfaces;
//   3. collect, for every shard begin key inside the workload key range, the servers
//      that hold the shard, plus the servers of the shard that covers the workload's
//      first key when no shard begins exactly there;
//   4. turn the shard begin keys into index ranges with convertKeyBoundaryToIndexShard;
//   5. group the index ranges by server.
ACTOR static Future<Void> updateServerShards(Database cx, SkewedReadWriteWorkload* self) {
// started here but only waited on after the serverKeys read below has finished
state Future<RangeResult> serverList =
runRYWTransaction(cx, [](Reference<ReadYourWritesTransaction> tr) -> Future<RangeResult> {
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
return tr->getRange(serverListKeys, CLIENT_KNOBS->TOO_MANY);
});
state RangeResult range =
wait(runRYWTransaction(cx, [](Reference<ReadYourWritesTransaction> tr) -> Future<RangeResult> {
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
return tr->getRange(serverKeysRange, CLIENT_KNOBS->TOO_MANY);
}));
wait(success(serverList));
// decode server interfaces
self->serverInterfaces.clear();
for (int i = 0; i < serverList.get().size(); i++) {
auto ssi = decodeServerListValue(serverList.get()[i].value);
self->serverInterfaces.emplace(ssi.id(), ssi);
}
// clear self->serverShards
self->serverShards.clear();
// leftEdge < workloadBegin < workloadEnd
Key workloadBegin = self->keyForIndex(0), workloadEnd = self->keyForIndex(self->nodeCount);
Key leftEdge(allKeys.begin);
std::vector<UID> leftServer; // left server owns the range [leftEdge, workloadBegin)
KeyRangeRef workloadRange(workloadBegin, workloadEnd);
state std::map<Key, std::vector<UID>> beginServers; // shard begin key to the IDs of the servers holding the shard
for (auto kv = range.begin(); kv != range.end(); kv++) {
// only entries whose value says the server has the shard are considered
if (serverHasKey(kv->value)) {
auto [id, key] = serverKeysDecodeServerBegin(kv->key);
if (workloadRange.contains(key)) {
beginServers[key].push_back(id);
} else if (workloadBegin > key && key > leftEdge) { // update left boundary
// a begin key closer to workloadBegin was found: servers recorded for the old edge no longer apply
leftEdge = key;
leftServer.clear();
}
if (key == leftEdge) {
leftServer.push_back(id);
}
}
}
ASSERT(beginServers.size() == 0 || beginServers.begin()->first >= workloadBegin);
// handle the left boundary
if (beginServers.size() == 0 || beginServers.begin()->first > workloadBegin) {
beginServers[workloadBegin] = leftServer;
}
Standalone<VectorRef<KeyRef>> keyBegins;
for (auto p = beginServers.begin(); p != beginServers.end(); ++p) {
keyBegins.push_back(keyBegins.arena(), p->first);
}
// deep copy because the wait below will destruct workloadEnd
keyBegins.push_back_deep(keyBegins.arena(), workloadEnd);
IndexRangeVec indexShards = wait(convertKeyBoundaryToIndexShard(cx, self, keyBegins));
ASSERT(beginServers.size() == indexShards.size());
// beginServers is a std::map, so its entries are visited in ascending begin-key order,
// which is the order in which keyBegins (and therefore indexShards) was built
// build self->serverShards, starting from the left shard
std::map<UID, IndexRangeVec> serverShards;
int i = 0;
for (auto p = beginServers.begin(); p != beginServers.end(); ++p) {
for (int j = 0; j < p->second.size(); ++j) {
serverShards[p->second[j]].emplace_back(indexShards[i]);
}
++i;
}
// self->serverShards is ordered by UID
for (auto it : serverShards) {
self->serverShards.emplace_back(it);
}
// if (self->clientId == 0) {
// self->debugPrintServerShards();
// }
return Void();
}
// Runs forever: every periodicLoggingInterval it
//   - emits one TraceEvent per latency sample (row read, GRV, commit, total), named
//     <description>_<Kind>Latency;
//   - if both ends of the elapsed interval fall inside the recording window (see
//     shouldRecord), appends per-interval throughput and latency figures, prefixed
//     with the elapsed time, to self->periodicMetrics;
//   - clears all latency samples and the read latency accumulators, so every interval
//     reports only its own data.
// The actor never returns on its own; it ends when its future is cancelled.
ACTOR static Future<Void> tracePeriodically(SkewedReadWriteWorkload* self) {
state double start = now();
state double elapsed = 0.0;
state int64_t last_ops = 0;
loop {
elapsed += self->periodicLoggingInterval;
// wake up at absolute multiples of the interval, measured from the actor's start
wait(delayUntil(start + elapsed));
TraceEvent((self->description() + "_RowReadLatency").c_str())
.detail("Mean", self->readLatencies.mean())
.detail("Median", self->readLatencies.median())
.detail("Percentile5", self->readLatencies.percentile(.05))
.detail("Percentile95", self->readLatencies.percentile(.95))
.detail("Percentile99", self->readLatencies.percentile(.99))
.detail("Percentile99_9", self->readLatencies.percentile(.999))
.detail("Max", self->readLatencies.max())
.detail("Count", self->readLatencyCount)
.detail("Elapsed", elapsed);
TraceEvent((self->description() + "_GRVLatency").c_str())
.detail("Mean", self->GRVLatencies.mean())
.detail("Median", self->GRVLatencies.median())
.detail("Percentile5", self->GRVLatencies.percentile(.05))
.detail("Percentile95", self->GRVLatencies.percentile(.95))
.detail("Percentile99", self->GRVLatencies.percentile(.99))
.detail("Percentile99_9", self->GRVLatencies.percentile(.999))
.detail("Max", self->GRVLatencies.max());
TraceEvent((self->description() + "_CommitLatency").c_str())
.detail("Mean", self->commitLatencies.mean())
.detail("Median", self->commitLatencies.median())
.detail("Percentile5", self->commitLatencies.percentile(.05))
.detail("Percentile95", self->commitLatencies.percentile(.95))
.detail("Percentile99", self->commitLatencies.percentile(.99))
.detail("Percentile99_9", self->commitLatencies.percentile(.999))
.detail("Max", self->commitLatencies.max());
TraceEvent((self->description() + "_TotalLatency").c_str())
.detail("Mean", self->latencies.mean())
.detail("Median", self->latencies.median())
.detail("Percentile5", self->latencies.percentile(.05))
.detail("Percentile95", self->latencies.percentile(.95))
.detail("Percentile99", self->latencies.percentile(.99))
.detail("Percentile99_9", self->latencies.percentile(.999))
.detail("Max", self->latencies.max());
// cumulative operation count; the per-interval figure is the difference to last_ops
int64_t ops =
(self->aTransactions.getValue() * (self->readsPerTransactionA + self->writesPerTransactionA)) +
(self->bTransactions.getValue() * (self->readsPerTransactionB + self->writesPerTransactionB));
// only intervals lying entirely inside the recording window contribute periodic metrics
bool recordBegin = self->shouldRecord(std::max(now() - self->periodicLoggingInterval, self->clientBegin));
bool recordEnd = self->shouldRecord(now());
if (recordBegin && recordEnd) {
std::string ts = format("T=%04.0fs:", elapsed);
self->periodicMetrics.emplace_back(
ts + "Operations/sec", (ops - last_ops) / self->periodicLoggingInterval, Averaged::False);
// if(self->rampUpLoad) {
// latencies are sampled in seconds and reported here in milliseconds
self->periodicMetrics.emplace_back(
ts + "Mean Latency (ms)", 1000 * self->latencies.mean(), Averaged::True);
self->periodicMetrics.emplace_back(
ts + "Median Latency (ms, averaged)", 1000 * self->latencies.median(), Averaged::True);
self->periodicMetrics.emplace_back(
ts + "5% Latency (ms, averaged)", 1000 * self->latencies.percentile(.05), Averaged::True);
self->periodicMetrics.emplace_back(
ts + "95% Latency (ms, averaged)", 1000 * self->latencies.percentile(.95), Averaged::True);
self->periodicMetrics.emplace_back(
ts + "Mean Row Read Latency (ms)", 1000 * self->readLatencies.mean(), Averaged::True);
self->periodicMetrics.emplace_back(
ts + "Median Row Read Latency (ms, averaged)", 1000 * self->readLatencies.median(), Averaged::True);
self->periodicMetrics.emplace_back(ts + "5% Row Read Latency (ms, averaged)",
1000 * self->readLatencies.percentile(.05),
Averaged::True);
self->periodicMetrics.emplace_back(ts + "95% Row Read Latency (ms, averaged)",
1000 * self->readLatencies.percentile(.95),
Averaged::True);
self->periodicMetrics.emplace_back(
ts + "Mean Total Read Latency (ms)", 1000 * self->fullReadLatencies.mean(), Averaged::True);
self->periodicMetrics.emplace_back(ts + "Median Total Read Latency (ms, averaged)",
1000 * self->fullReadLatencies.median(),
Averaged::True);
self->periodicMetrics.emplace_back(ts + "5% Total Read Latency (ms, averaged)",
1000 * self->fullReadLatencies.percentile(.05),
Averaged::True);
self->periodicMetrics.emplace_back(ts + "95% Total Read Latency (ms, averaged)",
1000 * self->fullReadLatencies.percentile(.95),
Averaged::True);
self->periodicMetrics.emplace_back(
ts + "Mean GRV Latency (ms)", 1000 * self->GRVLatencies.mean(), Averaged::True);
self->periodicMetrics.emplace_back(
ts + "Median GRV Latency (ms, averaged)", 1000 * self->GRVLatencies.median(), Averaged::True);
self->periodicMetrics.emplace_back(
ts + "5% GRV Latency (ms, averaged)", 1000 * self->GRVLatencies.percentile(.05), Averaged::True);
self->periodicMetrics.emplace_back(
ts + "95% GRV Latency (ms, averaged)", 1000 * self->GRVLatencies.percentile(.95), Averaged::True);
self->periodicMetrics.emplace_back(
ts + "Mean Commit Latency (ms)", 1000 * self->commitLatencies.mean(), Averaged::True);
self->periodicMetrics.emplace_back(
ts + "Median Commit Latency (ms, averaged)", 1000 * self->commitLatencies.median(), Averaged::True);
self->periodicMetrics.emplace_back(ts + "5% Commit Latency (ms, averaged)",
1000 * self->commitLatencies.percentile(.05),
Averaged::True);
self->periodicMetrics.emplace_back(ts + "95% Commit Latency (ms, averaged)",
1000 * self->commitLatencies.percentile(.95),
Averaged::True);
//}
self->periodicMetrics.emplace_back(
ts + "Max Latency (ms, averaged)", 1000 * self->latencies.max(), Averaged::True);
self->periodicMetrics.emplace_back(
ts + "Max Row Read Latency (ms, averaged)", 1000 * self->readLatencies.max(), Averaged::True);
self->periodicMetrics.emplace_back(
ts + "Max Total Read Latency (ms, averaged)", 1000 * self->fullReadLatencies.max(), Averaged::True);
self->periodicMetrics.emplace_back(
ts + "Max GRV Latency (ms, averaged)", 1000 * self->GRVLatencies.max(), Averaged::True);
self->periodicMetrics.emplace_back(
ts + "Max Commit Latency (ms, averaged)", 1000 * self->commitLatencies.max(), Averaged::True);
}
last_ops = ops;
// if(self->rampUpLoad) {
// start the next interval with empty samples
self->latencies.clear();
self->readLatencies.clear();
self->fullReadLatencies.clear();
self->GRVLatencies.clear();
self->commitLatencies.clear();
//}
self->readLatencyTotal = 0.0;
self->readLatencyCount = 0;
}
}
// Waits for the read `f` and measures how long it took, counted from the moment this
// actor was started. The latency is always logged through `readMetric` (in ns); it is
// added to `latencies`, `totalLatency` and `latencyCount` only when `shouldRecord` is
// true. The value that was read is discarded.
ACTOR static Future<Void> logLatency(Future<Optional<Value>> f,
ContinuousSample<double>* latencies,
double* totalLatency,
int* latencyCount,
EventMetricHandle<ReadMetric> readMetric,
bool shouldRecord) {
state double readBegin = now();
Optional<Value> value = wait(f);
double latency = now() - readBegin;
readMetric->readLatency = latency * 1e9;
readMetric->log();
if (shouldRecord) {
*totalLatency += latency;
++*latencyCount;
latencies->addSample(latency);
}
return Void();
}
// Issues one get() on `tr` for every index in `keys` (converted with keyForIndex), all
// started before any of them is waited on, and returns when all of them have finished.
// Each read bumps totalReadsMetric and has its latency tracked by logLatency;
// `shouldRecord` is passed through to logLatency. Returns immediately for an empty `keys`.
ACTOR template <class Trans>
Future<Void> readOp(Trans* tr, std::vector<int64_t> keys, SkewedReadWriteWorkload* self, bool shouldRecord) {
if (!keys.size())
return Void();
std::vector<Future<Void>> readers;
for (int op = 0; op < keys.size(); op++) {
++self->totalReadsMetric;
readers.push_back(logLatency(tr->get(self->keyForIndex(keys[op])),
&self->readLatencies,
&self->readLatencyTotal,
&self->readLatencyCount,
self->readMetric,
shouldRecord));
}
wait(waitForAll(readers));
return Void();
}
// Setup phase: unless the "setup" option was disabled, loads self->nodeCount rows with
// bulkSetup and stores the load time and the measured insertion rates that bulkSetup
// delivers through the two promises.
// NOTE(review): the meaning of the fifth bulkSetup argument
// (insertionCountsToMeasure.empty()) is not visible here; confirm against BulkSetup.actor.h.
ACTOR static Future<Void> _setup(Database cx, SkewedReadWriteWorkload* self) {
if (!self->doSetup)
return Void();
state Promise<double> loadTime;
state Promise<std::vector<std::pair<uint64_t, double>>> ratesAtKeyCounts;
wait(bulkSetup(cx,
self,
self->nodeCount,
loadTime,
self->insertionCountsToMeasure.empty(),
self->warmingDelay,
self->maxInsertRate,
self->insertionCountsToMeasure,
ratesAtKeyCounts));
// both promises are read without waiting, so bulkSetup must have fulfilled them by now
self->loadTime = loadTime.getFuture().get();
self->ratesAtKeyCounts = ratesAtKeyCounts.getFuture().get();
return Void();
}
// Marks the start of a client run (clientBegin = now()) and appends actorCount
// randomReadWriteClient actors to `clients`. The transaction type is
// ReadYourWritesTransaction when useRYW is set and Transaction otherwise. Every actor is
// given a delay of actorCount / transactionsPerSecond.
void startReadWriteClients(Database cx, std::vector<Future<Void>>& clients) {
    clientBegin = now();
    for (int actorIdx = 0; actorIdx < actorCount; ++actorIdx) {
        const double perActorDelay = actorCount / transactionsPerSecond;
        clients.push_back(useRYW
                              ? randomReadWriteClient<ReadYourWritesTransaction>(cx, this, perActorDelay, actorIdx)
                              : randomReadWriteClient<Transaction>(cx, this, perActorDelay, actorIdx));
    }
}
// Run phase. After loading the shard map, runs skewRound rounds; each round selects the
// hot servers for that round, starts the read/write clients, lets them run for
// testDuration / skewRound, cancels them, and then refreshes the shard map.
//
// The periodic logger is held in its own future rather than in `clients`: clearing
// `clients` at the end of a round cancels everything stored in it, which used to stop
// periodic latency logging after the first round.
ACTOR static Future<Void> _start(Database cx, SkewedReadWriteWorkload* self) {
    state std::vector<Future<Void>> clients;
    state Future<Void> periodicLogger = Void();
    if (self->enableReadLatencyLogging)
        periodicLogger = tracePeriodically(self);

    wait(updateServerShards(cx, self));
    for (self->currentHotRound = 0; self->currentHotRound < self->skewRound; ++self->currentHotRound) {
        self->setHotServers();
        self->startReadWriteClients(cx, clients);
        // the clients loop forever, so the round normally ends through the timeout
        wait(timeout(waitForAll(clients), self->testDuration / self->skewRound, Void()));
        // dropping the futures cancels this round's clients
        clients.clear();
        wait(delay(5.0) >> updateServerShards(cx, self));
    }

    return Void();
}
// True when the current time lies inside the recording window.
bool shouldRecord() {
    return shouldRecord(now());
}
// True when `checkTime`, measured relative to clientBegin, lies inside
// [metricsStart, metricsStart + metricsDuration).
bool shouldRecord(double checkTime) {
    const double sinceBegin = checkTime - clientBegin;
    if (sinceBegin < metricsStart) {
        return false;
    }
    return sinceBegin < (metricsStart + metricsDuration);
}
// calculate hot server count
void setHotServers() {
hotServerCount = ceil(hotServerFraction * serverShards.size());
std::cout << "Choose " << hotServerCount << "/" << serverShards.size() << "/" << serverInterfaces.size()
<< " hot servers: [";
int begin = currentHotRound * hotServerCount;
for (int i = 0; i < hotServerCount; ++i) {
int idx = (begin + i) % serverShards.size();
std::cout << serverInterfaces.at(serverShards[idx].first).address().toString() << ",";
}
std::cout << "]\n";
}
int64_t getRandomKeyFromHotServer(bool hotServerRead = true) {
ASSERT(hotServerCount > 0);
int begin = currentHotRound * hotServerCount;
if (!hotServerRead) {
begin += hotServerCount * (1.0 - hotReadWriteServerOverlap); // calculate non-overlap part offset
}
int idx = deterministicRandom()->randomInt(begin, begin + hotServerCount) % serverShards.size();
int shardMax = std::min(serverShards[idx].second.size(),
(size_t)ceil(serverShards[idx].second.size() * hotServerShardFraction));
int shardIdx = deterministicRandom()->randomInt(0, shardMax);
return deterministicRandom()->randomInt64(serverShards[idx].second[shardIdx].first,
serverShards[idx].second[shardIdx].second + 1);
}
// Returns a random key index. When hot servers are enabled (hotServerFraction > 0), the
// index comes from a hot server with probability hotServerReadFrac for reads and
// hotServerWriteFrac for writes; otherwise it is uniform in [0, nodeCount).
int64_t getRandomKey(uint64_t nodeCount, bool hotServerRead = true) {
    // always drawn, even when hot servers are disabled
    const double roll = deterministicRandom()->random01();
    const double hotShare = hotServerRead ? hotServerReadFrac : hotServerWriteFrac;

    if (hotServerFraction > 0 && roll < hotShare) {
        return getRandomKeyFromHotServer(hotServerRead);
    }
    return deterministicRandom()->randomInt64(0, nodeCount);
}
// One client loop. Paced by poisson(&lastTime, delay), each iteration runs one
// transaction of type A or B (type A when random01() > alpha): it fetches a read
// version, performs the reads, and, if the type has writes, sets that many random keys
// and commits. Failed attempts are retried through tr.onError(). Latencies and counters
// are only sampled while shouldRecord() is true; the event metrics are always logged.
//
// The read version is now fetched explicitly before the GRV latency is computed.
// Previously nothing happened between taking GRVStartTime and computing the latency, so
// every GRV latency sample and both startLatency metrics were effectively zero.
//
// `clientIndex` is currently unused. The actor never returns on its own.
ACTOR template <class Trans>
Future<Void> randomReadWriteClient(Database cx, SkewedReadWriteWorkload* self, double delay, int clientIndex) {
    state double lastTime = now();
    state double GRVStartTime;
    state UID debugID;

    loop {
        wait(poisson(&lastTime, delay));

        state double tstart = now();
        state bool aTransaction = deterministicRandom()->random01() > self->alpha;

        // Keys and values are chosen once per transaction and reused by every retry.
        state std::vector<int64_t> keys;
        state std::vector<Value> values;
        int reads = aTransaction ? self->readsPerTransactionA : self->readsPerTransactionB;
        state int writes = aTransaction ? self->writesPerTransactionA : self->writesPerTransactionB;
        for (int op = 0; op < reads; op++)
            keys.push_back(self->getRandomKey(self->nodeCount));

        values.reserve(writes);
        for (int op = 0; op < writes; op++)
            values.push_back(self->randomValue());

        state Trans tr(cx);

        // Transactions started inside (debugTime, debugTime + debugInterval] after clientBegin are traced.
        if (tstart - self->clientBegin > self->debugTime &&
            tstart - self->clientBegin <= self->debugTime + self->debugInterval) {
            debugID = deterministicRandom()->randomUniqueID();
            tr.debugTransaction(debugID);
            g_traceBatch.addEvent("TransactionDebug", debugID.first(), "ReadWrite.randomReadWriteClient.Before");
        } else {
            debugID = UID();
        }

        self->transactionSuccessMetric->retries = 0;
        self->transactionSuccessMetric->commitLatency = -1;

        loop {
            try {
                GRVStartTime = now();
                // stays -1 in the failure metric if the read version request itself throws
                self->transactionFailureMetric->startLatency = -1;

                wait(success(tr.getReadVersion()));

                double grvLatency = now() - GRVStartTime;
                self->transactionSuccessMetric->startLatency = grvLatency * 1e9;
                self->transactionFailureMetric->startLatency = grvLatency * 1e9;
                if (self->shouldRecord())
                    self->GRVLatencies.addSample(grvLatency);

                state double readStart = now();
                wait(self->readOp(&tr, keys, self, self->shouldRecord()));

                double readLatency = now() - readStart;
                if (self->shouldRecord())
                    self->fullReadLatencies.addSample(readLatency);

                // read-only transactions are not committed
                if (!writes)
                    break;

                for (int op = 0; op < writes; op++)
                    tr.set(self->keyForIndex(self->getRandomKey(self->nodeCount, false), false), values[op]);

                state double commitStart = now();
                wait(tr.commit());

                double commitLatency = now() - commitStart;
                self->transactionSuccessMetric->commitLatency = commitLatency * 1e9;
                if (self->shouldRecord())
                    self->commitLatencies.addSample(commitLatency);

                break;
            } catch (Error& e) {
                self->transactionFailureMetric->errorCode = e.code();
                self->transactionFailureMetric->log();

                wait(tr.onError(e));

                ++self->transactionSuccessMetric->retries;
                ++self->totalRetriesMetric;

                if (self->shouldRecord())
                    ++self->retries;
            }
        }

        if (debugID != UID())
            g_traceBatch.addEvent("TransactionDebug", debugID.first(), "ReadWrite.randomReadWriteClient.After");

        // release the transaction before waiting for the next one
        tr = Trans();

        double transactionLatency = now() - tstart;
        self->transactionSuccessMetric->totalLatency = transactionLatency * 1e9;
        self->transactionSuccessMetric->log();

        if (self->shouldRecord()) {
            if (aTransaction)
                ++self->aTransactions;
            else
                ++self->bTransactions;

            self->latencies.addSample(transactionLatency);
        }
    }
}
};
// Factory object for SkewedReadWriteWorkload, constructed with the name "SkewedReadWrite"
// (presumably the name by which test specifications select this workload).
WorkloadFactory<SkewedReadWriteWorkload> SkewedReadWriteWorkloadFactory("SkewedReadWrite");
// Round-trip test for KVWorkload::keyForIndex / indexForKey: for 1000 random indices in
// [0, nodeCount) the index recovered from the generated key must equal the original,
// first with the default arguments and then with absent = true on both calls.
TEST_CASE("/KVWorkload/methods/ParseKeyForIndex") {
    auto workload = SkewedReadWriteWorkload(WorkloadContext());

    // default form: keyForIndex(index) and indexForKey(key)
    for (int round = 0; round < 1000; ++round) {
        const int64_t index = deterministicRandom()->randomInt64(0, workload.nodeCount);
        const Key encoded = workload.keyForIndex(index);
        // std::cout << workload.indexForKey(encoded) << " " << index << "\n";
        ASSERT(workload.indexForKey(encoded) == index);
    }

    // absent form: both directions are called with absent = true
    for (int round = 0; round < 1000; ++round) {
        const int64_t index = deterministicRandom()->randomInt64(0, workload.nodeCount);
        const Key encoded = workload.keyForIndex(index, true);
        ASSERT(workload.indexForKey(encoded, true) == index);
    }

    return Void();
}

View File

@ -131,6 +131,8 @@ struct KVWorkload : TestWorkload {
Key getRandomKey(bool absent) const;
Key keyForIndex(uint64_t index) const;
Key keyForIndex(uint64_t index, bool absent) const;
// the reverse process of keyForIndex() without division. Set absent=true to ignore the last byte in Key
int64_t indexForKey(const KeyRef& key, bool absent = false) const;
};
struct IWorkloadFactory : ReferenceCounted<IWorkloadFactory> {