Merge branch 'master' of github.com:apple/foundationdb into features/flatbuffers-debugtx
This commit is contained in:
commit 040e62341a
@@ -1,10 +1,10 @@
## FDB Backup Data Format

### Introduction

This document describes the data format of the files generated by the FoundationDB (FDB) backup procedure.

The target readers who may benefit from reading this document are:

* those who make changes to the current backup or restore procedure;
* those who write tools to digest the backup data for analytical purposes;
* those who want to understand the internals of how backup and restore work.

The description of the backup data format is based on FDB 5.2 to FDB 6.1. The backup data format may (although unlikely) change after FDB 6.1.
@@ -12,27 +12,27 @@ The description of the backup data format is based on FDB 5.2 to FDB 6.1. The ba

### Files generated by backup

The backup procedure generates two types of files: range files and log files.

* A range file describes key-value pairs in a range at the version when the backup process takes a snapshot of the range. Different range files have data for different ranges at different versions.

* A log file describes the mutations taken from a version v<sub>1</sub> to v<sub>2</sub> during the backup procedure.

With the key-value pairs in range files and the mutations in log files, the restore procedure can restore the database into a consistent state at a user-provided version v<sub>k</sub>, if the backup data is claimed by the restore as restorable at v<sub>k</sub>. (The details of determining whether a set of backup data is restorable at a version are out of the scope of this document and can be found at [backup.md](https://github.com/xumengpanda/foundationdb/blob/cd873831ecd18653c5bf459d6f72d14a99b619c4/design/backup.md).)

### Filename conventions

The backup files are saved in a directory (i.e., URL) specified by users. Under the directory, the range files are in the `snapshots` folder and the log files are in the `logs` folder.

The convention of the range filename is `snapshots/snapshot,beginVersion,beginVersion,blockSize`, where `beginVersion` is the version when the key-values in the range file were recorded, and `blockSize` is the size of the data blocks in the range file.

The convention of the log filename is `logs/versionPrefix/log,beginVersion,endVersion,randomUID,blockSize`, where `versionPrefix` is a 2-level path (`x/y`) under which `beginVersion` should go, such that `x/y/*` contains (10^smallestBucket) possible versions; `randomUID` is a random UID; `beginVersion` and `endVersion` are the version range (left inclusive, right exclusive) in which the mutations were recorded; and `blockSize` is the data block size in the log file.

We will use an example to explain what each field in the range and log filenames means.

Suppose under the backup directory we have a range file `snapshots/snapshot,78994177,78994177,97` and a log file `logs/0000/0000/log,78655645,98655645,149a0bdfedecafa2f648219d5eba816e,1048576`.

The range file's filename tells us that all key-value pairs decoded from the file are the KV values in the DB at version `78994177`. The data block size is `97` bytes.

The log file's filename tells us that the mutations in the log file were the mutations in the DB during the version range `[78655645,98655645)`, and the data block size is `1048576` bytes.

### Data format in a range file

A range file can have one to many data blocks. Each data block has a set of key-value pairs.

A data block is encoded as follows: `Header startKey k1v1 k2v2 Padding`.
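As a rough illustration of the naming convention above, the comma-separated fields of a range filename can be pulled apart as in the sketch below. `parseRangeFileName` is a hypothetical helper written for this document, not part of the FDB codebase.

```cpp
// Hypothetical helper for illustration only: split a range filename of the
// form "snapshot,beginVersion,beginVersion,blockSize" into its fields.
#include <cassert>
#include <cstdint>
#include <sstream>
#include <string>
#include <vector>

struct RangeFileName {
    int64_t beginVersion = 0;
    int64_t blockSize = 0;
};

inline RangeFileName parseRangeFileName(const std::string& name) {
    std::stringstream ss(name);
    std::string field;
    std::vector<std::string> fields;
    while (std::getline(ss, field, ',')) fields.push_back(field);
    // fields[0] == "snapshot", fields[1] == fields[2] == beginVersion, fields[3] == blockSize
    RangeFileName r;
    r.beginVersion = std::stoll(fields[1]);
    r.blockSize = std::stoll(fields[3]);
    return r;
}
```

Applied to the example above, `snapshot,78994177,78994177,97` yields `beginVersion == 78994177` and `blockSize == 97`.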
@@ -44,7 +44,7 @@ A data block is encoded as follows: `Header startKey k1v1 k2v2 Padding`.

    H = header  P = padding  a...z = keys  v = value  | = block boundary

    Encoded file: H a cv dv P | H e ev fv gv hv P | H h hv iv jv z
    Decoded in blocks yields:
    Block 1: range [a, e) with kv pairs cv, dv
    Block 2: range [e, h) with kv pairs ev, fv, gv
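The block boundaries in the example above can be sketched as code: a block's key range runs from its own begin key to the next block's begin key, and the final block is closed by the file's end key. `blockRanges` is a hypothetical illustration, not the FDB decoder.

```cpp
// Hypothetical sketch: given the begin key of each decoded block, block i
// covers [beginKeys[i], beginKeys[i+1]); the last block is closed by the
// file's end key.
#include <cassert>
#include <string>
#include <utility>
#include <vector>

inline std::vector<std::pair<std::string, std::string>> blockRanges(
    const std::vector<std::string>& blockBeginKeys, const std::string& fileEndKey) {
    std::vector<std::pair<std::string, std::string>> ranges;
    for (size_t i = 0; i < blockBeginKeys.size(); ++i) {
        const std::string end = (i + 1 < blockBeginKeys.size()) ? blockBeginKeys[i + 1] : fileEndKey;
        ranges.push_back({ blockBeginKeys[i], end });
    }
    return ranges;
}
```

For begin keys `a`, `e`, `h` and end key `z` this reproduces the ranges `[a, e)`, `[e, h)`, `[h, z)` from the example.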
@@ -58,19 +58,19 @@ The code that decodes a range block is in `ACTOR Future<Standalone<VectorRef<Key

### Data format in a log file

A log file can have one to many data blocks.
Each block is encoded as `Header, [Param1, Param2]... padding`.
The first 32 bits of `Param1` and `Param2` specify the lengths of `Param1` and `Param2`.
`Param1` specifies the version when the mutations happened;
`Param2` encodes the group of mutations that happened at that version.

Note that if the group of mutations is bigger than the block size, the mutation group will be split across multiple data blocks.
For example, we may get `[Param1, Param2_part0]`, `[Param1, Param2_part1]`. By concatenating `Param2_part0` and `Param2_part1`, we can get the group of all mutations that happened at the version specified in `Param1`.

The encoding format for `Param1` is as follows:
`hashValue|commitVersion|part`,
where `hashValue` is the hash of the commitVersion, `commitVersion` is the version when the mutations in the `Param2`(s) were taken, and `part` is the part number in case we need to concatenate the `Param2`s to get the group of all mutations.
`hashValue` takes 8 bits, `commitVersion` takes 64 bits, and `part` takes 32 bits.

Note that when concatenating the partial groups of mutations in the `Param2`s to get the full group of all mutations, the part numbers should be continuous.
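Given the stated bit widths, `Param1` can be decoded as sketched below, assuming the three fields are laid out back to back with multi-byte fields stored big endian (see the Endianness section). `decodeParam1` and the sample buffer are illustrative assumptions, not the actual FDB decoder.

```cpp
// Hypothetical sketch: decode Param1 = hashValue (8 bits) | commitVersion
// (64 bits) | part (32 bits), with multi-byte fields stored big endian.
#include <cassert>
#include <cstdint>

struct Param1 {
    uint8_t hashValue;
    uint64_t commitVersion;
    uint32_t part;
};

inline uint64_t readBigEndian(const uint8_t* p, int nBytes) {
    uint64_t v = 0;
    for (int i = 0; i < nBytes; ++i) v = (v << 8) | p[i]; // most significant byte first
    return v;
}

inline Param1 decodeParam1(const uint8_t* data) {
    Param1 r;
    r.hashValue = data[0];                                      // 1 byte
    r.commitVersion = readBigEndian(data + 1, 8);               // 8 bytes
    r.part = static_cast<uint32_t>(readBigEndian(data + 9, 4)); // 4 bytes
    return r;
}

// Sample 13-byte buffer: hash 0xAB, commitVersion 42, part 7.
static const uint8_t kSampleParam1[13] = { 0xAB, 0, 0, 0, 0, 0, 0, 0, 42, 0, 0, 0, 7 };
```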
@@ -80,12 +80,12 @@ The `encoded_mutation_i` is encoded as follows

`type|kLen|vLen|Key|Value`

where `type` is the mutation type, such as Set or Clear; `kLen` and `vLen` are the lengths of the key and value in the mutation, respectively; and `Key` and `Value` are the serialized values of the key and value in the mutation.

The code related to how a log file is written is in the `struct LogFileWriter` in `namespace fileBackup`.

The code that decodes a mutation block is in `ACTOR Future<Standalone<VectorRef<KeyValueRef>>> decodeLogFileBlock(Reference<IAsyncFile> file, int64_t offset, int len)`.
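A single mutation in that layout could be decoded as sketched below. The assumption that `type`, `kLen`, and `vLen` are 32-bit big-endian integers followed by the raw key and value bytes is ours for illustration; `decodeMutation` and the sample buffer are hypothetical, not the FDB implementation.

```cpp
// Hypothetical sketch: decode one mutation encoded as type|kLen|vLen|Key|Value,
// assuming three 32-bit big-endian integers followed by the raw bytes.
#include <cassert>
#include <cstdint>
#include <string>

struct DecodedMutation {
    uint32_t type;
    std::string key;
    std::string value;
};

inline uint32_t be32(const uint8_t* p) {
    return (uint32_t(p[0]) << 24) | (uint32_t(p[1]) << 16) | (uint32_t(p[2]) << 8) | uint32_t(p[3]);
}

inline DecodedMutation decodeMutation(const uint8_t* p) {
    DecodedMutation m;
    m.type = be32(p);
    const uint32_t kLen = be32(p + 4);
    const uint32_t vLen = be32(p + 8);
    m.key.assign(reinterpret_cast<const char*>(p + 12), kLen);
    m.value.assign(reinterpret_cast<const char*>(p + 12 + kLen), vLen);
    return m;
}

// Sample buffer: type 0, key "k" (kLen 1), value "v" (vLen 1).
static const uint8_t kSampleMutation[14] = { 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 'k', 'v' };
```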
### Endianness

When the restore decodes a serialized integer from the backup file, it needs to convert the serialized value from big endian to little endian.

The reason is as follows: when the backup procedure transfers the data to a remote blob store, the backup data is encoded in big endian. However, FoundationDB currently only runs on little-endian machines. The endianness affects the interpretation of an integer, so we must perform the endianness conversion.
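The conversion described above amounts to a byte swap. The following is a minimal sketch of the idea, not the helper FDB itself uses:

```cpp
// Sketch: reinterpret an 8-byte big-endian on-disk value on a little-endian
// host by reversing its bytes.
#include <cassert>
#include <cstdint>

inline uint64_t bigEndian64ToHost(uint64_t v) {
    uint64_t r = 0;
    for (int i = 0; i < 8; ++i) {
        r = (r << 8) | (v & 0xff); // peel off the low byte, append to the result
        v >>= 8;
    }
    return r;
}
```

Applying the swap twice returns the original value, which makes the same routine usable in both directions.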
@@ -44,6 +44,8 @@ set(FDBCLIENT_SRCS
	NativeAPI.actor.cpp
	NativeAPI.actor.h
	Notified.h
	SpecialKeySpace.actor.cpp
	SpecialKeySpace.actor.h
	ReadYourWrites.actor.cpp
	ReadYourWrites.h
	RestoreWorkerInterface.actor.h
@@ -25,6 +25,7 @@
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/KeyRangeMap.h"
#include "fdbclient/MasterProxyInterface.h"
#include "fdbclient/SpecialKeySpace.actor.h"
#include "fdbrpc/QueueModel.h"
#include "fdbrpc/MultiInterface.h"
#include "flow/TDMetric.actor.h"
@@ -228,6 +229,8 @@ public:
	double detailedHealthMetricsLastUpdated;

	UniqueOrderedOptionList<FDBTransactionOptions> transactionDefaults;
	std::shared_ptr<SpecialKeySpace> specialKeySpace;
	std::shared_ptr<ConflictingKeysImpl> cKImpl;
};

#endif
@@ -416,7 +416,7 @@ typedef Standalone<KeyRangeRef> KeyRange;
typedef Standalone<KeyValueRef> KeyValue;
typedef Standalone<struct KeySelectorRef> KeySelector;

enum { invalidVersion = -1, latestVersion = -2, MAX_VERSION = std::numeric_limits<int64_t>::max() };

inline Key keyAfter( const KeyRef& key ) {
	if(key == LiteralStringRef("\xff\xff"))
@@ -35,6 +35,7 @@
#include "fdbclient/MonitorLeader.h"
#include "fdbclient/MutationList.h"
#include "fdbclient/ReadYourWrites.h"
#include "fdbclient/SpecialKeySpace.actor.h"
#include "fdbclient/StorageServerInterface.h"
#include "fdbclient/SystemData.h"
#include "fdbrpc/LoadBalance.h"
@@ -503,25 +504,42 @@ ACTOR static Future<HealthMetrics> getHealthMetricsActor(DatabaseContext *cx, bo
Future<HealthMetrics> DatabaseContext::getHealthMetrics(bool detailed = false) {
	return getHealthMetricsActor(this, detailed);
}

DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionFile>>> connectionFile,
                                 Reference<AsyncVar<ClientDBInfo>> clientInfo, Future<Void> clientInfoMonitor,
                                 TaskPriority taskID, LocalityData const& clientLocality,
                                 bool enableLocalityLoadBalance, bool lockAware, bool internal, int apiVersion,
                                 bool switchable)
  : connectionFile(connectionFile), clientInfo(clientInfo), clientInfoMonitor(clientInfoMonitor), taskID(taskID),
    clientLocality(clientLocality), enableLocalityLoadBalance(enableLocalityLoadBalance), lockAware(lockAware),
    apiVersion(apiVersion), switchable(switchable), provisional(false), cc("TransactionMetrics"),
    transactionReadVersions("ReadVersions", cc), transactionReadVersionsCompleted("ReadVersionsCompleted", cc),
    transactionReadVersionBatches("ReadVersionBatches", cc),
    transactionBatchReadVersions("BatchPriorityReadVersions", cc),
    transactionDefaultReadVersions("DefaultPriorityReadVersions", cc),
    transactionImmediateReadVersions("ImmediatePriorityReadVersions", cc),
    transactionBatchReadVersionsCompleted("BatchPriorityReadVersionsCompleted", cc),
    transactionDefaultReadVersionsCompleted("DefaultPriorityReadVersionsCompleted", cc),
    transactionImmediateReadVersionsCompleted("ImmediatePriorityReadVersionsCompleted", cc),
    transactionLogicalReads("LogicalUncachedReads", cc), transactionPhysicalReads("PhysicalReadRequests", cc),
    transactionPhysicalReadsCompleted("PhysicalReadRequestsCompleted", cc),
    transactionGetKeyRequests("GetKeyRequests", cc), transactionGetValueRequests("GetValueRequests", cc),
    transactionGetRangeRequests("GetRangeRequests", cc), transactionWatchRequests("WatchRequests", cc),
    transactionGetAddressesForKeyRequests("GetAddressesForKeyRequests", cc), transactionBytesRead("BytesRead", cc),
    transactionKeysRead("KeysRead", cc), transactionMetadataVersionReads("MetadataVersionReads", cc),
    transactionCommittedMutations("CommittedMutations", cc),
    transactionCommittedMutationBytes("CommittedMutationBytes", cc), transactionSetMutations("SetMutations", cc),
    transactionClearMutations("ClearMutations", cc), transactionAtomicMutations("AtomicMutations", cc),
    transactionsCommitStarted("CommitStarted", cc), transactionsCommitCompleted("CommitCompleted", cc),
    transactionKeyServerLocationRequests("KeyServerLocationRequests", cc),
    transactionKeyServerLocationRequestsCompleted("KeyServerLocationRequestsCompleted", cc),
    transactionsTooOld("TooOld", cc), transactionsFutureVersions("FutureVersions", cc),
    transactionsNotCommitted("NotCommitted", cc), transactionsMaybeCommitted("MaybeCommitted", cc),
    transactionsResourceConstrained("ResourceConstrained", cc), transactionsThrottled("Throttled", cc),
    transactionsProcessBehind("ProcessBehind", cc), outstandingWatches(0), latencies(1000), readLatencies(1000),
    commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000), mvCacheInsertLocation(0),
    healthMetricsLastUpdated(0), detailedHealthMetricsLastUpdated(0), internal(internal),
    specialKeySpace(std::make_shared<SpecialKeySpace>(normalKeys.begin, specialKeys.end)),
    cKImpl(std::make_shared<ConflictingKeysImpl>(conflictingKeysRange)) {
	dbId = deterministicRandom()->randomUniqueID();
	connected = clientInfo->get().proxies.size() ? Void() : clientInfo->onChange();
@@ -540,6 +558,7 @@ DatabaseContext::DatabaseContext(

	monitorMasterProxiesInfoChange = monitorMasterProxiesChange(clientInfo, &masterProxiesChangeTrigger);
	clientStatusUpdater.actor = clientStatusUpdateActor(this);
	specialKeySpace->registerKeyRange(conflictingKeysRange, cKImpl.get());
}

DatabaseContext::DatabaseContext( const Error &err ) : deferredError(err), cc("TransactionMetrics"), transactionReadVersions("ReadVersions", cc),
@@ -2777,42 +2796,21 @@ ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo>
			return Void();
		} else {
			// clear previously saved conflicting keys
			tr->info.conflictingKeys.reset();
			if (ci.conflictingKRIndices.present()) {
				tr->info.conflictingKeys =
				    std::make_shared<CoalescedKeyRangeMap<Value>>(conflictingKeysFalse, specialKeys.end);
				state Standalone<VectorRef<int>> conflictingKRIndices = ci.conflictingKRIndices.get();
				// drop duplicate indices and merge overlapped ranges
				// Note: addReadConflictRange in native transaction object does not merge overlapped ranges
				state std::unordered_set<int> mergedIds(conflictingKRIndices.begin(),
				                                        conflictingKRIndices.end());
				for (auto const& rCRIndex : mergedIds) {
					const KeyRangeRef kr = req.transaction.read_conflict_ranges[rCRIndex];
					const KeyRange krWithPrefix = KeyRangeRef(kr.begin.withPrefix(conflictingKeysRange.begin),
					                                          kr.end.withPrefix(conflictingKeysRange.begin));
					tr->info.conflictingKeys->insert(krWithPrefix, conflictingKeysTrue);
				}
			}

			if (info.debugID.present())
@@ -33,6 +33,7 @@
#include "fdbclient/CoordinationInterface.h"
#include "fdbclient/ClusterInterface.h"
#include "fdbclient/ClientLogEvents.h"
#include "fdbclient/KeyRangeMap.h"
#include "flow/actorcompiler.h" // has to be last include

// CLIENT_BUGGIFY should be used to randomly introduce failures at run time (like BUGGIFY but for client side testing)
@@ -143,8 +144,9 @@ struct TransactionInfo {
	TaskPriority taskID;
	bool useProvisionalProxies;
	// Used to save conflicting keys if FDBTransactionOptions::REPORT_CONFLICTING_KEYS is enabled
	// shared_ptr used here since TransactionInfo is sometimes copied as function parameters.
	// prefix/<key1> : '1' - any keys equal or larger than this key are (probably) conflicting keys
	// prefix/<key2> : '0' - any keys equal or larger than this key are (definitely) not conflicting keys
	std::shared_ptr<CoalescedKeyRangeMap<Value>> conflictingKeys;

	explicit TransactionInfo( TaskPriority taskID ) : taskID(taskID), useProvisionalProxies(false) {}
};
@@ -21,6 +21,7 @@
#include "fdbclient/ReadYourWrites.h"
#include "fdbclient/Atomic.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/SpecialKeySpace.actor.h"
#include "fdbclient/StatusClient.h"
#include "fdbclient/MonitorLeader.h"
#include "flow/Util.h"
@@ -1229,6 +1230,10 @@ Future< Optional<Value> > ReadYourWritesTransaction::get( const Key& key, bool s
		return Optional<Value>();
	}

	// the special key space is only allowed to be queried if both begin and end start with \xff\xff
	if (key.startsWith(specialKeys.begin))
		return getDatabase()->specialKeySpace->get(Reference<ReadYourWritesTransaction>::addRef(this), key);

	if(checkUsedDuringCommit()) {
		return used_during_commit();
	}
@@ -1280,41 +1285,10 @@ Future< Standalone<RangeResultRef> > ReadYourWritesTransaction::getRange(
		}
	}

	// the special key space is only allowed to be queried if both begin and end start with \xff\xff
	if (begin.getKey().startsWith(specialKeys.begin) && end.getKey().startsWith(specialKeys.begin))
		return getDatabase()->specialKeySpace->getRange(Reference<ReadYourWritesTransaction>::addRef(this), begin, end,
		                                                limits, reverse);

	if(checkUsedDuringCommit()) {
		return used_during_commit();
@@ -131,6 +131,10 @@ public:
	Database getDatabase() const {
		return tr.getDatabase();
	}

	const TransactionInfo& getTransactionInfo() const {
		return tr.info;
	}

private:
	friend class RYWImpl;
@@ -362,20 +362,24 @@ struct RestoreSysInfoRequest : TimedRequest {
	constexpr static FileIdentifier file_identifier = 75960741;

	RestoreSysInfo sysInfo;
	Standalone<VectorRef<std::pair<KeyRangeRef, Version>>> rangeVersions;

	ReplyPromise<RestoreCommonReply> reply;

	RestoreSysInfoRequest() = default;
	explicit RestoreSysInfoRequest(RestoreSysInfo sysInfo,
	                               Standalone<VectorRef<std::pair<KeyRangeRef, Version>>> rangeVersions)
	  : sysInfo(sysInfo), rangeVersions(rangeVersions) {}

	template <class Ar>
	void serialize(Ar& ar) {
		serializer(ar, sysInfo, rangeVersions, reply);
	}

	std::string toString() {
		std::stringstream ss;
		ss << "RestoreSysInfoRequest "
		   << "rangeVersions.size:" << rangeVersions.size();
		return ss.str();
	}
};
@@ -0,0 +1,377 @@
/*
 * SpecialKeySpace.actor.cpp
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "fdbclient/SpecialKeySpace.actor.h"
#include "flow/UnitTest.h"
#include "flow/actorcompiler.h" // This must be the last #include.

// This function will normalize the given KeySelector to a standard KeySelector:
// orEqual == false && offset == 1 (standard form)
// If the corresponding key is not in this special key range, it will move as far as possible to adjust the offset to 1.
// It does have overhead here since we query all keys twice in the worst case.
// However, moving the KeySelector while handling other parameters like limits makes the code much more complex and
// hard to maintain. Separating each part makes the code easier to understand and more compact.
ACTOR Future<Void> SpecialKeyRangeBaseImpl::normalizeKeySelectorActor(const SpecialKeyRangeBaseImpl* pkrImpl,
                                                                      Reference<ReadYourWritesTransaction> ryw,
                                                                      KeySelector* ks) {
	ASSERT(!ks->orEqual); // should be removed before calling
	ASSERT(ks->offset != 1); // never being called if KeySelector is already normalized

	state Key startKey(pkrImpl->range.begin);
	state Key endKey(pkrImpl->range.end);

	if (ks->offset < 1) {
		// less than the given key
		if (pkrImpl->range.contains(ks->getKey())) endKey = keyAfter(ks->getKey());
	} else {
		// greater than the given key
		if (pkrImpl->range.contains(ks->getKey())) startKey = ks->getKey();
	}

	TraceEvent(SevDebug, "NormalizeKeySelector")
	    .detail("OriginalKey", ks->getKey())
	    .detail("OriginalOffset", ks->offset)
	    .detail("SpecialKeyRangeStart", pkrImpl->range.begin)
	    .detail("SpecialKeyRangeEnd", pkrImpl->range.end);

	Standalone<RangeResultRef> result = wait(pkrImpl->getRange(ryw, KeyRangeRef(startKey, endKey)));
	if (result.size() == 0) {
		TraceEvent("ZeroElementsIntheRange").detail("Start", startKey).detail("End", endKey);
		return Void();
	}
	// Note : KeySelector::setKey has a byte limit according to the knobs; customize it if needed
	if (ks->offset < 1) {
		if (result.size() >= 1 - ks->offset) {
			ks->setKey(KeyRef(ks->arena(), result[result.size() - (1 - ks->offset)].key));
			ks->offset = 1;
		} else {
			ks->setKey(KeyRef(ks->arena(), result[0].key));
			ks->offset += result.size();
		}
	} else {
		if (result.size() >= ks->offset) {
			ks->setKey(KeyRef(ks->arena(), result[ks->offset - 1].key));
			ks->offset = 1;
		} else {
			ks->setKey(KeyRef(ks->arena(), keyAfter(result[result.size() - 1].key)));
			ks->offset -= result.size();
		}
	}
	TraceEvent(SevDebug, "NormalizeKeySelector")
	    .detail("NormalizedKey", ks->getKey())
	    .detail("NormalizedOffset", ks->offset)
	    .detail("SpecialKeyRangeStart", pkrImpl->range.begin)
	    .detail("SpecialKeyRangeEnd", pkrImpl->range.end);
	return Void();
}

ACTOR Future<Standalone<RangeResultRef>> SpecialKeySpace::getRangeAggregationActor(
    SpecialKeySpace* pks, Reference<ReadYourWritesTransaction> ryw, KeySelector begin, KeySelector end,
    GetRangeLimits limits, bool reverse) {
	// This function handles ranges which cover more than one keyrange and aggregates all results
	// KeySelector, GetRangeLimits and reverse are all handled here
	state Standalone<RangeResultRef> result;
	state RangeMap<Key, SpecialKeyRangeBaseImpl*, KeyRangeRef>::Iterator iter;
	state int actualBeginOffset;
	state int actualEndOffset;

	// make sure offset == 1
	state RangeMap<Key, SpecialKeyRangeBaseImpl*, KeyRangeRef>::Iterator beginIter =
	    pks->impls.rangeContaining(begin.getKey());
	while ((begin.offset < 1 && beginIter != pks->impls.ranges().begin()) ||
	       (begin.offset > 1 && beginIter != pks->impls.ranges().end())) {
		if (beginIter->value() != nullptr)
			wait(beginIter->value()->normalizeKeySelectorActor(beginIter->value(), ryw, &begin));
		begin.offset < 1 ? --beginIter : ++beginIter;
	}

	actualBeginOffset = begin.offset;
	if (beginIter == pks->impls.ranges().begin())
		begin.setKey(pks->range.begin);
	else if (beginIter == pks->impls.ranges().end())
		begin.setKey(pks->range.end);

	if (!begin.isFirstGreaterOrEqual()) {
		// The KeySelector points to a key outside the whole special key space
		TraceEvent(SevInfo, "BeginKeySelectorPointsOutside")
		    .detail("TerminateKey", begin.getKey())
		    .detail("TerminateOffset", begin.offset);
		if (begin.offset < 1 && beginIter == pks->impls.ranges().begin())
			result.readToBegin = true;
|
||||
else
|
||||
result.readThroughEnd = true;
|
||||
begin.offset = 1;
|
||||
}
|
||||
state RangeMap<Key, SpecialKeyRangeBaseImpl*, KeyRangeRef>::Iterator endIter =
|
||||
pks->impls.rangeContaining(end.getKey());
|
||||
while ((end.offset < 1 && endIter != pks->impls.ranges().begin()) ||
|
||||
(end.offset > 1 && endIter != pks->impls.ranges().end())) {
|
||||
if (endIter->value() != nullptr) wait(endIter->value()->normalizeKeySelectorActor(endIter->value(), ryw, &end));
|
||||
end.offset < 1 ? --endIter : ++endIter;
|
||||
}
|
||||
|
||||
actualEndOffset = end.offset;
|
||||
if (endIter == pks->impls.ranges().begin())
|
||||
end.setKey(pks->range.begin);
|
||||
else if (endIter == pks->impls.ranges().end())
|
||||
end.setKey(pks->range.end);
|
||||
|
||||
if (!end.isFirstGreaterOrEqual()) {
|
||||
// The Key Selector points to key outside the whole special key space
|
||||
TraceEvent(SevInfo, "EndKeySelectorPointsOutside")
|
||||
.detail("TerminateKey", end.getKey())
|
||||
.detail("TerminateOffset", end.offset);
|
||||
if (end.offset < 1 && endIter == pks->impls.ranges().begin())
|
||||
result.readToBegin = true;
|
||||
else
|
||||
result.readThroughEnd = true;
|
||||
end.offset = 1;
|
||||
}
|
||||
// Handle all corner cases like what RYW does
|
||||
// return if range inverted
|
||||
if (actualBeginOffset >= actualEndOffset && begin.getKey() >= end.getKey()) {
|
||||
TEST(true);
|
||||
return RangeResultRef(false, false);
|
||||
}
|
||||
// If touches begin or end, return with readToBegin and readThroughEnd flags
|
||||
if (beginIter == pks->impls.ranges().end() || endIter == pks->impls.ranges().begin()) {
|
||||
TEST(true);
|
||||
return result;
|
||||
}
|
||||
state RangeMap<Key, SpecialKeyRangeBaseImpl*, KeyRangeRef>::Ranges ranges =
|
||||
pks->impls.intersectingRanges(KeyRangeRef(begin.getKey(), end.getKey()));
|
||||
// TODO : workaround to write this two together to make the code compact
|
||||
// The issue here is boost::iterator_range<> doest not provide rbegin(), rend()
|
||||
iter = reverse ? ranges.end() : ranges.begin();
|
||||
if (reverse) {
|
||||
while (iter != ranges.begin()) {
|
||||
--iter;
|
||||
if (iter->value() == nullptr) continue;
|
||||
KeyRangeRef kr = iter->range();
|
||||
KeyRef keyStart = kr.contains(begin.getKey()) ? begin.getKey() : kr.begin;
|
||||
KeyRef keyEnd = kr.contains(end.getKey()) ? end.getKey() : kr.end;
|
||||
Standalone<RangeResultRef> pairs = wait(iter->value()->getRange(ryw, KeyRangeRef(keyStart, keyEnd)));
|
||||
result.arena().dependsOn(pairs.arena());
|
||||
// limits handler
|
||||
for (int i = pairs.size() - 1; i >= 0; --i) {
|
||||
result.push_back(result.arena(), pairs[i]);
|
||||
// Note : behavior here is even the last k-v pair makes total bytes larger than specified, it's still
|
||||
// returned. In other words, the total size of the returned value (less the last entry) will be less
|
||||
// than byteLimit
|
||||
limits.decrement(pairs[i]);
|
||||
if (limits.isReached()) {
|
||||
result.more = true;
|
||||
result.readToBegin = false;
|
||||
return result;
|
||||
};
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for (iter = ranges.begin(); iter != ranges.end(); ++iter) {
|
||||
if (iter->value() == nullptr) continue;
|
||||
KeyRangeRef kr = iter->range();
|
||||
KeyRef keyStart = kr.contains(begin.getKey()) ? begin.getKey() : kr.begin;
|
||||
KeyRef keyEnd = kr.contains(end.getKey()) ? end.getKey() : kr.end;
|
||||
Standalone<RangeResultRef> pairs = wait(iter->value()->getRange(ryw, KeyRangeRef(keyStart, keyEnd)));
|
||||
result.arena().dependsOn(pairs.arena());
|
||||
// limits handler
|
||||
for (int i = 0; i < pairs.size(); ++i) {
|
||||
result.push_back(result.arena(), pairs[i]);
|
||||
// Note : behavior here is even the last k-v pair makes total bytes larger than specified, it's still
|
||||
// returned. In other words, the total size of the returned value (less the last entry) will be less
|
||||
// than byteLimit
|
||||
limits.decrement(pairs[i]);
|
||||
if (limits.isReached()) {
|
||||
result.more = true;
|
||||
result.readThroughEnd = false;
|
||||
return result;
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
Future<Standalone<RangeResultRef>> SpecialKeySpace::getRange(Reference<ReadYourWritesTransaction> ryw,
|
||||
KeySelector begin, KeySelector end, GetRangeLimits limits,
|
||||
bool reverse) {
|
||||
// validate limits here
|
||||
if (!limits.isValid()) return range_limits_invalid();
|
||||
if (limits.isReached()) {
|
||||
TEST(true); // read limit 0
|
||||
return Standalone<RangeResultRef>();
|
||||
}
|
||||
// make sure orEqual == false
|
||||
begin.removeOrEqual(begin.arena());
|
||||
end.removeOrEqual(end.arena());
|
||||
|
||||
return getRangeAggregationActor(this, ryw, begin, end, limits, reverse);
|
||||
}
|
||||
|
||||
ACTOR Future<Optional<Value>> SpecialKeySpace::getActor(SpecialKeySpace* pks, Reference<ReadYourWritesTransaction> ryw,
|
||||
KeyRef key) {
|
||||
// use getRange to workaround this
|
||||
Standalone<RangeResultRef> result =
|
||||
wait(pks->getRange(ryw, KeySelector(firstGreaterOrEqual(key)), KeySelector(firstGreaterOrEqual(keyAfter(key))),
|
||||
GetRangeLimits(CLIENT_KNOBS->TOO_MANY), false));
|
||||
ASSERT(result.size() <= 1);
|
||||
if (result.size()) {
|
||||
return Optional<Value>(result[0].value);
|
||||
} else {
|
||||
return Optional<Value>();
|
||||
}
|
||||
}
|
||||
|
||||
Future<Optional<Value>> SpecialKeySpace::get(Reference<ReadYourWritesTransaction> ryw, const Key& key) {
|
||||
return getActor(this, ryw, key);
|
||||
}
|
||||
|
||||
ConflictingKeysImpl::ConflictingKeysImpl(KeyRangeRef kr) : SpecialKeyRangeBaseImpl(kr) {}
|
||||
|
||||
Future<Standalone<RangeResultRef>> ConflictingKeysImpl::getRange(Reference<ReadYourWritesTransaction> ryw,
|
||||
KeyRangeRef kr) const {
|
||||
Standalone<RangeResultRef> result;
|
||||
if (ryw->getTransactionInfo().conflictingKeys) {
|
||||
auto krMapPtr = ryw->getTransactionInfo().conflictingKeys.get();
|
||||
auto beginIter = krMapPtr->rangeContaining(kr.begin);
|
||||
if (beginIter->begin() != kr.begin) ++beginIter;
|
||||
auto endIter = krMapPtr->rangeContaining(kr.end);
|
||||
for (auto it = beginIter; it != endIter; ++it) {
|
||||
// it->begin() is stored in the CoalescedKeyRangeMap in TransactionInfo
|
||||
// it->value() is always constants in SystemData.cpp
|
||||
// Thus, push_back() can be used
|
||||
result.push_back(result.arena(), KeyValueRef(it->begin(), it->value()));
|
||||
}
|
||||
if (endIter->begin() != kr.end)
|
||||
result.push_back(result.arena(), KeyValueRef(endIter->begin(), endIter->value()));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
class SpecialKeyRangeTestImpl : public SpecialKeyRangeBaseImpl {
|
||||
public:
|
||||
explicit SpecialKeyRangeTestImpl(KeyRangeRef kr, const std::string& prefix, int size)
|
||||
: SpecialKeyRangeBaseImpl(kr), prefix(prefix), size(size) {
|
||||
ASSERT(size > 0);
|
||||
for (int i = 0; i < size; ++i) {
|
||||
kvs.push_back_deep(kvs.arena(),
|
||||
KeyValueRef(getKeyForIndex(i), deterministicRandom()->randomAlphaNumeric(16)));
|
||||
}
|
||||
}
|
||||
|
||||
KeyValueRef getKeyValueForIndex(int idx) { return kvs[idx]; }
|
||||
|
||||
Key getKeyForIndex(int idx) { return Key(prefix + format("%010d", idx)).withPrefix(range.begin); }
|
||||
int getSize() { return size; }
|
||||
Future<Standalone<RangeResultRef>> getRange(Reference<ReadYourWritesTransaction> ryw,
|
||||
KeyRangeRef kr) const override {
|
||||
int startIndex = 0, endIndex = size;
|
||||
while (startIndex < size && kvs[startIndex].key < kr.begin) ++startIndex;
|
||||
while (endIndex > startIndex && kvs[endIndex - 1].key >= kr.end) --endIndex;
|
||||
if (startIndex == endIndex)
|
||||
return Standalone<RangeResultRef>();
|
||||
else
|
||||
return Standalone<RangeResultRef>(RangeResultRef(kvs.slice(startIndex, endIndex), false));
|
||||
}
|
||||
|
||||
private:
|
||||
Standalone<VectorRef<KeyValueRef>> kvs;
|
||||
std::string prefix;
|
||||
int size;
|
||||
};
|
||||
|
||||
TEST_CASE("/fdbclient/SpecialKeySpace/Unittest") {
|
||||
SpecialKeySpace pks(normalKeys.begin, normalKeys.end);
|
||||
SpecialKeyRangeTestImpl pkr1(KeyRangeRef(LiteralStringRef("/cat/"), LiteralStringRef("/cat/\xff")), "small", 10);
|
||||
SpecialKeyRangeTestImpl pkr2(KeyRangeRef(LiteralStringRef("/dog/"), LiteralStringRef("/dog/\xff")), "medium", 100);
|
||||
SpecialKeyRangeTestImpl pkr3(KeyRangeRef(LiteralStringRef("/pig/"), LiteralStringRef("/pig/\xff")), "large", 1000);
|
||||
pks.registerKeyRange(pkr1.getKeyRange(), &pkr1);
|
||||
pks.registerKeyRange(pkr2.getKeyRange(), &pkr2);
|
||||
pks.registerKeyRange(pkr3.getKeyRange(), &pkr3);
|
||||
auto nullRef = Reference<ReadYourWritesTransaction>();
|
||||
// get
|
||||
{
|
||||
auto resultFuture = pks.get(nullRef, LiteralStringRef("/cat/small0000000009"));
|
||||
ASSERT(resultFuture.isReady());
|
||||
auto result = resultFuture.getValue().get();
|
||||
ASSERT(result == pkr1.getKeyValueForIndex(9).value);
|
||||
auto emptyFuture = pks.get(nullRef, LiteralStringRef("/cat/small0000000010"));
|
||||
ASSERT(emptyFuture.isReady());
|
||||
auto emptyResult = emptyFuture.getValue();
|
||||
ASSERT(!emptyResult.present());
|
||||
}
|
||||
// general getRange
|
||||
{
|
||||
KeySelector start = KeySelectorRef(LiteralStringRef("/elepant"), false, -9);
|
||||
KeySelector end = KeySelectorRef(LiteralStringRef("/frog"), false, +11);
|
||||
auto resultFuture = pks.getRange(nullRef, start, end, GetRangeLimits());
|
||||
ASSERT(resultFuture.isReady());
|
||||
auto result = resultFuture.getValue();
|
||||
ASSERT(result.size() == 20);
|
||||
ASSERT(result[0].key == pkr2.getKeyForIndex(90));
|
||||
ASSERT(result[result.size() - 1].key == pkr3.getKeyForIndex(9));
|
||||
}
|
||||
// KeySelector points outside
|
||||
{
|
||||
KeySelector start = KeySelectorRef(pkr3.getKeyForIndex(999), true, -1110);
|
||||
KeySelector end = KeySelectorRef(pkr1.getKeyForIndex(0), false, +1112);
|
||||
auto resultFuture = pks.getRange(nullRef, start, end, GetRangeLimits());
|
||||
ASSERT(resultFuture.isReady());
|
||||
auto result = resultFuture.getValue();
|
||||
ASSERT(result.size() == 1110);
|
||||
ASSERT(result[0].key == pkr1.getKeyForIndex(0));
|
||||
ASSERT(result[result.size() - 1].key == pkr3.getKeyForIndex(999));
|
||||
}
|
||||
// GetRangeLimits with row limit
|
||||
{
|
||||
KeySelector start = KeySelectorRef(pkr2.getKeyForIndex(0), true, 0);
|
||||
KeySelector end = KeySelectorRef(pkr3.getKeyForIndex(0), false, 0);
|
||||
auto resultFuture = pks.getRange(nullRef, start, end, GetRangeLimits(2));
|
||||
ASSERT(resultFuture.isReady());
|
||||
auto result = resultFuture.getValue();
|
||||
ASSERT(result.size() == 2);
|
||||
ASSERT(result[0].key == pkr2.getKeyForIndex(0));
|
||||
ASSERT(result[1].key == pkr2.getKeyForIndex(1));
|
||||
}
|
||||
// GetRangeLimits with byte limit
|
||||
{
|
||||
KeySelector start = KeySelectorRef(pkr2.getKeyForIndex(0), true, 0);
|
||||
KeySelector end = KeySelectorRef(pkr3.getKeyForIndex(0), false, 0);
|
||||
auto resultFuture = pks.getRange(nullRef, start, end, GetRangeLimits(10, 100));
|
||||
ASSERT(resultFuture.isReady());
|
||||
auto result = resultFuture.getValue();
|
||||
int bytes = 0;
|
||||
for (int i = 0; i < result.size() - 1; ++i) bytes += 8 + pkr2.getKeyValueForIndex(i).expectedSize();
|
||||
ASSERT(bytes < 100);
|
||||
ASSERT(bytes + 8 + pkr2.getKeyValueForIndex(result.size()).expectedSize() >= 100);
|
||||
}
|
||||
// reverse test with overlapping key range
|
||||
{
|
||||
KeySelector start = KeySelectorRef(pkr2.getKeyForIndex(0), true, 0);
|
||||
KeySelector end = KeySelectorRef(pkr3.getKeyForIndex(999), true, +1);
|
||||
auto resultFuture = pks.getRange(nullRef, start, end, GetRangeLimits(1100), true);
|
||||
ASSERT(resultFuture.isReady());
|
||||
auto result = resultFuture.getValue();
|
||||
for (int i = 0; i < pkr3.getSize(); ++i) ASSERT(result[i] == pkr3.getKeyValueForIndex(pkr3.getSize() - 1 - i));
|
||||
for (int i = 0; i < pkr2.getSize(); ++i)
|
||||
ASSERT(result[i + pkr3.getSize()] == pkr2.getKeyValueForIndex(pkr2.getSize() - 1 - i));
|
||||
}
|
||||
return Void();
|
||||
}
|
|
@@ -0,0 +1,99 @@
/*
 * SpecialKeySpace.actor.h
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#if defined(NO_INTELLISENSE) && !defined(FDBCLIENT_SPECIALKEYSPACE_ACTOR_G_H)
#define FDBCLIENT_SPECIALKEYSPACE_ACTOR_G_H
#include "fdbclient/SpecialKeySpace.actor.g.h"
#elif !defined(FDBCLIENT_SPECIALKEYSPACE_ACTOR_H)
#define FDBCLIENT_SPECIALKEYSPACE_ACTOR_H

#include "flow/flow.h"
#include "flow/Arena.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/KeyRangeMap.h"
#include "fdbclient/ReadYourWrites.h"
#include "flow/actorcompiler.h" // This must be the last #include.

class SpecialKeyRangeBaseImpl {
public:
    // Each derived class only needs to implement this simple version of getRange
    virtual Future<Standalone<RangeResultRef>> getRange(Reference<ReadYourWritesTransaction> ryw,
                                                        KeyRangeRef kr) const = 0;

    explicit SpecialKeyRangeBaseImpl(KeyRangeRef kr) : range(kr) {}
    KeyRangeRef getKeyRange() const { return range; }
    ACTOR Future<Void> normalizeKeySelectorActor(const SpecialKeyRangeBaseImpl* pkrImpl,
                                                 Reference<ReadYourWritesTransaction> ryw, KeySelector* ks);

protected:
    KeyRange range; // underlying key range for this function
};

class SpecialKeySpace {
public:
    Future<Optional<Value>> get(Reference<ReadYourWritesTransaction> ryw, const Key& key);

    Future<Standalone<RangeResultRef>> getRange(Reference<ReadYourWritesTransaction> ryw, KeySelector begin,
                                                KeySelector end, GetRangeLimits limits, bool reverse = false);

    SpecialKeySpace(KeyRef spaceStartKey = Key(), KeyRef spaceEndKey = normalKeys.end) {
        // Default value is nullptr; the begin key of the KeyRangeMap is Key()
        impls = KeyRangeMap<SpecialKeyRangeBaseImpl*>(nullptr, spaceEndKey);
        range = KeyRangeRef(spaceStartKey, spaceEndKey);
    }
    void registerKeyRange(const KeyRangeRef& kr, SpecialKeyRangeBaseImpl* impl) {
        // range check
        // TODO: add a check so that registered ranges cannot be replaced by overlapping ones
        ASSERT(kr.begin >= range.begin && kr.end <= range.end);
        // make sure the registered range does not overlap with existing ones
        // Note: kr.end should not be the same as another range's begin, although it should work even if they are the same
        ASSERT(impls.rangeContaining(kr.begin) == impls.rangeContaining(kr.end) && impls[kr.begin] == nullptr);
        impls.insert(kr, impl);
    }

private:
    ACTOR Future<Optional<Value>> getActor(SpecialKeySpace* pks, Reference<ReadYourWritesTransaction> ryw, KeyRef key);

    ACTOR Future<Standalone<RangeResultRef>> getRangeAggregationActor(SpecialKeySpace* pks,
                                                                      Reference<ReadYourWritesTransaction> ryw,
                                                                      KeySelector begin, KeySelector end,
                                                                      GetRangeLimits limits, bool reverse);

    KeyRangeMap<SpecialKeyRangeBaseImpl*> impls;
    KeyRange range;
};

// Use the special key prefix "\xff\xff/transaction/conflicting_keys/<some_key>"
// to retrieve the keys which caused the latest not_committed (conflicting with another transaction) error.
// The returned key-value pairs are interpreted as:
// prefix/<key1> : '1' - any keys equal to or larger than this key are (probably) conflicting keys
// prefix/<key2> : '0' - any keys equal to or larger than this key are (definitely) not conflicting keys
// Currently, the conflicting keyranges returned are the original read_conflict_ranges or unions of them.
class ConflictingKeysImpl : public SpecialKeyRangeBaseImpl {
public:
    explicit ConflictingKeysImpl(KeyRangeRef kr);
    Future<Standalone<RangeResultRef>> getRange(Reference<ReadYourWritesTransaction> ryw,
                                                KeyRangeRef kr) const override;
};

#include "flow/unactorcompiler.h"
#endif
@@ -30,7 +30,7 @@ const KeyRangeRef systemKeys(systemKeysPrefix, LiteralStringRef("\xff\xff") );
 const KeyRangeRef nonMetadataSystemKeys(LiteralStringRef("\xff\x02"), LiteralStringRef("\xff\x03"));
 const KeyRangeRef allKeys = KeyRangeRef(normalKeys.begin, systemKeys.end);
 const KeyRef afterAllKeys = LiteralStringRef("\xff\xff\x00");
-const KeyRangeRef specialKeys = KeyRangeRef(LiteralStringRef("\xff\xff"), LiteralStringRef("\xff\xff\xff\xff"));
+const KeyRangeRef specialKeys = KeyRangeRef(LiteralStringRef("\xff\xff"), LiteralStringRef("\xff\xff\xff"));

 // keyServersKeys.contains(k) iff k.startsWith(keyServersPrefix)
 const KeyRangeRef keyServersKeys( LiteralStringRef("\xff/keyServers/"), LiteralStringRef("\xff/keyServers0") );
@@ -108,8 +108,8 @@ void decodeKeyServersValue( Standalone<RangeResultRef> result, const ValueRef& v
 	std::sort(dest.begin(), dest.end());
 }

 const KeyRef conflictingKeysPrefix = LiteralStringRef("/transaction/conflicting_keys/");
 const Key conflictingKeysAbsolutePrefix = conflictingKeysPrefix.withPrefix(specialKeys.begin);
 const KeyRangeRef conflictingKeysRange = KeyRangeRef(LiteralStringRef("\xff\xff/transaction/conflicting_keys/"),
                                                      LiteralStringRef("\xff\xff/transaction/conflicting_keys/\xff"));
 const ValueRef conflictingKeysTrue = LiteralStringRef("1");
 const ValueRef conflictingKeysFalse = LiteralStringRef("0");
@@ -40,7 +40,8 @@ extern const KeyRangeRef normalKeys; // '' to systemKeys.begin
 extern const KeyRangeRef systemKeys; // [FF] to [FF][FF]
 extern const KeyRangeRef nonMetadataSystemKeys; // [FF][00] to [FF][01]
 extern const KeyRangeRef allKeys; // '' to systemKeys.end
-extern const KeyRangeRef specialKeys; // [FF][FF] to [FF][FF][FF][FF]
+extern const KeyRangeRef specialKeys; // [FF][FF] to [FF][FF][FF], some client functions are exposed through FDB calls
+// using these special keys, see pr#2662
 extern const KeyRef afterAllKeys;

 // "\xff/keyServers/[[begin]]" := "[[vector<serverID>, vector<serverID>]|[vector<Tag>, vector<Tag>]]"
@@ -74,8 +75,7 @@ const Key serverKeysPrefixFor( UID serverID );
 UID serverKeysDecodeServer( const KeyRef& key );
 bool serverHasKey( ValueRef storedValue );

 extern const KeyRef conflictingKeysPrefix;
 extern const Key conflictingKeysAbsolutePrefix;
 extern const KeyRangeRef conflictingKeysRange;
 extern const ValueRef conflictingKeysTrue, conflictingKeysFalse;

 extern const KeyRef cacheKeysPrefix;
@@ -0,0 +1,248 @@
<?xml version="1.0" encoding="utf-8"?>
<Project DefaultTargets="Build" ToolsVersion="14.1" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
  <Import Project="$(SolutionDir)versions.target" />
  <PropertyGroup Condition="'$(Release)' != 'true' ">
    <PreReleaseDecoration>-PRERELEASE</PreReleaseDecoration>
  </PropertyGroup>
  <PropertyGroup Condition="'$(Release)' == 'true' ">
    <PreReleaseDecoration>
    </PreReleaseDecoration>
    <PreprocessorDefinitions>FDB_CLEAN_BUILD;%(PreprocessorDefinitions)</PreprocessorDefinitions>
  </PropertyGroup>
  <ItemGroup Label="ProjectConfigurations">
    <ProjectConfiguration Include="Debug|X64">
      <Configuration>Debug</Configuration>
      <Platform>X64</Platform>
    </ProjectConfiguration>
    <ProjectConfiguration Include="Release|X64">
      <Configuration>Release</Configuration>
      <Platform>X64</Platform>
    </ProjectConfiguration>
  </ItemGroup>
  <ItemGroup>
    <ActorCompiler Include="AsyncFileBlobStore.actor.h">
      <EnableCompile Condition="'$(Configuration)|$(Platform)'=='Debug|X64'">false</EnableCompile>
      <EnableCompile Condition="'$(Configuration)|$(Platform)'=='Release|X64'">false</EnableCompile>
    </ActorCompiler>
    <ClInclude Include="Atomic.h" />
    <ClInclude Include="BackupContainer.h" />
    <ActorCompiler Include="BackupAgent.actor.h">
      <EnableCompile Condition="'$(Configuration)|$(Platform)'=='Debug|X64'">false</EnableCompile>
      <EnableCompile Condition="'$(Configuration)|$(Platform)'=='Release|X64'">false</EnableCompile>
    </ActorCompiler>
    <ClInclude Include="BlobStore.h" />
    <ClInclude Include="ClientLogEvents.h" />
    <ClInclude Include="ClientWorkerInterface.h" />
    <ClInclude Include="ClusterInterface.h" />
    <ClInclude Include="CommitTransaction.h" />
    <ClInclude Include="CoordinationInterface.h" />
    <ClInclude Include="DatabaseConfiguration.h" />
    <ClInclude Include="DatabaseContext.h" />
    <ActorCompiler Include="EventTypes.actor.h">
      <EnableCompile Condition="'$(Configuration)|$(Platform)'=='Debug|X64'">false</EnableCompile>
      <EnableCompile Condition="'$(Configuration)|$(Platform)'=='Release|X64'">false</EnableCompile>
    </ActorCompiler>
    <ClInclude Include="FDBOptions.g.h" />
    <ClInclude Include="FDBOptions.h" />
    <ClInclude Include="FDBTypes.h" />
    <ClInclude Include="HTTP.h" />
    <ClInclude Include="KeyBackedTypes.h" />
    <ClInclude Include="MetricLogger.h" />
    <ClInclude Include="IClientApi.h" />
    <ClInclude Include="JsonBuilder.h" />
    <ClInclude Include="JSONDoc.h" />
    <ClInclude Include="json_spirit\json_spirit_error_position.h" />
    <ClInclude Include="json_spirit\json_spirit_reader_template.h" />
    <ClInclude Include="json_spirit\json_spirit_value.h" />
    <ClInclude Include="json_spirit\json_spirit_writer_options.h" />
    <ClInclude Include="json_spirit\json_spirit_writer_template.h" />
    <ClInclude Include="KeyRangeMap.h" />
    <ClInclude Include="Knobs.h" />
    <ClInclude Include="libb64\cdecode.h" />
    <ClInclude Include="libb64\cencode.h" />
    <ClInclude Include="libb64\decode.h" />
    <ClInclude Include="libb64\encode.h" />
    <ActorCompiler Include="ManagementAPI.actor.h">
      <EnableCompile Condition="'$(Configuration)|$(Platform)'=='Debug|X64'">false</EnableCompile>
      <EnableCompile Condition="'$(Configuration)|$(Platform)'=='Release|X64'">false</EnableCompile>
    </ActorCompiler>
    <ClInclude Include="MasterProxyInterface.h" />
    <ClInclude Include="md5\md5.h" />
    <ClInclude Include="MonitorLeader.h" />
    <ClInclude Include="MultiVersionAssignmentVars.h" />
    <ClInclude Include="MultiVersionTransaction.h" />
    <ClInclude Include="MutationList.h" />
    <ActorCompiler Include="NativeAPI.actor.h">
      <EnableCompile Condition="'$(Configuration)|$(Platform)'=='Debug|X64'">false</EnableCompile>
      <EnableCompile Condition="'$(Configuration)|$(Platform)'=='Release|X64'">false</EnableCompile>
    </ActorCompiler>
    <ClInclude Include="Notified.h" />
    <ClInclude Include="ReadYourWrites.h" />
    <ActorCompiler Include="RunTransaction.actor.h" />
    <ClInclude Include="RYWIterator.h" />
    <ClInclude Include="Schemas.h" />
    <ClInclude Include="sha1\SHA1.h" />
    <ClInclude Include="SnapshotCache.h" />
    <ActorCompiler Include="SpecialKeySpace.actor.h" />
    <ClInclude Include="Status.h" />
    <ClInclude Include="StatusClient.h" />
    <ClInclude Include="StorageServerInterface.h" />
    <ClInclude Include="Subspace.h" />
    <ClInclude Include="SystemData.h" />
    <ActorCompiler Include="RestoreWorkerInterface.actor.h">
      <EnableCompile>false</EnableCompile>
    </ActorCompiler>
    <ClInclude Include="TaskBucket.h" />
    <ClInclude Include="ThreadSafeTransaction.h" />
    <ClInclude Include="Tuple.h" />
    <ActorCompiler Include="VersionedMap.actor.h">
      <EnableCompile Condition="'$(Configuration)|$(Platform)'=='Debug|X64'">false</EnableCompile>
      <EnableCompile Condition="'$(Configuration)|$(Platform)'=='Release|X64'">false</EnableCompile>
    </ActorCompiler>
    <ClInclude Include="VersionedMap.h" />
    <ClInclude Include="WriteMap.h" />
    <ClInclude Include="zipf.h" />
  </ItemGroup>
  <ItemGroup>
    <ActorCompiler Include="AsyncFileBlobStore.actor.cpp" />
    <ClCompile Include="AutoPublicAddress.cpp" />
    <ActorCompiler Include="BackupAgentBase.actor.cpp" />
    <ActorCompiler Include="BackupContainer.actor.cpp" />
    <ActorCompiler Include="BlobStore.actor.cpp" />
    <ActorCompiler Include="DatabaseBackupAgent.actor.cpp" />
    <ClCompile Include="DatabaseConfiguration.cpp" />
    <ClCompile Include="FDBOptions.g.cpp" />
    <ActorCompiler Include="FileBackupAgent.actor.cpp" />
    <ActorCompiler Include="HTTP.actor.cpp" />
    <ActorCompiler Include="KeyRangeMap.actor.cpp" />
    <ClCompile Include="Knobs.cpp" />
    <ClCompile Include="libb64\cdecode.c" />
    <ClCompile Include="libb64\cencode.c" />
    <ClCompile Include="md5\md5.c" />
    <ActorCompiler Include="MetricLogger.actor.cpp" />
    <ActorCompiler Include="MonitorLeader.actor.cpp" />
    <ActorCompiler Include="ManagementAPI.actor.cpp" />
    <ActorCompiler Include="MultiVersionTransaction.actor.cpp" />
    <ActorCompiler Include="NativeAPI.actor.cpp" />
    <ActorCompiler Include="ReadYourWrites.actor.cpp" />
    <ClCompile Include="RYWIterator.cpp" />
    <ActorCompiler Include="StatusClient.actor.cpp" />
    <ClCompile Include="Schemas.cpp" />
    <ClCompile Include="SystemData.cpp" />
    <ClCompile Include="sha1\SHA1.cpp" />
    <ActorCompiler Include="SpecialKeySpace.actor.cpp" />
    <ActorCompiler Include="ThreadSafeTransaction.actor.cpp" />
    <ActorCompiler Include="TaskBucket.actor.cpp" />
    <ClCompile Include="Subspace.cpp" />
    <ClCompile Include="Tuple.cpp" />
    <ClCompile Include="JsonBuilder.cpp" />
    <ClCompile Include="zipf.c" />
  </ItemGroup>
  <PropertyGroup Label="Globals">
    <ProjectGUID>{E2939DAA-238E-4970-96C4-4C57980F93BD}</ProjectGUID>
    <TargetFrameworkVersion>v4.5.2</TargetFrameworkVersion>
    <Keyword>Win32Proj</Keyword>
    <RootNamespace>flow</RootNamespace>
  </PropertyGroup>
  <PropertyGroup>
    <OutDir>$(SolutionDir)bin\$(Configuration)\</OutDir>
    <IntDir>$(SystemDrive)\temp\msvcfdb\$(Platform)$(Configuration)\$(MSBuildProjectName)\</IntDir>
    <BuildLogFile>$(IntDir)\$(MSBuildProjectName).log</BuildLogFile>
  </PropertyGroup>
  <Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" />
  <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|X64'" Label="Configuration">
    <ConfigurationType>StaticLibrary</ConfigurationType>
    <CharacterSet>MultiByte</CharacterSet>
    <PlatformToolset>v141</PlatformToolset>
  </PropertyGroup>
  <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|X64'" Label="Configuration">
    <ConfigurationType>StaticLibrary</ConfigurationType>
    <CharacterSet>MultiByte</CharacterSet>
    <PlatformToolset>v141</PlatformToolset>
  </PropertyGroup>
  <Import Project="$(VCTargetsPath)\Microsoft.Cpp.props" />
  <ImportGroup Label="ExtensionSettings">
  </ImportGroup>
  <ImportGroup Label="PropertySheets">
    <Import Project="$(LocalAppData)\Microsoft\VisualStudio\10.0\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(LocalAppData)\Microsoft\VisualStudio\10.0\Microsoft.Cpp.$(Platform).user.props')" />
  </ImportGroup>
  <PropertyGroup Label="UserMacros" />
  <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|X64'">
    <LinkIncremental>true</LinkIncremental>
    <IncludePath>$(IncludePath);../;C:\Program Files\boost_1_67_0</IncludePath>
  </PropertyGroup>
  <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|X64'">
    <LinkIncremental>false</LinkIncremental>
    <IncludePath>$(IncludePath);../;C:\Program Files\boost_1_67_0</IncludePath>
  </PropertyGroup>
  <ItemDefinitionGroup>
    <ClCompile>
      <PreprocessorDefinitions>FDB_VT_VERSION="$(Version)$(PreReleaseDecoration)";FDB_VT_PACKAGE_NAME="$(PackageName)";%(PreprocessorDefinitions)</PreprocessorDefinitions>
      <LanguageStandard>stdcpp17</LanguageStandard>
    </ClCompile>
  </ItemDefinitionGroup>
  <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|X64'">
    <ClCompile>
      <PrecompiledHeader>
      </PrecompiledHeader>
      <WarningLevel>Level3</WarningLevel>
      <MinimalRebuild>false</MinimalRebuild>
      <DebugInformationFormat>ProgramDatabase</DebugInformationFormat>
      <Optimization>Disabled</Optimization>
      <BasicRuntimeChecks>EnableFastChecks</BasicRuntimeChecks>
      <RuntimeLibrary>MultiThreadedDebug</RuntimeLibrary>
      <PreprocessorDefinitions>TLS_DISABLED;WIN32;_WIN32_WINNT=0x0502;WINVER=0x0502;BOOST_ALL_NO_LIB;NTDDI_VERSION=0x05020000;_DEBUG;_HAS_ITERATOR_DEBUGGING=0;_CONSOLE;_CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
      <AdditionalIncludeDirectories>%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
      <MultiProcessorCompilation>true</MultiProcessorCompilation>
      <AdditionalOptions>/bigobj @../flow/no_intellisense.opt %(AdditionalOptions)</AdditionalOptions>
      <LanguageStandard>stdcpp17</LanguageStandard>
    </ClCompile>
    <Link>
      <SubSystem>Console</SubSystem>
      <GenerateDebugInformation>true</GenerateDebugInformation>
      <AdditionalDependencies>Advapi32.lib</AdditionalDependencies>
    </Link>
    <Lib>
      <AdditionalDependencies>$(TargetDir)flow.lib;$(TargetDir)fdbrpc.lib;winmm.lib</AdditionalDependencies>
    </Lib>
  </ItemDefinitionGroup>
  <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|X64'">
    <ClCompile>
      <WarningLevel>Level3</WarningLevel>
      <PrecompiledHeader>
      </PrecompiledHeader>
      <DebugInformationFormat>ProgramDatabase</DebugInformationFormat>
      <Optimization>Full</Optimization>
      <RuntimeLibrary>MultiThreaded</RuntimeLibrary>
      <IntrinsicFunctions>true</IntrinsicFunctions>
      <PreprocessorDefinitions>TLS_DISABLED;WIN32;_WIN32_WINNT=0x0502;WINVER=0x0502;BOOST_ALL_NO_LIB;NTDDI_VERSION=0x05020000;NDEBUG;_CONSOLE;_CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
      <AdditionalIncludeDirectories>%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
      <EnableEnhancedInstructionSet>NotSet</EnableEnhancedInstructionSet>
      <EnablePREfast>false</EnablePREfast>
      <AdditionalOptions>/bigobj @../flow/no_intellisense.opt %(AdditionalOptions)</AdditionalOptions>
      <MultiProcessorCompilation>true</MultiProcessorCompilation>
      <FavorSizeOrSpeed>Speed</FavorSizeOrSpeed>
      <MinimalRebuild>false</MinimalRebuild>
      <LanguageStandard>stdcpp17</LanguageStandard>
    </ClCompile>
    <Link>
      <SubSystem>Console</SubSystem>
      <GenerateDebugInformation>true</GenerateDebugInformation>
|
||||
<EnableCOMDATFolding>false</EnableCOMDATFolding>
|
||||
<OptimizeReferences>false</OptimizeReferences>
|
||||
<LinkTimeCodeGeneration>Default</LinkTimeCodeGeneration>
|
||||
<AdditionalDependencies>Advapi32.lib</AdditionalDependencies>
|
||||
<AdditionalOptions>/LTCG %(AdditionalOptions)</AdditionalOptions>
|
||||
</Link>
|
||||
<Lib>
|
||||
<AdditionalDependencies>$(TargetDir)flow.lib;$(TargetDir)fdbrpc.lib;winmm.lib</AdditionalDependencies>
|
||||
</Lib>
|
||||
</ItemDefinitionGroup>
|
||||
<ImportGroup Label="ExtensionTargets">
|
||||
<Import Project="..\flow\actorcompiler\ActorCompiler.targets" />
|
||||
</ImportGroup>
|
||||
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
|
||||
<Target Name="MyPreCompileSteps" AfterTargets="CLCompile">
|
||||
<Exec Command=""$(SolutionDir)bin\$(Configuration)\coveragetool.exe" "$(OutDir)coverage.$(TargetName).xml" @(ActorCompiler -> '%(RelativeDir)%(Filename)%(Extension)', ' ') @(CLInclude -> '%(RelativeDir)%(Filename)%(Extension)', ' ') @(CLCompile -> '%(RelativeDir)%(Filename)%(Extension)', ' ')" />
|
||||
</Target>
|
||||
</Project>
|
|
@@ -97,6 +97,7 @@ public:
#endif

    static Future<Reference<IAsyncFile>> open( std::string filename, int flags, int mode, void* ignore ) {
        ASSERT( !FLOW_KNOBS->DISABLE_POSIX_KERNEL_AIO );
        ASSERT( flags & OPEN_UNBUFFERED );

        if (flags & OPEN_LOCK)

@@ -153,6 +154,7 @@ public:
    }

    static void init( Reference<IEventFD> ev, double ioTimeout ) {
        ASSERT( !FLOW_KNOBS->DISABLE_POSIX_KERNEL_AIO );
        if( !g_network->isSimulated() ) {
            ctx.countAIOSubmit.init(LiteralStringRef("AsyncFile.CountAIOSubmit"));
            ctx.countAIOCollect.init(LiteralStringRef("AsyncFile.CountAIOCollect"));

@@ -578,7 +580,7 @@ private:
    static Context ctx;

    explicit AsyncFileKAIO(int fd, int flags, std::string const& filename) : fd(fd), flags(flags), filename(filename), failed(false) {
        ASSERT( !FLOW_KNOBS->DISABLE_POSIX_KERNEL_AIO );
        if( !g_network->isSimulated() ) {
            countFileLogicalWrites.init(LiteralStringRef("AsyncFile.CountFileLogicalWrites"), filename);
            countFileLogicalReads.init( LiteralStringRef("AsyncFile.CountFileLogicalReads"), filename);
@@ -93,7 +93,8 @@ Net2FileSystem::Net2FileSystem(double ioTimeout, std::string fileSystemPath)
{
    Net2AsyncFile::init();
#ifdef __linux__
    AsyncFileKAIO::init( Reference<IEventFD>(N2::ASIOReactor::getEventFD()), ioTimeout );
    if (!FLOW_KNOBS->DISABLE_POSIX_KERNEL_AIO)
        AsyncFileKAIO::init( Reference<IEventFD>(N2::ASIOReactor::getEventFD()), ioTimeout );

    if (fileSystemPath.empty()) {
        checkFileSystem = false;
@@ -112,6 +112,7 @@ public:
    Val const& operator[]( const Key& k ) { return rangeContaining(k).value(); }

    Ranges ranges() { return Ranges( Iterator(map.begin()), Iterator(map.lastItem()) ); }
    // intersectingRanges returns [begin, end] where begin <= r.begin and end >= r.end
    Ranges intersectingRanges( const Range& r ) { return Ranges(rangeContaining(r.begin), Iterator(map.lower_bound(r.end))); }
    // containedRanges() will return all ranges that are fully contained by the passed range (note that a range fully contains itself)
    Ranges containedRanges( const Range& r ) {
@@ -178,6 +178,7 @@ set(FDBSERVER_SRCS
    workloads/Sideband.actor.cpp
    workloads/SlowTaskWorkload.actor.cpp
    workloads/SnapTest.actor.cpp
    workloads/SpecialKeySpaceCorrectness.actor.cpp
    workloads/StatusWorkload.actor.cpp
    workloads/Storefront.actor.cpp
    workloads/StreamingRead.actor.cpp
@@ -581,8 +581,9 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
    init( FASTRESTORE_WAIT_FOR_MEMORY_LATENCY, 10 ); if( randomize && BUGGIFY ) { FASTRESTORE_WAIT_FOR_MEMORY_LATENCY = 60; }
    init( FASTRESTORE_HEARTBEAT_DELAY, 10 ); if( randomize && BUGGIFY ) { FASTRESTORE_HEARTBEAT_DELAY = deterministicRandom()->random01() * 120 + 2; }
    init( FASTRESTORE_HEARTBEAT_MAX_DELAY, 10 ); if( randomize && BUGGIFY ) { FASTRESTORE_HEARTBEAT_MAX_DELAY = FASTRESTORE_HEARTBEAT_DELAY * 10; }
    init( FASTRESTORE_APPLIER_FETCH_KEYS_SIZE, 100 ); if( randomize && BUGGIFY ) { FASTRESTORE_APPLIER_FETCH_KEYS_SIZE = deterministicRandom()->random01() * 10240 + 1; }
    init( FASTRESTORE_LOADER_SEND_MUTATION_MSG_BYTES, 1.0 * 1024.0 * 1024.0 ); if( randomize && BUGGIFY ) { FASTRESTORE_LOADER_SEND_MUTATION_MSG_BYTES = deterministicRandom()->random01() * 10.0 * 1024.0 * 1024.0 + 1; }
    init( FASTRESTORE_APPLIER_FETCH_KEYS_SIZE, 100 ); if( randomize && BUGGIFY ) { FASTRESTORE_APPLIER_FETCH_KEYS_SIZE = deterministicRandom()->random01() * 10240 + 1; }
    init( FASTRESTORE_LOADER_SEND_MUTATION_MSG_BYTES, 1.0 * 1024.0 * 1024.0 ); if( randomize && BUGGIFY ) { FASTRESTORE_LOADER_SEND_MUTATION_MSG_BYTES = deterministicRandom()->random01() * 10.0 * 1024.0 * 1024.0 + 1; }
    init( FASTRESTORE_GET_RANGE_VERSIONS_EXPENSIVE, false ); if( randomize && BUGGIFY ) { FASTRESTORE_GET_RANGE_VERSIONS_EXPENSIVE = deterministicRandom()->random01() < 0.5 ? true : false; }

    // clang-format on
@@ -521,6 +521,7 @@ public:
    int64_t FASTRESTORE_HEARTBEAT_MAX_DELAY; // master claims a node is down if there is no heartbeat from it within this delay
    int64_t FASTRESTORE_APPLIER_FETCH_KEYS_SIZE; // number of keys to fetch in a txn on applier
    int64_t FASTRESTORE_LOADER_SEND_MUTATION_MSG_BYTES; // desired size of mutation message sent from loader to appliers
    bool FASTRESTORE_GET_RANGE_VERSIONS_EXPENSIVE; // parse each range file to get the (range, version) it has?

    ServerKnobs();
    void initialize(bool randomize = false, ClientKnobs* clientKnobs = NULL, bool isSimulated = false);
@@ -38,7 +38,8 @@ typedef std::map<Standalone<StringRef>, uint32_t> SerializedMutationPartMap;
std::vector<UID> getApplierIDs(std::map<Key, UID>& rangeToApplier);
void splitMutation(std::map<Key, UID>* pRangeToApplier, MutationRef m, Arena& mvector_arena,
                   VectorRef<MutationRef>& mvector, Arena& nodeIDs_arena, VectorRef<UID>& nodeIDs);
void _parseSerializedMutation(std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
void _parseSerializedMutation(KeyRangeMap<Version>* pRangeVersions,
                              std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
                              SerializedMutationListMap* mutationMap,
                              std::map<LoadingParam, MutationsVec>::iterator samplesIter, LoaderCounters* cc,
                              const RestoreAsset& asset);

@@ -126,6 +127,21 @@ ACTOR Future<Void> restoreLoaderCore(RestoreLoaderInterface loaderInterf, int no
    return Void();
}

static inline bool _logMutationTooOld(KeyRangeMap<Version>* pRangeVersions, KeyRangeRef keyRange, Version v) {
    auto ranges = pRangeVersions->intersectingRanges(keyRange);
    Version minVersion = MAX_VERSION;
    for (auto r = ranges.begin(); r != ranges.end(); ++r) {
        minVersion = std::min(minVersion, r->value());
    }
    return minVersion >= v;
}

static inline bool logMutationTooOld(KeyRangeMap<Version>* pRangeVersions, MutationRef mutation, Version v) {
    return isRangeMutation(mutation)
               ? _logMutationTooOld(pRangeVersions, KeyRangeRef(mutation.param1, mutation.param2), v)
               : _logMutationTooOld(pRangeVersions, KeyRangeRef(singleKeyRange(mutation.param1)), v);
}

// Assume: Only update the local data if it (applierInterf) has not been set
void handleRestoreSysInfoRequest(const RestoreSysInfoRequest& req, Reference<RestoreLoaderData> self) {
    TraceEvent("FastRestoreLoader", self->id()).detail("HandleRestoreSysInfoRequest", self->id());

@@ -138,6 +154,23 @@ void handleRestoreSysInfoRequest(const RestoreSysInfoRequest& req, Reference<Res
    }

    self->appliersInterf = req.sysInfo.appliers;
    // Update rangeVersions
    ASSERT(req.rangeVersions.size() > 0); // At least the min version of range files will be used
    ASSERT(self->rangeVersions.size() == 1); // rangeVersions has not been set
    for (auto rv = req.rangeVersions.begin(); rv != req.rangeVersions.end(); ++rv) {
        self->rangeVersions.insert(rv->first, rv->second);
    }

    // Debug message for range version in each loader
    auto ranges = self->rangeVersions.ranges();
    int i = 0;
    for (auto r = ranges.begin(); r != ranges.end(); ++r) {
        TraceEvent("FastRestoreLoader", self->id())
            .detail("RangeIndex", i++)
            .detail("RangeBegin", r->begin())
            .detail("RangeEnd", r->end())
            .detail("Version", r->value());
    }

    req.reply.send(RestoreCommonReply(self->id()));
}

@@ -145,8 +178,10 @@ void handleRestoreSysInfoRequest(const RestoreSysInfoRequest& req, Reference<Res
// Parse a data block in a partitioned mutation log file and store mutations
// into "kvOpsIter" and samples into "samplesIter".
ACTOR static Future<Void> _parsePartitionedLogFileOnLoader(
    NotifiedVersion* processedFileOffset, std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
    std::map<LoadingParam, MutationsVec>::iterator samplesIter, Reference<IBackupContainer> bc, RestoreAsset asset) {
    KeyRangeMap<Version>* pRangeVersions, NotifiedVersion* processedFileOffset,
    std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
    std::map<LoadingParam, MutationsVec>::iterator samplesIter, LoaderCounters* cc, Reference<IBackupContainer> bc,
    RestoreAsset asset) {
    state Standalone<StringRef> buf = makeString(asset.len);
    state Reference<IAsyncFile> file = wait(bc->readFile(asset.filename));
    int rLen = wait(file->read(mutateString(buf), asset.len, asset.offset));

@@ -190,6 +225,12 @@ ACTOR static Future<Void> _parsePartitionedLogFileOnLoader(
        MutationRef mutation;
        rd >> mutation;

        // Skip mutation whose commitVersion < range kv's version
        if (logMutationTooOld(pRangeVersions, mutation, msgVersion.version)) {
            cc->oldLogMutations += 1;
            continue;
        }

        // Should this mutation be skipped?
        if (mutation.param1 >= asset.range.end ||
            (isRangeMutation(mutation) && mutation.param2 < asset.range.begin) ||

@@ -228,7 +269,8 @@ ACTOR static Future<Void> _parsePartitionedLogFileOnLoader(
    return Void();
}

ACTOR Future<Void> _processLoadingParam(LoadingParam param, Reference<LoaderBatchData> batchData, UID loaderID,
ACTOR Future<Void> _processLoadingParam(KeyRangeMap<Version>* pRangeVersions, LoadingParam param,
                                        Reference<LoaderBatchData> batchData, UID loaderID,
                                        Reference<IBackupContainer> bc) {
    // Temporary data structure for parsing log files into (version, <K, V, mutationType>)
    // Must use StandAlone to save mutations, otherwise, the mutationref memory will be corrupted

@@ -263,8 +305,9 @@ ACTOR Future<Void> _processLoadingParam(LoadingParam param, Reference<LoaderBatc
        } else {
            // TODO: Sanity check the log file's range is overlapped with the restored version range
            if (param.isPartitionedLog()) {
                fileParserFutures.push_back(_parsePartitionedLogFileOnLoader(&processedFileOffset, kvOpsPerLPIter,
                                                                             samplesIter, bc, subAsset));
                fileParserFutures.push_back(_parsePartitionedLogFileOnLoader(pRangeVersions, &processedFileOffset,
                                                                             kvOpsPerLPIter, samplesIter,
                                                                             &batchData->counters, bc, subAsset));
            } else {
                fileParserFutures.push_back(_parseLogFileToMutationsOnLoader(&processedFileOffset, &mutationMap,
                                                                             &mutationPartMap, bc, subAsset));

@@ -274,7 +317,8 @@ ACTOR Future<Void> _processLoadingParam(LoadingParam param, Reference<LoaderBatc
    wait(waitForAll(fileParserFutures));

    if (!param.isRangeFile && !param.isPartitionedLog()) {
        _parseSerializedMutation(kvOpsPerLPIter, &mutationMap, samplesIter, &batchData->counters, param.asset);
        _parseSerializedMutation(pRangeVersions, kvOpsPerLPIter, &mutationMap, samplesIter, &batchData->counters,
                                 param.asset);
    }

    TraceEvent("FastRestoreLoaderProcessLoadingParamDone", loaderID).detail("LoadingParam", param.toString());

@@ -304,7 +348,8 @@ ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<R
            .detail("BatchIndex", req.batchIndex)
            .detail("ProcessLoadParam", req.param.toString());
        ASSERT(batchData->sampleMutations.find(req.param) == batchData->sampleMutations.end());
        batchData->processedFileParams[req.param] = _processLoadingParam(req.param, batchData, self->id(), self->bc);
        batchData->processedFileParams[req.param] =
            _processLoadingParam(&self->rangeVersions, req.param, batchData, self->id(), self->bc);
        isDuplicated = false;
    } else {
        TraceEvent("FastRestoreLoadFile", self->id())

@@ -669,7 +714,8 @@ bool concatenateBackupMutationForLogFile(std::map<Standalone<StringRef>, Standal
// we may not get the entire mutation list for the version encoded_list_of_mutations:
// [mutation1][mutation2]...[mutationk], where
// a mutation is encoded as [type:uint32_t][keyLength:uint32_t][valueLength:uint32_t][keyContent][valueContent]
void _parseSerializedMutation(std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
void _parseSerializedMutation(KeyRangeMap<Version>* pRangeVersions,
                              std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
                              SerializedMutationListMap* pmutationMap,
                              std::map<LoadingParam, MutationsVec>::iterator samplesIter, LoaderCounters* cc,
                              const RestoreAsset& asset) {

@@ -709,6 +755,12 @@ void _parseSerializedMutation(std::map<LoadingParam, VersionedMutationsMap>::ite

            MutationRef mutation((MutationRef::Type)type, KeyRef(k, kLen), KeyRef(v, vLen));
            // Should this mutation be skipped?
            // Skip mutation whose commitVersion < range kv's version
            if (logMutationTooOld(pRangeVersions, mutation, commitVersion)) {
                cc->oldLogMutations += 1;
                continue;
            }

            if (mutation.param1 >= asset.range.end ||
                (isRangeMutation(mutation) && mutation.param2 < asset.range.begin) ||
                (!isRangeMutation(mutation) && mutation.param1 < asset.range.begin)) {
@@ -82,11 +82,13 @@ struct LoaderBatchData : public ReferenceCounted<LoaderBatchData> {
        CounterCollection cc;
        Counter loadedRangeBytes, loadedLogBytes, sentBytes;
        Counter sampledRangeBytes, sampledLogBytes;
        Counter oldLogMutations;

        Counters(LoaderBatchData* self, UID loaderInterfID, int batchIndex)
          : cc("LoaderBatch", loaderInterfID.toString() + ":" + std::to_string(batchIndex)),
            loadedRangeBytes("LoadedRangeBytes", cc), loadedLogBytes("LoadedLogBytes", cc), sentBytes("SentBytes", cc),
            sampledRangeBytes("SampledRangeBytes", cc), sampledLogBytes("SampledLogBytes", cc) {}
            sampledRangeBytes("SampledRangeBytes", cc), sampledLogBytes("SampledLogBytes", cc),
            oldLogMutations("OldLogMutations", cc) {}
    } counters;

    explicit LoaderBatchData(UID nodeID, int batchIndex) : counters(this, nodeID, batchIndex), vbState(LoaderVersionBatchState::NOT_INIT) {

@@ -129,6 +131,8 @@ struct RestoreLoaderData : RestoreRoleData, public ReferenceCounted<RestoreLoade
    std::map<int, Reference<LoaderBatchData>> batch;
    std::map<int, Reference<LoaderBatchStatus>> status;

    KeyRangeMap<Version> rangeVersions;

    Reference<IBackupContainer> bc; // Backup container is used to read backup files
    Key bcUrl; // The url used to get the bc
@@ -20,6 +20,7 @@

// This file implements the functions for RestoreMaster role

#include "fdbrpc/RangeMap.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/BackupAgent.actor.h"

@@ -37,8 +38,10 @@

ACTOR static Future<Void> clearDB(Database cx);
ACTOR static Future<Version> collectBackupFiles(Reference<IBackupContainer> bc, std::vector<RestoreFileFR>* rangeFiles,
                                                std::vector<RestoreFileFR>* logFiles, Database cx,
                                                RestoreRequest request);
                                                std::vector<RestoreFileFR>* logFiles, Version* minRangeVersion,
                                                Database cx, RestoreRequest request);
ACTOR static Future<Void> buildRangeVersions(KeyRangeMap<Version>* pRangeVersions,
                                             std::vector<RestoreFileFR>* pRangeFiles, Key url);

ACTOR static Future<Version> processRestoreRequest(Reference<RestoreMasterData> self, Database cx, RestoreRequest request);
ACTOR static Future<Void> startProcessRestoreRequests(Reference<RestoreMasterData> self, Database cx);

@@ -48,8 +51,8 @@ ACTOR static Future<Void> distributeWorkloadPerVersionBatch(Reference<RestoreMas

ACTOR static Future<Void> recruitRestoreRoles(Reference<RestoreWorkerData> masterWorker,
                                              Reference<RestoreMasterData> masterData);
ACTOR static Future<Void> distributeRestoreSysInfo(Reference<RestoreWorkerData> masterWorker,
                                                   Reference<RestoreMasterData> masterData);
ACTOR static Future<Void> distributeRestoreSysInfo(Reference<RestoreMasterData> masterData,
                                                   KeyRangeMap<Version>* pRangeVersions);

ACTOR static Future<Standalone<VectorRef<RestoreRequest>>> collectRestoreRequests(Database cx);
ACTOR static Future<Void> initializeVersionBatch(std::map<UID, RestoreApplierInterface> appliersInterf,

@@ -79,8 +82,6 @@ ACTOR Future<Void> startRestoreMaster(Reference<RestoreWorkerData> masterWorker,
        actors.add(updateHeartbeatTime(self));
        actors.add(checkRolesLiveness(self));

        wait(distributeRestoreSysInfo(masterWorker, self));

        wait(startProcessRestoreRequests(self, cx));
    } catch (Error& e) {
        if (e.code() != error_code_operation_cancelled) {

@@ -148,14 +149,27 @@ ACTOR Future<Void> recruitRestoreRoles(Reference<RestoreWorkerData> masterWorker
    return Void();
}

ACTOR Future<Void> distributeRestoreSysInfo(Reference<RestoreWorkerData> masterWorker,
                                            Reference<RestoreMasterData> masterData) {
ACTOR Future<Void> distributeRestoreSysInfo(Reference<RestoreMasterData> masterData,
                                            KeyRangeMap<Version>* pRangeVersions) {
    ASSERT(masterData.isValid());
    ASSERT(!masterData->loadersInterf.empty());
    RestoreSysInfo sysInfo(masterData->appliersInterf);
    // Construct serializable KeyRange versions
    Standalone<VectorRef<std::pair<KeyRangeRef, Version>>> rangeVersionsVec;
    auto ranges = pRangeVersions->ranges();
    int i = 0;
    for (auto r = ranges.begin(); r != ranges.end(); ++r) {
        rangeVersionsVec.push_back(rangeVersionsVec.arena(),
                                   std::make_pair(KeyRangeRef(r->begin(), r->end()), r->value()));
        TraceEvent("DistributeRangeVersions")
            .detail("RangeIndex", i++)
            .detail("RangeBegin", r->begin())
            .detail("RangeEnd", r->end())
            .detail("RangeVersion", r->value());
    }
    std::vector<std::pair<UID, RestoreSysInfoRequest>> requests;
    for (auto& loader : masterData->loadersInterf) {
        requests.emplace_back(loader.first, RestoreSysInfoRequest(sysInfo));
        requests.emplace_back(loader.first, RestoreSysInfoRequest(sysInfo, rangeVersionsVec));
    }

    TraceEvent("FastRestoreDistributeRestoreSysInfoToLoaders", masterData->id())

@@ -233,12 +247,14 @@ ACTOR static Future<Version> processRestoreRequest(Reference<RestoreMasterData>
    state std::vector<RestoreFileFR> rangeFiles;
    state std::vector<RestoreFileFR> logFiles;
    state std::vector<RestoreFileFR> allFiles;
    state Version minRangeVersion = MAX_VERSION;
    state ActorCollection actors(false);

    self->initBackupContainer(request.url);

    // Get all backup files' description and save them to files
    Version targetVersion = wait(collectBackupFiles(self->bc, &rangeFiles, &logFiles, cx, request));
    state Version targetVersion =
        wait(collectBackupFiles(self->bc, &rangeFiles, &logFiles, &minRangeVersion, cx, request));
    ASSERT(targetVersion > 0);

    std::sort(rangeFiles.begin(), rangeFiles.end());

@@ -247,6 +263,25 @@ ACTOR static Future<Version> processRestoreRequest(Reference<RestoreMasterData>
               std::tie(f2.endVersion, f2.beginVersion, f2.fileIndex, f2.fileName);
    });

    // Build range versions: version of key ranges in range file
    state KeyRangeMap<Version> rangeVersions(minRangeVersion, allKeys.end);
    if (SERVER_KNOBS->FASTRESTORE_GET_RANGE_VERSIONS_EXPENSIVE) {
        wait(buildRangeVersions(&rangeVersions, &rangeFiles, request.url));
    } else {
        // Debug purpose, dump range versions
        auto ranges = rangeVersions.ranges();
        int i = 0;
        for (auto r = ranges.begin(); r != ranges.end(); ++r) {
            TraceEvent(SevDebug, "SingleRangeVersion")
                .detail("RangeIndex", i++)
                .detail("RangeBegin", r->begin())
                .detail("RangeEnd", r->end())
                .detail("RangeVersion", r->value());
        }
    }

    wait(distributeRestoreSysInfo(self, &rangeVersions));

    // Divide files into version batches.
    self->buildVersionBatches(rangeFiles, logFiles, &self->versionBatches, targetVersion);
    self->dumpVersionBatches(self->versionBatches);

@@ -615,8 +650,8 @@ ACTOR static Future<Standalone<VectorRef<RestoreRequest>>> collectRestoreRequest
// Collect the backup files' description into output_files by reading the backupContainer bc.
// Returns the restore target version.
ACTOR static Future<Version> collectBackupFiles(Reference<IBackupContainer> bc, std::vector<RestoreFileFR>* rangeFiles,
                                                std::vector<RestoreFileFR>* logFiles, Database cx,
                                                RestoreRequest request) {
                                                std::vector<RestoreFileFR>* logFiles, Version* minRangeVersion,
                                                Database cx, RestoreRequest request) {
    state BackupDescription desc = wait(bc->describeBackup());

    // Convert version to real time for operators to read the BackupDescription desc.

@@ -645,6 +680,7 @@ ACTOR static Future<Version> collectBackupFiles(Reference<IBackupContainer> bc,

    std::set<RestoreFileFR> uniqueRangeFiles;
    std::set<RestoreFileFR> uniqueLogFiles;
    *minRangeVersion = MAX_VERSION;
    for (const RangeFile& f : restorable.get().ranges) {
        TraceEvent("FastRestoreMasterPhaseCollectBackupFiles").detail("RangeFile", f.toString());
        if (f.fileSize <= 0) {

@@ -653,6 +689,7 @@ ACTOR static Future<Version> collectBackupFiles(Reference<IBackupContainer> bc,
        RestoreFileFR file(f);
        TraceEvent("FastRestoreMasterPhaseCollectBackupFiles").detail("RangeFileFR", file.toString());
        uniqueRangeFiles.insert(file);
        *minRangeVersion = std::min(*minRangeVersion, file.version);
    }
    for (const LogFile& f : restorable.get().logs) {
        TraceEvent("FastRestoreMasterPhaseCollectBackupFiles").detail("LogFile", f.toString());

@@ -675,6 +712,78 @@ ACTOR static Future<Version> collectBackupFiles(Reference<IBackupContainer> bc,
    return request.targetVersion;
}

// Read the first and last block of *file to get (beginKey, endKey);
// set (beginKey, endKey) and file->version into pRangeVersions
ACTOR static Future<Void> insertRangeVersion(KeyRangeMap<Version>* pRangeVersions, RestoreFileFR* file,
                                             Reference<IBackupContainer> bc) {
    TraceEvent("FastRestoreMasterDecodeRangeVersion").detail("File", file->toString());
    state Reference<IAsyncFile> inFile = wait(bc->readFile(file->fileName));
    state bool beginKeySet = false;
    state Key beginKey;
    state Key endKey;
    state int64_t j = 0;
    for (; j < file->fileSize; j += file->blockSize) {
        int64_t len = std::min<int64_t>(file->blockSize, file->fileSize - j);
        Standalone<VectorRef<KeyValueRef>> blockData = wait(parallelFileRestore::decodeRangeFileBlock(inFile, j, len));
        if (!beginKeySet) {
            beginKey = blockData.front().key;
        }
        endKey = blockData.back().key;
    }

    // First and last key are the range for this file: endKey is exclusive
    KeyRange fileRange = KeyRangeRef(beginKey.contents(), endKey.contents());
    TraceEvent("FastRestoreMasterInsertRangeVersion")
        .detail("DecodedRangeFile", file->fileName)
        .detail("KeyRange", fileRange)
        .detail("Version", file->version);
    // Update version for pRangeVersions's ranges in fileRange
    auto ranges = pRangeVersions->modify(fileRange);
    for (auto r = ranges.begin(); r != ranges.end(); ++r) {
        r->value() = std::max(r->value(), file->version);
    }

    // Dump the new key ranges
    ranges = pRangeVersions->ranges();
    int i = 0;
    for (auto r = ranges.begin(); r != ranges.end(); ++r) {
        TraceEvent(SevDebug, "RangeVersionsAfterUpdate")
            .detail("File", file->toString())
            .detail("FileRange", fileRange.toString())
            .detail("FileVersion", file->version)
            .detail("RangeIndex", i++)
            .detail("RangeBegin", r->begin())
            .detail("RangeEnd", r->end())
            .detail("RangeVersion", r->value());
    }

    return Void();
}

// Build the version skyline of snapshot ranges by parsing range files;
// Expensive and slow operation that should not run in real prod.
ACTOR static Future<Void> buildRangeVersions(KeyRangeMap<Version>* pRangeVersions,
                                             std::vector<RestoreFileFR>* pRangeFiles, Key url) {
    if (!g_network->isSimulated()) {
        TraceEvent(SevError, "ExpensiveBuildRangeVersions")
            .detail("Reason", "Parsing all range files is slow and memory intensive");
        return Void();
    }
    Reference<IBackupContainer> bc = IBackupContainer::openContainer(url.toString());

    // Key ranges not in range files are empty;
    // Assign highest version to avoid applying any mutation in these ranges
    state int fileIndex = 0;
    state std::vector<Future<Void>> fInsertRangeVersions;
    for (; fileIndex < pRangeFiles->size(); ++fileIndex) {
        fInsertRangeVersions.push_back(insertRangeVersion(pRangeVersions, &pRangeFiles->at(fileIndex), bc));
    }

    wait(waitForAll(fInsertRangeVersions));

    return Void();
}

ACTOR static Future<Void> clearDB(Database cx) {
    wait(runRYWTransaction(cx, [](Reference<ReadYourWritesTransaction> tr) -> Future<Void> {
        tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
@@ -442,6 +442,7 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
    auto container = IBackupContainer::openContainer(lastBackupContainer->getURL());
    BackupDescription desc = wait(container->describeBackup());
    ASSERT(self->usePartitionedLogs == desc.partitioned);
    ASSERT(desc.minRestorableVersion.present()); // We must have a valid backup now.

    state Version targetVersion = -1;
    if (desc.maxRestorableVersion.present()) {
@@ -3,7 +3,7 @@
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
 * Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.

@@ -162,8 +162,8 @@ struct ReportConflictingKeysWorkload : TestWorkload {
    if (foundConflict) {
        // \xff\xff/transaction/conflicting_keys is always initialized to false, skip it here
        state KeyRange ckr =
            KeyRangeRef(keyAfter(LiteralStringRef("").withPrefix(conflictingKeysAbsolutePrefix)),
                        LiteralStringRef("\xff\xff").withPrefix(conflictingKeysAbsolutePrefix));
            KeyRangeRef(keyAfter(LiteralStringRef("").withPrefix(conflictingKeysRange.begin)),
                        LiteralStringRef("\xff\xff").withPrefix(conflictingKeysRange.begin));
        // The getRange here using the special key prefix "\xff\xff/transaction/conflicting_keys/" happens
        // locally. Thus, the error handling is not needed here
        Future<Standalone<RangeResultRef>> conflictingKeyRangesFuture =

@@ -176,12 +176,14 @@ struct ReportConflictingKeysWorkload : TestWorkload {
        ASSERT(!conflictingKeyRanges.more);
        for (int i = 0; i < conflictingKeyRanges.size(); i += 2) {
            KeyValueRef startKeyWithPrefix = conflictingKeyRanges[i];
            ASSERT(startKeyWithPrefix.key.startsWith(conflictingKeysRange.begin));
            ASSERT(startKeyWithPrefix.value == conflictingKeysTrue);
            KeyValueRef endKeyWithPrefix = conflictingKeyRanges[i + 1];
            ASSERT(endKeyWithPrefix.key.startsWith(conflictingKeysRange.begin));
            ASSERT(endKeyWithPrefix.value == conflictingKeysFalse);
            // Remove the prefix of returned keys
            Key startKey = startKeyWithPrefix.key.removePrefix(conflictingKeysAbsolutePrefix);
            Key endKey = endKeyWithPrefix.key.removePrefix(conflictingKeysAbsolutePrefix);
            Key startKey = startKeyWithPrefix.key.removePrefix(conflictingKeysRange.begin);
            Key endKey = endKeyWithPrefix.key.removePrefix(conflictingKeysRange.begin);
            KeyRangeRef kr = KeyRangeRef(startKey, endKey);
            if (!std::any_of(readConflictRanges.begin(), readConflictRanges.end(), [&kr](KeyRange rCR) {
                // Read_conflict_range remains same in the resolver.
@@ -0,0 +1,206 @@
/*
 * SpecialKeySpaceCorrectness.actor.cpp
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/ReadYourWrites.h"
#include "fdbclient/SpecialKeySpace.actor.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "flow/actorcompiler.h"

class SKSCTestImpl : public SpecialKeyRangeBaseImpl {
public:
	explicit SKSCTestImpl(KeyRangeRef kr) : SpecialKeyRangeBaseImpl(kr) {}
	virtual Future<Standalone<RangeResultRef>> getRange(Reference<ReadYourWritesTransaction> ryw,
	                                                    KeyRangeRef kr) const {
		ASSERT(range.contains(kr));
		auto resultFuture = ryw->getRange(kr, CLIENT_KNOBS->TOO_MANY);
		// all keys are written to RYW; since GRV is set, the read should happen locally
		ASSERT(resultFuture.isReady());
		auto result = resultFuture.getValue();
		ASSERT(!result.more);
		return resultFuture.getValue();
	}
};

struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {

	int actorCount, minKeysPerRange, maxKeysPerRange, rangeCount, keyBytes, valBytes;
	double testDuration, absoluteRandomProb, transactionsPerSecond;
	PerfIntCounter wrongResults, keysCount;
	Reference<ReadYourWritesTransaction> ryw; // used to store all populated data
	std::vector<std::shared_ptr<SKSCTestImpl>> impls;
	Standalone<VectorRef<KeyRangeRef>> keys;

	SpecialKeySpaceCorrectnessWorkload(WorkloadContext const& wcx)
	  : TestWorkload(wcx), wrongResults("Wrong Results"), keysCount("Number of generated keys") {
		minKeysPerRange = getOption(options, LiteralStringRef("minKeysPerRange"), 1);
		maxKeysPerRange = getOption(options, LiteralStringRef("maxKeysPerRange"), 100);
		rangeCount = getOption(options, LiteralStringRef("rangeCount"), 10);
		keyBytes = getOption(options, LiteralStringRef("keyBytes"), 16);
		valBytes = getOption(options, LiteralStringRef("valueBytes"), 16);
		testDuration = getOption(options, LiteralStringRef("testDuration"), 10.0);
		transactionsPerSecond = getOption(options, LiteralStringRef("transactionsPerSecond"), 100.0);
		actorCount = getOption(options, LiteralStringRef("actorCount"), 1);
		absoluteRandomProb = getOption(options, LiteralStringRef("absoluteRandomProb"), 0.5);
	}

	virtual std::string description() { return "SpecialKeySpaceCorrectness"; }
	virtual Future<Void> setup(Database const& cx) { return _setup(cx, this); }
	virtual Future<Void> start(Database const& cx) { return _start(cx, this); }
	virtual Future<bool> check(Database const& cx) { return wrongResults.getValue() == 0; }
	virtual void getMetrics(std::vector<PerfMetric>& m) {}

	// disable the default timeout setting
	double getCheckTimeout() override { return std::numeric_limits<double>::max(); }

	Future<Void> _setup(Database cx, SpecialKeySpaceCorrectnessWorkload* self) {
		if (self->clientId == 0) {
			self->ryw = Reference(new ReadYourWritesTransaction(cx));
			self->ryw->setVersion(100);
			self->ryw->clear(normalKeys);
			// generate key ranges
			for (int i = 0; i < self->rangeCount; ++i) {
				std::string baseKey = deterministicRandom()->randomAlphaNumeric(i + 1);
				Key startKey(baseKey + "/");
				Key endKey(baseKey + "/\xff");
				self->keys.push_back_deep(self->keys.arena(), KeyRangeRef(startKey, endKey));
				self->impls.push_back(std::make_shared<SKSCTestImpl>(KeyRangeRef(startKey, endKey)));
				// Although there are already ranges registered, the testing range will replace them
				cx->specialKeySpace->registerKeyRange(self->keys.back(), self->impls.back().get());
				// generate keys in each key range
				int keysInRange = deterministicRandom()->randomInt(self->minKeysPerRange, self->maxKeysPerRange + 1);
				self->keysCount += keysInRange;
				for (int j = 0; j < keysInRange; ++j) {
					self->ryw->set(Key(deterministicRandom()->randomAlphaNumeric(self->keyBytes)).withPrefix(startKey),
					               Value(deterministicRandom()->randomAlphaNumeric(self->valBytes)));
				}
			}
		}
		return Void();
	}
	ACTOR Future<Void> _start(Database cx, SpecialKeySpaceCorrectnessWorkload* self) {
		if (self->clientId == 0) wait(timeout(self->getRangeCallActor(cx, self), self->testDuration, Void()));
		return Void();
	}

	ACTOR Future<Void> getRangeCallActor(Database cx, SpecialKeySpaceCorrectnessWorkload* self) {
		state double lastTime = now();
		loop {
			wait(poisson(&lastTime, 1.0 / self->transactionsPerSecond));
			state bool reverse = deterministicRandom()->random01() < 0.5;
			state GetRangeLimits limit = self->randomLimits();
			state KeySelector begin = self->randomKeySelector();
			state KeySelector end = self->randomKeySelector();
			auto correctResultFuture = self->ryw->getRange(begin, end, limit, false, reverse);
			ASSERT(correctResultFuture.isReady());
			auto correctResult = correctResultFuture.getValue();
			auto testResultFuture = cx->specialKeySpace->getRange(self->ryw, begin, end, limit, reverse);
			ASSERT(testResultFuture.isReady());
			auto testResult = testResultFuture.getValue();

			// check the consistency of results
			if (!self->compareRangeResult(correctResult, testResult)) {
				TraceEvent(SevError, "TestFailure")
				    .detail("Reason", "Results from getRange are inconsistent")
				    .detail("Begin", begin.toString())
				    .detail("End", end.toString())
				    .detail("LimitRows", limit.rows)
				    .detail("LimitBytes", limit.bytes)
				    .detail("Reverse", reverse);
				++self->wrongResults;
			}
		}
	}

	bool compareRangeResult(Standalone<RangeResultRef>& res1, Standalone<RangeResultRef>& res2) {
		if ((res1.more != res2.more) || (res1.readToBegin != res2.readToBegin) ||
		    (res1.readThroughEnd != res2.readThroughEnd)) {
			TraceEvent(SevError, "TestFailure")
			    .detail("Reason", "RangeResultRef flags are inconsistent")
			    .detail("More", res1.more)
			    .detail("ReadToBegin", res1.readToBegin)
			    .detail("ReadThroughEnd", res1.readThroughEnd)
			    .detail("More2", res2.more)
			    .detail("ReadToBegin2", res2.readToBegin)
			    .detail("ReadThroughEnd2", res2.readThroughEnd);
			return false;
		}
		if (res1.size() != res2.size()) {
			TraceEvent(SevError, "TestFailure")
			    .detail("Reason", "Results' sizes are inconsistent")
			    .detail("CorrectResultSize", res1.size())
			    .detail("TestResultSize", res2.size());
			return false;
		}
		for (int i = 0; i < res1.size(); ++i) {
			if (res1[i] != res2[i]) {
				TraceEvent(SevError, "TestFailure")
				    .detail("Reason", "Elements are inconsistent")
				    .detail("Index", i)
				    .detail("CorrectKey", printable(res1[i].key))
				    .detail("TestKey", printable(res2[i].key))
				    .detail("CorrectValue", printable(res1[i].value))
				    .detail("TestValue", printable(res2[i].value));
				return false;
			}
		}
		return true;
	}

	KeySelector randomKeySelector() {
		Key randomKey;
		// TODO : add randomness to pick up existing keys
		if (deterministicRandom()->random01() < absoluteRandomProb) {
			Key prefix;
			if (deterministicRandom()->random01() < absoluteRandomProb)
				// prefix length is randomly generated
				prefix =
				    Key(deterministicRandom()->randomAlphaNumeric(deterministicRandom()->randomInt(1, rangeCount + 1)) +
				        "/");
			else
				// pick up an existing prefix
				prefix = keys[deterministicRandom()->randomInt(0, rangeCount)].begin;
			randomKey = Key(deterministicRandom()->randomAlphaNumeric(keyBytes)).withPrefix(prefix);
		} else {
			// pick up existing keys from registered key ranges
			KeyRangeRef randomKeyRangeRef = keys[deterministicRandom()->randomInt(0, keys.size())];
			randomKey = deterministicRandom()->random01() < 0.5 ? randomKeyRangeRef.begin : randomKeyRangeRef.end;
		}
		// covers corner cases where the offset points outside the key space
		int offset = deterministicRandom()->randomInt(-keysCount.getValue() - 1, keysCount.getValue() + 2);
		bool orEqual = deterministicRandom()->random01() < 0.5;
		return KeySelectorRef(randomKey, orEqual, offset);
	}

	GetRangeLimits randomLimits() {
		// TODO : fix knobs for row_unlimited
		int rowLimits = deterministicRandom()->randomInt(1, keysCount.getValue() + 1);
		// The largest key's bytes is the longest prefix bytes + 1 (for '/') + generated key bytes
		// 8 here refers to the bytes of KeyValueRef
		int byteLimits = deterministicRandom()->randomInt(
		    1, keysCount.getValue() * (keyBytes + (rangeCount + 1) + valBytes + 8) + 1);

		return GetRangeLimits(rowLimits, byteLimits);
	}
};

WorkloadFactory<SpecialKeySpaceCorrectnessWorkload> SpecialKeySpaceCorrectnessFactory("SpecialKeySpaceCorrectness");
@@ -953,7 +953,15 @@ namespace actorcompiler
				// if it has side effects
				cx.target.WriteLine("if (!{0}->SAV<{1}>::futures) {{ (void)({2}); this->~{3}(); {0}->destroy(); return 0; }}", This, actor.returnType, stmt.expression, stateClassName);
				// Build the return value directly in SAV<T>::value_storage
				cx.target.WriteLine("new (&{0}->SAV< {1} >::value()) {1}({2});", This, actor.returnType, stmt.expression);
				// If the expression is exactly the name of a state variable, std::move() it
				if (state.Exists(s => s.name == stmt.expression))
				{
					cx.target.WriteLine("new (&{0}->SAV< {1} >::value()) {1}(std::move({2})); // state_var_RVO", This, actor.returnType, stmt.expression);
				}
				else
				{
					cx.target.WriteLine("new (&{0}->SAV< {1} >::value()) {1}({2});", This, actor.returnType, stmt.expression);
				}
				// Destruct state
				cx.target.WriteLine("this->~{0}();", stateClassName);
				// Tell SAV<T> to return the value we already constructed in value_storage
@@ -687,6 +687,11 @@ public:
	{
		sav->send(presentValue);
	}
	Future(T&& presentValue)
	  : sav(new SAV<T>(1, 0))
	{
		sav->send(std::move(presentValue));
	}
	Future(Never)
	  : sav(new SAV<T>(1, 0))
	{
@@ -127,6 +127,7 @@ if(WITH_PYTHON)
  add_fdb_test(TEST_FILES fast/SelectorCorrectness.txt)
  add_fdb_test(TEST_FILES fast/Sideband.txt)
  add_fdb_test(TEST_FILES fast/SidebandWithStatus.txt)
  add_fdb_test(TEST_FILES fast/SpecialKeySpaceCorrectness.txt)
  add_fdb_test(TEST_FILES fast/SwizzledRollbackSideband.txt)
  add_fdb_test(TEST_FILES fast/SystemRebootTestCycle.txt)
  add_fdb_test(TEST_FILES fast/TaskBucketCorrectness.txt)
@@ -0,0 +1,5 @@
testTitle=SpecialKeySpaceCorrectnessTest
    testName=SpecialKeySpaceCorrectness
    testDuration=30.0
    valueBytes=16
    keyBytes=16