Merge branch 'master' into backup-container-refactor

# Conflicts:
#	fdbclient/FileBackupAgent.actor.cpp
Stephen Atherton 2017-11-25 19:54:51 -08:00
commit 1b1c8e985a
26 changed files with 447 additions and 315 deletions

View File

@ -653,6 +653,10 @@ std::string logBackupDR(const char *context, std::map<std::string, std::string>
}
void printStatus(StatusObjectReader statusObj, StatusClient::StatusLevel level, bool displayDatabaseAvailable = true, bool hideErrorMessages = false) {
if (FlowTransport::transport().incompatibleOutgoingConnectionsPresent()) {
printf("WARNING: One or more of the processes in the cluster is incompatible with this version of fdbcli.\n\n");
}
try {
bool printedCoordinators = false;

View File

@ -24,10 +24,11 @@
#include "FDBTypes.h"
struct MutationRef {
static const int OVERHEAD_BYTES = 12; //12 is the size of Header in MutationList entries
enum Type : uint8_t { SetValue=0, ClearRange, AddValue, DebugKeyRange, DebugKey, NoOp, And, Or, Xor, AppendIfFits, AvailableForReuse, Reserved_For_LogProtocolMessage /* See fdbserver/LogProtocolMessage.h */, Max, Min, SetVersionstampedKey, SetVersionstampedValue, ByteMin, ByteMax, MinV2, AndV2, MAX_ATOMIC_OP };
const char * typeString[MAX_ATOMIC_OP] = { "SetValue", "ClearRange", "AddValue", "DebugKeyRange", "DebugKey", "NoOp", "And", "Or", "Xor", "AppendIfFits", "AvailableForReuse", "Reserved_For_LogProtocolMessage", "Max", "Min", "SetVersionstampedKey", "SetVersionstampedValue", "ByteMin", "ByteMax", "MinV2", "AndV2" };
// This is stored this way for serialization purposes.
uint8_t type;
StringRef param1, param2;
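Aside: the point of the new sentinel enumerator is that the name table is sized by the enum itself, so lookups can be bounds-checked. A minimal standalone sketch of the pattern (simplified names, not the actual FDBTypes.h):

#include <cstdint>
#include <cstdio>

// The sentinel MAX_ATOMIC_OP sizes the name table, so lookups can be
// bounds-checked against the enum itself.
enum Type : uint8_t { SetValue = 0, ClearRange, AddValue, MAX_ATOMIC_OP };
static const char* typeString[MAX_ATOMIC_OP] = { "SetValue", "ClearRange", "AddValue" };

int main() {
    uint8_t type = ClearRange;  // mirrors MutationRef::type
    printf("%s\n", type < MAX_ATOMIC_OP ? typeString[type] : "Unknown");
    return 0;
}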

View File

@ -93,6 +93,7 @@ private:
struct LeaderInfo {
UID changeID;
uint64_t mask = ~(15ll << 60);
Value serializedInfo;
bool forward; // If true, serializedInfo is a connection string instead!
@ -103,32 +104,24 @@ struct LeaderInfo {
bool operator == (LeaderInfo const& r) const { return changeID == r.changeID; }
// The first 4 bits of ChangeID represent cluster controller process class fitness, the lower the better
void updateChangeID(uint64_t processClassFitness, bool isExcluded) {
changeID = UID( ( (uint64_t)isExcluded << 63) | (processClassFitness << 60) | (changeID.first() & mask ), changeID.second() );
}
// All but the first 4 bits are used to represent process id
bool equalInternalId(LeaderInfo const& leaderInfo) const {
if ( (changeID.first() & mask) == (leaderInfo.changeID.first() & mask) && changeID.second() == leaderInfo.changeID.second() ) {
return true;
} else {
return false;
}
}
// Change leader only if
// 1. the candidate has better process class fitness and the candidate is not the leader
// 2. the leader's process class fitness becomes worse
bool leaderChangeRequired(LeaderInfo const& candidate) const {
if ( ((changeID.first() & ~mask) > (candidate.changeID.first() & ~mask) && !equalInternalId(candidate)) || ((changeID.first() & ~mask) < (candidate.changeID.first() & ~mask) && equalInternalId(candidate)) ) {
return true;
} else {
return false
View File

@ -841,6 +841,7 @@ namespace fileBackup {
}
ACTOR static Future<Key> addTask(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<Task> parentTask, Key begin, Key end, TaskCompletionKey completionKey, Reference<TaskFuture> waitFor = Reference<TaskFuture>(), int priority = 0) {
TraceEvent(SevInfo, "FBA_schedBackupRangeTask").detail("begin", printable(begin)).detail("end", printable(end));
Key key = wait(addBackupTask(BackupRangeTaskFunc::name,
BackupRangeTaskFunc::version,
tr, taskBucket, completionKey,
@ -898,6 +899,7 @@ namespace fileBackup {
}
state bool done = false;
state int64_t nrKeys = 0;
loop{
state RangeResultWithVersion values;
@ -920,9 +922,17 @@ namespace fileBackup {
Void _ = wait(rangeFile.writeKey(nextKey));
bool keepGoing = wait(finishRangeFile(outFile, cx, task, taskBucket, KeyRangeRef(beginKey, nextKey), outVersion));
TraceEvent("FileBackupWroteRangeFile")
.detail("Size", outFile->size())
.detail("Keys", nrKeys)
.detail("BeginKey", beginKey)
.detail("EndKey", nextKey)
.detail("FileDiscarded", keepGoing ? "No" : "Yes");
if(!keepGoing)
return Void();
nrKeys = 0;
beginKey = nextKey;
}
@ -947,6 +957,7 @@ namespace fileBackup {
lastKey = values.first[i].key;
Void _ = wait(rangeFile.writeKV(lastKey, values.first[i].value));
}
nrKeys += values.first.size();
}
}
@ -984,6 +995,8 @@ namespace fileBackup {
Void _ = wait(taskFuture->set(tr, taskBucket));
}
TraceEvent(SevInfo, "FBA_endBackupRangeTask").detail("begin", printable(Params.beginKey().get(task))).detail("end", printable(Params.endKey().get(task)));
Void _ = wait(taskBucket->finish(tr, task));
return Void();
}

View File

@ -2507,6 +2507,7 @@ ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo>
if (info.debugID.present())
TraceEvent(interval.begin()).detail( "Parent", info.debugID.get() );
req.transaction.read_snapshot = 0;
try {
Version v = wait( readVersion );
req.transaction.read_snapshot = v;

View File

@ -177,6 +177,7 @@ public:
Int64MetricHandle countConnClosedWithoutError;
std::map<NetworkAddress, std::pair<uint64_t, double>> incompatiblePeers;
uint32_t numIncompatibleConnections;
std::map<uint64_t, double> multiVersionConnections;
double lastIncompatibleMessage;
uint64_t transportId;
@ -572,113 +573,125 @@ ACTOR static Future<Void> connectionReader(
if (peer == nullptr) {
ASSERT( !peerAddress.isPublic() );
}
loop {
try {
loop {
int readAllBytes = buffer_end - unprocessed_end;
if (readAllBytes < 4096) {
Arena newArena;
int unproc_len = unprocessed_end - unprocessed_begin;
int len = std::max( 65536, unproc_len*2 );
uint8_t* newBuffer = new (newArena) uint8_t[ len ];
memcpy( newBuffer, unprocessed_begin, unproc_len );
arena = newArena;
unprocessed_begin = newBuffer;
unprocessed_end = newBuffer + unproc_len;
buffer_end = newBuffer + len;
readAllBytes = buffer_end - unprocessed_end;
}
int readBytes = conn->read( unprocessed_end, buffer_end );
if (!readBytes) break;
state bool readWillBlock = readBytes != readAllBytes;
unprocessed_end += readBytes;
if (expectConnectPacket && unprocessed_end-unprocessed_begin>=CONNECT_PACKET_V0_SIZE) {
// At the beginning of a connection, we expect to receive a packet containing the protocol version and the listening port of the remote process
ConnectPacket* p = (ConnectPacket*)unprocessed_begin;
uint64_t connectionId = 0;
int32_t connectPacketSize = p->minimumSize();
if ( unprocessed_end-unprocessed_begin >= connectPacketSize ) {
if(p->protocolVersion >= 0x0FDB00A444020001) {
connectionId = p->connectionId;
}
if( (p->protocolVersion&compatibleProtocolVersionMask) != (currentProtocolVersion&compatibleProtocolVersionMask) ) {
NetworkAddress addr = p->canonicalRemotePort ? NetworkAddress( p->canonicalRemoteIp, p->canonicalRemotePort ) : conn->getPeerAddress();
if(connectionId != 1) addr.port = 0;
if(!transport->multiVersionConnections.count(connectionId)) {
if(now() - transport->lastIncompatibleMessage > FLOW_KNOBS->CONNECTION_REJECTED_MESSAGE_DELAY) {
TraceEvent(SevWarn, "ConnectionRejected", conn->getDebugID())
.detail("Reason", "IncompatibleProtocolVersion")
.detail("LocalVersion", currentProtocolVersion)
.detail("RejectedVersion", p->protocolVersion)
.detail("VersionMask", compatibleProtocolVersionMask)
.detail("Peer", p->canonicalRemotePort ? NetworkAddress( p->canonicalRemoteIp, p->canonicalRemotePort ) : conn->getPeerAddress())
.detail("ConnectionId", connectionId);
transport->lastIncompatibleMessage = now();
}
if(!transport->incompatiblePeers.count(addr)) {
transport->incompatiblePeers[ addr ] = std::make_pair(connectionId, now());
}
} else if(connectionId > 1) {
transport->multiVersionConnections[connectionId] = now() + FLOW_KNOBS->CONNECTION_ID_TIMEOUT;
}
compatible = false;
if(p->protocolVersion < 0x0FDB00A551000000LL) {
// Older versions expected us to hang up. It may work even if we don't hang up here, but it's safer to keep the old behavior.
throw incompatible_protocol_version();
}
}
else {
compatible = true;
TraceEvent("ConnectionEstablished", conn->getDebugID())
.detail("Peer", conn->getPeerAddress())
.detail("ConnectionId", connectionId);
}
if(connectionId > 1) {
transport->multiVersionConnections[connectionId] = now() + FLOW_KNOBS->CONNECTION_ID_TIMEOUT;
}
unprocessed_begin += connectPacketSize;
expectConnectPacket = false;
peerProtocolVersion = p->protocolVersion;
if (peer != nullptr) {
// Outgoing connection; port information should be what we expect
TraceEvent("ConnectedOutgoing").detail("PeerAddr", NetworkAddress( p->canonicalRemoteIp, p->canonicalRemotePort ) );
peer->compatible = compatible;
if (!compatible)
peer->transport->numIncompatibleConnections++;
ASSERT( p->canonicalRemotePort == peerAddress.port );
} else {
if (p->canonicalRemotePort) {
peerAddress = NetworkAddress( p->canonicalRemoteIp, p->canonicalRemotePort, true, peerAddress.isTLS() );
}
peer = transport->getPeer(peerAddress);
peer->compatible = compatible;
if (!compatible)
peer->transport->numIncompatibleConnections++;
onConnected.send( peer );
Void _ = wait( delay(0) ); // Check for cancellation
}
}
}
if (compatible) {
scanPackets( transport, unprocessed_begin, unprocessed_end, arena, peerAddress, peerProtocolVersion );
}
else if(!expectConnectPacket) {
unprocessed_begin = unprocessed_end;
peer->incompatibleDataRead.set(true);
}
if (readWillBlock)
break;
Void _ = wait(yield(TaskReadSocket));
}
Void _ = wait( conn->onReadable() );
Void _ = wait(delay(0, TaskReadSocket)); // We don't want to call conn->read directly from the reactor - we could get stuck in the reactor reading 1 packet at a time
}
catch (Error& e) {
if (peer && !peer->compatible) {
ASSERT(peer->transport->numIncompatibleConnections > 0);
peer->transport->numIncompatibleConnections--;
}
throw;
}
}
@ -935,6 +948,10 @@ int FlowTransport::getEndpointCount() {
return -1;
}
bool FlowTransport::incompatibleOutgoingConnectionsPresent() {
return self->numIncompatibleConnections;
}
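Aside: the compatibility decision above compares protocol versions under compatibleProtocolVersionMask, so only the bits the mask clears (e.g. patch-level differences) may vary between two processes that still talk to each other. A standalone sketch; the mask value here is an assumption for illustration, not flow's actual constant:

#include <cstdint>
#include <cstdio>

int main() {
    const uint64_t compatibleProtocolVersionMask = ~0xFFFFFFull;  // hypothetical: ignore low bits
    const uint64_t currentProtocolVersion        = 0x0FDB00A551000000ull;
    const uint64_t peerVersions[] = { 0x0FDB00A551000001ull, 0x0FDB00A444020001ull };

    for (uint64_t p : peerVersions) {
        bool compatible = (p & compatibleProtocolVersionMask) ==
                          (currentProtocolVersion & compatibleProtocolVersionMask);
        printf("peer %016llx -> %s\n", (unsigned long long)p,
               compatible ? "compatible" : "ConnectionRejected");
    }
    return 0;
}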
void FlowTransport::createInstance( uint64_t transportId )
{
g_network->setGlobal(INetwork::enFailureMonitor, (flowGlobalType) new SimpleFailureMonitor());

View File

@ -107,6 +107,8 @@ public:
int getEndpointCount();
// for tracing only
bool incompatibleOutgoingConnectionsPresent();
static FlowTransport& transport() { return *static_cast<FlowTransport*>((void*) g_network->global(INetwork::enFlowTransport)); }
static NetworkAddress getGlobalLocalAddress() { return transport().getLocalAddress(); }

View File

@ -385,7 +385,6 @@ protected:
LocalityEntry const& add(LocalityEntry const& entry, LocalityData const& data) {
_entryArray.push_back(entry);
_mutableEntryArray.push_back(entry);
ASSERT(data._data.size() > 0);
// Ensure that the key value array is large enough to hold the values
if (_keyValueArray.capacity() < _keyValueArray.size() + data._data.size()) {
@ -419,7 +418,6 @@ protected:
if (_keyValueArray.capacity() < _keyValueArray.size() + record->_dataMap->size()) {
_keyValueArray.reserve(_keyValueArray.size() + record->_dataMap->size());
}
ASSERT(record->_dataMap->_keyvaluearray.size() > 0);
for (auto& keyValuePair : record->_dataMap->_keyvaluearray) {
auto keyString = _localitygroup->keyText(keyValuePair.first);

View File

@ -68,6 +68,11 @@ struct IReplicationPolicy : public ReferenceCounted<IReplicationPolicy> {
std::vector<LocalityEntry> const& solutionSet,
std::vector<LocalityEntry> const& alsoServers,
LocalitySetRef const& fromServers );
// Returns a set of the attributes that this policy uses in selection and validation.
std::set<std::string> attributeKeys() const
{ std::set<std::string> keys; this->attributeKeys(&keys); return keys; }
virtual void attributeKeys(std::set<std::string>*) const = 0;
};
template <class Archive>
@ -108,6 +113,7 @@ struct PolicyOne : IReplicationPolicy, public ReferenceCounted<PolicyOne> {
template <class Ar>
void serialize(Ar& ar) {
}
virtual void attributeKeys(std::set<std::string>* set) const override { return; }
};
struct PolicyAcross : IReplicationPolicy, public ReferenceCounted<PolicyAcross> {
@ -135,6 +141,9 @@ struct PolicyAcross : IReplicationPolicy, public ReferenceCounted<PolicyAcross>
static bool compareAddedResults(const std::pair<int, int>& rhs, const std::pair<int, int>& lhs)
{ return (rhs.first < lhs.first) || (!(lhs.first < rhs.first) && (rhs.second < lhs.second)); }
virtual void attributeKeys(std::set<std::string> *set) const override
{ set->insert(_attribKey); _policy->attributeKeys(set); }
protected:
int _count;
std::string _attribKey;
@ -207,6 +216,9 @@ struct PolicyAnd : IReplicationPolicy, public ReferenceCounted<PolicyAnd> {
}
}
virtual void attributeKeys(std::set<std::string> *set) const override
{ for (const IRepPolicyRef& r : _policies) { r->attributeKeys(set); } }
protected:
std::vector<IRepPolicyRef> _policies;
std::vector<IRepPolicyRef> _sortedPolicies;
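Aside: attributeKeys() lets callers discover which locality attributes a policy tree actually inspects: PolicyOne contributes nothing, PolicyAcross contributes its key plus its child's, and PolicyAnd unions its children. A standalone sketch of the same recursion, with simplified stand-ins for the policy classes:

#include <cstdio>
#include <memory>
#include <set>
#include <string>

struct Policy {
    virtual ~Policy() = default;
    // Each node reports the locality attributes it inspects.
    virtual void attributeKeys(std::set<std::string>* out) const = 0;
};

struct One : Policy {
    void attributeKeys(std::set<std::string>*) const override {}
};

struct Across : Policy {
    std::string attrib;
    std::shared_ptr<Policy> inner;
    Across(std::string a, std::shared_ptr<Policy> p) : attrib(std::move(a)), inner(std::move(p)) {}
    void attributeKeys(std::set<std::string>* out) const override {
        out->insert(attrib);
        if (inner) inner->attributeKeys(out);
    }
};

int main() {
    // Roughly PolicyAcross(PolicyAcross(PolicyOne)): "across data halls, across zones"
    auto policy = std::make_shared<Across>("data_hall",
                  std::make_shared<Across>("zoneid", std::make_shared<One>()));
    std::set<std::string> keys;
    policy->attributeKeys(&keys);
    for (const auto& k : keys) printf("%s\n", k.c_str());  // data_hall, zoneid
    return 0;
}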

View File

@ -807,6 +807,31 @@ int testReplication()
return totalErrors;
}
namespace {
void filterLocalityDataForPolicy(const std::set<std::string>& keys, LocalityData* ld) {
for (auto iter = ld->_data.begin(); iter != ld->_data.end();) {
auto prev = iter;
iter++;
if (keys.find(prev->first.toString()) == keys.end()) {
ld->_data.erase(prev);
}
}
}
}
void filterLocalityDataForPolicy(IRepPolicyRef policy, LocalityData* ld) {
if (!policy) return;
filterLocalityDataForPolicy(policy->attributeKeys(), ld);
}
void filterLocalityDataForPolicy(IRepPolicyRef policy, std::vector<LocalityData>* vld) {
if (!policy) return;
std::set<std::string> keys = policy->attributeKeys();
for (LocalityData& ld : *vld) {
filterLocalityDataForPolicy(keys, &ld);
}
}
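Aside: filterLocalityDataForPolicy prunes a LocalityData map down to just the keys the policy cares about, using the advance-then-erase idiom from the helper above. A standalone sketch over a plain std::map:

#include <cstdio>
#include <map>
#include <set>
#include <string>

int main() {
    // Keys the policy actually validates against (what policy->attributeKeys() would return).
    std::set<std::string> keys = { "zoneid", "data_hall" };
    std::map<std::string, std::string> locality = {
        { "zoneid", "z1" }, { "data_hall", "dh1" }, { "machineid", "m7" } };

    // Advance first, then erase the previous iterator: safe for node-based containers.
    for (auto iter = locality.begin(); iter != locality.end();) {
        auto prev = iter;
        iter++;
        if (keys.find(prev->first) == keys.end()) locality.erase(prev);
    }

    for (const auto& kv : locality) printf("%s=%s\n", kv.first.c_str(), kv.second.c_str());
    return 0;
}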
TEST_CASE("fdbrpc/Replication/test") {
printf("Running replication test\n");

View File

@ -71,4 +71,9 @@ extern bool validateAllCombinations(
std::vector<LocalityData> const& newItems,
unsigned int nCombinationSize,
bool bCheckIfValid = true);
/// Remove all pieces of locality information from the LocalityData that will not be used when validating the policy.
void filterLocalityDataForPolicy(IRepPolicyRef policy, LocalityData* ld);
void filterLocalityDataForPolicy(IRepPolicyRef policy, std::vector<LocalityData>* vld);
#endif

View File

@ -126,17 +126,8 @@ static void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRef<Mut
}
else if (m.param1.startsWith(configKeysPrefix) || m.param1 == coordinatorsKey) {
if(Optional<StringRef>(m.param2) != txnStateStore->readValue(m.param1).get().cast_to<StringRef>()) { // FIXME: Make this check more specific, here or by reading configuration whenever there is a change
if(!m.param1.startsWith( excludedServersPrefix ) && m.param1 != excludedServersVersionKey) {
auto t = txnStateStore->readValue(m.param1).get();
TraceEvent("MutationRequiresRestart", dbgid).detail("M", m.toString()).detail("PrevValue", t.present() ? printable(t.get()) : "(none)").detail("toCommit", toCommit!=NULL);
if(confChange) *confChange = true;
}

View File

@ -43,19 +43,20 @@ void failAfter( Future<Void> trigger, Endpoint e );
struct WorkerInfo : NonCopyable {
Future<Void> watcher;
ReplyPromise<RegisterWorkerReply> reply;
Generation gen;
int reboots;
WorkerInterface interf;
ProcessClass initialClass;
ProcessClass processClass;
bool isExcluded;
WorkerInfo() : gen(-1), reboots(0) {}
WorkerInfo( Future<Void> watcher, ReplyPromise<RegisterWorkerReply> reply, Generation gen, WorkerInterface interf, ProcessClass initialClass, ProcessClass processClass, bool isExcluded ) :
watcher(watcher), reply(reply), gen(gen), reboots(0), interf(interf), initialClass(initialClass), processClass(processClass), isExcluded(isExcluded) {}
WorkerInfo( WorkerInfo&& r ) noexcept(true) : watcher(std::move(r.watcher)), reply(std::move(r.reply)), gen(r.gen),
reboots(r.reboots), interf(std::move(r.interf)), initialClass(r.initialClass), processClass(r.processClass), isExcluded(r.isExcluded) {}
void operator=( WorkerInfo&& r ) noexcept(true) {
watcher = std::move(r.watcher);
reply = std::move(r.reply);
@ -64,6 +65,7 @@ struct WorkerInfo : NonCopyable {
interf = std::move(r.interf);
initialClass = r.initialClass;
processClass = r.processClass;
isExcluded = r.isExcluded;
}
};
@ -79,6 +81,7 @@ public:
Promise<Void> forceMasterFailure;
int64_t masterRegistrationCount;
DatabaseConfiguration config; // Asynchronously updated via master registration
DatabaseConfiguration fullyRecoveredConfig;
Database db;
DBInfo() : masterRegistrationCount(0),
@ -259,7 +262,7 @@ public:
return results;
}
std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForTlogs( DatabaseConfiguration const& conf, std::map< Optional<Standalone<StringRef>>, int>& id_used, bool checkStable = false )
{
std::map<ProcessClass::Fitness, vector<std::pair<WorkerInterface, ProcessClass>>> fitness_workers;
std::vector<std::pair<WorkerInterface, ProcessClass>> results;
@ -278,22 +281,6 @@ public:
fitness_workers[ fitness ].push_back(std::make_pair(it.second.interf, it.second.processClass));
}
else {
if (it.second.interf.locality.dataHallId().present())
TraceEvent(SevWarn,"GWFTADNotAvailable", functionId)
.detail("Fitness", fitness)
.detailext("Zone", it.second.interf.locality.zoneId())
.detailext("DataHall", it.second.interf.locality.dataHallId())
.detail("Address", it.second.interf.address())
.detail("workerAvailable", workerAvailable(it.second, checkStable))
.detail("isExcludedServer", conf.isExcludedServer(it.second.interf.address()))
.detail("checkStable", checkStable)
.detail("reboots", it.second.reboots)
.detail("isAvailable", IFailureMonitor::failureMonitor().getState(it.second.interf.storage.getEndpoint()).isAvailable())
.detail("Locality", it.second.interf.locality.toString())
.detail("tLogReplicationFactor", conf.tLogReplicationFactor)
.detail("tLogPolicy", conf.tLogPolicy ? conf.tLogPolicy->info() : "[unset]")
.detail("DesiredLogs", conf.getDesiredLogs())
.detail("InterfaceId", id);
unavailableLocals.push_back(it.second.interf.locality);
}
}
@ -581,41 +568,29 @@ public:
bool operator == (InDatacenterFitness const& r) const { return proxyFit == r.proxyFit && resolverFit == r.resolverFit && proxyCount == r.proxyCount && resolverCount == r.resolverCount; }
};
struct TLogFitness {
ProcessClass::Fitness tlogFit;
int tlogCount;
TLogFitness( ProcessClass::Fitness tlogFit, int tlogCount) : tlogFit(tlogFit), tlogCount(tlogCount) {}
TLogFitness() : tlogFit( ProcessClass::NeverAssign ), tlogCount(0) {}
TLogFitness( vector<std::pair<WorkerInterface, ProcessClass>> tlogs ) {
tlogFit = ProcessClass::BestFit;
for(auto it : tlogs) {
tlogFit = std::max(tlogFit, it.second.machineClassFitness( ProcessClass::TLog ));
}
tlogCount = tlogs.size();
}
bool operator < (TLogFitness const& r) const {
if (tlogFit != r.tlogFit) return tlogFit < r.tlogFit;
return tlogCount > r.tlogCount;
}
bool operator == (TLogFitness const& r) const { return tlogFit == r.tlogFit && tlogCount == r.tlogCount; }
};
std::set<Optional<Standalone<StringRef>>> getDatacenters( DatabaseConfiguration const& conf, bool checkStable = false ) {
@ -638,7 +613,7 @@ public:
id_used[clusterControllerProcessId]++;
id_used[masterProcessId]++;
auto tlogs = getWorkersForTlogs( req.configuration, id_used );
for(int i = 0; i < tlogs.size(); i++)
result.tLogs.push_back(tlogs[i].first);
@ -686,7 +661,7 @@ public:
.detail("desiredResolvers", req.configuration.getDesiredResolvers()).detail("actualResolvers", result.resolvers.size());
if( now() - startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY &&
( TLogFitness(tlogs) > TLogFitness((ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredLogs()) ||
bestFitness > InDatacenterFitness((ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_PROXY_FITNESS, (ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_RESOLVER_FITNESS, req.configuration.getDesiredProxies(), req.configuration.getDesiredResolvers()) ) ) {
throw operation_failed();
}
@ -701,15 +676,46 @@ public:
return false;
}
// Get master process
auto masterWorker = id_worker.find(dbi.master.locality.processId());
if(masterWorker == id_worker.end()) {
return false;
}
// Get tlog processes
std::vector<std::pair<WorkerInterface, ProcessClass>> tlogs;
for( auto& it : dbi.logSystemConfig.tLogs ) {
auto tlogWorker = id_worker.find(it.interf().locality.processId());
if ( tlogWorker == id_worker.end() )
return false;
if ( tlogWorker->second.isExcluded )
return true;
tlogs.push_back(std::make_pair(tlogWorker->second.interf, tlogWorker->second.processClass));
}
// Get proxy classes
std::vector<ProcessClass> proxyClasses;
for(auto& it : dbi.client.proxies ) {
auto proxyWorker = id_worker.find(it.locality.processId());
if ( proxyWorker == id_worker.end() )
return false;
if ( proxyWorker->second.isExcluded )
return true;
proxyClasses.push_back(proxyWorker->second.processClass);
}
// Get resolver classes
std::vector<ProcessClass> resolverClasses;
for(auto& it : dbi.resolvers ) {
auto resolverWorker = id_worker.find(it.locality.processId());
if ( resolverWorker == id_worker.end() )
return false;
if ( resolverWorker->second.isExcluded )
return true;
resolverClasses.push_back(resolverWorker->second.processClass);
}
// Check master fitness. Don't return false if master is excluded in case all the processes are excluded, we still need master for recovery.
ProcessClass::Fitness oldMasterFit = masterWorker->second.processClass.machineClassFitness( ProcessClass::Master );
if(db.config.isExcludedServer(dbi.master.address())) {
oldMasterFit = std::max(oldMasterFit, ProcessClass::ExcludeFit);
@ -721,42 +727,26 @@ public:
newMasterFit = std::max(newMasterFit, ProcessClass::ExcludeFit);
}
if ( oldMasterFit < newMasterFit )
return false;
if ( oldMasterFit > newMasterFit && oldMasterFit == ProcessClass::ExcludeFit )
return true;
// Check tLog fitness
std::map< Optional<Standalone<StringRef>>, int> id_used;
id_used[clusterControllerProcessId]++;
id_used[masterProcessId]++;
TLogFitness oldTLogFit(tlogs);
TLogFitness newTLogFit(getWorkersForTlogs(db.config, id_used, true));
if(oldTLogFit < newTLogFit) return false;
// Check proxy/resolver fitness
InDatacenterFitness oldInFit(dbi.client.proxies, dbi.resolvers, proxyClasses, resolverClasses);
auto datacenters = getDatacenters( db.config, true );
InDatacenterFitness newInFit;
for(auto dcId : datacenters) {
auto used = id_used;
auto first_resolver = getWorkerForRoleInDatacenter( dcId, ProcessClass::Resolver, db.config, used );
@ -774,16 +764,17 @@ public:
if(oldInFit.betterInDatacenterFitness(newInFit)) return false;
if(oldMasterFit > newMasterFit || oldTLogFit > newTLogFit || oldInFit > newInFit) {
TraceEvent("BetterMasterExists", id).detail("oldMasterFit", oldMasterFit).detail("newMasterFit", newMasterFit)
.detail("oldTLogFitC", oldTLogFit.tlogCount).detail("newTLogFitC", newTLogFit.tlogCount)
.detail("oldTLogFitT", oldTLogFit.tlogFit).detail("newTLogFitT", newTLogFit.tlogFit)
.detail("oldInFitP", oldInFit.proxyFit).detail("newInFitP", newInFit.proxyFit)
.detail("oldInFitR", oldInFit.resolverFit).detail("newInFitR", newInFit.resolverFit)
.detail("oldInFitPC", oldInFit.proxyCount).detail("newInFitPC", newInFit.proxyCount)
.detail("oldInFitRC", oldInFit.resolverCount).detail("newInFitRC", newInFit.resolverCount);
return true;
}
return false;
}
@ -791,6 +782,7 @@ public:
std::map< Optional<Standalone<StringRef>>, ProcessClass > id_class; //contains the mapping from process id to process class from the database
Standalone<RangeResultRef> lastProcessClasses;
bool gotProcessClasses;
bool gotFullyRecoveredConfig;
Optional<Standalone<StringRef>> masterProcessId;
Optional<Standalone<StringRef>> clusterControllerProcessId;
UID id;
@ -805,7 +797,7 @@ public:
double startTime;
explicit ClusterControllerData( ClusterControllerFullInterface ccInterface )
: id(ccInterface.id()), ac(false), betterMasterExistsChecker(Void()), gotProcessClasses(false), gotFullyRecoveredConfig(false), startTime(now())
{
auto serverInfo = db.serverInfo->get();
serverInfo.id = g_random->randomUniqueID();
@ -941,7 +933,6 @@ ACTOR Future<Void> clusterGetServerInfo(
removeIssue( db->workersWithIssues, reply.getEndpoint().address, issues, issueID );
TraceEvent("SendingServerInfo").detail("Dest", reply.getEndpoint().address );
reply.send( db->serverInfo->get() );
return Void();
}
@ -1084,7 +1075,7 @@ ACTOR Future<Void> workerAvailabilityWatch( WorkerInterface worker, ProcessClass
when( Void _ = wait( failed ) ) { // remove workers that have failed
WorkerInfo& failedWorkerInfo = cluster->id_worker[ worker.locality.processId() ];
if (!failedWorkerInfo.reply.isSet()) {
failedWorkerInfo.reply.send( RegisterWorkerReply(failedWorkerInfo.processClass, failedWorkerInfo.isExcluded) );
}
cluster->id_worker.erase( worker.locality.processId() );
cluster->updateWorkerList.set( worker.locality.processId(), Optional<ProcessData>() );
@ -1295,7 +1286,20 @@ void clusterRegisterMaster( ClusterControllerData* self, RegisterMasterRequest c
}
db->masterRegistrationCount = req.registrationCount;
if ( req.configuration.present() ) {
db->config = req.configuration.get();
if ( req.recoveryState >= RecoveryState::FULLY_RECOVERED ) {
self->gotFullyRecoveredConfig = true;
db->fullyRecoveredConfig = req.configuration.get();
for ( auto& it : self->id_worker ) {
bool isExcludedFromConfig = db->fullyRecoveredConfig.isExcludedServer(it.second.interf.address());
if ( it.second.isExcluded != isExcludedFromConfig && !it.second.reply.isSet() ) {
it.second.reply.send( RegisterWorkerReply( it.second.processClass, isExcludedFromConfig) );
}
}
}
}
bool isChanged = false;
auto dbInfo = self->db.serverInfo->get();
@ -1348,6 +1352,7 @@ void clusterRegisterMaster( ClusterControllerData* self, RegisterMasterRequest c
void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
WorkerInterface w = req.wi;
ProcessClass newProcessClass = req.processClass;
bool newIsExcluded = req.isExcluded;
auto info = self->id_worker.find( w.locality.processId() );
TraceEvent("ClusterControllerActualWorkers", self->id).detail("WorkerID",w.id()).detailext("ProcessID", w.locality.processId()).detailext("ZoneId", w.locality.zoneId()).detailext("DataHall", w.locality.dataHallId()).detail("pClass", req.processClass.toString()).detail("Workers", self->id_worker.size()).detail("Registered", (info == self->id_worker.end() ? "False" : "True")).backtrace();
@ -1356,39 +1361,43 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
self->clusterControllerProcessId = w.locality.processId();
}
// Check process class and exclusive property
if ( info == self->id_worker.end() || info->second.interf.id() != w.id() || req.generation >= info->second.gen ) {
if ( self->gotProcessClasses ) {
auto classIter = self->id_class.find(w.locality.processId());
if( classIter != self->id_class.end() && (classIter->second.classSource() == ProcessClass::DBSource || req.initialClass.classType() == ProcessClass::UnsetClass)) {
newProcessClass = classIter->second;
} else {
newProcessClass = req.initialClass;
}
}
if ( self->gotFullyRecoveredConfig ) {
newIsExcluded = self->db.fullyRecoveredConfig.isExcludedServer(w.address());
}
// Notify the worker to register again with new process class/exclusive property
if ( !req.reply.isSet() && ( newProcessClass != req.processClass || newIsExcluded != req.isExcluded ) ) {
req.reply.send( RegisterWorkerReply(newProcessClass, newIsExcluded) );
}
}
if( info == self->id_worker.end() ) {
self->id_worker[w.locality.processId()] = WorkerInfo( workerAvailabilityWatch( w, newProcessClass, self ), req.reply, req.generation, w, req.initialClass, newProcessClass, req.isExcluded );
checkOutstandingRequests( self );
return;
}
if( info->second.interf.id() != w.id() || req.generation >= info->second.gen ) {
if (!info->second.reply.isSet()) {
info->second.reply.send( Never() );
}
info->second.reply = req.reply;
info->second.processClass = newProcessClass;
info->second.isExcluded = req.isExcluded;
info->second.initialClass = req.initialClass;
info->second.gen = req.generation;
if(info->second.interf.id() != w.id()) {
@ -1592,7 +1601,7 @@ ACTOR Future<Void> monitorProcessClasses(ClusterControllerData *self) {
if (newProcessClass != w.second.processClass) {
w.second.processClass = newProcessClass;
if (!w.second.reply.isSet()) {
w.second.reply.send( RegisterWorkerReply(newProcessClass, w.second.isExcluded) );
}
}
}
@ -1741,14 +1750,14 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
}
}
ACTOR Future<Void> clusterController( ServerCoordinators coordinators, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> currentCC, bool hasConnected, Reference<AsyncVar<ProcessClass>> asyncProcessClass, Reference<AsyncVar<bool>> asyncIsExcluded ) {
loop {
state ClusterControllerFullInterface cci;
state bool inRole = false;
cci.initEndpoints();
try {
//Register as a possible leader; wait to be elected
state Future<Void> leaderFail = tryBecomeLeader( coordinators, cci, currentCC, hasConnected, asyncProcessClass, asyncIsExcluded );
while (!currentCC->get().present() || currentCC->get().get() != cci) {
choose {
@ -1772,12 +1781,12 @@ ACTOR Future<Void> clusterController( ServerCoordinators coordinators, Reference
}
}
ACTOR Future<Void> clusterController( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> currentCC, Reference<AsyncVar<ProcessClass>> asyncProcessClass, Reference<AsyncVar<bool>> asyncIsExcluded) {
state bool hasConnected = false;
loop {
try {
ServerCoordinators coordinators( connFile );
Void _ = wait( clusterController( coordinators, currentCC, hasConnected, asyncProcessClass, asyncIsExcluded ) );
} catch( Error &e ) {
if( e.code() != error_code_coordinators_changed )
throw; // Expected to terminate fdbserver

View File

@ -112,20 +112,34 @@ struct RecruitStorageRequest {
}
};
struct RegisterWorkerReply {
ProcessClass processClass;
bool isExcluded;
RegisterWorkerReply() {}
RegisterWorkerReply(ProcessClass processClass, bool isExcluded) : processClass(processClass), isExcluded(isExcluded) {}
template <class Ar>
void serialize( Ar& ar ) {
ar & processClass & isExcluded;
}
};
struct RegisterWorkerRequest {
WorkerInterface wi;
ProcessClass processClass;
ProcessClass initialClass;
bool isExcluded;
Generation generation;
ReplyPromise<RegisterWorkerReply> reply;
RegisterWorkerRequest() {}
RegisterWorkerRequest(WorkerInterface wi, ProcessClass initialClass, ProcessClass processClass, bool isExcluded, Generation generation) :
wi(wi), initialClass(initialClass), processClass(processClass), isExcluded(isExcluded), generation(generation) {}
template <class Ar>
void serialize( Ar& ar ) {
ar & wi & initialClass & processClass & isExcluded & generation & reply;
}
};
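Aside: the `ar & a & b & ...` idiom works because the same serialize() drives both packing and unpacking, so field order is the wire format; inserting isExcluded mid-struct is safe only because both sides pick up the change together. A toy writer/reader pair showing the idiom (not flow's real BinaryWriter/BinaryReader):

#include <cassert>
#include <cstdint>
#include <cstring>
#include <vector>

struct Writer {
    std::vector<uint8_t> buf;
    template <class T> Writer& operator&(T& v) {
        const uint8_t* p = reinterpret_cast<const uint8_t*>(&v);
        buf.insert(buf.end(), p, p + sizeof(T));
        return *this;
    }
};
struct Reader {
    const uint8_t* p;
    template <class T> Reader& operator&(T& v) {
        memcpy(&v, p, sizeof(T));
        p += sizeof(T);
        return *this;
    }
};

struct Reply {
    int32_t processClass = 2;  // stand-in for the real ProcessClass
    bool isExcluded = true;
    template <class Ar> void serialize(Ar& ar) { ar & processClass & isExcluded; }
};

int main() {
    Reply out, in;
    in.processClass = 0; in.isExcluded = false;
    Writer w; out.serialize(w);                 // pack: processClass, then isExcluded
    Reader r{ w.buf.data() }; in.serialize(r);  // unpack in the same order
    assert(in.processClass == out.processClass && in.isExcluded == out.isExcluded);
    return 0;
}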

View File

@ -250,7 +250,7 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
} else {
Optional<LeaderInfo> nextNominee;
if (availableLeaders.size() && availableCandidates.size()) {
nextNominee = ( *availableLeaders.begin() < *availableCandidates.begin() ) ? *availableLeaders.begin() : *availableCandidates.begin();
} else if (availableLeaders.size()) {
nextNominee = *availableLeaders.begin();
} else if (availableCandidates.size()) {
@ -259,13 +259,16 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
nextNominee = Optional<LeaderInfo>();
}
if ( currentNominee.present() != nextNominee.present() || (currentNominee.present() && currentNominee.get().leaderChangeRequired(nextNominee.get())) || !availableLeaders.size() ) {
TraceEvent("NominatingLeader").detail("Nominee", nextNominee.present() ? nextNominee.get().changeID : UID())
.detail("Changed", nextNominee != currentNominee).detail("Key", printable(key));
for(int i=0; i<notify.size(); i++)
notify[i].send( nextNominee );
notify.clear();
currentNominee = nextNominee;
} else if (currentNominee.present() && nextNominee.present() && currentNominee.get().equalInternalId(nextNominee.get())) {
// leader becomes better
currentNominee = nextNominee;
}
if( availableLeaders.size() ) {

View File

@ -293,7 +293,6 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
bool slowRateKeeper = randomize && BUGGIFY;
init( SMOOTHING_AMOUNT, 1.0 ); if( slowRateKeeper ) SMOOTHING_AMOUNT = 5.0;
init( SLOW_SMOOTHING_AMOUNT, 10.0 ); if( slowRateKeeper ) SLOW_SMOOTHING_AMOUNT = 50.0;
init( RATEKEEPER_LOGGING_INTERVAL, 5.0 );
init( METRIC_UPDATE_RATE, .1 ); if( slowRateKeeper ) METRIC_UPDATE_RATE = 0.5;
bool smallStorageTarget = randomize && BUGGIFY;

View File

@ -235,7 +235,6 @@ public:
//Ratekeeper
double SMOOTHING_AMOUNT;
double SLOW_SMOOTHING_AMOUNT;
double RATEKEEPER_LOGGING_INTERVAL;
double METRIC_UPDATE_RATE;
double LAST_LIMITED_RATIO;

View File

@ -75,7 +75,7 @@ ACTOR Future<Void> changeLeaderCoordinators( ServerCoordinators coordinators, Va
return Void();
}
ACTOR Future<Void> tryBecomeLeaderInternal( ServerCoordinators coordinators, Value proposedSerializedInterface, Reference<AsyncVar<Value>> outSerializedLeader, bool hasConnected, Reference<AsyncVar<ProcessClass>> asyncProcessClass, Reference<AsyncVar<bool>> asyncIsExcluded ) {
state Reference<AsyncVar<vector<Optional<LeaderInfo>>>> nominees( new AsyncVar<vector<Optional<LeaderInfo>>>() );
state LeaderInfo myInfo;
state Future<Void> candidacies;
@ -94,7 +94,7 @@ ACTOR Future<Void> tryBecomeLeaderInternal( ServerCoordinators coordinators, Val
myInfo.changeID = g_random->randomUniqueID();
prevChangeID = myInfo.changeID;
myInfo.updateChangeID( asyncProcessClass->get().machineClassFitness(ProcessClass::ClusterController), asyncIsExcluded->get() );
vector<Future<Void>> cand;
for(int i=0; i<coordinators.leaderElectionServers.size(); i++)
@ -153,7 +153,7 @@ ACTOR Future<Void> tryBecomeLeaderInternal( ServerCoordinators coordinators, Val
break;
}
when (Void _ = wait(candidacies)) { ASSERT(false); }
when (Void _ = wait( asyncProcessClass->onChange() || asyncIsExcluded->onChange() )) {
break;
}
}
@ -166,7 +166,8 @@ ACTOR Future<Void> tryBecomeLeaderInternal( ServerCoordinators coordinators, Val
loop {
prevChangeID = myInfo.changeID;
myInfo.updateChangeID( asyncProcessClass->get().machineClassFitness(ProcessClass::ClusterController), asyncIsExcluded->get() );
if (myInfo.changeID != prevChangeID) {
TraceEvent("ChangeLeaderChangeID").detail("PrevChangeID", prevChangeID).detail("NewChangeID", myInfo.changeID);
}

View File

@ -32,7 +32,8 @@ Future<Void> tryBecomeLeader( ServerCoordinators const& coordinators,
LeaderInterface const& proposedInterface,
Reference<AsyncVar<Optional<LeaderInterface>>> const& outKnownLeader,
bool hasConnected,
Reference<AsyncVar<ProcessClass>> const& asyncProcessClass,
Reference<AsyncVar<bool>> const& asyncIsExcluded);
// Participates in the given coordination group's leader election process, nominating the given
// LeaderInterface (presumed to be a local interface) as leader. The leader election process is
@ -48,17 +49,18 @@ Future<Void> changeLeaderCoordinators( ServerCoordinators const& coordinators, V
#pragma region Implementation
Future<Void> tryBecomeLeaderInternal( ServerCoordinators const& coordinators, Value const& proposedSerializedInterface, Reference<AsyncVar<Value>> const& outSerializedLeader, bool const& hasConnected, Reference<AsyncVar<ProcessClass>> const& asyncProcessClass, Reference<AsyncVar<bool>> const& asyncIsExcluded );
template <class LeaderInterface>
Future<Void> tryBecomeLeader( ServerCoordinators const& coordinators,
LeaderInterface const& proposedInterface,
Reference<AsyncVar<Optional<LeaderInterface>>> const& outKnownLeader,
bool hasConnected,
Reference<AsyncVar<ProcessClass>> const& asyncProcessClass,
Reference<AsyncVar<bool>> const& asyncIsExcluded)
{
Reference<AsyncVar<Value>> serializedInfo( new AsyncVar<Value> );
Future<Void> m = tryBecomeLeaderInternal( coordinators, BinaryWriter::toValue(proposedInterface, IncludeVersion()), serializedInfo, hasConnected, asyncProcessClass, asyncIsExcluded );
return m || asyncDeserialize( serializedInfo, outKnownLeader );
}

View File

@ -21,6 +21,7 @@
#include "LogSystem.h"
#include "fdbrpc/FailureMonitor.h"
#include "Knobs.h"
#include "fdbrpc/ReplicationUtils.h"
ILogSystem::ServerPeekCursor::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>> const& interf, Tag tag, Version begin, Version end, bool returnIfBlocked, bool parallelGetMore )
: interf(interf), tag(tag), messageVersion(begin), end(end), hasMsg(false), rd(results.arena, results.messages, Unversioned()), randomID(g_random->randomUniqueID()), poppedVersion(0), returnIfBlocked(returnIfBlocked), sequence(0), parallelGetMore(parallelGetMore) {
@ -243,12 +244,14 @@ ILogSystem::MergedPeekCursor::MergedPeekCursor( std::vector<Reference<AsyncVar<O
serverCursors.push_back( cursor );
}
sortedVersions.resize(serverCursors.size());
filterLocalityDataForPolicy(this->tLogPolicy, &this->tLogLocalities);
}
ILogSystem::MergedPeekCursor::MergedPeekCursor( vector< Reference<ILogSystem::IPeekCursor> > const& serverCursors, LogMessageVersion const& messageVersion, int bestServer, int readQuorum, Optional<LogMessageVersion> nextVersion, std::vector< LocalityData > const& tLogLocalities, IRepPolicyRef const tLogPolicy, int tLogReplicationFactor )
: serverCursors(serverCursors), bestServer(bestServer), readQuorum(readQuorum), currentCursor(0), hasNextMessage(false), messageVersion(messageVersion), nextVersion(nextVersion), randomID(g_random->randomUniqueID()), tLogLocalities(tLogLocalities), tLogPolicy(tLogPolicy), tLogReplicationFactor(tLogReplicationFactor) {
sortedVersions.resize(serverCursors.size());
calcHasMessage();
filterLocalityDataForPolicy(this->tLogPolicy, &this->tLogLocalities);
}
Reference<ILogSystem::IPeekCursor> ILogSystem::MergedPeekCursor::cloneNoMore() {

View File

@ -88,27 +88,38 @@ ACTOR Future<int64_t> getDataInFlight( Database cx, Reference<AsyncVar<ServerDBI
return dataInFlight;
}
//Computes the queue size for storage servers and tlogs using the bytesInput and bytesDurable attributes
int64_t getQueueSize( Standalone<StringRef> md ) {
double inputRate, durableRate;
double inputRoughness, durableRoughness;
int64_t inputBytes, durableBytes;
sscanf(extractAttribute(md.toString(), "bytesInput").c_str(), "%lf %lf %lld", &inputRate, &inputRoughness, &inputBytes);
sscanf(extractAttribute(md.toString(), "bytesDurable").c_str(), "%lf %lf %lld", &durableRate, &durableRoughness, &durableBytes);
return inputBytes - durableBytes;
}
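Aside: the attribute format changed with this commit; the Counter-style metrics serialize as "<rate> <roughness> <absolute total>", so getQueueSize now parses three fields and differences the absolute byte totals (note %lf is required for the doubles). A standalone sketch with made-up values:

#include <cstdio>

int main() {
    const char* bytesInput   = "1024.5 3.2 900000";  // hypothetical trace attribute values
    const char* bytesDurable = "1000.0 2.9 850000";

    double rate, roughness;
    long long inputBytes, durableBytes;
    sscanf(bytesInput, "%lf %lf %lld", &rate, &roughness, &inputBytes);
    sscanf(bytesDurable, "%lf %lf %lld", &rate, &roughness, &durableBytes);

    printf("queue size: %lld bytes\n", inputBytes - durableBytes);
    return 0;
}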
// This is not robust in the face of a TLog failure
ACTOR Future<int64_t> getMaxTLogQueueSize( Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo, WorkerInterface masterWorker ) {
TraceEvent("MaxTLogQueueSize").detail("Database", printable(cx->dbName)).detail("Stage", "ContactingLogs");
state std::vector<std::pair<WorkerInterface, ProcessClass>> workers = wait(getWorkers(dbInfo));
std::map<NetworkAddress, WorkerInterface> workersMap;
for(auto worker : workers) {
workersMap[worker.first.address()] = worker.first;
}
state std::vector<Future<Standalone<StringRef>>> messages;
state std::vector<TLogInterface> tlogs = dbInfo->get().logSystemConfig.allPresentLogs();
for(int i = 0; i < tlogs.size(); i++) {
auto itr = workersMap.find(tlogs[i].address());
if(itr == workersMap.end()) {
TraceEvent("QuietDatabaseFailure").detail("Reason", "Could not find worker for log server").detail("Tlog", tlogs[i].id());
throw attribute_not_found();
}
messages.push_back( timeoutError(itr->second.eventLogRequest.getReply(
EventLogRequest( StringRef(tlogs[i].id().toString() + "/TLogMetrics") ) ), 1.0 ) );
}
Void _ = wait( waitForAll( messages ) );
@ -121,7 +132,7 @@ ACTOR Future<int64_t> getMaxTLogQueueSize( Database cx, Reference<AsyncVar<Serve
try {
maxQueueSize = std::max( maxQueueSize, getQueueSize( messages[i].get() ) );
} catch( Error &e ) {
TraceEvent("QuietDatabaseFailure").detail("Reason", "Failed to extract MaxTLogQueue").detail("Tlog", tlogs[i].id());
throw;
}
}
@ -158,13 +169,28 @@ ACTOR Future<vector<StorageServerInterface>> getStorageServers( Database cx, boo
//Gets the maximum size of all the storage server queues
ACTOR Future<int64_t> getMaxStorageServerQueueSize( Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo, WorkerInterface masterWorker ) {
TraceEvent("MaxStorageServerQueueSize").detail("Database", printable(cx->dbName)).detail("Stage", "ContactingStorageServers");
Future<std::vector<StorageServerInterface>> serversFuture = getStorageServers(cx);
state Future<std::vector<std::pair<WorkerInterface, ProcessClass>>> workersFuture = getWorkers(dbInfo);
state std::vector<StorageServerInterface> servers = wait(serversFuture);
state std::vector<std::pair<WorkerInterface, ProcessClass>> workers = wait(workersFuture);
std::map<NetworkAddress, WorkerInterface> workersMap;
for(auto worker : workers) {
workersMap[worker.first.address()] = worker.first;
}
state std::vector<Future<Standalone<StringRef>>> messages;
for(int i = 0; i < servers.size(); i++) {
auto itr = workersMap.find(servers[i].address());
if(itr == workersMap.end()) {
TraceEvent("QuietDatabaseFailure").detail("Reason", "Could not find worker for storage server").detail("SS", servers[i].id());
throw attribute_not_found();
}
messages.push_back( timeoutError(itr->second.eventLogRequest.getReply(
EventLogRequest( StringRef(servers[i].id().toString() + "/StorageMetrics") ) ), 1.0 ) );
}
Void _ = wait( waitForAll(messages) );

View File

@ -137,7 +137,6 @@ struct Ratekeeper {
//SOMEDAY: template trackStorageServerQueueInfo and trackTLogQueueInfo into one function
ACTOR Future<Void> trackStorageServerQueueInfo( Ratekeeper* self, StorageServerInterface ssi ) {
state double debug_lastTraceTime = 0;
self->storageQueueInfo.insert( mapPair(ssi.id(), StorageQueueInfo(ssi.id(), ssi.locality) ) );
state Map<UID, StorageQueueInfo>::iterator myQueueInfo = self->storageQueueInfo.find(ssi.id());
TraceEvent("RkTracking", ssi.id());
@ -163,25 +162,7 @@ ACTOR Future<Void> trackStorageServerQueueInfo( Ratekeeper* self, StorageServerI
myQueueInfo->value.smoothFreeSpace.setTotal( reply.get().storageBytes.available );
myQueueInfo->value.smoothTotalSpace.setTotal( reply.get().storageBytes.total );
}
if (now() > debug_lastTraceTime + SERVER_KNOBS->RATEKEEPER_LOGGING_INTERVAL){
TraceEvent("RkServerQueueInfo", ssi.id())
.detail("LocalTime", reply.get().localTime)
.detail("BytesDurable", reply.get().bytesDurable)
.detail("BytesInput", reply.get().bytesInput)
.detail("BytesDurableSmooth", myQueueInfo->value.smoothDurableBytes.smoothTotal())
.detail("BytesInputSmooth", myQueueInfo->value.smoothInputBytes.smoothTotal())
.detail("BytesDurableRate", myQueueInfo->value.verySmoothDurableBytes.smoothRate())
.detail("BytesInputRate", myQueueInfo->value.smoothInputBytes.smoothRate())
.detail("FreeSpaceSmooth", myQueueInfo->value.smoothFreeSpace.smoothTotal()).detail("TotalSpaceSmooth", myQueueInfo->value.smoothTotalSpace.smoothTotal())
.detail("Version", reply.get().v)
.trackLatest(("StorageServerQueueSize/" + ssi.id().toString()).c_str());
debug_lastTraceTime = now();
}
} else {
//If the SS didn't respond, clear the queue info so that we know it might have failed
if(myQueueInfo->value.valid)
TraceEvent("RkServerQueueInfo", ssi.id()).trackLatest(("StorageServerQueueSize/" + ssi.id().toString()).c_str());
myQueueInfo->value.valid = false;
}
@ -195,7 +176,6 @@ ACTOR Future<Void> trackStorageServerQueueInfo( Ratekeeper* self, StorageServerI
}
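Toward the SOMEDAY note above trackStorageServerQueueInfo: the space-accounting updates are already identical in the two trackers (compare the smoothFreeSpace/smoothTotalSpace lines in each), so one first step could be a shared helper. A minimal sketch, assuming only the field names visible in the code above:

	// Hypothetical shared helper for both trackers: update the smoothed space
	// totals from a reply's storageBytes. A full factoring would cover more.
	template <class QueueInfo, class Reply>
	void updateSpaceTotals( QueueInfo& info, Reply const& reply ) {
		info.smoothFreeSpace.setTotal( reply.storageBytes.available );
		info.smoothTotalSpace.setTotal( reply.storageBytes.total );
	}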
ACTOR Future<Void> trackTLogQueueInfo( Ratekeeper* self, TLogInterface tli ) {
state double debug_lastTraceTime = 0;
self->tlogQueueInfo.insert( mapPair(tli.id(), TLogQueueInfo(tli.id()) ) );
state Map<UID, TLogQueueInfo>::iterator myQueueInfo = self->tlogQueueInfo.find(tli.id());
TraceEvent("RkTracking", tli.id());
@ -220,20 +200,7 @@ ACTOR Future<Void> trackTLogQueueInfo( Ratekeeper* self, TLogInterface tli ) {
myQueueInfo->value.smoothFreeSpace.setTotal(reply.get().storageBytes.available);
myQueueInfo->value.smoothTotalSpace.setTotal(reply.get().storageBytes.total);
}
if (now() > debug_lastTraceTime + SERVER_KNOBS->RATEKEEPER_LOGGING_INTERVAL){
TraceEvent("RkTLogQueueInfo", tli.id()).detail("LocalTime", reply.get().localTime).detail("BytesDurable", reply.get().bytesDurable).detail("BytesInput", reply.get().bytesInput)
.detail("BytesDurableSmooth", myQueueInfo->value.smoothDurableBytes.smoothTotal()).detail("BytesInputSmooth", myQueueInfo->value.smoothInputBytes.smoothTotal())
.detail("BytesDurableRate", myQueueInfo->value.verySmoothDurableBytes.smoothRate()).detail("BytesInputRate", myQueueInfo->value.smoothInputBytes.smoothRate())
.detail("FreeSpaceSmooth", myQueueInfo->value.smoothFreeSpace.smoothTotal()).detail("TotalSpaceSmooth", myQueueInfo->value.smoothTotalSpace.smoothTotal())
.detail("Version", reply.get().v)
.trackLatest(("TLogQueueSize/" + tli.id().toString()).c_str());
debug_lastTraceTime = now();
}
} else {
//If the TLog didn't respond, clear the queue info so that we know it might have failed
if(myQueueInfo->value.valid)
TraceEvent("RkTLogQueueInfo", tli.id()).trackLatest(("TLogQueueSize/" + tli.id().toString()).c_str());
myQueueInfo->value.valid = false;
}

View File

@ -141,6 +141,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSystem->tLogLocalities = lsConf.tLogLocalities;
logSystem->logSystemType = lsConf.logSystemType;
logSystem->UpdateLocalitySet(lsConf.tLogs);
filterLocalityDataForPolicy(logSystem->tLogPolicy, &logSystem->tLogLocalities);
return logSystem;
}
@ -159,6 +160,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSystem->tLogReplicationFactor = lsConf.oldTLogs[0].tLogReplicationFactor;
logSystem->tLogPolicy = lsConf.oldTLogs[0].tLogPolicy;
logSystem->tLogLocalities = lsConf.oldTLogs[0].tLogLocalities;
filterLocalityDataForPolicy(logSystem->tLogPolicy, &logSystem->tLogLocalities);
logSystem->oldLogData.resize(lsConf.oldTLogs.size()-1);
for( int i = 1; i < lsConf.oldTLogs.size(); i++ ) {
@ -346,21 +348,21 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
Void _ = wait( quorum( alive, std::min(self->tLogReplicationFactor, numPresent - self->tLogWriteAntiQuorum) ) );
state Reference<LocalityGroup> locked(new LocalityGroup());
state std::vector<bool> responded(alive.size());
for (int i = 0; i < alive.size(); i++) {
responded[i] = false;
}
loop {
LocalityGroup locked;
std::vector<LocalityData> unlocked, unused;
for (int i = 0; i < alive.size(); i++) {
if (alive[i].isReady() && !alive[i].isError()) {
locked.add(self->tLogLocalities[i]);
} else {
unlocked.push_back(self->tLogLocalities[i]);
if (!responded[i] && alive[i].isReady() && !alive[i].isError()) {
locked->add(self->tLogLocalities[i]);
responded[i] = true;
}
}
bool quorum_obtained = locked.validate(self->tLogPolicy);
if (!quorum_obtained && self->tLogWriteAntiQuorum != 0) {
quorum_obtained = !validateAllCombinations(unused, locked, self->tLogPolicy, unlocked, self->tLogWriteAntiQuorum, false);
}
if (self->tLogReplicationFactor - self->tLogWriteAntiQuorum == 1 && locked.size() > 0) {
bool quorum_obtained = locked->validate(self->tLogPolicy);
// We intentionally skip considering antiquorums, as the CPU cost of doing so is prohibitive.
if (self->tLogReplicationFactor == 1 && locked->size() > 0) {
ASSERT(quorum_obtained);
}
if (quorum_obtained) {
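The antiquorum comment above is combinatorial in nature: with n not-yet-responding tLogs and write antiquorum a, validateAllCombinations has to consider on the order of C(n, a) subsets per validation pass. Even modest parameters explode, e.g. C(20, 3) = 1140 and C(30, 5) = 142506, which is presumably why the rewritten loop validates only the accumulated responder set and skips antiquorum analysis outright.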
@ -581,6 +583,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSystem->tLogPolicy = prevState.tLogPolicy;
logSystem->tLogLocalities = prevState.tLogLocalities;
logSystem->logSystemType = prevState.logSystemType;
filterLocalityDataForPolicy(logSystem->tLogPolicy, &logSystem->tLogLocalities);
logSystem->epochEndVersion = 0;
logSystem->knownCommittedVersion = 0;
@ -869,6 +872,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSystem->logServers[i] = Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( OptionalInterface<TLogInterface>(initializationReplies[i].get()) ) );
logSystem->tLogLocalities[i] = workers[i].locality;
}
filterLocalityDataForPolicy(logSystem->tLogPolicy, &logSystem->tLogLocalities);
//Don't force failure of recovery if it took us a long time to recover. This avoids multiple long-running recoveries causing tests to time out.
if (BUGGIFY && now() - startTime < 300 && g_network->isSimulated() && g_simulator.speedUpSimulation) throw master_recovery_failed();
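filterLocalityDataForPolicy is invoked at every point in this file where tLogLocalities is (re)established, but its body is not part of this diff. A guess at its intent, sketched with assumed helpers: prune locality entries the policy never inspects, so that repeated validate() calls touch less data.

	// Hypothetical sketch only; attributeKeys() and keepOnlyKeys() are assumed
	// names, and the real filterLocalityDataForPolicy may work differently.
	void filterLocalityDataForPolicy( Reference<IReplicationPolicy> const& policy,
	                                  std::vector<LocalityData>* localities ) {
		if (!policy || !localities) return;
		std::set<std::string> used = policy->attributeKeys();
		for (auto& locality : *localities)
			locality.keepOnlyKeys( used );
	}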

View File

@ -256,8 +256,8 @@ class Database openDBOnServer( Reference<AsyncVar<ServerDBInfo>> const& db, int
Future<Void> extractClusterInterface( Reference<AsyncVar<Optional<struct ClusterControllerFullInterface>>> const& a, Reference<AsyncVar<Optional<struct ClusterInterface>>> const& b );
Future<Void> fdbd( Reference<ClusterConnectionFile> const&, LocalityData const& localities, ProcessClass const& processClass, std::string const& dataFolder, std::string const& coordFolder, int64_t const& memoryLimit, std::string const& metricsConnFile, std::string const& metricsPrefix );
Future<Void> workerServer( Reference<ClusterConnectionFile> const&, Reference<AsyncVar<Optional<struct ClusterControllerFullInterface>>> const& ccInterface, LocalityData const& localities, Reference<AsyncVar<ProcessClass>> const& asyncProcessClass, ProcessClass const& initialClass, std::string const& filename, int64_t const& memoryLimit, Future<Void> const& forceFailure, std::string const& metricsConnFile, std::string const& metricsPrefix );
Future<Void> clusterController( Reference<ClusterConnectionFile> const&, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> const& currentCC, Reference<AsyncVar<ProcessClass>> const& asyncProcessClass );
Future<Void> workerServer( Reference<ClusterConnectionFile> const&, Reference<AsyncVar<Optional<struct ClusterControllerFullInterface>>> const& ccInterface, LocalityData const& localities, Reference<AsyncVar<ProcessClass>> const& asyncProcessClass, ProcessClass const& initialClass, Reference<AsyncVar<bool>> const& asyncIsExcluded, std::string const& filename, int64_t const& memoryLimit, Future<Void> const& forceFailure, std::string const& metricsConnFile, std::string const& metricsPrefix );
Future<Void> clusterController( Reference<ClusterConnectionFile> const&, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> const& currentCC, Reference<AsyncVar<ProcessClass>> const& asyncProcessClass, Reference<AsyncVar<bool>> const& asyncIsExcluded );
// These servers are started by workerServer
Future<Void> storageServer(

View File

@ -954,9 +954,41 @@ ACTOR Future<Void> trackTlogRecovery( Reference<MasterData> self, Reference<Asyn
}
}
ACTOR Future<Void> configurationMonitor( Reference<MasterData> self ) {
state Database cx = openDBOnServer(self->dbInfo, TaskDefaultEndpoint, true, true);
loop {
state ReadYourWritesTransaction tr(cx);
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
state Future<Standalone<RangeResultRef>> fresults = tr.getRange( configKeys, CLIENT_KNOBS->TOO_MANY );
Void _ = wait( success(fresults) );
Standalone<RangeResultRef> results = fresults.get();
ASSERT( !results.more && results.size() < CLIENT_KNOBS->TOO_MANY );
DatabaseConfiguration conf;
conf.fromKeyValues((VectorRef<KeyValueRef>) results);
if(conf != self->configuration) {
self->configuration = conf;
self->registrationTrigger.trigger();
}
state Future<Void> watchFuture = tr.watch(excludedServersVersionKey);
Void _ = wait(tr.commit());
Void _ = wait(watchFuture);
break;
} catch (Error& e) {
Void _ = wait( tr.onError(e) );
}
}
}
}
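configurationMonitor above uses FDB's read-watch-commit idiom: the watch is registered inside the transaction but only becomes active once the commit succeeds, so a configuration change landing between the read and the commit cannot be missed. Stripped to its essentials (hypothetical actor name), the idiom looks like:

	// Minimal sketch of the read-watch-commit idiom; onKeyChange is hypothetical.
	ACTOR Future<Void> onKeyChange( Database cx, Key key ) {
		state ReadYourWritesTransaction tr(cx);
		loop {
			try {
				state Future<Void> watch = tr.watch(key);
				Void _ = wait( tr.commit() ); // the watch is armed only by a successful commit
				Void _ = wait( watch );       // fires when another transaction changes 'key'
				return Void();
			} catch (Error& e) {
				Void _ = wait( tr.onError(e) );
			}
		}
	}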
ACTOR Future<Void> masterCore( Reference<MasterData> self, PromiseStream<Future<Void>> addActor )
{
state TraceInterval recoveryInterval("MasterRecovery");
state double recoverStartTime = now();
addActor.send( waitFailureServer(self->myInterface.waitFailure.getFuture()) );
@ -1128,10 +1160,17 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self, PromiseStream<Future<
TraceEvent(recoveryInterval.end(), self->dbgid).detail("RecoveryTransactionVersion", self->recoveryTransactionVersion);
self->recoveryState = RecoveryState::FULLY_RECOVERED;
double recoveryDuration = now() - recoverStartTime;
TraceEvent(recoveryDuration > 4 ? SevWarnAlways : SevInfo, "MasterRecoveryDuration", self->dbgid)
.detail("recoveryDuration", recoveryDuration)
.trackLatest("MasterRecoveryDuration");
TraceEvent("MasterRecoveryState", self->dbgid)
.detail("StatusCode", RecoveryStatus::fully_recovered)
.detail("Status", RecoveryStatus::names[RecoveryStatus::fully_recovered])
.detail("storeType", self->configuration.storageServerStoreType)
.detail("recoveryDuration", recoveryDuration)
.trackLatest("MasterRecoveryState");
// Now that recovery is complete, we register ourselves with the cluster controller, so that the client and server information
@ -1149,6 +1188,7 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self, PromiseStream<Future<
if( self->resolvers.size() > 1 )
addActor.send( resolutionBalancing(self) );
state Future<Void> configMonitor = configurationMonitor( self );
addActor.send( changeCoordinators(self, skipTransition) );
addActor.send( trackTlogRecovery(self, oldLogSystems, skipTransition) );
@ -1156,7 +1196,8 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self, PromiseStream<Future<
when( Void _ = wait( tlogFailure ) ) { throw internal_error(); }
when( Void _ = wait( proxyFailure ) ) { throw internal_error(); }
when( Void _ = wait( resolverFailure ) ) { throw internal_error(); }
when( Void _ = wait( providingVersions ) ) { throw internal_error(); }
when( Void _ = wait( configMonitor ) ) { throw internal_error(); }
}
}

View File

@ -250,16 +250,17 @@ std::vector< DiskStore > getDiskStores( std::string folder ) {
return result;
}
ACTOR Future<Void> registrationClient( Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface, WorkerInterface interf, Reference<AsyncVar<ProcessClass>> asyncProcessClass, ProcessClass initialClass ) {
ACTOR Future<Void> registrationClient( Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface, WorkerInterface interf, Reference<AsyncVar<ProcessClass>> asyncProcessClass, ProcessClass initialClass, Reference<AsyncVar<bool>> asyncIsExcluded) {
// Keeps the cluster controller (as it may be re-elected) informed that this worker exists
// The cluster controller uses waitFailureClient to find out if we die, and returns from registrationReply (requiring us to re-register)
state Generation requestGeneration = 0;
loop {
Future<ProcessClass> registrationReply = ccInterface->get().present() ? brokenPromiseToNever( ccInterface->get().get().registerWorker.getReply( RegisterWorkerRequest(interf, initialClass, asyncProcessClass->get(), requestGeneration++) ) ) : Never();
Future<RegisterWorkerReply> registrationReply = ccInterface->get().present() ? brokenPromiseToNever( ccInterface->get().get().registerWorker.getReply( RegisterWorkerRequest(interf, initialClass, asyncProcessClass->get(), asyncIsExcluded->get(), requestGeneration++) ) ) : Never();
choose {
when ( ProcessClass newProcessClass = wait( registrationReply )) {
asyncProcessClass->set(newProcessClass);
when ( RegisterWorkerReply reply = wait( registrationReply )) {
asyncProcessClass->set( reply.processClass );
asyncIsExcluded->set( reply.isExcluded );
}
when ( Void _ = wait( ccInterface->onChange() )) { }
}
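The asyncIsExcluded flag threaded through registrationClient is a standard AsyncVar handoff: the cluster controller's reply updates the variable, and set() wakes every actor blocked on onChange(). A toy consumer (hypothetical name) of the same pattern:

	// Toy consumer of an AsyncVar<bool>: logs the excluded status on every change.
	ACTOR Future<Void> reactToExclusion( Reference<AsyncVar<bool>> isExcluded ) {
		loop {
			TraceEvent("WorkerExcludedStatus").detail("Excluded", isExcluded->get());
			Void _ = wait( isExcluded->onChange() );
		}
	}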
@ -479,7 +480,7 @@ ACTOR Future<Void> monitorServerDBInfo( Reference<AsyncVar<Optional<ClusterContr
}
ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface, LocalityData localities,
Reference<AsyncVar<ProcessClass>> asyncProcessClass, ProcessClass initialClass, std::string folder, int64_t memoryLimit, std::string metricsConnFile, std::string metricsPrefix ) {
Reference<AsyncVar<ProcessClass>> asyncProcessClass, ProcessClass initialClass, Reference<AsyncVar<bool>> asyncIsExcluded, std::string folder, int64_t memoryLimit, std::string metricsConnFile, std::string metricsPrefix ) {
state PromiseStream< ErrorInfo > errors;
state Future<Void> handleErrors = workerHandleErrors( errors.getFuture() ); // Needs to be stopped last
state ActorCollection errorForwarders(false);
@ -642,7 +643,7 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
startRole( interf.id(), interf.id(), "Worker", details );
Void _ = wait(waitForAll(recoveries));
errorForwarders.add( registrationClient( ccInterface, interf, asyncProcessClass, initialClass ) );
errorForwarders.add( registrationClient( ccInterface, interf, asyncProcessClass, initialClass, asyncIsExcluded ) );
TraceEvent("RecoveriesComplete", interf.id());
@ -957,13 +958,14 @@ ACTOR Future<Void> fdbd(
Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> cc( new AsyncVar<Optional<ClusterControllerFullInterface>> );
Reference<AsyncVar<Optional<ClusterInterface>>> ci( new AsyncVar<Optional<ClusterInterface>> );
Reference<AsyncVar<ProcessClass>> asyncProcessClass(new AsyncVar<ProcessClass>(ProcessClass(processClass.classType(), ProcessClass::CommandLineSource)));
Reference<AsyncVar<bool>> asyncIsExcluded(new AsyncVar<bool>(false));
vector<Future<Void>> v;
if ( coordFolder.size() )
v.push_back( fileNotFoundToNever( coordinationServer( coordFolder ) ) ); //SOMEDAY: remove the fileNotFound wrapper and make DiskQueue construction safe from errors setting up their files
v.push_back( reportErrors( processClass == ProcessClass::TesterClass ? monitorLeader( connFile, cc ) : clusterController( connFile, cc , asyncProcessClass), "clusterController") );
v.push_back( reportErrors( processClass == ProcessClass::TesterClass ? monitorLeader( connFile, cc ) : clusterController( connFile, cc , asyncProcessClass, asyncIsExcluded), "clusterController") );
v.push_back( reportErrors(extractClusterInterface( cc, ci ), "extractClusterInterface") );
v.push_back( reportErrors(failureMonitorClient( ci, true ), "failureMonitorClient") );
v.push_back( reportErrorsExcept(workerServer(connFile, cc, localities, asyncProcessClass, processClass, dataFolder, memoryLimit, metricsConnFile, metricsPrefix), "workerServer", UID(), &normalWorkerErrors()) );
v.push_back( reportErrorsExcept(workerServer(connFile, cc, localities, asyncProcessClass, processClass, asyncIsExcluded, dataFolder, memoryLimit, metricsConnFile, metricsPrefix), "workerServer", UID(), &normalWorkerErrors()) );
state Future<Void> firstConnect = reportErrors( printOnFirstConnected(ci), "ClusterFirstConnectedError" );
Void _ = wait( quorum(v,1) );