Merge remote-tracking branch 'upstream/master' into flowlock-api

commit ea6898144d
Author: Alex Miller
Date:   2019-07-03 20:44:15 -07:00

16 changed files with 178 additions and 87 deletions

View File

@@ -60,7 +60,7 @@ services:
snapshot-cmake: &snapshot-cmake
<<: *build-setup
command: scl enable devtoolset-8 python27 rh-python36 rh-ruby24 -- bash -c 'mkdir -p "$${BUILD_DIR}" && cd "$${BUILD_DIR}" && cmake -DFDB_RELEASE=0 -DVALGRIND=0 /__this_is_some_very_long_name_dir_needed_to_fix_a_bug_with_debug_rpms__/foundationdb && make -j "$${MAKEJOBS}" packages preinstall && cpack'
command: scl enable devtoolset-8 python27 rh-python36 rh-ruby24 -- bash -c 'mkdir -p "$${BUILD_DIR}" && cd "$${BUILD_DIR}" && cmake -DCMAKE_COLOR_MAKEFILE=0 -DFDB_RELEASE=0 -DVALGRIND=0 /__this_is_some_very_long_name_dir_needed_to_fix_a_bug_with_debug_rpms__/foundationdb && make -j "$${MAKEJOBS}" packages preinstall && cpack'
prb-cmake:
<<: *snapshot-cmake
@@ -68,7 +68,7 @@ services:
snapshot-ctest: &snapshot-ctest
<<: *build-setup
command: scl enable devtoolset-8 python27 rh-python36 rh-ruby24 -- bash -c 'mkdir -p "$${BUILD_DIR}" && cd "$${BUILD_DIR}" && cmake -DFDB_RELEASE=1 /__this_is_some_very_long_name_dir_needed_to_fix_a_bug_with_debug_rpms__/foundationdb && make -j "$${MAKEJOBS}" && ctest -L fast -j "$${MAKEJOBS}" --output-on-failure'
command: scl enable devtoolset-8 python27 rh-python36 rh-ruby24 -- bash -c 'mkdir -p "$${BUILD_DIR}" && cd "$${BUILD_DIR}" && cmake -DCMAKE_COLOR_MAKEFILE=0 -DFDB_RELEASE=1 /__this_is_some_very_long_name_dir_needed_to_fix_a_bug_with_debug_rpms__/foundationdb && make -j "$${MAKEJOBS}" && ctest -L fast -j "$${MAKEJOBS}" --output-on-failure'
prb-ctest:
<<: *snapshot-ctest
@@ -76,7 +76,7 @@ services:
snapshot-correctness: &snapshot-correctness
<<: *build-setup
command: scl enable devtoolset-8 python27 rh-python36 rh-ruby24 -- bash -c 'mkdir -p "$${BUILD_DIR}" && cd "$${BUILD_DIR}" && cmake -DFDB_RELEASE=1 /__this_is_some_very_long_name_dir_needed_to_fix_a_bug_with_debug_rpms__/foundationdb && make -j "$${MAKEJOBS}" && ctest -j "$${MAKEJOBS}" --output-on-failure'
command: scl enable devtoolset-8 python27 rh-python36 rh-ruby24 -- bash -c 'mkdir -p "$${BUILD_DIR}" && cd "$${BUILD_DIR}" && cmake -DCMAKE_COLOR_MAKEFILE=0 -DFDB_RELEASE=1 /__this_is_some_very_long_name_dir_needed_to_fix_a_bug_with_debug_rpms__/foundationdb && make -j "$${MAKEJOBS}" && ctest -j "$${MAKEJOBS}" --output-on-failure'
prb-correctness:
<<: *snapshot-correctness

View File

@@ -161,7 +161,6 @@ else()
-Wno-deprecated
-fvisibility=hidden
-Wreturn-type
-fdiagnostics-color=always
-fPIC)
if (GPERFTOOLS_FOUND AND GCC)
add_compile_options(

View File

@@ -180,12 +180,12 @@ function(add_flow_target)
list(APPEND generated_files ${CMAKE_CURRENT_BINARY_DIR}/${generated})
if(WIN32)
add_custom_command(OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/${generated}"
COMMAND $<TARGET_FILE:actorcompiler> "${CMAKE_CURRENT_SOURCE_DIR}/${src}" "${CMAKE_CURRENT_BINARY_DIR}/${generated}" ${actor_compiler_flags} ${actor_compiler_flags}
COMMAND $<TARGET_FILE:actorcompiler> "${CMAKE_CURRENT_SOURCE_DIR}/${src}" "${CMAKE_CURRENT_BINARY_DIR}/${generated}" ${actor_compiler_flags}
DEPENDS "${CMAKE_CURRENT_SOURCE_DIR}/${src}" actorcompiler
COMMENT "Compile actor: ${src}")
else()
add_custom_command(OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/${generated}"
COMMAND ${MONO_EXECUTABLE} ${actor_exe} "${CMAKE_CURRENT_SOURCE_DIR}/${src}" "${CMAKE_CURRENT_BINARY_DIR}/${generated}" ${actor_compiler_flags} ${actor_compiler_flags} > /dev/null
COMMAND ${MONO_EXECUTABLE} ${actor_exe} "${CMAKE_CURRENT_SOURCE_DIR}/${src}" "${CMAKE_CURRENT_BINARY_DIR}/${generated}" ${actor_compiler_flags} > /dev/null
DEPENDS "${CMAKE_CURRENT_SOURCE_DIR}/${src}" actorcompiler
COMMENT "Compile actor: ${src}")
endif()

View File

@@ -14,6 +14,8 @@ Performance
Fixes
-----
* If a cluster is upgraded during an ``onError`` call, the cluster could return a ``cluster_version_changed`` error. `(PR #1734) <https://github.com/apple/foundationdb/pull/1734>`_.
Status
------

View File

@@ -588,7 +588,20 @@ ThreadFuture<Void> MultiVersionTransaction::onError(Error const& e) {
else {
auto tr = getTransaction();
auto f = tr.transaction ? tr.transaction->onError(e) : ThreadFuture<Void>(Never());
return abortableFuture(f, tr.onChange);
f = abortableFuture(f, tr.onChange);
return flatMapThreadFuture<Void, Void>(f, [this, e](ErrorOr<Void> ready) {
if(!ready.isError() || ready.getError().code() != error_code_cluster_version_changed) {
if(ready.isError()) {
return ErrorOr<ThreadFuture<Void>>(ready.getError());
}
return ErrorOr<ThreadFuture<Void>>(Void());
}
updateTransaction();
return ErrorOr<ThreadFuture<Void>>(onError(e));
});
}
}
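This hunk is the fix behind the release note above: if the wrapped onError fails with cluster_version_changed (a cluster upgrade raced the call), the multi-version client rebuilds its underlying transaction and retries onError instead of surfacing the error. A condensed sketch of the control flow, where underlyingOnError is an illustrative stand-in for the tr.transaction ? tr.transaction->onError(e) : Never() branch:

    ThreadFuture<Void> f = abortableFuture(underlyingOnError(e), tr.onChange);
    return flatMapThreadFuture<Void, Void>(f, [this, e](ErrorOr<Void> ready) {
        // An upgrade invalidated the transaction: rebuild it and retry onError.
        if (ready.isError() && ready.getError().code() == error_code_cluster_version_changed) {
            updateTransaction();
            return ErrorOr<ThreadFuture<Void>>(onError(e));
        }
        // Otherwise pass the original outcome through unchanged.
        return ready.isError() ? ErrorOr<ThreadFuture<Void>>(ready.getError())
                               : ErrorOr<ThreadFuture<Void>>(Void());
    });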

View File

@@ -20,8 +20,9 @@
#include "fdbserver/CoordinationInterface.h"
#include "fdbserver/IKeyValueStore.h"
#include "flow/ActorCollection.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "flow/ActorCollection.h"
#include "flow/UnitTest.h"
#include "flow/IndexedSet.h"
#include "flow/actorcompiler.h" // This must be the last #include.
@@ -360,11 +361,11 @@ struct LeaderRegisterCollection {
return Void();
}
LeaderElectionRegInterface& getInterface(KeyRef key) {
LeaderElectionRegInterface& getInterface(KeyRef key, UID id) {
auto i = registerInterfaces.find( key );
if (i == registerInterfaces.end()) {
Key k = key;
Future<Void> a = wrap(this, k, leaderRegister(registerInterfaces[k], k) );
Future<Void> a = wrap(this, k, leaderRegister(registerInterfaces[k], k), id);
if (a.isError()) throw a.getError();
ASSERT( !a.isReady() );
actors.add( a );
@@ -374,11 +375,15 @@ struct LeaderRegisterCollection {
return i->value;
}
ACTOR static Future<Void> wrap( LeaderRegisterCollection* self, Key key, Future<Void> actor ) {
ACTOR static Future<Void> wrap( LeaderRegisterCollection* self, Key key, Future<Void> actor, UID id ) {
state Error e;
try {
// FIXME: Get worker ID here
startRole(Role::COORDINATOR, id, UID());
wait(actor);
endRole(Role::COORDINATOR, id, "Coordinator changed");
} catch (Error& err) {
endRole(Role::COORDINATOR, id, err.what(), err.code() == error_code_actor_cancelled, err);
if (err.code() == error_code_actor_cancelled)
throw;
e = err;
@@ -392,7 +397,7 @@ struct LeaderRegisterCollection {
// leaderServer multiplexes multiple leaderRegisters onto a single LeaderElectionRegInterface,
// creating and destroying them on demand.
ACTOR Future<Void> leaderServer(LeaderElectionRegInterface interf, OnDemandStore *pStore) {
ACTOR Future<Void> leaderServer(LeaderElectionRegInterface interf, OnDemandStore *pStore, UID id) {
state LeaderRegisterCollection regs( pStore );
state ActorCollection forwarders(false);
@@ -404,21 +409,21 @@ ACTOR Future<Void> leaderServer(LeaderElectionRegInterface interf, OnDemandStore
if( forward.present() )
req.reply.send( forward.get() );
else
regs.getInterface(req.key).getLeader.send( req );
regs.getInterface(req.key, id).getLeader.send( req );
}
when ( CandidacyRequest req = waitNext( interf.candidacy.getFuture() ) ) {
Optional<LeaderInfo> forward = regs.getForward(req.key);
if( forward.present() )
req.reply.send( forward.get() );
else
regs.getInterface(req.key).candidacy.send(req);
regs.getInterface(req.key, id).candidacy.send(req);
}
when ( LeaderHeartbeatRequest req = waitNext( interf.leaderHeartbeat.getFuture() ) ) {
Optional<LeaderInfo> forward = regs.getForward(req.key);
if( forward.present() )
req.reply.send( false );
else
regs.getInterface(req.key).leaderHeartbeat.send(req);
regs.getInterface(req.key, id).leaderHeartbeat.send(req);
}
when ( ForwardRequest req = waitNext( interf.forward.getFuture() ) ) {
Optional<LeaderInfo> forward = regs.getForward(req.key);
@@ -426,7 +431,7 @@ ACTOR Future<Void> leaderServer(LeaderElectionRegInterface interf, OnDemandStore
req.reply.send( Void() );
else {
forwarders.add( LeaderRegisterCollection::setForward( &regs, req.key, ClusterConnectionString(req.conn.toString()) ) );
regs.getInterface(req.key).forward.send(req);
regs.getInterface(req.key, id).forward.send(req);
}
}
when( wait( forwarders.getResult() ) ) { ASSERT(false); throw internal_error(); }
@@ -442,7 +447,7 @@ ACTOR Future<Void> coordinationServer(std::string dataFolder) {
TraceEvent("CoordinationServer", myID).detail("MyInterfaceAddr", myInterface.read.getEndpoint().getPrimaryAddress()).detail("Folder", dataFolder);
try {
wait( localGenerationReg(myInterface, &store) || leaderServer(myLeaderInterface, &store) || store.getError() );
wait( localGenerationReg(myInterface, &store) || leaderServer(myLeaderInterface, &store, myID) || store.getError() );
throw internal_error();
} catch (Error& e) {
TraceEvent("CoordinationServerError", myID).error(e, true);

View File

@@ -69,6 +69,7 @@
// // Retrieves the previously stored boolean
// bool getPrefixSource() const;
//
#pragma pack(push,1)
template <typename T, typename DeltaT = typename T::Delta, typename OffsetT = uint16_t>
struct DeltaTree {
@@ -76,36 +77,47 @@ struct DeltaTree {
return std::numeric_limits<OffsetT>::max();
};
#pragma pack(push,1)
struct Node {
OffsetT leftChildOffset;
OffsetT rightChildOffset;
DeltaT delta[0];
inline DeltaT & delta() {
return *(DeltaT *)(this + 1);
};
inline const DeltaT & delta() const {
return *(const DeltaT *)(this + 1);
};
Node * rightChild() const {
//printf("Node(%p): leftOffset=%d rightOffset=%d deltaSize=%d\n", this, (int)leftChildOffset, (int)rightChildOffset, (int)delta->size());
return rightChildOffset == 0 ? nullptr : (Node *)((uint8_t *)delta + rightChildOffset);
//printf("Node(%p): leftOffset=%d rightOffset=%d deltaSize=%d\n", this, (int)leftChildOffset, (int)rightChildOffset, (int)delta().size());
return rightChildOffset == 0 ? nullptr : (Node *)((uint8_t *)&delta() + rightChildOffset);
}
Node * leftChild() const {
//printf("Node(%p): leftOffset=%d rightOffset=%d deltaSize=%d\n", this, (int)leftChildOffset, (int)rightChildOffset, (int)delta->size());
return leftChildOffset == 0 ? nullptr : (Node *)((uint8_t *)delta + leftChildOffset);
//printf("Node(%p): leftOffset=%d rightOffset=%d deltaSize=%d\n", this, (int)leftChildOffset, (int)rightChildOffset, (int)delta().size());
return leftChildOffset == 0 ? nullptr : (Node *)((uint8_t *)&delta() + leftChildOffset);
}
int size() const {
return sizeof(Node) + delta->size();
return sizeof(Node) + delta().size();
}
};
#pragma pack(pop)
#pragma pack(push,1)
struct {
OffsetT nodeBytes; // Total size of all Nodes including the root
uint8_t initialDepth; // Levels in the tree as of the last rebuild
Node root[0];
};
#pragma pack(pop)
inline Node & root() {
return *(Node *)(this + 1);
}
inline const Node & root() const {
return *(const Node *)(this + 1);
}
int size() const {
return sizeof(DeltaTree) + nodeBytes;
}
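The recurring edit in this file replaces zero-length trailing array members (DeltaT delta[0], Node root[0]) with accessor functions that compute the address just past the fixed-size fields. Zero-length arrays are a compiler extension rather than standard C++; under #pragma pack(push,1) the variable-length payload begins exactly at (this + 1), so the accessor form is equivalent and portable. A minimal sketch of the pattern with illustrative names:

    #pragma pack(push, 1)
    struct Header {
        uint16_t payloadBytes;  // fixed-size fields only
        // payload() replaces "uint8_t payload[0];" -- same address, portable C++
        uint8_t * payload() { return (uint8_t *)(this + 1); }
        const uint8_t * payload() const { return (const uint8_t *)(this + 1); }
        int size() const { return sizeof(Header) + payloadBytes; }
    };
    #pragma pack(pop)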
@@ -119,18 +131,18 @@ public:
struct DecodedNode {
DecodedNode(Node *raw, const T *prev, const T *next, Arena &arena)
: raw(raw), parent(nullptr), left(nullptr), right(nullptr), prev(prev), next(next),
item(raw->delta->apply(raw->delta->getPrefixSource() ? *prev : *next, arena))
item(raw->delta().apply(raw->delta().getPrefixSource() ? *prev : *next, arena))
{
//printf("DecodedNode1 raw=%p delta=%s\n", raw, raw->delta->toString().c_str());
//printf("DecodedNode1 raw=%p delta=%s\n", raw, raw->delta().toString().c_str());
}
DecodedNode(Node *raw, DecodedNode *parent, bool left, Arena &arena)
: parent(parent), raw(raw), left(nullptr), right(nullptr),
prev(left ? parent->prev : &parent->item),
next(left ? &parent->item : parent->next),
item(raw->delta->apply(raw->delta->getPrefixSource() ? *prev : *next, arena))
item(raw->delta().apply(raw->delta().getPrefixSource() ? *prev : *next, arena))
{
//printf("DecodedNode2 raw=%p delta=%s\n", raw, raw->delta->toString().c_str());
//printf("DecodedNode2 raw=%p delta=%s\n", raw, raw->delta().toString().c_str());
}
Node *raw;
@@ -175,7 +187,7 @@ public:
lower = new(arena) T(arena, *lower);
upper = new(arena) T(arena, *upper);
root = (tree->nodeBytes == 0) ? nullptr : new (arena) DecodedNode(tree->root, lower, upper, arena);
root = (tree->nodeBytes == 0) ? nullptr : new (arena) DecodedNode(&tree->root(), lower, upper, arena);
}
const T *lowerBound() const {
@@ -330,7 +342,7 @@ public:
// The boundary leading to the new page acts as the last time we branched right
if(begin != end) {
nodeBytes = build(*root, begin, end, prev, next);
nodeBytes = build(root(), begin, end, prev, next);
}
else {
nodeBytes = 0;
@@ -341,7 +353,7 @@ public:
private:
static OffsetT build(Node &root, const T *begin, const T *end, const T *prev, const T *next) {
//printf("build: %s to %s\n", begin->toString().c_str(), (end - 1)->toString().c_str());
//printf("build: root at %p sizeof(Node) %d delta at %p \n", &root, sizeof(Node), root.delta);
//printf("build: root at %p sizeof(Node) %d delta at %p \n", &root, sizeof(Node), &root.delta());
ASSERT(end != begin);
int count = end - begin;
@@ -370,12 +382,12 @@ private:
base = next;
}
int deltaSize = item.writeDelta(*root.delta, *base, commonPrefix);
root.delta->setPrefixSource(prefixSourcePrev);
//printf("Serialized %s to %p\n", item.toString().c_str(), root.delta);
int deltaSize = item.writeDelta(root.delta(), *base, commonPrefix);
root.delta().setPrefixSource(prefixSourcePrev);
//printf("Serialized %s to %p\n", item.toString().c_str(), &root.delta());
// Continue writing after the serialized Delta.
uint8_t *wptr = (uint8_t *)root.delta + deltaSize;
uint8_t *wptr = (uint8_t *)&root.delta() + deltaSize;
// Serialize left child
if(count > 1) {
@@ -388,7 +400,7 @@ private:
// Serialize right child
if(count > 2) {
root.rightChildOffset = wptr - (uint8_t *)root.delta;
root.rightChildOffset = wptr - (uint8_t *)&root.delta();
wptr += build(*(Node *)wptr, begin + mid + 1, end, &item, next);
}
else {

View File

@@ -715,7 +715,7 @@ KeyValueStoreMemory::KeyValueStoreMemory( IDiskQueue* log, UID id, int64_t memor
IKeyValueStore* keyValueStoreMemory( std::string const& basename, UID logID, int64_t memoryLimit, std::string ext ) {
TraceEvent("KVSMemOpening", logID).detail("Basename", basename).detail("MemoryLimit", memoryLimit);
IDiskQueue *log = openDiskQueue( basename, ext, logID, DiskQueueVersion::V0 );
IDiskQueue *log = openDiskQueue( basename, ext, logID, DiskQueueVersion::V1 );
return new KeyValueStoreMemory( log, logID, memoryLimit, false, false, false );
}

View File

@@ -1477,7 +1477,16 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSystem->rejoins = rejoins;
logSystem->lockResults = lockResults;
logSystem->recoverAt = minEnd;
logSystem->knownCommittedVersion = knownCommittedVersion;
if (knownCommittedVersion > minEnd) {
// FIXME: Remove the Sev40 once disk snapshot v2 feature is enabled, in all other
// code paths we should never be here.
TraceEvent(SevError, "KCVIsInvalid")
.detail("KnownCommittedVersion", knownCommittedVersion)
.detail("MinEnd", minEnd);
logSystem->knownCommittedVersion = minEnd;
} else {
logSystem->knownCommittedVersion = knownCommittedVersion;
}
logSystem->remoteLogsWrittenToCoreState = true;
logSystem->stopped = true;
logSystem->pseudoLocalities = prevState.pseudoLocalities;
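The invariant being enforced is knownCommittedVersion <= minEnd: a version cannot be known committed beyond the version the new log system recovers to. Rather than silently carrying the inconsistent pair forward, the hunk traces a SevError and clamps, which keeps recovery safe. With illustrative values:

    Version minEnd = 1000;                 // recovery end version derived from the lock results
    Version knownCommittedVersion = 1005;  // would claim a commit beyond what is recoverable
    if (knownCommittedVersion > minEnd) {
        TraceEvent(SevError, "KCVIsInvalid")
            .detail("KnownCommittedVersion", knownCommittedVersion)
            .detail("MinEnd", minEnd);
        knownCommittedVersion = minEnd;    // clamp to restore the invariant
    }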

View File

@@ -431,7 +431,14 @@ struct RedwoodRecordRef {
};
uint8_t flags;
byte data[];
inline byte * data() {
return (byte *)(this + 1);
}
inline const byte * data() const {
return (const byte *)(this + 1);
}
void setPrefixSource(bool val) {
if(val) {
@@ -447,7 +454,7 @@
}
RedwoodRecordRef apply(const RedwoodRecordRef &base, Arena &arena) const {
Reader r(data);
Reader r(data());
int intFieldSuffixLen = flags & INT_FIELD_SUFFIX_BITS;
int prefixLen = r.readVarInt();
@@ -501,19 +508,19 @@
}
int size() const {
Reader r(data);
Reader r(data());
int intFieldSuffixLen = flags & INT_FIELD_SUFFIX_BITS;
r.readVarInt(); // prefixlen
int valueLen = (flags & HAS_VALUE) ? r.read<uint8_t>() : 0;
int keySuffixLen = (flags & HAS_KEY_SUFFIX) ? r.readVarInt() : 0;
return sizeof(Delta) + r.rptr - data + intFieldSuffixLen + valueLen + keySuffixLen;
return sizeof(Delta) + r.rptr - data() + intFieldSuffixLen + valueLen + keySuffixLen;
}
// Delta can't be determined without the RedwoodRecordRef upon which the Delta is based.
std::string toString() const {
Reader r(data);
Reader r(data());
std::string flagString = " ";
if(flags & PREFIX_SOURCE) flagString += "prefixSource ";
@@ -638,7 +645,7 @@
commonPrefix = getCommonPrefixLen(base, 0);
}
Writer w(d.data);
Writer w(d.data());
// prefixLen
w.writeVarInt(commonPrefix);
@@ -688,7 +695,7 @@
w.writeString(value.get());
}
return w.wptr - d.data + sizeof(Delta);
return w.wptr - d.data() + sizeof(Delta);
}
template<typename StringRefT>
@@ -737,10 +744,17 @@ struct BTreePage {
uint16_t count;
uint32_t kvBytes;
uint8_t extensionPageCount;
LogicalPageID extensionPages[0];
};
#pragma pack(pop)
inline LogicalPageID * extensionPages() {
return (LogicalPageID *)(this + 1);
}
inline const LogicalPageID * extensionPages() const {
return (const LogicalPageID *)(this + 1);
}
int size() const {
const BinaryTree *t = &tree();
return (uint8_t *)t - (uint8_t *)this + t->size();
@@ -751,15 +765,15 @@
}
BinaryTree & tree() {
return *(BinaryTree *)(extensionPages + extensionPageCount);
return *(BinaryTree *)(extensionPages() + extensionPageCount);
}
const BinaryTree & tree() const {
return *(const BinaryTree *)(extensionPages + extensionPageCount);
return *(const BinaryTree *)(extensionPages() + extensionPageCount);
}
static inline int GetHeaderSize(int extensionPages = 0) {
return sizeof(BTreePage) + extensionPages + sizeof(LogicalPageID);
return sizeof(BTreePage) + (extensionPages * sizeof(LogicalPageID));
}
std::string toString(bool write, LogicalPageID id, Version ver, const RedwoodRecordRef *lowerBound, const RedwoodRecordRef *upperBound) const {
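The GetHeaderSize change is an arithmetic fix, not a refactor: the old expression added the page count to a single sizeof, under-reserving space for the trailing array of extension page IDs whenever more than one was present. Assuming a 4-byte LogicalPageID for illustration:

    static inline int GetHeaderSize(int extensionPages = 0) {
        // one LogicalPageID slot per extension page: 3 pages -> header + 12 bytes
        // (the old formula reserved header + 3 + 4 = header + 7 bytes)
        return sizeof(BTreePage) + (extensionPages * sizeof(LogicalPageID));
    }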
@@ -1603,7 +1617,7 @@ private:
for(int e = 0, eEnd = extPages.size(); e < eEnd; ++e) {
LogicalPageID eid = m_pager->allocateLogicalPage();
debug_printf("%p: writePages(): Writing extension page op=write id=%u @%" PRId64 " (%d of %lu) referencePageID=%u\n", actor_debug, eid, version, e + 1, extPages.size(), id);
newPage->extensionPages[e] = bigEndian32(eid);
newPage->extensionPages()[e] = bigEndian32(eid);
// If replacing the primary page below (version == 0) then pass the primary page's ID as the reference page ID
m_pager->writePage(eid, extPages[e], version, (version == 0) ? id : invalidLogicalPageID);
++counts.extPageWrites;
@@ -1620,8 +1634,8 @@
// Free the old extension pages now that all replacement pages have been written
for(int i = 0; i < originalPage->extensionPageCount; ++i) {
//debug_printf("%p: writePages(): Freeing old extension op=del id=%u @latest\n", actor_debug, bigEndian32(originalPage->extensionPages[i]));
//m_pager->freeLogicalPage(bigEndian32(originalPage->extensionPages[i]), version);
//debug_printf("%p: writePages(): Freeing old extension op=del id=%u @latest\n", actor_debug, bigEndian32(originalPage->extensionPages()[i]));
//m_pager->freeLogicalPage(bigEndian32(originalPage->extensionPages()[i]), version);
}
return primaryLogicalPageIDs;
@@ -1684,8 +1698,8 @@ private:
pageGets.push_back(std::move(result));
for(int i = 0; i < pTreePage->extensionPageCount; ++i) {
debug_printf("readPage() Reading extension page op=read id=%u @%" PRId64 " ext=%d/%d\n", bigEndian32(pTreePage->extensionPages[i]), snapshot->getVersion(), i + 1, (int)pTreePage->extensionPageCount);
pageGets.push_back(snapshot->getPhysicalPage(bigEndian32(pTreePage->extensionPages[i])));
debug_printf("readPage() Reading extension page op=read id=%u @%" PRId64 " ext=%d/%d\n", bigEndian32(pTreePage->extensionPages()[i]), snapshot->getVersion(), i + 1, (int)pTreePage->extensionPageCount);
pageGets.push_back(snapshot->getPhysicalPage(bigEndian32(pTreePage->extensionPages()[i])));
}
std::vector<Reference<const IPage>> pages = wait(getAll(pageGets));
@@ -3561,12 +3575,12 @@ TEST_CASE("!/redwood/correctness/unit/deltaTree/RedwoodRecordRef") {
while(1) {
if(fwd.get() != items[i]) {
printf("forward iterator i=%d\n %s found\n %s expected\n", i, fwd.get().toString().c_str(), items[i].toString().c_str());
printf("Delta: %s\n", fwd.node->raw->delta->toString().c_str());
printf("Delta: %s\n", fwd.node->raw->delta().toString().c_str());
ASSERT(false);
}
if(rev.get() != items[items.size() - 1 - i]) {
printf("reverse iterator i=%d\n %s found\n %s expected\n", i, rev.get().toString().c_str(), items[items.size() - 1 - i].toString().c_str());
printf("Delta: %s\n", rev.node->raw->delta->toString().c_str());
printf("Delta: %s\n", rev.node->raw->delta().toString().c_str());
ASSERT(false);
}
++i;

View File

@@ -369,6 +369,7 @@ struct Role {
static const Role LOG_ROUTER;
static const Role DATA_DISTRIBUTOR;
static const Role RATEKEEPER;
static const Role COORDINATOR;
std::string roleName;
std::string abbreviation;
@@ -415,7 +416,8 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData, StorageServerIn
Reference<AsyncVar<ServerDBInfo>> db, std::string folder);
ACTOR Future<Void> storageServer(IKeyValueStore* persistentData, StorageServerInterface ssi,
Reference<AsyncVar<ServerDBInfo>> db, std::string folder,
Promise<Void> recovered); // changes pssi->id() to be the recovered ID
Promise<Void> recovered,
Reference<ClusterConnectionFile> connFile ); // changes pssi->id() to be the recovered ID
ACTOR Future<Void> masterServer(MasterInterface mi, Reference<AsyncVar<ServerDBInfo>> db,
ServerCoordinators serverCoordinators, LifetimeToken lifetime, bool forceRecovery);
ACTOR Future<Void> masterProxyServer(MasterProxyInterface proxy, InitializeMasterProxyRequest req,

View File

@@ -1126,7 +1126,7 @@ ACTOR Future<Void> rejoinRequestHandler( Reference<MasterData> self ) {
}
}
ACTOR Future<Void> trackTlogRecovery( Reference<MasterData> self, Reference<AsyncVar<Reference<ILogSystem>>> oldLogSystems, Promise<Void> remoteRecovered ) {
ACTOR Future<Void> trackTlogRecovery( Reference<MasterData> self, Reference<AsyncVar<Reference<ILogSystem>>> oldLogSystems ) {
state Future<Void> rejoinRequests = Never();
state DBRecoveryCount recoverCount = self->cstate.myDBState.recoveryCount + 1;
loop {
@@ -1170,10 +1170,6 @@ ACTOR Future<Void> trackTlogRecovery( Reference<MasterData> self, Reference<Asyn
}
self->registrationTrigger.trigger();
if(allLogs && remoteRecovered.canBeSet()) {
remoteRecovered.send(Void());
}
if( finalUpdate ) {
oldLogSystems->get()->stopRejoins();
rejoinRequests = rejoinRequestHandler(self);
@@ -1386,8 +1382,7 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self ) {
// we made to the new Tlogs (self->recoveryTransactionVersion), and only our own semi-commits can come between our
// first commit and the next new TLogs
state Promise<Void> remoteRecovered;
self->addActor.send( trackTlogRecovery(self, oldLogSystems, remoteRecovered) );
self->addActor.send( trackTlogRecovery(self, oldLogSystems) );
debug_advanceMaxCommittedVersion(UID(), self->recoveryTransactionVersion);
wait(self->cstateUpdated.getFuture());
debug_advanceMinCommittedVersion(UID(), self->recoveryTransactionVersion);

View File

@@ -31,6 +31,7 @@
#include "fdbclient/SystemData.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/Notified.h"
#include "fdbclient/StatusClient.h"
#include "fdbclient/MasterProxyInterface.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbserver/WorkerInterface.actor.h"
@@ -1317,8 +1318,7 @@ ACTOR Future<Key> findKey( StorageServer* data, KeySelectorRef sel, Version vers
ASSERT(returnKey != sel.getKey());
return returnKey;
}
else
} else
return forward ? range.end : range.begin;
}
}
@@ -2865,11 +2865,11 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
if (g_network->isSimulated()) {
double endTime = g_simulator.checkDisabled(format("%s/updateStorage", data->thisServerID.toString().c_str()));
if(endTime > now()) {
wait(delay(endTime - now(), TaskPriority::Storage));
wait(delay(endTime - now(), TaskPriority::UpdateStorage));
}
}
wait( data->desiredOldestVersion.whenAtLeast( data->storageVersion()+1 ) );
wait( delay(0, TaskPriority::Storage) );
wait( delay(0, TaskPriority::UpdateStorage) );
state Promise<Void> durableInProgress;
data->durableInProgress = durableInProgress.getFuture();
@@ -2884,10 +2884,10 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
state bool done = data->storage.makeVersionMutationsDurable(newOldestVersion, desiredVersion, bytesLeft);
// We want to forget things from these data structures atomically with changing oldestVersion (and "before", since oldestVersion.set() may trigger waiting actors)
// forgetVersionsBeforeAsync visibly forgets immediately (without waiting) but asynchronously frees memory.
Future<Void> finishedForgetting = data->mutableData().forgetVersionsBeforeAsync( newOldestVersion, TaskPriority::Storage );
Future<Void> finishedForgetting = data->mutableData().forgetVersionsBeforeAsync( newOldestVersion, TaskPriority::UpdateStorage );
data->oldestVersion.set( newOldestVersion );
wait( finishedForgetting );
wait( yield(TaskPriority::Storage) );
wait( yield(TaskPriority::UpdateStorage) );
if (done) break;
}
@@ -2900,9 +2900,7 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
state Future<Void> durableDelay = Void();
if (bytesLeft > 0) {
durableDelay = delay(SERVER_KNOBS->STORAGE_COMMIT_INTERVAL, TaskPriority::Storage);
} else {
durableDelay = delay(0, TaskPriority::UpdateStorage) || delay(SERVER_KNOBS->STORAGE_COMMIT_INTERVAL, TaskPriority::Storage);
durableDelay = delay(SERVER_KNOBS->STORAGE_COMMIT_INTERVAL, TaskPriority::UpdateStorage);
}
wait( durable );
@@ -2922,7 +2920,7 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
}
durableInProgress.send(Void());
wait( delay(0, TaskPriority::Storage) ); //Setting durableInProgress could cause the storage server to shut down, so delay to check for cancellation
wait( delay(0, TaskPriority::UpdateStorage) ); //Setting durableInProgress could cause the storage server to shut down, so delay to check for cancellation
// Taking and releasing the durableVersionLock ensures that no eager reads both begin before the commit was effective and
// are applied after we change the durable version. Also ensure that we have to lock while calling changeDurableVersion,
@@ -2931,9 +2929,9 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
data->popVersion( data->durableVersion.get() + 1 );
while (!changeDurableVersion( data, newOldestVersion )) {
if(g_network->check_yield(TaskPriority::Storage)) {
if(g_network->check_yield(TaskPriority::UpdateStorage)) {
data->durableVersionLock.release();
wait(delay(0, TaskPriority::Storage));
wait(delay(0, TaskPriority::UpdateStorage));
wait( data->durableVersionLock.take() );
}
}
@@ -3626,6 +3624,38 @@ bool storageServerTerminated(StorageServer& self, IKeyValueStore* persistentData
return false;
}
ACTOR Future<Void> memoryStoreRecover(IKeyValueStore* store, Reference<ClusterConnectionFile> connFile, UID id)
{
if (store->getType() != KeyValueStoreType::MEMORY || connFile.getPtr() == nullptr) {
return Never();
}
// create a temporary client connection to the database
Database cx = Database::createDatabase(connFile, Database::API_VERSION_LATEST);
state Transaction tr( cx );
state int noCanRemoveCount = 0;
loop {
try {
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
state bool canRemove = wait( canRemoveStorageServer( &tr, id ) );
if (!canRemove) {
TEST(true); // it's possible that the caller had a transaction in flight that assigned keys to the server. Wait for it to reverse its mistake.
wait( delayJittered(SERVER_KNOBS->REMOVE_RETRY_DELAY, TaskPriority::UpdateStorage) );
tr.reset();
TraceEvent("RemoveStorageServerRetrying").detail("Count", noCanRemoveCount++).detail("ServerID", id).detail("CanRemove", canRemove);
} else {
return Void();
}
} catch (Error& e) {
state Error err = e;
wait(tr.onError(e));
TraceEvent("RemoveStorageServerRetrying").error(err);
}
}
}
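memoryStoreRecover follows the standard flow retry-loop idiom: run the transaction body, and on failure wait on tr.onError(e), which backs off or resets the transaction as the error dictates before looping. The skeleton of that idiom, with doWork as a placeholder body:

    ACTOR Future<Void> retryLoop(Database cx) {
        state Transaction tr(cx);
        loop {
            try {
                wait(doWork(&tr));    // placeholder for the transactional body
                return Void();
            } catch (Error& e) {
                wait(tr.onError(e));  // backoff/reset appropriate to the error, then retry
            }
        }
    }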
ACTOR Future<Void> storageServer( IKeyValueStore* persistentData, StorageServerInterface ssi, Tag seedTag, ReplyPromise<InitializeStorageReply> recruitReply,
Reference<AsyncVar<ServerDBInfo>> db, std::string folder )
{
@@ -3745,7 +3775,7 @@ ACTOR Future<Void> replaceInterface( StorageServer* self, StorageServerInterface
return Void();
}
ACTOR Future<Void> storageServer( IKeyValueStore* persistentData, StorageServerInterface ssi, Reference<AsyncVar<ServerDBInfo>> db, std::string folder, Promise<Void> recovered )
ACTOR Future<Void> storageServer( IKeyValueStore* persistentData, StorageServerInterface ssi, Reference<AsyncVar<ServerDBInfo>> db, std::string folder, Promise<Void> recovered, Reference<ClusterConnectionFile> connFile)
{
state StorageServer self(persistentData, db, ssi);
self.folder = folder;
@@ -3753,8 +3783,19 @@ ACTOR Future<Void> storageServer( IKeyValueStore* persistentData, StorageServerI
try {
state double start = now();
TraceEvent("StorageServerRebootStart", self.thisServerID);
wait(self.storage.init());
wait(self.storage.commit()); //after a rollback there might be uncommitted changes.
choose {
//after a rollback there might be uncommitted changes.
//for memory storage engine type, wait until recovery is done before commit
when( wait(self.storage.commit())) {}
when( wait(memoryStoreRecover (persistentData, connFile, self.thisServerID))) {
TraceEvent("DisposeStorageServer", self.thisServerID);
throw worker_removed();
}
}
bool ok = wait( self.storage.restoreDurableState() );
if (!ok) {
if(recovered.canBeSet()) recovered.send(Void());
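The reboot path now races the initial commit against memoryStoreRecover: for the memory storage engine, recovery may discover that the cluster has already removed this server, in which case the server disposes of itself instead of waiting on the commit. In flow, choose/when resumes with whichever future becomes ready first and drops the other branch; a generic sketch with placeholder futures taskA and taskB:

    choose {
        when(wait(taskA)) {
            // taskA won the race; this branch runs, the other is abandoned
        }
        when(wait(taskB)) {
            // taskB won the race
        }
    }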

View File

@@ -556,7 +556,7 @@ ACTOR Future<Void> storageServerRollbackRebooter( Future<Void> prevStorageServer
DUMPTOKEN(recruited.getKeyValueStoreType);
DUMPTOKEN(recruited.watchValue);
prevStorageServer = storageServer( store, recruited, db, folder, Promise<Void>() );
prevStorageServer = storageServer( store, recruited, db, folder, Promise<Void>(), Reference<ClusterConnectionFile> (nullptr) );
prevStorageServer = handleIOErrors(prevStorageServer, store, id, store->onClosed());
}
}
@@ -804,7 +804,7 @@ ACTOR Future<Void> workerServer(
DUMPTOKEN(recruited.watchValue);
Promise<Void> recovery;
Future<Void> f = storageServer( kv, recruited, dbInfo, folder, recovery );
Future<Void> f = storageServer( kv, recruited, dbInfo, folder, recovery, connFile);
recoveries.push_back(recovery.getFuture());
f = handleIOErrors( f, kv, s.storeID, kvClosed );
f = storageServerRollbackRebooter( f, s.storeType, s.filename, recruited.id(), recruited.locality, dbInfo, folder, &filesClosed, memoryLimit, kv);
@@ -1403,3 +1403,4 @@ const Role Role::TESTER("Tester", "TS");
const Role Role::LOG_ROUTER("LogRouter", "LR");
const Role Role::DATA_DISTRIBUTOR("DataDistributor", "DD");
const Role Role::RATEKEEPER("Ratekeeper", "RK");
const Role Role::COORDINATOR("Coordinator", "CD");

View File

@@ -215,7 +215,7 @@ struct ExternalWorkload : TestWorkload, FDBWorkloadContext {
Promise<bool> promise;
auto f = promise.getFuture();
keepAlive(f, database);
workloadImpl->start(reinterpret_cast<FDBDatabase*>(database.getPtr()),
workloadImpl->check(reinterpret_cast<FDBDatabase*>(database.getPtr()),
GenericPromise<bool>(new FDBPromiseImpl(promise)));
return f;
}
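A one-token copy-paste fix with real behavioral weight: the check() wrapper had been dispatching to workloadImpl->start(), so an external workload was started a second time and its verification phase never ran. Condensed from the diff, with the surrounding signature reconstructed for illustration:

    Future<bool> check(Database const& database) override {
        Promise<bool> promise;
        auto f = promise.getFuture();
        keepAlive(f, database);
        workloadImpl->check(reinterpret_cast<FDBDatabase*>(database.getPtr()),  // was ->start()
                            GenericPromise<bool>(new FDBPromiseImpl(promise)));
        return f;
    }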

View File

@@ -29,7 +29,6 @@
#include "boost/asio.hpp"
#include "flow/serialize.h"
#include "flow/IRandom.h"
#include "fdbrpc/crc32c.h"
enum class TaskPriority {
Max = 1000000,
@@ -65,8 +64,6 @@ enum class TaskPriority {
DefaultOnMainThread = 7500,
DefaultDelay = 7010,
DefaultYield = 7000,
DiskWrite = 5030,
Storage = 5020,
DiskRead = 5010,
DefaultEndpoint = 5000,
UnknownEndpoint = 4000,
@@ -74,6 +71,7 @@ enum class TaskPriority {
DataDistributionLaunch = 3530,
Ratekeeper = 3510,
DataDistribution = 3500,
DiskWrite = 3010,
UpdateStorage = 3000,
TLogSpilledPeekReply = 2800,
Low = 2000,
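In flow, a numerically larger TaskPriority is scheduled ahead of a smaller one. Dropping Storage (5020) and moving DiskWrite from 5030 to 3010 places storage-server housekeeping at UpdateStorage (3000), below DefaultEndpoint (5000), so foreground request handling preempts background durability work; the storageserver.actor.cpp hunks above switch every former TaskPriority::Storage wait accordingly. Typical waits at the new priority:

    wait(delay(0, TaskPriority::UpdateStorage));  // requeue behind any higher-priority tasks
    wait(yield(TaskPriority::UpdateStorage));     // cooperative yield point at this priority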