Merge pull request #2323 from etschannen/feature-efficient-buffered-cursor

remote logs use bufferedCursor when peeking from log routers
This commit is contained in:
Evan Tschannen 2019-11-05 20:45:15 -08:00 committed by GitHub
commit a11db961cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 161 additions and 111 deletions

View File

@ -10,38 +10,38 @@ macOS
The macOS installation package is supported on macOS 10.7+. It includes the client and (optionally) the server.
* `FoundationDB-6.2.7.pkg <https://www.foundationdb.org/downloads/6.2.7/macOS/installers/FoundationDB-6.2.7.pkg>`_
* `FoundationDB-6.2.8.pkg <https://www.foundationdb.org/downloads/6.2.8/macOS/installers/FoundationDB-6.2.8.pkg>`_
Ubuntu
------
The Ubuntu packages are supported on 64-bit Ubuntu 12.04+, but beware of the Linux kernel bug in Ubuntu 12.x.
* `foundationdb-clients-6.2.7-1_amd64.deb <https://www.foundationdb.org/downloads/6.2.7/ubuntu/installers/foundationdb-clients_6.2.7-1_amd64.deb>`_
* `foundationdb-server-6.2.7-1_amd64.deb <https://www.foundationdb.org/downloads/6.2.7/ubuntu/installers/foundationdb-server_6.2.7-1_amd64.deb>`_ (depends on the clients package)
* `foundationdb-clients-6.2.8-1_amd64.deb <https://www.foundationdb.org/downloads/6.2.8/ubuntu/installers/foundationdb-clients_6.2.8-1_amd64.deb>`_
* `foundationdb-server-6.2.8-1_amd64.deb <https://www.foundationdb.org/downloads/6.2.8/ubuntu/installers/foundationdb-server_6.2.8-1_amd64.deb>`_ (depends on the clients package)
RHEL/CentOS EL6
---------------
The RHEL/CentOS EL6 packages are supported on 64-bit RHEL/CentOS 6.x.
* `foundationdb-clients-6.2.7-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.7/rhel6/installers/foundationdb-clients-6.2.7-1.el6.x86_64.rpm>`_
* `foundationdb-server-6.2.7-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.7/rhel6/installers/foundationdb-server-6.2.7-1.el6.x86_64.rpm>`_ (depends on the clients package)
* `foundationdb-clients-6.2.8-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.8/rhel6/installers/foundationdb-clients-6.2.8-1.el6.x86_64.rpm>`_
* `foundationdb-server-6.2.8-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.8/rhel6/installers/foundationdb-server-6.2.8-1.el6.x86_64.rpm>`_ (depends on the clients package)
RHEL/CentOS EL7
---------------
The RHEL/CentOS EL7 packages are supported on 64-bit RHEL/CentOS 7.x.
* `foundationdb-clients-6.2.7-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.7/rhel7/installers/foundationdb-clients-6.2.7-1.el7.x86_64.rpm>`_
* `foundationdb-server-6.2.7-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.7/rhel7/installers/foundationdb-server-6.2.7-1.el7.x86_64.rpm>`_ (depends on the clients package)
* `foundationdb-clients-6.2.8-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.8/rhel7/installers/foundationdb-clients-6.2.8-1.el7.x86_64.rpm>`_
* `foundationdb-server-6.2.8-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.8/rhel7/installers/foundationdb-server-6.2.8-1.el7.x86_64.rpm>`_ (depends on the clients package)
Windows
-------
The Windows installer is supported on 64-bit Windows XP and later. It includes the client and (optionally) the server.
* `foundationdb-6.2.7-x64.msi <https://www.foundationdb.org/downloads/6.2.7/windows/installers/foundationdb-6.2.7-x64.msi>`_
* `foundationdb-6.2.8-x64.msi <https://www.foundationdb.org/downloads/6.2.8/windows/installers/foundationdb-6.2.8-x64.msi>`_
API Language Bindings
=====================
@ -58,18 +58,18 @@ On macOS and Windows, the FoundationDB Python API bindings are installed as part
If you need to use the FoundationDB Python API from other Python installations or paths, download the Python package:
* `foundationdb-6.2.7.tar.gz <https://www.foundationdb.org/downloads/6.2.7/bindings/python/foundationdb-6.2.7.tar.gz>`_
* `foundationdb-6.2.8.tar.gz <https://www.foundationdb.org/downloads/6.2.8/bindings/python/foundationdb-6.2.8.tar.gz>`_
Ruby 1.9.3/2.0.0+
-----------------
* `fdb-6.2.7.gem <https://www.foundationdb.org/downloads/6.2.7/bindings/ruby/fdb-6.2.7.gem>`_
* `fdb-6.2.8.gem <https://www.foundationdb.org/downloads/6.2.8/bindings/ruby/fdb-6.2.8.gem>`_
Java 8+
-------
* `fdb-java-6.2.7.jar <https://www.foundationdb.org/downloads/6.2.7/bindings/java/fdb-java-6.2.7.jar>`_
* `fdb-java-6.2.7-javadoc.jar <https://www.foundationdb.org/downloads/6.2.7/bindings/java/fdb-java-6.2.7-javadoc.jar>`_
* `fdb-java-6.2.8.jar <https://www.foundationdb.org/downloads/6.2.8/bindings/java/fdb-java-6.2.8.jar>`_
* `fdb-java-6.2.8-javadoc.jar <https://www.foundationdb.org/downloads/6.2.8/bindings/java/fdb-java-6.2.8-javadoc.jar>`_
Go 1.11+
--------

View File

