diff --git a/bindings/bindingtester/run_binding_tester.sh b/bindings/bindingtester/run_binding_tester.sh
old mode 100755
new mode 100644
diff --git a/fdbclient/ManagementAPI.actor.cpp b/fdbclient/ManagementAPI.actor.cpp
index e69ab9859e..81a306c9d4 100644
--- a/fdbclient/ManagementAPI.actor.cpp
+++ b/fdbclient/ManagementAPI.actor.cpp
@@ -705,7 +705,7 @@ ACTOR Future<CoordinatorsResult::Type> changeQuorum( Database cx, Reference<IQu
 
 	if(g_network->isSimulated()) {
 		for(int i = 0; i < (desiredCoordinators.size()/2)+1; i++) {
-			g_simulator.protectedAddresses.insert( desiredCoordinators[i] );
+			g_simulator.protectedAddresses.insert( NetworkAddress(desiredCoordinators[i].ip,desiredCoordinators[i].port,true,false) );
 		}
 	}
 
diff --git a/fdbrpc/AsyncFileKAIO.actor.h b/fdbrpc/AsyncFileKAIO.actor.h
index 023af64697..cd6f4e1c6a 100644
--- a/fdbrpc/AsyncFileKAIO.actor.h
+++ b/fdbrpc/AsyncFileKAIO.actor.h
@@ -200,6 +200,9 @@ public:
 		//result = map(result, [=](int r) mutable { KAIOLogBlockEvent(io, OpLogEntry::READY, r); return r; });
 #endif
 
+		// Update checksum history if it is in use
+		if(FLOW_KNOBS->KAIO_PAGE_WRITE_CHECKSUM_HISTORY > 0)
+			result = map(result, [=](int r) { updateChecksumHistory(false, offset, length, (uint8_t *)data); return r; });
 		return result;
 	}
 	virtual Future<Void> write( void const* data, int length, int64_t offset ) {
@@ -225,6 +228,10 @@ public:
 		//result = map(result, [=](int r) mutable { KAIOLogBlockEvent(io, OpLogEntry::READY, r); return r; });
 #endif
 
+		// Update checksum history if it is in use
+		if(FLOW_KNOBS->KAIO_PAGE_WRITE_CHECKSUM_HISTORY > 0)
+			result = map(result, [=](int r) { updateChecksumHistory(true, offset, length, (uint8_t *)data); return r; });
+
 		return success(result);
 	}
 	virtual Future<Void> truncate( int64_t size ) {
@@ -268,6 +275,13 @@ public:
 		}
 
 		lastFileSize = nextFileSize = size;
+
+		// Truncate the page checksum history if it is in use
+		if( FLOW_KNOBS->KAIO_PAGE_WRITE_CHECKSUM_HISTORY > 0 && ((size / checksumHistoryPageSize) < checksumHistory.size()) ) {
+			int oldCapacity = checksumHistory.capacity();
+			checksumHistory.resize(size / checksumHistoryPageSize);
+			checksumHistoryBudget -= (checksumHistory.capacity() - oldCapacity);
+		}
 
 		return Void();
 	}
@@ -325,6 +339,7 @@ public:
 		if(logFile != nullptr)
 			fclose(logFile);
#endif
+		checksumHistoryBudget += checksumHistory.capacity();
 	}
 
 	static void launch() {
@@ -423,6 +438,58 @@ private:
 	Int64MetricHandle countLogicalWrites;
 	Int64MetricHandle countLogicalReads;
 
+	std::vector<uint32_t> checksumHistory;
+	// This is the maximum number of page checksum history blocks we will use across all files.
+	static int checksumHistoryBudget;
+	static int checksumHistoryPageSize;
+
+	// Update or check checksum(s) in history for any full pages covered by this operation
+	void updateChecksumHistory(bool write, int64_t offset, int len, uint8_t *buf) {
+		// Check or set each full block in the range
+		int page = offset / checksumHistoryPageSize;   // First page number
+		if(offset != page * checksumHistoryPageSize)
+			++page;   // Advance page if first page touch isn't whole
+		int pageEnd = (offset + len) / checksumHistoryPageSize;   // Last page plus 1
+		uint8_t *start = buf + (page * checksumHistoryPageSize - offset);   // Beginning of the first page within buf
+
+		// Make sure history is large enough or limit pageEnd
+		if(checksumHistory.size() < pageEnd) {
+			if(checksumHistoryBudget > 0) {
+				// Resize history and update budget based on capacity change
+				auto initialCapacity = checksumHistory.capacity();
+				checksumHistory.resize(checksumHistory.size() + std::min<int>(checksumHistoryBudget, pageEnd - checksumHistory.size()));
+				checksumHistoryBudget -= (checksumHistory.capacity() - initialCapacity);
+			}
+
+			// Limit pageEnd to end of history, which works whether or not all of the desired
+			// history slots were allocated.
+			pageEnd = checksumHistory.size();
+		}
+
+		while(page < pageEnd) {
+			uint32_t checksum = hashlittle(start, checksumHistoryPageSize, 0xab12fd93);
+			uint32_t &historySum = checksumHistory[page];
+
+			// For writes, just update the stored sum
+			if(write) {
+				historySum = checksum;
+			}
+			else if(historySum != 0 && historySum != checksum) {
+				// For reads, verify the stored sum if it is not 0. If it fails, clear it.
+				TraceEvent(SevError, "AsyncFileKAIODetectedLostWrite")
+					.detail("Filename", filename)
+					.detail("PageNumber", page)
+					.detail("ChecksumOfPage", checksum)
+					.detail("ChecksumHistory", historySum)
+					.error(checksum_failed());
+				historySum = 0;
+			}
+
+			start += checksumHistoryPageSize;
+			++page;
+		}
+	}
+
 	struct IOBlock : linux_iocb, FastAllocated<IOBlock> {
 		Promise<int> result;
 		Reference<AsyncFileKAIO> owner;
@@ -550,6 +617,12 @@ private:
 	static Context ctx;
 
 	explicit AsyncFileKAIO(int fd, int flags, std::string const& filename) : fd(fd), flags(flags), filename(filename), failed(false) {
+		// Initialize the static history budget the first time (and only the first time) a file is opened.
+		static int _ = checksumHistoryBudget = FLOW_KNOBS->KAIO_PAGE_WRITE_CHECKSUM_HISTORY;
+
+		// Adjust the budget by the initial capacity of history, which should be 0 but maybe not for some implementations.
+		checksumHistoryBudget -= checksumHistory.capacity();
+
 		if( !g_network->isSimulated() ) {
 			countFileLogicalWrites.init(LiteralStringRef("AsyncFile.CountFileLogicalWrites"), filename);
 			countFileLogicalReads.init( LiteralStringRef("AsyncFile.CountFileLogicalReads"), filename);
@@ -728,6 +801,10 @@ void AsyncFileKAIO::KAIOLogEvent(FILE *logFile, uint32_t id, OpLogEntry::EOperat
 }
 #endif
 
+// TODO: Move this to the .cpp if there ever is one. Only one source file includes this header so defining this here is safe.
+int AsyncFileKAIO::checksumHistoryBudget;
+int AsyncFileKAIO::checksumHistoryPageSize = 4096;
+
 ACTOR Future<Void> runTestOps(Reference<IAsyncFile> f, int numIterations, int fileSize, bool expectedToSucceed) {
 	state void *buf = FastAllocator<4096>::allocate(); // we leak this if there is an error, but that shouldn't be a big deal
 	state int iteration = 0;
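The bookkeeping that updateChecksumHistory() performs is easier to see in isolation: every full page covered by a write gets a 32-bit checksum recorded, and every full page covered by a read is verified against that record, with the slot cleared on mismatch so a given lost write is reported only once. Below is a minimal self-contained sketch of that idea; PageChecksumHistory and fnv1a() are invented stand-ins (the real code uses flow's hashlittle() and the shared checksumHistoryBudget, both omitted here):

```cpp
#include <cstdint>
#include <cstdio>
#include <vector>

static const int kPageSize = 4096; // mirrors checksumHistoryPageSize

// Stand-in 32-bit hash (FNV-1a); the real code uses flow's hashlittle().
static uint32_t fnv1a(const uint8_t* data, size_t len) {
	uint32_t h = 2166136261u;
	for (size_t i = 0; i < len; ++i) { h ^= data[i]; h *= 16777619u; }
	return h;
}

struct PageChecksumHistory { // hypothetical name, not the FDB class
	std::vector<uint32_t> sums; // one checksum per page; 0 means "unknown"

	// Returns false if a read disagrees with the recorded write checksum.
	bool update(bool write, int64_t offset, int len, const uint8_t* buf) {
		// Only whole pages participate, exactly as in updateChecksumHistory()
		int64_t page = offset / kPageSize;
		if (offset != page * kPageSize) ++page;          // skip partial first page
		int64_t pageEnd = (offset + len) / kPageSize;    // last full page plus 1
		const uint8_t* p = buf + (page * kPageSize - offset);
		if ((int64_t)sums.size() < pageEnd) sums.resize(pageEnd, 0);

		bool ok = true;
		for (; page < pageEnd; ++page, p += kPageSize) {
			uint32_t sum = fnv1a(p, kPageSize);
			if (write) sums[page] = sum;                 // writes record the sum
			else if (sums[page] != 0 && sums[page] != sum) {
				ok = false;                              // an acknowledged write was lost
				sums[page] = 0;                          // clear so it is reported once
			}
		}
		return ok;
	}
};

int main() {
	std::vector<uint8_t> disk(2 * kPageSize, 0xAA);
	PageChecksumHistory hist;
	hist.update(true, 0, (int)disk.size(), disk.data()); // remember write checksums
	disk[kPageSize + 7] = 0xBB;                          // simulate a lost/torn write
	bool ok = hist.update(false, 0, (int)disk.size(), disk.data());
	printf("read verified: %s\n", ok ? "yes" : "NO - lost write detected");
}
```

The real class additionally charges the static cross-file budget for any capacity growth and refunds it on truncate and close, so total history memory stays bounded no matter how many files are open.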
diff --git a/fdbserver/KeyValueStoreSQLite.actor.cpp b/fdbserver/KeyValueStoreSQLite.actor.cpp
index 8b905bbc2f..869c3fd344 100644
--- a/fdbserver/KeyValueStoreSQLite.actor.cpp
+++ b/fdbserver/KeyValueStoreSQLite.actor.cpp
@@ -65,9 +65,7 @@ struct SpringCleaningStats {
 };
 
 struct PageChecksumCodec {
-	PageChecksumCodec(std::string const &filename, int verifyChecksumHistorySize) : pageSize(0), reserveSize(0), filename(filename), silent(false) {
-		checksumHistory.resize(verifyChecksumHistorySize);
-	}
+	PageChecksumCodec(std::string const &filename) : pageSize(0), reserveSize(0), filename(filename), silent(false) {}
 
 	int pageSize;
 	int reserveSize;
@@ -80,7 +78,6 @@ struct PageChecksumCodec {
 		uint32_t part2;
 		std::string toString() { return format("0x%08x%08x", part1, part2); }
 	};
-	std::vector<std::pair<uint32_t, SumType>> checksumHistory;
 
 	// Calculates and then either stores or verifies a checksum.
 	// The checksum is read/stored at the end of the page buffer.
@@ -118,33 +115,6 @@ struct PageChecksumCodec {
 			return false;
 		}
 
-		// Update or check sum in history if the history buffer isn't empty and if we're not in a simulated injected fault situation
-		if(!checksumHistory.empty() &&
-			(!g_network->isSimulated() || (!g_simulator.getCurrentProcess()->fault_injection_p1 && !g_simulator.getCurrentProcess()->rebooting))
-		) {
-			auto &bucket = checksumHistory[pageNumber % checksumHistory.size()];
-			if(write) {
-				// For writes, put this pagenumber and sum into the bucket
-				bucket.first = pageNumber;
-				bucket.second = *sumOut;
-			}
-			else {
-				// For reads, see if the bucket has the right page number, if so then verify sum
-				if(bucket.first == pageNumber && bucket.second != *pSumInPage) {
-					TraceEvent(SevError, "SQLitePageChecksumDetectedLostWrite")
-						.detail("CodecPageSize", pageSize)
-						.detail("CodecReserveSize", reserveSize)
-						.detail("Filename", filename)
-						.detail("PageNumber", pageNumber)
-						.detail("PageSize", pageLen)
-						.detail("ChecksumInPage", pSumInPage->toString())
-						.detail("ChecksumHistory", bucket.second.toString())
-						.error(checksum_failed());
-					return false;
-				}
-			}
-		}
-
 		return true;
 	}
 
@@ -246,7 +216,7 @@ struct SQLiteDB : NonCopyable {
 			ASSERT(false);
 		}
 		// Always start with a new pager codec with default options.
-		pPagerCodec = new PageChecksumCodec(filename, SERVER_KNOBS->SQLITE_PAGER_CHECKSUM_HISTORY);
+		pPagerCodec = new PageChecksumCodec(filename);
 		sqlite3BtreePagerSetCodec(btree, PageChecksumCodec::codec, PageChecksumCodec::sizeChange, PageChecksumCodec::free, pPagerCodec);
 	}
 }
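For context on what the removal leaves behind: the codec's remaining checksum() path computes a sum over pageSize - reserveSize bytes and reads or stores it in the reserved tail of the page, which is why SQLITE_BTREE_PAGE_USABLE is 4096 - 8 (the two 32-bit parts of SumType occupy the last 8 bytes). A simplified sketch of that store-or-verify shape; computeSum() is an invented placeholder, not the codec's actual hash:

```cpp
#include <cstdint>
#include <cstdio>

struct SumType { uint32_t part1, part2; }; // same 8-byte layout as in the diff

// Invented placeholder hash; the real codec's sum is derived differently but
// likewise depends on both the page contents and the page number.
static SumType computeSum(const uint8_t* page, int dataLen, uint32_t pageNumber) {
	uint32_t h1 = pageNumber, h2 = 0x5bd1e995u;
	for (int i = 0; i < dataLen; ++i) { h1 = h1 * 31 + page[i]; h2 ^= h1; }
	return SumType{h1, h2};
}

// Store on write, verify on read; the sum lives in the reserved page tail.
static bool checksum(uint8_t* page, int pageSize, int reserveSize, uint32_t pageNumber, bool write) {
	int dataLen = pageSize - reserveSize;             // usable bytes, e.g. 4096 - 8
	SumType* pSumInPage = (SumType*)(page + dataLen); // reserved bytes hold the sum
	SumType sum = computeSum(page, dataLen, pageNumber);
	if (write) { *pSumInPage = sum; return true; }
	return pSumInPage->part1 == sum.part1 && pSumInPage->part2 == sum.part2;
}

int main() {
	uint8_t page[4096] = {1, 2, 3};
	checksum(page, sizeof(page), 8, 42, true);   // store
	printf("clean read ok: %d\n", checksum(page, sizeof(page), 8, 42, false));
	page[100] ^= 0xFF;                           // corrupt one byte
	printf("corrupt read ok: %d\n", checksum(page, sizeof(page), 8, 42, false));
}
```

This persistent per-page checksum catches corruption of data at rest; the in-memory history deleted above only caught lost writes, a job the KAIO-level history now does for every file, not just the SQLite pager.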
diff --git a/fdbserver/Knobs.cpp b/fdbserver/Knobs.cpp
index f315410d89..71bf802593 100644
--- a/fdbserver/Knobs.cpp
+++ b/fdbserver/Knobs.cpp
@@ -57,7 +57,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
 	init( MAX_VERSIONS_IN_FLIGHT,                       100000000 );
 	init( VERSIONS_PER_SECOND,                            1000000 );
 	init( MAX_READ_TRANSACTION_LIFE_VERSIONS,  5 * VERSIONS_PER_SECOND ); if (randomize && BUGGIFY) MAX_READ_TRANSACTION_LIFE_VERSIONS=std::max<int>(1, 0.1 * VERSIONS_PER_SECOND); else if( randomize && BUGGIFY ) MAX_READ_TRANSACTION_LIFE_VERSIONS = 10 * VERSIONS_PER_SECOND;
-	init( MAX_WRITE_TRANSACTION_LIFE_VERSIONS, 5 * VERSIONS_PER_SECOND ); if (randomize && BUGGIFY) MAX_WRITE_TRANSACTION_LIFE_VERSIONS=std::max<int>(1, 0.5 * VERSIONS_PER_SECOND);
+	init( MAX_WRITE_TRANSACTION_LIFE_VERSIONS, 5 * VERSIONS_PER_SECOND ); if (randomize && BUGGIFY) MAX_WRITE_TRANSACTION_LIFE_VERSIONS=std::max<int>(1, 1 * VERSIONS_PER_SECOND);
 	init( MAX_COMMIT_BATCH_INTERVAL,                          0.5 ); if( randomize && BUGGIFY ) MAX_COMMIT_BATCH_INTERVAL = 2.0; // Each master proxy generates a CommitTransactionBatchRequest at least this often, so that versions always advance smoothly
 
 	// Data distribution queue
@@ -163,7 +163,6 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
 	init( DISK_METRIC_LOGGING_INTERVAL,                       5.0 );
 	init( SOFT_HEAP_LIMIT,                                  300e6 );
 
-	init( SQLITE_PAGER_CHECKSUM_HISTORY,                        0 );
 	init( SQLITE_PAGE_SCAN_ERROR_LIMIT,                     10000 );
 	init( SQLITE_BTREE_PAGE_USABLE,                      4096 - 8); // pageSize - reserveSize for page checksum
 
diff --git a/fdbserver/Knobs.h b/fdbserver/Knobs.h
index c89b94e7b5..e27787a468 100644
--- a/fdbserver/Knobs.h
+++ b/fdbserver/Knobs.h
@@ -137,7 +137,6 @@ public:
 	int SQLITE_FRAGMENT_PRIMARY_PAGE_USABLE;
 	int SQLITE_FRAGMENT_OVERFLOW_PAGE_USABLE;
 	double SQLITE_FRAGMENT_MIN_SAVINGS;
-	int SQLITE_PAGER_CHECKSUM_HISTORY;
 
 	// KeyValueStoreSqlite spring cleaning
 	double CLEANING_INTERVAL;
diff --git a/fdbserver/SimulatedCluster.actor.cpp b/fdbserver/SimulatedCluster.actor.cpp
index 7a0849dd8d..9eacebc403 100644
--- a/fdbserver/SimulatedCluster.actor.cpp
+++ b/fdbserver/SimulatedCluster.actor.cpp
@@ -745,7 +745,7 @@ void setupSimulatedSystem( vector<Future<Void>> *systemActors, std::string baseF
 	g_random->randomShuffle(coordinatorAddresses);
 	for(int i = 0; i < (coordinatorAddresses.size()/2)+1; i++) {
 		TraceEvent("ProtectMachine").detail("Address", coordinatorAddresses[i]).detail("Coordinators", coordinatorAddresses.size()).backtrace();
-		g_simulator.protectedAddresses.insert(coordinatorAddresses[i]);
+		g_simulator.protectedAddresses.insert(NetworkAddress(coordinatorAddresses[i].ip,coordinatorAddresses[i].port,true,false));
 	}
 	g_random->randomShuffle(coordinatorAddresses);
diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp
index 9b0dc08652..41a242d8bf 100644
--- a/fdbserver/TLogServer.actor.cpp
+++ b/fdbserver/TLogServer.actor.cpp
@@ -1650,8 +1650,6 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
 
 	logData->removed = rejoinMasters(self, recruited, req.epoch);
 	self->queueOrder.push_back(recruited.id());
 
-	Void _ = wait( delay(0.0) ); // if multiple recruitment requests were already in the promise stream make sure they are all started before any are removed
-
 	TraceEvent("TLogStart", logData->logId);
 	try {
@@ -1686,6 +1684,8 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
 		throw;
 	}
 
+	Void _ = wait( delay(0.0) ); // if multiple recruitment requests were already in the promise stream make sure they are all started before any are removed
+
 	double inputSum = 0;
 	double durableSum = 0;
 	for(auto it : self->id_data) {
@@ -1709,7 +1709,7 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
 	if(self->id_data.size() || (self->oldLogServer.isValid() && !self->oldLogServer.isReady())) {
 		return Void();
 	} else {
-		throw;
+		throw worker_removed();
 	}
 }
 
diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp
index 3a253aa278..bc908dc76b 100644
--- a/fdbserver/worker.actor.cpp
+++ b/fdbserver/worker.actor.cpp
@@ -815,6 +815,7 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
 
 			endRole(interf.id(), "Worker", "WorkerError", ok, e);
 			errorForwarders.clear(false);
+			tlog = Void();
 
 			if (e.code() != error_code_actor_cancelled) { // We get cancelled e.g. when an entire simulation times out, but in that case we won't be restarted and don't need to wait for shutdown
 				stopping.send(Void());
diff --git a/fdbserver/workloads/RemoveServersSafely.actor.cpp b/fdbserver/workloads/RemoveServersSafely.actor.cpp
index 1ee6a3c527..938deb0839 100644
--- a/fdbserver/workloads/RemoveServersSafely.actor.cpp
+++ b/fdbserver/workloads/RemoveServersSafely.actor.cpp
@@ -359,7 +359,7 @@ struct RemoveServersSafelyWorkload : TestWorkload {
 			}
 			if (removedCount >= 1) {
 				TraceEvent("ProtectMachine").detail("Address", addr).detail("Coordinators", coordinators.size()).backtrace();
-				g_simulator.protectedAddresses.insert(addr);
+				g_simulator.protectedAddresses.insert(NetworkAddress(addr.ip,addr.port,true,false));
 				safeCoordinators.push_back(addr);
 				safeCount++;
 			}
diff --git a/flow/Knobs.cpp b/flow/Knobs.cpp
index 0c052bda7b..9972977caa 100644
--- a/flow/Knobs.cpp
+++ b/flow/Knobs.cpp
@@ -71,6 +71,7 @@ FlowKnobs::FlowKnobs(bool randomize, bool isSimulated) {
 	//AsyncFileKAIO
 	init( MAX_OUTSTANDING,                                     64 );
 	init( MIN_SUBMIT,                                          10 );
+	init( KAIO_PAGE_WRITE_CHECKSUM_HISTORY,                     0 );
 
 	//AsyncFileNonDurable
 	init( MAX_PRIOR_MODIFICATION_DELAY,                       1.0 ); if( randomize && BUGGIFY ) MAX_PRIOR_MODIFICATION_DELAY = 10.0;
diff --git a/flow/Knobs.h b/flow/Knobs.h
index 04b00200fb..1c8f239c01 100644
--- a/flow/Knobs.h
+++ b/flow/Knobs.h
@@ -89,6 +89,7 @@ public:
 	//AsyncFileKAIO
 	int MAX_OUTSTANDING;
 	int MIN_SUBMIT;
+	int KAIO_PAGE_WRITE_CHECKSUM_HISTORY;
 
 	//AsyncFileNonDurable
 	double MAX_PRIOR_MODIFICATION_DELAY;
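One pattern worth calling out: all three simulator call sites now insert NetworkAddress(addr.ip, addr.port, true, false) rather than the address as given. NetworkAddress equality includes its flags, so the likely motivation is that an address carrying, say, a TLS flag would otherwise never match the canonical form the simulator compares against when deciding whether a process is protected from being killed. A hypothetical illustration with an invented Addr type, not the real flow NetworkAddress:

```cpp
#include <cstdint>
#include <cstdio>
#include <set>
#include <tuple>

// Invented illustration type; flow's NetworkAddress stores these as flag bits.
struct Addr {
	uint32_t ip; uint16_t port; bool isPublic; bool isTLS;
	bool operator<(const Addr& r) const {
		return std::tie(ip, port, isPublic, isTLS) < std::tie(r.ip, r.port, r.isPublic, r.isTLS);
	}
};

// Mirror of the change: force (isPublic=true, isTLS=false) before inserting.
static Addr normalize(const Addr& a) { return Addr{a.ip, a.port, true, false}; }

int main() {
	std::set<Addr> protectedAddresses;
	Addr coordinator{0x7f000001, 4500, true, true};  // listed with a TLS flag
	Addr canonical  {0x7f000001, 4500, true, false}; // form used at kill-check time

	protectedAddresses.insert(coordinator);            // pre-change behavior
	printf("unnormalized: %s\n", protectedAddresses.count(canonical) ? "found" : "missed");

	protectedAddresses.insert(normalize(coordinator)); // post-change behavior
	printf("normalized:   %s\n", protectedAddresses.count(canonical) ? "found" : "missed");
}
```

Normalizing at every insertion point keeps protectedAddresses keyed on a single canonical form, so a membership check cannot silently miss a protected coordinator because of a flag mismatch.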