Merge remote-tracking branch 'origin/main' into features/validate-trace-events-in-simulation

This commit is contained in:
Markus Pilman 2022-04-25 08:39:56 -06:00
commit cbe4a873d2
57 changed files with 984 additions and 529 deletions

View File

@ -44,8 +44,6 @@
#include "fdbclient/Tuple.h"
#include "flow/config.h"
#include "flow/DeterministicRandom.h"
#include "flow/IRandom.h"
#include "fdb_api.hpp"
@ -2023,17 +2021,15 @@ TEST_CASE("fdb_transaction_add_conflict_range") {
TEST_CASE("special-key-space valid transaction ID") {
auto value = get_value("\xff\xff/tracing/transaction_id", /* snapshot */ false, {});
REQUIRE(value.has_value());
UID transaction_id = UID::fromString(value.value());
CHECK(transaction_id.first() > 0);
CHECK(transaction_id.second() > 0);
uint64_t transaction_id = std::stoul(value.value());
CHECK(transaction_id > 0);
}
TEST_CASE("special-key-space custom transaction ID") {
fdb::Transaction tr(db);
fdb_check(tr.set_option(FDB_TR_OPTION_SPECIAL_KEY_SPACE_ENABLE_WRITES, nullptr, 0));
while (1) {
UID randomTransactionID = UID(deterministicRandom()->randomUInt64(), deterministicRandom()->randomUInt64());
tr.set("\xff\xff/tracing/transaction_id", randomTransactionID.toString());
tr.set("\xff\xff/tracing/transaction_id", std::to_string(ULONG_MAX));
fdb::ValueFuture f1 = tr.get("\xff\xff/tracing/transaction_id",
/* snapshot */ false);
@ -2050,8 +2046,8 @@ TEST_CASE("special-key-space custom transaction ID") {
fdb_check(f1.get(&out_present, (const uint8_t**)&val, &vallen));
REQUIRE(out_present);
UID transaction_id = UID::fromString(val);
CHECK(transaction_id == randomTransactionID);
uint64_t transaction_id = std::stoul(std::string(val, vallen));
CHECK(transaction_id == ULONG_MAX);
break;
}
}
@ -2078,9 +2074,8 @@ TEST_CASE("special-key-space set transaction ID after write") {
fdb_check(f1.get(&out_present, (const uint8_t**)&val, &vallen));
REQUIRE(out_present);
UID transaction_id = UID::fromString(val);
CHECK(transaction_id.first() > 0);
CHECK(transaction_id.second() > 0);
uint64_t transaction_id = std::stoul(std::string(val, vallen));
CHECK(transaction_id != 0);
break;
}
}
@ -2145,9 +2140,7 @@ TEST_CASE("special-key-space tracing get range") {
CHECK(out_count == 2);
CHECK(std::string((char*)out_kv[1].key, out_kv[1].key_length) == tracingBegin + "transaction_id");
UID transaction_id = UID::fromString(std::string((char*)out_kv[1].value));
CHECK(transaction_id.first() > 0);
CHECK(transaction_id.second() > 0);
CHECK(std::stoul(std::string((char*)out_kv[1].value, out_kv[1].value_length)) > 0);
break;
}
}

View File

