Merge branch 'fix-io-timeout-handling'

# Conflicts:
#	fdbrpc/AsyncFileKAIO.actor.h
#	fdbrpc/sim2.actor.cpp
#	fdbserver/KeyValueStoreSQLite.actor.cpp
#	fdbserver/optimisttest.actor.cpp
#	fdbserver/worker.actor.cpp
#	fdbserver/workloads/MachineAttrition.actor.cpp
#	tests/fast/SidebandWithStatus.txt
#	tests/rare/LargeApiCorrectnessStatus.txt
#	tests/slow/DDBalanceAndRemoveStatus.txt
This commit is contained in:
Stephen Atherton 2017-05-26 17:43:28 -07:00
parent bffa1facb5
commit 7260e38545
20 changed files with 624 additions and 64 deletions

View File

@ -36,6 +36,12 @@
#include "fdbrpc/linux_kaio.h"
#include "flow/Knobs.h"
#include "flow/UnitTest.h"
#include <stdio.h>
#include "flow/Hash3.h"
#include "flow/genericactors.actor.h"
// Set this to true to enable detailed KAIO request logging, which currently is written to a hardcoded location /data/v7/fdb/
#define KAIO_LOGGING 0
DESCR struct SlowAioSubmit {
int64_t submitDuration; // ns
@ -47,6 +53,48 @@ DESCR struct SlowAioSubmit {
class AsyncFileKAIO : public IAsyncFile, public ReferenceCounted<AsyncFileKAIO> {
public:
#if KAIO_LOGGING
private:
#pragma pack(push, 1)
struct OpLogEntry {
OpLogEntry() : result(0) {}
enum EOperation { READ = 1, WRITE = 2, SYNC = 3, TRUNCATE = 4 };
enum EStage { START = 1, LAUNCH = 2, REQUEUE = 3, COMPLETE = 4, READY = 5 };
int64_t timestamp;
uint32_t id;
uint32_t checksum;
uint32_t pageOffset;
uint8_t pageCount;
uint8_t op;
uint8_t stage;
uint32_t result;
static uint32_t nextID() {
static uint32_t last = 0;
return ++last;
}
void log(FILE *file) {
if(ftell(file) > (int64_t)50 * 1e9)
fseek(file, 0, SEEK_SET);
if(!fwrite(this, sizeof(OpLogEntry), 1, file))
throw io_error();
}
};
#pragma pop
FILE *logFile;
struct IOBlock;
static void KAIOLogBlockEvent(IOBlock *ioblock, OpLogEntry::EStage stage, uint32_t result = 0);
static void KAIOLogBlockEvent(FILE *logFile, IOBlock *ioblock, OpLogEntry::EStage stage, uint32_t result = 0);
static void KAIOLogEvent(FILE *logFile, uint32_t id, OpLogEntry::EOperation op, OpLogEntry::EStage stage, uint32_t pageOffset = 0, uint32_t result = 0);
public:
#else
#define KAIOLogBlockEvent(...)
#define KAIOLogEvent(...)
#endif
static Future<Reference<IAsyncFile>> open( std::string filename, int flags, int mode, void* ignore ) {
ASSERT( flags & OPEN_UNBUFFERED );
@ -146,7 +194,13 @@ public:
io->offset = offset;
enqueue(io, "read", this);
return io->result.getFuture();
Future<int> result = io->result.getFuture();
#if KAIO_LOGGING
//result = map(result, [=](int r) mutable { KAIOLogBlockEvent(io, OpLogEntry::READY, r); return r; });
#endif
return result;
}
virtual Future<Void> write( void const* data, int length, int64_t offset ) {
++countFileLogicalWrites;
@ -165,7 +219,13 @@ public:
nextFileSize = std::max( nextFileSize, offset+length );
enqueue(io, "write", this);
return success(io->result.getFuture());
Future<int> result = io->result.getFuture();
#if KAIO_LOGGING
//result = map(result, [=](int r) mutable { KAIOLogBlockEvent(io, OpLogEntry::READY, r); return r; });
#endif
return success(result);
}
virtual Future<Void> truncate( int64_t size ) {
++countFileLogicalWrites;
@ -175,22 +235,34 @@ public:
return io_timeout();
}
#if KAIO_LOGGING
uint32_t id = OpLogEntry::nextID();
#endif
int result = -1;
KAIOLogEvent(logFile, id, OpLogEntry::TRUNCATE, OpLogEntry::START, size / 4096);
bool completed = false;
if( ctx.fallocateSupported && size >= lastFileSize ) {
if (fallocate( fd, 0, 0, size)) {
result = fallocate( fd, 0, 0, size);
if (result != 0) {
int fallocateErrCode = errno;
TraceEvent("AsyncFileKAIOAllocateError").detail("fd",fd).detail("filename", filename).GetLastError();
if ( fallocateErrCode == EOPNOTSUPP ) {
// Mark fallocate as unsupported. Try again with truncate.
ctx.fallocateSupported = false;
} else {
KAIOLogEvent(logFile, id, OpLogEntry::TRUNCATE, OpLogEntry::COMPLETE, size / 4096, result);
return io_error();
}
} else {
completed = true;
}
}
if ( !completed && ftruncate(fd, size) ) {
if ( !completed )
result = ftruncate(fd, size);
KAIOLogEvent(logFile, id, OpLogEntry::TRUNCATE, OpLogEntry::COMPLETE, size / 4096, result);
if(result != 0) {
TraceEvent("AsyncFileKAIOTruncateError").detail("fd",fd).detail("filename", filename).GetLastError();
return io_error();
}
@ -215,12 +287,22 @@ public:
return io_timeout();
}
#if KAIO_LOGGING
uint32_t id = OpLogEntry::nextID();
#endif
KAIOLogEvent(logFile, id, OpLogEntry::SYNC, OpLogEntry::START);
Future<Void> fsync = throwErrorIfFailed(Reference<AsyncFileKAIO>::addRef(this), AsyncFileEIO::async_fdatasync(fd)); // Don't close the file until the asynchronous thing is done
// Alas, AIO f(data)sync doesn't seem to actually be implemented by the kernel
/*IOBlock *io = new IOBlock(IO_CMD_FDSYNC, fd);
submit(io, "write");
fsync=success(io->result.getFuture());*/
#if KAIO_LOGGING
fsync = map(fsync, [=](Void r) mutable { KAIOLogEvent(logFile, id, OpLogEntry::SYNC, OpLogEntry::COMPLETE); return r; });
#endif
if (flags & OPEN_ATOMIC_WRITE_AND_CREATE) {
flags &= ~OPEN_ATOMIC_WRITE_AND_CREATE;
@ -236,7 +318,14 @@ public:
virtual std::string getFilename() {
return filename;
}
~AsyncFileKAIO() { close(fd); }
~AsyncFileKAIO() {
close(fd);
#if KAIO_LOGGING
if(logFile != nullptr)
fclose(logFile);
#endif
}
static void launch() {
if (ctx.queue.size() && ctx.outstanding < FLOW_KNOBS->MAX_OUTSTANDING - FLOW_KNOBS->MIN_SUBMIT) {
@ -254,6 +343,9 @@ public:
for(int i=0; i<n; i++) {
auto io = ctx.queue.top();
KAIOLogBlockEvent(io, OpLogEntry::LAUNCH);
ctx.queue.pop();
toStart[i] = io;
io->startTime = now();
@ -305,6 +397,7 @@ public:
if (errno == EAGAIN) {
rc = 0;
} else {
KAIOLogBlockEvent(toStart[0], OpLogEntry::COMPLETE, errno ? -errno : -1000000);
// Other errors are assumed to represent failure to issue the first I/O in the list
toStart[0]->setResult( errno ? -errno : -1000000 );
rc = 1;
@ -312,8 +405,10 @@ public:
} else
ctx.outstanding += rc;
// Any unsubmitted I/Os need to be requeued
for(int i=rc; i<n; i++)
for(int i=rc; i<n; i++) {
KAIOLogBlockEvent(toStart[i], OpLogEntry::REQUEUE);
ctx.queue.push(toStart[i]);
}
}
}
@ -335,6 +430,9 @@ private:
IOBlock *prev;
IOBlock *next;
double startTime;
#if KAIO_LOGGING
int32_t iolog_id;
#endif
struct indirect_order_by_priority { bool operator () ( IOBlock* a, IOBlock* b ) { return a->prio < b->prio; } };
@ -342,6 +440,9 @@ private:
memset((linux_iocb*)this, 0, sizeof(linux_iocb));
aio_lio_opcode = op;
aio_fildes = fd;
#if KAIO_LOGGING
iolog_id = 0;
#endif
}
int getTask() const { return (prio>>32)+1; }
@ -369,11 +470,13 @@ private:
void timeout(bool warnOnly) {
TraceEvent(SevWarnAlways, "AsyncFileKAIOTimeout").detail("fd", aio_fildes).detail("op", aio_lio_opcode).detail("nbytes", nbytes).detail("offset", offset).detail("ptr", int64_t(buf))
.detail("filename", owner->filename);
g_network->setGlobal(INetwork::enASIOTimedOut, (flowGlobalType)true);
if(!warnOnly)
owner->failed = true;
}
};
struct Context {
io_context_t iocx;
int evfd;
@ -453,10 +556,45 @@ private:
countLogicalWrites.init(LiteralStringRef("AsyncFile.CountLogicalWrites"));
countLogicalReads.init( LiteralStringRef("AsyncFile.CountLogicalReads"));
}
#if KAIO_LOGGING
logFile = nullptr;
// TODO: Don't do this hacky investigation-specific thing
StringRef fname(filename);
if(fname.endsWith(LiteralStringRef(".sqlite")) || fname.endsWith(LiteralStringRef(".sqlite-wal"))) {
std::string logFileName = filename;
while(logFileName.find("/") != std::string::npos)
logFileName = logFileName.substr(logFileName.find("/") + 1);
if(!logFileName.empty()) {
// TODO: don't hardcode this path
std::string logPath("/data/v7/fdb/");
try {
platform::createDirectory(logPath);
logFileName = logPath + format("%s.iolog", logFileName.c_str());
logFile = fopen(logFileName.c_str(), "r+");
if(logFile == nullptr)
logFile = fopen(logFileName.c_str(), "w");
if(logFile != nullptr)
TraceEvent("KAIOLogOpened").detail("File", filename).detail("LogFile", logFileName);
else
TraceEvent(SevWarn, "KAIOLogOpenFailure")
.detail("File", filename)
.detail("LogFile", logFileName)
.detail("ErrorCode", errno)
.detail("ErrorDesc", strerror(errno));
} catch(Error &e) {
TraceEvent(SevError, "KAIOLogOpenFailure").error(e);
}
}
}
#endif
}
void enqueue( IOBlock* io, const char* op, AsyncFileKAIO* owner ) {
ASSERT( int64_t(io->buf) % 4096 == 0 && io->offset % 4096 == 0 && io->nbytes % 4096 == 0 );
KAIOLogBlockEvent(owner->logFile, io, OpLogEntry::START);
io->flags |= 1;
io->eventfd = ctx.evfd;
io->prio = (int64_t(g_network->getCurrentTask())<<32) - (++ctx.opsIssued);
@ -520,6 +658,8 @@ private:
for(int i=0; i<n; i++) {
IOBlock* iob = static_cast<IOBlock*>(ev[i].iocb);
KAIOLogBlockEvent(iob, OpLogEntry::COMPLETE, ev[i].result);
if(ctx.ioTimeout > 0) {
ctx.removeFromRequestList(iob);
}
@ -530,6 +670,63 @@ private:
}
};
#if KAIO_LOGGING
// Call from contexts where only an ioblock is available, log if its owner is set
void AsyncFileKAIO::KAIOLogBlockEvent(IOBlock *ioblock, OpLogEntry::EStage stage, uint32_t result) {
if(ioblock->owner)
return KAIOLogBlockEvent(ioblock->owner->logFile, ioblock, stage, result);
}
void AsyncFileKAIO::KAIOLogBlockEvent(FILE *logFile, IOBlock *ioblock, OpLogEntry::EStage stage, uint32_t result) {
if(logFile != nullptr) {
// Figure out what type of operation this is
OpLogEntry::EOperation op;
if(ioblock->aio_lio_opcode == IO_CMD_PREAD)
op = OpLogEntry::READ;
else if(ioblock->aio_lio_opcode == IO_CMD_PWRITE)
op = OpLogEntry::WRITE;
else
return;
// Assign this IO operation an io log id number if it doesn't already have one
if(ioblock->iolog_id == 0)
ioblock->iolog_id = OpLogEntry::nextID();
OpLogEntry e;
e.timestamp = timer_int();
e.op = (uint8_t)op;
e.id = ioblock->iolog_id;
e.stage = (uint8_t)stage;
e.pageOffset = (uint32_t)(ioblock->offset / 4096);
e.pageCount = (uint8_t)(ioblock->nbytes / 4096);
e.result = result;
// Log a checksum for Writes up to the Complete stage or Reads starting from the Complete stage
if( (op == OpLogEntry::WRITE && stage <= OpLogEntry::COMPLETE) || (op == OpLogEntry::READ && stage >= OpLogEntry::COMPLETE) )
e.checksum = hashlittle(ioblock->buf, ioblock->nbytes, 0xab12fd93);
else
e.checksum = 0;
e.log(logFile);
}
}
void AsyncFileKAIO::KAIOLogEvent(FILE *logFile, uint32_t id, OpLogEntry::EOperation op, OpLogEntry::EStage stage, uint32_t pageOffset, uint32_t result) {
if(logFile != nullptr) {
OpLogEntry e;
e.timestamp = timer_int();
e.id = id;
e.op = (uint8_t)op;
e.stage = (uint8_t)stage;
e.pageOffset = pageOffset;
e.pageCount = 0;
e.checksum = 0;
e.result = result;
e.log(logFile);
}
}
#endif
ACTOR Future<Void> runTestOps(Reference<IAsyncFile> f, int numIterations, int fileSize, bool expectedToSucceed) {
state void *buf = FastAllocator<4096>::allocate(); // we leak this if there is an error, but that shouldn't be a big deal
state int iteration = 0;

View File

@ -18,7 +18,6 @@
* limitations under the License.
*/
#include "simulator.h"
#include "flow/IThreadPool.h"
#include "IAsyncFile.h"
@ -50,7 +49,15 @@ bool simulator_should_inject_fault( const char* context, const char* file, int l
h2 = p->fault_injection_r;
if (h1 < p->fault_injection_p1*std::numeric_limits<uint32_t>::max()) {
TEST(true);
TEST(error_code == error_code_io_timeout);
TEST(error_code == error_code_io_error);
TEST(error_code == error_code_platform_error);
TraceEvent(SevWarn, "FaultInjected").detail("Context", context).detail("File", file).detail("Line", line).detail("ErrorCode", error_code);
if(error_code == error_code_io_timeout) {
g_network->setGlobal(INetwork::enASIOTimedOut, (flowGlobalType)true);
g_network->setGlobal(INetwork::enASIOTimedOutInjected, (flowGlobalType)true);
}
return true;
}
}
@ -521,8 +528,8 @@ private:
debugFileCheck("SimpleFileRead", self->filename, data, offset, length);
INJECT_FAULT(io_error, "SimpleFile::read");
INJECT_FAULT(io_timeout, "SimpleFile::read");
INJECT_FAULT(io_error, "SimpleFile::read");
return read_bytes;
}
@ -559,8 +566,9 @@ private:
}
debugFileCheck("SimpleFileWrite", self->filename, (void*)data.begin(), offset, data.size());
INJECT_FAULT(io_error, "SimpleFile::write");
INJECT_FAULT(io_timeout, "SimpleFile::write");
INJECT_FAULT(io_error, "SimpleFile::write");
return Void();
}
@ -580,6 +588,8 @@ private:
if (randLog)
fprintf( randLog, "SFT2 %s %s %s\n", self->dbgId.shortString().c_str(), self->filename.c_str(), opId.shortString().c_str());
INJECT_FAULT( io_timeout, "SimpleFile::truncate" );
INJECT_FAULT( io_error, "SimpleFile::truncate" );
return Void();
@ -611,6 +621,8 @@ private:
if (randLog)
fprintf( randLog, "SFC2 %s %s %s\n", self->dbgId.shortString().c_str(), self->filename.c_str(), opId.shortString().c_str());
INJECT_FAULT( io_timeout, "SimpleFile::sync" );
INJECT_FAULT( io_error, "SimpleFile::sync" );
return Void();
@ -1031,6 +1043,7 @@ public:
killProcess_internal( p, KillInstantly );
}
void killProcess_internal( ProcessInfo* machine, KillType kt ) {
TEST( true ); // Simulated machine was killed with any kill type
TEST( kt == KillInstantly ); // Simulated machine was killed instantly
TEST( kt == InjectFaults ); // Simulated machine was killed with faults
@ -1092,12 +1105,17 @@ public:
auto ktOrig = kt;
if (killIsSafe) ASSERT( kt == ISimulator::RebootAndDelete ); // Only types of "safe" kill supported so far
TEST(true); // Trying to killing a machine
TEST(kt == KillInstantly); // Trying to kill instantly
TEST(kt == InjectFaults); // Trying to kill by injecting faults
if(speedUpSimulation && !forceKill) {
return false;
}
int processesOnMachine = 0;
KillType originalKt = kt;
// Reboot if any of the processes are protected and count the number of processes not rebooting
for (auto& process : machines[zoneId].processes) {
if (protectedAddresses.count(process->address))
@ -1143,6 +1161,8 @@ public:
}
}
TEST(originalKt != kt); // Kill type was changed from requested to reboot.
// Check if any processes on machine are rebooting
if( processesOnMachine != processesPerMachine && kt >= RebootAndDelete ) {
TEST(true); //Attempted reboot, but the target did not have all of its processes running

View File

@ -75,16 +75,16 @@ protected:
virtual ~IKeyValueStore() {}
};
extern IKeyValueStore* keyValueStoreSQLite( std::string const& filename, UID logID, KeyValueStoreType storeType, bool validateFile=false );
extern IKeyValueStore* keyValueStoreSQLite( std::string const& filename, UID logID, KeyValueStoreType storeType, bool checkChecksums=false, bool checkIntegrity=false );
extern IKeyValueStore* keyValueStoreMemory( std::string const& basename, UID logID, int64_t memoryLimit );
extern IKeyValueStore* keyValueStoreLogSystem( class IDiskQueue* queue, UID logID, int64_t memoryLimit, bool disableSnapshot );
inline IKeyValueStore* openKVStore( KeyValueStoreType storeType, std::string const& filename, UID logID, int64_t memoryLimit, bool validateFile=false ) {
inline IKeyValueStore* openKVStore( KeyValueStoreType storeType, std::string const& filename, UID logID, int64_t memoryLimit, bool checkChecksums=false, bool checkIntegrity=false ) {
switch( storeType ) {
case KeyValueStoreType::SSD_BTREE_V1:
return keyValueStoreSQLite( filename, logID, KeyValueStoreType::SSD_BTREE_V1);
return keyValueStoreSQLite(filename, logID, KeyValueStoreType::SSD_BTREE_V1, false, checkIntegrity);
case KeyValueStoreType::SSD_BTREE_V2:
return keyValueStoreSQLite(filename, logID, KeyValueStoreType::SSD_BTREE_V2, validateFile);
return keyValueStoreSQLite(filename, logID, KeyValueStoreType::SSD_BTREE_V2, checkChecksums, checkIntegrity);
case KeyValueStoreType::MEMORY:
return keyValueStoreMemory( filename, logID, memoryLimit );
default:
@ -93,4 +93,7 @@ inline IKeyValueStore* openKVStore( KeyValueStoreType storeType, std::string con
UNREACHABLE(); // FIXME: is this right?
}
Future<Void> GenerateIOLogChecksumFile(std::string const & filename);
Future<Void> KVFileCheck(std::string const & filename, bool const &integrity);
#endif

View File

@ -253,11 +253,18 @@ struct SQLiteDB : NonCopyable {
void checkError( const char* context, int rc ) {
//if (g_random->random01() < .001) rc = SQLITE_INTERRUPT;
if (rc) {
Error err = (rc == SQLITE_IOERR_TIMEOUT) ? io_timeout() : io_error();
// Our exceptions don't propagate through sqlite, so we don't know for sure if the error that caused this was
// an injected fault. Assume that if fault injection is happening, this is an injected fault.
Error err = io_error();
if (g_network->isSimulated() && (g_simulator.getCurrentProcess()->fault_injection_p1 || g_simulator.getCurrentProcess()->rebooting))
//
// Also, timeouts returned from our VFS plugin to SQLite are no always propagated out of the sqlite API as timeouts,
// so if a timeout has been injected in this process then assume this error is an injected fault.
if (g_network->isSimulated() &&
( (g_simulator.getCurrentProcess()->fault_injection_p1 || g_simulator.getCurrentProcess()->rebooting) || (bool)g_network->global(INetwork::enASIOTimedOutInjected))
)
err = err.asInjectedFault();
if (db)
@ -293,16 +300,34 @@ struct SQLiteDB : NonCopyable {
if (rc && rc != SQLITE_DONE) checkError("vacuum", rc);
return rc == SQLITE_DONE;
}
void check(bool verbose) {
int check(bool verbose) {
int errors = 0;
int tables[] = {1, table, freetable};
TraceEvent("BTreeIntegrityCheckBegin").detail("Filename", filename);
char* e = sqlite3BtreeIntegrityCheck(btree, tables, 3, 100, &errors, verbose);
char* e = sqlite3BtreeIntegrityCheck(btree, tables, 3, 1000, &errors, verbose);
if (!(g_network->isSimulated() && (g_simulator.getCurrentProcess()->fault_injection_p1 || g_simulator.getCurrentProcess()->rebooting))) {
TraceEvent((errors||e) ? SevError : SevInfo, "BTreeIntegrityCheck").detail("filename", filename).detail("ErrorTotal", errors).detail("ErrorDetail", e ? e : "");
TraceEvent((errors||e) ? SevError : SevInfo, "BTreeIntegrityCheck").detail("filename", filename).detail("ErrorTotal", errors);
if(e != nullptr) {
// e is a string containing 1 or more lines. Create a separate trace event for each line.
char *lineStart = e;
while(lineStart != nullptr) {
char *lineEnd = strstr(lineStart, "\n");
if(lineEnd != nullptr) {
*lineEnd = '\0';
++lineEnd;
}
// If the line length found is not zero then print a trace event
if(*lineStart != '\0')
TraceEvent(SevError, "BTreeIntegrityCheck").detail("filename", filename).detail("ErrorDetail", lineStart);
lineStart = lineEnd;
}
}
TEST(true); // BTree integrity checked
}
if (e) sqlite3_free(e);
return errors;
}
int checkAllPageChecksums();
};
@ -1440,7 +1465,7 @@ public:
virtual Future<Optional<Value>> readValuePrefix( KeyRef key, int maxLength, Optional<UID> debugID );
virtual Future<Standalone<VectorRef<KeyValueRef>>> readRange( KeyRangeRef keys, int rowLimit = 1<<30, int byteLimit = 1<<30 );
KeyValueStoreSQLite(std::string const& filename, UID logID, KeyValueStoreType type, bool validateFile);
KeyValueStoreSQLite(std::string const& filename, UID logID, KeyValueStoreType type, bool checkChecksums, bool checkIntegrity);
~KeyValueStoreSQLite();
Future<Void> doClean();
@ -1453,7 +1478,6 @@ private:
Reference<IThreadPool> readThreads, writeThread;
Promise<Void> stopped;
Future<Void> cleaning, logging, starting, stopOnErr;
bool validateFile;
int64_t readsRequested, writesRequested;
ThreadSafeCounter readsComplete;
@ -1556,8 +1580,9 @@ private:
UID dbgid;
vector<Reference<ReadCursor>>& readThreads;
bool checkAllChecksumsOnOpen;
bool checkIntegrityOnOpen;
explicit Writer( std::string const& filename, bool isBtreeV2, bool checkAllChecksumsOnOpen, volatile int64_t& writesComplete, volatile SpringCleaningStats& springCleaningStats, volatile int64_t& diskBytesUsed, volatile int64_t& freeListPages, UID dbgid, vector<Reference<ReadCursor>>* pReadThreads )
explicit Writer( std::string const& filename, bool isBtreeV2, bool checkAllChecksumsOnOpen, bool checkIntegrityOnOpen, volatile int64_t& writesComplete, volatile SpringCleaningStats& springCleaningStats, volatile int64_t& diskBytesUsed, volatile int64_t& freeListPages, UID dbgid, vector<Reference<ReadCursor>>* pReadThreads )
: conn( filename, isBtreeV2, isBtreeV2 ),
commits(), setsThisCommit(),
freeTableEmpty(false),
@ -1568,7 +1593,8 @@ private:
cursor(NULL),
dbgid(dbgid),
readThreads(*pReadThreads),
checkAllChecksumsOnOpen(checkAllChecksumsOnOpen)
checkAllChecksumsOnOpen(checkAllChecksumsOnOpen),
checkIntegrityOnOpen(checkIntegrityOnOpen)
{
}
~Writer() {
@ -1577,8 +1603,15 @@ private:
TraceEvent("KVWriterDestroyed", dbgid);
}
virtual void init() {
if(checkAllChecksumsOnOpen)
conn.checkAllPageChecksums();
if(checkAllChecksumsOnOpen) {
if(conn.checkAllPageChecksums() != 0) {
// It's not strictly necessary to discard the file immediately if a page checksum error is found
// because most of the file could be valid and bad pages will be detected if they are read.
// However, we shouldn't use the file unless we absolutely have to because some range(s) of keys
// have effectively lost a replica.
throw file_corrupt();
}
}
conn.open(true);
//If a wal file fails during the commit process before finishing a checkpoint, then it is possible that our wal file will be non-empty
@ -1588,8 +1621,12 @@ private:
cursor = new Cursor(conn, true);
if (EXPENSIVE_VALIDATION)
conn.check(false);
if (checkIntegrityOnOpen || EXPENSIVE_VALIDATION) {
if(conn.check(false) != 0) {
// A corrupt btree structure must not be used.
throw file_corrupt();
}
}
}
struct InitAction : TypedAction<Writer, InitAction>, FastAllocated<InitAction> {
@ -1840,8 +1877,8 @@ private:
}
}
};
IKeyValueStore* keyValueStoreSQLite( std::string const& filename, UID logID, KeyValueStoreType storeType, bool validateFile ) {
return new KeyValueStoreSQLite(filename, logID, storeType, validateFile);
IKeyValueStore* keyValueStoreSQLite( std::string const& filename, UID logID, KeyValueStoreType storeType, bool checkChecksums, bool checkIntegrity) {
return new KeyValueStoreSQLite(filename, logID, storeType, checkChecksums, checkIntegrity);
}
ACTOR Future<Void> cleanPeriodically( KeyValueStoreSQLite* self ) {
@ -1860,14 +1897,13 @@ ACTOR static Future<Void> startReadThreadsWhen( KeyValueStoreSQLite* kv, Future<
sqlite3_vfs *vfsAsync();
static int vfs_registered = 0;
KeyValueStoreSQLite::KeyValueStoreSQLite(std::string const& filename, UID id, KeyValueStoreType storeType, bool validateFile)
KeyValueStoreSQLite::KeyValueStoreSQLite(std::string const& filename, UID id, KeyValueStoreType storeType, bool checkChecksums, bool checkIntegrity)
: type(storeType),
filename(filename),
logID(id),
readThreads(CoroThreadPool::createThreadPool()),
writeThread(CoroThreadPool::createThreadPool()),
readsRequested(0), writesRequested(0), writesComplete(0), diskBytesUsed(0), freeListPages(0),
validateFile(validateFile)
readsRequested(0), writesRequested(0), writesComplete(0), diskBytesUsed(0), freeListPages(0)
{
stopOnErr = stopOnError(this);
@ -1887,7 +1923,7 @@ KeyValueStoreSQLite::KeyValueStoreSQLite(std::string const& filename, UID id, Ke
sqlite3_soft_heap_limit64( SERVER_KNOBS->SOFT_HEAP_LIMIT ); // SOMEDAY: Is this a performance issue? Should we drop the cache sizes for individual threads?
int taskId = g_network->getCurrentTask();
g_network->setCurrentTask(TaskDiskWrite);
writeThread->addThread( new Writer(filename, type==KeyValueStoreType::SSD_BTREE_V2, validateFile, writesComplete, springCleaningStats, diskBytesUsed, freeListPages, id, &readCursors) );
writeThread->addThread( new Writer(filename, type==KeyValueStoreType::SSD_BTREE_V2, checkChecksums, checkIntegrity, writesComplete, springCleaningStats, diskBytesUsed, freeListPages, id, &readCursors) );
g_network->setCurrentTask(taskId);
auto p = new Writer::InitAction();
auto f = p->result.getFuture();
@ -1969,3 +2005,49 @@ void createTemplateDatabase() {
db1.createFromScratch();
db2.createFromScratch();
}
ACTOR Future<Void> GenerateIOLogChecksumFile(std::string filename) {
if(!fileExists(filename))
throw file_not_found();
FILE *f = fopen(filename.c_str(), "r");
FILE *fout = fopen((filename + ".checksums").c_str(), "w");
uint8_t buf[4096];
unsigned int c = 0;
while(fread(buf, 1, 4096, f) > 0)
fprintf(fout, "%u %u\n", c++, hashlittle(buf, 4096, 0xab12fd93));
fclose(f);
fclose(fout);
return Void();
}
// If integrity is true, a full btree integrity check is done.
// If integrity is false, only a scan of all pages to validate their checksums is done.
ACTOR Future<Void> KVFileCheck(std::string filename, bool integrity) {
if(!fileExists(filename))
throw file_not_found();
StringRef kvFile(filename);
KeyValueStoreType type = KeyValueStoreType::END;
if(kvFile.endsWith(LiteralStringRef(".fdb")))
type = KeyValueStoreType::SSD_BTREE_V1;
else if(kvFile.endsWith(LiteralStringRef(".sqlite")))
type = KeyValueStoreType::SSD_BTREE_V2;
ASSERT(type != KeyValueStoreType::END);
state IKeyValueStore* store = keyValueStoreSQLite(filename, UID(0, 0), type, !integrity, integrity);
ASSERT(store != nullptr);
// Wait for integry check to finish
Optional<Value> _ = wait(store->readValue(StringRef()));
if(store->getError().isError())
Void _ = wait(store->getError());
Future<Void> c = store->onClosed();
store->close();
Void _ = wait(c);
return Void();
}

View File

@ -244,6 +244,10 @@ ACTOR Future<ISimulator::KillType> simulatedFDBDRebooter(
Void _ = wait(listen || fd || success(onShutdown) || backup);
} catch (Error& e) {
// If in simulation, if we make it here with an error other than io_timeout but enASIOTimedOut is set then somewhere an io_timeout was converted to a different error.
if(g_network->isSimulated() && e.code() != error_code_io_timeout && (bool)g_network->global(INetwork::enASIOTimedOut))
TraceEvent(SevError, "IOTimeoutErrorSuppressed").detail("ErrorCode", e.code()).backtrace();
if (onShutdown.isReady() && onShutdown.isError()) throw onShutdown.getError();
if(e.code() != error_code_actor_cancelled)
printf("SimulatedFDBDTerminated: %s\n", e.what());

View File

@ -113,6 +113,8 @@ static int asyncRead(sqlite3_file *pFile, void *zBuf, int iAmt, sqlite_int64 iOf
}
return SQLITE_OK;
} catch (Error& e) {
if(e.code() == error_code_io_timeout)
return SQLITE_IOERR_TIMEOUT;
return SQLITE_IOERR_READ;
}
}
@ -146,6 +148,8 @@ static int asyncReadZeroCopy(sqlite3_file *pFile, void **data, int iAmt, sqlite_
++p->debug_zcreads;
return SQLITE_OK;
} catch (Error& e) {
if(e.code() == error_code_io_timeout)
return SQLITE_IOERR_TIMEOUT;
return SQLITE_IOERR_READ;
}
}
@ -179,6 +183,8 @@ static int asyncWrite(sqlite3_file *pFile, const void *zBuf, int iAmt, sqlite_in
waitFor( p->file->write( zBuf, iAmt, iOfst ) );
return SQLITE_OK;
} catch(Error& e) {
if(e.code() == error_code_io_timeout)
return SQLITE_IOERR_TIMEOUT;
return SQLITE_IOERR_WRITE;
}
}
@ -189,6 +195,8 @@ static int asyncTruncate(sqlite3_file *pFile, sqlite_int64 size){
waitFor( p->file->truncate( size ) );
return SQLITE_OK;
} catch(Error& e) {
if(e.code() == error_code_io_timeout)
return SQLITE_IOERR_TIMEOUT;
return SQLITE_IOERR_TRUNCATE;
}
}
@ -204,7 +212,9 @@ static int asyncSync(sqlite3_file *pFile, int flags){
.detail("sqlite3_file", (int64_t)pFile)
.detail("IAsyncFile", (int64_t)p->file.getPtr())
.error(e);
if(e.code() == error_code_io_timeout)
return SQLITE_IOERR_TIMEOUT;
return SQLITE_IOERR_FSYNC;
}
}

View File

@ -35,6 +35,7 @@
#include "ConflictSet.h"
#include "DataDistribution.h"
#include "NetworkTest.h"
#include "IKeyValueStore.h"
#include <stdarg.h>
#include <stdio.h>
#include "pubsub.h"
@ -74,7 +75,7 @@
enum {
OPT_CONNFILE, OPT_SEEDCONNFILE, OPT_SEEDCONNSTRING, OPT_ROLE, OPT_LISTEN, OPT_PUBLICADDR, OPT_DATAFOLDER, OPT_LOGFOLDER, OPT_PARENTPID, OPT_NEWCONSOLE, OPT_NOBOX, OPT_TESTFILE, OPT_RESTARTING, OPT_RANDOMSEED, OPT_KEY, OPT_MEMLIMIT, OPT_STORAGEMEMLIMIT, OPT_MACHINEID, OPT_DCID, OPT_MACHINE_CLASS, OPT_BUGGIFY, OPT_VERSION, OPT_CRASHONERROR, OPT_HELP, OPT_NETWORKIMPL, OPT_NOBUFSTDOUT, OPT_BUFSTDOUTERR, OPT_TRACECLOCK, OPT_NUMTESTERS, OPT_DEVHELP, OPT_ROLLSIZE, OPT_MAXLOGS, OPT_MAXLOGSSIZE, OPT_KNOB, OPT_TESTSERVERS, OPT_TEST_ON_SERVERS, OPT_METRICSCONNFILE, OPT_METRICSPREFIX,
OPT_LOGGROUP, OPT_LOCALITY, OPT_IO_TRUST_SECONDS, OPT_IO_TRUST_WARN_ONLY, OPT_FILESYSTEM };
OPT_LOGGROUP, OPT_LOCALITY, OPT_IO_TRUST_SECONDS, OPT_IO_TRUST_WARN_ONLY, OPT_FILESYSTEM, OPT_KVFILE };
CSimpleOpt::SOption g_rgOptions[] = {
{ OPT_CONNFILE, "-C", SO_REQ_SEP },
@ -630,7 +631,7 @@ static void printUsage( const char *name, bool devhelp ) {
printf(" -r ROLE, --role ROLE\n"
" Server role (valid options are fdbd, test, multitest,\n");
printf(" simulation, networktestclient, networktestserver,\n");
printf(" consistencycheck). The default is `fdbd'.\n");
printf(" consistencycheck, kvfileintegritycheck, kvfilegeneratesums). The default is `fdbd'.\n");
#ifdef _WIN32
printf(" -n, --newconsole\n"
" Create a new console.\n");
@ -836,6 +837,8 @@ int main(int argc, char* argv[]) {
CreateTemplateDatabase,
NetworkTestClient,
NetworkTestServer,
KVFileIntegrityCheck,
KVFileGenerateIOLogChecksums,
ConsistencyCheck
};
std::string fileSystemPath = "", dataFolder, connFile = "", seedConnFile = "", seedConnString = "", logFolder = ".", metricsConnFile = "", metricsPrefix = "";
@ -844,6 +847,7 @@ int main(int argc, char* argv[]) {
uint32_t randomSeed = platform::getRandomSeed();
const char *testFile = "tests/default.txt";
std::string kvFile;
std::string publicAddressStr, listenAddressStr = "public";
std::string testServersStr;
NetworkAddress publicAddress, listenAddress;
@ -965,6 +969,8 @@ int main(int argc, char* argv[]) {
else if (!strcmp(sRole, "createtemplatedb")) role = CreateTemplateDatabase;
else if (!strcmp(sRole, "networktestclient")) role = NetworkTestClient;
else if (!strcmp(sRole, "networktestserver")) role = NetworkTestServer;
else if (!strcmp(sRole, "kvfileintegritycheck")) role = KVFileIntegrityCheck;
else if (!strcmp(sRole, "kvfilegeneratesums")) role = KVFileGenerateIOLogChecksums;
else if (!strcmp(sRole, "consistencycheck")) role = ConsistencyCheck;
else {
fprintf(stderr, "ERROR: Unknown role `%s'\n", sRole);
@ -1095,6 +1101,9 @@ int main(int argc, char* argv[]) {
case OPT_TESTFILE:
testFile = args.OptionArg();
break;
case OPT_KVFILE:
kvFile = args.OptionArg();
break;
case OPT_RESTARTING:
restarting = true;
break;
@ -1228,7 +1237,7 @@ int main(int argc, char* argv[]) {
bool autoPublicAddress = StringRef(publicAddressStr).startsWith(LiteralStringRef("auto:"));
Reference<ClusterConnectionFile> connectionFile;
if ( (role != Simulation && role != CreateTemplateDatabase) || autoPublicAddress ) {
if ( (role != Simulation && role != CreateTemplateDatabase && role != KVFileIntegrityCheck && role != KVFileGenerateIOLogChecksums) || autoPublicAddress ) {
if (seedSpecified && !fileExists(connFile)){
std::string connectionString = seedConnString.length() ? seedConnString : "";
@ -1594,6 +1603,12 @@ int main(int argc, char* argv[]) {
} else if (role == NetworkTestServer) {
f = stopAfter( networkTestServer() );
g_network->run();
} else if (role == KVFileIntegrityCheck) {
auto f = stopAfter( KVFileCheck(kvFile, true) );
g_network->run();
} else if (role == KVFileGenerateIOLogChecksums) {
auto f = stopAfter( GenerateIOLogChecksumFile(kvFile) );
g_network->run();
}
int rc = FDB_EXIT_SUCCESS;

View File

@ -136,6 +136,7 @@
<ActorCompiler Include="workloads\Unreadable.actor.cpp" />
<ActorCompiler Include="workloads\VersionStamp.actor.cpp" />
<ActorCompiler Include="workloads\Serializability.actor.cpp" />
<ActorCompiler Include="workloads\DiskDurability.actor.cpp" />
</ItemGroup>
<ItemGroup>
<ClInclude Include="ApplyMetadataMutation.h" />

View File

@ -454,6 +454,7 @@ SQLITE_API int sqlite3_exec(
#define SQLITE_IOERR_SHMOPEN (SQLITE_IOERR | (18<<8))
#define SQLITE_IOERR_SHMSIZE (SQLITE_IOERR | (19<<8))
#define SQLITE_IOERR_SHMLOCK (SQLITE_IOERR | (20<<8))
#define SQLITE_IOERR_TIMEOUT (SQLITE_IOERR | (21<<8))
#define SQLITE_LOCKED_SHAREDCACHE (SQLITE_LOCKED | (1<<8))
#define SQLITE_BUSY_RECOVERY (SQLITE_BUSY | (1<<8))
#define SQLITE_CANTOPEN_NOTEMPDIR (SQLITE_CANTOPEN | (1<<8))

View File

@ -75,6 +75,17 @@ struct ErrorInfo {
template <class Ar> void serialize(Ar&) { ASSERT(false); }
};
Error checkIOTimeout(Error const &e) {
// Convert io_errors to io_timeout if global timeout bool was set
if(e.code() == error_code_io_error && (bool)g_network->global(INetwork::enASIOTimedOut)) {
Error timeout = io_timeout();
if(e.isInjectedFault() || (bool)g_network->global(INetwork::enASIOTimedOutInjected))
timeout = timeout.asInjectedFault();
return timeout;
}
return e;
}
ACTOR Future<Void> forwardError( PromiseStream<ErrorInfo> errors,
const char* context, UID id,
Future<Void> process )
@ -108,9 +119,12 @@ ACTOR Future<Void> handleIOErrors( Future<Void> actor, IClosable* store, UID id,
}
}
ACTOR Future<Void> workerDisplayErrors(FutureStream<ErrorInfo> errors) {
ACTOR Future<Void> workerHandleErrors(FutureStream<ErrorInfo> errors) {
loop choose {
when( ErrorInfo err = waitNext(errors) ) {
when( ErrorInfo _err = waitNext(errors) ) {
ErrorInfo err = _err;
err.error = checkIOTimeout(err.error); // Convert io_errors to io_timeout if ASIO flag is set
bool ok =
err.error.code() == error_code_success ||
err.error.code() == error_code_please_reboot ||
@ -436,7 +450,7 @@ ACTOR Future<Void> monitorServerDBInfo( Reference<AsyncVar<Optional<ClusterContr
ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface, LocalityData localities,
ProcessClass processClass, std::string folder, int64_t memoryLimit, std::string metricsConnFile, std::string metricsPrefix ) {
state PromiseStream< ErrorInfo > errors;
state Future<Void> displayErrors = workerDisplayErrors( errors.getFuture() ); // Needs to be stopped last
state Future<Void> handleErrors = workerHandleErrors( errors.getFuture() ); // Needs to be stopped last
state ActorCollection errorForwarders(false);
state Future<Void> loggingTrigger = Void();
state double loggingDelay = SERVER_KNOBS->WORKER_LOGGING_INTERVAL;
@ -535,7 +549,7 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
DiskStore s = stores[f];
// FIXME: Error handling
if( s.storedComponent == DiskStore::Storage ) {
IKeyValueStore* kv = openKVStore(s.storeType, s.filename, s.storeID, memoryLimit, validateDataFiles);
IKeyValueStore* kv = openKVStore(s.storeType, s.filename, s.storeID, memoryLimit, false, validateDataFiles);
Future<Void> kvClosed = kv->onClosed();
filesClosed.add( kvClosed );
@ -792,7 +806,7 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
loggingTrigger = delay( loggingDelay, TaskFlushTrace );
}
when( Void _ = wait( errorForwarders.getResult() ) ) {}
when( Void _ = wait( displayErrors ) ) {}
when( Void _ = wait( handleErrors ) ) {}
}
} catch (Error& err) {
state Error e = err;
@ -878,22 +892,27 @@ ACTOR Future<Void> fdbd(
std::string metricsConnFile,
std::string metricsPrefix )
{
ServerCoordinators coordinators( connFile );
TraceEvent("StartingFDBD").detailext("ZoneID", localities.zoneId()).detailext("machineId", localities.machineId()).detail("DiskPath", dataFolder).detail("CoordPath", coordFolder);
try {
ServerCoordinators coordinators( connFile );
TraceEvent("StartingFDBD").detailext("ZoneID", localities.zoneId()).detailext("machineId", localities.machineId()).detail("DiskPath", dataFolder).detail("CoordPath", coordFolder);
// SOMEDAY: start the services on the machine in a staggered fashion in simulation?
Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> cc( new AsyncVar<Optional<ClusterControllerFullInterface>> );
Reference<AsyncVar<Optional<ClusterInterface>>> ci( new AsyncVar<Optional<ClusterInterface>> );
vector<Future<Void>> v;
if ( coordFolder.size() )
v.push_back( fileNotFoundToNever( coordinationServer( coordFolder ) ) ); //SOMEDAY: remove the fileNotFound wrapper and make DiskQueue construction safe from errors setting up their files
v.push_back( reportErrors( processClass == ProcessClass::TesterClass ? monitorLeader( connFile, cc ) : clusterController( connFile, cc ), "clusterController") );
v.push_back( reportErrors(extractClusterInterface( cc, ci ), "extractClusterInterface") );
v.push_back( reportErrors(failureMonitorClient( ci, true ), "failureMonitorClient") );
v.push_back( reportErrorsExcept(workerServer(connFile, cc, localities, processClass, dataFolder, memoryLimit, metricsConnFile, metricsPrefix), "workerServer", UID(), &normalWorkerErrors()) );
state Future<Void> firstConnect = reportErrors( printOnFirstConnected(ci), "ClusterFirstConnectedError" );
// SOMEDAY: start the services on the machine in a staggered fashion in simulation?
Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> cc( new AsyncVar<Optional<ClusterControllerFullInterface>> );
Reference<AsyncVar<Optional<ClusterInterface>>> ci( new AsyncVar<Optional<ClusterInterface>> );
vector<Future<Void>> v;
if ( coordFolder.size() )
v.push_back( fileNotFoundToNever( coordinationServer( coordFolder ) ) ); //SOMEDAY: remove the fileNotFound wrapper and make DiskQueue construction safe from errors setting up their files
v.push_back( reportErrors( processClass == ProcessClass::TesterClass ? monitorLeader( connFile, cc ) : clusterController( connFile, cc ), "clusterController") );
v.push_back( reportErrors(extractClusterInterface( cc, ci ), "extractClusterInterface") );
v.push_back( reportErrors(failureMonitorClient( ci, true ), "failureMonitorClient") );
v.push_back( reportErrorsExcept(workerServer(connFile, cc, localities, processClass, dataFolder, memoryLimit, metricsConnFile, metricsPrefix), "workerServer", UID(), &normalWorkerErrors()) );
state Future<Void> firstConnect = reportErrors( printOnFirstConnected(ci), "ClusterFirstConnectedError" );
Void _ = wait( quorum(v,1) );
ASSERT(false); // None of these actors should terminate normally
throw internal_error();
Void _ = wait( quorum(v,1) );
ASSERT(false); // None of these actors should terminate normally
throw internal_error();
} catch(Error &e) {
Error err = checkIOTimeout(e);
throw err;
}
}

View File

@ -0,0 +1,194 @@
/*
* DiskDurability.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "flow/actorcompiler.h"
#include "workloads.h"
#include "flow/ActorCollection.h"
#include "flow/SystemMonitor.h"
#include "fdbrpc/IAsyncFile.h"
#include "AsyncFile.actor.h"
struct DiskDurabilityWorkload : public AsyncFileWorkload
{
struct FileBlock {
FileBlock(int blockNum) : blockNum(blockNum), lastData(0), lock(new FlowLock(1)) {}
~FileBlock() {}
int blockNum;
int64_t lastData;
Reference<FlowLock> lock;
ACTOR static Future<Void> test_impl(FileBlock *self, Reference<AsyncFileHandle> file, int pages, Reference<AsyncFileBuffer> buffer) {
Void _ = wait(self->lock->take(1));
state int64_t offset = (int64_t)self->blockNum * pages * _PAGE_SIZE;
state int size = pages * _PAGE_SIZE;
state int64_t newData;
if(self->lastData == 0)
newData = g_random->randomInt64(std::numeric_limits<int64_t>::min(), std::numeric_limits<int64_t>::max());
else {
++newData;
int readBytes = wait(file->file->read(buffer->buffer, size, offset));
ASSERT(readBytes == size);
}
if(newData == 0)
newData = 1;
int64_t *arr = (int64_t *)buffer->buffer;
for(int i = 0, imax = size / sizeof(int64_t); i < imax; ++i) {
if(self->lastData != 0 && arr[i] != self->lastData) {
TraceEvent(SevError, "WriteWasNotDurable")
.detail("Filename", file->path)
.detail("Offset", offset)
.detail("OpSize", size)
.detail("Expected", self->lastData)
.detail("Found", arr[i])
.detail("Index", i);
throw io_error();
}
arr[i] = newData;
}
Void _ = wait(file->file->write(buffer->buffer, size, offset));
self->lock->release(1);
self->lastData = newData;
return Void();
}
Future<Void> test(Reference<AsyncFileHandle> file, int pages, Reference<AsyncFileBuffer> buffer) { return test_impl(this, file, pages, buffer); }
};
vector<FileBlock> blocks;
int pagesPerWrite;
int filePages;
int writers;
double syncInterval;
DiskDurabilityWorkload(WorkloadContext const& wcx)
: AsyncFileWorkload(wcx)
{
writers = getOption(options, LiteralStringRef("writers"), 1);
filePages = getOption(options, LiteralStringRef("filePages"), 1000000);
fileSize = filePages * _PAGE_SIZE;
unbufferedIO = true;
uncachedIO = true;
fillRandom = false;
pagesPerWrite = getOption(options, LiteralStringRef("pagesPerWrite"), 1);
syncInterval = (double)(getOption(options, LiteralStringRef("syncIntervalMs"), 2000)) / 1000;
}
virtual ~DiskDurabilityWorkload(){ }
virtual std::string description()
{
return "DiskDurability";
}
virtual Future<Void> setup(Database const& cx)
{
if(enabled)
return _setup(this);
return Void();
}
ACTOR Future<Void> _setup(DiskDurabilityWorkload *self)
{
ASSERT(!self->path.empty());
int flags = IAsyncFile::OPEN_READWRITE | IAsyncFile::OPEN_CREATE;
if(self->unbufferedIO)
flags |= IAsyncFile::OPEN_UNBUFFERED;
if(self->uncachedIO)
flags |= IAsyncFile::OPEN_UNCACHED;
try
{
state Reference<IAsyncFile> file = wait(IAsyncFileSystem::filesystem()->open(self->path, flags, 0666));
if(self->fileHandle.getPtr() == NULL)
self->fileHandle = Reference<AsyncFileHandle>(new AsyncFileHandle(file, self->path, false));
else
self->fileHandle->file = file;
}
catch(Error &error)
{
TraceEvent(SevError, "TestFailure").detail("Reason", "Could not open file");
throw;
}
return Void();
}
virtual Future<Void> start(Database const& cx)
{
if(enabled)
return _start(this);
return Void();
}
static unsigned int intHash(unsigned int x) {
x = ((x >> 16) ^ x) * 0x45d9f3b;
x = ((x >> 16) ^ x) * 0x45d9f3b;
x = (x >> 16) ^ x;
return x;
}
ACTOR static Future<Void> worker(DiskDurabilityWorkload *self) {
state Reference<AsyncFileBuffer> buffer = Reference<AsyncFileBuffer>(new AsyncFileBuffer(_PAGE_SIZE, true));
state int logfp = (int)ceil(log2(self->filePages));
loop {
int block = intHash(std::min<int>(g_random->randomInt(0, 1 << g_random->randomInt(0, logfp)), self->filePages - 1)) % self->filePages;
Void _ = wait(self->blocks[block].test(self->fileHandle, self->pagesPerWrite, buffer));
}
}
ACTOR static Future<Void> syncLoop(DiskDurabilityWorkload *self) {
loop {
Void _ = wait(delay(g_random->random01() * self->syncInterval));
Void _ = wait(self->fileHandle->file->sync());
}
}
ACTOR Future<Void> _start(DiskDurabilityWorkload *self)
{
self->blocks.reserve(self->filePages);
for(int i = 0; i < self->filePages; ++i)
self->blocks.push_back(FileBlock(i));
state std::vector<Future<Void>> tasks;
tasks.push_back(syncLoop(self));
for(int i = 0; i < self->writers; ++i)
tasks.push_back(worker(self));
Void _ = wait(timeout(waitForAll(tasks), self->testDuration, Void()));
return Void();
}
virtual void getMetrics(vector<PerfMetric>& m)
{
}
};
WorkloadFactory<DiskDurabilityWorkload> DiskDurabilityWorkloadFactory("DiskDurability");

View File

@ -400,4 +400,4 @@ ACTOR Future<Void> testKVStore(KVStoreTestWorkload* workload) {
Void _ = wait( c );
if (err.code() != invalid_error_code) throw err;
return Void();
}
}

View File

@ -118,6 +118,9 @@ struct MachineAttritionWorkload : TestWorkload {
ASSERT( g_network->isSimulated() );
TEST(g_simulator.killableMachines > 0); // Some machines can be killed
TEST(g_simulator.killableDatacenters > 0); // Some processes can be killed
if( self->killDc ) {
Void _ = wait( delay( delayBeforeKill ) );
@ -143,6 +146,8 @@ struct MachineAttritionWorkload : TestWorkload {
TraceEvent("WorkerKillBegin").detail("killedMachines", killedMachines)
.detail("machinesToKill", self->machinesToKill).detail("machinesToLeave", self->machinesToLeave)
.detail("machines", self->machines.size());
TEST(true); // Killing a machine
Void _ = wait( delay( delayBeforeKill ) );
TraceEvent("WorkerKillAfterDelay");

View File

@ -96,6 +96,7 @@ ERROR( http_bad_response, 1518, "HTTP response was not valid." )
ERROR( http_not_accepted, 1519, "HTTP request not accepted." )
ERROR( checksum_failed, 1520, "A data checksum failed." )
ERROR( io_timeout, 1521, "A disk IO operation failed to complete in a timely manner." )
ERROR( file_corrupt, 1522, "A structurally corrupt data file was detected." )
// 2xxx Attempt (presumably by a _client_) to do something illegal. If an error is known to
// be internally caused, it should be 41xx

View File

@ -64,7 +64,7 @@ bool validationIsEnabled();
#define BUGGIFY_WITH_PROB(x) (getSBVar(__FILE__, __LINE__) && g_random->random01() < (x))
#define BUGGIFY BUGGIFY_WITH_PROB(P_BUGGIFIED_SECTION_FIRES)
#define EXPENSIVE_VALIDATION validationIsEnabled() && g_random->random01() < P_EXPENSIVE_VALIDATION
#define EXPENSIVE_VALIDATION (validationIsEnabled() && g_random->random01() < P_EXPENSIVE_VALIDATION)
extern std::string format(const char* form, ...);
extern Standalone<StringRef> strinc(StringRef const& str);

View File

@ -196,7 +196,7 @@ public:
enum enumGlobal {
enFailureMonitor = 0, enFlowTransport = 1, enTDMetrics = 2, enNetworkConnections = 3,
enNetworkAddressFunc = 4, enFileSystem = 5, enASIOService = 6, enEventFD = 7, enRunCycleFunc = 8
enNetworkAddressFunc = 4, enFileSystem = 5, enASIOService = 6, enEventFD = 7, enRunCycleFunc = 8, enASIOTimedOut = 9, enASIOTimedOutInjected = 10
};
virtual void longTaskCheck( const char* name ) {}

9
tests/DiskDurability.txt Executable file
View File

@ -0,0 +1,9 @@
testTitle=DiskDurability
testName=DiskDurability
useDB=false
pagesPerWrite=1
writers=64
fileName=deleteme
filePages=1000000
testDuration=360000
timeout=360000

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long