Merge branch 'release-5.0'
This commit is contained in:
commit
6d9e302487
|
@ -200,9 +200,6 @@ public:
|
|||
//result = map(result, [=](int r) mutable { KAIOLogBlockEvent(io, OpLogEntry::READY, r); return r; });
|
||||
#endif
|
||||
|
||||
// Update checksum history if it is in use
|
||||
if(FLOW_KNOBS->KAIO_PAGE_WRITE_CHECKSUM_HISTORY > 0)
|
||||
result = map(result, [=](int r) { updateChecksumHistory(false, offset, length, (uint8_t *)data); return r; });
|
||||
return result;
|
||||
}
|
||||
virtual Future<Void> write( void const* data, int length, int64_t offset ) {
|
||||
|
@ -228,10 +225,6 @@ public:
|
|||
//result = map(result, [=](int r) mutable { KAIOLogBlockEvent(io, OpLogEntry::READY, r); return r; });
|
||||
#endif
|
||||
|
||||
// Update checksum history if it is in use
|
||||
if(FLOW_KNOBS->KAIO_PAGE_WRITE_CHECKSUM_HISTORY > 0)
|
||||
result = map(result, [=](int r) { updateChecksumHistory(true, offset, length, (uint8_t *)data); return r; });
|
||||
|
||||
return success(result);
|
||||
}
|
||||
virtual Future<Void> truncate( int64_t size ) {
|
||||
|
@ -276,12 +269,6 @@ public:
|
|||
|
||||
lastFileSize = nextFileSize = size;
|
||||
|
||||
// Truncate the page checksum history if it is in use
|
||||
if( FLOW_KNOBS->KAIO_PAGE_WRITE_CHECKSUM_HISTORY > 0 && ((size / checksumHistoryPageSize) < checksumHistory.size()) ) {
|
||||
int oldCapacity = checksumHistory.capacity();
|
||||
checksumHistory.resize(size / checksumHistoryPageSize);
|
||||
checksumHistoryBudget -= (checksumHistory.capacity() - oldCapacity);
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
@ -339,7 +326,6 @@ public:
|
|||
if(logFile != nullptr)
|
||||
fclose(logFile);
|
||||
#endif
|
||||
checksumHistoryBudget += checksumHistory.capacity();
|
||||
}
|
||||
|
||||
static void launch() {
|
||||
|
@ -438,58 +424,6 @@ private:
|
|||
Int64MetricHandle countLogicalWrites;
|
||||
Int64MetricHandle countLogicalReads;
|
||||
|
||||
std::vector<uint32_t> checksumHistory;
|
||||
// This is the most page checksum history blocks we will use across all files.
|
||||
static int checksumHistoryBudget;
|
||||
static int checksumHistoryPageSize;
|
||||
|
||||
// Update or check checksum(s) in history for any full pages covered by this operation
|
||||
void updateChecksumHistory(bool write, int64_t offset, int len, uint8_t *buf) {
|
||||
// Check or set each full block in the the range
|
||||
int page = offset / checksumHistoryPageSize; // First page number
|
||||
if(offset != page * checksumHistoryPageSize)
|
||||
++page; // Advance page if first page touch isn't whole
|
||||
int pageEnd = (offset + len) / checksumHistoryPageSize; // Last page plus 1
|
||||
uint8_t *start = buf + (page * checksumHistoryPageSize - offset); // Beginning of the first page within buf
|
||||
|
||||
// Make sure history is large enough or limit pageEnd
|
||||
if(checksumHistory.size() < pageEnd) {
|
||||
if(checksumHistoryBudget > 0) {
|
||||
// Resize history and update budget based on capacity change
|
||||
auto initialCapacity = checksumHistory.capacity();
|
||||
checksumHistory.resize(checksumHistory.size() + std::min<int>(checksumHistoryBudget, pageEnd - checksumHistory.size()));
|
||||
checksumHistoryBudget -= (checksumHistory.capacity() - initialCapacity);
|
||||
}
|
||||
|
||||
// Limit pageEnd to end of history, which works whether or not all of the desired
|
||||
// history slots were allocatd.
|
||||
pageEnd = checksumHistory.size();
|
||||
}
|
||||
|
||||
while(page < pageEnd) {
|
||||
uint32_t checksum = hashlittle(start, checksumHistoryPageSize, 0xab12fd93);
|
||||
uint32_t &historySum = checksumHistory[page];
|
||||
|
||||
// For writes, just update the stored sum
|
||||
if(write) {
|
||||
historySum = checksum;
|
||||
}
|
||||
else if(historySum != 0 && historySum != checksum) {
|
||||
// For reads, verify the stored sum if it is not 0. If it fails, clear it.
|
||||
TraceEvent (SevError, "AsyncFileKAIODetectedLostWrite")
|
||||
.detail("Filename", filename)
|
||||
.detail("PageNumber", page)
|
||||
.detail("ChecksumOfPage", checksum)
|
||||
.detail("ChecksumHistory", historySum)
|
||||
.error(checksum_failed());
|
||||
historySum = 0;
|
||||
}
|
||||
|
||||
start += checksumHistoryPageSize;
|
||||
++page;
|
||||
}
|
||||
}
|
||||
|
||||
struct IOBlock : linux_iocb, FastAllocated<IOBlock> {
|
||||
Promise<int> result;
|
||||
Reference<AsyncFileKAIO> owner;
|
||||
|
@ -617,11 +551,6 @@ private:
|
|||
static Context ctx;
|
||||
|
||||
explicit AsyncFileKAIO(int fd, int flags, std::string const& filename) : fd(fd), flags(flags), filename(filename), failed(false) {
|
||||
// Initialize the static history budget the first time (and only the first time) a file is opened.
|
||||
static int _ = checksumHistoryBudget = FLOW_KNOBS->KAIO_PAGE_WRITE_CHECKSUM_HISTORY;
|
||||
|
||||
// Adjust the budget by the initial capacity of history, which should be 0 but maybe not for some implementations.
|
||||
checksumHistoryBudget -= checksumHistory.capacity();
|
||||
|
||||
if( !g_network->isSimulated() ) {
|
||||
countFileLogicalWrites.init(LiteralStringRef("AsyncFile.CountFileLogicalWrites"), filename);
|
||||
|
@ -801,10 +730,6 @@ void AsyncFileKAIO::KAIOLogEvent(FILE *logFile, uint32_t id, OpLogEntry::EOperat
|
|||
}
|
||||
#endif
|
||||
|
||||
// TODO: Move this to the .cpp if there ever is one. Only one source file includes this header so defining this here is safe.
|
||||
int AsyncFileKAIO::checksumHistoryBudget;
|
||||
int AsyncFileKAIO::checksumHistoryPageSize = 4096;
|
||||
|
||||
ACTOR Future<Void> runTestOps(Reference<IAsyncFile> f, int numIterations, int fileSize, bool expectedToSucceed) {
|
||||
state void *buf = FastAllocator<4096>::allocate(); // we leak this if there is an error, but that shouldn't be a big deal
|
||||
state int iteration = 0;
|
||||
|
|
|
@ -0,0 +1,24 @@
|
|||
/*
|
||||
* AsyncFileWriteChecker.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "AsyncFileWriteChecker.h"
|
||||
|
||||
int AsyncFileWriteChecker::checksumHistoryBudget;
|
||||
int AsyncFileWriteChecker::checksumHistoryPageSize = 4096;
|
|
@ -0,0 +1,130 @@
|
|||
/*
|
||||
* AsyncFileWriteChecker.h
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "IAsyncFile.h"
|
||||
|
||||
class AsyncFileWriteChecker : public IAsyncFile, public ReferenceCounted<AsyncFileWriteChecker> {
|
||||
public:
|
||||
void addref() { ReferenceCounted<AsyncFileWriteChecker>::addref(); }
|
||||
void delref() { ReferenceCounted<AsyncFileWriteChecker>::delref(); }
|
||||
|
||||
// For read() and write(), the data buffer must remain valid until the future is ready
|
||||
Future<int> read( void* data, int length, int64_t offset ) {
|
||||
return map(m_f->read(data, length, offset), [=](int r) { updateChecksumHistory(false, offset, length, (uint8_t *)data); return r; });
|
||||
}
|
||||
Future<Void> readZeroCopy( void** data, int* length, int64_t offset ) {
|
||||
return map(m_f->readZeroCopy(data, length, offset), [=](Void r) { updateChecksumHistory(false, offset, *length, (uint8_t *)data); return r; });
|
||||
}
|
||||
|
||||
Future<Void> write( void const* data, int length, int64_t offset ) {
|
||||
updateChecksumHistory(true, offset, length, (uint8_t *)data);
|
||||
return m_f->write(data, length, offset);
|
||||
}
|
||||
|
||||
Future<Void> truncate( int64_t size ) {
|
||||
return map(m_f->truncate(size), [=](Void r) {
|
||||
// Truncate the page checksum history if it is in use
|
||||
if( (size / checksumHistoryPageSize) < checksumHistory.size() ) {
|
||||
int oldCapacity = checksumHistory.capacity();
|
||||
checksumHistory.resize(size / checksumHistoryPageSize);
|
||||
checksumHistoryBudget -= (checksumHistory.capacity() - oldCapacity);
|
||||
}
|
||||
return r;
|
||||
});
|
||||
}
|
||||
|
||||
Future<Void> sync() { return m_f->sync(); }
|
||||
Future<Void> flush() { return m_f->flush(); }
|
||||
Future<int64_t> size() { return m_f->size(); }
|
||||
std::string getFilename() { return m_f->getFilename(); }
|
||||
void releaseZeroCopy( void* data, int length, int64_t offset ) { return m_f->releaseZeroCopy(data, length, offset); }
|
||||
int64_t debugFD() { return m_f->debugFD(); }
|
||||
|
||||
AsyncFileWriteChecker(Reference<IAsyncFile> f) : m_f(f) {
|
||||
// Initialize the static history budget the first time (and only the first time) a file is opened.
|
||||
static int _ = checksumHistoryBudget = FLOW_KNOBS->PAGE_WRITE_CHECKSUM_HISTORY;
|
||||
|
||||
// Adjust the budget by the initial capacity of history, which should be 0 but maybe not for some implementations.
|
||||
checksumHistoryBudget -= checksumHistory.capacity();
|
||||
}
|
||||
|
||||
|
||||
virtual ~AsyncFileWriteChecker() {
|
||||
checksumHistoryBudget += checksumHistory.capacity();
|
||||
}
|
||||
|
||||
private:
|
||||
Reference<IAsyncFile> m_f;
|
||||
|
||||
std::vector<uint32_t> checksumHistory;
|
||||
// This is the most page checksum history blocks we will use across all files.
|
||||
static int checksumHistoryBudget;
|
||||
static int checksumHistoryPageSize;
|
||||
|
||||
// Update or check checksum(s) in history for any full pages covered by this operation
|
||||
void updateChecksumHistory(bool write, int64_t offset, int len, uint8_t *buf) {
|
||||
// Check or set each full block in the the range
|
||||
int page = offset / checksumHistoryPageSize; // First page number
|
||||
if(offset != page * checksumHistoryPageSize)
|
||||
++page; // Advance page if first page touch isn't whole
|
||||
int pageEnd = (offset + len) / checksumHistoryPageSize; // Last page plus 1
|
||||
uint8_t *start = buf + (page * checksumHistoryPageSize - offset); // Beginning of the first page within buf
|
||||
|
||||
// Make sure history is large enough or limit pageEnd
|
||||
if(checksumHistory.size() < pageEnd) {
|
||||
if(checksumHistoryBudget > 0) {
|
||||
// Resize history and update budget based on capacity change
|
||||
auto initialCapacity = checksumHistory.capacity();
|
||||
checksumHistory.resize(checksumHistory.size() + std::min<int>(checksumHistoryBudget, pageEnd - checksumHistory.size()));
|
||||
checksumHistoryBudget -= (checksumHistory.capacity() - initialCapacity);
|
||||
}
|
||||
|
||||
// Limit pageEnd to end of history, which works whether or not all of the desired
|
||||
// history slots were allocatd.
|
||||
pageEnd = checksumHistory.size();
|
||||
}
|
||||
|
||||
while(page < pageEnd) {
|
||||
//printf("%d %d %u %u\n", write, page, checksum, historySum);
|
||||
uint32_t checksum = hashlittle(start, checksumHistoryPageSize, 0xab12fd93);
|
||||
uint32_t &historySum = checksumHistory[page];
|
||||
|
||||
// For writes, just update the stored sum
|
||||
if(write) {
|
||||
historySum = checksum;
|
||||
}
|
||||
else {
|
||||
if(historySum != 0 && historySum != checksum) {
|
||||
// For reads, verify the stored sum if it is not 0. If it fails, clear it.
|
||||
TraceEvent (SevError, "AsyncFileKAIODetectedLostWrite")
|
||||
.detail("Filename", m_f->getFilename())
|
||||
.detail("PageNumber", page)
|
||||
.detail("ChecksumOfPage", checksum)
|
||||
.detail("ChecksumHistory", historySum)
|
||||
.error(checksum_failed());
|
||||
historySum = 0;
|
||||
}
|
||||
}
|
||||
|
||||
start += checksumHistoryPageSize;
|
||||
++page;
|
||||
}
|
||||
}
|
||||
};
|
|
@ -37,6 +37,7 @@
|
|||
#include "AsyncFileKAIO.actor.h"
|
||||
#include "flow/AsioReactor.h"
|
||||
#include "flow/Platform.h"
|
||||
#include "AsyncFileWriteChecker.h"
|
||||
|
||||
// Opens a file for asynchronous I/O
|
||||
Future< Reference<class IAsyncFile> > Net2FileSystem::open( std::string filename, int64_t flags, int64_t mode )
|
||||
|
@ -54,12 +55,17 @@ Future< Reference<class IAsyncFile> > Net2FileSystem::open( std::string filename
|
|||
if ( (flags & IAsyncFile::OPEN_EXCLUSIVE) ) ASSERT( flags & IAsyncFile::OPEN_CREATE );
|
||||
if (!(flags & IAsyncFile::OPEN_UNCACHED))
|
||||
return AsyncFileCached::open(filename, flags, mode);
|
||||
|
||||
Future<Reference<IAsyncFile>> f;
|
||||
#ifdef __linux__
|
||||
if ( (flags & IAsyncFile::OPEN_UNBUFFERED) && !(flags & IAsyncFile::OPEN_NO_AIO) )
|
||||
return AsyncFileKAIO::open(filename, flags, mode, NULL);
|
||||
f = AsyncFileKAIO::open(filename, flags, mode, NULL);
|
||||
else
|
||||
#endif
|
||||
|
||||
return Net2AsyncFile::open(filename, flags, mode, static_cast<boost::asio::io_service*> ((void*) g_network->global(INetwork::enASIOService)));
|
||||
f = Net2AsyncFile::open(filename, flags, mode, static_cast<boost::asio::io_service*> ((void*) g_network->global(INetwork::enASIOService)));
|
||||
if(FLOW_KNOBS->PAGE_WRITE_CHECKSUM_HISTORY > 0)
|
||||
f = map(f, [=](Reference<IAsyncFile> r) { return Reference<IAsyncFile>(new AsyncFileWriteChecker(r)); });
|
||||
return f;
|
||||
}
|
||||
|
||||
// Deletes the given file. If mustBeDurable, returns only when the file is guaranteed to be deleted even after a power failure.
|
||||
|
|
|
@ -30,6 +30,7 @@
|
|||
<ClCompile Include="libb64\cdecode.c" />
|
||||
<ClCompile Include="md5\md5.c" />
|
||||
<ClCompile Include="Platform.cpp" />
|
||||
<ClCompile Include="AsyncFileWriteChecker.cpp" />
|
||||
<ClCompile Include="libcoroutine\Common.c" />
|
||||
<ClCompile Include="libcoroutine\Coro.c" />
|
||||
<ClCompile Include="Locality.cpp" />
|
||||
|
|
|
@ -31,6 +31,7 @@
|
|||
#include "fdbclient/FDBTypes.h"
|
||||
#include "fdbrpc/Replication.h"
|
||||
#include "fdbrpc/ReplicationUtils.h"
|
||||
#include "AsyncFileWriteChecker.h"
|
||||
|
||||
|
||||
using std::min;
|
||||
|
@ -1505,14 +1506,20 @@ Future< Reference<class IAsyncFile> > Sim2FileSystem::open( std::string filename
|
|||
actualFilename = filename + ".part";
|
||||
auto partFile = machineCache.find(actualFilename);
|
||||
if(partFile != machineCache.end()) {
|
||||
return AsyncFileDetachable::open(partFile->second);
|
||||
Future<Reference<IAsyncFile>> f = AsyncFileDetachable::open(partFile->second);
|
||||
if(FLOW_KNOBS->PAGE_WRITE_CHECKSUM_HISTORY > 0)
|
||||
f = map(f, [=](Reference<IAsyncFile> r) { return Reference<IAsyncFile>(new AsyncFileWriteChecker(r)); });
|
||||
return f;
|
||||
}
|
||||
}
|
||||
//Simulated disk parameters are shared by the AsyncFileNonDurable and the underlying SimpleFile. This way, they can both keep up with the time to start the next operation
|
||||
Reference<DiskParameters> diskParameters(new DiskParameters(FLOW_KNOBS->SIM_DISK_IOPS, FLOW_KNOBS->SIM_DISK_BANDWIDTH));
|
||||
machineCache[actualFilename] = AsyncFileNonDurable::open(filename, actualFilename, SimpleFile::open(filename, flags, mode, diskParameters, false), diskParameters);
|
||||
}
|
||||
return AsyncFileDetachable::open( machineCache[actualFilename] );
|
||||
Future<Reference<IAsyncFile>> f = AsyncFileDetachable::open( machineCache[actualFilename] );
|
||||
if(FLOW_KNOBS->PAGE_WRITE_CHECKSUM_HISTORY > 0)
|
||||
f = map(f, [=](Reference<IAsyncFile> r) { return Reference<IAsyncFile>(new AsyncFileWriteChecker(r)); });
|
||||
return f;
|
||||
}
|
||||
else
|
||||
return AsyncFileCached::open(filename, flags, mode);
|
||||
|
|
|
@ -71,7 +71,8 @@ FlowKnobs::FlowKnobs(bool randomize, bool isSimulated) {
|
|||
//AsyncFileKAIO
|
||||
init( MAX_OUTSTANDING, 64 );
|
||||
init( MIN_SUBMIT, 10 );
|
||||
init( KAIO_PAGE_WRITE_CHECKSUM_HISTORY, 0 );
|
||||
|
||||
init( PAGE_WRITE_CHECKSUM_HISTORY, 0 ); if( randomize && BUGGIFY ) PAGE_WRITE_CHECKSUM_HISTORY = 10000000;
|
||||
|
||||
//AsyncFileNonDurable
|
||||
init( MAX_PRIOR_MODIFICATION_DELAY, 1.0 ); if( randomize && BUGGIFY ) MAX_PRIOR_MODIFICATION_DELAY = 10.0;
|
||||
|
|
|
@ -89,7 +89,8 @@ public:
|
|||
//AsyncFileKAIO
|
||||
int MAX_OUTSTANDING;
|
||||
int MIN_SUBMIT;
|
||||
int KAIO_PAGE_WRITE_CHECKSUM_HISTORY;
|
||||
|
||||
int PAGE_WRITE_CHECKSUM_HISTORY;
|
||||
|
||||
//AsyncFileNonDurable
|
||||
double MAX_PRIOR_MODIFICATION_DELAY;
|
||||
|
|
Loading…
Reference in New Issue