2017-05-26 04:48:44 +08:00
/*
* DiskQueue . actor . cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013 - 2018 Apple Inc . and the FoundationDB project authors
2018-02-22 02:25:11 +08:00
*
2017-05-26 04:48:44 +08:00
* Licensed under the Apache License , Version 2.0 ( the " License " ) ;
* you may not use this file except in compliance with the License .
* You may obtain a copy of the License at
2018-02-22 02:25:11 +08:00
*
2017-05-26 04:48:44 +08:00
* http : //www.apache.org/licenses/LICENSE-2.0
2018-02-22 02:25:11 +08:00
*
2017-05-26 04:48:44 +08:00
* Unless required by applicable law or agreed to in writing , software
* distributed under the License is distributed on an " AS IS " BASIS ,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
* See the License for the specific language governing permissions and
* limitations under the License .
*/
2018-10-20 01:30:13 +08:00
# include "fdbserver/IDiskQueue.h"
2017-05-26 04:48:44 +08:00
# include "fdbrpc/IAsyncFile.h"
2018-10-20 01:30:13 +08:00
# include "fdbserver/Knobs.h"
2017-05-26 04:48:44 +08:00
# include "fdbrpc/simulator.h"
2019-03-16 12:01:16 +08:00
# include "fdbrpc/crc32c.h"
2019-02-08 09:02:20 +08:00
# include "flow/genericactors.actor.h"
2018-08-11 06:18:24 +08:00
# include "flow/actorcompiler.h" // This must be the last #include.
2017-05-26 04:48:44 +08:00
// Predicate over two raw page pointers. readFirstAndLastPages() calls it both to decide which
// file comes first and to decide whether a page is "valid" relative to a file's first page.
typedef bool ( * compare_pages ) ( void * , void * ) ;
// 64-bit signed location value.
// NOTE(review): presumably a byte offset within the logical queue; it is not used in this part of the file -- confirm.
typedef int64_t loc_t ;
2019-03-04 09:12:52 +08:00
// Rounds loc up to the next multiple of _PAGE_SIZE (values already on a page boundary are unchanged).
// Examples with 4k pages:
//   0  -> 0
//   1  -> 4k
//   4k -> 4k
int64_t pageCeiling(int64_t loc) {
	const int64_t pagesCovering = (loc + _PAGE_SIZE - 1) / _PAGE_SIZE;
	return pagesCovering * _PAGE_SIZE;
}
// Rounds loc down to the previous multiple of _PAGE_SIZE (values already on a page boundary are unchanged).
// Examples with 4k pages:
//   0  -> 0
//   1  -> 0
//   4k -> 4k
int64_t pageFloor(int64_t loc) {
	return loc - loc % _PAGE_SIZE;
}
2017-05-26 04:48:44 +08:00
// Growable byte buffer whose storage lives in the arena of `str`.
// `reserved` is the number of bytes set aside for `str`; append() asserts that the contents never exceed it.
struct StringBuffer {
	Standalone<StringRef> str;
	int reserved;
	UID id; // reported in the StringBufferHugeAllocation trace event

	StringBuffer(UID fromFileID) : reserved(0), id(fromFileID) {}

	// Number of bytes currently held.
	int size() const { return str.size(); }

	// Mutable view of the contents; assigning to it re-points the buffer without touching the arena.
	StringRef& ref() { return str; }

	// Drops the contents and the reservation.
	void clear() {
		str = Standalone<StringRef>();
		reserved = 0;
	}

	// Drops the contents and reserves `size` bytes in a fresh arena. The buffer is empty afterwards.
	void clearReserve(int size) {
		str = Standalone<StringRef>();
		reserved = size;
		ref() = StringRef(new (str.arena()) uint8_t[size], 0);
	}

	// Appends a copy of x. Asserts (via append(int)) that the reservation is large enough.
	void append(StringRef x) {
		void* dest = append(x.size());
		// memcpy() requires valid pointers even for a zero-length copy, and an empty StringRef
		// (or an unreserved buffer) may hold a null pointer, so skip the call when there is nothing to copy.
		if (x.size() > 0) {
			memcpy(dest, x.begin(), x.size());
		}
	}

	// Grows the contents by `bytes` and returns a pointer to the start of the newly added region.
	// The new bytes are not initialized. Asserts that the reservation is large enough.
	void* append(int bytes) {
		ASSERT(str.size() + bytes <= reserved);
		void* p = const_cast<uint8_t*>(str.end());
		ref() = StringRef(str.begin(), str.size() + bytes);
		return p;
	}

	// Removes the first `bytes` bytes from the buffer and returns a reference to them.
	// The returned bytes still live in this buffer's arena.
	StringRef pop_front(int bytes) {
		ASSERT(bytes <= str.size());
		StringRef result = str.substr(0, bytes);
		ref() = str.substr(bytes);
		return result;
	}

	// If `size` is at least the current reservation, grows the reservation to max(size, 2 * reserved),
	// allocates a new block from the arena whose start is a multiple of `alignment` (a power of two),
	// and moves the current contents into it. Does nothing when size < reserved.
	void alignReserve(int alignment, int size) {
		ASSERT(alignment && (alignment & (alignment - 1)) == 0); // alignment is a power of two
		if (size >= reserved) {
			// SOMEDAY: Use a new arena and discard the old one after copying?
			reserved = std::max(size, reserved * 2);
			if (reserved > 1e9) {
				printf(" WOAH! Huge allocation \n ");
				TraceEvent(SevError, " StringBufferHugeAllocation ", id).detail(" Alignment ", alignment).detail(" Reserved ", reserved).backtrace();
			}
			// Over-allocate by alignment-1 bytes so an aligned start always fits inside the block.
			uint8_t* b = new (str.arena()) uint8_t[reserved + alignment - 1];
			uint8_t* e = b + (reserved + alignment - 1);
			uint8_t* p = (uint8_t*)(int64_t(b + alignment - 1) & ~(alignment - 1)); // first multiple of alignment greater than or equal to b
			ASSERT(p >= b && p + reserved <= e && int64_t(p) % alignment == 0);
			// An empty buffer may have a null begin(); passing that to memcpy() is undefined even with length 0.
			if (str.size() > 0) {
				memcpy(p, str.begin(), str.size());
			}
			ref() = StringRef(p, str.size());
		}
	}
};
// Queues sync() calls against one file. Each newly started sync first waits for the oldest
// entry in `outstanding` to finish, and callers arriving while the queue is longer than
// `outstandingLimit` share the most recently queued sync instead of starting another.
struct SyncQueue : ReferenceCounted<SyncQueue> {
	SyncQueue(int outstandingLimit, Reference<IAsyncFile> file) : outstandingLimit(outstandingLimit), file(file) {
		// Seed the queue with already-completed entries so the first syncs have something to wait on.
		int remaining = outstandingLimit;
		while (remaining-- > 0) {
			outstanding.push_back(Void());
		}
	}

	// Future is set when all writes completed before the call to onSync are complete
	Future<Void> onSync() {
		const bool hasRoom = outstanding.size() <= outstandingLimit;
		if (hasRoom) {
			outstanding.push_back(waitAndSync(this));
		}
		return outstanding.back();
	}

private:
	int outstandingLimit;
	Deque<Future<Void>> outstanding;
	Reference<IAsyncFile> file;

	// Waits for the oldest queued entry, retires it, then syncs the file.
	ACTOR static Future<Void> waitAndSync(SyncQueue* self) {
		wait(self->outstanding.front());
		self->outstanding.pop_front();
		wait(self->file->sync());
		return Void();
	}
};
2019-02-08 09:02:24 +08:00
// We use a Tracked instead of a Reference when the shutdown/destructor code would need to wait().
2019-02-08 09:02:18 +08:00
// Base class that counts live TrackMe guards and exposes a future that becomes ready
// once that count is zero. T is the derived class; TrackMe reaches the counters through a T*.
template <typename T>
class Tracked {
protected:
	// Guard object: counts itself in on construction and out on destruction,
	// flipping actorCountIsZero when the count leaves or returns to zero.
	struct TrackMe : NonCopyable {
		T* self;

		explicit TrackMe(T* self) : self(self) {
			if (++self->actorCount == 1) {
				self->actorCountIsZero.set(false);
			}
		}

		~TrackMe() {
			if (--self->actorCount == 0) {
				self->actorCountIsZero.set(true);
			}
		}
	};