@ -162,7 +162,7 @@ struct CommitTransactionRequest : TimedRequest {
bool firstInBatch() const { return (flags & FLAG_FIRST_IN_BATCH) != 0; }
Arena arena;
SpanContext spanContext;
SpanID spanContext;
CommitTransactionRef transaction;
ReplyPromise<CommitID> reply;
uint32_t flags;
@ -172,8 +172,8 @@ struct CommitTransactionRequest : TimedRequest {
TenantInfo tenantInfo;
CommitTransactionRequest() : CommitTransactionRequest(SpanContext()) {}
CommitTransactionRequest(SpanContext const& context) : spanContext(context), flags(0) {}
CommitTransactionRequest() : CommitTransactionRequest(SpanID()) {}
CommitTransactionRequest(SpanID const& context) : spanContext(context), flags(0) {}
template <class Ar>
void serialize(Ar& ar) {
@ -242,7 +242,7 @@ struct GetReadVersionRequest : TimedRequest {
FLAG_PRIORITY_MASK = PRIORITY_SYSTEM_IMMEDIATE,
};
SpanContext spanContext;
SpanID spanContext;
uint32_t transactionCount;
uint32_t flags;
TransactionPriority priority;
@ -255,7 +255,7 @@ struct GetReadVersionRequest : TimedRequest {
Version maxVersion; // max version in the client's version vector cache
GetReadVersionRequest() : transactionCount(1), flags(0), maxVersion(invalidVersion) {}
GetReadVersionRequest(SpanContext spanContext,
GetReadVersionRequest(SpanID spanContext,
uint32_t transactionCount,
TransactionPriority priority,
Version maxVersion,
@ -325,7 +325,7 @@ struct GetKeyServerLocationsReply {
struct GetKeyServerLocationsRequest {
constexpr static FileIdentifier file_identifier = 9144680;
Arena arena;
SpanContext spanContext;
SpanID spanContext;
Optional<TenantNameRef> tenant;
KeyRef begin;
Optional<KeyRef> end;
@ -340,7 +340,7 @@ struct GetKeyServerLocationsRequest {
Version minTenantVersion;
GetKeyServerLocationsRequest() : limit(0), reverse(false), minTenantVersion(latestVersion) {}
GetKeyServerLocationsRequest(SpanContext spanContext,
GetKeyServerLocationsRequest(SpanID spanContext,
Optional<TenantNameRef> const& tenant,
KeyRef const& begin,
Optional<KeyRef> const& end,
@ -378,12 +378,12 @@ struct GetRawCommittedVersionReply {
struct GetRawCommittedVersionRequest {
constexpr static FileIdentifier file_identifier = 12954034;
SpanContext spanContext;
SpanID spanContext;
Optional<UID> debugID;
ReplyPromise<GetRawCommittedVersionReply> reply;
Version maxVersion; // max version in the grv proxy's version vector cache
explicit GetRawCommittedVersionRequest(SpanContext spanContext,
explicit GetRawCommittedVersionRequest(SpanID spanContext,
Optional<UID> const& debugID = Optional<UID>(),
Version maxVersion = invalidVersion)
: spanContext(spanContext), debugID(debugID), maxVersion(maxVersion) {}

View File

@ -24,7 +24,6 @@
#include "fdbclient/FDBTypes.h"
#include "fdbclient/Knobs.h"
#include "flow/Tracing.h"
// The versioned message has wire format : -1, version, messages
static const int32_t VERSION_HEADER = -1;
@ -78,7 +77,6 @@ struct MutationRef {
AndV2,
CompareAndClear,
Reserved_For_SpanContextMessage /* See fdbserver/SpanContextMessage.h */,
Reserved_For_OTELSpanContextMessage,
MAX_ATOMIC_OP
};
// This is stored this way for serialization purposes.
@ -192,7 +190,7 @@ struct CommitTransactionRef {
Version read_snapshot = 0;
bool report_conflicting_keys = false;
bool lock_aware = false; // set when metadata mutations are present
Optional<SpanContext> spanContext;
Optional<SpanID> spanContext;
template <class Ar>
force_inline void serialize(Ar& ar) {

View File

@ -141,7 +141,7 @@ struct WatchParameters : public ReferenceCounted<WatchParameters> {
const Version version;
const TagSet tags;
const SpanContext spanContext;
const SpanID spanID;
const TaskPriority taskID;
const Optional<UID> debugID;
const UseProvisionalProxies useProvisionalProxies;
@ -151,11 +151,11 @@ struct WatchParameters : public ReferenceCounted<WatchParameters> {
Optional<Value> value,
Version version,
TagSet tags,
SpanContext spanContext,
SpanID spanID,
TaskPriority taskID,
Optional<UID> debugID,
UseProvisionalProxies useProvisionalProxies)
: tenant(tenant), key(key), value(value), version(version), tags(tags), spanContext(spanContext), taskID(taskID),
: tenant(tenant), key(key), value(value), version(version), tags(tags), spanID(spanID), taskID(taskID),
debugID(debugID), useProvisionalProxies(useProvisionalProxies) {}
};
@ -416,12 +416,12 @@ public:
Optional<TenantName> defaultTenant;
struct VersionRequest {
SpanContext spanContext;
SpanID spanContext;
Promise<GetReadVersionReply> reply;
TagSet tags;
Optional<UID> debugID;
VersionRequest(SpanContext spanContext, TagSet tags = TagSet(), Optional<UID> debugID = Optional<UID>())
VersionRequest(SpanID spanContext, TagSet tags = TagSet(), Optional<UID> debugID = Optional<UID>())
: spanContext(spanContext), tags(tags), debugID(debugID) {}
};

View File

@ -29,10 +29,30 @@
#include <unordered_set>
#include <boost/functional/hash.hpp>
#include "flow/Arena.h"
#include "flow/FastRef.h"
#include "flow/ProtocolVersion.h"
#include "flow/flow.h"
// Bit flags stored next to a trace/span identifier. `unsampled` is the
// all-zero value; `sampled` sets the lowest bit.
enum class TraceFlags : uint8_t { unsampled = 0b00000000, sampled = 0b00000001 };

// Bitwise AND of two TraceFlags values, computed on the enum's underlying
// integer type and converted back to TraceFlags.
//
// Declared constexpr (which implies inline) and noexcept so that flag masks
// can also be evaluated in constant expressions and static_asserts; runtime
// callers see the same result as before.
constexpr TraceFlags operator&(TraceFlags lhs, TraceFlags rhs) noexcept {
	using Underlying = std::underlying_type_t<TraceFlags>;
	return static_cast<TraceFlags>(static_cast<Underlying>(lhs) & static_cast<Underlying>(rhs));
}
struct SpanContext {
UID traceID;
uint64_t spanID;
TraceFlags m_Flags;
SpanContext() : traceID(UID()), spanID(0), m_Flags(TraceFlags::unsampled) {}
SpanContext(UID traceID, uint64_t spanID, TraceFlags flags) : traceID(traceID), spanID(spanID), m_Flags(flags) {}
SpanContext(UID traceID, uint64_t spanID) : traceID(traceID), spanID(spanID), m_Flags(TraceFlags::unsampled) {}
SpanContext(Arena arena, const SpanContext& span)
: traceID(span.traceID), spanID(span.spanID), m_Flags(span.m_Flags) {}
bool isSampled() const { return (m_Flags & TraceFlags::sampled) == TraceFlags::sampled; }
};
typedef int64_t Version;
typedef uint64_t LogEpoch;
typedef uint64_t Sequence;

View File

@ -27,7 +27,6 @@
#include "fdbclient/FDBTypes.h"
#include "fdbclient/Tenant.h"
#include "flow/Tracing.h"
#include "flow/ThreadHelper.actor.h"
struct VersionVector;
@ -97,11 +96,11 @@ public:
virtual ThreadFuture<Void> commit() = 0;
virtual Version getCommittedVersion() = 0;
// @todo This API and the "getSpanContext()" API may help with debugging simulation
// @todo This API and the "getSpanID()" API may help with debugging simulation
// test failures. (These APIs are not currently invoked anywhere.) Remove them
// later if they are not really needed.
virtual VersionVector getVersionVector() = 0;
virtual SpanContext getSpanContext() = 0;
virtual UID getSpanID() = 0;
virtual ThreadFuture<int64_t> getApproximateSize() = 0;
virtual void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) = 0;

View File

@ -45,7 +45,7 @@ public:
// Not implemented:
void setVersion(Version) override { throw client_invalid_operation(); }
VersionVector getVersionVector() const override { throw client_invalid_operation(); }
SpanContext getSpanContext() const override { throw client_invalid_operation(); }
UID getSpanID() const override { throw client_invalid_operation(); }
Future<Key> getKey(KeySelector const& key, Snapshot snapshot = Snapshot::False) override {
throw client_invalid_operation();
}

View File

@ -95,7 +95,7 @@ public:
virtual Future<Void> commit() = 0;
virtual Version getCommittedVersion() const = 0;
virtual VersionVector getVersionVector() const = 0;
virtual SpanContext getSpanContext() const = 0;
virtual UID getSpanID() const = 0;
virtual int64_t getApproximateSize() const = 0;
virtual Future<Standalone<StringRef>> getVersionstamp() = 0;
virtual void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) = 0;

View File

@ -1111,13 +1111,13 @@ VersionVector MultiVersionTransaction::getVersionVector() {
return VersionVector();
}
SpanContext MultiVersionTransaction::getSpanContext() {
UID MultiVersionTransaction::getSpanID() {
auto tr = getTransaction();
if (tr.transaction) {
return tr.transaction->getSpanContext();
return tr.transaction->getSpanID();
}
return SpanContext();
return UID();
}
ThreadFuture<int64_t> MultiVersionTransaction::getApproximateSize() {

View File

@ -378,7 +378,7 @@ public:
ThreadFuture<Void> commit() override;
Version getCommittedVersion() override;
VersionVector getVersionVector() override;
SpanContext getSpanContext() override { return SpanContext(); };
UID getSpanID() override { return UID(); };
ThreadFuture<int64_t> getApproximateSize() override;
void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
@ -567,7 +567,7 @@ public:
ThreadFuture<Void> commit() override;
Version getCommittedVersion() override;
VersionVector getVersionVector() override;
SpanContext getSpanContext() override;
UID getSpanID() override;
ThreadFuture<int64_t> getApproximateSize() override;
void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;

View File

@ -21,7 +21,6 @@
#include "fdbclient/NativeAPI.actor.h"
#include <algorithm>
#include <cstdio>
#include <iterator>
#include <regex>
#include <unordered_set>
@ -849,9 +848,7 @@ ACTOR Future<Void> assertFailure(GrvProxyInterface remote, Future<ErrorOr<GetRea
Future<Void> attemptGRVFromOldProxies(std::vector<GrvProxyInterface> oldProxies,
std::vector<GrvProxyInterface> newProxies) {
auto debugID = nondeterministicRandom()->randomUniqueID();
g_traceBatch.addEvent("AttemptGRVFromOldProxyDebug", debugID.first(), "NativeAPI.attemptGRVFromOldProxies.Start");
Span span("VerifyCausalReadRisky"_loc);
Span span(deterministicRandom()->randomUniqueID(), "VerifyCausalReadRisky"_loc);
std::vector<Future<Void>> replies;
replies.reserve(oldProxies.size());
GetReadVersionRequest req(
@ -2792,13 +2789,13 @@ void updateTagMappings(Database cx, const GetKeyServerLocationsReply& reply) {
ACTOR Future<KeyRangeLocationInfo> getKeyLocation_internal(Database cx,
Optional<TenantName> tenant,
Key key,
SpanContext spanContext,
SpanID spanID,
Optional<UID> debugID,
UseProvisionalProxies useProvisionalProxies,
Reverse isBackward,
Version version) {
state Span span("NAPI:getKeyLocation"_loc, spanContext);
state Span span("NAPI:getKeyLocation"_loc, spanID);
if (isBackward) {
ASSERT(key != allKeys.begin && key <= allKeys.end);
} else {
@ -2886,7 +2883,7 @@ Future<KeyRangeLocationInfo> getKeyLocation(Database const& cx,
Optional<TenantName> const& tenant,
Key const& key,
F StorageServerInterface::*member,
SpanContext spanContext,
SpanID spanID,
Optional<UID> debugID,
UseProvisionalProxies useProvisionalProxies,
Reverse isBackward,
@ -2894,8 +2891,7 @@ Future<KeyRangeLocationInfo> getKeyLocation(Database const& cx,
// we first check whether this range is cached
Optional<KeyRangeLocationInfo> locationInfo = cx->getCachedLocation(tenant, key, isBackward);
if (!locationInfo.present()) {
return getKeyLocation_internal(
cx, tenant, key, spanContext, debugID, useProvisionalProxies, isBackward, version);
return getKeyLocation_internal(cx, tenant, key, spanID, debugID, useProvisionalProxies, isBackward, version);
}
bool onlyEndpointFailedAndNeedRefresh = false;
@ -2909,8 +2905,7 @@ Future<KeyRangeLocationInfo> getKeyLocation(Database const& cx,
cx->invalidateCache(locationInfo.get().tenantEntry.prefix, key);
// Refresh the cache with a new getKeyLocations made to proxies.
return getKeyLocation_internal(
cx, tenant, key, spanContext, debugID, useProvisionalProxies, isBackward, version);
return getKeyLocation_internal(cx, tenant, key, spanID, debugID, useProvisionalProxies, isBackward, version);
}
return locationInfo.get();
@ -2927,7 +2922,7 @@ Future<KeyRangeLocationInfo> getKeyLocation(Reference<TransactionState> trState,
useTenant ? trState->tenant() : Optional<TenantName>(),
key,
member,
trState->spanContext,
trState->spanID,
trState->debugID,
trState->useProvisionalProxies,
isBackward,
@ -2949,11 +2944,11 @@ ACTOR Future<std::vector<KeyRangeLocationInfo>> getKeyRangeLocations_internal(
KeyRange keys,
int limit,
Reverse reverse,
SpanContext spanContext,
SpanID spanID,
Optional<UID> debugID,
UseProvisionalProxies useProvisionalProxies,
Version version) {
state Span span("NAPI:getKeyRangeLocations"_loc, spanContext);
state Span span("NAPI:getKeyRangeLocations"_loc, spanID);
if (debugID.present())
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.getKeyLocations.Before");
@ -3023,7 +3018,7 @@ Future<std::vector<KeyRangeLocationInfo>> getKeyRangeLocations(Database const& c
int limit,
Reverse reverse,
F StorageServerInterface::*member,
SpanContext const& spanContext,
SpanID const& spanID,
Optional<UID> const& debugID,
UseProvisionalProxies useProvisionalProxies,
Version version) {
@ -3033,7 +3028,7 @@ Future<std::vector<KeyRangeLocationInfo>> getKeyRangeLocations(Database const& c
std::vector<KeyRangeLocationInfo> locations;
if (!cx->getCachedLocations(tenant, keys, locations, limit, reverse)) {
return getKeyRangeLocations_internal(
cx, tenant, keys, limit, reverse, spanContext, debugID, useProvisionalProxies, version);
cx, tenant, keys, limit, reverse, spanID, debugID, useProvisionalProxies, version);
}
bool foundFailed = false;
@ -3054,7 +3049,7 @@ Future<std::vector<KeyRangeLocationInfo>> getKeyRangeLocations(Database const& c
if (foundFailed) {
// Refresh the cache with a new getKeyRangeLocations made to proxies.
return getKeyRangeLocations_internal(
cx, tenant, keys, limit, reverse, spanContext, debugID, useProvisionalProxies, version);
cx, tenant, keys, limit, reverse, spanID, debugID, useProvisionalProxies, version);
}
return locations;
@ -3074,7 +3069,7 @@ Future<std::vector<KeyRangeLocationInfo>> getKeyRangeLocations(Reference<Transac
limit,
reverse,
member,
trState->spanContext,
trState->spanID,
trState->debugID,
trState->useProvisionalProxies,
version);
@ -3103,7 +3098,7 @@ ACTOR Future<Void> warmRange_impl(Reference<TransactionState> trState, KeyRange
keys,
CLIENT_KNOBS->WARM_RANGE_SHARD_LIMIT,
Reverse::False,
trState->spanContext,
trState->spanID,
trState->debugID,
trState->useProvisionalProxies,
version));
@ -3134,35 +3129,38 @@ ACTOR Future<Void> warmRange_impl(Reference<TransactionState> trState, KeyRange
return Void();
}
SpanContext generateSpanID(bool transactionTracingSample, SpanContext parentContext = SpanContext()) {
SpanID generateSpanID(bool transactionTracingSample, SpanID parentContext = SpanID()) {
uint64_t txnId = deterministicRandom()->randomUInt64();
if (parentContext.isValid()) {
return SpanContext(parentContext.traceID, deterministicRandom()->randomUInt64(), parentContext.m_Flags);
if (parentContext.first() > 0) {
txnId = parentContext.first();
}
uint64_t tokenId = parentContext.second() > 0 ? deterministicRandom()->randomUInt64() : 0;
return SpanID(txnId, tokenId);
} else if (transactionTracingSample) {
uint64_t tokenId = deterministicRandom()->random01() <= FLOW_KNOBS->TRACING_SAMPLE_RATE
? deterministicRandom()->randomUInt64()
: 0;
return SpanID(txnId, tokenId);
} else {
return SpanID(txnId, 0);
}
if (transactionTracingSample) {
return SpanContext(deterministicRandom()->randomUniqueID(),
deterministicRandom()->randomUInt64(),
deterministicRandom()->random01() <= FLOW_KNOBS->TRACING_SAMPLE_RATE
? TraceFlags::sampled
: TraceFlags::unsampled);
}
return SpanContext(
deterministicRandom()->randomUniqueID(), deterministicRandom()->randomUInt64(), TraceFlags::unsampled);
}
TransactionState::TransactionState(Database cx,
Optional<TenantName> tenant,
TaskPriority taskID,
SpanContext spanContext,
SpanID spanID,
Reference<TransactionLogInfo> trLogInfo)
: cx(cx), trLogInfo(trLogInfo), options(cx), taskID(taskID), spanContext(spanContext),
readVersionObtainedFromGrvProxy(true), tenant_(tenant), tenantSet(tenant.present()) {}
: cx(cx), trLogInfo(trLogInfo), options(cx), taskID(taskID), spanID(spanID), readVersionObtainedFromGrvProxy(true),
tenant_(tenant), tenantSet(tenant.present()) {}
Reference<TransactionState> TransactionState::cloneAndReset(Reference<TransactionLogInfo> newTrLogInfo,
bool generateNewSpan) const {
SpanContext newSpanContext = generateNewSpan ? generateSpanID(cx->transactionTracingSample) : spanContext;
SpanID newSpanID = generateNewSpan ? generateSpanID(cx->transactionTracingSample) : spanID;
Reference<TransactionState> newState =
makeReference<TransactionState>(cx, tenant_, cx->taskID, newSpanContext, newTrLogInfo);
makeReference<TransactionState>(cx, tenant_, cx->taskID, newSpanID, newTrLogInfo);
if (!cx->apiVersionAtLeast(16)) {
newState->options = options;
@ -3220,12 +3218,12 @@ ACTOR Future<Optional<Value>> getValue(Reference<TransactionState> trState,
UseTenant useTenant,
TransactionRecordLogInfo recordLogInfo) {
state Version ver = wait(version);
state Span span("NAPI:getValue"_loc, trState->spanContext);
state Span span("NAPI:getValue"_loc, trState->spanID);
if (useTenant && trState->tenant().present()) {
span.addAttribute("tenant"_sr, trState->tenant().get());
span.addTag("tenant"_sr, trState->tenant().get());
}
span.addAttribute("key"_sr, key);
span.addTag("key"_sr, key);
trState->cx->validateVersion(ver);
loop {
@ -3351,7 +3349,7 @@ ACTOR Future<Key> getKey(Reference<TransactionState> trState,
wait(success(version));
state Optional<UID> getKeyID = Optional<UID>();
state Span span("NAPI:getKey"_loc, trState->spanContext);
state Span span("NAPI:getKey"_loc, trState->spanID);
if (trState->debugID.present()) {
getKeyID = nondeterministicRandom()->randomUniqueID();
@ -3450,8 +3448,8 @@ ACTOR Future<Key> getKey(Reference<TransactionState> trState,
}
}
ACTOR Future<Version> waitForCommittedVersion(Database cx, Version version, SpanContext spanContext) {
state Span span("NAPI:waitForCommittedVersion"_loc, spanContext);
ACTOR Future<Version> waitForCommittedVersion(Database cx, Version version, SpanID spanContext) {
state Span span("NAPI:waitForCommittedVersion"_loc, { spanContext });
try {
loop {
choose {
@ -3485,14 +3483,14 @@ ACTOR Future<Version> waitForCommittedVersion(Database cx, Version version, Span
}
ACTOR Future<Version> getRawVersion(Reference<TransactionState> trState) {
state Span span("NAPI:getRawVersion"_loc, { trState->spanContext });
state Span span("NAPI:getRawVersion"_loc, { trState->spanID });
loop {
choose {
when(wait(trState->cx->onProxiesChanged())) {}
when(GetReadVersionReply v =
wait(basicLoadBalance(trState->cx->getGrvProxies(UseProvisionalProxies::False),
&GrvProxyInterface::getConsistentReadVersion,
GetReadVersionRequest(trState->spanContext,
GetReadVersionRequest(trState->spanID,
0,
TransactionPriority::IMMEDIATE,
trState->cx->ssVersionVectorCache.getMaxVersion()),
@ -3514,7 +3512,7 @@ ACTOR Future<Void> readVersionBatcher(
uint32_t flags);
ACTOR Future<Version> watchValue(Database cx, Reference<const WatchParameters> parameters) {
state Span span("NAPI:watchValue"_loc, parameters->spanContext);
state Span span("NAPI:watchValue"_loc, parameters->spanID);
state Version ver = parameters->version;
cx->validateVersion(parameters->version);
ASSERT(parameters->version != latestVersion);
@ -3524,7 +3522,7 @@ ACTOR Future<Version> watchValue(Database cx, Reference<const WatchParameters> p
parameters->tenant.name,
parameters->key,
&StorageServerInterface::watchValue,
parameters->spanContext,
parameters->spanID,
parameters->debugID,
parameters->useProvisionalProxies,
Reverse::False,
@ -3743,15 +3741,15 @@ ACTOR Future<Void> watchValueMap(Future<Version> version,
Optional<Value> value,
Database cx,
TagSet tags,
SpanContext spanContext,
SpanID spanID,
TaskPriority taskID,
Optional<UID> debugID,
UseProvisionalProxies useProvisionalProxies) {
state Version ver = wait(version);
wait(getWatchFuture(cx,
makeReference<WatchParameters>(
tenant, key, value, ver, tags, spanContext, taskID, debugID, useProvisionalProxies)));
wait(getWatchFuture(
cx,
makeReference<WatchParameters>(tenant, key, value, ver, tags, spanID, taskID, debugID, useProvisionalProxies)));
return Void();
}
@ -3797,11 +3795,10 @@ Future<RangeResultFamily> getExactRange(Reference<TransactionState> trState,
Reverse reverse,
UseTenant useTenant) {
state RangeResultFamily output;
// TODO - ljoswiak parent or link?
state Span span("NAPI:getExactRange"_loc, trState->spanContext);
state Span span("NAPI:getExactRange"_loc, trState->spanID);
if (useTenant && trState->tenant().present()) {
span.addAttribute("tenant"_sr, trState->tenant().get());
span.addTag("tenant"_sr, trState->tenant().get());
}
// printf("getExactRange( '%s', '%s' )\n", keys.begin.toString().c_str(), keys.end.toString().c_str());
@ -4158,9 +4155,9 @@ Future<RangeResultFamily> getRange(Reference<TransactionState> trState,
state KeySelector originalBegin = begin;
state KeySelector originalEnd = end;
state RangeResultFamily output;
state Span span("NAPI:getRange"_loc, trState->spanContext);
state Span span("NAPI:getRange"_loc, trState->spanID);
if (useTenant && trState->tenant().present()) {
span.addAttribute("tenant"_sr, trState->tenant().get());
span.addTag("tenant"_sr, trState->tenant().get());
}
try {
@ -4634,7 +4631,7 @@ ACTOR Future<Void> getRangeStreamFragment(Reference<TransactionState> trState,
GetRangeLimits limits,
Snapshot snapshot,
Reverse reverse,
SpanContext spanContext) {
SpanID spanContext) {
loop {
state std::vector<KeyRangeLocationInfo> locations =
wait(getKeyRangeLocations(trState,
@ -4927,7 +4924,7 @@ ACTOR Future<Void> getRangeStream(Reference<TransactionState> trState,
// FIXME: better handling to disable row limits
ASSERT(!limits.hasRowLimit());
state Span span("NAPI:getRangeStream"_loc, trState->spanContext);
state Span span("NAPI:getRangeStream"_loc, trState->spanID);
state Version version = wait(fVersion);
trState->cx->validateVersion(version);
@ -5050,7 +5047,7 @@ Transaction::Transaction(Database const& cx, Optional<TenantName> const& tenant)
cx->taskID,
generateSpanID(cx->transactionTracingSample),
createTrLogInfoProbabilistically(cx))),
span(trState->spanContext, "Transaction"_loc), backoff(CLIENT_KNOBS->DEFAULT_BACKOFF), tr(trState->spanContext) {
span(trState->spanID, "Transaction"_loc), backoff(CLIENT_KNOBS->DEFAULT_BACKOFF), tr(trState->spanID) {
if (DatabaseContext::debugUseTags) {
debugAddTags(trState);
}
@ -5185,7 +5182,7 @@ ACTOR Future<Void> watch(Reference<Watch> watch,
Database cx,
Future<TenantInfo> tenant,
TagSet tags,
SpanContext spanContext,
SpanID spanID,
TaskPriority taskID,
Optional<UID> debugID,
UseProvisionalProxies useProvisionalProxies) {
@ -5213,7 +5210,7 @@ ACTOR Future<Void> watch(Reference<Watch> watch,
watch->value,
cx,
tags,
spanContext,
spanID,
taskID,
debugID,
useProvisionalProxies);
@ -5246,7 +5243,7 @@ Future<Void> Transaction::watch(Reference<Watch> watch) {
populateAndGetTenant(
trState, watch->key, readVersion.isValid() && readVersion.isReady() ? readVersion.get() : latestVersion),
trState->options.readTags,
trState->spanContext,
trState->spanID,
trState->taskID,
trState->debugID,
trState->useProvisionalProxies);
@ -5734,7 +5731,7 @@ void TransactionOptions::reset(Database const& cx) {
void Transaction::resetImpl(bool generateNewSpan) {
flushTrLogsIfEnabled();
trState = trState->cloneAndReset(createTrLogInfoProbabilistically(trState->cx), generateNewSpan);
tr = CommitTransactionRequest(trState->spanContext);
tr = CommitTransactionRequest(trState->spanID);
readVersion = Future<Version>();
metadataVersion = Promise<Optional<Key>>();
extraConflictRanges.clear();
@ -5749,7 +5746,7 @@ void Transaction::reset() {
void Transaction::fullReset() {
resetImpl(true);
span = Span(trState->spanContext, "Transaction"_loc);
span = Span(trState->spanID, "Transaction"_loc);
backoff = CLIENT_KNOBS->DEFAULT_BACKOFF;
}
@ -5870,7 +5867,8 @@ ACTOR void checkWrites(Reference<TransactionState> trState,
ACTOR static Future<Void> commitDummyTransaction(Reference<TransactionState> trState, KeyRange range) {
state Transaction tr(trState->cx);
state int retries = 0;
state Span span(trState->spanContext, "NAPI:dummyTransaction"_loc, span.context);
state Span span("NAPI:dummyTransaction"_loc, trState->spanID);
tr.span.addParent(span.context);
loop {
try {
TraceEvent("CommitDummyTransaction").detail("Key", range.begin).detail("Retries", retries);
@ -5913,7 +5911,7 @@ void Transaction::setupWatches() {
watches[i]->value,
trState->cx,
trState->options.readTags,
trState->spanContext,
trState->spanID,
trState->taskID,
trState->debugID,
trState->useProvisionalProxies));
@ -6036,7 +6034,7 @@ ACTOR static Future<Void> tryCommit(Reference<TransactionState> trState,
Future<Version> readVersion) {
state TraceInterval interval("TransactionCommit");
state double startTime = now();
state Span span("NAPI:tryCommit"_loc, trState->spanContext);
state Span span("NAPI:tryCommit"_loc, trState->spanID);
state Optional<UID> debugID = trState->debugID;
if (debugID.present()) {
TraceEvent(interval.begin()).detail("Parent", debugID.get());
@ -6526,11 +6524,10 @@ void Transaction::setOption(FDBTransactionOptions::Option option, Optional<Strin
case FDBTransactionOptions::SPAN_PARENT:
validateOptionValuePresent(value);
if (value.get().size() != 33) {
if (value.get().size() != 16) {
throw invalid_option_value();
}
TEST(true); // Adding link in FDBTransactionOptions::SPAN_PARENT
span.setParent(BinaryReader::fromStringRef<SpanContext>(value.get(), IncludeVersion()));
span.addParent(BinaryReader::fromStringRef<UID>(value.get(), Unversioned()));
break;
case FDBTransactionOptions::REPORT_CONFLICTING_KEYS:
@ -6573,7 +6570,7 @@ void Transaction::setOption(FDBTransactionOptions::Option option, Optional<Strin
}
}
ACTOR Future<GetReadVersionReply> getConsistentReadVersion(SpanContext parentSpan,
ACTOR Future<GetReadVersionReply> getConsistentReadVersion(SpanID parentSpan,
DatabaseContext* cx,
uint32_t transactionCount,
TransactionPriority priority,
@ -6688,7 +6685,7 @@ ACTOR Future<Void> readVersionBatcher(DatabaseContext* cx,
}
g_traceBatch.addAttach("TransactionAttachID", req.debugID.get().first(), debugID.get().first());
}
span.addLink(req.spanContext);
span.addParent(req.spanContext);
requests.push_back(req.reply);
for (auto tag : req.tags) {
++tags[tag];
@ -6744,10 +6741,10 @@ ACTOR Future<Void> readVersionBatcher(DatabaseContext* cx,
ACTOR Future<Version> extractReadVersion(Reference<TransactionState> trState,
Location location,
SpanContext spanContext,
SpanID spanContext,
Future<GetReadVersionReply> f,
Promise<Optional<Value>> metadataVersion) {
state Span span(spanContext, location, trState->spanContext);
state Span span(spanContext, location, { trState->spanID });
GetReadVersionReply rep = wait(f);
double replyTime = now();
double latency = replyTime - trState->startTime;
@ -6920,7 +6917,7 @@ Future<Version> Transaction::getReadVersion(uint32_t flags) {
}
Location location = "NAPI:getReadVersion"_loc;
SpanContext spanContext = generateSpanID(trState->cx->transactionTracingSample, trState->spanContext);
UID spanContext = generateSpanID(trState->cx->transactionTracingSample, trState->spanID);
auto const req = DatabaseContext::VersionRequest(spanContext, trState->options.tags, trState->debugID);
batcher.stream.send(req);
trState->startTime = now();
@ -7396,7 +7393,7 @@ ACTOR Future<Standalone<VectorRef<KeyRef>>> getRangeSplitPoints(Reference<Transa
KeyRange keys,
int64_t chunkSize,
Version version) {
state Span span("NAPI:GetRangeSplitPoints"_loc, trState->spanContext);
state Span span("NAPI:GetRangeSplitPoints"_loc, trState->spanID);
loop {
state std::vector<KeyRangeLocationInfo> locations =
@ -7960,14 +7957,14 @@ Reference<TransactionLogInfo> Transaction::createTrLogInfoProbabilistically(cons
return Reference<TransactionLogInfo>();
}
void Transaction::setTransactionID(UID id) {
void Transaction::setTransactionID(uint64_t id) {
ASSERT(getSize() == 0);
trState->spanContext = SpanContext(id, trState->spanContext.spanID);
trState->spanID = SpanID(id, trState->spanID.second());
}
void Transaction::setToken(uint64_t token) {
ASSERT(getSize() == 0);
trState->spanContext = SpanContext(trState->spanContext.traceID, token);
trState->spanID = SpanID(trState->spanID.first(), token);
}
void enableClientInfoLogging() {

View File

@ -243,7 +243,7 @@ struct TransactionState : ReferenceCounted<TransactionState> {
Optional<UID> debugID;
TaskPriority taskID;
SpanContext spanContext;
SpanID spanID;
UseProvisionalProxies useProvisionalProxies = UseProvisionalProxies::False;
bool readVersionObtainedFromGrvProxy;
@ -259,14 +259,13 @@ struct TransactionState : ReferenceCounted<TransactionState> {
std::shared_ptr<CoalescedKeyRangeMap<Value>> conflictingKeys;
// Only available so that Transaction can have a default constructor, for use in state variables
TransactionState(TaskPriority taskID, SpanContext spanContext)
: taskID(taskID), spanContext(spanContext), tenantSet(false) {}
TransactionState(TaskPriority taskID, SpanID spanID) : taskID(taskID), spanID(spanID), tenantSet(false) {}
// VERSION_VECTOR changed default values of readVersionObtainedFromGrvProxy
TransactionState(Database cx,
Optional<TenantName> tenant,
TaskPriority taskID,
SpanContext spanContext,
SpanID spanID,
Reference<TransactionLogInfo> trLogInfo);
Reference<TransactionState> cloneAndReset(Reference<TransactionLogInfo> newTrLogInfo, bool generateNewSpan) const;
@ -436,7 +435,7 @@ public:
void debugTransaction(UID dID) { trState->debugID = dID; }
VersionVector getVersionVector() const;
SpanContext getSpanContext() const { return trState->spanContext; }
UID getSpanID() const { return trState->spanID; }
Future<Void> commitMutations();
void setupWatches();
@ -448,7 +447,7 @@ public:
Database getDatabase() const { return trState->cx; }
static Reference<TransactionLogInfo> createTrLogInfoProbabilistically(const Database& cx);
void setTransactionID(UID id);
void setTransactionID(uint64_t id);
void setToken(uint64_t token);
const std::vector<Future<std::pair<Key, Key>>>& getExtraReadConflictRanges() const { return extraConflictRanges; }
@ -491,7 +490,7 @@ private:
Future<Void> committing;
};
ACTOR Future<Version> waitForCommittedVersion(Database cx, Version version, SpanContext spanContext);
ACTOR Future<Version> waitForCommittedVersion(Database cx, Version version, SpanID spanContext);
ACTOR Future<Standalone<VectorRef<DDMetricsRef>>> waitDataDistributionMetricsList(Database cx,
KeyRange keys,
int shardLimit);

View File

@ -1982,7 +1982,7 @@ void ReadYourWritesTransaction::getWriteConflicts(KeyRangeMap<bool>* result) {
}
}
void ReadYourWritesTransaction::setTransactionID(UID id) {
void ReadYourWritesTransaction::setTransactionID(uint64_t id) {
tr.setTransactionID(id);
}

View File

@ -140,7 +140,7 @@ public:
[[nodiscard]] Future<Void> commit() override;
Version getCommittedVersion() const override { return tr.getCommittedVersion(); }
VersionVector getVersionVector() const override { return tr.getVersionVector(); }
SpanContext getSpanContext() const override { return tr.getSpanContext(); }
UID getSpanID() const override { return tr.getSpanID(); }
int64_t getApproximateSize() const override { return approximateSize; }
[[nodiscard]] Future<Standalone<StringRef>> getVersionstamp() override;
@ -177,7 +177,7 @@ public:
Reference<const TransactionState> getTransactionState() const { return tr.trState; }
void setTransactionID(UID id);
void setTransactionID(uint64_t id);
void setToken(uint64_t token);
// Read from the special key space readConflictRangeKeysRange

View File

@ -695,6 +695,8 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
},
"cluster_controller_timestamp":1415650089,
"protocol_version":"fdb00a400050001",
"newest_protocol_version":"fdb00a500040001",
"lowest_compatible_protocol_version":"fdb00a500040001",
"connection_string":"a:a@127.0.0.1:4000",
"full_replication":true,
"maintenance_zone":"0ccb4e0fdbdb5583010f6b77d9d10ece",

View File

@ -1595,10 +1595,10 @@ Future<RangeResult> TracingOptionsImpl::getRange(ReadYourWritesTransaction* ryw,
if (key.endsWith(kTracingTransactionIdKey)) {
result.push_back_deep(result.arena(),
KeyValueRef(key, ryw->getTransactionState()->spanContext.traceID.toString()));
KeyValueRef(key, std::to_string(ryw->getTransactionState()->spanID.first())));
} else if (key.endsWith(kTracingTokenKey)) {
result.push_back_deep(result.arena(),
KeyValueRef(key, std::to_string(ryw->getTransactionState()->spanContext.spanID)));
KeyValueRef(key, std::to_string(ryw->getTransactionState()->spanID.second())));
}
}
return result;
@ -1612,7 +1612,7 @@ void TracingOptionsImpl::set(ReadYourWritesTransaction* ryw, const KeyRef& key,
}
if (key.endsWith(kTracingTransactionIdKey)) {
ryw->setTransactionID(UID::fromString(value.toString()));
ryw->setTransactionID(std::stoul(value.toString()));
} else if (key.endsWith(kTracingTokenKey)) {
if (value.toString() == "true") {
ryw->setToken(deterministicRandom()->randomUInt64());

View File

@ -35,7 +35,6 @@
#include "fdbclient/CommitTransaction.h"
#include "fdbclient/TagThrottle.actor.h"
#include "fdbclient/Tenant.h"
#include "flow/Tracing.h"
#include "flow/UnitTest.h"
#include "fdbclient/VersionVector.h"
@ -272,7 +271,7 @@ struct GetValueReply : public LoadBalancedReply {
struct GetValueRequest : TimedRequest {
constexpr static FileIdentifier file_identifier = 8454530;
SpanContext spanContext;
SpanID spanContext;
TenantInfo tenantInfo;
Key key;
Version version;
@ -284,7 +283,7 @@ struct GetValueRequest : TimedRequest {
// serve the given key
GetValueRequest() {}
GetValueRequest(SpanContext spanContext,
GetValueRequest(SpanID spanContext,
const TenantInfo& tenantInfo,
const Key& key,
Version ver,
@ -316,7 +315,7 @@ struct WatchValueReply {
struct WatchValueRequest {
constexpr static FileIdentifier file_identifier = 14747733;
SpanContext spanContext;
SpanID spanContext;
TenantInfo tenantInfo;
Key key;
Optional<Value> value;
@ -327,7 +326,7 @@ struct WatchValueRequest {
WatchValueRequest() {}
WatchValueRequest(SpanContext spanContext,
WatchValueRequest(SpanID spanContext,
TenantInfo tenantInfo,
const Key& key,
Optional<Value> value,
@ -361,7 +360,7 @@ struct GetKeyValuesReply : public LoadBalancedReply {
struct GetKeyValuesRequest : TimedRequest {
constexpr static FileIdentifier file_identifier = 6795746;
SpanContext spanContext;
SpanID spanContext;
Arena arena;
TenantInfo tenantInfo;
KeySelectorRef begin, end;
@ -419,7 +418,7 @@ struct GetMappedKeyValuesReply : public LoadBalancedReply {
struct GetMappedKeyValuesRequest : TimedRequest {
constexpr static FileIdentifier file_identifier = 6795747;
SpanContext spanContext;
SpanID spanContext;
Arena arena;
TenantInfo tenantInfo;
KeySelectorRef begin, end;
@ -484,7 +483,7 @@ struct GetKeyValuesStreamReply : public ReplyPromiseStreamReply {
struct GetKeyValuesStreamRequest {
constexpr static FileIdentifier file_identifier = 6795746;
SpanContext spanContext;
SpanID spanContext;
Arena arena;
TenantInfo tenantInfo;
KeySelectorRef begin, end;
@ -535,7 +534,7 @@ struct GetKeyReply : public LoadBalancedReply {
struct GetKeyRequest : TimedRequest {
constexpr static FileIdentifier file_identifier = 10457870;
SpanContext spanContext;
SpanID spanContext;
Arena arena;
TenantInfo tenantInfo;
KeySelectorRef sel;
@ -549,7 +548,7 @@ struct GetKeyRequest : TimedRequest {
GetKeyRequest() {}
GetKeyRequest(SpanContext spanContext,
GetKeyRequest(SpanID spanContext,
TenantInfo tenantInfo,
KeySelectorRef const& sel,
Version version,
@ -836,7 +835,7 @@ struct ChangeFeedStreamReply : public ReplyPromiseStreamReply {
struct ChangeFeedStreamRequest {
constexpr static FileIdentifier file_identifier = 6795746;
SpanContext spanContext;
SpanID spanContext;
Arena arena;
Key rangeID;
Version begin = 0;

View File

@ -619,6 +619,19 @@ StorageServerInterface decodeServerListValue(ValueRef const& value) {
return decodeServerListValueFB(value);
}
// Serializes `swversion` with ObjectWriter. The version written alongside the
// payload is a copy of currentProtocolVersion with the object-serializer flag added.
Value swVersionValue(SWVersion const& swversion) {
    auto flaggedVersion = currentProtocolVersion;
    flaggedVersion.addObjectSerializerFlag();
    return ObjectWriter::toValue(swversion, IncludeVersion(flaggedVersion));
}
// Deserializes an SWVersion from `value` using an ObjectReader constructed
// with IncludeVersion(), and returns the decoded object by value.
SWVersion decodeSWVersionValue(ValueRef const& value) {
    SWVersion decoded;
    ObjectReader objectReader(value.begin(), IncludeVersion());
    objectReader.deserialize(decoded);
    return decoded;
}
// processClassKeys.contains(k) iff k.startsWith( processClassKeys.begin ) because '/'+1 == '0'
const KeyRangeRef processClassKeys(LiteralStringRef("\xff/processClass/"), LiteralStringRef("\xff/processClass0"));
const KeyRef processClassPrefix = processClassKeys.begin;

View File

@ -205,6 +205,9 @@ const Value serverListValue(StorageServerInterface const&);
UID decodeServerListKey(KeyRef const&);
StorageServerInterface decodeServerListValue(ValueRef const&);
Value swVersionValue(SWVersion const& swversion);
SWVersion decodeSWVersionValue(ValueRef const&);
// "\xff/processClass/[[processID]]" := "[[ProcessClass]]"
// Contains a mapping from processID to processClass
extern const KeyRangeRef processClassKeys;

View File

@ -465,8 +465,8 @@ VersionVector ThreadSafeTransaction::getVersionVector() {
return tr->getVersionVector();
}
SpanContext ThreadSafeTransaction::getSpanContext() {
return tr->getSpanContext();
UID ThreadSafeTransaction::getSpanID() {
return tr->getSpanID();
}
ThreadFuture<int64_t> ThreadSafeTransaction::getApproximateSize() {

View File

@ -167,7 +167,7 @@ public:
ThreadFuture<Void> commit() override;
Version getCommittedVersion() override;
VersionVector getVersionVector() override;
SpanContext getSpanContext() override;
UID getSpanID() override;
ThreadFuture<int64_t> getApproximateSize() override;
ThreadFuture<uint64_t> getProtocolVersion();

View File

@ -34,13 +34,10 @@ struct TransactionLineage : LineageProperties<TransactionLineage> {
GetKeyServersLocations
};
static constexpr std::string_view name = "Transaction"sv;
UID txID;
uint64_t txID;
Operation operation = Operation::Unset;
bool isSet(uint64_t TransactionLineage::*member) const { return this->*member > 0; }
bool isSet(UID TransactionLineage::*member) const {
return static_cast<UID>(this->*member).first() > 0 && static_cast<UID>(this->*member).second() > 0;
}
bool isSet(Operation TransactionLineage::*member) const { return this->*member != Operation::Unset; }
};

View File

@ -53,7 +53,7 @@ namespace {
class ApplyMetadataMutationsImpl {
public:
ApplyMetadataMutationsImpl(const SpanContext& spanContext_,
ApplyMetadataMutationsImpl(const SpanID& spanContext_,
const UID& dbgid_,
Arena& arena_,
const VectorRef<MutationRef>& mutations_,
@ -61,7 +61,7 @@ public:
: spanContext(spanContext_), dbgid(dbgid_), arena(arena_), mutations(mutations_), txnStateStore(txnStateStore_),
confChange(dummyConfChange) {}
ApplyMetadataMutationsImpl(const SpanContext& spanContext_,
ApplyMetadataMutationsImpl(const SpanID& spanContext_,
Arena& arena_,
const VectorRef<MutationRef>& mutations_,
ProxyCommitData& proxyCommitData_,
@ -82,7 +82,7 @@ public:
tssMapping(&proxyCommitData_.tssMapping), tenantMap(&proxyCommitData_.tenantMap),
initialCommit(initialCommit_) {}
ApplyMetadataMutationsImpl(const SpanContext& spanContext_,
ApplyMetadataMutationsImpl(const SpanID& spanContext_,
ResolverData& resolverData_,
const VectorRef<MutationRef>& mutations_)
: spanContext(spanContext_), dbgid(resolverData_.dbgid), arena(resolverData_.arena), mutations(mutations_),
@ -94,7 +94,7 @@ public:
private:
// The following variables are incoming parameters
const SpanContext& spanContext;
const SpanID& spanContext;
const UID& dbgid;
@ -1217,7 +1217,7 @@ public:
} // anonymous namespace
void applyMetadataMutations(SpanContext const& spanContext,
void applyMetadataMutations(SpanID const& spanContext,
ProxyCommitData& proxyCommitData,
Arena& arena,
Reference<ILogSystem> logSystem,
@ -1241,13 +1241,13 @@ void applyMetadataMutations(SpanContext const& spanContext,
.apply();
}
void applyMetadataMutations(SpanContext const& spanContext,
void applyMetadataMutations(SpanID const& spanContext,
ResolverData& resolverData,
const VectorRef<MutationRef>& mutations) {
ApplyMetadataMutationsImpl(spanContext, resolverData, mutations).apply();
}
void applyMetadataMutations(SpanContext const& spanContext,
void applyMetadataMutations(SpanID const& spanContext,
const UID& dbgid,
Arena& arena,
const VectorRef<MutationRef>& mutations,

View File

@ -87,7 +87,7 @@ Reference<StorageInfo> getStorageInfo(UID id,
std::map<UID, Reference<StorageInfo>>* storageCache,
IKeyValueStore* txnStateStore);
void applyMetadataMutations(SpanContext const& spanContext,
void applyMetadataMutations(SpanID const& spanContext,
ProxyCommitData& proxyCommitData,
Arena& arena,
Reference<ILogSystem> logSystem,
@ -97,7 +97,7 @@ void applyMetadataMutations(SpanContext const& spanContext,
Version version,
Version popVersion,
bool initialCommit);
void applyMetadataMutations(SpanContext const& spanContext,
void applyMetadataMutations(SpanID const& spanContext,
const UID& dbgid,
Arena& arena,
const VectorRef<MutationRef>& mutations,
@ -140,7 +140,7 @@ inline bool containsMetadataMutation(const VectorRef<MutationRef>& mutations) {
}
// Resolver's version
void applyMetadataMutations(SpanContext const& spanContext,
void applyMetadataMutations(SpanID const& spanContext,
ResolverData& resolverData,
const VectorRef<MutationRef>& mutations);

View File

@ -67,10 +67,6 @@ struct VersionedMessage {
return false;
if (reader.protocolVersion().hasSpanContext() && SpanContextMessage::isNextIn(reader))
return false;
if (reader.protocolVersion().hasOTELSpanContext() && OTELSpanContextMessage::isNextIn(reader)) {
TEST(true); // Returning false for OTELSpanContextMessage
return false;
}
reader >> *m;
return normalKeys.contains(m->param1) || m->param1 == metadataVersionKey;

View File

@ -88,7 +88,6 @@ set(FDBSERVER_SRCS
OldTLogServer_4_6.actor.cpp
OldTLogServer_6_0.actor.cpp
OldTLogServer_6_2.actor.cpp
OTELSpanContextMessage.h
OnDemandStore.actor.cpp
OnDemandStore.h
PaxosConfigConsumer.actor.cpp

View File

@ -25,6 +25,7 @@
#include "fdbserver/MasterInterface.h"
#include "fdbserver/WaitFailure.h"
#include "flow/ProtocolVersion.h"
#include "flow/actorcompiler.h" // This must be the last #include.
static std::set<int> const& normalClusterRecoveryErrors() {
@ -1407,6 +1408,11 @@ ACTOR Future<Void> clusterRecoveryCore(Reference<ClusterRecoveryData> self) {
wait(self->cstate.read());
if (self->cstate.prevDBState.lowestCompatibleProtocolVersion > currentProtocolVersion) {
TraceEvent(SevWarnAlways, "IncompatibleProtocolVersion", self->dbgid).log();
throw internal_error();
}
self->recoveryState = RecoveryState::LOCKING_CSTATE;
TraceEvent(getRecoveryEventName(ClusterRecoveryEventType::CLUSTER_RECOVERY_STATE_EVENT_NAME).c_str(), self->dbgid)
.detail("StatusCode", RecoveryStatus::locking_coordinated_state)
@ -1462,8 +1468,21 @@ ACTOR Future<Void> clusterRecoveryCore(Reference<ClusterRecoveryData> self) {
DBCoreState newState = self->cstate.myDBState;
newState.recoveryCount++;
newState.recoveryCount++;
if (self->cstate.prevDBState.newestProtocolVersion.isInvalid() ||
self->cstate.prevDBState.newestProtocolVersion < currentProtocolVersion) {
ASSERT(self->cstate.myDBState.lowestCompatibleProtocolVersion.isInvalid() ||
!self->cstate.myDBState.newestProtocolVersion.isInvalid());
newState.newestProtocolVersion = currentProtocolVersion;
newState.lowestCompatibleProtocolVersion = minCompatibleProtocolVersion;
}
wait(self->cstate.write(newState) || recoverAndEndEpoch);
TraceEvent("ProtocolVersionCompatibilityChecked", self->dbgid)
.detail("NewestProtocolVersion", self->cstate.myDBState.newestProtocolVersion)
.detail("LowestCompatibleProtocolVersion", self->cstate.myDBState.lowestCompatibleProtocolVersion)
.trackLatest(self->swVersionCheckedEventHolder->trackingKey);
self->recoveryState = RecoveryState::RECRUITING;
state std::vector<StorageServerInterface> seedServers;
@ -1611,7 +1630,7 @@ ACTOR Future<Void> clusterRecoveryCore(Reference<ClusterRecoveryData> self) {
tr.set(recoveryCommitRequest.arena, clusterIdKey, BinaryWriter::toValue(self->clusterId, Unversioned()));
}
applyMetadataMutations(SpanContext(),
applyMetadataMutations(SpanID(),
self->dbgid,
recoveryCommitRequest.arena,
tr.mutations.slice(mmApplied, tr.mutations.size()),

View File

@ -22,6 +22,7 @@
// When actually compiled (NO_INTELLISENSE), include the generated version of this file. In intellisense use the source
// version.
#include "flow/Trace.h"
#include <utility>
#if defined(NO_INTELLISENSE) && !defined(FDBSERVER_CLUSTERRECOVERY_ACTOR_G_H)
@ -244,6 +245,7 @@ struct ClusterRecoveryData : NonCopyable, ReferenceCounted<ClusterRecoveryData>
Future<Void> logger;
Reference<EventCacheHolder> swVersionCheckedEventHolder;
Reference<EventCacheHolder> recoveredConfigEventHolder;
Reference<EventCacheHolder> clusterRecoveryStateEventHolder;
Reference<EventCacheHolder> clusterRecoveryGenerationsEventHolder;
@ -273,6 +275,7 @@ struct ClusterRecoveryData : NonCopyable, ReferenceCounted<ClusterRecoveryData>
backupWorkerDoneRequests("BackupWorkerDoneRequests", cc),
getLiveCommittedVersionRequests("GetLiveCommittedVersionRequests", cc),
reportLiveCommittedVersionRequests("ReportLiveCommittedVersionRequests", cc),
swVersionCheckedEventHolder(makeReference<EventCacheHolder>("SWVersionCompatibilityChecked")),
recoveredConfigEventHolder(makeReference<EventCacheHolder>("RecoveredConfig")) {
clusterRecoveryStateEventHolder = makeReference<EventCacheHolder>(
getRecoveryEventName(ClusterRecoveryEventType::CLUSTER_RECOVERY_STATE_EVENT_NAME));

View File

@ -358,7 +358,7 @@ ACTOR Future<Void> addBackupMutations(ProxyCommitData* self,
state int yieldBytes = 0;
state BinaryWriter valueWriter(Unversioned());
toCommit->addTransactionInfo(SpanContext());
toCommit->addTransactionInfo(SpanID());
// Serialize the log range mutations within the map
for (; logRangeMutation != logRangeMutations->cend(); ++logRangeMutation) {
@ -654,7 +654,7 @@ void CommitBatchContext::setupTraceBatch() {
g_traceBatch.addAttach("CommitAttachID", tr.debugID.get().first(), debugID.get().first());
}
span.addLink(tr.spanContext);
span.addParent(tr.spanContext);
}
if (debugID.present()) {
@ -880,7 +880,7 @@ void applyMetadataEffect(CommitBatchContext* self) {
committed =
committed && self->resolution[resolver].stateMutations[versionIndex][transactionIndex].committed;
if (committed) {
applyMetadataMutations(SpanContext(),
applyMetadataMutations(SpanID(),
*self->pProxyCommitData,
self->arena,
self->pProxyCommitData->logSystem,
@ -1300,7 +1300,8 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) {
// simulation
TEST(true); // Semi-committed pipeline limited by MVCC window
//TraceEvent("ProxyWaitingForCommitted", pProxyCommitData->dbgid).detail("CommittedVersion", pProxyCommitData->committedVersion.get()).detail("NeedToCommit", commitVersion);
waitVersionSpan = Span("MP:overMaxReadTransactionLifeVersions"_loc, span.context);
waitVersionSpan = Span(
deterministicRandom()->randomUniqueID(), "MP:overMaxReadTransactionLifeVersions"_loc, { span.context });
choose {
when(wait(pProxyCommitData->committedVersion.whenAtLeast(
self->commitVersion - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS))) {
@ -1680,7 +1681,7 @@ void addTagMapping(GetKeyServerLocationsReply& reply, ProxyCommitData* commitDat
ACTOR static Future<Void> doKeyServerLocationRequest(GetKeyServerLocationsRequest req, ProxyCommitData* commitData) {
// We can't respond to these requests until we have valid txnStateStore
getCurrentLineage()->modify(&TransactionLineage::operation) = TransactionLineage::Operation::GetKeyServersLocations;
getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.traceID;
getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.first();
wait(commitData->validState.getFuture());
wait(delay(0, TaskPriority::DefaultEndpoint));
@ -2200,7 +2201,7 @@ ACTOR Future<Void> processCompleteTransactionStateRequest(TransactionStateResolv
Arena arena;
bool confChanges;
applyMetadataMutations(SpanContext(),
applyMetadataMutations(SpanID(),
*pContext->pCommitData,
arena,
Reference<ILogSystem>(),

View File

@ -141,8 +141,13 @@ struct DBCoreState {
DBRecoveryCount recoveryCount; // Increases with sequential successful recoveries.
LogSystemType logSystemType;
std::set<int8_t> pseudoLocalities;
ProtocolVersion newestProtocolVersion;
ProtocolVersion lowestCompatibleProtocolVersion;
DBCoreState() : logRouterTags(0), txsTags(0), recoveryCount(0), logSystemType(LogSystemType::empty) {}
DBCoreState()
: logRouterTags(0), txsTags(0), recoveryCount(0), logSystemType(LogSystemType::empty),
newestProtocolVersion(ProtocolVersion::invalidProtocolVersion),
lowestCompatibleProtocolVersion(ProtocolVersion::invalidProtocolVersion) {}
std::vector<UID> getPriorCommittedLogServers() {
std::vector<UID> priorCommittedLogServers;
@ -180,6 +185,9 @@ struct DBCoreState {
if (ar.protocolVersion().hasShardedTxsTags()) {
serializer(ar, txsTags);
}
if (ar.protocolVersion().hasSWVersionTracking()) {
serializer(ar, newestProtocolVersion, lowestCompatibleProtocolVersion);
}
} else if (ar.isDeserializing) {
tLogs.push_back(CoreTLogSet());
serializer(ar,

View File

@ -2039,6 +2039,8 @@ public:
}
loop {
state Future<Void> pauseChanged = self->pauseWiggle->onChange();
state Future<Void> stopChanged = stopSignal->onChange();
if (self->wigglingId.present()) {
state UID id = self->wigglingId.get();
if (self->pauseWiggle->get()) {
@ -2067,7 +2069,7 @@ public:
.detail("ExtraHealthyTeamCount", extraTeamCount)
.detail("HealthyTeamCount", self->healthyTeamCount);
}
when(wait(self->pauseWiggle->onChange())) { continue; }
when(wait(pauseChanged)) { continue; }
}
}
}
@ -2098,7 +2100,7 @@ public:
finishStorageWiggleSignal.send(Void());
extraTeamCount = std::max(0, extraTeamCount - 1);
}
when(wait(ddQueueCheck || self->pauseWiggle->onChange() || stopSignal->onChange())) {}
when(wait(ddQueueCheck || pauseChanged || stopChanged)) {}
}
if (stopSignal->get()) {

View File

@ -542,7 +542,7 @@ ACTOR Future<Void> lastCommitUpdater(GrvProxyData* self, PromiseStream<Future<Vo
}
}
ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(SpanContext parentSpan,
ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(SpanID parentSpan,
GrvProxyData* grvProxyData,
uint32_t flags,
Optional<UID> debugID,
@ -945,7 +945,7 @@ ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy,
int batchGRVProcessed = 0;
for (int i = 0; i < start.size(); i++) {
if (start[i].size()) {
Future<GetReadVersionReply> readVersionReply = getLiveCommittedVersion(SpanContext(),
Future<GetReadVersionReply> readVersionReply = getLiveCommittedVersion(UID() /*span.context*/,
grvProxyData,
i,
debugID,

View File

@ -19,9 +19,6 @@
*/
#include "fdbserver/LogSystem.h"
#include "fdbclient/FDBTypes.h"
#include "fdbserver/OTELSpanContextMessage.h"
#include "fdbserver/SpanContextMessage.h"
#include "flow/serialize.h"
std::string LogSet::logRouterString() {
@ -280,8 +277,8 @@ void LogPushData::addTxsTag() {
}
}
void LogPushData::addTransactionInfo(SpanContext const& context) {
TEST(!spanContext.isValid()); // addTransactionInfo with invalid SpanContext
void LogPushData::addTransactionInfo(SpanID const& context) {
TEST(!spanContext.isValid()); // addTransactionInfo with invalid SpanID
spanContext = context;
writtenLocations.clear();
}
@ -347,33 +344,13 @@ bool LogPushData::writeTransactionInfo(int location, uint32_t subseq) {
writtenLocations.insert(location);
BinaryWriter& wr = messagesWriter[location];
SpanContextMessage contextMessage(spanContext);
int offset = wr.getLength();
wr << uint32_t(0) << subseq << uint16_t(prev_tags.size());
for (auto& tag : prev_tags)
wr << tag;
if (logSystem->getTLogVersion() >= TLogVersion::V7) {
OTELSpanContextMessage contextMessage(spanContext);
wr << contextMessage;
} else {
// When we're on a TLog version below 7, but the front end of the system (i.e. proxy, sequencer, resolver)
// is using OpenTelemetry tracing (i.e on or above 7.2), we need to convert the OpenTelemetry Span data model
// i.e. 16 bytes for traceId, 8 bytes for spanId, to the OpenTracing spec, which is 8 bytes for traceId
// and 8 bytes for spanId. That means we need to drop some data.
//
// As a workaround for this special case we've decided to drop the 8 bytes
// for spanId. Therefore we're passing along the full 16 byte traceId to the storage server with 0 for spanID.
// This will result in a follows from relationship for the storage span within the trace rather than a
// parent->child.
SpanContextMessage contextMessage;
if (spanContext.isSampled()) {
TEST(true); // Converting OTELSpanContextMessage to traced SpanContextMessage
contextMessage = SpanContextMessage(UID(spanContext.traceID.first(), spanContext.traceID.second()));
} else {
TEST(true); // Converting OTELSpanContextMessage to untraced SpanContextMessage
contextMessage = SpanContextMessage(UID(0, 0));
}
wr << contextMessage;
}
wr << contextMessage;
int length = wr.getLength() - offset;
*(uint32_t*)((uint8_t*)wr.getData() + offset) = length - sizeof(uint32_t);
return true;

View File

@ -26,7 +26,6 @@
#include <vector>
#include "fdbserver/SpanContextMessage.h"
#include "fdbserver/OTELSpanContextMessage.h"
#include "fdbserver/TLogInterface.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbclient/DatabaseConfiguration.h"
@ -520,7 +519,7 @@ struct ILogSystem {
Version knownCommittedVersion,
Version minKnownCommittedVersion,
LogPushData& data,
SpanContext const& spanContext,
SpanID const& spanContext,
Optional<UID> debugID = Optional<UID>(),
Optional<std::unordered_map<uint16_t, Version>> tpcvMap =
Optional<std::unordered_map<uint16_t, Version>>()) = 0;
@ -763,7 +762,7 @@ struct LogPushData : NonCopyable {
}
// Add transaction info to be written before the first mutation in the transaction.
void addTransactionInfo(SpanContext const& context);
void addTransactionInfo(SpanID const& context);
// copy written_tags, after filtering, into given set
void saveTags(std::set<Tag>& filteredTags) const {
@ -833,7 +832,7 @@ private:
// field.
std::unordered_set<int> writtenLocations;
uint32_t subsequence;
SpanContext spanContext;
SpanID spanContext;
bool shardChanged = false; // if keyServers has any changes, i.e., shard boundary modifications.
// Writes transaction info to the message stream at the given location if

View File

@ -133,14 +133,14 @@ struct GetCommitVersionReply {
struct GetCommitVersionRequest {
constexpr static FileIdentifier file_identifier = 16683181;
SpanContext spanContext;
SpanID spanContext;
uint64_t requestNum;
uint64_t mostRecentProcessedRequestNum;
UID requestingProxy;
ReplyPromise<GetCommitVersionReply> reply;
GetCommitVersionRequest() {}
GetCommitVersionRequest(SpanContext spanContext,
GetCommitVersionRequest(SpanID spanContext,
uint64_t requestNum,
uint64_t mostRecentProcessedRequestNum,
UID requestingProxy)

View File

@ -24,7 +24,6 @@
#include "fdbserver/MutationTracking.h"
#include "fdbserver/LogProtocolMessage.h"
#include "fdbserver/SpanContextMessage.h"
#include "fdbserver/OTELSpanContextMessage.h"
#include "fdbclient/SystemData.h"
#if defined(FDB_CLEAN_BUILD) && MUTATION_TRACKING_ENABLED
#error "You cannot use mutation tracking in a clean/release build."
@ -97,11 +96,6 @@ TraceEvent debugTagsAndMessageEnabled(const char* context, Version version, Stri
BinaryReader br(mutationData, AssumeVersion(rdr.protocolVersion()));
SpanContextMessage scm;
br >> scm;
} else if (OTELSpanContextMessage::startsOTELSpanContextMessage(mutationType)) {
TEST(true); // MutationTracking reading OTELSpanContextMessage
BinaryReader br(mutationData, AssumeVersion(rdr.protocolVersion()));
OTELSpanContextMessage scm;
br >> scm;
} else {
MutationRef m;
BinaryReader br(mutationData, AssumeVersion(rdr.protocolVersion()));

View File

@ -1,66 +0,0 @@
/*
* OTELSpanContextMessage.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FDBSERVER_OTELSPANCONTEXTMESSAGE_H
#define FDBSERVER_OTELSPANCONTEXTMESSAGE_H
#pragma once
#include "flow/Tracing.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/CommitTransaction.h"
struct OTELSpanContextMessage {
    // This message is pushed into the transaction logs' memory to tell them
    // which transaction the subsequent mutations belong to. It lets
    // transaction logs and storage servers associate mutations with a
    // transaction identifier, called a span context.
    //
    // The message is similar to LogProtocolMessage: storage servers read its
    // first byte to uniquely identify it, so it is never mistaken for another
    // message. See LogProtocolMessage.h for more information.
    SpanContext spanContext;

    OTELSpanContextMessage() {}
    OTELSpanContextMessage(SpanContext const& spanContext) : spanContext(spanContext) {}

    // Human-readable form: the reserved type code followed by the span context.
    std::string toString() const {
        const auto contextText = spanContext.toString();
        return format("code: %d, span context: %s",
                      MutationRef::Reserved_For_OTELSpanContextMessage,
                      contextText.c_str());
    }

    // Serializes the reserved type byte first, then the span context.
    template <class Ar>
    void serialize(Ar& ar) {
        uint8_t poly = MutationRef::Reserved_For_OTELSpanContextMessage;
        serializer(ar, poly, spanContext);
    }

    // True when `byte` is the type code reserved for this message.
    static bool startsOTELSpanContextMessage(uint8_t byte) {
        return MutationRef::Reserved_For_OTELSpanContextMessage == byte;
    }

    // Peeks at the next byte of `ar` and reports whether it is this message's type code.
    template <class Ar>
    static bool isNextIn(Ar& ar) {
        const uint8_t* nextByte = (const uint8_t*)ar.peekBytes(1);
        return startsOTELSpanContextMessage(*nextByte);
    }
};
#endif

View File

@ -340,8 +340,8 @@ ACTOR Future<Void> resolveBatch(Reference<Resolver> self, ResolveTransactionBatc
// The condition here must match CommitBatch::applyMetadataToCommittedTransactions()
if (reply.committed[t] == ConflictBatch::TransactionCommitted && !self->forceRecovery &&
SERVER_KNOBS->PROXY_USE_RESOLVER_PRIVATE_MUTATIONS && (!isLocked || req.transactions[t].lock_aware)) {
SpanContext spanContext =
req.transactions[t].spanContext.present() ? req.transactions[t].spanContext.get() : SpanContext();
SpanID spanContext =
req.transactions[t].spanContext.present() ? req.transactions[t].spanContext.get() : SpanID();
applyMetadataMutations(spanContext, resolverData, req.transactions[t].mutations);
}
@ -565,7 +565,7 @@ ACTOR Future<Void> processCompleteTransactionStateRequest(TransactionStateResolv
ResolverData resolverData(
pContext->pResolverData->dbgid, pContext->pTxnStateStore, &pContext->pResolverData->keyInfo, confChanges);
applyMetadataMutations(SpanContext(), resolverData, mutations);
applyMetadataMutations(SpanID(), resolverData, mutations);
} // loop
auto lockedKey = pContext->pTxnStateStore->readValue(databaseLockedKey).get();

View File

@ -118,7 +118,7 @@ struct ResolveTransactionBatchRequest {
constexpr static FileIdentifier file_identifier = 16462858;
Arena arena;
SpanContext spanContext;
SpanID spanContext;
Version prevVersion;
Version version; // FIXME: ?
Version lastReceivedVersion;

View File

@ -636,7 +636,8 @@ ACTOR Future<ISimulator::KillType> simulatedFDBDRebooter(Reference<IClusterConne
printf("SimulatedFDBDTerminated: %s\n", e.what());
ASSERT(destructed ||
g_simulator.getCurrentProcess() == process); // simulatedFDBD catch called on different process
TraceEvent(e.code() == error_code_actor_cancelled || e.code() == error_code_file_not_found || destructed
TraceEvent(e.code() == error_code_actor_cancelled || e.code() == error_code_file_not_found ||
e.code() == error_code_incompatible_software_version || destructed
? SevInfo
: SevError,
"SimulatedFDBDTerminated")

View File

@ -24,6 +24,7 @@
#include "fdbclient/KeyBackedTypes.h"
#include "fdbserver/Status.h"
#include "flow/ITrace.h"
#include "flow/ProtocolVersion.h"
#include "flow/Trace.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/SystemData.h"
@ -1527,6 +1528,41 @@ ACTOR static Future<Void> logRangeWarningFetcher(Database cx,
return Void();
}
// Bundle of protocol versions reported by status: the version this binary
// runs (always currentProtocolVersion), plus the newest and lowest-compatible
// versions supplied by the caller.
struct ProtocolVersionData {
    ProtocolVersion runningProtocolVersion{ currentProtocolVersion };
    ProtocolVersion newestProtocolVersion;
    ProtocolVersion lowestCompatibleProtocolVersion;

    // Leaves the newest/lowest-compatible versions default-constructed.
    ProtocolVersionData() {}

    // Builds the newest/lowest-compatible versions from their raw 64-bit values.
    ProtocolVersionData(uint64_t newestProtocolVersionValue, uint64_t lowestCompatibleProtocolVersionValue)
      : newestProtocolVersion(newestProtocolVersionValue),
        lowestCompatibleProtocolVersion(lowestCompatibleProtocolVersionValue) {}
};
// Requests the latest "SWVersionCompatibilityChecked" event from `ccWorker`
// (wrapped in timeoutError with a 1.0 argument) and parses its
// NewestProtocolVersion / LowestCompatibleProtocolVersion fields as base-16
// integers.
//
// Returns a ProtocolVersionData built from the two parsed values. If an Error
// other than actor_cancelled is thrown, logs SWVersionStatusFailed at
// SevWarnAlways and returns a default-constructed ProtocolVersionData;
// actor_cancelled is rethrown.
ACTOR Future<ProtocolVersionData> getNewestProtocolVersion(Database cx, WorkerDetails ccWorker) {
    try {
        state Future<TraceEventFields> swVersionF = timeoutError(
            ccWorker.interf.eventLogRequest.getReply(EventLogRequest("SWVersionCompatibilityChecked"_sr)), 1.0);

        wait(success(swVersionF));
        const TraceEventFields& swVersionTrace = swVersionF.get();

        // std::stoull yields an unsigned 64-bit value and the ProtocolVersionData
        // constructor takes uint64_t, so keep the intermediates unsigned rather
        // than round-tripping through a signed int64_t.
        uint64_t newestProtocolVersionValue =
            std::stoull(swVersionTrace.getValue("NewestProtocolVersion").c_str(), nullptr, 16);
        uint64_t lowestCompatibleProtocolVersionValue =
            std::stoull(swVersionTrace.getValue("LowestCompatibleProtocolVersion").c_str(), nullptr, 16);

        return ProtocolVersionData(newestProtocolVersionValue, lowestCompatibleProtocolVersionValue);
    } catch (Error& e) {
        // Cancellation must propagate; every other Error degrades to defaults.
        if (e.code() == error_code_actor_cancelled)
            throw;

        TraceEvent(SevWarnAlways, "SWVersionStatusFailed").error(e);

        return ProtocolVersionData();
    }
}
struct LoadConfigurationResult {
bool fullReplication;
Optional<Key> healthyZone;
@ -2880,6 +2916,8 @@ ACTOR Future<StatusReply> clusterGetStatus(
messages.push_back(message);
}
state ProtocolVersionData protocolVersion = wait(getNewestProtocolVersion(cx, ccWorker));
// construct status information for cluster subsections
state int statusCode = (int)RecoveryStatus::END;
state JsonBuilderObject recoveryStateStatus = wait(
@ -2917,6 +2955,9 @@ ACTOR Future<StatusReply> clusterGetStatus(
statusObj["protocol_version"] = format("%" PRIx64, g_network->protocolVersion().version());
statusObj["connection_string"] = coordinators.ccr->getConnectionString().toString();
statusObj["bounce_impact"] = getBounceImpactInfo(statusCode);
statusObj["newest_protocol_version"] = format("%" PRIx64, protocolVersion.newestProtocolVersion.version());
statusObj["lowest_compatible_protocol_version"] =
format("%" PRIx64, protocolVersion.lowestCompatibleProtocolVersion.version());
state Optional<DatabaseConfiguration> configuration;
state Optional<LoadConfigurationResult> loadResult;

View File

@ -18,7 +18,6 @@
* limitations under the License.
*/
#include "fdbserver/OTELSpanContextMessage.h"
#include "flow/Arena.h"
#include "fdbclient/FDBOptions.g.h"
#include "fdbclient/NativeAPI.actor.h"
@ -1898,10 +1897,6 @@ ACTOR Future<Void> pullAsyncData(StorageCacheData* data) {
SpanContextMessage::isNextIn(cloneReader)) {
SpanContextMessage scm;
cloneReader >> scm;
} else if (cloneReader.protocolVersion().hasOTELSpanContext() &&
OTELSpanContextMessage::isNextIn(cloneReader)) {
OTELSpanContextMessage scm;
cloneReader >> scm;
} else {
MutationRef msg;
cloneReader >> msg;
@ -1980,10 +1975,6 @@ ACTOR Future<Void> pullAsyncData(StorageCacheData* data) {
} else if (reader.protocolVersion().hasSpanContext() && SpanContextMessage::isNextIn(reader)) {
SpanContextMessage scm;
reader >> scm;
} else if (reader.protocolVersion().hasOTELSpanContext() && OTELSpanContextMessage::isNextIn(reader)) {
TEST(true); // StorageCache reading OTELSpanContextMessage
OTELSpanContextMessage oscm;
reader >> oscm;
} else {
MutationRef msg;
reader >> msg;

View File

@ -296,7 +296,7 @@ struct TLogCommitReply {
struct TLogCommitRequest {
constexpr static FileIdentifier file_identifier = 4022206;
SpanContext spanContext;
SpanID spanContext;
Arena arena;
Version prevVersion, version, knownCommittedVersion, minKnownCommittedVersion;
@ -307,7 +307,7 @@ struct TLogCommitRequest {
Optional<UID> debugID;
TLogCommitRequest() {}
TLogCommitRequest(const SpanContext& context,
TLogCommitRequest(const SpanID& context,
const Arena& a,
Version prevVersion,
Version version,

View File

@ -507,7 +507,7 @@ Future<Version> TagPartitionedLogSystem::push(Version prevVersion,
Version knownCommittedVersion,
Version minKnownCommittedVersion,
LogPushData& data,
SpanContext const& spanContext,
SpanID const& spanContext,
Optional<UID> debugID,
Optional<std::unordered_map<uint16_t, Version>> tpcvMap) {
// FIXME: Randomize request order as in LegacyLogSystem?

View File

@ -191,7 +191,7 @@ struct TagPartitionedLogSystem final : ILogSystem, ReferenceCounted<TagPartition
Version knownCommittedVersion,
Version minKnownCommittedVersion,
LogPushData& data,
SpanContext const& spanContext,
SpanID const& spanContext,
Optional<UID> debugID,
Optional<std::unordered_map<uint16_t, Version>> tpcvMap) final;

View File

@ -120,7 +120,7 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
};
ACTOR Future<Void> getVersion(Reference<MasterData> self, GetCommitVersionRequest req) {
state Span span("M:getVersion"_loc, req.spanContext);
state Span span("M:getVersion"_loc, { req.spanContext });
state std::map<UID, CommitProxyVersionReplies>::iterator proxyItr =
self->lastCommitProxyVersionReplies.find(req.requestingProxy); // lastCommitProxyVersionReplies never changes

View File

@ -24,10 +24,8 @@
#include <unordered_map>
#include "contrib/fmt-8.1.1/include/fmt/format.h"
#include "fdbclient/FDBTypes.h"
#include "fdbrpc/fdbrpc.h"
#include "fdbrpc/LoadBalance.h"
#include "fdbserver/OTELSpanContextMessage.h"
#include "flow/ActorCollection.h"
#include "flow/Arena.h"
#include "flow/Error.h"
@ -1397,8 +1395,8 @@ void updateProcessStats(StorageServer* self) {
#pragma region Queries
#endif
ACTOR Future<Version> waitForVersionActor(StorageServer* data, Version version, SpanContext spanContext) {
state Span span("SS.WaitForVersion"_loc, spanContext);
ACTOR Future<Version> waitForVersionActor(StorageServer* data, Version version, SpanID spanContext) {
state Span span("SS.WaitForVersion"_loc, { spanContext });
choose {
when(wait(data->version.whenAtLeast(version))) {
// FIXME: A bunch of these can block with or without the following delay 0.
@ -1435,7 +1433,7 @@ Version getLatestCommitVersion(VersionVector& ssLatestCommitVersions, Tag& tag)
return commitVersion;
}
Future<Version> waitForVersion(StorageServer* data, Version version, SpanContext spanContext) {
Future<Version> waitForVersion(StorageServer* data, Version version, SpanID spanContext) {
if (version == latestVersion) {
version = std::max(Version(1), data->version.get());
}
@ -1456,10 +1454,7 @@ Future<Version> waitForVersion(StorageServer* data, Version version, SpanContext
return waitForVersionActor(data, version, spanContext);
}
Future<Version> waitForVersion(StorageServer* data,
Version commitVersion,
Version readVersion,
SpanContext spanContext) {
Future<Version> waitForVersion(StorageServer* data, Version commitVersion, Version readVersion, SpanID spanContext) {
ASSERT(commitVersion == invalidVersion || commitVersion < readVersion);
if (commitVersion == invalidVersion) {
@ -1533,11 +1528,11 @@ Optional<TenantMapEntry> StorageServer::getTenantEntry(Version version, TenantIn
ACTOR Future<Void> getValueQ(StorageServer* data, GetValueRequest req) {
state int64_t resultSize = 0;
Span span("SS:getValue"_loc, req.spanContext);
Span span("SS:getValue"_loc, { req.spanContext });
if (req.tenantInfo.name.present()) {
span.addAttribute("tenant"_sr, req.tenantInfo.name.get());
span.addTag("tenant"_sr, req.tenantInfo.name.get());
}
span.addAttribute("key"_sr, req.key);
span.addTag("key"_sr, req.key);
// Temporarily disabled -- this path is hit a lot
// getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.first();
@ -1670,9 +1665,9 @@ ACTOR Future<Void> getValueQ(StorageServer* data, GetValueRequest req) {
// must be kept alive until the watch is finished.
extern size_t WATCH_OVERHEAD_WATCHQ, WATCH_OVERHEAD_WATCHIMPL;
ACTOR Future<Version> watchWaitForValueChange(StorageServer* data, SpanContext parent, KeyRef key) {
ACTOR Future<Version> watchWaitForValueChange(StorageServer* data, SpanID parent, KeyRef key) {
state Location spanLocation = "SS:watchWaitForValueChange"_loc;
state Span span(spanLocation, parent);
state Span span(spanLocation, { parent });
state Reference<ServerWatchMetadata> metadata = data->getWatchMetadata(key);
if (metadata->debugID.present())
@ -1779,8 +1774,8 @@ void checkCancelWatchImpl(StorageServer* data, WatchValueRequest req) {
ACTOR Future<Void> watchValueSendReply(StorageServer* data,
WatchValueRequest req,
Future<Version> resp,
SpanContext spanContext) {
state Span span("SS:watchValue"_loc, spanContext);
SpanID spanContext) {
state Span span("SS:watchValue"_loc, { spanContext });
state double startTime = now();
++data->counters.watchQueries;
++data->numWatches;
@ -2508,7 +2503,7 @@ ACTOR Future<Void> stopChangeFeedOnMove(StorageServer* data, ChangeFeedStreamReq
}
ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamRequest req, UID streamUID) {
state Span span("SS:getChangeFeedStream"_loc, req.spanContext);
state Span span("SS:getChangeFeedStream"_loc, { req.spanContext });
state bool atLatest = false;
state bool removeUID = false;
state Optional<Version> blockedVersion;
@ -2865,7 +2860,7 @@ ACTOR Future<GetKeyValuesReply> readRange(StorageServer* data,
KeyRange range,
int limit,
int* pLimitBytes,
SpanContext parentSpan,
SpanID parentSpan,
IKeyValueStore::ReadType type,
Optional<Key> tenantPrefix) {
state GetKeyValuesReply result;
@ -3104,7 +3099,7 @@ ACTOR Future<Key> findKey(StorageServer* data,
Version version,
KeyRange range,
int* pOffset,
SpanContext parentSpan,
SpanID parentSpan,
IKeyValueStore::ReadType type)
// Attempts to find the key indicated by sel in the data at version, within range.
// Precondition: selectorInRange(sel, range)
@ -3125,7 +3120,7 @@ ACTOR Future<Key> findKey(StorageServer* data,
state int sign = forward ? +1 : -1;
state bool skipEqualKey = sel.orEqual == forward;
state int distance = forward ? sel.offset : 1 - sel.offset;
state Span span("SS.findKey"_loc, parentSpan);
state Span span("SS.findKey"_loc, { parentSpan });
// Don't limit the number of bytes if this is a trivial key selector (there will be at most two items returned from
// the read range in this case)
@ -3223,16 +3218,16 @@ ACTOR Future<Void> getKeyValuesQ(StorageServer* data, GetKeyValuesRequest req)
// Throws a wrong_shard_server if the keys in the request or result depend on data outside this server OR if a large
// selector offset prevents all data from being read in one range read
{
state Span span("SS:getKeyValues"_loc, req.spanContext);
state Span span("SS:getKeyValues"_loc, { req.spanContext });
state int64_t resultSize = 0;
state IKeyValueStore::ReadType type =
req.isFetchKeys ? IKeyValueStore::ReadType::FETCH : IKeyValueStore::ReadType::NORMAL;
if (req.tenantInfo.name.present()) {
span.addAttribute("tenant"_sr, req.tenantInfo.name.get());
span.addTag("tenant"_sr, req.tenantInfo.name.get());
}
getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.traceID;
getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.first();
++data->counters.getRangeQueries;
++data->counters.allQueries;
@ -3716,16 +3711,16 @@ ACTOR Future<Void> getMappedKeyValuesQ(StorageServer* data, GetMappedKeyValuesRe
// Throws a wrong_shard_server if the keys in the request or result depend on data outside this server OR if a large
// selector offset prevents all data from being read in one range read
{
state Span span("SS:getMappedKeyValues"_loc, req.spanContext);
state Span span("SS:getMappedKeyValues"_loc, { req.spanContext });
state int64_t resultSize = 0;
state IKeyValueStore::ReadType type =
req.isFetchKeys ? IKeyValueStore::ReadType::FETCH : IKeyValueStore::ReadType::NORMAL;
if (req.tenantInfo.name.present()) {
span.addAttribute("tenant"_sr, req.tenantInfo.name.get());
span.addTag("tenant"_sr, req.tenantInfo.name.get());
}
getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.traceID;
getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.first();
++data->counters.getMappedRangeQueries;
++data->counters.allQueries;
@ -3930,13 +3925,13 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe
// Throws a wrong_shard_server if the keys in the request or result depend on data outside this server OR if a large
// selector offset prevents all data from being read in one range read
{
state Span span("SS:getKeyValuesStream"_loc, req.spanContext);
state Span span("SS:getKeyValuesStream"_loc, { req.spanContext });
state int64_t resultSize = 0;
state IKeyValueStore::ReadType type =
req.isFetchKeys ? IKeyValueStore::ReadType::FETCH : IKeyValueStore::ReadType::NORMAL;
if (req.tenantInfo.name.present()) {
span.addAttribute("tenant"_sr, req.tenantInfo.name.get());
span.addTag("tenant"_sr, req.tenantInfo.name.get());
}
req.reply.setByteLimit(SERVER_KNOBS->RANGESTREAM_LIMIT_BYTES);
@ -4134,12 +4129,12 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe
}
ACTOR Future<Void> getKeyQ(StorageServer* data, GetKeyRequest req) {
state Span span("SS:getKey"_loc, req.spanContext);
state Span span("SS:getKey"_loc, { req.spanContext });
if (req.tenantInfo.name.present()) {
span.addAttribute("tenant"_sr, req.tenantInfo.name.get());
span.addTag("tenant"_sr, req.tenantInfo.name.get());
}
state int64_t resultSize = 0;
getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.traceID;
getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.first();
++data->counters.getKeyQueries;
++data->counters.allQueries;
@ -6852,10 +6847,6 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
SpanContextMessage::isNextIn(cloneReader)) {
SpanContextMessage scm;
cloneReader >> scm;
} else if (cloneReader.protocolVersion().hasOTELSpanContext() &&
OTELSpanContextMessage::isNextIn(cloneReader)) {
OTELSpanContextMessage scm;
cloneReader >> scm;
} else {
MutationRef msg;
cloneReader >> msg;
@ -6938,7 +6929,7 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
state Version ver = invalidVersion;
cloneCursor2->setProtocolVersion(data->logProtocol);
state SpanContext spanContext = SpanContext();
state SpanID spanContext = SpanID();
state double beforeTLogMsgsUpdates = now();
state std::set<Key> updatedChangeFeeds;
for (; cloneCursor2->hasMessage(); cloneCursor2->nextMessage()) {
@ -6972,27 +6963,17 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
data->logProtocol = rd.protocolVersion();
data->storage.changeLogProtocol(ver, data->logProtocol);
cloneCursor2->setProtocolVersion(rd.protocolVersion());
spanContext.traceID = UID();
spanContext = UID();
} else if (rd.protocolVersion().hasSpanContext() && SpanContextMessage::isNextIn(rd)) {
SpanContextMessage scm;
rd >> scm;
TEST(true); // storageserveractor converting SpanContextMessage into OTEL SpanContext
spanContext =
SpanContext(UID(scm.spanContext.first(), scm.spanContext.second()),
0,
scm.spanContext.first() != 0 && scm.spanContext.second() != 0 ? TraceFlags::sampled
: TraceFlags::unsampled);
} else if (rd.protocolVersion().hasOTELSpanContext() && OTELSpanContextMessage::isNextIn(rd)) {
TEST(true); // storageserveractor reading OTELSpanContextMessage
OTELSpanContextMessage scm;
rd >> scm;
spanContext = scm.spanContext;
} else {
MutationRef msg;
rd >> msg;
Span span("SS:update"_loc, spanContext);
span.addAttribute("key"_sr, msg.param1);
Span span("SS:update"_loc, { spanContext });
span.addTag("key"_sr, msg.param1);
// Drop non-private mutations if TSS fault injection is enabled in simulation, or if this is a TSS in
// quarantine.
@ -8424,11 +8405,11 @@ ACTOR Future<Void> serveGetKeyRequests(StorageServer* self, FutureStream<GetKeyR
ACTOR Future<Void> watchValueWaitForVersion(StorageServer* self,
WatchValueRequest req,
PromiseStream<WatchValueRequest> stream) {
state Span span("SS:watchValueWaitForVersion"_loc, req.spanContext);
state Span span("SS:watchValueWaitForVersion"_loc, { req.spanContext });
if (req.tenantInfo.name.present()) {
span.addAttribute("tenant"_sr, req.tenantInfo.name.get());
span.addTag("tenant"_sr, req.tenantInfo.name.get());
}
getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.traceID;
getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.first();
try {
wait(success(waitForVersionNoTooOld(self, req.version)));
Optional<TenantMapEntry> entry = self->getTenantEntry(latestVersion, req.tenantInfo);
@ -8446,11 +8427,11 @@ ACTOR Future<Void> watchValueWaitForVersion(StorageServer* self,
ACTOR Future<Void> serveWatchValueRequestsImpl(StorageServer* self, FutureStream<WatchValueRequest> stream) {
loop {
getCurrentLineage()->modify(&TransactionLineage::txID) = UID();
getCurrentLineage()->modify(&TransactionLineage::txID) = 0;
state WatchValueRequest req = waitNext(stream);
state Reference<ServerWatchMetadata> metadata = self->getWatchMetadata(req.key.contents());
state Span span("SS:serveWatchValueRequestsImpl"_loc, req.spanContext);
getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.traceID;
state Span span("SS:serveWatchValueRequestsImpl"_loc, { req.spanContext });
getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.first();
// case 1: no watch set for the current key
if (!metadata.isValid()) {

View File

@ -22,12 +22,18 @@
#include <tuple>
#include <boost/lexical_cast.hpp>
#include "fdbclient/FDBTypes.h"
#include "fdbrpc/IAsyncFile.h"
#include "fdbrpc/Locality.h"
#include "fdbclient/GlobalConfig.actor.h"
#include "fdbclient/ProcessInterface.h"
#include "fdbclient/StorageServerInterface.h"
#include "fdbserver/Knobs.h"
#include "flow/ActorCollection.h"
#include "flow/Error.h"
#include "flow/FileIdentifier.h"
#include "flow/ObjectSerializer.h"
#include "flow/Platform.h"
#include "flow/ProtocolVersion.h"
#include "flow/SystemMonitor.h"
#include "flow/TDMetric.actor.h"
@ -57,7 +63,9 @@
#include "flow/ThreadHelper.actor.h"
#include "flow/Trace.h"
#include "flow/flow.h"
#include "flow/genericactors.actor.h"
#include "flow/network.h"
#include "flow/serialize.h"
#ifdef __linux__
#include <fcntl.h>
@ -2624,6 +2632,291 @@ ACTOR Future<Void> monitorAndWriteCCPriorityInfo(std::string filePath,
}
}
static const std::string versionFileName = "sw-version";
// Reads the software version file (`versionFileName`) inside `folder` and checks
// that `currentVersion` is at least the lowest compatible protocol version
// recorded there.
//
// Returns:
//   - a default-constructed SWVersion when no version file exists,
//   - the SWVersion read from the file when `currentVersion` is compatible.
// Throws:
//   - the open error when the file cannot be opened for any other reason,
//   - file_corrupt when the file is empty or cannot be read in full,
//   - incompatible_software_version when `currentVersion` is too old.
// Every error except actor_cancelled is logged as "OpenReadSWVersionFileError"
// before being rethrown.
ACTOR Future<SWVersion> testSoftwareVersionCompatibility(std::string folder, ProtocolVersion currentVersion) {
	try {
		state std::string versionFilePath = joinPath(folder, versionFileName);
		state ErrorOr<Reference<IAsyncFile>> versionFile = wait(
		    errorOr(IAsyncFileSystem::filesystem(g_network)->open(versionFilePath, IAsyncFile::OPEN_READONLY, 0600)));

		if (versionFile.isError()) {
			if (versionFile.getError().code() == error_code_file_not_found && !fileExists(versionFilePath)) {
				// If a version file does not exist, we assume this is either a fresh
				// installation or an upgrade from a version that does not support version files.
				// Either way, we can safely continue running this version of software.
				TraceEvent(SevInfo, "NoPreviousSWVersion").log();
				return SWVersion();
			} else {
				// Dangerous to continue if we cannot do a software compatibility test
				throw versionFile.getError();
			}
		} else {
			// Test whether the newest software version that has been run on this cluster is
			// compatible with the current software version
			state int64_t filesize = wait(versionFile.get()->size());
			state Standalone<StringRef> buf = makeString(filesize);
			int readLen = wait(versionFile.get()->read(mutateString(buf), filesize, 0));
			if (filesize == 0 || readLen != filesize) {
				throw file_corrupt();
			}

			// Errors raised here (deserialization or incompatibility) propagate directly
			// to the handler below; no intermediate catch-and-rethrow is needed.
			SWVersion swversion = ObjectReader::fromStringRef<SWVersion>(buf, IncludeVersion());
			ProtocolVersion lowestCompatibleVersion(swversion.lowestCompatibleProtocolVersion());
			if (currentVersion >= lowestCompatibleVersion) {
				return swversion;
			} else {
				throw incompatible_software_version();
			}
		}
	} catch (Error& e) {
		if (e.code() == error_code_actor_cancelled) {
			throw;
		}
		// TODO(bvr): Inject faults
		TraceEvent(SevWarnAlways, "OpenReadSWVersionFileError").error(e);
		throw;
	}
}
// Writes a new software version file (`versionFileName`) into `folder` that
// records `latestVersion`, `currentVersion` and `minCompatibleVersion`.
//
// Precondition (asserted): currentVersion >= minCompatibleVersion.
//
// The existing file is first opened read-only; any failure other than "file
// does not exist" is rethrown. The new contents are then written through a
// handle opened with OPEN_ATOMIC_WRITE_AND_CREATE and synced.
// Every error except actor_cancelled is logged as "OpenWriteSWVersionFileError"
// before being rethrown.
ACTOR Future<Void> updateNewestSoftwareVersion(std::string folder,
                                               ProtocolVersion currentVersion,
                                               ProtocolVersion latestVersion,
                                               ProtocolVersion minCompatibleVersion) {

	ASSERT(currentVersion >= minCompatibleVersion);

	try {
		state std::string versionFilePath = joinPath(folder, versionFileName);
		ErrorOr<Reference<IAsyncFile>> versionFile = wait(
		    errorOr(IAsyncFileSystem::filesystem(g_network)->open(versionFilePath, IAsyncFile::OPEN_READONLY, 0600)));

		// A missing file is expected on first run; anything else is a real failure.
		if (versionFile.isError() &&
		    (versionFile.getError().code() != error_code_file_not_found || fileExists(versionFilePath))) {
			throw versionFile.getError();
		}

		state Reference<IAsyncFile> newVersionFile = wait(IAsyncFileSystem::filesystem()->open(
		    versionFilePath,
		    IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_CREATE | IAsyncFile::OPEN_READWRITE,
		    0600));

		SWVersion swVersion(latestVersion, currentVersion, minCompatibleVersion);
		// The buffer handed to write() must stay alive until the returned future is
		// ready. Keep the serialized bytes in a state variable and pass its own
		// storage, rather than a pointer into a temporary std::string that is
		// destroyed as soon as the write has been issued.
		state Standalone<StringRef> serializedVersion = swVersionValue(swVersion);
		ErrorOr<Void> e =
		    wait(errorOr(newVersionFile->write(serializedVersion.begin(), serializedVersion.size(), 0)));
		if (e.isError()) {
			throw e.getError();
		}
		wait(newVersionFile->sync());
	} catch (Error& e) {
		if (e.code() == error_code_actor_cancelled) {
			throw;
		}
		TraceEvent(SevWarnAlways, "OpenWriteSWVersionFileError").error(e);
		throw;
	}

	return Void();
}
// Checks the software version file in `dataFolder` against
// `currentProtocolVersion`, rewrites the file when the recorded newest version
// differs from the running one, and finally re-reads the file to verify it.
// Any failure is rethrown to the caller; failed compatibility checks are also
// logged as "SWVersionCompatibilityCheckError".
ACTOR Future<Void> testAndUpdateSoftwareVersionCompatibility(std::string dataFolder, UID processIDUid) {
	// Initial compatibility check against whatever is currently on disk.
	ErrorOr<SWVersion> swVersion = wait(errorOr(testSoftwareVersionCompatibility(dataFolder, currentProtocolVersion)));
	if (swVersion.isError()) {
		TraceEvent(SevWarnAlways, "SWVersionCompatibilityCheckError", processIDUid).error(swVersion.getError());
		throw swVersion.getError();
	}

	TraceEvent(SevInfo, "SWVersionCompatible", processIDUid).detail("SWVersion", swVersion.get());

	if (!swVersion.get().isValid() ||
	    ProtocolVersion(swVersion.get().newestProtocolVersion()) < currentProtocolVersion) {
		// No usable record, or this binary is newer than anything recorded:
		// the running version becomes the newest version.
		wait(updateNewestSoftwareVersion(
		    dataFolder, currentProtocolVersion, currentProtocolVersion, minCompatibleProtocolVersion));
	} else if (ProtocolVersion(swVersion.get().newestProtocolVersion()) > currentProtocolVersion) {
		// Running an older binary: keep the recorded newest / lowest-compatible
		// versions and only update the last-run version.
		wait(updateNewestSoftwareVersion(dataFolder,
		                                 currentProtocolVersion,
		                                 ProtocolVersion(swVersion.get().newestProtocolVersion()),
		                                 ProtocolVersion(swVersion.get().lowestCompatibleProtocolVersion())));
	}

	// Read the file back to confirm it is present, readable and still compatible.
	ErrorOr<SWVersion> verifiedVersion =
	    wait(errorOr(testSoftwareVersionCompatibility(dataFolder, currentProtocolVersion)));
	if (verifiedVersion.isError()) {
		TraceEvent(SevWarnAlways, "SWVersionCompatibilityCheckError", processIDUid).error(verifiedVersion.getError());
		throw verifiedVersion.getError();
	}

	TraceEvent(SevInfo, "VerifiedNewSoftwareVersion", processIDUid).detail("SWVersion", verifiedVersion.get());

	return Void();
}
static const std::string swversionTestDirName = "sw-version-test";
// With no version file present, the compatibility check is expected to yield
// an SWVersion that is not valid.
TEST_CASE("/fdbserver/worker/swversion/noversionhistory") {
	// Use the shared directory constant so the directory created here is the
	// same one the calls below operate on.
	if (!platform::createDirectory(swversionTestDirName)) {
		TraceEvent(SevWarnAlways, "FailedToCreateDirectory").detail("Directory", swversionTestDirName);
		return Void();
	}

	ErrorOr<SWVersion> swversion = wait(errorOr(
	    testSoftwareVersionCompatibility(swversionTestDirName, ProtocolVersion::withStorageInterfaceReadiness())));

	// NOTE(review): the assertion is skipped when the check itself returns an error.
	if (!swversion.isError()) {
		ASSERT(!swversion.get().isValid());
	}

	wait(IAsyncFileSystem::filesystem()->deleteFile(joinPath(swversionTestDirName, versionFileName), true));

	return Void();
}
// Writes a version file and verifies that reading it back returns the same
// newest, last-run and lowest-compatible protocol versions.
TEST_CASE("/fdbserver/worker/swversion/writeVerifyVersion") {
	// Use the shared directory constant so the directory created here is the
	// same one the calls below operate on.
	if (!platform::createDirectory(swversionTestDirName)) {
		TraceEvent(SevWarnAlways, "FailedToCreateDirectory").detail("Directory", swversionTestDirName);
		return Void();
	}

	// NOTE(review): the result of the write is not checked.
	ErrorOr<Void> f = wait(errorOr(updateNewestSoftwareVersion(swversionTestDirName,
	                                                           ProtocolVersion::withStorageInterfaceReadiness(),
	                                                           ProtocolVersion::withStorageInterfaceReadiness(),
	                                                           ProtocolVersion::withTSS())));

	ErrorOr<SWVersion> swversion = wait(errorOr(
	    testSoftwareVersionCompatibility(swversionTestDirName, ProtocolVersion::withStorageInterfaceReadiness())));

	// NOTE(review): the assertions are skipped when the read returns an error.
	if (!swversion.isError()) {
		ASSERT(swversion.get().newestProtocolVersion() == ProtocolVersion::withStorageInterfaceReadiness().version());
		ASSERT(swversion.get().lastRunProtocolVersion() == ProtocolVersion::withStorageInterfaceReadiness().version());
		ASSERT(swversion.get().lowestCompatibleProtocolVersion() == ProtocolVersion::withTSS().version());
	}

	wait(IAsyncFileSystem::filesystem()->deleteFile(joinPath(swversionTestDirName, versionFileName), true));

	return Void();
}
// Simulates running an older but still compatible version: after the second
// write, the recorded newest version is unchanged while the last-run version
// is the older one.
TEST_CASE("/fdbserver/worker/swversion/runCompatibleOlder") {
	// Use the shared directory constant so the directory created here is the
	// same one the calls below operate on.
	if (!platform::createDirectory(swversionTestDirName)) {
		TraceEvent(SevWarnAlways, "FailedToCreateDirectory").detail("Directory", swversionTestDirName);
		return Void();
	}

	// First run: newest == last run == StorageInterfaceReadiness, lowest compatible == TSS.
	ErrorOr<Void> f = wait(errorOr(updateNewestSoftwareVersion(swversionTestDirName,
	                                                           ProtocolVersion::withStorageInterfaceReadiness(),
	                                                           ProtocolVersion::withStorageInterfaceReadiness(),
	                                                           ProtocolVersion::withTSS())));

	ErrorOr<SWVersion> swversion = wait(errorOr(
	    testSoftwareVersionCompatibility(swversionTestDirName, ProtocolVersion::withStorageInterfaceReadiness())));

	// NOTE(review): the assertions are skipped when the read returns an error.
	if (!swversion.isError()) {
		ASSERT(swversion.get().newestProtocolVersion() == ProtocolVersion::withStorageInterfaceReadiness().version());
		ASSERT(swversion.get().lastRunProtocolVersion() == ProtocolVersion::withStorageInterfaceReadiness().version());
		ASSERT(swversion.get().lowestCompatibleProtocolVersion() == ProtocolVersion::withTSS().version());

		TraceEvent(SevInfo, "UT/swversion/runCompatibleOlder").detail("SWVersion", swversion.get());
	}

	// Second run with the older TSS version as the current (last-run) version.
	ErrorOr<Void> f = wait(errorOr(updateNewestSoftwareVersion(swversionTestDirName,
	                                                           ProtocolVersion::withTSS(),
	                                                           ProtocolVersion::withStorageInterfaceReadiness(),
	                                                           ProtocolVersion::withTSS())));

	ErrorOr<SWVersion> swversion = wait(errorOr(
	    testSoftwareVersionCompatibility(swversionTestDirName, ProtocolVersion::withStorageInterfaceReadiness())));

	if (!swversion.isError()) {
		ASSERT(swversion.get().newestProtocolVersion() == ProtocolVersion::withStorageInterfaceReadiness().version());
		ASSERT(swversion.get().lastRunProtocolVersion() == ProtocolVersion::withTSS().version());
		ASSERT(swversion.get().lowestCompatibleProtocolVersion() == ProtocolVersion::withTSS().version());
	}

	wait(IAsyncFileSystem::filesystem()->deleteFile(joinPath(swversionTestDirName, versionFileName), true));

	return Void();
}
// Simulates running a version older than the recorded lowest compatible
// version: the compatibility check must fail with incompatible_software_version.
TEST_CASE("/fdbserver/worker/swversion/runIncompatibleOlder") {
	// Use the shared directory constant so the directory created here is the
	// same one the calls below operate on.
	if (!platform::createDirectory(swversionTestDirName)) {
		TraceEvent(SevWarnAlways, "FailedToCreateDirectory").detail("Directory", swversionTestDirName);
		return Void();
	}

	ErrorOr<Void> f = wait(errorOr(updateNewestSoftwareVersion(swversionTestDirName,
	                                                           ProtocolVersion::withStorageInterfaceReadiness(),
	                                                           ProtocolVersion::withStorageInterfaceReadiness(),
	                                                           ProtocolVersion::withTSS())));

	ErrorOr<SWVersion> swversion = wait(errorOr(
	    testSoftwareVersionCompatibility(swversionTestDirName, ProtocolVersion::withStorageInterfaceReadiness())));

	// NOTE(review): the assertions are skipped when the read returns an error.
	if (!swversion.isError()) {
		ASSERT(swversion.get().newestProtocolVersion() == ProtocolVersion::withStorageInterfaceReadiness().version());
		ASSERT(swversion.get().lastRunProtocolVersion() == ProtocolVersion::withStorageInterfaceReadiness().version());
		ASSERT(swversion.get().lowestCompatibleProtocolVersion() == ProtocolVersion::withTSS().version());
	}

	// Check again with the CacheRole version as the current version; this is
	// expected to be rejected as incompatible.
	ErrorOr<SWVersion> swversion =
	    wait(errorOr(testSoftwareVersionCompatibility(swversionTestDirName, ProtocolVersion::withCacheRole())));

	ASSERT(swversion.isError() && swversion.getError().code() == error_code_incompatible_software_version);

	wait(IAsyncFileSystem::filesystem()->deleteFile(joinPath(swversionTestDirName, versionFileName), true));

	return Void();
}
// Simulates an upgrade: a version file written by an older version is replaced
// by one written by a newer version, and the newer values must be read back.
TEST_CASE("/fdbserver/worker/swversion/runNewer") {
	// Use the shared directory constant so the directory created here is the
	// same one the calls below operate on.
	if (!platform::createDirectory(swversionTestDirName)) {
		TraceEvent(SevWarnAlways, "FailedToCreateDirectory").detail("Directory", swversionTestDirName);
		return Void();
	}

	// Older run: newest == last run == TSS, lowest compatible == CacheRole.
	ErrorOr<Void> f = wait(errorOr(updateNewestSoftwareVersion(swversionTestDirName,
	                                                           ProtocolVersion::withTSS(),
	                                                           ProtocolVersion::withTSS(),
	                                                           ProtocolVersion::withCacheRole())));

	ErrorOr<SWVersion> swversion = wait(errorOr(
	    testSoftwareVersionCompatibility(swversionTestDirName, ProtocolVersion::withStorageInterfaceReadiness())));

	// NOTE(review): the assertions are skipped when the read returns an error.
	if (!swversion.isError()) {
		ASSERT(swversion.get().newestProtocolVersion() == ProtocolVersion::withTSS().version());
		ASSERT(swversion.get().lastRunProtocolVersion() == ProtocolVersion::withTSS().version());
		ASSERT(swversion.get().lowestCompatibleProtocolVersion() == ProtocolVersion::withCacheRole().version());
	}

	// Newer run: newest == last run == StorageInterfaceReadiness, lowest compatible == TSS.
	ErrorOr<Void> f = wait(errorOr(updateNewestSoftwareVersion(swversionTestDirName,
	                                                           ProtocolVersion::withStorageInterfaceReadiness(),
	                                                           ProtocolVersion::withStorageInterfaceReadiness(),
	                                                           ProtocolVersion::withTSS())));

	ErrorOr<SWVersion> swversion = wait(errorOr(
	    testSoftwareVersionCompatibility(swversionTestDirName, ProtocolVersion::withStorageInterfaceReadiness())));

	if (!swversion.isError()) {
		ASSERT(swversion.get().newestProtocolVersion() == ProtocolVersion::withStorageInterfaceReadiness().version());
		ASSERT(swversion.get().lastRunProtocolVersion() == ProtocolVersion::withStorageInterfaceReadiness().version());
		ASSERT(swversion.get().lowestCompatibleProtocolVersion() == ProtocolVersion::withTSS().version());
	}

	wait(IAsyncFileSystem::filesystem()->deleteFile(joinPath(swversionTestDirName, versionFileName), true));

	return Void();
}
ACTOR Future<UID> createAndLockProcessIdFile(std::string folder) {
state UID processIDUid;
platform::createDirectory(folder);
@ -2923,6 +3216,8 @@ ACTOR Future<Void> fdbd(Reference<IClusterConnectionRecord> connRecord,
localities.set(LocalityData::keyProcessId, processIDUid.toString());
// Only one process can execute on a dataFolder from this point onwards
wait(testAndUpdateSoftwareVersionCompatibility(dataFolder, processIDUid));
std::string fitnessFilePath = joinPath(dataFolder, "fitness");
auto cc = makeReference<AsyncVar<Optional<ClusterControllerFullInterface>>>();
auto ci = makeReference<AsyncVar<Optional<ClusterInterface>>>();

View File

@ -80,8 +80,8 @@ struct TransactionWrapper : public ReferenceCounted<TransactionWrapper> {
// Gets the version vector cached in a transaction
virtual VersionVector getVersionVector() = 0;
// Gets the spanContext of a transaction
virtual SpanContext getSpanContext() = 0;
// Gets the spanID of a transaction
virtual UID getSpanID() = 0;
// Prints debugging messages for a transaction; not implemented for all transaction types
virtual void debugTransaction(UID debugId) {}
@ -161,8 +161,8 @@ struct FlowTransactionWrapper : public TransactionWrapper {
// Gets the version vector cached in a transaction
VersionVector getVersionVector() override { return transaction.getVersionVector(); }
// Gets the spanContext of a transaction
SpanContext getSpanContext() override { return transaction.getSpanContext(); }
// Gets the spanID of a transaction
UID getSpanID() override { return transaction.getSpanID(); }
// Prints debugging messages for a transaction
void debugTransaction(UID debugId) override { transaction.debugTransaction(debugId); }
@ -229,8 +229,8 @@ struct ThreadTransactionWrapper : public TransactionWrapper {
// Gets the version vector cached in a transaction
VersionVector getVersionVector() override { return transaction->getVersionVector(); }
// Gets the spanContext of a transaction
SpanContext getSpanContext() override { return transaction->getSpanContext(); }
// Gets the spanID of a transaction
UID getSpanID() override { return transaction->getSpanID(); }
void addReadConflictRange(KeyRangeRef const& keys) override { transaction->addReadConflictRange(keys); }
};

View File

@ -873,8 +873,7 @@ struct ConsistencyCheckWorkload : TestWorkload {
state Key begin = kr.begin;
state Key end = kr.end;
state int limitKeyServers = BUGGIFY ? 1 : 100;
state Span span(SpanContext(deterministicRandom()->randomUniqueID(), deterministicRandom()->randomUInt64()),
"WL:ConsistencyCheck"_loc);
state Span span(deterministicRandom()->randomUniqueID(), "WL:ConsistencyCheck"_loc);
while (begin < end) {
state Reference<CommitProxyInfo> commitProxyInfo =

View File

@ -106,9 +106,10 @@ struct CycleWorkload : TestWorkload {
state Transaction tr(cx);
if (deterministicRandom()->random01() >= self->traceParentProbability) {
state Span span("CycleClient"_loc);
//TraceEvent("CycleTracingTransaction", span.context).log();
// TraceEvent("CycleTracingTransaction", span.context).log();
TraceEvent("CycleTracingTransaction", span.context).log();
tr.setOption(FDBTransactionOptions::SPAN_PARENT,
BinaryWriter::toValue(span.context, IncludeVersion()));
BinaryWriter::toValue(span.context, Unversioned()));
}
while (true) {
try {

View File

@ -174,7 +174,7 @@ struct MiniCycleWorkload : TestWorkload {
state Transaction tr(cx);
if (deterministicRandom()->random01() >= self->traceParentProbability) {
state Span span("MiniCycleClient"_loc);
TraceEvent("MiniCycleTracingTransaction", span.context.traceID).log();
TraceEvent("MiniCycleTracingTransaction", span.context).log();
tr.setOption(FDBTransactionOptions::SPAN_PARENT,
BinaryWriter::toValue(span.context, Unversioned()));
}

View File

@ -19,8 +19,8 @@
*/
#pragma once
#include <cstdint>
#include "flow/Trace.h"
#include <cstdint>
// This version impacts both communications and the deserialization of certain database and IKeyValueStore keys.
//
@ -37,6 +37,9 @@ constexpr uint64_t currentProtocolVersionValue = 0x0FDB00B072000000LL;
// than the current version, meaning that we only support downgrades between consecutive release versions.
constexpr uint64_t minInvalidProtocolVersionValue = 0x0FDB00B074000000LL;
// The lowest protocol version that can be downgraded to.
constexpr uint64_t minCompatibleProtocolVersionValue = 0x0FDB00B071000000LL;
#define PROTOCOL_VERSION_FEATURE(v, x) \
static_assert((v & 0xF0FFFFLL) == 0 || v < 0x0FDB00B071000000LL, "Unexpected feature protocol version"); \
static_assert(v <= currentProtocolVersionValue, "Feature protocol version too large"); \
@ -59,6 +62,7 @@ public: // constants
static constexpr uint64_t objectSerializerFlag = 0x1000000000000000LL;
static constexpr uint64_t compatibleProtocolVersionMask = 0xFFFFFFFFFFFF0000LL;
static constexpr uint64_t minValidProtocolVersion = 0x0FDB00A200060001LL;
static constexpr uint64_t invalidProtocolVersion = 0x0FDB00A100000000LL;
public:
constexpr explicit ProtocolVersion(uint64_t version) : _version(version) {}
@ -74,6 +78,8 @@ public:
}
constexpr bool isValid() const { return version() >= minValidProtocolVersion; }
constexpr bool isInvalid() const { return version() == invalidProtocolVersion; }
constexpr uint64_t version() const { return _version & versionFlagMask; }
constexpr uint64_t versionWithFlags() const { return _version; }
@ -164,7 +170,7 @@ public: // introduced features
PROTOCOL_VERSION_FEATURE(0x0FDB00B071010000LL, Tenants);
PROTOCOL_VERSION_FEATURE(0x0FDB00B071010000LL, StorageInterfaceReadiness);
PROTOCOL_VERSION_FEATURE(0x0FDB00B071010000LL, ResolverPrivateMutations);
PROTOCOL_VERSION_FEATURE(0x0FDB00B072000000LL, OTELSpanContext);
PROTOCOL_VERSION_FEATURE(0x0FDB00B072000000LL, SWVersionTracking);
};
template <>
@ -176,6 +182,7 @@ struct Traceable<ProtocolVersion> : std::true_type {
constexpr ProtocolVersion currentProtocolVersion(currentProtocolVersionValue);
constexpr ProtocolVersion minInvalidProtocolVersion(minInvalidProtocolVersionValue);
constexpr ProtocolVersion minCompatibleProtocolVersion(minCompatibleProtocolVersionValue);
// This assert is intended to help prevent incrementing the leftmost digits accidentally. It will probably need to
// change when we reach version 10.
@ -194,4 +201,47 @@ static_assert(minInvalidProtocolVersion.version() >=
// The min invalid protocol version should be the smallest possible protocol version associated with a minor release
// version.
static_assert((minInvalidProtocolVersion.version() & 0xFFFFFFLL) == 0, "Unexpected min invalid protocol version");
static_assert((minInvalidProtocolVersion.version() & 0xFFFFFFLL) == 0, "Unexpected min invalid protocol version");
// Serializable record of three protocol version numbers: the newest one, the one from the
// last run, and the lowest compatible one. A default-constructed record holds zeros and is
// reported as not valid.
struct SWVersion {
	constexpr static FileIdentifier file_identifier = 13943914;

private:
	// Raw version numbers; 0 marks a field that has not been set.
	uint64_t _newestProtocolVersion = 0;
	uint64_t _lastRunProtocolVersion = 0;
	uint64_t _lowestCompatibleProtocolVersion = 0;

public:
	// Creates an empty record (all fields zero, isValid() == false).
	SWVersion() = default;

	// Stores the version() number of each ProtocolVersion argument.
	SWVersion(ProtocolVersion latestVersion, ProtocolVersion lastVersion, ProtocolVersion minCompatibleVersion)
	  : _newestProtocolVersion(latestVersion.version()), _lastRunProtocolVersion(lastVersion.version()),
	    _lowestCompatibleProtocolVersion(minCompatibleVersion.version()) {}

	// True only when none of the three stored versions is zero.
	bool isValid() const {
		const bool anyUnset =
		    _newestProtocolVersion == 0 || _lastRunProtocolVersion == 0 || _lowestCompatibleProtocolVersion == 0;
		return !anyUnset;
	}

	uint64_t newestProtocolVersion() const { return _newestProtocolVersion; }
	uint64_t lastRunProtocolVersion() const { return _lastRunProtocolVersion; }
	uint64_t lowestCompatibleProtocolVersion() const { return _lowestCompatibleProtocolVersion; }

	// Field order here defines the serialized layout.
	template <class Ar>
	void serialize(Ar& ar) {
		serializer(ar, _newestProtocolVersion, _lastRunProtocolVersion, _lowestCompatibleProtocolVersion);
	}
};
// Trace formatting for SWVersion: renders the three stored protocol versions as zero-padded,
// 16-digit uppercase hexadecimal numbers.
template <>
struct Traceable<SWVersion> : std::true_type {
	static std::string toString(const SWVersion& swVersion) {
		// uint64_t is `unsigned long` on some platforms and `unsigned long long` on others, so
		// passing it to %lX is a format/argument mismatch on the latter. Convert explicitly to
		// unsigned long long and use %llX, which is correct everywhere.
		return format("Newest: 0x%016llX, Last: 0x%016llX, MinCompatible: 0x%016llX",
		              static_cast<unsigned long long>(swVersion.newestProtocolVersion()),
		              static_cast<unsigned long long>(swVersion.lastRunProtocolVersion()),
		              static_cast<unsigned long long>(swVersion.lowestCompatibleProtocolVersion()));
	}
};

View File

@ -19,7 +19,6 @@
*/
#include "flow/Tracing.h"
#include "flow/IRandom.h"
#include "flow/UnitTest.h"
#include "flow/Knobs.h"
#include "flow/network.h"
@ -43,11 +42,28 @@ constexpr float kQueueSizeLogInterval = 5.0;
// Tracer that reports its type as DISABLED and discards every span handed to it.
struct NoopTracer : ITracer {
	TracerType type() const override { return TracerType::DISABLED; }

	// Both span flavours are intentionally ignored.
	void trace(Span const&) override {}
	void trace(OTELSpan const&) override {}
};
struct LogfileTracer : ITracer {
TracerType type() const override { return TracerType::LOG_FILE; }
void trace(Span const& span) override {
TraceEvent te(SevInfo, "TracingSpan", span.context);
te.detail("Location", span.location.name)
.detail("Begin", format("%.6f", span.begin))
.detail("End", format("%.6f", span.end));
if (span.parents.size() == 1) {
te.detail("Parent", *span.parents.begin());
} else {
for (auto parent : span.parents) {
TraceEvent(SevInfo, "TracingSpanAddParent", span.context).detail("AddParent", parent);
}
}
for (const auto& [key, value] : span.tags) {
TraceEvent(SevInfo, "TracingSpanTag", span.context).detail("Key", key).detail("Value", value);
}
}
void trace(OTELSpan const& span) override {
TraceEvent te(SevInfo, "TracingSpan", span.context.traceID);
te.detail("SpanID", span.context.spanID)
.detail("Location", span.location.name)
@ -167,6 +183,31 @@ struct UDPTracer : public ITracer {
// Serializes span fields as an array into the supplied TraceRequest
// buffer.
void serialize_span(const Span& span, TraceRequest& request) {
	// If you change the serialization format here, make sure to update the
	// fluentd filter to be able to correctly parse the updated format! See
	// the msgpack specification for more info on the bit patterns used
	// here.

	// The array normally has eight entries; the trailing parents entry is
	// omitted (and not counted) when the span has no parents.
	const bool hasParents = span.parents.size() != 0;
	const uint8_t fieldCount = hasParents ? 8 : 7;
	request.write_byte(fieldCount | 0b10010000); // write as array

	serialize_string(g_network->getLocalAddress().toString(), request); // ip:port

	serialize_value(span.context.first(), request, 0xcf); // trace id
	serialize_value(span.context.second(), request, 0xcf); // token (span id)

	serialize_value(span.begin, request, 0xcb); // start time
	serialize_value(span.end - span.begin, request, 0xcb); // duration

	serialize_string(span.location.name.toString(), request);

	serialize_map(span.tags, request);

	// Writes nothing when parents is empty, matching the reduced count above.
	serialize_vector(span.parents, request);
}
void serialize_span(const OTELSpan& span, TraceRequest& request) {
uint16_t size = 14;
request.write_byte(size | 0b10010000); // write as array
serialize_value(span.context.traceID.first(), request, 0xcf); // trace id
@ -233,6 +274,30 @@ private:
serialize_string(reinterpret_cast<const uint8_t*>(str.data()), str.size(), request);
}
// Writes the given vector of SpanIDs to the request. If the vector is
// empty, the request is not modified.
inline void serialize_vector(const SmallVectorRef<SpanID>& vec, TraceRequest& request) {
	const int count = vec.size();
	if (count == 0) {
		return;
	}

	if (count > 65535) {
		// Too many elements for the two header forms used here. Only a warning is
		// emitted; note that the elements below are still written without a header.
		TraceEvent(SevWarn, "TracingSpanSerializeVector")
		    .detail("Failed to MessagePack encode very large vector", count);
		ASSERT_WE_THINK(false);
	} else if (count > 15) {
		// 0xdc header followed by a 16-bit length, most significant byte first.
		// NOTE(review): picking bytes [1] then [0] of the int yields that order only
		// on a little-endian host.
		const uint8_t* countBytes = reinterpret_cast<const uint8_t*>(&count);
		request.write_byte(0xdc);
		request.write_byte(countBytes[1]);
		request.write_byte(countBytes[0]);
	} else {
		// Up to 15 elements: length is packed into the low bits of the header byte.
		request.write_byte(static_cast<uint8_t>(count) | 0b10010000);
	}

	// Only the second half of each parent SpanID is written.
	for (const auto& parent : vec) {
		serialize_value(parent.second(), request, 0xcf);
	}
}
// Writes the given vector of linked SpanContext's to the request. If the vector is
// empty, the request is not modified.
inline void serialize_vector(const SmallVectorRef<SpanContext>& vec, TraceRequest& request) {
@ -257,7 +322,7 @@ private:
// Writes the given vector of span events to the request. Unlike the SpanID overload,
// an empty vector still writes an (empty) array header.
inline void serialize_vector(const SmallVectorRef<SpanEventRef>& vec, TraceRequest& request) {
inline void serialize_vector(const SmallVectorRef<OTELEventRef>& vec, TraceRequest& request) {
int size = vec.size();
if (size <= 15) {
request.write_byte(static_cast<uint8_t>(size) | 0b10010000);
@ -388,6 +453,12 @@ struct FastUDPTracer : public UDPTracer {
request_.reset();
}
// Traces an OTELSpan: prepare() is given the length of the span's location name, the span is
// serialized into request_, and write() is then invoked (presumably to send the buffer — confirm).
void trace(OTELSpan const& span) override {
prepare(span.location.name.size());
serialize_span(span, request_);
write();
}
void trace(Span const& span) override {
prepare(span.location.name.size());
serialize_span(span, request_);
@ -442,6 +513,28 @@ void openTracer(TracerType type) {
ITracer::~ITracer() {}
// Move-assigns `o` into this span.
//
// If this span was already started (begin > 0) and is being recorded (non-zero second half of
// the context), it is ended and handed to the tracer before being overwritten. Afterwards `o`
// is marked as not started so its destructor will not trace it again.
Span& Span::operator=(Span&& o) {
	if (begin > 0.0 && context.second() > 0) {
		end = g_network->now();
		g_tracer->trace(*this);
	}
	arena = std::move(o.arena);
	context = o.context;
	begin = o.begin;
	end = o.end;
	location = o.location;
	parents = std::move(o.parents);
	// Take over the tags as well; otherwise this span would keep, and later emit, the tags
	// that belonged to the span it just replaced.
	tags = std::move(o.tags);
	o.begin = 0;
	return *this;
}
// On destruction, a span that was started and is being recorded is ended and handed to the tracer.
Span::~Span() {
	// Nothing to report for a span that never started or whose token is zero.
	if (!(begin > 0.0) || context.second() == 0) {
		return;
	}
	end = g_network->now();
	g_tracer->trace(*this);
}
OTELSpan& OTELSpan::operator=(OTELSpan&& o) {
if (begin > 0.0 && o.context.isSampled() > 0) {
end = g_network->now();
g_tracer->trace(*this);
@ -465,7 +558,7 @@ Span& Span::operator=(Span&& o) {
return *this;
}
Span::~Span() {
OTELSpan::~OTELSpan() {
if (begin > 0.0 && context.isSampled()) {
end = g_network->now();
g_tracer->trace(*this);
@ -474,15 +567,16 @@ Span::~Span() {
TEST_CASE("/flow/Tracing/CreateOTELSpan") {
// Sampling disabled, no parent.
Span notSampled("foo"_loc);
OTELSpan notSampled("foo"_loc);
ASSERT(!notSampled.context.isSampled());
// Force Sampling
// Span sampled("foo"_loc, []() { return 1.0; });
// ASSERT(sampled.context.isSampled());
OTELSpan sampled("foo"_loc, []() { return 1.0; });
ASSERT(sampled.context.isSampled());
// Ensure child traceID matches parent, when parent is sampled.
Span childTraceIDMatchesParent("foo"_loc, SpanContext(UID(100, 101), 200, TraceFlags::sampled));
OTELSpan childTraceIDMatchesParent(
"foo"_loc, []() { return 1.0; }, SpanContext(UID(100, 101), 200, TraceFlags::sampled));
ASSERT(childTraceIDMatchesParent.context.traceID.first() ==
childTraceIDMatchesParent.parentContext.traceID.first());
ASSERT(childTraceIDMatchesParent.context.traceID.second() ==
@ -490,20 +584,22 @@ TEST_CASE("/flow/Tracing/CreateOTELSpan") {
// When the parent isn't sampled AND it has legitimate values we should not sample a child,
// even if the child was randomly selected for sampling.
Span parentNotSampled("foo"_loc, SpanContext(UID(1, 1), 1, TraceFlags::unsampled));
OTELSpan parentNotSampled(
"foo"_loc, []() { return 1.0; }, SpanContext(UID(1, 1), 1, TraceFlags::unsampled));
ASSERT(!parentNotSampled.context.isSampled());
// When the parent isn't sampled AND it has zero values for traceID and spanID this means
// we should defer to the child as the new root of the trace as there was no actual parent.
// If the child was sampled we should send the child trace with a null parent.
// Span noParent("foo"_loc, SpanContext(UID(0, 0), 0, TraceFlags::unsampled));
// ASSERT(noParent.context.isSampled());
OTELSpan noParent(
"foo"_loc, []() { return 1.0; }, SpanContext(UID(0, 0), 0, TraceFlags::unsampled));
ASSERT(noParent.context.isSampled());
return Void();
};
TEST_CASE("/flow/Tracing/AddEvents") {
// Use helper method to add an OTELEventRef to an OTELSpan.
Span span1("span_with_event"_loc);
OTELSpan span1("span_with_event"_loc);
auto arena = span1.arena;
SmallVectorRef<KeyValueRef> attrs;
attrs.push_back(arena, KeyValueRef("foo"_sr, "bar"_sr));
@ -514,14 +610,14 @@ TEST_CASE("/flow/Tracing/AddEvents") {
ASSERT(span1.events[0].attributes.begin()->value.toString() == "bar");
// Use helper method to add an OTELEventRef with no attributes to an OTELSpan
Span span2("span_with_event"_loc);
OTELSpan span2("span_with_event"_loc);
span2.addEvent(StringRef(span2.arena, LiteralStringRef("commit_succeed")), 1234567.100);
ASSERT(span2.events[0].name.toString() == "commit_succeed");
ASSERT(span2.events[0].time == 1234567.100);
ASSERT(span2.events[0].attributes.size() == 0);
// Add fully constructed OTELEventRef to OTELSpan passed by value.
Span span3("span_with_event"_loc);
OTELSpan span3("span_with_event"_loc);
auto s3Arena = span3.arena;
SmallVectorRef<KeyValueRef> s3Attrs;
s3Attrs.push_back(s3Arena, KeyValueRef("xyz"_sr, "123"_sr));
@ -540,10 +636,7 @@ TEST_CASE("/flow/Tracing/AddEvents") {
};
TEST_CASE("/flow/Tracing/AddAttributes") {
Span span1("span_with_attrs"_loc,
SpanContext(deterministicRandom()->randomUniqueID(),
deterministicRandom()->randomUInt64(),
TraceFlags::sampled));
OTELSpan span1("span_with_attrs"_loc);
auto arena = span1.arena;
span1.addAttribute(StringRef(arena, LiteralStringRef("foo")), StringRef(arena, LiteralStringRef("bar")));
span1.addAttribute(StringRef(arena, LiteralStringRef("operation")), StringRef(arena, LiteralStringRef("grv")));
@ -551,34 +644,25 @@ TEST_CASE("/flow/Tracing/AddAttributes") {
ASSERT(span1.attributes[1] == KeyValueRef("foo"_sr, "bar"_sr));
ASSERT(span1.attributes[2] == KeyValueRef("operation"_sr, "grv"_sr));
Span span2("span_with_attrs"_loc,
SpanContext(deterministicRandom()->randomUniqueID(),
deterministicRandom()->randomUInt64(),
TraceFlags::sampled));
auto s2Arena = span2.arena;
span2.addAttribute(StringRef(s2Arena, LiteralStringRef("a")), StringRef(s2Arena, LiteralStringRef("1")))
.addAttribute(StringRef(s2Arena, LiteralStringRef("b")), LiteralStringRef("2"))
.addAttribute(StringRef(s2Arena, LiteralStringRef("c")), LiteralStringRef("3"));
OTELSpan span3("span_with_attrs"_loc);
auto s3Arena = span3.arena;
span3.addAttribute(StringRef(s3Arena, LiteralStringRef("a")), StringRef(s3Arena, LiteralStringRef("1")))
.addAttribute(StringRef(s3Arena, LiteralStringRef("b")), LiteralStringRef("2"))
.addAttribute(StringRef(s3Arena, LiteralStringRef("c")), LiteralStringRef("3"));
ASSERT_EQ(span2.attributes.size(), 4); // Includes default attribute of "address"
ASSERT(span2.attributes[1] == KeyValueRef("a"_sr, "1"_sr));
ASSERT(span2.attributes[2] == KeyValueRef("b"_sr, "2"_sr));
ASSERT(span2.attributes[3] == KeyValueRef("c"_sr, "3"_sr));
ASSERT_EQ(span3.attributes.size(), 4); // Includes default attribute of "address"
ASSERT(span3.attributes[1] == KeyValueRef("a"_sr, "1"_sr));
ASSERT(span3.attributes[2] == KeyValueRef("b"_sr, "2"_sr));
ASSERT(span3.attributes[3] == KeyValueRef("c"_sr, "3"_sr));
return Void();
};
TEST_CASE("/flow/Tracing/AddLinks") {
Span span1("span_with_links"_loc);
ASSERT(!span1.context.isSampled());
ASSERT(!span1.context.isValid());
OTELSpan span1("span_with_links"_loc);
span1.addLink(SpanContext(UID(100, 101), 200, TraceFlags::sampled));
span1.addLink(SpanContext(UID(200, 201), 300, TraceFlags::unsampled))
.addLink(SpanContext(UID(300, 301), 400, TraceFlags::sampled));
// Ensure the root span is now sampled and traceID and spanIDs are set.
ASSERT(span1.context.isSampled());
ASSERT(span1.context.isValid());
// Ensure links are present.
ASSERT(span1.links[0].traceID == UID(100, 101));
ASSERT(span1.links[0].spanID == 200);
ASSERT(span1.links[0].m_Flags == TraceFlags::sampled);
@ -589,16 +673,11 @@ TEST_CASE("/flow/Tracing/AddLinks") {
ASSERT(span1.links[2].spanID == 400);
ASSERT(span1.links[2].m_Flags == TraceFlags::sampled);
Span span2("span_with_links"_loc);
ASSERT(!span2.context.isSampled());
ASSERT(!span2.context.isValid());
OTELSpan span2("span_with_links"_loc);
auto link1 = SpanContext(UID(1, 1), 1, TraceFlags::sampled);
auto link2 = SpanContext(UID(2, 2), 2, TraceFlags::sampled);
auto link3 = SpanContext(UID(3, 3), 3, TraceFlags::sampled);
span2.addLinks({ link1, link2 }).addLinks({ link3 });
// Ensure the root span is now sampled and traceID and spanIDs are set.
ASSERT(span2.context.isSampled());
ASSERT(span2.context.isValid());
ASSERT(span2.links[0].traceID == UID(1, 1));
ASSERT(span2.links[0].spanID == 1);
ASSERT(span2.links[0].m_Flags == TraceFlags::sampled);
@ -662,7 +741,7 @@ std::string readMPString(uint8_t* index) {
// Windows doesn't like lack of header and declaration of constructor for FastUDPTracer
#ifndef WIN32
TEST_CASE("/flow/Tracing/FastUDPMessagePackEncoding") {
Span span1("encoded_span"_loc);
OTELSpan span1("encoded_span"_loc);
auto request = TraceRequest{ .buffer = std::make_unique<uint8_t[]>(kTraceBufferSize),
.data_size = 0,
.buffer_size = kTraceBufferSize };
@ -674,9 +753,9 @@ TEST_CASE("/flow/Tracing/FastUDPMessagePackEncoding") {
// Test - constructor OTELSpan(const Location& location, const SpanContext parent, const SpanContext& link)
// Will delegate to other constructors.
Span span2("encoded_span"_loc,
SpanContext(UID(100, 101), 1, TraceFlags::sampled),
{ SpanContext(UID(200, 201), 2, TraceFlags::sampled) });
OTELSpan span2("encoded_span"_loc,
SpanContext(UID(100, 101), 1, TraceFlags::sampled),
SpanContext(UID(200, 201), 2, TraceFlags::sampled));
tracer.serialize_span(span2, request);
data = request.buffer.get();
ASSERT(data[0] == 0b10011110); // 14 element array.
@ -722,7 +801,7 @@ TEST_CASE("/flow/Tracing/FastUDPMessagePackEncoding") {
request.reset();
// Exercise all fluent interfaces, include links, events, and attributes.
Span span3("encoded_span_3"_loc, SpanContext());
OTELSpan span3("encoded_span_3"_loc);
auto s3Arena = span3.arena;
SmallVectorRef<KeyValueRef> attrs;
attrs.push_back(s3Arena, KeyValueRef("foo"_sr, "bar"_sr));
@ -791,7 +870,7 @@ TEST_CASE("/flow/Tracing/FastUDPMessagePackEncoding") {
"SGKKUrpIb/7zePhBDi+gzUzyAcbQ2zUbFWI1KNi3zQk58uUG6wWJZkw+GCs7Cc3V"
"OUxOljwCJkC4QTgdsbbFhxUC+rtoHV5xAqoTQwR0FXnWigUjP7NtdL6huJUr3qRv"
"40c4yUI1a4+P5vJa";
Span span4;
auto span4 = OTELSpan();
auto location = Location();
location.name = StringRef(span4.arena, longString);
span4.location = location;

View File

@ -33,43 +33,90 @@ inline Location operator"" _loc(const char* str, size_t size) {
return Location{ StringRef(reinterpret_cast<const uint8_t*>(str), size) };
}
enum class TraceFlags : uint8_t { unsampled = 0b00000000, sampled = 0b00000001 };

// Bitwise AND of two TraceFlags values, computed on the enum's underlying integer type.
inline TraceFlags operator&(TraceFlags lhs, TraceFlags rhs) {
	using Bits = std::underlying_type_t<TraceFlags>;
	const Bits lhsBits = static_cast<Bits>(lhs);
	const Bits rhsBits = static_cast<Bits>(rhs);
	return static_cast<TraceFlags>(lhsBits & rhsBits);
}
struct SpanContext {
UID traceID;
uint64_t spanID;
TraceFlags m_Flags;
SpanContext() : traceID(UID()), spanID(0), m_Flags(TraceFlags::unsampled) {}
SpanContext(UID traceID, uint64_t spanID, TraceFlags flags) : traceID(traceID), spanID(spanID), m_Flags(flags) {}
SpanContext(UID traceID, uint64_t spanID) : traceID(traceID), spanID(spanID), m_Flags(TraceFlags::unsampled) {}
SpanContext(Arena arena, const SpanContext& span)
: traceID(span.traceID), spanID(span.spanID), m_Flags(span.m_Flags) {}
bool isSampled() const { return (m_Flags & TraceFlags::sampled) == TraceFlags::sampled; }
std::string toString() const { return format("%016llx%016llx%016llx", traceID.first(), traceID.second(), spanID); };
bool isValid() const { return traceID.first() != 0 && traceID.second() != 0 && spanID != 0; }
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, traceID, spanID, m_Flags);
struct Span {
Span(SpanID context, Location location, std::initializer_list<SpanID> const& parents = {})
: context(context), begin(g_network->now()), location(location), parents(arena, parents.begin(), parents.end()) {
if (parents.size() > 0) {
// If the parents' token is 0 (meaning the trace should not be
// recorded), set the child token to 0 as well. Otherwise, generate
// a new, random token.
uint64_t traceId = 0;
if ((*parents.begin()).second() > 0) {
traceId = deterministicRandom()->randomUInt64();
}
this->context = SpanID((*parents.begin()).first(), traceId);
}
}
Span(Location location, std::initializer_list<SpanID> const& parents = {})
: Span(UID(deterministicRandom()->randomUInt64(),
deterministicRandom()->random01() < FLOW_KNOBS->TRACING_SAMPLE_RATE
? deterministicRandom()->randomUInt64()
: 0),
location,
parents) {}
Span(Location location, SpanID context) : Span(location, { context }) {}
Span(const Span&) = delete;
Span(Span&& o) {
arena = std::move(o.arena);
context = o.context;
begin = o.begin;
end = o.end;
location = o.location;
parents = std::move(o.parents);
o.context = UID();
o.begin = 0.0;
o.end = 0.0;
}
Span() {}
~Span();
Span& operator=(Span&& o);
Span& operator=(const Span&) = delete;
void swap(Span& other) {
std::swap(arena, other.arena);
std::swap(context, other.context);
std::swap(begin, other.begin);
std::swap(end, other.end);
std::swap(location, other.location);
std::swap(parents, other.parents);
}
void addParent(SpanID span) {
if (parents.size() == 0) {
uint64_t traceId = 0;
if (span.second() > 0) {
traceId = context.second() == 0 ? deterministicRandom()->randomUInt64() : context.second();
}
// Use first parent to set trace ID. This is non-ideal for spans
// with multiple parents, because the trace ID will associate the
// span with only one trace. A workaround is to look at the parent
// relationships instead of the trace ID. Another option in the
// future is to keep a list of trace IDs.
context = SpanID(span.first(), traceId);
}
parents.push_back(arena, span);
}
void addTag(const StringRef& key, const StringRef& value) { tags[key] = value; }
Arena arena;
UID context = UID();
double begin = 0.0, end = 0.0;
Location location;
SmallVectorRef<SpanID> parents;
std::unordered_map<StringRef, StringRef> tags;
};
// Span
// OTELSpan
//
// Span is a tracing implementation which, for the most part, complies with the W3C Trace Context specification
// OTELSpan is a tracing implementation which, for the most part, complies with the W3C Trace Context specification
// https://www.w3.org/TR/trace-context/ and the OpenTelemetry API
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/api.md.
//
// The major differences between Span and the 7.0 Span implementation, which is based off the OpenTracing.io
// The major differences between OTELSpan and the current Span implementation, which is based off the OpenTracing.io
// specification https://opentracing.io/ are as follows.
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/api.md#span
//
// OpenTelemetry Spans have...
// OTELSpans have...
// 1. A SpanContext which consists of 3 attributes.
//
// TraceId - A valid trace identifier is a 16-byte array with at least one non-zero byte.
@ -99,61 +146,82 @@ enum class SpanKind : uint8_t { INTERNAL = 0, CLIENT = 1, SERVER = 2, PRODUCER =
enum class SpanStatus : uint8_t { UNSET = 0, OK = 1, ERR = 2 };
struct SpanEventRef {
SpanEventRef() {}
SpanEventRef(const StringRef& name,
struct OTELEventRef {
OTELEventRef() {}
OTELEventRef(const StringRef& name,
const double& time,
const SmallVectorRef<KeyValueRef>& attributes = SmallVectorRef<KeyValueRef>())
: name(name), time(time), attributes(attributes) {}
SpanEventRef(Arena& arena, const SpanEventRef& other)
OTELEventRef(Arena& arena, const OTELEventRef& other)
: name(arena, other.name), time(other.time), attributes(arena, other.attributes) {}
StringRef name;
double time = 0.0;
SmallVectorRef<KeyValueRef> attributes;
};
class Span {
class OTELSpan {
public:
// Construct a Span with a given context, location, parentContext and optional links.
//
// N.B. While this constructor receives a parentContext it does not overwrite the traceId of the Span's context.
// Therefore it is the responsibility of the caller to ensure the traceID and m_Flags of both the context and
// parentContext are identical if the caller wishes to establish a parent/child relationship between these spans. We
// do this to avoid needless comparisons or copies as this constructor is only called once in NativeAPI.actor.cpp
// and from below by the Span(location, parent, links) constructor. The Span(location, parent, links)
// constructor is used broadly and performs the copy of the parent's traceID and m_Flags.
Span(const SpanContext& context,
const Location& location,
const SpanContext& parentContext,
const std::initializer_list<SpanContext>& links = {})
OTELSpan(const SpanContext& context,
const Location& location,
const SpanContext& parentContext,
const std::initializer_list<SpanContext>& links = {})
: context(context), location(location), parentContext(parentContext), links(arena, links.begin(), links.end()),
begin(g_network->now()) {
// We've simplified the logic here, essentially we're now always setting trace and span ids and relying on the
// TraceFlags to determine if we're sampling. Therefore if the parent is sampled, we simply overwrite this
// span's traceID with the parent trace id.
if (parentContext.isSampled()) {
this->context.traceID = UID(parentContext.traceID.first(), parentContext.traceID.second());
this->context.m_Flags = TraceFlags::sampled;
} else {
// However there are two other cases.
// 1. A legitimate parent span exists but it was not selected for tracing.
// 2. There is no actual parent, just a default arg parent provided by the constructor AND the "child" span
// was selected for sampling. For case 1. we handle below by marking the child as unsampled. For case 2 we
// needn't do anything, and can rely on the values in this OTELSpan
if (parentContext.traceID.first() != 0 && parentContext.traceID.second() != 0 &&
parentContext.spanID != 0) {
this->context.m_Flags = TraceFlags::unsampled;
}
}
this->kind = SpanKind::SERVER;
this->status = SpanStatus::OK;
this->attributes.push_back(
this->arena, KeyValueRef("address"_sr, StringRef(this->arena, g_network->getLocalAddress().toString())));
}
// Construct Span with a location, parent, and optional links.
// This constructor copies the parent's traceID creating a parent->child relationship between Spans.
// Additionally we inherit the m_Flags of the parent, thus enabling or disabling sampling to match the parent.
Span(const Location& location, const SpanContext& parent, const std::initializer_list<SpanContext>& links = {})
: Span(SpanContext(parent.traceID, deterministicRandom()->randomUInt64(), parent.m_Flags),
location,
parent,
links) {}
OTELSpan(const Location& location,
const SpanContext& parent = SpanContext(),
const std::initializer_list<SpanContext>& links = {})
: OTELSpan(
SpanContext(UID(deterministicRandom()->randomUInt64(), deterministicRandom()->randomUInt64()), // traceID
deterministicRandom()->randomUInt64(), // spanID
deterministicRandom()->random01() < FLOW_KNOBS->TRACING_SAMPLE_RATE // sampled or unsampled
? TraceFlags::sampled
: TraceFlags::unsampled),
location,
parent,
links) {}
// Construct Span without parent. Used for creating a root span, or when the parent is not known at construction
// time.
Span(const SpanContext& context, const Location& location) : Span(context, location, SpanContext()) {}
OTELSpan(const Location& location, const SpanContext parent, const SpanContext& link)
: OTELSpan(location, parent, { link }) {}
// We've determined for initial tracing release, spans with only a location will not be traced.
// Generally these are for background processes, some are called infrequently, while others may be high volume.
// TODO: review and address in subsequent PRs.
Span(const Location& location) : location(location), begin(g_network->now()) {}
// NOTE: This constructor is primarily for unit testing until we sort out how to enable/disable a Knob dynamically in
// a test.
OTELSpan(const Location& location,
const std::function<double()>& rateProvider,
const SpanContext& parent = SpanContext(),
const std::initializer_list<SpanContext>& links = {})
: OTELSpan(SpanContext(UID(deterministicRandom()->randomUInt64(), deterministicRandom()->randomUInt64()),
deterministicRandom()->randomUInt64(),
deterministicRandom()->random01() < rateProvider() ? TraceFlags::sampled
: TraceFlags::unsampled),
location,
parent,
links) {}
Span(const Span&) = delete;
Span(Span&& o) {
OTELSpan(const OTELSpan&) = delete;
OTELSpan(OTELSpan&& o) {
arena = std::move(o.arena);
context = o.context;
location = o.location;
@ -171,11 +239,11 @@ public:
o.end = 0.0;
o.status = SpanStatus::UNSET;
}
Span() {}
~Span();
Span& operator=(Span&& o);
Span& operator=(const Span&) = delete;
void swap(Span& other) {
OTELSpan() {}
~OTELSpan();
OTELSpan& operator=(OTELSpan&& o);
OTELSpan& operator=(const OTELSpan&) = delete;
void swap(OTELSpan& other) {
std::swap(arena, other.arena);
std::swap(context, other.context);
std::swap(location, other.location);
@ -188,62 +256,34 @@ public:
std::swap(events, other.events);
}
Span& addLink(const SpanContext& linkContext) {
OTELSpan& addLink(const SpanContext& linkContext) {
links.push_back(arena, linkContext);
// Check if link is sampled, if so sample this span.
if (!context.isSampled() && linkContext.isSampled()) {
context.m_Flags = TraceFlags::sampled;
// If for some reason this span isn't valid, we need to give it a
// traceID and spanID. This case is currently hit in CommitProxyServer
// CommitBatchContext::CommitBatchContext and CommitBatchContext::setupTraceBatch.
if (!context.isValid()) {
context.traceID = deterministicRandom()->randomUniqueID();
context.spanID = deterministicRandom()->randomUInt64();
}
}
return *this;
}
Span& addLinks(const std::initializer_list<SpanContext>& linkContexts = {}) {
OTELSpan& addLinks(const std::initializer_list<SpanContext>& linkContexts = {}) {
for (auto const& sc : linkContexts) {
addLink(sc);
links.push_back(arena, sc);
}
return *this;
}
Span& addEvent(const SpanEventRef& event) {
OTELSpan& addEvent(const OTELEventRef& event) {
events.push_back_deep(arena, event);
return *this;
}
Span& addEvent(const StringRef& name,
const double& time,
const SmallVectorRef<KeyValueRef>& attrs = SmallVectorRef<KeyValueRef>()) {
return addEvent(SpanEventRef(name, time, attrs));
OTELSpan& addEvent(const StringRef& name,
const double& time,
const SmallVectorRef<KeyValueRef>& attrs = SmallVectorRef<KeyValueRef>()) {
return addEvent(OTELEventRef(name, time, attrs));
}
Span& addAttribute(const StringRef& key, const StringRef& value) {
OTELSpan& addAttribute(const StringRef& key, const StringRef& value) {
attributes.push_back_deep(arena, KeyValueRef(key, value));
return *this;
}
Span& setParent(const SpanContext& parent) {
parentContext = parent;
context.traceID = parent.traceID;
context.spanID = deterministicRandom()->randomUInt64();
context.m_Flags = parent.m_Flags;
return *this;
}
Span& addParentOrLink(const SpanContext& other) {
if (!parentContext.isValid()) {
parentContext = other;
} else {
links.push_back(arena, other);
}
return *this;
}
Arena arena;
SpanContext context;
Location location;
@ -252,7 +292,7 @@ public:
SmallVectorRef<SpanContext> links;
double begin = 0.0, end = 0.0;
SmallVectorRef<KeyValueRef> attributes; // not necessarily sorted
SmallVectorRef<SpanEventRef> events;
SmallVectorRef<OTELEventRef> events;
SpanStatus status;
};
@ -271,6 +311,7 @@ struct ITracer {
virtual TracerType type() const = 0;
// passed ownership to the tracer
virtual void trace(Span const& span) = 0;
virtual void trace(OTELSpan const& span) = 0;
};
void openTracer(TracerType type);
@ -287,3 +328,16 @@ struct SpannedDeque : Deque<T> {
span = std::move(other.span);
}
};
// A Deque<T> that carries an OTELSpan alongside its elements. Move-only: moving the deque
// moves the span with it.
template <class T>
struct OTELSpannedDeque : Deque<T> {
	OTELSpan span;
	explicit OTELSpannedDeque(Location loc) : span(loc) {}
	OTELSpannedDeque(OTELSpannedDeque&& other) : Deque<T>(std::move(other)), span(std::move(other.span)) {}
	OTELSpannedDeque(OTELSpannedDeque const&) = delete;
	OTELSpannedDeque& operator=(OTELSpannedDeque const&) = delete;
	OTELSpannedDeque& operator=(OTELSpannedDeque&& other) {
		// Move the Deque<T> base first, then the span that goes with it.
		*static_cast<Deque<T>*>(this) = std::move(other);
		span = std::move(other.span);
		// The operator is declared to return a reference; flowing off the end without
		// returning is undefined behavior.
		return *this;
	}
};

View File

@ -123,6 +123,7 @@ ERROR( failed_to_progress, 1216, "Process has failed to make sufficient progress
ERROR( invalid_cluster_id, 1217, "Attempted to join cluster with a different cluster ID" )
ERROR( restart_cluster_controller, 1218, "Restart cluster controller process" )
ERROR( please_reboot_remote_kv_store, 1219, "Need to reboot the storage engine process as it died abnormally")
ERROR( incompatible_software_version, 1220, "Current software does not support database format" )
// 15xx Platform errors
ERROR( platform_error, 1500, "Platform error" )

View File

@ -148,24 +148,31 @@ class UpgradeTest:
local_file = self.binary_path(version, target_bin_name)
if (local_file.exists()):
return
# Download to a temporary file and then replace the target file atomically
# to avoid consistency errors in case of multiple tests are downloading the
# same file in parallel
local_file_tmp = Path("{}.{}".format(
str(local_file), random_secret_string(8)))
self.download_dir.joinpath(version).mkdir(
parents=True, exist_ok=True)
remote_file = "{}{}/{}".format(FDB_DOWNLOAD_ROOT,
version, remote_bin_name)
remote_sha256 = "{}.sha256".format(remote_file)
local_sha256 = Path("{}.sha256".format(local_file))
local_sha256 = Path("{}.sha256".format(local_file_tmp))
for attempt_cnt in range(MAX_DOWNLOAD_ATTEMPTS):
print("Downloading '{}' to '{}'...".format(remote_file, local_file))
request.urlretrieve(remote_file, local_file)
print("Downloading '{}' to '{}'...".format(
remote_file, local_file_tmp))
request.urlretrieve(remote_file, local_file_tmp)
print("Downloading '{}' to '{}'...".format(
remote_sha256, local_sha256))
request.urlretrieve(remote_sha256, local_sha256)
print("Download complete")
assert local_file.exists(), "{} does not exist".format(local_file)
assert local_file_tmp.exists(), "{} does not exist".format(local_file_tmp)
assert local_sha256.exists(), "{} does not exist".format(local_sha256)
expected_checksum = read_to_str(local_sha256)
actual_checkum = compute_sha256(local_file)
actual_checkum = compute_sha256(local_file_tmp)
if (expected_checksum == actual_checkum):
print("Checksum OK")
break
@ -173,7 +180,10 @@ class UpgradeTest:
expected_checksum, actual_checkum))
if attempt_cnt == MAX_DOWNLOAD_ATTEMPTS-1:
assert False, "Failed to download {} after {} attempts".format(
local_file, MAX_DOWNLOAD_ATTEMPTS)
local_file_tmp, MAX_DOWNLOAD_ATTEMPTS)
os.rename(local_file_tmp, local_file)
os.remove(local_sha256)
if makeExecutable:
make_executable(local_file)