optimized tag lookups on the tlog by removing one level of vectors
This commit is contained in:
parent
d8e064d8bb
commit
0746fe4d56
|
@ -379,33 +379,24 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
|
|||
Version knownCommittedVersion;
|
||||
|
||||
Deque<std::pair<Version, Standalone<VectorRef<uint8_t>>>> messageBlocks;
|
||||
std::vector<std::vector<std::vector<Reference<TagData>>>> tag_data; //negative or positive tag.locality | abs(tag.locality) | tag.id
|
||||
|
||||
Reference<TagData> getTagData(int8_t locality, uint16_t id, std::vector<std::vector<Reference<TagData>>>& data) {
|
||||
if(locality >= data.size()) {
|
||||
data.resize(locality+1);
|
||||
}
|
||||
if(id >= data[locality].size()) {
|
||||
data[locality].resize(id+1);
|
||||
}
|
||||
return data[locality][id];
|
||||
}
|
||||
std::vector<std::vector<Reference<TagData>>> tag_data; //tag.locality | tag.id
|
||||
|
||||
Reference<TagData> getTagData(Tag tag) {
|
||||
if(tag.locality < 0) {
|
||||
return getTagData(-(1+tag.locality), tag.id, tag_data[1]);
|
||||
int idx = tag.locality >= 0 ? 2*tag.locality : 1-(2*tag.locality);
|
||||
if(idx >= tag_data.size()) {
|
||||
tag_data.resize(idx+1);
|
||||
}
|
||||
return getTagData(tag.locality, tag.id, tag_data[0]);
|
||||
if(tag.id >= tag_data[idx].size()) {
|
||||
tag_data[idx].resize(tag.id+1);
|
||||
}
|
||||
return tag_data[idx][tag.id];
|
||||
}
|
||||
|
||||
//only callable after getTagData returns a null reference
|
||||
Reference<TagData> createTagData(Tag tag, Version popped, bool nothing_persistent, bool popped_recently) {
|
||||
Reference<TagData> newTagData = Reference<TagData>( new TagData(tag, popped, nothing_persistent, popped_recently) );
|
||||
if(tag.locality < 0) {
|
||||
tag_data[1][-(1+tag.locality)][tag.id] = newTagData;
|
||||
} else {
|
||||
tag_data[0][tag.locality][tag.id] = newTagData;
|
||||
}
|
||||
int idx = tag.locality >= 0 ? 2*tag.locality : 1-(2*tag.locality);
|
||||
tag_data[idx][tag.id] = newTagData;
|
||||
return newTagData;
|
||||
}
|
||||
|
||||
|
@ -431,7 +422,6 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
|
|||
// These are initialized differently on init() or recovery
|
||||
recoveryCount(), stopped(false), initialized(false), queueCommittingVersion(0), newPersistentDataVersion(invalidVersion), unrecoveredBefore(0)
|
||||
{
|
||||
tag_data.resize(2);
|
||||
startRole(interf.id(), UID(), "TLog");
|
||||
|
||||
persistentDataVersion.init(LiteralStringRef("TLog.PersistentDataVersion"), cc.id);
|
||||
|
@ -497,12 +487,10 @@ ACTOR Future<Void> tLogLock( TLogData* self, ReplyPromise< TLogLockResult > repl
|
|||
result.end = stopVersion;
|
||||
result.knownCommittedVersion = logData->knownCommittedVersion;
|
||||
|
||||
for(int tag_special = 0; tag_special < 2; tag_special++) {
|
||||
for(int tag_locality = 0; tag_locality < logData->tag_data[tag_special].size(); tag_locality++) {
|
||||
for(int tag_id = 0; tag_id < logData->tag_data[tag_special][tag_locality].size(); tag_id++) {
|
||||
if(logData->tag_data[tag_special][tag_locality][tag_id]) {
|
||||
result.tags.push_back(logData->tag_data[tag_special][tag_locality][tag_id]->tag);
|
||||
}
|
||||
for(int tag_locality = 0; tag_locality < logData->tag_data.size(); tag_locality++) {
|
||||
for(int tag_id = 0; tag_id < logData->tag_data[tag_locality].size(); tag_id++) {
|
||||
if(logData->tag_data[tag_locality][tag_id]) {
|
||||
result.tags.push_back(logData->tag_data[tag_locality][tag_id]->tag);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -539,14 +527,12 @@ ACTOR Future<Void> updatePersistentData( TLogData* self, Reference<LogData> logD
|
|||
state bool anyData = false;
|
||||
|
||||
// For all existing tags
|
||||
state int tag_special = 0;
|
||||
state int tag_locality = 0;
|
||||
state int tag_id = 0;
|
||||
|
||||
for(tag_special = 0; tag_special < 2; tag_special++) {
|
||||
for(tag_locality = 0; tag_locality < logData->tag_data[tag_special].size(); tag_locality++) {
|
||||
for(tag_id = 0; tag_id < logData->tag_data[tag_special][tag_locality].size(); tag_id++) {
|
||||
state Reference<LogData::TagData> tagData = logData->tag_data[tag_special][tag_locality][tag_id];
|
||||
for(tag_locality = 0; tag_locality < logData->tag_data.size(); tag_locality++) {
|
||||
for(tag_id = 0; tag_id < logData->tag_data[tag_locality].size(); tag_id++) {
|
||||
state Reference<LogData::TagData> tagData = logData->tag_data[tag_locality][tag_id];
|
||||
if(tagData) {
|
||||
state Version currentVersion = 0;
|
||||
// Clear recently popped versions from persistentData if necessary
|
||||
|
@ -575,7 +561,6 @@ ACTOR Future<Void> updatePersistentData( TLogData* self, Reference<LogData> logD
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self->persistentData->set( KeyValueRef( BinaryWriter::toValue(logData->logId,Unversioned()).withPrefix(persistCurrentVersionKeys.begin), BinaryWriter::toValue(newPersistentDataVersion, Unversioned()) ) );
|
||||
logData->persistentDataVersion = newPersistentDataVersion;
|
||||
|
@ -588,16 +573,14 @@ ACTOR Future<Void> updatePersistentData( TLogData* self, Reference<LogData> logD
|
|||
TEST(anyData); // TLog moved data to persistentData
|
||||
logData->persistentDataDurableVersion = newPersistentDataVersion;
|
||||
|
||||
for(tag_special = 0; tag_special < 2; tag_special++) {
|
||||
for(tag_locality = 0; tag_locality < logData->tag_data[tag_special].size(); tag_locality++) {
|
||||
for(tag_id = 0; tag_id < logData->tag_data[tag_special][tag_locality].size(); tag_id++) {
|
||||
if(logData->tag_data[tag_special][tag_locality][tag_id]) {
|
||||
Void _ = wait(logData->tag_data[tag_special][tag_locality][tag_id]->eraseMessagesBefore( newPersistentDataVersion+1, &self->bytesDurable, logData, TaskUpdateStorage ));
|
||||
for(tag_locality = 0; tag_locality < logData->tag_data.size(); tag_locality++) {
|
||||
for(tag_id = 0; tag_id < logData->tag_data[tag_locality].size(); tag_id++) {
|
||||
if(logData->tag_data[tag_locality][tag_id]) {
|
||||
Void _ = wait(logData->tag_data[tag_locality][tag_id]->eraseMessagesBefore( newPersistentDataVersion+1, &self->bytesDurable, logData, TaskUpdateStorage ));
|
||||
Void _ = wait(yield(TaskUpdateStorage));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
logData->version_sizes.erase(logData->version_sizes.begin(), logData->version_sizes.lower_bound(logData->persistentDataDurableVersion));
|
||||
|
||||
|
@ -642,7 +625,6 @@ ACTOR Future<Void> updateStorage( TLogData* self ) {
|
|||
state Version nextVersion = 0;
|
||||
state int totalSize = 0;
|
||||
|
||||
state int tag_special = 0;
|
||||
state int tag_locality = 0;
|
||||
state int tag_id = 0;
|
||||
state Reference<LogData::TagData> tagData;
|
||||
|
@ -652,16 +634,14 @@ ACTOR Future<Void> updateStorage( TLogData* self ) {
|
|||
while(logData->persistentDataDurableVersion != logData->version.get()) {
|
||||
std::vector<std::pair<std::deque<std::pair<Version, LengthPrefixedStringRef>>::iterator, std::deque<std::pair<Version, LengthPrefixedStringRef>>::iterator>> iters;
|
||||
|
||||
for(tag_special = 0; tag_special < 2; tag_special++) {
|
||||
for(tag_locality = 0; tag_locality < logData->tag_data[tag_special].size(); tag_locality++) {
|
||||
for(tag_id = 0; tag_id < logData->tag_data[tag_special][tag_locality].size(); tag_id++) {
|
||||
tagData = logData->tag_data[tag_special][tag_locality][tag_id];
|
||||
for(tag_locality = 0; tag_locality < logData->tag_data.size(); tag_locality++) {
|
||||
for(tag_id = 0; tag_id < logData->tag_data[tag_locality].size(); tag_id++) {
|
||||
tagData = logData->tag_data[tag_locality][tag_id];
|
||||
if(tagData) {
|
||||
iters.push_back(std::make_pair(tagData->version_messages.begin(), tagData->version_messages.end()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
nextVersion = 0;
|
||||
while( totalSize < SERVER_KNOBS->UPDATE_STORAGE_BYTE_LIMIT || nextVersion <= logData->persistentDataVersion ) {
|
||||
|
@ -716,10 +696,9 @@ ACTOR Future<Void> updateStorage( TLogData* self ) {
|
|||
++sizeItr;
|
||||
nextVersion = sizeItr == logData->version_sizes.end() ? logData->version.get() : sizeItr->key;
|
||||
|
||||
for(tag_special = 0; tag_special < 2; tag_special++) {
|
||||
for(tag_locality = 0; tag_locality < logData->tag_data[tag_special].size(); tag_locality++) {
|
||||
for(tag_id = 0; tag_id < logData->tag_data[tag_special][tag_locality].size(); tag_id++) {
|
||||
tagData = logData->tag_data[tag_special][tag_locality][tag_id];
|
||||
for(tag_locality = 0; tag_locality < logData->tag_data.size(); tag_locality++) {
|
||||
for(tag_id = 0; tag_id < logData->tag_data[tag_locality].size(); tag_id++) {
|
||||
tagData = logData->tag_data[tag_locality][tag_id];
|
||||
if(tagData) {
|
||||
auto it = std::lower_bound(tagData->version_messages.begin(), tagData->version_messages.end(), std::make_pair(prevVersion, LengthPrefixedStringRef()), CompareFirst<std::pair<Version, LengthPrefixedStringRef>>());
|
||||
for(; it != tagData->version_messages.end() && it->first < nextVersion; ++it) {
|
||||
|
@ -730,7 +709,6 @@ ACTOR Future<Void> updateStorage( TLogData* self ) {
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
prevVersion = nextVersion;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue