Make LogData aware of the spill type it was created to perform.
The spilling type is now pulled out of the request, and then stored on LogData for later access, and persisted in the tlog metadata per tlog generation. It turns out that serializing types as Unversioned is a bit wonky.
This commit is contained in:
parent
24c46337e1
commit
d38a96ab73
|
@ -741,7 +741,7 @@ struct TLogSpillType {
|
|||
operator SpillType() const { return SpillType(type); }
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) { serializer(ar, type); }
|
||||
void serialize_unversioned(Ar& ar) { serializer(ar, type); }
|
||||
|
||||
std::string toString() const {
|
||||
switch( type ) {
|
||||
|
@ -759,10 +759,32 @@ struct TLogSpillType {
|
|||
return default_error_or();
|
||||
}
|
||||
|
||||
private:
|
||||
uint32_t type;
|
||||
};
|
||||
|
||||
template <class Ar> void load( Ar& ar, TLogSpillType& logSpillType ) { logSpillType.serialize_unversioned(ar); }
|
||||
template <class Ar> void save( Ar& ar, TLogSpillType const& logSpillType ) { const_cast<TLogSpillType&>(logSpillType).serialize_unversioned(ar); }
|
||||
|
||||
template <>
|
||||
struct struct_like_traits<TLogSpillType> : std::true_type {
|
||||
using Member = TLogSpillType;
|
||||
using types = pack<uint32_t>;
|
||||
|
||||
template <int i, class Context>
|
||||
static const index_t<i, types>& get(const Member& m, Context&) {
|
||||
if constexpr (i == 0) {
|
||||
return m.type;
|
||||
}
|
||||
}
|
||||
|
||||
template <int i, class Type, class Context>
|
||||
static const void assign(Member& m, const Type& t, Context&) {
|
||||
if constexpr (i == 0) {
|
||||
m = static_cast<TLogSpillType::SpillType>(t);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
//Contains the amount of free and total space for a storage server, in bytes
|
||||
struct StorageBytes {
|
||||
int64_t free;
|
||||
|
|
|
@ -197,6 +197,7 @@ private:
|
|||
static const KeyValueRef persistFormat( LiteralStringRef( "Format" ), LiteralStringRef("FoundationDB/LogServer/3/0") );
|
||||
static const KeyRangeRef persistFormatReadableRange( LiteralStringRef("FoundationDB/LogServer/3/0"), LiteralStringRef("FoundationDB/LogServer/4/0") );
|
||||
static const KeyRangeRef persistProtocolVersionKeys( LiteralStringRef( "ProtocolVersion/" ), LiteralStringRef( "ProtocolVersion0" ) );
|
||||
static const KeyRangeRef persistTLogSpillTypeKeys( LiteralStringRef( "TLogSpillType/" ), LiteralStringRef( "TLogSpillType0" ) );
|
||||
static const KeyRangeRef persistRecoveryCountKeys = KeyRangeRef( LiteralStringRef( "DbRecoveryCount/" ), LiteralStringRef( "DbRecoveryCount0" ) );
|
||||
|
||||
// Updated on updatePersistentData()
|
||||
|
@ -499,14 +500,15 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
|
|||
Version logRouterPoppedVersion, logRouterPopToVersion;
|
||||
int8_t locality;
|
||||
UID recruitmentID;
|
||||
TLogSpillType logSpillType;
|
||||
std::set<Tag> allTags;
|
||||
Future<Void> terminated;
|
||||
FlowLock execOpLock;
|
||||
bool execOpCommitInProgress;
|
||||
int txsTags;
|
||||
|
||||
explicit LogData(TLogData* tLogData, TLogInterface interf, Tag remoteTag, bool isPrimary, int logRouterTags, int txsTags, UID recruitmentID, ProtocolVersion protocolVersion, std::vector<Tag> tags) : tLogData(tLogData), knownCommittedVersion(0), logId(interf.id()),
|
||||
cc("TLog", interf.id().toString()), bytesInput("BytesInput", cc), bytesDurable("BytesDurable", cc), remoteTag(remoteTag), isPrimary(isPrimary), logRouterTags(logRouterTags), txsTags(txsTags), recruitmentID(recruitmentID), protocolVersion(protocolVersion),
|
||||
explicit LogData(TLogData* tLogData, TLogInterface interf, Tag remoteTag, bool isPrimary, int logRouterTags, int txsTags, UID recruitmentID, ProtocolVersion protocolVersion, TLogSpillType logSpillType, std::vector<Tag> tags) : tLogData(tLogData), knownCommittedVersion(0), logId(interf.id()),
|
||||
cc("TLog", interf.id().toString()), bytesInput("BytesInput", cc), bytesDurable("BytesDurable", cc), remoteTag(remoteTag), isPrimary(isPrimary), logRouterTags(logRouterTags), txsTags(txsTags), recruitmentID(recruitmentID), protocolVersion(protocolVersion), logSpillType(logSpillType),
|
||||
logSystem(new AsyncVar<Reference<ILogSystem>>()), logRouterPoppedVersion(0), durableKnownCommittedVersion(0), minKnownCommittedVersion(0), queuePoppedVersion(0), allTags(tags.begin(), tags.end()), terminated(tLogData->terminated.getFuture()),
|
||||
minPoppedTagVersion(0), minPoppedTag(invalidTag),
|
||||
// These are initialized differently on init() or recovery
|
||||
|
@ -562,6 +564,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
|
|||
tLogData->persistentData->clear( singleKeyRange(logIdKey.withPrefix(persistTxsTagsKeys.begin)) );
|
||||
tLogData->persistentData->clear( singleKeyRange(logIdKey.withPrefix(persistRecoveryCountKeys.begin)) );
|
||||
tLogData->persistentData->clear( singleKeyRange(logIdKey.withPrefix(persistProtocolVersionKeys.begin)) );
|
||||
tLogData->persistentData->clear( singleKeyRange(logIdKey.withPrefix(persistTLogSpillTypeKeys.begin)) );
|
||||
tLogData->persistentData->clear( singleKeyRange(logIdKey.withPrefix(persistRecoveryLocationKey)) );
|
||||
Key msgKey = logIdKey.withPrefix(persistTagMessagesKeys.begin);
|
||||
tLogData->persistentData->clear( KeyRangeRef( msgKey, strinc(msgKey) ) );
|
||||
|
@ -1797,6 +1800,7 @@ ACTOR Future<Void> initPersistentState( TLogData* self, Reference<LogData> logDa
|
|||
storage->set( KeyValueRef( BinaryWriter::toValue(logData->logId,Unversioned()).withPrefix(persistTxsTagsKeys.begin), BinaryWriter::toValue(logData->txsTags, Unversioned()) ) );
|
||||
storage->set( KeyValueRef( BinaryWriter::toValue(logData->logId,Unversioned()).withPrefix(persistRecoveryCountKeys.begin), BinaryWriter::toValue(logData->recoveryCount, Unversioned()) ) );
|
||||
storage->set( KeyValueRef( BinaryWriter::toValue(logData->logId,Unversioned()).withPrefix(persistProtocolVersionKeys.begin), BinaryWriter::toValue(logData->protocolVersion, Unversioned()) ) );
|
||||
storage->set( KeyValueRef( BinaryWriter::toValue(logData->logId,Unversioned()).withPrefix(persistTLogSpillTypeKeys.begin), BinaryWriter::toValue(logData->logSpillType, Unversioned()) ) );
|
||||
|
||||
for(auto tag : logData->allTags) {
|
||||
ASSERT(!logData->getTagData(tag));
|
||||
|
@ -2321,11 +2325,12 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
|
|||
state Future<Standalone<VectorRef<KeyValueRef>>> fTxsTags = storage->readRange(persistTxsTagsKeys);
|
||||
state Future<Standalone<VectorRef<KeyValueRef>>> fRecoverCounts = storage->readRange(persistRecoveryCountKeys);
|
||||
state Future<Standalone<VectorRef<KeyValueRef>>> fProtocolVersions = storage->readRange(persistProtocolVersionKeys);
|
||||
state Future<Standalone<VectorRef<KeyValueRef>>> fTLogSpillTypes = storage->readRange(persistTLogSpillTypeKeys);
|
||||
|
||||
// FIXME: metadata in queue?
|
||||
|
||||
wait( waitForAll( std::vector{fFormat, fRecoveryLocation} ) );
|
||||
wait( waitForAll( std::vector{fVers, fKnownCommitted, fLocality, fLogRouterTags, fTxsTags, fRecoverCounts, fProtocolVersions} ) );
|
||||
wait( waitForAll( std::vector{fVers, fKnownCommitted, fLocality, fLogRouterTags, fTxsTags, fRecoverCounts, fProtocolVersions, fTLogSpillTypes} ) );
|
||||
|
||||
if (fFormat.get().present() && !persistFormatReadableRange.contains( fFormat.get().get() )) {
|
||||
//FIXME: remove when we no longer need to test upgrades from 4.X releases
|
||||
|
@ -2403,9 +2408,10 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
|
|||
DUMPTOKEN( recruited.confirmRunning );
|
||||
|
||||
ProtocolVersion protocolVersion = BinaryReader::fromStringRef<ProtocolVersion>( fProtocolVersions.get()[idx].value, Unversioned() );
|
||||
TLogSpillType logSpillType = BinaryReader::fromStringRef<TLogSpillType>( fTLogSpillTypes.get()[idx].value, Unversioned() );
|
||||
|
||||
//We do not need the remoteTag, because we will not be loading any additional data
|
||||
logData = Reference<LogData>( new LogData(self, recruited, Tag(), true, id_logRouterTags[id1], id_txsTags[id1], UID(), protocolVersion, std::vector<Tag>()) );
|
||||
logData = Reference<LogData>( new LogData(self, recruited, Tag(), true, id_logRouterTags[id1], id_txsTags[id1], UID(), protocolVersion, logSpillType, std::vector<Tag>()) );
|
||||
logData->locality = id_locality[id1];
|
||||
logData->stopped = true;
|
||||
self->id_data[id1] = logData;
|
||||
|
@ -2608,7 +2614,7 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
|
|||
it.second->stopCommit.trigger();
|
||||
}
|
||||
|
||||
state Reference<LogData> logData = Reference<LogData>( new LogData(self, recruited, req.remoteTag, req.isPrimary, req.logRouterTags, req.txsTags, req.recruitmentID, currentProtocolVersion, req.allTags) );
|
||||
state Reference<LogData> logData = Reference<LogData>( new LogData(self, recruited, req.remoteTag, req.isPrimary, req.logRouterTags, req.txsTags, req.recruitmentID, currentProtocolVersion, req.spillType, req.allTags) );
|
||||
self->id_data[recruited.id()] = logData;
|
||||
logData->locality = req.locality;
|
||||
logData->recoveryCount = req.epoch;
|
||||
|
|
|
@ -87,6 +87,7 @@ public: // introduced features
|
|||
PROTOCOL_VERSION_FEATURE(0x0FDB00B061030000LL, TLogVersion);
|
||||
PROTOCOL_VERSION_FEATURE(0x0FDB00B061070000LL, PseudoLocalities);
|
||||
PROTOCOL_VERSION_FEATURE(0x0FDB00B061070000LL, ShardedTxsTags);
|
||||
PROTOCOL_VERSION_FEATURE(0x0FDB00B063000000LL, UnifiedTLogSpilling);
|
||||
};
|
||||
|
||||
// These impact both communications and the deserialization of certain database and IKeyValueStore keys.
|
||||
|
@ -96,7 +97,7 @@ public: // introduced features
|
|||
//
|
||||
// xyzdev
|
||||
// vvvv
|
||||
constexpr ProtocolVersion currentProtocolVersion(0x0FDB00B063000001LL);
|
||||
constexpr ProtocolVersion currentProtocolVersion(0x0FDB00B063010001LL);
|
||||
// This assert is intended to help prevent incrementing the leftmost digits accidentally. It will probably need to
|
||||
// change when we reach version 10.
|
||||
static_assert(currentProtocolVersion.version() < 0x0FDB00B100000000LL, "Unexpected protocol version");
|
||||
|
|
Loading…
Reference in New Issue