Merge pull request #3605 from apple/release-6.3
Merge Release 6.3 to master
Commit: fe5902994c
@@ -10,6 +10,7 @@ if (RocksDB_FOUND)
     DOWNLOAD_COMMAND ""
     CMAKE_ARGS -DUSE_RTTI=1 -DPORTABLE=${PORTABLE_ROCKSDB}
                -DCMAKE_CXX_STANDARD=${CMAKE_CXX_STANDARD}
+               -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER}
                -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}
                -DWITH_GFLAGS=OFF
                -DWITH_TESTS=OFF

@@ -36,6 +37,7 @@ else()
     URL_HASH SHA256=d573d2f15cdda883714f7e0bc87b814a8d4a53a82edde558f08f940e905541ee
     CMAKE_ARGS -DUSE_RTTI=1 -DPORTABLE=${PORTABLE_ROCKSDB}
                -DCMAKE_CXX_STANDARD=${CMAKE_CXX_STANDARD}
+               -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER}
                -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}
                -DWITH_GFLAGS=OFF
                -DWITH_TESTS=OFF
@@ -2,6 +2,11 @@
 Release Notes
 #############

+6.3.5
+=====
+
+* Fix an issue where ``fdbcli --exec 'exclude no_wait ...'`` would incorrectly report that processes can safely be removed from the cluster. `(PR #3566) <https://github.com/apple/foundationdb/pull/3566>`_
+
 6.3.4
 =====

@@ -22,10 +22,9 @@ StringRef toStringRef(rocksdb::Slice s) {
     return StringRef(reinterpret_cast<const uint8_t*>(s.data()), s.size());
 }

-rocksdb::Options getOptions(const std::string& path) {
+rocksdb::Options getOptions() {
     rocksdb::Options options;
-    bool exists = directoryExists(path);
-    options.create_if_missing = !exists;
+    options.create_if_missing = true;
     return options;
 }

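Note: `create_if_missing` lets RocksDB handle both cases itself: it creates the database on first open and reuses it on later opens, so the old `directoryExists` probe was redundant. A minimal stand-alone sketch of the flag's behavior (the path is illustrative, not from this PR):

    #include <rocksdb/db.h>
    #include <cassert>

    int main() {
        // First run creates /tmp/rocksdb-demo; later runs open the existing DB.
        rocksdb::Options options;
        options.create_if_missing = true;
        rocksdb::DB* db = nullptr;
        rocksdb::Status status = rocksdb::DB::Open(options, "/tmp/rocksdb-demo", &db);
        assert(status.ok());
        delete db;
        return 0;
    }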
@@ -71,7 +70,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
         std::vector<rocksdb::ColumnFamilyDescriptor> defaultCF = { rocksdb::ColumnFamilyDescriptor{
             "default", getCFOptions() } };
         std::vector<rocksdb::ColumnFamilyHandle*> handle;
-        auto status = rocksdb::DB::Open(getOptions(a.path), a.path, defaultCF, &handle, &db);
+        auto status = rocksdb::DB::Open(getOptions(), a.path, defaultCF, &handle, &db);
         if (!status.ok()) {
             TraceEvent(SevError, "RocksDBError").detail("Error", status.ToString()).detail("Method", "Open");
             a.done.sendError(statusToError(status));
@@ -112,7 +111,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
         if (a.deleteOnClose) {
             std::vector<rocksdb::ColumnFamilyDescriptor> defaultCF = { rocksdb::ColumnFamilyDescriptor{
                 "default", getCFOptions() } };
-            rocksdb::DestroyDB(a.path, getOptions(a.path), defaultCF);
+            rocksdb::DestroyDB(a.path, getOptions(), defaultCF);
         }
         a.done.send(Void());
     }
@@ -121,7 +120,6 @@ struct RocksDBKeyValueStore : IKeyValueStore {
     struct Reader : IThreadPoolReceiver {
         DB& db;
         rocksdb::ReadOptions readOptions;
-        std::unique_ptr<rocksdb::Iterator> cursor = nullptr;

         explicit Reader(DB& db) : db(db) {}

@@ -197,11 +195,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
         virtual double getTimeEstimate() { return SERVER_KNOBS->READ_RANGE_TIME_ESTIMATE; }
     };
     void action(ReadRangeAction& a) {
-        if (cursor == nullptr) {
-            cursor = std::unique_ptr<rocksdb::Iterator>(db->NewIterator(readOptions));
-        } else {
-            cursor->Refresh();
-        }
+        auto cursor = std::unique_ptr<rocksdb::Iterator>(db->NewIterator(readOptions));
         Standalone<RangeResultRef> result;
         int accumulatedBytes = 0;
         if (a.rowLimit >= 0) {
@@ -231,7 +225,8 @@ struct RocksDBKeyValueStore : IKeyValueStore {
         if (!s.ok()) {
             TraceEvent(SevError, "RocksDBError").detail("Error", s.ToString()).detail("Method", "ReadRange");
         }
-        result.more = (result.size() == a.rowLimit);
+        result.more =
+            (result.size() == a.rowLimit) || (result.size() == -a.rowLimit) || (accumulatedBytes >= a.byteLimit);
         if (result.more) {
             result.readThrough = result[result.size()-1].key;
         }
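Note: the old condition only flagged forward scans that hit the row limit, so reverse scans (negative `rowLimit`) and byte-limited scans could be truncated without `more` being set. A hypothetical helper distilling the fixed rule (names are illustrative, not from the PR):

    #include <cstdlib>

    // A scan is incomplete ("more") when it stopped because it exhausted the
    // row budget (positive limit for forward scans, negative for reverse) or
    // the byte budget, rather than because it ran out of matching keys.
    bool scanHasMore(int rowsReturned, int rowLimit, int accumulatedBytes, int byteLimit) {
        return rowsReturned == std::abs(rowLimit) || accumulatedBytes >= byteLimit;
    }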
@@ -242,7 +237,6 @@ struct RocksDBKeyValueStore : IKeyValueStore {
     DB db = nullptr;
     std::string path;
     UID id;
-    size_t diskBytesUsed = 0;
     Reference<IThreadPool> writeThread;
     Reference<IThreadPool> readThreads;
     unsigned nReaders = 16;
@@ -352,9 +346,13 @@ struct RocksDBKeyValueStore : IKeyValueStore {
         int64_t free;
         int64_t total;

+        uint64_t sstBytes = 0;
+        ASSERT(db->GetIntProperty(rocksdb::DB::Properties::kTotalSstFilesSize, &sstBytes));
+        uint64_t memtableBytes = 0;
+        ASSERT(db->GetIntProperty(rocksdb::DB::Properties::kSizeAllMemTables, &memtableBytes));
         g_network->getDiskBytes(path, free, total);

-        return StorageBytes(free, total, diskBytesUsed, free);
+        return StorageBytes(free, total, sstBytes + memtableBytes, free);
     }
 };

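Note: `kTotalSstFilesSize` (on-disk SST bytes) and `kSizeAllMemTables` (in-memory write buffers) are standard RocksDB integer properties; their sum replaces the removed `diskBytesUsed` member as the store's used-bytes estimate. A stand-alone sketch of the same queries (the path is illustrative):

    #include <rocksdb/db.h>
    #include <cstdint>
    #include <cstdio>

    int main() {
        rocksdb::Options options;
        options.create_if_missing = true;
        rocksdb::DB* db = nullptr;
        if (!rocksdb::DB::Open(options, "/tmp/rocksdb-demo", &db).ok()) return 1;
        uint64_t sstBytes = 0, memtableBytes = 0;
        // GetIntProperty returns false if the property name is unknown.
        db->GetIntProperty(rocksdb::DB::Properties::kTotalSstFilesSize, &sstBytes);
        db->GetIntProperty(rocksdb::DB::Properties::kSizeAllMemTables, &memtableBytes);
        std::printf("approximate used bytes: %llu\n",
                    (unsigned long long)(sstBytes + memtableBytes));
        delete db;
        return 0;
    }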
@@ -611,7 +611,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
     init( FASTRESTORE_SAMPLING_PERCENT, 100 ); if( randomize && BUGGIFY ) { FASTRESTORE_SAMPLING_PERCENT = deterministicRandom()->random01() * 100; }
     init( FASTRESTORE_NUM_LOADERS, 2 ); if( randomize && BUGGIFY ) { FASTRESTORE_NUM_LOADERS = deterministicRandom()->random01() * 10 + 1; }
     init( FASTRESTORE_NUM_APPLIERS, 3 ); if( randomize && BUGGIFY ) { FASTRESTORE_NUM_APPLIERS = deterministicRandom()->random01() * 10 + 1; }
-    init( FASTRESTORE_TXN_BATCH_MAX_BYTES, 1048576.0 ); if( randomize && BUGGIFY ) { FASTRESTORE_TXN_BATCH_MAX_BYTES = deterministicRandom()->random01() * 1024.0 * 1024.0 + 1.0; }
+    init( FASTRESTORE_TXN_BATCH_MAX_BYTES, 1024.0 ); if( randomize && BUGGIFY ) { FASTRESTORE_TXN_BATCH_MAX_BYTES = deterministicRandom()->random01() * 1024.0 * 1024.0 + 1.0; }
     init( FASTRESTORE_VERSIONBATCH_MAX_BYTES, 2.0 * 1024.0 * 1024.0 ); if( randomize && BUGGIFY ) { FASTRESTORE_VERSIONBATCH_MAX_BYTES = deterministicRandom()->random01() * 10.0 * 1024.0 * 1024.0 * 1024.0; }
     init( FASTRESTORE_VB_PARALLELISM, 5 ); if( randomize && BUGGIFY ) { FASTRESTORE_VB_PARALLELISM = deterministicRandom()->random01() * 20 + 1; }
     init( FASTRESTORE_VB_MONITOR_DELAY, 30 ); if( randomize && BUGGIFY ) { FASTRESTORE_VB_MONITOR_DELAY = deterministicRandom()->random01() * 20 + 1; }
@@ -636,6 +636,9 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
     init( FASTRESTORE_TXN_CLEAR_MAX, 100 ); if( randomize && BUGGIFY ) { FASTRESTORE_TXN_CLEAR_MAX = deterministicRandom()->random01() * 100 + 1; }
     init( FASTRESTORE_TXN_RETRY_MAX, 10 ); if( randomize && BUGGIFY ) { FASTRESTORE_TXN_RETRY_MAX = deterministicRandom()->random01() * 100 + 1; }
     init( FASTRESTORE_TXN_EXTRA_DELAY, 0.1 ); if( randomize && BUGGIFY ) { FASTRESTORE_TXN_EXTRA_DELAY = deterministicRandom()->random01() * 1 + 0.001;}
+    init( FASTRESTORE_NOT_WRITE_DB, false ); // Perf test only: set it to true will cause simulation failure
+    init( FASTRESTORE_USE_RANGE_FILE, true ); // Perf test only: set it to false will cause simulation failure
+    init( FASTRESTORE_USE_LOG_FILE, true ); // Perf test only: set it to false will cause simulation failure

     init( REDWOOD_DEFAULT_PAGE_SIZE, 4096 );
     init( REDWOOD_KVSTORE_CONCURRENT_READS, 64 );
@@ -568,6 +568,9 @@ public:
     int FASTRESTORE_TXN_CLEAR_MAX; // threshold to start tracking each clear op in a txn
     int FASTRESTORE_TXN_RETRY_MAX; // threshold to start output error on too many retries
     double FASTRESTORE_TXN_EXTRA_DELAY; // extra delay to avoid overwhelming fdb
+    bool FASTRESTORE_NOT_WRITE_DB; // do not write result to DB. Only for dev testing
+    bool FASTRESTORE_USE_RANGE_FILE; // use range file in backup
+    bool FASTRESTORE_USE_LOG_FILE; // use log file in backup

     int REDWOOD_DEFAULT_PAGE_SIZE; // Page size for new Redwood files
     int REDWOOD_KVSTORE_CONCURRENT_READS; // Max number of simultaneous point or range reads in progress.

@@ -579,7 +582,7 @@ public:
     int64_t REDWOOD_REMAP_CLEANUP_WINDOW; // Remap remover lag interval in which to coalesce page writes
     double REDWOOD_REMAP_CLEANUP_LAG; // Maximum allowed remap remover lag behind the cleanup window as a multiple of the window size
     double REDWOOD_LOGGING_INTERVAL;

     // Server request latency measurement
     int LATENCY_SAMPLE_SIZE;
     double LATENCY_METRICS_LOGGING_INTERVAL;
@@ -34,6 +34,7 @@
 #include "fdbserver/RestoreApplier.actor.h"

 #include "flow/actorcompiler.h" // This must be the last #include.
+#include "flow/network.h"

 ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMutationsRequest req,
                                                           Reference<RestoreApplierData> self);
@@ -99,7 +100,7 @@ ACTOR Future<Void> restoreApplierCore(RestoreApplierInterface applierInterf, int
     return Void();
 }

-// The actor may be invovked multiple times and executed async.
+// The actor may be invoked multiple times and executed async.
 // No race condition as long as we do not wait or yield when operate the shared
 // data. Multiple such actors can run on different fileIDs.
 // Different files may contain mutations of the same commit versions, but with
@@ -146,6 +147,7 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu
             .detail("Version", versionedMutation.version.toString())
             .detail("Index", mIndex)
             .detail("MutationReceived", versionedMutation.mutation.toString());
+        batchData->receivedBytes += versionedMutation.mutation.totalSize();
         batchData->counters.receivedBytes += versionedMutation.mutation.totalSize();
         batchData->counters.receivedWeightedBytes +=
             versionedMutation.mutation.weightedTotalSize(); // atomicOp will be amplified
@@ -187,6 +189,14 @@ ACTOR static Future<Void> applyClearRangeMutations(Standalone<VectorRef<KeyRange
         .detail("BatchIndex", batchIndex)
         .detail("Ranges", ranges.size())
         .detail("DelayTime", delayTime);
+    if (SERVER_KNOBS->FASTRESTORE_NOT_WRITE_DB) {
+        TraceEvent("FastRestoreApplierClearRangeMutationsNotWriteDB", applierID)
+            .detail("BatchIndex", batchIndex)
+            .detail("Ranges", ranges.size());
+        ASSERT(!g_network->isSimulated());
+        return Void();
+    }
+
     loop {
         try {
             tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
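Note: the `FASTRESTORE_NOT_WRITE_DB` gates added in this file all follow the same short-circuit shape: trace the skip, assert the run is not a simulation (simulation workloads would fail their correctness checks without real writes), and return before any transaction starts. A self-contained sketch of the pattern with illustrative names:

    #include <cassert>
    #include <cstdio>

    struct TestKnobs {
        bool notWriteDB = false; // stand-in for FASTRESTORE_NOT_WRITE_DB
    };
    TestKnobs knobs;

    bool isSimulated() { return false; } // stand-in for g_network->isSimulated()

    void applyBatch(int batchIndex) {
        if (knobs.notWriteDB) {
            assert(!isSimulated()); // perf-test mode must not run in simulation
            std::printf("batch %d: writes skipped (perf-test mode)\n", batchIndex);
            return; // short-circuit before touching the database
        }
        // ... real transaction retry loop would run here ...
    }

    int main() {
        knobs.notWriteDB = true;
        applyBatch(1);
        return 0;
    }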
@@ -232,6 +242,24 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
     state UID randomID = deterministicRandom()->randomUniqueID();

     wait(delay(delayTime + deterministicRandom()->random01() * delayTime));
+
+    if (SERVER_KNOBS->FASTRESTORE_NOT_WRITE_DB) { // Get dummy value to short-circut DB
+        TraceEvent("FastRestoreApplierGetAndComputeStagingKeysStartNotUseDB", applierID)
+            .detail("RandomUID", randomID)
+            .detail("BatchIndex", batchIndex)
+            .detail("GetKeys", incompleteStagingKeys.size())
+            .detail("DelayTime", delayTime);
+        ASSERT(!g_network->isSimulated());
+        int i = 0;
+        for (auto& key : incompleteStagingKeys) {
+            MutationRef m(MutationRef::SetValue, key.first, LiteralStringRef("0"));
+            key.second->second.add(m, LogMessageVersion(1));
+            key.second->second.precomputeResult("GetAndComputeStagingKeys", applierID, batchIndex);
+            i++;
+        }
+        return Void();
+    }
+
     TraceEvent("FastRestoreApplierGetAndComputeStagingKeysStart", applierID)
         .detail("RandomUID", randomID)
         .detail("BatchIndex", batchIndex)
@@ -417,6 +445,11 @@ ACTOR static Future<Void> applyStagingKeysBatch(std::map<Key, StagingKey>::itera
                                                 std::map<Key, StagingKey>::iterator end, Database cx,
                                                 FlowLock* applyStagingKeysBatchLock, UID applierID,
                                                 ApplierBatchData::Counters* cc) {
+    if (SERVER_KNOBS->FASTRESTORE_NOT_WRITE_DB) {
+        TraceEvent("FastRestoreApplierPhaseApplyStagingKeysBatchSkipped", applierID).detail("Begin", begin->first);
+        ASSERT(!g_network->isSimulated());
+        return Void();
+    }
     wait(applyStagingKeysBatchLock->take(TaskPriority::RestoreApplierWriteDB)); // Q: Do we really need the lock?
     state FlowLock::Releaser releaser(*applyStagingKeysBatchLock);
     state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
@@ -494,6 +527,7 @@ ACTOR static Future<Void> applyStagingKeys(Reference<ApplierBatchData> batchData
             fBatches.push_back(applyStagingKeysBatch(begin, cur, cx, &batchData->applyStagingKeysBatchLock, applierID,
                                                      &batchData->counters));
             batchData->counters.appliedBytes += txnSize;
+            batchData->appliedBytes += txnSize;
             begin = cur;
             txnSize = 0;
             txnBatches++;

@@ -504,6 +538,7 @@ ACTOR static Future<Void> applyStagingKeys(Reference<ApplierBatchData> batchData
         fBatches.push_back(applyStagingKeysBatch(begin, cur, cx, &batchData->applyStagingKeysBatchLock, applierID,
                                                  &batchData->counters));
         batchData->counters.appliedBytes += txnSize;
+        batchData->appliedBytes += txnSize;
         txnBatches++;
     }

@@ -252,6 +252,10 @@ struct ApplierBatchData : public ReferenceCounted<ApplierBatchData> {

     long receiveMutationReqs;

+    // Stats
+    double receivedBytes;
+    double appliedBytes;
+
     // Status counters
     struct Counters {
         CounterCollection cc;
@@ -371,7 +375,7 @@ struct RestoreApplierData : RestoreRoleData, public ReferenceCounted<RestoreAppl
     // even when no version batch has been started.
     int getVersionBatchState(int batchIndex) final {
         std::map<int, Reference<ApplierBatchData>>::iterator item = batch.find(batchIndex);
-        if (item == batch.end()) { // Simply caller's effort in when it can call this func.
+        if (item == batch.end()) { // Batch has not been initialized when we blindly profile the state
             return ApplierVersionBatchState::INVALID;
         } else {
             return item->second->vbState.get();
@@ -699,7 +699,9 @@ ACTOR static Future<Version> collectBackupFiles(Reference<IBackupContainer> bc,

     TraceEvent("FastRestoreControllerPhaseCollectBackupFilesStart")
         .detail("TargetVersion", request.targetVersion)
-        .detail("BackupDesc", desc.toString());
+        .detail("BackupDesc", desc.toString())
+        .detail("UseRangeFile", SERVER_KNOBS->FASTRESTORE_USE_RANGE_FILE)
+        .detail("UseLogFile", SERVER_KNOBS->FASTRESTORE_USE_LOG_FILE);
     if (g_network->isSimulated()) {
         std::cout << "Restore to version: " << request.targetVersion << "\nBackupDesc: \n" << desc.toString() << "\n\n";
     }
@@ -720,29 +722,37 @@ ACTOR static Future<Version> collectBackupFiles(Reference<IBackupContainer> bc,
     double rangeSize = 0;
     double logSize = 0;
     *minRangeVersion = MAX_VERSION;
-    for (const RangeFile& f : restorable.get().ranges) {
-        TraceEvent(SevFRDebugInfo, "FastRestoreControllerPhaseCollectBackupFiles").detail("RangeFile", f.toString());
-        if (f.fileSize <= 0) {
-            continue;
+    if (SERVER_KNOBS->FASTRESTORE_USE_RANGE_FILE) {
+        for (const RangeFile& f : restorable.get().ranges) {
+            TraceEvent(SevFRDebugInfo, "FastRestoreControllerPhaseCollectBackupFiles")
+                .detail("RangeFile", f.toString());
+            if (f.fileSize <= 0) {
+                continue;
+            }
+            RestoreFileFR file(f);
+            TraceEvent(SevFRDebugInfo, "FastRestoreControllerPhaseCollectBackupFiles")
+                .detail("RangeFileFR", file.toString());
+            uniqueRangeFiles.insert(file);
+            rangeSize += file.fileSize;
+            *minRangeVersion = std::min(*minRangeVersion, file.version);
         }
-        RestoreFileFR file(f);
-        TraceEvent(SevFRDebugInfo, "FastRestoreControllerPhaseCollectBackupFiles")
-            .detail("RangeFileFR", file.toString());
-        uniqueRangeFiles.insert(file);
-        rangeSize += file.fileSize;
-        *minRangeVersion = std::min(*minRangeVersion, file.version);
     }
-    for (const LogFile& f : restorable.get().logs) {
-        TraceEvent(SevFRDebugInfo, "FastRestoreControllerPhaseCollectBackupFiles").detail("LogFile", f.toString());
-        if (f.fileSize <= 0) {
-            continue;
+    if (SERVER_KNOBS->FASTRESTORE_USE_LOG_FILE) {
+        for (const LogFile& f : restorable.get().logs) {
+            TraceEvent(SevFRDebugInfo, "FastRestoreControllerPhaseCollectBackupFiles").detail("LogFile", f.toString());
+            if (f.fileSize <= 0) {
+                continue;
+            }
+            RestoreFileFR file(f);
+            TraceEvent(SevFRDebugInfo, "FastRestoreControllerPhaseCollectBackupFiles")
+                .detail("LogFileFR", file.toString());
+            logFiles->push_back(file);
+            uniqueLogFiles.insert(file);
+            logSize += file.fileSize;
         }
-        RestoreFileFR file(f);
-        TraceEvent(SevFRDebugInfo, "FastRestoreControllerPhaseCollectBackupFiles").detail("LogFileFR", file.toString());
-        logFiles->push_back(file);
-        uniqueLogFiles.insert(file);
-        logSize += file.fileSize;
     }

     // Assign unique range files and log files to output
     rangeFiles->assign(uniqueRangeFiles.begin(), uniqueRangeFiles.end());
     logFiles->assign(uniqueLogFiles.begin(), uniqueLogFiles.end());
@@ -752,7 +762,9 @@ ACTOR static Future<Version> collectBackupFiles(Reference<IBackupContainer> bc,
         .detail("RangeFiles", rangeFiles->size())
         .detail("LogFiles", logFiles->size())
         .detail("RangeFileBytes", rangeSize)
-        .detail("LogFileBytes", logSize);
+        .detail("LogFileBytes", logSize)
+        .detail("UseRangeFile", SERVER_KNOBS->FASTRESTORE_USE_RANGE_FILE)
+        .detail("UseLogFile", SERVER_KNOBS->FASTRESTORE_USE_LOG_FILE);
     return request.targetVersion;
 }

@@ -847,7 +859,7 @@ ACTOR static Future<Void> initializeVersionBatch(std::map<UID, RestoreApplierInt
     }
     wait(sendBatchRequests(&RestoreLoaderInterface::initVersionBatch, loadersInterf, requestsToLoaders));

-    TraceEvent("FastRestoreControllerPhaseInitVersionBatchForLoadersDone").detail("BatchIndex", batchIndex);
+    TraceEvent("FastRestoreControllerPhaseInitVersionBatchForAppliersDone").detail("BatchIndex", batchIndex);
     return Void();
 }

@@ -183,6 +183,10 @@ struct RestoreControllerData : RestoreRoleData, public ReferenceCounted<RestoreC

     void dumpVersionBatches(const std::map<Version, VersionBatch>& versionBatches) {
         int i = 1;
+        double rangeFiles = 0;
+        double rangeSize = 0;
+        double logFiles = 0;
+        double logSize = 0;
         for (auto& vb : versionBatches) {
             TraceEvent("FastRestoreVersionBatches")
                 .detail("BatchIndex", vb.second.batchIndex)
@@ -196,15 +200,25 @@ struct RestoreControllerData : RestoreRoleData, public ReferenceCounted<RestoreC
             TraceEvent(invalidVersion ? SevError : SevInfo, "FastRestoreVersionBatches")
                 .detail("BatchIndex", i)
                 .detail("RangeFile", f.toString());
+            rangeSize += f.fileSize;
+            rangeFiles++;
         }
         for (auto& f : vb.second.logFiles) {
             bool outOfRange = (f.beginVersion >= vb.second.endVersion || f.endVersion <= vb.second.beginVersion);
             TraceEvent(outOfRange ? SevError : SevInfo, "FastRestoreVersionBatches")
                 .detail("BatchIndex", i)
                 .detail("LogFile", f.toString());
+            logSize += f.fileSize;
+            logFiles++;
         }
         ++i;
     }

+    TraceEvent("FastRestoreVersionBatchesSummary")
+        .detail("LogFiles", logFiles)
+        .detail("RangeFiles", rangeFiles)
+        .detail("LogBytes", logSize)
+        .detail("RangeBytes", rangeSize);
 }

 // Input: Get the size of data in backup files in version range [prevVersion, nextVersion)
@@ -801,7 +801,7 @@ void _parseSerializedMutation(KeyRangeMap<Version>* pRangeVersions,
         }
     }

-    cc->sampledLogBytes += mutation.totalSize();
+    cc->loadedLogBytes += mutation.totalSize();

     TraceEvent(SevFRMutationInfo, "FastRestoreDecodeLogFile")
         .detail("CommitVersion", commitVersion)
@@ -814,6 +814,7 @@ void _parseSerializedMutation(KeyRangeMap<Version>* pRangeVersions,

     // Sampling (FASTRESTORE_SAMPLING_PERCENT%) data
     if (deterministicRandom()->random01() * 100 < SERVER_KNOBS->FASTRESTORE_SAMPLING_PERCENT) {
+        cc->sampledLogBytes += mutation.totalSize();
         samples.push_back_deep(samples.arena(), mutation);
     }
     ASSERT_WE_THINK(kLen >= 0 && kLen < val.size());
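Note: together with the previous hunk, this moves the byte accounting so that `loadedLogBytes` counts every parsed mutation while `sampledLogBytes` counts only mutations that pass the sampling check. A sketch of the sampling rule itself, with `std::mt19937` standing in for `deterministicRandom()`:

    #include <random>

    // A mutation is sampled when a uniform draw in [0,1) scaled to [0,100)
    // falls below the sampling-percentage knob.
    bool shouldSample(std::mt19937& rng, double samplingPercent) {
        std::uniform_real_distribution<double> uniform(0.0, 1.0);
        return uniform(rng) * 100.0 < samplingPercent;
    }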
@@ -942,7 +943,7 @@ ACTOR static Future<Void> _parseLogFileToMutationsOnLoader(NotifiedVersion* pPro

     if (pProcessedFileOffset->get() == asset.offset) {
         for (const KeyValueRef& kv : data) {
-            // Concatenate the backuped param1 and param2 (KV) at the same version.
+            // Concatenate the backup param1 and param2 (KV) at the same version.
             concatenateBackupMutationForLogFile(pMutationMap, kv.key, kv.value, asset);
         }
         pProcessedFileOffset->set(asset.offset + asset.len);
@@ -158,7 +158,7 @@ struct RestoreLoaderData : RestoreRoleData, public ReferenceCounted<RestoreLoade

     int getVersionBatchState(int batchIndex) final {
         std::map<int, Reference<LoaderBatchData>>::iterator item = batch.find(batchIndex);
-        if (item != batch.end()) { // Simply caller's effort in when it can call this func.
+        if (item == batch.end()) { // Batch has not been initialized when we blindly profile the state
             return LoaderVersionBatchState::INVALID;
         } else {
             return item->second->vbState.get();
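Note: this is the substantive fix in the hunk above. The old `!=` test returned INVALID for batches that exist and fell through to dereference `end()` for batches that do not, which is undefined behavior. A sketch of the corrected lookup shape (illustrative types, not the PR's):

    #include <map>

    // Return the sentinel only when the key is absent; dereference the
    // iterator only after confirming the lookup succeeded.
    int getBatchState(const std::map<int, int>& batch, int batchIndex, int invalidState) {
        auto item = batch.find(batchIndex);
        if (item == batch.end()) {
            return invalidState;
        }
        return item->second;
    }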
@@ -151,6 +151,7 @@ ACTOR Future<Void> traceProcessMetrics(Reference<RestoreRoleData> self, std::str
         .detail("Role", role)
         .detail("PipelinedMaxVersionBatchIndex", self->versionBatchId.get())
         .detail("FinishedVersionBatchIndex", self->finishedBatch.get())
+        .detail("CurrentVersionBatchPhase", self->getVersionBatchState(self->finishedBatch.get() + 1))
         .detail("CpuUsage", self->cpuUsage)
         .detail("UsedMemory", self->memory)
         .detail("ResidentMemory", self->residentMemory);