/*
 * AsyncFileNonDurable.actor.h
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
# pragma once
// When actually compiled (NO_INTELLISENSE), include the generated version of this file. In intellisense use the source version.
# if defined(NO_INTELLISENSE) && !defined(FLOW_ASYNCFILENONDURABLE_ACTOR_G_H)
# define FLOW_ASYNCFILENONDURABLE_ACTOR_G_H
2018-10-20 01:30:13 +08:00
# include "fdbrpc/AsyncFileNonDurable.actor.g.h"
2017-05-26 04:48:44 +08:00
# elif !defined(FLOW_ASYNCFILENONDURABLE_ACTOR_H)
# define FLOW_ASYNCFILENONDURABLE_ACTOR_H
# include "flow/flow.h"
2018-10-20 01:30:13 +08:00
# include "fdbrpc/IAsyncFile.h"
2017-05-26 04:48:44 +08:00
# include "flow/ActorCollection.h"
2018-10-20 01:30:13 +08:00
# include "fdbrpc/simulator.h"
# include "fdbrpc/TraceFileIO.h"
# include "fdbrpc/RangeMap.h"
2018-08-11 06:47:41 +08:00
# include "flow/actorcompiler.h" // This must be the last #include.
2017-05-26 04:48:44 +08:00
# undef max
# undef min
2019-06-25 17:47:35 +08:00
//Forward declarations; the definitions are not in this header.
//In this file the returned futures are added to AsyncFileNonDurable::reponses by the write and truncate actors.
//NOTE(review): judging by the names, these presumably fulfil `promise` (with a value, or with error `e`)
//on `process` at priority `taskID` -- confirm against the definitions.
ACTOR Future < Void > sendOnProcess ( ISimulator : : ProcessInfo * process , Promise < Void > promise , TaskPriority taskID ) ;
ACTOR Future < Void > sendErrorOnProcess ( ISimulator : : ProcessInfo * process , Promise < Void > promise , Error e , TaskPriority taskID ) ;
2017-05-26 04:48:44 +08:00
//Forwards the result of `in`, unless the shutdown signal of the current simulated process
//fires first, in which case an injected io_error is thrown instead.
ACTOR template <class T>
Future<T> sendErrorOnShutdown(Future<T> in) {
    choose {
        when(wait(success(g_simulator.getCurrentProcess()->shutdownSignal.getFuture()))) {
            //Shutdown was signalled before the wrapped operation produced a result
            throw io_error().asInjectedFault();
        }
        when(T result = wait(in)) {
            return result;
        }
    }
}
//Wraps another IAsyncFile and releases its reference to it once the shutdown signal of the process
//that was current at construction time fires. I/O operations issued after the file was released, or
//while the current process has signalled shutdown, fail with an injected io_error.
class AsyncFileDetachable sealed : public IAsyncFile, public ReferenceCounted<AsyncFileDetachable> {
private:
    //The wrapped file; replaced by an empty reference in doShutdown
    Reference<IAsyncFile> file;
    //Holds the future returned by doShutdown
    Future<Void> shutdown;

    //True when the wrapped file has been released or the current process has signalled shutdown
    bool isDetached() {
        if (!file.getPtr())
            return true;
        return g_simulator.getCurrentProcess()->shutdownSignal.getFuture().isReady();
    }

public:
    explicit AsyncFileDetachable(Reference<IAsyncFile> file) : file(file) {
        shutdown = doShutdown(this);
    }

    //Waits for the current process's shutdown signal, then drops the reference to the wrapped file
    ACTOR Future<Void> doShutdown(AsyncFileDetachable* self) {
        wait(success(g_simulator.getCurrentProcess()->shutdownSignal.getFuture()));
        self->file = Reference<IAsyncFile>();
        return Void();
    }

    //Wraps the file produced by wrappedFile in an AsyncFileDetachable.
    //Throws an injected io_error if the current process signals shutdown before the file is available.
    ACTOR static Future<Reference<IAsyncFile>> open(Future<Reference<IAsyncFile>> wrappedFile) {
        choose {
            when(wait(success(g_simulator.getCurrentProcess()->shutdownSignal.getFuture()))) {
                throw io_error().asInjectedFault();
            }
            when(Reference<IAsyncFile> opened = wait(wrappedFile)) {
                return Reference<AsyncFileDetachable>(new AsyncFileDetachable(opened));
            }
        }
    }

    virtual void addref() {
        ReferenceCounted<AsyncFileDetachable>::addref();
    }

    virtual void delref() {
        ReferenceCounted<AsyncFileDetachable>::delref();
    }

    //Each operation below is forwarded to the wrapped file through sendErrorOnShutdown,
    //or fails immediately with an injected io_error when isDetached() is true.

    Future<int> read(void* data, int length, int64_t offset) {
        if (isDetached())
            return io_error().asInjectedFault();
        return sendErrorOnShutdown(file->read(data, length, offset));
    }

    Future<Void> write(void const* data, int length, int64_t offset) {
        if (isDetached())
            return io_error().asInjectedFault();
        return sendErrorOnShutdown(file->write(data, length, offset));
    }

    Future<Void> truncate(int64_t size) {
        if (isDetached())
            return io_error().asInjectedFault();
        return sendErrorOnShutdown(file->truncate(size));
    }

    Future<Void> sync() {
        if (isDetached())
            return io_error().asInjectedFault();
        return sendErrorOnShutdown(file->sync());
    }

    Future<int64_t> size() {
        if (isDetached())
            return io_error().asInjectedFault();
        return sendErrorOnShutdown(file->size());
    }

    //The two accessors below only check that the wrapped file is still held (not the shutdown signal)
    //and throw, rather than return, the injected io_error.

    int64_t debugFD() {
        if (!file.getPtr())
            throw io_error().asInjectedFault();
        return file->debugFD();
    }

    std::string getFilename() {
        if (!file.getPtr())
            throw io_error().asInjectedFault();
        return file->getFilename();
    }
};
//An async file implementation which wraps another async file and will randomly destroy sectors that it is writing when killed
//This is used to simulate a power failure which prevents all written data from being persisted to disk
class AsyncFileNonDurable sealed : public IAsyncFile , public ReferenceCounted < AsyncFileNonDurable > {
public :
UID id ;
std : : string filename ;
//An approximation of the size of the file; .size() should be used instead of this variable in most cases
int64_t approximateSize ;
//The address of the machine that opened the file
NetworkAddress openedAddress ;
private :
//The wrapped IAsyncFile
Reference < IAsyncFile > file ;
//The maximum amount of time a write is delayed before being passed along to the underlying file
double maxWriteDelay ;
//Modifications which haven't been pushed to file, mapped by the location in the file that is being modified
RangeMap < uint64_t , Future < Void > > pendingModifications ;
//Will be blocked whenever kill is running
Promise < Void > killed ;
Promise < Void > killComplete ;
//Used by sync (and kill) to force writes which have not yet been passed along.
//If true is sent, then writes will be durable. If false, then they may not be durable.
Promise < bool > startSyncPromise ;
//The performance parameters of the simulated disk
Reference < DiskParameters > diskParameters ;
//Set to true the first time sync is called on the file
bool hasBeenSynced ;
//Used to describe what corruption is allowed by the file as well as the type of corruption being used on a particular page
enum KillMode { NO_CORRUPTION = 0 , DROP_ONLY = 1 , FULL_CORRUPTION = 2 } ;
//Limits what types of corruption are applied to writes from this file
KillMode killMode ;
ActorCollection reponses ; //cannot call getResult on this actor collection, since the actors will be on different processes
AsyncFileNonDurable ( const std : : string & filename , Reference < IAsyncFile > file , Reference < DiskParameters > diskParameters , NetworkAddress openedAddress )
: openedAddress ( openedAddress ) , pendingModifications ( uint64_t ( - 1 ) ) , approximateSize ( 0 ) , reponses ( false ) {
//This is only designed to work in simulation
ASSERT ( g_network - > isSimulated ( ) ) ;
2019-05-11 05:01:52 +08:00
this - > id = deterministicRandom ( ) - > randomUniqueID ( ) ;
2017-05-26 04:48:44 +08:00
//TraceEvent("AsyncFileNonDurable_Create", id).detail("Filename", filename);
this - > file = file ;
this - > filename = filename ;
this - > diskParameters = diskParameters ;
maxWriteDelay = 5.0 ;
hasBeenSynced = false ;
2019-05-11 05:01:52 +08:00
killMode = ( KillMode ) deterministicRandom ( ) - > randomInt ( 1 , 3 ) ;
2017-05-26 04:48:44 +08:00
//TraceEvent("AsyncFileNonDurable_CreateEnd", id).detail("Filename", filename).backtrace();
}
public :
static std : : map < std : : string , Future < Void > > filesBeingDeleted ;
//Creates a new AsyncFileNonDurable which wraps the file produced by wrappedFile.
//  filename       - name given to the wrapper and used for the filesBeingDeleted lookup
//  actualFilename - key erased from the machine's openFiles on failure when wrappedFile has no usable result
//  wrappedFile    - future for the underlying file
//  diskParameters - passed through to the AsyncFileNonDurable constructor
//Throws an injected io_error if the calling process's shutdown signal fires while opening. Results and
//errors are delivered only after g_simulator.onProcess(currentProcess, currentTaskID) has been awaited.
ACTOR static Future<Reference<IAsyncFile>> open(std::string filename, std::string actualFilename, Future<Reference<IAsyncFile>> wrappedFile, Reference<DiskParameters> diskParameters) {
    state ISimulator::ProcessInfo* currentProcess = g_simulator.getCurrentProcess();
    state TaskPriority currentTaskID = g_network->getCurrentTask();
    state Future<Void> processShutdown = success(currentProcess->shutdownSignal.getFuture());

    //TraceEvent("AsyncFileNonDurableOpenBegin").detail("Filename", filename).detail("Addr", g_simulator.getCurrentProcess()->address);
    wait(g_simulator.onMachine(currentProcess));

    try {
        wait(success(wrappedFile) || processShutdown);
        if (processShutdown.isReady())
            throw io_error().asInjectedFault();

        state Reference<IAsyncFile> file = wrappedFile.get();

        //If we are in the process of deleting a file, we can't let someone else modify it at the same time.
        //We therefore block the creation of new files until deletion is complete
        state std::map<std::string, Future<Void>>::iterator pendingDelete = filesBeingDeleted.find(filename);
        if (pendingDelete != filesBeingDeleted.end()) {
            //TraceEvent("AsyncFileNonDurableOpenWaitOnDelete1").detail("Filename", filename);
            wait(pendingDelete->second || processShutdown);
            //TraceEvent("AsyncFileNonDurableOpenWaitOnDelete2").detail("Filename", filename);
            if (processShutdown.isReady())
                throw io_error().asInjectedFault();
        }

        state Reference<AsyncFileNonDurable> wrapper(new AsyncFileNonDurable(filename, file, diskParameters, currentProcess->address));

        //Causes the approximateSize member to be set
        state Future<int64_t> initialSize = wrapper->size();
        wait(success(initialSize) || processShutdown);
        if (processShutdown.isReady())
            throw io_error().asInjectedFault();

        //TraceEvent("AsyncFileNonDurableOpenComplete").detail("Filename", filename);
        wait(g_simulator.onProcess(currentProcess, currentTaskID));
        return wrapper;
    } catch (Error& e) {
        state Error savedError = e;

        //Use the name reported by the opened file when there is one, otherwise fall back to actualFilename
        std::string currentFilename;
        if (wrappedFile.isReady() && !wrappedFile.isError())
            currentFilename = wrappedFile.get()->getFilename();
        else
            currentFilename = actualFilename;
        currentProcess->machine->openFiles.erase(currentFilename);

        //TraceEvent("AsyncFileNonDurableOpenError").error(e, true).detail("Filename", filename).detail("Address", currentProcess->address).detail("Addr", g_simulator.getCurrentProcess()->address);
        wait(g_simulator.onProcess(currentProcess, currentTaskID));
        throw savedError;
    }
}
//Nothing to do explicitly; only a (disabled) trace event lives here
~AsyncFileNonDurable() {
    //TraceEvent("AsyncFileNonDurable_Destroy", id).detail("Filename", filename);
}
//Forwards to the ReferenceCounted base so the reference count of this class is the one being used
virtual void addref() {
    ReferenceCounted<AsyncFileNonDurable>::addref();
}
virtual void delref ( ) {
if ( delref_no_destroy ( ) ) {
ASSERT ( filesBeingDeleted . count ( filename ) = = 0 ) ;
//TraceEvent("AsyncFileNonDurable_StartDelete", id).detail("Filename", filename);
Future < Void > deleteFuture = deleteFile ( this ) ;
if ( ! deleteFuture . isReady ( ) )
filesBeingDeleted [ filename ] = deleteFuture ;
}
}
//Public read entry point; delegates to the read actor, which waits for any outstanding
//modifications that could affect the result before reading the underlying file
Future<int> read(void* data, int length, int64_t offset) {
    return read(this, data, length, offset);
}
//Writes data to the file. The write actor delays the write a random amount of time before
//passing it to the underlying file.
//A zero-length write is logged as a warning and completes immediately.
//The returned future is the one belonging to the "started" promise handed to the write actor,
//not the future of the write actor itself.
Future<Void> write(void const* data, int length, int64_t offset) {
    //TraceEvent("AsyncFileNonDurable_Write", id).detail("Filename", filename).detail("Offset", offset).detail("Length", length);
    if (length == 0) {
        TraceEvent(SevWarnAlways, " AsyncFileNonDurable_EmptyModification ", id).detail(" Filename ", filename);
        return Void();
    }

    debugFileSet(" AsyncFileNonDurableWrite ", filename, data, offset, length);

    //The actor receives a future that is fulfilled with the actor's own future right after it is created
    Promise<Void> accepted;
    Promise<Future<Void>> selfHandle;
    selfHandle.send(write(this, accepted, selfHandle.getFuture(), data, length, offset));
    return accepted.getFuture();
}
//Truncates the file. The truncate actor delays the truncate a random amount of time before
//passing it to the underlying file.
//The returned future is the one belonging to the "started" promise handed to the truncate actor,
//not the future of the truncate actor itself.
Future<Void> truncate(int64_t size) {
    //TraceEvent("AsyncFileNonDurable_Truncate", id).detail("Filename", filename).detail("Offset", size);
    debugFileTruncate(" AsyncFileNonDurableTruncate ", filename, size);

    //The actor receives a future that is fulfilled with the actor's own future right after it is created
    Promise<Void> accepted;
    Promise<Future<Void>> selfHandle;
    selfHandle.send(truncate(this, accepted, selfHandle.getFuture(), size));
    return accepted.getFuture();
}
//Fsyncs the file: runs a durable sync, which lets all delayed modifications complete before
//the underlying file is synced. The future is also registered in the reponses collection.
Future<Void> sync() {
    //TraceEvent("AsyncFileNonDurable_Sync", id).detail("Filename", filename);
    Future<Void> durableSync = sync(this, true);
    reponses.add(durableSync);
    return durableSync;
}
//Public size entry point; delegates to size(this), which passes the request along to the
//underlying file and augments it with any writes past the end of the file
Future<int64_t> size() {
    return size(this);
}
//Returns the wrapped file's debugFD() unchanged
int64_t debugFD() {
    return file->debugFD();
}
std : : string getFilename ( ) {
return file - > getFilename ( ) ;
}
//Forces a non-durable sync (some writes are not made or are made incorrectly).
//Used when the file should 'die' without first completing its operations,
//e.g. to simulate a power failure.
Future<Void> kill() {
    TraceEvent(" AsyncFileNonDurable_Kill ", id).detail(" Filename ", filename);
    TEST(true); //AsyncFileNonDurable was killed
    const bool durable = false;
    return sync(this, durable);
}
private :
//Returns a future that is used to ensure the waiter ends up on the main thread.
//The matching promise is handed to g_network->onMainThread together with the current task priority.
Future<Void> returnToMainThread() {
    Promise<Void> signal;
    Future<Void> onMain = signal.getFuture();
    g_network->onMainThread(std::move(signal), g_network->getCurrentTask());
    return onMain;
}
//Gets existing modifications that overlap the specified range. Optionally inserts a new modification into the map
std : : vector < Future < Void > > getModificationsAndInsert ( int64_t offset , int64_t length , bool insertModification = false , Future < Void > value = Void ( ) ) {
auto modification = RangeMapRange < uint64_t > ( offset , length > = 0 ? offset + length : uint64_t ( - 1 ) ) ;
auto priorModifications = pendingModifications . intersectingRanges ( modification ) ;
//Aggregate existing modifications in this range
std : : vector < Future < Void > > modificationFutures ;
for ( auto itr = priorModifications . begin ( ) ; itr ! = priorModifications . end ( ) ; + + itr ) {
if ( itr . value ( ) . isValid ( ) & & ( ! itr . value ( ) . isReady ( ) | | itr . value ( ) . isError ( ) ) ) {
modificationFutures . push_back ( itr . value ( ) ) ;
}
}
//Add the modification if we are doing a write or truncate
if ( insertModification )
pendingModifications . insert ( modification , value ) ;
return modificationFutures ;
}
//Checks if the file is killed. If it is not, returns immediately. If it is, waits for the
//kill to complete (killComplete) and then throws an injected io_error.
//  context - label reported in the trace event to identify the interrupted operation
ACTOR Future<Void> checkKilled(AsyncFileNonDurable* self, std::string context) {
    if (!self->killed.isSet())
        return Void();

    //TraceEvent("AsyncFileNonDurable_KilledInCheck", self->id).detail("In", context).detail("Filename", self->filename);
    wait(self->killComplete.getFuture());
    TraceEvent(" AsyncFileNonDurable_KilledFileOperation ", self->id).detail(" In ", context).detail(" Filename ", self->filename);
    TEST(true); // AsyncFileNonDurable operation killed
    throw io_error().asInjectedFault();
}
//Passes reads straight to the underlying file after waiting for any outstanding modifications
//that overlap the requested range. Throws (via checkKilled) if the file was killed before the
//read started or while it was in flight. Returns the value produced by the underlying read.
ACTOR Future<int> onRead(AsyncFileNonDurable* self, void* data, int length, int64_t offset) {
    wait(self->checkKilled(self, " Read "));

    vector<Future<Void>> overlapping = self->getModificationsAndInsert(offset, length);
    wait(waitForAll(overlapping));

    state Future<int> pendingRead = self->file->read(data, length, offset);
    wait(success(pendingRead) || self->killed.getFuture());

    // throws if we were killed
    wait(self->checkKilled(self, " ReadEnd "));

    debugFileCheck(" AsyncFileNonDurableRead ", self->filename, data, offset, length);

    //if(g_simulator.getCurrentProcess()->rebooting)
        //TraceEvent("AsyncFileNonDurable_ReadEnd", self->id).detail("Filename", self->filename);

    return pendingRead.get();
}
//Runs onRead after awaiting g_simulator.onMachine for the calling process, then awaits
//g_simulator.onProcess (same process, same task priority) before returning the result or
//rethrowing the error.
ACTOR Future<int> read(AsyncFileNonDurable* self, void* data, int length, int64_t offset) {
    state ISimulator::ProcessInfo* currentProcess = g_simulator.getCurrentProcess();
    state TaskPriority currentTaskID = g_network->getCurrentTask();
    wait(g_simulator.onMachine(currentProcess));

    try {
        state int bytesRead = wait(self->onRead(self, data, length, offset));
        wait(g_simulator.onProcess(currentProcess, currentTaskID));
        return bytesRead;
    } catch (Error& e) {
        //Hold on to the error across the wait; it is rethrown once we are back on the caller's process
        state Error savedError = e;
        wait(g_simulator.onProcess(currentProcess, currentTaskID));
        throw savedError;
    }
}
//Delays writes a random amount of time before passing them through to the underlying file.
//If a kill interrupts the delay, then the output could be the correct write, part of the write,
//or none of the write. It may also corrupt parts of sectors which have not been written correctly
//  self         - the file being written
//  writeStarted - fulfilled (through sendOnProcess / sendErrorOnProcess) once prior overlapping modifications
//                 and the disk-ready wait have finished, or with the error that interrupted them
//  ownFuture    - yields this actor's own future, which is stored in pendingModifications for the written range
//  data, length, offset - the bytes to write and where; offset and length must be multiples of 4096 (asserted below)
ACTOR Future < Void > write ( AsyncFileNonDurable * self , Promise < Void > writeStarted , Future < Future < Void > > ownFuture , void const * data , int length , int64_t offset ) {
state ISimulator : : ProcessInfo * currentProcess = g_simulator . getCurrentProcess ( ) ;
2019-06-25 17:47:35 +08:00
state TaskPriority currentTaskID = g_network - > getCurrentTask ( ) ;
2018-08-11 04:57:10 +08:00
wait ( g_simulator . onMachine ( currentProcess ) ) ;
2017-05-26 04:48:44 +08:00
2019-05-11 05:01:52 +08:00
//The delay is drawn up front, between 0 and maxWriteDelay
state double delayDuration = deterministicRandom ( ) - > random01 ( ) * self - > maxWriteDelay ;
2017-05-26 04:48:44 +08:00
//All later file writes (including the garbage injection) operate on dataCopy rather than on `data`
state Standalone < StringRef > dataCopy ( StringRef ( ( uint8_t * ) data , length ) ) ;
//Captured before any sync can replace startSyncPromise, so this write is woken by the next sync/kill
state Future < bool > startSyncFuture = self - > startSyncPromise . getFuture ( ) ;
try {
//TraceEvent("AsyncFileNonDurable_Write", self->id).detail("Delay", delayDuration).detail("Filename", self->filename).detail("WriteLength", length).detail("Offset", offset);
2018-08-11 04:57:10 +08:00
wait ( self - > checkKilled ( self , " Write " ) ) ;
2017-05-26 04:48:44 +08:00
Future < Void > writeEnded = wait ( ownFuture ) ;
//Registers this write for [offset, offset+length) and collects the modifications it must wait behind
std : : vector < Future < Void > > priorModifications = self - > getModificationsAndInsert ( offset , length , true , writeEnded ) ;
if ( BUGGIFY_WITH_PROB ( 0.001 ) )
2019-05-11 05:01:52 +08:00
priorModifications . push_back ( delay ( deterministicRandom ( ) - > random01 ( ) * FLOW_KNOBS - > MAX_PRIOR_MODIFICATION_DELAY ) | | self - > killed . getFuture ( ) ) ;
2017-05-26 04:48:44 +08:00
else
priorModifications . push_back ( waitUntilDiskReady ( self - > diskParameters , length ) | | self - > killed . getFuture ( ) ) ;
2018-08-11 04:57:10 +08:00
wait ( waitForAll ( priorModifications ) ) ;
2017-05-26 04:48:44 +08:00
self - > approximateSize = std : : max ( self - > approximateSize , length + offset ) ;
//From the caller's point of view the write has started; the rest of this actor runs after that signal
self - > reponses . add ( sendOnProcess ( currentProcess , writeStarted , currentTaskID ) ) ;
}
catch ( Error & e ) {
//Report the failure through writeStarted as well, then propagate it as this actor's own result
self - > reponses . add ( sendErrorOnProcess ( currentProcess , writeStarted , e , currentTaskID ) ) ;
throw ;
}
//TraceEvent("AsyncFileNonDurable_WriteDoneWithPreviousMods", self->id).detail("Delay", delayDuration).detail("Filename", self->filename).detail("WriteLength", length).detail("Offset", offset);
//Wait a random amount of time or until a sync/kill is issued
state bool saveDurable = true ;
choose {
2018-08-11 04:57:10 +08:00
when ( wait ( delay ( delayDuration ) ) ) { }
2017-05-26 04:48:44 +08:00
when ( bool durable = wait ( startSyncFuture ) ) {
saveDurable = durable ;
}
}
debugFileCheck ( " AsyncFileNonDurableWriteAfterWait " , self - > filename , dataCopy . begin ( ) , offset , length ) ;
//Only page-aligned writes are supported
ASSERT ( offset % 4096 = = 0 & & length % 4096 = = 0 ) ;
//Non-durable writes should introduce errors at the page level and corrupt at the sector level
//Otherwise, we can perform the entire write at once
//(durable: both loops below run exactly once over the whole write; non-durable: 4096-byte pages of 512-byte sectors)
int pageLength = saveDurable ? length : 4096 ;
int sectorLength = saveDurable ? length : 512 ;
vector < Future < Void > > writeFutures ;
for ( int writeOffset = 0 ; writeOffset < length ; writeOffset + = pageLength ) {
//choose a random action to perform on this page write (write correctly, corrupt, or don't write)
//The draw is bounded by the file's own killMode, so a page is never treated more harshly than the file allows
2019-05-11 05:01:52 +08:00
KillMode pageKillMode = ( KillMode ) deterministicRandom ( ) - > randomInt ( 0 , self - > killMode + 1 ) ;
2017-05-26 04:48:44 +08:00
for ( int pageOffset = 0 ; pageOffset < pageLength ; pageOffset + = sectorLength ) {
//If saving durable, then perform the write correctly. Otherwise, perform the write correctly with a probability of 1/3.
//If corrupting the write, then this sector will be written correctly with a 1/4 chance
2019-05-11 05:01:52 +08:00
if ( saveDurable | | pageKillMode = = NO_CORRUPTION | | ( pageKillMode = = FULL_CORRUPTION & & deterministicRandom ( ) - > random01 ( ) < 0.25 ) ) {
2017-05-26 04:48:44 +08:00
//if (!saveDurable) TraceEvent(SevInfo, "AsyncFileNonDurableWrite", self->id).detail("Filename", self->filename).detail("Offset", offset+writeOffset+pageOffset).detail("Length", sectorLength);
writeFutures . push_back ( self - > file - > write ( dataCopy . begin ( ) + writeOffset + pageOffset , sectorLength , offset + writeOffset + pageOffset ) ) ;
}
//If the write is not durable, then the write will either be corrupted or not written at all. If corrupted, there is 1/4 chance that a given
//sector will not be written
2019-05-11 05:01:52 +08:00
else if ( pageKillMode = = FULL_CORRUPTION & & deterministicRandom ( ) - > random01 ( ) < 0.66667 ) {
2017-05-26 04:48:44 +08:00
//The incorrect part of the write can be the rightmost bytes (side = 0), the leftmost bytes (side = 1), or the entire write (side = 2)
2019-05-11 05:01:52 +08:00
int side = deterministicRandom ( ) - > randomInt ( 0 , 3 ) ;
2017-05-26 04:48:44 +08:00
//There is a 1/2 chance that a bad write will have garbage written into its bad portion
//The chance is increased to 1 if the entire write is bad
2019-05-11 05:01:52 +08:00
bool garbage = side = = 2 | | deterministicRandom ( ) - > random01 ( ) < 0.5 ;
2017-05-26 04:48:44 +08:00
//[goodStart, goodEnd) and [badStart, badEnd) are byte ranges relative to the start of this sector
int64_t goodStart = 0 ;
int64_t goodEnd = sectorLength ;
int64_t badStart = 0 ;
int64_t badEnd = sectorLength ;
if ( side = = 0 ) {
2019-05-11 05:01:52 +08:00
goodEnd = deterministicRandom ( ) - > randomInt ( 0 , sectorLength ) ;
2017-05-26 04:48:44 +08:00
badStart = goodEnd ;
}
else if ( side = = 1 ) {
2019-05-11 05:01:52 +08:00
badEnd = deterministicRandom ( ) - > randomInt ( 0 , sectorLength ) ;
2017-05-26 04:48:44 +08:00
goodStart = badEnd ;
}
else
goodEnd = 0 ;
//Write randomly generated bytes, if required
if ( garbage & & badStart ! = badEnd ) {
//The garbage overwrites the bad range inside dataCopy in place, 4 random bytes at a time (fewer for the tail)
uint8_t * badData = const_cast < uint8_t * > ( & dataCopy . begin ( ) [ badStart + writeOffset + pageOffset ] ) ;
for ( int i = 0 ; i < badEnd - badStart ; i + = sizeof ( uint32_t ) ) {
2019-05-11 05:01:52 +08:00
uint32_t val = deterministicRandom ( ) - > randomUInt32 ( ) ;
2017-05-26 04:48:44 +08:00
memcpy ( & badData [ i ] , & val , std : : min ( badEnd - badStart - i , ( int64_t ) sizeof ( uint32_t ) ) ) ;
}
//The whole sector (good bytes plus the garbage) is then written to the underlying file
writeFutures . push_back ( self - > file - > write ( dataCopy . begin ( ) + writeOffset + pageOffset , sectorLength , offset + writeOffset + pageOffset ) ) ;
debugFileSet ( " AsyncFileNonDurableBadWrite " , self - > filename , dataCopy . begin ( ) + writeOffset + pageOffset , offset + writeOffset + pageOffset , sectorLength ) ;
}
//Without garbage only the good part of the sector is written; the bad part is left untouched on disk
else if ( goodStart ! = goodEnd )
writeFutures . push_back ( self - > file - > write ( dataCopy . begin ( ) + goodStart + writeOffset + pageOffset , goodEnd - goodStart , goodStart + offset + writeOffset + pageOffset ) ) ;
TraceEvent ( " AsyncFileNonDurable_BadWrite " , self - > id ) . detail ( " Offset " , offset + writeOffset + pageOffset ) . detail ( " Length " , sectorLength ) . detail ( " GoodStart " , goodStart ) . detail ( " GoodEnd " , goodEnd ) . detail ( " HasGarbage " , garbage ) . detail ( " Side " , side ) . detail ( " Filename " , self - > filename ) ;
TEST ( true ) ; //AsyncFileNonDurable bad write
}
else {
//Dropped sector: nothing is sent to the underlying file
TraceEvent ( " AsyncFileNonDurable_DroppedWrite " , self - > id ) . detail ( " Offset " , offset + writeOffset + pageOffset ) . detail ( " Length " , sectorLength ) . detail ( " Filename " , self - > filename ) ;
TEST ( true ) ; //AsyncFileNonDurable dropped write
}
}
}
2018-08-11 04:57:10 +08:00
wait ( waitForAll ( writeFutures ) ) ;
2017-05-26 04:48:44 +08:00
//TraceEvent("AsyncFileNonDurable_WriteDone", self->id).detail("Delay", delayDuration).detail("Filename", self->filename).detail("WriteLength", length).detail("Offset", offset);
return Void ( ) ;
}
//Delays truncates a random amount of time before passing them through to the underlying file.
//If a kill interrupts the delay, then the truncate may or may not be performed.
//  self            - the file being truncated
//  truncateStarted - fulfilled (through sendOnProcess / sendErrorOnProcess) once prior overlapping
//                    modifications and the disk-ready wait have finished, or with the error that interrupted them
//  ownFuture       - yields this actor's own future, which is stored in pendingModifications
//                    for the range starting at `size`
//  size            - the new file size
ACTOR Future<Void> truncate(AsyncFileNonDurable* self, Promise<Void> truncateStarted, Future<Future<Void>> ownFuture, int64_t size) {
    state ISimulator::ProcessInfo* currentProcess = g_simulator.getCurrentProcess();
    state TaskPriority currentTaskID = g_network->getCurrentTask();
    wait(g_simulator.onMachine(currentProcess));

    //Drawn up front, between 0 and maxWriteDelay
    state double holdTime = deterministicRandom()->random01() * self->maxWriteDelay;
    state Future<bool> flushSignal = self->startSyncPromise.getFuture();

    try {
        //TraceEvent("AsyncFileNonDurable_Truncate", self->id).detail("Delay", delayDuration).detail("Filename", self->filename);
        wait(self->checkKilled(self, " Truncate "));

        Future<Void> truncateEnded = wait(ownFuture);

        //A length of -1 registers the truncate for everything from `size` onwards
        std::vector<Future<Void>> blockers = self->getModificationsAndInsert(size, -1, true, truncateEnded);

        Future<Void> diskGate;
        if (BUGGIFY_WITH_PROB(0.001)) {
            diskGate = delay(deterministicRandom()->random01() * FLOW_KNOBS->MAX_PRIOR_MODIFICATION_DELAY) || self->killed.getFuture();
        } else {
            diskGate = waitUntilDiskReady(self->diskParameters, 0) || self->killed.getFuture();
        }
        blockers.push_back(diskGate);

        wait(waitForAll(blockers));

        self->approximateSize = size;
        self->reponses.add(sendOnProcess(currentProcess, truncateStarted, currentTaskID));
    } catch (Error& e) {
        //Report the failure through truncateStarted as well, then propagate it as this actor's own result
        self->reponses.add(sendErrorOnProcess(currentProcess, truncateStarted, e, currentTaskID));
        throw;
    }

    //Wait a random amount of time or until a sync/kill is issued
    state bool applyDurably = true;
    choose {
        when(wait(delay(holdTime))) {}
        when(bool durable = wait(flushSignal)) {
            applyDurably = durable;
        }
    }

    if (g_network->check_yield(TaskPriority::DefaultYield)) {
        wait(delay(0, TaskPriority::DefaultYield));
    }

    //If performing a durable truncate, then pass it through to the file. Otherwise, pass it through with a 1/2 chance
    if (applyDurably || self->killMode == NO_CORRUPTION || deterministicRandom()->random01() < 0.5) {
        wait(self->file->truncate(size));
    } else {
        TraceEvent(" AsyncFileNonDurable_DroppedTruncate ", self->id).detail(" Size ", size);
        TEST(true); //AsyncFileNonDurable dropped truncate
    }

    return Void();
}
//Waits for delayed modifications to the file to complete and then syncs the underlying file
//If durable is false, then some of the delayed modifications will not be applied or will be
//applied incorrectly
//  self    - the file being synced or killed
//  durable - true for a regular sync; false when called from kill()
//A durable sync throws (through checkKilled) if the file is killed before or during the sync.
ACTOR Future < Void > onSync ( AsyncFileNonDurable * self , bool durable ) {
//TraceEvent("AsyncFileNonDurable_ImplSync", self->id).detail("Filename", self->filename).detail("Durable", durable);
ASSERT ( durable | | ! self - > killed . isSet ( ) ) ; // this file is kill()ed only once
if ( durable ) {
self - > hasBeenSynced = true ;
2018-08-11 04:57:10 +08:00
wait ( waitUntilDiskReady ( self - > diskParameters , 0 , true ) | | self - > killed . getFuture ( ) ) ;
2017-05-26 04:48:44 +08:00
}
2018-08-11 04:57:10 +08:00
wait ( self - > checkKilled ( self , durable ? " Sync " : " Kill " ) ) ;
2017-05-26 04:48:44 +08:00
//A kill marks the file as killed before touching any pending modification
if ( ! durable )
self - > killed . send ( Void ( ) ) ;
//Get all outstanding modifications
//(valid futures that are either still pending or finished with an error; only the pending ones keep their range)
std : : vector < Future < Void > > outstandingModifications ;
std : : vector < RangeMapRange < uint64_t > > stillPendingModifications ;
auto rangeItr = self - > pendingModifications . ranges ( ) ;
for ( auto itr = rangeItr . begin ( ) ; itr ! = rangeItr . end ( ) ; + + itr ) {
if ( itr . value ( ) . isValid ( ) & & ( ! itr - > value ( ) . isReady ( ) | | itr - > value ( ) . isError ( ) ) ) {
outstandingModifications . push_back ( itr - > value ( ) ) ;
if ( ! itr . value ( ) . isReady ( ) )
stillPendingModifications . push_back ( itr - > range ( ) ) ;
}
}
Future < Void > allModifications = waitForAll ( outstandingModifications ) ;
//Clear out the pending modifications map of all completed modifications
//(the whole key space is reset first, then the still-pending ranges are re-inserted pointing at the combined future)
self - > pendingModifications . insert ( RangeMapRange < uint64_t > ( 0 , - 1 ) , Void ( ) ) ;
for ( auto itr = stillPendingModifications . begin ( ) ; itr ! = stillPendingModifications . end ( ) ; + + itr )
self - > pendingModifications . insert ( * itr , success ( allModifications ) ) ; //waitForAll cannot wait on the same future more than once, so wrap the future with success
//Signal all modifications to end their delay and reset the startSyncPromise
//(the old promise is copied to a local first, so modifications started after this point wait on the fresh promise)
Promise < bool > startSyncPromise = self - > startSyncPromise ;
self - > startSyncPromise = Promise < bool > ( ) ;
//Writes will be durable in a kill with a 10% probability
2019-05-11 05:01:52 +08:00
state bool writeDurable = durable | | deterministicRandom ( ) - > random01 ( ) < 0.1 ;
2017-05-26 04:48:44 +08:00
startSyncPromise . send ( writeDurable ) ;
//Wait for outstanding writes to complete
//(a durable sync propagates their errors; a kill wraps the wait in errorOr and ignores the outcome)
if ( durable )
2018-08-11 04:57:10 +08:00
wait ( allModifications ) ;
2017-05-26 04:48:44 +08:00
else
2019-02-13 08:07:17 +08:00
wait ( success ( errorOr ( allModifications ) ) ) ;
2017-05-26 04:48:44 +08:00
if ( ! durable ) {
//Sometimes sync the file if writes were made durably. Before a file is first synced, it is stored in a temporary file and then renamed to the correct
//location once sync is called. By not calling sync, we simulate a failure to fsync the directory storing the file
2019-05-11 05:01:52 +08:00
if ( self - > hasBeenSynced & & writeDurable & & deterministicRandom ( ) - > random01 ( ) < 0.5 ) {
2017-05-26 04:48:44 +08:00
TEST ( true ) ; //AsyncFileNonDurable kill was durable and synced
2019-02-13 08:07:17 +08:00
wait ( success ( errorOr ( self - > file - > sync ( ) ) ) ) ;
2017-05-26 04:48:44 +08:00
}
//Setting this promise could trigger the deletion of the AsyncFileNonDurable; after this none of its members should be used
//TraceEvent("AsyncFileNonDurable_ImplSyncEnd", self->id).detail("Filename", self->filename).detail("Durable", durable);
self - > killComplete . send ( Void ( ) ) ;
}
//A killed file cannot be allowed to report that it successfully synced
else {
2018-08-11 04:57:10 +08:00
wait ( self - > checkKilled ( self , " SyncEnd " ) ) ;
wait ( self - > file - > sync ( ) ) ;
2017-05-26 04:48:44 +08:00
//TraceEvent("AsyncFileNonDurable_ImplSyncEnd", self->id).detail("Filename", self->filename).detail("Durable", durable);
}
return Void ( ) ;
}
//Runs onSync() for this file after waiting on g_simulator.onMachine() for the calling process, then waits on
//g_simulator.onProcess() with the caller's original process and task priority before returning or rethrowing.
ACTOR Future<Void> sync(AsyncFileNonDurable* self, bool durable) {
	//Capture the caller's process and priority up front; both are needed after the waits below
	state ISimulator::ProcessInfo* callerProcess = g_simulator.getCurrentProcess();
	state TaskPriority callerPriority = g_network->getCurrentTask();

	wait(g_simulator.onMachine(callerProcess));

	try {
		wait(self->onSync(self, durable));
		wait(g_simulator.onProcess(callerProcess, callerPriority));
		return Void();
	} catch (Error& e) {
		//Hold on to the error across the wait so it can be rethrown once we are back on the caller's process
		state Error pendingError = e;
		wait(g_simulator.onProcess(callerProcess, callerPriority));
		throw pendingError;
	}
}
//Forwards a size request to the underlying file and reports the larger of that size and the start of the last
//item in pendingModifications (i.e. writes past the end of the file). The result is stored in approximateSize.
ACTOR Future<int64_t> onSize(AsyncFileNonDurable* self) {
	//TraceEvent("AsyncFileNonDurable_Size", self->id).detail("Filename", self->filename);
	wait(self->checkKilled(self, "Size"));

	//Wait for either the underlying size request or the killed future, whichever is ready first
	state Future<int64_t> underlyingSize = self->file->size();
	wait(success(underlyingSize) || self->killed.getFuture());

	wait(self->checkKilled(self, "SizeEnd"));

	//Account for any modifications which extend past the end of the file
	const uint64_t lastModificationStart = self->pendingModifications.lastItem().begin();
	self->approximateSize = std::max<int64_t>(underlyingSize.get(), lastModificationStart);
	return self->approximateSize;
}
//Runs onSize() for this file after waiting on g_simulator.onMachine() for the calling process, then waits on
//g_simulator.onProcess() with the caller's original process and task priority before returning the size or
//rethrowing the error.
ACTOR Future<int64_t> size(AsyncFileNonDurable* self) {
	//Capture the caller's process and priority up front; both are needed after the waits below
	state ISimulator::ProcessInfo* callerProcess = g_simulator.getCurrentProcess();
	state TaskPriority callerPriority = g_network->getCurrentTask();

	wait(g_simulator.onMachine(callerProcess));

	try {
		state int64_t fileSize = wait(self->onSize(self));
		wait(g_simulator.onProcess(callerProcess, callerPriority));
		return fileSize;
	} catch (Error& e) {
		//Hold on to the error across the wait so it can be rethrown once we are back on the caller's process
		state Error pendingError = e;
		wait(g_simulator.onProcess(callerProcess, callerPriority));
		throw pendingError;
	}
}
//Lets every outstanding modification on an AsyncFileNonDurable finish, waits for any in-progress kill to
//complete, removes the file from the closing/deleting bookkeeping, and finally deletes the object.
ACTOR Future<Void> deleteFile(AsyncFileNonDurable* self) {
	state ISimulator::ProcessInfo* callerProcess = g_simulator.getCurrentProcess();
	state TaskPriority callerPriority = g_network->getCurrentTask();
	state std::string filename = self->filename;

	wait(g_simulator.onMachine(callerProcess));

	try {
		//Make sure all writes have gone through: swap in a fresh startSyncPromise and fire the old one with true
		Promise<bool> previousSyncPromise = self->startSyncPromise;
		self->startSyncPromise = Promise<bool>();
		previousSyncPromise.send(true);

		//Collect every modification which is valid but has not finished yet
		std::vector<Future<Void>> unfinishedModifications;
		auto pendingRanges = self->pendingModifications.ranges();
		for (auto rangeItr = pendingRanges.begin(); rangeItr != pendingRanges.end(); ++rangeItr) {
			if (rangeItr->value().isValid() && !rangeItr->value().isReady()) {
				unfinishedModifications.push_back(rangeItr->value());
			}
		}

		//Ignore errors here so that all modifications can finish
		wait(waitForAllReady(unfinishedModifications));

		//Make sure we aren't in the process of killing the file
		if (self->killed.isSet()) {
			wait(self->killComplete.getFuture());
		}

		//Remove this file from the filesBeingDeleted map so that new files can be created with this filename
		g_simulator.getMachineByNetworkAddress(self->openedAddress)->closingFiles.erase(self->getFilename());
		g_simulator.getMachineByNetworkAddress(self->openedAddress)->deletingFiles.erase(self->getFilename());
		AsyncFileNonDurable::filesBeingDeleted.erase(self->filename);
		//TraceEvent("AsyncFileNonDurable_FinishDelete", self->id).detail("Filename", self->filename);

		//self is gone after this statement; only the state variables captured above are used from here on
		delete self;
		wait(g_simulator.onProcess(callerProcess, callerPriority));
		return Void();
	} catch (Error& e) {
		//Hold on to the error across the wait so it can be rethrown once we are back on the caller's process
		state Error pendingError = e;
		wait(g_simulator.onProcess(callerProcess, callerPriority));
		throw pendingError;
	}
}
} ;
2018-08-11 06:47:41 +08:00
# include "flow/unactorcompiler.h"
2017-05-26 04:48:44 +08:00
# endif