Resolve conflicts

Andrew Noyes 2020-11-24 17:41:36 +00:00
parent 1f541f02be
commit dc2bac5670
19 changed files with 52 additions and 350 deletions

View File

@@ -18,11 +18,7 @@
# limitations under the License.
cmake_minimum_required(VERSION 3.13)
project(foundationdb
<<<<<<< HEAD
VERSION 6.3.10
=======
VERSION 6.2.29
>>>>>>> anoyes/merge-6.2-to-6.3
DESCRIPTION "FoundationDB is a scalable, fault-tolerant, ordered key-value store with full ACID transactions."
HOMEPAGE_URL "http://www.foundationdb.org/"
LANGUAGES C CXX ASM)

View File

@@ -10,61 +10,38 @@ macOS
The macOS installation package is supported on macOS 10.7+. It includes the client and (optionally) the server.
<<<<<<< HEAD
* `FoundationDB-6.3.9.pkg <https://www.foundationdb.org/downloads/6.3.9/macOS/installers/FoundationDB-6.3.9.pkg>`_
=======
* `FoundationDB-6.2.28.pkg <https://www.foundationdb.org/downloads/6.2.28/macOS/installers/FoundationDB-6.2.28.pkg>`_
>>>>>>> anoyes/merge-6.2-to-6.3
Ubuntu
------
The Ubuntu packages are supported on 64-bit Ubuntu 12.04+, but beware of the Linux kernel bug in Ubuntu 12.x.
<<<<<<< HEAD
* `foundationdb-clients-6.3.9-1_amd64.deb <https://www.foundationdb.org/downloads/6.3.9/ubuntu/installers/foundationdb-clients_6.3.9-1_amd64.deb>`_
* `foundationdb-server-6.3.9-1_amd64.deb <https://www.foundationdb.org/downloads/6.3.9/ubuntu/installers/foundationdb-server_6.3.9-1_amd64.deb>`_ (depends on the clients package)
=======
* `foundationdb-clients-6.2.28-1_amd64.deb <https://www.foundationdb.org/downloads/6.2.28/ubuntu/installers/foundationdb-clients_6.2.28-1_amd64.deb>`_
* `foundationdb-server-6.2.28-1_amd64.deb <https://www.foundationdb.org/downloads/6.2.28/ubuntu/installers/foundationdb-server_6.2.28-1_amd64.deb>`_ (depends on the clients package)
>>>>>>> anoyes/merge-6.2-to-6.3
RHEL/CentOS EL6
---------------
The RHEL/CentOS EL6 packages are supported on 64-bit RHEL/CentOS 6.x.
<<<<<<< HEAD
* `foundationdb-clients-6.3.9-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.3.9/rhel6/installers/foundationdb-clients-6.3.9-1.el6.x86_64.rpm>`_
* `foundationdb-server-6.3.9-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.3.9/rhel6/installers/foundationdb-server-6.3.9-1.el6.x86_64.rpm>`_ (depends on the clients package)
=======
* `foundationdb-clients-6.2.28-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.28/rhel6/installers/foundationdb-clients-6.2.28-1.el6.x86_64.rpm>`_
* `foundationdb-server-6.2.28-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.28/rhel6/installers/foundationdb-server-6.2.28-1.el6.x86_64.rpm>`_ (depends on the clients package)
>>>>>>> anoyes/merge-6.2-to-6.3
RHEL/CentOS EL7
---------------
The RHEL/CentOS EL7 packages are supported on 64-bit RHEL/CentOS 7.x.
<<<<<<< HEAD
* `foundationdb-clients-6.3.9-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.3.9/rhel7/installers/foundationdb-clients-6.3.9-1.el7.x86_64.rpm>`_
* `foundationdb-server-6.3.9-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.3.9/rhel7/installers/foundationdb-server-6.3.9-1.el7.x86_64.rpm>`_ (depends on the clients package)
=======
* `foundationdb-clients-6.2.28-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.28/rhel7/installers/foundationdb-clients-6.2.28-1.el7.x86_64.rpm>`_
* `foundationdb-server-6.2.28-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.28/rhel7/installers/foundationdb-server-6.2.28-1.el7.x86_64.rpm>`_ (depends on the clients package)
>>>>>>> anoyes/merge-6.2-to-6.3
Windows
-------
The Windows installer is supported on 64-bit Windows XP and later. It includes the client and (optionally) the server.
<<<<<<< HEAD
* `foundationdb-6.3.9-x64.msi <https://www.foundationdb.org/downloads/6.3.9/windows/installers/foundationdb-6.3.9-x64.msi>`_
=======
* `foundationdb-6.2.28-x64.msi <https://www.foundationdb.org/downloads/6.2.28/windows/installers/foundationdb-6.2.28-x64.msi>`_
>>>>>>> anoyes/merge-6.2-to-6.3
API Language Bindings
=====================
@@ -81,31 +58,18 @@ On macOS and Windows, the FoundationDB Python API bindings are installed as part
If you need to use the FoundationDB Python API from other Python installations or paths, use the Python package manager ``pip`` (``pip install foundationdb``) or download the Python package:
<<<<<<< HEAD
* `foundationdb-6.3.9.tar.gz <https://www.foundationdb.org/downloads/6.3.9/bindings/python/foundationdb-6.3.9.tar.gz>`_
=======
* `foundationdb-6.2.28.tar.gz <https://www.foundationdb.org/downloads/6.2.28/bindings/python/foundationdb-6.2.28.tar.gz>`_
>>>>>>> anoyes/merge-6.2-to-6.3
Ruby 1.9.3/2.0.0+
-----------------
<<<<<<< HEAD
* `fdb-6.3.9.gem <https://www.foundationdb.org/downloads/6.3.9/bindings/ruby/fdb-6.3.9.gem>`_
=======
* `fdb-6.2.28.gem <https://www.foundationdb.org/downloads/6.2.28/bindings/ruby/fdb-6.2.28.gem>`_
>>>>>>> anoyes/merge-6.2-to-6.3
Java 8+
-------
<<<<<<< HEAD
* `fdb-java-6.3.9.jar <https://www.foundationdb.org/downloads/6.3.9/bindings/java/fdb-java-6.3.9.jar>`_
* `fdb-java-6.3.9-javadoc.jar <https://www.foundationdb.org/downloads/6.3.9/bindings/java/fdb-java-6.3.9-javadoc.jar>`_
=======
* `fdb-java-6.2.28.jar <https://www.foundationdb.org/downloads/6.2.28/bindings/java/fdb-java-6.2.28.jar>`_
* `fdb-java-6.2.28-javadoc.jar <https://www.foundationdb.org/downloads/6.2.28/bindings/java/fdb-java-6.2.28-javadoc.jar>`_
>>>>>>> anoyes/merge-6.2-to-6.3
Go 1.11+
--------

View File

@@ -42,11 +42,7 @@ enum {
tagLocalityRemoteLog = -3, // tag created by log router for remote tLogs
tagLocalityUpgraded = -4,
tagLocalitySatellite = -5,
<<<<<<< HEAD
tagLocalityLogRouterMapped = -6, // used by log router to pop from TLogs
=======
tagLocalityLogRouterMapped = -6, // The pseudo tag used by log routers to pop the real LogRouter tag (i.e., -2)
>>>>>>> anoyes/merge-6.2-to-6.3
tagLocalityTxs = -7,
tagLocalityBackup = -8, // used by backup role to pop from TLogs
tagLocalityInvalid = -99

View File

@@ -4777,12 +4777,9 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self, Promise
state DatabaseConfiguration configuration;
state Reference<InitialDataDistribution> initData;
state MoveKeysLock lock;
<<<<<<< HEAD
state Reference<DDTeamCollection> primaryTeamCollection;
state Reference<DDTeamCollection> remoteTeamCollection;
=======
state bool trackerCancelled;
>>>>>>> anoyes/merge-6.2-to-6.3
loop {
trackerCancelled = false;
try {
@@ -4947,21 +4944,14 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self, Promise
actors.push_back( pollMoveKeysLock(cx, lock) );
actors.push_back(reportErrorsExcept(
dataDistributionTracker(initData, cx, output, shardsAffectedByTeamFailure, getShardMetrics,
<<<<<<< HEAD
getShardMetricsList, getAverageShardBytes.getFuture(), readyToStart,
anyZeroHealthyTeams, self->ddId, &shards),
anyZeroHealthyTeams, self->ddId, &shards, &trackerCancelled),
"DDTracker", self->ddId, &normalDDQueueErrors()));
actors.push_back(reportErrorsExcept(
dataDistributionQueue(cx, output, input.getFuture(), getShardMetrics, processingUnhealthy, tcis,
shardsAffectedByTeamFailure, lock, getAverageShardBytes, self->ddId,
storageTeamSize, configuration.storageTeamSize, &lastLimited),
"DDQueue", self->ddId, &normalDDQueueErrors()));
=======
getAverageShardBytes.getFuture(), readyToStart, anyZeroHealthyTeams, self->ddId,
&shards, &trackerCancelled),
"DDTracker", self->ddId, &normalDDQueueErrors()));
actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, input.getFuture(), getShardMetrics, processingUnhealthy, tcis, shardsAffectedByTeamFailure, lock, getAverageShardBytes, self->ddId, storageTeamSize, configuration.storageTeamSize, &lastLimited ), "DDQueue", self->ddId, &normalDDQueueErrors() ) );
>>>>>>> anoyes/merge-6.2-to-6.3
vector<DDTeamCollection*> teamCollectionsPtrs;
primaryTeamCollection = Reference<DDTeamCollection>(new DDTeamCollection(
@@ -4990,14 +4980,11 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self, Promise
}
catch( Error &e ) {
state Error err = e;
<<<<<<< HEAD
TraceEvent("DataDistributorDestroyTeamCollections").error(e);
self->teamCollection = nullptr;
primaryTeamCollection = Reference<DDTeamCollection>();
remoteTeamCollection = Reference<DDTeamCollection>();
=======
trackerCancelled = true;
>>>>>>> anoyes/merge-6.2-to-6.3
wait(shards.clearAsync());
if (err.code() != error_code_movekeys_conflict) throw err;
bool ddEnabled = wait( isDataDistributionEnabled(cx) );

View File

@@ -229,41 +229,13 @@ struct ShardTrackedData {
Reference<AsyncVar<Optional<ShardMetrics>>> stats;
};
<<<<<<< HEAD
Future<Void> dataDistributionTracker(
Reference<InitialDataDistribution> const& initData,
Database const& cx,
PromiseStream<RelocateShard> const& output,
Reference<ShardsAffectedByTeamFailure> const& shardsAffectedByTeamFailure,
PromiseStream<GetMetricsRequest> const& getShardMetrics,
PromiseStream<GetMetricsListRequest> const& getShardMetricsList,
FutureStream<Promise<int64_t>> const& getAverageShardBytes,
Promise<Void> const& readyToStart,
Reference<AsyncVar<bool>> const& zeroHealthyTeams,
UID const& distributorId,
KeyRangeMap<ShardTrackedData>* const& shards);
Future<Void> dataDistributionQueue(
Database const& cx,
PromiseStream<RelocateShard> const& output,
FutureStream<RelocateShard> const& input,
PromiseStream<GetMetricsRequest> const& getShardMetrics,
Reference<AsyncVar<bool>> const& processingUnhealthy,
vector<TeamCollectionInterface> const& teamCollection,
Reference<ShardsAffectedByTeamFailure> const& shardsAffectedByTeamFailure,
MoveKeysLock const& lock,
PromiseStream<Promise<int64_t>> const& getAverageShardBytes,
UID const& distributorId,
int const& teamSize,
int const& singleRegionTeamSize,
double* const& lastLimited);
=======
ACTOR Future<Void> dataDistributionTracker(Reference<InitialDataDistribution> initData, Database cx,
PromiseStream<RelocateShard> output,
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure,
PromiseStream<GetMetricsRequest> getShardMetrics,
PromiseStream<GetMetricsListRequest> getShardMetricsList,
FutureStream<Promise<int64_t>> getAverageShardBytes,
Promise<Void> readyToStart, Reference<AsyncVar<bool>> zeroHealthyTeams,
Promise<Void> readyToStart, Reference<AsyncVar<bool>> anyZeroHealthyTeams,
UID distributorId, KeyRangeMap<ShardTrackedData>* shards,
bool const* trackerCancelled);
@@ -273,7 +245,6 @@ ACTOR Future<Void> dataDistributionQueue(
vector<TeamCollectionInterface> teamCollection, Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure,
MoveKeysLock lock, PromiseStream<Promise<int64_t>> getAverageShardBytes, UID distributorId, int teamSize,
int singleRegionTeamSize, double* lastLimited);
>>>>>>> anoyes/merge-6.2-to-6.3
//Holds the permitted size and IO Bounds for a shard
struct ShardSizeBounds {

View File

@@ -83,14 +83,8 @@ struct RelocateData {
};
class ParallelTCInfo : public ReferenceCounted<ParallelTCInfo>, public IDataDistributionTeam {
<<<<<<< HEAD
vector<Reference<IDataDistributionTeam>> teams;
vector<UID> tempServerIDs;
=======
public:
std::vector<Reference<IDataDistributionTeam>> teams;
std::vector<UID> tempServerIDs;
>>>>>>> anoyes/merge-6.2-to-6.3
int64_t sum(std::function<int64_t(IDataDistributionTeam const&)> func) const {
int64_t result = 0;
@@ -100,21 +94,12 @@ public:
return result;
}
<<<<<<< HEAD
template <class T>
vector<T> collect(std::function<vector<T>(IDataDistributionTeam const&)> func) const {
vector<T> result;
for (const auto& team : teams) {
vector<T> newItems = func(*team);
=======
template<class T>
std::vector<T> collect(std::function<std::vector<T>(Reference<IDataDistributionTeam>)> func) {
std::vector<T> collect(std::function<vector<T>(IDataDistributionTeam const&)> func) const {
std::vector<T> result;
for (auto it = teams.begin(); it != teams.end(); it++) {
std::vector<T> newItems = func(*it);
>>>>>>> anoyes/merge-6.2-to-6.3
for (const auto& team : teams) {
std::vector<T> newItems = func(*team);
result.insert(result.end(), newItems.begin(), newItems.end());
}
return result;
@@ -140,16 +125,9 @@ public:
return !any([func](IDataDistributionTeam const& team) { return !func(team); });
}
<<<<<<< HEAD
vector<StorageServerInterface> getLastKnownServerInterfaces() const override {
std::vector<StorageServerInterface> getLastKnownServerInterfaces() const override {
return collect<StorageServerInterface>(
[](IDataDistributionTeam const& team) { return team.getLastKnownServerInterfaces(); });
=======
virtual std::vector<StorageServerInterface> getLastKnownServerInterfaces() {
return collect<StorageServerInterface>([](Reference<IDataDistributionTeam> team) {
return team->getLastKnownServerInterfaces();
});
>>>>>>> anoyes/merge-6.2-to-6.3
}
int size() const override {
@@ -160,18 +138,11 @@ public:
return totalSize;
}
<<<<<<< HEAD
vector<UID> const& getServerIDs() const override {
std::vector<UID> const& getServerIDs() const override {
static vector<UID> tempServerIDs;
tempServerIDs.clear();
for (const auto& team : teams) {
vector<UID> const &childIDs = team->getServerIDs();
=======
virtual std::vector<UID> const& getServerIDs() {
tempServerIDs.clear();
for (auto it = teams.begin(); it != teams.end(); it++) {
std::vector<UID> const& childIDs = (*it)->getServerIDs();
>>>>>>> anoyes/merge-6.2-to-6.3
std::vector<UID> const& childIDs = team->getServerIDs();
tempServerIDs.insert(tempServerIDs.end(), childIDs.begin(), childIDs.end());
}
return tempServerIDs;
@@ -581,13 +552,8 @@ struct DDQueueData {
if(keyServersEntries.size() < SERVER_KNOBS->DD_QUEUE_MAX_KEY_SERVERS) {
for( int shard = 0; shard < keyServersEntries.size(); shard++ ) {
<<<<<<< HEAD
vector<UID> src, dest;
decodeKeyServersValue( UIDtoTagMap, keyServersEntries[shard].value, src, dest );
=======
std::vector<UID> src, dest;
decodeKeyServersValue( keyServersEntries[shard].value, src, dest );
>>>>>>> anoyes/merge-6.2-to-6.3
decodeKeyServersValue(UIDtoTagMap, keyServersEntries[shard].value, src, dest);
ASSERT( src.size() );
for( int i = 0; i < src.size(); i++ ) {
servers.insert( src[i] );

View File

@@ -88,10 +88,9 @@ struct DataDistributionTracker {
Promise<Void> readyToStart;
Reference<AsyncVar<bool>> anyZeroHealthyTeams;
<<<<<<< HEAD
// Read hot detection
PromiseStream<KeyRange> readHotShard;
=======
// The reference to trackerCancelled must be extracted by actors,
// because by the time (trackerCancelled == true) this memory cannot
// be accessed
@@ -119,7 +118,6 @@ struct DataDistributionTracker {
return &tracker;
}
};
>>>>>>> anoyes/merge-6.2-to-6.3
DataDistributionTracker(Database cx, UID distributorId, Promise<Void> const& readyToStart,
PromiseStream<RelocateShard> const& output,
@@ -181,21 +179,13 @@ int64_t getMaxShardSize( double dbSizeEstimate ) {
(int64_t)SERVER_KNOBS->MAX_SHARD_BYTES);
}
<<<<<<< HEAD
ACTOR Future<Void> trackShardMetrics(DataDistributionTracker* self, KeyRange keys,
ACTOR Future<Void> trackShardMetrics(DataDistributionTracker::SafeAccessor self, KeyRange keys,
Reference<AsyncVar<Optional<ShardMetrics>>> shardMetrics) {
state BandwidthStatus bandwidthStatus = shardMetrics->get().present() ? getBandwidthStatus( shardMetrics->get().get().metrics ) : BandwidthStatusNormal;
state double lastLowBandwidthStartTime = shardMetrics->get().present() ? shardMetrics->get().get().lastLowBandwidthStartTime : now();
state int shardCount = shardMetrics->get().present() ? shardMetrics->get().get().shardCount : 1;
state ReadBandwidthStatus readBandwidthStatus = shardMetrics->get().present() ? getReadBandwidthStatus(shardMetrics->get().get().metrics) : ReadBandwidthStatusNormal;
=======
ACTOR Future<Void> trackShardBytes(DataDistributionTracker::SafeAccessor self, KeyRange keys,
Reference<AsyncVar<Optional<ShardMetrics>>> shardSize) {
state BandwidthStatus bandwidthStatus = shardSize->get().present() ? getBandwidthStatus( shardSize->get().get().metrics ) : BandwidthStatusNormal;
state double lastLowBandwidthStartTime = shardSize->get().present() ? shardSize->get().get().lastLowBandwidthStartTime : now();
state int shardCount = shardSize->get().present() ? shardSize->get().get().shardCount : 1;
>>>>>>> anoyes/merge-6.2-to-6.3
wait( delay( 0, TaskPriority::DataDistribution ) );
/*TraceEvent("TrackShardMetricsStarting")
@@ -248,7 +238,7 @@ ACTOR Future<Void> trackShardBytes(DataDistributionTracker::SafeAccessor self, K
// TraceEvent("RHDTriggerReadHotLoggingForShard")
// .detail("ShardBegin", keys.begin.printable().c_str())
// .detail("ShardEnd", keys.end.printable().c_str());
self->readHotShard.send(keys);
self()->readHotShard.send(keys);
} else {
ASSERT(false);
}
@@ -292,19 +282,12 @@ ACTOR Future<Void> trackShardBytes(DataDistributionTracker::SafeAccessor self, K
.detail("OldShardSize", shardSize->get().present() ? shardSize->get().get().metrics.bytes : 0)
.detail("TrackerID", trackerID);*/
<<<<<<< HEAD
if( shardMetrics->get().present() ) {
self->dbSizeEstimate->set( self->dbSizeEstimate->get() + metrics.first.get().bytes - shardMetrics->get().get().metrics.bytes );
if(keys.begin >= systemKeys.begin) {
self->systemSizeEstimate += metrics.first.get().bytes - shardMetrics->get().get().metrics.bytes;
=======
if( shardSize->get().present() ) {
self()->dbSizeEstimate->set(self()->dbSizeEstimate->get() + metrics.first.get().bytes -
shardSize->get().get().metrics.bytes);
shardMetrics->get().get().metrics.bytes);
if(keys.begin >= systemKeys.begin) {
self()->systemSizeEstimate +=
metrics.first.get().bytes - shardSize->get().get().metrics.bytes;
>>>>>>> anoyes/merge-6.2-to-6.3
metrics.first.get().bytes - shardMetrics->get().get().metrics.bytes;
}
}
@@ -792,15 +775,9 @@ void restartShardTrackers(DataDistributionTracker* self, KeyRangeRef keys, Optio
}
ShardTrackedData data;
<<<<<<< HEAD
data.stats = shardMetrics;
data.trackShard = shardTracker(self, ranges[i], shardMetrics);
data.trackBytes = trackShardMetrics(self, ranges[i], shardMetrics);
=======
data.stats = shardSize;
data.trackShard = shardTracker(DataDistributionTracker::SafeAccessor(self), ranges[i], shardSize);
data.trackBytes = trackShardBytes(DataDistributionTracker::SafeAccessor(self), ranges[i], shardSize);
>>>>>>> anoyes/merge-6.2-to-6.3
data.trackShard = shardTracker(DataDistributionTracker::SafeAccessor(self), ranges[i], shardMetrics);
data.trackBytes = trackShardMetrics(DataDistributionTracker::SafeAccessor(self), ranges[i], shardMetrics);
self->shards.insert( ranges[i], data );
}
}
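
The changes in this file funnel every access to the tracker through DataDistributionTracker::SafeAccessor, per the comment above about trackerCancelled: by the time the flag is set, the tracker's memory may already be gone. A minimal standalone C++ sketch of the idea, using made-up stand-in names (Tracker, SafeAccessor) rather than the real FoundationDB types:

#include <iostream>
#include <memory>
#include <stdexcept>

struct Tracker { // stand-in for DataDistributionTracker
    long long dbSizeEstimate = 0;
};

// Long-running tasks never hold the tracker directly; they hold an accessor
// carrying the address of a cancellation flag owned by the caller. The flag
// outlives the tracker, so every access can be checked first.
class SafeAccessor {
    const bool* cancelled; // owned by the caller, outlives the tracker
    Tracker* tracker;      // must not be touched once *cancelled is true
public:
    SafeAccessor(const bool* cancelled, Tracker& tracker) : cancelled(cancelled), tracker(&tracker) {}
    Tracker* operator()() const {
        if (*cancelled) throw std::runtime_error("tracker cancelled"); // stand-in for the dd_tracker_cancelled error added in this commit
        return tracker;
    }
};

int main() {
    bool trackerCancelled = false;
    auto tracker = std::make_unique<Tracker>();
    SafeAccessor self(&trackerCancelled, *tracker);

    self()->dbSizeEstimate += 42; // flag clear, tracker alive: allowed

    trackerCancelled = true;      // set before the tracker is destroyed
    tracker.reset();
    try {
        self()->dbSizeEstimate += 1; // rejected instead of touching freed memory
    } catch (const std::exception& e) {
        std::cout << e.what() << std::endl;
    }
    return 0;
}

Because the flag lives with the caller and outlives the tracker, a cancelled task fails loudly instead of dereferencing freed memory, which is what the self()-> call sites above rely on.
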

View File

@@ -575,16 +575,13 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( BEHIND_CHECK_COUNT, 2 );
init( BEHIND_CHECK_VERSIONS, 5 * VERSIONS_PER_SECOND );
init( WAIT_METRICS_WRONG_SHARD_CHANCE, isSimulated ? 1.0 : 0.1 );
<<<<<<< HEAD
init( MIN_TAG_PAGES_READ_RATE, 1.0e4 ); if( randomize && BUGGIFY ) MIN_TAG_PAGES_READ_RATE = 0;
init( READ_TAG_MEASUREMENT_INTERVAL, 30.0 ); if( randomize && BUGGIFY ) READ_TAG_MEASUREMENT_INTERVAL = 1.0;
init( OPERATION_COST_BYTE_FACTOR, 16384 ); if( randomize && BUGGIFY ) OPERATION_COST_BYTE_FACTOR = 4096;
init( PREFIX_COMPRESS_KVS_MEM_SNAPSHOTS, true ); if( randomize && BUGGIFY ) PREFIX_COMPRESS_KVS_MEM_SNAPSHOTS = false;
=======
init( REPORT_DD_METRICS, true );
init( DD_METRICS_REPORT_INTERVAL, 30.0 );
init( FETCH_KEYS_TOO_LONG_TIME_CRITERIA, 300.0 );
>>>>>>> anoyes/merge-6.2-to-6.3
//Wait Failure
init( MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS, 250 ); if( randomize && BUGGIFY ) MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS = 2;

View File

@@ -504,16 +504,13 @@ public:
int BEHIND_CHECK_COUNT;
int64_t BEHIND_CHECK_VERSIONS;
double WAIT_METRICS_WRONG_SHARD_CHANCE;
<<<<<<< HEAD
int64_t MIN_TAG_PAGES_READ_RATE;
double READ_TAG_MEASUREMENT_INTERVAL;
int64_t OPERATION_COST_BYTE_FACTOR;
bool PREFIX_COMPRESS_KVS_MEM_SNAPSHOTS;
=======
bool REPORT_DD_METRICS;
double DD_METRICS_REPORT_INTERVAL;
double FETCH_KEYS_TOO_LONG_TIME_CRITERIA;
>>>>>>> anoyes/merge-6.2-to-6.3
//Wait Failure
int MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS;

View File

@@ -78,20 +78,13 @@ struct LogRouterData {
const UID dbgid;
Reference<AsyncVar<Reference<ILogSystem>>> logSystem;
Optional<UID> primaryPeekLocation;
<<<<<<< HEAD
NotifiedVersion version;
NotifiedVersion minPopped;
const Version startVersion;
Version minKnownCommittedVersion;
=======
NotifiedVersion version; // The largest version at which the log router has peeked mutations
// from satellite tLog or primary tLogs.
NotifiedVersion minPopped; // The minimum version among all tags that has been popped by remote tLogs.
Version startVersion;
const Version startVersion;
Version minKnownCommittedVersion; // The minimum durable version among all LRs.
// A LR's durable version is the maximum version of mutations that have been
// popped by remote tLog.
>>>>>>> anoyes/merge-6.2-to-6.3
Version poppedVersion;
Deque<std::pair<Version, Standalone<VectorRef<uint8_t>>>> messageBlocks;
Tag routerTag;

View File

@@ -104,7 +104,6 @@ struct ProxyStats {
Future<Void> logger;
<<<<<<< HEAD
int recentRequests;
Deque<int> requestBuckets;
double lastBucketBegin;
@@ -145,36 +144,13 @@ struct ProxyStats {
return r;
}
explicit ProxyStats(UID id, Version* pVersion, NotifiedVersion* pCommittedVersion, int64_t *commitBatchesMemBytesCountPtr)
: cc("ProxyStats", id.toString()), recentRequests(0), lastBucketBegin(now()),
maxComputeNS(0), minComputeNS(1e12),
bucketInterval(FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE/FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS),
txnRequestIn("TxnRequestIn", cc), txnRequestOut("TxnRequestOut", cc),
txnRequestErrors("TxnRequestErrors", cc), txnStartIn("TxnStartIn", cc), txnStartOut("TxnStartOut", cc),
txnStartBatch("TxnStartBatch", cc), txnSystemPriorityStartIn("TxnSystemPriorityStartIn", cc),
txnSystemPriorityStartOut("TxnSystemPriorityStartOut", cc),
txnBatchPriorityStartIn("TxnBatchPriorityStartIn", cc),
txnBatchPriorityStartOut("TxnBatchPriorityStartOut", cc),
txnDefaultPriorityStartIn("TxnDefaultPriorityStartIn", cc),
txnDefaultPriorityStartOut("TxnDefaultPriorityStartOut", cc), txnCommitIn("TxnCommitIn", cc),
txnCommitVersionAssigned("TxnCommitVersionAssigned", cc), txnCommitResolving("TxnCommitResolving", cc),
txnCommitResolved("TxnCommitResolved", cc), txnCommitOut("TxnCommitOut", cc),
txnCommitOutSuccess("TxnCommitOutSuccess", cc), txnCommitErrors("TxnCommitErrors", cc),
txnConflicts("TxnConflicts", cc), txnThrottled("TxnThrottled", cc), commitBatchIn("CommitBatchIn", cc),
commitBatchOut("CommitBatchOut", cc), mutationBytes("MutationBytes", cc), mutations("Mutations", cc),
conflictRanges("ConflictRanges", cc), keyServerLocationIn("KeyServerLocationIn", cc),
keyServerLocationOut("KeyServerLocationOut", cc), keyServerLocationErrors("KeyServerLocationErrors", cc),
lastCommitVersionAssigned(0),
commitLatencySample("CommitLatencyMetrics", id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
grvLatencySample("GRVLatencyMetrics", id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
commitLatencyBands("CommitLatencyBands", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY),
grvLatencyBands("GRVLatencyBands", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY) {
=======
explicit ProxyStats(UID id, Version* pVersion, NotifiedVersion* pCommittedVersion,
int64_t* commitBatchesMemBytesCountPtr)
: cc("ProxyStats", id.toString()), txnRequestIn("TxnRequestIn", cc), txnRequestOut("TxnRequestOut", cc),
txnRequestErrors("TxnRequestErrors", cc), txnStartIn("TxnStartIn", cc), txnStartOut("TxnStartOut", cc),
txnStartBatch("TxnStartBatch", cc), txnSystemPriorityStartIn("TxnSystemPriorityStartIn", cc),
: cc("ProxyStats", id.toString()), recentRequests(0), lastBucketBegin(now()), maxComputeNS(0), minComputeNS(1e12),
bucketInterval(FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE / FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS),
txnRequestIn("TxnRequestIn", cc), txnRequestOut("TxnRequestOut", cc), txnRequestErrors("TxnRequestErrors", cc),
txnStartIn("TxnStartIn", cc), txnStartOut("TxnStartOut", cc), txnStartBatch("TxnStartBatch", cc),
txnSystemPriorityStartIn("TxnSystemPriorityStartIn", cc),
txnSystemPriorityStartOut("TxnSystemPriorityStartOut", cc),
txnBatchPriorityStartIn("TxnBatchPriorityStartIn", cc),
txnBatchPriorityStartOut("TxnBatchPriorityStartOut", cc),
@@ -183,10 +159,11 @@ struct ProxyStats {
txnCommitVersionAssigned("TxnCommitVersionAssigned", cc), txnCommitResolving("TxnCommitResolving", cc),
txnCommitResolved("TxnCommitResolved", cc), txnCommitOut("TxnCommitOut", cc),
txnCommitOutSuccess("TxnCommitOutSuccess", cc), txnCommitErrors("TxnCommitErrors", cc),
txnConflicts("TxnConflicts", cc), commitBatchIn("CommitBatchIn", cc), commitBatchOut("CommitBatchOut", cc),
mutationBytes("MutationBytes", cc), mutations("Mutations", cc), conflictRanges("ConflictRanges", cc),
keyServerLocationIn("KeyServerLocationIn", cc), keyServerLocationOut("KeyServerLocationOut", cc),
keyServerLocationErrors("KeyServerLocationErrors", cc), lastCommitVersionAssigned(0),
txnConflicts("TxnConflicts", cc), txnThrottled("TxnThrottled", cc), commitBatchIn("CommitBatchIn", cc),
commitBatchOut("CommitBatchOut", cc), mutationBytes("MutationBytes", cc), mutations("Mutations", cc),
conflictRanges("ConflictRanges", cc), keyServerLocationIn("KeyServerLocationIn", cc),
keyServerLocationOut("KeyServerLocationOut", cc), keyServerLocationErrors("KeyServerLocationErrors", cc),
lastCommitVersionAssigned(0),
commitLatencySample("CommitLatencyMetrics", id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
grvLatencySample("GRVLatencyMetrics", id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
@@ -199,15 +176,12 @@ struct ProxyStats {
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
transactionRateAllowed(0), batchTransactionRateAllowed(0), transactionLimit(0), batchTransactionLimit(0),
percentageOfDefaultGRVQueueProcessed(0), percentageOfBatchGRVQueueProcessed(0) {
>>>>>>> anoyes/merge-6.2-to-6.3
specialCounter(cc, "LastAssignedCommitVersion", [this](){return this->lastCommitVersionAssigned;});
specialCounter(cc, "Version", [pVersion](){return *pVersion; });
specialCounter(cc, "CommittedVersion", [pCommittedVersion](){ return pCommittedVersion->get(); });
specialCounter(cc, "CommitBatchesMemBytesCount", [commitBatchesMemBytesCountPtr]() { return *commitBatchesMemBytesCountPtr; });
<<<<<<< HEAD
specialCounter(cc, "MaxCompute", [this](){ return this->getAndResetMaxCompute(); });
specialCounter(cc, "MinCompute", [this](){ return this->getAndResetMinCompute(); });
=======
// The rate at which the limit(budget) is allowed to grow.
specialCounter(cc, "SystemAndDefaultTxnRateAllowed", [this]() { return this->transactionRateAllowed; });
specialCounter(cc, "BatchTransactionRateAllowed", [this]() { return this->batchTransactionRateAllowed; });
@@ -217,7 +191,6 @@ struct ProxyStats {
[this]() { return this->percentageOfDefaultGRVQueueProcessed; });
specialCounter(cc, "PercentageOfBatchGRVQueueProcessed",
[this]() { return this->percentageOfBatchGRVQueueProcessed; });
>>>>>>> anoyes/merge-6.2-to-6.3
logger = traceCounters("ProxyMetrics", id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "ProxyMetrics");
for(int i = 0; i < FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS; i++) {
requestBuckets.push_back(0);
@@ -298,17 +271,12 @@ struct TransactionRateInfo {
}
};
<<<<<<< HEAD
ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64_t* inTransactionCount, int64_t* inBatchTransactionCount, TransactionRateInfo *transactionRateInfo,
TransactionRateInfo *batchTransactionRateInfo, GetHealthMetricsReply* healthMetricsReply, GetHealthMetricsReply* detailedHealthMetricsReply,
TransactionTagMap<uint64_t>* transactionTagCounter, PrioritizedTransactionTagMap<ClientTagThrottleLimits>* throttledTags) {
=======
ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64_t* inTransactionCount,
int64_t* inBatchTransactionCount, double* outTransactionRate,
double* outBatchTransactionRate, GetHealthMetricsReply* healthMetricsReply,
GetHealthMetricsReply* detailedHealthMetricsReply, ProxyStats* stats) {
>>>>>>> anoyes/merge-6.2-to-6.3
int64_t* inBatchTransactionCount, TransactionRateInfo* transactionRateInfo,
TransactionRateInfo* batchTransactionRateInfo, GetHealthMetricsReply* healthMetricsReply,
GetHealthMetricsReply* detailedHealthMetricsReply,
TransactionTagMap<uint64_t>* transactionTagCounter,
PrioritizedTransactionTagMap<ClientTagThrottleLimits>* throttledTags, ProxyStats* stats) {
state Future<Void> nextRequestTimer = Never();
state Future<Void> leaseTimeout = Never();
state Future<GetRateInfoReply> reply = Never();
@@ -345,14 +313,11 @@ ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64
}
when ( GetRateInfoReply rep = wait(reply) ) {
reply = Never();
<<<<<<< HEAD
transactionRateInfo->setRate(rep.transactionRate);
batchTransactionRateInfo->setRate(rep.batchTransactionRate);
//TraceEvent("MasterProxyRate", myID).detail("Rate", rep.transactionRate).detail("BatchRate", rep.batchTransactionRate).detail("Lease", rep.leaseDuration).detail("ReleasedTransactions", *inTransactionCount - lastTC);
=======
*outTransactionRate = rep.transactionRate;
*outBatchTransactionRate = rep.batchTransactionRate;
stats->transactionRateAllowed = rep.transactionRate;
stats->batchTransactionRateAllowed = rep.batchTransactionRate;
// TraceEvent("MasterProxyTxRate", myID)
@@ -361,7 +326,6 @@ ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64
// .detail("BatchRateAllowed", rep.batchTransactionRate)
// .detail("Lease", rep.leaseDuration)
// .detail("ReleasedTransactions", *inTransactionCount - lastTC);
>>>>>>> anoyes/merge-6.2-to-6.3
lastTC = *inTransactionCount;
leaseTimeout = delay(rep.leaseDuration);
nextRequestTimer = delayJittered(rep.leaseDuration / 2);
@@ -1650,17 +1614,12 @@ ACTOR static Future<Void> transactionStarter(MasterProxyInterface proxy, Referen
state PrioritizedTransactionTagMap<ClientTagThrottleLimits> throttledTags;
state PromiseStream<double> replyTimes;
<<<<<<< HEAD
addActor.send(getRate(proxy.id(), db, &transactionCount, &batchTransactionCount, &normalRateInfo, &batchRateInfo, healthMetricsReply, detailedHealthMetricsReply, &transactionTagCounter, &throttledTags));
addActor.send(getRate(proxy.id(), db, &transactionCount, &batchTransactionCount, &normalRateInfo, &batchRateInfo,
healthMetricsReply, detailedHealthMetricsReply, &transactionTagCounter, &throttledTags,
stats));
addActor.send(queueTransactionStartRequests(db, &systemQueue, &defaultQueue, &batchQueue, proxy.getConsistentReadVersion.getFuture(),
GRVTimer, &lastGRVTime, &GRVBatchTime, replyTimes.getFuture(), &commitData->stats, &batchRateInfo,
&transactionTagCounter));
=======
addActor.send(getRate(proxy.id(), db, &transactionCount, &batchTransactionCount, &normalRateInfo.rate,
&batchRateInfo.rate, healthMetricsReply, detailedHealthMetricsReply, stats));
addActor.send(queueTransactionStartRequests(&systemQueue, &defaultQueue, &batchQueue, proxy.getConsistentReadVersion.getFuture(), GRVTimer, &lastGRVTime, &GRVBatchTime, replyTimes.getFuture(), &commitData->stats));
>>>>>>> anoyes/merge-6.2-to-6.3
// Get a list of the other proxies that go together with us
while (std::find(db->get().client.proxies.begin(), db->get().client.proxies.end(), proxy) == db->get().client.proxies.end())
@@ -1729,16 +1688,10 @@ ACTOR static Future<Void> transactionStarter(MasterProxyInterface proxy, Referen
}
transactionsStarted[req.flags&1] += tc;
<<<<<<< HEAD
if (req.priority >= TransactionPriority::IMMEDIATE)
systemTransactionsStarted[req.flags & 1] += tc;
else if (req.priority >= TransactionPriority::DEFAULT)
=======
double currentTime = g_network->timer();
if (req.priority() >= GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE) {
if (req.priority >= TransactionPriority::IMMEDIATE) {
systemTransactionsStarted[req.flags & 1] += tc;
} else if (req.priority() >= GetReadVersionRequest::PRIORITY_DEFAULT) {
>>>>>>> anoyes/merge-6.2-to-6.3
} else if (req.priority >= TransactionPriority::DEFAULT) {
defaultPriTransactionsStarted[req.flags & 1] += tc;
stats->defaultTxnGRVTimeInQueue.addMeasurement(currentTime - req.requestTime());
} else {

View File

@@ -1027,7 +1027,6 @@ ACTOR Future<Void> updatePersistentData( TLogData* self, Reference<LogData> logD
return Void();
}
<<<<<<< HEAD
ACTOR Future<Void> tLogPopCore( TLogData* self, Tag inputTag, Version to, Reference<LogData> logData ) {
if (self->ignorePopRequest) {
TraceEvent(SevDebug, "IgnoringPopRequest").detail("IgnorePopDeadline", self->ignorePopDeadline);
@@ -1107,16 +1106,11 @@ ACTOR Future<Void> tLogPop( TLogData* self, TLogPopRequest req, Reference<LogDat
return Void();
}
// This function (and updatePersistentData, which is called by this function) run at a low priority and can soak up all CPU resources.
// For this reason, they employ aggressive use of yields to avoid causing slow tasks that could introduce latencies for more important
// work (e.g. commits).
=======
// This function (and updatePersistentData, which is called by this function) run at a low priority and can soak up all
// CPU resources. For this reason, they employ aggressive use of yields to avoid causing slow tasks that could introduce
// latencies for more important work (e.g. commits).
// This actor is just a loop that calls updatePersistentData and popDiskQueue whenever
// (a) there's data to be spilled or (b) we should update metadata after some commits have been fully popped.
>>>>>>> anoyes/merge-6.2-to-6.3
ACTOR Future<Void> updateStorage( TLogData* self ) {
while(self->spillOrder.size() && !self->id_data.count(self->spillOrder.front())) {
self->spillOrder.pop_front();
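
The comment above on aggressive yielding can be pictured with a toy, non-flow sketch; std::this_thread::yield() merely stands in for a flow actor waiting on yield() at a low TaskPriority:

#include <cstddef>
#include <thread>
#include <vector>

// A long, low-priority batch that periodically gives the scheduler a chance
// to run more important work instead of turning into one huge slow task.
void spillBatch(const std::vector<int>& work) {
    for (std::size_t i = 0; i < work.size(); ++i) {
        // ... spill or persist one small slice of work[i] ...
        if (i % 128 == 0) std::this_thread::yield();
    }
}

int main() {
    spillBatch(std::vector<int>(10000, 0));
    return 0;
}
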

View File

@@ -166,25 +166,16 @@ OldTLogCoreData::OldTLogCoreData(const OldLogData& oldData)
struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogSystem> {
const UID dbgid;
LogSystemType logSystemType;
<<<<<<< HEAD
std::vector<Reference<LogSet>> tLogs; // LogSets in different locations: primary, remote or satellite
=======
std::vector<Reference<LogSet>> tLogs; // LogSets in different locations: primary, satellite, or remote
>>>>>>> anoyes/merge-6.2-to-6.3
int expectedLogSets;
int logRouterTags;
int txsTags;
UID recruitmentID;
int repopulateRegionAntiQuorum;
bool stopped;
<<<<<<< HEAD
std::set<int8_t> pseudoLocalities; // Represent special localities that will be mapped to tagLocalityLogRouter
const LogEpoch epoch;
LogEpoch oldestBackupEpoch;
=======
std::set<int8_t> pseudoLocalities;
std::map<int8_t, Version> pseudoLocalityPopVersion; // first:locality, second:popped version at the locality
>>>>>>> anoyes/merge-6.2-to-6.3
// new members
std::map<Tag, Version> pseudoLocalityPopVersion;
@@ -279,7 +270,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
bool hasPseudoLocality(int8_t locality) final { return pseudoLocalities.count(locality) > 0; }
<<<<<<< HEAD
// Return the min version of all pseudoLocalities, i.e., logRouter and backupTag
Version popPseudoLocalityTag(Tag tag, Version upTo) final {
ASSERT(isPseudoLocality(tag.locality) && hasPseudoLocality(tag.locality));
@@ -287,21 +277,11 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
Version& localityVersion = pseudoLocalityPopVersion[tag];
localityVersion = std::max(localityVersion, upTo);
Version minVersion = localityVersion;
for (const int8_t locality : pseudoLocalities) {
minVersion = std::min(minVersion, pseudoLocalityPopVersion[Tag(locality, tag.id)]);
=======
// Return the max version that can be popped for the locality;
Version popPseudoLocalityTag(int8_t locality, Version upTo) override {
ASSERT(isPseudoLocality(locality));
auto& localityVersion = pseudoLocalityPopVersion[locality];
localityVersion = std::max(localityVersion, upTo);
Version minVersion = localityVersion;
// Why do we need to use the minimum popped version among all tags? Reason: for example,
// 2 pseudo tags pop 100 or 150, respectively. It's only safe to pop min(100, 150),
// because [101,150) is needed by another pseudo tag.
for (const auto& it : pseudoLocalityPopVersion) {
minVersion = std::min(minVersion, it.second);
>>>>>>> anoyes/merge-6.2-to-6.3
for (const int8_t locality : pseudoLocalities) {
minVersion = std::min(minVersion, pseudoLocalityPopVersion[Tag(locality, tag.id)]);
}
// TraceEvent("TLogPopPseudoTag", dbgid).detail("Tag", tag.toString()).detail("Version", upTo).detail("PopVersion", minVersion);
return minVersion;
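
A tiny standalone sketch of the min-over-pseudo-tags rule described in the comment above; the names are simplified stand-ins (the real bookkeeping is the pseudoLocalityPopVersion map, keyed per tag in the 6.3 code):

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <map>

using Version = int64_t;

// Record that one pseudo tag popped up to 'upTo', then return how far the
// shared underlying tag may really be popped: the minimum across all pseudo
// tags, because everything above that is still needed by the others.
Version popPseudoTag(std::map<int8_t, Version>& popped, int8_t locality, Version upTo) {
    Version& v = popped[locality];
    v = std::max(v, upTo);
    Version minVersion = v;
    for (const auto& p : popped) minVersion = std::min(minVersion, p.second);
    return minVersion;
}

int main() {
    std::map<int8_t, Version> popped = { { -6, 0 }, { -8, 0 } }; // e.g. LogRouterMapped and Backup
    popPseudoTag(popped, -6, 100);                      // one pseudo tag pops up to 100
    std::cout << popPseudoTag(popped, -8, 150) << "\n"; // prints 100: [101,150) is still needed by the first tag
    return 0;
}
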
@@ -1139,12 +1119,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
}
<<<<<<< HEAD
void pop(Version upTo, Tag tag, Version durableKnownCommittedVersion, int8_t popLocality) final {
=======
// pop 'tag.locality' type data up to the 'upTo' version
virtual void pop( Version upTo, Tag tag, Version durableKnownCommittedVersion, int8_t popLocality ) {
>>>>>>> anoyes/merge-6.2-to-6.3
void pop(Version upTo, Tag tag, Version durableKnownCommittedVersion, int8_t popLocality) final {
if (upTo <= 0) return;
if (tag.locality == tagLocalityRemoteLog) {
popLogRouter(upTo, tag, durableKnownCommittedVersion, popLocality);
@@ -1159,30 +1135,20 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
if (prev < upTo) {
// update pop version for popFromLog actor
outstandingPops[std::make_pair(log->get().id(),tag)] = std::make_pair(upTo, durableKnownCommittedVersion);
<<<<<<< HEAD
}
if (prev == 0) {
// pop tag from log upto version defined in outstandingPops[].first
popActors.add( popFromLog( this, log, tag, 1.0 ) ); //< FIXME: knob
=======
if (prev == 0) {
popActors.add(popFromLog(this, log, tag, 1.0)); //< FIXME: knob // TODO: Knobify it
>>>>>>> anoyes/merge-6.2-to-6.3
}
}
}
}
}
<<<<<<< HEAD
// pop tag from log up to the version defined in self->outstandingPops[].first
ACTOR static Future<Void> popFromLog(TagPartitionedLogSystem* self,
Reference<AsyncVar<OptionalInterface<TLogInterface>>> log, Tag tag,
double time) {
=======
// pop tag from log up to the version defined in self->outstandingPops[].first
ACTOR static Future<Void> popFromLog( TagPartitionedLogSystem* self, Reference<AsyncVar<OptionalInterface<TLogInterface>>> log, Tag tag, double time ) {
>>>>>>> anoyes/merge-6.2-to-6.3
state Version last = 0;
loop {
wait( delay(time, TaskPriority::TLogPop) );

View File

@@ -1858,7 +1858,6 @@ int main(int argc, char* argv[]) {
setupAndRun(dataFolder, opts.testFile, opts.restarting, (isRestoring >= 1), opts.whitelistBinPaths);
g_simulator.run();
} else if (role == FDBD) {
<<<<<<< HEAD
// Update the global blob credential files list so that both fast
// restore workers and backup workers can access blob storage.
std::vector<std::string>* pFiles =
@@ -1868,19 +1867,6 @@ int main(int argc, char* argv[]) {
pFiles->push_back(f);
}
}
=======
ASSERT( connectionFile );
setupSlowTaskProfiler();
if (!dataFolder.size())
dataFolder = format("fdb/%d/", publicAddresses.address.port); // SOMEDAY: Better default
vector<Future<Void>> actors(listenErrors.begin(), listenErrors.end());
actors.push_back( fdbd(connectionFile, localities, processClass, dataFolder, dataFolder, storageMemLimit, metricsConnFile, metricsPrefix, rsssize, whitelistBinPaths) );
actors.push_back(histogramReport());
//actors.push_back( recurring( []{}, .001 ) ); // for ASIO latency measurement
>>>>>>> anoyes/merge-6.2-to-6.3
// Call fast restore for the class FastRestoreClass. This is a short-cut to run fast restore in circus
if (opts.processClass == ProcessClass::FastRestoreClass) {
@@ -1909,6 +1895,7 @@ int main(int argc, char* argv[]) {
actors.push_back(fdbd(opts.connectionFile, opts.localities, opts.processClass, dataFolder, dataFolder,
opts.storageMemLimit, opts.metricsConnFile, opts.metricsPrefix, opts.rsssize,
opts.whitelistBinPaths));
actors.push_back(histogramReport());
// actors.push_back( recurring( []{}, .001 ) ); // for ASIO latency measurement
f = stopAfter(waitForAll(actors));

View File

@@ -657,28 +657,10 @@ public:
}
} counters;
<<<<<<< HEAD
StorageServer(IKeyValueStore* storage, Reference<AsyncVar<ServerDBInfo>> const& db, StorageServerInterface const& ssi)
: instanceID(deterministicRandom()->randomUniqueID().first()),
storage(this, storage), db(db), actors(false),
lastTLogVersion(0), lastVersionWithData(0), restoredVersion(0),
rebootAfterDurableVersion(std::numeric_limits<Version>::max()),
durableInProgress(Void()),
versionLag(0), primaryLocality(tagLocalityInvalid),
updateEagerReads(0),
shardChangeCounter(0),
fetchKeysParallelismLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM_BYTES),
shuttingDown(false), debug_inApplyUpdate(false), debug_lastValidateTime(0), watchBytes(0), numWatches(0),
logProtocol(0), counters(this), tag(invalidTag), maxQueryQueue(0), thisServerID(ssi.id()),
readQueueSizeMetric(LiteralStringRef("StorageServer.ReadQueueSize")),
behind(false), versionBehind(false), byteSampleClears(false, LiteralStringRef("\xff\xff\xff")), noRecentUpdates(false),
lastUpdate(now()), poppedAllAfter(std::numeric_limits<Version>::max()), cpuUsage(0.0), diskUsage(0.0)
{
=======
StorageServer(IKeyValueStore* storage, Reference<AsyncVar<ServerDBInfo>> const& db,
StorageServerInterface const& ssi)
: fetchKeysHistograms(), instanceID(deterministicRandom()->randomUniqueID().first()), storage(this, storage),
db(db), lastTLogVersion(0), lastVersionWithData(0), restoredVersion(0),
db(db), actors(false), lastTLogVersion(0), lastVersionWithData(0), restoredVersion(0),
rebootAfterDurableVersion(std::numeric_limits<Version>::max()), durableInProgress(Void()), versionLag(0),
primaryLocality(tagLocalityInvalid), updateEagerReads(0), shardChangeCounter(0),
fetchKeysParallelismLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM_BYTES), shuttingDown(false),
@@ -687,7 +669,6 @@ public:
readQueueSizeMetric(LiteralStringRef("StorageServer.ReadQueueSize")), behind(false), versionBehind(false),
byteSampleClears(false, LiteralStringRef("\xff\xff\xff")), noRecentUpdates(false), lastUpdate(now()),
poppedAllAfter(std::numeric_limits<Version>::max()), cpuUsage(0.0), diskUsage(0.0) {
>>>>>>> anoyes/merge-6.2-to-6.3
version.initMetric(LiteralStringRef("StorageServer.Version"), counters.cc.id);
oldestVersion.initMetric(LiteralStringRef("StorageServer.OldestVersion"), counters.cc.id);
durableVersion.initMetric(LiteralStringRef("StorageServer.DurableVersion"), counters.cc.id);
@@ -3815,7 +3796,6 @@ ACTOR Future<Void> checkBehind( StorageServer* self ) {
}
}
<<<<<<< HEAD
ACTOR Future<Void> serveGetValueRequests( StorageServer* self, FutureStream<GetValueRequest> getValue ) {
loop {
GetValueRequest req = waitNext(getValue);
@@ -3852,7 +3832,9 @@ ACTOR Future<Void> serveWatchValueRequests( StorageServer* self, FutureStream<Wa
// TODO: fast load balancing?
// SOMEDAY: combine watches for the same key/value into a single watch
self->actors.add(self->readGuard(req, watchValueQ));
=======
}
}
ACTOR Future<Void> reportStorageServerState(StorageServer* self) {
if (!SERVER_KNOBS->REPORT_DD_METRICS) {
return Void();
@@ -3879,7 +3861,6 @@ ACTOR Future<Void> reportStorageServerState(StorageServer* self) {
.detail("StartKey", longestRunningFetchKeys.second.begin.printable())
.detail("EndKey", longestRunningFetchKeys.second.end.printable())
.detail("NumRunning", numRunningFetchKeys);
>>>>>>> anoyes/merge-6.2-to-6.3
}
}
@@ -3890,7 +3871,6 @@ ACTOR Future<Void> storageServerCore( StorageServer* self, StorageServerInterfac
state double lastLoopTopTime = now();
state Future<Void> dbInfoChange = Void();
state Future<Void> checkLastUpdate = Void();
<<<<<<< HEAD
state Future<Void> updateProcessStatsTimer = delay(SERVER_KNOBS->FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL);
self->actors.add(updateStorage(self));
@@ -3904,21 +3884,10 @@ ACTOR Future<Void> storageServerCore( StorageServer* self, StorageServerInterfac
self->actors.add(serveGetKeyRequests(self, ssi.getKey.getFuture()));
self->actors.add(serveWatchValueRequests(self, ssi.watchValue.getFuture()));
self->actors.add(traceRole(Role::STORAGE_SERVER, ssi.id()));
self->actors.add(reportStorageServerState(self));
self->transactionTagCounter.startNewInterval(self->thisServerID);
self->actors.add(recurring([&](){ self->transactionTagCounter.startNewInterval(self->thisServerID); }, SERVER_KNOBS->READ_TAG_MEASUREMENT_INTERVAL));
=======
state double updateProcessStatsDelay = SERVER_KNOBS->UPDATE_STORAGE_PROCESS_STATS_INTERVAL;
state Future<Void> updateProcessStatsTimer = delay(updateProcessStatsDelay);
actors.add(updateStorage(self));
actors.add(waitFailureServer(ssi.waitFailure.getFuture()));
actors.add(self->otherError.getFuture());
actors.add(metricsCore(self, ssi));
actors.add(logLongByteSampleRecovery(self->byteSampleRecovery));
actors.add(checkBehind(self));
actors.add(reportStorageServerState(self));
>>>>>>> anoyes/merge-6.2-to-6.3
self->coreStarted.send( Void() );

View File

@@ -30,11 +30,7 @@
#include <cctype>
#include <time.h>
#include <set>
<<<<<<< HEAD
=======
#include <iomanip>
>>>>>>> anoyes/merge-6.2-to-6.3
#include "flow/IThreadPool.h"
#include "flow/ThreadHelper.actor.h"
#include "flow/FastRef.h"

View File

@@ -90,12 +90,9 @@ ERROR( please_reboot_delete, 1208, "Reboot of server process requested, with del
ERROR( master_proxy_failed, 1209, "Master terminating because a Proxy failed" )
ERROR( master_resolver_failed, 1210, "Master terminating because a Resolver failed" )
ERROR( server_overloaded, 1211, "Server is under too much load and cannot respond" )
<<<<<<< HEAD
ERROR( master_backup_worker_failed, 1212, "Master terminating because a backup worker failed")
ERROR( tag_throttled, 1213, "Transaction tag is being throttled" )
=======
ERROR(dd_tracker_cancelled, 1212, "The data distribution tracker has been cancelled")
>>>>>>> anoyes/merge-6.2-to-6.3
ERROR( dd_tracker_cancelled, 1215, "The data distribution tracker has been cancelled" )
// 15xx Platform errors
ERROR( platform_error, 1500, "Platform error" )

View File

@@ -32,11 +32,7 @@
<Wix xmlns='http://schemas.microsoft.com/wix/2006/wi'>
<Product Name='$(var.Title)'
<<<<<<< HEAD
Id='{4F01CFD6-596A-4224-BF7D-4AEF30BA2083}'
=======
Id='{88AA3058-920F-4DB3-8E3E-492E35F13DDE}'
>>>>>>> anoyes/merge-6.2-to-6.3
UpgradeCode='{A95EA002-686E-4164-8356-C715B7F8B1C8}'
Version='$(var.Version)'
Manufacturer='$(var.Manufacturer)'

View File

@@ -1,7 +1,7 @@
<?xml version="1.0"?>
<Project xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<Version>6.2.29</Version>
<PackageName>6.2</PackageName>
<Version>6.3.10</Version>
<PackageName>6.3</PackageName>
</PropertyGroup>
</Project>