diff --git a/fdbserver/OldTLogServer.actor.cpp b/fdbserver/OldTLogServer.actor.cpp index 034b9009a2..c34e0ce699 100644 --- a/fdbserver/OldTLogServer.actor.cpp +++ b/fdbserver/OldTLogServer.actor.cpp @@ -44,24 +44,60 @@ using std::min; using std::max; namespace oldTLog { + + typedef int16_t OldTag; + + OldTag convertTag( Tag tag ) { + if(tag == invalidTag) return invalidTagOld; + if(tag == txsTag) return txsTagOld; + ASSERT(tag.id >= 0); + return tag.id; + } + + Tag convertOldTag( OldTag tag ) { + if(tag == invalidTagOld) return invalidTag; + if(tag == txsTagOld) return txsTag; + ASSERT(tag >= 0); + return Tag(tagLocalityUpgraded, tag); + } + + struct OldTagMessagesRef { + OldTag tag; + VectorRef messageOffsets; + + OldTagMessagesRef() {} + OldTagMessagesRef(Arena &a, const OldTagMessagesRef &from) : tag(from.tag), messageOffsets(a, from.messageOffsets) {} + + size_t expectedSize() const { + return messageOffsets.expectedSize(); + } + + template + void serialize(Ar& ar) { + ar & tag & messageOffsets; + } + }; + struct TLogQueueEntryRef { + UID id; Version version; Version knownCommittedVersion; StringRef messages; - VectorRef< TagMessagesRef > tags; + VectorRef< OldTagMessagesRef > tags; TLogQueueEntryRef() : version(0), knownCommittedVersion(0) {} TLogQueueEntryRef(Arena &a, TLogQueueEntryRef const &from) - : version(from.version), knownCommittedVersion(from.knownCommittedVersion), messages(a, from.messages), tags(a, from.tags) { + : version(from.version), knownCommittedVersion(from.knownCommittedVersion), id(from.id), messages(a, from.messages), tags(a, from.tags) { } template void serialize(Ar& ar) { if( ar.protocolVersion() >= 0x0FDB00A460010001) { - ar & version & messages & tags & knownCommittedVersion; + ar & version & messages & tags & knownCommittedVersion & id; } else if(ar.isDeserializing) { ar & version & messages & tags; knownCommittedVersion = 0; + id = UID(); } } size_t expectedSize() const { @@ -73,7 +109,7 @@ namespace oldTLog { struct TLogQueue : public IClosable { public: - TLogQueue( IDiskQueue* queue, UID dbgid ) : queue(queue), debugNextReadVersion(1), dbgid(dbgid) {} + TLogQueue( IDiskQueue* queue, UID dbgid ) : queue(queue), dbgid(dbgid) {} // Each packet in the queue is // uint32_t payloadSize @@ -93,7 +129,6 @@ namespace oldTLog { } void push( TLogQueueEntryRef const& qe ) { - ASSERT( version_location.empty() || version_location.lastItem()->key < qe.version ); BinaryWriter wr( Unversioned() ); // outer framing is not versioned wr << uint32_t(0); IncludeVersion().write(wr); // payload is versioned @@ -131,7 +166,6 @@ namespace oldTLog { private: IDiskQueue* queue; Map version_location; // For the version of each entry that was push()ed, the end location of the serialized bytes - Version debugNextReadVersion; UID dbgid; ACTOR static Future readNext( TLogQueue* self ) { @@ -165,8 +199,6 @@ namespace oldTLog { Arena a = e.arena(); ArenaReader ar( a, e.substr(0, payloadSize), IncludeVersion() ); ar >> result; - ASSERT( result.version >= self->debugNextReadVersion ); - self->debugNextReadVersion = result.version + 1; self->version_location[result.version] = self->queue->getNextReadLocation(); return result; } @@ -180,30 +212,108 @@ namespace oldTLog { } }; - struct LengthPrefixedStringRef { - // Represents a pointer to a string which is prefixed by a 4-byte length - // A LengthPrefixedStringRef is only pointer-sized (8 bytes vs 12 bytes for StringRef), but the corresponding string is 4 bytes bigger, and - // substring operations aren't efficient as they are with StringRef. It's a good choice when there might be lots of references to the same - // exact string. + ////// Persistence format (for self->persistentData) - uint32_t* length; + // Immutable keys + static const KeyValueRef persistFormat( LiteralStringRef( "Format" ), LiteralStringRef("FoundationDB/LogServer/2/3") ); + static const KeyRangeRef persistFormatReadableRange( LiteralStringRef("FoundationDB/LogServer/2/3"), LiteralStringRef("FoundationDB/LogServer/2/4") ); + static const KeyRangeRef persistRecoveryCountKeys = KeyRangeRef( LiteralStringRef( "DbRecoveryCount/" ), LiteralStringRef( "DbRecoveryCount0" ) ); - StringRef toStringRef() const { ASSERT(length); return StringRef( (uint8_t*)(length+1), *length ); } - int expectedSize() const { ASSERT(length); return *length; } - uint32_t* getLengthPtr() const { return length; } + // Updated on updatePersistentData() + static const KeyRangeRef persistCurrentVersionKeys = KeyRangeRef( LiteralStringRef( "version/" ), LiteralStringRef( "version0" ) ); + static const KeyRange persistTagMessagesKeys = prefixRange(LiteralStringRef("TagMsg/")); + static const KeyRange persistTagPoppedKeys = prefixRange(LiteralStringRef("TagPop/")); - LengthPrefixedStringRef() : length(NULL) {} - LengthPrefixedStringRef(uint32_t* length) : length(length) {} - }; + static Key persistTagMessagesKey( UID id, OldTag tag, Version version ) { + BinaryWriter wr( Unversioned() ); + wr.serializeBytes(persistTagMessagesKeys.begin); + wr << id; + wr << tag; + wr << bigEndian64( version ); + return wr.toStringRef(); + } - template - struct CompareFirst { - bool operator() (T const& lhs, T const& rhs) const { - return lhs.first < rhs.first; - } - }; + static Key persistTagPoppedKey( UID id, OldTag tag ) { + BinaryWriter wr(Unversioned()); + wr.serializeBytes( persistTagPoppedKeys.begin ); + wr << id; + wr << tag; + return wr.toStringRef(); + } + + static Value persistTagPoppedValue( Version popped ) { + return BinaryWriter::toValue( popped, Unversioned() ); + } + + static OldTag decodeTagPoppedKey( KeyRef id, KeyRef key ) { + OldTag s; + BinaryReader rd( key.removePrefix(persistTagPoppedKeys.begin).removePrefix(id), Unversioned() ); + rd >> s; + return s; + } + + static Version decodeTagPoppedValue( ValueRef value ) { + return BinaryReader::fromStringRef( value, Unversioned() ); + } + + static StringRef stripTagMessagesKey( StringRef key ) { + return key.substr( sizeof(UID) + sizeof(OldTag) + persistTagMessagesKeys.begin.size() ); + } + + static Version decodeTagMessagesKey( StringRef key ) { + return bigEndian64( BinaryReader::fromStringRef( stripTagMessagesKey(key), Unversioned() ) ); + } struct TLogData : NonCopyable { + AsyncTrigger newLogData; + Deque queueOrder; + std::map> id_data; + + UID dbgid; + + IKeyValueStore* persistentData; + IDiskQueue* rawPersistentQueue; + TLogQueue *persistentQueue; + + int64_t diskQueueCommitBytes; + AsyncVar largeDiskQueueCommitBytes; //becomes true when diskQueueCommitBytes is greater than MAX_QUEUE_COMMIT_BYTES + + Reference> dbInfo; + + NotifiedVersion queueCommitEnd; + Version queueCommitBegin; + AsyncTrigger newVersion; + + int64_t instanceID; + int64_t bytesInput; + int64_t bytesDurable; + + Version prevVersion; + + struct peekTrackerData { + std::map> sequence_version; + double lastUpdate; + }; + + std::map peekTracker; + WorkerCache tlogCache; + + Future updatePersist; //SOMEDAY: integrate the recovery and update storage so that only one of them is committing to persistant data. + + PromiseStream> sharedActors; + bool terminated; + + TLogData(UID dbgid, IKeyValueStore* persistentData, IDiskQueue * persistentQueue, Reference> const& dbInfo) + : dbgid(dbgid), instanceID(g_random->randomUniqueID().first()), + persistentData(persistentData), rawPersistentQueue(persistentQueue), persistentQueue(new TLogQueue(persistentQueue, dbgid)), + dbInfo(dbInfo), queueCommitBegin(0), queueCommitEnd(0), prevVersion(0), + diskQueueCommitBytes(0), largeDiskQueueCommitBytes(false), + bytesInput(0), bytesDurable(0), updatePersist(Void()), terminated(false) + { + } + }; + + struct LogData : NonCopyable, public ReferenceCounted { struct TagData { std::deque> version_messages; bool nothing_persistent; // true means tag is *known* to have no messages in persistentData. false means nothing. @@ -211,7 +321,7 @@ namespace oldTLog { Version popped; // see popped version tracking contract below bool update_version_sizes; - TagData( Version popped, bool nothing_persistent, bool popped_recently, Tag tag ) : nothing_persistent(nothing_persistent), popped(popped), popped_recently(popped_recently), update_version_sizes(tag != txsTag) {} + TagData( Version popped, bool nothing_persistent, bool popped_recently, OldTag tag ) : nothing_persistent(nothing_persistent), popped(popped), popped_recently(popped_recently), update_version_sizes(tag != txsTagOld) {} TagData(TagData&& r) noexcept(true) : version_messages(std::move(r.version_messages)), nothing_persistent(r.nothing_persistent), popped_recently(r.popped_recently), popped(r.popped), update_version_sizes(r.update_version_sizes) {} void operator= (TagData&& r) noexcept(true) { @@ -223,7 +333,7 @@ namespace oldTLog { } // Erase messages not needed to update *from* versions >= before (thus, messages with toversion <= before) - ACTOR Future eraseMessagesBefore( TagData *self, Version before, Counter* bytesErased, TLogData *tlogData, int taskID ) { + ACTOR Future eraseMessagesBefore( TagData *self, Version before, int64_t* gBytesErased, Reference tlogData, int taskID ) { while(!self->version_messages.empty() && self->version_messages.front().first < before) { Version version = self->version_messages.front().first; std::pair &sizes = tlogData->version_sizes[version]; @@ -240,235 +350,171 @@ namespace oldTLog { self->version_messages.pop_front(); } - *bytesErased += (messagesErased * sizeof(std::pair) * SERVER_KNOBS->VERSION_MESSAGES_OVERHEAD_FACTOR_1024THS) >> 10; + int64_t bytesErased = (messagesErased * sizeof(std::pair) * SERVER_KNOBS->VERSION_MESSAGES_OVERHEAD_FACTOR_1024THS) >> 10; + tlogData->bytesDurable += bytesErased; + *gBytesErased += bytesErased; Void _ = wait(yield(taskID)); } return Void(); } - Future eraseMessagesBefore(Version before, Counter* bytesErased, TLogData *tlogData, int taskID) { - return eraseMessagesBefore(this, before, bytesErased, tlogData, taskID); + Future eraseMessagesBefore(Version before, int64_t* gBytesErased, Reference tlogData, int taskID) { + return eraseMessagesBefore(this, before, gBytesErased, tlogData, taskID); } }; /* Popped version tracking contract needed by log system to implement ILogCursor::popped(): - - Log server tracks for each (possible) tag a popped_version + - Log server tracks for each (possible) tag a popped_version Impl: TagData::popped (in memory) and persistTagPoppedKeys (in persistentData) - - popped_version(tag) is <= the maximum version for which log server (or a predecessor) is ever asked to pop the tag + - popped_version(tag) is <= the maximum version for which log server (or a predecessor) is ever asked to pop the tag Impl: Only increased by tLogPop() in response to either a pop request or recovery from a predecessor - - popped_version(tag) is > the maximum version for which log server is unable to peek messages due to previous pops (on this server or a predecessor) + - popped_version(tag) is > the maximum version for which log server is unable to peek messages due to previous pops (on this server or a predecessor) Impl: Increased by tLogPop() atomically with erasing messages from memory; persisted by updatePersistentData() atomically with erasing messages from store; messages are not erased from queue where popped_version is not persisted - - LockTLogReply returns all tags which either have messages, or which have nonzero popped_versions + - LockTLogReply returns all tags which either have messages, or which have nonzero popped_versions Impl: tag_data is present for all such tags - - peek(tag, v) returns the popped_version for tag if that is greater than v + - peek(tag, v) returns the popped_version for tag if that is greater than v Impl: Check tag_data->popped (after all waits) */ - struct peekTrackerData { - std::map> sequence_version; - double lastUpdate; - }; - - std::map peekTracker; - - UID dbgid; - bool coreStarted; - bool stopped; + bool stopped, initialized; DBRecoveryCount recoveryCount; - IKeyValueStore* persistentData; - IDiskQueue* rawPersistentQueue; - TLogQueue *persistentQueue; VersionMetricHandle persistentDataVersion, persistentDataDurableVersion; // The last version number in the portion of the log (written|durable) to persistentData - NotifiedVersion version, queueCommittedVersion, queueCommitEnd; - Version queueCommitBegin, queueCommittingVersion; - int64_t diskQueueCommitBytes; - AsyncVar largeDiskQueueCommitBytes; //becomes true when diskQueueCommitBytes is greater than MAX_QUEUE_COMMIT_BYTES - Version prevVersion, knownCommittedVersion; + NotifiedVersion version, queueCommittedVersion; + Version queueCommittingVersion; + Version knownCommittedVersion; Deque>>> messageBlocks; - Map< Tag, TagData > tag_data; + Map< OldTag, TagData > tag_data; Map> version_sizes; - int64_t instanceID; CounterCollection cc; Counter bytesInput; Counter bytesDurable; - Reference> dbInfo; - Future updatePersist; //SOMEDAY: integrate the recovery and update storage so that only one of them is committing to persistant data. + UID logId; + Version newPersistentDataVersion; + Future removed; + TLogInterface tli; PromiseStream> addActor; + TLogData* tLogData; + Promise recoverySuccessful; + Future recovery; - TLogData(UID dbgid, IKeyValueStore* persistentData, IDiskQueue * persistentQueue, Reference> const& dbInfo) - : dbgid(dbgid), - persistentData(persistentData), rawPersistentQueue(persistentQueue), persistentQueue(new TLogQueue(persistentQueue, dbgid)), - prevVersion(0), knownCommittedVersion(0), - dbInfo(dbInfo), - updatePersist(Void()), - instanceID(g_random->randomUniqueID().first()), - cc("TLog", dbgid.toString()), - bytesInput("bytesInput", cc), - bytesDurable("bytesDurable", cc), - // These are initialized differently on init() or recovery - recoveryCount(), coreStarted(false), stopped(false), queueCommitBegin(0), queueCommitEnd(0), diskQueueCommitBytes(0), largeDiskQueueCommitBytes(false), queueCommittingVersion(0) + explicit LogData(TLogData* tLogData, TLogInterface interf) : tLogData(tLogData), knownCommittedVersion(0), tli(interf), logId(interf.id()), + cc("TLog", interf.id().toString()), + bytesInput("bytesInput", cc), + bytesDurable("bytesDurable", cc), + // These are initialized differently on init() or recovery + recoveryCount(), stopped(false), initialized(false), queueCommittingVersion(0), newPersistentDataVersion(invalidVersion), recovery(Void()) { + startRole(interf.id(), UID(), "TLog"); + persistentDataVersion.init(LiteralStringRef("TLog.PersistentDataVersion"), cc.id); persistentDataDurableVersion.init(LiteralStringRef("TLog.PersistentDataDurableVersion"), cc.id); version.initMetric(LiteralStringRef("TLog.Version"), cc.id); queueCommittedVersion.initMetric(LiteralStringRef("TLog.QueueCommittedVersion"), cc.id); specialCounter(cc, "version", [this](){ return this->version.get(); }); - specialCounter(cc, "kvstoreBytesUsed", [this](){ return this->persistentData->getStorageBytes().used; }); - specialCounter(cc, "kvstoreBytesFree", [this](){ return this->persistentData->getStorageBytes().free; }); - specialCounter(cc, "kvstoreBytesAvailable", [this](){ return this->persistentData->getStorageBytes().available; }); - specialCounter(cc, "kvstoreBytesTotal", [this](){ return this->persistentData->getStorageBytes().total; }); - specialCounter(cc, "queueDiskBytesUsed", [this](){ return this->rawPersistentQueue->getStorageBytes().used; }); - specialCounter(cc, "queueDiskBytesFree", [this](){ return this->rawPersistentQueue->getStorageBytes().free; }); - specialCounter(cc, "queueDiskBytesAvailable", [this](){ return this->rawPersistentQueue->getStorageBytes().available; }); - specialCounter(cc, "queueDiskBytesTotal", [this](){ return this->rawPersistentQueue->getStorageBytes().total; }); + specialCounter(cc, "sharedBytesInput", [tLogData](){ return tLogData->bytesInput; }); + specialCounter(cc, "sharedBytesDurable", [tLogData](){ return tLogData->bytesDurable; }); + specialCounter(cc, "kvstoreBytesUsed", [tLogData](){ return tLogData->persistentData->getStorageBytes().used; }); + specialCounter(cc, "kvstoreBytesFree", [tLogData](){ return tLogData->persistentData->getStorageBytes().free; }); + specialCounter(cc, "kvstoreBytesAvailable", [tLogData](){ return tLogData->persistentData->getStorageBytes().available; }); + specialCounter(cc, "kvstoreBytesTotal", [tLogData](){ return tLogData->persistentData->getStorageBytes().total; }); + specialCounter(cc, "queueDiskBytesUsed", [tLogData](){ return tLogData->rawPersistentQueue->getStorageBytes().used; }); + specialCounter(cc, "queueDiskBytesFree", [tLogData](){ return tLogData->rawPersistentQueue->getStorageBytes().free; }); + specialCounter(cc, "queueDiskBytesAvailable", [tLogData](){ return tLogData->rawPersistentQueue->getStorageBytes().available; }); + specialCounter(cc, "queueDiskBytesTotal", [tLogData](){ return tLogData->rawPersistentQueue->getStorageBytes().total; }); + } + + ~LogData() { + tLogData->bytesDurable += bytesInput.getValue() - bytesDurable.getValue(); + TraceEvent("TLogBytesWhenRemoved", tli.id()).detail("sharedBytesInput", tLogData->bytesInput).detail("sharedBytesDurable", tLogData->bytesDurable).detail("localBytesInput", bytesInput.getValue()).detail("localBytesDurable", bytesDurable.getValue()); + + ASSERT(tLogData->bytesDurable <= tLogData->bytesInput); + endRole(tli.id(), "TLog", "Error", true); + + if(!tLogData->terminated) { + Key logIdKey = BinaryWriter::toValue(logId,Unversioned()); + tLogData->persistentData->clear( singleKeyRange(logIdKey.withPrefix(persistCurrentVersionKeys.begin)) ); + tLogData->persistentData->clear( singleKeyRange(logIdKey.withPrefix(persistRecoveryCountKeys.begin)) ); + Key msgKey = logIdKey.withPrefix(persistTagMessagesKeys.begin); + tLogData->persistentData->clear( KeyRangeRef( msgKey, strinc(msgKey) ) ); + Key poppedKey = logIdKey.withPrefix(persistTagPoppedKeys.begin); + tLogData->persistentData->clear( KeyRangeRef( poppedKey, strinc(poppedKey) ) ); + } } LogEpoch epoch() const { return recoveryCount; } }; - ACTOR Future tLogLock( TLogData* self, ReplyPromise< TLogLockResult > reply ) { - state Version stopVersion = self->version.get(); + ACTOR Future tLogLock( TLogData* self, ReplyPromise< TLogLockResult > reply, Reference logData ) { + state Version stopVersion = logData->version.get(); TEST(true); // TLog stopped by recovering master - TEST( self->stopped ); - TEST( !self->stopped ); + TEST( logData->stopped ); + TEST( !logData->stopped ); - TraceEvent("TLogStop", self->dbgid).detail("Ver", stopVersion).detail("isStopped", self->stopped); + TraceEvent("TLogStop", logData->logId).detail("Ver", stopVersion).detail("isStopped", logData->stopped).detail("queueCommitted", logData->queueCommittedVersion.get()); - self->stopped = true; + logData->stopped = true; + if(logData->recoverySuccessful.canBeSet()) { + logData->recoverySuccessful.send(false); + } // Lock once the current version has been committed - Void _ = wait( self->queueCommittedVersion.whenAtLeast( stopVersion ) ); + Void _ = wait( logData->queueCommittedVersion.whenAtLeast( stopVersion ) ); - ASSERT(stopVersion == self->version.get()); + ASSERT(stopVersion == logData->version.get()); TLogLockResult result; result.end = stopVersion; - result.knownCommittedVersion = self->knownCommittedVersion; - for( auto & tag : self->tag_data ) - result.tags.push_back( tag.key ); + result.knownCommittedVersion = logData->knownCommittedVersion; + for( auto & tag : logData->tag_data ) + result.tags.push_back( convertOldTag(tag.key) ); + + TraceEvent("TLogStop2", self->dbgid).detail("logId", logData->logId).detail("Ver", stopVersion).detail("isStopped", logData->stopped).detail("queueCommitted", logData->queueCommittedVersion.get()).detail("tags", describe(result.tags)); + reply.send( result ); return Void(); } - ////// Persistence format (for self->persistentData) - - // Immutable keys - static const KeyValueRef persistFormat( LiteralStringRef( "Format" ), LiteralStringRef("FoundationDB/LogServer/2/2") ); - static const KeyRangeRef persistFormatReadableRange( LiteralStringRef("FoundationDB/LogServer/2/2"), LiteralStringRef("FoundationDB/LogServer/2/3") ); - static const KeyRef persistID = LiteralStringRef( "ID" ); - static const KeyRef persistRecoveryCountKey = LiteralStringRef("DbRecoveryCount"); - - // Updated on updatePersistentData() - static const KeyRef persistCurrentVersionKey = LiteralStringRef("version"); - static const KeyRange persistTagMessagesKeys = prefixRange(LiteralStringRef("TagMsg/")); - static const KeyRange persistTagPoppedKeys = prefixRange(LiteralStringRef("TagPop/")); - - // Only present during network recovery process - static const KeyValueRef persistRecoveryInProgress( LiteralStringRef("RecoveryInProgress"), LiteralStringRef("1") ); - - static Key persistTagMessagesKey( Tag tag, Version version ) { - BinaryWriter wr( Unversioned() ); - wr.serializeBytes(persistTagMessagesKeys.begin); - wr << tag; - wr << bigEndian64( version ); - return wr.toStringRef(); - } - - static Key persistTagPoppedKey( Tag tag ) { - BinaryWriter wr(Unversioned()); - wr.serializeBytes( persistTagPoppedKeys.begin ); - wr << tag; - return wr.toStringRef(); - } - - static Value persistTagPoppedValue( Version popped ) { - return BinaryWriter::toValue( popped, Unversioned() ); - } - - static Tag decodeTagPoppedKey( KeyRef key ) { - Tag s; - BinaryReader rd( key.removePrefix(persistTagPoppedKeys.begin), Unversioned() ); - rd >> s; - return s; - } - - static Version decodeTagPoppedValue( ValueRef value ) { - return BinaryReader::fromStringRef( value, Unversioned() ); - } - - static StringRef stripTagMessagesKey( StringRef key ) { - return key.substr( sizeof(Tag) + persistTagMessagesKeys.begin.size() ); - } - - static Version decodeTagMessagesKey( StringRef key ) { - return bigEndian64( BinaryReader::fromStringRef( stripTagMessagesKey(key), Unversioned() ) ); - } - - static Standalone decodeTagMessagesKeyTag( StringRef key ) { - key = key.removePrefix( persistTagMessagesKeys.begin ); // \x00\xff - BinaryWriter wr( Unversioned() ); - for(auto c = key.begin(); c != key.end(); ++c) { - if (*c) - wr << *c; - else { - ASSERT( c+1 != key.end() ); - if (c[1] == 0xff) { - wr << uint8_t(0); - c++; - } else if (c[1] == 0) - break; - else - throw internal_error(); - } - } - return wr.toStringRef(); - } - - void validate( TLogData* self, bool force = false ) { - } - - void updatePersistentPopped( TLogData* self, Tag tag, TLogData::TagData& data ) { + void updatePersistentPopped( TLogData* self, Reference logData, OldTag tag, LogData::TagData& data ) { if (!data.popped_recently) return; - self->persistentData->set(KeyValueRef( persistTagPoppedKey(tag), persistTagPoppedValue(data.popped) )); + self->persistentData->set(KeyValueRef( persistTagPoppedKey(logData->logId, tag), persistTagPoppedValue(data.popped) )); data.popped_recently = false; if (data.nothing_persistent) return; self->persistentData->clear( KeyRangeRef( - persistTagMessagesKey( tag, Version(0) ), - persistTagMessagesKey( tag, data.popped ) ) ); - if (data.popped > self->persistentDataVersion) + persistTagMessagesKey( logData->logId, tag, Version(0) ), + persistTagMessagesKey( logData->logId, tag, data.popped ) ) ); + if (data.popped > logData->persistentDataVersion) data.nothing_persistent = true; - //TraceEvent("TLogPopWrite", self->dbgid).detail("Tag", tag).detail("To", data.popped); } - ACTOR Future updatePersistentData( TLogData* self, Version newPersistentDataVersion ) { + ACTOR Future updatePersistentData( TLogData* self, Reference logData, Version newPersistentDataVersion ) { // PERSIST: Changes self->persistentDataVersion and writes and commits the relevant changes - ASSERT( newPersistentDataVersion <= self->version.get() ); - ASSERT( newPersistentDataVersion <= self->queueCommittedVersion.get() ); - ASSERT( newPersistentDataVersion > self->persistentDataVersion ); - ASSERT( self->persistentDataVersion == self->persistentDataDurableVersion ); + ASSERT( newPersistentDataVersion <= logData->version.get() ); + ASSERT( newPersistentDataVersion <= logData->queueCommittedVersion.get() ); + ASSERT( newPersistentDataVersion > logData->persistentDataVersion ); + ASSERT( logData->persistentDataVersion == logData->persistentDataDurableVersion ); //TraceEvent("updatePersistentData", self->dbgid).detail("seq", newPersistentDataSeq); state bool anyData = false; - state Map::iterator tag; + state Map::iterator tag; // For all existing tags - for(tag = self->tag_data.begin(); tag != self->tag_data.end(); ++tag) { + for(tag = logData->tag_data.begin(); tag != logData->tag_data.end(); ++tag) { state Version currentVersion = 0; // Clear recently popped versions from persistentData if necessary - updatePersistentPopped( self, tag->key, tag->value ); + updatePersistentPopped( self, logData, tag->key, tag->value ); // Transfer unpopped messages with version numbers less than newPersistentDataVersion to persistentData state std::deque>::iterator msg = tag->value.version_messages.begin(); while(msg != tag->value.version_messages.end() && msg->first <= newPersistentDataVersion) { @@ -480,7 +526,7 @@ namespace oldTLog { for(; msg != tag->value.version_messages.end() && msg->first == currentVersion; ++msg) wr << msg->second.toStringRef(); - self->persistentData->set( KeyValueRef( persistTagMessagesKey( tag->key, currentVersion ), wr.toStringRef() ) ); + self->persistentData->set( KeyValueRef( persistTagMessagesKey( logData->logId, tag->key, currentVersion ), wr.toStringRef() ) ); Future f = yield(TaskUpdateStorage); if(!f.isReady()) { @@ -492,8 +538,8 @@ namespace oldTLog { Void _ = wait(yield(TaskUpdateStorage)); } - self->persistentData->set( KeyValueRef( persistCurrentVersionKey, BinaryWriter::toValue(newPersistentDataVersion, Unversioned()) ) ); - self->persistentDataVersion = newPersistentDataVersion; + self->persistentData->set( KeyValueRef( BinaryWriter::toValue(logData->logId,Unversioned()).withPrefix(persistCurrentVersionKeys.begin), BinaryWriter::toValue(newPersistentDataVersion, Unversioned()) ) ); + logData->persistentDataVersion = newPersistentDataVersion; Void _ = wait( self->persistentData->commit() ); // SOMEDAY: This seems to be running pretty often, should we slow it down??? Void _ = wait( delay(0, TaskUpdateStorage) ); @@ -501,24 +547,31 @@ namespace oldTLog { // Now that the changes we made to persistentData are durable, erase the data we moved from memory and the queue, increase bytesDurable accordingly, and update persistentDataDurableVersion. TEST(anyData); // TLog moved data to persistentData - self->persistentDataDurableVersion = newPersistentDataVersion; + logData->persistentDataDurableVersion = newPersistentDataVersion; - for(tag = self->tag_data.begin(); tag != self->tag_data.end(); ++tag) { - Void _ = wait(tag->value.eraseMessagesBefore( newPersistentDataVersion+1, &self->bytesDurable, self, TaskUpdateStorage )); + for(tag = logData->tag_data.begin(); tag != logData->tag_data.end(); ++tag) { + Void _ = wait(tag->value.eraseMessagesBefore( newPersistentDataVersion+1, &self->bytesDurable, logData, TaskUpdateStorage )); Void _ = wait(yield(TaskUpdateStorage)); } - self->version_sizes.erase(self->version_sizes.begin(), self->version_sizes.lower_bound(self->persistentDataDurableVersion)); + logData->version_sizes.erase(logData->version_sizes.begin(), logData->version_sizes.lower_bound(logData->persistentDataDurableVersion)); Void _ = wait(yield(TaskUpdateStorage)); - while(!self->messageBlocks.empty() && self->messageBlocks.front().first <= newPersistentDataVersion) { - self->bytesDurable += self->messageBlocks.front().second.size() * SERVER_KNOBS->TLOG_MESSAGE_BLOCK_OVERHEAD_FACTOR; - self->messageBlocks.pop_front(); + while(!logData->messageBlocks.empty() && logData->messageBlocks.front().first <= newPersistentDataVersion) { + int64_t bytesErased = int64_t(logData->messageBlocks.front().second.size()) * SERVER_KNOBS->TLOG_MESSAGE_BLOCK_OVERHEAD_FACTOR; + logData->bytesDurable += bytesErased; + self->bytesDurable += bytesErased; + logData->messageBlocks.pop_front(); Void _ = wait(yield(TaskUpdateStorage)); } - ASSERT(self->bytesDurable.getValue() <= self->bytesInput.getValue()); + if(logData->bytesDurable.getValue() > logData->bytesInput.getValue() || self->bytesDurable > self->bytesInput) { + TraceEvent(SevError, "BytesDurableTooLarge", logData->logId).detail("sharedBytesInput", self->bytesInput).detail("sharedBytesDurable", self->bytesDurable).detail("localBytesInput", logData->bytesInput.getValue()).detail("localBytesDurable", logData->bytesDurable.getValue()); + } + + ASSERT(logData->bytesDurable.getValue() <= logData->bytesInput.getValue()); + ASSERT(self->bytesDurable <= self->bytesInput); if( self->queueCommitEnd.get() > 0 ) self->persistentQueue->pop( newPersistentDataVersion+1 ); // SOMEDAY: this can cause a slow task (~0.5ms), presumably from erasing too many versions. Should we limit the number of versions cleared at a time? @@ -530,23 +583,82 @@ namespace oldTLog { // For this reason, they employ aggressive use of yields to avoid causing slow tasks that could introduce latencies for more important // work (e.g. commits). ACTOR Future updateStorage( TLogData* self ) { - Void _ = wait(delay(0, TaskUpdateStorage)); - loop { - state Version prevVersion = 0; - state Version nextVersion = 0; - state int totalSize = 0; + while(self->queueOrder.size() && !self->id_data.count(self->queueOrder.front())) { + self->queueOrder.pop_front(); + } - state Map>::iterator sizeItr = self->version_sizes.begin(); - while( totalSize < SERVER_KNOBS->UPDATE_STORAGE_BYTE_LIMIT && sizeItr != self->version_sizes.end() - && (self->bytesInput.getValue() - self->bytesDurable.getValue() - totalSize >= SERVER_KNOBS->TLOG_SPILL_THRESHOLD || sizeItr->value.first == 0) ) + if(!self->queueOrder.size()) { + Void _ = wait( delay(BUGGIFY ? SERVER_KNOBS->BUGGIFY_TLOG_STORAGE_MIN_UPDATE_INTERVAL : SERVER_KNOBS->TLOG_STORAGE_MIN_UPDATE_INTERVAL, TaskUpdateStorage) ); + return Void(); + } + + state Reference logData = self->id_data[self->queueOrder.front()]; + state Version prevVersion = 0; + state Version nextVersion = 0; + state int totalSize = 0; + + if(logData->stopped) { + if (self->bytesInput - self->bytesDurable >= SERVER_KNOBS->TLOG_SPILL_THRESHOLD) { + while(logData->persistentDataDurableVersion != logData->version.get()) { + std::vector>::iterator, std::deque>::iterator>> iters; + for(auto tag = logData->tag_data.begin(); tag != logData->tag_data.end(); ++tag) + iters.push_back(std::make_pair(tag->value.version_messages.begin(), tag->value.version_messages.end())); + + nextVersion = 0; + while( totalSize < SERVER_KNOBS->UPDATE_STORAGE_BYTE_LIMIT || nextVersion <= logData->persistentDataVersion ) { + nextVersion = logData->version.get(); + for( auto &it : iters ) + if(it.first != it.second) + nextVersion = std::min( nextVersion, it.first->first + 1 ); + + if(nextVersion == logData->version.get()) + break; + + for( auto &it : iters ) { + while (it.first != it.second && it.first->first < nextVersion) { + totalSize += it.first->second.expectedSize(); + ++it.first; + } + } + } + + Void _ = wait( logData->queueCommittedVersion.whenAtLeast( nextVersion ) ); + Void _ = wait( delay(0, TaskUpdateStorage) ); + + //TraceEvent("TlogUpdatePersist", self->dbgid).detail("logId", logData->logId).detail("nextVersion", nextVersion).detail("version", logData->version.get()).detail("persistentDataDurableVer", logData->persistentDataDurableVersion).detail("queueCommitVer", logData->queueCommittedVersion.get()).detail("persistDataVer", logData->persistentDataVersion); + if (nextVersion > logData->persistentDataVersion) { + self->updatePersist = updatePersistentData(self, logData, nextVersion); + Void _ = wait( self->updatePersist ); + } else { + Void _ = wait( delay(BUGGIFY ? SERVER_KNOBS->BUGGIFY_TLOG_STORAGE_MIN_UPDATE_INTERVAL : SERVER_KNOBS->TLOG_STORAGE_MIN_UPDATE_INTERVAL, TaskUpdateStorage) ); + } + + if( logData->removed.isReady() ) { + break; + } + } + + if(logData->persistentDataDurableVersion == logData->version.get()) { + self->queueOrder.pop_front(); + } + Void _ = wait( delay(0.0, TaskUpdateStorage) ); + } else { + Void _ = wait( delay(BUGGIFY ? SERVER_KNOBS->BUGGIFY_TLOG_STORAGE_MIN_UPDATE_INTERVAL : SERVER_KNOBS->TLOG_STORAGE_MIN_UPDATE_INTERVAL, TaskUpdateStorage) ); + } + } + else if(logData->initialized) { + ASSERT(self->queueOrder.size() == 1); + state Map>::iterator sizeItr = logData->version_sizes.begin(); + while( totalSize < SERVER_KNOBS->UPDATE_STORAGE_BYTE_LIMIT && sizeItr != logData->version_sizes.end() + && (logData->bytesInput.getValue() - logData->bytesDurable.getValue() - totalSize >= SERVER_KNOBS->TLOG_SPILL_THRESHOLD || sizeItr->value.first == 0) ) { Void _ = wait( yield(TaskUpdateStorage) ); ++sizeItr; - nextVersion = sizeItr == self->version_sizes.end() ? self->version.get() : sizeItr->key; + nextVersion = sizeItr == logData->version_sizes.end() ? logData->version.get() : sizeItr->key; - state Map::iterator tag; - for(tag = self->tag_data.begin(); tag != self->tag_data.end(); ++tag) { + state Map::iterator tag; + for(tag = logData->tag_data.begin(); tag != logData->tag_data.end(); ++tag) { auto it = std::lower_bound(tag->value.version_messages.begin(), tag->value.version_messages.end(), std::make_pair(prevVersion, LengthPrefixedStringRef()), CompareFirst>()); for(; it != tag->value.version_messages.end() && it->first < nextVersion; ++it) { totalSize += it->second.expectedSize(); @@ -558,15 +670,15 @@ namespace oldTLog { prevVersion = nextVersion; } - nextVersion = std::max(nextVersion, self->persistentDataVersion); + nextVersion = std::max(nextVersion, logData->persistentDataVersion); - //TraceEvent("UpdateStorageVer", self->dbgid).detail("nextVersion", nextVersion).detail("persistentDataVersion", self->persistentDataVersion).detail("totalSize", totalSize); + //TraceEvent("UpdateStorageVer", logData->logId).detail("nextVersion", nextVersion).detail("persistentDataVersion", logData->persistentDataVersion).detail("totalSize", totalSize); - Void _ = wait( self->queueCommittedVersion.whenAtLeast( nextVersion ) ); + Void _ = wait( logData->queueCommittedVersion.whenAtLeast( nextVersion ) ); Void _ = wait( delay(0, TaskUpdateStorage) ); - if (nextVersion > self->persistentDataVersion) { - self->updatePersist = updatePersistentData(self, nextVersion); + if (nextVersion > logData->persistentDataVersion) { + self->updatePersist = updatePersistentData(self, logData, nextVersion); Void _ = wait( self->updatePersist ); } @@ -578,10 +690,21 @@ namespace oldTLog { //updatePersist returns another one has not been started yet. Void _ = wait( delay(0.0, TaskUpdateStorage) ); } + } else { + Void _ = wait( delay(BUGGIFY ? SERVER_KNOBS->BUGGIFY_TLOG_STORAGE_MIN_UPDATE_INTERVAL : SERVER_KNOBS->TLOG_STORAGE_MIN_UPDATE_INTERVAL, TaskUpdateStorage) ); + } + return Void(); + } + + ACTOR Future updateStorageLoop( TLogData* self ) { + Void _ = wait(delay(0, TaskUpdateStorage)); + + loop { + Void _ = wait( updateStorage(self) ); } } - void commitMessages( TLogData* self, Version version, Arena arena, StringRef messages, VectorRef< TagMessagesRef > tags) { + void commitMessages( Reference self, Version version, Arena arena, StringRef messages, VectorRef< OldTagMessagesRef > tags, int64_t& bytesInput) { // SOMEDAY: This method of copying messages is reasonably memory efficient, but it's still a lot of bytes copied. Find a // way to do the memory allocation right as we receive the messages in the network layer. @@ -644,7 +767,7 @@ namespace oldTLog { auto tsm = self->tag_data.find(tag->tag); if (tsm == self->tag_data.end()) { - tsm = self->tag_data.insert( mapPair(std::move(Tag(tag->tag)), TLogData::TagData(Version(0), true, true, tag->tag) ), false ); + tsm = self->tag_data.insert( mapPair(std::move(OldTag(tag->tag)), LogData::TagData(Version(0), true, true, tag->tag) ), false ); } if (version >= tsm->value.popped) { @@ -655,7 +778,7 @@ namespace oldTLog { if(tsm->value.version_messages.back().second.expectedSize() > SERVER_KNOBS->MAX_MESSAGE_SIZE) { TraceEvent(SevWarnAlways, "LargeMessage").detail("Size", tsm->value.version_messages.back().second.expectedSize()); } - if (tag->tag != txsTag) + if (tag->tag != txsTagOld) expectedBytes += tsm->value.version_messages.back().second.expectedSize(); ++tagMessages; @@ -671,18 +794,19 @@ namespace oldTLog { self->version_sizes[version] = make_pair(expectedBytes, expectedBytes); self->bytesInput += addedBytes; + bytesInput += addedBytes; //TraceEvent("TLogPushed", self->dbgid).detail("Bytes", addedBytes).detail("MessageBytes", messages.size()).detail("Tags", tags.size()).detail("expectedBytes", expectedBytes).detail("mCount", mCount).detail("tCount", tCount); } - Version poppedVersion( TLogData* self, Tag tag) { + Version poppedVersion( Reference self, OldTag tag) { auto mapIt = self->tag_data.find(tag); if (mapIt == self->tag_data.end()) return Version(0); return mapIt->value.popped; } - std::deque> & get_version_messages( TLogData* self, Tag tag ) { + std::deque> & get_version_messages( Reference self, OldTag tag ) { auto mapIt = self->tag_data.find(tag); if (mapIt == self->tag_data.end()) { static std::deque> empty; @@ -691,29 +815,28 @@ namespace oldTLog { return mapIt->value.version_messages; }; - ACTOR Future tLogPop( TLogData* self, TLogPopRequest req ) { - auto ti = self->tag_data.find(req.tag); - if (ti == self->tag_data.end()) { - ti = self->tag_data.insert( mapPair(std::move(Tag(req.tag)), TLogData::TagData(req.to, true, true, req.tag)) ); + ACTOR Future tLogPop( TLogData* self, TLogPopRequest req, Reference logData ) { + OldTag oldTag = convertTag(req.tag); + auto ti = logData->tag_data.find(oldTag); + if (ti == logData->tag_data.end()) { + ti = logData->tag_data.insert( mapPair(oldTag, LogData::TagData(req.to, true, true, oldTag)) ); } else if (req.to > ti->value.popped) { ti->value.popped = req.to; ti->value.popped_recently = true; //if (to.epoch == self->epoch()) - if ( req.to > self->persistentDataDurableVersion ) - Void _ = wait(ti->value.eraseMessagesBefore( req.to, &self->bytesDurable, self, TaskTLogPop )); - //TraceEvent("TLogPop", self->dbgid).detail("Tag", req.tag).detail("To", req.to); + if ( req.to > logData->persistentDataDurableVersion ) + Void _ = wait(ti->value.eraseMessagesBefore( req.to, &self->bytesDurable, logData, TaskTLogPop )); } req.reply.send(Void()); return Void(); } - void peekMessagesFromMemory( TLogData* self, TLogPeekRequest const& req, BinaryWriter& messages, Version& endVersion ) { + void peekMessagesFromMemory( Reference self, TLogPeekRequest const& req, BinaryWriter& messages, Version& endVersion ) { + OldTag oldTag = convertTag(req.tag); ASSERT( !messages.getLength() ); - auto& deque = get_version_messages(self, req.tag); - //TraceEvent("tLogPeekMem", self->dbgid).detail("Tag", printable(req.tag1)).detail("pDS", self->persistentDataSequence).detail("pDDS", self->persistentDataDurableSequence).detail("Oldest", map1.empty() ? 0 : map1.begin()->key ).detail("OldestMsgCount", map1.empty() ? 0 : map1.begin()->value.size()); - + auto& deque = get_version_messages(self, oldTag); Version begin = std::max( req.begin, self->persistentDataDurableVersion+1 ); auto it = std::lower_bound(deque.begin(), deque.end(), std::make_pair(begin, LengthPrefixedStringRef()), CompareFirst>()); @@ -730,15 +853,25 @@ namespace oldTLog { messages << int32_t(-1) << currentVersion; } - messages << it->second.toStringRef(); + BinaryReader rd( it->second.getLengthPtr(), it->second.expectedSize()+4, Unversioned() ); + while(!rd.empty()) { + int32_t messageLength; + uint32_t subVersion; + rd >> messageLength >> subVersion; + messageLength += sizeof(uint16_t); + messages << messageLength << subVersion << uint16_t(0); + messageLength -= (sizeof(subVersion) + sizeof(uint16_t)); + messages.serializeBytes(rd.readBytes(messageLength), messageLength); + } } } - ACTOR Future tLogPeekMessages( TLogData* self, TLogPeekRequest req ) { + ACTOR Future tLogPeekMessages( TLogData* self, TLogPeekRequest req, Reference logData ) { state BinaryWriter messages(Unversioned()); state BinaryWriter messages2(Unversioned()); state int sequence = -1; state UID peekId; + state OldTag oldTag = convertTag(req.tag); if(req.sequence.present()) { try { @@ -761,41 +894,51 @@ namespace oldTLog { } } - if( req.returnIfBlocked && self->version.get() < req.begin ) { + if( req.returnIfBlocked && logData->version.get() < req.begin ) { req.reply.sendError(end_of_stream()); return Void(); } //TraceEvent("tLogPeekMessages0", self->dbgid).detail("reqBeginEpoch", req.begin.epoch).detail("reqBeginSeq", req.begin.sequence).detail("epoch", self->epoch()).detail("persistentDataSeq", self->persistentDataSequence).detail("Tag1", printable(req.tag1)).detail("Tag2", printable(req.tag2)); // Wait until we have something to return that the caller doesn't already have - if( self->version.get() < req.begin ) { - Void _ = wait( self->version.whenAtLeast( req.begin ) ); + if( logData->version.get() < req.begin ) { + Void _ = wait( logData->version.whenAtLeast( req.begin ) ); Void _ = wait( delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask()) ); } - state Version endVersion = self->version.get() + 1; + state Version endVersion = logData->version.get() + 1; //grab messages from disk //TraceEvent("tLogPeekMessages", self->dbgid).detail("reqBeginEpoch", req.begin.epoch).detail("reqBeginSeq", req.begin.sequence).detail("epoch", self->epoch()).detail("persistentDataSeq", self->persistentDataSequence).detail("Tag1", printable(req.tag1)).detail("Tag2", printable(req.tag2)); - if( req.begin <= self->persistentDataDurableVersion ) { + if( req.begin <= logData->persistentDataDurableVersion ) { // Just in case the durable version changes while we are waiting for the read, we grab this data from memory. We may or may not actually send it depending on // whether we get enough data from disk. // SOMEDAY: Only do this if an initial attempt to read from disk results in insufficient data and the required data is no longer in memory // SOMEDAY: Should we only send part of the messages we collected, to actually limit the size of the result? - peekMessagesFromMemory( self, req, messages2, endVersion ); + peekMessagesFromMemory( logData, req, messages2, endVersion ); Standalone> kvs = wait( self->persistentData->readRange(KeyRangeRef( - persistTagMessagesKey(req.tag, req.begin), - persistTagMessagesKey(req.tag, self->persistentDataDurableVersion + 1)), SERVER_KNOBS->DESIRED_TOTAL_BYTES, SERVER_KNOBS->DESIRED_TOTAL_BYTES)); + persistTagMessagesKey(logData->logId, oldTag, req.begin), + persistTagMessagesKey(logData->logId, oldTag, logData->persistentDataDurableVersion + 1)), SERVER_KNOBS->DESIRED_TOTAL_BYTES, SERVER_KNOBS->DESIRED_TOTAL_BYTES)); //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().address).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? printable(kv1[0].key) : "").detail("Tag2ResultsLast", kv2.size() ? printable(kv2[0].key) : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence()); for (auto &kv : kvs) { auto ver = decodeTagMessagesKey(kv.key); messages << int32_t(-1) << ver; - messages.serializeBytes(kv.value); + + BinaryReader rd( kv.value, Unversioned() ); + while(!rd.empty()) { + int32_t messageLength; + uint32_t subVersion; + rd >> messageLength >> subVersion; + messageLength += sizeof(uint16_t); + messages << messageLength << subVersion << uint16_t(0); + messageLength -= (sizeof(subVersion) + sizeof(uint16_t)); + messages.serializeBytes(rd.readBytes(messageLength), messageLength); + } } if (kvs.expectedSize() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES) @@ -803,14 +946,14 @@ namespace oldTLog { else messages.serializeBytes( messages2.toStringRef() ); } else { - peekMessagesFromMemory( self, req, messages, endVersion ); + peekMessagesFromMemory( logData, req, messages, endVersion ); //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().address).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence()); } - Version poppedVer = poppedVersion(self, req.tag); + Version poppedVer = poppedVersion(logData, oldTag); TLogPeekReply reply; - reply.maxKnownVersion = self->version.get(); + reply.maxKnownVersion = logData->version.get(); if(poppedVer > req.begin) { reply.popped = poppedVer; reply.end = poppedVer; @@ -818,7 +961,7 @@ namespace oldTLog { reply.messages = messages.toStringRef(); reply.end = endVersion; } - //TraceEvent("TlogPeek", self->dbgid).detail("endVer", reply.end).detail("msgBytes", reply.messages.expectedSize()).detail("ForAddress", req.reply.getEndpoint().address); + //TraceEvent("TlogPeek", self->dbgid).detail("logId", logData->logId).detail("endVer", reply.end).detail("msgBytes", reply.messages.expectedSize()).detail("ForAddress", req.reply.getEndpoint().address); if(req.sequence.present()) { auto& trackerData = self->peekTracker[peekId]; @@ -839,11 +982,11 @@ namespace oldTLog { return Void(); } - ACTOR Future doQueueCommit( TLogData* self ) { - state Version ver = self->version.get(); + ACTOR Future doQueueCommit( TLogData* self, Reference logData ) { + state Version ver = logData->version.get(); state Version commitNumber = self->queueCommitBegin+1; self->queueCommitBegin = commitNumber; - self->queueCommittingVersion = ver; + logData->queueCommittingVersion = ver; Future c = self->persistentQueue->commit(); self->diskQueueCommitBytes = 0; @@ -857,9 +1000,9 @@ namespace oldTLog { Void _ = wait(delay(0, g_network->getCurrentTask())); } - ASSERT( ver > self->queueCommittedVersion.get() ); + ASSERT( ver > logData->queueCommittedVersion.get() ); - self->queueCommittedVersion.set(ver); + logData->queueCommittedVersion.set(ver); self->queueCommitEnd.set(commitNumber); TraceEvent("TLogCommitDurable", self->dbgid).detail("Version", ver); @@ -868,120 +1011,243 @@ namespace oldTLog { } ACTOR Future commitQueue( TLogData* self ) { + state Reference logData; + loop { - Void _ = wait( self->version.whenAtLeast( std::max(self->queueCommittingVersion, self->queueCommittedVersion.get()) + 1 ) ); - while( self->queueCommitBegin != self->queueCommitEnd.get() && !self->largeDiskQueueCommitBytes.get() ) { - Void _ = wait( self->queueCommitEnd.whenAtLeast(self->queueCommitBegin) || self->largeDiskQueueCommitBytes.onChange() ); + bool foundCount = 0; + for(auto it : self->id_data) { + if(!it.second->stopped) { + logData = it.second; + foundCount++; + } + } + + ASSERT(foundCount < 2); + if(!foundCount) { + Void _ = wait( self->newLogData.onTrigger() ); + continue; + } + + TraceEvent("commitQueueNewLog", self->dbgid).detail("logId", logData->logId).detail("version", logData->version.get()).detail("committing", logData->queueCommittingVersion).detail("commmitted", logData->queueCommittedVersion.get()); + + loop { + if(logData->stopped && logData->version.get() == std::max(logData->queueCommittingVersion, logData->queueCommittedVersion.get())) { + Void _ = wait( logData->queueCommittedVersion.whenAtLeast(logData->version.get() ) ); + break; + } + + choose { + when(Void _ = wait( logData->version.whenAtLeast( std::max(logData->queueCommittingVersion, logData->queueCommittedVersion.get()) + 1 ) ) ) { + while( self->queueCommitBegin != self->queueCommitEnd.get() && !self->largeDiskQueueCommitBytes.get() ) { + Void _ = wait( self->queueCommitEnd.whenAtLeast(self->queueCommitBegin) || self->largeDiskQueueCommitBytes.onChange() ); + } + self->sharedActors.send(doQueueCommit(self, logData)); + } + when(Void _ = wait(self->newLogData.onTrigger())) {} + } } - self->addActor.send(doQueueCommit(self)); } } - ACTOR Future tLogCommit( - TLogData* self, - TLogCommitRequest req, - PromiseStream warningCollectorInput ) { + ACTOR Future rejoinMasters( TLogData* self, TLogInterface tli, DBRecoveryCount recoveryCount, Future registerWithMaster ) { + state UID lastMasterID(0,0); + loop { + auto const& inf = self->dbInfo->get(); + bool isDisplaced = !std::count( inf.priorCommittedLogServers.begin(), inf.priorCommittedLogServers.end(), tli.id() ); + isDisplaced = isDisplaced && inf.recoveryCount >= recoveryCount && inf.recoveryState != 0; - state Optional tlogDebugID; - if(req.debugID.present()) - { - tlogDebugID = g_nondeterministic_random->randomUniqueID(); - g_traceBatch.addAttach("CommitAttachID", req.debugID.get().first(), tlogDebugID.get().first()); - g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.BeforeWaitForVersion"); + if(isDisplaced) { + for(auto& log : inf.logSystemConfig.tLogs) { + if( std::count( log.tLogs.begin(), log.tLogs.end(), tli.id() ) ) { + isDisplaced = false; + break; + } + } + } + if(isDisplaced) { + for(auto& old : inf.logSystemConfig.oldTLogs) { + for(auto& log : old.tLogs) { + if( std::count( log.tLogs.begin(), log.tLogs.end(), tli.id() ) ) { + isDisplaced = false; + break; + } + } + } + } + if ( isDisplaced ) + { + TraceEvent("TLogDisplaced", tli.id()).detail("Reason", "DBInfoDoesNotContain").detail("recoveryCount", recoveryCount).detail("infRecoveryCount", inf.recoveryCount).detail("recoveryState", inf.recoveryState) + .detail("logSysConf", describe(inf.logSystemConfig.tLogs)).detail("priorLogs", describe(inf.priorCommittedLogServers)).detail("oldLogGens", inf.logSystemConfig.oldTLogs.size()); + if (BUGGIFY) Void _ = wait( delay( SERVER_KNOBS->BUGGIFY_WORKER_REMOVED_MAX_LAG * g_random->random01() ) ); + throw worker_removed(); + } + + if( registerWithMaster.isReady() ) { + if ( self->dbInfo->get().master.id() != lastMasterID) { + // The TLogRejoinRequest is needed to establish communications with a new master, which doesn't have our TLogInterface + TLogRejoinRequest req; + req.myInterface = tli; + TraceEvent("TLogRejoining", self->dbgid).detail("Master", self->dbInfo->get().master.id()); + choose { + when ( bool success = wait( brokenPromiseToNever( self->dbInfo->get().master.tlogRejoin.getReply( req ) ) ) ) { + if (success) + lastMasterID = self->dbInfo->get().master.id(); + } + when ( Void _ = wait( self->dbInfo->onChange() ) ) { } + } + } else { + Void _ = wait( self->dbInfo->onChange() ); + } + } else { + Void _ = wait( registerWithMaster || self->dbInfo->onChange() ); + } + } + } + + ACTOR Future cleanupPeekTrackers( TLogData* self ) { + loop { + double minTimeUntilExpiration = SERVER_KNOBS->PEEK_TRACKER_EXPIRATION_TIME; + auto it = self->peekTracker.begin(); + while(it != self->peekTracker.end()) { + double timeUntilExpiration = it->second.lastUpdate + SERVER_KNOBS->PEEK_TRACKER_EXPIRATION_TIME - now(); + if(timeUntilExpiration < 1.0e-6) { + for(auto seq : it->second.sequence_version) { + if(!seq.second.isSet()) { + seq.second.sendError(timed_out()); + } + } + it = self->peekTracker.erase(it); + } else { + minTimeUntilExpiration = std::min(minTimeUntilExpiration, timeUntilExpiration); + ++it; + } + } + + Void _ = wait( delay(minTimeUntilExpiration) ); + } + } + + void getQueuingMetrics( TLogData* self, TLogQueuingMetricsRequest const& req ) { + TLogQueuingMetricsReply reply; + reply.localTime = now(); + reply.instanceID = self->instanceID; + reply.bytesInput = self->bytesInput; + reply.bytesDurable = self->bytesDurable; + reply.storageBytes = self->persistentData->getStorageBytes(); + reply.v = self->prevVersion; + req.reply.send( reply ); + } + + ACTOR Future serveTLogInterface( TLogData* self, TLogInterface tli, Reference logData, PromiseStream warningCollectorInput ) { + loop choose { + when( TLogPeekRequest req = waitNext( tli.peekMessages.getFuture() ) ) { + logData->addActor.send( tLogPeekMessages( self, req, logData ) ); + } + when( TLogPopRequest req = waitNext( tli.popMessages.getFuture() ) ) { + logData->addActor.send( tLogPop( self, req, logData ) ); + } + when( TLogCommitRequest req = waitNext( tli.commit.getFuture() ) ) { + ASSERT(logData->stopped); + req.reply.sendError( tlog_stopped() ); + } + when( ReplyPromise< TLogLockResult > reply = waitNext( tli.lock.getFuture() ) ) { + logData->addActor.send( tLogLock(self, reply, logData) ); + } + when (TLogQueuingMetricsRequest req = waitNext(tli.getQueuingMetrics.getFuture())) { + getQueuingMetrics(self, req); + } + when (TLogConfirmRunningRequest req = waitNext(tli.confirmRunning.getFuture())){ + if (req.debugID.present() ) { + UID tlogDebugID = g_nondeterministic_random->randomUniqueID(); + g_traceBatch.addAttach("TransactionAttachID", req.debugID.get().first(), tlogDebugID.first()); + g_traceBatch.addEvent("TransactionDebug", tlogDebugID.first(), "TLogServer.TLogConfirmRunningRequest"); + } + ASSERT(logData->stopped); + req.reply.sendError( tlog_stopped() ); + } + } + } + + void removeLog( TLogData* self, Reference logData ) { + TraceEvent("TLogRemoved", logData->logId).detail("input", logData->bytesInput.getValue()).detail("durable", logData->bytesDurable.getValue()); + logData->stopped = true; + if(logData->recoverySuccessful.canBeSet()) { + logData->recoverySuccessful.send(false); } - self->knownCommittedVersion = std::max(self->knownCommittedVersion, req.knownCommittedVersion); + logData->addActor = PromiseStream>(); //there could be items still in the promise stream if one of the actors threw an error immediately + self->id_data.erase(logData->logId); - Void _ = wait( self->version.whenAtLeast( req.prevVersion ) ); - - //Calling check_yield instead of yield to avoid a destruction ordering problem in simulation - if(g_network->check_yield(g_network->getCurrentTask())) { - Void _ = wait(delay(0, g_network->getCurrentTask())); + if(self->id_data.size()) { + return; + } else { + throw worker_removed(); } + } - if(self->stopped) { - req.reply.sendError( tlog_stopped() ); + ACTOR Future tLogCore( TLogData* self, Reference logData ) { + if(logData->removed.isReady()) { + Void _ = wait(delay(0)); //to avoid iterator invalidation in restorePersistentState when removed is already ready + ASSERT(logData->removed.isError()); + + if(logData->removed.getError().code() != error_code_worker_removed) { + throw logData->removed.getError(); + } + + removeLog(self, logData); return Void(); } - if (self->version.get() == req.prevVersion) { // Not a duplicate (check relies on no waiting between here and self->version.set() below!) - if(req.debugID.present()) - g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.Before"); + TraceEvent("newLogData", self->dbgid).detail("logId", logData->logId); + logData->initialized = true; + self->newLogData.trigger(); - TraceEvent("TLogCommit", self->dbgid).detail("Version", req.version); - commitMessages(self, req.version, req.arena, req.messages, req.tags); + state PromiseStream warningCollectorInput; + state Future warningCollector = timeoutWarningCollector( warningCollectorInput.getFuture(), 1.0, "TLogQueueCommitSlow", self->dbgid ); + state Future error = actorCollection( logData->addActor.getFuture() ); - // Log the changes to the persistent queue, to be committed by commitQueue() - TLogQueueEntryRef qe; - qe.version = req.version; - qe.knownCommittedVersion = req.knownCommittedVersion; - qe.messages = req.messages; - qe.tags = req.tags; - self->persistentQueue->push( qe ); + logData->addActor.send( logData->recovery ); + logData->addActor.send( waitFailureServer(logData->tli.waitFailure.getFuture()) ); + logData->addActor.send( logData->removed ); + //FIXME: update tlogMetrics to include new information, or possibly only have one copy for the shared instance + logData->addActor.send( traceCounters("TLogMetrics", logData->logId, SERVER_KNOBS->STORAGE_LOGGING_DELAY, &logData->cc, logData->logId.toString() + "/TLogMetrics")); + logData->addActor.send( serveTLogInterface(self, logData->tli, logData, warningCollectorInput) ); - self->diskQueueCommitBytes += qe.expectedSize(); - if( self->diskQueueCommitBytes > SERVER_KNOBS->MAX_QUEUE_COMMIT_BYTES ) { - self->largeDiskQueueCommitBytes.set(true); - } + try { + Void _ = wait( error ); + throw internal_error(); + } catch( Error &e ) { + if( e.code() != error_code_worker_removed ) + throw; - // Notifies the commitQueue actor to commit persistentQueue, and also unblocks tLogPeekMessages actors - self->prevVersion = self->version.get(); - self->version.set( req.version ); - - if(req.debugID.present()) - g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.AfterTLogCommit"); + removeLog(self, logData); + return Void(); } - // Send replies only once all prior messages have been received and committed. - Void _ = wait( timeoutWarning( self->queueCommittedVersion.whenAtLeast( req.version ), 0.1, warningCollectorInput ) ); - - if(req.debugID.present()) - g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.After"); - - req.reply.send( Void() ); - return Void(); } - ACTOR Future initPersistentState( TLogData* self ) { - // PERSIST: Initial setup of persistentData for a brand new tLog for a new database - IKeyValueStore *storage = self->persistentData; - storage->set( persistFormat ); - storage->set( KeyValueRef( persistID, BinaryWriter::toValue( self->dbgid, Unversioned() ) ) ); - storage->set( KeyValueRef( persistCurrentVersionKey, BinaryWriter::toValue(self->version.get(), Unversioned()) ) ); - storage->set( KeyValueRef( persistRecoveryCountKey, BinaryWriter::toValue(self->recoveryCount, Unversioned()) ) ); - - TraceEvent("TLogInitCommit", self->dbgid).detail("Version", self->version.get()); - Void _ = wait( storage->commit() ); - return Void(); - } - - ACTOR Future restorePersistentState( TLogData* self, Promise outRecoveryCount, bool processQueue, TLogInterface myInterface ) { + ACTOR Future restorePersistentState( TLogData* self, LocalityData locality ) { state double startt = now(); + state Reference logData; + state KeyRange tagKeys; // PERSIST: Read basic state from persistentData; replay persistentQueue but don't erase it + + TraceEvent("TLogRestorePersistentState", self->dbgid); + IKeyValueStore *storage = self->persistentData; - - TraceEvent("TLogRestorePersistentState", self->dbgid).detail("pq", processQueue); - state Future> fFormat = storage->readValue(persistFormat.key); - state Future> fID = storage->readValue(persistID); - state Future> fVer = storage->readValue(persistCurrentVersionKey); - state Future> fRecoveryCount = storage->readValue(persistRecoveryCountKey); - state Future> fRecoveryInProgress = storage->readValue( persistRecoveryInProgress.key ); + state Future>> fVers = storage->readRange(persistCurrentVersionKeys); + state Future>> fRecoverCounts = storage->readRange(persistRecoveryCountKeys); // FIXME: metadata in queue? - Void _ = wait( waitForAll( (vector>>(), fFormat, fID, fVer, fRecoveryCount, fRecoveryInProgress) ) ); + Void _ = wait( waitForAll( (vector>>(), fFormat ) ) ); + Void _ = wait( waitForAll( (vector>>>(), fVers, fRecoverCounts) ) ); if (fFormat.get().present() && !persistFormatReadableRange.contains( fFormat.get().get() )) { TraceEvent(SevError, "UnsupportedDBFormat", self->dbgid).detail("Format", printable(fFormat.get().get())).detail("Expected", persistFormat.value.toString()); throw worker_recovery_failed(); } - if (fRecoveryInProgress.get().present()) { - TEST(true); // We must have rebooted during network recovery; the master recovery that depended on us will fail and we can permanently delete our (incomplete) storage - TraceEvent("RestartedDuringNetworkRecovery", self->dbgid); - throw worker_removed(); - } - if (!fFormat.get().present()) { Standalone> v = wait( self->persistentData->readRange( KeyRangeRef(StringRef(), LiteralStringRef("\xff")), 1 ) ); if (!v.size()) { @@ -995,450 +1261,186 @@ namespace oldTLog { } } + state std::vector>> removed; - ASSERT( self->dbgid == BinaryReader::fromStringRef(fID.get().get(), Unversioned()) ); + ASSERT(fVers.get().size() == fRecoverCounts.get().size()); - Version ver = BinaryReader::fromStringRef( fVer.get().get(), Unversioned() ); - self->persistentDataVersion = ver; - self->persistentDataDurableVersion = ver; - self->version.set( ver ); + state int idx = 0; + state Promise registerWithMaster; + for(idx = 0; idx < fVers.get().size(); idx++) { + state KeyRef rawId = fVers.get()[idx].key.removePrefix(persistCurrentVersionKeys.begin); + UID id1 = BinaryReader::fromStringRef( rawId, Unversioned() ); + UID id2 = BinaryReader::fromStringRef( fRecoverCounts.get()[idx].key.removePrefix(persistRecoveryCountKeys.begin), Unversioned() ); + ASSERT(id1 == id2); - TraceEvent("TLogRestorePersistentStateVer", self->dbgid).detail("ver", self->version.get()); + TLogInterface recruited; + recruited.uniqueID = id1; + recruited.locality = locality; + recruited.initEndpoints(); - self->recoveryCount = BinaryReader::fromStringRef( fRecoveryCount.get().get(), Unversioned() ); + DUMPTOKEN( recruited.peekMessages ); + DUMPTOKEN( recruited.popMessages ); + DUMPTOKEN( recruited.commit ); + DUMPTOKEN( recruited.lock ); + DUMPTOKEN( recruited.getQueuingMetrics ); + DUMPTOKEN( recruited.confirmRunning ); - outRecoveryCount.send( self->recoveryCount ); // This might cancel this actor (if the recovery count is ancient) and destroy self - Void _ = wait(Future(Void())); // ... so check for cancellation + logData = Reference( new LogData(self, recruited) ); + logData->stopped = true; + self->id_data[id1] = logData; - // Restore popped keys. Pop operations that took place after the last (committed) updatePersistentDataVersion might be lost, but - // that is fine because we will get the corresponding data back, too. - state KeyRange tagKeys = persistTagPoppedKeys; - loop { - Standalone> data = wait( self->persistentData->readRange( tagKeys, BUGGIFY ? 3 : 1<<30, 1<<20 ) ); - if (!data.size()) break; - ((KeyRangeRef&)tagKeys) = KeyRangeRef( keyAfter(data.back().key, tagKeys.arena()), tagKeys.end ); + Version ver = BinaryReader::fromStringRef( fVers.get()[idx].value, Unversioned() ); + logData->persistentDataVersion = ver; + logData->persistentDataDurableVersion = ver; + logData->version.set(ver); + logData->recoveryCount = BinaryReader::fromStringRef( fRecoverCounts.get()[idx].value, Unversioned() ); + logData->removed = rejoinMasters(self, recruited, logData->recoveryCount, registerWithMaster.getFuture()); + removed.push_back(errorOr(logData->removed)); - for(auto &kv : data) { - Tag tag = decodeTagPoppedKey(kv.key); - Version popped = decodeTagPoppedValue(kv.value); - TraceEvent("TLogRestorePop", self->dbgid).detail("Tag", tag).detail("To", popped); - ASSERT( self->tag_data.find(tag) == self->tag_data.end() ); - self->tag_data.insert( mapPair( std::move(Tag(tag)), TLogData::TagData( popped, false, false, tag )) ); + TraceEvent("TLogRestorePersistentStateVer", id1).detail("ver", ver); + + // Restore popped keys. Pop operations that took place after the last (committed) updatePersistentDataVersion might be lost, but + // that is fine because we will get the corresponding data back, too. + tagKeys = prefixRange( rawId.withPrefix(persistTagPoppedKeys.begin) ); + loop { + if(logData->removed.isReady()) break; + Standalone> data = wait( self->persistentData->readRange( tagKeys, BUGGIFY ? 3 : 1<<30, 1<<20 ) ); + if (!data.size()) break; + ((KeyRangeRef&)tagKeys) = KeyRangeRef( keyAfter(data.back().key, tagKeys.arena()), tagKeys.end ); + + for(auto &kv : data) { + OldTag tag = decodeTagPoppedKey(rawId, kv.key); + Version popped = decodeTagPoppedValue(kv.value); + TraceEvent("TLogRestorePop", logData->logId).detail("Tag", tag).detail("To", popped); + ASSERT( logData->tag_data.find(tag) == logData->tag_data.end() ); + logData->tag_data.insert( mapPair( tag, LogData::TagData( popped, false, false, tag )) ); + } } } - // PERSIST: Apply changes from queue - if (processQueue) { - state Version lastVer = 0; - state double recoverMemoryLimit = SERVER_KNOBS->TARGET_BYTES_PER_TLOG + SERVER_KNOBS->SPRING_BYTES_TLOG; - if (BUGGIFY) recoverMemoryLimit = SERVER_KNOBS->BUGGIFY_RECOVER_MEMORY_LIMIT; - try { - loop { - TLogQueueEntry qe = wait( self->persistentQueue->readNext() ); - //TraceEvent("TLogRecoveredQE", self->dbgid).detail("ver", qe.version).detail("MessageBytes", qe.messages.size()).detail("Tags", qe.tags.size()) - // .detail("Tag0", qe.tags.size() ? qe.tags[0].tag : invalidTag); + state Future allRemoved = waitForAll(removed); + state Version lastVer = 0; + state UID lastId = UID(1,1); //initialized so it will not compare equal to a default UID + state double recoverMemoryLimit = SERVER_KNOBS->TARGET_BYTES_PER_TLOG + SERVER_KNOBS->SPRING_BYTES_TLOG; + if (BUGGIFY) recoverMemoryLimit = std::max(SERVER_KNOBS->BUGGIFY_RECOVER_MEMORY_LIMIT, SERVER_KNOBS->TLOG_SPILL_THRESHOLD); + + try { + loop { + if(allRemoved.isReady()) { + TEST(true); //all tlogs removed during queue recovery + throw worker_removed(); + } + choose { + when( TLogQueueEntry qe = wait( self->persistentQueue->readNext() ) ) { + if(!self->queueOrder.size() || self->queueOrder.back() != qe.id) self->queueOrder.push_back(qe.id); + if(qe.id != lastId) { + lastId = qe.id; + auto it = self->id_data.find(qe.id); + if(it != self->id_data.end()) { + logData = it->second; + } else { + logData = Reference(); + } + } else { + ASSERT( qe.version >= lastVer ); + lastVer = qe.version; + } - ASSERT( qe.version > lastVer ); - lastVer = qe.version; - self->knownCommittedVersion = std::max(self->knownCommittedVersion, qe.knownCommittedVersion); - if( qe.version > self->version.get() ) { - commitMessages(self, qe.version, qe.arena(), qe.messages, qe.tags); - self->version.set( qe.version ); - self->queueCommittedVersion.set( qe.version ); + //TraceEvent("TLogRecoveredQE", self->dbgid).detail("logId", qe.id).detail("ver", qe.version).detail("MessageBytes", qe.messages.size()).detail("Tags", qe.tags.size()) + // .detail("Tag0", qe.tags.size() ? qe.tags[0].tag : invalidTag).detail("version", logData->version.get()); - if (self->bytesInput.getValue() - self->bytesDurable.getValue() > recoverMemoryLimit) { - TEST(true); // Flush excess data during TLog queue recovery - TraceEvent("FlushLargeQueueDuringRecovery", self->dbgid).detail("BytesInput", self->bytesInput.getValue()).detail("BytesDurable", self->bytesDurable.getValue()).detail("Version", self->version.get()).detail("PVer", self->persistentDataVersion); + if(logData) { + logData->knownCommittedVersion = std::max(logData->knownCommittedVersion, qe.knownCommittedVersion); + if( qe.version > logData->version.get() ) { + commitMessages(logData, qe.version, qe.arena(), qe.messages, qe.tags, self->bytesInput); + logData->version.set( qe.version ); + logData->queueCommittedVersion.set( qe.version ); - while(self->persistentDataDurableVersion != self->version.get()) { - Version nextVersion; - int totalSize = 0; - std::vector>::iterator, std::deque>::iterator>> iters; - for(auto tag = self->tag_data.begin(); tag != self->tag_data.end(); ++tag) - iters.push_back(std::make_pair(tag->value.version_messages.begin(), tag->value.version_messages.end())); + while (self->bytesInput - self->bytesDurable >= recoverMemoryLimit) { + TEST(true); // Flush excess data during TLog queue recovery + TraceEvent("FlushLargeQueueDuringRecovery", self->dbgid).detail("BytesInput", self->bytesInput).detail("BytesDurable", self->bytesDurable).detail("Version", logData->version.get()).detail("PVer", logData->persistentDataVersion); - while( totalSize < SERVER_KNOBS->UPDATE_STORAGE_BYTE_LIMIT ) { - nextVersion = self->version.get(); - for( auto &it : iters ) - if(it.first != it.second) - nextVersion = std::min( nextVersion, it.first->first + 1 ); - - if(nextVersion == self->version.get()) - break; - - for( auto &it : iters ) { - while (it.first != it.second && it.first->first < nextVersion) { - totalSize += it.first->second.expectedSize(); - ++it.first; - } + choose { + when( Void _ = wait( updateStorage(self) ) ) {} + when( Void _ = wait( allRemoved ) ) { throw worker_removed(); } } } - - Void _ = wait( updatePersistentData(self, nextVersion ) ); } } } - } - } catch (Error& e) { - if (e.code() != error_code_end_of_stream) throw; - } - } - - TraceEvent("TLogRestorePersistentStateDone", self->dbgid) - .detail("pq", processQueue).detail("version", self->version.get()).detail("durableVer", self->persistentDataDurableVersion) - .detail("Took", now()-startt); - TEST( now()-startt >= 1.0 ); // TLog recovery took more than 1 second - TEST( processQueue ); // TLog recovered from disk queue - - return Void(); - } - - void getQueuingMetrics( TLogData* self, TLogQueuingMetricsRequest const& req ) { - TLogQueuingMetricsReply reply; - reply.localTime = now(); - reply.instanceID = self->instanceID; - reply.bytesInput = self->bytesInput.getValue(); - reply.bytesDurable = self->bytesDurable.getValue(); - reply.storageBytes = self->persistentData->getStorageBytes(); - reply.v = self->prevVersion; - req.reply.send( reply ); - } - - ACTOR Future respondToRecovered( TLogInterface tli, Future recovery ) { - Void _ = wait( recovery ); - - loop { - TLogRecoveryFinishedRequest req = waitNext( tli.recoveryFinished.getFuture() ); - req.reply.send(Void()); - } - } - - ACTOR Future cleanupPeekTrackers( TLogData* self ) { - loop { - double minExpireTime = SERVER_KNOBS->PEEK_TRACKER_EXPIRATION_TIME; - auto it = self->peekTracker.begin(); - while(it != self->peekTracker.end()) { - double expireTime = SERVER_KNOBS->PEEK_TRACKER_EXPIRATION_TIME - now()-it->second.lastUpdate; - if(expireTime < 1.0e-6) { - for(auto seq : it->second.sequence_version) { - if(!seq.second.isSet()) { - seq.second.sendError(timed_out()); - } - } - it = self->peekTracker.erase(it); - } else { - minExpireTime = std::min(minExpireTime, expireTime); - ++it; + when( Void _ = wait( allRemoved ) ) { throw worker_removed(); } } } - - Void _ = wait( delay(minExpireTime) ); - } - } - - ACTOR Future serveTLogInterface( TLogData* self, TLogInterface tli, PromiseStream warningCollectorInput ) { - loop choose { - when( TLogPeekRequest req = waitNext( tli.peekMessages.getFuture() ) ) { - self->addActor.send( tLogPeekMessages( self, req ) ); - } - when( TLogPopRequest req = waitNext( tli.popMessages.getFuture() ) ) { - self->addActor.send( tLogPop( self, req ) ); - } - when( TLogCommitRequest req = waitNext( tli.commit.getFuture() ) ) { - TEST(self->stopped); // TLogCommitRequest while stopped - if (!self->stopped) - self->addActor.send( tLogCommit( self, req, warningCollectorInput ) ); - else - req.reply.sendError( tlog_stopped() ); - } - when( ReplyPromise< TLogLockResult > reply = waitNext( tli.lock.getFuture() ) ) { - self->addActor.send( tLogLock(self, reply) ); - } - when (TLogQueuingMetricsRequest req = waitNext(tli.getQueuingMetrics.getFuture())) { - getQueuingMetrics(self, req); - } - when (TLogConfirmRunningRequest req = waitNext(tli.confirmRunning.getFuture())){ - if (req.debugID.present() ) { - UID tlogDebugID = g_nondeterministic_random->randomUniqueID(); - g_traceBatch.addAttach("TransactionAttachID", req.debugID.get().first(), tlogDebugID.first()); - g_traceBatch.addEvent("TransactionDebug", tlogDebugID.first(), "TLogServer.TLogConfirmRunningRequest"); - } - if (!self->stopped) - req.reply.send(Void()); - else - req.reply.sendError( tlog_stopped() ); - } - } - } - - ACTOR Future tLogCore( TLogData* self, TLogInterface tli, Future recovery ) { - state PromiseStream warningCollectorInput; - state Future warningCollector = timeoutWarningCollector( warningCollectorInput.getFuture(), 1.0, "TLogQueueCommitSlow", self->dbgid ); - state Future error = actorCollection( self->addActor.getFuture() ); - - self->addActor.send( updateStorage(self) ); - self->addActor.send( commitQueue(self) ); - self->addActor.send( waitFailureServer(tli.waitFailure.getFuture()) ); - self->addActor.send( respondToRecovered(tli, recovery) ); - self->addActor.send( traceCounters("TLogMetrics", self->dbgid, SERVER_KNOBS->STORAGE_LOGGING_DELAY, &self->cc, self->dbgid.toString() + "/TLogMetrics")); - self->addActor.send( cleanupPeekTrackers(self) ); - - if( recovery.isValid() && !recovery.isReady()) { - self->addActor.send( recovery ); - } - - self->coreStarted = true; - - Void _ = wait( serveTLogInterface(self, tli, warningCollectorInput) || error ); - throw internal_error(); - }; - - ACTOR Future checkEmptyQueue(TLogData* self) { - TraceEvent("TLogCheckEmptyQueueBegin", self->dbgid); - try { - TLogQueueEntry r = wait( self->persistentQueue->readNext() ); - throw internal_error(); } catch (Error& e) { if (e.code() != error_code_end_of_stream) throw; - TraceEvent("TLogCheckEmptyQueueEnd", self->dbgid); - return Void(); - } - } - - ACTOR Future recoverTagFromLogSystem( TLogData* self, Version beginVersion, Version endVersion, Tag tag, Reference> uncommittedBytes, Reference>> logSystem ) { - state Future dbInfoChange = Void(); - state Reference r; - state Version tagAt = beginVersion; - state Version tagPopped = 0; - state Version lastVer = 0; - - TraceEvent("LogRecoveringTagBegin", self->dbgid).detail("Tag", tag).detail("recoverAt", endVersion); - - while (tagAt <= endVersion) { - loop { - choose { - when(Void _ = wait( r ? r->getMore() : Never() ) ) { - break; - } - when( Void _ = wait( dbInfoChange ) ) { - if(r) tagPopped = std::max(tagPopped, r->popped()); - if( logSystem->get() ) - r = logSystem->get()->peek( tagAt, tag ); - else - r = Reference(); - dbInfoChange = logSystem->onChange(); - } - } - } - - TraceEvent("LogRecoveringTagResults", self->dbgid).detail("Tag", tag); - - Version ver = 0; - BinaryWriter wr( Unversioned() ); - int writtenBytes = 0; - while (true) { - bool foundMessage = r->hasMessage(); - //TraceEvent("LogRecoveringMsg").detail("Tag", tag).detail("foundMessage", foundMessage).detail("ver", r->version().toString()); - if (!foundMessage || r->version().version != ver) { - ASSERT(r->version().version > lastVer); - if (ver) { - //TraceEvent("LogRecoveringTagVersion", self->dbgid).detail("Tag", tag).detail("Ver", ver).detail("Bytes", wr.getLength()); - writtenBytes += 100 + wr.getLength(); - self->persistentData->set( KeyValueRef( persistTagMessagesKey( tag, ver ), wr.toStringRef() ) ); - } - lastVer = ver; - ver = r->version().version; - wr = BinaryWriter( Unversioned() ); - if (!foundMessage || ver > endVersion) - break; - } - - // FIXME: This logic duplicates stuff in LogPushData::addMessage(), and really would be better in PeekResults or somewhere else. Also unnecessary copying. - StringRef msg = r->getMessage(); - wr << uint32_t( msg.size() + sizeof(uint32_t) ) << r->version().sub; - wr.serializeBytes( msg ); - r->nextMessage(); - } - - tagAt = r->version().version; - - if(writtenBytes) - uncommittedBytes->set(uncommittedBytes->get() + writtenBytes); - - while(uncommittedBytes->get() >= SERVER_KNOBS->LARGE_TLOG_COMMIT_BYTES) { - Void _ = wait(uncommittedBytes->onChange()); - } - } - if(r) tagPopped = std::max(tagPopped, r->popped()); - - auto tsm = self->tag_data.find(tag); - if (tsm == self->tag_data.end()) { - self->tag_data.insert( mapPair(std::move(Tag(tag)), TLogData::TagData(tagPopped, false, true, tag)) ); } - Void _ = wait(tLogPop( self, TLogPopRequest(tagPopped, tag) )); + TraceEvent("TLogRestorePersistentStateDone", self->dbgid).detail("Took", now()-startt); + TEST( now()-startt >= 1.0 ); // TLog recovery took more than 1 second - updatePersistentPopped( self, tag, self->tag_data.find(tag)->value ); + for(auto it : self->id_data) { + if(it.second->queueCommittedVersion.get() == 0) { + TraceEvent("TLogZeroVersion", self->dbgid).detail("logId", it.first); + it.second->queueCommittedVersion.set(it.second->version.get()); + } + self->sharedActors.send( tLogCore( self, it.second ) ); + } + + if(registerWithMaster.canBeSet()) registerWithMaster.send(Void()); return Void(); } - ACTOR Future updateLogSystem(TLogData* self, LogSystemConfig recoverFrom, Reference>> logSystem) { - loop { - TraceEvent("TLogUpdate", self->dbgid).detail("recoverFrom", recoverFrom.toString()).detail("dbInfo", self->dbInfo->get().logSystemConfig.toString()); - if( self->dbInfo->get().logSystemConfig.isEqualIds(recoverFrom) ) { - logSystem->set(ILogSystem::fromLogSystemConfig( self->dbgid, self->dbInfo->get().myLocality, self->dbInfo->get().logSystemConfig )); - } else if( self->dbInfo->get().logSystemConfig.isNextGenerationOf(recoverFrom) && std::count( self->dbInfo->get().logSystemConfig.tLogs.begin(), self->dbInfo->get().logSystemConfig.tLogs.end(), self->dbgid ) ) { - logSystem->set(ILogSystem::fromOldLogSystemConfig( self->dbgid, self->dbInfo->get().myLocality, self->dbInfo->get().logSystemConfig )); - } else { - logSystem->set(Reference()); - } - Void _ = wait( self->dbInfo->onChange() ); - } - } - - ACTOR Future recoverFromLogSystem( TLogData* self, LogSystemConfig recoverFrom, Version recoverAt, Version knownCommittedVersion, std::vector recoverTags, Promise copyComplete ) { - state Future committing = Void(); - state double lastCommitT = now(); - state Reference> uncommittedBytes = Reference>(new AsyncVar()); - state std::vector> recoverFutures; - state Reference>> logSystem = Reference>>(new AsyncVar>()); - state Future updater = updateLogSystem(self, recoverFrom, logSystem); - - for(auto tag : recoverTags ) - recoverFutures.push_back(recoverTagFromLogSystem(self, knownCommittedVersion, recoverAt, tag, uncommittedBytes, logSystem)); - - state Future copyDone = waitForAll(recoverFutures); - state Future recoveryDone = Never(); - state Future commitTimeout = delay(SERVER_KNOBS->LONG_TLOG_COMMIT_TIME); - - loop { - choose { - when(Void _ = wait(copyDone)) { - recoverFutures.clear(); - for(auto tag : recoverTags ) - recoverFutures.push_back(recoverTagFromLogSystem(self, 0, knownCommittedVersion, tag, uncommittedBytes, logSystem)); - copyDone = Never(); - recoveryDone = waitForAll(recoverFutures); - - Void __ = wait( committing ); - Void __ = wait( self->updatePersist ); - committing = self->persistentData->commit(); - commitTimeout = delay(SERVER_KNOBS->LONG_TLOG_COMMIT_TIME); - uncommittedBytes->set(0); - Void __ = wait( committing ); - TraceEvent("TLogCommitCopyData", self->dbgid); - - if(!copyComplete.isSet()) - copyComplete.send(Void()); - } - when(Void _ = wait(recoveryDone)) { break; } - when(Void _ = wait(commitTimeout)) { - TEST(true); // We need to commit occasionally if this process is long to avoid running out of memory. - // We let one, but not more, commits pipeline with the network transfer - Void __ = wait( committing ); - Void __ = wait( self->updatePersist ); - committing = self->persistentData->commit(); - commitTimeout = delay(SERVER_KNOBS->LONG_TLOG_COMMIT_TIME); - uncommittedBytes->set(0); - TraceEvent("TLogCommitRecoveryData", self->dbgid).detail("MemoryUsage", DEBUG_DETERMINISM ? 0 : getMemoryUsage()); - } - when(Void _ = wait(uncommittedBytes->onChange())) { - if(uncommittedBytes->get() >= SERVER_KNOBS->LARGE_TLOG_COMMIT_BYTES) - commitTimeout = Void(); - } - } - } - - Void _ = wait( committing ); - Void _ = wait( self->updatePersist ); - Void _ = wait( self->persistentData->commit() ); - - TraceEvent("TLogRecoveryComplete", self->dbgid).detail("Locality", self->dbInfo->get().myLocality.toString()); - TEST(true); // tLog restore from old log system completed - - return Void(); - } - - ACTOR Future tLogStart( TLogData* self, LogSystemConfig recoverFrom, Version recoverAt, Version knownCommittedVersion, std::vector recoverTags, bool recoverFromDisk, - TLogInterface tli, ReplyPromise outInterface, Promise outRecoveryCount ) { - state Future recovery = Void(); - if (recoverFrom.logSystemType == 1) { - ASSERT(false); - } else if (recoverFrom.logSystemType == 2) { - Void _ = wait( checkEmptyQueue(self) ); - - self->persistentDataVersion = recoverAt; - self->persistentDataDurableVersion = recoverAt; // durable is a white lie until initPersistentState() commits the store - self->queueCommittedVersion.set( recoverAt ); - self->version.set( recoverAt ); - - Void _ = wait( initPersistentState( self ) ); - - state Promise copyComplete; - recovery = recoverFromLogSystem( self, recoverFrom, recoverAt, knownCommittedVersion, recoverTags, copyComplete ); - Void _ = wait(copyComplete.getFuture()); - } else if (recoverFromDisk) { - Void _ = wait( restorePersistentState( self, outRecoveryCount, true, tli ) ); - TEST(true); // tLog restore from disk completed + bool tlogTerminated( TLogData* self, IKeyValueStore* persistentData, TLogQueue* persistentQueue, Error const& e ) { + // Dispose the IKVS (destroying its data permanently) only if this shutdown is definitely permanent. Otherwise just close it. + self->terminated = true; + if (e.code() == error_code_worker_removed || e.code() == error_code_recruitment_failed) { + persistentData->dispose(); + persistentQueue->dispose(); } else { - // Brand new tlog, initialization has already been done by caller - Void _ = wait( checkEmptyQueue(self) ); - Void _ = wait( initPersistentState( self ) ); + persistentData->close(); + persistentQueue->close(); } - TraceEvent("TLogReady", self->dbgid); - - validate(self); - //dump(self); - - outInterface.send( tli ); - - Void _ = wait( tLogCore( self, tli, recovery ) ); - throw internal_error(); // tLogCore() shouldn't return without an error + if ( e.code() == error_code_worker_removed || + e.code() == error_code_recruitment_failed || + e.code() == error_code_file_not_found ) + { + TraceEvent("TLogTerminated", self->dbgid).error(e, true); + return true; + } else + return false; } - ACTOR Future rejoinMasters( TLogData* self, TLogInterface tli, Future fRecoveryCount ) { - state DBRecoveryCount recoveryCount = wait( fRecoveryCount ); - state UID lastMasterID(0,0); - loop { - auto const& inf = self->dbInfo->get(); - bool isDisplaced = inf.recoveryCount >= recoveryCount && inf.recoveryState != 0 && - !std::count( inf.logSystemConfig.tLogs.begin(), inf.logSystemConfig.tLogs.end(), tli.id() ) && - !std::count( inf.priorCommittedLogServers.begin(), inf.priorCommittedLogServers.end(), tli.id() ); - for(int i = 0; i < inf.logSystemConfig.oldTLogs.size() && isDisplaced; i++) { - isDisplaced = !std::count( inf.logSystemConfig.oldTLogs[i].tLogs.begin(), inf.logSystemConfig.oldTLogs[i].tLogs.end(), tli.id() ); - } - if ( isDisplaced ) - { - TraceEvent("TLogDisplaced", tli.id()).detail("Reason", "DBInfoDoesNotContain"); - if (BUGGIFY) Void _ = wait( delay( SERVER_KNOBS->BUGGIFY_WORKER_REMOVED_MAX_LAG * g_random->random01() ) ); - throw worker_removed(); - } + ACTOR Future tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference> db, LocalityData locality, UID tlogId ) + { + state TLogData self( tlogId, persistentData, persistentQueue, db ); + state Future error = actorCollection( self.sharedActors.getFuture() ); - if (self->dbInfo->get().master.id() != lastMasterID) { - // The TLogRejoinRequest is needed to establish communications with a new master, which doesn't have our TLogInterface - TLogRejoinRequest req; - req.myInterface = tli; - TraceEvent("TLogRejoining", self->dbgid).detail("Master", self->dbInfo->get().master.id()); - choose { - when ( bool success = wait( brokenPromiseToNever( self->dbInfo->get().master.tlogRejoin.getReply( req ) ) ) ) { - if (success) - lastMasterID = self->dbInfo->get().master.id(); - } - when ( Void _ = wait( self->dbInfo->onChange() ) ) { } + TraceEvent("SharedTlog", tlogId); + + try { + Void _ = wait( restorePersistentState( &self, locality ) ); + + self.sharedActors.send( cleanupPeekTrackers(&self) ); + self.sharedActors.send( commitQueue(&self) ); + self.sharedActors.send( updateStorageLoop(&self) ); + + Void _ = wait( error ); + throw internal_error(); + } catch (Error& e) { + TraceEvent("TLogError", tlogId).error(e, true); + + for( auto& it : self.id_data ) { + if(it.second->recoverySuccessful.canBeSet()) { + it.second->recoverySuccessful.send(false); } - } else - Void _ = wait( self->dbInfo->onChange() ); + } + + if (tlogTerminated( &self, persistentData, self.persistentQueue, e )) { + return Void(); + } else { + throw; + } } } - - // Restore from disk - ACTOR Future tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, TLogInterface tli, Reference> db ) { - state TLogData self( tli.id(), persistentData, persistentQueue, db ); - state Promise recoveryCount; - state Future removed = rejoinMasters(&self, tli, recoveryCount.getFuture()); - - Void _ = wait( tLogStart( &self, LogSystemConfig(), Version(0), Version(0), std::vector(), true, tli, ReplyPromise(), recoveryCount ) || removed ); - throw internal_error(); // tLogStart doesn't return without an error - } -} +} \ No newline at end of file diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index 9751a09372..31be01f3ff 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -375,9 +375,8 @@ struct LogData : NonCopyable, public ReferenceCounted { Reference>> logSystem; Optional remoteTag; - int persistentDataFormat; - explicit LogData(TLogData* tLogData, TLogInterface interf, Optional remoteTag, int persistentDataFormat = 1) : tLogData(tLogData), knownCommittedVersion(0), logId(interf.id()), - cc("TLog", interf.id().toString()), bytesInput("bytesInput", cc), bytesDurable("bytesDurable", cc), remoteTag(remoteTag), persistentDataFormat(persistentDataFormat), logSystem(new AsyncVar>()), + explicit LogData(TLogData* tLogData, TLogInterface interf, Optional remoteTag) : tLogData(tLogData), knownCommittedVersion(0), logId(interf.id()), + cc("TLog", interf.id().toString()), bytesInput("bytesInput", cc), bytesDurable("bytesDurable", cc), remoteTag(remoteTag), logSystem(new AsyncVar>()), // These are initialized differently on init() or recovery recoveryCount(), stopped(false), initialized(false), queueCommittingVersion(0), newPersistentDataVersion(invalidVersion), recovery(Void()), unrecoveredBefore(0) { @@ -824,20 +823,7 @@ void peekMessagesFromMemory( Reference self, TLogPeekRequest const& req messages << int32_t(-1) << currentVersion; } - if(self->persistentDataFormat == 0) { - BinaryReader rd( it->second.getLengthPtr(), it->second.expectedSize()+4, Unversioned() ); - while(!rd.empty()) { - int32_t messageLength; - uint32_t subVersion; - rd >> messageLength >> subVersion; - messageLength += sizeof(uint16_t); - messages << messageLength << subVersion << uint16_t(0); - messageLength -= (sizeof(subVersion) + sizeof(uint16_t)); - messages.serializeBytes(rd.readBytes(messageLength), messageLength); - } - } else { - messages << it->second.toStringRef(); - } + messages << it->second.toStringRef(); } } @@ -945,21 +931,7 @@ ACTOR Future tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere for (auto &kv : kvs) { auto ver = decodeTagMessagesKey(kv.key); messages << int32_t(-1) << ver; - - if(logData->persistentDataFormat == 0) { - BinaryReader rd( kv.value, Unversioned() ); - while(!rd.empty()) { - int32_t messageLength; - uint32_t subVersion; - rd >> messageLength >> subVersion; - messageLength += sizeof(uint16_t); - messages << messageLength << subVersion << uint16_t(0); - messageLength -= (sizeof(subVersion) + sizeof(uint16_t)); - messages.serializeBytes(rd.readBytes(messageLength), messageLength); - } - } else { - messages.serializeBytes(kv.value); - } + messages.serializeBytes(kv.value); } if (kvs.expectedSize() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES) @@ -1518,7 +1490,7 @@ ACTOR Future checkRecovered(TLogData* self) { return Void(); } -ACTOR Future restorePersistentState( TLogData* self, LocalityData locality, Promise recovered, PromiseStream tlogRequests ) { +ACTOR Future restorePersistentState( TLogData* self, LocalityData locality, Promise oldLog, Promise recovered, PromiseStream tlogRequests ) { state double startt = now(); state Reference logData; state KeyRange tagKeys; @@ -1556,9 +1528,17 @@ ACTOR Future restorePersistentState( TLogData* self, LocalityData locality } state std::vector>> removed; - state int persistentDataFormat = 0; - if(fFormat.get().get() >= LiteralStringRef("FoundationDB/LogServer/2/4")) { - persistentDataFormat = 1; + + if(fFormat.get().get() == LiteralStringRef("FoundationDB/LogServer/2/3")) { + //FIXME: need for upgrades from 5.X to 6.0, remove once this upgrade path is no longer needed + if(recovered.canBeSet()) recovered.send(Void()); + oldLog.send(Void()); + while(!tlogRequests.isEmpty()) { + tlogRequests.getFuture().pop().reply.sendError(recruitment_failed()); + } + + Void _ = wait( oldTLog::tLog(self->persistentData, self->rawPersistentQueue, self->dbInfo, locality, self->dbgid) ); + throw internal_error(); } ASSERT(fVers.get().size() == fRecoverCounts.get().size()); @@ -1590,7 +1570,7 @@ ACTOR Future restorePersistentState( TLogData* self, LocalityData locality DUMPTOKEN( recruited.confirmRunning ); //We do not need the remoteTag, because we will not be loading any additional data - logData = Reference( new LogData(self, recruited, Optional(), persistentDataFormat) ); + logData = Reference( new LogData(self, recruited, Optional()) ); logData->stopped = true; self->id_data[id1] = logData; id_interf[id1] = recruited; @@ -1986,7 +1966,7 @@ ACTOR Future tLogStart( TLogData* self, InitializeTLogRequest req, Localit } // New tLog (if !recoverFrom.size()) or restore from network -ACTOR Future tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference> db, LocalityData locality, PromiseStream tlogRequests, UID tlogId, bool restoreFromDisk, Promise recovered ) { +ACTOR Future tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference> db, LocalityData locality, PromiseStream tlogRequests, UID tlogId, bool restoreFromDisk, Promise oldLog, Promise recovered ) { state TLogData self( tlogId, persistentData, persistentQueue, db ); state Future error = actorCollection( self.sharedActors.getFuture() ); @@ -1994,7 +1974,7 @@ ACTOR Future tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQ try { if(restoreFromDisk) { - Void _ = wait( restorePersistentState( &self, locality, recovered, tlogRequests ) ); + Void _ = wait( restorePersistentState( &self, locality, oldLog, recovered, tlogRequests ) ); } else { Void _ = wait( checkEmptyQueue(&self) && checkRecovered(&self) ); } diff --git a/fdbserver/WorkerInterface.h b/fdbserver/WorkerInterface.h index 9726b43d08..708985bcd2 100644 --- a/fdbserver/WorkerInterface.h +++ b/fdbserver/WorkerInterface.h @@ -289,7 +289,7 @@ Future storageServer( Promise const& recovered); // changes pssi->id() to be the recovered ID Future masterServer( MasterInterface const& mi, Reference> const& db, class ServerCoordinators const&, LifetimeToken const& lifetime ); Future masterProxyServer(MasterProxyInterface const& proxy, InitializeMasterProxyRequest const& req, Reference> const& db); -Future tLog( class IKeyValueStore* const& persistentData, class IDiskQueue* const& persistentQueue, Reference> const& db, LocalityData const& locality, PromiseStream const& tlogRequests, UID const& tlogId, bool const& restoreFromDisk, Promise const& recovered ); // changes tli->id() to be the recovered ID +Future tLog( class IKeyValueStore* const& persistentData, class IDiskQueue* const& persistentQueue, Reference> const& db, LocalityData const& locality, PromiseStream const& tlogRequests, UID const& tlogId, bool const& restoreFromDisk, Promise const& oldLog, Promise const& recovered ); // changes tli->id() to be the recovered ID Future debugQueryServer( DebugQueryRequest const& req ); Future monitorServerDBInfo( Reference>> const& ccInterface, Reference const&, LocalityData const&, Reference> const& dbInfo ); Future resolver( ResolverInterface const& proxy, InitializeResolverRequest const&, Reference> const& db ); @@ -299,7 +299,7 @@ void registerThreadForProfiling(); void updateCpuProfiler(ProfilerRequest req); namespace oldTLog { - Future tLog( IKeyValueStore* const& persistentData, IDiskQueue* const& persistentQueue, TLogInterface const& tli, Reference> const& db ); + Future tLog( IKeyValueStore* const& persistentData, IDiskQueue* const& persistentQueue, Reference> const& db, LocalityData const& locality, UID const& tlogId ); } #endif diff --git a/fdbserver/fdbserver.vcxproj b/fdbserver/fdbserver.vcxproj index a8397fe455..14a6832dbf 100644 --- a/fdbserver/fdbserver.vcxproj +++ b/fdbserver/fdbserver.vcxproj @@ -53,6 +53,7 @@ + @@ -299,4 +300,4 @@ - + \ No newline at end of file diff --git a/fdbserver/fdbserver.vcxproj.filters b/fdbserver/fdbserver.vcxproj.filters index 1a1340b662..cc2679d2cc 100644 --- a/fdbserver/fdbserver.vcxproj.filters +++ b/fdbserver/fdbserver.vcxproj.filters @@ -272,6 +272,7 @@ workloads + @@ -365,4 +366,4 @@ {de5e282f-8d97-4054-b795-0a75b772326f} - + \ No newline at end of file diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index 2ee148f282..89f6a6175f 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -623,14 +623,15 @@ ACTOR Future workerServer( Reference connFile, Refe details["StorageEngine"] = s.storeType.toString(); startRole( s.storeID, interf.id(), "SharedTLog", details, "Restored" ); + Promise oldLog; Promise recovery; - Future tl = tLog( kv, queue, dbInfo, locality, tlog.isReady() ? tlogRequests : PromiseStream(), s.storeID, true, recovery ); + Future tl = tLog( kv, queue, dbInfo, locality, tlog.isReady() ? tlogRequests : PromiseStream(), s.storeID, true, oldLog, recovery ); recoveries.push_back(recovery.getFuture()); tl = handleIOErrors( tl, kv, s.storeID ); tl = handleIOErrors( tl, queue, s.storeID ); if(tlog.isReady()) { - tlog = tl; + tlog = oldLog.getFuture() || tl; } errorForwarders.add( forwardError( errors, "SharedTLog", s.storeID, tl ) ); } @@ -725,7 +726,7 @@ ACTOR Future workerServer( Reference connFile, Refe filesClosed.add( data->onClosed() ); filesClosed.add( queue->onClosed() ); - tlog = tLog( data, queue, dbInfo, locality, tlogRequests, logId, false, Promise() ); + tlog = tLog( data, queue, dbInfo, locality, tlogRequests, logId, false, Promise(), Promise() ); tlog = handleIOErrors( tlog, data, logId ); tlog = handleIOErrors( tlog, queue, logId ); errorForwarders.add( forwardError( errors, "SharedTLog", logId, tlog ) );