diff --git a/fdbrpc/AsyncFileCached.actor.cpp b/fdbrpc/AsyncFileCached.actor.cpp index f4a57d4646..6354e55cd0 100644 --- a/fdbrpc/AsyncFileCached.actor.cpp +++ b/fdbrpc/AsyncFileCached.actor.cpp @@ -46,7 +46,8 @@ EvictablePage::~EvictablePage() { } } -std::map AsyncFileCached::openFiles; +// A map of filename to the file handle for all opened cached files +std::map> AsyncFileCached::openFiles; void AsyncFileCached::remove_page(AFCPage* page) { pages.erase(page->pageOffset); diff --git a/fdbrpc/AsyncFileCached.actor.h b/fdbrpc/AsyncFileCached.actor.h index c5b6b3127c..84c42f9716 100644 --- a/fdbrpc/AsyncFileCached.actor.h +++ b/fdbrpc/AsyncFileCached.actor.h @@ -132,27 +132,13 @@ struct EvictablePageCache : ReferenceCounted { const CacheEvictionType cacheEvictionType; }; -struct OpenFileInfo : NonCopyable { - IAsyncFile* f; - Future> opened; // Only valid until the file is fully opened - - OpenFileInfo() : f(0) {} - OpenFileInfo(OpenFileInfo&& r) noexcept : f(r.f), opened(std::move(r.opened)) { r.f = 0; } - - Future> get() { - if (f) - return Reference::addRef(f); - else - return opened; - } -}; - struct AFCPage; class AsyncFileCached final : public IAsyncFile, public ReferenceCounted { friend struct AFCPage; public: + // Opens a file that uses the FDB in-memory page cache static Future> open(std::string filename, int flags, int mode) { //TraceEvent("AsyncFileCachedOpen").detail("Filename", filename); if (openFiles.find(filename) == openFiles.end()) { @@ -160,7 +146,7 @@ public: if (f.isReady() && f.isError()) return f; if (!f.isReady()) - openFiles[filename].opened = f; + openFiles[filename] = UnsafeWeakFutureReference(f); else return f.get(); } @@ -263,7 +249,9 @@ public: ~AsyncFileCached() override; private: - static std::map openFiles; + // A map of filename to the file handle for all opened cached files + static std::map> openFiles; + std::string filename; Reference uncached; int64_t length; @@ -330,6 +318,7 @@ private: static Future> open_impl(std::string filename, int flags, int mode); + // Opens a file that uses the FDB in-memory page cache ACTOR static Future> open_impl(std::string filename, int flags, int mode, @@ -345,10 +334,7 @@ private: TraceEvent("AFCUnderlyingOpenEnd").detail("Filename", filename); int64_t l = wait(f->size()); TraceEvent("AFCUnderlyingSize").detail("Filename", filename).detail("Size", l); - auto& of = openFiles[filename]; - of.f = new AsyncFileCached(f, filename, l, pageCache); - of.opened = Future>(); - return Reference(of.f); + return new AsyncFileCached(f, filename, l, pageCache); } catch (Error& e) { if (e.code() != error_code_actor_cancelled) openFiles.erase(filename); diff --git a/fdbrpc/AsyncFileNonDurable.actor.h b/fdbrpc/AsyncFileNonDurable.actor.h index 00c0f7441d..f813c1a354 100644 --- a/fdbrpc/AsyncFileNonDurable.actor.h +++ b/fdbrpc/AsyncFileNonDurable.actor.h @@ -130,6 +130,9 @@ public: UID id; std::string filename; + // For files that use atomic write and create, they are initially created with an extra suffix + std::string initialFilename; + // An approximation of the size of the file; .size() should be used instead of this variable in most cases mutable int64_t approximateSize; @@ -182,11 +185,13 @@ private: reponses; // cannot call getResult on this actor collection, since the actors will be on different processes AsyncFileNonDurable(const std::string& filename, + const std::string& initialFilename, Reference file, Reference diskParameters, NetworkAddress openedAddress, bool aio) - : openedAddress(openedAddress), pendingModifications(uint64_t(-1)), approximateSize(0), reponses(false), + : filename(filename), initialFilename(initialFilename), file(file), diskParameters(diskParameters), + openedAddress(openedAddress), pendingModifications(uint64_t(-1)), approximateSize(0), reponses(false), aio(aio) { // This is only designed to work in simulation @@ -194,9 +199,6 @@ private: this->id = deterministicRandom()->randomUniqueID(); //TraceEvent("AsyncFileNonDurable_Create", id).detail("Filename", filename); - this->file = file; - this->filename = filename; - this->diskParameters = diskParameters; maxWriteDelay = FLOW_KNOBS->NON_DURABLE_MAX_WRITE_DELAY; hasBeenSynced = false; @@ -236,10 +238,11 @@ public: //TraceEvent("AsyncFileNonDurableOpenWaitOnDelete2").detail("Filename", filename); if (shutdown.isReady()) throw io_error().asInjectedFault(); + wait(g_simulator.onProcess(currentProcess, currentTaskID)); } state Reference nonDurableFile( - new AsyncFileNonDurable(filename, file, diskParameters, currentProcess->address, aio)); + new AsyncFileNonDurable(filename, actualFilename, file, diskParameters, currentProcess->address, aio)); // Causes the approximateSize member to be set state Future sizeFuture = nonDurableFile->size(); @@ -269,13 +272,38 @@ public: } void addref() override { ReferenceCounted::addref(); } + void delref() override { if (delref_no_destroy()) { - ASSERT(filesBeingDeleted.count(filename) == 0); - //TraceEvent("AsyncFileNonDurable_StartDelete", id).detail("Filename", filename); - Future deleteFuture = deleteFile(this); - if (!deleteFuture.isReady()) - filesBeingDeleted[filename] = deleteFuture; + if (filesBeingDeleted.count(filename) == 0) { + //TraceEvent("AsyncFileNonDurable_StartDelete", id).detail("Filename", filename); + Future deleteFuture = deleteFile(this); + if (!deleteFuture.isReady()) + filesBeingDeleted[filename] = deleteFuture; + } + + removeOpenFile(filename, this); + if (initialFilename != filename) { + removeOpenFile(initialFilename, this); + } + } + } + + // Removes a file from the openFiles map + static void removeOpenFile(std::string filename, AsyncFileNonDurable* file) { + auto& openFiles = g_simulator.getCurrentProcess()->machine->openFiles; + + auto iter = openFiles.find(filename); + + // Various actions (e.g. simulated delete) can remove a file from openFiles prematurely, so it may already + // be gone. Renamed files (from atomic write and create) will also be present under only one of the two + // names. + if (iter != openFiles.end()) { + // even if the filename exists, it doesn't mean that it references the same file. It could be that the + // file was renamed and later a file with the same name was opened. + if (iter->second.getPtrIfReady().orDefault(nullptr) == file) { + openFiles.erase(iter); + } } } @@ -832,11 +860,9 @@ private: //TraceEvent("AsyncFileNonDurable_FinishDelete", self->id).detail("Filename", self->filename); delete self; - wait(g_simulator.onProcess(currentProcess, currentTaskID)); return Void(); } catch (Error& e) { state Error err = e; - wait(g_simulator.onProcess(currentProcess, currentTaskID)); throw err; } } diff --git a/fdbrpc/sim2.actor.cpp b/fdbrpc/sim2.actor.cpp index 1af14ec676..ee735b963a 100644 --- a/fdbrpc/sim2.actor.cpp +++ b/fdbrpc/sim2.actor.cpp @@ -536,7 +536,10 @@ public: std::string getFilename() const override { return actualFilename; } - ~SimpleFile() override { _close(h); } + ~SimpleFile() override { + _close(h); + --openCount; + } private: int h; @@ -1015,8 +1018,8 @@ public: // Get the size of all files we've created on the server and subtract them from the free space for (auto file = proc->machine->openFiles.begin(); file != proc->machine->openFiles.end(); ++file) { - if (file->second.isReady()) { - totalFileSize += ((AsyncFileNonDurable*)file->second.get().getPtr())->approximateSize; + if (file->second.get().isReady()) { + totalFileSize += ((AsyncFileNonDurable*)file->second.get().get().getPtr())->approximateSize; } numFiles++; } @@ -2440,7 +2443,7 @@ Future> Sim2FileSystem::open(const std::string& file actualFilename = filename + ".part"; auto partFile = machineCache.find(actualFilename); if (partFile != machineCache.end()) { - Future> f = AsyncFileDetachable::open(partFile->second); + Future> f = AsyncFileDetachable::open(partFile->second.get()); if (FLOW_KNOBS->PAGE_WRITE_CHECKSUM_HISTORY > 0) f = map(f, [=](Reference r) { return Reference(new AsyncFileWriteChecker(r)); @@ -2448,19 +2451,26 @@ Future> Sim2FileSystem::open(const std::string& file return f; } } - if (machineCache.find(actualFilename) == machineCache.end()) { + + Future> f; + auto itr = machineCache.find(actualFilename); + if (itr == machineCache.end()) { // Simulated disk parameters are shared by the AsyncFileNonDurable and the underlying SimpleFile. // This way, they can both keep up with the time to start the next operation auto diskParameters = makeReference(FLOW_KNOBS->SIM_DISK_IOPS, FLOW_KNOBS->SIM_DISK_BANDWIDTH); - machineCache[actualFilename] = - AsyncFileNonDurable::open(filename, + f = AsyncFileNonDurable::open(filename, actualFilename, SimpleFile::open(filename, flags, mode, diskParameters, false), diskParameters, (flags & IAsyncFile::OPEN_NO_AIO) == 0); + + machineCache[actualFilename] = UnsafeWeakFutureReference(f); + } else { + f = itr->second.get(); } - Future> f = AsyncFileDetachable::open(machineCache[actualFilename]); + + f = AsyncFileDetachable::open(f); if (FLOW_KNOBS->PAGE_WRITE_CHECKSUM_HISTORY > 0) f = map(f, [=](Reference r) { return Reference(new AsyncFileWriteChecker(r)); }); return f; diff --git a/fdbrpc/simulator.h b/fdbrpc/simulator.h index 4b74ed91ba..f83686f464 100644 --- a/fdbrpc/simulator.h +++ b/fdbrpc/simulator.h @@ -188,10 +188,14 @@ public: Promise shutdownSignal; }; + // A set of data associated with a simulated machine struct MachineInfo { ProcessInfo* machineProcess; std::vector processes; - std::map>> openFiles; + + // A map from filename to file handle for all open files on a machine + std::map> openFiles; + std::set deletingFiles; std::set closingFiles; Optional> machineId; diff --git a/fdbserver/SimulatedCluster.actor.cpp b/fdbserver/SimulatedCluster.actor.cpp index bfa1f9d007..b33aeb21c5 100644 --- a/fdbserver/SimulatedCluster.actor.cpp +++ b/fdbserver/SimulatedCluster.actor.cpp @@ -175,7 +175,6 @@ class TestConfig { ifs.close(); } - public: int extraDB = 0; int minimumReplication = 0; @@ -708,8 +707,8 @@ ACTOR Future simulatedMachine(ClusterConnectionString connStr, // Copy the file pointers to a vector because the map may be modified while we are killing files std::vector files; for (auto fileItr = machineCache.begin(); fileItr != machineCache.end(); ++fileItr) { - ASSERT(fileItr->second.isReady()); - files.push_back((AsyncFileNonDurable*)fileItr->second.get().getPtr()); + ASSERT(fileItr->second.get().isReady()); + files.push_back((AsyncFileNonDurable*)fileItr->second.get().get().getPtr()); } std::vector> killFutures; @@ -725,7 +724,7 @@ ACTOR Future simulatedMachine(ClusterConnectionString connStr, for (auto it : machineCache) { filenames.insert(it.first); closingStr += it.first + ", "; - ASSERT(it.second.isReady() && !it.second.isError()); + ASSERT(it.second.get().canGet()); } for (auto it : g_simulator.getMachineById(localities.machineId())->deletingFiles) { @@ -1240,7 +1239,7 @@ void SimulationConfig::generateNormalConfig(const TestConfig& testConfig) { if (deterministicRandom()->random01() < 0.5) set_config(format("log_spill:=%d", TLogSpillType::DEFAULT)); } - + if (deterministicRandom()->random01() < 0.5) { set_config("backup_worker_enabled:=1"); } diff --git a/flow/flow.h b/flow/flow.h index 987572d7c5..e03d598d9b 100644 --- a/flow/flow.h +++ b/flow/flow.h @@ -674,6 +674,8 @@ public: bool isValid() const { return sav != 0; } bool isReady() const { return sav->isSet(); } bool isError() const { return sav->isError(); } + // returns true if get can be called on this future (counterpart of canBeSet on Promises) + bool canGet() const { return isValid() && isReady() && !isError(); } Error& getError() const { ASSERT(isError()); return sav->error_state; diff --git a/flow/genericactors.actor.h b/flow/genericactors.actor.h index 7bf2a05e63..400b9cdf41 100644 --- a/flow/genericactors.actor.h +++ b/flow/genericactors.actor.h @@ -1899,6 +1899,59 @@ Future operator>>(Future const& lhs, Future const& rhs) { return runAfter(lhs, rhs); } +// A weak reference type to wrap a future Reference object. +// Once the future is complete, this object holds a pointer to the referenced object but does +// not contribute to its reference count. +// +// WARNING: this class will not be aware when the underlying object is destroyed. It is up to the +// user to make sure that an UnsafeWeakFutureReference is discarded at the same time the object is. +template +class UnsafeWeakFutureReference { +public: + UnsafeWeakFutureReference() {} + UnsafeWeakFutureReference(Future> future) : data(new UnsafeWeakFutureReferenceData(future)) {} + + // Returns a future to obtain a normal reference handle + // If the future is ready, this creates a Reference to wrap the object + Future> get() { + if (!data) { + return Reference(); + } else if (data->ptr.present()) { + return Reference::addRef(data->ptr.get()); + } else { + return data->future; + } + } + + // Returns the raw pointer, if the object is ready + // Note: this should be used with care, as this pointer is not counted as a reference to the object and + // it could be deleted if all normal references are destroyed. + Optional getPtrIfReady() { return data->ptr; } + +private: + // A class to hold the state for an UnsafeWeakFutureReference + struct UnsafeWeakFutureReferenceData : public ReferenceCounted, NonCopyable { + Optional ptr; + Future> future; + Future moveResultFuture; + + UnsafeWeakFutureReferenceData(Future> future) : future(future) { + moveResultFuture = moveResult(this); + } + + // Waits for the future to complete and then stores the pointer in local storage + // When this completes, we will no longer be counted toward the reference count of the object + ACTOR Future moveResult(UnsafeWeakFutureReferenceData* self) { + Reference result = wait(self->future); + self->ptr = result.getPtr(); + self->future = Future>(); + return Void(); + } + }; + + Reference data; +}; + #include "flow/unactorcompiler.h" #endif