Merge pull request #2362 from etschannen/master

Merge 6.2 into master
This commit is contained in:
Evan Tschannen 2019-12-02 15:04:27 -08:00 committed by GitHub
commit 07331ab5fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 289 additions and 112 deletions

View File

@ -10,38 +10,38 @@ macOS
The macOS installation package is supported on macOS 10.7+. It includes the client and (optionally) the server.
* `FoundationDB-6.2.8.pkg <https://www.foundationdb.org/downloads/6.2.8/macOS/installers/FoundationDB-6.2.8.pkg>`_
* `FoundationDB-6.2.11.pkg <https://www.foundationdb.org/downloads/6.2.11/macOS/installers/FoundationDB-6.2.11.pkg>`_
Ubuntu
------
The Ubuntu packages are supported on 64-bit Ubuntu 12.04+, but beware of the Linux kernel bug in Ubuntu 12.x.
* `foundationdb-clients-6.2.8-1_amd64.deb <https://www.foundationdb.org/downloads/6.2.8/ubuntu/installers/foundationdb-clients_6.2.8-1_amd64.deb>`_
* `foundationdb-server-6.2.8-1_amd64.deb <https://www.foundationdb.org/downloads/6.2.8/ubuntu/installers/foundationdb-server_6.2.8-1_amd64.deb>`_ (depends on the clients package)
* `foundationdb-clients-6.2.11-1_amd64.deb <https://www.foundationdb.org/downloads/6.2.11/ubuntu/installers/foundationdb-clients_6.2.11-1_amd64.deb>`_
* `foundationdb-server-6.2.11-1_amd64.deb <https://www.foundationdb.org/downloads/6.2.11/ubuntu/installers/foundationdb-server_6.2.11-1_amd64.deb>`_ (depends on the clients package)
RHEL/CentOS EL6
---------------
The RHEL/CentOS EL6 packages are supported on 64-bit RHEL/CentOS 6.x.
* `foundationdb-clients-6.2.8-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.8/rhel6/installers/foundationdb-clients-6.2.8-1.el6.x86_64.rpm>`_
* `foundationdb-server-6.2.8-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.8/rhel6/installers/foundationdb-server-6.2.8-1.el6.x86_64.rpm>`_ (depends on the clients package)
* `foundationdb-clients-6.2.11-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.11/rhel6/installers/foundationdb-clients-6.2.11-1.el6.x86_64.rpm>`_
* `foundationdb-server-6.2.11-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.11/rhel6/installers/foundationdb-server-6.2.11-1.el6.x86_64.rpm>`_ (depends on the clients package)
RHEL/CentOS EL7
---------------
The RHEL/CentOS EL7 packages are supported on 64-bit RHEL/CentOS 7.x.
* `foundationdb-clients-6.2.8-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.8/rhel7/installers/foundationdb-clients-6.2.8-1.el7.x86_64.rpm>`_
* `foundationdb-server-6.2.8-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.8/rhel7/installers/foundationdb-server-6.2.8-1.el7.x86_64.rpm>`_ (depends on the clients package)
* `foundationdb-clients-6.2.11-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.11/rhel7/installers/foundationdb-clients-6.2.11-1.el7.x86_64.rpm>`_
* `foundationdb-server-6.2.11-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.11/rhel7/installers/foundationdb-server-6.2.11-1.el7.x86_64.rpm>`_ (depends on the clients package)
Windows
-------
The Windows installer is supported on 64-bit Windows XP and later. It includes the client and (optionally) the server.
* `foundationdb-6.2.8-x64.msi <https://www.foundationdb.org/downloads/6.2.8/windows/installers/foundationdb-6.2.8-x64.msi>`_
* `foundationdb-6.2.11-x64.msi <https://www.foundationdb.org/downloads/6.2.11/windows/installers/foundationdb-6.2.11-x64.msi>`_
API Language Bindings
=====================
@ -58,18 +58,18 @@ On macOS and Windows, the FoundationDB Python API bindings are installed as part
If you need to use the FoundationDB Python API from other Python installations or paths, download the Python package:
* `foundationdb-6.2.8.tar.gz <https://www.foundationdb.org/downloads/6.2.8/bindings/python/foundationdb-6.2.8.tar.gz>`_
* `foundationdb-6.2.11.tar.gz <https://www.foundationdb.org/downloads/6.2.11/bindings/python/foundationdb-6.2.11.tar.gz>`_
Ruby 1.9.3/2.0.0+
-----------------
* `fdb-6.2.8.gem <https://www.foundationdb.org/downloads/6.2.8/bindings/ruby/fdb-6.2.8.gem>`_
* `fdb-6.2.11.gem <https://www.foundationdb.org/downloads/6.2.11/bindings/ruby/fdb-6.2.11.gem>`_
Java 8+
-------
* `fdb-java-6.2.8.jar <https://www.foundationdb.org/downloads/6.2.8/bindings/java/fdb-java-6.2.8.jar>`_
* `fdb-java-6.2.8-javadoc.jar <https://www.foundationdb.org/downloads/6.2.8/bindings/java/fdb-java-6.2.8-javadoc.jar>`_
* `fdb-java-6.2.11.jar <https://www.foundationdb.org/downloads/6.2.11/bindings/java/fdb-java-6.2.11.jar>`_
* `fdb-java-6.2.11-javadoc.jar <https://www.foundationdb.org/downloads/6.2.11/bindings/java/fdb-java-6.2.11-javadoc.jar>`_
Go 1.11+
--------

View File