	// Ready immediately when no guard is alive; otherwise fires on the next change of actorCountIsZero.
	Future<Void> onSafeToDestruct() {
		if (!actorCountIsZero.get()) {
			return actorCountIsZero.onChange();
		}
		return Void();
	}

private:
	int actorCount = 0;
	AsyncVar<bool> actorCountIsZero = true;
};
class RawDiskQueue_TwoFiles : public Tracked < RawDiskQueue_TwoFiles > {
2017-05-26 04:48:44 +08:00
public :
2018-02-12 17:30:02 +08:00
RawDiskQueue_TwoFiles ( std : : string basename , std : : string fileExtension , UID dbgid , int64_t fileSizeWarningLimit )
2017-09-22 14:51:55 +08:00
: basename ( basename ) , fileExtension ( fileExtension ) , onError ( delayed ( error . getFuture ( ) ) ) , onStopped ( stopped . getFuture ( ) ) ,
2017-05-26 04:48:44 +08:00
readingFile ( - 1 ) , readingPage ( - 1 ) , writingPos ( - 1 ) , dbgid ( dbgid ) ,
2019-03-04 04:57:44 +08:00
dbg_file0BeginSeq ( 0 ) , fileExtensionBytes ( SERVER_KNOBS - > DISK_QUEUE_FILE_EXTENSION_BYTES ) ,
fileShrinkBytes ( SERVER_KNOBS - > DISK_QUEUE_FILE_SHRINK_BYTES ) , readingBuffer ( dbgid ) ,
2018-01-03 05:30:27 +08:00
readyToPush ( Void ( ) ) , fileSizeWarningLimit ( fileSizeWarningLimit ) , lastCommit ( Void ( ) ) , isFirstCommit ( true )
2017-05-26 04:48:44 +08:00
{
2019-03-04 04:57:44 +08:00
if ( BUGGIFY )
2019-05-11 05:01:52 +08:00
fileExtensionBytes = 1 < < 10 * deterministicRandom ( ) - > randomSkewedUInt32 ( 1 , 40 < < 10 ) ;
2019-03-04 04:57:44 +08:00
if ( BUGGIFY )
2019-05-11 05:01:52 +08:00
fileShrinkBytes = _PAGE_SIZE * deterministicRandom ( ) - > randomSkewedUInt32 ( 1 , 10 < < 10 ) ;
2017-05-26 04:48:44 +08:00
files [ 0 ] . dbgFilename = filename ( 0 ) ;
files [ 1 ] . dbgFilename = filename ( 1 ) ;
2019-02-08 09:19:56 +08:00
// We issue reads into firstPages, so it needs to be 4k aligned.
firstPages . reserve ( firstPages . arena ( ) , 2 ) ;
void * pageMemory = operator new ( sizeof ( Page ) * 3 , firstPages . arena ( ) ) ;
2019-03-16 12:01:14 +08:00
// firstPages is assumed to always be a valid page, and our initialization here is the only
// time that it would not contain a valid page. Whenever DiskQueue reaches in to look at
// these bytes, it only cares about `seq`, and having that be all 0xFF's means uninitialized
// pages will look like the ultimate end of the disk queue, rather than the beginning of it.
// This makes code fail in more immediate and obvious ways.
2019-02-08 09:19:56 +08:00
firstPages [ 0 ] = ( Page * ) ( ( ( ( uintptr_t ) pageMemory + 4095 ) / 4096 ) * 4096 ) ;
2019-03-16 12:01:14 +08:00
memset ( firstPages [ 0 ] , 0xFF , sizeof ( Page ) ) ;
2019-02-08 09:19:56 +08:00
firstPages [ 1 ] = ( Page * ) ( ( uintptr_t ) firstPages [ 0 ] + 4096 ) ;
2019-03-16 12:01:14 +08:00
memset ( firstPages [ 1 ] , 0xFF , sizeof ( Page ) ) ;
2017-05-26 04:48:44 +08:00
stallCount . init ( LiteralStringRef ( " RawDiskQueue.StallCount " ) ) ;
}
// Member entry point: forwards to the static pushAndCommit() actor with this queue as `self`.
// That actor deletes pageMem on both its success and error paths, so the caller gives up ownership of it.
Future < Void > pushAndCommit ( StringRef pageData , StringBuffer * pageMem , uint64_t poppedPages ) {
return pushAndCommit ( this , pageData , pageMem , poppedPages ) ;
}
// Counts a stall and makes the next push wait until the most recently started commit has finished
// (pushAndCommit() waits on readyToPush before pushing).
void stall ( ) {
stallCount + + ;
readyToPush = lastCommit ;
}
// Member entry point: forwards to the static readFirstAndLastPages() actor with this queue as `self`.
Future < Standalone < StringRef > > readFirstAndLastPages ( compare_pages compare ) { return readFirstAndLastPages ( this , compare ) ; }
void setStartPage ( int file , int64_t page ) {
2018-06-09 02:11:08 +08:00
TraceEvent ( " RDQSetStart " , dbgid ) . detail ( " FileNum " , file ) . detail ( " PageNum " , page ) . detail ( " File0Name " , files [ 0 ] . dbgFilename ) ;
2017-05-26 04:48:44 +08:00
readingFile = file ;
readingPage = page ;
}
// The members below are thin entry points: each forwards to the static actor of the same name
// (passing this queue as `self`) or returns a stored future.

// Forwards to the static setPoppedPage() actor.
Future < Void > setPoppedPage ( int file , int64_t page , int64_t debugSeq ) { return setPoppedPage ( this , file , page , debugSeq ) ; }
2019-02-08 09:02:15 +08:00
// FIXME: let the caller pass in where to write the data.
// Forwards to the static read() actor, which reads nPages pages of files[file] starting at page `page`.
Future < Standalone < StringRef > > read ( int file , int page , int nPages ) { return read ( this , file , page , nPages ) ; }
2017-05-26 04:48:44 +08:00
// Forwards to the static readNextPage() actor.
Future < Standalone < StringRef > > readNextPage ( ) { return readNextPage ( this ) ; }
// Forwards to the static truncateBeforeLastReadPage() actor.
Future < Void > truncateBeforeLastReadPage ( ) { return truncateBeforeLastReadPage ( this ) ; }
// Returns onError, which the constructor builds from the `error` promise.
Future < Void > getError ( ) { return onError ; }
// Returns onStopped, the future of the `stopped` promise that shutdown() fulfils.
Future < Void > onClosed ( ) { return onStopped ; }
// Starts shutdown() with deleteFiles = true; shutdown() deletes this object when it finishes.
void dispose ( ) { shutdown ( this , true ) ; }
// Starts shutdown() with deleteFiles = false; shutdown() deletes this object when it finishes.
void close ( ) { shutdown ( this , false ) ; }
// Reports disk usage: free and total bytes of the volume holding `basename`,
// the combined logical size of both queue files as "used", and free bytes again as "available".
StorageBytes getStorageBytes() {
	int64_t freeBytes;
	int64_t totalBytes;
	g_network->getDiskBytes(parentDirectory(basename), freeBytes, totalBytes);

	const int64_t usedBytes = files[0].size + files[1].size;
	// TODO: we could potentially do better in the available field by accounting for the unused pages at the end of the file
	return StorageBytes(freeBytes, totalBytes, usedBytes, freeBytes);
}
//private:
// One raw disk page: exactly _PAGE_SIZE bytes. sizeof(Page) is the unit for page counts and offsets in this class.
struct Page { uint8_t data [ _PAGE_SIZE ] ; } ;
struct File {
Reference < IAsyncFile > f ;
int64_t size ; // always a multiple of _PAGE_SIZE, even if the physical file isn't for some reason
int64_t popped ;
std : : string dbgFilename ;
Reference < SyncQueue > syncQueue ;
File ( ) : size ( - 1 ) , popped ( - 1 ) { }
void setFile ( Reference < IAsyncFile > f ) {
this - > f = f ;
this - > syncQueue = Reference < SyncQueue > ( new SyncQueue ( 1 , f ) ) ;
}
} ;
File files [ 2 ] ; // After readFirstAndLastPages(), files[0] is logically before files[1] (pushes are always into files[1])
2019-02-08 09:19:56 +08:00
// Aligned in-memory copies of the first page of files[0] and files[1]. Allocated and filled with 0xFF in the
// constructor, read from disk by readFirstAndLastPages(), and updated by push() when it writes at offset 0.
Standalone < VectorRef < Page * > > firstPages ;
2017-05-26 04:48:44 +08:00
std : : string basename ;
2017-09-22 14:51:55 +08:00
std : : string fileExtension ;
// Builds the name of queue file i from basename, the index i and fileExtension.
std : : string filename ( int i ) const { return basename + format ( " %d.%s " , i , fileExtension . c_str ( ) ) ; }
2017-05-26 04:48:44 +08:00
UID dbgid ; // id attached to this queue's trace events
// Debug bookkeeping: advanced by files[0].size when push() swaps the files and recomputed by setPoppedPage().
// NOTE(review): presumably the sequence number at which files[0] begins -- confirm.
int64_t dbg_file0BeginSeq ;
2017-12-02 07:05:17 +08:00
// If > 0, push() logs DiskQueueFileTooLarge when files[1].size exceeds this value.
int64_t fileSizeWarningLimit ;
2017-05-26 04:48:44 +08:00
// `error` receives failures from the actors; `stopped` is fulfilled by shutdown().
Promise < Void > error , stopped ;
// Futures handed out by getError() / onClosed(); built from the promises above in the constructor.
Future < Void > onError , onStopped ;
// What the next pushAndCommit() waits on before it starts pushing.
Future < Void > readyToPush ;
2017-12-07 04:31:07 +08:00
// Completion of the most recently started pushAndCommit().
Future < Void > lastCommit ;
2018-01-03 05:30:27 +08:00
// True until the first pushAndCommit() runs; that commit makes later pushes wait for it to finish.
bool isFirstCommit ;
2017-05-26 04:48:44 +08:00
StringBuffer readingBuffer ; // Pages that have been read and not yet returned
int readingFile ; // i if the next page after readingBuffer should be read from files[i], 2 if recovery is complete
int64_t readingPage ; // Page within readingFile that is the next page after readingBuffer
int64_t writingPos ; // Position within files[1] that will be next written
// Step size used by push() when it extends files[1]; from SERVER_KNOBS, randomized under BUGGIFY.
int64_t fileExtensionBytes ;
2019-03-04 04:57:44 +08:00
// Minimum amount push() removes when it shrinks files[1]; from SERVER_KNOBS, randomized under BUGGIFY.
int64_t fileShrinkBytes ;
2017-05-26 04:48:44 +08:00
Int64MetricHandle stallCount ; // incremented by stall()
// Forwards to the static truncateFile() actor with this queue as `self`.
Future < Void > truncateFile ( int file , int64_t pos ) { return truncateFile ( this , file , pos ) ; }
2019-05-08 15:01:29 +08:00
// FIXME: Merge this function with IAsyncFileSystem::incrementalDeleteFile().
// Fire-and-forget actor that shrinks `file` step by step: starting from its current size, it truncates to the
// remaining size, syncs, pauses for INCREMENTAL_DELETE_INTERVAL, then lowers the target by
// INCREMENTAL_DELETE_TRUNCATE_AMOUNT. The loop stops once the target is no longer positive, so truncate()
// is only ever called with a size greater than zero. Logs when the loop ends.
ACTOR static void incrementalTruncate(Reference<IAsyncFile> file) {
	state int64_t bytesLeft = wait(file->size());
	while (bytesLeft > 0) {
		wait(file->truncate(bytesLeft));
		wait(file->sync());
		wait(delay(FLOW_KNOBS->INCREMENTAL_DELETE_INTERVAL));
		bytesLeft -= FLOW_KNOBS->INCREMENTAL_DELETE_TRUNCATE_AMOUNT;
	}
	TraceEvent(" DiskQueueReplaceTruncateEnded ").detail(" Filename ", file->getFilename());
}
// Replaces `toReplace` with a newly created file of the same name and returns the new handle once it has
// been synced. The old handle is handed to incrementalTruncate(), which shrinks it in the background.
ACTOR static Future<Reference<IAsyncFile>> replaceFile(Reference<IAsyncFile> toReplace) {
	// Start shrinking the old file; this actor is not waited on.
	incrementalTruncate(toReplace);

	state Reference<IAsyncFile> replacement = wait(IAsyncFileSystem::filesystem()->open(
	    toReplace->getFilename(),
	    IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_READWRITE | IAsyncFile::OPEN_UNCACHED |
	        IAsyncFile::OPEN_UNBUFFERED | IAsyncFile::OPEN_LOCK,
	    0));
	wait(replacement->sync());
	return replacement;
}
2019-05-08 07:48:41 +08:00
// Member entry point: forwards to the static push() actor with this queue as `self`.
Future < Void > push ( StringRef pageData , vector < Reference < SyncQueue > > * toSync ) {
return push ( this , pageData , toSync ) ;
}
// Writes pageData (whole pages, page-aligned in memory) at writingPos of files[1]. If the data does not fit:
//  - when files[0] is fully popped: fills the remaining tail of files[1], swaps the two files (and firstPages),
//    restarts at offset 0 of the new files[1], and shrinks or replaces that file if it exceeds desiredMaxFileSize;
//  - otherwise: extends files[1].
// The SyncQueue of every file written is appended to *toSync; no sync is issued here.
// The returned future completes once every write/truncate issued here has completed.
ACTOR static Future < Void > push ( RawDiskQueue_TwoFiles * self , StringRef pageData , vector < Reference < SyncQueue > > * toSync ) {
2017-05-26 04:48:44 +08:00
// Write the given data to the queue files, swapping or extending them if necessary.
// Don't do any syncs, but push the modified file(s) onto toSync.
2019-05-08 07:48:41 +08:00
// Pushing is only legal once recovery is complete (readingFile == 2).
ASSERT ( self - > readingFile = = 2 ) ;
2017-05-26 04:48:44 +08:00
ASSERT ( pageData . size ( ) % _PAGE_SIZE = = 0 ) ;
ASSERT ( int64_t ( pageData . begin ( ) ) % _PAGE_SIZE = = 0 ) ;
2019-05-08 07:48:41 +08:00
ASSERT ( self - > writingPos % _PAGE_SIZE = = 0 ) ;
ASSERT ( self - > files [ 0 ] . size % _PAGE_SIZE = = 0 & & self - > files [ 1 ] . size % _PAGE_SIZE = = 0 ) ;
2017-05-26 04:48:44 +08:00
2019-05-08 15:01:29 +08:00
// Every write/truncate issued below; all are awaited together at the end.
state vector < Future < Void > > waitfor ;
2017-05-26 04:48:44 +08:00
2019-05-08 07:48:41 +08:00
if ( pageData . size ( ) + self - > writingPos > self - > files [ 1 ] . size ) {
if ( self - > files [ 0 ] . popped = = self - > files [ 0 ] . size ) {
// Finish self->files[1] and swap
// p is the number of bytes that still fit at the end of files[1].
int p = self - > files [ 1 ] . size - self - > writingPos ;
2017-05-26 04:48:44 +08:00
if ( p > 0 ) {
2019-05-08 07:48:41 +08:00
toSync - > push_back ( self - > files [ 1 ] . syncQueue ) ;
/*TraceEvent("RDQWriteAndSwap", this->dbgid).detail("File1name", self->files[1].dbgFilename).detail("File1size", self->files[1].size)
. detail ( " WritingPos " , self - > writingPos ) . detail ( " WritingBytes " , p ) ; */
waitfor . push_back ( self - > files [ 1 ] . f - > write ( pageData . begin ( ) , p , self - > writingPos ) ) ;
2017-05-26 04:48:44 +08:00
pageData = pageData . substr ( p ) ;
}
2019-05-08 07:48:41 +08:00
self - > dbg_file0BeginSeq + = self - > files [ 0 ] . size ;
std : : swap ( self - > files [ 0 ] , self - > files [ 1 ] ) ;
std : : swap ( self - > firstPages [ 0 ] , self - > firstPages [ 1 ] ) ;
// The new files[1] is the old, fully popped files[0]; start writing it from the beginning.
self - > files [ 1 ] . popped = 0 ;
self - > writingPos = 0 ;
const int64_t activeDataVolume = pageCeiling ( self - > files [ 0 ] . size - self - > files [ 0 ] . popped + self - > fileExtensionBytes + self - > fileShrinkBytes ) ;
2019-05-08 08:24:45 +08:00
const int64_t desiredMaxFileSize = std : : max ( activeDataVolume , SERVER_KNOBS - > TLOG_HARD_LIMIT_BYTES * 2 ) ;
if ( self - > files [ 1 ] . size > desiredMaxFileSize ) {
2019-05-08 07:48:41 +08:00
// Either shrink self->files[1] to the size of self->files[0], or chop off fileShrinkBytes
2019-05-08 08:24:45 +08:00
int64_t maxShrink = std : : max ( pageFloor ( self - > files [ 1 ] . size - desiredMaxFileSize ) , self - > fileShrinkBytes ) ;
2019-05-08 15:01:29 +08:00
// A shrink spanning more than DISK_QUEUE_MAX_TRUNCATE_EXTENTS extension-sized units is done by
// replacing the file instead of truncating it in place.
if ( maxShrink / SERVER_KNOBS - > DISK_QUEUE_FILE_EXTENSION_BYTES >
SERVER_KNOBS - > DISK_QUEUE_MAX_TRUNCATE_EXTENTS ) {
TEST ( true ) ; // Replacing DiskQueue file
2019-05-09 06:10:56 +08:00
TraceEvent ( " DiskQueueReplaceFile " , self - > dbgid ) . detail ( " Filename " , self - > files [ 1 ] . f - > getFilename ( ) ) . detail ( " OldFileSize " , self - > files [ 1 ] . size ) . detail ( " ElidedTruncateSize " , maxShrink ) ;
2019-05-08 15:01:29 +08:00
// NOTE(review): this wait() suspends push() after the files were swapped and writingPos was reset, but
// before the write below advances writingPos. pushAndCommit() fulfils `pushing` right after calling
// push(), which releases the next pushAndCommit(); confirm that a later push cannot run in this window.
Reference < IAsyncFile > newFile = wait ( replaceFile ( self - > files [ 1 ] . f ) ) ;
self - > files [ 1 ] . setFile ( newFile ) ;
self - > files [ 1 ] . size = 0 ;
} else {
self - > files [ 1 ] . size - = maxShrink ;
waitfor . push_back ( self - > files [ 1 ] . f - > truncate ( self - > files [ 1 ] . size ) ) ;
}
2019-03-04 04:57:44 +08:00
}
2017-05-26 04:48:44 +08:00
} else {
2019-05-08 07:48:41 +08:00
// Extend self->files[1] to accommodate the new write and about 10MB or 2x current size for future writes.
/*TraceEvent("RDQExtend", this->dbgid).detail("File1name", self->files[1].dbgFilename).detail("File1size", self->files[1].size)
2018-06-09 02:11:08 +08:00
. detail ( " ExtensionBytes " , fileExtensionBytes ) ; */
2019-05-08 07:48:41 +08:00
// minExtension is the shortfall: how far the write would run past the current end of files[1].
int64_t minExtension = pageData . size ( ) + self - > writingPos - self - > files [ 1 ] . size ;
self - > files [ 1 ] . size + = std : : min ( std : : max ( self - > fileExtensionBytes , minExtension ) , self - > files [ 0 ] . size + self - > files [ 1 ] . size + minExtension ) ;
waitfor . push_back ( self - > files [ 1 ] . f - > truncate ( self - > files [ 1 ] . size ) ) ;
2017-12-02 07:05:17 +08:00
2019-05-08 07:48:41 +08:00
if ( self - > fileSizeWarningLimit > 0 & & self - > files [ 1 ] . size > self - > fileSizeWarningLimit ) {
TraceEvent ( SevWarnAlways , " DiskQueueFileTooLarge " , self - > dbgid ) . suppressFor ( 1.0 ) . detail ( " Filename " , self - > filename ( 1 ) ) . detail ( " Size " , self - > files [ 1 ] . size ) ;
2017-12-02 07:05:17 +08:00
}
2017-05-26 04:48:44 +08:00
}
}
2019-05-08 07:48:41 +08:00
// Writing at offset 0 replaces the file's first page, so keep the in-memory copy in step.
if ( self - > writingPos = = 0 ) {
* self - > firstPages [ 1 ] = * ( const Page * ) pageData . begin ( ) ;
2019-02-08 09:19:56 +08:00
}
2019-05-08 07:48:41 +08:00
/*TraceEvent("RDQWrite", this->dbgid).detail("File1name", self->files[1].dbgFilename).detail("File1size", self->files[1].size)
. detail ( " WritingPos " , self - > writingPos ) . detail ( " WritingBytes " , pageData . size ( ) ) ; */
self - > files [ 1 ] . size = std : : max ( self - > files [ 1 ] . size , self - > writingPos + pageData . size ( ) ) ;
toSync - > push_back ( self - > files [ 1 ] . syncQueue ) ;
waitfor . push_back ( self - > files [ 1 ] . f - > write ( pageData . begin ( ) , pageData . size ( ) , self - > writingPos ) ) ;
self - > writingPos + = pageData . size ( ) ;
2017-05-26 04:48:44 +08:00
2019-05-08 07:48:41 +08:00
wait ( waitForAll ( waitfor ) ) ;
return Void ( ) ;
2017-05-26 04:48:44 +08:00
}
// Pushes pageData to the queue files, syncs every file that was written, then records poppedPages pages as popped.
//   pageData    - the bytes to write; handed to push() unchanged
//   pageMem     - deleted by this actor on both the success and the error path
//   poppedPages - number of pages passed (as bytes) to updatePopped() once the commit is durable
// Ordering: the push waits for the previous readyToPush future, and the commit completes only after the
// previously started commit (lastCommit) has completed.
// On error the exception is traced, forwarded to the queue's error promise and to this call's own
// `pushing`/`committed` promises, and rethrown.
ACTOR static UNCANCELLABLE Future < Void > pushAndCommit ( RawDiskQueue_TwoFiles * self , StringRef pageData , StringBuffer * pageMem , uint64_t poppedPages ) {
state Promise < Void > pushing , committed ;
// Copies taken up front; the catch block below uses these rather than reading them through `self`.
// NOTE(review): presumably so the error path does not depend on `self` still being alive -- confirm.
state Promise < Void > errorPromise = self - > error ;
state std : : string filename = self - > files [ 0 ] . dbgFilename ;
state UID dbgid = self - > dbgid ;
state vector < Reference < SyncQueue > > syncFiles ;
2017-12-07 04:31:07 +08:00
state Future < Void > lastCommit = self - > lastCommit ;
2017-05-26 04:48:44 +08:00
try {
// pushing might need to wait for previous pushes to start (to maintain order) or for
// a previous commit to finish if stall() was called
Future < Void > ready = self - > readyToPush ;
self - > readyToPush = pushing . getFuture ( ) ;
2017-12-07 04:31:07 +08:00
self - > lastCommit = committed . getFuture ( ) ;
2017-05-26 04:48:44 +08:00
2018-01-03 05:30:27 +08:00
// the first commit must complete before we can pipeline other commits so that we will always have a valid page to binary search to
if ( self - > isFirstCommit ) {
self - > isFirstCommit = false ;
self - > readyToPush = self - > lastCommit ;
}
2017-05-26 04:48:44 +08:00
2018-08-11 04:57:10 +08:00
wait ( ready ) ;
2017-05-26 04:48:44 +08:00
TEST ( pageData . size ( ) > sizeof ( Page ) ) ; // push more than one page of data
2019-05-08 07:48:41 +08:00
// Start the push, then immediately signal `pushing` so the next push may start while this one's I/O is in flight.
Future < Void > pushed = self - > push ( pageData , & syncFiles ) ;
2017-05-26 04:48:44 +08:00
pushing . send ( Void ( ) ) ;
2019-05-08 15:01:29 +08:00
wait ( pushed ) ;
2017-05-26 04:48:44 +08:00
ASSERT ( syncFiles . size ( ) > = 1 & & syncFiles . size ( ) < = 2 ) ;
TEST ( 2 = = syncFiles . size ( ) ) ; // push spans both files
2019-03-16 12:01:15 +08:00
// The writes have completed, so the page memory is no longer needed. Zeroing the pointer makes the
// delete in the catch block a no-op if a later step throws.
delete pageMem ;
2017-05-26 04:48:44 +08:00
pageMem = 0 ;
// Sync every file that push() wrote to.
Future < Void > sync = syncFiles [ 0 ] - > onSync ( ) ;
for ( int i = 1 ; i < syncFiles . size ( ) ; i + + ) sync = sync & & syncFiles [ i ] - > onSync ( ) ;
2018-08-11 04:57:10 +08:00
wait ( sync ) ;
// Commits complete in order: wait for the commit that was in flight when this one started.
wait ( lastCommit ) ;
2017-12-19 07:31:59 +08:00
//Calling check_yield instead of yield to avoid a destruction ordering problem in simulation
if ( g_network - > check_yield ( g_network - > getCurrentTask ( ) ) ) {
2018-08-11 04:57:10 +08:00
wait ( delay ( 0 , g_network - > getCurrentTask ( ) ) ) ;
2017-12-19 07:31:59 +08:00
}
2017-05-26 04:48:44 +08:00
self - > updatePopped ( poppedPages * sizeof ( Page ) ) ;
/*TraceEvent("RDQCommitEnd", self->dbgid).detail("DeltaPopped", poppedPages*sizeof(Page)).detail("PoppedCommitted", self->dbg_file0BeginSeq + self->files[0].popped + self->files[1].popped)
. detail ( " File0Size " , self - > files [ 0 ] . size ) . detail ( " File1Size " , self - > files [ 1 ] . size )
. detail ( " File0Name " , self - > files [ 0 ] . dbgFilename ) . detail ( " SyncedFiles " , syncFiles . size ( ) ) ; */
2017-08-24 04:56:18 +08:00
committed . send ( Void ( ) ) ;
2017-05-26 04:48:44 +08:00
} catch ( Error & e ) {
2019-03-16 12:01:15 +08:00
delete pageMem ;
2017-05-26 04:48:44 +08:00
TEST ( true ) ; // push error
TEST ( 2 = = syncFiles . size ( ) ) ; // push spanning both files error
2018-08-02 05:30:57 +08:00
TraceEvent ( SevError , " RDQPushAndCommitError " , dbgid ) . error ( e , true ) . detail ( " InitialFilename0 " , filename ) ;
2017-05-26 04:48:44 +08:00
// Propagate the failure to anyone waiting on the queue's error future or on this push/commit.
if ( errorPromise . canBeSet ( ) ) errorPromise . sendError ( e ) ;
if ( pushing . canBeSet ( ) ) pushing . sendError ( e ) ;
if ( committed . canBeSet ( ) ) committed . sendError ( e ) ;
throw e ;
}
return Void ( ) ;
}
// Records `popped` more bytes as popped: files[0] absorbs as much as it still has unpopped,
// and whatever is left over is charged to files[1].
void updatePopped(int64_t popped) {
	const int64_t unpoppedInFile0 = files[0].size - files[0].popped;
	const int64_t fromFile0 = std::min(popped, unpoppedInFile0);
	files[0].popped += fromFile0;
	files[1].popped += popped - fromFile0;
}
// Sets the popped position to page `page` of files[`file`] and makes the other file consistent with it:
// when `file` is nonzero, files[0] is marked fully popped; otherwise files[1] is marked as not popped at all.
// dbg_file0BeginSeq is recomputed from debugSeq and the two popped offsets.
ACTOR static Future<Void> setPoppedPage(RawDiskQueue_TwoFiles* self, int file, int64_t page, int64_t debugSeq) {
	self->files[file].popped = page * sizeof(Page);
	if (file) {
		self->files[0].popped = self->files[0].size;
	} else {
		self->files[1].popped = 0;
	}
	self->dbg_file0BeginSeq = debugSeq - self->files[1].popped - self->files[0].popped;

	// If we are starting in file 1, we truncate file 0 in case it has been corrupted.
	// In particular, we are trying to avoid a dropped or corrupted write to the first page of file 0
	// causing it to be sequenced before file 1, when in fact it contains many pages that follow file 1.
	// These ok pages may be incorrectly read if the machine dies after overwriting the first page of
	// file 0 and is then recovered.
	if (file == 1) {
		wait(self->truncateFile(self, 0, 0));
	}

	return Void();
}
// Opens both queue files, or creates both if neither exists, syncs them, and stores the handles in self->files[].
// Throws if exactly one file is missing or if either open fails with an error other than file_not_found.
ACTOR static Future < Void > openFiles ( RawDiskQueue_TwoFiles * self ) {
state vector < Future < Reference < IAsyncFile > > > fs ;
for ( int i = 0 ; i < 2 ; i + + )
fs . push_back ( IAsyncFileSystem : : filesystem ( ) - > open ( self - > filename ( i ) , IAsyncFile : : OPEN_READWRITE | IAsyncFile : : OPEN_UNCACHED | IAsyncFile : : OPEN_UNBUFFERED | IAsyncFile : : OPEN_LOCK , 0 ) ) ;
2018-08-11 04:57:10 +08:00
// Wait until both opens have finished, whether they succeeded or failed; errors are inspected below.
wait ( waitForAllReady ( fs ) ) ;
2017-05-26 04:48:44 +08:00
// Treatment of errors here is important. If only one of the two files is present
// (due to a power failure during creation or deletion, or administrative error) we don't want to
// open the queue!
if ( ! fs [ 0 ] . isError ( ) & & ! fs [ 1 ] . isError ( ) ) {
// Both files were opened OK: success
} else if ( fs [ 0 ] . isError ( ) & & fs [ 0 ] . getError ( ) . code ( ) = = error_code_file_not_found & &
fs [ 1 ] . isError ( ) & & fs [ 1 ] . getError ( ) . code ( ) = = error_code_file_not_found )
{
// Neither file was found: we can create a new queue
// OPEN_ATOMIC_WRITE_AND_CREATE defers creation (using a .part file) until the calls to sync() below
TraceEvent ( " DiskQueueCreate " ) . detail ( " File0 " , self - > filename ( 0 ) ) ;
for ( int i = 0 ; i < 2 ; i + + )
fs [ i ] = IAsyncFileSystem : : filesystem ( ) - > open ( self - > filename ( i ) , IAsyncFile : : OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile : : OPEN_CREATE | IAsyncFile : : OPEN_READWRITE | IAsyncFile : : OPEN_UNCACHED | IAsyncFile : : OPEN_UNBUFFERED | IAsyncFile : : OPEN_LOCK , 0600 ) ;
// Any error here is fatal
2018-08-11 04:57:10 +08:00
wait ( waitForAll ( fs ) ) ;
2017-05-26 04:48:44 +08:00
// sync on each file to actually create it will be done below
} else {
// One file had a more serious error or one file is present and the other is not. Die.
// Throw fs[1]'s error when fs[0] opened fine or when fs[1] failed with something other than
// file_not_found; otherwise throw fs[0]'s error.
if ( ! fs [ 0 ] . isError ( ) | | ( fs [ 1 ] . isError ( ) & & fs [ 1 ] . getError ( ) . code ( ) ! = error_code_file_not_found ) )
throw fs [ 1 ] . getError ( ) ;
else
throw fs [ 0 ] . getError ( ) ;
}
// fsync both files. This is necessary to trigger atomic file creation in the creation case above.
// It also permits the recovery code to assume that whatever it reads is durable. Otherwise a prior
// process could have written (but not synchronized) data to the file which we will read but which
// might not survive a reboot. The recovery code assumes otherwise and could corrupt the disk.
vector < Future < Void > > syncs ;
for ( int i = 0 ; i < fs . size ( ) ; i + + )
syncs . push_back ( fs [ i ] . get ( ) - > sync ( ) ) ;
2018-08-11 04:57:10 +08:00
wait ( waitForAll ( syncs ) ) ;
2017-05-26 04:48:44 +08:00
// Successfully opened or created; fill in self->files[]
for ( int i = 0 ; i < 2 ; i + + )
self - > files [ i ] . setFile ( fs [ i ] . get ( ) ) ;
return Void ( ) ;
}
// Tears the queue down: waits for the last commit (ignoring its outcome) and for every TrackMe guard to be
// released, drops both file handles, optionally deletes both files, then fulfils `stopped` and deletes `self`.
//   deleteFiles - true for dispose(), false for close()
// Any error is traced at SevError; unless that error is actor_cancelled, the promises are still fulfilled and
// `self` is still deleted.
ACTOR static void shutdown ( RawDiskQueue_TwoFiles * self , bool deleteFiles ) {
// Wait for all reads and writes on the file, and all actors referencing self, to be finished
state Error error = success ( ) ;
try {
2019-02-13 08:07:17 +08:00
// errorOr() turns a failed commit into a value so that a commit error does not abort the shutdown.
wait ( success ( errorOr ( self - > lastCommit ) ) ) ;
2019-02-08 09:02:18 +08:00
wait ( self - > onSafeToDestruct ( ) ) ;
2017-05-26 04:48:44 +08:00
for ( int i = 0 ; i < 2 ; i + + )
self - > files [ i ] . f . clear ( ) ;
if ( deleteFiles ) {
TraceEvent ( " DiskQueueShutdownDeleting " , self - > dbgid )
. detail ( " File0 " , self - > filename ( 0 ) )
. detail ( " File1 " , self - > filename ( 1 ) ) ;
2018-08-11 04:57:10 +08:00
wait ( IAsyncFileSystem : : filesystem ( ) - > incrementalDeleteFile ( self - > filename ( 0 ) , false ) ) ;
wait ( IAsyncFileSystem : : filesystem ( ) - > incrementalDeleteFile ( self - > filename ( 1 ) , true ) ) ;
2017-05-26 04:48:44 +08:00
}
TraceEvent ( " DiskQueueShutdownComplete " , self - > dbgid )
. detail ( " DeleteFiles " , deleteFiles )
. detail ( " File0 " , self - > filename ( 0 ) ) ;
} catch ( Error & e ) {
TraceEvent ( SevError , " DiskQueueShutdownError " , self - > dbgid )
2018-08-02 05:30:57 +08:00
. error ( e , true )
. detail ( " Reason " , e . code ( ) = = error_code_platform_error ? " could not delete database " : " unknown " ) ;
2017-05-26 04:48:44 +08:00
// Remember the error so the cancellation check below can see it.
error = e ;
}
// When the shutdown actor itself was cancelled, leave the promises and `self` untouched.
if ( error . code ( ) ! = error_code_actor_cancelled ) {
2017-08-26 01:12:58 +08:00
if ( self - > stopped . canBeSet ( ) ) self - > stopped . send ( Void ( ) ) ;
if ( self - > error . canBeSet ( ) ) self - > error . send ( Never ( ) ) ;
2017-05-26 04:48:44 +08:00
delete self ;
}
}
// Start of recovery. Opens (or creates) both files, reads each file's first page, uses `compare` to put the
// files in logical order (files[0] before files[1]), and then locates the last valid page of files[1].
//   compare(a, b) - used as "a comes before b" when ordering the two first pages, and as "page b is valid
//                   relative to first page a" during the search; compare(p, p) is used as "p is a valid page".
// Returns the last valid page of files[1], or an empty string when files[1] has no valid first page (the
// queue is empty; in that case both files are truncated and the queue is made ready for pushing).
// Holds a TrackMe guard for its whole lifetime. On error it traces the failure, forwards it to self->error
// if that is still unset, and rethrows.
ACTOR static UNCANCELLABLE Future < Standalone < StringRef > > readFirstAndLastPages ( RawDiskQueue_TwoFiles * self , compare_pages compare ) {
state TrackMe trackMe ( self ) ;
try {
// Open both files or create both files
2018-08-11 04:57:10 +08:00
wait ( openFiles ( self ) ) ;
2017-05-26 04:48:44 +08:00
// Get the file sizes
vector < Future < int64_t > > fsize ;
for ( int i = 0 ; i < 2 ; i + + )
fsize . push_back ( self - > files [ i ] . f - > size ( ) ) ;
vector < int64_t > file_sizes = wait ( getAll ( fsize ) ) ;
for ( int i = 0 ; i < 2 ; i + + ) {
// SOMEDAY: If the file size is not a multiple of page size, it may never be shortened. Change this?
// Round the physical size down to a whole number of pages.
self - > files [ i ] . size = file_sizes [ i ] - file_sizes [ i ] % sizeof ( Page ) ;
ASSERT ( self - > files [ i ] . size % sizeof ( Page ) = = 0 ) ;
}
// Read the first pages
// An empty file is skipped, so its firstPages entry keeps whatever it held before (0xFF after construction).
vector < Future < int > > reads ;
for ( int i = 0 ; i < 2 ; i + + )
if ( self - > files [ i ] . size > 0 )
2019-02-08 09:19:56 +08:00
reads . push_back ( self - > files [ i ] . f - > read ( self - > firstPages [ i ] , sizeof ( Page ) , 0 ) ) ;
2018-08-11 04:57:10 +08:00
wait ( waitForAll ( reads ) ) ;
2017-05-26 04:48:44 +08:00
// Determine which file comes first
2019-02-08 09:19:56 +08:00
if ( compare ( self - > firstPages [ 1 ] , self - > firstPages [ 0 ] ) ) {
std : : swap ( self - > firstPages [ 0 ] , self - > firstPages [ 1 ] ) ;
2017-05-26 04:48:44 +08:00
std : : swap ( self - > files [ 0 ] , self - > files [ 1 ] ) ;
}
2019-03-16 12:01:14 +08:00
// If files[0]'s first page is not valid, reset its in-memory copy to the all-0xFF placeholder.
if ( ! compare ( self - > firstPages [ 0 ] , self - > firstPages [ 0 ] ) ) {
memset ( self - > firstPages [ 0 ] , 0xFF , sizeof ( Page ) ) ;
}
2019-02-08 09:19:56 +08:00
if ( ! compare ( self - > firstPages [ 1 ] , self - > firstPages [ 1 ] ) ) {
2017-05-26 04:48:44 +08:00
// Both files are invalid... the queue is empty!
// Begin pushing at the beginning of files[1]
//Truncate both files, since perhaps only the first pages are corrupted. This avoids cases where overwriting the first page and then terminating makes
//subsequent pages valid upon recovery.
vector < Future < Void > > truncates ;
for ( int i = 0 ; i < 2 ; + + i )
if ( self - > files [ i ] . size > 0 )
truncates . push_back ( self - > truncateFile ( self , i , 0 ) ) ;
2018-08-11 04:57:10 +08:00
wait ( waitForAll ( truncates ) ) ;
2017-05-26 04:48:44 +08:00
self - > files [ 0 ] . popped = self - > files [ 0 ] . size ;
self - > files [ 1 ] . popped = 0 ;
2019-03-16 12:01:14 +08:00
memset ( self - > firstPages [ 1 ] , 0xFF , sizeof ( Page ) ) ;
2017-05-26 04:48:44 +08:00
self - > writingPos = 0 ;
// readingFile == 2 marks recovery as complete, which is what push() asserts.
self - > readingFile = 2 ;
return Standalone < StringRef > ( ) ;
}
2019-02-08 09:19:56 +08:00
// A page in files[1] is "valid" iff compare(self->firstPages[1], page)
2017-05-26 04:48:44 +08:00
// Binary search to find a page in files[1] that is "valid" but the next page is not valid
// Invariant: the page at begin is valid, and the page at end is invalid
state int64_t begin = 0 ;
state int64_t end = self - > files [ 1 ] . size / sizeof ( Page ) ;
2019-02-08 09:19:56 +08:00
// Page-aligned scratch page for the reads below; it is also what gets returned.
state Standalone < StringRef > middlePageAllocation = makeAlignedString ( sizeof ( Page ) , sizeof ( Page ) ) ;
state Page * middlePage = ( Page * ) middlePageAllocation . begin ( ) ;
2017-05-26 04:48:44 +08:00
while ( begin + 1 ! = end ) {
state int64_t middle = ( begin + end ) / 2 ;
ASSERT ( middle > begin & & middle < end ) ; // So the loop always changes begin or end
int len = wait ( self - > files [ 1 ] . f - > read ( middlePage , sizeof ( Page ) , middle * sizeof ( Page ) ) ) ;
ASSERT ( len = = sizeof ( Page ) ) ;
2019-02-08 09:19:56 +08:00
bool middleValid = compare ( self - > firstPages [ 1 ] , middlePage ) ;
2017-05-26 04:48:44 +08:00
2018-06-09 02:11:08 +08:00
TraceEvent ( " RDQBS " , self - > dbgid ) . detail ( " Begin " , begin ) . detail ( " End " , end ) . detail ( " Middle " , middle ) . detail ( " Valid " , middleValid ) . detail ( " File0Name " , self - > files [ 0 ] . dbgFilename ) ;
2017-05-26 04:48:44 +08:00
if ( middleValid )
begin = middle ;
else
end = middle ;
}
// Now by the invariant and the loop condition, begin is a valid page and begin+1 is an invalid page
// Check that begin+1 is invalid
2019-02-08 09:19:56 +08:00
// A short read (begin+1 is past the end of the file) also counts as invalid here.
int len1 = wait ( self - > files [ 1 ] . f - > read ( middlePage , sizeof ( Page ) , ( begin + 1 ) * sizeof ( Page ) ) ) ;
ASSERT ( ! ( len1 = = sizeof ( Page ) & & compare ( self - > firstPages [ 1 ] , middlePage ) ) ) ;
2017-05-26 04:48:44 +08:00
// Read it
2019-02-08 09:19:56 +08:00
int len2 = wait ( self - > files [ 1 ] . f - > read ( middlePage , sizeof ( Page ) , begin * sizeof ( Page ) ) ) ;
ASSERT ( len2 = = sizeof ( Page ) & & compare ( self - > firstPages [ 1 ] , middlePage ) ) ;
2017-05-26 04:48:44 +08:00
TraceEvent ( " RDQEndFound " , self - > dbgid ) . detail ( " File0Name " , self - > files [ 0 ] . dbgFilename ) . detail ( " Pos " , begin ) . detail ( " FileSize " , self - > files [ 1 ] . size ) ;
2019-02-08 09:19:56 +08:00
return middlePageAllocation ;
2017-05-26 04:48:44 +08:00
} catch ( Error & e ) {
// file_not_found is logged at SevInfo; every other error is logged at SevError.
bool ok = e . code ( ) = = error_code_file_not_found ;
2018-08-02 05:30:57 +08:00
TraceEvent ( ok ? SevInfo : SevError , " RDQReadFirstAndLastPagesError " , self - > dbgid ) . error ( e , true ) . detail ( " File0Name " , self - > files [ 0 ] . dbgFilename ) ;
2017-05-26 04:48:44 +08:00
if ( ! self - > error . isSet ( ) ) self - > error . sendError ( e ) ;
throw ;
}
}
2019-02-08 09:02:15 +08:00
// Reads `nPages` whole pages of `file`, starting at page index `pageOffset`, into a buffer
// obtained from makeAlignedString(sizeof(Page), ...) and returns that buffer.
ACTOR static Future<Standalone<StringRef>> read(RawDiskQueue_TwoFiles* self, int file, int pageOffset, int nPages) {
	state TrackMe trackMe(self);
	state const size_t readLength = nPages * sizeof(Page);
	state Standalone<StringRef> pages = makeAlignedString(sizeof(Page), readLength);

	// A read from file 1 is expected to end at or before the current write position.
	if (file == 1) {
		ASSERT_WE_THINK(pageOffset * sizeof(Page) + readLength <= self->writingPos);
	}

	int got = wait(self->files[file].f->read(mutateString(pages), readLength, pageOffset * sizeof(Page)));
	ASSERT_WE_THINK(got == readLength);
	return pages;
}
2017-05-26 04:48:44 +08:00
// Starts reading the next chunk of pages into readingBuffer and returns the future for the
// number of bytes read. When the current file is exhausted it moves on to the next file; once
// both files are done it clears the buffer, sets writingPos to the size of file 1 and returns 0.
Future<int> fillReadingBuffer() {
	const bool currentFileExhausted = readingPage * sizeof(Page) >= (size_t)files[readingFile].size;
	if (currentFileExhausted) {
		readingFile++;
		readingPage = 0;
		if (readingFile >= 2) {
			// Recovery complete
			readingBuffer.clear();
			writingPos = files[1].size;
			return 0;
		}
	}

	// Read up to 1MB into readingBuffer (a few random pages when the BUGGIFY branch is taken),
	// but never more than the whole pages left in the current file.
	const int64_t bytesLeftInFile = (files[readingFile].size / sizeof(Page) - readingPage) * sizeof(Page);
	const int64_t chunkLimit =
	    BUGGIFY_WITH_PROB(1.0) ? sizeof(Page) * deterministicRandom()->randomInt(1, 4) : (1 << 20);
	int len = std::min<int64_t>(bytesLeftInFile, chunkLimit);

	readingBuffer.clear();
	readingBuffer.alignReserve(sizeof(Page), len);
	void* destination = readingBuffer.append(len);
	ASSERT(int64_t(destination) % sizeof(Page) == 0);

	// Remember where this chunk starts before advancing the page cursor past it.
	auto fileOffset = readingPage * sizeof(Page);
	readingPage += len / sizeof(Page);
	return files[readingFile].f->read(destination, len, fileOffset);
}
// Returns the next page of the recovery stream, refilling readingBuffer when it is empty.
// Returns an empty string once there is nothing left to read. Any error is traced, forwarded
// to self->error (if not already set) and rethrown.
ACTOR static UNCANCELLABLE Future<Standalone<StringRef>> readNextPage(RawDiskQueue_TwoFiles* self) {
	state TrackMe trackMe(self);
	try {
		ASSERT(self->readingFile < 2);
		ASSERT(self->files[0].f && self->files[1].f);

		if (self->readingBuffer.size() == 0) {
			state Future<Void> extraDelay = Void();
			//if (BUGGIFY) f = delay( deterministicRandom()->random01() * 0.1 );
			int bytesFilled = wait(self->fillReadingBuffer());
			ASSERT(bytesFilled == self->readingBuffer.size());
			wait(extraDelay);
		}

		// Still empty after a refill attempt: no more pages.
		if (self->readingBuffer.size() == 0) {
			return Standalone<StringRef>();
		}

		ASSERT(self->readingBuffer.size() >= sizeof(Page));
		Standalone<StringRef> nextPage = self->readingBuffer.pop_front(sizeof(Page));
		return nextPage;
	} catch (Error& e) {
		TEST(true); // Read next page error
		TraceEvent(SevError, " RDQReadNextPageError ", self->dbgid)
		    .error(e, true)
		    .detail(" File0Name ", self->files[0].dbgFilename);
		if (!self->error.isSet()) {
			self->error.sendError(e);
		}
		throw;
	}
}
// Zeroes `file` from byte offset `pos` to its recorded size and waits for the file's sync
// queue. When truncating from offset 0 the cached first page of that file is filled with 0xFF.
ACTOR static UNCANCELLABLE Future<Void> truncateFile(RawDiskQueue_TwoFiles* self, int file, int64_t pos) {
	state TrackMe trackMe(self);
	TraceEvent(" DQTruncateFile ", self->dbgid)
	    .detail(" File ", file)
	    .detail(" Pos ", pos)
	    .detail(" File0Name ", self->files[0].dbgFilename);

	// Hold onto a reference in the off-chance that the DQ is removed from underneath us.
	state Reference<IAsyncFile> heldFile = self->files[file].f;

	if (!pos) {
		memset(self->firstPages[file], 0xFF, _PAGE_SIZE);
	}

	wait(heldFile->zeroRange(pos, self->files[file].size - pos));
	wait(self->files[file].syncQueue->onSync());

	// We intentionally don't return the f->zero future, so that TrackMe is destructed after f->zero finishes.
	return Void();
}
// Ends reading at the page before the one most recently handed out: truncates the file being
// read from that page onward and truncates any later file from offset 0. If the truncation
// point was in file 0, the two files (and their cached first pages) are swapped afterwards.
ACTOR static Future<Void> truncateBeforeLastReadPage(RawDiskQueue_TwoFiles* self) {
	try {
		state int file = self->readingFile;
		state int64_t pos = (self->readingPage - self->readingBuffer.size() / sizeof(Page) - 1) * sizeof(Page);
		state vector<Future<Void>> commits;
		state bool swapFiles = file == 0;
		TEST(file == 0); // truncate before last read page on file 0
		TEST(file == 1 && pos != self->files[1].size); // truncate before last read page on file 1

		// Reading is over; new writes start at the truncation point.
		self->readingFile = 2;
		self->readingBuffer.clear();
		self->writingPos = pos;

		// The first file is truncated at `pos`, every following file from its beginning.
		for (; file < 2; file++) {
			commits.push_back(self->truncateFile(self, file, pos));
			pos = 0;
		}
		wait(waitForAll(commits));

		if (swapFiles) {
			std::swap(self->files[0], self->files[1]);
			std::swap(self->firstPages[0], self->firstPages[1]);
			self->files[0].popped = self->files[0].size;
		}
		return Void();
	} catch (Error& e) {
		TraceEvent(SevError, " RDQTruncateBeforeLastReadPageError ", self->dbgid)
		    .error(e)
		    .detail(" File0Name ", self->files[0].dbgFilename);
		if (!self->error.isSet()) {
			self->error.sendError(e);
		}
		throw;
	}
}
} ;
2019-02-08 09:02:19 +08:00
class DiskQueue : public IDiskQueue , public Tracked < DiskQueue > {
2017-05-26 04:48:44 +08:00
public :
2019-02-08 09:02:25 +08:00
// FIXME: Is setting lastCommittedSeq to -1 instead of 0 necessary?
2019-03-16 12:01:16 +08:00
DiskQueue ( std : : string basename , std : : string fileExtension , UID dbgid , DiskQueueVersion diskQueueVersion , int64_t fileSizeWarningLimit )
: rawQueue ( new RawDiskQueue_TwoFiles ( basename , fileExtension , dbgid , fileSizeWarningLimit ) ) , dbgid ( dbgid ) , diskQueueVersion ( diskQueueVersion ) , anyPopped ( false ) , nextPageSeq ( 0 ) , poppedSeq ( 0 ) , lastPoppedSeq ( 0 ) ,
2019-02-08 09:02:25 +08:00
nextReadLocation ( - 1 ) , readBufPage ( NULL ) , readBufPos ( 0 ) , pushed_page_buffer ( NULL ) , recovered ( false ) , initialized ( false ) , lastCommittedSeq ( - 1 ) , warnAlwaysForMemory ( true )
2017-05-26 04:48:44 +08:00
{
}
// Appends `contents` to the in-memory pages awaiting commit, opening a new page whenever
// there is none yet or the last one is full. Returns the location just past the pushed data.
// Requires recovery to have completed.
virtual location push(StringRef contents) {
	ASSERT(recovered);
	TEST(contents.size() && pushedPageCount()); // More than one push between commits
	TEST(contents.size() >= 4 && pushedPageCount() && backPage().remainingCapacity() < 4); // Push right at the end of a page, possibly splitting size

	uint8_t const* const limit = contents.end();
	for (uint8_t const* cursor = contents.begin(); cursor != limit;) {
		if (!pushedPageCount() || !backPage().remainingCapacity()) {
			addEmptyPage();
		}
		auto& page = backPage();
		// Copy as much as fits into the current page.
		const int chunk = std::min<int>(page.remainingCapacity(), limit - cursor);
		memcpy(page.payload + page.payloadSize, cursor, chunk);
		page.payloadSize += chunk;
		cursor += chunk;
	}
	return endLocation();
}
2019-02-08 09:02:27 +08:00
2017-05-26 04:48:44 +08:00
// Records that everything before `upTo` may be discarded. The popped position only moves
// forward; it is written out by a later commit().
virtual void pop(location upTo) {
	ASSERT(upTo.hi == 0);
	ASSERT(!(recovered && upTo.lo > endLocation()));

	// The following check is NOT part of the intended contract of IDiskQueue, but alerts the user to a known bug where popping
	// into uncommitted pages can cause a durability failure.
	// FIXME: Remove this check when popping into uncommitted pages is fixed
	if (upTo.lo > lastCommittedSeq) {
		TraceEvent(SevError, " DQPopUncommittedData ", dbgid)
		    .detail(" UpTo ", upTo)
		    .detail(" LastCommittedSeq ", lastCommittedSeq)
		    .detail(" File0Name ", rawQueue->files[0].dbgFilename);
	}

	if (upTo.lo <= poppedSeq) {
		return;
	}
	poppedSeq = upTo.lo;
	anyPopped = true;
}
2019-03-16 12:01:18 +08:00
// IDiskQueue entry point; forwards to the static read actor with this queue as `self`.
virtual Future<Standalone<StringRef>> read(location from, location to, CheckHashes ch) {
	return read(this, from, to, ch);
}
2019-02-08 09:02:27 +08:00
2017-05-26 04:48:44 +08:00
int getMaxPayload ( ) {
return Page : : maxPayload ;
}
virtual int getCommitOverhead ( ) {
if ( ! pushedPageCount ( ) ) {
if ( ! anyPopped )
return 0 ;
return Page : : maxPayload ;
}
else
return backPage ( ) . remainingCapacity ( ) ;
}
// Finalizes the pages pushed so far and hands them to the raw queue to be written.
// With nothing pushed, an empty page is added only if a pop still has to be recorded;
// otherwise the call completes immediately. Requires recovery to have completed.
virtual Future<Void> commit() {
	ASSERT(recovered);
	if (!pushedPageCount()) {
		if (!anyPopped) {
			return Void();
		}
		addEmptyPage();
	}
	anyPopped = false;

	// Seal the last page: stamp the popped position, pad the unused payload, then hash.
	auto& lastPage = backPage();
	lastPage.popped = poppedSeq;
	lastPage.zeroPad();
	lastPage.updateHash();

	if (pushedPageCount() >= 8000) {
		TraceEvent(warnAlwaysForMemory ? SevWarnAlways : SevWarn, " DiskQueueMemoryWarning ", dbgid)
		    .suppressFor(1.0)
		    .detail(" PushedPages ", pushedPageCount())
		    .detail(" NextPageSeq ", nextPageSeq)
		    .detail(" Details ", format(" %d pages ", pushedPageCount()))
		    .detail(" File0Name ", rawQueue->files[0].dbgFilename);
		if (g_network->isSimulated()) {
			warnAlwaysForMemory = false;
		}
	}

	/*TraceEvent("DQCommit", dbgid).detail("Pages", pushedPageCount()).detail("LastPoppedSeq", lastPoppedSeq).detail("PoppedSeq", poppedSeq).detail("NextPageSeq", nextPageSeq)
	    .detail("RawFile0Size", rawQueue->files[0].size).detail("RawFile1Size", rawQueue->files[1].size).detail("WritingPos", rawQueue->writingPos)
	    .detail("RawFile0Name", rawQueue->files[0].dbgFilename);*/

	lastCommittedSeq = lastPage.endSeq();

	// Whole pages popped since the previous commit.
	const auto newlyPoppedPages = poppedSeq / sizeof(Page) - lastPoppedSeq / sizeof(Page);
	auto committed = rawQueue->pushAndCommit(pushed_page_buffer->ref(), pushed_page_buffer, newlyPoppedPages);
	lastPoppedSeq = poppedSeq;
	// The buffer pointer was passed to pushAndCommit; start a fresh one on the next push.
	pushed_page_buffer = nullptr;
	return committed;
}
2018-07-28 16:19:40 +08:00
2017-05-26 04:48:44 +08:00
// Forwards the stall request to the underlying raw queue.
void stall() { rawQueue->stall(); }
2019-03-16 12:01:22 +08:00
// IDiskQueue entry point; forwards to the static initializeRecovery actor with this queue as `self`.
virtual Future<bool> initializeRecovery(location recoverAt) {
	return initializeRecovery(this, recoverAt);
}
2017-05-26 04:48:44 +08:00
// IDiskQueue entry point; forwards to the static readNext actor with this queue as `self`.
virtual Future<Standalone<StringRef>> readNext(int bytes) {
	return readNext(this, bytes);
}
2019-02-08 09:02:25 +08:00
// FIXME: getNextReadLocation should ASSERT( initialized ), but the memory storage engine needs
// to be changed to understand the new initializeRecovery protocol.
2017-05-26 04:48:44 +08:00
// Returns the location the next recovery read will start from.
virtual location getNextReadLocation() {
	return nextReadLocation;
}
2019-02-08 09:02:30 +08:00
// Returns lastCommittedSeq advanced by one page. Requires initializeRecovery to have run.
virtual location getNextCommitLocation() {
	ASSERT(initialized);
	return lastCommittedSeq + sizeof(Page);
}
// Returns the location the next push() will write at. Requires initializeRecovery to have run.
virtual location getNextPushLocation() {
	ASSERT(initialized);
	return endLocation();
}
2017-05-26 04:48:44 +08:00
// Exposes the raw queue's error future.
virtual Future<Void> getError() {
	return rawQueue->getError();
}
// Exposes the raw queue's closed future.
virtual Future<Void> onClosed() {
	return rawQueue->onClosed();
}
2019-02-08 09:02:19 +08:00
2017-05-26 04:48:44 +08:00
virtual void dispose ( ) {
2018-06-09 02:11:08 +08:00
TraceEvent ( " DQDestroy " , dbgid ) . detail ( " LastPoppedSeq " , lastPoppedSeq ) . detail ( " PoppedSeq " , poppedSeq ) . detail ( " NextPageSeq " , nextPageSeq ) . detail ( " File0Name " , rawQueue - > files [ 0 ] . dbgFilename ) ;
2019-02-08 09:02:19 +08:00
dispose ( this ) ;
}
// Waits until `self` is safe to destruct, then disposes of its raw queue and deletes `self`.
ACTOR static void dispose(DiskQueue* self) {
	wait(self->onSafeToDestruct());

	TraceEvent(" DQDestroyDone ", self->dbgid)
	    .detail(" File0Name ", self->rawQueue->files[0].dbgFilename);
	self->rawQueue->dispose();
	delete self;
}
2019-02-08 09:02:19 +08:00
2017-05-26 04:48:44 +08:00
virtual void close ( ) {
TraceEvent ( " DQClose " , dbgid )
2018-06-09 02:11:08 +08:00
. detail ( " LastPoppedSeq " , lastPoppedSeq )
. detail ( " PoppedSeq " , poppedSeq )
. detail ( " NextPageSeq " , nextPageSeq )
. detail ( " PoppedCommitted " , rawQueue - > dbg_file0BeginSeq + rawQueue - > files [ 0 ] . popped + rawQueue - > files [ 1 ] . popped )
. detail ( " File0Name " , rawQueue - > files [ 0 ] . dbgFilename ) ;
2019-02-08 09:02:19 +08:00
close ( this ) ;
}
// Waits until `self` is safe to destruct, then closes its raw queue and deletes `self`.
ACTOR static void close(DiskQueue* self) {
	wait(self->onSafeToDestruct());

	TraceEvent(" DQCloseDone ", self->dbgid)
	    .detail(" File0Name ", self->rawQueue->files[0].dbgFilename);
	self->rawQueue->close();
	delete self;
}
// Reports the storage usage as seen by the underlying raw queue.
virtual StorageBytes getStorageBytes() { return rawQueue->getStorageBytes(); }
private :
# pragma pack(push, 1)
// Fixed header stored at the front of every queue page. It is declared under
// #pragma pack(push, 1) and its size is pinned by a static_assert, because the on-disk
// format depends on it.
struct PageHeader {
2019-03-16 12:01:16 +08:00
// The checksum area. Page::updateHash() writes either the whole `hash` (version V0) or only
// `hash32` (any other version); the anonymous struct overlays the leading bytes of `hash`.
union {
UID hash ;
struct {
// CRC32C checksum, written by Page::updateHash() for non-V0 pages.
uint32_t hash32 ;
uint32_t _unused ;
// Set to 0xFDB when a page is created (addEmptyPage).
uint16_t magic ;
// Numeric DiskQueueVersion of the page; addEmptyPage stores 0 for V0 and 1 for V1.
uint16_t implementationVersion ;
} ;
} ;
2017-05-26 04:48:44 +08:00
// Logical location of the first byte of this page; addEmptyPage assigns nextPageSeq.
uint64_t seq ;
// Popped position recorded when the page was created or committed.
uint64_t popped ;
// Number of payload bytes in use on this page.
int payloadSize ;
} ;
2018-12-18 05:37:44 +08:00
// The on disk format depends on the size of PageHeader.
2019-03-16 12:01:16 +08:00
static_assert ( sizeof ( PageHeader ) = = 36 , " PageHeader must be 36 bytes " ) ;
2017-05-26 04:48:44 +08:00
struct Page : PageHeader {
static const int maxPayload = _PAGE_SIZE - sizeof ( PageHeader ) ;
uint8_t payload [ maxPayload ] ;
2019-03-16 12:01:16 +08:00
DiskQueueVersion diskQueueVersion ( ) const { return static_cast < DiskQueueVersion > ( implementationVersion ) ; }
2017-05-26 04:48:44 +08:00
int remainingCapacity ( ) const { return maxPayload - payloadSize ; }
uint64_t endSeq ( ) const { return seq + sizeof ( PageHeader ) + payloadSize ; }
2019-03-16 12:01:16 +08:00
UID checksum_hashlittle2 ( ) const {
2017-05-26 04:48:44 +08:00
// SOMEDAY: Better hash?
uint32_t part [ 2 ] = { 0x12345678 , 0xbeefabcd } ;
2019-03-16 12:01:16 +08:00
hashlittle2 ( & seq , sizeof ( Page ) - sizeof ( UID ) , & part [ 0 ] , & part [ 1 ] ) ;
return UID ( int64_t ( part [ 0 ] ) < < 32 | part [ 1 ] , 0xFDB ) ;
}
uint32_t checksum_crc32c ( ) const {
return crc32c_append ( 0xfdbeefdb , ( uint8_t * ) & _unused , sizeof ( Page ) - sizeof ( uint32_t ) ) ;
}
void updateHash ( ) {
switch ( diskQueueVersion ( ) ) {
case DiskQueueVersion : : V0 : {
hash = checksum_hashlittle2 ( ) ;
return ;
}
case DiskQueueVersion : : V1 :
default : {
hash32 = checksum_crc32c ( ) ;
return ;
}
}
2017-05-26 04:48:44 +08:00
}
bool checkHash ( ) {
2019-03-16 12:01:16 +08:00
switch ( diskQueueVersion ( ) ) {
case DiskQueueVersion : : V0 : {
return hash = = checksum_hashlittle2 ( ) ;
}
case DiskQueueVersion : : V1 : {
return hash32 = = checksum_crc32c ( ) ;
}
default :
return false ;
}
2017-05-26 04:48:44 +08:00
}
void zeroPad ( ) {
memset ( payload + payloadSize , 0 , maxPayload - payloadSize ) ;
}
} ;
2019-03-16 12:01:16 +08:00
static_assert ( sizeof ( Page ) = = _PAGE_SIZE , " Page must be 4k " ) ;
2017-05-26 04:48:44 +08:00
# pragma pack(pop)
// Location just past the last pushed byte, or nextPageSeq when no pages are pending.
loc_t endLocation() const {
	if (pushedPageCount()) {
		return backPage().endSeq();
	}
	return nextPageSeq;
}
void addEmptyPage ( ) {
if ( pushedPageCount ( ) ) {
backPage ( ) . updateHash ( ) ;
ASSERT ( backPage ( ) . payloadSize = = Page : : maxPayload ) ;
}
//pushed_pages.resize( pushed_pages.arena(), pushed_pages.size()+1 );
if ( ! pushed_page_buffer ) pushed_page_buffer = new StringBuffer ( dbgid ) ;
pushed_page_buffer - > alignReserve ( sizeof ( Page ) , pushed_page_buffer - > size ( ) + sizeof ( Page ) ) ;
pushed_page_buffer - > append ( sizeof ( Page ) ) ;
ASSERT ( nextPageSeq % sizeof ( Page ) = = 0 ) ;
auto & p = backPage ( ) ;
memset ( & p , 0 , sizeof ( Page ) ) ; // FIXME: unnecessary?
2019-03-16 12:01:16 +08:00
p . magic = 0xFDB ;
switch ( diskQueueVersion ) {
case DiskQueueVersion : : V0 :
p . implementationVersion = 0 ;
break ;
case DiskQueueVersion : : V1 :
p . implementationVersion = 1 ;
break ;
}
2017-05-26 04:48:44 +08:00
p . payloadSize = 0 ;
p . seq = nextPageSeq ;
nextPageSeq + = sizeof ( Page ) ;
p . popped = poppedSeq ;
if ( pushedPageCount ( ) = = 8000 ) {
TraceEvent ( " DiskQueueHighPageCount " , dbgid )
2018-06-09 02:11:08 +08:00
. detail ( " PushedPages " , pushedPageCount ( ) )
. detail ( " NextPageSeq " , nextPageSeq )
. detail ( " File0Name " , rawQueue - > files [ 0 ] . dbgFilename ) ;
2017-05-26 04:48:44 +08:00
}
}
2019-02-08 09:02:22 +08:00
// After `commitSynced` completes, reads back the pages covering [start, end) and asserts that
// they match the first (end - start) bytes of `buffer`. An io_error during the check is
// ignored; any other error is rethrown. `buffer` is deleted on every path.
ACTOR static void verifyCommit(DiskQueue* self, Future<Void> commitSynced, StringBuffer* buffer, loc_t start, loc_t end) {
	state TrackMe trackme(self);
	try {
		wait(commitSynced);
		Standalone<StringRef> onDisk = wait(readPages(self, start, end));
		const int offsetInFirstPage = start % _PAGE_SIZE;
		const int committedLength = end - start;
		StringRef expected = buffer->ref().substr(0, committedLength);
		ASSERT(onDisk.substr(offsetInFirstPage, committedLength).compare(expected) == 0);
	} catch (Error& e) {
		const bool tolerated = e.code() == error_code_io_error;
		if (!tolerated) {
			delete buffer;
			throw;
		}
	}
	delete buffer;
}
2019-02-08 09:02:20 +08:00
// Reads the whole pages (headers included) that cover the logical range [start, end) and
// returns them concatenated: from the page containing `start` through the page containing
// `end - 1`. When the range spans both files, the tail of file 0 and the head of file 1 are
// read concurrently and joined. Throws io_error() if, once the read has completed, both files'
// first pages carry a seq greater than start.lo (see the comment in the body).
ACTOR static Future < Standalone < StringRef > > readPages ( DiskQueue * self , location start , location end ) {
state TrackMe trackme ( self ) ;
state int fromFile ;
state int toFile ;
state int64_t fromPage ;
state int64_t toPage ;
2019-03-16 12:01:14 +08:00
// Logical bytes covered by file 0, derived from the seq of each file's first page. When
// file 0 has size 0 the seq of file 1's first page is used on its own.
state uint64_t file0size = self - > rawQueue - > files [ 0 ] . size ? self - > firstPages ( 1 ) . seq - self - > firstPages ( 0 ) . seq : self - > firstPages ( 1 ) . seq ;
2019-02-08 09:02:20 +08:00
ASSERT ( end > start ) ;
2019-03-16 12:01:14 +08:00
// The start must not lie before the beginning of both files.
ASSERT ( start . lo > = self - > firstPages ( 0 ) . seq | | start . lo > = self - > firstPages ( 1 ) . seq ) ;
2019-02-08 09:02:21 +08:00
// Map the first and the last byte (end is exclusive, hence end - 1) to a file and page index.
self - > findPhysicalLocation ( start . lo , & fromFile , & fromPage , nullptr ) ;
self - > findPhysicalLocation ( end . lo - 1 , & toFile , & toPage , nullptr ) ;
2019-02-08 09:02:20 +08:00
if ( fromFile = = 0 ) { ASSERT ( fromPage < file0size / _PAGE_SIZE ) ; }
if ( toFile = = 0 ) { ASSERT ( toPage < file0size / _PAGE_SIZE ) ; }
2019-02-08 09:02:26 +08:00
// FIXME I think there's something with nextReadLocation we can do here when initialized && !recovered.
// Once recovered, pages in file 1 must lie below the raw queue's write position.
if ( fromFile = = 1 & & self - > recovered ) { ASSERT ( fromPage < self - > rawQueue - > writingPos / _PAGE_SIZE ) ; }
if ( toFile = = 1 & & self - > recovered ) { ASSERT ( toPage < self - > rawQueue - > writingPos / _PAGE_SIZE ) ; }
2019-02-08 09:02:20 +08:00
if ( fromFile = = toFile ) {
// Whole range lives in one file: a single contiguous read.
ASSERT ( toPage > = fromPage ) ;
Standalone < StringRef > pagedData = wait ( self - > rawQueue - > read ( fromFile , fromPage , toPage - fromPage + 1 ) ) ;
2019-03-16 12:01:14 +08:00
if ( std : : min ( self - > firstPages ( 0 ) . seq , self - > firstPages ( 1 ) . seq ) > start . lo ) {
2019-02-08 09:02:23 +08:00
// Simulation allows for reads to be delayed and executed after overlapping subsequent
// write operations. This means that by the time our read was executed, it's possible
// that both disk queue files have been completely overwritten.
// I'm not clear what is the actual contract for read/write in this case, so simulation
// might be a bit overly aggressive here, but it's behavior we need to tolerate.
throw io_error ( ) ;
}
2019-03-04 09:21:54 +08:00
// The first and last pages read must be exactly the pages containing start and end - 1.
ASSERT ( ( ( Page * ) pagedData . begin ( ) ) - > seq = = pageFloor ( start . lo ) ) ;
2019-02-08 09:02:20 +08:00
ASSERT ( pagedData . size ( ) = = ( toPage - fromPage + 1 ) * _PAGE_SIZE ) ;
2019-02-08 09:02:23 +08:00
2019-03-04 09:21:54 +08:00
ASSERT ( ( ( Page * ) pagedData . end ( ) - 1 ) - > seq = = pageFloor ( end . lo - 1 ) ) ;
2019-02-08 09:02:20 +08:00
return pagedData ;
} else {
// Range spans both files: read file 0 from fromPage to its end, and file 1 from page 0
// through toPage.
ASSERT ( fromFile = = 0 ) ;
state Standalone < StringRef > firstChunk ;
state Standalone < StringRef > secondChunk ;
wait ( store ( firstChunk , self - > rawQueue - > read ( fromFile , fromPage , ( file0size / sizeof ( Page ) ) - fromPage ) ) & &
store ( secondChunk , self - > rawQueue - > read ( toFile , 0 , toPage + 1 ) ) ) ;
2019-03-16 12:01:14 +08:00
if ( std : : min ( self - > firstPages ( 0 ) . seq , self - > firstPages ( 1 ) . seq ) > start . lo ) {
2019-02-08 09:02:23 +08:00
// See above.
throw io_error ( ) ;
}
2019-02-08 09:02:20 +08:00
ASSERT ( firstChunk . size ( ) = = ( ( file0size / sizeof ( Page ) ) - fromPage ) * _PAGE_SIZE ) ;
2019-03-04 09:21:54 +08:00
ASSERT ( ( ( Page * ) firstChunk . begin ( ) ) - > seq = = pageFloor ( start . lo ) ) ;
2019-02-08 09:02:20 +08:00
ASSERT ( secondChunk . size ( ) = = ( toPage + 1 ) * _PAGE_SIZE ) ;
2019-03-04 09:21:54 +08:00
ASSERT ( ( ( Page * ) secondChunk . end ( ) - 1 ) - > seq = = pageFloor ( end . lo - 1 ) ) ;
2019-02-08 09:02:20 +08:00
return firstChunk . withSuffix ( secondChunk ) ;
}
}
2019-03-16 12:01:18 +08:00
ACTOR static Future < Standalone < StringRef > > read ( DiskQueue * self , location start , location end , CheckHashes ch ) {
2019-02-08 09:02:24 +08:00
// This `state` is unnecessary, but works around pagedData wrongly becoming const
// due to the actor compiler.
state Standalone < StringRef > pagedData = wait ( readPages ( self , start , end ) ) ;
ASSERT ( start . lo % sizeof ( Page ) = = 0 | |
start . lo % sizeof ( Page ) > = sizeof ( PageHeader ) ) ;
int startingOffset = start . lo % sizeof ( Page ) ;
if ( startingOffset > 0 ) startingOffset - = sizeof ( PageHeader ) ;
ASSERT ( end . lo % sizeof ( Page ) = = 0 | |
end . lo % sizeof ( Page ) > sizeof ( PageHeader ) ) ;
int endingOffset = end . lo % sizeof ( Page ) ;
if ( endingOffset = = 0 ) endingOffset = sizeof ( Page ) ;
if ( endingOffset > 0 ) endingOffset - = sizeof ( PageHeader ) ;
2019-03-04 09:21:54 +08:00
if ( pageFloor ( end . lo - 1 ) = = pageFloor ( start . lo ) ) {
2019-02-08 09:02:24 +08:00
// start and end are on the same page
ASSERT ( pagedData . size ( ) = = sizeof ( Page ) ) ;
2019-03-16 12:01:18 +08:00
Page * data = reinterpret_cast < Page * > ( const_cast < uint8_t * > ( pagedData . begin ( ) ) ) ;
if ( ch = = CheckHashes : : YES & & ! data - > checkHash ( ) ) throw io_error ( ) ;
if ( ch = = CheckHashes : : NO & & data - > payloadSize > Page : : maxPayload ) throw io_error ( ) ;
2019-02-08 09:02:24 +08:00
pagedData . contents ( ) = pagedData . substr ( sizeof ( PageHeader ) + startingOffset , endingOffset - startingOffset ) ;
return pagedData ;
} else {
2019-03-04 04:57:42 +08:00
// Reusing pagedData wastes # of pages * sizeof(PageHeader) bytes, but means
// we don't have to double allocate in a hot, memory hungry call.
uint8_t * buf = mutateString ( pagedData ) ;
2019-03-16 12:01:18 +08:00
Page * data = reinterpret_cast < Page * > ( const_cast < uint8_t * > ( pagedData . begin ( ) ) ) ;
if ( ch = = CheckHashes : : YES & & ! data - > checkHash ( ) ) throw io_error ( ) ;
if ( ch = = CheckHashes : : NO & & data - > payloadSize > Page : : maxPayload ) throw io_error ( ) ;
2019-02-08 09:02:24 +08:00
// Only start copying from `start` in the first page.
if ( data - > payloadSize > startingOffset ) {
2019-03-04 04:57:42 +08:00
const int length = data - > payloadSize - startingOffset ;
memmove ( buf , data - > payload + startingOffset , length ) ;
buf + = length ;
2019-02-08 09:02:24 +08:00
}
data + + ;
2019-03-16 12:01:18 +08:00
if ( ch = = CheckHashes : : YES & & ! data - > checkHash ( ) ) throw io_error ( ) ;
if ( ch = = CheckHashes : : NO & & data - > payloadSize > Page : : maxPayload ) throw io_error ( ) ;
2019-02-08 09:02:24 +08:00
// Copy all the middle pages
2019-03-04 09:21:54 +08:00
while ( data - > seq ! = pageFloor ( end . lo - 1 ) ) {
2019-02-08 09:02:24 +08:00
// These pages can have varying amounts of data, as pages with partial
// data will be zero-filled when commit is called.
2019-03-04 04:57:42 +08:00
const int length = data - > payloadSize ;
memmove ( buf , data - > payload , length ) ;
buf + = length ;
2019-02-08 09:02:24 +08:00
data + + ;
2019-03-16 12:01:18 +08:00
if ( ch = = CheckHashes : : YES & & ! data - > checkHash ( ) ) throw io_error ( ) ;
if ( ch = = CheckHashes : : NO & & data - > payloadSize > Page : : maxPayload ) throw io_error ( ) ;
2019-02-08 09:02:24 +08:00
}
// Copy only until `end` in the last page.
2019-03-04 04:57:42 +08:00
const int length = data - > payloadSize ;
memmove ( buf , data - > payload , std : : min ( endingOffset , length ) ) ;
buf + = std : : min ( endingOffset , length ) ;
2019-02-08 09:02:24 +08:00
2019-03-04 04:57:42 +08:00
memset ( buf , 0 , pagedData . size ( ) - ( buf - pagedData . begin ( ) ) ) ;
Standalone < StringRef > unpagedData = pagedData . substr ( 0 , buf - pagedData . begin ( ) ) ;
2019-02-08 09:02:24 +08:00
return unpagedData ;
}
}
2018-08-17 06:22:27 +08:00
// Moves up to `*bytes` unread payload bytes from the current recovery page (readBufPage) into
// `result`, advancing readBufPos and nextReadLocation and decreasing `*bytes` by the amount
// copied. Does nothing when no bytes are available or requested.
void readFromBuffer(StringBuffer* result, int* bytes) {
	const int available = readBufPage->payloadSize - readBufPos;
	const int count = std::min(available, *bytes);
	if (count > 0) {
		result->append(StringRef(readBufPage->payload + readBufPos, count));
		readBufPos += count;
		*bytes -= count;
		nextReadLocation += count;
	}
}
// Recovery read: returns up to `bytes` bytes of payload starting at nextReadLocation. May only
// be called before recovery has completed. Runs initializeRecovery first if that has not
// happened yet. When the end of the readable data is reached (no more pages, or an invalid
// page), recovery is finished: the raw queue is told the durable popped position, nextPageSeq
// is set for future writes, `recovered` becomes true, and whatever was read so far (possibly
// fewer than `bytes` bytes, possibly nothing) is returned.
ACTOR static Future < Standalone < StringRef > > readNext ( DiskQueue * self , int bytes ) {
state StringBuffer result ( self - > dbgid ) ;
ASSERT ( bytes > = 0 ) ;
result . clearReserve ( bytes ) ;
ASSERT ( ! self - > recovered ) ;
2019-02-08 09:02:25 +08:00
if ( ! self - > initialized ) {
2019-03-16 12:01:22 +08:00
bool recoveryComplete = wait ( initializeRecovery ( self , 0 ) ) ;
2017-05-26 04:48:44 +08:00
2019-02-08 09:02:25 +08:00
// initializeRecovery reports completion when there is nothing to read at all.
if ( recoveryComplete ) {
ASSERT ( self - > poppedSeq < = self - > endLocation ( ) ) ;
2017-05-26 04:48:44 +08:00
return Standalone < StringRef > ( ) ;
}
}
loop {
if ( self - > readBufPage ) {
2018-08-17 06:22:27 +08:00
self - > readFromBuffer ( & result , & bytes ) ;
2017-05-26 04:48:44 +08:00
// if done, return
if ( ! bytes ) return result . str ;
// The current page's payload is used up: drop it and advance nextReadLocation by the
// rest of the page.
ASSERT ( self - > readBufPos = = self - > readBufPage - > payloadSize ) ;
self - > readBufPage = 0 ;
self - > nextReadLocation + = sizeof ( Page ) - self - > readBufPos ;
self - > readBufPos = 0 ;
}
Standalone < StringRef > page = wait ( self - > rawQueue - > readNextPage ( ) ) ;
// An empty result from readNextPage means there are no more pages to read.
if ( ! page . size ( ) ) {
2018-06-09 02:11:08 +08:00
TraceEvent ( " DQRecEOF " , self - > dbgid ) . detail ( " NextReadLocation " , self - > nextReadLocation ) . detail ( " File0Name " , self - > rawQueue - > files [ 0 ] . dbgFilename ) ;
2017-05-26 04:48:44 +08:00
break ;
}
ASSERT ( page . size ( ) = = sizeof ( Page ) ) ;
// Keep the page's arena so readBufPage stays valid across loop iterations and calls.
self - > readBufArena = page . arena ( ) ;
self - > readBufPage = ( Page * ) page . begin ( ) ;
2019-03-04 09:21:54 +08:00
// A page with a bad hash, or with a seq lower than the expected one, ends the readable
// data: truncate the raw queue before this page and stop.
if ( ! self - > readBufPage - > checkHash ( ) | | self - > readBufPage - > seq < pageFloor ( self - > nextReadLocation ) ) {
2018-06-09 02:11:08 +08:00
TraceEvent ( " DQRecInvalidPage " , self - > dbgid ) . detail ( " NextReadLocation " , self - > nextReadLocation ) . detail ( " HashCheck " , self - > readBufPage - > checkHash ( ) )
2019-03-04 09:21:54 +08:00
. detail ( " Seq " , self - > readBufPage - > seq ) . detail ( " Expect " , pageFloor ( self - > nextReadLocation ) ) . detail ( " File0Name " , self - > rawQueue - > files [ 0 ] . dbgFilename ) ;
2018-08-11 04:57:10 +08:00
wait ( self - > rawQueue - > truncateBeforeLastReadPage ( ) ) ;
2017-05-26 04:48:44 +08:00
break ;
}
2018-06-09 02:11:08 +08:00
//TraceEvent("DQRecPage", self->dbgid).detail("NextReadLoc", self->nextReadLocation).detail("Seq", self->readBufPage->seq).detail("Pop", self->readBufPage->popped).detail("Payload", self->readBufPage->payloadSize).detail("File0Name", self->rawQueue->files[0].dbgFilename);
2019-03-04 09:21:54 +08:00
ASSERT ( self - > readBufPage - > seq = = pageFloor ( self - > nextReadLocation ) ) ;
2017-05-26 04:48:44 +08:00
// Track the popped position recorded by the most recent valid page.
self - > lastPoppedSeq = self - > readBufPage - > popped ;
}
// Recovery complete.
// The fully durable popped point is self->lastPoppedSeq; tell the raw queue that.
int f ; int64_t p ;
TEST ( self - > lastPoppedSeq / sizeof ( Page ) ! = self - > poppedSeq / sizeof ( Page ) ) ; // DiskQueue: Recovery popped position not fully durable
2018-08-17 06:22:27 +08:00
self - > findPhysicalLocation ( self - > lastPoppedSeq , & f , & p , " lastPoppedSeq " ) ;
2019-03-04 09:21:54 +08:00
wait ( self - > rawQueue - > setPoppedPage ( f , p , pageFloor ( self - > lastPoppedSeq ) ) ) ;
2017-05-26 04:48:44 +08:00
// Writes go at the end of our reads (but on the next page)
2019-03-04 09:21:54 +08:00
self - > nextPageSeq = pageFloor ( self - > nextReadLocation ) ;
2018-12-18 05:37:44 +08:00
// Only skip to the following page if some payload of the current page was consumed.
if ( self - > nextReadLocation % sizeof ( Page ) > sizeof ( PageHeader ) ) self - > nextPageSeq + = sizeof ( Page ) ;
2017-05-26 04:48:44 +08:00
2018-06-09 02:11:08 +08:00
TraceEvent ( " DQRecovered " , self - > dbgid ) . detail ( " LastPoppedSeq " , self - > lastPoppedSeq ) . detail ( " PoppedSeq " , self - > poppedSeq ) . detail ( " NextPageSeq " , self - > nextPageSeq ) . detail ( " File0Name " , self - > rawQueue - > files [ 0 ] . dbgFilename ) ;
2017-05-26 04:48:44 +08:00
self - > recovered = true ;
ASSERT ( self - > poppedSeq < = self - > endLocation ( ) ) ;
TEST ( result . size ( ) = = 0 ) ; // End of queue at border between reads
TEST ( result . size ( ) ! = 0 ) ; // Partial read at end of queue
//The next read location isn't necessarily the end of the last commit, but this is sufficient for helping us check an ASSERTion
self - > lastCommittedSeq = self - > nextReadLocation ;
return result . str ;
}
2019-03-16 12:01:22 +08:00
// Prepares the queue for recovery reads. Returns true when recovery is already complete (the
// queue holds no valid pages, or an earlier call found it recovered) and false when readNext
// still has data to read. The work is done only on the first call; later calls just return
// the current `recovered` flag.
// For DiskQueueVersion V1 and later, reading starts at the larger of `recoverAt` and the
// popped position taken from the last page; for older versions it starts at that popped position.
ACTOR static Future < bool > initializeRecovery ( DiskQueue * self , location recoverAt ) {
2019-02-08 09:02:25 +08:00
if ( self - > initialized ) {
return self - > recovered ;
}
2019-02-08 09:02:17 +08:00
Standalone < StringRef > lastPageData = wait ( self - > rawQueue - > readFirstAndLastPages ( & comparePages ) ) ;
2019-02-08 09:02:25 +08:00
self - > initialized = true ;
2017-05-26 04:48:44 +08:00
2019-02-08 09:02:17 +08:00
if ( ! lastPageData . size ( ) ) {
2017-05-26 04:48:44 +08:00
// There are no valid pages, so apparently this is a completely empty queue
self - > nextReadLocation = 0 ;
2019-02-08 09:02:25 +08:00
self - > lastCommittedSeq = 0 ;
self - > recovered = true ;
return true ;
2017-05-26 04:48:44 +08:00
}
2019-02-08 09:02:17 +08:00
Page * lastPage = ( Page * ) lastPageData . begin ( ) ;
2019-03-16 12:01:16 +08:00
// The last page carries the most recently recorded popped position.
self - > poppedSeq = lastPage - > popped ;
2019-03-16 12:01:22 +08:00
if ( self - > diskQueueVersion > = DiskQueueVersion : : V1 ) {
self - > nextReadLocation = std : : max ( recoverAt . lo , self - > poppedSeq ) ;
} else {
self - > nextReadLocation = lastPage - > popped ;
}
2017-05-26 04:48:44 +08:00
/*
state std : : auto_ptr < Page > testPage ( new Page ) ;
state int fileNum ;
for ( fileNum = 0 ; fileNum < 2 ; fileNum + + ) {
state int sizeNum ;
for ( sizeNum = 0 ; sizeNum < self - > rawQueue - > files [ fileNum ] . size ; sizeNum + = sizeof ( Page ) ) {
2019-02-13 08:07:17 +08:00
wait ( success ( self - > rawQueue - > files [ fileNum ] . f - > read ( testPage . get ( ) , sizeof ( Page ) , sizeNum ) ) ) ;
2018-06-09 02:11:08 +08:00
TraceEvent ( " PageData " ) . detail ( " File " , self - > rawQueue - > files [ fileNum ] . dbgFilename ) . detail ( " SizeNum " , sizeNum ) . detail ( " Seq " , testPage - > seq ) . detail ( " Hash " , testPage - > checkHash ( ) ) . detail ( " Popped " , testPage - > popped ) ;
2017-05-26 04:48:44 +08:00
}
}
*/
// Point the raw queue's reader at the physical page containing nextReadLocation.
int file ; int64_t page ;
2019-03-16 12:01:16 +08:00
self - > findPhysicalLocation ( self - > nextReadLocation , & file , & page , " FirstReadLocation " ) ;
2017-05-26 04:48:44 +08:00
self - > rawQueue - > setStartPage ( file , page ) ;
2019-02-08 09:02:25 +08:00
// Payload-relative offset of nextReadLocation within its page. A negative value means the
// location falls inside the page header; in that case move it forward to the start of the payload.
// NOTE(review): the subtraction mixes sizeof (unsigned) operands, so the `< 0` test relies on
// readBufPos being a signed type - its declaration is outside this view; confirm.
self - > readBufPos = self - > nextReadLocation % sizeof ( Page ) - sizeof ( PageHeader ) ;
if ( self - > readBufPos < 0 ) { self - > nextReadLocation - = self - > readBufPos ; self - > readBufPos = 0 ; }
2019-03-16 12:01:22 +08:00
TraceEvent ( " DQRecStart " , self - > dbgid ) . detail ( " ReadBufPos " , self - > readBufPos ) . detail ( " NextReadLoc " , self - > nextReadLocation ) . detail ( " Popped " , self - > poppedSeq ) . detail ( " MinRecoverAt " , recoverAt ) . detail ( " File0Name " , self - > rawQueue - > files [ 0 ] . dbgFilename ) ;
2019-02-08 09:02:25 +08:00
return false ;
2017-05-26 04:48:44 +08:00
}
2019-02-08 09:02:20 +08:00
// Returns the first page of file `i` as held by the raw queue.
// Asserts that initializeRecovery() has already marked the queue as initialized.
Page& firstPages(int i) {
	ASSERT(initialized);
	Page* headPage = (Page*)rawQueue->firstPages[i];
	return *headPage;
}
2018-08-17 06:22:27 +08:00
// Maps the logical location `loc` to a physical position in one of the two queue files.
//
// File 1 is examined before file 0; the first file whose first-page sequence number is <= loc
// is selected. On success *file receives the file index and *page the page index within that
// file, computed as (loc - first page seq) / sizeof(Page).
//
// context: optional label. When non-null, informational trace events are emitted for the lookup
//          and for its result; it is also attached to the error event.
//
// If neither file qualifies, a SevError trace event is logged and the final ASSERT fails.
void findPhysicalLocation(loc_t loc, int* file, int64_t* page, const char* context) {
	if (context) {
		TraceEvent(SevInfo, " FindPhysicalLocation ", dbgid)
		    .detail(" Page0Valid ", firstPages(0).checkHash())
		    .detail(" Page0Seq ", firstPages(0).seq)
		    .detail(" Page1Valid ", firstPages(1).checkHash())
		    .detail(" Page1Seq ", firstPages(1).seq)
		    .detail(" Location ", loc)
		    .detail(" Context ", context)
		    .detail(" File0Name ", rawQueue->files[0].dbgFilename);
	}

	// Search from file 1 down to file 0 and stop at the first match.
	int match = -1;
	for (int candidate = 1; candidate >= 0 && match < 0; candidate--) {
		ASSERT_WE_THINK(firstPages(candidate).checkHash());
		if (firstPages(candidate).seq <= (size_t)loc) {
			match = candidate;
		}
	}

	bool ok = match >= 0;
	if (ok) {
		*file = match;
		*page = (loc - firstPages(match).seq) / sizeof(Page);
		if (context) {
			TraceEvent(" FoundPhysicalLocation ", dbgid)
			    .detail(" PageIndex ", match)
			    .detail(" PageLocation ", *page)
			    .detail(" SizeofPage ", sizeof(Page))
			    .detail(" PageSequence ", firstPages(match).seq)
			    .detail(" Location ", loc)
			    .detail(" Context ", context)
			    .detail(" File0Name ", rawQueue->files[0].dbgFilename);
		}
	} else {
		TraceEvent(SevError, " DiskQueueLocationError ", dbgid)
		    .detail(" Page0Valid ", firstPages(0).checkHash())
		    .detail(" Page0Seq ", firstPages(0).seq)
		    .detail(" Page1Valid ", firstPages(1).checkHash())
		    .detail(" Page1Seq ", firstPages(1).seq)
		    .detail(" Location ", loc)
		    .detail(" Context ", context ? context : " ")
		    .detail(" File0Name ", rawQueue->files[0].dbgFilename);
	}
	ASSERT(ok);
}
// isValid(firstPage) == compare(firstPage, firstPage)
// isValid(otherPage) == compare(firstPage, otherPage)
// Swap file1, file2 if comparePages( file2.firstPage, file1.firstPage )
static bool comparePages ( void * v1 , void * v2 ) {
Page * p1 = ( Page * ) v1 ; Page * p2 = ( Page * ) v2 ;
return p2 - > checkHash ( ) & & ( p2 - > seq > = p1 - > seq | | ! p1 - > checkHash ( ) ) ;
}
// Underlying two-file raw queue. This class reads its files[] and firstPages[] members and
// calls readFirstAndLastPages() and setStartPage() on it.
RawDiskQueue_TwoFiles * rawQueue ;
// Identifier attached to the TraceEvents emitted by this queue.
UID dbgid ;
2019-03-16 12:01:16 +08:00
// Queue version. initializeRecovery() takes its recoverAt argument into account only when this
// is >= DiskQueueVersion::V1.
DiskQueueVersion diskQueueVersion ;
2017-05-26 04:48:44 +08:00
2018-07-28 16:19:40 +08:00
bool anyPopped ; // True if pop() has been called since the most recent call to commit()
2017-05-26 04:48:44 +08:00
// NOTE(review): not referenced in the visible part of the file; presumably selects the severity
// of memory-usage warnings - confirm against its users.
bool warnAlwaysForMemory ;
// poppedSeq is seeded by initializeRecovery() from the `popped` field of the last valid page.
// NOTE(review): nextPageSeq is presumably the sequence of the next page to be pushed - confirm.
loc_t nextPageSeq , poppedSeq ;
loc_t lastPoppedSeq ; // poppedSeq the last time commit was called
// Reset to 0 by initializeRecovery() when the queue is found to be empty.
loc_t lastCommittedSeq ;
// Buffer of pushed pages that haven't been committed. The last one (backPage()) is still mutable.
// May be null: pushedPageCount() reports zero pages in that case.
StringBuffer * pushed_page_buffer ;
// Returns the last page in pushed_page_buffer (the page located just before the end of the
// buffer). Asserts that at least one page has been pushed.
Page& backPage() {
	ASSERT(pushedPageCount());
	Page* pastEnd = (Page*)pushed_page_buffer->ref().end();
	return pastEnd[-1];
}
// Const overload: returns the last page in pushed_page_buffer. Unlike the non-const overload it
// performs no assertion, so the caller must make sure at least one page has been pushed.
Page const& backPage() const {
	Page* pastEnd = (Page*)pushed_page_buffer->ref().end();
	return pastEnd[-1];
}
// Number of whole pages currently held in pushed_page_buffer; 0 when there is no buffer.
int pushedPageCount() const {
	if (!pushed_page_buffer) {
		return 0;
	}
	return pushed_page_buffer->size() / sizeof(Page);
}
// Recovery state
// `recovered` is set to true by initializeRecovery() when the queue turns out to be empty, and
// is the value returned by later initializeRecovery() calls.
bool recovered ;
2019-02-08 09:02:25 +08:00
// Set by initializeRecovery() once the first and last pages have been read; firstPages()
// asserts it before touching the raw queue's first pages.
bool initialized ;
2017-05-26 04:48:44 +08:00
// Logical location from which recovery reads continue; chosen by initializeRecovery().
loc_t nextReadLocation ;
// NOTE(review): presumably the arena owning the page currently being read - confirm.
Arena readBufArena ;
// NOTE(review): presumably points at the page currently being read - confirm.
Page * readBufPage ;
// Read offset relative to the end of the page header. initializeRecovery() computes it as
// nextReadLocation % sizeof(Page) - sizeof(PageHeader) and clamps negative values to 0.
int readBufPos ;
} ;
//A class wrapping DiskQueue which durably allows uncommitted data to be popped
//This works by performing two commits when uncommitted data is popped:
// Commit 1 - pop only previously committed data and push new data
// Commit 2 - finish pop into uncommitted data
class DiskQueue_PopUncommitted : public IDiskQueue {
public :
2019-03-16 12:01:16 +08:00
DiskQueue_PopUncommitted ( std : : string basename , std : : string fileExtension , UID dbgid , DiskQueueVersion diskQueueVersion , int64_t fileSizeWarningLimit ) : queue ( new DiskQueue ( basename , fileExtension , dbgid , diskQueueVersion , fileSizeWarningLimit ) ) , pushed ( 0 ) , popped ( 0 ) , committed ( 0 ) { } ;
2017-05-26 04:48:44 +08:00
//IClosable
Future < Void > getError ( ) { return queue - > getError ( ) ; }
Future < Void > onClosed ( ) { return queue - > onClosed ( ) ; }
void dispose ( ) { queue - > dispose ( ) ; delete this ; }
void close ( ) { queue - > close ( ) ; delete this ; }
//IDiskQueue
2019-03-16 12:01:22 +08:00
Future < bool > initializeRecovery ( location recoverAt ) { return queue - > initializeRecovery ( recoverAt ) ; }
2017-05-26 04:48:44 +08:00
Future < Standalone < StringRef > > readNext ( int bytes ) { return readNext ( this , bytes ) ; }
virtual location getNextReadLocation ( ) { return queue - > getNextReadLocation ( ) ; }
2019-03-16 12:01:18 +08:00
virtual Future < Standalone < StringRef > > read ( location start , location end , CheckHashes ch ) { return queue - > read ( start , end , ch ) ; }
2019-02-08 09:02:30 +08:00
virtual location getNextCommitLocation ( ) { return queue - > getNextCommitLocation ( ) ; }
virtual location getNextPushLocation ( ) { return queue - > getNextPushLocation ( ) ; }
2019-02-08 09:02:27 +08:00
2017-05-26 04:48:44 +08:00
virtual location push ( StringRef contents ) {
pushed = queue - > push ( contents ) ;
return pushed ;
}
virtual void pop ( location upTo ) {
popped = std : : max ( popped , upTo ) ;
ASSERT_WE_THINK ( committed > = popped ) ;
queue - > pop ( std : : min ( committed , popped ) ) ;
}
virtual int getCommitOverhead ( ) {
return queue - > getCommitOverhead ( ) + ( popped > committed ? queue - > getMaxPayload ( ) : 0 ) ;
}
Future < Void > commit ( ) {
location pushLocation = pushed ;
location popLocation = popped ;
Future < Void > commitFuture = queue - > commit ( ) ;
bool updatePop = popLocation > committed ;
committed = pushLocation ;
if ( updatePop ) {
ASSERT_WE_THINK ( false ) ;
ASSERT ( popLocation < = committed ) ;
queue - > stall ( ) ; // Don't permit this pipelined commit to write anything to disk until the previous commit is totally finished
pop ( popLocation ) ;
commitFuture = commitFuture & & queue - > commit ( ) ;
}
else
TEST ( true ) ; //No uncommitted data was popped
return commitFuture ;
}
virtual StorageBytes getStorageBytes ( ) { return queue - > getStorageBytes ( ) ; }
private :
DiskQueue * queue ;
location pushed ;
location popped ;
location committed ;
ACTOR static Future < Standalone < StringRef > > readNext ( DiskQueue_PopUncommitted * self , int bytes ) {
Standalone < StringRef > str = wait ( self - > queue - > readNext ( bytes ) ) ;
if ( str . size ( ) < bytes )
self - > pushed = self - > getNextReadLocation ( ) ;
return str ;
}
} ;
2019-03-16 12:01:16 +08:00
// Factory for disk queues: returns a newly allocated DiskQueue_PopUncommitted built from the
// given arguments.
IDiskQueue* openDiskQueue(std::string basename,
                          std::string ext,
                          UID dbgid,
                          DiskQueueVersion dqv,
                          int64_t fileSizeWarningLimit) {
	IDiskQueue* opened = new DiskQueue_PopUncommitted(basename, ext, dbgid, dqv, fileSizeWarningLimit);
	return opened;
}