Merge branch 'master' into java-add-missing-dispose

This commit is contained in:
A.J. Beamon 2017-12-08 14:53:05 -08:00
commit 33d047b6e8
8 changed files with 116 additions and 42 deletions

View File

@ -720,7 +720,7 @@ namespace fileBackup {
if (taskVersion > version) {
state Error err = task_invalid_version();
TraceEvent(SevError, "BA_BackupRangeTaskFunc_execute").detail("taskVersion", taskVersion).detail("Name", printable(name)).detail("Version", version);
TraceEvent(SevWarn, "BA_BackupRangeTaskFunc_execute").detail("taskVersion", taskVersion).detail("Name", printable(name)).detail("Version", version);
if (KeyBackedConfig::TaskParams.uid().exists(task)) {
std::string msg = format("ERROR: %s task version `%lu' is greater than supported version `%lu'", task->params[Task::reservedTaskParamKeyType].toString().c_str(), (unsigned long)taskVersion, (unsigned long)version);
Void _ = wait(BackupConfig(task).logError(cx, err, msg));
@ -1414,7 +1414,7 @@ namespace fileBackup {
Void _ = wait(IBackupContainer::openContainer(backupContainer)->renameFile(tempFileName, logFileName));
}
catch (Error &e) {
TraceEvent(SevError, "BA_BackupLogRangeTaskFunc_endLogFileError").error(e).detail("backupContainer", backupContainer).detail("Rename_file_from", tempFileName);
TraceEvent(SevWarn, "BA_BackupLogRangeTaskFunc_endLogFileError").error(e).detail("backupContainer", backupContainer).detail("Rename_file_from", tempFileName);
throw;
}

View File

@ -227,10 +227,27 @@ public:
return success(result);
}
// TODO(alexmiller): Remove when we upgrade the dev docker image to >14.10
#ifndef FALLOC_FL_ZERO_RANGE
#define FALLOC_FL_ZERO_RANGE 0x10
#endif
virtual Future<Void> zeroRange( int64_t offset, int64_t length ) override {
bool success = false;
if (ctx.fallocateZeroSupported) {
int rc = fallocate( fd, FALLOC_FL_ZERO_RANGE, offset, length );
if (rc == EOPNOTSUPP) {
ctx.fallocateZeroSupported = false;
}
if (rc == 0) {
success = true;
}
}
return success ? Void() : IAsyncFile::zeroRange(offset, length);
}
virtual Future<Void> truncate( int64_t size ) {
++countFileLogicalWrites;
++countLogicalWrites;
if(failed) {
return io_timeout();
}
@ -484,6 +501,7 @@ private:
int outstanding;
double ioStallBegin;
bool fallocateSupported;
bool fallocateZeroSupported;
std::priority_queue<IOBlock*, std::vector<IOBlock*>, IOBlock::indirect_order_by_priority> queue;
Int64MetricHandle countAIOSubmit;
Int64MetricHandle countAIOCollect;
@ -499,7 +517,7 @@ private:
EventMetricHandle<SlowAioSubmit> slowAioSubmitMetric;
uint32_t opsIssued;
Context() : iocx(0), evfd(-1), outstanding(0), opsIssued(0), ioStallBegin(0), fallocateSupported(true), submittedRequestList(nullptr) {
Context() : iocx(0), evfd(-1), outstanding(0), opsIssued(0), ioStallBegin(0), fallocateSupported(true), fallocateZeroSupported(true), submittedRequestList(nullptr) {
setIOTimeout(0);
}

View File

@ -198,7 +198,7 @@ Reference<BlobStoreEndpoint> BlobStoreEndpoint::fromString(std::string const &ur
} catch(std::string &err) {
if(error != nullptr)
*error = err;
TraceEvent(SevWarnAlways, "BlobStoreEndpoint").detail("Description", err).detail("Format", getURLFormat()).detail("URL", url);
TraceEvent(SevWarnAlways, "BlobStoreEndpointBadURL").detail("Description", err).detail("Format", getURLFormat()).detail("URL", url);
throw file_not_found();
}
}
@ -407,7 +407,7 @@ ACTOR Future<Reference<HTTP::Response>> doRequest_impl(Reference<BlobStoreEndpoi
// But only if our previous attempt was not the last allowable try.
retryable = retryable && (thisTry < maxTries);
TraceEvent event(retryable ? SevWarn : SevWarnAlways, retryable ? "BlobStoreEndpointRequestFailedRetryable" : "BlobStoreEndpointRequestFailed");
TraceEvent event(SevWarn, retryable ? "BlobStoreEndpointRequestFailedRetryable" : "BlobStoreEndpointRequestFailed");
event.detail("RemoteEndpoint", address)
.detail("Verb", verb)

View File

@ -25,7 +25,61 @@
#include "flow/UnitTest.h"
#include <iostream>
IAsyncFile::IAsyncFile(){};
IAsyncFile::~IAsyncFile() = default;
const static unsigned int ONE_MEGABYTE = 1<<20;
const static unsigned int FOUR_KILOBYTES = 4<<10;
ACTOR static Future<Void> zeroRangeHelper( Reference<IAsyncFile> f, int64_t offset, int64_t length, int fixedbyte) {
state int64_t pos = offset;
state void* zeros = aligned_alloc( ONE_MEGABYTE, ONE_MEGABYTE );
memset( zeros, fixedbyte, ONE_MEGABYTE );
while(pos < offset+length) {
state int len = std::min<int64_t>( ONE_MEGABYTE, offset+length-pos );
Void _ = wait( f->write( zeros, len, pos ) );
pos += len;
Void _ = wait( yield() );
}
free(zeros);
return Void();
}
Future<Void> IAsyncFile::zeroRange(int64_t offset, int64_t length) {
return uncancellable(zeroRangeHelper(Reference<IAsyncFile>::addRef(this), offset, length, 0));
}
TEST_CASE( "fileio/zero" ) {
state std::string filename = "/tmp/__ZEROJUNK__";
state Reference<IAsyncFile> f =
wait(IAsyncFileSystem::filesystem()->open(
filename,
IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_CREATE | IAsyncFile::OPEN_READWRITE,
0));
// Verify that we can grow a file with zero().
Void _ = wait(f->sync());
Void _ = wait(f->zeroRange(0, ONE_MEGABYTE));
int64_t size = wait(f->size());
ASSERT( ONE_MEGABYTE == size );
// Verify that zero() does, in fact, zero.
Void _ = wait(zeroRangeHelper(f, 0, ONE_MEGABYTE, 0xff));
Void _ = wait(f->zeroRange(0, ONE_MEGABYTE));
state uint8_t* page = (uint8_t*)malloc(FOUR_KILOBYTES);
int n = wait( f->read(page, FOUR_KILOBYTES, 0) );
ASSERT( n == FOUR_KILOBYTES );
for (int i = 0; i < FOUR_KILOBYTES; i++) {
ASSERT(page[i] == 0);
}
free(page);
// Destruct our file and remove it.
f.clear();
Void _ = wait( IAsyncFileSystem::filesystem()->deleteFile(filename, true) );
return Void();
}
ACTOR static Future<Void> incrementalDeleteHelper( std::string filename, bool mustBeDurable, int64_t truncateAmt, double interval ) {
state Reference<IAsyncFile> file;
@ -53,7 +107,7 @@ ACTOR static Future<Void> incrementalDeleteHelper( std::string filename, bool mu
return Void();
}
Future<Void> IAsyncFile::incrementalDelete( std::string filename, bool mustBeDurable ) {
Future<Void> IAsyncFileSystem::incrementalDeleteFile( std::string filename, bool mustBeDurable ) {
return uncancellable(incrementalDeleteHelper(
filename,
mustBeDurable,
@ -74,6 +128,6 @@ TEST_CASE( "fileio/incrementalDelete" ) {
Void _ = wait(f->truncate(fileSize));
//close the file by deleting the reference
f.clear();
Void _ = wait(IAsyncFile::incrementalDelete(filename, true));
Void _ = wait(IAsyncFileSystem::filesystem()->incrementalDeleteFile(filename, true));
return Void();
}

View File

@ -24,10 +24,14 @@
#include "flow/flow.h"
//All outstanding operations must be cancelled before the destructor of IAsyncFile is called.
// All outstanding operations must be cancelled before the destructor of IAsyncFile is called.
// The desirability of the above semantic is disputed. Some classes (AsyncFileBlobStore,
// AsyncFileCached) maintain references, while others (AsyncFileNonDurable) don't, and the comment
// is unapplicable to some others as well (AsyncFileKAIO). It's safest to assume that all operations
// must complete or cancel, but you should probably look at the file implementations you'll be using.
class IAsyncFile {
public:
IAsyncFile();
virtual ~IAsyncFile();
// Pass these to g_network->open to get an IAsyncFile
enum {
// Implementation relies on the low bits being the same as the SQLite flags (this is validated by a static_assert there)
@ -52,16 +56,15 @@ public:
// For read() and write(), the data buffer must remain valid until the future is ready
virtual Future<int> read( void* data, int length, int64_t offset ) = 0; // Returns number of bytes actually read (from [0,length])
virtual Future<Void> write( void const* data, int length, int64_t offset ) = 0;
// The zeroed data is not guaranteed to be durable after `zeroRange` returns. A call to sync() would be required.
// This operation holds a reference to the AsyncFile, and does not need to be cancelled before a reference is dropped.
virtual Future<Void> zeroRange( int64_t offset, int64_t length );
virtual Future<Void> truncate( int64_t size ) = 0;
virtual Future<Void> sync() = 0;
virtual Future<Void> flush() { return Void(); } // Sends previous writes to the OS if they have been buffered in memory, but does not make them power safe
virtual Future<int64_t> size() = 0;
virtual std::string getFilename() = 0;
// Unlinks a file and then deletes it slowly by truncating the file repeatedly.
// If mustBeDurable, returns only when the file is guaranteed to be deleted even after a power failure.
static Future<Void> incrementalDelete( std::string filename, bool mustBeDurable );
// Attempt to read the *length bytes at offset without copying. If successful, a pointer to the
// requested bytes is written to *data, and the number of bytes successfully read is
// written to *length. If unsuccessful, *data and *length are undefined.
@ -83,11 +86,15 @@ typedef void (*runCycleFuncPtr)();
class IAsyncFileSystem {
public:
virtual Future< Reference<class IAsyncFile> > open( std::string filename, int64_t flags, int64_t mode ) = 0;
// Opens a file for asynchronous I/O
virtual Future< Reference<class IAsyncFile> > open( std::string filename, int64_t flags, int64_t mode ) = 0;
virtual Future< Void > deleteFile( std::string filename, bool mustBeDurable ) = 0;
// Deletes the given file. If mustBeDurable, returns only when the file is guaranteed to be deleted even after a power failure.
virtual Future< Void > deleteFile( std::string filename, bool mustBeDurable ) = 0;
// Unlinks a file and then deletes it slowly by truncating the file repeatedly.
// If mustBeDurable, returns only when the file is guaranteed to be deleted even after a power failure.
virtual Future<Void> incrementalDeleteFile( std::string filename, bool mustBeDurable );
static IAsyncFileSystem* filesystem() { return filesystem(g_network); }
static runCycleFuncPtr runCycleFunc() { return reinterpret_cast<runCycleFuncPtr>(reinterpret_cast<flowGlobalType>(g_network->global(INetwork::enRunCycleFunc))); }

View File

@ -115,7 +115,7 @@ public:
: basename(basename), onError(delayed(error.getFuture())), onStopped(stopped.getFuture()),
readingFile(-1), readingPage(-1), writingPos(-1), dbgid(dbgid),
dbg_file0BeginSeq(0), fileExtensionBytes(10<<20), readingBuffer( dbgid ),
readyToPush(Void()), fileSizeWarningLimit(fileSizeWarningLimit)
readyToPush(Void()), fileSizeWarningLimit(fileSizeWarningLimit), lastCommit(Void())
{
if(BUGGIFY)
fileExtensionBytes = 8<<10;
@ -190,7 +190,7 @@ public:
Future<Void> onError, onStopped;
Future<Void> readyToPush;
AndFuture lastCommit;
Future<Void> lastCommit;
StringBuffer readingBuffer; // Pages that have been read and not yet returned
int readingFile; // i if the next page after readingBuffer should be read from files[i], 2 if recovery is complete
@ -273,12 +273,13 @@ public:
state std::string filename = self->files[0].dbgFilename;
state UID dbgid = self->dbgid;
state vector<Reference<SyncQueue>> syncFiles;
state Future<Void> lastCommit = self->lastCommit;
try {
// pushing might need to wait for previous pushes to start (to maintain order) or for
// a previous commit to finish if stall() was called
Future<Void> ready = self->readyToPush;
self->readyToPush = pushing.getFuture();
self->lastCommit.add( committed.getFuture() );
self->lastCommit = committed.getFuture();
Void _ = wait( ready );
@ -296,6 +297,8 @@ public:
Future<Void> sync = syncFiles[0]->onSync();
for(int i=1; i<syncFiles.size(); i++) sync = sync && syncFiles[i]->onSync();
Void _ = wait( sync );
Void _ = wait( lastCommit );
Void _ = wait( yield() );
self->updatePopped( poppedPages*sizeof(Page) );
@ -303,10 +306,6 @@ public:
.detail("File0Size", self->files[0].size).detail("File1Size", self->files[1].size)
.detail("File0Name", self->files[0].dbgFilename).detail("SyncedFiles", syncFiles.size());*/
if(g_random->random01() < 0.01) {
//occasionally delete all the ready future in the AndFuture
self->lastCommit.cleanup();
}
committed.send(Void());
} catch (Error& e) {
delete pageMem;
@ -399,7 +398,7 @@ public:
// Wait for all reads and writes on the file, and all actors referencing self, to be finished
state Error error = success();
try {
ErrorOr<Void> _ = wait(errorOr(self->lastCommit.getFuture()));
ErrorOr<Void> _ = wait(errorOr(self->lastCommit));
while (self->recoveryActorCount.get(false))
Void _ = wait( self->recoveryActorCount.onChange(false) );
@ -410,8 +409,8 @@ public:
TraceEvent("DiskQueueShutdownDeleting", self->dbgid)
.detail("File0", self->filename(0))
.detail("File1", self->filename(1));
Void _ = wait( IAsyncFile::incrementalDelete( self->filename(0), false ) );
Void _ = wait( IAsyncFile::incrementalDelete( self->filename(1), true ) );
Void _ = wait( IAsyncFileSystem::filesystem()->incrementalDeleteFile( self->filename(0), false ) );
Void _ = wait( IAsyncFileSystem::filesystem()->incrementalDeleteFile( self->filename(1), true ) );
}
TraceEvent("DiskQueueShutdownComplete", self->dbgid)
.detail("DeleteFiles", deleteFiles)
@ -586,20 +585,11 @@ public:
ACTOR static UNCANCELLABLE Future<Void> truncateFile(RawDiskQueue_TwoFiles* self, int file, int64_t pos) {
state TrackMe trackMe(self);
state StringBuffer zeros( self->dbgid );
TraceEvent("DQTruncateFile", self->dbgid).detail("File", file).detail("Pos", pos).detail("File0Name", self->files[0].dbgFilename);
zeros.alignReserve( sizeof(Page), 1<<20 );
memset( zeros.append(1<<20), 0, 1<<20 );
while(pos < self->files[file].size) {
state int len = std::min<int64_t>(zeros.size(), self->files[file].size-pos);
Void _ = wait( self->files[file].f->write( zeros.str.begin(), len, pos ) );
pos += len;
}
state Reference<IAsyncFile> f = self->files[file].f; // Hold onto a reference in the off-chance that the DQ is removed from underneath us.
Void _ = wait( f->zeroRange( pos, self->files[file].size-pos ) );
Void _ = wait(self->files[file].syncQueue->onSync());
// We intentionally don't return the f->zero future, so that TrackMe is destructed after f->zero finishes.
return Void();
}

View File

@ -1827,8 +1827,8 @@ private:
self->logging.cancel();
Void _ = wait( self->readThreads->stop() && self->writeThread->stop() );
if (deleteOnClose) {
Void _ = wait( IAsyncFile::incrementalDelete( self->filename, true ) );
Void _ = wait( IAsyncFile::incrementalDelete( self->filename + "-wal", false ) );
Void _ = wait( IAsyncFileSystem::filesystem()->incrementalDeleteFile( self->filename, true ) );
Void _ = wait( IAsyncFileSystem::filesystem()->incrementalDeleteFile( self->filename + "-wal", false ) );
}
} catch (Error& e) {
TraceEvent(SevError, "KVDoCloseError", self->logID)

View File

@ -489,7 +489,12 @@ inline static void aligned_free(void* ptr) { free(ptr); }
inline static void* aligned_alloc(size_t alignment, size_t size) { return memalign(alignment, size); }
#endif
#elif defined(__APPLE__)
inline static void* aligned_alloc(size_t alignment, size_t size) { return malloc(size); } // FIXME: OSX doesn't have memalign(). All allocations are 16-byte aligned
#include <cstdlib>
inline static void* aligned_alloc(size_t alignment, size_t size) {
void* ptr = nullptr;
posix_memalign(&ptr, alignment, size);
return ptr;
}
inline static void aligned_free(void* ptr) { free(ptr); }
#endif