Merge commit '53a3744de729639ec2d8ad7ba8ee899355a8f28d' into redwood-queue-error-handling

Commit 1b01c1b76c by Steve Atherton, 2023-03-04 03:29:36 -08:00
17 changed files with 554 additions and 35 deletions

View File

@@ -1,14 +1,15 @@
-if __name__ == '__main__':
+if __name__ == "__main__":
     import re
     import sys
-    r = re.compile('DLLEXPORT[^(]*(fdb_[^(]*)[(]')
+    r = re.compile("^DLLEXPORT[^(]*(fdb_[^(]*)[(].*$", re.MULTILINE)
     header_files = sys.argv[1:-1]
     symbols_file = sys.argv[-1]
     symbols = set()
     for header_file in header_files:
-        with open(header_file, 'r') as f:
-            symbols.update('_' + m.group(1) for m in r.finditer(f.read()))
+        with open(header_file, "r") as f:
+            symbols.update("_" + m.group(1) for m in r.finditer(f.read()))
     symbols = sorted(symbols)
-    with open(symbols_file, 'w') as f:
-        f.write('\n'.join(symbols))
-        f.write('\n')
+    with open(symbols_file, "w") as f:
+        f.write("\n".join(symbols))
+        f.write("\n")

View File

@@ -178,6 +178,9 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( READ_REBALANCE_SHARD_TOPK, READ_REBALANCE_SRC_PARALLELISM * 2 );
init( READ_REBALANCE_DIFF_FRAC, 0.3);
init( READ_REBALANCE_MAX_SHARD_FRAC, 0.2); // FIXME: add buggify here when we have DD test, seems DD is pretty sensitive to this parameter
// TODO: now we set it to a large number so that the shard average traffic can guard this change. Consider changing it to a lower value in the future.
init( READ_REBALANCE_MIN_READ_BYTES_KS, std::numeric_limits<double>::max() );
init( RETRY_RELOCATESHARD_DELAY, 0.1 );
init( DATA_DISTRIBUTION_FAILURE_REACTION_TIME, 60.0 ); if( randomize && BUGGIFY ) DATA_DISTRIBUTION_FAILURE_REACTION_TIME = 1.0;
bool buggifySmallShards = randomize && BUGGIFY;
@@ -476,6 +479,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( ROCKSDB_DELETE_OBSOLETE_FILE_PERIOD, 21600 ); // 6h, RocksDB default.
init( ROCKSDB_PHYSICAL_SHARD_CLEAN_UP_DELAY, isSimulated ? 10.0 : 300.0 ); // Delays shard clean up, must be larger than ROCKSDB_READ_VALUE_TIMEOUT to prevent reading deleted shard.
init( ROCKSDB_EMPTY_RANGE_CHECK, isSimulated ? true : false);
init( ROCKSDB_CREATE_BYTES_SAMPLE_FILE_RETRY_MAX, 50 );
// Leader election
bool longLeaderElection = randomize && BUGGIFY;
@@ -822,6 +826,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( STORAGE_COMMIT_INTERVAL, 0.5 ); if( randomize && BUGGIFY ) STORAGE_COMMIT_INTERVAL = 2.0;
init( BYTE_SAMPLING_FACTOR, 250 ); //cannot buggify because of differences in restarting tests
init( BYTE_SAMPLING_OVERHEAD, 100 );
init( MIN_BYTE_SAMPLING_PROBABILITY, 0 ); // Adjustable only for the PhysicalShardMove test. Should always be zero for other cases
init( MAX_STORAGE_SERVER_WATCH_BYTES, 100e6 ); if( randomize && BUGGIFY ) MAX_STORAGE_SERVER_WATCH_BYTES = 10e3;
init( MAX_BYTE_SAMPLE_CLEAR_MAP_SIZE, 1e9 ); if( randomize && BUGGIFY ) MAX_BYTE_SAMPLE_CLEAR_MAP_SIZE = 1e3;
init( LONG_BYTE_SAMPLE_RECOVERY_DELAY, 60.0 );

View File

@@ -173,6 +173,8 @@ public:
double
READ_REBALANCE_DIFF_FRAC; // only when (srcLoad - destLoad)/srcLoad > DIFF_FRAC the read rebalance will happen
double READ_REBALANCE_MAX_SHARD_FRAC; // only move shard whose readLoad < (srcLoad - destLoad) * MAX_SHARD_FRAC
double
READ_REBALANCE_MIN_READ_BYTES_KS; // only move shard whose readLoad > min(MIN_READ_BYTES_KS, shard avg traffic);
double RETRY_RELOCATESHARD_DELAY;
double DATA_DISTRIBUTION_FAILURE_REACTION_TIME;
@@ -389,6 +391,7 @@ public:
int64_t ROCKSDB_DELETE_OBSOLETE_FILE_PERIOD;
double ROCKSDB_PHYSICAL_SHARD_CLEAN_UP_DELAY;
bool ROCKSDB_EMPTY_RANGE_CHECK;
int ROCKSDB_CREATE_BYTES_SAMPLE_FILE_RETRY_MAX;
// Leader election
int MAX_NOTIFICATIONS;
@@ -770,6 +773,8 @@ public:
double STORAGE_COMMIT_INTERVAL;
int BYTE_SAMPLING_FACTOR;
int BYTE_SAMPLING_OVERHEAD;
double MIN_BYTE_SAMPLING_PROBABILITY; // Adjustable only for test of PhysicalShardMove. Should always be 0 for other
// cases
int MAX_STORAGE_SERVER_WATCH_BYTES;
int MAX_BYTE_SAMPLE_CLEAR_MAP_SIZE;
double LONG_BYTE_SAMPLE_RECOVERY_DELAY;

View File

@@ -24,6 +24,8 @@
#include "fdbclient/FDBTypes.h"
const std::string checkpointBytesSampleFileName = "metadata_bytes.sst";
// FDB storage checkpoint format.
enum CheckpointFormat {
InvalidFormat = 0,
@@ -52,6 +54,7 @@ struct CheckpointMetaData {
std::vector<UID> src; // Storage server(s) on which this checkpoint is created.
UID checkpointID; // A unique id for this checkpoint.
int16_t state; // CheckpointState.
Optional<std::string> bytesSampleFile;
// A serialized metadata associated with format, this data can be understood by the corresponding KVS.
Standalone<StringRef> serializedCheckpoint;
@@ -107,14 +110,16 @@ struct CheckpointMetaData {
" [Version]: " + std::to_string(version) + " [Format]: " + std::to_string(format) +
" [Server]: " + describe(src) + " [ID]: " + checkpointID.toString() +
" [State]: " + std::to_string(static_cast<int>(state)) +
(actionId.present() ? (" [Action ID]: " + actionId.get().toString()) : "");
(actionId.present() ? (" [Action ID]: " + actionId.get().toString()) : "") +
(bytesSampleFile.present() ? " [bytesSampleFile]: " + bytesSampleFile.get() : "");
return res;
}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, version, ranges, format, state, checkpointID, src, serializedCheckpoint, actionId);
serializer(
ar, version, ranges, format, state, checkpointID, src, serializedCheckpoint, actionId, bytesSampleFile);
}
};
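Note: the new bytesSampleFile field is appended at the end of the serializer() field list, which presumably keeps previously serialized CheckpointMetaData values readable by newer code.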

View File

@@ -2138,8 +2138,10 @@ ACTOR Future<bool> rebalanceReadLoad(DDQueue* self,
// randomly choose topK shards
int topK = std::max(1, std::min(int(0.1 * shards.size()), SERVER_KNOBS->READ_REBALANCE_SHARD_TOPK));
state Future<HealthMetrics> healthMetrics = self->txnProcessor->getHealthMetrics(true);
state GetTopKMetricsRequest req(
shards, topK, (srcLoad - destLoad) * SERVER_KNOBS->READ_REBALANCE_MAX_SHARD_FRAC, srcLoad / shards.size());
state GetTopKMetricsRequest req(shards,
topK,
(srcLoad - destLoad) * SERVER_KNOBS->READ_REBALANCE_MAX_SHARD_FRAC,
std::min(srcLoad / shards.size(), SERVER_KNOBS->READ_REBALANCE_MIN_READ_BYTES_KS));
state GetTopKMetricsReply reply = wait(brokenPromiseToNever(self->getTopKMetrics.getReply(req)));
wait(ready(healthMetrics));
auto cpu = getWorstCpu(healthMetrics.get(), sourceTeam->getServerIDs());
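A minimal sketch (not FDB source; loads invented) of how the new min() guard behaves: with READ_REBALANCE_MIN_READ_BYTES_KS defaulted to std::numeric_limits<double>::max(), the shard average traffic srcLoad / shards.size() always wins, and only lowering the knob would impose a fixed read-load floor.

#include <algorithm>
#include <iostream>
#include <limits>

int main() {
    double srcLoad = 5e8;    // invented source team read load (bytes/s)
    double shardCount = 100; // invented shard count
    double minReadBytesKs = std::numeric_limits<double>::max(); // knob default
    // Shards whose read load falls below this threshold are not worth moving.
    double threshold = std::min(srcLoad / shardCount, minReadBytesKs);
    std::cout << "read-load floor for rebalance: " << threshold << " bytes/s\n"; // prints 5e+06
}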

View File

@@ -22,6 +22,7 @@
#include "fdbclient/EncryptKeyProxyInterface.h"
#include "fdbrpc/Locality.h"
#include "fdbserver/EncryptKeyProxy.actor.h"
#include "fdbserver/KmsConnector.h"
#include "fdbserver/KmsConnectorInterface.h"
#include "fdbserver/Knobs.h"
@@ -385,7 +386,18 @@ ACTOR Future<Void> getCipherKeysByBaseCipherKeyIds(Reference<EncryptKeyProxyData
}
keysByIdsReq.debugId = keysByIds.debugId;
state double startTime = now();
KmsConnLookupEKsByKeyIdsRep keysByIdsRep = wait(kmsConnectorInf.ekLookupByIds.getReply(keysByIdsReq));
std::function<Future<KmsConnLookupEKsByKeyIdsRep>()> keysByIdsRepF = [&]() {
return kmsConnectorInf.ekLookupByIds.getReply(keysByIdsReq);
};
std::function<void()> retryTrace = [&]() {
for (const auto& item : keysByIdsReq.encryptKeyInfos) {
TraceEvent(SevDebug, "GetCipherKeysByKeyIdsRetry")
.suppressFor(30)
.detail("DomainId", item.domainId);
}
};
KmsConnLookupEKsByKeyIdsRep keysByIdsRep =
wait(kmsReqWithExponentialBackoff(keysByIdsRepF, retryTrace, "GetCipherKeysByKeyIds"_sr));
ekpProxyData->kmsLookupByIdsReqLatency.addMeasurement(now() - startTime);
for (const auto& item : keysByIdsRep.cipherKeyDetails) {
@@ -520,8 +532,16 @@ ACTOR Future<Void> getLatestCipherKeys(Reference<EncryptKeyProxyData> ekpProxyDa
keysByDomainIdReq.debugId = latestKeysReq.debugId;
state double startTime = now();
std::function<Future<KmsConnLookupEKsByDomainIdsRep>()> keysByDomainIdRepF = [&]() {
return kmsConnectorInf.ekLookupByDomainIds.getReply(keysByDomainIdReq);
};
std::function<void()> retryTrace = [&]() {
for (const auto& item : keysByDomainIdReq.encryptDomainIds) {
TraceEvent(SevDebug, "GetLatestCipherKeysRetry").suppressFor(30).detail("DomainId", item);
}
};
KmsConnLookupEKsByDomainIdsRep keysByDomainIdRep =
wait(kmsConnectorInf.ekLookupByDomainIds.getReply(keysByDomainIdReq));
wait(kmsReqWithExponentialBackoff(keysByDomainIdRepF, retryTrace, "GetLatestCipherKeys"_sr));
ekpProxyData->kmsLookupByDomainIdsReqLatency.addMeasurement(now() - startTime);
for (auto& item : keysByDomainIdRep.cipherKeyDetails) {
@@ -631,7 +651,15 @@ ACTOR Future<Void> refreshEncryptionKeysImpl(Reference<EncryptKeyProxyData> ekpP
}
state double startTime = now();
KmsConnLookupEKsByDomainIdsRep rep = wait(kmsConnectorInf.ekLookupByDomainIds.getReply(req));
std::function<Future<KmsConnLookupEKsByDomainIdsRep>()> repF = [&]() {
return kmsConnectorInf.ekLookupByDomainIds.getReply(req);
};
std::function<void()> retryTrace = [&]() {
for (const auto& item : req.encryptDomainIds) {
TraceEvent(SevDebug, "RefreshEKsRetry").suppressFor(30).detail("DomainId", item);
}
};
KmsConnLookupEKsByDomainIdsRep rep = wait(kmsReqWithExponentialBackoff(repF, retryTrace, "RefreshEKs"_sr));
ekpProxyData->kmsLookupByDomainIdsReqLatency.addMeasurement(now() - startTime);
for (const auto& item : rep.cipherKeyDetails) {
const auto itr = ekpProxyData->baseCipherDomainIdCache.find(item.encryptDomainId);
@@ -727,7 +755,16 @@ ACTOR Future<Void> getLatestBlobMetadata(Reference<EncryptKeyProxyData> ekpProxy
ekpProxyData->blobMetadataCacheMisses += kmsReq.domainIds.size();
try {
state double startTime = now();
KmsConnBlobMetadataRep kmsRep = wait(kmsConnectorInf.blobMetadataReq.getReply(kmsReq));
std::function<Future<KmsConnBlobMetadataRep>()> kmsRepF = [&]() {
return kmsConnectorInf.blobMetadataReq.getReply(kmsReq);
};
std::function<void()> retryTrace = [&]() {
for (const auto& item : kmsReq.domainIds) {
TraceEvent(SevDebug, "GetLatestBlobMetadataRetry").suppressFor(30).detail("DomainId", item);
}
};
KmsConnBlobMetadataRep kmsRep =
wait(kmsReqWithExponentialBackoff(kmsRepF, retryTrace, "GetLatestBlobMetadata"_sr));
ekpProxyData->kmsBlobMetadataReqLatency.addMeasurement(now() - startTime);
metadataDetails.arena().dependsOn(kmsRep.metadataDetails.arena());
@@ -790,7 +827,15 @@ ACTOR Future<Void> refreshBlobMetadataCore(Reference<EncryptKeyProxyData> ekpPro
}
startTime = now();
KmsConnBlobMetadataRep rep = wait(kmsConnectorInf.blobMetadataReq.getReply(req));
std::function<Future<KmsConnBlobMetadataRep>()> repF = [&]() {
return kmsConnectorInf.blobMetadataReq.getReply(req);
};
std::function<void()> retryTrace = [&]() {
for (const auto& item : req.domainIds) {
TraceEvent(SevDebug, "RefreshBlobMetadataRetry").suppressFor(30).detail("DomainId", item);
}
};
KmsConnBlobMetadataRep rep = wait(kmsReqWithExponentialBackoff(repF, retryTrace, "RefreshBlobMetadata"_sr));
ekpProxyData->kmsBlobMetadataReqLatency.addMeasurement(now() - startTime);
for (auto& item : rep.metadataDetails) {
ekpProxyData->insertIntoBlobMetadataCache(item.domainId, item);
@@ -890,3 +935,48 @@ ACTOR Future<Void> encryptKeyProxyServer(EncryptKeyProxyInterface ekpInterface,
return Void();
}
TEST_CASE("/EncryptKeyProxy/ExponentialBackoff") {
state int callCount = 0;
state bool alwaysThrow = false;
state std::vector<Error> errors = { encrypt_keys_fetch_failed(), timed_out(), connection_failed() };
state std::function<Future<bool>()> repF = [&]() {
if (callCount > 1 && !alwaysThrow) {
return true;
}
callCount += 1;
throw deterministicRandom()->randomChoice(errors);
};
state std::function<void()> retryTrace = [&]() {};
bool resp = wait(kmsReqWithExponentialBackoff(repF, retryTrace, "TestEKPBackoff1"_sr));
ASSERT(resp);
ASSERT_EQ(callCount, 2);
// Exceeding retry limit should result in failure
IKnobCollection::getMutableGlobalKnobCollection().setKnob("ekp_kms_connection_retries",
KnobValueRef::create(int{ 3 }));
callCount = 0;
alwaysThrow = true;
errors = { timed_out() };
try {
wait(success(kmsReqWithExponentialBackoff(repF, retryTrace, "TestEKPBackoff2"_sr)));
ASSERT(false);
} catch (Error& e) {
ASSERT_EQ(e.code(), error_code_timed_out);
ASSERT_EQ(callCount, FLOW_KNOBS->EKP_KMS_CONNECTION_RETRIES + 1);
}
// A non-retryable error should throw immediately
callCount = 0;
alwaysThrow = false;
errors = { encrypt_key_not_found() };
try {
wait(success(kmsReqWithExponentialBackoff(repF, retryTrace, "TestEKPBackoff3"_sr)));
ASSERT(false);
} catch (Error& e) {
ASSERT_EQ(e.code(), error_code_encrypt_key_not_found);
ASSERT_EQ(callCount, 1);
}
return Void();
}

View File

@@ -3763,6 +3763,94 @@ TEST_CASE("noSim/ShardedRocksDB/CheckpointBasic") {
return Void();
}
TEST_CASE("noSim/ShardedRocksDB/RocksDBSstFileWriter") {
state std::string localFile = "rocksdb-sst-file-dump.sst";
state std::unique_ptr<IRocksDBSstFileWriter> sstWriter = newRocksDBSstFileWriter();
// Write nothing to sst file
sstWriter->open(localFile);
bool anyFileCreated = sstWriter->finish();
ASSERT(!anyFileCreated);
// Write kvs1 to sst file
state std::map<Key, Value> kvs1({ { "a"_sr, "1"_sr },
{ "ab"_sr, "12"_sr },
{ "ad"_sr, "14"_sr },
{ "b"_sr, "2"_sr },
{ "ba"_sr, "21"_sr },
{ "c"_sr, "3"_sr },
{ "d"_sr, "4"_sr },
{ "e"_sr, "5"_sr },
{ "h"_sr, "8"_sr },
{ "ha"_sr, "81"_sr } });
sstWriter = newRocksDBSstFileWriter();
sstWriter->open(localFile);
for (const auto& [key, value] : kvs1) {
sstWriter->write(key, value);
}
anyFileCreated = sstWriter->finish();
ASSERT(anyFileCreated);
// Write kvs2 to the same sst file where kvs2 keys are different from kvs1
state std::map<Key, Value> kvs2({ { "fa"_sr, "61"_sr },
{ "fab"_sr, "612"_sr },
{ "fad"_sr, "614"_sr },
{ "fb"_sr, "62"_sr },
{ "fba"_sr, "621"_sr },
{ "fc"_sr, "63"_sr },
{ "fd"_sr, "64"_sr },
{ "fe"_sr, "65"_sr },
{ "fh"_sr, "68"_sr },
{ "fha"_sr, "681"_sr } });
sstWriter->open(localFile);
for (const auto& [key, value] : kvs2) {
sstWriter->write(key, value);
}
anyFileCreated = sstWriter->finish();
ASSERT(anyFileCreated);
// Write kvs3 to the same sst file where kvs3 modifies values of kvs2
state std::map<Key, Value> kvs3({ { "fa"_sr, "1"_sr },
{ "fab"_sr, "12"_sr },
{ "fad"_sr, "14"_sr },
{ "fb"_sr, "2"_sr },
{ "fba"_sr, "21"_sr },
{ "fc"_sr, "3"_sr },
{ "fd"_sr, "4"_sr },
{ "fe"_sr, "5"_sr },
{ "fh"_sr, "8"_sr },
{ "fha"_sr, "81"_sr } });
sstWriter->open(localFile);
for (const auto& [key, value] : kvs3) {
sstWriter->write(key, value);
}
anyFileCreated = sstWriter->finish();
ASSERT(anyFileCreated);
// Check: sst only contains kv of kvs3
rocksdb::Status status;
rocksdb::IngestExternalFileOptions ingestOptions;
rocksdb::DB* db;
rocksdb::Options options;
options.create_if_missing = true;
status = rocksdb::DB::Open(options, "testdb", &db);
ASSERT(status.ok());
status = db->IngestExternalFile({ localFile }, ingestOptions);
ASSERT(status.ok());
std::string value;
for (const auto& [key, targetValue] : kvs1) {
status = db->Get(rocksdb::ReadOptions(), key.toString(), &value);
ASSERT(status.IsNotFound());
}
for (const auto& [key, targetValue] : kvs2) {
status = db->Get(rocksdb::ReadOptions(), key.toString(), &value);
ASSERT(value != targetValue.toString());
}
for (const auto& [key, targetValue] : kvs3) {
status = db->Get(rocksdb::ReadOptions(), key.toString(), &value);
ASSERT(status.ok());
ASSERT(value == targetValue.toString());
}
delete db;
return Void();
}
} // namespace
#endif // SSD_ROCKSDB_EXPERIMENTAL

View File

@@ -545,6 +545,75 @@ ACTOR Future<Void> RocksDBCheckpointReader::doClose(RocksDBCheckpointReader* sel
return Void();
}
class RocksDBSstFileWriter : public IRocksDBSstFileWriter {
public:
RocksDBSstFileWriter()
: writer(std::make_unique<rocksdb::SstFileWriter>(rocksdb::EnvOptions(), rocksdb::Options())), hasData(false) {}
void open(const std::string localFile) override;
void write(const KeyRef key, const ValueRef value) override;
bool finish() override;
private:
std::unique_ptr<rocksdb::SstFileWriter> writer;
std::string localFile;
bool hasData;
};
void RocksDBSstFileWriter::open(const std::string localFile) {
try {
this->localFile = abspath(localFile);
rocksdb::Status status = this->writer->Open(this->localFile);
if (!status.ok()) {
TraceEvent(SevError, "RocksDBSstFileWriterWrapperOpenFileError")
.detail("LocalFile", this->localFile)
.detail("Status", status.ToString());
throw failed_to_create_checkpoint_shard_metadata();
}
} catch (Error& e) {
throw e;
}
}
void RocksDBSstFileWriter::write(const KeyRef key, const ValueRef value) {
try {
rocksdb::Status status = this->writer->Put(toSlice(key), toSlice(value));
if (!status.ok()) {
TraceEvent(SevError, "RocksDBSstFileWriterWrapperWriteError")
.detail("LocalFile", this->localFile)
.detail("Key", key)
.detail("Value", value)
.detail("Status", status.ToString());
throw failed_to_create_checkpoint_shard_metadata();
}
this->hasData = true;
} catch (Error& e) {
throw e;
}
}
bool RocksDBSstFileWriter::finish() {
try {
if (!this->hasData) {
// writer->Finish() cannot create an sst file with no entries,
// so we have to check whether any data was written before calling writer->Finish()
return false;
}
rocksdb::Status status = this->writer->Finish();
if (!status.ok()) {
TraceEvent(SevError, "RocksDBSstFileWriterWrapperCloseError")
.detail("LocalFile", this->localFile)
.detail("Status", status.ToString());
throw failed_to_create_checkpoint_shard_metadata();
}
return true;
} catch (Error& e) {
throw e;
}
}
// RocksDBCFCheckpointReader reads an exported RocksDB Column Family checkpoint, and returns the serialized
// checkpoint via nextChunk.
class RocksDBCFCheckpointReader : public ICheckpointReader {
@@ -1038,6 +1107,14 @@ ICheckpointReader* newRocksDBCheckpointReader(const CheckpointMetaData& checkpoi
return nullptr;
}
std::unique_ptr<IRocksDBSstFileWriter> newRocksDBSstFileWriter() {
#ifdef SSD_ROCKSDB_EXPERIMENTAL
std::unique_ptr<IRocksDBSstFileWriter> sstWriter = std::make_unique<RocksDBSstFileWriter>();
return sstWriter;
#endif // SSD_ROCKSDB_EXPERIMENTAL
return nullptr;
}
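Note: when FDB is built without SSD_ROCKSDB_EXPERIMENTAL this factory returns nullptr, so callers are expected to check the result before using the writer.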
RocksDBColumnFamilyCheckpoint getRocksCF(const CheckpointMetaData& checkpoint) {
RocksDBColumnFamilyCheckpoint rocksCF;
ObjectReader reader(checkpoint.serializedCheckpoint.begin(), IncludeVersion());

View File

@@ -0,0 +1,81 @@
/*
* EncryptKeyProxy.actor.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
// When actually compiled (NO_INTELLISENSE), include the generated version of this file. In intellisense use the source
// version.
#if defined(NO_INTELLISENSE) && !defined(ENCRYPT_KEY_PROXY_ACTOR_G_H)
#define ENCRYPT_KEY_PROXY_ACTOR_G_H
#include "fdbserver/EncryptKeyProxy.actor.g.h"
#elif !defined(ENCRYPT_KEY_PROXY_ACTOR_H)
#define ENCRYPT_KEY_PROXY_ACTOR_H
#include "flow/flow.h"
#include "flow/actorcompiler.h" // This must be the last #include.
namespace EncryptKeyProxy {
inline bool canRetryWith(Error e) {
// The below are the only errors that should be retried, all others should throw immediately
switch (e.code()) {
case error_code_encrypt_keys_fetch_failed:
case error_code_timed_out:
case error_code_connection_failed:
return true;
default:
return false;
}
}
} // namespace EncryptKeyProxy
ACTOR template <class T>
Future<T> kmsReqWithExponentialBackoff(std::function<Future<T>()> func,
std::function<void()> retryTrace,
StringRef funcName) {
state int numRetries = 0;
state double kmsBackoff = FLOW_KNOBS->EKP_KMS_CONNECTION_BACKOFF;
TraceEvent(SevDebug, "KMSRequestStart").suppressFor(30).detail("Function", funcName);
loop {
try {
T val = wait(func());
return val;
} catch (Error& e) {
TraceEvent("KMSRequestReceivedError").detail("Function", funcName).detail("ErrorCode", e.code());
if (!EncryptKeyProxy::canRetryWith(e)) {
throw e;
}
if (numRetries >= FLOW_KNOBS->EKP_KMS_CONNECTION_RETRIES) {
TraceEvent(SevWarnAlways, "KMSRequestRetryLimitExceeded")
.detail("Function", funcName)
.detail("ErrorCode", e.code());
throw e;
}
retryTrace();
numRetries++;
wait(delay(kmsBackoff));
kmsBackoff = kmsBackoff * 2; // exponential backoff
}
}
}
#include "flow/unactorcompiler.h"
#endif
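A small self-contained sketch (not FDB source) of the delay schedule the loop above produces with the knob defaults introduced in this commit (EKP_KMS_CONNECTION_BACKOFF = 5.0, EKP_KMS_CONNECTION_RETRIES = 6); it shows why six retries amount to roughly five minutes of total backoff.

#include <iostream>

int main() {
    double backoff = 5.0; // EKP_KMS_CONNECTION_BACKOFF default
    int retries = 6;      // EKP_KMS_CONNECTION_RETRIES default
    double total = 0.0;
    for (int i = 0; i < retries; ++i) {
        total += backoff; // wait(delay(kmsBackoff)) in the actor
        backoff *= 2;     // kmsBackoff = kmsBackoff * 2
    }
    std::cout << "total backoff before giving up: " << total << "s\n"; // 315s
}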

View File

@@ -31,6 +31,17 @@
#include "flow/actorcompiler.h" // has to be last include
class IRocksDBSstFileWriter {
public:
virtual void open(const std::string localFile) = 0;
virtual void write(const KeyRef key, const ValueRef value) = 0;
virtual bool finish() = 0;
virtual ~IRocksDBSstFileWriter() {}
};
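A hedged usage sketch of this interface (assumes FDB's flow string literals and the factory below; the ascending-key requirement comes from the underlying rocksdb::SstFileWriter, as the RocksDBSstFileWriter test exercises):

std::unique_ptr<IRocksDBSstFileWriter> writer = newRocksDBSstFileWriter();
if (writer != nullptr) {                    // nullptr without SSD_ROCKSDB_EXPERIMENTAL
    writer->open("sample.sst");             // hypothetical local file name
    writer->write("a"_sr, "1"_sr);          // keys must arrive in ascending order
    writer->write("b"_sr, "2"_sr);
    bool anyFileCreated = writer->finish(); // false when nothing was written
    (void)anyFileCreated;
}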
struct CheckpointFile {
constexpr static FileIdentifier file_identifier = 13804348;
std::string path;
@@ -285,6 +296,8 @@ ICheckpointReader* newRocksDBCheckpointReader(const CheckpointMetaData& checkpoi
const CheckpointAsKeyValues checkpointAsKeyValues,
UID logID);
std::unique_ptr<IRocksDBSstFileWriter> newRocksDBSstFileWriter();
RocksDBColumnFamilyCheckpoint getRocksCF(const CheckpointMetaData& checkpoint);
RocksDBCheckpoint getRocksCheckpoint(const CheckpointMetaData& checkpoint);

View File

@@ -202,6 +202,7 @@ static const KeyRangeRef persistCheckpointKeys =
static const KeyRangeRef persistPendingCheckpointKeys =
KeyRangeRef(PERSIST_PREFIX "PendingCheckpoint/"_sr, PERSIST_PREFIX "PendingCheckpoint0"_sr);
static const std::string rocksdbCheckpointDirPrefix = "/rockscheckpoints_";
static const std::string checkpointBytesSampleTempFolder = "/metadata_temp";
struct AddingShard : NonCopyable {
KeyRange keys;
@@ -9623,22 +9624,150 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
}
}
ACTOR Future<bool> createSstFileForCheckpointShardBytesSample(StorageServer* data,
CheckpointMetaData metaData,
std::string bytesSampleFile) {
state int failureCount = 0;
state std::unique_ptr<IRocksDBSstFileWriter> sstWriter;
state int64_t numGetRangeQueries;
state int64_t numSampledKeys;
state std::vector<KeyRange>::iterator metaDataRangesIter;
state Key readBegin;
state Key readEnd;
state bool anyFileCreated;
loop {
try {
// Any failure leads to a retry until failureCount reaches the maximum.
// On each retry, clean up the bytesSampleFile created by the previous attempt.
ASSERT(directoryExists(parentDirectory(bytesSampleFile)));
if (fileExists(abspath(bytesSampleFile))) {
deleteFile(abspath(bytesSampleFile));
}
anyFileCreated = false;
sstWriter = newRocksDBSstFileWriter();
// nullptr when not built with SSD_ROCKSDB_EXPERIMENTAL; check before use
if (sstWriter == nullptr) {
break;
}
sstWriter->open(bytesSampleFile);
ASSERT(metaData.ranges.size() > 0);
std::sort(metaData.ranges.begin(), metaData.ranges.end(), [](KeyRange a, KeyRange b) {
// Debug usage: make sure there is no overlap between the two compared ranges
/* if (a.begin < b.begin) {
ASSERT(a.end <= b.begin);
} else if (a.begin > b.begin) {
ASSERT(a.end >= b.begin);
} else {
ASSERT(false);
} */
// metaData.ranges must be in ascending order
// sstWriter requires written keys to be in ascending order
return a.begin < b.begin;
});
numGetRangeQueries = 0;
numSampledKeys = 0;
metaDataRangesIter = metaData.ranges.begin();
while (metaDataRangesIter != metaData.ranges.end()) {
KeyRange range = *metaDataRangesIter;
readBegin = range.begin.withPrefix(persistByteSampleKeys.begin);
readEnd = range.end.withPrefix(persistByteSampleKeys.begin);
loop {
try {
RangeResult readResult = wait(data->storage.readRange(KeyRangeRef(readBegin, readEnd),
SERVER_KNOBS->STORAGE_LIMIT_BYTES,
SERVER_KNOBS->STORAGE_LIMIT_BYTES));
numGetRangeQueries++;
for (int i = 0; i < readResult.size(); i++) {
ASSERT(!readResult[i].key.empty() && !readResult[i].value.empty());
sstWriter->write(readResult[i].key, readResult[i].value);
numSampledKeys++;
}
if (readResult.more) {
ASSERT(readResult.readThrough.present());
readBegin = keyAfter(readResult.readThrough.get());
ASSERT(readBegin <= readEnd);
} else {
break; // finish for current metaDataRangesIter
}
} catch (Error& e) {
if (failureCount < SERVER_KNOBS->ROCKSDB_CREATE_BYTES_SAMPLE_FILE_RETRY_MAX) {
throw retry(); // retry from scratch
} else {
throw e;
}
}
}
metaDataRangesIter++;
}
anyFileCreated = sstWriter->finish();
ASSERT((numSampledKeys > 0 && anyFileCreated) || (numSampledKeys == 0 && !anyFileCreated));
TraceEvent(SevDebug, "DumpCheckPointMetaData", data->thisServerID)
.detail("NumSampledKeys", numSampledKeys)
.detail("NumGetRangeQueries", numGetRangeQueries)
.detail("CheckpointID", metaData.checkpointID)
.detail("BytesSampleTempFile", anyFileCreated ? bytesSampleFile : "noFileCreated");
break;
} catch (Error& e) {
if (e.code() == error_code_retry) {
wait(delay(0.5));
failureCount++;
continue;
} else {
TraceEvent(SevDebug, "StorageCreateCheckpointMetaDataSstFileDumpedFailure", data->thisServerID)
.detail("PendingCheckpoint", metaData.toString())
.detail("Error", e.name());
throw e;
}
}
}
return anyFileCreated;
}
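The actor above funnels inner failures through the dedicated retry() error so that the whole dump restarts from scratch under a bounded budget; a stripped-down sketch of that pattern (runOnce and the error type are stand-ins, not FDB source):

struct RetryError {}; // stands in for FDB's retry() error

template <class Fn>
bool runWithRetryBudget(Fn runOnce, int maxFailures) {
    int failureCount = 0;
    while (true) {
        try {
            return runOnce(); // may throw RetryError while under budget
        } catch (const RetryError&) {
            if (++failureCount > maxFailures)
                throw; // budget exhausted: surface the error to the caller
            // clean up partial output, back off, then retry from scratch
        }
    }
}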
ACTOR Future<Void> createCheckpoint(StorageServer* data, CheckpointMetaData metaData) {
ASSERT(std::find(metaData.src.begin(), metaData.src.end(), data->thisServerID) != metaData.src.end() &&
!metaData.ranges.empty());
state std::string checkpointDir = data->folder + rocksdbCheckpointDirPrefix + metaData.checkpointID.toString();
state std::string bytesSampleFile = checkpointDir + "/" + checkpointBytesSampleFileName;
state std::string bytesSampleTempDir = data->folder + checkpointBytesSampleTempFolder;
state std::string bytesSampleTempFile =
bytesSampleTempDir + "/" + metaData.checkpointID.toString() + "_" + checkpointBytesSampleFileName;
const CheckpointRequest req(metaData.version,
metaData.ranges,
static_cast<CheckpointFormat>(metaData.format),
metaData.checkpointID,
data->folder + rocksdbCheckpointDirPrefix + metaData.checkpointID.toString());
checkpointDir);
state CheckpointMetaData checkpointResult;
state bool sampleByteSstFileCreated;
std::vector<Future<Void>> createCheckpointActors;
try {
wait(store(checkpointResult, data->storage.checkpoint(req)));
// Create checkpoint
createCheckpointActors.push_back(store(checkpointResult, data->storage.checkpoint(req)));
// Dump the checkpoint meta data to the sst file of metadata.
if (!directoryExists(abspath(bytesSampleTempDir))) {
platform::createDirectory(abspath(bytesSampleTempDir));
}
createCheckpointActors.push_back(store(
sampleByteSstFileCreated, createSstFileForCheckpointShardBytesSample(data, metaData, bytesSampleTempFile)));
wait(waitForAll(createCheckpointActors));
checkpointResult.bytesSampleFile = sampleByteSstFileCreated ? bytesSampleFile : Optional<std::string>();
ASSERT(checkpointResult.src.empty() && checkpointResult.getState() == CheckpointMetaData::Complete);
checkpointResult.src.push_back(data->thisServerID);
checkpointResult.actionId = metaData.actionId;
data->checkpoints[checkpointResult.checkpointID] = checkpointResult;
TraceEvent("StorageCreatedCheckpoint", data->thisServerID).detail("Checkpoint", checkpointResult.toString());
// Move sst file to the checkpoint folder
if (sampleByteSstFileCreated) {
ASSERT(directoryExists(abspath(checkpointDir)));
ASSERT(!fileExists(abspath(bytesSampleFile)));
ASSERT(fileExists(abspath(bytesSampleTempFile)));
renameFile(abspath(bytesSampleTempFile), abspath(bytesSampleFile));
}
TraceEvent("StorageCreatedCheckpoint", data->thisServerID)
.detail("Checkpoint", checkpointResult.toString())
.detail("BytesSampleFile", sampleByteSstFileCreated ? bytesSampleFile : "noFileCreated");
} catch (Error& e) {
// If checkpoint creation fails, the failure is persisted.
checkpointResult = metaData;
@@ -10653,6 +10782,12 @@ ByteSampleInfo isKeyValueInSample(const KeyRef key, int64_t totalKvSize) {
double probability =
(double)info.size / (key.size() + SERVER_KNOBS->BYTE_SAMPLING_OVERHEAD) / SERVER_KNOBS->BYTE_SAMPLING_FACTOR;
// MIN_BYTE_SAMPLING_PROBABILITY is 0.99 only for testing
// MIN_BYTE_SAMPLING_PROBABILITY is 0 for other cases
if (SERVER_KNOBS->MIN_BYTE_SAMPLING_PROBABILITY != 0) {
ASSERT(SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA);
}
probability = std::max(probability, SERVER_KNOBS->MIN_BYTE_SAMPLING_PROBABILITY);
info.inSample = a / ((1 << 30) * 4.0) < probability;
info.sampledSize = info.size / std::min(1.0, probability);
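A worked sketch (not FDB source; sizes invented) of the probability computation with the new floor: under the default knobs a 200-byte key with an 800-byte value is sampled with probability 1000 / (200 + 100) / 250 ≈ 0.013, while the test-only MIN_BYTE_SAMPLING_PROBABILITY of 0.99 lifts virtually every key into the sample.

#include <algorithm>
#include <iostream>

int main() {
    double kvSize = 1000, keySize = 200; // invented key-value and key sizes (bytes)
    double overhead = 100, factor = 250; // BYTE_SAMPLING_OVERHEAD / BYTE_SAMPLING_FACTOR defaults
    double minProbability = 0.0;         // MIN_BYTE_SAMPLING_PROBABILITY; 0.99 only in the test config
    double probability = kvSize / (keySize + overhead) / factor;
    probability = std::max(probability, minProbability);
    std::cout << "sample probability: " << probability << "\n"; // ~0.0133
}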

View File

@@ -113,6 +113,23 @@ struct TenantCapacityLimits : TestWorkload {
return Void();
}
}
ACTOR static Future<Void> setLastTenantId(Database dataDb, int64_t tenantId) {
// set the max tenant id for the standalone cluster
state Reference<ReadYourWritesTransaction> dataTr = makeReference<ReadYourWritesTransaction>(dataDb);
loop {
try {
dataTr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
TenantMetadata::lastTenantId().set(dataTr, tenantId);
wait(dataTr->commit());
break;
} catch (Error& e) {
wait(dataTr->onError(e));
}
}
return Void();
}
ACTOR static Future<Void> _start(Database cx, TenantCapacityLimits* self) {
if (self->useMetacluster) {
// Set the max tenant id for the metacluster
@@ -139,18 +156,7 @@ struct TenantCapacityLimits : TestWorkload {
}
} else {
// set the max tenant id for the standalone cluster
state Reference<ReadYourWritesTransaction> dataTr = makeReference<ReadYourWritesTransaction>(self->dataDb);
loop {
try {
dataTr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
int64_t maxTenantId = TenantAPI::getMaxAllowableTenantId(0);
TenantMetadata::lastTenantId().set(dataTr, maxTenantId);
wait(dataTr->commit());
break;
} catch (Error& e) {
wait(dataTr->onError(e));
}
}
wait(setLastTenantId(self->dataDb, TenantAPI::getMaxAllowableTenantId(0)));
// Use the management database api to create a tenant which should fail since the cluster is at capacity
try {
wait(success(TenantAPI::createTenant(self->dataDb.getReference(), "test_tenant_management_api"_sr)));
@@ -159,14 +165,17 @@ struct TenantCapacityLimits : TestWorkload {
ASSERT(e.code() == error_code_cluster_no_capacity);
}
wait(setLastTenantId(self->dataDb, TenantAPI::getMaxAllowableTenantId(0) - 2));
// use special keys to create a tenant which should fail since the cluster is at capacity
state Reference<ReadYourWritesTransaction> dataTr = makeReference<ReadYourWritesTransaction>(self->dataDb);
loop {
try {
dataTr->reset();
dataTr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
dataTr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
TenantMapEntry entry;
dataTr->set(self->specialKeysTenantMapPrefix.withSuffix("test_tenant_special_keys"_sr), ""_sr);
dataTr->set(self->specialKeysTenantMapPrefix.withSuffix("test_tenant_special_keys_1"_sr), ""_sr);
dataTr->set(self->specialKeysTenantMapPrefix.withSuffix("test_tenant_special_keys_2"_sr), ""_sr);
dataTr->set(self->specialKeysTenantMapPrefix.withSuffix("test_tenant_special_keys_3"_sr), ""_sr);
wait(dataTr->commit());
ASSERT(false);
} catch (Error& e) {

View File

@@ -313,6 +313,10 @@ void FlowKnobs::initialize(Randomize randomize, IsSimulated isSimulated) {
// Refer to EncryptUtil::EncryptAuthTokenAlgo for more details
init( ENCRYPT_HEADER_AUTH_TOKEN_ENABLED, false ); if ( randomize && BUGGIFY ) { ENCRYPT_HEADER_AUTH_TOKEN_ENABLED = !ENCRYPT_HEADER_AUTH_TOKEN_ENABLED; }
init( ENCRYPT_HEADER_AUTH_TOKEN_ALGO, 0 ); if ( randomize && ENCRYPT_HEADER_AUTH_TOKEN_ENABLED ) { ENCRYPT_HEADER_AUTH_TOKEN_ALGO = getRandomAuthTokenAlgo(); }
// start exponential backoff at 5s when reaching out to the KMS from EKP
init( EKP_KMS_CONNECTION_BACKOFF, 5.0 );
// number of times to retry KMS requests from the EKP; with the 5s backoff doubling each attempt, 6 retries spend 5+10+20+40+80+160 = 315s, roughly 5 minutes, in backoff before giving up
init( EKP_KMS_CONNECTION_RETRIES, 6 );
// REST Client
init( RESTCLIENT_MAX_CONNECTIONPOOL_SIZE, 10 );

View File

@@ -375,6 +375,8 @@ public:
double ENCRYPT_KEY_CACHE_LOGGING_SKETCH_ACCURACY;
bool ENCRYPT_HEADER_AUTH_TOKEN_ENABLED;
int ENCRYPT_HEADER_AUTH_TOKEN_ALGO;
double EKP_KMS_CONNECTION_BACKOFF;
int EKP_KMS_CONNECTION_RETRIES;
// RESTClient
int RESTCLIENT_MAX_CONNECTIONPOOL_SIZE;

View File

@@ -217,6 +217,7 @@ ERROR( invalid_checkpoint_format, 2044, "Invalid checkpoint format" )
ERROR( invalid_throttle_quota_value, 2045, "Invalid quota value. Note that reserved_throughput cannot exceed total_throughput" )
ERROR( failed_to_create_checkpoint, 2046, "Failed to create a checkpoint" )
ERROR( failed_to_restore_checkpoint, 2047, "Failed to restore a checkpoint" )
ERROR( failed_to_create_checkpoint_shard_metadata, 2048, "Failed to dump shard metadata for a checkpoint to a sst file")
ERROR( incompatible_protocol_version, 2100, "Incompatible protocol version" )
ERROR( transaction_too_large, 2101, "Transaction exceeds byte limit" )

View File

@@ -8,11 +8,11 @@ fdb_find_sources(FLOWBENCH_SRCS)
if(EXISTS /opt/googlebenchmark-f91b6b AND CLANG)
add_flow_target(EXECUTABLE NAME flowbench SRCS ${FLOWBENCH_SRCS})
target_include_directories(flowbench PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/include" "${CMAKE_CURRENT_BINARY_DIR}/include" /opt/googlebenchmark-f91b6b/include)
target_link_directories(flowbench PRIVATE /opt/googlebenchmark-f91b6b/src)
target_link_directories(flowbench PRIVATE /opt/googlebenchmark-f91b6b/lib64)
elseif(EXISTS /opt/googlebenchmark-f91b6b-g++ AND NOT CLANG)
add_flow_target(EXECUTABLE NAME flowbench SRCS ${FLOWBENCH_SRCS})
target_include_directories(flowbench PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/include" "${CMAKE_CURRENT_BINARY_DIR}/include" /opt/googlebenchmark-f91b6b-g++/include)
target_link_directories(flowbench PRIVATE /opt/googlebenchmark-f91b6b-g++/src)
target_link_directories(flowbench PRIVATE /opt/googlebenchmark-f91b6b-g++/lib64)
else()
## This seems to be copy-pasted from the google benchmark documentation.
## It breaks if you attempt to re-use a build of googlebenchmark across FDB

View File

@@ -9,6 +9,7 @@ allowDefaultTenant = false
[[knobs]]
shard_encode_location_metadata = true
enable_dd_physical_shard = true
min_byte_sampling_probability = 0.99
[[test]]
testTitle = 'PhysicalShardMove'