@ -2,6 +2,36 @@
Release Notes
#############
6.2.11
======
Fixes
-----
* Clients could hang indefinitely on reads if all storage servers holding a keyrange were removed from a cluster since the last time the client read a key in the range. `(PR #2377) <https://github.com/apple/foundationdb/pull/2377>`_.
* In rare scenarios, status could falsely report no replicas remain of some data. `(PR #2380) <https://github.com/apple/foundationdb/pull/2380>`_.
* Latency band tracking could fail to configure correctly after a recovery or upon process startup. `(PR #2371) <https://github.com/apple/foundationdb/pull/2371>`_.
6.2.10
======
Fixes
-----
* ``backup_agent`` crashed on startup. `(PR #2356) <https://github.com/apple/foundationdb/pull/2356>`_.
6.2.9
=====
Fixes
-----
* Small clusters using specific sets of process classes could cause the data distributor to be continuously killed and re-recruited. `(PR #2344) <https://github.com/apple/foundationdb/pull/2344>`_.
* The data distributor and ratekeeper could be recruited on non-optimal processes. `(PR #2344) <https://github.com/apple/foundationdb/pull/2344>`_.
* A ``kill`` command from ``fdbcli`` could take a long time before being executed by a busy process. `(PR #2339) <https://github.com/apple/foundationdb/pull/2339>`_.
* Committing transactions larger than 1 MB could cause the proxy to stall for up to a second. `(PR #2350) <https://github.com/apple/foundationdb/pull/2350>`_.
* Transaction timeouts would use memory for the entire duration of the timeout, regardless of whether the transaction had been destroyed. `(PR #2353) <https://github.com/apple/foundationdb/pull/2353>`_.
6.2.8
=====
@ -63,7 +93,6 @@ Fixes
* Status would report incorrect fault tolerance metrics when a remote region was configured and the primary region lost a storage replica. [6.2.6] `(PR #2230) <https://github.com/apple/foundationdb/pull/2230>`_
* The cluster would not change to a new set of satellite transaction logs when they become available in a better satellite location. [6.2.6] `(PR #2241) <https://github.com/apple/foundationdb/pull/2241>`_.
* The existence of ``proxy`` or ``resolver`` class processes prevented ``stateless`` class processes from being recruited as proxies or resolvers. [6.2.6] `(PR #2241) <https://github.com/apple/foundationdb/pull/2241>`_.
* Committing transactions larger than 1 MB could cause the proxy to stall for up to a second. [6.2.6] `(PR #2250) <https://github.com/apple/foundationdb/pull/2250>`_.
* The cluster controller could become saturated in clusters with large numbers of connected clients using TLS. [6.2.6] `(PR #2252) <https://github.com/apple/foundationdb/pull/2252>`_.
* Backup and DR would not share a mutation stream if they were started on different versions of FoundationDB. Either backup or DR must be restarted to resolve this issue. [6.2.6] `(PR #2202) <https://github.com/apple/foundationdb/pull/2202>`_.
* Don't track batch priority GRV requests in latency bands. [6.2.7] `(PR #2279) <https://github.com/apple/foundationdb/pull/2279>`_.

View File

@ -39,6 +39,10 @@ struct ClientWorkerInterface {
UID id() const { return reboot.getEndpoint().token; }
NetworkAddress address() const { return reboot.getEndpoint().getPrimaryAddress(); }
void initEndpoints() {
reboot.getEndpoint( TaskPriority::ReadSocket );
}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, reboot, profiler);

View File

