Merge branch 'release-6.2' of github.com:apple/foundationdb into feature-redwood

This commit is contained in:
Stephen Atherton 2019-10-29 16:33:22 -07:00
commit 547616d5f1
21 changed files with 204 additions and 83 deletions

View File

@ -18,7 +18,7 @@
# limitations under the License.
cmake_minimum_required(VERSION 3.12)
project(foundationdb
VERSION 6.2.7
VERSION 6.2.8
DESCRIPTION "FoundationDB is a scalable, fault-tolerant, ordered key-value store with full ACID transactions."
HOMEPAGE_URL "http://www.foundationdb.org/"
LANGUAGES C CXX ASM)

View File

@ -30,7 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger;
/**
* The starting point for accessing FoundationDB.
* <br>
* <h3>Setting API version</h3>
* <h2>Setting API version</h2>
* The FoundationDB API is accessed with a call to {@link #selectAPIVersion(int)}.
* This call is required before using any other part of the API. The call allows
* an error to be thrown at this point to prevent client code from accessing a later library
@ -49,11 +49,11 @@ import java.util.concurrent.atomic.AtomicInteger;
* being used to connect to the cluster. In particular, you should not advance
* the API version of your application after upgrading your client until the
* cluster has also been upgraded.<br>
* <h3>Getting a database</h3>
* <h2>Getting a database</h2>
* Once the API version has been set, the easiest way to get a {@link Database} object to use is
* to call {@link #open}.
* <br>
* <h3>Client networking</h3>
* <h2>Client networking</h2>
* The network is started either implicitly with a call to a variant of {@link #open()}
* or started explicitly with a call to {@link #startNetwork()}.
* <br>

View File

@ -39,7 +39,7 @@ import com.apple.foundationdb.Range;
* the same order in which they would sort in FoundationDB. {@code Tuple}s sort
* first by the first element, then by the second, etc. This makes the tuple layer
* ideal for building a variety of higher-level data models.<br>
* <h3>Types</h3>
* <h2>Types</h2>
* A {@code Tuple} can
* contain byte arrays ({@code byte[]}), {@link String}s, {@link Number}s, {@link UUID}s,
* {@code boolean}s, {@link List}s, {@link Versionstamp}s, other {@code Tuple}s, and {@code null}.
@ -50,7 +50,7 @@ import com.apple.foundationdb.Range;
* a {@code long} integral value, so the range will be constrained to
* [{@code -2^63}, {@code 2^63-1}]. Note that for numbers outside this range the way that Java
* truncates integral values may yield unexpected results.<br>
* <h3>{@code null} values</h3>
* <h2>{@code null} values</h2>
* The FoundationDB tuple specification has a special type-code for {@code None}; {@code nil}; or,
* as Java would understand it, {@code null}.
* The behavior of the layer in the presence of {@code null} varies by type with the intention

View File

@ -2,7 +2,7 @@
<BODY>
This documents the client API for using FoundationDB from Java.<br>
<br>
<h3>Installation</h3>
<h1>Installation</h1>
FoundationDB's Java bindings rely on native libraries that are installed as part of the
FoundationDB client binaries installation (see
<a href="/foundationdb/api-general.html#installing-client-binaries" target="_blank">
@ -10,7 +10,7 @@ Installing FoundationDB client binaries</a>). The JAR can be downloaded from
<a href="https://www.foundationdb.org/download/">our website</a>
and then added to your classpath.<br>
<br>
<h3>Getting started</h3>
<h1>Getting started</h1>
To start using FoundationDB from Java, create an instance of the
{@link com.apple.foundationdb.FDB FoundationDB API interface} with the version of the
API that you want to use (this release of the FoundationDB Java API supports versions between {@code 510} and {@code 620}).
@ -50,7 +50,7 @@ public class Example {
}
}
</pre>
<h3>FoundationDB {@link com.apple.foundationdb.tuple Tuple API}</h3>
<h1>FoundationDB {@link com.apple.foundationdb.tuple Tuple API}</h1>
The {@link com.apple.foundationdb.tuple Tuple API} is provided with the core Java API for FoundationDB.
This layer is provided in some form in all official language bindings. It enables
cross-language support for storing and retrieving typed data from the
@ -60,7 +60,7 @@ binary data that FoundationDB supports. And, just as importantly, data packed in
and <a href="/foundationdb/data-modeling.html#data-modeling-tuples">general Tuple documentation</a>
for information about how Tuples sort and can be used to efficiently model data.
<br>
<h3>FoundationDB {@link com.apple.foundationdb.directory Directory API}</h3>
<h1>FoundationDB {@link com.apple.foundationdb.directory Directory API}</h1>
The {@link com.apple.foundationdb.directory Directory API} is provided with the core
Java API for FoundationDB. This layer is provided in some form in all official
language bindings. The FoundationDB API provides directories as a tool for

View File

@ -10,38 +10,38 @@ macOS
The macOS installation package is supported on macOS 10.7+. It includes the client and (optionally) the server.
* `FoundationDB-6.2.6.pkg <https://www.foundationdb.org/downloads/6.2.6/macOS/installers/FoundationDB-6.2.6.pkg>`_
* `FoundationDB-6.2.7.pkg <https://www.foundationdb.org/downloads/6.2.7/macOS/installers/FoundationDB-6.2.7.pkg>`_
Ubuntu
------
The Ubuntu packages are supported on 64-bit Ubuntu 12.04+, but beware of the Linux kernel bug in Ubuntu 12.x.
* `foundationdb-clients-6.2.6-1_amd64.deb <https://www.foundationdb.org/downloads/6.2.6/ubuntu/installers/foundationdb-clients_6.2.6-1_amd64.deb>`_
* `foundationdb-server-6.2.6-1_amd64.deb <https://www.foundationdb.org/downloads/6.2.6/ubuntu/installers/foundationdb-server_6.2.6-1_amd64.deb>`_ (depends on the clients package)
* `foundationdb-clients-6.2.7-1_amd64.deb <https://www.foundationdb.org/downloads/6.2.7/ubuntu/installers/foundationdb-clients_6.2.7-1_amd64.deb>`_
* `foundationdb-server-6.2.7-1_amd64.deb <https://www.foundationdb.org/downloads/6.2.7/ubuntu/installers/foundationdb-server_6.2.7-1_amd64.deb>`_ (depends on the clients package)
RHEL/CentOS EL6
---------------
The RHEL/CentOS EL6 packages are supported on 64-bit RHEL/CentOS 6.x.
* `foundationdb-clients-6.2.6-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.6/rhel6/installers/foundationdb-clients-6.2.6-1.el6.x86_64.rpm>`_
* `foundationdb-server-6.2.6-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.6/rhel6/installers/foundationdb-server-6.2.6-1.el6.x86_64.rpm>`_ (depends on the clients package)
* `foundationdb-clients-6.2.7-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.7/rhel6/installers/foundationdb-clients-6.2.7-1.el6.x86_64.rpm>`_
* `foundationdb-server-6.2.7-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.7/rhel6/installers/foundationdb-server-6.2.7-1.el6.x86_64.rpm>`_ (depends on the clients package)
RHEL/CentOS EL7
---------------
The RHEL/CentOS EL7 packages are supported on 64-bit RHEL/CentOS 7.x.
* `foundationdb-clients-6.2.6-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.6/rhel7/installers/foundationdb-clients-6.2.6-1.el7.x86_64.rpm>`_
* `foundationdb-server-6.2.6-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.6/rhel7/installers/foundationdb-server-6.2.6-1.el7.x86_64.rpm>`_ (depends on the clients package)
* `foundationdb-clients-6.2.7-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.7/rhel7/installers/foundationdb-clients-6.2.7-1.el7.x86_64.rpm>`_
* `foundationdb-server-6.2.7-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.7/rhel7/installers/foundationdb-server-6.2.7-1.el7.x86_64.rpm>`_ (depends on the clients package)
Windows
-------
The Windows installer is supported on 64-bit Windows XP and later. It includes the client and (optionally) the server.
* `foundationdb-6.2.6-x64.msi <https://www.foundationdb.org/downloads/6.2.6/windows/installers/foundationdb-6.2.6-x64.msi>`_
* `foundationdb-6.2.7-x64.msi <https://www.foundationdb.org/downloads/6.2.7/windows/installers/foundationdb-6.2.7-x64.msi>`_
API Language Bindings
=====================
@ -58,18 +58,18 @@ On macOS and Windows, the FoundationDB Python API bindings are installed as part
If you need to use the FoundationDB Python API from other Python installations or paths, download the Python package:
* `foundationdb-6.2.6.tar.gz <https://www.foundationdb.org/downloads/6.2.6/bindings/python/foundationdb-6.2.6.tar.gz>`_
* `foundationdb-6.2.7.tar.gz <https://www.foundationdb.org/downloads/6.2.7/bindings/python/foundationdb-6.2.7.tar.gz>`_
Ruby 1.9.3/2.0.0+
-----------------
* `fdb-6.2.6.gem <https://www.foundationdb.org/downloads/6.2.6/bindings/ruby/fdb-6.2.6.gem>`_
* `fdb-6.2.7.gem <https://www.foundationdb.org/downloads/6.2.7/bindings/ruby/fdb-6.2.7.gem>`_
Java 8+
-------
* `fdb-java-6.2.6.jar <https://www.foundationdb.org/downloads/6.2.6/bindings/java/fdb-java-6.2.6.jar>`_
* `fdb-java-6.2.6-javadoc.jar <https://www.foundationdb.org/downloads/6.2.6/bindings/java/fdb-java-6.2.6-javadoc.jar>`_
* `fdb-java-6.2.7.jar <https://www.foundationdb.org/downloads/6.2.7/bindings/java/fdb-java-6.2.7.jar>`_
* `fdb-java-6.2.7-javadoc.jar <https://www.foundationdb.org/downloads/6.2.7/bindings/java/fdb-java-6.2.7-javadoc.jar>`_
Go 1.11+
--------

View File

@ -2,7 +2,7 @@
Release Notes
#############
6.2.6
6.2.7
=====
Performance
@ -39,7 +39,6 @@ Fixes
* File descriptors opened by clients and servers set close-on-exec, if available on the platform. `(PR #1581) <https://github.com/apple/foundationdb/pull/1581>`_.
* ``fdbrestore`` commands other than ``start`` required a default cluster file to be found but did not actually use it. `(PR #1912) <https://github.com/apple/foundationdb/pull/1912>`_.
* Unneeded network connections were not being closed because peer reference counts were handled improperly. `(PR #1768) <https://github.com/apple/foundationdb/pull/1768>`_.
* Under certain conditions, cross region replication could stall for 10 minute periods. `(PR #1818) <https://github.com/apple/foundationdb/pull/1818>`_.
* In very rare scenarios, master recovery would restart because system metadata was loaded incorrectly. `(PR #1919) <https://github.com/apple/foundationdb/pull/1919>`_.
* Ratekeeper will aggressively throttle when unable to fetch the list of storage servers for a considerable period of time. `(PR #1858) <https://github.com/apple/foundationdb/pull/1858>`_.
* Proxies could become overloaded when all storage servers on a team fail. [6.2.1] `(PR #1976) <https://github.com/apple/foundationdb/pull/1976>`_.
@ -58,6 +57,10 @@ Fixes
* Committing transactions larger than 1 MB could cause the proxy to stall for up to a second. [6.2.6] `(PR #2250) <https://github.com/apple/foundationdb/pull/2250>`_.
* The cluster controller could become saturated in clusters with large numbers of connected clients using TLS. [6.2.6] `(PR #2252) <https://github.com/apple/foundationdb/pull/2252>`_.
* Backup and DR would not share a mutation stream if they were started on different versions of FoundationDB. Either backup or DR must be restarted to resolve this issue. [6.2.6] `(PR #2202) <https://github.com/apple/foundationdb/pull/2202>`_.
* Don't track batch priority GRV requests in latency bands. [6.2.7] `(PR #2279) <https://github.com/apple/foundationdb/pull/2279>`_.
* Transaction log processes used twice their normal memory when switching spill types. [6.2.7] `(PR #2256) <https://github.com/apple/foundationdb/pull/2256>`_.
* Under certain conditions, cross region replication could stall for 10 minute periods. [6.2.7] `(PR #1818) <https://github.com/apple/foundationdb/pull/1818>`_ `(PR #2276) <https://github.com/apple/foundationdb/pull/2276>`_.
* When dropping a remote region from the configuration after processes in the region have failed, data distribution would create teams from the dead servers for one minute. [6.2.7] `(PR #2286) <https://github.com/apple/foundationdb/pull/1818>`_.
Status
------
@ -134,6 +137,7 @@ Fixes only impacting 6.2.0+
* A storage server could crash if it took longer than 10 minutes to fetch a key range from another server. [6.2.5] `(PR #2170) <https://github.com/apple/foundationdb/pull/2170>`_.
* Excluding or including servers would restart the data distributor. [6.2.5] `(PR #2170) <https://github.com/apple/foundationdb/pull/2170>`_.
* The data distributor could read invalid memory when estimating database size. [6.2.6] `(PR #2225) <https://github.com/apple/foundationdb/pull/2225>`_.
* Status could incorrectly report that backup and DR were not sharing a mutation stream. [6.2.7] `(PR #2274) <https://github.com/apple/foundationdb/pull/2274>`_.
Earlier release notes
---------------------

View File

@ -45,7 +45,8 @@ class AsyncFileEIO : public IAsyncFile, public ReferenceCounted<AsyncFileEIO> {
public:
static void init() {
if (eio_init( &eio_want_poll, NULL )) {
eio_set_max_parallel(FLOW_KNOBS->EIO_MAX_PARALLELISM);
if (eio_init( &eio_want_poll, NULL )) {
TraceEvent("EioInitError").detail("ErrorNo", errno);
throw platform_error();
}
@ -246,6 +247,9 @@ private:
if( flags & OPEN_READONLY ) oflags |= O_RDONLY;
if( flags & OPEN_READWRITE ) oflags |= O_RDWR;
if( flags & OPEN_ATOMIC_WRITE_AND_CREATE ) oflags |= O_TRUNC;
#if defined(__linux__)
if ( flags & OPEN_UNBUFFERED && FLOW_KNOBS->EIO_USE_ODIRECT ) oflags |= O_DIRECT;
#endif
return oflags;
}

View File

@ -59,9 +59,10 @@ Future< Reference<class IAsyncFile> > Net2FileSystem::open( std::string filename
Future<Reference<IAsyncFile>> f;
#ifdef __linux__
// In the vast majority of cases, we wish to use Kernel AIO. However, some systems
// dont properly support dont properly support kernel async I/O without O_DIRECT
// or AIO at all. In such cases, DISABLE_POSIX_KERNEL_AIO knob can be enabled to fallback to
// EIO instead of Kernel AIO.
// dont properly support kernel async I/O without O_DIRECT or AIO at all. In such
// cases, DISABLE_POSIX_KERNEL_AIO knob can be enabled to fallback to EIO instead
// of Kernel AIO. And EIO_USE_ODIRECT can be used to turn on or off O_DIRECT within
// EIO.
if ((flags & IAsyncFile::OPEN_UNBUFFERED) && !(flags & IAsyncFile::OPEN_NO_AIO) &&
!FLOW_KNOBS->DISABLE_POSIX_KERNEL_AIO)
f = AsyncFileKAIO::open(filename, flags, mode, NULL);

View File

@ -3175,8 +3175,21 @@ ACTOR Future<Void> serverMetricsPolling( TCServerInfo *server) {
}
}
//Returns the KeyValueStoreType of server if it is different from self->storeType
ACTOR Future<KeyValueStoreType> keyValueStoreTypeTracker(DDTeamCollection* self, TCServerInfo *server) {
//Returns if the KeyValueStoreType of server is different from self->storeType or the desired datacenter does not match
ACTOR Future<Void> keyValueStoreTypeTracker(DDTeamCollection* self, TCServerInfo *server) {
if ((!self->includedDCs.empty() &&
std::find(self->includedDCs.begin(), self->includedDCs.end(), server->lastKnownInterface.locality.dcId()) ==
self->includedDCs.end()) ||
(!self->isValidLocality(self->configuration.storagePolicy, server->lastKnownInterface.locality))) {
TraceEvent("KeyValueStoreTypeChanged", self->distributorId)
.detail("ServerID", server->id)
.detail("StoreType", "?")
.detail("DesiredType", self->configuration.storageServerStoreType.toString())
.detail("IsValidLocality", self->isValidLocality(self->configuration.storagePolicy,
server->lastKnownInterface.locality));
return Void();
}
state KeyValueStoreType type = wait(brokenPromiseToNever(server->lastKnownInterface.getKeyValueStoreType.getReplyWithTaskID<KeyValueStoreType>(TaskPriority::DataDistribution)));
if (type == self->configuration.storageServerStoreType &&
(self->includedDCs.empty() ||
@ -3186,7 +3199,14 @@ ACTOR Future<KeyValueStoreType> keyValueStoreTypeTracker(DDTeamCollection* self,
wait(Future<Void>(Never()));
}
return type;
TraceEvent("KeyValueStoreTypeChanged", self->distributorId)
.detail("ServerID", server->id)
.detail("StoreType", type.toString())
.detail("DesiredType", self->configuration.storageServerStoreType.toString())
.detail("IsValidLocality", self->isValidLocality(self->configuration.storagePolicy,
server->lastKnownInterface.locality));
return Void();
}
ACTOR Future<Void> waitForAllDataRemoved( Database cx, UID serverID, Version addedVersion, DDTeamCollection* teams ) {
@ -3302,7 +3322,7 @@ ACTOR Future<Void> storageServerTracker(
state Future<Void> metricsTracker = serverMetricsPolling( server );
state Future<std::pair<StorageServerInterface, ProcessClass>> interfaceChanged = server->onInterfaceChanged;
state Future<KeyValueStoreType> storeTracker = keyValueStoreTypeTracker( self, server );
state Future<Void> storeTracker = keyValueStoreTypeTracker( self, server );
state bool hasWrongStoreTypeOrDC = false;
state int targetTeamNumPerServer = (SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * (self->configuration.storageTeamSize + 1)) / 2;
@ -3527,13 +3547,7 @@ ACTOR Future<Void> storageServerTracker(
when( wait( otherChanges.empty() ? Never() : quorum( otherChanges, 1 ) ) ) {
TraceEvent("SameAddressChangedStatus", self->distributorId).detail("ServerID", server->id);
}
when( KeyValueStoreType type = wait( storeTracker ) ) {
TraceEvent("KeyValueStoreTypeChanged", self->distributorId)
.detail("ServerID", server->id)
.detail("StoreType", type.toString())
.detail("DesiredType", self->configuration.storageServerStoreType.toString())
.detail("IsValidLocality", self->isValidLocality(self->configuration.storagePolicy,
server->lastKnownInterface.locality));
when( wait( storeTracker ) ) {
TEST(true); //KeyValueStore type changed
storeTracker = Never();

View File

@ -152,6 +152,12 @@ ACTOR Future<Void> serverPeekParallelGetMore( ILogSystem::ServerPeekCursor* self
while(self->futureResults.size() < SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && self->interf->get().present()) {
self->futureResults.push_back( brokenPromiseToNever( self->interf->get().interf().peekMessages.getReply(TLogPeekRequest(self->messageVersion.version,self->tag,self->returnIfBlocked, self->onlySpilled, std::make_pair(self->randomID, self->sequence++)), taskID) ) );
}
if (self->sequence == std::numeric_limits<decltype(self->sequence)>::max()) {
throw timed_out();
}
} else if (self->futureResults.size() == 1) {
self->randomID = deterministicRandom()->randomUniqueID();
self->sequence = 0;
} else if (self->futureResults.size() == 0) {
return Void();
}
@ -985,8 +991,16 @@ void ILogSystem::BufferedCursor::advanceTo(LogMessageVersion n) {
}
ACTOR Future<Void> bufferedGetMoreLoader( ILogSystem::BufferedCursor* self, Reference<ILogSystem::IPeekCursor> cursor, Version maxVersion, TaskPriority taskID ) {
if(cursor->version().version >= maxVersion) {
return Void();
}
loop {
wait(yield());
wait(cursor->getMore(taskID));
self->poppedVersion = std::max(self->poppedVersion, cursor->popped());
if(self->canDiscardPopped) {
self->initialPoppedVersion = std::max(self->initialPoppedVersion, cursor->popped());
}
if(cursor->version().version >= maxVersion) {
return Void();
}
@ -997,11 +1011,6 @@ ACTOR Future<Void> bufferedGetMoreLoader( ILogSystem::BufferedCursor* self, Refe
return Void();
}
}
wait(cursor->getMore(taskID));
self->poppedVersion = std::max(self->poppedVersion, cursor->popped());
if(self->canDiscardPopped) {
self->initialPoppedVersion = std::max(self->initialPoppedVersion, cursor->popped());
}
}
}

View File

@ -1131,7 +1131,9 @@ ACTOR Future<Void> sendGrvReplies(Future<GetReadVersionReply> replyFuture, std::
GetReadVersionReply reply = wait(replyFuture);
double end = timer();
for(GetReadVersionRequest const& request : requests) {
stats->grvLatencyBands.addMeasurement(end - request.requestTime());
if(request.priority() >= GetReadVersionRequest::PRIORITY_DEFAULT) {
stats->grvLatencyBands.addMeasurement(end - request.requestTime());
}
request.reply.send(reply);
}

View File

@ -875,6 +875,9 @@ namespace oldTLog_4_6 {
try {
peekId = req.sequence.get().first;
sequence = req.sequence.get().second;
if (sequence >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && self->peekTracker.find(peekId) == self->peekTracker.end()) {
throw timed_out();
}
if(sequence > 0) {
auto& trackerData = self->peekTracker[peekId];
trackerData.lastUpdate = now();

View File

@ -262,6 +262,7 @@ struct TLogData : NonCopyable {
int64_t instanceID;
int64_t bytesInput;
int64_t bytesDurable;
int64_t targetVolatileBytes; // The number of bytes of mutations this TLog should hold in memory before spilling.
int64_t overheadBytesInput;
int64_t overheadBytesDurable;
@ -288,7 +289,7 @@ struct TLogData : NonCopyable {
: dbgid(dbgid), instanceID(deterministicRandom()->randomUniqueID().first()),
persistentData(persistentData), rawPersistentQueue(persistentQueue), persistentQueue(new TLogQueue(persistentQueue, dbgid)),
dbInfo(dbInfo), degraded(degraded), queueCommitBegin(0), queueCommitEnd(0),
diskQueueCommitBytes(0), largeDiskQueueCommitBytes(false), bytesInput(0), bytesDurable(0), overheadBytesInput(0), overheadBytesDurable(0),
diskQueueCommitBytes(0), largeDiskQueueCommitBytes(false), bytesInput(0), bytesDurable(0), targetVolatileBytes(SERVER_KNOBS->TLOG_SPILL_THRESHOLD), overheadBytesInput(0), overheadBytesDurable(0),
concurrentLogRouterReads(SERVER_KNOBS->CONCURRENT_LOG_ROUTER_READS),
ignorePopRequest(false), ignorePopDeadline(), ignorePopUid(), dataFolder(folder), toBePopped()
{
@ -697,7 +698,7 @@ ACTOR Future<Void> updateStorage( TLogData* self ) {
state FlowLock::Releaser commitLockReleaser;
if(logData->stopped) {
if (self->bytesInput - self->bytesDurable >= SERVER_KNOBS->TLOG_SPILL_THRESHOLD) {
if (self->bytesInput - self->bytesDurable >= self->targetVolatileBytes) {
while(logData->persistentDataDurableVersion != logData->version.get()) {
totalSize = 0;
Map<Version, std::pair<int,int>>::iterator sizeItr = logData->version_sizes.begin();
@ -742,7 +743,7 @@ ACTOR Future<Void> updateStorage( TLogData* self ) {
} else {
Map<Version, std::pair<int,int>>::iterator sizeItr = logData->version_sizes.begin();
while( totalSize < SERVER_KNOBS->UPDATE_STORAGE_BYTE_LIMIT && sizeItr != logData->version_sizes.end()
&& (logData->bytesInput.getValue() - logData->bytesDurable.getValue() - totalSize >= SERVER_KNOBS->TLOG_SPILL_THRESHOLD || sizeItr->value.first == 0) )
&& (logData->bytesInput.getValue() - logData->bytesDurable.getValue() - totalSize >= self->targetVolatileBytes || sizeItr->value.first == 0) )
{
totalSize += sizeItr->value.first + sizeItr->value.second;
++sizeItr;
@ -1036,6 +1037,9 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
try {
peekId = req.sequence.get().first;
sequence = req.sequence.get().second;
if (sequence >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && logData->peekTracker.find(peekId) == logData->peekTracker.end()) {
throw timed_out();
}
auto& trackerData = logData->peekTracker[peekId];
if (sequence == 0 && trackerData.sequence_version.find(0) == trackerData.sequence_version.end()) {
trackerData.sequence_version[0].send(std::make_pair(req.begin, req.onlySpilled));
@ -2312,8 +2316,18 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
return Void();
}
ACTOR Future<Void> startSpillingInTenSeconds(TLogData* self, UID tlogId, Reference<AsyncVar<UID>> activeSharedTLog) {
wait(delay(10));
if (activeSharedTLog->get() != tlogId) {
// TODO: This should fully spill, but currently doing so will cause us to no longer update poppedVersion
// and QuietDatabase will hang thinking our TLog is behind.
self->targetVolatileBytes = SERVER_KNOBS->REFERENCE_SPILL_UPDATE_STORAGE_BYTE_LIMIT * 2;
}
return Void();
}
// New tLog (if !recoverFrom.size()) or restore from network
ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk, Promise<Void> oldLog, Promise<Void> recovered, std::string folder, Reference<AsyncVar<bool>> degraded) {
ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk, Promise<Void> oldLog, Promise<Void> recovered, std::string folder, Reference<AsyncVar<bool>> degraded, Reference<AsyncVar<UID>> activeSharedTLog) {
state TLogData self( tlogId, persistentData, persistentQueue, db, degraded, folder );
state Future<Void> error = actorCollection( self.sharedActors.getFuture() );
@ -2334,6 +2348,7 @@ ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQ
self.sharedActors.send( commitQueue(&self) );
self.sharedActors.send( updateStorageLoop(&self) );
state Future<Void> activeSharedChange = Void();
loop {
choose {
@ -2346,6 +2361,14 @@ ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQ
}
}
when ( wait( error ) ) { throw internal_error(); }
when ( wait( activeSharedChange ) ) {
if (activeSharedTLog->get() == tlogId) {
self.targetVolatileBytes = SERVER_KNOBS->TLOG_SPILL_THRESHOLD;
} else {
self.sharedActors.send( startSpillingInTenSeconds(&self, tlogId, activeSharedTLog) );
}
activeSharedChange = activeSharedTLog->onChange();
}
}
}
} catch (Error& e) {

View File

@ -312,6 +312,7 @@ struct TLogData : NonCopyable {
int64_t instanceID;
int64_t bytesInput;
int64_t bytesDurable;
int64_t targetVolatileBytes; // The number of bytes of mutations this TLog should hold in memory before spilling.
int64_t overheadBytesInput;
int64_t overheadBytesDurable;
@ -339,7 +340,7 @@ struct TLogData : NonCopyable {
: dbgid(dbgid), instanceID(deterministicRandom()->randomUniqueID().first()),
persistentData(persistentData), rawPersistentQueue(persistentQueue), persistentQueue(new TLogQueue(persistentQueue, dbgid)),
dbInfo(dbInfo), degraded(degraded), queueCommitBegin(0), queueCommitEnd(0),
diskQueueCommitBytes(0), largeDiskQueueCommitBytes(false), bytesInput(0), bytesDurable(0), overheadBytesInput(0), overheadBytesDurable(0),
diskQueueCommitBytes(0), largeDiskQueueCommitBytes(false), bytesInput(0), bytesDurable(0), targetVolatileBytes(SERVER_KNOBS->TLOG_SPILL_THRESHOLD), overheadBytesInput(0), overheadBytesDurable(0),
peekMemoryLimiter(SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_PEEK_MEMORY_BYTES),
concurrentLogRouterReads(SERVER_KNOBS->CONCURRENT_LOG_ROUTER_READS),
ignorePopRequest(false), ignorePopDeadline(), ignorePopUid(), dataFolder(folder), toBePopped()
@ -952,7 +953,7 @@ ACTOR Future<Void> updateStorage( TLogData* self ) {
state FlowLock::Releaser commitLockReleaser;
if(logData->stopped) {
if (self->bytesInput - self->bytesDurable >= SERVER_KNOBS->TLOG_SPILL_THRESHOLD) {
if (self->bytesInput - self->bytesDurable >= self->targetVolatileBytes) {
while(logData->persistentDataDurableVersion != logData->version.get()) {
totalSize = 0;
Map<Version, std::pair<int,int>>::iterator sizeItr = logData->version_sizes.begin();
@ -1000,10 +1001,12 @@ ACTOR Future<Void> updateStorage( TLogData* self ) {
if(logData->version_sizes.empty()) {
nextVersion = logData->version.get();
} else {
// Double check that a running TLog wasn't wrongly affected by spilling locked SharedTLogs.
ASSERT_WE_THINK(self->targetVolatileBytes == SERVER_KNOBS->TLOG_SPILL_THRESHOLD);
Map<Version, std::pair<int,int>>::iterator sizeItr = logData->version_sizes.begin();
while( totalSize < SERVER_KNOBS->REFERENCE_SPILL_UPDATE_STORAGE_BYTE_LIMIT &&
sizeItr != logData->version_sizes.end()
&& (logData->bytesInput.getValue() - logData->bytesDurable.getValue() - totalSize >= SERVER_KNOBS->TLOG_SPILL_THRESHOLD || sizeItr->value.first == 0) )
&& (logData->bytesInput.getValue() - logData->bytesDurable.getValue() - totalSize >= self->targetVolatileBytes || sizeItr->value.first == 0) )
{
totalSize += sizeItr->value.first + sizeItr->value.second;
++sizeItr;
@ -1337,6 +1340,9 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
try {
peekId = req.sequence.get().first;
sequence = req.sequence.get().second;
if (sequence >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && logData->peekTracker.find(peekId) == logData->peekTracker.end()) {
throw timed_out();
}
auto& trackerData = logData->peekTracker[peekId];
if (sequence == 0 && trackerData.sequence_version.find(0) == trackerData.sequence_version.end()) {
trackerData.sequence_version[0].send(std::make_pair(req.begin, req.onlySpilled));
@ -2593,20 +2599,10 @@ ACTOR Future<Void> updateLogSystem(TLogData* self, Reference<LogData> logData, L
}
}
ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, LocalityData locality ) {
state TLogInterface recruited(self->dbgid, locality);
recruited.initEndpoints();
DUMPTOKEN( recruited.peekMessages );
DUMPTOKEN( recruited.popMessages );
DUMPTOKEN( recruited.commit );
DUMPTOKEN( recruited.lock );
DUMPTOKEN( recruited.getQueuingMetrics );
DUMPTOKEN( recruited.confirmRunning );
void stopAllTLogs( TLogData* self, UID newLogId ) {
for(auto it : self->id_data) {
if( !it.second->stopped ) {
TraceEvent("TLogStoppedByNewRecruitment", self->dbgid).detail("LogId", it.second->logId).detail("StoppedId", it.first.toString()).detail("RecruitedId", recruited.id()).detail("EndEpoch", it.second->logSystem->get().getPtr() != 0);
TraceEvent("TLogStoppedByNewRecruitment", self->dbgid).detail("LogId", it.second->logId).detail("StoppedId", it.first.toString()).detail("RecruitedId", newLogId).detail("EndEpoch", it.second->logSystem->get().getPtr() != 0);
if(!it.second->isPrimary && it.second->logSystem->get()) {
it.second->removed = it.second->removed && it.second->logSystem->get()->endEpoch();
}
@ -2620,6 +2616,21 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
}
it.second->stopCommit.trigger();
}
}
// Start the tLog role for a worker
ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, LocalityData locality ) {
state TLogInterface recruited(self->dbgid, locality);
recruited.initEndpoints();
DUMPTOKEN( recruited.peekMessages );
DUMPTOKEN( recruited.popMessages );
DUMPTOKEN( recruited.commit );
DUMPTOKEN( recruited.lock );
DUMPTOKEN( recruited.getQueuingMetrics );
DUMPTOKEN( recruited.confirmRunning );
stopAllTLogs(self, recruited.id());
state Reference<LogData> logData = Reference<LogData>( new LogData(self, recruited, req.remoteTag, req.isPrimary, req.logRouterTags, req.txsTags, req.recruitmentID, currentProtocolVersion, req.allTags) );
self->id_data[recruited.id()] = logData;
@ -2736,8 +2747,21 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
return Void();
}
ACTOR Future<Void> startSpillingInTenSeconds(TLogData* self, UID tlogId, Reference<AsyncVar<UID>> activeSharedTLog) {
wait(delay(10));
if (activeSharedTLog->get() != tlogId) {
// TODO: This should fully spill, but currently doing so will cause us to no longer update poppedVersion
// and QuietDatabase will hang thinking our TLog is behind.
TraceEvent("SharedTLogBeginSpilling", self->dbgid).detail("NowActive", activeSharedTLog->get());
self->targetVolatileBytes = SERVER_KNOBS->REFERENCE_SPILL_UPDATE_STORAGE_BYTE_LIMIT * 2;
} else {
TraceEvent("SharedTLogSkipSpilling", self->dbgid).detail("NowActive", activeSharedTLog->get());
}
return Void();
}
// New tLog (if !recoverFrom.size()) or restore from network
ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk, Promise<Void> oldLog, Promise<Void> recovered, std::string folder, Reference<AsyncVar<bool>> degraded ) {
ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk, Promise<Void> oldLog, Promise<Void> recovered, std::string folder, Reference<AsyncVar<bool>> degraded, Reference<AsyncVar<UID>> activeSharedTLog ) {
state TLogData self( tlogId, persistentData, persistentQueue, db, degraded, folder );
state Future<Void> error = actorCollection( self.sharedActors.getFuture() );
@ -2758,6 +2782,7 @@ ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQ
self.sharedActors.send( commitQueue(&self) );
self.sharedActors.send( updateStorageLoop(&self) );
state Future<Void> activeSharedChange = Void();
loop {
choose {
@ -2770,6 +2795,17 @@ ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQ
}
}
when ( wait( error ) ) { throw internal_error(); }
when ( wait( activeSharedChange ) ) {
if (activeSharedTLog->get() == tlogId) {
TraceEvent("SharedTLogNowActive", self.dbgid).detail("NowActive", activeSharedTLog->get());
self.targetVolatileBytes = SERVER_KNOBS->TLOG_SPILL_THRESHOLD;
} else {
stopAllTLogs(&self, tlogId);
TraceEvent("SharedTLogQueueSpilling", self.dbgid).detail("NowActive", activeSharedTLog->get());
self.sharedActors.send( startSpillingInTenSeconds(&self, tlogId, activeSharedTLog) );
}
activeSharedChange = activeSharedTLog->onChange();
}
}
}
} catch (Error& e) {

View File

@ -1290,10 +1290,6 @@ public:
return closedPromise.getFuture();
}
Future<Void> onClose() override {
return closedPromise.getFuture();
}
StorageBytes getStorageBytes() override {
ASSERT(recoverFuture.isReady());
int64_t free;

View File

@ -445,7 +445,9 @@ ACTOR Future<Void> masterProxyServer(MasterProxyInterface proxy, InitializeMaste
ACTOR Future<Void> tLog(IKeyValueStore* persistentData, IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality,
PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk,
Promise<Void> oldLog, Promise<Void> recovered, std::string folder, Reference<AsyncVar<bool>> degraded); // changes tli->id() to be the recovered ID
Promise<Void> oldLog, Promise<Void> recovered, std::string folder,
Reference<AsyncVar<bool>> degraded, Reference<AsyncVar<UID>> activeSharedTLog);
ACTOR Future<Void> monitorServerDBInfo(Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface,
Reference<ClusterConnectionFile> ccf, LocalityData locality,
Reference<AsyncVar<ServerDBInfo>> dbInfo);
@ -467,7 +469,8 @@ namespace oldTLog_6_0 {
ACTOR Future<Void> tLog(IKeyValueStore* persistentData, IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality,
PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk,
Promise<Void> oldLog, Promise<Void> recovered, std::string folder, Reference<AsyncVar<bool>> degraded);
Promise<Void> oldLog, Promise<Void> recovered, std::string folder,
Reference<AsyncVar<bool>> degraded, Reference<AsyncVar<UID>> activeSharedTLog);
}
typedef decltype(&tLog) TLogFn;

View File

@ -754,6 +754,17 @@ ACTOR Future<Void> monitorServerDBInfo( Reference<AsyncVar<Optional<ClusterContr
}
}
struct SharedLogsValue {
Future<Void> actor = Void();
UID uid = UID();
PromiseStream<InitializeTLogRequest> requests;
SharedLogsValue() = default;
SharedLogsValue( Future<Void> actor, UID uid, PromiseStream<InitializeTLogRequest> requests )
: actor(actor), uid(uid), requests(requests) {
}
};
ACTOR Future<Void> workerServer(
Reference<ClusterConnectionFile> connFile,
Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface,
@ -782,7 +793,9 @@ ACTOR Future<Void> workerServer(
// decide if we should collapse them into the same SharedTLog instance as well. The answer
// here is no, so that when running with log_version==3, all files should say V=3.
state std::map<std::tuple<TLogVersion, KeyValueStoreType::StoreType, TLogSpillType>,
std::pair<Future<Void>, PromiseStream<InitializeTLogRequest>>> sharedLogs;
SharedLogsValue> sharedLogs;
state Reference<AsyncVar<UID>> activeSharedTLog(new AsyncVar<UID>());
state std::string coordFolder = abspath(_coordFolder);
state WorkerInterface interf( locality );
@ -899,13 +912,15 @@ ACTOR Future<Void> workerServer(
auto& logData = sharedLogs[std::make_tuple(s.tLogOptions.version, s.storeType, s.tLogOptions.spillType)];
// FIXME: Shouldn't if logData.first isValid && !isReady, shouldn't we
// be sending a fake InitializeTLogRequest rather than calling tLog() ?
Future<Void> tl = tLogFn( kv, queue, dbInfo, locality, !logData.first.isValid() || logData.first.isReady() ? logData.second : PromiseStream<InitializeTLogRequest>(), s.storeID, true, oldLog, recovery, folder, degraded );
Future<Void> tl = tLogFn( kv, queue, dbInfo, locality, !logData.actor.isValid() || logData.actor.isReady() ? logData.requests : PromiseStream<InitializeTLogRequest>(), s.storeID, true, oldLog, recovery, folder, degraded, activeSharedTLog );
recoveries.push_back(recovery.getFuture());
activeSharedTLog->set(s.storeID);
tl = handleIOErrors( tl, kv, s.storeID );
tl = handleIOErrors( tl, queue, s.storeID );
if(!logData.first.isValid() || logData.first.isReady()) {
logData.first = oldLog.getFuture() || tl;
if(!logData.actor.isValid() || logData.actor.isReady()) {
logData.actor = oldLog.getFuture() || tl;
logData.uid = s.storeID;
}
errorForwarders.add( forwardError( errors, Role::SHARED_TRANSACTION_LOG, s.storeID, tl ) );
}
@ -1045,8 +1060,8 @@ ACTOR Future<Void> workerServer(
TLogOptions tLogOptions(req.logVersion, req.spillType);
TLogFn tLogFn = tLogFnForOptions(tLogOptions);
auto& logData = sharedLogs[std::make_tuple(req.logVersion, req.storeType, req.spillType)];
logData.second.send(req);
if(!logData.first.isValid() || logData.first.isReady()) {
logData.requests.send(req);
if(!logData.actor.isValid() || logData.actor.isReady()) {
UID logId = deterministicRandom()->randomUniqueID();
std::map<std::string, std::string> details;
details["ForMaster"] = req.recruitmentID.shortString();
@ -1063,11 +1078,14 @@ ACTOR Future<Void> workerServer(
filesClosed.add( data->onClosed() );
filesClosed.add( queue->onClosed() );
logData.first = tLogFn( data, queue, dbInfo, locality, logData.second, logId, false, Promise<Void>(), Promise<Void>(), folder, degraded );
logData.first = handleIOErrors( logData.first, data, logId );
logData.first = handleIOErrors( logData.first, queue, logId );
errorForwarders.add( forwardError( errors, Role::SHARED_TRANSACTION_LOG, logId, logData.first ) );
Future<Void> tLogCore = tLogFn( data, queue, dbInfo, locality, logData.requests, logId, false, Promise<Void>(), Promise<Void>(), folder, degraded, activeSharedTLog );
tLogCore = handleIOErrors( tLogCore, data, logId );
tLogCore = handleIOErrors( tLogCore, queue, logId );
errorForwarders.add( forwardError( errors, Role::SHARED_TRANSACTION_LOG, logId, tLogCore ) );
logData.actor = tLogCore;
logData.uid = logId;
}
activeSharedTLog->set(logData.uid);
}
when( InitializeStorageRequest req = waitNext(interf.storage.getFuture()) ) {
if( !storageCache.exists( req.reqId ) ) {

View File

@ -85,6 +85,10 @@ FlowKnobs::FlowKnobs(bool randomize, bool isSimulated) {
init( CACHE_EVICTION_POLICY, "random" );
init( PAGE_CACHE_TRUNCATE_LOOKUP_FRACTION, 0.1 ); if( randomize && BUGGIFY ) PAGE_CACHE_TRUNCATE_LOOKUP_FRACTION = 0.0; else if( randomize && BUGGIFY ) PAGE_CACHE_TRUNCATE_LOOKUP_FRACTION = 1.0;
//AsyncFileEIO
init( EIO_MAX_PARALLELISM, 4 );
init( EIO_USE_ODIRECT, 0 );
//AsyncFileKAIO
init( MAX_OUTSTANDING, 64 );
init( MIN_SUBMIT, 10 );

View File

@ -105,6 +105,10 @@ public:
double TOO_MANY_CONNECTIONS_CLOSED_RESET_DELAY;
int TOO_MANY_CONNECTIONS_CLOSED_TIMEOUT;
//AsyncFileEIO
int EIO_MAX_PARALLELISM;
int EIO_USE_ODIRECT;
//AsyncFileKAIO
int MAX_OUTSTANDING;
int MIN_SUBMIT;

View File

@ -32,7 +32,7 @@
<Wix xmlns='http://schemas.microsoft.com/wix/2006/wi'>
<Product Name='$(var.Title)'
Id='{B69CF2EA-9CDC-4373-83E6-3615F9AE393B}'
Id='{2E1D0D76-FEF8-4874-A023-51FFE70475E7}'
UpgradeCode='{A95EA002-686E-4164-8356-C715B7F8B1C8}'
Version='$(var.Version)'
Manufacturer='$(var.Manufacturer)'

View File

@ -1,7 +1,7 @@
<?xml version="1.0"?>
<Project xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<Version>6.2.7</Version>
<Version>6.2.8</Version>
<PackageName>6.2</PackageName>
</PropertyGroup>
</Project>