Merge branch 'master' into move-fdbserver-arguments

This commit is contained in:
A.J. Beamon 2019-07-30 15:13:44 -07:00
commit 08cdd8e788
12 changed files with 84 additions and 35 deletions

View File

@ -27,6 +27,7 @@ Fixes
* During an upgrade, the multi-version client now persists database default options and transaction options that aren't reset on retry (e.g. transaction timeout). In order for these options to function correctly during an upgrade, a 6.2 or later client should be used as the primary client. `(PR #1767) <https://github.com/apple/foundationdb/pull/1767>`_.
* If a cluster is upgraded during an ``onError`` call, the cluster could return a ``cluster_version_changed`` error. `(PR #1734) <https://github.com/apple/foundationdb/pull/1734>`_.
* Do not set doBuildTeams in StorageServerTracker unless a storage server's interface changes, in order to avoid unnecessary work. `(PR #1779) <https://github.com/apple/foundationdb/pull/1779>`_.
* Data distribution will now pick a random destination when merging shards in the ``\xff`` keyspace. This avoids an issue with backup where the write-heavy mutation log shards could concentrate on a single process that has less data than everybody else. `(PR #1916) <https://github.com/apple/foundationdb/pull/1916>`_.
Status
------
@ -36,6 +37,7 @@ Status
* Added transaction start counts by priority to ``cluster.workload.transactions``. The new counters are named ``started_immediate_priority``, ``started_default_priority``, and ``started_batch_priority``. `(PR #1836) <https://github.com/apple/foundationdb/pull/1836>`_.
* Remove ``cluster.datacenter_version_difference`` and replace it with ``cluster.datacenter_lag`` that has subfields ``versions`` and ``seconds``. `(PR #1800) <https://github.com/apple/foundationdb/pull/1800>`_.
* Added ``local_rate`` to the ``roles`` section to record the throttling rate of the local ratekeeper `(PR #1712) <http://github.com/apple/foundationdb/pull/1712>`_.
* ``fdbcli`` status now reports the configured zone count. The fault tolerance is now reported in terms of the number of zones unless machine IDs are being used as zone IDs. `(PR #1924) <https://github.com/apple/foundationdb/pull/1924>`_.
Bindings
--------

View File

