Merge branch 'release-6.1'

# Conflicts:
#	documentation/sphinx/source/release-notes.rst
#	fdbserver/LogRouter.actor.cpp
#	flow/Trace.cpp
#	versions.target
This commit is contained in:
Evan Tschannen 2019-05-08 18:19:35 -07:00
commit 22499666d0
22 changed files with 366 additions and 75 deletions

View File

@ -20,9 +20,11 @@ elseif(CPACK_GENERATOR MATCHES "PackageMaker")
set(CPACK_PREFLIGHT_SERVER_SCRIPT ${CMAKE_SOURCE_DIR}/packaging/osx/scripts-server/preinstall)
set(CPACK_POSTFLIGHT_SERVER_SCRIPT ${CMAKE_SOURCE_DIR}/packaging/osx/scripts-server/postinstall)
set(CPACK_POSTFLIGHT_CLIENTS_SCRIPT ${CMAKE_SOURCE_DIR}/packaging/osx/scripts-server/preinstall)
set(CPACK_RESOURCE_FILE_README ${CMAKE_SOURCE_DIR}/packaging/osx/resources/conclusion.rtf)
# Commenting out this readme file until it works within packaging
# set(CPACK_RESOURCE_FILE_README ${CMAKE_SOURCE_DIR}/packaging/osx/resources/conclusion.rtf)
set(CPACK_PRODUCTBUILD_RESOURCES_DIR ${CMAKE_SOURCE_DIR}/packaging/osx/resources)
set(CPACK_RESOURCE_FILE_LICENSE ${CMAKE_BINARY_DIR}/License.txt)
# Changing the path of this file as CMAKE_BINARY_DIR does not seem to be defined
set(CPACK_RESOURCE_FILE_LICENSE License.txt)
elseif(CPACK_GENERATOR MATCHES "TGZ")
set(CPACK_STRIP_FILES TRUE)
set(CPACK_COMPONENTS_ALL clients-tgz server-tgz)

View File

@ -171,7 +171,8 @@ list(GET FDB_VERSION_LIST 2 FDB_PATCH)
include(InstallRequiredSystemLibraries)
set(CPACK_PACKAGE_NAME "foundationdb")
set(CPACK_PACKAGE_VENDOR "FoundationDB <fdb-dist@apple.com>")
set(CPACK_PACKAGE_VENDOR "FoundationDB")
set(CPACK_PACKAGE_CONTACT "fdb-dist@apple.com")
set(CPACK_PACKAGE_VERSION_MAJOR ${FDB_MAJOR})
set(CPACK_PACKAGE_VERSION_MINOR ${FDB_MINOR})
set(CPACK_PACKAGE_VERSION_PATCH ${FDB_PATCH})

View File

@ -10,38 +10,38 @@ macOS
The macOS installation package is supported on macOS 10.7+. It includes the client and (optionally) the server.
* `FoundationDB-6.1.4.pkg <https://www.foundationdb.org/downloads/6.1.4/macOS/installers/FoundationDB-6.1.4.pkg>`_
* `FoundationDB-6.1.5.pkg <https://www.foundationdb.org/downloads/6.1.5/macOS/installers/FoundationDB-6.1.5.pkg>`_
Ubuntu
------
The Ubuntu packages are supported on 64-bit Ubuntu 12.04+, but beware of the Linux kernel bug in Ubuntu 12.x.
* `foundationdb-clients-6.1.4-1_amd64.deb <https://www.foundationdb.org/downloads/6.1.4/ubuntu/installers/foundationdb-clients_6.1.4-1_amd64.deb>`_
* `foundationdb-server-6.1.4-1_amd64.deb <https://www.foundationdb.org/downloads/6.1.4/ubuntu/installers/foundationdb-server_6.1.4-1_amd64.deb>`_ (depends on the clients package)
* `foundationdb-clients-6.1.5-1_amd64.deb <https://www.foundationdb.org/downloads/6.1.5/ubuntu/installers/foundationdb-clients_6.1.5-1_amd64.deb>`_
* `foundationdb-server-6.1.5-1_amd64.deb <https://www.foundationdb.org/downloads/6.1.5/ubuntu/installers/foundationdb-server_6.1.5-1_amd64.deb>`_ (depends on the clients package)
RHEL/CentOS EL6
---------------
The RHEL/CentOS EL6 packages are supported on 64-bit RHEL/CentOS 6.x.
* `foundationdb-clients-6.1.4-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.1.4/rhel6/installers/foundationdb-clients-6.1.4-1.el6.x86_64.rpm>`_
* `foundationdb-server-6.1.4-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.1.4/rhel6/installers/foundationdb-server-6.1.4-1.el6.x86_64.rpm>`_ (depends on the clients package)
* `foundationdb-clients-6.1.5-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.1.5/rhel6/installers/foundationdb-clients-6.1.5-1.el6.x86_64.rpm>`_
* `foundationdb-server-6.1.5-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.1.5/rhel6/installers/foundationdb-server-6.1.5-1.el6.x86_64.rpm>`_ (depends on the clients package)
RHEL/CentOS EL7
---------------
The RHEL/CentOS EL7 packages are supported on 64-bit RHEL/CentOS 7.x.
* `foundationdb-clients-6.1.4-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.1.4/rhel7/installers/foundationdb-clients-6.1.4-1.el7.x86_64.rpm>`_
* `foundationdb-server-6.1.4-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.1.4/rhel7/installers/foundationdb-server-6.1.4-1.el7.x86_64.rpm>`_ (depends on the clients package)
* `foundationdb-clients-6.1.5-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.1.5/rhel7/installers/foundationdb-clients-6.1.5-1.el7.x86_64.rpm>`_
* `foundationdb-server-6.1.5-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.1.5/rhel7/installers/foundationdb-server-6.1.5-1.el7.x86_64.rpm>`_ (depends on the clients package)
Windows
-------
The Windows installer is supported on 64-bit Windows XP and later. It includes the client and (optionally) the server.
* `foundationdb-6.1.4-x64.msi <https://www.foundationdb.org/downloads/6.1.4/windows/installers/foundationdb-6.1.4-x64.msi>`_
* `foundationdb-6.1.5-x64.msi <https://www.foundationdb.org/downloads/6.1.5/windows/installers/foundationdb-6.1.5-x64.msi>`_
API Language Bindings
=====================
@ -58,18 +58,18 @@ On macOS and Windows, the FoundationDB Python API bindings are installed as part
If you need to use the FoundationDB Python API from other Python installations or paths, download the Python package:
* `foundationdb-6.1.4.tar.gz <https://www.foundationdb.org/downloads/6.1.4/bindings/python/foundationdb-6.1.4.tar.gz>`_
* `foundationdb-6.1.5.tar.gz <https://www.foundationdb.org/downloads/6.1.5/bindings/python/foundationdb-6.1.5.tar.gz>`_
Ruby 1.9.3/2.0.0+
-----------------
* `fdb-6.1.4.gem <https://www.foundationdb.org/downloads/6.1.4/bindings/ruby/fdb-6.1.4.gem>`_
* `fdb-6.1.5.gem <https://www.foundationdb.org/downloads/6.1.5/bindings/ruby/fdb-6.1.5.gem>`_
Java 8+
-------
* `fdb-java-6.1.4.jar <https://www.foundationdb.org/downloads/6.1.4/bindings/java/fdb-java-6.1.4.jar>`_
* `fdb-java-6.1.4-javadoc.jar <https://www.foundationdb.org/downloads/6.1.4/bindings/java/fdb-java-6.1.4-javadoc.jar>`_
* `fdb-java-6.1.5.jar <https://www.foundationdb.org/downloads/6.1.5/bindings/java/fdb-java-6.1.5.jar>`_
* `fdb-java-6.1.5-javadoc.jar <https://www.foundationdb.org/downloads/6.1.5/bindings/java/fdb-java-6.1.5-javadoc.jar>`_
Go 1.1+
-------

