Make IAsyncFile const-correct
This commit is contained in:
parent
4867bad46e
commit
157700e5b6
|
@ -23,7 +23,7 @@
|
|||
#include "flow/UnitTest.h"
|
||||
#include "flow/actorcompiler.h" // has to be last include
|
||||
|
||||
Future<int64_t> AsyncFileBlobStoreRead::size() {
|
||||
Future<int64_t> AsyncFileBlobStoreRead::size() const {
|
||||
if(!m_size.isValid())
|
||||
m_size = m_bstore->objectSize(m_bucket, m_object);
|
||||
return m_size;
|
||||
|
|
|
@ -92,7 +92,7 @@ public:
|
|||
MD5_CTX content_md5_buf;
|
||||
};
|
||||
|
||||
virtual Future<int> read( void *data, int length, int64_t offset ) { throw file_not_readable(); }
|
||||
Future<int> read(void* data, int length, int64_t offset) override { throw file_not_readable(); }
|
||||
|
||||
ACTOR static Future<Void> write_impl(Reference<AsyncFileBlobStoreWrite> f, const uint8_t *data, int length) {
|
||||
state Part *p = f->m_parts.back().getPtr();
|
||||
|
@ -115,7 +115,7 @@ public:
|
|||
return Void();
|
||||
}
|
||||
|
||||
virtual Future<Void> write( void const *data, int length, int64_t offset ) {
|
||||
Future<Void> write(void const* data, int length, int64_t offset) override {
|
||||
if(offset != m_cursor)
|
||||
throw non_sequential_op();
|
||||
m_cursor += length;
|
||||
|
@ -123,7 +123,7 @@ public:
|
|||
return m_error.getFuture() || write_impl(Reference<AsyncFileBlobStoreWrite>::addRef(this), (const uint8_t *)data, length);
|
||||
}
|
||||
|
||||
virtual Future<Void> truncate( int64_t size ) {
|
||||
Future<Void> truncate(int64_t size) override {
|
||||
if(size != m_cursor)
|
||||
return non_sequential_op();
|
||||
return Void();
|
||||
|
@ -182,25 +182,25 @@ public:
|
|||
// their size. So in the case of a write buffer that does not meet the part minimum size the part could be sent
|
||||
// but then if there is any more data written then that part needs to be sent again in its entirety. So a client
|
||||
// that calls flush often could generate far more blob store write traffic than they intend to.
|
||||
virtual Future<Void> flush() { return Void(); }
|
||||
Future<Void> flush() override { return Void(); }
|
||||
|
||||
virtual Future<int64_t> size() { return m_cursor; }
|
||||
Future<int64_t> size() const override { return m_cursor; }
|
||||
|
||||
virtual Future<Void> readZeroCopy( void** data, int* length, int64_t offset ) {
|
||||
Future<Void> readZeroCopy(void** data, int* length, int64_t offset) override {
|
||||
TraceEvent(SevError, "ReadZeroCopyNotSupported").detail("FileType", "BlobStoreWrite");
|
||||
return platform_error();
|
||||
}
|
||||
virtual void releaseZeroCopy( void* data, int length, int64_t offset ) {}
|
||||
void releaseZeroCopy(void* data, int length, int64_t offset) override {}
|
||||
|
||||
virtual int64_t debugFD() { return -1; }
|
||||
int64_t debugFD() const override { return -1; }
|
||||
|
||||
virtual ~AsyncFileBlobStoreWrite() {
|
||||
~AsyncFileBlobStoreWrite() override {
|
||||
m_upload_id.cancel();
|
||||
m_finished.cancel();
|
||||
m_parts.clear(); // Contains futures
|
||||
}
|
||||
|
||||
virtual std::string getFilename() { return m_object; }
|
||||
std::string getFilename() const override { return m_object; }
|
||||
|
||||
private:
|
||||
Reference<BlobStoreEndpoint> m_bstore;
|
||||
|
@ -267,7 +267,7 @@ public:
|
|||
virtual Future<Void> sync() { return Void(); }
|
||||
virtual Future<Void> flush() { return Void(); }
|
||||
|
||||
virtual Future<int64_t> size();
|
||||
Future<int64_t> size() const override;
|
||||
|
||||
virtual Future<Void> readZeroCopy( void** data, int* length, int64_t offset ) {
|
||||
TraceEvent(SevError, "ReadZeroCopyNotSupported").detail("FileType", "BlobStoreRead");
|
||||
|
@ -275,16 +275,16 @@ public:
|
|||
}
|
||||
virtual void releaseZeroCopy( void* data, int length, int64_t offset ) {}
|
||||
|
||||
virtual int64_t debugFD() { return -1; }
|
||||
virtual int64_t debugFD() const override { return -1; }
|
||||
|
||||
virtual std::string getFilename() { return m_object; }
|
||||
std::string getFilename() const override { return m_object; }
|
||||
|
||||
virtual ~AsyncFileBlobStoreRead() {}
|
||||
|
||||
Reference<BlobStoreEndpoint> m_bstore;
|
||||
std::string m_bucket;
|
||||
std::string m_object;
|
||||
Future<int64_t> m_size;
|
||||
mutable Future<int64_t> m_size;
|
||||
|
||||
AsyncFileBlobStoreRead(Reference<BlobStoreEndpoint> bstore, std::string bucket, std::string object)
|
||||
: m_bstore(bstore), m_bucket(bucket), m_object(object) {
|
||||
|
|
|
@ -219,17 +219,11 @@ public:
|
|||
return waitAndSync( this, flush() );
|
||||
}
|
||||
|
||||
virtual Future<int64_t> size() {
|
||||
return length;
|
||||
}
|
||||
Future<int64_t> size() const override { return length; }
|
||||
|
||||
virtual int64_t debugFD() {
|
||||
return uncached->debugFD();
|
||||
}
|
||||
int64_t debugFD() const override { return uncached->debugFD(); }
|
||||
|
||||
virtual std::string getFilename() {
|
||||
return filename;
|
||||
}
|
||||
std::string getFilename() const override { return filename; }
|
||||
|
||||
virtual void addref() {
|
||||
ReferenceCounted<AsyncFileCached>::addref();
|
||||
|
|
|
@ -115,7 +115,7 @@ public:
|
|||
virtual void addref() { ReferenceCounted<AsyncFileEIO>::addref(); }
|
||||
virtual void delref() { ReferenceCounted<AsyncFileEIO>::delref(); }
|
||||
|
||||
virtual int64_t debugFD() { return fd; }
|
||||
int64_t debugFD() const override { return fd; }
|
||||
|
||||
virtual Future<int> read( void* data, int length, int64_t offset ) {
|
||||
++countFileLogicalReads;
|
||||
|
@ -147,14 +147,12 @@ public:
|
|||
|
||||
return fsync;
|
||||
}
|
||||
virtual Future<int64_t> size() {
|
||||
Future<int64_t> size() const override {
|
||||
++countFileLogicalReads;
|
||||
++countLogicalReads;
|
||||
return size_impl(fd);
|
||||
}
|
||||
virtual std::string getFilename() {
|
||||
return filename;
|
||||
}
|
||||
std::string getFilename() const override { return filename; }
|
||||
|
||||
ACTOR static Future<Void> async_fsync_parent( std::string filename ) {
|
||||
std::string folder = parentDirectory( filename );
|
||||
|
@ -227,11 +225,11 @@ private:
|
|||
int fd, flags;
|
||||
Reference<ErrorInfo> err;
|
||||
std::string filename;
|
||||
Int64MetricHandle countFileLogicalWrites;
|
||||
Int64MetricHandle countFileLogicalReads;
|
||||
mutable Int64MetricHandle countFileLogicalWrites;
|
||||
mutable Int64MetricHandle countFileLogicalReads;
|
||||
|
||||
Int64MetricHandle countLogicalWrites;
|
||||
Int64MetricHandle countLogicalReads;
|
||||
mutable Int64MetricHandle countLogicalWrites;
|
||||
mutable Int64MetricHandle countLogicalReads;
|
||||
|
||||
AsyncFileEIO( int fd, int flags, std::string const& filename ) : fd(fd), flags(flags), filename(filename), err(new ErrorInfo) {
|
||||
if( !g_network->isSimulated() ) {
|
||||
|
|
|
@ -340,13 +340,9 @@ public:
|
|||
|
||||
return fsync;
|
||||
}
|
||||
virtual Future<int64_t> size() { return nextFileSize; }
|
||||
virtual int64_t debugFD() {
|
||||
return fd;
|
||||
}
|
||||
virtual std::string getFilename() {
|
||||
return filename;
|
||||
}
|
||||
Future<int64_t> size() const override { return nextFileSize; }
|
||||
int64_t debugFD() const override { return fd; }
|
||||
std::string getFilename() const override { return filename; }
|
||||
~AsyncFileKAIO() {
|
||||
close(fd);
|
||||
|
||||
|
|
|
@ -111,18 +111,18 @@ public:
|
|||
return sendErrorOnShutdown( file->sync() );
|
||||
}
|
||||
|
||||
Future<int64_t> size() {
|
||||
Future<int64_t> size() const override {
|
||||
if( !file.getPtr() || g_simulator.getCurrentProcess()->shutdownSignal.getFuture().isReady() )
|
||||
return io_error().asInjectedFault();
|
||||
return sendErrorOnShutdown( file->size() );
|
||||
}
|
||||
|
||||
int64_t debugFD() {
|
||||
int64_t debugFD() const override {
|
||||
if( !file.getPtr() )
|
||||
throw io_error().asInjectedFault();
|
||||
return file->debugFD();
|
||||
}
|
||||
std::string getFilename() {
|
||||
std::string getFilename() const override {
|
||||
if( !file.getPtr() )
|
||||
throw io_error().asInjectedFault();
|
||||
return file->getFilename();
|
||||
|
@ -137,7 +137,7 @@ public:
|
|||
std::string filename;
|
||||
|
||||
//An approximation of the size of the file; .size() should be used instead of this variable in most cases
|
||||
int64_t approximateSize;
|
||||
mutable int64_t approximateSize;
|
||||
|
||||
//The address of the machine that opened the file
|
||||
NetworkAddress openedAddress;
|
||||
|
@ -306,17 +306,11 @@ public:
|
|||
}
|
||||
|
||||
//Passes along size requests to the underlying file, augmenting with any writes past the end of the file
|
||||
Future<int64_t> size() {
|
||||
return size(this);
|
||||
}
|
||||
Future<int64_t> size() const override { return size(this); }
|
||||
|
||||
int64_t debugFD() {
|
||||
return file->debugFD();
|
||||
}
|
||||
int64_t debugFD() const override { return file->debugFD(); }
|
||||
|
||||
std::string getFilename() {
|
||||
return file->getFilename();
|
||||
}
|
||||
std::string getFilename() const override { return file->getFilename(); }
|
||||
|
||||
//Forces a non-durable sync (some writes are not made or made incorrectly)
|
||||
//This is used when the file should 'die' without first completing its operations
|
||||
|
@ -358,7 +352,7 @@ private:
|
|||
}
|
||||
|
||||
//Checks if the file is killed. If so, then the current sync is completed if running and then an error is thrown
|
||||
ACTOR Future<Void> checkKilled(AsyncFileNonDurable *self, std::string context) {
|
||||
ACTOR static Future<Void> checkKilled(AsyncFileNonDurable const* self, std::string context) {
|
||||
if(self->killed.isSet()) {
|
||||
//TraceEvent("AsyncFileNonDurable_KilledInCheck", self->id).detail("In", context).detail("Filename", self->filename);
|
||||
wait(self->killComplete.getFuture());
|
||||
|
@ -372,14 +366,14 @@ private:
|
|||
|
||||
//Passes along reads straight to the underlying file, waiting for any outstanding changes that could affect the results
|
||||
ACTOR Future<int> onRead(AsyncFileNonDurable *self, void *data, int length, int64_t offset) {
|
||||
wait(self->checkKilled(self, "Read"));
|
||||
wait(checkKilled(self, "Read"));
|
||||
vector<Future<Void>> priorModifications = self->getModificationsAndInsert(offset, length);
|
||||
wait(waitForAll(priorModifications));
|
||||
state Future<int> readFuture = self->file->read(data, length, offset);
|
||||
wait( success( readFuture ) || self->killed.getFuture() );
|
||||
|
||||
// throws if we were killed
|
||||
wait(self->checkKilled(self, "ReadEnd"));
|
||||
wait(checkKilled(self, "ReadEnd"));
|
||||
|
||||
debugFileCheck("AsyncFileNonDurableRead", self->filename, data, offset, length);
|
||||
|
||||
|
@ -421,7 +415,7 @@ private:
|
|||
|
||||
try {
|
||||
//TraceEvent("AsyncFileNonDurable_Write", self->id).detail("Delay", delayDuration).detail("Filename", self->filename).detail("WriteLength", length).detail("Offset", offset);
|
||||
wait(self->checkKilled(self, "Write"));
|
||||
wait(checkKilled(self, "Write"));
|
||||
|
||||
Future<Void> writeEnded = wait(ownFuture);
|
||||
std::vector<Future<Void>> priorModifications = self->getModificationsAndInsert(offset, length, true, writeEnded);
|
||||
|
@ -543,7 +537,7 @@ private:
|
|||
|
||||
try {
|
||||
//TraceEvent("AsyncFileNonDurable_Truncate", self->id).detail("Delay", delayDuration).detail("Filename", self->filename);
|
||||
wait(self->checkKilled(self, "Truncate"));
|
||||
wait(checkKilled(self, "Truncate"));
|
||||
|
||||
Future<Void> truncateEnded = wait(ownFuture);
|
||||
std::vector<Future<Void>> priorModifications = self->getModificationsAndInsert(size, -1, true, truncateEnded);
|
||||
|
@ -600,8 +594,8 @@ private:
|
|||
wait(waitUntilDiskReady(self->diskParameters, 0, true) || self->killed.getFuture());
|
||||
}
|
||||
|
||||
wait(self->checkKilled(self, durable ? "Sync" : "Kill"));
|
||||
|
||||
wait(checkKilled(self, durable ? "Sync" : "Kill"));
|
||||
|
||||
if(!durable)
|
||||
self->killed.send( Void() );
|
||||
|
||||
|
@ -653,7 +647,7 @@ private:
|
|||
}
|
||||
//A killed file cannot be allowed to report that it successfully synced
|
||||
else {
|
||||
wait(self->checkKilled(self, "SyncEnd"));
|
||||
wait(checkKilled(self, "SyncEnd"));
|
||||
wait(self->file->sync());
|
||||
//TraceEvent("AsyncFileNonDurable_ImplSyncEnd", self->id).detail("Filename", self->filename).detail("Durable", durable);
|
||||
}
|
||||
|
@ -679,13 +673,13 @@ private:
|
|||
}
|
||||
|
||||
//Passes along size requests to the underlying file, augmenting with any writes past the end of the file
|
||||
ACTOR Future<int64_t> onSize(AsyncFileNonDurable *self) {
|
||||
ACTOR static Future<int64_t> onSize(AsyncFileNonDurable const* self) {
|
||||
//TraceEvent("AsyncFileNonDurable_Size", self->id).detail("Filename", self->filename);
|
||||
wait(self->checkKilled(self, "Size"));
|
||||
wait(checkKilled(self, "Size"));
|
||||
state Future<int64_t> sizeFuture = self->file->size();
|
||||
wait( success( sizeFuture ) || self->killed.getFuture() );
|
||||
|
||||
wait(self->checkKilled(self, "SizeEnd"));
|
||||
wait(checkKilled(self, "SizeEnd"));
|
||||
|
||||
//Include any modifications which extend past the end of the file
|
||||
uint64_t maxModification = self->pendingModifications.lastItem().begin();
|
||||
|
@ -693,14 +687,14 @@ private:
|
|||
return self->approximateSize;
|
||||
}
|
||||
|
||||
ACTOR Future<int64_t> size(AsyncFileNonDurable *self) {
|
||||
ACTOR static Future<int64_t> size(AsyncFileNonDurable const* self) {
|
||||
state ISimulator::ProcessInfo* currentProcess = g_simulator.getCurrentProcess();
|
||||
state TaskPriority currentTaskID = g_network->getCurrentTask();
|
||||
|
||||
wait( g_simulator.onMachine( currentProcess ) );
|
||||
|
||||
try {
|
||||
state int64_t rep = wait( self->onSize( self ) );
|
||||
state int64_t rep = wait(onSize(self));
|
||||
wait( g_simulator.onProcess( currentProcess, currentTaskID ) );
|
||||
|
||||
return rep;
|
||||
|
|
|
@ -165,7 +165,7 @@ public:
|
|||
virtual Future<Void> sync() { return Void(); }
|
||||
virtual Future<Void> flush() { return Void(); }
|
||||
|
||||
virtual Future<int64_t> size() { return m_f->size(); }
|
||||
Future<int64_t> size() const override { return m_f->size(); }
|
||||
|
||||
virtual Future<Void> readZeroCopy( void** data, int* length, int64_t offset ) {
|
||||
TraceEvent(SevError, "ReadZeroCopyNotSupported").detail("FileType", "ReadAheadCache");
|
||||
|
@ -173,9 +173,9 @@ public:
|
|||
}
|
||||
virtual void releaseZeroCopy( void* data, int length, int64_t offset ) {}
|
||||
|
||||
virtual int64_t debugFD() { return -1; }
|
||||
int64_t debugFD() const override { return -1; }
|
||||
|
||||
virtual std::string getFilename() { return m_f->getFilename(); }
|
||||
virtual std::string getFilename() const override { return m_f->getFilename(); }
|
||||
|
||||
virtual ~AsyncFileReadAheadCache() {
|
||||
for(auto &it : m_blocks) {
|
||||
|
|
|
@ -159,14 +159,12 @@ public:
|
|||
|
||||
return Void();
|
||||
}
|
||||
virtual Future<int64_t> size() {
|
||||
Future<int64_t> size() const override {
|
||||
LARGE_INTEGER s;
|
||||
if (!GetFileSizeEx(file.native_handle(), &s)) throw io_error();
|
||||
return *(int64_t*)&s;
|
||||
}
|
||||
virtual std::string getFilename() {
|
||||
return filename;
|
||||
}
|
||||
std::string getFilename() const override { return filename; }
|
||||
|
||||
~AsyncFileWinASIO() { }
|
||||
|
||||
|
|
|
@ -60,10 +60,10 @@ public:
|
|||
|
||||
Future<Void> sync() { return m_f->sync(); }
|
||||
Future<Void> flush() { return m_f->flush(); }
|
||||
Future<int64_t> size() { return m_f->size(); }
|
||||
std::string getFilename() { return m_f->getFilename(); }
|
||||
Future<int64_t> size() const override { return m_f->size(); }
|
||||
std::string getFilename() const override { return m_f->getFilename(); }
|
||||
void releaseZeroCopy( void* data, int length, int64_t offset ) { return m_f->releaseZeroCopy(data, length, offset); }
|
||||
int64_t debugFD() { return m_f->debugFD(); }
|
||||
int64_t debugFD() const override { return m_f->debugFD(); }
|
||||
|
||||
AsyncFileWriteChecker(Reference<IAsyncFile> f) : m_f(f) {
|
||||
// Initialize the static history budget the first time (and only the first time) a file is opened.
|
||||
|
|
|
@ -63,8 +63,8 @@ public:
|
|||
virtual Future<Void> truncate( int64_t size ) = 0;
|
||||
virtual Future<Void> sync() = 0;
|
||||
virtual Future<Void> flush() { return Void(); } // Sends previous writes to the OS if they have been buffered in memory, but does not make them power safe
|
||||
virtual Future<int64_t> size() = 0;
|
||||
virtual std::string getFilename() = 0;
|
||||
virtual Future<int64_t> size() const = 0;
|
||||
virtual std::string getFilename() const = 0;
|
||||
|
||||
// Attempt to read the *length bytes at offset without copying. If successful, a pointer to the
|
||||
// requested bytes is written to *data, and the number of bytes successfully read is
|
||||
|
@ -80,7 +80,7 @@ public:
|
|||
virtual Future<Void> readZeroCopy( void** data, int* length, int64_t offset ) { return io_error(); }
|
||||
virtual void releaseZeroCopy( void* data, int length, int64_t offset ) {}
|
||||
|
||||
virtual int64_t debugFD() = 0;
|
||||
virtual int64_t debugFD() const = 0;
|
||||
};
|
||||
|
||||
typedef void (*runCycleFuncPtr)();
|
||||
|
|
|
@ -164,6 +164,11 @@ public:
|
|||
i.decrementNonEnd();
|
||||
return iterator(i);
|
||||
}
|
||||
const_iterator lastItem() const {
|
||||
auto i(map.lastItem());
|
||||
i.decrementNonEnd();
|
||||
return const_iterator(i);
|
||||
}
|
||||
int size() const { return map.size() - 1; } // We always have one range bounded by two entries
|
||||
iterator randomRange() { return iterator(map.index(deterministicRandom()->randomInt(0, map.size() - 1))); }
|
||||
const_iterator randomRange() const {
|
||||
|
@ -275,4 +280,4 @@ void RangeMap<Key,Val,Range,Metric,MetricFunc>::insert( const Range& keys, const
|
|||
map.insert(beginPair, true, mf(beginPair));
|
||||
}
|
||||
|
||||
#endif
|
||||
#endif
|
||||
|
|
|
@ -478,7 +478,7 @@ public:
|
|||
virtual void addref() { ReferenceCounted<SimpleFile>::addref(); }
|
||||
virtual void delref() { ReferenceCounted<SimpleFile>::delref(); }
|
||||
|
||||
virtual int64_t debugFD() { return (int64_t)h; }
|
||||
int64_t debugFD() const override { return (int64_t)h; }
|
||||
|
||||
virtual Future<int> read( void* data, int length, int64_t offset ) {
|
||||
return read_impl( this, data, length, offset );
|
||||
|
@ -496,13 +496,9 @@ public:
|
|||
return sync_impl( this );
|
||||
}
|
||||
|
||||
virtual Future<int64_t> size() {
|
||||
return size_impl( this );
|
||||
}
|
||||
Future<int64_t> size() const override { return size_impl(this); }
|
||||
|
||||
virtual std::string getFilename() {
|
||||
return actualFilename;
|
||||
}
|
||||
virtual std::string getFilename() const override { return actualFilename; }
|
||||
|
||||
~SimpleFile() {
|
||||
_close( h );
|
||||
|
@ -667,7 +663,7 @@ private:
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<int64_t> size_impl( SimpleFile* self ) {
|
||||
ACTOR static Future<int64_t> size_impl(SimpleFile const* self) {
|
||||
state UID opId = deterministicRandom()->randomUniqueID();
|
||||
if (randLog)
|
||||
fprintf(randLog, "SFS1 %s %s %s\n", self->dbgId.shortString().c_str(), self->filename.c_str(), opId.shortString().c_str());
|
||||
|
|
|
@ -785,14 +785,14 @@ public:
|
|||
void sendError(const E& exc) const { sav->sendError(exc); }
|
||||
|
||||
Future<T> getFuture() const { sav->addFutureRef(); return Future<T>(sav); }
|
||||
bool isSet() { return sav->isSet(); }
|
||||
bool canBeSet() { return sav->canBeSet(); }
|
||||
bool isValid() const { return sav != NULL; }
|
||||
bool isSet() const { return sav->isSet(); }
|
||||
bool canBeSet() const { return sav->canBeSet(); }
|
||||
bool isValid() const { return sav != nullptr; }
|
||||
Promise() : sav(new SAV<T>(0, 1)) {}
|
||||
Promise(const Promise& rhs) : sav(rhs.sav) {
|
||||
sav->addPromiseRef();
|
||||
}
|
||||
Promise(Promise&& rhs) BOOST_NOEXCEPT : sav(rhs.sav) { rhs.sav = 0; }
|
||||
Promise(Promise&& rhs) noexcept : sav(rhs.sav) { rhs.sav = 0; }
|
||||
|
||||
~Promise() { if (sav) sav->delPromiseRef(); }
|
||||
|
||||
|
|
Loading…
Reference in New Issue