Make IDiskQueue const-correct

This commit is contained in:
sfc-gh-tclinkenbeard 2020-07-21 11:57:27 -07:00
parent 9416e9139e
commit 3755b25c43
4 changed files with 84 additions and 60 deletions

View File

@ -218,7 +218,7 @@ public:
void dispose() { shutdown(this, true); }
void close() { shutdown(this, false); }
StorageBytes getStorageBytes() {
StorageBytes getStorageBytes() const {
int64_t free;
int64_t total;
@ -789,7 +789,7 @@ public:
{
}
virtual location push( StringRef contents ) {
location push(StringRef contents) override {
ASSERT( recovered );
uint8_t const* begin = contents.begin();
uint8_t const* end = contents.end();
@ -807,7 +807,7 @@ public:
return endLocation();
}
virtual void pop( location upTo ) {
void pop(location upTo) override {
ASSERT( !upTo.hi );
ASSERT( !recovered || upTo.lo <= endLocation() );
@ -829,14 +829,14 @@ public:
}
}
virtual Future<Standalone<StringRef>> read(location from, location to, CheckHashes ch) { return read(this, from, to, ch); }
int getMaxPayload() {
return Page::maxPayload;
Future<Standalone<StringRef>> read(location from, location to, CheckHashes ch) override {
return read(this, from, to, ch);
}
int getMaxPayload() const { return Page::maxPayload; }
// Always commit an entire page. Commit overhead is the unused space in a to-be-committed page
virtual int getCommitOverhead() {
int getCommitOverhead() const override {
if(!pushedPageCount()) {
if(!anyPopped)
return 0;
@ -849,7 +849,7 @@ public:
return backPage().remainingCapacity();
}
virtual Future<Void> commit() {
Future<Void> commit() override {
ASSERT( recovered );
if (!pushedPageCount()) {
if (!anyPopped) return Void();
@ -887,30 +887,30 @@ public:
rawQueue->stall();
}
virtual Future<bool> initializeRecovery(location recoverAt) { return initializeRecovery( this, recoverAt ); }
virtual Future<Standalone<StringRef>> readNext( int bytes ) { return readNext(this, bytes); }
Future<bool> initializeRecovery(location recoverAt) override { return initializeRecovery(this, recoverAt); }
Future<Standalone<StringRef>> readNext(int bytes) override { return readNext(this, bytes); }
// FIXME: getNextReadLocation should ASSERT( initialized ), but the memory storage engine needs
// to be changed to understand the new intiailizeRecovery protocol.
virtual location getNextReadLocation() { return nextReadLocation; }
virtual location getNextCommitLocation() { ASSERT( initialized ); return lastCommittedSeq + sizeof(Page); }
virtual location getNextPushLocation() { ASSERT( initialized ); return endLocation(); }
location getNextReadLocation() const override { return nextReadLocation; }
location getNextCommitLocation() const override {
ASSERT(initialized);
return lastCommittedSeq + sizeof(Page);
}
location getNextPushLocation() const override {
ASSERT(initialized);
return endLocation();
}
virtual Future<Void> getError() { return rawQueue->getError(); }
virtual Future<Void> onClosed() { return rawQueue->onClosed(); }
Future<Void> getError() override { return rawQueue->getError(); }
Future<Void> onClosed() override { return rawQueue->onClosed(); }
virtual void dispose() {
void dispose() override {
TraceEvent("DQDestroy", dbgid).detail("LastPoppedSeq", lastPoppedSeq).detail("PoppedSeq", poppedSeq).detail("NextPageSeq", nextPageSeq).detail("File0Name", rawQueue->files[0].dbgFilename);
dispose(this);
}
ACTOR static void dispose(DiskQueue* self) {
wait( self->onSafeToDestruct() );
TraceEvent("DQDestroyDone", self->dbgid).detail("File0Name", self->rawQueue->files[0].dbgFilename);
self->rawQueue->dispose();
delete self;
}
virtual void close() {
void close() override {
TraceEvent("DQClose", dbgid)
.detail("LastPoppedSeq", lastPoppedSeq)
.detail("PoppedSeq", poppedSeq)
@ -919,6 +919,17 @@ public:
.detail("File0Name", rawQueue->files[0].dbgFilename);
close(this);
}
StorageBytes getStorageBytes() const override { return rawQueue->getStorageBytes(); }
private:
ACTOR static void dispose(DiskQueue* self) {
wait(self->onSafeToDestruct());
TraceEvent("DQDestroyDone", self->dbgid).detail("File0Name", self->rawQueue->files[0].dbgFilename);
self->rawQueue->dispose();
delete self;
}
ACTOR static void close(DiskQueue* self) {
wait( self->onSafeToDestruct() );
TraceEvent("DQCloseDone", self->dbgid).detail("File0Name", self->rawQueue->files[0].dbgFilename);
@ -926,11 +937,6 @@ public:
delete self;
}
virtual StorageBytes getStorageBytes() {
return rawQueue->getStorageBytes();
}
private:
#pragma pack(push, 1)
struct PageHeader {
union {
@ -1399,29 +1405,30 @@ public:
Future<bool> initializeRecovery(location recoverAt) { return queue->initializeRecovery(recoverAt); }
Future<Standalone<StringRef>> readNext( int bytes ) { return readNext(this, bytes); }
virtual location getNextReadLocation() { return queue->getNextReadLocation(); }
location getNextReadLocation() const override { return queue->getNextReadLocation(); }
virtual Future<Standalone<StringRef>> read( location start, location end, CheckHashes ch ) { return queue->read( start, end, ch ); }
virtual location getNextCommitLocation() { return queue->getNextCommitLocation(); }
virtual location getNextPushLocation() { return queue->getNextPushLocation(); }
Future<Standalone<StringRef>> read(location start, location end, CheckHashes ch) override {
return queue->read(start, end, ch);
}
location getNextCommitLocation() const override { return queue->getNextCommitLocation(); }
location getNextPushLocation() const override { return queue->getNextPushLocation(); }
virtual location push( StringRef contents ) {
location push(StringRef contents) override {
pushed = queue->push(contents);
return pushed;
}
virtual void pop( location upTo ) {
void pop(location upTo) override {
popped = std::max(popped, upTo);
ASSERT_WE_THINK(committed >= popped);
queue->pop(std::min(committed, popped));
}
virtual int getCommitOverhead() {
int getCommitOverhead() const override {
return queue->getCommitOverhead() + (popped > committed ? queue->getMaxPayload() : 0);
}
Future<Void> commit() {
Future<Void> commit() override {
location pushLocation = pushed;
location popLocation = popped;
@ -1444,7 +1451,7 @@ public:
return commitFuture;
}
virtual StorageBytes getStorageBytes() { return queue->getStorageBytes(); }
StorageBytes getStorageBytes() const override { return queue->getStorageBytes(); }
private:
DiskQueue *queue;

View File

@ -68,18 +68,23 @@ public:
// Before calling push or commit, the caller *must* perform recovery by calling readNext() until it returns less than the requested number of bytes.
// Thereafter it may not be called again.
virtual Future<Standalone<StringRef>> readNext( int bytes ) = 0; // Return the next bytes in the queue (beginning, the first time called, with the first unpopped byte)
virtual location getNextReadLocation() = 0; // Returns a location >= the location of all bytes previously returned by readNext(), and <= the location of all bytes subsequently returned
virtual location getNextCommitLocation() = 0; // If commit() were to be called, all buffered writes would be written starting at `location`.
virtual location getNextPushLocation() = 0; // If push() were to be called, the pushed data would be written starting at `location`.
virtual location getNextReadLocation()
const = 0; // Returns a location >= the location of all bytes previously returned by readNext(), and <= the
// location of all bytes subsequently returned
virtual location getNextCommitLocation()
const = 0; // If commit() were to be called, all buffered writes would be written starting at `location`.
virtual location getNextPushLocation()
const = 0; // If push() were to be called, the pushed data would be written starting at `location`.
virtual Future<Standalone<StringRef>> read( location start, location end, CheckHashes vc ) = 0;
virtual location push( StringRef contents ) = 0; // Appends the given bytes to the byte stream. Returns a location token representing the *end* of the contents.
virtual void pop( location upTo ) = 0; // Removes all bytes before the given location token from the byte stream.
virtual Future<Void> commit() = 0; // returns when all prior pushes and pops are durable. If commit does not return (due to close or a crash), any prefix of the pushed bytes and any prefix of the popped bytes may be durable.
virtual int getCommitOverhead() = 0; // returns the amount of unused space that would be written by a commit that immediately followed this call
virtual int getCommitOverhead() const = 0; // returns the amount of unused space that would be written by a commit
// that immediately followed this call
virtual StorageBytes getStorageBytes() = 0;
virtual StorageBytes getStorageBytes() const = 0;
};
template<>

View File

@ -136,7 +136,7 @@ Future<Standalone<StringRef>> LogSystemDiskQueueAdapter::readNext( int bytes ) {
return LogSystemDiskQueueAdapterImpl::readNext(this, bytes);
}
IDiskQueue::location LogSystemDiskQueueAdapter::getNextReadLocation() {
IDiskQueue::location LogSystemDiskQueueAdapter::getNextReadLocation() const {
return IDiskQueue::location( 0, recoveryQueueLoc );
}

View File

@ -73,23 +73,35 @@ public:
Future<CommitMessage> getCommitMessage();
// IClosable interface
virtual Future<Void> getError();
virtual Future<Void> onClosed();
virtual void dispose();
virtual void close();
Future<Void> getError() override;
Future<Void> onClosed() override;
void dispose() override;
void close() override;
// IDiskQueue interface
virtual Future<bool> initializeRecovery(location recoverAt) { return false; }
virtual Future<Standalone<StringRef>> readNext( int bytes );
virtual IDiskQueue::location getNextReadLocation();
virtual IDiskQueue::location getNextCommitLocation() { ASSERT(false); throw internal_error(); }
virtual IDiskQueue::location getNextPushLocation() { ASSERT(false); throw internal_error(); }
virtual Future<Standalone<StringRef>> read( location start, location end, CheckHashes ch ) { ASSERT(false); throw internal_error(); }
virtual IDiskQueue::location push( StringRef contents );
virtual void pop( IDiskQueue::location upTo );
virtual Future<Void> commit();
virtual StorageBytes getStorageBytes() { ASSERT(false); throw internal_error(); }
virtual int getCommitOverhead() { return 0; } //SOMEDAY: could this be more accurate?
Future<bool> initializeRecovery(location recoverAt) override { return false; }
Future<Standalone<StringRef>> readNext(int bytes) override;
IDiskQueue::location getNextReadLocation() const override;
IDiskQueue::location getNextCommitLocation() const override {
ASSERT(false);
throw internal_error();
}
IDiskQueue::location getNextPushLocation() const override {
ASSERT(false);
throw internal_error();
}
Future<Standalone<StringRef>> read(location start, location end, CheckHashes ch) override {
ASSERT(false);
throw internal_error();
}
IDiskQueue::location push(StringRef contents) override;
void pop(IDiskQueue::location upTo) override;
Future<Void> commit() override;
StorageBytes getStorageBytes() const override {
ASSERT(false);
throw internal_error();
}
int getCommitOverhead() const override { return 0; } // SOMEDAY: could this be more accurate?
private:
Reference<AsyncVar<PeekTxsInfo>> peekLocality;