Merge branch 'master' into mengxu/performant-restore-PR
Fix conflict in TLogServer.actor.cpp by accepting master changes.
commit 1706aaf199
@@ -48,6 +48,7 @@ Other Changes
 -------------
 
 * Trace files are now ordered lexicographically. This means that the filename format for trace files did change. `(PR #1828) <https://github.com/apple/foundationdb/pull/1828>`_.
+* Improved ``TransactionMetrics`` log events by adding a random UID to distinguish multiple open connections, a flag to identify internal vs. client connections, and logging of rates and roughness in addition to total count for several metrics. `(PR #1808) <https://github.com/apple/foundationdb/pull/1808>`_.
 * FoundationDB can now be built with clang and libc++ on Linux `(PR #1666) <https://github.com/apple/foundationdb/pull/1666>`_.
 * Added experimental framework to run C and Java clients in simulator `(PR #1678) <https://github.com/apple/foundationdb/pull/1678>`_.
 * Added new network option for client buggify which will randomly throw expected exceptions in the client. Intended for client testing `(PR #1417) <https://github.com/apple/foundationdb/pull/1417>`_.
@@ -2171,13 +2171,8 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
 
 ACTOR Future<bool> createSnapshot(Database db, StringRef snapCmd) {
     try {
-        UID snapUID = wait(makeInterruptable(mgmtSnapCreate(db, snapCmd, 2 /* version */)));
-        int version = 2;
-        if (version == 1) {
-            printf("Snapshots tagged with UID: %s, check logs for status\n", snapUID.toString().c_str());
-        } else {
-            printf("Snapshots create succeeded with UID: %s\n", snapUID.toString().c_str());
-        }
+        UID snapUID = wait(makeInterruptable(mgmtSnapCreate(db, snapCmd)));
+        printf("Snapshots create succeeded with UID: %s\n", snapUID.toString().c_str());
     } catch (Error& e) {
         fprintf(stderr, "Snapshot create failed, %d (%s)."
            " Please cleanup any instance level snapshots created.\n", e.code(), e.what());
@@ -44,8 +44,7 @@ static const char* typeString[] = { "SetValue",
     "ByteMax",
     "MinV2",
     "AndV2",
-    "CompareAndClear",
-    "Exec" };
+    "CompareAndClear"};
 
 struct MutationRef;
 std::string getHexString(StringRef input);
@@ -74,9 +73,6 @@ struct MutationRef {
         MinV2,
         AndV2,
         CompareAndClear,
-        // ExecOp is always set with FIRST_IN_BATCH option to quickly identify
-        // the op in a transaction batch while parsing it in TLog
-        Exec,
         MAX_ATOMIC_OP
     };
     // This is stored this way for serialization purposes.
@@ -1482,13 +1482,8 @@ ACTOR Future<std::set<NetworkAddress>> checkForExcludingServers(Database cx, vec
     return inProgressExclusion;
 }
 
-ACTOR Future<UID> mgmtSnapCreate(Database cx, StringRef snapCmd, int version) {
+ACTOR Future<UID> mgmtSnapCreate(Database cx, StringRef snapCmd) {
     state int retryCount = 0;
 
     loop {
         state UID snapUID = deterministicRandom()->randomUniqueID();
         try {
-            wait(snapCreate(cx, snapCmd, snapUID, version));
+            wait(snapCreate(cx, snapCmd, snapUID));
             TraceEvent("SnapCreateSucceeded").detail("snapUID", snapUID);
             return snapUID;
         } catch (Error& e) {
@@ -195,7 +195,7 @@ bool schemaMatch( json_spirit::mValue const& schema, json_spirit::mValue const&
 
 // execute payload in 'snapCmd' on all the coordinators, TLogs and
 // storage nodes
-ACTOR Future<UID> mgmtSnapCreate(Database cx, StringRef snapCmd, int version);
+ACTOR Future<UID> mgmtSnapCreate(Database cx, StringRef snapCmd);
 
 #include "flow/unactorcompiler.h"
 #endif
@@ -50,7 +50,6 @@ struct MasterProxyInterface {
     RequestStream< struct GetRawCommittedVersionRequest > getRawCommittedVersion;
     RequestStream< struct TxnStateRequest > txnState;
     RequestStream< struct GetHealthMetricsRequest > getHealthMetrics;
-    RequestStream< struct ExecRequest > execReq;
     RequestStream< struct ProxySnapRequest > proxySnapReq;
 
     UID id() const { return commit.getEndpoint().token; }
@@ -63,7 +62,7 @@ struct MasterProxyInterface {
     void serialize(Archive& ar) {
         serializer(ar, locality, provisional, commit, getConsistentReadVersion, getKeyServersLocations,
                    waitFailure, getStorageServerRejoinInfo, getRawCommittedVersion,
-                   txnState, getHealthMetrics, execReq, proxySnapReq);
+                   txnState, getHealthMetrics, proxySnapReq);
     }
 
     void initEndpoints() {
@@ -333,23 +332,6 @@ struct GetHealthMetricsRequest
     }
 };
 
-struct ExecRequest
-{
-    constexpr static FileIdentifier file_identifier = 22403900;
-    Arena arena;
-    StringRef execPayload;
-    ReplyPromise<Void> reply;
-    Optional<UID> debugID;
-
-    explicit ExecRequest(Optional<UID> const& debugID = Optional<UID>()) : debugID(debugID) {}
-    explicit ExecRequest(StringRef exec, Optional<UID> debugID = Optional<UID>()) : execPayload(exec), debugID(debugID) {}
-
-    template <class Ar>
-    void serialize(Ar& ar) {
-        serializer(ar, execPayload, reply, arena, debugID);
-    }
-};
-
 struct ProxySnapRequest
 {
     constexpr static FileIdentifier file_identifier = 22204900;
@@ -2371,45 +2371,6 @@ void Transaction::atomicOp(const KeyRef& key, const ValueRef& operand, MutationR
     TEST(true); //NativeAPI atomic operation
 }
 
-ACTOR Future<Void> executeCoordinators(DatabaseContext* cx, StringRef execPayload, Optional<UID> debugID) {
-    try {
-        if (debugID.present()) {
-            g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.executeCoordinators.Before");
-        }
-
-        state ExecRequest req(execPayload, debugID);
-        if (debugID.present()) {
-            g_traceBatch.addEvent("TransactionDebug", debugID.get().first(),
-                                  "NativeAPI.executeCoordinators.Inside loop");
-        }
-        wait(loadBalance(cx->getMasterProxies(false), &MasterProxyInterface::execReq, req, cx->taskID));
-        if (debugID.present())
-            g_traceBatch.addEvent("TransactionDebug", debugID.get().first(),
-                                  "NativeAPI.executeCoordinators.After");
-        return Void();
-    } catch (Error& e) {
-        TraceEvent("NativeAPI.executeCoordinatorsError").error(e);
-        throw;
-    }
-}
-
-void Transaction::execute(const KeyRef& cmdType, const ValueRef& cmdPayload) {
-    TraceEvent("Execute operation").detail("Key", cmdType.toString()).detail("Value", cmdPayload.toString());
-
-    if (cmdType.size() > CLIENT_KNOBS->KEY_SIZE_LIMIT) throw key_too_large();
-    if (cmdPayload.size() > CLIENT_KNOBS->VALUE_SIZE_LIMIT) throw value_too_large();
-
-    auto& req = tr;
-
-    // Helps with quickly finding the exec op in a tlog batch
-    setOption(FDBTransactionOptions::FIRST_IN_BATCH);
-
-    auto& t = req.transaction;
-    auto r = singleKeyRange(cmdType, req.arena);
-    auto v = ValueRef(req.arena, cmdPayload);
-    t.mutations.push_back(req.arena, MutationRef(MutationRef::Exec, r.begin, v));
-}
-
 void Transaction::clear( const KeyRangeRef& range, bool addConflictRange ) {
     auto &req = tr;
     auto &t = req.transaction;
@@ -3440,105 +3401,6 @@ void enableClientInfoLogging() {
|
||||||
TraceEvent(SevInfo, "ClientInfoLoggingEnabled");
|
TraceEvent(SevInfo, "ClientInfoLoggingEnabled");
|
||||||
}
|
}
|
||||||
|
|
||||||
ACTOR Future<Void> snapCreateVersion1(Database inputCx, StringRef snapCmd, UID snapUID) {
|
|
||||||
state Transaction tr(inputCx);
|
|
||||||
state DatabaseContext* cx = inputCx.getPtr();
|
|
||||||
// remember the client ID before the snap operation
|
|
||||||
state UID preSnapClientUID = cx->clientInfo->get().id;
|
|
||||||
|
|
||||||
TraceEvent("SnapCreateEnter")
|
|
||||||
.detail("SnapCmd", snapCmd.toString())
|
|
||||||
.detail("UID", snapUID)
|
|
||||||
.detail("PreSnapClientUID", preSnapClientUID);
|
|
||||||
|
|
||||||
StringRef snapCmdArgs = snapCmd;
|
|
||||||
StringRef snapCmdPart = snapCmdArgs.eat(":");
|
|
||||||
Standalone<StringRef> snapUIDRef(snapUID.toString());
|
|
||||||
state Standalone<StringRef> snapPayloadRef = snapCmdPart
|
|
||||||
.withSuffix(LiteralStringRef(":uid="))
|
|
||||||
.withSuffix(snapUIDRef)
|
|
||||||
.withSuffix(LiteralStringRef(","))
|
|
||||||
.withSuffix(snapCmdArgs);
|
|
||||||
state Standalone<StringRef>
|
|
||||||
tLogCmdPayloadRef = LiteralStringRef("empty-binary:uid=").withSuffix(snapUIDRef);
|
|
||||||
// disable popping of TLog
|
|
||||||
tr.reset();
|
|
||||||
loop {
|
|
||||||
try {
|
|
||||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
|
||||||
tr.execute(execDisableTLogPop, tLogCmdPayloadRef);
|
|
||||||
wait(timeoutError(tr.commit(), 10));
|
|
||||||
break;
|
|
||||||
} catch (Error& e) {
|
|
||||||
TraceEvent("DisableTLogPopFailed").error(e);
|
|
||||||
wait(tr.onError(e));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
TraceEvent("SnapCreateAfterLockingTLogs").detail("UID", snapUID);
|
|
||||||
|
|
||||||
// snap the storage and Tlogs
|
|
||||||
// if we retry the below command in failure cases with the same snapUID
|
|
||||||
// then the snapCreate can end up creating multiple snapshots with
|
|
||||||
// the same name which needs additional handling, hence we fail in
|
|
||||||
// failure cases and let the caller retry with different snapUID
|
|
||||||
tr.reset();
|
|
||||||
try {
|
|
||||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
|
||||||
tr.execute(execSnap, snapPayloadRef);
|
|
||||||
wait(tr.commit());
|
|
||||||
} catch (Error& e) {
|
|
||||||
TraceEvent("SnapCreateErroSnapTLogStorage").error(e);
|
|
||||||
throw;
|
|
||||||
}
|
|
||||||
|
|
||||||
TraceEvent("SnapCreateAfterSnappingTLogStorage").detail("UID", snapUID);
|
|
||||||
|
|
||||||
if (BUGGIFY) {
|
|
||||||
int32_t toDelay = deterministicRandom()->randomInt(1, 30);
|
|
||||||
wait(delay(toDelay));
|
|
||||||
}
|
|
||||||
|
|
||||||
// enable popping of the TLog
|
|
||||||
tr.reset();
|
|
||||||
loop {
|
|
||||||
try {
|
|
||||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
|
||||||
tr.execute(execEnableTLogPop, tLogCmdPayloadRef);
|
|
||||||
wait(tr.commit());
|
|
||||||
break;
|
|
||||||
} catch (Error& e) {
|
|
||||||
TraceEvent("EnableTLogPopFailed").error(e);
|
|
||||||
wait(tr.onError(e));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
TraceEvent("SnapCreateAfterUnlockingTLogs").detail("UID", snapUID);
|
|
||||||
|
|
||||||
// snap the coordinators
|
|
||||||
try {
|
|
||||||
Future<Void> exec = executeCoordinators(cx, snapPayloadRef, snapUID);
|
|
||||||
wait(timeoutError(exec, 5.0));
|
|
||||||
} catch (Error& e) {
|
|
||||||
TraceEvent("SnapCreateErrorSnapCoords").error(e);
|
|
||||||
throw;
|
|
||||||
}
|
|
||||||
|
|
||||||
TraceEvent("SnapCreateAfterSnappingCoords").detail("UID", snapUID);
|
|
||||||
|
|
||||||
// if the client IDs did not change then we have a clean snapshot
|
|
||||||
UID postSnapClientUID = cx->clientInfo->get().id;
|
|
||||||
if (preSnapClientUID != postSnapClientUID) {
|
|
||||||
TraceEvent("UID mismatch")
|
|
||||||
.detail("SnapPreSnapClientUID", preSnapClientUID)
|
|
||||||
.detail("SnapPostSnapClientUID", postSnapClientUID);
|
|
||||||
throw coordinators_changed();
|
|
||||||
}
|
|
||||||
|
|
||||||
TraceEvent("SnapCreateComplete").detail("UID", snapUID);
|
|
||||||
return Void();
|
|
||||||
}
|
|
||||||
|
|
||||||
ACTOR Future<Void> snapshotDatabase(Reference<DatabaseContext> cx, StringRef snapPayload, UID snapUID, Optional<UID> debugID) {
|
ACTOR Future<Void> snapshotDatabase(Reference<DatabaseContext> cx, StringRef snapPayload, UID snapUID, Optional<UID> debugID) {
|
||||||
TraceEvent("NativeAPI.SnapshotDatabaseEnter")
|
TraceEvent("NativeAPI.SnapshotDatabaseEnter")
|
||||||
.detail("SnapPayload", snapPayload)
|
.detail("SnapPayload", snapPayload)
|
||||||
|
@@ -3563,11 +3425,11 @@ ACTOR Future<Void> snapshotDatabase(Reference<DatabaseContext> cx, StringRef sna
     return Void();
 }
 
-ACTOR Future<Void> snapCreateVersion2(Database cx, StringRef snapCmd, UID snapUID) {
+ACTOR Future<Void> snapCreateCore(Database cx, StringRef snapCmd, UID snapUID) {
     // remember the client ID before the snap operation
     state UID preSnapClientUID = cx->clientInfo->get().id;
 
-    TraceEvent("SnapCreateEnterVersion2")
+    TraceEvent("SnapCreateCoreEnter")
         .detail("SnapCmd", snapCmd.toString())
         .detail("UID", snapUID)
         .detail("PreSnapClientUID", preSnapClientUID);
@@ -3585,7 +3447,7 @@ ACTOR Future<Void> snapCreateVersion2(Database cx, StringRef snapCmd, UID snapUI
         Future<Void> exec = snapshotDatabase(Reference<DatabaseContext>::addRef(cx.getPtr()), snapPayloadRef, snapUID, snapUID);
         wait(exec);
     } catch (Error& e) {
-        TraceEvent("SnapshotDatabaseErrorVersion2")
+        TraceEvent("SnapCreateCoreError")
             .detail("SnapCmd", snapCmd.toString())
             .detail("UID", snapUID)
             .error(e);
@@ -3595,27 +3457,23 @@ ACTOR Future<Void> snapCreateVersion2(Database cx, StringRef snapCmd, UID snapUI
     UID postSnapClientUID = cx->clientInfo->get().id;
     if (preSnapClientUID != postSnapClientUID) {
         // if the client IDs changed then we fail the snapshot
-        TraceEvent("UIDMismatchVersion2")
+        TraceEvent("SnapCreateCoreUIDMismatch")
             .detail("SnapPreSnapClientUID", preSnapClientUID)
             .detail("SnapPostSnapClientUID", postSnapClientUID);
         throw coordinators_changed();
     }
 
-    TraceEvent("SnapCreateExitVersion2")
+    TraceEvent("SnapCreateCoreExit")
         .detail("SnapCmd", snapCmd.toString())
         .detail("UID", snapUID)
         .detail("PreSnapClientUID", preSnapClientUID);
     return Void();
 }
 
-ACTOR Future<Void> snapCreate(Database cx, StringRef snapCmd, UID snapUID, int version) {
-    if (version == 1) {
-        wait(snapCreateVersion1(cx, snapCmd, snapUID));
-        return Void();
-    }
+ACTOR Future<Void> snapCreate(Database cx, StringRef snapCmd, UID snapUID) {
     state int oldMode = wait( setDDMode( cx, 0 ) );
     try {
-        wait(snapCreateVersion2(cx, snapCmd, snapUID));
+        wait(snapCreateCore(cx, snapCmd, snapUID));
     } catch (Error& e) {
         state Error err = e;
         wait(success( setDDMode( cx, oldMode ) ));
@@ -268,14 +268,6 @@ public:
     // If checkWriteConflictRanges is true, existing write conflict ranges will be searched for this key
     void set( const KeyRef& key, const ValueRef& value, bool addConflictRange = true );
     void atomicOp( const KeyRef& key, const ValueRef& value, MutationRef::Type operationType, bool addConflictRange = true );
-    // execute operation is similar to set, but the command will reach
-    // one of the proxies, all the TLogs and all the storage nodes.
-    // instead of setting a key and value on the DB, it executes the command
-    // that is passed in the value field.
-    // - cmdType can be used for logging purposes
-    // - cmdPayload contains the details of the command to be executed:
-    // format of the cmdPayload : <binary-path>:<arg1=val1>,<arg2=val2>...
-    void execute(const KeyRef& cmdType, const ValueRef& cmdPayload);
     void clear( const KeyRangeRef& range, bool addConflictRange = true );
     void clear( const KeyRef& key, bool addConflictRange = true );
     Future<Void> commit(); // Throws not_committed or commit_unknown_result errors in normal operation
@@ -344,7 +336,7 @@ int64_t extractIntOption( Optional<StringRef> value, int64_t minValue = std::num
 
 // Takes a snapshot of the cluster, specifically the following persistent
 // states: coordinator, TLog and storage state
-ACTOR Future<Void> snapCreate(Database cx, StringRef snapCmd, UID snapUID, int version);
+ACTOR Future<Void> snapCreate(Database cx, StringRef snapCmd, UID snapUID);
 
 #include "flow/unactorcompiler.h"
 #endif
@@ -2619,6 +2619,7 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
     self.addActor.send( statusServer( interf.clientInterface.databaseStatus.getFuture(), &self, coordinators));
     self.addActor.send( timeKeeper(&self) );
     self.addActor.send( monitorProcessClasses(&self) );
+    self.addActor.send( monitorServerInfoConfig(&self.db) );
     self.addActor.send( monitorClientTxnInfoConfigs(&self.db) );
     self.addActor.send( updatedChangingDatacenters(&self) );
     self.addActor.send( updatedChangedDatacenters(&self) );
@@ -40,9 +40,6 @@ struct ConflictBatch {
         TransactionConflict = 0,
         TransactionTooOld,
         TransactionCommitted,
-        TransactionNotPermitted,
-        TransactionNotFullyRecovered,
-        TransactionExecLogAntiQuorum,
     };
 
     void addTransaction( const CommitTransactionRef& transaction );
@@ -153,11 +153,11 @@ ACTOR Future<int> spawnProcess(std::string binPath, std::vector<std::string> par
 }
 #endif
 
-ACTOR Future<int> execHelper(ExecCmdValueString* execArg, std::string folder, std::string role, int snapVersion) {
+ACTOR Future<int> execHelper(ExecCmdValueString* execArg, std::string folder, std::string role) {
     state StringRef uidStr = execArg->getBinaryArgValue(LiteralStringRef("uid"));
     state int err = 0;
     state Future<int> cmdErr;
-    state double maxWaitTime = (snapVersion == 2) ? SERVER_KNOBS->SNAP_CREATE_MAX_TIMEOUT : 3.0;
+    state double maxWaitTime = SERVER_KNOBS->SNAP_CREATE_MAX_TIMEOUT;
     if (!g_network->isSimulated()) {
         // get bin path
         auto snapBin = execArg->getBinaryPath();
@@ -183,13 +183,8 @@ ACTOR Future<int> execHelper(ExecCmdValueString* execArg, std::string folder, st
         // copy the files
         state std::string folderFrom = folder + "/.";
         state std::string folderTo = folder + "-snap-" + uidStr.toString();
-        double maxSimDelayTime = 1.0;
-        if (snapVersion == 1) {
-            folderTo = folder + "-snap-" + uidStr.toString();
-        } else {
-            folderTo = folder + "-snap-" + uidStr.toString() + "-" + role;
-            maxSimDelayTime = 10.0;
-        }
+        double maxSimDelayTime = 10.0;
+        folderTo = folder + "-snap-" + uidStr.toString() + "-" + role;
         std::vector<std::string> paramList;
         std::string mkdirBin = "/bin/mkdir";
         paramList.push_back(folderTo);
@@ -52,7 +52,7 @@ private: // data
 ACTOR Future<int> spawnProcess(std::string binPath, std::vector<std::string> paramList, double maxWaitTime, bool isSync, double maxSimDelayTime);
 
 // helper to run all the work related to running the exec command
-ACTOR Future<int> execHelper(ExecCmdValueString* execArg, std::string folder, std::string role, int version);
+ACTOR Future<int> execHelper(ExecCmdValueString* execArg, std::string folder, std::string role);
 
 // returns true if the execUID op is in progress
 bool isExecOpInProgress(UID execUID);
@@ -749,7 +749,7 @@ struct CompareFirst {
 struct LogPushData : NonCopyable {
     // Log subsequences have to start at 1 (the MergedPeekCursor relies on this to make sure we never have !hasMessage() in the middle of data for a version
 
-    explicit LogPushData(Reference<ILogSystem> logSystem) : logSystem(logSystem), subsequence(1), hasExecOp(false) {
+    explicit LogPushData(Reference<ILogSystem> logSystem) : logSystem(logSystem), subsequence(1) {
         for(auto& log : logSystem->getLogSystemConfig().tLogs) {
             if(log.isLocal) {
                 for(int i = 0; i < log.tLogs.size(); i++) {
@@ -825,10 +825,6 @@ struct LogPushData : NonCopyable {
         return messagesWriter[loc].toValue();
     }
 
-    void setHasExecOp() { hasExecOp = true; }
-
-    bool getHasExecOp() { return hasExecOp; }
-
 private:
     Reference<ILogSystem> logSystem;
     std::vector<Tag> next_message_tags;
@@ -836,7 +832,6 @@ private:
     std::vector<BinaryWriter> messagesWriter;
     std::vector<int> msg_locations;
     uint32_t subsequence;
-    bool hasExecOp;
 };
 
 #endif
@@ -769,96 +769,6 @@ ACTOR Future<Void> commitBatch(
|
||||||
toCommit.addTags(allSources);
|
toCommit.addTags(allSources);
|
||||||
}
|
}
|
||||||
toCommit.addTypedMessage(m);
|
toCommit.addTypedMessage(m);
|
||||||
} else if (m.type == MutationRef::Exec) {
|
|
||||||
state std::string param2 = m.param2.toString();
|
|
||||||
state ExecCmdValueString execArg(param2);
|
|
||||||
execArg.dbgPrint();
|
|
||||||
state StringRef binPath = execArg.getBinaryPath();
|
|
||||||
state StringRef uidStr = execArg.getBinaryArgValue(LiteralStringRef("uid"));
|
|
||||||
|
|
||||||
auto result =
|
|
||||||
self->txnStateStore->readValue(LiteralStringRef("log_anti_quorum").withPrefix(configKeysPrefix)).get();
|
|
||||||
state int logAntiQuorum = 0;
|
|
||||||
if (result.present()) {
|
|
||||||
logAntiQuorum = atoi(result.get().toString().c_str());
|
|
||||||
}
|
|
||||||
|
|
||||||
if (m.param1 != execDisableTLogPop
|
|
||||||
&& m.param1 != execEnableTLogPop
|
|
||||||
&& !isWhitelisted(self->whitelistedBinPathVec, binPath)) {
|
|
||||||
TraceEvent("ExecTransactionNotPermitted")
|
|
||||||
.detail("TransactionNum", transactionNum);
|
|
||||||
committed[transactionNum] = ConflictBatch::TransactionNotPermitted;
|
|
||||||
} else if (self->db->get().recoveryState != RecoveryState::FULLY_RECOVERED) {
|
|
||||||
// Cluster is not fully recovered and needs TLogs
|
|
||||||
// from previous generation for full recovery.
|
|
||||||
// Currently, snapshot of old tlog generation is not
|
|
||||||
// supported and hence failing the snapshot request until
|
|
||||||
// cluster is fully_recovered.
|
|
||||||
TraceEvent("ExecTransactionNotFullyRecovered")
|
|
||||||
.detail("TransactionNum", transactionNum);
|
|
||||||
committed[transactionNum] = ConflictBatch::TransactionNotFullyRecovered;
|
|
||||||
} else if (logAntiQuorum > 0) {
|
|
||||||
// exec op is not supported when logAntiQuorum is configured
|
|
||||||
// FIXME: Add support for exec ops in the presence of log anti quorum
|
|
||||||
TraceEvent("ExecOpNotSupportedWithLogAntiQuorum")
|
|
||||||
.detail("LogAntiQuorum", logAntiQuorum)
|
|
||||||
.detail("TransactionNum", transactionNum);
|
|
||||||
committed[transactionNum] = ConflictBatch::TransactionExecLogAntiQuorum;
|
|
||||||
} else {
|
|
||||||
// Send the ExecOp to
|
|
||||||
// - all the storage nodes in a single region and
|
|
||||||
// - only to storage nodes in local region in multi-region setup
|
|
||||||
// step 1: get the DatabaseConfiguration
|
|
||||||
auto result =
|
|
||||||
self->txnStateStore->readValue(LiteralStringRef("usable_regions").withPrefix(configKeysPrefix)).get();
|
|
||||||
ASSERT(result.present());
|
|
||||||
state int usableRegions = atoi(result.get().toString().c_str());
|
|
||||||
|
|
||||||
// step 2: find the tag.id from locality info of the master
|
|
||||||
auto localityKey =
|
|
||||||
self->txnStateStore->readValue(tagLocalityListKeyFor(self->master.locality.dcId())).get();
|
|
||||||
|
|
||||||
int8_t locality = tagLocalityInvalid;
|
|
||||||
if (usableRegions > 1) {
|
|
||||||
if (!localityKey.present()) {
|
|
||||||
TraceEvent(SevError, "LocalityKeyNotPresentForMasterDCID");
|
|
||||||
ASSERT(localityKey.present());
|
|
||||||
}
|
|
||||||
locality = decodeTagLocalityListValue(localityKey.get());
|
|
||||||
}
|
|
||||||
|
|
||||||
std::set<Tag> allSources;
|
|
||||||
auto& m = (*pMutations)[mutationNum];
|
|
||||||
if (debugMutation("ProxyCommit", commitVersion, m))
|
|
||||||
TraceEvent("ProxyCommitTo", self->dbgid)
|
|
||||||
.detail("To", "all sources")
|
|
||||||
.detail("Mutation", m.toString())
|
|
||||||
.detail("Version", commitVersion);
|
|
||||||
|
|
||||||
std::vector<Tag> localTags;
|
|
||||||
auto tagKeys = self->txnStateStore->readRange(serverTagKeys).get();
|
|
||||||
for( auto& kv : tagKeys ) {
|
|
||||||
Tag t = decodeServerTagValue( kv.value );
|
|
||||||
if ((usableRegions > 1 && t.locality == locality)
|
|
||||||
|| (usableRegions == 1)) {
|
|
||||||
localTags.push_back(t);
|
|
||||||
}
|
|
||||||
allSources.insert(localTags.begin(), localTags.end());
|
|
||||||
}
|
|
||||||
|
|
||||||
auto te1 = TraceEvent("ProxyCommitTo", self->dbgid);
|
|
||||||
te1.detail("To", "all sources");
|
|
||||||
te1.detail("UidStr", uidStr);
|
|
||||||
te1.detail("Mutation", m.toString());
|
|
||||||
te1.detail("Version", commitVersion);
|
|
||||||
te1.detail("NumTags", allSources.size());
|
|
||||||
for (auto& tag : allSources) {
|
|
||||||
toCommit.addTag(tag);
|
|
||||||
}
|
|
||||||
toCommit.addTypedMessage(m, true /* allLocations */);
|
|
||||||
toCommit.setHasExecOp();
|
|
||||||
}
|
|
||||||
} else
|
} else
|
||||||
UNREACHABLE();
|
UNREACHABLE();
|
||||||
|
|
||||||
|
@@ -1078,15 +988,7 @@ ACTOR Future<Void> commitBatch(
             else if (committed[t] == ConflictBatch::TransactionTooOld) {
                 trs[t].reply.sendError(transaction_too_old());
             }
-            else if (committed[t] == ConflictBatch::TransactionNotPermitted) {
-                trs[t].reply.sendError(transaction_not_permitted());
-            }
-            else if (committed[t] == ConflictBatch::TransactionNotFullyRecovered) {
-                trs[t].reply.sendError(cluster_not_fully_recovered());
-            }
-            else if (committed[t] == ConflictBatch::TransactionExecLogAntiQuorum) {
-                trs[t].reply.sendError(txn_exec_log_anti_quorum());
-            } else {
+            else {
                 trs[t].reply.sendError(not_committed());
             }
@@ -1736,63 +1638,6 @@ ACTOR Future<Void> masterProxyServerCore(
|
||||||
rep.version = commitData.committedVersion.get();
|
rep.version = commitData.committedVersion.get();
|
||||||
req.reply.send(rep);
|
req.reply.send(rep);
|
||||||
}
|
}
|
||||||
when(ExecRequest _execReq = waitNext(proxy.execReq.getFuture())) {
|
|
||||||
state ExecRequest execReq = _execReq;
|
|
||||||
if (execReq.debugID.present())
|
|
||||||
g_traceBatch.addEvent("TransactionDebug", execReq.debugID.get().first(),
|
|
||||||
"MasterProxyServer.masterProxyServerCore."
|
|
||||||
"ExecRequest");
|
|
||||||
|
|
||||||
TraceEvent("ExecRequest").detail("Payload", execReq.execPayload.toString());
|
|
||||||
|
|
||||||
// get the list of coordinators
|
|
||||||
state Optional<Value> coordinators = commitData.txnStateStore->readValue(coordinatorsKey).get();
|
|
||||||
state std::vector<NetworkAddress> coordinatorsAddr =
|
|
||||||
ClusterConnectionString(coordinators.get().toString()).coordinators();
|
|
||||||
state std::set<NetworkAddress> coordinatorsAddrSet;
|
|
||||||
for (int i = 0; i < coordinatorsAddr.size(); i++) {
|
|
||||||
TraceEvent(SevDebug, "CoordinatorAddress").detail("Addr", coordinatorsAddr[i]);
|
|
||||||
coordinatorsAddrSet.insert(coordinatorsAddr[i]);
|
|
||||||
}
|
|
||||||
|
|
||||||
// get the list of workers
|
|
||||||
state std::vector<WorkerDetails> workers =
|
|
||||||
wait(commitData.db->get().clusterInterface.getWorkers.getReply(GetWorkersRequest()));
|
|
||||||
|
|
||||||
// send the exec command to the list of workers which are
|
|
||||||
// coordinators
|
|
||||||
state vector<Future<Void>> execCoords;
|
|
||||||
for (int i = 0; i < workers.size(); i++) {
|
|
||||||
NetworkAddress primary = workers[i].interf.address();
|
|
||||||
Optional<NetworkAddress> secondary = workers[i].interf.tLog.getEndpoint().addresses.secondaryAddress;
|
|
||||||
if (coordinatorsAddrSet.find(primary) != coordinatorsAddrSet.end()
|
|
||||||
|| (secondary.present() && (coordinatorsAddrSet.find(secondary.get()) != coordinatorsAddrSet.end()))) {
|
|
||||||
TraceEvent("ExecReqToCoordinator")
|
|
||||||
.detail("PrimaryWorkerAddr", primary)
|
|
||||||
.detail("SecondaryWorkerAddr", secondary);
|
|
||||||
execCoords.push_back(brokenPromiseToNever(workers[i].interf.execReq.getReply(ExecuteRequest(execReq.execPayload))));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (execCoords.size() <= 0) {
|
|
||||||
TraceEvent(SevDebug, "CoordinatorWorkersNotFound");
|
|
||||||
execReq.reply.sendError(operation_failed());
|
|
||||||
} else {
|
|
||||||
try {
|
|
||||||
wait(timeoutError(waitForAll(execCoords), 10.0));
|
|
||||||
int numSucc = 0;
|
|
||||||
for (auto item : execCoords) {
|
|
||||||
if (item.isValid() && item.isReady()) {
|
|
||||||
++numSucc;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
bool succ = (numSucc >= ((execCoords.size() + 1) / 2));
|
|
||||||
succ ? execReq.reply.send(Void()) : execReq.reply.sendError(operation_failed());
|
|
||||||
} catch (Error& e) {
|
|
||||||
TraceEvent("WaitingForAllExecCoords").error(e);
|
|
||||||
execReq.reply.sendError(broken_promise());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
when(ProxySnapRequest snapReq = waitNext(proxy.proxySnapReq.getFuture())) {
|
when(ProxySnapRequest snapReq = waitNext(proxy.proxySnapReq.getFuture())) {
|
||||||
addActor.send(proxySnapCreate(snapReq, &commitData));
|
addActor.send(proxySnapCreate(snapReq, &commitData));
|
||||||
}
|
}
|
||||||
|
|
|
@@ -1314,207 +1314,6 @@ ACTOR Future<Void> commitQueue( TLogData* self ) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void execProcessingHelper(TLogData* self,
|
|
||||||
Reference<LogData> logData,
|
|
||||||
TLogCommitRequest* req,
|
|
||||||
Standalone<VectorRef<Tag>>* execTags,
|
|
||||||
ExecCmdValueString* execArg,
|
|
||||||
StringRef* execCmd,
|
|
||||||
Version* execVersion,
|
|
||||||
vector<Future<Void>>* snapFailKeySetters,
|
|
||||||
vector<Future<Void>>* ignoredPops)
|
|
||||||
{
|
|
||||||
// inspect the messages to find if there is an Exec type and print
|
|
||||||
// it. message are prefixed by the length of the message and each
|
|
||||||
// field is prefixed by the length too
|
|
||||||
uint8_t type = MutationRef::MAX_ATOMIC_OP;
|
|
||||||
StringRef param2;
|
|
||||||
ArenaReader rd(req->arena, req->messages, Unversioned());
|
|
||||||
int32_t messageLength, rawLength;
|
|
||||||
uint16_t tagCount;
|
|
||||||
uint32_t sub;
|
|
||||||
while (!rd.empty()) {
|
|
||||||
Tag tmpTag;
|
|
||||||
bool hasTxsTag = false;
|
|
||||||
rd.checkpoint();
|
|
||||||
rd >> messageLength >> sub >> tagCount;
|
|
||||||
for (int i = 0; i < tagCount; i++) {
|
|
||||||
rd >> tmpTag;
|
|
||||||
if (tmpTag.locality == tagLocalityTxs || tmpTag == txsTag) {
|
|
||||||
hasTxsTag = true;
|
|
||||||
}
|
|
||||||
execTags->push_back(execTags->arena(), tmpTag);
|
|
||||||
}
|
|
||||||
if (!hasTxsTag) {
|
|
||||||
rd >> type;
|
|
||||||
if (type == MutationRef::Exec) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
rawLength = messageLength + sizeof(messageLength);
|
|
||||||
rd.rewind();
|
|
||||||
rd.readBytes(rawLength);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t len = 0;
|
|
||||||
if (type == MutationRef::Exec) {
|
|
||||||
// get param1
|
|
||||||
rd >> len;
|
|
||||||
*execCmd = StringRef((uint8_t const*)rd.readBytes(len), len);
|
|
||||||
// get param2
|
|
||||||
rd >> len;
|
|
||||||
param2 = StringRef((uint8_t const*)rd.readBytes(len), len);
|
|
||||||
|
|
||||||
TraceEvent(SevDebug, "TLogExecCommandType", self->dbgid)
|
|
||||||
.detail("Value", execCmd->toString())
|
|
||||||
.detail("Version", req->version);
|
|
||||||
|
|
||||||
execArg->setCmdValueString(param2);
|
|
||||||
execArg->dbgPrint();
|
|
||||||
StringRef uidStr = execArg->getBinaryArgValue(LiteralStringRef("uid"));
|
|
||||||
if (!execCmd->startsWith(LiteralStringRef("\xff"))) {
|
|
||||||
*execVersion = req->version;
|
|
||||||
}
|
|
||||||
if (*execCmd == execSnap) {
|
|
||||||
// validation check specific to snap request
|
|
||||||
std::string reason;
|
|
||||||
if (!self->ignorePopRequest) {
|
|
||||||
*execVersion = invalidVersion;
|
|
||||||
reason = "SnapFailIgnorePopNotSet";
|
|
||||||
} else if (uidStr.toString() != self->ignorePopUid) {
|
|
||||||
*execVersion = invalidVersion;
|
|
||||||
reason = "SnapFailedDisableTLogUidMismatch";
|
|
||||||
}
|
|
||||||
|
|
||||||
if (*execVersion == invalidVersion) {
|
|
||||||
TraceEvent(SevWarn, "TLogSnapFailed")
|
|
||||||
.detail("IgnorePopUid", self->ignorePopUid)
|
|
||||||
.detail("IgnorePopRequest", self->ignorePopRequest)
|
|
||||||
.detail("Reason", reason)
|
|
||||||
.detail("Version", req->version);
|
|
||||||
|
|
||||||
TraceEvent("ExecCmdSnapCreate")
|
|
||||||
.detail("Uid", uidStr.toString())
|
|
||||||
.detail("Status", -1)
|
|
||||||
.detail("Tag", logData->allTags.begin()->toString())
|
|
||||||
.detail("Role", "TLog")
|
|
||||||
.detail("Version", req->version);
|
|
||||||
if (g_network->isSimulated()) {
|
|
||||||
// write SnapFailedTLog.$UID
|
|
||||||
Standalone<StringRef> keyStr = snapTestFailStatus.withSuffix(uidStr);
|
|
||||||
Standalone<StringRef> valStr = LiteralStringRef("Success");
|
|
||||||
TraceEvent(SevDebug, "TLogKeyStr").detail("Value", keyStr);
|
|
||||||
snapFailKeySetters->push_back(runRYWTransaction(self->cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Void>
|
|
||||||
{ tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->set(keyStr, valStr); return Void(); }));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (*execCmd == execDisableTLogPop) {
|
|
||||||
self->ignorePopRequest = true;
|
|
||||||
if (self->ignorePopUid != "") {
|
|
||||||
TraceEvent(SevWarn, "TLogPopDisableonDisable")
|
|
||||||
.detail("IgnorePopUid", self->ignorePopUid)
|
|
||||||
.detail("UidStr", uidStr.toString())
|
|
||||||
.detail("Version", req->version);
|
|
||||||
}
|
|
||||||
self->ignorePopUid = uidStr.toString();
|
|
||||||
self->ignorePopDeadline = g_network->now() + SERVER_KNOBS->TLOG_IGNORE_POP_AUTO_ENABLE_DELAY;
|
|
||||||
TraceEvent("TLogExecCmdPopDisable")
|
|
||||||
.detail("ExecCmd", execCmd->toString())
|
|
||||||
.detail("UidStr", uidStr.toString())
|
|
||||||
.detail("IgnorePopUid", self->ignorePopUid)
|
|
||||||
.detail("IgnporePopRequest", self->ignorePopRequest)
|
|
||||||
.detail("IgnporePopDeadline", self->ignorePopDeadline)
|
|
||||||
.detail("Version", req->version);
|
|
||||||
}
|
|
||||||
if (*execCmd == execEnableTLogPop) {
|
|
||||||
if (self->ignorePopUid != uidStr.toString()) {
|
|
||||||
TraceEvent(SevWarn, "TLogPopDisableEnableUidMismatch")
|
|
||||||
.detail("IgnorePopUid", self->ignorePopUid)
|
|
||||||
.detail("UidStr", uidStr.toString())
|
|
||||||
.detail("Version", req->version);
|
|
||||||
}
|
|
||||||
|
|
||||||
TraceEvent("EnableTLogPlayAllIgnoredPops2");
|
|
||||||
// use toBePopped and issue all the pops
|
|
||||||
std::map<Tag, Version>::iterator it;
|
|
||||||
self->ignorePopRequest = false;
|
|
||||||
self->ignorePopDeadline = 0.0;
|
|
||||||
self->ignorePopUid = "";
|
|
||||||
for (it = self->toBePopped.begin(); it != self->toBePopped.end(); it++) {
|
|
||||||
TraceEvent("PlayIgnoredPop")
|
|
||||||
.detail("Tag", it->first.toString())
|
|
||||||
.detail("Version", it->second);
|
|
||||||
ignoredPops->push_back(tLogPopCore(self, it->first, it->second, logData));
|
|
||||||
}
|
|
||||||
self->toBePopped.clear();
|
|
||||||
TraceEvent("TLogExecCmdPopEnable")
|
|
||||||
.detail("ExecCmd", execCmd->toString())
|
|
||||||
.detail("UidStr", uidStr.toString())
|
|
||||||
.detail("IgnorePopUid", self->ignorePopUid)
|
|
||||||
.detail("IgnporePopRequest", self->ignorePopRequest)
|
|
||||||
.detail("IgnporePopDeadline", self->ignorePopDeadline)
|
|
||||||
.detail("Version", req->version);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
ACTOR Future<Void> tLogSnapHelper(TLogData* self,
|
|
||||||
Reference<LogData> logData,
|
|
||||||
ExecCmdValueString* execArg,
|
|
||||||
Version version,
|
|
||||||
Version execVersion,
|
|
||||||
StringRef execCmd,
|
|
||||||
Standalone<VectorRef<Tag>> execTags)
|
|
||||||
{
|
|
||||||
state int err = 0;
|
|
||||||
state StringRef uidStr = execArg->getBinaryArgValue(LiteralStringRef("uid"));
|
|
||||||
state UID execUID = UID::fromString(uidStr.toString());
|
|
||||||
state bool otherRoleExeced = false;
|
|
||||||
// TLog is special, we need to snap at the execVersion.
|
|
||||||
// storage on the same node should not initiate a snap before TLog which will make
|
|
||||||
// the snap version at TLog unpredictable
|
|
||||||
ASSERT(!isExecOpInProgress(execUID));
|
|
||||||
if (!otherRoleExeced) {
|
|
||||||
setExecOpInProgress(execUID);
|
|
||||||
int tmpErr = wait(execHelper(execArg, self->dataFolder, "role=tlog", 1 /*version*/));
|
|
||||||
err = tmpErr;
|
|
||||||
clearExecOpInProgress(execUID);
|
|
||||||
}
|
|
||||||
TraceEvent("TLogCommitExecTraceTLog")
|
|
||||||
.detail("UidStr", uidStr.toString())
|
|
||||||
.detail("Status", err)
|
|
||||||
.detail("Tag", logData->allTags.begin()->toString())
|
|
||||||
.detail("OldTagSize", logData->allTags.size())
|
|
||||||
.detail("Role", "TLog");
|
|
||||||
|
|
||||||
// print the detailed status message
|
|
||||||
for (int i = 0; i < execTags.size(); i++) {
|
|
||||||
Version poppedTagVersion = -1;
|
|
||||||
auto tagv = logData->getTagData(execTags[i]);
|
|
||||||
if (!tagv) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
poppedTagVersion = tagv->popped;
|
|
||||||
|
|
||||||
TraceEvent te = TraceEvent(SevDebug, "TLogExecTraceDetailed");
|
|
||||||
te.detail("Uid", uidStr.toString());
|
|
||||||
te.detail("Status", err);
|
|
||||||
te.detail("Role", "TLog");
|
|
||||||
te.detail("ExecCmd", execCmd.toString());
|
|
||||||
te.detail("Param2", execArg->getCmdValueString().toString());
|
|
||||||
te.detail("Tag", tagv->tag.toString());
|
|
||||||
te.detail("Version", version);
|
|
||||||
te.detail("PoppedTagVersion", poppedTagVersion);
|
|
||||||
te.detail("PersistentDataVersion", logData->persistentDataVersion);
|
|
||||||
te.detail("PersistentDatadurableVersion", logData->persistentDataDurableVersion);
|
|
||||||
te.detail("QueueCommittedVersion", logData->queueCommittedVersion.get());
|
|
||||||
te.detail("IgnorePopUid", self->ignorePopUid);
|
|
||||||
}
|
|
||||||
return Void();
|
|
||||||
}
|
|
||||||
|
|
||||||
ACTOR Future<Void> tLogCommit(
|
ACTOR Future<Void> tLogCommit(
|
||||||
TLogData* self,
|
TLogData* self,
|
||||||
TLogCommitRequest req,
|
TLogCommitRequest req,
|
||||||
|
@@ -1549,58 +1348,21 @@ ACTOR Future<Void> tLogCommit(
|
||||||
wait( delayJittered(.005, TaskPriority::TLogCommit) );
|
wait( delayJittered(.005, TaskPriority::TLogCommit) );
|
||||||
}
|
}
|
||||||
|
|
||||||
// while exec op is being committed, no new transactions will be admitted.
|
|
||||||
// This property is useful for snapshot kind of operations which wants to
|
|
||||||
// take a snap of the disk image at a particular version (no data from
|
|
||||||
// future version to be included)
|
|
||||||
// NOTE: execOpCommitInProgress will not be set for exec commands which
|
|
||||||
// start with \xff
|
|
||||||
state bool execOpLockTaken = false;
|
|
||||||
if (logData->execOpCommitInProgress) {
|
|
||||||
wait(logData->execOpLock.take());
|
|
||||||
execOpLockTaken = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
if(logData->stopped) {
|
if(logData->stopped) {
|
||||||
req.reply.sendError( tlog_stopped() );
|
req.reply.sendError( tlog_stopped() );
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
|
|
||||||
state Version execVersion = invalidVersion;
|
|
||||||
state ExecCmdValueString execArg;
|
|
||||||
state TLogQueueEntryRef qe;
|
|
||||||
state StringRef execCmd;
|
|
||||||
state Standalone<VectorRef<Tag>> execTags;
|
|
||||||
state vector<Future<Void>> snapFailKeySetters;
|
|
||||||
state vector<Future<Void>> playIgnoredPops;
|
|
||||||
|
|
||||||
if (logData->version.get() == req.prevVersion) { // Not a duplicate (check relies on critical section between here self->version.set() below!)
|
if (logData->version.get() == req.prevVersion) { // Not a duplicate (check relies on critical section between here self->version.set() below!)
|
||||||
if(req.debugID.present())
|
if(req.debugID.present())
|
||||||
g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.Before");
|
g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.Before");
|
||||||
|
|
||||||
if (req.hasExecOp) {
|
|
||||||
execProcessingHelper(self, logData, &req, &execTags, &execArg, &execCmd, &execVersion, &snapFailKeySetters, &playIgnoredPops);
|
|
||||||
if (execVersion != invalidVersion) {
|
|
||||||
TraceEvent(SevDebug, "SettingExecOpCommit")
|
|
||||||
.detail("LogId", logData->logId)
|
|
||||||
.detail("ExecVersion", execVersion)
|
|
||||||
.detail("Version", req.version);
|
|
||||||
logData->execOpCommitInProgress = true;
|
|
||||||
if (!execOpLockTaken) {
|
|
||||||
wait(logData->execOpLock.take());
|
|
||||||
execOpLockTaken = true;
|
|
||||||
} else {
|
|
||||||
ASSERT(logData->execOpLock.available() == 0);
|
|
||||||
}
|
|
||||||
ASSERT(execOpLockTaken);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//TraceEvent("TLogCommit", logData->logId).detail("Version", req.version);
|
//TraceEvent("TLogCommit", logData->logId).detail("Version", req.version);
|
||||||
commitMessages(self, logData, req.version, req.arena, req.messages);
|
commitMessages(self, logData, req.version, req.arena, req.messages);
|
||||||
|
|
||||||
logData->knownCommittedVersion = std::max(logData->knownCommittedVersion, req.knownCommittedVersion);
|
logData->knownCommittedVersion = std::max(logData->knownCommittedVersion, req.knownCommittedVersion);
|
||||||
|
|
||||||
|
TLogQueueEntryRef qe;
|
||||||
// Log the changes to the persistent queue, to be committed by commitQueue()
|
// Log the changes to the persistent queue, to be committed by commitQueue()
|
||||||
qe.version = req.version;
|
qe.version = req.version;
|
||||||
qe.knownCommittedVersion = logData->knownCommittedVersion;
|
qe.knownCommittedVersion = logData->knownCommittedVersion;
|
||||||
|
@ -1615,7 +1377,6 @@ ACTOR Future<Void> tLogCommit(
|
||||||
|
|
||||||
// Notifies the commitQueue actor to commit persistentQueue, and also unblocks tLogPeekMessages actors
|
// Notifies the commitQueue actor to commit persistentQueue, and also unblocks tLogPeekMessages actors
|
||||||
logData->version.set( req.version );
|
logData->version.set( req.version );
|
||||||
wait(waitForAll(playIgnoredPops));
|
|
||||||
|
|
||||||
if(req.debugID.present())
|
if(req.debugID.present())
|
||||||
g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.AfterTLogCommit");
|
g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.AfterTLogCommit");
|
||||||
|
@ -1624,19 +1385,6 @@ ACTOR Future<Void> tLogCommit(
|
||||||
state Future<Void> stopped = logData->stopCommit.onTrigger();
|
state Future<Void> stopped = logData->stopCommit.onTrigger();
|
||||||
wait( timeoutWarning( logData->queueCommittedVersion.whenAtLeast( req.version ) || stopped, 0.1, warningCollectorInput ) );
|
wait( timeoutWarning( logData->queueCommittedVersion.whenAtLeast( req.version ) || stopped, 0.1, warningCollectorInput ) );
|
||||||
|
|
||||||
if ((execVersion != invalidVersion) && execVersion <= logData->queueCommittedVersion.get()) {
|
|
||||||
wait(tLogSnapHelper(self, logData, &execArg, qe.version, execVersion, execCmd, execTags));
|
|
||||||
}
|
|
||||||
if (execVersion != invalidVersion && logData->execOpCommitInProgress) {
|
|
||||||
ASSERT(execOpLockTaken);
|
|
||||||
logData->execOpCommitInProgress = false;
|
|
||||||
}
|
|
||||||
if (execOpLockTaken) {
|
|
||||||
logData->execOpLock.release();
|
|
||||||
execOpLockTaken = false;
|
|
||||||
}
|
|
||||||
execVersion = invalidVersion;
|
|
||||||
|
|
||||||
if(stopped.isReady()) {
|
if(stopped.isReady()) {
|
||||||
ASSERT(logData->stopped);
|
ASSERT(logData->stopped);
|
||||||
req.reply.sendError( tlog_stopped() );
|
req.reply.sendError( tlog_stopped() );
|
||||||
|
@ -1647,13 +1395,6 @@ ACTOR Future<Void> tLogCommit(
|
||||||
g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.After");
|
g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.After");
|
||||||
|
|
||||||
req.reply.send( logData->durableKnownCommittedVersion );
|
req.reply.send( logData->durableKnownCommittedVersion );
|
||||||
if (g_network->isSimulated()) {
|
|
||||||
if (snapFailKeySetters.size() > 0) {
|
|
||||||
TraceEvent(SevDebug, "SettingSnapFailKey");
|
|
||||||
wait(waitForAll(snapFailKeySetters));
|
|
||||||
TraceEvent(SevDebug, "SettingSnapFailKeyDone");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -1805,7 +1546,7 @@ tLogSnapCreate(TLogSnapRequest snapReq, TLogData* self, Reference<LogData> logDa
     ExecCmdValueString snapArg(snapReq.snapPayload);
     try {
         Standalone<StringRef> role = LiteralStringRef("role=").withSuffix(snapReq.role);
-        int err = wait(execHelper(&snapArg, self->dataFolder, role.toString(), 2 /* version */));
+        int err = wait(execHelper(&snapArg, self->dataFolder, role.toString()));
 
         std::string uidStr = snapReq.snapUID.toString();
         TraceEvent("ExecTraceTLog")
@@ -1824,7 +1565,7 @@ tLogSnapCreate(TLogSnapRequest snapReq, TLogData* self, Reference<LogData> logDa
         }
         snapReq.reply.send(Void());
     } catch (Error& e) {
-        TraceEvent("TLogExecHelperError").error(e, true /*includeCancelled */);
+        TraceEvent("TLogSnapCreateError").error(e, true /*includeCancelled */);
         if (e.code() != error_code_operation_cancelled) {
             snapReq.reply.sendError(e);
         } else {
@@ -348,8 +348,9 @@ ACTOR Future<bool> getTeamCollectionValid(Database cx, WorkerInterface dataDistr
     state bool ret = false;
     loop {
         try {
-            if (g_simulator.storagePolicy.isValid() &&
-                g_simulator.storagePolicy->info().find("data_hall") != std::string::npos) {
+            if (!g_network->isSimulated() ||
+                (g_simulator.storagePolicy.isValid() &&
+                 g_simulator.storagePolicy->info().find("data_hall") != std::string::npos)) {
                 // Do not test DD team number for data_hall modes
                 return true;
             }
|
||||||
}
|
}
|
||||||
|
|
||||||
} catch (Error& e) {
|
} catch (Error& e) {
|
||||||
|
if(e.code() == error_code_actor_cancelled) {
|
||||||
|
throw;
|
||||||
|
}
|
||||||
TraceEvent("QuietDatabaseFailure", dataDistributorWorker.id())
|
TraceEvent("QuietDatabaseFailure", dataDistributorWorker.id())
|
||||||
.detail("Reason", "Failed to extract GetTeamCollectionValid information");
|
.detail("Reason", "Failed to extract GetTeamCollectionValid information");
|
||||||
attempts++;
|
attempts++;
|
||||||
|
|
|
@@ -225,14 +225,13 @@ struct TLogCommitRequest {
 
     ReplyPromise<Version> reply;
     Optional<UID> debugID;
-    bool hasExecOp;
 
     TLogCommitRequest() {}
-    TLogCommitRequest( const Arena& a, Version prevVersion, Version version, Version knownCommittedVersion, Version minKnownCommittedVersion, StringRef messages, bool hasExecOp, Optional<UID> debugID )
-        : arena(a), prevVersion(prevVersion), version(version), knownCommittedVersion(knownCommittedVersion), minKnownCommittedVersion(minKnownCommittedVersion), messages(messages), debugID(debugID), hasExecOp(hasExecOp){}
+    TLogCommitRequest( const Arena& a, Version prevVersion, Version version, Version knownCommittedVersion, Version minKnownCommittedVersion, StringRef messages, Optional<UID> debugID )
+        : arena(a), prevVersion(prevVersion), version(version), knownCommittedVersion(knownCommittedVersion), minKnownCommittedVersion(minKnownCommittedVersion), messages(messages), debugID(debugID) {}
     template <class Ar>
     void serialize( Ar& ar ) {
-        serializer(ar, prevVersion, version, knownCommittedVersion, minKnownCommittedVersion, messages, reply, arena, debugID, hasExecOp);
+        serializer(ar, prevVersion, version, knownCommittedVersion, minKnownCommittedVersion, messages, reply, arena, debugID);
     }
 };
 
@@ -1689,207 +1689,6 @@ ACTOR Future<Void> commitQueue( TLogData* self ) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void execProcessingHelper(TLogData* self,
|
|
||||||
Reference<LogData> logData,
|
|
||||||
TLogCommitRequest* req,
|
|
||||||
Standalone<VectorRef<Tag>>* execTags,
|
|
||||||
ExecCmdValueString* execArg,
|
|
||||||
StringRef* execCmd,
|
|
||||||
Version* execVersion,
|
|
||||||
vector<Future<Void>>* snapFailKeySetters,
|
|
||||||
vector<Future<Void>>* ignoredPops)
|
|
||||||
{
|
|
||||||
// inspect the messages to find if there is an Exec type and print
|
|
||||||
// it. message are prefixed by the length of the message and each
|
|
||||||
// field is prefixed by the length too
|
|
||||||
uint8_t type = MutationRef::MAX_ATOMIC_OP;
|
|
||||||
StringRef param2;
|
|
||||||
ArenaReader rd(req->arena, req->messages, Unversioned());
|
|
||||||
int32_t messageLength, rawLength;
|
|
||||||
uint16_t tagCount;
|
|
||||||
uint32_t sub;
|
|
||||||
while (!rd.empty()) {
|
|
||||||
Tag tmpTag;
|
|
||||||
bool hasTxsTag = false;
|
|
||||||
rd.checkpoint();
|
|
||||||
rd >> messageLength >> sub >> tagCount;
|
|
||||||
for (int i = 0; i < tagCount; i++) {
|
|
||||||
rd >> tmpTag;
|
|
||||||
if (tmpTag.locality == tagLocalityTxs || tmpTag == txsTag) {
|
|
||||||
hasTxsTag = true;
|
|
||||||
}
|
|
||||||
execTags->push_back(execTags->arena(), tmpTag);
|
|
||||||
}
|
|
||||||
if (!hasTxsTag) {
|
|
||||||
rd >> type;
|
|
||||||
if (type == MutationRef::Exec) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
rawLength = messageLength + sizeof(messageLength);
|
|
||||||
rd.rewind();
|
|
||||||
rd.readBytes(rawLength);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t len = 0;
|
|
||||||
if (type == MutationRef::Exec) {
|
|
||||||
// get param1
|
|
||||||
rd >> len;
|
|
||||||
*execCmd = StringRef((uint8_t const*)rd.readBytes(len), len);
|
|
||||||
// get param2
|
|
||||||
rd >> len;
|
|
||||||
param2 = StringRef((uint8_t const*)rd.readBytes(len), len);
|
|
||||||
|
|
||||||
TraceEvent(SevDebug, "TLogExecCommandType", self->dbgid)
|
|
||||||
.detail("Value", execCmd->toString())
|
|
||||||
.detail("Version", req->version);
|
|
||||||
|
|
||||||
execArg->setCmdValueString(param2);
|
|
||||||
execArg->dbgPrint();
|
|
||||||
StringRef uidStr = execArg->getBinaryArgValue(LiteralStringRef("uid"));
|
|
||||||
if (!execCmd->startsWith(LiteralStringRef("\xff"))) {
|
|
||||||
*execVersion = req->version;
|
|
||||||
}
|
|
||||||
if (*execCmd == execSnap) {
|
|
||||||
// validation check specific to snap request
|
|
||||||
std::string reason;
|
|
||||||
if (!self->ignorePopRequest) {
|
|
||||||
*execVersion = invalidVersion;
|
|
||||||
reason = "SnapFailIgnorePopNotSet";
|
|
||||||
} else if (uidStr.toString() != self->ignorePopUid) {
|
|
||||||
*execVersion = invalidVersion;
|
|
||||||
reason = "SnapFailedDisableTLogUidMismatch";
|
|
||||||
}
|
|
||||||
|
|
||||||
if (*execVersion == invalidVersion) {
|
|
||||||
TraceEvent(SevWarn, "TLogSnapFailed")
|
|
||||||
.detail("IgnorePopUid", self->ignorePopUid)
|
|
||||||
.detail("IgnorePopRequest", self->ignorePopRequest)
|
|
||||||
.detail("Reason", reason)
|
|
||||||
.detail("Version", req->version);
|
|
||||||
|
|
||||||
TraceEvent("ExecCmdSnapCreate")
|
|
||||||
.detail("Uid", uidStr.toString())
|
|
||||||
.detail("Status", -1)
|
|
||||||
.detail("Tag", logData->allTags.begin()->toString())
|
|
||||||
.detail("Role", "TLog")
|
|
||||||
.detail("Version", req->version);
|
|
||||||
|
|
||||||
if (g_network->isSimulated()) {
|
|
||||||
// write SnapFailedTLog.$UID
|
|
||||||
Standalone<StringRef> keyStr = snapTestFailStatus.withSuffix(uidStr);
|
|
||||||
StringRef valStr = LiteralStringRef("Success");
|
|
||||||
TraceEvent(SevDebug, "TLogKeyStr").detail("Value", keyStr);
|
|
||||||
snapFailKeySetters->push_back(runRYWTransaction(self->cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Void>
|
|
||||||
{ tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->set(keyStr, valStr); return Void(); }));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (*execCmd == execDisableTLogPop) {
|
|
||||||
self->ignorePopRequest = true;
|
|
||||||
if (self->ignorePopUid != "") {
|
|
||||||
TraceEvent(SevWarn, "TLogPopDisableonDisable")
|
|
||||||
.detail("IgnorePopUid", self->ignorePopUid)
|
|
||||||
.detail("UidStr", uidStr.toString())
|
|
||||||
.detail("Version", req->version);
|
|
||||||
}
|
|
||||||
self->ignorePopUid = uidStr.toString();
|
|
||||||
self->ignorePopDeadline = g_network->now() + SERVER_KNOBS->TLOG_IGNORE_POP_AUTO_ENABLE_DELAY;
|
|
||||||
TraceEvent("TLogExecCmdPopDisable")
|
|
||||||
.detail("ExecCmd", execCmd->toString())
|
|
||||||
.detail("UidStr", uidStr.toString())
|
|
||||||
.detail("IgnorePopUid", self->ignorePopUid)
|
|
||||||
.detail("IgnporePopRequest", self->ignorePopRequest)
|
|
||||||
.detail("IgnporePopDeadline", self->ignorePopDeadline)
|
|
||||||
.detail("Version", req->version);
|
|
||||||
}
|
|
||||||
if (*execCmd == execEnableTLogPop) {
|
|
||||||
if (self->ignorePopUid != uidStr.toString()) {
|
|
||||||
TraceEvent(SevWarn, "TLogPopDisableEnableUidMismatch")
|
|
||||||
.detail("IgnorePopUid", self->ignorePopUid)
|
|
||||||
.detail("UidStr", uidStr.toString())
|
|
||||||
.detail("Version", req->version);
|
|
||||||
}
|
|
||||||
|
|
||||||
TraceEvent("EnableTLogPlayAllIgnoredPops2");
|
|
||||||
// use toBePopped and issue all the pops
|
|
||||||
std::map<Tag, Version>::iterator it;
|
|
||||||
self->ignorePopRequest = false;
|
|
||||||
self->ignorePopDeadline = 0.0;
|
|
||||||
self->ignorePopUid = "";
|
|
||||||
for (it = self->toBePopped.begin(); it != self->toBePopped.end(); it++) {
|
|
||||||
TraceEvent("PlayIgnoredPop")
|
|
||||||
.detail("Tag", it->first.toString())
|
|
||||||
.detail("Version", it->second);
|
|
||||||
ignoredPops->push_back(tLogPopCore(self, it->first, it->second, logData));
|
|
||||||
}
|
|
||||||
self->toBePopped.clear();
|
|
||||||
TraceEvent("TLogExecCmdPopEnable")
|
|
||||||
.detail("ExecCmd", execCmd->toString())
|
|
||||||
.detail("UidStr", uidStr.toString())
|
|
||||||
.detail("IgnorePopUid", self->ignorePopUid)
|
|
||||||
.detail("IgnporePopRequest", self->ignorePopRequest)
|
|
||||||
.detail("IgnporePopDeadline", self->ignorePopDeadline)
|
|
||||||
.detail("Version", req->version);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ACTOR Future<Void> tLogSnapHelper(TLogData* self,
|
|
||||||
Reference<LogData> logData,
|
|
||||||
ExecCmdValueString* execArg,
|
|
||||||
Version version,
|
|
||||||
Version execVersion,
|
|
||||||
StringRef execCmd,
|
|
||||||
Standalone<VectorRef<Tag>> execTags)
|
|
||||||
{
|
|
||||||
state int err = 0;
|
|
||||||
state StringRef uidStr = execArg->getBinaryArgValue(LiteralStringRef("uid"));
|
|
||||||
state UID execUID = UID::fromString(uidStr.toString());
|
|
||||||
state bool otherRoleExeced = false;
|
|
||||||
// TLog is special, we need to snap at the execVersion.
|
|
||||||
// storage on the same node should not initiate a snap before TLog which will make
|
|
||||||
// the snap version at TLog unpredictable
|
|
||||||
ASSERT(!isExecOpInProgress(execUID));
|
|
||||||
if (!otherRoleExeced) {
|
|
||||||
setExecOpInProgress(execUID);
|
|
||||||
int tmpErr = wait(execHelper(execArg, self->dataFolder, "role=tlog", 1 /*version*/));
|
|
||||||
err = tmpErr;
|
|
||||||
clearExecOpInProgress(execUID);
|
|
||||||
}
|
|
||||||
TraceEvent("TLogCommitExecTraceTLog")
|
|
||||||
.detail("UidStr", uidStr.toString())
|
|
||||||
.detail("Status", err)
|
|
||||||
.detail("Tag", logData->allTags.begin()->toString())
|
|
||||||
.detail("OldTagSize", logData->allTags.size())
|
|
||||||
.detail("Role", "TLog");
|
|
||||||
|
|
||||||
// print the detailed status message
|
|
||||||
for (int i = 0; i < execTags.size(); i++) {
|
|
||||||
Version poppedTagVersion = -1;
|
|
||||||
auto tagv = logData->getTagData(execTags[i]);
|
|
||||||
if (!tagv) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
poppedTagVersion = tagv->popped;
|
|
||||||
|
|
||||||
TraceEvent te = TraceEvent(SevDebug, "TLogExecTraceDetailed");
|
|
||||||
te.detail("Uid", uidStr.toString());
|
|
||||||
te.detail("Status", err);
|
|
||||||
te.detail("Role", "TLog");
|
|
||||||
te.detail("ExecCmd", execCmd.toString());
|
|
||||||
te.detail("Param2", execArg->getCmdValueString().toString());
|
|
||||||
te.detail("Tag", tagv->tag.toString());
|
|
||||||
te.detail("Version", version);
|
|
||||||
te.detail("PoppedTagVersion", poppedTagVersion);
|
|
||||||
te.detail("PersistentDataVersion", logData->persistentDataVersion);
|
|
||||||
te.detail("PersistentDatadurableVersion", logData->persistentDataDurableVersion);
|
|
||||||
te.detail("QueueCommittedVersion", logData->queueCommittedVersion.get());
|
|
||||||
te.detail("IgnorePopUid", self->ignorePopUid);
|
|
||||||
}
|
|
||||||
return Void();
|
|
||||||
}
|
|
||||||
|
|
||||||
ACTOR Future<Void> tLogCommit(
|
ACTOR Future<Void> tLogCommit(
|
||||||
TLogData* self,
|
TLogData* self,
|
||||||
TLogCommitRequest req,
|
TLogCommitRequest req,
|
||||||
|
@@ -1924,61 +1723,21 @@ ACTOR Future<Void> tLogCommit(
         wait( delayJittered(.005, TaskPriority::TLogCommit) );
     }
 
-    // while exec op is being committed, no new transactions will be admitted.
-    // This property is useful for snapshot kind of operations which wants to
-    // take a snap of the disk image at a particular version (not data from
-    // future version to be included)
-    // NOTE: execOpCommitInProgress will not be set for exec commands which
-    // start with \xff
-    state bool execOpLockTaken = false;
-    if (logData->execOpCommitInProgress) {
-        wait(logData->execOpLock.take());
-        execOpLockTaken = true;
-    }
-
     if(logData->stopped) {
         req.reply.sendError( tlog_stopped() );
         return Void();
     }
 
-    state Version execVersion = invalidVersion;
-    state ExecCmdValueString execArg;
-    state TLogQueueEntryRef qe;
-    state StringRef execCmd;
-    state Standalone<VectorRef<Tag>> execTags;
-    state vector<Future<Void>> playIgnoredPops;
-    state vector<Future<Void>> snapFailKeySetters;
-
-    // The logic of increasing logData->version must be atomic in a process, i.e, not including wait() or yield.
-    // Otherwise, the duplicate req (with the same preVersion) can be executed twice
     if (logData->version.get() == req.prevVersion) { // Not a duplicate (check relies on critical section between here self->version.set() below!)
         if(req.debugID.present())
             g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.Before");
 
-        if (req.hasExecOp) {
-            execProcessingHelper(self, logData, &req, &execTags, &execArg, &execCmd, &execVersion, &snapFailKeySetters, &playIgnoredPops);
-            if (execVersion != invalidVersion) {
-                TraceEvent(SevDebug, "SettingExecOpCommit")
-                    .detail("LogId", logData->logId)
-                    .detail("ExecVersion", execVersion)
-                    .detail("Version", req.version);
-                logData->execOpCommitInProgress = true;
-                if (!execOpLockTaken) {
-                    wait(logData->execOpLock.take());
-                    execOpLockTaken = true;
-                } else {
-                    ASSERT(logData->execOpLock.available() == 0);
-                }
-                ASSERT(execOpLockTaken);
-            }
-        }
-
         //TraceEvent("TLogCommit", logData->logId).detail("Version", req.version);
         commitMessages(self, logData, req.version, req.arena, req.messages);
 
         logData->knownCommittedVersion = std::max(logData->knownCommittedVersion, req.knownCommittedVersion);
 
+        TLogQueueEntryRef qe;
         // Log the changes to the persistent queue, to be committed by commitQueue()
         qe.version = req.version;
         qe.knownCommittedVersion = logData->knownCommittedVersion;
@@ -1993,7 +1752,6 @@ ACTOR Future<Void> tLogCommit(
 
         // Notifies the commitQueue actor to commit persistentQueue, and also unblocks tLogPeekMessages actors
         logData->version.set( req.version );
-        wait(waitForAll(playIgnoredPops));
 
         if(req.debugID.present())
             g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.AfterTLogCommit");
@@ -2002,20 +1760,6 @@ ACTOR Future<Void> tLogCommit(
     state Future<Void> stopped = logData->stopCommit.onTrigger();
     wait( timeoutWarning( logData->queueCommittedVersion.whenAtLeast( req.version ) || stopped, 0.1, warningCollectorInput ) );
 
-    if ((execVersion != invalidVersion) &&
-        execVersion <= logData->queueCommittedVersion.get()) {
-        wait(tLogSnapHelper(self, logData, &execArg, qe.version, execVersion, execCmd, execTags));
-    }
-    if (execVersion != invalidVersion && logData->execOpCommitInProgress) {
-        ASSERT(execOpLockTaken);
-        logData->execOpCommitInProgress = false;
-    }
-    if (execOpLockTaken) {
-        logData->execOpLock.release();
-        execOpLockTaken = false;
-    }
-    execVersion = invalidVersion;
-
     if(stopped.isReady()) {
         ASSERT(logData->stopped);
         req.reply.sendError( tlog_stopped() );
@@ -2026,13 +1770,6 @@ ACTOR Future<Void> tLogCommit(
         g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.After");
 
     req.reply.send( logData->durableKnownCommittedVersion );
-    if (g_network->isSimulated()) {
-        if (snapFailKeySetters.size() > 0) {
-            TraceEvent(SevDebug, "SettingSnapFailKey");
-            wait(waitForAll(snapFailKeySetters));
-            TraceEvent(SevDebug, "SettingSnapFailKeyDone");
-        }
-    }
     return Void();
 }
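Not part of the diff: a standalone C++ sketch of why the duplicate-request check retained above (comparing the log's current version with req.prevVersion before setting the new version) must not be separated from the update by a suspension point. With cooperative actors, a second copy of the same request could otherwise run in the gap and pass the same check twice. The ToyLog type is invented for illustration and is not FDB code.

#include <cstdint>
#include <iostream>

struct ToyLog {
    int64_t version = 10;

    // Returns true if the commit was applied, false if it was recognized as a duplicate.
    bool commit(int64_t prevVersion, int64_t newVersion) {
        if (version != prevVersion) return false; // duplicate or out-of-order request
        // No suspension point between the check above and the update below.
        version = newVersion;
        return true;
    }
};

int main() {
    ToyLog log;
    std::cout << log.commit(10, 11) << "\n"; // 1: applied
    std::cout << log.commit(10, 11) << "\n"; // 0: the duplicate is rejected
}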
@@ -2186,7 +1923,7 @@ tLogSnapCreate(TLogSnapRequest snapReq, TLogData* self, Reference<LogData> logDa
     ExecCmdValueString snapArg(snapReq.snapPayload);
     try {
         Standalone<StringRef> role = LiteralStringRef("role=").withSuffix(snapReq.role);
-        int err = wait(execHelper(&snapArg, self->dataFolder, role.toString(), 2 /* version */));
+        int err = wait(execHelper(&snapArg, self->dataFolder, role.toString()));
 
         std::string uidStr = snapReq.snapUID.toString();
         TraceEvent("ExecTraceTLog")
@@ -436,7 +436,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
             vector<Future<Void>> tLogCommitResults;
             for(int loc=0; loc< it->logServers.size(); loc++) {
                 Standalone<StringRef> msg = data.getMessages(location);
-                allReplies.push_back( it->logServers[loc]->get().interf().commit.getReply( TLogCommitRequest( msg.arena(), prevVersion, version, knownCommittedVersion, minKnownCommittedVersion, msg, data.getHasExecOp(), debugID ), TaskPriority::TLogCommitReply ) );
+                allReplies.push_back( it->logServers[loc]->get().interf().commit.getReply( TLogCommitRequest( msg.arena(), prevVersion, version, knownCommittedVersion, minKnownCommittedVersion, msg, debugID ), TaskPriority::TLogCommitReply ) );
                 Future<Void> commitSuccess = success(allReplies.back());
                 addActor.get().send(commitSuccess);
                 tLogCommitResults.push_back(commitSuccess);
@@ -1723,7 +1723,6 @@ int main(int argc, char* argv[]) {
     std::string absDataFolder = abspath(dataFolder);
     ini.LoadFile(joinPath(absDataFolder, "restartInfo.ini").c_str());
     int backupFailed = true;
-    int backupVersion = 1;
     const char* isRestoringStr = ini.GetValue("RESTORE", "isRestoring", NULL);
     if (isRestoringStr) {
         isRestoring = atoi(isRestoringStr);
@@ -1731,142 +1730,77 @@ int main(int argc, char* argv[]) {
         if (isRestoring && backupFailedStr) {
             backupFailed = atoi(backupFailedStr);
         }
-        const char* backupVersionStr = ini.GetValue("RESTORE", "BackupVersion", NULL);
-        if (isRestoring && backupVersionStr) {
-            backupVersion = atoi(backupVersionStr);
-        }
     }
     if (isRestoring && !backupFailed) {
-        if (backupVersion == 1) {
-            std::vector<std::string> returnList;
-            std::string ext = "";
-            returnList = platform::listDirectories(absDataFolder);
-            std::string snapStr = ini.GetValue("RESTORE", "RestoreSnapUID");
-
-            TraceEvent("RestoringDataFolder").detail("DataFolder", absDataFolder);
-            TraceEvent("RestoreSnapUID").detail("UID", snapStr);
-
-            // delete all files (except fdb.cluster) in non-snap directories
-            for (int i = 0; i < returnList.size(); i++) {
-                if (returnList[i] == "." || returnList[i] == "..") {
-                    continue;
-                }
-                if (returnList[i].find(snapStr) != std::string::npos) {
-                    continue;
-                }
-
-                std::string childf = absDataFolder + "/" + returnList[i];
-                std::vector<std::string> returnFiles = platform::listFiles(childf, ext);
-                for (int j = 0; j < returnFiles.size(); j++) {
-                    if (returnFiles[j] != "fdb.cluster" && returnFiles[j] != "fitness") {
-                        TraceEvent("DeletingNonSnapfiles")
-                            .detail("FileBeingDeleted", childf + "/" + returnFiles[j]);
-                        deleteFile(childf + "/" + returnFiles[j]);
-                    }
-                }
-            }
-            // move the contents from snap folder to the original folder,
-            // delete snap folders
-            for (int i = 0; i < returnList.size(); i++) {
-                if (returnList[i] == "." || returnList[i] == "..") {
-                    continue;
-                }
-                std::string dirSrc = absDataFolder + "/" + returnList[i];
-                // delete snap directories which are not part of restoreSnapUID
-                if (returnList[i].find(snapStr) == std::string::npos) {
-                    if (returnList[i].find("snap") != std::string::npos) {
-                        platform::eraseDirectoryRecursive(dirSrc);
-                    }
-                    continue;
-                }
-                // remove empty/partial snap directories
-                std::vector<std::string> childrenList = platform::listFiles(dirSrc);
-                if (childrenList.size() == 0) {
-                    TraceEvent("RemovingEmptySnapDirectory").detail("DirBeingDeleted", dirSrc);
-                    platform::eraseDirectoryRecursive(dirSrc);
-                    continue;
-                }
-                std::string origDir = returnList[i].substr(0, 32);
-                std::string dirToRemove = absDataFolder + "/" + origDir;
-                TraceEvent("DeletingOriginalNonSnapDirectory").detail("FileBeingDeleted", dirToRemove);
-                platform::eraseDirectoryRecursive(dirToRemove);
-                renameFile(dirSrc, dirToRemove);
-                TraceEvent("RenamingSnapToOriginalDirectory")
-                    .detail("Oldname", dirSrc)
-                    .detail("Newname", dirToRemove);
-            }
-        } else if (backupVersion == 2) {
-            std::vector<std::string> returnList;
-            std::string ext = "";
-            returnList = platform::listDirectories(absDataFolder);
-            std::string snapStr = ini.GetValue("RESTORE", "RestoreSnapUID");
-
-            TraceEvent("RestoringDataFolder").detail("DataFolder", absDataFolder);
-            TraceEvent("RestoreSnapUID").detail("UID", snapStr);
-
-            // delete all files (except fdb.cluster) in non-snap directories
-            for (const auto & dirEntry : returnList) {
-                if (dirEntry == "." || dirEntry == "..") {
-                    continue;
-                }
-                if (dirEntry.find(snapStr) != std::string::npos) {
-                    continue;
-                }
-
-                std::string childf = absDataFolder + "/" + dirEntry;
-                std::vector<std::string> returnFiles = platform::listFiles(childf, ext);
-                for (const auto & fileEntry : returnFiles) {
-                    if (fileEntry != "fdb.cluster" && fileEntry != "fitness") {
-                        TraceEvent("DeletingNonSnapfiles")
-                            .detail("FileBeingDeleted", childf + "/" + fileEntry);
-                        deleteFile(childf + "/" + fileEntry);
-                    }
-                }
-            }
-            // cleanup unwanted and partial directories
-            for (const auto & dirEntry : returnList) {
-                if (dirEntry == "." || dirEntry == "..") {
-                    continue;
-                }
-                std::string dirSrc = absDataFolder + "/" + dirEntry;
-                // delete snap directories which are not part of restoreSnapUID
-                if (dirEntry.find(snapStr) == std::string::npos) {
-                    if (dirEntry.find("snap") != std::string::npos) {
-                        platform::eraseDirectoryRecursive(dirSrc);
-                    }
-                    continue;
-                }
-                // remove empty/partial snap directories
-                std::vector<std::string> childrenList = platform::listFiles(dirSrc);
-                if (childrenList.size() == 0) {
-                    TraceEvent("RemovingEmptySnapDirectory").detail("DirBeingDeleted", dirSrc);
-                    platform::eraseDirectoryRecursive(dirSrc);
-                    continue;
-                }
-            }
-            // move snapshotted files to appropriate locations
-            for (const auto & dirEntry : returnList) {
-                if (dirEntry == "." || dirEntry == "..") {
-                    continue;
-                }
-                std::string dirSrc = absDataFolder + "/" + dirEntry;
-                std::string origDir = dirEntry.substr(0, 32);
-                std::string dirToMove = absDataFolder + "/" + origDir;
-                if ((dirEntry.find("snap") != std::string::npos) &&
-                    (dirEntry.find("tlog") != std::string::npos)) {
-                    // restore tlog files
-                    restoreRoleFilesHelper(dirSrc, dirToMove, "log");
-                } else if ((dirEntry.find("snap") != std::string::npos) &&
-                    (dirEntry.find("storage") != std::string::npos)) {
-                    // restore storage files
-                    restoreRoleFilesHelper(dirSrc, dirToMove, "storage");
-                } else if ((dirEntry.find("snap") != std::string::npos) &&
-                    (dirEntry.find("coord") != std::string::npos)) {
-                    // restore coordinator files
-                    restoreRoleFilesHelper(dirSrc, dirToMove, "coordination");
-                }
-            }
-        }
+        std::vector<std::string> returnList;
+        std::string ext = "";
+        returnList = platform::listDirectories(absDataFolder);
+        std::string snapStr = ini.GetValue("RESTORE", "RestoreSnapUID");
+
+        TraceEvent("RestoringDataFolder").detail("DataFolder", absDataFolder);
+        TraceEvent("RestoreSnapUID").detail("UID", snapStr);
+
+        // delete all files (except fdb.cluster) in non-snap directories
+        for (const auto & dirEntry : returnList) {
+            if (dirEntry == "." || dirEntry == "..") {
+                continue;
+            }
+            if (dirEntry.find(snapStr) != std::string::npos) {
+                continue;
+            }
+
+            std::string childf = absDataFolder + "/" + dirEntry;
+            std::vector<std::string> returnFiles = platform::listFiles(childf, ext);
+            for (const auto & fileEntry : returnFiles) {
+                if (fileEntry != "fdb.cluster" && fileEntry != "fitness") {
+                    TraceEvent("DeletingNonSnapfiles")
+                        .detail("FileBeingDeleted", childf + "/" + fileEntry);
+                    deleteFile(childf + "/" + fileEntry);
+                }
+            }
+        }
+        // cleanup unwanted and partial directories
+        for (const auto & dirEntry : returnList) {
+            if (dirEntry == "." || dirEntry == "..") {
+                continue;
+            }
+            std::string dirSrc = absDataFolder + "/" + dirEntry;
+            // delete snap directories which are not part of restoreSnapUID
+            if (dirEntry.find(snapStr) == std::string::npos) {
+                if (dirEntry.find("snap") != std::string::npos) {
+                    platform::eraseDirectoryRecursive(dirSrc);
+                }
+                continue;
+            }
+            // remove empty/partial snap directories
+            std::vector<std::string> childrenList = platform::listFiles(dirSrc);
+            if (childrenList.size() == 0) {
+                TraceEvent("RemovingEmptySnapDirectory").detail("DirBeingDeleted", dirSrc);
+                platform::eraseDirectoryRecursive(dirSrc);
+                continue;
+            }
+        }
+        // move snapshotted files to appropriate locations
+        for (const auto & dirEntry : returnList) {
+            if (dirEntry == "." || dirEntry == "..") {
+                continue;
+            }
+            std::string dirSrc = absDataFolder + "/" + dirEntry;
+            std::string origDir = dirEntry.substr(0, 32);
+            std::string dirToMove = absDataFolder + "/" + origDir;
+            if ((dirEntry.find("snap") != std::string::npos) &&
+                (dirEntry.find("tlog") != std::string::npos)) {
+                // restore tlog files
+                restoreRoleFilesHelper(dirSrc, dirToMove, "log");
+            } else if ((dirEntry.find("snap") != std::string::npos) &&
+                (dirEntry.find("storage") != std::string::npos)) {
+                // restore storage files
+                restoreRoleFilesHelper(dirSrc, dirToMove, "storage");
+            } else if ((dirEntry.find("snap") != std::string::npos) &&
+                (dirEntry.find("coord") != std::string::npos)) {
+                // restore coordinator files
+                restoreRoleFilesHelper(dirSrc, dirToMove, "coordination");
+            }
+        }
     }
 
     }
     }
     }
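Not part of the diff: a self-contained sketch of the restore cleanup pass above, written with C++17 std::filesystem rather than FDB's platform:: helpers. It skips "." and "..", drops snapshot directories that do not carry the restore snapshot UID, and removes empty snapshot directories; the data path and UID below are made up for the example.

#include <filesystem>
#include <iostream>
#include <string>

namespace fs = std::filesystem;

void cleanupForRestore(const fs::path& dataFolder, const std::string& snapUID) {
    if (!fs::exists(dataFolder)) return;
    for (const auto& entry : fs::directory_iterator(dataFolder)) {
        if (!entry.is_directory()) continue;
        const std::string name = entry.path().filename().string();
        if (name.find(snapUID) == std::string::npos) {
            // Not part of the snapshot being restored; stale snapshot leftovers are deleted.
            if (name.find("snap") != std::string::npos) fs::remove_all(entry.path());
            continue;
        }
        if (fs::is_empty(entry.path())) {
            // A partial/empty snapshot directory cannot be restored.
            std::cout << "removing empty snap dir " << entry.path() << "\n";
            fs::remove_all(entry.path());
        }
    }
}

int main() {
    cleanupForRestore("/tmp/fdb-data", "0123456789abcdef0123456789abcdef");
}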
@@ -27,30 +27,30 @@
 #include "flow/SystemMonitor.h"
 #include "flow/Util.h"
 #include "fdbclient/Atomic.h"
+#include "fdbclient/DatabaseContext.h"
 #include "fdbclient/KeyRangeMap.h"
-#include "fdbclient/SystemData.h"
+#include "fdbclient/MasterProxyInterface.h"
 #include "fdbclient/NativeAPI.actor.h"
 #include "fdbclient/Notified.h"
 #include "fdbclient/StatusClient.h"
-#include "fdbclient/MasterProxyInterface.h"
-#include "fdbclient/DatabaseContext.h"
-#include "fdbserver/WorkerInterface.actor.h"
-#include "fdbserver/TLogInterface.h"
-#include "fdbserver/MoveKeys.actor.h"
-#include "fdbserver/Knobs.h"
-#include "fdbserver/WaitFailure.h"
-#include "fdbserver/IKeyValueStore.h"
+#include "fdbclient/SystemData.h"
 #include "fdbclient/VersionedMap.h"
+#include "fdbserver/FDBExecHelper.actor.h"
+#include "fdbserver/IKeyValueStore.h"
+#include "fdbserver/Knobs.h"
+#include "fdbserver/LatencyBandConfig.h"
+#include "fdbserver/LogProtocolMessage.h"
+#include "fdbserver/LogSystem.h"
+#include "fdbserver/MoveKeys.actor.h"
+#include "fdbserver/RecoveryState.h"
 #include "fdbserver/StorageMetrics.h"
-#include "fdbrpc/sim_validation.h"
 #include "fdbserver/ServerDBInfo.h"
+#include "fdbserver/TLogInterface.h"
+#include "fdbserver/WaitFailure.h"
+#include "fdbserver/WorkerInterface.actor.h"
+#include "fdbrpc/sim_validation.h"
 #include "fdbrpc/Smoother.h"
 #include "flow/Stats.h"
-#include "fdbserver/LogSystem.h"
-#include "fdbserver/RecoveryState.h"
-#include "fdbserver/LogProtocolMessage.h"
-#include "fdbserver/LatencyBandConfig.h"
-#include "fdbserver/FDBExecHelper.actor.h"
 #include "flow/TDMetric.actor.h"
 #include <type_traits>
 #include "flow/actorcompiler.h" // This must be the last #include.
@@ -1907,12 +1907,9 @@ void addMutation( Reference<T>& target, Version version, MutationRef const& muta
 }
 
 template <class T>
-void splitMutations(StorageServer* data, KeyRangeMap<T>& map, VerUpdateRef const& update, vector<int>& execIndex) {
+void splitMutations(StorageServer* data, KeyRangeMap<T>& map, VerUpdateRef const& update) {
     for(int i = 0; i < update.mutations.size(); i++) {
         splitMutation(data, map, update.mutations[i], update.version);
-        if (update.mutations[i].type == MutationRef::Exec) {
-            execIndex.push_back(i);
-        }
     }
 }
@@ -1931,53 +1928,10 @@ void splitMutation(StorageServer* data, KeyRangeMap<T>& map, MutationRef const&
             addMutation( i->value(), ver, MutationRef((MutationRef::Type)m.type, k.begin, k.end) );
         }
     }
-    } else if (m.type == MutationRef::Exec) {
     } else
         ASSERT(false); // Unknown mutation type in splitMutations
 }
 
-ACTOR Future<Void>
-snapHelper(StorageServer* data, MutationRef m, Version ver)
-{
-    state std::string cmd = m.param1.toString();
-    if ((cmd == execDisableTLogPop) || (cmd == execEnableTLogPop)) {
-        TraceEvent("IgnoreNonSnapCommands").detail("ExecCommand", cmd);
-        return Void();
-    }
-
-    state ExecCmdValueString execArg(m.param2);
-    state StringRef uidStr = execArg.getBinaryArgValue(LiteralStringRef("uid"));
-    state int err = 0;
-    state Future<int> cmdErr;
-    state UID execUID = UID::fromString(uidStr.toString());
-    state bool skip = false;
-    if (cmd == execSnap && isTLogInSameNode()) {
-        skip = true;
-    }
-    // other storage has initiated the exec, so we can skip
-    if (!skip && isExecOpInProgress(execUID)) {
-        skip = true;
-    }
-
-    if (!skip) {
-        setExecOpInProgress(execUID);
-        int err = wait(execHelper(&execArg, data->folder, "role=storage", 1 /*version*/));
-        clearExecOpInProgress(execUID);
-    }
-    TraceEvent te = TraceEvent("ExecTraceStorage");
-    te.detail("Uid", uidStr.toString());
-    te.detail("Status", err);
-    te.detail("Role", "storage");
-    te.detail("Version", ver);
-    te.detail("Mutation", m.toString());
-    te.detail("Mid", data->thisServerID.toString());
-    te.detail("DurableVersion", data->durableVersion.get());
-    te.detail("DataVersion", data->version.get());
-    te.detail("Tag", data->tag.toString());
-    te.detail("SnapCreateSkipped", skip);
-    return Void();
-}
-
 ACTOR Future<Void> fetchKeys( StorageServer *data, AddingShard* shard ) {
     state TraceInterval interval("FetchKeys");
     state KeyRange keys = shard->keys;
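Not part of the diff: a standalone C++ sketch of the idea behind splitMutation() above, where a clear-range mutation is cut at shard boundaries so each shard only sees the piece that overlaps its key range, while single-key mutations land on exactly one shard. The map-of-shard-start-keys representation here is an assumption for illustration, not FDB's KeyRangeMap.

#include <iostream>
#include <map>
#include <string>
#include <vector>

using Key = std::string;
struct Range { Key begin, end; }; // half-open [begin, end)

// shardStarts maps each shard's begin key to a shard id; the shards partition the key space,
// so the first key is the empty string.
std::vector<std::pair<int, Range>> splitRange(const std::map<Key, int>& shardStarts, Range r) {
    std::vector<std::pair<int, Range>> pieces;
    auto it = std::prev(shardStarts.upper_bound(r.begin)); // shard containing r.begin
    while (it != shardStarts.end() && it->first < r.end) {
        auto next = std::next(it);
        Key pieceEnd = (next == shardStarts.end() || next->first > r.end) ? r.end : next->first;
        pieces.push_back({ it->second, { std::max(r.begin, it->first), pieceEnd } });
        it = next;
    }
    return pieces;
}

int main() {
    std::map<Key, int> shards = { { "", 0 }, { "g", 1 }, { "p", 2 } };
    for (auto& [shard, piece] : splitRange(shards, { "c", "t" }))
        std::cout << "shard " << shard << ": [" << piece.begin << ", " << piece.end << ")\n";
}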
@@ -2085,28 +2039,22 @@ ACTOR Future<Void> fetchKeys( StorageServer *data, AddingShard* shard ) {
             if (this_block.more) {
                 Key nfk = this_block.readThrough.present() ? this_block.readThrough.get() : keyAfter( this_block.end()[-1].key );
                 if (nfk != keys.end) {
-                    state std::deque< Standalone<VerUpdateRef> > updatesToSplit = std::move( shard->updates );
+                    std::deque< Standalone<VerUpdateRef> > updatesToSplit = std::move( shard->updates );
 
                     // This actor finishes committing the keys [keys.begin,nfk) that we already fetched.
                     // The remaining unfetched keys [nfk,keys.end) will become a separate AddingShard with its own fetchKeys.
                     shard->server->addShard( ShardInfo::addingSplitLeft( KeyRangeRef(keys.begin, nfk), shard ) );
                     shard->server->addShard( ShardInfo::newAdding( data, KeyRangeRef(nfk, keys.end) ) );
                     shard = data->shards.rangeContaining( keys.begin ).value()->adding;
-                    state AddingShard* otherShard = data->shards.rangeContaining( nfk ).value()->adding;
+                    AddingShard* otherShard = data->shards.rangeContaining( nfk ).value()->adding;
                     keys = shard->keys;
 
                     // Split our prior updates. The ones that apply to our new, restricted key range will go back into shard->updates,
                     // and the ones delivered to the new shard will be discarded because it is in WaitPrevious phase (hasn't chosen a fetchVersion yet).
                     // What we are doing here is expensive and could get more expensive if we started having many more blocks per shard. May need optimization in the future.
-                    state vector<int> execIdxVec;
-                    state std::deque< Standalone<VerUpdateRef> >::iterator u = updatesToSplit.begin();
+                    std::deque< Standalone<VerUpdateRef> >::iterator u = updatesToSplit.begin();
                     for(; u != updatesToSplit.end(); ++u) {
-                        ASSERT(execIdxVec.size() == 0);
-                        splitMutations(data, data->shards, *u, execIdxVec);
-                        for (auto execIdx : execIdxVec) {
-                            wait(snapHelper(data, u->mutations[execIdx], u->version));
-                        }
-                        execIdxVec.clear();
+                        splitMutations(data, data->shards, *u);
                     }
 
                     TEST( true );
@@ -2299,8 +2247,7 @@ void ShardInfo::addMutation(Version version, MutationRef const& mutation) {
         adding->addMutation(version, mutation);
     else if (readWrite)
         readWrite->addMutation(version, mutation, this->keys, readWrite->updateEagerReads);
-    else if ((mutation.type != MutationRef::ClearRange)
-             && (mutation.type != MutationRef::Exec)) {
+    else if (mutation.type != MutationRef::ClearRange) {
         TraceEvent(SevError, "DeliveredToNotAssigned").detail("Version", version).detail("Mutation", mutation.toString());
         ASSERT(false); // Mutation delivered to notAssigned shard!
     }
@@ -2715,9 +2662,6 @@ ACTOR Future<Void> update( StorageServer* data, bool* pReceivedUpdate )
             state VerUpdateRef* pUpdate = &fii.changes[changeNum];
             for(; mutationNum < pUpdate->mutations.size(); mutationNum++) {
                 updater.applyMutation(data, pUpdate->mutations[mutationNum], pUpdate->version);
-                if (pUpdate->mutations[mutationNum].type == MutationRef::Exec) {
-                    wait(snapHelper(data, pUpdate->mutations[mutationNum], pUpdate->version));
-                }
                 mutationBytes += pUpdate->mutations[mutationNum].totalSize();
                 injectedChanges = true;
                 if(mutationBytes > SERVER_KNOBS->DESIRED_UPDATE_BYTES) {
@@ -2793,9 +2737,6 @@ ACTOR Future<Void> update( StorageServer* data, bool* pReceivedUpdate )
                         ++data->counters.atomicMutations;
                         break;
                 }
-                if (msg.type == MutationRef::Exec) {
-                    wait(snapHelper(data, msg, ver));
-                }
             }
             else
                 TraceEvent(SevError, "DiscardingPeekedData", data->thisServerID).detail("Mutation", msg.toString()).detail("Version", cloneCursor2->version().toString());
@@ -659,7 +659,7 @@ ACTOR Future<Void> workerSnapCreate(WorkerSnapRequest snapReq, StringRef snapFol
     state ExecCmdValueString snapArg(snapReq.snapPayload);
     try {
         Standalone<StringRef> role = LiteralStringRef("role=").withSuffix(snapReq.role);
-        int err = wait(execHelper(&snapArg, snapFolder.toString(), role.toString(), 2 /* version */));
+        int err = wait(execHelper(&snapArg, snapFolder.toString(), role.toString()));
         std::string uidStr = snapReq.snapUID.toString();
         TraceEvent("ExecTraceWorker")
             .detail("Uid", uidStr)
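Not part of the diff: a conceptual, standalone C++ sketch of what a snapshot-helper invocation looks like at its simplest — run an external binary with a UID and role argument and report its exit status. This is not FDB's execHelper (which parses the colon-separated snapPayload and runs asynchronously); the binary path and argument names here are invented, and a real implementation must whitelist and quote the command.

#include <cstdlib>
#include <iostream>
#include <string>

int runSnapshotHelper(const std::string& binPath, const std::string& uid, const std::string& role) {
    // Toy, quote-free command line for illustration only.
    std::string cmd = binPath + " --uid " + uid + " --role " + role;
    int rc = std::system(cmd.c_str());
    return rc == 0 ? 0 : -1;
}

int main() {
    int err = runSnapshotHelper("/bin/true", "0123456789abcdef", "storage");
    std::cout << "snapshot helper status: " << err << "\n";
}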
@@ -1228,25 +1228,6 @@ ACTOR Future<Void> workerServer(
                 systemMonitor();
                 loggingTrigger = delay( loggingDelay, TaskPriority::FlushTrace );
             }
-            when(state ExecuteRequest req = waitNext(interf.execReq.getFuture())) {
-                state ExecCmdValueString execArg(req.execPayload);
-                try {
-                    int err = wait(execHelper(&execArg, coordFolder, "role=coordinator", 1 /*version*/));
-                    StringRef uidStr = execArg.getBinaryArgValue(LiteralStringRef("uid"));
-                    auto tokenStr = "ExecTrace/Coordinators/" + uidStr.toString();
-                    auto te = TraceEvent("ExecTraceCoordinators");
-                    te.detail("Uid", uidStr.toString());
-                    te.detail("Status", err);
-                    te.detail("Role", "coordinator");
-                    te.detail("Value", coordFolder);
-                    te.detail("ExecPayload", execArg.getCmdValueString().toString());
-                    te.trackLatest(tokenStr.c_str());
-                    req.reply.send(Void());
-                } catch (Error& e) {
-                    TraceEvent("ExecHelperError").error(e);
-                    req.reply.sendError(broken_promise());
-                }
-            }
             when(state WorkerSnapRequest snapReq = waitNext(interf.workerSnapReq.getFuture())) {
                 Standalone<StringRef> snapFolder = StringRef(folder);
                 if (snapReq.role.toString() == "coord") {
@@ -82,7 +82,6 @@ public: // variables
     std::string restartInfoLocation; // file location to store the snap restore info
     int maxRetryCntToRetrieveMessage; // number of retires to do trackLatest
     bool skipCheck; // disable check if the exec fails
-    int snapVersion; // snapVersion to invoke
 
 public: // ctor & dtor
     SnapTestWorkload(WorkloadContext const& wcx)
@@ -98,7 +97,6 @@ public: // ctor & dtor
             getOption(options, LiteralStringRef("restartInfoLocation"), LiteralStringRef("simfdb/restartInfo.ini"))
                 .toString();
         skipCheck = false;
-        snapVersion = getOption(options, LiteralStringRef("version"), 1);
     }
 
 public: // workload functions
|
||||||
|
|
||||||
void getMetrics(vector<PerfMetric>& m) override { TraceEvent("SnapTestWorkloadGetMetrics"); }
|
void getMetrics(vector<PerfMetric>& m) override { TraceEvent("SnapTestWorkloadGetMetrics"); }
|
||||||
|
|
||||||
ACTOR Future<Void> snapExecHelper(SnapTestWorkload* self, Database cx, StringRef keyRef, StringRef valueRef) {
|
|
||||||
state Transaction tr(cx);
|
|
||||||
state int retry = 0;
|
|
||||||
loop {
|
|
||||||
try {
|
|
||||||
tr.execute(keyRef, valueRef);
|
|
||||||
wait(tr.commit());
|
|
||||||
break;
|
|
||||||
} catch (Error& e) {
|
|
||||||
++retry;
|
|
||||||
if (e.code() == error_code_txn_exec_log_anti_quorum) {
|
|
||||||
self->skipCheck = true;
|
|
||||||
break;
|
|
||||||
|
|
||||||
}
|
|
||||||
if (e.code() == error_code_cluster_not_fully_recovered) {
|
|
||||||
TraceEvent(SevWarnAlways, "ClusterNotFullyRecovered")
|
|
||||||
.error(e);
|
|
||||||
if (retry > 10) {
|
|
||||||
self->skipCheck = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return Void();
|
|
||||||
}
|
|
||||||
|
|
||||||
ACTOR Future<Void> _create_keys(Database cx, std::string prefix, bool even = true) {
|
ACTOR Future<Void> _create_keys(Database cx, std::string prefix, bool even = true) {
|
||||||
state Transaction tr(cx);
|
state Transaction tr(cx);
|
||||||
state vector<int64_t> keys;
|
state vector<int64_t> keys;
|
||||||
|
@@ -237,7 +207,7 @@ public: // workload functions
                 self->snapUID = deterministicRandom()->randomUniqueID();
                 try {
                     StringRef snapCmdRef = LiteralStringRef("/bin/snap_create.sh");
-                    Future<Void> status = snapCreate(cx, snapCmdRef, self->snapUID, self->snapVersion);
+                    Future<Void> status = snapCreate(cx, snapCmdRef, self->snapUID);
                     wait(status);
                     break;
                 } catch (Error& e) {
@@ -245,20 +215,11 @@ public: // workload functions
                         snapFailed = true;
                         break;
                     }
-                    if (e.code() == error_code_cluster_not_fully_recovered) {
-                        ++retry;
-                        if (retry > 10) {
+                    ++retry;
+                    // snap v2 can fail for many reasons, so retry for 5 times and then fail it
+                    if (retry > 5) {
                         snapFailed = true;
                         break;
-                        }
-                    }
-                    if (self->snapVersion == 2) {
-                        ++retry;
-                        // snap v2 can fail for many reasons, so retry for 5 times and then fail it
-                        if (retry > 5) {
-                            snapFailed = true;
-                            break;
-                        }
                     }
                 }
             }
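Not part of the diff: a minimal standalone sketch of the bounded-retry shape used above — retry a snapshot request a fixed number of times, then record the attempt as failed. The failing operation below is a stub invented for the example.

#include <iostream>

bool trySnapshotOnce(int attempt) { return attempt >= 3; } // stub: succeeds on the fourth try

int main() {
    bool snapFailed = false;
    int retry = 0;
    while (true) {
        if (trySnapshotOnce(retry)) break; // success
        ++retry;
        if (retry > 5) { snapFailed = true; break; } // give up after five retries
    }
    std::cout << (snapFailed ? "snapshot failed" : "snapshot succeeded") << "\n";
}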
@@ -268,7 +229,6 @@ public: // workload functions
             std::string uidStr = self->snapUID.toString();
             ini.SetValue("RESTORE", "RestoreSnapUID", uidStr.c_str());
             ini.SetValue("RESTORE", "BackupFailed", format("%d", snapFailed).c_str());
-            ini.SetValue("RESTORE", "BackupVersion", format("%d", self->snapVersion).c_str());
             ini.SaveFile(self->restartInfoLocation.c_str());
             // write the snapUID to a file
             TraceEvent("SnapshotCreateStatus").detail("Status", !snapFailed ? "Success" : "Failure");
@@ -326,32 +286,6 @@ public: // workload functions
                 throw operation_failed();
             }
         } else if (self->testID == 4) {
-            // description: if disable of a TLog pop was not followed by a
-            // corresponding enable, then TLog will automatically enable the
-            // popping of TLogs. this test case validates that we auto
-            // enable the popping of TLogs
-            state Standalone<StringRef> payLoadRef = LiteralStringRef("empty-binary:uid=a36b2ca0e8dab0452ac3e12b6b926f4b");
-            wait(self->snapExecHelper(self, cx, execDisableTLogPop, payLoadRef));
-        } else if (self->testID == 5) {
-            // snapshot create without disabling pop of the TLog
-            StringRef uidStr = LiteralStringRef("d78b08d47f341158e9a54d4baaf4a4dd");
-            self->snapUID = UID::fromString(uidStr.toString());
-            state Standalone<StringRef> snapPayload = LiteralStringRef("/bin/"
-                "snap_create.sh:uid=").withSuffix(uidStr);
-            wait(self->snapExecHelper(self, cx, execSnap, snapPayload));
-        } else if (self->testID == 6) {
-            // disable popping of TLog and snapshot create with mis-matching
-            payLoadRef = LiteralStringRef("empty-binary:uid=f49d27ddf7a28b6549d930743e0ebdbe");
-            wait(self->snapExecHelper(self, cx, execDisableTLogPop, payLoadRef));
-            if (self->skipCheck) {
-                return Void();
-            }
-
-            StringRef uidStr = LiteralStringRef("ba61e9612a561d60bd83ad83e1b63568");
-            self->snapUID = UID::fromString(uidStr.toString());
-            snapPayload = LiteralStringRef("/bin/snap_create.sh:uid=").withSuffix(uidStr);
-            wait(self->snapExecHelper(self, cx, execSnap, snapPayload));
-        } else if (self->testID == 7) {
             // create a snapshot with a non whitelisted binary path and operation
             // should fail
             state bool testedFailure = false;
@@ -360,7 +294,7 @@ public: // workload functions
                 self->snapUID = deterministicRandom()->randomUniqueID();
                 try {
                     StringRef snapCmdRef = LiteralStringRef("/bin/snap_create1.sh");
-                    Future<Void> status = snapCreate(cx, snapCmdRef, self->snapUID, self->snapVersion);
+                    Future<Void> status = snapCreate(cx, snapCmdRef, self->snapUID);
                     wait(status);
                     break;
                 } catch (Error& e) {
@@ -123,7 +123,9 @@ class Void {
 public:
     constexpr static FileIdentifier file_identifier = 2010442;
     template <class Ar>
-    void serialize(Ar&) {}
+    void serialize(Ar& ar) {
+        serializer(ar);
+    }
 };
 
 class Never {};
@@ -114,7 +114,6 @@ add_fdb_test(TEST_FILES fast/RandomUnitTests.txt)
 add_fdb_test(TEST_FILES fast/SelectorCorrectness.txt)
 add_fdb_test(TEST_FILES fast/Sideband.txt)
 add_fdb_test(TEST_FILES fast/SidebandWithStatus.txt)
-add_fdb_test(TEST_FILES fast/SnapTestFailAndDisablePop.txt)
 add_fdb_test(TEST_FILES fast/SwizzledRollbackSideband.txt)
 add_fdb_test(TEST_FILES fast/SystemRebootTestCycle.txt)
 add_fdb_test(TEST_FILES fast/TaskBucketCorrectness.txt)
@@ -1,28 +0,0 @@
-; verify that the TLog popping disable times out and switches to enable mode
-; automatically, if not enabled specifically
-testTitle=SnapTLogPopDisableTimeout
-    testName=SnapTest
-    numSnaps=1
-    maxSnapDelay=3.0
-    testID=4
-
-; snapCreate without TLogPopDisable
-testTitle=SnapCreateWithNoDisablePop
-    testName=SnapTest
-    numSnaps=1
-    maxSnapDelay=3.0
-    testID=5
-
-; snapCreate and tlogPopDisable with mis-matched UID
-testTitle=SnapCreateDisableTLogPopMismatch
-    testName=SnapTest
-    numSnaps=1
-    maxSnapDelay=3.0
-    testID=6
-
-; snapCreate with binary path that is not whitelisted
-testTitle=SnapCreateNotWhitelistedBinaryPath
-    testName=SnapTest
-    numSnaps=1
-    maxSnapDelay=3.0
-    testID=7
@@ -12,7 +12,6 @@ testTitle=SnapCyclePre
     maxSnapDelay=10.0
     testID=1
     clearAfterTest=false
-    version=2
 
 testTitle=SnapCycleShutdown
 ;save and shutdown
@@ -25,7 +25,6 @@ testTitle=SnapTestTakeSnap
     maxSnapDelay=10.0
     testID=1
     clearAfterTest=false
-    version=2
 
     testName=Attrition
     testDuration=10.0
@@ -25,7 +25,6 @@ testTitle=SnapTestTakeSnap
     maxSnapDelay=10.0
     testID=1
     clearAfterTest=false
-    version=2
 
 testTitle=SnapTestPost
 ;write 1000 Keys ending with odd numbers
@@ -13,7 +13,6 @@ testTitle=SnapSimpleTakeSnap
     maxSnapDelay=5.0
     testID=1
     clearAfterTest=false
-    version=2
 
 ;write 1000 Keys ending with odd number
 testTitle=SnapSimplePost
@@ -23,7 +22,14 @@ testTitle=SnapSimplePost
     testID=2
     clearAfterTest=false
 
-; save and shutdown
+;snapCreate with binary path that is not whitelisted
+testTitle=SnapCreateNotWhitelistedBinaryPath
+    testName=SnapTest
+    numSnaps=1
+    maxSnapDelay=3.0
+    testID=4
+
+;save and shutdown
 testTitle=SnapSimpleShutdown
     testName=SaveAndKill
     restartInfoLocation=simfdb/restartInfo.ini