optimized memory allocations by using VectorRef<Tag> instead of std::vector<Tag>

This commit is contained in:
Evan Tschannen 2019-11-05 18:07:30 -08:00
parent 4a597fdcce
commit a8ca47beff
7 changed files with 61 additions and 61 deletions

View File

@ -110,10 +110,10 @@ enum { txsTagOld = -1, invalidTagOld = -100 };
struct TagsAndMessage { struct TagsAndMessage {
StringRef message; StringRef message;
std::vector<Tag> tags; VectorRef<Tag> tags;
TagsAndMessage() {} TagsAndMessage() {}
TagsAndMessage(StringRef message, const std::vector<Tag>& tags) : message(message), tags(tags) {} TagsAndMessage(StringRef message, VectorRef<Tag> tags) : message(message), tags(tags) {}
}; };
struct KeyRangeRef; struct KeyRangeRef;

View File

@ -245,6 +245,7 @@ ACTOR Future<Void> pullAsyncData( LogRouterData *self ) {
state Version ver = 0; state Version ver = 0;
state std::vector<TagsAndMessage> messages; state std::vector<TagsAndMessage> messages;
state Arena arena;
while (true) { while (true) {
state bool foundMessage = r->hasMessage(); state bool foundMessage = r->hasMessage();
if (!foundMessage || r->version().version != ver) { if (!foundMessage || r->version().version != ver) {
@ -260,6 +261,7 @@ ACTOR Future<Void> pullAsyncData( LogRouterData *self ) {
lastVer = ver; lastVer = ver;
ver = r->version().version; ver = r->version().version;
messages.clear(); messages.clear();
arena = Arena();
if (!foundMessage) { if (!foundMessage) {
ver--; //ver is the next possible version we will get data for ver--; //ver is the next possible version we will get data for
@ -277,8 +279,9 @@ ACTOR Future<Void> pullAsyncData( LogRouterData *self ) {
tagAndMsg.message = r->getMessageWithTags(); tagAndMsg.message = r->getMessageWithTags();
tags.clear(); tags.clear();
self->logSet.getPushLocations(r->getTags(), tags, 0); self->logSet.getPushLocations(r->getTags(), tags, 0);
tagAndMsg.tags.reserve(arena, tags.size());
for (const auto& t : tags) { for (const auto& t : tags) {
tagAndMsg.tags.emplace_back(tagLocalityRemoteLog, t); tagAndMsg.tags.push_back(arena, Tag(tagLocalityRemoteLog, t));
} }
messages.push_back(std::move(tagAndMsg)); messages.push_back(std::move(tagAndMsg));

View File

@ -231,7 +231,7 @@ public:
return resultEntries.size() == 0; return resultEntries.size() == 0;
} }
void getPushLocations(std::vector<Tag> const& tags, std::vector<int>& locations, int locationOffset, void getPushLocations(VectorRef<Tag> tags, std::vector<int>& locations, int locationOffset,
bool allLocations = false) { bool allLocations = false) {
if(locality == tagLocalitySatellite) { if(locality == tagLocalitySatellite) {
for(auto& t : tags) { for(auto& t : tags) {
@ -309,7 +309,7 @@ struct ILogSystem {
//pre: only callable if hasMessage() returns true //pre: only callable if hasMessage() returns true
//return the tags associated with the message for the current sequence //return the tags associated with the message for the current sequence
virtual const std::vector<Tag>& getTags() = 0; virtual VectorRef<Tag> getTags() = 0;
//pre: only callable if hasMessage() returns true //pre: only callable if hasMessage() returns true
//returns the arena containing the contents of getMessage(), getMessageWithTags(), and reader() //returns the arena containing the contents of getMessage(), getMessageWithTags(), and reader()
@ -382,7 +382,7 @@ struct ILogSystem {
LogMessageVersion messageVersion, end; LogMessageVersion messageVersion, end;
Version poppedVersion; Version poppedVersion;
int32_t messageLength, rawLength; int32_t messageLength, rawLength;
std::vector<Tag> tags; VectorRef<Tag> tags;
bool hasMsg; bool hasMsg;
Future<Void> more; Future<Void> more;
UID randomID; UID randomID;
@ -405,7 +405,7 @@ struct ILogSystem {
virtual void nextMessage(); virtual void nextMessage();
virtual StringRef getMessage(); virtual StringRef getMessage();
virtual StringRef getMessageWithTags(); virtual StringRef getMessageWithTags();
virtual const std::vector<Tag>& getTags(); virtual VectorRef<Tag> getTags();
virtual void advanceTo(LogMessageVersion n); virtual void advanceTo(LogMessageVersion n);
virtual Future<Void> getMore(TaskPriority taskID = TaskPriority::TLogPeekReply); virtual Future<Void> getMore(TaskPriority taskID = TaskPriority::TLogPeekReply);
virtual Future<Void> onFailed(); virtual Future<Void> onFailed();
@ -454,7 +454,7 @@ struct ILogSystem {
virtual void nextMessage(); virtual void nextMessage();
virtual StringRef getMessage(); virtual StringRef getMessage();
virtual StringRef getMessageWithTags(); virtual StringRef getMessageWithTags();
virtual const std::vector<Tag>& getTags(); virtual VectorRef<Tag> getTags();
virtual void advanceTo(LogMessageVersion n); virtual void advanceTo(LogMessageVersion n);
virtual Future<Void> getMore(TaskPriority taskID = TaskPriority::TLogPeekReply); virtual Future<Void> getMore(TaskPriority taskID = TaskPriority::TLogPeekReply);
virtual Future<Void> onFailed(); virtual Future<Void> onFailed();
@ -500,7 +500,7 @@ struct ILogSystem {
virtual void nextMessage(); virtual void nextMessage();
virtual StringRef getMessage(); virtual StringRef getMessage();
virtual StringRef getMessageWithTags(); virtual StringRef getMessageWithTags();
virtual const std::vector<Tag>& getTags(); virtual VectorRef<Tag> getTags();
virtual void advanceTo(LogMessageVersion n); virtual void advanceTo(LogMessageVersion n);
virtual Future<Void> getMore(TaskPriority taskID = TaskPriority::TLogPeekReply); virtual Future<Void> getMore(TaskPriority taskID = TaskPriority::TLogPeekReply);
virtual Future<Void> onFailed(); virtual Future<Void> onFailed();
@ -534,7 +534,7 @@ struct ILogSystem {
virtual void nextMessage(); virtual void nextMessage();
virtual StringRef getMessage(); virtual StringRef getMessage();
virtual StringRef getMessageWithTags(); virtual StringRef getMessageWithTags();
virtual const std::vector<Tag>& getTags(); virtual VectorRef<Tag> getTags();
virtual void advanceTo(LogMessageVersion n); virtual void advanceTo(LogMessageVersion n);
virtual Future<Void> getMore(TaskPriority taskID = TaskPriority::TLogPeekReply); virtual Future<Void> getMore(TaskPriority taskID = TaskPriority::TLogPeekReply);
virtual Future<Void> onFailed(); virtual Future<Void> onFailed();
@ -557,12 +557,12 @@ struct ILogSystem {
struct BufferedMessage { struct BufferedMessage {
Arena arena; Arena arena;
StringRef message; StringRef message;
std::vector<Tag> tags; VectorRef<Tag> tags;
LogMessageVersion version; LogMessageVersion version;
BufferedMessage() {} BufferedMessage() {}
explicit BufferedMessage( Version version ) : version(version) {} explicit BufferedMessage( Version version ) : version(version) {}
BufferedMessage( Arena arena, StringRef message, const std::vector<Tag>& tags, const LogMessageVersion& version ) : arena(arena), message(message), tags(tags), version(version) {} BufferedMessage( Arena arena, StringRef message, const VectorRef<Tag>& tags, const LogMessageVersion& version ) : arena(arena), message(message), tags(tags), version(version) {}
bool operator < (BufferedMessage const& r) const { bool operator < (BufferedMessage const& r) const {
return version < r.version; return version < r.version;
@ -582,6 +582,7 @@ struct ILogSystem {
bool hasNextMessage; bool hasNextMessage;
bool withTags; bool withTags;
bool knownUnique; bool knownUnique;
Version minKnownCommittedVersion;
Version poppedVersion; Version poppedVersion;
Version initialPoppedVersion; Version initialPoppedVersion;
bool canDiscardPopped; bool canDiscardPopped;
@ -591,7 +592,7 @@ struct ILogSystem {
//FIXME: collectTags is needed to support upgrades from 5.X to 6.0. Remove this code when we no longer support that upgrade. //FIXME: collectTags is needed to support upgrades from 5.X to 6.0. Remove this code when we no longer support that upgrade.
bool collectTags; bool collectTags;
std::vector<Tag> tags; VectorRef<Tag> tags;
void combineMessages(); void combineMessages();
BufferedCursor( std::vector<Reference<IPeekCursor>> cursors, Version begin, Version end, bool withTags, bool collectTags, bool canDiscardPopped ); BufferedCursor( std::vector<Reference<IPeekCursor>> cursors, Version begin, Version end, bool withTags, bool collectTags, bool canDiscardPopped );
@ -605,7 +606,7 @@ struct ILogSystem {
virtual void nextMessage(); virtual void nextMessage();
virtual StringRef getMessage(); virtual StringRef getMessage();
virtual StringRef getMessageWithTags(); virtual StringRef getMessageWithTags();
virtual const std::vector<Tag>& getTags(); virtual VectorRef<Tag> getTags();
virtual void advanceTo(LogMessageVersion n); virtual void advanceTo(LogMessageVersion n);
virtual Future<Void> getMore(TaskPriority taskID = TaskPriority::TLogPeekReply); virtual Future<Void> getMore(TaskPriority taskID = TaskPriority::TLogPeekReply);
virtual Future<Void> onFailed(); virtual Future<Void> onFailed();
@ -717,7 +718,11 @@ struct ILogSystem {
virtual Future<Void> onLogSystemConfigChange() = 0; virtual Future<Void> onLogSystemConfigChange() = 0;
// Returns when the log system configuration has changed due to a tlog rejoin. // Returns when the log system configuration has changed due to a tlog rejoin.
virtual void getPushLocations(std::vector<Tag> const& tags, std::vector<int>& locations, bool allLocations = false) = 0; virtual void getPushLocations(VectorRef<Tag> tags, std::vector<int>& locations, bool allLocations = false) = 0;
void getPushLocations(std::vector<Tag> const& tags, std::vector<int>& locations, bool allLocations = false) {
getPushLocations(VectorRef<Tag>((Tag*)&tags.front(), tags.size()), locations, allLocations);
}
virtual bool hasRemoteLogs() const = 0; virtual bool hasRemoteLogs() const = 0;

View File

@ -92,10 +92,7 @@ void ILogSystem::ServerPeekCursor::nextMessage() {
uint16_t tagCount; uint16_t tagCount;
rd.checkpoint(); rd.checkpoint();
rd >> messageLength >> messageVersion.sub >> tagCount; rd >> messageLength >> messageVersion.sub >> tagCount;
tags.resize(tagCount); tags = VectorRef<Tag>((Tag*)rd.readBytes(tagCount*sizeof(Tag)), tagCount);
for(int i = 0; i < tagCount; i++) {
rd >> tags[i];
}
rawLength = messageLength + sizeof(messageLength); rawLength = messageLength + sizeof(messageLength);
messageLength -= (sizeof(messageVersion.sub) + sizeof(tagCount) + tagCount*sizeof(Tag)); messageLength -= (sizeof(messageVersion.sub) + sizeof(tagCount) + tagCount*sizeof(Tag));
hasMsg = true; hasMsg = true;
@ -112,7 +109,7 @@ StringRef ILogSystem::ServerPeekCursor::getMessageWithTags() {
return StringRef( (uint8_t const*)rd.readBytes(rawLength), rawLength); return StringRef( (uint8_t const*)rd.readBytes(rawLength), rawLength);
} }
const std::vector<Tag>& ILogSystem::ServerPeekCursor::getTags() { VectorRef<Tag> ILogSystem::ServerPeekCursor::getTags() {
return tags; return tags;
} }
@ -438,7 +435,7 @@ StringRef ILogSystem::MergedPeekCursor::getMessageWithTags() {
return serverCursors[currentCursor]->getMessageWithTags(); return serverCursors[currentCursor]->getMessageWithTags();
} }
const std::vector<Tag>& ILogSystem::MergedPeekCursor::getTags() { VectorRef<Tag> ILogSystem::MergedPeekCursor::getTags() {
return serverCursors[currentCursor]->getTags(); return serverCursors[currentCursor]->getTags();
} }
@ -702,7 +699,7 @@ StringRef ILogSystem::SetPeekCursor::getMessage() { return serverCursors[current
StringRef ILogSystem::SetPeekCursor::getMessageWithTags() { return serverCursors[currentSet][currentCursor]->getMessageWithTags(); } StringRef ILogSystem::SetPeekCursor::getMessageWithTags() { return serverCursors[currentSet][currentCursor]->getMessageWithTags(); }
const std::vector<Tag>& ILogSystem::SetPeekCursor::getTags() { VectorRef<Tag> ILogSystem::SetPeekCursor::getTags() {
return serverCursors[currentSet][currentCursor]->getTags(); return serverCursors[currentSet][currentCursor]->getTags();
} }
@ -869,7 +866,7 @@ StringRef ILogSystem::MultiCursor::getMessageWithTags() {
return cursors.back()->getMessageWithTags(); return cursors.back()->getMessageWithTags();
} }
const std::vector<Tag>& ILogSystem::MultiCursor::getTags() { VectorRef<Tag> ILogSystem::MultiCursor::getTags() {
return cursors.back()->getTags(); return cursors.back()->getTags();
} }
@ -919,13 +916,13 @@ Version ILogSystem::MultiCursor::popped() {
return std::max(poppedVersion, cursors.back()->popped()); return std::max(poppedVersion, cursors.back()->popped());
} }
ILogSystem::BufferedCursor::BufferedCursor( std::vector<Reference<IPeekCursor>> cursors, Version begin, Version end, bool withTags, bool collectTags, bool canDiscardPopped ) : cursors(cursors), messageVersion(begin), end(end), withTags(withTags), collectTags(collectTags), hasNextMessage(false), messageIndex(0), poppedVersion(0), initialPoppedVersion(0), canDiscardPopped(canDiscardPopped), knownUnique(false), randomID(deterministicRandom()->randomUniqueID()) { ILogSystem::BufferedCursor::BufferedCursor( std::vector<Reference<IPeekCursor>> cursors, Version begin, Version end, bool withTags, bool collectTags, bool canDiscardPopped ) : cursors(cursors), messageVersion(begin), end(end), withTags(withTags), collectTags(collectTags), hasNextMessage(false), messageIndex(0), poppedVersion(0), initialPoppedVersion(0), canDiscardPopped(canDiscardPopped), knownUnique(false), minKnownCommittedVersion(0), randomID(deterministicRandom()->randomUniqueID()) {
targetQueueSize = SERVER_KNOBS->DESIRED_OUTSTANDING_MESSAGES/cursors.size(); targetQueueSize = SERVER_KNOBS->DESIRED_OUTSTANDING_MESSAGES/cursors.size();
messages.reserve(SERVER_KNOBS->DESIRED_OUTSTANDING_MESSAGES); messages.reserve(SERVER_KNOBS->DESIRED_OUTSTANDING_MESSAGES);
cursorMessages.resize(cursors.size()); cursorMessages.resize(cursors.size());
} }
ILogSystem::BufferedCursor::BufferedCursor( std::vector<Reference<AsyncVar<OptionalInterface<TLogInterface>>>> const& logServers, Tag tag, Version begin, Version end, bool parallelGetMore ) : messageVersion(begin), end(end), withTags(true), collectTags(false), hasNextMessage(false), messageIndex(0), poppedVersion(0), initialPoppedVersion(0), canDiscardPopped(false), knownUnique(true), randomID(deterministicRandom()->randomUniqueID()) { ILogSystem::BufferedCursor::BufferedCursor( std::vector<Reference<AsyncVar<OptionalInterface<TLogInterface>>>> const& logServers, Tag tag, Version begin, Version end, bool parallelGetMore ) : messageVersion(begin), end(end), withTags(true), collectTags(false), hasNextMessage(false), messageIndex(0), poppedVersion(0), initialPoppedVersion(0), canDiscardPopped(false), knownUnique(true), minKnownCommittedVersion(0), randomID(deterministicRandom()->randomUniqueID()) {
targetQueueSize = SERVER_KNOBS->DESIRED_OUTSTANDING_MESSAGES/logServers.size(); targetQueueSize = SERVER_KNOBS->DESIRED_OUTSTANDING_MESSAGES/logServers.size();
messages.reserve(SERVER_KNOBS->DESIRED_OUTSTANDING_MESSAGES); messages.reserve(SERVER_KNOBS->DESIRED_OUTSTANDING_MESSAGES);
cursorMessages.resize(logServers.size()); cursorMessages.resize(logServers.size());
@ -940,22 +937,22 @@ void ILogSystem::BufferedCursor::combineMessages() {
return; return;
} }
tags.clear(); std::vector<Tag> tempTags;
tags.push_back(messages[messageIndex].tags[0]); tempTags.push_back(messages[messageIndex].tags[0]);
for(int i = messageIndex + 1; i < messages.size() && messages[messageIndex].version == messages[i].version; i++) { for(int i = messageIndex + 1; i < messages.size() && messages[messageIndex].version == messages[i].version; i++) {
tags.push_back(messages[i].tags[0]); tempTags.push_back(messages[i].tags[0]);
messageIndex = i; messageIndex = i;
} }
auto& msg = messages[messageIndex]; auto& msg = messages[messageIndex];
BinaryWriter messageWriter(Unversioned()); BinaryWriter messageWriter(Unversioned());
messageWriter << uint32_t(msg.message.size() + sizeof(uint32_t) + sizeof(uint16_t) + tags.size()*sizeof(Tag)) << msg.version.sub << uint16_t(tags.size()); messageWriter << uint32_t(msg.message.size() + sizeof(uint32_t) + sizeof(uint16_t) + tempTags.size()*sizeof(Tag)) << msg.version.sub << uint16_t(tags.size());
for(auto& t : tags) { msg.tags = VectorRef<Tag>((Tag*)(((uint8_t*)messageWriter.getData())+messageWriter.getLength()), tags.size());
for(auto t : tempTags) {
messageWriter << t; messageWriter << t;
} }
messageWriter.serializeBytes(msg.message); messageWriter.serializeBytes(msg.message);
Standalone<StringRef> val = messageWriter.toValue(); Standalone<StringRef> val = messageWriter.toValue();
msg.arena = val.arena(); msg.arena = val.arena();
msg.tags = tags;
msg.message = val; msg.message = val;
} }
@ -1003,7 +1000,7 @@ StringRef ILogSystem::BufferedCursor::getMessageWithTags() {
return messages[messageIndex].message; return messages[messageIndex].message;
} }
const std::vector<Tag>& ILogSystem::BufferedCursor::getTags() { VectorRef<Tag> ILogSystem::BufferedCursor::getTags() {
ASSERT(withTags); ASSERT(withTags);
return messages[messageIndex].tags; return messages[messageIndex].tags;
} }
@ -1020,6 +1017,7 @@ ACTOR Future<Void> bufferedGetMoreLoader( ILogSystem::BufferedCursor* self, Refe
} }
wait(cursor->getMore(taskID)); wait(cursor->getMore(taskID));
self->poppedVersion = std::max(self->poppedVersion, cursor->popped()); self->poppedVersion = std::max(self->poppedVersion, cursor->popped());
self->minKnownCommittedVersion = std::max(self->minKnownCommittedVersion, cursor->getMinKnownCommittedVersion());
if(self->canDiscardPopped) { if(self->canDiscardPopped) {
self->initialPoppedVersion = std::max(self->initialPoppedVersion, cursor->popped()); self->initialPoppedVersion = std::max(self->initialPoppedVersion, cursor->popped());
} }
@ -1027,7 +1025,7 @@ ACTOR Future<Void> bufferedGetMoreLoader( ILogSystem::BufferedCursor* self, Refe
return Void(); return Void();
} }
while(cursor->hasMessage()) { while(cursor->hasMessage()) {
self->cursorMessages[idx].push_back(ILogSystem::BufferedCursor::BufferedMessage(cursor->arena(), (!self->withTags || self->collectTags) ? cursor->getMessage() : cursor->getMessageWithTags(), !self->withTags ? std::vector<Tag>() : cursor->getTags(), cursor->version())); self->cursorMessages[idx].push_back(ILogSystem::BufferedCursor::BufferedMessage(cursor->arena(), (!self->withTags || self->collectTags) ? cursor->getMessage() : cursor->getMessageWithTags(), !self->withTags ? VectorRef<Tag>() : cursor->getTags(), cursor->version()));
cursor->nextMessage(); cursor->nextMessage();
} }
} }
@ -1053,7 +1051,7 @@ ACTOR Future<Void> bufferedGetMore( ILogSystem::BufferedCursor* self, TaskPriori
loop { loop {
wait( allLoaders || delay(SERVER_KNOBS->DESIRED_GET_MORE_DELAY, taskID) ); wait( allLoaders || delay(SERVER_KNOBS->DESIRED_GET_MORE_DELAY, taskID) );
minVersion = self->end; minVersion = self->end;
for(auto& cursor : self->cursors) { for(auto cursor : self->cursors) {
minVersion = std::min(minVersion, cursor->version().version); minVersion = std::min(minVersion, cursor->version().version);
} }
if(minVersion > self->messageVersion.version) { if(minVersion > self->messageVersion.version) {
@ -1089,7 +1087,7 @@ ACTOR Future<Void> bufferedGetMore( ILogSystem::BufferedCursor* self, TaskPriori
if(self->canDiscardPopped && self->poppedVersion > self->version().version) { if(self->canDiscardPopped && self->poppedVersion > self->version().version) {
TraceEvent(SevWarn, "DiscardingPoppedData", self->randomID).detail("Version", self->version().version).detail("Popped", self->poppedVersion); TraceEvent(SevWarn, "DiscardingPoppedData", self->randomID).detail("Version", self->version().version).detail("Popped", self->poppedVersion);
self->messageVersion = std::max(self->messageVersion, LogMessageVersion(self->poppedVersion)); self->messageVersion = std::max(self->messageVersion, LogMessageVersion(self->poppedVersion));
for(auto& cursor : self->cursors) { for(auto cursor : self->cursors) {
cursor->advanceTo(self->messageVersion); cursor->advanceTo(self->messageVersion);
} }
self->messageIndex = self->messages.size(); self->messageIndex = self->messages.size();
@ -1144,11 +1142,7 @@ const LogMessageVersion& ILogSystem::BufferedCursor::version() {
} }
Version ILogSystem::BufferedCursor::getMinKnownCommittedVersion() { Version ILogSystem::BufferedCursor::getMinKnownCommittedVersion() {
Version res = 0; return minKnownCommittedVersion;
for(auto& cursor : cursors) {
res = std::max(res, cursor->getMinKnownCommittedVersion());
}
return res;
} }
Version ILogSystem::BufferedCursor::popped() { Version ILogSystem::BufferedCursor::popped() {

View File

@ -284,6 +284,7 @@ struct TLogData : NonCopyable {
std::map<Tag, Version> toBePopped; // map of Tag->Version for all the pops std::map<Tag, Version> toBePopped; // map of Tag->Version for all the pops
// that came when ignorePopRequest was set // that came when ignorePopRequest was set
Reference<AsyncVar<bool>> degraded; Reference<AsyncVar<bool>> degraded;
std::vector<TagsAndMessage> tempTagMessages;
TLogData(UID dbgid, IKeyValueStore* persistentData, IDiskQueue * persistentQueue, Reference<AsyncVar<ServerDBInfo>> dbInfo, Reference<AsyncVar<bool>> degraded, std::string folder) TLogData(UID dbgid, IKeyValueStore* persistentData, IDiskQueue * persistentQueue, Reference<AsyncVar<ServerDBInfo>> dbInfo, Reference<AsyncVar<bool>> degraded, std::string folder)
: dbgid(dbgid), instanceID(deterministicRandom()->randomUniqueID().first()), : dbgid(dbgid), instanceID(deterministicRandom()->randomUniqueID().first()),
@ -890,21 +891,18 @@ void commitMessages( TLogData *self, Reference<LogData> logData, Version version
int32_t messageLength, rawLength; int32_t messageLength, rawLength;
uint16_t tagCount; uint16_t tagCount;
uint32_t sub; uint32_t sub;
std::vector<TagsAndMessage> msgs; self->tempTagMessages.clear();
while(!rd.empty()) { while(!rd.empty()) {
TagsAndMessage tagsAndMsg; TagsAndMessage tagsAndMsg;
rd.checkpoint(); rd.checkpoint();
rd >> messageLength >> sub >> tagCount; rd >> messageLength >> sub >> tagCount;
tagsAndMsg.tags.resize(tagCount); tagsAndMsg.tags = VectorRef<Tag>((Tag*)rd.readBytes(tagCount*sizeof(Tag)), tagCount);
for(int i = 0; i < tagCount; i++) {
rd >> tagsAndMsg.tags[i];
}
rawLength = messageLength + sizeof(messageLength); rawLength = messageLength + sizeof(messageLength);
rd.rewind(); rd.rewind();
tagsAndMsg.message = StringRef((uint8_t const*)rd.readBytes(rawLength), rawLength); tagsAndMsg.message = StringRef((uint8_t const*)rd.readBytes(rawLength), rawLength);
msgs.push_back(std::move(tagsAndMsg)); self->tempTagMessages.push_back(std::move(tagsAndMsg));
} }
commitMessages(self, logData, version, msgs); commitMessages(self, logData, version, self->tempTagMessages);
} }
Version poppedVersion( Reference<LogData> self, Tag tag) { Version poppedVersion( Reference<LogData> self, Tag tag) {
@ -1241,6 +1239,7 @@ ACTOR Future<Void> doQueueCommit( TLogData* self, Reference<LogData> logData, st
self->queueCommitBegin = commitNumber; self->queueCommitBegin = commitNumber;
logData->queueCommittingVersion = ver; logData->queueCommittingVersion = ver;
g_network->setCurrentTask(TaskPriority::TLogCommitReply);
Future<Void> c = self->persistentQueue->commit(); Future<Void> c = self->persistentQueue->commit();
self->diskQueueCommitBytes = 0; self->diskQueueCommitBytes = 0;
self->largeDiskQueueCommitBytes.set(false); self->largeDiskQueueCommitBytes.set(false);

View File

@ -335,6 +335,7 @@ struct TLogData : NonCopyable {
std::map<Tag, Version> toBePopped; // map of Tag->Version for all the pops std::map<Tag, Version> toBePopped; // map of Tag->Version for all the pops
// that came when ignorePopRequest was set // that came when ignorePopRequest was set
Reference<AsyncVar<bool>> degraded; Reference<AsyncVar<bool>> degraded;
std::vector<TagsAndMessage> tempTagMessages;
TLogData(UID dbgid, IKeyValueStore* persistentData, IDiskQueue * persistentQueue, Reference<AsyncVar<ServerDBInfo>> dbInfo, Reference<AsyncVar<bool>> degraded, std::string folder) TLogData(UID dbgid, IKeyValueStore* persistentData, IDiskQueue * persistentQueue, Reference<AsyncVar<ServerDBInfo>> dbInfo, Reference<AsyncVar<bool>> degraded, std::string folder)
: dbgid(dbgid), instanceID(deterministicRandom()->randomUniqueID().first()), : dbgid(dbgid), instanceID(deterministicRandom()->randomUniqueID().first()),
@ -1156,21 +1157,18 @@ void commitMessages( TLogData *self, Reference<LogData> logData, Version version
int32_t messageLength, rawLength; int32_t messageLength, rawLength;
uint16_t tagCount; uint16_t tagCount;
uint32_t sub; uint32_t sub;
std::vector<TagsAndMessage> msgs; self->tempTagMessages.clear();
while(!rd.empty()) { while(!rd.empty()) {
TagsAndMessage tagsAndMsg; TagsAndMessage tagsAndMsg;
rd.checkpoint(); rd.checkpoint();
rd >> messageLength >> sub >> tagCount; rd >> messageLength >> sub >> tagCount;
tagsAndMsg.tags.resize(tagCount); tagsAndMsg.tags = VectorRef<Tag>((Tag*)rd.readBytes(tagCount*sizeof(Tag)), tagCount);
for(int i = 0; i < tagCount; i++) {
rd >> tagsAndMsg.tags[i];
}
rawLength = messageLength + sizeof(messageLength); rawLength = messageLength + sizeof(messageLength);
rd.rewind(); rd.rewind();
tagsAndMsg.message = StringRef((uint8_t const*)rd.readBytes(rawLength), rawLength); tagsAndMsg.message = StringRef((uint8_t const*)rd.readBytes(rawLength), rawLength);
msgs.push_back(std::move(tagsAndMsg)); self->tempTagMessages.push_back(std::move(tagsAndMsg));
} }
commitMessages(self, logData, version, msgs); commitMessages(self, logData, version, self->tempTagMessages);
} }
Version poppedVersion( Reference<LogData> self, Tag tag) { Version poppedVersion( Reference<LogData> self, Tag tag) {
@ -1632,6 +1630,7 @@ ACTOR Future<Void> doQueueCommit( TLogData* self, Reference<LogData> logData, st
self->queueCommitBegin = commitNumber; self->queueCommitBegin = commitNumber;
logData->queueCommittingVersion = ver; logData->queueCommittingVersion = ver;
g_network->setCurrentTask(TaskPriority::TLogCommitReply);
Future<Void> c = self->persistentQueue->commit(); Future<Void> c = self->persistentQueue->commit();
self->diskQueueCommitBytes = 0; self->diskQueueCommitBytes = 0;
self->largeDiskQueueCommitBytes.set(false); self->largeDiskQueueCommitBytes.set(false);

View File

@ -1269,7 +1269,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
return std::numeric_limits<Version>::max(); return std::numeric_limits<Version>::max();
} }
virtual void getPushLocations(std::vector<Tag> const& tags, std::vector<int>& locations, bool allLocations) { virtual void getPushLocations(VectorRef<Tag> tags, std::vector<int>& locations, bool allLocations) {
int locationOffset = 0; int locationOffset = 0;
for(auto& log : tLogs) { for(auto& log : tLogs) {
if(log->isLocal && log->logServers.size()) { if(log->isLocal && log->logServers.size()) {
@ -1906,7 +1906,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
std::vector<int> locations; std::vector<int> locations;
for( Tag tag : localTags ) { for( Tag tag : localTags ) {
locations.clear(); locations.clear();
logSet->getPushLocations( vector<Tag>(1, tag), locations, 0 ); logSet->getPushLocations( VectorRef<Tag>(&tag, 1), locations, 0 );
for(int loc : locations) for(int loc : locations)
remoteTLogReqs[ loc ].recoverTags.push_back( tag ); remoteTLogReqs[ loc ].recoverTags.push_back( tag );
} }
@ -1922,7 +1922,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
Tag tag = i==-1 ? txsTag : Tag(tagLocalityTxs, i); Tag tag = i==-1 ? txsTag : Tag(tagLocalityTxs, i);
Tag pushTag = (i==-1 || nonShardedTxs) ? txsTag : Tag(tagLocalityTxs, i%self->txsTags); Tag pushTag = (i==-1 || nonShardedTxs) ? txsTag : Tag(tagLocalityTxs, i%self->txsTags);
locations.clear(); locations.clear();
logSet->getPushLocations( {pushTag}, locations, 0 ); logSet->getPushLocations( VectorRef<Tag>(&pushTag, 1), locations, 0 );
for(int loc : locations) for(int loc : locations)
remoteTLogReqs[ loc ].recoverTags.push_back( tag ); remoteTLogReqs[ loc ].recoverTags.push_back( tag );
} }
@ -2116,7 +2116,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
std::vector<int> locations; std::vector<int> locations;
for( Tag tag : localTags ) { for( Tag tag : localTags ) {
locations.clear(); locations.clear();
logSystem->tLogs[0]->getPushLocations( vector<Tag>(1, tag), locations, 0 ); logSystem->tLogs[0]->getPushLocations( VectorRef<Tag>(&tag, 1), locations, 0 );
for(int loc : locations) for(int loc : locations)
reqs[ loc ].recoverTags.push_back( tag ); reqs[ loc ].recoverTags.push_back( tag );
} }
@ -2130,7 +2130,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
Tag tag = i==-1 ? txsTag : Tag(tagLocalityTxs, i); Tag tag = i==-1 ? txsTag : Tag(tagLocalityTxs, i);
Tag pushTag = (i==-1 || nonShardedTxs) ? txsTag : Tag(tagLocalityTxs, i%logSystem->txsTags); Tag pushTag = (i==-1 || nonShardedTxs) ? txsTag : Tag(tagLocalityTxs, i%logSystem->txsTags);
locations.clear(); locations.clear();
logSystem->tLogs[0]->getPushLocations( vector<Tag>(1, pushTag), locations, 0 ); logSystem->tLogs[0]->getPushLocations( VectorRef<Tag>(&pushTag, 1), locations, 0 );
for(int loc : locations) for(int loc : locations)
reqs[ loc ].recoverTags.push_back( tag ); reqs[ loc ].recoverTags.push_back( tag );
} }
@ -2182,7 +2182,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
// are the preferred location for id%logRouterTags. // are the preferred location for id%logRouterTags.
Tag pushLocation = Tag(tagLocalityLogRouter, i%logSystem->logRouterTags); Tag pushLocation = Tag(tagLocalityLogRouter, i%logSystem->logRouterTags);
locations.clear(); locations.clear();
logSystem->tLogs[1]->getPushLocations( {pushLocation}, locations, 0 ); logSystem->tLogs[1]->getPushLocations( VectorRef<Tag>(&pushLocation,1), locations, 0 );
for(int loc : locations) for(int loc : locations)
sreqs[ loc ].recoverTags.push_back( tag ); sreqs[ loc ].recoverTags.push_back( tag );
} }
@ -2192,7 +2192,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
Tag tag = i==-1 ? txsTag : Tag(tagLocalityTxs, i); Tag tag = i==-1 ? txsTag : Tag(tagLocalityTxs, i);
Tag pushTag = (i==-1 || nonShardedTxs) ? txsTag : Tag(tagLocalityTxs, i%logSystem->txsTags); Tag pushTag = (i==-1 || nonShardedTxs) ? txsTag : Tag(tagLocalityTxs, i%logSystem->txsTags);
locations.clear(); locations.clear();
logSystem->tLogs[1]->getPushLocations( {pushTag}, locations, 0 ); logSystem->tLogs[1]->getPushLocations( VectorRef<Tag>(&pushTag,1), locations, 0 );
for(int loc : locations) for(int loc : locations)
sreqs[ loc ].recoverTags.push_back( tag ); sreqs[ loc ].recoverTags.push_back( tag );
} }