Merge pull request #3672 from sfc-gh-tclinkenbeard/make-iasyncfile-const-correct
Make IAsyncFile const-correct
This commit is contained in:
commit
939f59d989
|
@ -23,7 +23,7 @@
|
|||
#include "flow/UnitTest.h"
|
||||
#include "flow/actorcompiler.h" // has to be last include
|
||||
|
||||
Future<int64_t> AsyncFileBlobStoreRead::size() {
|
||||
Future<int64_t> AsyncFileBlobStoreRead::size() const {
|
||||
if(!m_size.isValid())
|
||||
m_size = m_bstore->objectSize(m_bucket, m_object);
|
||||
return m_size;
|
||||
|
|
|
@ -92,7 +92,7 @@ public:
|
|||
MD5_CTX content_md5_buf;
|
||||
};
|
||||
|
||||
virtual Future<int> read( void *data, int length, int64_t offset ) { throw file_not_readable(); }
|
||||
Future<int> read(void* data, int length, int64_t offset) override { throw file_not_readable(); }
|
||||
|
||||
ACTOR static Future<Void> write_impl(Reference<AsyncFileBlobStoreWrite> f, const uint8_t *data, int length) {
|
||||
state Part *p = f->m_parts.back().getPtr();
|
||||
|
@ -115,7 +115,7 @@ public:
|
|||
return Void();
|
||||
}
|
||||
|
||||
virtual Future<Void> write( void const *data, int length, int64_t offset ) {
|
||||
Future<Void> write(void const* data, int length, int64_t offset) override {
|
||||
if(offset != m_cursor)
|
||||
throw non_sequential_op();
|
||||
m_cursor += length;
|
||||
|
@ -123,7 +123,7 @@ public:
|
|||
return m_error.getFuture() || write_impl(Reference<AsyncFileBlobStoreWrite>::addRef(this), (const uint8_t *)data, length);
|
||||
}
|
||||
|
||||
virtual Future<Void> truncate( int64_t size ) {
|
||||
Future<Void> truncate(int64_t size) override {
|
||||
if(size != m_cursor)
|
||||
return non_sequential_op();
|
||||
return Void();
|
||||
|
@ -165,7 +165,7 @@ public:
|
|||
}
|
||||
|
||||
// Ready once all data has been sent AND acknowledged from the remote side
|
||||
virtual Future<Void> sync() {
|
||||
Future<Void> sync() override {
|
||||
// Only initiate the finish operation once, and also prevent further writing.
|
||||
if(!m_finished.isValid()) {
|
||||
m_finished = doFinishUpload(this);
|
||||
|
@ -182,25 +182,25 @@ public:
|
|||
// their size. So in the case of a write buffer that does not meet the part minimum size the part could be sent
|
||||
// but then if there is any more data written then that part needs to be sent again in its entirety. So a client
|
||||
// that calls flush often could generate far more blob store write traffic than they intend to.
|
||||
virtual Future<Void> flush() { return Void(); }
|
||||
Future<Void> flush() override { return Void(); }
|
||||
|
||||
virtual Future<int64_t> size() { return m_cursor; }
|
||||
Future<int64_t> size() const override { return m_cursor; }
|
||||
|
||||
virtual Future<Void> readZeroCopy( void** data, int* length, int64_t offset ) {
|
||||
Future<Void> readZeroCopy(void** data, int* length, int64_t offset) override {
|
||||
TraceEvent(SevError, "ReadZeroCopyNotSupported").detail("FileType", "BlobStoreWrite");
|
||||
return platform_error();
|
||||
}
|
||||
virtual void releaseZeroCopy( void* data, int length, int64_t offset ) {}
|
||||
void releaseZeroCopy(void* data, int length, int64_t offset) override {}
|
||||
|
||||
virtual int64_t debugFD() { return -1; }
|
||||
int64_t debugFD() const override { return -1; }
|
||||
|
||||
virtual ~AsyncFileBlobStoreWrite() {
|
||||
~AsyncFileBlobStoreWrite() override {
|
||||
m_upload_id.cancel();
|
||||
m_finished.cancel();
|
||||
m_parts.clear(); // Contains futures
|
||||
}
|
||||
|
||||
virtual std::string getFilename() { return m_object; }
|
||||
std::string getFilename() const override { return m_object; }
|
||||
|
||||
private:
|
||||
Reference<BlobStoreEndpoint> m_bstore;
|
||||
|
@ -259,32 +259,32 @@ public:
|
|||
virtual void addref() { ReferenceCounted<AsyncFileBlobStoreRead>::addref(); }
|
||||
virtual void delref() { ReferenceCounted<AsyncFileBlobStoreRead>::delref(); }
|
||||
|
||||
virtual Future<int> read( void *data, int length, int64_t offset );
|
||||
Future<int> read(void* data, int length, int64_t offset) override;
|
||||
|
||||
virtual Future<Void> write( void const *data, int length, int64_t offset ) { throw file_not_writable(); }
|
||||
virtual Future<Void> truncate( int64_t size ) { throw file_not_writable(); }
|
||||
Future<Void> write(void const* data, int length, int64_t offset) override { throw file_not_writable(); }
|
||||
Future<Void> truncate(int64_t size) override { throw file_not_writable(); }
|
||||
|
||||
virtual Future<Void> sync() { return Void(); }
|
||||
virtual Future<Void> flush() { return Void(); }
|
||||
Future<Void> sync() override { return Void(); }
|
||||
Future<Void> flush() override { return Void(); }
|
||||
|
||||
virtual Future<int64_t> size();
|
||||
Future<int64_t> size() const override;
|
||||
|
||||
virtual Future<Void> readZeroCopy( void** data, int* length, int64_t offset ) {
|
||||
Future<Void> readZeroCopy(void** data, int* length, int64_t offset) override {
|
||||
TraceEvent(SevError, "ReadZeroCopyNotSupported").detail("FileType", "BlobStoreRead");
|
||||
return platform_error();
|
||||
}
|
||||
virtual void releaseZeroCopy( void* data, int length, int64_t offset ) {}
|
||||
void releaseZeroCopy(void* data, int length, int64_t offset) override {}
|
||||
|
||||
virtual int64_t debugFD() { return -1; }
|
||||
int64_t debugFD() const override { return -1; }
|
||||
|
||||
virtual std::string getFilename() { return m_object; }
|
||||
std::string getFilename() const override { return m_object; }
|
||||
|
||||
virtual ~AsyncFileBlobStoreRead() {}
|
||||
|
||||
Reference<BlobStoreEndpoint> m_bstore;
|
||||
std::string m_bucket;
|
||||
std::string m_object;
|
||||
Future<int64_t> m_size;
|
||||
mutable Future<int64_t> m_size;
|
||||
|
||||
AsyncFileBlobStoreRead(Reference<BlobStoreEndpoint> bstore, std::string bucket, std::string object)
|
||||
: m_bstore(bstore), m_bucket(bucket), m_object(object) {
|
||||
|
|
|
@ -160,7 +160,7 @@ public:
|
|||
return openFiles[filename].get();
|
||||
}
|
||||
|
||||
virtual Future<int> read( void* data, int length, int64_t offset ) {
|
||||
Future<int> read(void* data, int length, int64_t offset) override {
|
||||
++countFileCacheReads;
|
||||
++countCacheReads;
|
||||
if (offset + length > this->length) {
|
||||
|
@ -190,17 +190,15 @@ public:
|
|||
return Void();
|
||||
}
|
||||
|
||||
virtual Future<Void> write( void const* data, int length, int64_t offset ) {
|
||||
Future<Void> write(void const* data, int length, int64_t offset) override {
|
||||
return write_impl(this, data, length, offset);
|
||||
}
|
||||
|
||||
virtual Future<Void> readZeroCopy( void** data, int* length, int64_t offset );
|
||||
virtual void releaseZeroCopy( void* data, int length, int64_t offset );
|
||||
Future<Void> readZeroCopy(void** data, int* length, int64_t offset) override;
|
||||
void releaseZeroCopy(void* data, int length, int64_t offset) override;
|
||||
|
||||
// This waits for previously started truncates to finish and then truncates
|
||||
virtual Future<Void> truncate( int64_t size ) {
|
||||
return truncate_impl(this, size);
|
||||
}
|
||||
Future<Void> truncate(int64_t size) override { return truncate_impl(this, size); }
|
||||
|
||||
// This is the 'real' truncate that does the actual removal of cache blocks and then shortens the file
|
||||
Future<Void> changeFileSize( int64_t size );
|
||||
|
@ -215,21 +213,13 @@ public:
|
|||
return Void();
|
||||
}
|
||||
|
||||
virtual Future<Void> sync() {
|
||||
return waitAndSync( this, flush() );
|
||||
}
|
||||
Future<Void> sync() override { return waitAndSync(this, flush()); }
|
||||
|
||||
virtual Future<int64_t> size() {
|
||||
return length;
|
||||
}
|
||||
Future<int64_t> size() const override { return length; }
|
||||
|
||||
virtual int64_t debugFD() {
|
||||
return uncached->debugFD();
|
||||
}
|
||||
int64_t debugFD() const override { return uncached->debugFD(); }
|
||||
|
||||
virtual std::string getFilename() {
|
||||
return filename;
|
||||
}
|
||||
std::string getFilename() const override { return filename; }
|
||||
|
||||
virtual void addref() {
|
||||
ReferenceCounted<AsyncFileCached>::addref();
|
||||
|
@ -337,7 +327,7 @@ private:
|
|||
}
|
||||
}
|
||||
|
||||
virtual Future<Void> flush();
|
||||
Future<Void> flush() override;
|
||||
|
||||
Future<Void> quiesce();
|
||||
|
||||
|
@ -356,7 +346,7 @@ private:
|
|||
};
|
||||
|
||||
struct AFCPage : public EvictablePage, public FastAllocated<AFCPage> {
|
||||
virtual bool evict() {
|
||||
bool evict() override {
|
||||
if ( notReading.isReady() && notFlushing.isReady() && !dirty && !zeroCopyRefCount && !truncated ) {
|
||||
owner->remove_page( this );
|
||||
delete this;
|
||||
|
|
|
@ -115,26 +115,26 @@ public:
|
|||
virtual void addref() { ReferenceCounted<AsyncFileEIO>::addref(); }
|
||||
virtual void delref() { ReferenceCounted<AsyncFileEIO>::delref(); }
|
||||
|
||||
virtual int64_t debugFD() { return fd; }
|
||||
int64_t debugFD() const override { return fd; }
|
||||
|
||||
virtual Future<int> read( void* data, int length, int64_t offset ) {
|
||||
Future<int> read(void* data, int length, int64_t offset) override {
|
||||
++countFileLogicalReads;
|
||||
++countLogicalReads;
|
||||
return read_impl(fd, data, length, offset);
|
||||
}
|
||||
virtual Future<Void> write( void const* data, int length, int64_t offset ) // Copies data synchronously
|
||||
Future<Void> write(void const* data, int length, int64_t offset) override // Copies data synchronously
|
||||
{
|
||||
++countFileLogicalWrites;
|
||||
++countLogicalWrites;
|
||||
//Standalone<StringRef> copy = StringRef((const uint8_t*)data, length);
|
||||
return write_impl( fd, err, StringRef((const uint8_t*)data, length), offset );
|
||||
}
|
||||
virtual Future<Void> truncate( int64_t size ) {
|
||||
Future<Void> truncate(int64_t size) override {
|
||||
++countFileLogicalWrites;
|
||||
++countLogicalWrites;
|
||||
return truncate_impl( fd, err, size );
|
||||
}
|
||||
virtual Future<Void> sync() {
|
||||
Future<Void> sync() override {
|
||||
++countFileLogicalWrites;
|
||||
++countLogicalWrites;
|
||||
auto fsync = sync_impl( fd, err );
|
||||
|
@ -147,14 +147,12 @@ public:
|
|||
|
||||
return fsync;
|
||||
}
|
||||
virtual Future<int64_t> size() {
|
||||
Future<int64_t> size() const override {
|
||||
++countFileLogicalReads;
|
||||
++countLogicalReads;
|
||||
return size_impl(fd);
|
||||
}
|
||||
virtual std::string getFilename() {
|
||||
return filename;
|
||||
}
|
||||
std::string getFilename() const override { return filename; }
|
||||
|
||||
ACTOR static Future<Void> async_fsync_parent( std::string filename ) {
|
||||
std::string folder = parentDirectory( filename );
|
||||
|
@ -227,11 +225,11 @@ private:
|
|||
int fd, flags;
|
||||
Reference<ErrorInfo> err;
|
||||
std::string filename;
|
||||
Int64MetricHandle countFileLogicalWrites;
|
||||
Int64MetricHandle countFileLogicalReads;
|
||||
mutable Int64MetricHandle countFileLogicalWrites;
|
||||
mutable Int64MetricHandle countFileLogicalReads;
|
||||
|
||||
Int64MetricHandle countLogicalWrites;
|
||||
Int64MetricHandle countLogicalReads;
|
||||
mutable Int64MetricHandle countLogicalWrites;
|
||||
mutable Int64MetricHandle countLogicalReads;
|
||||
|
||||
AsyncFileEIO( int fd, int flags, std::string const& filename ) : fd(fd), flags(flags), filename(filename), err(new ErrorInfo) {
|
||||
if( !g_network->isSimulated() ) {
|
||||
|
|
|
@ -182,7 +182,7 @@ public:
|
|||
virtual void addref() { ReferenceCounted<AsyncFileKAIO>::addref(); }
|
||||
virtual void delref() { ReferenceCounted<AsyncFileKAIO>::delref(); }
|
||||
|
||||
virtual Future<int> read( void* data, int length, int64_t offset ) {
|
||||
Future<int> read(void* data, int length, int64_t offset) override {
|
||||
++countFileLogicalReads;
|
||||
++countLogicalReads;
|
||||
//printf("%p Begin logical read\n", getCurrentCoro());
|
||||
|
@ -205,7 +205,7 @@ public:
|
|||
|
||||
return result;
|
||||
}
|
||||
virtual Future<Void> write( void const* data, int length, int64_t offset ) {
|
||||
Future<Void> write(void const* data, int length, int64_t offset) override {
|
||||
++countFileLogicalWrites;
|
||||
++countLogicalWrites;
|
||||
//printf("%p Begin logical write\n", getCurrentCoro());
|
||||
|
@ -234,7 +234,7 @@ public:
|
|||
#ifndef FALLOC_FL_ZERO_RANGE
|
||||
#define FALLOC_FL_ZERO_RANGE 0x10
|
||||
#endif
|
||||
virtual Future<Void> zeroRange( int64_t offset, int64_t length ) override {
|
||||
Future<Void> zeroRange(int64_t offset, int64_t length) override {
|
||||
bool success = false;
|
||||
if (ctx.fallocateZeroSupported) {
|
||||
int rc = fallocate( fd, FALLOC_FL_ZERO_RANGE, offset, length );
|
||||
|
@ -247,7 +247,7 @@ public:
|
|||
}
|
||||
return success ? Void() : IAsyncFile::zeroRange(offset, length);
|
||||
}
|
||||
virtual Future<Void> truncate( int64_t size ) {
|
||||
Future<Void> truncate(int64_t size) override {
|
||||
++countFileLogicalWrites;
|
||||
++countLogicalWrites;
|
||||
|
||||
|
@ -308,7 +308,7 @@ public:
|
|||
return Void();
|
||||
}
|
||||
|
||||
virtual Future<Void> sync() {
|
||||
Future<Void> sync() override {
|
||||
++countFileLogicalWrites;
|
||||
++countLogicalWrites;
|
||||
|
||||
|
@ -340,13 +340,9 @@ public:
|
|||
|
||||
return fsync;
|
||||
}
|
||||
virtual Future<int64_t> size() { return nextFileSize; }
|
||||
virtual int64_t debugFD() {
|
||||
return fd;
|
||||
}
|
||||
virtual std::string getFilename() {
|
||||
return filename;
|
||||
}
|
||||
Future<int64_t> size() const override { return nextFileSize; }
|
||||
int64_t debugFD() const override { return fd; }
|
||||
std::string getFilename() const override { return filename; }
|
||||
~AsyncFileKAIO() {
|
||||
close(fd);
|
||||
|
||||
|
|
|
@ -87,42 +87,42 @@ public:
|
|||
ReferenceCounted<AsyncFileDetachable>::delref();
|
||||
}
|
||||
|
||||
Future<int> read(void *data, int length, int64_t offset) {
|
||||
Future<int> read(void* data, int length, int64_t offset) override {
|
||||
if( !file.getPtr() || g_simulator.getCurrentProcess()->shutdownSignal.getFuture().isReady() )
|
||||
return io_error().asInjectedFault();
|
||||
return sendErrorOnShutdown( file->read( data, length, offset ) );
|
||||
}
|
||||
|
||||
Future<Void> write(void const *data, int length, int64_t offset) {
|
||||
Future<Void> write(void const* data, int length, int64_t offset) override {
|
||||
if( !file.getPtr() || g_simulator.getCurrentProcess()->shutdownSignal.getFuture().isReady() )
|
||||
return io_error().asInjectedFault();
|
||||
return sendErrorOnShutdown( file->write( data, length, offset ) );
|
||||
}
|
||||
|
||||
Future<Void> truncate(int64_t size) {
|
||||
|
||||
Future<Void> truncate(int64_t size) override {
|
||||
if( !file.getPtr() || g_simulator.getCurrentProcess()->shutdownSignal.getFuture().isReady() )
|
||||
return io_error().asInjectedFault();
|
||||
return sendErrorOnShutdown( file->truncate( size ) );
|
||||
}
|
||||
|
||||
Future<Void> sync() {
|
||||
Future<Void> sync() override {
|
||||
if( !file.getPtr() || g_simulator.getCurrentProcess()->shutdownSignal.getFuture().isReady() )
|
||||
return io_error().asInjectedFault();
|
||||
return sendErrorOnShutdown( file->sync() );
|
||||
}
|
||||
|
||||
Future<int64_t> size() {
|
||||
Future<int64_t> size() const override {
|
||||
if( !file.getPtr() || g_simulator.getCurrentProcess()->shutdownSignal.getFuture().isReady() )
|
||||
return io_error().asInjectedFault();
|
||||
return sendErrorOnShutdown( file->size() );
|
||||
}
|
||||
|
||||
int64_t debugFD() {
|
||||
int64_t debugFD() const override {
|
||||
if( !file.getPtr() )
|
||||
throw io_error().asInjectedFault();
|
||||
return file->debugFD();
|
||||
}
|
||||
std::string getFilename() {
|
||||
std::string getFilename() const override {
|
||||
if( !file.getPtr() )
|
||||
throw io_error().asInjectedFault();
|
||||
return file->getFilename();
|
||||
|
@ -137,7 +137,7 @@ public:
|
|||
std::string filename;
|
||||
|
||||
//An approximation of the size of the file; .size() should be used instead of this variable in most cases
|
||||
int64_t approximateSize;
|
||||
mutable int64_t approximateSize;
|
||||
|
||||
//The address of the machine that opened the file
|
||||
NetworkAddress openedAddress;
|
||||
|
@ -263,13 +263,11 @@ public:
|
|||
}
|
||||
|
||||
//Passes along reads straight to the underlying file, waiting for any outstanding changes that could affect the results
|
||||
Future<int> read(void *data, int length, int64_t offset) {
|
||||
return read(this, data, length, offset);
|
||||
}
|
||||
Future<int> read(void* data, int length, int64_t offset) override { return read(this, data, length, offset); }
|
||||
|
||||
//Writes data to the file. Writes are delayed a random amount of time before being
|
||||
//passed to the underlying file
|
||||
Future<Void> write(void const *data, int length, int64_t offset) {
|
||||
Future<Void> write(void const* data, int length, int64_t offset) override {
|
||||
//TraceEvent("AsyncFileNonDurable_Write", id).detail("Filename", filename).detail("Offset", offset).detail("Length", length);
|
||||
if(length == 0) {
|
||||
TraceEvent(SevWarnAlways, "AsyncFileNonDurable_EmptyModification", id).detail("Filename", filename);
|
||||
|
@ -283,10 +281,10 @@ public:
|
|||
writeEnded.send(write(this, writeStarted, writeEnded.getFuture(), data, length, offset));
|
||||
return writeStarted.getFuture();
|
||||
}
|
||||
|
||||
|
||||
//Truncates the file. Truncates are delayed a random amount of time before being
|
||||
//passed to the underlying file
|
||||
Future<Void> truncate(int64_t size) {
|
||||
Future<Void> truncate(int64_t size) override {
|
||||
//TraceEvent("AsyncFileNonDurable_Truncate", id).detail("Filename", filename).detail("Offset", size);
|
||||
debugFileTruncate("AsyncFileNonDurableTruncate", filename, size);
|
||||
|
||||
|
@ -306,17 +304,11 @@ public:
|
|||
}
|
||||
|
||||
//Passes along size requests to the underlying file, augmenting with any writes past the end of the file
|
||||
Future<int64_t> size() {
|
||||
return size(this);
|
||||
}
|
||||
Future<int64_t> size() const override { return size(this); }
|
||||
|
||||
int64_t debugFD() {
|
||||
return file->debugFD();
|
||||
}
|
||||
int64_t debugFD() const override { return file->debugFD(); }
|
||||
|
||||
std::string getFilename() {
|
||||
return file->getFilename();
|
||||
}
|
||||
std::string getFilename() const override { return file->getFilename(); }
|
||||
|
||||
//Forces a non-durable sync (some writes are not made or made incorrectly)
|
||||
//This is used when the file should 'die' without first completing its operations
|
||||
|
@ -358,7 +350,7 @@ private:
|
|||
}
|
||||
|
||||
//Checks if the file is killed. If so, then the current sync is completed if running and then an error is thrown
|
||||
ACTOR Future<Void> checkKilled(AsyncFileNonDurable *self, std::string context) {
|
||||
ACTOR static Future<Void> checkKilled(AsyncFileNonDurable const* self, std::string context) {
|
||||
if(self->killed.isSet()) {
|
||||
//TraceEvent("AsyncFileNonDurable_KilledInCheck", self->id).detail("In", context).detail("Filename", self->filename);
|
||||
wait(self->killComplete.getFuture());
|
||||
|
@ -372,14 +364,14 @@ private:
|
|||
|
||||
//Passes along reads straight to the underlying file, waiting for any outstanding changes that could affect the results
|
||||
ACTOR Future<int> onRead(AsyncFileNonDurable *self, void *data, int length, int64_t offset) {
|
||||
wait(self->checkKilled(self, "Read"));
|
||||
wait(checkKilled(self, "Read"));
|
||||
vector<Future<Void>> priorModifications = self->getModificationsAndInsert(offset, length);
|
||||
wait(waitForAll(priorModifications));
|
||||
state Future<int> readFuture = self->file->read(data, length, offset);
|
||||
wait( success( readFuture ) || self->killed.getFuture() );
|
||||
|
||||
// throws if we were killed
|
||||
wait(self->checkKilled(self, "ReadEnd"));
|
||||
wait(checkKilled(self, "ReadEnd"));
|
||||
|
||||
debugFileCheck("AsyncFileNonDurableRead", self->filename, data, offset, length);
|
||||
|
||||
|
@ -421,7 +413,7 @@ private:
|
|||
|
||||
try {
|
||||
//TraceEvent("AsyncFileNonDurable_Write", self->id).detail("Delay", delayDuration).detail("Filename", self->filename).detail("WriteLength", length).detail("Offset", offset);
|
||||
wait(self->checkKilled(self, "Write"));
|
||||
wait(checkKilled(self, "Write"));
|
||||
|
||||
Future<Void> writeEnded = wait(ownFuture);
|
||||
std::vector<Future<Void>> priorModifications = self->getModificationsAndInsert(offset, length, true, writeEnded);
|
||||
|
@ -543,7 +535,7 @@ private:
|
|||
|
||||
try {
|
||||
//TraceEvent("AsyncFileNonDurable_Truncate", self->id).detail("Delay", delayDuration).detail("Filename", self->filename);
|
||||
wait(self->checkKilled(self, "Truncate"));
|
||||
wait(checkKilled(self, "Truncate"));
|
||||
|
||||
Future<Void> truncateEnded = wait(ownFuture);
|
||||
std::vector<Future<Void>> priorModifications = self->getModificationsAndInsert(size, -1, true, truncateEnded);
|
||||
|
@ -600,8 +592,8 @@ private:
|
|||
wait(waitUntilDiskReady(self->diskParameters, 0, true) || self->killed.getFuture());
|
||||
}
|
||||
|
||||
wait(self->checkKilled(self, durable ? "Sync" : "Kill"));
|
||||
|
||||
wait(checkKilled(self, durable ? "Sync" : "Kill"));
|
||||
|
||||
if(!durable)
|
||||
self->killed.send( Void() );
|
||||
|
||||
|
@ -653,7 +645,7 @@ private:
|
|||
}
|
||||
//A killed file cannot be allowed to report that it successfully synced
|
||||
else {
|
||||
wait(self->checkKilled(self, "SyncEnd"));
|
||||
wait(checkKilled(self, "SyncEnd"));
|
||||
wait(self->file->sync());
|
||||
//TraceEvent("AsyncFileNonDurable_ImplSyncEnd", self->id).detail("Filename", self->filename).detail("Durable", durable);
|
||||
}
|
||||
|
@ -679,13 +671,13 @@ private:
|
|||
}
|
||||
|
||||
//Passes along size requests to the underlying file, augmenting with any writes past the end of the file
|
||||
ACTOR Future<int64_t> onSize(AsyncFileNonDurable *self) {
|
||||
ACTOR static Future<int64_t> onSize(AsyncFileNonDurable const* self) {
|
||||
//TraceEvent("AsyncFileNonDurable_Size", self->id).detail("Filename", self->filename);
|
||||
wait(self->checkKilled(self, "Size"));
|
||||
wait(checkKilled(self, "Size"));
|
||||
state Future<int64_t> sizeFuture = self->file->size();
|
||||
wait( success( sizeFuture ) || self->killed.getFuture() );
|
||||
|
||||
wait(self->checkKilled(self, "SizeEnd"));
|
||||
wait(checkKilled(self, "SizeEnd"));
|
||||
|
||||
//Include any modifications which extend past the end of the file
|
||||
uint64_t maxModification = self->pendingModifications.lastItem().begin();
|
||||
|
@ -693,14 +685,14 @@ private:
|
|||
return self->approximateSize;
|
||||
}
|
||||
|
||||
ACTOR Future<int64_t> size(AsyncFileNonDurable *self) {
|
||||
ACTOR static Future<int64_t> size(AsyncFileNonDurable const* self) {
|
||||
state ISimulator::ProcessInfo* currentProcess = g_simulator.getCurrentProcess();
|
||||
state TaskPriority currentTaskID = g_network->getCurrentTask();
|
||||
|
||||
wait( g_simulator.onMachine( currentProcess ) );
|
||||
|
||||
try {
|
||||
state int64_t rep = wait( self->onSize( self ) );
|
||||
state int64_t rep = wait(onSize(self));
|
||||
wait( g_simulator.onProcess( currentProcess, currentTaskID ) );
|
||||
|
||||
return rep;
|
||||
|
|
|
@ -155,27 +155,27 @@ public:
|
|||
return wpos;
|
||||
}
|
||||
|
||||
virtual Future<int> read( void *data, int length, int64_t offset ) {
|
||||
Future<int> read(void* data, int length, int64_t offset) override {
|
||||
return read_impl(Reference<AsyncFileReadAheadCache>::addRef(this), data, length, offset);
|
||||
}
|
||||
|
||||
virtual Future<Void> write( void const *data, int length, int64_t offset ) { throw file_not_writable(); }
|
||||
virtual Future<Void> truncate( int64_t size ) { throw file_not_writable(); }
|
||||
Future<Void> write(void const* data, int length, int64_t offset) override { throw file_not_writable(); }
|
||||
Future<Void> truncate(int64_t size) override { throw file_not_writable(); }
|
||||
|
||||
virtual Future<Void> sync() { return Void(); }
|
||||
virtual Future<Void> flush() { return Void(); }
|
||||
Future<Void> sync() override { return Void(); }
|
||||
Future<Void> flush() override { return Void(); }
|
||||
|
||||
virtual Future<int64_t> size() { return m_f->size(); }
|
||||
Future<int64_t> size() const override { return m_f->size(); }
|
||||
|
||||
virtual Future<Void> readZeroCopy( void** data, int* length, int64_t offset ) {
|
||||
Future<Void> readZeroCopy(void** data, int* length, int64_t offset) override {
|
||||
TraceEvent(SevError, "ReadZeroCopyNotSupported").detail("FileType", "ReadAheadCache");
|
||||
return platform_error();
|
||||
}
|
||||
virtual void releaseZeroCopy( void* data, int length, int64_t offset ) {}
|
||||
void releaseZeroCopy(void* data, int length, int64_t offset) override {}
|
||||
|
||||
virtual int64_t debugFD() { return -1; }
|
||||
int64_t debugFD() const override { return -1; }
|
||||
|
||||
virtual std::string getFilename() { return m_f->getFilename(); }
|
||||
std::string getFilename() const override { return m_f->getFilename(); }
|
||||
|
||||
virtual ~AsyncFileReadAheadCache() {
|
||||
for(auto &it : m_blocks) {
|
||||
|
|
|
@ -87,7 +87,7 @@ public:
|
|||
virtual void addref() { ReferenceCounted<AsyncFileWinASIO>::addref(); }
|
||||
virtual void delref() { ReferenceCounted<AsyncFileWinASIO>::delref(); }
|
||||
|
||||
virtual int64_t debugFD() { return (int64_t)file.native_handle(); }
|
||||
int64_t debugFD() const override { return (int64_t)(const_cast<decltype(file)&>(file).native_handle()); }
|
||||
|
||||
static void onReadReady( Promise<int> onReady, const boost::system::error_code& error, size_t bytesRead ) {
|
||||
if (error) {
|
||||
|
@ -116,7 +116,7 @@ public:
|
|||
}
|
||||
}
|
||||
|
||||
virtual Future<int> read( void* data, int length, int64_t offset ) {
|
||||
Future<int> read(void* data, int length, int64_t offset) override {
|
||||
// the size call is set inline
|
||||
auto end = this->size().get();
|
||||
//TraceEvent("WinAsyncRead").detail("Offset", offset).detail("Length", length).detail("FileSize", end).detail("FileName", filename);
|
||||
|
@ -128,7 +128,7 @@ public:
|
|||
|
||||
return result.getFuture();
|
||||
}
|
||||
virtual Future<Void> write( void const* data, int length, int64_t offset ) {
|
||||
Future<Void> write(void const* data, int length, int64_t offset) override {
|
||||
/*
|
||||
FIXME
|
||||
if ( length + offset >= fileValidData ) {
|
||||
|
@ -139,7 +139,7 @@ public:
|
|||
boost::asio::async_write_at( file, offset, boost::asio::const_buffers_1( data, length ), boost::bind( &onWriteReady, result, length, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred ) );
|
||||
return result.getFuture();
|
||||
}
|
||||
virtual Future<Void> truncate( int64_t size ) {
|
||||
Future<Void> truncate(int64_t size) override {
|
||||
// FIXME: Possibly use SetFileInformationByHandle( file.native_handle(), FileEndOfFileInfo, ... ) instead
|
||||
if (!SetFilePointerEx( file.native_handle(), *(LARGE_INTEGER*)&size, NULL, FILE_BEGIN ))
|
||||
throw io_error();
|
||||
|
@ -147,7 +147,7 @@ public:
|
|||
throw io_error();
|
||||
return Void();
|
||||
}
|
||||
virtual Future<Void> sync() {
|
||||
Future<Void> sync() override {
|
||||
// FIXME: Do FlushFileBuffers in a worker thread (using g_network->createThreadPool)?
|
||||
if (!FlushFileBuffers( file.native_handle() )) throw io_error();
|
||||
|
||||
|
@ -159,14 +159,12 @@ public:
|
|||
|
||||
return Void();
|
||||
}
|
||||
virtual Future<int64_t> size() {
|
||||
Future<int64_t> size() const override {
|
||||
LARGE_INTEGER s;
|
||||
if (!GetFileSizeEx(file.native_handle(), &s)) throw io_error();
|
||||
if (!GetFileSizeEx(const_cast<decltype(file)&>(file).native_handle(), &s)) throw io_error();
|
||||
return *(int64_t*)&s;
|
||||
}
|
||||
virtual std::string getFilename() {
|
||||
return filename;
|
||||
}
|
||||
std::string getFilename() const override { return filename; }
|
||||
|
||||
~AsyncFileWinASIO() { }
|
||||
|
||||
|
|
|
@ -60,10 +60,10 @@ public:
|
|||
|
||||
Future<Void> sync() { return m_f->sync(); }
|
||||
Future<Void> flush() { return m_f->flush(); }
|
||||
Future<int64_t> size() { return m_f->size(); }
|
||||
std::string getFilename() { return m_f->getFilename(); }
|
||||
Future<int64_t> size() const override { return m_f->size(); }
|
||||
std::string getFilename() const override { return m_f->getFilename(); }
|
||||
void releaseZeroCopy( void* data, int length, int64_t offset ) { return m_f->releaseZeroCopy(data, length, offset); }
|
||||
int64_t debugFD() { return m_f->debugFD(); }
|
||||
int64_t debugFD() const override { return m_f->debugFD(); }
|
||||
|
||||
AsyncFileWriteChecker(Reference<IAsyncFile> f) : m_f(f) {
|
||||
// Initialize the static history budget the first time (and only the first time) a file is opened.
|
||||
|
|
|
@ -63,8 +63,8 @@ public:
|
|||
virtual Future<Void> truncate( int64_t size ) = 0;
|
||||
virtual Future<Void> sync() = 0;
|
||||
virtual Future<Void> flush() { return Void(); } // Sends previous writes to the OS if they have been buffered in memory, but does not make them power safe
|
||||
virtual Future<int64_t> size() = 0;
|
||||
virtual std::string getFilename() = 0;
|
||||
virtual Future<int64_t> size() const = 0;
|
||||
virtual std::string getFilename() const = 0;
|
||||
|
||||
// Attempt to read the *length bytes at offset without copying. If successful, a pointer to the
|
||||
// requested bytes is written to *data, and the number of bytes successfully read is
|
||||
|
@ -80,7 +80,7 @@ public:
|
|||
virtual Future<Void> readZeroCopy( void** data, int* length, int64_t offset ) { return io_error(); }
|
||||
virtual void releaseZeroCopy( void* data, int length, int64_t offset ) {}
|
||||
|
||||
virtual int64_t debugFD() = 0;
|
||||
virtual int64_t debugFD() const = 0;
|
||||
};
|
||||
|
||||
typedef void (*runCycleFuncPtr)();
|
||||
|
|
|
@ -164,6 +164,11 @@ public:
|
|||
i.decrementNonEnd();
|
||||
return iterator(i);
|
||||
}
|
||||
const_iterator lastItem() const {
|
||||
auto i(map.lastItem());
|
||||
i.decrementNonEnd();
|
||||
return const_iterator(i);
|
||||
}
|
||||
int size() const { return map.size() - 1; } // We always have one range bounded by two entries
|
||||
iterator randomRange() { return iterator(map.index(deterministicRandom()->randomInt(0, map.size() - 1))); }
|
||||
const_iterator randomRange() const {
|
||||
|
@ -275,4 +280,4 @@ void RangeMap<Key,Val,Range,Metric,MetricFunc>::insert( const Range& keys, const
|
|||
map.insert(beginPair, true, mf(beginPair));
|
||||
}
|
||||
|
||||
#endif
|
||||
#endif
|
||||
|
|
|
@ -478,31 +478,21 @@ public:
|
|||
virtual void addref() { ReferenceCounted<SimpleFile>::addref(); }
|
||||
virtual void delref() { ReferenceCounted<SimpleFile>::delref(); }
|
||||
|
||||
virtual int64_t debugFD() { return (int64_t)h; }
|
||||
int64_t debugFD() const override { return (int64_t)h; }
|
||||
|
||||
virtual Future<int> read( void* data, int length, int64_t offset ) {
|
||||
return read_impl( this, data, length, offset );
|
||||
}
|
||||
Future<int> read(void* data, int length, int64_t offset) override { return read_impl(this, data, length, offset); }
|
||||
|
||||
virtual Future<Void> write( void const* data, int length, int64_t offset ) {
|
||||
Future<Void> write(void const* data, int length, int64_t offset) override {
|
||||
return write_impl( this, StringRef((const uint8_t*)data, length), offset );
|
||||
}
|
||||
|
||||
virtual Future<Void> truncate( int64_t size ) {
|
||||
return truncate_impl( this, size );
|
||||
}
|
||||
Future<Void> truncate(int64_t size) override { return truncate_impl(this, size); }
|
||||
|
||||
virtual Future<Void> sync() {
|
||||
return sync_impl( this );
|
||||
}
|
||||
Future<Void> sync() override { return sync_impl(this); }
|
||||
|
||||
virtual Future<int64_t> size() {
|
||||
return size_impl( this );
|
||||
}
|
||||
Future<int64_t> size() const override { return size_impl(this); }
|
||||
|
||||
virtual std::string getFilename() {
|
||||
return actualFilename;
|
||||
}
|
||||
std::string getFilename() const override { return actualFilename; }
|
||||
|
||||
~SimpleFile() {
|
||||
_close( h );
|
||||
|
@ -667,7 +657,7 @@ private:
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<int64_t> size_impl( SimpleFile* self ) {
|
||||
ACTOR static Future<int64_t> size_impl(SimpleFile const* self) {
|
||||
state UID opId = deterministicRandom()->randomUniqueID();
|
||||
if (randLog)
|
||||
fprintf(randLog, "SFS1 %s %s %s\n", self->dbgId.shortString().c_str(), self->filename.c_str(), opId.shortString().c_str());
|
||||
|
|
|
@ -785,14 +785,14 @@ public:
|
|||
void sendError(const E& exc) const { sav->sendError(exc); }
|
||||
|
||||
Future<T> getFuture() const { sav->addFutureRef(); return Future<T>(sav); }
|
||||
bool isSet() { return sav->isSet(); }
|
||||
bool canBeSet() { return sav->canBeSet(); }
|
||||
bool isValid() const { return sav != NULL; }
|
||||
bool isSet() const { return sav->isSet(); }
|
||||
bool canBeSet() const { return sav->canBeSet(); }
|
||||
bool isValid() const { return sav != nullptr; }
|
||||
Promise() : sav(new SAV<T>(0, 1)) {}
|
||||
Promise(const Promise& rhs) : sav(rhs.sav) {
|
||||
sav->addPromiseRef();
|
||||
}
|
||||
Promise(Promise&& rhs) BOOST_NOEXCEPT : sav(rhs.sav) { rhs.sav = 0; }
|
||||
Promise(Promise&& rhs) noexcept : sav(rhs.sav) { rhs.sav = 0; }
|
||||
|
||||
~Promise() { if (sav) sav->delPromiseRef(); }
|
||||
|
||||
|
|
Loading…
Reference in New Issue