Merge branch 'main' of github.com:sfc-gh-nwijetunga/foundationdb into nim/tenant-encryption-property

* 'main' of github.com:sfc-gh-nwijetunga/foundationdb: (42 commits)
  Get ShardedRocks ready for simulation test. (#7679)
  fixing specific unit test
  formatting
  addressing review comments
  Fix incorrect deserialization of FdbClientLogEvents::Event (#7707)
  Fix a crash bug during CC shutdown process (#7705)
  addressing review comments
  remove runAfter
  Add comments explaining the use of the TransactionState tenant() and hasTenant() functions
  Make sure resumeFromDataMoves() starts after resumeFromShards().
  Fix: during recovery, it was possible for tenant operations to briefly fail because the tenant mode is not known
  formatting
  Fixed granule purging bug and improved debugging for purging
  making purge failures fail test
  cleanup and polish
  Bug fix and cleanup
  First version of key-sorted delta files
  Added full granule read unit test
  Completed delta format unit test
  delta file test and delta generation
  ...
Author: Nim Wijetunga
Date:   2022-07-27 08:13:30 -07:00
Commit: 50391c35b1
38 changed files with 2746 additions and 596 deletions

View File

@ -194,7 +194,7 @@ class BaseInfo(object):
if protocol_version >= PROTOCOL_VERSION_6_3:
self.dc_id = bb.get_bytes_with_length()
if protocol_version >= PROTOCOL_VERSION_7_1:
if bb.get_bytes(1):
if bb.get_bool():
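# Editor's note: bb.get_bytes(1) returned a one-byte bytes object, which is truthy even for b"\x00"; get_bool() decodes the flag's value instead of its presence.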
self.tenant = bb.get_bytes_with_length()
class GetVersionInfo(BaseInfo):

File diff suppressed because it is too large.

View File

@ -3234,13 +3234,26 @@ TenantInfo TransactionState::getTenantInfo() {
} else if (!t.present()) {
return TenantInfo();
} else if (cx->clientInfo->get().tenantMode == TenantMode::DISABLED && t.present()) {
// If we are running provisional proxies, we allow a tenant request to go through since we don't know the tenant
// mode. Such a transaction would not be allowed to commit without enabling provisional commits because either
// the commit proxies will be provisional or the read version will be too old.
if (!cx->clientInfo->get().grvProxies.empty() && !cx->clientInfo->get().grvProxies[0].provisional) {
throw tenants_disabled();
} else {
ASSERT(!useProvisionalProxies);
}
}
ASSERT(tenantId != TenantInfo::INVALID_TENANT);
return TenantInfo(t.get(), tenantId);
}
// Returns the tenant used in this transaction. If the tenant is unset and raw access isn't specified, then the default
// tenant from DatabaseContext is applied to this transaction (note: the default tenant is typically unset, but in
// simulation could be something different).
//
// This function should not be called in the transaction constructor or in the setOption function to allow a user the
// opportunity to set raw access.
Optional<TenantName> const& TransactionState::tenant() {
if (tenantSet) {
return tenant_;
@ -3253,6 +3266,9 @@ Optional<TenantName> const& TransactionState::tenant() {
}
}
// Returns true if the tenant has been set, but does not cause default tenant resolution. This is useful in setOption
// (where we do not want to call tenant()) if we want to enforce that an option not be set on a Tenant transaction (e.g.
// for raw access).
bool TransactionState::hasTenant() const {
return tenantSet && tenant_.present();
}
@ -6570,6 +6586,11 @@ void Transaction::setOption(FDBTransactionOptions::Option option, Optional<Strin
case FDBTransactionOptions::USE_PROVISIONAL_PROXIES:
validateOptionValueNotPresent(value);
if (trState->hasTenant()) {
Error e = invalid_option();
TraceEvent(SevWarn, "TenantTransactionUseProvisionalProxies").error(e).detail("Tenant", trState->tenant());
throw e;
}
trState->options.getReadVersionFlags |= GetReadVersionRequest::FLAG_USE_PROVISIONAL_PROXIES;
trState->useProvisionalProxies = UseProvisionalProxies::True;
break;
@ -9388,8 +9409,17 @@ Future<Void> DatabaseContext::getChangeFeedStream(Reference<ChangeFeedData> resu
Reference<DatabaseContext>::addRef(this), results, rangeID, begin, end, range, replyBufferSize, canReadPopped);
}
ACTOR Future<std::vector<OverlappingChangeFeedEntry>> singleLocationOverlappingChangeFeeds(
Database cx,
Version OverlappingChangeFeedsInfo::getFeedMetadataVersion(const KeyRangeRef& range) const {
Version v = invalidVersion;
for (auto& it : feedMetadataVersions) {
if (it.second > v && it.first.intersects(range)) {
v = it.second;
}
}
return v;
}
ACTOR Future<OverlappingChangeFeedsReply> singleLocationOverlappingChangeFeeds(Database cx,
Reference<LocationInfo> location,
KeyRangeRef range,
Version minVersion) {
@ -9404,14 +9434,14 @@ ACTOR Future<std::vector<OverlappingChangeFeedEntry>> singleLocationOverlappingC
TaskPriority::DefaultPromiseEndpoint,
AtMostOnce::False,
cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr));
return rep.rangeIds;
return rep;
}
bool compareChangeFeedResult(const OverlappingChangeFeedEntry& i, const OverlappingChangeFeedEntry& j) {
return i.rangeId < j.rangeId;
return i.feedId < j.feedId;
}
ACTOR Future<std::vector<OverlappingChangeFeedEntry>> getOverlappingChangeFeedsActor(Reference<DatabaseContext> db,
ACTOR Future<OverlappingChangeFeedsInfo> getOverlappingChangeFeedsActor(Reference<DatabaseContext> db,
KeyRangeRef range,
Version minVersion) {
state Database cx(db);
@ -9439,19 +9469,33 @@ ACTOR Future<std::vector<OverlappingChangeFeedEntry>> getOverlappingChangeFeedsA
throw all_alternatives_failed();
}
state std::vector<Future<std::vector<OverlappingChangeFeedEntry>>> allOverlappingRequests;
state std::vector<Future<OverlappingChangeFeedsReply>> allOverlappingRequests;
for (auto& it : locations) {
allOverlappingRequests.push_back(
singleLocationOverlappingChangeFeeds(cx, it.locations, it.range & range, minVersion));
}
wait(waitForAll(allOverlappingRequests));
std::vector<OverlappingChangeFeedEntry> result;
for (auto& it : allOverlappingRequests) {
result.insert(result.end(), it.get().begin(), it.get().end());
OverlappingChangeFeedsInfo result;
std::unordered_map<KeyRef, OverlappingChangeFeedEntry> latestFeedMetadata;
for (int i = 0; i < locations.size(); i++) {
result.arena.dependsOn(allOverlappingRequests[i].get().arena);
result.arena.dependsOn(locations[i].range.arena());
result.feedMetadataVersions.push_back(
{ locations[i].range, allOverlappingRequests[i].get().feedMetadataVersion });
for (auto& it : allOverlappingRequests[i].get().feeds) {
auto res = latestFeedMetadata.insert({ it.feedId, it });
if (!res.second) {
CODE_PROBE(true, "deduping fetched overlapping feed by higher metadata version");
if (res.first->second.feedMetadataVersion < it.feedMetadataVersion) {
res.first->second = it;
}
}
}
}
for (auto& it : latestFeedMetadata) {
result.feeds.push_back(result.arena, it.second);
}
std::sort(result.begin(), result.end(), compareChangeFeedResult);
result.resize(std::unique(result.begin(), result.end()) - result.begin());
return result;
} catch (Error& e) {
if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed) {
@ -9464,8 +9508,7 @@ ACTOR Future<std::vector<OverlappingChangeFeedEntry>> getOverlappingChangeFeedsA
}
}
Future<std::vector<OverlappingChangeFeedEntry>> DatabaseContext::getOverlappingChangeFeeds(KeyRangeRef range,
Version minVersion) {
Future<OverlappingChangeFeedsInfo> DatabaseContext::getOverlappingChangeFeeds(KeyRangeRef range, Version minVersion) {
return getOverlappingChangeFeedsActor(Reference<DatabaseContext>::addRef(this), range, minVersion);
}
@ -9589,7 +9632,7 @@ ACTOR Future<Key> purgeBlobGranulesActor(Reference<DatabaseContext> db,
state bool loadedTenantPrefix = false;
// FIXME: implement force
if (!force) {
if (force) {
throw unsupported_operation();
}
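A minimal, self-contained sketch of the deduplication rule used above in getOverlappingChangeFeedsActor: when several storage locations report the same feed, the entry with the highest feedMetadataVersion wins. std::string and int64_t are stand-ins for KeyRef and Version; this is an illustration, not FDB source.

    #include <cstdint>
    #include <string>
    #include <unordered_map>
    #include <vector>

    struct FeedEntry {
        std::string feedId;      // stand-in for KeyRef
        int64_t metadataVersion; // stand-in for Version
    };

    std::vector<FeedEntry> dedupByNewestMetadata(const std::vector<std::vector<FeedEntry>>& replies) {
        std::unordered_map<std::string, FeedEntry> latest;
        for (const auto& reply : replies) {
            for (const auto& e : reply) {
                auto res = latest.insert({ e.feedId, e });
                // If the feed was already reported by another location, keep
                // whichever entry carries the newer metadata version.
                if (!res.second && res.first->second.metadataVersion < e.metadataVersion) {
                    res.first->second = e;
                }
            }
        }
        std::vector<FeedEntry> out;
        out.reserve(latest.size());
        for (const auto& kv : latest) {
            out.push_back(kv.second);
        }
        return out;
    }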

View File

@ -701,8 +701,9 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( FETCH_BLOCK_BYTES, 2e6 );
init( FETCH_KEYS_PARALLELISM_BYTES, 4e6 ); if( randomize && BUGGIFY ) FETCH_KEYS_PARALLELISM_BYTES = 3e6;
init( FETCH_KEYS_PARALLELISM, 2 );
init( FETCH_KEYS_PARALLELISM_FULL, 10 );
init( FETCH_KEYS_LOWER_PRIORITY, 0 );
init( FETCH_CHANGEFEED_PARALLELISM, 2 );
init( FETCH_CHANGEFEED_PARALLELISM, 4 );
init( SERVE_FETCH_CHECKPOINT_PARALLELISM, 4 );
init( BUGGIFY_BLOCK_BYTES, 10000 );
init( STORAGE_RECOVERY_VERSION_LAG_LIMIT, 2 * MAX_READ_TRANSACTION_LIFE_VERSIONS );
@ -907,11 +908,13 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
// BlobGranuleVerify* simulation tests use "blobRangeKeys", BlobGranuleCorrectness* use "tenant", default in real clusters is "tenant"
init( BG_RANGE_SOURCE, "tenant" );
// BlobGranuleVerify* simulation tests use "knobs", BlobGranuleCorrectness* use "tenant", default in real clusters is "knobs"
bool buggifyMediumGranules = simulationMediumShards || (randomize && BUGGIFY);
init( BG_METADATA_SOURCE, "knobs" );
init( BG_SNAPSHOT_FILE_TARGET_BYTES, 10000000 ); if( buggifySmallShards ) BG_SNAPSHOT_FILE_TARGET_BYTES = 100000; else if (simulationMediumShards || (randomize && BUGGIFY) ) BG_SNAPSHOT_FILE_TARGET_BYTES = 1000000;
init( BG_SNAPSHOT_FILE_TARGET_CHUNKS, 100 ); if ( randomize && BUGGIFY ) BG_SNAPSHOT_FILE_TARGET_CHUNKS = 1 << deterministicRandom()->randomInt(0, 8);
init( BG_SNAPSHOT_FILE_TARGET_BYTES, 10000000 ); if( buggifySmallShards ) BG_SNAPSHOT_FILE_TARGET_BYTES = 100000; else if (buggifyMediumGranules) BG_SNAPSHOT_FILE_TARGET_BYTES = 1000000;
init( BG_SNAPSHOT_FILE_TARGET_CHUNK_BYTES, 64*1024 ); if ( randomize && BUGGIFY ) BG_SNAPSHOT_FILE_TARGET_CHUNK_BYTES = BG_SNAPSHOT_FILE_TARGET_BYTES / (1 << deterministicRandom()->randomInt(0, 8));
init( BG_DELTA_BYTES_BEFORE_COMPACT, BG_SNAPSHOT_FILE_TARGET_BYTES/2 );
init( BG_DELTA_FILE_TARGET_BYTES, BG_DELTA_BYTES_BEFORE_COMPACT/10 );
init( BG_DELTA_FILE_TARGET_CHUNK_BYTES, 64*1024 ); if ( randomize && BUGGIFY ) BG_DELTA_FILE_TARGET_CHUNK_BYTES = BG_DELTA_FILE_TARGET_BYTES / (1 << deterministicRandom()->randomInt(0, 7));
init( BG_MAX_SPLIT_FANOUT, 10 ); if( randomize && BUGGIFY ) BG_MAX_SPLIT_FANOUT = deterministicRandom()->randomInt(5, 15);
init( BG_MAX_MERGE_FANIN, 10 ); if( randomize && BUGGIFY ) BG_MAX_MERGE_FANIN = deterministicRandom()->randomInt(2, 15);
init( BG_HOT_SNAPSHOT_VERSIONS, 5000000 );
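The snapshot and delta file knobs above move from a chunk count (BG_SNAPSHOT_FILE_TARGET_CHUNKS) to a chunk byte target, and under BUGGIFY the chunk size is derived by dividing the file target by a random power of two. A sketch of that derivation, with std::mt19937 standing in for FDB's deterministicRandom():

    #include <cstdint>
    #include <random>

    // BUGGIFY-style randomization: chunk bytes = file target / 2^k. The snapshot
    // knob draws k from [0, 8) and the delta knob from [0, 7), so tests exercise
    // anywhere from 1 to 128 chunks per file.
    int64_t randomizedChunkBytes(int64_t fileTargetBytes, int maxExp, std::mt19937& rng) {
        std::uniform_int_distribution<int> k(0, maxExp - 1); // randomInt is exclusive of maxExp
        return fileTargetBytes / (int64_t(1) << k(rng));
    }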

View File

@ -46,6 +46,7 @@ struct GranuleSnapshot : VectorRef<KeyValueRef> {
}
};
// Deltas in version order
struct GranuleDeltas : VectorRef<MutationsAndVersionRef> {
constexpr static FileIdentifier file_identifier = 8563013;

View File

@ -27,11 +27,15 @@
#include "flow/CompressionUtils.h"
Value serializeChunkedSnapshot(Standalone<GranuleSnapshot> snapshot,
int chunks,
int chunkSize,
Optional<CompressionFilter> compressFilter,
Optional<BlobGranuleCipherKeysCtx> cipherKeysCtx = Optional<BlobGranuleCipherKeysCtx>());
Optional<BlobGranuleCipherKeysCtx> cipherKeysCtx = {});
// FIXME: support sorted and chunked delta files
Value serializeChunkedDeltaFile(Standalone<GranuleDeltas> deltas,
const KeyRangeRef& fileRange,
int chunkSize,
Optional<CompressionFilter> compressFilter,
Optional<BlobGranuleCipherKeysCtx> cipherKeysCtx = {});
ErrorOr<RangeResult> loadAndMaterializeBlobGranules(const Standalone<VectorRef<BlobGranuleChunkRef>>& files,
const KeyRangeRef& keyRange,

View File

@ -207,6 +207,16 @@ struct KeyRangeLocationInfo {
: tenantEntry(tenantEntry), range(range), locations(locations) {}
};
struct OverlappingChangeFeedsInfo {
Arena arena;
VectorRef<OverlappingChangeFeedEntry> feeds;
// would prefer to use key range map but it complicates copy/move constructors
std::vector<std::pair<KeyRangeRef, Version>> feedMetadataVersions;
// for a feed that wasn't present, returns the metadata version it would have been fetched at.
Version getFeedMetadataVersion(const KeyRangeRef& feedRange) const;
};
class DatabaseContext : public ReferenceCounted<DatabaseContext>, public FastAllocated<DatabaseContext>, NonCopyable {
public:
static DatabaseContext* allocateOnForeignThread() {
@ -361,7 +371,7 @@ public:
int replyBufferSize = -1,
bool canReadPopped = true);
Future<std::vector<OverlappingChangeFeedEntry>> getOverlappingChangeFeeds(KeyRangeRef ranges, Version minVersion);
Future<OverlappingChangeFeedsInfo> getOverlappingChangeFeeds(KeyRangeRef ranges, Version minVersion);
Future<Void> popChangeFeedMutations(Key rangeID, Version version);
Future<Key> purgeBlobGranules(KeyRange keyRange,
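OverlappingChangeFeedsInfo (above) keeps one (range, version) pair per queried location; getFeedMetadataVersion answers how fresh the fetched metadata is for a range by taking the maximum version among intersecting entries, so a caller can tell whether the absence of a feed is authoritative. A self-contained sketch of that lookup; Range and int64_t are simplified stand-ins for KeyRangeRef and Version:

    #include <cstdint>
    #include <string>
    #include <utility>
    #include <vector>

    struct Range {
        std::string begin, end;
        // Half-open interval overlap: [begin, end) intersects [r.begin, r.end).
        bool intersects(const Range& r) const { return begin < r.end && r.begin < end; }
    };

    int64_t getFeedMetadataVersion(const std::vector<std::pair<Range, int64_t>>& feedMetadataVersions,
                                   const Range& feedRange) {
        int64_t v = -1; // stand-in for invalidVersion
        for (const auto& [range, version] : feedMetadataVersions) {
            if (version > v && range.intersects(feedRange)) {
                v = version;
            }
        }
        return v;
    }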

View File

@ -659,6 +659,7 @@ public:
int FETCH_BLOCK_BYTES;
int FETCH_KEYS_PARALLELISM_BYTES;
int FETCH_KEYS_PARALLELISM;
int FETCH_KEYS_PARALLELISM_FULL;
int FETCH_KEYS_LOWER_PRIORITY;
int FETCH_CHANGEFEED_PARALLELISM;
int SERVE_FETCH_CHECKPOINT_PARALLELISM;
@ -887,8 +888,9 @@ public:
std::string BG_METADATA_SOURCE;
int BG_SNAPSHOT_FILE_TARGET_BYTES;
int BG_SNAPSHOT_FILE_TARGET_CHUNKS;
int BG_SNAPSHOT_FILE_TARGET_CHUNK_BYTES;
int BG_DELTA_FILE_TARGET_BYTES;
int BG_DELTA_FILE_TARGET_CHUNK_BYTES;
int BG_DELTA_BYTES_BEFORE_COMPACT;
int BG_MAX_SPLIT_FANOUT;
int BG_MAX_MERGE_FANIN;

View File

@ -970,39 +970,51 @@ struct FetchCheckpointKeyValuesRequest {
};
struct OverlappingChangeFeedEntry {
Key rangeId;
KeyRange range;
KeyRef feedId;
KeyRangeRef range;
Version emptyVersion;
Version stopVersion;
Version feedMetadataVersion;
bool operator==(const OverlappingChangeFeedEntry& r) const {
return rangeId == r.rangeId && range == r.range && emptyVersion == r.emptyVersion &&
stopVersion == r.stopVersion;
return feedId == r.feedId && range == r.range && emptyVersion == r.emptyVersion &&
stopVersion == r.stopVersion && feedMetadataVersion == r.feedMetadataVersion;
}
OverlappingChangeFeedEntry() {}
OverlappingChangeFeedEntry(Key const& rangeId, KeyRange const& range, Version emptyVersion, Version stopVersion)
: rangeId(rangeId), range(range), emptyVersion(emptyVersion), stopVersion(stopVersion) {}
OverlappingChangeFeedEntry(KeyRef const& feedId,
KeyRangeRef const& range,
Version emptyVersion,
Version stopVersion,
Version feedMetadataVersion)
: feedId(feedId), range(range), emptyVersion(emptyVersion), stopVersion(stopVersion),
feedMetadataVersion(feedMetadataVersion) {}
OverlappingChangeFeedEntry(Arena& arena, const OverlappingChangeFeedEntry& rhs)
: feedId(arena, rhs.feedId), range(arena, rhs.range), emptyVersion(rhs.emptyVersion),
stopVersion(rhs.stopVersion), feedMetadataVersion(rhs.feedMetadataVersion) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, rangeId, range, emptyVersion, stopVersion);
serializer(ar, feedId, range, emptyVersion, stopVersion, feedMetadataVersion);
}
};
struct OverlappingChangeFeedsReply {
constexpr static FileIdentifier file_identifier = 11815134;
std::vector<OverlappingChangeFeedEntry> rangeIds;
VectorRef<OverlappingChangeFeedEntry> feeds;
bool cached;
Arena arena;
Version feedMetadataVersion;
OverlappingChangeFeedsReply() : cached(false) {}
explicit OverlappingChangeFeedsReply(std::vector<OverlappingChangeFeedEntry> const& rangeIds)
: rangeIds(rangeIds), cached(false) {}
OverlappingChangeFeedsReply() : cached(false), feedMetadataVersion(invalidVersion) {}
explicit OverlappingChangeFeedsReply(VectorRef<OverlappingChangeFeedEntry> const& feeds,
Version feedMetadataVersion)
: feeds(feeds), cached(false), feedMetadataVersion(feedMetadataVersion) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, rangeIds, arena);
serializer(ar, feeds, arena, feedMetadataVersion);
}
};
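OverlappingChangeFeedEntry now holds KeyRef and KeyRangeRef views instead of owning Key and KeyRange values, which is why OverlappingChangeFeedsReply carries an Arena and the entry gains an (Arena&, rhs) copy constructor. A toy illustration of that borrow-then-rehome pattern; this Arena is a simplified stand-in for flow's Arena, not its real API:

    #include <cstdint>
    #include <deque>
    #include <string>
    #include <string_view>

    struct Arena { // toy stand-in: owns string storage with stable addresses
        std::deque<std::string> storage;
        std::string_view copy(std::string_view s) {
            storage.emplace_back(s);
            return storage.back();
        }
    };

    struct Entry {
        std::string_view feedId; // stand-in for KeyRef: a borrowed view, not an owner
        int64_t metadataVersion = 0;

        Entry() = default;
        // Deep copy: re-anchor the borrowed bytes in the destination arena so
        // the copy stays valid after the source arena is freed.
        Entry(Arena& arena, const Entry& rhs)
          : feedId(arena.copy(rhs.feedId)), metadataVersion(rhs.metadataVersion) {}
    };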

View File

@ -23,6 +23,7 @@
#include "flow/ProtocolVersion.h"
#include <algorithm>
#include <string>
#include <limits>
#pragma once
#include "flow/flow.h"
@ -469,6 +470,8 @@ public:
bool setDiffProtocol; // true if a process with a different protocol version has been started
bool allowStorageMigrationTypeChange = false;
double injectTargetedSSRestartTime = std::numeric_limits<double>::max();
double injectSSDelayTime = std::numeric_limits<double>::max();
flowGlobalType global(int id) const final { return getCurrentProcess()->global(id); };
void setGlobal(size_t id, flowGlobalType v) final { getCurrentProcess()->setGlobal(id, v); };

View File

@ -143,8 +143,15 @@ bool compareFDBAndBlob(RangeResult fdb,
}
}
printGranuleChunks(blob.second);
}
}
return correct;
}
void printGranuleChunks(const Standalone<VectorRef<BlobGranuleChunkRef>>& chunks) {
printf("Chunks:\n");
for (auto& chunk : blob.second) {
for (auto& chunk : chunks) {
printf("[%s - %s)\n", chunk.keyRange.begin.printable().c_str(), chunk.keyRange.end.printable().c_str());
printf(" SnapshotFile:\n %s\n",
@ -163,9 +170,6 @@ bool compareFDBAndBlob(RangeResult fdb,
}
printf("\n");
}
}
return correct;
}
ACTOR Future<Void> clearAndAwaitMerge(Database cx, KeyRange range) {
// clear key range and check whether it is merged or not, repeatedly

View File

@ -52,6 +52,7 @@
*/
#define BM_DEBUG false
#define BM_PURGE_DEBUG false
void handleClientBlobRange(KeyRangeMap<bool>* knownBlobRanges,
Arena& ar,
@ -1649,7 +1650,9 @@ ACTOR Future<Void> persistMergeGranulesDone(Reference<BlobManagerData> bmData,
state Key lockKey = blobGranuleLockKeyFor(parentRange);
state Future<Optional<Value>> oldLockFuture = tr->get(lockKey);
wait(updateChangeFeed(tr,
// This has to be
// TODO: fix this better! (privatize change feed key clear)
wait(updateChangeFeed(&tr->getTransaction(),
granuleIDToCFKey(parentGranuleIDs[parentIdx]),
ChangeFeedStatus::CHANGE_FEED_DESTROY,
parentRange));
@ -3168,8 +3171,8 @@ ACTOR Future<Void> fullyDeleteGranule(Reference<BlobManagerData> self,
Key historyKey,
Version purgeVersion,
KeyRange granuleRange) {
if (BM_DEBUG) {
fmt::print("Fully deleting granule {0}: init\n", granuleId.toString());
if (BM_PURGE_DEBUG) {
fmt::print("BM {0} Fully deleting granule {1}: init\n", self->epoch, granuleId.toString());
}
// if granule is still splitting and files are needed for new sub-granules to re-snapshot, we can only partially
@ -3195,8 +3198,11 @@ ACTOR Future<Void> fullyDeleteGranule(Reference<BlobManagerData> self,
filesToDelete.emplace_back(fname);
}
if (BM_DEBUG) {
fmt::print("Fully deleting granule {0}: deleting {1} files\n", granuleId.toString(), filesToDelete.size());
if (BM_PURGE_DEBUG) {
fmt::print("BM {0} Fully deleting granule {1}: deleting {2} files\n",
self->epoch,
granuleId.toString(),
filesToDelete.size());
for (auto filename : filesToDelete) {
fmt::print(" - {}\n", filename.c_str());
}
@ -3209,8 +3215,9 @@ ACTOR Future<Void> fullyDeleteGranule(Reference<BlobManagerData> self,
wait(waitForAll(deletions));
// delete metadata in FDB (history entry and file keys)
if (BM_DEBUG) {
fmt::print("Fully deleting granule {0}: deleting history and file keys\n", granuleId.toString());
if (BM_PURGE_DEBUG) {
fmt::print(
"BM {0} Fully deleting granule {1}: deleting history and file keys\n", self->epoch, granuleId.toString());
}
state Transaction tr(self->db);
@ -3229,8 +3236,8 @@ ACTOR Future<Void> fullyDeleteGranule(Reference<BlobManagerData> self,
}
}
if (BM_DEBUG) {
fmt::print("Fully deleting granule {0}: success\n", granuleId.toString());
if (BM_PURGE_DEBUG) {
fmt::print("BM {0} Fully deleting granule {1}: success\n", self->epoch, granuleId.toString());
}
TraceEvent("GranuleFullPurge", self->id)
@ -3242,6 +3249,8 @@ ACTOR Future<Void> fullyDeleteGranule(Reference<BlobManagerData> self,
++self->stats.granulesFullyPurged;
self->stats.filesPurged += filesToDelete.size();
CODE_PROBE(true, "full granule purged");
return Void();
}
@ -3257,8 +3266,8 @@ ACTOR Future<Void> partiallyDeleteGranule(Reference<BlobManagerData> self,
UID granuleId,
Version purgeVersion,
KeyRange granuleRange) {
if (BM_DEBUG) {
fmt::print("Partially deleting granule {0}: init\n", granuleId.toString());
if (BM_PURGE_DEBUG) {
fmt::print("BM {0} Partially deleting granule {1}: init\n", self->epoch, granuleId.toString());
}
state Reference<BlobConnectionProvider> bstore = wait(getBStoreForGranule(self, granuleRange));
@ -3307,8 +3316,11 @@ ACTOR Future<Void> partiallyDeleteGranule(Reference<BlobManagerData> self,
filesToDelete.emplace_back(fname);
}
if (BM_DEBUG) {
fmt::print("Partially deleting granule {0}: deleting {1} files\n", granuleId.toString(), filesToDelete.size());
if (BM_PURGE_DEBUG) {
fmt::print("BM {0} Partially deleting granule {1}: deleting {2} files\n",
self->epoch,
granuleId.toString(),
filesToDelete.size());
for (auto filename : filesToDelete) {
fmt::print(" - {0}\n", filename);
}
@ -3325,8 +3337,8 @@ ACTOR Future<Void> partiallyDeleteGranule(Reference<BlobManagerData> self,
wait(waitForAll(deletions));
// delete metadata in FDB (deleted file keys)
if (BM_DEBUG) {
fmt::print("Partially deleting granule {0}: deleting file keys\n", granuleId.toString());
if (BM_PURGE_DEBUG) {
fmt::print("BM {0} Partially deleting granule {1}: deleting file keys\n", self->epoch, granuleId.toString());
}
state Transaction tr(self->db);
@ -3345,8 +3357,8 @@ ACTOR Future<Void> partiallyDeleteGranule(Reference<BlobManagerData> self,
}
}
if (BM_DEBUG) {
fmt::print("Partially deleting granule {0}: success\n", granuleId.toString());
if (BM_PURGE_DEBUG) {
fmt::print("BM {0} Partially deleting granule {1}: success\n", self->epoch, granuleId.toString());
}
TraceEvent("GranulePartialPurge", self->id)
.detail("Epoch", self->epoch)
@ -3357,6 +3369,8 @@ ACTOR Future<Void> partiallyDeleteGranule(Reference<BlobManagerData> self,
++self->stats.granulesPartiallyPurged;
self->stats.filesPurged += filesToDelete.size();
CODE_PROBE(true, " partial granule purged");
return Void();
}
@ -3369,8 +3383,9 @@ ACTOR Future<Void> partiallyDeleteGranule(Reference<BlobManagerData> self,
* processing this purge intent.
*/
ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range, Version purgeVersion, bool force) {
if (BM_DEBUG) {
fmt::print("purgeRange starting for range [{0} - {1}) @ purgeVersion={2}, force={3}\n",
if (BM_PURGE_DEBUG) {
fmt::print("BM {0} purgeRange starting for range [{1} - {2}) @ purgeVersion={3}, force={4}\n",
self->epoch,
range.begin.printable(),
range.end.printable(),
purgeVersion,
@ -3392,8 +3407,7 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
// track which granules we have already added to traversal
// note: (startKey, startVersion) uniquely identifies a granule
state std::unordered_set<std::pair<const uint8_t*, Version>, boost::hash<std::pair<const uint8_t*, Version>>>
visited;
state std::unordered_set<std::pair<std::string, Version>, boost::hash<std::pair<std::string, Version>>> visited;
// find all active granules (that comprise the range) and add to the queue
state KeyRangeMap<UID>::Ranges activeRanges = self->workerAssignments.intersectingRanges(range);
@ -3404,8 +3418,9 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
state KeyRangeMap<UID>::iterator activeRange;
for (activeRange = activeRanges.begin(); activeRange != activeRanges.end(); ++activeRange) {
if (BM_DEBUG) {
fmt::print("Checking if active range [{0} - {1}), owned by BW {2}, should be purged\n",
if (BM_PURGE_DEBUG) {
fmt::print("BM {0} Checking if active range [{1} - {2}), owned by BW {3}, should be purged\n",
self->epoch,
activeRange.begin().printable(),
activeRange.end().printable(),
activeRange.value().toString());
@ -3413,6 +3428,10 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
// assumption: purge boundaries must respect granule boundaries
if (activeRange.begin() < range.begin || activeRange.end() > range.end) {
TraceEvent(SevWarn, "GranulePurgeRangesUnaligned", self->id)
.detail("Epoch", self->epoch)
.detail("PurgeRange", range)
.detail("GranuleRange", activeRange.range());
continue;
}
@ -3422,20 +3441,29 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
loop {
try {
if (BM_DEBUG) {
fmt::print("Fetching latest history entry for range [{0} - {1})\n",
if (BM_PURGE_DEBUG) {
fmt::print("BM {0} Fetching latest history entry for range [{1} - {2})\n",
self->epoch,
activeRange.begin().printable(),
activeRange.end().printable());
}
// FIXME: doing this serially will likely be too slow for large purges
Optional<GranuleHistory> history = wait(getLatestGranuleHistory(&tr, activeRange.range()));
// TODO: can we tell from the krm that this range is not valid, so that we don't need to do a
// get
if (history.present()) {
if (BM_DEBUG) {
printf("Adding range to history queue\n");
if (BM_PURGE_DEBUG) {
fmt::print("BM {0} Adding range to history queue: [{1} - {2}) @ {3} ({4})\n",
self->epoch,
activeRange.begin().printable(),
activeRange.end().printable(),
history.get().version,
(void*)(activeRange.range().begin.begin()));
}
visited.insert({ activeRange.range().begin.begin(), history.get().version });
visited.insert({ activeRange.range().begin.toString(), history.get().version });
historyEntryQueue.push({ activeRange.range(), history.get().version, MAX_VERSION });
} else if (BM_PURGE_DEBUG) {
fmt::print("BM {0} No history for range, ignoring\n", self->epoch);
}
break;
} catch (Error& e) {
@ -3444,8 +3472,12 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
}
}
if (BM_DEBUG) {
printf("Beginning BFS traversal of history\n");
if (BM_PURGE_DEBUG) {
fmt::print("BM {0} Beginning BFS traversal of {1} history items for range [{2} - {3}) \n",
self->epoch,
historyEntryQueue.size(),
range.begin.printable(),
range.end.printable());
}
while (!historyEntryQueue.empty()) {
// process the node at the front of the queue and remove it
@ -3455,8 +3487,9 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
std::tie(currRange, startVersion, endVersion) = historyEntryQueue.front();
historyEntryQueue.pop();
if (BM_DEBUG) {
fmt::print("Processing history node [{0} - {1}) with versions [{2}, {3})\n",
if (BM_PURGE_DEBUG) {
fmt::print("BM {0} Processing history node [{1} - {2}) with versions [{3}, {4})\n",
self->epoch,
currRange.begin.printable(),
currRange.end.printable(),
startVersion,
@ -3481,11 +3514,15 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
}
if (!foundHistory) {
if (BM_PURGE_DEBUG) {
fmt::print("BM {0} No history for this node, skipping\n", self->epoch);
}
continue;
}
if (BM_DEBUG) {
fmt::print("Found history entry for this node. It's granuleID is {0}\n",
if (BM_PURGE_DEBUG) {
fmt::print("BM {0} Found history entry for this node. It's granuleID is {1}\n",
self->epoch,
currHistoryNode.granuleID.toString());
}
@ -3496,33 +3533,45 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
// and so this granule should be partially deleted
// - otherwise, this granule is active, so don't schedule it for deletion
if (force || endVersion <= purgeVersion) {
if (BM_DEBUG) {
fmt::print("Granule {0} will be FULLY deleted\n", currHistoryNode.granuleID.toString());
if (BM_PURGE_DEBUG) {
fmt::print(
"BM {0} Granule {1} will be FULLY deleted\n", self->epoch, currHistoryNode.granuleID.toString());
}
toFullyDelete.push_back({ currHistoryNode.granuleID, historyKey, currRange });
} else if (startVersion < purgeVersion) {
if (BM_DEBUG) {
fmt::print("Granule {0} will be partially deleted\n", currHistoryNode.granuleID.toString());
if (BM_PURGE_DEBUG) {
fmt::print("BM {0} Granule {1} will be partially deleted\n",
self->epoch,
currHistoryNode.granuleID.toString());
}
toPartiallyDelete.push_back({ currHistoryNode.granuleID, currRange });
}
// add all of the node's parents to the queue
if (BM_PURGE_DEBUG) {
fmt::print("BM {0} Checking {1} parents\n", self->epoch, currHistoryNode.parentVersions.size());
}
for (int i = 0; i < currHistoryNode.parentVersions.size(); i++) {
// for (auto& parent : currHistoryNode.parentVersions.size()) {
// if we already added this node to queue, skip it; otherwise, mark it as visited
KeyRangeRef parentRange(currHistoryNode.parentBoundaries[i], currHistoryNode.parentBoundaries[i + 1]);
Version parentVersion = currHistoryNode.parentVersions[i];
if (visited.count({ parentRange.begin.begin(), parentVersion })) {
if (BM_DEBUG) {
fmt::print("Already added {0} to queue, so skipping it\n", currHistoryNode.granuleID.toString());
std::string beginStr = parentRange.begin.toString();
if (!visited.insert({ beginStr, parentVersion }).second) {
if (BM_PURGE_DEBUG) {
fmt::print("BM {0} Already added [{1} - {2}) @ {3} - {4} to queue, so skipping it\n",
self->epoch,
parentRange.begin.printable(),
parentRange.end.printable(),
parentVersion,
startVersion);
}
continue;
}
visited.insert({ parentRange.begin.begin(), parentVersion });
if (BM_DEBUG) {
fmt::print("Adding parent [{0} - {1}) with versions [{2} - {3}) to queue\n",
if (BM_PURGE_DEBUG) {
fmt::print("BM {0} Adding parent [{1} - {2}) @ {3} - {4} to queue\n",
self->epoch,
parentRange.begin.printable(),
parentRange.end.printable(),
parentVersion,
@ -3550,10 +3599,19 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
// we won't run into any issues with trying to "re-delete" a blob file since deleting
// a file that doesn't exist is considered successful
TraceEvent("PurgeGranulesTraversalComplete", self->id)
.detail("Epoch", self->epoch)
.detail("Range", range)
.detail("PurgeVersion", purgeVersion)
.detail("Force", force)
.detail("VisitedCount", visited.size())
.detail("DeletingFullyCount", toFullyDelete.size())
.detail("DeletingPartiallyCount", toPartiallyDelete.size());
state std::vector<Future<Void>> partialDeletions;
state int i;
if (BM_DEBUG) {
fmt::print("{0} granules to fully delete\n", toFullyDelete.size());
if (BM_PURGE_DEBUG) {
fmt::print("BM {0}: {1} granules to fully delete\n", self->epoch, toFullyDelete.size());
}
for (i = toFullyDelete.size() - 1; i >= 0; --i) {
state UID granuleId;
@ -3561,22 +3619,22 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
KeyRange keyRange;
std::tie(granuleId, historyKey, keyRange) = toFullyDelete[i];
// FIXME: consider batching into a single txn (need to take care of txn size limit)
if (BM_DEBUG) {
fmt::print("About to fully delete granule {0}\n", granuleId.toString());
if (BM_PURGE_DEBUG) {
fmt::print("BM {0}: About to fully delete granule {1}\n", self->epoch, granuleId.toString());
}
wait(fullyDeleteGranule(self, granuleId, historyKey, purgeVersion, range));
}
if (BM_DEBUG) {
fmt::print("{0} granules to partially delete\n", toPartiallyDelete.size());
if (BM_PURGE_DEBUG) {
fmt::print("BM {0}: {1} granules to partially delete\n", self->epoch, toPartiallyDelete.size());
}
for (i = toPartiallyDelete.size() - 1; i >= 0; --i) {
UID granuleId;
KeyRange range;
std::tie(granuleId, range) = toPartiallyDelete[i];
if (BM_DEBUG) {
fmt::print("About to partially delete granule {0}\n", granuleId.toString());
if (BM_PURGE_DEBUG) {
fmt::print("BM {0}: About to partially delete granule {1}\n", self->epoch, granuleId.toString());
}
partialDeletions.emplace_back(partiallyDeleteGranule(self, granuleId, purgeVersion, range));
}
@ -3588,8 +3646,9 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
// another purgeIntent that got written for this table while we were processing this one.
// If that is the case, we should not clear the key. Otherwise, we can just clear the key.
if (BM_DEBUG) {
fmt::print("Successfully purged range [{0} - {1}) at purgeVersion={2}\n",
if (BM_PURGE_DEBUG) {
fmt::print("BM {0}: Successfully purged range [{1} - {2}) at purgeVersion={3}\n",
self->epoch,
range.begin.printable(),
range.end.printable(),
purgeVersion);
@ -3601,6 +3660,8 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
.detail("PurgeVersion", purgeVersion)
.detail("Force", force);
CODE_PROBE(true, "range purge complete");
++self->stats.purgesProcessed;
return Void();
}
@ -3651,6 +3712,7 @@ ACTOR Future<Void> monitorPurgeKeys(Reference<BlobManagerData> self) {
// TODO: replace 10000 with a knob
state RangeResult purgeIntents = wait(tr->getRange(blobGranulePurgeKeys, BUGGIFY ? 1 : 10000));
if (purgeIntents.size()) {
CODE_PROBE(true, "BM found purges to process");
int rangeIdx = 0;
for (; rangeIdx < purgeIntents.size(); ++rangeIdx) {
Version purgeVersion;
@ -3672,8 +3734,9 @@ ACTOR Future<Void> monitorPurgeKeys(Reference<BlobManagerData> self) {
}
purgeMap.insert(range, std::make_pair(purgeVersion, force));
if (BM_DEBUG) {
fmt::print("about to purge range [{0} - {1}) @ {2}, force={3}\n",
if (BM_PURGE_DEBUG) {
fmt::print("BM {0} about to purge range [{1} - {2}) @ {3}, force={4}\n",
self->epoch,
range.begin.printable(),
range.end.printable(),
purgeVersion,
@ -3725,9 +3788,11 @@ ACTOR Future<Void> monitorPurgeKeys(Reference<BlobManagerData> self) {
}
}
if (BM_DEBUG) {
printf("Done clearing current set of purge intents.\n");
if (BM_PURGE_DEBUG) {
fmt::print("BM {0} Done clearing current set of purge intents.\n", self->epoch);
}
CODE_PROBE(true, "BM finished processing purge intents");
}
}
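One fix in this file is easy to miss: purgeRange's visited set used to key on the begin key's uint8_t* pointer, so equal keys materialized in different buffers hashed as distinct entries and a granule could be enqueued and purged twice. Keying on the bytes (std::string) makes membership depend on content. A minimal sketch of the corrected pattern:

    #include <cstdint>
    #include <string>
    #include <unordered_set>
    #include <utility>

    #include <boost/functional/hash.hpp>

    using Visited =
        std::unordered_set<std::pair<std::string, int64_t>, boost::hash<std::pair<std::string, int64_t>>>;

    // Returns true the first time a (beginKey, version) granule is seen, so the
    // caller enqueues it exactly once regardless of which buffer the key lives in.
    bool tryVisit(Visited& visited, const std::string& beginKey, int64_t version) {
        return visited.insert({ beginKey, version }).second;
    }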

View File

@ -602,7 +602,20 @@ ACTOR Future<BlobFileIndex> writeDeltaFile(Reference<BlobWorkerData> bwData,
state std::string fileName = randomBGFilename(bwData->id, granuleID, currentDeltaVersion, ".delta");
state Value serialized = ObjectWriter::toValue(deltasToWrite, Unversioned());
state Optional<BlobGranuleCipherKeysCtx> cipherKeysCtx;
state Optional<BlobGranuleCipherKeysMeta> cipherKeysMeta;
state Arena arena;
// TODO support encryption, figure out proper state stuff
/*if (isBlobFileEncryptionSupported()) {
BlobGranuleCipherKeysCtx ciphKeysCtx = wait(getLatestGranuleCipherKeys(bwData, keyRange, &arena));
cipherKeysCtx = ciphKeysCtx;
cipherKeysMeta = BlobGranuleCipherKeysCtx::toCipherKeysMeta(cipherKeysCtx.get());
}*/
Optional<CompressionFilter> compressFilter = getBlobFileCompressFilter();
state Value serialized = serializeChunkedDeltaFile(
deltasToWrite, keyRange, SERVER_KNOBS->BG_DELTA_FILE_TARGET_CHUNK_BYTES, compressFilter, cipherKeysCtx);
state size_t serializedSize = serialized.size();
// Free up deltasToWrite here to reduce memory
@ -640,7 +653,7 @@ ACTOR Future<BlobFileIndex> writeDeltaFile(Reference<BlobWorkerData> bwData,
Key dfKey = blobGranuleFileKeyFor(granuleID, currentDeltaVersion, 'D');
// TODO change once we support file multiplexing
Value dfValue = blobGranuleFileValueFor(fname, 0, serializedSize, serializedSize);
Value dfValue = blobGranuleFileValueFor(fname, 0, serializedSize, serializedSize, cipherKeysMeta);
tr->set(dfKey, dfValue);
if (oldGranuleComplete.present()) {
@ -668,7 +681,7 @@ ACTOR Future<BlobFileIndex> writeDeltaFile(Reference<BlobWorkerData> bwData,
wait(delay(deterministicRandom()->random01()));
}
// FIXME: change when we implement multiplexing
return BlobFileIndex(currentDeltaVersion, fname, 0, serializedSize, serializedSize);
return BlobFileIndex(currentDeltaVersion, fname, 0, serializedSize, serializedSize, cipherKeysMeta);
} catch (Error& e) {
wait(tr->onError(e));
}
@ -753,8 +766,8 @@ ACTOR Future<BlobFileIndex> writeSnapshot(Reference<BlobWorkerData> bwData,
}
Optional<CompressionFilter> compressFilter = getBlobFileCompressFilter();
state Value serialized =
serializeChunkedSnapshot(snapshot, SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_CHUNKS, compressFilter, cipherKeysCtx);
state Value serialized = serializeChunkedSnapshot(
snapshot, SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_CHUNK_BYTES, compressFilter, cipherKeysCtx);
state size_t serializedSize = serialized.size();
// free snapshot to reduce memory
@ -970,6 +983,7 @@ ACTOR Future<BlobFileIndex> compactFromBlob(Reference<BlobWorkerData> bwData,
snapshotF.cipherKeysMeta);
// TODO: optimization - batch 'encryption-key' lookup given the GranuleFile set is known
// FIXME: get cipher keys for delta as well!
if (chunk.snapshotFile.get().cipherKeysMetaRef.present()) {
ASSERT(isBlobFileEncryptionSupported());
BlobGranuleCipherKeysCtx cipherKeysCtx =
@ -3187,6 +3201,8 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
getGranuleCipherKeys(bwData, chunk.snapshotFile.get().cipherKeysMetaRef.get(), &rep.arena);
}
// FIXME: get cipher keys for delta files too!
// new deltas (if version is larger than version of last delta file)
// FIXME: do trivial key bounds here if key range is not fully contained in request key
// range

View File

@ -484,8 +484,11 @@ public:
}
// TODO: unit test needed
ACTOR static Future<Void> resumeFromDataMoves(Reference<DataDistributor> self) {
ACTOR static Future<Void> resumeFromDataMoves(Reference<DataDistributor> self, Future<Void> readyToStart) {
state KeyRangeMap<std::shared_ptr<DataMove>>::iterator it = self->initData->dataMoveMap.ranges().begin();
wait(readyToStart);
for (; it != self->initData->dataMoveMap.ranges().end(); ++it) {
const DataMoveMetaData& meta = it.value()->meta;
if (it.value()->isCancelled() || (it.value()->valid && !CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA)) {
@ -528,8 +531,8 @@ public:
// usage if it turns out to be a problem.
Future<Void> resumeRelocations() {
ASSERT(shardsAffectedByTeamFailure); // has to be allocated
return runAfter(resumeFromShards(Reference<DataDistributor>::addRef(this), g_network->isSimulated()),
resumeFromDataMoves(Reference<DataDistributor>::addRef(this)));
Future<Void> shardsReady = resumeFromShards(Reference<DataDistributor>::addRef(this), g_network->isSimulated());
return resumeFromDataMoves(Reference<DataDistributor>::addRef(this), shardsReady);
}
};
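The resumeRelocations() change above drops runAfter() in favor of passing an explicit readyToStart future that resumeFromDataMoves() waits on before reading the data-move map, so the ordering constraint lives inside the actor itself. A rough non-flow analogue of the same sequencing, using std::future (illustration only, not flow semantics):

    #include <future>
    #include <iostream>

    std::future<void> resumeFromShards() {
        return std::async(std::launch::async, [] { std::cout << "shards resumed\n"; });
    }

    std::future<void> resumeFromDataMoves(std::shared_future<void> readyToStart) {
        return std::async(std::launch::async, [readyToStart] {
            readyToStart.wait(); // data moves must not resume until shards are ready
            std::cout << "data moves resumed\n";
        });
    }

    int main() {
        std::shared_future<void> shardsReady = resumeFromShards().share();
        resumeFromDataMoves(shardsReady).get();
    }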

View File

@ -110,9 +110,9 @@ class RocksDBErrorListener : public rocksdb::EventListener {
public:
RocksDBErrorListener(){};
void OnBackgroundError(rocksdb::BackgroundErrorReason reason, rocksdb::Status* bg_error) override {
TraceEvent(SevError, "RocksDBBGError")
TraceEvent(SevError, "ShardedRocksDBBGError")
.detail("Reason", getErrorReason(reason))
.detail("RocksDBSeverity", bg_error->severity())
.detail("ShardedRocksDBSeverity", bg_error->severity())
.detail("Status", bg_error->ToString());
std::unique_lock<std::mutex> lock(mutex);
if (!errorPromise.isValid())
@ -186,8 +186,8 @@ std::vector<std::pair<KeyRange, std::string>> decodeShardMapping(const RangeResu
void logRocksDBError(const rocksdb::Status& status, const std::string& method) {
auto level = status.IsTimedOut() ? SevWarn : SevError;
TraceEvent e(level, "RocksDBError");
e.detail("Error", status.ToString()).detail("Method", method).detail("RocksDBSeverity", status.severity());
TraceEvent e(level, "ShardedRocksDBError");
e.detail("Error", status.ToString()).detail("Method", method).detail("ShardedRocksDBSeverity", status.severity());
if (status.IsIOError()) {
e.detail("SubCode", status.subcode());
}
@ -219,7 +219,7 @@ const char* ShardOpToString(ShardOp op) {
}
}
void logShardEvent(StringRef name, ShardOp op, Severity severity = SevInfo, const std::string& message = "") {
TraceEvent e(severity, "KVSShardEvent");
TraceEvent e(severity, "ShardedRocksKVSShardEvent");
e.detail("Name", name).detail("Action", ShardOpToString(op));
if (!message.empty()) {
e.detail("Message", message);
@ -230,7 +230,7 @@ void logShardEvent(StringRef name,
ShardOp op,
Severity severity = SevInfo,
const std::string& message = "") {
TraceEvent e(severity, "KVSShardEvent");
TraceEvent e(severity, "ShardedRocksKVSShardEvent");
e.detail("Name", name).detail("Action", ShardOpToString(op)).detail("Begin", range.begin).detail("End", range.end);
if (message != "") {
e.detail("Message", message);
@ -343,7 +343,7 @@ public:
ASSERT(cf);
readRangeOptions.background_purge_on_iterator_cleanup = true;
readRangeOptions.auto_prefix_mode = (SERVER_KNOBS->ROCKSDB_PREFIX_LEN > 0);
TraceEvent(SevDebug, "ReadIteratorPool")
TraceEvent(SevVerbose, "ShardedRocksReadIteratorPool")
.detail("Path", path)
.detail("KnobRocksDBReadRangeReuseIterators", SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS)
.detail("KnobRocksDBPrefixLen", SERVER_KNOBS->ROCKSDB_PREFIX_LEN);
@ -425,7 +425,7 @@ private:
ACTOR Future<Void> flowLockLogger(const FlowLock* readLock, const FlowLock* fetchLock) {
loop {
wait(delay(SERVER_KNOBS->ROCKSDB_METRICS_DELAY));
TraceEvent e("RocksDBFlowLock");
TraceEvent e("ShardedRocksDBFlowLock");
e.detail("ReadAvailable", readLock->available());
e.detail("ReadActivePermits", readLock->activePermits());
e.detail("ReadWaiters", readLock->waiters());
@ -588,13 +588,13 @@ public:
if (rState->closing) {
break;
}
TraceEvent(SevInfo, "KVSPhysialShardMetrics")
TraceEvent(SevInfo, "ShardedRocksKVSPhysialShardMetrics")
.detail("NumActiveShards", shardManager->numActiveShards())
.detail("TotalPhysicalShards", shardManager->numPhysicalShards());
}
} catch (Error& e) {
if (e.code() != error_code_actor_cancelled) {
TraceEvent(SevError, "ShardMetricsLoggerError").errorUnsuppressed(e);
TraceEvent(SevError, "ShardedRocksShardMetricsLoggerError").errorUnsuppressed(e);
}
}
return Void();
@ -602,7 +602,7 @@ public:
rocksdb::Status init() {
// Open instance.
TraceEvent(SevVerbose, "ShardManagerInitBegin", this->logId).detail("DataPath", path);
TraceEvent(SevInfo, "ShardedRocksShardManagerInitBegin", this->logId).detail("DataPath", path);
std::vector<std::string> columnFamilies;
rocksdb::Options options = getOptions();
rocksdb::Status status = rocksdb::DB::ListColumnFamilies(options, path, &columnFamilies);
@ -632,6 +632,8 @@ public:
}
if (foundMetadata) {
TraceEvent(SevInfo, "ShardedRocksInitLoadPhysicalShards", this->logId)
.detail("PhysicalShardCount", handles.size());
for (auto handle : handles) {
if (handle->GetName() == "kvs-metadata") {
metadataShard = std::make_shared<PhysicalShard>(db, "kvs-metadata", handle);
@ -639,7 +641,8 @@ public:
physicalShards[handle->GetName()] = std::make_shared<PhysicalShard>(db, handle->GetName(), handle);
}
columnFamilyMap[handle->GetID()] = handle;
TraceEvent(SevInfo, "ShardedRocskDB").detail("FoundShard", handle->GetName()).detail("Action", "Init");
TraceEvent(SevVerbose, "ShardedRocksInitPhysicalShard", this->logId)
.detail("PhysicalShard", handle->GetName());
}
RangeResult metadata;
readRangeInDb(metadataShard.get(), prefixRange(shardMappingPrefix), UINT16_MAX, UINT16_MAX, &metadata);
@ -647,7 +650,7 @@ public:
std::vector<std::pair<KeyRange, std::string>> mapping = decodeShardMapping(metadata, shardMappingPrefix);
for (const auto& [range, name] : mapping) {
TraceEvent(SevDebug, "ShardedRocksLoadPhysicalShard", this->logId)
TraceEvent(SevVerbose, "ShardedRocksLoadRange", this->logId)
.detail("Range", range)
.detail("PhysicalShard", name);
auto it = physicalShards.find(name);
@ -662,10 +665,10 @@ public:
activePhysicalShardIds.emplace(name);
}
// TODO: remove unused column families.
} else {
// DB is opened with default shard.
ASSERT(handles.size() == 1);
// Add SpecialKeys range. This range should not be modified.
std::shared_ptr<PhysicalShard> defaultShard = std::make_shared<PhysicalShard>(db, "default", handles[0]);
columnFamilyMap[defaultShard->cf->GetID()] = defaultShard->cf;
@ -688,7 +691,7 @@ public:
return status;
}
metadataShard->readIterPool->update();
TraceEvent(SevInfo, "InitializeMetaDataShard", this->logId)
TraceEvent(SevInfo, "ShardedRocksInitializeMetaDataShard", this->logId)
.detail("MetadataShardCF", metadataShard->cf->GetID());
}
physicalShards["kvs-metadata"] = metadataShard;
@ -696,7 +699,7 @@ public:
writeBatch = std::make_unique<rocksdb::WriteBatch>();
dirtyShards = std::make_unique<std::set<PhysicalShard*>>();
TraceEvent(SevDebug, "ShardManagerInitEnd", this->logId).detail("DataPath", path);
TraceEvent(SevInfo, "ShardedRocksShardManagerInitEnd", this->logId).detail("DataPath", path);
return status;
}
@ -712,7 +715,7 @@ public:
for (auto it = rangeIterator.begin(); it != rangeIterator.end(); ++it) {
if (it.value() == nullptr) {
TraceEvent(SevDebug, "ShardedRocksDB")
TraceEvent(SevVerbose, "ShardedRocksDB")
.detail("Info", "ShardNotFound")
.detail("BeginKey", range.begin)
.detail("EndKey", range.end);
@ -724,9 +727,10 @@ public:
}
PhysicalShard* addRange(KeyRange range, std::string id) {
TraceEvent(SevVerbose, "ShardedRocksAddRangeBegin", this->logId)
TraceEvent(SevInfo, "ShardedRocksAddRangeBegin", this->logId)
.detail("Range", range)
.detail("PhysicalShardID", id);
// Newly added range should not overlap with any existing range.
auto ranges = dataShardMap.intersectingRanges(range);
@ -750,7 +754,7 @@ public:
validate();
TraceEvent(SevVerbose, "ShardedRocksAddRangeEnd", this->logId)
TraceEvent(SevInfo, "ShardedRocksAddRangeEnd", this->logId)
.detail("Range", range)
.detail("PhysicalShardID", id);
@ -758,7 +762,7 @@ public:
}
std::vector<std::string> removeRange(KeyRange range) {
TraceEvent(SevVerbose, "ShardedRocksRemoveRangeBegin", this->logId).detail("Range", range);
TraceEvent(SevInfo, "ShardedRocksRemoveRangeBegin", this->logId).detail("Range", range);
std::vector<std::string> shardIds;
@ -796,6 +800,7 @@ public:
}
continue;
}
// Range modification could result in more than one segment. Remove the original segment key here.
existingShard->dataShards.erase(shardRange.begin.toString());
if (shardRange.begin < range.begin) {
@ -826,7 +831,7 @@ public:
validate();
TraceEvent(SevVerbose, "ShardedRocksRemoveRangeEnd", this->logId).detail("Range", range);
TraceEvent(SevInfo, "ShardedRocksRemoveRangeEnd", this->logId).detail("Range", range);
return shardIds;
}
@ -849,7 +854,7 @@ public:
TraceEvent(SevError, "ShardedRocksDB").detail("Error", "write to non-exist shard").detail("WriteKey", key);
return;
}
TraceEvent(SevVerbose, "ShardManagerPut", this->logId)
TraceEvent(SevVerbose, "ShardedRocksShardManagerPut", this->logId)
.detail("WriteKey", key)
.detail("Value", value)
.detail("MapRange", it.range())
@ -859,7 +864,9 @@ public:
ASSERT(dirtyShards != nullptr);
writeBatch->Put(it.value()->physicalShard->cf, toSlice(key), toSlice(value));
dirtyShards->insert(it.value()->physicalShard);
TraceEvent(SevVerbose, "ShardManagerPutEnd", this->logId).detail("WriteKey", key).detail("Value", value);
TraceEvent(SevVerbose, "ShardedRocksShardManagerPutEnd", this->logId)
.detail("WriteKey", key)
.detail("Value", value);
}
void clear(KeyRef key) {
@ -884,7 +891,7 @@ public:
}
void persistRangeMapping(KeyRangeRef range, bool isAdd) {
TraceEvent(SevDebug, "ShardedRocksDB")
TraceEvent(SevDebug, "ShardedRocksDB", this->logId)
.detail("Info", "RangeToPersist")
.detail("BeginKey", range.begin)
.detail("EndKey", range.end);
@ -902,7 +909,7 @@ public:
writeBatch->Put(metadataShard->cf,
getShardMappingKey(it.range().begin, shardMappingPrefix),
it.value()->physicalShard->id);
TraceEvent(SevDebug, "ShardedRocksDB")
TraceEvent(SevDebug, "ShardedRocksDB", this->logId)
.detail("Action", "PersistRangeMapping")
.detail("BeginKey", it.range().begin)
.detail("EndKey", it.range().end)
@ -911,7 +918,7 @@ public:
} else {
// Empty range.
writeBatch->Put(metadataShard->cf, getShardMappingKey(it.range().begin, shardMappingPrefix), "");
TraceEvent(SevDebug, "ShardedRocksDB")
TraceEvent(SevDebug, "ShardedRocksDB", this->logId)
.detail("Action", "PersistRangeMapping")
.detail("BeginKey", it.range().begin)
.detail("EndKey", it.range().end)
@ -921,7 +928,7 @@ public:
}
} else {
writeBatch->Put(metadataShard->cf, getShardMappingKey(range.begin, shardMappingPrefix), "");
TraceEvent(SevDebug, "ShardedRocksDB")
TraceEvent(SevDebug, "ShardedRocksDB", this->logId)
.detail("Action", "PersistRangeMapping")
.detail("RemoveRange", "True")
.detail("BeginKey", range.begin)
@ -972,7 +979,7 @@ public:
if (!s.ok()) {
logRocksDBError(s, "DestroyDB");
}
TraceEvent("RocksDB").detail("Info", "DBDestroyed");
TraceEvent("ShardedRocksDB", this->logId).detail("Info", "DBDestroyed");
}
rocksdb::DB* getDb() const { return db; }
@ -997,9 +1004,9 @@ public:
}
void validate() {
TraceEvent(SevVerbose, "ValidateShardManager", this->logId);
TraceEvent(SevVerbose, "ShardedRocksValidateShardManager", this->logId);
for (auto s = dataShardMap.ranges().begin(); s != dataShardMap.ranges().end(); ++s) {
TraceEvent e(SevVerbose, "ValidateDataShardMap", this->logId);
TraceEvent e(SevVerbose, "ShardedRocksValidateDataShardMap", this->logId);
e.detail("Range", s->range());
const DataShard* shard = s->value();
e.detail("ShardAddress", reinterpret_cast<std::uintptr_t>(shard));
@ -1008,6 +1015,13 @@ public:
} else {
e.detail("Shard", "Empty");
}
if (shard != nullptr) {
ASSERT(shard->range == static_cast<KeyRangeRef>(s->range()));
ASSERT(shard->physicalShard != nullptr);
auto it = shard->physicalShard->dataShards.find(shard->range.begin.toString());
ASSERT(it != shard->physicalShard->dataShards.end());
ASSERT(it->second.get() == shard);
}
}
}
@ -1338,7 +1352,7 @@ std::shared_ptr<rocksdb::Statistics> RocksDBMetrics::getStatsObjForRocksDB() {
}
void RocksDBMetrics::logStats(rocksdb::DB* db) {
TraceEvent e("RocksDBMetrics");
TraceEvent e("ShardedRocksDBMetrics");
uint64_t stat;
for (auto& [name, ticker, cumulation] : tickerStats) {
stat = stats->getTickerCount(ticker);
@ -1361,7 +1375,7 @@ void RocksDBMetrics::logStats(rocksdb::DB* db) {
}
void RocksDBMetrics::logMemUsagePerShard(std::string shardName, rocksdb::DB* db) {
TraceEvent e("RocksDBShardMemMetrics");
TraceEvent e("ShardedRocksDBShardMemMetrics");
uint64_t stat;
ASSERT(db != nullptr);
ASSERT(db->GetIntProperty(rocksdb::DB::Properties::kBlockCacheUsage, &stat));
@ -1387,7 +1401,7 @@ void RocksDBMetrics::setPerfContext(int index) {
}
void RocksDBMetrics::logPerfContext(bool ignoreZeroMetric) {
TraceEvent e("RocksDBPerfContextMetrics");
TraceEvent e("ShardedRocksDBPerfContextMetrics");
e.setMaxEventLength(20000);
for (auto& [name, metric, vals] : perfContextMetrics) {
uint64_t s = 0;
@ -1650,7 +1664,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
return;
}
TraceEvent(SevInfo, "RocksDB").detail("Method", "Open");
TraceEvent(SevInfo, "ShardedRocksDB").detail("Method", "Open");
a.done.send(Void());
}
@ -1841,7 +1855,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
} else {
a.shardManager->closeAllShards();
}
TraceEvent(SevInfo, "RocksDB").detail("Method", "Close");
TraceEvent(SevInfo, "ShardedRocksDB").detail("Method", "Close");
a.done.send(Void());
}
};
@ -1908,7 +1922,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
traceBatch.get().addEvent("GetValueDebug", a.debugID.get().first(), "Reader.Before");
}
if (readBeginTime - a.startTime > readValueTimeout) {
TraceEvent(SevWarn, "RocksDBError")
TraceEvent(SevWarn, "ShardedRocksDBError")
.detail("Error", "Read value request timedout")
.detail("Method", "ReadValueAction")
.detail("Timeout value", readValueTimeout);
@ -1995,7 +2009,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
"Reader.Before"); //.detail("TaskID", g_network->getCurrentTask());
}
if (readBeginTime - a.startTime > readValuePrefixTimeout) {
TraceEvent(SevWarn, "RocksDBError")
TraceEvent(SevWarn, "ShardedRocksDBError")
.detail("Error", "Read value prefix request timedout")
.detail("Method", "ReadValuePrefixAction")
.detail("Timeout value", readValuePrefixTimeout);
@ -2080,7 +2094,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
rocksDBMetrics->getReadRangeQueueWaitHistogram(threadIndex)->sampleSeconds(readBeginTime - a.startTime);
}
if (readBeginTime - a.startTime > readRangeTimeout) {
TraceEvent(SevWarn, "KVSReadTimeout")
TraceEvent(SevWarn, "ShardedRocksKVSReadTimeout")
.detail("Error", "Read range request timedout")
.detail("Method", "ReadRangeAction")
.detail("Timeout value", readRangeTimeout);
@ -2127,10 +2141,6 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
}
}
Histogram::getHistogram(
ROCKSDBSTORAGE_HISTOGRAM_GROUP, "ShardedRocksDBNumShardsInRangeRead"_sr, Histogram::Unit::countLinear)
->sample(numShards);
result.more =
(result.size() == a.rowLimit) || (result.size() == -a.rowLimit) || (accumulatedBytes >= a.byteLimit);
if (result.more) {
@ -2184,7 +2194,8 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
readThreads = createGenericThreadPool();
}
writeThread->addThread(new Writer(id, 0, shardManager.getColumnFamilyMap(), rocksDBMetrics), "fdb-rocksdb-wr");
TraceEvent("RocksDBReadThreads").detail("KnobRocksDBReadParallelism", SERVER_KNOBS->ROCKSDB_READ_PARALLELISM);
TraceEvent("ShardedRocksDBReadThreads", id)
.detail("KnobRocksDBReadParallelism", SERVER_KNOBS->ROCKSDB_READ_PARALLELISM);
for (unsigned i = 0; i < SERVER_KNOBS->ROCKSDB_READ_PARALLELISM; ++i) {
readThreads->addThread(new Reader(id, i, rocksDBMetrics), "fdb-rocksdb-re");
}
@ -2302,7 +2313,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
auto* shard = shardManager.getDataShard(key);
if (shard == nullptr || !shard->physicalShard->initialized()) {
// TODO: reading a non-existent system key range should not cause an error.
TraceEvent(SevWarnAlways, "ShardedRocksDB")
TraceEvent(SevWarnAlways, "ShardedRocksDB", this->id)
.detail("Detail", "Read non-exist key range")
.detail("ReadKey", key);
return Optional<Value>();
@ -2330,7 +2341,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
auto* shard = shardManager.getDataShard(key);
if (shard == nullptr || !shard->physicalShard->initialized()) {
// TODO: reading a non-existent system key range should not cause an error.
TraceEvent(SevWarnAlways, "ShardedRocksDB")
TraceEvent(SevWarnAlways, "ShardedRocksDB", this->id)
.detail("Detail", "Read non-exist key range")
.detail("ReadKey", key);
return Optional<Value>();
@ -2452,7 +2463,7 @@ IKeyValueStore* keyValueStoreShardedRocksDB(std::string const& path,
#ifdef SSD_ROCKSDB_EXPERIMENTAL
return new ShardedRocksDBKeyValueStore(path, logID);
#else
TraceEvent(SevError, "RocksDBEngineInitFailure").detail("Reason", "Built without RocksDB");
TraceEvent(SevError, "ShardedRocksDBEngineInitFailure").detail("Reason", "Built without RocksDB");
ASSERT(false);
return nullptr;
#endif // SSD_ROCKSDB_EXPERIMENTAL

View File

@ -280,6 +280,13 @@ class TestConfig {
if (attrib == "blobGranulesEnabled") {
blobGranulesEnabled = strcmp(value.c_str(), "true") == 0;
}
if (attrib == "injectSSTargetedRestart") {
injectTargetedSSRestart = strcmp(value.c_str(), "true") == 0;
}
if (attrib == "injectSSDelay") {
injectSSDelay = strcmp(value.c_str(), "true") == 0;
}
}
ifs.close();
@ -327,6 +334,8 @@ public:
bool allowDefaultTenant = true;
bool allowDisablingTenants = true;
bool injectTargetedSSRestart = false;
bool injectSSDelay = false;
ConfigDBType getConfigDBType() const { return configDBType; }
@ -384,7 +393,9 @@ public:
.add("blobGranulesEnabled", &blobGranulesEnabled)
.add("allowDefaultTenant", &allowDefaultTenant)
.add("allowDisablingTenants", &allowDisablingTenants)
.add("randomlyRenameZoneId", &randomlyRenameZoneId);
.add("randomlyRenameZoneId", &randomlyRenameZoneId)
.add("injectTargetedSSRestart", &injectTargetedSSRestart)
.add("injectSSDelay", &injectSSDelay);
try {
auto file = toml::parse(testFile);
if (file.contains("configuration") && toml::find(file, "configuration").is_table()) {
@ -1384,7 +1395,7 @@ void SimulationConfig::setDatacenters(const TestConfig& testConfig) {
void SimulationConfig::setStorageEngine(const TestConfig& testConfig) {
// Using [0, 4) to disable the RocksDB storage engine.
// TODO: Figure out what is broken with the RocksDB engine in simulation.
int storage_engine_type = deterministicRandom()->randomInt(0, 4);
int storage_engine_type = deterministicRandom()->randomInt(0, 6);
if (testConfig.storageEngineType.present()) {
storage_engine_type = testConfig.storageEngineType.get();
} else {
@ -1392,7 +1403,7 @@ void SimulationConfig::setStorageEngine(const TestConfig& testConfig) {
while (std::find(testConfig.storageEngineExcludeTypes.begin(),
testConfig.storageEngineExcludeTypes.end(),
storage_engine_type) != testConfig.storageEngineExcludeTypes.end()) {
storage_engine_type = deterministicRandom()->randomInt(0, 5);
storage_engine_type = deterministicRandom()->randomInt(0, 6);
}
}
@ -1435,6 +1446,8 @@ void SimulationConfig::setStorageEngine(const TestConfig& testConfig) {
TraceEvent(SevWarnAlways, "RocksDBNonDeterminism")
.detail("Explanation", "The Sharded RocksDB storage engine is threaded and non-deterministic");
noUnseed = true;
auto& g_knobs = IKnobCollection::getMutableGlobalKnobCollection();
g_knobs.setKnob("shard_encode_location_metadata", KnobValueRef::create(bool{ true }));
break;
}
default:
@ -2364,6 +2377,13 @@ ACTOR void setupAndRun(std::string dataFolder,
testConfig.readFromConfig(testFile);
g_simulator.hasDiffProtocolProcess = testConfig.startIncompatibleProcess;
g_simulator.setDiffProtocol = false;
if (testConfig.injectTargetedSSRestart && deterministicRandom()->random01() < 0.25) {
g_simulator.injectTargetedSSRestartTime = 60.0 + 340.0 * deterministicRandom()->random01();
}
if (testConfig.injectSSDelay && deterministicRandom()->random01() < 0.25) {
g_simulator.injectSSDelayTime = 60.0 + 240.0 * deterministicRandom()->random01();
}
// Build simulator allow list
allowList.addTrustedSubnet("0.0.0.0/2"sv);
@ -2377,6 +2397,7 @@ ACTOR void setupAndRun(std::string dataFolder,
// https://github.com/apple/foundationdb/issues/5155
if (std::string_view(testFile).find("restarting") != std::string_view::npos) {
testConfig.storageEngineExcludeTypes.push_back(4);
testConfig.storageEngineExcludeTypes.push_back(5);
// Disable the default tenant in restarting tests for now
// TODO: persist the chosen default tenant in the restartInfo.ini file for the second test
@ -2389,6 +2410,7 @@ ACTOR void setupAndRun(std::string dataFolder,
// Re-enable the backup and restore related simulation tests when the tests are passing again.
if (std::string_view(testFile).find("Backup") != std::string_view::npos) {
testConfig.storageEngineExcludeTypes.push_back(4);
testConfig.storageEngineExcludeTypes.push_back(5);
}
// Disable the default tenant in backup and DR tests for now. This is because backup does not currently duplicate
@ -2402,6 +2424,7 @@ ACTOR void setupAndRun(std::string dataFolder,
// in the build.
if (!rocksDBEnabled) {
testConfig.storageEngineExcludeTypes.push_back(4);
testConfig.storageEngineExcludeTypes.push_back(5);
}
state ProtocolVersion protocolVersion = currentProtocolVersion;

View File

@ -2751,6 +2751,9 @@ ACTOR Future<JsonBuilderObject> lockedStatusFetcher(Reference<AsyncVar<ServerDBI
try {
wait(tr.onError(e));
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled)
throw;
incomplete_reasons->insert(format("Unable to determine if database is locked (%s).", e.what()));
break;
}

View File

@ -51,6 +51,8 @@ bool compareFDBAndBlob(RangeResult fdb,
Version v,
bool debug);
void printGranuleChunks(const Standalone<VectorRef<BlobGranuleChunkRef>>& chunks);
ACTOR Future<Void> clearAndAwaitMerge(Database cx, KeyRange range);
#include "flow/unactorcompiler.h"

View File

@ -294,8 +294,8 @@ Future<Void> bulkSetup(Database cx,
// Here we wait for data in flight to go to 0 (this will not work on a database with other users)
if (postSetupWarming != 0) {
try {
wait(delay(5.0) >>
waitForLowInFlight(cx, workload)); // Wait for the data distribution in a small test to start
wait(delay(5.0));
wait(waitForLowInFlight(cx, workload)); // Wait for the data distribution in a small test to start
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled)
throw;

View File

@ -536,6 +536,9 @@ struct ChangeFeedInfo : ReferenceCounted<ChangeFeedInfo> {
Version storageVersion = invalidVersion; // The version between the storage version and the durable version are
// being written to disk as part of the current commit in updateStorage.
Version durableVersion = invalidVersion; // All versions before the durable version are durable on disk
// FIXME: this needs to get persisted to disk to still fix the same races across restarts!
Version metadataVersion = invalidVersion; // Last update to the change feed metadata. Used for reasoning about
// fetched metadata vs local metadata
Version emptyVersion = 0; // The change feed does not have any mutations before emptyVersion
KeyRange range;
Key id;
@ -551,8 +554,6 @@ struct ChangeFeedInfo : ReferenceCounted<ChangeFeedInfo> {
bool removing = false;
bool destroyed = false;
bool possiblyDestroyed = false;
bool refreshInProgress = false;
KeyRangeMap<std::unordered_map<UID, Promise<Void>>> moveTriggers;
@ -587,12 +588,21 @@ struct ChangeFeedInfo : ReferenceCounted<ChangeFeedInfo> {
}
void destroy(Version destroyVersion) {
updateMetadataVersion(destroyVersion);
removing = true;
destroyed = true;
refreshInProgress = false;
moved(range);
newMutations.trigger();
}
bool updateMetadataVersion(Version version) {
// don't update the metadata version if removing, so that the metadata version remains the moved-away version
if (!removing && version > metadataVersion) {
metadataVersion = version;
return true;
}
return false;
}
};
class ServerWatchMetadata : public ReferenceCounted<ServerWatchMetadata> {
@ -895,7 +905,7 @@ public:
KeyRangeMap<std::vector<Reference<ChangeFeedInfo>>> keyChangeFeed;
std::map<Key, Reference<ChangeFeedInfo>> uidChangeFeed;
Deque<std::pair<std::vector<Key>, Version>> changeFeedVersions;
std::map<UID, PromiseStream<Key>> changeFeedRemovals;
std::map<UID, PromiseStream<Key>> changeFeedDestroys;
std::set<Key> currentChangeFeeds;
std::set<Key> fetchingChangeFeeds;
std::unordered_map<NetworkAddress, std::map<UID, Version>> changeFeedClientVersions;
@ -971,6 +981,9 @@ public:
FlowLock durableVersionLock;
FlowLock fetchKeysParallelismLock;
// Extra lock that prevents too much post-initial-fetch work from building up, such as mutation applying and change
// feed tail fetching
FlowLock fetchKeysParallelismFullLock;
FlowLock fetchChangeFeedParallelismLock;
int64_t fetchKeysBytesBudget;
AsyncVar<bool> fetchKeysBudgetUsed;
@ -1046,7 +1059,8 @@ public:
Counter sampledBytesCleared;
// The number of key-value pairs fetched by fetchKeys()
Counter kvFetched;
Counter mutations, setMutations, clearRangeMutations, atomicMutations;
Counter mutations, setMutations, clearRangeMutations, atomicMutations, changeFeedMutations,
changeFeedMutationsDurable;
Counter updateBatches, updateVersions;
Counter loops;
Counter fetchWaitingMS, fetchWaitingCount, fetchExecutingMS, fetchExecutingCount;
@ -1071,6 +1085,8 @@ public:
Counter kvScans;
// The count of commit operation to the storage engine.
Counter kvCommits;
// The count of change feed reads that hit disk
Counter changeFeedDiskReads;
LatencySample readLatencySample;
LatencyBands readLatencyBands;
@ -1095,15 +1111,17 @@ public:
feedBytesFetched("FeedBytesFetched", cc), sampledBytesCleared("SampledBytesCleared", cc),
kvFetched("KVFetched", cc), mutations("Mutations", cc), setMutations("SetMutations", cc),
clearRangeMutations("ClearRangeMutations", cc), atomicMutations("AtomicMutations", cc),
updateBatches("UpdateBatches", cc), updateVersions("UpdateVersions", cc), loops("Loops", cc),
fetchWaitingMS("FetchWaitingMS", cc), fetchWaitingCount("FetchWaitingCount", cc),
fetchExecutingMS("FetchExecutingMS", cc), fetchExecutingCount("FetchExecutingCount", cc),
readsRejected("ReadsRejected", cc), wrongShardServer("WrongShardServer", cc),
fetchedVersions("FetchedVersions", cc), fetchesFromLogs("FetchesFromLogs", cc),
quickGetValueHit("QuickGetValueHit", cc), quickGetValueMiss("QuickGetValueMiss", cc),
quickGetKeyValuesHit("QuickGetKeyValuesHit", cc), quickGetKeyValuesMiss("QuickGetKeyValuesMiss", cc),
kvScanBytes("KVScanBytes", cc), kvGetBytes("KVGetBytes", cc), eagerReadsKeys("EagerReadsKeys", cc),
kvGets("KVGets", cc), kvScans("KVScans", cc), kvCommits("KVCommits", cc),
changeFeedMutations("ChangeFeedMutations", cc),
changeFeedMutationsDurable("ChangeFeedMutationsDurable", cc), updateBatches("UpdateBatches", cc),
updateVersions("UpdateVersions", cc), loops("Loops", cc), fetchWaitingMS("FetchWaitingMS", cc),
fetchWaitingCount("FetchWaitingCount", cc), fetchExecutingMS("FetchExecutingMS", cc),
fetchExecutingCount("FetchExecutingCount", cc), readsRejected("ReadsRejected", cc),
wrongShardServer("WrongShardServer", cc), fetchedVersions("FetchedVersions", cc),
fetchesFromLogs("FetchesFromLogs", cc), quickGetValueHit("QuickGetValueHit", cc),
quickGetValueMiss("QuickGetValueMiss", cc), quickGetKeyValuesHit("QuickGetKeyValuesHit", cc),
quickGetKeyValuesMiss("QuickGetKeyValuesMiss", cc), kvScanBytes("KVScanBytes", cc),
kvGetBytes("KVGetBytes", cc), eagerReadsKeys("EagerReadsKeys", cc), kvGets("KVGets", cc),
kvScans("KVScans", cc), kvCommits("KVCommits", cc), changeFeedDiskReads("ChangeFeedDiskReads", cc),
readLatencySample("ReadLatencyMetrics",
self->thisServerID,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
@ -1133,6 +1151,11 @@ public:
specialCounter(
cc, "FetchKeysFetchActive", [self]() { return self->fetchKeysParallelismLock.activePermits(); });
specialCounter(cc, "FetchKeysWaiting", [self]() { return self->fetchKeysParallelismLock.waiters(); });
specialCounter(cc, "FetchKeysFullFetchActive", [self]() {
return self->fetchKeysParallelismFullLock.activePermits();
});
specialCounter(
cc, "FetchKeysFullFetchWaiting", [self]() { return self->fetchKeysParallelismFullLock.waiters(); });
specialCounter(cc, "FetchChangeFeedFetchActive", [self]() {
return self->fetchChangeFeedParallelismLock.activePermits();
});
@ -1196,6 +1219,7 @@ public:
byteSampleClears(false, LiteralStringRef("\xff\xff\xff")), durableInProgress(Void()), watchBytes(0),
numWatches(0), noRecentUpdates(false), lastUpdate(now()), updateEagerReads(nullptr),
fetchKeysParallelismLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM),
fetchKeysParallelismFullLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM_FULL),
fetchChangeFeedParallelismLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM),
fetchKeysBytesBudget(SERVER_KNOBS->STORAGE_FETCH_BYTES), fetchKeysBudgetUsed(false),
serveFetchCheckpointParallelismLock(SERVER_KNOBS->SERVE_FETCH_CHECKPOINT_PARALLELISM),
@ -1386,6 +1410,28 @@ public:
req.reply.sendError(e);
}
}
void maybeInjectTargetedRestart(Version v) {
// inject an SS restart at most once per test
if (g_network->isSimulated() && !g_simulator.speedUpSimulation &&
now() > g_simulator.injectTargetedSSRestartTime &&
rebootAfterDurableVersion == std::numeric_limits<Version>::max()) {
CODE_PROBE(true, "Injecting SS targeted restart");
TraceEvent("SimSSInjectTargetedRestart", thisServerID).detail("Version", v);
rebootAfterDurableVersion = v;
g_simulator.injectTargetedSSRestartTime = std::numeric_limits<double>::max();
}
}
bool maybeInjectDelay() {
if (g_network->isSimulated() && !g_simulator.speedUpSimulation && now() > g_simulator.injectSSDelayTime) {
CODE_PROBE(true, "Injecting SS targeted delay");
TraceEvent("SimSSInjectDelay", thisServerID);
g_simulator.injectSSDelayTime = std::numeric_limits<double>::max();
return true;
}
return false;
}
};
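// A minimal standalone sketch of the fire-at-most-once pattern used by maybeInjectTargetedRestart()
// and maybeInjectDelay() above: the injection deadline doubles as the armed/disarmed flag and is
// reset to +infinity after the first trigger, so each fault fires at most once per test.
#include <limits>

struct ToyInjector {
    double injectTime = std::numeric_limits<double>::max(); // max() == disarmed

    bool maybeInject(double now) {
        if (now > injectTime) {
            injectTime = std::numeric_limits<double>::max(); // disarm after the first fire
            return true; // caller performs the restart/delay exactly once
        }
        return false;
    }
};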
const StringRef StorageServer::CurrentRunningFetchKeys::emptyString = LiteralStringRef("");
@ -2198,46 +2244,54 @@ ACTOR Future<Void> overlappingChangeFeedsQ(StorageServer* data, OverlappingChang
return Void();
}
Version metadataVersion = invalidVersion;
Version metadataWaitVersion = invalidVersion;
auto ranges = data->keyChangeFeed.intersectingRanges(req.range);
std::map<Key, std::tuple<KeyRange, Version, Version>> rangeIds;
std::map<Key, std::tuple<KeyRange, Version, Version, Version>> rangeIds;
for (auto r : ranges) {
for (auto& it : r.value()) {
if (!it->removing) {
// Can't tell other SS about a change feed create or stopVersion that may get rolled back, and we only
// need to tell it about the metadata if req.minVersion > metadataVersion, since it will get the
// information from its own private mutations if it hasn't processed up to that version yet
metadataVersion = std::max(metadataVersion, it->metadataCreateVersion);
metadataWaitVersion = std::max(metadataWaitVersion, it->metadataCreateVersion);
// don't wait for all it->metadataVersion updates; if metadata was fetched from elsewhere it's already
// durable, and some updates are unnecessary to wait for
Version stopVersion;
if (it->stopVersion != MAX_VERSION && req.minVersion > it->stopVersion) {
stopVersion = it->stopVersion;
metadataVersion = std::max(metadataVersion, stopVersion);
metadataWaitVersion = std::max(metadataWaitVersion, stopVersion);
} else {
stopVersion = MAX_VERSION;
}
rangeIds[it->id] = std::tuple(it->range, it->emptyVersion, stopVersion);
rangeIds[it->id] = std::tuple(it->range, it->emptyVersion, stopVersion, it->metadataVersion);
}
}
}
state OverlappingChangeFeedsReply reply;
reply.feedMetadataVersion = data->version.get();
for (auto& it : rangeIds) {
reply.rangeIds.push_back(OverlappingChangeFeedEntry(
it.first, std::get<0>(it.second), std::get<1>(it.second), std::get<2>(it.second)));
reply.feeds.push_back_deep(reply.arena,
OverlappingChangeFeedEntry(it.first,
std::get<0>(it.second),
std::get<1>(it.second),
std::get<2>(it.second),
std::get<3>(it.second)));
TraceEvent(SevDebug, "OverlappingChangeFeedEntry", data->thisServerID)
.detail("MinVersion", req.minVersion)
.detail("FeedID", it.first)
.detail("Range", std::get<0>(it.second))
.detail("EmptyVersion", std::get<1>(it.second))
.detail("StopVersion", std::get<2>(it.second));
.detail("StopVersion", std::get<2>(it.second))
.detail("FeedMetadataVersion", std::get<3>(it.second));
}
// Make sure all of the metadata we are sending won't get rolled back
if (metadataVersion != invalidVersion && metadataVersion > data->knownCommittedVersion.get()) {
if (metadataWaitVersion != invalidVersion && metadataWaitVersion > data->knownCommittedVersion.get()) {
CODE_PROBE(true, "overlapping change feeds waiting for metadata version to be committed");
wait(data->desiredOldestVersion.whenAtLeast(metadataVersion));
wait(data->desiredOldestVersion.whenAtLeast(metadataWaitVersion));
}
req.reply.send(reply);
return Void();
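// A standalone sketch of the gate above, with a plain counter standing in for
// NotifiedVersion/desiredOldestVersion: the reply is held until the committed version reaches the
// newest metadata version it contains, so a rollback can never invalidate advertised metadata.
#include <cassert>

struct ToyMetadataGate {
    long committedVersion = 0;
    // invalidVersion is modeled as -1: nothing in the reply needs to be committed first.
    bool canReply(long metadataWaitVersion) const {
        return metadataWaitVersion == -1 || metadataWaitVersion <= committedVersion;
    }
};

int main() {
    ToyMetadataGate gate;
    gate.committedVersion = 90;
    assert(!gate.canReply(100)); // newest metadata not yet committed: hold the reply
    gate.committedVersion = 100;
    assert(gate.canReply(100)); // safe to send; the metadata can no longer roll back
}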
@ -2366,12 +2420,10 @@ static std::deque<Standalone<MutationsAndVersionRef>>::const_iterator searchChan
break;
}
lastEnd = currentEnd + 1;
jump = std::min((int)(currentEnd - mutations.begin()), jump);
currentEnd -= jump;
jump <<= 1;
}
if (currentEnd < mutations.begin()) {
currentEnd = mutations.begin();
}
auto ret = std::lower_bound(currentEnd, lastEnd, searchKey, MutationsAndVersionRef::OrderByVersion());
// TODO REMOVE: for validation
if (ret != mutations.end()) {
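// A standalone sketch of the backward galloping search above, over a vector sorted ascending: the
// jump is clamped to the distance from begin() before each step (the fix in this hunk), so the
// iterator can never underflow the container, and a final lower_bound resolves the bracketed window.
#include <algorithm>
#include <vector>

std::vector<long>::const_iterator searchStart(const std::vector<long>& v, long target) {
    auto lastEnd = v.end(); // exclusive upper bound of the bracketed window
    auto currentEnd = v.end(); // walks left geometrically
    int jump = 1;
    while (currentEnd > v.begin()) {
        jump = std::min((int)(currentEnd - v.begin()), jump); // clamp before stepping left
        auto probe = currentEnd - jump;
        if (*probe < target) {
            currentEnd = probe + 1; // everything at or before probe is < target
            break;
        }
        lastEnd = probe + 1; // answer is at or before probe; keep galloping left
        currentEnd = probe;
        jump <<= 1;
    }
    return std::lower_bound(currentEnd, lastEnd, target); // first element >= target
}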
@ -2498,6 +2550,7 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
remainingDurableBytes));
data->counters.kvScanBytes += res.logicalSize();
++data->counters.changeFeedDiskReads;
if (!inverted && !req.range.empty()) {
data->checkChangeCounter(changeCounter, req.range);
@ -2569,7 +2622,14 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
}
} else if (memoryVerifyIdx < memoryReply.mutations.size() &&
version == memoryReply.mutations[memoryVerifyIdx].version) {
fmt::print("ERROR: SS {0} CF {1} SQ {2} has mutation at {3} in memory but all filtered out on disk!\n",
if (version > feedInfo->storageVersion && version > feedInfo->fetchVersion) {
// Another validation case - the feed was popped, data was fetched, the fetched data was persisted
// but the pop wasn't yet, then the SS restarted. Now the SS has the data without the popped
// version. This looks wrong here but is fine.
memoryVerifyIdx++;
} else {
fmt::print(
"ERROR: SS {0} CF {1} SQ {2} has mutation at {3} in memory but all filtered out on disk!\n",
data->thisServerID.toString().substr(0, 4),
req.rangeID.printable().substr(0, 6),
streamUID.toString().substr(0, 8),
@ -2583,8 +2643,17 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
fmt::print(" {} - {}\n", it.param1.printable().c_str(), it.param2.printable().c_str());
}
}
fmt::print(" Disk(pre-filter): ({})\n", mutations.size());
for (auto& it : mutations) {
if (it.type == MutationRef::SetValue) {
fmt::print(" {}=\n", it.param1.printable().c_str());
} else {
fmt::print(" {} - {}\n", it.param1.printable().c_str(), it.param2.printable().c_str());
}
}
ASSERT(false);
}
}
remainingDurableBytes -=
sizeof(KeyValueRef) +
kv.expectedSize(); // This is tracking the size on disk rather than the reply size
@ -5073,6 +5142,8 @@ void applyChangeFeedMutation(StorageServer* self, MutationRef const& m, Version
DEBUG_MUTATION("ChangeFeedWriteSet", version, m, self->thisServerID)
.detail("Range", it->range)
.detail("ChangeFeedID", it->id);
++self->counters.changeFeedMutations;
} else {
CODE_PROBE(version <= it->emptyVersion, "Skip CF write because version <= emptyVersion");
CODE_PROBE(it->removing, "Skip CF write because removing");
@ -5098,6 +5169,7 @@ void applyChangeFeedMutation(StorageServer* self, MutationRef const& m, Version
DEBUG_MUTATION("ChangeFeedWriteClear", version, m, self->thisServerID)
.detail("Range", it->range)
.detail("ChangeFeedID", it->id);
++self->counters.changeFeedMutations;
} else {
CODE_PROBE(version <= it->emptyVersion, "Skip CF clear because version <= emptyVersion");
CODE_PROBE(it->removing, "Skip CF clear because removing");
@ -5353,22 +5425,27 @@ ACTOR Future<Void> tryGetRange(PromiseStream<RangeResult> results, Transaction*
// We have to store the version the change feed was stopped at in the SS instead of just the stopped status
// In addition to simplifying stopping logic, it enables communicating stopped status when fetching change feeds
// from other SS correctly
const Value changeFeedSSValue(KeyRangeRef const& range, Version popVersion, Version stopVersion) {
const Value changeFeedSSValue(KeyRangeRef const& range,
Version popVersion,
Version stopVersion,
Version metadataVersion) {
BinaryWriter wr(IncludeVersion(ProtocolVersion::withChangeFeed()));
wr << range;
wr << popVersion;
wr << stopVersion;
wr << metadataVersion;
return wr.toValue();
}
std::tuple<KeyRange, Version, Version> decodeChangeFeedSSValue(ValueRef const& value) {
std::tuple<KeyRange, Version, Version, Version> decodeChangeFeedSSValue(ValueRef const& value) {
KeyRange range;
Version popVersion, stopVersion;
Version popVersion, stopVersion, metadataVersion;
BinaryReader reader(value, IncludeVersion());
reader >> range;
reader >> popVersion;
reader >> stopVersion;
return std::make_tuple(range, popVersion, stopVersion);
reader >> metadataVersion;
return std::make_tuple(range, popVersion, stopVersion, metadataVersion);
}
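// A standalone sketch of the round-trip property the encode/decode pair above must preserve: every
// field the writer appends (range, popVersion, stopVersion, and now metadataVersion) is read back
// in exactly the same order. Plain memcpy packing stands in for FDB's BinaryWriter/BinaryReader.
#include <cassert>
#include <cstdint>
#include <cstring>
#include <vector>

struct ToyFeedValue {
    int64_t popVersion, stopVersion, metadataVersion;
};

std::vector<char> encode(const ToyFeedValue& v) {
    std::vector<char> out(sizeof v);
    std::memcpy(out.data(), &v, sizeof v); // fields laid out in declaration order
    return out;
}

ToyFeedValue decode(const std::vector<char>& buf) {
    ToyFeedValue v;
    std::memcpy(&v, buf.data(), sizeof v); // must mirror encode() exactly
    return v;
}

int main() {
    ToyFeedValue in{ 10, 20, 30 };
    ToyFeedValue out = decode(encode(in));
    assert(out.popVersion == 10 && out.stopVersion == 20 && out.metadataVersion == 30);
}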
ACTOR Future<Void> changeFeedPopQ(StorageServer* self, ChangeFeedPopRequest req) {
@ -5402,10 +5479,12 @@ ACTOR Future<Void> changeFeedPopQ(StorageServer* self, ChangeFeedPopRequest req)
auto& mLV = self->addVersionToMutationLog(durableVersion);
self->addMutationToMutationLog(
mLV,
MutationRef(
MutationRef::SetValue,
MutationRef(MutationRef::SetValue,
persistChangeFeedKeys.begin.toString() + feed->second->id.toString(),
changeFeedSSValue(feed->second->range, feed->second->emptyVersion + 1, feed->second->stopVersion)));
changeFeedSSValue(feed->second->range,
feed->second->emptyVersion + 1,
feed->second->stopVersion,
feed->second->metadataVersion)));
if (feed->second->storageVersion != invalidVersion) {
++self->counters.kvSystemClearRanges;
self->addMutationToMutationLog(mLV,
@ -5497,7 +5576,8 @@ ACTOR Future<Version> fetchChangeFeedApplier(StorageServer* data,
persistChangeFeedKeys.begin.toString() + changeFeedInfo->id.toString(),
changeFeedSSValue(changeFeedInfo->range,
changeFeedInfo->emptyVersion + 1,
changeFeedInfo->stopVersion)));
changeFeedInfo->stopVersion,
changeFeedInfo->metadataVersion)));
data->addMutationToMutationLog(
mLV,
MutationRef(MutationRef::ClearRange,
@ -5616,8 +5696,10 @@ ACTOR Future<Version> fetchChangeFeedApplier(StorageServer* data,
mLV,
MutationRef(MutationRef::SetValue,
persistChangeFeedKeys.begin.toString() + changeFeedInfo->id.toString(),
changeFeedSSValue(
changeFeedInfo->range, changeFeedInfo->emptyVersion + 1, changeFeedInfo->stopVersion)));
changeFeedSSValue(changeFeedInfo->range,
changeFeedInfo->emptyVersion + 1,
changeFeedInfo->stopVersion,
changeFeedInfo->metadataVersion)));
data->addMutationToMutationLog(mLV,
MutationRef(MutationRef::ClearRange,
changeFeedDurableKey(changeFeedInfo->id, 0),
@ -5714,13 +5796,6 @@ ACTOR Future<Version> fetchChangeFeed(StorageServer* data,
}
}
/*fmt::print("DBG: SS {} Feed {} possibly destroyed {}, {} metadata create, {} desired committed\n",
data->thisServerID.toString().substr(0, 4),
changeFeedInfo->id.printable(),
changeFeedInfo->possiblyDestroyed,
changeFeedInfo->metadataCreateVersion,
data->desiredOldestVersion.get());*/
// There are two reasons for change_feed_not_registered:
// 1. The feed was just created, but the ss mutation stream is ahead of the GRV that fetchChangeFeedApplier
// uses to read the change feed data from the database. In this case we need to wait and retry
@ -5759,7 +5834,7 @@ ACTOR Future<Version> fetchChangeFeed(StorageServer* data,
data->changeFeedCleanupDurable[changeFeedInfo->id] = cleanupVersion;
}
for (auto& it : data->changeFeedRemovals) {
for (auto& it : data->changeFeedDestroys) {
it.second.send(changeFeedInfo->id);
}
@ -5775,7 +5850,7 @@ ACTOR Future<Version> fetchChangeFeed(StorageServer* data,
ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
KeyRange keys,
PromiseStream<Key> removals,
PromiseStream<Key> destroyedFeeds,
UID fetchKeysID) {
// Wait for current TLog batch to finish to ensure that we're fetching metadata at a version >= the version of the
@ -5789,82 +5864,55 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
.detail("FetchVersion", fetchVersion)
.detail("FKID", fetchKeysID);
state std::set<Key> refreshedFeedIds;
state std::set<Key> destroyedFeedIds;
// before fetching feeds from other SS's, refresh any feeds we already have that are being marked as removed
state OverlappingChangeFeedsInfo feedMetadata = wait(data->cx->getOverlappingChangeFeeds(keys, fetchVersion));
// The rest of this actor needs to happen without waits that might yield to the scheduler, to avoid races in feed metadata.
// Find the set of feeds we currently have that were not present in the fetch, to infer that they may have been destroyed.
state std::unordered_map<Key, Version> missingFeeds;
auto ranges = data->keyChangeFeed.intersectingRanges(keys);
for (auto& r : ranges) {
for (auto& cfInfo : r.value()) {
auto feedCleanup = data->changeFeedCleanupDurable.find(cfInfo->id);
if (feedCleanup != data->changeFeedCleanupDurable.end() && cfInfo->removing && !cfInfo->destroyed) {
CODE_PROBE(true, "re-fetching feed scheduled for deletion! Un-mark it as removing");
destroyedFeedIds.insert(cfInfo->id);
cfInfo->removing = false;
// because we now have a gap in the metadata, it's possible this feed was destroyed
cfInfo->possiblyDestroyed = true;
// Set refreshInProgress, so that if this actor is replaced by an expanded move actor, the new actor
// picks up the refresh
cfInfo->refreshInProgress = true;
// reset fetch versions because everything previously fetched was cleaned up
cfInfo->fetchVersion = invalidVersion;
cfInfo->durableFetchVersion = NotifiedVersion();
TraceEvent(SevDebug, "ResetChangeFeedInfo", data->thisServerID)
.detail("RangeID", cfInfo->id)
.detail("Range", cfInfo->range)
.detail("FetchVersion", fetchVersion)
.detail("EmptyVersion", cfInfo->emptyVersion)
.detail("StopVersion", cfInfo->stopVersion)
.detail("FKID", fetchKeysID);
} else if (cfInfo->refreshInProgress) {
CODE_PROBE(true, "Racing refreshes for same change feed in fetch");
destroyedFeedIds.insert(cfInfo->id);
if (cfInfo->removing && !cfInfo->destroyed) {
missingFeeds.insert({ cfInfo->id, cfInfo->metadataVersion });
}
}
}
state std::vector<OverlappingChangeFeedEntry> feeds = wait(data->cx->getOverlappingChangeFeeds(keys, fetchVersion));
// handle change feeds removed while fetching overlapping
while (removals.getFuture().isReady()) {
Key remove = waitNext(removals.getFuture());
for (int i = 0; i < feeds.size(); i++) {
if (feeds[i].rangeId == remove) {
swapAndPop(&feeds, i--);
// handle change feeds destroyed while fetching overlapping info
while (destroyedFeeds.getFuture().isReady()) {
Key destroyed = waitNext(destroyedFeeds.getFuture());
for (int i = 0; i < feedMetadata.feeds.size(); i++) {
if (feedMetadata.feeds[i].feedId == destroyed) {
missingFeeds.erase(destroyed); // feed definitely destroyed, no need to infer
swapAndPop(&feedMetadata.feeds, i--);
}
}
}
std::vector<Key> feedIds;
feedIds.reserve(feeds.size());
feedIds.reserve(feedMetadata.feeds.size());
// create change feed metadata if it does not exist
for (auto& cfEntry : feeds) {
auto cleanupEntry = data->changeFeedCleanupDurable.find(cfEntry.rangeId);
for (auto& cfEntry : feedMetadata.feeds) {
auto cleanupEntry = data->changeFeedCleanupDurable.find(cfEntry.feedId);
bool cleanupPending = cleanupEntry != data->changeFeedCleanupDurable.end();
feedIds.push_back(cfEntry.rangeId);
auto existingEntry = data->uidChangeFeed.find(cfEntry.rangeId);
auto existingEntry = data->uidChangeFeed.find(cfEntry.feedId);
bool existing = existingEntry != data->uidChangeFeed.end();
TraceEvent(SevDebug, "FetchedChangeFeedInfo", data->thisServerID)
.detail("RangeID", cfEntry.rangeId)
.detail("RangeID", cfEntry.feedId)
.detail("Range", cfEntry.range)
.detail("FetchVersion", fetchVersion)
.detail("EmptyVersion", cfEntry.emptyVersion)
.detail("StopVersion", cfEntry.stopVersion)
.detail("FeedMetadataVersion", cfEntry.feedMetadataVersion)
.detail("Existing", existing)
.detail("ExistingMetadataVersion", existing ? existingEntry->second->metadataVersion : invalidVersion)
.detail("CleanupPendingVersion", cleanupPending ? cleanupEntry->second : invalidVersion)
.detail("FKID", fetchKeysID);
bool addMutationToLog = false;
Reference<ChangeFeedInfo> changeFeedInfo;
auto fid = destroyedFeedIds.find(cfEntry.rangeId);
if (fid != destroyedFeedIds.end()) {
refreshedFeedIds.insert(cfEntry.rangeId);
destroyedFeedIds.erase(fid);
}
if (!existing) {
CODE_PROBE(cleanupPending,
"Fetch change feed which is cleanup pending. This means there was a move away and a move back, "
@ -5872,24 +5920,51 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
changeFeedInfo = Reference<ChangeFeedInfo>(new ChangeFeedInfo());
changeFeedInfo->range = cfEntry.range;
changeFeedInfo->id = cfEntry.rangeId;
changeFeedInfo->id = cfEntry.feedId;
changeFeedInfo->emptyVersion = cfEntry.emptyVersion;
changeFeedInfo->stopVersion = cfEntry.stopVersion;
data->uidChangeFeed[cfEntry.rangeId] = changeFeedInfo;
data->uidChangeFeed[cfEntry.feedId] = changeFeedInfo;
auto rs = data->keyChangeFeed.modify(cfEntry.range);
for (auto r = rs.begin(); r != rs.end(); ++r) {
r->value().push_back(changeFeedInfo);
}
data->keyChangeFeed.coalesce(cfEntry.range.contents());
data->keyChangeFeed.coalesce(cfEntry.range);
addMutationToLog = true;
} else {
changeFeedInfo = existingEntry->second;
CODE_PROBE(cfEntry.feedMetadataVersion > data->version.get(),
"Change Feed fetched future metadata version");
auto fid = missingFeeds.find(cfEntry.feedId);
if (fid != missingFeeds.end()) {
TraceEvent(SevDebug, "ResetChangeFeedInfo", data->thisServerID)
.detail("RangeID", changeFeedInfo->id.printable())
.detail("Range", changeFeedInfo->range)
.detail("FetchVersion", fetchVersion)
.detail("EmptyVersion", changeFeedInfo->emptyVersion)
.detail("StopVersion", changeFeedInfo->stopVersion)
.detail("PreviousMetadataVersion", changeFeedInfo->metadataVersion)
.detail("NewMetadataVersion", cfEntry.feedMetadataVersion)
.detail("FKID", fetchKeysID);
missingFeeds.erase(fid);
ASSERT(!changeFeedInfo->destroyed);
ASSERT(changeFeedInfo->removing);
CODE_PROBE(true, "re-fetching feed scheduled for deletion! Un-mark it as removing");
changeFeedInfo->removing = false;
// reset fetch versions because everything previously fetched was cleaned up
changeFeedInfo->fetchVersion = invalidVersion;
changeFeedInfo->durableFetchVersion = NotifiedVersion();
addMutationToLog = true;
}
if (changeFeedInfo->destroyed) {
// race where multiple fetches fetched an overlapping change feed; one realized the feed was missing
// and marked it removed+destroyed, then this one fetched the same info
CODE_PROBE(true, "Change feed fetched and destroyed by other fetch while fetching metadata");
continue;
}
@ -5909,82 +5984,63 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
addMutationToLog = true;
}
}
feedIds.push_back(cfEntry.feedId);
addMutationToLog |= changeFeedInfo->updateMetadataVersion(cfEntry.feedMetadataVersion);
if (addMutationToLog) {
ASSERT(changeFeedInfo.isValid());
auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion());
Version logV = data->data().getLatestVersion();
auto& mLV = data->addVersionToMutationLog(logV);
data->addMutationToMutationLog(
mLV,
MutationRef(
MutationRef::SetValue,
persistChangeFeedKeys.begin.toString() + cfEntry.rangeId.toString(),
changeFeedSSValue(cfEntry.range, changeFeedInfo->emptyVersion + 1, changeFeedInfo->stopVersion)));
MutationRef(MutationRef::SetValue,
persistChangeFeedKeys.begin.toString() + cfEntry.feedId.toString(),
changeFeedSSValue(cfEntry.range,
changeFeedInfo->emptyVersion + 1,
changeFeedInfo->stopVersion,
changeFeedInfo->metadataVersion)));
// if we updated pop version, remove mutations
while (!changeFeedInfo->mutations.empty() &&
changeFeedInfo->mutations.front().version <= changeFeedInfo->emptyVersion) {
changeFeedInfo->mutations.pop_front();
}
if (BUGGIFY) {
data->maybeInjectTargetedRestart(logV);
}
}
}
CODE_PROBE(!refreshedFeedIds.empty(), "Feed refreshed between move away and move back");
CODE_PROBE(!destroyedFeedIds.empty(), "Feed destroyed between move away and move back");
for (auto& feedId : refreshedFeedIds) {
auto existingEntry = data->uidChangeFeed.find(feedId);
if (existingEntry == data->uidChangeFeed.end() || existingEntry->second->destroyed ||
!existingEntry->second->refreshInProgress) {
CODE_PROBE(true, "feed refreshed");
for (auto& feed : missingFeeds) {
auto existingEntry = data->uidChangeFeed.find(feed.first);
ASSERT(existingEntry != data->uidChangeFeed.end());
ASSERT(existingEntry->second->removing);
ASSERT(!existingEntry->second->destroyed);
Version fetchedMetadataVersion = feedMetadata.getFeedMetadataVersion(existingEntry->second->range);
Version lastMetadataVersion = feed.second;
// Look for the case where the feed's range was moved away, the feed was destroyed, and then the feed's
// range was moved back. This happens when the feed is removing, the fetched metadata version is higher
// than the moved-away version, and the feed isn't in the fetched response. In that case, the feed must
// have been destroyed between lastMetadataVersion and fetchedMetadataVersion
if (lastMetadataVersion >= fetchedMetadataVersion) {
CODE_PROBE(true, "Change Feed fetched higher metadata version before moved away");
continue;
}
// Since cleanup put a mutation in the log to delete the change feed data, put one in the log to restore it
// We may just want to refactor this so updateStorage does explicit deletes based on
// changeFeedCleanupDurable and not use the mutation log at all for the change feed metadata cleanup.
// Then we wouldn't have to reset anything here or above
// Do the mutation log update here instead of above to ensure we only add it back to the mutation log if we're
// sure it wasn't deleted in the metadata gap
Version metadataVersion = data->data().getLatestVersion();
auto& mLV = data->addVersionToMutationLog(metadataVersion);
data->addMutationToMutationLog(
mLV,
MutationRef(MutationRef::SetValue,
persistChangeFeedKeys.begin.toString() + existingEntry->second->id.toString(),
changeFeedSSValue(existingEntry->second->range,
existingEntry->second->emptyVersion + 1,
existingEntry->second->stopVersion)));
TraceEvent(SevDebug, "PersistingResetChangeFeedInfo", data->thisServerID)
.detail("RangeID", existingEntry->second->id)
.detail("Range", existingEntry->second->range)
.detail("FetchVersion", fetchVersion)
.detail("EmptyVersion", existingEntry->second->emptyVersion)
.detail("StopVersion", existingEntry->second->stopVersion)
.detail("FKID", fetchKeysID)
.detail("MetadataVersion", metadataVersion);
existingEntry->second->refreshInProgress = false;
}
for (auto& feedId : destroyedFeedIds) {
auto existingEntry = data->uidChangeFeed.find(feedId);
if (existingEntry == data->uidChangeFeed.end() || existingEntry->second->destroyed) {
CODE_PROBE(true, "feed refreshed but then destroyed elsewhere");
continue;
}
/*fmt::print("DBG: SS {} fetching feed {} was refreshed but not present!! assuming destroyed\n",
data->thisServerID.toString().substr(0, 4),
feedId.printable());*/
Version cleanupVersion = data->data().getLatestVersion();
CODE_PROBE(true, "Destroying change feed from fetch metadata"); //
TraceEvent(SevDebug, "DestroyingChangeFeedFromFetchMetadata", data->thisServerID)
.detail("RangeID", feedId)
.detail("RangeID", feed.first)
.detail("Range", existingEntry->second->range)
.detail("Version", cleanupVersion)
.detail("FKID", fetchKeysID);
if (g_network->isSimulated()) {
ASSERT(g_simulator.validationData.allDestroyedChangeFeedIDs.count(feedId.toString()));
// verify that the feed was actually destroyed and it's not an error in this inference logic
ASSERT(g_simulator.validationData.allDestroyedChangeFeedIDs.count(feed.first.toString()));
}
Key beginClearKey = feedId.withPrefix(persistChangeFeedKeys.begin);
Key beginClearKey = feed.first.withPrefix(persistChangeFeedKeys.begin);
auto& mLV = data->addVersionToMutationLog(cleanupVersion);
data->addMutationToMutationLog(mLV,
@ -5992,15 +6048,18 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
++data->counters.kvSystemClearRanges;
data->addMutationToMutationLog(mLV,
MutationRef(MutationRef::ClearRange,
changeFeedDurableKey(feedId, 0),
changeFeedDurableKey(feedId, cleanupVersion)));
changeFeedDurableKey(feed.first, 0),
changeFeedDurableKey(feed.first, cleanupVersion)));
++data->counters.kvSystemClearRanges;
existingEntry->second->destroy(cleanupVersion);
data->changeFeedCleanupDurable[feedId] = cleanupVersion;
data->changeFeedCleanupDurable[feed.first] = cleanupVersion;
for (auto& it : data->changeFeedRemovals) {
it.second.send(feedId);
for (auto& it : data->changeFeedDestroys) {
it.second.send(feed.first);
}
if (BUGGIFY) {
data->maybeInjectTargetedRestart(cleanupVersion);
}
}
return feedIds;
@ -6013,7 +6072,7 @@ ACTOR Future<std::unordered_map<Key, Version>> dispatchChangeFeeds(StorageServer
KeyRange keys,
Version beginVersion,
Version endVersion,
PromiseStream<Key> removals,
PromiseStream<Key> destroyedFeeds,
std::vector<Key>* feedIds,
std::unordered_set<Key> newFeedIds) {
state std::unordered_map<Key, Version> feedMaxFetched;
@ -6042,7 +6101,7 @@ ACTOR Future<std::unordered_map<Key, Version>> dispatchChangeFeeds(StorageServer
loop {
Future<Version> nextFeed = Never();
if (!removals.getFuture().isReady()) {
if (!destroyedFeeds.getFuture().isReady()) {
bool done = true;
while (!feedFetches.empty()) {
if (feedFetches.begin()->second.isReady()) {
@ -6062,11 +6121,11 @@ ACTOR Future<std::unordered_map<Key, Version>> dispatchChangeFeeds(StorageServer
}
}
choose {
when(state Key remove = waitNext(removals.getFuture())) {
when(state Key destroyed = waitNext(destroyedFeeds.getFuture())) {
wait(delay(0));
feedFetches.erase(remove);
feedFetches.erase(destroyed);
for (int i = 0; i < feedIds->size(); i++) {
if ((*feedIds)[i] == remove) {
if ((*feedIds)[i] == destroyed) {
swapAndPop(feedIds, i--);
}
}
@ -6077,7 +6136,7 @@ ACTOR Future<std::unordered_map<Key, Version>> dispatchChangeFeeds(StorageServer
} catch (Error& e) {
if (!data->shuttingDown) {
data->changeFeedRemovals.erase(fetchKeysID);
data->changeFeedDestroys.erase(fetchKeysID);
}
throw;
}
@ -6090,6 +6149,8 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
state Future<Void> warningLogger = logFetchKeysWarning(shard);
state const double startTime = now();
state Version fetchVersion = invalidVersion;
state PromiseStream<Key> destroyedFeeds;
state FetchKeysMetricReporter metricReporter(fetchKeysID,
startTime,
keys,
@ -6098,18 +6159,28 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
data->counters.bytesFetched,
data->counters.kvFetched);
// need to set this at the very start of the fetch, to handle any private change feed destroy mutations we get
// for this key range that apply to change feeds we don't know about yet because their metadata hasn't been
// fetched yet
data->changeFeedDestroys[fetchKeysID] = destroyedFeeds;
// delay(0) to force a return to the run loop before the work of fetchKeys is started.
// This allows adding->start() to be called inline with CSK.
try {
wait(data->coreStarted.getFuture() && delay(0));
// On SS Reboot, durableVersion == latestVersion, so any mutations we add to the mutation log would be skipped if
// added before latest version advances.
// To ensure this doesn't happen, we wait for version to increase by one if this fetchKeys was initiated by a
// changeServerKeys from restoreDurableState
// On SS Reboot, durableVersion == latestVersion, so any mutations we add to the mutation log would be skipped
// if added before latest version advances. To ensure this doesn't happen, we wait for version to increase by
// one if this fetchKeys was initiated by a changeServerKeys from restoreDurableState
if (data->version.get() == data->durableVersion.get()) {
wait(data->version.whenAtLeast(data->version.get() + 1));
wait(delay(0));
}
} catch (Error& e) {
if (!data->shuttingDown) {
data->changeFeedDestroys.erase(fetchKeysID);
}
throw e;
}
try {
DEBUG_KEY_RANGE("fetchKeysBegin", data->version.get(), shard->keys, data->thisServerID);
@ -6120,9 +6191,8 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
.detail("Version", data->version.get())
.detail("FKID", fetchKeysID);
state PromiseStream<Key> removals;
data->changeFeedRemovals[fetchKeysID] = removals;
state Future<std::vector<Key>> fetchCFMetadata = fetchChangeFeedMetadata(data, keys, removals, fetchKeysID);
state Future<std::vector<Key>> fetchCFMetadata =
fetchChangeFeedMetadata(data, keys, destroyedFeeds, fetchKeysID);
validate(data);
@ -6145,6 +6215,9 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
TraceEvent(SevDebug, "FetchKeysVersionSatisfied", data->thisServerID).detail("FKID", interval.pairID);
wait(data->fetchKeysParallelismFullLock.take(TaskPriority::DefaultYield));
state FlowLock::Releaser holdingFullFKPL(data->fetchKeysParallelismFullLock);
wait(data->fetchKeysParallelismLock.take(TaskPriority::DefaultYield));
state FlowLock::Releaser holdingFKPL(data->fetchKeysParallelismLock);
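// A standalone sketch of the two-level throttle introduced above, with a toy counting semaphore and
// RAII releaser standing in for FlowLock/FlowLock::Releaser: the outer "full" permit is held across
// all post-fetch work (mutation apply, change feed tail fetch), while the inner permit covers only
// the initial key fetch and is released earlier.
struct ToySemaphore {
    int permits;
    bool tryTake() {
        if (permits == 0)
            return false; // the real FlowLock waits asynchronously instead of failing
        --permits;
        return true;
    }
    void release() { ++permits; }
};

struct ToyReleaser { // RAII analogue of FlowLock::Releaser
    ToySemaphore* sem;
    explicit ToyReleaser(ToySemaphore& s) : sem(&s) {}
    void release() {
        if (sem) {
            sem->release();
            sem = nullptr;
        }
    }
    ~ToyReleaser() { release(); }
};

void fetchShard(ToySemaphore& fullLock, ToySemaphore& fetchLock) {
    if (!fullLock.tryTake())
        return; // toy back-off; the real actor would wait on the lock instead
    ToyReleaser holdingFull(fullLock);
    if (!fetchLock.tryTake())
        return; // holdingFull releases via RAII
    ToyReleaser holdingFetch(fetchLock);
    // ... initial key fetch ...
    holdingFetch.release(); // narrow lock released once the bulk fetch is done
    // ... mutation apply + change feed tail fetch, still under the full lock ...
} // holdingFull releases here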
@ -6376,8 +6449,14 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
// being recovered. Instead we wait for the updateStorage loop to commit something (and consequently also what
// we have written)
state Future<std::unordered_map<Key, Version>> feedFetchMain = dispatchChangeFeeds(
data, fetchKeysID, keys, 0, fetchVersion + 1, removals, &changeFeedsToFetch, std::unordered_set<Key>());
state Future<std::unordered_map<Key, Version>> feedFetchMain = dispatchChangeFeeds(data,
fetchKeysID,
keys,
0,
fetchVersion + 1,
destroyedFeeds,
&changeFeedsToFetch,
std::unordered_set<Key>());
state Future<Void> fetchDurable = data->durableVersion.whenAtLeast(data->storageVersion() + 1);
state Future<Void> dataArrive = data->version.whenAtLeast(fetchVersion);
@ -6440,7 +6519,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
keys,
fetchVersion + 1,
shard->transferredVersion,
removals,
destroyedFeeds,
&changeFeedsToFetch,
newChangeFeeds);
@ -6494,7 +6573,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
}
}
data->changeFeedRemovals.erase(fetchKeysID);
data->changeFeedDestroys.erase(fetchKeysID);
shard->phase = AddingShard::Waiting;
@ -6519,6 +6598,9 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
// Note that since it receives a pointer to FetchInjectionInfo, the thread does not leave this actor until this
// point.
// At this point change feed fetching and mutation injection is complete, so full fetch is finished.
holdingFullFKPL.release();
// Wait for the transferred version (and therefore the shard data) to be committed and durable.
wait(data->durableVersion.whenAtLeast(feedTransferredVersion));
@ -6547,7 +6629,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
.errorUnsuppressed(e)
.detail("Version", data->version.get());
if (!data->shuttingDown) {
data->changeFeedRemovals.erase(fetchKeysID);
data->changeFeedDestroys.erase(fetchKeysID);
}
if (e.code() == error_code_actor_cancelled && !data->shuttingDown && shard->phase >= AddingShard::Fetching) {
if (shard->phase < AddingShard::FetchingCF) {
@ -6800,11 +6882,15 @@ void cleanUpChangeFeeds(StorageServer* data, const KeyRangeRef& keys, Version ve
auto feed = data->uidChangeFeed.find(f.first);
if (feed != data->uidChangeFeed.end()) {
feed->second->updateMetadataVersion(version);
feed->second->removing = true;
feed->second->refreshInProgress = false;
feed->second->moved(feed->second->range);
feed->second->newMutations.trigger();
}
if (BUGGIFY) {
data->maybeInjectTargetedRestart(durableVersion);
}
} else {
// if just part of feed's range is moved away
auto feed = data->uidChangeFeed.find(f.first);
@ -7425,7 +7511,7 @@ private:
.detail("Status", status);
// Because of data moves, we can get mutations operating on a change feed we don't yet know about, because
// the fetch hasn't started yet
// the metadata fetch hasn't started yet
bool createdFeed = false;
if (feed == data->uidChangeFeed.end() && status != ChangeFeedStatus::CHANGE_FEED_DESTROY) {
createdFeed = true;
@ -7457,6 +7543,9 @@ private:
}
data->keyChangeFeed.coalesce(changeFeedRange.contents());
}
if (feed != data->uidChangeFeed.end()) {
feed->second->updateMetadataVersion(currentVersion);
}
bool popMutationLog = false;
bool addMutationToLog = false;
@ -7518,22 +7607,29 @@ private:
feed->second->destroy(currentVersion);
data->changeFeedCleanupDurable[feed->first] = cleanupVersion;
if (BUGGIFY) {
data->maybeInjectTargetedRestart(cleanupVersion);
}
}
if (status == ChangeFeedStatus::CHANGE_FEED_DESTROY) {
for (auto& it : data->changeFeedRemovals) {
for (auto& it : data->changeFeedDestroys) {
it.second.send(changeFeedId);
}
}
if (addMutationToLog) {
auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion());
Version logV = data->data().getLatestVersion();
auto& mLV = data->addVersionToMutationLog(logV);
data->addMutationToMutationLog(
mLV,
MutationRef(MutationRef::SetValue,
persistChangeFeedKeys.begin.toString() + changeFeedId.toString(),
changeFeedSSValue(
feed->second->range, feed->second->emptyVersion + 1, feed->second->stopVersion)));
changeFeedSSValue(feed->second->range,
feed->second->emptyVersion + 1,
feed->second->stopVersion,
feed->second->metadataVersion)));
if (popMutationLog) {
++data->counters.kvSystemClearRanges;
data->addMutationToMutationLog(mLV,
@ -7541,6 +7637,9 @@ private:
changeFeedDurableKey(feed->second->id, 0),
changeFeedDurableKey(feed->second->id, popVersion)));
}
if (BUGGIFY) {
data->maybeInjectTargetedRestart(logV);
}
}
} else if ((m.type == MutationRef::SetValue || m.type == MutationRef::ClearRange) &&
m.param1.startsWith(TenantMetadata::tenantMapPrivatePrefix)) {
@ -7753,6 +7852,10 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
}
}
if (data->maybeInjectDelay()) {
wait(delay(deterministicRandom()->random01() * 10.0));
}
while (data->byteSampleClearsTooLarge.get()) {
wait(data->byteSampleClearsTooLarge.onChange());
}
@ -8394,6 +8497,7 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
state std::vector<Key> updatedChangeFeeds(modifiedChangeFeeds.begin(), modifiedChangeFeeds.end());
state int curFeed = 0;
state int64_t durableChangeFeedMutations = 0;
while (curFeed < updatedChangeFeeds.size()) {
auto info = data->uidChangeFeed.find(updatedChangeFeeds[curFeed]);
if (info != data->uidChangeFeed.end()) {
@ -8412,6 +8516,7 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
// in the stream. We should fix this assert to be strictly > and re-enable it
ASSERT(it.version >= info->second->storageVersion);
info->second->storageVersion = it.version;
durableChangeFeedMutations++;
}
if (info->second->fetchVersion != invalidVersion && !info->second->removing) {
@ -8500,6 +8605,7 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
TraceEvent("RebootWhenDurableTriggered", data->thisServerID)
.detail("NewOldestVersion", newOldestVersion)
.detail("RebootAfterDurableVersion", data->rebootAfterDurableVersion);
CODE_PROBE(true, "SS rebooting after durable");
// To avoid brokenPromise error, which is caused by the sender of the durableInProgress (i.e., this
// process) never sets durableInProgress, we should set durableInProgress before send the
// please_reboot() error. Otherwise, in the race situation when storage server receives both reboot and
@ -8571,6 +8677,8 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
}
}
data->counters.changeFeedMutationsDurable += durableChangeFeedMutations;
durableInProgress.send(Void());
wait(delay(0, TaskPriority::UpdateStorage)); // Setting durableInProgess could cause the storage server to
// shut down, so delay to check for cancellation
@ -8646,7 +8754,8 @@ void setAvailableStatus(StorageServer* self, KeyRangeRef keys, bool available) {
// ASSERT( self->debug_inApplyUpdate );
ASSERT(!keys.empty());
auto& mLV = self->addVersionToMutationLog(self->data().getLatestVersion());
Version logV = self->data().getLatestVersion();
auto& mLV = self->addVersionToMutationLog(logV);
KeyRange availableKeys = KeyRangeRef(persistShardAvailableKeys.begin.toString() + keys.begin.toString(),
persistShardAvailableKeys.begin.toString() + keys.end.toString());
@ -8682,6 +8791,10 @@ void setAvailableStatus(StorageServer* self, KeyRangeRef keys, bool available) {
.detail("DeleteVersion", mLV.version + 1);
}
}
if (BUGGIFY) {
self->maybeInjectTargetedRestart(logV);
}
}
void updateStorageShard(StorageServer* data, StorageServerShard shard) {
@ -8718,7 +8831,8 @@ void updateStorageShard(StorageServer* data, StorageServerShard shard) {
void setAssignedStatus(StorageServer* self, KeyRangeRef keys, bool nowAssigned) {
ASSERT(!keys.empty());
auto& mLV = self->addVersionToMutationLog(self->data().getLatestVersion());
Version logV = self->data().getLatestVersion();
auto& mLV = self->addVersionToMutationLog(logV);
KeyRange assignedKeys = KeyRangeRef(persistShardAssignedKeys.begin.toString() + keys.begin.toString(),
persistShardAssignedKeys.begin.toString() + keys.end.toString());
//TraceEvent("SetAssignedStatus", self->thisServerID).detail("Version", mLV.version).detail("RangeBegin", assignedKeys.begin).detail("RangeEnd", assignedKeys.end);
@ -8735,6 +8849,10 @@ void setAssignedStatus(StorageServer* self, KeyRangeRef keys, bool nowAssigned)
assignedKeys.end,
endAssigned ? LiteralStringRef("1") : LiteralStringRef("0")));
}
if (BUGGIFY) {
self->maybeInjectTargetedRestart(logV);
}
}
void StorageServerDisk::clearRange(KeyRangeRef keys) {
@ -9138,13 +9256,15 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
for (feedLoc = 0; feedLoc < changeFeeds.size(); feedLoc++) {
Key changeFeedId = changeFeeds[feedLoc].key.removePrefix(persistChangeFeedKeys.begin);
KeyRange changeFeedRange;
Version popVersion, stopVersion;
std::tie(changeFeedRange, popVersion, stopVersion) = decodeChangeFeedSSValue(changeFeeds[feedLoc].value);
Version popVersion, stopVersion, metadataVersion;
std::tie(changeFeedRange, popVersion, stopVersion, metadataVersion) =
decodeChangeFeedSSValue(changeFeeds[feedLoc].value);
TraceEvent(SevDebug, "RestoringChangeFeed", data->thisServerID)
.detail("RangeID", changeFeedId)
.detail("Range", changeFeedRange)
.detail("StopVersion", stopVersion)
.detail("PopVer", popVersion);
.detail("PopVer", popVersion)
.detail("MetadataVersion", metadataVersion);
Reference<ChangeFeedInfo> changeFeedInfo(new ChangeFeedInfo());
changeFeedInfo->range = changeFeedRange;
changeFeedInfo->id = changeFeedId;
@ -9152,6 +9272,7 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
changeFeedInfo->storageVersion = version;
changeFeedInfo->emptyVersion = popVersion - 1;
changeFeedInfo->stopVersion = stopVersion;
changeFeedInfo->metadataVersion = metadataVersion;
data->uidChangeFeed[changeFeedId] = changeFeedInfo;
auto rs = data->keyChangeFeed.modify(changeFeedRange);
for (auto r = rs.begin(); r != rs.end(); ++r) {

View File

@ -385,7 +385,9 @@ ACTOR Future<Reference<TestWorkload>> getWorkloadIface(WorkloadRequest work,
wcx.sharedRandomNumber = work.sharedRandomNumber;
workload = IWorkloadFactory::create(testName.toString(), wcx);
if (workload) {
wait(workload->initialized());
}
auto unconsumedOptions = checkAllOptionsConsumed(workload ? workload->options : VectorRef<KeyValueRef>());
if (!workload || unconsumedOptions.size()) {

View File

@ -237,57 +237,64 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
while (timeTravelIt != timeTravelChecks.end() && currentTime >= timeTravelIt->first) {
state OldRead oldRead = timeTravelIt->second;
timeTravelChecksMemory -= oldRead.oldResult.expectedSize();
// advance iterator before doing read, so if it gets error we don't retry it
timeTravelIt = timeTravelChecks.erase(timeTravelIt);
if (prevPurgeVersion == -1) {
prevPurgeVersion = oldRead.v;
}
// advance iterator before doing read, so if it gets error we don't retry it
try {
// before doing read, purge just before read version
state Version newPurgeVersion = 0;
state bool doPurging = allowPurging && deterministicRandom()->random01() < 0.5;
if (doPurging) {
CODE_PROBE(true, "BGV considering purge");
Version maxPurgeVersion = oldRead.v;
for (auto& it : timeTravelChecks) {
maxPurgeVersion = std::min(it.second.v, maxPurgeVersion);
}
if (prevPurgeVersion < maxPurgeVersion) {
CODE_PROBE(true, "BGV doing purge");
newPurgeVersion = deterministicRandom()->randomInt64(prevPurgeVersion, maxPurgeVersion);
prevPurgeVersion = std::max(prevPurgeVersion, newPurgeVersion);
if (BGV_DEBUG) {
fmt::print("BGV Purging @ {0}\n", newPurgeVersion);
}
try {
Key purgeKey = wait(cx->purgeBlobGranules(normalKeys, newPurgeVersion, {}, false));
if (BGV_DEBUG) {
fmt::print("BGV Purged @ {0}, waiting\n", newPurgeVersion);
}
wait(cx->waitPurgeGranulesComplete(purgeKey));
} catch (Error& e) {
if (e.code() == error_code_operation_cancelled) {
throw e;
}
// purging shouldn't error, it should retry.
if (BGV_DEBUG) {
fmt::print("Unexpected error {0} purging @ {1}!\n", e.name(), newPurgeVersion);
}
ASSERT(false);
}
CODE_PROBE(true, "BGV purge complete");
if (BGV_DEBUG) {
fmt::print("BGV Purge complete @ {0}\n", newPurgeVersion);
}
self->purges++;
} else {
doPurging = false;
}
}
// do time travel read
try {
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> reReadResult =
wait(readFromBlob(cx, self->bstore, oldRead.range, 0, oldRead.v));
if (!compareFDBAndBlob(oldRead.oldResult, reReadResult, oldRead.range, oldRead.v, BGV_DEBUG)) {
self->mismatches++;
}
self->timeTravelReads++;
if (doPurging) {
wait(self->killBlobWorkers(cx, self));
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> versionRead =
wait(readFromBlob(cx, self->bstore, oldRead.range, 0, prevPurgeVersion));
try {
Version minSnapshotVersion = newPurgeVersion;
for (auto& it : versionRead.second) {
minSnapshotVersion = std::min(minSnapshotVersion, it.snapshotVersion);
}
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> versionRead =
wait(readFromBlob(cx, self->bstore, oldRead.range, 0, minSnapshotVersion - 1));
ASSERT(false);
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
throw;
}
ASSERT(e.code() == error_code_blob_granule_transaction_too_old);
}
}
} catch (Error& e) {
fmt::print("Error TT: {0}\n", e.name());
if (e.code() == error_code_blob_granule_transaction_too_old) {
self->timeTravelTooOld++;
// TODO: add debugging info for when this is a failure
@ -297,6 +304,51 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
oldRead.v);
}
}
// if purged just before read, verify that purge cleaned up data by restarting blob workers and
// reading older than the purge version
if (doPurging) {
wait(self->killBlobWorkers(cx, self));
if (BGV_DEBUG) {
fmt::print("BGV Reading post-purge [{0} - {1}) @ {2}\n",
oldRead.range.begin.printable(),
oldRead.range.end.printable(),
prevPurgeVersion);
}
// ensure the purge version itself is still readable
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> versionRead1 =
wait(readFromBlob(cx, self->bstore, oldRead.range, 0, prevPurgeVersion));
if (BGV_DEBUG) {
fmt::print("BGV Post-purge first read:\n");
printGranuleChunks(versionRead1.second);
}
try {
// read at purgeVersion - 1, should NOT be readable
Version minSnapshotVersion = newPurgeVersion;
for (auto& it : versionRead1.second) {
minSnapshotVersion = std::min(minSnapshotVersion, it.snapshotVersion);
}
if (BGV_DEBUG) {
fmt::print("BGV Reading post-purge again [{0} - {1}) @ {2}\n",
oldRead.range.begin.printable(),
oldRead.range.end.printable(),
minSnapshotVersion - 1);
}
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> versionRead2 =
wait(readFromBlob(cx, self->bstore, oldRead.range, 0, minSnapshotVersion - 1));
if (BGV_DEBUG) {
fmt::print("BGV ERROR: data not purged! Read successful!!\n");
printGranuleChunks(versionRead2.second);
}
ASSERT(false);
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
throw;
}
ASSERT(e.code() == error_code_blob_granule_transaction_too_old);
CODE_PROBE(true, "BGV verified too old after purge");
}
}
}
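// A standalone sketch of the purge-verification contract exercised above: reads at or after the
// purge version must still succeed, while reads strictly before the oldest retained snapshot must
// fail (the workload expects blob_granule_transaction_too_old there).
#include <cassert>
#include <stdexcept>

struct ToyGranuleStore {
    long oldestSnapshot = 0; // advanced by purges
    void purgeThrough(long v) { oldestSnapshot = v; }
    int readAt(long v) const {
        if (v < oldestSnapshot)
            throw std::runtime_error("blob_granule_transaction_too_old");
        return 42; // stand-in for granule data
    }
};

int main() {
    ToyGranuleStore store;
    store.purgeThrough(100);
    assert(store.readAt(100) == 42); // the purge version itself stays readable
    bool tooOld = false;
    try {
        store.readAt(99); // pre-purge read must be rejected
    } catch (const std::runtime_error&) {
        tooOld = true;
    }
    assert(tooOld);
}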
// pick a random range
@ -471,6 +523,8 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
// For some reason simulation is still passing when this fails?.. so assert for now
ASSERT(result);
// FIXME: if doPurging was set, possibly do one last purge here, and verify it succeeds with no errors
if (self->clientId == 0 && SERVER_KNOBS->BG_ENABLE_MERGING && deterministicRandom()->random01() < 0.1) {
CODE_PROBE(true, "BGV clearing database and awaiting merge");
wait(clearAndAwaitMerge(cx, normalKeys));

View File

@ -0,0 +1,767 @@
/*
* ChangeFeedOperations.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/FDBOptions.g.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/SystemData.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbserver/workloads/BulkSetup.actor.h"
#include "flow/Arena.h"
#include "flow/IRandom.h"
#include "flow/Trace.h"
#include "flow/Util.h"
#include "flow/serialize.h"
#include <cstring>
#include <limits>
#include "flow/actorcompiler.h" // This must be the last #include.
// enable to debug specific operations for a given change feed
#define DEBUG_KEY ""_sr
#define DEBUG_CF(feedKey) (feedKey.printable() == DEBUG_KEY)
ACTOR Future<Void> doPop(Database cx, Key key, Key feedID, Version version, Version* doneOut) {
wait(cx->popChangeFeedMutations(feedID, version));
if (*doneOut < version) {
*doneOut = version;
}
if (DEBUG_CF(key)) {
fmt::print("DBG) {0} Popped through {1}\n", key.printable(), version);
}
// TODO: could strengthen pop checking by validating that a read immediately after the pop completes has no data
return Void();
}
struct FeedTestData : ReferenceCounted<FeedTestData>, NonCopyable {
Key key;
KeyRange keyRange;
Key feedID;
int nextVal;
Future<Void> liveReader;
bool lastCleared = false;
std::vector<Future<Void>> pops;
Version poppingVersion;
Version poppedVersion;
Optional<Version> stopVersion;
bool destroying;
bool destroyed;
bool complete;
int popWindow;
int popDelayWindow;
std::deque<std::pair<Version, Optional<Value>>> writesByVersion;
// these were all committed
std::deque<std::pair<Version, Optional<Value>>> pendingCheck;
NotifiedVersion checkVersion;
FeedTestData(Key key, bool doPops)
: key(key), keyRange(KeyRangeRef(key, keyAfter(key))), feedID(key.withPrefix(LiteralStringRef("CF"))), nextVal(0),
lastCleared(false), poppingVersion(0), poppedVersion(0), destroying(false), destroyed(false), complete(false),
checkVersion(0) {
if (doPops) {
popWindow = deterministicRandom()->randomExp(1, 8);
popDelayWindow = deterministicRandom()->randomInt(0, 2) * deterministicRandom()->randomExp(1, 4);
} else {
popWindow = -1;
popDelayWindow = -1;
}
}
Value nextValue() {
std::string v = std::to_string(nextVal);
nextVal++;
return Value(v);
}
void update(Version version, Optional<Value> value) {
if (!stopVersion.present()) {
// if feed is stopped, value should not get read
writesByVersion.push_back({ version, value });
pendingCheck.push_back(writesByVersion.back());
checkVersion.set(version);
}
}
void testComplete() {
complete = true;
checkVersion.set(checkVersion.get() + 1);
}
void pop(Database cx, Version v) {
if (DEBUG_CF(key)) {
fmt::print("DBG) {0} Popping through {1}\n", key.printable(), v);
}
ASSERT(poppingVersion < v);
poppingVersion = v;
while (!writesByVersion.empty() && v > writesByVersion.front().first) {
writesByVersion.pop_front();
}
while (!pendingCheck.empty() && v > pendingCheck.front().first) {
pendingCheck.pop_front();
}
pops.push_back(doPop(cx, key, feedID, v, &poppedVersion));
}
};
static void rollbackFeed(Key key,
std::deque<Standalone<MutationsAndVersionRef>>& buffered,
Version version,
MutationRef rollbackMutation) {
Version rollbackVersion;
BinaryReader br(rollbackMutation.param2, Unversioned());
br >> rollbackVersion;
TraceEvent("ChangeFeedRollback").detail("Key", key).detail("Ver", version).detail("RollbackVer", rollbackVersion);
if (DEBUG_CF(key)) {
fmt::print("DBG) {0} Rolling back {1} -> {2}\n", key.printable(), version, rollbackVersion);
}
while (!buffered.empty() && buffered.back().version > rollbackVersion) {
TraceEvent("ChangeFeedRollbackVer").detail("Ver", buffered.back().version);
buffered.pop_back();
}
}
static void checkNextResult(Key key,
std::deque<Standalone<MutationsAndVersionRef>>& buffered,
std::deque<std::pair<Version, Optional<Value>>>& checkData) {
// The first asserts check that the data is in the form the test is supposed to produce
ASSERT(!buffered.empty());
ASSERT(buffered.front().mutations.size() == 1);
ASSERT(buffered.front().mutations[0].param1 == key);
// The asserts below check correctness of change feed invariants.
// Handle the case where a txn retried and wrote the same value twice. checkData's version is the committed
// one, so the same update may appear at an earlier version. This is fine, as long as it then actually
// appears at the committed version
// TODO: could strengthen this check a bit and only allow it to appear at the lower version if the txn retried on
// commit_unknown_result?
if (checkData.front().first < buffered.front().version) {
fmt::print("ERROR. {0} Check version {1} != {2}.\n Check: {3} {4}\n Buffered: {5} {6}\n",
key.printable(),
checkData.front().first,
buffered.front().version,
checkData.front().second.present() ? "SET" : "CLEAR",
checkData.front().second.present() ? checkData.front().second.get().printable()
: keyAfter(key).printable(),
buffered.front().mutations[0].type == MutationRef::SetValue ? "SET" : "CLEAR",
buffered.front().mutations[0].param2.printable());
}
ASSERT(checkData.front().first >= buffered.front().version);
if (checkData.front().second.present()) {
ASSERT(buffered.front().mutations[0].type == MutationRef::SetValue);
ASSERT(buffered.front().mutations[0].param2 == checkData.front().second.get());
} else {
ASSERT(buffered.front().mutations[0].type == MutationRef::ClearRange);
ASSERT(buffered.front().mutations[0].param2 == keyAfter(key));
}
if (checkData.front().first == buffered.front().version) {
checkData.pop_front();
}
buffered.pop_front();
}
ACTOR Future<Void> liveReader(Database cx, Reference<FeedTestData> data, Version begin) {
state Version lastCheckVersion = 0;
state Version nextCheckVersion = 0;
state std::deque<Standalone<MutationsAndVersionRef>> buffered;
state Reference<ChangeFeedData> results = makeReference<ChangeFeedData>();
state Future<Void> stream =
cx->getChangeFeedStream(results, data->feedID, begin, std::numeric_limits<Version>::max(), data->keyRange);
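// Stream the feed live from `begin` with no end version; batches of mutations arrive on
// results->mutations and are checked against the committed writes recorded in pendingCheck.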
try {
loop {
if (data->complete && data->pendingCheck.empty()) {
return Void();
}
nextCheckVersion = data->pendingCheck.empty() ? invalidVersion : data->pendingCheck.front().first;
choose {
when(Standalone<VectorRef<MutationsAndVersionRef>> res = waitNext(results->mutations.getFuture())) {
for (auto& it : res) {
if (it.mutations.size() == 1 && it.mutations.back().param1 == lastEpochEndPrivateKey) {
rollbackFeed(data->key, buffered, it.version, it.mutations.back());
} else {
if (it.mutations.size() == 0) {
// FIXME: THIS SHOULD NOT HAPPEN
// FIXME: these are also getting sent past stopVersion!!
} else {
if (data->stopVersion.present()) {
if (it.version > data->stopVersion.get()) {
fmt::print("DBG) {0} Read data with version {1} > stop version {2} ({3})\n",
data->key.printable(),
it.version,
data->stopVersion.get(),
it.mutations.size());
}
ASSERT(it.version <= data->stopVersion.get());
}
buffered.push_back(Standalone<MutationsAndVersionRef>(it));
if (DEBUG_CF(data->key)) {
fmt::print("DBG) {0} Live read through {1} ({2})\n",
data->key.printable(),
it.version,
it.mutations.size());
}
}
}
}
}
when(wait(data->checkVersion.whenAtLeast(lastCheckVersion + 1))) {
// wake loop and start new whenAtLeast whenever checkVersion is set
lastCheckVersion = data->checkVersion.get();
}
when(wait(data->pendingCheck.empty() ? Never()
: results->whenAtLeast(data->pendingCheck.front().first))) {
if (data->pendingCheck.empty() || data->pendingCheck.front().first > nextCheckVersion) {
// pendingCheck wasn't empty before whenAtLeast, and nextCheckVersion = the front version, so if
// either of these is true, the data was popped concurrently and we can move on to checking the
// next value
CODE_PROBE(true, "popped while waiting for whenAtLeast to check next value");
continue;
}
while (!buffered.empty() && buffered.front().version < data->poppingVersion) {
CODE_PROBE(true, "live reader ignoring data that is being popped");
buffered.pop_front();
}
if (buffered.empty()) {
if (data->poppingVersion < data->pendingCheck.front().first) {
fmt::print("DBG) {0} Buffered empty after ready for check, and data not popped! popped "
"{1}, popping {2}, check {3}\n",
data->key.printable(),
data->poppedVersion,
data->poppingVersion,
data->pendingCheck.front().first);
}
ASSERT(data->poppingVersion >= data->pendingCheck.front().first);
data->pendingCheck.pop_front();
} else {
Version v = buffered.front().version;
if (DEBUG_CF(data->key)) {
fmt::print("DBG) {0} Live checking through {1}\n",
data->key.printable(),
data->pendingCheck.front().first);
}
checkNextResult(data->key, buffered, data->pendingCheck);
if (DEBUG_CF(data->key)) {
fmt::print("DBG) {0} Live Checked through {1}\n", data->key.printable(), v);
}
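// read-driven popping: after popWindow + popDelayWindow writes have been checked, pop through
// the popWindow-th oldest write, leaving popDelayWindow unpopped writes behind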
if (data->popDelayWindow >= 0 && data->popWindow >= 0 &&
data->writesByVersion.size() == data->popWindow + data->popDelayWindow) {
data->pop(cx, data->writesByVersion[data->popWindow - 1].first + 1);
ASSERT(data->writesByVersion.size() == data->popDelayWindow);
}
}
}
}
}
} catch (Error& e) {
throw e;
}
}
ACTOR Future<Void> historicReader(Database cx,
Reference<FeedTestData> data,
Version begin,
Version end,
bool skipPopped) {
state std::deque<std::pair<Version, Optional<Value>>> checkData;
state std::deque<Standalone<MutationsAndVersionRef>> buffered;
state Reference<ChangeFeedData> results = makeReference<ChangeFeedData>();
state Future<Void> stream = cx->getChangeFeedStream(results, data->feedID, begin, end, data->keyRange);
state Version poppedVersionAtStart = data->poppedVersion;
if (DEBUG_CF(data->key)) {
fmt::print("DBG) {0} Starting historical read {1} - {2}\n", data->key.printable(), begin, end);
}
// TODO could cpu optimize this
for (auto& it : data->writesByVersion) {
if (it.first >= end) {
break;
}
if (it.first >= begin) {
checkData.push_back(it);
}
}
try {
loop {
Standalone<VectorRef<MutationsAndVersionRef>> res = waitNext(results->mutations.getFuture());
for (auto& it : res) {
if (it.mutations.size() == 1 && it.mutations.back().param1 == lastEpochEndPrivateKey) {
rollbackFeed(data->key, buffered, it.version, it.mutations.back());
} else {
if (it.mutations.size() == 0) {
// FIXME: THIS SHOULD NOT HAPPEN
// FIXME: these are also getting sent past stopVersion!!
} else {
if (data->stopVersion.present()) {
ASSERT(it.version <= data->stopVersion.get());
}
buffered.push_back(Standalone<MutationsAndVersionRef>(it));
}
}
}
}
} catch (Error& e) {
if (e.code() != error_code_end_of_stream) {
throw;
}
}
if (skipPopped) {
while (!buffered.empty() && buffered.front().version < data->poppingVersion) {
// ignore data
buffered.pop_front();
}
while (!checkData.empty() && checkData.front().first < data->poppingVersion) {
checkData.pop_front();
}
}
while (!checkData.empty() && !buffered.empty()) {
checkNextResult(data->key, buffered, checkData);
}
// Change feed missing data it should have
ASSERT(checkData.empty());
// Change feed read extra data it shouldn't have
ASSERT(buffered.empty());
// check pop version of cursor
// TODO: this check might not always work if read is for old data and SS is way behind
// FIXME: this check doesn't work for now, probably due to above comment
/*if (data->poppingVersion != 0) {
ASSERT(results->popVersion >= poppedVersionAtStart && results->popVersion <= data->poppingVersion);
}*/
return Void();
}
enum Op {
CREATE_DELETE = 0,
READ = 1,
UPDATE_CLEAR = 2,
STOP = 3,
POP = 4,
OP_COUNT = 5 /* keep this last */
};
struct ChangeFeedOperationsWorkload : TestWorkload {
// test settings
double testDuration;
int operationsPerSecond;
int targetFeeds;
bool clientsDisjointKeyspace;
bool clearKeyWhenDestroy;
double clearFrequency;
int popMode;
int opWeights[Op::OP_COUNT];
int totalOpWeight;
Future<Void> client;
std::unordered_set<Key> usedKeys;
std::vector<Reference<FeedTestData>> data;
ChangeFeedOperationsWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
testDuration = getOption(options, "testDuration"_sr, 60.0);
operationsPerSecond = getOption(options, "opsPerSecond"_sr, 100.0);
int64_t rand = wcx.sharedRandomNumber;
targetFeeds = deterministicRandom()->randomExp(1, 1 + rand % 10);
targetFeeds *= (0.8 + (deterministicRandom()->random01() * 0.4));
targetFeeds = std::max(1, targetFeeds / clientCount);
rand /= 10;
clientsDisjointKeyspace = rand % 2;
rand /= 2;
clearKeyWhenDestroy = rand % 2;
rand /= 2;
bool doStops = rand % 2;
rand /= 2;
bool noCreateDelete = rand % 10 == 0;
rand /= 10;
popMode = rand % 3; // 0=none, 1=read-driven, 2=op-driven
rand /= 3;
ASSERT(clientId >= 0);
ASSERT(clientId < clientCount);
ASSERT(clientCount < 255);
clearFrequency = deterministicRandom()->random01();
for (int i = 0; i < Op::OP_COUNT; i++) {
int randWeight = deterministicRandom()->randomExp(0, 5);
ASSERT(randWeight > 0);
opWeights[i] = randWeight;
}
if (!doStops) {
opWeights[Op::STOP] = 0;
}
if (noCreateDelete) {
opWeights[Op::CREATE_DELETE] = 0;
}
if (popMode != 2) {
opWeights[Op::POP] = 0;
}
std::string weightString = "|";
totalOpWeight = 0;
for (int i = 0; i < Op::OP_COUNT; i++) {
totalOpWeight += opWeights[i];
weightString += std::to_string(opWeights[i]) + "|";
}
TraceEvent("ChangeFeedOperationsInit")
.detail("TargetFeeds", targetFeeds)
.detail("DisjointKeyspace", clientsDisjointKeyspace)
.detail("ClearWhenDestroy", clearKeyWhenDestroy)
.detail("DoStops", doStops)
.detail("NoCreateDelete", noCreateDelete)
.detail("Weights", weightString);
}
Key unusedNewRandomKey() {
while (true) {
Key k = newRandomKey();
if (usedKeys.insert(k).second) {
return k;
}
}
}
Key newRandomKey() {
if (clientsDisjointKeyspace) {
double keyspaceRange = (1.0 / clientCount);
double randPartOfRange = deterministicRandom()->random01() * (keyspaceRange - 0.0001);
double randomDouble = clientId * keyspaceRange + 0.0001 + randPartOfRange;
return doubleToTestKey(randomDouble);
} else {
// somewhat hacky, but stamping the client id into the last byte guarantees disjoint keys per client
Key ret = doubleToTestKey(deterministicRandom()->random01());
std::string str = ret.toString();
str.back() = (uint8_t)clientId;
return Key(str);
}
}
// Pick a random op, weighted by opWeights
Op pickRandomOp() {
int r = deterministicRandom()->randomInt(0, totalOpWeight);
int i = 0;
while (i < Op::OP_COUNT && (opWeights[i] <= r || opWeights[i] == 0)) {
r -= opWeights[i];
i++;
}
ASSERT(i < Op::OP_COUNT);
return (Op)i;
}
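// Example (weights are illustrative): with opWeights = {4, 2, 1, 1, 0} and totalOpWeight = 8, r is
// drawn from [0, 8); r in [0,4) -> CREATE_DELETE, [4,6) -> READ, [6,7) -> UPDATE_CLEAR,
// [7,8) -> STOP, and POP (weight 0) is never chosen.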
ACTOR Future<Void> createNewFeed(Database cx, ChangeFeedOperationsWorkload* self) {
state Transaction tr(cx);
state Key key = self->unusedNewRandomKey();
state Reference<FeedTestData> feedData = makeReference<FeedTestData>(key, self->popMode == 1);
state Value initialValue = feedData->nextValue();
if (DEBUG_CF(key)) {
fmt::print("DBG) Creating {0}\n", key.printable());
}
loop {
try {
tr.set(key, initialValue);
wait(updateChangeFeed(&tr, feedData->feedID, ChangeFeedStatus::CHANGE_FEED_CREATE, feedData->keyRange));
wait(tr.commit());
Version createVersion = tr.getCommittedVersion();
if (DEBUG_CF(key)) {
fmt::print("DBG) Created {0} @ {1}\n", key.printable(), createVersion);
}
feedData->update(createVersion, initialValue);
feedData->liveReader = liveReader(cx, feedData, createVersion);
self->data.push_back(feedData);
return Void();
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
std::string description() const override { return "ChangeFeedOperationsWorkload"; }
Future<Void> setup(Database const& cx) override { return _setup(cx, this); }
ACTOR Future<Void> _setup(Database cx, ChangeFeedOperationsWorkload* self) {
// create initial targetFeeds feeds
TraceEvent("ChangeFeedOperationsSetup").detail("InitialFeeds", self->targetFeeds).log();
state int i;
for (i = 0; i < self->targetFeeds; i++) {
wait(self->createNewFeed(cx, self));
}
TraceEvent("ChangeFeedOperationsSetupComplete");
return Void();
}
Future<Void> start(Database const& cx) override {
client = changeFeedOperationsClient(cx->clone(), this);
return delay(testDuration);
}
Future<bool> check(Database const& cx) override {
client = Future<Void>();
return _check(cx, this);
}
ACTOR Future<Void> checkFeed(Database cx, ChangeFeedOperationsWorkload* self, Reference<FeedTestData> feedData) {
state int popIdx;
feedData->testComplete();
if (DEBUG_CF(feedData->key)) {
fmt::print("Final check {0} waiting on live reader\n", feedData->key.printable());
}
// wait on live reader and pops to make sure they complete without error
wait(feedData->liveReader);
if (DEBUG_CF(feedData->key)) {
fmt::print("Final check {0} waiting on {1} pops\n", feedData->key.printable(), feedData->pops.size());
}
for (popIdx = 0; popIdx < feedData->pops.size(); popIdx++) {
wait(feedData->pops[popIdx]);
}
// do final check, read everything not popped
if (DEBUG_CF(feedData->key)) {
fmt::print("Final check {0} waiting on data check\n", feedData->key.printable(), feedData->pops.size());
}
wait(self->doRead(cx, feedData, feedData->writesByVersion.size()));
// ensure reading [0, poppedVersion) returns no results
if (feedData->poppedVersion > 0) {
if (DEBUG_CF(feedData->key)) {
fmt::print("Final check {0} waiting on read popped check\n", feedData->key.printable());
}
// FIXME: re-enable checking for popped data by changing skipPopped back to false!
wait(historicReader(cx, feedData, 0, feedData->poppedVersion, true));
}
return Void();
}
ACTOR Future<bool> _check(Database cx, ChangeFeedOperationsWorkload* self) {
TraceEvent("ChangeFeedOperationsCheck").detail("FeedCount", self->data.size()).log();
fmt::print("Checking {0} feeds\n", self->data.size()); // TODO REMOVE
state std::vector<Future<Void>> feedChecks;
for (int i = 0; i < self->data.size(); i++) {
if (self->data[i]->destroying) {
continue;
}
if (DEBUG_CF(self->data[i]->key)) {
fmt::print("Final check {0}\n", self->data[i]->key.printable());
}
feedChecks.push_back(self->checkFeed(cx, self, self->data[i]));
}
wait(waitForAll(feedChecks));
// FIXME: check that all destroyed feeds are actually destroyed?
TraceEvent("ChangeFeedOperationsCheckComplete");
return true;
}
void getMetrics(std::vector<PerfMetric>& m) override {}
ACTOR Future<Void> stopFeed(Database cx, Reference<FeedTestData> feedData) {
state Transaction tr(cx);
if (DEBUG_CF(feedData->key)) {
fmt::print("DBG) {0} Stopping\n", feedData->key.printable());
}
loop {
try {
wait(updateChangeFeed(&tr, feedData->feedID, ChangeFeedStatus::CHANGE_FEED_STOP, feedData->keyRange));
wait(tr.commit());
Version stopVersion = tr.getCommittedVersion();
if (!feedData->stopVersion.present()) {
feedData->stopVersion = stopVersion;
}
if (DEBUG_CF(feedData->key)) {
fmt::print("DBG) {0} Stopped @ {1}\n", feedData->key.printable(), stopVersion);
}
return Void();
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
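// op-driven popping: pop through just past the oldest committed write, so exactly that write is consumed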
void popFeed(Database cx, Reference<FeedTestData> feedData) {
if (!feedData->writesByVersion.empty()) {
feedData->pop(cx, feedData->writesByVersion.front().first + 1);
}
}
ACTOR Future<Void> destroyFeed(Database cx, ChangeFeedOperationsWorkload* self, int feedIdx) {
state Reference<FeedTestData> feedData = self->data[feedIdx];
state Transaction tr(cx);
feedData->destroying = true;
if (DEBUG_CF(feedData->key)) {
fmt::print("DBG) {0} Destroying\n", feedData->key.printable());
}
loop {
try {
wait(
updateChangeFeed(&tr, feedData->feedID, ChangeFeedStatus::CHANGE_FEED_DESTROY, feedData->keyRange));
if (self->clearKeyWhenDestroy) {
tr.clear(feedData->key);
}
wait(tr.commit());
feedData->destroyed = true;
// remove feed from list
ASSERT(self->data[feedIdx]->key == feedData->key);
swapAndPop(&self->data, feedIdx);
if (DEBUG_CF(feedData->key)) {
fmt::print("DBG) {0} Destroyed @ {1}\n", feedData->key.printable(), tr.getCommittedVersion());
}
return Void();
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
ACTOR Future<Void> doRead(Database cx, Reference<FeedTestData> feedData, int targetReadWidth) {
if (feedData->writesByVersion.empty()) {
return Void();
}
Version beginVersion;
Version endVersion;
if (targetReadWidth >= feedData->writesByVersion.size()) {
beginVersion = feedData->writesByVersion.front().first;
endVersion = feedData->writesByVersion.back().first + 1;
} else {
// either up to or including end
int randStart = deterministicRandom()->randomInt(0, feedData->writesByVersion.size() - targetReadWidth);
beginVersion = feedData->writesByVersion[randStart].first;
int end = randStart + targetReadWidth;
if (end == feedData->writesByVersion.size()) {
endVersion = feedData->writesByVersion.back().first + 1;
} else {
// Make sure last included value (end version -1) is a committed version for checking
endVersion = feedData->writesByVersion[end].first + 1;
}
}
if (DEBUG_CF(feedData->key)) {
fmt::print("DBG) {0} Reading @ {1} - {2}\n", feedData->key.printable(), beginVersion, endVersion);
}
// FIXME: this sometimes reads popped data!
wait(historicReader(cx, feedData, beginVersion, endVersion, true));
if (DEBUG_CF(feedData->key)) {
fmt::print("DBG) {0} Read complete\n", feedData->key.printable());
}
return Void();
}
ACTOR Future<Void> doUpdateClear(Database cx,
ChangeFeedOperationsWorkload* self,
Reference<FeedTestData> feedData) {
state Transaction tr(cx);
state Optional<Value> updateValue;
// if value is already not set, don't do a clear, otherwise pick either
if (feedData->lastCleared || deterministicRandom()->random01() > self->clearFrequency) {
updateValue = feedData->nextValue();
if (DEBUG_CF(feedData->key)) {
fmt::print("DBG) {0} Setting {1}\n", feedData->key.printable(), updateValue.get().printable());
}
} else if (DEBUG_CF(feedData->key)) {
fmt::print("DBG) {0} Clearing\n", feedData->key.printable());
}
loop {
try {
if (updateValue.present()) {
tr.set(feedData->key, updateValue.get());
} else {
tr.clear(feedData->key);
}
wait(tr.commit());
Version writtenVersion = tr.getCommittedVersion();
if (DEBUG_CF(feedData->key) && updateValue.present()) {
fmt::print("DBG) {0} Set {1} @ {2}\n",
feedData->key.printable(),
updateValue.get().printable(),
writtenVersion);
}
if (DEBUG_CF(feedData->key) && !updateValue.present()) {
fmt::print("DBG) {0} Cleared @ {1}\n", feedData->key.printable(), writtenVersion);
}
feedData->update(writtenVersion, updateValue);
return Void();
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
ACTOR Future<Void> changeFeedOperationsClient(Database cx, ChangeFeedOperationsWorkload* self) {
state double last = now();
loop {
state Future<Void> waitNextOp = poisson(&last, 1.0 / self->operationsPerSecond);
Op op = self->pickRandomOp();
int feedIdx = deterministicRandom()->randomInt(0, self->data.size());
if (op == Op::CREATE_DELETE) {
// bundle these together so random creates/deletes keep about the target number of feeds
if (deterministicRandom()->random01() < 0.5 || self->data.size() == 1) {
wait(self->createNewFeed(cx, self));
} else {
wait(self->destroyFeed(cx, self, feedIdx));
}
} else if (op == Op::READ) {
// relatively small random read
wait(self->doRead(cx, self->data[feedIdx], deterministicRandom()->randomExp(2, 8)));
} else if (op == Op::UPDATE_CLEAR) {
wait(self->doUpdateClear(cx, self, self->data[feedIdx]));
} else if (op == Op::STOP) {
wait(self->stopFeed(cx, self->data[feedIdx]));
} else if (op == Op::POP) {
self->popFeed(cx, self->data[feedIdx]);
} else {
ASSERT(false);
}
wait(waitNextOp);
}
}
};
WorkloadFactory<ChangeFeedOperationsWorkload> ChangeFeedOperationsWorkloadFactory("ChangeFeedOperations");

View File

@ -325,6 +325,7 @@ struct PhysicalShardMoveWorkLoad : TestWorkload {
TraceEvent("TestCancelDataMoveEnd").detail("DataMove", dataMove.toString());
}
TraceEvent("TestMoveShardStartMoveKeys").detail("DataMove", dataMoveId);
wait(moveKeys(cx,
dataMoveId,
keys,

View File

@ -215,7 +215,8 @@ struct SkewedReadWriteWorkload : ReadWriteCommon {
self->startReadWriteClients(cx, clients);
wait(timeout(waitForAll(clients), self->testDuration / self->skewRound, Void()));
clients.clear();
wait(delay(5.0) >> updateServerShards(cx, self));
wait(delay(5.0));
wait(updateServerShards(cx, self));
}
return Void();

View File

@ -173,6 +173,19 @@ public:
}
bool coinflip() { return (this->random01() < 0.5); }
// Picks a number in [2^minExp, 2^maxExp), uniformly distributed over the exponential buckets [2^n, 2^(n+1))
// For example, randomExp(0, 4) has a 25% chance of returning 1, a 25% chance of returning 2-3, a 25% chance
// of returning 4-7, and a 25% chance of returning 8-15
// Similar in expected value to doing 1 << randomInt(minExp, maxExp+1), except the returned numbers aren't just
// powers of 2
int randomExp(int minExp, int maxExp) {
if (minExp == maxExp) { // degenerate case: only one possible value, since randomInt requires min < max
return 1 << minExp;
}
int val = 1 << this->randomInt(minExp, maxExp);
return this->randomInt(val, val * 2);
}
};
extern FILE* randLog;

View File

@ -174,6 +174,7 @@ public: // introduced features
PROTOCOL_VERSION_FEATURE(0x0FDB00B072000000LL, EncryptionAtRest);
PROTOCOL_VERSION_FEATURE(0x0FDB00B072000000LL, ShardEncodeLocationMetaData);
PROTOCOL_VERSION_FEATURE(0x0FDB00B072000000LL, Tenants);
PROTOCOL_VERSION_FEATURE(0x0FDB00B072000000LL, BlobGranuleFile);
};
template <>

View File

@ -1978,6 +1978,13 @@ Future<decltype(std::declval<Fun>()(std::declval<T>()).getValue())> runAfter(Fut
return res;
}
template <class T, class Fun>
auto operator>>=(Future<T> lhs, Fun&& rhs) -> Future<decltype(rhs(std::declval<T>()))> {
return runAfter(lhs, std::forward<Fun>(rhs));
}
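// Illustrative usage (a sketch; `fetchCount` is a hypothetical producer): operator>>= chains a
// continuation onto a Future via runAfter, where the continuation returns another Future:
//   Future<int> count = fetchCount();
//   Future<double> halved = count >>= [](int c) { return Future<double>(c / 2.0); };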
/*
* NOTE: This implementation doesn't really enforce the ACTOR execution order. See issue #7708
ACTOR template <class T, class U>
Future<U> runAfter(Future<T> lhs, Future<U> rhs) {
T val1 = wait(lhs);
@ -1985,15 +1992,11 @@ Future<U> runAfter(Future<T> lhs, Future<U> rhs) {
return res;
}
template <class T, class Fun>
auto operator>>=(Future<T> lhs, Fun&& rhs) -> Future<decltype(rhs(std::declval<T>()))> {
return runAfter(lhs, std::forward<Fun>(rhs));
}
template <class T, class U>
Future<U> operator>>(Future<T> const& lhs, Future<U> const& rhs) {
return runAfter(lhs, rhs);
}
*/
/*
* IAsyncListener is similar to AsyncVar, but it decouples the input and output, so the translation unit

View File

@ -130,8 +130,7 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES fast/BackupToDBCorrectnessClean.toml)
add_fdb_test(TEST_FILES fast/BlobGranuleVerifySmall.toml)
add_fdb_test(TEST_FILES fast/BlobGranuleVerifySmallClean.toml)
add_fdb_test(TEST_FILES fast/BlobGranuleVerifyAtomicOps.toml)
add_fdb_test(TEST_FILES fast/BlobGranuleVerifyCycle.toml)
add_fdb_test(TEST_FILES fast/BlobGranuleMoveVerifyCycle.toml)
add_fdb_test(TEST_FILES fast/CacheTest.toml)
add_fdb_test(TEST_FILES fast/CloggedSideband.toml)
add_fdb_test(TEST_FILES fast/CompressionUtilsUnit.toml)
@ -140,6 +139,7 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES fast/CycleAndLock.toml)
add_fdb_test(TEST_FILES fast/CycleTest.toml)
add_fdb_test(TEST_FILES fast/ChangeFeeds.toml)
add_fdb_test(TEST_FILES fast/ChangeFeedOperations.toml)
add_fdb_test(TEST_FILES fast/DataLossRecovery.toml)
add_fdb_test(TEST_FILES fast/EncryptionOps.toml)
# TODO: fix failures and re-enable the test
@ -199,6 +199,8 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES fast/PhysicalShardMove.toml IGNORE)
add_fdb_test(TEST_FILES fast/StorageServerCheckpointRestore.toml IGNORE)
endif()
add_fdb_test(TEST_FILES rare/BlobGranuleVerifyAtomicOps.toml)
add_fdb_test(TEST_FILES rare/BlobGranuleVerifyCycle.toml)
add_fdb_test(TEST_FILES rare/CheckRelocation.toml)
add_fdb_test(TEST_FILES rare/ClogUnclog.toml)
add_fdb_test(TEST_FILES rare/CloggedCycleWithKills.toml)

View File

@ -0,0 +1,48 @@
[configuration]
blobGranulesEnabled = true
allowDefaultTenant = false
# FIXME: re-enable rocks at some point
storageEngineExcludeTypes = [4]
[[knobs]]
bg_range_source = "blobRangeKeys"
[[test]]
testTitle = 'BlobGranuleMoveVerifyCycle'
[[test.workload]]
testName = 'Cycle'
transactionsPerSecond = 250.0
testDuration = 60.0
expectedRate = 0
[[test.workload]]
testName = 'RandomMoveKeys'
testDuration = 60.0
[[test.workload]]
testName = 'BlobGranuleVerifier'
testDuration = 60.0
[[test.workload]]
testName = 'RandomClogging'
testDuration = 60.0
[[test.workload]]
testName = 'Rollback'
meanDelay = 60.0
testDuration = 60.0
[[test.workload]]
testName = 'Attrition'
machinesToKill = 10
machinesToLeave = 3
reboot = true
testDuration = 60.0
[[test.workload]]
testName = 'Attrition'
machinesToKill = 10
machinesToLeave = 3
reboot = true
testDuration = 60.0

View File

@ -1,6 +1,8 @@
[configuration]
blobGranulesEnabled = true
allowDefaultTenant = false
injectTargetedSSRestart = true
injectSSDelay = true
# FIXME: exclude redwood because WriteDuringRead can write massive KV pairs and we don't chunk change feed data on disk yet
# FIXME: re-enable rocks at some point
storageEngineExcludeTypes = [3, 4, 5]

View File

@ -3,7 +3,7 @@ blobGranulesEnabled = true
allowDefaultTenant = false
# FIXME: exclude redwood because WriteDuringRead can write massive KV pairs and we don't chunk change feed data on disk yet
# FIXME: re-enable rocks at some point
storageEngineExcludeTypes = [3, 4]
storageEngineExcludeTypes = [3, 4, 5]
[[knobs]]
bg_range_source = "blobRangeKeys"

View File

@ -0,0 +1,10 @@
[configuration]
allowDefaultTenant = false
# TODO add failure events, and then add a version that also supports randomMoveKeys
[[test]]
testTitle = 'ChangeFeedOperationsTest'
[[test.workload]]
testName = 'ChangeFeedOperations'

View File

@ -1,6 +1,8 @@
[configuration]
blobGranulesEnabled = true
allowDefaultTenant = false
injectTargetedSSRestart = true
injectSSDelay = true
# FIXME: re-enable rocks at some point
storageEngineExcludeTypes = [4, 5]

View File

@ -1,6 +1,8 @@
[configuration]
blobGranulesEnabled = true
allowDefaultTenant = false
injectTargetedSSRestart = true
injectSSDelay = true
# FIXME: re-enable rocks at some point
storageEngineExcludeTypes = [4, 5]

View File

@ -2,6 +2,8 @@
blobGranulesEnabled = true
allowDefaultTenant = false
allowDisablingTenants = false
injectTargetedSSRestart = true
injectSSDelay = true
# FIXME: re-enable rocks at some point
storageEngineExcludeTypes = [4, 5]

View File

@ -1,6 +1,8 @@
[configuration]
blobGranulesEnabled = true
allowDefaultTenant = false
injectTargetedSSRestart = true
injectSSDelay = true
# FIXME: re-enable rocks at some point
storageEngineExcludeTypes = [4, 5]

View File

@ -1,6 +1,8 @@
[configuration]
blobGranulesEnabled = true
allowDefaultTenant = false
injectTargetedSSRestart = true
injectSSDelay = true
# FIXME: re-enable rocks at some point
storageEngineExcludeTypes = [4, 5]