Merge pull request #4873 from sfc-gh-ajbeamon/close-files-in-simulation

Actually close files in simulation
This commit is contained in:
A.J. Beamon 2021-05-28 15:32:10 -07:00 committed by GitHub
commit d35da1aeae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 129 additions and 48 deletions

View File

@ -46,7 +46,8 @@ EvictablePage::~EvictablePage() {
}
}
std::map<std::string, OpenFileInfo> AsyncFileCached::openFiles;
// A map of filename to the file handle for all opened cached files
std::map<std::string, UnsafeWeakFutureReference<IAsyncFile>> AsyncFileCached::openFiles;
void AsyncFileCached::remove_page(AFCPage* page) {
pages.erase(page->pageOffset);

View File

@ -132,27 +132,13 @@ struct EvictablePageCache : ReferenceCounted<EvictablePageCache> {
const CacheEvictionType cacheEvictionType;
};
struct OpenFileInfo : NonCopyable {
IAsyncFile* f;
Future<Reference<IAsyncFile>> opened; // Only valid until the file is fully opened
OpenFileInfo() : f(0) {}
OpenFileInfo(OpenFileInfo&& r) noexcept : f(r.f), opened(std::move(r.opened)) { r.f = 0; }
Future<Reference<IAsyncFile>> get() {
if (f)
return Reference<IAsyncFile>::addRef(f);
else
return opened;
}
};
struct AFCPage;
class AsyncFileCached final : public IAsyncFile, public ReferenceCounted<AsyncFileCached> {
friend struct AFCPage;
public:
// Opens a file that uses the FDB in-memory page cache
static Future<Reference<IAsyncFile>> open(std::string filename, int flags, int mode) {
//TraceEvent("AsyncFileCachedOpen").detail("Filename", filename);
if (openFiles.find(filename) == openFiles.end()) {
@ -160,7 +146,7 @@ public:
if (f.isReady() && f.isError())
return f;
if (!f.isReady())
openFiles[filename].opened = f;
openFiles[filename] = UnsafeWeakFutureReference<IAsyncFile>(f);
else
return f.get();
}
@ -263,7 +249,9 @@ public:
~AsyncFileCached() override;
private:
static std::map<std::string, OpenFileInfo> openFiles;
// A map of filename to the file handle for all opened cached files
static std::map<std::string, UnsafeWeakFutureReference<IAsyncFile>> openFiles;
std::string filename;
Reference<IAsyncFile> uncached;
int64_t length;
@ -330,6 +318,7 @@ private:
static Future<Reference<IAsyncFile>> open_impl(std::string filename, int flags, int mode);
// Opens a file that uses the FDB in-memory page cache
ACTOR static Future<Reference<IAsyncFile>> open_impl(std::string filename,
int flags,
int mode,
@ -345,10 +334,7 @@ private:
TraceEvent("AFCUnderlyingOpenEnd").detail("Filename", filename);
int64_t l = wait(f->size());
TraceEvent("AFCUnderlyingSize").detail("Filename", filename).detail("Size", l);
auto& of = openFiles[filename];
of.f = new AsyncFileCached(f, filename, l, pageCache);
of.opened = Future<Reference<IAsyncFile>>();
return Reference<IAsyncFile>(of.f);
return new AsyncFileCached(f, filename, l, pageCache);
} catch (Error& e) {
if (e.code() != error_code_actor_cancelled)
openFiles.erase(filename);

View File

@ -130,6 +130,9 @@ public:
UID id;
std::string filename;
// For files that use atomic write and create, they are initially created with an extra suffix
std::string initialFilename;
// An approximation of the size of the file; .size() should be used instead of this variable in most cases
mutable int64_t approximateSize;
@ -182,11 +185,13 @@ private:
reponses; // cannot call getResult on this actor collection, since the actors will be on different processes
AsyncFileNonDurable(const std::string& filename,
const std::string& initialFilename,
Reference<IAsyncFile> file,
Reference<DiskParameters> diskParameters,
NetworkAddress openedAddress,
bool aio)
: openedAddress(openedAddress), pendingModifications(uint64_t(-1)), approximateSize(0), reponses(false),
: filename(filename), initialFilename(initialFilename), file(file), diskParameters(diskParameters),
openedAddress(openedAddress), pendingModifications(uint64_t(-1)), approximateSize(0), reponses(false),
aio(aio) {
// This is only designed to work in simulation
@ -194,9 +199,6 @@ private:
this->id = deterministicRandom()->randomUniqueID();
//TraceEvent("AsyncFileNonDurable_Create", id).detail("Filename", filename);
this->file = file;
this->filename = filename;
this->diskParameters = diskParameters;
maxWriteDelay = FLOW_KNOBS->NON_DURABLE_MAX_WRITE_DELAY;
hasBeenSynced = false;
@ -236,10 +238,11 @@ public:
//TraceEvent("AsyncFileNonDurableOpenWaitOnDelete2").detail("Filename", filename);
if (shutdown.isReady())
throw io_error().asInjectedFault();
wait(g_simulator.onProcess(currentProcess, currentTaskID));
}
state Reference<AsyncFileNonDurable> nonDurableFile(
new AsyncFileNonDurable(filename, file, diskParameters, currentProcess->address, aio));
new AsyncFileNonDurable(filename, actualFilename, file, diskParameters, currentProcess->address, aio));
// Causes the approximateSize member to be set
state Future<int64_t> sizeFuture = nonDurableFile->size();
@ -269,13 +272,38 @@ public:
}
void addref() override { ReferenceCounted<AsyncFileNonDurable>::addref(); }
void delref() override {
if (delref_no_destroy()) {
ASSERT(filesBeingDeleted.count(filename) == 0);
//TraceEvent("AsyncFileNonDurable_StartDelete", id).detail("Filename", filename);
Future<Void> deleteFuture = deleteFile(this);
if (!deleteFuture.isReady())
filesBeingDeleted[filename] = deleteFuture;
if (filesBeingDeleted.count(filename) == 0) {
//TraceEvent("AsyncFileNonDurable_StartDelete", id).detail("Filename", filename);
Future<Void> deleteFuture = deleteFile(this);
if (!deleteFuture.isReady())
filesBeingDeleted[filename] = deleteFuture;
}
removeOpenFile(filename, this);
if (initialFilename != filename) {
removeOpenFile(initialFilename, this);
}
}
}
// Removes a file from the openFiles map
static void removeOpenFile(std::string filename, AsyncFileNonDurable* file) {
auto& openFiles = g_simulator.getCurrentProcess()->machine->openFiles;
auto iter = openFiles.find(filename);
// Various actions (e.g. simulated delete) can remove a file from openFiles prematurely, so it may already
// be gone. Renamed files (from atomic write and create) will also be present under only one of the two
// names.
if (iter != openFiles.end()) {
// even if the filename exists, it doesn't mean that it references the same file. It could be that the
// file was renamed and later a file with the same name was opened.
if (iter->second.getPtrIfReady().orDefault(nullptr) == file) {
openFiles.erase(iter);
}
}
}
@ -832,11 +860,9 @@ private:
//TraceEvent("AsyncFileNonDurable_FinishDelete", self->id).detail("Filename", self->filename);
delete self;
wait(g_simulator.onProcess(currentProcess, currentTaskID));
return Void();
} catch (Error& e) {
state Error err = e;
wait(g_simulator.onProcess(currentProcess, currentTaskID));
throw err;
}
}

