foundationdb/fdbserver/storageserver.actor.cpp

3536 lines
148 KiB
C++

/*
* storageserver.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "flow/actorcompiler.h"
#include "fdbrpc/fdbrpc.h"
#include "fdbrpc/LoadBalance.h"
#include "flow/IndexedSet.h"
#include "flow/Hash3.h"
#include "flow/ActorCollection.h"
#include "fdbclient/Atomic.h"
#include "fdbclient/KeyRangeMap.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/NativeAPI.h"
#include "fdbclient/Notified.h"
#include "fdbclient/MasterProxyInterface.h"
#include "fdbclient/DatabaseContext.h"
#include "WorkerInterface.h"
#include "TLogInterface.h"
#include "MoveKeys.h"
#include "Knobs.h"
#include "WaitFailure.h"
#include "IKeyValueStore.h"
#include "fdbclient/VersionedMap.h"
#include "StorageMetrics.h"
#include "fdbrpc/sim_validation.h"
#include "ServerDBInfo.h"
#include "fdbrpc/Smoother.h"
#include "flow/Stats.h"
#include "LogSystem.h"
#include "RecoveryState.h"
#include "LogProtocolMessage.h"
#include "flow/TDMetric.actor.h"
using std::make_pair;
#pragma region Data Structures
#define SHORT_CIRCUT_ACTUAL_STORAGE 0
struct StorageServer;
class ValueOrClearToRef {
public:
static ValueOrClearToRef value(ValueRef const& v) { return ValueOrClearToRef(v, false); }
static ValueOrClearToRef clearTo(KeyRef const& k) { return ValueOrClearToRef(k, true); }
bool isValue() const { return !isClear; };
bool isClearTo() const { return isClear; }
ValueRef const& getValue() const { ASSERT( isValue() ); return item; };
KeyRef const& getEndKey() const { ASSERT(isClearTo()); return item; };
private:
ValueOrClearToRef( StringRef item, bool isClear ) : item(item), isClear(isClear) {}
StringRef item;
bool isClear;
};
struct AddingShard : NonCopyable {
KeyRange keys;
Future<Void> fetchClient; // holds FetchKeys() actor
Promise<Void> fetchComplete;
Promise<Void> readWrite;
std::deque< Standalone<VerUpdateRef> > updates; // during the Fetching phase, mutations with key in keys and version>=(fetchClient's) fetchVersion;
struct StorageServer* server;
Version transferredVersion;
enum Phase { WaitPrevious, Fetching, Waiting };
Phase phase;
AddingShard( StorageServer* server, KeyRangeRef const& keys );
// When fetchKeys "partially completes" (splits an adding shard in two), this is used to construct the left half
AddingShard( AddingShard* prev, KeyRange const& keys )
: keys(keys), fetchClient(prev->fetchClient), server(prev->server), transferredVersion(prev->transferredVersion), phase(prev->phase)
{
}
~AddingShard() {
if( !fetchComplete.isSet() )
fetchComplete.send(Void());
if( !readWrite.isSet() )
readWrite.send(Void());
}
void addMutation( Version version, MutationRef const& mutation );
bool isTransferred() const { return phase == Waiting; }
};
struct ShardInfo : ReferenceCounted<ShardInfo>, NonCopyable {
AddingShard* adding;
struct StorageServer* readWrite;
KeyRange keys;
uint64_t changeCounter;
ShardInfo(KeyRange keys, AddingShard* adding, StorageServer* readWrite)
: adding(adding), readWrite(readWrite), keys(keys)
{
}
~ShardInfo() {
delete adding;
}
static ShardInfo* newNotAssigned(KeyRange keys) { return new ShardInfo(keys, NULL, NULL); }
static ShardInfo* newReadWrite(KeyRange keys, StorageServer* data) { return new ShardInfo(keys, NULL, data); }
static ShardInfo* newAdding(StorageServer* data, KeyRange keys) { return new ShardInfo(keys, new AddingShard(data, keys), NULL); }
static ShardInfo* addingSplitLeft( KeyRange keys, AddingShard* oldShard) { return new ShardInfo(keys, new AddingShard(oldShard, keys), NULL); }
bool isReadable() const { return readWrite!=NULL; }
bool notAssigned() const { return !readWrite && !adding; }
bool assigned() const { return readWrite || adding; }
bool isInVersionedData() const { return readWrite || (adding && adding->isTransferred()); }
void addMutation( Version version, MutationRef const& mutation );
bool isFetched() const { return readWrite || ( adding && adding->fetchComplete.isSet() ); }
const char* debugDescribeState() const {
if (notAssigned()) return "NotAssigned";
else if (adding && !adding->isTransferred()) return "AddingFetching";
else if (adding) return "AddingTransferred";
else return "ReadWrite";
}
};
struct StorageServerDisk {
explicit StorageServerDisk( struct StorageServer* data, IKeyValueStore* storage ) : data(data), storage(storage) {}
void makeNewStorageServerDurable();
bool makeVersionMutationsDurable( Version& prevStorageVersion, Version newStorageVersion, int64_t& bytesLeft );
void makeVersionDurable( Version version );
Future<bool> restoreDurableState();
void changeLogProtocol(Version version, uint64_t protocol);
void writeMutation( MutationRef mutation );
void writeKeyValue( KeyValueRef kv );
void clearRange( KeyRangeRef keys );
Future<Void> commit() { return storage->commit(); }
// SOMEDAY: Put readNextKeyInclusive in IKeyValueStore
Future<Key> readNextKeyInclusive( KeyRef key ) { return readFirstKey(storage, KeyRangeRef(key, allKeys.end)); }
Future<Optional<Value>> readValue( KeyRef key, Optional<UID> debugID = Optional<UID>() ) { return storage->readValue(key, debugID); }
Future<Optional<Value>> readValuePrefix( KeyRef key, int maxLength, Optional<UID> debugID = Optional<UID>() ) { return storage->readValuePrefix(key, maxLength, debugID); }
Future<Standalone<VectorRef<KeyValueRef>>> readRange( KeyRangeRef keys, int rowLimit = 1<<30, int byteLimit = 1<<30 ) { return storage->readRange(keys, rowLimit, byteLimit); }
KeyValueStoreType getKeyValueStoreType() { return storage->getType(); }
StorageBytes getStorageBytes() { return storage->getStorageBytes(); }
private:
struct StorageServer* data;
IKeyValueStore* storage;
void writeMutations( MutationListRef mutations, Version debugVersion, const char* debugContext );
ACTOR static Future<Key> readFirstKey( IKeyValueStore* storage, KeyRangeRef range ) {
Standalone<VectorRef<KeyValueRef>> r = wait( storage->readRange( range, 1 ) );
if (r.size()) return r[0].key;
else return range.end;
}
};
struct UpdateEagerReadInfo {
vector<KeyRef> keyBegin;
vector<Key> keyEnd; // these are for ClearRange
vector<pair<KeyRef, int>> keys;
vector<Optional<Value>> value;
void addMutations( VectorRef<MutationRef> const& mutations ) {
for(auto& m : mutations)
addMutation(m);
}
void addMutation( MutationRef const& m ) {
// SOMEDAY: Theoretically we can avoid a read if there is an earlier overlapping ClearRange
if (m.type == MutationRef::ClearRange && !m.param2.startsWith(systemKeys.end))
keyBegin.push_back( m.param2 );
else if ((m.type == MutationRef::AppendIfFits) || (m.type == MutationRef::ByteMin) || (m.type == MutationRef::ByteMax))
keys.push_back(pair<KeyRef, int>(m.param1, CLIENT_KNOBS->VALUE_SIZE_LIMIT));
else if (isAtomicOp((MutationRef::Type) m.type))
keys.push_back(pair<KeyRef, int>(m.param1, m.param2.size()));
}
void finishKeyBegin() {
std::sort(keyBegin.begin(), keyBegin.end());
keyBegin.resize( std::unique(keyBegin.begin(), keyBegin.end()) - keyBegin.begin() );
std::sort(keys.begin(), keys.end(), [](const pair<KeyRef, int>& lhs, const pair<KeyRef, int>& rhs) { return (lhs.first < rhs.first) || (lhs.first == rhs.first && lhs.second > rhs.second); } );
keys.resize(std::unique(keys.begin(), keys.end(), [](const pair<KeyRef, int>& lhs, const pair<KeyRef, int>& rhs) { return lhs.first == rhs.first; } ) - keys.begin());
//value gets populated in doEagerReads
}
Optional<Value>& getValue(KeyRef key) {
int i = std::lower_bound(keys.begin(), keys.end(), pair<KeyRef, int>(key, 0), [](const pair<KeyRef, int>& lhs, const pair<KeyRef, int>& rhs) { return lhs.first < rhs.first; } ) - keys.begin();
ASSERT( i < keys.size() && keys[i].first == key );
return value[i];
}
KeyRef getKeyEnd( KeyRef key ) {
int i = std::lower_bound(keyBegin.begin(), keyBegin.end(), key) - keyBegin.begin();
ASSERT( i < keyBegin.size() && keyBegin[i] == key );
return keyEnd[i];
}
};
const int VERSION_OVERHEAD = 64 + sizeof(Version) + sizeof(Standalone<VersionUpdateRef>) + //mutationLog, 64b overhead for map
2 * (64 + sizeof(Version) + sizeof(Reference<VersionedMap<KeyRef, ValueOrClearToRef>::PTreeT>)); //versioned map [ x2 for createNewVersion(version+1) ], 64b overhead for map
static int mvccStorageBytes( MutationRef const& m ) { return VersionedMap<KeyRef, ValueOrClearToRef>::overheadPerItem * 2 + (MutationRef::OVERHEAD_BYTES + m.param1.size() + m.param2.size()) * 2; }
struct FetchInjectionInfo {
Arena arena;
vector<VerUpdateRef> changes;
};
struct StorageServer {
typedef VersionedMap<KeyRef, ValueOrClearToRef> VersionedData;
private:
// versionedData contains sets and clears.
// * Nonoverlapping: No clear overlaps a set or another clear, or adjoins another clear.
// ~ Clears are maximal: If versionedData.at(v) contains a clear [b,e) then
// there is a key data[e]@v, or e==allKeys.end, or a shard boundary or former boundary at e
// * Reads are possible: When k is in a readable shard, for any v in [storageVersion, version.get()],
// storage[k] + versionedData.at(v)[k] = database[k] @ v (storage[k] might be @ any version in [durableVersion, storageVersion])
// * Transferred shards are partially readable: When k is in an adding, transferred shard, for any v in [transferredVersion, version.get()],
// storage[k] + versionedData.at(v)[k] = database[k] @ v
// * versionedData contains versions [storageVersion(), version.get()]. It might also contain version (version.get()+1), in which changeDurableVersion may be deleting ghosts, and/or it might
// contain later versions if applyUpdate is on the stack.
// * Old shards are erased: versionedData.atLatest() has entries (sets or intersecting clears) only for keys in readable or adding,transferred shards.
// Earlier versions may have extra entries for shards that *were* readable or adding,transferred when those versions were the latest, but they eventually are forgotten.
// * Old mutations are erased: All items in versionedData.atLatest() have insertVersion() > durableVersion(), but views
// at older versions may contain older items which are also in storage (this is OK because of idempotency)
VersionedData versionedData;
std::map<Version, Standalone<VersionUpdateRef>> mutationLog; // versions (durableVersion, version]
public:
Tag tag;
vector<pair<Version,Tag>> history;
vector<pair<Version,Tag>> allHistory;
Version poppedAllAfter;
std::map<Version, Arena> freeable; // for each version, an Arena that must be held until that version is < oldestVersion
Arena lastArena;
std::map<Version, Standalone<VersionUpdateRef>> const & getMutationLog() { return mutationLog; }
std::map<Version, Standalone<VersionUpdateRef>>& getMutableMutationLog() { return mutationLog; }
VersionedData const& data() const { return versionedData; }
VersionedData& mutableData() { return versionedData; }
void addMutationToMutationLogOrStorage( Version ver, MutationRef m ); // Appends m to mutationLog@ver, or to storage if ver==invalidVersion
// Update the byteSample, and write the updates to the mutation log@ver, or to storage if ver==invalidVersion
void byteSampleApplyMutation( MutationRef const& m, Version ver );
void byteSampleApplySet( KeyValueRef kv, Version ver );
void byteSampleApplyClear( KeyRangeRef range, Version ver );
void popVersion(Version v, bool popAllTags = false) {
if(logSystem) {
if(v > poppedAllAfter) {
popAllTags = true;
poppedAllAfter = std::numeric_limits<Version>::max();
}
vector<pair<Version,Tag>>* hist = &history;
vector<pair<Version,Tag>> allHistoryCopy;
if(popAllTags) {
allHistoryCopy = allHistory;
hist = &allHistoryCopy;
}
while(hist->size() && v > hist->back().first ) {
logSystem->pop( v, hist->back().second );
hist->pop_back();
}
if(hist->size()) {
logSystem->pop( v, hist->back().second );
} else {
logSystem->pop( v, tag );
}
}
}
Standalone<VersionUpdateRef>& addVersionToMutationLog(Version v) {
// return existing version...
auto m = mutationLog.find(v);
if (m != mutationLog.end())
return m->second;
// ...or create a new one
auto& u = mutationLog[v];
u.version = v;
if (lastArena.getSize() >= 65536) lastArena = Arena(4096);
u.arena() = lastArena;
counters.bytesInput += VERSION_OVERHEAD;
return u;
}
MutationRef addMutationToMutationLog(Standalone<VersionUpdateRef> &mLV, MutationRef const& m){
byteSampleApplyMutation(m, mLV.version);
counters.bytesInput += mvccStorageBytes(m);
return mLV.mutations.push_back_deep( mLV.arena(), m );
}
StorageServerDisk storage;
KeyRangeMap< Reference<ShardInfo> > shards;
uint64_t shardChangeCounter; // max( shards->changecounter )
// newestAvailableVersion[k]
// == invalidVersion -> k is unavailable at all versions
// <= storageVersion -> k is unavailable at all versions (but might be read anyway from storage if we are in the process of committing makeShardDurable)
// == v -> k is readable (from storage+versionedData) @ [storageVersion,v], and not being updated when version increases
// == latestVersion -> k is readable (from storage+versionedData) @ [storageVersion,version.get()], and thus stays available when version increases
CoalescedKeyRangeMap< Version > newestAvailableVersion;
CoalescedKeyRangeMap< Version > newestDirtyVersion; // Similar to newestAvailableVersion, but includes (only) keys that were only partly available (due to cancelled fetchKeys)
// The following are in rough order from newest to oldest
Version lastTLogVersion, lastVersionWithData, restoredVersion;
NotifiedVersion version;
NotifiedVersion desiredOldestVersion; // We can increase oldestVersion (and then durableVersion) to this version when the disk permits
NotifiedVersion oldestVersion; // See also storageVersion()
NotifiedVersion durableVersion; // At least this version will be readable from storage after a power failure
Deque<std::pair<Version,Version>> recoveryVersionSkips;
int64_t versionLag; // An estimate for how many versions it takes for the data to move from the logs to this storage server
uint64_t logProtocol;
Reference<ILogSystem> logSystem;
Reference<ILogSystem::IPeekCursor> logCursor;
UID thisServerID;
Key sk;
Reference<AsyncVar<ServerDBInfo>> db;
Database cx;
StorageServerMetrics metrics;
CoalescedKeyRangeMap<bool, int64_t, KeyBytesMetric<int64_t>> byteSampleClears;
AsyncVar<bool> byteSampleClearsTooLarge;
Future<Void> byteSampleRecovery;
AsyncMap<Key,bool> watches;
int64_t watchBytes;
int64_t numWatches;
AsyncVar<bool> noRecentUpdates;
double lastUpdate;
Int64MetricHandle readQueueSizeMetric;
std::string folder;
// defined only during splitMutations()/addMutation()
UpdateEagerReadInfo *updateEagerReads;
FlowLock durableVersionLock;
FlowLock fetchKeysParallelismLock;
vector< Promise<FetchInjectionInfo*> > readyFetchKeys;
int64_t instanceID;
Promise<Void> otherError;
Promise<Void> coreStarted;
bool shuttingDown;
bool behind;
bool debug_inApplyUpdate;
double debug_lastValidateTime;
int maxQueryQueue;
int getAndResetMaxQueryQueueSize() {
int val = maxQueryQueue;
maxQueryQueue = 0;
return val;
}
struct Counters {
CounterCollection cc;
Counter allQueries, getKeyQueries, getValueQueries, getRangeQueries, finishedQueries, rowsQueried, bytesQueried, watchQueries;
Counter bytesInput, bytesDurable, bytesFetched,
mutationBytes; // Like bytesInput but without MVCC accounting
Counter mutations, setMutations, clearRangeMutations, atomicMutations;
Counter updateBatches, updateVersions;
Counter loops;
Counter fetchWaitingMS, fetchWaitingCount, fetchExecutingMS, fetchExecutingCount;
Counters(StorageServer* self)
: cc("StorageServer", self->thisServerID.toString()),
getKeyQueries("GetKeyQueries", cc),
getValueQueries("GetValueQueries",cc),
getRangeQueries("GetRangeQueries", cc),
allQueries("QueryQueue", cc),
finishedQueries("FinishedQueries", cc),
rowsQueried("RowsQueried", cc),
bytesQueried("BytesQueried", cc),
watchQueries("WatchQueries", cc),
bytesInput("BytesInput", cc),
bytesDurable("BytesDurable", cc),
bytesFetched("BytesFetched", cc),
mutationBytes("MutationBytes", cc),
mutations("Mutations", cc),
setMutations("SetMutations", cc),
clearRangeMutations("ClearRangeMutations", cc),
atomicMutations("AtomicMutations", cc),
updateBatches("UpdateBatches", cc),
updateVersions("UpdateVersions", cc),
loops("Loops", cc),
fetchWaitingMS("FetchWaitingMS", cc),
fetchWaitingCount("FetchWaitingCount", cc),
fetchExecutingMS("FetchExecutingMS", cc),
fetchExecutingCount("FetchExecutingCount", cc)
{
specialCounter(cc, "LastTLogVersion", [self](){ return self->lastTLogVersion; });
specialCounter(cc, "Version", [self](){ return self->version.get(); });
specialCounter(cc, "StorageVersion", [self](){ return self->storageVersion(); });
specialCounter(cc, "DurableVersion", [self](){ return self->durableVersion.get(); });
specialCounter(cc, "DesiredOldestVersion", [self](){ return self->desiredOldestVersion.get(); });
specialCounter(cc, "VersionLag", [self](){ return self->versionLag; });
specialCounter(cc, "FetchKeysFetchActive", [self](){ return self->fetchKeysParallelismLock.activePermits(); });
specialCounter(cc, "FetchKeysWaiting", [self](){ return self->fetchKeysParallelismLock.waiters(); });
specialCounter(cc, "QueryQueueMax", [self](){ return self->getAndResetMaxQueryQueueSize(); });
specialCounter(cc, "BytesStored", [self](){ return self->metrics.byteSample.getEstimate(allKeys); });
specialCounter(cc, "ActiveWatches", [self](){ return self->numWatches; });
specialCounter(cc, "WatchBytes", [self](){ return self->watchBytes; });
specialCounter(cc, "KvstoreBytesUsed", [self](){ return self->storage.getStorageBytes().used; });
specialCounter(cc, "KvstoreBytesFree", [self](){ return self->storage.getStorageBytes().free; });
specialCounter(cc, "KvstoreBytesAvailable", [self](){ return self->storage.getStorageBytes().available; });
specialCounter(cc, "KvstoreBytesTotal", [self](){ return self->storage.getStorageBytes().total; });
}
} counters;
StorageServer(IKeyValueStore* storage, Reference<AsyncVar<ServerDBInfo>> const& db, StorageServerInterface const& ssi)
: instanceID(g_random->randomUniqueID().first()),
storage(this, storage), db(db),
lastTLogVersion(0), lastVersionWithData(0), restoredVersion(0),
versionLag(0),
updateEagerReads(0),
shardChangeCounter(0),
fetchKeysParallelismLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM_BYTES),
shuttingDown(false), debug_inApplyUpdate(false), debug_lastValidateTime(0), watchBytes(0), numWatches(0),
logProtocol(0), counters(this), tag(invalidTag), maxQueryQueue(0), thisServerID(ssi.id()),
readQueueSizeMetric(LiteralStringRef("StorageServer.ReadQueueSize")),
behind(false), byteSampleClears(false, LiteralStringRef("\xff\xff\xff")), noRecentUpdates(false),
lastUpdate(now()), poppedAllAfter(std::numeric_limits<Version>::max())
{
version.initMetric(LiteralStringRef("StorageServer.Version"), counters.cc.id);
oldestVersion.initMetric(LiteralStringRef("StorageServer.OldestVersion"), counters.cc.id);
durableVersion.initMetric(LiteralStringRef("StorageServer.DurableVersion"), counters.cc.id);
desiredOldestVersion.initMetric(LiteralStringRef("StorageServer.DesiredOldestVersion"), counters.cc.id);
newestAvailableVersion.insert(allKeys, invalidVersion);
newestDirtyVersion.insert(allKeys, invalidVersion);
addShard( ShardInfo::newNotAssigned( allKeys ) );
cx = openDBOnServer(db, TaskDefaultEndpoint, true, true);
}
//~StorageServer() { fclose(log); }
// Puts the given shard into shards. The caller is responsible for adding shards
// for all ranges in shards.getAffectedRangesAfterInsertion(newShard->keys)), because these
// shards are invalidated by the call.
void addShard( ShardInfo* newShard ) {
ASSERT( !newShard->keys.empty() );
newShard->changeCounter = ++shardChangeCounter;
//TraceEvent("AddShard", this->thisServerID).detail("KeyBegin", printable(newShard->keys.begin)).detail("KeyEnd", printable(newShard->keys.end)).detail("State", newShard->isReadable() ? "Readable" : newShard->notAssigned() ? "NotAssigned" : "Adding").detail("Version", this->version.get());
/*auto affected = shards.getAffectedRangesAfterInsertion( newShard->keys, Reference<ShardInfo>() );
for(auto i = affected.begin(); i != affected.end(); ++i)
shards.insert( *i, Reference<ShardInfo>() );*/
shards.insert( newShard->keys, Reference<ShardInfo>(newShard) );
}
void addMutation(Version version, MutationRef const& mutation, KeyRangeRef const& shard, UpdateEagerReadInfo* eagerReads );
void setInitialVersion(Version ver) {
version = ver;
desiredOldestVersion = ver;
oldestVersion = ver;
durableVersion = ver;
lastVersionWithData = ver;
restoredVersion = ver;
mutableData().createNewVersion(ver);
mutableData().forgetVersionsBefore(ver);
}
// This is the maximum version that might be read from storage (the minimum version is durableVersion)
Version storageVersion() const { return oldestVersion.get(); }
bool isReadable( KeyRangeRef const& keys ) {
auto sh = shards.intersectingRanges(keys);
for(auto i = sh.begin(); i != sh.end(); ++i)
if (!i->value()->isReadable())
return false;
return true;
}
void checkChangeCounter( uint64_t oldShardChangeCounter, KeyRef const& key ) {
if (oldShardChangeCounter != shardChangeCounter &&
shards[key]->changeCounter > oldShardChangeCounter)
{
TEST(true); // shard change during getValueQ
throw wrong_shard_server();
}
}
void checkChangeCounter( uint64_t oldShardChangeCounter, KeyRangeRef const& keys ) {
if (oldShardChangeCounter != shardChangeCounter) {
auto sh = shards.intersectingRanges(keys);
for(auto i = sh.begin(); i != sh.end(); ++i)
if (i->value()->changeCounter > oldShardChangeCounter) {
TEST(true); // shard change during range operation
throw wrong_shard_server();
}
}
}
Counter::Value queueSize() {
return counters.bytesInput.getValue() - counters.bytesDurable.getValue();
}
double getPenalty() {
return std::max(1.0, (queueSize() - (SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER - 2*SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER)) / SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER);
}
};
// If and only if key:=value is in (storage+versionedData), // NOT ACTUALLY: and key < allKeys.end,
// and H(key) < |key+value|/bytesPerSample,
// let sampledSize = max(|key+value|,bytesPerSample)
// persistByteSampleKeys.begin()+key := sampledSize is in storage
// (key,sampledSize) is in byteSample
// So P(key is sampled) * sampledSize == |key+value|
void StorageServer::byteSampleApplyMutation( MutationRef const& m, Version ver ){
if (m.type == MutationRef::ClearRange)
byteSampleApplyClear( KeyRangeRef(m.param1, m.param2), ver );
else if (m.type == MutationRef::SetValue)
byteSampleApplySet( KeyValueRef(m.param1, m.param2), ver );
else
ASSERT(false); // Mutation of unknown type modfying byte sample
}
#pragma endregion
/////////////////////////////////// Validation ///////////////////////////////////////
#pragma region Validation
bool validateRange( StorageServer::VersionedData::ViewAtVersion const& view, KeyRangeRef range, Version version, UID id, Version minInsertVersion ) {
// * Nonoverlapping: No clear overlaps a set or another clear, or adjoins another clear.
// * Old mutations are erased: All items in versionedData.atLatest() have insertVersion() > durableVersion()
TraceEvent("ValidateRange", id).detail("KeyBegin", printable(range.begin)).detail("KeyEnd", printable(range.end)).detail("Version", version);
KeyRef k;
bool ok = true;
bool kIsClear = false;
auto i = view.lower_bound(range.begin);
if (i != view.begin()) --i;
for(; i != view.end() && i.key() < range.end; ++i) {
ASSERT( i.insertVersion() > minInsertVersion );
if (kIsClear && i->isClearTo() ? i.key() <= k : i.key() < k) {
TraceEvent(SevError,"InvalidRange",id).detail("Key1", printable(k)).detail("Key2", printable(i.key())).detail("Version", version);
ok = false;
}
//ASSERT( i.key() >= k );
kIsClear = i->isClearTo();
k = kIsClear ? i->getEndKey() : i.key();
}
return ok;
}
void validate(StorageServer* data, bool force = false) {
try {
if (force || (EXPENSIVE_VALIDATION)) {
data->newestAvailableVersion.validateCoalesced();
data->newestDirtyVersion.validateCoalesced();
for(auto s = data->shards.ranges().begin(); s != data->shards.ranges().end(); ++s) {
ASSERT( s->value()->keys == s->range() );
ASSERT( !s->value()->keys.empty() );
}
for(auto s = data->shards.ranges().begin(); s != data->shards.ranges().end(); ++s)
if (s->value()->isReadable()) {
auto ar = data->newestAvailableVersion.intersectingRanges(s->range());
for(auto a = ar.begin(); a != ar.end(); ++a)
ASSERT( a->value() == latestVersion );
}
// * versionedData contains versions [storageVersion(), version.get()]. It might also contain version (version.get()+1), in which changeDurableVersion may be deleting ghosts, and/or it might
// contain later versions if applyUpdate is on the stack.
ASSERT( data->data().getOldestVersion() == data->storageVersion() );
ASSERT( data->data().getLatestVersion() == data->version.get() || data->data().getLatestVersion() == data->version.get()+1 || (data->debug_inApplyUpdate && data->data().getLatestVersion() > data->version.get()) );
auto latest = data->data().atLatest();
// * Old shards are erased: versionedData.atLatest() has entries (sets or clear *begins*) only for keys in readable or adding,transferred shards.
for(auto s = data->shards.ranges().begin(); s != data->shards.ranges().end(); ++s) {
ShardInfo* shard = s->value().getPtr();
if (!shard->isInVersionedData()) {
if (latest.lower_bound(s->begin()) != latest.lower_bound(s->end())) {
TraceEvent(SevError, "VF", data->thisServerID).detail("LastValidTime", data->debug_lastValidateTime).detail("KeyBegin", printable(s->begin())).detail("KeyEnd", printable(s->end()))
.detail("FirstKey", printable(latest.lower_bound(s->begin()).key())).detail("FirstInsertV", latest.lower_bound(s->begin()).insertVersion());
}
ASSERT( latest.lower_bound(s->begin()) == latest.lower_bound(s->end()) );
}
}
latest.validate();
validateRange(latest, allKeys, data->version.get(), data->thisServerID, data->durableVersion.get());
data->debug_lastValidateTime = now();
}
} catch (...) {
TraceEvent(SevError, "ValidationFailure", data->thisServerID).detail("LastValidTime", data->debug_lastValidateTime);
throw;
}
}
#pragma endregion
///////////////////////////////////// Queries /////////////////////////////////
#pragma region Queries
ACTOR Future<Version> waitForVersion( StorageServer* data, Version version ) {
// This could become an Actor transparently, but for now it just does the lookup
if (version == latestVersion)
version = std::max(Version(1), data->version.get());
if (version < data->oldestVersion.get() || version <= 0) throw transaction_too_old();
else if (version <= data->version.get())
return version;
if(data->behind && version > data->version.get()) {
throw process_behind();
}
if(g_random->random01() < 0.001)
TraceEvent("WaitForVersion1000x");
choose {
when ( Void _ = wait( data->version.whenAtLeast(version) ) ) {
//FIXME: A bunch of these can block with or without the following delay 0.
//Void _ = wait( delay(0) ); // don't do a whole bunch of these at once
if (version < data->oldestVersion.get()) throw transaction_too_old(); // just in case
return version;
}
when ( Void _ = wait( delay( SERVER_KNOBS->FUTURE_VERSION_DELAY ) ) ) {
if(g_random->random01() < 0.001)
TraceEvent(SevWarn, "ShardServerFutureVersion1000x", data->thisServerID)
.detail("Version", version)
.detail("MyVersion", data->version.get())
.detail("ServerID", data->thisServerID);
throw future_version();
}
}
}
ACTOR Future<Version> waitForVersionNoTooOld( StorageServer* data, Version version ) {
// This could become an Actor transparently, but for now it just does the lookup
if (version == latestVersion)
version = std::max(Version(1), data->version.get());
if (version <= data->version.get())
return version;
choose {
when ( Void _ = wait( data->version.whenAtLeast(version) ) ) {
return version;
}
when ( Void _ = wait( delay( SERVER_KNOBS->FUTURE_VERSION_DELAY ) ) ) {
if(g_random->random01() < 0.001)
TraceEvent(SevWarn, "ShardServerFutureVersion1000x", data->thisServerID)
.detail("Version", version)
.detail("MyVersion", data->version.get())
.detail("ServerID", data->thisServerID);
throw future_version();
}
}
}
ACTOR Future<Void> getValueQ( StorageServer* data, GetValueRequest req ) {
state double startTime = timer();
try {
// Active load balancing runs at a very high priority (to obtain accurate queue lengths)
// so we need to downgrade here
++data->counters.getValueQueries;
++data->counters.allQueries;
++data->readQueueSizeMetric;
data->maxQueryQueue = std::max<int>( data->maxQueryQueue, data->counters.allQueries.getValue() - data->counters.finishedQueries.getValue());
Void _ = wait( delay(0, TaskDefaultEndpoint) );
if( req.debugID.present() )
g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "getValueQ.DoRead"); //.detail("TaskID", g_network->getCurrentTask());
state Optional<Value> v;
state Version version = wait( waitForVersion( data, req.version ) );
if( req.debugID.present() )
g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "getValueQ.AfterVersion"); //.detail("TaskID", g_network->getCurrentTask());
state uint64_t changeCounter = data->shardChangeCounter;
if (!data->shards[req.key]->isReadable()) {
//TraceEvent("WrongShardServer", data->thisServerID).detail("Key", printable(req.key)).detail("Version", version).detail("In", "getValueQ");
throw wrong_shard_server();
}
state int path = 0;
auto i = data->data().at(version).lastLessOrEqual(req.key);
if (i && i->isValue() && i.key() == req.key) {
v = (Value)i->getValue();
path = 1;
} else if (!i || !i->isClearTo() || i->getEndKey() <= req.key) {
path = 2;
Optional<Value> vv = wait( data->storage.readValue( req.key, req.debugID ) );
// Validate that while we were reading the data we didn't lose the version or shard
if (version < data->storageVersion()) {
TEST(true); // transaction_too_old after readValue
throw transaction_too_old();
}
data->checkChangeCounter(changeCounter, req.key);
v = vv;
}
debugMutation("ShardGetValue", version, MutationRef(MutationRef::DebugKey, req.key, v.present()?v.get():LiteralStringRef("<null>")));
debugMutation("ShardGetPath", version, MutationRef(MutationRef::DebugKey, req.key, path==0?LiteralStringRef("0"):path==1?LiteralStringRef("1"):LiteralStringRef("2")));
/*
StorageMetrics m;
m.bytesPerKSecond = req.key.size() + (v.present() ? v.get().size() : 0);
m.iosPerKSecond = 1;
data->metrics.notify(req.key, m);
*/
if (v.present()) {
++data->counters.rowsQueried;
data->counters.bytesQueried += v.get().size();
}
if( req.debugID.present() )
g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "getValueQ.AfterRead"); //.detail("TaskID", g_network->getCurrentTask());
GetValueReply reply(v);
reply.penalty = data->getPenalty();
req.reply.send(reply);
} catch (Error& e) {
if (e.code() == error_code_internal_error || e.code() == error_code_actor_cancelled) throw;
req.reply.sendError(e);
}
++data->counters.finishedQueries;
--data->readQueueSizeMetric;
return Void();
};
ACTOR Future<Void> watchValue_impl( StorageServer* data, WatchValueRequest req ) {
try {
++data->counters.watchQueries;
if( req.debugID.present() )
g_traceBatch.addEvent("WatchValueDebug", req.debugID.get().first(), "watchValueQ.Before"); //.detail("TaskID", g_network->getCurrentTask());
Version version = wait( waitForVersionNoTooOld( data, req.version ) );
if( req.debugID.present() )
g_traceBatch.addEvent("WatchValueDebug", req.debugID.get().first(), "watchValueQ.AfterVersion"); //.detail("TaskID", g_network->getCurrentTask());
loop {
try {
state Version latest = data->data().latestVersion;
state Future<Void> watchFuture = data->watches.onChange(req.key);
GetValueRequest getReq( req.key, latest, req.debugID );
state Future<Void> getValue = getValueQ( data, getReq ); //we are relying on the delay zero at the top of getValueQ, if removed we need one here
GetValueReply reply = wait( getReq.reply.getFuture() );
//TraceEvent("WatcherCheckValue").detail("Key", printable( req.key ) ).detail("Value", printable( req.value ) ).detail("CurrentValue", printable( v ) ).detail("Ver", latest);
debugMutation("ShardWatchValue", latest, MutationRef(MutationRef::DebugKey, req.key, reply.value.present() ? StringRef( reply.value.get() ) : LiteralStringRef("<null>") ) );
if( req.debugID.present() )
g_traceBatch.addEvent("WatchValueDebug", req.debugID.get().first(), "watchValueQ.AfterRead"); //.detail("TaskID", g_network->getCurrentTask());
if( reply.value != req.value ) {
req.reply.send( latest );
return Void();
}
if( data->watchBytes > SERVER_KNOBS->MAX_STORAGE_SERVER_WATCH_BYTES ) {
TEST(true); //Too many watches, reverting to polling
req.reply.sendError( watch_cancelled() );
return Void();
}
++data->numWatches;
data->watchBytes += ( req.key.expectedSize() + req.value.expectedSize() + 1000 );
try {
Void _ = wait( watchFuture );
--data->numWatches;
data->watchBytes -= ( req.key.expectedSize() + req.value.expectedSize() + 1000 );
} catch( Error &e ) {
--data->numWatches;
data->watchBytes -= ( req.key.expectedSize() + req.value.expectedSize() + 1000 );
throw;
}
} catch( Error &e ) {
if( e.code() != error_code_transaction_too_old )
throw;
}
}
} catch (Error& e) {
if( e.code() == error_code_internal_error || e.code() == error_code_actor_cancelled ) throw;
req.reply.sendError(e);
}
return Void();
}
ACTOR Future<Void> watchValueQ( StorageServer* data, WatchValueRequest req ) {
state Future<Void> watch = watchValue_impl( data, req );
state double startTime = now();
loop {
double timeoutDelay = -1;
if(data->noRecentUpdates.get()) {
timeoutDelay = std::max(CLIENT_KNOBS->FAST_WATCH_TIMEOUT - (now() - startTime), 0.0);
} else if(!BUGGIFY) {
timeoutDelay = std::max(CLIENT_KNOBS->WATCH_TIMEOUT - (now() - startTime), 0.0);
}
choose {
when( Void _ = wait( watch ) ) {
return Void();
}
when( Void _ = wait( timeoutDelay < 0 ? Never() : delay(timeoutDelay) ) ) {
req.reply.sendError( timed_out() );
return Void();
}
when( Void _ = wait( data->noRecentUpdates.onChange()) ) {}
}
}
}
ACTOR Future<Void> getShardState_impl( StorageServer* data, GetShardStateRequest req ) {
ASSERT( req.mode != GetShardStateRequest::NO_WAIT );
loop {
std::vector<Future<Void>> onChange;
for( auto t : data->shards.intersectingRanges( req.keys ) ) {
if( !t.value()->assigned() ) {
onChange.push_back( delay( SERVER_KNOBS->SHARD_READY_DELAY ) );
break;
}
if( req.mode == GetShardStateRequest::READABLE && !t.value()->isReadable() )
onChange.push_back( t.value()->adding->readWrite.getFuture() );
if( req.mode == GetShardStateRequest::FETCHING && !t.value()->isFetched() )
onChange.push_back( t.value()->adding->fetchComplete.getFuture() );
}
if( !onChange.size() ) {
req.reply.send(std::make_pair(data->version.get(), data->durableVersion.get()));
return Void();
}
Void _ = wait( waitForAll( onChange ) );
Void _ = wait( delay(0) ); //onChange could have been triggered by cancellation, let things settle before rechecking
}
}
ACTOR Future<Void> getShardStateQ( StorageServer* data, GetShardStateRequest req ) {
choose {
when( Void _ = wait( getShardState_impl( data, req ) ) ) {}
when( Void _ = wait( delay( g_network->isSimulated() ? 10 : 60 ) ) ) {
req.reply.sendError( timed_out() );
}
}
return Void();
}
void merge( Arena& arena, VectorRef<KeyValueRef>& output, VectorRef<KeyValueRef> const& base,
StorageServer::VersionedData::iterator& start, StorageServer::VersionedData::iterator const& end,
int versionedDataCount, int limit, bool stopAtEndOfBase, int limitBytes = 1<<30 )
// Combines data from base (at an older version) with sets from newer versions in [start, end) and appends the first (up to) |limit| rows to output
// If limit<0, base and output are in descending order, and start->key()>end->key(), but start is still inclusive and end is exclusive
{
if (limit==0) return;
int originalLimit = abs(limit) + output.size();
bool forward = limit>0;
if (!forward) limit = -limit;
int accumulatedBytes = 0;
KeyValueRef const* baseStart = base.begin();
KeyValueRef const* baseEnd = base.end();
while (baseStart!=baseEnd && start!=end && --limit>=0 && accumulatedBytes < limitBytes) {
if (forward ? baseStart->key < start.key() : baseStart->key > start.key())
output.push_back_deep( arena, *baseStart++ );
else {
output.push_back_deep( arena, KeyValueRef(start.key(), start->getValue()) );
if (baseStart->key == start.key()) ++baseStart;
if (forward) ++start; else --start;
}
accumulatedBytes += sizeof(KeyValueRef) + output.end()[-1].expectedSize();
}
while (baseStart!=baseEnd && --limit>=0 && accumulatedBytes < limitBytes) {
output.push_back_deep( arena, *baseStart++ );
accumulatedBytes += sizeof(KeyValueRef) + output.end()[-1].expectedSize();
}
if( !stopAtEndOfBase ) {
while (start!=end && --limit>=0 && accumulatedBytes < limitBytes) {
output.push_back_deep( arena, KeyValueRef(start.key(), start->getValue()) );
accumulatedBytes += sizeof(KeyValueRef) + output.end()[-1].expectedSize();
if (forward) ++start; else --start;
}
}
ASSERT( output.size() <= originalLimit );
}
// readRange reads up to |limit| rows from the given range and version, combining data->storage and data->versionedData.
// If limit>=0, it returns the first rows in the range (sorted ascending), otherwise the last rows (sorted descending).
// readRange has O(|result|) + O(log |data|) cost
ACTOR Future<GetKeyValuesReply> readRange( StorageServer* data, Version version, KeyRange range, int limit, int* pLimitBytes ) {
state GetKeyValuesReply result;
state StorageServer::VersionedData::ViewAtVersion view = data->data().at(version);
state StorageServer::VersionedData::iterator vStart = view.end();
state StorageServer::VersionedData::iterator vEnd = view.end();
state KeyRef readBegin;
state KeyRef readEnd;
state Key readBeginTemp;
state int vCount;
//state UID rrid = g_random->randomUniqueID();
//state int originalLimit = limit;
//state int originalLimitBytes = *pLimitBytes;
//state bool track = rrid.first() == 0x1bc134c2f752187cLL;
// FIXME: Review pLimitBytes behavior
// if (limit >= 0) we are reading forward, else backward
if (limit >= 0) {
// We might care about a clear beginning before start that
// runs into range
vStart = view.lastLessOrEqual(range.begin);
if (vStart && vStart->isClearTo() && vStart->getEndKey() > range.begin)
readBegin = vStart->getEndKey();
else
readBegin = range.begin;
vStart = view.lower_bound(readBegin);
/*if (track) {
printf("readRange(%llx, @%lld, '%s'-'%s')\n", data->thisServerID.first(), version, printable(range.begin).c_str(), printable(range.end).c_str());
printf("mvcc:\n");
vEnd = view.upper_bound(range.end);
for(auto r=vStart; r != vEnd; ++r) {
if (r->isClearTo())
printf(" '%s'-'%s' cleared\n", printable(r.key()).c_str(), printable(r->getEndKey()).c_str());
else
printf(" '%s' := '%s'\n", printable(r.key()).c_str(), printable(r->getValue()).c_str());
}
}*/
while (limit>0 && *pLimitBytes>0 && readBegin < range.end) {
// ASSERT( vStart == view.lower_bound(readBegin) );
ASSERT( !vStart || vStart.key() >= readBegin );
if (vStart) { auto b = vStart; --b; ASSERT( !b || b.key() < readBegin ); }
ASSERT( data->storageVersion() <= version );
// Read up to limit items from the view, stopping at the next clear (or the end of the range)
vEnd = vStart;
vCount = 0;
int vSize = 0;
while (vEnd && vEnd.key() < range.end && !vEnd->isClearTo() && vCount < limit && vSize < *pLimitBytes){
vSize += sizeof(KeyValueRef) + vEnd->getValue().expectedSize() + vEnd.key().expectedSize();
++vCount;
++vEnd;
}
// Read the data on disk up to vEnd (or the end of the range)
readEnd = vEnd ? std::min( vEnd.key(), range.end ) : range.end;
Standalone<VectorRef<KeyValueRef>> atStorageVersion = wait(
data->storage.readRange( KeyRangeRef(readBegin, readEnd), limit, *pLimitBytes ) );
/*if (track) {
printf("read [%s,%s): %d rows\n", printable(readBegin).c_str(), printable(readEnd).c_str(), atStorageVersion.size());
for(auto r=atStorageVersion.begin(); r != atStorageVersion.end(); ++r)
printf(" '%s' := '%s'\n", printable(r->key).c_str(), printable(r->value).c_str());
}*/
ASSERT( atStorageVersion.size() <= limit );
if (data->storageVersion() > version) throw transaction_too_old();
bool more = atStorageVersion.size()!=0;
// merge the sets in [vStart,vEnd) with the sets on disk, stopping at the last key from disk if there is 'more'
int prevSize = result.data.size();
merge( result.arena, result.data, atStorageVersion, vStart, vEnd, vCount, limit, more, *pLimitBytes );
limit -= result.data.size() - prevSize;
for (auto i = &result.data[prevSize]; i != result.data.end(); i++)
*pLimitBytes -= sizeof(KeyValueRef) + i->expectedSize();
// Setup for the next iteration
if (more) { // if there might be more data, begin reading right after what we already found to find out
//if (track) printf("more\n");
if (!(limit<=0 || *pLimitBytes<=0 || result.data.end()[-1].key == atStorageVersion.end()[-1].key))
TraceEvent(SevError, "ReadRangeIssue", data->thisServerID).detail("ReadBegin", printable(readBegin)).detail("ReadEnd", printable(readEnd))
.detail("VStart", vStart ? printable(vStart.key()) : "nil").detail("VEnd", vEnd ? printable(vEnd.key()) : "nil")
.detail("AtStorageVersionBack", printable(atStorageVersion.end()[-1].key)).detail("ResultBack", printable(result.data.end()[-1].key))
.detail("Limit", limit).detail("LimitBytes", *pLimitBytes).detail("ResultSize", result.data.size()).detail("PrevSize", prevSize);
readBegin = readBeginTemp = keyAfter( result.data.end()[-1].key );
ASSERT( limit<=0 || *pLimitBytes<=0 || result.data.end()[-1].key == atStorageVersion.end()[-1].key );
} else if (vStart && vStart->isClearTo()){ // if vStart is a clear, skip it.
//if (track) printf("skip clear\n");
readBegin = vStart->getEndKey(); // next disk read should start at the end of the clear
++vStart;
} else { // Otherwise, continue at readEnd
//if (track) printf("continue\n");
readBegin = readEnd;
}
}
// all but the last item are less than *pLimitBytes
ASSERT( result.data.size() == 0 || *pLimitBytes + result.data.end()[-1].expectedSize() + sizeof(KeyValueRef) > 0 );
/*if (*pLimitBytes <= 0)
TraceEvent(SevWarn, "ReadRangeLimitExceeded")
.detail("Version", version)
.detail("Begin", printable(range.begin) )
.detail("End", printable(range.end) )
.detail("LimitReamin", limit)
.detail("LimitBytesRemain", *pLimitBytes); */
/*GetKeyValuesReply correct = wait( readRangeOld(data, version, range, originalLimit, originalLimitBytes) );
bool prefix_equal = true;
int totalsize = 0;
int first_difference = -1;
for(int i=0; i<result.data.size() && i<correct.data.size(); i++) {
if (result.data[i] != correct.data[i]) {
first_difference = i;
prefix_equal = false;
break;
}
totalsize += result.data[i].expectedSize() + sizeof(KeyValueRef);
}
// for the following check
result.more = limit == 0 || *pLimitBytes<=0; // FIXME: Does this have to be exact?
result.version = version;
if ( !(totalsize>originalLimitBytes ? prefix_equal : result.data==correct.data) || correct.more != result.more ) {
TraceEvent(SevError, "IncorrectResult", rrid).detail("Server", data->thisServerID).detail("CorrectRows", correct.data.size())
.detail("FirstDifference", first_difference).detail("OriginalLimit", originalLimit)
.detail("ResultRows", result.data.size()).detail("Result0", printable(result.data[0].key)).detail("Correct0", printable(correct.data[0].key))
.detail("ResultN", result.data.size() ? printable(result.data[std::min(correct.data.size(),result.data.size())-1].key) : "nil")
.detail("CorrectN", correct.data.size() ? printable(correct.data[std::min(correct.data.size(),result.data.size())-1].key) : "nil");
}*/
} else {
// Reverse read - abandon hope alle ye who enter here
readEnd = range.end;
vStart = view.lastLess(readEnd);
// A clear might extend all the way to range.end
if (vStart && vStart->isClearTo() && vStart->getEndKey() >= readEnd) {
readEnd = vStart.key();
--vStart;
}
while (limit < 0 && *pLimitBytes > 0 && readEnd > range.begin) {
vEnd = vStart;
vCount = 0;
int vSize=0;
while (vEnd && vEnd.key() >= range.begin && !vEnd->isClearTo() && vCount < -limit && vSize < *pLimitBytes){
vSize += sizeof(KeyValueRef) + vEnd->getValue().expectedSize() + vEnd.key().expectedSize();
++vCount;
--vEnd;
}
readBegin = range.begin;
if (vEnd)
readBegin = std::max( readBegin, vEnd->isClearTo() ? vEnd->getEndKey() : vEnd.key() );
Standalone<VectorRef<KeyValueRef>> atStorageVersion = wait( data->storage.readRange( KeyRangeRef(readBegin, readEnd), limit ) );
if (data->storageVersion() > version) throw transaction_too_old();
int prevSize = result.data.size();
merge( result.arena, result.data, atStorageVersion, vStart, vEnd, vCount, limit, false, *pLimitBytes );
limit += result.data.size() - prevSize;
for (auto i = &result.data[prevSize]; i != result.data.end(); i++)
*pLimitBytes -= sizeof(KeyValueRef) + i->expectedSize();
vStart = vEnd;
readEnd = readBegin;
if (vStart && vStart->isClearTo()) {
ASSERT( vStart.key() < readEnd );
readEnd = vStart.key();
--vStart;
}
}
}
result.more = limit == 0 || *pLimitBytes<=0; // FIXME: Does this have to be exact?
result.version = version;
return result;
}
bool selectorInRange( KeySelectorRef const& sel, KeyRangeRef const& range ) {
// Returns true if the given range suffices to at least begin to resolve the given KeySelectorRef
return sel.getKey() >= range.begin && (sel.isBackward() ? sel.getKey() <= range.end : sel.getKey() < range.end);
}
ACTOR Future<Key> findKey( StorageServer* data, KeySelectorRef sel, Version version, KeyRange range, int* pOffset)
// Attempts to find the key indicated by sel in the data at version, within range.
// Precondition: selectorInRange(sel, range)
// If it is found, offset is set to 0 and a key is returned which falls inside range.
// If the search would depend on any key outside range OR if the key selector offset is too large (range read returns too many bytes), it returns either
// a negative offset and a key in [range.begin, sel.getKey()], indicating the key is (the first key <= returned key) + offset, or
// a positive offset and a key in (sel.getKey(), range.end], indicating the key is (the first key >= returned key) + offset-1
// The range passed in to this function should specify a shard. If range.begin is repeatedly not the beginning of a shard, then it is possible to get stuck looping here
{
ASSERT( version != latestVersion );
ASSERT( selectorInRange(sel, range) && version >= data->oldestVersion.get() );
// Count forward or backward distance items, skipping the first one if it == key and skipEqualKey
state bool forward = sel.offset > 0; // If forward, result >= sel.getKey(); else result <= sel.getKey()
state int sign = forward ? +1 : -1;
state bool skipEqualKey = sel.orEqual == forward;
state int distance = forward ? sel.offset : 1-sel.offset;
//Don't limit the number of bytes if this is a trivial key selector (there will be at most two items returned from the read range in this case)
state int maxBytes;
if (sel.offset <= 1 && sel.offset >= 0)
maxBytes = std::numeric_limits<int>::max();
else
maxBytes = BUGGIFY ? SERVER_KNOBS->BUGGIFY_LIMIT_BYTES : SERVER_KNOBS->STORAGE_LIMIT_BYTES;
state GetKeyValuesReply rep = wait( readRange( data, version, forward ? KeyRangeRef(sel.getKey(), range.end) : KeyRangeRef(range.begin, keyAfter(sel.getKey())), (distance + skipEqualKey)*sign, &maxBytes ) );
state bool more = rep.more && rep.data.size() != distance + skipEqualKey;
//If we get only one result in the reverse direction as a result of the data being too large, we could get stuck in a loop
if(more && !forward && rep.data.size() == 1) {
TEST(true); //Reverse key selector returned only one result in range read
maxBytes = std::numeric_limits<int>::max();
GetKeyValuesReply rep2 = wait( readRange( data, version, KeyRangeRef(range.begin, keyAfter(sel.getKey())), -2, &maxBytes ) );
rep = rep2;
more = rep.more && rep.data.size() != distance + skipEqualKey;
ASSERT(rep.data.size() == 2 || !more);
}
int index = distance-1;
if (skipEqualKey && rep.data.size() && rep.data[0].key == sel.getKey() )
++index;
if (index < rep.data.size()) {
*pOffset = 0;
return rep.data[ index ].key;
} else {
// FIXME: If range.begin=="" && !forward, return success?
*pOffset = index - rep.data.size() + 1;
if (!forward) *pOffset = -*pOffset;
if (more) {
TEST(true); // Key selector read range had more results
ASSERT(rep.data.size());
Key returnKey = forward ? keyAfter(rep.data.back().key) : rep.data.back().key;
//This is possible if key/value pairs are very large and only one result is returned on a last less than query
//SOMEDAY: graceful handling of exceptionally sized values
ASSERT(returnKey != sel.getKey());
return returnKey;
}
else
return forward ? range.end : range.begin;
}
}
KeyRange getShardKeyRange( StorageServer* data, const KeySelectorRef& sel )
// Returns largest range such that the shard state isReadable and selectorInRange(sel, range) or wrong_shard_server if no such range exists
{
auto i = sel.isBackward() ? data->shards.rangeContainingKeyBefore( sel.getKey() ) : data->shards.rangeContaining( sel.getKey() );
if (!i->value()->isReadable()) throw wrong_shard_server();
ASSERT( selectorInRange(sel, i->range()) );
return i->range();
}
ACTOR Future<Void> getKeyValues( StorageServer* data, GetKeyValuesRequest req )
// Throws a wrong_shard_server if the keys in the request or result depend on data outside this server OR if a large selector offset prevents
// all data from being read in one range read
{
++data->counters.getRangeQueries;
++data->counters.allQueries;
++data->readQueueSizeMetric;
data->maxQueryQueue = std::max<int>( data->maxQueryQueue, data->counters.allQueries.getValue() - data->counters.finishedQueries.getValue());
// Active load balancing runs at a very high priority (to obtain accurate queue lengths)
// so we need to downgrade here
Void _ = wait( delay(0, TaskDefaultEndpoint) );
try {
if( req.debugID.present() )
g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValues.Before");
state Version version = wait( waitForVersion( data, req.version ) );
state uint64_t changeCounter = data->shardChangeCounter;
// try {
state KeyRange shard = getShardKeyRange( data, req.begin );
if( req.debugID.present() )
g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValues.AfterVersion");
//.detail("ShardBegin", printable(shard.begin)).detail("ShardEnd", printable(shard.end));
//} catch (Error& e) { TraceEvent("WrongShardServer", data->thisServerID).detail("Begin", req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("Shard", "None").detail("In", "getKeyValues>getShardKeyRange"); throw e; }
if ( !selectorInRange(req.end, shard) && !(req.end.isFirstGreaterOrEqual() && req.end.getKey() == shard.end) ) {
// TraceEvent("WrongShardServer1", data->thisServerID).detail("Begin", req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("ShardBegin", printable(shard.begin)).detail("ShardEnd", printable(shard.end)).detail("In", "getKeyValues>checkShardExtents");
throw wrong_shard_server();
}
state int offset1;
state int offset2;
state Future<Key> fBegin = req.begin.isFirstGreaterOrEqual() ? Future<Key>(req.begin.getKey()) : findKey( data, req.begin, version, shard, &offset1 );
state Future<Key> fEnd = req.end.isFirstGreaterOrEqual() ? Future<Key>(req.end.getKey()) : findKey( data, req.end, version, shard, &offset2 );
state Key begin = wait(fBegin);
state Key end = wait(fEnd);
if( req.debugID.present() )
g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValues.AfterKeys");
//.detail("Off1",offset1).detail("Off2",offset2).detail("ReqBegin",printable(req.begin.getKey())).detail("ReqEnd",printable(req.end.getKey()));
// Offsets of zero indicate begin/end keys in this shard, which obviously means we can answer the query
// An end offset of 1 is also OK because the end key is exclusive, so if the first key of the next shard is the end the last actual key returned must be from this shard.
// A begin offset of 1 is also OK because then either begin is past end or equal to end (so the result is definitely empty)
if ((offset1 && offset1!=1) || (offset2 && offset2!=1)) {
TEST(true); // wrong_shard_server due to offset
// We could detect when offset1 takes us off the beginning of the database or offset2 takes us off the end, and return a clipped range rather
// than an error (since that is what the NativeAPI.getRange will do anyway via its "slow path"), but we would have to add some flags to the response
// to encode whether we went off the beginning and the end, since it needs that information.
//TraceEvent("WrongShardServer2", data->thisServerID).detail("Begin", req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("ShardBegin", printable(shard.begin)).detail("ShardEnd", printable(shard.end)).detail("In", "getKeyValues>checkOffsets").detail("BeginKey", printable(begin)).detail("EndKey", printable(end)).detail("BeginOffset", offset1).detail("EndOffset", offset2);
throw wrong_shard_server();
}
if (begin >= end) {
if( req.debugID.present() )
g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValues.Send");
//.detail("Begin",printable(begin)).detail("End",printable(end));
GetKeyValuesReply none;
none.version = version;
none.more = false;
none.penalty = data->getPenalty();
data->checkChangeCounter( changeCounter, KeyRangeRef( std::min<KeyRef>(req.begin.getKey(), req.end.getKey()), std::max<KeyRef>(req.begin.getKey(), req.end.getKey()) ) );
req.reply.send( none );
} else {
state int remainingLimitBytes = req.limitBytes;
GetKeyValuesReply _r = wait( readRange(data, version, KeyRangeRef(begin, end), req.limit, &remainingLimitBytes) );
GetKeyValuesReply r = _r;
if( req.debugID.present() )
g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValues.AfterReadRange");
//.detail("Begin",printable(begin)).detail("End",printable(end)).detail("SizeOf",r.data.size());
data->checkChangeCounter( changeCounter, KeyRangeRef( std::min<KeyRef>(begin, std::min<KeyRef>(req.begin.getKey(), req.end.getKey())), std::max<KeyRef>(end, std::max<KeyRef>(req.begin.getKey(), req.end.getKey())) ) );
if (EXPENSIVE_VALIDATION) {
for (int i = 0; i < r.data.size(); i++)
ASSERT(r.data[i].key >= begin && r.data[i].key < end);
ASSERT(r.data.size() <= std::abs(req.limit));
}
/*for( int i = 0; i < r.data.size(); i++ ) {
StorageMetrics m;
m.bytesPerKSecond = r.data[i].expectedSize();
m.iosPerKSecond = 1; //FIXME: this should be 1/r.data.size(), but we cannot do that because it is an int
data->metrics.notify(r.data[i].key, m);
}*/
r.penalty = data->getPenalty();
req.reply.send( r );
data->counters.rowsQueried += r.data.size();
data->counters.bytesQueried += req.limitBytes - remainingLimitBytes;
}
} catch (Error& e) {
if (e.code() == error_code_internal_error || e.code() == error_code_actor_cancelled) throw;
req.reply.sendError(e);
}
++data->counters.finishedQueries;
--data->readQueueSizeMetric;
return Void();
}
ACTOR Future<Void> getKey( StorageServer* data, GetKeyRequest req ) {
++data->counters.getKeyQueries;
++data->counters.allQueries;
++data->readQueueSizeMetric;
data->maxQueryQueue = std::max<int>( data->maxQueryQueue, data->counters.allQueries.getValue() - data->counters.finishedQueries.getValue());
// Active load balancing runs at a very high priority (to obtain accurate queue lengths)
// so we need to downgrade here
Void _ = wait( delay(0, TaskDefaultEndpoint) );
try {
state Version version = wait( waitForVersion( data, req.version ) );
state uint64_t changeCounter = data->shardChangeCounter;
state KeyRange shard = getShardKeyRange( data, req.sel );
state int offset;
Key k = wait( findKey( data, req.sel, version, shard, &offset ) );
data->checkChangeCounter( changeCounter, KeyRangeRef( std::min<KeyRef>(req.sel.getKey(), k), std::max<KeyRef>(req.sel.getKey(), k) ) );
KeySelector updated;
if (offset < 0)
updated = firstGreaterOrEqual(k)+offset; // first thing on this shard OR (large offset case) smallest key retrieved in range read
else if (offset > 0)
updated = firstGreaterOrEqual(k)+offset-1; // first thing on next shard OR (large offset case) keyAfter largest key retrieved in range read
else
updated = KeySelectorRef(k,true,0); //found
++data->counters.rowsQueried;
data->counters.bytesQueried += k.size();
GetKeyReply reply(updated);
reply.penalty = data->getPenalty();
req.reply.send(reply);
}
catch (Error& e) {
//if (e.code() == error_code_wrong_shard_server) TraceEvent("WrongShardServer").detail("In","getKey");
if (e.code() == error_code_internal_error || e.code() == error_code_actor_cancelled) throw;
req.reply.sendError(e);
}
++data->counters.finishedQueries;
--data->readQueueSizeMetric;
return Void();
}
void getQueuingMetrics( StorageServer* self, StorageQueuingMetricsRequest const& req ) {
StorageQueuingMetricsReply reply;
reply.localTime = now();
reply.instanceID = self->instanceID;
reply.bytesInput = self->counters.bytesInput.getValue();
reply.bytesDurable = self->counters.bytesDurable.getValue();
reply.storageBytes = self->storage.getStorageBytes();
reply.v = self->version.get();
req.reply.send( reply );
}
#pragma endregion
/////////////////////////// Updates ////////////////////////////////
#pragma region Updates
ACTOR Future<Void> doEagerReads( StorageServer* data, UpdateEagerReadInfo* eager ) {
eager->finishKeyBegin();
vector<Future<Key>> keyEnd( eager->keyBegin.size() );
for(int i=0; i<keyEnd.size(); i++)
keyEnd[i] = data->storage.readNextKeyInclusive( eager->keyBegin[i] );
state Future<vector<Key>> futureKeyEnds = getAll(keyEnd);
vector<Future<Optional<Value>>> value( eager->keys.size() );
for(int i=0; i<value.size(); i++)
value[i] = data->storage.readValuePrefix( eager->keys[i].first, eager->keys[i].second );
state Future<vector<Optional<Value>>> futureValues = getAll(value);
state vector<Key> keyEndVal = wait( futureKeyEnds );
vector<Optional<Value>> optionalValues = wait ( futureValues);
eager->keyEnd = keyEndVal;
eager->value = optionalValues;
return Void();
}
void singleEagerReadFromVector( UpdateEagerReadInfo& eager, KeyRef const& key, VectorRef<KeyValueRef> data ) {
eager.keyBegin.clear(); eager.keyEnd.clear();
eager.keyBegin.push_back( key );
auto e = std::lower_bound( data.begin(), data.end(), key, KeyValueRef::OrderByKey() );
eager.keyEnd.push_back( e != data.end() ? e->key : allKeys.end );
}
bool changeDurableVersion( StorageServer* data, Version desiredDurableVersion ) {
// Remove entries from the latest version of data->versionedData that haven't changed since they were inserted
// before or at desiredDurableVersion, to maintain the invariants for versionedData.
// Such entries remain in older versions of versionedData until they are forgotten, because it is expensive to dig them out.
// We also remove everything up to and including newDurableVersion from mutationLog, and everything
// up to but excluding desiredDurableVersion from freeable
// May return false if only part of the work has been done, in which case the caller must call again with the same parameters
auto& verData = data->mutableData();
ASSERT( verData.getLatestVersion() == data->version.get() || verData.getLatestVersion() == data->version.get()+1 );
Version nextDurableVersion = desiredDurableVersion;
auto mlv = data->getMutationLog().begin();
if (mlv != data->getMutationLog().end() && mlv->second.version <= desiredDurableVersion) {
auto& v = mlv->second;
nextDurableVersion = v.version;
data->freeable[ data->version.get() ].dependsOn( v.arena() );
if (verData.getLatestVersion() <= data->version.get())
verData.createNewVersion( data->version.get()+1 );
int64_t bytesDurable = VERSION_OVERHEAD;
for(auto m = v.mutations.begin(); m; ++m) {
bytesDurable += mvccStorageBytes(*m);
auto i = verData.atLatest().find(m->param1);
if (i) {
ASSERT( i.key() == m->param1 );
ASSERT( i.insertVersion() >= nextDurableVersion );
if (i.insertVersion() == nextDurableVersion)
verData.erase(i);
}
if (m->type == MutationRef::SetValue) {
// A set can split a clear, so there might be another entry immediately after this one that should also be cleaned up
i = verData.atLatest().upper_bound(m->param1);
if (i) {
ASSERT( i.insertVersion() >= nextDurableVersion );
if (i.insertVersion() == nextDurableVersion)
verData.erase(i);
}
}
}
data->counters.bytesDurable += bytesDurable;
}
if (EXPENSIVE_VALIDATION) {
// Check that the above loop did its job
auto view = data->data().atLatest();
for(auto i = view.begin(); i != view.end(); ++i)
ASSERT( i.insertVersion() > nextDurableVersion );
}
data->getMutableMutationLog().erase(data->getMutationLog().begin(), data->getMutationLog().upper_bound(nextDurableVersion));
data->freeable.erase( data->freeable.begin(), data->freeable.lower_bound(nextDurableVersion) );
Future<Void> checkFatalError = data->otherError.getFuture();
data->durableVersion.set( nextDurableVersion );
if (checkFatalError.isReady()) checkFatalError.get();
//TraceEvent("ForgotVersionsBefore", data->thisServerID).detail("Version", nextDurableVersion);
validate(data);
return nextDurableVersion == desiredDurableVersion;
}
Optional<MutationRef> clipMutation( MutationRef const& m, KeyRangeRef range ) {
if (isSingleKeyMutation((MutationRef::Type) m.type)) {
if (range.contains(m.param1)) return m;
}
else if (m.type == MutationRef::ClearRange) {
KeyRangeRef i = range & KeyRangeRef(m.param1, m.param2);
if (!i.empty())
return MutationRef( (MutationRef::Type)m.type, i.begin, i.end );
}
else
ASSERT(false);
return Optional<MutationRef>();
}
bool expandMutation( MutationRef& m, StorageServer::VersionedData const& data, UpdateEagerReadInfo* eager, KeyRef eagerTrustedEnd, Arena& ar ) {
// After this function call, m should be copied into an arena immediately (before modifying data, shards, or eager)
if (m.type == MutationRef::ClearRange) {
// Expand the clear
const auto& d = data.atLatest();
// Up through 2.0.3, when a clear of a single key is
// translated to a single key range, if the key exceeded the
// key size limit the begin and end of the range will be
// equal. We need to be able to recover from a database that
// was left in this state as long as we're supporting upgrades
// from 2.0.3 or earlier.
if ( (m.param1.size() == CLIENT_KNOBS->KEY_SIZE_LIMIT + 1) && (m.param2 == m.param1) ) {
return false;
}
ASSERT( m.param2 > m.param1 );
// If another clear overlaps the beginning of this one, engulf it
auto i = d.lastLess(m.param1);
if (i && i->isClearTo() && i->getEndKey() >= m.param1)
m.param1 = i.key();
// If another clear overlaps the end of this one, engulf it; otherwise expand
i = d.lastLessOrEqual(m.param2);
if (i && i->isClearTo() && i->getEndKey() >= m.param2) {
m.param2 = i->getEndKey();
} else {
// Expand to the next set or clear (from storage or latestVersion), and if it
// is a clear, engulf it as well
i = d.lower_bound(m.param2);
KeyRef endKeyAtStorageVersion = m.param2 == eagerTrustedEnd ? eagerTrustedEnd : std::min( eager->getKeyEnd( m.param2 ), eagerTrustedEnd );
if (!i || endKeyAtStorageVersion < i.key())
m.param2 = endKeyAtStorageVersion;
else if (i->isClearTo())
m.param2 = i->getEndKey();
else
m.param2 = i.key();
}
ASSERT( m.param2 > m.param1 );
}
else if (m.type != MutationRef::SetValue && (m.type)) {
Optional<StringRef> oldVal;
auto it = data.atLatest().lastLessOrEqual(m.param1);
if (it != data.atLatest().end() && it->isValue() && it.key() == m.param1)
oldVal = it->getValue();
else if (it != data.atLatest().end() && it->isClearTo() && it->getEndKey() > m.param1) {
TEST(true); // Atomic op right after a clear.
}
else {
Optional<Value>& oldThing = eager->getValue(m.param1);
if (oldThing.present())
oldVal = oldThing.get();
}
switch(m.type) {
case MutationRef::AddValue:
m.param2 = doLittleEndianAdd(oldVal, m.param2, ar);
break;
case MutationRef::And:
m.param2 = doAnd(oldVal, m.param2, ar);
break;
case MutationRef::Or:
m.param2 = doOr(oldVal, m.param2, ar);
break;
case MutationRef::Xor:
m.param2 = doXor(oldVal, m.param2, ar);
break;
case MutationRef::AppendIfFits:
m.param2 = doAppendIfFits(oldVal, m.param2, ar);
break;
case MutationRef::Max:
m.param2 = doMax(oldVal, m.param2, ar);
break;
case MutationRef::Min:
m.param2 = doMin(oldVal, m.param2, ar);
break;
case MutationRef::ByteMin:
m.param2 = doByteMin(oldVal, m.param2, ar);
break;
case MutationRef::ByteMax:
m.param2 = doByteMax(oldVal, m.param2, ar);
break;
case MutationRef::MinV2:
m.param2 = doMinV2(oldVal, m.param2, ar);
break;
case MutationRef::AndV2:
m.param2 = doAndV2(oldVal, m.param2, ar);
break;
}
m.type = MutationRef::SetValue;
}
return true;
}
bool isClearContaining( StorageServer::VersionedData::ViewAtVersion const& view, KeyRef key ) {
auto i = view.lastLessOrEqual(key);
return i && i->isClearTo() && i->getEndKey() > key;
}
void applyMutation( StorageServer *self, MutationRef const& m, Arena& arena, StorageServer::VersionedData &data ) {
// m is expected to be in arena already
// Clear split keys are added to arena
StorageMetrics metrics;
metrics.bytesPerKSecond = mvccStorageBytes( m ) / 2;
metrics.iosPerKSecond = 1;
self->metrics.notify(m.param1, metrics);
if (m.type == MutationRef::SetValue) {
auto prev = data.atLatest().lastLessOrEqual(m.param1);
if (prev && prev->isClearTo() && prev->getEndKey() > m.param1) {
ASSERT( prev.key() <= m.param1 );
KeyRef end = prev->getEndKey();
// the insert version of the previous clear is preserved for the "left half", because in changeDurableVersion() the previous clear is still responsible for removing it
// insert() invalidates prev, so prev.key() is not safe to pass to it by reference
data.insert( KeyRef(prev.key()), ValueOrClearToRef::clearTo( m.param1 ), prev.insertVersion() ); // overwritten by below insert if empty
KeyRef nextKey = keyAfter(m.param1, arena);
if ( end != nextKey ) {
ASSERT( end > nextKey );
// the insert version of the "right half" is not preserved, because in changeDurableVersion() this set is responsible for removing it
// FIXME: This copy is technically an asymptotic problem, definitely a waste of memory (copy of keyAfter is a waste, but not asymptotic)
data.insert( nextKey, ValueOrClearToRef::clearTo( KeyRef(arena, end) ) );
}
}
data.insert( m.param1, ValueOrClearToRef::value(m.param2) );
self->watches.trigger( m.param1 );
} else if (m.type == MutationRef::ClearRange) {
data.erase( m.param1, m.param2 );
ASSERT( m.param2 > m.param1 );
ASSERT( !isClearContaining( data.atLatest(), m.param1 ) );
data.insert( m.param1, ValueOrClearToRef::clearTo(m.param2) );
self->watches.triggerRange( m.param1, m.param2 );
}
}
void removeDataRange( StorageServer *ss, Standalone<VersionUpdateRef> &mLV, KeyRangeMap<Reference<ShardInfo>>& shards, KeyRangeRef range ) {
// modify the latest version of data to remove all sets and trim all clears to exclude range.
// Add a clear to mLV (mutationLog[data.getLatestVersion()]) that ensures all keys in range are removed from the disk when this latest version becomes durable
// mLV is also modified if necessary to ensure that split clears can be forgotten
MutationRef clearRange( MutationRef::ClearRange, range.begin, range.end );
clearRange = ss->addMutationToMutationLog( mLV, clearRange );
auto& data = ss->mutableData();
// Expand the range to the right to include other shards not in versionedData
for( auto r = shards.rangeContaining(range.end); r != shards.ranges().end() && !r->value()->isInVersionedData(); ++r )
range = KeyRangeRef(range.begin, r->end());
auto endClear = data.atLatest().lastLess( range.end );
if (endClear && endClear->isClearTo() && endClear->getEndKey() > range.end ) {
// This clear has been bumped up to insertVersion==data.getLatestVersion and needs a corresponding mutation log entry to forget
MutationRef m( MutationRef::ClearRange, range.end, endClear->getEndKey() );
m = ss->addMutationToMutationLog( mLV, m );
data.insert( m.param1, ValueOrClearToRef::clearTo( m.param2 ) );
}
auto beginClear = data.atLatest().lastLess( range.begin );
if (beginClear && beginClear->isClearTo() && beginClear->getEndKey() > range.begin ) {
// We don't need any special mutationLog entry - because the begin key and insert version are unchanged the original clear
// mutation works to forget this one - but we need range.begin in the right arena
KeyRef rb( mLV.arena(), range.begin );
// insert() invalidates beginClear, so beginClear.key() is not safe to pass to it by reference
data.insert( KeyRef(beginClear.key()), ValueOrClearToRef::clearTo( rb ), beginClear.insertVersion() );
}
data.erase( range.begin, range.end );
}
void setAvailableStatus( StorageServer* self, KeyRangeRef keys, bool available );
void setAssignedStatus( StorageServer* self, KeyRangeRef keys, bool nowAssigned );
void coalesceShards(StorageServer *data, KeyRangeRef keys) {
auto shardRanges = data->shards.intersectingRanges(keys);
auto fullRange = data->shards.ranges();
auto iter = shardRanges.begin();
if( iter != fullRange.begin() ) --iter;
auto iterEnd = shardRanges.end();
if( iterEnd != fullRange.end() ) ++iterEnd;
bool lastReadable = false;
bool lastNotAssigned = false;
KeyRangeMap<Reference<ShardInfo>>::Iterator lastRange;
for( ; iter != iterEnd; ++iter) {
if( lastReadable && iter->value()->isReadable() ) {
KeyRange range = KeyRangeRef( lastRange->begin(), iter->end() );
data->addShard( ShardInfo::newReadWrite( range, data) );
iter = data->shards.rangeContaining(range.begin);
} else if( lastNotAssigned && iter->value()->notAssigned() ) {
KeyRange range = KeyRangeRef( lastRange->begin(), iter->end() );
data->addShard( ShardInfo::newNotAssigned( range) );
iter = data->shards.rangeContaining(range.begin);
}
lastReadable = iter->value()->isReadable();
lastNotAssigned = iter->value()->notAssigned();
lastRange = iter;
}
}
ACTOR Future<Standalone<RangeResultRef>> tryGetRange( Database cx, Version version, KeyRangeRef keys, GetRangeLimits limits, bool* isTooOld ) {
state Transaction tr( cx );
state Standalone<RangeResultRef> output;
state KeySelectorRef begin = firstGreaterOrEqual( keys.begin );
state KeySelectorRef end = firstGreaterOrEqual( keys.end );
if( *isTooOld )
throw transaction_too_old();
tr.setVersion( version );
limits.minRows = 0;
try {
loop {
Standalone<RangeResultRef> rep = wait( tr.getRange( begin, end, limits, true ) );
limits.decrement( rep );
if( limits.isReached() || !rep.more ) {
if( output.size() ) {
output.arena().dependsOn( rep.arena() );
output.append( output.arena(), rep.begin(), rep.size() );
if( limits.isReached() && rep.readThrough.present() )
output.readThrough = rep.readThrough.get();
} else {
output = rep;
}
output.more = limits.isReached();
return output;
} else if( rep.readThrough.present() ) {
output.arena().dependsOn( rep.arena() );
if( rep.size() ) {
output.append( output.arena(), rep.begin(), rep.size() );
ASSERT( rep.readThrough.get() > rep.end()[-1].key );
} else {
ASSERT( rep.readThrough.get() > keys.begin );
}
begin = firstGreaterOrEqual( rep.readThrough.get() );
} else {
output.arena().dependsOn( rep.arena() );
output.append( output.arena(), rep.begin(), rep.size() );
begin = firstGreaterThan( output.end()[-1].key );
}
}
} catch( Error &e ) {
if( begin.getKey() != keys.begin && ( e.code() == error_code_transaction_too_old || e.code() == error_code_future_version ) ) {
if( e.code() == error_code_transaction_too_old )
*isTooOld = true;
output.more = true;
if( begin.isFirstGreaterOrEqual() )
output.readThrough = begin.getKey();
return output;
}
throw;
}
}
template <class T>
void addMutation( T& target, Version version, MutationRef const& mutation ) {
target.addMutation( version, mutation );
}
template <class T>
void addMutation( Reference<T>& target, Version version, MutationRef const& mutation ) {
addMutation(*target, version, mutation);
}
template <class T>
void splitMutations( KeyRangeMap<T>& map, VerUpdateRef const& update ) {
for(auto& m : update.mutations) {
splitMutation(map, m, update.version);
}
}
template <class T>
void splitMutation( KeyRangeMap<T>& map, MutationRef const& m, Version ver ) {
if(isSingleKeyMutation((MutationRef::Type) m.type)) {
if ( !SHORT_CIRCUT_ACTUAL_STORAGE || !normalKeys.contains(m.param1) )
addMutation( map.rangeContaining(m.param1)->value(), ver, m );
}
else if (m.type == MutationRef::ClearRange) {
KeyRangeRef mKeys( m.param1, m.param2 );
if ( !SHORT_CIRCUT_ACTUAL_STORAGE || !normalKeys.contains(mKeys) ){
auto r = map.intersectingRanges( mKeys );
for(auto i = r.begin(); i != r.end(); ++i) {
KeyRangeRef k = mKeys & i->range();
addMutation( i->value(), ver, MutationRef((MutationRef::Type)m.type, k.begin, k.end) );
}
}
}
else
ASSERT(false); // Unknown mutation type in splitMutations
}
ACTOR Future<Void> fetchKeys( StorageServer *data, AddingShard* shard ) {
state TraceInterval interval("FetchKeys");
state KeyRange keys = shard->keys;
state double startt = now();
state int fetchBlockBytes = BUGGIFY ? SERVER_KNOBS->BUGGIFY_BLOCK_BYTES : SERVER_KNOBS->FETCH_BLOCK_BYTES;
// delay(0) to force a return to the run loop before the work of fetchKeys is started.
// This allows adding->start() to be called inline with CSK.
Void _ = wait( data->coreStarted.getFuture() && delay( 0 ) );
try {
debugKeyRange("fetchKeysBegin", data->version.get(), shard->keys);
TraceEvent(SevDebug, interval.begin(), data->thisServerID)
.detail("KeyBegin", printable(shard->keys.begin))
.detail("KeyEnd",printable(shard->keys.end));
validate(data);
// Wait (if necessary) for the latest version at which any key in keys was previously available (+1) to be durable
auto navr = data->newestAvailableVersion.intersectingRanges( keys );
Version lastAvailable = invalidVersion;
for(auto r=navr.begin(); r!=navr.end(); ++r) {
ASSERT( r->value() != latestVersion );
lastAvailable = std::max(lastAvailable, r->value());
}
auto ndvr = data->newestDirtyVersion.intersectingRanges( keys );
for(auto r=ndvr.begin(); r!=ndvr.end(); ++r)
lastAvailable = std::max(lastAvailable, r->value());
if (lastAvailable != invalidVersion && lastAvailable >= data->durableVersion.get()) {
TEST(true); // FetchKeys waits for previous available version to be durable
Void _ = wait( data->durableVersion.whenAtLeast(lastAvailable+1) );
}
TraceEvent(SevDebug, "FetchKeysVersionSatisfied", data->thisServerID).detail("FKID", interval.pairID);
Void _ = wait( data->fetchKeysParallelismLock.take( TaskDefaultYield, fetchBlockBytes ) );
state FlowLock::Releaser holdingFKPL( data->fetchKeysParallelismLock, fetchBlockBytes );
state double executeStart = now();
++data->counters.fetchWaitingCount;
data->counters.fetchWaitingMS += 1000*(executeStart - startt);
Void _ = wait(delay(0));
shard->phase = AddingShard::Fetching;
state Version fetchVersion = data->version.get();
TraceEvent(SevDebug, "FetchKeysUnblocked", data->thisServerID).detail("FKID", interval.pairID).detail("Version", fetchVersion);
// Get the history
state int debug_getRangeRetries = 0;
state int debug_nextRetryToLog = 1;
state bool isTooOld = false;
//FIXME: The client cache does not notice when servers are added to a team. To read from a local storage server we must refresh the cache manually.
data->cx->invalidateCache(keys);
loop {
try {
TEST(true); // Fetching keys for transferred shard
state Standalone<RangeResultRef> this_block = wait( tryGetRange( data->cx, fetchVersion, keys, GetRangeLimits( CLIENT_KNOBS->ROW_LIMIT_UNLIMITED, fetchBlockBytes ), &isTooOld ) );
int expectedSize = (int)this_block.expectedSize() + (8-(int)sizeof(KeyValueRef))*this_block.size();
TraceEvent(SevDebug, "FetchKeysBlock", data->thisServerID).detail("FKID", interval.pairID)
.detail("BlockRows", this_block.size()).detail("BlockBytes", expectedSize)
.detail("KeyBegin", printable(keys.begin)).detail("KeyEnd", printable(keys.end))
.detail("Last", this_block.size() ? printable(this_block.end()[-1].key) : std::string())
.detail("Version", fetchVersion).detail("More", this_block.more);
debugKeyRange("fetchRange", fetchVersion, keys);
for(auto k = this_block.begin(); k != this_block.end(); ++k) debugMutation("fetch", fetchVersion, MutationRef(MutationRef::SetValue, k->key, k->value));
data->counters.bytesFetched += expectedSize;
if( fetchBlockBytes > expectedSize ) {
holdingFKPL.release( fetchBlockBytes - expectedSize );
}
// Wait for permission to proceed
//Void _ = wait( data->fetchKeysStorageWriteLock.take() );
//state FlowLock::Releaser holdingFKSWL( data->fetchKeysStorageWriteLock );
// Write this_block to storage
state KeyValueRef *kvItr = this_block.begin();
for(; kvItr != this_block.end(); ++kvItr) {
data->storage.writeKeyValue( *kvItr );
Void _ = wait(yield());
}
kvItr = this_block.begin();
for(; kvItr != this_block.end(); ++kvItr) {
data->byteSampleApplySet( *kvItr, invalidVersion );
Void _ = wait(yield());
}
if (this_block.more) {
Key nfk = this_block.readThrough.present() ? this_block.readThrough.get() : keyAfter( this_block.end()[-1].key );
if (nfk != keys.end) {
std::deque< Standalone<VerUpdateRef> > updatesToSplit = std::move( shard->updates );
// This actor finishes committing the keys [keys.begin,nfk) that we already fetched.
// The remaining unfetched keys [nfk,keys.end) will become a separate AddingShard with its own fetchKeys.
shard->server->addShard( ShardInfo::addingSplitLeft( KeyRangeRef(keys.begin, nfk), shard ) );
shard->server->addShard( ShardInfo::newAdding( data, KeyRangeRef(nfk, keys.end) ) );
shard = data->shards.rangeContaining( keys.begin ).value()->adding;
auto otherShard = data->shards.rangeContaining( nfk ).value()->adding;
keys = shard->keys;
// Split our prior updates. The ones that apply to our new, restricted key range will go back into shard->updates,
// and the ones delivered to the new shard will be discarded because it is in WaitPrevious phase (hasn't chosen a fetchVersion yet).
// What we are doing here is expensive and could get more expensive if we started having many more blocks per shard. May need optimization in the future.
for(auto u = updatesToSplit.begin(); u != updatesToSplit.end(); ++u)
splitMutations( data->shards, *u );
TEST( true );
TEST( shard->updates.size() );
ASSERT( otherShard->updates.empty() );
}
}
this_block = Standalone<RangeResultRef>();
if (BUGGIFY) Void _ = wait( delay( 1 ) );
break;
} catch (Error& e) {
TraceEvent("FKBlockFail", data->thisServerID).error(e,true).suppressFor(1.0).detail("FKID", interval.pairID);
if (e.code() == error_code_transaction_too_old){
TEST(true); // A storage server has forgotten the history data we are fetching
Version lastFV = fetchVersion;
fetchVersion = data->version.get();
isTooOld = false;
// Throw away deferred updates from before fetchVersion, since we don't need them to use blocks fetched at that version
while (!shard->updates.empty() && shard->updates[0].version <= fetchVersion) shard->updates.pop_front();
//FIXME: remove when we no longer support upgrades from 5.X
if(debug_getRangeRetries >= 100) {
data->cx->enableLocalityLoadBalance = false;
}
debug_getRangeRetries++;
if (debug_nextRetryToLog==debug_getRangeRetries){
debug_nextRetryToLog += std::min(debug_nextRetryToLog, 1024);
TraceEvent(SevWarn, "FetchPast", data->thisServerID).detail("TotalAttempts", debug_getRangeRetries).detail("FKID", interval.pairID).detail("V", lastFV).detail("N", fetchVersion).detail("E", data->version.get());
}
} else if (e.code() == error_code_future_version) {
TEST(true); // fetchKeys got future_version, so there must be a huge storage lag somewhere. Keep trying.
} else {
throw;
}
Void _ = wait( delayJittered( FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY ) );
}
}
//FIXME: remove when we no longer support upgrades from 5.X
data->cx->enableLocalityLoadBalance = true;
// We have completed the fetch and write of the data, now we wait for MVCC window to pass.
// As we have finished this work, we will allow more work to start...
shard->fetchComplete.send(Void());
TraceEvent(SevDebug, "FKBeforeFinalCommit", data->thisServerID).detail("FKID", interval.pairID).detail("SV", data->storageVersion()).detail("DV", data->durableVersion.get());
// Directly commit()ing the IKVS would interfere with updateStorage, possibly resulting in an incomplete version being recovered.
// Instead we wait for the updateStorage loop to commit something (and consequently also what we have written)
Void _ = wait( data->durableVersion.whenAtLeast( data->storageVersion()+1 ) );
holdingFKPL.release();
TraceEvent(SevDebug, "FKAfterFinalCommit", data->thisServerID).detail("FKID", interval.pairID).detail("SV", data->storageVersion()).detail("DV", data->durableVersion.get());
// Wait to run during update(), after a new batch of versions is received from the tlog but before eager reads take place.
Promise<FetchInjectionInfo*> p;
data->readyFetchKeys.push_back( p );
FetchInjectionInfo* batch = wait( p.getFuture() );
TraceEvent(SevDebug, "FKUpdateBatch", data->thisServerID).detail("FKID", interval.pairID);
shard->phase = AddingShard::Waiting;
// Choose a transferredVersion. This choice and timing ensure that
// * The transferredVersion can be mutated in versionedData
// * The transferredVersion isn't yet committed to storage (so we can write the availability status change)
// * The transferredVersion is <= the version of any of the updates in batch, and if there is an equal version
// its mutations haven't been processed yet
shard->transferredVersion = data->version.get() + 1;
//shard->transferredVersion = batch->changes[0].version; //< FIXME: This obeys the documented properties, and seems "safer" because it never introduces extra versions into the data structure, but violates some ASSERTs currently
data->mutableData().createNewVersion( shard->transferredVersion );
ASSERT( shard->transferredVersion > data->storageVersion() );
ASSERT( shard->transferredVersion == data->data().getLatestVersion() );
TraceEvent(SevDebug, "FetchKeysHaveData", data->thisServerID).detail("FKID", interval.pairID)
.detail("Version", shard->transferredVersion).detail("StorageVersion", data->storageVersion());
validate(data);
// Put the updates that were collected during the FinalCommit phase into the batch at the transferredVersion. Eager reads will be done
// for them by update(), and the mutations will come back through AddingShard::addMutations and be applied to versionedMap and mutationLog as normal.
// The lie about their version is acceptable because this shard will never be read at versions < transferredVersion
for(auto i=shard->updates.begin(); i!=shard->updates.end(); ++i) {
i->version = shard->transferredVersion;
batch->arena.dependsOn(i->arena());
}
int startSize = batch->changes.size();
TEST(startSize); //Adding fetch data to a batch which already has changes
batch->changes.resize( batch->changes.size()+shard->updates.size() );
//FIXME: pass the deque back rather than copy the data
std::copy( shard->updates.begin(), shard->updates.end(), batch->changes.begin()+startSize );
Version checkv = shard->transferredVersion;
for(auto b = batch->changes.begin()+startSize; b != batch->changes.end(); ++b ) {
ASSERT( b->version >= checkv );
checkv = b->version;
for(auto& m : b->mutations)
debugMutation("fetchKeysFinalCommitInject", batch->changes[0].version, m);
}
shard->updates.clear();
setAvailableStatus(data, keys, true); // keys will be available when getLatestVersion()==transferredVersion is durable
// Wait for the transferredVersion (and therefore the shard data) to be committed and durable.
Void _ = wait( data->durableVersion.whenAtLeast( shard->transferredVersion ) );
ASSERT( data->shards[shard->keys.begin]->assigned() && data->shards[shard->keys.begin]->keys == shard->keys ); // We aren't changing whether the shard is assigned
data->newestAvailableVersion.insert(shard->keys, latestVersion);
shard->readWrite.send(Void());
data->addShard( ShardInfo::newReadWrite(shard->keys, data) ); // invalidates shard!
coalesceShards(data, keys);
validate(data);
++data->counters.fetchExecutingCount;
data->counters.fetchExecutingMS += 1000*(now() - executeStart);
TraceEvent(SevDebug, interval.end(), data->thisServerID);
} catch (Error &e){
TraceEvent(SevDebug, interval.end(), data->thisServerID).error(e, true).detail("Version", data->version.get());
if (e.code() == error_code_actor_cancelled && !data->shuttingDown && shard->phase >= AddingShard::Fetching) {
if (shard->phase < AddingShard::Waiting) {
data->storage.clearRange( keys );
data->byteSampleApplyClear( keys, invalidVersion );
} else {
ASSERT( data->data().getLatestVersion() > data->version.get() );
removeDataRange( data, data->addVersionToMutationLog(data->data().getLatestVersion()), data->shards, keys );
setAvailableStatus(data, keys, false);
// Prevent another, overlapping fetchKeys from entering the Fetching phase until data->data().getLatestVersion() is durable
data->newestDirtyVersion.insert( keys, data->data().getLatestVersion() );
}
}
TraceEvent(SevError, "FetchKeysError", data->thisServerID)
.error(e)
.detail("Elapsed", now()-startt)
.detail("KeyBegin", printable(keys.begin))
.detail("KeyEnd",printable(keys.end));
if (e.code() != error_code_actor_cancelled)
data->otherError.sendError(e); // Kill the storage server. Are there any recoverable errors?
throw; // goes nowhere
}
return Void();
};
AddingShard::AddingShard( StorageServer* server, KeyRangeRef const& keys )
: server(server), keys(keys), transferredVersion(invalidVersion), phase(WaitPrevious)
{
fetchClient = fetchKeys(server, this);
}
void AddingShard::addMutation( Version version, MutationRef const& mutation ){
if (mutation.type == mutation.ClearRange) {
ASSERT( keys.begin<=mutation.param1 && mutation.param2<=keys.end );
}
else if (isSingleKeyMutation((MutationRef::Type) mutation.type)) {
ASSERT( keys.contains(mutation.param1) );
}
if (phase == WaitPrevious) {
// Updates can be discarded
} else if (phase == Fetching) {
if (!updates.size() || version > updates.end()[-1].version) {
VerUpdateRef v;
v.version = version;
v.isPrivateData = false;
updates.push_back(v);
} else {
ASSERT( version == updates.end()[-1].version );
}
updates.back().mutations.push_back_deep( updates.back().arena(), mutation );
} else if (phase == Waiting) {
server->addMutation(version, mutation, keys, server->updateEagerReads);
} else ASSERT(false);
}
void ShardInfo::addMutation(Version version, MutationRef const& mutation) {
ASSERT( (void *)this);
ASSERT( keys.contains( mutation.param1 ) );
if (adding)
adding->addMutation(version, mutation);
else if (readWrite)
readWrite->addMutation(version, mutation, this->keys, readWrite->updateEagerReads);
else if (mutation.type != MutationRef::ClearRange) {
TraceEvent(SevError, "DeliveredToNotAssigned").detail("Version", version).detail("Mutation", mutation.toString());
ASSERT(false); // Mutation delivered to notAssigned shard!
}
}
enum ChangeServerKeysContext { CSK_UPDATE, CSK_RESTORE };
const char* changeServerKeysContextName[] = { "Update", "Restore" };
void changeServerKeys( StorageServer* data, const KeyRangeRef& keys, bool nowAssigned, Version version, ChangeServerKeysContext context ) {
ASSERT( !keys.empty() );
//TraceEvent("ChangeServerKeys", data->thisServerID)
// .detail("KeyBegin", printable(keys.begin))
// .detail("KeyEnd", printable(keys.end))
// .detail("NowAssigned", nowAssigned)
// .detail("Version", version)
// .detail("Context", changeServerKeysContextName[(int)context]);
validate(data);
debugKeyRange( nowAssigned ? "KeysAssigned" : "KeysUnassigned", version, keys );
bool isDifferent = false;
auto existingShards = data->shards.intersectingRanges(keys);
for( auto it = existingShards.begin(); it != existingShards.end(); ++it ) {
if( nowAssigned != it->value()->assigned() ) {
isDifferent = true;
/*TraceEvent("CSKRangeDifferent", data->thisServerID)
.detail("KeyBegin", printable(it->range().begin))
.detail("KeyEnd", printable(it->range().end));*/
break;
}
}
if( !isDifferent ) {
//TraceEvent("CSKShortCircuit", data->thisServerID)
// .detail("KeyBegin", printable(keys.begin))
// .detail("KeyEnd", printable(keys.end));
return;
}
// Save a backup of the ShardInfo references before we start messing with shards, in order to defer fetchKeys cancellation (and
// its potential call to removeDataRange()) until shards is again valid
vector< Reference<ShardInfo> > oldShards;
auto os = data->shards.intersectingRanges(keys);
for(auto r = os.begin(); r != os.end(); ++r)
oldShards.push_back( r->value() );
// As addShard (called below)'s documentation requires, reinitialize any overlapping range(s)
auto ranges = data->shards.getAffectedRangesAfterInsertion( keys, Reference<ShardInfo>() ); // null reference indicates the range being changed
for(int i=0; i<ranges.size(); i++) {
if (!ranges[i].value) {
ASSERT( (KeyRangeRef&)ranges[i] == keys ); // there shouldn't be any nulls except for the range being inserted
} else if (ranges[i].value->notAssigned())
data->addShard( ShardInfo::newNotAssigned(ranges[i]) );
else if (ranges[i].value->isReadable())
data->addShard( ShardInfo::newReadWrite(ranges[i], data) );
else {
ASSERT( ranges[i].value->adding );
data->addShard( ShardInfo::newAdding( data, ranges[i] ) );
TEST( true ); // ChangeServerKeys reFetchKeys
}
}
// Shard state depends on nowAssigned and whether the data is available (actually assigned in memory or on the disk) up to the given
// version. The latter depends on data->newestAvailableVersion, so loop over the ranges of that.
// SOMEDAY: Could this just use shards? Then we could explicitly do the removeDataRange here when an adding/transferred shard is cancelled
auto vr = data->newestAvailableVersion.intersectingRanges(keys);
vector<std::pair<KeyRange,Version>> changeNewestAvailable;
vector<KeyRange> removeRanges;
for (auto r = vr.begin(); r != vr.end(); ++r) {
KeyRangeRef range = keys & r->range();
bool dataAvailable = r->value()==latestVersion || r->value() >= version;
/*TraceEvent("CSKRange", data->thisServerID)
.detail("KeyBegin", printable(range.begin))
.detail("KeyEnd", printable(range.end))
.detail("Available", dataAvailable)
.detail("NowAssigned", nowAssigned)
.detail("NewestAvailable", r->value())
.detail("ShardState0", data->shards[range.begin]->debugDescribeState());*/
if (!nowAssigned) {
if (dataAvailable) {
ASSERT( r->value() == latestVersion); // Not that we care, but this used to be checked instead of dataAvailable
ASSERT( data->mutableData().getLatestVersion() > version || context == CSK_RESTORE );
changeNewestAvailable.push_back(make_pair(range, version));
removeRanges.push_back( range );
}
data->addShard( ShardInfo::newNotAssigned(range) );
data->watches.triggerRange( range.begin, range.end );
} else if (!dataAvailable) {
// SOMEDAY: Avoid restarting adding/transferred shards
if (version==0){ // bypass fetchkeys; shard is known empty at version 0
changeNewestAvailable.push_back(make_pair(range, latestVersion));
data->addShard( ShardInfo::newReadWrite(range, data) );
setAvailableStatus(data, range, true);
} else {
auto& shard = data->shards[range.begin];
if( !shard->assigned() || shard->keys != range )
data->addShard( ShardInfo::newAdding(data, range) );
}
} else {
changeNewestAvailable.push_back(make_pair(range, latestVersion));
data->addShard( ShardInfo::newReadWrite(range, data) );
}
}
// Update newestAvailableVersion when a shard becomes (un)available (in a separate loop to avoid invalidating vr above)
for(auto r = changeNewestAvailable.begin(); r != changeNewestAvailable.end(); ++r)
data->newestAvailableVersion.insert( r->first, r->second );
if (!nowAssigned)
data->metrics.notifyNotReadable( keys );
coalesceShards( data, KeyRangeRef(ranges[0].begin, ranges[ranges.size()-1].end) );
// Now it is OK to do removeDataRanges, directly and through fetchKeys cancellation (and we have to do so before validate())
oldShards.clear();
ranges.clear();
for(auto r=removeRanges.begin(); r!=removeRanges.end(); ++r) {
removeDataRange( data, data->addVersionToMutationLog(data->data().getLatestVersion()), data->shards, *r );
setAvailableStatus(data, *r, false);
}
validate(data);
}
void rollback( StorageServer* data, Version rollbackVersion, Version nextVersion ) {
TEST(true); // call to shard rollback
debugKeyRange("Rollback", rollbackVersion, allKeys);
// We used to do a complicated dance to roll back in MVCC history. It's much simpler, and more testable,
// to simply restart the storage server actor and restore from the persistent disk state, and then roll
// forward from the TLog's history. It's not quite as efficient, but we rarely have to do this in practice.
// FIXME: This code is relying for liveness on an undocumented property of the log system implementation: that after a rollback the rolled back versions will
// eventually be missing from the peeked log. A more sophisticated approach would be to make the rollback range durable and, after reboot, skip over
// those versions if they appear in peek results.
throw please_reboot();
}
void StorageServer::addMutation(Version version, MutationRef const& mutation, KeyRangeRef const& shard, UpdateEagerReadInfo* eagerReads ) {
MutationRef expanded = mutation;
auto& mLog = addVersionToMutationLog(version);
if ( !expandMutation( expanded, data(), eagerReads, shard.end, mLog.arena()) ) {
return;
}
expanded = addMutationToMutationLog(mLog, expanded);
if (debugMutation("expandedMutation", version, expanded)) {
const char* type =
mutation.type == MutationRef::SetValue ? "SetValue" :
mutation.type == MutationRef::ClearRange ? "ClearRange" :
mutation.type == MutationRef::DebugKeyRange ? "DebugKeyRange" :
mutation.type == MutationRef::DebugKey ? "DebugKey" :
"UnknownMutation";
printf("DEBUGMUTATION:\t%.6f\t%s\t%s\t%lld\t%s\t%s\t%s\n", now(), g_network->getLocalAddress().toString().c_str(), "originalMutation", version, type, printable(mutation.param1).c_str(), printable(mutation.param2).c_str());
printf(" shard: %s - %s\n", printable(shard.begin).c_str(), printable(shard.end).c_str());
if (mutation.type == MutationRef::ClearRange && mutation.param2 != shard.end)
printf(" eager: %s\n", printable( eagerReads->getKeyEnd( mutation.param2 ) ).c_str() );
}
applyMutation( this, expanded, mLog.arena(), mutableData() );
}
struct OrderByVersion {
bool operator()( const VersionUpdateRef& a, const VersionUpdateRef& b ) {
if (a.version != b.version) return a.version < b.version;
if (a.isPrivateData != b.isPrivateData) return a.isPrivateData;
return false;
}
};
bool containsRollback( VersionUpdateRef const& changes, Version& rollbackVersion ) {
for(auto it = changes.mutations.begin(); it; ++it)
if (it->type == it->SetValue && it->param1 == lastEpochEndKey) {
BinaryReader br(it->param2, Unversioned());
br >> rollbackVersion;
return true;
}
return false;
}
class StorageUpdater {
public:
StorageUpdater(Version fromVersion, Version restoredVersion) : fromVersion(fromVersion), currentVersion(fromVersion), restoredVersion(restoredVersion), processedStartKey(false) {}
void applyMutation(StorageServer* data, MutationRef const& m, Version ver) {
//TraceEvent("SSNewVersion", data->thisServerID).detail("VerWas", data->mutableData().latestVersion).detail("ChVer", ver);
if(currentVersion != ver) {
fromVersion = currentVersion;
currentVersion = ver;
data->mutableData().createNewVersion(ver);
}
if (m.param1.startsWith( systemKeys.end )) {
//TraceEvent("PrivateData", data->thisServerID).detail("Mutation", m.toString()).detail("Version", ver);
applyPrivateData( data, m );
} else {
// FIXME: enable when debugMutation is active
//for(auto m = changes[c].mutations.begin(); m; ++m) {
// debugMutation("SSUpdateMutation", changes[c].version, *m);
//}
splitMutation( data->shards, m, ver );
}
if (data->otherError.getFuture().isReady()) data->otherError.getFuture().get();
}
Version currentVersion;
private:
Version fromVersion;
Version restoredVersion;
KeyRef startKey;
bool nowAssigned;
bool processedStartKey;
void applyPrivateData( StorageServer* data, MutationRef const& m ) {
TraceEvent(SevDebug, "SSPrivateMutation", data->thisServerID).detail("Mutation", m.toString());
if (processedStartKey) {
// Because of the implementation of the krm* functions, we expect changes in pairs, [begin,end)
// We can also ignore clearRanges, because they are always accompanied by such a pair of sets with the same keys
ASSERT (m.type == MutationRef::SetValue && m.param1.startsWith(data->sk));
KeyRangeRef keys( startKey.removePrefix( data->sk ), m.param1.removePrefix( data->sk ));
// add changes in shard assignment to the mutation log
setAssignedStatus( data, keys, nowAssigned );
// The changes for version have already been received (and are being processed now). We need
// to fetch the data for change.version-1 (changes from versions < change.version)
changeServerKeys( data, keys, nowAssigned, currentVersion-1, CSK_UPDATE );
processedStartKey = false;
} else if (m.type == MutationRef::SetValue && m.param1.startsWith( data->sk )) {
// Because of the implementation of the krm* functions, we expect changes in pairs, [begin,end)
// We can also ignore clearRanges, because they are always accompanied by such a pair of sets with the same keys
startKey = m.param1;
nowAssigned = m.param2 != serverKeysFalse;
processedStartKey = true;
} else if (m.type == MutationRef::SetValue && m.param1 == lastEpochEndPrivateKey) {
// lastEpochEnd transactions are guaranteed by the master to be alone in their own batch (version)
// That means we don't have to worry about the impact on changeServerKeys
//ASSERT( /*isFirstVersionUpdateFromTLog && */!std::next(it) );
Version rollbackVersion;
BinaryReader br(m.param2, Unversioned());
br >> rollbackVersion;
if ( rollbackVersion < fromVersion && rollbackVersion > restoredVersion ) {
TEST( true ); // ShardApplyPrivateData shard rollback
TraceEvent(SevWarn, "Rollback", data->thisServerID)
.detail("FromVersion", fromVersion)
.detail("ToVersion", rollbackVersion)
.detail("AtVersion", currentVersion)
.detail("StorageVersion", data->storageVersion());
ASSERT( rollbackVersion >= data->storageVersion() );
}
// Don't let oldestVersion (and thus storageVersion) go into the rolled back range of versions
// Since we currently don't read from uncommitted log systems, seeing the lastEpochEnd implies that currentVersion is fully committed, so we can safely make it durable
if ( rollbackVersion < fromVersion && rollbackVersion > restoredVersion )
rollback( data, rollbackVersion, currentVersion );
data->recoveryVersionSkips.push_back(std::make_pair(rollbackVersion, currentVersion - rollbackVersion));
} else if ((m.type == MutationRef::SetValue || m.type == MutationRef::ClearRange) && m.param1.substr(1).startsWith(serverTagPrefix)) {
bool matchesThisServer = decodeServerTagKey(m.param1.substr(1)) == data->thisServerID;
if( (m.type == MutationRef::SetValue && !matchesThisServer) || (m.type == MutationRef::ClearRange && matchesThisServer) )
throw worker_removed();
} else {
ASSERT(false); // Unknown private mutation
}
}
};
ACTOR Future<Void> update( StorageServer* data, bool* pReceivedUpdate )
{
state double start;
try {
// If we are disk bound and durableVersion is very old, we need to block updates or we could run out of memory
// This is often referred to as the storage server e-brake (emergency brake)
state double waitStartT = 0;
while ( data->queueSize() >= SERVER_KNOBS->STORAGE_HARD_LIMIT_BYTES && data->durableVersion.get() < data->desiredOldestVersion.get() ) {
if (now() - waitStartT >= 1) {
TraceEvent(SevWarn, "StorageServerUpdateLag", data->thisServerID)
.detail("Version", data->version.get())
.detail("DurableVersion", data->durableVersion.get());
waitStartT = now();
}
data->behind = true;
Void _ = wait( delayJittered(.005, TaskTLogPeekReply) );
}
while( data->byteSampleClearsTooLarge.get() ) {
Void _ = wait( data->byteSampleClearsTooLarge.onChange() );
}
state Reference<ILogSystem::IPeekCursor> cursor = data->logCursor;
//TraceEvent("SSUpdatePeeking", data->thisServerID).detail("MyVer", data->version.get()).detail("Epoch", data->updateEpoch).detail("Seq", data->updateSequence);
loop {
Void _ = wait( cursor->getMore() );
if(!cursor->isExhausted()) {
break;
}
}
if(cursor->popped() > 0)
throw worker_removed();
++data->counters.updateBatches;
data->lastTLogVersion = cursor->getMaxKnownVersion();
data->versionLag = std::max<int64_t>(0, data->lastTLogVersion - data->version.get());
ASSERT(*pReceivedUpdate == false);
*pReceivedUpdate = true;
start = now();
Void _ = wait( data->durableVersionLock.take(TaskTLogPeekReply,1) );
state FlowLock::Releaser holdingDVL( data->durableVersionLock );
if(now() - start > 0.1)
TraceEvent("SSSlowTakeLock1", data->thisServerID).detailf("From", "%016llx", debug_lastLoadBalanceResultEndpointToken).detail("Duration", now() - start).detail("Version", data->version.get());
start = now();
state UpdateEagerReadInfo eager;
state FetchInjectionInfo fii;
state Reference<ILogSystem::IPeekCursor> cloneCursor2;
loop{
state uint64_t changeCounter = data->shardChangeCounter;
bool epochEnd = false;
bool hasPrivateData = false;
bool firstMutation = true;
bool dbgLastMessageWasProtocol = false;
Reference<ILogSystem::IPeekCursor> cloneCursor1 = cursor->cloneNoMore();
cloneCursor2 = cursor->cloneNoMore();
cloneCursor1->setProtocolVersion(data->logProtocol);
for (; cloneCursor1->hasMessage(); cloneCursor1->nextMessage()) {
ArenaReader& cloneReader = *cloneCursor1->reader();
if (LogProtocolMessage::isNextIn(cloneReader)) {
LogProtocolMessage lpm;
cloneReader >> lpm;
dbgLastMessageWasProtocol = true;
cloneCursor1->setProtocolVersion(cloneReader.protocolVersion());
}
else {
MutationRef msg;
cloneReader >> msg;
if (firstMutation && msg.param1.startsWith(systemKeys.end))
hasPrivateData = true;
firstMutation = false;
if (msg.param1 == lastEpochEndPrivateKey) {
epochEnd = true;
ASSERT(dbgLastMessageWasProtocol);
}
eager.addMutation(msg);
dbgLastMessageWasProtocol = false;
}
}
// Any fetchKeys which are ready to transition their shards to the adding,transferred state do so now.
// If there is an epoch end we skip this step, to increase testability and to prevent inserting a version in the middle of a rolled back version range.
while(!hasPrivateData && !epochEnd && !data->readyFetchKeys.empty()) {
auto fk = data->readyFetchKeys.back();
data->readyFetchKeys.pop_back();
fk.send( &fii );
}
for(auto& c : fii.changes)
eager.addMutations(c.mutations);
Void _ = wait( doEagerReads( data, &eager ) );
if (data->shardChangeCounter == changeCounter) break;
TEST(true); // A fetchKeys completed while we were doing this, so eager might be outdated. Read it again.
// SOMEDAY: Theoretically we could check the change counters of individual shards and retry the reads only selectively
eager = UpdateEagerReadInfo();
}
if(now() - start > 0.1)
TraceEvent("SSSlowTakeLock2", data->thisServerID).detailf("From", "%016llx", debug_lastLoadBalanceResultEndpointToken).detail("Duration", now() - start).detail("Version", data->version.get());
data->updateEagerReads = &eager;
data->debug_inApplyUpdate = true;
StorageUpdater updater(data->lastVersionWithData, data->restoredVersion);
if (EXPENSIVE_VALIDATION) data->data().atLatest().validate();
validate(data);
state bool injectedChanges = false;
for(auto& c : fii.changes) {
for(auto& m : c.mutations) {
updater.applyMutation(data, m, c.version);
injectedChanges = true;
}
}
Version ver = invalidVersion;
cloneCursor2->setProtocolVersion(data->logProtocol);
//TraceEvent("SSUpdatePeeked", data->thisServerID).detail("FromEpoch", data->updateEpoch).detail("FromSeq", data->updateSequence).detail("ToEpoch", results.end_epoch).detail("ToSeq", results.end_seq).detail("MsgSize", results.messages.size());
for (;cloneCursor2->hasMessage(); cloneCursor2->nextMessage()) {
auto &rd = *cloneCursor2->reader();
if (cloneCursor2->version().version > ver) ASSERT(cloneCursor2->version().version > data->version.get());
if (cloneCursor2->version().version > ver && cloneCursor2->version().version > data->version.get()) {
++data->counters.updateVersions;
ver = cloneCursor2->version().version;
}
if (LogProtocolMessage::isNextIn(rd)) {
LogProtocolMessage lpm;
rd >> lpm;
data->logProtocol = rd.protocolVersion();
data->storage.changeLogProtocol(ver, data->logProtocol);
cloneCursor2->setProtocolVersion(rd.protocolVersion());
}
else {
MutationRef msg;
rd >> msg;
if (ver != invalidVersion) { // This change belongs to a version < minVersion
if (debugMutation("SSPeek", ver, msg) || ver == 1)
TraceEvent("SSPeekMutation", data->thisServerID).detail("Mutation", msg.toString()).detail("Version", cloneCursor2->version().toString());
updater.applyMutation(data, msg, ver);
data->counters.mutationBytes += msg.totalSize();
++data->counters.mutations;
switch(msg.type) {
case MutationRef::SetValue:
++data->counters.setMutations;
break;
case MutationRef::ClearRange:
++data->counters.clearRangeMutations;
break;
case MutationRef::AddValue:
case MutationRef::And:
case MutationRef::AndV2:
case MutationRef::AppendIfFits:
case MutationRef::ByteMax:
case MutationRef::ByteMin:
case MutationRef::Max:
case MutationRef::Min:
case MutationRef::MinV2:
case MutationRef::Or:
case MutationRef::Xor:
++data->counters.atomicMutations;
break;
}
}
else
TraceEvent(SevError, "DiscardingPeekedData", data->thisServerID).detail("Mutation", msg.toString()).detail("Version", cloneCursor2->version().toString());
}
}
if(ver != invalidVersion) {
data->lastVersionWithData = ver;
} else {
ver = cloneCursor2->version().version - 1;
}
if(injectedChanges) data->lastVersionWithData = ver;
data->updateEagerReads = NULL;
data->debug_inApplyUpdate = false;
if(ver == invalidVersion && !fii.changes.empty() ) {
ver = updater.currentVersion;
}
if(ver != invalidVersion) {
debugKeyRange("SSUpdate", ver, allKeys);
data->mutableData().createNewVersion(ver);
if (data->otherError.getFuture().isReady()) data->otherError.getFuture().get();
data->noRecentUpdates.set(false);
data->lastUpdate = now();
data->version.set( ver ); // Triggers replies to waiting gets for new version(s)
if (data->otherError.getFuture().isReady()) data->otherError.getFuture().get();
Version maxVersionInMemory = SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS;
for(int i = 0; i < data->recoveryVersionSkips.size(); i++) {
maxVersionInMemory += data->recoveryVersionSkips[i].second;
}
//TraceEvent("StorageServerUpdated", data->thisServerID).detail("Ver", ver).detail("DataVersion", data->version.get())
// .detail("LastTLogVersion", data->lastTLogVersion).detail("NewOldest", updater.newOldestVersion).detail("DesiredOldest",data->desiredOldestVersion.get())
// .detail("MaxVersionInMemory", maxVersionInMemory);
// Trigger updateStorage if necessary
Version proposedOldestVersion = std::max(data->version.get(), data->lastTLogVersion) - maxVersionInMemory;
proposedOldestVersion = std::min(proposedOldestVersion, data->version.get()-1);
proposedOldestVersion = std::max(proposedOldestVersion, data->oldestVersion.get());
proposedOldestVersion = std::max(proposedOldestVersion, data->desiredOldestVersion.get());
while(!data->recoveryVersionSkips.empty() && proposedOldestVersion > data->recoveryVersionSkips.front().first) {
data->recoveryVersionSkips.pop_front();
}
data->desiredOldestVersion.set(proposedOldestVersion);
}
validate(data);
data->logCursor->advanceTo( cloneCursor2->version() );
if(cursor->version().version >= data->lastTLogVersion) {
if(data->behind) {
TraceEvent("StorageServerNoLongerBehind", data->thisServerID).detail("CursorVersion", cursor->version().version).detail("TLogVersion", data->lastTLogVersion);
}
data->behind = false;
}
return Void(); // update will get called again ASAP
} catch (Error& e) {
if (e.code() != error_code_worker_removed && e.code() != error_code_please_reboot)
TraceEvent(SevError, "SSUpdateError", data->thisServerID).error(e).backtrace();
throw;
}
}
ACTOR Future<Void> updateStorage(StorageServer* data) {
loop {
ASSERT( data->durableVersion.get() == data->storageVersion() );
Void _ = wait( data->desiredOldestVersion.whenAtLeast( data->storageVersion()+1 ) );
Void _ = wait( delay(0, TaskUpdateStorage) );
state Version startOldestVersion = data->storageVersion();
state Version newOldestVersion = data->storageVersion();
state Version desiredVersion = data->desiredOldestVersion.get();
state int64_t bytesLeft = SERVER_KNOBS->STORAGE_COMMIT_BYTES;
loop {
state bool done = data->storage.makeVersionMutationsDurable(newOldestVersion, desiredVersion, bytesLeft);
// We want to forget things from these data structures atomically with changing oldestVersion (and "before", since oldestVersion.set() may trigger waiting actors)
// forgetVersionsBeforeAsync visibly forgets immediately (without waiting) but asynchronously frees memory.
Future<Void> finishedForgetting = data->mutableData().forgetVersionsBeforeAsync( newOldestVersion, TaskUpdateStorage );
data->oldestVersion.set( newOldestVersion );
Void _ = wait( finishedForgetting );
Void _ = wait( yield(TaskUpdateStorage) );
if (done) break;
}
if (startOldestVersion != newOldestVersion)
data->storage.makeVersionDurable( newOldestVersion );
debug_advanceMaxCommittedVersion( data->thisServerID, newOldestVersion );
state Future<Void> durable = data->storage.commit();
state Future<Void> durableDelay = Void();
if (bytesLeft > 0)
durableDelay = delay(SERVER_KNOBS->STORAGE_COMMIT_INTERVAL);
Void _ = wait( durable );
debug_advanceMinCommittedVersion( data->thisServerID, newOldestVersion );
// Taking and releasing the durableVersionLock ensures that no eager reads both begin before the commit was effective and
// are applied after we change the durable version.
Void _ = wait( data->durableVersionLock.take() );
data->durableVersionLock.release();
Void _ = wait( delay(0, TaskUpdateStorage) );
data->popVersion( data->durableVersion.get() + 1 );
while (!changeDurableVersion( data, newOldestVersion )) {
Void _ = wait( yield(TaskUpdateStorage) );
}
//TraceEvent("StorageServerDurable", data->thisServerID).detail("Version", newOldestVersion);
Void _ = wait( durableDelay );
}
}
#pragma endregion
////////////////////////////////// StorageServerDisk ///////////////////////////////////////
#pragma region StorageServerDisk
#define PERSIST_PREFIX "\xff\xff"
// Immutable
static const KeyValueRef persistFormat( LiteralStringRef( PERSIST_PREFIX "Format" ), LiteralStringRef("FoundationDB/StorageServer/1/4") );
static const KeyRangeRef persistFormatReadableRange( LiteralStringRef("FoundationDB/StorageServer/1/2"), LiteralStringRef("FoundationDB/StorageServer/1/5") );
static const KeyRef persistID = LiteralStringRef( PERSIST_PREFIX "ID" );
// (Potentially) change with the durable version or when fetchKeys completes
static const KeyRef persistVersion = LiteralStringRef( PERSIST_PREFIX "Version" );
static const KeyRangeRef persistShardAssignedKeys = KeyRangeRef( LiteralStringRef( PERSIST_PREFIX "ShardAssigned/" ), LiteralStringRef( PERSIST_PREFIX "ShardAssigned0" ) );
static const KeyRangeRef persistShardAvailableKeys = KeyRangeRef( LiteralStringRef( PERSIST_PREFIX "ShardAvailable/" ), LiteralStringRef( PERSIST_PREFIX "ShardAvailable0" ) );
static const KeyRangeRef persistByteSampleKeys = KeyRangeRef( LiteralStringRef( PERSIST_PREFIX "BS/" ), LiteralStringRef( PERSIST_PREFIX "BS0" ) );
static const KeyRangeRef persistByteSampleSampleKeys = KeyRangeRef( LiteralStringRef( PERSIST_PREFIX "BS/" PERSIST_PREFIX "BS/" ), LiteralStringRef( PERSIST_PREFIX "BS/" PERSIST_PREFIX "BS0" ) );
static const KeyRef persistLogProtocol = LiteralStringRef(PERSIST_PREFIX "LogProtocol");
// data keys are unmangled (but never start with PERSIST_PREFIX because they are always in allKeys)
void StorageServerDisk::makeNewStorageServerDurable() {
storage->set( persistFormat );
storage->set( KeyValueRef(persistID, BinaryWriter::toValue(data->thisServerID, Unversioned())) );
storage->set( KeyValueRef(persistVersion, BinaryWriter::toValue(data->version.get(), Unversioned())) );
storage->set( KeyValueRef(persistShardAssignedKeys.begin.toString(), LiteralStringRef("0")) );
storage->set( KeyValueRef(persistShardAvailableKeys.begin.toString(), LiteralStringRef("0")) );
}
void setAvailableStatus( StorageServer* self, KeyRangeRef keys, bool available ) {
//ASSERT( self->debug_inApplyUpdate );
ASSERT( !keys.empty() );
auto& mLV = self->addVersionToMutationLog( self->data().getLatestVersion() );
KeyRange availableKeys = KeyRangeRef( persistShardAvailableKeys.begin.toString() + keys.begin.toString(), persistShardAvailableKeys.begin.toString() + keys.end.toString() );
//TraceEvent("SetAvailableStatus", self->thisServerID).detail("Version", mLV.version).detail("RangeBegin", printable(availableKeys.begin)).detail("RangeEnd", printable(availableKeys.end));
self->addMutationToMutationLog( mLV, MutationRef( MutationRef::ClearRange, availableKeys.begin, availableKeys.end ) );
self->addMutationToMutationLog( mLV, MutationRef( MutationRef::SetValue, availableKeys.begin, available ? LiteralStringRef("1") : LiteralStringRef("0") ) );
if (keys.end != allKeys.end) {
bool endAvailable = self->shards.rangeContaining( keys.end )->value()->isInVersionedData();
self->addMutationToMutationLog( mLV, MutationRef( MutationRef::SetValue, availableKeys.end, endAvailable ? LiteralStringRef("1") : LiteralStringRef("0") ) );
}
}
void setAssignedStatus( StorageServer* self, KeyRangeRef keys, bool nowAssigned ) {
ASSERT( !keys.empty() );
auto& mLV = self->addVersionToMutationLog( self->data().getLatestVersion() );
KeyRange assignedKeys = KeyRangeRef(
persistShardAssignedKeys.begin.toString() + keys.begin.toString(),
persistShardAssignedKeys.begin.toString() + keys.end.toString() );
//TraceEvent("SetAssignedStatus", self->thisServerID).detail("Version", mLV.version).detail("RangeBegin", printable(assignedKeys.begin)).detail("RangeEnd", printable(assignedKeys.end));
self->addMutationToMutationLog( mLV, MutationRef( MutationRef::ClearRange, assignedKeys.begin, assignedKeys.end ) );
self->addMutationToMutationLog( mLV, MutationRef( MutationRef::SetValue, assignedKeys.begin,
nowAssigned ? LiteralStringRef("1") : LiteralStringRef("0") ) );
if (keys.end != allKeys.end) {
bool endAssigned = self->shards.rangeContaining( keys.end )->value()->assigned();
self->addMutationToMutationLog( mLV, MutationRef( MutationRef::SetValue, assignedKeys.end, endAssigned ? LiteralStringRef("1") : LiteralStringRef("0") ) );
}
}
void StorageServerDisk::clearRange( KeyRangeRef keys ) {
storage->clear(keys);
}
void StorageServerDisk::writeKeyValue( KeyValueRef kv ) {
storage->set( kv );
}
void StorageServerDisk::writeMutation( MutationRef mutation ) {
// FIXME: debugMutation(debugContext, debugVersion, *m);
if (mutation.type == MutationRef::SetValue) {
storage->set( KeyValueRef(mutation.param1, mutation.param2) );
} else if (mutation.type == MutationRef::ClearRange) {
storage->clear( KeyRangeRef(mutation.param1, mutation.param2) );
} else
ASSERT(false);
}
void StorageServerDisk::writeMutations( MutationListRef mutations, Version debugVersion, const char* debugContext ) {
for(auto m = mutations.begin(); m; ++m) {
debugMutation(debugContext, debugVersion, *m);
if (m->type == MutationRef::SetValue) {
storage->set( KeyValueRef(m->param1, m->param2) );
} else if (m->type == MutationRef::ClearRange) {
storage->clear( KeyRangeRef(m->param1, m->param2) );
}
}
}
bool StorageServerDisk::makeVersionMutationsDurable( Version& prevStorageVersion, Version newStorageVersion, int64_t& bytesLeft ) {
if (bytesLeft <= 0) return true;
// Apply mutations from the mutationLog
auto u = data->getMutationLog().upper_bound(prevStorageVersion);
if (u != data->getMutationLog().end() && u->first <= newStorageVersion) {
VersionUpdateRef const& v = u->second;
ASSERT( v.version > prevStorageVersion && v.version <= newStorageVersion );
debugKeyRange("makeVersionMutationsDurable", v.version, allKeys);
writeMutations(v.mutations, v.version, "makeVersionDurable");
for(auto m=v.mutations.begin(); m; ++m)
bytesLeft -= mvccStorageBytes(*m);
prevStorageVersion = v.version;
return false;
} else {
prevStorageVersion = newStorageVersion;
return true;
}
}
// Update data->storage to persist the changes from (data->storageVersion(),version]
void StorageServerDisk::makeVersionDurable( Version version ) {
storage->set( KeyValueRef(persistVersion, BinaryWriter::toValue(version, Unversioned())) );
//TraceEvent("MakeDurable", data->thisServerID).detail("FromVersion", prevStorageVersion).detail("ToVersion", version);
}
void StorageServerDisk::changeLogProtocol(Version version, uint64_t protocol) {
data->addMutationToMutationLogOrStorage(version, MutationRef(MutationRef::SetValue, persistLogProtocol, BinaryWriter::toValue(protocol, Unversioned())));
}
ACTOR Future<Void> applyByteSampleResult( StorageServer* data, KeyRange range, Future<Standalone<VectorRef<KeyValueRef>>> result) {
Standalone<VectorRef<KeyValueRef>> bs = wait( result );
for( int j = 0; j < bs.size(); j++ ) {
KeyRef key = bs[j].key.removePrefix(persistByteSampleKeys.begin);
if(!data->byteSampleClears.rangeContaining(key).value()) {
data->metrics.byteSample.sample.insert( key, BinaryReader::fromStringRef<int32_t>(bs[j].value, Unversioned()), false );
}
}
data->byteSampleClears.insert(range, true);
data->byteSampleClearsTooLarge.set(data->byteSampleClears.size() > SERVER_KNOBS->MAX_BYTE_SAMPLE_CLEAR_MAP_SIZE);
return Void();
}
ACTOR Future<Void> restoreByteSample(StorageServer* data, IKeyValueStore* storage, Standalone<VectorRef<KeyValueRef>> bsSample) {
Void _ = wait( delay( BUGGIFY ? g_random->random01() * 2.0 : 0.0001 ) );
TraceEvent("RecoveredByteSampleSample", data->thisServerID).detail("Keys", bsSample.size()).detail("ReadBytes", bsSample.expectedSize());
size_t bytes_per_fetch = 0;
// Since the expected size also includes (as of now) the space overhead of the container, we calculate our own number here
for( int i = 0; i < bsSample.size(); i++ )
bytes_per_fetch += BinaryReader::fromStringRef<int32_t>(bsSample[i].value, Unversioned());
bytes_per_fetch /= 32;
state std::vector<Future<Void>> sampleRanges;
int accumulatedSize = 0;
std::string prefix = PERSIST_PREFIX "BS/";
Key lastStart = LiteralStringRef( PERSIST_PREFIX "BS/" ); // make sure the first range starts at the absolute beginning of the byte sample
for( auto it = bsSample.begin(); it != bsSample.end(); ++it ) {
if( accumulatedSize >= bytes_per_fetch ) {
accumulatedSize = 0;
Key realKey = it->key.removePrefix( prefix );
KeyRange sampleRange = KeyRangeRef( lastStart, realKey );
sampleRanges.push_back( applyByteSampleResult(data, sampleRange.removePrefix(persistByteSampleKeys.begin), storage->readRange( sampleRange )) );
lastStart = realKey;
}
accumulatedSize += BinaryReader::fromStringRef<int32_t>(it->value, Unversioned());
}
// make sure that the last range goes all the way to the end of the byte sample
KeyRange sampleRange = KeyRangeRef( lastStart, LiteralStringRef( PERSIST_PREFIX "BS0" ));
sampleRanges.push_back( applyByteSampleResult(data, KeyRangeRef(lastStart.removePrefix(persistByteSampleKeys.begin), LiteralStringRef("\xff\xff\xff")), storage->readRange( sampleRange )) );
Void _ = wait( waitForAll( sampleRanges ) );
TraceEvent("RecoveredByteSampleChunkedRead", data->thisServerID).detail("Ranges",sampleRanges.size());
if( BUGGIFY )
Void _ = wait( delay( g_random->random01() * 10.0 ) );
return Void();
}
ACTOR Future<bool> restoreDurableState( StorageServer* data, IKeyValueStore* storage ) {
state Future<Optional<Value>> fFormat = storage->readValue(persistFormat.key);
state Future<Optional<Value>> fID = storage->readValue(persistID);
state Future<Optional<Value>> fVersion = storage->readValue(persistVersion);
state Future<Optional<Value>> fLogProtocol = storage->readValue(persistLogProtocol);
state Future<Standalone<VectorRef<KeyValueRef>>> fShardAssigned = storage->readRange(persistShardAssignedKeys);
state Future<Standalone<VectorRef<KeyValueRef>>> fShardAvailable = storage->readRange(persistShardAvailableKeys);
state Future<Standalone<VectorRef<KeyValueRef>>> fByteSampleSample = storage->readRange(persistByteSampleSampleKeys);
TraceEvent("ReadingDurableState", data->thisServerID);
Void _ = wait( waitForAll( (vector<Future<Optional<Value>>>(), fFormat, fID, fVersion, fLogProtocol) ) );
Void _ = wait( waitForAll( (vector<Future<Standalone<VectorRef<KeyValueRef>>>>(), fShardAssigned, fShardAvailable, fByteSampleSample) ) );
TraceEvent("RestoringDurableState", data->thisServerID);
if (!fFormat.get().present()) {
// The DB was never initialized
TraceEvent("DBNeverInitialized", data->thisServerID);
storage->dispose();
data->thisServerID = UID();
data->sk = Key();
return false;
}
if (!persistFormatReadableRange.contains( fFormat.get().get() )) {
TraceEvent(SevError, "UnsupportedDBFormat").detail("Format", fFormat.get().get().toString()).detail("Expected", persistFormat.value.toString());
throw worker_recovery_failed();
}
data->thisServerID = BinaryReader::fromStringRef<UID>(fID.get().get(), Unversioned());
data->sk = serverKeysPrefixFor( data->thisServerID ).withPrefix(systemKeys.begin); // FFFF/serverKeys/[this server]/
if (fLogProtocol.get().present())
data->logProtocol = BinaryReader::fromStringRef<uint64_t>(fLogProtocol.get().get(), Unversioned());
state Version version = BinaryReader::fromStringRef<Version>( fVersion.get().get(), Unversioned() );
debug_checkRestoredVersion( data->thisServerID, version, "StorageServer" );
data->setInitialVersion( version );
state Standalone<VectorRef<KeyValueRef>> available = fShardAvailable.get();
state int availableLoc;
for(availableLoc=0; availableLoc<available.size(); availableLoc++) {
KeyRangeRef keys(
available[availableLoc].key.removePrefix(persistShardAvailableKeys.begin),
availableLoc+1==available.size() ? allKeys.end : available[availableLoc+1].key.removePrefix(persistShardAvailableKeys.begin));
ASSERT( !keys.empty() );
bool nowAvailable = available[availableLoc].value!=LiteralStringRef("0");
/*if(nowAvailable)
TraceEvent("AvailableShard", data->thisServerID).detail("RangeBegin", printable(keys.begin)).detail("RangeEnd", printable(keys.end));*/
data->newestAvailableVersion.insert( keys, nowAvailable ? latestVersion : invalidVersion );
Void _ = wait(yield());
}
state Standalone<VectorRef<KeyValueRef>> assigned = fShardAssigned.get();
state int assignedLoc;
for(assignedLoc=0; assignedLoc<assigned.size(); assignedLoc++) {
KeyRangeRef keys(
assigned[assignedLoc].key.removePrefix(persistShardAssignedKeys.begin),
assignedLoc+1==assigned.size() ? allKeys.end : assigned[assignedLoc+1].key.removePrefix(persistShardAssignedKeys.begin));
ASSERT( !keys.empty() );
bool nowAssigned = assigned[assignedLoc].value!=LiteralStringRef("0");
/*if(nowAssigned)
TraceEvent("AssignedShard", data->thisServerID).detail("RangeBegin", printable(keys.begin)).detail("RangeEnd", printable(keys.end));*/
changeServerKeys(data, keys, nowAssigned, version, CSK_RESTORE);
if (!nowAssigned) ASSERT( data->newestAvailableVersion.allEqual(keys, invalidVersion) );
Void _ = wait(yield());
}
Void _ = wait( applyByteSampleResult(data, persistByteSampleSampleKeys.removePrefix(persistByteSampleKeys.begin), fByteSampleSample) );
data->byteSampleRecovery = restoreByteSample(data, storage, fByteSampleSample.get());
Void _ = wait( delay( 0.0001 ) );
{
// Erase data which isn't available (it is from some fetch at a later version)
// SOMEDAY: Keep track of keys that might be fetching, make sure we don't have any data elsewhere?
for(auto it = data->newestAvailableVersion.ranges().begin(); it != data->newestAvailableVersion.ranges().end(); ++it) {
if (it->value() == invalidVersion) {
KeyRangeRef clearRange(it->begin(), it->end());
debugKeyRange("clearInvalidVersion", invalidVersion, clearRange);
storage->clear( clearRange );
data->byteSampleApplyClear( clearRange, invalidVersion );
}
}
}
validate(data, true);
return true;
}
Future<bool> StorageServerDisk::restoreDurableState() {
return ::restoreDurableState(data, storage);
}
//Determines whether a key-value pair should be included in a byte sample
//Also returns size information about the sample
ByteSampleInfo isKeyValueInSample(KeyValueRef keyValue) {
ByteSampleInfo info;
const KeyRef key = keyValue.key;
info.size = key.size() + keyValue.value.size();
uint32_t a = 0;
uint32_t b = 0;
hashlittle2( key.begin(), key.size(), &a, &b );
double probability = (double)info.size / (key.size() + SERVER_KNOBS->BYTE_SAMPLING_OVERHEAD) / SERVER_KNOBS->BYTE_SAMPLING_FACTOR;
info.inSample = a / ((1 << 30) * 4.0) < probability;
info.sampledSize = info.size / std::min(1.0, probability);
return info;
}
void StorageServer::addMutationToMutationLogOrStorage( Version ver, MutationRef m ) {
if (ver != invalidVersion) {
addMutationToMutationLog( addVersionToMutationLog(ver), m );
} else {
storage.writeMutation( m );
byteSampleApplyMutation( m, ver );
}
}
void StorageServer::byteSampleApplySet( KeyValueRef kv, Version ver ) {
// Update byteSample in memory and (eventually) on disk and notify waiting metrics
ByteSampleInfo sampleInfo = isKeyValueInSample(kv);
auto& byteSample = metrics.byteSample.sample;
int64_t delta = 0;
const KeyRef key = kv.key;
auto old = byteSample.find(key);
if (old != byteSample.end()) delta = -byteSample.getMetric(old);
if (sampleInfo.inSample) {
delta += sampleInfo.sampledSize;
byteSample.insert( key, sampleInfo.sampledSize );
addMutationToMutationLogOrStorage( ver, MutationRef(MutationRef::SetValue, key.withPrefix(persistByteSampleKeys.begin), BinaryWriter::toValue( sampleInfo.sampledSize, Unversioned() )) );
} else {
bool any = old != byteSample.end();
if(!byteSampleRecovery.isReady() ) {
if(!byteSampleClears.rangeContaining(key).value()) {
byteSampleClears.insert(key, true);
byteSampleClearsTooLarge.set(byteSampleClears.size() > SERVER_KNOBS->MAX_BYTE_SAMPLE_CLEAR_MAP_SIZE);
any = true;
}
}
if (any) {
byteSample.erase(old);
auto diskRange = singleKeyRange(key.withPrefix(persistByteSampleKeys.begin));
addMutationToMutationLogOrStorage( ver, MutationRef(MutationRef::ClearRange, diskRange.begin, diskRange.end) );
}
}
if (delta) metrics.notifyBytes( key, delta );
}
void StorageServer::byteSampleApplyClear( KeyRangeRef range, Version ver ) {
// Update byteSample in memory and (eventually) on disk via the mutationLog and notify waiting metrics
auto& byteSample = metrics.byteSample.sample;
bool any = false;
if(range.begin < allKeys.end) {
//NotifyBytes should not be called for keys past allKeys.end
KeyRangeRef searchRange = KeyRangeRef(range.begin, std::min(range.end, allKeys.end));
auto r = metrics.waitMetricsMap.intersectingRanges(searchRange);
for(auto shard = r.begin(); shard != r.end(); ++shard) {
KeyRangeRef intersectingRange = shard.range() & range;
int64_t bytes = byteSample.sumRange(intersectingRange.begin, intersectingRange.end);
metrics.notifyBytes(shard, -bytes);
any = any || bytes > 0;
}
}
if(range.end > allKeys.end && byteSample.sumRange(std::max(allKeys.end, range.begin), range.end) > 0)
any = true;
if(!byteSampleRecovery.isReady()) {
auto clearRanges = byteSampleClears.intersectingRanges(range);
for(auto it : clearRanges) {
if(!it.value()) {
byteSampleClears.insert(range, true);
byteSampleClearsTooLarge.set(byteSampleClears.size() > SERVER_KNOBS->MAX_BYTE_SAMPLE_CLEAR_MAP_SIZE);
any = true;
break;
}
}
}
if (any) {
byteSample.eraseAsync( range.begin, range.end );
auto diskRange = range.withPrefix( persistByteSampleKeys.begin );
addMutationToMutationLogOrStorage( ver, MutationRef(MutationRef::ClearRange, diskRange.begin, diskRange.end) );
}
}
ACTOR Future<Void> waitMetrics( StorageServerMetrics* self, WaitMetricsRequest req, Future<Void> timeout ) {
state PromiseStream< StorageMetrics > change;
state StorageMetrics metrics = self->getMetrics( req.keys );
state Error error = success();
state bool timedout = false;
if ( !req.min.allLessOrEqual( metrics ) || !metrics.allLessOrEqual( req.max ) ) {
TEST( true ); // ShardWaitMetrics return case 1 (quickly)
req.reply.send( metrics );
return Void();
}
auto rs = self->waitMetricsMap.modify( req.keys );
for(auto r = rs.begin(); r != rs.end(); ++r)
r->value().push_back( change );
loop {
try {
choose {
when( StorageMetrics c = waitNext( change.getFuture() ) ) {
metrics += c;
// SOMEDAY: validation! The changes here are possibly partial changes (we recieve multiple messages per
// update to our requested range). This means that the validation would have to occur after all
// the messages for one clear or set have been dispatched.
/*StorageMetrics m = getMetrics( data, req.keys );
bool b = ( m.bytes != metrics.bytes || m.bytesPerKSecond != metrics.bytesPerKSecond || m.iosPerKSecond != metrics.iosPerKSecond );
if (b) {
printf("keys: '%s' - '%s' @%p\n", printable(req.keys.begin).c_str(), printable(req.keys.end).c_str(), this);
printf("waitMetrics: desync %d (%lld %lld %lld) != (%lld %lld %lld); +(%lld %lld %lld)\n", b, m.bytes, m.bytesPerKSecond, m.iosPerKSecond, metrics.bytes, metrics.bytesPerKSecond, metrics.iosPerKSecond, c.bytes, c.bytesPerKSecond, c.iosPerKSecond);
}*/
}
when( Void _ = wait( timeout ) ) {
timedout = true;
}
}
} catch (Error& e) {
if( e.code() == error_code_actor_cancelled ) throw; // This is only cancelled when the main loop had exited...no need in this case to clean up self
error = e;
break;
}
if ( timedout || !req.min.allLessOrEqual( metrics ) || !metrics.allLessOrEqual( req.max ) ) {
TEST( !timedout ); // ShardWaitMetrics return case 2 (delayed)
TEST( timedout ); // ShardWaitMetrics return on timeout
req.reply.send( metrics );
break;
}
}
Void _ = wait( delay(0) ); //prevent iterator invalidation of functions sending changes
auto rs = self->waitMetricsMap.modify( req.keys );
for(auto i = rs.begin(); i != rs.end(); ++i) {
auto &x = i->value();
for( int j = 0; j < x.size(); j++ ) {
if( x[j] == change ) {
std::swap( x[j], x.back() );
x.pop_back();
break;
}
}
}
self->waitMetricsMap.coalesce( req.keys );
if (error.code() != error_code_success ) {
if (error.code() != error_code_wrong_shard_server) throw error;
TEST( true ); // ShardWaitMetrics delayed wrong_shard_server()
req.reply.sendError(error);
}
return Void();
}
Future<Void> StorageServerMetrics::waitMetrics(WaitMetricsRequest req, Future<Void> delay) {
return ::waitMetrics(this, req, delay);
}
#pragma endregion
/////////////////////////////// Core //////////////////////////////////////
#pragma region Core
ACTOR Future<Void> metricsCore( StorageServer* self, StorageServerInterface ssi ) {
state Future<Void> doPollMetrics = Void();
state ActorCollection actors(false);
Void _ = wait( self->byteSampleRecovery );
actors.add(traceCounters("StorageMetrics", self->thisServerID, SERVER_KNOBS->STORAGE_LOGGING_DELAY, &self->counters.cc, self->thisServerID.toString() + "/StorageMetrics"));
loop {
choose {
when (WaitMetricsRequest req = waitNext(ssi.waitMetrics.getFuture())) {
if (!self->isReadable( req.keys )) {
TEST( true ); // waitMetrics immediate wrong_shard_server()
req.reply.sendError(wrong_shard_server());
} else {
actors.add( self->metrics.waitMetrics( req, delayJittered( SERVER_KNOBS->STORAGE_METRIC_TIMEOUT ) ) );
}
}
when (SplitMetricsRequest req = waitNext(ssi.splitMetrics.getFuture())) {
if (!self->isReadable( req.keys )) {
TEST( true ); // splitMetrics immediate wrong_shard_server()
req.reply.sendError(wrong_shard_server());
} else {
self->metrics.splitMetrics( req );
}
}
when (GetPhysicalMetricsRequest req = waitNext(ssi.getPhysicalMetrics.getFuture())) {
StorageBytes sb = self->storage.getStorageBytes();
self->metrics.getPhysicalMetrics( req, sb );
}
when (Void _ = wait(doPollMetrics) ) {
self->metrics.poll();
doPollMetrics = delay(SERVER_KNOBS->STORAGE_SERVER_POLL_METRICS_DELAY);
}
when(Void _ = wait(actors.getResult())) {}
}
}
}
ACTOR Future<Void> logLongByteSampleRecovery(Future<Void> recovery) {
choose {
when(Void _ = wait(recovery)) {}
when(Void _ = wait(delay(SERVER_KNOBS->LONG_BYTE_SAMPLE_RECOVERY_DELAY))) {
TraceEvent(g_network->isSimulated() ? SevWarn : SevWarnAlways, "LongByteSampleRecovery");
}
}
return Void();
}
ACTOR Future<Void> storageServerCore( StorageServer* self, StorageServerInterface ssi )
{
state Future<Void> doUpdate = Void();
state bool updateReceived = false; // true iff the current update() actor assigned to doUpdate has already received an update from the tlog
state ActorCollection actors(false);
state double lastLoopTopTime = now();
state Future<Void> dbInfoChange = Void();
state Future<Void> checkLastUpdate = Void();
actors.add(updateStorage(self));
actors.add(waitFailureServer(ssi.waitFailure.getFuture()));
actors.add(self->otherError.getFuture());
actors.add(metricsCore(self, ssi));
actors.add(logLongByteSampleRecovery(self->byteSampleRecovery));
self->coreStarted.send( Void() );
loop {
++self->counters.loops;
double loopTopTime = now();
double elapsedTime = loopTopTime - lastLoopTopTime;
if( elapsedTime > 0.050 ) {
if (g_random->random01() < 0.01)
TraceEvent(SevWarn, "SlowSSLoopx100", self->thisServerID).detail("Elapsed", elapsedTime);
}
lastLoopTopTime = loopTopTime;
choose {
when( Void _ = wait( checkLastUpdate ) ) {
if(now() - self->lastUpdate >= CLIENT_KNOBS->NO_RECENT_UPDATES_DURATION) {
self->noRecentUpdates.set(true);
checkLastUpdate = delay(CLIENT_KNOBS->NO_RECENT_UPDATES_DURATION);
} else {
checkLastUpdate = delay( std::max(CLIENT_KNOBS->NO_RECENT_UPDATES_DURATION-(now()-self->lastUpdate), 0.1) );
}
}
when( Void _ = wait( dbInfoChange ) ) {
TEST( self->logSystem ); // shardServer dbInfo changed
dbInfoChange = self->db->onChange();
if( self->db->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS ) {
self->logSystem = ILogSystem::fromServerDBInfo( self->thisServerID, self->db->get() );
if (self->logSystem) {
if(self->db->get().logSystemConfig.recoveredAt.present()) {
self->poppedAllAfter = self->db->get().logSystemConfig.recoveredAt.get();
}
self->logCursor = self->logSystem->peekSingle( self->thisServerID, self->version.get() + 1, self->tag, self->history );
self->popVersion( self->durableVersion.get() + 1, true );
}
// If update() is waiting for results from the tlog, it might never get them, so needs to be cancelled. But if it is waiting later,
// cancelling it could cause problems (e.g. fetchKeys that already committed to transitioning to waiting state)
if (!updateReceived) {
doUpdate = Void();
}
}
}
when( GetValueRequest req = waitNext(ssi.getValue.getFuture()) ) {
// Warning: This code is executed at extremely high priority (TaskLoadBalancedEndpoint), so downgrade before doing real work
if( req.debugID.present() )
g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "storageServer.recieved"); //.detail("TaskID", g_network->getCurrentTask());
if (SHORT_CIRCUT_ACTUAL_STORAGE && normalKeys.contains(req.key))
req.reply.send(GetValueReply());
else
actors.add( getValueQ( self, req ) );
}
when( WatchValueRequest req = waitNext(ssi.watchValue.getFuture()) ) {
// TODO: fast load balancing?
// SOMEDAY: combine watches for the same key/value into a single watch
actors.add( watchValueQ( self, req ) );
}
when (GetKeyRequest req = waitNext(ssi.getKey.getFuture())) {
// Warning: This code is executed at extremely high priority (TaskLoadBalancedEndpoint), so downgrade before doing real work
actors.add( getKey( self, req ) );
}
when (GetKeyValuesRequest req = waitNext(ssi.getKeyValues.getFuture()) ) {
// Warning: This code is executed at extremely high priority (TaskLoadBalancedEndpoint), so downgrade before doing real work
actors.add( getKeyValues( self, req ) );
}
when (GetShardStateRequest req = waitNext(ssi.getShardState.getFuture()) ) {
if (req.mode == GetShardStateRequest::NO_WAIT ) {
if( self->isReadable( req.keys ) )
req.reply.send(std::make_pair(self->version.get(),self->durableVersion.get()));
else
req.reply.sendError(wrong_shard_server());
} else {
actors.add( getShardStateQ( self, req ) );
}
}
when (StorageQueuingMetricsRequest req = waitNext(ssi.getQueuingMetrics.getFuture())) {
getQueuingMetrics(self, req);
}
when( ReplyPromise<Version> reply = waitNext(ssi.getVersion.getFuture()) ) {
reply.send( self->version.get() );
}
when( ReplyPromise<KeyValueStoreType> reply = waitNext(ssi.getKeyValueStoreType.getFuture()) ) {
reply.send( self->storage.getKeyValueStoreType() );
}
when( Void _ = wait(doUpdate) ) {
updateReceived = false;
if (!self->logSystem)
doUpdate = Never();
else
doUpdate = update( self, &updateReceived );
}
when(Void _ = wait(actors.getResult())) {}
}
}
}
bool storageServerTerminated(StorageServer& self, IKeyValueStore* persistentData, Error const& e) {
self.shuttingDown = true;
// Clearing shards shuts down any fetchKeys actors; these may do things on cancellation that are best done with self still valid
self.shards.insert( allKeys, Reference<ShardInfo>() );
// Dispose the IKVS (destroying its data permanently) only if this shutdown is definitely permanent. Otherwise just close it.
if (e.code() == error_code_worker_removed || e.code() == error_code_recruitment_failed)
persistentData->dispose();
else
persistentData->close();
if ( e.code() == error_code_worker_removed ||
e.code() == error_code_recruitment_failed ||
e.code() == error_code_file_not_found ||
e.code() == error_code_actor_cancelled )
{
TraceEvent("StorageServerTerminated", self.thisServerID).error(e, true);
return true;
} else
return false;
}
ACTOR Future<Void> storageServer( IKeyValueStore* persistentData, StorageServerInterface ssi, Tag seedTag, ReplyPromise<InitializeStorageReply> recruitReply,
Reference<AsyncVar<ServerDBInfo>> db, std::string folder )
{
state StorageServer self(persistentData, db, ssi);
self.sk = serverKeysPrefixFor( self.thisServerID ).withPrefix(systemKeys.begin); // FFFF/serverKeys/[this server]/
self.folder = folder;
try {
if (seedTag == invalidTag) {
std::pair<Version, Tag> verAndTag = wait( addStorageServer(self.cx, ssi) ); // Might throw recruitment_failed in case of simultaneous master failure
self.tag = verAndTag.second;
self.setInitialVersion( verAndTag.first-1 );
} else {
self.tag = seedTag;
}
self.storage.makeNewStorageServerDurable();
Void _ = wait( self.storage.commit() );
TraceEvent("StorageServerInit", ssi.id()).detail("Version", self.version.get()).detail("SeedTag", seedTag.toString());
InitializeStorageReply rep;
rep.interf = ssi;
rep.addedVersion = self.version.get();
recruitReply.send(rep);
self.byteSampleRecovery = Void();
Void _ = wait( storageServerCore(&self, ssi) );
throw internal_error();
} catch (Error& e) {
// If we die with an error before replying to the recruitment request, send the error to the recruiter (ClusterController, and from there to the DataDistributionTeamCollection)
if (!recruitReply.isSet())
recruitReply.sendError( recruitment_failed() );
if (storageServerTerminated(self, persistentData, e))
return Void();
throw e;
}
}
ACTOR Future<Void> replaceInterface( StorageServer* self, StorageServerInterface ssi )
{
state Transaction tr(self->cx);
loop {
state Future<Void> infoChanged = self->db->onChange();
state Reference<MultiInterface<MasterProxyInterface>> proxies( new MultiInterface<MasterProxyInterface>(self->db->get().client.proxies, self->db->get().myLocality, ALWAYS_FRESH) );
choose {
when( GetStorageServerRejoinInfoReply _rep = wait( proxies->size() ? loadBalance( proxies, &MasterProxyInterface::getStorageServerRejoinInfo, GetStorageServerRejoinInfoRequest(ssi.id(), ssi.locality.dcId()) ) : Never() ) ) {
state GetStorageServerRejoinInfoReply rep = _rep;
try {
tr.reset();
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setVersion( rep.version );
tr.addReadConflictRange(singleKeyRange(serverListKeyFor(ssi.id())));
tr.addReadConflictRange(singleKeyRange(serverTagKeyFor(ssi.id())));
tr.addReadConflictRange(serverTagHistoryRangeFor(ssi.id()));
tr.addReadConflictRange(singleKeyRange(tagLocalityListKeyFor(ssi.locality.dcId())));
tr.set(serverListKeyFor(ssi.id()), serverListValue(ssi));
if(rep.newLocality) {
tr.addReadConflictRange(tagLocalityListKeys);
tr.set( tagLocalityListKeyFor(ssi.locality.dcId()), tagLocalityListValue(rep.newTag.get().locality) );
}
if(rep.newTag.present()) {
KeyRange conflictRange = singleKeyRange(serverTagConflictKeyFor(rep.newTag.get()));
tr.addReadConflictRange( conflictRange );
tr.addWriteConflictRange( conflictRange );
tr.setOption(FDBTransactionOptions::FIRST_IN_BATCH);
tr.set( serverTagKeyFor(ssi.id()), serverTagValue(rep.newTag.get()) );
tr.atomicOp( serverTagHistoryKeyFor(ssi.id()), serverTagValue(rep.tag), MutationRef::SetVersionstampedKey );
}
if(rep.history.size() && rep.history.back().first < self->version.get()) {
tr.clear(serverTagHistoryRangeBefore(ssi.id(), self->version.get()));
}
choose {
when ( Void _ = wait( tr.commit() ) ) {
self->history = rep.history;
while(self->history.size() && self->history.back().first < self->version.get() ) {
self->history.pop_back();
}
if(rep.newTag.present()) {
self->tag = rep.newTag.get();
self->history.insert(self->history.begin(), std::make_pair(tr.getCommittedVersion(), rep.tag));
} else {
self->tag = rep.tag;
}
self->allHistory = self->history;
TraceEvent("SSTag", self->thisServerID).detail("MyTag", self->tag.toString());
for(auto it : self->history) {
TraceEvent("SSHistory", self->thisServerID).detail("Ver", it.first).detail("Tag", it.second.toString());
}
if(self->history.size() && BUGGIFY) {
TraceEvent("SSHistoryReboot", self->thisServerID);
throw please_reboot();
}
break;
}
when ( Void _ = wait(infoChanged) ) {}
}
} catch (Error& e) {
Void _ = wait( tr.onError(e) );
}
}
when ( Void _ = wait(infoChanged) ) {}
}
}
return Void();
}
ACTOR Future<Void> storageServer( IKeyValueStore* persistentData, StorageServerInterface ssi, Reference<AsyncVar<ServerDBInfo>> db, std::string folder, Promise<Void> recovered )
{
state StorageServer self(persistentData, db, ssi);
self.folder = folder;
self.sk = serverKeysPrefixFor( self.thisServerID ).withPrefix(systemKeys.begin); // FFFF/serverKeys/[this server]/
try {
state double start = now();
TraceEvent("StorageServerRebootStart", self.thisServerID);
bool ok = wait( self.storage.restoreDurableState() );
if (!ok) {
if(recovered.canBeSet()) recovered.send(Void());
return Void();
}
TraceEvent("SSTimeRestoreDurableState", self.thisServerID).detail("TimeTaken", now() - start);
ASSERT( self.thisServerID == ssi.id() );
TraceEvent("StorageServerReboot", self.thisServerID)
.detail("Version", self.version.get());
if(recovered.canBeSet()) recovered.send(Void());
Void _ = wait( replaceInterface( &self, ssi ) );
TraceEvent("StorageServerStartingCore", self.thisServerID).detail("TimeTaken", now() - start);
//Void _ = wait( delay(0) ); // To make sure self->zkMasterInfo.onChanged is available to wait on
Void _ = wait( storageServerCore(&self, ssi) );
throw internal_error();
} catch (Error& e) {
if(recovered.canBeSet()) recovered.send(Void());
if (storageServerTerminated(self, persistentData, e))
return Void();
throw e;
}
}
#pragma endregion
/*
4 Reference count
4 priority
24 pointers
8 lastUpdateVersion
2 updated, replacedPointer
--
42 PTree overhead
8 Version insertVersion
--
50 VersionedMap overhead
12 KeyRef
12 ValueRef
1 isClear
--
25 payload
50 overhead
25 payload
21 structure padding
32 allocator rounds up
---
128 allocated
To reach 64, need to save: 11 bytes + all padding
Possibilities:
-8 Combine lastUpdateVersion, insertVersion?
-2 Fold together updated, replacedPointer, isClear bits
-3 Fold away updated, replacedPointer, isClear
-8 Move value lengths into arena
-4 Replace priority with H(pointer)
-12 Compress pointers (using special allocator)
-4 Modular lastUpdateVersion (make sure no node survives 4 billion updates)
*/
void versionedMapTest() {
VersionedMap<int,int> vm;
printf("SS Ptree node is %zu bytes\n", sizeof( StorageServer::VersionedData::PTreeT ) );
const int NSIZE = sizeof(VersionedMap<int,int>::PTreeT);
const int ASIZE = NSIZE<=64 ? 64 : NextPowerOfTwo<NSIZE>::Result;
auto before = FastAllocator< ASIZE >::getMemoryUsed();
for(int v=1; v<=1000; ++v) {
vm.createNewVersion(v);
for(int i=0; i<1000; i++) {
int k = g_random->randomInt(0, 2000000);
/*for(int k2=k-5; k2<k+5; k2++)
if (vm.atLatest().find(k2) != vm.atLatest().end())
vm.erase(k2);*/
vm.erase( k-5, k+5 );
vm.insert( k, v );
}
}
auto after = FastAllocator< ASIZE >::getMemoryUsed();
int count = 0;
for(auto i = vm.atLatest().begin(); i != vm.atLatest().end(); ++i)
++count;
printf("PTree node is %d bytes, allocated as %d bytes\n", NSIZE, ASIZE);
printf("%d distinct after %d insertions\n", count, 1000*1000);
printf("Memory used: %f MB\n",
(after - before)/ 1e6);
}