Merge branch 'main' of github.com:apple/foundationdb into jfu-mako-active-tenants

Jon Fu 2022-06-22 10:31:58 -07:00
commit e3ea9f5c73
44 changed files with 1281 additions and 262 deletions


@ -491,6 +491,28 @@ extern "C" DLLEXPORT fdb_error_t fdb_tenant_create_transaction(FDBTenant* tenant
CATCH_AND_RETURN(*out_transaction = (FDBTransaction*)TENANT(tenant)->createTransaction().extractPtr(););
}
extern "C" DLLEXPORT FDBFuture* fdb_tenant_purge_blob_granules(FDBTenant* tenant,
uint8_t const* begin_key_name,
int begin_key_name_length,
uint8_t const* end_key_name,
int end_key_name_length,
int64_t purge_version,
fdb_bool_t force) {
return (FDBFuture*)(TENANT(tenant)
->purgeBlobGranules(KeyRangeRef(StringRef(begin_key_name, begin_key_name_length),
StringRef(end_key_name, end_key_name_length)),
purge_version,
force)
.extractPtr());
}
extern "C" DLLEXPORT FDBFuture* fdb_tenant_wait_purge_granules_complete(FDBTenant* tenant,
uint8_t const* purge_key_name,
int purge_key_name_length) {
return (FDBFuture*)(TENANT(tenant)
->waitPurgeGranulesComplete(StringRef(purge_key_name, purge_key_name_length))
.extractPtr());
}
extern "C" DLLEXPORT void fdb_tenant_destroy(FDBTenant* tenant) {
try {
TENANT(tenant)->delref();


@ -315,6 +315,18 @@ DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_database_wait_purge_granules_complet
DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_tenant_create_transaction(FDBTenant* tenant,
FDBTransaction** out_transaction);
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_tenant_purge_blob_granules(FDBTenant* db,
uint8_t const* begin_key_name,
int begin_key_name_length,
uint8_t const* end_key_name,
int end_key_name_length,
int64_t purge_version,
fdb_bool_t force);
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_tenant_wait_purge_granules_complete(FDBTenant* db,
uint8_t const* purge_key_name,
int purge_key_name_length);
DLLEXPORT void fdb_tenant_destroy(FDBTenant* tenant);
DLLEXPORT void fdb_transaction_destroy(FDBTransaction* tr);
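A minimal usage sketch (not part of this commit) of the two tenant-level declarations above: it purges blob granules over the whole tenant-relative keyspace and then waits on the returned purge key. It assumes an already-opened FDBTenant*, a running network thread, and API version 720 (the header version the loader changes later in this commit gate these symbols on); the helper names and the empty-to-"\xff" range are illustrative only, and force is set because the NativeAPI hunk below only implements forced purges.

#define FDB_API_VERSION 720
#include <foundationdb/fdb_c.h>
#include <string>
#include <vector>

// Blocks on a future and folds the two error sources into one code.
static fdb_error_t block_and_check(FDBFuture* f) {
    fdb_error_t err = fdb_future_block_until_ready(f);
    return err ? err : fdb_future_get_error(f);
}

// Purges blob granules for the whole (tenant-relative) keyspace and waits for completion.
fdb_error_t purge_tenant_granules(FDBTenant* tenant, int64_t purge_version) {
    const std::string begin = "";
    const std::string end = "\xff";
    FDBFuture* purge_f = fdb_tenant_purge_blob_granules(tenant,
                                                        (const uint8_t*)begin.data(), (int)begin.size(),
                                                        (const uint8_t*)end.data(), (int)end.size(),
                                                        purge_version,
                                                        1 /* force; only forced purges are implemented */);
    fdb_error_t err = block_and_check(purge_f);
    const uint8_t* purge_key = nullptr;
    int purge_key_length = 0;
    if (!err)
        err = fdb_future_get_key(purge_f, &purge_key, &purge_key_length);
    // The key's memory is owned by the future, so copy it out before destroying the future.
    std::vector<uint8_t> key_copy;
    if (!err)
        key_copy.assign(purge_key, purge_key + purge_key_length);
    fdb_future_destroy(purge_f);
    if (err)
        return err;

    FDBFuture* wait_f = fdb_tenant_wait_purge_granules_complete(tenant, key_copy.data(), (int)key_copy.size());
    err = block_and_check(wait_f);
    fdb_future_destroy(wait_f);
    return err;
}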


@ -118,7 +118,7 @@ public:
return T(0);
int index = -1;
bool found = false;
[[maybe_unused]] bool found = false;
if (percentile <= 0.5) { // count up
uint64_t count = zeroPopulationSize;
for (size_t i = 0; i < buckets.size(); i++) {
@ -272,4 +272,4 @@ private:
constexpr static double gamma = 1.189207115;
};
#endif
#endif


@ -157,6 +157,25 @@ Tenant::Tenant(FDBDatabase* db, const uint8_t* name, int name_length) {
}
}
KeyFuture Tenant::purge_blob_granules(FDBTenant* tenant,
std::string_view begin_key,
std::string_view end_key,
int64_t purge_version,
fdb_bool_t force) {
return KeyFuture(fdb_tenant_purge_blob_granules(tenant,
(const uint8_t*)begin_key.data(),
begin_key.size(),
(const uint8_t*)end_key.data(),
end_key.size(),
purge_version,
force));
}
EmptyFuture Tenant::wait_purge_granules_complete(FDBTenant* tenant, std::string_view purge_key) {
return EmptyFuture(
fdb_tenant_wait_purge_granules_complete(tenant, (const uint8_t*)purge_key.data(), purge_key.size()));
}
Tenant::~Tenant() {
if (tenant != nullptr) {
fdb_tenant_destroy(tenant);


@ -98,6 +98,7 @@ public:
private:
friend class Transaction;
friend class Database;
friend class Tenant;
KeyFuture(FDBFuture* f) : Future(f) {}
};
@ -164,6 +165,7 @@ class EmptyFuture : public Future {
private:
friend class Transaction;
friend class Database;
friend class Tenant;
EmptyFuture(FDBFuture* f) : Future(f) {}
};
@ -221,6 +223,14 @@ public:
Tenant(Tenant&&) = delete;
Tenant& operator=(Tenant&&) = delete;
static KeyFuture purge_blob_granules(FDBTenant* tenant,
std::string_view begin_key,
std::string_view end_key,
int64_t purge_version,
fdb_bool_t force);
static EmptyFuture wait_purge_granules_complete(FDBTenant* tenant, std::string_view purge_key);
private:
friend class Transaction;
FDBTenant* tenant;


@ -378,7 +378,10 @@ public:
Future<std::vector<OverlappingChangeFeedEntry>> getOverlappingChangeFeeds(KeyRangeRef ranges, Version minVersion);
Future<Void> popChangeFeedMutations(Key rangeID, Version version);
Future<Key> purgeBlobGranules(KeyRange keyRange, Version purgeVersion, bool force = false);
Future<Key> purgeBlobGranules(KeyRange keyRange,
Version purgeVersion,
Optional<TenantName> tenant,
bool force = false);
Future<Void> waitPurgeGranulesComplete(Key purgeKey);
// private:


@ -671,7 +671,6 @@ Future<Optional<TenantMapEntry>> tryGetTenantTransaction(Transaction tr, TenantN
state Key tenantMapKey = name.withPrefix(tenantMapPrefix);
tr->setOption(FDBTransactionOptions::RAW_ACCESS);
tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE);
state typename transaction_future_type<Transaction, Optional<Value>>::type tenantFuture = tr->get(tenantMapKey);
Optional<Value> val = wait(safeThreadFutureToFuture(tenantFuture));
@ -685,6 +684,7 @@ Future<Optional<TenantMapEntry>> tryGetTenant(Reference<DB> db, TenantName name)
loop {
try {
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE);
Optional<TenantMapEntry> entry = wait(tryGetTenantTransaction(tr, name));
return entry;
} catch (Error& e) {
@ -714,8 +714,10 @@ Future<TenantMapEntry> getTenant(Reference<DB> db, TenantName name) {
}
// Creates a tenant with the given name. If the tenant already exists, the existing entry is returned and the
// boolean in the result is false. The caller must ensure that the tenant ID is unique across all current and past
// tenants, and that it is also unique from all other tenants created in the same transaction.
ACTOR template <class Transaction>
Future<Optional<TenantMapEntry>> createTenantTransaction(Transaction tr, TenantNameRef name) {
Future<std::pair<TenantMapEntry, bool>> createTenantTransaction(Transaction tr, TenantNameRef name, int64_t tenantId) {
state Key tenantMapKey = name.withPrefix(tenantMapPrefix);
if (name.startsWith("\xff"_sr)) {
@ -723,12 +725,10 @@ Future<Optional<TenantMapEntry>> createTenantTransaction(Transaction tr, TenantN
}
tr->setOption(FDBTransactionOptions::RAW_ACCESS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
state Future<Optional<TenantMapEntry>> tenantEntryFuture = tryGetTenantTransaction(tr, name);
state typename transaction_future_type<Transaction, Optional<Value>>::type tenantDataPrefixFuture =
tr->get(tenantDataPrefixKey);
state typename transaction_future_type<Transaction, Optional<Value>>::type lastIdFuture = tr->get(tenantLastIdKey);
state typename transaction_future_type<Transaction, Optional<Value>>::type tenantModeFuture =
tr->get(configKeysPrefix.withSuffix("tenant_mode"_sr));
@ -740,12 +740,10 @@ Future<Optional<TenantMapEntry>> createTenantTransaction(Transaction tr, TenantN
Optional<TenantMapEntry> tenantEntry = wait(tenantEntryFuture);
if (tenantEntry.present()) {
return Optional<TenantMapEntry>();
return std::make_pair(tenantEntry.get(), false);
}
state Optional<Value> lastIdVal = wait(safeThreadFutureToFuture(lastIdFuture));
Optional<Value> tenantDataPrefix = wait(safeThreadFutureToFuture(tenantDataPrefixFuture));
if (tenantDataPrefix.present() &&
tenantDataPrefix.get().size() + TenantMapEntry::ROOT_PREFIX_SIZE > CLIENT_KNOBS->TENANT_PREFIX_SIZE_LIMIT) {
TraceEvent(SevWarnAlways, "TenantPrefixTooLarge")
@ -757,8 +755,7 @@ Future<Optional<TenantMapEntry>> createTenantTransaction(Transaction tr, TenantN
throw client_invalid_operation();
}
state TenantMapEntry newTenant(lastIdVal.present() ? TenantMapEntry::prefixToId(lastIdVal.get()) + 1 : 0,
tenantDataPrefix.present() ? (KeyRef)tenantDataPrefix.get() : ""_sr);
state TenantMapEntry newTenant(tenantId, tenantDataPrefix.present() ? (KeyRef)tenantDataPrefix.get() : ""_sr);
state typename transaction_future_type<Transaction, RangeResult>::type prefixRangeFuture =
tr->getRange(prefixRange(newTenant.prefix), 1);
@ -767,20 +764,21 @@ Future<Optional<TenantMapEntry>> createTenantTransaction(Transaction tr, TenantN
throw tenant_prefix_allocator_conflict();
}
tr->set(tenantLastIdKey, TenantMapEntry::idToPrefix(newTenant.id));
tr->set(tenantMapKey, encodeTenantEntry(newTenant));
return newTenant;
return std::make_pair(newTenant, true);
}
ACTOR template <class DB>
Future<Void> createTenant(Reference<DB> db, TenantName name) {
Future<TenantMapEntry> createTenant(Reference<DB> db, TenantName name) {
state Reference<typename DB::TransactionT> tr = db->createTransaction();
state bool firstTry = true;
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
state typename DB::TransactionT::template FutureT<Optional<Value>> lastIdFuture = tr->get(tenantLastIdKey);
if (firstTry) {
Optional<TenantMapEntry> entry = wait(tryGetTenantTransaction(tr, name));
@ -791,7 +789,10 @@ Future<Void> createTenant(Reference<DB> db, TenantName name) {
firstTry = false;
}
state Optional<TenantMapEntry> newTenant = wait(createTenantTransaction(tr, name));
Optional<Value> lastIdVal = wait(safeThreadFutureToFuture(lastIdFuture));
int64_t tenantId = lastIdVal.present() ? TenantMapEntry::prefixToId(lastIdVal.get()) + 1 : 0;
tr->set(tenantLastIdKey, TenantMapEntry::idToPrefix(tenantId));
state std::pair<TenantMapEntry, bool> newTenant = wait(createTenantTransaction(tr, name, tenantId));
if (BUGGIFY) {
throw commit_unknown_result();
@ -805,11 +806,11 @@ Future<Void> createTenant(Reference<DB> db, TenantName name) {
TraceEvent("CreatedTenant")
.detail("Tenant", name)
.detail("TenantId", newTenant.present() ? newTenant.get().id : -1)
.detail("Prefix", newTenant.present() ? (StringRef)newTenant.get().prefix : "Unknown"_sr)
.detail("TenantId", newTenant.first.id)
.detail("Prefix", newTenant.first.prefix)
.detail("Version", tr->getCommittedVersion());
return Void();
return newTenant.first;
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
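To make the refactored contract above concrete: createTenantTransaction no longer reads or bumps tenantLastIdKey itself; the caller allocates the ID and persists it, and the bool in the returned pair distinguishes "created by this call" from "already existed". Below is a hedged, standalone C++ illustration of that contract — TenantEntry, allocateNextTenantId, and reportCreateResult are hypothetical stand-ins, not FDB types.

#include <cstdint>
#include <cstdio>
#include <optional>
#include <string>
#include <utility>

struct TenantEntry { int64_t id; std::string prefix; }; // stand-in for TenantMapEntry

// Mirrors the caller-side allocation in createTenant() above:
// next ID = last allocated ID + 1 if one exists, else 0.
int64_t allocateNextTenantId(std::optional<int64_t> lastAllocatedId) {
    return lastAllocatedId.has_value() ? *lastAllocatedId + 1 : 0;
}

// Interprets the pair returned by the transaction-level create:
// second == true means this call created the tenant, false means it already existed.
void reportCreateResult(const std::pair<TenantEntry, bool>& result) {
    if (result.second)
        std::printf("created tenant id=%lld\n", (long long)result.first.id);
    else
        std::printf("tenant already existed, id=%lld\n", (long long)result.first.id);
}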
@ -821,7 +822,6 @@ Future<Void> deleteTenantTransaction(Transaction tr, TenantNameRef name) {
state Key tenantMapKey = name.withPrefix(tenantMapPrefix);
tr->setOption(FDBTransactionOptions::RAW_ACCESS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
state Optional<TenantMapEntry> tenantEntry = wait(tryGetTenantTransaction(tr, name));
if (!tenantEntry.present()) {
@ -848,6 +848,7 @@ Future<Void> deleteTenant(Reference<DB> db, TenantName name) {
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
if (firstTry) {
Optional<TenantMapEntry> entry = wait(tryGetTenantTransaction(tr, name));
@ -886,7 +887,6 @@ Future<std::map<TenantName, TenantMapEntry>> listTenantsTransaction(Transaction
state KeyRange range = KeyRangeRef(begin, end).withPrefix(tenantMapPrefix);
tr->setOption(FDBTransactionOptions::RAW_ACCESS);
tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE);
state typename transaction_future_type<Transaction, RangeResult>::type listFuture =
tr->getRange(firstGreaterOrEqual(range.begin), firstGreaterOrEqual(range.end), limit);
@ -910,6 +910,7 @@ Future<std::map<TenantName, TenantMapEntry>> listTenants(Reference<DB> db,
loop {
try {
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE);
std::map<TenantName, TenantMapEntry> tenants = wait(listTenantsTransaction(tr, begin, end, limit));
return tenants;
} catch (Error& e) {


@ -130,6 +130,9 @@ public:
virtual Reference<ITransaction> createTransaction() = 0;
virtual ThreadFuture<Key> purgeBlobGranules(const KeyRangeRef& keyRange, Version purgeVersion, bool force) = 0;
virtual ThreadFuture<Void> waitPurgeGranulesComplete(const KeyRef& purgeKey) = 0;
virtual void addref() = 0;
virtual void delref() = 0;
};


@ -398,6 +398,38 @@ Reference<ITransaction> DLTenant::createTransaction() {
return Reference<ITransaction>(new DLTransaction(api, tr));
}
ThreadFuture<Key> DLTenant::purgeBlobGranules(const KeyRangeRef& keyRange, Version purgeVersion, bool force) {
if (!api->tenantPurgeBlobGranules) {
return unsupported_operation();
}
FdbCApi::FDBFuture* f = api->tenantPurgeBlobGranules(tenant,
keyRange.begin.begin(),
keyRange.begin.size(),
keyRange.end.begin(),
keyRange.end.size(),
purgeVersion,
force);
return toThreadFuture<Key>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) {
const uint8_t* key;
int keyLength;
FdbCApi::fdb_error_t error = api->futureGetKey(f, &key, &keyLength);
ASSERT(!error);
// The memory for this is stored in the FDBFuture and is released when the future gets destroyed
return Key(KeyRef(key, keyLength), Arena());
});
}
ThreadFuture<Void> DLTenant::waitPurgeGranulesComplete(const KeyRef& purgeKey) {
if (!api->tenantWaitPurgeGranulesComplete) {
return unsupported_operation();
}
FdbCApi::FDBFuture* f = api->tenantWaitPurgeGranulesComplete(tenant, purgeKey.begin(), purgeKey.size());
return toThreadFuture<Void>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) { return Void(); });
}
// DLDatabase
DLDatabase::DLDatabase(Reference<FdbCApi> api, ThreadFuture<FdbCApi::FDBDatabase*> dbFuture) : api(api), db(nullptr) {
addref();
@ -520,16 +552,16 @@ ThreadFuture<ProtocolVersion> DLDatabase::getServerProtocol(Optional<ProtocolVer
}
ThreadFuture<Key> DLDatabase::purgeBlobGranules(const KeyRangeRef& keyRange, Version purgeVersion, bool force) {
if (!api->purgeBlobGranules) {
if (!api->databasePurgeBlobGranules) {
return unsupported_operation();
}
FdbCApi::FDBFuture* f = api->purgeBlobGranules(db,
keyRange.begin.begin(),
keyRange.begin.size(),
keyRange.end.begin(),
keyRange.end.size(),
purgeVersion,
force);
FdbCApi::FDBFuture* f = api->databasePurgeBlobGranules(db,
keyRange.begin.begin(),
keyRange.begin.size(),
keyRange.end.begin(),
keyRange.end.size(),
purgeVersion,
force);
return toThreadFuture<Key>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) {
const uint8_t* key;
@ -543,11 +575,11 @@ ThreadFuture<Key> DLDatabase::purgeBlobGranules(const KeyRangeRef& keyRange, Ver
}
ThreadFuture<Void> DLDatabase::waitPurgeGranulesComplete(const KeyRef& purgeKey) {
if (!api->waitPurgeGranulesComplete) {
if (!api->databaseWaitPurgeGranulesComplete) {
return unsupported_operation();
}
FdbCApi::FDBFuture* f = api->waitPurgeGranulesComplete(db, purgeKey.begin(), purgeKey.size());
FdbCApi::FDBFuture* f = api->databaseWaitPurgeGranulesComplete(db, purgeKey.begin(), purgeKey.size());
return toThreadFuture<Void>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) { return Void(); });
}
@ -624,11 +656,9 @@ void DLApi::init() {
headerVersion >= 700);
loadClientFunction(
&api->databaseCreateSnapshot, lib, fdbCPath, "fdb_database_create_snapshot", headerVersion >= 700);
loadClientFunction(
&api->purgeBlobGranules, lib, fdbCPath, "fdb_database_purge_blob_granules", headerVersion >= 710);
loadClientFunction(&api->waitPurgeGranulesComplete,
&api->databasePurgeBlobGranules, lib, fdbCPath, "fdb_database_purge_blob_granules", headerVersion >= 710);
loadClientFunction(&api->databaseWaitPurgeGranulesComplete,
lib,
fdbCPath,
"fdb_database_wait_purge_granules_complete",
@ -636,6 +666,13 @@ void DLApi::init() {
loadClientFunction(
&api->tenantCreateTransaction, lib, fdbCPath, "fdb_tenant_create_transaction", headerVersion >= 710);
loadClientFunction(
&api->tenantPurgeBlobGranules, lib, fdbCPath, "fdb_tenant_purge_blob_granules", headerVersion >= 720);
loadClientFunction(&api->tenantWaitPurgeGranulesComplete,
lib,
fdbCPath,
"fdb_tenant_wait_purge_granules_complete",
headerVersion >= 720);
loadClientFunction(&api->tenantDestroy, lib, fdbCPath, "fdb_tenant_destroy", headerVersion >= 710);
loadClientFunction(&api->transactionSetOption, lib, fdbCPath, "fdb_transaction_set_option", headerVersion >= 0);
@ -1316,6 +1353,16 @@ Reference<ITransaction> MultiVersionTenant::createTransaction() {
tenantState->db->dbState->transactionDefaultOptions));
}
ThreadFuture<Key> MultiVersionTenant::purgeBlobGranules(const KeyRangeRef& keyRange, Version purgeVersion, bool force) {
auto f = tenantState->db ? tenantState->db->purgeBlobGranules(keyRange, purgeVersion, force)
: ThreadFuture<Key>(Never());
return abortableFuture(f, tenantState->db->dbState->dbVar->get().onChange);
}
ThreadFuture<Void> MultiVersionTenant::waitPurgeGranulesComplete(const KeyRef& purgeKey) {
auto f = tenantState->db ? tenantState->db->waitPurgeGranulesComplete(purgeKey) : ThreadFuture<Void>(Never());
return abortableFuture(f, tenantState->db->dbState->dbVar->get().onChange);
}
MultiVersionTenant::TenantState::TenantState(Reference<MultiVersionDatabase> db, StringRef tenantName)
: tenantVar(new ThreadSafeAsyncVar<Reference<ITenant>>(Reference<ITenant>(nullptr))), tenantName(tenantName), db(db),
closed(false) {


@ -157,18 +157,33 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
double (*databaseGetMainThreadBusyness)(FDBDatabase* database);
FDBFuture* (*databaseGetServerProtocol)(FDBDatabase* database, uint64_t expectedVersion);
FDBFuture* (*purgeBlobGranules)(FDBDatabase* db,
uint8_t const* begin_key_name,
int begin_key_name_length,
uint8_t const* end_key_name,
int end_key_name_length,
int64_t purge_version,
fdb_bool_t force);
FDBFuture* (*databasePurgeBlobGranules)(FDBDatabase* db,
uint8_t const* begin_key_name,
int begin_key_name_length,
uint8_t const* end_key_name,
int end_key_name_length,
int64_t purge_version,
fdb_bool_t force);
FDBFuture* (*waitPurgeGranulesComplete)(FDBDatabase* db, uint8_t const* purge_key_name, int purge_key_name_length);
FDBFuture* (*databaseWaitPurgeGranulesComplete)(FDBDatabase* db,
uint8_t const* purge_key_name,
int purge_key_name_length);
// Tenant
fdb_error_t (*tenantCreateTransaction)(FDBTenant* tenant, FDBTransaction** outTransaction);
FDBFuture* (*tenantPurgeBlobGranules)(FDBTenant* db,
uint8_t const* begin_key_name,
int begin_key_name_length,
uint8_t const* end_key_name,
int end_key_name_length,
int64_t purge_version,
fdb_bool_t force);
FDBFuture* (*tenantWaitPurgeGranulesComplete)(FDBTenant* db,
uint8_t const* purge_key_name,
int purge_key_name_length);
void (*tenantDestroy)(FDBTenant* tenant);
// Transaction
@ -413,6 +428,9 @@ public:
Reference<ITransaction> createTransaction() override;
ThreadFuture<Key> purgeBlobGranules(const KeyRangeRef& keyRange, Version purgeVersion, bool force) override;
ThreadFuture<Void> waitPurgeGranulesComplete(const KeyRef& purgeKey) override;
void addref() override { ThreadSafeReferenceCounted<DLTenant>::addref(); }
void delref() override { ThreadSafeReferenceCounted<DLTenant>::delref(); }
@ -672,6 +690,9 @@ public:
Reference<ITransaction> createTransaction() override;
ThreadFuture<Key> purgeBlobGranules(const KeyRangeRef& keyRange, Version purgeVersion, bool force) override;
ThreadFuture<Void> waitPurgeGranulesComplete(const KeyRef& purgeKey) override;
void addref() override { ThreadSafeReferenceCounted<MultiVersionTenant>::addref(); }
void delref() override { ThreadSafeReferenceCounted<MultiVersionTenant>::delref(); }


@ -7514,14 +7514,42 @@ ACTOR Future<Standalone<VectorRef<KeyRangeRef>>> getBlobGranuleRangesActor(Trans
// FIXME: use streaming range read
state KeyRange currentRange = keyRange;
state Standalone<VectorRef<KeyRangeRef>> results;
state Optional<Key> tenantPrefix;
if (BG_REQUEST_DEBUG) {
fmt::print("Getting Blob Granules for [{0} - {1})\n", keyRange.begin.printable(), keyRange.end.printable());
}
self->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
// FIXME: limit to tenant range if set
if (self->getTenant().present()) {
// have to bypass tenant to read system key space, and add tenant prefix to part of mapping
TenantMapEntry tenantEntry = wait(blobGranuleGetTenantEntry(self, currentRange.begin));
tenantPrefix = tenantEntry.prefix;
} else {
self->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
}
loop {
state RangeResult blobGranuleMapping = wait(
krmGetRanges(self, blobGranuleMappingKeys.begin, currentRange, 1000, GetRangeLimits::BYTE_LIMIT_UNLIMITED));
state RangeResult blobGranuleMapping;
if (tenantPrefix.present()) {
state Standalone<StringRef> mappingPrefix = tenantPrefix.get().withPrefix(blobGranuleMappingKeys.begin);
// basically krmGetRanges, but avoid going through the tenant (without needing RAW_ACCESS) by doing a manual
// getRange with UseTenant::False
GetRangeLimits limits(1000);
limits.minRows = 2;
RangeResult rawMapping = wait(getRange(self->trState,
self->getReadVersion(),
lastLessOrEqual(keyRange.begin.withPrefix(mappingPrefix)),
firstGreaterThan(keyRange.end.withPrefix(mappingPrefix)),
limits,
Reverse::False,
UseTenant::False));
// strip off mapping prefix
blobGranuleMapping = krmDecodeRanges(mappingPrefix, currentRange, rawMapping);
} else {
wait(store(
blobGranuleMapping,
krmGetRanges(
self, blobGranuleMappingKeys.begin, currentRange, 1000, GetRangeLimits::BYTE_LIMIT_UNLIMITED)));
}
for (int i = 0; i < blobGranuleMapping.size() - 1; i++) {
if (blobGranuleMapping[i].value.size()) {
@ -9447,24 +9475,33 @@ Reference<DatabaseContext::TransactionT> DatabaseContext::createTransaction() {
return makeReference<ReadYourWritesTransaction>(Database(Reference<DatabaseContext>::addRef(this)));
}
// FIXME: handle tenants?
ACTOR Future<Key> purgeBlobGranulesActor(Reference<DatabaseContext> db,
KeyRange range,
Version purgeVersion,
Optional<TenantName> tenant,
bool force) {
state Database cx(db);
state Transaction tr(cx);
state Key purgeKey;
state KeyRange purgeRange = range;
state bool loadedTenantPrefix = false;
// FIXME: implement force
if (!force) {
throw unsupported_operation();
}
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
if (tenant.present() && !loadedTenantPrefix) {
TenantMapEntry tenantEntry = wait(blobGranuleGetTenantEntry(&tr, range.begin));
loadedTenantPrefix = true;
purgeRange = purgeRange.withPrefix(tenantEntry.prefix);
}
Value purgeValue = blobGranulePurgeValueFor(purgeVersion, range, force);
tr.atomicOp(
addVersionStampAtEnd(blobGranulePurgeKeys.begin), purgeValue, MutationRef::SetVersionstampedKey);
@ -9495,8 +9532,11 @@ ACTOR Future<Key> purgeBlobGranulesActor(Reference<DatabaseContext> db,
return purgeKey;
}
Future<Key> DatabaseContext::purgeBlobGranules(KeyRange range, Version purgeVersion, bool force) {
return purgeBlobGranulesActor(Reference<DatabaseContext>::addRef(this), range, purgeVersion, force);
Future<Key> DatabaseContext::purgeBlobGranules(KeyRange range,
Version purgeVersion,
Optional<TenantName> tenant,
bool force) {
return purgeBlobGranulesActor(Reference<DatabaseContext>::addRef(this), range, purgeVersion, tenant, force);
}
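The tenant handling above hinges on mapping tenant-relative keys into the global keyspace by prepending the tenant's prefix (withPrefix) and stripping it back off results read from the global mapping. The following standalone sketch illustrates that idea only — the helper names are hypothetical, and a printable example prefix stands in for the short binary prefixes the cluster actually allocates.

#include <cassert>
#include <cstdio>
#include <string>

// Maps a tenant-relative key into the global keyspace.
std::string withTenantPrefix(const std::string& tenantPrefix, const std::string& key) {
    return tenantPrefix + key;
}

// Strips the tenant prefix from a key read out of the global keyspace.
std::string stripTenantPrefix(const std::string& tenantPrefix, const std::string& globalKey) {
    assert(globalKey.compare(0, tenantPrefix.size(), tenantPrefix) == 0);
    return globalKey.substr(tenantPrefix.size());
}

int main() {
    const std::string prefix = "tenant-prefix/"; // real prefixes are short binary strings allocated by the cluster
    std::string global = withTenantPrefix(prefix, "granule/key");
    std::printf("%s -> %s\n", global.c_str(), stripTenantPrefix(prefix, global).c_str());
    return 0;
}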
ACTOR Future<Void> waitPurgeGranulesCompleteActor(Reference<DatabaseContext> db, Key purgeKey) {


@ -286,6 +286,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( DD_TEAM_ZERO_SERVER_LEFT_LOG_DELAY, 120 ); if( randomize && BUGGIFY ) DD_TEAM_ZERO_SERVER_LEFT_LOG_DELAY = 5;
init( DD_STORAGE_WIGGLE_PAUSE_THRESHOLD, 10 ); if( randomize && BUGGIFY ) DD_STORAGE_WIGGLE_PAUSE_THRESHOLD = 1000;
init( DD_STORAGE_WIGGLE_STUCK_THRESHOLD, 20 );
init( DD_TENANT_AWARENESS_ENABLED, false );
init( TENANT_CACHE_LIST_REFRESH_INTERVAL, 2.0 );
// TeamRemover
init( TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER, false ); if( randomize && BUGGIFY ) TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER = deterministicRandom()->random01() < 0.1 ? true : false; // false by default. disable the consistency check when it's true


@ -232,6 +232,8 @@ public:
int DD_TEAM_ZERO_SERVER_LEFT_LOG_DELAY;
int DD_STORAGE_WIGGLE_PAUSE_THRESHOLD; // How many unhealthy relocations are ongoing will pause storage wiggle
int DD_STORAGE_WIGGLE_STUCK_THRESHOLD; // How many times bestTeamStuck accumulate will pause storage wiggle
bool DD_TENANT_AWARENESS_ENABLED;
int TENANT_CACHE_LIST_REFRESH_INTERVAL; // How often the TenantCache is refreshed
// TeamRemover to remove redundant teams
bool TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER; // disable the machineTeamRemover actor


@ -2772,7 +2772,24 @@ Future<RangeResult> TenantMapRangeImpl::getRange(ReadYourWritesTransaction* ryw,
return getTenantList(ryw, kr, limitsHint);
}
ACTOR Future<Void> deleteTenantRange(ReadYourWritesTransaction* ryw, TenantName beginTenant, TenantName endTenant) {
ACTOR Future<Void> createTenants(ReadYourWritesTransaction* ryw, std::vector<TenantNameRef> tenants) {
Optional<Value> lastIdVal = wait(ryw->getTransaction().get(tenantLastIdKey));
int64_t previousId = lastIdVal.present() ? TenantMapEntry::prefixToId(lastIdVal.get()) : -1;
std::vector<Future<Void>> createFutures;
for (auto tenant : tenants) {
createFutures.push_back(
success(ManagementAPI::createTenantTransaction(&ryw->getTransaction(), tenant, ++previousId)));
}
ryw->getTransaction().set(tenantLastIdKey, TenantMapEntry::idToPrefix(previousId));
wait(waitForAll(createFutures));
return Void();
}
ACTOR Future<Void> deleteTenantRange(ReadYourWritesTransaction* ryw,
TenantNameRef beginTenant,
TenantNameRef endTenant) {
std::map<TenantName, TenantMapEntry> tenants = wait(
ManagementAPI::listTenantsTransaction(&ryw->getTransaction(), beginTenant, endTenant, CLIENT_KNOBS->TOO_MANY));
@ -2795,6 +2812,7 @@ ACTOR Future<Void> deleteTenantRange(ReadYourWritesTransaction* ryw, TenantName
Future<Optional<std::string>> TenantMapRangeImpl::commit(ReadYourWritesTransaction* ryw) {
auto ranges = ryw->getSpecialKeySpaceWriteMap().containedRanges(range);
std::vector<TenantNameRef> tenantsToCreate;
std::vector<Future<Void>> tenantManagementFutures;
for (auto range : ranges) {
if (!range.value().first) {
@ -2807,8 +2825,7 @@ Future<Optional<std::string>> TenantMapRangeImpl::commit(ReadYourWritesTransacti
.removePrefix(TenantMapRangeImpl::submoduleRange.begin);
if (range.value().second.present()) {
tenantManagementFutures.push_back(
success(ManagementAPI::createTenantTransaction(&ryw->getTransaction(), tenantName)));
tenantsToCreate.push_back(tenantName);
} else {
// For a single key clear, just issue the delete
if (KeyRangeRef(range.begin(), range.end()).singleKeyRange()) {
@ -2827,5 +2844,9 @@ Future<Optional<std::string>> TenantMapRangeImpl::commit(ReadYourWritesTransacti
}
}
if (tenantsToCreate.size()) {
tenantManagementFutures.push_back(createTenants(ryw, tenantsToCreate));
}
return tag(waitForAll(tenantManagementFutures), Optional<std::string>());
}
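The batching above reads tenantLastIdKey once, hands out consecutive IDs to the pending tenants, and writes the new last ID back a single time. A hedged standalone sketch of just that allocation step — assignTenantIds is a hypothetical name, and plain values replace the FDB transaction:

#include <cstdint>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Returns (tenant name, allocated id) pairs plus the value that would be written
// back to tenantLastIdKey in a single set().
std::pair<std::vector<std::pair<std::string, int64_t>>, int64_t>
assignTenantIds(const std::vector<std::string>& tenantsToCreate, std::optional<int64_t> lastIdFromDb) {
    int64_t previousId = lastIdFromDb.value_or(-1); // mirrors `lastIdVal.present() ? prefixToId(...) : -1`
    std::vector<std::pair<std::string, int64_t>> assignments;
    for (const std::string& name : tenantsToCreate)
        assignments.emplace_back(name, ++previousId); // each pending tenant gets the next consecutive ID
    return { assignments, previousId };
}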


@ -131,7 +131,7 @@ ThreadFuture<Key> ThreadSafeDatabase::purgeBlobGranules(const KeyRangeRef& keyRa
DatabaseContext* db = this->db;
KeyRange range = keyRange;
return onMainThread([db, range, purgeVersion, force]() -> Future<Key> {
return db->purgeBlobGranules(range, purgeVersion, force);
return db->purgeBlobGranules(range, purgeVersion, {}, force);
});
}
@ -174,6 +174,21 @@ Reference<ITransaction> ThreadSafeTenant::createTransaction() {
return Reference<ITransaction>(new ThreadSafeTransaction(db->db, type, name));
}
ThreadFuture<Key> ThreadSafeTenant::purgeBlobGranules(const KeyRangeRef& keyRange, Version purgeVersion, bool force) {
DatabaseContext* db = this->db->db;
Standalone<StringRef> tenantName = this->name;
KeyRange range = keyRange;
return onMainThread([db, range, purgeVersion, tenantName, force]() -> Future<Key> {
return db->purgeBlobGranules(range, purgeVersion, tenantName, force);
});
}
ThreadFuture<Void> ThreadSafeTenant::waitPurgeGranulesComplete(const KeyRef& purgeKey) {
DatabaseContext* db = this->db->db;
Key key = purgeKey;
return onMainThread([db, key]() -> Future<Void> { return db->waitPurgeGranulesComplete(key); });
}
ThreadSafeTenant::~ThreadSafeTenant() {}
ThreadSafeTransaction::ThreadSafeTransaction(DatabaseContext* cx,


@ -84,6 +84,9 @@ public:
Reference<ITransaction> createTransaction() override;
ThreadFuture<Key> purgeBlobGranules(const KeyRangeRef& keyRange, Version purgeVersion, bool force) override;
ThreadFuture<Void> waitPurgeGranulesComplete(const KeyRef& purgeKey) override;
void addref() override { ThreadSafeReferenceCounted<ThreadSafeTenant>::addref(); }
void delref() override { ThreadSafeReferenceCounted<ThreadSafeTenant>::delref(); }

fdbkubernetesmonitor/.gitignore vendored Normal file

@ -0,0 +1,2 @@
fdbkubernetesmonitor


@ -25,6 +25,7 @@ require (
github.com/fsnotify/fsnotify v1.5.1
github.com/go-logr/logr v1.2.0
github.com/go-logr/zapr v1.2.0
github.com/prometheus/client_golang v1.12.2
github.com/spf13/pflag v1.0.5
go.uber.org/zap v1.19.1
gopkg.in/natefinch/lumberjack.v2 v2.0.0
@ -37,6 +38,8 @@ require (
require (
github.com/PuerkitoBio/purell v1.1.1 // indirect
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful v2.9.5+incompatible // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
@ -49,11 +52,15 @@ require (
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/mailru/easyjson v0.7.6 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/onsi/ginkgo v1.16.5 // indirect
github.com/onsi/gomega v1.18.1 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/goleak v1.1.12 // indirect
go.uber.org/multierr v1.6.0 // indirect


@ -51,11 +51,23 @@ github.com/PuerkitoBio/purell v1.1.1 h1:WEQqlqaGbrPkxLJWfBwQmfEAE1Z7ONdDLqrN38tN
github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV/sSk/8dngufqelfh6jnri85riMAaF/M=
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
@ -90,6 +102,12 @@ github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeME
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas=
github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU=
github.com/go-logr/logr v1.2.0 h1:QK40JKJyMdUDz+h+xvCsru/bJhvG0UxvePV0ufL/AcE=
@ -105,7 +123,9 @@ github.com/go-openapi/jsonreference v0.19.5/go.mod h1:RdybgQwPxbL4UEjuAruzK1x3nE
github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk=
github.com/go-openapi/swag v0.19.14 h1:gm3vOOXfiuw5i9p5N9xJvfjvuofpyvLA9Wr6QfK5Fng=
github.com/go-openapi/swag v0.19.14/go.mod h1:QYRuS/SOXUCsnplDa677K7+DxSOj6IPNl/eQntq43wQ=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
@ -189,13 +209,21 @@ github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:
github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
@ -206,17 +234,22 @@ github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN
github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.7.6 h1:8yTIVnZgCoiM1TgqoeTl+LfU5Jg6/xL3QhGQnimLYnA=
github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/moby/spdystream v0.2.0/go.mod h1:f7i0iNDQJ059oMTcWxx8MA/zKFIuD/lY+0GqbN2Wy8c=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
@ -238,18 +271,45 @@ github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAl
github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs=
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_golang v1.12.2 h1:51L9cDoUHVrXx4zWYlcLQIZ+d+VXHgqnYKkIuq4g/34=
github.com/prometheus/client_golang v1.12.2/go.mod h1:3Z9XVyYiZYEO+YQWt3RD2R3jrbd179Rt297l4aS6nDY=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.2.0 h1:uq5h0d+GuxiXLJLNABMgp2qUWDPiLvgCzz2dUR+/W/M=
github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc=
github.com/prometheus/common v0.32.1 h1:hWIdL3N2HoUx3B8j3YN9mWor0qhY/NlEKZEaXxuIRh4=
github.com/prometheus/common v0.32.1/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU=
github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
@ -279,6 +339,7 @@ go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9i
go.uber.org/zap v1.19.0/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
go.uber.org/zap v1.19.1 h1:ue41HOKd1vGURxrmeKIgELGb3jPW9DMUDGtsinblHwI=
go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
@ -323,6 +384,7 @@ golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
@ -330,6 +392,7 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/net v0.0.0-20190501004415-9ce7a6920f09/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
@ -358,6 +421,7 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v
golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd h1:O7DYs+zxREGLKzKoMQrtrEacpb0ZVXA5rIwylE2Xchk=
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
@ -372,6 +436,7 @@ golang.org/x/oauth2 v0.0.0-20201208152858-08078c50e5b5/go.mod h1:KelEdhl1UZF7XfJ
golang.org/x/oauth2 v0.0.0-20210218202405-ba52d332ba99/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20210220000619-9bb904979d93/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20210313182246-cd4f82c27b84/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 h1:RerP+noqYHUQ8CMRcPlC2nvTa4dcBIjegkuWdcUDuqg=
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@ -386,10 +451,13 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -401,6 +469,7 @@ golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200113162924-86b910548bc1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -414,6 +483,8 @@ golang.org/x/sys v0.0.0-20200511232937-7e40ca221e25/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200515095857-1151b9dac4a9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200523222454-059865788121/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -422,6 +493,7 @@ golang.org/x/sys v0.0.0-20201201145000-ef89a241ccb3/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210104204734-6f8348627aad/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210220050731-9a76102bfb43/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210305230114-8fe3ee5dd75b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210315160823-c6e025ad8005/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -429,9 +501,11 @@ golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220209214540-3681064d5158 h1:rm+CHSpPEEW2IsXUib1ThaHIjuBVZjxNgSKmBLFfD4c=
golang.org/x/sys v0.0.0-20220209214540-3681064d5158/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
@ -610,6 +684,7 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
@ -626,6 +701,7 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWD
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=


@ -55,6 +55,8 @@ var (
currentContainerVersion string
additionalEnvFile string
processCount int
enablePprof bool
listenAddress string
)
type executionMode string
@ -102,6 +104,8 @@ func main() {
pflag.StringVar(&mainContainerVersion, "main-container-version", "", "For sidecar mode, this specifies the version of the main container. If this is equal to the current container version, no files will be copied")
pflag.StringVar(&additionalEnvFile, "additional-env-file", "", "A file with additional environment variables to use when interpreting the monitor configuration")
pflag.IntVar(&processCount, "process-count", 1, "The number of processes to start")
pflag.BoolVar(&enablePprof, "enable-pprof", false, "Enables /debug/pprof endpoints on the listen address")
pflag.StringVar(&listenAddress, "listen-address", ":8081", "An address and port to listen on")
pflag.Parse()
logger := zapr.NewLogger(initLogger(logPath))
@ -126,7 +130,7 @@ func main() {
logger.Error(err, "Error loading additional environment")
os.Exit(1)
}
StartMonitor(logger, fmt.Sprintf("%s/%s", inputDir, monitorConfFile), customEnvironment, processCount)
StartMonitor(logger, fmt.Sprintf("%s/%s", inputDir, monitorConfFile), customEnvironment, processCount, listenAddress, enablePprof)
case executionModeInit:
err = CopyFiles(logger, outputDir, copyDetails, requiredCopies)
if err != nil {


@ -24,6 +24,8 @@ import (
"encoding/json"
"fmt"
"io"
"net/http"
"net/http/pprof"
"os"
"os/exec"
"os/signal"
@ -33,6 +35,7 @@ import (
"time"
"k8s.io/utils/pointer"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/apple/foundationdb/fdbkubernetesmonitor/api"
"github.com/fsnotify/fsnotify"
@ -88,7 +91,7 @@ type Monitor struct {
}
// StartMonitor starts the monitor loop.
func StartMonitor(logger logr.Logger, configFile string, customEnvironment map[string]string, processCount int) {
func StartMonitor(logger logr.Logger, configFile string, customEnvironment map[string]string, processCount int, listenAddr string, enableDebug bool) {
podClient, err := CreatePodClient(logger)
if err != nil {
panic(err)
@ -103,6 +106,33 @@ func StartMonitor(logger logr.Logger, configFile string, customEnvironment map[s
}
go func() { monitor.WatchPodTimestamps() }()
mux := http.NewServeMux()
// Enable pprof endpoints for debugging purposes.
if enableDebug {
mux.Handle("/debug/pprof/heap", pprof.Handler("heap"))
mux.Handle("/debug/pprof/goroutine", pprof.Handler("goroutine"))
mux.Handle("/debug/pprof/threadcreate", pprof.Handler("threadcreate"))
mux.Handle("/debug/pprof/allocs", pprof.Handler("allocs"))
mux.Handle("/debug/pprof/block", pprof.Handler("block"))
mux.Handle("/debug/pprof/mutex", pprof.Handler("mutex"))
mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
}
// Add Prometheus support
mux.Handle("/metrics", promhttp.Handler())
go func() {
err := http.ListenAndServe(listenAddr, mux)
if err != nil {
logger.Error(err, "could not start HTTP server")
os.Exit(1)
}
}()
monitor.Run()
}

View File

@ -164,6 +164,8 @@ set(FDBSERVER_SRCS
TCInfo.h
template_fdb.h
tester.actor.cpp
TenantCache.actor.cpp
TenantCache.h
TesterInterface.actor.h
TLogInterface.h
TLogServer.actor.cpp

View File

@ -28,6 +28,7 @@
#include "fdbclient/SystemData.h"
#include "fdbrpc/FailureMonitor.h"
#include "fdbserver/EncryptKeyProxyInterface.h"
#include "fdbserver/Knobs.h"
#include "flow/ActorCollection.h"
#include "fdbclient/ClusterConnectionMemoryRecord.h"
#include "fdbclient/NativeAPI.actor.h"
@ -208,11 +209,13 @@ ACTOR Future<Void> handleLeaderReplacement(Reference<ClusterRecoveryData> self,
ACTOR Future<Void> clusterWatchDatabase(ClusterControllerData* cluster,
ClusterControllerData::DBInfo* db,
ServerCoordinators coordinators,
Future<Void> leaderFail) {
Future<Void> leaderFail,
Future<Void> recoveredDiskFiles) {
state MasterInterface iMaster;
state Reference<ClusterRecoveryData> recoveryData;
state PromiseStream<Future<Void>> addActor;
state Future<Void> recoveryCore;
state bool recoveredDisk = false;
// SOMEDAY: If there is already a non-failed master referenced by zkMasterInfo, use that one until it fails
// When this someday is implemented, make sure forced failures still cause the master to be recruited again
@ -254,6 +257,18 @@ ACTOR Future<Void> clusterWatchDatabase(ClusterControllerData* cluster,
.detail("ChangeID", dbInfo.id);
db->serverInfo->set(dbInfo);
if (SERVER_KNOBS->ENABLE_ENCRYPTION && !recoveredDisk) {
// EKP singleton recruitment waits for 'Master/Sequencer' recruitment, so execute the wait on
// 'recoveredDiskFiles' only once EKP recruitment is unblocked, to avoid a circular dependency
// with StorageServer initialization. Waiting on recoveredDiskFiles ensures that the worker
// server on the same process has registered with the new CC before recruitment.
wait(recoveredDiskFiles);
TraceEvent("CCWDB_RecoveredDiskFiles", cluster->id).log();
// This only needs to be done once in the lifetime of the ClusterController
recoveredDisk = true;
}
state Future<Void> spinDelay = delay(
SERVER_KNOBS
->MASTER_SPIN_DELAY); // Don't retry cluster recovery more than once per second, but don't delay
@ -2511,7 +2526,8 @@ ACTOR Future<Void> clusterControllerCore(ClusterControllerFullInterface interf,
Future<Void> leaderFail,
ServerCoordinators coordinators,
LocalityData locality,
ConfigDBType configDBType) {
ConfigDBType configDBType,
Future<Void> recoveredDiskFiles) {
state ClusterControllerData self(interf, locality, coordinators);
state ConfigBroadcaster configBroadcaster(coordinators, configDBType);
state Future<Void> coordinationPingDelay = delay(SERVER_KNOBS->WORKER_COORDINATION_PING_DELAY);
@ -2522,7 +2538,8 @@ ACTOR Future<Void> clusterControllerCore(ClusterControllerFullInterface interf,
if (SERVER_KNOBS->ENABLE_ENCRYPTION || g_network->isSimulated()) {
self.addActor.send(monitorEncryptKeyProxy(&self));
}
self.addActor.send(clusterWatchDatabase(&self, &self.db, coordinators, leaderFail)); // Start the master database
self.addActor.send(clusterWatchDatabase(
&self, &self.db, coordinators, leaderFail, recoveredDiskFiles)); // Start the master database
self.addActor.send(self.updateWorkerList.init(self.db.db));
self.addActor.send(statusServer(interf.clientInterface.databaseStatus.getFuture(),
&self,
@ -2651,7 +2668,8 @@ ACTOR Future<Void> clusterController(ServerCoordinators coordinators,
bool hasConnected,
Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo,
LocalityData locality,
ConfigDBType configDBType) {
ConfigDBType configDBType,
Future<Void> recoveredDiskFiles) {
loop {
state ClusterControllerFullInterface cci;
state bool inRole = false;
@ -2678,7 +2696,7 @@ ACTOR Future<Void> clusterController(ServerCoordinators coordinators,
startRole(Role::CLUSTER_CONTROLLER, cci.id(), UID());
inRole = true;
wait(clusterControllerCore(cci, leaderFail, coordinators, locality, configDBType));
wait(clusterControllerCore(cci, leaderFail, coordinators, locality, configDBType, recoveredDiskFiles));
}
} catch (Error& e) {
if (inRole)
@ -2703,12 +2721,27 @@ ACTOR Future<Void> clusterController(Reference<IClusterConnectionRecord> connRec
Future<Void> recoveredDiskFiles,
LocalityData locality,
ConfigDBType configDBType) {
wait(recoveredDiskFiles);
// Defer this wait optimization if the cluster configuration has 'Encryption data at-rest' enabled.
// Encryption depends on the availability of the EncryptKeyProxy (EKP) FDB role to fetch/refresh encryption keys
// created and managed by an external KeyManagementService (KMS).
//
// The wait optimization ensures the worker server on the same process gets registered with the new CC before
// recruitment. TODO: Unify the codepath for the encryption-enabled and encryption-disabled scenarios.
if (!SERVER_KNOBS->ENABLE_ENCRYPTION) {
wait(recoveredDiskFiles);
TraceEvent("RecoveredDiskFiles").log();
} else {
TraceEvent("RecoveredDiskFiles_Deferred").log();
}
state bool hasConnected = false;
loop {
try {
ServerCoordinators coordinators(connRecord);
wait(clusterController(coordinators, currentCC, hasConnected, asyncPriorityInfo, locality, configDBType));
wait(clusterController(
coordinators, currentCC, hasConnected, asyncPriorityInfo, locality, configDBType, recoveredDiskFiles));
hasConnected = true;
} catch (Error& e) {
if (e.code() != error_code_coordinators_changed)

View File

@ -465,6 +465,7 @@ public:
[compactionVersion](const auto& va) { return va.version > compactionVersion; });
annotationHistory.erase(annotationHistory.begin(), it);
}
lastCompactedVersion = compactionVersion;
}
Future<Void> getError() const { return consumerFuture || actors.getResult(); }

View File

@ -299,6 +299,8 @@ public:
return readFrom.checkEventually(member, value);
}
JsonBuilderObject getStatus() const { return broadcaster.getStatus(); }
void changeBroadcaster() {
broadcastServer.cancel();
cbi->set(ConfigBroadcastInterface{});
@ -453,6 +455,8 @@ public:
void restartNode() { writeTo.restartNode(); }
JsonBuilderObject getStatus() const { return broadcaster.getStatus(); }
void changeBroadcaster() {
broadcastServer.cancel();
cbi->set(ConfigBroadcastInterface{});
@ -565,7 +569,7 @@ Future<Void> testRestartLocalConfigAndChangeClass(UnitTestParameters params) {
}
ACTOR template <class Env>
Future<Void> testNewLocalConfigAfterCompaction(UnitTestParameters params) {
Future<std::string> testNewLocalConfigAfterCompaction(UnitTestParameters params) {
state Env env(params.getDataDir(), "class-A");
wait(env.setup(ConfigClassSet({ "class-A"_sr })));
wait(set(env, "class-A"_sr, "test_long"_sr, int64_t{ 1 }));
@ -581,7 +585,7 @@ Future<Void> testNewLocalConfigAfterCompaction(UnitTestParameters params) {
wait(check(env, &TestKnobs::TEST_LONG, Optional<int64_t>{ 1 }));
wait(set(env, "class-A"_sr, "test_long"_sr, 2));
wait(check(env, &TestKnobs::TEST_LONG, Optional<int64_t>{ 2 }));
return Void();
return env.getStatus().getJson();
}
ACTOR template <class Env>
@ -837,7 +841,24 @@ TEST_CASE("/fdbserver/ConfigDB/BroadcasterToLocalConfig/Compact") {
}
TEST_CASE("/fdbserver/ConfigDB/BroadcasterToLocalConfig/RestartLocalConfigurationAfterCompaction") {
wait(testNewLocalConfigAfterCompaction<BroadcasterToLocalConfigEnvironment>(params));
std::string statusStr = wait(testNewLocalConfigAfterCompaction<BroadcasterToLocalConfigEnvironment>(params));
json_spirit::mValue status;
ASSERT(json_spirit::read_string(statusStr, status));
ASSERT(status.type() == json_spirit::obj_type);
auto lastCompacted = status.get_obj().at("last_compacted_version").get_int64();
ASSERT_EQ(lastCompacted, 1);
auto mostRecent = status.get_obj().at("most_recent_version").get_int64();
ASSERT_EQ(mostRecent, 2);
auto commits = status.get_obj().at("commits");
// The unit test does not include annotations when running the set
// operation.
ASSERT_EQ(commits.get_array().size(), 0);
auto mutations = status.get_obj().at("mutations");
ASSERT_EQ(mutations.get_array().size(), 1);
auto snapshot = status.get_obj().at("snapshot");
auto classA = snapshot.get_obj().at("class-A");
auto value = classA.get_obj().at("test_long").get_str();
ASSERT(value == "int64_t:2");
return Void();
}
@ -886,7 +907,10 @@ TEST_CASE("/fdbserver/ConfigDB/TransactionToLocalConfig/CompactNode") {
}
TEST_CASE("/fdbserver/ConfigDB/TransactionToLocalConfig/RestartLocalConfigurationAfterCompaction") {
wait(testNewLocalConfigAfterCompaction<TransactionToLocalConfigEnvironment>(params));
// TransactionToLocalConfigEnvironment only calls into ConfigNode compaction.
// It does not interact with the broadcaster, so the status JSON won't be
// updated.
wait(success(testNewLocalConfigAfterCompaction<TransactionToLocalConfigEnvironment>(params)));
return Void();
}

View File

@ -3830,7 +3830,8 @@ int DDTeamCollection::overlappingMachineMembers(std::vector<Standalone<StringRef
void DDTeamCollection::addTeam(const std::vector<Reference<TCServerInfo>>& newTeamServers,
IsInitialTeam isInitialTeam,
IsRedundantTeam redundantTeam) {
auto teamInfo = makeReference<TCTeamInfo>(newTeamServers);
Optional<Reference<TCTenantInfo>> no_tenant = {};
auto teamInfo = makeReference<TCTeamInfo>(newTeamServers, no_tenant);
// Move satisfiesPolicy to the end for performance benefit
auto badTeam = IsBadTeam{ redundantTeam || teamInfo->size() != configuration.storageTeamSize ||

View File

@ -49,6 +49,7 @@
#include "flow/UnitTest.h"
class TCTeamInfo;
class TCTenantInfo;
class TCMachineInfo;
class TCMachineTeamInfo;

View File

@ -19,6 +19,7 @@
*/
#include <set>
#include <string>
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/FDBOptions.g.h"
@ -28,6 +29,7 @@
#include "fdbclient/RunTransaction.actor.h"
#include "fdbclient/StorageServerInterface.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/Tenant.h"
#include "fdbrpc/Replication.h"
#include "fdbserver/DataDistribution.actor.h"
#include "fdbserver/DDTeamCollection.h"
@ -37,6 +39,7 @@
#include "fdbserver/MoveKeys.actor.h"
#include "fdbserver/QuietDatabase.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/TenantCache.h"
#include "fdbserver/TLogInterface.h"
#include "fdbserver/WaitFailure.h"
#include "flow/ActorCollection.h"
@ -513,6 +516,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
state Reference<DDTeamCollection> primaryTeamCollection;
state Reference<DDTeamCollection> remoteTeamCollection;
state bool trackerCancelled;
state bool ddIsTenantAware = SERVER_KNOBS->DD_TENANT_AWARENESS_ENABLED;
loop {
trackerCancelled = false;
@ -597,6 +601,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
// mode may be set true by system operator using fdbcli and isDDEnabled() set to true
break;
}
TraceEvent("DataDistributionDisabled", self->ddId).log();
TraceEvent("MovingData", self->ddId)
@ -637,6 +642,12 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
TraceEvent("DataDistributionEnabled").log();
}
state Reference<TenantCache> ddTenantCache;
if (ddIsTenantAware) {
ddTenantCache = makeReference<TenantCache>(cx, self->ddId);
wait(ddTenantCache->build(cx));
}
// When/If this assertion fails, Evan owes Ben a pat on the back for his foresight
ASSERT(configuration.storageTeamSize > 0);
@ -705,6 +716,10 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
} else {
anyZeroHealthyTeams = zeroHealthyTeams[0];
}
if (ddIsTenantAware) {
actors.push_back(reportErrorsExcept(
ddTenantCache->monitorTenantMap(), "DDTenantCacheMonitor", self->ddId, &normalDDQueueErrors()));
}
actors.push_back(pollMoveKeysLock(cx, lock, ddEnabledState));
actors.push_back(reportErrorsExcept(dataDistributionTracker(initData,

View File

@ -96,12 +96,19 @@ public:
.detail("HasMessage", self->cursor->hasMessage())
.detail("Version", self->cursor->version().version);
if (self->cursor->popped() != 0 || (!self->hasDiscardedData && BUGGIFY_WITH_PROB(0.01))) {
bool buggify = !self->hasDiscardedData && BUGGIFY_WITH_PROB(0.01);
if (self->cursor->popped() != 0 || buggify) {
TraceEvent(SevWarnAlways, "DiskQueueAdapterReset")
.detail("Version", self->cursor->popped())
.detail("PeekTypeSwitch", self->peekTypeSwitches % 3);
TEST(true); // disk adapter reset
TraceEvent(SevWarnAlways, "DiskQueueAdapterReset").detail("Version", self->cursor->popped());
if (self->cursor->popped() != 0) {
self->recoveryLoc = self->cursor->popped();
} else {
self->recoveryLoc = self->startLoc;
}
self->recoveryQueue.clear();
self->recoveryQueueDataSize = 0;
self->recoveryLoc = self->cursor->popped();
self->recoveryQueueLoc = self->recoveryLoc;
self->totalRecoveredBytes = 0;
if (self->peekTypeSwitches % 3 == 1) {

View File

@ -61,8 +61,8 @@ public:
Version txsPoppedVersion,
bool recover)
: peekLocality(peekLocality), peekTypeSwitches(0), enableRecovery(recover), logSystem(logSystem),
recoveryLoc(txsPoppedVersion), recoveryQueueLoc(txsPoppedVersion), recoveryQueueDataSize(0), poppedUpTo(0),
nextCommit(1), hasDiscardedData(false), totalRecoveredBytes(0) {
startLoc(txsPoppedVersion), recoveryLoc(txsPoppedVersion), recoveryQueueLoc(txsPoppedVersion),
recoveryQueueDataSize(0), poppedUpTo(0), nextCommit(1), hasDiscardedData(false), totalRecoveredBytes(0) {
if (enableRecovery) {
localityChanged = peekLocality ? peekLocality->onChange() : Never();
cursor = logSystem->peekTxs(UID(),
@ -127,7 +127,7 @@ private:
// Recovery state (used while readNext() is being called repeatedly)
bool enableRecovery;
Reference<ILogSystem> logSystem;
Version recoveryLoc, recoveryQueueLoc;
Version startLoc, recoveryLoc, recoveryQueueLoc;
std::vector<Standalone<StringRef>> recoveryQueue;
int recoveryQueueDataSize;

View File

@ -328,6 +328,7 @@ class PaxosConfigConsumerImpl {
}
try {
wait(timeoutError(waitForAll(compactionRequests), 1.0));
broadcaster->compact(compactionVersion);
} catch (Error& e) {
TraceEvent(SevWarn, "ErrorSendingCompactionRequest").error(e);
}

View File

@ -3149,12 +3149,10 @@ ACTOR Future<StatusReply> clusterGetStatus(
statusObj["workload"] = workerStatuses[1];
statusObj["layers"] = workerStatuses[2];
/*
if (configBroadcaster) {
// TODO: Read from coordinators for more up-to-date config database status?
statusObj["configuration_database"] = configBroadcaster->getStatus();
// TODO: Read from coordinators for more up-to-date config database status?
statusObj["configuration_database"] = configBroadcaster->getStatus();
}
*/
// Add qos section if it was populated
if (!qos.empty())

View File

@ -307,9 +307,9 @@ std::string TCMachineTeamInfo::getMachineIDsStr() const {
return std::move(ss).str();
}
TCTeamInfo::TCTeamInfo(std::vector<Reference<TCServerInfo>> const& servers)
: servers(servers), healthy(true), wrongConfiguration(false), priority(SERVER_KNOBS->PRIORITY_TEAM_HEALTHY),
id(deterministicRandom()->randomUniqueID()) {
TCTeamInfo::TCTeamInfo(std::vector<Reference<TCServerInfo>> const& servers, Optional<Reference<TCTenantInfo>> tenant)
: servers(servers), tenant(tenant), healthy(true), wrongConfiguration(false),
priority(SERVER_KNOBS->PRIORITY_TEAM_HEALTHY), id(deterministicRandom()->randomUniqueID()) {
if (servers.empty()) {
TraceEvent(SevInfo, "ConstructTCTeamFromEmptyServers").log();
}

View File

@ -20,9 +20,13 @@
#pragma once
#include "fdbclient/Tenant.h"
#include "fdbserver/DDTeamCollection.h"
#include "flow/Arena.h"
#include "flow/FastRef.h"
class TCTeamInfo;
class TCTenantInfo;
class TCMachineInfo;
class TCMachineTeamInfo;
@ -166,6 +170,7 @@ class TCTeamInfo final : public ReferenceCounted<TCTeamInfo>, public IDataDistri
friend class TCTeamInfoImpl;
std::vector<Reference<TCServerInfo>> servers;
std::vector<UID> serverIDs;
Optional<Reference<TCTenantInfo>> tenant;
bool healthy;
bool wrongConfiguration; // True if any of the servers in the team have the wrong configuration
int priority;
@ -175,7 +180,9 @@ public:
Reference<TCMachineTeamInfo> machineTeam;
Future<Void> tracker;
explicit TCTeamInfo(std::vector<Reference<TCServerInfo>> const& servers);
explicit TCTeamInfo(std::vector<Reference<TCServerInfo>> const& servers, Optional<Reference<TCTenantInfo>> tenant);
Optional<Reference<TCTenantInfo>>& getTenant() { return tenant; }
std::string getTeamID() const override { return id.shortString(); }
@ -235,3 +242,24 @@ private:
bool allServersHaveHealthyAvailableSpace() const;
};
class TCTenantInfo : public ReferenceCounted<TCTenantInfo> {
private:
TenantInfo m_tenantInfo;
Key m_prefix;
std::vector<Reference<TCTeamInfo>> m_tenantTeams;
int64_t m_cacheGeneration;
public:
TCTenantInfo() { m_prefix = allKeys.end; }
TCTenantInfo(TenantInfo tinfo, Key prefix) : m_tenantInfo(tinfo), m_prefix(prefix) {}
std::vector<Reference<TCTeamInfo>>& teams() { return m_tenantTeams; }
TenantName name() { return m_tenantInfo.name.get(); }
std::string prefixDesc() { return m_prefix.printable(); }
void addTeam(TCTeamInfo team);
void removeTeam(TCTeamInfo team);
void updateCacheGeneration(int64_t generation) { m_cacheGeneration = generation; }
int64_t cacheGeneration() const { return m_cacheGeneration; }
};

View File

@ -0,0 +1,304 @@
/*
* TenantCache.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbserver/DDTeamCollection.h"
#include "fdbserver/TenantCache.h"
#include <limits>
#include <string>
#include "flow/actorcompiler.h"
class TenantCacheImpl {
ACTOR static Future<RangeResult> getTenantList(TenantCache* tenantCache, Transaction* tr) {
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE);
state Future<RangeResult> tenantList = tr->getRange(tenantMapKeys, CLIENT_KNOBS->TOO_MANY);
wait(success(tenantList));
ASSERT(!tenantList.get().more && tenantList.get().size() < CLIENT_KNOBS->TOO_MANY);
return tenantList.get();
}
public:
ACTOR static Future<Void> build(TenantCache* tenantCache) {
state Transaction tr(tenantCache->dbcx());
TraceEvent(SevInfo, "BuildingTenantCache", tenantCache->id()).log();
try {
state RangeResult tenantList = wait(getTenantList(tenantCache, &tr));
for (int i = 0; i < tenantList.size(); i++) {
TenantName tname = tenantList[i].key.removePrefix(tenantMapPrefix);
TenantMapEntry t = decodeTenantEntry(tenantList[i].value);
tenantCache->insert(tname, t);
TraceEvent(SevInfo, "TenantFound", tenantCache->id())
.detail("TenantName", tname)
.detail("TenantID", t.id)
.detail("TenantPrefix", t.prefix);
}
} catch (Error& e) {
wait(tr.onError(e));
}
TraceEvent(SevInfo, "BuiltTenantCache", tenantCache->id()).log();
return Void();
}
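// Long-running monitor: periodically re-reads the tenant list and reconciles the cache with a
// startRefresh()/update()/cleanup() sweep, warning if a refresh is delayed past twice
// TENANT_CACHE_LIST_REFRESH_INTERVAL.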
ACTOR static Future<Void> monitorTenantMap(TenantCache* tenantCache) {
TraceEvent(SevInfo, "StartingTenantCacheMonitor", tenantCache->id()).log();
state Transaction tr(tenantCache->dbcx());
state double lastTenantListFetchTime = now();
loop {
try {
if (now() - lastTenantListFetchTime > (2 * SERVER_KNOBS->TENANT_CACHE_LIST_REFRESH_INTERVAL)) {
TraceEvent(SevWarn, "TenantListRefreshDelay", tenantCache->id()).log();
}
state RangeResult tenantList = wait(getTenantList(tenantCache, &tr));
tenantCache->startRefresh();
bool tenantListUpdated = false;
for (int i = 0; i < tenantList.size(); i++) {
TenantName tname = tenantList[i].key.removePrefix(tenantMapPrefix);
TenantMapEntry t = decodeTenantEntry(tenantList[i].value);
if (tenantCache->update(tname, t)) {
tenantListUpdated = true;
}
}
if (tenantCache->cleanup()) {
tenantListUpdated = true;
}
if (tenantListUpdated) {
TraceEvent(SevInfo, "TenantCache", tenantCache->id()).detail("List", tenantCache->desc());
}
lastTenantListFetchTime = now();
tr.reset();
wait(delay(SERVER_KNOBS->TENANT_CACHE_LIST_REFRESH_INTERVAL));
} catch (Error& e) {
if (e.code() != error_code_actor_cancelled) {
TraceEvent("TenantCacheGetTenantListError", tenantCache->id())
.errorUnsuppressed(e)
.suppressFor(1.0);
}
wait(tr.onError(e));
}
}
}
};
void TenantCache::insert(TenantName& tenantName, TenantMapEntry& tenant) {
KeyRef tenantPrefix(tenant.prefix.begin(), tenant.prefix.size());
ASSERT(tenantCache.find(tenantPrefix) == tenantCache.end());
TenantInfo tenantInfo(tenantName, tenant.id);
tenantCache[tenantPrefix] = makeReference<TCTenantInfo>(tenantInfo, tenant.prefix);
tenantCache[tenantPrefix]->updateCacheGeneration(generation);
}
void TenantCache::startRefresh() {
ASSERT(generation < std::numeric_limits<uint64_t>::max());
generation++;
}
void TenantCache::keep(TenantName& tenantName, TenantMapEntry& tenant) {
KeyRef tenantPrefix(tenant.prefix.begin(), tenant.prefix.size());
ASSERT(tenantCache.find(tenantPrefix) != tenantCache.end());
tenantCache[tenantPrefix]->updateCacheGeneration(generation);
}
bool TenantCache::update(TenantName& tenantName, TenantMapEntry& tenant) {
KeyRef tenantPrefix(tenant.prefix.begin(), tenant.prefix.size());
if (tenantCache.find(tenantPrefix) != tenantCache.end()) {
keep(tenantName, tenant);
return false;
}
insert(tenantName, tenant);
return true;
}
int TenantCache::cleanup() {
int tenantsRemoved = 0;
std::vector<Key> keysToErase;
for (auto& t : tenantCache) {
ASSERT(t.value->cacheGeneration() <= generation);
if (t.value->cacheGeneration() != generation) {
keysToErase.push_back(t.key);
}
}
for (auto& k : keysToErase) {
tenantCache.erase(k);
tenantsRemoved++;
}
return tenantsRemoved;
}
std::string TenantCache::desc() const {
std::string s("@Generation: ");
s += std::to_string(generation) + " ";
int count = 0;
for (auto& [tenantPrefix, tenant] : tenantCache) {
if (count) {
s += ", ";
}
s += "Name: " + tenant->name().toString() + " Prefix: " + tenantPrefix.printable();
count++;
}
return s;
}
bool TenantCache::isTenantKey(KeyRef key) const {
auto it = tenantCache.lastLessOrEqual(key);
if (it == tenantCache.end()) {
return false;
}
if (!key.startsWith(it->key)) {
return false;
}
return true;
}
Future<Void> TenantCache::build(Database cx) {
return TenantCacheImpl::build(this);
}
Future<Void> TenantCache::monitorTenantMap() {
return TenantCacheImpl::monitorTenantMap(this);
}
class TenantCacheUnitTest {
public:
ACTOR static Future<Void> InsertAndTestPresence() {
wait(Future<Void>(Void()));
Database cx;
TenantCache tenantCache(cx, UID(1, 0));
constexpr static uint16_t tenantLimit = 64;
uint16_t tenantCount = deterministicRandom()->randomInt(1, tenantLimit);
uint16_t tenantNumber = deterministicRandom()->randomInt(0, std::numeric_limits<uint16_t>::max());
for (uint16_t i = 0; i < tenantCount; i++) {
TenantName tenantName(format("%s_%08d", "ddtc_test_tenant", tenantNumber + i));
TenantMapEntry tenant(tenantNumber + i, KeyRef());
tenantCache.insert(tenantName, tenant);
}
for (int i = 0; i < tenantLimit; i++) {
Key k(format("%d", i));
ASSERT(tenantCache.isTenantKey(k.withPrefix(TenantMapEntry::idToPrefix(tenantNumber + (i % tenantCount)))));
ASSERT(!tenantCache.isTenantKey(k.withPrefix(allKeys.begin)));
ASSERT(!tenantCache.isTenantKey(k));
}
return Void();
}
ACTOR static Future<Void> RefreshAndTestPresence() {
wait(Future<Void>(Void()));
Database cx;
TenantCache tenantCache(cx, UID(1, 0));
constexpr static uint16_t tenantLimit = 64;
uint16_t tenantCount = deterministicRandom()->randomInt(1, tenantLimit);
uint16_t tenantNumber = deterministicRandom()->randomInt(0, std::numeric_limits<uint16_t>::max());
for (uint16_t i = 0; i < tenantCount; i++) {
TenantName tenantName(format("%s_%08d", "ddtc_test_tenant", tenantNumber + i));
TenantMapEntry tenant(tenantNumber + i, KeyRef());
tenantCache.insert(tenantName, tenant);
}
uint16_t staleTenantFraction = deterministicRandom()->randomInt(1, 8);
tenantCache.startRefresh();
int keepCount = 0, removeCount = 0;
for (int i = 0; i < tenantCount; i++) {
uint16_t tenantOrdinal = tenantNumber + i;
if (tenantOrdinal % staleTenantFraction != 0) {
TenantName tenantName(format("%s_%08d", "ddtc_test_tenant", tenantOrdinal));
TenantMapEntry tenant(tenantOrdinal, KeyRef());
bool newTenant = tenantCache.update(tenantName, tenant);
ASSERT(!newTenant);
keepCount++;
} else {
removeCount++;
}
}
int tenantsRemoved = tenantCache.cleanup();
ASSERT(tenantsRemoved == removeCount);
int keptCount = 0, removedCount = 0;
for (int i = 0; i < tenantCount; i++) {
uint16_t tenantOrdinal = tenantNumber + i;
Key k(format("%d", i));
if (tenantOrdinal % staleTenantFraction != 0) {
ASSERT(tenantCache.isTenantKey(k.withPrefix(TenantMapEntry::idToPrefix(tenantOrdinal))));
keptCount++;
} else {
ASSERT(!tenantCache.isTenantKey(k.withPrefix(TenantMapEntry::idToPrefix(tenantOrdinal))));
removedCount++;
}
}
ASSERT(keepCount == keptCount);
ASSERT(removeCount == removedCount);
return Void();
}
};
TEST_CASE("/TenantCache/InsertAndTestPresence") {
wait(TenantCacheUnitTest::InsertAndTestPresence());
return Void();
}
TEST_CASE("/TenantCache/RefreshAndTestPresence") {
wait(TenantCacheUnitTest::RefreshAndTestPresence());
return Void();
}
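isTenantKey() above resolves a key to a tenant by locating the greatest cached prefix that is less than or equal to the key and then verifying that the key actually starts with that prefix. A standalone sketch of that lookup, using std::map instead of the FDB IndexedSet and illustrative string prefixes rather than real tenant prefixes:

// Standalone sketch (not FDB code): the "greatest prefix <= key, then verify startsWith"
// lookup that isTenantKey() performs with lastLessOrEqual, expressed with std::map.
#include <iostream>
#include <map>
#include <string>

bool isUnderKnownPrefix(const std::map<std::string, std::string>& prefixes, const std::string& key) {
	// upper_bound returns the first prefix strictly greater than key; the entry before it
	// (if any) is the greatest prefix <= key, i.e. the only candidate for a match.
	auto it = prefixes.upper_bound(key);
	if (it == prefixes.begin())
		return false;
	--it;
	return key.compare(0, it->first.size(), it->first) == 0; // key starts with the candidate prefix
}

int main() {
	std::map<std::string, std::string> prefixes = { { "/tenantA/", "A" }, { "/tenantB/", "B" } };
	std::cout << isUnderKnownPrefix(prefixes, "/tenantA/some_key") << "\n"; // prints 1 (match)
	std::cout << isUnderKnownPrefix(prefixes, "/other/key") << "\n";        // prints 0 (no match)
	return 0;
}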

72
fdbserver/TenantCache.h Normal file
View File

@ -0,0 +1,72 @@
/*
* TenantCache.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/FDBTypes.h"
#include "fdbclient/Tenant.h"
#include "fdbserver/DDTeamCollection.h"
#include "fdbserver/TCInfo.h"
#include "flow/IRandom.h"
#include "flow/IndexedSet.h"
#include <limits>
#include <string>
typedef Map<KeyRef, Reference<TCTenantInfo>> TenantMapByPrefix;
class TenantCache : public ReferenceCounted<TenantCache> {
friend class TenantCacheImpl;
friend class TenantCacheUnitTest;
private:
constexpr static uint64_t INVALID_GENERATION = std::numeric_limits<uint64_t>::max();
UID distributorID;
Database cx;
uint64_t generation;
TenantMapByPrefix tenantCache;
// Marks the start of a new sweep (generation) of the tenant cache
void startRefresh();
void insert(TenantName& tenantName, TenantMapEntry& tenant);
void keep(TenantName& tenantName, TenantMapEntry& tenant);
// Returns true if a new tenant was inserted into the cache
bool update(TenantName& tenantName, TenantMapEntry& tenant);
// Returns the number of tenants that were found to be stale and removed from the cache
int cleanup();
UID id() const { return distributorID; }
Database dbcx() const { return cx; }
public:
TenantCache(Database cx, UID distributorID) : distributorID(distributorID), cx(cx) {
generation = deterministicRandom()->randomUInt32();
}
Future<Void> build(Database cx);
Future<Void> monitorTenantMap();
std::string desc() const;
bool isTenantKey(KeyRef key) const;
};
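The startRefresh/update/cleanup trio declared above is a mark-and-sweep style refresh keyed on a monotonically increasing generation: bump the generation, re-stamp every entry still present in the source of truth, then evict whatever was not re-stamped. A minimal standalone sketch of that pattern (plain std::map and illustrative names, not the FDB types above):

// Standalone sketch (not FDB code) of the generation-sweep refresh protocol.
#include <cstdint>
#include <iostream>
#include <map>
#include <string>

struct SweepCache {
	uint64_t generation = 0;
	std::map<std::string, uint64_t> entries; // value = generation at which the entry was last seen

	void startRefresh() { ++generation; }

	// Returns true if the name was newly inserted, false if it was already cached.
	bool update(const std::string& name) {
		auto [it, inserted] = entries.try_emplace(name, generation);
		it->second = generation; // re-stamp existing entries with the current generation
		return inserted;
	}

	// Removes entries not seen in the current sweep; returns how many were evicted.
	int cleanup() {
		int removed = 0;
		for (auto it = entries.begin(); it != entries.end();) {
			if (it->second != generation) {
				it = entries.erase(it);
				++removed;
			} else {
				++it;
			}
		}
		return removed;
	}
};

int main() {
	SweepCache cache;
	cache.startRefresh();
	cache.update("tenantA");
	cache.update("tenantB");

	// Next sweep: only tenantA is still present, so tenantB is evicted.
	cache.startRefresh();
	cache.update("tenantA");
	std::cout << "evicted: " << cache.cleanup() << "\n"; // prints "evicted: 1"
	return 0;
}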

View File

@ -1629,7 +1629,7 @@ ACTOR Future<Void> runTests(Reference<AsyncVar<Optional<struct ClusterController
std::vector<Future<Void>> tenantFutures;
for (auto tenant : tenantsToCreate) {
TraceEvent("CreatingTenant").detail("Tenant", tenant);
tenantFutures.push_back(ManagementAPI::createTenant(cx.getReference(), tenant));
tenantFutures.push_back(success(ManagementAPI::createTenant(cx.getReference(), tenant)));
}
wait(waitForAll(tenantFutures));

View File

@ -206,30 +206,14 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
if (BGW_DEBUG) {
fmt::print("Setting up blob granule range for tenant {0}\n", name.printable());
}
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
state Optional<TenantMapEntry> entry = wait(ManagementAPI::createTenantTransaction(tr, name));
if (!entry.present()) {
// if tenant already exists because of retry, load it
wait(store(entry, ManagementAPI::tryGetTenantTransaction(tr, name)));
ASSERT(entry.present());
}
TenantMapEntry entry = wait(ManagementAPI::createTenant(cx.getReference(), name));
wait(tr->commit());
if (BGW_DEBUG) {
fmt::print("Set up blob granule range for tenant {0}: {1}\n",
name.printable(),
entry.get().prefix.printable());
}
return entry.get();
} catch (Error& e) {
wait(tr->onError(e));
}
if (BGW_DEBUG) {
fmt::print("Set up blob granule range for tenant {0}: {1}\n", name.printable(), entry.prefix.printable());
}
return entry;
}
std::string description() const override { return "BlobGranuleCorrectnessWorkload"; }
@ -855,6 +839,29 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
return delay(testDuration);
}
ACTOR Future<Void> checkTenantRanges(BlobGranuleCorrectnessWorkload* self,
Database cx,
Reference<ThreadData> threadData) {
// Check that reading blob granule ranges with a tenant name returns a valid set of ranges scoped to just that
// tenant, with no tenant prefix applied.
loop {
state Transaction tr(cx, threadData->tenantName);
try {
Standalone<VectorRef<KeyRangeRef>> ranges = wait(tr.getBlobGranuleRanges(normalKeys));
ASSERT(ranges.size() >= 1);
ASSERT(ranges.front().begin == normalKeys.begin);
ASSERT(ranges.back().end == normalKeys.end);
for (int i = 0; i < ranges.size() - 1; i++) {
ASSERT(ranges[i].end == ranges[i + 1].begin);
}
return Void();
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
ACTOR Future<bool> checkDirectory(Database cx,
BlobGranuleCorrectnessWorkload* self,
Reference<ThreadData> threadData) {
@ -891,6 +898,11 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
wait(self->waitFirstSnapshot(self, cx, threadData, false));
}
}
// read granule ranges with tenant and validate
if (BGW_DEBUG) {
fmt::print("Directory {0} checking tenant ranges\n", threadData->directoryID);
}
wait(self->checkTenantRanges(self, cx, threadData));
bool initialCheck = result;
result &= threadData->mismatches == 0 && (threadData->timeTravelTooOld == 0);

View File

@ -254,7 +254,7 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
if (prevPurgeVersion < maxPurgeVersion) {
newPurgeVersion = deterministicRandom()->randomInt64(prevPurgeVersion, maxPurgeVersion);
prevPurgeVersion = std::max(prevPurgeVersion, newPurgeVersion);
Key purgeKey = wait(cx->purgeBlobGranules(normalKeys, newPurgeVersion, false));
Key purgeKey = wait(cx->purgeBlobGranules(normalKeys, newPurgeVersion, {}, false));
wait(cx->waitPurgeGranulesComplete(purgeKey));
self->purges++;
} else {
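For context on the call-shape change above, here is a minimal sketch of issuing a purge and waiting for it to complete. It assumes the signature implied by the hunk (key range, purge version, an optional tenant for which {} means none, then the force flag); the actor name is illustrative.

// Sketch only: mirrors the calls in the hunk above. The assumption here is that the new
// third argument is an optional tenant name; passing {} purges the raw (non-tenant) range.
ACTOR Future<Void> purgeRangeAndWait(Database cx, KeyRange range, Version purgeVersion) {
	Key purgeKey = wait(cx->purgeBlobGranules(range, purgeVersion, {}, /*force=*/false));
	wait(cx->waitPurgeGranulesComplete(purgeKey));
	return Void();
}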

View File

@ -225,7 +225,7 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
// The last tenant will not be created
if (i < self->numTenants) {
tenantFutures.push_back(ManagementAPI::createTenant(cx.getReference(), tenantName));
tenantFutures.push_back(::success(ManagementAPI::createTenant(cx.getReference(), tenantName)));
self->createdTenants.insert(tenantName);
}
}

View File

@ -131,88 +131,129 @@ struct TenantManagementWorkload : TestWorkload {
}
ACTOR Future<Void> createTenant(Database cx, TenantManagementWorkload* self) {
state TenantName tenant = self->chooseTenantName(true);
state bool alreadyExists = self->createdTenants.count(tenant);
state OperationType operationType = TenantManagementWorkload::randomOperationType();
int numTenants = 1;
// For transaction-based operations, test creating multiple tenants in the same transaction
if (operationType == OperationType::SPECIAL_KEYS || operationType == OperationType::MANAGEMENT_TRANSACTION) {
numTenants = deterministicRandom()->randomInt(1, 5);
}
state bool alreadyExists = false;
state bool hasSystemTenant = false;
state std::set<TenantName> tenantsToCreate;
for (int i = 0; i < numTenants; ++i) {
TenantName tenant = self->chooseTenantName(true);
tenantsToCreate.insert(tenant);
alreadyExists = alreadyExists || self->createdTenants.count(tenant);
hasSystemTenant = hasSystemTenant || tenant.startsWith("\xff"_sr);
}
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
loop {
try {
if (operationType == OperationType::SPECIAL_KEYS) {
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
Key key = self->specialKeysTenantMapPrefix.withSuffix(tenant);
tr->set(key, ""_sr);
for (auto tenant : tenantsToCreate) {
tr->set(self->specialKeysTenantMapPrefix.withSuffix(tenant), ""_sr);
}
wait(tr->commit());
} else if (operationType == OperationType::MANAGEMENT_DATABASE) {
wait(ManagementAPI::createTenant(cx.getReference(), tenant));
ASSERT(tenantsToCreate.size() == 1);
wait(success(ManagementAPI::createTenant(cx.getReference(), *tenantsToCreate.begin())));
} else {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
Optional<TenantMapEntry> _ = wait(ManagementAPI::createTenantTransaction(tr, tenant));
Optional<Value> lastIdVal = wait(tr->get(tenantLastIdKey));
int64_t previousId = lastIdVal.present() ? TenantMapEntry::prefixToId(lastIdVal.get()) : -1;
std::vector<Future<Void>> createFutures;
for (auto tenant : tenantsToCreate) {
createFutures.push_back(
success(ManagementAPI::createTenantTransaction(tr, tenant, ++previousId)));
}
tr->set(tenantLastIdKey, TenantMapEntry::idToPrefix(previousId));
wait(waitForAll(createFutures));
wait(tr->commit());
}
if (operationType != OperationType::MANAGEMENT_DATABASE && alreadyExists) {
return Void();
if (operationType == OperationType::MANAGEMENT_DATABASE) {
ASSERT(!alreadyExists);
}
ASSERT(!alreadyExists);
ASSERT(!tenant.startsWith("\xff"_sr));
ASSERT(!hasSystemTenant);
state Optional<TenantMapEntry> entry = wait(ManagementAPI::tryGetTenant(cx.getReference(), tenant));
ASSERT(entry.present());
ASSERT(entry.get().id > self->maxId);
ASSERT(entry.get().prefix.startsWith(self->tenantSubspace));
state std::set<TenantName>::iterator tenantItr;
for (tenantItr = tenantsToCreate.begin(); tenantItr != tenantsToCreate.end(); ++tenantItr) {
if (self->createdTenants.count(*tenantItr)) {
continue;
}
self->maxId = entry.get().id;
self->createdTenants[tenant] = TenantState(entry.get().id, true);
state Optional<TenantMapEntry> entry =
wait(ManagementAPI::tryGetTenant(cx.getReference(), *tenantItr));
ASSERT(entry.present());
ASSERT(entry.get().id > self->maxId);
ASSERT(entry.get().prefix.startsWith(self->tenantSubspace));
state bool insertData = deterministicRandom()->random01() < 0.5;
if (insertData) {
state Transaction insertTr(cx, tenant);
loop {
try {
insertTr.set(self->keyName, tenant);
wait(insertTr.commit());
break;
} catch (Error& e) {
wait(insertTr.onError(e));
self->maxId = entry.get().id;
self->createdTenants[*tenantItr] = TenantState(entry.get().id, true);
state bool insertData = deterministicRandom()->random01() < 0.5;
if (insertData) {
state Transaction insertTr(cx, *tenantItr);
loop {
try {
insertTr.set(self->keyName, *tenantItr);
wait(insertTr.commit());
break;
} catch (Error& e) {
wait(insertTr.onError(e));
}
}
self->createdTenants[*tenantItr].empty = false;
state Transaction checkTr(cx);
loop {
try {
checkTr.setOption(FDBTransactionOptions::RAW_ACCESS);
Optional<Value> val = wait(checkTr.get(self->keyName.withPrefix(entry.get().prefix)));
ASSERT(val.present());
ASSERT(val.get() == *tenantItr);
break;
} catch (Error& e) {
wait(checkTr.onError(e));
}
}
}
self->createdTenants[tenant].empty = false;
state Transaction checkTr(cx);
loop {
try {
checkTr.setOption(FDBTransactionOptions::RAW_ACCESS);
Optional<Value> val = wait(checkTr.get(self->keyName.withPrefix(entry.get().prefix)));
ASSERT(val.present());
ASSERT(val.get() == tenant);
break;
} catch (Error& e) {
wait(checkTr.onError(e));
}
}
wait(self->checkTenant(cx, self, *tenantItr, self->createdTenants[*tenantItr]));
}
wait(self->checkTenant(cx, self, tenant, self->createdTenants[tenant]));
return Void();
} catch (Error& e) {
if (e.code() == error_code_invalid_tenant_name) {
ASSERT(tenant.startsWith("\xff"_sr));
ASSERT(hasSystemTenant);
return Void();
} else if (operationType == OperationType::MANAGEMENT_DATABASE) {
if (e.code() == error_code_tenant_already_exists) {
ASSERT(alreadyExists && operationType == OperationType::MANAGEMENT_DATABASE);
} else {
TraceEvent(SevError, "CreateTenantFailure").error(e).detail("TenantName", tenant);
ASSERT(tenantsToCreate.size() == 1);
TraceEvent(SevError, "CreateTenantFailure")
.error(e)
.detail("TenantName", *tenantsToCreate.begin());
}
return Void();
} else {
try {
wait(tr->onError(e));
} catch (Error& e) {
TraceEvent(SevError, "CreateTenantFailure").error(e).detail("TenantName", tenant);
for (auto tenant : tenantsToCreate) {
TraceEvent(SevError, "CreateTenantFailure").error(e).detail("TenantName", tenant);
}
return Void();
}
}

View File

@ -43,6 +43,7 @@ RUN yum install -y \
telnet-0.17-66.el7 \
traceroute-2.0.22-2.el7 \
unzip-6.0-22.el7_9 \
openssl-1.0.2k-24.el7_9.x86_64 \
vim-enhanced-7.4.629-8.el7_9 && \
yum clean all && \
rm -rf /var/cache/yum

View File

@ -1,10 +1,10 @@
#!/usr/bin/env python3
# entrypoint.py
# sidecar.py
#
# This source file is part of the FoundationDB open source project
#
# Copyright 2018-2021 Apple Inc. and the FoundationDB project authors
# Copyright 2018-2022 Apple Inc. and the FoundationDB project authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -22,24 +22,21 @@
import argparse
import hashlib
import ipaddress
import logging
import json
import logging
import os
import re
import shutil
import socket
import ssl
import stat
import time
import traceback
import sys
import tempfile
import time
from functools import partial
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from http.server import HTTPServer, ThreadingHTTPServer, BaseHTTPRequestHandler
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
log = logging.getLogger(__name__)
log.setLevel(logging.INFO)
@ -166,9 +163,7 @@ class Config(object):
)
parser.add_argument(
"--require-not-empty",
help=(
"A file that must be present and non-empty " "in the input directory"
),
help=("A file that must be present and non-empty in the input directory"),
action="append",
)
args = parser.parse_args()
@ -243,7 +238,7 @@ class Config(object):
if self.main_container_version == self.primary_version:
self.substitutions["BINARY_DIR"] = "/usr/bin"
else:
self.substitutions["BINARY_DIR"] = target_path = str(
self.substitutions["BINARY_DIR"] = str(
Path("%s/bin/%s" % (args.main_container_conf_dir, self.primary_version))
)
@ -346,42 +341,16 @@ class ThreadingHTTPServerV6(ThreadingHTTPServer):
address_family = socket.AF_INET6
class Server(BaseHTTPRequestHandler):
class SidecarHandler(BaseHTTPRequestHandler):
# We don't want to load the SSL context on each request, so we hold it as a class-level variable.
ssl_context = None
@classmethod
def start(cls):
"""
This method starts the server.
"""
config = Config.shared()
colon_index = config.bind_address.rindex(":")
port_index = colon_index + 1
address = config.bind_address[:colon_index]
port = config.bind_address[port_index:]
log.info(f"Listening on {address}:{port}")
if address.startswith("[") and address.endswith("]"):
server = ThreadingHTTPServerV6((address[1:-1], int(port)), cls)
else:
server = ThreadingHTTPServer((address, int(port)), cls)
if config.enable_tls:
context = Server.load_ssl_context()
server.socket = context.wrap_socket(server.socket, server_side=True)
observer = Observer()
event_handler = CertificateEventHandler()
for path in set(
[
Path(config.certificate_file).parent.as_posix(),
Path(config.key_file).parent.as_posix(),
]
):
observer.schedule(event_handler, path)
observer.start()
server.serve_forever()
def __init__(self, config, *args, **kwargs):
self.config = config
self.ssl_context = self.__class__.ssl_context
super().__init__(*args, **kwargs)
# This method triggers a reload of the SSL context and updates the class-level variable.
@classmethod
def load_ssl_context(cls):
config = Config.shared()
@ -390,6 +359,7 @@ class Server(BaseHTTPRequestHandler):
cls.ssl_context.check_hostname = False
cls.ssl_context.verify_mode = ssl.CERT_OPTIONAL
cls.ssl_context.load_cert_chain(config.certificate_file, config.key_file)
return cls.ssl_context
def send_text(self, text, code=200, content_type="text/plain", add_newline=True):
@ -407,16 +377,14 @@ class Server(BaseHTTPRequestHandler):
self.wfile.write(response)
def check_request_cert(self, path):
config = Config.shared()
if path == "/ready":
return True
if not config.enable_tls:
if not self.config.enable_tls:
return True
approved = self.check_cert(
self.connection.getpeercert(), config.peer_verification_rules
self.connection.getpeercert(), self.config.peer_verification_rules
)
if not approved:
self.send_error(401, "Client certificate was not approved")
@ -517,32 +485,33 @@ class Server(BaseHTTPRequestHandler):
if not self.check_request_cert(self.path):
return
if self.path.startswith("/check_hash/"):
file_path = os.path.relpath(self.path, "/check_hash")
try:
self.send_text(
check_hash(os.path.relpath(self.path, "/check_hash")), add_newline=False
self.check_hash(file_path),
add_newline=False,
)
except FileNotFoundError:
self.send_error(404, "Path not found")
self.end_headers()
self.send_error(404, f"{file_path} not found")
if self.path.startswith("/is_present/"):
if is_present(os.path.relpath(self.path, "/is_present")):
file_path = os.path.relpath(self.path, "/is_present")
if self.is_present(file_path):
self.send_text("OK")
else:
self.send_error(404, "Path not found")
self.end_headers()
self.send_error(404, f"{file_path} not found")
elif self.path == "/ready":
self.send_text(ready())
self.send_text("OK")
elif self.path == "/substitutions":
self.send_text(get_substitutions())
self.send_text(self.get_substitutions())
else:
self.send_error(404, "Path not found")
self.end_headers()
except RequestException as e:
self.send_error(400, e.message)
except (ConnectionResetError, BrokenPipeError) as ex:
log.error(f"connection was reset {ex}")
except Exception as ex:
log.error(f"Error processing request {ex}", exc_info=True)
self.send_error(500)
self.end_headers()
def do_POST(self):
"""
@ -552,15 +521,15 @@ class Server(BaseHTTPRequestHandler):
if not self.check_request_cert(self.path):
return
if self.path == "/copy_files":
self.send_text(copy_files())
self.send_text(copy_files(self.config))
elif self.path == "/copy_binaries":
self.send_text(copy_binaries())
self.send_text(copy_binaries(self.config))
elif self.path == "/copy_libraries":
self.send_text(copy_libraries())
self.send_text(copy_libraries(self.config))
elif self.path == "/copy_monitor_conf":
self.send_text(copy_monitor_conf())
self.send_text(copy_monitor_conf(self.config))
elif self.path == "/refresh_certs":
self.send_text(refresh_certs())
self.send_text(self.refresh_certs())
elif self.path == "/restart":
self.send_text("OK")
exit(1)
@ -571,16 +540,38 @@ class Server(BaseHTTPRequestHandler):
raise e
except RequestException as e:
self.send_error(400, e.message)
except e:
log.error("Error processing request", exc_info=True)
except (ConnectionResetError, BrokenPipeError) as ex:
log.error(f"connection was reset {ex}")
except Exception as ex:
log.error(f"Error processing request {ex}", exc_info=True)
self.send_error(500)
self.end_headers()
def log_message(self, format, *args):
log.info(format % args)
def refresh_certs(self):
if not self.config.enable_tls:
raise RequestException("Server is not using TLS")
SidecarHandler.load_ssl_context()
return "OK"
def get_substitutions(self):
return json.dumps(self.config.substitutions)
def check_hash(self, filename):
with open(os.path.join(self.config.output_dir, filename), "rb") as contents:
m = hashlib.sha256()
m.update(contents.read())
return m.hexdigest()
def is_present(self, filename):
return os.path.exists(os.path.join(self.config.output_dir, filename))
class CertificateEventHandler(FileSystemEventHandler):
def __init__(self):
FileSystemEventHandler.__init__(self)
def on_any_event(self, event):
if event.is_directory:
return None
@ -597,27 +588,15 @@ class CertificateEventHandler(FileSystemEventHandler):
)
time.sleep(10)
log.info("Reloading certificates")
Server.load_ssl_context()
SidecarHandler.load_ssl_context()
def check_hash(filename):
with open(os.path.join(Config.shared().output_dir, filename), "rb") as contents:
m = hashlib.sha256()
m.update(contents.read())
return m.hexdigest()
def is_present(filename):
return os.path.exists(os.path.join(Config.shared().output_dir, filename))
def copy_files():
config = Config.shared()
def copy_files(config):
if config.require_not_empty:
for filename in config.require_not_empty:
path = os.path.join(config.input_dir, filename)
if not os.path.isfile(path) or os.path.getsize(path) == 0:
raise Exception("No contents for file %s" % path)
raise Exception(f"No contents for file {path}")
for filename in config.copy_files:
tmp_file = tempfile.NamedTemporaryFile(
@ -629,8 +608,7 @@ def copy_files():
return "OK"
def copy_binaries():
config = Config.shared()
def copy_binaries(config):
if config.main_container_version != config.primary_version:
for binary in config.copy_binaries:
path = Path(f"/usr/bin/{binary}")
@ -650,8 +628,7 @@ def copy_binaries():
return "OK"
def copy_libraries():
config = Config.shared()
def copy_libraries(config):
for version in config.copy_libraries:
path = Path(f"/var/fdb/lib/libfdb_c_{version}.so")
if version == config.copy_libraries[0]:
@ -670,8 +647,7 @@ def copy_libraries():
return "OK"
def copy_monitor_conf():
config = Config.shared()
def copy_monitor_conf(config):
if config.input_monitor_conf:
with open(
os.path.join(config.input_dir, config.input_monitor_conf)
@ -695,35 +671,58 @@ def copy_monitor_conf():
return "OK"
def get_substitutions():
return json.dumps(Config.shared().substitutions)
def ready():
return "OK"
def refresh_certs():
if not Config.shared().enable_tls:
raise RequestException("Server is not using TLS")
Server.load_ssl_context()
return "OK"
class RequestException(Exception):
def __init__(self, message):
super().__init__(message)
self.message = message
def start_sidecar_server(config):
"""
This method starts the HTTP server with the sidecar handler.
"""
colon_index = config.bind_address.rindex(":")
port_index = colon_index + 1
address = config.bind_address[:colon_index]
port = config.bind_address[port_index:]
log.info(f"Listening on {address}:{port}")
handler = partial(
SidecarHandler,
config,
)
if address.startswith("[") and address.endswith("]"):
server = ThreadingHTTPServerV6((address[1:-1], int(port)), handler)
else:
server = ThreadingHTTPServer((address, int(port)), handler)
if config.enable_tls:
context = SidecarHandler.load_ssl_context()
server.socket = context.wrap_socket(server.socket, server_side=True)
observer = Observer()
event_handler = CertificateEventHandler()
for path in set(
[
Path(config.certificate_file).parent.as_posix(),
Path(config.key_file).parent.as_posix(),
]
):
observer.schedule(event_handler, path)
observer.start()
server.serve_forever()
if __name__ == "__main__":
logging.basicConfig(format="%(asctime)-15s %(levelname)s %(message)s")
copy_files()
copy_binaries()
copy_libraries()
copy_monitor_conf()
config = Config.shared()
copy_files(config)
copy_binaries(config)
copy_libraries(config)
copy_monitor_conf(config)
if Config.shared().init_mode:
if config.init_mode:
sys.exit(0)
Server.start()
start_sidecar_server(config)

139
packaging/docker/sidecar_test.py Executable file
View File

@ -0,0 +1,139 @@
#!/usr/bin/env python3
# sidecar_test.py
#
# This source file is part of the FoundationDB open source project
#
# Copyright 2018-2022 Apple Inc. and the FoundationDB project authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import shutil
import socket
import tempfile
import unittest
from functools import partial
from http.server import HTTPServer
from threading import Thread
from unittest.mock import MagicMock
import requests
from sidecar import SidecarHandler
# This test suite starts a real server with a mocked configuration and issues requests against it.
class TestSidecar(unittest.TestCase):
def setUp(self):
super(TestSidecar, self).setUp()
self.get_free_port()
self.server_url = f"http://localhost:{self.test_server_port}"
self.mock_config = MagicMock()
# We don't want to use TLS for the local tests for now.
self.mock_config.enable_tls = False
self.mock_config.output_dir = tempfile.mkdtemp()
handler = partial(
SidecarHandler,
self.mock_config,
)
self.mock_server = HTTPServer(("localhost", self.test_server_port), handler)
# Start running mock server in a separate thread.
# Daemon threads automatically shut down when the main process exits.
self.mock_server_thread = Thread(target=self.mock_server.serve_forever)
self.mock_server_thread.setDaemon(True)
self.mock_server_thread.start()
def tearDown(self):
shutil.rmtree(self.mock_config.output_dir)
super(TestSidecar, self).tearDown()
# Helper method to get a free port
def get_free_port(self):
s = socket.socket(socket.AF_INET, type=socket.SOCK_STREAM)
s.bind(("localhost", 0))
__, port = s.getsockname()
s.close()
self.test_server_port = port
def test_get_ready(self):
r = requests.get(f"{self.server_url }/ready")
self.assertEqual(r.status_code, 200)
self.assertEqual(r.text, "OK\n")
def test_get_substitutions(self):
expected = {"key": "value"}
self.mock_config.substitutions = expected
r = requests.get(f"{self.server_url }/substitutions")
self.assertEqual(r.status_code, 200)
self.assertEqual(r.json(), expected)
def test_get_check_hash_no_found(self):
r = requests.get(f"{self.server_url }/check_hash/foobar")
self.assertEqual(r.status_code, 404)
self.assertRegex(r.text, "foobar not found")
def test_get_check_hash(self):
with open(os.path.join(self.mock_config.output_dir, "foobar"), "w") as f:
f.write("hello world")
r = requests.get(f"{self.server_url }/check_hash/foobar")
self.assertEqual(r.status_code, 200)
self.assertEqual(
r.text, "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9"
)
def test_get_check_hash_nested(self):
test_path = os.path.join(self.mock_config.output_dir, "nested/foobar")
os.makedirs(os.path.dirname(test_path), exist_ok=True)
with open(test_path, "w") as f:
f.write("hello world")
r = requests.get(f"{self.server_url }/check_hash/nested/foobar")
self.assertEqual(r.status_code, 200)
self.assertEqual(
r.text, "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9"
)
def test_get_is_present_no_found(self):
r = requests.get(f"{self.server_url }/is_present/foobar")
self.assertEqual(r.status_code, 404)
self.assertRegex(r.text, "foobar not found")
def test_get_is_present(self):
with open(os.path.join(self.mock_config.output_dir, "foobar"), "w") as f:
f.write("hello world")
r = requests.get(f"{self.server_url }/is_present/foobar")
self.assertEqual(r.status_code, 200)
self.assertEqual(r.text, "OK\n")
def test_get_is_present_nested(self):
test_path = os.path.join(self.mock_config.output_dir, "nested/foobar")
os.makedirs(os.path.dirname(test_path), exist_ok=True)
with open(test_path, "w") as f:
f.write("hello world")
r = requests.get(f"{self.server_url }/is_present/nested/foobar")
self.assertEqual(r.status_code, 200)
self.assertEqual(r.text, "OK\n")
def test_get_not_found(self):
r = requests.get(f"{self.server_url }/foobar")
self.assertEqual(r.status_code, 404)
self.assertRegex(r.text, "Path not found")
# TODO(johscheuer): Add test cases for POST requests.
# TODO(johscheuer): Add test cases for TLS.
if __name__ == "__main__":
unittest.main()