/*
 * KeyValueStoreSQLite.actor.cpp
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#define SQLITE_THREADSAFE 0 // also in sqlite3.amalgamation.c!

#include "fdbrpc/crc32c.h"
#include "fdbserver/IKeyValueStore.h"
#include "fdbserver/CoroFlow.h"
#include "fdbserver/Knobs.h"
#include "flow/Hash3.h"

extern "C" {
#include "fdbserver/sqlite/sqliteInt.h"
u32 sqlite3VdbeSerialGet(const unsigned char*, u32, Mem*);
}
#include "flow/ThreadPrimitives.h"
#include "fdbserver/template_fdb.h"
#include "fdbrpc/simulator.h"
#include "flow/actorcompiler.h" // This must be the last #include.

#if SQLITE_THREADSAFE == 0
#define sqlite3_mutex_enter(x)
#define sqlite3_mutex_leave(x)
#endif
void hexdump(FILE* fout, StringRef val);

/*#undef state
#include <Windows.h>*/

/*uint64_t getFileSize( const char* filename ) {
	HANDLE f = CreateFile( filename, GENERIC_READ, FILE_SHARE_READ|FILE_SHARE_WRITE|FILE_SHARE_DELETE, NULL, OPEN_EXISTING, 0, NULL );
	if (f == INVALID_HANDLE_VALUE) return 0;
	DWORD hi, lo;
	lo = GetFileSize(f, &hi);
	CloseHandle(f);
	return (uint64_t(hi) << 32) + lo;
}*/
struct SpringCleaningStats {
	int64_t springCleaningCount;
	int64_t lazyDeletePages;
	int64_t vacuumedPages;
	double springCleaningTime;
	double vacuumTime;
	double lazyDeleteTime;

	SpringCleaningStats()
	  : springCleaningCount(0), lazyDeletePages(0), vacuumedPages(0), springCleaningTime(0.0), vacuumTime(0.0),
	    lazyDeleteTime(0.0) {}
};
struct PageChecksumCodec {
	PageChecksumCodec(std::string const& filename) : pageSize(0), reserveSize(0), filename(filename), silent(false) {}

	int pageSize;
	int reserveSize;
	std::string filename;
	bool silent;

	struct SumType {
		bool operator==(const SumType& rhs) const { return part1 == rhs.part1 && part2 == rhs.part2; }
		uint32_t part1;
		uint32_t part2;
		std::string toString() { return format("0x%08x%08x", part1, part2); }
	};

	// Calculates and then either stores or verifies a checksum.
	// The checksum is read/stored at the end of the page buffer.
	// Page size is passed in as pageLen because this->pageSize is not always appropriate.
	// If write is true then the checksum is written into the page and true is returned.
	// If write is false then the checksum is compared to the in-page sum and the return value
	// is whether or not the checksums were equal.
	bool checksum(Pgno pageNumber, void* data, int pageLen, bool write) {
		ASSERT(pageLen > sizeof(SumType));

		char* pData = (char*)data;
		int dataLen = pageLen - sizeof(SumType);
		SumType* pSumInPage = (SumType*)(pData + dataLen);
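		// Page layout, as a sketch (assuming the 8-byte SumType lives in the page's reserve bytes):
		//   [ 0 .. dataLen-1 ]       page contents covered by the checksum
		//   [ dataLen .. pageLen-1 ] SumType { part1, part2 }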
		if (write) {
			// Always write a CRC32 checksum for new pages
			pSumInPage->part1 = 0; // Indicates CRC32 is being used
			pSumInPage->part2 = crc32c_append(0xfdbeefdb, static_cast<uint8_t*>(data), dataLen);
			return true;
		}

		SumType sum;
		if (pSumInPage->part1 == 0) {
			// part1 being 0 indicates with high probability that a CRC32 checksum
			// was used, so check that first. If this checksum fails, there is still
			// some chance the page was written with hashlittle2, so fall back to checking
			// hashlittle2.
			sum.part1 = 0;
			sum.part2 = crc32c_append(0xfdbeefdb, static_cast<uint8_t*>(data), dataLen);
			if (sum == *pSumInPage) return true;
		}

		SumType hashLittle2Sum;
		hashLittle2Sum.part1 = pageNumber; // DO NOT CHANGE
		hashLittle2Sum.part2 = 0x5ca1ab1e;
		hashlittle2(pData, dataLen, &hashLittle2Sum.part1, &hashLittle2Sum.part2);
		if (hashLittle2Sum == *pSumInPage) return true;

		if (!silent) {
			TraceEvent trEvent(SevError, "SQLitePageChecksumFailure");
			trEvent.error(checksum_failed())
			    .detail("CodecPageSize", pageSize)
			    .detail("CodecReserveSize", reserveSize)
			    .detail("Filename", filename)
			    .detail("PageNumber", pageNumber)
			    .detail("PageSize", pageLen)
			    .detail("ChecksumInPage", pSumInPage->toString())
			    .detail("ChecksumCalculatedHL2", hashLittle2Sum.toString());
			if (pSumInPage->part1 == 0) trEvent.detail("ChecksumCalculatedCRC", sum.toString());
		}

		return false;
	}
	static void* codec(void* vpSelf, void* data, Pgno pageNumber, int op) {
		PageChecksumCodec* self = (PageChecksumCodec*)vpSelf;

		// Page write operations are 6 for DB page and 7 for journal page
		bool write = (op == 6 || op == 7);
		// Page read is operation 3, which must be the operation if it's not a write.
		ASSERT(write || op == 3);

		// Page 1 is special. It contains the database configuration including Page Size and Reserve Size.
		// SQLite can't get authoritative values for these things until the Pager Codec has validated (and
		// potentially decrypted) Page 1 itself, so it can't tell the Pager Codec what those things are before
		// Page 1 is handled. It will guess a Page Size of SQLITE_DEFAULT_PAGE_SIZE, and a Reserve Size based
		// on the pre-verified (and perhaps still encrypted) header in the Page 1 data that it will then pass
		// to the Pager Codec.
		//
		// So, Page 1 must be written and verifiable as a SQLITE_DEFAULT_PAGE_SIZE sized page as well as
		// the actual configured page size for the database, if it is larger. A configured page size lower
		// than the default (in other words, 512) results in undefined behavior.
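		// Concretely: with a configured page size larger than SQLITE_DEFAULT_PAGE_SIZE, a write of
		// Page 1 stores two checksums -- one in the call just below, covering only the first
		// SQLITE_DEFAULT_PAGE_SIZE bytes, and the usual full-page checksum at the end of this
		// function -- so Page 1 verifies under either page size guess.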
		if (pageNumber == 1) {
			if (write && self->pageSize > SQLITE_DEFAULT_PAGE_SIZE) {
				self->checksum(pageNumber, data, SQLITE_DEFAULT_PAGE_SIZE, write);
			}
		} else {
			// For Page Numbers other than 1, reserve size must be the size of the checksum.
			if (self->reserveSize != sizeof(SumType)) {
				if (!self->silent)
					TraceEvent(SevWarnAlways, "SQLitePageChecksumFailureBadReserveSize")
					    .detail("CodecPageSize", self->pageSize)
					    .detail("CodecReserveSize", self->reserveSize)
					    .detail("Filename", self->filename)
					    .detail("PageNumber", pageNumber);

				return NULL;
			}
		}

		if (!self->checksum(pageNumber, data, self->pageSize, write)) return NULL;

		return data;
	}

	static void sizeChange(void* vpSelf, int new_pageSize, int new_reserveSize) {
		PageChecksumCodec* self = (PageChecksumCodec*)vpSelf;
		self->pageSize = new_pageSize;
		self->reserveSize = new_reserveSize;
	}

	static void free(void* vpSelf) {
		PageChecksumCodec* self = (PageChecksumCodec*)vpSelf;
		delete self;
	}
};
struct SQLiteDB : NonCopyable {
	std::string filename;
	sqlite3* db;
	Btree* btree;
	int table, freetable;
	bool haveMutex;
	Reference<IAsyncFile> dbFile, walFile;
	bool page_checksums;
	bool fragment_values;
	PageChecksumCodec* pPagerCodec; // we do NOT own this pointer, db does.

	void beginTransaction(bool write) { checkError("BtreeBeginTrans", sqlite3BtreeBeginTrans(btree, write)); }
	void endTransaction() { checkError("BtreeCommit", sqlite3BtreeCommit(btree)); }
	void rollback() { checkError("BtreeRollback", sqlite3BtreeRollback(btree)); }

	void open(bool writable);
	void createFromScratch();

	SQLiteDB(std::string filename, bool page_checksums, bool fragment_values)
	  : filename(filename), db(NULL), btree(NULL), table(-1), freetable(-1), haveMutex(false),
	    page_checksums(page_checksums), fragment_values(fragment_values) {}

	~SQLiteDB() {
		if (db) {
			if (haveMutex) {
				sqlite3_mutex_leave(db->mutex);
			}
			sqlite3_close(db);
		}
	}

	void initPagerCodec() {
		if (page_checksums) {
			int r = sqlite3_test_control(SQLITE_TESTCTRL_RESERVE, db, sizeof(PageChecksumCodec::SumType));
			if (r != 0) {
				TraceEvent(SevError, "BtreePageReserveSizeSetError").detail("Filename", filename).detail("ErrorCode", r);
				ASSERT(false);
			}
			// Always start with a new pager codec with default options.
			pPagerCodec = new PageChecksumCodec(filename);
			sqlite3BtreePagerSetCodec(btree, PageChecksumCodec::codec, PageChecksumCodec::sizeChange,
			                          PageChecksumCodec::free, pPagerCodec);
		}
	}
	void checkError(const char* context, int rc) {
		//if (deterministicRandom()->random01() < .001) rc = SQLITE_INTERRUPT;
		if (rc) {
			// Our exceptions don't propagate through sqlite, so we don't know for sure if the error that caused
			// this was an injected fault. Assume that if fault injection is happening, this is an injected fault.
			Error err = io_error();
			if (g_network->isSimulated() &&
			    (g_simulator.getCurrentProcess()->fault_injection_p1 ||
			     g_simulator.getCurrentProcess()->machine->machineProcess->fault_injection_p1 ||
			     g_simulator.getCurrentProcess()->rebooting))
				err = err.asInjectedFault();

			if (db) db->errCode = rc;
			if (rc == SQLITE_NOMEM) platform::outOfMemory(); // SOMEDAY: Trap out of memory errors at allocation
			                                                 // time; check out different allocation options in sqlite

			TraceEvent(SevError, "DiskError")
			    .error(err)
			    .detail("In", context)
			    .detail("File", filename)
			    .detail("SQLiteError", sqlite3ErrStr(rc))
			    .detail("SQLiteErrorCode", rc)
			    .GetLastError();
			throw err;
		}
	}
	void checkpoint(bool restart) {
		int logSize = 0, checkpointCount = 0;
		//double t = timer();
		while (true) {
			int rc = sqlite3_wal_checkpoint_v2(db, 0, restart ? SQLITE_CHECKPOINT_RESTART : SQLITE_CHECKPOINT_FULL,
			                                   &logSize, &checkpointCount);
			if (!rc) break;
			if ((sqlite3_errcode(db) & 0xff) == SQLITE_BUSY) {
				//printf("#");
				//threadSleep(.010);
				sqlite3_sleep(10);
			} else
				checkError("checkpoint", rc);
		}
		//printf("Checkpoint (%0.1f ms): %d frames in log, %d checkpointed\n", (timer()-t)*1000, logSize, checkpointCount);
	}

	uint32_t freePages() {
		u32 fp = 0;
		sqlite3BtreeGetMeta(btree, BTREE_FREE_PAGE_COUNT, &fp);
		return fp;
	}

	bool vacuum() { // Returns true if vacuum is complete or stalled by a lazy free root
		int rc = sqlite3BtreeIncrVacuum(btree);
		if (rc && rc != SQLITE_DONE) checkError("vacuum", rc);
		return rc == SQLITE_DONE;
	}
	int check(bool verbose) {
		int errors = 0;
		int tables[] = { 1, table, freetable };
		TraceEvent("BTreeIntegrityCheckBegin").detail("Filename", filename);
		char* e = sqlite3BtreeIntegrityCheck(btree, tables, 3, 1000, &errors, verbose);
		if (!(g_network->isSimulated() && (g_simulator.getCurrentProcess()->fault_injection_p1 ||
		                                   g_simulator.getCurrentProcess()->rebooting))) {
			TraceEvent((errors || e) ? SevError : SevInfo, "BTreeIntegrityCheckResults")
			    .detail("Filename", filename)
			    .detail("ErrorTotal", errors);
			if (e != nullptr) {
				// e is a string containing 1 or more lines. Create a separate trace event for each line.
				char* lineStart = e;
				while (lineStart != nullptr) {
					char* lineEnd = strstr(lineStart, "\n");
					if (lineEnd != nullptr) {
						*lineEnd = '\0';
						++lineEnd;
					}

					// If the line length found is not zero then print a trace event
					if (*lineStart != '\0')
						TraceEvent(SevError, "BTreeIntegrityCheck")
						    .detail("Filename", filename)
						    .detail("ErrorDetail", lineStart);
					lineStart = lineEnd;
				}
			}
			TEST(true); // BTree integrity checked
		}
		if (e) sqlite3_free(e);

		return errors;
	}

	int checkAllPageChecksums();
};
class Statement : NonCopyable {
	SQLiteDB& db;
	sqlite3_stmt* stmt;

public:
	Statement(SQLiteDB& db, const char* sql) : db(db), stmt(NULL) {
		db.checkError("prepare", sqlite3_prepare_v2(db.db, sql, -1, &stmt, NULL));
	}
	~Statement() {
		try {
			db.checkError("finalize", sqlite3_finalize(stmt));
		} catch (...) {
		}
	}
	Statement& reset() {
		db.checkError("reset", sqlite3_reset(stmt));
		return *this;
	}
	Statement& param(int i, StringRef value) {
		db.checkError("bind", sqlite3_bind_blob(stmt, i, value.begin(), value.size(), SQLITE_STATIC));
		return *this;
	}
	Statement& param(int i, int value) {
		db.checkError("bind", sqlite3_bind_int(stmt, i, value));
		return *this;
	}
	Statement& execute() {
		int r = sqlite3_step(stmt);
		if (r == SQLITE_ROW) db.checkError("execute called on statement that returns rows", r);
		if (r != SQLITE_DONE) db.checkError("execute", r);
		return *this;
	}
	bool nextRow() {
		int r = sqlite3_step(stmt);
		if (r == SQLITE_ROW) return true;
		if (r == SQLITE_DONE) return false;
		db.checkError("nextRow", r);
		__assume(false); // NOT REACHED
	}
	StringRef column(int i) {
		return StringRef((const uint8_t*)sqlite3_column_blob(stmt, i), sqlite3_column_bytes(stmt, i));
	}
};
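// Usage sketch: Statement is an RAII wrapper, so a one-shot query such as
//   Statement(*this, "PRAGMA journal_mode").nextRow();
// prepares, steps, and (via the destructor) finalizes in a single expression; see SQLiteDB::open() below.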
void hexdump(FILE* fout, StringRef val) {
	int buflen = val.size();
	const unsigned char* buf = val.begin();
	int i, j;

	for (i = 0; i < buflen; i += 32) {
		fprintf(fout, "%06x: ", i);
		for (j = 0; j < 32; j++) {
			if (j == 16) fprintf(fout, " ");
			if (i + j < buflen)
				fprintf(fout, "%02x ", buf[i + j]);
			else
				fprintf(fout, "   ");
		}
		fprintf(fout, " ");
		for (j = 0; j < 32; j++) {
			if (j == 16) fprintf(fout, " ");
			if (i + j < buflen) fprintf(fout, "%c", isprint(buf[i + j]) ? buf[i + j] : '.');
		}
		fprintf(fout, "\n");
	}
}
Value encode(KeyValueRef kv) {
	int keyCode = kv.key.size() * 2 + 12;
	int valCode = kv.value.size() * 2 + 12;
	int header_size = sqlite3VarintLen(keyCode) + sqlite3VarintLen(valCode);
	int hh = sqlite3VarintLen(header_size);
	header_size += hh;
	if (hh < sqlite3VarintLen(header_size)) header_size++;
	int size = header_size + kv.key.size() + kv.value.size();

	Value v;
	uint8_t* d = new (v.arena()) uint8_t[size];
	((ValueRef&)v) = KeyRef(d, size);
	d += sqlite3PutVarint(d, header_size);
	d += sqlite3PutVarint(d, keyCode);
	d += sqlite3PutVarint(d, valCode);
	memcpy(d, kv.key.begin(), kv.key.size());
	d += kv.key.size();
	memcpy(d, kv.value.begin(), kv.value.size());
	d += kv.value.size();
	ASSERT(d == v.begin() + size);
	return v;
}
// Fragments are encoded as (key, index, value) tuples.
// An index of 0 indicates an unfragmented KV pair.
// For fragmented KV pairs, the values will be concatenated in index order.
//
// In the current implementation, index values are chosen to enable a single linear
// pass over the fragments, in forward or backward order, to immediately know the final
// unfragmented value size accurately enough to allocate a buffer that is certainly large
// enough to hold the defragmented bytes.
//
// However, the decoder could be made to work if these index value 'hints' become inaccurate
// due to a change in splitting logic or index numbering. The decoder would just have to support
// buffer expansion as needed.
//
// Note that changing the following value constitutes a change in index numbering.
#define KV_FRAGMENT_INDEX_SIZE_HINT_FACTOR 4
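// Worked example of the size hints: for a 100-byte value, the first fragment gets index
// ceil(100/4) == 25 and the last gets index ceil(100/2) == 50. A forward reader that sees
// index 25 first can allocate 25*4 == 100 bytes; a backward reader that sees index 50 first
// can allocate 50*2 == 100 bytes. Either way the buffer is at least the total value size.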
Value encodeKVFragment(KeyValueRef kv, uint32_t index) {
	int keyCode = kv.key.size() * 2 + 12;
	int valCode = kv.value.size() * 2 + 12;
	// The SQLite type code for the index is the minimal number of bytes needed to store
	// a signed representation of the index value. The type code for 0 is 0 (which is
	// actually the null type in SQLite).
	int8_t indexCode = 0;
	if (index != 0) {
		uint32_t tmp = index;
		while (tmp != 0) {
			++indexCode;
			tmp >>= 8;
		}
		// An increment is required if the high bit of the N-byte index value is set, since it is
		// a positive number but SQLite only stores signed values and would interpret it as negative.
		if (index >> (8 * indexCode - 1)) ++indexCode;
	}
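	// For example, index 0x80 fits in one byte but has its high bit set, so indexCode becomes 2
	// and the index is stored big-endian as the two bytes 0x00 0x80.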
	int header_size = sqlite3VarintLen(keyCode) + sizeof(indexCode) + sqlite3VarintLen(valCode);
	int hh = sqlite3VarintLen(header_size);
	header_size += hh;
	if (hh < sqlite3VarintLen(header_size)) header_size++;
	int size = header_size + kv.key.size() + indexCode + kv.value.size();

	Value v;
	uint8_t* d = new (v.arena()) uint8_t[size];
	((ValueRef&)v) = KeyRef(d, size);
	d += sqlite3PutVarint(d, header_size);
	d += sqlite3PutVarint(d, keyCode);
	*d++ = indexCode;
	d += sqlite3PutVarint(d, valCode);

	// Write key
	memcpy(d, kv.key.begin(), kv.key.size());
	d += kv.key.size();

	// Write index bytes, if any
	for (int i = indexCode - 1; i >= 0; --i) {
		d[i] = (uint8_t)index;
		index >>= 8;
	}
	d += indexCode;

	// Write value
	memcpy(d, kv.value.begin(), kv.value.size());
	d += kv.value.size();
	ASSERT(d == v.begin() + size);
	return v;
}
int getEncodedSize(int keySize, int valuePrefixSize) {
	int keyCode = keySize * 2 + 12;
	int header_size = sqlite3VarintLen(keyCode) + 8; // 8 is the maximum return value of sqlite3VarintLen(), so this
	                                                 // is our worst case header size (for values larger than
	                                                 // allowable database values)
	int hh = sqlite3VarintLen(header_size);
	header_size += hh;
	if (hh < sqlite3VarintLen(header_size)) header_size++;
	return header_size + keySize + valuePrefixSize;
}

KeyValueRef decodeKV(StringRef encoded) {
	uint8_t const* d = encoded.begin();
	uint64_t h, len1, len2;
	d += sqlite3GetVarint(d, (u64*)&h);
	d += sqlite3GetVarint(d, (u64*)&len1);
	d += sqlite3GetVarint(d, (u64*)&len2);
	ASSERT(d == encoded.begin() + h);
	ASSERT(len1 >= 12 && !(len1 & 1));
	ASSERT(len2 >= 12 && !(len2 & 1));
	len1 = (len1 - 12) / 2;
	len2 = (len2 - 12) / 2;
	ASSERT(d + len1 + len2 == encoded.end());

	return KeyValueRef(KeyRef(d, len1), KeyRef(d + len1, len2));
}
// Given a key size and value prefix size, get the minimum bytes that must be read from the underlying
// btree tuple to safely read the prefix length from the value bytes (if the value is long enough)
int getEncodedKVFragmentSize(int keySize, int valuePrefixSize) {
	int keyCode = keySize * 2 + 12;
	int header_size = sqlite3VarintLen(keyCode) + 1 // index code length
	                  + 8; // worst case for value size (larger than fdb api allows)
	int hh = sqlite3VarintLen(header_size);
	header_size += hh;
	if (hh < sqlite3VarintLen(header_size)) header_size++;
	return header_size + keySize + 4 // Max width allowed of index value
	       + valuePrefixSize;
}

// Decode (key, index, value) tuple.
// A present() Optional will always be returned UNLESS partial is true.
// If partial is true then the return will not be present() unless at least
// the full key and index were in the encoded buffer. The value returned will be 0 or
// more value bytes, however many were available.
// Note that a short encoded buffer must at *least* contain the header length varint.
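// Illustrative round trip: decoding a record written by encodeKVFragment(kv, 25) with
// decodeKVFragment(enc, &idx) yields kv.key, that fragment's value bytes, and idx == 25.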
Optional<KeyValueRef> decodeKVFragment(StringRef encoded, uint32_t* index = NULL, bool partial = false) {
	uint8_t const* d = encoded.begin();
	uint64_t h, len1, len2;
	d += sqlite3GetVarint(d, (u64*)&h);

	// Make sure the entire header is present, else return nothing
	if (partial && encoded.size() < h) return Optional<KeyValueRef>();

	d += sqlite3GetVarint(d, (u64*)&len1);
	const uint8_t indexLen = *d++;
	ASSERT(indexLen <= 4);
	d += sqlite3GetVarint(d, (u64*)&len2);
	ASSERT(d == encoded.begin() + h);
	ASSERT(len1 >= 12 && !(len1 & 1));
	ASSERT(len2 >= 12 && !(len2 & 1));
	len1 = (len1 - 12) / 2;
	len2 = (len2 - 12) / 2;

	if (partial) {
		// If the key and index aren't complete, return nothing.
		if (d + len1 + indexLen > encoded.end()) return Optional<KeyValueRef>();
		// Encoded size shouldn't be *larger* than the record described by the header no matter what.
		ASSERT(d + len1 + indexLen + len2 >= encoded.end());
		// Shorten value length to be whatever bytes remain after the header/key/index
		len2 = std::min(len2, (uint64_t)(encoded.end() - indexLen - len1 - d));
	} else {
		// But for non-partial records the encoded size should be exactly the size of the described record.
		ASSERT(d + len1 + indexLen + len2 == encoded.end());
	}

	// Decode big endian index
	if (index != nullptr) {
		if (indexLen == 0)
			*index = 0;
		else {
			const uint8_t* begin = d + len1;
			const uint8_t* end = begin + indexLen;
			*index = (uint8_t)*begin++;
			while (begin < end) {
				*index <<= 8;
				*index |= *begin++;
			}
		}
	}

	return KeyValueRef(KeyRef(d, len1), KeyRef(d + len1 + indexLen, len2));
}
KeyValueRef decodeKVPrefix(StringRef encoded, int maxLength) {
	uint8_t const* d = encoded.begin();
	uint64_t h, len1, len2;
	d += sqlite3GetVarint(d, (u64*)&h);
	d += sqlite3GetVarint(d, (u64*)&len1);
	d += sqlite3GetVarint(d, (u64*)&len2);
	ASSERT(d == encoded.begin() + h);
	ASSERT(len1 >= 12 && !(len1 & 1));
	ASSERT(len2 >= 12 && !(len2 & 1));
	len1 = (len1 - 12) / 2;
	len2 = (len2 - 12) / 2;
	len2 = std::min(len2, (uint64_t)maxLength);
	ASSERT(d + len1 + len2 <= encoded.end());

	return KeyValueRef(KeyRef(d, len1), KeyRef(d + len1, len2));
}
Value encodeKey(KeyRef key, bool using_fragments) {
	int keyCode = key.size() * 2 + 12;
	int header_size = sqlite3VarintLen(keyCode);
	if (using_fragments) // will be encoded as (key, 0), where 0 is really a null
		++header_size;
	int hh = sqlite3VarintLen(header_size);
	header_size += hh;
	if (hh < sqlite3VarintLen(header_size)) header_size++;
	int size = header_size + key.size();

	Value v;
	uint8_t* d = new (v.arena()) uint8_t[size];
	((ValueRef&)v) = KeyRef(d, size);
	d += sqlite3PutVarint(d, header_size);
	d += sqlite3PutVarint(d, keyCode);
	if (using_fragments) *d++ = 0;
	memcpy(d, key.begin(), key.size());
	d += key.size();
	ASSERT(d == v.begin() + size);
	return v;
}
struct SQLiteTransaction {
	SQLiteDB& db;
	bool shouldCommit;

	SQLiteTransaction(SQLiteDB& db, bool write) : db(db), shouldCommit(false) { db.beginTransaction(write); }

	void commit() { shouldCommit = true; }

	~SQLiteTransaction() {
		try {
			if (shouldCommit)
				db.endTransaction();
			else
				db.rollback();
		} catch (...) {
		}
	}
};
struct IntKeyCursor {
	SQLiteDB& db;
	BtCursor* cursor;

	IntKeyCursor(SQLiteDB& db, int table, bool write) : cursor(0), db(db) {
		cursor = (BtCursor*)new char[sqlite3BtreeCursorSize()];
		sqlite3BtreeCursorZero(cursor);
		db.checkError("BtreeCursor", sqlite3BtreeCursor(db.btree, table, write, NULL, cursor));
	}
	~IntKeyCursor() {
		if (cursor) {
			try {
				db.checkError("BtreeCloseCursor", sqlite3BtreeCloseCursor(cursor));
			} catch (...) {
			}
			delete[] (char*)cursor;
		}
	}
};
struct RawCursor {
	SQLiteDB& db;
	BtCursor* cursor;
	KeyInfo keyInfo;
	bool valid;

	operator bool() const { return valid; }

	RawCursor(SQLiteDB& db, int table, bool write) : cursor(0), db(db), valid(false) {
		keyInfo.db = db.db;
		keyInfo.enc = db.db->aDb[0].pSchema->enc;
		keyInfo.aColl[0] = db.db->pDfltColl;
		keyInfo.aSortOrder = 0;
		keyInfo.nField = 1;

		try {
			cursor = (BtCursor*)new char[sqlite3BtreeCursorSize()];
			sqlite3BtreeCursorZero(cursor);
			db.checkError("BtreeCursor", sqlite3BtreeCursor(db.btree, table, write, &keyInfo, cursor));
		} catch (...) {
			destroyCursor();
			throw;
		}
	}
	~RawCursor() { destroyCursor(); }

	void destroyCursor() {
		if (cursor) {
			try {
				db.checkError("BtreeCloseCursor", sqlite3BtreeCloseCursor(cursor));
			} catch (...) {
				TraceEvent(SevError, "RawCursorDestructionError");
			}
			delete[] (char*)cursor;
		}
	}

	void moveFirst() {
		int empty = 1;
		db.checkError("BtreeFirst", sqlite3BtreeFirst(cursor, &empty));
		valid = !empty;
	}
	void moveNext() {
		int empty = 1;
		db.checkError("BtreeNext", sqlite3BtreeNext(cursor, &empty));
		valid = !empty;
	}
	void movePrevious() {
		int empty = 1;
		db.checkError("BtreePrevious", sqlite3BtreePrevious(cursor, &empty));
		valid = !empty;
	}
	int size() {
		int64_t size;
		db.checkError("BtreeKeySize", sqlite3BtreeKeySize(cursor, (i64*)&size));
		ASSERT(size < (1 << 30));
		return size;
	}
	Value getEncodedRow() {
		int s = size();
		Value v;
		uint8_t* d = new (v.arena()) uint8_t[s];
		db.checkError("BtreeKey", sqlite3BtreeKey(cursor, 0, s, d));
		((ValueRef&)v) = KeyRef(d, s);
		return v;
	}
	ValueRef getEncodedRow(Arena& arena) {
		int s = size();
		uint8_t* d = new (arena) uint8_t[s];
		db.checkError("BtreeKey", sqlite3BtreeKey(cursor, 0, s, d));
		return KeyRef(d, s);
	}
	ValueRef getEncodedRowPrefix(Arena& arena, int maxEncodedSize) {
		int s = std::min(size(), maxEncodedSize);
		uint8_t* d = new (arena) uint8_t[s];
		db.checkError("BtreeKey", sqlite3BtreeKey(cursor, 0, s, d));
		return KeyRef(d, s);
	}
	void insertFragment(KeyValueRef kv, uint32_t index, int seekResult) {
		Value v = encodeKVFragment(kv, index);
		db.checkError("BtreeInsert", sqlite3BtreeInsert(cursor, v.begin(), v.size(), NULL, 0, 0, 0, seekResult));
	}

	void remove() { db.checkError("BtreeDelete", sqlite3BtreeDelete(cursor)); }

	void set(KeyValueRef kv) {
		if (db.fragment_values) {
			// Unlike a read, where we need to access fragments in fully forward or reverse order,
			// here we just want to delete any existing fragments for the key. It does not matter
			// what order we delete them in, and SQLite requires us to seek after every delete, so
			// the fastest way to do this is to repeatedly seek to the tuple prefix (key, ) and
			// delete the current fragment until nothing is there.
			// This should result in almost identical performance to non-fragmenting mode for single
			// fragment kv pairs.
			int seekResult = moveTo(kv.key, true); // second arg means to ignore fragmenting and seek to (key, )
			while (seekResult == 0) {
				remove();
				seekResult = moveTo(kv.key, true);
			}

			const int primaryPageUsable = SERVER_KNOBS->SQLITE_FRAGMENT_PRIMARY_PAGE_USABLE;
			const int overflowPageUsable = SERVER_KNOBS->SQLITE_FRAGMENT_OVERFLOW_PAGE_USABLE;

			int fragments = 1;
			int valuePerFragment = kv.value.size();

			// Figure out if we would benefit from fragmenting this kv pair. The key size must be less than
			// the primary page usable size, and the value and key size together must exceed the primary
			// page usable size. A worked example follows this block.
			if ((kv.key.size() + kv.value.size()) > primaryPageUsable && kv.key.size() < primaryPageUsable) {
				// Just the part of the value that would be in a partially-filled overflow page
				int overflowPartialBytes = (kv.expectedSize() - primaryPageUsable) % overflowPageUsable;
				// Number of bytes wasted in the unfragmented case
				int unfragmentedWaste = overflowPageUsable - overflowPartialBytes;
				// Total space used for the unfragmented form
				int unfragmentedTotal = kv.expectedSize() + unfragmentedWaste;
				// Value bytes that can fit in the primary page for each fragment
				int primaryPageValueBytes = primaryPageUsable - kv.key.size();
				// Calculate how many total fragments it would take to spread the partial overflow page bytes and
				// the first fragment's primary page value bytes evenly over multiple tuples that fit in primary
				// pages.
				fragments = (primaryPageValueBytes + overflowPartialBytes + primaryPageValueBytes - 1) / primaryPageValueBytes;
				// Number of bytes wasted in the fragmented case (for the extra key copies)
				int fragmentedWaste = kv.key.size() * (fragments - 1);
				// Total bytes used for the fragmented case
				//int fragmentedTotal = kv.expectedSize() + fragmentedWaste;

				// Calculate bytes saved by having extra key instances stored vs the original partial overflow
				// page bytes.
				int savings = unfragmentedWaste - fragmentedWaste;
				double reduction = (double)savings / unfragmentedTotal;

				//printf("K: %5d V: %6d OVERFLOW: %5d FRAGMENTS: %3d SAVINGS: %4d FRAG: %7d UNFRAG: %7d REDUCTION: %.3f\n",
				//       kv.key.size(), kv.value.size(), overflowPartialBytes, fragments, savings, fragmentedTotal, unfragmentedTotal, reduction);
				if (reduction < SERVER_KNOBS->SQLITE_FRAGMENT_MIN_SAVINGS)
					fragments = 1;
				else
					valuePerFragment = (primaryPageValueBytes + overflowPartialBytes + fragments - 1) / fragments;
			}
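			// Worked example with illustrative numbers (not the actual knob values): suppose
			// primaryPageUsable = overflowPageUsable = 4000, key = 100 bytes, value = 6000 bytes.
			// Then overflowPartialBytes = 2100, unfragmentedWaste = 1900, unfragmentedTotal = 8000,
			// primaryPageValueBytes = 3900, fragments = 2, fragmentedWaste = 100, savings = 1800,
			// and reduction = 0.225. If that beats SQLITE_FRAGMENT_MIN_SAVINGS, the value is split
			// into two fragments of valuePerFragment = 3000 bytes, each fitting in a primary page.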
			if (fragments == 1) {
				insertFragment(kv, 0, seekResult);
				return;
			}

			// First index is ceiling(value_size / KV_FRAGMENT_INDEX_SIZE_HINT_FACTOR)
			uint32_t nextIndex =
			    (kv.value.size() + KV_FRAGMENT_INDEX_SIZE_HINT_FACTOR - 1) / KV_FRAGMENT_INDEX_SIZE_HINT_FACTOR;
			// Last index is ceiling(value_size / (KV_FRAGMENT_INDEX_SIZE_HINT_FACTOR / 2))
			uint32_t finalIndex = (kv.value.size() + (KV_FRAGMENT_INDEX_SIZE_HINT_FACTOR / 2) - 1) /
			                      (KV_FRAGMENT_INDEX_SIZE_HINT_FACTOR / 2);
			int bytesLeft = kv.value.size();
			int readPos = 0;
			while (bytesLeft > 0) {
				--fragments; // remaining ideal fragment count
				int fragSize = (fragments == 0) ? bytesLeft : std::min<int>(bytesLeft, valuePerFragment);
				// The last fragment must have an index of finalIndex or higher.
				if (fragSize == bytesLeft && nextIndex < finalIndex) nextIndex = finalIndex;

				//printf("insert ks %d vs %d fragment %d, %d bytes\n", kv.key.size(), kv.value.size(), nextIndex, fragSize);
				insertFragment(KeyValueRef(kv.key, kv.value.substr(readPos, fragSize)), nextIndex, seekResult);
				// seekResult can only be used for the first insertion.
				if (seekResult != 0) seekResult = 0;
				readPos += fragSize;
				bytesLeft -= fragSize;
				++nextIndex;
			}
		} else {
			int r = moveTo(kv.key);
			if (!r) remove();
			Value v = encode(kv);
			db.checkError("BtreeInsert", sqlite3BtreeInsert(cursor, v.begin(), v.size(), NULL, 0, 0, 0, r));
		}
	}
	void clearOne(KeyRangeRef keys) {
		ASSERT(!db.fragment_values);
		int r = moveTo(keys.begin);
		if (r < 0) moveNext();
		ASSERT(valid && decodeKV(getEncodedRow()).key < keys.end);
		remove();
	}
	void clear(KeyRangeRef keys) {
		// TODO: This is really slow!
		while (true) {
			int r = moveTo(keys.begin);
			if (r < 0) moveNext();
			if (!valid ||
			    (db.fragment_values ? decodeKVFragment(getEncodedRow()).get().key : decodeKV(getEncodedRow()).key) >=
			        keys.end)
				break;
			remove();
		}
	}
	void fastClear(KeyRangeRef keys, bool& freeTableEmpty) {
		vector<int> clearBuffer(SERVER_KNOBS->CLEAR_BUFFER_SIZE);
		clearBuffer[0] = 0;

		while (true) {
			if (moveTo(keys.begin) < 0) moveNext();
			RawCursor endCursor(db, db.table, false);
			if (endCursor.moveTo(keys.end) >= 0) endCursor.movePrevious();

			if (!valid || !endCursor ||
			    (db.fragment_values
			         ? (decodeKVFragment(getEncodedRow()).get().key >= decodeKVFragment(endCursor.getEncodedRow()).get().key)
			         : (decodeKV(getEncodedRow()).key > decodeKV(endCursor.getEncodedRow()).key)))
				break; // If empty stop!

			int rc = sqlite3BtreeDeleteRange(cursor, endCursor.cursor, &clearBuffer[0], &clearBuffer[0] + clearBuffer.size());
			if (rc == 201) continue;
			if (!rc) break;
			db.checkError("BtreeDeleteRange", rc);
		}

		if (clearBuffer[0]) {
			//printf("fastClear(%s,%s): %d pages freed\n", printable(keys.begin).c_str(), printable(keys.end).c_str(), clearBuffer[0]);
			IntKeyCursor fc(db, db.freetable, true);
			int pagesDeleted = 0;
			db.checkError("BtreeLazyDelete", sqlite3BtreeLazyDelete(fc.cursor, &clearBuffer[0],
			                                                        &clearBuffer[0] + clearBuffer.size(), 0, &pagesDeleted));
			ASSERT(pagesDeleted == 0);
			freeTableEmpty = false;
		}
	}

	int lazyDelete(int desiredPages) {
		vector<int> clearBuffer(SERVER_KNOBS->CLEAR_BUFFER_SIZE);
		clearBuffer[0] = 0;

		IntKeyCursor fc(db, db.freetable, true);
		int pagesDeleted = 0;
		db.checkError("BtreeLazyDelete", sqlite3BtreeLazyDelete(fc.cursor, &clearBuffer[0],
		                                                        &clearBuffer[0] + clearBuffer.size(), desiredPages, &pagesDeleted));
		return pagesDeleted;
	}
	// Reads and reconstitutes kv fragments given a cursor, an arena to allocate in, and a direction to move the
	// cursor.
	// getNext() returns the next KV pair, if there is one.
	// peek() returns the next key that would be read by getNext(), if there is one.
	// Both methods return Optionals.
	// Once either method returns a non-present value, using the DefragmentingReader again is undefined behavior.
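	// Typical use, as a sketch: position the RawCursor at or just after the first fragment of interest,
	// construct DefragmentingReader(cur, arena, true), then loop on peek()/getNext() until the returned
	// key leaves the range -- which is exactly what getRange() below does.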
	struct DefragmentingReader {
		// Use this constructor for forward/backward range reads
		DefragmentingReader(RawCursor& cur, Arena& m, bool forward)
		  : cur(cur), arena(m), forward(forward), fragmentReadLimit(-1) {
			parse();
		}

		// Use this constructor to read a SINGLE partial value from the current cursor position for an expected key.
		// This exists to support IKeyValueStore::getPrefix().
		// The reader will return exactly one KV pair if its key matches expectedKey, otherwise no KV pairs.
		DefragmentingReader(RawCursor& cur, Arena& m, KeyRef expectedKey, int maxValueLen)
		  : cur(cur), arena(m), forward(true), maxValueLen(maxValueLen) {
			fragmentReadLimit = getEncodedKVFragmentSize(expectedKey.size(), maxValueLen);
			parse();
			// If a key was found but it wasn't the expected key then
			// clear the current kv pair and invalidate the cursor.
			if (kv.present() && kv.get().key != expectedKey) {
				kv = Optional<KeyValueRef>();
				cur.valid = false;
			}
		}

	private:
		Optional<KeyValueRef> kv; // key and latest value fragment read
		uint32_t index; // index of latest value fragment read
		RawCursor& cur; // Cursor to read from
		Arena& arena; // Arena to allocate key and value bytes in
		bool forward; // true for forward iteration, false for reverse
		int maxValueLen; // truncated value length to return
		int fragmentReadLimit; // If >= 0, only read and *attempt* to decode this many fragment bytes

		// Update kv with whatever is at the current cursor position if the position is valid.
		void parse() {
			if (cur.valid) {
				// The read is either not partial, or it is and the fragment read limit is at least 4 (the size of
				// a minimal header).
				bool partial = fragmentReadLimit >= 0;
				ASSERT(!partial || fragmentReadLimit >= 4);
				// Read all or part of the fragment
				ValueRef encoded = partial ? cur.getEncodedRowPrefix(arena, fragmentReadLimit) : cur.getEncodedRow(arena);
				kv = decodeKVFragment(encoded, &index, partial);
				// If this was a partial fragment then if successful update the next fragment read size, and if not
				// then invalidate the cursor.
				if (partial) {
					if (kv.present())
						fragmentReadLimit -= kv.get().value.size();
					else
						cur.valid = false;
				}
			} else
				kv = Optional<KeyValueRef>();
		}

		// Advance the cursor, parse, and return the key if valid
		Optional<KeyRef> advance() {
			if (cur.valid) {
				forward ? cur.moveNext() : cur.movePrevious();
				parse();
			}
			return kv.present() ? kv.get().key : Optional<KeyRef>();
		}
	public:
		// Get the next key that would be returned by getNext(), if there is one.
		// This is more efficient than getNext() if the caller is not sure whether it wants the next KV pair.
		Optional<KeyRef> peek() {
			if (kv.present()) return kv.get().key;
			return advance();
		}

		Optional<KeyValueRef> getNext() {
			if (!peek().present()) return Optional<KeyValueRef>();

			bool partial = fragmentReadLimit >= 0;

			// Start out with the next KV fragment as the pair to return
			KeyValueRef resultKV = kv.get();

			// If index is 0 then this is an unfragmented key. It is unnecessary to advance the cursor so
			// we won't, but we will clear kv so that the next peek/getNext will have to advance.
			if (index == 0)
				kv = Optional<KeyValueRef>();
			else {
				// First and last indexes in a fragment group are size hints.
				// First index is ceil(total_value_size / 4)
				// Last index is ceil(total_value_size / 2)
				// Set size depending on which of these will be encountered first and allocate a buffer in the arena.
				// Note that if these index hints are wrong (such as if the index scheme changes) then the asserts
				// below will fail. They will have to be changed to expand the buffer as needed.
				int size = forward ? (index * KV_FRAGMENT_INDEX_SIZE_HINT_FACTOR)
				                   : (index * (KV_FRAGMENT_INDEX_SIZE_HINT_FACTOR / 2));
				uint8_t* buf = new (arena) uint8_t[size];
				uint8_t* bufEnd = buf + size;
				// For forward iteration wptr is the place to write to next; for reverse it's where the last write
				// started.
				uint8_t* wptr = forward ? buf : bufEnd;
				int fragments = 0;
				do {
					++fragments;
					const ValueRef& val = kv.get().value;
					if (forward) {
						uint8_t* w = wptr;
						wptr += val.size();
						ASSERT(wptr <= bufEnd);
						memcpy(w, val.begin(), val.size());
						// If this is a partial value get and we have enough bytes we can stop, since we are
						// forward iterating.
						if (partial && wptr - buf >= maxValueLen) {
							resultKV.value = ValueRef(buf, maxValueLen);
							// To make further calls to peek() or getNext() return nothing, reset kv and invalidate
							// the cursor.
							kv = Optional<KeyValueRef>();
							cur.valid = false;
							return resultKV;
						}
					} else {
						wptr -= val.size();
						ASSERT(wptr >= buf);
						memcpy(wptr, val.begin(), val.size());
					}
				} while (advance().present() && kv.get().key == resultKV.key);

				// If there was only 1 fragment, it should have had index 0 and been handled above.
				ASSERT(fragments != 1);
				// Set the final value based on the direction of the buffer fill
				resultKV.value = forward ? ValueRef(buf, wptr - buf) : ValueRef(wptr, bufEnd - wptr);
			}

			// In partial value mode, we could end up here if there was only 1 fragment or if maxValueLen
			// was greater than the total unfragmented value size.
			if (partial) resultKV.value = resultKV.value.substr(0, std::min(resultKV.value.size(), maxValueLen));
			return resultKV;
		}
	};
	Optional<Value> get(KeyRef key) {
		int r = moveTo(key);
		if (db.fragment_values) {
			// Optimization - moveTo seeks to fragment (key, 0), so if it was exactly found then we
			// know we have a single fragment for key and can return it.
			if (r == 0) {
				Value result;
				((ValueRef&)result) = decodeKVFragment(getEncodedRow(result.arena())).get().value;
				return result;
			}
			// Otherwise see if the fragments immediately after (key, 0) are for the key we want.
			if (r < 0) moveNext();
			Arena m;
			DefragmentingReader i(*this, m, true);
			if (i.peek() == key) {
				Optional<KeyValueRef> kv = i.getNext();
				return Value(kv.get().value, m);
			}
		} else if (r == 0) {
			Value result;
			KeyValueRef kv = decodeKV(getEncodedRow(result.arena()));
			((ValueRef&)result) = kv.value;
			return result;
		}
		return Optional<Value>();
	}
	Optional<Value> getPrefix(KeyRef key, int maxLength) {
		if (db.fragment_values) {
			int r = moveTo(key);
			if (r < 0) moveNext();
			Arena m;
			// Use the single-key partial-read constructor, which internally limits each fragment read
			// to getEncodedKVFragmentSize(key.size(), maxLength) bytes.
			DefragmentingReader i(*this, m, key, maxLength);
			if (i.peek() == key) {
				Optional<KeyValueRef> kv = i.getNext();
				return Value(kv.get().value, m);
			}
		} else if (!moveTo(key)) {
			if (maxLength == 0) {
				return Value();
			}
			Value result;
			int maxEncodedSize = getEncodedSize(key.size(), maxLength);
			KeyValueRef kv = decodeKVPrefix(getEncodedRowPrefix(result.arena(), maxEncodedSize), maxLength);
			((ValueRef&)result) = kv.value;
			return result;
		}
		return Optional<Value>();
	}
	Standalone<VectorRef<KeyValueRef>> getRange(KeyRangeRef keys, int rowLimit, int byteLimit) {
		Standalone<VectorRef<KeyValueRef>> result;
		int accumulatedBytes = 0;
		ASSERT(byteLimit > 0);

		if (db.fragment_values) {
			if (rowLimit >= 0) {
				int r = moveTo(keys.begin);
				if (r < 0) moveNext();

				DefragmentingReader i(*this, result.arena(), true);
				Optional<KeyRef> nextKey = i.peek();
				while (nextKey.present() && nextKey.get() < keys.end && rowLimit-- && accumulatedBytes < byteLimit) {
					Optional<KeyValueRef> kv = i.getNext();
					result.push_back(result.arena(), kv.get());
					accumulatedBytes += sizeof(KeyValueRef) + kv.get().expectedSize();
					nextKey = i.peek();
				}
			} else {
				int r = moveTo(keys.end);
				if (r >= 0) movePrevious();

				DefragmentingReader i(*this, result.arena(), false);
				Optional<KeyRef> nextKey = i.peek();
				while (nextKey.present() && nextKey.get() >= keys.begin && rowLimit++ && accumulatedBytes < byteLimit) {
					Optional<KeyValueRef> kv = i.getNext();
					result.push_back(result.arena(), kv.get());
					accumulatedBytes += sizeof(KeyValueRef) + kv.get().expectedSize();
					nextKey = i.peek();
				}
			}
		} else {
			if (rowLimit >= 0) {
				int r = moveTo(keys.begin);
				if (r < 0) moveNext();
				while (this->valid && rowLimit-- && accumulatedBytes < byteLimit) {
					KeyValueRef kv = decodeKV(getEncodedRow(result.arena()));
					accumulatedBytes += sizeof(KeyValueRef) + kv.expectedSize();
					if (kv.key >= keys.end) break;
					result.push_back(result.arena(), kv);
					moveNext();
				}
			} else {
				int r = moveTo(keys.end);
				if (r >= 0) movePrevious();
				while (this->valid && rowLimit++ && accumulatedBytes < byteLimit) {
					KeyValueRef kv = decodeKV(getEncodedRow(result.arena()));
					accumulatedBytes += sizeof(KeyValueRef) + kv.expectedSize();
					if (kv.key < keys.begin) break;
					result.push_back(result.arena(), kv);
					movePrevious();
				}
			}
		}
		return result;
	}
	int moveTo(KeyRef key, bool ignore_fragment_mode = false) {
		UnpackedRecord r;
		r.pKeyInfo = &keyInfo;
		r.flags = UNPACKED_PREFIX_MATCH; // This record [key] can be considered equal to a record [key,value] for
		                                 // any value
		Mem tupleValues[2];
		r.aMem = tupleValues;

		// Set field 1 of the tuple to key, which is a blob type with typecode 12 + 2*len
		tupleValues[0].db = keyInfo.db;
		tupleValues[0].enc = keyInfo.enc;
		tupleValues[0].zMalloc = NULL;
		ASSERT(sqlite3VdbeSerialGet(key.begin(), 12 + (2 * key.size()), &tupleValues[0]) == key.size());

		// In fragmenting mode, seek to (k, 0, ); otherwise just (k, ).
		if (ignore_fragment_mode || !db.fragment_values)
			r.nField = 1;
		else {
			// Set field 2 of the tuple to the null type, which is typecode 0
			tupleValues[1].db = keyInfo.db;
			tupleValues[1].enc = keyInfo.enc;
			tupleValues[1].zMalloc = NULL;
			ASSERT(sqlite3VdbeSerialGet(NULL, 0, &tupleValues[1]) == 0);

			r.nField = 2;
		}

		int result;
		db.checkError("BtreeMovetoUnpacked", sqlite3BtreeMovetoUnpacked(cursor, &r, 0, 0, &result));
		valid = result >= 0 || !sqlite3BtreeEof(cursor);
		return result;
	}
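	// Note on moveTo()'s return value (from sqlite3BtreeMovetoUnpacked): 0 means an exact match was
	// found; < 0 means the cursor landed on the entry just before the sought key (hence the common
	// "if (r < 0) moveNext();" pattern above); > 0 means it landed on an entry after the key.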
};

struct Cursor : SQLiteTransaction, RawCursor {
	Cursor(SQLiteDB& db, bool write) : SQLiteTransaction(db, write), RawCursor(db, db.table, write) {}
};

struct ReadCursor : ReferenceCounted<ReadCursor>, FastAllocated<ReadCursor> {
	// Readers need to be reset (forced to move to a new snapshot) when the writer thread does a checkpoint.
	// ReadCursor is reference counted so that the writer can clear the persistent reference (readCursors[n]) and
	// readers can hold an additional reference when they actually have a read happening.
	// ReadCursor lazily constructs its actual Cursor (and hence transaction) because it's vital that readCursors[n]
	// be assigned before the transaction is opened.
	ReadCursor() : valid(false) {}
	void init(SQLiteDB& db) {
		new (&cursor) Cursor(db, false);
		valid = true;
	}
	~ReadCursor() {
		if (valid) get().~Cursor();
	}
	Cursor& get() { return *((Cursor*)&cursor); }

private:
	std::aligned_storage<sizeof(Cursor), __alignof(Cursor)>::type cursor;
	bool valid;
};
extern bool vfsAsyncIsOpen(std::string filename);

// Returns number of pages which failed checksum.
int SQLiteDB::checkAllPageChecksums() {
	ASSERT(!haveMutex);
	ASSERT(page_checksums); // This should never be called on SQLite databases that do not have page checksums.

	double startT = timer();

	// First try to open an existing file
	std::string apath = abspath(filename);
	std::string walpath = apath + "-wal";

	/* REMOVE THIS BEFORE CHECKIN */ if (!fileExists(apath)) return 0;

	TraceEvent("SQLitePageChecksumScanBegin").detail("File", apath);

	ErrorOr<Reference<IAsyncFile>> dbFile = waitForAndGet(
	    errorOr(IAsyncFileSystem::filesystem()->open(apath, IAsyncFile::OPEN_READONLY | IAsyncFile::OPEN_LOCK, 0)));
	ErrorOr<Reference<IAsyncFile>> walFile = waitForAndGet(
	    errorOr(IAsyncFileSystem::filesystem()->open(walpath, IAsyncFile::OPEN_READONLY | IAsyncFile::OPEN_LOCK, 0)));

	if (dbFile.isError()) throw dbFile.getError(); // If we've failed to open the file, throw an exception
	if (walFile.isError()) throw walFile.getError(); // If we've failed to open the file, throw an exception

	// Now that the file itself is open and locked, let sqlite open the database
	// Note that VFSAsync will also call g_network->open (including for the WAL), so its flags are important, too
	// TODO: If better performance is needed, make AsyncFileReadAheadCache work and be enabled by
	// SQLITE_OPEN_READAHEAD, which was added for that purpose.
	int result = sqlite3_open_v2(apath.c_str(), &db, SQLITE_OPEN_READONLY, NULL);
	checkError("open", result);

	// This check has the useful side effect of actually opening/reading the database. If we were not doing this,
	// then we could instead open a read cursor for the same effect, as currently tryReadEveryDbPage() requires it.
	Statement* jm = new Statement(*this, "PRAGMA journal_mode");
	ASSERT(jm->nextRow());
	if (jm->column(0) != LiteralStringRef("wal")) {
		TraceEvent(SevError, "JournalModeError").detail("Filename", filename).detail("Mode", jm->column(0));
		ASSERT(false);
	}
	delete jm;

	btree = db->aDb[0].pBt;
	initPagerCodec();
	sqlite3_extended_result_codes(db, 1);

	sqlite3_mutex_enter(db->mutex);
	haveMutex = true;

	pPagerCodec->silent = true;
	Pgno p = 1;
	int readErrors = 0;
	int corruptPages = 0;
	int totalErrors = 0;

	while (1) {
		int type;
		int zero;
		int rc = tryReadEveryDbPage(db, p, &p, &type, &zero);
		if (rc == SQLITE_OK) break;
		if (rc == SQLITE_CORRUPT) {
			TraceEvent(SevWarnAlways, "SQLitePageChecksumScanCorruptPage")
			    .detail("File", filename)
			    .detail("PageNumber", p)
			    .detail("PageType", type)
			    .detail("PageWasZeroed", zero);
			++corruptPages;
		} else {
			TraceEvent(SevWarnAlways, "SQLitePageChecksumScanReadFailed")
			    .detail("File", filename)
			    .detail("PageNumber", p)
			    .detail("SQLiteError", sqlite3ErrStr(rc))
			    .detail("SQLiteErrorCode", rc);
			++readErrors;
		}
		++p;
		if (++totalErrors >= SERVER_KNOBS->SQLITE_PAGE_SCAN_ERROR_LIMIT) break;
	}
	pPagerCodec->silent = false;

	haveMutex = false;
	sqlite3_mutex_leave(db->mutex);
	sqlite3_close(db);

	TraceEvent("SQLitePageChecksumScanEnd")
	    .detail("Elapsed", DEBUG_DETERMINISM ? 0 : timer() - startT)
	    .detail("Filename", filename)
	    .detail("CorruptPages", corruptPages)
	    .detail("ReadErrors", readErrors)
	    .detail("TotalErrors", totalErrors);

	ASSERT(!vfsAsyncIsOpen(filename));

	return totalErrors;
}
void SQLiteDB::open(bool writable) {
	ASSERT(!haveMutex);
	double startT = timer();
	//TraceEvent("KVThreadInitStage").detail("Stage",1).detail("Filename", filename).detail("Writable", writable);

	// First try to open an existing file
	std::string apath = abspath(filename);
	std::string walpath = apath + "-wal";
	ErrorOr<Reference<IAsyncFile>> dbFile = waitForAndGet(
	    errorOr(IAsyncFileSystem::filesystem()->open(apath, IAsyncFile::OPEN_READWRITE | IAsyncFile::OPEN_LOCK, 0)));
	ErrorOr<Reference<IAsyncFile>> walFile = waitForAndGet(
	    errorOr(IAsyncFileSystem::filesystem()->open(walpath, IAsyncFile::OPEN_READWRITE | IAsyncFile::OPEN_LOCK, 0)));

	//TraceEvent("KVThreadInitStage").detail("Stage",15).detail("Filename", apath).detail("Writable", writable).detail("IsErr", dbFile.isError());

	if (writable) {
		if (dbFile.isError() && dbFile.getError().code() == error_code_file_not_found && !fileExists(apath) && // db file is missing
		    !walFile.isError() && fileExists(walpath)) // ..but WAL file is present
		{
			// Either we died partway through creating this DB, or died partway through deleting it, or someone is
			// monkeying with our files.
			// Create a new blank DB by backing up the WAL file (just in case it is important) and then hitting the
			// next case.
			walFile = file_not_found();
			renameFile(walpath, walpath + "-old-" + deterministicRandom()->randomUniqueID().toString());
			ASSERT_WE_THINK(false); //< This code should not be hit in FoundationDB at the moment, because worker
			                        // looks for databases to open by listing .fdb files, not .fdb-wal files
			//TEST(true); // Replace a partially constructed or destructed DB
		}

		if (dbFile.isError() && walFile.isError() && writable &&
		    dbFile.getError().code() == error_code_file_not_found &&
		    walFile.getError().code() == error_code_file_not_found && !fileExists(apath) && !fileExists(walpath)) {
			// The file doesn't exist, try to create a new one.
			// Creating the WAL before the database ensures we will not try to open a database with no WAL.
			walFile = waitForAndGet(IAsyncFileSystem::filesystem()->open(
			    walpath,
			    IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_CREATE | IAsyncFile::OPEN_READWRITE | IAsyncFile::OPEN_LOCK,
			    0600));
			waitFor(walFile.get()->sync());
			dbFile = waitForAndGet(IAsyncFileSystem::filesystem()->open(
			    apath,
			    IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_CREATE | IAsyncFile::OPEN_READWRITE | IAsyncFile::OPEN_LOCK,
			    0600));
			if (page_checksums)
				waitFor(dbFile.get()->write(template_fdb_with_page_checksums, sizeof(template_fdb_with_page_checksums), 0));
			else
				waitFor(dbFile.get()->write(template_fdb_without_page_checksums, sizeof(template_fdb_without_page_checksums), 0));
			waitFor(dbFile.get()->sync()); // renames filename.part to filename, fsyncs data and directory
			TraceEvent("CreatedDBFile").detail("Filename", apath);
		}
	}

	if (dbFile.isError()) throw dbFile.getError(); // If we've failed to open the file, throw an exception
	if (walFile.isError()) throw walFile.getError(); // If we've failed to open the file, throw an exception

	//TraceEvent("KVThreadInitStage").detail("Stage",2).detail("Filename", filename).detail("Writable", writable);

	// Now that the file itself is open and locked, let sqlite open the database
	// Note that VFSAsync will also call g_network->open (including for the WAL), so its flags are important, too
	int result = sqlite3_open_v2(apath.c_str(), &db, (writable ? SQLITE_OPEN_READWRITE : SQLITE_OPEN_READONLY), NULL);
	checkError("open", result);

	int chunkSize;
	if (!g_network->isSimulated()) {
		chunkSize = 4096 * SERVER_KNOBS->SQLITE_CHUNK_SIZE_PAGES;
	} else if (BUGGIFY) {
		chunkSize = 4096 * deterministicRandom()->randomInt(0, 100);
	} else {
		chunkSize = 4096 * SERVER_KNOBS->SQLITE_CHUNK_SIZE_PAGES_SIM;
	}
	checkError("setChunkSize", sqlite3_file_control(db, nullptr, SQLITE_FCNTL_CHUNK_SIZE, &chunkSize));

	btree = db->aDb[0].pBt;
	initPagerCodec();

	sqlite3_extended_result_codes(db, 1);

	//TraceEvent("KVThreadInitStage").detail("Stage",3).detail("Filename", filename).detail("Writable", writable);

	//Statement(*this, "PRAGMA cache_size = 100").execute();

	Statement jm(*this, "PRAGMA journal_mode");
	ASSERT(jm.nextRow());
	if (jm.column(0) != LiteralStringRef("wal")) {
		TraceEvent(SevError, "JournalModeError").detail("Filename", filename).detail("Mode", jm.column(0));
		ASSERT(false);
	}

	if (writable) {
		Statement(*this, "PRAGMA synchronous = NORMAL").execute(); // OFF, NORMAL, FULL
		Statement(*this, "PRAGMA wal_autocheckpoint = -1").nextRow();
	}

	//TraceEvent("KVThreadInitStage").detail("Stage",4).detail("Filename", filename).detail("Writable", writable);

	sqlite3_mutex_enter(db->mutex);
	haveMutex = true;

	table = 3;
	freetable = 4;
	this->dbFile = dbFile.get();
	this->walFile = walFile.get();

	TraceEvent("KVThreadInitTime")
	    .detail("Elapsed", DEBUG_DETERMINISM ? 0 : timer() - startT)
	    .detail("Filename", filename)
	    .detail("Writable", writable);
	ASSERT(vfsAsyncIsOpen(filename));
}
void SQLiteDB::createFromScratch() {
	int sqliteFlags = SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE;
	checkError("open", sqlite3_open_v2(filename.c_str(), &db, sqliteFlags, NULL));

	Statement(*this, "PRAGMA page_size = 4096").nextRow(); //fast
	btree = db->aDb[0].pBt;
	initPagerCodec();

	Statement(*this, "PRAGMA auto_vacuum = 2").nextRow(); //slow all the time
	Statement(*this, "PRAGMA journal_mode = WAL").nextRow(); // sometimes slow
	sqlite3_extended_result_codes(db, 1);

	sqlite3_mutex_enter(db->mutex);
	haveMutex = true;

	beginTransaction(true);
	u32 pgnoRoot = -1;
	sqlite3BtreeGetMeta(btree, BTREE_LARGEST_ROOT_PAGE, &pgnoRoot);

	// We expect our tables are #3, #4 (since autovacuum is enabled, there is a pointer map page at #2)
	if (pgnoRoot == 4) {
		table = pgnoRoot - 1;
		freetable = pgnoRoot;
		rollback();
	} else if (pgnoRoot == 1) {
		// The database is empty; create tables
		checkError("BtreeCreateTable", sqlite3BtreeCreateTable(btree, &table, BTREE_BLOBKEY));
		ASSERT( table == 3 );
		checkError("BtreeCreateTable2", sqlite3BtreeCreateTable(btree, &freetable, BTREE_INTKEY));
		ASSERT( freetable == table + 1 );
		endTransaction();
	} else {
		TraceEvent("PgnoRoot").detail("Value", pgnoRoot);
		checkError("CheckTables", SQLITE_CORRUPT);
	}
}
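
// A minimal interlocked counter, safe to increment from the IO threads and read from the main thread.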
struct ThreadSafeCounter {
	volatile int64_t counter;
	ThreadSafeCounter() : counter(0) {}
	void operator ++() { interlockedIncrement64(&counter); }
	void operator --() { interlockedDecrement64(&counter); }
	operator const int64_t () const { return counter; }
};
class KeyValueStoreSQLite : public IKeyValueStore {
public:
	virtual void dispose() {
		doClose(this, true);
	}
	virtual void close() {
		doClose(this, false);
	}

	virtual Future<Void> getError() { return delayed( readThreads->getError() || writeThread->getError() ); }
	virtual Future<Void> onClosed() { return stopped.getFuture(); }

	virtual KeyValueStoreType getType() { return type; }
	virtual StorageBytes getStorageBytes();

	virtual void set( KeyValueRef keyValue, const Arena* arena = NULL );
	virtual void clear( KeyRangeRef range, const Arena* arena = NULL );
	virtual Future<Void> commit(bool sequential = false);
	virtual Future<Optional<Value>> readValue( KeyRef key, Optional<UID> debugID );
	virtual Future<Optional<Value>> readValuePrefix( KeyRef key, int maxLength, Optional<UID> debugID );
	virtual Future<Standalone<VectorRef<KeyValueRef>>> readRange( KeyRangeRef keys, int rowLimit = 1<<30, int byteLimit = 1<<30 );

	KeyValueStoreSQLite( std::string const& filename, UID logID, KeyValueStoreType type, bool checkChecksums, bool checkIntegrity );
	~KeyValueStoreSQLite();

	struct SpringCleaningWorkPerformed {
		int lazyDeletePages = 0;
		int vacuumedPages = 0;
	};

	Future<SpringCleaningWorkPerformed> doClean();
	void startReadThreads();

private:
	KeyValueStoreType type;
	UID logID;
	std::string filename;
	Reference<IThreadPool> readThreads, writeThread;
	Promise<Void> stopped;
	Future<Void> cleaning, logging, starting, stopOnErr;

	int64_t readsRequested, writesRequested;
	ThreadSafeCounter readsComplete;
	volatile int64_t writesComplete;
	volatile SpringCleaningStats springCleaningStats;
	volatile int64_t diskBytesUsed;
	volatile int64_t freeListPages;

	vector<Reference<ReadCursor>> readCursors;
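
	// One Reader runs on each read thread. Each owns its own read-only SQLiteDB connection and
	// lazily (re)creates a cached ReadCursor; the Writer clears these cached cursors (via
	// resetReaders()) so that stale read snapshots cannot pin the WAL during checkpoints.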
	struct Reader : IThreadPoolReceiver {
		SQLiteDB conn;
		ThreadSafeCounter& counter;
		UID dbgid;
		Reference<ReadCursor>* ppReadCursor;

		explicit Reader( std::string const& filename, bool is_btree_v2, ThreadSafeCounter& counter, UID dbgid, Reference<ReadCursor>* ppReadCursor )
			: conn(filename, is_btree_v2, is_btree_v2), counter(counter), dbgid(dbgid), ppReadCursor(ppReadCursor)
		{
		}
		~Reader() {
			ppReadCursor->clear();
		}

		virtual void init() {
			conn.open(false);
		}

		Reference<ReadCursor> getCursor() {
			Reference<ReadCursor> cursor = *ppReadCursor;
			if (!cursor) {
				*ppReadCursor = cursor = Reference<ReadCursor>(new ReadCursor);
				cursor->init(conn);
			}
			return cursor;
		}
		struct ReadValueAction : TypedAction<Reader, ReadValueAction>, FastAllocated<ReadValueAction> {
			Key key;
			Optional<UID> debugID;
			ThreadReturnPromise<Optional<Value>> result;
			ReadValueAction(Key key, Optional<UID> debugID) : key(key), debugID(debugID) {};
			virtual double getTimeEstimate() { return SERVER_KNOBS->READ_VALUE_TIME_ESTIMATE; }
		};
		void action(ReadValueAction& rv) {
			//double t = timer();
			if (rv.debugID.present()) g_traceBatch.addEvent("GetValueDebug", rv.debugID.get().first(), "Reader.Before"); //.detail("TaskID", g_network->getCurrentTask());
			rv.result.send( getCursor()->get().get(rv.key) );
			++counter;
			if (rv.debugID.present()) g_traceBatch.addEvent("GetValueDebug", rv.debugID.get().first(), "Reader.After"); //.detail("TaskID", g_network->getCurrentTask());
			//t = timer()-t;
			//if (t >= 1.0) TraceEvent("ReadValueActionSlow",dbgid).detail("Elapsed", t);
		}
		struct ReadValuePrefixAction : TypedAction<Reader, ReadValuePrefixAction>, FastAllocated<ReadValuePrefixAction> {
			Key key;
			int maxLength;
			Optional<UID> debugID;
			ThreadReturnPromise<Optional<Value>> result;
			ReadValuePrefixAction(Key key, int maxLength, Optional<UID> debugID) : key(key), maxLength(maxLength), debugID(debugID) {};
			virtual double getTimeEstimate() { return SERVER_KNOBS->READ_VALUE_TIME_ESTIMATE; }
		};
		void action(ReadValuePrefixAction& rv) {
			//double t = timer();
			if (rv.debugID.present()) g_traceBatch.addEvent("GetValuePrefixDebug", rv.debugID.get().first(), "Reader.Before"); //.detail("TaskID", g_network->getCurrentTask());
			rv.result.send( getCursor()->get().getPrefix(rv.key, rv.maxLength) );
			++counter;
			if (rv.debugID.present()) g_traceBatch.addEvent("GetValuePrefixDebug", rv.debugID.get().first(), "Reader.After"); //.detail("TaskID", g_network->getCurrentTask());
			//t = timer()-t;
			//if (t >= 1.0) TraceEvent("ReadValuePrefixActionSlow",dbgid).detail("Elapsed", t);
		}
		struct ReadRangeAction : TypedAction<Reader, ReadRangeAction>, FastAllocated<ReadRangeAction> {
			KeyRange keys;
			int rowLimit, byteLimit;
			ThreadReturnPromise<Standalone<VectorRef<KeyValueRef>>> result;
			ReadRangeAction(KeyRange keys, int rowLimit, int byteLimit) : keys(keys), rowLimit(rowLimit), byteLimit(byteLimit) {}
			virtual double getTimeEstimate() { return SERVER_KNOBS->READ_RANGE_TIME_ESTIMATE; }
		};
		void action(ReadRangeAction& rr) {
			rr.result.send( getCursor()->get().getRange(rr.keys, rr.rowLimit, rr.byteLimit) );
			++counter;
		}
	};
	struct Writer : IThreadPoolReceiver {
		SQLiteDB conn;
		Cursor* cursor;
		int commits;
		int setsThisCommit;
		bool freeTableEmpty; // true if we are sure the freetable (pages pending lazy deletion) is empty
		volatile int64_t& writesComplete;
		volatile SpringCleaningStats& springCleaningStats;
		volatile int64_t& diskBytesUsed;
		volatile int64_t& freeListPages;
		UID dbgid;
		vector<Reference<ReadCursor>>& readThreads;
		bool checkAllChecksumsOnOpen;
		bool checkIntegrityOnOpen;

		explicit Writer( std::string const& filename, bool isBtreeV2, bool checkAllChecksumsOnOpen, bool checkIntegrityOnOpen, volatile int64_t& writesComplete, volatile SpringCleaningStats& springCleaningStats, volatile int64_t& diskBytesUsed, volatile int64_t& freeListPages, UID dbgid, vector<Reference<ReadCursor>>* pReadThreads )
			: conn( filename, isBtreeV2, isBtreeV2 ),
			  cursor( NULL ),
			  commits(), setsThisCommit(),
			  freeTableEmpty( false ),
			  writesComplete( writesComplete ),
			  springCleaningStats( springCleaningStats ),
			  diskBytesUsed( diskBytesUsed ),
			  freeListPages( freeListPages ),
			  dbgid( dbgid ),
			  readThreads( *pReadThreads ),
			  checkAllChecksumsOnOpen( checkAllChecksumsOnOpen ),
			  checkIntegrityOnOpen( checkIntegrityOnOpen )
		{
		}
		~Writer() {
			TraceEvent("KVWriterDestroying", dbgid);
			delete cursor;
			TraceEvent("KVWriterDestroyed", dbgid);
		}
		virtual void init() {
			if (checkAllChecksumsOnOpen) {
				if (conn.checkAllPageChecksums() != 0) {
					// It's not strictly necessary to discard the file immediately if a page checksum error is found
					// because most of the file could be valid and bad pages will be detected if they are read.
					// However, we shouldn't use the file unless we absolutely have to because some range(s) of keys
					// have effectively lost a replica.
					throw file_corrupt();
				}
			}

			conn.open(true);

			// If a previous run failed during the commit process before finishing a checkpoint, then it is possible
			// that our wal file will be non-empty when we reload it. We execute a checkpoint here to remedy that
			// situation. This call must come before creating a cursor because it will fail if there are any
			// outstanding transactions.
			fullCheckpoint();

			cursor = new Cursor(conn, true);

			if (checkIntegrityOnOpen || EXPENSIVE_VALIDATION) {
				if (conn.check(false) != 0) {
					// A corrupt btree structure must not be used.
					if (g_network->isSimulated() && (g_simulator.getCurrentProcess()->fault_injection_p1 || g_simulator.getCurrentProcess()->machine->machineProcess->fault_injection_p1 || g_simulator.getCurrentProcess()->rebooting)) {
						throw file_corrupt().asInjectedFault();
					} else {
						throw file_corrupt();
					}
				}
			}
		}
		struct InitAction : TypedAction<Writer, InitAction>, FastAllocated<InitAction> {
			ThreadReturnPromise<Void> result;
			virtual double getTimeEstimate() { return 0; }
		};
		void action(InitAction& a) {
			// init() has already been called
			a.result.send(Void());
		}
		struct SetAction : TypedAction<Writer, SetAction>, FastAllocated<SetAction> {
			KeyValue kv;
			SetAction(KeyValue kv) : kv(kv) {}
			virtual double getTimeEstimate() { return SERVER_KNOBS->SET_TIME_ESTIMATE; }
		};
		void action(SetAction& a) {
			double s = now();
			checkFreePages();
			cursor->set(a.kv);
			++setsThisCommit;
			++writesComplete;
			if (g_network->isSimulated() && g_simulator.getCurrentProcess()->rebooting)
				TraceEvent("SetActionFinished", dbgid).detail("Elapsed", now()-s);
		}
		struct ClearAction : TypedAction<Writer, ClearAction>, FastAllocated<ClearAction> {
			KeyRange range;
			ClearAction(KeyRange range) : range(range) {}
			virtual double getTimeEstimate() { return SERVER_KNOBS->CLEAR_TIME_ESTIMATE; }
		};
		void action(ClearAction& a) {
			double s = now();
			cursor->fastClear(a.range, freeTableEmpty);
			cursor->clear(a.range);  // TODO: at most one
			++writesComplete;
			if (g_network->isSimulated() && g_simulator.getCurrentProcess()->rebooting)
				TraceEvent("ClearActionFinished", dbgid).detail("Elapsed", now()-s);
		}
		struct CommitAction : TypedAction<Writer, CommitAction>, FastAllocated<CommitAction> {
			double issuedTime;
			ThreadReturnPromise<Void> result;
			CommitAction() : issuedTime(now()) {}
			virtual double getTimeEstimate() { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; }
		};
		void action(CommitAction& a) {
			double t1 = now();
			cursor->commit();
			delete cursor;
			cursor = NULL;

			double t2 = now();

			fullCheckpoint();

			double t3 = now();

			++commits;
			//if ( !(commits % 100) )
			//	printf("dbf=%lld bytes, wal=%lld bytes\n", getFileSize((kv->filename+".fdb").c_str()), getFileSize((kv->filename+".fdb-wal").c_str()));

			a.result.send(Void());

			cursor = new Cursor(conn, true);
			checkFreePages();
			++writesComplete;
			if (t3 - a.issuedTime > 10.0 * deterministicRandom()->random01())
				TraceEvent("KVCommit10sSample", dbgid).detail("Queued", t1 - a.issuedTime).detail("Commit", t2 - t1).detail("Checkpoint", t3 - t2);

			diskBytesUsed = waitForAndGet( conn.dbFile->size() ) + waitForAndGet( conn.walFile->size() );

			if (g_network->isSimulated() && g_simulator.getCurrentProcess()->rebooting)
				TraceEvent("CommitActionFinished", dbgid).detail("Elapsed", now()-t1);
		}
		// Checkpoints the database and resets the wal file back to the beginning
		void fullCheckpoint() {
			// A checkpoint cannot succeed while there is an outstanding transaction
			ASSERT( cursor == NULL );
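			// Cached read cursors hold open read transactions that would prevent the checkpoint from
			// consuming the whole WAL, so they are dropped before each pass. The first pass copies
			// the WAL contents back into the database; the second restarts the WAL from the beginning.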
			resetReaders();
			conn.checkpoint( false );
			resetReaders();
			conn.checkpoint( true );
		}
		void resetReaders() {
			for(int i = 0; i < readThreads.size(); i++)
				readThreads[i].clear();
		}
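
		// Keeps the freelist stocked: while pages are still pending lazy deletion and the freelist
		// holds fewer than CHECK_FREE_PAGE_AMOUNT pages, move batches of deferred deletions onto
		// the freelist so that writes can reuse pages instead of growing the file.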
		void checkFreePages() {
			int iterations = 0;

			int64_t freeListSize = freeListPages;
			while (!freeTableEmpty && freeListSize < SERVER_KNOBS->CHECK_FREE_PAGE_AMOUNT) {
				int deletedPages = cursor->lazyDelete(SERVER_KNOBS->CHECK_FREE_PAGE_AMOUNT);
				freeTableEmpty = (deletedPages != SERVER_KNOBS->CHECK_FREE_PAGE_AMOUNT);
				springCleaningStats.lazyDeletePages += deletedPages;
				++iterations;

				freeListSize = conn.freePages();
			}

			freeListPages = freeListSize;

			//if (iterations) printf("Lazy free: %d pages on freelist, %d iterations, freeTableEmpty=%d\n", freeListPages, iterations, freeTableEmpty);
		}
		struct SpringCleaningAction : TypedAction<Writer, SpringCleaningAction>, FastAllocated<SpringCleaningAction> {
			ThreadReturnPromise<SpringCleaningWorkPerformed> result;
			virtual double getTimeEstimate() {
				return std::max(SERVER_KNOBS->SPRING_CLEANING_LAZY_DELETE_TIME_ESTIMATE, SERVER_KNOBS->SPRING_CLEANING_VACUUM_TIME_ESTIMATE);
			}
		};
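
		// Interleaves two kinds of background maintenance until its time and page budgets are
		// exhausted: lazy deletion (moving deferred-deleted pages onto the freelist) and incremental
		// vacuum (returning free pages to the filesystem). Each iteration flips a biased coin; the
		// bias is chosen so that, on average, SPRING_CLEANING_VACUUMS_PER_LAZY_DELETE_PAGE vacuum
		// steps are performed per lazily deleted page.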
		void action(SpringCleaningAction& a) {
			double s = now();
			double lazyDeleteEnd = now() + SERVER_KNOBS->SPRING_CLEANING_LAZY_DELETE_TIME_ESTIMATE;
			double vacuumEnd = now() + SERVER_KNOBS->SPRING_CLEANING_VACUUM_TIME_ESTIMATE;

			SpringCleaningWorkPerformed workPerformed;

			double lazyDeleteTime = 0;
			double vacuumTime = 0;

			const double lazyDeleteBatchProbability = 1.0 / (1 + SERVER_KNOBS->SPRING_CLEANING_VACUUMS_PER_LAZY_DELETE_PAGE * std::max(1, SERVER_KNOBS->SPRING_CLEANING_LAZY_DELETE_BATCH_SIZE));

			bool vacuumFinished = false;

			loop {
				double begin = now();

				bool canDelete = !freeTableEmpty
				                 && (now() < lazyDeleteEnd || workPerformed.lazyDeletePages < SERVER_KNOBS->SPRING_CLEANING_MIN_LAZY_DELETE_PAGES)
				                 && workPerformed.lazyDeletePages < SERVER_KNOBS->SPRING_CLEANING_MAX_LAZY_DELETE_PAGES;

				bool canVacuum = !vacuumFinished
				                 && (now() < vacuumEnd || workPerformed.vacuumedPages < SERVER_KNOBS->SPRING_CLEANING_MIN_VACUUM_PAGES)
				                 && workPerformed.vacuumedPages < SERVER_KNOBS->SPRING_CLEANING_MAX_VACUUM_PAGES;

				if (!canDelete && !canVacuum) {
					break;
				}

				if (canDelete && (!canVacuum || deterministicRandom()->random01() < lazyDeleteBatchProbability)) {
					TEST(canVacuum);  // SQLite lazy deletion when vacuuming is active
					TEST(!canVacuum); // SQLite lazy deletion when vacuuming is inactive

					int pagesToDelete = std::max(1, std::min(SERVER_KNOBS->SPRING_CLEANING_LAZY_DELETE_BATCH_SIZE, SERVER_KNOBS->SPRING_CLEANING_MAX_LAZY_DELETE_PAGES - workPerformed.lazyDeletePages));
					int pagesDeleted = cursor->lazyDelete(pagesToDelete);
					freeTableEmpty = (pagesDeleted != pagesToDelete);
					workPerformed.lazyDeletePages += pagesDeleted;
					lazyDeleteTime += now() - begin;
				}
				else {
					ASSERT(canVacuum);
					TEST(canDelete);  // SQLite vacuuming when lazy delete is active
					TEST(!canDelete); // SQLite vacuuming when lazy delete is inactive
					TEST(SERVER_KNOBS->SPRING_CLEANING_VACUUMS_PER_LAZY_DELETE_PAGE != 0); // SQLite vacuuming with nonzero vacuums_per_lazy_delete_page

					vacuumFinished = conn.vacuum();
					if (!vacuumFinished) {
						++workPerformed.vacuumedPages;
					}
					vacuumTime += now() - begin;
				}

				CoroThreadPool::waitFor( yield() );
			}

			freeListPages = conn.freePages();

			TEST(workPerformed.lazyDeletePages > 0); // Pages lazily deleted
			TEST(workPerformed.vacuumedPages > 0); // Pages vacuumed
			TEST(vacuumTime > 0); // Time spent vacuuming
			TEST(lazyDeleteTime > 0); // Time spent lazy deleting

			++springCleaningStats.springCleaningCount;
			springCleaningStats.lazyDeletePages += workPerformed.lazyDeletePages;
			springCleaningStats.vacuumedPages += workPerformed.vacuumedPages;
			springCleaningStats.springCleaningTime += now() - s;
			springCleaningStats.vacuumTime += vacuumTime;
			springCleaningStats.lazyDeleteTime += lazyDeleteTime;

			a.result.send(workPerformed);
			++writesComplete;
			if (g_network->isSimulated() && g_simulator.getCurrentProcess()->rebooting)
				TraceEvent("SpringCleaningActionFinished", dbgid).detail("Elapsed", now()-s);
		}
	};
	ACTOR static Future<Void> logPeriodically( KeyValueStoreSQLite* self ) {
		state int64_t lastReadsComplete = 0;
		state int64_t lastWritesComplete = 0;
		loop {
			wait( delay(SERVER_KNOBS->DISK_METRIC_LOGGING_INTERVAL) );

			int64_t rc = self->readsComplete, wc = self->writesComplete;
			TraceEvent("DiskMetrics", self->logID)
				.detail("ReadOps", rc - lastReadsComplete)
				.detail("WriteOps", wc - lastWritesComplete)
				.detail("ReadQueue", self->readsRequested - rc)
				.detail("WriteQueue", self->writesRequested - wc)
				.detail("GlobalSQLiteMemoryHighWater", (int64_t)sqlite3_memory_highwater(1));

			TraceEvent("SpringCleaningMetrics", self->logID)
				.detail("SpringCleaningCount", self->springCleaningStats.springCleaningCount)
				.detail("LazyDeletePages", self->springCleaningStats.lazyDeletePages)
				.detail("VacuumedPages", self->springCleaningStats.vacuumedPages)
				.detail("SpringCleaningTime", self->springCleaningStats.springCleaningTime)
				.detail("LazyDeleteTime", self->springCleaningStats.lazyDeleteTime)
				.detail("VacuumTime", self->springCleaningStats.vacuumTime);

			lastReadsComplete = self->readsComplete;
			lastWritesComplete = self->writesComplete;
		}
	}
	ACTOR static Future<Void> stopOnError( KeyValueStoreSQLite* self ) {
		try {
			wait( self->readThreads->getError() || self->writeThread->getError() );
		} catch (Error& e) {
			if (e.code() == error_code_actor_cancelled)
				throw;
		}

		self->readThreads->stop();
		self->writeThread->stop();

		return Void();
	}
	ACTOR static void doClose( KeyValueStoreSQLite* self, bool deleteOnClose ) {
		state Error error = success();
		try {
			TraceEvent("KVClose", self->logID).detail("Del", deleteOnClose);
			self->starting.cancel();
			self->cleaning.cancel();
			self->logging.cancel();
			wait( self->readThreads->stop() && self->writeThread->stop() );
			if (deleteOnClose) {
				wait( IAsyncFileSystem::filesystem()->incrementalDeleteFile( self->filename, true ) );
				wait( IAsyncFileSystem::filesystem()->incrementalDeleteFile( self->filename + "-wal", false ) );
			}
		} catch (Error& e) {
			TraceEvent(SevError, "KVDoCloseError", self->logID)
				.error(e, true)
				.detail("Reason", e.code() == error_code_platform_error ? "could not delete database" : "unknown");
			error = e;
		}

		TraceEvent("KVClosed", self->logID);
		if (error.code() != error_code_actor_cancelled) {
			self->stopped.send(Void());
			delete self;
		}
	}
};
IKeyValueStore* keyValueStoreSQLite( std::string const& filename, UID logID, KeyValueStoreType storeType, bool checkChecksums, bool checkIntegrity ) {
	return new KeyValueStoreSQLite(filename, logID, storeType, checkChecksums, checkIntegrity);
}
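
// Runs spring cleaning forever, adapting the delay between passes to how much work the last pass
// actually did: a full lazy-delete batch or any vacuuming schedules the next pass sooner, while an
// idle pass backs off to SPRING_CLEANING_NO_ACTION_INTERVAL.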
ACTOR Future<Void> cleanPeriodically( KeyValueStoreSQLite* self ) {
	wait( delayJittered(SERVER_KNOBS->SPRING_CLEANING_NO_ACTION_INTERVAL) );
	loop {
		KeyValueStoreSQLite::SpringCleaningWorkPerformed workPerformed = wait( self->doClean() );

		double duration = std::numeric_limits<double>::max();
		if (workPerformed.lazyDeletePages >= SERVER_KNOBS->SPRING_CLEANING_LAZY_DELETE_BATCH_SIZE) {
			duration = std::min(duration, SERVER_KNOBS->SPRING_CLEANING_LAZY_DELETE_INTERVAL);
		}
		if (workPerformed.vacuumedPages > 0) {
			duration = std::min(duration, SERVER_KNOBS->SPRING_CLEANING_VACUUM_INTERVAL);
		}
		if (duration == std::numeric_limits<double>::max()) {
			duration = SERVER_KNOBS->SPRING_CLEANING_NO_ACTION_INTERVAL;
		}

		wait( delayJittered(duration) );
	}
}
ACTOR static Future<Void> startReadThreadsWhen( KeyValueStoreSQLite* kv, Future<Void> onReady, UID id ) {
	wait(onReady);
	kv->startReadThreads();
	return Void();
}
sqlite3_vfs *vfsAsync();
static int vfs_registered = 0;

KeyValueStoreSQLite::KeyValueStoreSQLite( std::string const& filename, UID id, KeyValueStoreType storeType, bool checkChecksums, bool checkIntegrity )
	: type(storeType),
	  logID(id),
	  filename(filename),
	  readThreads(CoroThreadPool::createThreadPool()),
	  writeThread(CoroThreadPool::createThreadPool()),
	  readsRequested(0), writesRequested(0), writesComplete(0), diskBytesUsed(0), freeListPages(0)
{
	stopOnErr = stopOnError(this);

#if SQLITE_THREADSAFE == 0
	ASSERT( writeThread->isCoro() );
#endif

	if (!vfs_registered && writeThread->isCoro())
		if (sqlite3_vfs_register(vfsAsync(), true) != SQLITE_OK)
			ASSERT(false);

	// The DB file should not already be open
	ASSERT( !vfsAsyncIsOpen(filename) );

	readCursors.resize(64); //< number of read threads

	sqlite3_soft_heap_limit64( SERVER_KNOBS->SOFT_HEAP_LIMIT ); // SOMEDAY: Is this a performance issue? Should we drop the cache sizes for individual threads?

	TaskPriority taskId = g_network->getCurrentTask();
	g_network->setCurrentTask(TaskPriority::DiskWrite);
	writeThread->addThread( new Writer(filename, type == KeyValueStoreType::SSD_BTREE_V2, checkChecksums, checkIntegrity, writesComplete, springCleaningStats, diskBytesUsed, freeListPages, id, &readCursors) );
	g_network->setCurrentTask(taskId);

	auto p = new Writer::InitAction();
	auto f = p->result.getFuture();
	writeThread->post(p);
	starting = startReadThreadsWhen(this, f, logID);
	cleaning = cleanPeriodically(this);
	logging = logPeriodically(this);
}
KeyValueStoreSQLite::~KeyValueStoreSQLite() {
	//printf("dbf=%lld bytes, wal=%lld bytes\n", getFileSize((filename+".fdb").c_str()), getFileSize((filename+".fdb-wal").c_str()));
}
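
// The available space reported to callers includes pages on the SQLite freelist, since those can
// be reused for new data without growing the file.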
StorageBytes KeyValueStoreSQLite::getStorageBytes() {
	int64_t free;
	int64_t total;

	g_network->getDiskBytes(parentDirectory(filename), free, total);

	return StorageBytes(free, total, diskBytesUsed, free + _PAGE_SIZE * freeListPages);
}
void KeyValueStoreSQLite::startReadThreads() {
	int nReadThreads = readCursors.size();
	TaskPriority taskId = g_network->getCurrentTask();
	g_network->setCurrentTask(TaskPriority::DiskRead);
	for(int i = 0; i < nReadThreads; i++)
		readThreads->addThread( new Reader(filename, type == KeyValueStoreType::SSD_BTREE_V2, readsComplete, logID, &readCursors[i]) );
	g_network->setCurrentTask(taskId);
}
void KeyValueStoreSQLite::set( KeyValueRef keyValue, const Arena* arena ) {
	++writesRequested;
	writeThread->post( new Writer::SetAction(keyValue) );
}
void KeyValueStoreSQLite::clear( KeyRangeRef range, const Arena* arena ) {
	++writesRequested;
	writeThread->post( new Writer::ClearAction(range) );
}
Future<Void> KeyValueStoreSQLite::commit(bool sequential) {
	++writesRequested;
	auto p = new Writer::CommitAction;
	auto f = p->result.getFuture();
	writeThread->post(p);
	return f;
}
Future<Optional<Value>> KeyValueStoreSQLite::readValue( KeyRef key, Optional<UID> debugID ) {
	++readsRequested;
	auto p = new Reader::ReadValueAction(key, debugID);
	auto f = p->result.getFuture();
	readThreads->post(p);
	return f;
}
Future<Optional<Value>> KeyValueStoreSQLite::readValuePrefix( KeyRef key, int maxLength, Optional<UID> debugID ) {
	++readsRequested;
	auto p = new Reader::ReadValuePrefixAction(key, maxLength, debugID);
	auto f = p->result.getFuture();
	readThreads->post(p);
	return f;
}
Future<Standalone<VectorRef<KeyValueRef>>> KeyValueStoreSQLite::readRange( KeyRangeRef keys, int rowLimit, int byteLimit ) {
	++readsRequested;
	auto p = new Reader::ReadRangeAction(keys, rowLimit, byteLimit);
	auto f = p->result.getFuture();
	readThreads->post(p);
	return f;
}
Future<KeyValueStoreSQLite::SpringCleaningWorkPerformed> KeyValueStoreSQLite::doClean() {
	++writesRequested;
	auto p = new Writer::SpringCleaningAction;
	auto f = p->result.getFuture();
	writeThread->post(p);
	return f;
}
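
// Builds fresh template database files for both btree formats. Must run before the async VFS is
// registered (hence the ASSERT below), so that sqlite's default blocking file IO is used.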
void createTemplateDatabase() {
	ASSERT( !vfs_registered );
	SQLiteDB db1("template.fdb", false, false);
	SQLiteDB db2("template.sqlite", true, true);
	db1.createFromScratch();
	db2.createFromScratch();
}

void GenerateIOLogChecksumFile( std::string filename ) {
	if (!fileExists(filename)) {
		throw file_not_found();
	}

	FILE* f = fopen(filename.c_str(), "r");
	FILE* fout = fopen((filename + ".checksums").c_str(), "w");
	uint8_t buf[4096];
	unsigned int c = 0;
	while (fread(buf, 1, 4096, f) > 0)
		fprintf(fout, "%u %u\n", c++, hashlittle(buf, 4096, 0xab12fd93));
	fclose(f);
	fclose(fout);
}
// If integrity is true, a full btree integrity check is done.
// If integrity is false, only a scan of all pages to validate their checksums is done.
ACTOR Future<Void> KVFileCheck(std::string filename, bool integrity) {
	if (!fileExists(filename))
		throw file_not_found();

	StringRef kvFile(filename);
	KeyValueStoreType type = KeyValueStoreType::END;
	if (kvFile.endsWith(LiteralStringRef(".fdb")))
		type = KeyValueStoreType::SSD_BTREE_V1;
	else if (kvFile.endsWith(LiteralStringRef(".sqlite")))
		type = KeyValueStoreType::SSD_BTREE_V2;

	ASSERT(type != KeyValueStoreType::END);

	state IKeyValueStore* store = keyValueStoreSQLite(filename, UID(0, 0), type, !integrity, integrity);
	ASSERT(store != nullptr);

	// Wait for the integrity check to finish
	wait(success(store->readValue(StringRef())));

	if (store->getError().isError())
		wait(store->getError());

	Future<Void> c = store->onClosed();
	store->close();
	wait(c);

	return Void();
}