diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp
index 02053e6cc4..032e269b30 100644
--- a/fdbclient/NativeAPI.actor.cpp
+++ b/fdbclient/NativeAPI.actor.cpp
@@ -3667,7 +3667,7 @@ void Transaction::setOption( FDBTransactionOptions::Option option, Optional<StringRef> value ) {
 				TraceEvent(SevInfo, "TransactionBeingTraced")
 				    .detail("DebugTransactionID", trLogInfo->identifier)
-				    .detail("ServerTraceID", info.debugID.get().first());
+				    .detail("ServerTraceID", info.debugID.get());
 			}
 			break;
 
@@ -3703,7 +3703,7 @@ void Transaction::setOption( FDBTransactionOptions::Option option, Optional<StringRef> value ) {
 			if (trLogInfo && !trLogInfo->identifier.empty()) {
 				TraceEvent(SevInfo, "TransactionBeingTraced")
 				    .detail("DebugTransactionID", trLogInfo->identifier)
-				    .detail("ServerTraceID", info.debugID.get().first());
+				    .detail("ServerTraceID", info.debugID.get());
 			}
 			break;
 
@@ -4097,9 +4097,9 @@ Future<Void> Transaction::onError( Error const& e ) {
 	return e;
 }
 
-ACTOR Future<StorageMetrics> getStorageMetricsLargeKeyRange(Database cx, KeyRangeRef keys);
+ACTOR Future<StorageMetrics> getStorageMetricsLargeKeyRange(Database cx, KeyRange keys);
 
-ACTOR Future<StorageMetrics> doGetStorageMetrics(Database cx, KeyRangeRef keys, Reference<LocationInfo> locationInfo) {
+ACTOR Future<StorageMetrics> doGetStorageMetrics(Database cx, KeyRange keys, Reference<LocationInfo> locationInfo) {
 	loop {
 		try {
 			WaitMetricsRequest req(keys, StorageMetrics(), StorageMetrics());
diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp
index aed498a16d..aff3c1c9ea 100644
--- a/fdbserver/DataDistribution.actor.cpp
+++ b/fdbserver/DataDistribution.actor.cpp
@@ -2075,7 +2075,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
 			for (auto& server : serverTeam) {
 				score += server_info[server]->teams.size();
 			}
-			TraceEvent("BuildServerTeams")
+			TraceEvent(SevDebug, "BuildServerTeams")
 			    .detail("Score", score)
 			    .detail("BestScore", bestScore)
 			    .detail("TeamSize", serverTeam.size())
@@ -4826,6 +4826,21 @@ ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq, Reference<AsyncVar<struct ServerDBInfo>> db) {
 	return Void();
 }
 
+ACTOR Future<Void> ddGetMetrics(GetDataDistributorMetricsRequest req, PromiseStream<GetMetricsListRequest> getShardMetricsList) {
+	ErrorOr<Standalone<VectorRef<DDMetricsRef>>> result = wait(errorOr(brokenPromiseToNever(
+	    getShardMetricsList.getReply(GetMetricsListRequest(req.keys, req.shardLimit)))));
+
+	if(result.isError()) {
+		req.reply.sendError(result.getError());
+	} else {
+		GetDataDistributorMetricsReply rep;
+		rep.storageMetricsList = result.get();
+		req.reply.send(rep);
+	}
+
+	return Void();
+}
+
 ACTOR Future<Void> ddSnapCreate(DistributorSnapRequest snapReq, Reference<AsyncVar<struct ServerDBInfo>> db ) {
 	state Future<Void> dbInfoChange = db->onChange();
 	if (!setDDEnabled(false, snapReq.snapUID)) {
@@ -5000,16 +5015,8 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncVar<struct ServerDBInfo>> db) {
-			when(GetDataDistributorMetricsRequest req = waitNext(di.dataDistributorMetrics.getFuture())) {
-				ErrorOr<Standalone<VectorRef<DDMetricsRef>>> result = wait(errorOr(brokenPromiseToNever(
-				    getShardMetricsList.getReply(GetMetricsListRequest(req.keys, req.shardLimit)))));
-				if ( result.isError() ) {
-					req.reply.sendError(result.getError());
-				} else {
-					GetDataDistributorMetricsReply rep;
-					rep.storageMetricsList = result.get();
-					req.reply.send(rep);
-				}
+			when(GetDataDistributorMetricsRequest req = waitNext(di.dataDistributorMetrics.getFuture())) {
+				actors.add(ddGetMetrics(req, getShardMetricsList));
 			}
 			when(DistributorSnapRequest snapReq = waitNext(di.distributorSnapReq.getFuture())) {
 				actors.add(ddSnapCreate(snapReq, db));
diff --git a/fdbserver/IPager.h b/fdbserver/IPager.h
index cbd4a329c2..0f74c744a8 100644
--- a/fdbserver/IPager.h
+++ b/fdbserver/IPager.h
@@ -105,6 +105,10 @@ public:
 	// Free pageID to be used again after the commit that moves oldestVersion past v
 	virtual void freePage(LogicalPageID pageID, Version v) = 0;
 
+	// If id is remapped, delete the original as of version v and return the page it was remapped to. The caller
+	// is then responsible for referencing and deleting the returned page ID.
+	virtual LogicalPageID detachRemappedPage(LogicalPageID id, Version v) = 0;
+
 	// Returns the latest data (regardless of version) for a page by LogicalPageID
 	// The data returned will be the later of
 	//   - the most recent committed atomic
@@ -133,7 +137,7 @@ public:
 
 	virtual StorageBytes getStorageBytes() const = 0;
 
-	// Count of pages in use by the pager client
+	// Count of pages in use by the pager client (including retained old page versions)
 	virtual Future<int64_t> getUserPageCount() = 0;
 
 	// Future returned is ready when pager has been initialized from disk and is ready for reads and writes.
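The detachRemappedPage() API added above inverts the usual remap cleanup: instead of the pager eventually copying a child's latest contents back over its original page ID, a parent page that still links the original ID can adopt the remapped ID directly and take over its lifetime. A minimal, self-contained C++ sketch of that idea follows; ToyPager, its bare integer page IDs, and atomicUpdate() are illustrative stand-ins, not FDB's types.

    // Toy model of remap + detach; not FDB's implementation.
    #include <cstdint>
    #include <cstdio>
    #include <map>

    using PageID = uint32_t;
    static constexpr PageID invalidPage = ~0u;

    struct ToyPager {
        // originalID -> (version -> newID): where each page was remapped, by version
        std::map<PageID, std::map<int64_t, PageID>> remaps;

        // Copy-on-write update: write new contents at a fresh ID, remember the remap.
        PageID atomicUpdate(PageID original, int64_t v, PageID freshID) {
            remaps[original][v] = freshID;
            return freshID;
        }

        // Detach: hand ownership of the latest remap target to the caller, so cleanup
        // never has to copy the new page's contents back over the original ID.
        PageID detachRemappedPage(PageID original, int64_t /*v*/) {
            auto i = remaps.find(original);
            if (i == remaps.end()) return invalidPage; // not remapped
            // The original ID becomes deletable; the caller references newID directly.
            return i->second.rbegin()->second;
        }
    };

    int main() {
        ToyPager pager;
        PageID child = 7;
        pager.atomicUpdate(child, /*version*/ 10, /*freshID*/ 42);
        // A parent still pointing at page 7 can swap in 42 instead of forcing the
        // pager to copy page 42's bytes back onto page 7 during remap cleanup.
        PageID adopted = pager.detachRemappedPage(child, 10);
        printf("parent now links %u (was %u)\n", adopted, child);
        return 0;
    }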
diff --git a/fdbserver/KeyValueStoreRocksDB.actor.cpp b/fdbserver/KeyValueStoreRocksDB.actor.cpp
index fd1447e0b7..b17d248d22 100644
--- a/fdbserver/KeyValueStoreRocksDB.actor.cpp
+++ b/fdbserver/KeyValueStoreRocksDB.actor.cpp
@@ -2,6 +2,7 @@
 
 #include <rocksdb/db.h>
 #include <rocksdb/options.h>
+#include <rocksdb/utilities/table_properties_collectors.h>
 
 #include "flow/flow.h"
 #include "flow/IThreadPool.h"
@@ -22,14 +23,23 @@ StringRef toStringRef(rocksdb::Slice s) {
 	return StringRef(reinterpret_cast<const uint8_t*>(s.data()), s.size());
 }
 
-rocksdb::Options getOptions() {
-	rocksdb::Options options;
-	options.create_if_missing = true;
+rocksdb::ColumnFamilyOptions getCFOptions() {
+	rocksdb::ColumnFamilyOptions options;
+	options.level_compaction_dynamic_level_bytes = true;
+	options.OptimizeLevelStyleCompaction(SERVER_KNOBS->ROCKSDB_MEMTABLE_BYTES);
+	// Compact sstables when there's too much deleted stuff.
+	options.table_properties_collector_factories = { rocksdb::NewCompactOnDeletionCollectorFactory(128, 1) };
 	return options;
 }
 
-rocksdb::ColumnFamilyOptions getCFOptions() {
-	return {};
+rocksdb::Options getOptions() {
+	rocksdb::Options options({}, getCFOptions());
+	options.avoid_unnecessary_blocking_io = true;
+	options.create_if_missing = true;
+	if (SERVER_KNOBS->ROCKSDB_BACKGROUND_PARALLELISM > 0) {
+		options.IncreaseParallelism(SERVER_KNOBS->ROCKSDB_BACKGROUND_PARALLELISM);
+	}
+	return options;
 }
 
 struct RocksDBKeyValueStore : IKeyValueStore {
@@ -119,7 +129,6 @@ struct RocksDBKeyValueStore : IKeyValueStore {
 
 	struct Reader : IThreadPoolReceiver {
 		DB& db;
-		rocksdb::ReadOptions readOptions;
 
 		explicit Reader(DB& db) : db(db) {}
 
@@ -141,7 +150,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
 				traceBatch.get().addEvent("GetValueDebug", a.debugID.get().first(), "Reader.Before");
 			}
 			rocksdb::PinnableSlice value;
-			auto s = db->Get(readOptions, db->DefaultColumnFamily(), toSlice(a.key), &value);
+			auto s = db->Get({}, db->DefaultColumnFamily(), toSlice(a.key), &value);
 			if (a.debugID.present()) {
 				traceBatch.get().addEvent("GetValueDebug", a.debugID.get().first(), "Reader.After");
 				traceBatch.get().dump();
@@ -172,7 +181,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
 				traceBatch.get().addEvent("GetValuePrefixDebug", a.debugID.get().first(),
 				                          "Reader.Before"); //.detail("TaskID", g_network->getCurrentTask());
 			}
-			auto s = db->Get(readOptions, db->DefaultColumnFamily(), toSlice(a.key), &value);
+			auto s = db->Get({}, db->DefaultColumnFamily(), toSlice(a.key), &value);
 			if (a.debugID.present()) {
 				traceBatch.get().addEvent("GetValuePrefixDebug", a.debugID.get().first(),
 				                          "Reader.After"); //.detail("TaskID", g_network->getCurrentTask());
@@ -195,33 +204,51 @@ struct RocksDBKeyValueStore : IKeyValueStore {
 			virtual double getTimeEstimate() { return SERVER_KNOBS->READ_RANGE_TIME_ESTIMATE; }
 		};
 		void action(ReadRangeAction& a) {
-			auto cursor = std::unique_ptr<rocksdb::Iterator>(db->NewIterator(readOptions));
 			Standalone<RangeResultRef> result;
+			if (a.rowLimit == 0 || a.byteLimit == 0) {
+				a.result.send(result);
+				return;
+			}
 			int accumulatedBytes = 0;
+			rocksdb::Status s;
 			if (a.rowLimit >= 0) {
+				rocksdb::ReadOptions options;
+				auto endSlice = toSlice(a.keys.end);
+				options.iterate_upper_bound = &endSlice;
+				auto cursor = std::unique_ptr<rocksdb::Iterator>(db->NewIterator(options));
 				cursor->Seek(toSlice(a.keys.begin));
-				while (cursor->Valid() && toStringRef(cursor->key()) < a.keys.end && result.size() < a.rowLimit &&
-				       accumulatedBytes < a.byteLimit) {
+				while (cursor->Valid() && toStringRef(cursor->key()) < a.keys.end) {
 					KeyValueRef kv(toStringRef(cursor->key()), toStringRef(cursor->value()));
 					accumulatedBytes += sizeof(KeyValueRef) + kv.expectedSize();
 					result.push_back_deep(result.arena(), kv);
+					// Calling `cursor->Next()` is potentially expensive, so short-circuit here just in case.
+					if (result.size() >= a.rowLimit || accumulatedBytes >= a.byteLimit) {
+						break;
+					}
 					cursor->Next();
 				}
+				s = cursor->status();
 			} else {
+				rocksdb::ReadOptions options;
+				auto beginSlice = toSlice(a.keys.begin);
+				options.iterate_lower_bound = &beginSlice;
+				auto cursor = std::unique_ptr<rocksdb::Iterator>(db->NewIterator(options));
 				cursor->SeekForPrev(toSlice(a.keys.end));
 				if (cursor->Valid() && toStringRef(cursor->key()) == a.keys.end) {
 					cursor->Prev();
 				}
-
-				while (cursor->Valid() && toStringRef(cursor->key()) >= a.keys.begin && result.size() < -a.rowLimit &&
-				       accumulatedBytes < a.byteLimit) {
+				while (cursor->Valid() && toStringRef(cursor->key()) >= a.keys.begin) {
 					KeyValueRef kv(toStringRef(cursor->key()), toStringRef(cursor->value()));
 					accumulatedBytes += sizeof(KeyValueRef) + kv.expectedSize();
 					result.push_back_deep(result.arena(), kv);
+					// Calling `cursor->Prev()` is potentially expensive, so short-circuit here just in case.
+					if (result.size() >= -a.rowLimit || accumulatedBytes >= a.byteLimit) {
+						break;
+					}
 					cursor->Prev();
 				}
+				s = cursor->status();
 			}
-			auto s = cursor->status();
+
 			if (!s.ok()) {
 				TraceEvent(SevError, "RocksDBError").detail("Error", s.ToString()).detail("Method", "ReadRange");
 			}
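The rewritten ReadRange above replaces per-iteration key comparisons with RocksDB's iterator bounds. One detail worth calling out: RocksDB stores a pointer to the bound Slice, so the Slice (and the memory backing it) must outlive the iterator, which is why endSlice and beginSlice are locals kept in scope for the whole scan. A standalone sketch of the pattern, with a made-up database path and keys:

    #include <rocksdb/db.h>
    #include <rocksdb/options.h>
    #include <cstdio>
    #include <memory>
    #include <string>

    int main() {
        rocksdb::DB* rawDb = nullptr;
        rocksdb::Options opts;
        opts.create_if_missing = true;
        rocksdb::Status open = rocksdb::DB::Open(opts, "/tmp/bounded_iter_demo", &rawDb);
        if (!open.ok()) return 1;
        std::unique_ptr<rocksdb::DB> db(rawDb);

        db->Put(rocksdb::WriteOptions(), "a", "1");
        db->Put(rocksdb::WriteOptions(), "b", "2");
        db->Put(rocksdb::WriteOptions(), "c", "3");

        std::string end = "c";              // exclusive upper bound, like a.keys.end above
        rocksdb::Slice endSlice(end);       // must outlive the iterator below
        rocksdb::ReadOptions ro;
        ro.iterate_upper_bound = &endSlice; // RocksDB stops the scan without per-key compares here

        std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(ro));
        for (it->Seek("a"); it->Valid(); it->Next()) {
            printf("%s -> %s\n", it->key().ToString().c_str(), it->value().ToString().c_str());
        }
        // Always check status() after the loop: an iterator can stop on error, not just on the bound.
        return it->status().ok() ? 0 : 1;
    }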
diff --git a/fdbserver/Knobs.cpp b/fdbserver/Knobs.cpp
index 89202aed0d..51a4022ee2 100644
--- a/fdbserver/Knobs.cpp
+++ b/fdbserver/Knobs.cpp
@@ -93,7 +93,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
 	init( PEEK_RESET_INTERVAL,                                  300.0 ); if ( randomize && BUGGIFY ) PEEK_RESET_INTERVAL = 20.0;
 	init( PEEK_MAX_LATENCY,                                       0.5 ); if ( randomize && BUGGIFY ) PEEK_MAX_LATENCY = 0.0;
 	init( PEEK_COUNT_SMALL_MESSAGES,                            false ); if ( randomize && BUGGIFY ) PEEK_COUNT_SMALL_MESSAGES = true;
-	init( PEEK_STATS_INTERVAL,                                  10.0 );
+	init( PEEK_STATS_INTERVAL,                                   10.0 );
 	init( PEEK_STATS_SLOW_AMOUNT,                                   0 );
 	init( PEEK_STATS_SLOW_RATIO,                                  0.5 );
 
@@ -236,7 +236,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
 	init( DD_VALIDATE_LOCALITY,                                  true ); if( randomize && BUGGIFY ) DD_VALIDATE_LOCALITY = false;
 	init( DD_CHECK_INVALID_LOCALITY_DELAY,                         60 ); if( randomize && BUGGIFY ) DD_CHECK_INVALID_LOCALITY_DELAY = 1 + deterministicRandom()->random01() * 600;
 	init( DD_ENABLE_VERBOSE_TRACING,                            false ); if( randomize && BUGGIFY ) DD_ENABLE_VERBOSE_TRACING = true;
-	init( DD_SS_FAILURE_VERSIONLAG,                         250000000 );
+	init( DD_SS_FAILURE_VERSIONLAG,                          250000000 );
 	init( DD_SS_ALLOWED_VERSIONLAG,                         200000000 ); if( randomize && BUGGIFY ) { DD_SS_FAILURE_VERSIONLAG = deterministicRandom()->randomInt(15000000, 500000000); DD_SS_ALLOWED_VERSIONLAG = 0.75 * DD_SS_FAILURE_VERSIONLAG; }
 	init( DD_SS_STUCK_TIME_LIMIT,                               300.0 ); if( randomize && BUGGIFY ) { DD_SS_STUCK_TIME_LIMIT = 200.0 + deterministicRandom()->random01() * 100.0; }
 
@@ -308,6 +308,10 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
 	// KeyValueStoreMemory
 	init( REPLACE_CONTENTS_BYTES,                                 1e5 );
 
+	// KeyValueStoreRocksDB
+	init( ROCKSDB_BACKGROUND_PARALLELISM,                           0 );
+	init( ROCKSDB_MEMTABLE_BYTES,                   512 * 1024 * 1024 );
+
 	// Leader election
 	bool longLeaderElection = randomize && BUGGIFY;
 	init( MAX_NOTIFICATIONS,                                   100000 );
@@ -573,7 +577,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
 	init( TRACE_LOG_FLUSH_FAILURE_CHECK_INTERVAL_SECONDS,          10 );
 	init( TRACE_LOG_PING_TIMEOUT_SECONDS,                         5.0 );
 	init( MIN_DELAY_CC_WORST_FIT_CANDIDACY_SECONDS,              10.0 );
-	init( MAX_DELAY_CC_WORST_FIT_CANDIDACY_SECONDS,             30.0 );
+	init( MAX_DELAY_CC_WORST_FIT_CANDIDACY_SECONDS,              30.0 );
 	init( DBINFO_FAILED_DELAY,                                    1.0 );
 
 	// Test harness
@@ -650,7 +654,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
 	init( REDWOOD_REMAP_CLEANUP_WINDOW,                            50 );
 	init( REDWOOD_REMAP_CLEANUP_LAG,                              0.1 );
 	init( REDWOOD_LOGGING_INTERVAL,                               5.0 );
-	
+
 	// Server request latency measurement
 	init( LATENCY_SAMPLE_SIZE,                                 100000 );
 	init( LATENCY_METRICS_LOGGING_INTERVAL,                      60.0 );
diff --git a/fdbserver/Knobs.h b/fdbserver/Knobs.h
index 942eb0a8ad..ebbb24903f 100644
--- a/fdbserver/Knobs.h
+++ b/fdbserver/Knobs.h
@@ -243,6 +243,10 @@ public:
 	// KeyValueStoreMemory
 	int64_t REPLACE_CONTENTS_BYTES;
 
+	// KeyValueStoreRocksDB
+	int ROCKSDB_BACKGROUND_PARALLELISM;
+	int64_t ROCKSDB_MEMTABLE_BYTES;
+
 	// Leader election
 	int MAX_NOTIFICATIONS;
 	int MIN_NOTIFICATIONS;
diff --git a/fdbserver/MasterProxyServer.actor.cpp b/fdbserver/MasterProxyServer.actor.cpp
index e96d4159bb..42be5b8f1e 100644
--- a/fdbserver/MasterProxyServer.actor.cpp
+++ b/fdbserver/MasterProxyServer.actor.cpp
@@ -79,6 +79,89 @@ ACTOR Future<Void> broadcastTxnRequest(TxnStateRequest req, int sendAmount, bool
 	return Void();
 }
 
+struct ProxyStats {
+	CounterCollection cc;
+	Counter txnRequestIn, txnRequestOut, txnRequestErrors;
+	Counter txnStartIn, txnStartOut, txnStartBatch;
+	Counter txnSystemPriorityStartIn, txnSystemPriorityStartOut;
+	Counter txnBatchPriorityStartIn, txnBatchPriorityStartOut;
+	Counter txnDefaultPriorityStartIn, txnDefaultPriorityStartOut;
+	Counter txnCommitIn, txnCommitVersionAssigned, txnCommitResolving, txnCommitResolved, txnCommitOut, txnCommitOutSuccess, txnCommitErrors;
+	Counter txnConflicts;
+	Counter txnThrottled;
+	Counter commitBatchIn, commitBatchOut;
+	Counter mutationBytes;
+	Counter mutations;
+	Counter conflictRanges;
+	Counter keyServerLocationIn, keyServerLocationOut, keyServerLocationErrors;
+	Version lastCommitVersionAssigned;
+
+	LatencySample commitLatencySample;
+	LatencySample grvLatencySample;
+
+	LatencyBands commitLatencyBands;
+	LatencyBands grvLatencyBands;
+
+	Future<Void> logger;
+
+	int recentRequests;
+	Deque<int> requestBuckets;
+	double lastBucketBegin;
+	double bucketInterval;
+
+	void updateRequestBuckets() {
+		while(now() - lastBucketBegin > bucketInterval) {
+			lastBucketBegin += bucketInterval;
+			recentRequests -= requestBuckets.front();
+			requestBuckets.pop_front();
+			requestBuckets.push_back(0);
+		}
+	}
+
+	void addRequest() {
+		updateRequestBuckets();
+		++recentRequests;
+		++requestBuckets.back();
+	}
+
+	int getRecentRequests() {
+		updateRequestBuckets();
+		return recentRequests*FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE/(FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE-(lastBucketBegin+bucketInterval-now()));
+	}
+
+	explicit ProxyStats(UID id, Version* pVersion, NotifiedVersion* pCommittedVersion, int64_t *commitBatchesMemBytesCountPtr)
+	  : cc("ProxyStats", id.toString()), recentRequests(0), lastBucketBegin(now()),
+	    bucketInterval(FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE/FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS),
+	    txnRequestIn("TxnRequestIn", cc), txnRequestOut("TxnRequestOut", cc),
+	    txnRequestErrors("TxnRequestErrors", cc), txnStartIn("TxnStartIn", cc), txnStartOut("TxnStartOut", cc),
+	    txnStartBatch("TxnStartBatch", cc), txnSystemPriorityStartIn("TxnSystemPriorityStartIn", cc),
+	    txnSystemPriorityStartOut("TxnSystemPriorityStartOut", cc),
+	    txnBatchPriorityStartIn("TxnBatchPriorityStartIn", cc),
+	    txnBatchPriorityStartOut("TxnBatchPriorityStartOut", cc),
+	    txnDefaultPriorityStartIn("TxnDefaultPriorityStartIn", cc),
+	    txnDefaultPriorityStartOut("TxnDefaultPriorityStartOut", cc), txnCommitIn("TxnCommitIn", cc),
+	    txnCommitVersionAssigned("TxnCommitVersionAssigned", cc), txnCommitResolving("TxnCommitResolving", cc),
+	    txnCommitResolved("TxnCommitResolved", cc), txnCommitOut("TxnCommitOut", cc),
+	    txnCommitOutSuccess("TxnCommitOutSuccess", cc), txnCommitErrors("TxnCommitErrors", cc),
+	    txnConflicts("TxnConflicts", cc), txnThrottled("TxnThrottled", cc), commitBatchIn("CommitBatchIn", cc),
+	    commitBatchOut("CommitBatchOut", cc), mutationBytes("MutationBytes", cc), mutations("Mutations", cc),
+	    conflictRanges("ConflictRanges", cc), keyServerLocationIn("KeyServerLocationIn", cc),
+	    keyServerLocationOut("KeyServerLocationOut", cc), keyServerLocationErrors("KeyServerLocationErrors", cc),
+	    lastCommitVersionAssigned(0),
+	    commitLatencySample("CommitLatencyMetrics", id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
+	    grvLatencySample("GRVLatencyMetrics", id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
+	    commitLatencyBands("CommitLatencyBands", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY),
+	    grvLatencyBands("GRVLatencyBands", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY) {
+		specialCounter(cc, "LastAssignedCommitVersion", [this](){return this->lastCommitVersionAssigned;});
+		specialCounter(cc, "Version", [pVersion](){return *pVersion; });
+		specialCounter(cc, "CommittedVersion", [pCommittedVersion](){ return pCommittedVersion->get(); });
+		specialCounter(cc, "CommitBatchesMemBytesCount", [commitBatchesMemBytesCountPtr]() { return *commitBatchesMemBytesCountPtr; });
+		logger = traceCounters("ProxyMetrics", id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "ProxyMetrics");
+		for(int i = 0; i < FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS; i++) {
+			requestBuckets.push_back(0);
+		}
+	}
+};
+
 struct TransactionRateInfo {
 	double rate;
 	double limit;
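ProxyStats above estimates recent request load with a ring of fixed-width counting buckets: addRequest() increments the newest bucket, stale buckets are retired as time passes, and getRecentRequests() scales the running sum up by the fraction of the window actually covered so far. A self-contained sketch, where WINDOW and BUCKETS stand in for FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE and FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS, and an injected clock replaces flow's now():

    #include <cstdio>
    #include <deque>

    struct RequestRateEstimator {
        static constexpr double WINDOW = 10.0; // seconds covered by all buckets together
        static constexpr int BUCKETS = 40;

        double nowSec;                          // injected time source for the sketch
        double lastBucketBegin;
        double bucketInterval = WINDOW / BUCKETS;
        int recentRequests = 0;
        std::deque<int> buckets;

        explicit RequestRateEstimator(double start)
          : nowSec(start), lastBucketBegin(start), buckets(BUCKETS, 0) {}

        void advanceBuckets() {
            // Retire whole buckets that have fallen out of the window.
            while (nowSec - lastBucketBegin > bucketInterval) {
                lastBucketBegin += bucketInterval;
                recentRequests -= buckets.front();
                buckets.pop_front();
                buckets.push_back(0);
            }
        }

        void addRequest() {
            advanceBuckets();
            ++recentRequests;
            ++buckets.back();
        }

        // Scale the partially-elapsed window up to a full-window estimate,
        // mirroring getRecentRequests() above.
        double estimate() {
            advanceBuckets();
            double elapsed = WINDOW - (lastBucketBegin + bucketInterval - nowSec);
            return recentRequests * WINDOW / elapsed;
        }
    };

    int main() {
        RequestRateEstimator est(0.0);
        for (int i = 0; i < 5; ++i) est.addRequest();
        est.nowSec = 0.1; // still inside the first bucket
        std::printf("full-window estimate: %f requests\n", est.estimate());
        return 0;
    }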
diff --git a/fdbserver/RestoreApplier.actor.cpp b/fdbserver/RestoreApplier.actor.cpp
index 8718ac7d26..82325b09c1 100644
--- a/fdbserver/RestoreApplier.actor.cpp
+++ b/fdbserver/RestoreApplier.actor.cpp
@@ -185,7 +185,7 @@ ACTOR static Future<Void> applyClearRangeMutations(Standalone<VectorRef<KeyRangeRef>> ranges, double delayTime,
 	wait(delay(deterministicRandom()->random01() * delayTime));
-	TraceEvent(delayTime > 5 ? SevWarnAlways : SevInfo, "FastRestoreApplierClearRangeMutationsStart", applierID)
+	TraceEvent(delayTime > 5 ? SevWarnAlways : SevDebug, "FastRestoreApplierClearRangeMutationsStart", applierID)
 	    .detail("BatchIndex", batchIndex)
 	    .detail("Ranges", ranges.size())
 	    .detail("DelayTime", delayTime);
@@ -296,7 +296,7 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
 	for (auto& key : incompleteStagingKeys) {
 		if (!fValues[i].get().present()) { // Key not exist in DB
 			// if condition: fValues[i].Valid() && fValues[i].isReady() && !fValues[i].isError() &&
-			TraceEvent(SevWarn, "FastRestoreApplierGetAndComputeStagingKeysNoBaseValueInDB", applierID)
+			TraceEvent(SevDebug, "FastRestoreApplierGetAndComputeStagingKeysNoBaseValueInDB", applierID)
 			    .suppressFor(5.0)
 			    .detail("BatchIndex", batchIndex)
 			    .detail("Key", key.first)
@@ -304,7 +304,7 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
 			    .detail("PendingMutations", key.second->second.pendingMutations.size())
 			    .detail("StagingKeyType", getTypeString(key.second->second.type));
 			for (auto& vm : key.second->second.pendingMutations) {
-				TraceEvent(SevWarn, "FastRestoreApplierGetAndComputeStagingKeysNoBaseValueInDB")
+				TraceEvent(SevDebug, "FastRestoreApplierGetAndComputeStagingKeysNoBaseValueInDB")
 				    .detail("PendingMutationVersion", vm.first.toString())
 				    .detail("PendingMutation", vm.second.toString());
 			}
diff --git a/fdbserver/RestoreController.actor.cpp b/fdbserver/RestoreController.actor.cpp
index d06bdf95f1..34bb155e78 100644
--- a/fdbserver/RestoreController.actor.cpp
+++ b/fdbserver/RestoreController.actor.cpp
@@ -300,7 +300,7 @@ ACTOR static Future<Version> processRestoreRequest(Reference<RestoreControllerData> self, Database cx, RestoreRequest request) {
 	state std::vector<RestoreFileFR> rangeFiles;
 	state std::vector<RestoreFileFR> logFiles;
 	state std::vector<RestoreFileFR> allFiles;
 	state Version minRangeVersion = MAX_VERSION;
-	state ActorCollection actors(false);
+	state Future<Void> error = actorCollection(self->addActor.getFuture());
 
 	self->initBackupContainer(request.url);
 
@@ -356,7 +356,7 @@ ACTOR static Future<Version> processRestoreRequest(Reference<RestoreControllerData> self, Database cx, RestoreRequest request) {
-	actors.add(monitorFinishedVersion(self, request));
+	self->addActor.send(monitorFinishedVersion(self, request));
 	state std::vector<VersionBatch>::iterator versionBatch = versionBatches.begin();
 	for (; versionBatch != versionBatches.end(); versionBatch++) {
 		while (self->runningVersionBatches.get() >= SERVER_KNOBS->FASTRESTORE_VB_PARALLELISM && !releaseVBOutOfOrder) {
@@ -378,7 +378,11 @@ ACTOR static Future<Version> processRestoreRequest(Reference<RestoreControllerData> self, Database cx, RestoreRequest request) {
 		wait(delay(SERVER_KNOBS->FASTRESTORE_VB_LAUNCH_DELAY));
 	}
 
-	wait(waitForAll(fBatches));
+	try {
+		wait(waitForAll(fBatches) || error);
+	} catch (Error& e) {
+		TraceEvent(SevError, "FastRestoreControllerDispatchVersionBatchesUnexpectedError").error(e);
+	}
 
 	TraceEvent("FastRestoreController").detail("RestoreToVersion", request.targetVersion);
 	return request.targetVersion;
diff --git a/fdbserver/RestoreController.actor.h b/fdbserver/RestoreController.actor.h
index 99e03e31d9..98361e25bc 100644
--- a/fdbserver/RestoreController.actor.h
+++ b/fdbserver/RestoreController.actor.h
@@ -149,6 +149,10 @@ struct RestoreControllerData : RestoreRoleData, public ReferenceCounted<RestoreControllerData> {
 	std::map<UID, double> rolesHeartBeatTime; // Key: role id; Value: most recent time controller receives heart beat
 
+	// addActor: add to actorCollection so that when an actor has error, the ActorCollection can catch the error.
+	// addActor is used to create the actorCollection when the RestoreController is created
+	PromiseStream<Future<Void>> addActor;
+
 	void addref() { return ReferenceCounted<RestoreControllerData>::addref(); }
 	void delref() { return ReferenceCounted<RestoreControllerData>::delref(); }
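The RestoreController change routes background work through addActor and actorCollection() so that an error in any version batch surfaces in the main wait (wait(waitForAll(fBatches) || error)) instead of being silently dropped. Below is a toy, single-threaded analogue of that error-funneling shape, not flow's actual implementation: tasks registered through a stream are pumped, and the first failure is kept for whoever waits on the aggregate.

    #include <cstdio>
    #include <exception>
    #include <functional>
    #include <stdexcept>
    #include <vector>

    struct ToyActorCollection {
        std::vector<std::function<void()>> tasks; // stands in for PromiseStream<Future<Void>>
        std::exception_ptr firstError;

        void add(std::function<void()> t) { tasks.push_back(std::move(t)); }

        // Run everything; capture the first error rather than losing it, the way an
        // ActorCollection holds an errored actor's exception for its waiter.
        void pump() {
            for (auto& t : tasks) {
                try { t(); }
                catch (...) { if (!firstError) firstError = std::current_exception(); }
            }
            tasks.clear();
        }

        void throwIfError() { if (firstError) std::rethrow_exception(firstError); }
    };

    int main() {
        ToyActorCollection actors;
        actors.add([] { /* version batch 1: ok */ });
        actors.add([] { throw std::runtime_error("batch 2 failed"); });
        actors.pump();
        try {
            actors.throwIfError(); // the analogue of wait(waitForAll(fBatches) || error)
        } catch (std::exception& e) {
            printf("DispatchVersionBatchesUnexpectedError: %s\n", e.what());
        }
        return 0;
    }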
diff --git a/fdbserver/VersionedBTree.actor.cpp b/fdbserver/VersionedBTree.actor.cpp
index 146fe2b97f..ea9ba44036 100644
--- a/fdbserver/VersionedBTree.actor.cpp
+++ b/fdbserver/VersionedBTree.actor.cpp
@@ -92,7 +92,18 @@ std::string toString(LogicalPageID id) {
 	if (id == invalidLogicalPageID) {
 		return "LogicalPageID{invalid}";
 	}
-	return format("LogicalPageID{%" PRId64 "}", id);
+	return format("LogicalPageID{%u}", id);
+}
+
+std::string toString(Version v) {
+	if (v == invalidVersion) {
+		return "invalidVersion";
+	}
+	return format("@%" PRId64, v);
+}
+
+std::string toString(bool b) {
+	return b ? "true" : "false";
 }
 
 template <typename T>
@@ -136,6 +147,11 @@ std::string toString(const Optional<T>& o) {
 	return "<not present>";
 }
 
+template <typename F, typename S>
+std::string toString(const std::pair<F, S>& o) {
+	return format("{%s, %s}", toString(o.first).c_str(), toString(o.second).c_str());
+}
+
 // A FIFO queue of T stored as a linked list of pages.
 // Main operations are pop(), pushBack(), pushFront(), and flush().
 //
@@ -765,6 +781,8 @@ struct RedwoodMetrics {
 		unsigned int lazyClearRequeueExt;
 		unsigned int lazyClearFree;
 		unsigned int lazyClearFreeExt;
+		unsigned int forceUpdate;
+		unsigned int detachChild;
 		double buildStoredPct;
 		double buildFillPct;
 		unsigned int buildItemCount;
@@ -797,6 +815,12 @@ struct RedwoodMetrics {
 	unsigned int btreeLeafPreload;
 	unsigned int btreeLeafPreloadExt;
 
+	// Return number of pages read or written, from cache or disk
+	unsigned int pageOps() const {
+		// All page reads are either a cache hit, probe hit, or a disk read
+		return pagerDiskWrite + pagerDiskRead + pagerCacheHit + pagerProbeHit;
+	}
+
 	double startTime;
 
 	Level& level(unsigned int level) {
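The new toString() overloads compose: the std::pair overload calls toString() on each element, so a pair of a Version and a LogicalPageID renders as "{@5, LogicalPageID{42}}". A compilable sketch of that composition, using std::string concatenation in place of flow's format():

    #include <cstdint>
    #include <cstdio>
    #include <string>
    #include <utility>

    static std::string toString(uint32_t id) { return "LogicalPageID{" + std::to_string(id) + "}"; }
    static std::string toString(int64_t v) { return "@" + std::to_string(v); }

    // The pair overload delegates to the element overloads, so nesting just works.
    template <typename F, typename S>
    static std::string toString(const std::pair<F, S>& p) {
        return "{" + toString(p.first) + ", " + toString(p.second) + "}";
    }

    int main() {
        std::pair<int64_t, uint32_t> versionAndPage{5, 42};
        printf("%s\n", toString(versionAndPage).c_str()); // {@5, LogicalPageID{42}}
        return 0;
    }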
@@ -807,9 +831,9 @@ struct RedwoodMetrics {
 		return levels[level - 1];
 	}
 
-	// This will populate a trace event and/or a string with Redwood metrics.  The string is a
-	// reasonably well formatted page of information
-	void getFields(TraceEvent* e, std::string* s = nullptr) {
+	// This will populate a trace event and/or a string with Redwood metrics.
+	// The string is a reasonably well formatted page of information
+	void getFields(TraceEvent* e, std::string* s = nullptr, bool skipZeroes = false) {
 		std::pair<const char*, unsigned int> metrics[] = { { "BTreePreload", btreeLeafPreload },
 			                                               { "BTreePreloadExt", btreeLeafPreloadExt },
 			                                               { "", 0 },
 			                                               { "PagerRemapCopy", pagerRemapCopy },
 			                                               { "PagerRemapSkip", pagerRemapSkip } };
 		double elapsed = now() - startTime;
-		for (auto& m : metrics) {
-			if (*m.first == '\0') {
-				if (s != nullptr) {
-					*s += "\n";
-				}
-			} else {
-				if (s != nullptr) {
-					*s += format("%-15s %-8u %8u/s  ", m.first, m.second, int(m.second / elapsed));
-				}
-				if (e != nullptr) {
+
+		if (e != nullptr) {
+			for (auto& m : metrics) {
+				char c = m.first[0];
+				if(c != 0 && (!skipZeroes || m.second != 0) ) {
 					e->detail(m.first, m.second);
 				}
 			}
 		}
+
+		if(s != nullptr) {
+			for (auto& m : metrics) {
+				if (*m.first == '\0') {
+					*s += "\n";
+				} else if(!skipZeroes || m.second != 0) {
+					*s += format("%-15s %-8u %8u/s  ", m.first, m.second, int(m.second / elapsed));
+				}
+			}
+		}
+
 		for (int i = 0; i < btreeLevels; ++i) {
 			auto& level = levels[i];
 			std::pair<const char*, unsigned int> metrics[] = {
@@ -869,37 +898,44 @@ struct RedwoodMetrics {
 				{ "LazyClear", level.lazyClearFree },
 				{ "LazyClearExt", level.lazyClearFreeExt },
 				{ "", 0 },
+				{ "ForceUpdate", level.forceUpdate },
+				{ "DetachChild", level.detachChild },
+				{ "", 0 },
 				{ "-BldAvgCount", level.pageBuild ? level.buildItemCount / level.pageBuild : 0 },
 				{ "-BldAvgFillPct", level.pageBuild ? level.buildFillPct / level.pageBuild * 100 : 0 },
 				{ "-BldAvgStoredPct", level.pageBuild ? level.buildStoredPct / level.pageBuild * 100 : 0 },
 				{ "", 0 },
 				{ "-ModAvgCount", level.pageModify ? level.modifyItemCount / level.pageModify : 0 },
 				{ "-ModAvgFillPct", level.pageModify ? level.modifyFillPct / level.pageModify * 100 : 0 },
-				{ "-ModAvgStoredPct", level.pageModify ? level.modifyStoredPct / level.pageModify * 100 : 0 }
+				{ "-ModAvgStoredPct", level.pageModify ? level.modifyStoredPct / level.pageModify * 100 : 0 },
+				{ "", 0 },
 			};
 
+			if(e != nullptr) {
+				for (auto& m : metrics) {
+					char c = m.first[0];
+					if(c != 0 && (!skipZeroes || m.second != 0) ) {
+						e->detail(format("L%d%s", i + 1, m.first + (c == '-' ? 1 : 0)), m.second);
+					}
+				}
+			}
+
 			if (s != nullptr) {
 				*s += format("\nLevel %d\n\t", i + 1);
-			}
-			for (auto& m : metrics) {
-				const char* name = m.first;
-				bool rate = elapsed != 0;
-				if (*name == '-') {
-					++name;
-					rate = false;
-				}
-				if (*name == '\0') {
-					if (s != nullptr) {
+				for (auto& m : metrics) {
+					const char* name = m.first;
+					bool rate = elapsed != 0;
+					if (*name == '-') {
+						++name;
+						rate = false;
+					}
+
+					if (*name == '\0') {
 						*s += "\n\t";
-					}
-				} else {
-					if (s != nullptr) {
+					} else if(!skipZeroes || m.second != 0) {
 						*s += format("%-15s %8u %8u/s  ", name, m.second, rate ? int(m.second / elapsed) : 0);
 					}
-					if (e != nullptr) {
-						e->detail(format("L%d%s", i + 1, name), m.second);
-					}
 				}
 			}
 		}
 	}
@@ -1124,22 +1160,32 @@ public:
 	};
 
 	struct RemappedPage {
-		RemappedPage() : version(invalidVersion) {}
-		RemappedPage(Version v, LogicalPageID o, LogicalPageID n) : version(v), originalPageID(o), newPageID(n) {}
+		enum Type { NONE = 'N', REMAP = 'R', FREE = 'F', DETACH = 'D' };
+		RemappedPage(Version v = invalidVersion, LogicalPageID o = invalidLogicalPageID, LogicalPageID n = invalidLogicalPageID) : version(v), originalPageID(o), newPageID(n) {}
 
 		Version version;
 		LogicalPageID originalPageID;
 		LogicalPageID newPageID;
 
-		bool isFree() const {
-			return newPageID == invalidLogicalPageID;
+		static Type getTypeOf(LogicalPageID newPageID) {
+			if(newPageID == invalidLogicalPageID) {
+				return FREE;
+			}
+			if(newPageID == 0) {
+				return DETACH;
+			}
+			return REMAP;
+		}
+
+		Type getType() const {
+			return getTypeOf(newPageID);
 		}
 
 		bool operator<(const RemappedPage& rhs) { return version < rhs.version; }
 
 		std::string toString() const {
-			return format("RemappedPage(%s -> %s @%" PRId64 "}", ::toString(originalPageID).c_str(),
-			              ::toString(newPageID).c_str(), version);
+			return format("RemappedPage(%c: %s -> %s %s)", getType(), ::toString(originalPageID).c_str(),
+			              ::toString(newPageID).c_str(), ::toString(version).c_str());
 		}
 	};
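RemappedPage::getTypeOf() above packs three states into the existing newPageID field by reserving two sentinel values; this works because invalidLogicalPageID is never a real remap target and page 0 holds the pager's own header, so neither can be a legitimate destination. A sketch of the encoding in isolation:

    #include <cassert>
    #include <cstdint>

    using LogicalPageID = uint32_t;
    static constexpr LogicalPageID invalidLogicalPageID = ~0u;

    enum class RemapType { FREE, DETACH, REMAP };

    RemapType typeOf(LogicalPageID newPageID) {
        if (newPageID == invalidLogicalPageID) return RemapType::FREE;   // original freed at this version
        if (newPageID == 0) return RemapType::DETACH;                    // ownership handed to the caller
        return RemapType::REMAP;                                         // normal copy-on-write remap
    }

    int main() {
        assert(typeOf(invalidLogicalPageID) == RemapType::FREE);
        assert(typeOf(0) == RemapType::DETACH);
        assert(typeOf(42) == RemapType::REMAP);
        return 0;
    }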
@@ -1484,6 +1530,35 @@ public:
 		}
 	}
 
+	LogicalPageID detachRemappedPage(LogicalPageID pageID, Version v) override {
+		auto i = remappedPages.find(pageID);
+		if(i == remappedPages.end()) {
+			// Page is not remapped
+			return invalidLogicalPageID;
+		}
+
+		// Get the page that id was most recently remapped to
+		auto iLast = i->second.rbegin();
+		LogicalPageID newID = iLast->second;
+		ASSERT(RemappedPage::getTypeOf(newID) == RemappedPage::REMAP);
+
+		// If the last change remap was also at v then change the remap to a delete, as it's essentially
+		// the same as the original page being deleted at that version and newID being used from then on.
+		if(iLast->first == v) {
+			debug_printf("DWALPager(%s) op=detachDelete originalID=%s newID=%s @%" PRId64 " oldestVersion=%" PRId64 "\n", filename.c_str(),
+			             toString(pageID).c_str(), toString(newID).c_str(), v, pLastCommittedHeader->oldestVersion);
+			iLast->second = invalidLogicalPageID;
+			remapQueue.pushBack(RemappedPage{ v, pageID, invalidLogicalPageID });
+		} else {
+			debug_printf("DWALPager(%s) op=detach originalID=%s newID=%s @%" PRId64 " oldestVersion=%" PRId64 "\n", filename.c_str(),
+			             toString(pageID).c_str(), toString(newID).c_str(), v, pLastCommittedHeader->oldestVersion);
+			// Mark id as converted to its last remapped location as of v
+			i->second[v] = 0;
+			remapQueue.pushBack(RemappedPage{ v, pageID, 0 });
+		}
+		return newID;
+	}
+
 	void freePage(LogicalPageID pageID, Version v) override {
 		// If pageID has been remapped, then it can't be freed until all existing remaps for that page have been undone,
 		// so queue it for later deletion
@@ -1588,13 +1663,13 @@ public:
 			auto j = i->second.upper_bound(v);
 			if (j != i->second.begin()) {
 				--j;
-				debug_printf("DWALPager(%s) read %s @%" PRId64 " -> %s\n", filename.c_str(), toString(pageID).c_str(),
+				debug_printf("DWALPager(%s) op=readAtVersionRemapped %s @%" PRId64 " -> %s\n", filename.c_str(), toString(pageID).c_str(),
 				             v, toString(j->second).c_str());
 				pageID = j->second;
 				ASSERT(pageID != invalidLogicalPageID);
 			}
 		} else {
-			debug_printf("DWALPager(%s) read %s @%" PRId64 " (not remapped)\n", filename.c_str(),
+			debug_printf("DWALPager(%s) op=readAtVersionNotRemapped %s @%" PRId64 " (not remapped)\n", filename.c_str(),
 			             toString(pageID).c_str(), v);
 		}
 
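readPage() resolves a read at version v through the per-page std::map<Version, LogicalPageID>: upper_bound(v) finds the first remap strictly after v, and stepping back one entry yields the mapping in effect at v. The same lookup in isolation:

    #include <cassert>
    #include <cstdint>
    #include <map>

    using Version = int64_t;
    using PageID = uint32_t;

    PageID lookupAtVersion(const std::map<Version, PageID>& remaps, PageID original, Version v) {
        auto j = remaps.upper_bound(v); // first remap strictly after v
        if (j == remaps.begin()) {
            return original;            // no remap was in effect yet at v
        }
        --j;                            // latest remap at or before v
        return j->second;
    }

    int main() {
        std::map<Version, PageID> remaps; // remap history for one original page
        remaps[10] = 101;                 // remapped at version 10
        remaps[20] = 102;                 // remapped again at version 20
        assert(lookupAtVersion(remaps, 7, 5) == 7);    // before any remap
        assert(lookupAtVersion(remaps, 7, 15) == 101); // sees the v10 remap
        assert(lookupAtVersion(remaps, 7, 25) == 102); // sees the v20 remap
        return 0;
    }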
@@ -1623,29 +1698,126 @@ public:
 		return std::min(pLastCommittedHeader->oldestVersion, snapshots.front().version);
 	}
 
-	ACTOR static Future<Void> remapCopyAndFree(DWALPager* self, RemappedPage p, VersionToPageMapT *m, VersionToPageMapT::iterator i) {
-		debug_printf("DWALPager(%s) remapCleanup copyAndFree %s\n", self->filename.c_str(), p.toString().c_str());
+	ACTOR static Future<Void> removeRemapEntry(DWALPager* self, RemappedPage p, Version oldestRetainedVersion) {
+		// Get iterator to the versioned page map entry for the original page
+		state PageToVersionedMapT::iterator iPageMapPair = self->remappedPages.find(p.originalPageID);
+		// The iterator must be valid and not empty and its first page map entry must match p's version
+		ASSERT(iPageMapPair != self->remappedPages.end());
+		ASSERT(!iPageMapPair->second.empty());
+		state VersionToPageMapT::iterator iVersionPagePair = iPageMapPair->second.find(p.version);
+		ASSERT(iVersionPagePair != iPageMapPair->second.end());
 
-		// Read the data from the page that the original was mapped to
-		Reference<IPage> data = wait(self->readPage(p.newPageID, false));
+		RemappedPage::Type firstType = p.getType();
+		state RemappedPage::Type secondType;
+		bool secondAfterOldestRetainedVersion = false;
+		state bool deleteAtSameVersion = false;
+		if(p.newPageID == iVersionPagePair->second) {
+			auto nextEntry = iVersionPagePair;
+			++nextEntry;
+			if(nextEntry == iPageMapPair->second.end()) {
+				secondType = RemappedPage::NONE;
+			} else {
+				secondType = RemappedPage::getTypeOf(nextEntry->second);
+				secondAfterOldestRetainedVersion = nextEntry->first >= oldestRetainedVersion;
+			}
+		} else {
+			ASSERT(iVersionPagePair->second == invalidLogicalPageID);
+			secondType = RemappedPage::FREE;
+			deleteAtSameVersion = true;
+		}
+		ASSERT(firstType == RemappedPage::REMAP || secondType == RemappedPage::NONE);
 
-		// Write the data to the original page so it can be read using its original pageID
-		self->updatePage(p.originalPageID, data);
-		++g_redwoodMetrics.pagerRemapCopy;
+		// Scenarios and actions to take:
+		//
+		// The first letter (firstType) is the type of the entry just popped from the remap queue.
+		// The second letter (secondType) is the type of the next item in the queue for the same
+		// original page ID, if present.  If not present, secondType will be NONE.
+		//
+		// Since the next item can be arbitrarily ahead in the queue, secondType is determined by
+		// looking at the remappedPages structure.
+		//
+		// R == Remap    F == Free    D == Detach    | == oldestRetainedVersion
+		//
+		//   R R |  free new ID
+		//   R F |  free new ID if R and D are at different versions
+		//   R D |  do nothing
+		//   R | R  copy new to original ID, free new ID
+		//   R | F  copy new to original ID, free new ID
+		//   R | D  copy new to original ID
+		//   R |    copy new to original ID, free new ID
+		//   F |    free original ID
+		//   D |    free original ID
+		//
+		// Note the following special case:  Page is detached while it is being read in remapCopyAndFree()
+		//   Initial state:  R |
+		//   Start remapCopyAndFree(), intending to copy new ID to originalID and free newID
+		//   New state:  R | D
+		//   Read of newID completes.
+		//   Copy new contents over original, do NOT free new ID
+		//   Later popped state:  D |
+		//   free original ID
+		//
+		state bool freeNewID = (firstType == RemappedPage::REMAP && secondType != RemappedPage::DETACH && !deleteAtSameVersion);
+		state bool copyNewToOriginal = (firstType == RemappedPage::REMAP && (secondAfterOldestRetainedVersion || secondType == RemappedPage::NONE));
+		state bool freeOriginalID = (firstType == RemappedPage::FREE || firstType == RemappedPage::DETACH);
 
-		// Now that the page data has been copied to the original page, the versioned page map entry is no longer
-		// needed and the new page ID can be freed as of the next commit.
-		m->erase(i);
-		self->freeUnmappedPage(p.newPageID, 0);
-		++g_redwoodMetrics.pagerRemapFree;
+		debug_printf("DWALPager(%s) remapCleanup %s secondType=%c mapEntry=%s oldestRetainedVersion=%" PRId64 " \n",
+		             self->filename.c_str(), p.toString().c_str(), secondType, ::toString(*iVersionPagePair).c_str(), oldestRetainedVersion);
+
+		if(copyNewToOriginal) {
+			debug_printf("DWALPager(%s) remapCleanup copy %s\n", self->filename.c_str(), p.toString().c_str());
+
+			// Read the data from the page that the original was mapped to
+			Reference<IPage> data = wait(self->readPage(p.newPageID, false, true));
+
+			// Write the data to the original page so it can be read using its original pageID
+			self->updatePage(p.originalPageID, data);
+			++g_redwoodMetrics.pagerRemapCopy;
+		} else if (firstType == RemappedPage::REMAP) {
+			++g_redwoodMetrics.pagerRemapSkip;
+		}
+
+		// Now that the page contents have been copied to the original page, if the corresponding map entry
+		// represented the remap and there wasn't a delete later in the queue at p for the same version then
+		// erase the entry.
+		if(!deleteAtSameVersion) {
+			debug_printf("DWALPager(%s) remapCleanup deleting map entry %s\n", self->filename.c_str(), p.toString().c_str());
+			// Erase the entry and set iVersionPagePair to the next entry or end
+			iVersionPagePair = iPageMapPair->second.erase(iVersionPagePair);
+
+			// If the map is now empty, delete it
+			if(iPageMapPair->second.empty()) {
+				debug_printf("DWALPager(%s) remapCleanup deleting empty map %s\n", self->filename.c_str(), p.toString().c_str());
+				self->remappedPages.erase(iPageMapPair);
+			} else if(freeNewID && secondType == RemappedPage::NONE && iVersionPagePair != iPageMapPair->second.end() && RemappedPage::getTypeOf(iVersionPagePair->second) == RemappedPage::DETACH) {
+				// If we intend to free the new ID and there was no map entry, one could have been added during the wait above.
+				// If so, and if it was a detach operation, then we can't free the new page ID as its lifetime will be managed
+				// by the client starting at some later version.
+				freeNewID = false;
+			}
+		}
+
+		if(freeNewID) {
+			debug_printf("DWALPager(%s) remapCleanup freeNew %s\n", self->filename.c_str(), p.toString().c_str());
+			self->freeUnmappedPage(p.newPageID, 0);
+			++g_redwoodMetrics.pagerRemapFree;
+		}
+
+		if(freeOriginalID) {
+			debug_printf("DWALPager(%s) remapCleanup freeOriginal %s\n", self->filename.c_str(), p.toString().c_str());
+			self->freeUnmappedPage(p.originalPageID, 0);
+			++g_redwoodMetrics.pagerRemapFree;
+		}
 
 		return Void();
 	}
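The R/F/D decision table in removeRemapEntry() collapses into three booleans: freeNewID, copyNewToOriginal, and freeOriginalID. The sketch below encodes just those predicates so individual rows of the table can be checked in isolation:

    #include <cassert>

    enum class T { NONE, REMAP, FREE, DETACH };

    struct Actions { bool copyNewToOriginal, freeNewID, freeOriginalID; };

    Actions decide(T first, T second, bool secondAfterRetained, bool deleteAtSameVersion) {
        Actions a;
        a.freeNewID = (first == T::REMAP && second != T::DETACH && !deleteAtSameVersion);
        a.copyNewToOriginal = (first == T::REMAP && (secondAfterRetained || second == T::NONE));
        a.freeOriginalID = (first == T::FREE || first == T::DETACH);
        return a;
    }

    int main() {
        // "R R |": the next remap is before the retained version -> just free the new ID.
        assert(!decide(T::REMAP, T::REMAP, false, false).copyNewToOriginal);
        assert(decide(T::REMAP, T::REMAP, false, false).freeNewID);
        // "R | D": detach after the window -> copy, but the detached ID now belongs to the client.
        assert(decide(T::REMAP, T::DETACH, true, false).copyNewToOriginal);
        assert(!decide(T::REMAP, T::DETACH, true, false).freeNewID);
        // "D |": a popped detach just frees the original ID.
        assert(decide(T::DETACH, T::NONE, false, false).freeOriginalID);
        return 0;
    }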
 	ACTOR static Future<Void> remapCleanup(DWALPager* self) {
-		state ActorCollection copies(true);
+		state ActorCollection tasks(true);
 		state Promise<Void> signal;
-		copies.add(signal.getFuture());
+		tasks.add(signal.getFuture());
 
 		self->remapCleanupStop = false;
 
@@ -1654,8 +1826,7 @@ public:
 		state Version oldestRetainedVersion = self->effectiveOldestVersion();
 
 		// Cutoff is the version we can pop to
-		state RemappedPage cutoff;
-		cutoff.version = oldestRetainedVersion - self->remapCleanupWindow;
+		state RemappedPage cutoff(oldestRetainedVersion - self->remapCleanupWindow);
 
 		// Minimum version we must pop to before obeying stop command.
 		state Version minStopVersion = cutoff.version - (self->remapCleanupWindow * SERVER_KNOBS->REDWOOD_REMAP_CLEANUP_LAG);
 
@@ -1663,46 +1834,15 @@ public:
 		loop {
 			state Optional<RemappedPage> p = wait(self->remapQueue.pop(cutoff));
 			debug_printf("DWALPager(%s) remapCleanup popped %s\n", self->filename.c_str(), ::toString(p).c_str());
+
+			// Stop if we have reached the cutoff version, which is the start of the cleanup coalescing window
 			if (!p.present()) {
 				break;
 			}
 
-			// Get iterator to the versioned page map entry for the original page
-			auto iPageMapPair = self->remappedPages.find(p.get().originalPageID);
-			// The iterator must be valid and not empty and its first page map entry must match p's version
-			ASSERT(iPageMapPair != self->remappedPages.end());
-			ASSERT(!iPageMapPair->second.empty());
-			auto iVersionPagePair = iPageMapPair->second.begin();
-			ASSERT(iVersionPagePair->first == p.get().version);
-
-			// If this is a free page entry then free the original page ID
-			if(p.get().isFree()) {
-				debug_printf("DWALPager(%s) remapCleanup free %s\n", self->filename.c_str(),
-				             p.get().toString().c_str());
-				self->freeUnmappedPage(p.get().originalPageID, 0);
-				++g_redwoodMetrics.pagerRemapFree;
-
-				// There can't be any more entries in the page map after this one so verify that
-				// the map size is 1 and erase the map for p's original page ID.
-				ASSERT(iPageMapPair->second.size() == 1);
-				self->remappedPages.erase(iPageMapPair);
-			}
-			else {
-				// If there is no next page map entry or there is but it is after the oldest retained version
-				// then p must be copied to unmap it.
-				auto iNextVersionPagePair = iVersionPagePair;
-				++iNextVersionPagePair;
-				if(iNextVersionPagePair == iPageMapPair->second.end() || iNextVersionPagePair->first > oldestRetainedVersion) {
-					// Copy the remapped page to the original so it can be freed.
-					copies.add(remapCopyAndFree(self, p.get(), &iPageMapPair->second, iVersionPagePair));
-				}
-				else {
-					debug_printf("DWALPager(%s) remapCleanup skipAndFree %s\n", self->filename.c_str(), p.get().toString().c_str());
-					self->freeUnmappedPage(p.get().newPageID, 0);
-					++g_redwoodMetrics.pagerRemapFree;
-					++g_redwoodMetrics.pagerRemapSkip;
-					iPageMapPair->second.erase(iVersionPagePair);
-				}
+			Future<Void> task = removeRemapEntry(self, p.get(), oldestRetainedVersion);
+			if(!task.isReady()) {
+				tasks.add(task);
 			}
 
 			// If the stop flag is set and we've reached the minimum stop version according to the allowed lag then stop.
@@ -1713,7 +1853,7 @@ public:
 		debug_printf("DWALPager(%s) remapCleanup stopped (stop=%d)\n", self->filename.c_str(), self->remapCleanupStop);
 		signal.send(Void());
-		wait(copies.getResult());
+		wait(tasks.getResult());
 
 		return Void();
 	}
@@ -1889,8 +2029,7 @@ public:
 	Future<int64_t> getUserPageCount() override {
 		return map(getUserPageCount_cleanup(this), [=](Void) {
 			int64_t userPages = pHeader->pageCount - 2 - freeList.numPages - freeList.numEntries -
-			                    delayedFreeList.numPages - delayedFreeList.numEntries - remapQueue.numPages -
-			                    remapQueue.numEntries;
+			                    delayedFreeList.numPages - delayedFreeList.numEntries - remapQueue.numPages;
 
 			debug_printf("DWALPager(%s) userPages=%" PRId64 " totalPageCount=%" PRId64 " freeQueuePages=%" PRId64
 			             " freeQueueCount=%" PRId64 " delayedFreeQueuePages=%" PRId64 " delayedFreeQueueCount=%" PRId64
@@ -2871,6 +3010,38 @@ public:
 
 	typedef FIFOQueue<LazyClearQueueEntry> LazyClearQueueT;
 
+	struct ParentInfo {
+		ParentInfo() {
+			count = 0;
+			bits = 0;
+		}
+		void clear() {
+			count = 0;
+			bits = 0;
+		}
+
+		static uint32_t mask(LogicalPageID id) {
+			return 1 << (id & 31);
+		}
+
+		void pageUpdated(LogicalPageID child) {
+			auto m = mask(child);
+			if((bits & m) == 0) {
+				bits |= m;
+				++count;
+			}
+		}
+
+		bool maybeUpdated(LogicalPageID child) {
+			return (mask(child) & bits) != 0;
+		}
+
+		uint32_t bits;
+		int count;
+	};
+
+	typedef std::unordered_map<LogicalPageID, ParentInfo> ParentInfoMapT;
+
 #pragma pack(push, 1)
 	struct MetaKey {
 		static constexpr int FORMAT_VERSION = 8;
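ParentInfo above is a 32-bit Bloom-style filter over child page IDs: pageUpdated() sets bit (id mod 32), so maybeUpdated() can return false positives but never false negatives. That is safe here because a false positive merely triggers a detachRemappedPage() attempt that returns invalidLogicalPageID and is ignored. The filter in isolation:

    #include <cassert>
    #include <cstdint>

    struct ParentInfo {
        uint32_t bits = 0;
        int count = 0;

        static uint32_t mask(uint32_t id) { return 1u << (id & 31); }

        void pageUpdated(uint32_t child) {
            auto m = mask(child);
            if ((bits & m) == 0) {
                bits |= m;
                ++count; // counts distinct bits set, not distinct children
            }
        }

        bool maybeUpdated(uint32_t child) const { return (mask(child) & bits) != 0; }
    };

    int main() {
        ParentInfo p;
        p.pageUpdated(5);
        assert(p.maybeUpdated(5));   // definitely recorded
        assert(p.maybeUpdated(37));  // false positive: 37 % 32 == 5
        assert(!p.maybeUpdated(6));  // never a false negative
        return 0;
    }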
@@ -2923,8 +3094,8 @@ public:
 	// durable once the following call to commit() returns
 	void set(KeyValueRef keyValue) override {
 		++g_redwoodMetrics.opSet;
-		++g_redwoodMetrics.opSetKeyBytes += keyValue.key.size();
-		++g_redwoodMetrics.opSetValueBytes += keyValue.value.size();
+		g_redwoodMetrics.opSetKeyBytes += keyValue.key.size();
+		g_redwoodMetrics.opSetValueBytes += keyValue.value.size();
 		m_pBuffer->insert(keyValue.key).mutation().setBoundaryValue(m_pBuffer->copyToArena(keyValue.value));
 	}
 
@@ -3022,7 +3193,7 @@ public:
 				// If this page is height 2, then the children are leaves so free them directly
 				if (btPage.height == 2) {
 					debug_printf("LazyClear: freeing child %s\n", toString(btChildPageID).c_str());
-					self->freeBtreePage(btChildPageID, v);
+					self->freeBTreePage(btChildPageID, v);
 					freedPages += btChildPageID.size();
 					metrics.lazyClearFree += 1;
 					metrics.lazyClearFreeExt += (btChildPageID.size() - 1);
@@ -3041,7 +3212,7 @@ public:
 
 			// Free the page, now that its children have either been freed or queued
 			debug_printf("LazyClear: freeing queue entry %s\n", toString(entry.pageID).c_str());
-			self->freeBtreePage(entry.pageID, v);
+			self->freeBTreePage(entry.pageID, v);
 			freedPages += entry.pageID.size();
 			metrics.lazyClearFree += 1;
 			metrics.lazyClearFreeExt += entry.pageID.size() - 1;
@@ -3146,7 +3317,7 @@ public:
 		return commit_impl(this);
 	}
 
-	ACTOR static Future<Void> destroyAndCheckSanity_impl(VersionedBTree* self) {
+	ACTOR static Future<Void> clearAllAndCheckSanity_impl(VersionedBTree* self) {
 		ASSERT(g_network->isSimulated());
 
 		debug_printf("Clearing tree.\n");
@@ -3191,7 +3362,7 @@ public:
 		return Void();
 	}
 
-	Future<Void> destroyAndCheckSanity() { return destroyAndCheckSanity_impl(this); }
+	Future<Void> clearAllAndCheckSanity() { return clearAllAndCheckSanity_impl(this); }
 
 private:
 	// Represents a change to a single key - set, clear, or atomic op
@@ -3412,6 +3583,8 @@ private:
 	Future<Void> m_init;
 	std::string m_name;
 	int m_blockSize;
+	std::unordered_map<LogicalPageID, ParentInfo> parents;
+	ParentInfoMapT childUpdateTracker;
 
 	// MetaKey changes size so allocate space for it to expand into
 	union {
@@ -3603,7 +3776,7 @@ private:
 			// must be rewritten anyway to count for the change in child count or child links.
 			// Free the old IDs, but only once (before the first output record is added).
 			if (records.empty()) {
-				self->freeBtreePage(previousID, v);
+				self->freeBTreePage(previousID, v);
 			}
 			for (p = 0; p < pages.size(); ++p) {
 				LogicalPageID id = wait(self->m_pager->newPageID());
@@ -3771,7 +3944,7 @@ private:
 		}
 	}
 
-	void freeBtreePage(BTreePageIDRef btPageID, Version v) {
+	void freeBTreePage(BTreePageIDRef btPageID, Version v) {
 		// Free individual pages at v
 		for (LogicalPageID id : btPageID) {
 			m_pager->freePage(id, v);
@@ -3780,7 +3953,7 @@ private:
 
 	// Write new version of pageID at version v using page as its data.
 	// Attempts to reuse original id(s) in btPageID, returns BTreePageID.
-	ACTOR static Future<BTreePageIDRef> updateBtreePage(VersionedBTree* self, BTreePageIDRef oldID, Arena* arena,
+	ACTOR static Future<BTreePageIDRef> updateBTreePage(VersionedBTree* self, BTreePageIDRef oldID, Arena* arena,
 	                                                    Reference<IPage> page, Version writeVersion) {
 		state BTreePageIDRef newID;
 		newID.resize(*arena, oldID.size());
@@ -3878,19 +4051,23 @@ private:
 		// If the last record in the range has a null link then this will be null.
 		const RedwoodRecordRef* expectedUpperBound;
 
+		bool inPlaceUpdate;
+
 		// CommitSubtree will call one of the following three functions based on its exit path
 
 		// Subtree was cleared.
 		void cleared() {
+			inPlaceUpdate = false;
 			childrenChanged = true;
 			expectedUpperBound = nullptr;
 		}
 
 		// Page was updated in-place through edits and written to maybeNewID
 		void updatedInPlace(BTreePageIDRef maybeNewID, BTreePage* btPage, int capacity) {
+			inPlaceUpdate = true;
 			auto& metrics = g_redwoodMetrics.level(btPage->height);
 			metrics.pageModify += 1;
-			metrics.pageModify += (maybeNewID.size() - 1);
+			metrics.pageModifyExt += (maybeNewID.size() - 1);
 			metrics.modifyFillPct += (double)btPage->size() / capacity;
 			metrics.modifyStoredPct += (double)btPage->kvBytes / capacity;
 			metrics.modifyItemCount += btPage->tree().numItems;
@@ -3912,6 +4089,7 @@ private:
 
 		// writePages() was used to build 1 or more replacement pages.
 		void rebuilt(Standalone<VectorRef<RedwoodRecordRef>> newRecords) {
+			inPlaceUpdate = false;
 			newLinks = newRecords;
 			childrenChanged = true;
 
@@ -3952,14 +4130,15 @@ private:
 
 	struct InternalPageModifier {
 		InternalPageModifier() {}
-		InternalPageModifier(BTreePage* p, BTreePage::BinaryTree::Mirror* m, bool updating)
-		  : btPage(p), m(m), updating(updating), changesMade(false) {}
+		InternalPageModifier(BTreePage* p, BTreePage::BinaryTree::Mirror* m, bool updating, ParentInfo *parentInfo)
+		  : btPage(p), m(m), updating(updating), changesMade(false), parentInfo(parentInfo) {}
 
 		bool updating;
 		BTreePage* btPage;
 		BTreePage::BinaryTree::Mirror* m;
 		Standalone<VectorRef<RedwoodRecordRef>> rebuild;
 		bool changesMade;
+		ParentInfo *parentInfo;
 
 		bool empty() const {
 			if (updating) {
@@ -4055,6 +4234,13 @@ private:
 				// endpoint.
 				changesMade = true;
 			} else {
+
+				if(u.inPlaceUpdate) {
+					for(auto id : u.decodeLowerBound->getChildPage()) {
+						parentInfo->pageUpdated(id);
+					}
+				}
+
 				keep(u.cBegin, u.cEnd);
 			}
 
@@ -4226,7 +4412,7 @@ private:
 				debug_printf("%s Inserted %s [mutation, boundary start]\n", context.c_str(),
 				             rec.toString().c_str());
 			} else {
-				debug_printf("%s Inserted failed for %s [mutation, boundary start]\n", context.c_str(),
+				debug_printf("%s Insert failed for %s [mutation, boundary start]\n", context.c_str(),
 				             rec.toString().c_str());
 				switchToLinearMerge();
 			}
@@ -4339,12 +4525,12 @@ private:
 			// If the tree is now empty, delete the page
 			if (deltaTree.numItems == 0) {
 				update->cleared();
-				self->freeBtreePage(rootID, writeVersion);
+				self->freeBTreePage(rootID, writeVersion);
 				debug_printf("%s Page updates cleared all entries, returning %s\n", context.c_str(),
 				             toString(*update).c_str());
 			} else {
 				// Otherwise update it.
-				BTreePageIDRef newID = wait(self->updateBtreePage(self, rootID, &update->newLinks.arena(),
+				BTreePageIDRef newID = wait(self->updateBTreePage(self, rootID, &update->newLinks.arena(),
 				                                                  page.castTo<IPage>(), writeVersion));
 
 				update->updatedInPlace(newID, btPage, newID.size() * self->m_blockSize);
@@ -4357,7 +4543,7 @@ private:
 			// If everything in the page was deleted then this page should be deleted as of the new version
 			if (merged.empty()) {
 				update->cleared();
-				self->freeBtreePage(rootID, writeVersion);
+				self->freeBTreePage(rootID, writeVersion);
 
 				debug_printf("%s All leaf page contents were cleared, returning %s\n", context.c_str(),
 				             toString(*update).c_str());
@@ -4511,7 +4697,7 @@ private:
 					if (btPage->height == 2) {
 						debug_printf("%s: freeing child page in cleared subtree range: %s\n", context.c_str(),
 						             ::toString(rec.getChildPage()).c_str());
-						self->freeBtreePage(rec.getChildPage(), writeVersion);
+						self->freeBTreePage(rec.getChildPage(), writeVersion);
 					} else {
 						debug_printf("%s: queuing subtree deletion cleared subtree range: %s\n", context.c_str(),
 						             ::toString(rec.getChildPage()).c_str());
@@ -4547,7 +4733,10 @@ private:
 			wait(waitForAll(recursions));
 			debug_printf("%s Recursions done, processing slice updates.\n", context.c_str());
 
-			state InternalPageModifier m(btPage, cursor.mirror, tryToUpdate);
+			// Note: parentInfo could be invalid after a wait and must be re-initialized.
+			// All uses below occur before waits so no reinitialization is done.
+			state ParentInfo *parentInfo = &self->childUpdateTracker[rootID.front()];
+			state InternalPageModifier m(btPage, cursor.mirror, tryToUpdate, parentInfo);
 
 			// Apply the possible changes for each subtree range recursed to, except the last one.
 			// For each range, the expected next record, if any, is checked against the first boundary
@@ -4565,25 +4754,103 @@ private:
 			             context.c_str(), m.changesMade, update->toString().c_str());
 			m.applyUpdate(*slices.back(), m.changesMade ? update->subtreeUpperBound : update->decodeUpperBound);
 
+			state bool detachChildren = (parentInfo->count > 2);
+			state bool forceUpdate = false;
+
+			if(!m.changesMade && detachChildren) {
+				debug_printf("%s Internal page forced rewrite because at least %d children have been updated in-place.\n", context.c_str(), parentInfo->count);
+				forceUpdate = true;
+				if(!m.updating) {
+					page = self->cloneForUpdate(page);
+					cursor = getCursor(page);
+					btPage = (BTreePage*)page->begin();
+					m.btPage = btPage;
+					m.m = cursor.mirror;
+					m.updating = true;
+				}
+				++g_redwoodMetrics.level(btPage->height).forceUpdate;
+			}
+
 			// If page contents have changed
-			if (m.changesMade) {
-				if ((m.empty())) {
+			if (m.changesMade || forceUpdate) {
+				if (m.empty()) {
 					update->cleared();
 					debug_printf("%s All internal page children were deleted so deleting this page too, returning %s\n",
 					             context.c_str(), toString(*update).c_str());
-					self->freeBtreePage(rootID, writeVersion);
+					self->freeBTreePage(rootID, writeVersion);
+					self->childUpdateTracker.erase(rootID.front());
 				} else {
 					if (m.updating) {
-						// Page was updated in place
-						BTreePageIDRef newID = wait(self->updateBtreePage(self, rootID, &update->newLinks.arena(),
+						// Page was updated in place (or being forced to be updated in place to update child page ids)
+						debug_printf("%s Internal page modified in-place tryUpdate=%d forceUpdate=%d detachChildren=%d\n", context.c_str(), tryToUpdate, forceUpdate, detachChildren);
+
+						if(detachChildren) {
+							int detached = 0;
+							cursor.moveFirst();
+							auto &stats = g_redwoodMetrics.level(btPage->height);
+							while(cursor.valid()) {
+								if(cursor.get().value.present()) {
+									for(auto &p : cursor.get().getChildPage()) {
+										if(parentInfo->maybeUpdated(p)) {
+											LogicalPageID newID = self->m_pager->detachRemappedPage(p, writeVersion);
+											if(newID != invalidLogicalPageID) {
+												debug_printf("%s Detach updated %u -> %u\n", context.c_str(), p, newID);
+												p = newID;
+												++stats.detachChild;
+												++detached;
+											}
+										}
+									}
+								}
+								cursor.moveNext();
+							}
+							parentInfo->clear();
+							if(forceUpdate && detached == 0) {
+								debug_printf("%s No children detached during forced update, returning %s\n", context.c_str(), toString(*update).c_str());
+								return Void();
+							}
+						}
+
+						BTreePageIDRef newID = wait(self->updateBTreePage(self, rootID, &update->newLinks.arena(),
 						                                                  page.castTo<IPage>(), writeVersion));
+						debug_printf(
+						    "%s commitSubtree(): Internal page updated in-place at version %s, new contents: %s\n", context.c_str(), toString(writeVersion).c_str(),
+						    btPage->toString(false, newID, snapshot->getVersion(), update->decodeLowerBound, update->decodeUpperBound)
+						        .c_str());
 
 						update->updatedInPlace(newID, btPage, newID.size() * self->m_blockSize);
 						debug_printf("%s Internal page updated in-place, returning %s\n", context.c_str(),
 						             toString(*update).c_str());
 					} else {
 						// Page was rebuilt, possibly split.
-						debug_printf("%s Internal page modified, creating replacements.\n", context.c_str());
+						debug_printf("%s Internal page could not be modified, rebuilding replacement(s).\n", context.c_str());
+
+						if(detachChildren) {
+							auto &stats = g_redwoodMetrics.level(btPage->height);
+							for(auto &rec : m.rebuild) {
+								if(rec.value.present()) {
+									BTreePageIDRef oldPages = rec.getChildPage();
+									BTreePageIDRef newPages;
+									for(int i = 0; i < oldPages.size(); ++i) {
+										LogicalPageID p = oldPages[i];
+										if(parentInfo->maybeUpdated(p)) {
+											LogicalPageID newID = self->m_pager->detachRemappedPage(p, writeVersion);
+											if(newID != invalidLogicalPageID) {
+												// Rebuild record values reference original page memory so make a copy
+												if(newPages.empty()) {
+													newPages = BTreePageIDRef(m.rebuild.arena(), oldPages);
+													rec.setChildPage(newPages);
+												}
+												debug_printf("%s Detach updated %u -> %u\n", context.c_str(), p, newID);
+												newPages[i] = newID;
+												++stats.detachChild;
+											}
+										}
+									}
+								}
+							}
+							parentInfo->clear();
+						}
 
 						Standalone<VectorRef<RedwoodRecordRef>> newChildEntries =
 						    wait(writePages(self, update->subtreeLowerBound, update->subtreeUpperBound, m.rebuild,
@@ -4985,7 +5252,7 @@ public:
 		bool isValid() const { return valid; }
 
 		std::string toString() const {
-			std::string r;
+			std::string r = format("{ptr=%p %s ", this, ::toString(pager->getVersion()).c_str());
 			for (int i = 0; i < path.size(); ++i) {
 				r += format("[%d/%d: %s] ", i + 1, path.size(),
 				            path[i].cursor.valid() ? path[i].cursor.get().toString(path[i].btPage->isLeaf()).c_str()
@@ -4994,6 +5261,7 @@ public:
 			if (!valid) {
 				r += " (invalid) ";
 			}
+			r += "}";
 			return r;
 		}
 
@@ -5014,6 +5282,8 @@ public:
 		                     const RedwoodRecordRef& upperBound) {
 			Reference<IPage>& page = pages[id.front()];
 			if (page.isValid()) {
+				// The pager won't see this access so count it as a cache hit
+				++g_redwoodMetrics.pagerCacheHit;
 				path.push_back(arena, { (BTreePage*)page->begin(), getCursor(page) });
 				return Void();
 			}
@@ -6960,24 +7230,23 @@ TEST_CASE("!/redwood/correctness/btree") {
 	state int pageSize =
 	    shortTest ? 200 : (deterministicRandom()->coinflip() ? 4096 : deterministicRandom()->randomInt(200, 400));
 
+	state int64_t targetPageOps = shortTest ? 50000 : 1000000;
 	state bool pagerMemoryOnly = shortTest && (deterministicRandom()->random01() < .01);
 	state int maxKeySize = deterministicRandom()->randomInt(1, pageSize * 2);
 	state int maxValueSize = randomSize(pageSize * 25);
 	state int maxCommitSize = shortTest ? 1000 : randomSize(std::min((maxKeySize + maxValueSize) * 20000, 10e6));
-	state int mutationBytesTarget =
-	    shortTest ? 100000 : randomSize(std::min(maxCommitSize * 100, pageSize * 100000));
 	state double clearProbability = deterministicRandom()->random01() * .1;
 	state double clearSingleKeyProbability = deterministicRandom()->random01();
 	state double clearPostSetProbability = deterministicRandom()->random01() * .1;
 	state double coldStartProbability = pagerMemoryOnly ? 0 : (deterministicRandom()->random01() * 0.3);
 	state double advanceOldVersionProbability = deterministicRandom()->random01();
-	state double maxDuration = 60;
 	state int64_t cacheSizeBytes =
 	    pagerMemoryOnly ? 2e9 : (BUGGIFY ? deterministicRandom()->randomInt(1, 10 * pageSize) : 0);
 	state Version versionIncrement = deterministicRandom()->randomInt64(1, 1e8);
 	state Version remapCleanupWindow = deterministicRandom()->randomInt64(0, versionIncrement * 50);
 
 	printf("\n");
+	printf("targetPageOps: %" PRId64 "\n", targetPageOps);
 	printf("pagerMemoryOnly: %d\n", pagerMemoryOnly);
 	printf("serialTest: %d\n", serialTest);
 	printf("shortTest: %d\n", shortTest);
@@ -6985,7 +7254,6 @@ TEST_CASE("!/redwood/correctness/btree") {
 	printf("maxKeySize: %d\n", maxKeySize);
 	printf("maxValueSize: %d\n", maxValueSize);
 	printf("maxCommitSize: %d\n", maxCommitSize);
-	printf("mutationBytesTarget: %d\n", mutationBytesTarget);
 	printf("clearProbability: %f\n", clearProbability);
 	printf("clearSingleKeyProbability: %f\n", clearSingleKeyProbability);
 	printf("clearPostSetProbability: %f\n", clearPostSetProbability);
@@ -7000,8 +7268,6 @@ TEST_CASE("!/redwood/correctness/btree") {
 	deleteFile(pagerFile);
 
 	printf("Initializing...\n");
-	state double startTime = now();
-
 	pager = new DWALPager(pageSize, pagerFile, cacheSizeBytes, remapCleanupWindow, pagerMemoryOnly);
 	state VersionedBTree* btree = new VersionedBTree(pager, pagerFile);
 	wait(btree->init());
@@ -7028,14 +7294,12 @@ TEST_CASE("!/redwood/correctness/btree") {
 	state PromiseStream<Version> committedVersions;
 	state Future<Void> verifyTask = verify(btree, committedVersions.getFuture(), &written, &errorCount, serialTest);
 	state Future<Void> randomTask = serialTest ? Void() : (randomReader(btree) || btree->getError());
+	committedVersions.send(lastVer);
 
 	state Future<Void> commit = Void();
+	state int64_t totalPageOps = 0;
 
-	while (mutationBytes.get() < mutationBytesTarget && (now() - startTime) < maxDuration) {
-		if (now() - startTime > 600) {
-			mutationBytesTarget = mutationBytes.get();
-		}
-
+	while (totalPageOps < targetPageOps) {
 		// Sometimes increment the version
 		if (deterministicRandom()->random01() < 0.10) {
 			++version;
@@ -7131,14 +7395,12 @@ TEST_CASE("!/redwood/correctness/btree") {
 		}
 
 		// Commit at end or after this commit's mutation bytes are reached
-		if (mutationBytes.get() >= mutationBytesTarget || mutationBytesThisCommit >= mutationBytesTargetThisCommit) {
+		if (totalPageOps >= targetPageOps || mutationBytesThisCommit >= mutationBytesTargetThisCommit) {
 			// Wait for previous commit to finish
 			wait(commit);
-			printf("Committed.  Next commit %d bytes, %" PRId64
-			       "/%d (%.2f%%)  Stats: Insert %.2f MB/s  ClearedKeys %.2f MB/s  Total %.2f\n",
-			       mutationBytesThisCommit, mutationBytes.get(), mutationBytesTarget,
-			       (double)mutationBytes.get() / mutationBytesTarget * 100,
-			       (keyBytesInserted.rate() + valueBytesInserted.rate()) / 1e6, keyBytesCleared.rate() / 1e6,
+			printf("Committed.  Next commit %d bytes, %" PRId64 " bytes.", mutationBytesThisCommit, mutationBytes.get());
+			printf("  Stats: Insert %.2f MB/s  ClearedKeys %.2f MB/s  Total %.2f\n",
+			       (keyBytesInserted.rate() + valueBytesInserted.rate()) / 1e6, keyBytesCleared.rate() / 1e6,
 			       mutationBytes.rate() / 1e6);
 
 			Version v = version; // Avoid capture of version as a member of *this
@@ -7151,8 +7413,12 @@ TEST_CASE("!/redwood/correctness/btree") {
 				                            btree->getOldestVersion() + 1));
 			}
 
-			commit = map(btree->commit(), [=](Void) {
+			commit = map(btree->commit(), [=,&ops=totalPageOps](Void) {
+				// Update pager ops before clearing metrics
+				ops += g_redwoodMetrics.pageOps();
+				printf("PageOps %" PRId64 "/%" PRId64 " (%.2f%%)\n", ops, targetPageOps, ops * 100.0 / targetPageOps);
 				printf("Committed:\n%s\n", g_redwoodMetrics.toString(true).c_str());
+
 				// Notify the background verifier that version is committed and therefore readable
 				committedVersions.send(v);
 				return Void();
@@ -7202,6 +7468,7 @@ TEST_CASE("!/redwood/correctness/btree") {
 			committedVersions = PromiseStream<Version>();
 			verifyTask = verify(btree, committedVersions.getFuture(), &written, &errorCount, serialTest);
 			randomTask = randomReader(btree) || btree->getError();
+			committedVersions.send(v);
 		}
 
 		version += versionIncrement;
@@ -7209,7 +7476,7 @@ TEST_CASE("!/redwood/correctness/btree") {
 	}
 
 	// Check for errors
-	if (errorCount != 0) throw internal_error();
+	ASSERT(errorCount == 0);
 
 	debug_printf("Waiting for outstanding commit\n");
@@ -7220,11 +7487,18 @@ TEST_CASE("!/redwood/correctness/btree") {
 	wait(verifyTask);
 
 	// Check for errors
-	if (errorCount != 0) throw internal_error();
+	ASSERT(errorCount == 0);
 
-	wait(btree->destroyAndCheckSanity());
+	// Reopen pager and btree with a remap cleanup window of 0 to reclaim all old pages
+	state Future<Void> closedFuture = btree->onClosed();
+	btree->close();
+	wait(closedFuture);
+	btree = new VersionedBTree(new DWALPager(pageSize, pagerFile, cacheSizeBytes, 0), pagerFile);
+	wait(btree->init());
 
-	Future<Void> closedFuture = btree->onClosed();
+	wait(btree->clearAllAndCheckSanity());
+
+	closedFuture = btree->onClosed();
 	btree->close();
 	debug_printf("Closing.\n");
 	wait(closedFuture);
@@ -7330,7 +7604,7 @@ TEST_CASE("!/redwood/performance/set") {
 	state int minValueSize = 100;
 	state int maxValueSize = 500;
 	state int minConsecutiveRun = 1;
-	state int maxConsecutiveRun = 10;
+	state int maxConsecutiveRun = 100000;
 	state char firstKeyChar = 'a';
 	state char lastKeyChar = 'm';
 	state Version remapCleanupWindow = SERVER_KNOBS->REDWOOD_REMAP_CLEANUP_WINDOW;
diff --git a/tests/restarting/from_5.0.0/StorefrontTestRestart-1.txt b/tests/restarting/from_5.0.0/StorefrontTestRestart-1.txt
index ffc85ad6d2..1eda9e987c 100644
--- a/tests/restarting/from_5.0.0/StorefrontTestRestart-1.txt
+++ b/tests/restarting/from_5.0.0/StorefrontTestRestart-1.txt
@@ -1,11 +1,9 @@
 testTitle=StorefrontTest
 clearAfterTest=false
-
-	testName=Storefront
-	actorsPerClient=50
-	itemCount=100000
-	maxOrderSize=4
-
-	testName=SaveAndKill
-	restartInfoLocation=simfdb/restartInfo.ini
-	testDuration=10.0
+testName=Storefront
+actorsPerClient=50
+itemCount=100000
+maxOrderSize=4
+testName=SaveAndKill
+restartInfoLocation=simfdb/restartInfo.ini
+testDuration=10.0
diff --git a/tests/restarting/from_5.0.0/StorefrontTestRestart-2.txt b/tests/restarting/from_5.0.0/StorefrontTestRestart-2.txt
index 8a096605a5..9bcb361629 100644
--- a/tests/restarting/from_5.0.0/StorefrontTestRestart-2.txt
+++ b/tests/restarting/from_5.0.0/StorefrontTestRestart-2.txt
@@ -1,7 +1,6 @@
 testTitle=StorefrontTest
 runSetup=false
-
-	testName=Storefront
-	actorsPerClient=50
-	itemCount=100000
-	maxOrderSize=4
+testName=Storefront
+actorsPerClient=50
+itemCount=100000
+maxOrderSize=4