Use TagsAndMessage for deserialization in TLogServer

This commit is contained in:
Jingyu Zhou 2019-09-05 13:25:02 -07:00
parent 2723922f5f
commit 2d5ebebb7b
2 changed files with 15 additions and 26 deletions

View File

@ -116,8 +116,10 @@ struct TagsAndMessage {
TagsAndMessage(StringRef message, const std::vector<Tag>& tags) : message(message), tags(tags) {}
// Loads tags and message from a serialized buffer. "rd" is checkpointed at
// its begining position to allow the caller to be rewinded if needed.
void loadFromArena(ArenaReader* rd, uint32_t* messageVersionSub) {
// its begining position to allow the caller to rewind if needed.
// T can be ArenaReader or BinaryReader.
template <class T>
void loadFromArena(T* rd, uint32_t* messageVersionSub) {
int32_t messageLength;
uint16_t tagCount;
uint32_t sub;

View File

@ -1279,32 +1279,18 @@ ACTOR Future<std::vector<StringRef>> parseMessagesForTag( StringRef commitBlob,
state std::vector<StringRef> relevantMessages;
state BinaryReader rd(commitBlob, AssumeVersion(currentProtocolVersion));
while (!rd.empty()) {
uint32_t messageLength = 0;
uint32_t subsequence = 0;
uint16_t tagCount = 0;
rd >> messageLength;
rd.checkpoint();
rd >> subsequence >> tagCount;
Tag msgtag;
bool match = false;
for (int i = 0; i < tagCount; i++) {
rd >> msgtag;
if (msgtag == tag) {
match = true;
break;
} else if (tag.locality == tagLocalityLogRouter && msgtag.locality == tagLocalityLogRouter &&
msgtag.id % logRouters == tag.id) {
TagsAndMessage tagsAndMessage;
tagsAndMessage.loadFromArena(&rd, nullptr);
for (Tag t : tagsAndMessage.tags) {
if (t == tag || (tag.locality == tagLocalityLogRouter && t.locality == tagLocalityLogRouter &&
t.id % logRouters == tag.id)) {
// Mutations that are in the partially durable span between known comitted version and
// recovery version get copied to the new log generation. These commits might have had more
// log router tags than what now exist, so we mod them down to what we have.
match = true;
relevantMessages.push_back(tagsAndMessage.getRawMessage());
break;
}
}
rd.rewind();
const void* begin = rd.readBytes(messageLength);
if (match) {
relevantMessages.push_back( StringRef((uint8_t*)begin, messageLength) );
}
wait(yield());
}
return relevantMessages;
@ -1521,9 +1507,10 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
messages << VERSION_HEADER << entry.version;
std::vector<StringRef> parsedMessages = wait(parseMessagesForTag(entry.messages, req.tag, logData->logRouterTags));
for (StringRef msg : parsedMessages) {
messages << msg;
std::vector<StringRef> rawMessages =
wait(parseMessagesForTag(entry.messages, req.tag, logData->logRouterTags));
for (const StringRef& msg : rawMessages) {
messages.serializeBytes(msg);
}
lastRefMessageVersion = entry.version;