Merge branch 'main' into vv

Dan Lambright 2022-04-08 17:18:13 -04:00 committed by GitHub
commit 1b3b4166c6
49 changed files with 1555 additions and 399 deletions

View File

@ -466,6 +466,27 @@ extern "C" DLLEXPORT FDBFuture* fdb_database_get_server_protocol(FDBDatabase* db
}).extractPtr());
}
extern "C" DLLEXPORT FDBFuture* fdb_database_purge_blob_granules(FDBDatabase* db,
uint8_t const* begin_key_name,
int begin_key_name_length,
uint8_t const* end_key_name,
int end_key_name_length,
int64_t purge_version,
fdb_bool_t force) {
return (FDBFuture*)(DB(db)
->purgeBlobGranules(KeyRangeRef(StringRef(begin_key_name, begin_key_name_length),
StringRef(end_key_name, end_key_name_length)),
purge_version,
force)
.extractPtr());
}
extern "C" DLLEXPORT FDBFuture* fdb_database_wait_purge_granules_complete(FDBDatabase* db,
uint8_t const* purge_key_name,
int purge_key_name_length) {
return (
FDBFuture*)(DB(db)->waitPurgeGranulesComplete(StringRef(purge_key_name, purge_key_name_length)).extractPtr());
}
extern "C" DLLEXPORT fdb_error_t fdb_tenant_create_transaction(FDBTenant* tenant, FDBTransaction** out_transaction) {
CATCH_AND_RETURN(*out_transaction = (FDBTransaction*)TENANT(tenant)->createTransaction().extractPtr(););
}
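
For orientation, a minimal caller-side sketch of the new purge entry points in this file (a hedged example, not part of this commit: `db` is assumed to be an already-opened FDBDatabase*, `purgeVersion` a valid version, and error checking is elided):

// Sketch: purge blob granules in ["bg", "bh") at purgeVersion, then block
// until the purge completes.
FDBFuture* purgeF = fdb_database_purge_blob_granules(
    db, (const uint8_t*)"bg", 2, (const uint8_t*)"bh", 2, purgeVersion, /*force=*/0);
fdb_future_block_until_ready(purgeF);
const uint8_t* purgeKey;
int purgeKeyLen;
fdb_future_get_key(purgeF, &purgeKey, &purgeKeyLen); // key identifying this purge
FDBFuture* waitF = fdb_database_wait_purge_granules_complete(db, purgeKey, purgeKeyLen);
fdb_future_block_until_ready(waitF);
fdb_future_destroy(waitF);
fdb_future_destroy(purgeF); // destroy last: purgeKey's memory is owned by purgeF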

View File

@ -299,6 +299,18 @@ DLLEXPORT WARN_UNUSED_RESULT double fdb_database_get_main_thread_busyness(FDBDat
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_database_get_server_protocol(FDBDatabase* db, uint64_t expected_version);
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_database_purge_blob_granules(FDBDatabase* db,
uint8_t const* begin_key_name,
int begin_key_name_length,
uint8_t const* end_key_name,
int end_key_name_length,
int64_t purge_version,
fdb_bool_t force);
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_database_wait_purge_granules_complete(FDBDatabase* db,
uint8_t const* purge_key_name,
int purge_key_name_length);
DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_tenant_create_transaction(FDBTenant* tenant,
FDBTransaction** out_transaction);

View File

@ -130,6 +130,25 @@ EmptyFuture Database::create_snapshot(FDBDatabase* db,
return EmptyFuture(fdb_database_create_snapshot(db, uid, uid_length, snap_command, snap_command_length));
}
KeyFuture Database::purge_blob_granules(FDBDatabase* db,
std::string_view begin_key,
std::string_view end_key,
int64_t purge_version,
fdb_bool_t force) {
return KeyFuture(fdb_database_purge_blob_granules(db,
(const uint8_t*)begin_key.data(),
begin_key.size(),
(const uint8_t*)end_key.data(),
end_key.size(),
purge_version,
force));
}
EmptyFuture Database::wait_purge_granules_complete(FDBDatabase* db, std::string_view purge_key) {
return EmptyFuture(
fdb_database_wait_purge_granules_complete(db, (const uint8_t*)purge_key.data(), purge_key.size()));
}
// Tenant
Tenant::Tenant(FDBDatabase* db, const uint8_t* name, int name_length) {
if (fdb_error_t err = fdb_database_open_tenant(db, name, name_length, &tenant)) {

View File

@ -97,6 +97,7 @@ public:
private:
friend class Transaction;
friend class Database;
KeyFuture(FDBFuture* f) : Future(f) {}
};
@ -201,6 +202,14 @@ public:
int uid_length,
const uint8_t* snap_command,
int snap_command_length);
static KeyFuture purge_blob_granules(FDBDatabase* db,
std::string_view begin_key,
std::string_view end_key,
int64_t purge_version,
fdb_bool_t force);
static EmptyFuture wait_purge_granules_complete(FDBDatabase* db, std::string_view purge_key);
};
class Tenant final {

View File

@ -2592,7 +2592,6 @@ TEST_CASE("Blob Granule Functions") {
}
// write some data
insert_data(db, create_data({ { "bg1", "a" }, { "bg2", "b" }, { "bg3", "c" } }));
// because wiring up files is non-trivial, just test the calls complete with the expected no_materialize error
@ -2709,6 +2708,42 @@ TEST_CASE("Blob Granule Functions") {
tr.reset();
break;
}
// do a purge + wait at that version to purge everything before originalReadVersion
fdb::KeyFuture purgeKeyFuture =
fdb::Database::purge_blob_granules(db, key("bg"), key("bh"), originalReadVersion, false);
fdb_check(wait_future(purgeKeyFuture));
const uint8_t* purgeKeyData;
int purgeKeyLen;
fdb_check(purgeKeyFuture.get(&purgeKeyData, &purgeKeyLen));
std::string purgeKey((const char*)purgeKeyData, purgeKeyLen);
fdb::EmptyFuture waitPurgeFuture = fdb::Database::wait_purge_granules_complete(db, purgeKey);
fdb_check(wait_future(waitPurgeFuture));
// re-read again at the purge version to make sure it is still valid
while (1) {
fdb_check(tr.set_option(FDB_TR_OPTION_READ_YOUR_WRITES_DISABLE, nullptr, 0));
fdb::KeyValueArrayResult r =
tr.read_blob_granules(key("bg"), key("bh"), 0, originalReadVersion, granuleContext);
fdb_error_t err = r.get(&out_kv, &out_count, &out_more);
if (err && err != 2037 /* blob_granule_not_materialized */) {
fdb::EmptyFuture f2 = tr.on_error(err);
fdb_check(wait_future(f2));
continue;
}
CHECK(err == 2037 /* blob_granule_not_materialized */);
tr.reset();
break;
}
}
int main(int argc, char** argv) {

View File

@ -260,6 +260,45 @@ def suspend(logger):
assert get_value_from_status_json(False, 'client', 'database_status', 'available')
def extract_version_epoch(cli_output):
return int(cli_output.split("\n")[-1].split(" ")[-1])
@enable_logging()
def targetversion(logger):
version1 = run_fdbcli_command('targetversion getepoch')
assert version1 == "Version epoch is unset"
version2 = int(run_fdbcli_command('getversion'))
logger.debug("read version: {}".format(version2))
assert version2 >= 0
# set the version epoch to the default value
logger.debug("setting version epoch to default")
run_fdbcli_command('targetversion add 0')
# get the version epoch
versionepoch1 = extract_version_epoch(run_fdbcli_command('targetversion getepoch'))
logger.debug("version epoch: {}".format(versionepoch1))
# make sure the version increased
version3 = int(run_fdbcli_command('getversion'))
logger.debug("read version: {}".format(version3))
assert version3 >= version2
# slightly increase the version epoch
versionepoch2 = extract_version_epoch(run_fdbcli_command("targetversion setepoch {}".format(versionepoch1 + 1000000)))
logger.debug("version epoch: {}".format(versionepoch2))
assert versionepoch2 == versionepoch1 + 1000000
# slightly decrease the version epoch
versionepoch3 = extract_version_epoch(run_fdbcli_command("targetversion add {}".format(-1000000)))
logger.debug("version epoch: {}".format(versionepoch3))
assert versionepoch3 == versionepoch2 - 1000000 == versionepoch1
# the versions should still be increasing
version4 = int(run_fdbcli_command('getversion'))
logger.debug("read version: {}".format(version4))
assert version4 >= version3
# clear the version epoch and make sure it is now unset
run_fdbcli_command("targetversion clearepoch")
version5 = run_fdbcli_command('targetversion getepoch')
assert version5 == "Version epoch is unset"
def get_value_from_status_json(retry, *args):
while True:
result = json.loads(run_fdbcli_command('status', 'json'))
@ -685,6 +724,9 @@ if __name__ == '__main__':
throttle()
triggerddteaminfolog()
tenants()
# TODO: similar to advanceversion, this seems to cause some issues, so disable for now
# This must go last, otherwise the version advancement can mess with the other tests
# targetversion()
else:
assert args.process_number > 1, "Process number should be positive"
coordinators()

View File

@ -52,7 +52,6 @@ mark_as_advanced(
if (GPERFTOOLS_FOUND)
add_library(gperftools UNKNOWN IMPORTED)
target_compile_definitions(gperftools PUBLIC USE_GPERFTOOLS)
set_target_properties(gperftools PROPERTIES
IMPORTED_LOCATION ${GPERFTOOLS_TCMALLOC_AND_PROFILER}
INTERFACE_INCLUDE_DIRECTORIES "${GPERFTOOLS_INCLUDE_DIR}")

View File

@ -29,6 +29,7 @@ set(FDBCLI_SRCS
TriggerDDTeamInfoLogCommand.actor.cpp
TssqCommand.actor.cpp
Util.actor.cpp
VersionEpochCommand.actor.cpp
linenoise/linenoise.h)
if(NOT WIN32)

View File

@ -0,0 +1,174 @@
/*
* VersionEpochCommand.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "boost/lexical_cast.hpp"
#include "fdbcli/fdbcli.actor.h"
#include "fdbclient/IClientApi.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "flow/Arena.h"
#include "flow/FastRef.h"
#include "flow/ThreadHelper.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
namespace fdb_cli {
const KeyRef versionEpochSpecialKey = LiteralStringRef("\xff\xff/management/version_epoch");
struct VersionInfo {
int64_t version;
int64_t expectedVersion;
};
ACTOR static Future<Optional<VersionInfo>> getVersionInfo(Reference<IDatabase> db) {
state Reference<ITransaction> tr = db->createTransaction();
loop {
try {
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
state Version rv = wait(safeThreadFutureToFuture(tr->getReadVersion()));
state ThreadFuture<Optional<Value>> versionEpochValFuture = tr->get(versionEpochKey);
Optional<Value> versionEpochVal = wait(safeThreadFutureToFuture(versionEpochValFuture));
if (!versionEpochVal.present()) {
return Optional<VersionInfo>();
}
int64_t versionEpoch = BinaryReader::fromStringRef<int64_t>(versionEpochVal.get(), Unversioned());
int64_t expected = g_network->timer() * CLIENT_KNOBS->CORE_VERSIONSPERSECOND - versionEpoch;
return VersionInfo{ rv, expected };
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
}
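
To make the expected-version computation above concrete, a small worked sketch (the 1,000,000 versions-per-second figure is the knob's conventional value and an assumption here, as is the sample timestamp):

// Hedged example of the arithmetic in getVersionInfo(), with the version
// epoch set to 2022-01-01T00:00:00Z, expressed in microseconds:
int64_t versionEpoch = int64_t(1640995200) * 1000000;
double now = 1649455093.0; // g_network->timer(), seconds since the Unix epoch
// expected = now * CORE_VERSIONSPERSECOND - versionEpoch
int64_t expected = int64_t(now * 1000000) - versionEpoch; // 8459893000000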
ACTOR static Future<Optional<int64_t>> getVersionEpoch(Reference<ITransaction> tr) {
loop {
try {
state ThreadFuture<Optional<Value>> versionEpochValFuture = tr->get(versionEpochSpecialKey);
Optional<Value> versionEpochVal = wait(safeThreadFutureToFuture(versionEpochValFuture));
return versionEpochVal.present() ? boost::lexical_cast<int64_t>(versionEpochVal.get().toString())
: Optional<int64_t>();
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
}
ACTOR Future<bool> versionEpochCommandActor(Reference<IDatabase> db, Database cx, std::vector<StringRef> tokens) {
if (tokens.size() <= 3) {
state Reference<ITransaction> tr = db->createTransaction();
if (tokens.size() == 1) {
Optional<VersionInfo> versionInfo = wait(getVersionInfo(db));
if (versionInfo.present()) {
int64_t diff = versionInfo.get().expectedVersion - versionInfo.get().version;
printf("Version: %" PRId64 "\n", versionInfo.get().version);
printf("Expected: %" PRId64 "\n", versionInfo.get().expectedVersion);
printf("Difference: %" PRId64 " (%.2fs)\n", diff, 1.0 * diff / CLIENT_KNOBS->VERSIONS_PER_SECOND);
} else {
printf("Version epoch is unset\n");
}
return true;
} else if (tokens.size() == 2 && tokencmp(tokens[1], "get")) {
Optional<int64_t> versionEpoch = wait(getVersionEpoch(db->createTransaction()));
if (versionEpoch.present()) {
printf("Current version epoch is %" PRId64 "\n", versionEpoch.get());
} else {
printf("Version epoch is unset\n");
}
return true;
} else if (tokens.size() == 2 && tokencmp(tokens[1], "disable")) {
// Clearing the version epoch means versions will no longer attempt
// to advance at the same rate as the clock. The current version
// will remain unchanged.
loop {
try {
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
Optional<int64_t> versionEpoch = wait(getVersionEpoch(db->createTransaction()));
if (!versionEpoch.present()) {
return true;
} else {
tr->clear(versionEpochSpecialKey);
wait(safeThreadFutureToFuture(tr->commit()));
}
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
} else if ((tokens.size() == 2 && tokencmp(tokens[1], "enable")) ||
(tokens.size() == 3 && tokencmp(tokens[1], "set"))) {
state int64_t v;
if (tokens.size() == 3) {
int n = 0;
if (sscanf(tokens[2].toString().c_str(), "%" SCNd64 "%n", &v, &n) != 1 || n != tokens[2].size()) {
printUsage(tokens[0]);
return false;
}
} else {
v = 0; // default version epoch
}
loop {
try {
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
Optional<int64_t> versionEpoch = wait(getVersionEpoch(tr));
if (!versionEpoch.present() || (versionEpoch.get() != v && tokens.size() == 3)) {
tr->set(versionEpochSpecialKey, BinaryWriter::toValue(v, Unversioned()));
wait(safeThreadFutureToFuture(tr->commit()));
} else {
printf("Version epoch enabled. Run `versionepoch commit` to irreversibly jump to the target "
"version\n");
return true;
}
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
} else if (tokens.size() == 2 && tokencmp(tokens[1], "commit")) {
Optional<VersionInfo> versionInfo = wait(getVersionInfo(db));
if (versionInfo.present()) {
wait(advanceVersion(cx, versionInfo.get().expectedVersion));
} else {
printf("Must set the version epoch before committing it (see `versionepoch enable`)\n");
}
return true;
}
}
printUsage(tokens[0]);
return false;
}
CommandFactory versionEpochFactory(
"versionepoch",
CommandHelp("versionepoch [<enable|commit|set|disable> [EPOCH]]",
"Read or write the version epoch",
"If no arguments are specified, reports the offset between the expected version "
"and the actual version. Otherwise, enables, disables, or commits the version epoch. "
"Setting the version epoch can be irreversible since it can cause a large verison jump. "
"Thus, the version epoch must first by enabled with the enable or set command. This "
"causes a recovery. Once the version epoch has been set, versions may be given out at "
"a faster or slower rate to attempt to match the actual version to the expected version, "
"based on the version epoch. After setting the version, run the commit command to perform "
"a one time jump to the expected version. This is useful when there is a very large gap "
"between the current version and the expected version. Note that once a version jump has "
"occurred, it cannot be undone. Run this command without any arguments to see the current "
"and expected version."));
} // namespace fdb_cli
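
For context, the command above is a thin wrapper around the `\xff\xff/management/version_epoch` special key, so a client can perform the equivalent of `versionepoch set V` directly. A minimal sketch mirroring the enable/set branch above (assumed to run inside an ACTOR with `db` a Reference<IDatabase>; the retry-on-error loop is elided):

// Sketch only: set the version epoch to v through the special key space.
int64_t v = 0; // 0 == the default epoch (see the "enable" branch above)
Reference<ITransaction> tr = db->createTransaction();
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
tr->set(versionEpochSpecialKey, BinaryWriter::toValue(v, Unversioned()));
wait(safeThreadFutureToFuture(tr->commit())); // committing the epoch triggers a recovery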

View File

@ -1646,6 +1646,13 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
continue;
}
if (tokencmp(tokens[0], "versionepoch")) {
bool _result = wait(makeInterruptable(versionEpochCommandActor(db, localDb, tokens)));
if (!_result)
is_error = true;
continue;
}
if (tokencmp(tokens[0], "kill")) {
getTransaction(db, managementTenant, tr, options, intrans);
bool _result = wait(makeInterruptable(killCommandActor(db, tr, tokens, &address_interface)));

View File

@ -210,6 +210,10 @@ ACTOR Future<bool> throttleCommandActor(Reference<IDatabase> db, std::vector<Str
ACTOR Future<bool> triggerddteaminfologCommandActor(Reference<IDatabase> db);
// tssq command
ACTOR Future<bool> tssqCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens);
// versionepoch command
ACTOR Future<bool> versionEpochCommandActor(Reference<IDatabase> db, Database cx, std::vector<StringRef> tokens);
// targetversion command
ACTOR Future<bool> targetVersionCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens);
} // namespace fdb_cli

View File

@ -44,7 +44,18 @@ struct BlobWorkerInterface {
BlobWorkerInterface() {}
explicit BlobWorkerInterface(const struct LocalityData& l, UID id) : locality(l), myId(id) {}
void initEndpoints() {}
void initEndpoints() {
// TODO: specify endpoint priorities?
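// NOTE: the stream order here must match the getAdjustedEndpoint() indices used in serialize() below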
std::vector<std::pair<FlowReceiver*, TaskPriority>> streams;
streams.push_back(waitFailure.getReceiver());
streams.push_back(blobGranuleFileRequest.getReceiver());
streams.push_back(assignBlobRangeRequest.getReceiver());
streams.push_back(revokeBlobRangeRequest.getReceiver());
streams.push_back(granuleAssignmentsRequest.getReceiver());
streams.push_back(granuleStatusStreamRequest.getReceiver());
streams.push_back(haltBlobWorker.getReceiver());
FlowTransport::transport().addEndpoints(streams);
}
UID id() const { return myId; }
NetworkAddress address() const { return blobGranuleFileRequest.getEndpoint().getPrimaryAddress(); }
NetworkAddress stableAddress() const { return blobGranuleFileRequest.getEndpoint().getStableAddress(); }
@ -54,16 +65,22 @@ struct BlobWorkerInterface {
template <class Archive>
void serialize(Archive& ar) {
serializer(ar,
waitFailure,
blobGranuleFileRequest,
assignBlobRangeRequest,
revokeBlobRangeRequest,
granuleAssignmentsRequest,
granuleStatusStreamRequest,
haltBlobWorker,
locality,
myId);
// use adjusted endpoints
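// Only waitFailure's endpoint crosses the wire; on deserialization the remaining streams are
// reconstructed from it at fixed offsets, which keeps the serialized interface small.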
serializer(ar, myId, locality, waitFailure);
if (Archive::isDeserializing) {
blobGranuleFileRequest =
RequestStream<struct BlobGranuleFileRequest>(waitFailure.getEndpoint().getAdjustedEndpoint(1));
assignBlobRangeRequest =
RequestStream<struct AssignBlobRangeRequest>(waitFailure.getEndpoint().getAdjustedEndpoint(2));
revokeBlobRangeRequest =
RequestStream<struct RevokeBlobRangeRequest>(waitFailure.getEndpoint().getAdjustedEndpoint(3));
granuleAssignmentsRequest =
RequestStream<struct GetGranuleAssignmentsRequest>(waitFailure.getEndpoint().getAdjustedEndpoint(4));
granuleStatusStreamRequest =
RequestStream<struct GranuleStatusStreamRequest>(waitFailure.getEndpoint().getAdjustedEndpoint(5));
haltBlobWorker =
RequestStream<struct HaltBlobWorkerRequest>(waitFailure.getEndpoint().getAdjustedEndpoint(6));
}
}
};

View File

@ -374,6 +374,9 @@ public:
Future<std::vector<OverlappingChangeFeedEntry>> getOverlappingChangeFeeds(KeyRangeRef ranges, Version minVersion);
Future<Void> popChangeFeedMutations(Key rangeID, Version version);
Future<Key> purgeBlobGranules(KeyRange keyRange, Version purgeVersion, bool force = false);
Future<Void> waitPurgeGranulesComplete(Key purgeKey);
// private:
explicit DatabaseContext(Reference<AsyncVar<Reference<IClusterConnectionRecord>>> connectionRecord,
Reference<AsyncVar<ClientDBInfo>> clientDBInfo,

View File

@ -22,6 +22,7 @@
#define FDBCLIENT_FDBTYPES_H
#include <algorithm>
#include <cinttypes>
#include <set>
#include <string>
#include <vector>

View File

@ -159,6 +159,11 @@ public:
// Management API, create snapshot
virtual ThreadFuture<Void> createSnapshot(const StringRef& uid, const StringRef& snapshot_command) = 0;
// Purge blob granules API. purgeBlobGranules is asynchronous; calling waitPurgeGranulesComplete on the returned
// purge key guarantees completion.
virtual ThreadFuture<Key> purgeBlobGranules(const KeyRangeRef& keyRange, Version purgeVersion, bool force) = 0;
virtual ThreadFuture<Void> waitPurgeGranulesComplete(const KeyRef& purgeKey) = 0;
// Interface to manage shared state across multiple connections to the same Database
virtual ThreadFuture<DatabaseSharedState*> createSharedState() = 0;
virtual void setSharedState(DatabaseSharedState* p) = 0;

View File

@ -521,6 +521,38 @@ ThreadFuture<ProtocolVersion> DLDatabase::getServerProtocol(Optional<ProtocolVer
});
}
ThreadFuture<Key> DLDatabase::purgeBlobGranules(const KeyRangeRef& keyRange, Version purgeVersion, bool force) {
if (!api->purgeBlobGranules) {
return unsupported_operation();
}
FdbCApi::FDBFuture* f = api->purgeBlobGranules(db,
keyRange.begin.begin(),
keyRange.begin.size(),
keyRange.end.begin(),
keyRange.end.size(),
purgeVersion,
force);
return toThreadFuture<Key>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) {
const uint8_t* key;
int keyLength;
FdbCApi::fdb_error_t error = api->futureGetKey(f, &key, &keyLength);
ASSERT(!error);
// The memory for this is stored in the FDBFuture and is released when the future gets destroyed
return Key(KeyRef(key, keyLength), Arena());
});
}
ThreadFuture<Void> DLDatabase::waitPurgeGranulesComplete(const KeyRef& purgeKey) {
if (!api->waitPurgeGranulesComplete) {
return unsupported_operation();
}
FdbCApi::FDBFuture* f = api->waitPurgeGranulesComplete(db, purgeKey.begin(), purgeKey.size());
return toThreadFuture<Void>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) { return Void(); });
}
// DLApi
// Loads the specified function from a dynamic library
@ -595,6 +627,15 @@ void DLApi::init() {
loadClientFunction(
&api->databaseCreateSnapshot, lib, fdbCPath, "fdb_database_create_snapshot", headerVersion >= 700);
loadClientFunction(
&api->purgeBlobGranules, lib, fdbCPath, "fdb_database_purge_blob_granules", headerVersion >= 710);
loadClientFunction(&api->waitPurgeGranulesComplete,
lib,
fdbCPath,
"fdb_database_wait_purge_granules_complete",
headerVersion >= 710);
loadClientFunction(
&api->tenantCreateTransaction, lib, fdbCPath, "fdb_tenant_create_transaction", headerVersion >= 710);
loadClientFunction(&api->tenantDestroy, lib, fdbCPath, "fdb_tenant_destroy", headerVersion >= 710);
@ -1466,6 +1507,17 @@ double MultiVersionDatabase::getMainThreadBusyness() {
return localClientBusyness;
}
ThreadFuture<Key> MultiVersionDatabase::purgeBlobGranules(const KeyRangeRef& keyRange,
Version purgeVersion,
bool force) {
auto f = dbState->db ? dbState->db->purgeBlobGranules(keyRange, purgeVersion, force) : ThreadFuture<Key>(Never());
return abortableFuture(f, dbState->dbVar->get().onChange);
}
ThreadFuture<Void> MultiVersionDatabase::waitPurgeGranulesComplete(const KeyRef& purgeKey) {
auto f = dbState->db ? dbState->db->waitPurgeGranulesComplete(purgeKey) : ThreadFuture<Void>(Never());
return abortableFuture(f, dbState->dbVar->get().onChange);
}
// Returns the protocol version reported by the coordinator this client is connected to
// If an expected version is given, the future won't return until the protocol version is different than expected
// Note: this will never return if the server is running a protocol from FDB 5.0 or older

View File

@ -156,6 +156,16 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
double (*databaseGetMainThreadBusyness)(FDBDatabase* database);
FDBFuture* (*databaseGetServerProtocol)(FDBDatabase* database, uint64_t expectedVersion);
FDBFuture* (*purgeBlobGranules)(FDBDatabase* db,
uint8_t const* begin_key_name,
int begin_key_name_length,
uint8_t const* end_key_name,
int end_key_name_length,
int64_t purge_version,
fdb_bool_t force);
FDBFuture* (*waitPurgeGranulesComplete)(FDBDatabase* db, uint8_t const* purge_key_name, int purge_key_name_length);
// Tenant
fdb_error_t (*tenantCreateTransaction)(FDBTenant* tenant, FDBTransaction** outTransaction);
void (*tenantDestroy)(FDBTenant* tenant);
@ -440,6 +450,9 @@ public:
ThreadFuture<Void> forceRecoveryWithDataLoss(const StringRef& dcid) override;
ThreadFuture<Void> createSnapshot(const StringRef& uid, const StringRef& snapshot_command) override;
ThreadFuture<Key> purgeBlobGranules(const KeyRangeRef& keyRange, Version purgeVersion, bool force) override;
ThreadFuture<Void> waitPurgeGranulesComplete(const KeyRef& purgeKey) override;
ThreadFuture<DatabaseSharedState*> createSharedState() override;
void setSharedState(DatabaseSharedState* p) override;
@ -720,6 +733,9 @@ public:
ThreadFuture<Void> forceRecoveryWithDataLoss(const StringRef& dcid) override;
ThreadFuture<Void> createSnapshot(const StringRef& uid, const StringRef& snapshot_command) override;
ThreadFuture<Key> purgeBlobGranules(const KeyRangeRef& keyRange, Version purgeVersion, bool force) override;
ThreadFuture<Void> waitPurgeGranulesComplete(const KeyRef& purgeKey) override;
ThreadFuture<DatabaseSharedState*> createSharedState() override;
void setSharedState(DatabaseSharedState* p) override;

View File

@ -1568,6 +1568,12 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnection
std::make_unique<AdvanceVersionImpl>(
singleKeyRange(LiteralStringRef("min_required_commit_version"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
registerSpecialKeySpaceModule(
SpecialKeySpace::MODULE::MANAGEMENT,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<VersionEpochImpl>(
singleKeyRange(LiteralStringRef("version_epoch"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
registerSpecialKeySpaceModule(
SpecialKeySpace::MODULE::MANAGEMENT,
SpecialKeySpace::IMPLTYPE::READWRITE,
@ -9327,3 +9333,86 @@ Future<Void> DatabaseContext::popChangeFeedMutations(Key rangeID, Version versio
Reference<DatabaseContext::TransactionT> DatabaseContext::createTransaction() {
return makeReference<ReadYourWritesTransaction>(Database(Reference<DatabaseContext>::addRef(this)));
}
ACTOR Future<Key> purgeBlobGranulesActor(Reference<DatabaseContext> db,
KeyRange range,
Version purgeVersion,
bool force) {
state Database cx(db);
state Transaction tr(cx);
state Key purgeKey;
// FIXME: implement force
if (!force) {
throw unsupported_operation();
}
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
Value purgeValue = blobGranulePurgeValueFor(purgeVersion, range, force);
tr.atomicOp(
addVersionStampAtEnd(blobGranulePurgeKeys.begin), purgeValue, MutationRef::SetVersionstampedKey);
tr.set(blobGranulePurgeChangeKey, deterministicRandom()->randomUniqueID().toString());
state Future<Standalone<StringRef>> fTrVs = tr.getVersionstamp();
wait(tr.commit());
Standalone<StringRef> vs = wait(fTrVs);
purgeKey = blobGranulePurgeKeys.begin.withSuffix(vs);
if (BG_REQUEST_DEBUG) {
fmt::print("purgeBlobGranules for range [{0} - {1}) at version {2} registered {3}\n",
range.begin.printable(),
range.end.printable(),
purgeVersion,
purgeKey.printable());
}
break;
} catch (Error& e) {
if (BG_REQUEST_DEBUG) {
fmt::print("purgeBlobGranules for range [{0} - {1}) at version {2} encountered error {3}\n",
range.begin.printable(),
range.end.printable(),
purgeVersion,
e.name());
}
wait(tr.onError(e));
}
}
return purgeKey;
}
Future<Key> DatabaseContext::purgeBlobGranules(KeyRange range, Version purgeVersion, bool force) {
return purgeBlobGranulesActor(Reference<DatabaseContext>::addRef(this), range, purgeVersion, force);
}
ACTOR Future<Void> waitPurgeGranulesCompleteActor(Reference<DatabaseContext> db, Key purgeKey) {
state Database cx(db);
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
Optional<Value> purgeVal = wait(tr->get(purgeKey));
if (!purgeVal.present()) {
if (BG_REQUEST_DEBUG) {
fmt::print("purgeBlobGranules for {0} succeeded\n", purgeKey.printable());
}
return Void();
}
if (BG_REQUEST_DEBUG) {
fmt::print("purgeBlobGranules for {0} watching\n", purgeKey.printable());
}
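// Watch the purge key: the blob manager clears it once the purge has been processed, firing the watch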
state Future<Void> watchFuture = tr->watch(purgeKey);
wait(tr->commit());
wait(watchFuture);
tr->reset();
} catch (Error& e) {
wait(tr->onError(e));
}
}
}
Future<Void> DatabaseContext::waitPurgeGranulesComplete(Key purgeKey) {
return waitPurgeGranulesCompleteActor(Reference<DatabaseContext>::addRef(this), purgeKey);
}

View File

@ -39,6 +39,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
MAX_COMMIT_BATCH_INTERVAL = std::min(MAX_COMMIT_BATCH_INTERVAL, MAX_READ_TRANSACTION_LIFE_VERSIONS/double(2*VERSIONS_PER_SECOND)); // Ensure that the proxy commits 2 times every MAX_READ_TRANSACTION_LIFE_VERSIONS, otherwise the master will not give out versions fast enough
init( ENABLE_VERSION_VECTOR, false );
init( ENABLE_VERSION_VECTOR_TLOG_UNICAST, false );
init( MAX_VERSION_RATE_MODIFIER, 0.1 );
init( MAX_VERSION_RATE_OFFSET, VERSIONS_PER_SECOND ); // If the calculated version is more than this amount away from the expected version, it will be clamped to this value. This prevents huge version jumps.
// TLogs
init( TLOG_TIMEOUT, 0.4 ); //cannot buggify because of availability
@ -113,6 +115,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
// disk snapshot max timeout, to be put in TLog, storage and coordinator nodes
init( MAX_FORKED_PROCESS_OUTPUT, 1024 );
init( SNAP_CREATE_MAX_TIMEOUT, 300.0 );
init( MAX_STORAGE_SNAPSHOT_FAULT_TOLERANCE, 1 );
init( MAX_COORDINATOR_SNAPSHOT_FAULT_TOLERANCE, 1 );
// Data distribution queue
init( HEALTH_POLL_TIME, 1.0 );

View File

@ -41,6 +41,8 @@ public:
bool ENABLE_VERSION_VECTOR_TLOG_UNICAST;
double MAX_COMMIT_BATCH_INTERVAL; // Each commit proxy generates a CommitTransactionBatchRequest at least this
// often, so that versions always advance smoothly
double MAX_VERSION_RATE_MODIFIER;
int64_t MAX_VERSION_RATE_OFFSET;
// TLogs
bool PEEK_USING_STREAMING;
@ -585,6 +587,12 @@ public:
// disk snapshot
int64_t MAX_FORKED_PROCESS_OUTPUT;
double SNAP_CREATE_MAX_TIMEOUT;
// Maximum number of storage servers a snapshot can fail to
// capture while still succeeding
int64_t MAX_STORAGE_SNAPSHOT_FAULT_TOLERANCE;
// Maximum number of coordinators a snapshot can fail to
// capture while still succeeding
int64_t MAX_COORDINATOR_SNAPSHOT_FAULT_TOLERANCE;
// Storage Metrics
double STORAGE_METRICS_AVERAGE_INTERVAL;

View File

@ -106,6 +106,8 @@ std::unordered_map<std::string, KeyRange> SpecialKeySpace::managementApiCommandT
{ "advanceversion",
singleKeyRange(LiteralStringRef("min_required_commit_version"))
.withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) },
{ "versionepoch",
singleKeyRange(LiteralStringRef("version_epoch")).withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) },
{ "profile",
KeyRangeRef(LiteralStringRef("profiling/"), LiteralStringRef("profiling0"))
.withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) },
@ -1909,6 +1911,42 @@ Future<Optional<std::string>> AdvanceVersionImpl::commit(ReadYourWritesTransacti
return Optional<std::string>();
}
ACTOR static Future<RangeResult> getVersionEpochActor(ReadYourWritesTransaction* ryw, KeyRangeRef kr) {
ryw->getTransaction().setOption(FDBTransactionOptions::LOCK_AWARE);
ryw->getTransaction().setOption(FDBTransactionOptions::RAW_ACCESS);
Optional<Value> val = wait(ryw->getTransaction().get(versionEpochKey));
RangeResult result;
if (val.present()) {
int64_t versionEpoch = BinaryReader::fromStringRef<int64_t>(val.get(), Unversioned());
ValueRef version(result.arena(), boost::lexical_cast<std::string>(versionEpoch));
result.push_back_deep(result.arena(), KeyValueRef(kr.begin, version));
}
return result;
}
VersionEpochImpl::VersionEpochImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {}
Future<RangeResult> VersionEpochImpl::getRange(ReadYourWritesTransaction* ryw,
KeyRangeRef kr,
GetRangeLimits limitsHint) const {
ASSERT(kr == getKeyRange());
return getVersionEpochActor(ryw, kr);
}
Future<Optional<std::string>> VersionEpochImpl::commit(ReadYourWritesTransaction* ryw) {
auto versionEpoch =
ryw->getSpecialKeySpaceWriteMap()[SpecialKeySpace::getManagementApiCommandPrefix("versionepoch")].second;
if (versionEpoch.present()) {
int64_t epoch = BinaryReader::fromStringRef<int64_t>(versionEpoch.get(), Unversioned());
ryw->getTransaction().setOption(FDBTransactionOptions::LOCK_AWARE);
ryw->getTransaction().setOption(FDBTransactionOptions::RAW_ACCESS);
ryw->getTransaction().set(versionEpochKey, BinaryWriter::toValue(epoch, Unversioned()));
} else {
ryw->getTransaction().clear(versionEpochKey);
}
return Optional<std::string>();
}
ClientProfilingImpl::ClientProfilingImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {}
ACTOR static Future<RangeResult> ClientProfilingGetRangeActor(ReadYourWritesTransaction* ryw,

View File

@ -476,6 +476,15 @@ public:
Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;
};
class VersionEpochImpl : public SpecialKeyRangeRWImpl {
public:
explicit VersionEpochImpl(KeyRangeRef kr);
Future<RangeResult> getRange(ReadYourWritesTransaction* ryw,
KeyRangeRef kr,
GetRangeLimits limitsHint) const override;
Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;
};
class ClientProfilingImpl : public SpecialKeyRangeRWImpl {
public:
explicit ClientProfilingImpl(KeyRangeRef kr);

View File

@ -823,6 +823,7 @@ std::vector<std::pair<UID, Version>> decodeBackupStartedValue(const ValueRef& va
const KeyRef coordinatorsKey = LiteralStringRef("\xff/coordinators");
const KeyRef logsKey = LiteralStringRef("\xff/logs");
const KeyRef minRequiredCommitVersionKey = LiteralStringRef("\xff/minRequiredCommitVersion");
const KeyRef versionEpochKey = LiteralStringRef("\xff/versionEpoch");
const KeyRef globalKeysPrefix = LiteralStringRef("\xff/globals");
const KeyRef lastEpochEndKey = LiteralStringRef("\xff/globals/lastEpochEnd");
@ -1155,9 +1156,9 @@ const KeyRangeRef blobGranuleMappingKeys(LiteralStringRef("\xff\x02/bgm/"), Lite
const KeyRangeRef blobGranuleLockKeys(LiteralStringRef("\xff\x02/bgl/"), LiteralStringRef("\xff\x02/bgl0"));
const KeyRangeRef blobGranuleSplitKeys(LiteralStringRef("\xff\x02/bgs/"), LiteralStringRef("\xff\x02/bgs0"));
const KeyRangeRef blobGranuleHistoryKeys(LiteralStringRef("\xff\x02/bgh/"), LiteralStringRef("\xff\x02/bgh0"));
const KeyRangeRef blobGranulePruneKeys(LiteralStringRef("\xff\x02/bgp/"), LiteralStringRef("\xff\x02/bgp0"));
const KeyRangeRef blobGranulePurgeKeys(LiteralStringRef("\xff\x02/bgp/"), LiteralStringRef("\xff\x02/bgp0"));
const KeyRangeRef blobGranuleVersionKeys(LiteralStringRef("\xff\x02/bgv/"), LiteralStringRef("\xff\x02/bgv0"));
const KeyRef blobGranulePruneChangeKey = LiteralStringRef("\xff\x02/bgpChange");
const KeyRef blobGranulePurgeChangeKey = LiteralStringRef("\xff\x02/bgpChange");
const uint8_t BG_FILE_TYPE_DELTA = 'D';
const uint8_t BG_FILE_TYPE_SNAPSHOT = 'S';
@ -1214,7 +1215,7 @@ std::tuple<Standalone<StringRef>, int64_t, int64_t, int64_t> decodeBlobGranuleFi
return std::tuple(filename, offset, length, fullFileLength);
}
const Value blobGranulePruneValueFor(Version version, KeyRange range, bool force) {
const Value blobGranulePurgeValueFor(Version version, KeyRange range, bool force) {
BinaryWriter wr(IncludeVersion(ProtocolVersion::withBlobGranule()));
wr << version;
wr << range;
@ -1222,7 +1223,7 @@ const Value blobGranulePruneValueFor(Version version, KeyRange range, bool force
return wr.toValue();
}
std::tuple<Version, KeyRange, bool> decodeBlobGranulePruneValue(ValueRef const& value) {
std::tuple<Version, KeyRange, bool> decodeBlobGranulePurgeValue(ValueRef const& value) {
Version version;
KeyRange range;
bool force;

View File

@ -348,6 +348,11 @@ extern const KeyRef logsKey;
// Used during backup/recovery to restrict version requirements
extern const KeyRef minRequiredCommitVersionKey;
// "\xff/versionEpochKey" = "[[uint64_t]]"
// Defines the base epoch representing version 0. The value itself is the
// number of microseconds since the Unix epoch.
extern const KeyRef versionEpochKey;
const Value logsValue(const std::vector<std::pair<UID, NetworkAddress>>& logs,
const std::vector<std::pair<UID, NetworkAddress>>& oldLogs);
std::pair<std::vector<std::pair<UID, NetworkAddress>>, std::vector<std::pair<UID, NetworkAddress>>> decodeLogsValue(
@ -564,9 +569,9 @@ extern const KeyRangeRef blobGranuleSplitKeys;
extern const KeyRangeRef blobGranuleHistoryKeys;
// \xff\x02/bgp/(start,end) = (version, force)
extern const KeyRangeRef blobGranulePruneKeys;
extern const KeyRangeRef blobGranulePurgeKeys;
extern const KeyRangeRef blobGranuleVersionKeys;
extern const KeyRef blobGranulePruneChangeKey;
extern const KeyRef blobGranulePurgeChangeKey;
const Key blobGranuleFileKeyFor(UID granuleID, Version fileVersion, uint8_t fileType);
std::tuple<UID, Version, uint8_t> decodeBlobGranuleFileKey(KeyRef const& key);
@ -575,8 +580,8 @@ const KeyRange blobGranuleFileKeyRangeFor(UID granuleID);
const Value blobGranuleFileValueFor(StringRef const& filename, int64_t offset, int64_t length, int64_t fullFileLength);
std::tuple<Standalone<StringRef>, int64_t, int64_t, int64_t> decodeBlobGranuleFileValue(ValueRef const& value);
const Value blobGranulePruneValueFor(Version version, KeyRange range, bool force);
std::tuple<Version, KeyRange, bool> decodeBlobGranulePruneValue(ValueRef const& value);
const Value blobGranulePurgeValueFor(Version version, KeyRange range, bool force);
std::tuple<Version, KeyRange, bool> decodeBlobGranulePurgeValue(ValueRef const& value);
const Value blobGranuleMappingValueFor(UID const& workerID);
UID decodeBlobGranuleMappingValue(ValueRef const& value);

View File

@ -127,6 +127,20 @@ ThreadFuture<ProtocolVersion> ThreadSafeDatabase::getServerProtocol(Optional<Pro
[db, expectedVersion]() -> Future<ProtocolVersion> { return db->getClusterProtocol(expectedVersion); });
}
ThreadFuture<Key> ThreadSafeDatabase::purgeBlobGranules(const KeyRangeRef& keyRange, Version purgeVersion, bool force) {
DatabaseContext* db = this->db;
KeyRange range = keyRange;
return onMainThread([db, range, purgeVersion, force]() -> Future<Key> {
return db->purgeBlobGranules(range, purgeVersion, force);
});
}
ThreadFuture<Void> ThreadSafeDatabase::waitPurgeGranulesComplete(const KeyRef& purgeKey) {
DatabaseContext* db = this->db;
Key key = purgeKey;
return onMainThread([db, key]() -> Future<Void> { return db->waitPurgeGranulesComplete(key); });
}
ThreadSafeDatabase::ThreadSafeDatabase(std::string connFilename, int apiVersion) {
ClusterConnectionFile* connFile =
new ClusterConnectionFile(ClusterConnectionFile::lookupClusterFileName(connFilename).first);

View File

@ -59,6 +59,9 @@ public:
ThreadFuture<Void> forceRecoveryWithDataLoss(const StringRef& dcid) override;
ThreadFuture<Void> createSnapshot(const StringRef& uid, const StringRef& snapshot_command) override;
ThreadFuture<Key> purgeBlobGranules(const KeyRangeRef& keyRange, Version purgeVersion, bool force) override;
ThreadFuture<Void> waitPurgeGranulesComplete(const KeyRef& purgeKey) override;
ThreadFuture<DatabaseSharedState*> createSharedState() override;
void setSharedState(DatabaseSharedState* p) override;

View File

@ -326,6 +326,7 @@ struct NetNotifiedQueueWithAcknowledgements final : NotifiedQueue<T>,
AcknowledgementReceiver acknowledgements;
Endpoint requestStreamEndpoint;
bool sentError = false;
bool notifiedFailed = false;
Promise<Void> onConnect;
NetNotifiedQueueWithAcknowledgements(int futures, int promises)
@ -402,14 +403,20 @@ struct NetNotifiedQueueWithAcknowledgements final : NotifiedQueue<T>,
return res;
}
~NetNotifiedQueueWithAcknowledgements() {
if (acknowledgements.getRawEndpoint().isValid() && acknowledgements.isRemoteEndpoint() && !this->hasError()) {
void notifyFailed() {
if (!notifiedFailed && acknowledgements.getRawEndpoint().isValid() && acknowledgements.isRemoteEndpoint() &&
!this->hasError()) {
// Notify the server that a client is not using this ReplyPromiseStream anymore
FlowTransport::transport().sendUnreliable(
SerializeSource<ErrorOr<AcknowledgementReply>>(operation_obsolete()),
acknowledgements.getEndpoint(TaskPriority::ReadSocket),
false);
notifiedFailed = true;
}
}
~NetNotifiedQueueWithAcknowledgements() {
notifyFailed();
if (isRemoteEndpoint() && !sentError && !acknowledgements.failures.isReady()) {
// Notify the client ReplyPromiseStream was cancelled before sending an error, so the storage server must
// have died
@ -505,6 +512,8 @@ public:
return queue->onConnect.getFuture();
}
void notifyFailed() { queue->notifyFailed(); }
~ReplyPromiseStream() {
if (queue)
queue->delPromiseRef();

View File

@ -321,6 +321,8 @@ void endStreamOnDisconnect(Future<Void> signal,
wait(signal || stream.onConnected());
}
}
// Notify BEFORE dropping last reference, causing broken_promise to send on stream before destructor is called
stream.notifyFailed();
}
}

View File

@ -612,6 +612,18 @@ private:
TEST(true); // Recovering at a higher version.
}
void checkSetVersionEpochKey(MutationRef m) {
if (m.param1 != versionEpochKey) {
return;
}
int64_t versionEpoch = BinaryReader::fromStringRef<int64_t>(m.param2, Unversioned());
TraceEvent("VersionEpoch", dbgid).detail("Epoch", versionEpoch);
if (!initialCommit)
txnStateStore->set(KeyValueRef(m.param1, m.param2));
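// An epoch change must trigger a recovery so that new versions are generated relative to the new epoch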
confChange = true;
TEST(true); // Setting version epoch
}
void checkSetWriteRecoverKey(MutationRef m) {
if (m.param1 != writeRecoveryKey) {
return;
@ -990,6 +1002,16 @@ private:
}
}
void checkClearVersionEpochKeys(MutationRef m, KeyRangeRef range) {
if (!range.contains(versionEpochKey)) {
return;
}
if (!initialCommit)
txnStateStore->clear(singleKeyRange(versionEpochKey));
TraceEvent("MutationRequiresRestart", dbgid).detail("M", m);
confChange = true;
}
void checkClearTenantMapPrefix(KeyRangeRef range) {
if (tenantMapKeys.intersects(range)) {
if (tenantMap) {
@ -1155,6 +1177,7 @@ public:
checkSetGlobalKeys(m);
checkSetWriteRecoverKey(m);
checkSetMinRequiredCommitVersionKey(m);
checkSetVersionEpochKey(m);
checkSetTenantMapPrefix(m);
checkSetOtherKeys(m);
} else if (m.type == MutationRef::ClearRange && isSystemKey(m.param2)) {
@ -1171,6 +1194,7 @@ public:
checkClearLogRangesRange(range);
checkClearTssMappingKeys(m, range);
checkClearTssQuarantineKeys(m, range);
checkClearVersionEpochKeys(m, range);
checkClearTenantMapPrefix(range);
checkClearMiscRangeKeys(range);
}

View File

@ -216,7 +216,7 @@ struct SplitEvaluation {
struct BlobManagerStats {
CounterCollection cc;
// FIXME: pruning stats
// FIXME: purging stats
Counter granuleSplits;
Counter granuleWriteHotSplits;
@ -226,6 +226,10 @@ struct BlobManagerStats {
Counter ccMismatches;
Counter ccTimeouts;
Counter ccErrors;
Counter purgesProcessed;
Counter granulesFullyPurged;
Counter granulesPartiallyPurged;
Counter filesPurged;
Future<Void> logger;
// Current stats maintained for a given blob worker process
@ -233,7 +237,9 @@ struct BlobManagerStats {
: cc("BlobManagerStats", id.toString()), granuleSplits("GranuleSplits", cc),
granuleWriteHotSplits("GranuleWriteHotSplits", cc), ccGranulesChecked("CCGranulesChecked", cc),
ccRowsChecked("CCRowsChecked", cc), ccBytesChecked("CCBytesChecked", cc), ccMismatches("CCMismatches", cc),
ccTimeouts("CCTimeouts", cc), ccErrors("CCErrors", cc) {
ccTimeouts("CCTimeouts", cc), ccErrors("CCErrors", cc), purgesProcessed("PurgesProcessed", cc),
granulesFullyPurged("GranulesFullyPurged", cc), granulesPartiallyPurged("GranulesPartiallyPurged", cc),
filesPurged("FilesPurged", cc) {
specialCounter(cc, "WorkerCount", [workers]() { return workers->size(); });
logger = traceCounters("BlobManagerMetrics", id, interval, &cc, "BlobManagerMetrics");
}
@ -438,6 +444,7 @@ ACTOR Future<UID> pickWorkerForAssign(Reference<BlobManagerData> bmData) {
ACTOR Future<Void> doRangeAssignment(Reference<BlobManagerData> bmData,
RangeAssignment assignment,
Optional<UID> workerID,
int64_t epoch,
int64_t seqNo) {
// WorkerId is set, except in case of assigning to any worker. Then we pick the worker to assign to in here
@ -468,7 +475,7 @@ ACTOR Future<Void> doRangeAssignment(Reference<BlobManagerData> bmData,
assignment.isAssign ? "assigning" : "revoking",
assignment.keyRange.begin.printable(),
assignment.keyRange.end.printable(),
bmData->epoch,
epoch,
seqNo,
workerID.get().toString());
}
@ -481,7 +488,7 @@ ACTOR Future<Void> doRangeAssignment(Reference<BlobManagerData> bmData,
AssignBlobRangeRequest req;
req.keyRange = KeyRangeRef(StringRef(req.arena, assignment.keyRange.begin),
StringRef(req.arena, assignment.keyRange.end));
req.managerEpoch = bmData->epoch;
req.managerEpoch = epoch;
req.managerSeqno = seqNo;
req.type = assignment.assign.get().type;
@ -497,7 +504,7 @@ ACTOR Future<Void> doRangeAssignment(Reference<BlobManagerData> bmData,
RevokeBlobRangeRequest req;
req.keyRange = KeyRangeRef(StringRef(req.arena, assignment.keyRange.begin),
StringRef(req.arena, assignment.keyRange.end));
req.managerEpoch = bmData->epoch;
req.managerEpoch = epoch;
req.managerSeqno = seqNo;
req.dispose = assignment.revoke.get().dispose;
@ -637,10 +644,10 @@ ACTOR Future<Void> rangeAssigner(Reference<BlobManagerData> bmData) {
}
count++;
}
ASSERT(count == 1);
if (skip) {
continue;
}
ASSERT(count == 1);
if (assignment.worker.present() && assignment.worker.get().isValid()) {
if (BM_DEBUG) {
@ -653,7 +660,7 @@ ACTOR Future<Void> rangeAssigner(Reference<BlobManagerData> bmData) {
bmData->workerAssignments.insert(assignment.keyRange, workerId);
bmData->assignsInProgress.insert(assignment.keyRange,
doRangeAssignment(bmData, assignment, workerId, seqNo));
doRangeAssignment(bmData, assignment, workerId, bmData->epoch, seqNo));
// If we know about the worker and this is not a continue, then this is a new range for the worker
if (bmData->workerStats.count(workerId) &&
assignment.assign.get().type != AssignRequestType::Continue) {
@ -662,8 +669,8 @@ ACTOR Future<Void> rangeAssigner(Reference<BlobManagerData> bmData) {
} else {
// Ensure the key boundaries are updated before we pick a worker
bmData->workerAssignments.insert(assignment.keyRange, UID());
bmData->assignsInProgress.insert(assignment.keyRange,
doRangeAssignment(bmData, assignment, Optional<UID>(), seqNo));
bmData->assignsInProgress.insert(
assignment.keyRange, doRangeAssignment(bmData, assignment, Optional<UID>(), bmData->epoch, seqNo));
}
} else {
@ -677,7 +684,8 @@ ACTOR Future<Void> rangeAssigner(Reference<BlobManagerData> bmData) {
if (existingRange.range() == assignment.keyRange && existingRange.cvalue() == assignment.worker.get()) {
bmData->workerAssignments.insert(assignment.keyRange, UID());
}
bmData->addActor.send(doRangeAssignment(bmData, assignment, assignment.worker.get(), seqNo));
bmData->addActor.send(
doRangeAssignment(bmData, assignment, assignment.worker.get(), bmData->epoch, seqNo));
} else {
auto currentAssignments = bmData->workerAssignments.intersectingRanges(assignment.keyRange);
for (auto& it : currentAssignments) {
@ -693,7 +701,7 @@ ACTOR Future<Void> rangeAssigner(Reference<BlobManagerData> bmData) {
}
// revoke the range for the worker that owns it, not the worker specified in the revoke
bmData->addActor.send(doRangeAssignment(bmData, assignment, it.value(), seqNo));
bmData->addActor.send(doRangeAssignment(bmData, assignment, it.value(), bmData->epoch, seqNo));
}
bmData->workerAssignments.insert(assignment.keyRange, UID());
}
@ -1356,26 +1364,6 @@ ACTOR Future<Void> monitorBlobWorkerStatus(Reference<BlobManagerData> bmData, Bl
// back is to split the range.
ASSERT(rep.doSplit);
// only evaluate for split if this worker currently owns the granule in this blob manager's mapping
auto currGranuleAssignment = bmData->workerAssignments.rangeContaining(rep.granuleRange.begin);
if (!(currGranuleAssignment.begin() == rep.granuleRange.begin &&
currGranuleAssignment.end() == rep.granuleRange.end &&
currGranuleAssignment.cvalue() == bwInterf.id())) {
if (BM_DEBUG) {
fmt::print("Manager {0} ignoring status from BW {1} for granule [{2} - {3}) since BW {4} owns "
"[{5} - {6}).\n",
bmData->epoch,
bwInterf.id().toString().substr(0, 5),
rep.granuleRange.begin.printable(),
rep.granuleRange.end.printable(),
currGranuleAssignment.cvalue().toString().substr(0, 5),
currGranuleAssignment.begin().printable(),
currGranuleAssignment.end().printable());
}
// FIXME: could send revoke request
continue;
}
// FIXME: We will need to go over all splits in the range once we're doing merges, instead of first one
auto lastSplitEval = bmData->splitEvaluations.rangeContaining(rep.granuleRange.begin);
if (rep.granuleRange.begin == lastSplitEval.begin() && rep.granuleRange.end == lastSplitEval.end() &&
@ -1386,46 +1374,67 @@ ACTOR Future<Void> monitorBlobWorkerStatus(Reference<BlobManagerData> bmData, Bl
rep.granuleRange.begin.printable(),
rep.granuleRange.end.printable());
}
} else {
ASSERT(lastSplitEval.cvalue().epoch < rep.epoch ||
(lastSplitEval.cvalue().epoch == rep.epoch && lastSplitEval.cvalue().seqno < rep.seqno));
if (lastSplitEval.cvalue().inProgress.isValid() && !lastSplitEval.cvalue().inProgress.isReady()) {
TEST(true); // racing BM splits
// For example, one worker asked BM to split, then died, granule was moved, new worker asks to
// split on recovery. We need to ensure that they are semantically the same split.
// We will just rely on the in-progress split to finish
if (BM_DEBUG) {
fmt::print("Manager {0} got split request for [{1} - {2}) @ ({3}, {4}), but already in "
"progress from [{5} - {6}) @ ({7}, {8})\n",
bmData->epoch,
rep.granuleRange.begin.printable().c_str(),
rep.granuleRange.end.printable().c_str(),
rep.epoch,
rep.seqno,
lastSplitEval.begin().printable().c_str(),
lastSplitEval.end().printable().c_str(),
lastSplitEval.cvalue().epoch,
lastSplitEval.cvalue().seqno);
}
// ignore the request, they will retry
} else {
if (BM_DEBUG) {
fmt::print("Manager {0} evaluating [{1} - {2}) @ ({3}, {4}) for split\n",
bmData->epoch,
rep.granuleRange.begin.printable().c_str(),
rep.granuleRange.end.printable().c_str(),
rep.epoch,
rep.seqno);
}
Future<Void> doSplitEval = maybeSplitRange(bmData,
bwInterf.id(),
rep.granuleRange,
rep.granuleID,
rep.startVersion,
rep.writeHotSplit);
bmData->splitEvaluations.insert(rep.granuleRange,
SplitEvaluation(rep.epoch, rep.seqno, doSplitEval));
} else if (!(lastSplitEval.cvalue().epoch < rep.epoch ||
(lastSplitEval.cvalue().epoch == rep.epoch && lastSplitEval.cvalue().seqno < rep.seqno))) {
TEST(true); // BM got out-of-date split request
if (BM_DEBUG) {
fmt::print(
"Manager {0} ignoring status from BW {1} for granule [{2} - {3}) since it already processed"
"[{4} - {5}) @ ({6}, {7}).\n",
bmData->epoch,
bwInterf.id().toString().substr(0, 5),
rep.granuleRange.begin.printable(),
rep.granuleRange.end.printable(),
lastSplitEval.begin().printable(),
lastSplitEval.end().printable(),
lastSplitEval.cvalue().epoch,
lastSplitEval.cvalue().seqno);
}
// revoke the range from the out-of-date worker, but bypass rangeAssigner and hack (epoch, seqno) to be
// (requesting epoch, requesting seqno + 1) to ensure there is no race with a later reassignment of the
// range to the worker
RangeAssignment revokeOld;
revokeOld.isAssign = false;
revokeOld.worker = bwInterf.id();
revokeOld.keyRange = rep.granuleRange;
revokeOld.revoke = RangeRevokeData(false);
bmData->addActor.send(
doRangeAssignment(bmData, revokeOld, bwInterf.id(), rep.epoch, rep.seqno + 1));
} else if (lastSplitEval.cvalue().inProgress.isValid() &&
!lastSplitEval.cvalue().inProgress.isReady()) {
TEST(true); // racing BM splits
// For example, one worker asked BM to split, then died, granule was moved, new worker asks to
// split on recovery. We need to ensure that they are semantically the same split.
// We will just rely on the in-progress split to finish
if (BM_DEBUG) {
fmt::print("Manager {0} got split request for [{1} - {2}) @ ({3}, {4}), but already in "
"progress from [{5} - {6}) @ ({7}, {8})\n",
bmData->epoch,
rep.granuleRange.begin.printable().c_str(),
rep.granuleRange.end.printable().c_str(),
rep.epoch,
rep.seqno,
lastSplitEval.begin().printable().c_str(),
lastSplitEval.end().printable().c_str(),
lastSplitEval.cvalue().epoch,
lastSplitEval.cvalue().seqno);
}
// ignore the request, they will retry
} else {
if (BM_DEBUG) {
fmt::print("Manager {0} evaluating [{1} - {2}) @ ({3}, {4}) for split\n",
bmData->epoch,
rep.granuleRange.begin.printable().c_str(),
rep.granuleRange.end.printable().c_str(),
rep.epoch,
rep.seqno);
}
Future<Void> doSplitEval = maybeSplitRange(
bmData, bwInterf.id(), rep.granuleRange, rep.granuleID, rep.startVersion, rep.writeHotSplit);
bmData->splitEvaluations.insert(rep.granuleRange,
SplitEvaluation(rep.epoch, rep.seqno, doSplitEval));
}
}
} catch (Error& e) {
@ -2160,23 +2169,84 @@ ACTOR Future<GranuleFiles> loadHistoryFiles(Reference<BlobManagerData> bmData, U
}
}
// FIXME: trace events for pruning
// FIXME: trace events for purging
ACTOR Future<Void> canDeleteFullGranule(Reference<BlobManagerData> self, UID granuleId) {
state Transaction tr(self->db);
state KeyRange splitRange = blobGranuleSplitKeyRangeFor(granuleId);
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
state RangeResult splitState = wait(tr.getRange(splitRange, SERVER_KNOBS->BG_MAX_SPLIT_FANOUT));
state int i = 0;
state bool retry = false;
for (; i < splitState.size(); i++) {
UID parent, child;
BlobGranuleSplitState st;
Version v;
std::tie(parent, child) = decodeBlobGranuleSplitKey(splitState[i].key);
std::tie(st, v) = decodeBlobGranuleSplitValue(splitState[i].value);
// if split state is done, this granule has definitely persisted a snapshot
if (st >= BlobGranuleSplitState::Done) {
continue;
}
// if split state isn't even assigned, this granule has definitely not persisted a snapshot
if (st <= BlobGranuleSplitState::Initialized) {
retry = true;
break;
}
ASSERT(st == BlobGranuleSplitState::Assigned);
// if assigned, granule may or may not have snapshotted. Check files to confirm. Since a re-snapshot is
// the first file written for a new granule, any files present mean it has re-snapshotted from this
// granule
KeyRange granuleFileRange = blobGranuleFileKeyRangeFor(child);
RangeResult files = wait(tr.getRange(granuleFileRange, 1));
if (files.empty()) {
retry = true;
break;
}
}
if (retry) {
tr.reset();
wait(delay(1.0));
} else {
if (splitState.empty() || !splitState.more) {
break;
}
splitRange = KeyRangeRef(keyAfter(splitState.back().key), splitRange.end);
}
} catch (Error& e) {
wait(tr.onError(e));
}
}
return Void();
}
/*
* Deletes all files pertaining to the granule with id granuleId and
* also removes the history entry for this granule from the system keyspace
* TODO: ensure cannot fully delete granule that is still splitting!
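* (The canDeleteFullGranule wait below addresses this: full deletion blocks until every split child has
* re-snapshotted.)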
*/
ACTOR Future<Void> fullyDeleteGranule(Reference<BlobManagerData> self, UID granuleId, Key historyKey) {
ACTOR Future<Void> fullyDeleteGranule(Reference<BlobManagerData> self,
UID granuleId,
Key historyKey,
Version purgeVersion) {
if (BM_DEBUG) {
fmt::print("Fully deleting granule {0}: init\n", granuleId.toString());
}
// if granule is still splitting and files are needed for new sub-granules to re-snapshot, we can only partially
// delete the granule, since we need to keep the last snapshot and deltas for splitting
wait(canDeleteFullGranule(self, granuleId));
// get files
GranuleFiles files = wait(loadHistoryFiles(self->db, granuleId));
std::vector<Future<Void>> deletions;
std::vector<std::string> filesToDelete; // TODO: remove, just for debugging
state std::vector<std::string> filesToDelete; // TODO: remove, just for debugging
for (auto snapshotFile : files.snapshotFiles) {
std::string fname = snapshotFile.filename;
@ -2191,7 +2261,7 @@ ACTOR Future<Void> fullyDeleteGranule(Reference<BlobManagerData> self, UID granu
}
if (BM_DEBUG) {
fmt::print("Fully deleting granule {0}: deleting {1} files\n", granuleId.toString(), deletions.size());
fmt::print("Fully deleting granule {0}: deleting {1} files\n", granuleId.toString(), filesToDelete.size());
for (auto filename : filesToDelete) {
fmt::print(" - {}\n", filename.c_str());
}
@ -2228,18 +2298,27 @@ ACTOR Future<Void> fullyDeleteGranule(Reference<BlobManagerData> self, UID granu
fmt::print("Fully deleting granule {0}: success\n", granuleId.toString());
}
TraceEvent("GranuleFullPurge", self->id)
.detail("Epoch", self->epoch)
.detail("GranuleID", granuleId)
.detail("PurgeVersion", purgeVersion)
.detail("FilesPurged", filesToDelete.size());
++self->stats.granulesFullyPurged;
self->stats.filesPurged += filesToDelete.size();
return Void();
}
/*
* For the granule with id granuleId, finds the first snapshot file at a
* version <= pruneVersion and deletes all files older than it.
* version <= purgeVersion and deletes all files older than it.
*
* Assumption: this granule's startVersion might change because the first snapshot
* file might be deleted. We will need to ensure we don't rely on the granule's startVersion
* (that's persisted as part of the key), but rather use the granule's first snapshot's version when needed
*/
ACTOR Future<Void> partiallyDeleteGranule(Reference<BlobManagerData> self, UID granuleId, Version pruneVersion) {
ACTOR Future<Void> partiallyDeleteGranule(Reference<BlobManagerData> self, UID granuleId, Version purgeVersion) {
if (BM_DEBUG) {
fmt::print("Partially deleting granule {0}: init\n", granuleId.toString());
}
@ -2247,7 +2326,7 @@ ACTOR Future<Void> partiallyDeleteGranule(Reference<BlobManagerData> self, UID g
// get files
GranuleFiles files = wait(loadHistoryFiles(self->db, granuleId));
// represents the version of the latest snapshot file in this granule with G.version < pruneVersion
// represents the version of the latest snapshot file in this granule with G.version < purgeVersion
Version latestSnapshotVersion = invalidVersion;
state std::vector<Future<Void>> deletions; // deletion work per file
@ -2262,8 +2341,8 @@ ACTOR Future<Void> partiallyDeleteGranule(Reference<BlobManagerData> self, UID g
deletions.emplace_back(self->bstore->deleteFile(fname));
deletedFileKeys.emplace_back(blobGranuleFileKeyFor(granuleId, files.snapshotFiles[idx].version, 'S'));
filesToDelete.emplace_back(fname);
} else if (files.snapshotFiles[idx].version <= pruneVersion) {
// otherwise if this is the FIRST snapshot file with version < pruneVersion,
} else if (files.snapshotFiles[idx].version <= purgeVersion) {
// otherwise if this is the FIRST snapshot file with version < purgeVersion,
// then we found our latestSnapshotVersion (FIRST since we are traversing in reverse)
latestSnapshotVersion = files.snapshotFiles[idx].version;
}
@ -2289,19 +2368,19 @@ ACTOR Future<Void> partiallyDeleteGranule(Reference<BlobManagerData> self, UID g
}
if (BM_DEBUG) {
fmt::print("Partially deleting granule {0}: deleting {1} files\n", granuleId.toString(), deletions.size());
fmt::print("Partially deleting granule {0}: deleting {1} files\n", granuleId.toString(), filesToDelete.size());
for (auto filename : filesToDelete) {
fmt::print(" - {0}\n", filename);
}
}
// TODO: the following comment relies on the assumption that BWs will not get requests to
// read data that was already pruned. confirm assumption is fine. otherwise, we'd need
// to communicate with BWs here and have them ack the pruneVersion
// read data that was already purged. confirm assumption is fine. otherwise, we'd need
// to communicate with BWs here and have them ack the purgeVersion
// delete the files before the corresponding metadata.
// this could lead to dangling pointers in fdb, but we should never read data older than
// pruneVersion anyways, and we can clean up the keys the next time around.
// purgeVersion anyways, and we can clean up the keys the next time around.
// deleting files before corresponding metadata reduces the # of orphaned files.
wait(waitForAll(deletions));
@ -2329,26 +2408,41 @@ ACTOR Future<Void> partiallyDeleteGranule(Reference<BlobManagerData> self, UID g
if (BM_DEBUG) {
fmt::print("Partially deleting granule {0}: success\n", granuleId.toString());
}
TraceEvent("GranulePartialPurge", self->id)
.detail("Epoch", self->epoch)
.detail("GranuleID", granuleId)
.detail("PurgeVersion", purgeVersion)
.detail("FilesPurged", filesToDelete.size());
++self->stats.granulesPartiallyPurged;
self->stats.filesPurged += filesToDelete.size();
return Void();
}
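// Illustrative walkthrough (not part of this patch): suppose a granule has
// snapshot versions {100, 200, 300} and purgeVersion == 250. The reverse scan
// above picks latestSnapshotVersion == 200 (the newest snapshot at or below
// the purge point), so the snapshot at 100 and the older delta files can be
// deleted, while the snapshot at 200 must be kept so reads at or above
// version 200 can still be served.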
/*
* This method is used to prune the range [startKey, endKey) at (and including) pruneVersion.
* This method is used to purge the range [startKey, endKey) at (and including) purgeVersion.
* To do this, we do a BFS traversal starting at the active granules. Then we classify granules
* in the history as nodes that can be fully deleted (i.e. their files and history can be deleted)
* and nodes that can be partially deleted (i.e. some of their files can be deleted).
* Once all this is done, we finally clear the pruneIntent key, if possible, to indicate we are done
* processing this prune intent.
* Once all this is done, we finally clear the purgeIntent key, if possible, to indicate we are done
* processing this purge intent.
*/
ACTOR Future<Void> pruneRange(Reference<BlobManagerData> self, KeyRangeRef range, Version pruneVersion, bool force) {
ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range, Version purgeVersion, bool force) {
if (BM_DEBUG) {
fmt::print("pruneRange starting for range [{0} - {1}) @ pruneVersion={2}, force={3}\n",
fmt::print("purgeRange starting for range [{0} - {1}) @ purgeVersion={2}, force={3}\n",
range.begin.printable(),
range.end.printable(),
pruneVersion,
purgeVersion,
force);
}
TraceEvent("PurgeGranulesBegin", self->id)
.detail("Epoch", self->epoch)
.detail("Range", range)
.detail("PurgeVersion", purgeVersion)
.detail("Force", force);
// queue of <range, startVersion, endVersion> for BFS traversal of history
state std::queue<std::tuple<KeyRange, Version, Version>> historyEntryQueue;
@ -2371,18 +2465,18 @@ ACTOR Future<Void> pruneRange(Reference<BlobManagerData> self, KeyRangeRef range
state KeyRangeMap<UID>::iterator activeRange;
for (activeRange = activeRanges.begin(); activeRange != activeRanges.end(); ++activeRange) {
if (BM_DEBUG) {
fmt::print("Checking if active range [{0} - {1}), owned by BW {2}, should be pruned\n",
fmt::print("Checking if active range [{0} - {1}), owned by BW {2}, should be purged\n",
activeRange.begin().printable(),
activeRange.end().printable(),
activeRange.value().toString());
}
// assumption: prune boundaries must respect granule boundaries
// assumption: purge boundaries must respect granule boundaries
if (activeRange.begin() < range.begin || activeRange.end() > range.end) {
continue;
}
// TODO: if this is a force prune, then revoke the assignment from the corresponding BW first
// TODO: if this is a force purge, then revoke the assignment from the corresponding BW first
// so that it doesn't try to interact with the granule (i.e. force it to give up gLock).
// we'll need some way to ack that the revoke was successful
@ -2456,17 +2550,17 @@ ACTOR Future<Void> pruneRange(Reference<BlobManagerData> self, KeyRangeRef range
}
// There are three cases this granule can fall into:
// - if the granule's end version is at or before the prune version or this is a force delete,
// - if the granule's end version is at or before the purge version or this is a force delete,
// this granule should be completely deleted
// - else if the startVersion <= pruneVersion, then G.startVersion < pruneVersion < G.endVersion
// - else if the startVersion <= purgeVersion, then G.startVersion < purgeVersion < G.endVersion
// and so this granule should be partially deleted
// - otherwise, this granule is active, so don't schedule it for deletion
if (force || endVersion <= pruneVersion) {
if (force || endVersion <= purgeVersion) {
if (BM_DEBUG) {
fmt::print("Granule {0} will be FULLY deleted\n", currHistoryNode.granuleID.toString());
}
toFullyDelete.push_back({ currHistoryNode.granuleID, historyKey });
} else if (startVersion < pruneVersion) {
} else if (startVersion < purgeVersion) {
if (BM_DEBUG) {
fmt::print("Granule {0} will be partially deleted\n", currHistoryNode.granuleID.toString());
}
@ -2513,70 +2607,79 @@ ACTOR Future<Void> pruneRange(Reference<BlobManagerData> self, KeyRangeRef range
// we won't run into any issues with trying to "re-delete" a blob file since deleting
// a file that doesn't exist is considered successful
state std::vector<Future<Void>> partialDeletions;
state int i;
if (BM_DEBUG) {
fmt::print("{0} granules to fully delete\n", toFullyDelete.size());
}
for (i = toFullyDelete.size() - 1; i >= 0; --i) {
UID granuleId;
state UID granuleId;
Key historyKey;
std::tie(granuleId, historyKey) = toFullyDelete[i];
// FIXME: consider batching into a single txn (need to take care of txn size limit)
if (BM_DEBUG) {
fmt::print("About to fully delete granule {0}\n", granuleId.toString());
}
wait(fullyDeleteGranule(self, granuleId, historyKey));
wait(fullyDeleteGranule(self, granuleId, historyKey, purgeVersion));
}
if (BM_DEBUG) {
fmt::print("{0} granules to partially delete\n", toPartiallyDelete.size());
}
std::vector<Future<Void>> partialDeletions;
for (i = toPartiallyDelete.size() - 1; i >= 0; --i) {
UID granuleId = toPartiallyDelete[i];
if (BM_DEBUG) {
fmt::print("About to partially delete granule {0}\n", granuleId.toString());
}
partialDeletions.emplace_back(partiallyDeleteGranule(self, granuleId, pruneVersion));
partialDeletions.emplace_back(partiallyDeleteGranule(self, granuleId, purgeVersion));
}
wait(waitForAll(partialDeletions));
// Now that all the necessary granules and their files have been deleted, we can
// clear the pruneIntent key to signify that the work is done. However, there could have been
// another pruneIntent that got written for this table while we were processing this one.
// clear the purgeIntent key to signify that the work is done. However, there could have been
// another purgeIntent that got written for this table while we were processing this one.
// If that is the case, we should not clear the key. Otherwise, we can just clear the key.
if (BM_DEBUG) {
fmt::print("Successfully pruned range [{0} - {1}) at pruneVersion={2}\n",
fmt::print("Successfully purged range [{0} - {1}) at purgeVersion={2}\n",
range.begin.printable(),
range.end.printable(),
pruneVersion);
purgeVersion);
}
TraceEvent("PurgeGranulesComplete", self->id)
.detail("Epoch", self->epoch)
.detail("Range", range)
.detail("PurgeVersion", purgeVersion)
.detail("Force", force);
++self->stats.purgesProcessed;
return Void();
}
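// A minimal sketch (hypothetical helper, not in this patch) of the
// classification rule purgeRange applies to each history node:
//
//   enum class PurgeAction { Full, Partial, None };
//   PurgeAction classify(Version startVersion, Version endVersion, Version purgeVersion, bool force) {
//       if (force || endVersion <= purgeVersion)
//           return PurgeAction::Full; // entire lifetime is at or before the purge point
//       if (startVersion < purgeVersion)
//           return PurgeAction::Partial; // granule straddles the purge point
//       return PurgeAction::None; // active granule, leave it alone
//   }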
/*
* This monitor watches for changes to a key K that gets updated whenever there is a new prune intent.
* On this change, we scan through all blobGranulePruneKeys (which look like <startKey, endKey>=<prune_version,
* force>) and prune any intents.
* This monitor watches for changes to a key K that gets updated whenever there is a new purge intent.
* On this change, we scan through all blobGranulePurgeKeys (which look like <startKey, endKey>=<purge_version,
* force>) and purge any intents.
*
* Once the prune has succeeded, we clear the key IF the version is still the same one that was pruned.
* That way, if another prune intent arrived for the same range while we were working on an older one,
* Once the purge has succeeded, we clear the key IF the version is still the same one that was purged.
* That way, if another purge intent arrived for the same range while we were working on an older one,
* we wouldn't end up clearing the intent.
*
* When watching for changes, we might end up in scenarios where we failed to do the work
* for a prune intent even though the watch was triggered (maybe the BM had a blip). This is problematic
* if the intent is a force and there isn't another prune intent for quite some time. To remedy this,
* if we don't see a watch change in X (configurable) seconds, we will just sweep through the prune intents,
* for a purge intent even though the watch was triggered (maybe the BM had a blip). This is problematic
* if the intent is a force and there isn't another purge intent for quite some time. To remedy this,
* if we don't see a watch change in X (configurable) seconds, we will just sweep through the purge intents,
* consolidating any work we might have missed before.
*
* Note: we could potentially use a changefeed here to get the exact pruneIntent that was added
* Note: we could potentially use a changefeed here to get the exact purgeIntent that was added
* rather than iterating through all of them, but this might have too much overhead for latency
* improvements we don't really need here (also we need to go over all prune intents anyways in the
* case that the timer is up before any new prune intents arrive).
* improvements we don't really need here (also we need to go over all purge intents anyways in the
* case that the timer is up before any new purge intents arrive).
*/
ACTOR Future<Void> monitorPruneKeys(Reference<BlobManagerData> self) {
ACTOR Future<Void> monitorPurgeKeys(Reference<BlobManagerData> self) {
self->initBStore();
loop {
@ -2585,35 +2688,35 @@ ACTOR Future<Void> monitorPruneKeys(Reference<BlobManagerData> self) {
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
// Wait for the watch to change, or some time to expire (whichever comes first)
// before checking through the prune intents. We write a UID into the change key value
// before checking through the purge intents. We write a UID into the change key value
// so that we can still recognize when the watch key has been changed while we weren't
// monitoring it
state Key lastPruneKey = blobGranulePruneKeys.begin;
state Key lastPurgeKey = blobGranulePurgeKeys.begin;
loop {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
state std::vector<Future<Void>> prunes;
state CoalescedKeyRangeMap<std::pair<Version, bool>> pruneMap;
pruneMap.insert(allKeys, std::make_pair<Version, bool>(0, false));
state std::vector<Future<Void>> purges;
state CoalescedKeyRangeMap<std::pair<Version, bool>> purgeMap;
purgeMap.insert(allKeys, std::make_pair<Version, bool>(0, false));
try {
// TODO: replace 10000 with a knob
state RangeResult pruneIntents = wait(tr->getRange(blobGranulePruneKeys, BUGGIFY ? 1 : 10000));
if (pruneIntents.size()) {
state RangeResult purgeIntents = wait(tr->getRange(blobGranulePurgeKeys, BUGGIFY ? 1 : 10000));
if (purgeIntents.size()) {
int rangeIdx = 0;
for (; rangeIdx < pruneIntents.size(); ++rangeIdx) {
Version pruneVersion;
for (; rangeIdx < purgeIntents.size(); ++rangeIdx) {
Version purgeVersion;
KeyRange range;
bool force;
std::tie(pruneVersion, range, force) =
decodeBlobGranulePruneValue(pruneIntents[rangeIdx].value);
auto ranges = pruneMap.intersectingRanges(range);
std::tie(purgeVersion, range, force) =
decodeBlobGranulePurgeValue(purgeIntents[rangeIdx].value);
auto ranges = purgeMap.intersectingRanges(range);
bool foundConflict = false;
for (auto it : ranges) {
if ((it.value().second && !force && it.value().first < pruneVersion) ||
(!it.value().second && force && pruneVersion < it.value().first)) {
if ((it.value().second && !force && it.value().first < purgeVersion) ||
(!it.value().second && force && purgeVersion < it.value().first)) {
foundConflict = true;
break;
}
@ -2621,39 +2724,41 @@ ACTOR Future<Void> monitorPruneKeys(Reference<BlobManagerData> self) {
if (foundConflict) {
break;
}
pruneMap.insert(range, std::make_pair(pruneVersion, force));
purgeMap.insert(range, std::make_pair(purgeVersion, force));
fmt::print("about to prune range [{0} - {1}) @ {2}, force={3}\n",
range.begin.printable(),
range.end.printable(),
pruneVersion,
force ? "T" : "F");
if (BM_DEBUG) {
fmt::print("about to purge range [{0} - {1}) @ {2}, force={3}\n",
range.begin.printable(),
range.end.printable(),
purgeVersion,
force ? "T" : "F");
}
}
lastPruneKey = pruneIntents[rangeIdx - 1].key;
lastPurgeKey = purgeIntents[rangeIdx - 1].key;
for (auto it : pruneMap.ranges()) {
for (auto it : purgeMap.ranges()) {
if (it.value().first > 0) {
prunes.emplace_back(pruneRange(self, it.range(), it.value().first, it.value().second));
purges.emplace_back(purgeRange(self, it.range(), it.value().first, it.value().second));
}
}
// wait for this set of prunes to complete before starting the next ones since if we
// prune a range R at version V and while we are doing that, the time expires, we will
// end up trying to prune the same range again since the work isn't finished and the
// prunes will race
// wait for this set of purges to complete before starting the next ones since if we
// purge a range R at version V and while we are doing that, the time expires, we will
// end up trying to purge the same range again since the work isn't finished and the
// purges will race
//
// TODO: this isn't that efficient though. Instead we could keep metadata as part of the
// BM's memory that tracks which prunes are active. Once done, we can mark that work as
// done. If the BM fails then all prunes will fail and so the next BM will have a clear
// BM's memory that tracks which purges are active. Once done, we can mark that work as
// done. If the BM fails then all purges will fail and so the next BM will have a clear
// set of metadata (i.e. no work in progress) so we will end up doing the work in the
// new BM
wait(waitForAll(prunes));
wait(waitForAll(purges));
break;
} else {
state Future<Void> watchPruneIntentsChange = tr->watch(blobGranulePruneChangeKey);
state Future<Void> watchPurgeIntentsChange = tr->watch(blobGranulePurgeChangeKey);
wait(tr->commit());
wait(watchPruneIntentsChange);
wait(watchPurgeIntentsChange);
tr->reset();
}
} catch (Error& e) {
@ -2666,7 +2771,7 @@ ACTOR Future<Void> monitorPruneKeys(Reference<BlobManagerData> self) {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr->clear(KeyRangeRef(blobGranulePruneKeys.begin, keyAfter(lastPruneKey)));
tr->clear(KeyRangeRef(blobGranulePurgeKeys.begin, keyAfter(lastPurgeKey)));
wait(tr->commit());
break;
} catch (Error& e) {
@ -2675,7 +2780,7 @@ ACTOR Future<Void> monitorPruneKeys(Reference<BlobManagerData> self) {
}
if (BM_DEBUG) {
printf("Done pruning current set of prune intents.\n");
printf("Done clearing current set of purge intents.\n");
}
}
}
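// A minimal sketch (not in this patch) of the intent-conflict rule used in the
// scan above: a force purge must not be applied over a later non-force purge of
// an overlapping range, and vice versa.
//
//   bool conflicts(Version existingV, bool existingForce, Version newV, bool newForce) {
//       return (existingForce && !newForce && existingV < newV) ||
//              (!existingForce && newForce && newV < existingV);
//   }
//
// When any intersecting range conflicts, the loop breaks and the remaining
// intents are left for the next sweep.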
@ -2876,7 +2981,7 @@ ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
self->addActor.send(doLockChecks(self));
self->addActor.send(monitorClientRanges(self));
self->addActor.send(monitorPruneKeys(self));
self->addActor.send(monitorPurgeKeys(self));
if (SERVER_KNOBS->BG_CONSISTENCY_CHECK_ENABLED) {
self->addActor.send(bgConsistencyCheck(self));
}

View File

@ -86,6 +86,7 @@ struct GranuleMetadata : NonCopyable, ReferenceCounted<GranuleMetadata> {
NotifiedVersion durableSnapshotVersion; // same as delta vars, except for snapshots
Version pendingSnapshotVersion = 0;
Version initialSnapshotVersion = invalidVersion;
Version historyVersion = invalidVersion;
Version knownCommittedVersion;
int64_t originalEpoch;
@ -756,7 +757,11 @@ ACTOR Future<BlobFileIndex> dumpInitialSnapshotFromFDB(Reference<BlobWorkerData>
bytesRead);
}
state Error err = e;
wait(tr->onError(e));
if (e.code() == error_code_server_overloaded) {
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
} else {
wait(tr->onError(e));
}
retries++;
TEST(true); // Granule initial snapshot failed
// FIXME: why can't we suppress the error event?
@ -935,13 +940,8 @@ ACTOR Future<BlobFileIndex> checkSplitAndReSnapshot(Reference<BlobWorkerData> bw
break;
}
bwData->currentManagerStatusStream.get().send(GranuleStatusReply(metadata->keyRange,
true,
writeHot,
statusEpoch,
statusSeqno,
granuleID,
metadata->initialSnapshotVersion));
bwData->currentManagerStatusStream.get().send(GranuleStatusReply(
metadata->keyRange, true, writeHot, statusEpoch, statusSeqno, granuleID, metadata->historyVersion));
break;
} catch (Error& e) {
if (e.code() == error_code_operation_cancelled) {
@ -1037,10 +1037,14 @@ static void handleCompletedDeltaFile(Reference<BlobWorkerData> bwData,
// if we get an i/o error updating files, or a rollback, reassign the granule to ourselves and start fresh
static bool granuleCanRetry(const Error& e) {
switch (e.code()) {
case error_code_please_reboot:
case error_code_io_error:
case error_code_io_timeout:
// FIXME: handle connection errors in tighter retry loop around individual files.
// FIXME: if these requests fail at a high enough rate, the whole worker should be marked as unhealthy and its
// granules should be moved away, as there may be some problem with this host contacting blob storage
case error_code_http_request_failed:
case error_code_connection_failed:
case error_code_lookup_failed: // dns
return true;
default:
return false;
@ -1119,10 +1123,15 @@ static Version doGranuleRollback(Reference<GranuleMetadata> metadata,
}
metadata->pendingDeltaVersion = cfRollbackVersion;
if (BW_DEBUG) {
fmt::print("[{0} - {1}) rollback discarding all {2} in-memory mutations\n",
fmt::print("[{0} - {1}) rollback discarding all {2} in-memory mutations",
metadata->keyRange.begin.printable(),
metadata->keyRange.end.printable(),
metadata->currentDeltas.size());
if (metadata->currentDeltas.size()) {
fmt::print(
" {0} - {1}", metadata->currentDeltas.front().version, metadata->currentDeltas.back().version);
}
fmt::print("\n");
}
// discard all in-memory mutations
@ -1150,6 +1159,8 @@ static Version doGranuleRollback(Reference<GranuleMetadata> metadata,
// FIXME: could binary search?
int mIdx = metadata->currentDeltas.size() - 1;
Version firstDiscarded = invalidVersion;
Version lastDiscarded = invalidVersion;
while (mIdx >= 0) {
if (metadata->currentDeltas[mIdx].version <= rollbackVersion) {
break;
@ -1157,19 +1168,37 @@ static Version doGranuleRollback(Reference<GranuleMetadata> metadata,
for (auto& m : metadata->currentDeltas[mIdx].mutations) {
metadata->bufferedDeltaBytes -= m.totalSize();
}
if (firstDiscarded == invalidVersion) {
firstDiscarded = metadata->currentDeltas[mIdx].version;
}
lastDiscarded = metadata->currentDeltas[mIdx].version;
mIdx--;
}
mIdx++;
if (BW_DEBUG) {
fmt::print("[{0} - {1}) rollback discarding {2} in-memory mutations, {3} mutations and {4} bytes left\n",
fmt::print("[{0} - {1}) rollback discarding {2} in-memory mutations",
metadata->keyRange.begin.printable(),
metadata->keyRange.end.printable(),
metadata->currentDeltas.size() - mIdx,
mIdx,
metadata->bufferedDeltaBytes);
metadata->currentDeltas.size() - mIdx - 1);
if (firstDiscarded != invalidVersion) {
fmt::print(" {0} - {1}", lastDiscarded, firstDiscarded);
}
fmt::print(", {0} mutations", mIdx);
if (mIdx >= 0) {
fmt::print(
" ({0} - {1})", metadata->currentDeltas.front().version, metadata->currentDeltas[mIdx].version);
}
fmt::print(" and {0} bytes left\n", metadata->bufferedDeltaBytes);
}
metadata->currentDeltas.resize(metadata->currentDeltas.arena(), mIdx);
if (mIdx < 0) {
metadata->currentDeltas = Standalone<GranuleDeltas>();
metadata->bufferedDeltaBytes = 0;
} else {
metadata->currentDeltas.resize(metadata->currentDeltas.arena(), mIdx + 1);
}
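// Worked example (illustrative): if currentDeltas holds versions {10, 20, 30, 40}
// and rollbackVersion == 25, the reverse scan stops at mIdx == 1 (version 20),
// giving firstDiscarded == 40 and lastDiscarded == 30; resize(arena, mIdx + 1)
// then keeps the two surviving entries {10, 20}.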
// delete all deltas in rollback range, but we can optimize here to just skip the uncommitted mutations
// directly and immediately pop the rollback out of inProgress to completed
@ -1328,6 +1357,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
metadata->pendingSnapshotVersion = metadata->files.snapshotFiles.back().version;
metadata->durableSnapshotVersion.set(metadata->pendingSnapshotVersion);
metadata->initialSnapshotVersion = metadata->files.snapshotFiles.front().version;
metadata->historyVersion = startState.history.get().version;
} else {
if (startState.blobFilesToSnapshot.present()) {
startVersion = startState.previousDurableVersion;
@ -1350,6 +1380,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
}
metadata->initialSnapshotVersion = startVersion;
metadata->pendingSnapshotVersion = startVersion;
metadata->historyVersion = startState.history.present() ? startState.history.get().version : startVersion;
}
metadata->durableDeltaVersion.set(startVersion);
@ -1459,8 +1490,16 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
}
ASSERT(mutations.front().version > metadata->bufferedDeltaVersion);
// If this assert trips we should have gotten change_feed_popped from SS and didn't
ASSERT(mutations.front().version >= metadata->activeCFData.get()->popVersion);
// Rare race from the merge cursor where no individual server detected the pop in its response
if (mutations.front().version < metadata->activeCFData.get()->popVersion) {
TEST(true); // Blob Worker detected popped instead of change feed
TraceEvent("BlobWorkerChangeFeedPopped", bwData->id)
.detail("Granule", metadata->keyRange)
.detail("GranuleID", startState.granuleID)
.detail("MutationVersion", mutations.front().version)
.detail("PopVersion", metadata->activeCFData.get()->popVersion);
throw change_feed_popped();
}
}
when(wait(inFlightFiles.empty() ? Never() : success(inFlightFiles.front().future))) {}
}
@ -1623,6 +1662,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
metadata->activeCFData.set(cfData);
justDidRollback = true;
lastDeltaVersion = cfRollbackVersion;
break;
}
}
@ -1841,6 +1881,12 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
}
}
} catch (Error& e) {
if (BW_DEBUG) {
fmt::print("Granule file updater for [{0} - {1}) got error {2}, exiting\n",
metadata->keyRange.begin.printable(),
metadata->keyRange.end.printable(),
e.name());
}
// Free last change feed data
metadata->activeCFData.set(Reference<ChangeFeedData>());
@ -1871,12 +1917,6 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
return Void();
}
++bwData->stats.granuleUpdateErrors;
if (BW_DEBUG) {
fmt::print("Granule file updater for [{0} - {1}) got error {2}, exiting\n",
metadata->keyRange.begin.printable(),
metadata->keyRange.end.printable(),
e.name());
}
if (granuleCanRetry(e)) {
TEST(true); // Granule close and re-open on error
@ -2002,6 +2042,14 @@ ACTOR Future<Void> blobGranuleLoadHistory(Reference<BlobWorkerData> bwData,
int skipped = historyEntryStack.size() - 1 - i;
while (i >= 0) {
auto intersectingRanges = bwData->granuleHistory.intersectingRanges(historyEntryStack[i]->range);
std::vector<std::pair<KeyRange, Reference<GranuleHistoryEntry>>> newerHistory;
for (auto& r : intersectingRanges) {
if (r.value().isValid() && r.value()->endVersion >= historyEntryStack[i]->endVersion) {
newerHistory.push_back(std::make_pair(r.range(), r.value()));
}
}
auto prevRanges = bwData->granuleHistory.rangeContaining(historyEntryStack[i]->range.begin);
if (prevRanges.value().isValid() &&
@ -2012,6 +2060,9 @@ ACTOR Future<Void> blobGranuleLoadHistory(Reference<BlobWorkerData> bwData,
}
bwData->granuleHistory.insert(historyEntryStack[i]->range, historyEntryStack[i]);
for (auto& it : newerHistory) {
bwData->granuleHistory.insert(it.first, it.second);
}
i--;
}
@ -2137,7 +2188,7 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
if (req.beginVersion > 0) {
fmt::print("{0} - {1}\n", req.beginVersion, req.readVersion);
} else {
fmt::print("{}", req.readVersion);
fmt::print("{}\n", req.readVersion);
}
}
@ -2210,7 +2261,7 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
state KeyRange chunkRange;
state GranuleFiles chunkFiles;
if (metadata->initialSnapshotVersion > req.readVersion) {
if (req.readVersion < metadata->historyVersion) {
TEST(true); // Granule Time Travel Read
// this is a time travel query, find previous granule
if (metadata->historyLoaded.canBeSet()) {
@ -2226,7 +2277,7 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
Reference<GranuleHistoryEntry> cur = bwData->granuleHistory.rangeContaining(historySearchKey).value();
// FIXME: use skip pointers here
Version expectedEndVersion = metadata->initialSnapshotVersion;
Version expectedEndVersion = metadata->historyVersion;
if (cur.isValid()) {
ASSERT(cur->endVersion == expectedEndVersion);
}
@ -2269,17 +2320,22 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
}
if (chunkFiles.snapshotFiles.empty()) {
// a snapshot file must have been pruned
// a snapshot file must have been purged
throw blob_granule_transaction_too_old();
}
ASSERT(!chunkFiles.deltaFiles.empty());
ASSERT(chunkFiles.deltaFiles.back().version > req.readVersion);
if (chunkFiles.snapshotFiles.front().version > req.readVersion) {
// a snapshot file must have been pruned
// a snapshot file must have been purged
throw blob_granule_transaction_too_old();
}
} else {
if (req.readVersion < metadata->initialSnapshotVersion) {
// a snapshot file must have been purged
throw blob_granule_transaction_too_old();
}
TEST(true); // Granule Active Read
// this is an active granule query
loop {
@ -2287,7 +2343,7 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
throw wrong_shard_server();
}
Future<Void> waitForVersionFuture = waitForVersion(metadata, req.readVersion);
if (waitForVersionFuture.isReady()) {
if (waitForVersionFuture.isReady() && !waitForVersionFuture.isError()) {
// didn't wait, so no need to check rollback stuff
break;
}

View File

@ -345,6 +345,7 @@ ACTOR Future<Void> newSeedServers(Reference<ClusterRecoveryData> self,
isr.reqId = deterministicRandom()->randomUniqueID();
isr.interfaceId = deterministicRandom()->randomUniqueID();
isr.clusterId = self->clusterId;
isr.initialClusterVersion = self->recoveryTransactionVersion;
ErrorOr<InitializeStorageReply> newServer = wait(recruits.storageServers[idx].storage.tryGetReply(isr));
@ -996,8 +997,12 @@ ACTOR Future<std::vector<Standalone<CommitTransactionRef>>> recruitEverything(
newTLogServers(self, recruits, oldLogSystem, &confChanges));
// Update recovery related information to the newly elected sequencer (master) process.
wait(brokenPromiseToNever(self->masterInterface.updateRecoveryData.getReply(UpdateRecoveryDataRequest(
self->recoveryTransactionVersion, self->lastEpochEnd, self->commitProxies, self->resolvers))));
wait(brokenPromiseToNever(
self->masterInterface.updateRecoveryData.getReply(UpdateRecoveryDataRequest(self->recoveryTransactionVersion,
self->lastEpochEnd,
self->commitProxies,
self->resolvers,
self->versionEpoch))));
return confChanges;
}
@ -1049,6 +1054,14 @@ ACTOR Future<Void> readTransactionSystemState(Reference<ClusterRecoveryData> sel
self->txnStateStore =
keyValueStoreLogSystem(self->txnStateLogAdapter, self->dbgid, self->memoryLimit, false, false, true);
// Version 0 occurs at the version epoch. The version epoch is the number
// of microseconds since the Unix epoch. It can be set through fdbcli.
self->versionEpoch.reset();
Optional<Standalone<StringRef>> versionEpochValue = wait(self->txnStateStore->readValue(versionEpochKey));
if (versionEpochValue.present()) {
self->versionEpoch = BinaryReader::fromStringRef<int64_t>(versionEpochValue.get(), Unversioned());
}
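// Illustrative example (not from the patch): with a version epoch of
// 1640995200000000 (2022-01-01 00:00:00 UTC in microseconds), version 0
// corresponds to that instant, and the expected current version is roughly
// now-in-microseconds minus 1640995200000000.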
// Versionstamped operations (particularly those applied from DR) define a minimum commit version
// that we may recover to, as they embed the version in user-readable data and require that no
// transactions will be committed at a lower version.
@ -1059,6 +1072,11 @@ ACTOR Future<Void> readTransactionSystemState(Reference<ClusterRecoveryData> sel
if (requiredCommitVersion.present()) {
minRequiredCommitVersion = BinaryReader::fromStringRef<Version>(requiredCommitVersion.get(), Unversioned());
}
if (g_network->isSimulated() && self->versionEpoch.present()) {
minRequiredCommitVersion = std::max(
minRequiredCommitVersion,
static_cast<Version>(g_network->timer() * SERVER_KNOBS->VERSIONS_PER_SECOND - self->versionEpoch.get()));
}
// Recover version info
self->lastEpochEnd = oldLogSystem->getEnd() - 1;
@ -1071,14 +1089,14 @@ ACTOR Future<Void> readTransactionSystemState(Reference<ClusterRecoveryData> sel
self->recoveryTransactionVersion = self->lastEpochEnd + SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT;
}
if (BUGGIFY) {
self->recoveryTransactionVersion +=
deterministicRandom()->randomInt64(0, SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT);
}
if (self->recoveryTransactionVersion < minRequiredCommitVersion)
self->recoveryTransactionVersion = minRequiredCommitVersion;
}
if (BUGGIFY) {
self->recoveryTransactionVersion += deterministicRandom()->randomInt64(0, 10000000);
}
TraceEvent(getRecoveryEventName(ClusterRecoveryEventType::CLUSTER_RECOVERY_RECOVERED_EVENT_NAME).c_str(),
self->dbgid)
.detail("LastEpochEnd", self->lastEpochEnd)

View File

@ -169,6 +169,7 @@ struct ClusterRecoveryData : NonCopyable, ReferenceCounted<ClusterRecoveryData>
AsyncTrigger registrationTrigger;
Version lastEpochEnd, // The last version in the old epoch not (to be) rolled back in this recovery
recoveryTransactionVersion; // The first version in this epoch
Optional<int64_t> versionEpoch; // The epoch which all versions are based off of
double lastCommitTime;
Version liveCommittedVersion; // The largest live committed version reported by commit proxies.
@ -209,6 +210,7 @@ struct ClusterRecoveryData : NonCopyable, ReferenceCounted<ClusterRecoveryData>
std::map<UID, CommitProxyVersionReplies> lastCommitProxyVersionReplies;
UID clusterId;
Version initialClusterVersion = -1;
Standalone<StringRef> dbId;
MasterInterface masterInterface;

View File

@ -42,6 +42,7 @@
#include "flow/ActorCollection.h"
#include "flow/Arena.h"
#include "flow/BooleanParam.h"
#include "flow/genericactors.actor.h"
#include "flow/serialize.h"
#include "flow/Trace.h"
#include "flow/UnitTest.h"
@ -752,7 +753,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
shardsAffectedByTeamFailure,
lock,
getAverageShardBytes,
getUnhealthyRelocationCount,
getUnhealthyRelocationCount.getFuture(),
self->ddId,
storageTeamSize,
configuration.storageTeamSize,
@ -900,8 +901,39 @@ Future<Void> sendSnapReq(RequestStream<Req> stream, Req req, Error e) {
return Void();
}
ACTOR template <class Req>
Future<ErrorOr<Void>> trySendSnapReq(RequestStream<Req> stream, Req req) {
ErrorOr<REPLY_TYPE(Req)> reply = wait(stream.tryGetReply(req));
if (reply.isError()) {
TraceEvent("SnapDataDistributor_ReqError")
.errorUnsuppressed(reply.getError())
.detail("Peer", stream.getEndpoint().getPrimaryAddress());
return ErrorOr<Void>(reply.getError());
}
return ErrorOr<Void>(Void());
}
ACTOR static Future<Void> waitForMost(std::vector<Future<ErrorOr<Void>>> futures,
int faultTolerance,
Error e,
double waitMultiplierForSlowFutures = 1.0) {
state std::vector<Future<bool>> successFutures;
state double startTime = now();
successFutures.reserve(futures.size());
for (const auto& future : futures) {
successFutures.push_back(fmap([](auto const& result) { return result.present(); }, future));
}
bool success = wait(quorumEqualsTrue(successFutures, successFutures.size() - faultTolerance));
if (!success) {
throw e;
}
wait(delay((now() - startTime) * waitMultiplierForSlowFutures) || waitForAll(successFutures));
return Void();
}
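// Usage summary (not part of the patch): waitForMost(futures, k, err, m)
// succeeds once all but k futures have completed successfully, then waits up
// to m times the elapsed time (or until everything finishes) before returning;
// if more than k futures fail, it throws err. The unit test at the end of
// this file exercises these behaviors.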
ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq, Reference<AsyncVar<ServerDBInfo> const> db) {
state Database cx = openDBOnServer(db, TaskPriority::DefaultDelay, LockAware::True);
state ReadYourWritesTransaction tr(cx);
loop {
try {
@ -936,19 +968,29 @@ ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq, Reference<As
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
// snap local storage nodes
std::vector<WorkerInterface> storageWorkers =
// TODO: Atomically read configuration and storage worker list in a single transaction
state DatabaseConfiguration configuration = wait(getDatabaseConfiguration(cx));
std::pair<std::vector<WorkerInterface>, int> storageWorkersAndFailures =
wait(transformErrors(getStorageWorkers(cx, db, true /* localOnly */), snap_storage_failed()));
const auto& [storageWorkers, storageFailures] = storageWorkersAndFailures;
auto const storageFaultTolerance =
std::min(static_cast<int>(SERVER_KNOBS->MAX_STORAGE_SNAPSHOT_FAULT_TOLERANCE),
configuration.storageTeamSize - 1) -
storageFailures;
if (storageFaultTolerance < 0) {
TEST(true); // Too many failed storage servers to complete snapshot
throw snap_storage_failed();
}
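// Worked example (illustrative): with storageTeamSize == 3 and
// MAX_STORAGE_SNAPSHOT_FAULT_TOLERANCE == 2, the base tolerance is
// min(2, 3 - 1) == 2; if one storage worker is already missing
// (storageFailures == 1), one further failure can still be tolerated.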
TraceEvent("SnapDataDistributor_GotStorageWorkers")
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
std::vector<Future<Void>> storageSnapReqs;
std::vector<Future<ErrorOr<Void>>> storageSnapReqs;
storageSnapReqs.reserve(storageWorkers.size());
for (const auto& worker : storageWorkers) {
storageSnapReqs.push_back(sendSnapReq(worker.workerSnapReq,
WorkerSnapRequest(snapReq.snapPayload, snapReq.snapUID, "storage"_sr),
snap_storage_failed()));
storageSnapReqs.push_back(trySendSnapReq(
worker.workerSnapReq, WorkerSnapRequest(snapReq.snapPayload, snapReq.snapUID, "storage"_sr)));
}
wait(waitForAll(storageSnapReqs));
wait(waitForMost(storageSnapReqs, storageFaultTolerance, snap_storage_failed()));
TraceEvent("SnapDataDistributor_AfterSnapStorage")
.detail("SnapPayload", snapReq.snapPayload)
@ -983,14 +1025,15 @@ ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq, Reference<As
TraceEvent("SnapDataDistributor_GotCoordWorkers")
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
std::vector<Future<Void>> coordSnapReqs;
std::vector<Future<ErrorOr<Void>>> coordSnapReqs;
coordSnapReqs.reserve(coordWorkers.size());
for (const auto& worker : coordWorkers) {
coordSnapReqs.push_back(sendSnapReq(worker.workerSnapReq,
WorkerSnapRequest(snapReq.snapPayload, snapReq.snapUID, "coord"_sr),
snap_coord_failed()));
coordSnapReqs.push_back(trySendSnapReq(
worker.workerSnapReq, WorkerSnapRequest(snapReq.snapPayload, snapReq.snapUID, "coord"_sr)));
}
wait(waitForAll(coordSnapReqs));
auto const coordFaultTolerance = std::min<int>(std::max<int>(0, coordSnapReqs.size() / 2 - 1),
SERVER_KNOBS->MAX_COORDINATOR_SNAPSHOT_FAULT_TOLERANCE);
wait(waitForMost(coordSnapReqs, coordFaultTolerance, snap_coord_failed()));
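// Worked example (illustrative): with 5 coordinators, 5 / 2 - 1 == 1 failure
// is tolerated (subject to the MAX_COORDINATOR_SNAPSHOT_FAULT_TOLERANCE cap),
// which still leaves a strict majority of 4 snapshotted coordinators.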
TraceEvent("SnapDataDistributor_AfterSnapCoords")
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
@ -1260,3 +1303,44 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncV
return Void();
}
static Future<ErrorOr<Void>> goodTestFuture(double duration) {
return tag(delay(duration), ErrorOr<Void>(Void()));
}
static Future<ErrorOr<Void>> badTestFuture(double duration, Error e) {
return tag(delay(duration), ErrorOr<Void>(e));
}
TEST_CASE("/DataDistribution/WaitForMost") {
state std::vector<Future<ErrorOr<Void>>> futures;
{
futures = { goodTestFuture(1), goodTestFuture(2), goodTestFuture(3) };
wait(waitForMost(futures, 1, operation_failed(), 0.0)); // Don't wait for slowest future
ASSERT(!futures[2].isReady());
}
{
futures = { goodTestFuture(1), goodTestFuture(2), goodTestFuture(3) };
wait(waitForMost(futures, 0, operation_failed(), 0.0)); // Wait for all futures
ASSERT(futures[2].isReady());
}
{
futures = { goodTestFuture(1), goodTestFuture(2), goodTestFuture(3) };
wait(waitForMost(futures, 1, operation_failed(), 1.0)); // Wait for slowest future
ASSERT(futures[2].isReady());
}
{
futures = { goodTestFuture(1), goodTestFuture(2), badTestFuture(1, success()) };
wait(waitForMost(futures, 1, operation_failed(), 1.0)); // Error ignored
}
{
futures = { goodTestFuture(1), goodTestFuture(2), badTestFuture(1, success()) };
try {
wait(waitForMost(futures, 0, operation_failed(), 1.0));
ASSERT(false);
} catch (Error& e) {
ASSERT_EQ(e.code(), error_code_operation_failed);
}
}
return Void();
}

View File

@ -267,7 +267,7 @@ ACTOR Future<Void> dataDistributionQueue(Database cx,
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure,
MoveKeysLock lock,
PromiseStream<Promise<int64_t>> getAverageShardBytes,
PromiseStream<Promise<int>> getUnhealthyRelocationCount,
FutureStream<Promise<int>> getUnhealthyRelocationCount,
UID distributorId,
int teamSize,
int singleRegionTeamSize,

View File

@ -1027,6 +1027,16 @@ struct DDQueueData {
validate();
}
int getHighestPriorityRelocation() const {
int highestPriority{ 0 };
for (const auto& [priority, count] : priority_relocations) {
if (count > 0) {
highestPriority = std::max(highestPriority, priority);
}
}
return highestPriority;
}
};
static std::string destServersString(std::vector<std::pair<Reference<IDataDistributionTeam>, bool>> const& bestTeams) {
@ -1698,7 +1708,7 @@ ACTOR Future<Void> dataDistributionQueue(Database cx,
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure,
MoveKeysLock lock,
PromiseStream<Promise<int64_t>> getAverageShardBytes,
PromiseStream<Promise<int>> getUnhealthyRelocationCount,
FutureStream<Promise<int>> getUnhealthyRelocationCount,
UID distributorId,
int teamSize,
int singleRegionTeamSize,
@ -1792,12 +1802,7 @@ ACTOR Future<Void> dataDistributionQueue(Database cx,
recordMetrics = delay(SERVER_KNOBS->DD_QUEUE_LOGGING_INTERVAL, TaskPriority::FlushTrace);
int highestPriorityRelocation = 0;
for (auto it = self.priority_relocations.begin(); it != self.priority_relocations.end(); ++it) {
if (it->second) {
highestPriorityRelocation = std::max(highestPriorityRelocation, it->first);
}
}
auto const highestPriorityRelocation = self.getHighestPriorityRelocation();
TraceEvent("MovingData", distributorId)
.detail("InFlight", self.activeRelocations)
@ -1833,9 +1838,7 @@ ACTOR Future<Void> dataDistributionQueue(Database cx,
}
when(wait(self.error.getFuture())) {} // Propagate errors from dataDistributionRelocator
when(wait(waitForAll(balancingFutures))) {}
when(Promise<int> r = waitNext(getUnhealthyRelocationCount.getFuture())) {
r.send(self.unhealthyRelocations);
}
when(Promise<int> r = waitNext(getUnhealthyRelocationCount)) { r.send(self.unhealthyRelocations); }
}
}
} catch (Error& e) {

View File

@ -790,6 +790,7 @@ ACTOR Future<Void> rocksDBMetricLogger(std::shared_ptr<rocksdb::Statistics> stat
{ "EstPendCompactBytes", rocksdb::DB::Properties::kEstimatePendingCompactionBytes },
{ "BlockCacheUsage", rocksdb::DB::Properties::kBlockCacheUsage },
{ "BlockCachePinnedUsage", rocksdb::DB::Properties::kBlockCachePinnedUsage },
{ "LiveSstFilesSize", rocksdb::DB::Properties::kLiveSstFilesSize },
};
state std::unordered_map<std::string, uint64_t> readIteratorPoolStats = {
@ -811,7 +812,8 @@ ACTOR Future<Void> rocksDBMetricLogger(std::shared_ptr<rocksdb::Statistics> stat
for (auto& p : propertyStats) {
auto& [name, property] = p;
stat = 0;
ASSERT(db->GetIntProperty(property, &stat));
// GetAggregatedIntProperty gets the aggregated int property from all column families.
ASSERT(db->GetAggregatedIntProperty(property, &stat));
e.detail(name, stat);
}
@ -1933,7 +1935,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
StorageBytes getStorageBytes() const override {
uint64_t live = 0;
ASSERT(db->GetIntProperty(rocksdb::DB::Properties::kLiveSstFilesSize, &live));
ASSERT(db->GetAggregatedIntProperty(rocksdb::DB::Properties::kLiveSstFilesSize, &live));
int64_t free;
int64_t total;

View File

@ -168,19 +168,21 @@ struct UpdateRecoveryDataRequest {
Version lastEpochEnd;
std::vector<CommitProxyInterface> commitProxies;
std::vector<ResolverInterface> resolvers;
Optional<int64_t> versionEpoch;
ReplyPromise<Void> reply;
UpdateRecoveryDataRequest() = default;
UpdateRecoveryDataRequest(Version recoveryTransactionVersion,
Version lastEpochEnd,
const std::vector<CommitProxyInterface>& commitProxies,
const std::vector<ResolverInterface>& resolvers)
const std::vector<ResolverInterface>& resolvers,
Optional<int64_t> versionEpoch)
: recoveryTransactionVersion(recoveryTransactionVersion), lastEpochEnd(lastEpochEnd),
commitProxies(commitProxies), resolvers(resolvers) {}
commitProxies(commitProxies), resolvers(resolvers), versionEpoch(versionEpoch) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, recoveryTransactionVersion, lastEpochEnd, commitProxies, resolvers, reply);
serializer(ar, recoveryTransactionVersion, lastEpochEnd, commitProxies, resolvers, versionEpoch, reply);
}
};

View File

@ -277,9 +277,8 @@ ACTOR Future<std::vector<StorageServerInterface>> getStorageServers(Database cx,
}
}
ACTOR Future<std::vector<WorkerInterface>> getStorageWorkers(Database cx,
Reference<AsyncVar<ServerDBInfo> const> dbInfo,
bool localOnly) {
ACTOR Future<std::pair<std::vector<WorkerInterface>, int>>
getStorageWorkers(Database cx, Reference<AsyncVar<ServerDBInfo> const> dbInfo, bool localOnly) {
state std::vector<StorageServerInterface> servers = wait(getStorageServers(cx));
state std::map<NetworkAddress, WorkerInterface> workersMap;
std::vector<WorkerDetails> workers = wait(getWorkers(dbInfo));
@ -299,7 +298,9 @@ ACTOR Future<std::vector<WorkerInterface>> getStorageWorkers(Database cx,
}
auto masterDcId = dbInfo->get().master.locality.dcId();
std::vector<WorkerInterface> result;
std::pair<std::vector<WorkerInterface>, int> result;
auto& [workerInterfaces, failures] = result;
failures = 0;
for (const auto& server : servers) {
TraceEvent(SevDebug, "DcIdInfo")
.detail("ServerLocalityID", server.locality.dcId())
@ -310,9 +311,10 @@ ACTOR Future<std::vector<WorkerInterface>> getStorageWorkers(Database cx,
TraceEvent(SevWarn, "GetStorageWorkers")
.detail("Reason", "Could not find worker for storage server")
.detail("SS", server.id());
throw operation_failed();
++failures;
} else {
workerInterfaces.push_back(itr->second);
}
result.push_back(itr->second);
}
}
return result;
@ -598,6 +600,31 @@ ACTOR Future<bool> getStorageServersRecruiting(Database cx, WorkerInterface dist
}
}
// Gets the difference between the expected version (based on the version
// epoch) and the actual version.
ACTOR Future<int64_t> getVersionOffset(Database cx,
WorkerInterface distributorWorker,
Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
loop {
state Transaction tr(cx);
try {
TraceEvent("GetVersionOffset").detail("Stage", "ReadingVersionEpoch");
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
state Version rv = wait(tr.getReadVersion());
Optional<Standalone<StringRef>> versionEpochValue = wait(tr.get(versionEpochKey));
if (!versionEpochValue.present()) {
return 0;
}
int64_t versionEpoch = BinaryReader::fromStringRef<int64_t>(versionEpochValue.get(), Unversioned());
int64_t versionOffset = abs(rv - (g_network->timer() * SERVER_KNOBS->VERSIONS_PER_SECOND - versionEpoch));
return versionOffset;
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
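// Worked example (illustrative, assuming VERSIONS_PER_SECOND == 1e6): with a
// version epoch of E microseconds, the expected version at wall-clock time T
// seconds is T * 1e6 - E; if the observed read version rv trails that by two
// seconds' worth of versions, getVersionOffset returns roughly 2e6.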
ACTOR Future<Void> repairDeadDatacenter(Database cx,
Reference<AsyncVar<ServerDBInfo> const> dbInfo,
std::string context) {
@ -652,7 +679,8 @@ ACTOR Future<Void> waitForQuietDatabase(Database cx,
int64_t maxTLogQueueGate = 5e6,
int64_t maxStorageServerQueueGate = 5e6,
int64_t maxDataDistributionQueueSize = 0,
int64_t maxPoppedVersionLag = 30e6) {
int64_t maxPoppedVersionLag = 30e6,
int64_t maxVersionOffset = 1e6) {
state Future<Void> reconfig =
reconfigureAfter(cx, 100 + (deterministicRandom()->random01() * 100), dbInfo, "QuietDatabase");
state Future<int64_t> dataInFlight;
@ -662,6 +690,7 @@ ACTOR Future<Void> waitForQuietDatabase(Database cx,
state Future<int64_t> storageQueueSize;
state Future<bool> dataDistributionActive;
state Future<bool> storageServersRecruiting;
state Future<int64_t> versionOffset;
auto traceMessage = "QuietDatabase" + phase + "Begin";
TraceEvent(traceMessage.c_str()).log();
@ -698,10 +727,11 @@ ACTOR Future<Void> waitForQuietDatabase(Database cx,
storageQueueSize = getMaxStorageServerQueueSize(cx, dbInfo);
dataDistributionActive = getDataDistributionActive(cx, distributorWorker);
storageServersRecruiting = getStorageServersRecruiting(cx, distributorWorker, distributorUID);
versionOffset = getVersionOffset(cx, distributorWorker, dbInfo);
wait(success(dataInFlight) && success(tLogQueueInfo) && success(dataDistributionQueueSize) &&
success(teamCollectionValid) && success(storageQueueSize) && success(dataDistributionActive) &&
success(storageServersRecruiting));
success(storageServersRecruiting) && success(versionOffset));
TraceEvent(("QuietDatabase" + phase).c_str())
.detail("DataInFlight", dataInFlight.get())
@ -717,13 +747,17 @@ ACTOR Future<Void> waitForQuietDatabase(Database cx,
.detail("MaxStorageServerQueueGate", maxStorageServerQueueGate)
.detail("DataDistributionActive", dataDistributionActive.get())
.detail("StorageServersRecruiting", storageServersRecruiting.get())
.detail("RecoveryCount", dbInfo->get().recoveryCount)
.detail("VersionOffset", versionOffset.get())
.detail("NumSuccesses", numSuccesses);
maxVersionOffset += dbInfo->get().recoveryCount * SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT;
if (dataInFlight.get() > dataInFlightGate || tLogQueueInfo.get().first > maxTLogQueueGate ||
tLogQueueInfo.get().second > maxPoppedVersionLag ||
dataDistributionQueueSize.get() > maxDataDistributionQueueSize ||
storageQueueSize.get() > maxStorageServerQueueGate || !dataDistributionActive.get() ||
storageServersRecruiting.get() || !teamCollectionValid.get()) {
storageServersRecruiting.get() || versionOffset.get() > maxVersionOffset ||
!teamCollectionValid.get()) {
wait(delay(1.0));
numSuccesses = 0;
@ -779,6 +813,10 @@ ACTOR Future<Void> waitForQuietDatabase(Database cx,
auto key = "NotReady" + std::to_string(notReadyCount++);
evt.detail(key.c_str(), "storageServersRecruiting");
}
if (versionOffset.isReady() && versionOffset.isError()) {
auto key = "NotReady" + std::to_string(notReadyCount++);
evt.detail(key.c_str(), "versionOffset");
}
wait(delay(1.0));
numSuccesses = 0;
}
@ -794,7 +832,8 @@ Future<Void> quietDatabase(Database const& cx,
int64_t maxTLogQueueGate,
int64_t maxStorageServerQueueGate,
int64_t maxDataDistributionQueueSize,
int64_t maxPoppedVersionLag) {
int64_t maxPoppedVersionLag,
int64_t maxVersionOffset) {
return waitForQuietDatabase(cx,
dbInfo,
phase,
@ -802,5 +841,6 @@ Future<Void> quietDatabase(Database const& cx,
maxTLogQueueGate,
maxStorageServerQueueGate,
maxDataDistributionQueueSize,
maxPoppedVersionLag);
maxPoppedVersionLag,
maxVersionOffset);
}

View File

@ -46,9 +46,11 @@ Future<WorkerInterface> getMasterWorker(Database const& cx, Reference<AsyncVar<S
Future<Void> repairDeadDatacenter(Database const& cx,
Reference<AsyncVar<ServerDBInfo> const> const& dbInfo,
std::string const& context);
Future<std::vector<WorkerInterface>> getStorageWorkers(Database const& cx,
Reference<AsyncVar<ServerDBInfo> const> const& dbInfo,
bool const& localOnly);
// Returns a list of worker interfaces for available storage servers and the number of unavailable
// storage servers.
Future<std::pair<std::vector<WorkerInterface>, int>>
getStorageWorkers(Database const& cx, Reference<AsyncVar<ServerDBInfo> const> const& dbInfo, bool const& localOnly);
Future<std::vector<WorkerInterface>> getCoordWorkers(Database const& cx,
Reference<AsyncVar<ServerDBInfo> const> const& dbInfo);

View File

@ -769,11 +769,13 @@ struct InitializeStorageRequest {
Optional<std::pair<UID, Version>>
tssPairIDAndVersion; // Only set if recruiting a tss. Will be the UID and Version of its SS pair.
UID clusterId; // Unique cluster identifier. Only needed at recruitment, will be read from txnStateStore on recovery
Version initialClusterVersion;
ReplyPromise<InitializeStorageReply> reply;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, seedTag, reqId, interfaceId, storeType, reply, tssPairIDAndVersion, clusterId);
serializer(
ar, seedTag, reqId, interfaceId, storeType, reply, tssPairIDAndVersion, clusterId, initialClusterVersion);
}
};
@ -1088,6 +1090,7 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
StorageServerInterface ssi,
Tag seedTag,
UID clusterId,
Version startVersion,
Version tssSeedVersion,
ReplyPromise<InitializeStorageReply> recruitReply,
Reference<AsyncVar<ServerDBInfo> const> db,

View File

@ -18,6 +18,7 @@
* limitations under the License.
*/
#include <algorithm>
#include <iterator>
#include "fdbrpc/sim_validation.h"
@ -50,6 +51,7 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
Version version; // The last version assigned to a proxy by getVersion()
double lastVersionTime;
Optional<Version> referenceVersion;
std::map<UID, CommitProxyVersionReplies> lastCommitProxyVersionReplies;
@ -155,12 +157,36 @@ ACTOR Future<Void> getVersion(Reference<MasterData> self, GetCommitVersionReques
if (BUGGIFY) {
t1 = self->lastVersionTime;
}
rep.prevVersion = self->version;
self->version +=
// Versions should roughly follow wall-clock time, based on the
// system clock of the current machine and an FDB-specific epoch.
// Calculate the expected version and determine whether we need to
// hand out versions faster or slower to stay in sync with the
// clock.
Version toAdd =
std::max<Version>(1,
std::min<Version>(SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS,
SERVER_KNOBS->VERSIONS_PER_SECOND * (t1 - self->lastVersionTime)));
rep.prevVersion = self->version;
if (self->referenceVersion.present()) {
Version expected =
g_network->timer() * SERVER_KNOBS->VERSIONS_PER_SECOND - self->referenceVersion.get();
// Attempt to jump directly to the expected version. But make
// sure that versions are still being handed out at a rate
// around VERSIONS_PER_SECOND. This rate is scaled depending on
// how far off the calculated version is from the expected
// version.
int64_t maxOffset = std::min(static_cast<int64_t>(toAdd * SERVER_KNOBS->MAX_VERSION_RATE_MODIFIER),
SERVER_KNOBS->MAX_VERSION_RATE_OFFSET);
self->version =
std::clamp(expected, self->version + toAdd - maxOffset, self->version + toAdd + maxOffset);
ASSERT_GT(self->version, rep.prevVersion);
} else {
self->version = self->version + toAdd;
}
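// Worked numeric example (illustrative): if toAdd == 1000 and
// MAX_VERSION_RATE_MODIFIER == 0.1, then maxOffset == 100 (unless further
// capped by MAX_VERSION_RATE_OFFSET), so the new version is clamped to
// [prevVersion + 900, prevVersion + 1100] while being pulled toward the
// clock-derived expected version.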
TEST(self->version - rep.prevVersion == 1); // Minimum possible version gap
bool maxVersionGap = self->version - rep.prevVersion == SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS;
@ -275,7 +301,8 @@ ACTOR Future<Void> updateRecoveryData(Reference<MasterData> self) {
TraceEvent("UpdateRecoveryData", self->dbgid)
.detail("RecoveryTxnVersion", req.recoveryTransactionVersion)
.detail("LastEpochEnd", req.lastEpochEnd)
.detail("NumCommitProxies", req.commitProxies.size());
.detail("NumCommitProxies", req.commitProxies.size())
.detail("VersionEpoch", req.versionEpoch);
if (self->recoveryTransactionVersion == invalidVersion ||
req.recoveryTransactionVersion > self->recoveryTransactionVersion) {
@ -291,6 +318,16 @@ ACTOR Future<Void> updateRecoveryData(Reference<MasterData> self) {
self->lastCommitProxyVersionReplies[p.id()] = CommitProxyVersionReplies();
}
}
if (req.versionEpoch.present()) {
self->referenceVersion = req.versionEpoch.get();
} else if (BUGGIFY) {
// Cannot use a positive version epoch in simulation because of the
// clock starting at 0. A positive version epoch would mean the initial
// cluster version was negative.
// TODO: Increase the size of this interval after fixing the issue
// with restoring ranges with large version gaps.
self->referenceVersion = deterministicRandom()->randomInt64(-1e6, 0);
}
self->resolutionBalancer.setCommitProxies(req.commitProxies);
self->resolutionBalancer.setResolvers(req.resolvers);

View File

@ -440,6 +440,8 @@ struct ChangeFeedInfo : ReferenceCounted<ChangeFeedInfo> {
Version metadataCreateVersion = invalidVersion;
bool removing = false;
bool destroyed = false;
bool possiblyDestroyed = false;
KeyRangeMap<std::unordered_map<UID, Promise<Void>>> moveTriggers;
@ -472,6 +474,13 @@ struct ChangeFeedInfo : ReferenceCounted<ChangeFeedInfo> {
}
// TODO: may be more cleanup possible here
}
void destroy(Version destroyVersion) {
removing = true;
destroyed = true;
moved(range);
newMutations.trigger();
}
};
class ServerWatchMetadata : public ReferenceCounted<ServerWatchMetadata> {
@ -795,6 +804,9 @@ public:
Reference<ILogSystem::IPeekCursor> logCursor;
Promise<UID> clusterId;
// The version the cluster starts on. This value is not persisted and may
// not be valid after a recovery.
Version initialClusterVersion = invalidVersion;
UID thisServerID;
Optional<UID> tssPairID; // if this server is a tss, this is the id of its (ss) pair
Optional<UID> ssPairID; // if this server is an ss, this is the id of its (tss) pair
@ -1959,6 +1971,12 @@ ACTOR Future<Void> overlappingChangeFeedsQ(StorageServer* data, OverlappingChang
for (auto& it : rangeIds) {
reply.rangeIds.push_back(OverlappingChangeFeedEntry(
it.first, std::get<0>(it.second), std::get<1>(it.second), std::get<2>(it.second)));
TraceEvent(SevDebug, "OverlappingChangeFeedEntry", data->thisServerID)
.detail("MinVersion", req.minVersion)
.detail("FeedID", it.first)
.detail("Range", std::get<0>(it.second))
.detail("EmptyVersion", std::get<1>(it.second))
.detail("StopVersion", std::get<2>(it.second));
}
// Make sure all of the metadata we are sending won't get rolled back
@ -4759,6 +4777,9 @@ ACTOR Future<Void> tryGetRange(PromiseStream<RangeResult> results, Transaction*
}
}
// global validation that missing refreshed feeds were previously destroyed
static std::unordered_set<Key> allDestroyedChangeFeeds;
// We have to store the version the change feed was stopped at in the SS instead of just the stopped status
// In addition to simplifying stopping logic, it enables communicating stopped status when fetching change feeds
// from other SS correctly
@ -4799,33 +4820,35 @@ ACTOR Future<Void> changeFeedPopQ(StorageServer* self, ChangeFeedPopRequest req)
.detail("RangeID", req.rangeID.printable())
.detail("Version", req.version)
.detail("SSVersion", self->version.get())
.detail("Range", req.range.toString());
.detail("Range", req.range);
if (req.version - 1 > feed->second->emptyVersion) {
feed->second->emptyVersion = req.version - 1;
while (!feed->second->mutations.empty() && feed->second->mutations.front().version < req.version) {
feed->second->mutations.pop_front();
}
Version durableVersion = self->data().getLatestVersion();
auto& mLV = self->addVersionToMutationLog(durableVersion);
self->addMutationToMutationLog(
mLV,
MutationRef(
MutationRef::SetValue,
persistChangeFeedKeys.begin.toString() + feed->second->id.toString(),
changeFeedSSValue(feed->second->range, feed->second->emptyVersion + 1, feed->second->stopVersion)));
if (feed->second->storageVersion != invalidVersion) {
++self->counters.kvSystemClearRanges;
self->addMutationToMutationLog(mLV,
MutationRef(MutationRef::ClearRange,
changeFeedDurableKey(feed->second->id, 0),
changeFeedDurableKey(feed->second->id, req.version)));
if (req.version > feed->second->storageVersion) {
feed->second->storageVersion = invalidVersion;
feed->second->durableVersion = invalidVersion;
if (!feed->second->destroyed) {
Version durableVersion = self->data().getLatestVersion();
auto& mLV = self->addVersionToMutationLog(durableVersion);
self->addMutationToMutationLog(
mLV,
MutationRef(
MutationRef::SetValue,
persistChangeFeedKeys.begin.toString() + feed->second->id.toString(),
changeFeedSSValue(feed->second->range, feed->second->emptyVersion + 1, feed->second->stopVersion)));
if (feed->second->storageVersion != invalidVersion) {
++self->counters.kvSystemClearRanges;
self->addMutationToMutationLog(mLV,
MutationRef(MutationRef::ClearRange,
changeFeedDurableKey(feed->second->id, 0),
changeFeedDurableKey(feed->second->id, req.version)));
if (req.version > feed->second->storageVersion) {
feed->second->storageVersion = invalidVersion;
feed->second->durableVersion = invalidVersion;
}
}
wait(self->durableVersion.whenAtLeast(durableVersion));
}
wait(self->durableVersion.whenAtLeast(durableVersion));
}
req.reply.send(Void());
return Void();
@ -5004,7 +5027,9 @@ ACTOR Future<Version> fetchChangeFeedApplier(StorageServer* data,
.errorUnsuppressed(e)
.detail("RangeID", rangeId.printable())
.detail("Range", range.toString())
.detail("EndVersion", endVersion);
.detail("EndVersion", endVersion)
.detail("Removing", changeFeedInfo->removing)
.detail("Destroyed", changeFeedInfo->destroyed);
throw;
}
}
@ -5101,6 +5126,7 @@ ACTOR Future<Version> fetchChangeFeed(StorageServer* data,
}
}
state bool seenNotRegistered = false;
loop {
try {
Version maxFetched = wait(fetchChangeFeedApplier(data,
@ -5117,19 +5143,110 @@ ACTOR Future<Version> fetchChangeFeed(StorageServer* data,
throw;
}
}
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
// TODO REMOVE
fmt::print("DBG: SS {} Feed {} possibly destroyed {}, {} metadata create, {} desired committed\n",
data->thisServerID.toString().substr(0, 4),
changeFeedInfo->id.printable(),
changeFeedInfo->possiblyDestroyed,
changeFeedInfo->metadataCreateVersion,
data->desiredOldestVersion.get());
// There are two reasons for change_feed_not_registered:
// 1. The feed was just created, but the ss mutation stream is ahead of the GRV that fetchChangeFeedApplier
// uses to read the change feed data from the database. In this case we need to wait and retry
// 2. The feed was destroyed, but we missed a metadata update telling us this. In this case we need to destroy
// the feed
// endVersion >= the metadata create version, so we can safely use it as a proxy
if (beginVersion != 0 || seenNotRegistered || endVersion <= data->desiredOldestVersion.get()) {
// If any of these are true, the feed must be destroyed.
Version cleanupVersion = data->data().getLatestVersion();
TraceEvent(SevDebug, "DestroyingChangeFeedFromFetch", data->thisServerID)
.detail("RangeID", changeFeedInfo->id.printable())
.detail("Range", changeFeedInfo->range.toString())
.detail("Version", cleanupVersion);
if (g_network->isSimulated()) {
ASSERT(allDestroyedChangeFeeds.count(changeFeedInfo->id));
}
Key beginClearKey = changeFeedInfo->id.withPrefix(persistChangeFeedKeys.begin);
auto& mLV = data->addVersionToMutationLog(cleanupVersion);
data->addMutationToMutationLog(
mLV, MutationRef(MutationRef::ClearRange, beginClearKey, keyAfter(beginClearKey)));
++data->counters.kvSystemClearRanges;
data->addMutationToMutationLog(mLV,
MutationRef(MutationRef::ClearRange,
changeFeedDurableKey(changeFeedInfo->id, 0),
changeFeedDurableKey(changeFeedInfo->id, cleanupVersion)));
++data->counters.kvSystemClearRanges;
changeFeedInfo->destroy(cleanupVersion);
data->changeFeedCleanupDurable[changeFeedInfo->id] = cleanupVersion;
for (auto& it : data->changeFeedRemovals) {
it.second.send(changeFeedInfo->id);
}
return invalidVersion;
}
// otherwise, assume the feed just hasn't been created yet on the SS we tried to read it from; wait for it to
// definitely be committed, then retry
seenNotRegistered = true;
wait(data->desiredOldestVersion.whenAtLeast(endVersion));
}
}
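
// A minimal standalone restatement of the destroy-vs-retry rule above, as a pure
// predicate. All names are illustrative, not FDB API: a retry fetch (beginVersion != 0),
// a second not_registered in a row, or an endVersion old enough that the metadata create
// must already be committed everywhere each prove the feed was destroyed rather than new.
#include <cstdint>
using Version = int64_t;
enum class NotRegisteredAction { Destroy, WaitAndRetry };
NotRegisteredAction classifyNotRegistered(Version beginVersion,
                                          bool seenNotRegistered,
                                          Version endVersion,
                                          Version desiredOldestVersion) {
    if (beginVersion != 0 || seenNotRegistered || endVersion <= desiredOldestVersion) {
        return NotRegisteredAction::Destroy;
    }
    return NotRegisteredAction::WaitAndRetry; // feed may simply not be committed yet
}
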
ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
KeyRange keys,
Version fetchVersion,
PromiseStream<Key> removals) {
PromiseStream<Key> removals,
UID fetchKeysID) {
// Wait for current TLog batch to finish to ensure that we're fetching metadata at a version >= the version of the
// ChangeServerKeys mutation. This guarantees we don't miss any metadata between the previous batch's version
// (data->version) and the mutation version.
wait(data->version.whenAtLeast(data->version.get() + 1));
state Version fetchVersion = data->version.get();
TraceEvent(SevDebug, "FetchChangeFeedMetadata", data->thisServerID)
.detail("Range", keys.toString())
.detail("FetchVersion", fetchVersion);
state std::vector<OverlappingChangeFeedEntry> feeds =
wait(data->cx->getOverlappingChangeFeeds(keys, fetchVersion + 1));
.detail("Range", keys)
.detail("FetchVersion", fetchVersion)
.detail("FKID", fetchKeysID);
state std::set<Key> refreshedFeedIds;
state std::set<Key> destroyedFeedIds;
// before fetching feeds from other SSs, refresh any feeds we already have that are marked as removing
auto ranges = data->keyChangeFeed.intersectingRanges(keys);
for (auto& r : ranges) {
for (auto& cfInfo : r.value()) {
auto feedCleanup = data->changeFeedCleanupDurable.find(cfInfo->id);
if (feedCleanup != data->changeFeedCleanupDurable.end() && cfInfo->removing && !cfInfo->destroyed) {
TEST(true); // re-fetching feed scheduled for deletion! Un-mark it as removing
destroyedFeedIds.insert(cfInfo->id);
cfInfo->removing = false;
// because we now have a gap in the metadata, it's possible this feed was destroyed
cfInfo->possiblyDestroyed = true;
// reset fetch versions because everything previously fetched was cleaned up
cfInfo->fetchVersion = invalidVersion;
cfInfo->durableFetchVersion = NotifiedVersion();
TraceEvent(SevDebug, "ResetChangeFeedInfo", data->thisServerID)
.detail("RangeID", cfInfo->id.printable())
.detail("Range", cfInfo->range)
.detail("FetchVersion", fetchVersion)
.detail("EmptyVersion", cfInfo->emptyVersion)
.detail("StopVersion", cfInfo->stopVersion)
.detail("FKID", fetchKeysID);
}
}
}
state std::vector<OverlappingChangeFeedEntry> feeds = wait(data->cx->getOverlappingChangeFeeds(keys, fetchVersion));
// handle change feeds that were removed while we were fetching the overlapping feed metadata
while (removals.getFuture().isReady()) {
Key remove = waitNext(removals.getFuture());
for (int i = 0; i < feeds.size(); i++) {
@ -5138,6 +5255,7 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
}
}
}
std::vector<Key> feedIds;
feedIds.reserve(feeds.size());
// create change feed metadata if it does not exist
@ -5150,16 +5268,23 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
TraceEvent(SevDebug, "FetchedChangeFeedInfo", data->thisServerID)
.detail("RangeID", cfEntry.rangeId.printable())
.detail("Range", cfEntry.range.toString())
.detail("Range", cfEntry.range)
.detail("FetchVersion", fetchVersion)
.detail("EmptyVersion", cfEntry.emptyVersion)
.detail("StopVersion", cfEntry.stopVersion)
.detail("Existing", existing)
.detail("CleanupPendingVersion", cleanupPending ? cleanupEntry->second : invalidVersion);
.detail("CleanupPendingVersion", cleanupPending ? cleanupEntry->second : invalidVersion)
.detail("FKID", fetchKeysID);
bool addMutationToLog = false;
Reference<ChangeFeedInfo> changeFeedInfo;
auto fid = destroyedFeedIds.find(cfEntry.rangeId);
if (fid != destroyedFeedIds.end()) {
refreshedFeedIds.insert(cfEntry.rangeId);
destroyedFeedIds.erase(fid);
}
if (!existing) {
TEST(cleanupPending); // Fetch a change feed that is cleanup-pending. This means there was a move away and a
// move back; this will remake the metadata
@ -5180,30 +5305,26 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
addMutationToLog = true;
} else {
changeFeedInfo = existingEntry->second;
auto feedCleanup = data->changeFeedCleanupDurable.find(cfEntry.rangeId);
if (changeFeedInfo->destroyed) {
// race where multiple fetches overlapped this change feed: one realized the feed was missing and marked
// it removed+destroyed, then this one fetched the same info
continue;
}
// we checked all feeds we already owned in this range at the start to reset them if they were removing, and
// this actor would have been cancelled if a later remove happened
ASSERT(!changeFeedInfo->removing);
if (cfEntry.stopVersion < changeFeedInfo->stopVersion) {
TEST(true); // Change feed updated stop version from fetch metadata
changeFeedInfo->stopVersion = cfEntry.stopVersion;
addMutationToLog = true;
}
if (feedCleanup != data->changeFeedCleanupDurable.end() && changeFeedInfo->removing) {
TEST(true); // re-fetching feed scheduled for deletion! Un-mark it as removing
if (cfEntry.emptyVersion < data->version.get()) {
changeFeedInfo->emptyVersion = cfEntry.emptyVersion;
}
changeFeedInfo->removing = false;
// reset fetch versions because everything previously fetched was cleaned up
changeFeedInfo->fetchVersion = invalidVersion;
changeFeedInfo->durableFetchVersion = NotifiedVersion();
// Since cleanup put a mutation in the log to delete the change feed data, put one in the log to restore
// it
// We may just want to refactor this so updateStorage does explicit deletes based on
// changeFeedCleanupDurable and not use the mutation log at all for the change feed metadata cleanup.
// Then we wouldn't have to reset anything here
// don't update the empty version past the SS version if the SS is behind; doing so can cause issues
if (cfEntry.emptyVersion < data->version.get() && cfEntry.emptyVersion > changeFeedInfo->emptyVersion) {
TEST(true); // Change feed updated empty version from fetch metadata
changeFeedInfo->emptyVersion = cfEntry.emptyVersion;
addMutationToLog = true;
}
}
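
// A minimal sketch of the clamp above as a predicate, with illustrative names: a fetched
// empty version is only adopted when it advances the local one and does not run ahead of
// the SS's own version.
#include <cstdint>
using Version = int64_t;
bool shouldAdoptEmptyVersion(Version fetched, Version local, Version ssVersion) {
    return fetched < ssVersion && fetched > local;
}
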
@ -5223,6 +5344,84 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
}
}
}
TEST(!refreshedFeedIds.empty()); // Feed refreshed between move away and move back
TEST(!destroyedFeedIds.empty()); // Feed destroyed between move away and move back
for (auto& feedId : refreshedFeedIds) {
auto existingEntry = data->uidChangeFeed.find(feedId);
if (existingEntry == data->uidChangeFeed.end() || existingEntry->second->destroyed) {
TEST(true); // feed refreshed
continue;
}
// Since cleanup put a mutation in the log to delete the change feed data, put one in the log to restore
// it
// We may just want to refactor this so updateStorage does explicit deletes based on
// changeFeedCleanupDurable and not use the mutation log at all for the change feed metadata cleanup.
// Then we wouldn't have to reset anything here or above
// Do the mutation log update here instead of above to ensure we only add it back to the mutation log if we're
// sure it wasn't deleted in the metadata gap
Version metadataVersion = data->data().getLatestVersion();
auto& mLV = data->addVersionToMutationLog(metadataVersion);
data->addMutationToMutationLog(
mLV,
MutationRef(MutationRef::SetValue,
persistChangeFeedKeys.begin.toString() + existingEntry->second->id.toString(),
changeFeedSSValue(existingEntry->second->range,
existingEntry->second->emptyVersion + 1,
existingEntry->second->stopVersion)));
TraceEvent(SevDebug, "PersistingResetChangeFeedInfo", data->thisServerID)
.detail("RangeID", existingEntry->second->id.printable())
.detail("Range", existingEntry->second->range)
.detail("FetchVersion", fetchVersion)
.detail("EmptyVersion", existingEntry->second->emptyVersion)
.detail("StopVersion", existingEntry->second->stopVersion)
.detail("FKID", fetchKeysID)
.detail("MetadataVersion", metadataVersion);
}
for (auto& feedId : destroyedFeedIds) {
auto existingEntry = data->uidChangeFeed.find(feedId);
if (existingEntry == data->uidChangeFeed.end() || existingEntry->second->destroyed) {
TEST(true); // feed refreshed but then destroyed elsewhere
continue;
}
// TODO REMOVE print
fmt::print("DBG: SS {} fetching feed {} was refreshed but not present!! assuming destroyed\n",
data->thisServerID.toString().substr(0, 4),
feedId.printable());
Version cleanupVersion = data->data().getLatestVersion();
TraceEvent(SevDebug, "DestroyingChangeFeedFromFetchMetadata", data->thisServerID)
.detail("RangeID", feedId.printable())
.detail("Range", existingEntry->second->range)
.detail("Version", cleanupVersion)
.detail("FKID", fetchKeysID);
if (g_network->isSimulated()) {
ASSERT(allDestroyedChangeFeeds.count(feedId));
}
Key beginClearKey = feedId.withPrefix(persistChangeFeedKeys.begin);
auto& mLV = data->addVersionToMutationLog(cleanupVersion);
data->addMutationToMutationLog(mLV,
MutationRef(MutationRef::ClearRange, beginClearKey, keyAfter(beginClearKey)));
++data->counters.kvSystemClearRanges;
data->addMutationToMutationLog(mLV,
MutationRef(MutationRef::ClearRange,
changeFeedDurableKey(feedId, 0),
changeFeedDurableKey(feedId, cleanupVersion)));
++data->counters.kvSystemClearRanges;
existingEntry->second->destroy(cleanupVersion);
data->changeFeedCleanupDurable[feedId] = cleanupVersion;
for (auto& it : data->changeFeedRemovals) {
it.second.send(feedId);
}
}
return feedIds;
}
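
// A minimal sketch of the refreshed/destroyed bookkeeping this function performs, in
// isolation (illustrative names, std::string standing in for Key): feeds locally marked
// as removing start as destruction candidates; any candidate that reappears in the
// fetched metadata is moved to refreshed, and whatever remains is treated as destroyed.
#include <set>
#include <string>
#include <vector>
void classifyFeeds(const std::set<std::string>& locallyRemoving,
                   const std::vector<std::string>& fetchedFeedIds,
                   std::set<std::string>& refreshed,
                   std::set<std::string>& destroyed) {
    destroyed = locallyRemoving; // every removing feed is a destruction candidate
    for (const auto& id : fetchedFeedIds) {
        auto it = destroyed.find(id);
        if (it != destroyed.end()) {
            refreshed.insert(id); // still registered: refresh instead of destroy
            destroyed.erase(it);
        }
    }
}
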
@ -5278,7 +5477,6 @@ ACTOR Future<std::unordered_map<Key, Version>> dispatchChangeFeeds(StorageServer
}
}
if (done) {
data->changeFeedRemovals.erase(fetchKeysID);
return feedMaxFetched;
}
}
@ -5343,8 +5541,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
state PromiseStream<Key> removals;
data->changeFeedRemovals[fetchKeysID] = removals;
state Future<std::vector<Key>> fetchCFMetadata =
fetchChangeFeedMetadata(data, keys, data->version.get(), removals);
state Future<std::vector<Key>> fetchCFMetadata = fetchChangeFeedMetadata(data, keys, removals, fetchKeysID);
validate(data);
@ -5689,6 +5886,8 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
}
}
data->changeFeedRemovals.erase(fetchKeysID);
shard->phase = AddingShard::Waiting;
// Similar to transferred version, but wait for all feed data and
@ -5909,7 +6108,8 @@ void changeServerKeys(StorageServer* data,
data->watches.triggerRange(range.begin, range.end);
} else if (!dataAvailable) {
// SOMEDAY: Avoid restarting adding/transferred shards
if (version == 0) { // bypass fetchkeys; shard is known empty at version 0
// bypass fetchkeys; shard is known empty at initial cluster version
if (version == data->initialClusterVersion - 1) {
TraceEvent("ChangeServerKeysInitialRange", data->thisServerID)
.detail("Begin", range.begin)
.detail("End", range.end);
@ -6000,7 +6200,6 @@ void changeServerKeys(StorageServer* data,
auto feed = data->uidChangeFeed.find(f.first);
if (feed != data->uidChangeFeed.end()) {
feed->second->emptyVersion = version - 1;
feed->second->removing = true;
feed->second->moved(feed->second->range);
feed->second->newMutations.trigger();
@ -6302,7 +6501,10 @@ private:
feed->second->durableVersion = invalidVersion;
}
}
addMutationToLog = true;
if (!feed->second->destroyed) {
// if feed is destroyed, adding an extra mutation here would re-create it if SS restarted
addMutationToLog = true;
}
}
} else if (status == ChangeFeedStatus::CHANGE_FEED_CREATE && createdFeed) {
@ -6338,13 +6540,12 @@ private:
changeFeedDurableKey(feed->second->id, currentVersion)));
++data->counters.kvSystemClearRanges;
feed->second->emptyVersion = currentVersion - 1;
feed->second->stopVersion = currentVersion;
feed->second->removing = true;
feed->second->moved(feed->second->range);
feed->second->newMutations.trigger();
feed->second->destroy(currentVersion);
data->changeFeedCleanupDurable[feed->first] = cleanupVersion;
if (g_network->isSimulated()) {
allDestroyedChangeFeeds.insert(changeFeedId);
}
}
if (status == ChangeFeedStatus::CHANGE_FEED_DESTROY) {
@ -6794,7 +6995,7 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
.detail("Version", cloneCursor2->version().toString());
} else if (ver != invalidVersion) { // This change belongs to a version < minVersion
DEBUG_MUTATION("SSPeek", ver, msg, data->thisServerID);
if (ver == 1) {
if (ver == data->initialClusterVersion) {
//TraceEvent("SSPeekMutation", data->thisServerID).log();
// The following trace event may produce a value with special characters
TraceEvent("SSPeekMutation", data->thisServerID)
@ -6910,6 +7111,7 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
proposedOldestVersion = std::min(proposedOldestVersion, data->version.get() - 1);
proposedOldestVersion = std::max(proposedOldestVersion, data->oldestVersion.get());
proposedOldestVersion = std::max(proposedOldestVersion, data->desiredOldestVersion.get());
proposedOldestVersion = std::max(proposedOldestVersion, data->initialClusterVersion);
//TraceEvent("StorageServerUpdated", data->thisServerID).detail("Ver", ver).detail("DataVersion", data->version.get())
// .detail("LastTLogVersion", data->lastTLogVersion).detail("NewOldest",
@ -8782,6 +8984,7 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
StorageServerInterface ssi,
Tag seedTag,
UID clusterId,
Version startVersion,
Version tssSeedVersion,
ReplyPromise<InitializeStorageReply> recruitReply,
Reference<AsyncVar<ServerDBInfo> const> db,
@ -8789,6 +8992,7 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
state StorageServer self(persistentData, db, ssi);
state Future<Void> ssCore;
self.clusterId.send(clusterId);
self.initialClusterVersion = startVersion;
if (ssi.isTss()) {
self.setTssPair(ssi.tssPairID.get());
ASSERT(self.isTss());

View File

@ -2183,6 +2183,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
[&req](const auto& p) { return p.second != req.storeType; }) ||
req.seedTag != invalidTag)) {
ASSERT(req.clusterId.isValid());
ASSERT(req.initialClusterVersion >= 0);
LocalLineage _;
getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::Storage;
bool isTss = req.tssPairIDAndVersion.present();
@ -2244,6 +2245,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
recruited,
req.seedTag,
req.clusterId,
req.initialClusterVersion,
isTss ? req.tssPairIDAndVersion.get().second : 0,
storageReady,
dbInfo,

View File

@ -62,8 +62,9 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
int64_t timeTravelTooOld = 0;
int64_t rowsRead = 0;
int64_t bytesRead = 0;
int64_t purges = 0;
std::vector<Future<Void>> clients;
bool enablePruning;
bool enablePurging;
DatabaseConfiguration config;
@ -79,7 +80,7 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
timeTravelLimit = getOption(options, LiteralStringRef("timeTravelLimit"), testDuration);
timeTravelBufferSize = getOption(options, LiteralStringRef("timeTravelBufferSize"), 100000000);
threads = getOption(options, LiteralStringRef("threads"), 1);
enablePruning = getOption(options, LiteralStringRef("enablePruning"), false /*sharedRandomNumber % 2 == 0*/);
enablePurging = getOption(options, LiteralStringRef("enablePurging"), false /*sharedRandomNumber % 2 == 0*/);
ASSERT(threads >= 1);
if (BGV_DEBUG) {
@ -177,60 +178,6 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
OldRead(KeyRange range, Version v, RangeResult oldResult) : range(range), v(v), oldResult(oldResult) {}
};
// utility to prune <range> at pruneVersion=<version> with the <force> flag
ACTOR Future<Void> pruneAtVersion(Database cx, KeyRange range, Version version, bool force) {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
state Key pruneKey;
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
Value pruneValue = blobGranulePruneValueFor(version, range, force);
tr->atomicOp(
addVersionStampAtEnd(blobGranulePruneKeys.begin), pruneValue, MutationRef::SetVersionstampedKey);
tr->set(blobGranulePruneChangeKey, deterministicRandom()->randomUniqueID().toString());
state Future<Standalone<StringRef>> fTrVs = tr->getVersionstamp();
wait(tr->commit());
Standalone<StringRef> vs = wait(fTrVs);
pruneKey = blobGranulePruneKeys.begin.withSuffix(vs);
if (BGV_DEBUG) {
fmt::print("pruneAtVersion for range [{0} - {1}) at version {2} succeeded\n",
range.begin.printable(),
range.end.printable(),
version);
}
break;
} catch (Error& e) {
if (BGV_DEBUG) {
fmt::print("pruneAtVersion for range [{0} - {1}) at version {2} encountered error {3}\n",
range.begin.printable(),
range.end.printable(),
version,
e.name());
}
wait(tr->onError(e));
}
}
tr->reset();
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
Optional<Value> pruneVal = wait(tr->get(pruneKey));
if (!pruneVal.present()) {
return Void();
}
state Future<Void> watchFuture = tr->watch(pruneKey);
wait(tr->commit());
wait(watchFuture);
} catch (Error& e) {
wait(tr->onError(e));
}
}
}
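
// A minimal sketch of the native replacement for the versionstamp+watch flow above,
// using the purgeBlobGranules / waitPurgeGranulesComplete calls this commit introduces
// (the same pair exercised in verifyGranules below):
ACTOR Future<Void> purgeAtVersion(Database cx, KeyRange range, Version version, bool force) {
    state Key purgeKey = wait(cx->purgeBlobGranules(range, version, force)); // enqueue the purge
    wait(cx->waitPurgeGranulesComplete(purgeKey)); // resolves once the purge is durably done
    return Void();
}
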
ACTOR Future<Void> killBlobWorkers(Database cx, BlobGranuleVerifierWorkload* self) {
state Transaction tr(cx);
state std::set<UID> knownWorkers;
@ -272,12 +219,12 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
}
}
ACTOR Future<Void> verifyGranules(Database cx, BlobGranuleVerifierWorkload* self, bool allowPruning) {
ACTOR Future<Void> verifyGranules(Database cx, BlobGranuleVerifierWorkload* self, bool allowPurging) {
state double last = now();
state double endTime = last + self->testDuration;
state std::map<double, OldRead> timeTravelChecks;
state int64_t timeTravelChecksMemory = 0;
state Version prevPruneVersion = -1;
state Version prevPurgeVersion = -1;
state UID dbgId = debugRandom()->randomUniqueID();
TraceEvent("BlobGranuleVerifierStart");
@ -300,25 +247,27 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
state OldRead oldRead = timeTravelIt->second;
timeTravelChecksMemory -= oldRead.oldResult.expectedSize();
timeTravelIt = timeTravelChecks.erase(timeTravelIt);
if (prevPruneVersion == -1) {
prevPruneVersion = oldRead.v;
if (prevPurgeVersion == -1) {
prevPurgeVersion = oldRead.v;
}
// advance the iterator before doing the read, so if the read gets an error we don't retry it
try {
state Version newPruneVersion = 0;
state bool doPruning = allowPruning && deterministicRandom()->random01() < 0.5;
if (doPruning) {
Version maxPruneVersion = oldRead.v;
state Version newPurgeVersion = 0;
state bool doPurging = allowPurging && deterministicRandom()->random01() < 0.5;
if (doPurging) {
Version maxPurgeVersion = oldRead.v;
for (auto& it : timeTravelChecks) {
maxPruneVersion = std::min(it.second.v, maxPruneVersion);
maxPurgeVersion = std::min(it.second.v, maxPurgeVersion);
}
if (prevPruneVersion < maxPruneVersion) {
newPruneVersion = deterministicRandom()->randomInt64(prevPruneVersion, maxPruneVersion);
prevPruneVersion = std::max(prevPruneVersion, newPruneVersion);
wait(self->pruneAtVersion(cx, normalKeys, newPruneVersion, false));
if (prevPurgeVersion < maxPurgeVersion) {
newPurgeVersion = deterministicRandom()->randomInt64(prevPurgeVersion, maxPurgeVersion);
prevPurgeVersion = std::max(prevPurgeVersion, newPurgeVersion);
Key purgeKey = wait(cx->purgeBlobGranules(normalKeys, newPurgeVersion, false));
wait(cx->waitPurgeGranulesComplete(purgeKey));
self->purges++;
} else {
doPruning = false;
doPurging = false;
}
}
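
// A minimal sketch of the purge-version selection above, with illustrative names and the
// map simplified to double -> Version: the purge version is capped at the minimum version
// any outstanding time-travel check still intends to read. The workload then draws
// uniformly at random from [prevPurgeVersion, bound) rather than taking a fixed point.
#include <algorithm>
#include <cstdint>
#include <map>
using Version = int64_t;
// Returns the exclusive upper bound for a safe purge version, or -1 if none exists yet.
Version maxSafePurgeVersion(Version prevPurgeVersion,
                            Version retiringCheckVersion,
                            const std::map<double, Version>& outstandingCheckVersions) {
    Version maxPurgeVersion = retiringCheckVersion;
    for (const auto& [time, v] : outstandingCheckVersions) {
        maxPurgeVersion = std::min(maxPurgeVersion, v); // never purge past a pending read
    }
    return prevPurgeVersion < maxPurgeVersion ? maxPurgeVersion : -1;
}
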
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> reReadResult =
@ -328,12 +277,12 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
}
self->timeTravelReads++;
if (doPruning) {
if (doPurging) {
wait(self->killBlobWorkers(cx, self));
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> versionRead =
wait(readFromBlob(cx, self->bstore, oldRead.range, 0, prevPruneVersion));
wait(readFromBlob(cx, self->bstore, oldRead.range, 0, prevPurgeVersion));
try {
Version minSnapshotVersion = newPruneVersion;
Version minSnapshotVersion = newPurgeVersion;
for (auto& it : versionRead.second) {
minSnapshotVersion = std::min(minSnapshotVersion, it.snapshotVersion);
}
@ -395,10 +344,10 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
Future<Void> start(Database const& cx) override {
clients.reserve(threads + 1);
clients.push_back(timeout(findGranules(cx, this), testDuration, Void()));
if (enablePruning && clientId == 0) {
if (enablePurging && clientId == 0) {
clients.push_back(
timeout(reportErrors(verifyGranules(cx, this, true), "BlobGranuleVerifier"), testDuration, Void()));
} else if (!enablePruning) {
} else if (!enablePurging) {
for (int i = 0; i < threads; i++) {
clients.push_back(timeout(
reportErrors(verifyGranules(cx, this, false), "BlobGranuleVerifier"), testDuration, Void()));
@ -518,6 +467,7 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
fmt::print(" {} time travel reads\n", self->timeTravelReads);
fmt::print(" {} rows\n", self->rowsRead);
fmt::print(" {} bytes\n", self->bytesRead);
fmt::print(" {} purges\n", self->purges);
// FIXME: add above as details to trace event
TraceEvent("BlobGranuleVerifierChecked").detail("Result", result);

View File

@ -154,9 +154,13 @@ struct DiskFailureInjectionWorkload : TestWorkload {
loop {
wait(poisson(&lastTime, 1));
try {
wait(store(machines, getStorageWorkers(cx, self->dbInfo, false)));
std::pair<std::vector<W>, int> m = wait(getStorageWorkers(cx, self->dbInfo, false));
if (m.second > 0) {
throw operation_failed();
}
machines = std::move(m.first);
} catch (Error& e) {
// If we failed to get a list of storage servers, we can't inject failure events
// If we failed to get a complete list of storage servers, we can't inject failure events
// But don't throw the error in that case
continue;
}
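
// A minimal sketch of the partial-result rejection above, assuming the returned int
// counts workers that could not be resolved; std::runtime_error stands in for FDB's
// operation_failed(). Faults injected from a partial server list could miss their
// intended targets, so the whole result is rejected.
#include <stdexcept>
#include <utility>
#include <vector>
template <class W>
std::vector<W> requireCompleteWorkerList(std::pair<std::vector<W>, int> fetched) {
    if (fetched.second > 0) {
        throw std::runtime_error("incomplete storage worker list"); // caller retries later
    }
    return std::move(fetched.first);
}
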

View File

@ -235,7 +235,8 @@ Future<Void> quietDatabase(Database const& cx,
int64_t maxTLogQueueGate = 5e6,
int64_t maxStorageServerQueueGate = 5e6,
int64_t maxDataDistributionQueueSize = 0,
int64_t maxPoppedVersionLag = 30e6);
int64_t maxPoppedVersionLag = 30e6,
int64_t maxVersionOffset = 1e6);
/**
* A utility function for testing error situations. It succeeds if the given test

View File

@ -963,7 +963,7 @@ struct DynamicFieldBase {
if (getDerivedTypeName() == metricTypeName<T>())
return (DynamicField<T>*)this;
TraceEvent(SevWarnAlways, "ScopeEventFieldTypeMismatch")
TraceEvent(g_network->isSimulated() ? SevError : SevWarnAlways, "ScopeEventFieldTypeMismatch")
.detail("EventType", eventType.toString())
.detail("FieldName", fieldName().toString())
.detail("OldType", getDerivedTypeName().toString())

View File

@ -35,6 +35,7 @@
#define TRACE_DEFAULT_ROLL_SIZE (10 << 20)
#define TRACE_DEFAULT_MAX_LOGS_SIZE (10 * TRACE_DEFAULT_ROLL_SIZE)
#define PRINTABLE_COMPRESS_NULLS 0
inline int fastrand() {
static int g_seed = 0;
@ -343,20 +344,37 @@ struct TraceableStringImpl : std::true_type {
}
std::string result;
result.reserve(size - nonPrintables + (nonPrintables * 4) + numBackslashes);
int numNull = 0;
for (auto iter = TraceableString<T>::begin(value); !TraceableString<T>::atEnd(value, iter); ++iter) {
if (*iter == '\\') {
if (numNull > 0) {
result += format("[%d]", numNull);
numNull = 0;
}
result.push_back('\\');
result.push_back('\\');
} else if (isPrintable(*iter)) {
if (numNull > 0) {
result += format("[%d]", numNull);
numNull = 0;
}
result.push_back(*iter);
} else {
const uint8_t byte = *iter;
result.push_back('\\');
result.push_back('x');
result.push_back(base16Char(byte / 16));
result.push_back(base16Char(byte));
if (PRINTABLE_COMPRESS_NULLS && byte == 0) {
numNull++;
} else {
result.push_back('\\');
result.push_back('x');
result.push_back(base16Char(byte / 16));
result.push_back(base16Char(byte));
}
}
}
if (numNull > 0) {
result += format("[%d]", numNull);
numNull = 0;
}
return result;
}
};
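
// A minimal sketch of the PRINTABLE_COMPRESS_NULLS idea above in isolation: runs of NUL
// bytes are emitted as "[count]" instead of one "\x00" escape per byte, so "a\0\0\0b"
// prints as "a[3]b". Illustrative names only; escaping of other non-printables is elided
// here for brevity.
#include <string>
std::string compressNulls(const std::string& raw) {
    std::string result;
    int numNull = 0;
    auto flush = [&] {
        if (numNull > 0) {
            result += "[" + std::to_string(numNull) + "]"; // emit the pending NUL run
            numNull = 0;
        }
    };
    for (unsigned char c : raw) {
        if (c == 0) {
            numNull++; // accumulate the run instead of emitting "\x00"
        } else {
            flush();
            result.push_back((char)c);
        }
    }
    flush(); // a trailing run still needs to be emitted
    return result;
}
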