Merge remote-tracking branch 'upstream/release-6.3' into dyoungworth/fixMerge1
commit e1b7dd0c7d
@@ -3667,7 +3667,7 @@ void Transaction::setOption( FDBTransactionOptions::Option option, Optional<Stri
if (info.debugID.present()) {
TraceEvent(SevInfo, "TransactionBeingTraced")
.detail("DebugTransactionID", trLogInfo->identifier)
.detail("ServerTraceID", info.debugID.get().first());
.detail("ServerTraceID", info.debugID.get());

}
break;
@@ -3703,7 +3703,7 @@ void Transaction::setOption( FDBTransactionOptions::Option option, Optional<Stri
if (trLogInfo && !trLogInfo->identifier.empty()) {
TraceEvent(SevInfo, "TransactionBeingTraced")
.detail("DebugTransactionID", trLogInfo->identifier)
.detail("ServerTraceID", info.debugID.get().first());
.detail("ServerTraceID", info.debugID.get());
}
break;
@@ -4097,9 +4097,9 @@ Future<Void> Transaction::onError( Error const& e ) {

return e;
}
ACTOR Future<StorageMetrics> getStorageMetricsLargeKeyRange(Database cx, KeyRangeRef keys);
ACTOR Future<StorageMetrics> getStorageMetricsLargeKeyRange(Database cx, KeyRange keys);

ACTOR Future<StorageMetrics> doGetStorageMetrics(Database cx, KeyRangeRef keys, Reference<LocationInfo> locationInfo) {
ACTOR Future<StorageMetrics> doGetStorageMetrics(Database cx, KeyRange keys, Reference<LocationInfo> locationInfo) {
loop {
try {
WaitMetricsRequest req(keys, StorageMetrics(), StorageMetrics());
@@ -2075,7 +2075,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
for (auto& server : serverTeam) {
score += server_info[server]->teams.size();
}
TraceEvent("BuildServerTeams")
TraceEvent(SevDebug, "BuildServerTeams")
.detail("Score", score)
.detail("BestScore", bestScore)
.detail("TeamSize", serverTeam.size())
@@ -4826,6 +4826,21 @@ ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq, Reference<As
return Void();
}

ACTOR Future<Void> ddGetMetrics(GetDataDistributorMetricsRequest req, PromiseStream<GetMetricsListRequest> getShardMetricsList) {
ErrorOr<Standalone<VectorRef<DDMetricsRef>>> result = wait(errorOr(brokenPromiseToNever(
getShardMetricsList.getReply(GetMetricsListRequest(req.keys, req.shardLimit)))));

if(result.isError()) {
req.reply.sendError(result.getError());
} else {
GetDataDistributorMetricsReply rep;
rep.storageMetricsList = result.get();
req.reply.send(rep);
}

return Void();
}

ACTOR Future<Void> ddSnapCreate(DistributorSnapRequest snapReq, Reference<AsyncVar<struct ServerDBInfo>> db ) {
state Future<Void> dbInfoChange = db->onChange();
if (!setDDEnabled(false, snapReq.snapUID)) {
@@ -5000,16 +5015,8 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncV
TraceEvent("DataDistributorHalted", di.id()).detail("ReqID", req.requesterID);
break;
}
when ( state GetDataDistributorMetricsRequest req = waitNext(di.dataDistributorMetrics.getFuture()) ) {
ErrorOr<Standalone<VectorRef<DDMetricsRef>>> result = wait(errorOr(brokenPromiseToNever(
getShardMetricsList.getReply(GetMetricsListRequest(req.keys, req.shardLimit)))));
if ( result.isError() ) {
req.reply.sendError(result.getError());
} else {
GetDataDistributorMetricsReply rep;
rep.storageMetricsList = result.get();
req.reply.send(rep);
}
when(GetDataDistributorMetricsRequest req = waitNext(di.dataDistributorMetrics.getFuture())) {
actors.add(ddGetMetrics(req, getShardMetricsList));
}
when(DistributorSnapRequest snapReq = waitNext(di.distributorSnapReq.getFuture())) {
actors.add(ddSnapCreate(snapReq, db));
@@ -105,6 +105,10 @@ public:
// Free pageID to be used again after the commit that moves oldestVersion past v
virtual void freePage(LogicalPageID pageID, Version v) = 0;

// If id is remapped, delete the original as of version v and return the page it was remapped to. The caller
// is then responsible for referencing and deleting the returned page ID.
virtual LogicalPageID detachRemappedPage(LogicalPageID id, Version v) = 0;

// Returns the latest data (regardless of version) for a page by LogicalPageID
// The data returned will be the later of
// - the most recent committed atomic
@@ -133,7 +137,7 @@ public:

virtual StorageBytes getStorageBytes() const = 0;

// Count of pages in use by the pager client
// Count of pages in use by the pager client (including retained old page versions)
virtual Future<int64_t> getUserPageCount() = 0;

// Future returned is ready when pager has been initialized from disk and is ready for reads and writes.
@@ -2,6 +2,7 @@

#include <rocksdb/db.h>
#include <rocksdb/options.h>
#include <rocksdb/utilities/table_properties_collectors.h>
#include "flow/flow.h"
#include "flow/IThreadPool.h"
@@ -22,14 +23,23 @@ StringRef toStringRef(rocksdb::Slice s) {
return StringRef(reinterpret_cast<const uint8_t*>(s.data()), s.size());
}

rocksdb::Options getOptions() {
rocksdb::Options options;
options.create_if_missing = true;
rocksdb::ColumnFamilyOptions getCFOptions() {
rocksdb::ColumnFamilyOptions options;
options.level_compaction_dynamic_level_bytes = true;
options.OptimizeLevelStyleCompaction(SERVER_KNOBS->ROCKSDB_MEMTABLE_BYTES);
// Compact sstables when there's too much deleted stuff.
options.table_properties_collector_factories = { rocksdb::NewCompactOnDeletionCollectorFactory(128, 1) };
return options;
}

rocksdb::ColumnFamilyOptions getCFOptions() {
return {};
rocksdb::Options getOptions() {
rocksdb::Options options({}, getCFOptions());
options.avoid_unnecessary_blocking_io = true;
options.create_if_missing = true;
if (SERVER_KNOBS->ROCKSDB_BACKGROUND_PARALLELISM > 0) {
options.IncreaseParallelism(SERVER_KNOBS->ROCKSDB_BACKGROUND_PARALLELISM);
}
return options;
}

struct RocksDBKeyValueStore : IKeyValueStore {
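For context, here is a minimal usage sketch of the two option builders in the hunk above; it is not part of the diff. It shows how getCFOptions() feeds getOptions() and how the resulting options would typically be used to open the store. The database path and error handling are illustrative assumptions.

// Sketch only: assumes the getOptions()/getCFOptions() helpers defined above are in scope.
#include <rocksdb/db.h>
#include <iostream>

int openStoreExample() {
    rocksdb::DB* db = nullptr;
    // getOptions() embeds getCFOptions() via the Options(DBOptions, ColumnFamilyOptions) constructor,
    // so the default column family inherits the compaction and deletion-collector settings.
    rocksdb::Status status = rocksdb::DB::Open(getOptions(), "/tmp/rocksdb-example", &db);
    if (!status.ok()) {
        std::cerr << "Open failed: " << status.ToString() << std::endl;
        return 1;
    }
    delete db;
    return 0;
}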
@@ -119,7 +129,6 @@ struct RocksDBKeyValueStore : IKeyValueStore {

struct Reader : IThreadPoolReceiver {
DB& db;
rocksdb::ReadOptions readOptions;

explicit Reader(DB& db) : db(db) {}
@@ -141,7 +150,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
traceBatch.get().addEvent("GetValueDebug", a.debugID.get().first(), "Reader.Before");
}
rocksdb::PinnableSlice value;
auto s = db->Get(readOptions, db->DefaultColumnFamily(), toSlice(a.key), &value);
auto s = db->Get({}, db->DefaultColumnFamily(), toSlice(a.key), &value);
if (a.debugID.present()) {
traceBatch.get().addEvent("GetValueDebug", a.debugID.get().first(), "Reader.After");
traceBatch.get().dump();
@@ -172,7 +181,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
traceBatch.get().addEvent("GetValuePrefixDebug", a.debugID.get().first(),
"Reader.Before"); //.detail("TaskID", g_network->getCurrentTask());
}
auto s = db->Get(readOptions, db->DefaultColumnFamily(), toSlice(a.key), &value);
auto s = db->Get({}, db->DefaultColumnFamily(), toSlice(a.key), &value);
if (a.debugID.present()) {
traceBatch.get().addEvent("GetValuePrefixDebug", a.debugID.get().first(),
"Reader.After"); //.detail("TaskID", g_network->getCurrentTask());
@@ -195,33 +204,51 @@ struct RocksDBKeyValueStore : IKeyValueStore {
virtual double getTimeEstimate() { return SERVER_KNOBS->READ_RANGE_TIME_ESTIMATE; }
};
void action(ReadRangeAction& a) {
auto cursor = std::unique_ptr<rocksdb::Iterator>(db->NewIterator(readOptions));
Standalone<RangeResultRef> result;
if (a.rowLimit == 0 || a.byteLimit == 0) {
a.result.send(result);
}
int accumulatedBytes = 0;
rocksdb::Status s;
if (a.rowLimit >= 0) {
rocksdb::ReadOptions options;
auto endSlice = toSlice(a.keys.end);
options.iterate_upper_bound = &endSlice;
auto cursor = std::unique_ptr<rocksdb::Iterator>(db->NewIterator(options));
cursor->Seek(toSlice(a.keys.begin));
while (cursor->Valid() && toStringRef(cursor->key()) < a.keys.end && result.size() < a.rowLimit &&
accumulatedBytes < a.byteLimit) {
while (cursor->Valid() && toStringRef(cursor->key()) < a.keys.end) {
KeyValueRef kv(toStringRef(cursor->key()), toStringRef(cursor->value()));
accumulatedBytes += sizeof(KeyValueRef) + kv.expectedSize();
result.push_back_deep(result.arena(), kv);
// Calling `cursor->Next()` is potentially expensive, so short-circuit here just in case.
if (result.size() >= a.rowLimit || accumulatedBytes >= a.byteLimit) {
break;
}
cursor->Next();
}
s = cursor->status();
} else {
rocksdb::ReadOptions options;
auto beginSlice = toSlice(a.keys.begin);
options.iterate_lower_bound = &beginSlice;
auto cursor = std::unique_ptr<rocksdb::Iterator>(db->NewIterator(options));
cursor->SeekForPrev(toSlice(a.keys.end));
if (cursor->Valid() && toStringRef(cursor->key()) == a.keys.end) {
cursor->Prev();
}

while (cursor->Valid() && toStringRef(cursor->key()) >= a.keys.begin && result.size() < -a.rowLimit &&
accumulatedBytes < a.byteLimit) {
while (cursor->Valid() && toStringRef(cursor->key()) >= a.keys.begin) {
KeyValueRef kv(toStringRef(cursor->key()), toStringRef(cursor->value()));
accumulatedBytes += sizeof(KeyValueRef) + kv.expectedSize();
result.push_back_deep(result.arena(), kv);
// Calling `cursor->Prev()` is potentially expensive, so short-circuit here just in case.
if (result.size() >= -a.rowLimit || accumulatedBytes >= a.byteLimit) {
break;
}
cursor->Prev();
}
s = cursor->status();
}
auto s = cursor->status();

if (!s.ok()) {
TraceEvent(SevError, "RocksDBError").detail("Error", s.ToString()).detail("Method", "ReadRange");
}
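The rewritten ReadRange action above bounds the iterator with iterate_upper_bound / iterate_lower_bound instead of relying only on the key comparison in the loop, and it re-checks the row and byte limits before paying for the next cursor step. One detail worth noting: RocksDB stores only a pointer to the bound, so the Slice must outlive the iterator, which is why endSlice/beginSlice are declared before the cursor is constructed. A small standalone sketch of the same forward-scan pattern follows; the database handle and key arguments are assumed to come from elsewhere.

// Sketch only: count keys in [begin, end) using an upper-bounded iterator.
#include <rocksdb/db.h>
#include <memory>
#include <string>

size_t countRange(rocksdb::DB* db, const std::string& begin, const std::string& end) {
    rocksdb::ReadOptions options;
    rocksdb::Slice upper(end);            // must stay alive as long as the iterator does
    options.iterate_upper_bound = &upper;
    std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(options));
    size_t n = 0;
    for (it->Seek(begin); it->Valid(); it->Next()) {
        ++n;                              // the iterator never yields keys >= end
    }
    return it->status().ok() ? n : 0;
}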
@@ -93,7 +93,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( PEEK_RESET_INTERVAL, 300.0 ); if ( randomize && BUGGIFY ) PEEK_RESET_INTERVAL = 20.0;
init( PEEK_MAX_LATENCY, 0.5 ); if ( randomize && BUGGIFY ) PEEK_MAX_LATENCY = 0.0;
init( PEEK_COUNT_SMALL_MESSAGES, false ); if ( randomize && BUGGIFY ) PEEK_COUNT_SMALL_MESSAGES = true;
init( PEEK_STATS_INTERVAL, 10.0 );
init( PEEK_STATS_INTERVAL, 10.0 );
init( PEEK_STATS_SLOW_AMOUNT, 0 );
init( PEEK_STATS_SLOW_RATIO, 0.5 );
@@ -236,7 +236,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( DD_VALIDATE_LOCALITY, true ); if( randomize && BUGGIFY ) DD_VALIDATE_LOCALITY = false;
init( DD_CHECK_INVALID_LOCALITY_DELAY, 60 ); if( randomize && BUGGIFY ) DD_CHECK_INVALID_LOCALITY_DELAY = 1 + deterministicRandom()->random01() * 600;
init( DD_ENABLE_VERBOSE_TRACING, false ); if( randomize && BUGGIFY ) DD_ENABLE_VERBOSE_TRACING = true;
init( DD_SS_FAILURE_VERSIONLAG, 250000000 );
init( DD_SS_FAILURE_VERSIONLAG, 250000000 );
init( DD_SS_ALLOWED_VERSIONLAG, 200000000 ); if( randomize && BUGGIFY ) { DD_SS_FAILURE_VERSIONLAG = deterministicRandom()->randomInt(15000000, 500000000); DD_SS_ALLOWED_VERSIONLAG = 0.75 * DD_SS_FAILURE_VERSIONLAG; }
init( DD_SS_STUCK_TIME_LIMIT, 300.0 ); if( randomize && BUGGIFY ) { DD_SS_STUCK_TIME_LIMIT = 200.0 + deterministicRandom()->random01() * 100.0; }
@@ -308,6 +308,10 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
// KeyValueStoreMemory
init( REPLACE_CONTENTS_BYTES, 1e5 );

// KeyValueStoreRocksDB
init( ROCKSDB_BACKGROUND_PARALLELISM, 0 );
init( ROCKSDB_MEMTABLE_BYTES, 512 * 1024 * 1024 );

// Leader election
bool longLeaderElection = randomize && BUGGIFY;
init( MAX_NOTIFICATIONS, 100000 );
@ -573,7 +577,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
|
|||
init( TRACE_LOG_FLUSH_FAILURE_CHECK_INTERVAL_SECONDS, 10 );
|
||||
init( TRACE_LOG_PING_TIMEOUT_SECONDS, 5.0 );
|
||||
init( MIN_DELAY_CC_WORST_FIT_CANDIDACY_SECONDS, 10.0 );
|
||||
init( MAX_DELAY_CC_WORST_FIT_CANDIDACY_SECONDS, 30.0 );
|
||||
init( MAX_DELAY_CC_WORST_FIT_CANDIDACY_SECONDS, 30.0 );
|
||||
init( DBINFO_FAILED_DELAY, 1.0 );
|
||||
|
||||
// Test harness
|
||||
|
@ -650,7 +654,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
|
|||
init( REDWOOD_REMAP_CLEANUP_WINDOW, 50 );
|
||||
init( REDWOOD_REMAP_CLEANUP_LAG, 0.1 );
|
||||
init( REDWOOD_LOGGING_INTERVAL, 5.0 );
|
||||
|
||||
|
||||
// Server request latency measurement
|
||||
init( LATENCY_SAMPLE_SIZE, 100000 );
|
||||
init( LATENCY_METRICS_LOGGING_INTERVAL, 60.0 );
|
||||
|
|
|
@@ -243,6 +243,10 @@ public:
// KeyValueStoreMemory
int64_t REPLACE_CONTENTS_BYTES;

// KeyValueStoreRocksDB
int ROCKSDB_BACKGROUND_PARALLELISM;
int64_t ROCKSDB_MEMTABLE_BYTES;

// Leader election
int MAX_NOTIFICATIONS;
int MIN_NOTIFICATIONS;
@ -79,6 +79,89 @@ ACTOR Future<Void> broadcastTxnRequest(TxnStateRequest req, int sendAmount, bool
|
|||
return Void();
|
||||
}
|
||||
|
||||
struct ProxyStats {
|
||||
CounterCollection cc;
|
||||
Counter txnRequestIn, txnRequestOut, txnRequestErrors;
|
||||
Counter txnStartIn, txnStartOut, txnStartBatch;
|
||||
Counter txnSystemPriorityStartIn, txnSystemPriorityStartOut;
|
||||
Counter txnBatchPriorityStartIn, txnBatchPriorityStartOut;
|
||||
Counter txnDefaultPriorityStartIn, txnDefaultPriorityStartOut;
|
||||
Counter txnCommitIn, txnCommitVersionAssigned, txnCommitResolving, txnCommitResolved, txnCommitOut, txnCommitOutSuccess, txnCommitErrors;
|
||||
Counter txnConflicts;
|
||||
Counter txnThrottled;
|
||||
Counter commitBatchIn, commitBatchOut;
|
||||
Counter mutationBytes;
|
||||
Counter mutations;
|
||||
Counter conflictRanges;
|
||||
Counter keyServerLocationIn, keyServerLocationOut, keyServerLocationErrors;
|
||||
Version lastCommitVersionAssigned;
|
||||
|
||||
LatencySample commitLatencySample;
|
||||
LatencySample grvLatencySample;
|
||||
|
||||
LatencyBands commitLatencyBands;
|
||||
LatencyBands grvLatencyBands;
|
||||
|
||||
Future<Void> logger;
|
||||
|
||||
int recentRequests;
Deque<int> requestBuckets;
double lastBucketBegin;
double bucketInterval;

void updateRequestBuckets() {
while(now() - lastBucketBegin > bucketInterval) {
lastBucketBegin += bucketInterval;
recentRequests -= requestBuckets.front();
requestBuckets.pop_front();
requestBuckets.push_back(0);
}
}

void addRequest() {
updateRequestBuckets();
++recentRequests;
++requestBuckets.back();
}

int getRecentRequests() {
updateRequestBuckets();
return recentRequests*FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE/(FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE-(lastBucketBegin+bucketInterval-now()));
}
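The three methods above maintain a sliding window of per-bucket request counts so the proxy can report an approximately current request total. As a rough self-contained illustration of the same bookkeeping, here is a sketch using plain std::deque and std::chrono; the bucket count and window length are made-up stand-ins for the BASIC_LOAD_BALANCE knobs, not the real values.

// Sketch only: sliding-window request counter analogous to the ProxyStats bucket logic above.
#include <chrono>
#include <deque>

class SlidingRequestCounter {
    double bucketInterval;      // seconds covered by one bucket
    double lastBucketBegin;     // start time of the newest bucket
    int recent = 0;             // sum of all buckets in the window
    std::deque<int> buckets;

    static double now() {
        using namespace std::chrono;
        return duration<double>(steady_clock::now().time_since_epoch()).count();
    }

    void roll() {
        // Retire buckets that fell out of the window and open fresh empty ones.
        while (now() - lastBucketBegin > bucketInterval) {
            lastBucketBegin += bucketInterval;
            recent -= buckets.front();
            buckets.pop_front();
            buckets.push_back(0);
        }
    }

public:
    SlidingRequestCounter(int numBuckets, double windowSeconds)
      : bucketInterval(windowSeconds / numBuckets), lastBucketBegin(now()), buckets(numBuckets, 0) {}

    void addRequest() {
        roll();
        ++recent;
        ++buckets.back();
    }

    // Extrapolates the partially elapsed window to a full-window total, like getRecentRequests().
    int estimateWindowTotal() {
        roll();
        double window = bucketInterval * buckets.size();
        double elapsed = window - (lastBucketBegin + bucketInterval - now());
        return elapsed > 0 ? int(recent * window / elapsed) : recent;
    }
};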
explicit ProxyStats(UID id, Version* pVersion, NotifiedVersion* pCommittedVersion, int64_t *commitBatchesMemBytesCountPtr)
|
||||
: cc("ProxyStats", id.toString()), recentRequests(0), lastBucketBegin(now()), bucketInterval(FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE/FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS),
|
||||
txnRequestIn("TxnRequestIn", cc), txnRequestOut("TxnRequestOut", cc),
|
||||
txnRequestErrors("TxnRequestErrors", cc), txnStartIn("TxnStartIn", cc), txnStartOut("TxnStartOut", cc),
|
||||
txnStartBatch("TxnStartBatch", cc), txnSystemPriorityStartIn("TxnSystemPriorityStartIn", cc),
|
||||
txnSystemPriorityStartOut("TxnSystemPriorityStartOut", cc),
|
||||
txnBatchPriorityStartIn("TxnBatchPriorityStartIn", cc),
|
||||
txnBatchPriorityStartOut("TxnBatchPriorityStartOut", cc),
|
||||
txnDefaultPriorityStartIn("TxnDefaultPriorityStartIn", cc),
|
||||
txnDefaultPriorityStartOut("TxnDefaultPriorityStartOut", cc), txnCommitIn("TxnCommitIn", cc),
|
||||
txnCommitVersionAssigned("TxnCommitVersionAssigned", cc), txnCommitResolving("TxnCommitResolving", cc),
|
||||
txnCommitResolved("TxnCommitResolved", cc), txnCommitOut("TxnCommitOut", cc),
|
||||
txnCommitOutSuccess("TxnCommitOutSuccess", cc), txnCommitErrors("TxnCommitErrors", cc),
|
||||
txnConflicts("TxnConflicts", cc), txnThrottled("TxnThrottled", cc), commitBatchIn("CommitBatchIn", cc),
|
||||
commitBatchOut("CommitBatchOut", cc), mutationBytes("MutationBytes", cc), mutations("Mutations", cc),
|
||||
conflictRanges("ConflictRanges", cc), keyServerLocationIn("KeyServerLocationIn", cc),
|
||||
keyServerLocationOut("KeyServerLocationOut", cc), keyServerLocationErrors("KeyServerLocationErrors", cc),
|
||||
lastCommitVersionAssigned(0),
|
||||
commitLatencySample("CommitLatencyMetrics", id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
|
||||
grvLatencySample("GRVLatencyMetrics", id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
|
||||
commitLatencyBands("CommitLatencyBands", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY),
|
||||
grvLatencyBands("GRVLatencyBands", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY) {
|
||||
specialCounter(cc, "LastAssignedCommitVersion", [this](){return this->lastCommitVersionAssigned;});
|
||||
specialCounter(cc, "Version", [pVersion](){return *pVersion; });
|
||||
specialCounter(cc, "CommittedVersion", [pCommittedVersion](){ return pCommittedVersion->get(); });
|
||||
specialCounter(cc, "CommitBatchesMemBytesCount", [commitBatchesMemBytesCountPtr]() { return *commitBatchesMemBytesCountPtr; });
|
||||
logger = traceCounters("ProxyMetrics", id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "ProxyMetrics");
|
||||
for(int i = 0; i < FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS; i++) {
|
||||
requestBuckets.push_back(0);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
struct TransactionRateInfo {
|
||||
double rate;
|
||||
double limit;
|
||||
|
|
|
@@ -185,7 +185,7 @@ ACTOR static Future<Void> applyClearRangeMutations(Standalone<VectorRef<KeyRange
state int retries = 0;
state double numOps = 0;
wait(delay(delayTime + deterministicRandom()->random01() * delayTime));
TraceEvent(delayTime > 5 ? SevWarnAlways : SevInfo, "FastRestoreApplierClearRangeMutationsStart", applierID)
TraceEvent(delayTime > 5 ? SevWarnAlways : SevDebug, "FastRestoreApplierClearRangeMutationsStart", applierID)
.detail("BatchIndex", batchIndex)
.detail("Ranges", ranges.size())
.detail("DelayTime", delayTime);
@@ -296,7 +296,7 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
for (auto& key : incompleteStagingKeys) {
if (!fValues[i].get().present()) { // Key does not exist in DB
// if condition: fValues[i].Valid() && fValues[i].isReady() && !fValues[i].isError() &&
TraceEvent(SevWarn, "FastRestoreApplierGetAndComputeStagingKeysNoBaseValueInDB", applierID)
TraceEvent(SevDebug, "FastRestoreApplierGetAndComputeStagingKeysNoBaseValueInDB", applierID)
.suppressFor(5.0)
.detail("BatchIndex", batchIndex)
.detail("Key", key.first)
@@ -304,7 +304,7 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
.detail("PendingMutations", key.second->second.pendingMutations.size())
.detail("StagingKeyType", getTypeString(key.second->second.type));
for (auto& vm : key.second->second.pendingMutations) {
TraceEvent(SevWarn, "FastRestoreApplierGetAndComputeStagingKeysNoBaseValueInDB")
TraceEvent(SevDebug, "FastRestoreApplierGetAndComputeStagingKeysNoBaseValueInDB")
.detail("PendingMutationVersion", vm.first.toString())
.detail("PendingMutation", vm.second.toString());
}
@ -300,7 +300,7 @@ ACTOR static Future<Version> processRestoreRequest(Reference<RestoreControllerDa
|
|||
state std::vector<RestoreFileFR> logFiles;
|
||||
state std::vector<RestoreFileFR> allFiles;
|
||||
state Version minRangeVersion = MAX_VERSION;
|
||||
state ActorCollection actors(false);
|
||||
state Future<Void> error = actorCollection(self->addActor.getFuture());
|
||||
|
||||
self->initBackupContainer(request.url);
|
||||
|
||||
|
@ -356,7 +356,7 @@ ACTOR static Future<Version> processRestoreRequest(Reference<RestoreControllerDa
|
|||
}
|
||||
}
|
||||
|
||||
actors.add(monitorFinishedVersion(self, request));
|
||||
self->addActor.send(monitorFinishedVersion(self, request));
|
||||
state std::vector<VersionBatch>::iterator versionBatch = versionBatches.begin();
|
||||
for (; versionBatch != versionBatches.end(); versionBatch++) {
|
||||
while (self->runningVersionBatches.get() >= SERVER_KNOBS->FASTRESTORE_VB_PARALLELISM && !releaseVBOutOfOrder) {
|
||||
|
@ -378,7 +378,11 @@ ACTOR static Future<Version> processRestoreRequest(Reference<RestoreControllerDa
|
|||
wait(delay(SERVER_KNOBS->FASTRESTORE_VB_LAUNCH_DELAY));
|
||||
}
|
||||
|
||||
wait(waitForAll(fBatches));
|
||||
try {
|
||||
wait(waitForAll(fBatches) || error);
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevError, "FastRestoreControllerDispatchVersionBatchesUnexpectedError").error(e);
|
||||
}
|
||||
|
||||
TraceEvent("FastRestoreController").detail("RestoreToVersion", request.targetVersion);
|
||||
return request.targetVersion;
|
||||
|
|
|
@ -149,6 +149,10 @@ struct RestoreControllerData : RestoreRoleData, public ReferenceCounted<RestoreC
|
|||
|
||||
std::map<UID, double> rolesHeartBeatTime; // Key: role id; Value: most recent time controller receives heart beat
|
||||
|
||||
// addActor: add to actorCollection so that when an actor has error, the ActorCollection can catch the error.
|
||||
// addActor is used to create the actorCollection when the RestoreController is created
|
||||
PromiseStream<Future<Void>> addActor;
|
||||
|
||||
void addref() { return ReferenceCounted<RestoreControllerData>::addref(); }
|
||||
void delref() { return ReferenceCounted<RestoreControllerData>::delref(); }
|
||||
|
||||
|
|
|
@@ -92,7 +92,18 @@ std::string toString(LogicalPageID id) {
if (id == invalidLogicalPageID) {
return "LogicalPageID{invalid}";
}
return format("LogicalPageID{%" PRId64 "}", id);
return format("LogicalPageID{%u}", id);
}

std::string toString(Version v) {
if (v == invalidVersion) {
return "invalidVersion";
}
return format("@%" PRId64, v);
}

std::string toString(bool b) {
return b ? "true" : "false";
}

template <typename T>
@@ -136,6 +147,11 @@ std::string toString(const Optional<T>& o) {
return "<not present>";
}

template <typename F, typename S>
std::string toString(const std::pair<F, S>& o) {
return format("{%s, %s}", toString(o.first).c_str(), toString(o.second).c_str());
}

// A FIFO queue of T stored as a linked list of pages.
// Main operations are pop(), pushBack(), pushFront(), and flush().
//
@ -765,6 +781,8 @@ struct RedwoodMetrics {
|
|||
unsigned int lazyClearRequeueExt;
|
||||
unsigned int lazyClearFree;
|
||||
unsigned int lazyClearFreeExt;
|
||||
unsigned int forceUpdate;
|
||||
unsigned int detachChild;
|
||||
double buildStoredPct;
|
||||
double buildFillPct;
|
||||
unsigned int buildItemCount;
|
||||
|
@ -797,6 +815,12 @@ struct RedwoodMetrics {
|
|||
unsigned int btreeLeafPreload;
|
||||
unsigned int btreeLeafPreloadExt;
|
||||
|
||||
// Return number of pages read or written, from cache or disk
|
||||
unsigned int pageOps() const {
|
||||
// All page reads are either a cache hit, probe hit, or a disk read
|
||||
return pagerDiskWrite + pagerDiskRead + pagerCacheHit + pagerProbeHit;
|
||||
}
|
||||
|
||||
double startTime;
|
||||
|
||||
Level& level(unsigned int level) {
|
||||
|
@ -807,9 +831,9 @@ struct RedwoodMetrics {
|
|||
return levels[level - 1];
|
||||
}
|
||||
|
||||
// This will populate a trace event and/or a string with Redwood metrics. The string is a
|
||||
// reasonably well formatted page of information
|
||||
void getFields(TraceEvent* e, std::string* s = nullptr) {
|
||||
// This will populate a trace event and/or a string with Redwood metrics.
|
||||
// The string is a reasonably well formatted page of information
|
||||
void getFields(TraceEvent* e, std::string* s = nullptr, bool skipZeroes = false) {
|
||||
std::pair<const char*, unsigned int> metrics[] = { { "BTreePreload", btreeLeafPreload },
|
||||
{ "BTreePreloadExt", btreeLeafPreloadExt },
|
||||
{ "", 0 },
|
||||
|
@ -837,21 +861,26 @@ struct RedwoodMetrics {
|
|||
{ "PagerRemapCopy", pagerRemapCopy },
|
||||
{ "PagerRemapSkip", pagerRemapSkip } };
|
||||
double elapsed = now() - startTime;
|
||||
for (auto& m : metrics) {
|
||||
if (*m.first == '\0') {
|
||||
if (s != nullptr) {
|
||||
*s += "\n";
|
||||
}
|
||||
} else {
|
||||
if (s != nullptr) {
|
||||
*s += format("%-15s %-8u %8u/s ", m.first, m.second, int(m.second / elapsed));
|
||||
}
|
||||
if (e != nullptr) {
|
||||
|
||||
if (e != nullptr) {
|
||||
for (auto& m : metrics) {
|
||||
char c = m.first[0];
|
||||
if(c != 0 && (!skipZeroes || m.second != 0) ) {
|
||||
e->detail(m.first, m.second);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if(s != nullptr) {
|
||||
for (auto& m : metrics) {
|
||||
if (*m.first == '\0') {
|
||||
*s += "\n";
|
||||
} else if(!skipZeroes || m.second != 0) {
|
||||
*s += format("%-15s %-8u %8u/s ", m.first, m.second, int(m.second / elapsed));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (int i = 0; i < btreeLevels; ++i) {
|
||||
auto& level = levels[i];
|
||||
std::pair<const char*, unsigned int> metrics[] = {
|
||||
|
@ -869,37 +898,44 @@ struct RedwoodMetrics {
|
|||
{ "LazyClear", level.lazyClearFree },
|
||||
{ "LazyClearExt", level.lazyClearFreeExt },
|
||||
{ "", 0 },
|
||||
{ "ForceUpdate", level.forceUpdate },
|
||||
{ "DetachChild", level.detachChild },
|
||||
{ "", 0 },
|
||||
{ "-BldAvgCount", level.pageBuild ? level.buildItemCount / level.pageBuild : 0 },
|
||||
{ "-BldAvgFillPct", level.pageBuild ? level.buildFillPct / level.pageBuild * 100 : 0 },
|
||||
{ "-BldAvgStoredPct", level.pageBuild ? level.buildStoredPct / level.pageBuild * 100 : 0 },
|
||||
{ "", 0 },
|
||||
{ "-ModAvgCount", level.pageModify ? level.modifyItemCount / level.pageModify : 0 },
|
||||
{ "-ModAvgFillPct", level.pageModify ? level.modifyFillPct / level.pageModify * 100 : 0 },
|
||||
{ "-ModAvgStoredPct", level.pageModify ? level.modifyStoredPct / level.pageModify * 100 : 0 }
|
||||
{ "-ModAvgStoredPct", level.pageModify ? level.modifyStoredPct / level.pageModify * 100 : 0 },
|
||||
{ "", 0 },
|
||||
};
|
||||
|
||||
if(e != nullptr) {
|
||||
for (auto& m : metrics) {
|
||||
char c = m.first[0];
|
||||
if(c != 0 && (!skipZeroes || m.second != 0) ) {
|
||||
e->detail(format("L%d%s", i + 1, m.first + (c == '-' ? 1 : 0)), m.second);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (s != nullptr) {
|
||||
*s += format("\nLevel %d\n\t", i + 1);
|
||||
}
|
||||
for (auto& m : metrics) {
|
||||
const char* name = m.first;
|
||||
bool rate = elapsed != 0;
|
||||
if (*name == '-') {
|
||||
++name;
|
||||
rate = false;
|
||||
}
|
||||
|
||||
if (*name == '\0') {
|
||||
if (s != nullptr) {
|
||||
for (auto& m : metrics) {
|
||||
const char* name = m.first;
|
||||
bool rate = elapsed != 0;
|
||||
if (*name == '-') {
|
||||
++name;
|
||||
rate = false;
|
||||
}
|
||||
|
||||
if (*name == '\0') {
|
||||
*s += "\n\t";
|
||||
}
|
||||
} else {
|
||||
if (s != nullptr) {
|
||||
} else if(!skipZeroes || m.second != 0) {
|
||||
*s += format("%-15s %8u %8u/s ", name, m.second, rate ? int(m.second / elapsed) : 0);
|
||||
}
|
||||
if (e != nullptr) {
|
||||
e->detail(format("L%d%s", i + 1, name), m.second);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -1124,22 +1160,32 @@ public:
};

struct RemappedPage {
RemappedPage() : version(invalidVersion) {}
RemappedPage(Version v, LogicalPageID o, LogicalPageID n) : version(v), originalPageID(o), newPageID(n) {}
enum Type { NONE = 'N', REMAP = 'R', FREE = 'F', DETACH = 'D' };
RemappedPage(Version v = invalidVersion, LogicalPageID o = invalidLogicalPageID, LogicalPageID n = invalidLogicalPageID) : version(v), originalPageID(o), newPageID(n) {}

Version version;
LogicalPageID originalPageID;
LogicalPageID newPageID;

bool isFree() const {
return newPageID == invalidLogicalPageID;
static Type getTypeOf(LogicalPageID newPageID) {
if(newPageID == invalidLogicalPageID) {
return FREE;
}
if(newPageID == 0) {
return DETACH;
}
return REMAP;
}

Type getType() const {
return getTypeOf(newPageID);
}

bool operator<(const RemappedPage& rhs) { return version < rhs.version; }

std::string toString() const {
return format("RemappedPage(%s -> %s @%" PRId64 "}", ::toString(originalPageID).c_str(),
::toString(newPageID).c_str(), version);
return format("RemappedPage(%c: %s -> %s %s}", getType(), ::toString(originalPageID).c_str(),
::toString(newPageID).c_str(), ::toString(version).c_str());
}
};
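The encoding used by RemappedPage::getTypeOf() above packs three cases into the newPageID field: an invalid ID means the original page was freed, the reserved value 0 marks a detach, and anything else is a live remap target. Below is a small illustrative check of that mapping; the concrete sentinel value for an invalid page ID is an assumption here, not taken from the diff.

// Sketch only: standalone illustration of the FREE / DETACH / REMAP classification above.
#include <cassert>
#include <cstdint>
#include <limits>

using PageID = uint32_t;
constexpr PageID kInvalidPageID = std::numeric_limits<PageID>::max(); // assumed sentinel

enum class RemapType { Free, Detach, Remap };

RemapType classify(PageID newPageID) {
    if (newPageID == kInvalidPageID) return RemapType::Free;   // original page deleted at this version
    if (newPageID == 0) return RemapType::Detach;              // ownership handed to the caller
    return RemapType::Remap;                                   // reads of the original ID redirect here
}

int main() {
    assert(classify(kInvalidPageID) == RemapType::Free);
    assert(classify(0) == RemapType::Detach);
    assert(classify(1234) == RemapType::Remap);
    return 0;
}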
@ -1484,6 +1530,35 @@ public:
|
|||
}
|
||||
}
|
||||
|
||||
LogicalPageID detachRemappedPage(LogicalPageID pageID, Version v) override {
|
||||
auto i = remappedPages.find(pageID);
|
||||
if(i == remappedPages.end()) {
|
||||
// Page is not remapped
|
||||
return invalidLogicalPageID;
|
||||
}
|
||||
|
||||
// Get the page that id was most recently remapped to
|
||||
auto iLast = i->second.rbegin();
|
||||
LogicalPageID newID = iLast->second;
|
||||
ASSERT(RemappedPage::getTypeOf(newID) == RemappedPage::REMAP);
|
||||
|
||||
// If the last change remap was also at v then change the remap to a delete, as it's essentially
|
||||
// the same as the original page being deleted at that version and newID being used from then on.
|
||||
if(iLast->first == v) {
|
||||
debug_printf("DWALPager(%s) op=detachDelete originalID=%s newID=%s @%" PRId64 " oldestVersion=%" PRId64 "\n", filename.c_str(),
|
||||
toString(pageID).c_str(), toString(newID).c_str(), v, pLastCommittedHeader->oldestVersion);
|
||||
iLast->second = invalidLogicalPageID;
|
||||
remapQueue.pushBack(RemappedPage{ v, pageID, invalidLogicalPageID });
|
||||
} else {
|
||||
debug_printf("DWALPager(%s) op=detach originalID=%s newID=%s @%" PRId64 " oldestVersion=%" PRId64 "\n", filename.c_str(),
|
||||
toString(pageID).c_str(), toString(newID).c_str(), v, pLastCommittedHeader->oldestVersion);
|
||||
// Mark id as converted to its last remapped location as of v
|
||||
i->second[v] = 0;
|
||||
remapQueue.pushBack(RemappedPage{ v, pageID, 0 });
|
||||
}
|
||||
return newID;
|
||||
}
|
||||
|
||||
void freePage(LogicalPageID pageID, Version v) override {
|
||||
// If pageID has been remapped, then it can't be freed until all existing remaps for that page have been undone,
|
||||
// so queue it for later deletion
|
||||
|
@ -1588,13 +1663,13 @@ public:
|
|||
auto j = i->second.upper_bound(v);
|
||||
if (j != i->second.begin()) {
|
||||
--j;
|
||||
debug_printf("DWALPager(%s) read %s @%" PRId64 " -> %s\n", filename.c_str(), toString(pageID).c_str(),
|
||||
debug_printf("DWALPager(%s) op=readAtVersionRemapped %s @%" PRId64 " -> %s\n", filename.c_str(), toString(pageID).c_str(),
|
||||
v, toString(j->second).c_str());
|
||||
pageID = j->second;
|
||||
ASSERT(pageID != invalidLogicalPageID);
|
||||
}
|
||||
} else {
|
||||
debug_printf("DWALPager(%s) read %s @%" PRId64 " (not remapped)\n", filename.c_str(),
|
||||
debug_printf("DWALPager(%s) op=readAtVersionNotRemapped %s @%" PRId64 " (not remapped)\n", filename.c_str(),
|
||||
toString(pageID).c_str(), v);
|
||||
}
|
||||
|
||||
|
@ -1623,29 +1698,126 @@ public:
|
|||
return std::min(pLastCommittedHeader->oldestVersion, snapshots.front().version);
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> remapCopyAndFree(DWALPager* self, RemappedPage p, VersionToPageMapT *m, VersionToPageMapT::iterator i) {
|
||||
debug_printf("DWALPager(%s) remapCleanup copyAndFree %s\n", self->filename.c_str(), p.toString().c_str());
|
||||
ACTOR static Future<Void> removeRemapEntry(DWALPager* self, RemappedPage p, Version oldestRetainedVersion) {
|
||||
// Get iterator to the versioned page map entry for the original page
|
||||
state PageToVersionedMapT::iterator iPageMapPair = self->remappedPages.find(p.originalPageID);
|
||||
// The iterator must be valid and not empty and its first page map entry must match p's version
|
||||
ASSERT(iPageMapPair != self->remappedPages.end());
|
||||
ASSERT(!iPageMapPair->second.empty());
|
||||
state VersionToPageMapT::iterator iVersionPagePair = iPageMapPair->second.find(p.version);
|
||||
ASSERT(iVersionPagePair != iPageMapPair->second.end());
|
||||
|
||||
// Read the data from the page that the original was mapped to
|
||||
Reference<IPage> data = wait(self->readPage(p.newPageID, false));
|
||||
RemappedPage::Type firstType = p.getType();
|
||||
state RemappedPage::Type secondType;
|
||||
bool secondAfterOldestRetainedVersion = false;
|
||||
state bool deleteAtSameVersion = false;
|
||||
if(p.newPageID == iVersionPagePair->second) {
|
||||
auto nextEntry = iVersionPagePair;
|
||||
++nextEntry;
|
||||
if(nextEntry == iPageMapPair->second.end()) {
|
||||
secondType = RemappedPage::NONE;
|
||||
} else {
|
||||
secondType = RemappedPage::getTypeOf(nextEntry->second);
|
||||
secondAfterOldestRetainedVersion = nextEntry->first >= oldestRetainedVersion;
|
||||
}
|
||||
} else {
|
||||
ASSERT(iVersionPagePair->second == invalidLogicalPageID);
|
||||
secondType = RemappedPage::FREE;
|
||||
deleteAtSameVersion = true;
|
||||
}
|
||||
ASSERT(firstType == RemappedPage::REMAP || secondType == RemappedPage::NONE);
|
||||
|
||||
// Write the data to the original page so it can be read using its original pageID
|
||||
self->updatePage(p.originalPageID, data);
|
||||
++g_redwoodMetrics.pagerRemapCopy;
|
||||
// Scenarios and actions to take:
|
||||
//
|
||||
// The first letter (firstType) is the type of the entry just popped from the remap queue.
|
||||
// The second letter (secondType) is the type of the next item in the queue for the same
|
||||
// original page ID, if present. If not present, secondType will be NONE.
|
||||
//
|
||||
// Since the next item can be arbitrarily ahead in the queue, secondType is determined by
|
||||
// looking at the remappedPages structure.
|
||||
//
|
||||
// R == Remap F == Free D == Detach | == oldestRetainedVersion
|
||||
//
|
||||
// R R | free new ID
|
||||
// R F | free new ID if R and D are at different versions
|
||||
// R D | do nothing
|
||||
// R | R copy new to original ID, free new ID
|
||||
// R | F copy new to original ID, free new ID
|
||||
// R | D copy new to original ID
|
||||
// R | copy new to original ID, free new ID
|
||||
// F | free original ID
|
||||
// D | free original ID
|
||||
//
|
||||
// Note that
|
||||
//
|
||||
// Special case: Page is detached while it is being read in remapCopyAndFree()
|
||||
// Initial state: R |
|
||||
// Start remapCopyAndFree(), intending to copy new, ID to originalID and free newID
|
||||
// New state: R | D
|
||||
// Read of newID completes.
|
||||
// Copy new contents over original, do NOT free new ID
|
||||
// Later popped state: D |
|
||||
// free original ID
|
||||
//
|
||||
state bool freeNewID = (firstType == RemappedPage::REMAP && secondType != RemappedPage::DETACH && !deleteAtSameVersion);
|
||||
state bool copyNewToOriginal = (firstType == RemappedPage::REMAP && (secondAfterOldestRetainedVersion || secondType == RemappedPage::NONE));
|
||||
state bool freeOriginalID = (firstType == RemappedPage::FREE || firstType == RemappedPage::DETACH);
|
||||
|
||||
// Now that the page data has been copied to the original page, the versioned page map entry is no longer
|
||||
// needed and the new page ID can be freed as of the next commit.
|
||||
m->erase(i);
|
||||
self->freeUnmappedPage(p.newPageID, 0);
|
||||
++g_redwoodMetrics.pagerRemapFree;
|
||||
debug_printf("DWALPager(%s) remapCleanup %s secondType=%c mapEntry=%s oldestRetainedVersion=%" PRId64 " \n",
|
||||
self->filename.c_str(), p.toString().c_str(), secondType, ::toString(*iVersionPagePair).c_str(), oldestRetainedVersion);
|
||||
|
||||
if(copyNewToOriginal) {
|
||||
debug_printf("DWALPager(%s) remapCleanup copy %s\n", self->filename.c_str(), p.toString().c_str());
|
||||
|
||||
// Read the data from the page that the original was mapped to
|
||||
Reference<IPage> data = wait(self->readPage(p.newPageID, false, true));
|
||||
|
||||
// Write the data to the original page so it can be read using its original pageID
|
||||
self->updatePage(p.originalPageID, data);
|
||||
++g_redwoodMetrics.pagerRemapCopy;
|
||||
} else if (firstType == RemappedPage::REMAP) {
|
||||
++g_redwoodMetrics.pagerRemapSkip;
|
||||
}
|
||||
|
||||
// Now that the page contents have been copied to the original page, if the corresponding map entry
|
||||
// represented the remap and there wasn't a delete later in the queue at p for the same version then
|
||||
// erase the entry.
|
||||
if(!deleteAtSameVersion) {
|
||||
debug_printf("DWALPager(%s) remapCleanup deleting map entry %s\n", self->filename.c_str(), p.toString().c_str());
|
||||
// Erase the entry and set iVersionPagePair to the next entry or end
|
||||
iVersionPagePair = iPageMapPair->second.erase(iVersionPagePair);
|
||||
|
||||
// If the map is now empty, delete it
|
||||
if(iPageMapPair->second.empty()) {
|
||||
debug_printf("DWALPager(%s) remapCleanup deleting empty map %s\n", self->filename.c_str(), p.toString().c_str());
|
||||
self->remappedPages.erase(iPageMapPair);
|
||||
} else if(freeNewID && secondType == RemappedPage::NONE && iVersionPagePair != iPageMapPair->second.end() && RemappedPage::getTypeOf(iVersionPagePair->second) == RemappedPage::DETACH) {
|
||||
// If we intend to free the new ID and there was no map entry, one could have been added during the wait above.
|
||||
// If so, and if it was a detach operation, then we can't free the new page ID as its lifetime will be managed
|
||||
// by the client starting at some later version.
|
||||
freeNewID = false;
|
||||
}
|
||||
}
|
||||
|
||||
if(freeNewID) {
|
||||
debug_printf("DWALPager(%s) remapCleanup freeNew %s\n", self->filename.c_str(), p.toString().c_str());
|
||||
self->freeUnmappedPage(p.newPageID, 0);
|
||||
++g_redwoodMetrics.pagerRemapFree;
|
||||
}
|
||||
|
||||
if(freeOriginalID) {
|
||||
debug_printf("DWALPager(%s) remapCleanup freeOriginal %s\n", self->filename.c_str(), p.toString().c_str());
|
||||
self->freeUnmappedPage(p.originalPageID, 0);
|
||||
++g_redwoodMetrics.pagerRemapFree;
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> remapCleanup(DWALPager* self) {
|
||||
state ActorCollection copies(true);
|
||||
state ActorCollection tasks(true);
|
||||
state Promise<Void> signal;
|
||||
copies.add(signal.getFuture());
|
||||
tasks.add(signal.getFuture());
|
||||
|
||||
self->remapCleanupStop = false;
|
||||
|
||||
|
@ -1654,8 +1826,7 @@ public:
|
|||
state Version oldestRetainedVersion = self->effectiveOldestVersion();
|
||||
|
||||
// Cutoff is the version we can pop to
|
||||
state RemappedPage cutoff;
|
||||
cutoff.version = oldestRetainedVersion - self->remapCleanupWindow;
|
||||
state RemappedPage cutoff(oldestRetainedVersion - self->remapCleanupWindow);
|
||||
|
||||
// Minimum version we must pop to before obeying stop command.
|
||||
state Version minStopVersion = cutoff.version - (self->remapCleanupWindow * SERVER_KNOBS->REDWOOD_REMAP_CLEANUP_LAG);
|
||||
|
@ -1663,46 +1834,15 @@ public:
|
|||
loop {
|
||||
state Optional<RemappedPage> p = wait(self->remapQueue.pop(cutoff));
|
||||
debug_printf("DWALPager(%s) remapCleanup popped %s\n", self->filename.c_str(), ::toString(p).c_str());
|
||||
|
||||
// Stop if we have reached the cutoff version, which is the start of the cleanup coalescing window
|
||||
if (!p.present()) {
|
||||
break;
|
||||
}
|
||||
|
||||
// Get iterator to the versioned page map entry for the original page
|
||||
auto iPageMapPair = self->remappedPages.find(p.get().originalPageID);
|
||||
// The iterator must be valid and not empty and its first page map entry must match p's version
|
||||
ASSERT(iPageMapPair != self->remappedPages.end());
|
||||
ASSERT(!iPageMapPair->second.empty());
|
||||
auto iVersionPagePair = iPageMapPair->second.begin();
|
||||
ASSERT(iVersionPagePair->first == p.get().version);
|
||||
|
||||
// If this is a free page entry then free the original page ID
|
||||
if(p.get().isFree()) {
|
||||
debug_printf("DWALPager(%s) remapCleanup free %s\n", self->filename.c_str(),
|
||||
p.get().toString().c_str());
|
||||
self->freeUnmappedPage(p.get().originalPageID, 0);
|
||||
++g_redwoodMetrics.pagerRemapFree;
|
||||
|
||||
// There can't be any more entries in the page map after this one so verify that
|
||||
// the map size is 1 and erase the map for p's original page ID.
|
||||
ASSERT(iPageMapPair->second.size() == 1);
|
||||
self->remappedPages.erase(iPageMapPair);
|
||||
}
|
||||
else {
|
||||
// If there is no next page map entry or there is but it is after the oldest retained version
|
||||
// then p must be copied to unmap it.
|
||||
auto iNextVersionPagePair = iVersionPagePair;
|
||||
++iNextVersionPagePair;
|
||||
if(iNextVersionPagePair == iPageMapPair->second.end() || iNextVersionPagePair->first > oldestRetainedVersion) {
|
||||
// Copy the remapped page to the original so it can be freed.
|
||||
copies.add(remapCopyAndFree(self, p.get(), &iPageMapPair->second, iVersionPagePair));
|
||||
}
|
||||
else {
|
||||
debug_printf("DWALPager(%s) remapCleanup skipAndFree %s\n", self->filename.c_str(), p.get().toString().c_str());
|
||||
self->freeUnmappedPage(p.get().newPageID, 0);
|
||||
++g_redwoodMetrics.pagerRemapFree;
|
||||
++g_redwoodMetrics.pagerRemapSkip;
|
||||
iPageMapPair->second.erase(iVersionPagePair);
|
||||
}
|
||||
Future<Void> task = removeRemapEntry(self, p.get(), oldestRetainedVersion);
|
||||
if(!task.isReady()) {
|
||||
tasks.add(task);
|
||||
}
|
||||
|
||||
// If the stop flag is set and we've reached the minimum stop version according the the allowed lag then stop.
|
||||
|
@ -1713,7 +1853,7 @@ public:
|
|||
|
||||
debug_printf("DWALPager(%s) remapCleanup stopped (stop=%d)\n", self->filename.c_str(), self->remapCleanupStop);
|
||||
signal.send(Void());
|
||||
wait(copies.getResult());
|
||||
wait(tasks.getResult());
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
@ -1889,8 +2029,7 @@ public:
|
|||
Future<int64_t> getUserPageCount() override {
|
||||
return map(getUserPageCount_cleanup(this), [=](Void) {
|
||||
int64_t userPages = pHeader->pageCount - 2 - freeList.numPages - freeList.numEntries -
|
||||
delayedFreeList.numPages - delayedFreeList.numEntries - remapQueue.numPages
|
||||
- remapQueue.numEntries;
|
||||
delayedFreeList.numPages - delayedFreeList.numEntries - remapQueue.numPages;
|
||||
|
||||
debug_printf("DWALPager(%s) userPages=%" PRId64 " totalPageCount=%" PRId64 " freeQueuePages=%" PRId64
|
||||
" freeQueueCount=%" PRId64 " delayedFreeQueuePages=%" PRId64 " delayedFreeQueueCount=%" PRId64
|
||||
|
@@ -2871,6 +3010,38 @@ public:

typedef FIFOQueue<LazyClearQueueEntry> LazyClearQueueT;

struct ParentInfo {
ParentInfo() {
count = 0;
bits = 0;
}
void clear() {
count = 0;
bits = 0;
}

static uint32_t mask(LogicalPageID id) {
return 1 << (id & 31);
}

void pageUpdated(LogicalPageID child) {
auto m = mask(child);
if((bits & m) == 0) {
bits |= m;
++count;
}
}

bool maybeUpdated(LogicalPageID child) {
return (mask(child) & bits) != 0;
}

uint32_t bits;
int count;
};

typedef std::unordered_map<LogicalPageID, ParentInfo> ParentInfoMapT;
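ParentInfo above is a small approximate tracker for "has any child of this internal page possibly been updated in place?": each child page ID maps to one of 32 bits, so maybeUpdated() can report false positives (IDs equal mod 32 share a bit) but never false negatives, which is acceptable because a hit only causes the parent to attempt a detach. A self-contained sketch of the same idea, using a plain uint32_t page ID as an assumption:

// Sketch only: 32-bit approximate "updated child" filter, analogous to ParentInfo above.
#include <cassert>
#include <cstdint>

struct UpdatedChildFilter {
    uint32_t bits = 0;
    int count = 0;                 // number of distinct bits set, used as an update-count heuristic

    static uint32_t mask(uint32_t id) { return 1u << (id & 31); }

    void pageUpdated(uint32_t child) {
        uint32_t m = mask(child);
        if ((bits & m) == 0) {
            bits |= m;
            ++count;
        }
    }

    // May return true for a page that was never updated (hash collision), never false for one that was.
    bool maybeUpdated(uint32_t child) const { return (mask(child) & bits) != 0; }
};

int main() {
    UpdatedChildFilter f;
    f.pageUpdated(5);
    assert(f.maybeUpdated(5));     // definitely reported
    assert(f.maybeUpdated(37));    // false positive: 37 % 32 == 5
    assert(!f.maybeUpdated(6));    // untouched bit
    return 0;
}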
#pragma pack(push, 1)
struct MetaKey {
static constexpr int FORMAT_VERSION = 8;
@@ -2923,8 +3094,8 @@ public:
// durable once the following call to commit() returns
void set(KeyValueRef keyValue) override {
++g_redwoodMetrics.opSet;
++g_redwoodMetrics.opSetKeyBytes += keyValue.key.size();
++g_redwoodMetrics.opSetValueBytes += keyValue.value.size();
g_redwoodMetrics.opSetKeyBytes += keyValue.key.size();
g_redwoodMetrics.opSetValueBytes += keyValue.value.size();
m_pBuffer->insert(keyValue.key).mutation().setBoundaryValue(m_pBuffer->copyToArena(keyValue.value));
}
@ -3022,7 +3193,7 @@ public:
|
|||
// If this page is height 2, then the children are leaves so free them directly
|
||||
if (btPage.height == 2) {
|
||||
debug_printf("LazyClear: freeing child %s\n", toString(btChildPageID).c_str());
|
||||
self->freeBtreePage(btChildPageID, v);
|
||||
self->freeBTreePage(btChildPageID, v);
|
||||
freedPages += btChildPageID.size();
|
||||
metrics.lazyClearFree += 1;
|
||||
metrics.lazyClearFreeExt += (btChildPageID.size() - 1);
|
||||
|
@ -3041,7 +3212,7 @@ public:
|
|||
|
||||
// Free the page, now that its children have either been freed or queued
|
||||
debug_printf("LazyClear: freeing queue entry %s\n", toString(entry.pageID).c_str());
|
||||
self->freeBtreePage(entry.pageID, v);
|
||||
self->freeBTreePage(entry.pageID, v);
|
||||
freedPages += entry.pageID.size();
|
||||
metrics.lazyClearFree += 1;
|
||||
metrics.lazyClearFreeExt += entry.pageID.size() - 1;
|
||||
|
@ -3146,7 +3317,7 @@ public:
|
|||
return commit_impl(this);
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> destroyAndCheckSanity_impl(VersionedBTree* self) {
|
||||
ACTOR static Future<Void> clearAllAndCheckSanity_impl(VersionedBTree* self) {
|
||||
ASSERT(g_network->isSimulated());
|
||||
|
||||
debug_printf("Clearing tree.\n");
|
||||
|
@ -3191,7 +3362,7 @@ public:
|
|||
return Void();
|
||||
}
|
||||
|
||||
Future<Void> destroyAndCheckSanity() { return destroyAndCheckSanity_impl(this); }
|
||||
Future<Void> clearAllAndCheckSanity() { return clearAllAndCheckSanity_impl(this); }
|
||||
|
||||
private:
|
||||
// Represents a change to a single key - set, clear, or atomic op
|
||||
|
@ -3412,6 +3583,8 @@ private:
|
|||
Future<Void> m_init;
|
||||
std::string m_name;
|
||||
int m_blockSize;
|
||||
std::unordered_map<LogicalPageID, ParentInfo> parents;
|
||||
ParentInfoMapT childUpdateTracker;
|
||||
|
||||
// MetaKey changes size so allocate space for it to expand into
|
||||
union {
|
||||
|
@ -3603,7 +3776,7 @@ private:
|
|||
// must be rewritten anyway to count for the change in child count or child links.
|
||||
// Free the old IDs, but only once (before the first output record is added).
|
||||
if (records.empty()) {
|
||||
self->freeBtreePage(previousID, v);
|
||||
self->freeBTreePage(previousID, v);
|
||||
}
|
||||
for (p = 0; p < pages.size(); ++p) {
|
||||
LogicalPageID id = wait(self->m_pager->newPageID());
|
||||
|
@ -3771,7 +3944,7 @@ private:
|
|||
}
|
||||
}
|
||||
|
||||
void freeBtreePage(BTreePageIDRef btPageID, Version v) {
|
||||
void freeBTreePage(BTreePageIDRef btPageID, Version v) {
|
||||
// Free individual pages at v
|
||||
for (LogicalPageID id : btPageID) {
|
||||
m_pager->freePage(id, v);
|
||||
|
@ -3780,7 +3953,7 @@ private:
|
|||
|
||||
// Write new version of pageID at version v using page as its data.
|
||||
// Attempts to reuse original id(s) in btPageID, returns BTreePageID.
|
||||
ACTOR static Future<BTreePageIDRef> updateBtreePage(VersionedBTree* self, BTreePageIDRef oldID, Arena* arena,
|
||||
ACTOR static Future<BTreePageIDRef> updateBTreePage(VersionedBTree* self, BTreePageIDRef oldID, Arena* arena,
|
||||
Reference<IPage> page, Version writeVersion) {
|
||||
state BTreePageIDRef newID;
|
||||
newID.resize(*arena, oldID.size());
|
||||
|
@ -3878,19 +4051,23 @@ private:
|
|||
// If the last record in the range has a null link then this will be null.
|
||||
const RedwoodRecordRef* expectedUpperBound;
|
||||
|
||||
bool inPlaceUpdate;
|
||||
|
||||
// CommitSubtree will call one of the following three functions based on its exit path
|
||||
|
||||
// Subtree was cleared.
|
||||
void cleared() {
|
||||
inPlaceUpdate = false;
|
||||
childrenChanged = true;
|
||||
expectedUpperBound = nullptr;
|
||||
}
|
||||
|
||||
// Page was updated in-place through edits and written to maybeNewID
|
||||
void updatedInPlace(BTreePageIDRef maybeNewID, BTreePage* btPage, int capacity) {
|
||||
inPlaceUpdate = true;
|
||||
auto& metrics = g_redwoodMetrics.level(btPage->height);
|
||||
metrics.pageModify += 1;
|
||||
metrics.pageModify += (maybeNewID.size() - 1);
|
||||
metrics.pageModifyExt += (maybeNewID.size() - 1);
|
||||
metrics.modifyFillPct += (double)btPage->size() / capacity;
|
||||
metrics.modifyStoredPct += (double)btPage->kvBytes / capacity;
|
||||
metrics.modifyItemCount += btPage->tree().numItems;
|
||||
|
@ -3912,6 +4089,7 @@ private:
|
|||
|
||||
// writePages() was used to build 1 or more replacement pages.
|
||||
void rebuilt(Standalone<VectorRef<RedwoodRecordRef>> newRecords) {
|
||||
inPlaceUpdate = false;
|
||||
newLinks = newRecords;
|
||||
childrenChanged = true;
|
||||
|
||||
|
@ -3952,14 +4130,15 @@ private:
|
|||
|
||||
struct InternalPageModifier {
|
||||
InternalPageModifier() {}
|
||||
InternalPageModifier(BTreePage* p, BTreePage::BinaryTree::Mirror* m, bool updating)
|
||||
: btPage(p), m(m), updating(updating), changesMade(false) {}
|
||||
InternalPageModifier(BTreePage* p, BTreePage::BinaryTree::Mirror* m, bool updating, ParentInfo *parentInfo)
|
||||
: btPage(p), m(m), updating(updating), changesMade(false), parentInfo(parentInfo) {}
|
||||
|
||||
bool updating;
|
||||
BTreePage* btPage;
|
||||
BTreePage::BinaryTree::Mirror* m;
|
||||
Standalone<VectorRef<RedwoodRecordRef>> rebuild;
|
||||
bool changesMade;
|
||||
ParentInfo *parentInfo;
|
||||
|
||||
bool empty() const {
|
||||
if (updating) {
|
||||
|
@ -4055,6 +4234,13 @@ private:
|
|||
// endpoint.
|
||||
changesMade = true;
|
||||
} else {
|
||||
|
||||
if(u.inPlaceUpdate) {
|
||||
for(auto id : u.decodeLowerBound->getChildPage()) {
|
||||
parentInfo->pageUpdated(id);
|
||||
}
|
||||
}
|
||||
|
||||
keep(u.cBegin, u.cEnd);
|
||||
}
|
||||
|
||||
|
@ -4226,7 +4412,7 @@ private:
|
|||
debug_printf("%s Inserted %s [mutation, boundary start]\n", context.c_str(),
|
||||
rec.toString().c_str());
|
||||
} else {
|
||||
debug_printf("%s Inserted failed for %s [mutation, boundary start]\n", context.c_str(),
|
||||
debug_printf("%s Insert failed for %s [mutation, boundary start]\n", context.c_str(),
|
||||
rec.toString().c_str());
|
||||
switchToLinearMerge();
|
||||
}
|
||||
|
@ -4339,12 +4525,12 @@ private:
|
|||
// If the tree is now empty, delete the page
|
||||
if (deltaTree.numItems == 0) {
|
||||
update->cleared();
|
||||
self->freeBtreePage(rootID, writeVersion);
|
||||
self->freeBTreePage(rootID, writeVersion);
|
||||
debug_printf("%s Page updates cleared all entries, returning %s\n", context.c_str(),
|
||||
toString(*update).c_str());
|
||||
} else {
|
||||
// Otherwise update it.
|
||||
BTreePageIDRef newID = wait(self->updateBtreePage(self, rootID, &update->newLinks.arena(),
|
||||
BTreePageIDRef newID = wait(self->updateBTreePage(self, rootID, &update->newLinks.arena(),
|
||||
page.castTo<IPage>(), writeVersion));
|
||||
|
||||
update->updatedInPlace(newID, btPage, newID.size() * self->m_blockSize);
|
||||
|
@ -4357,7 +4543,7 @@ private:
|
|||
// If everything in the page was deleted then this page should be deleted as of the new version
|
||||
if (merged.empty()) {
|
||||
update->cleared();
|
||||
self->freeBtreePage(rootID, writeVersion);
|
||||
self->freeBTreePage(rootID, writeVersion);
|
||||
|
||||
debug_printf("%s All leaf page contents were cleared, returning %s\n", context.c_str(),
|
||||
toString(*update).c_str());
|
||||
|
@ -4511,7 +4697,7 @@ private:
|
|||
if (btPage->height == 2) {
|
||||
debug_printf("%s: freeing child page in cleared subtree range: %s\n",
|
||||
context.c_str(), ::toString(rec.getChildPage()).c_str());
|
||||
self->freeBtreePage(rec.getChildPage(), writeVersion);
|
||||
self->freeBTreePage(rec.getChildPage(), writeVersion);
|
||||
} else {
|
||||
debug_printf("%s: queuing subtree deletion cleared subtree range: %s\n",
|
||||
context.c_str(), ::toString(rec.getChildPage()).c_str());
|
||||
|
@ -4547,7 +4733,10 @@ private:
|
|||
wait(waitForAll(recursions));
|
||||
debug_printf("%s Recursions done, processing slice updates.\n", context.c_str());
|
||||
|
||||
state InternalPageModifier m(btPage, cursor.mirror, tryToUpdate);
|
||||
// Note: parentInfo could be invalid after a wait and must be re-initialized.
|
||||
// All uses below occur before waits so no reinitialization is done.
|
||||
state ParentInfo *parentInfo = &self->childUpdateTracker[rootID.front()];
|
||||
state InternalPageModifier m(btPage, cursor.mirror, tryToUpdate, parentInfo);
|
||||
|
||||
// Apply the possible changes for each subtree range recursed to, except the last one.
|
||||
// For each range, the expected next record, if any, is checked against the first boundary
|
||||
|
@@ -4565,25 +4754,103 @@ private:
context.c_str(), m.changesMade, update->toString().c_str());
m.applyUpdate(*slices.back(), m.changesMade ? update->subtreeUpperBound : update->decodeUpperBound);

state bool detachChildren = (parentInfo->count > 2);
state bool forceUpdate = false;

if(!m.changesMade && detachChildren) {
debug_printf("%s Internal page forced rewrite because at least %d children have been updated in-place.\n", context.c_str(), parentInfo->count);
forceUpdate = true;
if(!m.updating) {
page = self->cloneForUpdate(page);
cursor = getCursor(page);
btPage = (BTreePage*)page->begin();
m.btPage = btPage;
m.m = cursor.mirror;
m.updating = true;
}
++g_redwoodMetrics.level(btPage->height).forceUpdate;
}

// If page contents have changed
if (m.changesMade) {
if ((m.empty())) {
if (m.changesMade || forceUpdate) {
if (m.empty()) {
update->cleared();
debug_printf("%s All internal page children were deleted so deleting this page too, returning %s\n",
             context.c_str(), toString(*update).c_str());
self->freeBtreePage(rootID, writeVersion);
self->freeBTreePage(rootID, writeVersion);
self->childUpdateTracker.erase(rootID.front());
} else {
if (m.updating) {
// Page was updated in place
BTreePageIDRef newID = wait(self->updateBtreePage(self, rootID, &update->newLinks.arena(),
// Page was updated in place (or being forced to be updated in place to update child page ids)
debug_printf("%s Internal page modified in-place tryUpdate=%d forceUpdate=%d detachChildren=%d\n", context.c_str(), tryToUpdate, forceUpdate, detachChildren);

if(detachChildren) {
int detached = 0;
cursor.moveFirst();
auto &stats = g_redwoodMetrics.level(btPage->height);
while(cursor.valid()) {
if(cursor.get().value.present()) {
for(auto &p : cursor.get().getChildPage()) {
if(parentInfo->maybeUpdated(p)) {
LogicalPageID newID = self->m_pager->detachRemappedPage(p, writeVersion);
if(newID != invalidLogicalPageID) {
debug_printf("%s Detach updated %u -> %u\n", context.c_str(), p, newID);
p = newID;
++stats.detachChild;
++detached;
}
}
}
}
cursor.moveNext();
}
parentInfo->clear();
if(forceUpdate && detached == 0) {
debug_printf("%s No children detached during forced update, returning %s\n", context.c_str(), toString(*update).c_str());
return Void();
}
}

BTreePageIDRef newID = wait(self->updateBTreePage(self, rootID, &update->newLinks.arena(),
page.castTo<IPage>(), writeVersion));
debug_printf(
"%s commitSubtree(): Internal page updated in-place at version %s, new contents: %s\n", context.c_str(), toString(writeVersion).c_str(),
btPage->toString(false, newID, snapshot->getVersion(), update->decodeLowerBound, update->decodeUpperBound)
.c_str());

update->updatedInPlace(newID, btPage, newID.size() * self->m_blockSize);
debug_printf("%s Internal page updated in-place, returning %s\n", context.c_str(),
             toString(*update).c_str());
} else {
// Page was rebuilt, possibly split.
debug_printf("%s Internal page modified, creating replacements.\n", context.c_str());
debug_printf("%s Internal page could not be modified, rebuilding replacement(s).\n", context.c_str());

if(detachChildren) {
auto &stats = g_redwoodMetrics.level(btPage->height);
for(auto &rec : m.rebuild) {
if(rec.value.present()) {
BTreePageIDRef oldPages = rec.getChildPage();
BTreePageIDRef newPages;
for(int i = 0; i < oldPages.size(); ++i) {
LogicalPageID p = oldPages[i];
if(parentInfo->maybeUpdated(p)) {
LogicalPageID newID = self->m_pager->detachRemappedPage(p, writeVersion);
if(newID != invalidLogicalPageID) {
// Rebuild record values reference original page memory so make a copy
if(newPages.empty()) {
newPages = BTreePageIDRef(m.rebuild.arena(), oldPages);
rec.setChildPage(newPages);
}
debug_printf("%s Detach updated %u -> %u\n", context.c_str(), p, newID);
newPages[i] = newID;
++stats.detachChild;
}
}
}
}
}
parentInfo->clear();
}

Standalone<VectorRef<RedwoodRecordRef>> newChildEntries =
wait(writePages(self, update->subtreeLowerBound, update->subtreeUpperBound, m.rebuild,
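The hunk above is dense, so here is a minimal standalone sketch of the bookkeeping it adds: each internal page keeps a ParentInfo record of which child page IDs may have been updated in place, and once enough children have been touched the parent is force-rewritten so each remapped child can be detached to its new page ID. All names below (ParentInfoModel, PagerModel, the threshold of 2) are hypothetical simplifications for illustration; they are not the real Redwood types or thresholds.

// Simplified model of the child-detach bookkeeping shown in the hunk above.
// ParentInfoModel and PagerModel are made-up stand-ins, not FoundationDB code.
#include <cstdint>
#include <cstdio>
#include <set>
#include <unordered_map>
#include <vector>

using LogicalPageID = uint32_t;
constexpr LogicalPageID invalidLogicalPageID = 0;

// Tracks which child page IDs of an internal page may have been updated in place.
struct ParentInfoModel {
	std::set<LogicalPageID> maybeChanged;
	int count = 0;
	void markUpdated(LogicalPageID id) { maybeChanged.insert(id); ++count; }
	bool maybeUpdated(LogicalPageID id) const { return maybeChanged.count(id) != 0; }
	void clear() { maybeChanged.clear(); count = 0; }
};

// Pretend pager: a child that was remapped can be "detached" to a fresh page ID.
struct PagerModel {
	std::unordered_map<LogicalPageID, LogicalPageID> remapped;
	LogicalPageID detachRemappedPage(LogicalPageID id) {
		auto it = remapped.find(id);
		return it == remapped.end() ? invalidLogicalPageID : it->second;
	}
};

int main() {
	PagerModel pager;
	pager.remapped = { { 10, 110 }, { 11, 111 } };

	ParentInfoModel parentInfo;
	parentInfo.markUpdated(10);
	parentInfo.markUpdated(11);
	parentInfo.markUpdated(12);

	std::vector<LogicalPageID> children = { 10, 11, 12, 13 };

	// Mirrors the shape of the diff: if enough children were updated in place,
	// force a rewrite of the parent so remapped child IDs can be substituted.
	bool detachChildren = parentInfo.count > 2;
	if (detachChildren) {
		int detached = 0;
		for (LogicalPageID& p : children) {
			if (parentInfo.maybeUpdated(p)) {
				LogicalPageID newID = pager.detachRemappedPage(p);
				if (newID != invalidLogicalPageID) {
					printf("Detach updated %u -> %u\n", p, newID);
					p = newID;
					++detached;
				}
			}
		}
		parentInfo.clear();
		printf("Detached %d of %zu children\n", detached, children.size());
	}
	return 0;
}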
@@ -4985,7 +5252,7 @@ public:
bool isValid() const { return valid; }

std::string toString() const {
std::string r;
std::string r = format("{ptr=%p %s ", this, ::toString(pager->getVersion()).c_str());
for (int i = 0; i < path.size(); ++i) {
r += format("[%d/%d: %s] ", i + 1, path.size(),
path[i].cursor.valid() ? path[i].cursor.get().toString(path[i].btPage->isLeaf()).c_str()

@@ -4994,6 +5261,7 @@ public:
if (!valid) {
r += " (invalid) ";
}
r += "}";
return r;
}

@@ -5014,6 +5282,8 @@ public:
const RedwoodRecordRef& upperBound) {
Reference<const IPage>& page = pages[id.front()];
if (page.isValid()) {
// The pager won't see this access so count it as a cache hit
++g_redwoodMetrics.pagerCacheHit;
path.push_back(arena, { (BTreePage*)page->begin(), getCursor(page) });
return Void();
}
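The last hunk above bumps pagerCacheHit by hand because a page served from the cursor's own pinned copy never reaches the pager, so the pager cannot count the hit itself. A small standalone sketch of that accounting pattern, with hypothetical stub types (Metrics, PagerStub, CursorStub) rather than the real Redwood classes:

// Why the caller increments the cache-hit metric: the pager only sees misses.
#include <cstdint>
#include <cstdio>
#include <memory>
#include <string>
#include <unordered_map>

struct Metrics {
	int64_t pagerCacheHit = 0;
	int64_t pagerReads = 0;
} g_metrics;

struct Page { std::string contents; };

struct PagerStub {
	// A real pager would track its own hits and misses internally.
	std::shared_ptr<Page> read(uint32_t id) {
		++g_metrics.pagerReads;
		return std::make_shared<Page>(Page{ "page-" + std::to_string(id) });
	}
};

struct CursorStub {
	PagerStub* pager;
	std::unordered_map<uint32_t, std::shared_ptr<Page>> pinned;

	std::shared_ptr<Page> getPage(uint32_t id) {
		auto& page = pinned[id];
		if (page) {
			// The pager won't see this access, so count it as a cache hit here.
			++g_metrics.pagerCacheHit;
			return page;
		}
		page = pager->read(id);
		return page;
	}
};

int main() {
	PagerStub pager;
	CursorStub cursor{ &pager, {} };
	cursor.getPage(7); // goes to the pager
	cursor.getPage(7); // served from the cursor's own pin, counted manually
	printf("pager reads=%lld, manual cache hits=%lld\n",
	       (long long)g_metrics.pagerReads, (long long)g_metrics.pagerCacheHit);
	return 0;
}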
@@ -6960,24 +7230,23 @@ TEST_CASE("!/redwood/correctness/btree") {
state int pageSize =
shortTest ? 200 : (deterministicRandom()->coinflip() ? 4096 : deterministicRandom()->randomInt(200, 400));

state int64_t targetPageOps = shortTest ? 50000 : 1000000;
state bool pagerMemoryOnly = shortTest && (deterministicRandom()->random01() < .01);
state int maxKeySize = deterministicRandom()->randomInt(1, pageSize * 2);
state int maxValueSize = randomSize(pageSize * 25);
state int maxCommitSize = shortTest ? 1000 : randomSize(std::min<int>((maxKeySize + maxValueSize) * 20000, 10e6));
state int mutationBytesTarget =
shortTest ? 100000 : randomSize(std::min<int>(maxCommitSize * 100, pageSize * 100000));
state double clearProbability = deterministicRandom()->random01() * .1;
state double clearSingleKeyProbability = deterministicRandom()->random01();
state double clearPostSetProbability = deterministicRandom()->random01() * .1;
state double coldStartProbability = pagerMemoryOnly ? 0 : (deterministicRandom()->random01() * 0.3);
state double advanceOldVersionProbability = deterministicRandom()->random01();
state double maxDuration = 60;
state int64_t cacheSizeBytes =
pagerMemoryOnly ? 2e9 : (BUGGIFY ? deterministicRandom()->randomInt(1, 10 * pageSize) : 0);
state Version versionIncrement = deterministicRandom()->randomInt64(1, 1e8);
state Version remapCleanupWindow = deterministicRandom()->randomInt64(0, versionIncrement * 50);

printf("\n");
printf("targetPageOps: %" PRId64 "\n", targetPageOps);
printf("pagerMemoryOnly: %d\n", pagerMemoryOnly);
printf("serialTest: %d\n", serialTest);
printf("shortTest: %d\n", shortTest);

@@ -6985,7 +7254,6 @@ TEST_CASE("!/redwood/correctness/btree") {
printf("maxKeySize: %d\n", maxKeySize);
printf("maxValueSize: %d\n", maxValueSize);
printf("maxCommitSize: %d\n", maxCommitSize);
printf("mutationBytesTarget: %d\n", mutationBytesTarget);
printf("clearProbability: %f\n", clearProbability);
printf("clearSingleKeyProbability: %f\n", clearSingleKeyProbability);
printf("clearPostSetProbability: %f\n", clearPostSetProbability);

@@ -7000,8 +7268,6 @@ TEST_CASE("!/redwood/correctness/btree") {
deleteFile(pagerFile);

printf("Initializing...\n");
state double startTime = now();

pager = new DWALPager(pageSize, pagerFile, cacheSizeBytes, remapCleanupWindow, pagerMemoryOnly);
state VersionedBTree* btree = new VersionedBTree(pager, pagerFile);
wait(btree->init());
@@ -7028,14 +7294,12 @@ TEST_CASE("!/redwood/correctness/btree") {
state PromiseStream<Version> committedVersions;
state Future<Void> verifyTask = verify(btree, committedVersions.getFuture(), &written, &errorCount, serialTest);
state Future<Void> randomTask = serialTest ? Void() : (randomReader(btree) || btree->getError());
committedVersions.send(lastVer);

state Future<Void> commit = Void();
state int64_t totalPageOps = 0;

while (mutationBytes.get() < mutationBytesTarget && (now() - startTime) < maxDuration) {
if (now() - startTime > 600) {
mutationBytesTarget = mutationBytes.get();
}

while (totalPageOps < targetPageOps) {
// Sometimes increment the version
if (deterministicRandom()->random01() < 0.10) {
++version;
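The loop-condition change above swaps a mutation-byte plus wall-clock budget for a fixed budget of pager operations. A compressed sketch of the two loop shapes, using hypothetical counters in place of mutationBytes, totalPageOps, and the Redwood metrics:

// Illustrates only the termination-condition change, not the real workload.
#include <chrono>
#include <cstdint>
#include <cstdio>

int64_t runWorkStep() { return 250; } // pretend each step costs ~250 page ops

int main() {
	using clock = std::chrono::steady_clock;

	// Old shape: stop after enough mutation bytes or too much elapsed time.
	int64_t mutationBytes = 0, mutationBytesTarget = 10000;
	double maxDuration = 1.0; // seconds
	auto start = clock::now();
	while (mutationBytes < mutationBytesTarget &&
	       std::chrono::duration<double>(clock::now() - start).count() < maxDuration) {
		mutationBytes += 500;
	}

	// New shape: stop after a fixed budget of pager operations, so the stopping
	// point no longer depends on how fast the machine happens to run.
	int64_t totalPageOps = 0, targetPageOps = 5000;
	while (totalPageOps < targetPageOps) {
		totalPageOps += runWorkStep();
	}

	printf("bytes loop wrote %lld bytes, ops loop performed %lld ops\n",
	       (long long)mutationBytes, (long long)totalPageOps);
	return 0;
}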
@@ -7131,14 +7395,12 @@ TEST_CASE("!/redwood/correctness/btree") {
}

// Commit at end or after this commit's mutation bytes are reached
if (mutationBytes.get() >= mutationBytesTarget || mutationBytesThisCommit >= mutationBytesTargetThisCommit) {
if (totalPageOps >= targetPageOps || mutationBytesThisCommit >= mutationBytesTargetThisCommit) {
// Wait for previous commit to finish
wait(commit);
printf("Committed. Next commit %d bytes, %" PRId64
"/%d (%.2f%%) Stats: Insert %.2f MB/s ClearedKeys %.2f MB/s Total %.2f\n",
mutationBytesThisCommit, mutationBytes.get(), mutationBytesTarget,
(double)mutationBytes.get() / mutationBytesTarget * 100,
(keyBytesInserted.rate() + valueBytesInserted.rate()) / 1e6, keyBytesCleared.rate() / 1e6,
printf("Committed. Next commit %d bytes, %" PRId64 " bytes.", mutationBytesThisCommit, mutationBytes.get());
printf(" Stats: Insert %.2f MB/s ClearedKeys %.2f MB/s Total %.2f\n",
(keyBytesInserted.rate() + valueBytesInserted.rate()) / 1e6, keyBytesCleared.rate() / 1e6,
mutationBytes.rate() / 1e6);

Version v = version; // Avoid capture of version as a member of *this

@@ -7151,8 +7413,12 @@ TEST_CASE("!/redwood/correctness/btree") {
btree->getOldestVersion() + 1));
}

commit = map(btree->commit(), [=](Void) {
commit = map(btree->commit(), [=,&ops=totalPageOps](Void) {
// Update pager ops before clearing metrics
ops += g_redwoodMetrics.pageOps();
printf("PageOps %" PRId64 "/%" PRId64 " (%.2f%%)\n", ops, targetPageOps, ops * 100.0 / targetPageOps);
printf("Committed:\n%s\n", g_redwoodMetrics.toString(true).c_str());

// Notify the background verifier that version is committed and therefore readable
committedVersions.send(v);
return Void();
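The commit callback above relies on a C++14 init-capture, &ops=totalPageOps, so the lambda copies everything else but writes through a reference to the outer page-op counter. A self-contained illustration of that capture style, with made-up names standing in for the test's variables:

// Copy-capture by default, but bind `ops` as a reference to an outer counter
// so a callback run later can still accumulate into it.
#include <cstdint>
#include <cstdio>
#include <functional>
#include <vector>

int64_t pageOpsThisCommit() { return 123; } // stand-in for g_redwoodMetrics.pageOps()

int main() {
	int64_t totalPageOps = 0;
	int64_t targetPageOps = 1000;
	std::vector<std::function<void()>> pendingCallbacks;

	for (int commit = 0; commit < 3; ++commit) {
		// [=, &ops = totalPageOps]: everything copied, except `ops`, which aliases
		// totalPageOps, so increments persist after the callback runs.
		pendingCallbacks.push_back([=, &ops = totalPageOps]() {
			ops += pageOpsThisCommit();
			printf("PageOps %lld/%lld (%.2f%%)\n", (long long)ops, (long long)targetPageOps,
			       ops * 100.0 / targetPageOps);
		});
	}

	for (auto& cb : pendingCallbacks) cb();
	printf("totalPageOps after callbacks: %lld\n", (long long)totalPageOps);
	return 0;
}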
@@ -7202,6 +7468,7 @@ TEST_CASE("!/redwood/correctness/btree") {
committedVersions = PromiseStream<Version>();
verifyTask = verify(btree, committedVersions.getFuture(), &written, &errorCount, serialTest);
randomTask = randomReader(btree) || btree->getError();
committedVersions.send(v);
}

version += versionIncrement;

@@ -7209,7 +7476,7 @@ TEST_CASE("!/redwood/correctness/btree") {
}

// Check for errors
if (errorCount != 0) throw internal_error();
ASSERT(errorCount == 0);
}

debug_printf("Waiting for outstanding commit\n");

@@ -7220,11 +7487,18 @@ TEST_CASE("!/redwood/correctness/btree") {
wait(verifyTask);

// Check for errors
if (errorCount != 0) throw internal_error();
ASSERT(errorCount == 0);

wait(btree->destroyAndCheckSanity());
// Reopen pager and btree with a remap cleanup window of 0 to reclaim all old pages
state Future<Void> closedFuture = btree->onClosed();
btree->close();
wait(closedFuture);
btree = new VersionedBTree(new DWALPager(pageSize, pagerFile, cacheSizeBytes, 0), pagerFile);
wait(btree->init());

Future<Void> closedFuture = btree->onClosed();
wait(btree->clearAllAndCheckSanity());

closedFuture = btree->onClosed();
btree->close();
debug_printf("Closing.\n");
wait(closedFuture);
@@ -7330,7 +7604,7 @@ TEST_CASE("!/redwood/performance/set") {
state int minValueSize = 100;
state int maxValueSize = 500;
state int minConsecutiveRun = 1;
state int maxConsecutiveRun = 10;
state int maxConsecutiveRun = 100000;
state char firstKeyChar = 'a';
state char lastKeyChar = 'm';
state Version remapCleanupWindow = SERVER_KNOBS->REDWOOD_REMAP_CLEANUP_WINDOW;
@@ -1,11 +1,9 @@
testTitle=StorefrontTest
clearAfterTest=false

testName=Storefront
actorsPerClient=50
itemCount=100000
maxOrderSize=4

testName=SaveAndKill
restartInfoLocation=simfdb/restartInfo.ini
testDuration=10.0
testName=Storefront
actorsPerClient=50
itemCount=100000
maxOrderSize=4
testName=SaveAndKill
restartInfoLocation=simfdb/restartInfo.ini
testDuration=10.0

@@ -1,7 +1,6 @@
testTitle=StorefrontTest
runSetup=false

testName=Storefront
actorsPerClient=50
itemCount=100000
maxOrderSize=4
testName=Storefront
actorsPerClient=50
itemCount=100000
maxOrderSize=4