Refactor: add std:: qualifier and use emplace_back

This commit is contained in:
Jingyu Zhou 2019-05-16 13:54:06 -07:00 committed by Alex Miller
parent f598d0a614
commit b8e7fc1b84
16 changed files with 244 additions and 224 deletions

View File

@ -60,7 +60,7 @@ public:
Database clone() const { return Database(new DatabaseContext( cluster, clientInfo, clientInfoMonitor, dbId, taskID, clientLocality, enableLocalityLoadBalance, lockAware, apiVersion )); }
pair<KeyRange,Reference<LocationInfo>> getCachedLocation( const KeyRef&, bool isBackward = false );
std::pair<KeyRange,Reference<LocationInfo>> getCachedLocation( const KeyRef&, bool isBackward = false );
bool getCachedLocations( const KeyRangeRef&, vector<std::pair<KeyRange,Reference<LocationInfo>>>&, int limit, bool reverse );
Reference<LocationInfo> setCachedLocation( const KeyRangeRef&, const vector<struct StorageServerInterface>& );
void invalidateCache( const KeyRef&, bool isBackward = false );

View File

@ -21,11 +21,14 @@
#ifndef FDBCLIENT_FDBTYPES_H
#define FDBCLIENT_FDBTYPES_H
#include <algorithm>
#include <set>
#include <string>
#include <vector>
#include "flow/flow.h"
#include "fdbclient/Knobs.h"
using std::vector;
using std::pair;
typedef int64_t Version;
typedef uint64_t LogEpoch;
typedef uint64_t Sequence;
@ -761,7 +764,7 @@ static bool addressExcluded( std::set<AddressExclusion> const& exclusions, Netwo
struct ClusterControllerPriorityInfo {
enum DCFitness { FitnessPrimary, FitnessRemote, FitnessPreferred, FitnessUnknown, FitnessBad }; //cannot be larger than 7 because of leader election mask
static DCFitness calculateDCFitness(Optional<Key> const& dcId, vector<Optional<Key>> const& dcPriority) {
static DCFitness calculateDCFitness(Optional<Key> const& dcId, std::vector<Optional<Key>> const& dcPriority) {
if(!dcPriority.size()) {
return FitnessUnknown;
} else if(dcPriority.size() == 1) {

View File

@ -23,6 +23,9 @@
#define FDBCLIENT_MASTERPROXYINTERFACE_H
#pragma once
#include <utility>
#include <vector>
#include "fdbclient/FDBTypes.h"
#include "fdbclient/StorageServerInterface.h"
#include "fdbclient/CommitTransaction.h"
@ -168,7 +171,7 @@ struct GetReadVersionRequest : TimedRequest {
struct GetKeyServerLocationsReply {
constexpr static FileIdentifier file_identifier = 10636023;
Arena arena;
vector<pair<KeyRangeRef, vector<StorageServerInterface>>> results;
std::vector<std::pair<KeyRangeRef, vector<StorageServerInterface>>> results;
template <class Ar>
void serialize(Ar& ar) {
@ -213,7 +216,7 @@ struct GetStorageServerRejoinInfoReply {
Tag tag;
Optional<Tag> newTag;
bool newLocality;
vector<pair<Version, Tag>> history;
std::vector<std::pair<Version, Tag>> history;
template <class Ar>
void serialize(Ar& ar) {

View File

@ -62,6 +62,7 @@ extern const char* getHGVersion();
using std::make_pair;
using std::max;
using std::min;
using std::pair;
NetworkOptions networkOptions;
Reference<TLSOptions> tlsOptions;

View File

@ -22,6 +22,9 @@
#define CONFLICTSET_H
#pragma once
#include <utility>
#include <vector>
#include "fdbclient/CommitTransaction.h"
struct ConflictSet;
@ -40,23 +43,23 @@ struct ConflictBatch {
};
void addTransaction( const CommitTransactionRef& transaction );
void detectConflicts(Version now, Version newOldestVersion, vector<int>& nonConflicting, vector<int>* tooOldTransactions = NULL);
void GetTooOldTransactions(vector<int>& tooOldTransactions);
void detectConflicts(Version now, Version newOldestVersion, std::vector<int>& nonConflicting, std::vector<int>* tooOldTransactions = NULL);
void GetTooOldTransactions(std::vector<int>& tooOldTransactions);
private:
ConflictSet* cs;
Standalone< VectorRef< struct TransactionInfo* > > transactionInfo;
vector<struct KeyInfo> points;
std::vector<struct KeyInfo> points;
int transactionCount;
vector< pair<StringRef,StringRef> > combinedWriteConflictRanges;
vector< struct ReadConflictRange > combinedReadConflictRanges;
std::vector< std::pair<StringRef,StringRef> > combinedWriteConflictRanges;
std::vector< struct ReadConflictRange > combinedReadConflictRanges;
bool* transactionConflictStatus;
void checkIntraBatchConflicts();
void combineWriteConflictRanges();
void checkReadConflictRanges();
void mergeWriteConflictRanges(Version now);
void addConflictRanges(Version now, vector< pair<StringRef,StringRef> >::iterator begin, vector< pair<StringRef,StringRef> >::iterator end, class SkipList* part);
void addConflictRanges(Version now, std::vector< std::pair<StringRef,StringRef> >::iterator begin, std::vector< std::pair<StringRef,StringRef> >::iterator end, class SkipList* part);
};
#endif

View File

@ -204,7 +204,7 @@ public:
return tag.id % logServers.size();
}
void updateLocalitySet( vector<LocalityData> const& localities ) {
void updateLocalitySet( std::vector<LocalityData> const& localities ) {
LocalityMap<int>* logServerMap;
logServerSet = Reference<LocalitySet>(new LocalityMap<int>());
@ -418,7 +418,7 @@ struct ILogSystem {
struct MergedPeekCursor : IPeekCursor, ReferenceCounted<MergedPeekCursor> {
Reference<LogSet> logSet;
vector< Reference<IPeekCursor> > serverCursors;
std::vector< Reference<IPeekCursor> > serverCursors;
std::vector<LocalityEntry> locations;
std::vector< std::pair<LogMessageVersion, int> > sortedVersions;
Tag tag;
@ -429,9 +429,9 @@ struct ILogSystem {
UID randomID;
int tLogReplicationFactor;
MergedPeekCursor( vector< Reference<ILogSystem::IPeekCursor> > const& serverCursors, Version begin );
MergedPeekCursor( std::vector< Reference<ILogSystem::IPeekCursor> > const& serverCursors, Version begin );
MergedPeekCursor( std::vector<Reference<AsyncVar<OptionalInterface<TLogInterface>>>> const& logServers, int bestServer, int readQuorum, Tag tag, Version begin, Version end, bool parallelGetMore, std::vector<LocalityData> const& tLogLocalities, Reference<IReplicationPolicy> const tLogPolicy, int tLogReplicationFactor );
MergedPeekCursor( vector< Reference<IPeekCursor> > const& serverCursors, LogMessageVersion const& messageVersion, int bestServer, int readQuorum, Optional<LogMessageVersion> nextVersion, Reference<LogSet> logSet, int tLogReplicationFactor );
MergedPeekCursor( std::vector< Reference<IPeekCursor> > const& serverCursors, LogMessageVersion const& messageVersion, int bestServer, int readQuorum, Optional<LogMessageVersion> nextVersion, Reference<LogSet> logSet, int tLogReplicationFactor );
virtual Reference<IPeekCursor> cloneNoMore();
virtual void setProtocolVersion( uint64_t version );
@ -636,7 +636,7 @@ struct ILogSystem {
virtual Reference<IPeekCursor> peek( UID dbgid, Version begin, Optional<Version> end, std::vector<Tag> tags, bool parallelGetMore = false ) = 0;
// Same contract as peek(), but for a set of tags
virtual Reference<IPeekCursor> peekSingle( UID dbgid, Version begin, Tag tag, vector<pair<Version,Tag>> history = vector<pair<Version,Tag>>() ) = 0;
virtual Reference<IPeekCursor> peekSingle( UID dbgid, Version begin, Tag tag, std::vector<std::pair<Version,Tag>> history = std::vector<std::pair<Version,Tag>>() ) = 0;
// Same contract as peek(), but blocks until the preferred log server(s) for the given tag are available (and is correspondingly less expensive)
virtual Reference<IPeekCursor> peekLogRouter( UID dbgid, Version begin, Tag tag ) = 0;
@ -690,7 +690,7 @@ struct ILogSystem {
virtual Future<Void> onLogSystemConfigChange() = 0;
// Returns when the log system configuration has changed due to a tlog rejoin.
virtual void getPushLocations( std::vector<Tag> const& tags, vector<int>& locations ) = 0;
virtual void getPushLocations( std::vector<Tag> const& tags, std::vector<int>& locations ) = 0;
virtual bool hasRemoteLogs() = 0;
@ -807,10 +807,10 @@ struct LogPushData : NonCopyable {
private:
Reference<ILogSystem> logSystem;
vector<Tag> next_message_tags;
vector<Tag> prev_tags;
vector<BinaryWriter> messagesWriter;
vector<int> msg_locations;
std::vector<Tag> next_message_tags;
std::vector<Tag> prev_tags;
std::vector<BinaryWriter> messagesWriter;
std::vector<int> msg_locations;
uint32_t subsequence;
};

View File

@ -24,6 +24,7 @@
#include <algorithm>
#include <numeric>
#include <string>
#include <vector>
/*
#ifdef __GNUG__
@ -43,9 +44,8 @@
using std::min;
using std::max;
using std::make_pair;
static vector<PerfDoubleCounter*> skc;
static std::vector<PerfDoubleCounter*> skc;
static thread_local uint32_t g_seed = 0;
@ -199,7 +199,7 @@ bool operator == (const KeyInfo& lhs, const KeyInfo& rhs ) {
return !(lhs<rhs || rhs<lhs);
}
void swapSort(vector<KeyInfo>& points, int a, int b){
void swapSort(std::vector<KeyInfo>& points, int a, int b){
if (points[b] < points[a]){
KeyInfo temp;
temp = points[a];
@ -208,7 +208,7 @@ void swapSort(vector<KeyInfo>& points, int a, int b){
}
}
void smallSort(vector<KeyInfo>& points, int start, int N){
void smallSort(std::vector<KeyInfo>& points, int start, int N){
for (int i=1;i<N;i++)
for (int j=i;j>0;j-=2)
swapSort(points, start+j-1, start+j);
@ -224,12 +224,12 @@ struct SortTask {
SortTask(int begin, int size, int character) : begin(begin), size(size), character(character) {}
};
void sortPoints(vector<KeyInfo>& points){
vector<SortTask> tasks;
vector<KeyInfo> newPoints;
vector<int> counts;
void sortPoints(std::vector<KeyInfo>& points){
std::vector<SortTask> tasks;
std::vector<KeyInfo> newPoints;
std::vector<int> counts;
tasks.push_back( SortTask(0, points.size(), 0) );
tasks.emplace_back(0, points.size(), 0);
while (tasks.size()){
SortTask st = tasks.back();
@ -259,7 +259,7 @@ void sortPoints(vector<KeyInfo>& points){
for(int i=0;i<counts.size();i++){
int temp = counts[i];
if (temp > 1)
tasks.push_back(SortTask(st.begin+total, temp, st.character+1));
tasks.emplace_back(st.begin+total, temp, st.character+1);
counts[i] = total;
total += temp;
}
@ -569,7 +569,7 @@ public:
}
void concatenate( SkipList* input, int count ) {
vector<Finger> ends( count-1 );
std::vector<Finger> ends( count-1 );
for(int i=0; i<ends.size(); i++)
input[i].getEnd( ends[i] );
@ -948,9 +948,9 @@ struct ConflictSet {
SkipList versionHistory;
Key removalKey;
Version oldestVersion;
vector<PAction> worker_nextAction;
vector<Event*> worker_ready;
vector<Event*> worker_finished;
std::vector<PAction> worker_nextAction;
std::vector<Event*> worker_ready;
std::vector<Event*> worker_finished;
};
ConflictSet* newConflictSet() { return new ConflictSet; }
@ -989,18 +989,18 @@ void ConflictBatch::addTransaction( const CommitTransactionRef& tr ) {
info->readRanges.resize( arena, tr.read_conflict_ranges.size() );
info->writeRanges.resize( arena, tr.write_conflict_ranges.size() );
vector<KeyInfo> &points = this->points;
std::vector<KeyInfo>& points = this->points;
for(int r=0; r<tr.read_conflict_ranges.size(); r++) {
const KeyRangeRef& range = tr.read_conflict_ranges[r];
points.push_back( KeyInfo( range.begin, false, true, false, t, &info->readRanges[r].first ) );
points.emplace_back(range.begin, false, true, false, t, &info->readRanges[r].first);
//points.back().keyEnd = StringRef(buf,range.second);
points.push_back( KeyInfo( range.end, false, false, false, t, &info->readRanges[r].second ) );
combinedReadConflictRanges.push_back( ReadConflictRange( range.begin, range.end, tr.read_snapshot, t ) );
points.emplace_back(range.end, false, false, false, t, &info->readRanges[r].second);
combinedReadConflictRanges.emplace_back(range.begin, range.end, tr.read_snapshot, t);
}
for(int r=0; r<tr.write_conflict_ranges.size(); r++) {
const KeyRangeRef& range = tr.write_conflict_ranges[r];
points.push_back( KeyInfo( range.begin, false, true, true, t, &info->writeRanges[r].first ) );
points.push_back( KeyInfo( range.end, false, false, true, t, &info->writeRanges[r].second ) );
points.emplace_back(range.begin, false, true, true, t, &info->writeRanges[r].first);
points.emplace_back(range.end, false, false, true, t, &info->writeRanges[r].second);
}
}
@ -1008,7 +1008,7 @@ void ConflictBatch::addTransaction( const CommitTransactionRef& tr ) {
}
class MiniConflictSet2 : NonCopyable {
vector<bool> values;
std::vector<bool> values;
public:
explicit MiniConflictSet2( int size ) {
values.assign( size, false );
@ -1028,21 +1028,21 @@ public:
class MiniConflictSet : NonCopyable {
typedef uint64_t wordType;
enum { bucketShift = 6, bucketMask=sizeof(wordType)*8-1 };
vector<wordType> values; // undefined when andValues is true for a range of values
vector<wordType> orValues;
vector<wordType> andValues;
std::vector<wordType> values; // undefined when andValues is true for a range of values
std::vector<wordType> orValues;
std::vector<wordType> andValues;
MiniConflictSet2 debug; // SOMEDAY: Test on big ranges, eliminate this
uint64_t bitMask(unsigned int bit){ // computes results for bit%word
return (((wordType)1) << ( bit & bucketMask )); // '&' unnecesary?
}
void setNthBit(vector<wordType> &v, const unsigned int bit){
void setNthBit(std::vector<wordType>& v, const unsigned int bit){
v[bit>>bucketShift] |= bitMask(bit);
}
void clearNthBit(vector<wordType> &v, const unsigned int bit){
void clearNthBit(std::vector<wordType>& v, const unsigned int bit){
v[bit>>bucketShift] &= ~(bitMask(bit));
}
bool getNthBit(const vector<wordType> &v, const unsigned int bit){
bool getNthBit(const std::vector<wordType>& v, const unsigned int bit){
return (v[bit>>bucketShift] & bitMask(bit)) != 0;
}
int wordsForNBits(unsigned int bits){
@ -1060,7 +1060,7 @@ class MiniConflictSet : NonCopyable {
return (b&bucketMask) ? lowBits(b) : -1;
}
void setBits(vector<wordType> &v, int bitBegin, int bitEnd, bool fillMiddle){
void setBits(std::vector<wordType>& v, int bitBegin, int bitEnd, bool fillMiddle){
if (bitBegin >= bitEnd) return;
int beginWord = bitBegin>>bucketShift;
int lastWord = ((bitEnd+bucketMask) >> bucketShift) - 1;
@ -1075,7 +1075,7 @@ class MiniConflictSet : NonCopyable {
}
}
bool orBits(vector<wordType> &v, int bitBegin, int bitEnd, bool getMiddle) {
bool orBits(std::vector<wordType>& v, int bitBegin, int bitEnd, bool getMiddle) {
if (bitBegin >= bitEnd) return false;
int beginWord = bitBegin >> bucketShift;
int lastWord = ((bitEnd+bucketMask) >> bucketShift) - 1;
@ -1152,7 +1152,7 @@ void ConflictBatch::checkIntraBatchConflicts() {
}
}
void ConflictBatch::GetTooOldTransactions(vector<int>& tooOldTransactions) {
void ConflictBatch::GetTooOldTransactions(std::vector<int>& tooOldTransactions) {
for (int i = 0; i<transactionInfo.size(); i++) {
if (transactionInfo[i]->tooOld) {
tooOldTransactions.push_back(i);
@ -1160,7 +1160,7 @@ void ConflictBatch::GetTooOldTransactions(vector<int>& tooOldTransactions) {
}
}
void ConflictBatch::detectConflicts(Version now, Version newOldestVersion, vector<int>& nonConflicting, vector<int>* tooOldTransactions) {
void ConflictBatch::detectConflicts(Version now, Version newOldestVersion, std::vector<int>& nonConflicting, std::vector<int>* tooOldTransactions) {
double t = timer();
sortPoints( points );
//std::sort( combinedReadConflictRanges.begin(), combinedReadConflictRanges.end() );
@ -1232,7 +1232,7 @@ DISABLE_ZERO_DIVISION_FLAG
}
}
void ConflictBatch::addConflictRanges(Version now, vector< pair<StringRef,StringRef> >::iterator begin, vector< pair<StringRef,StringRef> >::iterator end,SkipList* part) {
void ConflictBatch::addConflictRanges(Version now, std::vector< std::pair<StringRef,StringRef> >::iterator begin, std::vector< std::pair<StringRef,StringRef> >::iterator end,SkipList* part) {
int count = end-begin;
#if 0
//for(auto w = begin; w != end; ++w)
@ -1262,16 +1262,16 @@ void ConflictBatch::mergeWriteConflictRanges(Version now) {
return;
if (PARALLEL_THREAD_COUNT) {
vector<SkipList> parts;
std::vector<SkipList> parts;
for (int i = 0; i < PARALLEL_THREAD_COUNT; i++)
parts.push_back(SkipList());
parts.emplace_back();
vector<StringRef> splits( parts.size()-1 );
std::vector<StringRef> splits( parts.size()-1 );
for(int s=0; s<splits.size(); s++)
splits[s] = combinedWriteConflictRanges[ (s+1)*combinedWriteConflictRanges.size()/parts.size() ].first;
cs->versionHistory.partition( splits.size() ? &splits[0] : NULL, splits.size(), &parts[0] );
vector<double> tstart(PARALLEL_THREAD_COUNT), tend(PARALLEL_THREAD_COUNT);
std::vector<double> tstart(PARALLEL_THREAD_COUNT), tend(PARALLEL_THREAD_COUNT);
Event done[PARALLEL_THREAD_COUNT ? PARALLEL_THREAD_COUNT : 1];
double before = timer();
for(int t=0; t<parts.size(); t++) {
@ -1325,8 +1325,8 @@ void ConflictBatch::combineWriteConflictRanges()
if (point.write && !transactionConflictStatus[ point.transaction ]) {
if (point.begin) {
activeWriteCount++;
if (activeWriteCount == 1)
combinedWriteConflictRanges.push_back( make_pair( point.key, KeyRef() ) );
if (activeWriteCount == 1)
combinedWriteConflictRanges.emplace_back(point.key, KeyRef());
} else /*if (point.end)*/ {
activeWriteCount--;
if (activeWriteCount == 0)
@ -1431,8 +1431,8 @@ void skipListTest() {
Arena testDataArena;
VectorRef< VectorRef<KeyRangeRef> > testData;
testData.resize(testDataArena, 500);
vector<vector<uint8_t>> success( testData.size() );
vector<vector<uint8_t>> success2( testData.size() );
std::vector<std::vector<uint8_t>> success( testData.size() );
std::vector<std::vector<uint8_t>> success2( testData.size() );
for(int i=0; i<testData.size(); i++) {
testData[i].resize(testDataArena, 5000);
success[i].assign( testData[i].size(), false );
@ -1454,10 +1454,10 @@ void skipListTest() {
int cranges = 0, tcount = 0;
start = timer();
vector<vector<int>> nonConflict( testData.size() );
std::vector<std::vector<int>> nonConflict( testData.size() );
for(int i=0; i<testData.size(); i++) {
Arena buf;
vector<CommitTransactionRef> trs;
std::vector<CommitTransactionRef> trs;
double t = timer();
for(int j=0; j+readCount+writeCount<=testData[i].size(); j+=readCount+writeCount) {
CommitTransactionRef tr;

View File

@ -767,7 +767,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
}
virtual Reference<IPeekCursor> peekSingle( UID dbgid, Version begin, Tag tag, vector<pair<Version,Tag>> history ) {
virtual Reference<IPeekCursor> peekSingle( UID dbgid, Version begin, Tag tag, std::vector<std::pair<Version,Tag>> history ) {
while(history.size() && begin >= history.back().first) {
history.pop_back();
}

View File

@ -429,12 +429,12 @@ ACTOR Future<std::vector<Message>> _listInboxMessages(Database cx, uint64_t inbo
//printf(" -> cached message %016llx from feed %016llx\n", messageId, feed);
if(messageId >= cursor) {
//printf(" -> entering message %016llx from feed %016llx\n", messageId, feed);
feedLatest.insert(pair<MessageId, Feed>(messageId, feed));
feedLatest.emplace(messageId, feed);
} else {
// replace this with the first message older than the cursor
MessageId mId = wait(getFeedLatestAtOrAfter(&tr, feed, cursor));
if(mId) {
feedLatest.insert(pair<MessageId, Feed>(mId, feed));
feedLatest.emplace(mId, feed);
}
}
}
@ -465,7 +465,7 @@ ACTOR Future<std::vector<Message>> _listInboxMessages(Database cx, uint64_t inbo
MessageId nextMessage = wait(getFeedLatestAtOrAfter(&tr, f, id + 1));
if(nextMessage) {
feedLatest.insert(pair<MessageId, Feed>(nextMessage, f));
feedLatest.emplace(nextMessage, f);
}
}

View File

@ -52,6 +52,7 @@
#include "flow/TDMetric.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
using std::pair;
using std::make_pair;
#pragma region Data Structures
@ -199,11 +200,11 @@ private:
};
struct UpdateEagerReadInfo {
vector<KeyRef> keyBegin;
vector<Key> keyEnd; // these are for ClearRange
std::vector<KeyRef> keyBegin;
std::vector<Key> keyEnd; // these are for ClearRange
vector<pair<KeyRef, int>> keys;
vector<Optional<Value>> value;
std::vector<std::pair<KeyRef, int>> keys;
std::vector<Optional<Value>> value;
Arena arena;
@ -223,13 +224,13 @@ struct UpdateEagerReadInfo {
// CompareAndClear is likely to be used after another atomic operation on same key.
keys.back().second = std::max(keys.back().second, m.param2.size() + 1);
} else {
keys.push_back(pair<KeyRef, int>(m.param1, m.param2.size() + 1));
keys.emplace_back(m.param1, m.param2.size() + 1);
}
} else if ((m.type == MutationRef::AppendIfFits) || (m.type == MutationRef::ByteMin) ||
(m.type == MutationRef::ByteMax))
keys.push_back(pair<KeyRef, int>(m.param1, CLIENT_KNOBS->VALUE_SIZE_LIMIT));
keys.emplace_back(m.param1, CLIENT_KNOBS->VALUE_SIZE_LIMIT);
else if (isAtomicOp((MutationRef::Type) m.type))
keys.push_back(pair<KeyRef, int>(m.param1, m.param2.size()));
keys.emplace_back(m.param1, m.param2.size());
}
void finishKeyBegin() {
@ -2239,8 +2240,8 @@ void changeServerKeys( StorageServer* data, const KeyRangeRef& keys, bool nowAss
// version. The latter depends on data->newestAvailableVersion, so loop over the ranges of that.
// SOMEDAY: Could this just use shards? Then we could explicitly do the removeDataRange here when an adding/transferred shard is cancelled
auto vr = data->newestAvailableVersion.intersectingRanges(keys);
vector<std::pair<KeyRange,Version>> changeNewestAvailable;
vector<KeyRange> removeRanges;
std::vector<std::pair<KeyRange,Version>> changeNewestAvailable;
std::vector<KeyRange> removeRanges;
for (auto r = vr.begin(); r != vr.end(); ++r) {
KeyRangeRef range = keys & r->range();
bool dataAvailable = r->value()==latestVersion || r->value() >= version;
@ -2255,7 +2256,7 @@ void changeServerKeys( StorageServer* data, const KeyRangeRef& keys, bool nowAss
if (dataAvailable) {
ASSERT( r->value() == latestVersion); // Not that we care, but this used to be checked instead of dataAvailable
ASSERT( data->mutableData().getLatestVersion() > version || context == CSK_RESTORE );
changeNewestAvailable.push_back(make_pair(range, version));
changeNewestAvailable.emplace_back(range, version);
removeRanges.push_back( range );
}
data->addShard( ShardInfo::newNotAssigned(range) );
@ -2263,7 +2264,7 @@ void changeServerKeys( StorageServer* data, const KeyRangeRef& keys, bool nowAss
} else if (!dataAvailable) {
// SOMEDAY: Avoid restarting adding/transferred shards
if (version==0){ // bypass fetchkeys; shard is known empty at version 0
changeNewestAvailable.push_back(make_pair(range, latestVersion));
changeNewestAvailable.emplace_back(range, latestVersion);
data->addShard( ShardInfo::newReadWrite(range, data) );
setAvailableStatus(data, range, true);
} else {
@ -2272,7 +2273,7 @@ void changeServerKeys( StorageServer* data, const KeyRangeRef& keys, bool nowAss
data->addShard( ShardInfo::newAdding(data, range) );
}
} else {
changeNewestAvailable.push_back(make_pair(range, latestVersion));
changeNewestAvailable.emplace_back(range, latestVersion);
data->addShard( ShardInfo::newReadWrite(range, data) );
}
}
@ -2438,7 +2439,7 @@ private:
rollback( data, rollbackVersion, currentVersion );
}
data->recoveryVersionSkips.push_back(std::make_pair(rollbackVersion, currentVersion - rollbackVersion));
data->recoveryVersionSkips.emplace_back(rollbackVersion, currentVersion - rollbackVersion);
} else if (m.type == MutationRef::SetValue && m.param1 == killStoragePrivateKey) {
throw worker_removed();
} else if ((m.type == MutationRef::SetValue || m.type == MutationRef::ClearRange) && m.param1.substr(1).startsWith(serverTagPrefix)) {

View File

@ -17,6 +17,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <utility>
#include <vector>
#include "fdbserver/workloads/workloads.actor.h"
#include "flow/ActorCollection.h"
@ -75,18 +77,18 @@ struct IOLog {
ProcessLog issueR, completionR, durationR;
ProcessLog issueW, completionW, durationW;
vector<pair<std::string, ProcessLog*> > logs;
std::vector<std::pair<std::string, ProcessLog*> > logs;
IOLog(){
logs.push_back(std::make_pair("issue", &issue));
logs.push_back(std::make_pair("completion", &completion));
logs.push_back(std::make_pair("duration", &duration));
logs.push_back(std::make_pair("issueR", &issueR));
logs.push_back(std::make_pair("completionR", &completionR));
logs.push_back(std::make_pair("durationR", &durationR));
logs.push_back(std::make_pair("issueW", &issueW));
logs.push_back(std::make_pair("completionW", &completionW));
logs.push_back(std::make_pair("durationW", &durationW));
logs.emplace_back("issue", &issue);
logs.emplace_back("completion", &completion);
logs.emplace_back("duration", &duration);
logs.emplace_back("issueR", &issueR);
logs.emplace_back("completionR", &completionR);
logs.emplace_back("durationR", &durationR);
logs.emplace_back("issueW", &issueW);
logs.emplace_back("completionW", &completionW);
logs.emplace_back("durationW", &durationW);
duration.logLatency = true;
durationR.logLatency = true;
@ -138,10 +140,10 @@ struct IOLog {
struct AsyncFileReadWorkload : public AsyncFileWorkload
{
//Buffers used to store what is being read or written
vector<Reference<AsyncFileBuffer> > readBuffers;
std::vector<Reference<AsyncFileBuffer> > readBuffers;
//The futures for the asynchronous read operations
vector<Future<int> > readFutures;
std::vector<Future<int> > readFutures;
//Number of reads to perform in parallel. Read tests are performed only if this is greater than zero
int numParallelReads;
@ -280,7 +282,7 @@ struct AsyncFileReadWorkload : public AsyncFileWorkload
if (self->unbatched) {
self->ioLog = new IOLog();
vector<Future<Void>> readers;
std::vector<Future<Void>> readers;
for(int i=0; i<self->numParallelReads; i++)
readers.push_back( readLoop(self, i, self->fixedRate / self->numParallelReads) );
@ -333,14 +335,12 @@ struct AsyncFileReadWorkload : public AsyncFileWorkload
}
}
virtual void getMetrics(vector<PerfMetric>& m)
{
if(enabled)
{
m.push_back(PerfMetric("Bytes read/sec", bytesRead.getValue() / testDuration, false));
m.push_back(PerfMetric("Average CPU Utilization (Percentage)", averageCpuUtilization * 100, false));
virtual void getMetrics(std::vector<PerfMetric>& m) {
if (enabled) {
m.emplace_back("Bytes read/sec", bytesRead.getValue() / testDuration, false);
m.emplace_back("Average CPU Utilization (Percentage)", averageCpuUtilization * 100, false);
}
}
};
};
WorkloadFactory<AsyncFileReadWorkload> AsyncFileReadWorkloadFactory("AsyncFileRead");

View File

@ -29,6 +29,9 @@
#define FDBSERVER_BULK_SETUP_ACTOR_H
#include <string>
#include <utility>
#include <vector>
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbserver/ServerDBInfo.h"
@ -102,14 +105,14 @@ Future<uint64_t> setupRange( Database cx, T* workload, uint64_t begin, uint64_t
}
ACTOR template<class T>
Future<uint64_t> setupRangeWorker( Database cx, T* workload, vector<std::pair<uint64_t,uint64_t>>* jobs, double maxKeyInsertRate, int keySaveIncrement, int actorId) {
Future<uint64_t> setupRangeWorker( Database cx, T* workload, std::vector<std::pair<uint64_t,uint64_t>>* jobs, double maxKeyInsertRate, int keySaveIncrement, int actorId) {
state double nextStart;
state uint64_t loadedRanges = 0;
state int lastStoredKeysLoaded = 0;
state uint64_t keysLoaded = 0;
state uint64_t bytesStored = 0;
while (jobs->size()){
state pair<uint64_t, uint64_t> job = jobs->back();
state std::pair<uint64_t, uint64_t> job = jobs->back();
jobs->pop_back();
nextStart = now() + (job.second-job.first)/maxKeyInsertRate;
uint64_t numBytes = wait( setupRange(cx, workload, job.first, job.second) );
@ -152,7 +155,7 @@ Future<uint64_t> setupRangeWorker( Database cx, T* workload, vector<std::pair<ui
//to reach that mark. Returns a vector of times (in seconds) corresponding to the counts in the countsOfInterest vector.
//Expects countsOfInterest to be sorted in ascending order
ACTOR static Future<vector<pair<uint64_t, double> > > trackInsertionCount(Database cx, vector<uint64_t> countsOfInterest, double checkInterval)
ACTOR static Future<std::vector<std::pair<uint64_t, double> > > trackInsertionCount(Database cx, std::vector<uint64_t> countsOfInterest, double checkInterval)
{
state KeyRange keyPrefix = KeyRangeRef(std::string("keycount"), std::string("keycount") + char(255));
state KeyRange bytesPrefix = KeyRangeRef(std::string("bytesstored"), std::string("bytesstored") + char(255));
@ -160,7 +163,7 @@ ACTOR static Future<vector<pair<uint64_t, double> > > trackInsertionCount(Databa
state uint64_t lastInsertionCount = 0;
state int currentCountIndex = 0;
state vector<pair<uint64_t, double> > countInsertionRates;
state std::vector<std::pair<uint64_t, double> > countInsertionRates;
state double startTime = now();
@ -184,7 +187,7 @@ ACTOR static Future<vector<pair<uint64_t, double> > > trackInsertionCount(Databa
bytesInserted += *(uint64_t*)bytes[i].value.begin();
while(currentCountIndex < countsOfInterest.size() && countsOfInterest[currentCountIndex] > lastInsertionCount && countsOfInterest[currentCountIndex] <= numInserted)
countInsertionRates.push_back(pair<uint64_t, double>(countsOfInterest[currentCountIndex++], bytesInserted / (now() - startTime)));
countInsertionRates.emplace_back(countsOfInterest[currentCountIndex++], bytesInserted / (now() - startTime));
lastInsertionCount = numInserted;
wait(delay(checkInterval));
@ -198,13 +201,16 @@ ACTOR static Future<vector<pair<uint64_t, double> > > trackInsertionCount(Databa
return countInsertionRates;
}
ACTOR template<class T>
Future<Void> bulkSetup( Database cx, T* workload, uint64_t nodeCount, Promise<double> setupTime,
bool valuesInconsequential = false, double postSetupWarming = 0.0, double maxKeyInsertRate = 1e12,
vector<uint64_t> insertionCountsToMeasure = vector<uint64_t>(), Promise<vector<pair<uint64_t, double> > > ratesAtKeyCounts = Promise<vector<pair<uint64_t, double> > >(),
int keySaveIncrement = 0, double keyCheckInterval = 0.1 ) {
ACTOR template <class T>
Future<Void> bulkSetup(Database cx, T* workload, uint64_t nodeCount, Promise<double> setupTime,
bool valuesInconsequential = false, double postSetupWarming = 0.0,
double maxKeyInsertRate = 1e12,
std::vector<uint64_t> insertionCountsToMeasure = std::vector<uint64_t>(),
Promise<std::vector<std::pair<uint64_t, double>>> ratesAtKeyCounts =
Promise<std::vector<std::pair<uint64_t, double>>>(),
int keySaveIncrement = 0, double keyCheckInterval = 0.1) {
state vector<pair<uint64_t,uint64_t>> jobs;
state std::vector<std::pair<uint64_t,uint64_t>> jobs;
state uint64_t startNode = (nodeCount * workload->clientId) / workload->clientCount;
state uint64_t endNode = (nodeCount * (workload->clientId+1)) / workload->clientCount;
@ -226,7 +232,7 @@ Future<Void> bulkSetup( Database cx, T* workload, uint64_t nodeCount, Promise<do
.detail("End", endNode)
.detail("CheckMethod", "SimpleValueSize");
setupTime.send(0.0);
ratesAtKeyCounts.send(vector<pair<uint64_t, double> >());
ratesAtKeyCounts.send(std::vector<std::pair<uint64_t, double> >());
return Void();
} else {
TraceEvent("BulkRangeNotFound")
@ -257,14 +263,14 @@ Future<Void> bulkSetup( Database cx, T* workload, uint64_t nodeCount, Promise<do
// create a random vector of range-create jobs
for(uint64_t n=startNode; n<endNode; n+=BULK_SETUP_RANGE_SIZE)
jobs.push_back( std::make_pair( n, std::min(endNode, n+BULK_SETUP_RANGE_SIZE) ) );
jobs.emplace_back(n, std::min(endNode, n+BULK_SETUP_RANGE_SIZE));
g_random->randomShuffle(jobs);
// fire up the workers and wait for them to eat all the jobs
double maxWorkerInsertRate = maxKeyInsertRate / BULK_SETUP_WORKERS / workload->clientCount;
state vector<Future<uint64_t>> fs;
state std::vector<Future<uint64_t>> fs;
state Future<vector<pair<uint64_t, double> > > insertionTimes = vector<pair<uint64_t, double> >();
state Future<std::vector<std::pair<uint64_t, double> > > insertionTimes = std::vector<std::pair<uint64_t, double>>();
if(insertionCountsToMeasure.size() > 0)
{

View File

@ -268,11 +268,11 @@ struct ConsistencyCheckWorkload : TestWorkload
}
//Get a list of key servers; verify that the TLogs and master all agree about who the key servers are
state Promise<vector<pair<KeyRange, vector<StorageServerInterface>>>> keyServerPromise;
state Promise<std::vector<std::pair<KeyRange, std::vector<StorageServerInterface>>>> keyServerPromise;
bool keyServerResult = wait(self->getKeyServers(cx, self, keyServerPromise));
if(keyServerResult)
{
state vector<pair<KeyRange, vector<StorageServerInterface>>> keyServers = keyServerPromise.getFuture().get();
state std::vector<std::pair<KeyRange, vector<StorageServerInterface>>> keyServers = keyServerPromise.getFuture().get();
//Get the locations of all the shards in the database
state Promise<Standalone<VectorRef<KeyValueRef>>> keyLocationPromise;
@ -323,9 +323,9 @@ struct ConsistencyCheckWorkload : TestWorkload
//Get a list of storage servers from the master and compares them with the TLogs.
//If this is a quiescent check, then each master proxy needs to respond, otherwise only one needs to respond.
//Returns false if there is a failure (in this case, keyServersPromise will never be set)
ACTOR Future<bool> getKeyServers(Database cx, ConsistencyCheckWorkload *self, Promise<vector<pair<KeyRange, vector<StorageServerInterface>>>> keyServersPromise)
ACTOR Future<bool> getKeyServers(Database cx, ConsistencyCheckWorkload *self, Promise<std::vector<std::pair<KeyRange, vector<StorageServerInterface>>>> keyServersPromise)
{
state vector<pair<KeyRange, vector<StorageServerInterface>>> keyServers;
state std::vector<std::pair<KeyRange, vector<StorageServerInterface>>> keyServers;
//Try getting key server locations from the master proxies
state vector<Future<ErrorOr<GetKeyServerLocationsReply>>> keyServerLocationFutures;
@ -382,7 +382,7 @@ struct ConsistencyCheckWorkload : TestWorkload
//Retrieves the locations of all shards in the database
//Returns false if there is a failure (in this case, keyLocationPromise will never be set)
ACTOR Future<bool> getKeyLocations(Database cx, vector<pair<KeyRange, vector<StorageServerInterface>>> shards, ConsistencyCheckWorkload *self, Promise<Standalone<VectorRef<KeyValueRef>>> keyLocationPromise)
ACTOR Future<bool> getKeyLocations(Database cx, std::vector<std::pair<KeyRange, vector<StorageServerInterface>>> shards, ConsistencyCheckWorkload *self, Promise<Standalone<VectorRef<KeyValueRef>>> keyLocationPromise)
{
state Standalone<VectorRef<KeyValueRef>> keyLocations;
state Key beginKey = allKeys.begin.withPrefix(keyServersPrefix);

View File

@ -17,6 +17,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <vector>
#include "fdbrpc/ContinuousSample.h"
#include "fdbclient/NativeAPI.actor.h"
@ -33,7 +34,7 @@ struct QueuePushWorkload : TestWorkload {
std::string valueString;
Key endingKey, startingKey;
vector<Future<Void>> clients;
std::vector<Future<Void>> clients;
PerfIntCounter transactions, retries;
ContinuousSample<double> commitLatencies, GRVLatencies;
@ -58,25 +59,25 @@ struct QueuePushWorkload : TestWorkload {
virtual Future<bool> check( Database const& cx ) { return true; }
virtual void getMetrics( vector<PerfMetric>& m ) {
virtual void getMetrics(std::vector<PerfMetric>& m ) {
double duration = testDuration;
int writes = transactions.getValue();
m.push_back( PerfMetric( "Measured Duration", duration, true ) );
m.push_back( PerfMetric( "Operations/sec", writes / duration, false ) );
m.emplace_back("Measured Duration", duration, true);
m.emplace_back("Operations/sec", writes / duration, false);
m.push_back( transactions.getMetric() );
m.push_back( retries.getMetric() );
m.push_back( PerfMetric( "Mean GRV Latency (ms)", 1000 * GRVLatencies.mean(), true ) );
m.push_back( PerfMetric( "Median GRV Latency (ms, averaged)", 1000 * GRVLatencies.median(), true ) );
m.push_back( PerfMetric( "90% GRV Latency (ms, averaged)", 1000 * GRVLatencies.percentile( 0.90 ), true ) );
m.push_back( PerfMetric( "98% GRV Latency (ms, averaged)", 1000 * GRVLatencies.percentile( 0.98 ), true ) );
m.emplace_back("Mean GRV Latency (ms)", 1000 * GRVLatencies.mean(), true);
m.emplace_back("Median GRV Latency (ms, averaged)", 1000 * GRVLatencies.median(), true);
m.emplace_back("90% GRV Latency (ms, averaged)", 1000 * GRVLatencies.percentile( 0.90 ), true);
m.emplace_back("98% GRV Latency (ms, averaged)", 1000 * GRVLatencies.percentile( 0.98 ), true);
m.push_back( PerfMetric( "Mean Commit Latency (ms)", 1000 * commitLatencies.mean(), true ) );
m.push_back( PerfMetric( "Median Commit Latency (ms, averaged)", 1000 * commitLatencies.median(), true ) );
m.push_back( PerfMetric( "90% Commit Latency (ms, averaged)", 1000 * commitLatencies.percentile( 0.90 ), true ) );
m.push_back( PerfMetric( "98% Commit Latency (ms, averaged)", 1000 * commitLatencies.percentile( 0.98 ), true ) );
m.emplace_back("Mean Commit Latency (ms)", 1000 * commitLatencies.mean(), true);
m.emplace_back("Median Commit Latency (ms, averaged)", 1000 * commitLatencies.median(), true);
m.emplace_back("90% Commit Latency (ms, averaged)", 1000 * commitLatencies.percentile( 0.90 ), true);
m.emplace_back("98% Commit Latency (ms, averaged)", 1000 * commitLatencies.percentile( 0.98 ), true);
m.push_back( PerfMetric( "Bytes written/sec", (writes * (keyBytes + valueBytes)) / duration, false ) );
m.emplace_back("Bytes written/sec", (writes * (keyBytes + valueBytes)) / duration, false);
}
static Key keyForIndex( int base, int offset ) { return StringRef( format( "%08x%08x", base, offset ) ); }
@ -127,7 +128,7 @@ struct QueuePushWorkload : TestWorkload {
lastKey = self->endingKey;
}
pair<int, int> unpacked = valuesForKey( lastKey );
std::pair<int, int> unpacked = valuesForKey( lastKey );
if( self->forward )
tr.set( keyForIndex( unpacked.first + unpacked.second, g_random->randomInt(1, 1000) ),

View File

@ -19,6 +19,8 @@
*/
#include <boost/lexical_cast.hpp>
#include <utility>
#include <vector>
#include "fdbrpc/ContinuousSample.h"
#include "fdbclient/NativeAPI.actor.h"
@ -106,15 +108,15 @@ struct ReadWriteWorkload : KVWorkload {
EventMetricHandle<TransactionFailureMetric> transactionFailureMetric;
EventMetricHandle<ReadMetric> readMetric;
vector<Future<Void>> clients;
std::vector<Future<Void>> clients;
PerfIntCounter aTransactions, bTransactions, retries;
ContinuousSample<double> latencies, readLatencies, commitLatencies, GRVLatencies, fullReadLatencies;
double readLatencyTotal; int readLatencyCount;
vector<uint64_t> insertionCountsToMeasure;
vector<pair<uint64_t, double> > ratesAtKeyCounts;
std::vector<uint64_t> insertionCountsToMeasure;
std::vector<std::pair<uint64_t, double> > ratesAtKeyCounts;
vector<PerfMetric> periodicMetrics;
std::vector<PerfMetric> periodicMetrics;
bool doSetup;
@ -193,9 +195,9 @@ struct ReadWriteWorkload : KVWorkload {
ASSERT((keyForIndex(a, false) <= keyForIndex(b, false)));
}
vector<std::string> insertionCountsToMeasureString = getOption(options, LiteralStringRef("insertionCountsToMeasure"), vector<std::string>());
for(int i = 0; i < insertionCountsToMeasureString.size(); i++)
{
std::vector<std::string> insertionCountsToMeasureString =
getOption(options, LiteralStringRef("insertionCountsToMeasure"), std::vector<std::string>());
for (int i = 0; i < insertionCountsToMeasureString.size(); i++) {
try
{
uint64_t count = boost::lexical_cast<uint64_t>(insertionCountsToMeasureString[i]);
@ -225,7 +227,7 @@ struct ReadWriteWorkload : KVWorkload {
ACTOR static Future<bool> traceDumpWorkers( Reference<AsyncVar<ServerDBInfo>> db ) {
try {
loop {
ErrorOr<vector<WorkerDetails>> workerList = wait( db->get().clusterInterface.getWorkers.tryGetReply( GetWorkersRequest() ) );
ErrorOr<std::vector<WorkerDetails>> workerList = wait( db->get().clusterInterface.getWorkers.tryGetReply( GetWorkersRequest() ) );
if( workerList.present() ) {
std::vector<Future<ErrorOr<Void>>> dumpRequests;
for( int i = 0; i < workerList.get().size(); i++)
@ -254,53 +256,53 @@ struct ReadWriteWorkload : KVWorkload {
return true;
}
virtual void getMetrics( vector<PerfMetric>& m ) {
virtual void getMetrics(std::vector<PerfMetric>& m) {
double duration = metricsDuration;
int reads = (aTransactions.getValue() * readsPerTransactionA) + (bTransactions.getValue() * readsPerTransactionB);
int writes = (aTransactions.getValue() * writesPerTransactionA) + (bTransactions.getValue() * writesPerTransactionB);
m.push_back( PerfMetric( "Measured Duration", duration, true ) );
m.push_back( PerfMetric( "Transactions/sec", (aTransactions.getValue() + bTransactions.getValue()) / duration, false ) );
m.push_back( PerfMetric( "Operations/sec", ( ( reads + writes ) / duration ), false ) );
m.emplace_back("Measured Duration", duration, true);
m.emplace_back("Transactions/sec", (aTransactions.getValue() + bTransactions.getValue()) / duration, false);
m.emplace_back("Operations/sec", ( ( reads + writes ) / duration ), false);
m.push_back( aTransactions.getMetric() );
m.push_back( bTransactions.getMetric() );
m.push_back( retries.getMetric() );
m.push_back( PerfMetric( "Mean load time (seconds)", loadTime, true ) );
m.push_back( PerfMetric( "Read rows", reads, false ) );
m.push_back( PerfMetric( "Write rows", writes, false ) );
m.emplace_back("Mean load time (seconds)", loadTime, true);
m.emplace_back("Read rows", reads, false);
m.emplace_back("Write rows", writes, false);
if(!rampUpLoad) {
m.push_back(PerfMetric("Mean Latency (ms)", 1000 * latencies.mean(), true));
m.push_back(PerfMetric("Median Latency (ms, averaged)", 1000 * latencies.median(), true));
m.push_back(PerfMetric("90% Latency (ms, averaged)", 1000 * latencies.percentile(0.90), true));
m.push_back(PerfMetric("98% Latency (ms, averaged)", 1000 * latencies.percentile(0.98), true));
m.push_back(PerfMetric("Max Latency (ms, averaged)", 1000 * latencies.max(), true));
m.emplace_back("Mean Latency (ms)", 1000 * latencies.mean(), true);
m.emplace_back("Median Latency (ms, averaged)", 1000 * latencies.median(), true);
m.emplace_back("90% Latency (ms, averaged)", 1000 * latencies.percentile(0.90), true);
m.emplace_back("98% Latency (ms, averaged)", 1000 * latencies.percentile(0.98), true);
m.emplace_back("Max Latency (ms, averaged)", 1000 * latencies.max(), true);
m.push_back(PerfMetric("Mean Row Read Latency (ms)", 1000 * readLatencies.mean(), true));
m.push_back(PerfMetric("Median Row Read Latency (ms, averaged)", 1000 * readLatencies.median(), true));
m.push_back(PerfMetric("Max Row Read Latency (ms, averaged)", 1000 * readLatencies.max(), true));
m.emplace_back("Mean Row Read Latency (ms)", 1000 * readLatencies.mean(), true);
m.emplace_back("Median Row Read Latency (ms, averaged)", 1000 * readLatencies.median(), true);
m.emplace_back("Max Row Read Latency (ms, averaged)", 1000 * readLatencies.max(), true);
m.push_back(PerfMetric("Mean Total Read Latency (ms)", 1000 * fullReadLatencies.mean(), true));
m.push_back(PerfMetric("Median Total Read Latency (ms, averaged)", 1000 * fullReadLatencies.median(), true));
m.push_back(PerfMetric("Max Total Latency (ms, averaged)", 1000 * fullReadLatencies.max(), true));
m.emplace_back("Mean Total Read Latency (ms)", 1000 * fullReadLatencies.mean(), true);
m.emplace_back("Median Total Read Latency (ms, averaged)", 1000 * fullReadLatencies.median(), true);
m.emplace_back("Max Total Latency (ms, averaged)", 1000 * fullReadLatencies.max(), true);
m.push_back(PerfMetric("Mean GRV Latency (ms)", 1000 * GRVLatencies.mean(), true));
m.push_back(PerfMetric("Median GRV Latency (ms, averaged)", 1000 * GRVLatencies.median(), true));
m.push_back(PerfMetric("Max GRV Latency (ms, averaged)", 1000 * GRVLatencies.max(), true));
m.emplace_back("Mean GRV Latency (ms)", 1000 * GRVLatencies.mean(), true);
m.emplace_back("Median GRV Latency (ms, averaged)", 1000 * GRVLatencies.median(), true);
m.emplace_back("Max GRV Latency (ms, averaged)", 1000 * GRVLatencies.max(), true);
m.push_back(PerfMetric("Mean Commit Latency (ms)", 1000 * commitLatencies.mean(), true));
m.push_back(PerfMetric("Median Commit Latency (ms, averaged)", 1000 * commitLatencies.median(), true));
m.push_back(PerfMetric("Max Commit Latency (ms, averaged)", 1000 * commitLatencies.max(), true));
m.emplace_back("Mean Commit Latency (ms)", 1000 * commitLatencies.mean(), true);
m.emplace_back("Median Commit Latency (ms, averaged)", 1000 * commitLatencies.median(), true);
m.emplace_back("Max Commit Latency (ms, averaged)", 1000 * commitLatencies.max(), true);
}
m.push_back( PerfMetric( "Read rows/sec", reads / duration, false ) );
m.push_back( PerfMetric( "Write rows/sec", writes / duration, false ) );
m.push_back( PerfMetric( "Bytes read/sec", (reads * (keyBytes + (minValueBytes+maxValueBytes)*0.5)) / duration, false ) );
m.push_back( PerfMetric( "Bytes written/sec", (writes * (keyBytes + (minValueBytes+maxValueBytes)*0.5)) / duration, false ) );
m.emplace_back("Read rows/sec", reads / duration, false);
m.emplace_back("Write rows/sec", writes / duration, false);
m.emplace_back("Bytes read/sec", (reads * (keyBytes + (minValueBytes+maxValueBytes)*0.5)) / duration, false);
m.emplace_back("Bytes written/sec", (writes * (keyBytes + (minValueBytes+maxValueBytes)*0.5)) / duration, false);
m.insert(m.end(), periodicMetrics.begin(), periodicMetrics.end());
vector<pair<uint64_t, double> >::iterator ratesItr = ratesAtKeyCounts.begin();
std::vector<std::pair<uint64_t, double> >::iterator ratesItr = ratesAtKeyCounts.begin();
for(; ratesItr != ratesAtKeyCounts.end(); ratesItr++)
m.push_back(PerfMetric(format("%ld keys imported bytes/sec", ratesItr->first), ratesItr->second, false));
m.emplace_back(format("%ld keys imported bytes/sec", ratesItr->first), ratesItr->second, false);
}
Value randomValue() { return StringRef( (uint8_t*)valueString.c_str(), g_random->randomInt(minValueBytes, maxValueBytes+1) ); }
@ -336,40 +338,40 @@ struct ReadWriteWorkload : KVWorkload {
bool recordEnd = self->shouldRecord( now() );
if( recordBegin && recordEnd ) {
std::string ts = format("T=%04.0fs:", elapsed);
self->periodicMetrics.push_back( PerfMetric( ts + "Operations/sec", (ops-last_ops)/self->periodicLoggingInterval, false ) );
self->periodicMetrics.emplace_back(ts + "Operations/sec", (ops-last_ops)/self->periodicLoggingInterval, false);
//if(self->rampUpLoad) {
self->periodicMetrics.push_back(PerfMetric(ts + "Mean Latency (ms)", 1000 * self->latencies.mean(), true));
self->periodicMetrics.push_back(PerfMetric(ts + "Median Latency (ms, averaged)", 1000 * self->latencies.median(), true));
self->periodicMetrics.push_back(PerfMetric(ts + "5% Latency (ms, averaged)", 1000 * self->latencies.percentile(.05), true));
self->periodicMetrics.push_back(PerfMetric(ts + "95% Latency (ms, averaged)", 1000 * self->latencies.percentile(.95), true));
self->periodicMetrics.emplace_back(ts + "Mean Latency (ms)", 1000 * self->latencies.mean(), true);
self->periodicMetrics.emplace_back(ts + "Median Latency (ms, averaged)", 1000 * self->latencies.median(), true);
self->periodicMetrics.emplace_back(ts + "5% Latency (ms, averaged)", 1000 * self->latencies.percentile(.05), true);
self->periodicMetrics.emplace_back(ts + "95% Latency (ms, averaged)", 1000 * self->latencies.percentile(.95), true);
self->periodicMetrics.push_back(PerfMetric(ts + "Mean Row Read Latency (ms)", 1000 * self->readLatencies.mean(), true));
self->periodicMetrics.push_back(PerfMetric(ts + "Median Row Read Latency (ms, averaged)", 1000 * self->readLatencies.median(), true));
self->periodicMetrics.push_back(PerfMetric(ts + "5% Row Read Latency (ms, averaged)", 1000 * self->readLatencies.percentile(.05), true));
self->periodicMetrics.push_back(PerfMetric(ts + "95% Row Read Latency (ms, averaged)", 1000 * self->readLatencies.percentile(.95), true));
self->periodicMetrics.emplace_back(ts + "Mean Row Read Latency (ms)", 1000 * self->readLatencies.mean(), true);
self->periodicMetrics.emplace_back(ts + "Median Row Read Latency (ms, averaged)", 1000 * self->readLatencies.median(), true);
self->periodicMetrics.emplace_back(ts + "5% Row Read Latency (ms, averaged)", 1000 * self->readLatencies.percentile(.05), true);
self->periodicMetrics.emplace_back(ts + "95% Row Read Latency (ms, averaged)", 1000 * self->readLatencies.percentile(.95), true);
self->periodicMetrics.push_back(PerfMetric(ts + "Mean Total Read Latency (ms)", 1000 * self->fullReadLatencies.mean(), true));
self->periodicMetrics.push_back(PerfMetric(ts + "Median Total Read Latency (ms, averaged)", 1000 * self->fullReadLatencies.median(), true));
self->periodicMetrics.push_back(PerfMetric(ts + "5% Total Read Latency (ms, averaged)", 1000 * self->fullReadLatencies.percentile(.05), true));
self->periodicMetrics.push_back(PerfMetric(ts + "95% Total Read Latency (ms, averaged)", 1000 * self->fullReadLatencies.percentile(.95), true));
self->periodicMetrics.emplace_back(ts + "Mean Total Read Latency (ms)", 1000 * self->fullReadLatencies.mean(), true);
self->periodicMetrics.emplace_back(ts + "Median Total Read Latency (ms, averaged)", 1000 * self->fullReadLatencies.median(), true);
self->periodicMetrics.emplace_back(ts + "5% Total Read Latency (ms, averaged)", 1000 * self->fullReadLatencies.percentile(.05), true);
self->periodicMetrics.emplace_back(ts + "95% Total Read Latency (ms, averaged)", 1000 * self->fullReadLatencies.percentile(.95), true);
self->periodicMetrics.push_back(PerfMetric(ts + "Mean GRV Latency (ms)", 1000 * self->GRVLatencies.mean(), true));
self->periodicMetrics.push_back(PerfMetric(ts + "Median GRV Latency (ms, averaged)", 1000 * self->GRVLatencies.median(), true));
self->periodicMetrics.push_back(PerfMetric(ts + "5% GRV Latency (ms, averaged)", 1000 * self->GRVLatencies.percentile(.05), true));
self->periodicMetrics.push_back(PerfMetric(ts + "95% GRV Latency (ms, averaged)", 1000 * self->GRVLatencies.percentile(.95), true));
self->periodicMetrics.emplace_back(ts + "Mean GRV Latency (ms)", 1000 * self->GRVLatencies.mean(), true);
self->periodicMetrics.emplace_back(ts + "Median GRV Latency (ms, averaged)", 1000 * self->GRVLatencies.median(), true);
self->periodicMetrics.emplace_back(ts + "5% GRV Latency (ms, averaged)", 1000 * self->GRVLatencies.percentile(.05), true);
self->periodicMetrics.emplace_back(ts + "95% GRV Latency (ms, averaged)", 1000 * self->GRVLatencies.percentile(.95), true);
self->periodicMetrics.push_back(PerfMetric(ts + "Mean Commit Latency (ms)", 1000 * self->commitLatencies.mean(), true));
self->periodicMetrics.push_back(PerfMetric(ts + "Median Commit Latency (ms, averaged)", 1000 * self->commitLatencies.median(), true));
self->periodicMetrics.push_back(PerfMetric(ts + "5% Commit Latency (ms, averaged)", 1000 * self->commitLatencies.percentile(.05), true));
self->periodicMetrics.push_back(PerfMetric(ts + "95% Commit Latency (ms, averaged)", 1000 * self->commitLatencies.percentile(.95), true));
self->periodicMetrics.emplace_back(ts + "Mean Commit Latency (ms)", 1000 * self->commitLatencies.mean(), true);
self->periodicMetrics.emplace_back(ts + "Median Commit Latency (ms, averaged)", 1000 * self->commitLatencies.median(), true);
self->periodicMetrics.emplace_back(ts + "5% Commit Latency (ms, averaged)", 1000 * self->commitLatencies.percentile(.05), true);
self->periodicMetrics.emplace_back(ts + "95% Commit Latency (ms, averaged)", 1000 * self->commitLatencies.percentile(.95), true);
//}
self->periodicMetrics.push_back(PerfMetric(ts + "Max Latency (ms, averaged)", 1000 * self->latencies.max(), true));
self->periodicMetrics.push_back(PerfMetric(ts + "Max Row Read Latency (ms, averaged)", 1000 * self->readLatencies.max(), true));
self->periodicMetrics.push_back(PerfMetric(ts + "Max Total Read Latency (ms, averaged)", 1000 * self->fullReadLatencies.max(), true));
self->periodicMetrics.push_back(PerfMetric(ts + "Max GRV Latency (ms, averaged)", 1000 * self->GRVLatencies.max(), true));
self->periodicMetrics.push_back(PerfMetric(ts + "Max Commit Latency (ms, averaged)", 1000 * self->commitLatencies.max(), true));
self->periodicMetrics.emplace_back(ts + "Max Latency (ms, averaged)", 1000 * self->latencies.max(), true);
self->periodicMetrics.emplace_back(ts + "Max Row Read Latency (ms, averaged)", 1000 * self->readLatencies.max(), true);
self->periodicMetrics.emplace_back(ts + "Max Total Read Latency (ms, averaged)", 1000 * self->fullReadLatencies.max(), true);
self->periodicMetrics.emplace_back(ts + "Max GRV Latency (ms, averaged)", 1000 * self->GRVLatencies.max(), true);
self->periodicMetrics.emplace_back(ts + "Max Commit Latency (ms, averaged)", 1000 * self->commitLatencies.max(), true);
}
last_ops = ops;
@ -450,7 +452,7 @@ struct ReadWriteWorkload : KVWorkload {
return Void();
state Promise<double> loadTime;
state Promise<vector<pair<uint64_t, double> > > ratesAtKeyCounts;
state Promise<std::vector<std::pair<uint64_t, double> > > ratesAtKeyCounts;
wait( bulkSetup( cx, self, self->nodeCount, loadTime, self->insertionCountsToMeasure.empty(), self->warmingDelay, self->maxInsertRate,
self->insertionCountsToMeasure, ratesAtKeyCounts ) );
@ -481,7 +483,7 @@ struct ReadWriteWorkload : KVWorkload {
wait( delay( std::max(0.1, 1.0 - (now() - startTime) ) ) );
vector<Future<Void>> clients;
std::vector<Future<Void>> clients;
if(self->enableReadLatencyLogging)
clients.push_back(tracePeriodically(self));
@ -554,9 +556,9 @@ struct ReadWriteWorkload : KVWorkload {
state double tstart = now();
state bool aTransaction = g_random->random01() > (self->rampTransactionType ? self->sweepAlpha(startTime) : self->alpha);
state vector<int64_t> keys;
state vector<Value> values;
state vector<KeyRange> extra_ranges;
state std::vector<int64_t> keys;
state std::vector<Value> values;
state std::vector<KeyRange> extra_ranges;
int reads = aTransaction ? self->readsPerTransactionA : self->readsPerTransactionB;
state int writes = aTransaction ? self->writesPerTransactionA : self->writesPerTransactionB;
state int extra_read_conflict_ranges = writes ? self->extraReadConflictRangesPerTransaction : 0;

View File

@ -33,7 +33,7 @@ struct WriteBandwidthWorkload : KVWorkload {
double testDuration, warmingDelay, loadTime, maxInsertRate;
std::string valueString;
vector<Future<Void>> clients;
std::vector<Future<Void>> clients;
PerfIntCounter transactions, retries;
ContinuousSample<double> commitLatencies, GRVLatencies;
@ -56,29 +56,29 @@ struct WriteBandwidthWorkload : KVWorkload {
virtual Future<bool> check( Database const& cx ) { return true; }
virtual void getMetrics( vector<PerfMetric>& m ) {
virtual void getMetrics( std::vector<PerfMetric>& m ) {
double duration = testDuration;
int writes = transactions.getValue() * keysPerTransaction;
m.push_back( PerfMetric( "Measured Duration", duration, true ) );
m.push_back( PerfMetric( "Transactions/sec", transactions.getValue() / duration, false ) );
m.push_back( PerfMetric( "Operations/sec", writes / duration, false ) );
m.push_back( transactions.getMetric() );
m.push_back( retries.getMetric() );
m.push_back( PerfMetric( "Mean load time (seconds)", loadTime, true ) );
m.push_back( PerfMetric( "Write rows", writes, false ) );
m.emplace_back("Measured Duration", duration, true);
m.emplace_back("Transactions/sec", transactions.getValue() / duration, false);
m.emplace_back("Operations/sec", writes / duration, false);
m.push_back(transactions.getMetric());
m.push_back(retries.getMetric());
m.emplace_back("Mean load time (seconds)", loadTime, true);
m.emplace_back("Write rows", writes, false);
m.push_back( PerfMetric( "Mean GRV Latency (ms)", 1000 * GRVLatencies.mean(), true ) );
m.push_back( PerfMetric( "Median GRV Latency (ms, averaged)", 1000 * GRVLatencies.median(), true ) );
m.push_back( PerfMetric( "90% GRV Latency (ms, averaged)", 1000 * GRVLatencies.percentile( 0.90 ), true ) );
m.push_back( PerfMetric( "98% GRV Latency (ms, averaged)", 1000 * GRVLatencies.percentile( 0.98 ), true ) );
m.emplace_back("Mean GRV Latency (ms)", 1000 * GRVLatencies.mean(), true);
m.emplace_back("Median GRV Latency (ms, averaged)", 1000 * GRVLatencies.median(), true);
m.emplace_back("90% GRV Latency (ms, averaged)", 1000 * GRVLatencies.percentile( 0.90 ), true);
m.emplace_back("98% GRV Latency (ms, averaged)", 1000 * GRVLatencies.percentile( 0.98 ), true);
m.push_back( PerfMetric( "Mean Commit Latency (ms)", 1000 * commitLatencies.mean(), true ) );
m.push_back( PerfMetric( "Median Commit Latency (ms, averaged)", 1000 * commitLatencies.median(), true ) );
m.push_back( PerfMetric( "90% Commit Latency (ms, averaged)", 1000 * commitLatencies.percentile( 0.90 ), true ) );
m.push_back( PerfMetric( "98% Commit Latency (ms, averaged)", 1000 * commitLatencies.percentile( 0.98 ), true ) );
m.emplace_back("Mean Commit Latency (ms)", 1000 * commitLatencies.mean(), true);
m.emplace_back("Median Commit Latency (ms, averaged)", 1000 * commitLatencies.median(), true);
m.emplace_back("90% Commit Latency (ms, averaged)", 1000 * commitLatencies.percentile( 0.90 ), true);
m.emplace_back("98% Commit Latency (ms, averaged)", 1000 * commitLatencies.percentile( 0.98 ), true);
m.push_back( PerfMetric( "Write rows/sec", writes / duration, false ) );
m.push_back( PerfMetric( "Bytes written/sec", (writes * (keyBytes + (minValueBytes+maxValueBytes)*0.5)) / duration, false ) );
m.emplace_back("Write rows/sec", writes / duration, false);
m.emplace_back("Bytes written/sec", (writes * (keyBytes + (minValueBytes+maxValueBytes)*0.5)) / duration, false);
}
Value randomValue() { return StringRef( (uint8_t*)valueString.c_str(), g_random->randomInt(minValueBytes, maxValueBytes+1) ); }
@ -89,7 +89,7 @@ struct WriteBandwidthWorkload : KVWorkload {
ACTOR Future<Void> _setup( Database cx, WriteBandwidthWorkload *self ) {
state Promise<double> loadTime;
state Promise<vector<pair<uint64_t, double> > > ratesAtKeyCounts;
state Promise<std::vector<std::pair<uint64_t, double> > > ratesAtKeyCounts;
wait( bulkSetup( cx, self, self->nodeCount, loadTime, true, self->warmingDelay, self->maxInsertRate ) );
self->loadTime = loadTime.getFuture().get();