SS-focused cleanup

This commit is contained in:
Josh Slocum 2022-03-09 10:39:04 -06:00
parent b21d0943b9
commit 9dbb6d216e
3 changed files with 86 additions and 306 deletions

View File

@ -43,7 +43,7 @@
#include "flow/UnitTest.h"
#include "flow/actorcompiler.h" // has to be last include
#define BM_DEBUG true
#define BM_DEBUG false
// TODO add comments + documentation
void handleClientBlobRange(KeyRangeMap<bool>* knownBlobRanges,

View File

@ -45,7 +45,7 @@
#include "flow/Trace.h"
#include "flow/actorcompiler.h" // has to be last include
#define BW_DEBUG true
#define BW_DEBUG false
#define BW_REQUEST_DEBUG false
struct GranuleStartState {

View File

@ -23,6 +23,7 @@
#include <type_traits>
#include <unordered_map>
#include "contrib/fmt-8.1.1/include/fmt/format.h"
#include "fdbrpc/fdbrpc.h"
#include "fdbrpc/LoadBalance.h"
#include "flow/ActorCollection.h"
@ -392,8 +393,8 @@ struct ChangeFeedInfo : ReferenceCounted<ChangeFeedInfo> {
void moved(KeyRange range) {
auto toTrigger = moveTriggers.intersectingRanges(range);
for (auto triggerRange : toTrigger) {
for (auto triggerStream : triggerRange.cvalue()) {
for (auto& triggerRange : toTrigger) {
for (auto& triggerStream : triggerRange.cvalue()) {
if (triggerStream.second.canBeSet()) {
triggerStream.second.send(Void());
}
@ -410,7 +411,7 @@ struct ChangeFeedInfo : ReferenceCounted<ChangeFeedInfo> {
ASSERT(streamToRemove != triggerRange->cvalue().end());
triggerRange->value().erase(streamToRemove);
}
// TODO: could clean up on
// TODO: may be more cleanup possible here
}
};
@ -1203,7 +1204,7 @@ public:
auto& clientVersions = changeFeedClientVersions[addr];
Version minVersion = version.get();
for (auto& it : clientVersions) {
// printf("Blocked client %s @ %lld\n", it.first.toString().substr(0, 8).c_str(), it.second);
// fmt::print("Blocked client {0} @ {1}\n", it.first.toString().substr(0, 8), it.second);
minVersion = std::min(minVersion, it.second);
}
return minVersion;
@ -1892,16 +1893,11 @@ MutationsAndVersionRef filterMutations(Arena& arena,
return m;
}
// TODO REMOVE!!! when BG is correctness clean
#define DEBUG_SS_ID ""_sr
#define DEBUG_SS_CF_ID ""_sr
#define DEBUG_SS_STREAM_ID ""_sr
#define DEBUG_SS_CF_BEGIN_VERSION invalidVersion
#define DEBUG_SS_CFM(ssId, cfId, streamId, v) \
((ssId.toString().substr(0, 4) == DEBUG_SS_ID && cfId.printable().substr(0, 6) == DEBUG_SS_CF_ID && \
(v >= DEBUG_SS_CF_BEGIN_VERSION || latestVersion == DEBUG_SS_CF_BEGIN_VERSION)) || \
(streamId.toString().substr(0, 8) == DEBUG_SS_STREAM_ID))
// set this for VERY verbose logs on change feed SS reads
#define DEBUG_CF_TRACE false
// To easily find if a change feed read missed data. Set the CF to the feedId, the key to the missing key, and the
// version to the version the mutation is missing at.
#define DO_DEBUG_CF_MISSING false
#define DEBUG_CF_MISSING_CF ""_sr
#define DEBUG_CF_MISSING_KEY ""_sr
@ -1915,25 +1911,15 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
ChangeFeedStreamRequest req,
bool inverted,
bool atLatest,
UID streamUID
/*TODO REMOVE*/) {
UID streamUID /* for debugging */) {
state ChangeFeedStreamReply reply;
state ChangeFeedStreamReply memoryReply;
state int remainingLimitBytes = CLIENT_KNOBS->REPLY_BYTE_LIMIT;
state int remainingDurableBytes = CLIENT_KNOBS->REPLY_BYTE_LIMIT;
state Version startVersion = data->version.get();
if (DEBUG_SS_CFM(data->thisServerID, req.rangeID, streamUID, req.begin)) {
printf("CFM: SS %s CF %s: SQ %s [%s - %s) %lld - %lld\n",
data->thisServerID.toString().substr(0, 4).c_str(),
req.rangeID.printable().substr(0, 6).c_str(),
streamUID.toString().substr(0, 8).c_str(),
req.range.begin.printable().c_str(),
req.range.end.printable().c_str(),
req.begin,
req.end);
// TODO REMOVE
TraceEvent(SevDebug, "ChangeFeedMutations", data->thisServerID)
if (DEBUG_CF_TRACE) {
TraceEvent(SevDebug, "TraceChangeFeedMutationsBegin", data->thisServerID)
.detail("FeedID", req.rangeID)
.detail("StreamUID", streamUID)
.detail("Range", req.range)
@ -1961,14 +1947,6 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
state Reference<ChangeFeedInfo> feedInfo = feed->second;
if (DEBUG_SS_CFM(data->thisServerID, req.rangeID, streamUID, req.begin)) {
printf("CFM: SS %s CF %s: got version %lld >= %lld\n",
data->thisServerID.toString().substr(0, 4).c_str(),
req.rangeID.printable().substr(0, 6).c_str(),
data->version.get(),
req.begin);
}
// We must copy the mutationDeque when fetching the durable bytes in case mutations are popped from memory while
// waiting for the results
state Version dequeVersion = data->version.get();
@ -1976,22 +1954,8 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
state Version emptyVersion = feedInfo->emptyVersion;
Version fetchStorageVersion = std::max(feedInfo->fetchVersion, feedInfo->durableFetchVersion.get());
if (DEBUG_SS_CFM(data->thisServerID, req.rangeID, streamUID, req.begin)) {
printf("CFM: SS %s CF %s: SQ %s atLatest=%s, dequeVersion=%lld, emptyVersion=%lld, storageVersion=%lld, "
"durableVersion=%lld, "
"fetchStorageVersion=%lld (%lld, %lld)\n",
data->thisServerID.toString().substr(0, 4).c_str(),
req.rangeID.printable().substr(0, 6).c_str(),
streamUID.toString().substr(0, 8).c_str(),
atLatest ? "T" : "F",
dequeVersion,
feedInfo->emptyVersion,
feedInfo->storageVersion,
feedInfo->durableVersion,
fetchStorageVersion,
feedInfo->fetchVersion,
feedInfo->durableFetchVersion.get());
TraceEvent(SevDebug, "ChangeFeedMutationsDetails", data->thisServerID)
if (DEBUG_CF_TRACE) {
TraceEvent(SevDebug, "TraceChangeFeedMutationsDetails", data->thisServerID)
.detail("FeedID", req.rangeID)
.detail("StreamUID", streamUID)
.detail("Range", req.range)
@ -2022,14 +1986,6 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
}
}
}
if (DEBUG_SS_CFM(data->thisServerID, req.rangeID, streamUID, req.begin)) {
printf("CFM: SS %s CF %s: got %lld - %lld (%d) from memory\n",
data->thisServerID.toString().substr(0, 4).c_str(),
req.rangeID.printable().substr(0, 6).c_str(),
memoryReply.mutations.empty() ? invalidVersion : memoryReply.mutations.front().version,
memoryReply.mutations.empty() ? invalidVersion : memoryReply.mutations.back().version,
memoryReply.mutations.size());
}
}
state bool readDurable = feedInfo->durableVersion != invalidVersion && req.begin <= feedInfo->durableVersion;
@ -2044,23 +2000,10 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
ASSERT(req.begin <= feedInfo->fetchVersion);
TEST(true); // getChangeFeedMutations before fetched data durable
if (DEBUG_SS_CFM(data->thisServerID, req.rangeID, streamUID, req.begin)) {
printf("CFM: SS %s CF %s: waiting on fetch durable up to %lld\n",
data->thisServerID.toString().substr(0, 4).c_str(),
req.rangeID.printable().substr(0, 6).c_str(),
feedInfo->fetchVersion);
}
// Wait for next commit to write pending feed data to storage
wait(feedInfo->durableFetchVersion.whenAtLeast(feedInfo->fetchVersion));
// To let update storage finish
wait(delay(0));
if (DEBUG_SS_CFM(data->thisServerID, req.rangeID, streamUID, req.begin)) {
printf("CFM: SS %s CF %s: got fetch durable up to %lld\n",
data->thisServerID.toString().substr(0, 4).c_str(),
req.rangeID.printable().substr(0, 6).c_str(),
feedInfo->durableFetchVersion.get());
}
}
RangeResult res = wait(
data->storage.readRange(KeyRangeRef(changeFeedDurableKey(req.rangeID, std::max(req.begin, emptyVersion)),
@ -2097,23 +2040,22 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
memoryVerifyIdx++;
continue;
} else {
printf("ERROR: SS %s CF %s SQ %s has mutation at %lld in memory but not on disk (next disk is "
"%lld) "
"(emptyVersion=%lld, emptyBefore=%lld)!\n",
data->thisServerID.toString().substr(0, 4).c_str(),
req.rangeID.printable().substr(0, 6).c_str(),
streamUID.toString().substr(0, 8).c_str(),
memoryReply.mutations[memoryVerifyIdx].version,
version,
feedInfo->emptyVersion,
emptyVersion);
fmt::print("ERROR: SS {0} CF {1} SQ {2} has mutation at {3} in memory but not on disk (next disk "
"is {4}) (emptyVersion={5}, emptyBefore={6})!\n",
data->thisServerID.toString().substr(0, 4),
req.rangeID.printable().substr(0, 6),
streamUID.toString().substr(0, 8),
memoryReply.mutations[memoryVerifyIdx].version,
version,
feedInfo->emptyVersion,
emptyVersion);
printf(" Memory: (%d)\n", memoryReply.mutations[memoryVerifyIdx].mutations.size());
fmt::print(" Memory: ({})\n", memoryReply.mutations[memoryVerifyIdx].mutations.size());
for (auto& it : memoryReply.mutations[memoryVerifyIdx].mutations) {
if (it.type == MutationRef::SetValue) {
printf(" %s=\n", it.param1.printable().c_str());
fmt::print(" {}=\n", it.param1.printable());
} else {
printf(" %s - %s\n", it.param1.printable().c_str(), it.param2.printable().c_str());
fmt::print(" {} - {}\n", it.param1.printable(), it.param2.printable());
}
}
ASSERT(false);
@ -2128,25 +2070,24 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
if (memoryVerifyIdx < memoryReply.mutations.size() &&
version == memoryReply.mutations[memoryVerifyIdx].version) {
// TODO: we could do some validation here too, but it's complicated because clears can get split
// We could do validation of mutations here too, but it's complicated because clears can get split
// and stuff
memoryVerifyIdx++;
}
} else if (memoryVerifyIdx < memoryReply.mutations.size() &&
version == memoryReply.mutations[memoryVerifyIdx].version) {
// TODO REMOVE debugging eventually
printf("ERROR: SS %s CF %s SQ %s has mutation at %lld in memory but all filtered out on disk!\n",
data->thisServerID.toString().substr(0, 4).c_str(),
req.rangeID.printable().substr(0, 6).c_str(),
streamUID.toString().substr(0, 8).c_str(),
version);
fmt::print("ERROR: SS {0} CF {1} SQ {2} has mutation at {3} in memory but all filtered out on disk!\n",
data->thisServerID.toString().substr(0, 4),
req.rangeID.printable().substr(0, 6),
streamUID.toString().substr(0, 8),
version);
printf(" Memory: (%d)\n", memoryReply.mutations[memoryVerifyIdx].mutations.size());
fmt::print(" Memory: ({})\n", memoryReply.mutations[memoryVerifyIdx].mutations.size());
for (auto& it : memoryReply.mutations[memoryVerifyIdx].mutations) {
if (it.type == MutationRef::SetValue) {
printf(" %s=\n", it.param1.printable().c_str());
fmt::print(" {}=\n", it.param1.printable().c_str());
} else {
printf(" %s - %s\n", it.param1.printable().c_str(), it.param2.printable().c_str());
fmt::print(" {} - {}\n", it.param1.printable().c_str(), it.param2.printable().c_str());
}
}
ASSERT(false);
@ -2158,14 +2099,6 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
lastVersion = version;
lastKnownCommitted = knownCommittedVersion;
}
if (DEBUG_SS_CFM(data->thisServerID, req.rangeID, streamUID, req.begin)) {
printf("CFM: SS %s CF %s: got %lld - %lld (%d) from disk\n",
data->thisServerID.toString().substr(0, 4).c_str(),
req.rangeID.printable().substr(0, 6).c_str(),
reply.mutations.empty() ? invalidVersion : reply.mutations.front().version,
reply.mutations.empty() ? invalidVersion : reply.mutations.back().version,
reply.mutations.size());
}
if (remainingDurableBytes > 0) {
reply.arena.dependsOn(memoryReply.arena);
auto it = memoryReply.mutations.begin();
@ -2178,21 +2111,9 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
// If still empty, that means disk results were filtered out, but skipped all memory results. Add an empty,
// either the last version from disk
if (reply.mutations.empty() && res.size()) {
if (DEBUG_SS_CFM(data->thisServerID, req.rangeID, streamUID, req.begin)) {
printf("CFM: SS %s CF %s: adding empty from disk and memory %lld\n",
data->thisServerID.toString().substr(0, 4).c_str(),
req.rangeID.printable().substr(0, 6).c_str(),
lastVersion);
}
reply.mutations.push_back(reply.arena, MutationsAndVersionRef(lastVersion, lastKnownCommitted));
}
} else if (reply.mutations.empty() || reply.mutations.back().version < lastVersion) {
if (DEBUG_SS_CFM(data->thisServerID, req.rangeID, streamUID, req.begin)) {
printf("CFM: SS %s CF %s: adding empty from disk %lld\n",
data->thisServerID.toString().substr(0, 4).c_str(),
req.rangeID.printable().substr(0, 6).c_str(),
lastVersion);
}
reply.mutations.push_back(reply.arena, MutationsAndVersionRef(lastVersion, lastKnownCommitted));
}
} else {
@ -2203,12 +2124,6 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
Version finalVersion = std::min(req.end - 1, dequeVersion);
if ((reply.mutations.empty() || reply.mutations.back().version < finalVersion) && remainingLimitBytes > 0 &&
remainingDurableBytes > 0) {
if (DEBUG_SS_CFM(data->thisServerID, req.rangeID, streamUID, req.begin)) {
printf("CFM: SS %s CF %s: adding empty %lld\n",
data->thisServerID.toString().substr(0, 4).c_str(),
req.rangeID.printable().substr(0, 6).c_str(),
finalVersion);
}
reply.mutations.push_back(
reply.arena, MutationsAndVersionRef(finalVersion, finalVersion == dequeVersion ? dequeKnownCommit : 0));
// if we add empty mutation after the last thing in memory, and didn't read from disk, gotAll is true
@ -2229,16 +2144,15 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
TEST(ok); // feed popped while valid read waiting
TEST(!ok); // feed popped while invalid read waiting
if (!ok) {
printf("SS %s: CF %s SQ %s popped after read! req.begin=%lld, emptyVersion=%lld, emptyBeforeRead=%lld, "
"atLatest=%s, minVersionSent=%lld\n",
data->thisServerID.toString().substr(0, 4).c_str(),
req.rangeID.printable().substr(0, 6).c_str(),
streamUID.toString().substr(0, 8).c_str(),
req.begin,
feedInfo->emptyVersion,
emptyVersion,
atLatest ? "T" : "F",
minVersion);
TraceEvent("ChangeFeedMutationsPopped", data->thisServerID)
.detail("FeedID", req.rangeID)
.detail("StreamUID", streamUID)
.detail("Range", req.range)
.detail("Begin", req.begin)
.detail("End", req.end)
.detail("EmptyVersion", feedInfo->emptyVersion)
.detail("AtLatest", atLatest)
.detail("MinVersionSent", minVersion);
throw change_feed_popped();
}
}
@ -2246,7 +2160,7 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
if (MUTATION_TRACKING_ENABLED) {
for (auto& mutations : reply.mutations) {
for (auto& m : mutations.mutations) {
DEBUG_MUTATION("ChangeFeedRead", mutations.version, m, data->thisServerID)
DEBUG_MUTATION("ChangeFeedSSRead", mutations.version, m, data->thisServerID)
.detail("ChangeFeedID", req.rangeID)
.detail("StreamUID", streamUID)
.detail("ReqBegin", req.begin)
@ -2256,14 +2170,7 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
}
}
if (DEBUG_SS_CFM(data->thisServerID, req.rangeID, streamUID, req.begin)) {
printf("CFM: SS %s CF %s: result %lld - %lld (%d)\n",
data->thisServerID.toString().substr(0, 4).c_str(),
req.rangeID.printable().substr(0, 6).c_str(),
reply.mutations.empty() ? invalidVersion : reply.mutations.front().version,
reply.mutations.empty() ? invalidVersion : reply.mutations.back().version,
reply.mutations.size());
// TODO REMOVE
if (DEBUG_CF_TRACE) {
TraceEvent(SevDebug, "ChangeFeedMutationsDone", data->thisServerID)
.detail("FeedID", req.rangeID)
.detail("StreamUID", streamUID)
@ -2292,36 +2199,36 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
}
}
if (!foundVersion || !foundKey) {
printf("ERROR: SS %s CF %s SQ %s missing %s @ %lld from request for [%s - %s) %lld - %lld\n",
data->thisServerID.toString().substr(0, 4).c_str(),
req.rangeID.printable().substr(0, 6).c_str(),
streamUID.toString().substr(0, 8).c_str(),
foundVersion ? "key" : "version",
DEBUG_CF_MISSING_VERSION,
req.range.begin.printable().c_str(),
req.range.end.printable().c_str(),
req.begin,
req.end);
printf("ERROR: %d versions in response %lld - %lld:\n",
reply.mutations.size(),
reply.mutations.front().version,
reply.mutations.back().version);
fmt::print("ERROR: SS {0} CF {1} SQ {2} missing {3} @ {4} from request for [{5} - {6}) {7} - {8}\n",
data->thisServerID.toString().substr(0, 4),
req.rangeID.printable().substr(0, 6),
streamUID.toString().substr(0, 8),
foundVersion ? "key" : "version",
DEBUG_CF_MISSING_VERSION,
req.range.begin.printable(),
req.range.end.printable(),
req.begin,
req.end);
fmt::print("ERROR: {0} versions in response {1} - {2}:\n",
reply.mutations.size(),
reply.mutations.front().version,
reply.mutations.back().version);
for (auto& it : reply.mutations) {
printf("ERROR: %lld (%d)%s\n",
it.version,
it.mutations.size(),
it.version == DEBUG_CF_MISSING_VERSION ? "<-------" : "");
fmt::print("ERROR: {0} ({1}){2}\n",
it.version,
it.mutations.size(),
it.version == DEBUG_CF_MISSING_VERSION ? "<-------" : "");
}
} else {
printf("DBG: SS %s CF %s SQ %s correct @ %lld from request for [%s - %s) %lld - %lld\n",
data->thisServerID.toString().substr(0, 4).c_str(),
req.rangeID.printable().substr(0, 6).c_str(),
streamUID.toString().substr(0, 8).c_str(),
DEBUG_CF_MISSING_VERSION,
req.range.begin.printable().c_str(),
req.range.end.printable().c_str(),
req.begin,
req.end);
fmt::print("DBG: SS {0} CF {1} SQ {2} correct @ {3} from request for [{4} - {5}) {6} - {7}\n",
data->thisServerID.toString().substr(0, 4),
req.rangeID.printable().substr(0, 6),
streamUID.toString().substr(0, 8),
DEBUG_CF_MISSING_VERSION,
req.range.begin.printable(),
req.range.end.printable(),
req.begin,
req.end);
}
}
@ -2391,7 +2298,6 @@ ACTOR Future<Void> stopChangeFeedOnMove(StorageServer* data, ChangeFeedStreamReq
}
return Void();
}
printf("CFSQ %s Moved! %lld - %lld. sending WSS\n", streamUID.toString().substr(0, 8).c_str(), req.begin, req.end);
// DO NOT call req.reply.onReady before sending - we need to propagate this error through regardless of how far
// behind client is
req.reply.sendError(wrong_shard_server());
@ -2406,22 +2312,20 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
if (req.replyBufferSize <= 0) {
req.reply.setByteLimit(SERVER_KNOBS->CHANGEFEEDSTREAM_LIMIT_BYTES);
} else {
req.reply.setByteLimit(req.replyBufferSize);
req.reply.setByteLimit(std::min((int64_t)req.replyBufferSize, SERVER_KNOBS->CHANGEFEEDSTREAM_LIMIT_BYTES));
}
wait(delay(0, TaskPriority::DefaultEndpoint));
try {
if (DEBUG_SS_CFM(data->thisServerID, req.rangeID, streamUID, req.begin)) {
printf("CFM: SS %s CF %s: got CFSQ %s [%s - %s) %lld - %lld, crp=%s\n",
data->thisServerID.toString().substr(0, 4).c_str(),
req.rangeID.printable().substr(0, 6).c_str(),
streamUID.toString().substr(0, 8).c_str(),
req.range.begin.printable().c_str(),
req.range.end.printable().c_str(),
req.begin,
req.end,
req.canReadPopped ? "T" : "F");
if (DEBUG_CF_TRACE) {
TraceEvent(SevDebug, "TraceChangeFeedMutationsBegin", data->thisServerID)
.detail("FeedID", req.rangeID)
.detail("StreamUID", streamUID)
.detail("Range", req.range)
.detail("Begin", req.begin)
.detail("End", req.end)
.detail("CanReadPopped", req.canReadPopped);
}
data->activeFeedQueries++;
@ -2434,14 +2338,6 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
ASSERT(emptyInitialReply.minStreamVersion == invalidVersion);
req.reply.send(emptyInitialReply);
if (DEBUG_SS_CFM(data->thisServerID, req.rangeID, streamUID, req.begin)) {
printf("CFM: SS %s CF %s: CFSQ %s send empty initial version %lld\n",
data->thisServerID.toString().substr(0, 4).c_str(),
req.rangeID.printable().substr(0, 6).c_str(),
streamUID.toString().substr(0, 8).c_str(),
req.begin - 1);
}
loop {
Future<Void> onReady = req.reply.onReady();
if (atLatest && !onReady.isReady() && !removeUID) {
@ -2462,14 +2358,6 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
ChangeFeedStreamReply feedReply = _feedReply.first;
bool gotAll = _feedReply.second;
// TODO REMOVE debugging
if (feedReply.mutations.size() == 0) {
printf("CFM: SS %s CF %s: CFSQ %s empty results for begin=%lld\n",
data->thisServerID.toString().substr(0, 4).c_str(),
req.rangeID.printable().substr(0, 6).c_str(),
streamUID.toString().substr(0, 8).c_str(),
req.begin);
}
ASSERT(feedReply.mutations.size() > 0);
req.begin = feedReply.mutations.back().version + 1;
if (!atLatest && gotAll) {
@ -2515,7 +2403,6 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
when(wait(feed->second->newMutations.onTrigger())) {}
when(wait(req.end == std::numeric_limits<Version>::max() ? Future<Void>(Never())
: data->version.whenAtLeast(req.end))) {}
when(wait(delay(5.0))) {} // TODO REMOVE this once empty version logic is fully implemented
}
auto feed = data->uidChangeFeed.find(req.rangeID);
if (feed == data->uidChangeFeed.end() || feed->second->removing) {
@ -2529,8 +2416,6 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
}
} catch (Error& e) {
data->activeFeedQueries--;
// TODO REMOVE
printf("CFSQ %s got error %s\n", streamUID.toString().substr(0, 8).c_str(), e.name());
auto it = data->changeFeedClientVersions.find(req.reply.getEndpoint().getPrimaryAddress());
if (it != data->changeFeedClientVersions.end()) {
if (removeUID) {
@ -4750,8 +4635,7 @@ ACTOR Future<Version> fetchChangeFeedApplier(StorageServer* data,
KeyRange range,
Version emptyVersion,
Version beginVersion,
Version endVersion,
bool existing) {
Version endVersion) {
state Version startVersion = beginVersion;
startVersion = std::max(startVersion, emptyVersion + 1);
@ -4760,17 +4644,6 @@ ACTOR Future<Version> fetchChangeFeedApplier(StorageServer* data,
ASSERT(startVersion >= 0);
// TODO REMOVE
TraceEvent(SevDebug, "FetchChangeFeedStarting", data->thisServerID)
.detail("RangeID", rangeId.printable())
.detail("Range", range.toString())
.detail("StartVersion", startVersion)
.detail("EndVersion", endVersion)
.detail("BeginVersion", beginVersion)
.detail("EmptyVersion", emptyVersion)
.detail("FetchVersion", changeFeedInfo->fetchVersion)
.detail("DurableFetchVersion", changeFeedInfo->durableFetchVersion.get());
if (startVersion >= endVersion) {
TEST(true); // Change Feed popped before fetch
TraceEvent(SevDebug, "FetchChangeFeedNoOp", data->thisServerID)
@ -4785,7 +4658,6 @@ ACTOR Future<Version> fetchChangeFeedApplier(StorageServer* data,
state Future<Void> feed = data->cx->getChangeFeedStream(
feedResults, rangeId, startVersion, endVersion, range, SERVER_KNOBS->CHANGEFEEDSTREAM_LIMIT_BYTES, true);
// TODO remove debugging eventually?
state Version firstVersion = invalidVersion;
state Version lastVersion = invalidVersion;
state int64_t versionsFetched = 0;
@ -4862,7 +4734,6 @@ ACTOR Future<Version> fetchChangeFeedApplier(StorageServer* data,
for (auto& m : remoteResult[remoteLoc].mutations) {
DEBUG_MUTATION("ChangeFeedWriteMove", remoteVersion, m, data->thisServerID)
.detail("Range", range)
.detail("Existing", existing)
.detail("ChangeFeedID", rangeId);
}
}
@ -4885,7 +4756,6 @@ ACTOR Future<Version> fetchChangeFeedApplier(StorageServer* data,
for (auto& m : remoteResult[remoteLoc].mutations) {
DEBUG_MUTATION("ChangeFeedWriteMoveIgnore", remoteVersion, m, data->thisServerID)
.detail("Range", range)
.detail("Existing", existing)
.detail("ChangeFeedID", rangeId)
.detail("EmptyVersion", changeFeedInfo->emptyVersion);
}
@ -4963,7 +4833,6 @@ ACTOR Future<Version> fetchChangeFeedApplier(StorageServer* data,
}
}
// TODO REMOVE?
TraceEvent(SevDebug, "FetchChangeFeedDone", data->thisServerID)
.detail("RangeID", rangeId.printable())
.detail("Range", range.toString())
@ -4973,7 +4842,6 @@ ACTOR Future<Version> fetchChangeFeedApplier(StorageServer* data,
.detail("FirstFetchedVersion", firstVersion)
.detail("LastFetchedVersion", lastVersion)
.detail("VersionsFetched", versionsFetched)
.detail("Existing", existing)
.detail("Removed", changeFeedInfo->removing);
return lastVersion;
}
@ -4997,10 +4865,6 @@ ACTOR Future<Version> fetchChangeFeed(StorageServer* data,
auto cleanupPending = data->changeFeedCleanupDurable.find(changeFeedInfo->id);
if (cleanupPending != data->changeFeedCleanupDurable.end()) {
/*printf("SS %s waiting for CF %s cleanup @ %lld\n",
data->thisServerID.toString().substr(0, 4).c_str(),
changeFeedInfo->id.toString().substr(0, 6).c_str(),
cleanupPending->second);*/
TraceEvent(SevDebug, "FetchChangeFeedWaitCleanup", data->thisServerID)
.detail("RangeID", changeFeedInfo->id.printable())
.detail("Range", changeFeedInfo->range.toString())
@ -5025,15 +4889,13 @@ ACTOR Future<Version> fetchChangeFeed(StorageServer* data,
loop {
try {
// TODO clean up existing param for !existing
Version maxFetched = wait(fetchChangeFeedApplier(data,
changeFeedInfo,
changeFeedInfo->id,
changeFeedInfo->range,
changeFeedInfo->emptyVersion,
beginVersion,
endVersion,
false));
endVersion));
data->fetchingChangeFeeds.insert(changeFeedInfo->id);
return maxFetched;
} catch (Error& e) {
@ -5109,14 +4971,6 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data, KeyR
// reset fetch versions because everything previously fetched was cleaned up
changeFeedInfo->fetchVersion = invalidVersion;
changeFeedInfo->durableFetchVersion = NotifiedVersion();
// TODO REMOVE
TraceEvent(SevDebug, "FetchedChangeFeedInfoReset", data->thisServerID)
.detail("RangeID", cfEntry.rangeId.printable())
.detail("Range", cfEntry.range.toString())
.detail("FetchVersion", fetchVersion)
.detail("EmptyVersion", cfEntry.emptyVersion)
.detail("CleanupVersion", feedCleanup->second)
.detail("StopVersion", cfEntry.stopVersion);
// Since cleanup put a mutation in the log to delete the change feed data, put one in the log to restore
// it
@ -5581,25 +5435,6 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
// Wait for the transferred version (and therefore the shard data) to be committed and durable.
wait(data->durableVersion.whenAtLeast(feedTransferredVersion));
// TODO if this works, remove all of the fetch version stuff
// Also wait on all fetched change feed data to become committed and durable
while (!feedFetchedVersions.empty()) {
auto feed = feedFetchedVersions.begin();
state Key feedId = feed->first;
Version maxFetched = feed->second;
feedFetchedVersions.erase(feed);
auto feedIt = data->uidChangeFeed.find(feedId);
/*if (feedIt != data->uidChangeFeed.end() && feedIt->second->durableFetchVersion.get() < maxFetched) {
wait(feedIt->second->durableFetchVersion.whenAtLeast(maxFetched));
// return to updateStorage
wait(delay(0));
}*/
if (feedIt != data->uidChangeFeed.end()) {
ASSERT(feedIt->second->durableFetchVersion.get() >= maxFetched);
}
}
ASSERT(data->shards[shard->keys.begin]->assigned() &&
data->shards[shard->keys.begin]->keys ==
shard->keys); // We aren't changing whether the shard is assigned
@ -5860,8 +5695,6 @@ void changeServerKeys(StorageServer* data,
}
if (!foundAssigned) {
// TODO REMOVE
Version durableVersion = data->data().getLatestVersion();
TraceEvent(SevDebug, "ChangeFeedCleanup", data->thisServerID)
.detail("FeedID", f.first)
@ -5880,19 +5713,6 @@ void changeServerKeys(StorageServer* data,
changeFeedDurableKey(f.first, 0),
changeFeedDurableKey(f.first, version)));
// do this cleanup later!!
/*auto rs = data->keyChangeFeed.modify(f.second);
for (auto r = rs.begin(); r != rs.end(); ++r) {
auto& feedList = r->value();
for (int i = 0; i < feedList.size(); i++) {
if (feedList[i]->id == f.first) {
swapAndPop(&feedList, i--);
}
}
}
data->keyChangeFeed.coalesce(f.second.contents());
*/
// We can't actually remove this change feed fully until the mutations clearing its data become durable.
// If the SS restarted at version R before the clearing mutations became durable at version D (R < D),
// then the restarted SS would restore the change feed clients would be able to read data and would miss
@ -5904,7 +5724,6 @@ void changeServerKeys(StorageServer* data,
feed->second->removing = true;
feed->second->moved(feed->second->range);
feed->second->newMutations.trigger();
// data->uidChangeFeed.erase(feed);
}
} else {
// if just part of feed's range is moved away
@ -6129,7 +5948,6 @@ private:
std::tie(changeFeedRange, popVersion, status) = decodeChangeFeedValue(m.param2);
auto feed = data->uidChangeFeed.find(changeFeedId);
// TODO REMOVE eventually
TraceEvent(SevDebug, "ChangeFeedPrivateMutation", data->thisServerID)
.detail("RangeID", changeFeedId.printable())
.detail("Range", changeFeedRange.toString())
@ -6878,13 +6696,6 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
}
if (info->second->fetchVersion != invalidVersion && !info->second->removing) {
// TODO REMOVE trace
TraceEvent(SevDebug, "UpdateStorageChangeFeedStart", data->thisServerID)
.detail("RangeID", info->second->id.printable())
.detail("Range", info->second->range.toString())
.detail("FetchVersion", info->second->fetchVersion)
.detail("StopVersion", info->second->stopVersion)
.detail("Removing", info->second->removing);
feedFetchVersions.push_back(std::pair(info->second->id, info->second->fetchVersion));
}
// handle case where fetch had version ahead of last in-memory mutation
@ -6958,15 +6769,6 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
// Don't update if the feed is pending cleanup. Either it will get cleaned up and destroyed, or it will get
// fetched again, where the fetch version will get reset.
if (info != data->uidChangeFeed.end() && !data->changeFeedCleanupDurable.count(info->second->id)) {
// TODO REMOVE trace
TraceEvent(SevDebug, "UpdateStorageChangeFeedDurable", data->thisServerID)
.detail("RangeID", info->second->id.printable())
.detail("Range", info->second->range.toString())
.detail("FetchVersion", info->second->fetchVersion)
.detail("OldDurableVersion", info->second->durableFetchVersion.get())
.detail("NewDurableVersion", feedFetchVersions[curFeed].second)
.detail("StopVersion", info->second->stopVersion)
.detail("Removing", info->second->removing);
if (feedFetchVersions[curFeed].second > info->second->durableFetchVersion.get()) {
info->second->durableFetchVersion.set(feedFetchVersions[curFeed].second);
}
@ -6986,11 +6788,6 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
auto feed = data->uidChangeFeed.find(cfCleanup->first);
ASSERT(feed != data->uidChangeFeed.end());
if (feed->second->removing) {
// TODO REMOVE
/*printf("DBG: SS %s Feed %s removing metadata @ %lld!\n",
data->thisServerID.toString().substr(0, 4).c_str(),
feed->first.printable().substr(0, 6).c_str(),
cfCleanup->second);*/
auto rs = data->keyChangeFeed.modify(feed->second->range);
for (auto r = rs.begin(); r != rs.end(); ++r) {
auto& feedList = r->value();
@ -7002,27 +6799,10 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
}
data->keyChangeFeed.coalesce(feed->second->range.contents());
// TODO REMOVE
TraceEvent(SevDebug, "UpdateStorageChangeFeedCleanup", data->thisServerID)
.detail("RangeID", feed->second->id.printable())
.detail("Range", feed->second->range.toString());
data->uidChangeFeed.erase(feed);
} else {
TEST(true); // Feed re-fetched after remove
// TODO REMOVE
/*printf("DBG: SS %s Feed %s not removing metadata @ %lld, must have been re-fetched after moved "
"away!\n",
data->thisServerID.toString().substr(0, 4).c_str(),
feed->first.printable().substr(0, 6).c_str(),
cfCleanup->second);*/
}
// TODO REMOVE
/*printf("DBG: SS %s Feed %s removing cleanup entry @ %lld!\n",
data->thisServerID.toString().substr(0, 4).c_str(),
feed->first.printable().substr(0, 6).c_str(),
cfCleanup->second);*/
cfCleanup = data->changeFeedCleanupDurable.erase(cfCleanup);
} else {
cfCleanup++;