View File

@ -63,7 +63,8 @@
"resolver",
"cluster_controller",
"data_distributor",
"ratekeeper"
"ratekeeper",
"router"
]
},
"data_version":12341234,

View File

@ -2,7 +2,7 @@
Release Notes
#############
6.1.4
6.1.5
=====
Features
@ -75,6 +75,7 @@ Fixes
* Storage servers could not rejoin the cluster when the proxies were saturated. [6.1.4] `(PR #1486) <https://github.com/apple/foundationdb/pull/1486>`_ `(PR #1499) <https://github.com/apple/foundationdb/pull/1499>`_
* The ``configure`` command in ``fdbcli`` returned successfully even when the configuration was not changed for some error types. [6.1.4] `(PR #1509) <https://github.com/apple/foundationdb/pull/1509>`_
* Safety protections in the ``configure`` command in ``fdbcli`` would trigger spuriously when changing between ``three_datacenter`` replication and a region configuration. [6.1.4] `(PR #1509) <https://github.com/apple/foundationdb/pull/1509>`_
* Status could report an incorrect reason for ongoing data movement. [6.1.5] `(PR #1544) <https://github.com/apple/foundationdb/pull/1544>`_
Status
------
@ -122,6 +123,9 @@ Fixes only impacting 6.1.0+
* The ``consistencycheck`` fdbserver role would repeatedly exit. [6.1.1] `(PR #1437) <https://github.com/apple/foundationdb/pull/1437>`_
* The ``consistencycheck`` fdbserver role could proceed at a very slow rate after inserting data into an empty database. [6.1.2] `(PR #1452) <https://github.com/apple/foundationdb/pull/1452>`_
* The background actor which removes redundant teams could leave data unbalanced. [6.1.3] `(PR #1479) <https://github.com/apple/foundationdb/pull/1479>`_
* The transaction log spill-by-reference policy could read too much data from disk. [6.1.5] `(PR #1527) <https://github.com/apple/foundationdb/pull/1527>`_
* Memory tracking trace events could cause the program to crash when called from inside a trace event. [6.1.5] `(PR #1541) <https://github.com/apple/foundationdb/pull/1541>`_
* TLogs will replace a large file with an empty file rather than doing a large truncate operation. [6.1.5] `(PR #1545) <https://github.com/apple/foundationdb/pull/1545>`_
Earlier release notes
---------------------

View File

@ -84,7 +84,8 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
"resolver",
"cluster_controller",
"data_distributor",
"ratekeeper"
"ratekeeper",
"router"
]
},
"data_version":12341234,

View File

