Merge branch 'release-5.0'
commit afdc125db9
@@ -705,7 +705,7 @@ ACTOR Future<CoordinatorsResult::Type> changeQuorum( Database cx, Reference<IQuo

	if(g_network->isSimulated()) {
		for(int i = 0; i < (desiredCoordinators.size()/2)+1; i++) {
			g_simulator.protectedAddresses.insert( desiredCoordinators[i] );
			g_simulator.protectedAddresses.insert( NetworkAddress(desiredCoordinators[i].ip,desiredCoordinators[i].port,true,false) );
		}
	}
@@ -200,6 +200,9 @@ public:
		//result = map(result, [=](int r) mutable { KAIOLogBlockEvent(io, OpLogEntry::READY, r); return r; });
#endif

		// Update checksum history if it is in use
		if(FLOW_KNOBS->KAIO_PAGE_WRITE_CHECKSUM_HISTORY > 0)
			result = map(result, [=](int r) { updateChecksumHistory(false, offset, length, (uint8_t *)data); return r; });
		return result;
	}
	virtual Future<Void> write( void const* data, int length, int64_t offset ) {
@@ -225,6 +228,10 @@ public:
		//result = map(result, [=](int r) mutable { KAIOLogBlockEvent(io, OpLogEntry::READY, r); return r; });
#endif

		// Update checksum history if it is in use
		if(FLOW_KNOBS->KAIO_PAGE_WRITE_CHECKSUM_HISTORY > 0)
			result = map(result, [=](int r) { updateChecksumHistory(true, offset, length, (uint8_t *)data); return r; });

		return success(result);
	}
	virtual Future<Void> truncate( int64_t size ) {
@@ -268,6 +275,13 @@ public:
		}

		lastFileSize = nextFileSize = size;

		// Truncate the page checksum history if it is in use
		if( FLOW_KNOBS->KAIO_PAGE_WRITE_CHECKSUM_HISTORY > 0 && ((size / checksumHistoryPageSize) < checksumHistory.size()) ) {
			int oldCapacity = checksumHistory.capacity();
			checksumHistory.resize(size / checksumHistoryPageSize);
			checksumHistoryBudget -= (checksumHistory.capacity() - oldCapacity);
		}
		return Void();
	}
@@ -325,6 +339,7 @@ public:
		if(logFile != nullptr)
			fclose(logFile);
#endif
		checksumHistoryBudget += checksumHistory.capacity();
	}

	static void launch() {
@@ -423,6 +438,58 @@ private:
	Int64MetricHandle countLogicalWrites;
	Int64MetricHandle countLogicalReads;

	std::vector<uint32_t> checksumHistory;
	// This is the maximum number of page checksum history blocks we will use across all files.
	static int checksumHistoryBudget;
	static int checksumHistoryPageSize;

	// Update or check checksum(s) in history for any full pages covered by this operation
	void updateChecksumHistory(bool write, int64_t offset, int len, uint8_t *buf) {
		// Check or set each full block in the range
		int page = offset / checksumHistoryPageSize; // First page number
		if(offset != page * checksumHistoryPageSize)
			++page; // Advance page if first page touch isn't whole
		int pageEnd = (offset + len) / checksumHistoryPageSize; // Last page plus 1
		uint8_t *start = buf + (page * checksumHistoryPageSize - offset); // Beginning of the first page within buf

		// Make sure history is large enough or limit pageEnd
		if(checksumHistory.size() < pageEnd) {
			if(checksumHistoryBudget > 0) {
				// Resize history and update budget based on capacity change
				auto initialCapacity = checksumHistory.capacity();
				checksumHistory.resize(checksumHistory.size() + std::min<int>(checksumHistoryBudget, pageEnd - checksumHistory.size()));
				checksumHistoryBudget -= (checksumHistory.capacity() - initialCapacity);
			}

			// Limit pageEnd to end of history, which works whether or not all of the desired
			// history slots were allocated.
			pageEnd = checksumHistory.size();
		}

		while(page < pageEnd) {
			uint32_t checksum = hashlittle(start, checksumHistoryPageSize, 0xab12fd93);
			uint32_t &historySum = checksumHistory[page];

			// For writes, just update the stored sum
			if(write) {
				historySum = checksum;
			}
			else if(historySum != 0 && historySum != checksum) {
				// For reads, verify the stored sum if it is not 0. If it fails, clear it.
				TraceEvent (SevError, "AsyncFileKAIODetectedLostWrite")
					.detail("Filename", filename)
					.detail("PageNumber", page)
					.detail("ChecksumOfPage", checksum)
					.detail("ChecksumHistory", historySum)
					.error(checksum_failed());
				historySum = 0;
			}

			start += checksumHistoryPageSize;
			++page;
		}
	}

	struct IOBlock : linux_iocb, FastAllocated<IOBlock> {
		Promise<int> result;
		Reference<AsyncFileKAIO> owner;
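For readers following the page arithmetic in updateChecksumHistory() above, here is a small standalone sketch (not part of the commit; the offset and length are made-up example values) showing how an operation is mapped onto whole checksum pages:

// Illustrative sketch only: mirrors the page-range math of updateChecksumHistory(),
// assuming the default 4096-byte checksum page size.
#include <cstdint>
#include <cstdio>

int main() {
	const int64_t pageSize = 4096;          // stands in for checksumHistoryPageSize
	int64_t offset = 6144, len = 10240;     // example operation

	int64_t page = offset / pageSize;       // first page touched
	if (offset != page * pageSize) ++page;  // skip a partial leading page
	int64_t pageEnd = (offset + len) / pageSize; // one past the last fully covered page

	// Only pages [page, pageEnd) are fully covered and get checksummed;
	// for offset=6144, len=10240 that is pages 2 and 3.
	printf("full pages: [%lld, %lld)\n", (long long)page, (long long)pageEnd);
	return 0;
}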
@@ -550,6 +617,12 @@ private:
	static Context ctx;

	explicit AsyncFileKAIO(int fd, int flags, std::string const& filename) : fd(fd), flags(flags), filename(filename), failed(false) {
		// Initialize the static history budget the first time (and only the first time) a file is opened.
		static int _ = checksumHistoryBudget = FLOW_KNOBS->KAIO_PAGE_WRITE_CHECKSUM_HISTORY;

		// Adjust the budget by the initial capacity of history, which should be 0 but may not be for some implementations.
		checksumHistoryBudget -= checksumHistory.capacity();

		if( !g_network->isSimulated() ) {
			countFileLogicalWrites.init(LiteralStringRef("AsyncFile.CountFileLogicalWrites"), filename);
			countFileLogicalReads.init( LiteralStringRef("AsyncFile.CountFileLogicalReads"), filename);
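The constructor above relies on a function-local static to run the budget initialization exactly once, no matter how many files are opened. A minimal sketch of that idiom, with a hypothetical budget value standing in for the knob:

// Illustrative sketch only: a function-local static is initialized the first
// time control reaches it, so the assignment happens once per process.
#include <cstdio>

static int budget;

void openFile() {
	static int _ = budget = 100; // runs once; later calls do not reset budget
	(void)_;
	printf("budget = %d\n", budget);
}

int main() {
	openFile();   // prints 100
	budget -= 10; // spend part of the budget
	openFile();   // prints 90, not 100
	return 0;
}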
@@ -728,6 +801,10 @@ void AsyncFileKAIO::KAIOLogEvent(FILE *logFile, uint32_t id, OpLogEntry::EOperat
}
#endif

// TODO: Move this to the .cpp if there ever is one. Only one source file includes this header so defining this here is safe.
int AsyncFileKAIO::checksumHistoryBudget;
int AsyncFileKAIO::checksumHistoryPageSize = 4096;

ACTOR Future<Void> runTestOps(Reference<IAsyncFile> f, int numIterations, int fileSize, bool expectedToSucceed) {
	state void *buf = FastAllocator<4096>::allocate(); // we leak this if there is an error, but that shouldn't be a big deal
	state int iteration = 0;
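The TODO in the hunk above points at a general C++ rule: a static data member declared in a class needs exactly one out-of-class definition, so defining it in a header is only safe while a single source file includes that header. A short sketch of the rule, using hypothetical names:

// Illustrative sketch only: one out-of-class definition is required; repeating
// it in multiple translation units would violate the one-definition rule.
#include <cstdio>

struct Counter {
	static int total;   // declaration inside the class
};

int Counter::total = 0; // the single required definition

int main() {
	Counter::total += 3;
	printf("%d\n", Counter::total);
	return 0;
}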
@@ -65,9 +65,7 @@ struct SpringCleaningStats {
};

struct PageChecksumCodec {
	PageChecksumCodec(std::string const &filename, int verifyChecksumHistorySize) : pageSize(0), reserveSize(0), filename(filename), silent(false) {
		checksumHistory.resize(verifyChecksumHistorySize);
	}
	PageChecksumCodec(std::string const &filename) : pageSize(0), reserveSize(0), filename(filename), silent(false) {}

	int pageSize;
	int reserveSize;
@@ -80,7 +78,6 @@ struct PageChecksumCodec {
		uint32_t part2;
		std::string toString() { return format("0x%08x%08x", part1, part2); }
	};
	std::vector<std::pair<Pgno, SumType>> checksumHistory;

	// Calculates and then either stores or verifies a checksum.
	// The checksum is read/stored at the end of the page buffer.
@@ -118,33 +115,6 @@ struct PageChecksumCodec {
			return false;
		}

		// Update or check sum in history if the history buffer isn't empty and if we're not in a simulated injected fault situation
		if(!checksumHistory.empty() &&
			(!g_network->isSimulated() || (!g_simulator.getCurrentProcess()->fault_injection_p1 && !g_simulator.getCurrentProcess()->rebooting))
		) {
			auto &bucket = checksumHistory[pageNumber % checksumHistory.size()];
			if(write) {
				// For writes, put this pagenumber and sum into the bucket
				bucket.first = pageNumber;
				bucket.second = *sumOut;
			}
			else {
				// For reads, see if the bucket has the right page number, if so then verify sum
				if(bucket.first == pageNumber && bucket.second != *pSumInPage) {
					TraceEvent (SevError, "SQLitePageChecksumDetectedLostWrite")
						.detail("CodecPageSize", pageSize)
						.detail("CodecReserveSize", reserveSize)
						.detail("Filename", filename)
						.detail("PageNumber", pageNumber)
						.detail("PageSize", pageLen)
						.detail("ChecksumInPage", pSumInPage->toString())
						.detail("ChecksumHistory", bucket.second.toString())
						.error(checksum_failed());
					return false;
				}
			}
		}

		return true;
	}
@@ -246,7 +216,7 @@ struct SQLiteDB : NonCopyable {
			ASSERT(false);
		}
		// Always start with a new pager codec with default options.
		pPagerCodec = new PageChecksumCodec(filename, SERVER_KNOBS->SQLITE_PAGER_CHECKSUM_HISTORY);
		pPagerCodec = new PageChecksumCodec(filename);
		sqlite3BtreePagerSetCodec(btree, PageChecksumCodec::codec, PageChecksumCodec::sizeChange, PageChecksumCodec::free, pPagerCodec);
	}
}
@@ -57,7 +57,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
	init( MAX_VERSIONS_IN_FLIGHT, 100000000 );
	init( VERSIONS_PER_SECOND, 1000000 );
	init( MAX_READ_TRANSACTION_LIFE_VERSIONS, 5 * VERSIONS_PER_SECOND ); if (randomize && BUGGIFY) MAX_READ_TRANSACTION_LIFE_VERSIONS=std::max<int>(1, 0.1 * VERSIONS_PER_SECOND); else if( randomize && BUGGIFY ) MAX_READ_TRANSACTION_LIFE_VERSIONS = 10 * VERSIONS_PER_SECOND;
	init( MAX_WRITE_TRANSACTION_LIFE_VERSIONS, 5 * VERSIONS_PER_SECOND ); if (randomize && BUGGIFY) MAX_WRITE_TRANSACTION_LIFE_VERSIONS=std::max<int>(1, 0.5 * VERSIONS_PER_SECOND);
	init( MAX_WRITE_TRANSACTION_LIFE_VERSIONS, 5 * VERSIONS_PER_SECOND ); if (randomize && BUGGIFY) MAX_WRITE_TRANSACTION_LIFE_VERSIONS=std::max<int>(1, 1 * VERSIONS_PER_SECOND);
	init( MAX_COMMIT_BATCH_INTERVAL, 0.5 ); if( randomize && BUGGIFY ) MAX_COMMIT_BATCH_INTERVAL = 2.0; // Each master proxy generates a CommitTransactionBatchRequest at least this often, so that versions always advance smoothly

	// Data distribution queue
@@ -163,7 +163,6 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
	init( DISK_METRIC_LOGGING_INTERVAL, 5.0 );
	init( SOFT_HEAP_LIMIT, 300e6 );

	init( SQLITE_PAGER_CHECKSUM_HISTORY, 0 );
	init( SQLITE_PAGE_SCAN_ERROR_LIMIT, 10000 );
	init( SQLITE_BTREE_PAGE_USABLE, 4096 - 8); // pageSize - reserveSize for page checksum
@@ -137,7 +137,6 @@ public:
	int SQLITE_FRAGMENT_PRIMARY_PAGE_USABLE;
	int SQLITE_FRAGMENT_OVERFLOW_PAGE_USABLE;
	double SQLITE_FRAGMENT_MIN_SAVINGS;
	int SQLITE_PAGER_CHECKSUM_HISTORY;

	// KeyValueStoreSqlite spring cleaning
	double CLEANING_INTERVAL;
@@ -745,7 +745,7 @@ void setupSimulatedSystem( vector<Future<Void>> *systemActors, std::string baseF
	g_random->randomShuffle(coordinatorAddresses);
	for(int i = 0; i < (coordinatorAddresses.size()/2)+1; i++) {
		TraceEvent("ProtectMachine").detail("Address", coordinatorAddresses[i]).detail("Coordinators", coordinatorAddresses.size()).backtrace();
		g_simulator.protectedAddresses.insert(coordinatorAddresses[i]);
		g_simulator.protectedAddresses.insert(NetworkAddress(coordinatorAddresses[i].ip,coordinatorAddresses[i].port,true,false));
	}
	g_random->randomShuffle(coordinatorAddresses);
@@ -1650,8 +1650,6 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
	logData->removed = rejoinMasters(self, recruited, req.epoch);
	self->queueOrder.push_back(recruited.id());

	Void _ = wait( delay(0.0) ); // if multiple recruitment requests were already in the promise stream make sure they are all started before any are removed

	TraceEvent("TLogStart", logData->logId);

	try {
@@ -1686,6 +1684,8 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
		throw;
	}

	Void _ = wait( delay(0.0) ); // if multiple recruitment requests were already in the promise stream make sure they are all started before any are removed

	double inputSum = 0;
	double durableSum = 0;
	for(auto it : self->id_data) {
@@ -1709,7 +1709,7 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
	if(self->id_data.size() || (self->oldLogServer.isValid() && !self->oldLogServer.isReady())) {
		return Void();
	} else {
		throw;
		throw worker_removed();
	}
}
@@ -815,6 +815,7 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe

	endRole(interf.id(), "Worker", "WorkerError", ok, e);
	errorForwarders.clear(false);
	tlog = Void();

	if (e.code() != error_code_actor_cancelled) { // We get cancelled e.g. when an entire simulation times out, but in that case we won't be restarted and don't need to wait for shutdown
		stopping.send(Void());
@@ -359,7 +359,7 @@ struct RemoveServersSafelyWorkload : TestWorkload {
	}
	if (removedCount >= 1) {
		TraceEvent("ProtectMachine").detail("Address", addr).detail("Coordinators", coordinators.size()).backtrace();
		g_simulator.protectedAddresses.insert(addr);
		g_simulator.protectedAddresses.insert(NetworkAddress(addr.ip,addr.port,true,false));
		safeCoordinators.push_back(addr);
		safeCount++;
	}
@@ -71,6 +71,7 @@ FlowKnobs::FlowKnobs(bool randomize, bool isSimulated) {
	//AsyncFileKAIO
	init( MAX_OUTSTANDING, 64 );
	init( MIN_SUBMIT, 10 );
	init( KAIO_PAGE_WRITE_CHECKSUM_HISTORY, 0 );

	//AsyncFileNonDurable
	init( MAX_PRIOR_MODIFICATION_DELAY, 1.0 ); if( randomize && BUGGIFY ) MAX_PRIOR_MODIFICATION_DELAY = 10.0;
@@ -89,6 +89,7 @@ public:
	//AsyncFileKAIO
	int MAX_OUTSTANDING;
	int MIN_SUBMIT;
	int KAIO_PAGE_WRITE_CHECKSUM_HISTORY;

	//AsyncFileNonDurable
	double MAX_PRIOR_MODIFICATION_DELAY;