Merge remote-tracking branch 'apple/main' into vgasiunas-fdbmonitor-in-python

commit 286f9b729a
Author: Vaidas Gasiunas
Date: 2022-11-07 10:33:06 +01:00

53 changed files with 806 additions and 299 deletions

View File

@@ -459,8 +459,10 @@ int main(int argc, char** argv) {
retCode = 1;
}
fprintf(stderr, "Stopping FDB network thread\n");
fdb_check(fdb::network::stop(), "Failed to stop FDB thread");
network_thread.join();
fprintf(stderr, "FDB network thread successfully stopped\n");
} catch (const std::exception& err) {
fmt::print(stderr, "ERROR: {}\n", err.what());
retCode = 1;

View File

@@ -142,6 +142,8 @@ Here is a complete list of valid parameters:
*multipart_min_part_size* (or *minps*) - Min part size for multipart uploads.
*enable_read_cache* (or *erc*) - Whether to enable read block cache.
*read_block_size* (or *rbs*) - Block size in bytes to be used for reads.
*read_ahead_blocks* (or *rab*) - Number of blocks to read ahead of requested offset.
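
For illustration (host, credentials, and bucket below are placeholders), a backup URL that enables the read cache with 1 MiB blocks and two blocks of read-ahead might look like:

blobstore://mykey:mysecret@backup.example.com:443/prod_backup?bucket=backups&enable_read_cache=1&read_block_size=1048576&read_ahead_blocks=2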

View File

@@ -11,6 +11,7 @@ Release Notes
* Released with AVX disabled.
* Fixed a transaction log data corruption bug. `(PR #8525) <https://github.com/apple/foundationdb/pull/8525>`_, `(PR #8562) <https://github.com/apple/foundationdb/pull/8562>`_, and `(PR #8647) <https://github.com/apple/foundationdb/pull/8647>`_
* Fixed a rare data race in transaction logs when PEEK_BATCHING_EMPTY_MSG is enabled. `(PR #8660) <https://github.com/apple/foundationdb/pull/8660>`_
* Fixed a heap-use-after-free bug in cluster controller. `(PR #8683) <https://github.com/apple/foundationdb/pull/8683>`_
* Changed consistency check to report all corruptions. `(PR #8571) <https://github.com/apple/foundationdb/pull/8571>`_
* Fixed a rare storage server crashing bug after recovery. `(PR #8468) <https://github.com/apple/foundationdb/pull/8468>`_
* Added client knob UNLINKONLOAD_FDBCLIB to control deletion of external client libraries. `(PR #8434) <https://github.com/apple/foundationdb/pull/8434>`_

View File

@@ -175,11 +175,13 @@ Future<Reference<IAsyncFile>> BackupContainerS3BlobStore::readFile(const std::st
if (usesEncryption()) {
f = makeReference<AsyncFileEncrypted>(f, AsyncFileEncrypted::Mode::READ_ONLY);
}
if (m_bstore->knobs.enable_read_cache) {
f = makeReference<AsyncFileReadAheadCache>(f,
m_bstore->knobs.read_block_size,
m_bstore->knobs.read_ahead_blocks,
m_bstore->knobs.concurrent_reads_per_file,
m_bstore->knobs.read_cache_blocks_per_file);
}
return f;
}

View File

@@ -76,6 +76,10 @@ BlobCipherMetrics::BlobCipherMetrics()
UID(),
FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_INTERVAL,
FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_SAMPLE_SIZE),
getBlobMetadataLatency("GetBlobMetadataLatency",
UID(),
FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_INTERVAL,
FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_SAMPLE_SIZE),
counterSets({ CounterSet(cc, "TLog"),
CounterSet(cc, "KVMemory"),
CounterSet(cc, "KVRedwood"),

View File

@@ -650,12 +650,12 @@ struct IndexedBlobGranuleFile {
IndexBlobGranuleFileChunkRef chunkRef =
IndexBlobGranuleFileChunkRef::fromBytes(cipherKeysCtx, childData, childArena);
ChildType child;
ObjectReader dataReader(chunkRef.chunkBytes.get().begin(), IncludeVersion());
dataReader.deserialize(FileIdentifierFor<ChildType>::value, child, childArena);
// TODO implement some sort of decrypted+decompressed+deserialized cache, if this object gets reused?
return Standalone<ChildType>(child, childArena);
BinaryReader br(chunkRef.chunkBytes.get(), IncludeVersion());
Standalone<ChildType> child;
br >> child;
return child;
}
template <class Ar>
@@ -751,7 +751,7 @@ Value serializeChunkedSnapshot(const Standalone<StringRef>& fileNameRef,
if (currentChunkBytesEstimate >= targetChunkBytes || i == snapshot.size() - 1) {
Value serialized =
ObjectWriter::toValue(currentChunk, IncludeVersion(ProtocolVersion::withBlobGranuleFile()));
BinaryWriter::toValue(currentChunk, IncludeVersion(ProtocolVersion::withBlobGranuleFile()));
Value chunkBytes =
IndexBlobGranuleFileChunkRef::toBytes(cipherKeysCtx, compressFilter, serialized, file.arena());
chunks.push_back(chunkBytes);
@@ -1020,7 +1020,7 @@ Value serializeChunkedDeltaFile(const Standalone<StringRef>& fileNameRef,
if (currentChunkBytesEstimate >= chunkSize || i == boundaries.size() - 1) {
Value serialized =
ObjectWriter::toValue(currentChunk, IncludeVersion(ProtocolVersion::withBlobGranuleFile()));
BinaryWriter::toValue(currentChunk, IncludeVersion(ProtocolVersion::withBlobGranuleFile()));
Value chunkBytes =
IndexBlobGranuleFileChunkRef::toBytes(cipherKeysCtx, compressFilter, serialized, file.arena());
chunks.push_back(chunkBytes);

View File

@@ -220,6 +220,7 @@ void ClientKnobs::initialize(Randomize randomize) {
init( BLOBSTORE_CONCURRENT_WRITES_PER_FILE, 5 );
init( BLOBSTORE_CONCURRENT_READS_PER_FILE, 3 );
init( BLOBSTORE_ENABLE_READ_CACHE, true );
init( BLOBSTORE_READ_BLOCK_SIZE, 1024 * 1024 );
init( BLOBSTORE_READ_AHEAD_BLOCKS, 0 );
init( BLOBSTORE_READ_CACHE_BLOCKS_PER_FILE, 2 );

View File

@@ -658,7 +658,7 @@ bool DatabaseConfiguration::setInternal(KeyRef key, ValueRef value) {
parse((&type), value);
blobGranulesEnabled = (type != 0);
} else if (ck == "encryption_at_rest_mode"_sr) {
encryptionAtRestMode = EncryptionAtRestMode::fromValue(value);
encryptionAtRestMode = EncryptionAtRestMode::fromValueRef(Optional<ValueRef>(value));
} else {
return false;
}

View File

@@ -18,6 +18,7 @@
* limitations under the License.
*/
#include "flow/Trace.h"
#ifdef ADDRESS_SANITIZER
#include <sanitizer/lsan_interface.h>
#endif
@@ -2812,11 +2813,19 @@ void MultiVersionApi::runNetwork() {
});
}
try {
localClient->api->runNetwork();
} catch (const Error& e) {
closeTraceFile();
throw e;
}
for (auto h : handles) {
waitThread(h);
}
TraceEvent("MultiVersionRunNetworkTerminating");
closeTraceFile();
}
void MultiVersionApi::stopNetwork() {

View File

@@ -6384,8 +6384,11 @@ ACTOR static Future<Void> tryCommit(Reference<TransactionState> trState,
}
if (req.tagSet.present() && trState->options.priority < TransactionPriority::IMMEDIATE) {
wait(store(req.transaction.read_snapshot, readVersion) &&
store(req.commitCostEstimation, estimateCommitCosts(trState, &req.transaction)));
state Future<Optional<ClientTrCommitCostEstimation>> commitCostFuture =
estimateCommitCosts(trState, &req.transaction);
// We need to wait for the read version first so that we can be notified if the database is locked
wait(store(req.transaction.read_snapshot, readVersion));
wait(store(req.commitCostEstimation, commitCostFuture));
} else {
wait(store(req.transaction.read_snapshot, readVersion));
}
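
As an aside, a minimal standalone sketch of the ordering described in the comment above, with std::async standing in for Flow futures and all names hypothetical: the cost estimation is started first, but the read version is observed before its result, so a locked database surfaces its error before the estimate is consumed.

#include <future>
#include <iostream>
#include <stdexcept>

int fetchReadVersion() { return 42; }  // would throw if the database is locked
int estimateCommitCost() { return 7; } // runs concurrently with the wait below

int main() {
    auto costF = std::async(std::launch::async, estimateCommitCost); // start estimation now
    try {
        int readVersion = fetchReadVersion(); // checked first, as in tryCommit
        std::cout << readVersion << " " << costF.get() << "\n";
    } catch (const std::exception& e) {
        costF.wait(); // drain the in-flight work before reporting
        std::cerr << "ERROR: " << e.what() << "\n";
    }
}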

View File

@@ -88,6 +88,7 @@ S3BlobStoreEndpoint::BlobKnobs::BlobKnobs() {
concurrent_lists = CLIENT_KNOBS->BLOBSTORE_CONCURRENT_LISTS;
concurrent_reads_per_file = CLIENT_KNOBS->BLOBSTORE_CONCURRENT_READS_PER_FILE;
concurrent_writes_per_file = CLIENT_KNOBS->BLOBSTORE_CONCURRENT_WRITES_PER_FILE;
enable_read_cache = CLIENT_KNOBS->BLOBSTORE_ENABLE_READ_CACHE;
read_block_size = CLIENT_KNOBS->BLOBSTORE_READ_BLOCK_SIZE;
read_ahead_blocks = CLIENT_KNOBS->BLOBSTORE_READ_AHEAD_BLOCKS;
read_cache_blocks_per_file = CLIENT_KNOBS->BLOBSTORE_READ_CACHE_BLOCKS_PER_FILE;
@@ -125,6 +126,7 @@ bool S3BlobStoreEndpoint::BlobKnobs::set(StringRef name, int value) {
TRY_PARAM(concurrent_lists, cl);
TRY_PARAM(concurrent_reads_per_file, crpf);
TRY_PARAM(concurrent_writes_per_file, cwpf);
TRY_PARAM(enable_read_cache, erc);
TRY_PARAM(read_block_size, rbs);
TRY_PARAM(read_ahead_blocks, rab);
TRY_PARAM(read_cache_blocks_per_file, rcb);
@@ -162,6 +164,7 @@ std::string S3BlobStoreEndpoint::BlobKnobs::getURLParameters() const {
_CHECK_PARAM(concurrent_lists, cl);
_CHECK_PARAM(concurrent_reads_per_file, crpf);
_CHECK_PARAM(concurrent_writes_per_file, cwpf);
_CHECK_PARAM(enable_read_cache, erc);
_CHECK_PARAM(read_block_size, rbs);
_CHECK_PARAM(read_ahead_blocks, rab);
_CHECK_PARAM(read_cache_blocks_per_file, rcb);

View File

@@ -115,6 +115,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( ENABLE_DETAILED_TLOG_POP_TRACE, false ); if ( randomize && BUGGIFY ) ENABLE_DETAILED_TLOG_POP_TRACE = true;
init( PEEK_BATCHING_EMPTY_MSG, false ); if ( randomize && BUGGIFY ) PEEK_BATCHING_EMPTY_MSG = true;
init( PEEK_BATCHING_EMPTY_MSG_INTERVAL, 0.001 ); if ( randomize && BUGGIFY ) PEEK_BATCHING_EMPTY_MSG_INTERVAL = 0.01;
init( POP_FROM_LOG_DELAY, 1 ); if ( randomize && BUGGIFY ) POP_FROM_LOG_DELAY = 0;
// disk snapshot max timeout, to be put in TLog, storage and coordinator nodes
init( MAX_FORKED_PROCESS_OUTPUT, 1024 );
@@ -295,7 +296,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( DD_STORAGE_WIGGLE_PAUSE_THRESHOLD, 10 ); if( randomize && BUGGIFY ) DD_STORAGE_WIGGLE_PAUSE_THRESHOLD = 1000;
init( DD_STORAGE_WIGGLE_STUCK_THRESHOLD, 20 );
init( DD_STORAGE_WIGGLE_MIN_SS_AGE_SEC, isSimulated ? 2 : 21 * 60 * 60 * 24 ); if(randomize && BUGGIFY) DD_STORAGE_WIGGLE_MIN_SS_AGE_SEC = isSimulated ? 0: 120;
init( DD_TENANT_AWARENESS_ENABLED, false );
init( DD_TENANT_AWARENESS_ENABLED, false ); if(isSimulated) DD_TENANT_AWARENESS_ENABLED = deterministicRandom()->coinflip();
init( TENANT_CACHE_LIST_REFRESH_INTERVAL, 2 ); if( randomize && BUGGIFY ) TENANT_CACHE_LIST_REFRESH_INTERVAL = deterministicRandom()->randomInt(1, 10);
init( TENANT_CACHE_STORAGE_USAGE_REFRESH_INTERVAL, 2 ); if( randomize && BUGGIFY ) TENANT_CACHE_STORAGE_USAGE_REFRESH_INTERVAL = deterministicRandom()->randomInt(1, 10);
init( TENANT_CACHE_STORAGE_QUOTA_REFRESH_INTERVAL, 10 ); if( randomize && BUGGIFY ) TENANT_CACHE_STORAGE_QUOTA_REFRESH_INTERVAL = deterministicRandom()->randomInt(1, 10);
@@ -407,6 +408,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( ROCKSDB_HISTOGRAMS_SAMPLE_RATE, 0.001 ); if( randomize && BUGGIFY ) ROCKSDB_HISTOGRAMS_SAMPLE_RATE = 0;
init( ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME, 30.0 ); if( randomize && BUGGIFY ) ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME = 0.1;
init( ROCKSDB_READ_RANGE_REUSE_ITERATORS, true ); if( randomize && BUGGIFY ) ROCKSDB_READ_RANGE_REUSE_ITERATORS = deterministicRandom()->coinflip() ? true : false;
init( ROCKSDB_READ_RANGE_REUSE_BOUNDED_ITERATORS, false ); if( randomize && BUGGIFY ) ROCKSDB_READ_RANGE_REUSE_BOUNDED_ITERATORS = deterministicRandom()->coinflip() ? true : false;
init( ROCKSDB_READ_RANGE_BOUNDED_ITERATORS_MAX_LIMIT, 200 );
// Set to 0 to disable rocksdb write rate limiting. Rate limiter unit: bytes per second.
init( ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC, 0 );
// If true, enables dynamic adjustment of ROCKSDB_WRITE_RATE_LIMITER_BYTES according to the recent demand of background IO.
@@ -958,7 +961,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( KMS_CONNECTOR_TYPE, "RESTKmsConnector" );
// Blob granules
init( BG_URL, isSimulated ? "file://fdbblob/" : "" ); // TODO: store in system key space or something, eventually
init( BG_URL, isSimulated ? "file://simfdb/fdbblob/" : "" ); // TODO: store in system key space or something, eventually
bool buggifyMediumGranules = simulationMediumShards || (randomize && BUGGIFY);
// BlobGranuleVerify* simulation tests use "knobs", BlobGranuleCorrectness* use "tenant", default in real clusters is "knobs"
init( BG_METADATA_SOURCE, "knobs" );
@@ -1002,6 +1005,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( BLOB_MANIFEST_BACKUP, false );
init( BLOB_MANIFEST_BACKUP_INTERVAL, isSimulated ? 5.0 : 30.0 );
init( BLOB_FULL_RESTORE_MODE, false );
init( BLOB_MIGRATOR_CHECK_INTERVAL, isSimulated ? 1.0 : 5.0);
init( BGCC_TIMEOUT, isSimulated ? 10.0 : 120.0 );
init( BGCC_MIN_INTERVAL, isSimulated ? 1.0 : 10.0 );

View File

@@ -743,10 +743,10 @@ void ThreadSafeApi::runNetwork() {
Optional<Error> runErr;
try {
::runNetwork();
} catch (Error& e) {
} catch (const Error& e) {
TraceEvent(SevError, "RunNetworkError").error(e);
runErr = e;
} catch (std::exception& e) {
} catch (const std::exception& e) {
runErr = unknown_error();
TraceEvent(SevError, "RunNetworkError").error(unknown_error()).detail("RootException", e.what());
} catch (...) {
@@ -757,9 +757,9 @@ void ThreadSafeApi::runNetwork() {
for (auto& hook : threadCompletionHooks) {
try {
hook.first(hook.second);
} catch (Error& e) {
} catch (const Error& e) {
TraceEvent(SevError, "NetworkShutdownHookError").error(e);
} catch (std::exception& e) {
} catch (const std::exception& e) {
TraceEvent(SevError, "NetworkShutdownHookError").error(unknown_error()).detail("RootException", e.what());
} catch (...) {
TraceEvent(SevError, "NetworkShutdownHookError").error(unknown_error());
@@ -767,12 +767,10 @@ void ThreadSafeApi::runNetwork() {
}
if (runErr.present()) {
closeTraceFile();
throw runErr.get();
}
TraceEvent("RunNetworkTerminating");
closeTraceFile();
}
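
An aside on the two runNetwork variants above: both must call closeTraceFile() on the normal path and on every error path. A minimal RAII sketch (standard C++; the two stub functions merely stand in for ::runNetwork() and closeTraceFile()) shows how a single call site could cover both paths:

#include <cstdio>
#include <stdexcept>

void runNetworkStub() { throw std::runtime_error("network failed"); } // stand-in for ::runNetwork()
void closeTraceFileStub() { std::puts("trace file closed"); }         // stand-in for closeTraceFile()

struct TraceFileCloser {
    ~TraceFileCloser() { closeTraceFileStub(); } // runs on return and on throw
};

int main() {
    try {
        TraceFileCloser guard;
        runNetworkStub();
    } catch (const std::exception& e) {
        std::printf("ERROR: %s\n", e.what()); // the trace file is already closed here
    }
}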
void ThreadSafeApi::stopNetwork() {

View File

@@ -103,6 +103,7 @@ public:
Counter latestCipherKeyCacheNeedsRefresh;
LatencySample getCipherKeysLatency;
LatencySample getLatestCipherKeysLatency;
LatencySample getBlobMetadataLatency;
std::array<CounterSet, int(UsageType::MAX)> counterSets;
};

View File

@@ -235,6 +235,7 @@ public:
int BLOBSTORE_CONCURRENT_LISTS;
int BLOBSTORE_CONCURRENT_WRITES_PER_FILE;
int BLOBSTORE_CONCURRENT_READS_PER_FILE;
int BLOBSTORE_ENABLE_READ_CACHE;
int BLOBSTORE_READ_BLOCK_SIZE;
int BLOBSTORE_READ_AHEAD_BLOCKS;
int BLOBSTORE_READ_CACHE_BLOCKS_PER_FILE;

View File

@@ -546,36 +546,37 @@ struct hash<KeyRange> {
enum { invalidVersion = -1, latestVersion = -2, MAX_VERSION = std::numeric_limits<int64_t>::max() };
inline Key keyAfter(const KeyRef& key) {
if (key == "\xff\xff"_sr)
return key;
Standalone<StringRef> r;
uint8_t* s = new (r.arena()) uint8_t[key.size() + 1];
if (key.size() > 0) {
memcpy(s, key.begin(), key.size());
}
s[key.size()] = 0;
((StringRef&)r) = StringRef(s, key.size() + 1);
return r;
}
inline KeyRef keyAfter(const KeyRef& key, Arena& arena) {
if (key == "\xff\xff"_sr)
return key;
// Don't include fdbclient/SystemData.h for the allKeys symbol to avoid a cyclic include
static const auto allKeysEnd = "\xff\xff"_sr;
if (key == allKeysEnd) {
return allKeysEnd;
}
uint8_t* t = new (arena) uint8_t[key.size() + 1];
if (!key.empty()) {
memcpy(t, key.begin(), key.size());
}
t[key.size()] = 0;
return KeyRef(t, key.size() + 1);
}
inline KeyRange singleKeyRange(const KeyRef& a) {
return KeyRangeRef(a, keyAfter(a));
inline Key keyAfter(const KeyRef& key) {
Key result;
result.contents() = keyAfter(key, result.arena());
return result;
}
inline KeyRangeRef singleKeyRange(KeyRef const& key, Arena& arena) {
uint8_t* t = new (arena) uint8_t[key.size() + 1];
if (!key.empty()) {
memcpy(t, key.begin(), key.size());
}
t[key.size()] = 0;
return KeyRangeRef(KeyRef(t, key.size()), KeyRef(t, key.size() + 1));
}
inline KeyRange singleKeyRange(const KeyRef& a) {
KeyRange result;
result.contents() = singleKeyRange(a, result.arena());
return result;
}
inline KeyRange prefixRange(KeyRef prefix) {
Standalone<KeyRangeRef> range;
KeyRef start = KeyRef(range.arena(), prefix);
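
As an aside, the keyAfter/singleKeyRange helpers above compute the immediate lexicographic successor of a key by appending a single zero byte. A standalone sketch of that property, with std::string standing in for KeyRef:

#include <cassert>
#include <string>

std::string keyAfterDemo(const std::string& key) {
    return key + '\x00'; // the smallest key strictly greater than `key`
}

int main() {
    std::string k = "abc";
    assert(k < keyAfterDemo(k));         // strictly greater than the input
    assert(keyAfterDemo(k) < "abd");     // yet below the next natural key
    assert(keyAfterDemo(k).size() == 4); // "abc" plus one zero byte
}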
@@ -1494,7 +1495,7 @@ struct EncryptionAtRestMode {
bool operator==(const EncryptionAtRestMode& e) const { return isEquals(e); }
bool operator!=(const EncryptionAtRestMode& e) const { return !isEquals(e); }
static EncryptionAtRestMode fromValue(Optional<ValueRef> val) {
static EncryptionAtRestMode fromValueRef(Optional<ValueRef> val) {
if (!val.present()) {
return DISABLED;
}
@@ -1508,6 +1509,14 @@ struct EncryptionAtRestMode {
return static_cast<Mode>(num);
}
static EncryptionAtRestMode fromValue(Optional<Value> val) {
if (!val.present()) {
return EncryptionAtRestMode();
}
return EncryptionAtRestMode::fromValueRef(Optional<ValueRef>(val.get().contents()));
}
uint32_t mode;
};

View File

@@ -58,8 +58,8 @@ public:
requests_per_second, list_requests_per_second, write_requests_per_second, read_requests_per_second,
delete_requests_per_second, multipart_max_part_size, multipart_min_part_size, concurrent_requests,
concurrent_uploads, concurrent_lists, concurrent_reads_per_file, concurrent_writes_per_file,
read_block_size, read_ahead_blocks, read_cache_blocks_per_file, max_send_bytes_per_second,
max_recv_bytes_per_second, sdk_auth;
enable_read_cache, read_block_size, read_ahead_blocks, read_cache_blocks_per_file,
max_send_bytes_per_second, max_recv_bytes_per_second, sdk_auth;
bool set(StringRef name, int value);
std::string getURLParameters() const;
static std::vector<std::string> getKnobDescriptions() {
@@ -86,6 +86,7 @@ public:
"concurrent_lists (or cl) Max concurrent list operations that can be in progress at once.",
"concurrent_reads_per_file (or crps) Max concurrent reads in progress for any one file.",
"concurrent_writes_per_file (or cwps) Max concurrent uploads in progress for any one file.",
"enable_read_cache (or erc) Whether read block caching is enabled.",
"read_block_size (or rbs) Block size in bytes to be used for reads.",
"read_ahead_blocks (or rab) Number of blocks to read ahead of requested offset.",
"read_cache_blocks_per_file (or rcb) Size of the read cache for a file in blocks.",

View File

@@ -110,6 +110,7 @@ public:
double BLOCKING_PEEK_TIMEOUT;
bool PEEK_BATCHING_EMPTY_MSG;
double PEEK_BATCHING_EMPTY_MSG_INTERVAL;
double POP_FROM_LOG_DELAY;
// Data distribution queue
double HEALTH_POLL_TIME;
@@ -334,6 +335,8 @@ public:
double ROCKSDB_HISTOGRAMS_SAMPLE_RATE;
double ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME;
bool ROCKSDB_READ_RANGE_REUSE_ITERATORS;
bool ROCKSDB_READ_RANGE_REUSE_BOUNDED_ITERATORS;
int ROCKSDB_READ_RANGE_BOUNDED_ITERATORS_MAX_LIMIT;
int64_t ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC;
bool ROCKSDB_WRITE_RATE_LIMITER_AUTO_TUNE;
std::string DEFAULT_FDB_ROCKSDB_COLUMN_FAMILY;
@@ -982,6 +985,7 @@ public:
bool BLOB_MANIFEST_BACKUP;
double BLOB_MANIFEST_BACKUP_INTERVAL;
bool BLOB_FULL_RESTORE_MODE;
double BLOB_MIGRATOR_CHECK_INTERVAL;
// Blob metadata
int64_t BLOB_METADATA_CACHE_TTL;

View File

@@ -734,6 +734,7 @@ public:
// If cancelled, request was or will be delivered zero or more times.
template <class X>
Future<REPLY_TYPE(X)> getReply(const X& value) const {
// Ensure the same request isn't used multiple times
ASSERT(!getReplyPromise(value).getFuture().isReady());
if (queue->isRemoteEndpoint()) {
return sendCanceler(getReplyPromise(value),

View File

@@ -477,6 +477,7 @@ public:
Optional<Standalone<StringRef>> primaryDcId;
Reference<IReplicationPolicy> remoteTLogPolicy;
int32_t usableRegions;
bool quiesced = false;
std::string disablePrimary;
std::string disableRemote;
std::string originalRegions;

View File

@@ -1410,6 +1410,7 @@ public:
for (auto processInfo : getAllProcesses()) {
if (currentDcId != processInfo->locality.dcId() || // skip other dc
processInfo->startingClass != ProcessClass::BlobWorkerClass || // skip non blob workers
processInfo->failed || // if process was killed but has not yet been removed from the process list
processInfo->locality.machineId() == machineId) { // skip current machine
continue;
}

View File

@@ -462,7 +462,7 @@ ACTOR Future<Void> loadBlobMetadataForTenants(
}
// FIXME: if one tenant gets an error, don't kill whole process
// TODO: add latency metrics
state double startTime = now();
loop {
Future<EKPGetLatestBlobMetadataReply> requestFuture;
if (self->dbInfo.isValid() && self->dbInfo->get().encryptKeyProxy.present()) {
@@ -485,6 +485,8 @@ ACTOR Future<Void> loadBlobMetadataForTenants(
ASSERT(dataEntry.begin() == info->second.prefix);
dataEntry.cvalue()->updateBStore(metadata);
}
double elapsed = now() - startTime;
BlobCipherMetrics::getInstance()->getBlobMetadataLatency.addMeasurement(elapsed);
return Void();
}
when(wait(self->dbInfo->onChange())) {}
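
As an aside, the start/elapsed measurement pattern above in standard C++, with steady_clock standing in for flow's now() and recordLatency as a hypothetical sink playing the role of addMeasurement:

#include <chrono>
#include <cstdio>
#include <thread>

void recordLatency(double seconds) {
    std::printf("latency: %.6f s\n", seconds);
}

int main() {
    auto start = std::chrono::steady_clock::now();
    std::this_thread::sleep_for(std::chrono::milliseconds(5)); // the operation being timed
    std::chrono::duration<double> elapsed = std::chrono::steady_clock::now() - start;
    recordLatency(elapsed.count());
}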

View File

@@ -4238,7 +4238,13 @@ ACTOR Future<Void> fullyDeleteGranule(Reference<BlobManagerData> self,
Version purgeVersion,
KeyRange granuleRange,
Optional<UID> mergeChildID,
bool force) {
bool force,
Future<Void> parentFuture) {
// wait for parent to finish first to avoid ordering/orphaning issues
wait(parentFuture);
// yield to avoid a long callstack and to allow this to get cancelled
wait(delay(0));
if (BM_PURGE_DEBUG) {
fmt::print("BM {0} Fully deleting granule [{1} - {2}): {3} @ {4}{5}\n",
self->epoch,
@@ -4296,6 +4302,11 @@ ACTOR Future<Void> fullyDeleteGranule(Reference<BlobManagerData> self,
// deleting files before corresponding metadata reduces the # of orphaned files.
wait(waitForAll(deletions));
if (BUGGIFY && self->maybeInjectTargetedRestart()) {
wait(delay(0)); // should be cancelled
ASSERT(false);
}
// delete metadata in FDB (history entry and file keys)
if (BM_PURGE_DEBUG) {
fmt::print(
@@ -4331,6 +4342,11 @@ ACTOR Future<Void> fullyDeleteGranule(Reference<BlobManagerData> self,
}
}
if (BUGGIFY && self->maybeInjectTargetedRestart()) {
wait(delay(0)); // should be cancelled
ASSERT(false);
}
if (BM_PURGE_DEBUG) {
fmt::print("BM {0} Fully deleting granule {1}: success {2}\n",
self->epoch,
@@ -4501,7 +4517,7 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
state std::queue<std::tuple<KeyRange, Version, Version, Optional<UID>>> historyEntryQueue;
// stacks of <granuleId, historyKey> and <granuleId> (and mergeChildID) to track which granules to delete
state std::vector<std::tuple<UID, Key, KeyRange, Optional<UID>>> toFullyDelete;
state std::vector<std::tuple<UID, Key, KeyRange, Optional<UID>, Version>> toFullyDelete;
state std::vector<std::pair<UID, KeyRange>> toPartiallyDelete;
// track which granules we have already added to traversal
@@ -4737,7 +4753,7 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
fmt::print(
"BM {0} Granule {1} will be FULLY deleted\n", self->epoch, currHistoryNode.granuleID.toString());
}
toFullyDelete.push_back({ currHistoryNode.granuleID, historyKey, currRange, mergeChildID });
toFullyDelete.push_back({ currHistoryNode.granuleID, historyKey, currRange, mergeChildID, startVersion });
} else if (startVersion < purgeVersion) {
if (BM_PURGE_DEBUG) {
fmt::print("BM {0} Granule {1} will be partially deleted\n",
@@ -4810,36 +4826,65 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
.detail("DeletingFullyCount", toFullyDelete.size())
.detail("DeletingPartiallyCount", toPartiallyDelete.size());
state std::vector<Future<Void>> partialDeletions;
state int i;
if (BM_PURGE_DEBUG) {
fmt::print("BM {0}: {1} granules to fully delete\n", self->epoch, toFullyDelete.size());
}
// Go backwards through set of granules to guarantee deleting oldest first. This avoids orphaning granules in the
// deletion process
// FIXME: could track explicit parent dependencies and parallelize so long as a parent and child aren't running in
// parallel, but that's non-trivial
for (i = toFullyDelete.size() - 1; i >= 0; --i) {
if (!toFullyDelete.empty()) {
state std::vector<Future<Void>> fullDeletions;
KeyRangeMap<std::pair<Version, Future<Void>>> parentDelete;
parentDelete.insert(normalKeys, { 0, Future<Void>(Void()) });
std::vector<std::pair<Version, int>> deleteOrder;
deleteOrder.reserve(toFullyDelete.size());
for (int i = 0; i < toFullyDelete.size(); i++) {
deleteOrder.push_back({ std::get<4>(toFullyDelete[i]), i });
}
std::sort(deleteOrder.begin(), deleteOrder.end());
for (i = 0; i < deleteOrder.size(); i++) {
state UID granuleId;
Key historyKey;
KeyRange keyRange;
Optional<UID> mergeChildId;
std::tie(granuleId, historyKey, keyRange, mergeChildId) = toFullyDelete[i];
Version startVersion;
std::tie(granuleId, historyKey, keyRange, mergeChildId, startVersion) =
toFullyDelete[deleteOrder[i].second];
// FIXME: consider batching into a single txn (need to take care of txn size limit)
if (BM_PURGE_DEBUG) {
fmt::print("BM {0}: About to fully delete granule {1}\n", self->epoch, granuleId.toString());
}
wait(fullyDeleteGranule(self, granuleId, historyKey, purgeVersion, keyRange, mergeChildId, force));
if (BUGGIFY && self->maybeInjectTargetedRestart()) {
wait(delay(0)); // should be cancelled
ASSERT(false);
std::vector<Future<Void>> parents;
auto parentRanges = parentDelete.intersectingRanges(keyRange);
for (auto& it : parentRanges) {
if (startVersion <= it.cvalue().first) {
fmt::print("ERROR: [{0} - {1}) @ {2} <= [{3} - {4}) @ {5}\n",
keyRange.begin.printable(),
keyRange.end.printable(),
startVersion,
it.begin().printable(),
it.end().printable(),
it.cvalue().first);
}
ASSERT(startVersion > it.cvalue().first);
parents.push_back(it.cvalue().second);
}
Future<Void> deleteFuture = fullyDeleteGranule(
self, granuleId, historyKey, purgeVersion, keyRange, mergeChildId, force, waitForAll(parents));
fullDeletions.push_back(deleteFuture);
parentDelete.insert(keyRange, { startVersion, deleteFuture });
}
wait(waitForAll(fullDeletions));
}
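
As an aside, a standalone sketch of the ordering scheme implemented above, with std::async and integer intervals standing in for Flow futures and key ranges (all names hypothetical): deletions are sorted oldest-first by start version, and each one waits on every already-scheduled deletion whose range overlaps its own, so a child granule is never deleted before its parents.

#include <algorithm>
#include <cstddef>
#include <cstdio>
#include <future>
#include <vector>

struct Deletion {
    int begin, end;   // key range as integers
    int startVersion; // parents have smaller start versions
};

int main() {
    std::vector<Deletion> work = { { 0, 10, 30 }, { 0, 5, 10 }, { 5, 10, 20 } };
    // Oldest first, mirroring deleteOrder above.
    std::sort(work.begin(), work.end(), [](const Deletion& a, const Deletion& b) {
        return a.startVersion < b.startVersion;
    });
    std::vector<std::shared_future<void>> done;
    for (std::size_t i = 0; i < work.size(); i++) {
        // Earlier (older) deletions overlapping this range are our parents.
        std::vector<std::shared_future<void>> parents;
        for (std::size_t j = 0; j < i; j++) {
            if (work[i].begin < work[j].end && work[j].begin < work[i].end) {
                parents.push_back(done[j]);
            }
        }
        Deletion d = work[i];
        done.push_back(std::async(std::launch::async, [d, parents] {
                           for (const auto& p : parents) {
                               p.wait(); // parent finishes before child starts
                           }
                           std::printf("deleting [%d, %d) @ %d\n", d.begin, d.end, d.startVersion);
                       }).share());
    }
    for (auto& f : done) {
        f.wait();
    }
}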
if (BM_PURGE_DEBUG) {
fmt::print("BM {0}: {1} granules to partially delete\n", self->epoch, toPartiallyDelete.size());
}
state std::vector<Future<Void>> partialDeletions;
for (i = toPartiallyDelete.size() - 1; i >= 0; --i) {
UID granuleId;
KeyRange keyRange;
@@ -4852,6 +4897,11 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
wait(waitForAll(partialDeletions));
if (BUGGIFY && self->maybeInjectTargetedRestart()) {
wait(delay(0)); // should be cancelled
ASSERT(false);
}
if (force) {
tr.reset();
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
@@ -4877,6 +4927,11 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
}
}
if (BUGGIFY && self->maybeInjectTargetedRestart()) {
wait(delay(0)); // should be cancelled
ASSERT(false);
}
// Now that all the necessary granules and their files have been deleted, we can
// clear the purgeIntent key to signify that the work is done. However, there could have been
// another purgeIntent that got written for this table while we were processing this one.

View File

@@ -18,8 +18,6 @@
* limitations under the License.
*/
#include "fdbserver/BlobMigratorInterface.h"
#include "fdbserver/Knobs.h"
#include "flow/ActorCollection.h"
#include "flow/FastRef.h"
#include "flow/IRandom.h"
@@ -35,6 +33,8 @@
#include "fdbserver/WaitFailure.h"
#include "fdbserver/MoveKeys.actor.h"
#include "fdbserver/BlobGranuleServerCommon.actor.h"
#include "fdbserver/BlobMigratorInterface.h"
#include "fdbserver/Knobs.h"
#include "flow/actorcompiler.h" // has to be last include
#include "flow/network.h"
#include <algorithm>
@@ -72,7 +72,7 @@ public:
self->blobGranules_ = granules;
wait(prepare(self, normalKeys));
wait(advanceVersion(self));
wait(serverLoop(self));
return Void();
}
@@ -148,9 +148,78 @@ private:
}
}
// Print migration progress periodically
ACTOR static Future<Void> logProgress(Reference<BlobMigrator> self) {
loop {
bool done = wait(checkProgress(self));
if (done)
return Void();
wait(delay(SERVER_KNOBS->BLOB_MIGRATOR_CHECK_INTERVAL));
}
}
// Check key ranges that are migrated. Return true if all ranges are done
ACTOR static Future<bool> checkProgress(Reference<BlobMigrator> self) {
state Transaction tr(self->db_);
loop {
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
try {
// Get key ranges that are still owned by the migrator. Those ranges are
// incomplete migrations
state UID serverID = self->interf_.ssi.id();
RangeResult ranges = wait(krmGetRanges(&tr, serverKeysPrefixFor(serverID), normalKeys));
// Count the size of incomplete ranges
int64_t incompleted = 0;
for (auto i = 0; i < ranges.size() - 1; ++i) {
if (ranges[i].value == serverKeysTrue) {
KeyRangeRef range(ranges[i].key, ranges[i + 1].key);
int64_t bytes = sizeInBytes(self, range);
dprint(" incompleted {}, size: {}\n", range.toString(), bytes);
incompleted += bytes;
}
}
// Calculate progress
int64_t total = sizeInBytes(self);
int progress = (total - incompleted) * 100 / total;
bool done = incompleted == 0;
dprint("Progress {} :{}%. done {}\n", serverID.toString(), progress, done);
return done;
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
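
A worked instance of the progress formula above: with a total restored size of 200 bytes and 50 bytes of ranges still owned by the migrator, progress = (200 - 50) * 100 / 200 = 75%.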
// Advance version, so that future commits will have a larger version than the restored data
ACTOR static Future<Void> advanceVersion(Reference<BlobMigrator> self) {
state Transaction tr(self->db_);
loop {
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
try {
Version currentVersion = wait(tr.getRawReadVersion());
Version expectedVersion = maxVersion(self);
if (currentVersion <= expectedVersion) {
tr.set(minRequiredCommitVersionKey, BinaryWriter::toValue(expectedVersion + 1, Unversioned()));
dprint("Advance version from {} to {}\n", currentVersion, expectedVersion);
wait(tr.commit());
}
return Void();
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
// Main server loop
ACTOR static Future<Void> serverLoop(Reference<BlobMigrator> self) {
self->actors_.add(waitFailureServer(self->interf_.ssi.waitFailure.getFuture()));
self->actors_.add(logProgress(self));
self->actors_.add(handleRequest(self));
self->actors_.add(handleUnsupportedRequest(self));
loop {

View File

@@ -4470,9 +4470,10 @@ ACTOR Future<Void> handleRangeAssign(Reference<BlobWorkerData> bwData,
return Void();
} catch (Error& e) {
if (e.code() == error_code_operation_cancelled) {
if (!bwData->shuttingDown) {
if (!bwData->shuttingDown && !isSelfReassign) {
// the cancellation was because the granule open was cancelled, not because the whole blob
// worker was.
ASSERT(!req.reply.isSet());
req.reply.sendError(granule_assignment_conflict());
}
throw e;

View File

@@ -25,6 +25,7 @@
#include <set>
#include <vector>
#include "fdbclient/FDBTypes.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbrpc/FailureMonitor.h"
@@ -32,6 +33,7 @@
#include "fdbserver/BlobGranuleServerCommon.actor.h"
#include "fdbserver/BlobMigratorInterface.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "flow/ActorCollection.h"
#include "fdbclient/ClusterConnectionMemoryRecord.h"
#include "fdbclient/NativeAPI.actor.h"
@@ -66,6 +68,7 @@
#include "fdbrpc/ReplicationUtils.h"
#include "fdbrpc/sim_validation.h"
#include "fdbclient/KeyBackedTypes.h"
#include "flow/Error.h"
#include "flow/Trace.h"
#include "flow/Util.h"
#include "flow/actorcompiler.h" // This must be the last #include.
@@ -389,7 +392,7 @@ ACTOR Future<Void> clusterWatchDatabase(ClusterControllerData* cluster,
wait(delay(0.0));
recoveryCore.cancel();
wait(cleanupRecoveryActorCollection(recoveryData, true /* exThrown */));
wait(cleanupRecoveryActorCollection(recoveryData, /*exThrown=*/true));
ASSERT(addActor.isEmpty());
CODE_PROBE(err.code() == error_code_tlog_failed, "Terminated due to tLog failure");
@@ -3025,6 +3028,18 @@ ACTOR Future<Void> updateClusterId(ClusterControllerData* self) {
}
}
ACTOR Future<Void> handleGetEncryptionAtRestMode(ClusterControllerData* self, ClusterControllerFullInterface ccInterf) {
loop {
state GetEncryptionAtRestModeRequest req = waitNext(ccInterf.getEncryptionAtRestMode.getFuture());
TraceEvent("HandleGetEncryptionAtRestModeStart").detail("TlogId", req.tlogId);
EncryptionAtRestMode mode = wait(self->encryptionAtRestMode.getFuture());
GetEncryptionAtRestModeResponse resp;
resp.mode = mode;
req.reply.send(resp);
TraceEvent("HandleGetEncryptionAtRestModeEnd").detail("TlogId", req.tlogId).detail("Mode", resp.mode);
}
}
ACTOR Future<Void> clusterControllerCore(ClusterControllerFullInterface interf,
Future<Void> leaderFail,
ServerCoordinators coordinators,
@@ -3070,6 +3085,7 @@ ACTOR Future<Void> clusterControllerCore(ClusterControllerFullInterface interf,
self.addActor.send(metaclusterMetricsUpdater(&self));
self.addActor.send(dbInfoUpdater(&self));
self.addActor.send(updateClusterId(&self));
self.addActor.send(handleGetEncryptionAtRestMode(&self, interf));
self.addActor.send(self.clusterControllerMetrics.traceCounters("ClusterControllerMetrics",
self.id,
SERVER_KNOBS->STORAGE_LOGGING_DELAY,
@@ -3090,8 +3106,8 @@ ACTOR Future<Void> clusterControllerCore(ClusterControllerFullInterface interf,
endRole(Role::CLUSTER_CONTROLLER, interf.id(), "Stop Received Signal", true);
}
// We shut down normally even if there was a serious error (so this fdbserver may be re-elected cluster
// controller)
// We shut down normally even if there was a serious error (so this fdbserver may be re-elected
// cluster controller)
return Void();
}
when(OpenDatabaseRequest req = waitNext(interf.clientInterface.openDatabase.getFuture())) {
@@ -3243,11 +3259,11 @@ ACTOR Future<Void> clusterController(Reference<IClusterConnectionRecord> connRec
Reference<AsyncVar<Optional<UID>>> clusterId) {
// Defer this wait optimization if cluster configuration has 'Encryption data at-rest' enabled.
// Encryption depends on availability of the EncryptKeyProxy (EKP) FDB role to enable fetch/refresh of encryption keys
// created and managed by external KeyManagementService (KMS).
// Encryption depends on availability of the EncryptKeyProxy (EKP) FDB role to enable fetch/refresh of
// encryption keys created and managed by external KeyManagementService (KMS).
//
// TODO: Wait optimization is to ensure the worker server on the same process gets registered with the new CC before
// recruitment. Unify the codepath for both Encryption enable vs disable scenarios.
// TODO: Wait optimization is to ensure the worker server on the same process gets registered with the
// new CC before recruitment. Unify the codepath for both Encryption enable vs disable scenarios.
if (!SERVER_KNOBS->ENABLE_ENCRYPTION) {
wait(recoveredDiskFiles);
@@ -3278,8 +3294,8 @@ ACTOR Future<Void> clusterController(Reference<IClusterConnectionRecord> connRec
namespace {
// Tests `ClusterControllerData::updateWorkerHealth()` can update `ClusterControllerData::workerHealth` based on
// `UpdateWorkerHealth` request correctly.
// Tests `ClusterControllerData::updateWorkerHealth()` can update `ClusterControllerData::workerHealth`
// based on `UpdateWorkerHealth` request correctly.
TEST_CASE("/fdbserver/clustercontroller/updateWorkerHealth") {
// Create a testing ClusterControllerData. Most of the internal states do not matter in this test.
state ClusterControllerData data(ClusterControllerFullInterface(),
@@ -3292,8 +3308,8 @@ TEST_CASE("/fdbserver/clustercontroller/updateWorkerHealth") {
state NetworkAddress badPeer2(IPAddress(0x03030303), 1);
state NetworkAddress badPeer3(IPAddress(0x04040404), 1);
// Create a `UpdateWorkerHealthRequest` with two bad peers, and they should appear in the `workerAddress`'s
// degradedPeers.
// Create a `UpdateWorkerHealthRequest` with two bad peers, and they should appear in the
// `workerAddress`'s degradedPeers.
{
UpdateWorkerHealthRequest req;
req.address = workerAddress;
@@ -3354,8 +3370,8 @@ TEST_CASE("/fdbserver/clustercontroller/updateWorkerHealth") {
previousRefreshTime = health.degradedPeers[badPeer3].lastRefreshTime;
}
// Create a `UpdateWorkerHealthRequest` with empty `degradedPeers`, which should not remove the worker from
// `workerHealth`.
// Create a `UpdateWorkerHealthRequest` with empty `degradedPeers`, which should not remove the worker
// from `workerHealth`.
{
wait(delay(0.001));
UpdateWorkerHealthRequest req;
@@ -3439,8 +3455,8 @@ TEST_CASE("/fdbserver/clustercontroller/getDegradationInfo") {
NetworkAddress badPeer3(IPAddress(0x04040404), 1);
NetworkAddress badPeer4(IPAddress(0x05050505), 1);
// Test that a reported degraded link should stay for sometime before being considered as a degraded link by
// cluster controller.
// Test that a reported degraded link should stay for sometime before being considered as a degraded
// link by cluster controller.
{
data.workerHealth[worker].degradedPeers[badPeer1] = { now(), now() };
data.workerHealth[worker].disconnectedPeers[badPeer2] = { now(), now() };
@@ -3472,7 +3488,8 @@ TEST_CASE("/fdbserver/clustercontroller/getDegradationInfo") {
data.workerHealth.clear();
}
// Test that if both A complains about B and B complains about A, only one of the servers will be chosen as degraded.
// Test that if both A complains about B and B complains about A, only one of the servers will be
// chosen as degraded.
{
data.workerHealth[worker].degradedPeers[badPeer1] = { now() - SERVER_KNOBS->CC_MIN_DEGRADATION_INTERVAL - 1,
now() };
@@ -3553,8 +3570,8 @@ TEST_CASE("/fdbserver/clustercontroller/getDegradationInfo") {
data.workerHealth.clear();
}
// Test that if the degradation is reported both ways between A and other 4 servers, no degraded server is
// returned.
// Test that if the degradation is reported both ways between A and other 4 servers, no degraded server
// is returned.
{
ASSERT(SERVER_KNOBS->CC_DEGRADED_PEER_DEGREE_TO_EXCLUDE < 4);
data.workerHealth[worker].degradedPeers[badPeer1] = { now() - SERVER_KNOBS->CC_MIN_DEGRADATION_INTERVAL - 1,

View File

@@ -18,12 +18,14 @@
* limitations under the License.
*/
#include "fdbclient/FDBTypes.h"
#include "fdbclient/Metacluster.h"
#include "fdbrpc/sim_validation.h"
#include "fdbserver/ApplyMetadataMutation.h"
#include "fdbserver/BackupProgress.actor.h"
#include "fdbserver/ClusterRecovery.actor.h"
#include "fdbserver/EncryptionOpsUtils.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/MasterInterface.h"
#include "fdbserver/WaitFailure.h"
@@ -429,18 +431,34 @@ ACTOR Future<Void> rejoinRequestHandler(Reference<ClusterRecoveryData> self) {
}
}
namespace {
EncryptionAtRestMode getEncryptionAtRest() {
// TODO: Use db-config encryption config to determine cluster encryption status
if (SERVER_KNOBS->ENABLE_ENCRYPTION) {
return EncryptionAtRestMode(EncryptionAtRestMode::Mode::AES_256_CTR);
} else {
return EncryptionAtRestMode();
}
}
} // namespace
// Keeps the coordinated state (cstate) updated as the set of recruited tlogs change through recovery.
ACTOR Future<Void> trackTlogRecovery(Reference<ClusterRecoveryData> self,
Reference<AsyncVar<Reference<ILogSystem>>> oldLogSystems,
Future<Void> minRecoveryDuration) {
state Future<Void> rejoinRequests = Never();
state DBRecoveryCount recoverCount = self->cstate.myDBState.recoveryCount + 1;
state EncryptionAtRestMode encryptionAtRestMode = getEncryptionAtRest();
state DatabaseConfiguration configuration =
self->configuration; // self-configuration can be changed by configurationMonitor so we need a copy
loop {
state DBCoreState newState;
self->logSystem->toCoreState(newState);
newState.recoveryCount = recoverCount;
// Update Coordinators EncryptionAtRest status during the very first recovery of the cluster (empty database)
newState.encryptionAtRestMode = encryptionAtRestMode;
state Future<Void> changed = self->logSystem->onCoreStateChanged();
ASSERT(newState.tLogs[0].tLogWriteAntiQuorum == configuration.tLogWriteAntiQuorum &&
@@ -454,6 +472,7 @@ ACTOR Future<Void> trackTlogRecovery(Reference<ClusterRecoveryData> self,
.detail("FinalUpdate", finalUpdate)
.detail("NewState.tlogs", newState.tLogs.size())
.detail("NewState.OldTLogs", newState.oldTLogData.size())
.detail("NewState.EncryptionAtRestMode", newState.encryptionAtRestMode.toString())
.detail("Expected.tlogs",
configuration.expectedLogSets(self->primaryDcId.size() ? self->primaryDcId[0] : Optional<Key>()));
wait(self->cstate.write(newState, finalUpdate));
@@ -934,7 +953,7 @@ ACTOR Future<std::vector<Standalone<CommitTransactionRef>>> recruitEverything(
.detail("Status", RecoveryStatus::names[status])
.trackLatest(self->clusterRecoveryStateEventHolder->trackingKey);
return Never();
} else
} else {
TraceEvent(getRecoveryEventName(ClusterRecoveryEventType::CLUSTER_RECOVERY_STATE_EVENT_NAME).c_str(),
self->dbgid)
.detail("StatusCode", RecoveryStatus::recruiting_transaction_servers)
@@ -945,6 +964,12 @@ ACTOR Future<std::vector<Standalone<CommitTransactionRef>>> recruitEverything(
.detail("RequiredResolvers", 1)
.trackLatest(self->clusterRecoveryStateEventHolder->trackingKey);
// The cluster's EncryptionAtRest status is now readable.
if (self->controllerData->encryptionAtRestMode.canBeSet()) {
self->controllerData->encryptionAtRestMode.send(getEncryptionAtRest());
}
}
// FIXME: we only need log routers for the same locality as the master
int maxLogRouters = self->cstate.prevDBState.logRouterTags;
for (auto& old : self->cstate.prevDBState.oldTLogData) {
@@ -1443,6 +1468,12 @@ ACTOR Future<Void> clusterRecoveryCore(Reference<ClusterRecoveryData> self) {
wait(self->cstate.read());
// Unless the cluster database is 'empty', the cluster's EncryptionAtRest status is readable once cstate is
// recovered
if (!self->cstate.myDBState.tLogs.empty() && self->controllerData->encryptionAtRestMode.canBeSet()) {
self->controllerData->encryptionAtRestMode.send(self->cstate.myDBState.encryptionAtRestMode);
}
if (self->cstate.prevDBState.lowestCompatibleProtocolVersion > currentProtocolVersion()) {
TraceEvent(SevWarnAlways, "IncompatibleProtocolVersion", self->dbgid).log();
throw internal_error();

View File

@@ -20,6 +20,7 @@
#include <algorithm>
#include <tuple>
#include <variant>
#include "fdbclient/Atomic.h"
#include "fdbclient/BackupAgent.actor.h"
@@ -68,6 +69,8 @@
#include "flow/actorcompiler.h" // This must be the last #include.
#include "flow/network.h"
using WriteMutationRefVar = std::variant<MutationRef, VectorRef<MutationRef>>;
ACTOR Future<Void> broadcastTxnRequest(TxnStateRequest req, int sendAmount, bool sendReply) {
state ReplyPromise<Void> reply = req.reply;
resetReply(req);
@@ -1256,16 +1259,78 @@ ACTOR Future<Void> applyMetadataToCommittedTransactions(CommitBatchContext* self
return Void();
}
ACTOR Future<MutationRef> writeMutation(CommitBatchContext* self,
ACTOR Future<WriteMutationRefVar> writeMutationEncryptedMutation(CommitBatchContext* self,
int64_t tenantId,
const MutationRef* mutation,
Optional<MutationRef>* encryptedMutationOpt,
Arena* arena) {
state MutationRef encryptedMutation = encryptedMutationOpt->get();
state const BlobCipherEncryptHeader* header;
static_assert(TenantInfo::INVALID_TENANT == INVALID_ENCRYPT_DOMAIN_ID);
ASSERT(self->pProxyCommitData->isEncryptionEnabled);
ASSERT(g_network && g_network->isSimulated());
ASSERT(encryptedMutation.isEncrypted());
Reference<AsyncVar<ServerDBInfo> const> dbInfo = self->pProxyCommitData->db;
header = encryptedMutation.encryptionHeader();
TextAndHeaderCipherKeys cipherKeys = wait(getEncryptCipherKeys(dbInfo, *header, BlobCipherMetrics::TLOG));
MutationRef decryptedMutation = encryptedMutation.decrypt(cipherKeys, *arena, BlobCipherMetrics::TLOG);
ASSERT(decryptedMutation.param1 == mutation->param1 && decryptedMutation.param2 == mutation->param2 &&
decryptedMutation.type == mutation->type);
CODE_PROBE(true, "encrypting non-metadata mutations");
self->toCommit.writeTypedMessage(encryptedMutation);
return encryptedMutation;
}
ACTOR Future<WriteMutationRefVar> writeMutationFetchEncryptKey(CommitBatchContext* self,
int64_t tenantId,
const MutationRef* mutation,
Arena* arena) {
state EncryptCipherDomainId domainId = tenantId;
state MutationRef encryptedMutation;
static_assert(TenantInfo::INVALID_TENANT == INVALID_ENCRYPT_DOMAIN_ID);
ASSERT(self->pProxyCommitData->isEncryptionEnabled);
ASSERT_NE((MutationRef::Type)mutation->type, MutationRef::Type::ClearRange);
std::pair<EncryptCipherDomainName, EncryptCipherDomainId> p =
getEncryptDetailsFromMutationRef(self->pProxyCommitData, *mutation);
domainId = p.second;
Reference<BlobCipherKey> cipherKey =
wait(getLatestEncryptCipherKey(self->pProxyCommitData->db, domainId, p.first, BlobCipherMetrics::TLOG));
self->cipherKeys[domainId] = cipherKey;
CODE_PROBE(true, "Raw access mutation encryption");
ASSERT_NE(domainId, INVALID_ENCRYPT_DOMAIN_ID);
encryptedMutation = mutation->encrypt(self->cipherKeys, domainId, *arena, BlobCipherMetrics::TLOG);
self->toCommit.writeTypedMessage(encryptedMutation);
return encryptedMutation;
}
Future<WriteMutationRefVar> writeMutation(CommitBatchContext* self,
int64_t tenantId,
const MutationRef* mutation,
Optional<MutationRef>* encryptedMutationOpt,
Arena* arena) {
static_assert(TenantInfo::INVALID_TENANT == INVALID_ENCRYPT_DOMAIN_ID);
// WriteMutation routine is responsible for appending mutations to be persisted in TLog, the operation
// isn't a 'blocking' operation, except for few cases when Encryption is supported by the cluster such
// as:
// 1. Fetch encryption keys to encrypt the mutation.
// 2. Split ClearRange mutation to respect Encryption domain boundaries.
// 3. Ensure sanity of already encrypted mutation - simulation limited check.
//
// The approach optimizes the "fast" path by avoiding the alloc/dealloc overhead of the ACTOR framework;
// the penalty happens only if any of the above conditions are met, in which case the corresponding
// ACTOR-compliant handler routine gets invoked (the "slow path").
if (self->pProxyCommitData->isEncryptionEnabled) {
state EncryptCipherDomainId domainId = tenantId;
state MutationRef encryptedMutation;
EncryptCipherDomainId domainId = tenantId;
MutationRef encryptedMutation;
CODE_PROBE(self->pProxyCommitData->db->get().client.tenantMode == TenantMode::DISABLED,
"using disabled tenant mode");
CODE_PROBE(self->pProxyCommitData->db->get().client.tenantMode == TenantMode::OPTIONAL_TENANT,
@@ -1279,13 +1344,7 @@ ACTOR Future<MutationRef> writeMutation(CommitBatchContext* self,
ASSERT(encryptedMutation.isEncrypted());
// During simulation check whether the encrypted mutation matches the decrypted mutation
if (g_network && g_network->isSimulated()) {
Reference<AsyncVar<ServerDBInfo> const> dbInfo = self->pProxyCommitData->db;
state const BlobCipherEncryptHeader* header = encryptedMutation.encryptionHeader();
TextAndHeaderCipherKeys cipherKeys =
wait(getEncryptCipherKeys(dbInfo, *header, BlobCipherMetrics::TLOG));
MutationRef decryptedMutation = encryptedMutation.decrypt(cipherKeys, *arena, BlobCipherMetrics::TLOG);
ASSERT(decryptedMutation.param1 == mutation->param1 && decryptedMutation.param2 == mutation->param2 &&
decryptedMutation.type == mutation->type);
return writeMutationEncryptedMutation(self, tenantId, mutation, encryptedMutationOpt, arena);
}
} else {
if (domainId == INVALID_ENCRYPT_DOMAIN_ID) {
@@ -1294,9 +1353,7 @@ ACTOR Future<MutationRef> writeMutation(CommitBatchContext* self,
domainId = p.second;
if (self->cipherKeys.find(domainId) == self->cipherKeys.end()) {
Reference<BlobCipherKey> cipherKey = wait(getLatestEncryptCipherKey(
self->pProxyCommitData->db, domainId, p.first, BlobCipherMetrics::TLOG));
self->cipherKeys[domainId] = cipherKey;
return writeMutationFetchEncryptKey(self, tenantId, mutation, arena);
}
CODE_PROBE(true, "Raw access mutation encryption");
@@ -1308,10 +1365,10 @@ ACTOR Future<MutationRef> writeMutation(CommitBatchContext* self,
ASSERT(encryptedMutation.isEncrypted());
CODE_PROBE(true, "encrypting non-metadata mutations");
self->toCommit.writeTypedMessage(encryptedMutation);
return encryptedMutation;
return std::variant<MutationRef, VectorRef<MutationRef>>{ encryptedMutation };
} else {
self->toCommit.writeTypedMessage(*mutation);
return *mutation;
return std::variant<MutationRef, VectorRef<MutationRef>>{ *mutation };
}
}
@@ -1399,8 +1456,10 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
if (encryptedMutation.present()) {
ASSERT(encryptedMutation.get().isEncrypted());
}
MutationRef tempMutation = wait(writeMutation(self, tenantId, &m, &encryptedMutation, &arena));
writtenMutation = tempMutation;
WriteMutationRefVar var = wait(writeMutation(self, tenantId, &m, &encryptedMutation, &arena));
// FIXME: Remove assert once ClearRange RAW_ACCESS usecase handling is done
ASSERT(std::holds_alternative<MutationRef>(var));
writtenMutation = std::get<MutationRef>(var);
} else if (m.type == MutationRef::ClearRange) {
KeyRangeRef clearRange(KeyRangeRef(m.param1, m.param2));
auto ranges = pProxyCommitData->keyInfo.intersectingRanges(clearRange);
@@ -1453,8 +1512,10 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
if (pProxyCommitData->needsCacheTag(clearRange)) {
self->toCommit.addTag(cacheTag);
}
MutationRef tempMutation = wait(writeMutation(self, tenantId, &m, &encryptedMutation, &arena));
writtenMutation = tempMutation;
WriteMutationRefVar var = wait(writeMutation(self, tenantId, &m, &encryptedMutation, &arena));
// FIXME: Remove assert once ClearRange RAW_ACCESS usecase handling is done
ASSERT(std::holds_alternative<MutationRef>(var));
writtenMutation = std::get<MutationRef>(var);
} else {
UNREACHABLE();
}
@@ -1505,8 +1566,8 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
MutationRef backupMutation(
MutationRef::Type::ClearRange, intersectionRange.begin, intersectionRange.end);
// TODO (Nim): Currently clear ranges are encrypted using the default encryption key, this must be
// changed to account for clear ranges which span tenant boundaries
// TODO (Nim): Currently clear ranges are encrypted using the default encryption key, this must
// be changed to account for clear ranges which span tenant boundaries
if (self->pProxyCommitData->isEncryptionEnabled) {
CODE_PROBE(true, "encrypting clear range backup mutation");
if (backupMutation.param1 == m.param1 && backupMutation.param2 == m.param2 &&
@@ -1627,9 +1688,8 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) {
CODE_PROBE(true, "encrypting idempotency mutation");
std::pair<EncryptCipherDomainName, EncryptCipherDomainId> p =
getEncryptDetailsFromMutationRef(self->pProxyCommitData, idempotencyIdSet);
Arena arena;
MutationRef encryptedMutation = idempotencyIdSet.encrypt(
self->cipherKeys, p.second, arena, BlobCipherMetrics::TLOG);
self->cipherKeys, p.second, self->arena, BlobCipherMetrics::TLOG);
self->toCommit.writeTypedMessage(encryptedMutation);
} else {
self->toCommit.writeTypedMessage(idempotencyIdSet);
@@ -1637,11 +1697,13 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) {
});
state int i = 0;
for (i = 0; i < pProxyCommitData->idempotencyClears.size(); i++) {
MutationRef& m = pProxyCommitData->idempotencyClears[i];
auto& tags = pProxyCommitData->tagsForKey(m.param1);
auto& tags = pProxyCommitData->tagsForKey(pProxyCommitData->idempotencyClears[i].param1);
self->toCommit.addTags(tags);
Arena arena;
wait(success(writeMutation(self, SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID, &m, nullptr, &arena)));
// We already have an arena with an appropriate lifetime handy
Arena& arena = pProxyCommitData->idempotencyClears.arena();
WriteMutationRefVar var = wait(writeMutation(
self, SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID, &pProxyCommitData->idempotencyClears[i], nullptr, &arena));
ASSERT(std::holds_alternative<MutationRef>(var));
}
pProxyCommitData->idempotencyClears = Standalone<VectorRef<MutationRef>>();
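
As an aside, a minimal standalone sketch of the fast-path/slow-path split that the writeMutation comment above describes, with std::future standing in for Flow futures and all names hypothetical: the common case returns an already-satisfied future with no coroutine/actor allocation, and only the rare cases pay for asynchrony.

#include <future>
#include <iostream>

std::future<int> slowPathFetch(int key) {
    // Rare path: genuinely asynchronous work (a thread stands in for an ACTOR).
    return std::async(std::launch::async, [key] { return key * 2; });
}

std::future<int> fastOrSlow(int key, bool cached) {
    if (cached) {
        std::promise<int> p; // fast path: the result is ready, no async machinery
        p.set_value(key * 2);
        return p.get_future();
    }
    return slowPathFetch(key); // slow path: delegate to the async helper
}

int main() {
    std::cout << fastOrSlow(21, true).get() << "\n";  // fast path
    std::cout << fastOrSlow(21, false).get() << "\n"; // slow path
}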

View File

@@ -129,6 +129,7 @@ ACTOR Future<bool> getKeyServers(
// one needs to be reachable
if (performQuiescentChecks && !shards.present()) {
TraceEvent("ConsistencyCheck_CommitProxyUnavailable")
.error(shards.getError())
.detail("CommitProxyID", commitProxyInfo->getId(i));
testFailure("Commit proxy unavailable", performQuiescentChecks, true);
return false;

View File

@@ -1066,7 +1066,7 @@
};
uint64_t seq; // seq is the index of the virtually infinite disk queue file. Its unit is bytes.
uint64_t popped;
int payloadSize;
int32_t payloadSize;
};
// The on disk format depends on the size of PageHeader.
static_assert(sizeof(PageHeader) == 36, "PageHeader must be 36 bytes");
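
As an aside, a small sketch of why the fixed-width field matters for an on-disk header (a hypothetical reduced struct, not the real PageHeader): plain int is not guaranteed to be 32 bits on every platform, so the layout is pinned with int32_t and checked at compile time.

#include <cstdint>

struct PageHeaderDemo {
    uint64_t seq;        // index into the virtually infinite queue file
    uint64_t popped;
    int32_t payloadSize; // fixed width, unlike plain `int`
};
// Alignment pads 8 + 8 + 4 up to 24; any layout change breaks the build.
static_assert(sizeof(PageHeaderDemo) == 24, "on-disk header layout changed");

int main() {
    return 0;
}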

View File

@ -102,7 +102,7 @@ void GrvProxyTagThrottler::addRequest(GetReadVersionRequest const& req) {
// SERVER_KNOBS->ENFORCE_TAG_THROTTLING_ON_PROXIES is enabled, there may be
// unexpected behaviour, because only one tag is used for throttling.
TraceEvent(SevWarnAlways, "GrvProxyTagThrottler_MultipleTags")
.suppressFor(1.0)
.suppressFor(60.0)
.detail("NumTags", req.tags.size())
.detail("UsingTag", printable(tag));
}

View File

@@ -397,13 +397,23 @@ struct Counters {
};
struct ReadIterator {
CF& cf;
uint64_t index; // incrementing counter to uniquely identify read iterator.
bool inUse;
std::shared_ptr<rocksdb::Iterator> iter;
double creationTime;
KeyRange keyRange;
std::shared_ptr<rocksdb::Slice> beginSlice, endSlice;
ReadIterator(CF& cf, uint64_t index, DB& db, rocksdb::ReadOptions& options)
: cf(cf), index(index), inUse(true), creationTime(now()), iter(db->NewIterator(options, cf)) {}
: index(index), inUse(true), creationTime(now()), iter(db->NewIterator(options, cf)) {}
ReadIterator(CF& cf, uint64_t index, DB& db, rocksdb::ReadOptions options, KeyRange keyRange)
: index(index), inUse(true), creationTime(now()), keyRange(keyRange) {
beginSlice = std::shared_ptr<rocksdb::Slice>(new rocksdb::Slice(toSlice(keyRange.begin)));
options.iterate_lower_bound = beginSlice.get();
endSlice = std::shared_ptr<rocksdb::Slice>(new rocksdb::Slice(toSlice(keyRange.end)));
options.iterate_upper_bound = endSlice.get();
iter = std::shared_ptr<rocksdb::Iterator>(db->NewIterator(options, cf));
}
};
/*
@@ -426,42 +436,84 @@ public:
readRangeOptions.auto_prefix_mode = (SERVER_KNOBS->ROCKSDB_PREFIX_LEN > 0);
TraceEvent("ReadIteratorPool", id)
.detail("KnobRocksDBReadRangeReuseIterators", SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS)
.detail("KnobRocksDBReadRangeReuseBoundedIterators",
SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_BOUNDED_ITERATORS)
.detail("KnobRocksDBReadRangeBoundedIteratorsMaxLimit",
SERVER_KNOBS->ROCKSDB_READ_RANGE_BOUNDED_ITERATORS_MAX_LIMIT)
.detail("KnobRocksDBPrefixLen", SERVER_KNOBS->ROCKSDB_PREFIX_LEN);
if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS &&
SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_BOUNDED_ITERATORS) {
TraceEvent(SevWarn, "ReadIteratorKnobsMismatch");
}
}
// Called on every db commit.
void update() {
if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS) {
if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS ||
SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_BOUNDED_ITERATORS) {
std::lock_guard<std::mutex> lock(mutex);
iteratorsMap.clear();
}
}
// Called on every read operation.
ReadIterator getIterator() {
ReadIterator getIterator(KeyRange keyRange) {
if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS) {
std::lock_guard<std::mutex> lock(mutex);
mutex.lock();
for (it = iteratorsMap.begin(); it != iteratorsMap.end(); it++) {
if (!it->second.inUse) {
it->second.inUse = true;
iteratorsReuseCount++;
return it->second;
ReadIterator iter = it->second;
mutex.unlock();
return iter;
}
}
index++;
ReadIterator iter(cf, index, db, readRangeOptions);
iteratorsMap.insert({ index, iter });
uint64_t readIteratorIndex = index;
mutex.unlock();
ReadIterator iter(cf, readIteratorIndex, db, readRangeOptions);
mutex.lock();
iteratorsMap.insert({ readIteratorIndex, iter });
mutex.unlock();
return iter;
} else if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_BOUNDED_ITERATORS) {
// TODO: Based on the datasize in the keyrange, decide whether to store the iterator for reuse.
mutex.lock();
for (it = iteratorsMap.begin(); it != iteratorsMap.end(); it++) {
if (!it->second.inUse && it->second.keyRange.contains(keyRange)) {
it->second.inUse = true;
iteratorsReuseCount++;
ReadIterator iter = it->second;
mutex.unlock();
return iter;
}
}
index++;
uint64_t readIteratorIndex = index;
mutex.unlock();
ReadIterator iter(cf, readIteratorIndex, db, readRangeOptions, keyRange);
if (iteratorsMap.size() < SERVER_KNOBS->ROCKSDB_READ_RANGE_BOUNDED_ITERATORS_MAX_LIMIT) {
// Not storing more than ROCKSDB_READ_RANGE_BOUNDED_ITERATORS_MAX_LIMIT of iterators
// to avoid 'out of memory' issues.
mutex.lock();
iteratorsMap.insert({ readIteratorIndex, iter });
mutex.unlock();
}
return iter;
} else {
index++;
ReadIterator iter(cf, index, db, readRangeOptions);
ReadIterator iter(cf, index, db, readRangeOptions, keyRange);
return iter;
}
}
// Called on every read operation, after the keys are collected.
void returnIterator(ReadIterator& iter) {
if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS) {
if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS ||
SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_BOUNDED_ITERATORS) {
std::lock_guard<std::mutex> lock(mutex);
it = iteratorsMap.find(iter.index);
// iterator found: put the iterator back to the pool(inUse=false).
@@ -768,7 +820,7 @@ uint64_t PerfContextMetrics::getRocksdbPerfcontextMetric(int metric) {
}
ACTOR Future<Void> refreshReadIteratorPool(std::shared_ptr<ReadIteratorPool> readIterPool) {
if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS) {
if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS || SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_BOUNDED_ITERATORS) {
loop {
wait(delay(SERVER_KNOBS->ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME));
readIterPool->refreshIterators();
@@ -1559,7 +1611,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
rocksdb::Status s;
if (a.rowLimit >= 0) {
double iterCreationBeginTime = a.getHistograms ? timer_monotonic() : 0;
ReadIterator readIter = readIterPool->getIterator();
ReadIterator readIter = readIterPool->getIterator(a.keys);
if (a.getHistograms) {
metricPromiseStream->send(std::make_pair(ROCKSDB_READRANGE_NEWITERATOR_HISTOGRAM.toString(),
timer_monotonic() - iterCreationBeginTime));
@@ -1588,7 +1640,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
readIterPool->returnIterator(readIter);
} else {
double iterCreationBeginTime = a.getHistograms ? timer_monotonic() : 0;
ReadIterator readIter = readIterPool->getIterator();
ReadIterator readIter = readIterPool->getIterator(a.keys);
if (a.getHistograms) {
metricPromiseStream->send(std::make_pair(ROCKSDB_READRANGE_NEWITERATOR_HISTOGRAM.toString(),
timer_monotonic() - iterCreationBeginTime));
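For readers following the reuse paths above outside the diff: the pool hands out an idle iterator when one exists and otherwise registers a fresh one under a new index. A minimal, self-contained sketch of that pattern using plain STL (ReadIterator here is a simplified stand-in for the RocksDB-backed type; the knob checks and the bounded variant are omitted):

#include <cstdint>
#include <map>
#include <mutex>

// Simplified stand-in for the RocksDB-backed iterator in the diff.
struct ReadIterator {
    uint64_t index = 0;
    bool inUse = true;
};

class ReadIteratorPool {
    std::mutex mutex;
    std::map<uint64_t, ReadIterator> iteratorsMap;
    uint64_t index = 0;

public:
    // Called on every read: reuse an idle iterator if present,
    // otherwise create and register a new one under a fresh index.
    ReadIterator getIterator() {
        std::lock_guard<std::mutex> lock(mutex);
        for (auto& [idx, iter] : iteratorsMap) {
            if (!iter.inUse) {
                iter.inUse = true;
                return iter;
            }
        }
        ReadIterator iter{ ++index, /*inUse=*/true };
        iteratorsMap.emplace(iter.index, iter);
        return iter;
    }

    // Called after the keys are collected: mark the iterator idle.
    void returnIterator(const ReadIterator& iter) {
        std::lock_guard<std::mutex> lock(mutex);
        auto it = iteratorsMap.find(iter.index);
        if (it != iteratorsMap.end())
            it->second.inUse = false;
    }
};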

View File

@ -155,7 +155,7 @@ struct ShardedRocksDBState {
std::shared_ptr<rocksdb::Cache> rocksdb_block_cache = nullptr;
rocksdb::Slice toSlice(StringRef s) {
const rocksdb::Slice toSlice(StringRef s) {
return rocksdb::Slice(reinterpret_cast<const char*>(s.begin()), s.size());
}
@ -309,8 +309,20 @@ struct ReadIterator {
bool inUse;
std::shared_ptr<rocksdb::Iterator> iter;
double creationTime;
ReadIterator(rocksdb::ColumnFamilyHandle* cf, uint64_t index, rocksdb::DB* db, rocksdb::ReadOptions& options)
KeyRange keyRange;
std::shared_ptr<rocksdb::Slice> beginSlice, endSlice;
ReadIterator(rocksdb::ColumnFamilyHandle* cf, uint64_t index, rocksdb::DB* db, const rocksdb::ReadOptions& options)
: index(index), inUse(true), creationTime(now()), iter(db->NewIterator(options, cf)) {}
ReadIterator(rocksdb::ColumnFamilyHandle* cf, uint64_t index, rocksdb::DB* db, const KeyRange& range)
: index(index), inUse(true), creationTime(now()), keyRange(range) {
auto options = getReadOptions();
beginSlice = std::shared_ptr<rocksdb::Slice>(new rocksdb::Slice(toSlice(keyRange.begin)));
options.iterate_lower_bound = beginSlice.get();
endSlice = std::shared_ptr<rocksdb::Slice>(new rocksdb::Slice(toSlice(keyRange.end)));
options.iterate_upper_bound = endSlice.get();
iter = std::shared_ptr<rocksdb::Iterator>(db->NewIterator(options, cf));
}
};
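The bounded constructor above leans on a RocksDB detail worth noting: iterate_lower_bound and iterate_upper_bound are raw Slice pointers, so the slices must outlive the iterator, which is why the diff stores them in shared_ptr members. A hedged sketch of the same idea in isolation (BoundedIterator and makeBoundedIterator are hypothetical helpers; the caller must also keep the slices' underlying key bytes alive):

#include <rocksdb/db.h>
#include <rocksdb/options.h>
#include <rocksdb/slice.h>
#include <memory>

// Hypothetical bundle: an iterator confined to [begin, end).
struct BoundedIterator {
    std::shared_ptr<rocksdb::Slice> begin, end; // must stay alive as long as iter does
    std::shared_ptr<rocksdb::Iterator> iter;
};

BoundedIterator makeBoundedIterator(rocksdb::DB* db,
                                    rocksdb::ColumnFamilyHandle* cf,
                                    std::shared_ptr<rocksdb::Slice> begin,
                                    std::shared_ptr<rocksdb::Slice> end) {
    rocksdb::ReadOptions options;
    options.iterate_lower_bound = begin.get(); // inclusive lower bound
    options.iterate_upper_bound = end.get();   // exclusive upper bound
    return BoundedIterator{ begin, end,
                            std::shared_ptr<rocksdb::Iterator>(db->NewIterator(options, cf)) };
}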
/*
@ -348,7 +360,8 @@ public:
}
// Called on every read operation.
ReadIterator getIterator() {
ReadIterator getIterator(const KeyRange& range) {
// Shared iterators are not bounded.
if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS) {
std::lock_guard<std::mutex> lock(mutex);
for (it = iteratorsMap.begin(); it != iteratorsMap.end(); it++) {
@ -364,7 +377,7 @@ public:
return iter;
} else {
index++;
ReadIterator iter(cf, index, db, readRangeOptions);
ReadIterator iter(cf, index, db, range);
return iter;
}
}
@ -511,7 +524,7 @@ struct PhysicalShard {
double deleteTimeSec;
};
int readRangeInDb(PhysicalShard* shard, const KeyRangeRef& range, int rowLimit, int byteLimit, RangeResult* result) {
int readRangeInDb(PhysicalShard* shard, const KeyRangeRef range, int rowLimit, int byteLimit, RangeResult* result) {
if (rowLimit == 0 || byteLimit == 0) {
return 0;
}
@ -523,7 +536,7 @@ int readRangeInDb(PhysicalShard* shard, const KeyRangeRef& range, int rowLimit,
// When using a prefix extractor, ensure that keys are returned in order even if they cross
// a prefix boundary.
if (rowLimit >= 0) {
ReadIterator readIter = shard->readIterPool->getIterator();
ReadIterator readIter = shard->readIterPool->getIterator(range);
auto cursor = readIter.iter;
cursor->Seek(toSlice(range.begin));
while (cursor->Valid() && toStringRef(cursor->key()) < range.end) {
@ -540,7 +553,7 @@ int readRangeInDb(PhysicalShard* shard, const KeyRangeRef& range, int rowLimit,
s = cursor->status();
shard->readIterPool->returnIterator(readIter);
} else {
ReadIterator readIter = shard->readIterPool->getIterator();
ReadIterator readIter = shard->readIterPool->getIterator(range);
auto cursor = readIter.iter;
cursor->SeekForPrev(toSlice(range.end));
if (cursor->Valid() && toStringRef(cursor->key()) == range.end) {
@ -2150,10 +2163,16 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
: keys(keys), rowLimit(rowLimit), byteLimit(byteLimit), startTime(timer_monotonic()),
getHistograms(
(deterministicRandom()->random01() < SERVER_KNOBS->ROCKSDB_HISTOGRAMS_SAMPLE_RATE) ? true : false) {
std::set<PhysicalShard*> usedShards;
for (const DataShard* shard : shards) {
if (shard != nullptr) {
ASSERT(shard);
shardRanges.emplace_back(shard->physicalShard, keys & shard->range);
usedShards.insert(shard->physicalShard);
}
if (usedShards.size() != shards.size()) {
TraceEvent("ReadRangeMetrics")
.detail("NumPhysicalShards", usedShards.size())
.detail("NumDataShards", shards.size());
}
}
double getTimeEstimate() const override { return SERVER_KNOBS->READ_RANGE_TIME_ESTIMATE; }

View File

@ -514,8 +514,13 @@ Future<Void> logRouterPeekMessages(PromiseType replyPromise,
}
state double startTime = now();
state Version poppedVer;
state Version endVersion;
// Run the peek logic in a loop to account for the case where there is no data to return to the caller, and we may
// want to wait a little bit instead of just sending back an empty message. This feature is controlled by a knob.
loop {
Version poppedVer = poppedVersion(self, reqTag);
poppedVer = poppedVersion(self, reqTag);
if (poppedVer > reqBegin || reqBegin < self->startVersion) {
// This should only happen if a packet is sent multiple times and the reply is not needed.
@ -539,10 +544,8 @@ Future<Void> logRouterPeekMessages(PromiseType replyPromise,
return Void();
}
state Version endVersion;
// Run the peek logic in a loop to account for the case where there is no data to return to the caller, and we may
// want to wait a little bit instead of just sending back an empty message. This feature is controlled by a knob.
loop {
ASSERT(reqBegin >= poppedVersion(self, reqTag) && reqBegin >= self->startVersion);
endVersion = self->version.get() + 1;
peekMessagesFromMemory(self, reqTag, reqBegin, messages, endVersion);
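Much of the restructuring above comes down to Flow's state variables: locals that must survive a wait(), or persist across iterations of loop, have to be declared state, so the declarations are hoisted above the retry loop rather than redeclared inside it. A hedged ACTOR-style sketch of the shape (requires FDB's actor compiler; haveMessages and the 0.1s delay are illustrative stand-ins, not the real helpers or knob):

ACTOR Future<Void> peekLoopSketch(Reference<LogData> logData, Tag tag, Version reqBegin) {
    state Version poppedVer; // 'state': must survive wait() and persist across loop passes
    state Version endVersion;
    loop {
        poppedVer = poppedVersion(logData, tag); // recomputed on every pass
        if (reqBegin < poppedVer) {
            return Void(); // caller is behind the popped version; real code replies with an error
        }
        endVersion = logData->version.get() + 1;
        if (haveMessages(logData, tag, endVersion)) { // hypothetical helper
            return Void();
        }
        // Nothing to return yet: wait briefly instead of sending back an empty
        // message; the interval is knob-controlled in the real code.
        wait(delay(0.1));
    }
}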

View File

@ -656,7 +656,7 @@ ACTOR Future<int64_t> getVersionOffset(Database cx,
ACTOR Future<Void> repairDeadDatacenter(Database cx,
Reference<AsyncVar<ServerDBInfo> const> dbInfo,
std::string context) {
if (g_network->isSimulated() && g_simulator->usableRegions > 1) {
if (g_network->isSimulated() && g_simulator->usableRegions > 1 && !g_simulator->quiesced) {
bool primaryDead = g_simulator->datacenterDead(g_simulator->primaryDcId);
bool remoteDead = g_simulator->datacenterDead(g_simulator->remoteDcId);

View File

@ -307,7 +307,7 @@ JsonBuilderObject machineStatusFetcher(WorkerEvents mMetrics,
// map from machine networkAddress to datacenter ID
std::map<NetworkAddress, std::string> dcIds;
std::map<NetworkAddress, LocalityData> locality;
std::map<std::string, bool> notExcludedMap;
std::map<std::string, bool> excludedMap;
std::map<std::string, int32_t> workerContribMap;
std::map<std::string, JsonBuilderObject> machineJsonMap;
@ -377,7 +377,7 @@ JsonBuilderObject machineStatusFetcher(WorkerEvents mMetrics,
statusObj["network"] = networkObj;
if (configuration.present()) {
notExcludedMap[machineId] =
excludedMap[machineId] =
true; // Will be set to false below if this or any later process is not excluded
}
@ -385,18 +385,21 @@ JsonBuilderObject machineStatusFetcher(WorkerEvents mMetrics,
machineJsonMap[machineId] = statusObj;
}
// FIXME: this will not catch the case where the secondary address of the process was excluded
NetworkAddressList tempList;
tempList.address = it->first;
bool excludedServer = false;
bool excludedLocality = false;
if (configuration.present() && configuration.get().isExcludedServer(tempList))
excludedServer = true;
bool excludedServer = true;
bool excludedLocality = true;
if (configuration.present() && !configuration.get().isExcludedServer(tempList))
excludedServer = false;
if (locality.count(it->first) && configuration.present() &&
configuration.get().isMachineExcluded(locality[it->first]))
excludedLocality = true;
!configuration.get().isMachineExcluded(locality[it->first]))
excludedLocality = false;
notExcludedMap[machineId] = excludedServer || excludedLocality;
// If any server is not excluded, set the overall exclusion status
// of the machine to false.
if (!excludedServer && !excludedLocality) {
excludedMap[machineId] = false;
}
workerContribMap[machineId]++;
} catch (Error&) {
++failed;
@ -407,7 +410,7 @@ JsonBuilderObject machineStatusFetcher(WorkerEvents mMetrics,
for (auto& mapPair : machineJsonMap) {
auto& machineId = mapPair.first;
auto& jsonItem = machineJsonMap[machineId];
jsonItem["excluded"] = notExcludedMap[machineId];
jsonItem["excluded"] = excludedMap[machineId];
jsonItem["contributing_workers"] = workerContribMap[machineId];
machineMap[machineId] = jsonItem;
}
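The rename from notExcludedMap to excludedMap above also flips the default: a machine now starts out excluded and is cleared as soon as any of its processes is excluded neither by address nor by locality. A minimal sketch of that aggregation (Proc is a hypothetical record standing in for the worker data):

#include <map>
#include <string>
#include <vector>

// Hypothetical per-process record standing in for the worker data.
struct Proc {
    std::string machineId;
    bool excludedServer;
    bool excludedLocality;
};

// A machine starts out excluded; any process excluded neither by server
// address nor by locality clears the flag for its machine.
std::map<std::string, bool> computeExcludedMap(const std::vector<Proc>& procs) {
    std::map<std::string, bool> excludedMap;
    for (const auto& p : procs) {
        excludedMap.emplace(p.machineId, true); // default: excluded
        if (!p.excludedServer && !p.excludedLocality) {
            excludedMap[p.machineId] = false; // one non-excluded process is enough
        }
    }
    return excludedMap;
}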
@ -781,6 +784,9 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
// Map the address of the worker to the error message object
tracefileOpenErrorMap[traceFileErrorsItr->first.toString()] = msgObj;
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
throw;
}
incomplete_reasons->insert("file_open_error details could not be retrieved");
}
}
@ -1095,6 +1101,9 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
}
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
throw;
}
// Something strange occurred; the process list is incomplete, but whatever was built so far, if anything, will be
// returned.
incomplete_reasons->insert("Cannot retrieve all process status information.");
@ -1410,6 +1419,9 @@ ACTOR static Future<JsonBuilderObject> latencyProbeFetcher(Database cx,
wait(waitForAll(probes));
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
throw;
}
incomplete_reasons->insert(format("Unable to retrieve latency probe information (%s).", e.what()));
}
@ -1449,6 +1461,9 @@ ACTOR static Future<Void> consistencyCheckStatusFetcher(Database cx,
}
}
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
throw;
}
incomplete_reasons->insert(format("Unable to retrieve consistency check settings (%s).", e.what()));
}
return Void();
@ -1540,6 +1555,9 @@ ACTOR static Future<Void> logRangeWarningFetcher(Database cx,
}
}
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
throw;
}
incomplete_reasons->insert(format("Unable to retrieve log ranges (%s).", e.what()));
}
return Void();
@ -1713,7 +1731,10 @@ static JsonBuilderObject configurationFetcher(Optional<DatabaseConfiguration> co
}
int count = coordinators.clientLeaderServers.size();
statusObj["coordinators_count"] = count;
} catch (Error&) {
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
throw;
}
incomplete_reasons->insert("Could not retrieve all configuration status information.");
}
return statusObj;
@ -2735,6 +2756,9 @@ ACTOR Future<JsonBuilderObject> layerStatusFetcher(Database cx,
}
} catch (Error& e) {
TraceEvent(SevWarn, "LayerStatusError").error(e);
if (e.code() == error_code_actor_cancelled) {
throw;
}
incomplete_reasons->insert(format("Unable to retrieve layer status (%s).", e.what()));
json.create("_error") = format("Unable to retrieve layer status (%s).", e.what());
json.create("_valid") = false;
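The error_code_actor_cancelled guards added throughout these fetchers all encode one Flow rule: cancellation must propagate rather than be downgraded into an "incomplete" status note. A hedged sketch of the pattern (ACTOR-style; doSomeStatusWork is a hypothetical helper):

ACTOR static Future<Void> exampleStatusFetcher(std::set<std::string>* incomplete_reasons) {
    try {
        wait(doSomeStatusWork()); // hypothetical helper
    } catch (Error& e) {
        if (e.code() == error_code_actor_cancelled) {
            throw; // cancellation must unwind the actor, not become a status note
        }
        // Any other failure only makes the status report incomplete.
        incomplete_reasons->insert(format("Unable to retrieve example status (%s).", e.what()));
    }
    return Void();
}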

View File

@ -45,6 +45,8 @@
#include "fdbserver/FDBExecHelper.actor.h"
#include "flow/Histogram.h"
#include "flow/DebugTrace.h"
#include "flow/genericactors.actor.h"
#include "flow/network.h"
#include "flow/actorcompiler.h" // This must be the last #include.
struct TLogQueueEntryRef {
@ -216,6 +218,8 @@ static const KeyRange persistTagMessagesKeys = prefixRange("TagMsg/"_sr);
static const KeyRange persistTagMessageRefsKeys = prefixRange("TagMsgRef/"_sr);
static const KeyRange persistTagPoppedKeys = prefixRange("TagPop/"_sr);
static const KeyRef persistEncryptionAtRestModeKey = "encryptionAtRestMode"_sr;
static Key persistTagMessagesKey(UID id, Tag tag, Version version) {
BinaryWriter wr(Unversioned());
wr.serializeBytes(persistTagMessagesKeys.begin);
@ -306,6 +310,8 @@ struct TLogData : NonCopyable {
UID dbgid;
UID workerID;
Optional<EncryptionAtRestMode> encryptionAtRestMode;
IKeyValueStore* persistentData; // Durable data on disk that was spilled.
IDiskQueue* rawPersistentQueue; // The physical queue where the persistentQueue below stores its data. Ideally, the log
// interface should work without directly accessing rawPersistentQueue.
@ -1796,8 +1802,14 @@ Future<Void> tLogPeekMessages(PromiseType replyPromise,
}
state double workStart = now();
state Version poppedVer;
state Version endVersion;
state bool onlySpilled;
state Version poppedVer = poppedVersion(logData, reqTag);
// Run the peek logic in a loop to account for the case where there is no data to return to the caller, and we may
// want to wait a little bit instead of just sending back an empty message. This feature is controlled by a knob.
loop {
poppedVer = poppedVersion(logData, reqTag);
auto tagData = logData->getTagData(reqTag);
bool tagRecovered = tagData && !tagData->unpoppedRecovered;
@ -1859,12 +1871,8 @@ Future<Void> tLogPeekMessages(PromiseType replyPromise,
return Void();
}
state Version endVersion;
state bool onlySpilled;
ASSERT(reqBegin >= poppedVersion(logData, reqTag));
// Run the peek logic in a loop to account for the case where there is no data to return to the caller, and we may
// want to wait a little bit instead of just sending back an empty message. This feature is controlled by a knob.
loop {
endVersion = logData->version.get() + 1;
onlySpilled = false;
@ -2391,6 +2399,33 @@ ACTOR Future<Void> initPersistentState(TLogData* self, Reference<LogData> logDat
return Void();
}
ACTOR Future<EncryptionAtRestMode> getEncryptionAtRestMode(TLogData* self) {
loop {
state GetEncryptionAtRestModeRequest req(self->dbgid);
try {
choose {
when(wait(self->dbInfo->onChange())) {}
when(GetEncryptionAtRestModeResponse resp = wait(brokenPromiseToNever(
self->dbInfo->get().clusterInterface.getEncryptionAtRestMode.getReply(req)))) {
TraceEvent("GetEncryptionAtRestMode", self->dbgid).detail("Mode", resp.mode);
// TODO: the TLOG_ENCRYPTION knob shall be removed; the db-config check should be sufficient to
// determine the tlog (and cluster) encryption status
if ((EncryptionAtRestMode::Mode)resp.mode != EncryptionAtRestMode::Mode::DISABLED &&
SERVER_KNOBS->ENABLE_TLOG_ENCRYPTION) {
return EncryptionAtRestMode((EncryptionAtRestMode::Mode)resp.mode);
} else {
return EncryptionAtRestMode();
}
}
}
} catch (Error& e) {
TraceEvent("GetEncryptionAtRestError", self->dbgid).error(e);
throw;
}
}
}
// send stopped promise instead of LogData* to avoid reference cycles
ACTOR Future<Void> rejoinClusterController(TLogData* self,
TLogInterface tli,
@ -2579,6 +2614,32 @@ ACTOR Future<Void> tLogEnablePopReq(TLogEnablePopRequest enablePopReq, TLogData*
return Void();
}
ACTOR Future<Void> checkUpdateEncryptionAtRestMode(TLogData* self) {
EncryptionAtRestMode encryptionAtRestMode = wait(getEncryptionAtRestMode(self));
if (self->encryptionAtRestMode.present()) {
// Ensure the TLog's encryptionAtRestMode status matches the cluster config; if not, kill the TLog process.
// This approach prevents a fake TLog process from joining the cluster.
if (self->encryptionAtRestMode.get() != encryptionAtRestMode) {
TraceEvent("EncryptionAtRestMismatch", self->dbgid)
.detail("Expected", encryptionAtRestMode.toString())
.detail("Present", self->encryptionAtRestMode.get().toString());
ASSERT(false);
}
} else {
self->encryptionAtRestMode = Optional<EncryptionAtRestMode>(encryptionAtRestMode);
wait(self->persistentDataCommitLock.take());
state FlowLock::Releaser commitLockReleaser(self->persistentDataCommitLock);
self->persistentData->set(
KeyValueRef(persistEncryptionAtRestModeKey, self->encryptionAtRestMode.get().toValue()));
wait(self->persistentData->commit());
TraceEvent("PersistEncryptionAtRestMode", self->dbgid)
.detail("Mode", self->encryptionAtRestMode.get().toString());
}
return Void();
}
ACTOR Future<Void> serveTLogInterface(TLogData* self,
TLogInterface tli,
Reference<LogData> logData,
@ -2966,6 +3027,7 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
state IKeyValueStore* storage = self->persistentData;
state Future<Optional<Value>> fFormat = storage->readValue(persistFormat.key);
state Future<Optional<Value>> fRecoveryLocation = storage->readValue(persistRecoveryLocationKey);
state Future<Optional<Value>> fEncryptionAtRestMode = storage->readValue(persistEncryptionAtRestModeKey);
state Future<RangeResult> fVers = storage->readRange(persistCurrentVersionKeys);
state Future<RangeResult> fKnownCommitted = storage->readRange(persistKnownCommittedVersionKeys);
state Future<RangeResult> fLocality = storage->readRange(persistLocalityKeys);
@ -2977,7 +3039,7 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
// FIXME: metadata in queue?
wait(waitForAll(std::vector{ fFormat, fRecoveryLocation }));
wait(waitForAll(std::vector{ fFormat, fRecoveryLocation, fEncryptionAtRestMode }));
wait(waitForAll(std::vector{ fVers,
fKnownCommitted,
fLocality,
@ -2987,6 +3049,12 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
fProtocolVersions,
fTLogSpillTypes }));
if (fEncryptionAtRestMode.get().present()) {
self->encryptionAtRestMode =
Optional<EncryptionAtRestMode>(EncryptionAtRestMode::fromValue(fEncryptionAtRestMode.get()));
TraceEvent("PersistEncryptionAtRestModeRead").detail("Mode", self->encryptionAtRestMode.get().toString());
}
if (fFormat.get().present() && !persistFormatReadableRange.contains(fFormat.get().get())) {
// FIXME: remove when we no longer need to test upgrades from 4.X releases
if (g_network->isSimulated()) {
@ -3537,11 +3605,13 @@ ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
// Disk errors need a chance to kill this actor.
wait(delay(0.000001));
if (recovered.canBeSet())
if (recovered.canBeSet()) {
recovered.send(Void());
}
self.sharedActors.send(commitQueue(&self));
self.sharedActors.send(updateStorageLoop(&self));
self.sharedActors.send(checkUpdateEncryptionAtRestMode(&self));
self.sharedActors.send(traceRole(Role::SHARED_TRANSACTION_LOG, tlogId));
state Future<Void> activeSharedChange = Void();

View File

@ -1449,8 +1449,7 @@ void TagPartitionedLogSystem::pop(Version upTo, Tag tag, Version durableKnownCom
}
if (prev == 0) {
// pop tag from log upto version defined in outstandingPops[].first
popActors.add(
popFromLog(this, log, tag, /*delayBeforePop*/ 1.0, /*popLogRouter=*/false)); //< FIXME: knob
popActors.add(popFromLog(this, log, tag, SERVER_KNOBS->POP_FROM_LOG_DELAY, /*popLogRouter=*/false));
}
}
}

View File

@ -2149,7 +2149,7 @@ int main(int argc, char* argv[]) {
auto dataFolder = opts.dataFolder.size() ? opts.dataFolder : "simfdb";
std::vector<std::string> directories = platform::listDirectories(dataFolder);
const std::set<std::string> allowedDirectories = { ".", "..", "backups", "unittests" };
const std::set<std::string> allowedDirectories = { ".", "..", "backups", "unittests", "fdbblob" };
for (const auto& dir : directories) {
if (dir.size() != 32 && allowedDirectories.count(dir) == 0 && dir.find("snap") == std::string::npos) {

View File

@ -3400,6 +3400,12 @@ public:
excludedDegradedServers; // The degraded servers to be excluded when assigning workers to roles.
std::queue<double> recentHealthTriggeredRecoveryTime;
// Captures the cluster's encryption data at-rest mode; the status is set only at the time of cluster creation.
// The promise gets set as part of the cluster recovery process and is used by recovering encryption-participant
// stateful processes (such as TLogs) to ensure the stateful process's on-disk encryption status matches the cluster's
// encryption status.
Promise<EncryptionAtRestMode> encryptionAtRestMode;
CounterCollection clusterControllerMetrics;
Counter openDatabaseRequests;

View File

@ -132,7 +132,7 @@ private:
try {
wait(self->cstate.setExclusive(
BinaryWriter::toValue(newState, IncludeVersion(ProtocolVersion::withDBCoreState()))));
BinaryWriter::toValue(newState, IncludeVersion(ProtocolVersion::withEncryptionAtRest()))));
} catch (Error& e) {
CODE_PROBE(true, "Master displaced during writeMasterState");
throw;

View File

@ -28,6 +28,7 @@
#include "fdbrpc/ReplicationPolicy.h"
#include "fdbserver/LogSystemConfig.h"
#include "fdbserver/MasterInterface.h"
#include "flow/ObjectSerializerTraits.h"
class LogSet;
struct OldLogData;
@ -143,7 +144,7 @@ struct DBCoreState {
std::set<int8_t> pseudoLocalities;
ProtocolVersion newestProtocolVersion;
ProtocolVersion lowestCompatibleProtocolVersion;
EncryptionAtRestMode encryptionAtRestMode;
EncryptionAtRestMode encryptionAtRestMode; // cluster encryption data at-rest mode
DBCoreState()
: logRouterTags(0), txsTags(0), recoveryCount(0), logSystemType(LogSystemType::empty),

View File

@ -44,6 +44,8 @@ const StringRef TLOG_MSGS_PTREE_UPDATES_LATENCY_HISTOGRAM = "TLogMsgsPTreeUpdate
const StringRef STORAGE_UPDATES_DURABLE_LATENCY_HISTOGRAM = "StorageUpdatesDurableLatency"_sr;
const StringRef STORAGE_COMMIT_LATENCY_HISTOGRAM = "StorageCommitLatency"_sr;
const StringRef SS_DURABLE_VERSION_UPDATE_LATENCY_HISTOGRAM = "SSDurableVersionUpdateLatency"_sr;
const StringRef SS_READ_RANGE_BYTES_RETURNED_HISTOGRAM = "SSReadRangeBytesReturned"_sr;
const StringRef SS_READ_RANGE_BYTES_LIMIT_HISTOGRAM = "SSReadRangeBytesLimit"_sr;
struct StorageMetricSample {
IndexedSet<Key, int64_t> sample;

View File

@ -175,6 +175,7 @@ struct ClusterControllerFullInterface {
tlogRejoin; // sent by tlog (whether or not rebooted) to communicate with a new controller
RequestStream<struct BackupWorkerDoneRequest> notifyBackupWorkerDone;
RequestStream<struct ChangeCoordinatorsRequest> changeCoordinators;
RequestStream<struct GetEncryptionAtRestModeRequest> getEncryptionAtRestMode;
UID id() const { return clientInterface.id(); }
bool operator==(ClusterControllerFullInterface const& r) const { return id() == r.id(); }
@ -189,7 +190,7 @@ struct ClusterControllerFullInterface {
getWorkers.getFuture().isReady() || registerMaster.getFuture().isReady() ||
getServerDBInfo.getFuture().isReady() || updateWorkerHealth.getFuture().isReady() ||
tlogRejoin.getFuture().isReady() || notifyBackupWorkerDone.getFuture().isReady() ||
changeCoordinators.getFuture().isReady();
changeCoordinators.getFuture().isReady() || getEncryptionAtRestMode.getFuture().isReady();
}
void initEndpoints() {
@ -206,6 +207,7 @@ struct ClusterControllerFullInterface {
tlogRejoin.getEndpoint(TaskPriority::MasterTLogRejoin);
notifyBackupWorkerDone.getEndpoint(TaskPriority::ClusterController);
changeCoordinators.getEndpoint(TaskPriority::DefaultEndpoint);
getEncryptionAtRestMode.getEndpoint(TaskPriority::ClusterController);
}
template <class Ar>
@ -226,7 +228,8 @@ struct ClusterControllerFullInterface {
updateWorkerHealth,
tlogRejoin,
notifyBackupWorkerDone,
changeCoordinators);
changeCoordinators,
getEncryptionAtRestMode);
}
};
@ -572,6 +575,33 @@ struct BackupWorkerDoneRequest {
}
};
struct GetEncryptionAtRestModeResponse {
constexpr static FileIdentifier file_identifier = 2932156;
uint32_t mode;
GetEncryptionAtRestModeResponse() : mode(EncryptionAtRestMode::Mode::DISABLED) {}
GetEncryptionAtRestModeResponse(uint32_t m) : mode(m) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, mode);
}
};
struct GetEncryptionAtRestModeRequest {
constexpr static FileIdentifier file_identifier = 2670826;
UID tlogId;
ReplyPromise<GetEncryptionAtRestModeResponse> reply;
GetEncryptionAtRestModeRequest() {}
GetEncryptionAtRestModeRequest(UID tId) : tlogId(tId) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, tlogId, reply);
}
};
struct InitializeTLogRequest {
constexpr static FileIdentifier file_identifier = 15604392;
UID recruitmentID;

View File

@ -595,9 +595,12 @@ struct ChangeFeedInfo : ReferenceCounted<ChangeFeedInfo> {
auto toRemove = moveTriggers.modify(range);
for (auto triggerRange = toRemove.begin(); triggerRange != toRemove.end(); ++triggerRange) {
auto streamToRemove = triggerRange->value().find(streamUID);
ASSERT(streamToRemove != triggerRange->cvalue().end());
if (streamToRemove == triggerRange->cvalue().end()) {
ASSERT(destroyed);
} else {
triggerRange->value().erase(streamToRemove);
}
}
// TODO: more cleanup may be possible here
}
@ -732,6 +735,9 @@ public:
Reference<Histogram> storageUpdatesDurableLatencyHistogram;
Reference<Histogram> storageCommitLatencyHistogram;
Reference<Histogram> ssDurableVersionUpdateLatencyHistogram;
// Histograms of requests sent to KVS.
Reference<Histogram> readRangeBytesReturnedHistogram;
Reference<Histogram> readRangeBytesLimitHistogram;
// watch map operations
Reference<ServerWatchMetadata> getWatchMetadata(KeyRef key) const;
@ -1293,6 +1299,12 @@ public:
ssDurableVersionUpdateLatencyHistogram(Histogram::getHistogram(STORAGESERVER_HISTOGRAM_GROUP,
SS_DURABLE_VERSION_UPDATE_LATENCY_HISTOGRAM,
Histogram::Unit::microseconds)),
readRangeBytesReturnedHistogram(Histogram::getHistogram(STORAGESERVER_HISTOGRAM_GROUP,
SS_READ_RANGE_BYTES_RETURNED_HISTOGRAM,
Histogram::Unit::countLinear)),
readRangeBytesLimitHistogram(Histogram::getHistogram(STORAGESERVER_HISTOGRAM_GROUP,
SS_READ_RANGE_BYTES_LIMIT_HISTOGRAM,
Histogram::Unit::countLinear)),
tag(invalidTag), poppedAllAfter(std::numeric_limits<Version>::max()), cpuUsage(0.0), diskUsage(0.0),
storage(this, storage), shardChangeCounter(0), lastTLogVersion(0), lastVersionWithData(0), restoredVersion(0),
prevVersion(0), rebootAfterDurableVersion(std::numeric_limits<Version>::max()),
@ -3457,6 +3469,8 @@ ACTOR Future<GetKeyValuesReply> readRange(StorageServer* data,
RangeResult atStorageVersion =
wait(data->storage.readRange(KeyRangeRef(readBegin, readEnd), limit, *pLimitBytes, options));
data->counters.kvScanBytes += atStorageVersion.logicalSize();
data->readRangeBytesReturnedHistogram->sample(atStorageVersion.logicalSize());
data->readRangeBytesLimitHistogram->sample(*pLimitBytes);
ASSERT(atStorageVersion.size() <= limit);
if (data->storageVersion() > version) {
@ -3552,6 +3566,8 @@ ACTOR Future<GetKeyValuesReply> readRange(StorageServer* data,
RangeResult atStorageVersion =
wait(data->storage.readRange(KeyRangeRef(readBegin, readEnd), limit, *pLimitBytes, options));
data->counters.kvScanBytes += atStorageVersion.logicalSize();
data->readRangeBytesReturnedHistogram->sample(atStorageVersion.logicalSize());
data->readRangeBytesLimitHistogram->sample(*pLimitBytes);
ASSERT(atStorageVersion.size() <= -limit);
if (data->storageVersion() > version)

View File

@ -131,6 +131,10 @@ struct ConsistencyCheckWorkload : TestWorkload {
try {
wait(timeoutError(quietDatabase(cx, self->dbInfo, "ConsistencyCheckStart", 0, 1e5, 0, 0),
self->quiescentWaitTimeout)); // FIXME: should be zero?
if (g_network->isSimulated()) {
g_simulator->quiesced = true;
TraceEvent("ConsistencyCheckQuiesced").detail("Quiesced", g_simulator->quiesced);
}
} catch (Error& e) {
TraceEvent("ConsistencyCheck_QuietDatabaseError").error(e);
self->testFailure("Unable to achieve a quiet database");
@ -201,6 +205,10 @@ struct ConsistencyCheckWorkload : TestWorkload {
when(wait(self->suspendConsistencyCheck.onChange())) {}
}
}
if (self->firstClient && g_network->isSimulated() && self->performQuiescentChecks) {
g_simulator->quiesced = false;
TraceEvent("ConsistencyCheckQuiescedEnd").detail("Quiesced", g_simulator->quiesced);
}
return Void();
}

View File

@ -665,23 +665,6 @@ struct TenantManagementWorkload : TestWorkload {
return Void();
}
// Returns GRV and eats GRV errors
ACTOR static Future<Version> getReadVersion(Reference<ReadYourWritesTransaction> tr) {
loop {
try {
Version version = wait(tr->getReadVersion());
return version;
} catch (Error& e) {
if (e.code() == error_code_grv_proxy_memory_limit_exceeded ||
e.code() == error_code_batch_transaction_throttled) {
wait(tr->onError(e));
} else {
throw;
}
}
}
}
ACTOR static Future<Void> deleteTenant(TenantManagementWorkload* self) {
state TenantName beginTenant = self->chooseTenantName(true);
state OperationType operationType = self->randomOperationType();
@ -772,7 +755,8 @@ struct TenantManagementWorkload : TestWorkload {
state bool retried = false;
loop {
try {
state Version beforeVersion = wait(self->getReadVersion(tr));
state Version beforeVersion =
wait(getLatestReadVersion(self, OperationType::MANAGEMENT_DATABASE));
Optional<Void> result =
wait(timeout(deleteTenantImpl(tr, beginTenant, endTenant, tenants, operationType, self),
deterministicRandom()->randomInt(1, 30)));
@ -780,8 +764,8 @@ struct TenantManagementWorkload : TestWorkload {
if (result.present()) {
if (anyExists) {
if (self->oldestDeletionVersion == 0 && !tenants.empty()) {
tr->reset();
Version afterVersion = wait(self->getReadVersion(tr));
Version afterVersion =
wait(self->getLatestReadVersion(self, OperationType::MANAGEMENT_DATABASE));
self->oldestDeletionVersion = afterVersion;
}
self->newestDeletionVersion = beforeVersion;

View File

@ -34,7 +34,7 @@ class TransactionCostWorkload : public TestWorkload {
return bw.toValue().withPrefix(prefix);
}
static Value getValue(uint32_t size) { return makeString(size); }
static Value getValue(uint32_t size) { return ValueRef(std::string(size, '\x00')); }
static UID getDebugID(uint64_t testNumber) { return UID(testNumber << 32, testNumber << 32); }

View File

@ -25,6 +25,7 @@
#include "flow/JsonTraceLogFormatter.h"
#include "flow/flow.h"
#include "flow/DeterministicRandom.h"
#include <exception>
#include <stdlib.h>
#include <stdarg.h>
#include <cctype>
@ -514,6 +515,7 @@ public:
void close() {
if (opened) {
try {
MutexHolder hold(mutex);
// Write remaining contents
@ -533,6 +535,9 @@ public:
f.getBlocking();
opened = false;
} catch (const std::exception& e) {
fprintf(stderr, "Error closing trace file: %s\n", e.what());
}
}
}
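The change above takes the file mutex before flushing and keeps close() from throwing: any failure during the final flush is reported to stderr instead of escaping the shutdown path. A minimal sketch of that shape (TraceFileSketch and flushRemaining are simplified stand-ins):

#include <cstdio>
#include <exception>
#include <mutex>

class TraceFileSketch {
    std::mutex mutex;
    bool opened = true;

    void flushRemaining() {} // stand-in for writing out buffered contents

public:
    // close() must never throw: report flush failures and carry on.
    void close() {
        if (!opened) {
            return;
        }
        try {
            std::lock_guard<std::mutex> hold(mutex);
            flushRemaining();
            opened = false;
        } catch (const std::exception& e) {
            fprintf(stderr, "Error closing trace file: %s\n", e.what());
        }
    }
};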

View File

@ -178,7 +178,7 @@ RUN yum -y install \
rm -rf /var/cache/yum
WORKDIR /tmp
RUN curl -Ls https://s3.us-west-2.amazonaws.com/amazon-eks/1.22.6/2022-03-09/bin/linux/amd64/kubectl -o kubectl && \
RUN NO_PROXY="" no_proxy="" curl -Ls https://s3.us-west-2.amazonaws.com/amazon-eks/1.22.6/2022-03-09/bin/linux/amd64/kubectl -o kubectl && \
echo "860c3d37a5979491895767e7332404d28dc0d7797c7673c33df30ca80e215a07 kubectl" > kubectl.txt && \
sha256sum --quiet -c kubectl.txt && \
mv kubectl /usr/local/bin/kubectl && \

View File

@ -53,7 +53,7 @@ RUN curl -Ls https://github.com/krallin/tini/releases/download/v0.19.0/tini-amd6
mv tini /usr/bin/ && \
rm -rf /tmp/*
RUN curl -Ls https://s3.us-west-2.amazonaws.com/amazon-eks/1.22.6/2022-03-09/bin/linux/amd64/kubectl -o kubectl && \
RUN NO_PROXY="" no_proxy="" curl -Ls https://s3.us-west-2.amazonaws.com/amazon-eks/1.22.6/2022-03-09/bin/linux/amd64/kubectl -o kubectl && \
echo "860c3d37a5979491895767e7332404d28dc0d7797c7673c33df30ca80e215a07 kubectl" > kubectl.txt && \
sha256sum --quiet -c kubectl.txt && \
mv kubectl /usr/local/bin/kubectl && \

View File

@ -239,7 +239,7 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES rare/RedwoodCorrectnessBTree.toml)
add_fdb_test(TEST_FILES rare/RedwoodDeltaTree.toml)
add_fdb_test(TEST_FILES rare/Throttling.toml)
add_fdb_test(TEST_FILES rare/ThroughputQuota.toml)
add_fdb_test(TEST_FILES rare/ThroughputQuota.toml IGNORE)
add_fdb_test(TEST_FILES rare/TransactionCost.toml)
add_fdb_test(TEST_FILES rare/TransactionTagApiCorrectness.toml)
add_fdb_test(TEST_FILES rare/TransactionTagSwizzledApiCorrectness.toml)

View File

@ -280,11 +280,13 @@ class UpgradeTest:
os.close(self.ctrl_pipe)
# Kill the tester process if it is still alive
def kill_tester_if_alive(self, workload_thread):
def kill_tester_if_alive(self, workload_thread, dump_stacks):
if not workload_thread.is_alive():
return
if self.tester_proc is not None:
try:
if dump_stacks:
os.system("pstack {}".format(self.tester_proc.pid))
print("Killing the tester process")
self.tester_proc.kill()
workload_thread.join(5)
@ -310,11 +312,11 @@ class UpgradeTest:
except Exception:
print("Upgrade test failed")
print(traceback.format_exc())
self.kill_tester_if_alive(workload_thread)
self.kill_tester_if_alive(workload_thread, False)
finally:
workload_thread.join(5)
reader_thread.join(5)
self.kill_tester_if_alive(workload_thread)
self.kill_tester_if_alive(workload_thread, True)
if test_retcode == 0:
test_retcode = self.tester_retcode
return test_retcode