Merge branch 'release-6.2' into dd-use-available-space

# Conflicts:
#	fdbserver/DataDistribution.actor.cpp
#	fdbserver/DataDistribution.actor.h
#	fdbserver/DataDistributionQueue.actor.cpp
This commit is contained in:
A.J. Beamon 2020-02-20 16:12:42 -08:00
commit e1fb568fd1
53 changed files with 627 additions and 415 deletions

View File

@ -54,7 +54,8 @@ type RangeOptions struct {
// Reverse indicates that the read should be performed in lexicographic
// (false) or reverse lexicographic (true) order. When Reverse is true and
// Limit is non-zero, the last Limit key-value pairs in the range are
// returned.
// returned. Reading ranges in reverse is supported natively by the
// database and should have minimal extra cost.
Reverse bool
}
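
The behavior documented here — with Reverse set and a non-zero Limit, the read returns the last Limit key-value pairs of the range, in descending key order — can be illustrated with a small standalone sketch. This is plain C++ over a std::map, not the FoundationDB client API; getRange, begin, and end are illustrative names only.

#include <iostream>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Minimal sketch of the documented semantics: with reverse=true and a
// non-zero limit, return the last `limit` pairs of [begin, end), in
// descending key order. Standard containers only; not the FDB API.
std::vector<std::pair<std::string, std::string>> getRange(
    const std::map<std::string, std::string>& kv, const std::string& begin,
    const std::string& end, int limit, bool reverse) {
    std::vector<std::pair<std::string, std::string>> out;
    auto lo = kv.lower_bound(begin);
    auto hi = kv.lower_bound(end);
    if (!reverse) {
        for (auto it = lo; it != hi && limit-- > 0; ++it) out.emplace_back(*it);
    } else {
        // Walk backwards from the end of the range, so the last `limit`
        // keys in the range come out first.
        for (auto it = hi; it != lo && limit-- > 0;) out.emplace_back(*--it);
    }
    return out;
}

int main() {
    std::map<std::string, std::string> kv{ {"a","1"}, {"b","2"}, {"c","3"}, {"d","4"} };
    for (auto& [k, v] : getRange(kv, "a", "z", 2, /*reverse=*/true))
        std::cout << k << "=" << v << "\n"; // prints d=4 then c=3
}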

View File

@ -184,7 +184,9 @@ public interface ReadTransaction extends ReadTransactionContext {
* <i>first</i> keys in the range. Pass {@link #ROW_LIMIT_UNLIMITED} if this query
* should not limit the number of results. If {@code reverse} is {@code true} rows
* will be limited starting at the end of the range.
* @param reverse return results starting at the end of the range in reverse order
* @param reverse return results starting at the end of the range in reverse order.
* Reading ranges in reverse is supported natively by the database and should
* have minimal extra cost.
*
* @return a handle to access the results of the asynchronous call
*/
@ -205,7 +207,9 @@ public interface ReadTransaction extends ReadTransactionContext {
* <i>first</i> keys in the range. Pass {@link #ROW_LIMIT_UNLIMITED} if this query
* should not limit the number of results. If {@code reverse} is {@code true} rows
* will be limited starting at the end of the range.
* @param reverse return results starting at the end of the range in reverse order
* @param reverse return results starting at the end of the range in reverse order.
* Reading ranges in reverse is supported natively by the database and should
* have minimal extra cost.
* @param mode provide a hint about how the results are to be used. This
* can provide speed improvements or efficiency gains based on the caller's
* knowledge of the upcoming access pattern.
@ -272,7 +276,9 @@ public interface ReadTransaction extends ReadTransactionContext {
* <i>first</i> keys in the range. Pass {@link #ROW_LIMIT_UNLIMITED} if this query
* should not limit the number of results. If {@code reverse} is {@code true} rows
* will be limited starting at the end of the range.
* @param reverse return results starting at the end of the range in reverse order
* @param reverse return results starting at the end of the range in reverse order.
* Reading ranges in reverse is supported natively by the database and should
* have minimal extra cost.
*
* @return a handle to access the results of the asynchronous call
*/
@ -293,7 +299,9 @@ public interface ReadTransaction extends ReadTransactionContext {
* <i>first</i> keys in the range. Pass {@link #ROW_LIMIT_UNLIMITED} if this query
* should not limit the number of results. If {@code reverse} is {@code true} rows
* will be limited starting at the end of the range.
* @param reverse return results starting at the end of the range in reverse order
* @param reverse return results starting at the end of the range in reverse order.
* Reading ranges in reverse is supported natively by the database and should
* have minimal extra cost.
* @param mode provide a hint about how the results are to be used. This
* can provide speed improvements or efficiency gains based on the caller's
* knowledge of the upcoming access pattern.
@ -369,7 +377,9 @@ public interface ReadTransaction extends ReadTransactionContext {
* <i>first</i> keys in the range. Pass {@link #ROW_LIMIT_UNLIMITED} if this query
* should not limit the number of results. If {@code reverse} is {@code true} rows
* will be limited starting at the end of the range.
* @param reverse return results starting at the end of the range in reverse order
* @param reverse return results starting at the end of the range in reverse order.
* Reading ranges in reverse is supported natively by the database and should
* have minimal extra cost.
*
* @return a handle to access the results of the asynchronous call
*/
@ -393,7 +403,9 @@ public interface ReadTransaction extends ReadTransactionContext {
* <i>first</i> keys in the range. Pass {@link #ROW_LIMIT_UNLIMITED} if this query
* should not limit the number of results. If {@code reverse} is {@code true} rows
* will be limited starting at the end of the range.
* @param reverse return results starting at the end of the range in reverse order
* @param reverse return results starting at the end of the range in reverse order.
* Reading ranges in reverse is supported natively by the database and should
* have minimal extra cost.
* @param mode provide a hint about how the results are to be used. This
* can provide speed improvements or efficiency gains based on the caller's
* knowledge of the upcoming access pattern.

View File

@ -530,8 +530,7 @@ Applications must provide error handling and an appropriate retry loop around th
|snapshot|
``reverse``
If non-zero, key-value pairs will be returned in reverse lexicographical order beginning at the end of the range.
If non-zero, key-value pairs will be returned in reverse lexicographical order beginning at the end of the range. Reading ranges in reverse is supported natively by the database and should have minimal extra cost.
.. type:: FDBStreamingMode

View File

@ -287,7 +287,7 @@ A |database-blurb1| |database-blurb2|
If ``limit`` is specified, then only the first ``limit`` keys (and their values) in the range will be returned.
If ``reverse`` is True, then the last ``limit`` keys in the range will be returned in reverse order.
If ``reverse`` is True, then the last ``limit`` keys in the range will be returned in reverse order. Reading ranges in reverse is supported natively by the database and should have minimal extra cost.
If ``streaming_mode`` is specified, it must be a value from the :data:`StreamingMode` enumeration. It provides a hint to FoundationDB about how to retrieve the specified range. This option should generally not be specified, allowing FoundationDB to retrieve the full range very efficiently.
@ -503,7 +503,7 @@ Reading data
If ``limit`` is specified, then only the first ``limit`` keys (and their values) in the range will be returned.
If ``reverse`` is True, then the last ``limit`` keys in the range will be returned in reverse order.
If ``reverse`` is True, then the last ``limit`` keys in the range will be returned in reverse order. Reading ranges in reverse is supported natively by the database and should have minimal extra cost.
If ``streaming_mode`` is specified, it must be a value from the :data:`StreamingMode` enumeration. It provides a hint to FoundationDB about how the returned container is likely to be used. The default is :data:`StreamingMode.iterator`.

View File

@ -285,7 +285,7 @@ A |database-blurb1| |database-blurb2|
Only the first ``limit`` keys (and their values) in the range will be returned.
``:reverse``
If ``true``, then the keys in the range will be returned in reverse order.
If ``true``, then the keys in the range will be returned in reverse order. Reading ranges in reverse is supported natively by the database and should have minimal extra cost.
If ``:limit`` is also specified, the *last* ``limit`` keys in the range will be returned in reverse order.
@ -463,7 +463,7 @@ Reading data
Only the first ``limit`` keys (and their values) in the range will be returned.
``:reverse``
If true, then the keys in the range will be returned in reverse order.
If ``true``, then the keys in the range will be returned in reverse order. Reading ranges in reverse is supported natively by the database and should have minimal extra cost.
If ``:limit`` is also specified, the *last* ``limit`` keys in the range will be returned in reverse order.

View File

@ -5,6 +5,11 @@ Release Notes
6.2.16
======
Performance
-----------
* Reverse range reads could read too much data from disk, resulting in poor performance relative to forward range reads. `(PR #2650) <https://github.com/apple/foundationdb/pull/2650>`_.
Fixes
-----

View File

@ -1600,9 +1600,9 @@ ACTOR Future<Void> timeWarning( double when, const char* msg ) {
return Void();
}
ACTOR Future<Void> checkStatus(Future<Void> f, Reference<ClusterConnectionFile> clusterFile, bool displayDatabaseAvailable = true) {
ACTOR Future<Void> checkStatus(Future<Void> f, Database db, bool displayDatabaseAvailable = true) {
wait(f);
StatusObject s = wait(StatusClient::statusFetcher(clusterFile));
StatusObject s = wait(StatusClient::statusFetcher(db));
printf("\n");
printStatus(s, StatusClient::MINIMAL, displayDatabaseAvailable);
printf("\n");
@ -1644,7 +1644,7 @@ ACTOR Future<bool> configure( Database db, std::vector<StringRef> tokens, Refere
state Optional<ConfigureAutoResult> conf;
if( tokens[startToken] == LiteralStringRef("auto") ) {
StatusObject s = wait( makeInterruptable(StatusClient::statusFetcher( ccf )) );
StatusObject s = wait( makeInterruptable(StatusClient::statusFetcher( db )) );
if(warn.isValid())
warn.cancel();
@ -2061,7 +2061,7 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
}
if(!force) {
StatusObject status = wait( makeInterruptable( StatusClient::statusFetcher( ccf ) ) );
StatusObject status = wait( makeInterruptable( StatusClient::statusFetcher( db ) ) );
state std::string errorString = "ERROR: Could not calculate the impact of this exclude on the total free space in the cluster.\n"
"Please try the exclude again in 30 seconds.\n"
@ -2636,7 +2636,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
if (!opt.exec.present()) {
if(opt.initialStatusCheck) {
Future<Void> checkStatusF = checkStatus(Void(), db->getConnectionFile());
Future<Void> checkStatusF = checkStatus(Void(), db);
wait(makeInterruptable(success(checkStatusF)));
}
else {
@ -2674,7 +2674,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
linenoise.historyAdd(line);
}
warn = checkStatus(timeWarning(5.0, "\nWARNING: Long delay (Ctrl-C to interrupt)\n"), db->getConnectionFile());
warn = checkStatus(timeWarning(5.0, "\nWARNING: Long delay (Ctrl-C to interrupt)\n"), db);
try {
state UID randomID = deterministicRandom()->randomUniqueID();
@ -2819,7 +2819,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
continue;
}
StatusObject s = wait(makeInterruptable(StatusClient::statusFetcher(db->getConnectionFile())));
StatusObject s = wait(makeInterruptable(StatusClient::statusFetcher(db)));
if (!opt.exec.present()) printf("\n");
printStatus(s, level);

View File

@ -1959,8 +1959,8 @@ public:
}
if (!g_network->isSimulated() && !forceAction) {
state StatusObject srcStatus = wait(StatusClient::statusFetcher(backupAgent->taskBucket->src->getConnectionFile()));
StatusObject destStatus = wait(StatusClient::statusFetcher(dest->getConnectionFile()));
state StatusObject srcStatus = wait(StatusClient::statusFetcher(backupAgent->taskBucket->src));
StatusObject destStatus = wait(StatusClient::statusFetcher(dest));
checkAtomicSwitchOverConfig(srcStatus, destStatus, tagName);
}

View File

@ -191,6 +191,10 @@ public:
Future<Void> clientInfoMonitor;
Future<Void> connected;
Reference<AsyncVar<Optional<ClusterInterface>>> statusClusterInterface;
Future<Void> statusLeaderMon;
double lastStatusFetch;
int apiVersion;
int mvCacheInsertLocation;

View File

@ -46,6 +46,7 @@ ClientKnobs::ClientKnobs(bool randomize) {
init( CLIENT_EXAMPLE_AMOUNT, 20 );
init( MAX_CLIENT_STATUS_AGE, 1.0 );
init( MAX_PROXY_CONNECTIONS, 5 ); if( randomize && BUGGIFY ) MAX_PROXY_CONNECTIONS = 1;
init( STATUS_IDLE_TIMEOUT, 120.0 );
// wrong_shard_server sometimes comes from the only nonfailed server, so we need to avoid a fast spin

View File

@ -45,6 +45,7 @@ public:
int CLIENT_EXAMPLE_AMOUNT;
double MAX_CLIENT_STATUS_AGE;
int MAX_PROXY_CONNECTIONS;
double STATUS_IDLE_TIMEOUT;
// wrong_shard_server sometimes comes from the only nonfailed server, so we need to avoid a fast spin
double WRONG_SHARD_SERVER_DELAY; // SOMEDAY: This delay can limit performance of retrieving data when the cache is mostly wrong (e.g. dumping the database after a test)

View File

@ -1165,8 +1165,8 @@ Optional<Value> getValueFromJSON(StatusObject statusObj) {
}
}
ACTOR Future<Optional<Value>> getJSON(Reference<ClusterConnectionFile> clusterFile) {
StatusObject statusObj = wait(StatusClient::statusFetcher(clusterFile));
ACTOR Future<Optional<Value>> getJSON(Database db) {
StatusObject statusObj = wait(StatusClient::statusFetcher(db));
return getValueFromJSON(statusObj);
}
@ -1194,7 +1194,7 @@ Future< Optional<Value> > ReadYourWritesTransaction::get( const Key& key, bool s
if (key == LiteralStringRef("\xff\xff/status/json")){
if (tr.getDatabase().getPtr() && tr.getDatabase()->getConnectionFile()) {
return getJSON(tr.getDatabase()->getConnectionFile());
return getJSON(tr.getDatabase());
}
else {
return Optional<Value>();

View File

@ -452,7 +452,7 @@ StatusObject getClientDatabaseStatus(StatusObjectReader client, StatusObjectRead
return databaseStatus;
}
ACTOR Future<StatusObject> statusFetcherImpl( Reference<ClusterConnectionFile> f ) {
ACTOR Future<StatusObject> statusFetcherImpl( Reference<ClusterConnectionFile> f, Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface) {
if (!g_network) throw network_not_setup();
state StatusObject statusObj;
@ -462,13 +462,10 @@ ACTOR Future<StatusObject> statusFetcherImpl( Reference<ClusterConnectionFile> f
// This could be read from the JSON but doing so safely is ugly so using a real var.
state bool quorum_reachable = false;
state int coordinatorsFaultTolerance = 0;
state Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface(new AsyncVar<Optional<ClusterInterface>>);
try {
state int64_t clientTime = time(0);
state Future<Void> leaderMon = monitorLeader<ClusterInterface>(f, clusterInterface);
StatusObject _statusObjClient = wait(clientStatusFetcher(f, &clientMessages, &quorum_reachable, &coordinatorsFaultTolerance));
statusObjClient = _statusObjClient;
@ -548,6 +545,23 @@ ACTOR Future<StatusObject> statusFetcherImpl( Reference<ClusterConnectionFile> f
return statusObj;
}
Future<StatusObject> StatusClient::statusFetcher( Reference<ClusterConnectionFile> clusterFile ) {
return statusFetcherImpl(clusterFile);
ACTOR Future<Void> timeoutMonitorLeader(Database db) {
state Future<Void> leadMon = monitorLeader<ClusterInterface>(db->getConnectionFile(), db->statusClusterInterface);
loop {
wait(delay(CLIENT_KNOBS->STATUS_IDLE_TIMEOUT + 0.00001 + db->lastStatusFetch - now()));
if(now() - db->lastStatusFetch > CLIENT_KNOBS->STATUS_IDLE_TIMEOUT) {
db->statusClusterInterface = Reference<AsyncVar<Optional<ClusterInterface>>>();
return Void();
}
}
}
Future<StatusObject> StatusClient::statusFetcher( Database db ) {
db->lastStatusFetch = now();
if(!db->statusClusterInterface) {
db->statusClusterInterface = Reference<AsyncVar<Optional<ClusterInterface>>>(new AsyncVar<Optional<ClusterInterface>>);
db->statusLeaderMon = timeoutMonitorLeader(db);
}
return statusFetcherImpl(db->getConnectionFile(), db->statusClusterInterface);
}
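
The pattern introduced above: the first status request lazily creates the leader monitor and stamps lastStatusFetch; timeoutMonitorLeader then sleeps until STATUS_IDLE_TIMEOUT past the most recent fetch and, if no newer fetch arrived, drops the monitor so an idle database handle stops polling for a leader. A minimal, single-threaded sketch of the same bookkeeping (IdleWatcher, onFetch, and onWake are hypothetical names; plain doubles stand in for now(), and no FDB types are used):

#include <cstdio>

// Sketch of the idle-timeout bookkeeping, under the assumptions stated above.
struct IdleWatcher {
    double timeout;
    double lastFetch = 0.0;    // updated on every status fetch
    bool monitorAlive = false;

    explicit IdleWatcher(double t) : timeout(t) {}

    void onFetch(double now) {
        lastFetch = now;
        monitorAlive = true;   // lazily (re)create the leader monitor
    }

    // When should the watcher next wake up?
    double nextWake() const { return lastFetch + timeout; }

    // Watcher wakes at `now`: tear down only if no fetch happened meanwhile.
    void onWake(double now) {
        if (now - lastFetch > timeout) monitorAlive = false;
    }
};

int main() {
    IdleWatcher w(120.0);
    w.onFetch(0.0);               // first status request starts the monitor
    w.onFetch(100.0);             // a later request keeps it alive
    w.onWake(w.nextWake() + 1.0); // wakes at ~221s: idle > 120s, so tear down
    std::printf("monitor alive: %s\n", w.monitorAlive ? "yes" : "no");
}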

View File

@ -23,11 +23,12 @@
#include "flow/flow.h"
#include "fdbclient/Status.h"
#include "fdbclient/DatabaseContext.h"
class StatusClient {
public:
enum StatusLevel { MINIMAL = 0, NORMAL = 1, DETAILED = 2, JSON = 3 };
static Future<StatusObject> statusFetcher(Reference<ClusterConnectionFile> clusterFile);
static Future<StatusObject> statusFetcher(Database db);
};
#endif

View File

@ -191,6 +191,7 @@ struct YieldMockNetwork : INetwork, ReferenceCounted<YieldMockNetwork> {
virtual TaskPriority getCurrentTask() { return baseNetwork->getCurrentTask(); }
virtual void setCurrentTask(TaskPriority taskID) { baseNetwork->setCurrentTask(taskID); }
virtual double now() { return baseNetwork->now(); }
virtual double timer() { return baseNetwork->timer(); }
virtual void stop() { return baseNetwork->stop(); }
virtual bool isSimulated() const { return baseNetwork->isSimulated(); }
virtual void onMainThread(Promise<Void>&& signal, TaskPriority taskID) { return baseNetwork->onMainThread(std::move(signal), taskID); }

View File

@ -1118,7 +1118,7 @@ void FlowTransport::removePeerReference(const Endpoint& endpoint, bool isStream)
.detail("Address", endpoint.getPrimaryAddress())
.detail("Token", endpoint.token);
}
if(peer->peerReferences == 0 && peer->reliable.empty() && peer->unsent.empty() && peer->outstandingReplies==0) {
if(peer->peerReferences == 0 && peer->reliable.empty() && peer->unsent.empty() && peer->outstandingReplies==0 && peer->lastDataPacketSentTime < now() - FLOW_KNOBS->CONNECTION_MONITOR_UNREFERENCED_CLOSE_DELAY) {
peer->resetPing.trigger();
}
}

View File

@ -129,8 +129,7 @@ public:
std::vector<LocalityEntry> const& getEntries() const
{ return _entryArray; }
std::vector<LocalityEntry> const& getMutableEntries() const
{ return _mutableEntryArray; }
std::vector<LocalityEntry>& getMutableEntries() { return _mutableEntryArray; }
std::vector<LocalityEntry> const& getGroupEntries() const
{ return _localitygroup->_entryArray; }
@ -253,7 +252,7 @@ public:
while (nRandomItems > 0)
{
if (nItemsLeft <= 0) {
if (nRandomItems > nItemsLeft || nItemsLeft <= 0) {
bComplete = false;
break;
}
@ -479,6 +478,8 @@ public:
Reference<StringToIntMap> _keymap;
virtual std::vector<std::vector<AttribValue>> const& getKeyValueArray() const { return _keyValueArray; }
protected:
virtual Reference<StringToIntMap>& getGroupValueMap()
{ return _localitygroup->getGroupValueMap(); }

View File

@ -119,6 +119,8 @@ struct PolicyAcross : IReplicationPolicy, public ReferenceCounted<PolicyAcross>
explicit PolicyAcross(const PolicyAcross& other) : PolicyAcross(other._count, other._attribKey, other._policy) {}
virtual ~PolicyAcross();
virtual std::string name() const { return "Across"; }
std::string embeddedPolicyName() const { return _policy->name(); }
int getCount() const { return _count; }
virtual std::string info() const { return format("%s^%d x ", _attribKey.c_str(), _count) + _policy->info(); }
virtual int maxResults() const { return _count * _policy->maxResults(); }
virtual int depth() const { return 1 + _policy->depth(); }

View File

@ -82,14 +82,55 @@ double ratePolicy(
return rating;
}
bool findBestPolicySet(
std::vector<LocalityEntry>& bestResults,
Reference<LocalitySet> & localitySet,
Reference<IReplicationPolicy> const& policy,
unsigned int nMinItems,
unsigned int nSelectTests,
unsigned int nPolicyTests)
{
bool findBestPolicySetSimple(PolicyAcross* pa, Reference<LocalitySet> logServerSet, std::vector<LocalityEntry>& bestSet,
int desired) {
auto& mutableEntries = logServerSet->getMutableEntries();
deterministicRandom()->randomShuffle(mutableEntries);
// First make sure the current localitySet is able to fulfill the policy
std::set<std::string> attributeKeys;
pa->attributeKeys(&attributeKeys);
AttribKey indexKey = logServerSet->keyIndex(*attributeKeys.begin());
int uniqueValueCount = logServerSet->getKeyValueArray()[indexKey._id].size();
int targetUniqueValueCount = pa->getCount();
bool found = false;
if (uniqueValueCount < targetUniqueValueCount) {
// logServerSet won't be able to fulfill the policy
found = false;
} else {
// Loop through all servers and, in each loop, try to choose `targetUniqueValueCount`
// servers, each of which has a unique attribute value
std::set<AttribValue> seen;
int upperBound = mutableEntries.size();
int i = 0;
while (bestSet.size() < desired) {
auto& item = mutableEntries[i];
Optional<AttribValue> value = logServerSet->getRecord(item._id)->getValue(indexKey);
if (value.present() && seen.find(value.get()) == seen.end()) {
seen.insert(value.get());
bestSet.push_back(item);
upperBound--;
if (i < upperBound) {
std::swap(mutableEntries[i], mutableEntries[upperBound]);
}
if (seen.size() == targetUniqueValueCount) {
seen.clear();
i = 0;
}
continue;
}
i++;
if (i == upperBound && bestSet.size() < desired) {
seen.clear();
i = 0;
}
}
found = true;
}
return found;
}
bool findBestPolicySetExpensive(std::vector<LocalityEntry>& bestResults, Reference<LocalitySet>& localitySet,
Reference<IReplicationPolicy> const& policy, unsigned int nMinItems,
unsigned int nSelectTests, unsigned int nPolicyTests) {
bool bSucceeded = true;
Reference<LocalitySet> bestLocalitySet, testLocalitySet;
std::vector<LocalityEntry> results;
@ -113,9 +154,7 @@ bool findBestPolicySet(
}
// Get some additional random items, if needed
if ((nMinItems > results.size()) &&
(!localitySet->random(results, results, nMinItems-results.size())))
{
if ((nMinItems > results.size()) && (!localitySet->random(results, results, nMinItems - results.size()))) {
bSucceeded = false;
break;
}
@ -158,6 +197,55 @@ bool findBestPolicySet(
return bSucceeded;
}
bool findBestPolicySet(std::vector<LocalityEntry>& bestResults, Reference<LocalitySet>& localitySet,
Reference<IReplicationPolicy> const& policy, unsigned int nMinItems, unsigned int nSelectTests,
unsigned int nPolicyTests) {
bool bestFound = false;
// Specialization for policies of shape:
// - PolicyOne()
// - PolicyAcross(,"zoneId",PolicyOne())
// - TODO: More specializations for common policies
if (policy->name() == "One") {
bestFound = true;
int count = 0;
auto& mutableEntries = localitySet->getMutableEntries();
deterministicRandom()->randomShuffle(mutableEntries);
for (auto const& entry : mutableEntries) {
bestResults.push_back(entry);
if (++count == nMinItems) break;
}
} else if (policy->name() == "Across") {
PolicyAcross* pa = (PolicyAcross*)policy.getPtr();
std::set<std::string> attributeKeys;
pa->attributeKeys(&attributeKeys);
if (pa->embeddedPolicyName() == "One" && attributeKeys.size() == 1 &&
*attributeKeys.begin() == "zoneId" // This algorithm can actually apply to any field
) {
bestFound = findBestPolicySetSimple(pa, localitySet, bestResults, nMinItems);
if (bestFound && g_network->isSimulated()) {
std::vector<LocalityEntry> oldBest;
auto oldBestFound =
findBestPolicySetExpensive(oldBest, localitySet, policy, nMinItems, nSelectTests, nPolicyTests);
if (!oldBestFound) {
TraceEvent(SevError, "FBPSMissmatch").detail("Policy", policy->info());
} else {
auto set = localitySet->restrict(bestResults);
auto oldSet = localitySet->restrict(oldBest);
ASSERT_WE_THINK(ratePolicy(set, policy, nPolicyTests) <= ratePolicy(oldSet, policy, nPolicyTests));
}
}
} else {
bestFound =
findBestPolicySetExpensive(bestResults, localitySet, policy, nMinItems, nSelectTests, nPolicyTests);
}
} else {
bestFound = findBestPolicySetExpensive(bestResults, localitySet, policy, nMinItems, nSelectTests, nPolicyTests);
}
return bestFound;
}
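
The idea behind findBestPolicySetSimple, in isolation: shuffle the candidates, then fill the team in rounds, taking at most one server per zone in each round, so every consecutive group of picks spans distinct zones. Below is a simplified standalone sketch under that reading — plain C++ over (server, zone) pairs, with pickAcrossZones and zonesPerRound as illustrative names; it sweeps the list each round rather than swapping chosen entries out of the candidate window as the real code does.

#include <algorithm>
#include <cstdio>
#include <random>
#include <set>
#include <string>
#include <utility>
#include <vector>

// Pick `desired` servers so that each round of picks uses at most one
// server per zone (up to `zonesPerRound` zones per round). Simplified
// sketch; not the FDB LocalitySet types.
std::vector<std::string> pickAcrossZones(
    std::vector<std::pair<std::string, std::string>> servers, // (name, zone)
    int zonesPerRound, int desired) {
    std::shuffle(servers.begin(), servers.end(), std::mt19937{std::random_device{}()});
    std::vector<std::string> picked;
    std::vector<bool> used(servers.size(), false);
    while ((int)picked.size() < desired) {
        std::set<std::string> zonesThisRound;
        bool progress = false;
        for (size_t i = 0; i < servers.size() && (int)picked.size() < desired &&
                           (int)zonesThisRound.size() < zonesPerRound; ++i) {
            const auto& [name, zone] = servers[i];
            if (used[i] || zonesThisRound.count(zone)) continue;
            used[i] = true;
            zonesThisRound.insert(zone);
            picked.push_back(name);
            progress = true;
        }
        if (!progress) break; // no usable candidates left
    }
    return picked;
}

int main() {
    std::vector<std::pair<std::string, std::string>> servers = {
        {"s1", "z1"}, {"s2", "z1"}, {"s3", "z2"}, {"s4", "z2"}, {"s5", "z3"}};
    for (const auto& s : pickAcrossZones(servers, /*zonesPerRound=*/3, /*desired=*/5))
        std::printf("%s\n", s.c_str());
}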
bool findBestUniquePolicySet(
std::vector<LocalityEntry>& bestResults,
Reference<LocalitySet> & localitySet,

View File

@ -751,6 +751,12 @@ public:
// Everything actually network related is delegated to the Sim2Net class; Sim2 is only concerned with simulating machines and time
virtual double now() { return time; }
// timer() can be up to one second ahead of now()
virtual double timer() {
timerTime += deterministicRandom()->random01()*(time+1.0-timerTime)/2.0;
return timerTime;
}
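
A quick illustration of the timer() change above: each call moves timerTime a random fraction of the way toward now() + 1.0, and the task-switch path clamps it up to at least now(), so timer() stays monotone and never runs more than one second ahead of simulated time. Standalone C++ with an ordinary RNG standing in for deterministicRandom():

#include <algorithm>
#include <cstdio>
#include <random>

int main() {
    std::mt19937 rng(42);
    std::uniform_real_distribution<double> random01(0.0, 1.0);
    double time = 0.0;      // simulated now()
    double timerTime = 0.0; // simulated timer()
    for (int i = 0; i < 5; ++i) {
        time += 0.3;                            // the simulation advances
        timerTime = std::max(timerTime, time);  // as on the task-switch path above
        timerTime += random01(rng) * (time + 1.0 - timerTime) / 2.0;
        std::printf("now=%.2f timer=%.2f (ahead by %.2f)\n", time, timerTime, timerTime - time);
    }
}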
virtual Future<class Void> delay( double seconds, TaskPriority taskID ) {
ASSERT(taskID >= TaskPriority::Min && taskID <= TaskPriority::Max);
return delay( seconds, taskID, currentProcess );
@ -1588,7 +1594,7 @@ public:
machines.erase(machineId);
}
Sim2() : time(0.0), taskCount(0), yielded(false), yield_limit(0), currentTaskID(TaskPriority::Zero) {
Sim2() : time(0.0), timerTime(0.0), taskCount(0), yielded(false), yield_limit(0), currentTaskID(TaskPriority::Zero) {
// Not letting currentProcess be NULL eliminates some annoying special cases
currentProcess = new ProcessInfo("NoMachine", LocalityData(Optional<Standalone<StringRef>>(), StringRef(), StringRef(), StringRef()), ProcessClass(), {NetworkAddress()}, this, "", "");
g_network = net2 = newNet2(false, true);
@ -1624,6 +1630,7 @@ public:
else {
mutex.enter();
this->time = t.time;
this->timerTime = std::max(this->timerTime, this->time);
mutex.leave();
this->currentProcess = t.machine;
@ -1676,6 +1683,7 @@ public:
//time is guarded by ISimulator::mutex. It is not necessary to guard reads on the main thread because
//time should only be modified from the main thread.
double time;
double timerTime;
TaskPriority currentTaskID;
//taskCount is guarded by ISimulator::mutex
@ -1718,7 +1726,7 @@ ACTOR void doReboot( ISimulator::ProcessInfo *p, ISimulator::KillType kt ) {
TEST( kt == ISimulator::RebootAndDelete ); // Simulated machine rebooted with data and coordination state deletion
TEST( kt == ISimulator::RebootProcessAndDelete ); // Simulated process rebooted with data and coordination state deletion
if( p->rebooting )
if( p->rebooting || !p->isReliable() )
return;
TraceEvent("RebootingProcess").detail("KillType", kt).detail("Address", p->address).detail("ZoneId", p->locality.zoneId()).detail("DataHall", p->locality.dataHallId()).detail("Locality", p->locality.toString()).detail("Failed", p->failed).detail("Excluded", p->excluded).detail("Cleared", p->cleared).backtrace();
p->rebooting = true;

View File

@ -51,25 +51,23 @@ struct WorkerInfo : NonCopyable {
ReplyPromise<RegisterWorkerReply> reply;
Generation gen;
int reboots;
double lastAvailableTime;
ProcessClass initialClass;
ClusterControllerPriorityInfo priorityInfo;
WorkerDetails details;
Future<Void> haltRatekeeper;
Future<Void> haltDistributor;
WorkerInfo() : gen(-1), reboots(0), lastAvailableTime(now()), priorityInfo(ProcessClass::UnsetFit, false, ClusterControllerPriorityInfo::FitnessUnknown) {}
WorkerInfo() : gen(-1), reboots(0), priorityInfo(ProcessClass::UnsetFit, false, ClusterControllerPriorityInfo::FitnessUnknown) {}
WorkerInfo( Future<Void> watcher, ReplyPromise<RegisterWorkerReply> reply, Generation gen, WorkerInterface interf, ProcessClass initialClass, ProcessClass processClass, ClusterControllerPriorityInfo priorityInfo, bool degraded ) :
watcher(watcher), reply(reply), gen(gen), reboots(0), lastAvailableTime(now()), initialClass(initialClass), priorityInfo(priorityInfo), details(interf, processClass, degraded) {}
watcher(watcher), reply(reply), gen(gen), reboots(0), initialClass(initialClass), priorityInfo(priorityInfo), details(interf, processClass, degraded) {}
WorkerInfo( WorkerInfo&& r ) BOOST_NOEXCEPT : watcher(std::move(r.watcher)), reply(std::move(r.reply)), gen(r.gen),
reboots(r.reboots), lastAvailableTime(r.lastAvailableTime), initialClass(r.initialClass), priorityInfo(r.priorityInfo), details(std::move(r.details)) {}
reboots(r.reboots), initialClass(r.initialClass), priorityInfo(r.priorityInfo), details(std::move(r.details)) {}
void operator=( WorkerInfo&& r ) BOOST_NOEXCEPT {
watcher = std::move(r.watcher);
reply = std::move(r.reply);
gen = r.gen;
reboots = r.reboots;
lastAvailableTime = r.lastAvailableTime;
initialClass = r.initialClass;
priorityInfo = r.priorityInfo;
details = std::move(r.details);
@ -339,7 +337,8 @@ public:
std::vector<LocalityData> tLocalities;
// Try to find the best team of servers to fulfill the policy
if (findBestPolicySet(bestSet, logServerSet, policy, desired, SERVER_KNOBS->POLICY_RATING_TESTS, SERVER_KNOBS->POLICY_GENERATIONS)) {
if (findBestPolicySet(bestSet, logServerSet, policy, desired, SERVER_KNOBS->POLICY_RATING_TESTS,
SERVER_KNOBS->POLICY_GENERATIONS)) {
results.reserve(results.size() + bestSet.size());
for (auto& entry : bestSet) {
auto object = logServerMap->getObject(entry);
@ -381,8 +380,6 @@ public:
TraceEvent("GetTLogTeamDone").detail("Completed", bCompleted).detail("Policy", policy->info()).detail("Results", results.size()).detail("Processes", logServerSet->size()).detail("Workers", id_worker.size())
.detail("Required", required).detail("Desired", desired).detail("RatingTests",SERVER_KNOBS->POLICY_RATING_TESTS).detail("PolicyGenerations",SERVER_KNOBS->POLICY_GENERATIONS);
logServerSet->clear();
logServerSet.clear();
return results;
}
@ -395,7 +392,7 @@ public:
if(satelliteFallback || region.satelliteTLogUsableDcsFallback == 0) {
throw no_more_servers();
} else {
if(now() - startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY) {
if(!goodRecruitmentTime.isReady()) {
throw operation_failed();
}
satelliteFallback = true;
@ -641,18 +638,8 @@ public:
result.logRouters.push_back(logRouters[i].interf);
}
if(!remoteStartTime.present()) {
double maxAvailableTime = 0;
for(auto& it : result.remoteTLogs) {
maxAvailableTime = std::max(maxAvailableTime, id_worker[it.locality.processId()].lastAvailableTime);
}
for(auto& it : result.logRouters) {
maxAvailableTime = std::max(maxAvailableTime, id_worker[it.locality.processId()].lastAvailableTime);
}
remoteStartTime = maxAvailableTime;
}
if( now() - remoteStartTime.get() < SERVER_KNOBS->WAIT_FOR_GOOD_REMOTE_RECRUITMENT_DELAY &&
if( !goodRemoteRecruitmentTime.isReady() &&
( ( RoleFitness(SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredRemoteLogs(), ProcessClass::TLog).betterCount(RoleFitness(remoteLogs, ProcessClass::TLog)) ) ||
( RoleFitness(SERVER_KNOBS->EXPECTED_LOG_ROUTER_FITNESS, req.logRouterCount, ProcessClass::LogRouter).betterCount(RoleFitness(logRouters, ProcessClass::LogRouter)) ) ) ) {
throw operation_failed();
@ -729,7 +716,7 @@ public:
}
}
if( now() - startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY &&
if( !goodRecruitmentTime.isReady() &&
( RoleFitness(SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredLogs(), ProcessClass::TLog).betterCount(RoleFitness(tlogs, ProcessClass::TLog)) ||
( region.satelliteTLogReplicationFactor > 0 && RoleFitness(SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredSatelliteLogs(dcId), ProcessClass::TLog).betterCount(RoleFitness(satelliteLogs, ProcessClass::TLog)) ) ||
RoleFitness(SERVER_KNOBS->EXPECTED_PROXY_FITNESS, req.configuration.getDesiredProxies(), ProcessClass::Proxy).betterCount(RoleFitness(proxies, ProcessClass::Proxy)) ||
@ -766,7 +753,7 @@ public:
}
throw no_more_servers();
} catch( Error& e ) {
if (now() - startTime < SERVER_KNOBS->WAIT_FOR_GOOD_REMOTE_RECRUITMENT_DELAY && regions[1].dcId != clusterControllerDcId.get()) {
if (!goodRemoteRecruitmentTime.isReady() && regions[1].dcId != clusterControllerDcId.get()) {
throw operation_failed();
}
@ -884,7 +871,7 @@ public:
.detail("DesiredProxies", req.configuration.getDesiredProxies()).detail("ActualProxies", result.proxies.size())
.detail("DesiredResolvers", req.configuration.getDesiredResolvers()).detail("ActualResolvers", result.resolvers.size());
if( now() - startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY &&
if( !goodRecruitmentTime.isReady() &&
( RoleFitness(SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredLogs(), ProcessClass::TLog).betterCount(RoleFitness(tlogs, ProcessClass::TLog)) ||
RoleFitness(SERVER_KNOBS->EXPECTED_PROXY_FITNESS, req.configuration.getDesiredProxies(), ProcessClass::Proxy).betterCount(bestFitness.proxy) ||
RoleFitness(SERVER_KNOBS->EXPECTED_RESOLVER_FITNESS, req.configuration.getDesiredResolvers(), ProcessClass::Resolver).betterCount(bestFitness.resolver) ) ) {
@ -1243,11 +1230,13 @@ public:
ActorCollection ac;
UpdateWorkerList updateWorkerList;
Future<Void> outstandingRequestChecker;
Future<Void> outstandingRemoteRequestChecker;
DBInfo db;
Database cx;
double startTime;
Optional<double> remoteStartTime;
Future<Void> goodRecruitmentTime;
Future<Void> goodRemoteRecruitmentTime;
Version datacenterVersionDifference;
bool versionDifferenceUpdated;
PromiseStream<Future<Void>> addActor;
@ -1271,8 +1260,9 @@ public:
ClusterControllerData( ClusterControllerFullInterface const& ccInterface, LocalityData const& locality )
: clusterControllerProcessId(locality.processId()), clusterControllerDcId(locality.dcId()),
id(ccInterface.id()), ac(false), outstandingRequestChecker(Void()), gotProcessClasses(false),
gotFullyRecoveredConfig(false), startTime(now()), datacenterVersionDifference(0),
id(ccInterface.id()), ac(false), outstandingRequestChecker(Void()), outstandingRemoteRequestChecker(Void()), gotProcessClasses(false),
gotFullyRecoveredConfig(false), startTime(now()), goodRecruitmentTime(Never()),
goodRemoteRecruitmentTime(Never()), datacenterVersionDifference(0),
versionDifferenceUpdated(false), recruitingDistributor(false), recruitRatekeeper(false),
clusterControllerMetrics("ClusterController", id.toString()),
openDatabaseRequests("OpenDatabaseRequests", clusterControllerMetrics),
@ -1320,7 +1310,7 @@ ACTOR Future<Void> clusterWatchDatabase( ClusterControllerData* cluster, Cluster
id_used[cluster->clusterControllerProcessId]++;
state WorkerFitnessInfo masterWorker = cluster->getWorkerForRoleInDatacenter(cluster->clusterControllerDcId, ProcessClass::Master, ProcessClass::NeverAssign, db->config, id_used);
if( ( masterWorker.worker.processClass.machineClassFitness( ProcessClass::Master ) > SERVER_KNOBS->EXPECTED_MASTER_FITNESS || masterWorker.worker.interf.locality.processId() == cluster->clusterControllerProcessId )
&& now() - cluster->startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY ) {
&& !cluster->goodRecruitmentTime.isReady() ) {
TraceEvent("CCWDB", cluster->id).detail("Fitness", masterWorker.worker.processClass.machineClassFitness( ProcessClass::Master ));
wait( delay(SERVER_KNOBS->ATTEMPT_RECRUITMENT_DELAY) );
continue;
@ -1594,9 +1584,11 @@ void checkBetterDDOrRK(ClusterControllerData* self) {
ACTOR Future<Void> doCheckOutstandingRequests( ClusterControllerData* self ) {
try {
wait( delay(SERVER_KNOBS->CHECK_OUTSTANDING_INTERVAL) );
while( !self->goodRecruitmentTime.isReady() ) {
wait(self->goodRecruitmentTime);
}
checkOutstandingRecruitmentRequests( self );
checkOutstandingRemoteRecruitmentRequests( self );
checkOutstandingStorageRequests( self );
checkBetterDDOrRK(self);
@ -1606,7 +1598,23 @@ ACTOR Future<Void> doCheckOutstandingRequests( ClusterControllerData* self ) {
TraceEvent("MasterRegistrationKill", self->id).detail("MasterId", self->db.serverInfo->get().read().master.id());
}
} catch( Error &e ) {
if(e.code() != error_code_operation_failed && e.code() != error_code_no_more_servers) {
if(e.code() != error_code_no_more_servers) {
TraceEvent(SevError, "CheckOutstandingError").error(e);
}
}
return Void();
}
ACTOR Future<Void> doCheckOutstandingRemoteRequests( ClusterControllerData* self ) {
try {
wait( delay(SERVER_KNOBS->CHECK_OUTSTANDING_INTERVAL) );
while( !self->goodRemoteRecruitmentTime.isReady() ) {
wait(self->goodRemoteRecruitmentTime);
}
checkOutstandingRemoteRecruitmentRequests( self );
} catch( Error &e ) {
if(e.code() != error_code_no_more_servers) {
TraceEvent(SevError, "CheckOutstandingError").error(e);
}
}
@ -1614,10 +1622,13 @@ ACTOR Future<Void> doCheckOutstandingRequests( ClusterControllerData* self ) {
}
void checkOutstandingRequests( ClusterControllerData* self ) {
if( !self->outstandingRequestChecker.isReady() )
return;
if( self->outstandingRemoteRequestChecker.isReady() ) {
self->outstandingRemoteRequestChecker = doCheckOutstandingRemoteRequests(self);
}
self->outstandingRequestChecker = doCheckOutstandingRequests(self);
if( self->outstandingRequestChecker.isReady() ) {
self->outstandingRequestChecker = doCheckOutstandingRequests(self);
}
}
ACTOR Future<Void> rebootAndCheck( ClusterControllerData* cluster, Optional<Standalone<StringRef>> processID ) {
@ -1625,7 +1636,6 @@ ACTOR Future<Void> rebootAndCheck( ClusterControllerData* cluster, Optional<Stan
auto watcher = cluster->id_worker.find(processID);
ASSERT(watcher != cluster->id_worker.end());
watcher->second.lastAvailableTime = now();
watcher->second.reboots++;
wait( delay( g_network->isSimulated() ? SERVER_KNOBS->SIM_SHUTDOWN_TIMEOUT : SERVER_KNOBS->SHUTDOWN_TIMEOUT ) );
}
@ -1867,7 +1877,7 @@ ACTOR Future<Void> clusterRecruitFromConfiguration( ClusterControllerData* self,
req.reply.send( rep );
return Void();
} catch (Error& e) {
if (e.code() == error_code_no_more_servers && now() - self->startTime >= SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY) {
if (e.code() == error_code_no_more_servers && self->goodRecruitmentTime.isReady()) {
self->outstandingRecruitmentRequests.push_back( req );
TraceEvent(SevWarn, "RecruitFromConfigurationNotAvailable", self->id).error(e);
return Void();
@ -1879,7 +1889,7 @@ ACTOR Future<Void> clusterRecruitFromConfiguration( ClusterControllerData* self,
throw; // goodbye, cluster controller
}
}
wait( delay(SERVER_KNOBS->ATTEMPT_RECRUITMENT_DELAY) );
wait( lowPriorityDelay(SERVER_KNOBS->ATTEMPT_RECRUITMENT_DELAY) );
}
}
@ -1895,7 +1905,7 @@ ACTOR Future<Void> clusterRecruitRemoteFromConfiguration( ClusterControllerData*
req.reply.send( rep );
return Void();
} catch (Error& e) {
if (e.code() == error_code_no_more_servers && self->remoteStartTime.present() && now() - self->remoteStartTime.get() >= SERVER_KNOBS->WAIT_FOR_GOOD_REMOTE_RECRUITMENT_DELAY) {
if (e.code() == error_code_no_more_servers && self->goodRemoteRecruitmentTime.isReady()) {
self->outstandingRemoteRecruitmentRequests.push_back( req );
TraceEvent(SevWarn, "RecruitRemoteFromConfigurationNotAvailable", self->id).error(e);
return Void();
@ -1907,7 +1917,7 @@ ACTOR Future<Void> clusterRecruitRemoteFromConfiguration( ClusterControllerData*
throw; // goodbye, cluster controller
}
}
wait( delay(SERVER_KNOBS->ATTEMPT_RECRUITMENT_DELAY) );
wait( lowPriorityDelay(SERVER_KNOBS->ATTEMPT_RECRUITMENT_DELAY) );
}
}
@ -2010,6 +2020,8 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
if(info == self->id_worker.end()) {
TraceEvent("ClusterControllerActualWorkers", self->id).detail("WorkerId",w.id()).detail("ProcessId", w.locality.processId()).detail("ZoneId", w.locality.zoneId()).detail("DataHall", w.locality.dataHallId()).detail("PClass", req.processClass.toString()).detail("Workers", self->id_worker.size());
self->goodRecruitmentTime = lowPriorityDelay(SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY);
self->goodRemoteRecruitmentTime = lowPriorityDelay(SERVER_KNOBS->WAIT_FOR_GOOD_REMOTE_RECRUITMENT_DELAY);
} else {
TraceEvent("ClusterControllerWorkerAlreadyRegistered", self->id).suppressFor(1.0).detail("WorkerId",w.id()).detail("ProcessId", w.locality.processId()).detail("ZoneId", w.locality.zoneId()).detail("DataHall", w.locality.dataHallId()).detail("PClass", req.processClass.toString()).detail("Workers", self->id_worker.size());
}
@ -2674,7 +2686,7 @@ ACTOR Future<DataDistributorInterface> startDataDistributor( ClusterControllerDa
throw;
}
}
wait( delay(SERVER_KNOBS->ATTEMPT_RECRUITMENT_DELAY) );
wait( lowPriorityDelay(SERVER_KNOBS->ATTEMPT_RECRUITMENT_DELAY) );
}
}
@ -2748,7 +2760,7 @@ ACTOR Future<Void> startRatekeeper(ClusterControllerData *self) {
throw;
}
}
wait( delay(SERVER_KNOBS->ATTEMPT_RECRUITMENT_DELAY) );
wait( lowPriorityDelay(SERVER_KNOBS->ATTEMPT_RECRUITMENT_DELAY) );
}
}

View File

@ -401,7 +401,7 @@ struct LeaderRegisterCollection {
if( !self->pStore->exists() )
return Void();
OnDemandStore &store = *self->pStore;
Standalone<VectorRef<KeyValueRef>> forwardingInfo = wait( store->readRange( fwdKeys ) );
Standalone<RangeResultRef> forwardingInfo = wait( store->readRange( fwdKeys ) );
for( int i = 0; i < forwardingInfo.size(); i++ ) {
LeaderInfo forwardInfo;
forwardInfo.forward = true;

View File

@ -293,8 +293,8 @@ public:
return minRatio;
}
virtual bool hasHealthyAvailableSpace() {
return getMinAvailableSpaceRatio() > SERVER_KNOBS->MIN_FREE_SPACE_RATIO && getMinAvailableSpace() > SERVER_KNOBS->MIN_FREE_SPACE;
virtual bool hasHealthyAvailableSpace(double minRatio, int64_t minAvailableSpace) {
return (minRatio == 0 || getMinAvailableSpaceRatio() > minRatio) && (minAvailableSpace == std::numeric_limits<int64_t>::min() || getMinAvailableSpace() > minAvailableSpace);
}
virtual Future<Void> updateStorageMetrics() {
@ -758,6 +758,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
std::vector<Reference<IDataDistributionTeam>> randomTeams;
const std::set<UID> completeSources(req.completeSources.begin(), req.completeSources.end());
// Note: this block does not apply any filters from the request
if( !req.wantsNewServers ) {
for( int i = 0; i < req.completeSources.size(); i++ ) {
if( !self->server_info.count( req.completeSources[i] ) ) {
@ -773,10 +774,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
break;
}
}
if(found && teamList[j]->isHealthy() &&
(!req.teamMustHaveShards || self->shardsAffectedByTeamFailure->getShardsFor(ShardsAffectedByTeamFailure::Team(teamList[j]->getServerIDs(), self->primary)).size() > 0) &&
teamList[j]->getMinAvailableSpaceRatio() >= req.minAvailableSpaceRatio)
{
if(found && teamList[j]->isHealthy()) {
req.reply.send( teamList[j] );
return Void();
}
@ -788,9 +786,8 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
ASSERT( !bestOption.present() );
for( int i = 0; i < self->teams.size(); i++ ) {
if (self->teams[i]->isHealthy() &&
(!req.preferLowerUtilization || self->teams[i]->hasHealthyAvailableSpace()) &&
(!req.teamMustHaveShards || self->shardsAffectedByTeamFailure->getShardsFor(ShardsAffectedByTeamFailure::Team(self->teams[i]->getServerIDs(), self->primary)) .size() > 0) &&
self->teams[i]->getMinAvailableSpaceRatio() >= req.minAvailableSpaceRatio)
self->teams[i]->hasHealthyAvailableSpace(req.minAvailableSpaceRatio, req.preferLowerUtilization ? SERVER_KNOBS->MIN_FREE_SPACE : std::numeric_limits<int64_t>::min()) &&
(!req.teamMustHaveShards || self->shardsAffectedByTeamFailure->getShardsFor(ShardsAffectedByTeamFailure::Team(self->teams[i]->getServerIDs(), self->primary)).size() > 0))
{
int64_t loadBytes = self->teams[i]->getLoadBytes(true, req.inflightPenalty);
if( !bestOption.present() || ( req.preferLowerUtilization && loadBytes < bestLoadBytes ) || ( !req.preferLowerUtilization && loadBytes > bestLoadBytes ) ) {
@ -806,9 +803,8 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
Reference<IDataDistributionTeam> dest = deterministicRandom()->randomChoice(self->teams);
bool ok = dest->isHealthy() &&
(!req.preferLowerUtilization || dest->hasHealthyAvailableSpace()) &&
(!req.teamMustHaveShards || self->shardsAffectedByTeamFailure->getShardsFor(ShardsAffectedByTeamFailure::Team(dest->getServerIDs(), self->primary)).size() > 0) &&
dest->getMinAvailableSpaceRatio() >= req.minAvailableSpaceRatio;
dest->hasHealthyAvailableSpace(req.minAvailableSpaceRatio, req.preferLowerUtilization ? SERVER_KNOBS->MIN_FREE_SPACE : std::numeric_limits<int64_t>::min()) &&
(!req.teamMustHaveShards || self->shardsAffectedByTeamFailure->getShardsFor(ShardsAffectedByTeamFailure::Team(dest->getServerIDs(), self->primary)).size() > 0);
for(int i=0; ok && i<randomTeams.size(); i++) {
if (randomTeams[i]->getServerIDs() == dest->getServerIDs()) {
@ -834,6 +830,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
// Note: req.completeSources can be empty and all servers (and server teams) can be unhealthy.
// We will get stuck at this! This only happens when a DC fails. No need to consider it right now.
// Note: this block does not apply any filters from the request
if(!bestOption.present() && self->zeroHealthyTeams->get()) {
//Attempt to find the unhealthy source server team and return it
for( int i = 0; i < req.completeSources.size(); i++ ) {
@ -844,11 +841,6 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
for( int j = 0; j < teamList.size(); j++ ) {
bool found = true;
auto serverIDs = teamList[j]->getServerIDs();
if((req.teamMustHaveShards && self->shardsAffectedByTeamFailure->getShardsFor(ShardsAffectedByTeamFailure::Team(serverIDs, self->primary)).size() == 0) ||
teamList[j]->getMinAvailableSpaceRatio() < req.minAvailableSpaceRatio)
{
continue;
}
for( int k = 0; k < teamList[j]->size(); k++ ) {
if( !completeSources.count( serverIDs[k] ) ) {
found = false;

View File

@ -47,7 +47,7 @@ struct IDataDistributionTeam {
virtual int64_t getLoadBytes( bool includeInFlight = true, double inflightPenalty = 1.0 ) = 0;
virtual int64_t getMinAvailableSpace( bool includeInFlight = true ) = 0;
virtual double getMinAvailableSpaceRatio( bool includeInFlight = true ) = 0;
virtual bool hasHealthyAvailableSpace() = 0;
virtual bool hasHealthyAvailableSpace( double minRatio, int64_t minAvailableSpace ) = 0;
virtual Future<Void> updateStorageMetrics() = 0;
virtual void addref() = 0;
virtual void delref() = 0;

View File

@ -186,9 +186,9 @@ public:
return result;
}
virtual bool hasHealthyAvailableSpace() {
return all([](Reference<IDataDistributionTeam> team) {
return team->hasHealthyAvailableSpace();
virtual bool hasHealthyAvailableSpace(double minRatio, int64_t minAvailableSpace) {
return all([minRatio, minAvailableSpace](Reference<IDataDistributionTeam> team) {
return team->hasHealthyAvailableSpace(minRatio, minAvailableSpace);
});
}
@ -929,7 +929,7 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
if(rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY || rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_2_LEFT) inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_UNHEALTHY;
if(rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_1_LEFT || rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_0_LEFT) inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_ONE_LEFT;
auto req = GetTeamRequest(rd.wantsNewServers, rd.priority == SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM, true, false, 0.0, inflightPenalty);
auto req = GetTeamRequest(rd.wantsNewServers, rd.priority == SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM, true, false, SERVER_KNOBS->MIN_FREE_SPACE_RATIO, inflightPenalty);
req.completeSources = rd.completeSources;
Optional<Reference<IDataDistributionTeam>> bestTeam = wait(brokenPromiseToNever(self->teamCollections[tciIndex].getTeam.getReply(req)));
// If a DC has no healthy team, we stop checking the other DCs until

View File

@ -49,7 +49,7 @@ public:
// If rowLimit>=0, reads first rows sorted ascending, otherwise reads last rows sorted descending
// The total size of the returned value (less the last entry) will be less than byteLimit
virtual Future<Standalone<VectorRef<KeyValueRef>>> readRange( KeyRangeRef keys, int rowLimit = 1<<30, int byteLimit = 1<<30 ) = 0;
virtual Future<Standalone<RangeResultRef>> readRange( KeyRangeRef keys, int rowLimit = 1<<30, int byteLimit = 1<<30 ) = 0;
//Returns the amount of free and total space for this store, in bytes
virtual StorageBytes getStorageBytes() = 0;

View File

@ -77,12 +77,12 @@ struct KeyValueStoreCompressTestData : IKeyValueStore {
// If rowLimit>=0, reads first rows sorted ascending, otherwise reads last rows sorted descending
// The total size of the returned value (less the last entry) will be less than byteLimit
virtual Future<Standalone<VectorRef<KeyValueRef>>> readRange( KeyRangeRef keys, int rowLimit = 1<<30, int byteLimit = 1<<30 ) {
virtual Future<Standalone<RangeResultRef>> readRange( KeyRangeRef keys, int rowLimit = 1<<30, int byteLimit = 1<<30 ) {
return doReadRange(store, keys, rowLimit, byteLimit);
}
ACTOR Future<Standalone<VectorRef<KeyValueRef>>> doReadRange( IKeyValueStore* store, KeyRangeRef keys, int rowLimit, int byteLimit ) {
Standalone<VectorRef<KeyValueRef>> _vs = wait( store->readRange(keys, rowLimit, byteLimit) );
Standalone<VectorRef<KeyValueRef>> vs = _vs; // Get rid of implicit const& from wait statement
ACTOR Future<Standalone<RangeResultRef>> doReadRange( IKeyValueStore* store, KeyRangeRef keys, int rowLimit, int byteLimit ) {
Standalone<RangeResultRef> _vs = wait( store->readRange(keys, rowLimit, byteLimit) );
Standalone<RangeResultRef> vs = _vs; // Get rid of implicit const& from wait statement
Arena& a = vs.arena();
for(int i=0; i<vs.size(); i++)
vs[i].value = ValueRef( a, (ValueRef const&)unpack(vs[i].value) );

View File

@ -216,14 +216,18 @@ public:
// If rowLimit>=0, reads first rows sorted ascending, otherwise reads last rows sorted descending
// The total size of the returned value (less the last entry) will be less than byteLimit
virtual Future<Standalone<VectorRef<KeyValueRef>>> readRange( KeyRangeRef keys, int rowLimit = 1<<30, int byteLimit = 1<<30 ) {
virtual Future<Standalone<RangeResultRef>> readRange( KeyRangeRef keys, int rowLimit = 1<<30, int byteLimit = 1<<30 ) {
if(recovering.isError()) throw recovering.getError();
if (!recovering.isReady()) return waitAndReadRange(this, keys, rowLimit, byteLimit);
Standalone<VectorRef<KeyValueRef>> result;
if (rowLimit >= 0) {
Standalone<RangeResultRef> result;
if (rowLimit == 0) {
return result;
}
if (rowLimit > 0) {
auto it = data.lower_bound(keys.begin);
while (it!=data.end() && it->key < keys.end && rowLimit && byteLimit>=0) {
while (it!=data.end() && it->key < keys.end && rowLimit && byteLimit>0) {
byteLimit -= sizeof(KeyValueRef) + it->key.size() + it->value.size();
result.push_back_deep( result.arena(), KeyValueRef(it->key, it->value) );
++it;
@ -232,13 +236,19 @@ public:
} else {
rowLimit = -rowLimit;
auto it = data.previous( data.lower_bound(keys.end) );
while (it!=data.end() && it->key >= keys.begin && rowLimit && byteLimit>=0) {
while (it!=data.end() && it->key >= keys.begin && rowLimit && byteLimit>0) {
byteLimit -= sizeof(KeyValueRef) + it->key.size() + it->value.size();
result.push_back_deep( result.arena(), KeyValueRef(it->key, it->value) );
it = data.previous(it);
--rowLimit;
}
}
result.more = rowLimit == 0 || byteLimit <= 0;
if(result.more) {
ASSERT(result.size() > 0);
result.readThrough = result[result.size()-1].key;
}
return result;
}
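
The contract these hunks introduce: when a row or byte limit stops the read early, the result carries more = true and readThrough = the last key returned, so the caller can issue its next read starting just past that key instead of guessing where the previous one ended. A minimal sketch of that pagination loop follows — plain C++ over std::map, with PageResult and readPage as illustrative names; unlike the code above, this version only sets more when data actually remains in the range.

#include <iostream>
#include <map>
#include <string>
#include <utility>
#include <vector>

struct PageResult {
    std::vector<std::pair<std::string, std::string>> rows;
    bool more = false;        // stopped early; more data remains in the range
    std::string readThrough;  // last key covered by this read
};

// Bounded range read: stop after `rowLimit` rows and report where we got to.
PageResult readPage(const std::map<std::string, std::string>& kv,
                    const std::string& begin, const std::string& end, int rowLimit) {
    PageResult r;
    auto hi = kv.lower_bound(end);
    for (auto it = kv.lower_bound(begin); it != hi; ++it) {
        if (rowLimit-- == 0) { r.more = true; break; }
        r.rows.emplace_back(*it);
        r.readThrough = it->first;
    }
    return r;
}

int main() {
    std::map<std::string, std::string> kv{
        {"a","1"}, {"b","2"}, {"c","3"}, {"d","4"}, {"e","5"}};
    std::string begin = "a";
    for (;;) {
        PageResult p = readPage(kv, begin, "z", /*rowLimit=*/2);
        for (const auto& [k, v] : p.rows) std::cout << k << "=" << v << "\n";
        if (!p.more) break;
        begin = p.readThrough + '\0'; // resume strictly after the last key read
    }
}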
@ -694,7 +704,7 @@ private:
wait( self->recovering );
return self->readValuePrefix(key, maxLength).get();
}
ACTOR static Future<Standalone<VectorRef<KeyValueRef>>> waitAndReadRange( KeyValueStoreMemory* self, KeyRange keys, int rowLimit, int byteLimit ) {
ACTOR static Future<Standalone<RangeResultRef>> waitAndReadRange( KeyValueStoreMemory* self, KeyRange keys, int rowLimit, int byteLimit ) {
wait( self->recovering );
return self->readRange(keys, rowLimit, byteLimit).get();
}

View File

@ -1076,21 +1076,26 @@ struct RawCursor {
}
return Optional<Value>();
}
Standalone<VectorRef<KeyValueRef>> getRange( KeyRangeRef keys, int rowLimit, int byteLimit ) {
Standalone<VectorRef<KeyValueRef>> result;
Standalone<RangeResultRef> getRange( KeyRangeRef keys, int rowLimit, int byteLimit ) {
Standalone<RangeResultRef> result;
int accumulatedBytes = 0;
ASSERT( byteLimit > 0 );
if(rowLimit == 0) {
return result;
}
if(db.fragment_values) {
if(rowLimit >= 0) {
if(rowLimit > 0) {
int r = moveTo(keys.begin);
if (r < 0)
moveNext();
DefragmentingReader i(*this, result.arena(), true);
Optional<KeyRef> nextKey = i.peek();
while(nextKey.present() && nextKey.get() < keys.end && rowLimit-- && accumulatedBytes < byteLimit) {
while(nextKey.present() && nextKey.get() < keys.end && rowLimit != 0 && accumulatedBytes < byteLimit) {
Optional<KeyValueRef> kv = i.getNext();
result.push_back(result.arena(), kv.get());
--rowLimit;
accumulatedBytes += sizeof(KeyValueRef) + kv.get().expectedSize();
nextKey = i.peek();
}
@ -1101,37 +1106,45 @@ struct RawCursor {
movePrevious();
DefragmentingReader i(*this, result.arena(), false);
Optional<KeyRef> nextKey = i.peek();
while(nextKey.present() && nextKey.get() >= keys.begin && rowLimit++ && accumulatedBytes < byteLimit) {
while(nextKey.present() && nextKey.get() >= keys.begin && rowLimit != 0 && accumulatedBytes < byteLimit) {
Optional<KeyValueRef> kv = i.getNext();
result.push_back(result.arena(), kv.get());
++rowLimit;
accumulatedBytes += sizeof(KeyValueRef) + kv.get().expectedSize();
nextKey = i.peek();
}
}
}
else {
if (rowLimit >= 0) {
if (rowLimit > 0) {
int r = moveTo( keys.begin );
if (r < 0) moveNext();
while (this->valid && rowLimit-- && accumulatedBytes < byteLimit) {
while (this->valid && rowLimit != 0 && accumulatedBytes < byteLimit) {
KeyValueRef kv = decodeKV( getEncodedRow( result.arena() ) );
accumulatedBytes += sizeof(KeyValueRef) + kv.expectedSize();
if (kv.key >= keys.end) break;
--rowLimit;
accumulatedBytes += sizeof(KeyValueRef) + kv.expectedSize();
result.push_back( result.arena(), kv );
moveNext();
}
} else {
int r = moveTo( keys.end );
if (r >= 0) movePrevious();
while (this->valid && rowLimit++ && accumulatedBytes < byteLimit) {
while (this->valid && rowLimit != 0 && accumulatedBytes < byteLimit) {
KeyValueRef kv = decodeKV( getEncodedRow( result.arena() ) );
accumulatedBytes += sizeof(KeyValueRef) + kv.expectedSize();
if (kv.key < keys.begin) break;
++rowLimit;
accumulatedBytes += sizeof(KeyValueRef) + kv.expectedSize();
result.push_back( result.arena(), kv );
movePrevious();
}
}
}
result.more = rowLimit == 0 || accumulatedBytes >= byteLimit;
if(result.more) {
ASSERT(result.size() > 0);
result.readThrough = result[result.size()-1].key;
}
return result;
}
@ -1451,7 +1464,7 @@ public:
virtual Future<Optional<Value>> readValue( KeyRef key, Optional<UID> debugID );
virtual Future<Optional<Value>> readValuePrefix( KeyRef key, int maxLength, Optional<UID> debugID );
virtual Future<Standalone<VectorRef<KeyValueRef>>> readRange( KeyRangeRef keys, int rowLimit = 1<<30, int byteLimit = 1<<30 );
virtual Future<Standalone<RangeResultRef>> readRange( KeyRangeRef keys, int rowLimit = 1<<30, int byteLimit = 1<<30 );
KeyValueStoreSQLite(std::string const& filename, UID logID, KeyValueStoreType type, bool checkChecksums, bool checkIntegrity);
~KeyValueStoreSQLite();
@ -1550,7 +1563,7 @@ private:
struct ReadRangeAction : TypedAction<Reader, ReadRangeAction>, FastAllocated<ReadRangeAction> {
KeyRange keys;
int rowLimit, byteLimit;
ThreadReturnPromise<Standalone<VectorRef<KeyValueRef>>> result;
ThreadReturnPromise<Standalone<RangeResultRef>> result;
ReadRangeAction(KeyRange keys, int rowLimit, int byteLimit) : keys(keys), rowLimit(rowLimit), byteLimit(byteLimit) {}
virtual double getTimeEstimate() { return SERVER_KNOBS->READ_RANGE_TIME_ESTIMATE; }
};
@ -2000,7 +2013,7 @@ Future<Optional<Value>> KeyValueStoreSQLite::readValuePrefix( KeyRef key, int ma
readThreads->post(p);
return f;
}
Future<Standalone<VectorRef<KeyValueRef>>> KeyValueStoreSQLite::readRange( KeyRangeRef keys, int rowLimit, int byteLimit ) {
Future<Standalone<RangeResultRef>> KeyValueStoreSQLite::readRange( KeyRangeRef keys, int rowLimit, int byteLimit ) {
++readsRequested;
auto p = new Reader::ReadRangeAction(keys, rowLimit, byteLimit);
auto f = p->result.getFuture();

View File

@ -79,7 +79,6 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula
init( DISK_QUEUE_FILE_EXTENSION_BYTES, 10<<20 ); // BUGGIFYd per file within the DiskQueue
init( DISK_QUEUE_FILE_SHRINK_BYTES, 100<<20 ); // BUGGIFYd per file within the DiskQueue
init( DISK_QUEUE_MAX_TRUNCATE_BYTES, 2<<30 ); if ( randomize && BUGGIFY ) DISK_QUEUE_MAX_TRUNCATE_BYTES = 0;
init( TLOG_DEGRADED_DELAY_COUNT, 5 );
init( TLOG_DEGRADED_DURATION, 5.0 );
init( TLOG_IGNORE_POP_AUTO_ENABLE_DELAY, 300.0 );
init( TXS_POPPED_MAX_DELAY, 1.0 ); if ( randomize && BUGGIFY ) TXS_POPPED_MAX_DELAY = deterministicRandom()->random01();
@ -166,7 +165,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula
If this value is too small relative to SHARD_MIN_BYTES_PER_KSEC immediate merging work will be generated.
*/
init( STORAGE_METRIC_TIMEOUT, 600.0 ); if( randomize && BUGGIFY ) STORAGE_METRIC_TIMEOUT = deterministicRandom()->coinflip() ? 10.0 : 60.0;
init( STORAGE_METRIC_TIMEOUT, isSimulated ? 60.0 : 600.0 ); if( randomize && BUGGIFY ) STORAGE_METRIC_TIMEOUT = deterministicRandom()->coinflip() ? 10.0 : 30.0;
init( METRIC_DELAY, 0.1 ); if( randomize && BUGGIFY ) METRIC_DELAY = 1.0;
init( ALL_DATA_REMOVED_DELAY, 1.0 );
init( INITIAL_FAILURE_REACTION_DELAY, 30.0 ); if( randomize && BUGGIFY ) INITIAL_FAILURE_REACTION_DELAY = 0.0;
@ -181,8 +180,8 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula
init( DATA_DISTRIBUTION_LOGGING_INTERVAL, 5.0 );
init( DD_ENABLED_CHECK_DELAY, 1.0 );
init( DD_STALL_CHECK_DELAY, 0.4 ); //Must be larger than 2*MAX_BUGGIFIED_DELAY
init( DD_LOW_BANDWIDTH_DELAY, isSimulated ? 90.0 : 240.0 ); if( randomize && BUGGIFY ) DD_LOW_BANDWIDTH_DELAY = 0; //Because of delayJitter, this should be less than 0.9 * DD_MERGE_COALESCE_DELAY
init( DD_MERGE_COALESCE_DELAY, isSimulated ? 120.0 : 300.0 ); if( randomize && BUGGIFY ) DD_MERGE_COALESCE_DELAY = 0.001;
init( DD_LOW_BANDWIDTH_DELAY, isSimulated ? 15.0 : 240.0 ); if( randomize && BUGGIFY ) DD_LOW_BANDWIDTH_DELAY = 0; //Because of delayJitter, this should be less than 0.9 * DD_MERGE_COALESCE_DELAY
init( DD_MERGE_COALESCE_DELAY, isSimulated ? 30.0 : 300.0 ); if( randomize && BUGGIFY ) DD_MERGE_COALESCE_DELAY = 0.001;
init( STORAGE_METRICS_POLLING_DELAY, 2.0 ); if( randomize && BUGGIFY ) STORAGE_METRICS_POLLING_DELAY = 15.0;
init( STORAGE_METRICS_RANDOM_DELAY, 0.2 );
init( FREE_SPACE_RATIO_CUTOFF, 0.1 );
@ -319,6 +318,9 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula
init( REQUIRED_MIN_RECOVERY_DURATION, 0.080 ); if( shortRecoveryDuration ) REQUIRED_MIN_RECOVERY_DURATION = 0.01;
init( ALWAYS_CAUSAL_READ_RISKY, false );
init( MAX_COMMIT_UPDATES, 2000 ); if( randomize && BUGGIFY ) MAX_COMMIT_UPDATES = 1;
init( MIN_PROXY_COMPUTE, 0.001 );
init( PROXY_COMPUTE_BUCKETS, 5000 );
init( PROXY_COMPUTE_GROWTH_RATE, 0.01 );
// Master Server
// masterCommitter() in the master server will allow lower priority tasks (e.g. DataDistribution)
@ -476,7 +478,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula
init( BEHIND_CHECK_DELAY, 2.0 );
init( BEHIND_CHECK_COUNT, 2 );
init( BEHIND_CHECK_VERSIONS, 5 * VERSIONS_PER_SECOND );
init( WAIT_METRICS_WRONG_SHARD_CHANCE, 0.1 );
init( WAIT_METRICS_WRONG_SHARD_CHANCE, isSimulated ? 1.0 : 0.1 );
//Wait Failure
init( MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS, 250 ); if( randomize && BUGGIFY ) MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS = 2;

View File

@ -82,7 +82,6 @@ public:
int64_t DISK_QUEUE_FILE_EXTENSION_BYTES; // When we grow the disk queue, by how many bytes should it grow?
int64_t DISK_QUEUE_FILE_SHRINK_BYTES; // When we shrink the disk queue, by how many bytes should it shrink?
int DISK_QUEUE_MAX_TRUNCATE_BYTES; // A truncate larger than this will cause the file to be replaced instead.
int TLOG_DEGRADED_DELAY_COUNT;
double TLOG_DEGRADED_DURATION;
double TXS_POPPED_MAX_DELAY;
@ -264,6 +263,9 @@ public:
double REQUIRED_MIN_RECOVERY_DURATION;
bool ALWAYS_CAUSAL_READ_RISKY;
int MAX_COMMIT_UPDATES;
double MIN_PROXY_COMPUTE;
int PROXY_COMPUTE_BUCKETS;
double PROXY_COMPUTE_GROWTH_RATE;
// Master Server
double COMMIT_SLEEP_TIME;

View File

@ -1054,7 +1054,12 @@ ACTOR Future<Void> bufferedGetMore( ILogSystem::BufferedCursor* self, TaskPriori
loop {
wait( allLoaders || delay(SERVER_KNOBS->DESIRED_GET_MORE_DELAY, taskID) );
minVersion = self->end;
for(auto cursor : self->cursors) {
for(int i = 0; i < self->cursors.size(); i++) {
auto cursor = self->cursors[i];
while(cursor->hasMessage()) {
self->cursorMessages[i].push_back(ILogSystem::BufferedCursor::BufferedMessage(cursor->arena(), (!self->withTags || self->collectTags) ? cursor->getMessage() : cursor->getMessageWithTags(), !self->withTags ? VectorRef<Tag>() : cursor->getTags(), cursor->version()));
cursor->nextMessage();
}
minVersion = std::min(minVersion, cursor->version().version);
}
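// Any messages already buffered in a cursor are moved into that cursor's slot in cursorMessages before its version is
// considered, so minVersion ends up as the lowest version reached by any cursor; presumably the buffered messages can
// then be combined in version order up to that point.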
if(minVersion > self->messageVersion.version) {

View File

@ -237,6 +237,8 @@ struct ProxyCommitData {
int updateCommitRequests = 0;
NotifiedDouble lastCommitTime;
vector<double> commitComputePerOperation;
//The tags related to a storage server rarely change, so we keep a vector of tags for each key range to be slightly more CPU efficient.
//When a tag related to a storage server does change, we empty out all of these vectors to signify that they must be repopulated.
//We do not repopulate them immediately to avoid a slow task.
@ -293,7 +295,9 @@ struct ProxyCommitData {
localCommitBatchesStarted(0), locked(false), commitBatchInterval(SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_MIN),
firstProxy(firstProxy), cx(openDBOnServer(db, TaskPriority::DefaultEndpoint, true, true)), db(db),
singleKeyMutationEvent(LiteralStringRef("SingleKeyMutation")), commitBatchesMemBytesCount(0), lastTxsPop(0), lastStartCommit(0), lastCommitLatency(SERVER_KNOBS->REQUIRED_MIN_RECOVERY_DURATION), lastCommitTime(0)
{}
{
commitComputePerOperation.resize(SERVER_KNOBS->PROXY_COMPUTE_BUCKETS,0.0);
}
};
struct ResolutionRequestBuilder {
@ -475,6 +479,13 @@ bool isWhitelisted(const vector<Standalone<StringRef>>& binPathVec, StringRef bi
return std::find(binPathVec.begin(), binPathVec.end(), binPath) != binPathVec.end();
}
ACTOR Future<Void> releaseResolvingAfter(ProxyCommitData* self, Future<Void> releaseDelay, int64_t localBatchNumber) {
wait(releaseDelay);
ASSERT(self->latestLocalCommitBatchResolving.get() == localBatchNumber-1);
self->latestLocalCommitBatchResolving.set(localBatchNumber);
return Void();
}
ACTOR Future<Void> commitBatch(
ProxyCommitData* self,
vector<CommitTransactionRequest> trs,
@ -486,6 +497,14 @@ ACTOR Future<Void> commitBatch(
state Optional<UID> debugID;
state bool forceRecovery = false;
state BinaryWriter valueWriter(Unversioned());
state int batchOperations = 0;
int64_t batchBytes = 0;
for (int t = 0; t<trs.size(); t++) {
batchOperations += trs[t].transaction.mutations.size();
batchBytes += trs[t].transaction.mutations.expectedSize();
}
state int latencyBucket = batchOperations == 0 ? 0 : std::min<int>(SERVER_KNOBS->PROXY_COMPUTE_BUCKETS-1,SERVER_KNOBS->PROXY_COMPUTE_BUCKETS*batchBytes/(batchOperations*(CLIENT_KNOBS->VALUE_SIZE_LIMIT+CLIENT_KNOBS->KEY_SIZE_LIMIT)));
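// Informally: latencyBucket = PROXY_COMPUTE_BUCKETS * (batchBytes / batchOperations) / (KEY_SIZE_LIMIT + VALUE_SIZE_LIMIT),
// clamped to [0, PROXY_COMPUTE_BUCKETS-1]. Batches are classified by how close their average mutation size is to the
// maximum allowed key+value size, and compute cost per operation is then learned separately for each size class.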
ASSERT(SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS <= SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT); // since we are using just the former to limit the number of versions actually in flight!
@ -515,7 +534,7 @@ ACTOR Future<Void> commitBatch(
/////// Phase 1: Pre-resolution processing (CPU bound except waiting for a version # which is separately pipelined and *should* be available by now (unless empty commit); ordered; currently atomic but could yield)
TEST(self->latestLocalCommitBatchResolving.get() < localBatchNumber-1); // Queuing pre-resolution commit processing
wait(self->latestLocalCommitBatchResolving.whenAtLeast(localBatchNumber-1));
wait(yield(TaskPriority::ProxyCommitYield1));
state Future<Void> releaseDelay = delay(batchOperations*self->commitComputePerOperation[latencyBucket], TaskPriority::ProxyMasterVersionReply);
if (debugID.present())
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.GettingCommitVersion");
@ -566,9 +585,7 @@ ACTOR Future<Void> commitBatch(
}
state vector<vector<int>> transactionResolverMap = std::move( requests.transactionResolverMap );
ASSERT(self->latestLocalCommitBatchResolving.get() == localBatchNumber-1);
self->latestLocalCommitBatchResolving.set(localBatchNumber);
state Future<Void> releaseFuture = releaseResolvingAfter(self, releaseDelay, localBatchNumber);
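// Instead of advancing latestLocalCommitBatchResolving immediately, releaseResolvingAfter() advances it only once
// releaseDelay (the estimated compute time of this batch, batchOperations * commitComputePerOperation[latencyBucket])
// has elapsed, so the next batch's pre-resolution phase is held back by roughly the CPU time this batch will need.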
/////// Phase 2: Resolution (waiting on the network; pipelined)
state vector<ResolveTransactionBatchReply> resolution = wait( getAll(replies) );
@ -579,8 +596,10 @@ ACTOR Future<Void> commitBatch(
////// Phase 3: Post-resolution processing (CPU bound except for very rare situations; ordered; currently atomic but doesn't need to be)
TEST(self->latestLocalCommitBatchLogging.get() < localBatchNumber-1); // Queuing post-resolution commit processing
wait(self->latestLocalCommitBatchLogging.whenAtLeast(localBatchNumber-1));
wait(yield(TaskPriority::ProxyCommitYield2));
wait(yield(TaskPriority::ProxyCommitYield1));
state double computeStart = g_network->timer();
state double computeDuration = 0;
self->stats.txnCommitResolved += trs.size();
if (debugID.present())
@ -738,7 +757,11 @@ ACTOR Future<Void> commitBatch(
for (; mutationNum < pMutations->size(); mutationNum++) {
if(yieldBytes > SERVER_KNOBS->DESIRED_TOTAL_BYTES) {
yieldBytes = 0;
wait(yield(TaskPriority::ProxyCommitYield2));
if(g_network->check_yield(TaskPriority::ProxyCommitYield1)) {
computeDuration += g_network->timer() - computeStart;
wait(delay(0, TaskPriority::ProxyCommitYield1));
computeStart = g_network->timer();
}
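// Only time actually spent computing should count toward computeDuration, so the timer is paused across the yield and
// restarted afterwards; the same bookkeeping is applied wherever this function gives up the CPU below.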
}
auto& m = (*pMutations)[mutationNum];
@ -844,7 +867,11 @@ ACTOR Future<Void> commitBatch(
while(blobIter) {
if(yieldBytes > SERVER_KNOBS->DESIRED_TOTAL_BYTES) {
yieldBytes = 0;
wait(yield(TaskPriority::ProxyCommitYield2));
if(g_network->check_yield(TaskPriority::ProxyCommitYield1)) {
computeDuration += g_network->timer() - computeStart;
wait(delay(0, TaskPriority::ProxyCommitYield1));
computeStart = g_network->timer();
}
}
valueWriter.serializeBytes(blobIter->data);
yieldBytes += blobIter->data.size();
@ -906,29 +933,33 @@ ACTOR Future<Void> commitBatch(
// Storage servers mustn't make durable versions which are not fully committed (because then they are impossible to roll back)
// We prevent this by limiting the number of versions which are semi-committed but not fully committed to be less than the MVCC window
while (self->committedVersion.get() < commitVersion - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) {
// This should be *extremely* rare in the real world, but knob buggification should make it happen in simulation
TEST(true); // Semi-committed pipeline limited by MVCC window
//TraceEvent("ProxyWaitingForCommitted", self->dbgid).detail("CommittedVersion", self->committedVersion.get()).detail("NeedToCommit", commitVersion);
choose{
when(wait(self->committedVersion.whenAtLeast(commitVersion - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS))) {
wait(yield());
break;
}
when(GetReadVersionReply v = wait(self->getConsistentReadVersion.getReply(GetReadVersionRequest(0, GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE | GetReadVersionRequest::FLAG_CAUSAL_READ_RISKY)))) {
if(v.version > self->committedVersion.get()) {
self->locked = v.locked;
self->metadataVersion = v.metadataVersion;
self->committedVersion.set(v.version);
if(self->committedVersion.get() < commitVersion - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) {
computeDuration += g_network->timer() - computeStart;
while (self->committedVersion.get() < commitVersion - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) {
// This should be *extremely* rare in the real world, but knob buggification should make it happen in simulation
TEST(true); // Semi-committed pipeline limited by MVCC window
//TraceEvent("ProxyWaitingForCommitted", self->dbgid).detail("CommittedVersion", self->committedVersion.get()).detail("NeedToCommit", commitVersion);
choose{
when(wait(self->committedVersion.whenAtLeast(commitVersion - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS))) {
wait(yield());
break;
}
when(GetReadVersionReply v = wait(self->getConsistentReadVersion.getReply(GetReadVersionRequest(0, GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE | GetReadVersionRequest::FLAG_CAUSAL_READ_RISKY)))) {
if(v.version > self->committedVersion.get()) {
self->locked = v.locked;
self->metadataVersion = v.metadataVersion;
self->committedVersion.set(v.version);
}
if (self->committedVersion.get() < commitVersion - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS)
wait(delay(SERVER_KNOBS->PROXY_SPIN_DELAY));
}
if (self->committedVersion.get() < commitVersion - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS)
wait(delay(SERVER_KNOBS->PROXY_SPIN_DELAY));
}
}
computeStart = g_network->timer();
}
state LogSystemDiskQueueAdapter::CommitMessage msg = wait(storeCommits.back().first); // Should just be doing yields
state LogSystemDiskQueueAdapter::CommitMessage msg = storeCommits.back().first.get();
if (debugID.present())
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.AfterStoreCommits");
@ -961,6 +992,16 @@ ACTOR Future<Void> commitBatch(
self->latestLocalCommitBatchLogging.set(localBatchNumber);
}
computeDuration += g_network->timer() - computeStart;
if(computeDuration > SERVER_KNOBS->MIN_PROXY_COMPUTE && batchOperations > 0) {
double computePerOperation = computeDuration/batchOperations;
if(computePerOperation <= self->commitComputePerOperation[latencyBucket] || self->commitComputePerOperation[latencyBucket] == 0.0) {
self->commitComputePerOperation[latencyBucket] = computePerOperation;
} else {
self->commitComputePerOperation[latencyBucket] = SERVER_KNOBS->PROXY_COMPUTE_GROWTH_RATE*computePerOperation + ((1.0-SERVER_KNOBS->PROXY_COMPUTE_GROWTH_RATE)*self->commitComputePerOperation[latencyBucket]);
}
}
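// The per-bucket estimate adapts asymmetrically: a cheaper-than-estimated batch replaces the estimate outright, while a
// more expensive one only nudges it upward via an exponential moving average weighted by PROXY_COMPUTE_GROWTH_RATE (0.01).
// For example, with an old estimate of 2us/op and a measured 4us/op, the new estimate is 0.01*4 + 0.99*2 = 2.02us/op,
// so a single slow batch cannot inflate the release delay by much.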
/////// Phase 4: Logging (network bound; pipelined up to MAX_READ_TRANSACTION_LIFE_VERSIONS (limited by loop above))
try {
@ -978,7 +1019,7 @@ ACTOR Future<Void> commitBatch(
}
self->lastCommitLatency = now()-commitStartTime;
self->lastCommitTime = std::max(self->lastCommitTime.get(), commitStartTime);
wait(yield(TaskPriority::ProxyCommitYield3));
wait(yield(TaskPriority::ProxyCommitYield2));
if( self->popRemoteTxs && msg.popTo > ( self->txsPopVersions.size() ? self->txsPopVersions.back().second : self->lastTxsPop ) ) {
if(self->txsPopVersions.size() >= SERVER_KNOBS->MAX_TXS_POP_VERSION_HISTORY) {
@ -1017,7 +1058,7 @@ ACTOR Future<Void> commitBatch(
}
// Send replies to clients
double endTime = timer();
double endTime = g_network->timer();
for (int t = 0; t < trs.size(); t++) {
if (committed[t] == ConflictBatch::TransactionCommitted && (!locked || trs[t].isLockAware())) {
ASSERT_WE_THINK(commitVersion != invalidVersion);
@ -1068,6 +1109,7 @@ ACTOR Future<Void> commitBatch(
self->commitBatchesMemBytesCount -= currentBatchMemBytesCount;
ASSERT_ABORT(self->commitBatchesMemBytesCount >= 0);
wait(releaseFuture);
return Void();
}
@ -1149,7 +1191,7 @@ struct TransactionRateInfo {
ACTOR Future<Void> sendGrvReplies(Future<GetReadVersionReply> replyFuture, std::vector<GetReadVersionRequest> requests, ProxyStats *stats) {
GetReadVersionReply reply = wait(replyFuture);
double end = timer();
double end = g_network->timer();
for(GetReadVersionRequest const& request : requests) {
if(request.priority() >= GetReadVersionRequest::PRIORITY_DEFAULT) {
stats->grvLatencyBands.addMeasurement(end - request.requestTime());
@ -1345,7 +1387,7 @@ ACTOR static Future<Void> rejoinServer( MasterProxyInterface proxy, ProxyCommitD
GetStorageServerRejoinInfoReply rep;
rep.version = commitData->version;
rep.tag = decodeServerTagValue( commitData->txnStateStore->readValue(serverTagKeyFor(req.id)).get().get() );
Standalone<VectorRef<KeyValueRef>> history = commitData->txnStateStore->readRange(serverTagHistoryRangeFor(req.id)).get();
Standalone<RangeResultRef> history = commitData->txnStateStore->readRange(serverTagHistoryRangeFor(req.id)).get();
for(int i = history.size()-1; i >= 0; i-- ) {
rep.history.push_back(std::make_pair(decodeServerTagHistoryKey(history[i].key), decodeServerTagValue(history[i].value)));
}
@ -1696,7 +1738,7 @@ ACTOR Future<Void> masterProxyServerCore(
state KeyRange txnKeys = allKeys;
loop {
wait(yield());
Standalone<VectorRef<KeyValueRef>> data = commitData.txnStateStore->readRange(txnKeys, SERVER_KNOBS->BUGGIFIED_ROW_LIMIT, SERVER_KNOBS->APPLY_MUTATION_BYTES).get();
Standalone<RangeResultRef> data = commitData.txnStateStore->readRange(txnKeys, SERVER_KNOBS->BUGGIFIED_ROW_LIMIT, SERVER_KNOBS->APPLY_MUTATION_BYTES).get();
if(!data.size()) break;
((KeyRangeRef&)txnKeys) = KeyRangeRef( keyAfter(data.back().key, txnKeys.arena()), txnKeys.end );

View File

@ -270,6 +270,7 @@ namespace oldTLog_4_6 {
std::map<UID, Reference<struct LogData>> id_data;
UID dbgid;
UID workerID;
IKeyValueStore* persistentData;
IDiskQueue* rawPersistentQueue;
@ -303,8 +304,8 @@ namespace oldTLog_4_6 {
PromiseStream<Future<Void>> sharedActors;
bool terminated;
TLogData(UID dbgid, IKeyValueStore* persistentData, IDiskQueue * persistentQueue, Reference<AsyncVar<ServerDBInfo>> const& dbInfo)
: dbgid(dbgid), instanceID(deterministicRandom()->randomUniqueID().first()),
TLogData(UID dbgid, UID workerID, IKeyValueStore* persistentData, IDiskQueue * persistentQueue, Reference<AsyncVar<ServerDBInfo>> const& dbInfo)
: dbgid(dbgid), workerID(workerID), instanceID(deterministicRandom()->randomUniqueID().first()),
persistentData(persistentData), rawPersistentQueue(persistentQueue), persistentQueue(new TLogQueue(persistentQueue, dbgid)),
dbInfo(dbInfo), queueCommitBegin(0), queueCommitEnd(0), prevVersion(0),
diskQueueCommitBytes(0), largeDiskQueueCommitBytes(false),
@ -412,7 +413,7 @@ namespace oldTLog_4_6 {
// These are initialized differently on init() or recovery
recoveryCount(), stopped(false), initialized(false), queueCommittingVersion(0), newPersistentDataVersion(invalidVersion), recovery(Void())
{
startRole(Role::TRANSACTION_LOG,interf.id(), UID());
startRole(Role::TRANSACTION_LOG, interf.id(), tLogData->workerID, {{"SharedTLog", tLogData->dbgid.shortString()}}, "Restored");
persistentDataVersion.init(LiteralStringRef("TLog.PersistentDataVersion"), cc.id);
persistentDataDurableVersion.init(LiteralStringRef("TLog.PersistentDataDurableVersion"), cc.id);
@ -954,7 +955,7 @@ namespace oldTLog_4_6 {
peekMessagesFromMemory( logData, req, messages2, endVersion );
Standalone<VectorRef<KeyValueRef>> kvs = wait(
Standalone<RangeResultRef> kvs = wait(
self->persistentData->readRange(KeyRangeRef(
persistTagMessagesKey(logData->logId, oldTag, req.begin),
persistTagMessagesKey(logData->logId, oldTag, logData->persistentDataDurableVersion + 1)), SERVER_KNOBS->DESIRED_TOTAL_BYTES, SERVER_KNOBS->DESIRED_TOTAL_BYTES));
@ -1120,7 +1121,7 @@ namespace oldTLog_4_6 {
// The TLogRejoinRequest is needed to establish communications with a new master, which doesn't have our TLogInterface
TLogRejoinRequest req;
req.myInterface = tli;
TraceEvent("TLogRejoining", self->dbgid).detail("Master", self->dbInfo->get().master.id());
TraceEvent("TLogRejoining", tli.id()).detail("Master", self->dbInfo->get().master.id());
choose {
when(TLogRejoinReply rep =
wait(brokenPromiseToNever(self->dbInfo->get().master.tlogRejoin.getReply(req)))) {
@ -1268,8 +1269,8 @@ namespace oldTLog_4_6 {
IKeyValueStore *storage = self->persistentData;
state Future<Optional<Value>> fFormat = storage->readValue(persistFormat.key);
state Future<Standalone<VectorRef<KeyValueRef>>> fVers = storage->readRange(persistCurrentVersionKeys);
state Future<Standalone<VectorRef<KeyValueRef>>> fRecoverCounts = storage->readRange(persistRecoveryCountKeys);
state Future<Standalone<RangeResultRef>> fVers = storage->readRange(persistCurrentVersionKeys);
state Future<Standalone<RangeResultRef>> fRecoverCounts = storage->readRange(persistRecoveryCountKeys);
// FIXME: metadata in queue?
@ -1282,7 +1283,7 @@ namespace oldTLog_4_6 {
}
if (!fFormat.get().present()) {
Standalone<VectorRef<KeyValueRef>> v = wait( self->persistentData->readRange( KeyRangeRef(StringRef(), LiteralStringRef("\xff")), 1 ) );
Standalone<RangeResultRef> v = wait( self->persistentData->readRange( KeyRangeRef(StringRef(), LiteralStringRef("\xff")), 1 ) );
if (!v.size()) {
TEST(true); // The DB is completely empty, so it was never initialized. Delete it.
throw worker_removed();
@ -1335,7 +1336,7 @@ namespace oldTLog_4_6 {
tagKeys = prefixRange( rawId.withPrefix(persistTagPoppedKeys.begin) );
loop {
if(logData->removed.isReady()) break;
Standalone<VectorRef<KeyValueRef>> data = wait( self->persistentData->readRange( tagKeys, BUGGIFY ? 3 : 1<<30, 1<<20 ) );
Standalone<RangeResultRef> data = wait( self->persistentData->readRange( tagKeys, BUGGIFY ? 3 : 1<<30, 1<<20 ) );
if (!data.size()) break;
((KeyRangeRef&)tagKeys) = KeyRangeRef( keyAfter(data.back().key, tagKeys.arena()), tagKeys.end );
@ -1421,9 +1422,9 @@ namespace oldTLog_4_6 {
return Void();
}
ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, UID tlogId )
ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, UID tlogId, UID workerID )
{
state TLogData self( tlogId, persistentData, persistentQueue, db );
state TLogData self( tlogId, workerID, persistentData, persistentQueue, db );
state Future<Void> error = actorCollection( self.sharedActors.getFuture() );
TraceEvent("SharedTlog", tlogId);

View File

@ -245,6 +245,7 @@ struct TLogData : NonCopyable {
std::map<UID, Reference<struct LogData>> id_data;
UID dbgid;
UID workerID;
IKeyValueStore* persistentData;
IDiskQueue* rawPersistentQueue;
@ -286,8 +287,8 @@ struct TLogData : NonCopyable {
Reference<AsyncVar<bool>> degraded;
std::vector<TagsAndMessage> tempTagMessages;
TLogData(UID dbgid, IKeyValueStore* persistentData, IDiskQueue * persistentQueue, Reference<AsyncVar<ServerDBInfo>> dbInfo, Reference<AsyncVar<bool>> degraded, std::string folder)
: dbgid(dbgid), instanceID(deterministicRandom()->randomUniqueID().first()),
TLogData(UID dbgid, UID workerID, IKeyValueStore* persistentData, IDiskQueue * persistentQueue, Reference<AsyncVar<ServerDBInfo>> dbInfo, Reference<AsyncVar<bool>> degraded, std::string folder)
: dbgid(dbgid), workerID(workerID), instanceID(deterministicRandom()->randomUniqueID().first()),
persistentData(persistentData), rawPersistentQueue(persistentQueue), persistentQueue(new TLogQueue(persistentQueue, dbgid)),
dbInfo(dbInfo), degraded(degraded), queueCommitBegin(0), queueCommitEnd(0),
diskQueueCommitBytes(0), largeDiskQueueCommitBytes(false), bytesInput(0), bytesDurable(0), targetVolatileBytes(SERVER_KNOBS->TLOG_SPILL_THRESHOLD), overheadBytesInput(0), overheadBytesDurable(0),
@ -439,14 +440,15 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
bool execOpCommitInProgress;
int txsTags;
explicit LogData(TLogData* tLogData, TLogInterface interf, Tag remoteTag, bool isPrimary, int logRouterTags, int txsTags, UID recruitmentID, std::vector<Tag> tags) : tLogData(tLogData), knownCommittedVersion(0), logId(interf.id()),
cc("TLog", interf.id().toString()), bytesInput("BytesInput", cc), bytesDurable("BytesDurable", cc), remoteTag(remoteTag), isPrimary(isPrimary), logRouterTags(logRouterTags), txsTags(txsTags), recruitmentID(recruitmentID),
logSystem(new AsyncVar<Reference<ILogSystem>>()), logRouterPoppedVersion(0), durableKnownCommittedVersion(0), minKnownCommittedVersion(0), allTags(tags.begin(), tags.end()), terminated(tLogData->terminated.getFuture()),
// These are initialized differently on init() or recovery
recoveryCount(), stopped(false), initialized(false), queueCommittingVersion(0), newPersistentDataVersion(invalidVersion), unrecoveredBefore(1), recoveredAt(1), unpoppedRecoveredTags(0),
logRouterPopToVersion(0), locality(tagLocalityInvalid), execOpCommitInProgress(false)
explicit LogData(TLogData* tLogData, TLogInterface interf, Tag remoteTag, bool isPrimary, int logRouterTags, int txsTags, UID recruitmentID, std::vector<Tag> tags, std::string context)
: tLogData(tLogData), knownCommittedVersion(0), logId(interf.id()),
cc("TLog", interf.id().toString()), bytesInput("BytesInput", cc), bytesDurable("BytesDurable", cc), remoteTag(remoteTag), isPrimary(isPrimary), logRouterTags(logRouterTags), txsTags(txsTags), recruitmentID(recruitmentID),
logSystem(new AsyncVar<Reference<ILogSystem>>()), logRouterPoppedVersion(0), durableKnownCommittedVersion(0), minKnownCommittedVersion(0), allTags(tags.begin(), tags.end()), terminated(tLogData->terminated.getFuture()),
// These are initialized differently on init() or recovery
recoveryCount(), stopped(false), initialized(false), queueCommittingVersion(0), newPersistentDataVersion(invalidVersion), unrecoveredBefore(1), recoveredAt(1), unpoppedRecoveredTags(0),
logRouterPopToVersion(0), locality(tagLocalityInvalid), execOpCommitInProgress(false)
{
startRole(Role::TRANSACTION_LOG, interf.id(), UID());
startRole(Role::TRANSACTION_LOG, interf.id(), tLogData->workerID, {{"SharedTLog", tLogData->dbgid.shortString()}}, context);
persistentDataVersion.init(LiteralStringRef("TLog.PersistentDataVersion"), cc.id);
persistentDataDurableVersion.init(LiteralStringRef("TLog.PersistentDataDurableVersion"), cc.id);
@ -1156,7 +1158,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
peekMessagesFromMemory( logData, req, messages2, endVersion );
}
Standalone<VectorRef<KeyValueRef>> kvs = wait(
Standalone<RangeResultRef> kvs = wait(
self->persistentData->readRange(KeyRangeRef(
persistTagMessagesKey(logData->logId, req.tag, req.begin),
persistTagMessagesKey(logData->logId, req.tag, logData->persistentDataDurableVersion + 1)), SERVER_KNOBS->DESIRED_TOTAL_BYTES, SERVER_KNOBS->DESIRED_TOTAL_BYTES));
@ -1220,12 +1222,8 @@ ACTOR Future<Void> watchDegraded(TLogData* self) {
return Void();
}
//This delay is divided into multiple delays to avoid marking the tlog as degraded because of a single SlowTask
state int loopCount = 0;
while(loopCount < SERVER_KNOBS->TLOG_DEGRADED_DELAY_COUNT) {
wait(delay(SERVER_KNOBS->TLOG_DEGRADED_DURATION/SERVER_KNOBS->TLOG_DEGRADED_DELAY_COUNT, TaskPriority::Low));
loopCount++;
}
wait(lowPriorityDelay(SERVER_KNOBS->TLOG_DEGRADED_DURATION));
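// lowPriorityDelay presumably encapsulates what the removed loop did by hand: the wait is split into several
// low-priority delays so that a single SlowTask cannot by itself mark the tlog as degraded.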
TraceEvent(SevWarnAlways, "TLogDegraded", self->dbgid);
TEST(true); //6.0 TLog degraded
self->degraded->set(true);
@ -1478,7 +1476,7 @@ ACTOR Future<Void> rejoinMasters( TLogData* self, TLogInterface tli, DBRecoveryC
if ( self->dbInfo->get().master.id() != lastMasterID) {
// The TLogRejoinRequest is needed to establish communications with a new master, which doesn't have our TLogInterface
TLogRejoinRequest req(tli);
TraceEvent("TLogRejoining", self->dbgid).detail("Master", self->dbInfo->get().master.id());
TraceEvent("TLogRejoining", tli.id()).detail("Master", self->dbInfo->get().master.id());
choose {
when(TLogRejoinReply rep =
wait(brokenPromiseToNever(self->dbInfo->get().master.tlogRejoin.getReply(req)))) {
@ -1927,12 +1925,12 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
state IKeyValueStore *storage = self->persistentData;
wait(storage->init());
state Future<Optional<Value>> fFormat = storage->readValue(persistFormat.key);
state Future<Standalone<VectorRef<KeyValueRef>>> fVers = storage->readRange(persistCurrentVersionKeys);
state Future<Standalone<VectorRef<KeyValueRef>>> fKnownCommitted = storage->readRange(persistKnownCommittedVersionKeys);
state Future<Standalone<VectorRef<KeyValueRef>>> fLocality = storage->readRange(persistLocalityKeys);
state Future<Standalone<VectorRef<KeyValueRef>>> fLogRouterTags = storage->readRange(persistLogRouterTagsKeys);
state Future<Standalone<VectorRef<KeyValueRef>>> fTxsTags = storage->readRange(persistTxsTagsKeys);
state Future<Standalone<VectorRef<KeyValueRef>>> fRecoverCounts = storage->readRange(persistRecoveryCountKeys);
state Future<Standalone<RangeResultRef>> fVers = storage->readRange(persistCurrentVersionKeys);
state Future<Standalone<RangeResultRef>> fKnownCommitted = storage->readRange(persistKnownCommittedVersionKeys);
state Future<Standalone<RangeResultRef>> fLocality = storage->readRange(persistLocalityKeys);
state Future<Standalone<RangeResultRef>> fLogRouterTags = storage->readRange(persistLogRouterTagsKeys);
state Future<Standalone<RangeResultRef>> fTxsTags = storage->readRange(persistTxsTagsKeys);
state Future<Standalone<RangeResultRef>> fRecoverCounts = storage->readRange(persistRecoveryCountKeys);
// FIXME: metadata in queue?
@ -1951,7 +1949,7 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
}
if (!fFormat.get().present()) {
Standalone<VectorRef<KeyValueRef>> v = wait( self->persistentData->readRange( KeyRangeRef(StringRef(), LiteralStringRef("\xff")), 1 ) );
Standalone<RangeResultRef> v = wait( self->persistentData->readRange( KeyRangeRef(StringRef(), LiteralStringRef("\xff")), 1 ) );
if (!v.size()) {
TEST(true); // The DB is completely empty, so it was never initialized. Delete it.
throw worker_removed();
@ -1973,7 +1971,7 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
tlogRequests.getFuture().pop().reply.sendError(recruitment_failed());
}
wait( oldTLog_4_6::tLog(self->persistentData, self->rawPersistentQueue, self->dbInfo, locality, self->dbgid) );
wait( oldTLog_4_6::tLog(self->persistentData, self->rawPersistentQueue, self->dbInfo, locality, self->dbgid, self->workerID) );
throw internal_error();
}
@ -2019,7 +2017,7 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
DUMPTOKEN( recruited.confirmRunning );
//We do not need the remoteTag, because we will not be loading any additional data
logData = Reference<LogData>( new LogData(self, recruited, Tag(), true, id_logRouterTags[id1], id_txsTags[id1], UID(), std::vector<Tag>()) );
logData = Reference<LogData>( new LogData(self, recruited, Tag(), true, id_logRouterTags[id1], id_txsTags[id1], UID(), std::vector<Tag>(), "Restored") );
logData->locality = id_locality[id1];
logData->stopped = true;
self->id_data[id1] = logData;
@ -2041,7 +2039,7 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
tagKeys = prefixRange( rawId.withPrefix(persistTagPoppedKeys.begin) );
loop {
if(logData->removed.isReady()) break;
Standalone<VectorRef<KeyValueRef>> data = wait( self->persistentData->readRange( tagKeys, BUGGIFY ? 3 : 1<<30, 1<<20 ) );
Standalone<RangeResultRef> data = wait( self->persistentData->readRange( tagKeys, BUGGIFY ? 3 : 1<<30, 1<<20 ) );
if (!data.size()) break;
((KeyRangeRef&)tagKeys) = KeyRangeRef( keyAfter(data.back().key, tagKeys.arena()), tagKeys.end );
@ -2202,7 +2200,8 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
it.second->stopCommit.trigger();
}
state Reference<LogData> logData = Reference<LogData>( new LogData(self, recruited, req.remoteTag, req.isPrimary, req.logRouterTags, req.txsTags, req.recruitmentID, req.allTags) );
bool recovering = (req.recoverFrom.logSystemType == LogSystemType::tagPartitioned);
state Reference<LogData> logData = Reference<LogData>( new LogData(self, recruited, req.remoteTag, req.isPrimary, req.logRouterTags, req.txsTags, req.recruitmentID, req.allTags, recovering ? "Recovered" : "Recruited") );
self->id_data[recruited.id()] = logData;
logData->locality = req.locality;
logData->recoveryCount = req.epoch;
@ -2218,7 +2217,7 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
throw logData->removed.getError();
}
if (req.recoverFrom.logSystemType == LogSystemType::tagPartitioned) {
if (recovering) {
logData->unrecoveredBefore = req.startVersion;
logData->recoveredAt = req.recoverAt;
logData->knownCommittedVersion = req.startVersion - 1;
@ -2324,13 +2323,11 @@ ACTOR Future<Void> startSpillingInTenSeconds(TLogData* self, UID tlogId, Referen
}
// New tLog (if !recoverFrom.size()) or restore from network
ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk, Promise<Void> oldLog, Promise<Void> recovered, std::string folder, Reference<AsyncVar<bool>> degraded, Reference<AsyncVar<UID>> activeSharedTLog) {
state TLogData self( tlogId, persistentData, persistentQueue, db, degraded, folder );
ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, UID workerID, bool restoreFromDisk, Promise<Void> oldLog, Promise<Void> recovered, std::string folder, Reference<AsyncVar<bool>> degraded, Reference<AsyncVar<UID>> activeSharedTLog) {
state TLogData self( tlogId, workerID, persistentData, persistentQueue, db, degraded, folder );
state Future<Void> error = actorCollection( self.sharedActors.getFuture() );
TraceEvent("SharedTlog", tlogId);
// FIXME: Pass the worker id instead of stubbing it
startRole(Role::SHARED_TRANSACTION_LOG, tlogId, UID());
try {
if(restoreFromDisk) {
wait( restorePersistentState( &self, locality, oldLog, recovered, tlogRequests ) );
@ -2371,7 +2368,6 @@ ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQ
} catch (Error& e) {
self.terminated.send(Void());
TraceEvent("TLogError", tlogId).error(e, true);
endRole(Role::SHARED_TRANSACTION_LOG, tlogId, "Error", true);
if(recovered.canBeSet()) recovered.send(Void());
while(!tlogRequests.isEmpty()) {

View File

@ -295,6 +295,7 @@ struct TLogData : NonCopyable {
std::map<UID, Reference<struct LogData>> id_data;
UID dbgid;
UID workerID;
IKeyValueStore* persistentData;
IDiskQueue* rawPersistentQueue;
@ -337,8 +338,8 @@ struct TLogData : NonCopyable {
Reference<AsyncVar<bool>> degraded;
std::vector<TagsAndMessage> tempTagMessages;
TLogData(UID dbgid, IKeyValueStore* persistentData, IDiskQueue * persistentQueue, Reference<AsyncVar<ServerDBInfo>> dbInfo, Reference<AsyncVar<bool>> degraded, std::string folder)
: dbgid(dbgid), instanceID(deterministicRandom()->randomUniqueID().first()),
TLogData(UID dbgid, UID workerID, IKeyValueStore* persistentData, IDiskQueue * persistentQueue, Reference<AsyncVar<ServerDBInfo>> dbInfo, Reference<AsyncVar<bool>> degraded, std::string folder)
: dbgid(dbgid), workerID(workerID), instanceID(deterministicRandom()->randomUniqueID().first()),
persistentData(persistentData), rawPersistentQueue(persistentQueue), persistentQueue(new TLogQueue(persistentQueue, dbgid)),
dbInfo(dbInfo), degraded(degraded), queueCommitBegin(0), queueCommitEnd(0),
diskQueueCommitBytes(0), largeDiskQueueCommitBytes(false), bytesInput(0), bytesDurable(0), targetVolatileBytes(SERVER_KNOBS->TLOG_SPILL_THRESHOLD), overheadBytesInput(0), overheadBytesDurable(0),
@ -499,15 +500,16 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
bool execOpCommitInProgress;
int txsTags;
explicit LogData(TLogData* tLogData, TLogInterface interf, Tag remoteTag, bool isPrimary, int logRouterTags, int txsTags, UID recruitmentID, ProtocolVersion protocolVersion, std::vector<Tag> tags) : tLogData(tLogData), knownCommittedVersion(0), logId(interf.id()),
cc("TLog", interf.id().toString()), bytesInput("BytesInput", cc), bytesDurable("BytesDurable", cc), remoteTag(remoteTag), isPrimary(isPrimary), logRouterTags(logRouterTags), txsTags(txsTags), recruitmentID(recruitmentID), protocolVersion(protocolVersion),
logSystem(new AsyncVar<Reference<ILogSystem>>()), logRouterPoppedVersion(0), durableKnownCommittedVersion(0), minKnownCommittedVersion(0), queuePoppedVersion(0), allTags(tags.begin(), tags.end()), terminated(tLogData->terminated.getFuture()),
minPoppedTagVersion(0), minPoppedTag(invalidTag),
explicit LogData(TLogData* tLogData, TLogInterface interf, Tag remoteTag, bool isPrimary, int logRouterTags, int txsTags, UID recruitmentID, ProtocolVersion protocolVersion, std::vector<Tag> tags, std::string context)
: tLogData(tLogData), knownCommittedVersion(0), logId(interf.id()),
cc("TLog", interf.id().toString()), bytesInput("BytesInput", cc), bytesDurable("BytesDurable", cc), remoteTag(remoteTag), isPrimary(isPrimary), logRouterTags(logRouterTags), txsTags(txsTags), recruitmentID(recruitmentID), protocolVersion(protocolVersion),
logSystem(new AsyncVar<Reference<ILogSystem>>()), logRouterPoppedVersion(0), durableKnownCommittedVersion(0), minKnownCommittedVersion(0), queuePoppedVersion(0), allTags(tags.begin(), tags.end()), terminated(tLogData->terminated.getFuture()),
minPoppedTagVersion(0), minPoppedTag(invalidTag),
// These are initialized differently on init() or recovery
recoveryCount(), stopped(false), initialized(false), queueCommittingVersion(0), newPersistentDataVersion(invalidVersion), unrecoveredBefore(1), recoveredAt(1), unpoppedRecoveredTags(0),
logRouterPopToVersion(0), locality(tagLocalityInvalid), execOpCommitInProgress(false)
{
startRole(Role::TRANSACTION_LOG, interf.id(), UID());
startRole(Role::TRANSACTION_LOG, interf.id(), tLogData->workerID, {{"SharedTLog", tLogData->dbgid.shortString()}}, context);
persistentDataVersion.init(LiteralStringRef("TLog.PersistentDataVersion"), cc.id);
persistentDataDurableVersion.init(LiteralStringRef("TLog.PersistentDataDurableVersion"), cc.id);
@ -683,7 +685,7 @@ ACTOR Future<Void> updatePoppedLocation( TLogData* self, Reference<LogData> logD
if (data->popped <= logData->persistentDataVersion) {
// Recover the next needed location in the Disk Queue from the index.
Standalone<VectorRef<KeyValueRef>> kvrefs = wait(
Standalone<RangeResultRef> kvrefs = wait(
self->persistentData->readRange(KeyRangeRef(
persistTagMessageRefsKey(logData->logId, data->tag, data->popped),
persistTagMessageRefsKey(logData->logId, data->tag, logData->persistentDataVersion + 1)), 1));
@ -1461,7 +1463,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
}
if (req.tag.locality == tagLocalityTxs || req.tag == txsTag) {
Standalone<VectorRef<KeyValueRef>> kvs = wait(
Standalone<RangeResultRef> kvs = wait(
self->persistentData->readRange(KeyRangeRef(
persistTagMessagesKey(logData->logId, req.tag, req.begin),
persistTagMessagesKey(logData->logId, req.tag, logData->persistentDataDurableVersion + 1)), SERVER_KNOBS->DESIRED_TOTAL_BYTES, SERVER_KNOBS->DESIRED_TOTAL_BYTES));
@ -1480,7 +1482,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
}
} else {
// FIXME: Limit to approximately DESIRED_TOTAL_BYTES somehow.
Standalone<VectorRef<KeyValueRef>> kvrefs = wait(
Standalone<RangeResultRef> kvrefs = wait(
self->persistentData->readRange(KeyRangeRef(
persistTagMessageRefsKey(logData->logId, req.tag, req.begin),
persistTagMessageRefsKey(logData->logId, req.tag, logData->persistentDataDurableVersion + 1)),
@ -1611,12 +1613,8 @@ ACTOR Future<Void> watchDegraded(TLogData* self) {
return Void();
}
//This delay is divided into multiple delays to avoid marking the tlog as degraded because of a single SlowTask
state int loopCount = 0;
while(loopCount < SERVER_KNOBS->TLOG_DEGRADED_DELAY_COUNT) {
wait(delay(SERVER_KNOBS->TLOG_DEGRADED_DURATION/SERVER_KNOBS->TLOG_DEGRADED_DELAY_COUNT, TaskPriority::Low));
loopCount++;
}
wait(lowPriorityDelay(SERVER_KNOBS->TLOG_DEGRADED_DURATION));
TraceEvent(SevWarnAlways, "TLogDegraded", self->dbgid);
TEST(true); //TLog degraded
self->degraded->set(true);
@ -1870,7 +1868,7 @@ ACTOR Future<Void> rejoinMasters( TLogData* self, TLogInterface tli, DBRecoveryC
if ( self->dbInfo->get().master.id() != lastMasterID) {
// The TLogRejoinRequest is needed to establish communications with a new master, which doesn't have our TLogInterface
TLogRejoinRequest req(tli);
TraceEvent("TLogRejoining", self->dbgid).detail("Master", self->dbInfo->get().master.id());
TraceEvent("TLogRejoining", tli.id()).detail("Master", self->dbInfo->get().master.id());
choose {
when(TLogRejoinReply rep =
wait(brokenPromiseToNever(self->dbInfo->get().master.tlogRejoin.getReply(req)))) {
@ -2333,13 +2331,13 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
wait(storage->init());
state Future<Optional<Value>> fFormat = storage->readValue(persistFormat.key);
state Future<Optional<Value>> fRecoveryLocation = storage->readValue(persistRecoveryLocationKey);
state Future<Standalone<VectorRef<KeyValueRef>>> fVers = storage->readRange(persistCurrentVersionKeys);
state Future<Standalone<VectorRef<KeyValueRef>>> fKnownCommitted = storage->readRange(persistKnownCommittedVersionKeys);
state Future<Standalone<VectorRef<KeyValueRef>>> fLocality = storage->readRange(persistLocalityKeys);
state Future<Standalone<VectorRef<KeyValueRef>>> fLogRouterTags = storage->readRange(persistLogRouterTagsKeys);
state Future<Standalone<VectorRef<KeyValueRef>>> fTxsTags = storage->readRange(persistTxsTagsKeys);
state Future<Standalone<VectorRef<KeyValueRef>>> fRecoverCounts = storage->readRange(persistRecoveryCountKeys);
state Future<Standalone<VectorRef<KeyValueRef>>> fProtocolVersions = storage->readRange(persistProtocolVersionKeys);
state Future<Standalone<RangeResultRef>> fVers = storage->readRange(persistCurrentVersionKeys);
state Future<Standalone<RangeResultRef>> fKnownCommitted = storage->readRange(persistKnownCommittedVersionKeys);
state Future<Standalone<RangeResultRef>> fLocality = storage->readRange(persistLocalityKeys);
state Future<Standalone<RangeResultRef>> fLogRouterTags = storage->readRange(persistLogRouterTagsKeys);
state Future<Standalone<RangeResultRef>> fTxsTags = storage->readRange(persistTxsTagsKeys);
state Future<Standalone<RangeResultRef>> fRecoverCounts = storage->readRange(persistRecoveryCountKeys);
state Future<Standalone<RangeResultRef>> fProtocolVersions = storage->readRange(persistProtocolVersionKeys);
// FIXME: metadata in queue?
@ -2358,7 +2356,7 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
}
if (!fFormat.get().present()) {
Standalone<VectorRef<KeyValueRef>> v = wait( self->persistentData->readRange( KeyRangeRef(StringRef(), LiteralStringRef("\xff")), 1 ) );
Standalone<RangeResultRef> v = wait( self->persistentData->readRange( KeyRangeRef(StringRef(), LiteralStringRef("\xff")), 1 ) );
if (!v.size()) {
TEST(true); // The DB is completely empty, so it was never initialized. Delete it.
throw worker_removed();
@ -2424,7 +2422,7 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
ProtocolVersion protocolVersion = BinaryReader::fromStringRef<ProtocolVersion>( fProtocolVersions.get()[idx].value, Unversioned() );
//We do not need the remoteTag, because we will not be loading any additional data
logData = Reference<LogData>( new LogData(self, recruited, Tag(), true, id_logRouterTags[id1], id_txsTags[id1], UID(), protocolVersion, std::vector<Tag>()) );
logData = Reference<LogData>( new LogData(self, recruited, Tag(), true, id_logRouterTags[id1], id_txsTags[id1], UID(), protocolVersion, std::vector<Tag>(), "Restored") );
logData->locality = id_locality[id1];
logData->stopped = true;
self->id_data[id1] = logData;
@ -2446,7 +2444,7 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
tagKeys = prefixRange( rawId.withPrefix(persistTagPoppedKeys.begin) );
loop {
if(logData->removed.isReady()) break;
Standalone<VectorRef<KeyValueRef>> data = wait( self->persistentData->readRange( tagKeys, BUGGIFY ? 3 : 1<<30, 1<<20 ) );
Standalone<RangeResultRef> data = wait( self->persistentData->readRange( tagKeys, BUGGIFY ? 3 : 1<<30, 1<<20 ) );
if (!data.size()) break;
((KeyRangeRef&)tagKeys) = KeyRangeRef( keyAfter(data.back().key, tagKeys.arena()), tagKeys.end );
@ -2631,7 +2629,8 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
stopAllTLogs(self, recruited.id());
state Reference<LogData> logData = Reference<LogData>( new LogData(self, recruited, req.remoteTag, req.isPrimary, req.logRouterTags, req.txsTags, req.recruitmentID, currentProtocolVersion, req.allTags) );
bool recovering = (req.recoverFrom.logSystemType == LogSystemType::tagPartitioned);
state Reference<LogData> logData = Reference<LogData>( new LogData(self, recruited, req.remoteTag, req.isPrimary, req.logRouterTags, req.txsTags, req.recruitmentID, currentProtocolVersion, req.allTags, recovering ? "Recovered" : "Recruited") );
self->id_data[recruited.id()] = logData;
logData->locality = req.locality;
logData->recoveryCount = req.epoch;
@ -2649,7 +2648,7 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
throw logData->removed.getError();
}
if (req.recoverFrom.logSystemType == LogSystemType::tagPartitioned) {
if (recovering) {
logData->unrecoveredBefore = req.startVersion;
logData->recoveredAt = req.recoverAt;
logData->knownCommittedVersion = req.startVersion - 1;
@ -2758,13 +2757,11 @@ ACTOR Future<Void> startSpillingInTenSeconds(TLogData* self, UID tlogId, Referen
}
// New tLog (if !recoverFrom.size()) or restore from network
ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk, Promise<Void> oldLog, Promise<Void> recovered, std::string folder, Reference<AsyncVar<bool>> degraded, Reference<AsyncVar<UID>> activeSharedTLog ) {
state TLogData self( tlogId, persistentData, persistentQueue, db, degraded, folder );
ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, UID workerID, bool restoreFromDisk, Promise<Void> oldLog, Promise<Void> recovered, std::string folder, Reference<AsyncVar<bool>> degraded, Reference<AsyncVar<UID>> activeSharedTLog ) {
state TLogData self( tlogId, workerID, persistentData, persistentQueue, db, degraded, folder );
state Future<Void> error = actorCollection( self.sharedActors.getFuture() );
TraceEvent("SharedTlog", tlogId);
// FIXME: Pass the worker id instead of stubbing it
startRole(Role::SHARED_TRANSACTION_LOG, tlogId, UID());
try {
if(restoreFromDisk) {
wait( restorePersistentState( &self, locality, oldLog, recovered, tlogRequests ) );
@ -2808,7 +2805,6 @@ ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQ
} catch (Error& e) {
self.terminated.send(Void());
TraceEvent("TLogError", tlogId).error(e, true);
endRole(Role::SHARED_TRANSACTION_LOG, tlogId, "Error", true);
if(recovered.canBeSet()) recovered.send(Void());
while(!tlogRequests.isEmpty()) {

View File

@ -4859,22 +4859,26 @@ public:
m_tree->set(keyValue);
}
Future< Standalone< VectorRef< KeyValueRef > > > readRange(KeyRangeRef keys, int rowLimit = 1<<30, int byteLimit = 1<<30) {
Future< Standalone< RangeResultRef > > readRange(KeyRangeRef keys, int rowLimit = 1<<30, int byteLimit = 1<<30) {
debug_printf("READRANGE %s\n", printable(keys).c_str());
return catchError(readRange_impl(this, keys, rowLimit, byteLimit));
}
ACTOR static Future< Standalone< VectorRef< KeyValueRef > > > readRange_impl(KeyValueStoreRedwoodUnversioned *self, KeyRange keys, int rowLimit, int byteLimit) {
ACTOR static Future< Standalone< RangeResultRef > > readRange_impl(KeyValueStoreRedwoodUnversioned *self, KeyRange keys, int rowLimit, int byteLimit) {
self->m_tree->counts.getRanges++;
state Standalone<VectorRef<KeyValueRef>> result;
state Standalone<RangeResultRef> result;
state int accumulatedBytes = 0;
ASSERT( byteLimit > 0 );
if(rowLimit == 0) {
return result;
}
state Reference<IStoreCursor> cur = self->m_tree->readAtVersion(self->m_tree->getLastCommittedVersion());
// Prefetch is currently only done in the forward direction
state int prefetchBytes = rowLimit > 1 ? byteLimit : 0;
if(rowLimit >= 0) {
if(rowLimit > 0) {
wait(cur->findFirstEqualOrGreater(keys.begin, prefetchBytes));
while(cur->isValid() && cur->getKey() < keys.end) {
KeyValueRef kv(KeyRef(result.arena(), cur->getKey()), ValueRef(result.arena(), cur->getValue()));
@ -4900,6 +4904,12 @@ public:
wait(cur->prev());
}
}
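// A sketch of the contract satisfied below: when the scan stops because of the row or byte limit rather than reaching
// the end of the requested range, 'more' is set and 'readThrough' records the last key actually returned, so the
// caller can resume the range read from just past that key.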
result.more = rowLimit == 0 || accumulatedBytes >= byteLimit;
if(result.more) {
ASSERT(result.size() > 0);
result.readThrough = result[result.size()-1].key;
}
return result;
}

View File

@ -418,7 +418,7 @@ private:
}
};
void startRole(const Role &role, UID roleId, UID workerId, std::map<std::string, std::string> details = std::map<std::string, std::string>(), std::string origination = "Recruited");
void startRole(const Role &role, UID roleId, UID workerId, const std::map<std::string, std::string> &details = std::map<std::string, std::string>(), const std::string &origination = "Recruited");
void endRole(const Role &role, UID id, std::string reason, bool ok = true, Error e = Error());
struct ServerDBInfo;
@ -455,8 +455,8 @@ ACTOR Future<Void> masterProxyServer(MasterProxyInterface proxy, InitializeMaste
Reference<AsyncVar<ServerDBInfo>> db, std::string whitelistBinPaths);
ACTOR Future<Void> tLog(IKeyValueStore* persistentData, IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality,
PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk,
Promise<Void> oldLog, Promise<Void> recovered, std::string folder,
PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, UID workerID,
bool restoreFromDisk, Promise<Void> oldLog, Promise<Void> recovered, std::string folder,
Reference<AsyncVar<bool>> degraded, Reference<AsyncVar<UID>> activeSharedTLog);
ACTOR Future<Void> monitorServerDBInfo(Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface,
@ -474,13 +474,13 @@ void updateCpuProfiler(ProfilerRequest req);
namespace oldTLog_4_6 {
ACTOR Future<Void> tLog(IKeyValueStore* persistentData, IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, UID tlogId);
Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, UID tlogId, UID workerID);
}
namespace oldTLog_6_0 {
ACTOR Future<Void> tLog(IKeyValueStore* persistentData, IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality,
PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk,
Promise<Void> oldLog, Promise<Void> recovered, std::string folder,
PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, UID workerID,
bool restoreFromDisk, Promise<Void> oldLog, Promise<Void> recovered, std::string folder,
Reference<AsyncVar<bool>> degraded, Reference<AsyncVar<UID>> activeSharedTLog);
}

View File

@ -665,8 +665,8 @@ ACTOR Future<Void> readTransactionSystemState( Reference<MasterData> self, Refer
TraceEvent("MasterRecovering", self->dbgid).detail("LastEpochEnd", self->lastEpochEnd).detail("RecoveryTransactionVersion", self->recoveryTransactionVersion);
Standalone<VectorRef<KeyValueRef>> rawConf = wait( self->txnStateStore->readRange( configKeys ) );
self->configuration.fromKeyValues( rawConf );
Standalone<RangeResultRef> rawConf = wait( self->txnStateStore->readRange( configKeys ) );
self->configuration.fromKeyValues( rawConf.castTo<VectorRef<KeyValueRef>>() );
self->originalConfiguration = self->configuration;
self->hasConfiguration = true;
@ -676,13 +676,13 @@ ACTOR Future<Void> readTransactionSystemState( Reference<MasterData> self, Refer
.detail("Conf", self->configuration.toString())
.trackLatest("RecoveredConfig");
Standalone<VectorRef<KeyValueRef>> rawLocalities = wait( self->txnStateStore->readRange( tagLocalityListKeys ) );
Standalone<RangeResultRef> rawLocalities = wait( self->txnStateStore->readRange( tagLocalityListKeys ) );
self->dcId_locality.clear();
for(auto& kv : rawLocalities) {
self->dcId_locality[decodeTagLocalityListKey(kv.key)] = decodeTagLocalityListValue(kv.value);
}
Standalone<VectorRef<KeyValueRef>> rawTags = wait( self->txnStateStore->readRange( serverTagKeys ) );
Standalone<RangeResultRef> rawTags = wait( self->txnStateStore->readRange( serverTagKeys ) );
self->allTags.clear();
if(self->forceRecovery) {
@ -699,7 +699,7 @@ ACTOR Future<Void> readTransactionSystemState( Reference<MasterData> self, Refer
}
}
Standalone<VectorRef<KeyValueRef>> rawHistoryTags = wait( self->txnStateStore->readRange( serverTagHistoryKeys ) );
Standalone<RangeResultRef> rawHistoryTags = wait( self->txnStateStore->readRange( serverTagHistoryKeys ) );
for(auto& kv : rawHistoryTags) {
self->allTags.push_back(decodeServerTagValue( kv.value ));
}
@ -722,13 +722,13 @@ ACTOR Future<Void> sendInitialCommitToResolvers( Reference<MasterData> self ) {
state Sequence txnSequence = 0;
ASSERT(self->recoveryTransactionVersion);
state Standalone<VectorRef<KeyValueRef>> data = self->txnStateStore->readRange(txnKeys, BUGGIFY ? 3 : SERVER_KNOBS->DESIRED_TOTAL_BYTES, SERVER_KNOBS->DESIRED_TOTAL_BYTES).get();
state Standalone<RangeResultRef> data = self->txnStateStore->readRange(txnKeys, BUGGIFY ? 3 : SERVER_KNOBS->DESIRED_TOTAL_BYTES, SERVER_KNOBS->DESIRED_TOTAL_BYTES).get();
state vector<Future<Void>> txnReplies;
state int64_t dataOutstanding = 0;
loop {
if(!data.size()) break;
((KeyRangeRef&)txnKeys) = KeyRangeRef( keyAfter(data.back().key, txnKeys.arena()), txnKeys.end );
Standalone<VectorRef<KeyValueRef>> nextData = self->txnStateStore->readRange(txnKeys, BUGGIFY ? 3 : SERVER_KNOBS->DESIRED_TOTAL_BYTES, SERVER_KNOBS->DESIRED_TOTAL_BYTES).get();
Standalone<RangeResultRef> nextData = self->txnStateStore->readRange(txnKeys, BUGGIFY ? 3 : SERVER_KNOBS->DESIRED_TOTAL_BYTES, SERVER_KNOBS->DESIRED_TOTAL_BYTES).get();
for(auto& r : self->proxies) {
TxnStateRequest req;

View File

@ -185,7 +185,7 @@ struct StorageServerDisk {
Future<Key> readNextKeyInclusive( KeyRef key ) { return readFirstKey(storage, KeyRangeRef(key, allKeys.end)); }
Future<Optional<Value>> readValue( KeyRef key, Optional<UID> debugID = Optional<UID>() ) { return storage->readValue(key, debugID); }
Future<Optional<Value>> readValuePrefix( KeyRef key, int maxLength, Optional<UID> debugID = Optional<UID>() ) { return storage->readValuePrefix(key, maxLength, debugID); }
Future<Standalone<VectorRef<KeyValueRef>>> readRange( KeyRangeRef keys, int rowLimit = 1<<30, int byteLimit = 1<<30 ) { return storage->readRange(keys, rowLimit, byteLimit); }
Future<Standalone<RangeResultRef>> readRange( KeyRangeRef keys, int rowLimit = 1<<30, int byteLimit = 1<<30 ) { return storage->readRange(keys, rowLimit, byteLimit); }
KeyValueStoreType getKeyValueStoreType() { return storage->getType(); }
StorageBytes getStorageBytes() { return storage->getStorageBytes(); }
@ -197,7 +197,7 @@ private:
void writeMutations( MutationListRef mutations, Version debugVersion, const char* debugContext );
ACTOR static Future<Key> readFirstKey( IKeyValueStore* storage, KeyRangeRef range ) {
Standalone<VectorRef<KeyValueRef>> r = wait( storage->readRange( range, 1 ) );
Standalone<RangeResultRef> r = wait( storage->readRange( range, 1 ) );
if (r.size()) return r[0].key;
else return range.end;
}
@ -1045,17 +1045,19 @@ void merge( Arena& arena, VectorRef<KeyValueRef, VecSerStrategy::String>& output
// Combines data from base (at an older version) with sets from newer versions in [start, end) and appends the first (up to) |limit| rows to output
// If limit<0, base and output are in descending order, and start->key()>end->key(), but start is still inclusive and end is exclusive
{
if (limit==0) return;
int originalLimit = abs(limit) + output.size();
ASSERT(limit != 0);
bool forward = limit>0;
if (!forward) limit = -limit;
int adjustedLimit = limit + output.size();
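// adjustedLimit caps output.size() at its current length plus at most |limit| newly merged rows, letting the three
// loops below test output.size() against a single bound instead of decrementing limit inside each loop condition.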
int accumulatedBytes = 0;
KeyValueRef const* baseStart = base.begin();
KeyValueRef const* baseEnd = base.end();
while (baseStart!=baseEnd && start!=end && --limit>=0 && accumulatedBytes < limitBytes) {
if (forward ? baseStart->key < start.key() : baseStart->key > start.key())
while (baseStart!=baseEnd && start!=end && output.size() < adjustedLimit && accumulatedBytes < limitBytes) {
if (forward ? baseStart->key < start.key() : baseStart->key > start.key()) {
output.push_back_deep( arena, *baseStart++ );
}
else {
output.push_back_deep( arena, KeyValueRef(start.key(), start->getValue()) );
if (baseStart->key == start.key()) ++baseStart;
@ -1063,18 +1065,17 @@ void merge( Arena& arena, VectorRef<KeyValueRef, VecSerStrategy::String>& output
}
accumulatedBytes += sizeof(KeyValueRef) + output.end()[-1].expectedSize();
}
while (baseStart!=baseEnd && --limit>=0 && accumulatedBytes < limitBytes) {
while (baseStart!=baseEnd && output.size() < adjustedLimit && accumulatedBytes < limitBytes) {
output.push_back_deep( arena, *baseStart++ );
accumulatedBytes += sizeof(KeyValueRef) + output.end()[-1].expectedSize();
}
if( !stopAtEndOfBase ) {
while (start!=end && --limit>=0 && accumulatedBytes < limitBytes) {
while (start!=end && output.size() < adjustedLimit && accumulatedBytes < limitBytes) {
output.push_back_deep( arena, KeyValueRef(start.key(), start->getValue()) );
accumulatedBytes += sizeof(KeyValueRef) + output.end()[-1].expectedSize();
if (forward) ++start; else --start;
}
}
ASSERT( output.size() <= originalLimit );
}
// readRange reads up to |limit| rows from the given range and version, combining data->storage and data->versionedData.
@ -1089,14 +1090,8 @@ ACTOR Future<GetKeyValuesReply> readRange( StorageServer* data, Version version,
state KeyRef readEnd;
state Key readBeginTemp;
state int vCount;
//state UID rrid = deterministicRandom()->randomUniqueID();
//state int originalLimit = limit;
//state int originalLimitBytes = *pLimitBytes;
//state bool track = rrid.first() == 0x1bc134c2f752187cLL;
// FIXME: Review pLimitBytes behavior
// if (limit >= 0) we are reading forward, else backward
if (limit >= 0) {
// We might care about a clear beginning before start that
// runs into range
@ -1108,20 +1103,7 @@ ACTOR Future<GetKeyValuesReply> readRange( StorageServer* data, Version version,
vStart = view.lower_bound(readBegin);
/*if (track) {
printf("readRange(%llx, @%lld, '%s'-'%s')\n", data->thisServerID.first(), version, printable(range.begin).c_str(), printable(range.end).c_str());
printf("mvcc:\n");
vEnd = view.upper_bound(range.end);
for(auto r=vStart; r != vEnd; ++r) {
if (r->isClearTo())
printf(" '%s'-'%s' cleared\n", printable(r.key()).c_str(), printable(r->getEndKey()).c_str());
else
printf(" '%s' := '%s'\n", printable(r.key()).c_str(), printable(r->getValue()).c_str());
}
}*/
while (limit>0 && *pLimitBytes>0 && readBegin < range.end) {
// ASSERT( vStart == view.lower_bound(readBegin) );
ASSERT( !vStart || vStart.key() >= readBegin );
if (vStart) { auto b = vStart; --b; ASSERT( !b || b.key() < readBegin ); }
ASSERT( data->storageVersion() <= version );
@ -1138,93 +1120,58 @@ ACTOR Future<GetKeyValuesReply> readRange( StorageServer* data, Version version,
// Read the data on disk up to vEnd (or the end of the range)
readEnd = vEnd ? std::min( vEnd.key(), range.end ) : range.end;
Standalone<VectorRef<KeyValueRef>> atStorageVersion = wait(
Standalone<RangeResultRef> atStorageVersion = wait(
data->storage.readRange( KeyRangeRef(readBegin, readEnd), limit, *pLimitBytes ) );
/*if (track) {
printf("read [%s,%s): %d rows\n", printable(readBegin).c_str(), printable(readEnd).c_str(), atStorageVersion.size());
for(auto r=atStorageVersion.begin(); r != atStorageVersion.end(); ++r)
printf(" '%s' := '%s'\n", printable(r->key).c_str(), printable(r->value).c_str());
}*/
ASSERT( atStorageVersion.size() <= limit );
if (data->storageVersion() > version) throw transaction_too_old();
bool more = atStorageVersion.size()!=0;
// merge the sets in [vStart,vEnd) with the sets on disk, stopping at the last key from disk if there is 'more'
// merge the sets in [vStart,vEnd) with the sets on disk, stopping at the last key from disk if we were limited
int prevSize = result.data.size();
merge( result.arena, result.data, atStorageVersion, vStart, vEnd, vCount, limit, more, *pLimitBytes );
merge( result.arena, result.data, atStorageVersion, vStart, vEnd, vCount, limit, atStorageVersion.more, *pLimitBytes );
limit -= result.data.size() - prevSize;
for (auto i = result.data.begin() + prevSize; i != result.data.end(); i++)
*pLimitBytes -= sizeof(KeyValueRef) + i->expectedSize();
// Setup for the next iteration
if (more) { // if there might be more data, begin reading right after what we already found to find out
//if (track) printf("more\n");
if (!(limit<=0 || *pLimitBytes<=0 || result.data.end()[-1].key == atStorageVersion.end()[-1].key))
TraceEvent(SevError, "ReadRangeIssue", data->thisServerID).detail("ReadBegin", readBegin).detail("ReadEnd", readEnd)
.detail("VStart", vStart ? vStart.key() : LiteralStringRef("nil")).detail("VEnd", vEnd ? vEnd.key() : LiteralStringRef("nil"))
.detail("AtStorageVersionBack", atStorageVersion.end()[-1].key).detail("ResultBack", result.data.end()[-1].key)
.detail("Limit", limit).detail("LimitBytes", *pLimitBytes).detail("ResultSize", result.data.size()).detail("PrevSize", prevSize);
readBegin = readBeginTemp = keyAfter( result.data.end()[-1].key );
ASSERT( limit<=0 || *pLimitBytes<=0 || result.data.end()[-1].key == atStorageVersion.end()[-1].key );
} else if (vStart && vStart->isClearTo()){ // if vStart is a clear, skip it.
//if (track) printf("skip clear\n");
readBegin = vStart->getEndKey(); // next disk read should start at the end of the clear
++vStart;
} else { // Otherwise, continue at readEnd
//if (track) printf("continue\n");
readBegin = readEnd;
}
}
// all but the last item are less than *pLimitBytes
ASSERT( result.data.size() == 0 || *pLimitBytes + result.data.end()[-1].expectedSize() + sizeof(KeyValueRef) > 0 );
/*if (*pLimitBytes <= 0)
TraceEvent(SevWarn, "ReadRangeLimitExceeded")
.detail("Version", version)
.detail("Begin", range.begin )
.detail("End", range.end )
.detail("LimitReamin", limit)
.detail("LimitBytesRemain", *pLimitBytes); */
if (limit <=0 || *pLimitBytes <= 0) {
break;
}
/*GetKeyValuesReply correct = wait( readRangeOld(data, version, range, originalLimit, originalLimitBytes) );
bool prefix_equal = true;
int totalsize = 0;
int first_difference = -1;
for(int i=0; i<result.data.size() && i<correct.data.size(); i++) {
if (result.data[i] != correct.data[i]) {
first_difference = i;
prefix_equal = false;
// If we hit our limits reading from disk but then combining with MVCC gave us back more room
if (atStorageVersion.more) {
ASSERT(result.data.end()[-1].key == atStorageVersion.end()[-1].key);
readBegin = readBeginTemp = keyAfter(result.data.end()[-1].key);
} else if (vEnd && vEnd->isClearTo()) {
ASSERT(vStart == vEnd); // vStart will have been advanced by merge()
ASSERT(vEnd->getEndKey() > readBegin);
readBegin = vEnd->getEndKey();
++vStart;
} else {
ASSERT(readEnd == range.end);
break;
}
totalsize += result.data[i].expectedSize() + sizeof(KeyValueRef);
}
// for the following check
result.more = limit == 0 || *pLimitBytes<=0; // FIXME: Does this have to be exact?
result.version = version;
if ( !(totalsize>originalLimitBytes ? prefix_equal : result.data==correct.data) || correct.more != result.more ) {
TraceEvent(SevError, "IncorrectResult", rrid).detail("Server", data->thisServerID).detail("CorrectRows", correct.data.size())
.detail("FirstDifference", first_difference).detail("OriginalLimit", originalLimit)
.detail("ResultRows", result.data.size()).detail("Result0", result.data[0].key).detail("Correct0", correct.data[0].key)
.detail("ResultN", result.data.size() ? result.data[std::min(correct.data.size(),result.data.size())-1].key : "nil")
.detail("CorrectN", correct.data.size() ? correct.data[std::min(correct.data.size(),result.data.size())-1].key : "nil");
}*/
} else {
// Reverse read - abandon hope alle ye who enter here
readEnd = range.end;
vStart = view.lastLess(readEnd);
vStart = view.lastLess(range.end);
// A clear might extend all the way to range.end
if (vStart && vStart->isClearTo() && vStart->getEndKey() >= readEnd) {
if (vStart && vStart->isClearTo() && vStart->getEndKey() >= range.end) {
readEnd = vStart.key();
--vStart;
} else {
readEnd = range.end;
}
while (limit < 0 && *pLimitBytes > 0 && readEnd > range.begin) {
ASSERT(!vStart || vStart.key() < readEnd);
if (vStart) {
auto b = vStart;
++b;
ASSERT(!b || b.key() >= readEnd);
}
ASSERT(data->storageVersion() <= version);
vEnd = vStart;
vCount = 0;
int vSize=0;
@ -1234,30 +1181,42 @@ ACTOR Future<GetKeyValuesReply> readRange( StorageServer* data, Version version,
--vEnd;
}
readBegin = range.begin;
if (vEnd)
readBegin = std::max( readBegin, vEnd->isClearTo() ? vEnd->getEndKey() : vEnd.key() );
readBegin = vEnd ? std::max(vEnd->isClearTo() ? vEnd->getEndKey() : vEnd.key(), range.begin) : range.begin;
Standalone<RangeResultRef> atStorageVersion =
wait(data->storage.readRange(KeyRangeRef(readBegin, readEnd), limit, *pLimitBytes));
Standalone<VectorRef<KeyValueRef>> atStorageVersion = wait( data->storage.readRange( KeyRangeRef(readBegin, readEnd), limit ) );
ASSERT(atStorageVersion.size() <= -limit);
if (data->storageVersion() > version) throw transaction_too_old();
int prevSize = result.data.size();
merge( result.arena, result.data, atStorageVersion, vStart, vEnd, vCount, limit, false, *pLimitBytes );
merge(result.arena, result.data, atStorageVersion, vStart, vEnd, vCount, limit, atStorageVersion.more, *pLimitBytes);
limit += result.data.size() - prevSize;
for (auto i = result.data.begin() + prevSize; i != result.data.end(); i++)
*pLimitBytes -= sizeof(KeyValueRef) + i->expectedSize();
vStart = vEnd;
readEnd = readBegin;
if (limit >=0 || *pLimitBytes <= 0) {
break;
}
if (vStart && vStart->isClearTo()) {
ASSERT( vStart.key() < readEnd );
readEnd = vStart.key();
if (atStorageVersion.more) {
ASSERT(result.data.end()[-1].key == atStorageVersion.end()[-1].key);
readEnd = result.data.end()[-1].key;
} else if (vEnd && vEnd->isClearTo()) {
ASSERT(vStart == vEnd);
ASSERT(vEnd.key() < readEnd);
readEnd = vEnd.key();
--vStart;
} else {
ASSERT(readBegin == range.begin);
break;
}
}
}
// all but the last item are less than *pLimitBytes
ASSERT(result.data.size() == 0 || *pLimitBytes + result.data.end()[-1].expectedSize() + sizeof(KeyValueRef) > 0);
result.more = limit == 0 || *pLimitBytes<=0; // FIXME: Does this have to be exact?
result.version = version;
return result;
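The theme of this hunk: the storage engine's readRange now returns Standalone<RangeResultRef>, whose more flag states explicitly whether the disk read was cut short by the row or byte limit, replacing the old non-empty-result heuristic (bool more = atStorageVersion.size()!=0). The reply's own more flag is then derived from the exhausted limits and tells the caller to continue from just past the last returned key. A hedged sketch of that continuation pattern against an IKeyValueStore (assuming the usual flow/actor includes; countRows is an invented name, not part of this change):

ACTOR Future<int64_t> countRows(IKeyValueStore* kvStore, KeyRange range) {
	state Key begin = range.begin;
	state int64_t total = 0;
	loop {
		Standalone<RangeResultRef> page =
		    wait(kvStore->readRange(KeyRangeRef(begin, range.end), 1000));
		total += page.size();
		// The more flag replaces the old "did we get a full batch?" heuristic.
		if (!page.more || page.size() == 0) break;
		begin = keyAfter(page.end()[-1].key); // resume just past the last returned key
	}
	return total;
}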
@ -3036,8 +2995,8 @@ ACTOR Future<Void> applyByteSampleResult( StorageServer* data, IKeyValueStore* s
state int totalKeys = 0;
state int totalBytes = 0;
loop {
Standalone<VectorRef<KeyValueRef>> bs = wait( storage->readRange( KeyRangeRef(begin, end), SERVER_KNOBS->STORAGE_LIMIT_BYTES, SERVER_KNOBS->STORAGE_LIMIT_BYTES ) );
if(results) results->push_back(bs);
Standalone<RangeResultRef> bs = wait( storage->readRange( KeyRangeRef(begin, end), SERVER_KNOBS->STORAGE_LIMIT_BYTES, SERVER_KNOBS->STORAGE_LIMIT_BYTES ) );
if(results) results->push_back(bs.castTo<VectorRef<KeyValueRef>>());
int rangeSize = bs.expectedSize();
totalFetches++;
totalKeys += bs.size();
@ -3118,8 +3077,8 @@ ACTOR Future<bool> restoreDurableState( StorageServer* data, IKeyValueStore* sto
state Future<Optional<Value>> fVersion = storage->readValue(persistVersion);
state Future<Optional<Value>> fLogProtocol = storage->readValue(persistLogProtocol);
state Future<Optional<Value>> fPrimaryLocality = storage->readValue(persistPrimaryLocality);
state Future<Standalone<VectorRef<KeyValueRef>>> fShardAssigned = storage->readRange(persistShardAssignedKeys);
state Future<Standalone<VectorRef<KeyValueRef>>> fShardAvailable = storage->readRange(persistShardAvailableKeys);
state Future<Standalone<RangeResultRef>> fShardAssigned = storage->readRange(persistShardAssignedKeys);
state Future<Standalone<RangeResultRef>> fShardAvailable = storage->readRange(persistShardAvailableKeys);
state Promise<Void> byteSampleSampleRecovered;
state Promise<Void> startByteSampleRestore;
@ -3156,7 +3115,7 @@ ACTOR Future<bool> restoreDurableState( StorageServer* data, IKeyValueStore* sto
debug_checkRestoredVersion( data->thisServerID, version, "StorageServer" );
data->setInitialVersion( version );
state Standalone<VectorRef<KeyValueRef>> available = fShardAvailable.get();
state Standalone<RangeResultRef> available = fShardAvailable.get();
state int availableLoc;
for(availableLoc=0; availableLoc<available.size(); availableLoc++) {
KeyRangeRef keys(
@ -3170,7 +3129,7 @@ ACTOR Future<bool> restoreDurableState( StorageServer* data, IKeyValueStore* sto
wait(yield());
}
state Standalone<VectorRef<KeyValueRef>> assigned = fShardAssigned.get();
state Standalone<RangeResultRef> assigned = fShardAssigned.get();
state int assignedLoc;
for(assignedLoc=0; assignedLoc<assigned.size(); assignedLoc++) {
KeyRangeRef keys(
@ -3361,6 +3320,7 @@ ACTOR Future<Void> waitMetrics( StorageServerMetrics* self, WaitMetricsRequest r
if( timedout ) {
TEST( true ); // ShardWaitMetrics return on timeout
//FIXME: instead of using random chance, send wrong_shard_server when the call is from waitMetricsMultiple (requires additional information in the request)
if(deterministicRandom()->random01() < SERVER_KNOBS->WAIT_METRICS_WRONG_SHARD_CHANCE) {
req.reply.sendError( wrong_shard_server() );
} else {

View File

@ -618,7 +618,7 @@ Standalone<StringRef> roleString(std::set<std::pair<std::string, std::string>> r
return StringRef(result);
}
void startRole(const Role &role, UID roleId, UID workerId, std::map<std::string, std::string> details, std::string origination) {
void startRole(const Role &role, UID roleId, UID workerId, const std::map<std::string, std::string> &details, const std::string &origination) {
if(role.includeInTraceRoles) {
addTraceRole(role.abbreviation);
}
@ -921,7 +921,7 @@ ACTOR Future<Void> workerServer(
auto& logData = sharedLogs[std::make_tuple(s.tLogOptions.version, s.storeType, s.tLogOptions.spillType)];
// FIXME: If logData.actor isValid but !isReady, shouldn't we be sending a
// fake InitializeTLogRequest rather than calling tLog()?
Future<Void> tl = tLogFn( kv, queue, dbInfo, locality, !logData.actor.isValid() || logData.actor.isReady() ? logData.requests : PromiseStream<InitializeTLogRequest>(), s.storeID, true, oldLog, recovery, folder, degraded, activeSharedTLog );
Future<Void> tl = tLogFn( kv, queue, dbInfo, locality, !logData.actor.isValid() || logData.actor.isReady() ? logData.requests : PromiseStream<InitializeTLogRequest>(), s.storeID, interf.id(), true, oldLog, recovery, folder, degraded, activeSharedTLog );
recoveries.push_back(recovery.getFuture());
activeSharedTLog->set(s.storeID);
@ -1087,7 +1087,7 @@ ACTOR Future<Void> workerServer(
filesClosed.add( data->onClosed() );
filesClosed.add( queue->onClosed() );
Future<Void> tLogCore = tLogFn( data, queue, dbInfo, locality, logData.requests, logId, false, Promise<Void>(), Promise<Void>(), folder, degraded, activeSharedTLog );
Future<Void> tLogCore = tLogFn( data, queue, dbInfo, locality, logData.requests, logId, interf.id(), false, Promise<Void>(), Promise<Void>(), folder, degraded, activeSharedTLog );
tLogCore = handleIOErrors( tLogCore, data, logId );
tLogCore = handleIOErrors( tLogCore, queue, logId );
errorForwarders.add( forwardError( errors, Role::SHARED_TRANSACTION_LOG, logId, tLogCore ) );
@ -1383,37 +1383,46 @@ ACTOR Future<UID> createAndLockProcessIdFile(std::string folder) {
state UID processIDUid;
platform::createDirectory(folder);
try {
state std::string lockFilePath = joinPath(folder, "processId");
state ErrorOr<Reference<IAsyncFile>> lockFile = wait(errorOr(IAsyncFileSystem::filesystem(g_network)->open(lockFilePath, IAsyncFile::OPEN_READWRITE | IAsyncFile::OPEN_LOCK, 0600)));
loop {
try {
state std::string lockFilePath = joinPath(folder, "processId");
state ErrorOr<Reference<IAsyncFile>> lockFile = wait(errorOr(IAsyncFileSystem::filesystem(g_network)->open(lockFilePath, IAsyncFile::OPEN_READWRITE | IAsyncFile::OPEN_LOCK, 0600)));
if (lockFile.isError() && lockFile.getError().code() == error_code_file_not_found && !fileExists(lockFilePath)) {
Reference<IAsyncFile> _lockFile = wait(IAsyncFileSystem::filesystem()->open(lockFilePath, IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_CREATE | IAsyncFile::OPEN_LOCK | IAsyncFile::OPEN_READWRITE, 0600));
lockFile = _lockFile;
processIDUid = deterministicRandom()->randomUniqueID();
BinaryWriter wr(IncludeVersion());
wr << processIDUid;
wait(lockFile.get()->write(wr.getData(), wr.getLength(), 0));
wait(lockFile.get()->sync());
}
else {
if (lockFile.isError()) throw lockFile.getError(); // If we've failed to open the file, throw an exception
if (lockFile.isError() && lockFile.getError().code() == error_code_file_not_found && !fileExists(lockFilePath)) {
Reference<IAsyncFile> _lockFile = wait(IAsyncFileSystem::filesystem()->open(lockFilePath, IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_CREATE | IAsyncFile::OPEN_LOCK | IAsyncFile::OPEN_READWRITE, 0600));
lockFile = _lockFile;
processIDUid = deterministicRandom()->randomUniqueID();
BinaryWriter wr(IncludeVersion());
wr << processIDUid;
wait(lockFile.get()->write(wr.getData(), wr.getLength(), 0));
wait(lockFile.get()->sync());
}
else {
if (lockFile.isError()) throw lockFile.getError(); // If we've failed to open the file, throw an exception
int64_t fileSize = wait(lockFile.get()->size());
state Key fileData = makeString(fileSize);
wait(success(lockFile.get()->read(mutateString(fileData), fileSize, 0)));
processIDUid = BinaryReader::fromStringRef<UID>(fileData, IncludeVersion());
int64_t fileSize = wait(lockFile.get()->size());
state Key fileData = makeString(fileSize);
wait(success(lockFile.get()->read(mutateString(fileData), fileSize, 0)));
processIDUid = BinaryReader::fromStringRef<UID>(fileData, IncludeVersion());
}
return processIDUid;
}
}
catch (Error& e) {
if (e.code() != error_code_actor_cancelled) {
if (!e.isInjectedFault())
catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
throw;
}
if (!e.isInjectedFault()) {
fprintf(stderr, "ERROR: error creating or opening process id file `%s'.\n", joinPath(folder, "processId").c_str());
}
TraceEvent(SevError, "OpenProcessIdError").error(e);
if(!g_network->isSimulated()) {
throw;
}
deleteFile(lockFilePath);
}
throw;
}
return processIDUid;
}
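The restructuring above moves the create-or-read logic for the processId file into a retry loop: cancellation is rethrown immediately, any other error is fatal outside simulation, and in simulation the possibly partially written file is deleted and the attempt repeats. A rough stand-alone sketch of the same create-or-read-with-bounded-retry idea, using plain C++ streams and invented names rather than the flow IAsyncFile API:

#include <cstdio>
#include <fstream>
#include <random>
#include <stdexcept>
#include <string>

// Hypothetical sketch: return the ID stored in idPath, creating the file with
// a fresh random ID if it does not exist; on failure, delete the file and retry.
std::string loadOrCreateProcessId(const std::string& idPath, int maxAttempts = 3) {
	for (int attempt = 0; attempt < maxAttempts; ++attempt) {
		try {
			std::ifstream in(idPath);
			std::string id;
			if (in && std::getline(in, id) && !id.empty()) return id;

			// File missing or unreadable: write a new random ID.
			std::random_device rd;
			id = std::to_string(((unsigned long long)rd() << 32) | rd());
			std::ofstream out(idPath, std::ios::trunc);
			if (!out) throw std::runtime_error("cannot create " + idPath);
			out << id << '\n';
			return id;
		} catch (const std::exception&) {
			std::remove(idPath.c_str()); // discard a possibly corrupt file and retry
		}
	}
	throw std::runtime_error("failed to create or read " + idPath);
}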
ACTOR Future<Void> fdbd(

View File

@ -46,7 +46,7 @@ struct DDMetricsExcludeWorkload : TestWorkload {
ACTOR static Future<double> getMovingDataAmount(Database cx, DDMetricsExcludeWorkload* self) {
try {
StatusObject statusObj = wait(StatusClient::statusFetcher(cx->getConnectionFile()));
StatusObject statusObj = wait(StatusClient::statusFetcher(cx));
StatusObjectReader statusObjCluster;
((StatusObjectReader)statusObj).get("cluster", statusObjCluster);
StatusObjectReader statusObjData;

View File

@ -270,7 +270,7 @@ ACTOR Future<Void> testKVStoreMain( KVStoreTestWorkload* workload, KVTest* ptest
state Key k;
state double cst = timer();
while (true) {
Standalone<VectorRef<KeyValueRef>> kv = wait( test.store->readRange( KeyRangeRef(k, LiteralStringRef("\xff\xff\xff\xff")), 1000 ) );
Standalone<RangeResultRef> kv = wait( test.store->readRange( KeyRangeRef(k, LiteralStringRef("\xff\xff\xff\xff")), 1000 ) );
count += kv.size();
if (kv.size() < 1000) break;
k = keyAfter( kv[ kv.size()-1 ].key );

View File

@ -69,7 +69,7 @@ struct StatusWorkload : TestWorkload {
if (clientId != 0)
return Void();
return success(timeout(fetcher(cx->getConnectionFile(), this), testDuration));
return success(timeout(fetcher(cx, this), testDuration));
}
virtual Future<bool> check(Database const& cx) {
return errors.getValue() == 0;
@ -161,7 +161,7 @@ struct StatusWorkload : TestWorkload {
}
}
ACTOR Future<Void> fetcher(Reference<ClusterConnectionFile> connFile, StatusWorkload *self) {
ACTOR Future<Void> fetcher(Database cx, StatusWorkload *self) {
state double lastTime = now();
loop{
@ -170,7 +170,7 @@ struct StatusWorkload : TestWorkload {
// Since we count the requests that start, we could potentially never really hear back?
++self->requests;
state double issued = now();
StatusObject result = wait(StatusClient::statusFetcher(connFile));
StatusObject result = wait(StatusClient::statusFetcher(cx));
++self->replies;
BinaryWriter br(AssumeVersion(currentProtocolVersion));
save(br, result);

View File

@ -519,6 +519,10 @@ public:
}
#endif
template <class U> Standalone<U> castTo() const {
return Standalone<U>(*this, arena());
}
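The new castTo helper rebinds a Standalone<T> to a different ref type U over the same arena, which is what lets call sites that still expect VectorRef<KeyValueRef> (such as applyByteSampleResult earlier in this diff) keep working after readRange switched to RangeResultRef. A small usage sketch; storage, someRange, rowLimit, and byteLimit are placeholders, and the lines are assumed to sit inside an actor:

// RangeResultRef derives from VectorRef<KeyValueRef>, so the cast only changes
// the static type; the arena backing the keys and values is shared, not copied.
Standalone<RangeResultRef> page = wait(storage->readRange(someRange, rowLimit, byteLimit));
Standalone<VectorRef<KeyValueRef>> rows = page.castTo<VectorRef<KeyValueRef>>();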
template <class Archive>
void serialize(Archive& ar) {
// FIXME: something like BinaryReader(ar) >> arena >> *(T*)this; to guarantee standalone arena???

View File

@ -105,6 +105,7 @@ FlowKnobs::FlowKnobs(bool randomize, bool isSimulated) {
//GenericActors
init( BUGGIFY_FLOW_LOCK_RELEASE_DELAY, 1.0 );
init( LOW_PRIORITY_DELAY_COUNT, 5 );
//IAsyncFile
init( INCREMENTAL_DELETE_TRUNCATE_AMOUNT, 5e8 ); //500MB

View File

@ -125,6 +125,7 @@ public:
//GenericActors
double BUGGIFY_FLOW_LOCK_RELEASE_DELAY;
int LOW_PRIORITY_DELAY_COUNT;
//IAsyncFile
int64_t INCREMENTAL_DELETE_TRUNCATE_AMOUNT;

View File

@ -122,6 +122,7 @@ public:
// INetwork interface
virtual double now() { return currentTime; };
virtual double timer() { return ::timer(); };
virtual Future<Void> delay( double seconds, TaskPriority taskId );
virtual Future<class Void> yield( TaskPriority taskID );
virtual bool check_yield(TaskPriority taskId);

View File

@ -83,3 +83,12 @@ ACTOR Future<bool> quorumEqualsTrue( std::vector<Future<bool>> futures, int requ
}
}
}
ACTOR Future<Void> lowPriorityDelay( double waitTime ) {
state int loopCount = 0;
while(loopCount < FLOW_KNOBS->LOW_PRIORITY_DELAY_COUNT) {
wait(delay(waitTime/FLOW_KNOBS->LOW_PRIORITY_DELAY_COUNT, TaskPriority::Low));
loopCount++;
}
return Void();
}
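lowPriorityDelay splits one long wait into LOW_PRIORITY_DELAY_COUNT shorter delays at TaskPriority::Low (five by default, per the knob added above), so a long background sleep gives the run loop several chances to schedule higher-priority work in between. A hedged usage sketch; the surrounding actor and doPeriodicWork are assumptions, not code from this change:

// With LOW_PRIORITY_DELAY_COUNT = 5, a 10-second interval becomes five
// 2-second delays, each scheduled at TaskPriority::Low.
ACTOR Future<Void> periodicBackgroundTask(double intervalSeconds) {
	loop {
		wait(lowPriorityDelay(intervalSeconds));
		doPeriodicWork(); // hypothetical low-priority maintenance work
	}
}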

View File

@ -823,6 +823,7 @@ Future<Void> anyTrue( std::vector<Reference<AsyncVar<bool>>> const& input, Refer
Future<Void> cancelOnly( std::vector<Future<Void>> const& futures );
Future<Void> timeoutWarningCollector( FutureStream<Void> const& input, double const& logDelay, const char* const& context, UID const& id );
Future<bool> quorumEqualsTrue( std::vector<Future<bool>> const& futures, int const& required );
Future<Void> lowPriorityDelay( double const& waitTime );
ACTOR template <class T>
Future<Void> streamHelper( PromiseStream<T> output, PromiseStream<Error> errors, Future<T> input ) {

View File

@ -59,12 +59,11 @@ enum class TaskPriority {
TLogCommitReply = 8580,
TLogCommit = 8570,
ProxyGetRawCommittedVersion = 8565,
ProxyCommitYield3 = 8562,
ProxyTLogCommitReply = 8560,
ProxyMasterVersionReply = 8560,
ProxyCommitYield2 = 8557,
ProxyResolverReply = 8555,
ProxyMasterVersionReply = 8550,
ProxyCommitYield1 = 8547,
ProxyTLogCommitReply = 8555,
ProxyCommitYield1 = 8550,
ProxyResolverReply = 8547,
ProxyCommit = 8545,
ProxyCommitBatcher = 8540,
TLogConfirmRunningReply = 8530,
@ -420,6 +419,10 @@ public:
// Provides a clock that advances at a similar rate on all connected endpoints
// FIXME: Return a fixed point Time class
virtual double timer() = 0;
// A wrapper for directly getting the system time. The time returned by now() only updates in the run loop,
// so it cannot be used to measure times of functions that do not have wait statements.
virtual Future<class Void> delay( double seconds, TaskPriority taskID ) = 0;
// The given future will be set after seconds have elapsed
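The comment added above captures the distinction behind exposing both clocks on this interface: now() is the run-loop clock and only advances between waits, while timer() reads the system clock directly. An illustrative fragment (expensiveSynchronousWork is a placeholder; the lines are assumed to sit inside ordinary flow code):

double start = timer();        // system clock; keeps advancing during blocking work
expensiveSynchronousWork();    // hypothetical CPU-bound call containing no wait()s
double elapsed = timer() - start;
// now() - start would report roughly zero here, because now() only updates in the run loop.
TraceEvent("SlowSectionTimed").detail("ElapsedSeconds", elapsed);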

View File

@ -282,7 +282,7 @@ struct _IncludeVersion {
ar >> v;
if (!v.isValid()) {
auto err = incompatible_protocol_version();
TraceEvent(SevError, "InvalidSerializationVersion").error(err).detailf("Version", "%llx", v);
TraceEvent(SevWarnAlways, "InvalidSerializationVersion").error(err).detailf("Version", "%llx", v);
throw err;
}
if (v > currentProtocolVersion) {

View File

@ -19,6 +19,10 @@ Requires: foundationdb-clients = %{version}-%{release}
Conflicts: foundationdb < 0.1.4
ifdef(`RHEL6', `Requires(post): chkconfig >= 0.9, /sbin/service')
Requires(pre): /usr/sbin/useradd, /usr/sbin/groupadd, /usr/bin/getent
# This is a heavy hammer for removing /usr/bin/python as a dependency,
# since it also removes dependencies like glibc. However, none of the
# other strategies (__requires_exclude) seem to work.
AutoReq: 0
%package clients
Summary: FoundationDB clients and library