@ -8,7 +8,7 @@ Release Notes
Fixes
-----
* Significantly improved the rate at which the transaction logs in a remote region can pull data from the primary region. `(PR #2307) <https://github.com/apple/foundationdb/pull/2307>`_.
* Significantly improved the rate at which the transaction logs in a remote region can pull data from the primary region. `(PR #2307) <https://github.com/apple/foundationdb/pull/2307>`_ `(PR #2323) <https://github.com/apple/foundationdb/pull/2323>`_.
* The ``system_kv_size_bytes`` status field could report a size much larger than the actual size of the system keyspace. `(PR #2305) <https://github.com/apple/foundationdb/pull/2305>`_.
6.2.7

View File

@ -110,10 +110,10 @@ enum { txsTagOld = -1, invalidTagOld = -100 };
struct TagsAndMessage {
StringRef message;
std::vector<Tag> tags;
VectorRef<Tag> tags;
TagsAndMessage() {}
TagsAndMessage(StringRef message, const std::vector<Tag>& tags) : message(message), tags(tags) {}
TagsAndMessage(StringRef message, VectorRef<Tag> tags) : message(message), tags(tags) {}
};
struct KeyRangeRef;

View File

@ -67,7 +67,8 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
init( PARALLEL_GET_MORE_REQUESTS, 32 ); if( randomize && BUGGIFY ) PARALLEL_GET_MORE_REQUESTS = 2;
init( MULTI_CURSOR_PRE_FETCH_LIMIT, 10 );
init( MAX_QUEUE_COMMIT_BYTES, 15e6 ); if( randomize && BUGGIFY ) MAX_QUEUE_COMMIT_BYTES = 5000;
init( VERSIONS_PER_BATCH, VERSIONS_PER_SECOND/20 ); if( randomize && BUGGIFY ) VERSIONS_PER_BATCH = std::max<int64_t>(1,VERSIONS_PER_SECOND/1000);
init( DESIRED_OUTSTANDING_MESSAGES, 5000 ); if( randomize && BUGGIFY ) DESIRED_OUTSTANDING_MESSAGES = deterministicRandom()->randomInt(0,100);
init( DESIRED_GET_MORE_DELAY, 0.005 );
init( CONCURRENT_LOG_ROUTER_READS, 1 );
init( LOG_ROUTER_PEEK_FROM_SATELLITES_PREFERRED, 1 ); if( randomize && BUGGIFY ) LOG_ROUTER_PEEK_FROM_SATELLITES_PREFERRED = 0;
init( DISK_QUEUE_ADAPTER_MIN_SWITCH_TIME, 1.0 );

View File

@ -70,7 +70,8 @@ public:
int PARALLEL_GET_MORE_REQUESTS;
int MULTI_CURSOR_PRE_FETCH_LIMIT;
int64_t MAX_QUEUE_COMMIT_BYTES;
int64_t VERSIONS_PER_BATCH;
int DESIRED_OUTSTANDING_MESSAGES;
double DESIRED_GET_MORE_DELAY;
int CONCURRENT_LOG_ROUTER_READS;
int LOG_ROUTER_PEEK_FROM_SATELLITES_PREFERRED; // 0==peek from primary, non-zero==peek from satellites
double DISK_QUEUE_ADAPTER_MIN_SWITCH_TIME;

View File

