diff --git a/bindings/c/test/unit/unit_tests.cpp b/bindings/c/test/unit/unit_tests.cpp index 5ec1c6cec2..4f258eac41 100644 --- a/bindings/c/test/unit/unit_tests.cpp +++ b/bindings/c/test/unit/unit_tests.cpp @@ -44,8 +44,6 @@ #include "fdbclient/Tuple.h" #include "flow/config.h" -#include "flow/DeterministicRandom.h" -#include "flow/IRandom.h" #include "fdb_api.hpp" @@ -2023,17 +2021,15 @@ TEST_CASE("fdb_transaction_add_conflict_range") { TEST_CASE("special-key-space valid transaction ID") { auto value = get_value("\xff\xff/tracing/transaction_id", /* snapshot */ false, {}); REQUIRE(value.has_value()); - UID transaction_id = UID::fromString(value.value()); - CHECK(transaction_id.first() > 0); - CHECK(transaction_id.second() > 0); + uint64_t transaction_id = std::stoul(value.value()); + CHECK(transaction_id > 0); } TEST_CASE("special-key-space custom transaction ID") { fdb::Transaction tr(db); fdb_check(tr.set_option(FDB_TR_OPTION_SPECIAL_KEY_SPACE_ENABLE_WRITES, nullptr, 0)); while (1) { - UID randomTransactionID = UID(deterministicRandom()->randomUInt64(), deterministicRandom()->randomUInt64()); - tr.set("\xff\xff/tracing/transaction_id", randomTransactionID.toString()); + tr.set("\xff\xff/tracing/transaction_id", std::to_string(ULONG_MAX)); fdb::ValueFuture f1 = tr.get("\xff\xff/tracing/transaction_id", /* snapshot */ false); @@ -2050,8 +2046,8 @@ TEST_CASE("special-key-space custom transaction ID") { fdb_check(f1.get(&out_present, (const uint8_t**)&val, &vallen)); REQUIRE(out_present); - UID transaction_id = UID::fromString(val); - CHECK(transaction_id == randomTransactionID); + uint64_t transaction_id = std::stoul(std::string(val, vallen)); + CHECK(transaction_id == ULONG_MAX); break; } } @@ -2078,9 +2074,8 @@ TEST_CASE("special-key-space set transaction ID after write") { fdb_check(f1.get(&out_present, (const uint8_t**)&val, &vallen)); REQUIRE(out_present); - UID transaction_id = UID::fromString(val); - CHECK(transaction_id.first() > 0); - CHECK(transaction_id.second() > 0); + uint64_t transaction_id = std::stoul(std::string(val, vallen)); + CHECK(transaction_id != 0); break; } } @@ -2145,9 +2140,7 @@ TEST_CASE("special-key-space tracing get range") { CHECK(out_count == 2); CHECK(std::string((char*)out_kv[1].key, out_kv[1].key_length) == tracingBegin + "transaction_id"); - UID transaction_id = UID::fromString(std::string((char*)out_kv[1].value)); - CHECK(transaction_id.first() > 0); - CHECK(transaction_id.second() > 0); + CHECK(std::stoul(std::string((char*)out_kv[1].value, out_kv[1].value_length)) > 0); break; } } diff --git a/fdbclient/CommitProxyInterface.h b/fdbclient/CommitProxyInterface.h index 149e77521d..8d068926eb 100644 --- a/fdbclient/CommitProxyInterface.h +++ b/fdbclient/CommitProxyInterface.h @@ -162,7 +162,7 @@ struct CommitTransactionRequest : TimedRequest { bool firstInBatch() const { return (flags & FLAG_FIRST_IN_BATCH) != 0; } Arena arena; - SpanContext spanContext; + SpanID spanContext; CommitTransactionRef transaction; ReplyPromise reply; uint32_t flags; @@ -172,8 +172,8 @@ struct CommitTransactionRequest : TimedRequest { TenantInfo tenantInfo; - CommitTransactionRequest() : CommitTransactionRequest(SpanContext()) {} - CommitTransactionRequest(SpanContext const& context) : spanContext(context), flags(0) {} + CommitTransactionRequest() : CommitTransactionRequest(SpanID()) {} + CommitTransactionRequest(SpanID const& context) : spanContext(context), flags(0) {} template void serialize(Ar& ar) { @@ -242,7 +242,7 @@ struct GetReadVersionRequest : TimedRequest { FLAG_PRIORITY_MASK = PRIORITY_SYSTEM_IMMEDIATE, }; - SpanContext spanContext; + SpanID spanContext; uint32_t transactionCount; uint32_t flags; TransactionPriority priority; @@ -255,7 +255,7 @@ struct GetReadVersionRequest : TimedRequest { Version maxVersion; // max version in the client's version vector cache GetReadVersionRequest() : transactionCount(1), flags(0), maxVersion(invalidVersion) {} - GetReadVersionRequest(SpanContext spanContext, + GetReadVersionRequest(SpanID spanContext, uint32_t transactionCount, TransactionPriority priority, Version maxVersion, @@ -325,7 +325,7 @@ struct GetKeyServerLocationsReply { struct GetKeyServerLocationsRequest { constexpr static FileIdentifier file_identifier = 9144680; Arena arena; - SpanContext spanContext; + SpanID spanContext; Optional tenant; KeyRef begin; Optional end; @@ -340,7 +340,7 @@ struct GetKeyServerLocationsRequest { Version minTenantVersion; GetKeyServerLocationsRequest() : limit(0), reverse(false), minTenantVersion(latestVersion) {} - GetKeyServerLocationsRequest(SpanContext spanContext, + GetKeyServerLocationsRequest(SpanID spanContext, Optional const& tenant, KeyRef const& begin, Optional const& end, @@ -378,12 +378,12 @@ struct GetRawCommittedVersionReply { struct GetRawCommittedVersionRequest { constexpr static FileIdentifier file_identifier = 12954034; - SpanContext spanContext; + SpanID spanContext; Optional debugID; ReplyPromise reply; Version maxVersion; // max version in the grv proxy's version vector cache - explicit GetRawCommittedVersionRequest(SpanContext spanContext, + explicit GetRawCommittedVersionRequest(SpanID spanContext, Optional const& debugID = Optional(), Version maxVersion = invalidVersion) : spanContext(spanContext), debugID(debugID), maxVersion(maxVersion) {} diff --git a/fdbclient/CommitTransaction.h b/fdbclient/CommitTransaction.h index 91bccaf7ba..53c87c43bd 100644 --- a/fdbclient/CommitTransaction.h +++ b/fdbclient/CommitTransaction.h @@ -24,7 +24,6 @@ #include "fdbclient/FDBTypes.h" #include "fdbclient/Knobs.h" -#include "flow/Tracing.h" // The versioned message has wire format : -1, version, messages static const int32_t VERSION_HEADER = -1; @@ -78,7 +77,6 @@ struct MutationRef { AndV2, CompareAndClear, Reserved_For_SpanContextMessage /* See fdbserver/SpanContextMessage.h */, - Reserved_For_OTELSpanContextMessage, MAX_ATOMIC_OP }; // This is stored this way for serialization purposes. @@ -192,7 +190,7 @@ struct CommitTransactionRef { Version read_snapshot = 0; bool report_conflicting_keys = false; bool lock_aware = false; // set when metadata mutations are present - Optional spanContext; + Optional spanContext; template force_inline void serialize(Ar& ar) { diff --git a/fdbclient/DatabaseContext.h b/fdbclient/DatabaseContext.h index 11f5b1beb7..532bc1a096 100644 --- a/fdbclient/DatabaseContext.h +++ b/fdbclient/DatabaseContext.h @@ -141,7 +141,7 @@ struct WatchParameters : public ReferenceCounted { const Version version; const TagSet tags; - const SpanContext spanContext; + const SpanID spanID; const TaskPriority taskID; const Optional debugID; const UseProvisionalProxies useProvisionalProxies; @@ -151,11 +151,11 @@ struct WatchParameters : public ReferenceCounted { Optional value, Version version, TagSet tags, - SpanContext spanContext, + SpanID spanID, TaskPriority taskID, Optional debugID, UseProvisionalProxies useProvisionalProxies) - : tenant(tenant), key(key), value(value), version(version), tags(tags), spanContext(spanContext), taskID(taskID), + : tenant(tenant), key(key), value(value), version(version), tags(tags), spanID(spanID), taskID(taskID), debugID(debugID), useProvisionalProxies(useProvisionalProxies) {} }; @@ -416,12 +416,12 @@ public: Optional defaultTenant; struct VersionRequest { - SpanContext spanContext; + SpanID spanContext; Promise reply; TagSet tags; Optional debugID; - VersionRequest(SpanContext spanContext, TagSet tags = TagSet(), Optional debugID = Optional()) + VersionRequest(SpanID spanContext, TagSet tags = TagSet(), Optional debugID = Optional()) : spanContext(spanContext), tags(tags), debugID(debugID) {} }; diff --git a/fdbclient/FDBTypes.h b/fdbclient/FDBTypes.h index 9f89237b51..9683c7e27f 100644 --- a/fdbclient/FDBTypes.h +++ b/fdbclient/FDBTypes.h @@ -29,10 +29,30 @@ #include #include +#include "flow/Arena.h" #include "flow/FastRef.h" #include "flow/ProtocolVersion.h" #include "flow/flow.h" +enum class TraceFlags : uint8_t { unsampled = 0b00000000, sampled = 0b00000001 }; + +inline TraceFlags operator&(TraceFlags lhs, TraceFlags rhs) { + return static_cast(static_cast>(lhs) & + static_cast>(rhs)); +} + +struct SpanContext { + UID traceID; + uint64_t spanID; + TraceFlags m_Flags; + SpanContext() : traceID(UID()), spanID(0), m_Flags(TraceFlags::unsampled) {} + SpanContext(UID traceID, uint64_t spanID, TraceFlags flags) : traceID(traceID), spanID(spanID), m_Flags(flags) {} + SpanContext(UID traceID, uint64_t spanID) : traceID(traceID), spanID(spanID), m_Flags(TraceFlags::unsampled) {} + SpanContext(Arena arena, const SpanContext& span) + : traceID(span.traceID), spanID(span.spanID), m_Flags(span.m_Flags) {} + bool isSampled() const { return (m_Flags & TraceFlags::sampled) == TraceFlags::sampled; } +}; + typedef int64_t Version; typedef uint64_t LogEpoch; typedef uint64_t Sequence; diff --git a/fdbclient/IClientApi.h b/fdbclient/IClientApi.h index e1861432a1..91ef38eeae 100644 --- a/fdbclient/IClientApi.h +++ b/fdbclient/IClientApi.h @@ -27,7 +27,6 @@ #include "fdbclient/FDBTypes.h" #include "fdbclient/Tenant.h" -#include "flow/Tracing.h" #include "flow/ThreadHelper.actor.h" struct VersionVector; @@ -97,11 +96,11 @@ public: virtual ThreadFuture commit() = 0; virtual Version getCommittedVersion() = 0; - // @todo This API and the "getSpanContext()" API may help with debugging simulation + // @todo This API and the "getSpanID()" API may help with debugging simulation // test failures. (These APIs are not currently invoked anywhere.) Remove them // later if they are not really needed. virtual VersionVector getVersionVector() = 0; - virtual SpanContext getSpanContext() = 0; + virtual UID getSpanID() = 0; virtual ThreadFuture getApproximateSize() = 0; virtual void setOption(FDBTransactionOptions::Option option, Optional value = Optional()) = 0; diff --git a/fdbclient/IConfigTransaction.h b/fdbclient/IConfigTransaction.h index 8f21679e27..63e058ee4c 100644 --- a/fdbclient/IConfigTransaction.h +++ b/fdbclient/IConfigTransaction.h @@ -45,7 +45,7 @@ public: // Not implemented: void setVersion(Version) override { throw client_invalid_operation(); } VersionVector getVersionVector() const override { throw client_invalid_operation(); } - SpanContext getSpanContext() const override { throw client_invalid_operation(); } + UID getSpanID() const override { throw client_invalid_operation(); } Future getKey(KeySelector const& key, Snapshot snapshot = Snapshot::False) override { throw client_invalid_operation(); } diff --git a/fdbclient/ISingleThreadTransaction.h b/fdbclient/ISingleThreadTransaction.h index 19beb4e5df..bb5a4913f1 100644 --- a/fdbclient/ISingleThreadTransaction.h +++ b/fdbclient/ISingleThreadTransaction.h @@ -95,7 +95,7 @@ public: virtual Future commit() = 0; virtual Version getCommittedVersion() const = 0; virtual VersionVector getVersionVector() const = 0; - virtual SpanContext getSpanContext() const = 0; + virtual UID getSpanID() const = 0; virtual int64_t getApproximateSize() const = 0; virtual Future> getVersionstamp() = 0; virtual void setOption(FDBTransactionOptions::Option option, Optional value = Optional()) = 0; diff --git a/fdbclient/MultiVersionTransaction.actor.cpp b/fdbclient/MultiVersionTransaction.actor.cpp index b7418592ce..a55695253d 100644 --- a/fdbclient/MultiVersionTransaction.actor.cpp +++ b/fdbclient/MultiVersionTransaction.actor.cpp @@ -1111,13 +1111,13 @@ VersionVector MultiVersionTransaction::getVersionVector() { return VersionVector(); } -SpanContext MultiVersionTransaction::getSpanContext() { +UID MultiVersionTransaction::getSpanID() { auto tr = getTransaction(); if (tr.transaction) { - return tr.transaction->getSpanContext(); + return tr.transaction->getSpanID(); } - return SpanContext(); + return UID(); } ThreadFuture MultiVersionTransaction::getApproximateSize() { diff --git a/fdbclient/MultiVersionTransaction.h b/fdbclient/MultiVersionTransaction.h index 1fb5c604ff..b9d7a20659 100644 --- a/fdbclient/MultiVersionTransaction.h +++ b/fdbclient/MultiVersionTransaction.h @@ -378,7 +378,7 @@ public: ThreadFuture commit() override; Version getCommittedVersion() override; VersionVector getVersionVector() override; - SpanContext getSpanContext() override { return SpanContext(); }; + UID getSpanID() override { return UID(); }; ThreadFuture getApproximateSize() override; void setOption(FDBTransactionOptions::Option option, Optional value = Optional()) override; @@ -567,7 +567,7 @@ public: ThreadFuture commit() override; Version getCommittedVersion() override; VersionVector getVersionVector() override; - SpanContext getSpanContext() override; + UID getSpanID() override; ThreadFuture getApproximateSize() override; void setOption(FDBTransactionOptions::Option option, Optional value = Optional()) override; diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index fb3e3e81a1..6148e9ad4b 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -21,7 +21,6 @@ #include "fdbclient/NativeAPI.actor.h" #include -#include #include #include #include @@ -849,9 +848,7 @@ ACTOR Future assertFailure(GrvProxyInterface remote, Future attemptGRVFromOldProxies(std::vector oldProxies, std::vector newProxies) { - auto debugID = nondeterministicRandom()->randomUniqueID(); - g_traceBatch.addEvent("AttemptGRVFromOldProxyDebug", debugID.first(), "NativeAPI.attemptGRVFromOldProxies.Start"); - Span span("VerifyCausalReadRisky"_loc); + Span span(deterministicRandom()->randomUniqueID(), "VerifyCausalReadRisky"_loc); std::vector> replies; replies.reserve(oldProxies.size()); GetReadVersionRequest req( @@ -2792,13 +2789,13 @@ void updateTagMappings(Database cx, const GetKeyServerLocationsReply& reply) { ACTOR Future getKeyLocation_internal(Database cx, Optional tenant, Key key, - SpanContext spanContext, + SpanID spanID, Optional debugID, UseProvisionalProxies useProvisionalProxies, Reverse isBackward, Version version) { - state Span span("NAPI:getKeyLocation"_loc, spanContext); + state Span span("NAPI:getKeyLocation"_loc, spanID); if (isBackward) { ASSERT(key != allKeys.begin && key <= allKeys.end); } else { @@ -2886,7 +2883,7 @@ Future getKeyLocation(Database const& cx, Optional const& tenant, Key const& key, F StorageServerInterface::*member, - SpanContext spanContext, + SpanID spanID, Optional debugID, UseProvisionalProxies useProvisionalProxies, Reverse isBackward, @@ -2894,8 +2891,7 @@ Future getKeyLocation(Database const& cx, // we first check whether this range is cached Optional locationInfo = cx->getCachedLocation(tenant, key, isBackward); if (!locationInfo.present()) { - return getKeyLocation_internal( - cx, tenant, key, spanContext, debugID, useProvisionalProxies, isBackward, version); + return getKeyLocation_internal(cx, tenant, key, spanID, debugID, useProvisionalProxies, isBackward, version); } bool onlyEndpointFailedAndNeedRefresh = false; @@ -2909,8 +2905,7 @@ Future getKeyLocation(Database const& cx, cx->invalidateCache(locationInfo.get().tenantEntry.prefix, key); // Refresh the cache with a new getKeyLocations made to proxies. - return getKeyLocation_internal( - cx, tenant, key, spanContext, debugID, useProvisionalProxies, isBackward, version); + return getKeyLocation_internal(cx, tenant, key, spanID, debugID, useProvisionalProxies, isBackward, version); } return locationInfo.get(); @@ -2927,7 +2922,7 @@ Future getKeyLocation(Reference trState, useTenant ? trState->tenant() : Optional(), key, member, - trState->spanContext, + trState->spanID, trState->debugID, trState->useProvisionalProxies, isBackward, @@ -2949,11 +2944,11 @@ ACTOR Future> getKeyRangeLocations_internal( KeyRange keys, int limit, Reverse reverse, - SpanContext spanContext, + SpanID spanID, Optional debugID, UseProvisionalProxies useProvisionalProxies, Version version) { - state Span span("NAPI:getKeyRangeLocations"_loc, spanContext); + state Span span("NAPI:getKeyRangeLocations"_loc, spanID); if (debugID.present()) g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.getKeyLocations.Before"); @@ -3023,7 +3018,7 @@ Future> getKeyRangeLocations(Database const& c int limit, Reverse reverse, F StorageServerInterface::*member, - SpanContext const& spanContext, + SpanID const& spanID, Optional const& debugID, UseProvisionalProxies useProvisionalProxies, Version version) { @@ -3033,7 +3028,7 @@ Future> getKeyRangeLocations(Database const& c std::vector locations; if (!cx->getCachedLocations(tenant, keys, locations, limit, reverse)) { return getKeyRangeLocations_internal( - cx, tenant, keys, limit, reverse, spanContext, debugID, useProvisionalProxies, version); + cx, tenant, keys, limit, reverse, spanID, debugID, useProvisionalProxies, version); } bool foundFailed = false; @@ -3054,7 +3049,7 @@ Future> getKeyRangeLocations(Database const& c if (foundFailed) { // Refresh the cache with a new getKeyRangeLocations made to proxies. return getKeyRangeLocations_internal( - cx, tenant, keys, limit, reverse, spanContext, debugID, useProvisionalProxies, version); + cx, tenant, keys, limit, reverse, spanID, debugID, useProvisionalProxies, version); } return locations; @@ -3074,7 +3069,7 @@ Future> getKeyRangeLocations(ReferencespanContext, + trState->spanID, trState->debugID, trState->useProvisionalProxies, version); @@ -3103,7 +3098,7 @@ ACTOR Future warmRange_impl(Reference trState, KeyRange keys, CLIENT_KNOBS->WARM_RANGE_SHARD_LIMIT, Reverse::False, - trState->spanContext, + trState->spanID, trState->debugID, trState->useProvisionalProxies, version)); @@ -3134,35 +3129,38 @@ ACTOR Future warmRange_impl(Reference trState, KeyRange return Void(); } -SpanContext generateSpanID(bool transactionTracingSample, SpanContext parentContext = SpanContext()) { +SpanID generateSpanID(bool transactionTracingSample, SpanID parentContext = SpanID()) { + uint64_t txnId = deterministicRandom()->randomUInt64(); if (parentContext.isValid()) { - return SpanContext(parentContext.traceID, deterministicRandom()->randomUInt64(), parentContext.m_Flags); + if (parentContext.first() > 0) { + txnId = parentContext.first(); + } + uint64_t tokenId = parentContext.second() > 0 ? deterministicRandom()->randomUInt64() : 0; + return SpanID(txnId, tokenId); + } else if (transactionTracingSample) { + uint64_t tokenId = deterministicRandom()->random01() <= FLOW_KNOBS->TRACING_SAMPLE_RATE + ? deterministicRandom()->randomUInt64() + : 0; + return SpanID(txnId, tokenId); + } else { + return SpanID(txnId, 0); } - if (transactionTracingSample) { - return SpanContext(deterministicRandom()->randomUniqueID(), - deterministicRandom()->randomUInt64(), - deterministicRandom()->random01() <= FLOW_KNOBS->TRACING_SAMPLE_RATE - ? TraceFlags::sampled - : TraceFlags::unsampled); - } - return SpanContext( - deterministicRandom()->randomUniqueID(), deterministicRandom()->randomUInt64(), TraceFlags::unsampled); } TransactionState::TransactionState(Database cx, Optional tenant, TaskPriority taskID, - SpanContext spanContext, + SpanID spanID, Reference trLogInfo) - : cx(cx), trLogInfo(trLogInfo), options(cx), taskID(taskID), spanContext(spanContext), - readVersionObtainedFromGrvProxy(true), tenant_(tenant), tenantSet(tenant.present()) {} + : cx(cx), trLogInfo(trLogInfo), options(cx), taskID(taskID), spanID(spanID), readVersionObtainedFromGrvProxy(true), + tenant_(tenant), tenantSet(tenant.present()) {} Reference TransactionState::cloneAndReset(Reference newTrLogInfo, bool generateNewSpan) const { - SpanContext newSpanContext = generateNewSpan ? generateSpanID(cx->transactionTracingSample) : spanContext; + SpanID newSpanID = generateNewSpan ? generateSpanID(cx->transactionTracingSample) : spanID; Reference newState = - makeReference(cx, tenant_, cx->taskID, newSpanContext, newTrLogInfo); + makeReference(cx, tenant_, cx->taskID, newSpanID, newTrLogInfo); if (!cx->apiVersionAtLeast(16)) { newState->options = options; @@ -3220,12 +3218,12 @@ ACTOR Future> getValue(Reference trState, UseTenant useTenant, TransactionRecordLogInfo recordLogInfo) { state Version ver = wait(version); - state Span span("NAPI:getValue"_loc, trState->spanContext); + state Span span("NAPI:getValue"_loc, trState->spanID); if (useTenant && trState->tenant().present()) { - span.addAttribute("tenant"_sr, trState->tenant().get()); + span.addTag("tenant"_sr, trState->tenant().get()); } - span.addAttribute("key"_sr, key); + span.addTag("key"_sr, key); trState->cx->validateVersion(ver); loop { @@ -3351,7 +3349,7 @@ ACTOR Future getKey(Reference trState, wait(success(version)); state Optional getKeyID = Optional(); - state Span span("NAPI:getKey"_loc, trState->spanContext); + state Span span("NAPI:getKey"_loc, trState->spanID); if (trState->debugID.present()) { getKeyID = nondeterministicRandom()->randomUniqueID(); @@ -3450,8 +3448,8 @@ ACTOR Future getKey(Reference trState, } } -ACTOR Future waitForCommittedVersion(Database cx, Version version, SpanContext spanContext) { - state Span span("NAPI:waitForCommittedVersion"_loc, spanContext); +ACTOR Future waitForCommittedVersion(Database cx, Version version, SpanID spanContext) { + state Span span("NAPI:waitForCommittedVersion"_loc, { spanContext }); try { loop { choose { @@ -3485,14 +3483,14 @@ ACTOR Future waitForCommittedVersion(Database cx, Version version, Span } ACTOR Future getRawVersion(Reference trState) { - state Span span("NAPI:getRawVersion"_loc, { trState->spanContext }); + state Span span("NAPI:getRawVersion"_loc, { trState->spanID }); loop { choose { when(wait(trState->cx->onProxiesChanged())) {} when(GetReadVersionReply v = wait(basicLoadBalance(trState->cx->getGrvProxies(UseProvisionalProxies::False), &GrvProxyInterface::getConsistentReadVersion, - GetReadVersionRequest(trState->spanContext, + GetReadVersionRequest(trState->spanID, 0, TransactionPriority::IMMEDIATE, trState->cx->ssVersionVectorCache.getMaxVersion()), @@ -3514,7 +3512,7 @@ ACTOR Future readVersionBatcher( uint32_t flags); ACTOR Future watchValue(Database cx, Reference parameters) { - state Span span("NAPI:watchValue"_loc, parameters->spanContext); + state Span span("NAPI:watchValue"_loc, parameters->spanID); state Version ver = parameters->version; cx->validateVersion(parameters->version); ASSERT(parameters->version != latestVersion); @@ -3524,7 +3522,7 @@ ACTOR Future watchValue(Database cx, Reference p parameters->tenant.name, parameters->key, &StorageServerInterface::watchValue, - parameters->spanContext, + parameters->spanID, parameters->debugID, parameters->useProvisionalProxies, Reverse::False, @@ -3743,15 +3741,15 @@ ACTOR Future watchValueMap(Future version, Optional value, Database cx, TagSet tags, - SpanContext spanContext, + SpanID spanID, TaskPriority taskID, Optional debugID, UseProvisionalProxies useProvisionalProxies) { state Version ver = wait(version); - wait(getWatchFuture(cx, - makeReference( - tenant, key, value, ver, tags, spanContext, taskID, debugID, useProvisionalProxies))); + wait(getWatchFuture( + cx, + makeReference(tenant, key, value, ver, tags, spanID, taskID, debugID, useProvisionalProxies))); return Void(); } @@ -3797,11 +3795,10 @@ Future getExactRange(Reference trState, Reverse reverse, UseTenant useTenant) { state RangeResultFamily output; - // TODO - ljoswiak parent or link? - state Span span("NAPI:getExactRange"_loc, trState->spanContext); + state Span span("NAPI:getExactRange"_loc, trState->spanID); if (useTenant && trState->tenant().present()) { - span.addAttribute("tenant"_sr, trState->tenant().get()); + span.addTag("tenant"_sr, trState->tenant().get()); } // printf("getExactRange( '%s', '%s' )\n", keys.begin.toString().c_str(), keys.end.toString().c_str()); @@ -4158,9 +4155,9 @@ Future getRange(Reference trState, state KeySelector originalBegin = begin; state KeySelector originalEnd = end; state RangeResultFamily output; - state Span span("NAPI:getRange"_loc, trState->spanContext); + state Span span("NAPI:getRange"_loc, trState->spanID); if (useTenant && trState->tenant().present()) { - span.addAttribute("tenant"_sr, trState->tenant().get()); + span.addTag("tenant"_sr, trState->tenant().get()); } try { @@ -4634,7 +4631,7 @@ ACTOR Future getRangeStreamFragment(Reference trState, GetRangeLimits limits, Snapshot snapshot, Reverse reverse, - SpanContext spanContext) { + SpanID spanContext) { loop { state std::vector locations = wait(getKeyRangeLocations(trState, @@ -4927,7 +4924,7 @@ ACTOR Future getRangeStream(Reference trState, // FIXME: better handling to disable row limits ASSERT(!limits.hasRowLimit()); - state Span span("NAPI:getRangeStream"_loc, trState->spanContext); + state Span span("NAPI:getRangeStream"_loc, trState->spanID); state Version version = wait(fVersion); trState->cx->validateVersion(version); @@ -5050,7 +5047,7 @@ Transaction::Transaction(Database const& cx, Optional const& tenant) cx->taskID, generateSpanID(cx->transactionTracingSample), createTrLogInfoProbabilistically(cx))), - span(trState->spanContext, "Transaction"_loc), backoff(CLIENT_KNOBS->DEFAULT_BACKOFF), tr(trState->spanContext) { + span(trState->spanID, "Transaction"_loc), backoff(CLIENT_KNOBS->DEFAULT_BACKOFF), tr(trState->spanID) { if (DatabaseContext::debugUseTags) { debugAddTags(trState); } @@ -5185,7 +5182,7 @@ ACTOR Future watch(Reference watch, Database cx, Future tenant, TagSet tags, - SpanContext spanContext, + SpanID spanID, TaskPriority taskID, Optional debugID, UseProvisionalProxies useProvisionalProxies) { @@ -5213,7 +5210,7 @@ ACTOR Future watch(Reference watch, watch->value, cx, tags, - spanContext, + spanID, taskID, debugID, useProvisionalProxies); @@ -5246,7 +5243,7 @@ Future Transaction::watch(Reference watch) { populateAndGetTenant( trState, watch->key, readVersion.isValid() && readVersion.isReady() ? readVersion.get() : latestVersion), trState->options.readTags, - trState->spanContext, + trState->spanID, trState->taskID, trState->debugID, trState->useProvisionalProxies); @@ -5734,7 +5731,7 @@ void TransactionOptions::reset(Database const& cx) { void Transaction::resetImpl(bool generateNewSpan) { flushTrLogsIfEnabled(); trState = trState->cloneAndReset(createTrLogInfoProbabilistically(trState->cx), generateNewSpan); - tr = CommitTransactionRequest(trState->spanContext); + tr = CommitTransactionRequest(trState->spanID); readVersion = Future(); metadataVersion = Promise>(); extraConflictRanges.clear(); @@ -5749,7 +5746,7 @@ void Transaction::reset() { void Transaction::fullReset() { resetImpl(true); - span = Span(trState->spanContext, "Transaction"_loc); + span = Span(trState->spanID, "Transaction"_loc); backoff = CLIENT_KNOBS->DEFAULT_BACKOFF; } @@ -5870,7 +5867,8 @@ ACTOR void checkWrites(Reference trState, ACTOR static Future commitDummyTransaction(Reference trState, KeyRange range) { state Transaction tr(trState->cx); state int retries = 0; - state Span span(trState->spanContext, "NAPI:dummyTransaction"_loc, span.context); + state Span span("NAPI:dummyTransaction"_loc, trState->spanID); + tr.span.addParent(span.context); loop { try { TraceEvent("CommitDummyTransaction").detail("Key", range.begin).detail("Retries", retries); @@ -5913,7 +5911,7 @@ void Transaction::setupWatches() { watches[i]->value, trState->cx, trState->options.readTags, - trState->spanContext, + trState->spanID, trState->taskID, trState->debugID, trState->useProvisionalProxies)); @@ -6036,7 +6034,7 @@ ACTOR static Future tryCommit(Reference trState, Future readVersion) { state TraceInterval interval("TransactionCommit"); state double startTime = now(); - state Span span("NAPI:tryCommit"_loc, trState->spanContext); + state Span span("NAPI:tryCommit"_loc, trState->spanID); state Optional debugID = trState->debugID; if (debugID.present()) { TraceEvent(interval.begin()).detail("Parent", debugID.get()); @@ -6526,11 +6524,10 @@ void Transaction::setOption(FDBTransactionOptions::Option option, Optional(value.get(), IncludeVersion())); + span.addParent(BinaryReader::fromStringRef(value.get(), Unversioned())); break; case FDBTransactionOptions::REPORT_CONFLICTING_KEYS: @@ -6573,7 +6570,7 @@ void Transaction::setOption(FDBTransactionOptions::Option option, Optional getConsistentReadVersion(SpanContext parentSpan, +ACTOR Future getConsistentReadVersion(SpanID parentSpan, DatabaseContext* cx, uint32_t transactionCount, TransactionPriority priority, @@ -6688,7 +6685,7 @@ ACTOR Future readVersionBatcher(DatabaseContext* cx, } g_traceBatch.addAttach("TransactionAttachID", req.debugID.get().first(), debugID.get().first()); } - span.addLink(req.spanContext); + span.addParent(req.spanContext); requests.push_back(req.reply); for (auto tag : req.tags) { ++tags[tag]; @@ -6744,10 +6741,10 @@ ACTOR Future readVersionBatcher(DatabaseContext* cx, ACTOR Future extractReadVersion(Reference trState, Location location, - SpanContext spanContext, + SpanID spanContext, Future f, Promise> metadataVersion) { - state Span span(spanContext, location, trState->spanContext); + state Span span(spanContext, location, { trState->spanID }); GetReadVersionReply rep = wait(f); double replyTime = now(); double latency = replyTime - trState->startTime; @@ -6920,7 +6917,7 @@ Future Transaction::getReadVersion(uint32_t flags) { } Location location = "NAPI:getReadVersion"_loc; - SpanContext spanContext = generateSpanID(trState->cx->transactionTracingSample, trState->spanContext); + UID spanContext = generateSpanID(trState->cx->transactionTracingSample, trState->spanID); auto const req = DatabaseContext::VersionRequest(spanContext, trState->options.tags, trState->debugID); batcher.stream.send(req); trState->startTime = now(); @@ -7396,7 +7393,7 @@ ACTOR Future>> getRangeSplitPoints(ReferencespanContext); + state Span span("NAPI:GetRangeSplitPoints"_loc, trState->spanID); loop { state std::vector locations = @@ -7960,14 +7957,14 @@ Reference Transaction::createTrLogInfoProbabilistically(cons return Reference(); } -void Transaction::setTransactionID(UID id) { +void Transaction::setTransactionID(uint64_t id) { ASSERT(getSize() == 0); - trState->spanContext = SpanContext(id, trState->spanContext.spanID); + trState->spanID = SpanID(id, trState->spanID.second()); } void Transaction::setToken(uint64_t token) { ASSERT(getSize() == 0); - trState->spanContext = SpanContext(trState->spanContext.traceID, token); + trState->spanID = SpanID(trState->spanID.first(), token); } void enableClientInfoLogging() { diff --git a/fdbclient/NativeAPI.actor.h b/fdbclient/NativeAPI.actor.h index 1b00313d68..456af4c746 100644 --- a/fdbclient/NativeAPI.actor.h +++ b/fdbclient/NativeAPI.actor.h @@ -243,7 +243,7 @@ struct TransactionState : ReferenceCounted { Optional debugID; TaskPriority taskID; - SpanContext spanContext; + SpanID spanID; UseProvisionalProxies useProvisionalProxies = UseProvisionalProxies::False; bool readVersionObtainedFromGrvProxy; @@ -259,14 +259,13 @@ struct TransactionState : ReferenceCounted { std::shared_ptr> conflictingKeys; // Only available so that Transaction can have a default constructor, for use in state variables - TransactionState(TaskPriority taskID, SpanContext spanContext) - : taskID(taskID), spanContext(spanContext), tenantSet(false) {} + TransactionState(TaskPriority taskID, SpanID spanID) : taskID(taskID), spanID(spanID), tenantSet(false) {} // VERSION_VECTOR changed default values of readVersionObtainedFromGrvProxy TransactionState(Database cx, Optional tenant, TaskPriority taskID, - SpanContext spanContext, + SpanID spanID, Reference trLogInfo); Reference cloneAndReset(Reference newTrLogInfo, bool generateNewSpan) const; @@ -436,7 +435,7 @@ public: void debugTransaction(UID dID) { trState->debugID = dID; } VersionVector getVersionVector() const; - SpanContext getSpanContext() const { return trState->spanContext; } + UID getSpanID() const { return trState->spanID; } Future commitMutations(); void setupWatches(); @@ -448,7 +447,7 @@ public: Database getDatabase() const { return trState->cx; } static Reference createTrLogInfoProbabilistically(const Database& cx); - void setTransactionID(UID id); + void setTransactionID(uint64_t id); void setToken(uint64_t token); const std::vector>>& getExtraReadConflictRanges() const { return extraConflictRanges; } @@ -491,7 +490,7 @@ private: Future committing; }; -ACTOR Future waitForCommittedVersion(Database cx, Version version, SpanContext spanContext); +ACTOR Future waitForCommittedVersion(Database cx, Version version, SpanID spanContext); ACTOR Future>> waitDataDistributionMetricsList(Database cx, KeyRange keys, int shardLimit); diff --git a/fdbclient/ReadYourWrites.actor.cpp b/fdbclient/ReadYourWrites.actor.cpp index 86609a76cd..a16034963b 100644 --- a/fdbclient/ReadYourWrites.actor.cpp +++ b/fdbclient/ReadYourWrites.actor.cpp @@ -1982,7 +1982,7 @@ void ReadYourWritesTransaction::getWriteConflicts(KeyRangeMap* result) { } } -void ReadYourWritesTransaction::setTransactionID(UID id) { +void ReadYourWritesTransaction::setTransactionID(uint64_t id) { tr.setTransactionID(id); } diff --git a/fdbclient/ReadYourWrites.h b/fdbclient/ReadYourWrites.h index e67b5334f7..341dc4e2a1 100644 --- a/fdbclient/ReadYourWrites.h +++ b/fdbclient/ReadYourWrites.h @@ -140,7 +140,7 @@ public: [[nodiscard]] Future commit() override; Version getCommittedVersion() const override { return tr.getCommittedVersion(); } VersionVector getVersionVector() const override { return tr.getVersionVector(); } - SpanContext getSpanContext() const override { return tr.getSpanContext(); } + UID getSpanID() const override { return tr.getSpanID(); } int64_t getApproximateSize() const override { return approximateSize; } [[nodiscard]] Future> getVersionstamp() override; @@ -177,7 +177,7 @@ public: Reference getTransactionState() const { return tr.trState; } - void setTransactionID(UID id); + void setTransactionID(uint64_t id); void setToken(uint64_t token); // Read from the special key space readConflictRangeKeysRange diff --git a/fdbclient/Schemas.cpp b/fdbclient/Schemas.cpp index 6415604d86..0284283d2f 100644 --- a/fdbclient/Schemas.cpp +++ b/fdbclient/Schemas.cpp @@ -695,6 +695,8 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema( }, "cluster_controller_timestamp":1415650089, "protocol_version":"fdb00a400050001", + "newest_protocol_version":"fdb00a500040001", + "lowest_compatible_protocol_version":"fdb00a500040001", "connection_string":"a:a@127.0.0.1:4000", "full_replication":true, "maintenance_zone":"0ccb4e0fdbdb5583010f6b77d9d10ece", diff --git a/fdbclient/SpecialKeySpace.actor.cpp b/fdbclient/SpecialKeySpace.actor.cpp index 32cd7ba509..7c95e3aa02 100644 --- a/fdbclient/SpecialKeySpace.actor.cpp +++ b/fdbclient/SpecialKeySpace.actor.cpp @@ -1595,10 +1595,10 @@ Future TracingOptionsImpl::getRange(ReadYourWritesTransaction* ryw, if (key.endsWith(kTracingTransactionIdKey)) { result.push_back_deep(result.arena(), - KeyValueRef(key, ryw->getTransactionState()->spanContext.traceID.toString())); + KeyValueRef(key, std::to_string(ryw->getTransactionState()->spanID.first()))); } else if (key.endsWith(kTracingTokenKey)) { result.push_back_deep(result.arena(), - KeyValueRef(key, std::to_string(ryw->getTransactionState()->spanContext.spanID))); + KeyValueRef(key, std::to_string(ryw->getTransactionState()->spanID.second()))); } } return result; @@ -1612,7 +1612,7 @@ void TracingOptionsImpl::set(ReadYourWritesTransaction* ryw, const KeyRef& key, } if (key.endsWith(kTracingTransactionIdKey)) { - ryw->setTransactionID(UID::fromString(value.toString())); + ryw->setTransactionID(std::stoul(value.toString())); } else if (key.endsWith(kTracingTokenKey)) { if (value.toString() == "true") { ryw->setToken(deterministicRandom()->randomUInt64()); diff --git a/fdbclient/StorageServerInterface.h b/fdbclient/StorageServerInterface.h index cda6a32b66..13ba8f1e18 100644 --- a/fdbclient/StorageServerInterface.h +++ b/fdbclient/StorageServerInterface.h @@ -35,7 +35,6 @@ #include "fdbclient/CommitTransaction.h" #include "fdbclient/TagThrottle.actor.h" #include "fdbclient/Tenant.h" -#include "flow/Tracing.h" #include "flow/UnitTest.h" #include "fdbclient/VersionVector.h" @@ -272,7 +271,7 @@ struct GetValueReply : public LoadBalancedReply { struct GetValueRequest : TimedRequest { constexpr static FileIdentifier file_identifier = 8454530; - SpanContext spanContext; + SpanID spanContext; TenantInfo tenantInfo; Key key; Version version; @@ -284,7 +283,7 @@ struct GetValueRequest : TimedRequest { // serve the given key GetValueRequest() {} - GetValueRequest(SpanContext spanContext, + GetValueRequest(SpanID spanContext, const TenantInfo& tenantInfo, const Key& key, Version ver, @@ -316,7 +315,7 @@ struct WatchValueReply { struct WatchValueRequest { constexpr static FileIdentifier file_identifier = 14747733; - SpanContext spanContext; + SpanID spanContext; TenantInfo tenantInfo; Key key; Optional value; @@ -327,7 +326,7 @@ struct WatchValueRequest { WatchValueRequest() {} - WatchValueRequest(SpanContext spanContext, + WatchValueRequest(SpanID spanContext, TenantInfo tenantInfo, const Key& key, Optional value, @@ -361,7 +360,7 @@ struct GetKeyValuesReply : public LoadBalancedReply { struct GetKeyValuesRequest : TimedRequest { constexpr static FileIdentifier file_identifier = 6795746; - SpanContext spanContext; + SpanID spanContext; Arena arena; TenantInfo tenantInfo; KeySelectorRef begin, end; @@ -419,7 +418,7 @@ struct GetMappedKeyValuesReply : public LoadBalancedReply { struct GetMappedKeyValuesRequest : TimedRequest { constexpr static FileIdentifier file_identifier = 6795747; - SpanContext spanContext; + SpanID spanContext; Arena arena; TenantInfo tenantInfo; KeySelectorRef begin, end; @@ -484,7 +483,7 @@ struct GetKeyValuesStreamReply : public ReplyPromiseStreamReply { struct GetKeyValuesStreamRequest { constexpr static FileIdentifier file_identifier = 6795746; - SpanContext spanContext; + SpanID spanContext; Arena arena; TenantInfo tenantInfo; KeySelectorRef begin, end; @@ -535,7 +534,7 @@ struct GetKeyReply : public LoadBalancedReply { struct GetKeyRequest : TimedRequest { constexpr static FileIdentifier file_identifier = 10457870; - SpanContext spanContext; + SpanID spanContext; Arena arena; TenantInfo tenantInfo; KeySelectorRef sel; @@ -549,7 +548,7 @@ struct GetKeyRequest : TimedRequest { GetKeyRequest() {} - GetKeyRequest(SpanContext spanContext, + GetKeyRequest(SpanID spanContext, TenantInfo tenantInfo, KeySelectorRef const& sel, Version version, @@ -836,7 +835,7 @@ struct ChangeFeedStreamReply : public ReplyPromiseStreamReply { struct ChangeFeedStreamRequest { constexpr static FileIdentifier file_identifier = 6795746; - SpanContext spanContext; + SpanID spanContext; Arena arena; Key rangeID; Version begin = 0; diff --git a/fdbclient/SystemData.cpp b/fdbclient/SystemData.cpp index 255fccc6c6..af9ba32a31 100644 --- a/fdbclient/SystemData.cpp +++ b/fdbclient/SystemData.cpp @@ -619,6 +619,19 @@ StorageServerInterface decodeServerListValue(ValueRef const& value) { return decodeServerListValueFB(value); } +Value swVersionValue(SWVersion const& swversion) { + auto protocolVersion = currentProtocolVersion; + protocolVersion.addObjectSerializerFlag(); + return ObjectWriter::toValue(swversion, IncludeVersion(protocolVersion)); +} + +SWVersion decodeSWVersionValue(ValueRef const& value) { + SWVersion s; + ObjectReader reader(value.begin(), IncludeVersion()); + reader.deserialize(s); + return s; +} + // processClassKeys.contains(k) iff k.startsWith( processClassKeys.begin ) because '/'+1 == '0' const KeyRangeRef processClassKeys(LiteralStringRef("\xff/processClass/"), LiteralStringRef("\xff/processClass0")); const KeyRef processClassPrefix = processClassKeys.begin; diff --git a/fdbclient/SystemData.h b/fdbclient/SystemData.h index 4069c68162..705caf98f1 100644 --- a/fdbclient/SystemData.h +++ b/fdbclient/SystemData.h @@ -205,6 +205,9 @@ const Value serverListValue(StorageServerInterface const&); UID decodeServerListKey(KeyRef const&); StorageServerInterface decodeServerListValue(ValueRef const&); +Value swVersionValue(SWVersion const& swversion); +SWVersion decodeSWVersionValue(ValueRef const&); + // "\xff/processClass/[[processID]]" := "[[ProcessClass]]" // Contains a mapping from processID to processClass extern const KeyRangeRef processClassKeys; diff --git a/fdbclient/ThreadSafeTransaction.cpp b/fdbclient/ThreadSafeTransaction.cpp index c796f02536..84ab49504b 100644 --- a/fdbclient/ThreadSafeTransaction.cpp +++ b/fdbclient/ThreadSafeTransaction.cpp @@ -465,8 +465,8 @@ VersionVector ThreadSafeTransaction::getVersionVector() { return tr->getVersionVector(); } -SpanContext ThreadSafeTransaction::getSpanContext() { - return tr->getSpanContext(); +UID ThreadSafeTransaction::getSpanID() { + return tr->getSpanID(); } ThreadFuture ThreadSafeTransaction::getApproximateSize() { diff --git a/fdbclient/ThreadSafeTransaction.h b/fdbclient/ThreadSafeTransaction.h index a187bb2f45..0ace0a2cfe 100644 --- a/fdbclient/ThreadSafeTransaction.h +++ b/fdbclient/ThreadSafeTransaction.h @@ -167,7 +167,7 @@ public: ThreadFuture commit() override; Version getCommittedVersion() override; VersionVector getVersionVector() override; - SpanContext getSpanContext() override; + UID getSpanID() override; ThreadFuture getApproximateSize() override; ThreadFuture getProtocolVersion(); diff --git a/fdbclient/TransactionLineage.h b/fdbclient/TransactionLineage.h index 04492db4ba..6eed26b805 100644 --- a/fdbclient/TransactionLineage.h +++ b/fdbclient/TransactionLineage.h @@ -34,13 +34,10 @@ struct TransactionLineage : LineageProperties { GetKeyServersLocations }; static constexpr std::string_view name = "Transaction"sv; - UID txID; + uint64_t txID; Operation operation = Operation::Unset; bool isSet(uint64_t TransactionLineage::*member) const { return this->*member > 0; } - bool isSet(UID TransactionLineage::*member) const { - return static_cast(this->*member).first() > 0 && static_cast(this->*member).second() > 0; - } bool isSet(Operation TransactionLineage::*member) const { return this->*member != Operation::Unset; } }; diff --git a/fdbserver/ApplyMetadataMutation.cpp b/fdbserver/ApplyMetadataMutation.cpp index 290e83efd4..90f987021f 100644 --- a/fdbserver/ApplyMetadataMutation.cpp +++ b/fdbserver/ApplyMetadataMutation.cpp @@ -53,7 +53,7 @@ namespace { class ApplyMetadataMutationsImpl { public: - ApplyMetadataMutationsImpl(const SpanContext& spanContext_, + ApplyMetadataMutationsImpl(const SpanID& spanContext_, const UID& dbgid_, Arena& arena_, const VectorRef& mutations_, @@ -61,7 +61,7 @@ public: : spanContext(spanContext_), dbgid(dbgid_), arena(arena_), mutations(mutations_), txnStateStore(txnStateStore_), confChange(dummyConfChange) {} - ApplyMetadataMutationsImpl(const SpanContext& spanContext_, + ApplyMetadataMutationsImpl(const SpanID& spanContext_, Arena& arena_, const VectorRef& mutations_, ProxyCommitData& proxyCommitData_, @@ -82,7 +82,7 @@ public: tssMapping(&proxyCommitData_.tssMapping), tenantMap(&proxyCommitData_.tenantMap), initialCommit(initialCommit_) {} - ApplyMetadataMutationsImpl(const SpanContext& spanContext_, + ApplyMetadataMutationsImpl(const SpanID& spanContext_, ResolverData& resolverData_, const VectorRef& mutations_) : spanContext(spanContext_), dbgid(resolverData_.dbgid), arena(resolverData_.arena), mutations(mutations_), @@ -94,7 +94,7 @@ public: private: // The following variables are incoming parameters - const SpanContext& spanContext; + const SpanID& spanContext; const UID& dbgid; @@ -1217,7 +1217,7 @@ public: } // anonymous namespace -void applyMetadataMutations(SpanContext const& spanContext, +void applyMetadataMutations(SpanID const& spanContext, ProxyCommitData& proxyCommitData, Arena& arena, Reference logSystem, @@ -1241,13 +1241,13 @@ void applyMetadataMutations(SpanContext const& spanContext, .apply(); } -void applyMetadataMutations(SpanContext const& spanContext, +void applyMetadataMutations(SpanID const& spanContext, ResolverData& resolverData, const VectorRef& mutations) { ApplyMetadataMutationsImpl(spanContext, resolverData, mutations).apply(); } -void applyMetadataMutations(SpanContext const& spanContext, +void applyMetadataMutations(SpanID const& spanContext, const UID& dbgid, Arena& arena, const VectorRef& mutations, diff --git a/fdbserver/ApplyMetadataMutation.h b/fdbserver/ApplyMetadataMutation.h index 23f9e3a2f9..d4e47e0946 100644 --- a/fdbserver/ApplyMetadataMutation.h +++ b/fdbserver/ApplyMetadataMutation.h @@ -87,7 +87,7 @@ Reference getStorageInfo(UID id, std::map>* storageCache, IKeyValueStore* txnStateStore); -void applyMetadataMutations(SpanContext const& spanContext, +void applyMetadataMutations(SpanID const& spanContext, ProxyCommitData& proxyCommitData, Arena& arena, Reference logSystem, @@ -97,7 +97,7 @@ void applyMetadataMutations(SpanContext const& spanContext, Version version, Version popVersion, bool initialCommit); -void applyMetadataMutations(SpanContext const& spanContext, +void applyMetadataMutations(SpanID const& spanContext, const UID& dbgid, Arena& arena, const VectorRef& mutations, @@ -140,7 +140,7 @@ inline bool containsMetadataMutation(const VectorRef& mutations) { } // Resolver's version -void applyMetadataMutations(SpanContext const& spanContext, +void applyMetadataMutations(SpanID const& spanContext, ResolverData& resolverData, const VectorRef& mutations); diff --git a/fdbserver/BackupWorker.actor.cpp b/fdbserver/BackupWorker.actor.cpp index 8addd89f05..0ac5b56a7d 100644 --- a/fdbserver/BackupWorker.actor.cpp +++ b/fdbserver/BackupWorker.actor.cpp @@ -67,10 +67,6 @@ struct VersionedMessage { return false; if (reader.protocolVersion().hasSpanContext() && SpanContextMessage::isNextIn(reader)) return false; - if (reader.protocolVersion().hasOTELSpanContext() && OTELSpanContextMessage::isNextIn(reader)) { - TEST(true); // Returning false for OTELSpanContextMessage - return false; - } reader >> *m; return normalKeys.contains(m->param1) || m->param1 == metadataVersionKey; diff --git a/fdbserver/CMakeLists.txt b/fdbserver/CMakeLists.txt index f6d56ebe41..01b3cd343e 100644 --- a/fdbserver/CMakeLists.txt +++ b/fdbserver/CMakeLists.txt @@ -88,7 +88,6 @@ set(FDBSERVER_SRCS OldTLogServer_4_6.actor.cpp OldTLogServer_6_0.actor.cpp OldTLogServer_6_2.actor.cpp - OTELSpanContextMessage.h OnDemandStore.actor.cpp OnDemandStore.h PaxosConfigConsumer.actor.cpp diff --git a/fdbserver/ClusterRecovery.actor.cpp b/fdbserver/ClusterRecovery.actor.cpp index d7c0bdf85c..10ed726809 100644 --- a/fdbserver/ClusterRecovery.actor.cpp +++ b/fdbserver/ClusterRecovery.actor.cpp @@ -25,6 +25,7 @@ #include "fdbserver/MasterInterface.h" #include "fdbserver/WaitFailure.h" +#include "flow/ProtocolVersion.h" #include "flow/actorcompiler.h" // This must be the last #include. static std::set const& normalClusterRecoveryErrors() { @@ -1407,6 +1408,11 @@ ACTOR Future clusterRecoveryCore(Reference self) { wait(self->cstate.read()); + if (self->cstate.prevDBState.lowestCompatibleProtocolVersion > currentProtocolVersion) { + TraceEvent(SevWarnAlways, "IncompatibleProtocolVersion", self->dbgid).log(); + throw internal_error(); + } + self->recoveryState = RecoveryState::LOCKING_CSTATE; TraceEvent(getRecoveryEventName(ClusterRecoveryEventType::CLUSTER_RECOVERY_STATE_EVENT_NAME).c_str(), self->dbgid) .detail("StatusCode", RecoveryStatus::locking_coordinated_state) @@ -1462,8 +1468,21 @@ ACTOR Future clusterRecoveryCore(Reference self) { DBCoreState newState = self->cstate.myDBState; newState.recoveryCount++; + newState.recoveryCount++; + if (self->cstate.prevDBState.newestProtocolVersion.isInvalid() || + self->cstate.prevDBState.newestProtocolVersion < currentProtocolVersion) { + ASSERT(self->cstate.myDBState.lowestCompatibleProtocolVersion.isInvalid() || + !self->cstate.myDBState.newestProtocolVersion.isInvalid()); + newState.newestProtocolVersion = currentProtocolVersion; + newState.lowestCompatibleProtocolVersion = minCompatibleProtocolVersion; + } wait(self->cstate.write(newState) || recoverAndEndEpoch); + TraceEvent("ProtocolVersionCompatibilityChecked", self->dbgid) + .detail("NewestProtocolVersion", self->cstate.myDBState.newestProtocolVersion) + .detail("LowestCompatibleProtocolVersion", self->cstate.myDBState.lowestCompatibleProtocolVersion) + .trackLatest(self->swVersionCheckedEventHolder->trackingKey); + self->recoveryState = RecoveryState::RECRUITING; state std::vector seedServers; @@ -1611,7 +1630,7 @@ ACTOR Future clusterRecoveryCore(Reference self) { tr.set(recoveryCommitRequest.arena, clusterIdKey, BinaryWriter::toValue(self->clusterId, Unversioned())); } - applyMetadataMutations(SpanContext(), + applyMetadataMutations(SpanID(), self->dbgid, recoveryCommitRequest.arena, tr.mutations.slice(mmApplied, tr.mutations.size()), diff --git a/fdbserver/ClusterRecovery.actor.h b/fdbserver/ClusterRecovery.actor.h index d0deef911f..810bd35f7a 100644 --- a/fdbserver/ClusterRecovery.actor.h +++ b/fdbserver/ClusterRecovery.actor.h @@ -22,6 +22,7 @@ // When actually compiled (NO_INTELLISENSE), include the generated version of this file. In intellisense use the source // version. +#include "flow/Trace.h" #include #if defined(NO_INTELLISENSE) && !defined(FDBSERVER_CLUSTERRECOVERY_ACTOR_G_H) @@ -244,6 +245,7 @@ struct ClusterRecoveryData : NonCopyable, ReferenceCounted Future logger; + Reference swVersionCheckedEventHolder; Reference recoveredConfigEventHolder; Reference clusterRecoveryStateEventHolder; Reference clusterRecoveryGenerationsEventHolder; @@ -273,6 +275,7 @@ struct ClusterRecoveryData : NonCopyable, ReferenceCounted backupWorkerDoneRequests("BackupWorkerDoneRequests", cc), getLiveCommittedVersionRequests("GetLiveCommittedVersionRequests", cc), reportLiveCommittedVersionRequests("ReportLiveCommittedVersionRequests", cc), + swVersionCheckedEventHolder(makeReference("SWVersionCompatibilityChecked")), recoveredConfigEventHolder(makeReference("RecoveredConfig")) { clusterRecoveryStateEventHolder = makeReference( getRecoveryEventName(ClusterRecoveryEventType::CLUSTER_RECOVERY_STATE_EVENT_NAME)); diff --git a/fdbserver/CommitProxyServer.actor.cpp b/fdbserver/CommitProxyServer.actor.cpp index 1d90a1a40e..097c6f632f 100644 --- a/fdbserver/CommitProxyServer.actor.cpp +++ b/fdbserver/CommitProxyServer.actor.cpp @@ -358,7 +358,7 @@ ACTOR Future addBackupMutations(ProxyCommitData* self, state int yieldBytes = 0; state BinaryWriter valueWriter(Unversioned()); - toCommit->addTransactionInfo(SpanContext()); + toCommit->addTransactionInfo(SpanID()); // Serialize the log range mutations within the map for (; logRangeMutation != logRangeMutations->cend(); ++logRangeMutation) { @@ -654,7 +654,7 @@ void CommitBatchContext::setupTraceBatch() { g_traceBatch.addAttach("CommitAttachID", tr.debugID.get().first(), debugID.get().first()); } - span.addLink(tr.spanContext); + span.addParent(tr.spanContext); } if (debugID.present()) { @@ -880,7 +880,7 @@ void applyMetadataEffect(CommitBatchContext* self) { committed = committed && self->resolution[resolver].stateMutations[versionIndex][transactionIndex].committed; if (committed) { - applyMetadataMutations(SpanContext(), + applyMetadataMutations(SpanID(), *self->pProxyCommitData, self->arena, self->pProxyCommitData->logSystem, @@ -1300,7 +1300,8 @@ ACTOR Future postResolution(CommitBatchContext* self) { // simulation TEST(true); // Semi-committed pipeline limited by MVCC window //TraceEvent("ProxyWaitingForCommitted", pProxyCommitData->dbgid).detail("CommittedVersion", pProxyCommitData->committedVersion.get()).detail("NeedToCommit", commitVersion); - waitVersionSpan = Span("MP:overMaxReadTransactionLifeVersions"_loc, span.context); + waitVersionSpan = Span( + deterministicRandom()->randomUniqueID(), "MP:overMaxReadTransactionLifeVersions"_loc, { span.context }); choose { when(wait(pProxyCommitData->committedVersion.whenAtLeast( self->commitVersion - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS))) { @@ -1680,7 +1681,7 @@ void addTagMapping(GetKeyServerLocationsReply& reply, ProxyCommitData* commitDat ACTOR static Future doKeyServerLocationRequest(GetKeyServerLocationsRequest req, ProxyCommitData* commitData) { // We can't respond to these requests until we have valid txnStateStore getCurrentLineage()->modify(&TransactionLineage::operation) = TransactionLineage::Operation::GetKeyServersLocations; - getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.traceID; + getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.first(); wait(commitData->validState.getFuture()); wait(delay(0, TaskPriority::DefaultEndpoint)); @@ -2200,7 +2201,7 @@ ACTOR Future processCompleteTransactionStateRequest(TransactionStateResolv Arena arena; bool confChanges; - applyMetadataMutations(SpanContext(), + applyMetadataMutations(SpanID(), *pContext->pCommitData, arena, Reference(), diff --git a/fdbserver/DBCoreState.h b/fdbserver/DBCoreState.h index 7c06432498..0bb6e8936e 100644 --- a/fdbserver/DBCoreState.h +++ b/fdbserver/DBCoreState.h @@ -141,8 +141,13 @@ struct DBCoreState { DBRecoveryCount recoveryCount; // Increases with sequential successful recoveries. LogSystemType logSystemType; std::set pseudoLocalities; + ProtocolVersion newestProtocolVersion; + ProtocolVersion lowestCompatibleProtocolVersion; - DBCoreState() : logRouterTags(0), txsTags(0), recoveryCount(0), logSystemType(LogSystemType::empty) {} + DBCoreState() + : logRouterTags(0), txsTags(0), recoveryCount(0), logSystemType(LogSystemType::empty), + newestProtocolVersion(ProtocolVersion::invalidProtocolVersion), + lowestCompatibleProtocolVersion(ProtocolVersion::invalidProtocolVersion) {} std::vector getPriorCommittedLogServers() { std::vector priorCommittedLogServers; @@ -180,6 +185,9 @@ struct DBCoreState { if (ar.protocolVersion().hasShardedTxsTags()) { serializer(ar, txsTags); } + if (ar.protocolVersion().hasSWVersionTracking()) { + serializer(ar, newestProtocolVersion, lowestCompatibleProtocolVersion); + } } else if (ar.isDeserializing) { tLogs.push_back(CoreTLogSet()); serializer(ar, diff --git a/fdbserver/DDTeamCollection.actor.cpp b/fdbserver/DDTeamCollection.actor.cpp index 95916ee7cd..ed1d232e43 100644 --- a/fdbserver/DDTeamCollection.actor.cpp +++ b/fdbserver/DDTeamCollection.actor.cpp @@ -2039,6 +2039,8 @@ public: } loop { + state Future pauseChanged = self->pauseWiggle->onChange(); + state Future stopChanged = stopSignal->onChange(); if (self->wigglingId.present()) { state UID id = self->wigglingId.get(); if (self->pauseWiggle->get()) { @@ -2067,7 +2069,7 @@ public: .detail("ExtraHealthyTeamCount", extraTeamCount) .detail("HealthyTeamCount", self->healthyTeamCount); } - when(wait(self->pauseWiggle->onChange())) { continue; } + when(wait(pauseChanged)) { continue; } } } } @@ -2098,7 +2100,7 @@ public: finishStorageWiggleSignal.send(Void()); extraTeamCount = std::max(0, extraTeamCount - 1); } - when(wait(ddQueueCheck || self->pauseWiggle->onChange() || stopSignal->onChange())) {} + when(wait(ddQueueCheck || pauseChanged || stopChanged)) {} } if (stopSignal->get()) { diff --git a/fdbserver/GrvProxyServer.actor.cpp b/fdbserver/GrvProxyServer.actor.cpp index 30ad98bcf1..6d0127c431 100644 --- a/fdbserver/GrvProxyServer.actor.cpp +++ b/fdbserver/GrvProxyServer.actor.cpp @@ -542,7 +542,7 @@ ACTOR Future lastCommitUpdater(GrvProxyData* self, PromiseStream getLiveCommittedVersion(SpanContext parentSpan, +ACTOR Future getLiveCommittedVersion(SpanID parentSpan, GrvProxyData* grvProxyData, uint32_t flags, Optional debugID, @@ -945,7 +945,7 @@ ACTOR static Future transactionStarter(GrvProxyInterface proxy, int batchGRVProcessed = 0; for (int i = 0; i < start.size(); i++) { if (start[i].size()) { - Future readVersionReply = getLiveCommittedVersion(SpanContext(), + Future readVersionReply = getLiveCommittedVersion(UID() /*span.context*/, grvProxyData, i, debugID, diff --git a/fdbserver/LogSystem.cpp b/fdbserver/LogSystem.cpp index ab8f43cfc5..1e1189facb 100644 --- a/fdbserver/LogSystem.cpp +++ b/fdbserver/LogSystem.cpp @@ -19,9 +19,6 @@ */ #include "fdbserver/LogSystem.h" -#include "fdbclient/FDBTypes.h" -#include "fdbserver/OTELSpanContextMessage.h" -#include "fdbserver/SpanContextMessage.h" #include "flow/serialize.h" std::string LogSet::logRouterString() { @@ -280,8 +277,8 @@ void LogPushData::addTxsTag() { } } -void LogPushData::addTransactionInfo(SpanContext const& context) { - TEST(!spanContext.isValid()); // addTransactionInfo with invalid SpanContext +void LogPushData::addTransactionInfo(SpanID const& context) { + TEST(!spanContext.isValid()); // addTransactionInfo with invalid SpanID spanContext = context; writtenLocations.clear(); } @@ -347,33 +344,13 @@ bool LogPushData::writeTransactionInfo(int location, uint32_t subseq) { writtenLocations.insert(location); BinaryWriter& wr = messagesWriter[location]; + SpanContextMessage contextMessage(spanContext); + int offset = wr.getLength(); wr << uint32_t(0) << subseq << uint16_t(prev_tags.size()); for (auto& tag : prev_tags) wr << tag; - if (logSystem->getTLogVersion() >= TLogVersion::V7) { - OTELSpanContextMessage contextMessage(spanContext); - wr << contextMessage; - } else { - // When we're on a TLog version below 7, but the front end of the system (i.e. proxy, sequencer, resolver) - // is using OpenTelemetry tracing (i.e on or above 7.2), we need to convert the OpenTelemetry Span data model - // i.e. 16 bytes for traceId, 8 bytes for spanId, to the OpenTracing spec, which is 8 bytes for traceId - // and 8 bytes for spanId. That means we need to drop some data. - // - // As a workaround for this special case we've decided to drop is the 8 bytes - // for spanId. Therefore we're passing along the full 16 byte traceId to the storage server with 0 for spanID. - // This will result in a follows from relationship for the storage span within the trace rather than a - // parent->child. - SpanContextMessage contextMessage; - if (spanContext.isSampled()) { - TEST(true); // Converting OTELSpanContextMessage to traced SpanContextMessage - contextMessage = SpanContextMessage(UID(spanContext.traceID.first(), spanContext.traceID.second())); - } else { - TEST(true); // Converting OTELSpanContextMessage to untraced SpanContextMessage - contextMessage = SpanContextMessage(UID(0, 0)); - } - wr << contextMessage; - } + wr << contextMessage; int length = wr.getLength() - offset; *(uint32_t*)((uint8_t*)wr.getData() + offset) = length - sizeof(uint32_t); return true; diff --git a/fdbserver/LogSystem.h b/fdbserver/LogSystem.h index 6581457c25..e8453184e4 100644 --- a/fdbserver/LogSystem.h +++ b/fdbserver/LogSystem.h @@ -26,7 +26,6 @@ #include #include "fdbserver/SpanContextMessage.h" -#include "fdbserver/OTELSpanContextMessage.h" #include "fdbserver/TLogInterface.h" #include "fdbserver/WorkerInterface.actor.h" #include "fdbclient/DatabaseConfiguration.h" @@ -520,7 +519,7 @@ struct ILogSystem { Version knownCommittedVersion, Version minKnownCommittedVersion, LogPushData& data, - SpanContext const& spanContext, + SpanID const& spanContext, Optional debugID = Optional(), Optional> tpcvMap = Optional>()) = 0; @@ -763,7 +762,7 @@ struct LogPushData : NonCopyable { } // Add transaction info to be written before the first mutation in the transaction. - void addTransactionInfo(SpanContext const& context); + void addTransactionInfo(SpanID const& context); // copy written_tags, after filtering, into given set void saveTags(std::set& filteredTags) const { @@ -833,7 +832,7 @@ private: // field. std::unordered_set writtenLocations; uint32_t subsequence; - SpanContext spanContext; + SpanID spanContext; bool shardChanged = false; // if keyServers has any changes, i.e., shard boundary modifications. // Writes transaction info to the message stream at the given location if diff --git a/fdbserver/MasterInterface.h b/fdbserver/MasterInterface.h index f9c2c506ad..73fc6ef114 100644 --- a/fdbserver/MasterInterface.h +++ b/fdbserver/MasterInterface.h @@ -133,14 +133,14 @@ struct GetCommitVersionReply { struct GetCommitVersionRequest { constexpr static FileIdentifier file_identifier = 16683181; - SpanContext spanContext; + SpanID spanContext; uint64_t requestNum; uint64_t mostRecentProcessedRequestNum; UID requestingProxy; ReplyPromise reply; GetCommitVersionRequest() {} - GetCommitVersionRequest(SpanContext spanContext, + GetCommitVersionRequest(SpanID spanContext, uint64_t requestNum, uint64_t mostRecentProcessedRequestNum, UID requestingProxy) diff --git a/fdbserver/MutationTracking.cpp b/fdbserver/MutationTracking.cpp index fd8f55c313..9ec17299d5 100644 --- a/fdbserver/MutationTracking.cpp +++ b/fdbserver/MutationTracking.cpp @@ -24,7 +24,6 @@ #include "fdbserver/MutationTracking.h" #include "fdbserver/LogProtocolMessage.h" #include "fdbserver/SpanContextMessage.h" -#include "fdbserver/OTELSpanContextMessage.h" #include "fdbclient/SystemData.h" #if defined(FDB_CLEAN_BUILD) && MUTATION_TRACKING_ENABLED #error "You cannot use mutation tracking in a clean/release build." @@ -97,11 +96,6 @@ TraceEvent debugTagsAndMessageEnabled(const char* context, Version version, Stri BinaryReader br(mutationData, AssumeVersion(rdr.protocolVersion())); SpanContextMessage scm; br >> scm; - } else if (OTELSpanContextMessage::startsOTELSpanContextMessage(mutationType)) { - TEST(true); // MutationTracking reading OTELSpanContextMessage - BinaryReader br(mutationData, AssumeVersion(rdr.protocolVersion())); - OTELSpanContextMessage scm; - br >> scm; } else { MutationRef m; BinaryReader br(mutationData, AssumeVersion(rdr.protocolVersion())); diff --git a/fdbserver/OTELSpanContextMessage.h b/fdbserver/OTELSpanContextMessage.h deleted file mode 100644 index 9f6d588fed..0000000000 --- a/fdbserver/OTELSpanContextMessage.h +++ /dev/null @@ -1,66 +0,0 @@ -/* - * OTELSpanContextMessage.h - * - * This source file is part of the FoundationDB open source project - * - * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef FDBSERVER_OTELSPANCONTEXTMESSAGE_H -#define FDBSERVER_OTELSPANCONTEXTMESSAGE_H -#pragma once - -#include "flow/Tracing.h" -#include "fdbclient/FDBTypes.h" -#include "fdbclient/CommitTransaction.h" - -struct OTELSpanContextMessage { - // This message is pushed into the the transaction logs' memory to inform - // it what transaction subsequent mutations were a part of. This allows - // transaction logs and storage servers to associate mutations with a - // transaction identifier, called a span context. - // - // This message is similar to LogProtocolMessage. Storage servers read the - // first byte of this message to uniquely identify it, meaning it will - // never be mistaken for another message. See LogProtocolMessage.h for more - // information. - - SpanContext spanContext; - - OTELSpanContextMessage() {} - OTELSpanContextMessage(SpanContext const& spanContext) : spanContext(spanContext) {} - - std::string toString() const { - return format("code: %d, span context: %s", - MutationRef::Reserved_For_OTELSpanContextMessage, - spanContext.toString().c_str()); - } - - template - void serialize(Ar& ar) { - uint8_t poly = MutationRef::Reserved_For_OTELSpanContextMessage; - serializer(ar, poly, spanContext); - } - - static bool startsOTELSpanContextMessage(uint8_t byte) { - return byte == MutationRef::Reserved_For_OTELSpanContextMessage; - } - template - static bool isNextIn(Ar& ar) { - return startsOTELSpanContextMessage(*(const uint8_t*)ar.peekBytes(1)); - } -}; - -#endif diff --git a/fdbserver/Resolver.actor.cpp b/fdbserver/Resolver.actor.cpp index 91449c6923..d24a0401b5 100644 --- a/fdbserver/Resolver.actor.cpp +++ b/fdbserver/Resolver.actor.cpp @@ -340,8 +340,8 @@ ACTOR Future resolveBatch(Reference self, ResolveTransactionBatc // The condition here must match CommitBatch::applyMetadataToCommittedTransactions() if (reply.committed[t] == ConflictBatch::TransactionCommitted && !self->forceRecovery && SERVER_KNOBS->PROXY_USE_RESOLVER_PRIVATE_MUTATIONS && (!isLocked || req.transactions[t].lock_aware)) { - SpanContext spanContext = - req.transactions[t].spanContext.present() ? req.transactions[t].spanContext.get() : SpanContext(); + SpanID spanContext = + req.transactions[t].spanContext.present() ? req.transactions[t].spanContext.get() : SpanID(); applyMetadataMutations(spanContext, resolverData, req.transactions[t].mutations); } @@ -565,7 +565,7 @@ ACTOR Future processCompleteTransactionStateRequest(TransactionStateResolv ResolverData resolverData( pContext->pResolverData->dbgid, pContext->pTxnStateStore, &pContext->pResolverData->keyInfo, confChanges); - applyMetadataMutations(SpanContext(), resolverData, mutations); + applyMetadataMutations(SpanID(), resolverData, mutations); } // loop auto lockedKey = pContext->pTxnStateStore->readValue(databaseLockedKey).get(); diff --git a/fdbserver/ResolverInterface.h b/fdbserver/ResolverInterface.h index 51110e5c01..782fa2be88 100644 --- a/fdbserver/ResolverInterface.h +++ b/fdbserver/ResolverInterface.h @@ -118,7 +118,7 @@ struct ResolveTransactionBatchRequest { constexpr static FileIdentifier file_identifier = 16462858; Arena arena; - SpanContext spanContext; + SpanID spanContext; Version prevVersion; Version version; // FIXME: ? Version lastReceivedVersion; diff --git a/fdbserver/SimulatedCluster.actor.cpp b/fdbserver/SimulatedCluster.actor.cpp index 3fd22920b0..9e825aa276 100644 --- a/fdbserver/SimulatedCluster.actor.cpp +++ b/fdbserver/SimulatedCluster.actor.cpp @@ -636,7 +636,8 @@ ACTOR Future simulatedFDBDRebooter(Reference logRangeWarningFetcher(Database cx, return Void(); } +struct ProtocolVersionData { + ProtocolVersion runningProtocolVersion; + ProtocolVersion newestProtocolVersion; + ProtocolVersion lowestCompatibleProtocolVersion; + ProtocolVersionData() : runningProtocolVersion(currentProtocolVersion) {} + + ProtocolVersionData(uint64_t newestProtocolVersionValue, uint64_t lowestCompatibleProtocolVersionValue) + : runningProtocolVersion(currentProtocolVersion), newestProtocolVersion(newestProtocolVersionValue), + lowestCompatibleProtocolVersion(lowestCompatibleProtocolVersionValue) {} +}; + +ACTOR Future getNewestProtocolVersion(Database cx, WorkerDetails ccWorker) { + + try { + state Future swVersionF = timeoutError( + ccWorker.interf.eventLogRequest.getReply(EventLogRequest("SWVersionCompatibilityChecked"_sr)), 1.0); + + wait(success(swVersionF)); + const TraceEventFields& swVersionTrace = swVersionF.get(); + int64_t newestProtocolVersionValue = + std::stoull(swVersionTrace.getValue("NewestProtocolVersion").c_str(), nullptr, 16); + int64_t lowestCompatibleProtocolVersionValue = + std::stoull(swVersionTrace.getValue("LowestCompatibleProtocolVersion").c_str(), nullptr, 16); + + return ProtocolVersionData(newestProtocolVersionValue, lowestCompatibleProtocolVersionValue); + } catch (Error& e) { + if (e.code() == error_code_actor_cancelled) + throw; + + TraceEvent(SevWarnAlways, "SWVersionStatusFailed").error(e); + + return ProtocolVersionData(); + } +} + struct LoadConfigurationResult { bool fullReplication; Optional healthyZone; @@ -2880,6 +2916,8 @@ ACTOR Future clusterGetStatus( messages.push_back(message); } + state ProtocolVersionData protocolVersion = wait(getNewestProtocolVersion(cx, ccWorker)); + // construct status information for cluster subsections state int statusCode = (int)RecoveryStatus::END; state JsonBuilderObject recoveryStateStatus = wait( @@ -2917,6 +2955,9 @@ ACTOR Future clusterGetStatus( statusObj["protocol_version"] = format("%" PRIx64, g_network->protocolVersion().version()); statusObj["connection_string"] = coordinators.ccr->getConnectionString().toString(); statusObj["bounce_impact"] = getBounceImpactInfo(statusCode); + statusObj["newest_protocol_version"] = format("%" PRIx64, protocolVersion.newestProtocolVersion.version()); + statusObj["lowest_compatible_protocol_version"] = + format("%" PRIx64, protocolVersion.lowestCompatibleProtocolVersion.version()); state Optional configuration; state Optional loadResult; diff --git a/fdbserver/StorageCache.actor.cpp b/fdbserver/StorageCache.actor.cpp index 8cf24a67d8..a97931a2a8 100644 --- a/fdbserver/StorageCache.actor.cpp +++ b/fdbserver/StorageCache.actor.cpp @@ -18,7 +18,6 @@ * limitations under the License. */ -#include "fdbserver/OTELSpanContextMessage.h" #include "flow/Arena.h" #include "fdbclient/FDBOptions.g.h" #include "fdbclient/NativeAPI.actor.h" @@ -1898,10 +1897,6 @@ ACTOR Future pullAsyncData(StorageCacheData* data) { SpanContextMessage::isNextIn(cloneReader)) { SpanContextMessage scm; cloneReader >> scm; - } else if (cloneReader.protocolVersion().hasOTELSpanContext() && - OTELSpanContextMessage::isNextIn(cloneReader)) { - OTELSpanContextMessage scm; - cloneReader >> scm; } else { MutationRef msg; cloneReader >> msg; @@ -1980,10 +1975,6 @@ ACTOR Future pullAsyncData(StorageCacheData* data) { } else if (reader.protocolVersion().hasSpanContext() && SpanContextMessage::isNextIn(reader)) { SpanContextMessage scm; reader >> scm; - } else if (reader.protocolVersion().hasOTELSpanContext() && OTELSpanContextMessage::isNextIn(reader)) { - TEST(true); // StorageCache reading OTELSpanContextMessage - OTELSpanContextMessage oscm; - reader >> oscm; } else { MutationRef msg; reader >> msg; diff --git a/fdbserver/TLogInterface.h b/fdbserver/TLogInterface.h index 9da4ecedd4..b8ec6899d2 100644 --- a/fdbserver/TLogInterface.h +++ b/fdbserver/TLogInterface.h @@ -296,7 +296,7 @@ struct TLogCommitReply { struct TLogCommitRequest { constexpr static FileIdentifier file_identifier = 4022206; - SpanContext spanContext; + SpanID spanContext; Arena arena; Version prevVersion, version, knownCommittedVersion, minKnownCommittedVersion; @@ -307,7 +307,7 @@ struct TLogCommitRequest { Optional debugID; TLogCommitRequest() {} - TLogCommitRequest(const SpanContext& context, + TLogCommitRequest(const SpanID& context, const Arena& a, Version prevVersion, Version version, diff --git a/fdbserver/TagPartitionedLogSystem.actor.cpp b/fdbserver/TagPartitionedLogSystem.actor.cpp index a3dfa3209c..3ccef6192e 100644 --- a/fdbserver/TagPartitionedLogSystem.actor.cpp +++ b/fdbserver/TagPartitionedLogSystem.actor.cpp @@ -507,7 +507,7 @@ Future TagPartitionedLogSystem::push(Version prevVersion, Version knownCommittedVersion, Version minKnownCommittedVersion, LogPushData& data, - SpanContext const& spanContext, + SpanID const& spanContext, Optional debugID, Optional> tpcvMap) { // FIXME: Randomize request order as in LegacyLogSystem? diff --git a/fdbserver/TagPartitionedLogSystem.actor.h b/fdbserver/TagPartitionedLogSystem.actor.h index eb7c389e5b..baf1a46711 100644 --- a/fdbserver/TagPartitionedLogSystem.actor.h +++ b/fdbserver/TagPartitionedLogSystem.actor.h @@ -191,7 +191,7 @@ struct TagPartitionedLogSystem final : ILogSystem, ReferenceCounted debugID, Optional> tpcvMap) final; diff --git a/fdbserver/masterserver.actor.cpp b/fdbserver/masterserver.actor.cpp index 9bd1660b07..7cb99d8d21 100644 --- a/fdbserver/masterserver.actor.cpp +++ b/fdbserver/masterserver.actor.cpp @@ -120,7 +120,7 @@ struct MasterData : NonCopyable, ReferenceCounted { }; ACTOR Future getVersion(Reference self, GetCommitVersionRequest req) { - state Span span("M:getVersion"_loc, req.spanContext); + state Span span("M:getVersion"_loc, { req.spanContext }); state std::map::iterator proxyItr = self->lastCommitProxyVersionReplies.find(req.requestingProxy); // lastCommitProxyVersionReplies never changes diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index baf6dec861..50936b5950 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -24,10 +24,8 @@ #include #include "contrib/fmt-8.1.1/include/fmt/format.h" -#include "fdbclient/FDBTypes.h" #include "fdbrpc/fdbrpc.h" #include "fdbrpc/LoadBalance.h" -#include "fdbserver/OTELSpanContextMessage.h" #include "flow/ActorCollection.h" #include "flow/Arena.h" #include "flow/Error.h" @@ -1397,8 +1395,8 @@ void updateProcessStats(StorageServer* self) { #pragma region Queries #endif -ACTOR Future waitForVersionActor(StorageServer* data, Version version, SpanContext spanContext) { - state Span span("SS.WaitForVersion"_loc, spanContext); +ACTOR Future waitForVersionActor(StorageServer* data, Version version, SpanID spanContext) { + state Span span("SS.WaitForVersion"_loc, { spanContext }); choose { when(wait(data->version.whenAtLeast(version))) { // FIXME: A bunch of these can block with or without the following delay 0. @@ -1435,7 +1433,7 @@ Version getLatestCommitVersion(VersionVector& ssLatestCommitVersions, Tag& tag) return commitVersion; } -Future waitForVersion(StorageServer* data, Version version, SpanContext spanContext) { +Future waitForVersion(StorageServer* data, Version version, SpanID spanContext) { if (version == latestVersion) { version = std::max(Version(1), data->version.get()); } @@ -1456,10 +1454,7 @@ Future waitForVersion(StorageServer* data, Version version, SpanContext return waitForVersionActor(data, version, spanContext); } -Future waitForVersion(StorageServer* data, - Version commitVersion, - Version readVersion, - SpanContext spanContext) { +Future waitForVersion(StorageServer* data, Version commitVersion, Version readVersion, SpanID spanContext) { ASSERT(commitVersion == invalidVersion || commitVersion < readVersion); if (commitVersion == invalidVersion) { @@ -1533,11 +1528,11 @@ Optional StorageServer::getTenantEntry(Version version, TenantIn ACTOR Future getValueQ(StorageServer* data, GetValueRequest req) { state int64_t resultSize = 0; - Span span("SS:getValue"_loc, req.spanContext); + Span span("SS:getValue"_loc, { req.spanContext }); if (req.tenantInfo.name.present()) { - span.addAttribute("tenant"_sr, req.tenantInfo.name.get()); + span.addTag("tenant"_sr, req.tenantInfo.name.get()); } - span.addAttribute("key"_sr, req.key); + span.addTag("key"_sr, req.key); // Temporarily disabled -- this path is hit a lot // getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.first(); @@ -1670,9 +1665,9 @@ ACTOR Future getValueQ(StorageServer* data, GetValueRequest req) { // must be kept alive until the watch is finished. extern size_t WATCH_OVERHEAD_WATCHQ, WATCH_OVERHEAD_WATCHIMPL; -ACTOR Future watchWaitForValueChange(StorageServer* data, SpanContext parent, KeyRef key) { +ACTOR Future watchWaitForValueChange(StorageServer* data, SpanID parent, KeyRef key) { state Location spanLocation = "SS:watchWaitForValueChange"_loc; - state Span span(spanLocation, parent); + state Span span(spanLocation, { parent }); state Reference metadata = data->getWatchMetadata(key); if (metadata->debugID.present()) @@ -1779,8 +1774,8 @@ void checkCancelWatchImpl(StorageServer* data, WatchValueRequest req) { ACTOR Future watchValueSendReply(StorageServer* data, WatchValueRequest req, Future resp, - SpanContext spanContext) { - state Span span("SS:watchValue"_loc, spanContext); + SpanID spanContext) { + state Span span("SS:watchValue"_loc, { spanContext }); state double startTime = now(); ++data->counters.watchQueries; ++data->numWatches; @@ -2508,7 +2503,7 @@ ACTOR Future stopChangeFeedOnMove(StorageServer* data, ChangeFeedStreamReq } ACTOR Future changeFeedStreamQ(StorageServer* data, ChangeFeedStreamRequest req, UID streamUID) { - state Span span("SS:getChangeFeedStream"_loc, req.spanContext); + state Span span("SS:getChangeFeedStream"_loc, { req.spanContext }); state bool atLatest = false; state bool removeUID = false; state Optional blockedVersion; @@ -2865,7 +2860,7 @@ ACTOR Future readRange(StorageServer* data, KeyRange range, int limit, int* pLimitBytes, - SpanContext parentSpan, + SpanID parentSpan, IKeyValueStore::ReadType type, Optional tenantPrefix) { state GetKeyValuesReply result; @@ -3104,7 +3099,7 @@ ACTOR Future findKey(StorageServer* data, Version version, KeyRange range, int* pOffset, - SpanContext parentSpan, + SpanID parentSpan, IKeyValueStore::ReadType type) // Attempts to find the key indicated by sel in the data at version, within range. // Precondition: selectorInRange(sel, range) @@ -3125,7 +3120,7 @@ ACTOR Future findKey(StorageServer* data, state int sign = forward ? +1 : -1; state bool skipEqualKey = sel.orEqual == forward; state int distance = forward ? sel.offset : 1 - sel.offset; - state Span span("SS.findKey"_loc, parentSpan); + state Span span("SS.findKey"_loc, { parentSpan }); // Don't limit the number of bytes if this is a trivial key selector (there will be at most two items returned from // the read range in this case) @@ -3223,16 +3218,16 @@ ACTOR Future getKeyValuesQ(StorageServer* data, GetKeyValuesRequest req) // Throws a wrong_shard_server if the keys in the request or result depend on data outside this server OR if a large // selector offset prevents all data from being read in one range read { - state Span span("SS:getKeyValues"_loc, req.spanContext); + state Span span("SS:getKeyValues"_loc, { req.spanContext }); state int64_t resultSize = 0; state IKeyValueStore::ReadType type = req.isFetchKeys ? IKeyValueStore::ReadType::FETCH : IKeyValueStore::ReadType::NORMAL; if (req.tenantInfo.name.present()) { - span.addAttribute("tenant"_sr, req.tenantInfo.name.get()); + span.addTag("tenant"_sr, req.tenantInfo.name.get()); } - getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.traceID; + getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.first(); ++data->counters.getRangeQueries; ++data->counters.allQueries; @@ -3716,16 +3711,16 @@ ACTOR Future getMappedKeyValuesQ(StorageServer* data, GetMappedKeyValuesRe // Throws a wrong_shard_server if the keys in the request or result depend on data outside this server OR if a large // selector offset prevents all data from being read in one range read { - state Span span("SS:getMappedKeyValues"_loc, req.spanContext); + state Span span("SS:getMappedKeyValues"_loc, { req.spanContext }); state int64_t resultSize = 0; state IKeyValueStore::ReadType type = req.isFetchKeys ? IKeyValueStore::ReadType::FETCH : IKeyValueStore::ReadType::NORMAL; if (req.tenantInfo.name.present()) { - span.addAttribute("tenant"_sr, req.tenantInfo.name.get()); + span.addTag("tenant"_sr, req.tenantInfo.name.get()); } - getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.traceID; + getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.first(); ++data->counters.getMappedRangeQueries; ++data->counters.allQueries; @@ -3930,13 +3925,13 @@ ACTOR Future getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe // Throws a wrong_shard_server if the keys in the request or result depend on data outside this server OR if a large // selector offset prevents all data from being read in one range read { - state Span span("SS:getKeyValuesStream"_loc, req.spanContext); + state Span span("SS:getKeyValuesStream"_loc, { req.spanContext }); state int64_t resultSize = 0; state IKeyValueStore::ReadType type = req.isFetchKeys ? IKeyValueStore::ReadType::FETCH : IKeyValueStore::ReadType::NORMAL; if (req.tenantInfo.name.present()) { - span.addAttribute("tenant"_sr, req.tenantInfo.name.get()); + span.addTag("tenant"_sr, req.tenantInfo.name.get()); } req.reply.setByteLimit(SERVER_KNOBS->RANGESTREAM_LIMIT_BYTES); @@ -4134,12 +4129,12 @@ ACTOR Future getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe } ACTOR Future getKeyQ(StorageServer* data, GetKeyRequest req) { - state Span span("SS:getKey"_loc, req.spanContext); + state Span span("SS:getKey"_loc, { req.spanContext }); if (req.tenantInfo.name.present()) { - span.addAttribute("tenant"_sr, req.tenantInfo.name.get()); + span.addTag("tenant"_sr, req.tenantInfo.name.get()); } state int64_t resultSize = 0; - getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.traceID; + getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.first(); ++data->counters.getKeyQueries; ++data->counters.allQueries; @@ -6852,10 +6847,6 @@ ACTOR Future update(StorageServer* data, bool* pReceivedUpdate) { SpanContextMessage::isNextIn(cloneReader)) { SpanContextMessage scm; cloneReader >> scm; - } else if (cloneReader.protocolVersion().hasOTELSpanContext() && - OTELSpanContextMessage::isNextIn(cloneReader)) { - OTELSpanContextMessage scm; - cloneReader >> scm; } else { MutationRef msg; cloneReader >> msg; @@ -6938,7 +6929,7 @@ ACTOR Future update(StorageServer* data, bool* pReceivedUpdate) { state Version ver = invalidVersion; cloneCursor2->setProtocolVersion(data->logProtocol); - state SpanContext spanContext = SpanContext(); + state SpanID spanContext = SpanID(); state double beforeTLogMsgsUpdates = now(); state std::set updatedChangeFeeds; for (; cloneCursor2->hasMessage(); cloneCursor2->nextMessage()) { @@ -6972,27 +6963,17 @@ ACTOR Future update(StorageServer* data, bool* pReceivedUpdate) { data->logProtocol = rd.protocolVersion(); data->storage.changeLogProtocol(ver, data->logProtocol); cloneCursor2->setProtocolVersion(rd.protocolVersion()); - spanContext.traceID = UID(); + spanContext = UID(); } else if (rd.protocolVersion().hasSpanContext() && SpanContextMessage::isNextIn(rd)) { SpanContextMessage scm; rd >> scm; - TEST(true); // storageserveractor converting SpanContextMessage into OTEL SpanContext - spanContext = - SpanContext(UID(scm.spanContext.first(), scm.spanContext.second()), - 0, - scm.spanContext.first() != 0 && scm.spanContext.second() != 0 ? TraceFlags::sampled - : TraceFlags::unsampled); - } else if (rd.protocolVersion().hasOTELSpanContext() && OTELSpanContextMessage::isNextIn(rd)) { - TEST(true); // storageserveractor reading OTELSpanContextMessage - OTELSpanContextMessage scm; - rd >> scm; spanContext = scm.spanContext; } else { MutationRef msg; rd >> msg; - Span span("SS:update"_loc, spanContext); - span.addAttribute("key"_sr, msg.param1); + Span span("SS:update"_loc, { spanContext }); + span.addTag("key"_sr, msg.param1); // Drop non-private mutations if TSS fault injection is enabled in simulation, or if this is a TSS in // quarantine. @@ -8424,11 +8405,11 @@ ACTOR Future serveGetKeyRequests(StorageServer* self, FutureStream watchValueWaitForVersion(StorageServer* self, WatchValueRequest req, PromiseStream stream) { - state Span span("SS:watchValueWaitForVersion"_loc, req.spanContext); + state Span span("SS:watchValueWaitForVersion"_loc, { req.spanContext }); if (req.tenantInfo.name.present()) { - span.addAttribute("tenant"_sr, req.tenantInfo.name.get()); + span.addTag("tenant"_sr, req.tenantInfo.name.get()); } - getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.traceID; + getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.first(); try { wait(success(waitForVersionNoTooOld(self, req.version))); Optional entry = self->getTenantEntry(latestVersion, req.tenantInfo); @@ -8446,11 +8427,11 @@ ACTOR Future watchValueWaitForVersion(StorageServer* self, ACTOR Future serveWatchValueRequestsImpl(StorageServer* self, FutureStream stream) { loop { - getCurrentLineage()->modify(&TransactionLineage::txID) = UID(); + getCurrentLineage()->modify(&TransactionLineage::txID) = 0; state WatchValueRequest req = waitNext(stream); state Reference metadata = self->getWatchMetadata(req.key.contents()); - state Span span("SS:serveWatchValueRequestsImpl"_loc, req.spanContext); - getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.traceID; + state Span span("SS:serveWatchValueRequestsImpl"_loc, { req.spanContext }); + getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.first(); // case 1: no watch set for the current key if (!metadata.isValid()) { diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index 610d4f128a..a777588037 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -22,12 +22,18 @@ #include #include +#include "fdbclient/FDBTypes.h" +#include "fdbrpc/IAsyncFile.h" #include "fdbrpc/Locality.h" #include "fdbclient/GlobalConfig.actor.h" #include "fdbclient/ProcessInterface.h" #include "fdbclient/StorageServerInterface.h" #include "fdbserver/Knobs.h" #include "flow/ActorCollection.h" +#include "flow/Error.h" +#include "flow/FileIdentifier.h" +#include "flow/ObjectSerializer.h" +#include "flow/Platform.h" #include "flow/ProtocolVersion.h" #include "flow/SystemMonitor.h" #include "flow/TDMetric.actor.h" @@ -57,7 +63,9 @@ #include "flow/ThreadHelper.actor.h" #include "flow/Trace.h" #include "flow/flow.h" +#include "flow/genericactors.actor.h" #include "flow/network.h" +#include "flow/serialize.h" #ifdef __linux__ #include @@ -2624,6 +2632,291 @@ ACTOR Future monitorAndWriteCCPriorityInfo(std::string filePath, } } +static const std::string versionFileName = "sw-version"; + +ACTOR Future testSoftwareVersionCompatibility(std::string folder, ProtocolVersion currentVersion) { + try { + state std::string versionFilePath = joinPath(folder, versionFileName); + state ErrorOr> versionFile = wait( + errorOr(IAsyncFileSystem::filesystem(g_network)->open(versionFilePath, IAsyncFile::OPEN_READONLY, 0600))); + + if (versionFile.isError()) { + if (versionFile.getError().code() == error_code_file_not_found && !fileExists(versionFilePath)) { + // If a version file does not exist, we assume this is either a fresh + // installation or an upgrade from a version that does not support version files. + // Either way, we can safely continue running this version of software. + TraceEvent(SevInfo, "NoPreviousSWVersion").log(); + return SWVersion(); + } else { + // Dangerous to continue if we cannot do a software compatibility test + throw versionFile.getError(); + } + } else { + // Test whether the most newest software version that has been run on this cluster is + // compatible with the current software version + state int64_t filesize = wait(versionFile.get()->size()); + state Standalone buf = makeString(filesize); + int readLen = wait(versionFile.get()->read(mutateString(buf), filesize, 0)); + if (filesize == 0 || readLen != filesize) { + throw file_corrupt(); + } + + try { + SWVersion swversion = ObjectReader::fromStringRef(buf, IncludeVersion()); + ProtocolVersion lowestCompatibleVersion(swversion.lowestCompatibleProtocolVersion()); + if (currentVersion >= lowestCompatibleVersion) { + return swversion; + } else { + throw incompatible_software_version(); + } + } catch (Error& e) { + throw e; + } + } + } catch (Error& e) { + if (e.code() == error_code_actor_cancelled) { + throw; + } + // TODO(bvr): Inject faults + TraceEvent(SevWarnAlways, "OpenReadSWVersionFileError").error(e); + throw; + } +} + +ACTOR Future updateNewestSoftwareVersion(std::string folder, + ProtocolVersion currentVersion, + ProtocolVersion latestVersion, + ProtocolVersion minCompatibleVersion) { + + ASSERT(currentVersion >= minCompatibleVersion); + + try { + state std::string versionFilePath = joinPath(folder, versionFileName); + ErrorOr> versionFile = wait( + errorOr(IAsyncFileSystem::filesystem(g_network)->open(versionFilePath, IAsyncFile::OPEN_READONLY, 0600))); + + if (versionFile.isError() && + (versionFile.getError().code() != error_code_file_not_found || fileExists(versionFilePath))) { + throw versionFile.getError(); + } + + state Reference newVersionFile = wait(IAsyncFileSystem::filesystem()->open( + versionFilePath, + IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_CREATE | IAsyncFile::OPEN_READWRITE, + 0600)); + + SWVersion swVersion(latestVersion, currentVersion, minCompatibleVersion); + auto s = swVersionValue(swVersion); + ErrorOr e = wait(errorOr(newVersionFile->write(s.toString().c_str(), s.size(), 0))); + if (e.isError()) { + throw e.getError(); + } + wait(newVersionFile->sync()); + } catch (Error& e) { + if (e.code() == error_code_actor_cancelled) { + throw; + } + TraceEvent(SevWarnAlways, "OpenWriteSWVersionFileError").error(e); + throw; + } + + return Void(); +} + +ACTOR Future testAndUpdateSoftwareVersionCompatibility(std::string dataFolder, UID processIDUid) { + ErrorOr swVersion = wait(errorOr(testSoftwareVersionCompatibility(dataFolder, currentProtocolVersion))); + if (swVersion.isError()) { + TraceEvent(SevWarnAlways, "SWVersionCompatibilityCheckError", processIDUid).error(swVersion.getError()); + throw swVersion.getError(); + } + + TraceEvent(SevInfo, "SWVersionCompatible", processIDUid).detail("SWVersion", swVersion.get()); + + if (!swVersion.get().isValid() || + currentProtocolVersion > ProtocolVersion(swVersion.get().newestProtocolVersion())) { + ErrorOr updatedSWVersion = wait(errorOr(updateNewestSoftwareVersion( + dataFolder, currentProtocolVersion, currentProtocolVersion, minCompatibleProtocolVersion))); + if (updatedSWVersion.isError()) { + throw updatedSWVersion.getError(); + } + } else if (currentProtocolVersion < ProtocolVersion(swVersion.get().newestProtocolVersion())) { + ErrorOr updatedSWVersion = wait( + errorOr(updateNewestSoftwareVersion(dataFolder, + currentProtocolVersion, + ProtocolVersion(swVersion.get().newestProtocolVersion()), + ProtocolVersion(swVersion.get().lowestCompatibleProtocolVersion())))); + if (updatedSWVersion.isError()) { + throw updatedSWVersion.getError(); + } + } + + ErrorOr newSWVersion = + wait(errorOr(testSoftwareVersionCompatibility(dataFolder, currentProtocolVersion))); + if (newSWVersion.isError()) { + TraceEvent(SevWarnAlways, "SWVersionCompatibilityCheckError", processIDUid).error(newSWVersion.getError()); + throw newSWVersion.getError(); + } + + TraceEvent(SevInfo, "VerifiedNewSoftwareVersion", processIDUid).detail("SWVersion", newSWVersion.get()); + + return Void(); +} + +static const std::string swversionTestDirName = "sw-version-test"; + +TEST_CASE("/fdbserver/worker/swversion/noversionhistory") { + if (!platform::createDirectory("sw-version-test")) { + TraceEvent(SevWarnAlways, "FailedToCreateDirectory").detail("Directory", "sw-version-test"); + return Void(); + } + + ErrorOr swversion = wait(errorOr( + testSoftwareVersionCompatibility(swversionTestDirName, ProtocolVersion::withStorageInterfaceReadiness()))); + + if (!swversion.isError()) { + ASSERT(!swversion.get().isValid()); + } + + wait(IAsyncFileSystem::filesystem()->deleteFile(joinPath(swversionTestDirName, versionFileName), true)); + + return Void(); +} + +TEST_CASE("/fdbserver/worker/swversion/writeVerifyVersion") { + if (!platform::createDirectory("sw-version-test")) { + TraceEvent(SevWarnAlways, "FailedToCreateDirectory").detail("Directory", "sw-version-test"); + return Void(); + } + + ErrorOr f = wait(errorOr(updateNewestSoftwareVersion(swversionTestDirName, + ProtocolVersion::withStorageInterfaceReadiness(), + ProtocolVersion::withStorageInterfaceReadiness(), + ProtocolVersion::withTSS()))); + + ErrorOr swversion = wait(errorOr( + testSoftwareVersionCompatibility(swversionTestDirName, ProtocolVersion::withStorageInterfaceReadiness()))); + + if (!swversion.isError()) { + ASSERT(swversion.get().newestProtocolVersion() == ProtocolVersion::withStorageInterfaceReadiness().version()); + ASSERT(swversion.get().lastRunProtocolVersion() == ProtocolVersion::withStorageInterfaceReadiness().version()); + ASSERT(swversion.get().lowestCompatibleProtocolVersion() == ProtocolVersion::withTSS().version()); + } + + wait(IAsyncFileSystem::filesystem()->deleteFile(joinPath(swversionTestDirName, versionFileName), true)); + + return Void(); +} + +TEST_CASE("/fdbserver/worker/swversion/runCompatibleOlder") { + if (!platform::createDirectory("sw-version-test")) { + TraceEvent(SevWarnAlways, "FailedToCreateDirectory").detail("Directory", "sw-version-test"); + return Void(); + } + + ErrorOr f = wait(errorOr(updateNewestSoftwareVersion(swversionTestDirName, + ProtocolVersion::withStorageInterfaceReadiness(), + ProtocolVersion::withStorageInterfaceReadiness(), + ProtocolVersion::withTSS()))); + + ErrorOr swversion = wait(errorOr( + testSoftwareVersionCompatibility(swversionTestDirName, ProtocolVersion::withStorageInterfaceReadiness()))); + + if (!swversion.isError()) { + ASSERT(swversion.get().newestProtocolVersion() == ProtocolVersion::withStorageInterfaceReadiness().version()); + ASSERT(swversion.get().lastRunProtocolVersion() == ProtocolVersion::withStorageInterfaceReadiness().version()); + ASSERT(swversion.get().lowestCompatibleProtocolVersion() == ProtocolVersion::withTSS().version()); + + TraceEvent(SevInfo, "UT/swversion/runCompatibleOlder").detail("SWVersion", swversion.get()); + } + + ErrorOr f = wait(errorOr(updateNewestSoftwareVersion(swversionTestDirName, + ProtocolVersion::withTSS(), + ProtocolVersion::withStorageInterfaceReadiness(), + ProtocolVersion::withTSS()))); + + ErrorOr swversion = wait(errorOr( + testSoftwareVersionCompatibility(swversionTestDirName, ProtocolVersion::withStorageInterfaceReadiness()))); + + if (!swversion.isError()) { + ASSERT(swversion.get().newestProtocolVersion() == ProtocolVersion::withStorageInterfaceReadiness().version()); + ASSERT(swversion.get().lastRunProtocolVersion() == ProtocolVersion::withTSS().version()); + ASSERT(swversion.get().lowestCompatibleProtocolVersion() == ProtocolVersion::withTSS().version()); + } + + wait(IAsyncFileSystem::filesystem()->deleteFile(joinPath(swversionTestDirName, versionFileName), true)); + + return Void(); +} + +TEST_CASE("/fdbserver/worker/swversion/runIncompatibleOlder") { + if (!platform::createDirectory("sw-version-test")) { + TraceEvent(SevWarnAlways, "FailedToCreateDirectory").detail("Directory", "sw-version-test"); + return Void(); + } + + ErrorOr f = wait(errorOr(updateNewestSoftwareVersion(swversionTestDirName, + ProtocolVersion::withStorageInterfaceReadiness(), + ProtocolVersion::withStorageInterfaceReadiness(), + ProtocolVersion::withTSS()))); + + ErrorOr swversion = wait(errorOr( + testSoftwareVersionCompatibility(swversionTestDirName, ProtocolVersion::withStorageInterfaceReadiness()))); + + if (!swversion.isError()) { + ASSERT(swversion.get().newestProtocolVersion() == ProtocolVersion::withStorageInterfaceReadiness().version()); + ASSERT(swversion.get().lastRunProtocolVersion() == ProtocolVersion::withStorageInterfaceReadiness().version()); + ASSERT(swversion.get().lowestCompatibleProtocolVersion() == ProtocolVersion::withTSS().version()); + } + + ErrorOr swversion = + wait(errorOr(testSoftwareVersionCompatibility(swversionTestDirName, ProtocolVersion::withCacheRole()))); + + ASSERT(swversion.isError() && swversion.getError().code() == error_code_incompatible_software_version); + + wait(IAsyncFileSystem::filesystem()->deleteFile(joinPath(swversionTestDirName, versionFileName), true)); + + return Void(); +} + +TEST_CASE("/fdbserver/worker/swversion/runNewer") { + if (!platform::createDirectory("sw-version-test")) { + TraceEvent(SevWarnAlways, "FailedToCreateDirectory").detail("Directory", "sw-version-test"); + return Void(); + } + + ErrorOr f = wait(errorOr(updateNewestSoftwareVersion(swversionTestDirName, + ProtocolVersion::withTSS(), + ProtocolVersion::withTSS(), + ProtocolVersion::withCacheRole()))); + + ErrorOr swversion = wait(errorOr( + testSoftwareVersionCompatibility(swversionTestDirName, ProtocolVersion::withStorageInterfaceReadiness()))); + + if (!swversion.isError()) { + ASSERT(swversion.get().newestProtocolVersion() == ProtocolVersion::withTSS().version()); + ASSERT(swversion.get().lastRunProtocolVersion() == ProtocolVersion::withTSS().version()); + ASSERT(swversion.get().lowestCompatibleProtocolVersion() == ProtocolVersion::withCacheRole().version()); + } + + ErrorOr f = wait(errorOr(updateNewestSoftwareVersion(swversionTestDirName, + ProtocolVersion::withStorageInterfaceReadiness(), + ProtocolVersion::withStorageInterfaceReadiness(), + ProtocolVersion::withTSS()))); + + ErrorOr swversion = wait(errorOr( + testSoftwareVersionCompatibility(swversionTestDirName, ProtocolVersion::withStorageInterfaceReadiness()))); + + if (!swversion.isError()) { + ASSERT(swversion.get().newestProtocolVersion() == ProtocolVersion::withStorageInterfaceReadiness().version()); + ASSERT(swversion.get().lastRunProtocolVersion() == ProtocolVersion::withStorageInterfaceReadiness().version()); + ASSERT(swversion.get().lowestCompatibleProtocolVersion() == ProtocolVersion::withTSS().version()); + } + + wait(IAsyncFileSystem::filesystem()->deleteFile(joinPath(swversionTestDirName, versionFileName), true)); + + return Void(); +} + ACTOR Future createAndLockProcessIdFile(std::string folder) { state UID processIDUid; platform::createDirectory(folder); @@ -2923,6 +3216,8 @@ ACTOR Future fdbd(Reference connRecord, localities.set(LocalityData::keyProcessId, processIDUid.toString()); // Only one process can execute on a dataFolder from this point onwards + wait(testAndUpdateSoftwareVersionCompatibility(dataFolder, processIDUid)); + std::string fitnessFilePath = joinPath(dataFolder, "fitness"); auto cc = makeReference>>(); auto ci = makeReference>>(); diff --git a/fdbserver/workloads/ApiWorkload.h b/fdbserver/workloads/ApiWorkload.h index 8f46f7b148..64836e03b6 100644 --- a/fdbserver/workloads/ApiWorkload.h +++ b/fdbserver/workloads/ApiWorkload.h @@ -80,8 +80,8 @@ struct TransactionWrapper : public ReferenceCounted { // Gets the version vector cached in a transaction virtual VersionVector getVersionVector() = 0; - // Gets the spanContext of a transaction - virtual SpanContext getSpanContext() = 0; + // Gets the spanID of a transaction + virtual UID getSpanID() = 0; // Prints debugging messages for a transaction; not implemented for all transaction types virtual void debugTransaction(UID debugId) {} @@ -161,8 +161,8 @@ struct FlowTransactionWrapper : public TransactionWrapper { // Gets the version vector cached in a transaction VersionVector getVersionVector() override { return transaction.getVersionVector(); } - // Gets the spanContext of a transaction - SpanContext getSpanContext() override { return transaction.getSpanContext(); } + // Gets the spanID of a transaction + UID getSpanID() override { return transaction.getSpanID(); } // Prints debugging messages for a transaction void debugTransaction(UID debugId) override { transaction.debugTransaction(debugId); } @@ -229,8 +229,8 @@ struct ThreadTransactionWrapper : public TransactionWrapper { // Gets the version vector cached in a transaction VersionVector getVersionVector() override { return transaction->getVersionVector(); } - // Gets the spanContext of a transaction - SpanContext getSpanContext() override { return transaction->getSpanContext(); } + // Gets the spanID of a transaction + UID getSpanID() override { return transaction->getSpanID(); } void addReadConflictRange(KeyRangeRef const& keys) override { transaction->addReadConflictRange(keys); } }; diff --git a/fdbserver/workloads/ConsistencyCheck.actor.cpp b/fdbserver/workloads/ConsistencyCheck.actor.cpp index 82dfba5ef7..16d39a7f1e 100644 --- a/fdbserver/workloads/ConsistencyCheck.actor.cpp +++ b/fdbserver/workloads/ConsistencyCheck.actor.cpp @@ -873,8 +873,7 @@ struct ConsistencyCheckWorkload : TestWorkload { state Key begin = kr.begin; state Key end = kr.end; state int limitKeyServers = BUGGIFY ? 1 : 100; - state Span span(SpanContext(deterministicRandom()->randomUniqueID(), deterministicRandom()->randomUInt64()), - "WL:ConsistencyCheck"_loc); + state Span span(deterministicRandom()->randomUniqueID(), "WL:ConsistencyCheck"_loc); while (begin < end) { state Reference commitProxyInfo = diff --git a/fdbserver/workloads/Cycle.actor.cpp b/fdbserver/workloads/Cycle.actor.cpp index 81512399f8..1b7131f8e3 100644 --- a/fdbserver/workloads/Cycle.actor.cpp +++ b/fdbserver/workloads/Cycle.actor.cpp @@ -106,9 +106,10 @@ struct CycleWorkload : TestWorkload { state Transaction tr(cx); if (deterministicRandom()->random01() >= self->traceParentProbability) { state Span span("CycleClient"_loc); - //TraceEvent("CycleTracingTransaction", span.context).log(); + // TraceEvent("CycleTracingTransaction", span.context).log(); + TraceEvent("CycleTracingTransaction", span.context).log(); tr.setOption(FDBTransactionOptions::SPAN_PARENT, - BinaryWriter::toValue(span.context, IncludeVersion())); + BinaryWriter::toValue(span.context, Unversioned())); } while (true) { try { diff --git a/fdbserver/workloads/MiniCycle.actor.cpp b/fdbserver/workloads/MiniCycle.actor.cpp index 5b9b48ab2c..b071902a8c 100644 --- a/fdbserver/workloads/MiniCycle.actor.cpp +++ b/fdbserver/workloads/MiniCycle.actor.cpp @@ -174,7 +174,7 @@ struct MiniCycleWorkload : TestWorkload { state Transaction tr(cx); if (deterministicRandom()->random01() >= self->traceParentProbability) { state Span span("MiniCycleClient"_loc); - TraceEvent("MiniCycleTracingTransaction", span.context.traceID).log(); + TraceEvent("MiniCycleTracingTransaction", span.context).log(); tr.setOption(FDBTransactionOptions::SPAN_PARENT, BinaryWriter::toValue(span.context, Unversioned())); } diff --git a/flow/ProtocolVersion.h b/flow/ProtocolVersion.h index 3ad9c3e0b8..eabcb38145 100644 --- a/flow/ProtocolVersion.h +++ b/flow/ProtocolVersion.h @@ -19,8 +19,8 @@ */ #pragma once -#include #include "flow/Trace.h" +#include // This version impacts both communications and the deserialization of certain database and IKeyValueStore keys. // @@ -37,6 +37,9 @@ constexpr uint64_t currentProtocolVersionValue = 0x0FDB00B072000000LL; // than the current version, meaning that we only support downgrades between consecutive release versions. constexpr uint64_t minInvalidProtocolVersionValue = 0x0FDB00B074000000LL; +// The lowest protocol version that can be downgraded to. +constexpr uint64_t minCompatibleProtocolVersionValue = 0x0FDB00B071000000LL; + #define PROTOCOL_VERSION_FEATURE(v, x) \ static_assert((v & 0xF0FFFFLL) == 0 || v < 0x0FDB00B071000000LL, "Unexpected feature protocol version"); \ static_assert(v <= currentProtocolVersionValue, "Feature protocol version too large"); \ @@ -59,6 +62,7 @@ public: // constants static constexpr uint64_t objectSerializerFlag = 0x1000000000000000LL; static constexpr uint64_t compatibleProtocolVersionMask = 0xFFFFFFFFFFFF0000LL; static constexpr uint64_t minValidProtocolVersion = 0x0FDB00A200060001LL; + static constexpr uint64_t invalidProtocolVersion = 0x0FDB00A100000000LL; public: constexpr explicit ProtocolVersion(uint64_t version) : _version(version) {} @@ -74,6 +78,8 @@ public: } constexpr bool isValid() const { return version() >= minValidProtocolVersion; } + constexpr bool isInvalid() const { return version() == invalidProtocolVersion; } + constexpr uint64_t version() const { return _version & versionFlagMask; } constexpr uint64_t versionWithFlags() const { return _version; } @@ -164,7 +170,7 @@ public: // introduced features PROTOCOL_VERSION_FEATURE(0x0FDB00B071010000LL, Tenants); PROTOCOL_VERSION_FEATURE(0x0FDB00B071010000LL, StorageInterfaceReadiness); PROTOCOL_VERSION_FEATURE(0x0FDB00B071010000LL, ResolverPrivateMutations); - PROTOCOL_VERSION_FEATURE(0x0FDB00B072000000LL, OTELSpanContext); + PROTOCOL_VERSION_FEATURE(0x0FDB00B072000000LL, SWVersionTracking); }; template <> @@ -176,6 +182,7 @@ struct Traceable : std::true_type { constexpr ProtocolVersion currentProtocolVersion(currentProtocolVersionValue); constexpr ProtocolVersion minInvalidProtocolVersion(minInvalidProtocolVersionValue); +constexpr ProtocolVersion minCompatibleProtocolVersion(minCompatibleProtocolVersionValue); // This assert is intended to help prevent incrementing the leftmost digits accidentally. It will probably need to // change when we reach version 10. @@ -194,4 +201,47 @@ static_assert(minInvalidProtocolVersion.version() >= // The min invalid protocol version should be the smallest possible protocol version associated with a minor release // version. -static_assert((minInvalidProtocolVersion.version() & 0xFFFFFFLL) == 0, "Unexpected min invalid protocol version"); \ No newline at end of file +static_assert((minInvalidProtocolVersion.version() & 0xFFFFFFLL) == 0, "Unexpected min invalid protocol version"); + +struct SWVersion { + constexpr static FileIdentifier file_identifier = 13943914; + +private: + uint64_t _newestProtocolVersion; + uint64_t _lastRunProtocolVersion; + uint64_t _lowestCompatibleProtocolVersion; + +public: + SWVersion() { + _newestProtocolVersion = 0; + _lastRunProtocolVersion = 0; + _lowestCompatibleProtocolVersion = 0; + } + + SWVersion(ProtocolVersion latestVersion, ProtocolVersion lastVersion, ProtocolVersion minCompatibleVersion) + : _newestProtocolVersion(latestVersion.version()), _lastRunProtocolVersion(lastVersion.version()), + _lowestCompatibleProtocolVersion(minCompatibleVersion.version()) {} + + bool isValid() const { + return (_newestProtocolVersion != 0 && _lastRunProtocolVersion != 0 && _lowestCompatibleProtocolVersion != 0); + } + + uint64_t newestProtocolVersion() const { return _newestProtocolVersion; } + uint64_t lastRunProtocolVersion() const { return _lastRunProtocolVersion; } + uint64_t lowestCompatibleProtocolVersion() const { return _lowestCompatibleProtocolVersion; } + + template + void serialize(Ar& ar) { + serializer(ar, _newestProtocolVersion, _lastRunProtocolVersion, _lowestCompatibleProtocolVersion); + } +}; + +template <> +struct Traceable : std::true_type { + static std::string toString(const SWVersion& swVersion) { + return format("Newest: 0x%016lX, Last: 0x%016lX, MinCompatible: 0x%016lX", + swVersion.newestProtocolVersion(), + swVersion.lastRunProtocolVersion(), + swVersion.lowestCompatibleProtocolVersion()); + } +}; diff --git a/flow/Tracing.actor.cpp b/flow/Tracing.actor.cpp index d24673ca84..144f663b7e 100644 --- a/flow/Tracing.actor.cpp +++ b/flow/Tracing.actor.cpp @@ -19,7 +19,6 @@ */ #include "flow/Tracing.h" -#include "flow/IRandom.h" #include "flow/UnitTest.h" #include "flow/Knobs.h" #include "flow/network.h" @@ -43,11 +42,28 @@ constexpr float kQueueSizeLogInterval = 5.0; struct NoopTracer : ITracer { TracerType type() const override { return TracerType::DISABLED; } void trace(Span const& span) override {} + void trace(OTELSpan const& span) override {} }; struct LogfileTracer : ITracer { TracerType type() const override { return TracerType::LOG_FILE; } void trace(Span const& span) override { + TraceEvent te(SevInfo, "TracingSpan", span.context); + te.detail("Location", span.location.name) + .detail("Begin", format("%.6f", span.begin)) + .detail("End", format("%.6f", span.end)); + if (span.parents.size() == 1) { + te.detail("Parent", *span.parents.begin()); + } else { + for (auto parent : span.parents) { + TraceEvent(SevInfo, "TracingSpanAddParent", span.context).detail("AddParent", parent); + } + } + for (const auto& [key, value] : span.tags) { + TraceEvent(SevInfo, "TracingSpanTag", span.context).detail("Key", key).detail("Value", value); + } + } + void trace(OTELSpan const& span) override { TraceEvent te(SevInfo, "TracingSpan", span.context.traceID); te.detail("SpanID", span.context.spanID) .detail("Location", span.location.name) @@ -167,6 +183,31 @@ struct UDPTracer : public ITracer { // Serializes span fields as an array into the supplied TraceRequest // buffer. void serialize_span(const Span& span, TraceRequest& request) { + // If you change the serialization format here, make sure to update the + // fluentd filter to be able to correctly parse the updated format! See + // the msgpack specification for more info on the bit patterns used + // here. + uint8_t size = 8; + if (span.parents.size() == 0) + --size; + request.write_byte(size | 0b10010000); // write as array + + serialize_string(g_network->getLocalAddress().toString(), request); // ip:port + + serialize_value(span.context.first(), request, 0xcf); // trace id + serialize_value(span.context.second(), request, 0xcf); // token (span id) + + serialize_value(span.begin, request, 0xcb); // start time + serialize_value(span.end - span.begin, request, 0xcb); // duration + + serialize_string(span.location.name.toString(), request); + + serialize_map(span.tags, request); + + serialize_vector(span.parents, request); + } + + void serialize_span(const OTELSpan& span, TraceRequest& request) { uint16_t size = 14; request.write_byte(size | 0b10010000); // write as array serialize_value(span.context.traceID.first(), request, 0xcf); // trace id @@ -233,6 +274,30 @@ private: serialize_string(reinterpret_cast(str.data()), str.size(), request); } + // Writes the given vector of SpanIDs to the request. If the vector is + // empty, the request is not modified. + inline void serialize_vector(const SmallVectorRef& vec, TraceRequest& request) { + int size = vec.size(); + if (size == 0) { + return; + } + if (size <= 15) { + request.write_byte(static_cast(size) | 0b10010000); + } else if (size <= 65535) { + request.write_byte(0xdc); + request.write_byte(reinterpret_cast(&size)[1]); + request.write_byte(reinterpret_cast(&size)[0]); + } else { + TraceEvent(SevWarn, "TracingSpanSerializeVector") + .detail("Failed to MessagePack encode very large vector", size); + ASSERT_WE_THINK(false); + } + + for (const auto& parentContext : vec) { + serialize_value(parentContext.second(), request, 0xcf); + } + } + // Writes the given vector of linked SpanContext's to the request. If the vector is // empty, the request is not modified. inline void serialize_vector(const SmallVectorRef& vec, TraceRequest& request) { @@ -257,7 +322,7 @@ private: // Writes the given vector of linked SpanContext's to the request. If the vector is // empty, the request is not modified. - inline void serialize_vector(const SmallVectorRef& vec, TraceRequest& request) { + inline void serialize_vector(const SmallVectorRef& vec, TraceRequest& request) { int size = vec.size(); if (size <= 15) { request.write_byte(static_cast(size) | 0b10010000); @@ -388,6 +453,12 @@ struct FastUDPTracer : public UDPTracer { request_.reset(); } + void trace(OTELSpan const& span) override { + prepare(span.location.name.size()); + serialize_span(span, request_); + write(); + } + void trace(Span const& span) override { prepare(span.location.name.size()); serialize_span(span, request_); @@ -442,6 +513,28 @@ void openTracer(TracerType type) { ITracer::~ITracer() {} Span& Span::operator=(Span&& o) { + if (begin > 0.0 && context.second() > 0) { + end = g_network->now(); + g_tracer->trace(*this); + } + arena = std::move(o.arena); + context = o.context; + begin = o.begin; + end = o.end; + location = o.location; + parents = std::move(o.parents); + o.begin = 0; + return *this; +} + +Span::~Span() { + if (begin > 0.0 && context.second() > 0) { + end = g_network->now(); + g_tracer->trace(*this); + } +} + +OTELSpan& OTELSpan::operator=(OTELSpan&& o) { if (begin > 0.0 && o.context.isSampled() > 0) { end = g_network->now(); g_tracer->trace(*this); @@ -465,7 +558,7 @@ Span& Span::operator=(Span&& o) { return *this; } -Span::~Span() { +OTELSpan::~OTELSpan() { if (begin > 0.0 && context.isSampled()) { end = g_network->now(); g_tracer->trace(*this); @@ -474,15 +567,16 @@ Span::~Span() { TEST_CASE("/flow/Tracing/CreateOTELSpan") { // Sampling disabled, no parent. - Span notSampled("foo"_loc); + OTELSpan notSampled("foo"_loc); ASSERT(!notSampled.context.isSampled()); // Force Sampling - // Span sampled("foo"_loc, []() { return 1.0; }); - // ASSERT(sampled.context.isSampled()); + OTELSpan sampled("foo"_loc, []() { return 1.0; }); + ASSERT(sampled.context.isSampled()); // Ensure child traceID matches parent, when parent is sampled. - Span childTraceIDMatchesParent("foo"_loc, SpanContext(UID(100, 101), 200, TraceFlags::sampled)); + OTELSpan childTraceIDMatchesParent( + "foo"_loc, []() { return 1.0; }, SpanContext(UID(100, 101), 200, TraceFlags::sampled)); ASSERT(childTraceIDMatchesParent.context.traceID.first() == childTraceIDMatchesParent.parentContext.traceID.first()); ASSERT(childTraceIDMatchesParent.context.traceID.second() == @@ -490,20 +584,22 @@ TEST_CASE("/flow/Tracing/CreateOTELSpan") { // When the parent isn't sampled AND it has legitimate values we should not sample a child, // even if the child was randomly selected for sampling. - Span parentNotSampled("foo"_loc, SpanContext(UID(1, 1), 1, TraceFlags::unsampled)); + OTELSpan parentNotSampled( + "foo"_loc, []() { return 1.0; }, SpanContext(UID(1, 1), 1, TraceFlags::unsampled)); ASSERT(!parentNotSampled.context.isSampled()); // When the parent isn't sampled AND it has zero values for traceID and spanID this means // we should defer to the child as the new root of the trace as there was no actual parent. // If the child was sampled we should send the child trace with a null parent. - // Span noParent("foo"_loc, SpanContext(UID(0, 0), 0, TraceFlags::unsampled)); - // ASSERT(noParent.context.isSampled()); + OTELSpan noParent( + "foo"_loc, []() { return 1.0; }, SpanContext(UID(0, 0), 0, TraceFlags::unsampled)); + ASSERT(noParent.context.isSampled()); return Void(); }; TEST_CASE("/flow/Tracing/AddEvents") { // Use helper method to add an OTELEventRef to an OTELSpan. - Span span1("span_with_event"_loc); + OTELSpan span1("span_with_event"_loc); auto arena = span1.arena; SmallVectorRef attrs; attrs.push_back(arena, KeyValueRef("foo"_sr, "bar"_sr)); @@ -514,14 +610,14 @@ TEST_CASE("/flow/Tracing/AddEvents") { ASSERT(span1.events[0].attributes.begin()->value.toString() == "bar"); // Use helper method to add an OTELEventRef with no attributes to an OTELSpan - Span span2("span_with_event"_loc); + OTELSpan span2("span_with_event"_loc); span2.addEvent(StringRef(span2.arena, LiteralStringRef("commit_succeed")), 1234567.100); ASSERT(span2.events[0].name.toString() == "commit_succeed"); ASSERT(span2.events[0].time == 1234567.100); ASSERT(span2.events[0].attributes.size() == 0); // Add fully constructed OTELEventRef to OTELSpan passed by value. - Span span3("span_with_event"_loc); + OTELSpan span3("span_with_event"_loc); auto s3Arena = span3.arena; SmallVectorRef s3Attrs; s3Attrs.push_back(s3Arena, KeyValueRef("xyz"_sr, "123"_sr)); @@ -540,10 +636,7 @@ TEST_CASE("/flow/Tracing/AddEvents") { }; TEST_CASE("/flow/Tracing/AddAttributes") { - Span span1("span_with_attrs"_loc, - SpanContext(deterministicRandom()->randomUniqueID(), - deterministicRandom()->randomUInt64(), - TraceFlags::sampled)); + OTELSpan span1("span_with_attrs"_loc); auto arena = span1.arena; span1.addAttribute(StringRef(arena, LiteralStringRef("foo")), StringRef(arena, LiteralStringRef("bar"))); span1.addAttribute(StringRef(arena, LiteralStringRef("operation")), StringRef(arena, LiteralStringRef("grv"))); @@ -551,34 +644,25 @@ TEST_CASE("/flow/Tracing/AddAttributes") { ASSERT(span1.attributes[1] == KeyValueRef("foo"_sr, "bar"_sr)); ASSERT(span1.attributes[2] == KeyValueRef("operation"_sr, "grv"_sr)); - Span span2("span_with_attrs"_loc, - SpanContext(deterministicRandom()->randomUniqueID(), - deterministicRandom()->randomUInt64(), - TraceFlags::sampled)); - auto s2Arena = span2.arena; - span2.addAttribute(StringRef(s2Arena, LiteralStringRef("a")), StringRef(s2Arena, LiteralStringRef("1"))) - .addAttribute(StringRef(s2Arena, LiteralStringRef("b")), LiteralStringRef("2")) - .addAttribute(StringRef(s2Arena, LiteralStringRef("c")), LiteralStringRef("3")); + OTELSpan span3("span_with_attrs"_loc); + auto s3Arena = span3.arena; + span3.addAttribute(StringRef(s3Arena, LiteralStringRef("a")), StringRef(s3Arena, LiteralStringRef("1"))) + .addAttribute(StringRef(s3Arena, LiteralStringRef("b")), LiteralStringRef("2")) + .addAttribute(StringRef(s3Arena, LiteralStringRef("c")), LiteralStringRef("3")); - ASSERT_EQ(span2.attributes.size(), 4); // Includes default attribute of "address" - ASSERT(span2.attributes[1] == KeyValueRef("a"_sr, "1"_sr)); - ASSERT(span2.attributes[2] == KeyValueRef("b"_sr, "2"_sr)); - ASSERT(span2.attributes[3] == KeyValueRef("c"_sr, "3"_sr)); + ASSERT_EQ(span3.attributes.size(), 4); // Includes default attribute of "address" + ASSERT(span3.attributes[1] == KeyValueRef("a"_sr, "1"_sr)); + ASSERT(span3.attributes[2] == KeyValueRef("b"_sr, "2"_sr)); + ASSERT(span3.attributes[3] == KeyValueRef("c"_sr, "3"_sr)); return Void(); }; TEST_CASE("/flow/Tracing/AddLinks") { - Span span1("span_with_links"_loc); - ASSERT(!span1.context.isSampled()); - ASSERT(!span1.context.isValid()); + OTELSpan span1("span_with_links"_loc); span1.addLink(SpanContext(UID(100, 101), 200, TraceFlags::sampled)); span1.addLink(SpanContext(UID(200, 201), 300, TraceFlags::unsampled)) .addLink(SpanContext(UID(300, 301), 400, TraceFlags::sampled)); - // Ensure the root span is now sampled and traceID and spanIDs are set. - ASSERT(span1.context.isSampled()); - ASSERT(span1.context.isValid()); - // Ensure links are present. ASSERT(span1.links[0].traceID == UID(100, 101)); ASSERT(span1.links[0].spanID == 200); ASSERT(span1.links[0].m_Flags == TraceFlags::sampled); @@ -589,16 +673,11 @@ TEST_CASE("/flow/Tracing/AddLinks") { ASSERT(span1.links[2].spanID == 400); ASSERT(span1.links[2].m_Flags == TraceFlags::sampled); - Span span2("span_with_links"_loc); - ASSERT(!span2.context.isSampled()); - ASSERT(!span2.context.isValid()); + OTELSpan span2("span_with_links"_loc); auto link1 = SpanContext(UID(1, 1), 1, TraceFlags::sampled); auto link2 = SpanContext(UID(2, 2), 2, TraceFlags::sampled); auto link3 = SpanContext(UID(3, 3), 3, TraceFlags::sampled); span2.addLinks({ link1, link2 }).addLinks({ link3 }); - // Ensure the root span is now sampled and traceID and spanIDs are set. - ASSERT(span2.context.isSampled()); - ASSERT(span2.context.isValid()); ASSERT(span2.links[0].traceID == UID(1, 1)); ASSERT(span2.links[0].spanID == 1); ASSERT(span2.links[0].m_Flags == TraceFlags::sampled); @@ -662,7 +741,7 @@ std::string readMPString(uint8_t* index) { // Windows doesn't like lack of header and declaration of constructor for FastUDPTracer #ifndef WIN32 TEST_CASE("/flow/Tracing/FastUDPMessagePackEncoding") { - Span span1("encoded_span"_loc); + OTELSpan span1("encoded_span"_loc); auto request = TraceRequest{ .buffer = std::make_unique(kTraceBufferSize), .data_size = 0, .buffer_size = kTraceBufferSize }; @@ -674,9 +753,9 @@ TEST_CASE("/flow/Tracing/FastUDPMessagePackEncoding") { // Test - constructor OTELSpan(const Location& location, const SpanContext parent, const SpanContext& link) // Will delegate to other constructors. - Span span2("encoded_span"_loc, - SpanContext(UID(100, 101), 1, TraceFlags::sampled), - { SpanContext(UID(200, 201), 2, TraceFlags::sampled) }); + OTELSpan span2("encoded_span"_loc, + SpanContext(UID(100, 101), 1, TraceFlags::sampled), + SpanContext(UID(200, 201), 2, TraceFlags::sampled)); tracer.serialize_span(span2, request); data = request.buffer.get(); ASSERT(data[0] == 0b10011110); // 14 element array. @@ -722,7 +801,7 @@ TEST_CASE("/flow/Tracing/FastUDPMessagePackEncoding") { request.reset(); // Exercise all fluent interfaces, include links, events, and attributes. - Span span3("encoded_span_3"_loc, SpanContext()); + OTELSpan span3("encoded_span_3"_loc); auto s3Arena = span3.arena; SmallVectorRef attrs; attrs.push_back(s3Arena, KeyValueRef("foo"_sr, "bar"_sr)); @@ -791,7 +870,7 @@ TEST_CASE("/flow/Tracing/FastUDPMessagePackEncoding") { "SGKKUrpIb/7zePhBDi+gzUzyAcbQ2zUbFWI1KNi3zQk58uUG6wWJZkw+GCs7Cc3V" "OUxOljwCJkC4QTgdsbbFhxUC+rtoHV5xAqoTQwR0FXnWigUjP7NtdL6huJUr3qRv" "40c4yUI1a4+P5vJa"; - Span span4; + auto span4 = OTELSpan(); auto location = Location(); location.name = StringRef(span4.arena, longString); span4.location = location; diff --git a/flow/Tracing.h b/flow/Tracing.h index f394e66bf3..c289a73fcc 100644 --- a/flow/Tracing.h +++ b/flow/Tracing.h @@ -33,43 +33,90 @@ inline Location operator"" _loc(const char* str, size_t size) { return Location{ StringRef(reinterpret_cast(str), size) }; } -enum class TraceFlags : uint8_t { unsampled = 0b00000000, sampled = 0b00000001 }; - -inline TraceFlags operator&(TraceFlags lhs, TraceFlags rhs) { - return static_cast(static_cast>(lhs) & - static_cast>(rhs)); -} - -struct SpanContext { - UID traceID; - uint64_t spanID; - TraceFlags m_Flags; - SpanContext() : traceID(UID()), spanID(0), m_Flags(TraceFlags::unsampled) {} - SpanContext(UID traceID, uint64_t spanID, TraceFlags flags) : traceID(traceID), spanID(spanID), m_Flags(flags) {} - SpanContext(UID traceID, uint64_t spanID) : traceID(traceID), spanID(spanID), m_Flags(TraceFlags::unsampled) {} - SpanContext(Arena arena, const SpanContext& span) - : traceID(span.traceID), spanID(span.spanID), m_Flags(span.m_Flags) {} - bool isSampled() const { return (m_Flags & TraceFlags::sampled) == TraceFlags::sampled; } - std::string toString() const { return format("%016llx%016llx%016llx", traceID.first(), traceID.second(), spanID); }; - bool isValid() const { return traceID.first() != 0 && traceID.second() != 0 && spanID != 0; } - - template - void serialize(Ar& ar) { - serializer(ar, traceID, spanID, m_Flags); +struct Span { + Span(SpanID context, Location location, std::initializer_list const& parents = {}) + : context(context), begin(g_network->now()), location(location), parents(arena, parents.begin(), parents.end()) { + if (parents.size() > 0) { + // If the parents' token is 0 (meaning the trace should not be + // recorded), set the child token to 0 as well. Otherwise, generate + // a new, random token. + uint64_t traceId = 0; + if ((*parents.begin()).second() > 0) { + traceId = deterministicRandom()->randomUInt64(); + } + this->context = SpanID((*parents.begin()).first(), traceId); + } } + Span(Location location, std::initializer_list const& parents = {}) + : Span(UID(deterministicRandom()->randomUInt64(), + deterministicRandom()->random01() < FLOW_KNOBS->TRACING_SAMPLE_RATE + ? deterministicRandom()->randomUInt64() + : 0), + location, + parents) {} + Span(Location location, SpanID context) : Span(location, { context }) {} + Span(const Span&) = delete; + Span(Span&& o) { + arena = std::move(o.arena); + context = o.context; + begin = o.begin; + end = o.end; + location = o.location; + parents = std::move(o.parents); + o.context = UID(); + o.begin = 0.0; + o.end = 0.0; + } + Span() {} + ~Span(); + Span& operator=(Span&& o); + Span& operator=(const Span&) = delete; + void swap(Span& other) { + std::swap(arena, other.arena); + std::swap(context, other.context); + std::swap(begin, other.begin); + std::swap(end, other.end); + std::swap(location, other.location); + std::swap(parents, other.parents); + } + + void addParent(SpanID span) { + if (parents.size() == 0) { + uint64_t traceId = 0; + if (span.second() > 0) { + traceId = context.second() == 0 ? deterministicRandom()->randomUInt64() : context.second(); + } + // Use first parent to set trace ID. This is non-ideal for spans + // with multiple parents, because the trace ID will associate the + // span with only one trace. A workaround is to look at the parent + // relationships instead of the trace ID. Another option in the + // future is to keep a list of trace IDs. + context = SpanID(span.first(), traceId); + } + parents.push_back(arena, span); + } + + void addTag(const StringRef& key, const StringRef& value) { tags[key] = value; } + + Arena arena; + UID context = UID(); + double begin = 0.0, end = 0.0; + Location location; + SmallVectorRef parents; + std::unordered_map tags; }; -// Span +// OTELSpan // -// Span is a tracing implementation which, for the most part, complies with the W3C Trace Context specification +// OTELSpan is a tracing implementation which, for the most part, complies with the W3C Trace Context specification // https://www.w3.org/TR/trace-context/ and the OpenTelemetry API // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/api.md. // -// The major differences between Span and the 7.0 Span implementation, which is based off the OpenTracing.io +// The major differences between OTELSpan and the current Span implementation, which is based off the OpenTracing.io // specification https://opentracing.io/ are as follows. // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/api.md#span // -// OpenTelemetry Spans have... +// OTELSpans have... // 1. A SpanContext which consists of 3 attributes. // // TraceId - A valid trace identifier is a 16-byte array with at least one non-zero byte. @@ -99,61 +146,82 @@ enum class SpanKind : uint8_t { INTERNAL = 0, CLIENT = 1, SERVER = 2, PRODUCER = enum class SpanStatus : uint8_t { UNSET = 0, OK = 1, ERR = 2 }; -struct SpanEventRef { - SpanEventRef() {} - SpanEventRef(const StringRef& name, +struct OTELEventRef { + OTELEventRef() {} + OTELEventRef(const StringRef& name, const double& time, const SmallVectorRef& attributes = SmallVectorRef()) : name(name), time(time), attributes(attributes) {} - SpanEventRef(Arena& arena, const SpanEventRef& other) + OTELEventRef(Arena& arena, const OTELEventRef& other) : name(arena, other.name), time(other.time), attributes(arena, other.attributes) {} StringRef name; double time = 0.0; SmallVectorRef attributes; }; -class Span { +class OTELSpan { public: - // Construct a Span with a given context, location, parentContext and optional links. - // - // N.B. While this constructor receives a parentContext it does not overwrite the traceId of the Span's context. - // Therefore it is the responsibility of the caller to ensure the traceID and m_Flags of both the context and - // parentContext are identical if the caller wishes to establish a parent/child relationship between these spans. We - // do this to avoid needless comparisons or copies as this constructor is only called once in NativeAPI.actor.cpp - // and from below in the by the Span(location, parent, links) constructor. The Span(location, parent, links) - // constructor is used broadly and performs the copy of the parent's traceID and m_Flags. - Span(const SpanContext& context, - const Location& location, - const SpanContext& parentContext, - const std::initializer_list& links = {}) + OTELSpan(const SpanContext& context, + const Location& location, + const SpanContext& parentContext, + const std::initializer_list& links = {}) : context(context), location(location), parentContext(parentContext), links(arena, links.begin(), links.end()), begin(g_network->now()) { + // We've simplified the logic here, essentially we're now always setting trace and span ids and relying on the + // TraceFlags to determine if we're sampling. Therefore if the parent is sampled, we simply overwrite this + // span's traceID with the parent trace id. + if (parentContext.isSampled()) { + this->context.traceID = UID(parentContext.traceID.first(), parentContext.traceID.second()); + this->context.m_Flags = TraceFlags::sampled; + } else { + // However there are two other cases. + // 1. A legitamite parent span exists but it was not selected for tracing. + // 2. There is no actual parent, just a default arg parent provided by the constructor AND the "child" span + // was selected for sampling. For case 1. we handle below by marking the child as unsampled. For case 2 we + // needn't do anything, and can rely on the values in this OTELSpan + if (parentContext.traceID.first() != 0 && parentContext.traceID.second() != 0 && + parentContext.spanID != 0) { + this->context.m_Flags = TraceFlags::unsampled; + } + } this->kind = SpanKind::SERVER; this->status = SpanStatus::OK; this->attributes.push_back( this->arena, KeyValueRef("address"_sr, StringRef(this->arena, g_network->getLocalAddress().toString()))); } - // Construct Span with a location, parent, and optional links. - // This constructor copies the parent's traceID creating a parent->child relationship between Spans. - // Additionally we inherit the m_Flags of the parent, thus enabling or disabling sampling to match the parent. - Span(const Location& location, const SpanContext& parent, const std::initializer_list& links = {}) - : Span(SpanContext(parent.traceID, deterministicRandom()->randomUInt64(), parent.m_Flags), - location, - parent, - links) {} + OTELSpan(const Location& location, + const SpanContext& parent = SpanContext(), + const std::initializer_list& links = {}) + : OTELSpan( + SpanContext(UID(deterministicRandom()->randomUInt64(), deterministicRandom()->randomUInt64()), // traceID + deterministicRandom()->randomUInt64(), // spanID + deterministicRandom()->random01() < FLOW_KNOBS->TRACING_SAMPLE_RATE // sampled or unsampled + ? TraceFlags::sampled + : TraceFlags::unsampled), + location, + parent, + links) {} - // Construct Span without parent. Used for creating a root span, or when the parent is not known at construction - // time. - Span(const SpanContext& context, const Location& location) : Span(context, location, SpanContext()) {} + OTELSpan(const Location& location, const SpanContext parent, const SpanContext& link) + : OTELSpan(location, parent, { link }) {} - // We've determined for initial tracing release, spans with only a location will not be traced. - // Generally these are for background processes, some are called infrequently, while others may be high volume. - // TODO: review and address in subsequent PRs. - Span(const Location& location) : location(location), begin(g_network->now()) {} + // NOTE: This constructor is primarly for unit testing until we sort out how to enable/disable a Knob dynamically in + // a test. + OTELSpan(const Location& location, + const std::function& rateProvider, + const SpanContext& parent = SpanContext(), + const std::initializer_list& links = {}) + : OTELSpan(SpanContext(UID(deterministicRandom()->randomUInt64(), deterministicRandom()->randomUInt64()), + deterministicRandom()->randomUInt64(), + deterministicRandom()->random01() < rateProvider() ? TraceFlags::sampled + : TraceFlags::unsampled), + location, + parent, + links) {} - Span(const Span&) = delete; - Span(Span&& o) { + OTELSpan(const OTELSpan&) = delete; + OTELSpan(OTELSpan&& o) { arena = std::move(o.arena); context = o.context; location = o.location; @@ -171,11 +239,11 @@ public: o.end = 0.0; o.status = SpanStatus::UNSET; } - Span() {} - ~Span(); - Span& operator=(Span&& o); - Span& operator=(const Span&) = delete; - void swap(Span& other) { + OTELSpan() {} + ~OTELSpan(); + OTELSpan& operator=(OTELSpan&& o); + OTELSpan& operator=(const OTELSpan&) = delete; + void swap(OTELSpan& other) { std::swap(arena, other.arena); std::swap(context, other.context); std::swap(location, other.location); @@ -188,62 +256,34 @@ public: std::swap(events, other.events); } - Span& addLink(const SpanContext& linkContext) { + OTELSpan& addLink(const SpanContext& linkContext) { links.push_back(arena, linkContext); - // Check if link is sampled, if so sample this span. - if (!context.isSampled() && linkContext.isSampled()) { - context.m_Flags = TraceFlags::sampled; - // If for some reason this span isn't valid, we need to give it a - // traceID and spanID. This case is currently hit in CommitProxyServer - // CommitBatchContext::CommitBatchContext and CommitBatchContext::setupTraceBatch. - if (!context.isValid()) { - context.traceID = deterministicRandom()->randomUniqueID(); - context.spanID = deterministicRandom()->randomUInt64(); - } - } return *this; } - Span& addLinks(const std::initializer_list& linkContexts = {}) { + OTELSpan& addLinks(const std::initializer_list& linkContexts = {}) { for (auto const& sc : linkContexts) { - addLink(sc); + links.push_back(arena, sc); } return *this; } - Span& addEvent(const SpanEventRef& event) { + OTELSpan& addEvent(const OTELEventRef& event) { events.push_back_deep(arena, event); return *this; } - Span& addEvent(const StringRef& name, - const double& time, - const SmallVectorRef& attrs = SmallVectorRef()) { - return addEvent(SpanEventRef(name, time, attrs)); + OTELSpan& addEvent(const StringRef& name, + const double& time, + const SmallVectorRef& attrs = SmallVectorRef()) { + return addEvent(OTELEventRef(name, time, attrs)); } - Span& addAttribute(const StringRef& key, const StringRef& value) { + OTELSpan& addAttribute(const StringRef& key, const StringRef& value) { attributes.push_back_deep(arena, KeyValueRef(key, value)); return *this; } - Span& setParent(const SpanContext& parent) { - parentContext = parent; - context.traceID = parent.traceID; - context.spanID = deterministicRandom()->randomUInt64(); - context.m_Flags = parent.m_Flags; - return *this; - } - - Span& addParentOrLink(const SpanContext& other) { - if (!parentContext.isValid()) { - parentContext = other; - } else { - links.push_back(arena, other); - } - return *this; - } - Arena arena; SpanContext context; Location location; @@ -252,7 +292,7 @@ public: SmallVectorRef links; double begin = 0.0, end = 0.0; SmallVectorRef attributes; // not necessarily sorted - SmallVectorRef events; + SmallVectorRef events; SpanStatus status; }; @@ -271,6 +311,7 @@ struct ITracer { virtual TracerType type() const = 0; // passed ownership to the tracer virtual void trace(Span const& span) = 0; + virtual void trace(OTELSpan const& span) = 0; }; void openTracer(TracerType type); @@ -287,3 +328,16 @@ struct SpannedDeque : Deque { span = std::move(other.span); } }; + +template +struct OTELSpannedDeque : Deque { + OTELSpan span; + explicit OTELSpannedDeque(Location loc) : span(loc) {} + OTELSpannedDeque(OTELSpannedDeque&& other) : Deque(std::move(other)), span(std::move(other.span)) {} + OTELSpannedDeque(OTELSpannedDeque const&) = delete; + OTELSpannedDeque& operator=(OTELSpannedDeque const&) = delete; + OTELSpannedDeque& operator=(OTELSpannedDeque&& other) { + *static_cast*>(this) = std::move(other); + span = std::move(other.span); + } +}; diff --git a/flow/error_definitions.h b/flow/error_definitions.h index b40032d00a..a80bd2d3a8 100755 --- a/flow/error_definitions.h +++ b/flow/error_definitions.h @@ -123,6 +123,7 @@ ERROR( failed_to_progress, 1216, "Process has failed to make sufficient progress ERROR( invalid_cluster_id, 1217, "Attempted to join cluster with a different cluster ID" ) ERROR( restart_cluster_controller, 1218, "Restart cluster controller process" ) ERROR( please_reboot_remote_kv_store, 1219, "Need to reboot the storage engine process as it died abnormally") +ERROR( incompatible_software_version, 1220, "Current software does not support database format" ) // 15xx Platform errors ERROR( platform_error, 1500, "Platform error" ) diff --git a/tests/TestRunner/upgrade_test.py b/tests/TestRunner/upgrade_test.py index 85ffd226e5..7452107ba0 100755 --- a/tests/TestRunner/upgrade_test.py +++ b/tests/TestRunner/upgrade_test.py @@ -148,24 +148,31 @@ class UpgradeTest: local_file = self.binary_path(version, target_bin_name) if (local_file.exists()): return + + # Download to a temporary file and then replace the target file atomically + # to avoid consistency errors in case of multiple tests are downloading the + # same file in parallel + local_file_tmp = Path("{}.{}".format( + str(local_file), random_secret_string(8))) self.download_dir.joinpath(version).mkdir( parents=True, exist_ok=True) remote_file = "{}{}/{}".format(FDB_DOWNLOAD_ROOT, version, remote_bin_name) remote_sha256 = "{}.sha256".format(remote_file) - local_sha256 = Path("{}.sha256".format(local_file)) + local_sha256 = Path("{}.sha256".format(local_file_tmp)) for attempt_cnt in range(MAX_DOWNLOAD_ATTEMPTS): - print("Downloading '{}' to '{}'...".format(remote_file, local_file)) - request.urlretrieve(remote_file, local_file) + print("Downloading '{}' to '{}'...".format( + remote_file, local_file_tmp)) + request.urlretrieve(remote_file, local_file_tmp) print("Downloading '{}' to '{}'...".format( remote_sha256, local_sha256)) request.urlretrieve(remote_sha256, local_sha256) print("Download complete") - assert local_file.exists(), "{} does not exist".format(local_file) + assert local_file_tmp.exists(), "{} does not exist".format(local_file_tmp) assert local_sha256.exists(), "{} does not exist".format(local_sha256) expected_checksum = read_to_str(local_sha256) - actual_checkum = compute_sha256(local_file) + actual_checkum = compute_sha256(local_file_tmp) if (expected_checksum == actual_checkum): print("Checksum OK") break @@ -173,7 +180,10 @@ class UpgradeTest: expected_checksum, actual_checkum)) if attempt_cnt == MAX_DOWNLOAD_ATTEMPTS-1: assert False, "Failed to download {} after {} attempts".format( - local_file, MAX_DOWNLOAD_ATTEMPTS) + local_file_tmp, MAX_DOWNLOAD_ATTEMPTS) + + os.rename(local_file_tmp, local_file) + os.remove(local_sha256) if makeExecutable: make_executable(local_file)