Merge pull request #48 from apple/release-5.2
Merge release-5.2 into master
This commit is contained in:
commit
cf9d02cdbd
|
@ -18,7 +18,6 @@
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
#
|
#
|
||||||
|
|
||||||
import ctypes
|
|
||||||
import random
|
import random
|
||||||
import struct
|
import struct
|
||||||
|
|
||||||
|
@ -169,7 +168,7 @@ class ApiTest(Test):
|
||||||
op_choices += resets
|
op_choices += resets
|
||||||
|
|
||||||
idempotent_atomic_ops = [u'BIT_AND', u'BIT_OR', u'MAX', u'MIN', u'BYTE_MIN', u'BYTE_MAX']
|
idempotent_atomic_ops = [u'BIT_AND', u'BIT_OR', u'MAX', u'MIN', u'BYTE_MIN', u'BYTE_MAX']
|
||||||
atomic_ops = idempotent_atomic_ops + [u'ADD', u'BIT_XOR']
|
atomic_ops = idempotent_atomic_ops + [u'ADD', u'BIT_XOR', u'APPEND_IF_FITS']
|
||||||
|
|
||||||
if args.concurrency > 1:
|
if args.concurrency > 1:
|
||||||
self.max_keys = random.randint(100, 1000)
|
self.max_keys = random.randint(100, 1000)
|
||||||
|
|
|
@ -1654,6 +1654,7 @@ void populateAtomicOpMap() {
|
||||||
optionInfo["BIT_OR"] = FDBMutationType::FDB_MUTATION_TYPE_BIT_OR;
|
optionInfo["BIT_OR"] = FDBMutationType::FDB_MUTATION_TYPE_BIT_OR;
|
||||||
optionInfo["XOR"] = FDBMutationType::FDB_MUTATION_TYPE_XOR;
|
optionInfo["XOR"] = FDBMutationType::FDB_MUTATION_TYPE_XOR;
|
||||||
optionInfo["BIT_XOR"] = FDBMutationType::FDB_MUTATION_TYPE_BIT_XOR;
|
optionInfo["BIT_XOR"] = FDBMutationType::FDB_MUTATION_TYPE_BIT_XOR;
|
||||||
|
optionInfo["APPEND_IF_FITS"] = FDBMutationType::FDB_MUTATION_TYPE_APPEND_IF_FITS;
|
||||||
optionInfo["MAX"] = FDBMutationType::FDB_MUTATION_TYPE_MAX;
|
optionInfo["MAX"] = FDBMutationType::FDB_MUTATION_TYPE_MAX;
|
||||||
optionInfo["MIN"] = FDBMutationType::FDB_MUTATION_TYPE_MIN;
|
optionInfo["MIN"] = FDBMutationType::FDB_MUTATION_TYPE_MIN;
|
||||||
optionInfo["SET_VERSIONSTAMPED_KEY"] = FDBMutationType::FDB_MUTATION_TYPE_SET_VERSIONSTAMPED_KEY;
|
optionInfo["SET_VERSIONSTAMPED_KEY"] = FDBMutationType::FDB_MUTATION_TYPE_SET_VERSIONSTAMPED_KEY;
|
||||||
|
|
|
@ -421,7 +421,7 @@ Future<Void> logError(Database cx, Key keyErrors, const std::string& message);
|
||||||
Future<Void> logError(Reference<ReadYourWritesTransaction> tr, Key keyErrors, const std::string& message);
|
Future<Void> logError(Reference<ReadYourWritesTransaction> tr, Key keyErrors, const std::string& message);
|
||||||
Future<Void> checkVersion(Reference<ReadYourWritesTransaction> const& tr);
|
Future<Void> checkVersion(Reference<ReadYourWritesTransaction> const& tr);
|
||||||
Future<Void> readCommitted(Database const& cx, PromiseStream<RangeResultWithVersion> const& results, Reference<FlowLock> const& lock, KeyRangeRef const& range, bool const& terminator = true, bool const& systemAccess = false, bool const& lockAware = false);
|
Future<Void> readCommitted(Database const& cx, PromiseStream<RangeResultWithVersion> const& results, Reference<FlowLock> const& lock, KeyRangeRef const& range, bool const& terminator = true, bool const& systemAccess = false, bool const& lockAware = false);
|
||||||
Future<Void> readCommitted(Database const& cx, PromiseStream<RCGroup> const& results, Future<Void> const& active, Reference<FlowLock> const& lock, KeyRangeRef const& range, std::function< std::pair<uint64_t, uint32_t>(Key key) > const& groupBy, bool const& terminator = true, bool const& systemAccess = false, bool const& lockAware = false, std::function< Future<Void>(Reference<ReadYourWritesTransaction> tr) > const& withEachFunction = nullptr);
|
Future<Void> readCommitted(Database const& cx, PromiseStream<RCGroup> const& results, Future<Void> const& active, Reference<FlowLock> const& lock, KeyRangeRef const& range, std::function< std::pair<uint64_t, uint32_t>(Key key) > const& groupBy, bool const& terminator = true, bool const& systemAccess = false, bool const& lockAware = false);
|
||||||
Future<Void> applyMutations(Database const& cx, Key const& uid, Key const& addPrefix, Key const& removePrefix, Version const& beginVersion, Version* const& endVersion, RequestStream<CommitTransactionRequest> const& commit, NotifiedVersion* const& committedVersion, Reference<KeyRangeMap<Version>> const& keyVersion);
|
Future<Void> applyMutations(Database const& cx, Key const& uid, Key const& addPrefix, Key const& removePrefix, Version const& beginVersion, Version* const& endVersion, RequestStream<CommitTransactionRequest> const& commit, NotifiedVersion* const& committedVersion, Reference<KeyRangeMap<Version>> const& keyVersion);
|
||||||
|
|
||||||
typedef BackupAgentBase::enumState EBackupState;
|
typedef BackupAgentBase::enumState EBackupState;
|
||||||
|
|
|
@ -328,7 +328,7 @@ ACTOR Future<Void> readCommitted(Database cx, PromiseStream<RangeResultWithVersi
|
||||||
KeyRangeRef range, bool terminator, bool systemAccess, bool lockAware) {
|
KeyRangeRef range, bool terminator, bool systemAccess, bool lockAware) {
|
||||||
state KeySelector begin = firstGreaterOrEqual(range.begin);
|
state KeySelector begin = firstGreaterOrEqual(range.begin);
|
||||||
state KeySelector end = firstGreaterOrEqual(range.end);
|
state KeySelector end = firstGreaterOrEqual(range.end);
|
||||||
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
|
state Transaction tr(cx);
|
||||||
state FlowLock::Releaser releaser;
|
state FlowLock::Releaser releaser;
|
||||||
|
|
||||||
loop{
|
loop{
|
||||||
|
@ -336,16 +336,16 @@ ACTOR Future<Void> readCommitted(Database cx, PromiseStream<RangeResultWithVersi
|
||||||
state GetRangeLimits limits(CLIENT_KNOBS->ROW_LIMIT_UNLIMITED, (g_network->isSimulated() && !g_simulator.speedUpSimulation) ? CLIENT_KNOBS->BACKUP_SIMULATED_LIMIT_BYTES : CLIENT_KNOBS->BACKUP_GET_RANGE_LIMIT_BYTES);
|
state GetRangeLimits limits(CLIENT_KNOBS->ROW_LIMIT_UNLIMITED, (g_network->isSimulated() && !g_simulator.speedUpSimulation) ? CLIENT_KNOBS->BACKUP_SIMULATED_LIMIT_BYTES : CLIENT_KNOBS->BACKUP_GET_RANGE_LIMIT_BYTES);
|
||||||
|
|
||||||
if (systemAccess)
|
if (systemAccess)
|
||||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||||
if (lockAware)
|
if (lockAware)
|
||||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||||
|
|
||||||
//add lock
|
//add lock
|
||||||
releaser.release();
|
releaser.release();
|
||||||
Void _ = wait(lock->take(TaskDefaultYield, limits.bytes + CLIENT_KNOBS->VALUE_SIZE_LIMIT + CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT));
|
Void _ = wait(lock->take(TaskDefaultYield, limits.bytes + CLIENT_KNOBS->VALUE_SIZE_LIMIT + CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT));
|
||||||
releaser = FlowLock::Releaser(*lock, limits.bytes + CLIENT_KNOBS->VALUE_SIZE_LIMIT + CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT);
|
releaser = FlowLock::Releaser(*lock, limits.bytes + CLIENT_KNOBS->VALUE_SIZE_LIMIT + CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT);
|
||||||
|
|
||||||
state Standalone<RangeResultRef> values = wait(tr->getRange(begin, end, limits));
|
state Standalone<RangeResultRef> values = wait(tr.getRange(begin, end, limits));
|
||||||
|
|
||||||
// When this buggify line is enabled, if there are more than 1 result then use half of the results
|
// When this buggify line is enabled, if there are more than 1 result then use half of the results
|
||||||
if(values.size() > 1 && BUGGIFY) {
|
if(values.size() > 1 && BUGGIFY) {
|
||||||
|
@ -359,7 +359,7 @@ ACTOR Future<Void> readCommitted(Database cx, PromiseStream<RangeResultWithVersi
|
||||||
releaser.remaining -= values.expectedSize(); //its the responsibility of the caller to release after this point
|
releaser.remaining -= values.expectedSize(); //its the responsibility of the caller to release after this point
|
||||||
ASSERT(releaser.remaining >= 0);
|
ASSERT(releaser.remaining >= 0);
|
||||||
|
|
||||||
results.send(RangeResultWithVersion(values, tr->getReadVersion().get()));
|
results.send(RangeResultWithVersion(values, tr.getReadVersion().get()));
|
||||||
|
|
||||||
if (values.size() > 0)
|
if (values.size() > 0)
|
||||||
begin = firstGreaterThan(values.end()[-1].key);
|
begin = firstGreaterThan(values.end()[-1].key);
|
||||||
|
@ -373,21 +373,21 @@ ACTOR Future<Void> readCommitted(Database cx, PromiseStream<RangeResultWithVersi
|
||||||
catch (Error &e) {
|
catch (Error &e) {
|
||||||
if (e.code() != error_code_transaction_too_old && e.code() != error_code_future_version)
|
if (e.code() != error_code_transaction_too_old && e.code() != error_code_future_version)
|
||||||
throw;
|
throw;
|
||||||
tr = Reference<ReadYourWritesTransaction>(new ReadYourWritesTransaction(cx));
|
tr = Transaction(cx);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ACTOR Future<Void> readCommitted(Database cx, PromiseStream<RCGroup> results, Future<Void> active, Reference<FlowLock> lock,
|
ACTOR Future<Void> readCommitted(Database cx, PromiseStream<RCGroup> results, Future<Void> active, Reference<FlowLock> lock,
|
||||||
KeyRangeRef range, std::function< std::pair<uint64_t, uint32_t>(Key key) > groupBy,
|
KeyRangeRef range, std::function< std::pair<uint64_t, uint32_t>(Key key) > groupBy,
|
||||||
bool terminator, bool systemAccess, bool lockAware, std::function< Future<Void>(Reference<ReadYourWritesTransaction> tr) > withEachFunction)
|
bool terminator, bool systemAccess, bool lockAware)
|
||||||
{
|
{
|
||||||
state KeySelector nextKey = firstGreaterOrEqual(range.begin);
|
state KeySelector nextKey = firstGreaterOrEqual(range.begin);
|
||||||
state KeySelector end = firstGreaterOrEqual(range.end);
|
state KeySelector end = firstGreaterOrEqual(range.end);
|
||||||
|
|
||||||
state RCGroup rcGroup = RCGroup();
|
state RCGroup rcGroup = RCGroup();
|
||||||
state uint64_t skipGroup(ULLONG_MAX);
|
state uint64_t skipGroup(ULLONG_MAX);
|
||||||
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
|
state Transaction tr(cx);
|
||||||
state FlowLock::Releaser releaser;
|
state FlowLock::Releaser releaser;
|
||||||
|
|
||||||
loop{
|
loop{
|
||||||
|
@ -395,14 +395,11 @@ ACTOR Future<Void> readCommitted(Database cx, PromiseStream<RCGroup> results, Fu
|
||||||
state GetRangeLimits limits(CLIENT_KNOBS->ROW_LIMIT_UNLIMITED, (g_network->isSimulated() && !g_simulator.speedUpSimulation) ? CLIENT_KNOBS->BACKUP_SIMULATED_LIMIT_BYTES : CLIENT_KNOBS->BACKUP_GET_RANGE_LIMIT_BYTES);
|
state GetRangeLimits limits(CLIENT_KNOBS->ROW_LIMIT_UNLIMITED, (g_network->isSimulated() && !g_simulator.speedUpSimulation) ? CLIENT_KNOBS->BACKUP_SIMULATED_LIMIT_BYTES : CLIENT_KNOBS->BACKUP_GET_RANGE_LIMIT_BYTES);
|
||||||
|
|
||||||
if (systemAccess)
|
if (systemAccess)
|
||||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||||
if (lockAware)
|
if (lockAware)
|
||||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||||
|
|
||||||
state Future<Void> withEach = Void();
|
state Standalone<RangeResultRef> rangevalue = wait(tr.getRange(nextKey, end, limits));
|
||||||
if (withEachFunction) withEach = withEachFunction(tr);
|
|
||||||
|
|
||||||
state Standalone<RangeResultRef> rangevalue = wait(tr->getRange(nextKey, end, limits));
|
|
||||||
|
|
||||||
// When this buggify line is enabled, if there are more than 1 result then use half of the results
|
// When this buggify line is enabled, if there are more than 1 result then use half of the results
|
||||||
if(rangevalue.size() > 1 && BUGGIFY) {
|
if(rangevalue.size() > 1 && BUGGIFY) {
|
||||||
|
@ -413,8 +410,6 @@ ACTOR Future<Void> readCommitted(Database cx, PromiseStream<RCGroup> results, Fu
|
||||||
Void _ = wait(delay(6.0));
|
Void _ = wait(delay(6.0));
|
||||||
}
|
}
|
||||||
|
|
||||||
Void _ = wait(withEach);
|
|
||||||
|
|
||||||
//add lock
|
//add lock
|
||||||
Void _ = wait(active);
|
Void _ = wait(active);
|
||||||
releaser.release();
|
releaser.release();
|
||||||
|
@ -427,7 +422,7 @@ ACTOR Future<Void> readCommitted(Database cx, PromiseStream<RCGroup> results, Fu
|
||||||
//TraceEvent("log_readCommitted").detail("groupKey", groupKey).detail("skipGroup", skipGroup).detail("nextKey", printable(nextKey.key)).detail("end", printable(end.key)).detail("valuesize", value.size()).detail("index",index++).detail("size",s.value.size());
|
//TraceEvent("log_readCommitted").detail("groupKey", groupKey).detail("skipGroup", skipGroup).detail("nextKey", printable(nextKey.key)).detail("end", printable(end.key)).detail("valuesize", value.size()).detail("index",index++).detail("size",s.value.size());
|
||||||
if (groupKey != skipGroup){
|
if (groupKey != skipGroup){
|
||||||
if (rcGroup.version == -1){
|
if (rcGroup.version == -1){
|
||||||
rcGroup.version = tr->getReadVersion().get();
|
rcGroup.version = tr.getReadVersion().get();
|
||||||
rcGroup.groupKey = groupKey;
|
rcGroup.groupKey = groupKey;
|
||||||
}
|
}
|
||||||
else if (rcGroup.groupKey != groupKey) {
|
else if (rcGroup.groupKey != groupKey) {
|
||||||
|
@ -444,7 +439,7 @@ ACTOR Future<Void> readCommitted(Database cx, PromiseStream<RCGroup> results, Fu
|
||||||
skipGroup = rcGroup.groupKey;
|
skipGroup = rcGroup.groupKey;
|
||||||
|
|
||||||
rcGroup = RCGroup();
|
rcGroup = RCGroup();
|
||||||
rcGroup.version = tr->getReadVersion().get();
|
rcGroup.version = tr.getReadVersion().get();
|
||||||
rcGroup.groupKey = groupKey;
|
rcGroup.groupKey = groupKey;
|
||||||
}
|
}
|
||||||
rcGroup.items.push_back_deep(rcGroup.items.arena(), s);
|
rcGroup.items.push_back_deep(rcGroup.items.arena(), s);
|
||||||
|
@ -469,94 +464,13 @@ ACTOR Future<Void> readCommitted(Database cx, PromiseStream<RCGroup> results, Fu
|
||||||
catch (Error &e) {
|
catch (Error &e) {
|
||||||
if (e.code() != error_code_transaction_too_old && e.code() != error_code_future_version)
|
if (e.code() != error_code_transaction_too_old && e.code() != error_code_future_version)
|
||||||
throw;
|
throw;
|
||||||
Void _ = wait(tr->onError(e));
|
Void _ = wait(tr.onError(e));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ACTOR Future<Void> readCommitted(Database cx, PromiseStream<RCGroup> results, Reference<FlowLock> lock,
|
Future<Void> readCommitted(Database cx, PromiseStream<RCGroup> results, Reference<FlowLock> lock, KeyRangeRef range, std::function< std::pair<uint64_t, uint32_t>(Key key) > groupBy) {
|
||||||
KeyRangeRef range, std::function< std::pair<uint64_t, uint32_t>(Key key) > groupBy)
|
return readCommitted(cx, results, Void(), lock, range, groupBy, true, true, true);
|
||||||
{
|
|
||||||
state KeySelector nextKey = firstGreaterOrEqual(range.begin);
|
|
||||||
state KeySelector end = firstGreaterOrEqual(range.end);
|
|
||||||
|
|
||||||
state RCGroup rcGroup = RCGroup();
|
|
||||||
state uint64_t skipGroup(ULLONG_MAX);
|
|
||||||
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
|
|
||||||
state FlowLock::Releaser releaser;
|
|
||||||
|
|
||||||
loop{
|
|
||||||
try {
|
|
||||||
state GetRangeLimits limits(CLIENT_KNOBS->ROW_LIMIT_UNLIMITED, (g_network->isSimulated() && !g_simulator.speedUpSimulation) ? CLIENT_KNOBS->BACKUP_SIMULATED_LIMIT_BYTES : CLIENT_KNOBS->BACKUP_GET_RANGE_LIMIT_BYTES);
|
|
||||||
|
|
||||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
|
||||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
|
||||||
|
|
||||||
state Standalone<RangeResultRef> rangevalue = wait(tr->getRange(nextKey, end, limits));
|
|
||||||
|
|
||||||
// When this buggify line is enabled, if there are more than 1 result then use half of the results
|
|
||||||
if(rangevalue.size() > 1 && BUGGIFY) {
|
|
||||||
rangevalue.resize(rangevalue.arena(), rangevalue.size() / 2);
|
|
||||||
rangevalue.more = true;
|
|
||||||
// Half of the time wait for this tr to expire so that the next read is at a different version
|
|
||||||
if(g_random->random01() < 0.5)
|
|
||||||
Void _ = wait(delay(6.0));
|
|
||||||
}
|
|
||||||
|
|
||||||
releaser.release();
|
|
||||||
Void _ = wait(lock->take(TaskDefaultYield, rangevalue.expectedSize() + rcGroup.items.expectedSize()));
|
|
||||||
releaser = FlowLock::Releaser(*lock, rangevalue.expectedSize() + rcGroup.items.expectedSize());
|
|
||||||
|
|
||||||
int index(0);
|
|
||||||
for (auto & s : rangevalue){
|
|
||||||
uint64_t groupKey = groupBy(s.key).first;
|
|
||||||
//TraceEvent("log_readCommitted").detail("groupKey", groupKey).detail("skipGroup", skipGroup).detail("nextKey", printable(nextKey.key)).detail("end", printable(end.key)).detail("valuesize", value.size()).detail("index",index++).detail("size",s.value.size());
|
|
||||||
if (groupKey != skipGroup){
|
|
||||||
if (rcGroup.version == -1){
|
|
||||||
rcGroup.version = tr->getReadVersion().get();
|
|
||||||
rcGroup.groupKey = groupKey;
|
|
||||||
}
|
|
||||||
else if (rcGroup.groupKey != groupKey) {
|
|
||||||
//TraceEvent("log_readCommitted").detail("sendGroup0", rcGroup.groupKey).detail("itemSize", rcGroup.items.size()).detail("data_length",rcGroup.items[0].value.size());
|
|
||||||
state uint32_t len(0);
|
|
||||||
for (size_t j = 0; j < rcGroup.items.size(); ++j) {
|
|
||||||
len += rcGroup.items[j].value.size();
|
|
||||||
}
|
|
||||||
//TraceEvent("SendGroup").detail("groupKey", rcGroup.groupKey).detail("version", rcGroup.version).detail("length", len).detail("releaser.remaining", releaser.remaining);
|
|
||||||
releaser.remaining -= rcGroup.items.expectedSize(); //its the responsibility of the caller to release after this point
|
|
||||||
ASSERT(releaser.remaining >= 0);
|
|
||||||
results.send(rcGroup);
|
|
||||||
nextKey = firstGreaterThan(rcGroup.items.end()[-1].key);
|
|
||||||
skipGroup = rcGroup.groupKey;
|
|
||||||
|
|
||||||
rcGroup = RCGroup();
|
|
||||||
rcGroup.version = tr->getReadVersion().get();
|
|
||||||
rcGroup.groupKey = groupKey;
|
|
||||||
}
|
|
||||||
rcGroup.items.push_back_deep(rcGroup.items.arena(), s);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!rangevalue.more) {
|
|
||||||
if (rcGroup.version != -1){
|
|
||||||
releaser.remaining -= rcGroup.items.expectedSize(); //its the responsibility of the caller to release after this point
|
|
||||||
ASSERT(releaser.remaining >= 0);
|
|
||||||
//TraceEvent("log_readCommitted").detail("sendGroup1", rcGroup.groupKey).detail("itemSize", rcGroup.items.size()).detail("data_length", rcGroup.items[0].value.size());
|
|
||||||
results.send(rcGroup);
|
|
||||||
}
|
|
||||||
|
|
||||||
results.sendError(end_of_stream());
|
|
||||||
return Void();
|
|
||||||
}
|
|
||||||
|
|
||||||
nextKey = firstGreaterThan(rangevalue.end()[-1].key);
|
|
||||||
}
|
|
||||||
catch (Error &e) {
|
|
||||||
if (e.code() != error_code_transaction_too_old && e.code() != error_code_future_version)
|
|
||||||
throw;
|
|
||||||
Void _ = wait(tr->onError(e));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ACTOR Future<int> dumpData(Database cx, PromiseStream<RCGroup> results, Reference<FlowLock> lock, Key uid, Key addPrefix, Key removePrefix, RequestStream<CommitTransactionRequest> commit,
|
ACTOR Future<int> dumpData(Database cx, PromiseStream<RCGroup> results, Reference<FlowLock> lock, Key uid, Key addPrefix, Key removePrefix, RequestStream<CommitTransactionRequest> commit,
|
||||||
|
|
|
@ -651,7 +651,7 @@ namespace dbBackup {
|
||||||
|
|
||||||
for (int i = 0; i < ranges.size(); ++i) {
|
for (int i = 0; i < ranges.size(); ++i) {
|
||||||
results.push_back(PromiseStream<RCGroup>());
|
results.push_back(PromiseStream<RCGroup>());
|
||||||
rc.push_back(readCommitted(taskBucket->src, results[i], Future<Void>(Void()), lock, ranges[i], decodeBKMutationLogKey, true, true, true, nullptr));
|
rc.push_back(readCommitted(taskBucket->src, results[i], Future<Void>(Void()), lock, ranges[i], decodeBKMutationLogKey, true, true, true));
|
||||||
dump.push_back(dumpData(cx, task, results[i], lock.getPtr(), taskBucket));
|
dump.push_back(dumpData(cx, task, results[i], lock.getPtr(), taskBucket));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1479,7 +1479,7 @@ void ReadYourWritesTransaction::atomicOp( const KeyRef& key, const ValueRef& ope
|
||||||
if(key >= getMaxWriteKey())
|
if(key >= getMaxWriteKey())
|
||||||
throw key_outside_legal_range();
|
throw key_outside_legal_range();
|
||||||
|
|
||||||
if(!isValidMutationType(operationType) || !isAtomicOp((MutationRef::Type) operationType) || operationType == MutationRef::AppendIfFits)
|
if(!isValidMutationType(operationType) || !isAtomicOp((MutationRef::Type) operationType))
|
||||||
throw invalid_mutation_type();
|
throw invalid_mutation_type();
|
||||||
|
|
||||||
if(key.size() > (key.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
|
if(key.size() > (key.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
|
||||||
|
@ -1898,4 +1898,4 @@ void ReadYourWritesTransaction::debugLogRetries(Optional<Error> error) {
|
||||||
transactionDebugInfo->lastRetryLogTime = now();
|
transactionDebugInfo->lastRetryLogTime = now();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -219,6 +219,9 @@ description is not currently required but encouraged.
|
||||||
<Option name="bit_xor" code="8"
|
<Option name="bit_xor" code="8"
|
||||||
paramType="Bytes" paramDescription="value with which to perform bitwise xor"
|
paramType="Bytes" paramDescription="value with which to perform bitwise xor"
|
||||||
description="Performs a bitwise ``xor`` operation. If the existing value in the database is not present or shorter than ``param``, it is first extended to the length of ``param`` with zero bytes. If ``param`` is shorter than the existing value in the database, the existing value is truncated to match the length of ``param``."/>
|
description="Performs a bitwise ``xor`` operation. If the existing value in the database is not present or shorter than ``param``, it is first extended to the length of ``param`` with zero bytes. If ``param`` is shorter than the existing value in the database, the existing value is truncated to match the length of ``param``."/>
|
||||||
|
<Option name="append_if_fits" code="9"
|
||||||
|
paramType="Bytes" paramDescription="value to append to the database value"
|
||||||
|
description="Appends ``param`` to the end of the existing value already in the database at the given key (or creates the key and sets the value to ``param`` if the key is empty). This will only append the value if the final concatenated value size is less than or equal to the maximum value size (i.e., if it fits). WARNING: No error is surfaced back to the user if the final value is too large because the mutation will not be applied until after the transaction has been committed. Therefore, it is only safe to use this mutation type if one can guarantee that one will keep the total value size under the maximum size."/>
|
||||||
<Option name="max" code="12"
|
<Option name="max" code="12"
|
||||||
paramType="Bytes" paramDescription="value to check against database value"
|
paramType="Bytes" paramDescription="value to check against database value"
|
||||||
description="Performs a little-endian comparison of byte strings. If the existing value in the database is not present or shorter than ``param``, it is first extended to the length of ``param`` with zero bytes. If ``param`` is shorter than the existing value in the database, the existing value is truncated to match the length of ``param``. The larger of the two values is then stored in the database."/>
|
description="Performs a little-endian comparison of byte strings. If the existing value in the database is not present or shorter than ``param``, it is first extended to the length of ``param`` with zero bytes. If ``param`` is shorter than the existing value in the database, the existing value is truncated to match the length of ``param``. The larger of the two values is then stored in the database."/>
|
||||||
|
|
|
@ -1314,6 +1314,16 @@ int main(int argc, char* argv[]) {
|
||||||
flushAndExit(FDB_EXIT_ERROR);
|
flushAndExit(FDB_EXIT_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if(role == ConsistencyCheck) {
|
||||||
|
if(publicAddressStr != "") {
|
||||||
|
fprintf(stderr, "ERROR: Public address cannot be specified for consistency check processes\n");
|
||||||
|
printHelpTeaser(argv[0]);
|
||||||
|
flushAndExit(FDB_EXIT_ERROR);
|
||||||
|
}
|
||||||
|
auto publicIP = determinePublicIPAutomatically(connectionFile->getConnectionString());
|
||||||
|
publicAddress = NetworkAddress(publicIP, ::getpid());
|
||||||
|
}
|
||||||
|
|
||||||
if (listenAddressStr == "public")
|
if (listenAddressStr == "public")
|
||||||
listenAddress = publicAddress;
|
listenAddress = publicAddress;
|
||||||
else {
|
else {
|
||||||
|
|
|
@ -343,27 +343,26 @@ ACTOR Future<Void> runProfiler(ProfilerRequest req) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ACTOR Future<Void> storageServerRollbackRebooter( Future<Void> prevStorageServer, KeyValueStoreType storeType, std::string filename, StorageServerInterface ssi, Reference<AsyncVar<ServerDBInfo>> db, std::string folder, ActorCollection* filesClosed, int64_t memoryLimit ) {
|
ACTOR Future<Void> storageServerRollbackRebooter( Future<Void> prevStorageServer, KeyValueStoreType storeType, std::string filename, UID id, LocalityData locality, Reference<AsyncVar<ServerDBInfo>> db, std::string folder, ActorCollection* filesClosed, int64_t memoryLimit ) {
|
||||||
loop {
|
loop {
|
||||||
ErrorOr<Void> e = wait( errorOr( prevStorageServer) );
|
ErrorOr<Void> e = wait( errorOr( prevStorageServer) );
|
||||||
if (!e.isError()) return Void();
|
if (!e.isError()) return Void();
|
||||||
else if (e.getError().code() != error_code_please_reboot) throw e.getError();
|
else if (e.getError().code() != error_code_please_reboot) throw e.getError();
|
||||||
|
|
||||||
TraceEvent("StorageServerRequestedReboot", ssi.id());
|
TraceEvent("StorageServerRequestedReboot", id);
|
||||||
|
|
||||||
//if (BUGGIFY) Void _ = wait(delay(1.0)); // This does the same thing as zombie()
|
//if (BUGGIFY) Void _ = wait(delay(1.0)); // This does the same thing as zombie()
|
||||||
// We need a new interface, since the new storageServer will do replaceInterface(). And we need to destroy
|
// We need a new interface, since the new storageServer will do replaceInterface(). And we need to destroy
|
||||||
// the old one so the failure detector will know it is gone.
|
// the old one so the failure detector will know it is gone.
|
||||||
StorageServerInterface ssi_new;
|
StorageServerInterface ssi;
|
||||||
ssi_new.uniqueID = ssi.uniqueID;
|
ssi.uniqueID = id;
|
||||||
ssi_new.locality = ssi.locality;
|
ssi.locality = locality;
|
||||||
ssi = ssi_new;
|
|
||||||
ssi.initEndpoints();
|
ssi.initEndpoints();
|
||||||
auto* kv = openKVStore( storeType, filename, ssi.uniqueID, memoryLimit );
|
auto* kv = openKVStore( storeType, filename, ssi.uniqueID, memoryLimit );
|
||||||
Future<Void> kvClosed = kv->onClosed();
|
Future<Void> kvClosed = kv->onClosed();
|
||||||
filesClosed->add( kvClosed );
|
filesClosed->add( kvClosed );
|
||||||
prevStorageServer = storageServer( kv, ssi, db, folder, Promise<Void>() );
|
prevStorageServer = storageServer( kv, ssi, db, folder, Promise<Void>() );
|
||||||
prevStorageServer = handleIOErrors( prevStorageServer, kv, ssi.id(), kvClosed );
|
prevStorageServer = handleIOErrors( prevStorageServer, kv, id, kvClosed );
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -610,7 +609,7 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
|
||||||
Future<Void> f = storageServer( kv, recruited, dbInfo, folder, recovery );
|
Future<Void> f = storageServer( kv, recruited, dbInfo, folder, recovery );
|
||||||
recoveries.push_back(recovery.getFuture());
|
recoveries.push_back(recovery.getFuture());
|
||||||
f = handleIOErrors( f, kv, s.storeID, kvClosed );
|
f = handleIOErrors( f, kv, s.storeID, kvClosed );
|
||||||
f = storageServerRollbackRebooter( f, s.storeType, s.filename, recruited, dbInfo, folder, &filesClosed, memoryLimit );
|
f = storageServerRollbackRebooter( f, s.storeType, s.filename, recruited.id(), recruited.locality, dbInfo, folder, &filesClosed, memoryLimit );
|
||||||
errorForwarders.add( forwardError( errors, "StorageServer", recruited.id(), f ) );
|
errorForwarders.add( forwardError( errors, "StorageServer", recruited.id(), f ) );
|
||||||
} else if( s.storedComponent == DiskStore::TLogData ) {
|
} else if( s.storedComponent == DiskStore::TLogData ) {
|
||||||
IKeyValueStore* kv = openKVStore( s.storeType, s.filename, s.storeID, memoryLimit, validateDataFiles );
|
IKeyValueStore* kv = openKVStore( s.storeType, s.filename, s.storeID, memoryLimit, validateDataFiles );
|
||||||
|
@ -765,7 +764,7 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
|
||||||
Future<Void> s = storageServer( data, recruited, req.seedTag, storageReady, dbInfo, folder );
|
Future<Void> s = storageServer( data, recruited, req.seedTag, storageReady, dbInfo, folder );
|
||||||
s = handleIOErrors(s, data, recruited.id(), kvClosed);
|
s = handleIOErrors(s, data, recruited.id(), kvClosed);
|
||||||
s = storageCache.removeOnReady( req.reqId, s );
|
s = storageCache.removeOnReady( req.reqId, s );
|
||||||
s = storageServerRollbackRebooter( s, req.storeType, filename, recruited, dbInfo, folder, &filesClosed, memoryLimit );
|
s = storageServerRollbackRebooter( s, req.storeType, filename, recruited.id(), recruited.locality, dbInfo, folder, &filesClosed, memoryLimit );
|
||||||
errorForwarders.add( forwardError( errors, "StorageServer", recruited.id(), s ) );
|
errorForwarders.add( forwardError( errors, "StorageServer", recruited.id(), s ) );
|
||||||
} else
|
} else
|
||||||
forwardPromise( req.reply, storageCache.get( req.reqId ) );
|
forwardPromise( req.reply, storageCache.get( req.reqId ) );
|
||||||
|
|
|
@ -861,7 +861,7 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
|
||||||
std::make_pair( error_code_key_too_large, ExceptionContract::requiredIf(key.size() > (key.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT)) ),
|
std::make_pair( error_code_key_too_large, ExceptionContract::requiredIf(key.size() > (key.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT)) ),
|
||||||
std::make_pair( error_code_value_too_large, ExceptionContract::requiredIf(value.size() > CLIENT_KNOBS->VALUE_SIZE_LIMIT) ),
|
std::make_pair( error_code_value_too_large, ExceptionContract::requiredIf(value.size() > CLIENT_KNOBS->VALUE_SIZE_LIMIT) ),
|
||||||
std::make_pair( error_code_invalid_mutation_type, ExceptionContract::requiredIf(
|
std::make_pair( error_code_invalid_mutation_type, ExceptionContract::requiredIf(
|
||||||
!isValidMutationType(op) || !isAtomicOp((MutationRef::Type) op) || op == MutationRef::AppendIfFits ) ),
|
!isValidMutationType(op) || !isAtomicOp((MutationRef::Type) op)) ),
|
||||||
std::make_pair( error_code_key_outside_legal_range, ExceptionContract::requiredIf(
|
std::make_pair( error_code_key_outside_legal_range, ExceptionContract::requiredIf(
|
||||||
(key >= (workload->useSystemKeys ? systemKeys.end : normalKeys.end))) ),
|
(key >= (workload->useSystemKeys ? systemKeys.end : normalKeys.end))) ),
|
||||||
std::make_pair( error_code_client_invalid_operation, ExceptionContract::requiredIf(
|
std::make_pair( error_code_client_invalid_operation, ExceptionContract::requiredIf(
|
||||||
|
|
|
@ -137,7 +137,7 @@ struct RandomSelectorWorkload : TestWorkload {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
for(i = 0; i < g_random->randomInt(self->minOperationsPerTransaction,self->maxOperationsPerTransaction+1); i++) {
|
for(i = 0; i < g_random->randomInt(self->minOperationsPerTransaction,self->maxOperationsPerTransaction+1); i++) {
|
||||||
j = g_random->randomInt(0,15);
|
j = g_random->randomInt(0,16);
|
||||||
if( j < 3 ) {
|
if( j < 3 ) {
|
||||||
myKeyA = format( "%010d", g_random->randomInt( 0, self->maxKeySpace+1 ) );
|
myKeyA = format( "%010d", g_random->randomInt( 0, self->maxKeySpace+1 ) );
|
||||||
myValue = format("%d", g_random->randomInt( 0, 10000000 ) );
|
myValue = format("%d", g_random->randomInt( 0, 10000000 ) );
|
||||||
|
@ -236,30 +236,30 @@ struct RandomSelectorWorkload : TestWorkload {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
//else if( j < 8 ) {
|
|
||||||
// myKeyA = format( "%010d", g_random->randomInt( 0, self->maxKeySpace+1 ) );
|
|
||||||
// myRandomIDKey = format( "%010d", g_random->randomInt(0, 1000000000) );
|
|
||||||
// myValue = format("%d", g_random->randomInt( 0, 10000000 ) );
|
|
||||||
// //TraceEvent("RYOWappendIfFits").detail("Key",myKeyA).detail("Value", myValue);
|
|
||||||
// trRYOW.atomicOp(StringRef(clientID + "b/" + myKeyA), myValue, MutationRef::AppendIfFits);
|
|
||||||
//
|
|
||||||
// loop {
|
|
||||||
// try {
|
|
||||||
// tr.set(StringRef(clientID + "z/" + myRandomIDKey), StringRef());
|
|
||||||
// tr.atomicOp(StringRef(clientID + "d/" + myKeyA), myValue, MutationRef::AppendIfFits);
|
|
||||||
// Void _ = wait( tr.commit() );
|
|
||||||
// break;
|
|
||||||
// } catch (Error& e) {
|
|
||||||
// error = e;
|
|
||||||
// Void _ = wait( tr.onError(e) );
|
|
||||||
// if(error.code() == error_code_commit_unknown_result) {
|
|
||||||
// Optional<Value> thing = wait(tr.get(StringRef(clientID+"z/"+myRandomIDKey)));
|
|
||||||
// if (thing.present()) break;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//}
|
|
||||||
else if( j < 8 ) {
|
else if( j < 8 ) {
|
||||||
|
myKeyA = format( "%010d", g_random->randomInt( 0, self->maxKeySpace+1 ) );
|
||||||
|
myRandomIDKey = format( "%010d", g_random->randomInt(0, 1000000000) );
|
||||||
|
myValue = format("%d", g_random->randomInt( 0, 10000000 ) );
|
||||||
|
//TraceEvent("RYOWappendIfFits").detail("Key",myKeyA).detail("Value", myValue);
|
||||||
|
trRYOW.atomicOp(StringRef(clientID + "b/" + myKeyA), myValue, MutationRef::AppendIfFits);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
try {
|
||||||
|
tr.set(StringRef(clientID + "z/" + myRandomIDKey), StringRef());
|
||||||
|
tr.atomicOp(StringRef(clientID + "d/" + myKeyA), myValue, MutationRef::AppendIfFits);
|
||||||
|
Void _ = wait( tr.commit() );
|
||||||
|
break;
|
||||||
|
} catch (Error& e) {
|
||||||
|
error = e;
|
||||||
|
Void _ = wait( tr.onError(e) );
|
||||||
|
if(error.code() == error_code_commit_unknown_result) {
|
||||||
|
Optional<Value> thing = wait(tr.get(StringRef(clientID+"z/"+myRandomIDKey)));
|
||||||
|
if (thing.present()) break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if( j < 9 ) {
|
||||||
myKeyA = format( "%010d", g_random->randomInt( 0, self->maxKeySpace+1 ) );
|
myKeyA = format( "%010d", g_random->randomInt( 0, self->maxKeySpace+1 ) );
|
||||||
myRandomIDKey = format( "%010d", g_random->randomInt(0, 1000000000) );
|
myRandomIDKey = format( "%010d", g_random->randomInt(0, 1000000000) );
|
||||||
myValue = format("%d", g_random->randomInt( 0, 10000000 ) );
|
myValue = format("%d", g_random->randomInt( 0, 10000000 ) );
|
||||||
|
@ -282,7 +282,7 @@ struct RandomSelectorWorkload : TestWorkload {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if( j < 9 ) {
|
else if( j < 10 ) {
|
||||||
myKeyA = format( "%010d", g_random->randomInt( 0, self->maxKeySpace+1 ) );
|
myKeyA = format( "%010d", g_random->randomInt( 0, self->maxKeySpace+1 ) );
|
||||||
myRandomIDKey = format( "%010d", g_random->randomInt(0, 1000000000) );
|
myRandomIDKey = format( "%010d", g_random->randomInt(0, 1000000000) );
|
||||||
myValue = format("%d", g_random->randomInt( 0, 10000000 ) );
|
myValue = format("%d", g_random->randomInt( 0, 10000000 ) );
|
||||||
|
@ -305,7 +305,7 @@ struct RandomSelectorWorkload : TestWorkload {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if( j < 10 ) {
|
else if( j < 11 ) {
|
||||||
myKeyA = format( "%010d", g_random->randomInt( 0, self->maxKeySpace+1 ) );
|
myKeyA = format( "%010d", g_random->randomInt( 0, self->maxKeySpace+1 ) );
|
||||||
myRandomIDKey = format( "%010d", g_random->randomInt(0, 1000000000) );
|
myRandomIDKey = format( "%010d", g_random->randomInt(0, 1000000000) );
|
||||||
myValue = format("%d", g_random->randomInt( 0, 10000000 ) );
|
myValue = format("%d", g_random->randomInt( 0, 10000000 ) );
|
||||||
|
@ -328,7 +328,7 @@ struct RandomSelectorWorkload : TestWorkload {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (j < 11) {
|
else if (j < 12) {
|
||||||
myKeyA = format("%010d", g_random->randomInt(0, self->maxKeySpace + 1));
|
myKeyA = format("%010d", g_random->randomInt(0, self->maxKeySpace + 1));
|
||||||
myRandomIDKey = format("%010d", g_random->randomInt(0, 1000000000));
|
myRandomIDKey = format("%010d", g_random->randomInt(0, 1000000000));
|
||||||
myValue = format("%d", g_random->randomInt(0, 10000000));
|
myValue = format("%d", g_random->randomInt(0, 10000000));
|
||||||
|
@ -352,7 +352,7 @@ struct RandomSelectorWorkload : TestWorkload {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (j < 12) {
|
else if (j < 13) {
|
||||||
myKeyA = format("%010d", g_random->randomInt(0, self->maxKeySpace + 1));
|
myKeyA = format("%010d", g_random->randomInt(0, self->maxKeySpace + 1));
|
||||||
myRandomIDKey = format("%010d", g_random->randomInt(0, 1000000000));
|
myRandomIDKey = format("%010d", g_random->randomInt(0, 1000000000));
|
||||||
myValue = format("%d", g_random->randomInt(0, 10000000));
|
myValue = format("%d", g_random->randomInt(0, 10000000));
|
||||||
|
@ -376,7 +376,7 @@ struct RandomSelectorWorkload : TestWorkload {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (j < 13) {
|
else if (j < 14) {
|
||||||
myKeyA = format("%010d", g_random->randomInt(0, self->maxKeySpace + 1));
|
myKeyA = format("%010d", g_random->randomInt(0, self->maxKeySpace + 1));
|
||||||
myRandomIDKey = format("%010d", g_random->randomInt(0, 1000000000));
|
myRandomIDKey = format("%010d", g_random->randomInt(0, 1000000000));
|
||||||
myValue = format("%d", g_random->randomInt(0, 10000000));
|
myValue = format("%d", g_random->randomInt(0, 10000000));
|
||||||
|
@ -400,7 +400,7 @@ struct RandomSelectorWorkload : TestWorkload {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (j < 14) {
|
else if (j < 15) {
|
||||||
myKeyA = format("%010d", g_random->randomInt(0, self->maxKeySpace + 1));
|
myKeyA = format("%010d", g_random->randomInt(0, self->maxKeySpace + 1));
|
||||||
myRandomIDKey = format("%010d", g_random->randomInt(0, 1000000000));
|
myRandomIDKey = format("%010d", g_random->randomInt(0, 1000000000));
|
||||||
myValue = format("%d", g_random->randomInt(0, 10000000));
|
myValue = format("%d", g_random->randomInt(0, 10000000));
|
||||||
|
|
Loading…
Reference in New Issue