View File

@ -536,7 +536,10 @@ public:
std::string getFilename() const override { return actualFilename; }
~SimpleFile() override { _close(h); }
~SimpleFile() override {
_close(h);
--openCount;
}
private:
int h;
@ -1015,8 +1018,8 @@ public:
// Get the size of all files we've created on the server and subtract them from the free space
for (auto file = proc->machine->openFiles.begin(); file != proc->machine->openFiles.end(); ++file) {
if (file->second.isReady()) {
totalFileSize += ((AsyncFileNonDurable*)file->second.get().getPtr())->approximateSize;
if (file->second.get().isReady()) {
totalFileSize += ((AsyncFileNonDurable*)file->second.get().get().getPtr())->approximateSize;
}
numFiles++;
}
@ -2440,7 +2443,7 @@ Future<Reference<class IAsyncFile>> Sim2FileSystem::open(const std::string& file
actualFilename = filename + ".part";
auto partFile = machineCache.find(actualFilename);
if (partFile != machineCache.end()) {
Future<Reference<IAsyncFile>> f = AsyncFileDetachable::open(partFile->second);
Future<Reference<IAsyncFile>> f = AsyncFileDetachable::open(partFile->second.get());
if (FLOW_KNOBS->PAGE_WRITE_CHECKSUM_HISTORY > 0)
f = map(f, [=](Reference<IAsyncFile> r) {
return Reference<IAsyncFile>(new AsyncFileWriteChecker(r));
@ -2448,19 +2451,26 @@ Future<Reference<class IAsyncFile>> Sim2FileSystem::open(const std::string& file
return f;
}
}
if (machineCache.find(actualFilename) == machineCache.end()) {
Future<Reference<IAsyncFile>> f;
auto itr = machineCache.find(actualFilename);
if (itr == machineCache.end()) {
// Simulated disk parameters are shared by the AsyncFileNonDurable and the underlying SimpleFile.
// This way, they can both keep up with the time to start the next operation
auto diskParameters =
makeReference<DiskParameters>(FLOW_KNOBS->SIM_DISK_IOPS, FLOW_KNOBS->SIM_DISK_BANDWIDTH);
machineCache[actualFilename] =
AsyncFileNonDurable::open(filename,
f = AsyncFileNonDurable::open(filename,
actualFilename,
SimpleFile::open(filename, flags, mode, diskParameters, false),
diskParameters,
(flags & IAsyncFile::OPEN_NO_AIO) == 0);
machineCache[actualFilename] = UnsafeWeakFutureReference<IAsyncFile>(f);
} else {
f = itr->second.get();
}
Future<Reference<IAsyncFile>> f = AsyncFileDetachable::open(machineCache[actualFilename]);
f = AsyncFileDetachable::open(f);
if (FLOW_KNOBS->PAGE_WRITE_CHECKSUM_HISTORY > 0)
f = map(f, [=](Reference<IAsyncFile> r) { return Reference<IAsyncFile>(new AsyncFileWriteChecker(r)); });
return f;

View File

@ -188,10 +188,14 @@ public:
Promise<KillType> shutdownSignal;
};
// A set of data associated with a simulated machine
struct MachineInfo {
ProcessInfo* machineProcess;
std::vector<ProcessInfo*> processes;
std::map<std::string, Future<Reference<IAsyncFile>>> openFiles;
// A map from filename to file handle for all open files on a machine
std::map<std::string, UnsafeWeakFutureReference<IAsyncFile>> openFiles;
std::set<std::string> deletingFiles;
std::set<std::string> closingFiles;
Optional<Standalone<StringRef>> machineId;

View File

@ -175,7 +175,6 @@ class TestConfig {
ifs.close();
}
public:
int extraDB = 0;
int minimumReplication = 0;
@ -708,8 +707,8 @@ ACTOR Future<Void> simulatedMachine(ClusterConnectionString connStr,
// Copy the file pointers to a vector because the map may be modified while we are killing files
std::vector<AsyncFileNonDurable*> files;
for (auto fileItr = machineCache.begin(); fileItr != machineCache.end(); ++fileItr) {
ASSERT(fileItr->second.isReady());
files.push_back((AsyncFileNonDurable*)fileItr->second.get().getPtr());
ASSERT(fileItr->second.get().isReady());
files.push_back((AsyncFileNonDurable*)fileItr->second.get().get().getPtr());
}
std::vector<Future<Void>> killFutures;
@ -725,7 +724,7 @@ ACTOR Future<Void> simulatedMachine(ClusterConnectionString connStr,
for (auto it : machineCache) {
filenames.insert(it.first);
closingStr += it.first + ", ";
ASSERT(it.second.isReady() && !it.second.isError());
ASSERT(it.second.get().canGet());
}
for (auto it : g_simulator.getMachineById(localities.machineId())->deletingFiles) {
@ -1240,7 +1239,7 @@ void SimulationConfig::generateNormalConfig(const TestConfig& testConfig) {
if (deterministicRandom()->random01() < 0.5)
set_config(format("log_spill:=%d", TLogSpillType::DEFAULT));
}
if (deterministicRandom()->random01() < 0.5) {
set_config("backup_worker_enabled:=1");
}

View File

@ -674,6 +674,8 @@ public:
bool isValid() const { return sav != 0; }
bool isReady() const { return sav->isSet(); }
bool isError() const { return sav->isError(); }
// returns true if get can be called on this future (counterpart of canBeSet on Promises)
bool canGet() const { return isValid() && isReady() && !isError(); }
Error& getError() const {
ASSERT(isError());
return sav->error_state;

View File

@ -1899,6 +1899,59 @@ Future<U> operator>>(Future<T> const& lhs, Future<U> const& rhs) {
return runAfter(lhs, rhs);
}
// A weak reference type to wrap a future Reference<T> object.
// Once the future is complete, this object holds a pointer to the referenced object but does
// not contribute to its reference count.
//
// WARNING: this class will not be aware when the underlying object is destroyed. It is up to the
// user to make sure that an UnsafeWeakFutureReference is discarded at the same time the object is.
template <class T>
class UnsafeWeakFutureReference {
public:
UnsafeWeakFutureReference() {}
UnsafeWeakFutureReference(Future<Reference<T>> future) : data(new UnsafeWeakFutureReferenceData(future)) {}
// Returns a future to obtain a normal reference handle
// If the future is ready, this creates a Reference<T> to wrap the object
Future<Reference<T>> get() {
if (!data) {
return Reference<T>();
} else if (data->ptr.present()) {
return Reference<T>::addRef(data->ptr.get());
} else {
return data->future;
}
}
// Returns the raw pointer, if the object is ready
// Note: this should be used with care, as this pointer is not counted as a reference to the object and
// it could be deleted if all normal references are destroyed.
Optional<T*> getPtrIfReady() { return data->ptr; }
private:
// A class to hold the state for an UnsafeWeakFutureReference
struct UnsafeWeakFutureReferenceData : public ReferenceCounted<UnsafeWeakFutureReferenceData>, NonCopyable {
Optional<T*> ptr;
Future<Reference<T>> future;
Future<Void> moveResultFuture;
UnsafeWeakFutureReferenceData(Future<Reference<T>> future) : future(future) {
moveResultFuture = moveResult(this);
}
// Waits for the future to complete and then stores the pointer in local storage
// When this completes, we will no longer be counted toward the reference count of the object
ACTOR Future<Void> moveResult(UnsafeWeakFutureReferenceData* self) {
Reference<T> result = wait(self->future);
self->ptr = result.getPtr();
self->future = Future<Reference<T>>();
return Void();
}
};
Reference<UnsafeWeakFutureReferenceData> data;
};
#include "flow/unactorcompiler.h"
#endif