From 430bb6224eee2cd1d3844f9357144722644c10fc Mon Sep 17 00:00:00 2001 From: Stephen Atherton Date: Fri, 16 Jun 2017 02:14:19 -0700 Subject: [PATCH] Merge branch 'release-4.6' into release-5.0 # Conflicts: # fdbrpc/AsyncFileKAIO.actor.h # fdbrpc/Net2FileSystem.cpp # fdbrpc/sim2.actor.cpp --- fdbrpc/AsyncFileKAIO.actor.h | 75 ------------------ fdbrpc/AsyncFileWriteChecker.cpp | 24 ++++++ fdbrpc/AsyncFileWriteChecker.h | 130 +++++++++++++++++++++++++++++++ fdbrpc/Net2FileSystem.cpp | 12 ++- fdbrpc/fdbrpc.vcxproj | 1 + fdbrpc/sim2.actor.cpp | 11 ++- flow/Knobs.cpp | 3 +- flow/Knobs.h | 3 +- 8 files changed, 177 insertions(+), 82 deletions(-) create mode 100644 fdbrpc/AsyncFileWriteChecker.cpp create mode 100644 fdbrpc/AsyncFileWriteChecker.h mode change 100644 => 100755 flow/Knobs.cpp diff --git a/fdbrpc/AsyncFileKAIO.actor.h b/fdbrpc/AsyncFileKAIO.actor.h index cd6f4e1c6a..b74f62d0fd 100644 --- a/fdbrpc/AsyncFileKAIO.actor.h +++ b/fdbrpc/AsyncFileKAIO.actor.h @@ -200,9 +200,6 @@ public: //result = map(result, [=](int r) mutable { KAIOLogBlockEvent(io, OpLogEntry::READY, r); return r; }); #endif - // Update checksum history if it is in use - if(FLOW_KNOBS->KAIO_PAGE_WRITE_CHECKSUM_HISTORY > 0) - result = map(result, [=](int r) { updateChecksumHistory(false, offset, length, (uint8_t *)data); return r; }); return result; } virtual Future write( void const* data, int length, int64_t offset ) { @@ -228,10 +225,6 @@ public: //result = map(result, [=](int r) mutable { KAIOLogBlockEvent(io, OpLogEntry::READY, r); return r; }); #endif - // Update checksum history if it is in use - if(FLOW_KNOBS->KAIO_PAGE_WRITE_CHECKSUM_HISTORY > 0) - result = map(result, [=](int r) { updateChecksumHistory(true, offset, length, (uint8_t *)data); return r; }); - return success(result); } virtual Future truncate( int64_t size ) { @@ -276,12 +269,6 @@ public: lastFileSize = nextFileSize = size; - // Truncate the page checksum history if it is in use - if( FLOW_KNOBS->KAIO_PAGE_WRITE_CHECKSUM_HISTORY > 0 && ((size / checksumHistoryPageSize) < checksumHistory.size()) ) { - int oldCapacity = checksumHistory.capacity(); - checksumHistory.resize(size / checksumHistoryPageSize); - checksumHistoryBudget -= (checksumHistory.capacity() - oldCapacity); - } return Void(); } @@ -339,7 +326,6 @@ public: if(logFile != nullptr) fclose(logFile); #endif - checksumHistoryBudget += checksumHistory.capacity(); } static void launch() { @@ -438,58 +424,6 @@ private: Int64MetricHandle countLogicalWrites; Int64MetricHandle countLogicalReads; - std::vector checksumHistory; - // This is the most page checksum history blocks we will use across all files. - static int checksumHistoryBudget; - static int checksumHistoryPageSize; - - // Update or check checksum(s) in history for any full pages covered by this operation - void updateChecksumHistory(bool write, int64_t offset, int len, uint8_t *buf) { - // Check or set each full block in the the range - int page = offset / checksumHistoryPageSize; // First page number - if(offset != page * checksumHistoryPageSize) - ++page; // Advance page if first page touch isn't whole - int pageEnd = (offset + len) / checksumHistoryPageSize; // Last page plus 1 - uint8_t *start = buf + (page * checksumHistoryPageSize - offset); // Beginning of the first page within buf - - // Make sure history is large enough or limit pageEnd - if(checksumHistory.size() < pageEnd) { - if(checksumHistoryBudget > 0) { - // Resize history and update budget based on capacity change - auto initialCapacity = checksumHistory.capacity(); - checksumHistory.resize(checksumHistory.size() + std::min(checksumHistoryBudget, pageEnd - checksumHistory.size())); - checksumHistoryBudget -= (checksumHistory.capacity() - initialCapacity); - } - - // Limit pageEnd to end of history, which works whether or not all of the desired - // history slots were allocatd. - pageEnd = checksumHistory.size(); - } - - while(page < pageEnd) { - uint32_t checksum = hashlittle(start, checksumHistoryPageSize, 0xab12fd93); - uint32_t &historySum = checksumHistory[page]; - - // For writes, just update the stored sum - if(write) { - historySum = checksum; - } - else if(historySum != 0 && historySum != checksum) { - // For reads, verify the stored sum if it is not 0. If it fails, clear it. - TraceEvent (SevError, "AsyncFileKAIODetectedLostWrite") - .detail("Filename", filename) - .detail("PageNumber", page) - .detail("ChecksumOfPage", checksum) - .detail("ChecksumHistory", historySum) - .error(checksum_failed()); - historySum = 0; - } - - start += checksumHistoryPageSize; - ++page; - } - } - struct IOBlock : linux_iocb, FastAllocated { Promise result; Reference owner; @@ -617,11 +551,6 @@ private: static Context ctx; explicit AsyncFileKAIO(int fd, int flags, std::string const& filename) : fd(fd), flags(flags), filename(filename), failed(false) { - // Initialize the static history budget the first time (and only the first time) a file is opened. - static int _ = checksumHistoryBudget = FLOW_KNOBS->KAIO_PAGE_WRITE_CHECKSUM_HISTORY; - - // Adjust the budget by the initial capacity of history, which should be 0 but maybe not for some implementations. - checksumHistoryBudget -= checksumHistory.capacity(); if( !g_network->isSimulated() ) { countFileLogicalWrites.init(LiteralStringRef("AsyncFile.CountFileLogicalWrites"), filename); @@ -801,10 +730,6 @@ void AsyncFileKAIO::KAIOLogEvent(FILE *logFile, uint32_t id, OpLogEntry::EOperat } #endif -// TODO: Move this to the .cpp if there ever is one. Only one source file includes this header so defining this here is safe. -int AsyncFileKAIO::checksumHistoryBudget; -int AsyncFileKAIO::checksumHistoryPageSize = 4096; - ACTOR Future runTestOps(Reference f, int numIterations, int fileSize, bool expectedToSucceed) { state void *buf = FastAllocator<4096>::allocate(); // we leak this if there is an error, but that shouldn't be a big deal state int iteration = 0; diff --git a/fdbrpc/AsyncFileWriteChecker.cpp b/fdbrpc/AsyncFileWriteChecker.cpp new file mode 100644 index 0000000000..d1963af549 --- /dev/null +++ b/fdbrpc/AsyncFileWriteChecker.cpp @@ -0,0 +1,24 @@ +/* + * AsyncFileWriteChecker.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "AsyncFileWriteChecker.h" + +int AsyncFileWriteChecker::checksumHistoryBudget; +int AsyncFileWriteChecker::checksumHistoryPageSize = 4096; diff --git a/fdbrpc/AsyncFileWriteChecker.h b/fdbrpc/AsyncFileWriteChecker.h new file mode 100644 index 0000000000..c02127df7c --- /dev/null +++ b/fdbrpc/AsyncFileWriteChecker.h @@ -0,0 +1,130 @@ +/* + * AsyncFileWriteChecker.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "IAsyncFile.h" + +class AsyncFileWriteChecker : public IAsyncFile, public ReferenceCounted { +public: + void addref() { ReferenceCounted::addref(); } + void delref() { ReferenceCounted::delref(); } + + // For read() and write(), the data buffer must remain valid until the future is ready + Future read( void* data, int length, int64_t offset ) { + return map(m_f->read(data, length, offset), [=](int r) { updateChecksumHistory(false, offset, length, (uint8_t *)data); return r; }); + } + Future readZeroCopy( void** data, int* length, int64_t offset ) { + return map(m_f->readZeroCopy(data, length, offset), [=](Void r) { updateChecksumHistory(false, offset, *length, (uint8_t *)data); return r; }); + } + + Future write( void const* data, int length, int64_t offset ) { + updateChecksumHistory(true, offset, length, (uint8_t *)data); + return m_f->write(data, length, offset); + } + + Future truncate( int64_t size ) { + return map(m_f->truncate(size), [=](Void r) { + // Truncate the page checksum history if it is in use + if( (size / checksumHistoryPageSize) < checksumHistory.size() ) { + int oldCapacity = checksumHistory.capacity(); + checksumHistory.resize(size / checksumHistoryPageSize); + checksumHistoryBudget -= (checksumHistory.capacity() - oldCapacity); + } + return r; + }); + } + + Future sync() { return m_f->sync(); } + Future flush() { return m_f->flush(); } + Future size() { return m_f->size(); } + std::string getFilename() { return m_f->getFilename(); } + void releaseZeroCopy( void* data, int length, int64_t offset ) { return m_f->releaseZeroCopy(data, length, offset); } + int64_t debugFD() { return m_f->debugFD(); } + + AsyncFileWriteChecker(Reference f) : m_f(f) { + // Initialize the static history budget the first time (and only the first time) a file is opened. + static int _ = checksumHistoryBudget = FLOW_KNOBS->PAGE_WRITE_CHECKSUM_HISTORY; + + // Adjust the budget by the initial capacity of history, which should be 0 but maybe not for some implementations. + checksumHistoryBudget -= checksumHistory.capacity(); + } + + + virtual ~AsyncFileWriteChecker() { + checksumHistoryBudget += checksumHistory.capacity(); + } + +private: + Reference m_f; + + std::vector checksumHistory; + // This is the most page checksum history blocks we will use across all files. + static int checksumHistoryBudget; + static int checksumHistoryPageSize; + + // Update or check checksum(s) in history for any full pages covered by this operation + void updateChecksumHistory(bool write, int64_t offset, int len, uint8_t *buf) { + // Check or set each full block in the the range + int page = offset / checksumHistoryPageSize; // First page number + if(offset != page * checksumHistoryPageSize) + ++page; // Advance page if first page touch isn't whole + int pageEnd = (offset + len) / checksumHistoryPageSize; // Last page plus 1 + uint8_t *start = buf + (page * checksumHistoryPageSize - offset); // Beginning of the first page within buf + + // Make sure history is large enough or limit pageEnd + if(checksumHistory.size() < pageEnd) { + if(checksumHistoryBudget > 0) { + // Resize history and update budget based on capacity change + auto initialCapacity = checksumHistory.capacity(); + checksumHistory.resize(checksumHistory.size() + std::min(checksumHistoryBudget, pageEnd - checksumHistory.size())); + checksumHistoryBudget -= (checksumHistory.capacity() - initialCapacity); + } + + // Limit pageEnd to end of history, which works whether or not all of the desired + // history slots were allocatd. + pageEnd = checksumHistory.size(); + } + + while(page < pageEnd) { + //printf("%d %d %u %u\n", write, page, checksum, historySum); + uint32_t checksum = hashlittle(start, checksumHistoryPageSize, 0xab12fd93); + uint32_t &historySum = checksumHistory[page]; + + // For writes, just update the stored sum + if(write) { + historySum = checksum; + } + else { + if(historySum != 0 && historySum != checksum) { + // For reads, verify the stored sum if it is not 0. If it fails, clear it. + TraceEvent (SevError, "AsyncFileKAIODetectedLostWrite") + .detail("Filename", m_f->getFilename()) + .detail("PageNumber", page) + .detail("ChecksumOfPage", checksum) + .detail("ChecksumHistory", historySum) + .error(checksum_failed()); + historySum = 0; + } + } + + start += checksumHistoryPageSize; + ++page; + } + } +}; diff --git a/fdbrpc/Net2FileSystem.cpp b/fdbrpc/Net2FileSystem.cpp index 5c661b79ee..7e53cabeac 100644 --- a/fdbrpc/Net2FileSystem.cpp +++ b/fdbrpc/Net2FileSystem.cpp @@ -37,6 +37,7 @@ #include "AsyncFileKAIO.actor.h" #include "flow/AsioReactor.h" #include "flow/Platform.h" +#include "AsyncFileWriteChecker.h" // Opens a file for asynchronous I/O Future< Reference > Net2FileSystem::open( std::string filename, int64_t flags, int64_t mode ) @@ -54,12 +55,17 @@ Future< Reference > Net2FileSystem::open( std::string filename if ( (flags & IAsyncFile::OPEN_EXCLUSIVE) ) ASSERT( flags & IAsyncFile::OPEN_CREATE ); if (!(flags & IAsyncFile::OPEN_UNCACHED)) return AsyncFileCached::open(filename, flags, mode); + + Future> f; #ifdef __linux__ if ( (flags & IAsyncFile::OPEN_UNBUFFERED) && !(flags & IAsyncFile::OPEN_NO_AIO) ) - return AsyncFileKAIO::open(filename, flags, mode, NULL); + f = AsyncFileKAIO::open(filename, flags, mode, NULL); + else #endif - - return Net2AsyncFile::open(filename, flags, mode, static_cast ((void*) g_network->global(INetwork::enASIOService))); + f = Net2AsyncFile::open(filename, flags, mode, static_cast ((void*) g_network->global(INetwork::enASIOService))); + if(FLOW_KNOBS->PAGE_WRITE_CHECKSUM_HISTORY > 0) + f = map(f, [=](Reference r) { return Reference(new AsyncFileWriteChecker(r)); }); + return f; } // Deletes the given file. If mustBeDurable, returns only when the file is guaranteed to be deleted even after a power failure. diff --git a/fdbrpc/fdbrpc.vcxproj b/fdbrpc/fdbrpc.vcxproj index 03aed07ef7..608437900c 100644 --- a/fdbrpc/fdbrpc.vcxproj +++ b/fdbrpc/fdbrpc.vcxproj @@ -30,6 +30,7 @@ + diff --git a/fdbrpc/sim2.actor.cpp b/fdbrpc/sim2.actor.cpp index 99eda434ad..07d6c84991 100644 --- a/fdbrpc/sim2.actor.cpp +++ b/fdbrpc/sim2.actor.cpp @@ -31,6 +31,7 @@ #include "fdbclient/FDBTypes.h" #include "fdbrpc/Replication.h" #include "fdbrpc/ReplicationUtils.h" +#include "AsyncFileWriteChecker.h" using std::min; @@ -1505,14 +1506,20 @@ Future< Reference > Sim2FileSystem::open( std::string filename actualFilename = filename + ".part"; auto partFile = machineCache.find(actualFilename); if(partFile != machineCache.end()) { - return AsyncFileDetachable::open(partFile->second); + Future> f = AsyncFileDetachable::open(partFile->second); + if(FLOW_KNOBS->PAGE_WRITE_CHECKSUM_HISTORY > 0) + f = map(f, [=](Reference r) { return Reference(new AsyncFileWriteChecker(r)); }); + return f; } } //Simulated disk parameters are shared by the AsyncFileNonDurable and the underlying SimpleFile. This way, they can both keep up with the time to start the next operation Reference diskParameters(new DiskParameters(FLOW_KNOBS->SIM_DISK_IOPS, FLOW_KNOBS->SIM_DISK_BANDWIDTH)); machineCache[actualFilename] = AsyncFileNonDurable::open(filename, actualFilename, SimpleFile::open(filename, flags, mode, diskParameters, false), diskParameters); } - return AsyncFileDetachable::open( machineCache[actualFilename] ); + Future> f = AsyncFileDetachable::open( machineCache[actualFilename] ); + if(FLOW_KNOBS->PAGE_WRITE_CHECKSUM_HISTORY > 0) + f = map(f, [=](Reference r) { return Reference(new AsyncFileWriteChecker(r)); }); + return f; } else return AsyncFileCached::open(filename, flags, mode); diff --git a/flow/Knobs.cpp b/flow/Knobs.cpp old mode 100644 new mode 100755 index 9972977caa..73683d0535 --- a/flow/Knobs.cpp +++ b/flow/Knobs.cpp @@ -71,7 +71,8 @@ FlowKnobs::FlowKnobs(bool randomize, bool isSimulated) { //AsyncFileKAIO init( MAX_OUTSTANDING, 64 ); init( MIN_SUBMIT, 10 ); - init( KAIO_PAGE_WRITE_CHECKSUM_HISTORY, 0 ); + + init( PAGE_WRITE_CHECKSUM_HISTORY, 0 ); if( randomize && BUGGIFY ) PAGE_WRITE_CHECKSUM_HISTORY = 10000000; //AsyncFileNonDurable init( MAX_PRIOR_MODIFICATION_DELAY, 1.0 ); if( randomize && BUGGIFY ) MAX_PRIOR_MODIFICATION_DELAY = 10.0; diff --git a/flow/Knobs.h b/flow/Knobs.h index 1c8f239c01..e1d80e53aa 100644 --- a/flow/Knobs.h +++ b/flow/Knobs.h @@ -89,7 +89,8 @@ public: //AsyncFileKAIO int MAX_OUTSTANDING; int MIN_SUBMIT; - int KAIO_PAGE_WRITE_CHECKSUM_HISTORY; + + int PAGE_WRITE_CHECKSUM_HISTORY; //AsyncFileNonDurable double MAX_PRIOR_MODIFICATION_DELAY;