@ -104,6 +104,7 @@ set(FDBSERVER_SRCS
workloads/BulkSetup.actor.h
workloads/ChangeConfig.actor.cpp
workloads/ClientTransactionProfileCorrectness.actor.cpp
workloads/TriggerRecovery.actor.cpp
workloads/CommitBugCheck.actor.cpp
workloads/ConfigureDatabase.actor.cpp
workloads/ConflictRange.actor.cpp

View File

@ -270,68 +270,106 @@ public:
Future<Void> truncateFile(int file, int64_t pos) { return truncateFile(this, file, pos); }
Future<Void> push(StringRef pageData, vector<Reference<SyncQueue>>& toSync) {
// FIXME: Merge this function with IAsyncFileSystem::incrementalDeleteFile().
ACTOR static void incrementalTruncate(Reference<IAsyncFile> file) {
state int64_t remainingFileSize = wait( file->size() );
for( ; remainingFileSize > 0; remainingFileSize -= FLOW_KNOBS->INCREMENTAL_DELETE_TRUNCATE_AMOUNT ){
wait(file->truncate(remainingFileSize));
wait(file->sync());
wait(delay(FLOW_KNOBS->INCREMENTAL_DELETE_INTERVAL));
}
TraceEvent("DiskQueueReplaceTruncateEnded").detail("Filename", file->getFilename());
}
ACTOR static Future<Reference<IAsyncFile>> replaceFile(Reference<IAsyncFile> toReplace) {
incrementalTruncate( toReplace );
Reference<IAsyncFile> _replacement = wait( IAsyncFileSystem::filesystem()->open( toReplace->getFilename(), IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_READWRITE | IAsyncFile::OPEN_UNCACHED | IAsyncFile::OPEN_UNBUFFERED | IAsyncFile::OPEN_LOCK, 0 ) );
state Reference<IAsyncFile> replacement = _replacement;
wait( replacement->sync() );
return replacement;
}
Future<Void> push(StringRef pageData, vector<Reference<SyncQueue>>* toSync) {
return push( this, pageData, toSync );
}
ACTOR static Future<Void> push(RawDiskQueue_TwoFiles* self, StringRef pageData, vector<Reference<SyncQueue>>* toSync) {
// Write the given data to the queue files, swapping or extending them if necessary.
// Don't do any syncs, but push the modified file(s) onto toSync.
ASSERT( readingFile == 2 );
ASSERT( self->readingFile == 2 );
ASSERT( pageData.size() % _PAGE_SIZE == 0 );
ASSERT( int64_t(pageData.begin()) % _PAGE_SIZE == 0 );
ASSERT( writingPos % _PAGE_SIZE == 0 );
ASSERT( files[0].size % _PAGE_SIZE == 0 && files[1].size % _PAGE_SIZE == 0 );
ASSERT( self->writingPos % _PAGE_SIZE == 0 );
ASSERT( self->files[0].size % _PAGE_SIZE == 0 && self->files[1].size % _PAGE_SIZE == 0 );
vector<Future<Void>> waitfor;
state vector<Future<Void>> waitfor;
if (pageData.size() + writingPos > files[1].size) {
if ( files[0].popped == files[0].size ) {
// Finish files[1] and swap
int p = files[1].size - writingPos;
if (pageData.size() + self->writingPos > self->files[1].size) {
if ( self->files[0].popped == self->files[0].size ) {
// Finish self->files[1] and swap
int p = self->files[1].size - self->writingPos;
if(p > 0) {
toSync.push_back( files[1].syncQueue );
/*TraceEvent("RDQWriteAndSwap", this->dbgid).detail("File1name", files[1].dbgFilename).detail("File1size", files[1].size)
.detail("WritingPos", writingPos).detail("WritingBytes", p);*/
waitfor.push_back( files[1].f->write( pageData.begin(), p, writingPos ) );
toSync->push_back( self->files[1].syncQueue );
/*TraceEvent("RDQWriteAndSwap", this->dbgid).detail("File1name", self->files[1].dbgFilename).detail("File1size", self->files[1].size)
.detail("WritingPos", self->writingPos).detail("WritingBytes", p);*/
waitfor.push_back( self->files[1].f->write( pageData.begin(), p, self->writingPos ) );
pageData = pageData.substr( p );
}
dbg_file0BeginSeq += files[0].size;
std::swap(files[0], files[1]);
std::swap(firstPages[0], firstPages[1]);
files[1].popped = 0;
writingPos = 0;
self->dbg_file0BeginSeq += self->files[0].size;
std::swap(self->files[0], self->files[1]);
std::swap(self->firstPages[0], self->firstPages[1]);
self->files[1].popped = 0;
self->writingPos = 0;
const int64_t activeDataVolume = pageCeiling(files[0].size - files[0].popped + fileExtensionBytes + fileShrinkBytes);
if (files[1].size > activeDataVolume) {
// Either shrink files[1] to the size of files[0], or chop off fileShrinkBytes
int64_t maxShrink = std::max( pageFloor(files[1].size - activeDataVolume), fileShrinkBytes );
files[1].size -= maxShrink;
waitfor.push_back( files[1].f->truncate( files[1].size ) );
const int64_t activeDataVolume = pageCeiling(self->files[0].size - self->files[0].popped + self->fileExtensionBytes + self->fileShrinkBytes);
const int64_t desiredMaxFileSize = std::max( activeDataVolume, SERVER_KNOBS->TLOG_HARD_LIMIT_BYTES * 2 );
if (self->files[1].size > desiredMaxFileSize) {
// Either shrink self->files[1] to the size of self->files[0], or chop off fileShrinkBytes
int64_t maxShrink = std::max( pageFloor(self->files[1].size - desiredMaxFileSize), self->fileShrinkBytes );
if (maxShrink / SERVER_KNOBS->DISK_QUEUE_FILE_EXTENSION_BYTES >
SERVER_KNOBS->DISK_QUEUE_MAX_TRUNCATE_EXTENTS) {
TEST(true); // Replacing DiskQueue file
TraceEvent("DiskQueueReplaceFile", self->dbgid).detail("Filename", self->files[1].f->getFilename()).detail("OldFileSize", self->files[1].size).detail("ElidedTruncateSize", maxShrink);
Reference<IAsyncFile> newFile = wait( replaceFile(self->files[1].f) );
self->files[1].setFile(newFile);
self->files[1].size = 0;
} else {
self->files[1].size -= maxShrink;
waitfor.push_back( self->files[1].f->truncate( self->files[1].size ) );
}
}
} else {
// Extend files[1] to accomodate the new write and about 10MB or 2x current size for future writes.
/*TraceEvent("RDQExtend", this->dbgid).detail("File1name", files[1].dbgFilename).detail("File1size", files[1].size)
// Extend self->files[1] to accomodate the new write and about 10MB or 2x current size for future writes.
/*TraceEvent("RDQExtend", this->dbgid).detail("File1name", self->files[1].dbgFilename).detail("File1size", self->files[1].size)
.detail("ExtensionBytes", fileExtensionBytes);*/
int64_t minExtension = pageData.size() + writingPos - files[1].size;
files[1].size += std::min(std::max(fileExtensionBytes, minExtension), files[0].size+files[1].size+minExtension);
waitfor.push_back( files[1].f->truncate( files[1].size ) );
int64_t minExtension = pageData.size() + self->writingPos - self->files[1].size;
self->files[1].size += std::min(std::max(self->fileExtensionBytes, minExtension), self->files[0].size+self->files[1].size+minExtension);
waitfor.push_back( self->files[1].f->truncate( self->files[1].size ) );
if(fileSizeWarningLimit > 0 && files[1].size > fileSizeWarningLimit) {
TraceEvent(SevWarnAlways, "DiskQueueFileTooLarge", dbgid).suppressFor(1.0).detail("Filename", filename(1)).detail("Size", files[1].size);
if(self->fileSizeWarningLimit > 0 && self->files[1].size > self->fileSizeWarningLimit) {
TraceEvent(SevWarnAlways, "DiskQueueFileTooLarge", self->dbgid).suppressFor(1.0).detail("Filename", self->filename(1)).detail("Size", self->files[1].size);
}
}
}
if (writingPos == 0) {
*firstPages[1] = *(const Page*)pageData.begin();
if (self->writingPos == 0) {
*self->firstPages[1] = *(const Page*)pageData.begin();
}
/*TraceEvent("RDQWrite", this->dbgid).detail("File1name", files[1].dbgFilename).detail("File1size", files[1].size)
.detail("WritingPos", writingPos).detail("WritingBytes", pageData.size());*/
files[1].size = std::max( files[1].size, writingPos + pageData.size() );
toSync.push_back( files[1].syncQueue );
waitfor.push_back( files[1].f->write( pageData.begin(), pageData.size(), writingPos ) );
writingPos += pageData.size();
/*TraceEvent("RDQWrite", this->dbgid).detail("File1name", self->files[1].dbgFilename).detail("File1size", self->files[1].size)
.detail("WritingPos", self->writingPos).detail("WritingBytes", pageData.size());*/
self->files[1].size = std::max( self->files[1].size, self->writingPos + pageData.size() );
toSync->push_back( self->files[1].syncQueue );
waitfor.push_back( self->files[1].f->write( pageData.begin(), pageData.size(), self->writingPos ) );
self->writingPos += pageData.size();
return waitForAll(waitfor);
wait( waitForAll(waitfor) );
return Void();
}
ACTOR static UNCANCELLABLE Future<Void> pushAndCommit(RawDiskQueue_TwoFiles* self, StringRef pageData, StringBuffer* pageMem, uint64_t poppedPages) {
@ -358,11 +396,11 @@ public:
TEST( pageData.size() > sizeof(Page) ); // push more than one page of data
Future<Void> pushed = self->push( pageData, syncFiles );
Future<Void> pushed = self->push( pageData, &syncFiles );
pushing.send(Void());
wait( pushed );
ASSERT( syncFiles.size() >= 1 && syncFiles.size() <= 2 );
TEST(2==syncFiles.size()); // push spans both files
wait( pushed );
delete pageMem;
pageMem = 0;

View File

@ -71,8 +71,11 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
init( DISK_QUEUE_ADAPTER_MIN_SWITCH_TIME, 1.0 );
init( DISK_QUEUE_ADAPTER_MAX_SWITCH_TIME, 5.0 );
init( TLOG_SPILL_REFERENCE_MAX_PEEK_MEMORY_BYTES, 2e9 ); if ( randomize && BUGGIFY ) TLOG_SPILL_REFERENCE_MAX_PEEK_MEMORY_BYTES = 2e6;
init( TLOG_SPILL_REFERENCE_MAX_BATCHES_PER_PEEK, 100 ); if ( randomize && BUGGIFY ) TLOG_SPILL_REFERENCE_MAX_BATCHES_PER_PEEK = 1;
init( TLOG_SPILL_REFERENCE_MAX_BYTES_PER_BATCH, 16<<10 ); if ( randomize && BUGGIFY ) TLOG_SPILL_REFERENCE_MAX_BYTES_PER_BATCH = 500;
init( DISK_QUEUE_FILE_EXTENSION_BYTES, 10<<20 ); // BUGGIFYd per file within the DiskQueue
init( DISK_QUEUE_FILE_SHRINK_BYTES, 100<<20 ); // BUGGIFYd per file within the DiskQueue
init( DISK_QUEUE_MAX_TRUNCATE_EXTENTS, 1<<10 ); if ( randomize && BUGGIFY ) DISK_QUEUE_MAX_TRUNCATE_EXTENTS = 0;
init( TLOG_DEGRADED_DELAY_COUNT, 5 );
init( TLOG_DEGRADED_DURATION, 5.0 );

View File

@ -75,8 +75,11 @@ public:
double DISK_QUEUE_ADAPTER_MIN_SWITCH_TIME;
double DISK_QUEUE_ADAPTER_MAX_SWITCH_TIME;
int64_t TLOG_SPILL_REFERENCE_MAX_PEEK_MEMORY_BYTES;
int64_t TLOG_SPILL_REFERENCE_MAX_BATCHES_PER_PEEK;
int64_t TLOG_SPILL_REFERENCE_MAX_BYTES_PER_BATCH;
int64_t DISK_QUEUE_FILE_EXTENSION_BYTES; // When we grow the disk queue, by how many bytes should it grow?
int64_t DISK_QUEUE_FILE_SHRINK_BYTES; // When we shrink the disk queue, by how many bytes should it shrink?
int DISK_QUEUE_MAX_TRUNCATE_EXTENTS;
int TLOG_DEGRADED_DELAY_COUNT;
double TLOG_DEGRADED_DURATION;

View File

@ -86,6 +86,9 @@ struct LogRouterData {
LogSet logSet;
bool foundEpochEnd;
CounterCollection cc;
Future<Void> logger;
std::vector<Reference<TagData>> tag_data; //we only store data for the remote tag locality
Reference<TagData> getTagData(Tag tag) {
@ -103,7 +106,9 @@ struct LogRouterData {
return newTagData;
}
LogRouterData(UID dbgid, const InitializeLogRouterRequest& req) : dbgid(dbgid), routerTag(req.routerTag), logSystem(new AsyncVar<Reference<ILogSystem>>()), version(req.startVersion-1), minPopped(0), startVersion(req.startVersion), allowPops(false), minKnownCommittedVersion(0), poppedVersion(0), foundEpochEnd(false) {
LogRouterData(UID dbgid, const InitializeLogRouterRequest& req) : dbgid(dbgid), routerTag(req.routerTag), logSystem(new AsyncVar<Reference<ILogSystem>>()),
version(req.startVersion-1), minPopped(0), startVersion(req.startVersion), allowPops(false), minKnownCommittedVersion(0), poppedVersion(0), foundEpochEnd(false),
cc("LogRouter", dbgid.toString()) {
//setup just enough of a logSet to be able to call getPushLocations
logSet.logServers.resize(req.tLogLocalities.size());
logSet.tLogPolicy = req.tLogPolicy;
@ -117,6 +122,12 @@ struct LogRouterData {
tagData = createTagData(tag, 0, 0);
}
}
specialCounter(cc, "Version", [this](){return this->version.get(); });
specialCounter(cc, "MinPopped", [this](){return this->minPopped.get(); });
specialCounter(cc, "MinKnownCommittedVersion", [this](){ return this->minKnownCommittedVersion; });
specialCounter(cc, "PoppedVersion", [this](){ return this->poppedVersion; });
logger = traceCounters("LogRouterMetrics", dbgid, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "LogRouterMetrics");
}
};

View File

@ -432,6 +432,10 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
queueCommittedVersion.initMetric(LiteralStringRef("TLog.QueueCommittedVersion"), cc.id);
specialCounter(cc, "Version", [this](){ return this->version.get(); });
specialCounter(cc, "QueueCommittedVersion", [this](){ return this->queueCommittedVersion.get(); });
specialCounter(cc, "PersistentDataVersion", [this](){ return this->persistentDataVersion; });
specialCounter(cc, "PersistentDataDurableVersion", [this](){ return this->persistentDataDurableVersion; });
specialCounter(cc, "KnownCommittedVersion", [this](){ return this->knownCommittedVersion; });
specialCounter(cc, "SharedBytesInput", [tLogData](){ return tLogData->bytesInput; });
specialCounter(cc, "SharedBytesDurable", [tLogData](){ return tLogData->bytesDurable; });
specialCounter(cc, "SharedOverheadBytesInput", [tLogData](){ return tLogData->overheadBytesInput; });
@ -998,6 +1002,15 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
wait( delay(0.0, TaskLowPriority) );
}
if( req.begin <= logData->persistentDataDurableVersion && req.tag != txsTag) {
// Reading spilled data will almost always imply that the storage server is >5s behind the rest
// of the cluster. We shouldn't prioritize spending CPU on helping this server catch up
// slightly faster over keeping the rest of the cluster operating normally.
// txsTag is only ever peeked on recovery, and we would still wish to prioritize requests
// that impact recovery duration.
wait(delay(0, TaskTLogSpilledPeekReply));
}
Version poppedVer = poppedVersion(logData, req.tag);
if(poppedVer > req.begin) {
TLogPeekReply rep;

View File

@ -608,6 +608,24 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
roles.addRole("ratekeeper", db->get().ratekeeper.get());
}
for(auto& tLogSet : db->get().logSystemConfig.tLogs) {
for(auto& it : tLogSet.logRouters) {
if(it.present()) {
roles.addRole("router", it.interf());
}
}
}
for(auto& old : db->get().logSystemConfig.oldTLogs) {
for(auto& tLogSet : old.tLogs) {
for(auto& it : tLogSet.logRouters) {
if(it.present()) {
roles.addRole("router", it.interf());
}
}
}
}
state std::vector<std::pair<MasterProxyInterface, EventMap>>::iterator proxy;
for(proxy = proxies.begin(); proxy != proxies.end(); ++proxy) {
roles.addRole( "proxy", proxy->first, proxy->second );
@ -1154,18 +1172,21 @@ ACTOR static Future<JsonBuilderObject> dataStatusFetcher(WorkerDetails ddWorker,
TraceEventFields md = dataInfo[2];
// If we have a MovingData message, parse it.
int64_t partitionsInFlight = 0;
int movingHighestPriority = 1000;
if (md.size())
{
int64_t partitionsInQueue = md.getInt64("InQueue");
int64_t partitionsInFlight = md.getInt64("InFlight");
int64_t averagePartitionSize = md.getInt64("AverageShardSize");
partitionsInFlight = md.getInt64("InFlight");
movingHighestPriority = md.getInt("HighestPriority");
if( averagePartitionSize >= 0 ) {
JsonBuilderObject moving_data;
moving_data["in_queue_bytes"] = partitionsInQueue * averagePartitionSize;
moving_data["in_flight_bytes"] = partitionsInFlight * averagePartitionSize;
moving_data.setKeyRawNumber("total_written_bytes",md.getValue("BytesWritten"));
moving_data.setKeyRawNumber("highest_priority",md.getValue("HighestPriority"));
moving_data["highest_priority"] = movingHighestPriority;
// TODO: moving_data["rate_bytes"] = makeCounter(hz, c, r);
statusObjData["moving_data"] = moving_data;
@ -1189,6 +1210,12 @@ ACTOR static Future<JsonBuilderObject> dataStatusFetcher(WorkerDetails ddWorker,
bool primary = inFlight.getInt("Primary");
int highestPriority = inFlight.getInt("HighestPriority");
if(movingHighestPriority < PRIORITY_TEAM_REDUNDANT) {
highestPriority = movingHighestPriority;
} else if(partitionsInFlight > 0) {
highestPriority = std::max<int>(highestPriority, PRIORITY_MERGE_SHARD);
}
JsonBuilderObject team_tracker;
team_tracker["primary"] = primary;
team_tracker.setKeyRawNumber("in_flight_bytes",inFlight.getValue("TotalBytes"));
@ -1242,6 +1269,10 @@ ACTOR static Future<JsonBuilderObject> dataStatusFetcher(WorkerDetails ddWorker,
stateSectionObj["name"] = "healthy_removing_server";
stateSectionObj["description"] = "Removing storage server";
}
else if (highestPriority == PRIORITY_TEAM_HEALTHY) {
stateSectionObj["healthy"] = true;
stateSectionObj["name"] = "healthy";
}
else if (highestPriority >= PRIORITY_REBALANCE_SHARD) {
stateSectionObj["healthy"] = true;
stateSectionObj["name"] = "healthy_rebalancing";

View File

@ -489,6 +489,10 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
queueCommittedVersion.initMetric(LiteralStringRef("TLog.QueueCommittedVersion"), cc.id);
specialCounter(cc, "Version", [this](){ return this->version.get(); });
specialCounter(cc, "QueueCommittedVersion", [this](){ return this->queueCommittedVersion.get(); });
specialCounter(cc, "PersistentDataVersion", [this](){ return this->persistentDataVersion; });
specialCounter(cc, "PersistentDataDurableVersion", [this](){ return this->persistentDataDurableVersion; });
specialCounter(cc, "KnownCommittedVersion", [this](){ return this->knownCommittedVersion; });
specialCounter(cc, "SharedBytesInput", [tLogData](){ return tLogData->bytesInput; });
specialCounter(cc, "SharedBytesDurable", [tLogData](){ return tLogData->bytesDurable; });
specialCounter(cc, "SharedOverheadBytesInput", [tLogData](){ return tLogData->overheadBytesInput; });
@ -786,6 +790,15 @@ ACTOR Future<Void> updatePersistentData( TLogData* self, Reference<LogData> logD
lastVersion = std::max(currentVersion, lastVersion);
firstLocation = std::min(begin, firstLocation);
if ((wr.getLength() + sizeof(SpilledData) > SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_BYTES_PER_BATCH) ) {
*(uint32_t*)wr.getData() = refSpilledTagCount;
self->persistentData->set( KeyValueRef( persistTagMessageRefsKey( logData->logId, tagData->tag, lastVersion ), wr.toValue() ) );
tagData->poppedLocation = std::min(tagData->poppedLocation, firstLocation);
refSpilledTagCount = 0;
wr = BinaryWriter( AssumeVersion(logData->protocolVersion) );
wr << uint32_t(0);
}
Future<Void> f = yield(TaskUpdateStorage);
if(!f.isReady()) {
wait(f);
@ -1275,6 +1288,15 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
wait( delay(0.0, TaskLowPriority) );
}
if( req.begin <= logData->persistentDataDurableVersion && req.tag != txsTag) {
// Reading spilled data will almost always imply that the storage server is >5s behind the rest
// of the cluster. We shouldn't prioritize spending CPU on helping this server catch up
// slightly faster over keeping the rest of the cluster operating normally.
// txsTag is only ever peeked on recovery, and we would still wish to prioritize requests
// that impact recovery duration.
wait(delay(0, TaskTLogSpilledPeekReply));
}
Version poppedVer = poppedVersion(logData, req.tag);
if(poppedVer > req.begin) {
TLogPeekReply rep;
@ -1336,16 +1358,12 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
else
messages.serializeBytes( messages2.toValue() );
} else {
// Calculating checksums of read pages is potentially expensive, and storage servers with
// spilled data are likely behind and not contributing usefully to the cluster anyway.
// Thus, we penalize their priority slightly to make sure that commits have a higher priority
// than catching up old storage servers.
wait(delay(0, TaskTLogSpilledPeekReply));
// FIXME: Limit to approximately DESIRED_TOTATL_BYTES somehow.
Standalone<VectorRef<KeyValueRef>> kvrefs = wait(
self->persistentData->readRange(KeyRangeRef(
persistTagMessageRefsKey(logData->logId, req.tag, req.begin),
persistTagMessageRefsKey(logData->logId, req.tag, logData->persistentDataDurableVersion + 1))));
persistTagMessageRefsKey(logData->logId, req.tag, logData->persistentDataDurableVersion + 1)),
SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_BATCHES_PER_PEEK+1));
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? kv1[0].key : "").detail("Tag2ResultsLast", kv2.size() ? kv2[0].key : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence());
@ -1354,7 +1372,8 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
uint32_t mutationBytes = 0;
state uint64_t commitBytes = 0;
state Version firstVersion = std::numeric_limits<Version>::max();
for (auto &kv : kvrefs) {
for (int i = 0; i < kvrefs.size() && i < SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_BATCHES_PER_PEEK; i++) {
auto& kv = kvrefs[i];
VectorRef<SpilledData> spilledData;
BinaryReader r(kv.value, AssumeVersion(logData->protocolVersion));
r >> spilledData;
@ -1375,6 +1394,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
}
if (earlyEnd) break;
}
earlyEnd = earlyEnd || (kvrefs.size() >= SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_BATCHES_PER_PEEK+1);
wait( self->peekMemoryLimiter.take(TaskTLogSpilledPeekReply, commitBytes) );
state FlowLock::Releaser memoryReservation(self->peekMemoryLimiter, commitBytes);
state std::vector<Future<Standalone<StringRef>>> messageReads;

View File

@ -104,6 +104,7 @@
<ActorCompiler Include="workloads\AtomicOps.actor.cpp" />
<ActorCompiler Include="workloads\AtomicOpsApiCorrectness.actor.cpp" />
<ActorCompiler Include="workloads\ClientTransactionProfileCorrectness.actor.cpp" />
<ActorCompiler Include="workloads\TriggerRecovery.actor.cpp" />
<ActorCompiler Include="workloads\BackupToDBAbort.actor.cpp" />
<ActorCompiler Include="workloads\BackupToDBCorrectness.actor.cpp" />
<ActorCompiler Include="workloads\BackupToDBUpgrade.actor.cpp" />

View File

@ -228,6 +228,9 @@
<ActorCompiler Include="workloads\ClientTransactionProfileCorrectness.actor.cpp">
<Filter>workloads</Filter>
</ActorCompiler>
<ActorCompiler Include="workloads\TriggerRecovery.actor.cpp">
<Filter>workloads</Filter>
</ActorCompiler>
<ActorCompiler Include="workloads\StatusWorkload.actor.cpp">
<Filter>workloads</Filter>
</ActorCompiler>

View File

@ -0,0 +1,147 @@
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbclient/Status.h"
#include "fdbclient/StatusClient.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/RunTransaction.actor.h"
#include "flow/actorcompiler.h" // has to be last include
struct TriggerRecoveryLoopWorkload : TestWorkload {
double startTime;
int numRecoveries;
double delayBetweenRecoveries;
double killAllProportion;
Optional<int32_t> originalNumOfResolvers;
Optional<int32_t> currentNumOfResolvers;
TriggerRecoveryLoopWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
startTime = getOption(options, LiteralStringRef("startTime"), 0.0);
numRecoveries = getOption(options, LiteralStringRef("numRecoveries"), g_random->randomInt(1, 10));
delayBetweenRecoveries = getOption(options, LiteralStringRef("delayBetweenRecoveries"), 0.0);
killAllProportion = getOption(options, LiteralStringRef("killAllProportion"), 0.1);
ASSERT(numRecoveries > 0 && startTime >= 0 and delayBetweenRecoveries >= 0);
TraceEvent(SevInfo, "TriggerRecoveryLoopSetup")
.detail("StartTime", startTime)
.detail("NumRecoveries", numRecoveries)
.detail("DelayBetweenRecoveries", delayBetweenRecoveries);
}
virtual std::string description() { return "TriggerRecoveryLoop"; }
ACTOR Future<Void> setOriginalNumOfResolvers(Database cx, TriggerRecoveryLoopWorkload* self) {
DatabaseConfiguration config = wait(getDatabaseConfiguration(cx));
self->originalNumOfResolvers = self->currentNumOfResolvers = config.getDesiredResolvers();
return Void();
}
virtual Future<Void> setup(Database const& cx) {
if (clientId == 0) {
return setOriginalNumOfResolvers(cx, this);
}
return Void();
}
ACTOR Future<Void> returnIfClusterRecovered(Database cx) {
loop {
state ReadYourWritesTransaction tr(cx);
try {
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
state Version v = wait(tr.getReadVersion());
tr.makeSelfConflicting();
wait(tr.commit());
TraceEvent(SevInfo, "TriggerRecoveryLoop_ClusterVersion").detail("Version", v);
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
return Void();
}
ACTOR Future<Void> changeResolverConfig(Database cx, TriggerRecoveryLoopWorkload* self, bool setToOriginal=false) {
state int32_t numResolversToSet;
if(setToOriginal) {
numResolversToSet = self->originalNumOfResolvers.get();
} else {
numResolversToSet = self->currentNumOfResolvers.get() == self->originalNumOfResolvers.get()
? self->originalNumOfResolvers.get() + 1
: self->originalNumOfResolvers.get();
}
state StringRef configStr(format("resolvers=%d", numResolversToSet));
loop {
Optional<ConfigureAutoResult> conf;
ConfigurationResult::Type r = wait(changeConfig(cx, { configStr }, conf, true));
if (r == ConfigurationResult::SUCCESS) {
self->currentNumOfResolvers = numResolversToSet;
TraceEvent(SevInfo, "TriggerRecoveryLoop_ChangeResolverConfigSuccess").detail("NumOfResolvers", self->currentNumOfResolvers.get());
break;
}
TraceEvent(SevWarn, "TriggerRecoveryLoop_ChangeResolverConfigFailed").detail("Result", r);
wait(delay(1.0));
}
return Void();
}
ACTOR Future<Void> killAll(Database cx) {
state ReadYourWritesTransaction tr(cx);
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
Standalone<RangeResultRef> kvs = wait(tr.getRange(
KeyRangeRef(LiteralStringRef("\xff\xff/worker_interfaces"), LiteralStringRef("\xff\xff\xff")), 1));
std::map<Key, Value> address_interface;
for (auto it : kvs) {
auto ip_port = it.key.endsWith(LiteralStringRef(":tls"))
? it.key.removeSuffix(LiteralStringRef(":tls"))
: it.key;
address_interface[ip_port] = it.value;
}
for (auto it : address_interface) {
tr.set(LiteralStringRef("\xff\xff/reboot_worker"), it.second);
}
TraceEvent(SevInfo, "TriggerRecoveryLoop_AttempedKillAll");
return Void();
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
ACTOR Future<Void> _start(Database cx, TriggerRecoveryLoopWorkload* self) {
wait(delay(self->startTime));
state int numRecoveriesDone = 0;
try {
loop {
if (g_random->random01() < self->killAllProportion) {
wait(self->killAll(cx));
} else {
wait(self->changeResolverConfig(cx, self));
}
numRecoveriesDone++;
TraceEvent(SevInfo, "TriggerRecoveryLoop_AttempedRecovery").detail("RecoveryNum", numRecoveriesDone);
if (numRecoveriesDone == self->numRecoveries) {
break;
}
wait(delay(self->delayBetweenRecoveries));
wait(self->returnIfClusterRecovered(cx));
}
} catch(Error &e) {
// Dummy catch here to give a chance to reset number of resolvers to its original value
}
wait(self->changeResolverConfig(cx, self, true));
return Void();
}
virtual Future<Void> start(Database const& cx) {
if (clientId != 0) return Void();
return _start(cx, this);
}
virtual Future<bool> check(Database const& cx) { return true; }
virtual void getMetrics(vector<PerfMetric>& m) {}
};
WorkloadFactory<TriggerRecoveryLoopWorkload> TriggerRecoveryLoopWorkloadFactory("TriggerRecoveryLoop");

View File

@ -241,7 +241,7 @@ struct ArenaBlock : NonCopyable, ThreadSafeReferenceCounted<ArenaBlock>
b->bigSize = reqSize;
b->bigUsed = sizeof(ArenaBlock);
if(FLOW_KNOBS && g_nondeterministic_random && g_nondeterministic_random->random01() < (reqSize / FLOW_KNOBS->HUGE_ARENA_LOGGING_BYTES)) {
if(FLOW_KNOBS && g_trace_depth == 0 && g_nondeterministic_random && g_nondeterministic_random->random01() < (reqSize / FLOW_KNOBS->HUGE_ARENA_LOGGING_BYTES)) {
hugeArenaSample(reqSize);
}
g_hugeArenaMemory += reqSize;

View File

@ -442,7 +442,7 @@ void FastAllocator<Size>::getMagazine() {
// FIXME: We should be able to allocate larger magazine sizes here if we
// detect that the underlying system supports hugepages. Using hugepages
// with smaller-than-2MiB magazine sizes strands memory. See issue #909.
if(FLOW_KNOBS && g_nondeterministic_random && g_nondeterministic_random->random01() < (magazine_size * Size)/FLOW_KNOBS->FAST_ALLOC_LOGGING_BYTES) {
if(FLOW_KNOBS && g_trace_depth == 0 && g_nondeterministic_random && g_nondeterministic_random->random01() < (magazine_size * Size)/FLOW_KNOBS->FAST_ALLOC_LOGGING_BYTES) {
TraceEvent("GetMagazineSample").detail("Size", Size).backtrace();
}
block = (void **)::allocate(magazine_size * Size, false);

View File

@ -43,6 +43,8 @@
#undef min
#endif
int g_trace_depth = 0;
class DummyThreadPool : public IThreadPool, ReferenceCounted<DummyThreadPool> {
public:
~DummyThreadPool() {}
@ -653,15 +655,20 @@ void removeTraceRole(std::string role) {
g_traceLog.removeRole(role);
}
TraceEvent::TraceEvent( const char* type, UID id ) : id(id), type(type), severity(SevInfo), initialized(false), enabled(true) {}
TraceEvent::TraceEvent( const char* type, UID id ) : id(id), type(type), severity(SevInfo), initialized(false), enabled(true) {
g_trace_depth++;
}
TraceEvent::TraceEvent( Severity severity, const char* type, UID id )
: id(id), type(type), severity(severity), initialized(false),
enabled(g_network == nullptr || FLOW_KNOBS->MIN_TRACE_SEVERITY <= severity) {}
enabled(g_network == nullptr || FLOW_KNOBS->MIN_TRACE_SEVERITY <= severity) {
g_trace_depth++;
}
TraceEvent::TraceEvent( TraceInterval& interval, UID id )
: id(id), type(interval.type)
, severity(interval.severity)
, initialized(false)
, enabled(g_network == nullptr || FLOW_KNOBS->MIN_TRACE_SEVERITY <= interval.severity) {
g_trace_depth++;
init(interval);
}
TraceEvent::TraceEvent( Severity severity, TraceInterval& interval, UID id )
@ -669,6 +676,7 @@ TraceEvent::TraceEvent( Severity severity, TraceInterval& interval, UID id )
severity(severity),
initialized(false),
enabled(g_network == nullptr || FLOW_KNOBS->MIN_TRACE_SEVERITY <= severity) {
g_trace_depth++;
init(interval);
}
@ -933,6 +941,7 @@ TraceEvent::~TraceEvent() {
TraceEvent(SevError, "TraceEventDestructorError").error(e,true);
}
delete tmpEventMetric;
g_trace_depth--;
}
thread_local bool TraceEvent::networkThread = false;

View File

@ -42,6 +42,8 @@ inline int fastrand() {
//inline static bool TRACE_SAMPLE() { return fastrand()<16; }
inline static bool TRACE_SAMPLE() { return false; }
extern int g_trace_depth;
enum Severity {
SevSample=1,
SevDebug=5,

View File

@ -32,7 +32,7 @@
<Wix xmlns='http://schemas.microsoft.com/wix/2006/wi'>
<Product Name='$(var.Title)'
Id='{94FC1149-B569-4F0C-886D-3C9D0A83A3E7}'
Id='{32F74616-4B66-4A17-972F-765FF2C03728}'
UpgradeCode='{A95EA002-686E-4164-8356-C715B7F8B1C8}'
Version='$(var.Version)'
Manufacturer='$(var.Manufacturer)'