Merge pull request #4105 from sfc-gh-anoyes/anoyes/release-6.3-merge

Merge release-6.2 into release-6.3 and fix conflicts
This commit is contained in:
Markus Pilman 2020-11-25 11:16:05 -07:00 committed by GitHub
commit 18ba83fc3d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 611 additions and 218 deletions

View File

@ -2,12 +2,16 @@
Release Notes
#############
6.2.29
======
* Fix invalid memory access on data distributor when snapshotting large clusters. `(PR #4076) <https://github.com/apple/foundationdb/pull/4076>`_
* Add human-readable DateTime to trace events `(PR #4087) <https://github.com/apple/foundationdb/pull/4087>`_
6.2.28
======
* Log detailed team collection information when median available space ratio of all teams is too low. `(PR #3912) <https://github.com/apple/foundationdb/pull/3912>`_
* Bug fix, blob client did not support authentication key sizes over 64 bytes. `(PR #3964) <https://github.com/apple/foundationdb/pull/3964>`_
6.2.27
======
* For clusters with a large number of shards, avoid slow tasks in the data distributor by adding yields to the shard map destruction. `(PR #3834) <https://github.com/apple/foundationdb/pull/3834>`_

View File

@ -37,16 +37,16 @@ typedef StringRef ValueRef;
typedef int64_t Generation;
enum {
tagLocalitySpecial = -1,
tagLocalitySpecial = -1, // tag with this locality means it is invalidTag (id=0), txsTag (id=1), or cacheTag (id=2)
tagLocalityLogRouter = -2,
tagLocalityRemoteLog = -3,
tagLocalityRemoteLog = -3, // tag created by log router for remote tLogs
tagLocalityUpgraded = -4,
tagLocalitySatellite = -5,
tagLocalityLogRouterMapped = -6, // used by log router to pop from TLogs
tagLocalityLogRouterMapped = -6, // The pseudo tag used by log routers to pop the real LogRouter tag (i.e., -2)
tagLocalityTxs = -7,
tagLocalityBackup = -8, // used by backup role to pop from TLogs
tagLocalityInvalid = -99
}; //The TLog and LogRouter require these number to be as compact as possible
}; // The TLog and LogRouter require these number to be as compact as possible
inline bool isPseudoLocality(int8_t locality) {
return locality == tagLocalityLogRouterMapped || locality == tagLocalityBackup;
@ -54,6 +54,11 @@ inline bool isPseudoLocality(int8_t locality) {
#pragma pack(push, 1)
struct Tag {
// if locality > 0,
// locality decides which DC id the tLog is in;
// id decides which SS owns the tag; id <-> SS mapping is in the system keyspace: serverTagKeys.
// if locality < 0, locality decides the type of tLog set: satellite, LR, or remote tLog, etc.
// id decides which tLog in the tLog type will be used.
int8_t locality;
uint16_t id;

View File

@ -1515,6 +1515,12 @@ ACTOR Future< vector< pair<KeyRange,Reference<LocationInfo>> > > getKeyRangeLoca
}
}
// Get the SS locations for each shard in the 'keys' key-range;
// Returned vector size is the number of shards in the input keys key-range.
// Returned vector element is <ShardRange, storage server location info> pairs, where
// ShardRange is the whole shard key-range, not a part of the given key range.
// Example: If query the function with key range (b, d), the returned list of pairs could be something like:
// [([a, b1), locationInfo), ([b1, c), locationInfo), ([c, d1), locationInfo)].
template <class F>
Future< vector< pair<KeyRange,Reference<LocationInfo>> > > getKeyRangeLocations( Database const& cx, KeyRange const& keys, int limit, bool reverse, F StorageServerInterface::*member, TransactionInfo const& info ) {
ASSERT (!keys.empty());

View File

@ -171,14 +171,11 @@ void addLaggingRequest(Future<Optional<Reply>> reply, Promise<Void> requestFinis
// failMon's information for load balancing and avoiding failed servers
// If ALL the servers are failed and the list of servers is not fresh, throws an exception to let the caller refresh the list of servers
ACTOR template <class Interface, class Request, class Multi>
Future< REPLY_TYPE(Request) > loadBalance(
Reference<MultiInterface<Multi>> alternatives,
RequestStream<Request> Interface::* channel,
Request request = Request(),
TaskPriority taskID = TaskPriority::DefaultPromiseEndpoint,
bool atMostOnce = false, // if true, throws request_maybe_delivered() instead of retrying automatically
QueueModel* model = NULL)
{
Future<REPLY_TYPE(Request)> loadBalance(
Reference<MultiInterface<Multi>> alternatives, RequestStream<Request> Interface::*channel,
Request request = Request(), TaskPriority taskID = TaskPriority::DefaultPromiseEndpoint,
bool atMostOnce = false, // if true, throws request_maybe_delivered() instead of retrying automatically
QueueModel* model = NULL) {
state Future<Optional<REPLY_TYPE(Request)>> firstRequest;
state Optional<uint64_t> firstRequestEndpoint;
state Future<Optional<REPLY_TYPE(Request)>> secondRequest;

View File

@ -4779,7 +4779,9 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self, Promise
state MoveKeysLock lock;
state Reference<DDTeamCollection> primaryTeamCollection;
state Reference<DDTeamCollection> remoteTeamCollection;
state bool trackerCancelled;
loop {
trackerCancelled = false;
try {
loop {
TraceEvent("DDInitTakingMoveKeysLock", self->ddId);
@ -4943,7 +4945,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self, Promise
actors.push_back(reportErrorsExcept(
dataDistributionTracker(initData, cx, output, shardsAffectedByTeamFailure, getShardMetrics,
getShardMetricsList, getAverageShardBytes.getFuture(), readyToStart,
anyZeroHealthyTeams, self->ddId, &shards),
anyZeroHealthyTeams, self->ddId, &shards, &trackerCancelled),
"DDTracker", self->ddId, &normalDDQueueErrors()));
actors.push_back(reportErrorsExcept(
dataDistributionQueue(cx, output, input.getFuture(), getShardMetrics, processingUnhealthy, tcis,
@ -4977,6 +4979,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self, Promise
return Void();
}
catch( Error &e ) {
trackerCancelled = true;
state Error err = e;
TraceEvent("DataDistributorDestroyTeamCollections").error(e);
self->teamCollection = nullptr;

View File

@ -212,7 +212,7 @@ struct InitialDataDistribution : ReferenceCounted<InitialDataDistribution> {
struct ShardMetrics {
StorageMetrics metrics;
double lastLowBandwidthStartTime;
int shardCount;
int shardCount; // number of smaller shards whose metrics are aggregated in the ShardMetrics
bool operator==(ShardMetrics const& rhs) const {
return metrics == rhs.metrics && lastLowBandwidthStartTime == rhs.lastLowBandwidthStartTime &&
@ -229,33 +229,22 @@ struct ShardTrackedData {
Reference<AsyncVar<Optional<ShardMetrics>>> stats;
};
Future<Void> dataDistributionTracker(
Reference<InitialDataDistribution> const& initData,
Database const& cx,
PromiseStream<RelocateShard> const& output,
Reference<ShardsAffectedByTeamFailure> const& shardsAffectedByTeamFailure,
PromiseStream<GetMetricsRequest> const& getShardMetrics,
PromiseStream<GetMetricsListRequest> const& getShardMetricsList,
FutureStream<Promise<int64_t>> const& getAverageShardBytes,
Promise<Void> const& readyToStart,
Reference<AsyncVar<bool>> const& zeroHealthyTeams,
UID const& distributorId,
KeyRangeMap<ShardTrackedData>* const& shards);
ACTOR Future<Void> dataDistributionTracker(Reference<InitialDataDistribution> initData, Database cx,
PromiseStream<RelocateShard> output,
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure,
PromiseStream<GetMetricsRequest> getShardMetrics,
PromiseStream<GetMetricsListRequest> getShardMetricsList,
FutureStream<Promise<int64_t>> getAverageShardBytes,
Promise<Void> readyToStart, Reference<AsyncVar<bool>> anyZeroHealthyTeams,
UID distributorId, KeyRangeMap<ShardTrackedData>* shards,
bool const* trackerCancelled);
Future<Void> dataDistributionQueue(
Database const& cx,
PromiseStream<RelocateShard> const& output,
FutureStream<RelocateShard> const& input,
PromiseStream<GetMetricsRequest> const& getShardMetrics,
Reference<AsyncVar<bool>> const& processingUnhealthy,
vector<TeamCollectionInterface> const& teamCollection,
Reference<ShardsAffectedByTeamFailure> const& shardsAffectedByTeamFailure,
MoveKeysLock const& lock,
PromiseStream<Promise<int64_t>> const& getAverageShardBytes,
UID const& distributorId,
int const& teamSize,
int const& singleRegionTeamSize,
double* const& lastLimited);
ACTOR Future<Void> dataDistributionQueue(
Database cx, PromiseStream<RelocateShard> output, FutureStream<RelocateShard> input,
PromiseStream<GetMetricsRequest> getShardMetrics, Reference<AsyncVar<bool>> processingUnhealthy,
vector<TeamCollectionInterface> teamCollection, Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure,
MoveKeysLock lock, PromiseStream<Promise<int64_t>> getAverageShardBytes, UID distributorId, int teamSize,
int singleRegionTeamSize, double* lastLimited);
//Holds the permitted size and IO Bounds for a shard
struct ShardSizeBounds {

View File

@ -18,8 +18,9 @@
* limitations under the License.
*/
#include <numeric>
#include <limits>
#include <numeric>
#include <vector>
#include "flow/ActorCollection.h"
#include "flow/Util.h"
@ -82,8 +83,8 @@ struct RelocateData {
};
class ParallelTCInfo : public ReferenceCounted<ParallelTCInfo>, public IDataDistributionTeam {
vector<Reference<IDataDistributionTeam>> teams;
vector<UID> tempServerIDs;
std::vector<Reference<IDataDistributionTeam>> teams;
std::vector<UID> tempServerIDs;
int64_t sum(std::function<int64_t(IDataDistributionTeam const&)> func) const {
int64_t result = 0;
@ -94,11 +95,11 @@ class ParallelTCInfo : public ReferenceCounted<ParallelTCInfo>, public IDataDist
}
template <class T>
vector<T> collect(std::function<vector<T>(IDataDistributionTeam const&)> func) const {
vector<T> result;
std::vector<T> collect(std::function<vector<T>(IDataDistributionTeam const&)> func) const {
std::vector<T> result;
for (const auto& team : teams) {
vector<T> newItems = func(*team);
std::vector<T> newItems = func(*team);
result.insert(result.end(), newItems.begin(), newItems.end());
}
return result;
@ -124,7 +125,7 @@ public:
return !any([func](IDataDistributionTeam const& team) { return !func(team); });
}
vector<StorageServerInterface> getLastKnownServerInterfaces() const override {
std::vector<StorageServerInterface> getLastKnownServerInterfaces() const override {
return collect<StorageServerInterface>(
[](IDataDistributionTeam const& team) { return team.getLastKnownServerInterfaces(); });
}
@ -137,11 +138,11 @@ public:
return totalSize;
}
vector<UID> const& getServerIDs() const override {
std::vector<UID> const& getServerIDs() const override {
static vector<UID> tempServerIDs;
tempServerIDs.clear();
for (const auto& team : teams) {
vector<UID> const &childIDs = team->getServerIDs();
std::vector<UID> const& childIDs = team->getServerIDs();
tempServerIDs.insert(tempServerIDs.end(), childIDs.begin(), childIDs.end());
}
return tempServerIDs;
@ -184,7 +185,7 @@ public:
}
virtual Future<Void> updateStorageMetrics() {
vector<Future<Void>> futures;
std::vector<Future<Void>> futures;
for (auto& team : teams) {
futures.push_back(team->updateStorageMetrics());
@ -247,7 +248,7 @@ public:
};
struct Busyness {
vector<int> ledger;
std::vector<int> ledger;
Busyness() : ledger( 10, 0 ) {}
@ -551,8 +552,8 @@ struct DDQueueData {
if(keyServersEntries.size() < SERVER_KNOBS->DD_QUEUE_MAX_KEY_SERVERS) {
for( int shard = 0; shard < keyServersEntries.size(); shard++ ) {
vector<UID> src, dest;
decodeKeyServersValue( UIDtoTagMap, keyServersEntries[shard].value, src, dest );
std::vector<UID> src, dest;
decodeKeyServersValue(UIDtoTagMap, keyServersEntries[shard].value, src, dest);
ASSERT( src.size() );
for( int i = 0; i < src.size(); i++ ) {
servers.insert( src[i] );
@ -855,7 +856,7 @@ struct DDQueueData {
startedHere++;
// update both inFlightActors and inFlight key range maps, cancelling deleted RelocateShards
vector<KeyRange> ranges;
std::vector<KeyRange> ranges;
inFlightActors.getRangesAffectedByInsertion( rd.keys, ranges );
inFlightActors.cancel( KeyRangeRef( ranges.front().begin, ranges.back().end ) );
inFlight.insert( rd.keys, rd );
@ -1437,7 +1438,7 @@ ACTOR Future<Void> dataDistributionQueue(
state RelocateData launchData;
state Future<Void> recordMetrics = delay(SERVER_KNOBS->DD_QUEUE_LOGGING_INTERVAL);
state vector<Future<Void>> balancingFutures;
state std::vector<Future<Void>> balancingFutures;
state ActorCollectionNoErrors actors;
state PromiseStream<KeyRange> rangesComplete;

View File

@ -91,14 +91,43 @@ struct DataDistributionTracker {
// Read hot detection
PromiseStream<KeyRange> readHotShard;
// The reference to trackerCancelled must be extracted by actors,
// because by the time (trackerCancelled == true) this memory cannot
// be accessed
bool const& trackerCancelled;
// This class extracts the trackerCancelled reference from a DataDistributionTracker object
// Because some actors spawned by the dataDistributionTracker outlive the DataDistributionTracker
// object, we must guard against memory errors by using a GetTracker functor to access
// the DataDistributionTracker object.
class SafeAccessor {
bool const& trackerCancelled;
DataDistributionTracker& tracker;
public:
SafeAccessor(DataDistributionTracker* tracker)
: trackerCancelled(tracker->trackerCancelled), tracker(*tracker) {
ASSERT(!trackerCancelled);
}
DataDistributionTracker* operator()() {
if (trackerCancelled) {
TEST(true); // Trying to access DataDistributionTracker after tracker has been cancelled
throw dd_tracker_cancelled();
}
return &tracker;
}
};
DataDistributionTracker(Database cx, UID distributorId, Promise<Void> const& readyToStart,
PromiseStream<RelocateShard> const& output,
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure,
Reference<AsyncVar<bool>> anyZeroHealthyTeams, KeyRangeMap<ShardTrackedData>& shards)
Reference<AsyncVar<bool>> anyZeroHealthyTeams, KeyRangeMap<ShardTrackedData>& shards,
bool const& trackerCancelled)
: cx(cx), distributorId(distributorId), dbSizeEstimate(new AsyncVar<int64_t>()), systemSizeEstimate(0),
maxShardSize(new AsyncVar<Optional<int64_t>>()), sizeChanges(false), readyToStart(readyToStart), output(output),
shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), anyZeroHealthyTeams(anyZeroHealthyTeams),
shards(shards) {}
shards(shards), trackerCancelled(trackerCancelled) {}
~DataDistributionTracker()
{
@ -150,7 +179,7 @@ int64_t getMaxShardSize( double dbSizeEstimate ) {
(int64_t)SERVER_KNOBS->MAX_SHARD_BYTES);
}
ACTOR Future<Void> trackShardMetrics(DataDistributionTracker* self, KeyRange keys,
ACTOR Future<Void> trackShardMetrics(DataDistributionTracker::SafeAccessor self, KeyRange keys,
Reference<AsyncVar<Optional<ShardMetrics>>> shardMetrics) {
state BandwidthStatus bandwidthStatus = shardMetrics->get().present() ? getBandwidthStatus( shardMetrics->get().get().metrics ) : BandwidthStatusNormal;
state double lastLowBandwidthStartTime = shardMetrics->get().present() ? shardMetrics->get().get().lastLowBandwidthStartTime : now();
@ -209,7 +238,7 @@ ACTOR Future<Void> trackShardMetrics(DataDistributionTracker* self, KeyRange key
// TraceEvent("RHDTriggerReadHotLoggingForShard")
// .detail("ShardBegin", keys.begin.printable().c_str())
// .detail("ShardEnd", keys.end.printable().c_str());
self->readHotShard.send(keys);
self()->readHotShard.send(keys);
} else {
ASSERT(false);
}
@ -230,7 +259,8 @@ ACTOR Future<Void> trackShardMetrics(DataDistributionTracker* self, KeyRange key
bounds.permittedError.iosPerKSecond = bounds.permittedError.infinity;
loop {
Transaction tr(self->cx);
Transaction tr(self()->cx);
// metrics.second is the number of key-ranges (i.e., shards) in the 'keys' key-range
std::pair<Optional<StorageMetrics>, int> metrics = wait( tr.waitStorageMetrics( keys, bounds.min, bounds.max, bounds.permittedError, CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT, shardCount ) );
if(metrics.first.present()) {
BandwidthStatus newBandwidthStatus = getBandwidthStatus( metrics.first.get() );
@ -253,9 +283,11 @@ ACTOR Future<Void> trackShardMetrics(DataDistributionTracker* self, KeyRange key
.detail("TrackerID", trackerID);*/
if( shardMetrics->get().present() ) {
self->dbSizeEstimate->set( self->dbSizeEstimate->get() + metrics.first.get().bytes - shardMetrics->get().get().metrics.bytes );
self()->dbSizeEstimate->set(self()->dbSizeEstimate->get() + metrics.first.get().bytes -
shardMetrics->get().get().metrics.bytes);
if(keys.begin >= systemKeys.begin) {
self->systemSizeEstimate += metrics.first.get().bytes - shardMetrics->get().get().metrics.bytes;
self()->systemSizeEstimate +=
metrics.first.get().bytes - shardMetrics->get().get().metrics.bytes;
}
}
@ -272,8 +304,9 @@ ACTOR Future<Void> trackShardMetrics(DataDistributionTracker* self, KeyRange key
}
}
} catch( Error &e ) {
if (e.code() != error_code_actor_cancelled)
self->output.sendError(e); // Propagate failure to dataDistributionTracker
if (e.code() != error_code_actor_cancelled && e.code() != error_code_dd_tracker_cancelled) {
self()->output.sendError(e); // Propagate failure to dataDistributionTracker
}
throw e;
}
}
@ -561,6 +594,8 @@ Future<Void> shardMerger(
shardsMerged++;
auto shardBounds = getShardSizeBounds( merged, maxShardSize );
// If we just recently get the current shard's metrics (i.e., less than DD_LOW_BANDWIDTH_DELAY ago), it means
// the shard's metric may not be stable yet. So we cannot continue merging in this direction.
if( endingStats.bytes >= shardBounds.min.bytes ||
getBandwidthStatus( endingStats ) != BandwidthStatusLow ||
now() - lastLowBandwidthStartTime < SERVER_KNOBS->DD_LOW_BANDWIDTH_DELAY ||
@ -591,13 +626,21 @@ Future<Void> shardMerger(
//restarting shard tracker will derefenced values in the shard map, so make a copy
KeyRange mergeRange = merged;
// OldKeys: Shards in the key range are merged as one shard defined by NewKeys;
// NewKeys: New key range after shards are merged;
// EndingSize: The new merged shard size in bytes;
// BatchedMerges: The number of shards merged. Each shard is defined in self->shards;
// LastLowBandwidthStartTime: When does a shard's bandwidth status becomes BandwidthStatusLow. If a shard's status
// becomes BandwidthStatusLow less than DD_LOW_BANDWIDTH_DELAY ago, the merging logic will stop at the shard;
// ShardCount: The number of non-splittable shards that are merged. Each shard is defined in self->shards may have
// more than 1 shards.
TraceEvent("RelocateShardMergeMetrics", self->distributorId)
.detail("OldKeys", keys)
.detail("NewKeys", mergeRange)
.detail("EndingSize", endingStats.bytes)
.detail("BatchedMerges", shardsMerged)
.detail("LastLowBandwidthStartTime", lastLowBandwidthStartTime)
.detail("ShardCount", shardCount);
.detail("OldKeys", keys)
.detail("NewKeys", mergeRange)
.detail("EndingSize", endingStats.bytes)
.detail("BatchedMerges", shardsMerged)
.detail("LastLowBandwidthStartTime", lastLowBandwidthStartTime)
.detail("ShardCount", shardCount);
if(mergeRange.begin < systemKeys.begin) {
self->systemSizeEstimate -= systemBytes;
@ -667,18 +710,14 @@ ACTOR Future<Void> shardEvaluator(
return Void();
}
ACTOR Future<Void> shardTracker(
DataDistributionTracker* self,
KeyRange keys,
Reference<AsyncVar<Optional<ShardMetrics>>> shardSize)
{
wait( yieldedFuture(self->readyToStart.getFuture()) );
ACTOR Future<Void> shardTracker(DataDistributionTracker::SafeAccessor self, KeyRange keys,
Reference<AsyncVar<Optional<ShardMetrics>>> shardSize) {
wait(yieldedFuture(self()->readyToStart.getFuture()));
if( !shardSize->get().present() )
wait( shardSize->onChange() );
if( !self->maxShardSize->get().present() )
wait( yieldedFuture(self->maxShardSize->onChange()) );
if (!self()->maxShardSize->get().present()) wait(yieldedFuture(self()->maxShardSize->onChange()));
// Since maxShardSize will become present for all shards at once, avoid slow tasks with a short delay
wait( delay( 0, TaskPriority::DataDistribution ) );
@ -686,27 +725,26 @@ ACTOR Future<Void> shardTracker(
// Survives multiple calls to shardEvaluator and keeps merges from happening too quickly.
state Reference<HasBeenTrueFor> wantsToMerge( new HasBeenTrueFor( shardSize->get() ) );
/*TraceEvent("ShardTracker", self->distributorId)
.detail("Begin", keys.begin)
.detail("End", keys.end)
.detail("TrackerID", trackerID)
.detail("MaxBytes", self->maxShardSize->get().get())
.detail("ShardSize", shardSize->get().get().bytes)
.detail("BytesPerKSec", shardSize->get().get().bytesPerKSecond);*/
/*TraceEvent("ShardTracker", self()->distributorId)
.detail("Begin", keys.begin)
.detail("End", keys.end)
.detail("TrackerID", trackerID)
.detail("MaxBytes", self()->maxShardSize->get().get())
.detail("ShardSize", shardSize->get().get().bytes)
.detail("BytesPerKSec", shardSize->get().get().bytesPerKSecond);*/
try {
loop {
// Use the current known size to check for (and start) splits and merges.
wait( shardEvaluator( self, keys, shardSize, wantsToMerge ) );
wait(shardEvaluator(self(), keys, shardSize, wantsToMerge));
// We could have a lot of actors being released from the previous wait at the same time. Immediately calling
// delay(0) mitigates the resulting SlowTask
wait( delay(0, TaskPriority::DataDistribution) );
}
} catch (Error& e) {
// If e is broken_promise then self may have already been deleted
if (e.code() != error_code_actor_cancelled && e.code() != error_code_broken_promise) {
self->output.sendError(e); // Propagate failure to dataDistributionTracker
if (e.code() != error_code_actor_cancelled && e.code() != error_code_dd_tracker_cancelled) {
self()->output.sendError(e); // Propagate failure to dataDistributionTracker
}
throw e;
}
@ -738,8 +776,8 @@ void restartShardTrackers(DataDistributionTracker* self, KeyRangeRef keys, Optio
ShardTrackedData data;
data.stats = shardMetrics;
data.trackShard = shardTracker(self, ranges[i], shardMetrics);
data.trackBytes = trackShardMetrics(self, ranges[i], shardMetrics);
data.trackShard = shardTracker(DataDistributionTracker::SafeAccessor(self), ranges[i], shardMetrics);
data.trackBytes = trackShardMetrics(DataDistributionTracker::SafeAccessor(self), ranges[i], shardMetrics);
self->shards.insert( ranges[i], data );
}
}
@ -862,9 +900,10 @@ ACTOR Future<Void> dataDistributionTracker(Reference<InitialDataDistribution> in
PromiseStream<GetMetricsListRequest> getShardMetricsList,
FutureStream<Promise<int64_t>> getAverageShardBytes,
Promise<Void> readyToStart, Reference<AsyncVar<bool>> anyZeroHealthyTeams,
UID distributorId, KeyRangeMap<ShardTrackedData>* shards) {
UID distributorId, KeyRangeMap<ShardTrackedData>* shards,
bool const* trackerCancelled) {
state DataDistributionTracker self(cx, distributorId, readyToStart, output, shardsAffectedByTeamFailure,
anyZeroHealthyTeams, *shards);
anyZeroHealthyTeams, *shards, *trackerCancelled);
state Future<Void> loggingTrigger = Void();
state Future<Void> readHotDetect = readHotDetector(&self);
try {

View File

@ -579,6 +579,9 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( READ_TAG_MEASUREMENT_INTERVAL, 30.0 ); if( randomize && BUGGIFY ) READ_TAG_MEASUREMENT_INTERVAL = 1.0;
init( OPERATION_COST_BYTE_FACTOR, 16384 ); if( randomize && BUGGIFY ) OPERATION_COST_BYTE_FACTOR = 4096;
init( PREFIX_COMPRESS_KVS_MEM_SNAPSHOTS, true ); if( randomize && BUGGIFY ) PREFIX_COMPRESS_KVS_MEM_SNAPSHOTS = false;
init( REPORT_DD_METRICS, true );
init( DD_METRICS_REPORT_INTERVAL, 30.0 );
init( FETCH_KEYS_TOO_LONG_TIME_CRITERIA, 300.0 );
//Wait Failure
init( MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS, 250 ); if( randomize && BUGGIFY ) MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS = 2;
@ -611,6 +614,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( MAX_STATUS_REQUESTS_PER_SECOND, 256.0 );
init( CONFIGURATION_ROWS_TO_FETCH, 20000 );
init( DISABLE_DUPLICATE_LOG_WARNING, false );
init( HISTOGRAM_REPORT_INTERVAL, 300.0 );
// IPager
init( PAGER_RESERVED_PAGES, 1 );

View File

@ -508,6 +508,9 @@ public:
double READ_TAG_MEASUREMENT_INTERVAL;
int64_t OPERATION_COST_BYTE_FACTOR;
bool PREFIX_COMPRESS_KVS_MEM_SNAPSHOTS;
bool REPORT_DD_METRICS;
double DD_METRICS_REPORT_INTERVAL;
double FETCH_KEYS_TOO_LONG_TIME_CRITERIA;
//Wait Failure
int MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS;
@ -540,6 +543,7 @@ public:
double MAX_STATUS_REQUESTS_PER_SECOND;
int CONFIGURATION_ROWS_TO_FETCH;
bool DISABLE_DUPLICATE_LOG_WARNING;
double HISTOGRAM_REPORT_INTERVAL;
// IPager
int PAGER_RESERVED_PAGES;

View File

@ -78,20 +78,23 @@ struct LogRouterData {
const UID dbgid;
Reference<AsyncVar<Reference<ILogSystem>>> logSystem;
Optional<UID> primaryPeekLocation;
NotifiedVersion version;
NotifiedVersion minPopped;
NotifiedVersion version; // The largest version at which the log router has peeked mutations
// from satellite tLog or primary tLogs.
NotifiedVersion minPopped; // The minimum version among all tags that has been popped by remote tLogs.
const Version startVersion;
Version minKnownCommittedVersion;
Version minKnownCommittedVersion; // The minimum durable version among all LRs.
// A LR's durable version is the maximum version of mutations that have been
// popped by remote tLog.
Version poppedVersion;
Deque<std::pair<Version, Standalone<VectorRef<uint8_t>>>> messageBlocks;
Tag routerTag;
bool allowPops;
LogSet logSet;
bool foundEpochEnd;
double waitForVersionTime = 0;
double maxWaitForVersionTime = 0;
double getMoreTime = 0;
double maxGetMoreTime = 0;
bool foundEpochEnd; // Cluster is not fully recovered yet. LR has to handle recovery
double waitForVersionTime = 0; // The total amount of time LR waits for remote tLog to peek and pop its data.
double maxWaitForVersionTime = 0; // The max one-instance wait time when LR must wait for remote tLog to pop data.
double getMoreTime = 0; // The total amount of time LR waits for satellite tLog's data to become available.
double maxGetMoreTime = 0; // The max wait time LR spent in a pull-data-request to satellite tLog.
int64_t generation = -1;
Reference<Histogram> peekLatencyDist;
@ -103,7 +106,9 @@ struct LogRouterData {
std::map<UID, PeekTrackerData> peekTracker;
CounterCollection cc;
Counter getMoreCount, getMoreBlockedCount;
Counter getMoreCount; // Increase by 1 when LR tries to pull data from satellite tLog.
Counter
getMoreBlockedCount; // Increase by 1 if data is not available when LR tries to pull data from satellite tLog.
Future<Void> logger;
Reference<EventCacheHolder> eventCacheHolder;
@ -148,8 +153,10 @@ struct LogRouterData {
eventCacheHolder = Reference<EventCacheHolder>( new EventCacheHolder(dbgid.shortString() + ".PeekLocation") );
specialCounter(cc, "Version", [this](){ return this->version.get(); });
// FetchedVersions: How many version of mutations buffered at LR and have not been popped by remote tLogs
specialCounter(cc, "Version", [this]() { return this->version.get(); });
specialCounter(cc, "MinPopped", [this](){ return this->minPopped.get(); });
// TODO: Add minPopped locality and minPoppedId, similar as tLog Metrics
specialCounter(cc, "FetchedVersions", [this](){ return std::max<Version>(0, std::min<Version>(SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS, this->version.get() - this->minPopped.get())); });
specialCounter(cc, "MinKnownCommittedVersion", [this](){ return this->minKnownCommittedVersion; });
specialCounter(cc, "PoppedVersion", [this](){ return this->poppedVersion; });
@ -222,8 +229,15 @@ ACTOR Future<Void> waitForVersion( LogRouterData *self, Version ver ) {
// Since one set of log routers is created per generation of transaction logs, the gap caused by epoch end will be within MAX_VERSIONS_IN_FLIGHT of the log routers start version.
state double startTime = now();
if(self->version.get() < self->startVersion) {
// Log router needs to wait for remote tLogs to process data, whose version is less than self->startVersion,
// before the log router can pull more data (i.e., data after self->startVersion) from satellite tLog;
// This prevents LR from getting OOM due to it pulls too much data from satellite tLog at once;
// Note: each commit writes data to both primary tLog and satellite tLog. Satellite tLog can be viewed as
// a part of primary tLogs.
if(ver > self->startVersion) {
self->version.set(self->startVersion);
// Wait for remote tLog to peek and pop from LR,
// so that LR's minPopped version can increase to self->startVersion
wait(self->minPopped.whenAtLeast(self->version.get()));
}
self->waitForVersionTime += now() - startTime;
@ -231,6 +245,9 @@ ACTOR Future<Void> waitForVersion( LogRouterData *self, Version ver ) {
return Void();
}
if(!self->foundEpochEnd) {
// Similar to proxy that does not keep more than MAX_READ_TRANSACTION_LIFE_VERSIONS transactions oustanding;
// Log router does not keep more than MAX_READ_TRANSACTION_LIFE_VERSIONS transactions outstanding because
// remote SS cannot roll back to more than MAX_READ_TRANSACTION_LIFE_VERSIONS ago.
wait(self->minPopped.whenAtLeast(std::min(self->version.get(), ver - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS)));
} else {
while(self->minPopped.get() + SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS < ver) {
@ -250,6 +267,7 @@ ACTOR Future<Void> waitForVersion( LogRouterData *self, Version ver ) {
return Void();
}
// Log router pull data from satellite tLog
ACTOR Future<Void> pullAsyncData( LogRouterData *self ) {
state Future<Void> dbInfoChange = Void();
state Reference<ILogSystem::IPeekCursor> r;
@ -581,6 +599,7 @@ ACTOR Future<Void> logRouterCore(
addActor.send( logRouterPeekMessages( &logRouterData, req ) );
}
when( TLogPopRequest req = waitNext( interf.popMessages.getFuture() ) ) {
// Request from remote tLog to pop data from LR
addActor.send( logRouterPop( &logRouterData, req ) );
}
when (wait(error)) {}

View File

@ -87,6 +87,14 @@ struct ProxyStats {
Counter conflictRanges;
Counter keyServerLocationIn, keyServerLocationOut, keyServerLocationErrors;
Version lastCommitVersionAssigned;
double transactionRateAllowed, batchTransactionRateAllowed;
double transactionLimit, batchTransactionLimit;
// how much of the GRV requests queue was processed in one attempt to hand out read version.
double percentageOfDefaultGRVQueueProcessed;
double percentageOfBatchGRVQueueProcessed;
LatencySample defaultTxnGRVTimeInQueue;
LatencySample batchTxnGRVTimeInQueue;
LatencySample commitLatencySample;
LatencySample grvLatencySample;
@ -136,36 +144,53 @@ struct ProxyStats {
return r;
}
explicit ProxyStats(UID id, Version* pVersion, NotifiedVersion* pCommittedVersion, int64_t *commitBatchesMemBytesCountPtr)
: cc("ProxyStats", id.toString()), recentRequests(0), lastBucketBegin(now()),
maxComputeNS(0), minComputeNS(1e12),
bucketInterval(FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE/FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS),
txnRequestIn("TxnRequestIn", cc), txnRequestOut("TxnRequestOut", cc),
txnRequestErrors("TxnRequestErrors", cc), txnStartIn("TxnStartIn", cc), txnStartOut("TxnStartOut", cc),
txnStartBatch("TxnStartBatch", cc), txnSystemPriorityStartIn("TxnSystemPriorityStartIn", cc),
txnSystemPriorityStartOut("TxnSystemPriorityStartOut", cc),
txnBatchPriorityStartIn("TxnBatchPriorityStartIn", cc),
txnBatchPriorityStartOut("TxnBatchPriorityStartOut", cc),
txnDefaultPriorityStartIn("TxnDefaultPriorityStartIn", cc),
txnDefaultPriorityStartOut("TxnDefaultPriorityStartOut", cc), txnCommitIn("TxnCommitIn", cc),
txnCommitVersionAssigned("TxnCommitVersionAssigned", cc), txnCommitResolving("TxnCommitResolving", cc),
txnCommitResolved("TxnCommitResolved", cc), txnCommitOut("TxnCommitOut", cc),
txnCommitOutSuccess("TxnCommitOutSuccess", cc), txnCommitErrors("TxnCommitErrors", cc),
txnConflicts("TxnConflicts", cc), txnThrottled("TxnThrottled", cc), commitBatchIn("CommitBatchIn", cc),
commitBatchOut("CommitBatchOut", cc), mutationBytes("MutationBytes", cc), mutations("Mutations", cc),
conflictRanges("ConflictRanges", cc), keyServerLocationIn("KeyServerLocationIn", cc),
keyServerLocationOut("KeyServerLocationOut", cc), keyServerLocationErrors("KeyServerLocationErrors", cc),
lastCommitVersionAssigned(0),
commitLatencySample("CommitLatencyMetrics", id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
grvLatencySample("GRVLatencyMetrics", id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
commitLatencyBands("CommitLatencyBands", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY),
grvLatencyBands("GRVLatencyBands", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY) {
explicit ProxyStats(UID id, Version* pVersion, NotifiedVersion* pCommittedVersion,
int64_t* commitBatchesMemBytesCountPtr)
: cc("ProxyStats", id.toString()), recentRequests(0), lastBucketBegin(now()), maxComputeNS(0), minComputeNS(1e12),
bucketInterval(FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE / FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS),
txnRequestIn("TxnRequestIn", cc), txnRequestOut("TxnRequestOut", cc), txnRequestErrors("TxnRequestErrors", cc),
txnStartIn("TxnStartIn", cc), txnStartOut("TxnStartOut", cc), txnStartBatch("TxnStartBatch", cc),
txnSystemPriorityStartIn("TxnSystemPriorityStartIn", cc),
txnSystemPriorityStartOut("TxnSystemPriorityStartOut", cc),
txnBatchPriorityStartIn("TxnBatchPriorityStartIn", cc),
txnBatchPriorityStartOut("TxnBatchPriorityStartOut", cc),
txnDefaultPriorityStartIn("TxnDefaultPriorityStartIn", cc),
txnDefaultPriorityStartOut("TxnDefaultPriorityStartOut", cc), txnCommitIn("TxnCommitIn", cc),
txnCommitVersionAssigned("TxnCommitVersionAssigned", cc), txnCommitResolving("TxnCommitResolving", cc),
txnCommitResolved("TxnCommitResolved", cc), txnCommitOut("TxnCommitOut", cc),
txnCommitOutSuccess("TxnCommitOutSuccess", cc), txnCommitErrors("TxnCommitErrors", cc),
txnConflicts("TxnConflicts", cc), txnThrottled("TxnThrottled", cc), commitBatchIn("CommitBatchIn", cc),
commitBatchOut("CommitBatchOut", cc), mutationBytes("MutationBytes", cc), mutations("Mutations", cc),
conflictRanges("ConflictRanges", cc), keyServerLocationIn("KeyServerLocationIn", cc),
keyServerLocationOut("KeyServerLocationOut", cc), keyServerLocationErrors("KeyServerLocationErrors", cc),
lastCommitVersionAssigned(0),
commitLatencySample("CommitLatencyMetrics", id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
grvLatencySample("GRVLatencyMetrics", id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
commitLatencyBands("CommitLatencyBands", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY),
grvLatencyBands("GRVLatencyBands", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY),
defaultTxnGRVTimeInQueue("DefaultTxnGRVTimeInQueue", id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
batchTxnGRVTimeInQueue("BatchTxnGRVTimeInQueue", id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
transactionRateAllowed(0), batchTransactionRateAllowed(0), transactionLimit(0), batchTransactionLimit(0),
percentageOfDefaultGRVQueueProcessed(0), percentageOfBatchGRVQueueProcessed(0) {
specialCounter(cc, "LastAssignedCommitVersion", [this](){return this->lastCommitVersionAssigned;});
specialCounter(cc, "Version", [pVersion](){return *pVersion; });
specialCounter(cc, "CommittedVersion", [pCommittedVersion](){ return pCommittedVersion->get(); });
specialCounter(cc, "CommitBatchesMemBytesCount", [commitBatchesMemBytesCountPtr]() { return *commitBatchesMemBytesCountPtr; });
specialCounter(cc, "MaxCompute", [this](){ return this->getAndResetMaxCompute(); });
specialCounter(cc, "MinCompute", [this](){ return this->getAndResetMinCompute(); });
// The rate at which the limit(budget) is allowed to grow.
specialCounter(cc, "SystemAndDefaultTxnRateAllowed", [this]() { return this->transactionRateAllowed; });
specialCounter(cc, "BatchTransactionRateAllowed", [this]() { return this->batchTransactionRateAllowed; });
specialCounter(cc, "SystemAndDefaultTxnLimit", [this]() { return this->transactionLimit; });
specialCounter(cc, "BatchTransactionLimit", [this]() { return this->batchTransactionLimit; });
specialCounter(cc, "PercentageOfDefaultGRVQueueProcessed",
[this]() { return this->percentageOfDefaultGRVQueueProcessed; });
specialCounter(cc, "PercentageOfBatchGRVQueueProcessed",
[this]() { return this->percentageOfBatchGRVQueueProcessed; });
logger = traceCounters("ProxyMetrics", id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "ProxyMetrics");
for(int i = 0; i < FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS; i++) {
requestBuckets.push_back(0);
@ -246,10 +271,12 @@ struct TransactionRateInfo {
}
};
ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64_t* inTransactionCount, int64_t* inBatchTransactionCount, TransactionRateInfo *transactionRateInfo,
TransactionRateInfo *batchTransactionRateInfo, GetHealthMetricsReply* healthMetricsReply, GetHealthMetricsReply* detailedHealthMetricsReply,
TransactionTagMap<uint64_t>* transactionTagCounter, PrioritizedTransactionTagMap<ClientTagThrottleLimits>* throttledTags) {
ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64_t* inTransactionCount,
int64_t* inBatchTransactionCount, TransactionRateInfo* transactionRateInfo,
TransactionRateInfo* batchTransactionRateInfo, GetHealthMetricsReply* healthMetricsReply,
GetHealthMetricsReply* detailedHealthMetricsReply,
TransactionTagMap<uint64_t>* transactionTagCounter,
PrioritizedTransactionTagMap<ClientTagThrottleLimits>* throttledTags, ProxyStats* stats) {
state Future<Void> nextRequestTimer = Never();
state Future<Void> leaseTimeout = Never();
state Future<GetRateInfoReply> reply = Never();
@ -290,6 +317,15 @@ ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64
transactionRateInfo->setRate(rep.transactionRate);
batchTransactionRateInfo->setRate(rep.batchTransactionRate);
//TraceEvent("MasterProxyRate", myID).detail("Rate", rep.transactionRate).detail("BatchRate", rep.batchTransactionRate).detail("Lease", rep.leaseDuration).detail("ReleasedTransactions", *inTransactionCount - lastTC);
stats->transactionRateAllowed = rep.transactionRate;
stats->batchTransactionRateAllowed = rep.batchTransactionRate;
// TraceEvent("MasterProxyTxRate", myID)
// .detail("RKID", db->get().ratekeeper.get().id())
// .detail("RateAllowed", rep.transactionRate)
// .detail("BatchRateAllowed", rep.batchTransactionRate)
// .detail("Lease", rep.leaseDuration)
// .detail("ReleasedTransactions", *inTransactionCount - lastTC);
lastTC = *inTransactionCount;
leaseTimeout = delay(rep.leaseDuration);
nextRequestTimer = delayJittered(rep.leaseDuration / 2);
@ -1556,13 +1592,10 @@ ACTOR Future<Void> sendGrvReplies(Future<GetReadVersionReply> replyFuture, std::
return Void();
}
ACTOR static Future<Void> transactionStarter(
MasterProxyInterface proxy,
Reference<AsyncVar<ServerDBInfo>> db,
PromiseStream<Future<Void>> addActor,
ProxyCommitData* commitData, GetHealthMetricsReply* healthMetricsReply,
GetHealthMetricsReply* detailedHealthMetricsReply)
{
ACTOR static Future<Void> transactionStarter(MasterProxyInterface proxy, Reference<AsyncVar<ServerDBInfo>> db,
PromiseStream<Future<Void>> addActor, ProxyCommitData* commitData,
GetHealthMetricsReply* healthMetricsReply,
GetHealthMetricsReply* detailedHealthMetricsReply, ProxyStats* stats) {
state double lastGRVTime = 0;
state PromiseStream<Void> GRVTimer;
state double GRVBatchTime = SERVER_KNOBS->START_TRANSACTION_BATCH_INTERVAL_MIN;
@ -1581,8 +1614,9 @@ ACTOR static Future<Void> transactionStarter(
state PrioritizedTransactionTagMap<ClientTagThrottleLimits> throttledTags;
state PromiseStream<double> replyTimes;
addActor.send(getRate(proxy.id(), db, &transactionCount, &batchTransactionCount, &normalRateInfo, &batchRateInfo, healthMetricsReply, detailedHealthMetricsReply, &transactionTagCounter, &throttledTags));
addActor.send(getRate(proxy.id(), db, &transactionCount, &batchTransactionCount, &normalRateInfo, &batchRateInfo,
healthMetricsReply, detailedHealthMetricsReply, &transactionTagCounter, &throttledTags,
stats));
addActor.send(queueTransactionStartRequests(db, &systemQueue, &defaultQueue, &batchQueue, proxy.getConsistentReadVersion.getFuture(),
GRVTimer, &lastGRVTime, &GRVBatchTime, replyTimes.getFuture(), &commitData->stats, &batchRateInfo,
&transactionTagCounter));
@ -1611,6 +1645,9 @@ ACTOR static Future<Void> transactionStarter(
normalRateInfo.reset();
batchRateInfo.reset();
stats->transactionLimit = normalRateInfo.limit;
stats->batchTransactionLimit = batchRateInfo.limit;
int transactionsStarted[2] = {0,0};
int systemTransactionsStarted[2] = {0,0};
int defaultPriTransactionsStarted[2] = { 0, 0 };
@ -1621,6 +1658,8 @@ ACTOR static Future<Void> transactionStarter(
int requestsToStart = 0;
uint32_t defaultQueueSize = defaultQueue.size();
uint32_t batchQueueSize = batchQueue.size();
while (requestsToStart < SERVER_KNOBS->START_TRANSACTION_MAX_REQUESTS_TO_START) {
Deque<GetReadVersionRequest>* transactionQueue;
if(!systemQueue.empty()) {
@ -1649,12 +1688,16 @@ ACTOR static Future<Void> transactionStarter(
}
transactionsStarted[req.flags&1] += tc;
if (req.priority >= TransactionPriority::IMMEDIATE)
double currentTime = g_network->timer();
if (req.priority >= TransactionPriority::IMMEDIATE) {
systemTransactionsStarted[req.flags & 1] += tc;
else if (req.priority >= TransactionPriority::DEFAULT)
} else if (req.priority >= TransactionPriority::DEFAULT) {
defaultPriTransactionsStarted[req.flags & 1] += tc;
else
stats->defaultTxnGRVTimeInQueue.addMeasurement(currentTime - req.requestTime());
} else {
batchPriTransactionsStarted[req.flags & 1] += tc;
stats->batchTxnGRVTimeInQueue.addMeasurement(currentTime - req.requestTime());
}
start[req.flags & 1].push_back(std::move(req)); static_assert(GetReadVersionRequest::FLAG_CAUSAL_READ_RISKY == 1, "Implementation dependent on flag value");
transactionQueue->pop_front();
@ -1691,6 +1734,8 @@ ACTOR static Future<Void> transactionStarter(
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "MasterProxyServer.masterProxyServerCore.Broadcast");
}
int defaultGRVProcessed = 0;
int batchGRVProcessed = 0;
for (int i = 0; i < start.size(); i++) {
if (start[i].size()) {
Future<GetReadVersionReply> readVersionReply = getLiveCommittedVersion(commitData, i, &otherProxies, debugID, transactionsStarted[i], systemTransactionsStarted[i], defaultPriTransactionsStarted[i], batchPriTransactionsStarted[i]);
@ -1701,8 +1746,12 @@ ACTOR static Future<Void> transactionStarter(
if (i == 0) {
addActor.send(timeReply(readVersionReply, replyTimes));
}
defaultGRVProcessed += defaultPriTransactionsStarted[i];
batchGRVProcessed += batchPriTransactionsStarted[i];
}
}
stats->percentageOfDefaultGRVQueueProcessed = (double)defaultGRVProcessed / defaultQueueSize;
stats->percentageOfBatchGRVQueueProcessed = (double)batchGRVProcessed / batchQueueSize;
}
}
@ -2113,7 +2162,8 @@ ACTOR Future<Void> masterProxyServerCore(
TraceEvent(SevInfo, "CommitBatchesMemoryLimit").detail("BytesLimit", commitBatchesMemoryLimit);
addActor.send(monitorRemoteCommitted(&commitData));
addActor.send(transactionStarter(proxy, commitData.db, addActor, &commitData, &healthMetricsReply, &detailedHealthMetricsReply));
addActor.send(transactionStarter(proxy, commitData.db, addActor, &commitData, &healthMetricsReply,
&detailedHealthMetricsReply, &commitData.stats));
addActor.send(readRequestServer(proxy, addActor, &commitData));
addActor.send(rejoinServer(proxy, &commitData));
addActor.send(healthMetricsRequestServer(proxy, &healthMetricsReply, &detailedHealthMetricsReply));

View File

@ -27,6 +27,11 @@
#include "fdbserver/Knobs.h"
#include "flow/actorcompiler.h" // This must be the last #include.
const StringRef STORAGESERVER_HISTOGRAM_GROUP = LiteralStringRef("StorageServer");
const StringRef FETCH_KEYS_LATENCY_HISTOGRAM = LiteralStringRef("FetchKeysLatency");
const StringRef FETCH_KEYS_BYTES_HISTOGRAM = LiteralStringRef("FetchKeysSize");
const StringRef FETCH_KEYS_BYTES_PER_SECOND_HISTOGRAM = LiteralStringRef("FetchKeysBandwidth");
struct StorageMetricSample {
IndexedSet<Key, int64_t> sample;
int64_t metricUnitsPerSample;

View File

@ -41,6 +41,7 @@
#include "fdbserver/WaitFailure.h"
#include "fdbserver/RecoveryState.h"
#include "fdbserver/FDBExecHelper.actor.h"
#include "flow/Histogram.h"
#include "flow/actorcompiler.h" // This must be the last #include.
using std::pair;
@ -332,6 +333,7 @@ struct TLogData : NonCopyable {
FlowLock concurrentLogRouterReads;
FlowLock persistentDataCommitLock;
// Beginning of fields used by snapshot based backup and restore
bool ignorePopRequest; // ignore pop request from storage servers
double ignorePopDeadline; // time until which the ignorePopRequest will be
// honored
@ -343,19 +345,26 @@ struct TLogData : NonCopyable {
std::map<Tag, Version> toBePopped; // map of Tag->Version for all the pops
// that came when ignorePopRequest was set
Reference<AsyncVar<bool>> degraded;
// End of fields used by snapshot based backup and restore
std::vector<TagsAndMessage> tempTagMessages;
TLogData(UID dbgid, UID workerID, IKeyValueStore* persistentData, IDiskQueue * persistentQueue, Reference<AsyncVar<ServerDBInfo>> dbInfo, Reference<AsyncVar<bool>> degraded, std::string folder)
: dbgid(dbgid), workerID(workerID), instanceID(deterministicRandom()->randomUniqueID().first()),
persistentData(persistentData), rawPersistentQueue(persistentQueue), persistentQueue(new TLogQueue(persistentQueue, dbgid)),
dbInfo(dbInfo), degraded(degraded), queueCommitBegin(0), queueCommitEnd(0),
diskQueueCommitBytes(0), largeDiskQueueCommitBytes(false), bytesInput(0), bytesDurable(0), targetVolatileBytes(SERVER_KNOBS->TLOG_SPILL_THRESHOLD), overheadBytesInput(0), overheadBytesDurable(0),
peekMemoryLimiter(SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_PEEK_MEMORY_BYTES),
concurrentLogRouterReads(SERVER_KNOBS->CONCURRENT_LOG_ROUTER_READS),
ignorePopRequest(false), ignorePopDeadline(), ignorePopUid(), dataFolder(folder), toBePopped()
{
cx = openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, true, true);
}
Reference<Histogram> commitLatencyDist;
TLogData(UID dbgid, UID workerID, IKeyValueStore* persistentData, IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> dbInfo, Reference<AsyncVar<bool>> degraded, std::string folder)
: dbgid(dbgid), workerID(workerID), instanceID(deterministicRandom()->randomUniqueID().first()),
persistentData(persistentData), rawPersistentQueue(persistentQueue),
persistentQueue(new TLogQueue(persistentQueue, dbgid)), dbInfo(dbInfo), degraded(degraded), queueCommitBegin(0),
queueCommitEnd(0), diskQueueCommitBytes(0), largeDiskQueueCommitBytes(false), bytesInput(0), bytesDurable(0),
targetVolatileBytes(SERVER_KNOBS->TLOG_SPILL_THRESHOLD), overheadBytesInput(0), overheadBytesDurable(0),
peekMemoryLimiter(SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_PEEK_MEMORY_BYTES),
concurrentLogRouterReads(SERVER_KNOBS->CONCURRENT_LOG_ROUTER_READS), ignorePopRequest(false),
ignorePopDeadline(), ignorePopUid(), dataFolder(folder), toBePopped(),
commitLatencyDist(Histogram::getHistogram(LiteralStringRef("tLog"), LiteralStringRef("commit"),
Histogram::Unit::microseconds)) {
cx = openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, true, true);
}
};
struct LogData : NonCopyable, public ReferenceCounted<LogData> {
@ -441,13 +450,19 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
bool stopped, initialized;
DBRecoveryCount recoveryCount;
// If persistentDataVersion != persistentDurableDataVersion,
// then spilling is happening from persistentDurableDataVersion to persistentDataVersion.
// Data less than persistentDataDurableVersion is spilled on disk (or fully popped from the TLog);
VersionMetricHandle persistentDataVersion, persistentDataDurableVersion; // The last version number in the portion of the log (written|durable) to persistentData
NotifiedVersion version, queueCommittedVersion;
NotifiedVersion version;
NotifiedVersion queueCommittedVersion; // The disk queue has committed up until the queueCommittedVersion version.
Version queueCommittingVersion;
Version knownCommittedVersion, durableKnownCommittedVersion, minKnownCommittedVersion;
Version queuePoppedVersion;
Version knownCommittedVersion; // The maximum version that a proxy has told us that is committed (all TLogs have
// ack'd a commit for this version).
Version durableKnownCommittedVersion, minKnownCommittedVersion;
Version queuePoppedVersion; // The disk queue has been popped up until the location which represents this version.
Version minPoppedTagVersion;
Tag minPoppedTag;
Tag minPoppedTag; // The tag that makes tLog hold its data and cause tLog's disk queue increasing.
Deque<std::pair<Version, Standalone<VectorRef<uint8_t>>>> messageBlocks;
std::vector<std::vector<Reference<TagData>>> tag_data; //tag.locality | tag.id
@ -490,7 +505,8 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
Version unrecoveredBefore, recoveredAt;
struct PeekTrackerData {
std::map<int, Promise<std::pair<Version, bool>>> sequence_version;
std::map<int, Promise<std::pair<Version, bool>>>
sequence_version; // second: Version is peeked begin version. bool is onlySpilled
double lastUpdate;
Tag tag;
@ -565,12 +581,15 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
queueCommittedVersion.initMetric(LiteralStringRef("TLog.QueueCommittedVersion"), cc.id);
specialCounter(cc, "Version", [this](){ return this->version.get(); });
specialCounter(cc, "QueueCommittedVersion", [this](){ return this->queueCommittedVersion.get(); });
specialCounter(cc, "QueueCommittedVersion", [this]() { return this->queueCommittedVersion.get(); });
specialCounter(cc, "PersistentDataVersion", [this](){ return this->persistentDataVersion; });
specialCounter(cc, "PersistentDataDurableVersion", [this](){ return this->persistentDataDurableVersion; });
specialCounter(cc, "KnownCommittedVersion", [this](){ return this->knownCommittedVersion; });
specialCounter(cc, "QueuePoppedVersion", [this](){ return this->queuePoppedVersion; });
specialCounter(cc, "MinPoppedTagVersion", [this](){ return this->minPoppedTagVersion; });
specialCounter(cc, "MinPoppedTagVersion", [this]() { return this->minPoppedTagVersion; });
// The locality and id of the tag that is responsible for making the TLog hold onto its oldest piece of data.
// If disk queues are growing and no one is sure why, then you shall look at this to find the tag responsible
// for why the TLog thinks it can't throw away data.
specialCounter(cc, "MinPoppedTagLocality", [this](){ return this->minPoppedTag.locality; });
specialCounter(cc, "MinPoppedTagId", [this](){ return this->minPoppedTag.id; });
specialCounter(cc, "SharedBytesInput", [tLogData](){ return tLogData->bytesInput; });
@ -792,6 +811,9 @@ ACTOR Future<Void> updatePoppedLocation( TLogData* self, Reference<LogData> logD
return Void();
}
// It runs against the oldest TLog instance, calculates the first location in the disk queue that contains un-popped
// data, and then issues a pop to the disk queue at that location so that anything earlier can be
// removed/forgotten/overwritten. In effect, it applies the effect of TLogPop RPCs to disk.
ACTOR Future<Void> popDiskQueue( TLogData* self, Reference<LogData> logData ) {
if (!logData->initialized) return Void();
@ -1084,9 +1106,11 @@ ACTOR Future<Void> tLogPop( TLogData* self, TLogPopRequest req, Reference<LogDat
return Void();
}
// This function (and updatePersistentData, which is called by this function) run at a low priority and can soak up all CPU resources.
// For this reason, they employ aggressive use of yields to avoid causing slow tasks that could introduce latencies for more important
// work (e.g. commits).
// This function (and updatePersistentData, which is called by this function) run at a low priority and can soak up all
// CPU resources. For this reason, they employ aggressive use of yields to avoid causing slow tasks that could introduce
// latencies for more important work (e.g. commits).
// This actor is just a loop that calls updatePersistentData and popDiskQueue whenever
// (a) there's data to be spilled or (b) we should update metadata after some commits have been fully popped.
ACTOR Future<Void> updateStorage( TLogData* self ) {
while(self->spillOrder.size() && !self->id_data.count(self->spillOrder.front())) {
self->spillOrder.pop_front();
@ -1868,7 +1892,11 @@ ACTOR Future<Void> tLogCommit(
return Void();
}
if (logData->version.get() == req.prevVersion) { // Not a duplicate (check relies on critical section between here self->version.set() below!)
state double beforeCommitT = now();
// Not a duplicate (check relies on critical section between here self->version.set() below!)
state bool isNotDuplicate = (logData->version.get() == req.prevVersion);
if (isNotDuplicate) {
if(req.debugID.present())
g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.Before");
@ -1906,6 +1934,10 @@ ACTOR Future<Void> tLogCommit(
return Void();
}
if (isNotDuplicate) {
self->commitLatencyDist->sampleSeconds(now() - beforeCommitT);
}
if(req.debugID.present())
g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.After");
@ -2266,6 +2298,7 @@ void removeLog( TLogData* self, Reference<LogData> logData ) {
}
}
// remote tLog pull data from log routers
ACTOR Future<Void> pullAsyncData( TLogData* self, Reference<LogData> logData, std::vector<Tag> tags, Version beginVersion, Optional<Version> endVersion, bool poppedIsKnownCommitted ) {
state Future<Void> dbInfoChange = Void();
state Reference<ILogSystem::IPeekCursor> r;

View File

@ -166,7 +166,7 @@ OldTLogCoreData::OldTLogCoreData(const OldLogData& oldData)
struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogSystem> {
const UID dbgid;
LogSystemType logSystemType;
std::vector<Reference<LogSet>> tLogs; // LogSets in different locations: primary, remote or satellite
std::vector<Reference<LogSet>> tLogs; // LogSets in different locations: primary, satellite, or remote
int expectedLogSets;
int logRouterTags;
int txsTags;
@ -195,7 +195,14 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
Version knownCommittedVersion;
Version backupStartVersion = invalidVersion; // max(tLogs[0].startVersion, previous epochEnd).
LocalityData locality;
std::map< std::pair<UID, Tag>, std::pair<Version, Version> > outstandingPops; // For each currently running popFromLog actor, (log server #, tag)->popped version
// For each currently running popFromLog actor, outstandingPops is
// (logID, tag)->(max popped version, durableKnownCommittedVersion).
// Why do we need durableKnownCommittedVersion? knownCommittedVersion gives the lower bound of what data
// will need to be copied into the next generation to restore the replication factor.
// Guess: It probably serves as a minimum version of what data should be on a TLog in the next generation and
// sending a pop for anything less than durableKnownCommittedVersion for the TLog will be absurd.
std::map<std::pair<UID, Tag>, std::pair<Version, Version>> outstandingPops;
Optional<PromiseStream<Future<Void>>> addActor;
ActorCollection popActors;
std::vector<OldLogData> oldLogData; // each element has the log info. in one old epoch.
@ -270,6 +277,9 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
Version& localityVersion = pseudoLocalityPopVersion[tag];
localityVersion = std::max(localityVersion, upTo);
Version minVersion = localityVersion;
// Why do we need to use the minimum popped version among all tags? Reason: for example,
// 2 pseudo tags pop 100 or 150, respectively. It's only safe to pop min(100, 150),
// because [101,150) is needed by another pseudo tag.
for (const int8_t locality : pseudoLocalities) {
minVersion = std::min(minVersion, pseudoLocalityPopVersion[Tag(locality, tag.id)]);
}
@ -1109,6 +1119,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
}
// pop 'tag.locality' type data up to the 'upTo' version
void pop(Version upTo, Tag tag, Version durableKnownCommittedVersion, int8_t popLocality) final {
if (upTo <= 0) return;
if (tag.locality == tagLocalityRemoteLog) {
@ -1134,6 +1145,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
}
// pop tag from log up to the version defined in self->outstandingPops[].first
ACTOR static Future<Void> popFromLog(TagPartitionedLogSystem* self,
Reference<AsyncVar<OptionalInterface<TLogInterface>>> log, Tag tag,
double time) {
@ -1141,6 +1153,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
loop {
wait( delay(time, TaskPriority::TLogPop) );
// to: first is upto version, second is durableKnownComittedVersion
state std::pair<Version,Version> to = self->outstandingPops[ std::make_pair(log->get().id(),tag) ];
if (to.first <= last) {

View File

@ -366,6 +366,14 @@ void failAfter( Future<Void> trigger, Endpoint e ) {
failAfter( trigger, g_simulator.getProcess( e ) );
}
ACTOR Future<Void> histogramReport() {
loop {
wait(delay(SERVER_KNOBS->HISTOGRAM_REPORT_INTERVAL));
GetHistogramRegistry().logReport();
}
}
void testSerializationSpeed() {
double tstart;
double build = 0, serialize = 0, deserialize = 0, copy = 0, deallocate = 0;
@ -1728,6 +1736,8 @@ int main(int argc, char* argv[]) {
if (role == Simulation) {
TraceEvent("Simulation").detail("TestFile", opts.testFile);
auto histogramReportActor = histogramReport();
clientKnobs->trace();
flowKnobs->trace();
serverKnobs->trace();
@ -1885,6 +1895,7 @@ int main(int argc, char* argv[]) {
actors.push_back(fdbd(opts.connectionFile, opts.localities, opts.processClass, dataFolder, dataFolder,
opts.storageMemLimit, opts.metricsConnFile, opts.metricsPrefix, opts.rsssize,
opts.whitelistBinPaths));
actors.push_back(histogramReport());
// actors.push_back( recurring( []{}, .001 ) ); // for ASIO latency measurement
f = stopAfter(waitForAll(actors));

View File

@ -19,11 +19,16 @@
*/
#include <cinttypes>
#include <functional>
#include <type_traits>
#include <unordered_map>
#include "fdbrpc/fdbrpc.h"
#include "fdbrpc/LoadBalance.h"
#include "flow/IndexedSet.h"
#include "flow/Hash3.h"
#include "flow/ActorCollection.h"
#include "flow/Hash3.h"
#include "flow/Histogram.h"
#include "flow/IndexedSet.h"
#include "flow/SystemMonitor.h"
#include "flow/Util.h"
#include "fdbclient/Atomic.h"
@ -52,11 +57,8 @@
#include "fdbrpc/Smoother.h"
#include "fdbrpc/Stats.h"
#include "flow/TDMetric.actor.h"
#include <type_traits>
#include "flow/actorcompiler.h" // This must be the last #include.
using std::pair;
using std::make_pair;
#include "flow/actorcompiler.h" // This must be the last #include.
#ifndef __INTEL_COMPILER
#pragma region Data Structures
@ -224,13 +226,13 @@ struct UpdateEagerReadInfo {
void finishKeyBegin() {
std::sort(keyBegin.begin(), keyBegin.end());
keyBegin.resize( std::unique(keyBegin.begin(), keyBegin.end()) - keyBegin.begin() );
std::sort(keys.begin(), keys.end(), [](const pair<KeyRef, int>& lhs, const pair<KeyRef, int>& rhs) { return (lhs.first < rhs.first) || (lhs.first == rhs.first && lhs.second > rhs.second); } );
keys.resize(std::unique(keys.begin(), keys.end(), [](const pair<KeyRef, int>& lhs, const pair<KeyRef, int>& rhs) { return lhs.first == rhs.first; } ) - keys.begin());
std::sort(keys.begin(), keys.end(), [](const std::pair<KeyRef, int>& lhs, const std::pair<KeyRef, int>& rhs) { return (lhs.first < rhs.first) || (lhs.first == rhs.first && lhs.second > rhs.second); } );
keys.resize(std::unique(keys.begin(), keys.end(), [](const std::pair<KeyRef, int>& lhs, const std::pair<KeyRef, int>& rhs) { return lhs.first == rhs.first; } ) - keys.begin());
//value gets populated in doEagerReads
}
Optional<Value>& getValue(KeyRef key) {
int i = std::lower_bound(keys.begin(), keys.end(), pair<KeyRef, int>(key, 0), [](const pair<KeyRef, int>& lhs, const pair<KeyRef, int>& rhs) { return lhs.first < rhs.first; } ) - keys.begin();
int i = std::lower_bound(keys.begin(), keys.end(),std::pair<KeyRef, int>(key, 0), [](const std::pair<KeyRef, int>& lhs, const std::pair<KeyRef, int>& rhs) { return lhs.first < rhs.first; } ) - keys.begin();
ASSERT( i < keys.size() && keys[i].first == key );
return value[i];
}
@ -284,9 +286,63 @@ private:
std::map<Version, Standalone<VerUpdateRef>> mutationLog; // versions (durableVersion, version]
public:
public:
// Histograms
struct FetchKeysHistograms {
const Reference<Histogram> latency;
const Reference<Histogram> bytes;
const Reference<Histogram> bandwidth;
FetchKeysHistograms()
: latency(Histogram::getHistogram(STORAGESERVER_HISTOGRAM_GROUP, FETCH_KEYS_LATENCY_HISTOGRAM,
Histogram::Unit::microseconds)),
bytes(Histogram::getHistogram(STORAGESERVER_HISTOGRAM_GROUP, FETCH_KEYS_BYTES_HISTOGRAM,
Histogram::Unit::bytes)),
bandwidth(Histogram::getHistogram(STORAGESERVER_HISTOGRAM_GROUP, FETCH_KEYS_BYTES_PER_SECOND_HISTOGRAM,
Histogram::Unit::bytes_per_second)) {}
} fetchKeysHistograms;
class CurrentRunningFetchKeys {
std::unordered_map<UID, double> startTimeMap;
std::unordered_map<UID, KeyRangeRef> keyRangeMap;
static const StringRef emptyString;
static const KeyRangeRef emptyKeyRange;
public:
void recordStart(const UID id, const KeyRange keyRange) {
startTimeMap[id] = now();
keyRangeMap[id] = keyRange;
}
void recordFinish(const UID id) {
startTimeMap.erase(id);
keyRangeMap.erase(id);
}
std::pair<double, KeyRangeRef> longestTime() const {
if (numRunning() == 0) {
return {-1, emptyKeyRange};
}
const double currentTime = now();
double longest = 0;
UID UIDofLongest;
for (const auto kv: startTimeMap) {
const double currentRunningTime = currentTime - kv.second;
if (longest < currentRunningTime) {
longest = currentRunningTime;
UIDofLongest = kv.first;
}
}
return {longest, keyRangeMap.at(UIDofLongest)};
}
int numRunning() const { return startTimeMap.size(); }
} currentRunningFetchKeys;
Tag tag;
vector<pair<Version,Tag>> history;
vector<pair<Version,Tag>> allHistory;
vector<std::pair<Version,Tag>> history;
vector<std::pair<Version,Tag>> allHistory;
Version poppedAllAfter;
std::map<Version, Arena> freeable; // for each version, an Arena that must be held until that version is < oldestVersion
Arena lastArena;
@ -333,8 +389,8 @@ public:
poppedAllAfter = std::numeric_limits<Version>::max();
}
vector<pair<Version,Tag>>* hist = &history;
vector<pair<Version,Tag>> allHistoryCopy;
vector<std::pair<Version,Tag>>* hist = &history;
vector<std::pair<Version,Tag>> allHistoryCopy;
if(popAllTags) {
allHistoryCopy = allHistory;
hist = &allHistoryCopy;
@ -601,22 +657,18 @@ public:
}
} counters;
StorageServer(IKeyValueStore* storage, Reference<AsyncVar<ServerDBInfo>> const& db, StorageServerInterface const& ssi)
: instanceID(deterministicRandom()->randomUniqueID().first()),
storage(this, storage), db(db), actors(false),
lastTLogVersion(0), lastVersionWithData(0), restoredVersion(0),
rebootAfterDurableVersion(std::numeric_limits<Version>::max()),
durableInProgress(Void()),
versionLag(0), primaryLocality(tagLocalityInvalid),
updateEagerReads(0),
shardChangeCounter(0),
fetchKeysParallelismLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM_BYTES),
shuttingDown(false), debug_inApplyUpdate(false), debug_lastValidateTime(0), watchBytes(0), numWatches(0),
logProtocol(0), counters(this), tag(invalidTag), maxQueryQueue(0), thisServerID(ssi.id()),
readQueueSizeMetric(LiteralStringRef("StorageServer.ReadQueueSize")),
behind(false), versionBehind(false), byteSampleClears(false, LiteralStringRef("\xff\xff\xff")), noRecentUpdates(false),
lastUpdate(now()), poppedAllAfter(std::numeric_limits<Version>::max()), cpuUsage(0.0), diskUsage(0.0)
{
StorageServer(IKeyValueStore* storage, Reference<AsyncVar<ServerDBInfo>> const& db,
StorageServerInterface const& ssi)
: fetchKeysHistograms(), instanceID(deterministicRandom()->randomUniqueID().first()), storage(this, storage),
db(db), actors(false), lastTLogVersion(0), lastVersionWithData(0), restoredVersion(0),
rebootAfterDurableVersion(std::numeric_limits<Version>::max()), durableInProgress(Void()), versionLag(0),
primaryLocality(tagLocalityInvalid), updateEagerReads(0), shardChangeCounter(0),
fetchKeysParallelismLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM_BYTES), shuttingDown(false),
debug_inApplyUpdate(false), debug_lastValidateTime(0), watchBytes(0), numWatches(0), logProtocol(0),
counters(this), tag(invalidTag), maxQueryQueue(0), thisServerID(ssi.id()),
readQueueSizeMetric(LiteralStringRef("StorageServer.ReadQueueSize")), behind(false), versionBehind(false),
byteSampleClears(false, LiteralStringRef("\xff\xff\xff")), noRecentUpdates(false), lastUpdate(now()),
poppedAllAfter(std::numeric_limits<Version>::max()), cpuUsage(0.0), diskUsage(0.0) {
version.initMetric(LiteralStringRef("StorageServer.Version"), counters.cc.id);
oldestVersion.initMetric(LiteralStringRef("StorageServer.OldestVersion"), counters.cc.id);
durableVersion.initMetric(LiteralStringRef("StorageServer.DurableVersion"), counters.cc.id);
@ -729,6 +781,9 @@ public:
}
};
const StringRef StorageServer::CurrentRunningFetchKeys::emptyString = LiteralStringRef("");
const KeyRangeRef StorageServer::CurrentRunningFetchKeys::emptyKeyRange = KeyRangeRef(StorageServer::CurrentRunningFetchKeys::emptyString, StorageServer::CurrentRunningFetchKeys::emptyString);
// If and only if key:=value is in (storage+versionedData), // NOT ACTUALLY: and key < allKeys.end,
// and H(key) < |key+value|/bytesPerSample,
// let sampledSize = max(|key+value|,bytesPerSample)
@ -1791,7 +1846,7 @@ bool changeDurableVersion( StorageServer* data, Version desiredDurableVersion )
setDataDurableVersion(data->thisServerID, data->durableVersion.get());
if (checkFatalError.isReady()) checkFatalError.get();
//TraceEvent("ForgotVersionsBefore", data->thisServerID).detail("Version", nextDurableVersion);
// TraceEvent("ForgotVersionsBefore", data->thisServerID).detail("Version", nextDurableVersion);
validate(data);
return nextDurableVersion == desiredDurableVersion;
@ -2107,16 +2162,56 @@ ACTOR Future<Void> logFetchKeysWarning(AddingShard* shard) {
loop {
state double waitSeconds = BUGGIFY ? 5.0 : 600.0;
wait(delay(waitSeconds));
TraceEvent(waitSeconds > 300.0 ? SevWarnAlways : SevInfo, "FetchKeysTooLong").detail("Duration", now() - startTime).detail("Phase", shard->phase).detail("Begin", shard->keys.begin.printable()).detail("End", shard->keys.end.printable());
const auto traceEventLevel = waitSeconds > SERVER_KNOBS->FETCH_KEYS_TOO_LONG_TIME_CRITERIA ? SevWarnAlways : SevInfo;
TraceEvent(traceEventLevel, "FetchKeysTooLong")
.detail("Duration", now() - startTime)
.detail("Phase", shard->phase)
.detail("Begin", shard->keys.begin.printable())
.detail("End", shard->keys.end.printable());
}
}
class FetchKeysMetricReporter {
const UID uid;
const double startTime;
int fetchedBytes;
StorageServer::FetchKeysHistograms& histograms;
StorageServer::CurrentRunningFetchKeys& currentRunning;
public:
FetchKeysMetricReporter(const UID& uid_, const double startTime_, const KeyRange& keyRange, StorageServer::FetchKeysHistograms& histograms_, StorageServer::CurrentRunningFetchKeys& currentRunning_)
: uid(uid_), startTime(startTime_), fetchedBytes(0), histograms(histograms_), currentRunning(currentRunning_) {
currentRunning.recordStart(uid, keyRange);
}
void addFetchedBytes(const int bytes) { fetchedBytes += bytes; }
~FetchKeysMetricReporter() {
double latency = now() - startTime;
// If fetchKeys is *NOT* run, i.e. returning immediately, still report a record.
if (latency == 0) latency = 1e6;
const uint32_t bandwidth = fetchedBytes / latency;
histograms.latency->sampleSeconds(latency);
histograms.bytes->sample(fetchedBytes);
histograms.bandwidth->sample(bandwidth);
currentRunning.recordFinish(uid);
}
};
ACTOR Future<Void> fetchKeys( StorageServer *data, AddingShard* shard ) {
state const UID fetchKeysID = deterministicRandom()->randomUniqueID();
state TraceInterval interval("FetchKeys");
state KeyRange keys = shard->keys;
state Future<Void> warningLogger = logFetchKeysWarning(shard);
state double startt = now();
state const double startTime = now();
state int fetchBlockBytes = BUGGIFY ? SERVER_KNOBS->BUGGIFY_BLOCK_BYTES : SERVER_KNOBS->FETCH_BLOCK_BYTES;
state FetchKeysMetricReporter metricReporter(fetchKeysID, startTime, keys, data->fetchKeysHistograms, data->currentRunningFetchKeys);
// delay(0) to force a return to the run loop before the work of fetchKeys is started.
// This allows adding->start() to be called inline with CSK.
@ -2154,7 +2249,7 @@ ACTOR Future<Void> fetchKeys( StorageServer *data, AddingShard* shard ) {
state double executeStart = now();
++data->counters.fetchWaitingCount;
data->counters.fetchWaitingMS += 1000*(executeStart - startt);
data->counters.fetchWaitingMS += 1000 * (executeStart - startTime);
// Fetch keys gets called while the update actor is processing mutations. data->version will not be updated until all mutations for a version
// have been processed. We need to take the durableVersionLock to ensure data->version is greater than the version of the mutation which caused
@ -2194,6 +2289,7 @@ ACTOR Future<Void> fetchKeys( StorageServer *data, AddingShard* shard ) {
debugKeyRange("fetchRange", fetchVersion, keys);
for(auto k = this_block.begin(); k != this_block.end(); ++k) debugMutation("fetch", fetchVersion, MutationRef(MutationRef::SetValue, k->key, k->value));
metricReporter.addFetchedBytes(expectedSize);
data->counters.bytesFetched += expectedSize;
if( fetchBlockBytes > expectedSize ) {
holdingFKPL.release( fetchBlockBytes - expectedSize );
@ -2261,8 +2357,9 @@ ACTOR Future<Void> fetchKeys( StorageServer *data, AddingShard* shard ) {
while (!shard->updates.empty() && shard->updates[0].version <= fetchVersion) shard->updates.pop_front();
//FIXME: remove when we no longer support upgrades from 5.X
if(debug_getRangeRetries >= 100) {
if (debug_getRangeRetries >= 100) {
data->cx->enableLocalityLoadBalance = false;
// TODO: Add SevWarnAlways to say it was disabled.
}
debug_getRangeRetries++;
@ -2379,7 +2476,7 @@ ACTOR Future<Void> fetchKeys( StorageServer *data, AddingShard* shard ) {
TraceEvent(SevError, "FetchKeysError", data->thisServerID)
.error(e)
.detail("Elapsed", now()-startt)
.detail("Elapsed", now() - startTime)
.detail("KeyBegin", keys.begin)
.detail("KeyEnd",keys.end);
if (e.code() != error_code_actor_cancelled)
@ -3226,7 +3323,9 @@ bool StorageServerDisk::makeVersionMutationsDurable( Version& prevStorageVersion
void StorageServerDisk::makeVersionDurable( Version version ) {
storage->set( KeyValueRef(persistVersion, BinaryWriter::toValue(version, Unversioned())) );
//TraceEvent("MakeDurable", data->thisServerID).detail("FromVersion", prevStorageVersion).detail("ToVersion", version);
// TraceEvent("MakeDurable", data->thisServerID)
// .detail("FromVersion", prevStorageVersion)
// .detail("ToVersion", version);
}
void StorageServerDisk::changeLogProtocol(Version version, ProtocolVersion protocol) {
@ -3736,6 +3835,35 @@ ACTOR Future<Void> serveWatchValueRequests( StorageServer* self, FutureStream<Wa
}
}
ACTOR Future<Void> reportStorageServerState(StorageServer* self) {
if (!SERVER_KNOBS->REPORT_DD_METRICS) {
return Void();
}
loop {
wait(delay(SERVER_KNOBS->DD_METRICS_REPORT_INTERVAL));
const auto numRunningFetchKeys = self->currentRunningFetchKeys.numRunning();
if (numRunningFetchKeys == 0) {
continue;
}
const auto longestRunningFetchKeys = self->currentRunningFetchKeys.longestTime();
auto level = SevInfo;
if (longestRunningFetchKeys.first >= SERVER_KNOBS->FETCH_KEYS_TOO_LONG_TIME_CRITERIA) {
level = SevWarnAlways;
}
TraceEvent(level, "FetchKeyCurrentStatus")
.detail("Timestamp", now())
.detail("LongestRunningTime", longestRunningFetchKeys.first)
.detail("StartKey", longestRunningFetchKeys.second.begin.printable())
.detail("EndKey", longestRunningFetchKeys.second.end.printable())
.detail("NumRunning", numRunningFetchKeys);
}
}
ACTOR Future<Void> storageServerCore( StorageServer* self, StorageServerInterface ssi )
{
state Future<Void> doUpdate = Void();
@ -3756,6 +3884,7 @@ ACTOR Future<Void> storageServerCore( StorageServer* self, StorageServerInterfac
self->actors.add(serveGetKeyRequests(self, ssi.getKey.getFuture()));
self->actors.add(serveWatchValueRequests(self, ssi.watchValue.getFuture()));
self->actors.add(traceRole(Role::STORAGE_SERVER, ssi.id()));
self->actors.add(reportStorageServerState(self));
self->transactionTagCounter.startNewInterval(self->thisServerID);
self->actors.add(recurring([&](){ self->transactionTagCounter.startNewInterval(self->thisServerID); }, SERVER_KNOBS->READ_TAG_MEASUREMENT_INTERVAL));

View File

@ -41,6 +41,8 @@ thread_local ISimulator::ProcessInfo* ISimulator::currentProcess = nullptr;
// we have a simulated contex here; we'd just use the current context regardless.
static HistogramRegistry* globalHistograms = nullptr;
#pragma region HistogramRegistry
HistogramRegistry& GetHistogramRegistry() {
ISimulator::ProcessInfo* info = g_simulator.getCurrentProcess();
@ -89,6 +91,16 @@ void HistogramRegistry::logReport() {
}
}
#pragma endregion // HistogramRegistry
#pragma region Histogram
const std::unordered_map<Histogram::Unit, std::string> Histogram::UnitToStringMapper = {
{ Histogram::Unit::microseconds, "microseconds" },
{ Histogram::Unit::bytes, "bytes" },
{ Histogram::Unit::bytes_per_second, "bytes_per_second" }
};
void Histogram::writeToLog() {
bool active = false;
for (uint32_t i = 0; i < 32; i++) {
@ -102,17 +114,19 @@ void Histogram::writeToLog() {
}
TraceEvent e(SevInfo, "Histogram");
e.detail("Group", group).detail("Op", op);
e.detail("Group", group).detail("Op", op).detail("Unit", UnitToStringMapper.at(unit));
for (uint32_t i = 0; i < 32; i++) {
uint32_t value = ((uint32_t)1) << (i + 1);
if (buckets[i]) {
switch (unit) {
case Unit::microseconds: {
uint32_t usec = ((uint32_t)1) << (i + 1);
e.detail(format("LessThan%u.%03u", usec / 1000, usec % 1000), buckets[i]);
case Unit::microseconds:
e.detail(format("LessThan%u.%03u", value / 1000, value % 1000), buckets[i]);
break;
}
case Unit::bytes:
e.detail(format("LessThan%u", ((uint32_t)1) << (i + 1)), buckets[i]);
case Unit::bytes_per_second:
e.detail(format("LessThan%u", value), buckets[i]);
break;
default:
ASSERT(false);
@ -121,6 +135,8 @@ void Histogram::writeToLog() {
}
}
#pragma endregion // Histogram
TEST_CASE("/flow/histogram/smoke_test") {
{
@ -168,4 +184,4 @@ TEST_CASE("/flow/histogram/smoke_test") {
GetHistogramRegistry().logReport();
return Void();
}
}

View File

@ -26,6 +26,7 @@
#include <string>
#include <map>
#include <unordered_map>
#ifdef _WIN32
#include <intrin.h>
@ -57,11 +58,16 @@ HistogramRegistry& GetHistogramRegistry();
*/
class Histogram sealed : public ReferenceCounted<Histogram> {
public:
enum class Unit { microseconds, bytes };
enum class Unit { microseconds, bytes, bytes_per_second };
private:
static const std::unordered_map<Unit, std::string> UnitToStringMapper;
Histogram(std::string group, std::string op, Unit unit, HistogramRegistry& registry)
: group(group), op(op), unit(unit), registry(registry), ReferenceCounted<Histogram>() {
ASSERT(UnitToStringMapper.find(unit) != UnitToStringMapper.end());
clear();
}

View File

@ -173,6 +173,7 @@ void FlowKnobs::initialize(bool randomize, bool isSimulated) {
init( TRACE_RETRY_OPEN_INTERVAL, 1.00 );
init( MIN_TRACE_SEVERITY, isSimulated ? 1 : 10 ); // Related to the trace severity in Trace.h
init( MAX_TRACE_SUPPRESSIONS, 1e4 );
init( TRACE_DATETIME_ENABLED, true ); // trace time in human readable format (always real time)
init( TRACE_SYNC_ENABLED, 0 );
init( TRACE_EVENT_METRIC_UNITS_PER_SAMPLE, 500 );
init( TRACE_EVENT_THROTTLER_SAMPLE_EXPIRY, 1800.0 ); // 30 mins

View File

@ -192,6 +192,7 @@ public:
double TRACE_RETRY_OPEN_INTERVAL;
int MIN_TRACE_SEVERITY;
int MAX_TRACE_SUPPRESSIONS;
bool TRACE_DATETIME_ENABLED;
int TRACE_SYNC_ENABLED;
int TRACE_EVENT_METRIC_UNITS_PER_SAMPLE;
int TRACE_EVENT_THROTTLER_SAMPLE_EXPIRY;

View File

@ -60,7 +60,6 @@ SystemStatistics customSystemMonitor(std::string eventName, StatisticsState *sta
netData.init();
if (!g_network->isSimulated() && currentStats.initialized) {
{
GetHistogramRegistry().logReport();
TraceEvent(eventName.c_str())
.detail("Elapsed", currentStats.elapsed)
.detail("CPUSeconds", currentStats.processCPUSeconds)

View File

@ -30,7 +30,7 @@
#include <cctype>
#include <time.h>
#include <set>
#include <iomanip>
#include "flow/IThreadPool.h"
#include "flow/ThreadHelper.actor.h"
#include "flow/FastRef.h"
@ -474,6 +474,7 @@ public:
if (roll) {
auto o = new WriterThread::Roll;
double time = 0;
writer->post(o);
std::vector<TraceEventFields> events = latestEventCache.getAllUnsafe();
@ -482,9 +483,15 @@ public:
TraceEventFields rolledFields;
for(auto itr = events[idx].begin(); itr != events[idx].end(); ++itr) {
if(itr->first == "Time") {
rolledFields.addField("Time", format("%.6f", TraceEvent::getCurrentTime()));
time = TraceEvent::getCurrentTime();
rolledFields.addField("Time", format("%.6f", time));
rolledFields.addField("OriginalTime", itr->second);
}
else if (itr->first == "DateTime") {
UNSTOPPABLE_ASSERT(time > 0); // "Time" field should always come first
rolledFields.addField("DateTime", TraceEvent::printRealTime(time));
rolledFields.addField("OriginalDateTime", itr->second);
}
else if(itr->first == "TrackLatestType") {
rolledFields.addField("TrackLatestType", "Rolled");
}
@ -912,6 +919,9 @@ bool TraceEvent::init() {
detail("Severity", int(severity));
detail("Time", "0.000000");
timeIndex = fields.size() - 1;
if (FLOW_KNOBS->TRACE_DATETIME_ENABLED) {
detail("DateTime", "");
}
detail("Type", type);
if(g_network && g_network->isSimulated()) {
@ -1112,7 +1122,7 @@ TraceEvent& TraceEvent::GetLastError() {
// We're cheating in counting, as in practice, we only use {10,20,30,40}.
static_assert(SevMaxUsed / 10 + 1 == 5, "Please bump eventCounts[5] to SevMaxUsed/10+1");
unsigned long TraceEvent::eventCounts[5] = {0,0,0,0,0};
unsigned long TraceEvent::eventCounts[5] = { 0, 0, 0, 0, 0 };
unsigned long TraceEvent::CountEventsLoggedAt(Severity sev) {
return TraceEvent::eventCounts[sev/10];
@ -1130,7 +1140,11 @@ void TraceEvent::log() {
++g_allocation_tracing_disabled;
try {
if (enabled) {
fields.mutate(timeIndex).second = format("%.6f", TraceEvent::getCurrentTime());
double time = TraceEvent::getCurrentTime();
fields.mutate(timeIndex).second = format("%.6f", time);
if (FLOW_KNOBS->TRACE_DATETIME_ENABLED) {
fields.mutate(timeIndex + 1).second = TraceEvent::printRealTime(time);
}
if (this->severity == SevError) {
severity = SevInfo;
@ -1201,6 +1215,31 @@ double TraceEvent::getCurrentTime() {
}
}
// converts the given flow time into a string
// with format: %Y-%m-%dT%H:%M:%S
// This only has second-resolution for the simple reason
// that std::put_time does not support higher resolution.
// This is fine since we always log the flow time as well.
std::string TraceEvent::printRealTime(double time) {
using Clock = std::chrono::system_clock;
time_t ts = Clock::to_time_t(Clock::time_point(
std::chrono::duration_cast<Clock::duration>(std::chrono::duration<double, std::ratio<1>>(time))));
if (g_network && g_network->isSimulated()) {
// The clock is simulated, so return the real time
ts = Clock::to_time_t(Clock::now());
}
std::stringstream ss;
#ifdef _WIN32
// MSVC gmtime is threadsafe
ss << std::put_time(::gmtime(&ts), "%Y-%m-%dT%H:%M:%SZ");
#else
// use threadsafe gmt
struct tm result;
ss << std::put_time(::gmtime_r(&ts, &result), "%Y-%m-%dT%H:%M:%SZ");
#endif
return ss.str();
}
TraceInterval& TraceInterval::begin() {
pairID = nondeterministicRandom()->randomUniqueID();
count = 0;
@ -1278,6 +1317,9 @@ void TraceBatch::dump() {
TraceBatch::EventInfo::EventInfo(double time, const char *name, uint64_t id, const char *location) {
fields.addField("Severity", format("%d", (int)SevInfo));
fields.addField("Time", format("%.6f", time));
if (FLOW_KNOBS->TRACE_DATETIME_ENABLED) {
fields.addField("DateTime", TraceEvent::printRealTime(time));
}
fields.addField("Type", name);
fields.addField("ID", format("%016" PRIx64, id));
fields.addField("Location", location);
@ -1286,6 +1328,9 @@ TraceBatch::EventInfo::EventInfo(double time, const char *name, uint64_t id, con
TraceBatch::AttachInfo::AttachInfo(double time, const char *name, uint64_t id, uint64_t to) {
fields.addField("Severity", format("%d", (int)SevInfo));
fields.addField("Time", format("%.6f", time));
if (FLOW_KNOBS->TRACE_DATETIME_ENABLED) {
fields.addField("DateTime", TraceEvent::printRealTime(time));
}
fields.addField("Type", name);
fields.addField("ID", format("%016" PRIx64, id));
fields.addField("To", format("%016" PRIx64, to));
@ -1294,6 +1339,9 @@ TraceBatch::AttachInfo::AttachInfo(double time, const char *name, uint64_t id, u
TraceBatch::BuggifyInfo::BuggifyInfo(double time, int activated, int line, std::string file) {
fields.addField("Severity", format("%d", (int)SevInfo));
fields.addField("Time", format("%.6f", time));
if (FLOW_KNOBS->TRACE_DATETIME_ENABLED) {
fields.addField("DateTime", TraceEvent::printRealTime(time));
}
fields.addField("Type", "BuggifySection");
fields.addField("Activated", format("%d", activated));
fields.addField("File", std::move(file));

View File

@ -26,6 +26,7 @@
#include <stdarg.h>
#include <stdint.h>
#include <string>
#include <chrono>
#include <map>
#include <set>
#include <type_traits>
@ -394,6 +395,7 @@ struct TraceEvent {
static bool isNetworkThread();
static double getCurrentTime();
static std::string printRealTime(double time);
//Must be called directly after constructing the trace event
TraceEvent& error(const class Error& e, bool includeCancelled=false) {

View File

@ -92,6 +92,7 @@ ERROR( master_resolver_failed, 1210, "Master terminating because a Resolver fail
ERROR( server_overloaded, 1211, "Server is under too much load and cannot respond" )
ERROR( master_backup_worker_failed, 1212, "Master terminating because a backup worker failed")
ERROR( tag_throttled, 1213, "Transaction tag is being throttled" )
ERROR( dd_tracker_cancelled, 1215, "The data distribution tracker has been cancelled" )
// 15xx Platform errors
ERROR( platform_error, 1500, "Platform error" )

7
versions.target Normal file
View File

@ -0,0 +1,7 @@
<?xml version="1.0"?>
<Project xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<Version>6.3.10</Version>
<PackageName>6.3</PackageName>
</PropertyGroup>
</Project>