@ -995,6 +995,9 @@ void printStatus(StatusObjectReader statusObj, StatusClient::StatusLevel level,
StatusObjectReader machinesMap;
outputStringCache = outputString;
bool machinesAreZones = true;
std::map<std::string, int> zones;
try {
outputString += "\n FoundationDB processes - ";
if (statusObjCluster.get("processes", processesMap)) {
@ -1005,10 +1008,24 @@ void printStatus(StatusObjectReader statusObj, StatusClient::StatusLevel level,
int processExclusions = 0;
for (auto p : processesMap.obj()) {
StatusObjectReader process(p.second);
if (process.has("excluded") && process.last().get_bool())
bool excluded = process.has("excluded") && process.last().get_bool();
if (excluded) {
processExclusions++;
}
if (process.has("messages") && process.last().get_array().size()){
errors ++;
errors++;
}
std::string zoneId;
if (process.get("locality.zoneid", zoneId)) {
std::string machineId;
if (!process.get("locality.machineid", machineId) || machineId != zoneId) {
machinesAreZones = false;
}
int& nonExcluded = zones[zoneId];
if(!excluded) {
nonExcluded = 1;
}
}
}
@ -1019,6 +1036,21 @@ void printStatus(StatusObjectReader statusObj, StatusClient::StatusLevel level,
} else
outputString += "unknown";
if (zones.size() > 0) {
outputString += format("\n Zones - %d", zones.size());
int zoneExclusions = 0;
for (auto itr : zones) {
if (itr.second == 0) {
++zoneExclusions;
}
}
if (zoneExclusions > 0) {
outputString += format(" (less %d excluded)", zoneExclusions);
}
} else {
outputString += "\n Zones - unknown";
}
outputString += "\n Machines - ";
if (statusObjCluster.get("machines", machinesMap)) {
outputString += format("%d", machinesMap.obj().size());
@ -1073,15 +1105,15 @@ void printStatus(StatusObjectReader statusObj, StatusClient::StatusLevel level,
outputString += "\n Fault Tolerance - ";
int minLoss = std::min(availLoss, dataLoss);
const char *faultDomain = machinesAreZones ? "machine" : "zone";
if (minLoss == 1)
outputString += "1 machine";
outputString += format("1 %s", faultDomain);
else
outputString += format("%d machines", minLoss);
outputString += format("%d %ss", minLoss, faultDomain);
if (dataLoss > availLoss){
outputString += format(" (%d without data loss)", dataLoss);
}
}
}

View File

@ -50,7 +50,7 @@ struct StorageServerInterface {
RequestStream<struct GetShardStateRequest> getShardState;
RequestStream<struct WaitMetricsRequest> waitMetrics;
RequestStream<struct SplitMetricsRequest> splitMetrics;
RequestStream<struct GetPhysicalMetricsRequest> getPhysicalMetrics;
RequestStream<struct GetStorageMetricsRequest> getStorageMetrics;
RequestStream<ReplyPromise<Void>> waitFailure;
RequestStream<struct StorageQueuingMetricsRequest> getQueuingMetrics;
@ -69,11 +69,11 @@ struct StorageServerInterface {
if constexpr (!is_fb_function<Ar>) {
serializer(ar, uniqueID, locality, getVersion, getValue, getKey, getKeyValues, getShardState, waitMetrics,
splitMetrics, getPhysicalMetrics, waitFailure, getQueuingMetrics, getKeyValueStoreType);
splitMetrics, getStorageMetrics, waitFailure, getQueuingMetrics, getKeyValueStoreType);
if (ar.protocolVersion().hasWatches()) serializer(ar, watchValue);
} else {
serializer(ar, uniqueID, locality, getVersion, getValue, getKey, getKeyValues, getShardState, waitMetrics,
splitMetrics, getPhysicalMetrics, waitFailure, getQueuingMetrics, getKeyValueStoreType,
splitMetrics, getStorageMetrics, waitFailure, getQueuingMetrics, getKeyValueStoreType,
watchValue);
}
}
@ -340,21 +340,24 @@ struct SplitMetricsRequest {
}
};
struct GetPhysicalMetricsReply {
struct GetStorageMetricsReply {
constexpr static FileIdentifier file_identifier = 15491478;
StorageMetrics load;
StorageMetrics free;
StorageMetrics capacity;
double bytesInputRate;
GetStorageMetricsReply() : bytesInputRate(0) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, load, free, capacity);
serializer(ar, load, free, capacity, bytesInputRate);
}
};
struct GetPhysicalMetricsRequest {
struct GetStorageMetricsRequest {
constexpr static FileIdentifier file_identifier = 13290999;
ReplyPromise<GetPhysicalMetricsReply> reply;
ReplyPromise<GetStorageMetricsReply> reply;
template <class Ar>
void serialize(Ar& ar) {

View File

@ -50,7 +50,7 @@ struct TCServerInfo : public ReferenceCounted<TCServerInfo> {
Reference<TCMachineInfo> machine;
Future<Void> tracker;
int64_t dataInFlightToServer;
ErrorOr<GetPhysicalMetricsReply> serverMetrics;
ErrorOr<GetStorageMetricsReply> serverMetrics;
Promise<std::pair<StorageServerInterface, ProcessClass>> interfaceChanged;
Future<std::pair<StorageServerInterface, ProcessClass>> onInterfaceChanged;
Promise<Void> removed;
@ -91,14 +91,14 @@ struct TCMachineInfo : public ReferenceCounted<TCMachineInfo> {
ACTOR Future<Void> updateServerMetrics( TCServerInfo *server ) {
state StorageServerInterface ssi = server->lastKnownInterface;
state Future<ErrorOr<GetPhysicalMetricsReply>> metricsRequest = ssi.getPhysicalMetrics.tryGetReply( GetPhysicalMetricsRequest(), TaskPriority::DataDistributionLaunch );
state Future<ErrorOr<GetStorageMetricsReply>> metricsRequest = ssi.getStorageMetrics.tryGetReply( GetStorageMetricsRequest(), TaskPriority::DataDistributionLaunch );
state Future<Void> resetRequest = Never();
state Future<std::pair<StorageServerInterface, ProcessClass>> interfaceChanged( server->onInterfaceChanged );
state Future<Void> serverRemoved( server->onRemoved );
loop {
choose {
when( ErrorOr<GetPhysicalMetricsReply> rep = wait( metricsRequest ) ) {
when( ErrorOr<GetStorageMetricsReply> rep = wait( metricsRequest ) ) {
if( rep.present() ) {
server->serverMetrics = rep;
if(server->updated.canBeSet()) {
@ -118,12 +118,12 @@ ACTOR Future<Void> updateServerMetrics( TCServerInfo *server ) {
return Void();
}
when( wait( resetRequest ) ) { //To prevent a tight spin loop
if(IFailureMonitor::failureMonitor().getState(ssi.getPhysicalMetrics.getEndpoint()).isFailed()) {
resetRequest = IFailureMonitor::failureMonitor().onStateEqual(ssi.getPhysicalMetrics.getEndpoint(), FailureStatus(false));
if(IFailureMonitor::failureMonitor().getState(ssi.getStorageMetrics.getEndpoint()).isFailed()) {
resetRequest = IFailureMonitor::failureMonitor().onStateEqual(ssi.getStorageMetrics.getEndpoint(), FailureStatus(false));
}
else {
resetRequest = Never();
metricsRequest = ssi.getPhysicalMetrics.tryGetReply( GetPhysicalMetricsRequest(), TaskPriority::DataDistributionLaunch );
metricsRequest = ssi.getStorageMetrics.tryGetReply( GetStorageMetricsRequest(), TaskPriority::DataDistributionLaunch );
}
}
}
@ -292,8 +292,8 @@ public:
return getMinFreeSpaceRatio() > SERVER_KNOBS->MIN_FREE_SPACE_RATIO && getMinFreeSpace() > SERVER_KNOBS->MIN_FREE_SPACE;
}
virtual Future<Void> updatePhysicalMetrics() {
return doUpdatePhysicalMetrics( this );
virtual Future<Void> updateStorageMetrics() {
return doUpdateStorageMetrics( this );
}
virtual bool isOptimal() {
@ -341,7 +341,7 @@ private:
// Calculate the max of the metrics replies that we received.
ACTOR Future<Void> doUpdatePhysicalMetrics( TCTeamInfo* self ) {
ACTOR Future<Void> doUpdateStorageMetrics( TCTeamInfo* self ) {
std::vector<Future<Void>> updates;
for( int i = 0; i< self->servers.size(); i++ )
updates.push_back( updateServerMetrics( self->servers[i] ) );

View File

@ -80,7 +80,7 @@ struct IDataDistributionTeam {
virtual int64_t getMinFreeSpace( bool includeInFlight = true ) = 0;
virtual double getMinFreeSpaceRatio( bool includeInFlight = true ) = 0;
virtual bool hasHealthyFreeSpace() = 0;
virtual Future<Void> updatePhysicalMetrics() = 0;
virtual Future<Void> updateStorageMetrics() = 0;
virtual void addref() = 0;
virtual void delref() = 0;
virtual bool isHealthy() = 0;

View File

@ -52,7 +52,14 @@ struct RelocateData {
rs.priority == PRIORITY_REBALANCE_OVERUTILIZED_TEAM ||
rs.priority == PRIORITY_REBALANCE_UNDERUTILIZED_TEAM ||
rs.priority == PRIORITY_SPLIT_SHARD ||
rs.priority == PRIORITY_TEAM_REDUNDANT ), interval("QueuedRelocation") {}
rs.priority == PRIORITY_TEAM_REDUNDANT ||
mergeWantsNewServers(rs.keys, rs.priority)), interval("QueuedRelocation") {}
static bool mergeWantsNewServers(KeyRangeRef keys, int priority) {
return priority == PRIORITY_MERGE_SHARD &&
(SERVER_KNOBS->MERGE_ONTO_NEW_TEAM == 2 ||
(SERVER_KNOBS->MERGE_ONTO_NEW_TEAM == 1 && keys.begin.startsWith(LiteralStringRef("\xff"))));
}
bool operator> (const RelocateData& rhs) const {
return priority != rhs.priority ? priority > rhs.priority : ( startTime != rhs.startTime ? startTime < rhs.startTime : randomId > rhs.randomId );
@ -181,11 +188,11 @@ public:
});
}
virtual Future<Void> updatePhysicalMetrics() {
virtual Future<Void> updateStorageMetrics() {
vector<Future<Void>> futures;
for (auto it = teams.begin(); it != teams.end(); it++) {
futures.push_back((*it)->updatePhysicalMetrics());
futures.push_back((*it)->updateStorageMetrics());
}
return waitForAll(futures);
}
@ -1049,7 +1056,7 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
if( error.code() != error_code_move_to_removed_server ) {
if( !error.code() ) {
try {
wait( healthyDestinations.updatePhysicalMetrics() ); //prevent a gap between the polling for an increase in physical metrics and decrementing data in flight
wait( healthyDestinations.updateStorageMetrics() ); //prevent a gap between the polling for an increase in storage metrics and decrementing data in flight
} catch( Error& e ) {
error = e;
}

View File

@ -101,6 +101,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
init( INFLIGHT_PENALTY_HEALTHY, 1.0 );
init( INFLIGHT_PENALTY_UNHEALTHY, 10.0 );
init( INFLIGHT_PENALTY_ONE_LEFT, 1000.0 );
init( MERGE_ONTO_NEW_TEAM, 1 ); if( randomize && BUGGIFY ) MERGE_ONTO_NEW_TEAM = deterministicRandom()->coinflip() ? 0 : 2;
// Data distribution
init( RETRY_RELOCATESHARD_DELAY, 0.1 );

View File

@ -102,6 +102,7 @@ public:
double INFLIGHT_PENALTY_REDUNDANT;
double INFLIGHT_PENALTY_UNHEALTHY;
double INFLIGHT_PENALTY_ONE_LEFT;
int MERGE_ONTO_NEW_TEAM; // Merges will request new servers. 0 for off, 1 for \xff only, 2 for all shards.
// Data distribution
double RETRY_RELOCATESHARD_DELAY;

View File

@ -342,18 +342,19 @@ struct StorageServerMetrics {
}
}
void getPhysicalMetrics( GetPhysicalMetricsRequest req, StorageBytes sb ){
GetPhysicalMetricsReply rep;
void getStorageMetrics( GetStorageMetricsRequest req, StorageBytes sb, double bytesInputRate ){
GetStorageMetricsReply rep;
// SOMEDAY: make bytes dynamic with hard disk space
rep.load = getMetrics(allKeys);
if (sb.free < 1e9 && deterministicRandom()->random01() < 0.1)
if (sb.free < 1e9 && deterministicRandom()->random01() < 0.1) {
TraceEvent(SevWarn, "PhysicalDiskMetrics")
.detail("Free", sb.free)
.detail("Total", sb.total)
.detail("Available", sb.available)
.detail("Load", rep.load.bytes);
}
rep.free.bytes = sb.free;
rep.free.iosPerKSecond = 10e6;
@ -363,6 +364,8 @@ struct StorageServerMetrics {
rep.capacity.iosPerKSecond = 10e6;
rep.capacity.bytesPerKSecond = 100e9;
rep.bytesInputRate = bytesInputRate;
req.reply.send(rep);
}

View File

@ -3399,9 +3399,9 @@ ACTOR Future<Void> metricsCore( StorageServer* self, StorageServerInterface ssi
self->metrics.splitMetrics( req );
}
}
when (GetPhysicalMetricsRequest req = waitNext(ssi.getPhysicalMetrics.getFuture())) {
when (GetStorageMetricsRequest req = waitNext(ssi.getStorageMetrics.getFuture())) {
StorageBytes sb = self->storage.getStorageBytes();
self->metrics.getPhysicalMetrics( req, sb );
self->metrics.getStorageMetrics( req, sb, self->counters.bytesInput.getRate() );
}
when (wait(doPollMetrics) ) {
self->metrics.poll();

View File

@ -570,7 +570,7 @@ ACTOR Future<Void> storageServerRollbackRebooter( Future<Void> prevStorageServer
DUMPTOKEN(recruited.getShardState);
DUMPTOKEN(recruited.waitMetrics);
DUMPTOKEN(recruited.splitMetrics);
DUMPTOKEN(recruited.getPhysicalMetrics);
DUMPTOKEN(recruited.getStorageMetrics);
DUMPTOKEN(recruited.waitFailure);
DUMPTOKEN(recruited.getQueuingMetrics);
DUMPTOKEN(recruited.getKeyValueStoreType);
@ -847,7 +847,7 @@ ACTOR Future<Void> workerServer(
DUMPTOKEN(recruited.getShardState);
DUMPTOKEN(recruited.waitMetrics);
DUMPTOKEN(recruited.splitMetrics);
DUMPTOKEN(recruited.getPhysicalMetrics);
DUMPTOKEN(recruited.getStorageMetrics);
DUMPTOKEN(recruited.waitFailure);
DUMPTOKEN(recruited.getQueuingMetrics);
DUMPTOKEN(recruited.getKeyValueStoreType);
@ -1074,7 +1074,7 @@ ACTOR Future<Void> workerServer(
DUMPTOKEN(recruited.getShardState);
DUMPTOKEN(recruited.waitMetrics);
DUMPTOKEN(recruited.splitMetrics);
DUMPTOKEN(recruited.getPhysicalMetrics);
DUMPTOKEN(recruited.getStorageMetrics);
DUMPTOKEN(recruited.waitFailure);
DUMPTOKEN(recruited.getQueuingMetrics);
DUMPTOKEN(recruited.getKeyValueStoreType);

View File

@ -96,7 +96,7 @@ public: // introduced features
//
// xyzdev
// vvvv
constexpr ProtocolVersion currentProtocolVersion(0x0FDB00B062000001LL);
constexpr ProtocolVersion currentProtocolVersion(0x0FDB00B062010001LL);
// This assert is intended to help prevent incrementing the leftmost digits accidentally. It will probably need to
// change when we reach version 10.
static_assert(currentProtocolVersion.version() < 0x0FDB00B100000000LL, "Unexpected protocol version");