Merge branch 'release-6.2'

# Conflicts:
#	CMakeLists.txt
#	documentation/sphinx/source/release-notes.rst
#	fdbclient/MutationList.h
#	fdbserver/MasterProxyServer.actor.cpp
#	versions.target

commit 8d3ef89540
Author: Evan Tschannen
Date: 2019-11-14 15:49:56 -08:00

23 changed files with 195 additions and 55 deletions

@ -10,38 +10,38 @@ macOS
The macOS installation package is supported on macOS 10.7+. It includes the client and (optionally) the server.
* `FoundationDB-6.2.8.pkg <https://www.foundationdb.org/downloads/6.2.8/macOS/installers/FoundationDB-6.2.8.pkg>`_
* `FoundationDB-6.2.10.pkg <https://www.foundationdb.org/downloads/6.2.10/macOS/installers/FoundationDB-6.2.10.pkg>`_
Ubuntu
------
The Ubuntu packages are supported on 64-bit Ubuntu 12.04+, but beware of the Linux kernel bug in Ubuntu 12.x.
* `foundationdb-clients-6.2.8-1_amd64.deb <https://www.foundationdb.org/downloads/6.2.8/ubuntu/installers/foundationdb-clients_6.2.8-1_amd64.deb>`_
* `foundationdb-server-6.2.8-1_amd64.deb <https://www.foundationdb.org/downloads/6.2.8/ubuntu/installers/foundationdb-server_6.2.8-1_amd64.deb>`_ (depends on the clients package)
* `foundationdb-clients-6.2.10-1_amd64.deb <https://www.foundationdb.org/downloads/6.2.10/ubuntu/installers/foundationdb-clients_6.2.10-1_amd64.deb>`_
* `foundationdb-server-6.2.10-1_amd64.deb <https://www.foundationdb.org/downloads/6.2.10/ubuntu/installers/foundationdb-server_6.2.10-1_amd64.deb>`_ (depends on the clients package)
RHEL/CentOS EL6
---------------
The RHEL/CentOS EL6 packages are supported on 64-bit RHEL/CentOS 6.x.
* `foundationdb-clients-6.2.8-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.8/rhel6/installers/foundationdb-clients-6.2.8-1.el6.x86_64.rpm>`_
* `foundationdb-server-6.2.8-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.8/rhel6/installers/foundationdb-server-6.2.8-1.el6.x86_64.rpm>`_ (depends on the clients package)
* `foundationdb-clients-6.2.10-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.10/rhel6/installers/foundationdb-clients-6.2.10-1.el6.x86_64.rpm>`_
* `foundationdb-server-6.2.10-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.10/rhel6/installers/foundationdb-server-6.2.10-1.el6.x86_64.rpm>`_ (depends on the clients package)
RHEL/CentOS EL7
---------------
The RHEL/CentOS EL7 packages are supported on 64-bit RHEL/CentOS 7.x.
* `foundationdb-clients-6.2.8-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.8/rhel7/installers/foundationdb-clients-6.2.8-1.el7.x86_64.rpm>`_
* `foundationdb-server-6.2.8-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.8/rhel7/installers/foundationdb-server-6.2.8-1.el7.x86_64.rpm>`_ (depends on the clients package)
* `foundationdb-clients-6.2.10-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.10/rhel7/installers/foundationdb-clients-6.2.10-1.el7.x86_64.rpm>`_
* `foundationdb-server-6.2.10-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.10/rhel7/installers/foundationdb-server-6.2.10-1.el7.x86_64.rpm>`_ (depends on the clients package)
Windows
-------
The Windows installer is supported on 64-bit Windows XP and later. It includes the client and (optionally) the server.
* `foundationdb-6.2.8-x64.msi <https://www.foundationdb.org/downloads/6.2.8/windows/installers/foundationdb-6.2.8-x64.msi>`_
* `foundationdb-6.2.10-x64.msi <https://www.foundationdb.org/downloads/6.2.10/windows/installers/foundationdb-6.2.10-x64.msi>`_
API Language Bindings
=====================
@ -58,18 +58,18 @@ On macOS and Windows, the FoundationDB Python API bindings are installed as part
If you need to use the FoundationDB Python API from other Python installations or paths, download the Python package:
* `foundationdb-6.2.8.tar.gz <https://www.foundationdb.org/downloads/6.2.8/bindings/python/foundationdb-6.2.8.tar.gz>`_
* `foundationdb-6.2.10.tar.gz <https://www.foundationdb.org/downloads/6.2.10/bindings/python/foundationdb-6.2.10.tar.gz>`_
Ruby 1.9.3/2.0.0+
-----------------
* `fdb-6.2.8.gem <https://www.foundationdb.org/downloads/6.2.8/bindings/ruby/fdb-6.2.8.gem>`_
* `fdb-6.2.10.gem <https://www.foundationdb.org/downloads/6.2.10/bindings/ruby/fdb-6.2.10.gem>`_
Java 8+
-------
* `fdb-java-6.2.8.jar <https://www.foundationdb.org/downloads/6.2.8/bindings/java/fdb-java-6.2.8.jar>`_
* `fdb-java-6.2.8-javadoc.jar <https://www.foundationdb.org/downloads/6.2.8/bindings/java/fdb-java-6.2.8-javadoc.jar>`_
* `fdb-java-6.2.10.jar <https://www.foundationdb.org/downloads/6.2.10/bindings/java/fdb-java-6.2.10.jar>`_
* `fdb-java-6.2.10-javadoc.jar <https://www.foundationdb.org/downloads/6.2.10/bindings/java/fdb-java-6.2.10-javadoc.jar>`_
Go 1.11+
--------

@ -2,6 +2,26 @@
Release Notes
#############
6.2.10
======
Fixes
-----
* ``backup_agent`` crashed on startup. `(PR #2356) <https://github.com/apple/foundationdb/pull/2356>`_.
6.2.9
=====
Fixes
-----
* Small clusters using specific sets of process classes could cause the data distributor to be continuously killed and re-recruited. `(PR #2344) <https://github.com/apple/foundationdb/pull/2344>`_.
* The data distributor and ratekeeper could be recruited on non-optimal processes. `(PR #2344) <https://github.com/apple/foundationdb/pull/2344>`_.
* A ``kill`` command from ``fdbcli`` could take a long time before being executed by a busy process. `(PR #2339) <https://github.com/apple/foundationdb/pull/2339>`_.
* Committing transactions larger than 1 MB could cause the proxy to stall for up to a second. `(PR #2350) <https://github.com/apple/foundationdb/pull/2350>`_.
* Transaction timeouts would use memory for the entire duration of the timeout, regardless of whether the transaction had been destroyed. `(PR #2353) <https://github.com/apple/foundationdb/pull/2353>`_.
6.2.8
=====
@ -63,7 +83,6 @@ Fixes
* Status would report incorrect fault tolerance metrics when a remote region was configured and the primary region lost a storage replica. [6.2.6] `(PR #2230) <https://github.com/apple/foundationdb/pull/2230>`_
* The cluster would not change to a new set of satellite transaction logs when they become available in a better satellite location. [6.2.6] `(PR #2241) <https://github.com/apple/foundationdb/pull/2241>`_.
* The existence of ``proxy`` or ``resolver`` class processes prevented ``stateless`` class processes from being recruited as proxies or resolvers. [6.2.6] `(PR #2241) <https://github.com/apple/foundationdb/pull/2241>`_.
* Committing transactions larger than 1 MB could cause the proxy to stall for up to a second. [6.2.6] `(PR #2250) <https://github.com/apple/foundationdb/pull/2250>`_.
* The cluster controller could become saturated in clusters with large numbers of connected clients using TLS. [6.2.6] `(PR #2252) <https://github.com/apple/foundationdb/pull/2252>`_.
* Backup and DR would not share a mutation stream if they were started on different versions of FoundationDB. Either backup or DR must be restarted to resolve this issue. [6.2.6] `(PR #2202) <https://github.com/apple/foundationdb/pull/2202>`_.
* Don't track batch priority GRV requests in latency bands. [6.2.7] `(PR #2279) <https://github.com/apple/foundationdb/pull/2279>`_.

@ -39,6 +39,10 @@ struct ClientWorkerInterface {
UID id() const { return reboot.getEndpoint().token; }
NetworkAddress address() const { return reboot.getEndpoint().getPrimaryAddress(); }
void initEndpoints() {
reboot.getEndpoint( TaskPriority::ReadSocket );
}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, reboot, profiler);

@ -68,6 +68,7 @@ ClientKnobs::ClientKnobs(bool randomize) {
init( MAX_BATCH_SIZE, 1000 ); if( randomize && BUGGIFY ) MAX_BATCH_SIZE = 1;
init( GRV_BATCH_TIMEOUT, 0.005 ); if( randomize && BUGGIFY ) GRV_BATCH_TIMEOUT = 0.1;
init( BROADCAST_BATCH_SIZE, 20 ); if( randomize && BUGGIFY ) BROADCAST_BATCH_SIZE = 1;
init( TRANSACTION_TIMEOUT_DELAY_INTERVAL, 10.0 ); if( randomize && BUGGIFY ) TRANSACTION_TIMEOUT_DELAY_INTERVAL = 1.0;
init( LOCATION_CACHE_EVICTION_SIZE, 600000 );
init( LOCATION_CACHE_EVICTION_SIZE_SIM, 10 ); if( randomize && BUGGIFY ) LOCATION_CACHE_EVICTION_SIZE_SIM = 3;
@ -97,6 +98,7 @@ ClientKnobs::ClientKnobs(bool randomize) {
init( MUTATION_BLOCK_SIZE, 10000 );
// TaskBucket
init( TASKBUCKET_LOGGING_DELAY, 5.0 );
init( TASKBUCKET_MAX_PRIORITY, 1 );
init( TASKBUCKET_CHECK_TIMEOUT_CHANCE, 0.02 ); if( randomize && BUGGIFY ) TASKBUCKET_CHECK_TIMEOUT_CHANCE = 1.0;
init( TASKBUCKET_TIMEOUT_JITTER_OFFSET, 0.9 );
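The two knobs added above, TRANSACTION_TIMEOUT_DELAY_INTERVAL and TASKBUCKET_MAX_PRIORITY, follow the existing pattern in this file: each gets a production default, and under randomize && BUGGIFY a knob may be forced to an extreme value so simulation exercises the edge cases. A minimal, dependency-free sketch of that pattern, assuming nothing about the real Knobs class (names and the flip probability are illustrative):

#include <cstdio>
#include <random>

// Sketch of the init-with-optional-randomization pattern seen above.
// SketchKnobs and its members are illustrative, not FoundationDB's ClientKnobs.
struct SketchKnobs {
    double transactionTimeoutDelayInterval;
    int taskbucketMaxPriority;

    explicit SketchKnobs(bool randomize) {
        std::mt19937 rng(std::random_device{}());
        std::bernoulli_distribution buggify(0.25); // stand-in for BUGGIFY

        transactionTimeoutDelayInterval = 10.0;
        if (randomize && buggify(rng)) transactionTimeoutDelayInterval = 1.0;

        taskbucketMaxPriority = 1; // no randomized override, matching the diff
    }
};

int main() {
    SketchKnobs knobs(/*randomize=*/true);
    std::printf("timeout delay interval: %.1f, taskbucket max priority: %d\n",
                knobs.transactionTimeoutDelayInterval, knobs.taskbucketMaxPriority);
    return 0;
}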

@ -66,6 +66,7 @@ public:
int MAX_BATCH_SIZE;
double GRV_BATCH_TIMEOUT;
int BROADCAST_BATCH_SIZE;
double TRANSACTION_TIMEOUT_DELAY_INTERVAL;
// When locationCache in DatabaseContext gets to be this size, items will be evicted
int LOCATION_CACHE_EVICTION_SIZE;
@ -99,6 +100,7 @@ public:
int MUTATION_BLOCK_SIZE;
// Taskbucket
double TASKBUCKET_LOGGING_DELAY;
int TASKBUCKET_MAX_PRIORITY;
double TASKBUCKET_CHECK_TIMEOUT_CHANCE;
double TASKBUCKET_TIMEOUT_JITTER_OFFSET;

@ -32,13 +32,15 @@ struct MutationListRef {
// Each blob has a struct Header followed by the mutation's param1 and param2 content.
// The Header has the mutation's type and the length of param1 and param2
private:
public:
struct Blob {
// StringRef data Format: |type|p1len|p2len|p1_content|p2_content|
// |type|p1len|p2len| is the header; p1_content has p1len length; p2_content has p2len length
StringRef data;
Blob* next;
};
Blob *blob_begin;
private:
struct Header {
int type, p1len, p2len;
const uint8_t* p1begin() const {
@ -148,6 +150,8 @@ public:
blob_begin->data = StringRef((const uint8_t*)ar.arenaRead(totalBytes), totalBytes); // Zero-copy read when deserializing from an ArenaReader
}
}
//FIXME: this is re-implemented on the master proxy to include a yield; any changes to this function should also be done there
template <class Ar>
void serialize_save( Ar& ar ) const {
serializer(ar, totalBytes);
@ -180,7 +184,7 @@ private:
return b;
}
Blob *blob_begin, *blob_end;
Blob *blob_end;
int totalBytes;
};
typedef Standalone<MutationListRef> MutationList;
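The comments above describe the wire layout of each blob: a Header carrying the mutation type and the two parameter lengths, immediately followed by the raw bytes of param1 and param2, with blobs chained through the next pointer. A standalone sketch of writing and reading that |type|p1len|p2len|p1_content|p2_content| layout, assuming 32-bit header fields and using plain C++ rather than FoundationDB's serializer:

#include <cstdint>
#include <cstring>
#include <iostream>
#include <string>
#include <vector>

// Illustrative header; the 32-bit field widths are an assumption, not taken from the diff.
struct SketchHeader { int32_t type, p1len, p2len; };

std::vector<uint8_t> encodeBlob(int32_t type, const std::string& p1, const std::string& p2) {
    SketchHeader h{type, (int32_t)p1.size(), (int32_t)p2.size()};
    std::vector<uint8_t> out(sizeof(h) + p1.size() + p2.size());
    std::memcpy(out.data(), &h, sizeof(h));
    std::memcpy(out.data() + sizeof(h), p1.data(), p1.size());
    std::memcpy(out.data() + sizeof(h) + p1.size(), p2.data(), p2.size());
    return out;
}

void decodeBlob(const std::vector<uint8_t>& blob) {
    SketchHeader h;
    std::memcpy(&h, blob.data(), sizeof(h));
    const uint8_t* p1 = blob.data() + sizeof(h); // p1begin() in the header above
    const uint8_t* p2 = p1 + h.p1len;            // p2begin()
    std::cout << "type=" << h.type
              << " p1=" << std::string((const char*)p1, h.p1len)
              << " p2=" << std::string((const char*)p2, h.p2len) << "\n";
}

int main() {
    decodeBlob(encodeBlob(/*type=*/0, "key", "value"));
    return 0;
}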

@ -1133,8 +1133,8 @@ ReadYourWritesTransaction::ReadYourWritesTransaction(Database const& cx)
}
ACTOR Future<Void> timebomb(double endTime, Promise<Void> resetPromise) {
if (now() < endTime) {
wait ( delayUntil( endTime ) );
while(now() < endTime) {
wait( delayUntil( std::min(endTime + 0.0001, now() + CLIENT_KNOBS->TRANSACTION_TIMEOUT_DELAY_INTERVAL) ) );
}
if( !resetPromise.isSet() )
resetPromise.sendError(transaction_timed_out());
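This appears to be the fix for the 6.2.9 note that transaction timeouts used memory for their entire duration: instead of arming a single timer for the full timeout, the actor now sleeps at most TRANSACTION_TIMEOUT_DELAY_INTERVAL at a time and re-checks the deadline, so a transaction that is destroyed early stops consuming timer state within one interval. A rough single-threaded analogue of the pattern in standard C++ (not flow actors; the cancellation flag and interval are illustrative):

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstdio>
#include <thread>

// Sketch: re-check a deadline in bounded steps instead of holding one long timer,
// so cancellation is noticed within one interval.
void timebombSketch(std::chrono::steady_clock::time_point deadline,
                    std::atomic<bool>& cancelled,
                    std::chrono::milliseconds interval) {
    while (!cancelled.load() && std::chrono::steady_clock::now() < deadline) {
        auto remaining = deadline - std::chrono::steady_clock::now();
        std::this_thread::sleep_for(
            std::min<std::chrono::steady_clock::duration>(remaining, interval));
    }
    if (!cancelled.load())
        std::puts("transaction_timed_out"); // stand-in for resetPromise.sendError(...)
}

int main() {
    std::atomic<bool> cancelled{false};
    timebombSketch(std::chrono::steady_clock::now() + std::chrono::milliseconds(50),
                   cancelled, std::chrono::milliseconds(10));
    return 0;
}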

@ -316,6 +316,7 @@ public:
ACTOR static Future<Void> extendTimeoutRepeatedly(Database cx, Reference<TaskBucket> taskBucket, Reference<Task> task) {
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
state double start = now();
state Version versionNow = wait(runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) {
taskBucket->setOptions(tr);
return map(tr->getReadVersion(), [=](Version v) {
@ -329,6 +330,13 @@ public:
// Wait until we are half way to the timeout version of this task
wait(delay(0.8 * (BUGGIFY ? (2 * deterministicRandom()->random01()) : 1.0) * (double)(task->timeoutVersion - (uint64_t)versionNow) / CLIENT_KNOBS->CORE_VERSIONSPERSECOND));
if(now() - start > 300) {
TraceEvent(SevWarnAlways, "TaskBucketLongExtend")
.detail("Duration", now() - start)
.detail("TaskUID", task->key)
.detail("TaskType", task->params[Task::reservedTaskParamKeyType])
.detail("Priority", task->getPriority());
}
// Take the extendMutex lock until we either succeed or stop trying to extend due to failure
wait(task->extendMutex.take());
releaser = FlowLock::Releaser(task->extendMutex, 1);
@ -430,6 +438,7 @@ public:
loop {
// Start running tasks while slots are available and we keep finding work to do
++taskBucket->dispatchSlotChecksStarted;
while(!availableSlots.empty()) {
getTasks.clear();
for(int i = 0, imax = std::min<unsigned int>(getBatchSize, availableSlots.size()); i < imax; ++i)
@ -439,18 +448,22 @@ public:
bool done = false;
for(int i = 0; i < getTasks.size(); ++i) {
if(getTasks[i].isError()) {
++taskBucket->dispatchErrors;
done = true;
continue;
}
Reference<Task> task = getTasks[i].get();
if(task) {
// Start the task
++taskBucket->dispatchDoTasks;
int slot = availableSlots.back();
availableSlots.pop_back();
tasks[slot] = taskBucket->doTask(cx, futureBucket, task);
}
else
else {
++taskBucket->dispatchEmptyTasks;
done = true;
}
}
if(done) {
@ -460,11 +473,16 @@ public:
else
getBatchSize = std::min<unsigned int>(getBatchSize * 2, maxConcurrentTasks);
}
++taskBucket->dispatchSlotChecksComplete;
// Wait for a task to be done. Also, if we have any slots available then stop waiting after pollDelay at the latest.
Future<Void> w = ready(waitForAny(tasks));
if(!availableSlots.empty())
if(!availableSlots.empty()) {
if(*pollDelay > 600) {
TraceEvent(SevWarnAlways, "TaskBucketLongPollDelay").suppressFor(1.0).detail("Delay", *pollDelay);
}
w = w || delay(*pollDelay * (0.9 + deterministicRandom()->random01() / 5)); // Jittered by 20 %, so +/- 10%
}
wait(w);
// Check all of the task slots, any that are finished should be replaced with Never() and their slots added back to availableSlots
@ -497,7 +515,7 @@ public:
ACTOR static Future<Void> run(Database cx, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, double *pollDelay, int maxConcurrentTasks) {
state Reference<AsyncVar<bool>> paused = Reference<AsyncVar<bool>>( new AsyncVar<bool>(true) );
state Future<Void> watchPausedFuture = watchPaused(cx, taskBucket, paused);
taskBucket->metricLogger = traceCounters("TaskBucketMetrics", taskBucket->dbgid, CLIENT_KNOBS->TASKBUCKET_LOGGING_DELAY, &taskBucket->cc);
loop {
while(paused->get()) {
wait(paused->onChange() || watchPausedFuture);
@ -783,6 +801,13 @@ TaskBucket::TaskBucket(const Subspace& subspace, bool sysAccess, bool priorityBa
, system_access(sysAccess)
, priority_batch(priorityBatch)
, lock_aware(lockAware)
, cc("TaskBucket")
, dbgid( deterministicRandom()->randomUniqueID() )
, dispatchSlotChecksStarted("DispatchSlotChecksStarted", cc)
, dispatchErrors("DispatchErrors", cc)
, dispatchDoTasks("DispatchDoTasks", cc)
, dispatchEmptyTasks("DispatchEmptyTasks", cc)
, dispatchSlotChecksComplete("DispatchSlotChecksComplete", cc)
{
}
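The TaskBucket changes above add visibility into the dispatcher: a CounterCollection named "TaskBucket" with counters for slot checks started and completed, dispatch errors, dispatched tasks, and empty polls, flushed periodically through traceCounters, plus SevWarnAlways events when extending a task takes over 300 seconds or pollDelay grows beyond 600 seconds. A dependency-free sketch of that named-counter-plus-periodic-log shape (struct and field names are illustrative):

#include <chrono>
#include <cstdint>
#include <cstdio>

// Sketch of a counter collection reported on an interval, roughly the shape of
// CounterCollection/traceCounters above; not FoundationDB's actual metrics API.
struct DispatchCounters {
    uint64_t slotChecksStarted = 0, slotChecksComplete = 0;
    uint64_t errors = 0, doTasks = 0, emptyTasks = 0;
    std::chrono::steady_clock::time_point lastLog = std::chrono::steady_clock::now();

    void maybeLog(std::chrono::seconds interval) {
        auto now = std::chrono::steady_clock::now();
        if (now - lastLog < interval) return;
        lastLog = now;
        std::printf("TaskBucketMetrics started=%llu complete=%llu errors=%llu "
                    "dispatched=%llu empty=%llu\n",
                    (unsigned long long)slotChecksStarted,
                    (unsigned long long)slotChecksComplete,
                    (unsigned long long)errors,
                    (unsigned long long)doTasks,
                    (unsigned long long)emptyTasks);
    }
};

int main() {
    DispatchCounters cc;
    for (int i = 0; i < 3; ++i) { ++cc.slotChecksStarted; ++cc.doTasks; ++cc.slotChecksComplete; }
    cc.maybeLog(std::chrono::seconds(0)); // zero interval forces an immediate report
    return 0;
}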

@ -228,12 +228,23 @@ public:
Database src;
Map<Key, Future<Reference<KeyRangeMap<Version>>>> key_version;
CounterCollection cc;
Counter dispatchSlotChecksStarted;
Counter dispatchErrors;
Counter dispatchDoTasks;
Counter dispatchEmptyTasks;
Counter dispatchSlotChecksComplete;
UID dbgid;
double getTimeoutSeconds() const {
return (double)timeout / CLIENT_KNOBS->CORE_VERSIONSPERSECOND;
}
private:
friend class TaskBucketImpl;
Future<Void> metricLogger;
Subspace prefix;
Subspace active;
Key pauseKey;

@ -497,7 +497,10 @@ ACTOR Future<Void> connectionKeeper( Reference<Peer> self,
.detail("PeerAddr", self->destination);
}
if(self->destination.isPublic() && IFailureMonitor::failureMonitor().getState(self->destination).isAvailable()) {
if(self->destination.isPublic()
&& IFailureMonitor::failureMonitor().getState(self->destination).isAvailable()
&& !FlowTransport::transport().isClient())
{
auto& it = self->transport->closedPeers[self->destination];
if(now() - it.second > FLOW_KNOBS->TOO_MANY_CONNECTIONS_CLOSED_RESET_DELAY) {
it.first = now();
@ -686,7 +689,7 @@ static void scanPackets(TransportData* transport, uint8_t*& unprocessed_begin, c
}
if (packetLen > FLOW_KNOBS->PACKET_LIMIT) {
TraceEvent(SevError, "Net2_PacketLimitExceeded").detail("FromPeer", peerAddress.toString()).detail("Length", (int)packetLen);
TraceEvent(SevError, "PacketLimitExceeded").detail("FromPeer", peerAddress.toString()).detail("Length", (int)packetLen);
throw platform_error();
}
@ -740,7 +743,7 @@ static void scanPackets(TransportData* transport, uint8_t*& unprocessed_begin, c
++transport->countPacketsReceived;
if (packetLen > FLOW_KNOBS->PACKET_WARNING) {
TraceEvent(transport->warnAlwaysForLargePacket ? SevWarnAlways : SevWarn, "Net2_LargePacket")
TraceEvent(transport->warnAlwaysForLargePacket ? SevWarnAlways : SevWarn, "LargePacketReceived")
.suppressFor(1.0)
.detail("FromPeer", peerAddress.toString())
.detail("Length", (int)packetLen)
@ -767,7 +770,7 @@ static int getNewBufferSize(const uint8_t* begin, const uint8_t* end, const Netw
}
const uint32_t packetLen = *(uint32_t*)begin;
if (packetLen > FLOW_KNOBS->PACKET_LIMIT) {
TraceEvent(SevError, "Net2_PacketLimitExceeded").detail("FromPeer", peerAddress.toString()).detail("Length", (int)packetLen);
TraceEvent(SevError, "PacketLimitExceeded").detail("FromPeer", peerAddress.toString()).detail("Length", (int)packetLen);
throw platform_error();
}
return std::max<uint32_t>(FLOW_KNOBS->MIN_PACKET_BUFFER_BYTES,
@ -1216,11 +1219,11 @@ static ReliablePacket* sendPacket( TransportData* self, Reference<Peer> peer, IS
}
if (len > FLOW_KNOBS->PACKET_LIMIT) {
TraceEvent(SevError, "Net2_PacketLimitExceeded").detail("ToPeer", destination.getPrimaryAddress()).detail("Length", (int)len);
TraceEvent(SevError, "PacketLimitExceeded").detail("ToPeer", destination.getPrimaryAddress()).detail("Length", (int)len);
// throw platform_error(); // FIXME: How to recover from this situation?
}
else if (len > FLOW_KNOBS->PACKET_WARNING) {
TraceEvent(self->warnAlwaysForLargePacket ? SevWarnAlways : SevWarn, "Net2_LargePacket")
TraceEvent(self->warnAlwaysForLargePacket ? SevWarnAlways : SevWarn, "LargePacketSent")
.suppressFor(1.0)
.detail("ToPeer", destination.getPrimaryAddress())
.detail("Length", (int)len)

@ -1150,17 +1150,24 @@ public:
return false;
}
bool isProxyOrResolver(Optional<Key> processId) {
bool isUsedNotMaster(Optional<Key> processId) {
ASSERT(masterProcessId.present());
if (processId == masterProcessId) return false;
auto& dbInfo = db.serverInfo->get().read();
for (const auto& tlogset : dbInfo.logSystemConfig.tLogs) {
for (const auto& tlog: tlogset.tLogs) {
if (tlog.present() && tlog.interf().locality.processId() == processId) return true;
}
}
for (const MasterProxyInterface& interf : dbInfo.client.proxies) {
if (interf.locality.processId() == processId) return true;
}
for (const ResolverInterface& interf: dbInfo.resolvers) {
if (interf.locality.processId() == processId) return true;
}
if (processId == clusterControllerProcessId) return true;
return false;
}
@ -1170,7 +1177,7 @@ public:
if ((role != ProcessClass::DataDistributor && role != ProcessClass::Ratekeeper) || pid == masterProcessId.get()) {
return false;
}
return isProxyOrResolver(pid);
return isUsedNotMaster(pid);
}
std::map< Optional<Standalone<StringRef>>, int> getUsedIds() {
@ -1476,35 +1483,74 @@ void checkBetterDDOrRK(ClusterControllerData* self) {
return;
}
auto& db = self->db.serverInfo->get().read();
auto bestFitnessForRK = self->getBestFitnessForRoleInDatacenter(ProcessClass::Ratekeeper);
auto bestFitnessForDD = self->getBestFitnessForRoleInDatacenter(ProcessClass::DataDistributor);
std::map<Optional<Standalone<StringRef>>, int> id_used = self->getUsedIds();
WorkerDetails newRKWorker = self->getWorkerForRoleInDatacenter(self->clusterControllerDcId, ProcessClass::Ratekeeper, ProcessClass::NeverAssign, self->db.config, id_used, true).worker;
if (self->onMasterIsBetter(newRKWorker, ProcessClass::Ratekeeper)) {
newRKWorker = self->id_worker[self->masterProcessId.get()].details;
}
id_used = self->getUsedIds();
for(auto& it : id_used) {
it.second *= 2;
}
id_used[newRKWorker.interf.locality.processId()]++;
WorkerDetails newDDWorker = self->getWorkerForRoleInDatacenter(self->clusterControllerDcId, ProcessClass::DataDistributor, ProcessClass::NeverAssign, self->db.config, id_used, true).worker;
if (self->onMasterIsBetter(newDDWorker, ProcessClass::DataDistributor)) {
newDDWorker = self->id_worker[self->masterProcessId.get()].details;
}
auto bestFitnessForRK = newRKWorker.processClass.machineClassFitness(ProcessClass::Ratekeeper);
if(self->db.config.isExcludedServer(newRKWorker.interf.address())) {
bestFitnessForRK = std::max(bestFitnessForRK, ProcessClass::ExcludeFit);
}
auto bestFitnessForDD = newDDWorker.processClass.machineClassFitness(ProcessClass::DataDistributor);
if(self->db.config.isExcludedServer(newDDWorker.interf.address())) {
bestFitnessForDD = std::max(bestFitnessForDD, ProcessClass::ExcludeFit);
}
//TraceEvent("CheckBetterDDorRKNewRecruits", self->id).detail("MasterProcessId", self->masterProcessId)
//.detail("NewRecruitRKProcessId", newRKWorker.interf.locality.processId()).detail("NewRecruiteDDProcessId", newDDWorker.interf.locality.processId());
Optional<Standalone<StringRef>> currentRKProcessId;
Optional<Standalone<StringRef>> currentDDProcessId;
auto& db = self->db.serverInfo->get().read();
bool ratekeeperHealthy = false;
if (db.ratekeeper.present() && self->id_worker.count(db.ratekeeper.get().locality.processId()) &&
(!self->recruitingRatekeeperID.present() || (self->recruitingRatekeeperID.get() == db.ratekeeper.get().id()))) {
auto& rkWorker = self->id_worker[db.ratekeeper.get().locality.processId()];
currentRKProcessId = rkWorker.details.interf.locality.processId();
auto rkFitness = rkWorker.details.processClass.machineClassFitness(ProcessClass::Ratekeeper);
if(rkWorker.priorityInfo.isExcluded) {
rkFitness = ProcessClass::ExcludeFit;
}
if (self->isProxyOrResolver(rkWorker.details.interf.locality.processId()) || rkFitness > bestFitnessForRK) {
if (self->isUsedNotMaster(rkWorker.details.interf.locality.processId()) || bestFitnessForRK < rkFitness
|| (rkFitness == bestFitnessForRK && rkWorker.details.interf.locality.processId() == self->masterProcessId && newRKWorker.interf.locality.processId() != self->masterProcessId)) {
TraceEvent("CCHaltRK", self->id).detail("RKID", db.ratekeeper.get().id())
.detail("Excluded", rkWorker.priorityInfo.isExcluded)
.detail("Fitness", rkFitness).detail("BestFitness", bestFitnessForRK);
self->recruitRatekeeper.set(true);
} else {
ratekeeperHealthy = true;
}
}
if (!self->recruitingDistributor && db.distributor.present() && self->id_worker.count(db.distributor.get().locality.processId())) {
auto& ddWorker = self->id_worker[db.distributor.get().locality.processId()];
auto ddFitness = ddWorker.details.processClass.machineClassFitness(ProcessClass::DataDistributor);
currentDDProcessId = ddWorker.details.interf.locality.processId();
if(ddWorker.priorityInfo.isExcluded) {
ddFitness = ProcessClass::ExcludeFit;
}
if (self->isProxyOrResolver(ddWorker.details.interf.locality.processId()) || ddFitness > bestFitnessForDD) {
if (self->isUsedNotMaster(ddWorker.details.interf.locality.processId()) || bestFitnessForDD < ddFitness
|| (ddFitness == bestFitnessForDD && ddWorker.details.interf.locality.processId() == self->masterProcessId && newDDWorker.interf.locality.processId() != self->masterProcessId)
|| (ddFitness == bestFitnessForDD && newRKWorker.interf.locality.processId() != newDDWorker.interf.locality.processId() && ratekeeperHealthy && currentRKProcessId.present() && currentDDProcessId == currentRKProcessId
&& (newRKWorker.interf.locality.processId() != self->masterProcessId && newDDWorker.interf.locality.processId() != self->masterProcessId) )) {
TraceEvent("CCHaltDD", self->id).detail("DDID", db.distributor.get().id())
.detail("Excluded", ddWorker.priorityInfo.isExcluded)
.detail("Fitness", ddFitness).detail("BestFitness", bestFitnessForDD);
.detail("Fitness", ddFitness).detail("BestFitness", bestFitnessForDD)
.detail("CurrentRateKeeperProcessId", currentRKProcessId.present() ? currentRKProcessId.get() : LiteralStringRef("None"))
.detail("CurrentDDProcessId", currentDDProcessId)
.detail("MasterProcessID", self->masterProcessId)
.detail("NewRKWorkers", newRKWorker.interf.locality.processId())
.detail("NewDDWorker", newDDWorker.interf.locality.processId());
ddWorker.haltDistributor = brokenPromiseToNever(db.distributor.get().haltDataDistributor.getReply(HaltDataDistributorRequest(self->id)));
}
}
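The reworked checkBetterDDOrRK first recruits the best available ratekeeper and data-distributor candidates (preferring the master process when onMasterIsBetter says so, and doubling existing usage counts before the second recruitment), then halts the current role holder only if it is also serving another role (isUsedNotMaster), is strictly worse in fitness than the candidate, or is tied in fitness while sitting on the master process, or sharing a process with the ratekeeper, when a non-master candidate exists. A heavily reduced sketch of that decision, using an integer fitness where lower is better; the types and names below are illustrative, not the cluster controller's API:

#include <iostream>
#include <string>

// Simplified stand-in for the halt decision above. Lower fitness is better.
struct RoleHolder {
    std::string processId;
    int fitness;
    bool usedByOtherRole; // e.g. also a tlog, proxy, or resolver (isUsedNotMaster)
};

bool shouldReplace(const RoleHolder& current, int bestFitness,
                   const std::string& bestProcessId, const std::string& masterProcessId) {
    if (current.usedByOtherRole) return true;        // shared with another role
    if (current.fitness > bestFitness) return true;  // a strictly better candidate exists
    if (current.fitness == bestFitness &&
        current.processId == masterProcessId &&
        bestProcessId != masterProcessId) return true; // move off the master when possible
    return false;
}

int main() {
    RoleHolder rk{"master-proc", /*fitness=*/1, /*usedByOtherRole=*/false};
    std::cout << std::boolalpha
              << shouldReplace(rk, /*bestFitness=*/1, "stateless-proc", "master-proc")
              << "\n"; // prints true
    return 0;
}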

@ -69,7 +69,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
init( MAX_QUEUE_COMMIT_BYTES, 15e6 ); if( randomize && BUGGIFY ) MAX_QUEUE_COMMIT_BYTES = 5000;
init( DESIRED_OUTSTANDING_MESSAGES, 5000 ); if( randomize && BUGGIFY ) DESIRED_OUTSTANDING_MESSAGES = deterministicRandom()->randomInt(0,100);
init( DESIRED_GET_MORE_DELAY, 0.005 );
init( CONCURRENT_LOG_ROUTER_READS, 1 );
init( CONCURRENT_LOG_ROUTER_READS, 5 ); if( randomize && BUGGIFY ) CONCURRENT_LOG_ROUTER_READS = 1;
init( LOG_ROUTER_PEEK_FROM_SATELLITES_PREFERRED, 1 ); if( randomize && BUGGIFY ) LOG_ROUTER_PEEK_FROM_SATELLITES_PREFERRED = 0;
init( DISK_QUEUE_ADAPTER_MIN_SWITCH_TIME, 1.0 );
init( DISK_QUEUE_ADAPTER_MAX_SWITCH_TIME, 5.0 );

@ -451,21 +451,32 @@ bool isWhitelisted(const vector<Standalone<StringRef>>& binPathVec, StringRef bi
return std::find(binPathVec.begin(), binPathVec.end(), binPath) != binPathVec.end();
}
ACTOR Future<Void> adddBackupMutations(ProxyCommitData* self, std::map<Key, MutationListRef>* logRangeMutations,
ACTOR Future<Void> addBackupMutations(ProxyCommitData* self, std::map<Key, MutationListRef>* logRangeMutations,
LogPushData* toCommit, Version commitVersion) {
state std::map<Key, MutationListRef>::iterator logRangeMutation = logRangeMutations->begin();
state int32_t version = commitVersion / CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE;
state int yieldBytes = 0;
state BinaryWriter valueWriter(Unversioned());
// Serialize the log range mutations within the map
for (; logRangeMutation != logRangeMutations->end(); ++logRangeMutation)
{
if(yieldBytes > SERVER_KNOBS->DESIRED_TOTAL_BYTES) {
yieldBytes = 0;
wait(yield());
//FIXME: this is re-implementing the serialize function of MutationListRef in order to have a yield
valueWriter = BinaryWriter(IncludeVersion());
valueWriter << logRangeMutation->second.totalSize();
state MutationListRef::Blob* blobIter = logRangeMutation->second.blob_begin;
while(blobIter) {
if(yieldBytes > SERVER_KNOBS->DESIRED_TOTAL_BYTES) {
yieldBytes = 0;
wait(yield());
}
valueWriter.serializeBytes(blobIter->data);
yieldBytes += blobIter->data.size();
blobIter = blobIter->next;
}
yieldBytes += logRangeMutation->second.expectedSize();
Key val = valueWriter.toValue();
BinaryWriter wr(Unversioned());
@ -480,8 +491,6 @@ ACTOR Future<Void> adddBackupMutations(ProxyCommitData* self, std::map<Key, Muta
backupMutation.type = MutationRef::SetValue;
uint32_t* partBuffer = NULL;
Key val = BinaryWriter::toValue(logRangeMutation->second, IncludeVersion());
for (int part = 0; part * CLIENT_KNOBS->MUTATION_BLOCK_SIZE < val.size(); part++) {
// Assign the second parameter as the part
@ -877,7 +886,7 @@ ACTOR Future<Void> commitBatch(
// Serialize and backup the mutations as a single mutation
if ((self->vecBackupKeys.size() > 1) && logRangeMutations.size()) {
wait( adddBackupMutations(self, &logRangeMutations, &toCommit, commitVersion) );
wait( addBackupMutations(self, &logRangeMutations, &toCommit, commitVersion) );
}
self->stats.mutations += mutationCount;
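As the two FIXME comments note, the proxy now re-implements MutationListRef's save-side serialization inline so that it can yield: it walks the blob chain itself, appends each blob's bytes to the value writer, and yields whenever more than DESIRED_TOTAL_BYTES have accumulated since the last yield. This is presumably the change behind the 6.2.9 note about commits larger than 1 MB stalling the proxy. A plain C++ sketch of chunked serialization with an explicit yield point (the blob type and yield callback are illustrative):

#include <cstdint>
#include <cstdio>
#include <functional>
#include <vector>

// Sketch: serialize a chain of blobs, invoking a yield callback after roughly
// yieldThresholdBytes of work, mirroring the yieldBytes bookkeeping above.
struct SketchBlob { std::vector<uint8_t> data; SketchBlob* next; };

std::vector<uint8_t> serializeWithYield(const SketchBlob* head, size_t yieldThresholdBytes,
                                        const std::function<void()>& yieldFn) {
    std::vector<uint8_t> out;
    size_t yieldBytes = 0;
    for (const SketchBlob* b = head; b != nullptr; b = b->next) {
        if (yieldBytes > yieldThresholdBytes) {
            yieldBytes = 0;
            yieldFn(); // in the proxy this is wait(yield()), giving the run loop a turn
        }
        out.insert(out.end(), b->data.begin(), b->data.end());
        yieldBytes += b->data.size();
    }
    return out;
}

int main() {
    SketchBlob b2{{4, 5, 6}, nullptr};
    SketchBlob b1{{1, 2, 3}, &b2};
    auto bytes = serializeWithYield(&b1, /*yieldThresholdBytes=*/4, [] { std::puts("yield"); });
    std::printf("serialized %zu bytes\n", bytes.size());
    return 0;
}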

@ -1080,7 +1080,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
wait( delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask()) );
}
if( req.tag.locality == tagLocalityLogRouter ) {
if( logData->locality != tagLocalitySatellite && req.tag.locality == tagLocalityLogRouter ) {
wait( self->concurrentLogRouterReads.take() );
state FlowLock::Releaser globalReleaser(self->concurrentLogRouterReads);
wait( delay(0.0, TaskPriority::Low) );
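CONCURRENT_LOG_ROUTER_READS (raised from 1 to 5 above) caps how many log-router peeks a tlog serves concurrently through a FlowLock, and the added locality check lets satellite tlogs bypass the cap entirely. The same throttling idea expressed with a standard C++20 counting semaphore, as a sketch only; the thread model and names are illustrative, not the tlog's actual structure:

#include <cstdio>
#include <semaphore>
#include <thread>
#include <vector>

// Sketch: bound concurrent "log router reads" with a counting semaphore,
// similar in spirit to FlowLock::take() / FlowLock::Releaser above.
std::counting_semaphore<5> concurrentLogRouterReads(5); // 5 mirrors the new knob default

void serveLogRouterPeek(int id) {
    concurrentLogRouterReads.acquire();   // FlowLock::take()
    std::printf("peek %d running\n", id);
    concurrentLogRouterReads.release();   // Releaser going out of scope
}

int main() {
    std::vector<std::thread> readers;
    for (int i = 0; i < 8; ++i) readers.emplace_back(serveLogRouterPeek, i);
    for (auto& t : readers) t.join();
    return 0;
}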

@ -1246,6 +1246,7 @@ void setupSimulatedSystem(vector<Future<Void>>* systemActors, std::string baseFo
bool requiresExtraDBMachines = extraDB && g_simulator.extraDB->toString() != conn.toString();
int assignedMachines = 0, nonVersatileMachines = 0;
std::vector<ProcessClass::ClassType> processClassesSubSet = {ProcessClass::UnsetClass, ProcessClass::ResolutionClass, ProcessClass::MasterClass};
for( int dc = 0; dc < dataCenters; dc++ ) {
//FIXME: test unset dcID
Optional<Standalone<StringRef>> dcUID = StringRef(format("%d", dc));
@ -1270,7 +1271,7 @@ void setupSimulatedSystem(vector<Future<Void>>* systemActors, std::string baseFo
if(assignedMachines < 4)
processClass = ProcessClass((ProcessClass::ClassType) deterministicRandom()->randomInt(0, 2), ProcessClass::CommandLineSource); //Unset or Storage
else if(assignedMachines == 4 && !simconfig.db.regions.size())
processClass = ProcessClass((ProcessClass::ClassType) (deterministicRandom()->randomInt(0, 2) * ProcessClass::ResolutionClass), ProcessClass::CommandLineSource); //Unset or Resolution
processClass = ProcessClass(processClassesSubSet[deterministicRandom()->randomInt(0, processClassesSubSet.size())], ProcessClass::CommandLineSource); //Unset or Resolution or Master
else
processClass = ProcessClass((ProcessClass::ClassType) deterministicRandom()->randomInt(0, 3), ProcessClass::CommandLineSource); //Unset, Storage, or Transaction
if (processClass == ProcessClass::ResolutionClass) // *can't* be assigned to other roles, even in an emergency

@ -1396,7 +1396,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
wait( delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask()) );
}
if( req.tag.locality == tagLocalityLogRouter ) {
if( logData->locality != tagLocalitySatellite && req.tag.locality == tagLocalityLogRouter ) {
wait( self->concurrentLogRouterReads.take() );
state FlowLock::Releaser globalReleaser(self->concurrentLogRouterReads);
wait( delay(0.0, TaskPriority::Low) );

@ -69,6 +69,17 @@ struct WorkerInterface {
WorkerInterface() {}
WorkerInterface( const LocalityData& locality ) : locality( locality ) {}
void initEndpoints() {
clientInterface.initEndpoints();
tLog.getEndpoint( TaskPriority::Worker );
master.getEndpoint( TaskPriority::Worker );
masterProxy.getEndpoint( TaskPriority::Worker );
resolver.getEndpoint( TaskPriority::Worker );
logRouter.getEndpoint( TaskPriority::Worker );
debugPing.getEndpoint( TaskPriority::Worker );
coordinationPing.getEndpoint( TaskPriority::Worker );
}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, clientInterface, locality, tLog, master, masterProxy, dataDistributor, ratekeeper, resolver, storage, logRouter, debugPing, coordinationPing, waitFailure, setMetricsRate, eventLogRequest, traceBatchDumpRequest, testerInterface, diskStoreRequest, execReq, workerSnapReq);

@ -820,6 +820,7 @@ ACTOR Future<Void> workerServer(
state std::string coordFolder = abspath(_coordFolder);
state WorkerInterface interf( locality );
interf.initEndpoints();
folder = abspath(folder);

@ -115,7 +115,8 @@ public:
maxKeysPerTransaction = std::max(1, maxTransactionBytes / (maxValueLength + maxLongKeyLength));
if(maxTransactionBytes > 500000) {
TraceEvent("RemapEventSeverity").detail("TargetEvent", "Net2_LargePacket").detail("OriginalSeverity", SevWarnAlways).detail("NewSeverity", SevInfo);
TraceEvent("RemapEventSeverity").detail("TargetEvent", "LargePacketSent").detail("OriginalSeverity", SevWarnAlways).detail("NewSeverity", SevInfo);
TraceEvent("RemapEventSeverity").detail("TargetEvent", "LargePacketReceived").detail("OriginalSeverity", SevWarnAlways).detail("NewSeverity", SevInfo);
TraceEvent("RemapEventSeverity").detail("TargetEvent", "LargeTransaction").detail("OriginalSeverity", SevWarnAlways).detail("NewSeverity", SevInfo);
TraceEvent("RemapEventSeverity").detail("TargetEvent", "DiskQueueMemoryWarning").detail("OriginalSeverity", SevWarnAlways).detail("NewSeverity", SevInfo);
}

@ -1424,17 +1424,16 @@ struct ConsistencyCheckWorkload : TestWorkload
}
// Check DataDistributor
ProcessClass::Fitness bestDistributorFitness = getBestAvailableFitness(dcToNonExcludedClassTypes[masterDcId], ProcessClass::DataDistributor);
if (db.distributor.present() && (!nonExcludedWorkerProcessMap.count(db.distributor.get().address()) || nonExcludedWorkerProcessMap[db.distributor.get().address()].processClass.machineClassFitness(ProcessClass::DataDistributor) != bestDistributorFitness)) {
TraceEvent("ConsistencyCheck_DistributorNotBest").detail("BestDataDistributorFitness", bestDistributorFitness)
ProcessClass::Fitness fitnessLowerBound = allWorkerProcessMap[db.master.address()].processClass.machineClassFitness(ProcessClass::DataDistributor);
if (db.distributor.present() && (!nonExcludedWorkerProcessMap.count(db.distributor.get().address()) || nonExcludedWorkerProcessMap[db.distributor.get().address()].processClass.machineClassFitness(ProcessClass::DataDistributor) > fitnessLowerBound)) {
TraceEvent("ConsistencyCheck_DistributorNotBest").detail("DataDistributorFitnessLowerBound", fitnessLowerBound)
.detail("ExistingDistributorFitness", nonExcludedWorkerProcessMap.count(db.distributor.get().address()) ? nonExcludedWorkerProcessMap[db.distributor.get().address()].processClass.machineClassFitness(ProcessClass::DataDistributor) : -1);
return false;
}
// Check Ratekeeper
ProcessClass::Fitness bestRatekeeperFitness = getBestAvailableFitness(dcToNonExcludedClassTypes[masterDcId], ProcessClass::Ratekeeper);
if (db.ratekeeper.present() && (!nonExcludedWorkerProcessMap.count(db.ratekeeper.get().address()) || nonExcludedWorkerProcessMap[db.ratekeeper.get().address()].processClass.machineClassFitness(ProcessClass::Ratekeeper) != bestRatekeeperFitness)) {
TraceEvent("ConsistencyCheck_RatekeeperNotBest").detail("BestRatekeeperFitness", bestRatekeeperFitness)
if (db.ratekeeper.present() && (!nonExcludedWorkerProcessMap.count(db.ratekeeper.get().address()) || nonExcludedWorkerProcessMap[db.ratekeeper.get().address()].processClass.machineClassFitness(ProcessClass::Ratekeeper) > fitnessLowerBound)) {
TraceEvent("ConsistencyCheck_RatekeeperNotBest").detail("BestRatekeeperFitness", fitnessLowerBound)
.detail("ExistingRatekeeperFitness", nonExcludedWorkerProcessMap.count(db.ratekeeper.get().address()) ? nonExcludedWorkerProcessMap[db.ratekeeper.get().address()].processClass.machineClassFitness(ProcessClass::Ratekeeper) : -1);
return false;
}

@ -154,7 +154,8 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
.detail("MaxClearSize", maxClearSize)
.detail("UseSystemKeys", useSystemKeys);
TraceEvent("RemapEventSeverity").detail("TargetEvent", "Net2_LargePacket").detail("OriginalSeverity", SevWarnAlways).detail("NewSeverity", SevInfo);
TraceEvent("RemapEventSeverity").detail("TargetEvent", "LargePacketSent").detail("OriginalSeverity", SevWarnAlways).detail("NewSeverity", SevInfo);
TraceEvent("RemapEventSeverity").detail("TargetEvent", "LargePacketReceived").detail("OriginalSeverity", SevWarnAlways).detail("NewSeverity", SevInfo);
TraceEvent("RemapEventSeverity").detail("TargetEvent", "LargeTransaction").detail("OriginalSeverity", SevWarnAlways).detail("NewSeverity", SevInfo);
}

@ -44,6 +44,7 @@ enum class TaskPriority {
Coordination = 8800,
FailureMonitor = 8700,
ResolutionMetrics = 8700,
Worker = 8660,
ClusterController = 8650,
MasterTLogRejoin = 8646,
ProxyStorageRejoin = 8645,

@ -32,7 +32,7 @@
<Wix xmlns='http://schemas.microsoft.com/wix/2006/wi'>
<Product Name='$(var.Title)'
Id='{508FF8D2-273F-4511-B724-344F5F4362A4}'
Id='{3FD36753-5282-41A8-A9E3-53B56550FB49}'
UpgradeCode='{A95EA002-686E-4164-8356-C715B7F8B1C8}'
Version='$(var.Version)'
Manufacturer='$(var.Manufacturer)'