Merge commit '4038bd2fd968d88861f2cebd442ce511724816cb' into feature-remote-logs

# Conflicts:
#	fdbserver/ClusterController.actor.cpp
#	fdbserver/Knobs.cpp
commit 42405c78a5
@@ -2279,6 +2279,8 @@ int main(int argc, char* argv[]) {
 	bool dryRun = false;
 	std::string traceDir = "";
 	std::string traceLogGroup;
+	uint64_t traceRollSize = TRACE_DEFAULT_ROLL_SIZE;
+	uint64_t traceMaxLogsSize = TRACE_DEFAULT_MAX_LOGS_SIZE;
 	ESOError lastError;
 	bool partial = true;
 	LocalityData localities;

@@ -2706,6 +2708,14 @@ int main(int argc, char* argv[]) {
 		}
 	}

+	// Opens a trace file if trace is set (and if a trace file isn't already open)
+	// For most modes, initCluster() will open a trace file, but some fdbbackup operations do not require
+	// a cluster so they should use this instead.
+	auto initTraceFile = [&]() {
+		if(trace)
+			openTraceFile(NetworkAddress(), traceRollSize, traceMaxLogsSize, traceDir, "trace", traceLogGroup);
+	};
+
 	auto initCluster = [&](bool quiet = false) {
 		auto resolvedClusterFile = ClusterConnectionFile::lookupClusterFileName(clusterFile);
 		try {

@@ -2818,6 +2828,7 @@ int main(int argc, char* argv[]) {
 			break;

 		case BACKUP_EXPIRE:
+			initTraceFile();
 			// Must have a usable cluster if either expire DateTime options were used
 			if(!expireDatetime.empty() || !expireRestorableAfterDatetime.empty()) {
 				if(!initCluster())

@@ -2827,10 +2838,12 @@ int main(int argc, char* argv[]) {
 			break;

 		case BACKUP_DELETE:
+			initTraceFile();
 			f = stopAfter( deleteBackupContainer(argv[0], destinationContainer) );
 			break;

 		case BACKUP_DESCRIBE:
+			initTraceFile();
 			// If timestamp lookups are desired, require a cluster file
 			if(describeTimestamps && !initCluster())
 				return FDB_EXIT_ERROR;

@@ -2839,6 +2852,7 @@ int main(int argc, char* argv[]) {
 			f = stopAfter( describeBackup(argv[0], destinationContainer, describeDeep, describeTimestamps ? Optional<Database>(db) : Optional<Database>()) );
 			break;
 		case BACKUP_LIST:
+			initTraceFile();
 			f = stopAfter( listBackup(baseUrl) );
 			break;
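
Note: the initTraceFile lambda above defers trace-file creation until an operation actually needs it, and opens it at most once. A minimal standard-C++ sketch of the same open-on-demand guard (the flag and file name are illustrative stand-ins, not fdbbackup's API):

    #include <cstdio>

    int main() {
        bool trace = true;              // e.g. set by a command line option
        std::FILE *traceFile = nullptr; // opened at most once, only on demand

        // Analogue of initTraceFile: open the log lazily, only if tracing is on.
        auto initTraceFile = [&]() {
            if (trace && traceFile == nullptr)
                traceFile = std::fopen("trace.log", "w");
        };

        initTraceFile();
        initTraceFile(); // second call is a no-op; the file is already open

        if (traceFile) {
            std::fprintf(traceFile, "operation started\n");
            std::fclose(traceFile);
        }
        return 0;
    }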
@@ -1329,23 +1329,23 @@ ACTOR Future<Void> testBackupContainer(std::string url) {
 	state int64_t versionShift = g_random->randomInt64(0, std::numeric_limits<Version>::max() - 500);

 	state Reference<IBackupFile> log1 = wait(c->writeLogFile(100 + versionShift, 150 + versionShift, 10));
-	Void _ = wait(writeAndVerifyFile(c, log1, 0));

 	state Reference<IBackupFile> log2 = wait(c->writeLogFile(150 + versionShift, 300 + versionShift, 10));
-	Void _ = wait(writeAndVerifyFile(c, log2, g_random->randomInt(0, 10000000)));

 	state Reference<IBackupFile> range1 = wait(c->writeRangeFile(160 + versionShift, 10));
-	Void _ = wait(writeAndVerifyFile(c, range1, g_random->randomInt(0, 1000)));

 	state Reference<IBackupFile> range2 = wait(c->writeRangeFile(300 + versionShift, 10));
-	Void _ = wait(writeAndVerifyFile(c, range2, g_random->randomInt(0, 100000)));

 	state Reference<IBackupFile> range3 = wait(c->writeRangeFile(310 + versionShift, 10));
-	Void _ = wait(writeAndVerifyFile(c, range3, g_random->randomInt(0, 3000000)));

-	Void _ = wait(c->writeKeyspaceSnapshotFile({range1->getFileName(), range2->getFileName()}, range1->size() + range2->size()));
+	Void _ = wait(
+		writeAndVerifyFile(c, log1, 0)
+		&& writeAndVerifyFile(c, log2, g_random->randomInt(0, 10000000))
+		&& writeAndVerifyFile(c, range1, g_random->randomInt(0, 1000))
+		&& writeAndVerifyFile(c, range2, g_random->randomInt(0, 100000))
+		&& writeAndVerifyFile(c, range3, g_random->randomInt(0, 3000000))
+	);

-	Void _ = wait(c->writeKeyspaceSnapshotFile({range3->getFileName()}, range3->size()));
+	Void _ = wait(
+		c->writeKeyspaceSnapshotFile({range1->getFileName(), range2->getFileName()}, range1->size() + range2->size())
+		&& c->writeKeyspaceSnapshotFile({range3->getFileName()}, range3->size())
+	);

 	printf("Checking file list dump\n");
 	FullBackupListing listing = wait(c->dumpFileList());
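
Note: the change above batches the writeAndVerifyFile and snapshot waits with flow's `&&` future combinator, so the test waits once for the whole group instead of serializing one wait per file. A rough standard-C++ analogue of "wait for all of these at once" using std::async (the verify function is a stand-in, not the backup container API):

    #include <cassert>
    #include <future>
    #include <vector>

    // Stand-in for writeAndVerifyFile: pretend to write `size` bytes and verify them.
    bool writeAndVerify(int size) { return size >= 0; }

    int main() {
        // Launch all verifications concurrently, then wait on the group,
        // mirroring wait(f1 && f2 && ...) in flow.
        std::vector<std::future<bool>> futures;
        for (int size : {0, 10000000, 1000, 100000, 3000000})
            futures.push_back(std::async(std::launch::async, writeAndVerify, size));

        for (auto &f : futures)
            assert(f.get()); // completes only when every task has finished
        return 0;
    }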
@@ -132,10 +132,14 @@ public:
 	Standalone<StringRef> dbName;
 	Standalone<StringRef> dbId;

-	int64_t transactionsReadVersions;
+	int64_t transactionReadVersions;
+	int64_t transactionLogicalReads;
+	int64_t transactionPhysicalReads;
+	int64_t transactionCommittedMutations;
+	int64_t transactionCommittedMutationBytes;
 	int64_t transactionsCommitStarted;
 	int64_t transactionsCommitCompleted;
-	int64_t transactionsPastVersions;
+	int64_t transactionsTooOld;
 	int64_t transactionsFutureVersions;
 	int64_t transactionsNotCommitted;
 	int64_t transactionsMaybeCommitted;
@@ -1799,10 +1799,17 @@ namespace fileBackup {
 		state bool stopWhenDone;
 		state Optional<Version> restorableVersion;
 		state EBackupState backupState;
+		state Optional<std::string> tag;

 		Void _ = wait(store(config.stopWhenDone().getOrThrow(tr), stopWhenDone)
 					&& store(config.getLatestRestorableVersion(tr), restorableVersion)
-					&& store(config.stateEnum().getOrThrow(tr), backupState));
+					&& store(config.stateEnum().getOrThrow(tr), backupState)
+					&& store(config.tag().get(tr), tag));
+
+		// If restorable, update the last restorable version for this tag
+		if(restorableVersion.present() && tag.present()) {
+			FileBackupAgent().setLastRestorable(tr, StringRef(tag.get()), restorableVersion.get());
+		}

 		// If the backup is restorable but the state is not differential then set state to differential
 		if(restorableVersion.present() && backupState != BackupAgentBase::STATE_DIFFERENTIAL)

@@ -2014,11 +2021,18 @@ namespace fileBackup {
 		state EBackupState backupState;
 		state Optional<Version> restorableVersion;
 		state Optional<Version> firstSnapshotEndVersion;
+		state Optional<std::string> tag;

 		Void _ = wait(store(config.stopWhenDone().getOrThrow(tr), stopWhenDone)
 					&& store(config.stateEnum().getOrThrow(tr), backupState)
 					&& store(config.getLatestRestorableVersion(tr), restorableVersion)
-					&& store(config.firstSnapshotEndVersion().get(tr), firstSnapshotEndVersion));
+					&& store(config.firstSnapshotEndVersion().get(tr), firstSnapshotEndVersion)
+					&& store(config.tag().get(tr), tag));
+
+		// If restorable, update the last restorable version for this tag
+		if(restorableVersion.present() && tag.present()) {
+			FileBackupAgent().setLastRestorable(tr, StringRef(tag.get()), restorableVersion.get());
+		}

 		if(!firstSnapshotEndVersion.present()) {
 			config.firstSnapshotEndVersion().set(tr, Params.endVersion().get(task));
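
Note: both hunks add the same bookkeeping: once a backup tag has a restorable version, that version is recorded as the tag's "last restorable" mark, and only when both pieces of state are present. A toy sketch of the guard with std::optional and a map (types simplified from the Version/tag machinery):

    #include <cassert>
    #include <cstdint>
    #include <map>
    #include <optional>
    #include <string>

    using Version = int64_t;
    std::map<std::string, Version> lastRestorable; // tag -> latest restorable version

    // Mirrors the new logic: only record when both pieces of state are present.
    void maybeSetLastRestorable(std::optional<std::string> tag,
                                std::optional<Version> restorableVersion) {
        if (restorableVersion.has_value() && tag.has_value())
            lastRestorable[*tag] = *restorableVersion;
    }

    int main() {
        maybeSetLastRestorable(std::nullopt, 100);       // no tag: nothing recorded
        maybeSetLastRestorable("default", std::nullopt); // not restorable yet: nothing
        maybeSetLastRestorable("default", 100);          // both present: recorded
        assert(lastRestorable["default"] == 100);
        return 0;
    }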
@@ -1242,7 +1242,7 @@ ThreadFuture<Reference<ICluster>> MultiVersionApi::createCluster(const char *clu
 	lock.enter();
 	if(!networkSetup) {
 		lock.leave();
-		throw network_not_setup();
+		return network_not_setup();
 	}
 	lock.leave();
@@ -206,10 +206,14 @@ ACTOR Future<Void> databaseLogger( DatabaseContext *cx ) {
 	loop {
 		Void _ = wait( delay( CLIENT_KNOBS->SYSTEM_MONITOR_INTERVAL, cx->taskID ) );
 		TraceEvent("TransactionMetrics")
-			.detail("ReadVersions", cx->transactionsReadVersions)
+			.detail("ReadVersions", cx->transactionReadVersions)
+			.detail("LogicalUncachedReads", cx->transactionLogicalReads)
+			.detail("PhysicalReadRequests", cx->transactionPhysicalReads)
+			.detail("CommittedMutations", cx->transactionCommittedMutations)
+			.detail("CommittedMutationBytes", cx->transactionCommittedMutationBytes)
 			.detail("CommitStarted", cx->transactionsCommitStarted)
 			.detail("CommitCompleted", cx->transactionsCommitCompleted)
-			.detail("PastVersions", cx->transactionsPastVersions)
+			.detail("TooOld", cx->transactionsTooOld)
 			.detail("FutureVersions", cx->transactionsFutureVersions)
 			.detail("NotCommitted", cx->transactionsNotCommitted)
 			.detail("MaybeCommitted", cx->transactionsMaybeCommitted)

@@ -461,11 +465,12 @@ DatabaseContext::DatabaseContext(
 	Standalone<StringRef> dbName, Standalone<StringRef> dbId,
 	int taskID, LocalityData clientLocality, bool enableLocalityLoadBalance, bool lockAware )
   : clientInfo(clientInfo), masterProxiesChangeTrigger(), cluster(cluster), clientInfoMonitor(clientInfoMonitor), dbName(dbName), dbId(dbId),
-	transactionsReadVersions(0), transactionsCommitStarted(0), transactionsCommitCompleted(0), transactionsPastVersions(0),
-	transactionsFutureVersions(0), transactionsNotCommitted(0), transactionsMaybeCommitted(0), taskID(taskID),
+	transactionReadVersions(0), transactionLogicalReads(0), transactionPhysicalReads(0), transactionCommittedMutations(0), transactionCommittedMutationBytes(0), transactionsCommitStarted(0),
+	transactionsCommitCompleted(0), transactionsTooOld(0), transactionsFutureVersions(0), transactionsNotCommitted(0), transactionsMaybeCommitted(0), taskID(taskID),
 	outstandingWatches(0), maxOutstandingWatches(CLIENT_KNOBS->DEFAULT_MAX_OUTSTANDING_WATCHES), clientLocality(clientLocality), enableLocalityLoadBalance(enableLocalityLoadBalance), lockAware(lockAware),
-	latencies(1000), readLatencies(1000), commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000) {
-	logger = databaseLogger( this );
+	latencies(1000), readLatencies(1000), commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000)
+{
+	logger = databaseLogger( this );
 	locationCacheSize = g_network->isSimulated() ?
 			CLIENT_KNOBS->LOCATION_CACHE_EVICTION_SIZE_SIM :
 			CLIENT_KNOBS->LOCATION_CACHE_EVICTION_SIZE;

@@ -1182,6 +1187,7 @@ ACTOR Future<Optional<Value>> getValue( Future<Version> version, Key key, Databa
 			++cx->getValueSubmitted;
 			startTime = timer_int();
 			startTimeD = now();
+			++cx->transactionPhysicalReads;
 			state GetValueReply reply = wait( loadBalance( ssi.second, &StorageServerInterface::getValue, GetValueRequest(key, ver, getValueID), TaskDefaultPromiseEndpoint, false, cx->enableLocalityLoadBalance ? &cx->queueModel : NULL ) );
 			double latency = now() - startTimeD;
 			cx->readLatencies.addSample(latency);

@@ -1248,6 +1254,7 @@ ACTOR Future<Key> getKey( Database cx, KeySelector k, Future<Version> version, T
 	try {
 		if( info.debugID.present() )
 			g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKey.Before"); //.detail("StartKey", printable(k.getKey())).detail("offset",k.offset).detail("orEqual",k.orEqual);
+		++cx->transactionPhysicalReads;
 		GetKeyReply reply = wait( loadBalance( ssi.second, &StorageServerInterface::getKey, GetKeyRequest(k, version.get()), TaskDefaultPromiseEndpoint, false, cx->enableLocalityLoadBalance ? &cx->queueModel : NULL ) );
 		if( info.debugID.present() )
 			g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKey.After"); //.detail("NextKey",printable(reply.sel.key)).detail("offset", reply.sel.offset).detail("orEqual", k.orEqual);

@@ -1410,6 +1417,7 @@ ACTOR Future<Standalone<RangeResultRef>> getExactRange( Database cx, Version ver
 					.detail("Reverse", reverse)
 					.detail("Servers", locations[shard].second->description());*/
 			}
+			++cx->transactionPhysicalReads;
 			GetKeyValuesReply rep = wait( loadBalance( locations[shard].second, &StorageServerInterface::getKeyValues, req, TaskDefaultPromiseEndpoint, false, cx->enableLocalityLoadBalance ? &cx->queueModel : NULL ) );
 			if( info.debugID.present() )
 				g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getExactRange.After");

@@ -1686,6 +1694,7 @@ ACTOR Future<Standalone<RangeResultRef>> getRange( Database cx, Reference<Transa
 					.detail("Servers", beginServer.second->description());*/
 			}

+			++cx->transactionPhysicalReads;
 			GetKeyValuesReply rep = wait( loadBalance(beginServer.second, &StorageServerInterface::getKeyValues, req, TaskDefaultPromiseEndpoint, false, cx->enableLocalityLoadBalance ? &cx->queueModel : NULL ) );

 			if( info.debugID.present() ) {

@@ -1864,6 +1873,7 @@ void Transaction::setVersion( Version v ) {
 }

 Future<Optional<Value>> Transaction::get( const Key& key, bool snapshot ) {
+	++cx->transactionLogicalReads;
 	//ASSERT (key < allKeys.end);

 	//There are no keys in the database with size greater than KEY_SIZE_LIMIT

@@ -1950,6 +1960,7 @@ ACTOR Future< Standalone< VectorRef< const char*>>> getAddressesForKeyActor( Key
 }

 Future< Standalone< VectorRef< const char*>>> Transaction::getAddressesForKey( const Key& key ) {
+	++cx->transactionLogicalReads;
 	auto ver = getReadVersion();

 	return getAddressesForKeyActor(key, ver, cx, info);

@@ -1972,6 +1983,7 @@ ACTOR Future< Key > getKeyAndConflictRange(
 }

 Future< Key > Transaction::getKey( const KeySelector& key, bool snapshot ) {
+	++cx->transactionLogicalReads;
 	if( snapshot )
 		return ::getKey(cx, key, getReadVersion(), info);

@@ -1987,6 +1999,8 @@ Future< Standalone<RangeResultRef> > Transaction::getRange(
 	bool snapshot,
 	bool reverse )
 {
+	++cx->transactionLogicalReads;
+
 	if( limits.isReached() )
 		return Standalone<RangeResultRef>();

@@ -2403,6 +2417,8 @@ ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo>

 			tr->numErrors = 0;
 			cx->transactionsCommitCompleted++;
+			cx->transactionCommittedMutations += req.transaction.mutations.size();
+			cx->transactionCommittedMutationBytes += req.transaction.mutations.expectedSize();

 			if(info.debugID.present())
 				g_traceBatch.addEvent("CommitDebug", commitID.get().first(), "NativeAPI.commit.After");

@@ -2766,7 +2782,7 @@ ACTOR Future<Version> extractReadVersion(DatabaseContext* cx, Reference<Transact
 }

 Future<Version> Transaction::getReadVersion(uint32_t flags) {
-	cx->transactionsReadVersions++;
+	cx->transactionReadVersions++;
 	flags |= options.getReadVersionFlags;

 	auto& batcher = cx->versionBatcher[ flags ];

@@ -2811,7 +2827,7 @@ Future<Void> Transaction::onError( Error const& e ) {
 		e.code() == error_code_future_version)
 	{
 		if( e.code() == error_code_transaction_too_old )
-			cx->transactionsPastVersions++;
+			cx->transactionsTooOld++;
 		else if( e.code() == error_code_future_version )
 			cx->transactionsFutureVersions++;
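
Note: the renamed and expanded counters feed the periodic TransactionMetrics trace event; each counter is bumped exactly where the work happens (e.g. ++cx->transactionPhysicalReads right before each storage request). A compact standard-C++ analogue of per-client counters sampled by a background logger (interval and output format are arbitrary):

    #include <atomic>
    #include <chrono>
    #include <cstdio>
    #include <thread>

    struct ClientMetrics {
        std::atomic<long long> transactionReadVersions{0};
        std::atomic<long long> transactionPhysicalReads{0};
        std::atomic<long long> transactionsTooOld{0};
    };

    int main() {
        ClientMetrics m;
        std::atomic<bool> done{false};

        // Analogue of databaseLogger: wake periodically and emit a metrics event.
        std::thread logger([&] {
            while (!done) {
                std::this_thread::sleep_for(std::chrono::milliseconds(50));
                std::printf("TransactionMetrics ReadVersions=%lld PhysicalReadRequests=%lld TooOld=%lld\n",
                            (long long)m.transactionReadVersions,
                            (long long)m.transactionPhysicalReads,
                            (long long)m.transactionsTooOld);
            }
        });

        // Client operations increment the counters inline, as in the diff above.
        ++m.transactionReadVersions;
        ++m.transactionPhysicalReads;

        std::this_thread::sleep_for(std::chrono::milliseconds(120));
        done = true;
        logger.join();
        return 0;
    }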
@@ -1307,7 +1307,7 @@ Future< Standalone<VectorRef<const char*> >> ReadYourWritesTransaction::getAddre

 	// If key >= allKeys.end, then our resulting address vector will be empty.

-	Future< Standalone<VectorRef<const char*> >> result = tr.getAddressesForKey(key);
+	Future< Standalone<VectorRef<const char*> >> result = waitOrError(tr.getAddressesForKey(key), resetPromise.getFuture());
 	reading.add( success( result ) );
 	return result;
 }

@@ -1681,6 +1681,14 @@ Future<Void> ReadYourWritesTransaction::commit() {
 	return RYWImpl::commit( this );
 }

+Future<Standalone<StringRef>> ReadYourWritesTransaction::getVersionstamp() {
+	if(checkUsedDuringCommit()) {
+		return used_during_commit();
+	}
+
+	return waitOrError(tr.getVersionstamp(), resetPromise.getFuture());
+}
+
 void ReadYourWritesTransaction::setOption( FDBTransactionOptions::Option option, Optional<StringRef> value ) {
 	switch(option) {
 		case FDBTransactionOptions::READ_YOUR_WRITES_DISABLE:
@@ -103,13 +103,7 @@ public:

 	Future<Void> commit();
 	Version getCommittedVersion() { return tr.getCommittedVersion(); }
-	Future<Standalone<StringRef>> getVersionstamp() {
-		if(checkUsedDuringCommit()) {
-			return used_during_commit();
-		}
-
-		return tr.getVersionstamp();
-	}
+	Future<Standalone<StringRef>> getVersionstamp();

 	void setOption( FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>() );
@@ -146,98 +146,113 @@ void JSONDoc::mergeInto(json_spirit::mObject &dst, const json_spirit::mObject &s
 	}
 }

-void JSONDoc::mergeValueInto(json_spirit::mValue &d, const json_spirit::mValue &s) {
-	if(s.is_null())
+void JSONDoc::mergeValueInto(json_spirit::mValue &dst, const json_spirit::mValue &src) {
+	if(src.is_null())
 		return;

-	if(d.is_null()) {
-		d = s;
+	if(dst.is_null()) {
+		dst = src;
 		return;
 	}

-	if(d.type() != s.type()) {
-		// Skip errors already found
-		if(d.type() == json_spirit::obj_type && d.get_obj().count("ERROR"))
-			return;
-		d = json_spirit::mObject({{"ERROR", "Incompatible types."}, {"a", d}, {"b", s}});
+	// Do nothing if dst is already an error
+	if(dst.type() == json_spirit::obj_type && dst.get_obj().count("ERROR"))
+		return;
+
+	if(dst.type() != src.type()) {
+		dst = json_spirit::mObject({{"ERROR", "Incompatible types."}, {"a", dst}, {"b", src}});
 		return;
 	}

-	switch(d.type()) {
+	switch(dst.type()) {
 		case json_spirit::obj_type:
 		{
-			std::string op = getOperator(d.get_obj());
-
-			//printf("Operator: %s\n", op.c_str());
+			// Refs to the objects, for convenience.
+			json_spirit::mObject &aObj = dst.get_obj();
+			const json_spirit::mObject &bObj = src.get_obj();
+
+			const std::string &op = getOperator(aObj);
+			const std::string &opB = getOperator(bObj);
+
+			// Operators must be the same, which could mean both are empty (if these objects are not operators)
+			if(op != opB) {
+				dst = json_spirit::mObject({ {"ERROR", "Operators do not match"}, {"a", dst}, {"b", src} });
+				break;
+			}

 			// If objects are not operators then defer to mergeInto
 			if(op.empty()) {
-				mergeInto(d.get_obj(), s.get_obj());
+				mergeInto(dst.get_obj(), src.get_obj());
 				break;
 			}

-			// Refs to the operator objects to combine
-			const json_spirit::mObject &op_a = d.get_obj();
-			const json_spirit::mObject &op_b = s.get_obj();
-
-			if(!op_b.count(op)) {
-				d = json_spirit::mObject({{"ERROR", "Operators do not match"}, {"s", s}, {"d", d}});
-				break;
-			}
-
-			const json_spirit::mValue &a = d.get_obj().at(op);
-			const json_spirit::mValue &b = s.get_obj().at(op);
+			// Get the operator values
+			json_spirit::mValue &a = aObj.at(op);
+			const json_spirit::mValue &b = bObj.at(op);

 			// First try the operators that are type-agnostic
 			try {
-				d = mergeOperator<json_spirit::mValue>(op, op_a, op_b, a, b);
+				dst = mergeOperator<json_spirit::mValue>(op, aObj, bObj, a, b);
 				return;
 			} catch(std::exception &e) {
 			}

-			// If that didn't work, the types must match or we have no operators left to try.
+			// Now try type and type pair specific operators
+			// First, if types are incompatible try to make them compatible or return an error
 			if(a.type() != b.type()) {
-				d = json_spirit::mObject({{"ERROR", "Types do not match"}, {"s", s}, {"d", d}});
-				return;
+				// It's actually okay if the type mismatch is double vs int since one can be converted to the other.
+				if( (a.type() == json_spirit::int_type && b.type() == json_spirit::real_type)
+					|| (b.type() == json_spirit::int_type && a.type() == json_spirit::real_type) )
+				{
+					// Convert dst's op value (which a is a reference to) to a double so that the
+					// switch block below will do the operation with doubles.
+					a = a.get_real();
+				}
+				else {
+					// Otherwise, output an error as the types do not match
+					dst = json_spirit::mObject({{"ERROR", "Incompatible operator value types"}, {"a", dst}, {"b", src}});
+					return;
+				}
 			}

+			// Now try the type-specific operators.
 			try {
 				switch(a.type()) {
 					case json_spirit::bool_type:
-						d = mergeOperatorWrapper<bool>(op, op_a, op_b, a, b);
+						dst = mergeOperatorWrapper<bool>(op, aObj, bObj, a, b);
 						break;
 					case json_spirit::int_type:
-						d = mergeOperatorWrapper<int64_t>(op, op_a, op_b, a, b);
+						dst = mergeOperatorWrapper<int64_t>(op, aObj, bObj, a, b);
 						break;
 					case json_spirit::real_type:
-						d = mergeOperatorWrapper<double>(op, op_a, op_b, a, b);
+						dst = mergeOperatorWrapper<double>(op, aObj, bObj, a, b);
 						break;
 					case json_spirit::str_type:
-						d = mergeOperatorWrapper<std::string>(op, op_a, op_b, a, b);
+						dst = mergeOperatorWrapper<std::string>(op, aObj, bObj, a, b);
 						break;
 					case json_spirit::array_type:
-						d = mergeOperatorWrapper<json_spirit::mArray>(op, op_a, op_b, a, b);
+						dst = mergeOperatorWrapper<json_spirit::mArray>(op, aObj, bObj, a, b);
 						break;
 					case json_spirit::obj_type:
-						d = mergeOperatorWrapper<json_spirit::mObject>(op, op_a, op_b, a, b);
+						dst = mergeOperatorWrapper<json_spirit::mObject>(op, aObj, bObj, a, b);
 						break;
 					case json_spirit::null_type:
 						break;
 				}
 			} catch(...) {
-				d = json_spirit::mObject({{"ERROR", "Unsupported operator / value type combination."}, {"operator", op}, {"type", a.type()}});
+				dst = json_spirit::mObject({{"ERROR", "Unsupported operator / value type combination."}, {"operator", op}, {"type", a.type()}});
 			}
 			break;
 		}

 		case json_spirit::array_type:
-			for(auto &ai : s.get_array())
-				d.get_array().push_back(ai);
+			for(auto &ai : src.get_array())
+				dst.get_array().push_back(ai);
 			break;

 		default:
-			if(d != s)
-				d = json_spirit::mObject({{"ERROR", "Values do not match."}, {"a", d}, {"b", s}});
+			if(dst != src)
+				dst = json_spirit::mObject({{"ERROR", "Values do not match."}, {"a", dst}, {"b", src}});
 	}
 }
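
Note: the new int/real compatibility branch exists so that, for example, a $sum over 0.5, 1, and 4.5 (the mixed_numeric_sum_6 case in the status test further down) is computed in doubles rather than rejected as a type mismatch. A minimal, runnable demonstration of that promotion rule (the variant type is a stand-in for json_spirit's value):

    #include <cassert>
    #include <cstdint>
    #include <variant>

    using JsonNum = std::variant<int64_t, double>;

    // Mirrors the merge rule: if either side is real, do the math in doubles.
    double asReal(const JsonNum &v) {
        return std::holds_alternative<double>(v) ? std::get<double>(v)
                                                 : (double)std::get<int64_t>(v);
    }

    JsonNum mergeSum(const JsonNum &a, const JsonNum &b) {
        if (std::holds_alternative<int64_t>(a) && std::holds_alternative<int64_t>(b))
            return std::get<int64_t>(a) + std::get<int64_t>(b); // same types: stay integral
        return asReal(a) + asReal(b);                           // mixed: promote to double
    }

    int main() {
        JsonNum total = mergeSum(mergeSum(0.5, (int64_t)1), 4.5);
        assert(asReal(total) == 6.0); // matches the expected mixed_numeric_sum_6 value
        return 0;
    }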
@@ -221,21 +221,27 @@ ACTOR Future<Void> deleteRecursively_impl(Reference<BlobStoreEndpoint> b, std::s
 	state Future<Void> done = b->listBucketStream(bucket, resultStream, prefix, '/', std::numeric_limits<int>::max());
 	// Wrap done in an actor which will send end_of_stream since listBucketStream() does not (so that many calls can write to the same stream)
 	done = map(done, [=](Void) {
 		resultStream.sendError(end_of_stream());
 		return Void();
 	});

 	state std::list<Future<Void>> deleteFutures;
 	try {
 		loop {
-			BlobStoreEndpoint::ListResult list = waitNext(resultStream.getFuture());
-			for(auto &object : list.objects) {
-				int *pNumDeletedCopy = pNumDeleted;   // avoid capture of this
-				deleteFutures.push_back(map(b->deleteObject(bucket, object.name), [pNumDeletedCopy](Void) -> Void {
-					if(pNumDeletedCopy != nullptr)
-						++*pNumDeletedCopy;
-					return Void();
-				}));
+			choose {
+				// Throw if done throws, otherwise don't stop until end_of_stream
+				when(Void _ = wait(done)) {}
+
+				when(BlobStoreEndpoint::ListResult list = waitNext(resultStream.getFuture())) {
+					for(auto &object : list.objects) {
+						int *pNumDeletedCopy = pNumDeleted;   // avoid capture of this
+						deleteFutures.push_back(map(b->deleteObject(bucket, object.name), [pNumDeletedCopy](Void) -> Void {
+							if(pNumDeletedCopy != nullptr)
+								++*pNumDeletedCopy;
+							return Void();
+						}));
+					}
+				}
 			}

 			// This is just a precaution to avoid having too many outstanding delete actors waiting to run
@@ -290,35 +296,35 @@ Future<int64_t> BlobStoreEndpoint::objectSize(std::string const &bucket, std::st
 // Try to read a file, parse it as JSON, and return the resulting document.
 // It will NOT throw if any errors are encountered, it will just return an empty
 // JSON object and will log trace events for the errors encountered.
-ACTOR Future<json_spirit::mObject> tryReadJSONFile(std::string path) {
+ACTOR Future<Optional<json_spirit::mObject>> tryReadJSONFile(std::string path) {
 	state std::string content;

+	// Event type to be logged in the event of an exception
+	state const char *errorEventType = "BlobCredentialFileError";
+
 	try {
-		state Reference<IAsyncFile> f = wait(IAsyncFileSystem::filesystem()->open(path, IAsyncFile::OPEN_NO_AIO | IAsyncFile::OPEN_READONLY, 0));
+		state Reference<IAsyncFile> f = wait(IAsyncFileSystem::filesystem()->open(path, IAsyncFile::OPEN_NO_AIO | IAsyncFile::OPEN_READONLY | IAsyncFile::OPEN_UNCACHED, 0));
 		state int64_t size = wait(f->size());
 		state Standalone<StringRef> buf = makeString(size);
 		int r = wait(f->read(mutateString(buf), size, 0));
 		ASSERT(r == size);
 		content = buf.toString();
-	} catch(Error &e) {
-		if(e.code() != error_code_actor_cancelled)
-			TraceEvent(SevWarn, "BlobCredentialFileError").detail("File", path).error(e).suppressFor(60, true);
-		return json_spirit::mObject();
-	}

-	try {
+		// Any exceptions from here forward are parse failures
+		errorEventType = "BlobCredentialFileParseFailed";
 		json_spirit::mValue json;
 		json_spirit::read_string(content, json);
 		if(json.type() == json_spirit::obj_type)
 			return json.get_obj();
 		else
 			TraceEvent(SevWarn, "BlobCredentialFileNotJSONObject").detail("File", path).suppressFor(60, true);

 	} catch(Error &e) {
 		if(e.code() != error_code_actor_cancelled)
-			TraceEvent(SevWarn, "BlobCredentialFileParseFailed").detail("File", path).error(e).suppressFor(60, true);
+			TraceEvent(SevWarn, errorEventType).detail("File", path).error(e).suppressFor(60, true);
 	}

-	return json_spirit::mObject();
+	return Optional<json_spirit::mObject>();
 }
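
Note: switching tryReadJSONFile to Future<Optional<...>> lets the caller distinguish "file unreadable or unparsable" (no value) from "parsed fine but missing the account" (a value without the key), which the next hunk uses to choose between backup_auth_unreadable and backup_auth_missing. A standard-C++ sketch of the same optional-returning shape (plain text instead of JSON; file names are placeholders):

    #include <fstream>
    #include <iostream>
    #include <optional>
    #include <string>

    // Returns nullopt on any read failure instead of a misleading empty value.
    std::optional<std::string> tryReadFile(const std::string &path) {
        std::ifstream in(path, std::ios::binary);
        if (!in)
            return std::nullopt; // unreadable: caller can count this as "invalid"
        return std::string((std::istreambuf_iterator<char>(in)),
                           std::istreambuf_iterator<char>());
    }

    int main() {
        int invalid = 0;
        for (const char *path : {"credentials.json", "missing.json"}) {
            auto content = tryReadFile(path);
            if (!content) { ++invalid; continue; } // mirrors the ++invalid / continue logic
            std::cout << path << ": " << content->size() << " bytes\n";
        }
        std::cout << invalid << " unreadable source(s)\n";
        return 0;
    }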
@@ -326,7 +332,7 @@ ACTOR Future<Void> updateSecret_impl(Reference<BlobStoreEndpoint> b) {
 	if(pFiles == nullptr)
 		return Void();

-	state std::vector<Future<json_spirit::mObject>> reads;
+	state std::vector<Future<Optional<json_spirit::mObject>>> reads;
 	for(auto &f : *pFiles)
 		reads.push_back(tryReadJSONFile(f));

@@ -334,13 +340,22 @@ ACTOR Future<Void> updateSecret_impl(Reference<BlobStoreEndpoint> b) {

 	std::string key = b->key + "@" + b->host;

+	int invalid = 0;
+
 	for(auto &f : reads) {
-		JSONDoc doc(f.get());
+		// If value not present then the credentials file wasn't readable or valid.  Continue to check other results.
+		if(!f.get().present()) {
+			++invalid;
+			continue;
+		}
+
+		JSONDoc doc(f.get().get());
 		if(doc.has("accounts") && doc.last().type() == json_spirit::obj_type) {
 			JSONDoc accounts(doc.last().get_obj());
 			if(accounts.has(key, false) && accounts.last().type() == json_spirit::obj_type) {
 				JSONDoc account(accounts.last());
 				std::string secret;
 				// Once we find a matching account, use it.
 				if(account.tryGet("secret", secret)) {
 					b->secret = secret;
 					return Void();

@@ -349,7 +364,12 @@ ACTOR Future<Void> updateSecret_impl(Reference<BlobStoreEndpoint> b) {
 			}
 		}
 	}

-	return Void();
+	// If any sources were invalid
+	if(invalid > 0)
+		throw backup_auth_unreadable();
+
+	// All sources were valid but didn't contain the desired info
+	throw backup_auth_missing();
 }
@@ -451,12 +471,9 @@ ACTOR Future<Reference<HTTP::Response>> doRequest_impl(Reference<BlobStoreEndpoi
 			rconn.conn.clear();

 		} catch(Error &e) {
-			// For timeouts, conn failure, or bad reponse reported by HTTP:doRequest, save the error and handle it / possibly retry below.
-			// Any other error is rethrown.
-			if(e.code() == error_code_connection_failed || e.code() == error_code_timed_out || e.code() == error_code_http_bad_response)
-				err = e;
-			else
+			if(e.code() == error_code_actor_cancelled)
 				throw;
+			err = e;
 		}

 		// If err is not present then r is valid.

@@ -667,15 +684,21 @@ ACTOR Future<BlobStoreEndpoint::ListResult> listBucket_impl(Reference<BlobStoreE
 	state Future<Void> done = bstore->listBucketStream(bucket, resultStream, prefix, delimiter, maxDepth, recurseFilter);
 	// Wrap done in an actor which sends end_of_stream because list does not so that many lists can write to the same stream
 	done = map(done, [=](Void) {
 		resultStream.sendError(end_of_stream());
 		return Void();
 	});

 	try {
 		loop {
-			BlobStoreEndpoint::ListResult info = waitNext(resultStream.getFuture());
-			results.commonPrefixes.insert(results.commonPrefixes.end(), info.commonPrefixes.begin(), info.commonPrefixes.end());
-			results.objects.insert(results.objects.end(), info.objects.begin(), info.objects.end());
+			choose {
+				// Throw if done throws, otherwise don't stop until end_of_stream
+				when(Void _ = wait(done)) {}
+
+				when(BlobStoreEndpoint::ListResult info = waitNext(resultStream.getFuture())) {
+					results.commonPrefixes.insert(results.commonPrefixes.end(), info.commonPrefixes.begin(), info.commonPrefixes.end());
+					results.objects.insert(results.objects.end(), info.objects.begin(), info.objects.end());
+				}
+			}
 		}
 	} catch(Error &e) {
 		if(e.code() != error_code_end_of_stream)
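
Note: both listBucket_impl and deleteRecursively_impl now drain the result stream inside a choose, so an error from the producer future surfaces immediately instead of leaving the consumer blocked on waitNext; the map()-wrap injects the end_of_stream sentinel when the producer finishes cleanly. A standard-C++ analogue of that pattern using a queue with a sentinel and exception propagation (the queue and producer are illustrative, not flow's PromiseStream):

    #include <condition_variable>
    #include <cstdio>
    #include <mutex>
    #include <optional>
    #include <queue>
    #include <thread>

    std::mutex mu;
    std::condition_variable cv;
    std::queue<std::optional<int>> results; // nullopt = end_of_stream sentinel
    std::exception_ptr producerError;       // analogue of "done" completing with an error

    int main() {
        // Producer: emit results, then the sentinel (the map()-wrap in the diff).
        std::thread producer([] {
            for (int i = 0; i < 3; ++i) {
                std::lock_guard<std::mutex> g(mu);
                results.push(i);
                cv.notify_one();
            }
            std::lock_guard<std::mutex> g(mu);
            results.push(std::nullopt);
            cv.notify_one();
        });

        // Consumer: stop on the sentinel, rethrow if the producer failed.
        while (true) {
            std::unique_lock<std::mutex> g(mu);
            cv.wait(g, [] { return !results.empty() || producerError; });
            if (producerError) std::rethrow_exception(producerError); // "throw if done throws"
            auto item = results.front();
            results.pop();
            if (!item) break; // end_of_stream
            std::printf("got result %d\n", *item);
        }
        producer.join();
        return 0;
    }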
@@ -307,6 +307,9 @@ namespace HTTP {
 		if(pContent == NULL)
 			pContent = &empty;

+		state bool earlyResponse = false;
+		state int total_sent = 0;
+
 		try {
 			// Write headers to a packet buffer chain
 			PacketBuffer *pFirst = new PacketBuffer();

@@ -321,11 +324,25 @@ namespace HTTP {
 				printf("Request Header: %s: %s\n", h.first.c_str(), h.second.c_str());
 			}

+			state Reference<HTTP::Response> r(new HTTP::Response());
+			state Future<Void> responseReading = r->read(conn, verb == "HEAD" || verb == "DELETE");
+
 			state double send_start = timer();
-			state double total_sent = 0;

 			loop {
 				Void _ = wait(conn->onWritable());
 				Void _ = wait( delay( 0, TaskWriteSocket ) );

+				// If we already got a response, before finishing sending the request, then close the connection,
+				// set the Connection header to "close" as a hint to the caller that this connection can't be used
+				// again, and break out of the send loop.
+				if(responseReading.isReady()) {
+					conn->close();
+					r->headers["Connection"] = "close";
+					earlyResponse = true;
+					break;
+				}
+
 				state int trySend = CLIENT_KNOBS->HTTP_SEND_SIZE;
 				Void _ = wait(sendRate->getAllowance(trySend));
 				int len = conn->write(pContent->getUnsent(), trySend);

@@ -338,18 +355,20 @@ namespace HTTP {
 					break;
 				}

-			state Reference<HTTP::Response> r(new HTTP::Response());
-			Void _ = wait(r->read(conn, verb == "HEAD" || verb == "DELETE"));
+			Void _ = wait(responseReading);

 			double elapsed = timer() - send_start;
 			if(CLIENT_KNOBS->HTTP_VERBOSE_LEVEL > 0)
-				printf("[%s] HTTP code=%d, time=%fs %s %s [%u out, response content len %d]\n", conn->getDebugID().toString().c_str(), r->code, elapsed, verb.c_str(), resource.c_str(), (int)total_sent, (int)r->contentLen);
+				printf("[%s] HTTP code=%d early=%d, time=%fs %s %s contentLen=%d [%d out, response content len %d]\n",
+					conn->getDebugID().toString().c_str(), r->code, earlyResponse, elapsed, verb.c_str(), resource.c_str(), contentLen, total_sent, (int)r->contentLen);
 			if(CLIENT_KNOBS->HTTP_VERBOSE_LEVEL > 2)
 				printf("[%s] HTTP RESPONSE: %s %s\n%s\n", conn->getDebugID().toString().c_str(), verb.c_str(), resource.c_str(), r->toString().c_str());
 			return r;
 		} catch(Error &e) {
 			double elapsed = timer() - send_start;
 			if(CLIENT_KNOBS->HTTP_VERBOSE_LEVEL > 0)
-				printf("[%s] HTTP *ERROR*=%s, time=%fs %s %s [%u out]\n", conn->getDebugID().toString().c_str(), e.name(), elapsed, verb.c_str(), resource.c_str(), (int)total_sent);
+				printf("[%s] HTTP *ERROR*=%s early=%d, time=%fs %s %s contentLen=%d [%d out]\n",
+					conn->getDebugID().toString().c_str(), e.name(), earlyResponse, elapsed, verb.c_str(), resource.c_str(), contentLen, total_sent);
 			throw;
 		}
 	}
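
Note: starting the response read before the request body is fully sent lets the client notice a server that answers early (for example with an error) and stop uploading. A rough standard-C++ analogue of the send loop's isReady() check using std::async (readResponse and the chunked "send" are stand-ins for the connection API):

    #include <chrono>
    #include <cstdio>
    #include <future>
    #include <thread>

    // Stand-in for reading the response; here it "arrives" after 30ms.
    int readResponse() {
        std::this_thread::sleep_for(std::chrono::milliseconds(30));
        return 500; // e.g. the server rejected the request mid-upload
    }

    int main() {
        bool earlyResponse = false;
        std::future<int> responseReading = std::async(std::launch::async, readResponse);

        for (int chunk = 0; chunk < 100; ++chunk) {
            // Analogue of responseReading.isReady(): poll without blocking.
            if (responseReading.wait_for(std::chrono::seconds(0)) == std::future_status::ready) {
                earlyResponse = true; // stop sending; the connection can't be reused
                break;
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(1)); // "send" one chunk
        }

        std::printf("HTTP code=%d early=%d\n", responseReading.get(), (int)earlyResponse);
        return 0;
    }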
@@ -219,11 +219,12 @@ struct JSONDoc {
 		return mergeOperator<T>(op, op_a, op_b, a.get_value<T>(), b.get_value<T>());
 	}

-	static inline std::string getOperator(const json_spirit::mObject &obj) {
+	static inline const std::string & getOperator(const json_spirit::mObject &obj) {
+		static const std::string empty;
 		for(auto &k : obj)
 			if(!k.first.empty() && k.first[0] == '$')
 				return k.first;
-		return std::string();
+		return empty;
 	}

 	// Merge src into dest, applying merge operators
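
Note: returning const std::string& from getOperator avoids a copy per call, but a reference must never point at a temporary; the function-local static empty string gives the no-operator case something with permanent storage to refer to. A minimal, self-contained illustration of the idiom (the map stands in for the JSON object):

    #include <cassert>
    #include <map>
    #include <string>

    // Returns a reference into the map, or a reference to a static empty string;
    // returning std::string() by const reference here would dangle.
    static const std::string &getOperator(const std::map<std::string, int> &obj) {
        static const std::string empty;
        for (auto &k : obj)
            if (!k.first.empty() && k.first[0] == '$')
                return k.first;
        return empty;
    }

    int main() {
        std::map<std::string, int> doc{{"$sum", 1}};
        assert(getOperator(doc) == "$sum");
        assert(getOperator({}).empty());
        return 0;
    }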
@@ -828,7 +828,7 @@ public:

 		if ( oldMasterFit < mworker.fitness )
 			return false;
-		if ( oldMasterFit > mworker.fitness )
+		if ( oldMasterFit > mworker.fitness || ( dbi.master.locality.processId() == clusterControllerProcessId && mworker.worker.first.locality.processId() != clusterControllerProcessId ) )
 			return true;

 		std::set<Optional<Key>> primaryDC;

@@ -966,7 +966,8 @@ ACTOR Future<Void> clusterWatchDatabase( ClusterControllerData* cluster, Cluster
 			std::map< Optional<Standalone<StringRef>>, int> id_used;
 			id_used[cluster->clusterControllerProcessId]++;
 			state WorkerFitnessInfo masterWorker = cluster->getWorkerForRoleInDatacenter(cluster->clusterControllerDcId, ProcessClass::Master, ProcessClass::NeverAssign, db->config, id_used);
-			if( masterWorker.worker.second.machineClassFitness( ProcessClass::Master ) > SERVER_KNOBS->EXPECTED_MASTER_FITNESS && now() - cluster->startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY ) {
+			if( ( masterWorker.worker.second.machineClassFitness( ProcessClass::Master ) > SERVER_KNOBS->EXPECTED_MASTER_FITNESS || masterWorker.worker.first.locality.processId() == cluster->clusterControllerProcessId )
+				&& now() - cluster->startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY ) {
 				TraceEvent("CCWDB", cluster->id).detail("Fitness", masterWorker.worker.second.machineClassFitness( ProcessClass::Master ));
 				Void _ = wait( delay(SERVER_KNOBS->ATTEMPT_RECRUITMENT_DELAY) );
 				continue;
@@ -260,8 +260,8 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
 	init( SHUTDOWN_TIMEOUT, 600 ); if( randomize && BUGGIFY ) SHUTDOWN_TIMEOUT = 60.0;
 	init( MASTER_SPIN_DELAY, 1.0 ); if( randomize && BUGGIFY ) MASTER_SPIN_DELAY = 10.0;
 	init( CC_CHANGE_DELAY, 0.1 );
-	init( WAIT_FOR_GOOD_RECRUITMENT_DELAY, 1.0 );
-	init( ATTEMPT_RECRUITMENT_DELAY, 0.05 );
+	init( WAIT_FOR_GOOD_RECRUITMENT_DELAY, 0.1 );
+	init( ATTEMPT_RECRUITMENT_DELAY, 0.035 );
 	init( WORKER_FAILURE_TIME, 1.0 ); if( randomize && BUGGIFY ) WORKER_FAILURE_TIME = 10.0;
 	init( CHECK_BETTER_MASTER_INTERVAL, 1.0 ); if( randomize && BUGGIFY ) CHECK_BETTER_MASTER_INTERVAL = 0.001;
 	init( INCOMPATIBLE_PEERS_LOGGING_INTERVAL, 600 ); if( randomize && BUGGIFY ) INCOMPATIBLE_PEERS_LOGGING_INTERVAL = 60.0;
@@ -1945,6 +1945,8 @@ TEST_CASE("status/json/merging") {
 	a.create("not_expired_and_merged.$expires.seven.$sum") = 1;
 	a.create("not_expired_and_merged.$expires.one.$min") = 3;
 	a.create("not_expired_and_merged.version") = 3;
+	a.create("mixed_numeric_sum_6.$sum") = 0.5;
+	a.create("mixed_numeric_min_0.$min") = 1.5;

 	b.create("int_one") = 1;
 	b.create("int_unmatched") = 3;

@@ -1968,6 +1970,8 @@ TEST_CASE("status/json/merging") {
 	b.create("latest_obj.timestamp") = 2;
 	b.create("latest_int_5.$latest") = 7;
 	b.create("latest_int_5.timestamp") = 2;
+	b.create("mixed_numeric_sum_6.$sum") = 1;
+	b.create("mixed_numeric_min_0.$min") = 4.5;

 	c.create("int_total_30.$sum") = 0;
 	c.create("not_expired.$expires") = "I am still valid";

@@ -1983,18 +1987,25 @@ TEST_CASE("status/json/merging") {
 	c.create("latest_obj.$latest.not_expired.$expires") = "Still alive.";
 	c.create("latest_obj.$latest.not_expired.version") = 3;
 	c.create("latest_obj.timestamp") = 3;
-	b.create("latest_int_5.$latest") = 5;
-	b.create("latest_int_5.timestamp") = 3;
+	c.create("latest_int_5.$latest") = 5;
+	c.create("latest_int_5.timestamp") = 3;
+	c.create("mixed_numeric_sum_6.$sum") = 4.5;
+	c.create("mixed_numeric_min_0.$min") = (double)0.0;
+
+	printf("a = \n%s\n", json_spirit::write_string(json_spirit::mValue(objA), json_spirit::pretty_print).c_str());
+	printf("b = \n%s\n", json_spirit::write_string(json_spirit::mValue(objB), json_spirit::pretty_print).c_str());
+	printf("c = \n%s\n", json_spirit::write_string(json_spirit::mValue(objC), json_spirit::pretty_print).c_str());

 	JSONDoc::expires_reference_version = 2;
 	a.absorb(b);
 	a.absorb(c);
 	a.cleanOps();
+	printf("result = \n%s\n", json_spirit::write_string(json_spirit::mValue(objA), json_spirit::pretty_print).c_str());
 	std::string result = json_spirit::write_string(json_spirit::mValue(objA));
-	std::string expected = "{\"a\":\"justA\",\"b\":\"justB\",\"bool_true\":true,\"expired\":null,\"int_one\":1,\"int_total_30\":30,\"int_unmatched\":{\"ERROR\":\"Values do not match.\",\"a\":2,\"b\":3},\"last_hello\":\"hello\",\"latest_int_5\":5,\"latest_obj\":{\"a\":\"a\",\"b\":\"b\",\"not_expired\":\"Still alive.\"},\"not_expired\":\"I am still valid\",\"not_expired_and_merged\":{\"one\":1,\"seven\":7},\"string\":\"test\",\"subdoc\":{\"double_max_5\":5,\"double_min_2\":2,\"int_11\":11,\"obj_count_3\":3}}";
+	std::string expected = "{\"a\":\"justA\",\"b\":\"justB\",\"bool_true\":true,\"expired\":null,\"int_one\":1,\"int_total_30\":30,\"int_unmatched\":{\"ERROR\":\"Values do not match.\",\"a\":2,\"b\":3},\"last_hello\":\"hello\",\"latest_int_5\":5,\"latest_obj\":{\"a\":\"a\",\"b\":\"b\",\"not_expired\":\"Still alive.\"},\"mixed_numeric_min_0\":0,\"mixed_numeric_sum_6\":6,\"not_expired\":\"I am still valid\",\"not_expired_and_merged\":{\"one\":1,\"seven\":7},\"string\":\"test\",\"subdoc\":{\"double_max_5\":5,\"double_min_2\":2,\"int_11\":11,\"obj_count_3\":3}}";

 	if(result != expected) {
-		printf("ERROR: Combined doc does not match expected.\nexpected: %s\nresult: %s\n", expected.c_str(), result.c_str());
+		printf("ERROR: Combined doc does not match expected.\nexpected:\n\n%s\nresult:\n%s\n", expected.c_str(), result.c_str());
 		ASSERT(false);
 	}
@@ -648,7 +648,7 @@ ACTOR Future<Version> waitForVersion( StorageServer* data, Version version ) {
 	}
 }

-ACTOR Future<Version> waitForVersionNoPastVersion( StorageServer* data, Version version ) {
+ACTOR Future<Version> waitForVersionNoTooOld( StorageServer* data, Version version ) {
 	// This could become an Actor transparently, but for now it just does the lookup
 	if (version == latestVersion)
 		version = std::max(Version(1), data->version.get());

@@ -752,7 +752,7 @@ ACTOR Future<Void> watchValue_impl( StorageServer* data, WatchValueRequest req )
 	if( req.debugID.present() )
 		g_traceBatch.addEvent("WatchValueDebug", req.debugID.get().first(), "watchValueQ.Before"); //.detail("TaskID", g_network->getCurrentTask());

-	Version version = wait( waitForVersionNoPastVersion( data, req.version ) );
+	Version version = wait( waitForVersionNoTooOld( data, req.version ) );
 	if( req.debugID.present() )
 		g_traceBatch.addEvent("WatchValueDebug", req.debugID.get().first(), "watchValueQ.AfterVersion"); //.detail("TaskID", g_network->getCurrentTask());

@@ -1655,13 +1655,13 @@ void coalesceShards(StorageServer *data, KeyRangeRef keys) {
 	}
 }

-ACTOR Future<Standalone<RangeResultRef>> tryGetRange( Database cx, Version version, KeyRangeRef keys, GetRangeLimits limits, bool* isPastVersion ) {
+ACTOR Future<Standalone<RangeResultRef>> tryGetRange( Database cx, Version version, KeyRangeRef keys, GetRangeLimits limits, bool* isTooOld ) {
 	state Transaction tr( cx );
 	state Standalone<RangeResultRef> output;
 	state KeySelectorRef begin = firstGreaterOrEqual( keys.begin );
 	state KeySelectorRef end = firstGreaterOrEqual( keys.end );

-	if( *isPastVersion )
+	if( *isTooOld )
 		throw transaction_too_old();

 	tr.setVersion( version );

@@ -1703,7 +1703,7 @@ ACTOR Future<Standalone<RangeResultRef>> tryGetRange( Database cx, Version versi
 	} catch( Error &e ) {
 		if( begin.getKey() != keys.begin && ( e.code() == error_code_transaction_too_old || e.code() == error_code_future_version ) ) {
 			if( e.code() == error_code_transaction_too_old )
-				*isPastVersion = true;
+				*isTooOld = true;
 			output.more = true;
 			if( begin.isFirstGreaterOrEqual() )
 				output.readThrough = begin.getKey();

@@ -1800,13 +1800,13 @@ ACTOR Future<Void> fetchKeys( StorageServer *data, AddingShard* shard ) {
 	// Get the history
 	state int debug_getRangeRetries = 0;
 	state int debug_nextRetryToLog = 1;
-	state bool isPastVersion = false;
+	state bool isTooOld = false;

 	loop {
 		try {
 			TEST(true); // Fetching keys for transferred shard

-			state Standalone<RangeResultRef> this_block = wait( tryGetRange( data->cx, fetchVersion, keys, GetRangeLimits( CLIENT_KNOBS->ROW_LIMIT_UNLIMITED, fetchBlockBytes ), &isPastVersion ) );
+			state Standalone<RangeResultRef> this_block = wait( tryGetRange( data->cx, fetchVersion, keys, GetRangeLimits( CLIENT_KNOBS->ROW_LIMIT_UNLIMITED, fetchBlockBytes ), &isTooOld ) );

 			int expectedSize = (int)this_block.expectedSize() + (8-(int)sizeof(KeyValueRef))*this_block.size();

@@ -1877,7 +1877,7 @@ ACTOR Future<Void> fetchKeys( StorageServer *data, AddingShard* shard ) {
 				Void _ = wait( delayJittered( FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY ) );
 				Version lastFV = fetchVersion;
 				fetchVersion = data->version.get();
-				isPastVersion = false;
+				isTooOld = false;

 				// Throw away deferred updates from before fetchVersion, since we don't need them to use blocks fetched at that version
 				while (!shard->updates.empty() && shard->updates[0].version <= fetchVersion) shard->updates.pop_front();
@@ -30,13 +30,13 @@ struct CycleWorkload : TestWorkload {
 	Key keyPrefix;

 	vector<Future<Void>> clients;
-	PerfIntCounter transactions, retries, pastVersionRetries, commitFailedRetries;
+	PerfIntCounter transactions, retries, tooOldRetries, commitFailedRetries;
 	PerfDoubleCounter totalLatency;

 	CycleWorkload(WorkloadContext const& wcx)
 		: TestWorkload(wcx),
 		transactions("Transactions"), retries("Retries"), totalLatency("Latency"),
-		pastVersionRetries("Retries.past_version"), commitFailedRetries("Retries.commit_failed")
+		tooOldRetries("Retries.too_old"), commitFailedRetries("Retries.commit_failed")
 	{
 		testDuration = getOption( options, LiteralStringRef("testDuration"), 10.0 );
 		transactionsPerSecond = getOption( options, LiteralStringRef("transactionsPerSecond"), 5000.0 ) / clientCount;

@@ -69,7 +69,7 @@ struct CycleWorkload : TestWorkload {
 	virtual void getMetrics( vector<PerfMetric>& m ) {
 		m.push_back( transactions.getMetric() );
 		m.push_back( retries.getMetric() );
-		m.push_back( pastVersionRetries.getMetric() );
+		m.push_back( tooOldRetries.getMetric() );
 		m.push_back( commitFailedRetries.getMetric() );
 		m.push_back( PerfMetric( "Avg Latency (ms)", 1000 * totalLatency.getValue() / transactions.getValue(), true ) );
 		m.push_back( PerfMetric( "Read rows/simsec (approx)", transactions.getValue() * 3 / testDuration, false ) );

@@ -120,7 +120,7 @@ struct CycleWorkload : TestWorkload {
 				//TraceEvent("CycleCommit");
 				break;
 			} catch (Error& e) {
-				if (e.code() == error_code_transaction_too_old) ++self->pastVersionRetries;
+				if (e.code() == error_code_transaction_too_old) ++self->tooOldRetries;
 				else if (e.code() == error_code_not_committed) ++self->commitFailedRetries;
 				Void _ = wait( tr.onError(e) );
 			}
@@ -29,13 +29,13 @@ struct Increment : TestWorkload {
 	double testDuration, transactionsPerSecond, minExpectedTransactionsPerSecond;

 	vector<Future<Void>> clients;
-	PerfIntCounter transactions, retries, pastVersionRetries, commitFailedRetries;
+	PerfIntCounter transactions, retries, tooOldRetries, commitFailedRetries;
 	PerfDoubleCounter totalLatency;

 	Increment(WorkloadContext const& wcx)
 		: TestWorkload(wcx),
 		transactions("Transactions"), retries("Retries"), totalLatency("Latency"),
-		pastVersionRetries("Retries.past_version"), commitFailedRetries("Retries.commit_failed")
+		tooOldRetries("Retries.too_old"), commitFailedRetries("Retries.commit_failed")
 	{
 		testDuration = getOption( options, LiteralStringRef("testDuration"), 10.0 );
 		transactionsPerSecond = getOption( options, LiteralStringRef("transactionsPerSecond"), 5000.0 );

@@ -68,7 +68,7 @@ struct Increment : TestWorkload {
 	virtual void getMetrics( vector<PerfMetric>& m ) {
 		m.push_back( transactions.getMetric() );
 		m.push_back( retries.getMetric() );
-		m.push_back( pastVersionRetries.getMetric() );
+		m.push_back( tooOldRetries.getMetric() );
 		m.push_back( commitFailedRetries.getMetric() );
 		m.push_back( PerfMetric( "Avg Latency (ms)", 1000 * totalLatency.getValue() / transactions.getValue(), true ) );
 		m.push_back( PerfMetric( "Read rows/simsec (approx)", transactions.getValue() * 3 / testDuration, false ) );

@@ -94,7 +94,7 @@ struct Increment : TestWorkload {
 				Void _ = wait( tr.commit() );
 				break;
 			} catch (Error& e) {
-				if (e.code() == error_code_transaction_too_old) ++self->pastVersionRetries;
+				if (e.code() == error_code_transaction_too_old) ++self->tooOldRetries;
 				else if (e.code() == error_code_not_committed) ++self->commitFailedRetries;
 				Void _ = wait( tr.onError(e) );
 			}
@@ -835,7 +835,7 @@ Future< Reference<IConnection> > Net2::connect( NetworkAddress toAddr ) {
 }

 ACTOR static Future<std::vector<NetworkAddress>> resolveTCPEndpoint_impl( Net2 *self, std::string host, std::string service) {
-	state Promise<std::vector<NetworkAddress>> result;
+	Promise<std::vector<NetworkAddress>> result;

 	self->tcpResolver.async_resolve(tcp::resolver::query(host, service), [=](const boost::system::error_code &ec, tcp::resolver::iterator iter) {
 		if(ec) {
@@ -701,7 +701,7 @@ void getDiskStatistics(std::string const& directory, uint64_t& currentIOs, uint6
 		disk_stream.ignore( std::numeric_limits<std::streamsize>::max(), '\n');
 	}

-	if(!g_network->isSimulated()) TraceEvent(SevWarnAlways, "DeviceNotFound").detail("Directory", directory);
+	if(!g_network->isSimulated()) TraceEvent(SevWarnAlways, "GetDiskStatisticsDeviceNotFound").detail("Directory", directory);
 }

 dev_t getDeviceId(std::string path) {
@@ -174,6 +174,8 @@ ERROR( backup_bad_block_size, 2313, "Backup file block size too small")
 ERROR( backup_invalid_url, 2314, "Backup Container URL invalid")
 ERROR( backup_invalid_info, 2315, "Backup Container info invalid")
 ERROR( backup_cannot_expire, 2316, "Cannot expire requested data from backup without violating minimum restorability")
+ERROR( backup_auth_missing, 2317, "Cannot find authentication details (such as a password or secret key) for the specified Backup Container URL")
+ERROR( backup_auth_unreadable, 2318, "Cannot read or parse one or more sources of authentication information for Backup Container URLs")
 ERROR( restore_invalid_version, 2361, "Invalid restore version")
 ERROR( restore_corrupted_data, 2362, "Corrupted backup data")
 ERROR( restore_missing_data, 2363, "Missing backup data")
@@ -1113,6 +1113,18 @@ ACTOR template <class T> void tagAndForwardError( Promise<T>* pOutputPromise, Er
 	out.sendError(value);
 }

+ACTOR template <class T> Future<T> waitOrError(Future<T> f, Future<Void> errorSignal) {
+	choose {
+		when(T val = wait(f)) {
+			return val;
+		}
+		when(Void _ = wait(errorSignal)) {
+			ASSERT(false);
+			throw internal_error();
+		}
+	}
+}
+
 struct FlowLock : NonCopyable, public ReferenceCounted<FlowLock> {
 	// FlowLock implements a nonblocking critical section: there can be only a limited number of clients executing code between
 	// wait(take()) and release().  Not thread safe.  take() returns only when the number of holders of the lock is fewer than the
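
Note: waitOrError is the helper the ReadYourWritesTransaction changes above rely on. It resolves to f's value, while any error sent through errorSignal (for example the transaction's resetPromise being broken on reset) is thrown instead; errorSignal completing without an error trips the ASSERT, since the signal is only ever expected to fail. Usage from this same commit:

    Future<Standalone<StringRef>> ReadYourWritesTransaction::getVersionstamp() {
        if(checkUsedDuringCommit()) {
            return used_during_commit();
        }

        return waitOrError(tr.getVersionstamp(), resetPromise.getFuture());
    }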
@@ -32,7 +32,7 @@

 <Wix xmlns='http://schemas.microsoft.com/wix/2006/wi'>
     <Product Name='$(var.Title)'
-             Id='{62160341-2769-4104-9D74-4235618775EA}'
+             Id='{999DE80B-0F53-490D-849B-E8C7620939E2}'
              UpgradeCode='{A95EA002-686E-4164-8356-C715B7F8B1C8}'
              Version='$(var.Version)'
              Manufacturer='$(var.Manufacturer)'