@ -68,6 +68,7 @@ ClientKnobs::ClientKnobs(bool randomize) {
init( MAX_BATCH_SIZE, 1000 ); if( randomize && BUGGIFY ) MAX_BATCH_SIZE = 1;
init( GRV_BATCH_TIMEOUT, 0.005 ); if( randomize && BUGGIFY ) GRV_BATCH_TIMEOUT = 0.1;
init( BROADCAST_BATCH_SIZE, 20 ); if( randomize && BUGGIFY ) BROADCAST_BATCH_SIZE = 1;
init( TRANSACTION_TIMEOUT_DELAY_INTERVAL, 10.0 ); if( randomize && BUGGIFY ) TRANSACTION_TIMEOUT_DELAY_INTERVAL = 1.0;
init( LOCATION_CACHE_EVICTION_SIZE, 600000 );
init( LOCATION_CACHE_EVICTION_SIZE_SIM, 10 ); if( randomize && BUGGIFY ) LOCATION_CACHE_EVICTION_SIZE_SIM = 3;
@ -97,6 +98,7 @@ ClientKnobs::ClientKnobs(bool randomize) {
init( MUTATION_BLOCK_SIZE, 10000 );
// TaskBucket
init( TASKBUCKET_LOGGING_DELAY, 5.0 );
init( TASKBUCKET_MAX_PRIORITY, 1 );
init( TASKBUCKET_CHECK_TIMEOUT_CHANCE, 0.02 ); if( randomize && BUGGIFY ) TASKBUCKET_CHECK_TIMEOUT_CHANCE = 1.0;
init( TASKBUCKET_TIMEOUT_JITTER_OFFSET, 0.9 );

View File

@ -66,6 +66,7 @@ public:
int MAX_BATCH_SIZE;
double GRV_BATCH_TIMEOUT;
int BROADCAST_BATCH_SIZE;
double TRANSACTION_TIMEOUT_DELAY_INTERVAL;
// When locationCache in DatabaseContext gets to be this size, items will be evicted
int LOCATION_CACHE_EVICTION_SIZE;
@ -99,6 +100,7 @@ public:
int MUTATION_BLOCK_SIZE;
// Taskbucket
double TASKBUCKET_LOGGING_DELAY;
int TASKBUCKET_MAX_PRIORITY;
double TASKBUCKET_CHECK_TIMEOUT_CHANCE;
double TASKBUCKET_TIMEOUT_JITTER_OFFSET;

View File

@ -32,13 +32,15 @@ struct MutationListRef {
// Each blob has a struct Header following by the mutation's param1 and param2 content.
// The Header has the mutation's type and the length of param1 and param2
private:
public:
struct Blob {
// StringRef data Format: |type|p1len|p2len|p1_content|p2_content|
// |type|p1len|p2len| is the header; p1_content has p1len length; p2_content has p2len length
StringRef data;
Blob* next;
};
Blob *blob_begin;
private:
struct Header {
int type, p1len, p2len;
const uint8_t* p1begin() const {
@ -148,6 +150,8 @@ public:
blob_begin->data = StringRef((const uint8_t*)ar.arenaRead(totalBytes), totalBytes); // Zero-copy read when deserializing from an ArenaReader
}
}
//FIXME: this is re-implemented on the master proxy to include a yield, any changes to this function should also done there
template <class Ar>
void serialize_save( Ar& ar ) const {
serializer(ar, totalBytes);
@ -180,7 +184,7 @@ private:
return b;
}
Blob *blob_begin, *blob_end;
Blob *blob_end;
int totalBytes;
};
typedef Standalone<MutationListRef> MutationList;

View File

@ -1133,8 +1133,8 @@ ReadYourWritesTransaction::ReadYourWritesTransaction(Database const& cx)
}
ACTOR Future<Void> timebomb(double endTime, Promise<Void> resetPromise) {
if (now() < endTime) {
wait ( delayUntil( endTime ) );
while(now() < endTime) {
wait( delayUntil( std::min(endTime + 0.0001, now() + CLIENT_KNOBS->TRANSACTION_TIMEOUT_DELAY_INTERVAL) ) );
}
if( !resetPromise.isSet() )
resetPromise.sendError(transaction_timed_out());

View File

@ -316,6 +316,7 @@ public:
ACTOR static Future<Void> extendTimeoutRepeatedly(Database cx, Reference<TaskBucket> taskBucket, Reference<Task> task) {
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
state double start = now();
state Version versionNow = wait(runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) {
taskBucket->setOptions(tr);
return map(tr->getReadVersion(), [=](Version v) {
@ -329,6 +330,13 @@ public:
// Wait until we are half way to the timeout version of this task
wait(delay(0.8 * (BUGGIFY ? (2 * deterministicRandom()->random01()) : 1.0) * (double)(task->timeoutVersion - (uint64_t)versionNow) / CLIENT_KNOBS->CORE_VERSIONSPERSECOND));
if(now() - start > 300) {
TraceEvent(SevWarnAlways, "TaskBucketLongExtend")
.detail("Duration", now() - start)
.detail("TaskUID", task->key)
.detail("TaskType", task->params[Task::reservedTaskParamKeyType])
.detail("Priority", task->getPriority());
}
// Take the extendMutex lock until we either succeed or stop trying to extend due to failure
wait(task->extendMutex.take());
releaser = FlowLock::Releaser(task->extendMutex, 1);
@ -430,6 +438,7 @@ public:
loop {
// Start running tasks while slots are available and we keep finding work to do
++taskBucket->dispatchSlotChecksStarted;
while(!availableSlots.empty()) {
getTasks.clear();
for(int i = 0, imax = std::min<unsigned int>(getBatchSize, availableSlots.size()); i < imax; ++i)
@ -439,18 +448,22 @@ public:
bool done = false;
for(int i = 0; i < getTasks.size(); ++i) {
if(getTasks[i].isError()) {
++taskBucket->dispatchErrors;
done = true;
continue;
}
Reference<Task> task = getTasks[i].get();
if(task) {
// Start the task
++taskBucket->dispatchDoTasks;
int slot = availableSlots.back();
availableSlots.pop_back();
tasks[slot] = taskBucket->doTask(cx, futureBucket, task);
}
else
else {
++taskBucket->dispatchEmptyTasks;
done = true;
}
}
if(done) {
@ -460,11 +473,16 @@ public:
else
getBatchSize = std::min<unsigned int>(getBatchSize * 2, maxConcurrentTasks);
}
++taskBucket->dispatchSlotChecksComplete;
// Wait for a task to be done. Also, if we have any slots available then stop waiting after pollDelay at the latest.
Future<Void> w = ready(waitForAny(tasks));
if(!availableSlots.empty())
if(!availableSlots.empty()) {
if(*pollDelay > 600) {
TraceEvent(SevWarnAlways, "TaskBucketLongPollDelay").suppressFor(1.0).detail("Delay", *pollDelay);
}
w = w || delay(*pollDelay * (0.9 + deterministicRandom()->random01() / 5)); // Jittered by 20 %, so +/- 10%
}
wait(w);
// Check all of the task slots, any that are finished should be replaced with Never() and their slots added back to availableSlots
@ -497,7 +515,7 @@ public:
ACTOR static Future<Void> run(Database cx, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, double *pollDelay, int maxConcurrentTasks) {
state Reference<AsyncVar<bool>> paused = Reference<AsyncVar<bool>>( new AsyncVar<bool>(true) );
state Future<Void> watchPausedFuture = watchPaused(cx, taskBucket, paused);
taskBucket->metricLogger = traceCounters("TaskBucketMetrics", taskBucket->dbgid, CLIENT_KNOBS->TASKBUCKET_LOGGING_DELAY, &taskBucket->cc);
loop {
while(paused->get()) {
wait(paused->onChange() || watchPausedFuture);
@ -783,6 +801,13 @@ TaskBucket::TaskBucket(const Subspace& subspace, bool sysAccess, bool priorityBa
, system_access(sysAccess)
, priority_batch(priorityBatch)
, lock_aware(lockAware)
, cc("TaskBucket")
, dbgid( deterministicRandom()->randomUniqueID() )
, dispatchSlotChecksStarted("DispatchSlotChecksStarted", cc)
, dispatchErrors("DispatchErrors", cc)
, dispatchDoTasks("DispatchDoTasks", cc)
, dispatchEmptyTasks("DispatchEmptyTasks", cc)
, dispatchSlotChecksComplete("DispatchSlotChecksComplete", cc)
{
}

View File

@ -228,12 +228,23 @@ public:
Database src;
Map<Key, Future<Reference<KeyRangeMap<Version>>>> key_version;
CounterCollection cc;
Counter dispatchSlotChecksStarted;
Counter dispatchErrors;
Counter dispatchDoTasks;
Counter dispatchEmptyTasks;
Counter dispatchSlotChecksComplete;
UID dbgid;
double getTimeoutSeconds() const {
return (double)timeout / CLIENT_KNOBS->CORE_VERSIONSPERSECOND;
}
private:
friend class TaskBucketImpl;
Future<Void> metricLogger;
Subspace prefix;
Subspace active;
Key pauseKey;

View File

@ -428,31 +428,41 @@ ACTOR Future<Void> connectionKeeper( Reference<Peer> self,
self->lastConnectTime = now();
TraceEvent("ConnectingTo", conn ? conn->getDebugID() : UID()).suppressFor(1.0).detail("PeerAddr", self->destination);
Reference<IConnection> _conn = wait( timeout( INetworkConnections::net()->connect(self->destination), FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT, Reference<IConnection>() ) );
if (_conn) {
if (FlowTransport::transport().isClient()) {
IFailureMonitor::failureMonitor().setStatus(self->destination, FailureStatus(false));
try {
choose {
when( Reference<IConnection> _conn = wait( INetworkConnections::net()->connect(self->destination) ) ) {
if (FlowTransport::transport().isClient()) {
IFailureMonitor::failureMonitor().setStatus(self->destination, FailureStatus(false));
}
if (self->unsent.empty()) {
_conn->close();
clientReconnectDelay = false;
continue;
} else {
conn = _conn;
TraceEvent("ConnectionExchangingConnectPacket", conn->getDebugID())
.suppressFor(1.0)
.detail("PeerAddr", self->destination);
self->prependConnectPacket();
}
reader = connectionReader( self->transport, conn, self, Promise<Reference<Peer>>());
}
when( wait( delay( FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT ) ) ) {
throw connection_failed();
}
}
if (self->unsent.empty()) {
_conn->close();
clientReconnectDelay = false;
continue;
} else {
conn = _conn;
TraceEvent("ConnectionExchangingConnectPacket", conn->getDebugID())
.suppressFor(1.0)
.detail("PeerAddr", self->destination);
self->prependConnectPacket();
} catch( Error &e ) {
if(e.code() != error_code_connection_failed) {
throw;
}
} else {
TraceEvent("ConnectionTimedOut", conn ? conn->getDebugID() : UID()).suppressFor(1.0).detail("PeerAddr", self->destination);
if (FlowTransport::transport().isClient()) {
IFailureMonitor::failureMonitor().setStatus(self->destination, FailureStatus(true));
clientReconnectDelay = true;
}
throw connection_failed();
throw;
}
reader = connectionReader( self->transport, conn, self, Promise<Reference<Peer>>());
} else {
self->outgoingConnectionIdle = false;
}
@ -497,7 +507,10 @@ ACTOR Future<Void> connectionKeeper( Reference<Peer> self,
.detail("PeerAddr", self->destination);
}
if(self->destination.isPublic() && IFailureMonitor::failureMonitor().getState(self->destination).isAvailable()) {
if(self->destination.isPublic()
&& IFailureMonitor::failureMonitor().getState(self->destination).isAvailable()
&& !FlowTransport::transport().isClient())
{
auto& it = self->transport->closedPeers[self->destination];
if(now() - it.second > FLOW_KNOBS->TOO_MANY_CONNECTIONS_CLOSED_RESET_DELAY) {
it.first = now();
@ -511,9 +524,7 @@ ACTOR Future<Void> connectionKeeper( Reference<Peer> self,
}
if (conn) {
if (FlowTransport::transport().isClient() && e.code() != error_code_connection_idle) {
clientReconnectDelay = true;
}
clientReconnectDelay = FlowTransport::transport().isClient() && e.code() != error_code_connection_idle;
conn->close();
conn = Reference<IConnection>();
}
@ -686,7 +697,7 @@ static void scanPackets(TransportData* transport, uint8_t*& unprocessed_begin, c
}
if (packetLen > FLOW_KNOBS->PACKET_LIMIT) {
TraceEvent(SevError, "Net2_PacketLimitExceeded").detail("FromPeer", peerAddress.toString()).detail("Length", (int)packetLen);
TraceEvent(SevError, "PacketLimitExceeded").detail("FromPeer", peerAddress.toString()).detail("Length", (int)packetLen);
throw platform_error();
}
@ -740,7 +751,7 @@ static void scanPackets(TransportData* transport, uint8_t*& unprocessed_begin, c
++transport->countPacketsReceived;
if (packetLen > FLOW_KNOBS->PACKET_WARNING) {
TraceEvent(transport->warnAlwaysForLargePacket ? SevWarnAlways : SevWarn, "Net2_LargePacket")
TraceEvent(transport->warnAlwaysForLargePacket ? SevWarnAlways : SevWarn, "LargePacketReceived")
.suppressFor(1.0)
.detail("FromPeer", peerAddress.toString())
.detail("Length", (int)packetLen)
@ -767,7 +778,7 @@ static int getNewBufferSize(const uint8_t* begin, const uint8_t* end, const Netw
}
const uint32_t packetLen = *(uint32_t*)begin;
if (packetLen > FLOW_KNOBS->PACKET_LIMIT) {
TraceEvent(SevError, "Net2_PacketLimitExceeded").detail("FromPeer", peerAddress.toString()).detail("Length", (int)packetLen);
TraceEvent(SevError, "PacketLimitExceeded").detail("FromPeer", peerAddress.toString()).detail("Length", (int)packetLen);
throw platform_error();
}
return std::max<uint32_t>(FLOW_KNOBS->MIN_PACKET_BUFFER_BYTES,
@ -1216,11 +1227,11 @@ static ReliablePacket* sendPacket( TransportData* self, Reference<Peer> peer, IS
}
if (len > FLOW_KNOBS->PACKET_LIMIT) {
TraceEvent(SevError, "Net2_PacketLimitExceeded").detail("ToPeer", destination.getPrimaryAddress()).detail("Length", (int)len);
TraceEvent(SevError, "PacketLimitExceeded").detail("ToPeer", destination.getPrimaryAddress()).detail("Length", (int)len);
// throw platform_error(); // FIXME: How to recover from this situation?
}
else if (len > FLOW_KNOBS->PACKET_WARNING) {
TraceEvent(self->warnAlwaysForLargePacket ? SevWarnAlways : SevWarn, "Net2_LargePacket")
TraceEvent(self->warnAlwaysForLargePacket ? SevWarnAlways : SevWarn, "LargePacketSent")
.suppressFor(1.0)
.detail("ToPeer", destination.getPrimaryAddress())
.detail("Length", (int)len)

View File

@ -293,6 +293,7 @@ private:
void closeInternal() {
if(peer) {
peer->peerClosed();
stopReceive = delay(1.0);
}
leakedConnectionTracker.cancel();
peer.clear();
@ -835,8 +836,11 @@ public:
}
ACTOR static Future<Reference<IConnection>> onConnect( Future<Void> ready, Reference<Sim2Conn> conn ) {
wait(ready);
if (conn->isPeerGone() && deterministicRandom()->random01()<0.5) {
if (conn->isPeerGone()) {
conn.clear();
if(FLOW_KNOBS->SIM_CONNECT_ERROR_MODE == 1 || (FLOW_KNOBS->SIM_CONNECT_ERROR_MODE == 2 && deterministicRandom()->random01() > 0.5)) {
throw connection_failed();
}
wait(Never());
}
conn->opened = true;

View File

@ -1194,17 +1194,24 @@ public:
return false;
}
bool isProxyOrResolver(Optional<Key> processId) {
bool isUsedNotMaster(Optional<Key> processId) {
ASSERT(masterProcessId.present());
if (processId == masterProcessId) return false;
auto& dbInfo = db.serverInfo->get().read();
for (const auto& tlogset : dbInfo.logSystemConfig.tLogs) {
for (const auto& tlog: tlogset.tLogs) {
if (tlog.present() && tlog.interf().locality.processId() == processId) return true;
}
}
for (const MasterProxyInterface& interf : dbInfo.client.proxies) {
if (interf.locality.processId() == processId) return true;
}
for (const ResolverInterface& interf: dbInfo.resolvers) {
if (interf.locality.processId() == processId) return true;
}
if (processId == clusterControllerProcessId) return true;
return false;
}
@ -1214,7 +1221,7 @@ public:
if ((role != ProcessClass::DataDistributor && role != ProcessClass::Ratekeeper) || pid == masterProcessId.get()) {
return false;
}
return isProxyOrResolver(pid);
return isUsedNotMaster(pid);
}
std::map< Optional<Standalone<StringRef>>, int> getUsedIds() {
@ -1373,6 +1380,7 @@ ACTOR Future<Void> clusterWatchDatabase( ClusterControllerData* cluster, Cluster
dbInfo.distributor = db->serverInfo->get().read().distributor;
dbInfo.ratekeeper = db->serverInfo->get().read().ratekeeper;
dbInfo.storageCaches = db->serverInfo->get().read().storageCaches;
dbInfo.latencyBandConfig = db->serverInfo->get().read().latencyBandConfig;
TraceEvent("CCWDB", cluster->id).detail("Lifetime", dbInfo.masterLifetime.toString()).detail("ChangeID", dbInfo.id);
db->serverInfo->set( cachedInfo );
@ -1521,35 +1529,74 @@ void checkBetterDDOrRK(ClusterControllerData* self) {
return;
}
auto& db = self->db.serverInfo->get().read();
auto bestFitnessForRK = self->getBestFitnessForRoleInDatacenter(ProcessClass::Ratekeeper);
auto bestFitnessForDD = self->getBestFitnessForRoleInDatacenter(ProcessClass::DataDistributor);
std::map<Optional<Standalone<StringRef>>, int> id_used = self->getUsedIds();
WorkerDetails newRKWorker = self->getWorkerForRoleInDatacenter(self->clusterControllerDcId, ProcessClass::Ratekeeper, ProcessClass::NeverAssign, self->db.config, id_used, true).worker;
if (self->onMasterIsBetter(newRKWorker, ProcessClass::Ratekeeper)) {
newRKWorker = self->id_worker[self->masterProcessId.get()].details;
}
id_used = self->getUsedIds();
for(auto& it : id_used) {
it.second *= 2;
}
id_used[newRKWorker.interf.locality.processId()]++;
WorkerDetails newDDWorker = self->getWorkerForRoleInDatacenter(self->clusterControllerDcId, ProcessClass::DataDistributor, ProcessClass::NeverAssign, self->db.config, id_used, true).worker;
if (self->onMasterIsBetter(newDDWorker, ProcessClass::DataDistributor)) {
newDDWorker = self->id_worker[self->masterProcessId.get()].details;
}
auto bestFitnessForRK = newRKWorker.processClass.machineClassFitness(ProcessClass::Ratekeeper);
if(self->db.config.isExcludedServer(newRKWorker.interf.address())) {
bestFitnessForRK = std::max(bestFitnessForRK, ProcessClass::ExcludeFit);
}
auto bestFitnessForDD = newDDWorker.processClass.machineClassFitness(ProcessClass::DataDistributor);
if(self->db.config.isExcludedServer(newDDWorker.interf.address())) {
bestFitnessForDD = std::max(bestFitnessForDD, ProcessClass::ExcludeFit);
}
//TraceEvent("CheckBetterDDorRKNewRecruits", self->id).detail("MasterProcessId", self->masterProcessId)
//.detail("NewRecruitRKProcessId", newRKWorker.interf.locality.processId()).detail("NewRecruiteDDProcessId", newDDWorker.interf.locality.processId());
Optional<Standalone<StringRef>> currentRKProcessId;
Optional<Standalone<StringRef>> currentDDProcessId;
auto& db = self->db.serverInfo->get().read();
bool ratekeeperHealthy = false;
if (db.ratekeeper.present() && self->id_worker.count(db.ratekeeper.get().locality.processId()) &&
(!self->recruitingRatekeeperID.present() || (self->recruitingRatekeeperID.get() == db.ratekeeper.get().id()))) {
auto& rkWorker = self->id_worker[db.ratekeeper.get().locality.processId()];
currentRKProcessId = rkWorker.details.interf.locality.processId();
auto rkFitness = rkWorker.details.processClass.machineClassFitness(ProcessClass::Ratekeeper);
if(rkWorker.priorityInfo.isExcluded) {
rkFitness = ProcessClass::ExcludeFit;
}
if (self->isProxyOrResolver(rkWorker.details.interf.locality.processId()) || rkFitness > bestFitnessForRK) {
if (self->isUsedNotMaster(rkWorker.details.interf.locality.processId()) || bestFitnessForRK < rkFitness
|| (rkFitness == bestFitnessForRK && rkWorker.details.interf.locality.processId() == self->masterProcessId && newRKWorker.interf.locality.processId() != self->masterProcessId)) {
TraceEvent("CCHaltRK", self->id).detail("RKID", db.ratekeeper.get().id())
.detail("Excluded", rkWorker.priorityInfo.isExcluded)
.detail("Fitness", rkFitness).detail("BestFitness", bestFitnessForRK);
self->recruitRatekeeper.set(true);
} else {
ratekeeperHealthy = true;
}
}
if (!self->recruitingDistributor && db.distributor.present() && self->id_worker.count(db.distributor.get().locality.processId())) {
auto& ddWorker = self->id_worker[db.distributor.get().locality.processId()];
auto ddFitness = ddWorker.details.processClass.machineClassFitness(ProcessClass::DataDistributor);
currentDDProcessId = ddWorker.details.interf.locality.processId();
if(ddWorker.priorityInfo.isExcluded) {
ddFitness = ProcessClass::ExcludeFit;
}
if (self->isProxyOrResolver(ddWorker.details.interf.locality.processId()) || ddFitness > bestFitnessForDD) {
if (self->isUsedNotMaster(ddWorker.details.interf.locality.processId()) || bestFitnessForDD < ddFitness
|| (ddFitness == bestFitnessForDD && ddWorker.details.interf.locality.processId() == self->masterProcessId && newDDWorker.interf.locality.processId() != self->masterProcessId)
|| (ddFitness == bestFitnessForDD && newRKWorker.interf.locality.processId() != newDDWorker.interf.locality.processId() && ratekeeperHealthy && currentRKProcessId.present() && currentDDProcessId == currentRKProcessId
&& (newRKWorker.interf.locality.processId() != self->masterProcessId && newDDWorker.interf.locality.processId() != self->masterProcessId) )) {
TraceEvent("CCHaltDD", self->id).detail("DDID", db.distributor.get().id())
.detail("Excluded", ddWorker.priorityInfo.isExcluded)
.detail("Fitness", ddFitness).detail("BestFitness", bestFitnessForDD);
.detail("Fitness", ddFitness).detail("BestFitness", bestFitnessForDD)
.detail("CurrentRateKeeperProcessId", currentRKProcessId.present() ? currentRKProcessId.get() : LiteralStringRef("None"))
.detail("CurrentDDProcessId", currentDDProcessId)
.detail("MasterProcessID", self->masterProcessId)
.detail("NewRKWorkers", newRKWorker.interf.locality.processId())
.detail("NewDDWorker", newDDWorker.interf.locality.processId());
ddWorker.haltDistributor = brokenPromiseToNever(db.distributor.get().haltDataDistributor.getReply(HaltDataDistributorRequest(self->id)));
}
}

View File

@ -2380,7 +2380,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
}
// Remove the removedMachineInfo machine and any related machine team
void removeMachine(DDTeamCollection* self, Reference<TCMachineInfo> removedMachineInfo) {
void removeMachine(Reference<TCMachineInfo> removedMachineInfo) {
// Find machines that share teams with the removed machine
std::set<Standalone<StringRef>> machinesWithAjoiningTeams;
for (auto& machineTeam : removedMachineInfo->machineTeams) {
@ -2454,7 +2454,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
return foundMachineTeam;
}
void removeServer(DDTeamCollection* self, UID removedServer) {
void removeServer(UID removedServer) {
TraceEvent("RemovedStorageServer", distributorId).detail("ServerID", removedServer);
// ASSERT( !shardsAffectedByTeamFailure->getServersForTeam( t ) for all t in teams that contain removedServer )
@ -2503,6 +2503,14 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
.detail("Debug", "ThisShouldRarelyHappen_CheckInfoBelow");
}
for (int t = 0; t < badTeams.size(); t++) {
if ( std::count( badTeams[t]->getServerIDs().begin(), badTeams[t]->getServerIDs().end(), removedServer ) ) {
badTeams[t]->tracker.cancel();
badTeams[t--] = badTeams.back();
badTeams.pop_back();
}
}
// Step: Remove machine info related to removedServer
// Remove the server from its machine
Reference<TCMachineInfo> removedMachineInfo = removedServerInfo->machine;
@ -2518,7 +2526,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
// Note: Remove machine (and machine team) after server teams have been removed, because
// we remove a machine team only when the server teams on it have been removed
if (removedMachineInfo->serversOnMachine.size() == 0) {
removeMachine(self, removedMachineInfo);
removeMachine(removedMachineInfo);
}
// If the machine uses removedServer's locality and the machine still has servers, the the machine's
@ -2527,9 +2535,9 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
// This is ok as long as we do not arbitrarily validate if machine team satisfies replication policy.
if (server_info[removedServer]->wrongStoreTypeToRemove.get()) {
if (self->wrongStoreTypeRemover.isReady()) {
self->wrongStoreTypeRemover = removeWrongStoreType(self);
self->addActor.send(self->wrongStoreTypeRemover);
if (wrongStoreTypeRemover.isReady()) {
wrongStoreTypeRemover = removeWrongStoreType(this);
addActor.send(wrongStoreTypeRemover);
}
}
@ -3621,7 +3629,7 @@ ACTOR Future<Void> storageServerTracker(
if (machine->serversOnMachine.size() == 1) {
// When server is the last server on the machine,
// remove the machine and the related machine team
self->removeMachine(self, machine);
self->removeMachine(machine);
server->machine = Reference<TCMachineInfo>();
} else {
// we remove the server from the machine, and
@ -4129,7 +4137,7 @@ ACTOR Future<Void> dataDistributionTeamCollection(
loop choose {
when( UID removedServer = waitNext( self->removedServers.getFuture() ) ) {
TEST(true); // Storage server removed from database
self->removeServer(self, removedServer);
self->removeServer(removedServer);
serverRemoved.send( Void() );
self->restartRecruiting.trigger();

View File

@ -69,7 +69,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
init( MAX_QUEUE_COMMIT_BYTES, 15e6 ); if( randomize && BUGGIFY ) MAX_QUEUE_COMMIT_BYTES = 5000;
init( DESIRED_OUTSTANDING_MESSAGES, 5000 ); if( randomize && BUGGIFY ) DESIRED_OUTSTANDING_MESSAGES = deterministicRandom()->randomInt(0,100);
init( DESIRED_GET_MORE_DELAY, 0.005 );
init( CONCURRENT_LOG_ROUTER_READS, 1 );
init( CONCURRENT_LOG_ROUTER_READS, 5 ); if( randomize && BUGGIFY ) CONCURRENT_LOG_ROUTER_READS = 1;
init( LOG_ROUTER_PEEK_FROM_SATELLITES_PREFERRED, 1 ); if( randomize && BUGGIFY ) LOG_ROUTER_PEEK_FROM_SATELLITES_PREFERRED = 0;
init( DISK_QUEUE_ADAPTER_MIN_SWITCH_TIME, 1.0 );
init( DISK_QUEUE_ADAPTER_MAX_SWITCH_TIME, 5.0 );

View File

@ -268,6 +268,34 @@ struct ProxyCommitData {
}
return false;
}
void updateLatencyBandConfig(Optional<LatencyBandConfig> newLatencyBandConfig) {
if(newLatencyBandConfig.present() != latencyBandConfig.present()
|| (newLatencyBandConfig.present() && newLatencyBandConfig.get().grvConfig != latencyBandConfig.get().grvConfig))
{
TraceEvent("LatencyBandGrvUpdatingConfig").detail("Present", newLatencyBandConfig.present());
stats.grvLatencyBands.clearBands();
if(newLatencyBandConfig.present()) {
for(auto band : newLatencyBandConfig.get().grvConfig.bands) {
stats.grvLatencyBands.addThreshold(band);
}
}
}
if(newLatencyBandConfig.present() != latencyBandConfig.present()
|| (newLatencyBandConfig.present() && newLatencyBandConfig.get().commitConfig != latencyBandConfig.get().commitConfig))
{
TraceEvent("LatencyBandCommitUpdatingConfig").detail("Present", newLatencyBandConfig.present());
stats.commitLatencyBands.clearBands();
if(newLatencyBandConfig.present()) {
for(auto band : newLatencyBandConfig.get().commitConfig.bands) {
stats.commitLatencyBands.addThreshold(band);
}
}
}
latencyBandConfig = newLatencyBandConfig;
}
ProxyCommitData(UID dbgid, MasterInterface master, RequestStream<GetReadVersionRequest> getConsistentReadVersion, Version recoveryTransactionVersion, RequestStream<CommitTransactionRequest> commit, Reference<AsyncVar<ServerDBInfo>> db, bool firstProxy)
: dbgid(dbgid), stats(dbgid, &version, &committedVersion, &commitBatchesMemBytesCount), master(master),
@ -462,21 +490,32 @@ bool isWhitelisted(const vector<Standalone<StringRef>>& binPathVec, StringRef bi
return std::find(binPathVec.begin(), binPathVec.end(), binPath) != binPathVec.end();
}
ACTOR Future<Void> adddBackupMutations(ProxyCommitData* self, std::map<Key, MutationListRef>* logRangeMutations,
ACTOR Future<Void> addBackupMutations(ProxyCommitData* self, std::map<Key, MutationListRef>* logRangeMutations,
LogPushData* toCommit, Version commitVersion) {
state std::map<Key, MutationListRef>::iterator logRangeMutation = logRangeMutations->begin();
state int32_t version = commitVersion / CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE;
state int yieldBytes = 0;
state BinaryWriter valueWriter(Unversioned());
// Serialize the log range mutations within the map
for (; logRangeMutation != logRangeMutations->end(); ++logRangeMutation)
{
if(yieldBytes > SERVER_KNOBS->DESIRED_TOTAL_BYTES) {
yieldBytes = 0;
wait(yield());
//FIXME: this is re-implementing the serialize function of MutationListRef in order to have a yield
valueWriter = BinaryWriter(IncludeVersion());
valueWriter << logRangeMutation->second.totalSize();
state MutationListRef::Blob* blobIter = logRangeMutation->second.blob_begin;
while(blobIter) {
if(yieldBytes > SERVER_KNOBS->DESIRED_TOTAL_BYTES) {
yieldBytes = 0;
wait(yield());
}
valueWriter.serializeBytes(blobIter->data);
yieldBytes += blobIter->data.size();
blobIter = blobIter->next;
}
yieldBytes += logRangeMutation->second.expectedSize();
Key val = valueWriter.toValue();
BinaryWriter wr(Unversioned());
@ -491,8 +530,6 @@ ACTOR Future<Void> adddBackupMutations(ProxyCommitData* self, std::map<Key, Muta
backupMutation.type = MutationRef::SetValue;
uint32_t* partBuffer = NULL;
Key val = BinaryWriter::toValue(logRangeMutation->second, IncludeVersion());
for (int part = 0; part * CLIENT_KNOBS->MUTATION_BLOCK_SIZE < val.size(); part++) {
// Assign the second parameter as the part
@ -897,7 +934,7 @@ ACTOR Future<Void> commitBatch(
// Serialize and backup the mutations as a single mutation
if ((self->vecBackupKeys.size() > 1) && logRangeMutations.size()) {
wait( adddBackupMutations(self, &logRangeMutations, &toCommit, commitVersion) );
wait( addBackupMutations(self, &logRangeMutations, &toCommit, commitVersion) );
}
self->stats.mutations += mutationCount;
@ -1649,6 +1686,8 @@ ACTOR Future<Void> masterProxyServerCore(
commitData.txnStateStore = keyValueStoreLogSystem(commitData.logAdapter, proxy.id(), 2e9, true, true, true);
createWhitelistBinPathVec(whitelistBinPaths, commitData.whitelistedBinPathVec);
commitData.updateLatencyBandConfig(commitData.db->get().latencyBandConfig);
// ((SERVER_MEM_LIMIT * COMMIT_BATCHES_MEM_FRACTION_OF_TOTAL) / COMMIT_BATCHES_MEM_TO_TOTAL_MEM_SCALE_FACTOR) is only a approximate formula for limiting the memory used.
// COMMIT_BATCHES_MEM_TO_TOTAL_MEM_SCALE_FACTOR is an estimate based on experiments and not an accurate one.
state int64_t commitBatchesMemoryLimit = std::min(SERVER_KNOBS->COMMIT_BATCHES_MEM_BYTES_HARD_LIMIT, static_cast<int64_t>((SERVER_KNOBS->SERVER_MEM_LIMIT * SERVER_KNOBS->COMMIT_BATCHES_MEM_FRACTION_OF_TOTAL) / SERVER_KNOBS->COMMIT_BATCHES_MEM_TO_TOTAL_MEM_SCALE_FACTOR));
@ -1684,33 +1723,7 @@ ACTOR Future<Void> masterProxyServerCore(
commitData.logSystem->popTxs(commitData.lastTxsPop, tagLocalityRemoteLog);
}
Optional<LatencyBandConfig> newLatencyBandConfig = commitData.db->get().latencyBandConfig;
if(newLatencyBandConfig.present() != commitData.latencyBandConfig.present()
|| (newLatencyBandConfig.present() && newLatencyBandConfig.get().grvConfig != commitData.latencyBandConfig.get().grvConfig))
{
TraceEvent("LatencyBandGrvUpdatingConfig").detail("Present", newLatencyBandConfig.present());
commitData.stats.grvLatencyBands.clearBands();
if(newLatencyBandConfig.present()) {
for(auto band : newLatencyBandConfig.get().grvConfig.bands) {
commitData.stats.grvLatencyBands.addThreshold(band);
}
}
}
if(newLatencyBandConfig.present() != commitData.latencyBandConfig.present()
|| (newLatencyBandConfig.present() && newLatencyBandConfig.get().commitConfig != commitData.latencyBandConfig.get().commitConfig))
{
TraceEvent("LatencyBandCommitUpdatingConfig").detail("Present", newLatencyBandConfig.present());
commitData.stats.commitLatencyBands.clearBands();
if(newLatencyBandConfig.present()) {
for(auto band : newLatencyBandConfig.get().commitConfig.bands) {
commitData.stats.commitLatencyBands.addThreshold(band);
}
}
}
commitData.latencyBandConfig = newLatencyBandConfig;
commitData.updateLatencyBandConfig(commitData.db->get().latencyBandConfig);
}
when(wait(onError)) {}
when(std::pair<vector<CommitTransactionRequest>, int> batchedRequests = waitNext(batchedCommits.getFuture())) {

View File

@ -1100,7 +1100,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
wait( delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask()) );
}
if( req.tag.locality == tagLocalityLogRouter ) {
if( logData->locality != tagLocalitySatellite && req.tag.locality == tagLocalityLogRouter ) {
wait( self->concurrentLogRouterReads.take() );
state FlowLock::Releaser globalReleaser(self->concurrentLogRouterReads);
wait( delay(0.0, TaskPriority::Low) );

View File

@ -1246,6 +1246,7 @@ void setupSimulatedSystem(vector<Future<Void>>* systemActors, std::string baseFo
bool requiresExtraDBMachines = extraDB && g_simulator.extraDB->toString() != conn.toString();
int assignedMachines = 0, nonVersatileMachines = 0;
std::vector<ProcessClass::ClassType> processClassesSubSet = {ProcessClass::UnsetClass, ProcessClass::ResolutionClass, ProcessClass::MasterClass};
for( int dc = 0; dc < dataCenters; dc++ ) {
//FIXME: test unset dcID
Optional<Standalone<StringRef>> dcUID = StringRef(format("%d", dc));
@ -1277,7 +1278,7 @@ void setupSimulatedSystem(vector<Future<Void>>* systemActors, std::string baseFo
if(assignedMachines < 4)
processClass = ProcessClass((ProcessClass::ClassType) deterministicRandom()->randomInt(0, 2), ProcessClass::CommandLineSource); //Unset or Storage
else if(assignedMachines == 4 && !simconfig.db.regions.size())
processClass = ProcessClass((ProcessClass::ClassType) (deterministicRandom()->randomInt(0, 2) * ProcessClass::ResolutionClass), ProcessClass::CommandLineSource); //Unset or Resolution
processClass = ProcessClass(processClassesSubSet[deterministicRandom()->randomInt(0, processClassesSubSet.size())], ProcessClass::CommandLineSource); //Unset or Resolution or Master
else
processClass = ProcessClass((ProcessClass::ClassType) deterministicRandom()->randomInt(0, 3), ProcessClass::CommandLineSource); //Unset, Storage, or Transaction
if (processClass == ProcessClass::ResolutionClass) // *can't* be assigned to other roles, even in an emergency

View File

@ -1416,7 +1416,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
wait( delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask()) );
}
if( req.tag.locality == tagLocalityLogRouter ) {
if( logData->locality != tagLocalitySatellite && req.tag.locality == tagLocalityLogRouter ) {
wait( self->concurrentLogRouterReads.take() );
state FlowLock::Releaser globalReleaser(self->concurrentLogRouterReads);
wait( delay(0.0, TaskPriority::Low) );

View File

@ -69,6 +69,17 @@ struct WorkerInterface {
WorkerInterface() {}
WorkerInterface( const LocalityData& locality ) : locality( locality ) {}
void initEndpoints() {
clientInterface.initEndpoints();
tLog.getEndpoint( TaskPriority::Worker );
master.getEndpoint( TaskPriority::Worker );
masterProxy.getEndpoint( TaskPriority::Worker );
resolver.getEndpoint( TaskPriority::Worker );
logRouter.getEndpoint( TaskPriority::Worker );
debugPing.getEndpoint( TaskPriority::Worker );
coordinationPing.getEndpoint( TaskPriority::Worker );
}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, clientInterface, locality, tLog, master, masterProxy, dataDistributor, ratekeeper, resolver, storage, logRouter, debugPing, coordinationPing, waitFailure, setMetricsRate, eventLogRequest, traceBatchDumpRequest, testerInterface, diskStoreRequest, execReq, workerSnapReq);

View File

@ -856,6 +856,7 @@ ACTOR Future<Void> workerServer(
state std::string coordFolder = abspath(_coordFolder);
state WorkerInterface interf( locality );
interf.initEndpoints();
folder = abspath(folder);

View File

@ -115,7 +115,8 @@ public:
maxKeysPerTransaction = std::max(1, maxTransactionBytes / (maxValueLength + maxLongKeyLength));
if(maxTransactionBytes > 500000) {
TraceEvent("RemapEventSeverity").detail("TargetEvent", "Net2_LargePacket").detail("OriginalSeverity", SevWarnAlways).detail("NewSeverity", SevInfo);
TraceEvent("RemapEventSeverity").detail("TargetEvent", "LargePacketSent").detail("OriginalSeverity", SevWarnAlways).detail("NewSeverity", SevInfo);
TraceEvent("RemapEventSeverity").detail("TargetEvent", "LargePacketReceived").detail("OriginalSeverity", SevWarnAlways).detail("NewSeverity", SevInfo);
TraceEvent("RemapEventSeverity").detail("TargetEvent", "LargeTransaction").detail("OriginalSeverity", SevWarnAlways).detail("NewSeverity", SevInfo);
TraceEvent("RemapEventSeverity").detail("TargetEvent", "DiskQueueMemoryWarning").detail("OriginalSeverity", SevWarnAlways).detail("NewSeverity", SevInfo);
}

View File

@ -1424,17 +1424,16 @@ struct ConsistencyCheckWorkload : TestWorkload
}
// Check DataDistributor
ProcessClass::Fitness bestDistributorFitness = getBestAvailableFitness(dcToNonExcludedClassTypes[masterDcId], ProcessClass::DataDistributor);
if (db.distributor.present() && (!nonExcludedWorkerProcessMap.count(db.distributor.get().address()) || nonExcludedWorkerProcessMap[db.distributor.get().address()].processClass.machineClassFitness(ProcessClass::DataDistributor) != bestDistributorFitness)) {
TraceEvent("ConsistencyCheck_DistributorNotBest").detail("BestDataDistributorFitness", bestDistributorFitness)
ProcessClass::Fitness fitnessLowerBound = allWorkerProcessMap[db.master.address()].processClass.machineClassFitness(ProcessClass::DataDistributor);
if (db.distributor.present() && (!nonExcludedWorkerProcessMap.count(db.distributor.get().address()) || nonExcludedWorkerProcessMap[db.distributor.get().address()].processClass.machineClassFitness(ProcessClass::DataDistributor) > fitnessLowerBound)) {
TraceEvent("ConsistencyCheck_DistributorNotBest").detail("DataDistributorFitnessLowerBound", fitnessLowerBound)
.detail("ExistingDistributorFitness", nonExcludedWorkerProcessMap.count(db.distributor.get().address()) ? nonExcludedWorkerProcessMap[db.distributor.get().address()].processClass.machineClassFitness(ProcessClass::DataDistributor) : -1);
return false;
}
// Check Ratekeeper
ProcessClass::Fitness bestRatekeeperFitness = getBestAvailableFitness(dcToNonExcludedClassTypes[masterDcId], ProcessClass::Ratekeeper);
if (db.ratekeeper.present() && (!nonExcludedWorkerProcessMap.count(db.ratekeeper.get().address()) || nonExcludedWorkerProcessMap[db.ratekeeper.get().address()].processClass.machineClassFitness(ProcessClass::Ratekeeper) != bestRatekeeperFitness)) {
TraceEvent("ConsistencyCheck_RatekeeperNotBest").detail("BestRatekeeperFitness", bestRatekeeperFitness)
if (db.ratekeeper.present() && (!nonExcludedWorkerProcessMap.count(db.ratekeeper.get().address()) || nonExcludedWorkerProcessMap[db.ratekeeper.get().address()].processClass.machineClassFitness(ProcessClass::Ratekeeper) > fitnessLowerBound)) {
TraceEvent("ConsistencyCheck_RatekeeperNotBest").detail("BestRatekeeperFitness", fitnessLowerBound)
.detail("ExistingRatekeeperFitness", nonExcludedWorkerProcessMap.count(db.ratekeeper.get().address()) ? nonExcludedWorkerProcessMap[db.ratekeeper.get().address()].processClass.machineClassFitness(ProcessClass::Ratekeeper) : -1);
return false;
}

View File

@ -154,7 +154,8 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
.detail("MaxClearSize", maxClearSize)
.detail("UseSystemKeys", useSystemKeys);
TraceEvent("RemapEventSeverity").detail("TargetEvent", "Net2_LargePacket").detail("OriginalSeverity", SevWarnAlways).detail("NewSeverity", SevInfo);
TraceEvent("RemapEventSeverity").detail("TargetEvent", "LargePacketSent").detail("OriginalSeverity", SevWarnAlways).detail("NewSeverity", SevInfo);
TraceEvent("RemapEventSeverity").detail("TargetEvent", "LargePacketReceived").detail("OriginalSeverity", SevWarnAlways).detail("NewSeverity", SevInfo);
TraceEvent("RemapEventSeverity").detail("TargetEvent", "LargeTransaction").detail("OriginalSeverity", SevWarnAlways).detail("NewSeverity", SevInfo);
}

View File

@ -136,6 +136,7 @@ FlowKnobs::FlowKnobs(bool randomize, bool isSimulated) {
init( SLOW_NETWORK_LATENCY, 100e-3 );
init( MAX_CLOGGING_LATENCY, 0 ); if( randomize && BUGGIFY ) MAX_CLOGGING_LATENCY = 0.1 * deterministicRandom()->random01();
init( MAX_BUGGIFIED_DELAY, 0 ); if( randomize && BUGGIFY ) MAX_BUGGIFIED_DELAY = 0.2 * deterministicRandom()->random01();
init( SIM_CONNECT_ERROR_MODE, deterministicRandom()->randomInt(0,3) );
//Tracefiles
init( ZERO_LENGTH_FILE_PAD, 1 );

View File

@ -157,6 +157,7 @@ public:
double SLOW_NETWORK_LATENCY;
double MAX_CLOGGING_LATENCY;
double MAX_BUGGIFIED_DELAY;
int SIM_CONNECT_ERROR_MODE;
//Tracefiles
int ZERO_LENGTH_FILE_PAD;

View File

@ -44,6 +44,7 @@ enum class TaskPriority {
Coordination = 8800,
FailureMonitor = 8700,
ResolutionMetrics = 8700,
Worker = 8660,
ClusterController = 8650,
MasterTLogRejoin = 8646,
ProxyStorageRejoin = 8645,

View File

@ -32,7 +32,7 @@
<Wix xmlns='http://schemas.microsoft.com/wix/2006/wi'>
<Product Name='$(var.Title)'
Id='{508FF8D2-273F-4511-B724-344F5F4362A4}'
Id='{51E254F0-440E-4746-B7B3-83051EB87E6B}'
UpgradeCode='{A95EA002-686E-4164-8356-C715B7F8B1C8}'
Version='$(var.Version)'
Manufacturer='$(var.Manufacturer)'