optimized the tlog to use a vector for tags instead of a map
This commit is contained in:
parent
fecfea0f7d
commit
9c8cb445d6
|
@ -33,7 +33,7 @@ typedef StringRef KeyRef;
|
|||
typedef StringRef ValueRef;
|
||||
typedef int64_t Generation;
|
||||
|
||||
enum { tagLocalitySpecial = -100, tagLocalityLogRouter = -1, tagLocalityRemoteLog = -2, tagLocalityUpgraded = -3};
|
||||
enum { tagLocalitySpecial = -1, tagLocalityLogRouter = -2, tagLocalityRemoteLog = -3, tagLocalityUpgraded = -4}; //The TLog and LogRouter require these number to be as compact as possible
|
||||
|
||||
#pragma pack(push, 1)
|
||||
struct Tag {
|
||||
|
|
|
@ -283,8 +283,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
|
|||
init( REMOVE_RETRY_DELAY, 1.0 );
|
||||
init( MOVE_KEYS_KRM_LIMIT, 2000 ); if( randomize && BUGGIFY ) MOVE_KEYS_KRM_LIMIT = 2;
|
||||
init( MOVE_KEYS_KRM_LIMIT_BYTES, 1e5 ); if( randomize && BUGGIFY ) MOVE_KEYS_KRM_LIMIT_BYTES = 5e4; //This must be sufficiently larger than CLIENT_KNOBS->KEY_SIZE_LIMIT (fdbclient/Knobs.h) to ensure that at least two entries will be returned from an attempt to read a key range map
|
||||
init( SKIP_TAGS_GROWTH_RATE, 2.0 );
|
||||
init( MAX_SKIP_TAGS, 100 );
|
||||
init( MAX_SKIP_TAGS, 1 ); //The TLogs require tags to be densely packed to be memory efficient, so be careful increasing this knob
|
||||
|
||||
//FdbServer
|
||||
bool longReboots = randomize && BUGGIFY;
|
||||
|
|
|
@ -227,7 +227,6 @@ public:
|
|||
double REMOVE_RETRY_DELAY;
|
||||
int MOVE_KEYS_KRM_LIMIT;
|
||||
int MOVE_KEYS_KRM_LIMIT_BYTES; //This must be sufficiently larger than CLIENT_KNOBS->KEY_SIZE_LIMIT (fdbclient/Knobs.h) to ensure that at least two entries will be returned from an attempt to read a key range map
|
||||
double SKIP_TAGS_GROWTH_RATE;
|
||||
int MAX_SKIP_TAGS;
|
||||
|
||||
//FdbServer
|
||||
|
|
|
@ -720,7 +720,7 @@ ACTOR Future<std::pair<Version, Tag>> addStorageServer( Database cx, StorageServ
|
|||
throw recruitment_failed(); // There is a remote possibility that we successfully added ourselves and then someone removed us, so we have to fail
|
||||
|
||||
if(e.code() == error_code_not_committed) {
|
||||
maxSkipTags = std::min<int>(maxSkipTags * SERVER_KNOBS->SKIP_TAGS_GROWTH_RATE, SERVER_KNOBS->MAX_SKIP_TAGS);
|
||||
maxSkipTags = SERVER_KNOBS->MAX_SKIP_TAGS;
|
||||
}
|
||||
|
||||
Void _ = wait( tr.onError(e) );
|
||||
|
|
|
@ -302,22 +302,24 @@ struct TLogData : NonCopyable {
|
|||
};
|
||||
|
||||
struct LogData : NonCopyable, public ReferenceCounted<LogData> {
|
||||
struct TagData {
|
||||
struct TagData : NonCopyable, public ReferenceCounted<TagData> {
|
||||
std::deque<std::pair<Version, LengthPrefixedStringRef>> version_messages;
|
||||
bool nothing_persistent; // true means tag is *known* to have no messages in persistentData. false means nothing.
|
||||
bool popped_recently; // `popped` has changed since last updatePersistentData
|
||||
Version popped; // see popped version tracking contract below
|
||||
bool update_version_sizes;
|
||||
Tag tag;
|
||||
|
||||
TagData( Version popped, bool nothing_persistent, bool popped_recently, Tag tag ) : nothing_persistent(nothing_persistent), popped(popped), popped_recently(popped_recently), update_version_sizes(tag != txsTag) {}
|
||||
TagData( Tag tag, Version popped, bool nothing_persistent, bool popped_recently ) : tag(tag), nothing_persistent(nothing_persistent), popped(popped), popped_recently(popped_recently), update_version_sizes(tag != txsTag) {}
|
||||
|
||||
TagData(TagData&& r) noexcept(true) : version_messages(std::move(r.version_messages)), nothing_persistent(r.nothing_persistent), popped_recently(r.popped_recently), popped(r.popped), update_version_sizes(r.update_version_sizes) {}
|
||||
TagData(TagData&& r) noexcept(true) : version_messages(std::move(r.version_messages)), nothing_persistent(r.nothing_persistent), popped_recently(r.popped_recently), popped(r.popped), update_version_sizes(r.update_version_sizes), tag(r.tag) {}
|
||||
void operator= (TagData&& r) noexcept(true) {
|
||||
version_messages = std::move(r.version_messages);
|
||||
nothing_persistent = r.nothing_persistent;
|
||||
popped_recently = r.popped_recently;
|
||||
popped = r.popped;
|
||||
update_version_sizes = r.update_version_sizes;
|
||||
tag = r.tag;
|
||||
}
|
||||
|
||||
// Erase messages not needed to update *from* versions >= before (thus, messages with toversion <= before)
|
||||
|
@ -376,7 +378,35 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
|
|||
Version knownCommittedVersion;
|
||||
|
||||
Deque<std::pair<Version, Standalone<VectorRef<uint8_t>>>> messageBlocks;
|
||||
Map< Tag, TagData > tag_data;
|
||||
std::vector<std::vector<std::vector<Reference<TagData>>>> tag_data; //negative or positive tag.locality | abs(tag.locality) | tag.id
|
||||
|
||||
Reference<TagData> getTagData(int8_t locality, uint16_t id, std::vector<std::vector<Reference<TagData>>>& data) {
|
||||
if(locality >= data.size()) {
|
||||
data.resize(locality+1);
|
||||
}
|
||||
if(id >= data[locality].size()) {
|
||||
data[locality].resize(id+1);
|
||||
}
|
||||
return data[locality][id];
|
||||
}
|
||||
|
||||
Reference<TagData> getTagData(Tag tag) {
|
||||
if(tag.locality < 0) {
|
||||
return getTagData(-(1+tag.locality), tag.id, tag_data[1]);
|
||||
}
|
||||
return getTagData(tag.locality, tag.id, tag_data[0]);
|
||||
}
|
||||
|
||||
//only callable after getTagData returns a null reference
|
||||
Reference<TagData> createTagData(Tag tag, Version popped, bool nothing_persistent, bool popped_recently) {
|
||||
Reference<TagData> newTagData = Reference<TagData>( new TagData(tag, popped, nothing_persistent, popped_recently) );
|
||||
if(tag.locality < 0) {
|
||||
tag_data[1][-(1+tag.locality)][tag.id] = newTagData;
|
||||
} else {
|
||||
tag_data[0][tag.locality][tag.id] = newTagData;
|
||||
}
|
||||
return newTagData;
|
||||
}
|
||||
|
||||
Map<Version, std::pair<int,int>> version_sizes;
|
||||
|
||||
|
@ -400,6 +430,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
|
|||
// These are initialized differently on init() or recovery
|
||||
recoveryCount(), stopped(false), initialized(false), queueCommittingVersion(0), newPersistentDataVersion(invalidVersion), unrecoveredBefore(0)
|
||||
{
|
||||
tag_data.resize(2);
|
||||
startRole(interf.id(), UID(), "TLog");
|
||||
|
||||
persistentDataVersion.init(LiteralStringRef("TLog.PersistentDataVersion"), cc.id);
|
||||
|
@ -464,28 +495,35 @@ ACTOR Future<Void> tLogLock( TLogData* self, ReplyPromise< TLogLockResult > repl
|
|||
TLogLockResult result;
|
||||
result.end = stopVersion;
|
||||
result.knownCommittedVersion = logData->knownCommittedVersion;
|
||||
for( auto & tag : logData->tag_data )
|
||||
result.tags.push_back( tag.key );
|
||||
|
||||
for(int tag_special = 0; tag_special < 2; tag_special++) {
|
||||
for(int tag_locality = 0; tag_locality < logData->tag_data[tag_special].size(); tag_locality++) {
|
||||
for(int tag_id = 0; tag_id < logData->tag_data[tag_special][tag_locality].size(); tag_id++) {
|
||||
if(logData->tag_data[tag_special][tag_locality][tag_id]) {
|
||||
result.tags.push_back(logData->tag_data[tag_special][tag_locality][tag_id]->tag);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TraceEvent("TLogStop2", self->dbgid).detail("logId", logData->logId).detail("Ver", stopVersion).detail("isStopped", logData->stopped).detail("queueCommitted", logData->queueCommittedVersion.get()).detail("tags", describe(result.tags));
|
||||
|
||||
|
||||
reply.send( result );
|
||||
return Void();
|
||||
}
|
||||
|
||||
void updatePersistentPopped( TLogData* self, Reference<LogData> logData, Tag tag, LogData::TagData& data ) {
|
||||
if (!data.popped_recently) return;
|
||||
self->persistentData->set(KeyValueRef( persistTagPoppedKey(logData->logId, tag), persistTagPoppedValue(data.popped) ));
|
||||
data.popped_recently = false;
|
||||
void updatePersistentPopped( TLogData* self, Reference<LogData> logData, Reference<LogData::TagData> data ) {
|
||||
if (!data->popped_recently) return;
|
||||
self->persistentData->set(KeyValueRef( persistTagPoppedKey(logData->logId, data->tag), persistTagPoppedValue(data->popped) ));
|
||||
data->popped_recently = false;
|
||||
|
||||
if (data.nothing_persistent) return;
|
||||
if (data->nothing_persistent) return;
|
||||
|
||||
self->persistentData->clear( KeyRangeRef(
|
||||
persistTagMessagesKey( logData->logId, tag, Version(0) ),
|
||||
persistTagMessagesKey( logData->logId, tag, data.popped ) ) );
|
||||
if (data.popped > logData->persistentDataVersion)
|
||||
data.nothing_persistent = true;
|
||||
persistTagMessagesKey( logData->logId, data->tag, Version(0) ),
|
||||
persistTagMessagesKey( logData->logId, data->tag, data->popped ) ) );
|
||||
if (data->popped > logData->persistentDataVersion)
|
||||
data->nothing_persistent = true;
|
||||
}
|
||||
|
||||
ACTOR Future<Void> updatePersistentData( TLogData* self, Reference<LogData> logData, Version newPersistentDataVersion ) {
|
||||
|
@ -498,33 +536,44 @@ ACTOR Future<Void> updatePersistentData( TLogData* self, Reference<LogData> logD
|
|||
//TraceEvent("updatePersistentData", self->dbgid).detail("seq", newPersistentDataSeq);
|
||||
|
||||
state bool anyData = false;
|
||||
state Map<Tag, LogData::TagData>::iterator tag;
|
||||
|
||||
// For all existing tags
|
||||
for(tag = logData->tag_data.begin(); tag != logData->tag_data.end(); ++tag) {
|
||||
state Version currentVersion = 0;
|
||||
// Clear recently popped versions from persistentData if necessary
|
||||
updatePersistentPopped( self, logData, tag->key, tag->value );
|
||||
// Transfer unpopped messages with version numbers less than newPersistentDataVersion to persistentData
|
||||
state std::deque<std::pair<Version, LengthPrefixedStringRef>>::iterator msg = tag->value.version_messages.begin();
|
||||
while(msg != tag->value.version_messages.end() && msg->first <= newPersistentDataVersion) {
|
||||
currentVersion = msg->first;
|
||||
anyData = true;
|
||||
tag->value.nothing_persistent = false;
|
||||
BinaryWriter wr( Unversioned() );
|
||||
state int tag_special = 0;
|
||||
state int tag_locality = 0;
|
||||
state int tag_id = 0;
|
||||
|
||||
for(; msg != tag->value.version_messages.end() && msg->first == currentVersion; ++msg)
|
||||
wr << msg->second.toStringRef();
|
||||
for(tag_special = 0; tag_special < 2; tag_special++) {
|
||||
for(tag_locality = 0; tag_locality < logData->tag_data[tag_special].size(); tag_locality++) {
|
||||
for(tag_id = 0; tag_id < logData->tag_data[tag_special][tag_locality].size(); tag_id++) {
|
||||
state Reference<LogData::TagData> tagData = logData->tag_data[tag_special][tag_locality][tag_id];
|
||||
if(tagData) {
|
||||
state Version currentVersion = 0;
|
||||
// Clear recently popped versions from persistentData if necessary
|
||||
updatePersistentPopped( self, logData, tagData );
|
||||
// Transfer unpopped messages with version numbers less than newPersistentDataVersion to persistentData
|
||||
state std::deque<std::pair<Version, LengthPrefixedStringRef>>::iterator msg = tagData->version_messages.begin();
|
||||
while(msg != tagData->version_messages.end() && msg->first <= newPersistentDataVersion) {
|
||||
currentVersion = msg->first;
|
||||
anyData = true;
|
||||
tagData->nothing_persistent = false;
|
||||
BinaryWriter wr( Unversioned() );
|
||||
|
||||
self->persistentData->set( KeyValueRef( persistTagMessagesKey( logData->logId, tag->key, currentVersion ), wr.toStringRef() ) );
|
||||
for(; msg != tagData->version_messages.end() && msg->first == currentVersion; ++msg)
|
||||
wr << msg->second.toStringRef();
|
||||
|
||||
Future<Void> f = yield(TaskUpdateStorage);
|
||||
if(!f.isReady()) {
|
||||
Void _ = wait(f);
|
||||
msg = std::upper_bound(tag->value.version_messages.begin(), tag->value.version_messages.end(), std::make_pair(currentVersion, LengthPrefixedStringRef()), CompareFirst<std::pair<Version, LengthPrefixedStringRef>>());
|
||||
self->persistentData->set( KeyValueRef( persistTagMessagesKey( logData->logId, tagData->tag, currentVersion ), wr.toStringRef() ) );
|
||||
|
||||
Future<Void> f = yield(TaskUpdateStorage);
|
||||
if(!f.isReady()) {
|
||||
Void _ = wait(f);
|
||||
msg = std::upper_bound(tagData->version_messages.begin(), tagData->version_messages.end(), std::make_pair(currentVersion, LengthPrefixedStringRef()), CompareFirst<std::pair<Version, LengthPrefixedStringRef>>());
|
||||
}
|
||||
}
|
||||
|
||||
Void _ = wait(yield(TaskUpdateStorage));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Void _ = wait(yield(TaskUpdateStorage));
|
||||
}
|
||||
|
||||
self->persistentData->set( KeyValueRef( BinaryWriter::toValue(logData->logId,Unversioned()).withPrefix(persistCurrentVersionKeys.begin), BinaryWriter::toValue(newPersistentDataVersion, Unversioned()) ) );
|
||||
|
@ -538,9 +587,15 @@ ACTOR Future<Void> updatePersistentData( TLogData* self, Reference<LogData> logD
|
|||
TEST(anyData); // TLog moved data to persistentData
|
||||
logData->persistentDataDurableVersion = newPersistentDataVersion;
|
||||
|
||||
for(tag = logData->tag_data.begin(); tag != logData->tag_data.end(); ++tag) {
|
||||
Void _ = wait(tag->value.eraseMessagesBefore( newPersistentDataVersion+1, &self->bytesDurable, logData, TaskUpdateStorage ));
|
||||
Void _ = wait(yield(TaskUpdateStorage));
|
||||
for(tag_special = 0; tag_special < 2; tag_special++) {
|
||||
for(tag_locality = 0; tag_locality < logData->tag_data[tag_special].size(); tag_locality++) {
|
||||
for(tag_id = 0; tag_id < logData->tag_data[tag_special][tag_locality].size(); tag_id++) {
|
||||
if(logData->tag_data[tag_special][tag_locality][tag_id]) {
|
||||
Void _ = wait(logData->tag_data[tag_special][tag_locality][tag_id]->eraseMessagesBefore( newPersistentDataVersion+1, &self->bytesDurable, logData, TaskUpdateStorage ));
|
||||
Void _ = wait(yield(TaskUpdateStorage));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
logData->version_sizes.erase(logData->version_sizes.begin(), logData->version_sizes.lower_bound(logData->persistentDataDurableVersion));
|
||||
|
@ -586,12 +641,26 @@ ACTOR Future<Void> updateStorage( TLogData* self ) {
|
|||
state Version nextVersion = 0;
|
||||
state int totalSize = 0;
|
||||
|
||||
state int tag_special = 0;
|
||||
state int tag_locality = 0;
|
||||
state int tag_id = 0;
|
||||
state Reference<LogData::TagData> tagData;
|
||||
|
||||
if(logData->stopped) {
|
||||
if (self->bytesInput - self->bytesDurable >= SERVER_KNOBS->TLOG_SPILL_THRESHOLD) {
|
||||
while(logData->persistentDataDurableVersion != logData->version.get()) {
|
||||
std::vector<std::pair<std::deque<std::pair<Version, LengthPrefixedStringRef>>::iterator, std::deque<std::pair<Version, LengthPrefixedStringRef>>::iterator>> iters;
|
||||
for(auto tag = logData->tag_data.begin(); tag != logData->tag_data.end(); ++tag)
|
||||
iters.push_back(std::make_pair(tag->value.version_messages.begin(), tag->value.version_messages.end()));
|
||||
|
||||
for(tag_special = 0; tag_special < 2; tag_special++) {
|
||||
for(tag_locality = 0; tag_locality < logData->tag_data[tag_special].size(); tag_locality++) {
|
||||
for(tag_id = 0; tag_id < logData->tag_data[tag_special][tag_locality].size(); tag_id++) {
|
||||
tagData = logData->tag_data[tag_special][tag_locality][tag_id];
|
||||
if(tagData) {
|
||||
iters.push_back(std::make_pair(tagData->version_messages.begin(), tagData->version_messages.end()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
nextVersion = 0;
|
||||
while( totalSize < SERVER_KNOBS->UPDATE_STORAGE_BYTE_LIMIT || nextVersion <= logData->persistentDataVersion ) {
|
||||
|
@ -646,14 +715,20 @@ ACTOR Future<Void> updateStorage( TLogData* self ) {
|
|||
++sizeItr;
|
||||
nextVersion = sizeItr == logData->version_sizes.end() ? logData->version.get() : sizeItr->key;
|
||||
|
||||
state Map<Tag, LogData::TagData>::iterator tag;
|
||||
for(tag = logData->tag_data.begin(); tag != logData->tag_data.end(); ++tag) {
|
||||
auto it = std::lower_bound(tag->value.version_messages.begin(), tag->value.version_messages.end(), std::make_pair(prevVersion, LengthPrefixedStringRef()), CompareFirst<std::pair<Version, LengthPrefixedStringRef>>());
|
||||
for(; it != tag->value.version_messages.end() && it->first < nextVersion; ++it) {
|
||||
totalSize += it->second.expectedSize();
|
||||
}
|
||||
for(tag_special = 0; tag_special < 2; tag_special++) {
|
||||
for(tag_locality = 0; tag_locality < logData->tag_data[tag_special].size(); tag_locality++) {
|
||||
for(tag_id = 0; tag_id < logData->tag_data[tag_special][tag_locality].size(); tag_id++) {
|
||||
tagData = logData->tag_data[tag_special][tag_locality][tag_id];
|
||||
if(tagData) {
|
||||
auto it = std::lower_bound(tagData->version_messages.begin(), tagData->version_messages.end(), std::make_pair(prevVersion, LengthPrefixedStringRef()), CompareFirst<std::pair<Version, LengthPrefixedStringRef>>());
|
||||
for(; it != tagData->version_messages.end() && it->first < nextVersion; ++it) {
|
||||
totalSize += it->second.expectedSize();
|
||||
}
|
||||
|
||||
Void _ = wait(yield(TaskUpdateStorage));
|
||||
Void _ = wait(yield(TaskUpdateStorage));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
prevVersion = nextVersion;
|
||||
|
@ -732,18 +807,18 @@ void commitMessages( Reference<LogData> self, Version version, const std::vector
|
|||
|
||||
block.append(block.arena(), msg.message.begin(), msg.message.size());
|
||||
for(auto& tag : msg.tags) {
|
||||
auto tsm = self->tag_data.find(tag);
|
||||
if (tsm == self->tag_data.end()) {
|
||||
tsm = self->tag_data.insert( mapPair(std::move(Tag(tag)), LogData::TagData(Version(0), true, true, tag) ), false );
|
||||
Reference<LogData::TagData> tagData = self->getTagData(tag);
|
||||
if(!tagData) {
|
||||
tagData = self->createTagData(tag, 0, true, true);
|
||||
}
|
||||
|
||||
if (version >= tsm->value.popped) {
|
||||
tsm->value.version_messages.push_back(std::make_pair(version, LengthPrefixedStringRef((uint32_t*)(block.end() - msg.message.size()))));
|
||||
if(tsm->value.version_messages.back().second.expectedSize() > SERVER_KNOBS->MAX_MESSAGE_SIZE) {
|
||||
TraceEvent(SevWarnAlways, "LargeMessage").detail("Size", tsm->value.version_messages.back().second.expectedSize());
|
||||
if (version >= tagData->popped) {
|
||||
tagData->version_messages.push_back(std::make_pair(version, LengthPrefixedStringRef((uint32_t*)(block.end() - msg.message.size()))));
|
||||
if(tagData->version_messages.back().second.expectedSize() > SERVER_KNOBS->MAX_MESSAGE_SIZE) {
|
||||
TraceEvent(SevWarnAlways, "LargeMessage").detail("Size", tagData->version_messages.back().second.expectedSize());
|
||||
}
|
||||
if (tag != txsTag) {
|
||||
expectedBytes += tsm->value.version_messages.back().second.expectedSize();
|
||||
expectedBytes += tagData->version_messages.back().second.expectedSize();
|
||||
}
|
||||
|
||||
// The factor of VERSION_MESSAGES_OVERHEAD is intended to be an overestimate of the actual memory used to store this data in a std::deque.
|
||||
|
@ -789,31 +864,30 @@ void commitMessages( Reference<LogData> self, Version version, Arena arena, Stri
|
|||
}
|
||||
|
||||
Version poppedVersion( Reference<LogData> self, Tag tag) {
|
||||
auto mapIt = self->tag_data.find(tag);
|
||||
if (mapIt == self->tag_data.end())
|
||||
auto tagData = self->getTagData(tag);
|
||||
if (!tagData)
|
||||
return Version(0);
|
||||
return mapIt->value.popped;
|
||||
return tagData->popped;
|
||||
}
|
||||
|
||||
std::deque<std::pair<Version, LengthPrefixedStringRef>> & get_version_messages( Reference<LogData> self, Tag tag ) {
|
||||
auto mapIt = self->tag_data.find(tag);
|
||||
if (mapIt == self->tag_data.end()) {
|
||||
auto tagData = self->getTagData(tag);
|
||||
if (!tagData) {
|
||||
static std::deque<std::pair<Version, LengthPrefixedStringRef>> empty;
|
||||
return empty;
|
||||
}
|
||||
return mapIt->value.version_messages;
|
||||
return tagData->version_messages;
|
||||
};
|
||||
|
||||
ACTOR Future<Void> tLogPop( TLogData* self, TLogPopRequest req, Reference<LogData> logData ) {
|
||||
auto ti = logData->tag_data.find(req.tag);
|
||||
if (ti == logData->tag_data.end()) {
|
||||
ti = logData->tag_data.insert( mapPair(std::move(Tag(req.tag)), LogData::TagData(req.to, true, true, req.tag)) );
|
||||
} else if (req.to > ti->value.popped) {
|
||||
ti->value.popped = req.to;
|
||||
ti->value.popped_recently = true;
|
||||
//if (to.epoch == self->epoch())
|
||||
auto tagData = logData->getTagData(req.tag);
|
||||
if (!tagData) {
|
||||
tagData = logData->createTagData(req.tag, req.to, true, true);
|
||||
} else if (req.to > tagData->popped) {
|
||||
tagData->popped = req.to;
|
||||
tagData->popped_recently = true;
|
||||
if ( req.to > logData->persistentDataDurableVersion )
|
||||
Void _ = wait(ti->value.eraseMessagesBefore( req.to, &self->bytesDurable, logData, TaskTLogPop ));
|
||||
Void _ = wait(tagData->eraseMessagesBefore( req.to, &self->bytesDurable, logData, TaskTLogPop ));
|
||||
//TraceEvent("TLogPop", self->dbgid).detail("Tag", req.tag).detail("To", req.to);
|
||||
}
|
||||
|
||||
|
@ -1595,8 +1669,9 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
|
|||
Tag tag = decodeTagPoppedKey(rawId, kv.key);
|
||||
Version popped = decodeTagPoppedValue(kv.value);
|
||||
TraceEvent("TLogRestorePop", logData->logId).detail("Tag", tag.toString()).detail("To", popped);
|
||||
ASSERT( logData->tag_data.find(tag) == logData->tag_data.end() );
|
||||
logData->tag_data.insert( mapPair( std::move(Tag(tag)), LogData::TagData( popped, false, false, tag )) );
|
||||
auto tagData = logData->getTagData(tag);
|
||||
ASSERT( !tagData );
|
||||
logData->createTagData(tag, popped, false, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1765,14 +1840,14 @@ ACTOR Future<Void> recoverTagFromLogSystem( TLogData* self, Reference<LogData> l
|
|||
}
|
||||
if(r) tagPopped = std::max(tagPopped, r->popped());
|
||||
|
||||
auto tsm = logData->tag_data.find(tag);
|
||||
if (tsm == logData->tag_data.end()) {
|
||||
logData->tag_data.insert( mapPair(std::move(Tag(tag)), LogData::TagData(tagPopped, false, true, tag)) );
|
||||
auto tagData = logData->getTagData(tag);
|
||||
if(!tagData) {
|
||||
tagData = logData->createTagData(tag, tagPopped, false, true);
|
||||
}
|
||||
|
||||
Void _ = wait(tLogPop( self, TLogPopRequest(tagPopped, tag), logData ));
|
||||
|
||||
updatePersistentPopped( self, logData, tag, logData->tag_data.find(tag)->value );
|
||||
updatePersistentPopped( self, logData, logData->getTagData(tag) );
|
||||
|
||||
TraceEvent("LogRecoveringTagComplete", logData->logId).detail("Tag", tag.toString()).detail("recoverAt", endVersion);
|
||||
return Void();
|
||||
|
|
Loading…
Reference in New Issue