Merge pull request #2738 from ajbeamon/fix-assertion-failure-on-io-error

Fix assertion failure in SQLite thread pools on io_error
This commit is contained in:
Evan Tschannen 2020-04-14 16:48:22 -07:00 committed by GitHub
commit a3598a7616
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 14 additions and 10 deletions

View File

@ -180,10 +180,10 @@ class WorkPool : public IThreadPool, public ReferenceCounted<WorkPool<Threadlike
ACTOR Future<Void> stopOnError( WorkPool* w ) {
try {
wait( w->getError() );
ASSERT(false);
} catch (Error& e) {
w->error = e;
w->stop(e);
}
w->stop();
return Void();
}
@ -230,12 +230,14 @@ public:
} else
pool->queueLock.leave();
}
virtual Future<Void> stop() {
if (error.code() == invalid_error_code) error = success();
virtual Future<Void> stop(Error const& e) {
if (error.code() == invalid_error_code) {
error = e;
}
pool->queueLock.enter();
TraceEvent("WorkPool_Stop").detail("Workers", pool->workers.size()).detail("Idle", pool->idle.size())
.detail("Work", pool->work.size());
.detail("Work", pool->work.size()).error(e, true);
for (uint32_t i=0; i<pool->work.size(); i++)
pool->work[i]->cancel(); // What if cancel() does something to this?

View File

@ -1858,13 +1858,15 @@ private:
ACTOR static Future<Void> stopOnError( KeyValueStoreSQLite* self ) {
try {
wait( self->readThreads->getError() || self->writeThread->getError() );
ASSERT(false);
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled)
throw;
self->readThreads->stop(e);
self->writeThread->stop(e);
}
self->readThreads->stop();
self->writeThread->stop();
return Void();
}

View File

@ -78,7 +78,7 @@ class ThreadPool : public IThreadPool, public ReferenceCounted<ThreadPool> {
public:
ThreadPool() : dontstop(ios), mode(Run) {}
~ThreadPool() {}
Future<Void> stop() {
Future<Void> stop(Error const& e = success()) {
if (mode == Shutdown) return Void();
ReferenceCounted<ThreadPool>::addref();
ios.stop(); // doesn't work?

View File

@ -60,7 +60,7 @@ public:
virtual Future<Void> getError() = 0; // asynchronously throws an error if there is an internal error
virtual void addThread( IThreadPoolReceiver* userData ) = 0;
virtual void post( PThreadAction action ) = 0;
virtual Future<Void> stop() = 0;
virtual Future<Void> stop(Error const& e = success()) = 0;
virtual bool isCoro() const { return false; }
virtual void addref() = 0;
virtual void delref() = 0;

View File

@ -74,7 +74,7 @@ public:
errors.sendError( unknown_error() );
}
}
Future<Void> stop() {
Future<Void> stop(Error const& e) {
return Void();
}
void addref() {