Merge remote-tracking branch 'origin/main' into global-tag-throttling3

This commit is contained in:
sfc-gh-tclinkenbeard 2022-06-22 22:17:33 -07:00
commit 840dac1fa3
39 changed files with 1054 additions and 295 deletions

View File

@ -86,10 +86,10 @@ public:
addTask([promise, func, priority] {
try {
auto funcResult = func();
onMainThreadVoid([promise, funcResult] { promise.send(funcResult); }, nullptr, priority);
onMainThreadVoid([promise, funcResult] { promise.send(funcResult); }, priority);
} catch (Error& e) {
TraceEvent("ErrorExecutingAsyncTask").error(e);
onMainThreadVoid([promise, e] { promise.sendError(e); }, nullptr, priority);
onMainThreadVoid([promise, e] { promise.sendError(e); }, priority);
}
});
return promise.getFuture();

View File

@ -101,8 +101,8 @@ public:
// @todo This API and the "getSpanContext()" API may help with debugging simulation
// test failures. (These APIs are not currently invoked anywhere.) Remove them
// later if they are not really needed.
virtual VersionVector getVersionVector() = 0;
virtual SpanContext getSpanContext() = 0;
virtual ThreadFuture<VersionVector> getVersionVector() = 0;
virtual ThreadFuture<SpanContext> getSpanContext() = 0;
virtual ThreadFuture<int64_t> getApproximateSize() = 0;
virtual void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) = 0;

View File

@ -202,7 +202,7 @@ public:
auto sav = (DLThreadSingleAssignmentVar<T>*)param;
if (MultiVersionApi::api->callbackOnMainThread) {
onMainThreadVoid([sav]() { sav->apply(); }, nullptr);
onMainThreadVoid([sav]() { sav->apply(); });
} else {
sav->apply();
}

View File

