require key value store memory to recover cleanly when recovering the txnStateStore, since all of the data it is recovering has been fsync’ed

This commit is contained in:
Evan Tschannen 2018-08-31 13:07:48 -07:00
parent 1e2ce75ce4
commit 90bf277206
4 changed files with 16 additions and 11 deletions

View File

@ -78,7 +78,7 @@ protected:
extern IKeyValueStore* keyValueStoreSQLite( std::string const& filename, UID logID, KeyValueStoreType storeType, bool checkChecksums=false, bool checkIntegrity=false );
extern IKeyValueStore* keyValueStoreMemory( std::string const& basename, UID logID, int64_t memoryLimit );
extern IKeyValueStore* keyValueStoreLogSystem( class IDiskQueue* queue, UID logID, int64_t memoryLimit, bool disableSnapshot, bool replaceContent );
extern IKeyValueStore* keyValueStoreLogSystem( class IDiskQueue* queue, UID logID, int64_t memoryLimit, bool disableSnapshot, bool replaceContent, bool exactRecovery );
inline IKeyValueStore* openKVStore( KeyValueStoreType storeType, std::string const& filename, UID logID, int64_t memoryLimit, bool checkChecksums=false, bool checkIntegrity=false ) {
switch( storeType ) {
@ -97,4 +97,4 @@ inline IKeyValueStore* openKVStore( KeyValueStoreType storeType, std::string con
Future<Void> GenerateIOLogChecksumFile(std::string const & filename);
Future<Void> KVFileCheck(std::string const & filename, bool const &integrity);
#endif
#endif

View File

@ -56,7 +56,7 @@ extern bool noUnseed;
class KeyValueStoreMemory : public IKeyValueStore, NonCopyable {
public:
KeyValueStoreMemory( IDiskQueue* log, UID id, int64_t memoryLimit, bool disableSnapshot, bool replaceContent );
KeyValueStoreMemory( IDiskQueue* log, UID id, int64_t memoryLimit, bool disableSnapshot, bool replaceContent, bool exactRecovery );
// IClosable
virtual Future<Void> getError() { return log->getError(); }
@ -427,7 +427,7 @@ private:
return log->push( LiteralStringRef("\x01") ); // Changes here should be reflected in OP_DISK_OVERHEAD
}
ACTOR static Future<Void> recover( KeyValueStoreMemory* self ) {
ACTOR static Future<Void> recover( KeyValueStoreMemory* self, bool exactRecovery ) {
// 'uncommitted' variables track something that might be rolled back by an OpRollback, and are copied into permanent variables
// (in self) in OpCommit. OpRollback does the reverse (copying the permanent versions over the uncommitted versions)
// the uncommitted and committed variables should be equal initially (to whatever makes sense if there are no committed transactions recovered)
@ -559,6 +559,11 @@ private:
}
if (zeroFillSize) {
if( exactRecovery ) {
TraceEvent(SevError, "KVSMemExpectedExact", self->id);
ASSERT(false);
}
TEST( true ); // Fixing a partial commit at the end of the KeyValueStoreMemory log
for(int i=0; i<zeroFillSize; i++)
self->log->push( StringRef((const uint8_t*)"",1) );
@ -704,11 +709,11 @@ private:
}
};
KeyValueStoreMemory::KeyValueStoreMemory( IDiskQueue* log, UID id, int64_t memoryLimit, bool disableSnapshot, bool replaceContent )
KeyValueStoreMemory::KeyValueStoreMemory( IDiskQueue* log, UID id, int64_t memoryLimit, bool disableSnapshot, bool replaceContent, bool exactRecovery )
: log(log), id(id), previousSnapshotEnd(-1), currentSnapshotEnd(-1), resetSnapshot(false), memoryLimit(memoryLimit), committedWriteBytes(0),
committedDataSize(0), transactionSize(0), transactionIsLarge(false), disableSnapshot(disableSnapshot), replaceContent(replaceContent), snapshotCount(0), firstCommitWithSnapshot(true)
{
recovering = recover( this );
recovering = recover( this, exactRecovery );
snapshotting = snapshot( this );
commitActors = actorCollection( addActor.getFuture() );
}
@ -716,9 +721,9 @@ KeyValueStoreMemory::KeyValueStoreMemory( IDiskQueue* log, UID id, int64_t memor
IKeyValueStore* keyValueStoreMemory( std::string const& basename, UID logID, int64_t memoryLimit ) {
TraceEvent("KVSMemOpening", logID).detail("Basename", basename).detail("MemoryLimit", memoryLimit);
IDiskQueue *log = openDiskQueue( basename, logID );
return new KeyValueStoreMemory( log, logID, memoryLimit, false, false );
return new KeyValueStoreMemory( log, logID, memoryLimit, false, false, false );
}
IKeyValueStore* keyValueStoreLogSystem( class IDiskQueue* queue, UID logID, int64_t memoryLimit, bool disableSnapshot, bool replaceContent ) {
return new KeyValueStoreMemory( queue, logID, memoryLimit, disableSnapshot, replaceContent );
IKeyValueStore* keyValueStoreLogSystem( class IDiskQueue* queue, UID logID, int64_t memoryLimit, bool disableSnapshot, bool replaceContent, bool exactRecovery ) {
return new KeyValueStoreMemory( queue, logID, memoryLimit, disableSnapshot, replaceContent, exactRecovery );
}

View File

@ -1232,7 +1232,7 @@ ACTOR Future<Void> masterProxyServerCore(
commitData.logSystem = ILogSystem::fromServerDBInfo(proxy.id(), db->get(), false, addActor);
commitData.logAdapter = new LogSystemDiskQueueAdapter(commitData.logSystem, txsTag, false);
commitData.txnStateStore = keyValueStoreLogSystem(commitData.logAdapter, proxy.id(), 2e9, true, true);
commitData.txnStateStore = keyValueStoreLogSystem(commitData.logAdapter, proxy.id(), 2e9, true, true, true);
// ((SERVER_MEM_LIMIT * COMMIT_BATCHES_MEM_FRACTION_OF_TOTAL) / COMMIT_BATCHES_MEM_TO_TOTAL_MEM_SCALE_FACTOR) is only a approximate formula for limiting the memory used.
// COMMIT_BATCHES_MEM_TO_TOTAL_MEM_SCALE_FACTOR is an estimate based on experiments and not an accurate one.

View File

@ -596,7 +596,7 @@ ACTOR Future<Void> readTransactionSystemState( Reference<MasterData> self, Refer
// Recover transaction state store
if(self->txnStateStore) self->txnStateStore->close();
self->txnStateLogAdapter = openDiskQueueAdapter( oldLogSystem, txsTag );
self->txnStateStore = keyValueStoreLogSystem( self->txnStateLogAdapter, self->dbgid, self->memoryLimit, false, false );
self->txnStateStore = keyValueStoreLogSystem( self->txnStateLogAdapter, self->dbgid, self->memoryLimit, false, false, true );
// Versionstamped operations (particularly those applied from DR) define a minimum commit version
// that we may recover to, as they embed the version in user-readable data and require that no