/*
 * KeyValueStoreMemory.actor.cpp
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
2018-10-20 01:30:13 +08:00
# include "fdbserver/IKeyValueStore.h"
# include "fdbserver/IDiskQueue.h"
2019-01-10 10:03:54 +08:00
# include "flow/IKeyValueContainer.h"
# include "flow/RadixTree.h"
2017-05-26 04:48:44 +08:00
# include "flow/ActorCollection.h"
2017-07-15 06:49:30 +08:00
# include "fdbclient/Notified.h"
2017-05-26 04:48:44 +08:00
# include "fdbclient/SystemData.h"
2018-08-11 06:18:24 +08:00
# include "flow/actorcompiler.h" // This must be the last #include.
2017-05-26 04:48:44 +08:00
# define OP_DISK_OVERHEAD (sizeof(OpHeader) + 1)
extern bool noUnseed ;
2019-01-10 10:03:54 +08:00
template < typename Container >
2017-05-26 04:48:44 +08:00
class KeyValueStoreMemory : public IKeyValueStore , NonCopyable {
public :
2019-01-10 10:03:54 +08:00
KeyValueStoreMemory ( IDiskQueue * log , UID id , int64_t memoryLimit , KeyValueStoreType storeType , bool disableSnapshot ,
bool replaceContent , bool exactRecovery ) ;
2017-05-26 04:48:44 +08:00
// IClosable
virtual Future < Void > getError ( ) { return log - > getError ( ) ; }
virtual Future < Void > onClosed ( ) { return log - > onClosed ( ) ; }
2019-01-10 10:03:54 +08:00
virtual void dispose ( ) {
recovering . cancel ( ) ;
log - > dispose ( ) ;
if ( reserved_buffer ! = nullptr ) {
delete [ ] reserved_buffer ;
reserved_buffer = nullptr ;
}
delete this ;
}
virtual void close ( ) {
recovering . cancel ( ) ;
log - > close ( ) ;
if ( reserved_buffer ! = nullptr ) {
delete [ ] reserved_buffer ;
reserved_buffer = nullptr ;
}
delete this ;
}
2017-05-26 04:48:44 +08:00
// IKeyValueStore
2019-01-10 10:03:54 +08:00
virtual KeyValueStoreType getType ( ) { return type ; }
virtual std : : tuple < size_t , size_t , size_t > getSize ( ) { return data . size ( ) ; }
2017-05-26 04:48:44 +08:00
int64_t getAvailableSize ( ) {
2019-01-10 10:03:54 +08:00
int64_t residentSize = data . sumTo ( data . end ( ) ) + queue . totalSize ( ) + // doesn't account for overhead in queue
transactionSize ;
2017-05-26 04:48:44 +08:00
return memoryLimit - residentSize ;
}
virtual StorageBytes getStorageBytes ( ) {
StorageBytes diskQueueBytes = log - > getStorageBytes ( ) ;
// Try to bound how many in-memory bytes we might need to write to disk if we commit() now
int64_t uncommittedBytes = queue . totalSize ( ) + transactionSize ;
2019-01-10 10:03:54 +08:00
// Check that we have enough space in memory and on disk
2018-03-21 05:35:28 +08:00
int64_t freeSize = std : : min ( getAvailableSize ( ) , diskQueueBytes . free / 4 - uncommittedBytes ) ;
int64_t availableSize = std : : min ( getAvailableSize ( ) , diskQueueBytes . available / 4 - uncommittedBytes ) ;
2017-05-26 04:48:44 +08:00
int64_t totalSize = std : : min ( memoryLimit , diskQueueBytes . total / 4 - uncommittedBytes ) ;
2018-03-21 05:35:28 +08:00
return StorageBytes ( std : : max ( ( int64_t ) 0 , freeSize ) , std : : max ( ( int64_t ) 0 , totalSize ) , diskQueueBytes . used ,
2019-01-10 10:03:54 +08:00
std : : max ( ( int64_t ) 0 , availableSize ) ) ;
2017-05-26 04:48:44 +08:00
}
void semiCommit ( ) {
transactionSize + = queue . totalSize ( ) ;
2019-01-10 10:03:54 +08:00
if ( transactionSize > 0.5 * committedDataSize ) {
2017-05-26 04:48:44 +08:00
transactionIsLarge = true ;
2019-01-10 10:03:54 +08:00
TraceEvent ( " KVSMemSwitchingToLargeTransactionMode " , id )
. detail ( " TransactionSize " , transactionSize )
. detail ( " DataSize " , committedDataSize ) ;
2017-05-26 04:48:44 +08:00
TEST ( true ) ; // KeyValueStoreMemory switching to large transaction mode
2019-01-10 10:03:54 +08:00
TEST ( committedDataSize >
1e3 ) ; // KeyValueStoreMemory switching to large transaction mode with committed data
2017-05-26 04:48:44 +08:00
}
int64_t bytesWritten = commit_queue ( queue , true ) ;
committedWriteBytes + = bytesWritten ;
}
virtual void set ( KeyValueRef keyValue , const Arena * arena ) {
2019-01-10 10:03:54 +08:00
// A commit that occurs with no available space returns Never, so we can throw out all modifications
if ( getAvailableSize ( ) < = 0 ) return ;
2017-05-26 04:48:44 +08:00
2019-01-10 10:03:54 +08:00
if ( transactionIsLarge ) {
data . insert ( keyValue . key , keyValue . value ) ;
} else {
2017-05-26 04:48:44 +08:00
queue . set ( keyValue , arena ) ;
2019-01-10 10:03:54 +08:00
if ( recovering . isReady ( ) & & ! disableSnapshot ) {
2017-05-26 04:48:44 +08:00
semiCommit ( ) ;
}
}
}
virtual void clear ( KeyRangeRef range , const Arena * arena ) {
2019-01-10 10:03:54 +08:00
// A commit that occurs with no available space returns Never, so we can throw out all modifications
if ( getAvailableSize ( ) < = 0 ) return ;
2017-05-26 04:48:44 +08:00
2019-01-10 10:03:54 +08:00
if ( transactionIsLarge ) {
2017-05-26 04:48:44 +08:00
data . erase ( data . lower_bound ( range . begin ) , data . lower_bound ( range . end ) ) ;
2019-01-10 10:03:54 +08:00
} else {
2017-05-26 04:48:44 +08:00
queue . clear ( range , arena ) ;
2019-01-10 10:03:54 +08:00
if ( recovering . isReady ( ) & & ! disableSnapshot ) {
2017-05-26 04:48:44 +08:00
semiCommit ( ) ;
}
}
}
virtual Future < Void > commit ( bool sequential ) {
if ( getAvailableSize ( ) < = 0 ) {
TraceEvent ( SevError , " KeyValueStoreMemory_OutOfSpace " , id ) ;
return Never ( ) ;
}
2019-01-10 10:03:54 +08:00
if ( recovering . isError ( ) ) throw recovering . getError ( ) ;
if ( ! recovering . isReady ( ) ) return waitAndCommit ( this , sequential ) ;
2017-05-26 04:48:44 +08:00
2019-01-10 10:03:54 +08:00
if ( ! disableSnapshot & & replaceContent & & ! firstCommitWithSnapshot ) {
2018-05-02 10:43:35 +08:00
transactionSize + = SERVER_KNOBS - > REPLACE_CONTENTS_BYTES ;
committedWriteBytes + = SERVER_KNOBS - > REPLACE_CONTENTS_BYTES ;
2018-03-30 06:12:38 +08:00
semiCommit ( ) ;
}
2019-01-10 10:03:54 +08:00
if ( transactionIsLarge ) {
2017-05-26 04:48:44 +08:00
fullSnapshot ( data ) ;
resetSnapshot = true ;
committedWriteBytes = notifiedCommittedWriteBytes . get ( ) ;
2018-10-13 03:58:17 +08:00
overheadWriteBytes = 0 ;
2019-01-10 10:03:54 +08:00
if ( disableSnapshot ) {
2018-10-13 03:58:17 +08:00
return Void ( ) ;
}
log_op ( OpCommit , StringRef ( ) , StringRef ( ) ) ;
2019-01-10 10:03:54 +08:00
} else {
2017-05-26 04:48:44 +08:00
int64_t bytesWritten = commit_queue ( queue , ! disableSnapshot , sequential ) ;
2019-01-10 10:03:54 +08:00
if ( disableSnapshot ) {
2017-05-26 04:48:44 +08:00
return Void ( ) ;
2018-10-13 03:58:17 +08:00
}
2017-05-26 04:48:44 +08:00
2019-01-10 10:03:54 +08:00
if ( bytesWritten > 0 | | committedWriteBytes > notifiedCommittedWriteBytes . get ( ) ) {
committedWriteBytes + = bytesWritten + overheadWriteBytes +
OP_DISK_OVERHEAD ; // OP_DISK_OVERHEAD is for the following log_op(OpCommit)
notifiedCommittedWriteBytes . set ( committedWriteBytes ) ; // This set will cause snapshot items to be
// written, so it must happen before the OpCommit
2018-10-13 03:58:17 +08:00
log_op ( OpCommit , StringRef ( ) , StringRef ( ) ) ;
overheadWriteBytes = log - > getCommitOverhead ( ) ;
}
2017-05-26 04:48:44 +08:00
}
auto c = log - > commit ( ) ;
committedDataSize = data . sumTo ( data . end ( ) ) ;
transactionSize = 0 ;
transactionIsLarge = false ;
2018-03-30 06:12:38 +08:00
firstCommitWithSnapshot = false ;
2017-05-26 04:48:44 +08:00
2019-01-10 10:03:54 +08:00
addActor . send ( commitAndUpdateVersions ( this , c , previousSnapshotEnd ) ) ;
2017-05-26 04:48:44 +08:00
return c ;
}
2019-01-10 10:03:54 +08:00
virtual Future < Optional < Value > > readValue ( KeyRef key , Optional < UID > debugID = Optional < UID > ( ) ) {
if ( recovering . isError ( ) ) throw recovering . getError ( ) ;
2017-05-26 04:48:44 +08:00
if ( ! recovering . isReady ( ) ) return waitAndReadValue ( this , key ) ;
auto it = data . find ( key ) ;
if ( it = = data . end ( ) ) return Optional < Value > ( ) ;
2019-01-10 10:03:54 +08:00
return Optional < Value > ( it . getValue ( ) ) ;
2017-05-26 04:48:44 +08:00
}
2019-01-10 10:03:54 +08:00
virtual Future < Optional < Value > > readValuePrefix ( KeyRef key , int maxLength ,
Optional < UID > debugID = Optional < UID > ( ) ) {
if ( recovering . isError ( ) ) throw recovering . getError ( ) ;
2017-05-26 04:48:44 +08:00
if ( ! recovering . isReady ( ) ) return waitAndReadValuePrefix ( this , key , maxLength ) ;
auto it = data . find ( key ) ;
if ( it = = data . end ( ) ) return Optional < Value > ( ) ;
2019-01-10 10:03:54 +08:00
auto val = it . getValue ( ) ;
if ( maxLength < val . size ( ) ) {
2017-05-26 04:48:44 +08:00
return Optional < Value > ( val . substr ( 0 , maxLength ) ) ;
2019-01-10 10:03:54 +08:00
} else {
2017-05-26 04:48:44 +08:00
return Optional < Value > ( val ) ;
}
}
// If rowLimit>=0, reads first rows sorted ascending, otherwise reads last rows sorted descending
// The total size of the returned value (less the last entry) will be less than byteLimit
2019-01-10 10:03:54 +08:00
virtual Future < Standalone < VectorRef < KeyValueRef > > > readRange ( KeyRangeRef keys , int rowLimit = 1 < < 30 ,
int byteLimit = 1 < < 30 ) {
if ( recovering . isError ( ) ) throw recovering . getError ( ) ;
2017-05-26 04:48:44 +08:00
if ( ! recovering . isReady ( ) ) return waitAndReadRange ( this , keys , rowLimit , byteLimit ) ;
Standalone < VectorRef < KeyValueRef > > result ;
if ( rowLimit > = 0 ) {
auto it = data . lower_bound ( keys . begin ) ;
2019-01-10 10:03:54 +08:00
while ( it ! = data . end ( ) & & rowLimit & & byteLimit > = 0 ) {
StringRef tempKey = it . getKey ( reserved_buffer , CLIENT_KNOBS - > SYSTEM_KEY_SIZE_LIMIT ) ;
if ( tempKey > = keys . end ) break ;
byteLimit - = sizeof ( KeyValueRef ) + tempKey . size ( ) + it . getValue ( ) . size ( ) ;
result . push_back_deep ( result . arena ( ) , KeyValueRef ( tempKey , it . getValue ( ) ) ) ;
2017-05-26 04:48:44 +08:00
+ + it ;
- - rowLimit ;
}
} else {
rowLimit = - rowLimit ;
2019-01-10 10:03:54 +08:00
auto it = data . previous ( data . lower_bound ( keys . end ) ) ;
while ( it ! = data . end ( ) & & rowLimit & & byteLimit > = 0 ) {
StringRef tempKey = it . getKey ( reserved_buffer , CLIENT_KNOBS - > SYSTEM_KEY_SIZE_LIMIT ) ;
if ( tempKey < keys . begin ) break ;
byteLimit - = sizeof ( KeyValueRef ) + tempKey . size ( ) + it . getValue ( ) . size ( ) ;
result . push_back_deep ( result . arena ( ) , KeyValueRef ( tempKey , it . getValue ( ) ) ) ;
2017-05-26 04:48:44 +08:00
it = data . previous ( it ) ;
- - rowLimit ;
}
}
return result ;
}
virtual void resyncLog ( ) {
2019-01-10 10:03:54 +08:00
ASSERT ( recovering . isReady ( ) ) ;
2017-05-26 04:48:44 +08:00
resetSnapshot = true ;
log_op ( OpSnapshotAbort , StringRef ( ) , StringRef ( ) ) ;
}
2019-01-10 10:03:54 +08:00
virtual void enableSnapshot ( ) { disableSnapshot = false ; }
2017-05-26 04:48:44 +08:00
private :
enum OpType {
OpSet ,
OpClear ,
OpClearToEnd ,
OpSnapshotItem ,
OpSnapshotEnd ,
OpSnapshotAbort , // terminate an in progress snapshot in order to start a full snapshot
2019-01-10 10:03:54 +08:00
OpCommit , // only in log, not in queue
OpRollback // only in log, not in queue
2017-05-26 04:48:44 +08:00
} ;
struct OpRef {
OpType op ;
StringRef p1 , p2 ;
OpRef ( ) { }
2019-01-10 10:03:54 +08:00
OpRef ( Arena & a , OpRef const & o ) : op ( o . op ) , p1 ( a , o . p1 ) , p2 ( a , o . p2 ) { }
size_t expectedSize ( ) { return p1 . expectedSize ( ) + p2 . expectedSize ( ) ; }
2017-05-26 04:48:44 +08:00
} ;
struct OpHeader {
int op ;
int len1 , len2 ;
} ;
struct OpQueue {
2019-01-10 10:03:54 +08:00
OpQueue ( ) : numBytes ( 0 ) { }
2017-05-26 04:48:44 +08:00
int totalSize ( ) const { return numBytes ; }
void clear ( ) {
numBytes = 0 ;
operations = Standalone < VectorRef < OpRef > > ( ) ;
arenas . clear ( ) ;
}
2019-01-10 10:03:54 +08:00
void rollback ( ) { clear ( ) ; }
2017-05-26 04:48:44 +08:00
2019-01-10 10:03:54 +08:00
void set ( KeyValueRef keyValue , const Arena * arena = NULL ) {
2017-05-26 04:48:44 +08:00
queue_op ( OpSet , keyValue . key , keyValue . value , arena ) ;
}
2019-01-10 10:03:54 +08:00
void clear ( KeyRangeRef range , const Arena * arena = NULL ) { queue_op ( OpClear , range . begin , range . end , arena ) ; }
2017-05-26 04:48:44 +08:00
2019-01-10 10:03:54 +08:00
void clear_to_end ( StringRef fromKey , const Arena * arena = NULL ) {
2017-05-26 04:48:44 +08:00
queue_op ( OpClearToEnd , fromKey , StringRef ( ) , arena ) ;
}
2019-01-10 10:03:54 +08:00
void queue_op ( OpType op , StringRef p1 , StringRef p2 , const Arena * arena ) {
2017-05-26 04:48:44 +08:00
numBytes + = p1 . size ( ) + p2 . size ( ) + sizeof ( OpHeader ) + sizeof ( OpRef ) ;
2019-01-10 10:03:54 +08:00
OpRef r ;
r . op = op ;
r . p1 = p1 ;
r . p2 = p2 ;
if ( arena = = NULL ) {
operations . push_back_deep ( operations . arena ( ) , r ) ;
2017-05-26 04:48:44 +08:00
} else {
2019-01-10 10:03:54 +08:00
operations . push_back ( operations . arena ( ) , r ) ;
2017-05-26 04:48:44 +08:00
arenas . push_back ( * arena ) ;
}
}
2019-01-10 10:03:54 +08:00
const OpRef * begin ( ) { return operations . begin ( ) ; }
2017-05-26 04:48:44 +08:00
2019-01-10 10:03:54 +08:00
const OpRef * end ( ) { return operations . end ( ) ; }
2017-05-26 04:48:44 +08:00
2019-01-10 10:03:54 +08:00
private :
Standalone < VectorRef < OpRef > > operations ;
uint64_t numBytes ;
std : : vector < Arena > arenas ;
2017-05-26 04:48:44 +08:00
} ;
2019-01-10 10:03:54 +08:00
KeyValueStoreType type ;
2017-05-26 04:48:44 +08:00
UID id ;
2019-01-10 10:03:54 +08:00
Container data ;
// reserved buffer for snapshot/fullsnapshot
uint8_t * reserved_buffer ;
2017-05-26 04:48:44 +08:00
OpQueue queue ; // mutations not yet commit()ted
2019-01-10 10:03:54 +08:00
IDiskQueue * log ;
2017-05-26 04:48:44 +08:00
Future < Void > recovering , snapshotting ;
int64_t committedWriteBytes ;
2018-10-13 03:58:17 +08:00
int64_t overheadWriteBytes ;
2017-05-26 04:48:44 +08:00
NotifiedVersion notifiedCommittedWriteBytes ;
Key recoveredSnapshotKey ; // After recovery, the next key in the currently uncompleted snapshot
2019-01-10 10:03:54 +08:00
IDiskQueue : : location
currentSnapshotEnd ; // The end of the most recently completed snapshot (this snapshot cannot be discarded)
IDiskQueue : : location previousSnapshotEnd ; // The end of the second most recently completed snapshot (on commit, this
// snapshot can be discarded)
2017-05-26 04:48:44 +08:00
PromiseStream < Future < Void > > addActor ;
Future < Void > commitActors ;
int64_t committedDataSize ;
int64_t transactionSize ;
bool transactionIsLarge ;
2019-01-10 10:03:54 +08:00
bool resetSnapshot ; // Set to true after a fullSnapshot is performed. This causes the regular snapshot mechanism to
// restart
2017-05-26 04:48:44 +08:00
bool disableSnapshot ;
2018-03-30 06:12:38 +08:00
bool replaceContent ;
bool firstCommitWithSnapshot ;
int snapshotCount ;
2017-05-26 04:48:44 +08:00
2019-01-10 10:03:54 +08:00
int64_t memoryLimit ; // The upper limit on the memory used by the store (excluding, possibly, some clear operations)
2017-05-26 04:48:44 +08:00
std : : vector < std : : pair < KeyValueMapPair , uint64_t > > dataSets ;
2019-01-10 10:03:54 +08:00
int64_t commit_queue ( OpQueue & ops , bool log , bool sequential = false ) {
2017-05-26 04:48:44 +08:00
int64_t total = 0 , count = 0 ;
IDiskQueue : : location log_location = 0 ;
2019-01-10 10:03:54 +08:00
for ( auto o = ops . begin ( ) ; o ! = ops . end ( ) ; + + o ) {
2017-05-26 04:48:44 +08:00
+ + count ;
total + = o - > p1 . size ( ) + o - > p2 . size ( ) + OP_DISK_OVERHEAD ;
if ( o - > op = = OpSet ) {
2019-01-10 10:03:54 +08:00
if ( sequential ) {
KeyValueMapPair pair ( o - > p1 , o - > p2 ) ;
2017-05-26 04:48:44 +08:00
dataSets . push_back ( std : : make_pair ( pair , pair . arena . getSize ( ) + data . getElementBytes ( ) ) ) ;
} else {
2019-01-10 10:03:54 +08:00
data . insert ( o - > p1 , o - > p2 ) ;
2017-05-26 04:48:44 +08:00
}
2019-01-10 10:03:54 +08:00
} else if ( o - > op = = OpClear ) {
if ( sequential ) {
2017-05-26 04:48:44 +08:00
data . insert ( dataSets ) ;
dataSets . clear ( ) ;
}
2019-01-10 10:03:54 +08:00
data . erase ( data . lower_bound ( o - > p1 ) , data . lower_bound ( o - > p2 ) ) ;
} else if ( o - > op = = OpClearToEnd ) {
if ( sequential ) {
2017-05-26 04:48:44 +08:00
data . insert ( dataSets ) ;
dataSets . clear ( ) ;
}
2019-01-10 10:03:54 +08:00
data . erase ( data . lower_bound ( o - > p1 ) , data . end ( ) ) ;
} else
ASSERT ( false ) ;
if ( log ) log_location = log_op ( o - > op , o - > p1 , o - > p2 ) ;
2017-05-26 04:48:44 +08:00
}
2019-01-10 10:03:54 +08:00
if ( sequential ) {
2017-05-26 04:48:44 +08:00
data . insert ( dataSets ) ;
dataSets . clear ( ) ;
}
bool ok = count < 1e6 ;
if ( ! ok ) {
2019-02-21 18:46:30 +08:00
TraceEvent ( /*ok ? SevInfo : */ SevWarnAlways , " KVSMemCommitQueue " , id )
2018-06-09 02:11:08 +08:00
. detail ( " Bytes " , total )
. detail ( " Log " , log )
. detail ( " Ops " , count )
2017-05-26 04:48:44 +08:00
. detail ( " LastLoggedLocation " , log_location )
. detail ( " Details " , count ) ;
}
ops . clear ( ) ;
return total ;
}
IDiskQueue : : location log_op ( OpType op , StringRef v1 , StringRef v2 ) {
2019-01-10 10:03:54 +08:00
OpHeader h = { ( int ) op , v1 . size ( ) , v2 . size ( ) } ;
log - > push ( StringRef ( ( const uint8_t * ) & h , sizeof ( h ) ) ) ;
log - > push ( v1 ) ;
log - > push ( v2 ) ;
return log - > push ( LiteralStringRef ( " \x01 " ) ) ; // Changes here should be reflected in OP_DISK_OVERHEAD
2017-05-26 04:48:44 +08:00
}
2018-09-01 04:07:48 +08:00
ACTOR static Future < Void > recover ( KeyValueStoreMemory * self , bool exactRecovery ) {
2019-07-31 01:14:39 +08:00
loop {
// 'uncommitted' variables track something that might be rolled back by an OpRollback, and are copied into permanent variables
// (in self) in OpCommit. OpRollback does the reverse (copying the permanent versions over the uncommitted versions)
// the uncommitted and committed variables should be equal initially (to whatever makes sense if there are no committed transactions recovered)
state Key uncommittedNextKey = self - > recoveredSnapshotKey ;
state IDiskQueue : : location uncommittedPrevSnapshotEnd = self - > previousSnapshotEnd = self - > log - > getNextReadLocation ( ) ; // not really, but popping up to here does nothing
state IDiskQueue : : location uncommittedSnapshotEnd = self - > currentSnapshotEnd = uncommittedPrevSnapshotEnd ;
state int zeroFillSize = 0 ;
state int dbgSnapshotItemCount = 0 ;
state int dbgSnapshotEndCount = 0 ;
state int dbgMutationCount = 0 ;
state int dbgCommitCount = 0 ;
state double startt = now ( ) ;
state UID dbgid = self - > id ;
state Future < Void > loggingDelay = delay ( 1.0 ) ;
state OpQueue recoveryQueue ;
state OpHeader h ;
TraceEvent ( " KVSMemRecoveryStarted " , self - > id )
. detail ( " SnapshotEndLocation " , uncommittedSnapshotEnd ) ;
try {
loop {
{
Standalone < StringRef > data = wait ( self - > log - > readNext ( sizeof ( OpHeader ) ) ) ;
if ( data . size ( ) ! = sizeof ( OpHeader ) ) {
if ( data . size ( ) ) {
TEST ( true ) ; // zero fill partial header in KeyValueStoreMemory
memset ( & h , 0 , sizeof ( OpHeader ) ) ;
memcpy ( & h , data . begin ( ) , data . size ( ) ) ;
zeroFillSize = sizeof ( OpHeader ) - data . size ( ) + h . len1 + h . len2 + 1 ;
}
TraceEvent ( " KVSMemRecoveryComplete " , self - > id )
. detail ( " Reason " , " Non-header sized data read " )
. detail ( " DataSize " , data . size ( ) )
. detail ( " ZeroFillSize " , zeroFillSize )
. detail ( " SnapshotEndLocation " , uncommittedSnapshotEnd )
. detail ( " NextReadLoc " , self - > log - > getNextReadLocation ( ) ) ;
break ;
2019-02-18 10:46:59 +08:00
}
2019-07-31 01:14:39 +08:00
h = * ( OpHeader * ) data . begin ( ) ;
}
Standalone < StringRef > data = wait ( self - > log - > readNext ( h . len1 + h . len2 + 1 ) ) ;
if ( data . size ( ) ! = h . len1 + h . len2 + 1 ) {
zeroFillSize = h . len1 + h . len2 + 1 - data . size ( ) ;
2019-02-18 10:46:59 +08:00
TraceEvent ( " KVSMemRecoveryComplete " , self - > id )
2019-07-31 01:14:39 +08:00
. detail ( " Reason " , " data specified by header does not exist " )
2019-02-18 10:46:59 +08:00
. detail ( " DataSize " , data . size ( ) )
. detail ( " ZeroFillSize " , zeroFillSize )
. detail ( " SnapshotEndLocation " , uncommittedSnapshotEnd )
2019-07-31 01:14:39 +08:00
. detail ( " OpCode " , h . op )
2019-02-18 10:46:59 +08:00
. detail ( " NextReadLoc " , self - > log - > getNextReadLocation ( ) ) ;
break ;
2017-05-26 04:48:44 +08:00
}
2019-07-31 01:14:39 +08:00
if ( data [ data . size ( ) - 1 ] ) {
StringRef p1 = data . substr ( 0 , h . len1 ) ;
StringRef p2 = data . substr ( h . len1 , h . len2 ) ;
if ( h . op = = OpSnapshotItem ) { // snapshot data item
/*if (p1 < uncommittedNextKey) {
TraceEvent ( SevError , " RecSnapshotBack " , self - > id )
. detail ( " NextKey " , uncommittedNextKey )
. detail ( " P1 " , p1 )
. detail ( " Nextlocation " , self - > log - > getNextReadLocation ( ) ) ;
}
ASSERT ( p1 > = uncommittedNextKey ) ; */
if ( p1 > = uncommittedNextKey )
recoveryQueue . clear ( KeyRangeRef ( uncommittedNextKey , p1 ) , & uncommittedNextKey . arena ( ) ) ; //FIXME: Not sure what this line is for, is it necessary?
recoveryQueue . set ( KeyValueRef ( p1 , p2 ) , & data . arena ( ) ) ;
uncommittedNextKey = keyAfter ( p1 ) ;
+ + dbgSnapshotItemCount ;
} else if ( h . op = = OpSnapshotEnd | | h . op = = OpSnapshotAbort ) { // snapshot complete
TraceEvent ( " RecSnapshotEnd " , self - > id )
2019-04-06 04:11:50 +08:00
. detail ( " NextKey " , uncommittedNextKey )
2019-07-31 01:14:39 +08:00
. detail ( " Nextlocation " , self - > log - > getNextReadLocation ( ) )
. detail ( " IsSnapshotEnd " , h . op = = OpSnapshotEnd ) ;
if ( h . op = = OpSnapshotEnd ) {
uncommittedPrevSnapshotEnd = uncommittedSnapshotEnd ;
uncommittedSnapshotEnd = self - > log - > getNextReadLocation ( ) ;
recoveryQueue . clear_to_end ( uncommittedNextKey , & uncommittedNextKey . arena ( ) ) ;
}
uncommittedNextKey = Key ( ) ;
+ + dbgSnapshotEndCount ;
} else if ( h . op = = OpSet ) { // set mutation
recoveryQueue . set ( KeyValueRef ( p1 , p2 ) , & data . arena ( ) ) ;
+ + dbgMutationCount ;
} else if ( h . op = = OpClear ) { // clear mutation
recoveryQueue . clear ( KeyRangeRef ( p1 , p2 ) , & data . arena ( ) ) ;
+ + dbgMutationCount ;
} else if ( h . op = = OpClearToEnd ) { //clear all data from begin key to end
recoveryQueue . clear_to_end ( p1 , & data . arena ( ) ) ;
} else if ( h . op = = OpCommit ) { // commit previous transaction
self - > commit_queue ( recoveryQueue , false ) ;
+ + dbgCommitCount ;
self - > recoveredSnapshotKey = uncommittedNextKey ;
self - > previousSnapshotEnd = uncommittedPrevSnapshotEnd ;
self - > currentSnapshotEnd = uncommittedSnapshotEnd ;
} else if ( h . op = = OpRollback ) { // rollback previous transaction
recoveryQueue . rollback ( ) ;
TraceEvent ( " KVSMemRecSnapshotRollback " , self - > id )
. detail ( " NextKey " , uncommittedNextKey ) ;
uncommittedNextKey = self - > recoveredSnapshotKey ;
uncommittedPrevSnapshotEnd = self - > previousSnapshotEnd ;
uncommittedSnapshotEnd = self - > currentSnapshotEnd ;
} else
ASSERT ( false ) ;
} else {
TraceEvent ( " KVSMemRecoverySkippedZeroFill " , self - > id )
. detail ( " PayloadSize " , data . size ( ) )
. detail ( " ExpectedSize " , h . len1 + h . len2 + 1 )
. detail ( " OpCode " , h . op )
. detail ( " EndsAt " , self - > log - > getNextReadLocation ( ) ) ;
}
2017-05-26 04:48:44 +08:00
2019-07-31 01:14:39 +08:00
if ( loggingDelay . isReady ( ) ) {
TraceEvent ( " KVSMemRecoveryLogSnap " , self - > id )
. detail ( " SnapshotItems " , dbgSnapshotItemCount )
. detail ( " SnapshotEnd " , dbgSnapshotEndCount )
. detail ( " Mutations " , dbgMutationCount )
. detail ( " Commits " , dbgCommitCount )
. detail ( " EndsAt " , self - > log - > getNextReadLocation ( ) ) ;
loggingDelay = delay ( 1.0 ) ;
}
2017-05-26 04:48:44 +08:00
2019-07-31 01:14:39 +08:00
wait ( yield ( ) ) ;
2017-05-26 04:48:44 +08:00
}
2019-07-31 01:14:39 +08:00
if ( zeroFillSize ) {
if ( exactRecovery ) {
TraceEvent ( SevError , " KVSMemExpectedExact " , self - > id ) ;
ASSERT ( false ) ;
}
2017-05-26 04:48:44 +08:00
2019-07-31 01:14:39 +08:00
TEST ( true ) ; // Fixing a partial commit at the end of the KeyValueStoreMemory log
for ( int i = 0 ; i < zeroFillSize ; i + + )
self - > log - > push ( StringRef ( ( const uint8_t * ) " " , 1 ) ) ;
2018-09-01 04:07:48 +08:00
}
2019-07-31 01:14:39 +08:00
//self->rollback(); not needed, since we are about to discard anything left in the recoveryQueue
//TraceEvent("KVSMemRecRollback", self->id).detail("QueueEmpty", data.size() == 0);
// make sure that before any new operations are added to the log that all uncommitted operations are "rolled back"
self - > log_op ( OpRollback , StringRef ( ) , StringRef ( ) ) ; // rollback previous transaction
2018-09-01 04:07:48 +08:00
2019-07-31 01:14:39 +08:00
self - > committedDataSize = self - > data . sumTo ( self - > data . end ( ) ) ;
TraceEvent ( " KVSMemRecovered " , self - > id )
. detail ( " SnapshotItems " , dbgSnapshotItemCount )
. detail ( " SnapshotEnd " , dbgSnapshotEndCount )
. detail ( " Mutations " , dbgMutationCount )
. detail ( " Commits " , dbgCommitCount )
. detail ( " TimeTaken " , now ( ) - startt ) ;
self - > semiCommit ( ) ;
return Void ( ) ;
} catch ( Error & e ) {
bool ok = e . code ( ) = = error_code_operation_cancelled | | e . code ( ) = = error_code_file_not_found | | e . code ( ) = = error_code_disk_adapter_reset ;
TraceEvent ( ok ? SevInfo : SevError , " ErrorDuringRecovery " , dbgid ) . error ( e , true ) ;
if ( e . code ( ) ! = error_code_disk_adapter_reset ) {
throw e ;
}
self - > data . clear ( ) ;
self - > dataSets . clear ( ) ;
2017-05-26 04:48:44 +08:00
}
}
}
2019-01-10 10:03:54 +08:00
// Snapshots an entire data set
void fullSnapshot ( Container & snapshotData ) {
2017-05-26 04:48:44 +08:00
previousSnapshotEnd = log_op ( OpSnapshotAbort , StringRef ( ) , StringRef ( ) ) ;
2018-03-30 06:12:38 +08:00
replaceContent = false ;
2017-05-26 04:48:44 +08:00
2019-01-10 10:03:54 +08:00
// Clear everything since we are about to write the whole database
2017-05-26 04:48:44 +08:00
log_op ( OpClearToEnd , allKeys . begin , StringRef ( ) ) ;
int count = 0 ;
int64_t snapshotSize = 0 ;
2019-01-10 10:03:54 +08:00
for ( auto kv = snapshotData . begin ( ) ; kv ! = snapshotData . end ( ) ; + + kv ) {
StringRef tempKey = kv . getKey ( reserved_buffer , CLIENT_KNOBS - > SYSTEM_KEY_SIZE_LIMIT ) ;
log_op ( OpSnapshotItem , tempKey , kv . getValue ( ) ) ;
snapshotSize + = tempKey . size ( ) + kv . getValue ( ) . size ( ) + OP_DISK_OVERHEAD ;
2017-05-26 04:48:44 +08:00
+ + count ;
}
TraceEvent ( " FullSnapshotEnd " , id )
2019-01-10 10:03:54 +08:00
. detail ( " PreviousSnapshotEndLoc " , previousSnapshotEnd )
. detail ( " SnapshotSize " , snapshotSize )
. detail ( " SnapshotElements " , count ) ;
2017-05-26 04:48:44 +08:00
currentSnapshotEnd = log_op ( OpSnapshotEnd , StringRef ( ) , StringRef ( ) ) ;
}
ACTOR static Future < Void > snapshot ( KeyValueStoreMemory * self ) {
2018-08-11 04:57:10 +08:00
wait ( self - > recovering ) ;
2017-05-26 04:48:44 +08:00
state Key nextKey = self - > recoveredSnapshotKey ;
2019-01-10 10:03:54 +08:00
state bool nextKeyAfter = false ; // setting this to true is equilvent to setting nextKey = keyAfter(nextKey)
2017-05-26 04:48:44 +08:00
state uint64_t snapshotTotalWrittenBytes = 0 ;
state int lastDiff = 0 ;
state int snapItems = 0 ;
state uint64_t snapshotBytes = 0 ;
2019-03-19 06:03:43 +08:00
TraceEvent ( " KVSMemStartingSnapshot " , self - > id ) . detail ( " StartKey " , nextKey ) ;
2017-05-26 04:48:44 +08:00
loop {
2018-08-11 04:57:10 +08:00
wait ( self - > notifiedCommittedWriteBytes . whenAtLeast ( snapshotTotalWrittenBytes + 1 ) ) ;
2017-05-26 04:48:44 +08:00
2019-01-10 10:03:54 +08:00
if ( self - > resetSnapshot ) {
2017-05-26 04:48:44 +08:00
nextKey = Key ( ) ;
nextKeyAfter = false ;
snapItems = 0 ;
snapshotBytes = 0 ;
self - > resetSnapshot = false ;
}
auto next = nextKeyAfter ? self - > data . upper_bound ( nextKey ) : self - > data . lower_bound ( nextKey ) ;
int diff = self - > notifiedCommittedWriteBytes . get ( ) - snapshotTotalWrittenBytes ;
2019-01-10 10:03:54 +08:00
if ( diff > lastDiff & & diff > 5e7 )
2017-05-26 04:48:44 +08:00
TraceEvent ( SevWarnAlways , " ManyWritesAtOnce " , self - > id )
2019-01-10 10:03:54 +08:00
. detail ( " CommittedWrites " , self - > notifiedCommittedWriteBytes . get ( ) )
. detail ( " SnapshotWrites " , snapshotTotalWrittenBytes )
. detail ( " Diff " , diff )
. detail ( " LastOperationWasASnapshot " , nextKey = = Key ( ) & & ! nextKeyAfter ) ;
2017-05-26 04:48:44 +08:00
lastDiff = diff ;
if ( next = = self - > data . end ( ) ) {
2019-01-10 10:03:54 +08:00
auto thisSnapshotEnd = self - > log_op ( OpSnapshotEnd , StringRef ( ) , StringRef ( ) ) ;
2017-05-26 04:48:44 +08:00
//TraceEvent("SnapshotEnd", self->id)
2019-03-19 06:03:43 +08:00
// .detail("LastKey", lastKey.present() ? lastKey.get() : LiteralStringRef("<none>"))
2018-06-09 02:11:08 +08:00
// .detail("CurrentSnapshotEndLoc", self->currentSnapshotEnd)
// .detail("PreviousSnapshotEndLoc", self->previousSnapshotEnd)
// .detail("ThisSnapshotEnd", thisSnapshotEnd)
2017-05-26 04:48:44 +08:00
// .detail("Items", snapItems)
// .detail("CommittedWrites", self->notifiedCommittedWriteBytes.get())
// .detail("SnapshotSize", snapshotBytes);
ASSERT ( thisSnapshotEnd > = self - > currentSnapshotEnd ) ;
self - > previousSnapshotEnd = self - > currentSnapshotEnd ;
self - > currentSnapshotEnd = thisSnapshotEnd ;
2018-03-30 06:12:38 +08:00
2019-01-10 10:03:54 +08:00
if ( + + self - > snapshotCount = = 2 ) {
2018-03-30 06:12:38 +08:00
self - > replaceContent = false ;
}
2017-05-26 04:48:44 +08:00
nextKey = Key ( ) ;
nextKeyAfter = false ;
snapItems = 0 ;
snapshotBytes = 0 ;
snapshotTotalWrittenBytes + = OP_DISK_OVERHEAD ;
} else {
2019-01-10 10:03:54 +08:00
StringRef tempKey = next . getKey ( self - > reserved_buffer , CLIENT_KNOBS - > SYSTEM_KEY_SIZE_LIMIT ) ;
self - > log_op ( OpSnapshotItem , tempKey , next . getValue ( ) ) ;
nextKey = tempKey ;
2017-05-26 04:48:44 +08:00
nextKeyAfter = true ;
snapItems + + ;
2019-01-10 10:03:54 +08:00
uint64_t opBytes = tempKey . size ( ) + next . getValue ( ) . size ( ) + OP_DISK_OVERHEAD ;
2017-05-26 04:48:44 +08:00
snapshotBytes + = opBytes ;
snapshotTotalWrittenBytes + = opBytes ;
}
}
}
ACTOR static Future < Optional < Value > > waitAndReadValue ( KeyValueStoreMemory * self , Key key ) {
2018-08-11 04:57:10 +08:00
wait ( self - > recovering ) ;
2017-05-26 04:48:44 +08:00
return self - > readValue ( key ) . get ( ) ;
}
ACTOR static Future < Optional < Value > > waitAndReadValuePrefix ( KeyValueStoreMemory * self , Key key , int maxLength ) {
2018-08-11 04:57:10 +08:00
wait ( self - > recovering ) ;
2017-05-26 04:48:44 +08:00
return self - > readValuePrefix ( key , maxLength ) . get ( ) ;
}
ACTOR static Future < Standalone < VectorRef < KeyValueRef > > > waitAndReadRange ( KeyValueStoreMemory * self , KeyRange keys , int rowLimit , int byteLimit ) {
2018-08-11 04:57:10 +08:00
wait ( self - > recovering ) ;
2017-05-26 04:48:44 +08:00
return self - > readRange ( keys , rowLimit , byteLimit ) . get ( ) ;
}
ACTOR static Future < Void > waitAndCommit ( KeyValueStoreMemory * self , bool sequential ) {
2018-08-11 04:57:10 +08:00
wait ( self - > recovering ) ;
wait ( self - > commit ( sequential ) ) ;
2017-05-26 04:48:44 +08:00
return Void ( ) ;
}
ACTOR static Future < Void > commitAndUpdateVersions ( KeyValueStoreMemory * self , Future < Void > commit , IDiskQueue : : location location ) {
2018-08-11 04:57:10 +08:00
wait ( commit ) ;
2017-05-26 04:48:44 +08:00
self - > log - > pop ( location ) ;
return Void ( ) ;
}
} ;
2019-01-10 10:03:54 +08:00
template <typename Container>
KeyValueStoreMemory<Container>::KeyValueStoreMemory(IDiskQueue* log, UID id, int64_t memoryLimit,
                                                    KeyValueStoreType storeType, bool disableSnapshot,
                                                    bool replaceContent, bool exactRecovery)
  : log(log), id(id), type(storeType), previousSnapshotEnd(-1), currentSnapshotEnd(-1), resetSnapshot(false),
    memoryLimit(memoryLimit), committedWriteBytes(0), overheadWriteBytes(0), committedDataSize(0), transactionSize(0),
    transactionIsLarge(false), disableSnapshot(disableSnapshot), replaceContent(replaceContent), snapshotCount(0),
    firstCommitWithSnapshot(true) {
	// create reserved buffer for radixtree store type
	this->reserved_buffer =
	    (storeType == KeyValueStoreType::MEMORY) ? nullptr : new uint8_t[CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT];
	if (this->reserved_buffer != nullptr) memset(this->reserved_buffer, 0, CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT);

	recovering = recover(this, exactRecovery);
	snapshotting = snapshot(this);
	commitActors = actorCollection(addActor.getFuture());
}
2019-01-10 10:03:54 +08:00
// Opens (or creates) a memory key-value store persisted to a disk queue at 'basename',
// choosing the in-memory container based on the requested store type.
IKeyValueStore* keyValueStoreMemory(std::string const& basename, UID logID, int64_t memoryLimit, std::string ext,
                                    KeyValueStoreType storeType) {
	TraceEvent("KVSMemOpening", logID)
	    .detail("Basename", basename)
	    .detail("MemoryLimit", memoryLimit)
	    .detail("StoreType", storeType);

	IDiskQueue* log = openDiskQueue(basename, ext, logID, DiskQueueVersion::V1);
	if (storeType == KeyValueStoreType::MEMORY_RADIXTREE) {
		return new KeyValueStoreMemory<radix_tree>(log, logID, memoryLimit, storeType, false, false, false);
	} else {
		return new KeyValueStoreMemory<IKeyValueContainer>(log, logID, memoryLimit, storeType, false, false, false);
	}
}
2018-09-01 04:07:48 +08:00
// Wraps an existing disk queue in a memory key-value store (used by the log system).
IKeyValueStore* keyValueStoreLogSystem(class IDiskQueue* queue, UID logID, int64_t memoryLimit, bool disableSnapshot,
                                       bool replaceContent, bool exactRecovery) {
	return new KeyValueStoreMemory<IKeyValueContainer>(queue, logID, memoryLimit, KeyValueStoreType::MEMORY,
	                                                   disableSnapshot, replaceContent, exactRecovery);
}