@ -385,7 +385,7 @@ void DLTransaction::reset() {
api->transactionReset(tr);
}
VersionVector DLTransaction::getVersionVector() {
ThreadFuture<VersionVector> DLTransaction::getVersionVector() {
return VersionVector(); // not implemented
}
@ -1136,7 +1136,7 @@ Version MultiVersionTransaction::getCommittedVersion() {
return invalidVersion;
}
VersionVector MultiVersionTransaction::getVersionVector() {
ThreadFuture<VersionVector> MultiVersionTransaction::getVersionVector() {
auto tr = getTransaction();
if (tr.transaction) {
return tr.transaction->getVersionVector();
@ -1145,7 +1145,7 @@ VersionVector MultiVersionTransaction::getVersionVector() {
return VersionVector();
}
SpanContext MultiVersionTransaction::getSpanContext() {
ThreadFuture<SpanContext> MultiVersionTransaction::getSpanContext() {
auto tr = getTransaction();
if (tr.transaction) {
return tr.transaction->getSpanContext();
@ -1461,8 +1461,7 @@ MultiVersionDatabase::MultiVersionDatabase(MultiVersionApi* api,
});
Reference<DatabaseState> dbStateRef = dbState;
onMainThreadVoid([dbStateRef]() { dbStateRef->protocolVersionMonitor = dbStateRef->monitorProtocolVersion(); },
nullptr);
onMainThreadVoid([dbStateRef]() { dbStateRef->protocolVersionMonitor = dbStateRef->monitorProtocolVersion(); });
}
}
@ -1629,7 +1628,7 @@ ThreadFuture<Void> MultiVersionDatabase::DatabaseState::monitorProtocolVersion()
ProtocolVersion clusterVersion =
!cv.isError() ? cv.get() : self->dbProtocolVersion.orDefault(currentProtocolVersion);
onMainThreadVoid([self, clusterVersion]() { self->protocolVersionChanged(clusterVersion); }, nullptr);
onMainThreadVoid([self, clusterVersion]() { self->protocolVersionChanged(clusterVersion); });
return ErrorOr<Void>(Void());
});
}
@ -1693,10 +1692,10 @@ void MultiVersionDatabase::DatabaseState::protocolVersionChanged(ProtocolVersion
dbReady = mapThreadFuture<Void, Void>(
newDb.castTo<DLDatabase>()->onReady(), [self, newDb, client](ErrorOr<Void> ready) {
if (!ready.isError()) {
onMainThreadVoid([self, newDb, client]() { self->updateDatabase(newDb, client); }, nullptr);
onMainThreadVoid([self, newDb, client]() { self->updateDatabase(newDb, client); });
} else {
onMainThreadVoid([self, client]() { self->updateDatabase(Reference<IDatabase>(), client); },
nullptr);
onMainThreadVoid(
[self, client]() { self->updateDatabase(Reference<IDatabase>(), client); });
}
return ready;
@ -1806,19 +1805,17 @@ void MultiVersionDatabase::DatabaseState::startLegacyVersionMonitors() {
// Cleans up state for the legacy version monitors to break reference cycles
void MultiVersionDatabase::DatabaseState::close() {
Reference<DatabaseState> self = Reference<DatabaseState>::addRef(this);
onMainThreadVoid(
[self]() {
self->closed = true;
if (self->protocolVersionMonitor.isValid()) {
self->protocolVersionMonitor.cancel();
}
for (auto monitor : self->legacyVersionMonitors) {
monitor->close();
}
onMainThreadVoid([self]() {
self->closed = true;
if (self->protocolVersionMonitor.isValid()) {
self->protocolVersionMonitor.cancel();
}
for (auto monitor : self->legacyVersionMonitors) {
monitor->close();
}
self->legacyVersionMonitors.clear();
},
nullptr);
self->legacyVersionMonitors.clear();
});
}
// Starts the connection monitor by creating a database object at an old version.
@ -1838,22 +1835,20 @@ void MultiVersionDatabase::LegacyVersionMonitor::startConnectionMonitor(
Reference<LegacyVersionMonitor> self = Reference<LegacyVersionMonitor>::addRef(this);
versionMonitor =
mapThreadFuture<Void, Void>(db.castTo<DLDatabase>()->onReady(), [self, dbState](ErrorOr<Void> ready) {
onMainThreadVoid(
[self, ready, dbState]() {
if (ready.isError()) {
if (ready.getError().code() != error_code_operation_cancelled) {
TraceEvent(SevError, "FailedToOpenDatabaseOnClient")
.error(ready.getError())
.detail("LibPath", self->client->libPath);
onMainThreadVoid([self, ready, dbState]() {
if (ready.isError()) {
if (ready.getError().code() != error_code_operation_cancelled) {
TraceEvent(SevError, "FailedToOpenDatabaseOnClient")
.error(ready.getError())
.detail("LibPath", self->client->libPath);
self->client->failed = true;
MultiVersionApi::api->updateSupportedVersions();
}
} else {
self->runGrvProbe(dbState);
}
},
nullptr);
self->client->failed = true;
MultiVersionApi::api->updateSupportedVersions();
}
} else {
self->runGrvProbe(dbState);
}
});
return ready;
});
@ -1868,12 +1863,10 @@ void MultiVersionDatabase::LegacyVersionMonitor::runGrvProbe(Reference<MultiVers
versionMonitor = mapThreadFuture<Version, Void>(tr->getReadVersion(), [self, dbState](ErrorOr<Version> v) {
// If the version attempt returns an error, we regard that as a connection (except operation_cancelled)
if (!v.isError() || v.getError().code() != error_code_operation_cancelled) {
onMainThreadVoid(
[self, dbState]() {
self->monitorRunning = false;
dbState->protocolVersionChanged(self->client->protocolVersion);
},
nullptr);
onMainThreadVoid([self, dbState]() {
self->monitorRunning = false;
dbState->protocolVersionChanged(self->client->protocolVersion);
});
}
return v.map<Void>([](Version v) { return Void(); });
@ -2123,11 +2116,9 @@ void MultiVersionApi::setSupportedClientVersions(Standalone<StringRef> versions)
// This option must be set on the main thread because it modifies structures that can be used concurrently by the
// main thread
onMainThreadVoid(
[this, versions]() {
localClient->api->setNetworkOption(FDBNetworkOptions::SUPPORTED_CLIENT_VERSIONS, versions);
},
nullptr);
onMainThreadVoid([this, versions]() {
localClient->api->setNetworkOption(FDBNetworkOptions::SUPPORTED_CLIENT_VERSIONS, versions);
});
if (!bypassMultiClientApi) {
runOnExternalClientsAllThreads([versions](Reference<ClientInfo> client) {
@ -2896,7 +2887,7 @@ THREAD_FUNC runSingleAssignmentVarTest(void* arg) {
checkUndestroyed.blockUntilReady();
}
onMainThreadVoid([done]() { *done = true; }, nullptr);
onMainThreadVoid([done]() { *done = true; });
} catch (Error& e) {
printf("Caught error in test: %s\n", e.name());
*done = true;

View File

@ -395,8 +395,8 @@ public:
ThreadFuture<Void> commit() override;
Version getCommittedVersion() override;
VersionVector getVersionVector() override;
SpanContext getSpanContext() override { return SpanContext(); };
ThreadFuture<VersionVector> getVersionVector() override;
ThreadFuture<SpanContext> getSpanContext() override { return SpanContext(); };
ThreadFuture<int64_t> getApproximateSize() override;
void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
@ -588,8 +588,8 @@ public:
ThreadFuture<Void> commit() override;
Version getCommittedVersion() override;
VersionVector getVersionVector() override;
SpanContext getSpanContext() override;
ThreadFuture<VersionVector> getVersionVector() override;
ThreadFuture<SpanContext> getSpanContext() override;
ThreadFuture<int64_t> getApproximateSize() override;
void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;

View File

@ -286,6 +286,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( DD_TEAM_ZERO_SERVER_LEFT_LOG_DELAY, 120 ); if( randomize && BUGGIFY ) DD_TEAM_ZERO_SERVER_LEFT_LOG_DELAY = 5;
init( DD_STORAGE_WIGGLE_PAUSE_THRESHOLD, 10 ); if( randomize && BUGGIFY ) DD_STORAGE_WIGGLE_PAUSE_THRESHOLD = 1000;
init( DD_STORAGE_WIGGLE_STUCK_THRESHOLD, 20 );
init( DD_TENANT_AWARENESS_ENABLED, false );
init( TENANT_CACHE_LIST_REFRESH_INTERVAL, 2.0 );
// TeamRemover
init( TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER, false ); if( randomize && BUGGIFY ) TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER = deterministicRandom()->random01() < 0.1 ? true : false; // false by default. disable the consistency check when it's true

View File

@ -232,6 +232,8 @@ public:
int DD_TEAM_ZERO_SERVER_LEFT_LOG_DELAY;
int DD_STORAGE_WIGGLE_PAUSE_THRESHOLD; // How many unhealthy relocations are ongoing will pause storage wiggle
int DD_STORAGE_WIGGLE_STUCK_THRESHOLD; // How many times bestTeamStuck accumulate will pause storage wiggle
bool DD_TENANT_AWARENESS_ENABLED;
int TENANT_CACHE_LIST_REFRESH_INTERVAL; // How often the TenantCache is refreshed
// TeamRemover to remove redundant teams
bool TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER; // disable the machineTeamRemover actor

View File

@ -78,7 +78,8 @@ void ThreadSafeDatabase::setOption(FDBDatabaseOptions::Option option, Optional<S
db->checkDeferredError();
db->setOption(option, passValue.contents());
},
&db->deferredError);
db,
&DatabaseContext::deferredError);
}
ThreadFuture<int64_t> ThreadSafeDatabase::rebootWorker(const StringRef& address, bool check, int duration) {
@ -109,7 +110,7 @@ ThreadFuture<DatabaseSharedState*> ThreadSafeDatabase::createSharedState() {
void ThreadSafeDatabase::setSharedState(DatabaseSharedState* p) {
DatabaseContext* db = this->db;
onMainThreadVoid([db, p]() { db->setSharedState(p); }, nullptr);
onMainThreadVoid([db, p]() { db->setSharedState(p); });
}
// Return the main network thread busyness
@ -149,24 +150,22 @@ ThreadSafeDatabase::ThreadSafeDatabase(std::string connFilename, int apiVersion)
// but run its constructor on the main thread
DatabaseContext* db = this->db = DatabaseContext::allocateOnForeignThread();
onMainThreadVoid(
[db, connFile, apiVersion]() {
try {
Database::createDatabase(
Reference<ClusterConnectionFile>(connFile), apiVersion, IsInternal::False, LocalityData(), db)
.extractPtr();
} catch (Error& e) {
new (db) DatabaseContext(e);
} catch (...) {
new (db) DatabaseContext(unknown_error());
}
},
nullptr);
onMainThreadVoid([db, connFile, apiVersion]() {
try {
Database::createDatabase(
Reference<ClusterConnectionFile>(connFile), apiVersion, IsInternal::False, LocalityData(), db)
.extractPtr();
} catch (Error& e) {
new (db) DatabaseContext(e);
} catch (...) {
new (db) DatabaseContext(unknown_error());
}
});
}
ThreadSafeDatabase::~ThreadSafeDatabase() {
DatabaseContext* db = this->db;
onMainThreadVoid([db]() { db->delref(); }, nullptr);
onMainThreadVoid([db]() { db->delref(); });
}
Reference<ITransaction> ThreadSafeTenant::createTransaction() {
@ -194,7 +193,7 @@ ThreadSafeTenant::~ThreadSafeTenant() {}
ThreadSafeTransaction::ThreadSafeTransaction(DatabaseContext* cx,
ISingleThreadTransaction::Type type,
Optional<TenantName> tenant)
: tenantName(tenant) {
: tenantName(tenant), initialized(std::make_shared<std::atomic_bool>(false)) {
// Allocate memory for the transaction from this thread (so the pointer is known for subsequent method calls)
// but run its constructor on the main thread
@ -203,29 +202,30 @@ ThreadSafeTransaction::ThreadSafeTransaction(DatabaseContext* cx,
// immediately after this call, it will defer the DatabaseContext::delref (and onMainThread preserves the order of
// these operations).
auto tr = this->tr = ISingleThreadTransaction::allocateOnForeignThread(type);
auto init = this->initialized;
// No deferred error -- if the construction of the RYW transaction fails, we have no where to put it
onMainThreadVoid(
[tr, cx, type, tenant]() {
cx->addref();
if (tenant.present()) {
if (type == ISingleThreadTransaction::Type::RYW) {
new (tr) ReadYourWritesTransaction(Database(cx), tenant.get());
} else {
tr->construct(Database(cx), tenant.get());
}
} else {
if (type == ISingleThreadTransaction::Type::RYW) {
new (tr) ReadYourWritesTransaction(Database(cx));
} else {
tr->construct(Database(cx));
}
}
},
nullptr);
onMainThreadVoid([tr, cx, type, tenant, init]() {
cx->addref();
if (tenant.present()) {
if (type == ISingleThreadTransaction::Type::RYW) {
new (tr) ReadYourWritesTransaction(Database(cx), tenant.get());
} else {
tr->construct(Database(cx), tenant.get());
}
} else {
if (type == ISingleThreadTransaction::Type::RYW) {
new (tr) ReadYourWritesTransaction(Database(cx));
} else {
tr->construct(Database(cx));
}
}
*init = true;
});
}
// This constructor is only used while refactoring fdbcli and only called from the main thread
ThreadSafeTransaction::ThreadSafeTransaction(ReadYourWritesTransaction* ryw) : tr(ryw) {
ThreadSafeTransaction::ThreadSafeTransaction(ReadYourWritesTransaction* ryw)
: tr(ryw), initialized(std::make_shared<std::atomic_bool>(true)) {
if (tr)
tr->addref();
}
@ -233,17 +233,17 @@ ThreadSafeTransaction::ThreadSafeTransaction(ReadYourWritesTransaction* ryw) : t
ThreadSafeTransaction::~ThreadSafeTransaction() {
ISingleThreadTransaction* tr = this->tr;
if (tr)
onMainThreadVoid([tr]() { tr->delref(); }, nullptr);
onMainThreadVoid([tr]() { tr->delref(); });
}
void ThreadSafeTransaction::cancel() {
ISingleThreadTransaction* tr = this->tr;
onMainThreadVoid([tr]() { tr->cancel(); }, nullptr);
onMainThreadVoid([tr]() { tr->cancel(); });
}
void ThreadSafeTransaction::setVersion(Version v) {
ISingleThreadTransaction* tr = this->tr;
onMainThreadVoid([tr, v]() { tr->setVersion(v); }, &tr->deferredError);
onMainThreadVoid([tr, v]() { tr->setVersion(v); }, tr, &ISingleThreadTransaction::deferredError);
}
ThreadFuture<Version> ThreadSafeTransaction::getReadVersion() {
@ -402,12 +402,12 @@ void ThreadSafeTransaction::addReadConflictRange(const KeyRangeRef& keys) {
KeyRange r = keys;
ISingleThreadTransaction* tr = this->tr;
onMainThreadVoid([tr, r]() { tr->addReadConflictRange(r); }, &tr->deferredError);
onMainThreadVoid([tr, r]() { tr->addReadConflictRange(r); }, tr, &ISingleThreadTransaction::deferredError);
}
void ThreadSafeTransaction::makeSelfConflicting() {
ISingleThreadTransaction* tr = this->tr;
onMainThreadVoid([tr]() { tr->makeSelfConflicting(); }, &tr->deferredError);
onMainThreadVoid([tr]() { tr->makeSelfConflicting(); }, tr, &ISingleThreadTransaction::deferredError);
}
void ThreadSafeTransaction::atomicOp(const KeyRef& key, const ValueRef& value, uint32_t operationType) {
@ -415,7 +415,9 @@ void ThreadSafeTransaction::atomicOp(const KeyRef& key, const ValueRef& value, u
Value v = value;
ISingleThreadTransaction* tr = this->tr;
onMainThreadVoid([tr, k, v, operationType]() { tr->atomicOp(k, v, operationType); }, &tr->deferredError);
onMainThreadVoid([tr, k, v, operationType]() { tr->atomicOp(k, v, operationType); },
tr,
&ISingleThreadTransaction::deferredError);
}
void ThreadSafeTransaction::set(const KeyRef& key, const ValueRef& value) {
@ -423,14 +425,14 @@ void ThreadSafeTransaction::set(const KeyRef& key, const ValueRef& value) {
Value v = value;
ISingleThreadTransaction* tr = this->tr;
onMainThreadVoid([tr, k, v]() { tr->set(k, v); }, &tr->deferredError);
onMainThreadVoid([tr, k, v]() { tr->set(k, v); }, tr, &ISingleThreadTransaction::deferredError);
}
void ThreadSafeTransaction::clear(const KeyRangeRef& range) {
KeyRange r = range;
ISingleThreadTransaction* tr = this->tr;
onMainThreadVoid([tr, r]() { tr->clear(r); }, &tr->deferredError);
onMainThreadVoid([tr, r]() { tr->clear(r); }, tr, &ISingleThreadTransaction::deferredError);
}
void ThreadSafeTransaction::clear(const KeyRef& begin, const KeyRef& end) {
@ -445,14 +447,15 @@ void ThreadSafeTransaction::clear(const KeyRef& begin, const KeyRef& end) {
tr->clear(KeyRangeRef(b, e));
},
&tr->deferredError);
tr,
&ISingleThreadTransaction::deferredError);
}
void ThreadSafeTransaction::clear(const KeyRef& key) {
Key k = key;
ISingleThreadTransaction* tr = this->tr;
onMainThreadVoid([tr, k]() { tr->clear(k); }, &tr->deferredError);
onMainThreadVoid([tr, k]() { tr->clear(k); }, tr, &ISingleThreadTransaction::deferredError);
}
ThreadFuture<Void> ThreadSafeTransaction::watch(const KeyRef& key) {
@ -469,7 +472,7 @@ void ThreadSafeTransaction::addWriteConflictRange(const KeyRangeRef& keys) {
KeyRange r = keys;
ISingleThreadTransaction* tr = this->tr;
onMainThreadVoid([tr, r]() { tr->addWriteConflictRange(r); }, &tr->deferredError);
onMainThreadVoid([tr, r]() { tr->addWriteConflictRange(r); }, tr, &ISingleThreadTransaction::deferredError);
}
ThreadFuture<Void> ThreadSafeTransaction::commit() {
@ -482,15 +485,20 @@ ThreadFuture<Void> ThreadSafeTransaction::commit() {
Version ThreadSafeTransaction::getCommittedVersion() {
// This should be thread safe when called legally, but it is fragile
if (!initialized || !*initialized) {
return ::invalidVersion;
}
return tr->getCommittedVersion();
}
VersionVector ThreadSafeTransaction::getVersionVector() {
return tr->getVersionVector();
ThreadFuture<VersionVector> ThreadSafeTransaction::getVersionVector() {
ISingleThreadTransaction* tr = this->tr;
return onMainThread([tr]() -> Future<VersionVector> { return tr->getVersionVector(); });
}
SpanContext ThreadSafeTransaction::getSpanContext() {
return tr->getSpanContext();
ThreadFuture<SpanContext> ThreadSafeTransaction::getSpanContext() {
ISingleThreadTransaction* tr = this->tr;
return onMainThread([tr]() -> Future<SpanContext> { return tr->getSpanContext(); });
}
ThreadFuture<int64_t> ThreadSafeTransaction::getApproximateSize() {
@ -513,7 +521,9 @@ void ThreadSafeTransaction::setOption(FDBTransactionOptions::Option option, Opti
Standalone<Optional<StringRef>> passValue = value;
// ThreadSafeTransaction is not allowed to do anything with options except pass them through to RYW.
onMainThreadVoid([tr, option, passValue]() { tr->setOption(option, passValue.contents()); }, &tr->deferredError);
onMainThreadVoid([tr, option, passValue]() { tr->setOption(option, passValue.contents()); },
tr,
&ISingleThreadTransaction::deferredError);
}
ThreadFuture<Void> ThreadSafeTransaction::checkDeferredError() {
@ -541,16 +551,18 @@ Optional<TenantName> ThreadSafeTransaction::getTenant() {
void ThreadSafeTransaction::operator=(ThreadSafeTransaction&& r) noexcept {
tr = r.tr;
r.tr = nullptr;
initialized = std::move(r.initialized);
}
ThreadSafeTransaction::ThreadSafeTransaction(ThreadSafeTransaction&& r) noexcept {
tr = r.tr;
r.tr = nullptr;
initialized = std::move(r.initialized);
}
void ThreadSafeTransaction::reset() {
ISingleThreadTransaction* tr = this->tr;
onMainThreadVoid([tr]() { tr->reset(); }, nullptr);
onMainThreadVoid([tr]() { tr->reset(); });
}
extern const char* getSourceVersion();

View File

@ -170,8 +170,8 @@ public:
ThreadFuture<Void> commit() override;
Version getCommittedVersion() override;
VersionVector getVersionVector() override;
SpanContext getSpanContext() override;
ThreadFuture<VersionVector> getVersionVector() override;
ThreadFuture<SpanContext> getSpanContext() override;
ThreadFuture<int64_t> getApproximateSize() override;
ThreadFuture<uint64_t> getProtocolVersion();
@ -184,7 +184,7 @@ public:
Optional<TenantName> getTenant() override;
// These are to permit use as state variables in actors:
ThreadSafeTransaction() : tr(nullptr) {}
ThreadSafeTransaction() : tr(nullptr), initialized(std::make_shared<std::atomic_bool>(false)) {}
void operator=(ThreadSafeTransaction&& r) noexcept;
ThreadSafeTransaction(ThreadSafeTransaction&& r) noexcept;
@ -196,6 +196,7 @@ public:
private:
ISingleThreadTransaction* tr;
const Optional<TenantName> tenantName;
std::shared_ptr<std::atomic_bool> initialized;
};
// An implementation of IClientApi that serializes operations onto the network thread and interacts with the lower-level

2
fdbkubernetesmonitor/.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
fdbkubernetesmonitor

View File

@ -25,6 +25,7 @@ require (
github.com/fsnotify/fsnotify v1.5.1
github.com/go-logr/logr v1.2.0
github.com/go-logr/zapr v1.2.0
github.com/prometheus/client_golang v1.12.2
github.com/spf13/pflag v1.0.5
go.uber.org/zap v1.19.1
gopkg.in/natefinch/lumberjack.v2 v2.0.0
@ -37,6 +38,8 @@ require (
require (
github.com/PuerkitoBio/purell v1.1.1 // indirect
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful v2.9.5+incompatible // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
@ -49,11 +52,15 @@ require (
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/mailru/easyjson v0.7.6 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/onsi/ginkgo v1.16.5 // indirect
github.com/onsi/gomega v1.18.1 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/goleak v1.1.12 // indirect
go.uber.org/multierr v1.6.0 // indirect

View File

@ -51,11 +51,23 @@ github.com/PuerkitoBio/purell v1.1.1 h1:WEQqlqaGbrPkxLJWfBwQmfEAE1Z7ONdDLqrN38tN
github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV/sSk/8dngufqelfh6jnri85riMAaF/M=
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
@ -90,6 +102,12 @@ github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeME
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas=
github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU=
github.com/go-logr/logr v1.2.0 h1:QK40JKJyMdUDz+h+xvCsru/bJhvG0UxvePV0ufL/AcE=
@ -105,7 +123,9 @@ github.com/go-openapi/jsonreference v0.19.5/go.mod h1:RdybgQwPxbL4UEjuAruzK1x3nE
github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk=
github.com/go-openapi/swag v0.19.14 h1:gm3vOOXfiuw5i9p5N9xJvfjvuofpyvLA9Wr6QfK5Fng=
github.com/go-openapi/swag v0.19.14/go.mod h1:QYRuS/SOXUCsnplDa677K7+DxSOj6IPNl/eQntq43wQ=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
@ -189,13 +209,21 @@ github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:
github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
@ -206,17 +234,22 @@ github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN
github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.7.6 h1:8yTIVnZgCoiM1TgqoeTl+LfU5Jg6/xL3QhGQnimLYnA=
github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/moby/spdystream v0.2.0/go.mod h1:f7i0iNDQJ059oMTcWxx8MA/zKFIuD/lY+0GqbN2Wy8c=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
@ -238,18 +271,45 @@ github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAl
github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs=
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_golang v1.12.2 h1:51L9cDoUHVrXx4zWYlcLQIZ+d+VXHgqnYKkIuq4g/34=
github.com/prometheus/client_golang v1.12.2/go.mod h1:3Z9XVyYiZYEO+YQWt3RD2R3jrbd179Rt297l4aS6nDY=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.2.0 h1:uq5h0d+GuxiXLJLNABMgp2qUWDPiLvgCzz2dUR+/W/M=
github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc=
github.com/prometheus/common v0.32.1 h1:hWIdL3N2HoUx3B8j3YN9mWor0qhY/NlEKZEaXxuIRh4=
github.com/prometheus/common v0.32.1/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU=
github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
@ -279,6 +339,7 @@ go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9i
go.uber.org/zap v1.19.0/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
go.uber.org/zap v1.19.1 h1:ue41HOKd1vGURxrmeKIgELGb3jPW9DMUDGtsinblHwI=
go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
@ -323,6 +384,7 @@ golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
@ -330,6 +392,7 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/net v0.0.0-20190501004415-9ce7a6920f09/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
@ -358,6 +421,7 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v
golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd h1:O7DYs+zxREGLKzKoMQrtrEacpb0ZVXA5rIwylE2Xchk=
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
@ -372,6 +436,7 @@ golang.org/x/oauth2 v0.0.0-20201208152858-08078c50e5b5/go.mod h1:KelEdhl1UZF7XfJ
golang.org/x/oauth2 v0.0.0-20210218202405-ba52d332ba99/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20210220000619-9bb904979d93/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20210313182246-cd4f82c27b84/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 h1:RerP+noqYHUQ8CMRcPlC2nvTa4dcBIjegkuWdcUDuqg=
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@ -386,10 +451,13 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -401,6 +469,7 @@ golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200113162924-86b910548bc1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -414,6 +483,8 @@ golang.org/x/sys v0.0.0-20200511232937-7e40ca221e25/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200515095857-1151b9dac4a9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200523222454-059865788121/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -422,6 +493,7 @@ golang.org/x/sys v0.0.0-20201201145000-ef89a241ccb3/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210104204734-6f8348627aad/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210220050731-9a76102bfb43/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210305230114-8fe3ee5dd75b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210315160823-c6e025ad8005/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -429,9 +501,11 @@ golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220209214540-3681064d5158 h1:rm+CHSpPEEW2IsXUib1ThaHIjuBVZjxNgSKmBLFfD4c=
golang.org/x/sys v0.0.0-20220209214540-3681064d5158/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
@ -610,6 +684,7 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
@ -626,6 +701,7 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWD
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=

View File

@ -55,6 +55,8 @@ var (
currentContainerVersion string
additionalEnvFile string
processCount int
enablePprof bool
listenAddress string
)
type executionMode string
@ -102,6 +104,8 @@ func main() {
pflag.StringVar(&mainContainerVersion, "main-container-version", "", "For sidecar mode, this specifies the version of the main container. If this is equal to the current container version, no files will be copied")
pflag.StringVar(&additionalEnvFile, "additional-env-file", "", "A file with additional environment variables to use when interpreting the monitor configuration")
pflag.IntVar(&processCount, "process-count", 1, "The number of processes to start")
pflag.BoolVar(&enablePprof, "enable-pprof", false, "Enables /debug/pprof endpoints on the listen address")
pflag.StringVar(&listenAddress, "listen-address", ":8081", "An address and port to listen on")
pflag.Parse()
logger := zapr.NewLogger(initLogger(logPath))
@ -126,7 +130,7 @@ func main() {
logger.Error(err, "Error loading additional environment")
os.Exit(1)
}
StartMonitor(logger, fmt.Sprintf("%s/%s", inputDir, monitorConfFile), customEnvironment, processCount)
StartMonitor(logger, fmt.Sprintf("%s/%s", inputDir, monitorConfFile), customEnvironment, processCount, listenAddress, enablePprof)
case executionModeInit:
err = CopyFiles(logger, outputDir, copyDetails, requiredCopies)
if err != nil {

View File

@ -24,6 +24,8 @@ import (
"encoding/json"
"fmt"
"io"
"net/http"
"net/http/pprof"
"os"
"os/exec"
"os/signal"
@ -33,6 +35,7 @@ import (
"time"
"k8s.io/utils/pointer"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/apple/foundationdb/fdbkubernetesmonitor/api"
"github.com/fsnotify/fsnotify"
@ -88,7 +91,7 @@ type Monitor struct {
}
// StartMonitor starts the monitor loop.
func StartMonitor(logger logr.Logger, configFile string, customEnvironment map[string]string, processCount int) {
func StartMonitor(logger logr.Logger, configFile string, customEnvironment map[string]string, processCount int, listenAddr string, enableDebug bool) {
podClient, err := CreatePodClient(logger)
if err != nil {
panic(err)
@ -103,6 +106,33 @@ func StartMonitor(logger logr.Logger, configFile string, customEnvironment map[s
}
go func() { monitor.WatchPodTimestamps() }()
mux := http.NewServeMux()
// Enable pprof endpoints for debugging purposes.
if enableDebug {
mux.Handle("/debug/pprof/heap", pprof.Handler("heap"))
mux.Handle("/debug/pprof/goroutine", pprof.Handler("goroutine"))
mux.Handle("/debug/pprof/threadcreate", pprof.Handler("threadcreate"))
mux.Handle("/debug/pprof/allocs", pprof.Handler("allocs"))
mux.Handle("/debug/pprof/block", pprof.Handler("block"))
mux.Handle("/debug/pprof/mutex", pprof.Handler("mutex"))
mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
}
// Add Prometheus support
mux.Handle("/metrics", promhttp.Handler())
go func() {
err := http.ListenAndServe(listenAddr, mux)
if err != nil {
logger.Error(err, "could not start HTTP server")
os.Exit(1)
}
}()
monitor.Run()
}

View File

@ -538,7 +538,7 @@ private:
static void eio_want_poll() {
want_poll = 1;
// SOMEDAY: nullptr for deferred error, no analysis of correctness (itp)
onMainThreadVoid([]() { poll_eio(); }, nullptr, TaskPriority::PollEIO);
onMainThreadVoid([]() { poll_eio(); }, TaskPriority::PollEIO);
}
static int eio_callback(eio_req* req) {

View File

@ -165,6 +165,8 @@ set(FDBSERVER_SRCS
TCInfo.h
template_fdb.h
tester.actor.cpp
TenantCache.actor.cpp
TenantCache.h
TesterInterface.actor.h
TLogInterface.h
TLogServer.actor.cpp

View File

@ -28,6 +28,7 @@
#include "fdbclient/SystemData.h"
#include "fdbrpc/FailureMonitor.h"
#include "fdbserver/EncryptKeyProxyInterface.h"
#include "fdbserver/Knobs.h"
#include "flow/ActorCollection.h"
#include "fdbclient/ClusterConnectionMemoryRecord.h"
#include "fdbclient/NativeAPI.actor.h"
@ -208,11 +209,13 @@ ACTOR Future<Void> handleLeaderReplacement(Reference<ClusterRecoveryData> self,
ACTOR Future<Void> clusterWatchDatabase(ClusterControllerData* cluster,
ClusterControllerData::DBInfo* db,
ServerCoordinators coordinators,
Future<Void> leaderFail) {
Future<Void> leaderFail,
Future<Void> recoveredDiskFiles) {
state MasterInterface iMaster;
state Reference<ClusterRecoveryData> recoveryData;
state PromiseStream<Future<Void>> addActor;
state Future<Void> recoveryCore;
state bool recoveredDisk = false;
// SOMEDAY: If there is already a non-failed master referenced by zkMasterInfo, use that one until it fails
// When this someday is implemented, make sure forced failures still cause the master to be recruited again
@ -254,6 +257,18 @@ ACTOR Future<Void> clusterWatchDatabase(ClusterControllerData* cluster,
.detail("ChangeID", dbInfo.id);
db->serverInfo->set(dbInfo);
if (SERVER_KNOBS->ENABLE_ENCRYPTION && !recoveredDisk) {
// EKP singleton recruitment waits for 'Master/Sequencer' recruitment, execute wait for
// 'recoveredDiskFiles' optimization once EKP recruitment is unblocked to avoid circular dependencies
// with StorageServer initialization. The waiting for recoveredDiskFiles is to make sure the worker
// server on the same process has been registered with the new CC before recruitment.
wait(recoveredDiskFiles);
TraceEvent("CCWDB_RecoveredDiskFiles", cluster->id).log();
// Need to be done for the first once in the lifetime of ClusterController
recoveredDisk = true;
}
state Future<Void> spinDelay = delay(
SERVER_KNOBS
->MASTER_SPIN_DELAY); // Don't retry cluster recovery more than once per second, but don't delay
@ -2511,7 +2526,8 @@ ACTOR Future<Void> clusterControllerCore(ClusterControllerFullInterface interf,
Future<Void> leaderFail,
ServerCoordinators coordinators,
LocalityData locality,
ConfigDBType configDBType) {
ConfigDBType configDBType,
Future<Void> recoveredDiskFiles) {
state ClusterControllerData self(interf, locality, coordinators);
state ConfigBroadcaster configBroadcaster(coordinators, configDBType);
state Future<Void> coordinationPingDelay = delay(SERVER_KNOBS->WORKER_COORDINATION_PING_DELAY);
@ -2522,7 +2538,8 @@ ACTOR Future<Void> clusterControllerCore(ClusterControllerFullInterface interf,
if (SERVER_KNOBS->ENABLE_ENCRYPTION || g_network->isSimulated()) {
self.addActor.send(monitorEncryptKeyProxy(&self));
}
self.addActor.send(clusterWatchDatabase(&self, &self.db, coordinators, leaderFail)); // Start the master database
self.addActor.send(clusterWatchDatabase(
&self, &self.db, coordinators, leaderFail, recoveredDiskFiles)); // Start the master database
self.addActor.send(self.updateWorkerList.init(self.db.db));
self.addActor.send(statusServer(interf.clientInterface.databaseStatus.getFuture(),
&self,
@ -2651,7 +2668,8 @@ ACTOR Future<Void> clusterController(ServerCoordinators coordinators,
bool hasConnected,
Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo,
LocalityData locality,
ConfigDBType configDBType) {
ConfigDBType configDBType,
Future<Void> recoveredDiskFiles) {
loop {
state ClusterControllerFullInterface cci;
state bool inRole = false;
@ -2678,7 +2696,7 @@ ACTOR Future<Void> clusterController(ServerCoordinators coordinators,
startRole(Role::CLUSTER_CONTROLLER, cci.id(), UID());
inRole = true;
wait(clusterControllerCore(cci, leaderFail, coordinators, locality, configDBType));
wait(clusterControllerCore(cci, leaderFail, coordinators, locality, configDBType, recoveredDiskFiles));
}
} catch (Error& e) {
if (inRole)
@ -2703,12 +2721,27 @@ ACTOR Future<Void> clusterController(Reference<IClusterConnectionRecord> connRec
Future<Void> recoveredDiskFiles,
LocalityData locality,
ConfigDBType configDBType) {
wait(recoveredDiskFiles);
// Defer this wait optimization of cluster configuration has 'Encryption data at-rest' enabled.
// Encryption depends on available of EncryptKeyProxy (EKP) FDB role to enable fetch/refresh of encryption keys
// created and managed by external KeyManagementService (KMS).
//
// TODO: Wait optimization is to ensure the worker server on the same process gets registered with the new CC before
// recruitment. Unify the codepath for both Encryption enable vs disable scenarios.
if (!SERVER_KNOBS->ENABLE_ENCRYPTION) {
wait(recoveredDiskFiles);
TraceEvent("RecoveredDiskFiles").log();
} else {
TraceEvent("RecoveredDiskFiles_Deferred").log();
}
state bool hasConnected = false;
loop {
try {
ServerCoordinators coordinators(connRecord);
wait(clusterController(coordinators, currentCC, hasConnected, asyncPriorityInfo, locality, configDBType));
wait(clusterController(
coordinators, currentCC, hasConnected, asyncPriorityInfo, locality, configDBType, recoveredDiskFiles));
hasConnected = true;
} catch (Error& e) {
if (e.code() != error_code_coordinators_changed)

View File

@ -465,6 +465,7 @@ public:
[compactionVersion](const auto& va) { return va.version > compactionVersion; });
annotationHistory.erase(annotationHistory.begin(), it);
}
lastCompactedVersion = compactionVersion;
}
Future<Void> getError() const { return consumerFuture || actors.getResult(); }

View File

@ -299,6 +299,8 @@ public:
return readFrom.checkEventually(member, value);
}
JsonBuilderObject getStatus() const { return broadcaster.getStatus(); }
void changeBroadcaster() {
broadcastServer.cancel();
cbi->set(ConfigBroadcastInterface{});
@ -453,6 +455,8 @@ public:
void restartNode() { writeTo.restartNode(); }
JsonBuilderObject getStatus() const { return broadcaster.getStatus(); }
void changeBroadcaster() {
broadcastServer.cancel();
cbi->set(ConfigBroadcastInterface{});
@ -565,7 +569,7 @@ Future<Void> testRestartLocalConfigAndChangeClass(UnitTestParameters params) {
}
ACTOR template <class Env>
Future<Void> testNewLocalConfigAfterCompaction(UnitTestParameters params) {
Future<std::string> testNewLocalConfigAfterCompaction(UnitTestParameters params) {
state Env env(params.getDataDir(), "class-A");
wait(env.setup(ConfigClassSet({ "class-A"_sr })));
wait(set(env, "class-A"_sr, "test_long"_sr, int64_t{ 1 }));
@ -581,7 +585,7 @@ Future<Void> testNewLocalConfigAfterCompaction(UnitTestParameters params) {
wait(check(env, &TestKnobs::TEST_LONG, Optional<int64_t>{ 1 }));
wait(set(env, "class-A"_sr, "test_long"_sr, 2));
wait(check(env, &TestKnobs::TEST_LONG, Optional<int64_t>{ 2 }));
return Void();
return env.getStatus().getJson();
}
ACTOR template <class Env>
@ -837,7 +841,24 @@ TEST_CASE("/fdbserver/ConfigDB/BroadcasterToLocalConfig/Compact") {
}
TEST_CASE("/fdbserver/ConfigDB/BroadcasterToLocalConfig/RestartLocalConfigurationAfterCompaction") {
wait(testNewLocalConfigAfterCompaction<BroadcasterToLocalConfigEnvironment>(params));
std::string statusStr = wait(testNewLocalConfigAfterCompaction<BroadcasterToLocalConfigEnvironment>(params));
json_spirit::mValue status;
ASSERT(json_spirit::read_string(statusStr, status));
ASSERT(status.type() == json_spirit::obj_type);
auto lastCompacted = status.get_obj().at("last_compacted_version").get_int64();
ASSERT_EQ(lastCompacted, 1);
auto mostRecent = status.get_obj().at("most_recent_version").get_int64();
ASSERT_EQ(mostRecent, 2);
auto commits = status.get_obj().at("commits");
// The unit test does not include annotations when running the set
// operation.
ASSERT_EQ(commits.get_array().size(), 0);
auto mutations = status.get_obj().at("mutations");
ASSERT_EQ(mutations.get_array().size(), 1);
auto snapshot = status.get_obj().at("snapshot");
auto classA = snapshot.get_obj().at("class-A");
auto value = classA.get_obj().at("test_long").get_str();
ASSERT(value == "int64_t:2");
return Void();
}
@ -886,7 +907,10 @@ TEST_CASE("/fdbserver/ConfigDB/TransactionToLocalConfig/CompactNode") {
}
TEST_CASE("/fdbserver/ConfigDB/TransactionToLocalConfig/RestartLocalConfigurationAfterCompaction") {
wait(testNewLocalConfigAfterCompaction<TransactionToLocalConfigEnvironment>(params));
// TransactionToLocalConfigEnvironment only calls into ConfigNode compact.
// It does not interact with the broadcaster, thus status json won't be
// updated.
wait(success(testNewLocalConfigAfterCompaction<TransactionToLocalConfigEnvironment>(params)));
return Void();
}

View File

@ -3830,7 +3830,8 @@ int DDTeamCollection::overlappingMachineMembers(std::vector<Standalone<StringRef
void DDTeamCollection::addTeam(const std::vector<Reference<TCServerInfo>>& newTeamServers,
IsInitialTeam isInitialTeam,
IsRedundantTeam redundantTeam) {
auto teamInfo = makeReference<TCTeamInfo>(newTeamServers);
Optional<Reference<TCTenantInfo>> no_tenant = {};
auto teamInfo = makeReference<TCTeamInfo>(newTeamServers, no_tenant);
// Move satisfiesPolicy to the end for performance benefit
auto badTeam = IsBadTeam{ redundantTeam || teamInfo->size() != configuration.storageTeamSize ||

View File

@ -49,6 +49,7 @@
#include "flow/UnitTest.h"
class TCTeamInfo;
class TCTenantInfo;
class TCMachineInfo;
class TCMachineTeamInfo;

View File

@ -19,6 +19,7 @@
*/
#include <set>
#include <string>
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/FDBOptions.g.h"
@ -28,6 +29,7 @@
#include "fdbclient/RunTransaction.actor.h"
#include "fdbclient/StorageServerInterface.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/Tenant.h"
#include "fdbrpc/Replication.h"
#include "fdbserver/DataDistribution.actor.h"
#include "fdbserver/DDTeamCollection.h"
@ -37,6 +39,7 @@
#include "fdbserver/MoveKeys.actor.h"
#include "fdbserver/QuietDatabase.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/TenantCache.h"
#include "fdbserver/TLogInterface.h"
#include "fdbserver/WaitFailure.h"
#include "flow/ActorCollection.h"
@ -513,6 +516,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
state Reference<DDTeamCollection> primaryTeamCollection;
state Reference<DDTeamCollection> remoteTeamCollection;
state bool trackerCancelled;
state bool ddIsTenantAware = SERVER_KNOBS->DD_TENANT_AWARENESS_ENABLED;
loop {
trackerCancelled = false;
@ -597,6 +601,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
// mode may be set true by system operator using fdbcli and isDDEnabled() set to true
break;
}
TraceEvent("DataDistributionDisabled", self->ddId).log();
TraceEvent("MovingData", self->ddId)
@ -637,6 +642,12 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
TraceEvent("DataDistributionEnabled").log();
}
state Reference<TenantCache> ddTenantCache;
if (ddIsTenantAware) {
ddTenantCache = makeReference<TenantCache>(cx, self->ddId);
wait(ddTenantCache->build(cx));
}
// When/If this assertion fails, Evan owes Ben a pat on the back for his foresight
ASSERT(configuration.storageTeamSize > 0);
@ -705,6 +716,10 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
} else {
anyZeroHealthyTeams = zeroHealthyTeams[0];
}
if (ddIsTenantAware) {
actors.push_back(reportErrorsExcept(
ddTenantCache->monitorTenantMap(), "DDTenantCacheMonitor", self->ddId, &normalDDQueueErrors()));
}
actors.push_back(pollMoveKeysLock(cx, lock, ddEnabledState));
actors.push_back(reportErrorsExcept(dataDistributionTracker(initData,

View File

@ -328,6 +328,7 @@ class PaxosConfigConsumerImpl {
}
try {
wait(timeoutError(waitForAll(compactionRequests), 1.0));
broadcaster->compact(compactionVersion);
} catch (Error& e) {
TraceEvent(SevWarn, "ErrorSendingCompactionRequest").error(e);
}

View File

@ -3149,12 +3149,10 @@ ACTOR Future<StatusReply> clusterGetStatus(
statusObj["workload"] = workerStatuses[1];
statusObj["layers"] = workerStatuses[2];
/*
if (configBroadcaster) {
// TODO: Read from coordinators for more up-to-date config database status?
statusObj["configuration_database"] = configBroadcaster->getStatus();
// TODO: Read from coordinators for more up-to-date config database status?
statusObj["configuration_database"] = configBroadcaster->getStatus();
}
*/
// Add qos section if it was populated
if (!qos.empty())

View File

@ -307,9 +307,9 @@ std::string TCMachineTeamInfo::getMachineIDsStr() const {
return std::move(ss).str();
}
TCTeamInfo::TCTeamInfo(std::vector<Reference<TCServerInfo>> const& servers)
: servers(servers), healthy(true), wrongConfiguration(false), priority(SERVER_KNOBS->PRIORITY_TEAM_HEALTHY),
id(deterministicRandom()->randomUniqueID()) {
TCTeamInfo::TCTeamInfo(std::vector<Reference<TCServerInfo>> const& servers, Optional<Reference<TCTenantInfo>> tenant)
: servers(servers), tenant(tenant), healthy(true), wrongConfiguration(false),
priority(SERVER_KNOBS->PRIORITY_TEAM_HEALTHY), id(deterministicRandom()->randomUniqueID()) {
if (servers.empty()) {
TraceEvent(SevInfo, "ConstructTCTeamFromEmptyServers").log();
}

View File

@ -20,9 +20,13 @@
#pragma once
#include "fdbclient/Tenant.h"
#include "fdbserver/DDTeamCollection.h"
#include "flow/Arena.h"
#include "flow/FastRef.h"
class TCTeamInfo;
class TCTenantInfo;
class TCMachineInfo;
class TCMachineTeamInfo;
@ -166,6 +170,7 @@ class TCTeamInfo final : public ReferenceCounted<TCTeamInfo>, public IDataDistri
friend class TCTeamInfoImpl;
std::vector<Reference<TCServerInfo>> servers;
std::vector<UID> serverIDs;
Optional<Reference<TCTenantInfo>> tenant;
bool healthy;
bool wrongConfiguration; // True if any of the servers in the team have the wrong configuration
int priority;
@ -175,7 +180,9 @@ public:
Reference<TCMachineTeamInfo> machineTeam;
Future<Void> tracker;
explicit TCTeamInfo(std::vector<Reference<TCServerInfo>> const& servers);
explicit TCTeamInfo(std::vector<Reference<TCServerInfo>> const& servers, Optional<Reference<TCTenantInfo>> tenant);
Optional<Reference<TCTenantInfo>>& getTenant() { return tenant; }
std::string getTeamID() const override { return id.shortString(); }
@ -235,3 +242,24 @@ private:
bool allServersHaveHealthyAvailableSpace() const;
};
class TCTenantInfo : public ReferenceCounted<TCTenantInfo> {
private:
TenantInfo m_tenantInfo;
Key m_prefix;
std::vector<Reference<TCTeamInfo>> m_tenantTeams;
int64_t m_cacheGeneration;
public:
TCTenantInfo() { m_prefix = allKeys.end; }
TCTenantInfo(TenantInfo tinfo, Key prefix) : m_tenantInfo(tinfo), m_prefix(prefix) {}
std::vector<Reference<TCTeamInfo>>& teams() { return m_tenantTeams; }
TenantName name() { return m_tenantInfo.name.get(); }
std::string prefixDesc() { return m_prefix.printable(); }
void addTeam(TCTeamInfo team);
void removeTeam(TCTeamInfo team);
void updateCacheGeneration(int64_t generation) { m_cacheGeneration = generation; }
int64_t cacheGeneration() const { return m_cacheGeneration; }
};

View File

@ -0,0 +1,304 @@
/*
* TenantCache.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbserver/DDTeamCollection.h"
#include "fdbserver/TenantCache.h"
#include <limits>
#include <string>
#include "flow/actorcompiler.h"
class TenantCacheImpl {
ACTOR static Future<RangeResult> getTenantList(TenantCache* tenantCache, Transaction* tr) {
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE);
state Future<RangeResult> tenantList = tr->getRange(tenantMapKeys, CLIENT_KNOBS->TOO_MANY);
wait(success(tenantList));
ASSERT(!tenantList.get().more && tenantList.get().size() < CLIENT_KNOBS->TOO_MANY);
return tenantList.get();
}
public:
ACTOR static Future<Void> build(TenantCache* tenantCache) {
state Transaction tr(tenantCache->dbcx());
TraceEvent(SevInfo, "BuildingTenantCache", tenantCache->id()).log();
try {
state RangeResult tenantList = wait(getTenantList(tenantCache, &tr));
for (int i = 0; i < tenantList.size(); i++) {
TenantName tname = tenantList[i].key.removePrefix(tenantMapPrefix);
TenantMapEntry t = decodeTenantEntry(tenantList[i].value);
tenantCache->insert(tname, t);
TraceEvent(SevInfo, "TenantFound", tenantCache->id())
.detail("TenantName", tname)
.detail("TenantID", t.id)
.detail("TenantPrefix", t.prefix);
}
} catch (Error& e) {
wait(tr.onError(e));
}
TraceEvent(SevInfo, "BuiltTenantCache", tenantCache->id()).log();
return Void();
}
ACTOR static Future<Void> monitorTenantMap(TenantCache* tenantCache) {
TraceEvent(SevInfo, "StartingTenantCacheMonitor", tenantCache->id()).log();
state Transaction tr(tenantCache->dbcx());
state double lastTenantListFetchTime = now();
loop {
try {
if (now() - lastTenantListFetchTime > (2 * SERVER_KNOBS->TENANT_CACHE_LIST_REFRESH_INTERVAL)) {
TraceEvent(SevWarn, "TenantListRefreshDelay", tenantCache->id()).log();
}
state RangeResult tenantList = wait(getTenantList(tenantCache, &tr));
tenantCache->startRefresh();
bool tenantListUpdated = false;
for (int i = 0; i < tenantList.size(); i++) {
TenantName tname = tenantList[i].key.removePrefix(tenantMapPrefix);
TenantMapEntry t = decodeTenantEntry(tenantList[i].value);
if (tenantCache->update(tname, t)) {
tenantListUpdated = true;
}
}
if (tenantCache->cleanup()) {
tenantListUpdated = true;
}
if (tenantListUpdated) {
TraceEvent(SevInfo, "TenantCache", tenantCache->id()).detail("List", tenantCache->desc());
}
lastTenantListFetchTime = now();
tr.reset();
wait(delay(SERVER_KNOBS->TENANT_CACHE_LIST_REFRESH_INTERVAL));
} catch (Error& e) {
if (e.code() != error_code_actor_cancelled) {
TraceEvent("TenantCacheGetTenantListError", tenantCache->id())
.errorUnsuppressed(e)
.suppressFor(1.0);
}
wait(tr.onError(e));
}
}
}
};
void TenantCache::insert(TenantName& tenantName, TenantMapEntry& tenant) {
KeyRef tenantPrefix(tenant.prefix.begin(), tenant.prefix.size());
ASSERT(tenantCache.find(tenantPrefix) == tenantCache.end());
TenantInfo tenantInfo(tenantName, tenant.id);
tenantCache[tenantPrefix] = makeReference<TCTenantInfo>(tenantInfo, tenant.prefix);
tenantCache[tenantPrefix]->updateCacheGeneration(generation);
}
void TenantCache::startRefresh() {
ASSERT(generation < std::numeric_limits<uint64_t>::max());
generation++;
}
void TenantCache::keep(TenantName& tenantName, TenantMapEntry& tenant) {
KeyRef tenantPrefix(tenant.prefix.begin(), tenant.prefix.size());
ASSERT(tenantCache.find(tenantPrefix) != tenantCache.end());
tenantCache[tenantPrefix]->updateCacheGeneration(generation);
}
bool TenantCache::update(TenantName& tenantName, TenantMapEntry& tenant) {
KeyRef tenantPrefix(tenant.prefix.begin(), tenant.prefix.size());
if (tenantCache.find(tenantPrefix) != tenantCache.end()) {
keep(tenantName, tenant);
return false;
}
insert(tenantName, tenant);
return true;
}
int TenantCache::cleanup() {
int tenantsRemoved = 0;
std::vector<Key> keysToErase;
for (auto& t : tenantCache) {
ASSERT(t.value->cacheGeneration() <= generation);
if (t.value->cacheGeneration() != generation) {
keysToErase.push_back(t.key);
}
}
for (auto& k : keysToErase) {
tenantCache.erase(k);
tenantsRemoved++;
}
return tenantsRemoved;
}
std::string TenantCache::desc() const {
std::string s("@Generation: ");
s += std::to_string(generation) + " ";
int count = 0;
for (auto& [tenantPrefix, tenant] : tenantCache) {
if (count) {
s += ", ";
}
s += "Name: " + tenant->name().toString() + " Prefix: " + tenantPrefix.printable();
count++;
}
return s;
}
bool TenantCache::isTenantKey(KeyRef key) const {
auto it = tenantCache.lastLessOrEqual(key);
if (it == tenantCache.end()) {
return false;
}
if (!key.startsWith(it->key)) {
return false;
}
return true;
}
Future<Void> TenantCache::build(Database cx) {
return TenantCacheImpl::build(this);
}
Future<Void> TenantCache::monitorTenantMap() {
return TenantCacheImpl::monitorTenantMap(this);
}
class TenantCacheUnitTest {
public:
ACTOR static Future<Void> InsertAndTestPresence() {
wait(Future<Void>(Void()));
Database cx;
TenantCache tenantCache(cx, UID(1, 0));
constexpr static uint16_t tenantLimit = 64;
uint16_t tenantCount = deterministicRandom()->randomInt(1, tenantLimit);
uint16_t tenantNumber = deterministicRandom()->randomInt(0, std::numeric_limits<uint16_t>::max());
for (uint16_t i = 0; i < tenantCount; i++) {
TenantName tenantName(format("%s_%08d", "ddtc_test_tenant", tenantNumber + i));
TenantMapEntry tenant(tenantNumber + i, KeyRef());
tenantCache.insert(tenantName, tenant);
}
for (int i = 0; i < tenantLimit; i++) {
Key k(format("%d", i));
ASSERT(tenantCache.isTenantKey(k.withPrefix(TenantMapEntry::idToPrefix(tenantNumber + (i % tenantCount)))));
ASSERT(!tenantCache.isTenantKey(k.withPrefix(allKeys.begin)));
ASSERT(!tenantCache.isTenantKey(k));
}
return Void();
}
ACTOR static Future<Void> RefreshAndTestPresence() {
wait(Future<Void>(Void()));
Database cx;
TenantCache tenantCache(cx, UID(1, 0));
constexpr static uint16_t tenantLimit = 64;
uint16_t tenantCount = deterministicRandom()->randomInt(1, tenantLimit);
uint16_t tenantNumber = deterministicRandom()->randomInt(0, std::numeric_limits<uint16_t>::max());
for (uint16_t i = 0; i < tenantCount; i++) {
TenantName tenantName(format("%s_%08d", "ddtc_test_tenant", tenantNumber + i));
TenantMapEntry tenant(tenantNumber + i, KeyRef());
tenantCache.insert(tenantName, tenant);
}
uint16_t staleTenantFraction = deterministicRandom()->randomInt(1, 8);
tenantCache.startRefresh();
int keepCount = 0, removeCount = 0;
for (int i = 0; i < tenantCount; i++) {
uint16_t tenantOrdinal = tenantNumber + i;
if (tenantOrdinal % staleTenantFraction != 0) {
TenantName tenantName(format("%s_%08d", "ddtc_test_tenant", tenantOrdinal));
TenantMapEntry tenant(tenantOrdinal, KeyRef());
bool newTenant = tenantCache.update(tenantName, tenant);
ASSERT(!newTenant);
keepCount++;
} else {
removeCount++;
}
}
int tenantsRemoved = tenantCache.cleanup();
ASSERT(tenantsRemoved == removeCount);
int keptCount = 0, removedCount = 0;
for (int i = 0; i < tenantCount; i++) {
uint16_t tenantOrdinal = tenantNumber + i;
Key k(format("%d", i));
if (tenantOrdinal % staleTenantFraction != 0) {
ASSERT(tenantCache.isTenantKey(k.withPrefix(TenantMapEntry::idToPrefix(tenantOrdinal))));
keptCount++;
} else {
ASSERT(!tenantCache.isTenantKey(k.withPrefix(TenantMapEntry::idToPrefix(tenantOrdinal))));
removedCount++;
}
}
ASSERT(keepCount == keptCount);
ASSERT(removeCount == removedCount);
return Void();
}
};
TEST_CASE("/TenantCache/InsertAndTestPresence") {
wait(TenantCacheUnitTest::InsertAndTestPresence());
return Void();
}
TEST_CASE("/TenantCache/RefreshAndTestPresence") {
wait(TenantCacheUnitTest::RefreshAndTestPresence());
return Void();
}

72
fdbserver/TenantCache.h Normal file
View File

@ -0,0 +1,72 @@
/*
* TenantCache.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/FDBTypes.h"
#include "fdbclient/Tenant.h"
#include "fdbserver/DDTeamCollection.h"
#include "fdbserver/TCInfo.h"
#include "flow/IRandom.h"
#include "flow/IndexedSet.h"
#include <limits>
#include <string>
typedef Map<KeyRef, Reference<TCTenantInfo>> TenantMapByPrefix;
class TenantCache : public ReferenceCounted<TenantCache> {
friend class TenantCacheImpl;
friend class TenantCacheUnitTest;
private:
constexpr static uint64_t INVALID_GENERATION = std::numeric_limits<uint64_t>::max();
UID distributorID;
Database cx;
uint64_t generation;
TenantMapByPrefix tenantCache;
// mark the start of a new sweep of the tenant cache
void startRefresh();
void insert(TenantName& tenantName, TenantMapEntry& tenant);
void keep(TenantName& tenantName, TenantMapEntry& tenant);
// return true if a new tenant is inserted into the cache
bool update(TenantName& tenantName, TenantMapEntry& tenant);
// return count of tenants that were found to be stale and removed from the cache
int cleanup();
UID id() const { return distributorID; }
Database dbcx() const { return cx; }
public:
TenantCache(Database cx, UID distributorID) : distributorID(distributorID), cx(cx) {
generation = deterministicRandom()->randomUInt32();
}
Future<Void> build(Database cx);
Future<Void> monitorTenantMap();
std::string desc() const;
bool isTenantKey(KeyRef key) const;
};

View File

@ -78,10 +78,10 @@ struct TransactionWrapper : public ReferenceCounted<TransactionWrapper> {
virtual Version getCommittedVersion() = 0;
// Gets the version vector cached in a transaction
virtual VersionVector getVersionVector() = 0;
virtual Future<VersionVector> getVersionVector() = 0;
// Gets the spanContext of a transaction
virtual SpanContext getSpanContext() = 0;
virtual Future<SpanContext> getSpanContext() = 0;
// Prints debugging messages for a transaction; not implemented for all transaction types
virtual void debugTransaction(UID debugId) {}
@ -160,10 +160,10 @@ struct FlowTransactionWrapper : public TransactionWrapper {
Version getCommittedVersion() override { return transaction.getCommittedVersion(); }
// Gets the version vector cached in a transaction
VersionVector getVersionVector() override { return transaction.getVersionVector(); }
Future<VersionVector> getVersionVector() override { return transaction.getVersionVector(); }
// Gets the spanContext of a transaction
SpanContext getSpanContext() override { return transaction.getSpanContext(); }
Future<SpanContext> getSpanContext() override { return transaction.getSpanContext(); }
// Prints debugging messages for a transaction
void debugTransaction(UID debugId) override { transaction.debugTransaction(debugId); }
@ -230,10 +230,12 @@ struct ThreadTransactionWrapper : public TransactionWrapper {
Version getCommittedVersion() override { return transaction->getCommittedVersion(); }
// Gets the version vector cached in a transaction
VersionVector getVersionVector() override { return transaction->getVersionVector(); }
Future<VersionVector> getVersionVector() override {
return unsafeThreadFutureToFuture(transaction->getVersionVector());
}
// Gets the spanContext of a transaction
SpanContext getSpanContext() override { return transaction->getSpanContext(); }
Future<SpanContext> getSpanContext() override { return unsafeThreadFutureToFuture(transaction->getSpanContext()); }
void addReadConflictRange(KeyRangeRef const& keys) override { transaction->addReadConflictRange(keys); }
};

View File

@ -38,12 +38,10 @@ struct FDBPromiseImpl : FDBPromise {
if (g_network->isOnMainThread()) {
impl.send(*reinterpret_cast<T*>(value));
} else {
onMainThreadVoid(
[impl = impl, val = *reinterpret_cast<T*>(value)]() -> Future<Void> {
impl.send(val);
return Void();
},
nullptr);
onMainThreadVoid([impl = impl, val = *reinterpret_cast<T*>(value)]() -> Future<Void> {
impl.send(val);
return Void();
});
}
}
};
@ -93,13 +91,11 @@ struct FDBLoggerImpl : FDBLogger {
traceFun();
flushTraceFileVoid();
} else {
onMainThreadVoid(
[traceFun]() -> Future<Void> {
traceFun();
flushTraceFileVoid();
return Void();
},
nullptr);
onMainThreadVoid([traceFun]() -> Future<Void> {
traceFun();
flushTraceFileVoid();
return Void();
});
}
}
};

View File

@ -194,14 +194,12 @@ struct ThreadSafetyWorkload : TestWorkload {
info->self->commitBarrier.decrementNumRequired();
// Signal completion back to the main thread
onMainThreadVoid(
[=]() {
if (error.code() != error_code_success)
info->done.sendError(error);
else
info->done.send(Void());
},
nullptr);
onMainThreadVoid([=]() {
if (error.code() != error_code_success)
info->done.sendError(error);
else
info->done.send(Void());
});
THREAD_RETURN;
}

View File

@ -189,19 +189,17 @@ void FileTraceLogWriter::open() {
needsResolve = true;
int errorNum = errno;
onMainThreadVoid(
[finalname = finalname, errorNum] {
TraceEvent(SevWarnAlways, "TraceFileOpenError")
.detail("Filename", finalname)
.detail("ErrorCode", errorNum)
.detail("Error", strerror(errorNum))
.trackLatest("TraceFileOpenError");
},
nullptr);
onMainThreadVoid([finalname = finalname, errorNum] {
TraceEvent(SevWarnAlways, "TraceFileOpenError")
.detail("Filename", finalname)
.detail("ErrorCode", errorNum)
.detail("Error", strerror(errorNum))
.trackLatest("TraceFileOpenError");
});
threadSleep(FLOW_KNOBS->TRACE_RETRY_OPEN_INTERVAL);
}
}
onMainThreadVoid([] { latestEventCache.clear("TraceFileOpenError"); }, nullptr);
onMainThreadVoid([] { latestEventCache.clear("TraceFileOpenError"); });
if (needsResolve) {
issues->resolveIssue("trace_log_could_not_create_file");
}

View File

@ -194,13 +194,13 @@ public:
if (thread_network == this)
stopImmediately();
else
onMainThreadVoid([this] { this->stopImmediately(); }, nullptr);
onMainThreadVoid([this] { this->stopImmediately(); });
}
void addStopCallback(std::function<void()> fn) override {
if (thread_network == this)
stopCallbacks.emplace_back(std::move(fn));
else
onMainThreadVoid([this, fn] { this->stopCallbacks.emplace_back(std::move(fn)); }, nullptr);
onMainThreadVoid([this, fn] { this->stopCallbacks.emplace_back(std::move(fn)); });
}
bool isSimulated() const override { return false; }

View File

@ -39,15 +39,24 @@
namespace internal_thread_helper {
ACTOR template <class F>
void doOnMainThreadVoid(Future<Void> signal, F f, Error* err) {
void doOnMainThreadVoid(Future<Void> signal, F f) {
wait(signal);
if (err && err->code() != invalid_error_code)
try {
f();
} catch (Error& e) {
}
}
ACTOR template <class F, class T>
void doOnMainThreadVoid(Future<Void> signal, F f, T* t, Error T::*member) {
wait(signal);
if (t && (t->*member).code() != invalid_error_code)
return;
try {
f();
} catch (Error& e) {
if (err)
*err = e;
if (t)
t->*member = e;
}
}
@ -63,11 +72,21 @@ void doOnMainThreadVoid(Future<Void> signal, F f, Error* err) {
// WARNING: The error returned in `err` can only be read on the FDB network thread because there is no way to
// order the write to `err` with actions on other threads.
//
// WARNING: The Error member of `T` is accepted as a pointer to a data member so the caller can avoid dereferencing
// `T` until it is initialized on the main thread.
//
// `onMainThreadVoid` is defined here because of the dependency in `ThreadSingleAssignmentVarBase`.
template <class F>
void onMainThreadVoid(F f, Error* err = nullptr, TaskPriority taskID = TaskPriority::DefaultOnMainThread) {
template <class F, class T>
void onMainThreadVoid(F f, T* t, Error T::*member, TaskPriority taskID = TaskPriority::DefaultOnMainThread) {
Promise<Void> signal;
internal_thread_helper::doOnMainThreadVoid(signal.getFuture(), f, err);
internal_thread_helper::doOnMainThreadVoid(signal.getFuture(), f, t, member);
g_network->onMainThread(std::move(signal), taskID);
}
template <class F>
void onMainThreadVoid(F f, TaskPriority taskID = TaskPriority::DefaultOnMainThread) {
Promise<Void> signal;
internal_thread_helper::doOnMainThreadVoid(signal.getFuture(), f);
g_network->onMainThread(std::move(signal), taskID);
}

View File

@ -1413,7 +1413,7 @@ void TraceBatch::dump() {
g_traceLog.writeEvent(buggifyBatch[i].fields, "", false);
}
onMainThreadVoid([]() { g_traceLog.flush(); }, nullptr);
onMainThreadVoid([]() { g_traceLog.flush(); });
eventBatch.clear();
attachBatch.clear();
buggifyBatch.clear();

View File

@ -45,7 +45,7 @@ int main(int argc, char** argv) {
Promise<Void> benchmarksDone;
std::thread benchmarkThread([&]() {
benchmark::RunSpecifiedBenchmarks();
onMainThreadVoid([&]() { benchmarksDone.send(Void()); }, nullptr);
onMainThreadVoid([&]() { benchmarksDone.send(Void()); });
});
auto f = stopNetworkAfter(benchmarksDone.getFuture());
runNetwork();

View File

@ -43,6 +43,7 @@ RUN yum install -y \
telnet-0.17-66.el7 \
traceroute-2.0.22-2.el7 \
unzip-6.0-22.el7_9 \
openssl-1.0.2k-24.el7_9.x86_64 \
vim-enhanced-7.4.629-8.el7_9 && \
yum clean all && \
rm -rf /var/cache/yum

View File

@ -1,10 +1,10 @@
#!/usr/bin/env python3
# entrypoint.py
# sidecar.py
#
# This source file is part of the FoundationDB open source project
#
# Copyright 2018-2021 Apple Inc. and the FoundationDB project authors
# Copyright 2018-2022 Apple Inc. and the FoundationDB project authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -22,24 +22,21 @@
import argparse
import hashlib
import ipaddress
import logging
import json
import logging
import os
import re
import shutil
import socket
import ssl
import stat
import time
import traceback
import sys
import tempfile
import time
from functools import partial
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from http.server import HTTPServer, ThreadingHTTPServer, BaseHTTPRequestHandler
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
log = logging.getLogger(__name__)
log.setLevel(logging.INFO)
@ -166,9 +163,7 @@ class Config(object):
)
parser.add_argument(
"--require-not-empty",
help=(
"A file that must be present and non-empty " "in the input directory"
),
help=("A file that must be present and non-empty in the input directory"),
action="append",
)
args = parser.parse_args()
@ -243,7 +238,7 @@ class Config(object):
if self.main_container_version == self.primary_version:
self.substitutions["BINARY_DIR"] = "/usr/bin"
else:
self.substitutions["BINARY_DIR"] = target_path = str(
self.substitutions["BINARY_DIR"] = str(
Path("%s/bin/%s" % (args.main_container_conf_dir, self.primary_version))
)
@ -346,42 +341,16 @@ class ThreadingHTTPServerV6(ThreadingHTTPServer):
address_family = socket.AF_INET6
class Server(BaseHTTPRequestHandler):
class SidecarHandler(BaseHTTPRequestHandler):
# We don't want to load the ssl context for each request so we hold it as a static variable.
ssl_context = None
@classmethod
def start(cls):
"""
This method starts the server.
"""
config = Config.shared()
colon_index = config.bind_address.rindex(":")
port_index = colon_index + 1
address = config.bind_address[:colon_index]
port = config.bind_address[port_index:]
log.info(f"Listening on {address}:{port}")
if address.startswith("[") and address.endswith("]"):
server = ThreadingHTTPServerV6((address[1:-1], int(port)), cls)
else:
server = ThreadingHTTPServer((address, int(port)), cls)
if config.enable_tls:
context = Server.load_ssl_context()
server.socket = context.wrap_socket(server.socket, server_side=True)
observer = Observer()
event_handler = CertificateEventHandler()
for path in set(
[
Path(config.certificate_file).parent.as_posix(),
Path(config.key_file).parent.as_posix(),
]
):
observer.schedule(event_handler, path)
observer.start()
server.serve_forever()
def __init__(self, config, *args, **kwargs):
self.config = config
self.ssl_context = self.__class__.ssl_context
super().__init__(*args, **kwargs)
# This method allows to trigger a reload of the ssl context and updates the static variable.
@classmethod
def load_ssl_context(cls):
config = Config.shared()
@ -390,6 +359,7 @@ class Server(BaseHTTPRequestHandler):
cls.ssl_context.check_hostname = False
cls.ssl_context.verify_mode = ssl.CERT_OPTIONAL
cls.ssl_context.load_cert_chain(config.certificate_file, config.key_file)
return cls.ssl_context
def send_text(self, text, code=200, content_type="text/plain", add_newline=True):
@ -407,16 +377,14 @@ class Server(BaseHTTPRequestHandler):
self.wfile.write(response)
def check_request_cert(self, path):
config = Config.shared()
if path == "/ready":
return True
if not config.enable_tls:
if not self.config.enable_tls:
return True
approved = self.check_cert(
self.connection.getpeercert(), config.peer_verification_rules
self.connection.getpeercert(), self.config.peer_verification_rules
)
if not approved:
self.send_error(401, "Client certificate was not approved")
@ -517,32 +485,33 @@ class Server(BaseHTTPRequestHandler):
if not self.check_request_cert(self.path):
return
if self.path.startswith("/check_hash/"):
file_path = os.path.relpath(self.path, "/check_hash")
try:
self.send_text(
check_hash(os.path.relpath(self.path, "/check_hash")), add_newline=False
self.check_hash(file_path),
add_newline=False,
)
except FileNotFoundError:
self.send_error(404, "Path not found")
self.end_headers()
self.send_error(404, f"{file_path} not found")
if self.path.startswith("/is_present/"):
if is_present(os.path.relpath(self.path, "/is_present")):
file_path = os.path.relpath(self.path, "/is_present")
if self.is_present(file_path):
self.send_text("OK")
else:
self.send_error(404, "Path not found")
self.end_headers()
self.send_error(404, f"{file_path} not found")
elif self.path == "/ready":
self.send_text(ready())
self.send_text("OK")
elif self.path == "/substitutions":
self.send_text(get_substitutions())
self.send_text(self.get_substitutions())
else:
self.send_error(404, "Path not found")
self.end_headers()
except RequestException as e:
self.send_error(400, e.message)
except (ConnectionResetError, BrokenPipeError) as ex:
log.error(f"connection was reset {ex}")
except Exception as ex:
log.error(f"Error processing request {ex}", exc_info=True)
self.send_error(500)
self.end_headers()
def do_POST(self):
"""
@ -552,15 +521,15 @@ class Server(BaseHTTPRequestHandler):
if not self.check_request_cert(self.path):
return
if self.path == "/copy_files":
self.send_text(copy_files())
self.send_text(copy_files(self.config))
elif self.path == "/copy_binaries":
self.send_text(copy_binaries())
self.send_text(copy_binaries(self.config))
elif self.path == "/copy_libraries":
self.send_text(copy_libraries())
self.send_text(copy_libraries(self.config))
elif self.path == "/copy_monitor_conf":
self.send_text(copy_monitor_conf())
self.send_text(copy_monitor_conf(self.config))
elif self.path == "/refresh_certs":
self.send_text(refresh_certs())
self.send_text(self.refresh_certs())
elif self.path == "/restart":
self.send_text("OK")
exit(1)
@ -571,16 +540,38 @@ class Server(BaseHTTPRequestHandler):
raise e
except RequestException as e:
self.send_error(400, e.message)
except e:
log.error("Error processing request", exc_info=True)
except (ConnectionResetError, BrokenPipeError) as ex:
log.error(f"connection was reset {ex}")
except Exception as ex:
log.error(f"Error processing request {ex}", exc_info=True)
self.send_error(500)
self.end_headers()
def log_message(self, format, *args):
log.info(format % args)
def refresh_certs(self):
if not self.config.enable_tls:
raise RequestException("Server is not using TLS")
SidecarHandler.load_ssl_context()
return "OK"
def get_substitutions(self):
return json.dumps(self.config.substitutions)
def check_hash(self, filename):
with open(os.path.join(self.config.output_dir, filename), "rb") as contents:
m = hashlib.sha256()
m.update(contents.read())
return m.hexdigest()
def is_present(self, filename):
return os.path.exists(os.path.join(self.config.output_dir, filename))
class CertificateEventHandler(FileSystemEventHandler):
def __init__(self):
FileSystemEventHandler.__init__(self)
def on_any_event(self, event):
if event.is_directory:
return None
@ -597,27 +588,15 @@ class CertificateEventHandler(FileSystemEventHandler):
)
time.sleep(10)
log.info("Reloading certificates")
Server.load_ssl_context()
SidecarHandler.load_ssl_context()
def check_hash(filename):
with open(os.path.join(Config.shared().output_dir, filename), "rb") as contents:
m = hashlib.sha256()
m.update(contents.read())
return m.hexdigest()
def is_present(filename):
return os.path.exists(os.path.join(Config.shared().output_dir, filename))
def copy_files():
config = Config.shared()
def copy_files(config):
if config.require_not_empty:
for filename in config.require_not_empty:
path = os.path.join(config.input_dir, filename)
if not os.path.isfile(path) or os.path.getsize(path) == 0:
raise Exception("No contents for file %s" % path)
raise Exception(f"No contents for file {path}")
for filename in config.copy_files:
tmp_file = tempfile.NamedTemporaryFile(
@ -629,8 +608,7 @@ def copy_files():
return "OK"
def copy_binaries():
config = Config.shared()
def copy_binaries(config):
if config.main_container_version != config.primary_version:
for binary in config.copy_binaries:
path = Path(f"/usr/bin/{binary}")
@ -650,8 +628,7 @@ def copy_binaries():
return "OK"
def copy_libraries():
config = Config.shared()
def copy_libraries(config):
for version in config.copy_libraries:
path = Path(f"/var/fdb/lib/libfdb_c_{version}.so")
if version == config.copy_libraries[0]:
@ -670,8 +647,7 @@ def copy_libraries():
return "OK"
def copy_monitor_conf():
config = Config.shared()
def copy_monitor_conf(config):
if config.input_monitor_conf:
with open(
os.path.join(config.input_dir, config.input_monitor_conf)
@ -695,35 +671,58 @@ def copy_monitor_conf():
return "OK"
def get_substitutions():
return json.dumps(Config.shared().substitutions)
def ready():
return "OK"
def refresh_certs():
if not Config.shared().enable_tls:
raise RequestException("Server is not using TLS")
Server.load_ssl_context()
return "OK"
class RequestException(Exception):
def __init__(self, message):
super().__init__(message)
self.message = message
def start_sidecar_server(config):
"""
This method starts the HTTP server with the sidecar handler.
"""
colon_index = config.bind_address.rindex(":")
port_index = colon_index + 1
address = config.bind_address[:colon_index]
port = config.bind_address[port_index:]
log.info(f"Listening on {address}:{port}")
handler = partial(
SidecarHandler,
config,
)
if address.startswith("[") and address.endswith("]"):
server = ThreadingHTTPServerV6((address[1:-1], int(port)), handler)
else:
server = ThreadingHTTPServer((address, int(port)), handler)
if config.enable_tls:
context = SidecarHandler.load_ssl_context()
server.socket = context.wrap_socket(server.socket, server_side=True)
observer = Observer()
event_handler = CertificateEventHandler()
for path in set(
[
Path(config.certificate_file).parent.as_posix(),
Path(config.key_file).parent.as_posix(),
]
):
observer.schedule(event_handler, path)
observer.start()
server.serve_forever()
if __name__ == "__main__":
logging.basicConfig(format="%(asctime)-15s %(levelname)s %(message)s")
copy_files()
copy_binaries()
copy_libraries()
copy_monitor_conf()
config = Config.shared()
copy_files(config)
copy_binaries(config)
copy_libraries(config)
copy_monitor_conf(config)
if Config.shared().init_mode:
if config.init_mode:
sys.exit(0)
Server.start()
start_sidecar_server(config)

139
packaging/docker/sidecar_test.py Executable file
View File

@ -0,0 +1,139 @@
#!/usr/bin/env python3
# sidecar_test.py
#
# This source file is part of the FoundationDB open source project
#
# Copyright 2018-2022 Apple Inc. and the FoundationDB project authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import shutil
import socket
import tempfile
import unittest
from functools import partial
from http.server import HTTPServer
from threading import Thread
from unittest.mock import MagicMock
import requests
from sidecar import SidecarHandler
# This test suite starts a real server with a mocked configuration and will do some requests against it.
class TestSidecar(unittest.TestCase):
def setUp(self):
super(TestSidecar, self).setUp()
self.get_free_port()
self.server_url = f"http://localhost:{self.test_server_port}"
self.mock_config = MagicMock()
# We don't want to use TLS for the local tests for now.
self.mock_config.enable_tls = False
self.mock_config.output_dir = tempfile.mkdtemp()
handler = partial(
SidecarHandler,
self.mock_config,
)
self.mock_server = HTTPServer(("localhost", self.test_server_port), handler)
# Start running mock server in a separate thread.
# Daemon threads automatically shut down when the main process exits.
self.mock_server_thread = Thread(target=self.mock_server.serve_forever)
self.mock_server_thread.setDaemon(True)
self.mock_server_thread.start()
def tearDown(self):
shutil.rmtree(self.mock_config.output_dir)
super(TestSidecar, self).tearDown()
# Helper method to get a free port
def get_free_port(self):
s = socket.socket(socket.AF_INET, type=socket.SOCK_STREAM)
s.bind(("localhost", 0))
__, port = s.getsockname()
s.close()
self.test_server_port = port
def test_get_ready(self):
r = requests.get(f"{self.server_url }/ready")
self.assertEqual(r.status_code, 200)
self.assertEqual(r.text, "OK\n")
def test_get_substitutions(self):
expected = {"key": "value"}
self.mock_config.substitutions = expected
r = requests.get(f"{self.server_url }/substitutions")
self.assertEqual(r.status_code, 200)
self.assertEqual(r.json(), expected)
def test_get_check_hash_no_found(self):
r = requests.get(f"{self.server_url }/check_hash/foobar")
self.assertEqual(r.status_code, 404)
self.assertRegex(r.text, "foobar not found")
def test_get_check_hash(self):
with open(os.path.join(self.mock_config.output_dir, "foobar"), "w") as f:
f.write("hello world")
r = requests.get(f"{self.server_url }/check_hash/foobar")
self.assertEqual(r.status_code, 200)
self.assertEqual(
r.text, "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9"
)
def test_get_check_hash_nested(self):
test_path = os.path.join(self.mock_config.output_dir, "nested/foobar")
os.makedirs(os.path.dirname(test_path), exist_ok=True)
with open(test_path, "w") as f:
f.write("hello world")
r = requests.get(f"{self.server_url }/check_hash/nested/foobar")
self.assertEqual(r.status_code, 200)
self.assertEqual(
r.text, "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9"
)
def test_get_is_present_no_found(self):
r = requests.get(f"{self.server_url }/is_present/foobar")
self.assertEqual(r.status_code, 404)
self.assertRegex(r.text, "foobar not found")
def test_get_is_present(self):
with open(os.path.join(self.mock_config.output_dir, "foobar"), "w") as f:
f.write("hello world")
r = requests.get(f"{self.server_url }/is_present/foobar")
self.assertEqual(r.status_code, 200)
self.assertEqual(r.text, "OK\n")
def test_get_is_present_nested(self):
test_path = os.path.join(self.mock_config.output_dir, "nested/foobar")
os.makedirs(os.path.dirname(test_path), exist_ok=True)
with open(test_path, "w") as f:
f.write("hello world")
r = requests.get(f"{self.server_url }/is_present/nested/foobar")
self.assertEqual(r.status_code, 200)
self.assertEqual(r.text, "OK\n")
def test_get_not_found(self):
r = requests.get(f"{self.server_url }/foobar")
self.assertEqual(r.status_code, 404)
self.assertRegex(r.text, "Path not found")
# TODO(johscheuer): Add test cases for post requests.
# TODO(johscheuer): Add test cases for TLS.
if __name__ == "__main__":
unittest.main()