@ -245,6 +245,7 @@ ACTOR Future<Void> pullAsyncData( LogRouterData *self ) {
state Version ver = 0;
state std::vector<TagsAndMessage> messages;
state Arena arena;
while (true) {
state bool foundMessage = r->hasMessage();
if (!foundMessage || r->version().version != ver) {
@ -260,6 +261,7 @@ ACTOR Future<Void> pullAsyncData( LogRouterData *self ) {
lastVer = ver;
ver = r->version().version;
messages.clear();
arena = Arena();
if (!foundMessage) {
ver--; //ver is the next possible version we will get data for
@ -277,8 +279,9 @@ ACTOR Future<Void> pullAsyncData( LogRouterData *self ) {
tagAndMsg.message = r->getMessageWithTags();
tags.clear();
self->logSet.getPushLocations(r->getTags(), tags, 0);
tagAndMsg.tags.reserve(arena, tags.size());
for (const auto& t : tags) {
tagAndMsg.tags.emplace_back(tagLocalityRemoteLog, t);
tagAndMsg.tags.push_back(arena, Tag(tagLocalityRemoteLog, t));
}
messages.push_back(std::move(tagAndMsg));

View File

@ -231,7 +231,7 @@ public:
return resultEntries.size() == 0;
}
void getPushLocations(std::vector<Tag> const& tags, std::vector<int>& locations, int locationOffset,
void getPushLocations(VectorRef<Tag> tags, std::vector<int>& locations, int locationOffset,
bool allLocations = false) {
if(locality == tagLocalitySatellite) {
for(auto& t : tags) {
@ -309,7 +309,7 @@ struct ILogSystem {
//pre: only callable if hasMessage() returns true
//return the tags associated with the message for the current sequence
virtual const std::vector<Tag>& getTags() = 0;
virtual VectorRef<Tag> getTags() = 0;
//pre: only callable if hasMessage() returns true
//returns the arena containing the contents of getMessage(), getMessageWithTags(), and reader()
@ -382,7 +382,7 @@ struct ILogSystem {
LogMessageVersion messageVersion, end;
Version poppedVersion;
int32_t messageLength, rawLength;
std::vector<Tag> tags;
VectorRef<Tag> tags;
bool hasMsg;
Future<Void> more;
UID randomID;
@ -405,7 +405,7 @@ struct ILogSystem {
virtual void nextMessage();
virtual StringRef getMessage();
virtual StringRef getMessageWithTags();
virtual const std::vector<Tag>& getTags();
virtual VectorRef<Tag> getTags();
virtual void advanceTo(LogMessageVersion n);
virtual Future<Void> getMore(TaskPriority taskID = TaskPriority::TLogPeekReply);
virtual Future<Void> onFailed();
@ -438,6 +438,7 @@ struct ILogSystem {
bool hasNextMessage;
UID randomID;
int tLogReplicationFactor;
Future<Void> more;
MergedPeekCursor( std::vector< Reference<ILogSystem::IPeekCursor> > const& serverCursors, Version begin );
MergedPeekCursor( std::vector<Reference<AsyncVar<OptionalInterface<TLogInterface>>>> const& logServers, int bestServer, int readQuorum, Tag tag, Version begin, Version end, bool parallelGetMore, std::vector<LocalityData> const& tLogLocalities, Reference<IReplicationPolicy> const tLogPolicy, int tLogReplicationFactor );
@ -453,7 +454,7 @@ struct ILogSystem {
virtual void nextMessage();
virtual StringRef getMessage();
virtual StringRef getMessageWithTags();
virtual const std::vector<Tag>& getTags();
virtual VectorRef<Tag> getTags();
virtual void advanceTo(LogMessageVersion n);
virtual Future<Void> getMore(TaskPriority taskID = TaskPriority::TLogPeekReply);
virtual Future<Void> onFailed();
@ -484,6 +485,7 @@ struct ILogSystem {
bool hasNextMessage;
bool useBestSet;
UID randomID;
Future<Void> more;
SetPeekCursor( std::vector<Reference<LogSet>> const& logSets, int bestSet, int bestServer, Tag tag, Version begin, Version end, bool parallelGetMore );
SetPeekCursor( std::vector<Reference<LogSet>> const& logSets, std::vector< std::vector< Reference<IPeekCursor> > > const& serverCursors, LogMessageVersion const& messageVersion, int bestSet, int bestServer, Optional<LogMessageVersion> nextVersion, bool useBestSet );
@ -498,7 +500,7 @@ struct ILogSystem {
virtual void nextMessage();
virtual StringRef getMessage();
virtual StringRef getMessageWithTags();
virtual const std::vector<Tag>& getTags();
virtual VectorRef<Tag> getTags();
virtual void advanceTo(LogMessageVersion n);
virtual Future<Void> getMore(TaskPriority taskID = TaskPriority::TLogPeekReply);
virtual Future<Void> onFailed();
@ -532,7 +534,7 @@ struct ILogSystem {
virtual void nextMessage();
virtual StringRef getMessage();
virtual StringRef getMessageWithTags();
virtual const std::vector<Tag>& getTags();
virtual VectorRef<Tag> getTags();
virtual void advanceTo(LogMessageVersion n);
virtual Future<Void> getMore(TaskPriority taskID = TaskPriority::TLogPeekReply);
virtual Future<Void> onFailed();
@ -555,12 +557,12 @@ struct ILogSystem {
struct BufferedMessage {
Arena arena;
StringRef message;
std::vector<Tag> tags;
VectorRef<Tag> tags;
LogMessageVersion version;
BufferedMessage() {}
explicit BufferedMessage( Version version ) : version(version) {}
BufferedMessage( Arena arena, StringRef message, const std::vector<Tag>& tags, const LogMessageVersion& version ) : arena(arena), message(message), tags(tags), version(version) {}
BufferedMessage( Arena arena, StringRef message, const VectorRef<Tag>& tags, const LogMessageVersion& version ) : arena(arena), message(message), tags(tags), version(version) {}
bool operator < (BufferedMessage const& r) const {
return version < r.version;
@ -572,23 +574,28 @@ struct ILogSystem {
};
std::vector<Reference<IPeekCursor>> cursors;
std::vector<Deque<BufferedMessage>> cursorMessages;
std::vector<BufferedMessage> messages;
int messageIndex;
LogMessageVersion messageVersion;
Version end;
bool hasNextMessage;
bool withTags;
bool knownUnique;
Version minKnownCommittedVersion;
Version poppedVersion;
Version initialPoppedVersion;
bool canDiscardPopped;
Future<Void> more;
int targetQueueSize;
UID randomID;
//FIXME: collectTags is needed to support upgrades from 5.X to 6.0. Remove this code when we no longer support that upgrade.
bool collectTags;
std::vector<Tag> tags;
void combineMessages();
BufferedCursor( std::vector<Reference<IPeekCursor>> cursors, Version begin, Version end, bool withTags, bool collectTags, bool canDiscardPopped );
BufferedCursor( std::vector<Reference<AsyncVar<OptionalInterface<TLogInterface>>>> const& logServers, Tag tag, Version begin, Version end, bool parallelGetMore );
virtual Reference<IPeekCursor> cloneNoMore();
virtual void setProtocolVersion( ProtocolVersion version );
@ -598,7 +605,7 @@ struct ILogSystem {
virtual void nextMessage();
virtual StringRef getMessage();
virtual StringRef getMessageWithTags();
virtual const std::vector<Tag>& getTags();
virtual VectorRef<Tag> getTags();
virtual void advanceTo(LogMessageVersion n);
virtual Future<Void> getMore(TaskPriority taskID = TaskPriority::TLogPeekReply);
virtual Future<Void> onFailed();
@ -644,7 +651,7 @@ struct ILogSystem {
// Returns when the preceding changes are durable. (Later we will need multiple return signals for diffferent durability levels)
// If the current epoch has ended, push will not return, and the pushed messages will not be visible in any subsequent epoch (but may become visible in this epoch)
virtual Reference<IPeekCursor> peek( UID dbgid, Version begin, Tag tag, bool parallelGetMore = false ) = 0;
virtual Reference<IPeekCursor> peek( UID dbgid, Version begin, Optional<Version> end, Tag tag, bool parallelGetMore = false ) = 0;
// Returns (via cursor interface) a stream of messages with the given tag and message versions >= (begin, 0), ordered by message version
// If pop was previously or concurrently called with upTo > begin, the cursor may not return all such messages. In that case cursor->popped() will
// be greater than begin to reflect that.
@ -710,7 +717,11 @@ struct ILogSystem {
virtual Future<Void> onLogSystemConfigChange() = 0;
// Returns when the log system configuration has changed due to a tlog rejoin.
virtual void getPushLocations(std::vector<Tag> const& tags, std::vector<int>& locations, bool allLocations = false) = 0;
virtual void getPushLocations(VectorRef<Tag> tags, std::vector<int>& locations, bool allLocations = false) = 0;
void getPushLocations(std::vector<Tag> const& tags, std::vector<int>& locations, bool allLocations = false) {
getPushLocations(VectorRef<Tag>((Tag*)&tags.front(), tags.size()), locations, allLocations);
}
virtual bool hasRemoteLogs() const = 0;

View File

@ -92,10 +92,7 @@ void ILogSystem::ServerPeekCursor::nextMessage() {
uint16_t tagCount;
rd.checkpoint();
rd >> messageLength >> messageVersion.sub >> tagCount;
tags.resize(tagCount);
for(int i = 0; i < tagCount; i++) {
rd >> tags[i];
}
tags = VectorRef<Tag>((Tag*)rd.readBytes(tagCount*sizeof(Tag)), tagCount);
rawLength = messageLength + sizeof(messageLength);
messageLength -= (sizeof(messageVersion.sub) + sizeof(tagCount) + tagCount*sizeof(Tag));
hasMsg = true;
@ -112,7 +109,7 @@ StringRef ILogSystem::ServerPeekCursor::getMessageWithTags() {
return StringRef( (uint8_t const*)rd.readBytes(rawLength), rawLength);
}
const std::vector<Tag>& ILogSystem::ServerPeekCursor::getTags() {
VectorRef<Tag> ILogSystem::ServerPeekCursor::getTags() {
return tags;
}
@ -438,7 +435,7 @@ StringRef ILogSystem::MergedPeekCursor::getMessageWithTags() {
return serverCursors[currentCursor]->getMessageWithTags();
}
const std::vector<Tag>& ILogSystem::MergedPeekCursor::getTags() {
VectorRef<Tag> ILogSystem::MergedPeekCursor::getTags() {
return serverCursors[currentCursor]->getTags();
}
@ -477,6 +474,10 @@ ACTOR Future<Void> mergedPeekGetMore(ILogSystem::MergedPeekCursor* self, LogMess
}
Future<Void> ILogSystem::MergedPeekCursor::getMore(TaskPriority taskID) {
if( more.isValid() && !more.isReady() ) {
return more;
}
if(!serverCursors.size())
return Never();
@ -490,7 +491,8 @@ Future<Void> ILogSystem::MergedPeekCursor::getMore(TaskPriority taskID) {
if (version() > startVersion)
return Void();
return mergedPeekGetMore(this, startVersion, taskID);
more = mergedPeekGetMore(this, startVersion, taskID);
return more;
}
Future<Void> ILogSystem::MergedPeekCursor::onFailed() {
@ -697,7 +699,7 @@ StringRef ILogSystem::SetPeekCursor::getMessage() { return serverCursors[current
StringRef ILogSystem::SetPeekCursor::getMessageWithTags() { return serverCursors[currentSet][currentCursor]->getMessageWithTags(); }
const std::vector<Tag>& ILogSystem::SetPeekCursor::getTags() {
VectorRef<Tag> ILogSystem::SetPeekCursor::getTags() {
return serverCursors[currentSet][currentCursor]->getTags();
}
@ -778,6 +780,10 @@ ACTOR Future<Void> setPeekGetMore(ILogSystem::SetPeekCursor* self, LogMessageVer
}
Future<Void> ILogSystem::SetPeekCursor::getMore(TaskPriority taskID) {
if( more.isValid() && !more.isReady() ) {
return more;
}
auto startVersion = version();
calcHasMessage();
if( hasMessage() )
@ -788,7 +794,8 @@ Future<Void> ILogSystem::SetPeekCursor::getMore(TaskPriority taskID) {
if (version() > startVersion)
return Void();
return setPeekGetMore(this, startVersion, taskID);
more = setPeekGetMore(this, startVersion, taskID);
return more;
}
Future<Void> ILogSystem::SetPeekCursor::onFailed() {
@ -859,7 +866,7 @@ StringRef ILogSystem::MultiCursor::getMessageWithTags() {
return cursors.back()->getMessageWithTags();
}
const std::vector<Tag>& ILogSystem::MultiCursor::getTags() {
VectorRef<Tag> ILogSystem::MultiCursor::getTags() {
return cursors.back()->getTags();
}
@ -909,8 +916,20 @@ Version ILogSystem::MultiCursor::popped() {
return std::max(poppedVersion, cursors.back()->popped());
}
ILogSystem::BufferedCursor::BufferedCursor( std::vector<Reference<IPeekCursor>> cursors, Version begin, Version end, bool withTags, bool collectTags, bool canDiscardPopped ) : cursors(cursors), messageVersion(begin), end(end), withTags(withTags), collectTags(collectTags), hasNextMessage(false), messageIndex(0), poppedVersion(0), initialPoppedVersion(0), canDiscardPopped(canDiscardPopped) {
messages.reserve(10000);
ILogSystem::BufferedCursor::BufferedCursor( std::vector<Reference<IPeekCursor>> cursors, Version begin, Version end, bool withTags, bool collectTags, bool canDiscardPopped ) : cursors(cursors), messageVersion(begin), end(end), withTags(withTags), collectTags(collectTags), hasNextMessage(false), messageIndex(0), poppedVersion(0), initialPoppedVersion(0), canDiscardPopped(canDiscardPopped), knownUnique(false), minKnownCommittedVersion(0), randomID(deterministicRandom()->randomUniqueID()) {
targetQueueSize = SERVER_KNOBS->DESIRED_OUTSTANDING_MESSAGES/cursors.size();
messages.reserve(SERVER_KNOBS->DESIRED_OUTSTANDING_MESSAGES);
cursorMessages.resize(cursors.size());
}
ILogSystem::BufferedCursor::BufferedCursor( std::vector<Reference<AsyncVar<OptionalInterface<TLogInterface>>>> const& logServers, Tag tag, Version begin, Version end, bool parallelGetMore ) : messageVersion(begin), end(end), withTags(true), collectTags(false), hasNextMessage(false), messageIndex(0), poppedVersion(0), initialPoppedVersion(0), canDiscardPopped(false), knownUnique(true), minKnownCommittedVersion(0), randomID(deterministicRandom()->randomUniqueID()) {
targetQueueSize = SERVER_KNOBS->DESIRED_OUTSTANDING_MESSAGES/logServers.size();
messages.reserve(SERVER_KNOBS->DESIRED_OUTSTANDING_MESSAGES);
cursorMessages.resize(logServers.size());
for( int i = 0; i < logServers.size(); i++ ) {
Reference<ILogSystem::ServerPeekCursor> cursor( new ILogSystem::ServerPeekCursor( logServers[i], tag, begin, end, false, parallelGetMore ) );
cursors.push_back( cursor );
}
}
void ILogSystem::BufferedCursor::combineMessages() {
@ -918,7 +937,7 @@ void ILogSystem::BufferedCursor::combineMessages() {
return;
}
tags.clear();
std::vector<Tag> tags;
tags.push_back(messages[messageIndex].tags[0]);
for(int i = messageIndex + 1; i < messages.size() && messages[messageIndex].version == messages[i].version; i++) {
tags.push_back(messages[i].tags[0]);
@ -927,14 +946,17 @@ void ILogSystem::BufferedCursor::combineMessages() {
auto& msg = messages[messageIndex];
BinaryWriter messageWriter(Unversioned());
messageWriter << uint32_t(msg.message.size() + sizeof(uint32_t) + sizeof(uint16_t) + tags.size()*sizeof(Tag)) << msg.version.sub << uint16_t(tags.size());
for(auto& t : tags) {
for(auto t : tags) {
messageWriter << t;
}
messageWriter.serializeBytes(msg.message);
Standalone<StringRef> val = messageWriter.toValue();
msg.arena = val.arena();
msg.tags = tags;
msg.message = val;
msg.tags = VectorRef<Tag>();
for(auto t : tags) {
msg.tags.push_back(msg.arena, t);
}
}
Reference<ILogSystem::IPeekCursor> ILogSystem::BufferedCursor::cloneNoMore() {
@ -981,7 +1003,7 @@ StringRef ILogSystem::BufferedCursor::getMessageWithTags() {
return messages[messageIndex].message;
}
const std::vector<Tag>& ILogSystem::BufferedCursor::getTags() {
VectorRef<Tag> ILogSystem::BufferedCursor::getTags() {
ASSERT(withTags);
return messages[messageIndex].tags;
}
@ -990,26 +1012,24 @@ void ILogSystem::BufferedCursor::advanceTo(LogMessageVersion n) {
ASSERT(false);
}
ACTOR Future<Void> bufferedGetMoreLoader( ILogSystem::BufferedCursor* self, Reference<ILogSystem::IPeekCursor> cursor, Version maxVersion, TaskPriority taskID ) {
if(cursor->version().version >= maxVersion) {
return Void();
}
ACTOR Future<Void> bufferedGetMoreLoader( ILogSystem::BufferedCursor* self, Reference<ILogSystem::IPeekCursor> cursor, int idx, TaskPriority taskID ) {
loop {
wait(yield());
if(cursor->version().version >= self->end || self->cursorMessages[idx].size() > self->targetQueueSize) {
return Void();
}
wait(cursor->getMore(taskID));
self->poppedVersion = std::max(self->poppedVersion, cursor->popped());
self->minKnownCommittedVersion = std::max(self->minKnownCommittedVersion, cursor->getMinKnownCommittedVersion());
if(self->canDiscardPopped) {
self->initialPoppedVersion = std::max(self->initialPoppedVersion, cursor->popped());
}
if(cursor->version().version >= maxVersion) {
if(cursor->version().version >= self->end) {
return Void();
}
while(cursor->hasMessage()) {
self->messages.push_back(ILogSystem::BufferedCursor::BufferedMessage(cursor->arena(), (!self->withTags || self->collectTags) ? cursor->getMessage() : cursor->getMessageWithTags(), !self->withTags ? std::vector<Tag>() : cursor->getTags(), cursor->version()));
self->cursorMessages[idx].push_back(ILogSystem::BufferedCursor::BufferedMessage(cursor->arena(), (!self->withTags || self->collectTags) ? cursor->getMessage() : cursor->getMessageWithTags(), !self->withTags ? VectorRef<Tag>() : cursor->getTags(), cursor->version()));
cursor->nextMessage();
if(cursor->version().version >= maxVersion) {
return Void();
}
}
}
}
@ -1020,39 +1040,57 @@ ACTOR Future<Void> bufferedGetMore( ILogSystem::BufferedCursor* self, TaskPriori
throw internal_error();
}
state Version targetVersion = std::min(self->end, self->messageVersion.version + SERVER_KNOBS->VERSIONS_PER_BATCH);
self->messages.clear();
std::vector<Future<Void>> loaders;
loaders.reserve(self->cursors.size());
for(auto& cursor : self->cursors) {
loaders.push_back(bufferedGetMoreLoader(self, cursor, targetVersion, taskID));
}
wait( waitForAll(loaders) );
wait(yield());
if(self->collectTags) {
for(int i = 0; i < self->cursors.size(); i++) {
loaders.push_back(bufferedGetMoreLoader(self, self->cursors[i], i, taskID));
}
state Future<Void> allLoaders = waitForAll(loaders);
state Version minVersion;
loop {
wait( allLoaders || delay(SERVER_KNOBS->DESIRED_GET_MORE_DELAY, taskID) );
minVersion = self->end;
for(auto cursor : self->cursors) {
minVersion = std::min(minVersion, cursor->version().version);
}
if(minVersion > self->messageVersion.version) {
break;
}
if(allLoaders.isReady()) {
wait(Future<Void>(Never()));
}
}
wait( yield() );
for(auto &it : self->cursorMessages) {
while(!it.empty() && it.front().version.version < minVersion) {
self->messages.push_back(it.front());
it.pop_front();
}
}
if(self->collectTags || self->knownUnique) {
std::sort(self->messages.begin(), self->messages.end());
} else {
uniquify(self->messages);
}
self->messageVersion = LogMessageVersion(minVersion);
self->messageIndex = 0;
self->hasNextMessage = self->messages.size() > 0;
Version minVersion = self->end;
for(auto& cursor : self->cursors) {
minVersion = std::min(minVersion, cursor->version().version);
}
self->messageVersion = LogMessageVersion(minVersion);
if(self->collectTags) {
self->combineMessages();
}
wait(yield());
if(self->canDiscardPopped && self->poppedVersion > self->version().version) {
TraceEvent(SevWarn, "DiscardingPoppedData").detail("Version", self->version().version).detail("Popped", self->poppedVersion);
TraceEvent(SevWarn, "DiscardingPoppedData", self->randomID).detail("Version", self->version().version).detail("Popped", self->poppedVersion);
self->messageVersion = std::max(self->messageVersion, LogMessageVersion(self->poppedVersion));
for(auto& cursor : self->cursors) {
for(auto cursor : self->cursors) {
cursor->advanceTo(self->messageVersion);
}
self->messageIndex = self->messages.size();
@ -1107,8 +1145,7 @@ const LogMessageVersion& ILogSystem::BufferedCursor::version() {
}
Version ILogSystem::BufferedCursor::getMinKnownCommittedVersion() {
ASSERT(false);
return invalidVersion;
return minKnownCommittedVersion;
}
Version ILogSystem::BufferedCursor::popped() {

View File

@ -284,6 +284,7 @@ struct TLogData : NonCopyable {
std::map<Tag, Version> toBePopped; // map of Tag->Version for all the pops
// that came when ignorePopRequest was set
Reference<AsyncVar<bool>> degraded;
std::vector<TagsAndMessage> tempTagMessages;
TLogData(UID dbgid, IKeyValueStore* persistentData, IDiskQueue * persistentQueue, Reference<AsyncVar<ServerDBInfo>> dbInfo, Reference<AsyncVar<bool>> degraded, std::string folder)
: dbgid(dbgid), instanceID(deterministicRandom()->randomUniqueID().first()),
@ -890,21 +891,18 @@ void commitMessages( TLogData *self, Reference<LogData> logData, Version version
int32_t messageLength, rawLength;
uint16_t tagCount;
uint32_t sub;
std::vector<TagsAndMessage> msgs;
self->tempTagMessages.clear();
while(!rd.empty()) {
TagsAndMessage tagsAndMsg;
rd.checkpoint();
rd >> messageLength >> sub >> tagCount;
tagsAndMsg.tags.resize(tagCount);
for(int i = 0; i < tagCount; i++) {
rd >> tagsAndMsg.tags[i];
}
tagsAndMsg.tags = VectorRef<Tag>((Tag*)rd.readBytes(tagCount*sizeof(Tag)), tagCount);
rawLength = messageLength + sizeof(messageLength);
rd.rewind();
tagsAndMsg.message = StringRef((uint8_t const*)rd.readBytes(rawLength), rawLength);
msgs.push_back(std::move(tagsAndMsg));
self->tempTagMessages.push_back(std::move(tagsAndMsg));
}
commitMessages(self, logData, version, msgs);
commitMessages(self, logData, version, self->tempTagMessages);
}
Version poppedVersion( Reference<LogData> self, Tag tag) {
@ -1241,6 +1239,7 @@ ACTOR Future<Void> doQueueCommit( TLogData* self, Reference<LogData> logData, st
self->queueCommitBegin = commitNumber;
logData->queueCommittingVersion = ver;
g_network->setCurrentTask(TaskPriority::TLogCommitReply);
Future<Void> c = self->persistentQueue->commit();
self->diskQueueCommitBytes = 0;
self->largeDiskQueueCommitBytes.set(false);

View File

@ -335,6 +335,7 @@ struct TLogData : NonCopyable {
std::map<Tag, Version> toBePopped; // map of Tag->Version for all the pops
// that came when ignorePopRequest was set
Reference<AsyncVar<bool>> degraded;
std::vector<TagsAndMessage> tempTagMessages;
TLogData(UID dbgid, IKeyValueStore* persistentData, IDiskQueue * persistentQueue, Reference<AsyncVar<ServerDBInfo>> dbInfo, Reference<AsyncVar<bool>> degraded, std::string folder)
: dbgid(dbgid), instanceID(deterministicRandom()->randomUniqueID().first()),
@ -1156,21 +1157,18 @@ void commitMessages( TLogData *self, Reference<LogData> logData, Version version
int32_t messageLength, rawLength;
uint16_t tagCount;
uint32_t sub;
std::vector<TagsAndMessage> msgs;
self->tempTagMessages.clear();
while(!rd.empty()) {
TagsAndMessage tagsAndMsg;
rd.checkpoint();
rd >> messageLength >> sub >> tagCount;
tagsAndMsg.tags.resize(tagCount);
for(int i = 0; i < tagCount; i++) {
rd >> tagsAndMsg.tags[i];
}
tagsAndMsg.tags = VectorRef<Tag>((Tag*)rd.readBytes(tagCount*sizeof(Tag)), tagCount);
rawLength = messageLength + sizeof(messageLength);
rd.rewind();
tagsAndMsg.message = StringRef((uint8_t const*)rd.readBytes(rawLength), rawLength);
msgs.push_back(std::move(tagsAndMsg));
self->tempTagMessages.push_back(std::move(tagsAndMsg));
}
commitMessages(self, logData, version, msgs);
commitMessages(self, logData, version, self->tempTagMessages);
}
Version poppedVersion( Reference<LogData> self, Tag tag) {
@ -1632,6 +1630,7 @@ ACTOR Future<Void> doQueueCommit( TLogData* self, Reference<LogData> logData, st
self->queueCommitBegin = commitNumber;
logData->queueCommittingVersion = ver;
g_network->setCurrentTask(TaskPriority::TLogCommitReply);
Future<Void> c = self->persistentQueue->commit();
self->diskQueueCommitBytes = 0;
self->largeDiskQueueCommitBytes.set(false);

View File

@ -538,7 +538,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
}
Reference<IPeekCursor> peekRemote( UID dbgid, Version begin, Tag tag, bool parallelGetMore ) {
Reference<IPeekCursor> peekRemote( UID dbgid, Version begin, Optional<Version> end, Tag tag, bool parallelGetMore ) {
int bestSet = -1;
Version lastBegin = recoveredAt.present() ? recoveredAt.get() + 1 : 0;
for(int t = 0; t < tLogs.size(); t++) {
@ -552,21 +552,21 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
}
if(bestSet == -1) {
TraceEvent("TLogPeekRemoteNoBestSet", dbgid).detail("Tag", tag.toString()).detail("Begin", begin);
TraceEvent("TLogPeekRemoteNoBestSet", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end.present() ? end.get() : getPeekEnd());
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, parallelGetMore ) );
}
if(begin >= lastBegin) {
TraceEvent("TLogPeekRemoteBestOnly", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("BestSet", bestSet).detail("BestSetStart", lastBegin).detail("LogRouterIds", tLogs[bestSet]->logRouterString());
return Reference<ILogSystem::MergedPeekCursor>( new ILogSystem::MergedPeekCursor( tLogs[bestSet]->logRouters, -1, (int)tLogs[bestSet]->logRouters.size(), tag, begin, getPeekEnd(), parallelGetMore, std::vector<LocalityData>(), Reference<IReplicationPolicy>(), 0 ) );
TraceEvent("TLogPeekRemoteBestOnly", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end.present() ? end.get() : getPeekEnd()).detail("BestSet", bestSet).detail("BestSetStart", lastBegin).detail("LogRouterIds", tLogs[bestSet]->logRouterString());
return Reference<ILogSystem::BufferedCursor>( new ILogSystem::BufferedCursor( tLogs[bestSet]->logRouters, tag, begin, end.present() ? end.get() + 1 : getPeekEnd(), parallelGetMore ) );
} else {
std::vector< Reference<ILogSystem::IPeekCursor> > cursors;
std::vector< LogMessageVersion > epochEnds;
TraceEvent("TLogPeekRemoteAddingBest", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("BestSet", bestSet).detail("BestSetStart", lastBegin).detail("LogRouterIds", tLogs[bestSet]->logRouterString());
cursors.emplace_back(new ILogSystem::MergedPeekCursor( tLogs[bestSet]->logRouters, -1, (int)tLogs[bestSet]->logRouters.size(), tag, lastBegin, getPeekEnd(), parallelGetMore, std::vector<LocalityData>(), Reference<IReplicationPolicy>(), 0 ) );
TraceEvent("TLogPeekRemoteAddingBest", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end.present() ? end.get() : getPeekEnd()).detail("BestSet", bestSet).detail("BestSetStart", lastBegin).detail("LogRouterIds", tLogs[bestSet]->logRouterString());
cursors.emplace_back(new ILogSystem::BufferedCursor( tLogs[bestSet]->logRouters, tag, lastBegin, end.present() ? end.get() + 1 : getPeekEnd(), parallelGetMore ) );
int i = 0;
while(begin < lastBegin) {
if(i == oldLogData.size()) {
TraceEvent("TLogPeekRemoteDead", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("LastBegin", lastBegin).detail("OldLogDataSize", oldLogData.size());
TraceEvent("TLogPeekRemoteDead", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end.present() ? end.get() : getPeekEnd()).detail("LastBegin", lastBegin).detail("OldLogDataSize", oldLogData.size());
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, parallelGetMore ) );
}
@ -583,15 +583,14 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
}
if(bestOldSet == -1) {
TraceEvent("TLogPeekRemoteNoOldBestSet", dbgid).detail("Tag", tag.toString()).detail("Begin", begin);
TraceEvent("TLogPeekRemoteNoOldBestSet", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end.present() ? end.get() : getPeekEnd());
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, parallelGetMore ) );
}
if(thisBegin < lastBegin) {
TraceEvent("TLogPeekRemoteAddingOldBest", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("BestOldSet", bestOldSet).detail("LogRouterIds", oldLogData[i].tLogs[bestOldSet]->logRouterString())
TraceEvent("TLogPeekRemoteAddingOldBest", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end.present() ? end.get() : getPeekEnd()).detail("BestOldSet", bestOldSet).detail("LogRouterIds", oldLogData[i].tLogs[bestOldSet]->logRouterString())
.detail("LastBegin", lastBegin).detail("ThisBegin", thisBegin).detail("BestStartVer", oldLogData[i].tLogs[bestOldSet]->startVersion);
cursors.emplace_back(new ILogSystem::MergedPeekCursor(oldLogData[i].tLogs[bestOldSet]->logRouters, -1, (int)oldLogData[i].tLogs[bestOldSet]->logRouters.size(), tag,
thisBegin, lastBegin, parallelGetMore, std::vector<LocalityData>(), Reference<IReplicationPolicy>(), 0));
cursors.emplace_back(new ILogSystem::BufferedCursor(oldLogData[i].tLogs[bestOldSet]->logRouters, tag, thisBegin, lastBegin, parallelGetMore));
epochEnds.emplace_back(lastBegin);
lastBegin = thisBegin;
}
@ -602,14 +601,14 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
}
virtual Reference<IPeekCursor> peek( UID dbgid, Version begin, Tag tag, bool parallelGetMore ) {
virtual Reference<IPeekCursor> peek( UID dbgid, Version begin, Optional<Version> end, Tag tag, bool parallelGetMore ) {
if(!tLogs.size()) {
TraceEvent("TLogPeekNoLogSets", dbgid).detail("Tag", tag.toString()).detail("Begin", begin);
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, false ) );
}
if(tag.locality == tagLocalityRemoteLog) {
return peekRemote(dbgid, begin, tag, parallelGetMore);
return peekRemote(dbgid, begin, end, tag, parallelGetMore);
} else {
return peekAll(dbgid, begin, getPeekEnd(), tag, parallelGetMore);
}
@ -622,12 +621,12 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
if(tags.size() == 1) {
return peek(dbgid, begin, tags[0], parallelGetMore);
return peek(dbgid, begin, end, tags[0], parallelGetMore);
}
std::vector< Reference<ILogSystem::IPeekCursor> > cursors;
for(auto tag : tags) {
cursors.push_back(peek(dbgid, begin, tag, parallelGetMore));
cursors.push_back(peek(dbgid, begin, end, tag, parallelGetMore));
}
return Reference<ILogSystem::BufferedCursor>( new ILogSystem::BufferedCursor(cursors, begin, end.present() ? end.get() + 1 : getPeekEnd(), true, tLogs[0]->locality == tagLocalityUpgraded, false) );
}
@ -1033,7 +1032,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
ACTOR static Future<Void> popFromLog( TagPartitionedLogSystem* self, Reference<AsyncVar<OptionalInterface<TLogInterface>>> log, Tag tag, double time ) {
state Version last = 0;
loop {
wait( delay(time) );
wait( delay(time, TaskPriority::TLogPop) );
state std::pair<Version,Version> to = self->outstandingPops[ std::make_pair(log->get().id(),tag) ];
@ -1045,7 +1044,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
try {
if( !log->get().present() )
return Void();
wait(log->get().interf().popMessages.getReply( TLogPopRequest( to.first, to.second, tag ) ) );
wait(log->get().interf().popMessages.getReply( TLogPopRequest( to.first, to.second, tag ), TaskPriority::TLogPop ) );
last = to.first;
} catch (Error& e) {
@ -1270,7 +1269,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
return std::numeric_limits<Version>::max();
}
virtual void getPushLocations(std::vector<Tag> const& tags, std::vector<int>& locations, bool allLocations) {
virtual void getPushLocations(VectorRef<Tag> tags, std::vector<int>& locations, bool allLocations) {
int locationOffset = 0;
for(auto& log : tLogs) {
if(log->isLocal && log->logServers.size()) {
@ -1907,7 +1906,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
std::vector<int> locations;
for( Tag tag : localTags ) {
locations.clear();
logSet->getPushLocations( vector<Tag>(1, tag), locations, 0 );
logSet->getPushLocations( VectorRef<Tag>(&tag, 1), locations, 0 );
for(int loc : locations)
remoteTLogReqs[ loc ].recoverTags.push_back( tag );
}
@ -1923,7 +1922,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
Tag tag = i==-1 ? txsTag : Tag(tagLocalityTxs, i);
Tag pushTag = (i==-1 || nonShardedTxs) ? txsTag : Tag(tagLocalityTxs, i%self->txsTags);
locations.clear();
logSet->getPushLocations( {pushTag}, locations, 0 );
logSet->getPushLocations( VectorRef<Tag>(&pushTag, 1), locations, 0 );
for(int loc : locations)
remoteTLogReqs[ loc ].recoverTags.push_back( tag );
}
@ -2117,7 +2116,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
std::vector<int> locations;
for( Tag tag : localTags ) {
locations.clear();
logSystem->tLogs[0]->getPushLocations( vector<Tag>(1, tag), locations, 0 );
logSystem->tLogs[0]->getPushLocations( VectorRef<Tag>(&tag, 1), locations, 0 );
for(int loc : locations)
reqs[ loc ].recoverTags.push_back( tag );
}
@ -2131,7 +2130,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
Tag tag = i==-1 ? txsTag : Tag(tagLocalityTxs, i);
Tag pushTag = (i==-1 || nonShardedTxs) ? txsTag : Tag(tagLocalityTxs, i%logSystem->txsTags);
locations.clear();
logSystem->tLogs[0]->getPushLocations( vector<Tag>(1, pushTag), locations, 0 );
logSystem->tLogs[0]->getPushLocations( VectorRef<Tag>(&pushTag, 1), locations, 0 );
for(int loc : locations)
reqs[ loc ].recoverTags.push_back( tag );
}
@ -2183,7 +2182,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
// are the preferred location for id%logRouterTags.
Tag pushLocation = Tag(tagLocalityLogRouter, i%logSystem->logRouterTags);
locations.clear();
logSystem->tLogs[1]->getPushLocations( {pushLocation}, locations, 0 );
logSystem->tLogs[1]->getPushLocations( VectorRef<Tag>(&pushLocation,1), locations, 0 );
for(int loc : locations)
sreqs[ loc ].recoverTags.push_back( tag );
}
@ -2193,7 +2192,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
Tag tag = i==-1 ? txsTag : Tag(tagLocalityTxs, i);
Tag pushTag = (i==-1 || nonShardedTxs) ? txsTag : Tag(tagLocalityTxs, i%logSystem->txsTags);
locations.clear();
logSystem->tLogs[1]->getPushLocations( {pushTag}, locations, 0 );
logSystem->tLogs[1]->getPushLocations( VectorRef<Tag>(&pushTag,1), locations, 0 );
for(int loc : locations)
sreqs[ loc ].recoverTags.push_back( tag );
}