added tlog support for upgrading from 5.X clusters. Does not support upgrading from 4.X or earlier. Untested, storage servers still need the ability to change their tag.

This commit is contained in:
Evan Tschannen 2018-01-21 12:21:46 -08:00
parent 698ef4117e
commit 66b2218989
6 changed files with 763 additions and 778 deletions

File diff suppressed because it is too large Load Diff

View File

@ -375,9 +375,8 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
Reference<AsyncVar<Reference<ILogSystem>>> logSystem;
Optional<Tag> remoteTag;
int persistentDataFormat;
explicit LogData(TLogData* tLogData, TLogInterface interf, Optional<Tag> remoteTag, int persistentDataFormat = 1) : tLogData(tLogData), knownCommittedVersion(0), logId(interf.id()),
cc("TLog", interf.id().toString()), bytesInput("bytesInput", cc), bytesDurable("bytesDurable", cc), remoteTag(remoteTag), persistentDataFormat(persistentDataFormat), logSystem(new AsyncVar<Reference<ILogSystem>>()),
explicit LogData(TLogData* tLogData, TLogInterface interf, Optional<Tag> remoteTag) : tLogData(tLogData), knownCommittedVersion(0), logId(interf.id()),
cc("TLog", interf.id().toString()), bytesInput("bytesInput", cc), bytesDurable("bytesDurable", cc), remoteTag(remoteTag), logSystem(new AsyncVar<Reference<ILogSystem>>()),
// These are initialized differently on init() or recovery
recoveryCount(), stopped(false), initialized(false), queueCommittingVersion(0), newPersistentDataVersion(invalidVersion), recovery(Void()), unrecoveredBefore(0)
{
@ -824,20 +823,7 @@ void peekMessagesFromMemory( Reference<LogData> self, TLogPeekRequest const& req
messages << int32_t(-1) << currentVersion;
}
if(self->persistentDataFormat == 0) {
BinaryReader rd( it->second.getLengthPtr(), it->second.expectedSize()+4, Unversioned() );
while(!rd.empty()) {
int32_t messageLength;
uint32_t subVersion;
rd >> messageLength >> subVersion;
messageLength += sizeof(uint16_t);
messages << messageLength << subVersion << uint16_t(0);
messageLength -= (sizeof(subVersion) + sizeof(uint16_t));
messages.serializeBytes(rd.readBytes(messageLength), messageLength);
}
} else {
messages << it->second.toStringRef();
}
messages << it->second.toStringRef();
}
}
@ -945,21 +931,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
for (auto &kv : kvs) {
auto ver = decodeTagMessagesKey(kv.key);
messages << int32_t(-1) << ver;
if(logData->persistentDataFormat == 0) {
BinaryReader rd( kv.value, Unversioned() );
while(!rd.empty()) {
int32_t messageLength;
uint32_t subVersion;
rd >> messageLength >> subVersion;
messageLength += sizeof(uint16_t);
messages << messageLength << subVersion << uint16_t(0);
messageLength -= (sizeof(subVersion) + sizeof(uint16_t));
messages.serializeBytes(rd.readBytes(messageLength), messageLength);
}
} else {
messages.serializeBytes(kv.value);
}
messages.serializeBytes(kv.value);
}
if (kvs.expectedSize() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES)
@ -1518,7 +1490,7 @@ ACTOR Future<Void> checkRecovered(TLogData* self) {
return Void();
}
ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality, Promise<Void> recovered, PromiseStream<InitializeTLogRequest> tlogRequests ) {
ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality, Promise<Void> oldLog, Promise<Void> recovered, PromiseStream<InitializeTLogRequest> tlogRequests ) {
state double startt = now();
state Reference<LogData> logData;
state KeyRange tagKeys;
@ -1556,9 +1528,17 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
}
state std::vector<Future<ErrorOr<Void>>> removed;
state int persistentDataFormat = 0;
if(fFormat.get().get() >= LiteralStringRef("FoundationDB/LogServer/2/4")) {
persistentDataFormat = 1;
if(fFormat.get().get() == LiteralStringRef("FoundationDB/LogServer/2/3")) {
//FIXME: need for upgrades from 5.X to 6.0, remove once this upgrade path is no longer needed
if(recovered.canBeSet()) recovered.send(Void());
oldLog.send(Void());
while(!tlogRequests.isEmpty()) {
tlogRequests.getFuture().pop().reply.sendError(recruitment_failed());
}
Void _ = wait( oldTLog::tLog(self->persistentData, self->rawPersistentQueue, self->dbInfo, locality, self->dbgid) );
throw internal_error();
}
ASSERT(fVers.get().size() == fRecoverCounts.get().size());
@ -1590,7 +1570,7 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
DUMPTOKEN( recruited.confirmRunning );
//We do not need the remoteTag, because we will not be loading any additional data
logData = Reference<LogData>( new LogData(self, recruited, Optional<Tag>(), persistentDataFormat) );
logData = Reference<LogData>( new LogData(self, recruited, Optional<Tag>()) );
logData->stopped = true;
self->id_data[id1] = logData;
id_interf[id1] = recruited;
@ -1986,7 +1966,7 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
}
// New tLog (if !recoverFrom.size()) or restore from network
ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk, Promise<Void> recovered ) {
ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk, Promise<Void> oldLog, Promise<Void> recovered ) {
state TLogData self( tlogId, persistentData, persistentQueue, db );
state Future<Void> error = actorCollection( self.sharedActors.getFuture() );
@ -1994,7 +1974,7 @@ ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQ
try {
if(restoreFromDisk) {
Void _ = wait( restorePersistentState( &self, locality, recovered, tlogRequests ) );
Void _ = wait( restorePersistentState( &self, locality, oldLog, recovered, tlogRequests ) );
} else {
Void _ = wait( checkEmptyQueue(&self) && checkRecovered(&self) );
}

View File

@ -289,7 +289,7 @@ Future<Void> storageServer(
Promise<Void> const& recovered); // changes pssi->id() to be the recovered ID
Future<Void> masterServer( MasterInterface const& mi, Reference<AsyncVar<ServerDBInfo>> const& db, class ServerCoordinators const&, LifetimeToken const& lifetime );
Future<Void> masterProxyServer(MasterProxyInterface const& proxy, InitializeMasterProxyRequest const& req, Reference<AsyncVar<ServerDBInfo>> const& db);
Future<Void> tLog( class IKeyValueStore* const& persistentData, class IDiskQueue* const& persistentQueue, Reference<AsyncVar<ServerDBInfo>> const& db, LocalityData const& locality, PromiseStream<InitializeTLogRequest> const& tlogRequests, UID const& tlogId, bool const& restoreFromDisk, Promise<Void> const& recovered ); // changes tli->id() to be the recovered ID
Future<Void> tLog( class IKeyValueStore* const& persistentData, class IDiskQueue* const& persistentQueue, Reference<AsyncVar<ServerDBInfo>> const& db, LocalityData const& locality, PromiseStream<InitializeTLogRequest> const& tlogRequests, UID const& tlogId, bool const& restoreFromDisk, Promise<Void> const& oldLog, Promise<Void> const& recovered ); // changes tli->id() to be the recovered ID
Future<Void> debugQueryServer( DebugQueryRequest const& req );
Future<Void> monitorServerDBInfo( Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> const& ccInterface, Reference<ClusterConnectionFile> const&, LocalityData const&, Reference<AsyncVar<ServerDBInfo>> const& dbInfo );
Future<Void> resolver( ResolverInterface const& proxy, InitializeResolverRequest const&, Reference<AsyncVar<ServerDBInfo>> const& db );
@ -299,7 +299,7 @@ void registerThreadForProfiling();
void updateCpuProfiler(ProfilerRequest req);
namespace oldTLog {
Future<Void> tLog( IKeyValueStore* const& persistentData, IDiskQueue* const& persistentQueue, TLogInterface const& tli, Reference<AsyncVar<ServerDBInfo>> const& db );
Future<Void> tLog( IKeyValueStore* const& persistentData, IDiskQueue* const& persistentQueue, Reference<AsyncVar<ServerDBInfo>> const& db, LocalityData const& locality, UID const& tlogId );
}
#endif

View File

@ -53,6 +53,7 @@
<ActorCompiler Include="LogSystemDiskQueueAdapter.actor.cpp" />
<ActorCompiler Include="LogSystemPeekCursor.actor.cpp" />
<ActorCompiler Include="LogRouter.actor.cpp" />
<ActorCompiler Include="OldTLogServer.actor.cpp" />
<ClCompile Include="SkipList.cpp" />
<ActorCompiler Include="WaitFailure.actor.cpp" />
<ActorCompiler Include="tester.actor.cpp" />
@ -299,4 +300,4 @@
<Target Name="MyPreCompileSteps" AfterTargets="CLCompile">
<Exec Command="..\bin\$(Configuration)\coveragetool.exe &quot;$(OutDir)coverage.$(TargetName).xml&quot; @(ActorCompiler -> '%(RelativeDir)%(Filename)%(Extension)', ' ') @(CLInclude -> '%(RelativeDir)%(Filename)%(Extension)', ' ') @(CLCompile -> '%(RelativeDir)%(Filename)%(Extension)', ' ')" />
</Target>
</Project>
</Project>

View File

@ -272,6 +272,7 @@
<ActorCompiler Include="workloads\DiskDurability.actor.cpp">
<Filter>workloads</Filter>
</ActorCompiler>
<ActorCompiler Include="OldTLogServer.actor.cpp" />
</ItemGroup>
<ItemGroup>
<ClCompile Include="SkipList.cpp" />
@ -365,4 +366,4 @@
<UniqueIdentifier>{de5e282f-8d97-4054-b795-0a75b772326f}</UniqueIdentifier>
</Filter>
</ItemGroup>
</Project>
</Project>

View File

@ -623,14 +623,15 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
details["StorageEngine"] = s.storeType.toString();
startRole( s.storeID, interf.id(), "SharedTLog", details, "Restored" );
Promise<Void> oldLog;
Promise<Void> recovery;
Future<Void> tl = tLog( kv, queue, dbInfo, locality, tlog.isReady() ? tlogRequests : PromiseStream<InitializeTLogRequest>(), s.storeID, true, recovery );
Future<Void> tl = tLog( kv, queue, dbInfo, locality, tlog.isReady() ? tlogRequests : PromiseStream<InitializeTLogRequest>(), s.storeID, true, oldLog, recovery );
recoveries.push_back(recovery.getFuture());
tl = handleIOErrors( tl, kv, s.storeID );
tl = handleIOErrors( tl, queue, s.storeID );
if(tlog.isReady()) {
tlog = tl;
tlog = oldLog.getFuture() || tl;
}
errorForwarders.add( forwardError( errors, "SharedTLog", s.storeID, tl ) );
}
@ -725,7 +726,7 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
filesClosed.add( data->onClosed() );
filesClosed.add( queue->onClosed() );
tlog = tLog( data, queue, dbInfo, locality, tlogRequests, logId, false, Promise<Void>() );
tlog = tLog( data, queue, dbInfo, locality, tlogRequests, logId, false, Promise<Void>(), Promise<Void>() );
tlog = handleIOErrors( tlog, data, logId );
tlog = handleIOErrors( tlog, queue, logId );
errorForwarders.add( forwardError( errors, "SharedTLog", logId, tlog ) );