Merge remote-tracking branch 'origin/main' into combine-gtt-knobs
This commit is contained in:
commit
53444d835b
|
@ -653,12 +653,12 @@ const (
|
|||
StreamingModeWantAll StreamingMode = -1
|
||||
|
||||
// The default. The client doesn't know how much of the range it is likely
|
||||
// to use and wants different performance concerns to be balanced. Only a
|
||||
// small portion of data is transferred to the client initially (in order to
|
||||
// minimize costs if the client doesn't read the entire range), and as the
|
||||
// caller iterates over more items in the range larger batches will be
|
||||
// transferred in order to minimize latency. After enough iterations, the
|
||||
// iterator mode will eventually reach the same byte limit as “WANT_ALL“
|
||||
// to use and wants different performance concerns to be balanced.
|
||||
// Only a small portion of data is transferred to the client initially (in
|
||||
// order to minimize costs if the client doesn't read the entire range), and
|
||||
// as the caller iterates over more items in the range larger batches will
|
||||
// be transferred in order to minimize latency. After enough iterations,
|
||||
// the iterator mode will eventually reach the same byte limit as “WANT_ALL“
|
||||
StreamingModeIterator StreamingMode = 0
|
||||
|
||||
// Infrequently used. The client has passed a specific row limit and wants
|
||||
|
@ -668,8 +668,8 @@ const (
|
|||
// mode is used.
|
||||
StreamingModeExact StreamingMode = 1
|
||||
|
||||
// Infrequently used. Transfer data in batches small enough to not be much
|
||||
// more expensive than reading individual rows, to minimize cost if
|
||||
// Infrequently used. Transfer data in batches small enough to not be
|
||||
// much more expensive than reading individual rows, to minimize cost if
|
||||
// iteration stops early.
|
||||
StreamingModeSmall StreamingMode = 2
|
||||
|
||||
|
@ -677,16 +677,16 @@ const (
|
|||
// large.
|
||||
StreamingModeMedium StreamingMode = 3
|
||||
|
||||
// Infrequently used. Transfer data in batches large enough to be, in a
|
||||
// high-concurrency environment, nearly as efficient as possible. If the
|
||||
// client stops iteration early, some disk and network bandwidth may be
|
||||
// wasted. The batch size may still be too small to allow a single client to
|
||||
// get high throughput from the database, so if that is what you need
|
||||
// Infrequently used. Transfer data in batches large enough to be,
|
||||
// in a high-concurrency environment, nearly as efficient as possible.
|
||||
// If the client stops iteration early, some disk and network bandwidth may
|
||||
// be wasted. The batch size may still be too small to allow a single client
|
||||
// to get high throughput from the database, so if that is what you need
|
||||
// consider the SERIAL StreamingMode.
|
||||
StreamingModeLarge StreamingMode = 4
|
||||
|
||||
// Transfer data in batches large enough that an individual client can get
|
||||
// reasonable read bandwidth from the database. If the client stops
|
||||
// Transfer data in batches large enough that an individual client can
|
||||
// get reasonable read bandwidth from the database. If the client stops
|
||||
// iteration early, considerable disk and network bandwidth may be wasted.
|
||||
StreamingModeSerial StreamingMode = 5
|
||||
)
|
||||
|
|
|
@ -2,14 +2,14 @@ if(WITH_GO_BINDING)
|
|||
set(MOCK_KMS_SRC fault_injection.go get_encryption_keys.go mock_kms.go utils.go)
|
||||
set(MOCK_KMS_TEST_SRC ${MOCK_KMS_SRC} mockkms_test.go)
|
||||
add_custom_command(OUTPUT ${CMAKE_BINARY_DIR}/bin/mockkms
|
||||
COMMAND go build -o ${CMAKE_BINARY_DIR}/bin/mockkms ${MOCK_KMS_SRC}
|
||||
COMMAND ${GO_EXECUTABLE} build -o ${CMAKE_BINARY_DIR}/bin/mockkms ${MOCK_KMS_SRC}
|
||||
DEPENDS ${MOCK_KMS_SRC}
|
||||
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})
|
||||
add_custom_target(mockkms ALL DEPENDS ${CMAKE_BINARY_DIR}/bin/mockkms)
|
||||
fdb_install(PROGRAMS ${CMAKE_BINARY_DIR}/bin/mockkms DESTINATION bin COMPONENT server)
|
||||
|
||||
add_custom_command(OUTPUT ${CMAKE_BINARY_DIR}/bin/mockkms_test
|
||||
COMMAND go test -c -o ${CMAKE_BINARY_DIR}/bin/mockkms_test ${MOCK_KMS_TEST_SRC}
|
||||
COMMAND ${GO_EXECUTABLE} test -c -o ${CMAKE_BINARY_DIR}/bin/mockkms_test ${MOCK_KMS_TEST_SRC}
|
||||
DEPENDS ${MOCK_KMS_TEST_SRC}
|
||||
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})
|
||||
add_custom_target(mockkms_test ALL DEPENDS ${CMAKE_BINARY_DIR}/bin/mockkms_test)
|
||||
|
|
|
@ -1131,6 +1131,16 @@ public:
|
|||
return false;
|
||||
}
|
||||
|
||||
// fallback for using existing write api if the underlying blob store doesn't support efficient writeEntireFile
|
||||
ACTOR static Future<Void> writeEntireFileFallback(Reference<BackupContainerFileSystem> bc,
|
||||
std::string fileName,
|
||||
std::string fileContents) {
|
||||
state Reference<IBackupFile> objectFile = wait(bc->writeFile(fileName));
|
||||
wait(objectFile->append(&fileContents[0], fileContents.size()));
|
||||
wait(objectFile->finish());
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> createTestEncryptionKeyFile(std::string filename) {
|
||||
state Reference<IAsyncFile> keyFile = wait(IAsyncFileSystem::filesystem()->open(
|
||||
filename,
|
||||
|
@ -1484,6 +1494,12 @@ Future<Void> BackupContainerFileSystem::encryptionSetupComplete() const {
|
|||
return encryptionSetupFuture;
|
||||
}
|
||||
|
||||
// Member-function entry point for the generic whole-file write; forwards to the implementation actor.
Future<Void> BackupContainerFileSystem::writeEntireFileFallback(const std::string& fileName,
                                                                const std::string& fileContents) {
	// addRef() gives the actor its own counted reference to this container.
	Reference<BackupContainerFileSystem> self = Reference<BackupContainerFileSystem>::addRef(this);
	return BackupContainerFileSystemImpl::writeEntireFileFallback(self, fileName, fileContents);
}
|
||||
|
||||
void BackupContainerFileSystem::setEncryptionKey(Optional<std::string> const& encryptionKeyFileName) {
|
||||
if (encryptionKeyFileName.present()) {
|
||||
encryptionSetupFuture = BackupContainerFileSystemImpl::readEncryptionKey(encryptionKeyFileName.get());
|
||||
|
|
|
@ -279,6 +279,10 @@ Future<Reference<IBackupFile>> BackupContainerLocalDirectory::writeFile(const st
|
|||
return map(f, [=](Reference<IAsyncFile> f) { return Reference<IBackupFile>(new BackupFile(path, f, fullPath)); });
|
||||
}
|
||||
|
||||
// Writes `contents` to `path` in one call by delegating to the shared writeEntireFileFallback().
Future<Void> BackupContainerLocalDirectory::writeEntireFile(const std::string& path, const std::string& contents) {
	Future<Void> written = writeEntireFileFallback(path, contents);
	return written;
}
|
||||
|
||||
Future<Void> BackupContainerLocalDirectory::deleteFile(const std::string& path) {
|
||||
::deleteFile(joinPath(m_path, path));
|
||||
return Void();
|
||||
|
|
|
@ -198,6 +198,10 @@ Future<Reference<IBackupFile>> BackupContainerS3BlobStore::writeFile(const std::
|
|||
return Future<Reference<IBackupFile>>(makeReference<BackupContainerS3BlobStoreImpl::BackupFile>(path, f));
|
||||
}
|
||||
|
||||
// Writes `fileContents` as a single object: maps `path` through dataPath() and hands the write to the
// blob store endpoint's writeEntireFile() for this container's bucket.
Future<Void> BackupContainerS3BlobStore::writeEntireFile(const std::string& path, const std::string& fileContents) {
	auto objectName = dataPath(path);
	return m_bstore->writeEntireFile(m_bucket, objectName, fileContents);
}
|
||||
|
||||
// Deletes the object that `path` maps to (via dataPath()) from this container's bucket.
Future<Void> BackupContainerS3BlobStore::deleteFile(const std::string& path) {
	auto objectName = dataPath(path);
	return m_bstore->deleteObject(m_bucket, objectName);
}
|
||||
|
|
|
@ -5924,6 +5924,7 @@ public:
|
|||
printf("Restoring backup to version: %lld\n", (long long)targetVersion);
|
||||
}
|
||||
|
||||
state int retryCount = 0;
|
||||
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
|
||||
loop {
|
||||
try {
|
||||
|
@ -5947,9 +5948,17 @@ public:
|
|||
wait(tr->commit());
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_transaction_too_old) {
|
||||
retryCount++;
|
||||
}
|
||||
if (e.code() == error_code_restore_duplicate_tag) {
|
||||
throw;
|
||||
}
|
||||
if (g_network->isSimulated() && retryCount > 50) {
|
||||
CODE_PROBE(true, "submitRestore simulation speedup");
|
||||
// try to make the read window back to normal size (5 * version_per_sec)
|
||||
g_simulator->speedUpSimulation = true;
|
||||
}
|
||||
wait(tr->onError(e));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1568,11 +1568,12 @@ ACTOR Future<Void> writeEntireFile_impl(Reference<S3BlobStoreEndpoint> bstore,
|
|||
std::string object,
|
||||
std::string content) {
|
||||
state UnsentPacketQueue packets;
|
||||
PacketWriter pw(packets.getWriteBuffer(content.size()), nullptr, Unversioned());
|
||||
pw.serializeBytes(content);
|
||||
if (content.size() > bstore->knobs.multipart_max_part_size)
|
||||
throw file_too_large();
|
||||
|
||||
PacketWriter pw(packets.getWriteBuffer(content.size()), nullptr, Unversioned());
|
||||
pw.serializeBytes(content);
|
||||
|
||||
// Yield because we may have just had to copy several MB's into packet buffer chain and next we have to calculate an
|
||||
// MD5 sum of it.
|
||||
// TODO: If this actor is used to send large files then combine the summing and packetization into a loop with a
|
||||
|
|
|
@ -739,7 +739,13 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
|||
init( GLOBAL_TAG_THROTTLING, false ); if(isSimulated) GLOBAL_TAG_THROTTLING = deterministicRandom()->coinflip();
|
||||
init( ENFORCE_TAG_THROTTLING_ON_PROXIES, GLOBAL_TAG_THROTTLING );
|
||||
init( GLOBAL_TAG_THROTTLING_MIN_RATE, 1.0 );
|
||||
init( GLOBAL_TAG_THROTTLING_FOLDING_TIME, 10.0 );
|
||||
// 60 seconds was chosen as a default value to ensure that
|
||||
// the global tag throttler does not react too drastically to
|
||||
// changes in workload. To make the global tag throttler more reactive,
|
||||
// lower this knob. To make global tag throttler more smooth, raise this knob.
|
||||
// Setting this knob lower than TAG_MEASUREMENT_INTERVAL can cause erratic
|
||||
// behaviour and is not recommended.
|
||||
init( GLOBAL_TAG_THROTTLING_FOLDING_TIME, 60.0 );
|
||||
init( GLOBAL_TAG_THROTTLING_MAX_TAGS_TRACKED, 10 );
|
||||
init( GLOBAL_TAG_THROTTLING_TAG_EXPIRE_AFTER, 240.0 );
|
||||
init( PROXY_MAX_TAG_THROTTLE_DURATION, 5.0 ); if( randomize && BUGGIFY ) PROXY_MAX_TAG_THROTTLE_DURATION = 0.5;
|
||||
|
@ -942,6 +948,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
|||
|
||||
// Server request latency measurement
|
||||
init( LATENCY_SAMPLE_SIZE, 100000 );
|
||||
init( FILE_LATENCY_SAMPLE_SIZE, 10000 );
|
||||
init( LATENCY_METRICS_LOGGING_INTERVAL, 60.0 );
|
||||
|
||||
// Cluster recovery
|
||||
|
@ -983,6 +990,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
|||
init( BG_ENABLE_READ_DRIVEN_COMPACTION, true ); if (randomize && BUGGIFY) BG_ENABLE_READ_DRIVEN_COMPACTION = false;
|
||||
init( BG_RDC_BYTES_FACTOR, 2 ); if (randomize && BUGGIFY) BG_RDC_BYTES_FACTOR = deterministicRandom()->randomInt(1, 10);
|
||||
init( BG_RDC_READ_FACTOR, 3 ); if (randomize && BUGGIFY) BG_RDC_READ_FACTOR = deterministicRandom()->randomInt(1, 10);
|
||||
init( BG_WRITE_MULTIPART, false ); if (randomize && BUGGIFY) BG_WRITE_MULTIPART = true;
|
||||
|
||||
init( BG_ENABLE_MERGING, true ); if (randomize && BUGGIFY) BG_ENABLE_MERGING = false;
|
||||
init( BG_MERGE_CANDIDATE_THRESHOLD_SECONDS, isSimulated ? 20.0 : 30 * 60 ); if (randomize && BUGGIFY) BG_MERGE_CANDIDATE_THRESHOLD_SECONDS = 5.0;
|
||||
|
|
|
@ -369,6 +369,11 @@ Future<Reference<IBackupFile>> BackupContainerAzureBlobStore::writeFile(const st
|
|||
return BackupContainerAzureBlobStoreImpl::writeFile(this, fileName);
|
||||
}
|
||||
|
||||
// Writes `fileContents` to `fileName` in one call by delegating to the shared writeEntireFileFallback().
// The second parameter was previously misspelled `fileConents`, which left the `fileContents` used in the
// body undeclared and broke the build whenever this backend was compiled.
Future<Void> BackupContainerAzureBlobStore::writeEntireFile(const std::string& fileName,
                                                            const std::string& fileContents) {
	return writeEntireFileFallback(fileName, fileContents);
}
|
||||
|
||||
Future<BackupContainerFileSystem::FilesAndSizesT> BackupContainerAzureBlobStore::listFiles(
|
||||
const std::string& path,
|
||||
std::function<bool(std::string const&)> folderPathFilter) {
|
||||
|
|
|
@ -56,6 +56,8 @@ public:
|
|||
|
||||
Future<Reference<IBackupFile>> writeFile(const std::string& fileName) override;
|
||||
|
||||
Future<Void> writeEntireFile(const std::string& fileName, const std::string& fileContents) override;
|
||||
|
||||
Future<FilesAndSizesT> listFiles(const std::string& path = "",
|
||||
std::function<bool(std::string const&)> folderPathFilter = nullptr) override;
|
||||
|
||||
|
|
|
@ -98,6 +98,9 @@ public:
|
|||
// Open a file for write by fileName
|
||||
virtual Future<Reference<IBackupFile>> writeFile(const std::string& fileName) = 0;
|
||||
|
||||
// write entire file
|
||||
virtual Future<Void> writeEntireFile(const std::string& fileName, const std::string& contents) = 0;
|
||||
|
||||
// Delete a file
|
||||
virtual Future<Void> deleteFile(const std::string& fileName) = 0;
|
||||
|
||||
|
@ -166,6 +169,8 @@ protected:
|
|||
void setEncryptionKey(Optional<std::string> const& encryptionKeyFileName);
|
||||
Future<Void> encryptionSetupComplete() const;
|
||||
|
||||
Future<Void> writeEntireFileFallback(const std::string& fileName, const std::string& fileContents);
|
||||
|
||||
private:
|
||||
struct VersionProperty {
|
||||
VersionProperty(Reference<BackupContainerFileSystem> bc, const std::string& name)
|
||||
|
|
|
@ -46,6 +46,8 @@ public:
|
|||
|
||||
Future<Reference<IBackupFile>> writeFile(const std::string& path) final;
|
||||
|
||||
Future<Void> writeEntireFile(const std::string& path, const std::string& contents) final;
|
||||
|
||||
Future<Void> deleteFile(const std::string& path) final;
|
||||
|
||||
Future<FilesAndSizesT> listFiles(const std::string& path, std::function<bool(std::string const&)>) final;
|
||||
|
|
|
@ -57,6 +57,8 @@ public:
|
|||
|
||||
Future<Reference<IBackupFile>> writeFile(const std::string& path) final;
|
||||
|
||||
Future<Void> writeEntireFile(const std::string& path, const std::string& contents) final;
|
||||
|
||||
Future<Void> deleteFile(const std::string& path) final;
|
||||
|
||||
Future<FilesAndSizesT> listFiles(const std::string& path, std::function<bool(std::string const&)> pathFilter) final;
|
||||
|
|
|
@ -57,6 +57,11 @@ struct BlobWorkerStats {
|
|||
int64_t lastResidentMemory;
|
||||
int64_t estimatedMaxResidentMemory;
|
||||
|
||||
LatencySample snapshotBlobWriteLatencySample;
|
||||
LatencySample deltaBlobWriteLatencySample;
|
||||
LatencySample reSnapshotLatencySample;
|
||||
LatencySample readLatencySample;
|
||||
|
||||
Reference<FlowLock> initialSnapshotLock;
|
||||
Reference<FlowLock> resnapshotLock;
|
||||
Reference<FlowLock> deltaWritesLock;
|
||||
|
@ -68,7 +73,10 @@ struct BlobWorkerStats {
|
|||
double interval,
|
||||
Reference<FlowLock> initialSnapshotLock,
|
||||
Reference<FlowLock> resnapshotLock,
|
||||
Reference<FlowLock> deltaWritesLock)
|
||||
Reference<FlowLock> deltaWritesLock,
|
||||
double sampleLoggingInterval,
|
||||
int fileOpLatencySampleSize,
|
||||
int requestLatencySampleSize)
|
||||
: cc("BlobWorkerStats", id.toString()),
|
||||
|
||||
s3PutReqs("S3PutReqs", cc), s3GetReqs("S3GetReqs", cc), s3DeleteReqs("S3DeleteReqs", cc),
|
||||
|
@ -87,6 +95,10 @@ struct BlobWorkerStats {
|
|||
forceFlushCleanups("ForceFlushCleanups", cc), readDrivenCompactions("ReadDrivenCompactions", cc),
|
||||
numRangesAssigned(0), mutationBytesBuffered(0), activeReadRequests(0), granulesPendingSplitCheck(0),
|
||||
minimumCFVersion(0), cfVersionLag(0), notAtLatestChangeFeeds(0), lastResidentMemory(0),
|
||||
snapshotBlobWriteLatencySample("SnapshotBlobWriteMetrics", id, sampleLoggingInterval, fileOpLatencySampleSize),
|
||||
deltaBlobWriteLatencySample("DeltaBlobWriteMetrics", id, sampleLoggingInterval, fileOpLatencySampleSize),
|
||||
reSnapshotLatencySample("GranuleResnapshotMetrics", id, sampleLoggingInterval, fileOpLatencySampleSize),
|
||||
readLatencySample("GranuleReadLatencyMetrics", id, sampleLoggingInterval, requestLatencySampleSize),
|
||||
estimatedMaxResidentMemory(0), initialSnapshotLock(initialSnapshotLock), resnapshotLock(resnapshotLock),
|
||||
deltaWritesLock(deltaWritesLock) {
|
||||
specialCounter(cc, "NumRangesAssigned", [this]() { return this->numRangesAssigned; });
|
||||
|
|
|
@ -26,6 +26,7 @@
|
|||
#include "fdbclient/FDBTypes.h"
|
||||
#include "fdbrpc/fdbrpc.h"
|
||||
#include "fdbrpc/Locality.h"
|
||||
#include "fdbrpc/TimedRequest.h"
|
||||
#include "fdbclient/StorageServerInterface.h" // for TenantInfo - should we refactor that elsewhere?
|
||||
|
||||
struct BlobWorkerInterface {
|
||||
|
@ -105,7 +106,7 @@ struct BlobGranuleFileReply {
|
|||
|
||||
// TODO could do a reply promise stream of file mutations to bound memory requirements?
|
||||
// Have to load whole snapshot file into memory though so it doesn't actually matter too much
|
||||
struct BlobGranuleFileRequest {
|
||||
struct BlobGranuleFileRequest : TimedRequest {
|
||||
constexpr static FileIdentifier file_identifier = 4150141;
|
||||
Arena arena;
|
||||
KeyRangeRef keyRange;
|
||||
|
|
|
@ -919,6 +919,7 @@ public:
|
|||
|
||||
// Server request latency measurement
|
||||
int LATENCY_SAMPLE_SIZE;
|
||||
int FILE_LATENCY_SAMPLE_SIZE;
|
||||
double LATENCY_METRICS_LOGGING_INTERVAL;
|
||||
|
||||
// Cluster recovery
|
||||
|
@ -964,6 +965,7 @@ public:
|
|||
bool BG_ENABLE_READ_DRIVEN_COMPACTION;
|
||||
int BG_RDC_BYTES_FACTOR;
|
||||
int BG_RDC_READ_FACTOR;
|
||||
bool BG_WRITE_MULTIPART;
|
||||
|
||||
int BLOB_WORKER_INITIAL_SNAPSHOT_PARALLELISM;
|
||||
int BLOB_WORKER_RESNAPSHOT_PARALLELISM;
|
||||
|
|
|
@ -56,6 +56,8 @@ struct CheckpointMetaData {
|
|||
// A serialized metadata associated with format, this data can be understood by the corresponding KVS.
|
||||
Standalone<StringRef> serializedCheckpoint;
|
||||
|
||||
UID dataMoveId;
|
||||
|
||||
CheckpointMetaData() = default;
|
||||
CheckpointMetaData(KeyRange const& range, CheckpointFormat format, UID const& ssID, UID const& checkpointID)
|
||||
: version(invalidVersion), format(format), ssID(ssID), checkpointID(checkpointID), state(Pending),
|
||||
|
@ -67,6 +69,9 @@ struct CheckpointMetaData {
|
|||
gcTime(0) {
|
||||
this->ranges.push_back(range);
|
||||
}
|
||||
// Builds checkpoint metadata from a version, format and checkpoint ID only: the storage server ID is a
// default-constructed UID, the state is Pending, and the reference count and GC time start at zero.
CheckpointMetaData(Version version, CheckpointFormat format, UID checkpointID)
  : version(version),
    format(format),
    ssID(),
    checkpointID(checkpointID),
    state(Pending),
    referenceCount(0),
    gcTime(0) {}
|
||||
|
||||
CheckpointState getState() const { return static_cast<CheckpointState>(state); }
|
||||
|
||||
|
@ -77,16 +82,17 @@ struct CheckpointMetaData {
|
|||
void setFormat(CheckpointFormat format) { this->format = static_cast<int16_t>(format); }
|
||||
|
||||
std::string toString() const {
|
||||
std::string res = "Checkpoint MetaData:\nRange: " + describe(ranges) + "\nVersion: " + std::to_string(version) +
|
||||
"\nFormat: " + std::to_string(format) + "\nServer: " + ssID.toString() +
|
||||
"\nID: " + checkpointID.toString() + "\nState: " + std::to_string(static_cast<int>(state)) +
|
||||
"\n";
|
||||
std::string res = "Checkpoint MetaData: [Ranges]: " + describe(ranges) +
|
||||
" [Version]: " + std::to_string(version) + " [Format]: " + std::to_string(format) +
|
||||
" [Server]: " + ssID.toString() + " [ID]: " + checkpointID.toString() +
|
||||
" [State]: " + std::to_string(static_cast<int>(state)) +
|
||||
" [DataMove ID]: " + dataMoveId.toString();
|
||||
return res;
|
||||
}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, version, ranges, format, state, checkpointID, ssID, gcTime, serializedCheckpoint);
|
||||
serializer(ar, version, ranges, format, state, checkpointID, ssID, gcTime, serializedCheckpoint, dataMoveId);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -86,9 +86,14 @@ ACTOR Future<Void> createObject(ConnectionProviderTestSettings* settings, Provid
|
|||
state std::string fullPath;
|
||||
std::tie(bstore, fullPath) = provider->provider->createForWrite(objName);
|
||||
|
||||
state Reference<IBackupFile> file = wait(bstore->writeFile(fullPath));
|
||||
wait(file->append(data.begin(), data.size()));
|
||||
wait(file->finish());
|
||||
if (deterministicRandom()->coinflip()) {
|
||||
state Reference<IBackupFile> file = wait(bstore->writeFile(fullPath));
|
||||
wait(file->append(data.begin(), data.size()));
|
||||
wait(file->finish());
|
||||
} else {
|
||||
std::string contents = data.toString();
|
||||
wait(bstore->writeEntireFile(fullPath, contents));
|
||||
}
|
||||
|
||||
// after write, put in the readable list
|
||||
provider->data.push_back({ fullPath, data });
|
||||
|
|
|
@ -297,7 +297,14 @@ struct BlobWorkerData : NonCopyable, ReferenceCounted<BlobWorkerData> {
|
|||
initialSnapshotLock(new FlowLock(SERVER_KNOBS->BLOB_WORKER_INITIAL_SNAPSHOT_PARALLELISM)),
|
||||
resnapshotLock(new FlowLock(SERVER_KNOBS->BLOB_WORKER_RESNAPSHOT_PARALLELISM)),
|
||||
deltaWritesLock(new FlowLock(SERVER_KNOBS->BLOB_WORKER_DELTA_FILE_WRITE_PARALLELISM)),
|
||||
stats(id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, initialSnapshotLock, resnapshotLock, deltaWritesLock),
|
||||
stats(id,
|
||||
SERVER_KNOBS->WORKER_LOGGING_INTERVAL,
|
||||
initialSnapshotLock,
|
||||
resnapshotLock,
|
||||
deltaWritesLock,
|
||||
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
||||
SERVER_KNOBS->FILE_LATENCY_SAMPLE_SIZE,
|
||||
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
|
||||
isEncryptionEnabled(isEncryptionOpSupported(EncryptOperationType::BLOB_GRANULE_ENCRYPTION)) {}
|
||||
|
||||
bool managerEpochOk(int64_t epoch) {
|
||||
|
@ -764,6 +771,28 @@ ACTOR Future<std::pair<BlobGranuleSplitState, Version>> getGranuleSplitState(Tra
|
|||
}
|
||||
}
|
||||
|
||||
// tries to use writeEntireFile if possible, but if too big falls back to multi-part upload
|
||||
ACTOR Future<Void> writeFile(Reference<BackupContainerFileSystem> writeBStore, std::string fname, Value serialized) {
|
||||
if (!SERVER_KNOBS->BG_WRITE_MULTIPART) {
|
||||
try {
|
||||
state std::string fileContents = serialized.toString();
|
||||
wait(writeBStore->writeEntireFile(fname, fileContents));
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
// error_code_file_too_large means it was too big to do with single write
|
||||
if (e.code() != error_code_file_too_large) {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
state Reference<IBackupFile> objectFile = wait(writeBStore->writeFile(fname));
|
||||
wait(objectFile->append(serialized.begin(), serialized.size()));
|
||||
wait(objectFile->finish());
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
// writeDelta file writes speculatively in the common case to optimize throughput. It creates the s3 object even though
|
||||
// the data in it may not yet be committed, and even though previous delta files with lower versioned data may still be
|
||||
// in flight. The synchronization happens after the s3 file is written, but before we update the FDB index of what files
|
||||
|
@ -814,14 +843,16 @@ ACTOR Future<BlobFileIndex> writeDeltaFile(Reference<BlobWorkerData> bwData,
|
|||
state Reference<BackupContainerFileSystem> writeBStore;
|
||||
state std::string fname;
|
||||
std::tie(writeBStore, fname) = bstore->createForWrite(fileName);
|
||||
state Reference<IBackupFile> objectFile = wait(writeBStore->writeFile(fname));
|
||||
|
||||
state double writeStartTimer = g_network->timer();
|
||||
|
||||
wait(writeFile(writeBStore, fname, serialized));
|
||||
|
||||
++bwData->stats.s3PutReqs;
|
||||
++bwData->stats.deltaFilesWritten;
|
||||
bwData->stats.deltaBytesWritten += serializedSize;
|
||||
|
||||
wait(objectFile->append(serialized.begin(), serializedSize));
|
||||
wait(objectFile->finish());
|
||||
double duration = g_network->timer() - writeStartTimer;
|
||||
bwData->stats.deltaBlobWriteLatencySample.addMeasurement(duration);
|
||||
|
||||
// free serialized since it is persisted in blob
|
||||
serialized = Value();
|
||||
|
@ -1025,14 +1056,16 @@ ACTOR Future<BlobFileIndex> writeSnapshot(Reference<BlobWorkerData> bwData,
|
|||
state Reference<BackupContainerFileSystem> writeBStore;
|
||||
state std::string fname;
|
||||
std::tie(writeBStore, fname) = bstore->createForWrite(fileName);
|
||||
state Reference<IBackupFile> objectFile = wait(writeBStore->writeFile(fname));
|
||||
|
||||
state double writeStartTimer = g_network->timer();
|
||||
|
||||
wait(writeFile(writeBStore, fname, serialized));
|
||||
|
||||
++bwData->stats.s3PutReqs;
|
||||
++bwData->stats.snapshotFilesWritten;
|
||||
bwData->stats.snapshotBytesWritten += serializedSize;
|
||||
|
||||
wait(objectFile->append(serialized.begin(), serializedSize));
|
||||
wait(objectFile->finish());
|
||||
double duration = g_network->timer() - writeStartTimer;
|
||||
bwData->stats.snapshotBlobWriteLatencySample.addMeasurement(duration);
|
||||
|
||||
// free serialized since it is persisted in blob
|
||||
serialized = Value();
|
||||
|
@ -1225,6 +1258,8 @@ ACTOR Future<BlobFileIndex> compactFromBlob(Reference<BlobWorkerData> bwData,
|
|||
state Arena filenameArena;
|
||||
state std::vector<Future<RangeResult>> chunksToRead;
|
||||
state int64_t compactBytesRead = 0;
|
||||
state double resnapshotStartTimer = g_network->timer();
|
||||
|
||||
for (auto& f : fileSet) {
|
||||
ASSERT(!f.snapshotFiles.empty());
|
||||
ASSERT(!f.deltaFiles.empty());
|
||||
|
@ -1324,6 +1359,8 @@ ACTOR Future<BlobFileIndex> compactFromBlob(Reference<BlobWorkerData> bwData,
|
|||
|
||||
BlobFileIndex f = wait(snapshotWriter);
|
||||
DEBUG_KEY_RANGE("BlobWorkerBlobSnapshot", version, metadata->keyRange, bwData->id);
|
||||
double duration = g_network->timer() - resnapshotStartTimer;
|
||||
bwData->stats.reSnapshotLatencySample.addMeasurement(duration);
|
||||
return f;
|
||||
} catch (Error& e) {
|
||||
if (BW_DEBUG) {
|
||||
|
@ -3846,6 +3883,10 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
|
|||
if (didCollapse) {
|
||||
++bwData->stats.readRequestsCollapsed;
|
||||
}
|
||||
|
||||
double duration = g_network->timer() - req.requestTime();
|
||||
bwData->stats.readLatencySample.addMeasurement(duration);
|
||||
|
||||
ASSERT(!req.reply.isSet());
|
||||
req.reply.send(rep);
|
||||
--bwData->stats.activeReadRequests;
|
||||
|
|
|
@ -641,7 +641,7 @@ class MockStorageServer {
|
|||
Smoother smoother;
|
||||
|
||||
public:
|
||||
Cost() : smoother(5.0) {}
|
||||
Cost() : smoother(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_FOLDING_TIME) {}
|
||||
Cost& operator+=(double delta) {
|
||||
smoother.addDelta(delta);
|
||||
return *this;
|
||||
|
@ -931,9 +931,9 @@ TEST_CASE("/GlobalTagThrottler/MultiClientThrottling") {
|
|||
|
||||
// 10 storage servers can handle 100 pages/second each.
|
||||
// Total quota set to 100 pages/second.
|
||||
// 2 clients each attempt 20 10-byte transactions per second.
|
||||
// 2 clients each attempt 20 10-page transactions per second.
|
||||
// Target rate should adjust to allow 100/10 transactions per second.
|
||||
// Each client is throttled to only perform 100/20 transactions per second.
|
||||
// Each client is throttled to only perform (100/10)/2 transactions per second.
|
||||
TEST_CASE("/GlobalTagThrottler/MultiClientThrottling2") {
|
||||
state GlobalTagThrottler globalTagThrottler(Database{}, UID{}, 0);
|
||||
state StorageServerCollection storageServers(10, 100);
|
||||
|
@ -953,8 +953,8 @@ TEST_CASE("/GlobalTagThrottler/MultiClientThrottling2") {
|
|||
|
||||
// 10 storage servers can handle 100 pages/second each.
|
||||
// Total quota set to 100 pages/second.
|
||||
// One client attempts 5 5-byte read transactions per second.
|
||||
// Another client attempts 25 5-byte read transactions per second.
|
||||
// One client attempts 5 5-page read transactions per second.
|
||||
// Another client attempts 25 5-page read transactions per second.
|
||||
// Target rate should adjust to allow 100/5 transactions per second.
|
||||
// This 20 transactions/second limit is split with a distribution of (5, 15) between the 2 clients.
|
||||
TEST_CASE("/GlobalTagThrottler/SkewedMultiClientThrottling") {
|
||||
|
@ -976,7 +976,7 @@ TEST_CASE("/GlobalTagThrottler/SkewedMultiClientThrottling") {
|
|||
|
||||
// 10 storage servers can handle 100 pages/second each.
|
||||
// Total quota is initially set to 100 pages/second.
|
||||
// Client attempts 5 6-byte transactions per second.
|
||||
// Client attempts 5 6-page transactions per second.
|
||||
// Test that the tag throttler can reach equilibrium, then adjust to a new equilibrium once the quota is changed
|
||||
// Target rate should adjust to allow 100/6 transactions per second.
|
||||
// Total quota is modified to 50 pages/second.
|
||||
|
@ -1003,7 +1003,7 @@ TEST_CASE("/GlobalTagThrottler/UpdateQuota") {
|
|||
|
||||
// 10 storage servers can handle 100 pages/second each.
|
||||
// Total quota is initially set to 100 pages/second.
|
||||
// Client attempts 5 6-byte read transactions per second.
|
||||
// Client attempts 5 6-page read transactions per second.
|
||||
// Target limit adjusts to allow 100/6 transactions per second.
|
||||
// Then Quota is removed.
|
||||
// Target limit is removed as a result.
|
||||
|
@ -1027,7 +1027,7 @@ TEST_CASE("/GlobalTagThrottler/RemoveQuota") {
|
|||
|
||||
// 10 storage servers can handle 5 pages/second each.
|
||||
// Total quota is set to 100 pages/second.
|
||||
// Client attempts 10 6-byte transactions per second
|
||||
// Client attempts 10 6-page transactions per second
|
||||
// Target is adjusted to 50/6 transactions per second, to match the total capacity of all storage servers.
|
||||
TEST_CASE("/GlobalTagThrottler/ActiveThrottling") {
|
||||
state GlobalTagThrottler globalTagThrottler(Database{}, UID{}, 0);
|
||||
|
@ -1047,7 +1047,7 @@ TEST_CASE("/GlobalTagThrottler/ActiveThrottling") {
|
|||
|
||||
// 10 storage servers can handle 5 pages/second each.
|
||||
// Total quota is set to 50 pages/second for one tag, 100 pages/second for another.
|
||||
// For each tag, a client attempts to execute 10 6-byte read transactions per second.
|
||||
// For each tag, a client attempts to execute 10 6-page read transactions per second.
|
||||
// Target rates are adjusted to utilize the full 50 pages/second capacity of the
|
||||
// storage servers. The two tags receive this capacity with a 2:1 ratio,
|
||||
// matching the ratio of their total quotas.
|
||||
|
@ -1076,7 +1076,7 @@ TEST_CASE("/GlobalTagThrottler/MultiTagActiveThrottling") {
|
|||
|
||||
// 3 storage servers can handle 50 pages/second each.
|
||||
// Total quota is set to 100 pages/second for each tag.
|
||||
// Each client attempts 10 6-byte read transactions per second.
|
||||
// Each client attempts 10 6-page read transactions per second.
|
||||
// This workload is sent to 2 storage servers per client (with an overlap of one storage server).
|
||||
// Target rates for both tags are adjusted to 50/6 transactions per second to match the throughput
|
||||
// that the busiest server can handle.
|
||||
|
@ -1105,8 +1105,8 @@ TEST_CASE("/GlobalTagThrottler/MultiTagActiveThrottling2") {
|
|||
|
||||
// 3 storage servers can handle 50 pages/second each.
|
||||
// Total quota is set to 100 pages/second for each tag.
|
||||
// One client attempts 10 6-byte read transactions per second, all directed towards a single storage server.
|
||||
// Another client, using a different tag, attempts 10 6-byte read transactions split across the other two storage
|
||||
// One client attempts 10 6-page read transactions per second, all directed towards a single storage server.
|
||||
// Another client, using a different tag, attempts 10 6-page read transactions split across the other two storage
|
||||
// servers. Target rates adjust to 50/6 and 100/6 transactions per second for the two clients, based on the capacities
|
||||
// of the
|
||||
// storage servers being accessed.
|
||||
|
@ -1136,7 +1136,7 @@ TEST_CASE("/GlobalTagThrottler/MultiTagActiveThrottling3") {
|
|||
// 10 storage servers can serve 5 pages/second each.
|
||||
// Total quota is set to 100 pages/second.
|
||||
// Reserved quota is set to 70 pages/second.
|
||||
// A client attempts to execute 10 6-byte read transactions per second.
|
||||
// A client attempts to execute 10 6-page read transactions per second.
|
||||
// Despite the storage server only having capacity to serve 50/6 transactions per second,
|
||||
// the reserved quota will ensure the target rate adjusts to 70/6 transactions per second.
|
||||
TEST_CASE("/GlobalTagThrottler/ReservedQuota") {
|
||||
|
@ -1195,12 +1195,12 @@ TEST_CASE("/GlobalTagThrottler/TagLimit") {
|
|||
}
|
||||
|
||||
// 9 storage servers can handle 100 pages/second each.
|
||||
// 1 unhealthy storage server can only handle 1 byte/second.
|
||||
// 1 unhealthy storage server can only handle 1 page/second.
|
||||
// Total quota is set to 100 pages/second.
|
||||
// Client attempts 5 6-byte transactions per second.
|
||||
// Client attempts 5 6-page transactions per second.
|
||||
// Target rate adjusts to 100/6 transactions per second, ignoring the worst storage server.
|
||||
// Then, a second storage server becomes unhealthy and can only handle 1 byte/second.
|
||||
// Target rate adjusts down to its minimum rate, because only one bad zone can be ignored.
|
||||
// Then, a second storage server becomes unhealthy and can only handle 1 page/second.
|
||||
// Target rate adjusts down to 10/6 transactions per second, because only one bad zone can be ignored.
|
||||
TEST_CASE("/GlobalTagThrottler/IgnoreWorstZone") {
|
||||
state GlobalTagThrottler globalTagThrottler(Database{}, UID{}, 1);
|
||||
state StorageServerCollection storageServers(10, 100);
|
||||
|
@ -1215,9 +1215,8 @@ TEST_CASE("/GlobalTagThrottler/IgnoreWorstZone") {
|
|||
state Future<Void> updater = updateGlobalTagThrottler(&globalTagThrottler, &storageServers);
|
||||
wait(timeoutError(monitor || client || updater, 600.0));
|
||||
storageServers.setCapacity(1, 1);
|
||||
monitor = monitorActor(&globalTagThrottler, [](auto& gtt) {
|
||||
return targetRateIsNear(gtt, "sampleTag1"_sr, SERVER_KNOBS->GLOBAL_TAG_THROTTLING_MIN_RATE);
|
||||
});
|
||||
monitor =
|
||||
monitorActor(&globalTagThrottler, [](auto& gtt) { return targetRateIsNear(gtt, "sampleTag1"_sr, 10.0 / 6.0); });
|
||||
wait(timeoutError(monitor || client || updater, 600.0));
|
||||
return Void();
|
||||
}
|
||||
|
|
|
@ -2205,7 +2205,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
|||
void RocksDBKeyValueStore::Writer::action(CheckpointAction& a) {
|
||||
TraceEvent("RocksDBServeCheckpointBegin", id)
|
||||
.detail("MinVersion", a.request.version)
|
||||
.detail("Range", a.request.range.toString())
|
||||
.detail("Ranges", describe(a.request.ranges))
|
||||
.detail("Format", static_cast<int>(a.request.format))
|
||||
.detail("CheckpointDir", a.request.checkpointDir);
|
||||
|
||||
|
@ -2236,7 +2236,8 @@ void RocksDBKeyValueStore::Writer::action(CheckpointAction& a) {
|
|||
.detail("PersistVersion", version);
|
||||
|
||||
// TODO: set the range as the actual shard range.
|
||||
CheckpointMetaData res(version, a.request.range, a.request.format, a.request.checkpointID);
|
||||
CheckpointMetaData res(version, a.request.format, a.request.checkpointID);
|
||||
res.ranges = a.request.ranges;
|
||||
const std::string& checkpointDir = abspath(a.request.checkpointDir);
|
||||
|
||||
if (a.request.format == RocksDBColumnFamily) {
|
||||
|
@ -2519,7 +2520,7 @@ TEST_CASE("noSim/fdbserver/KeyValueStoreRocksDB/CheckpointRestoreColumnFamily")
|
|||
state std::string checkpointDir = cwd + "checkpoint";
|
||||
|
||||
CheckpointRequest request(
|
||||
latestVersion, allKeys, RocksDBColumnFamily, deterministicRandom()->randomUniqueID(), checkpointDir);
|
||||
latestVersion, { allKeys }, RocksDBColumnFamily, deterministicRandom()->randomUniqueID(), checkpointDir);
|
||||
CheckpointMetaData metaData = wait(kvStore->checkpoint(request));
|
||||
|
||||
std::vector<CheckpointMetaData> checkpoints;
|
||||
|
@ -2559,7 +2560,8 @@ TEST_CASE("noSim/fdbserver/KeyValueStoreRocksDB/CheckpointRestoreKeyValues") {
|
|||
platform::eraseDirectoryRecursive("checkpoint");
|
||||
std::string checkpointDir = cwd + "checkpoint";
|
||||
|
||||
CheckpointRequest request(latestVersion, allKeys, RocksDB, deterministicRandom()->randomUniqueID(), checkpointDir);
|
||||
CheckpointRequest request(
|
||||
latestVersion, { allKeys }, RocksDB, deterministicRandom()->randomUniqueID(), checkpointDir);
|
||||
CheckpointMetaData metaData = wait(kvStore->checkpoint(request));
|
||||
|
||||
state ICheckpointReader* cpReader = newCheckpointReader(metaData, deterministicRandom()->randomUniqueID());
|
||||
|
|
|
@ -289,7 +289,11 @@ ACTOR Future<Void> resolveBatch(Reference<Resolver> self,
|
|||
// Detect conflicts
|
||||
double expire = now() + SERVER_KNOBS->SAMPLE_EXPIRATION_TIME;
|
||||
ConflictBatch conflictBatch(self->conflictSet, &reply.conflictingKeyRangeMap, &reply.arena);
|
||||
const Version newOldestVersion = req.version - SERVER_KNOBS->MAX_WRITE_TRANSACTION_LIFE_VERSIONS;
|
||||
Version newOldestVersion = req.version - SERVER_KNOBS->MAX_WRITE_TRANSACTION_LIFE_VERSIONS;
|
||||
if (g_network->isSimulated() && g_simulator->speedUpSimulation) {
|
||||
newOldestVersion = req.version - std::max(5 * SERVER_KNOBS->VERSIONS_PER_SECOND,
|
||||
SERVER_KNOBS->MAX_WRITE_TRANSACTION_LIFE_VERSIONS);
|
||||
}
|
||||
for (int t = 0; t < req.transactions.size(); t++) {
|
||||
conflictBatch.addTransaction(req.transactions[t], newOldestVersion);
|
||||
self->resolvedReadConflictRanges += req.transactions[t].read_conflict_ranges.size();
|
||||
|
|
|
@ -33,17 +33,17 @@
|
|||
|
||||
struct CheckpointRequest {
|
||||
const Version version; // The FDB version at which the checkpoint is created.
|
||||
const KeyRange range; // Keyrange this checkpoint must contain.
|
||||
const std::vector<KeyRange> ranges; // Keyranges this checkpoint must contain.
|
||||
const CheckpointFormat format;
|
||||
const UID checkpointID;
|
||||
const std::string checkpointDir; // The local directory where the checkpoint file will be created.
|
||||
|
||||
CheckpointRequest(const Version version,
|
||||
const KeyRange& range,
|
||||
const std::vector<KeyRange>& ranges,
|
||||
const CheckpointFormat format,
|
||||
const UID& id,
|
||||
const std::string& checkpointDir)
|
||||
: version(version), range(range), format(format), checkpointID(id), checkpointDir(checkpointDir) {}
|
||||
: version(version), ranges(ranges), format(format), checkpointID(id), checkpointDir(checkpointDir) {}
|
||||
};
|
||||
|
||||
class IKeyValueStore : public IClosable {
|
||||
|
|
|
@ -26,6 +26,7 @@
|
|||
|
||||
#include "fdbclient/BlobCipher.h"
|
||||
#include "fdbclient/GetEncryptCipherKeys.actor.h"
|
||||
#include "fdbclient/SystemData.h"
|
||||
#include "fdbclient/Tenant.h"
|
||||
|
||||
#include "fdbserver/EncryptionOpsUtils.h"
|
||||
|
@ -339,7 +340,7 @@ public:
|
|||
|
||||
std::tuple<int64_t, size_t> getEncryptionDomain(const KeyRef& key, Optional<int64_t> possibleDomainId) override {
|
||||
// System key.
|
||||
if (key.startsWith("\xff\xff"_sr)) {
|
||||
if (key.startsWith(systemKeys.begin)) {
|
||||
return { SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID, 2 };
|
||||
}
|
||||
// Key smaller than tenant prefix in size belongs to the default domain.
|
||||
|
|
|
@ -1113,9 +1113,11 @@ public:
|
|||
std::vector<int> readPriorityRanks;
|
||||
|
||||
Future<PriorityMultiLock::Lock> getReadLock(const Optional<ReadOptions>& options) {
|
||||
int readType = (int)(options.present() ? options.get().type : ReadType::NORMAL);
|
||||
readType = std::clamp<int>(readType, 0, readPriorityRanks.size() - 1);
|
||||
return ssLock.lock(readPriorityRanks[readType]);
|
||||
// TODO: Fix perf regression in 100% cache read case where taking this lock adds too much overhead
|
||||
return PriorityMultiLock::Lock();
|
||||
// int readType = (int)(options.present() ? options.get().type : ReadType::NORMAL);
|
||||
// readType = std::clamp<int>(readType, 0, readPriorityRanks.size() - 1);
|
||||
// return ssLock.lock(readPriorityRanks[readType]);
|
||||
}
|
||||
|
||||
FlowLock serveAuditStorageParallelismLock;
|
||||
|
@ -9105,7 +9107,7 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
|
|||
ACTOR Future<Void> createCheckpoint(StorageServer* data, CheckpointMetaData metaData) {
|
||||
ASSERT(metaData.ssID == data->thisServerID && !metaData.ranges.empty());
|
||||
const CheckpointRequest req(metaData.version,
|
||||
metaData.ranges.front(),
|
||||
metaData.ranges,
|
||||
static_cast<CheckpointFormat>(metaData.format),
|
||||
metaData.checkpointID,
|
||||
data->folder + rocksdbCheckpointDirPrefix + metaData.checkpointID.toString());
|
||||
|
|
Loading…
Reference in New Issue