Merge branch 'main' of https://github.com/apple/foundationdb into readaware

This commit is contained in:
Xiaoxi Wang 2022-05-03 13:37:56 -07:00
commit 269d85daa8
69 changed files with 954 additions and 743 deletions

View File

@ -171,6 +171,13 @@ futures must apply the following rules to the result:
the language binding. Make sure the API returns without error. Finally
push the string "GOT_ESTIMATED_RANGE_SIZE" onto the stack.
#### GET_RANGE_SPLIT_POINTS
Pops the top three items off of the stack as BEGIN_KEY, END_KEY and
CHUNK_SIZE. Then call the `getRangeSplitPoints` API of the language
binding. Make sure the API returns without error. Finally push the string
"GOT_RANGE_SPLIT_POINTS" onto the stack.
#### GET_KEY (_SNAPSHOT, _DATABASE)
Pops the top four items off of the stack as KEY, OR_EQUAL, OFFSET, PREFIX

View File

@ -169,7 +169,7 @@ public:
// if there was an error or not all loads finished, delete data
for (auto& it : loadsInProgress) {
uint8_t* dataToFree = it.second;
delete dataToFree;
delete[] dataToFree;
}
}
};
@ -203,7 +203,7 @@ static void granule_free_load(int64_t loadId, void* context) {
TesterGranuleContext* ctx = (TesterGranuleContext*)context;
auto it = ctx->loadsInProgress.find(loadId);
uint8_t* dataToFree = it->second;
delete dataToFree;
delete[] dataToFree;
ctx->loadsInProgress.erase(it);
}

View File

@ -44,6 +44,8 @@
#include "fdbclient/Tuple.h"
#include "flow/config.h"
#include "flow/DeterministicRandom.h"
#include "flow/IRandom.h"
#include "fdb_api.hpp"
@ -2021,15 +2023,17 @@ TEST_CASE("fdb_transaction_add_conflict_range") {
TEST_CASE("special-key-space valid transaction ID") {
auto value = get_value("\xff\xff/tracing/transaction_id", /* snapshot */ false, {});
REQUIRE(value.has_value());
uint64_t transaction_id = std::stoul(value.value());
CHECK(transaction_id > 0);
UID transaction_id = UID::fromString(value.value());
CHECK(transaction_id.first() > 0);
CHECK(transaction_id.second() > 0);
}
TEST_CASE("special-key-space custom transaction ID") {
fdb::Transaction tr(db);
fdb_check(tr.set_option(FDB_TR_OPTION_SPECIAL_KEY_SPACE_ENABLE_WRITES, nullptr, 0));
while (1) {
tr.set("\xff\xff/tracing/transaction_id", std::to_string(ULONG_MAX));
UID randomTransactionID = UID(deterministicRandom()->randomUInt64(), deterministicRandom()->randomUInt64());
tr.set("\xff\xff/tracing/transaction_id", randomTransactionID.toString());
fdb::ValueFuture f1 = tr.get("\xff\xff/tracing/transaction_id",
/* snapshot */ false);
@ -2046,8 +2050,8 @@ TEST_CASE("special-key-space custom transaction ID") {
fdb_check(f1.get(&out_present, (const uint8_t**)&val, &vallen));
REQUIRE(out_present);
uint64_t transaction_id = std::stoul(std::string(val, vallen));
CHECK(transaction_id == ULONG_MAX);
UID transaction_id = UID::fromString(val);
CHECK(transaction_id == randomTransactionID);
break;
}
}
@ -2074,8 +2078,9 @@ TEST_CASE("special-key-space set transaction ID after write") {
fdb_check(f1.get(&out_present, (const uint8_t**)&val, &vallen));
REQUIRE(out_present);
uint64_t transaction_id = std::stoul(std::string(val, vallen));
CHECK(transaction_id != 0);
UID transaction_id = UID::fromString(val);
CHECK(transaction_id.first() > 0);
CHECK(transaction_id.second() > 0);
break;
}
}
@ -2140,7 +2145,9 @@ TEST_CASE("special-key-space tracing get range") {
CHECK(out_count == 2);
CHECK(std::string((char*)out_kv[1].key, out_kv[1].key_length) == tracingBegin + "transaction_id");
CHECK(std::stoul(std::string((char*)out_kv[1].value, out_kv[1].value_length)) > 0);
UID transaction_id = UID::fromString(std::string((char*)out_kv[1].value));
CHECK(transaction_id.first() > 0);
CHECK(transaction_id.second() > 0);
break;
}
}

View File

@ -84,6 +84,27 @@ public class MappedKeyValue extends KeyValue {
return b;
}
@Override
public boolean equals(Object obj) {
if (obj == null)
return false;
if (obj == this)
return true;
if (!(obj instanceof MappedKeyValue))
return false;
MappedKeyValue rhs = (MappedKeyValue) obj;
return Arrays.equals(rangeBegin, rhs.rangeBegin)
&& Arrays.equals(rangeEnd, rhs.rangeEnd)
&& Objects.equals(rangeResult, rhs.rangeResult);
}
@Override
public int hashCode() {
int hashForResult = rangeResult == null ? 0 : rangeResult.hashCode();
return 17 + (29 * hashForResult + 37 * Arrays.hashCode(rangeBegin) + Arrays.hashCode(rangeEnd));
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("MappedKeyValue{");

View File

@ -433,7 +433,9 @@ public interface ReadTransaction extends ReadTransactionContext {
*
* @param begin the beginning of the range (inclusive)
* @param end the end of the range (exclusive)
* @param mapper TODO
* @param mapper defines how to map a key-value pair (one of the key-value pairs got
* from the first range query) to a GetRange (or GetValue) request.
* more details: https://github.com/apple/foundationdb/wiki/Everything-about-GetMappedRange
* @param limit the maximum number of results to return. Limits results to the
* <i>first</i> keys in the range. Pass {@link #ROW_LIMIT_UNLIMITED} if this query
* should not limit the number of results. If {@code reverse} is {@code true} rows

View File

@ -279,6 +279,36 @@ ACTOR Future<bool> configureCommandActor(Reference<IDatabase> db,
return ret;
}
void configureGenerator(const char* text,
const char* line,
std::vector<std::string>& lc,
std::vector<StringRef> const& tokens) {
const char* opts[] = { "new",
"single",
"double",
"triple",
"three_data_hall",
"three_datacenter",
"ssd",
"ssd-1",
"ssd-2",
"memory",
"memory-1",
"memory-2",
"memory-radixtree-beta",
"commit_proxies=",
"grv_proxies=",
"logs=",
"resolvers=",
"perpetual_storage_wiggle=",
"perpetual_storage_wiggle_locality=",
"storage_migration_type=",
"tenant_mode=",
"blob_granules_enabled=",
nullptr };
arrayGenerator(text, line, opts, lc);
}
CommandFactory configureFactory(
"configure",
CommandHelp(
@ -322,6 +352,7 @@ CommandFactory configureFactory(
"optional, then transactions can be run with or without specifying tenants. If required, all data must be "
"accessed using tenants.\n\n"
"See the FoundationDB Administration Guide for more information."));
"See the FoundationDB Administration Guide for more information."),
&configureGenerator);
} // namespace fdb_cli

View File

@ -95,6 +95,14 @@ ACTOR Future<bool> killCommandActor(Reference<IDatabase> db,
return result;
}
void killGenerator(const char* text,
const char* line,
std::vector<std::string>& lc,
std::vector<StringRef> const& tokens) {
const char* opts[] = { "all", "list", nullptr };
arrayGenerator(text, line, opts, lc);
}
CommandFactory killFactory(
"kill",
CommandHelp(
@ -103,5 +111,6 @@ CommandFactory killFactory(
"If no addresses are specified, populates the list of processes which can be killed. Processes cannot be "
"killed before this list has been populated.\n\nIf `all' is specified, attempts to kill all known "
"processes.\n\nIf `list' is specified, displays all known processes. This is only useful when the database is "
"unresponsive.\n\nFor each IP:port pair in <ADDRESS ...>, attempt to kill the specified process."));
"unresponsive.\n\nFor each IP:port pair in <ADDRESS ...>, attempt to kill the specified process."),
&killGenerator);
} // namespace fdb_cli

View File

@ -1250,6 +1250,16 @@ ACTOR Future<bool> statusCommandActor(Reference<IDatabase> db,
return true;
}
void statusGenerator(const char* text,
const char* line,
std::vector<std::string>& lc,
std::vector<StringRef> const& tokens) {
if (tokens.size() == 1) {
const char* opts[] = { "minimal", "details", "json", nullptr };
arrayGenerator(text, line, opts, lc);
}
}
CommandFactory statusFactory(
"status",
CommandHelp("status [minimal|details|json]",
@ -1258,5 +1268,6 @@ CommandFactory statusFactory(
"what is wrong. If the cluster is running, this command will print cluster "
"statistics.\n\nSpecifying `minimal' will provide a minimal description of the status of your "
"database.\n\nSpecifying `details' will provide load information for individual "
"workers.\n\nSpecifying `json' will provide status information in a machine readable JSON format."));
"workers.\n\nSpecifying `json' will provide status information in a machine readable JSON format."),
&statusGenerator);
} // namespace fdb_cli

View File

@ -310,10 +310,104 @@ ACTOR Future<bool> throttleCommandActor(Reference<IDatabase> db, std::vector<Str
return true;
}
void throttleGenerator(const char* text,
const char* line,
std::vector<std::string>& lc,
std::vector<StringRef> const& tokens) {
if (tokens.size() == 1) {
const char* opts[] = { "on tag", "off", "enable auto", "disable auto", "list", nullptr };
arrayGenerator(text, line, opts, lc);
} else if (tokens.size() >= 2 && tokencmp(tokens[1], "on")) {
if (tokens.size() == 2) {
const char* opts[] = { "tag", nullptr };
arrayGenerator(text, line, opts, lc);
} else if (tokens.size() == 6) {
const char* opts[] = { "default", "immediate", "batch", nullptr };
arrayGenerator(text, line, opts, lc);
}
} else if (tokens.size() >= 2 && tokencmp(tokens[1], "off") && !tokencmp(tokens[tokens.size() - 1], "tag")) {
const char* opts[] = { "all", "auto", "manual", "tag", "default", "immediate", "batch", nullptr };
arrayGenerator(text, line, opts, lc);
} else if (tokens.size() == 2 && (tokencmp(tokens[1], "enable") || tokencmp(tokens[1], "disable"))) {
const char* opts[] = { "auto", nullptr };
arrayGenerator(text, line, opts, lc);
} else if (tokens.size() >= 2 && tokencmp(tokens[1], "list")) {
if (tokens.size() == 2) {
const char* opts[] = { "throttled", "recommended", "all", nullptr };
arrayGenerator(text, line, opts, lc);
} else if (tokens.size() == 3) {
const char* opts[] = { "LIMITS", nullptr };
arrayGenerator(text, line, opts, lc);
}
}
}
std::vector<const char*> throttleHintGenerator(std::vector<StringRef> const& tokens, bool inArgument) {
if (tokens.size() == 1) {
return { "<on|off|enable auto|disable auto|list>", "[ARGS]" };
} else if (tokencmp(tokens[1], "on")) {
std::vector<const char*> opts = { "tag", "<TAG>", "[RATE]", "[DURATION]", "[default|immediate|batch]" };
if (tokens.size() == 2) {
return opts;
} else if (((tokens.size() == 3 && inArgument) || tokencmp(tokens[2], "tag")) && tokens.size() < 7) {
return std::vector<const char*>(opts.begin() + tokens.size() - 2, opts.end());
}
} else if (tokencmp(tokens[1], "off")) {
if (tokencmp(tokens[tokens.size() - 1], "tag")) {
return { "<TAG>" };
} else {
bool hasType = false;
bool hasTag = false;
bool hasPriority = false;
for (int i = 2; i < tokens.size(); ++i) {
if (tokencmp(tokens[i], "all") || tokencmp(tokens[i], "auto") || tokencmp(tokens[i], "manual")) {
hasType = true;
} else if (tokencmp(tokens[i], "default") || tokencmp(tokens[i], "immediate") ||
tokencmp(tokens[i], "batch")) {
hasPriority = true;
} else if (tokencmp(tokens[i], "tag")) {
hasTag = true;
++i;
} else {
return {};
}
}
std::vector<const char*> options;
if (!hasType) {
options.push_back("[all|auto|manual]");
}
if (!hasTag) {
options.push_back("[tag <TAG>]");
}
if (!hasPriority) {
options.push_back("[default|immediate|batch]");
}
return options;
}
} else if ((tokencmp(tokens[1], "enable") || tokencmp(tokens[1], "disable")) && tokens.size() == 2) {
return { "auto" };
} else if (tokens.size() >= 2 && tokencmp(tokens[1], "list")) {
if (tokens.size() == 2) {
return { "[throttled|recommended|all]", "[LIMITS]" };
} else if (tokens.size() == 3 && (tokencmp(tokens[2], "throttled") || tokencmp(tokens[2], "recommended") ||
tokencmp(tokens[2], "all"))) {
return { "[LIMITS]" };
}
} else if (tokens.size() == 2 && inArgument) {
return { "[ARGS]" };
}
return std::vector<const char*>();
}
CommandFactory throttleFactory(
"throttle",
CommandHelp("throttle <on|off|enable auto|disable auto|list> [ARGS]",
"view and control throttled tags",
"Use `on' and `off' to manually throttle or unthrottle tags. Use `enable auto' or `disable auto' "
"to enable or disable automatic tag throttling. Use `list' to print the list of throttled tags.\n"));
"to enable or disable automatic tag throttling. Use `list' to print the list of throttled tags.\n"),
&throttleGenerator,
&throttleHintGenerator);
} // namespace fdb_cli

View File

@ -758,6 +758,7 @@ void optionGenerator(const char* text, const char* line, std::vector<std::string
}
}
namespace fdb_cli {
void arrayGenerator(const char* text, const char* line, const char** options, std::vector<std::string>& lc) {
const char** iter = options;
int len = strlen(text);
@ -770,81 +771,13 @@ void arrayGenerator(const char* text, const char* line, const char** options, st
}
}
}
} // namespace fdb_cli
void onOffGenerator(const char* text, const char* line, std::vector<std::string>& lc) {
const char* opts[] = { "on", "off", nullptr };
arrayGenerator(text, line, opts, lc);
}
void configureGenerator(const char* text, const char* line, std::vector<std::string>& lc) {
const char* opts[] = { "new",
"single",
"double",
"triple",
"three_data_hall",
"three_datacenter",
"ssd",
"ssd-1",
"ssd-2",
"memory",
"memory-1",
"memory-2",
"memory-radixtree-beta",
"commit_proxies=",
"grv_proxies=",
"logs=",
"resolvers=",
"perpetual_storage_wiggle=",
"perpetual_storage_wiggle_locality=",
"storage_migration_type=",
"tenant_mode=",
"blob_granules_enabled=",
nullptr };
arrayGenerator(text, line, opts, lc);
}
void statusGenerator(const char* text, const char* line, std::vector<std::string>& lc) {
const char* opts[] = { "minimal", "details", "json", nullptr };
arrayGenerator(text, line, opts, lc);
}
void killGenerator(const char* text, const char* line, std::vector<std::string>& lc) {
const char* opts[] = { "all", "list", nullptr };
arrayGenerator(text, line, opts, lc);
}
void throttleGenerator(const char* text,
const char* line,
std::vector<std::string>& lc,
std::vector<StringRef> const& tokens) {
if (tokens.size() == 1) {
const char* opts[] = { "on tag", "off", "enable auto", "disable auto", "list", nullptr };
arrayGenerator(text, line, opts, lc);
} else if (tokens.size() >= 2 && tokencmp(tokens[1], "on")) {
if (tokens.size() == 2) {
const char* opts[] = { "tag", nullptr };
arrayGenerator(text, line, opts, lc);
} else if (tokens.size() == 6) {
const char* opts[] = { "default", "immediate", "batch", nullptr };
arrayGenerator(text, line, opts, lc);
}
} else if (tokens.size() >= 2 && tokencmp(tokens[1], "off") && !tokencmp(tokens[tokens.size() - 1], "tag")) {
const char* opts[] = { "all", "auto", "manual", "tag", "default", "immediate", "batch", nullptr };
arrayGenerator(text, line, opts, lc);
} else if (tokens.size() == 2 && (tokencmp(tokens[1], "enable") || tokencmp(tokens[1], "disable"))) {
const char* opts[] = { "auto", nullptr };
arrayGenerator(text, line, opts, lc);
} else if (tokens.size() >= 2 && tokencmp(tokens[1], "list")) {
if (tokens.size() == 2) {
const char* opts[] = { "throttled", "recommended", "all", nullptr };
arrayGenerator(text, line, opts, lc);
} else if (tokens.size() == 3) {
const char* opts[] = { "LIMITS", nullptr };
arrayGenerator(text, line, opts, lc);
}
}
}
void fdbcliCompCmd(std::string const& text, std::vector<std::string>& lc) {
bool err, partial;
std::string whole_line = text;
@ -892,81 +825,10 @@ void fdbcliCompCmd(std::string const& text, std::vector<std::string>& lc) {
onOffGenerator(ntext.c_str(), base_input.c_str(), lc);
}
if (tokencmp(tokens[0], "configure")) {
configureGenerator(ntext.c_str(), base_input.c_str(), lc);
auto itr = CommandFactory::completionGenerators().find(tokens[0].toString());
if (itr != CommandFactory::completionGenerators().end()) {
itr->second(ntext.c_str(), base_input.c_str(), lc, tokens);
}
if (tokencmp(tokens[0], "status") && count == 1) {
statusGenerator(ntext.c_str(), base_input.c_str(), lc);
}
if (tokencmp(tokens[0], "kill") && count == 1) {
killGenerator(ntext.c_str(), base_input.c_str(), lc);
}
if (tokencmp(tokens[0], "throttle")) {
throttleGenerator(ntext.c_str(), base_input.c_str(), lc, tokens);
}
}
std::vector<const char*> throttleHintGenerator(std::vector<StringRef> const& tokens, bool inArgument) {
if (tokens.size() == 1) {
return { "<on|off|enable auto|disable auto|list>", "[ARGS]" };
} else if (tokencmp(tokens[1], "on")) {
std::vector<const char*> opts = { "tag", "<TAG>", "[RATE]", "[DURATION]", "[default|immediate|batch]" };
if (tokens.size() == 2) {
return opts;
} else if (((tokens.size() == 3 && inArgument) || tokencmp(tokens[2], "tag")) && tokens.size() < 7) {
return std::vector<const char*>(opts.begin() + tokens.size() - 2, opts.end());
}
} else if (tokencmp(tokens[1], "off")) {
if (tokencmp(tokens[tokens.size() - 1], "tag")) {
return { "<TAG>" };
} else {
bool hasType = false;
bool hasTag = false;
bool hasPriority = false;
for (int i = 2; i < tokens.size(); ++i) {
if (tokencmp(tokens[i], "all") || tokencmp(tokens[i], "auto") || tokencmp(tokens[i], "manual")) {
hasType = true;
} else if (tokencmp(tokens[i], "default") || tokencmp(tokens[i], "immediate") ||
tokencmp(tokens[i], "batch")) {
hasPriority = true;
} else if (tokencmp(tokens[i], "tag")) {
hasTag = true;
++i;
} else {
return {};
}
}
std::vector<const char*> options;
if (!hasType) {
options.push_back("[all|auto|manual]");
}
if (!hasTag) {
options.push_back("[tag <TAG>]");
}
if (!hasPriority) {
options.push_back("[default|immediate|batch]");
}
return options;
}
} else if ((tokencmp(tokens[1], "enable") || tokencmp(tokens[1], "disable")) && tokens.size() == 2) {
return { "auto" };
} else if (tokens.size() >= 2 && tokencmp(tokens[1], "list")) {
if (tokens.size() == 2) {
return { "[throttled|recommended|all]", "[LIMITS]" };
} else if (tokens.size() == 3 && (tokencmp(tokens[2], "throttled") || tokencmp(tokens[2], "recommended") ||
tokencmp(tokens[2], "all"))) {
return { "[LIMITS]" };
}
} else if (tokens.size() == 2 && inArgument) {
return { "[ARGS]" };
}
return std::vector<const char*>();
}
void LogCommand(std::string line, UID randomID, std::string errMsg) {
@ -2080,8 +1942,9 @@ ACTOR Future<int> runCli(CLIOptions opt) {
bool inArgument = *(line.end() - 1) != ' ';
std::string hintLine = inArgument ? " " : "";
if (tokencmp(command, "throttle")) {
std::vector<const char*> hintItems = throttleHintGenerator(parsed.back(), inArgument);
auto itr = CommandFactory::hintGenerators().find(command.toString());
if (itr != CommandFactory::hintGenerators().end()) {
std::vector<const char*> hintItems = itr->second(parsed.back(), inArgument);
if (hintItems.empty()) {
return LineNoise::Hint();
}

View File

@ -47,8 +47,28 @@ struct CommandHelp {
CommandHelp(const char* u, const char* s, const char* l) : usage(u), short_desc(s), long_desc(l) {}
};
void arrayGenerator(const char* text, const char* line, const char** options, std::vector<std::string>& lc);
struct CommandFactory {
CommandFactory(const char* name, CommandHelp help) { commands()[name] = help; }
typedef void (*CompletionGeneratorFunc)(const char* text,
const char* line,
std::vector<std::string>& lc,
std::vector<StringRef> const& tokens);
typedef std::vector<const char*> (*HintGeneratorFunc)(std::vector<StringRef> const& tokens, bool inArgument);
CommandFactory(const char* name,
CommandHelp help,
CompletionGeneratorFunc completionFunc = nullptr,
HintGeneratorFunc hintFunc = nullptr) {
commands()[name] = help;
if (completionFunc) {
completionGenerators()[name] = completionFunc;
}
if (hintFunc) {
hintGenerators()[name] = hintFunc;
}
}
CommandFactory(const char* name) { hiddenCommands().insert(name); }
static std::map<std::string, CommandHelp>& commands() {
static std::map<std::string, CommandHelp> helpMap;
@ -58,6 +78,14 @@ struct CommandFactory {
static std::set<std::string> commands;
return commands;
}
static std::map<std::string, CompletionGeneratorFunc>& completionGenerators() {
static std::map<std::string, CompletionGeneratorFunc> completionMap;
return completionMap;
}
static std::map<std::string, HintGeneratorFunc>& hintGenerators() {
static std::map<std::string, HintGeneratorFunc> hintMap;
return hintMap;
}
};
// Special keys used by fdbcli commands

View File

@ -162,7 +162,7 @@ struct CommitTransactionRequest : TimedRequest {
bool firstInBatch() const { return (flags & FLAG_FIRST_IN_BATCH) != 0; }
Arena arena;
SpanID spanContext;
SpanContext spanContext;
CommitTransactionRef transaction;
ReplyPromise<CommitID> reply;
uint32_t flags;
@ -172,8 +172,8 @@ struct CommitTransactionRequest : TimedRequest {
TenantInfo tenantInfo;
CommitTransactionRequest() : CommitTransactionRequest(SpanID()) {}
CommitTransactionRequest(SpanID const& context) : spanContext(context), flags(0) {}
CommitTransactionRequest() : CommitTransactionRequest(SpanContext()) {}
CommitTransactionRequest(SpanContext const& context) : spanContext(context), flags(0) {}
template <class Ar>
void serialize(Ar& ar) {
@ -242,7 +242,7 @@ struct GetReadVersionRequest : TimedRequest {
FLAG_PRIORITY_MASK = PRIORITY_SYSTEM_IMMEDIATE,
};
SpanID spanContext;
SpanContext spanContext;
uint32_t transactionCount;
uint32_t flags;
TransactionPriority priority;
@ -255,7 +255,7 @@ struct GetReadVersionRequest : TimedRequest {
Version maxVersion; // max version in the client's version vector cache
GetReadVersionRequest() : transactionCount(1), flags(0), maxVersion(invalidVersion) {}
GetReadVersionRequest(SpanID spanContext,
GetReadVersionRequest(SpanContext spanContext,
uint32_t transactionCount,
TransactionPriority priority,
Version maxVersion,
@ -325,7 +325,7 @@ struct GetKeyServerLocationsReply {
struct GetKeyServerLocationsRequest {
constexpr static FileIdentifier file_identifier = 9144680;
Arena arena;
SpanID spanContext;
SpanContext spanContext;
Optional<TenantNameRef> tenant;
KeyRef begin;
Optional<KeyRef> end;
@ -340,7 +340,7 @@ struct GetKeyServerLocationsRequest {
Version minTenantVersion;
GetKeyServerLocationsRequest() : limit(0), reverse(false), minTenantVersion(latestVersion) {}
GetKeyServerLocationsRequest(SpanID spanContext,
GetKeyServerLocationsRequest(SpanContext spanContext,
Optional<TenantNameRef> const& tenant,
KeyRef const& begin,
Optional<KeyRef> const& end,
@ -378,12 +378,12 @@ struct GetRawCommittedVersionReply {
struct GetRawCommittedVersionRequest {
constexpr static FileIdentifier file_identifier = 12954034;
SpanID spanContext;
SpanContext spanContext;
Optional<UID> debugID;
ReplyPromise<GetRawCommittedVersionReply> reply;
Version maxVersion; // max version in the grv proxy's version vector cache
explicit GetRawCommittedVersionRequest(SpanID spanContext,
explicit GetRawCommittedVersionRequest(SpanContext spanContext,
Optional<UID> const& debugID = Optional<UID>(),
Version maxVersion = invalidVersion)
: spanContext(spanContext), debugID(debugID), maxVersion(maxVersion) {}

View File

@ -24,6 +24,7 @@
#include "fdbclient/FDBTypes.h"
#include "fdbclient/Knobs.h"
#include "flow/Tracing.h"
// The versioned message has wire format : -1, version, messages
static const int32_t VERSION_HEADER = -1;
@ -77,6 +78,7 @@ struct MutationRef {
AndV2,
CompareAndClear,
Reserved_For_SpanContextMessage /* See fdbserver/SpanContextMessage.h */,
Reserved_For_OTELSpanContextMessage,
MAX_ATOMIC_OP
};
// This is stored this way for serialization purposes.
@ -190,7 +192,7 @@ struct CommitTransactionRef {
Version read_snapshot = 0;
bool report_conflicting_keys = false;
bool lock_aware = false; // set when metadata mutations are present
Optional<SpanID> spanContext;
Optional<SpanContext> spanContext;
template <class Ar>
force_inline void serialize(Ar& ar) {

View File

@ -141,7 +141,7 @@ struct WatchParameters : public ReferenceCounted<WatchParameters> {
const Version version;
const TagSet tags;
const SpanID spanID;
const SpanContext spanContext;
const TaskPriority taskID;
const Optional<UID> debugID;
const UseProvisionalProxies useProvisionalProxies;
@ -151,11 +151,11 @@ struct WatchParameters : public ReferenceCounted<WatchParameters> {
Optional<Value> value,
Version version,
TagSet tags,
SpanID spanID,
SpanContext spanContext,
TaskPriority taskID,
Optional<UID> debugID,
UseProvisionalProxies useProvisionalProxies)
: tenant(tenant), key(key), value(value), version(version), tags(tags), spanID(spanID), taskID(taskID),
: tenant(tenant), key(key), value(value), version(version), tags(tags), spanContext(spanContext), taskID(taskID),
debugID(debugID), useProvisionalProxies(useProvisionalProxies) {}
};
@ -416,12 +416,12 @@ public:
Optional<TenantName> defaultTenant;
struct VersionRequest {
SpanID spanContext;
SpanContext spanContext;
Promise<GetReadVersionReply> reply;
TagSet tags;
Optional<UID> debugID;
VersionRequest(SpanID spanContext, TagSet tags = TagSet(), Optional<UID> debugID = Optional<UID>())
VersionRequest(SpanContext spanContext, TagSet tags = TagSet(), Optional<UID> debugID = Optional<UID>())
: spanContext(spanContext), tags(tags), debugID(debugID) {}
};

View File

@ -29,30 +29,10 @@
#include <unordered_set>
#include <boost/functional/hash.hpp>
#include "flow/Arena.h"
#include "flow/FastRef.h"
#include "flow/ProtocolVersion.h"
#include "flow/flow.h"
enum class TraceFlags : uint8_t { unsampled = 0b00000000, sampled = 0b00000001 };
inline TraceFlags operator&(TraceFlags lhs, TraceFlags rhs) {
return static_cast<TraceFlags>(static_cast<std::underlying_type_t<TraceFlags>>(lhs) &
static_cast<std::underlying_type_t<TraceFlags>>(rhs));
}
struct SpanContext {
UID traceID;
uint64_t spanID;
TraceFlags m_Flags;
SpanContext() : traceID(UID()), spanID(0), m_Flags(TraceFlags::unsampled) {}
SpanContext(UID traceID, uint64_t spanID, TraceFlags flags) : traceID(traceID), spanID(spanID), m_Flags(flags) {}
SpanContext(UID traceID, uint64_t spanID) : traceID(traceID), spanID(spanID), m_Flags(TraceFlags::unsampled) {}
SpanContext(Arena arena, const SpanContext& span)
: traceID(span.traceID), spanID(span.spanID), m_Flags(span.m_Flags) {}
bool isSampled() const { return (m_Flags & TraceFlags::sampled) == TraceFlags::sampled; }
};
typedef int64_t Version;
typedef uint64_t LogEpoch;
typedef uint64_t Sequence;

View File

@ -27,6 +27,7 @@
#include "fdbclient/FDBTypes.h"
#include "fdbclient/Tenant.h"
#include "flow/Tracing.h"
#include "flow/ThreadHelper.actor.h"
struct VersionVector;
@ -96,11 +97,11 @@ public:
virtual ThreadFuture<Void> commit() = 0;
virtual Version getCommittedVersion() = 0;
// @todo This API and the "getSpanID()" API may help with debugging simulation
// @todo This API and the "getSpanContext()" API may help with debugging simulation
// test failures. (These APIs are not currently invoked anywhere.) Remove them
// later if they are not really needed.
virtual VersionVector getVersionVector() = 0;
virtual UID getSpanID() = 0;
virtual SpanContext getSpanContext() = 0;
virtual ThreadFuture<int64_t> getApproximateSize() = 0;
virtual void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) = 0;

View File

@ -45,7 +45,7 @@ public:
// Not implemented:
void setVersion(Version) override { throw client_invalid_operation(); }
VersionVector getVersionVector() const override { throw client_invalid_operation(); }
UID getSpanID() const override { throw client_invalid_operation(); }
SpanContext getSpanContext() const override { throw client_invalid_operation(); }
Future<Key> getKey(KeySelector const& key, Snapshot snapshot = Snapshot::False) override {
throw client_invalid_operation();
}

View File

@ -95,7 +95,7 @@ public:
virtual Future<Void> commit() = 0;
virtual Version getCommittedVersion() const = 0;
virtual VersionVector getVersionVector() const = 0;
virtual UID getSpanID() const = 0;
virtual SpanContext getSpanContext() const = 0;
virtual int64_t getApproximateSize() const = 0;
virtual Future<Standalone<StringRef>> getVersionstamp() = 0;
virtual void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) = 0;

View File

@ -300,17 +300,7 @@ ThreadResult<RangeResult> DLTransaction::readBlobGranules(const KeyRangeRef& key
beginVersion,
rv,
context);
const FdbCApi::FDBKeyValue* kvs;
int count;
FdbCApi::fdb_bool_t more;
FdbCApi::fdb_error_t error = api->resultGetKeyValueArray(r, &kvs, &count, &more);
if (error) {
return ThreadResult<RangeResult>(Error(error));
}
// The memory for this is stored in the FDBResult and is released when the result gets destroyed
return ThreadResult<RangeResult>(
RangeResult(RangeResultRef(VectorRef<KeyValueRef>((KeyValueRef*)kvs, count), more), Arena()));
return ThreadResult<RangeResult>((ThreadSingleAssignmentVar<RangeResult>*)(r));
}
void DLTransaction::addReadConflictRange(const KeyRangeRef& keys) {
@ -1115,13 +1105,13 @@ VersionVector MultiVersionTransaction::getVersionVector() {
return VersionVector();
}
UID MultiVersionTransaction::getSpanID() {
SpanContext MultiVersionTransaction::getSpanContext() {
auto tr = getTransaction();
if (tr.transaction) {
return tr.transaction->getSpanID();
return tr.transaction->getSpanContext();
}
return UID();
return SpanContext();
}
ThreadFuture<int64_t> MultiVersionTransaction::getApproximateSize() {

View File

@ -378,7 +378,7 @@ public:
ThreadFuture<Void> commit() override;
Version getCommittedVersion() override;
VersionVector getVersionVector() override;
UID getSpanID() override { return UID(); };
SpanContext getSpanContext() override { return SpanContext(); };
ThreadFuture<int64_t> getApproximateSize() override;
void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
@ -567,7 +567,7 @@ public:
ThreadFuture<Void> commit() override;
Version getCommittedVersion() override;
VersionVector getVersionVector() override;
UID getSpanID() override;
SpanContext getSpanContext() override;
ThreadFuture<int64_t> getApproximateSize() override;
void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;

View File

@ -21,6 +21,7 @@
#include "fdbclient/NativeAPI.actor.h"
#include <algorithm>
#include <cstdio>
#include <iterator>
#include <regex>
#include <unordered_set>
@ -848,7 +849,9 @@ ACTOR Future<Void> assertFailure(GrvProxyInterface remote, Future<ErrorOr<GetRea
Future<Void> attemptGRVFromOldProxies(std::vector<GrvProxyInterface> oldProxies,
std::vector<GrvProxyInterface> newProxies) {
Span span(deterministicRandom()->randomUniqueID(), "VerifyCausalReadRisky"_loc);
auto debugID = nondeterministicRandom()->randomUniqueID();
g_traceBatch.addEvent("AttemptGRVFromOldProxyDebug", debugID.first(), "NativeAPI.attemptGRVFromOldProxies.Start");
Span span("VerifyCausalReadRisky"_loc);
std::vector<Future<Void>> replies;
replies.reserve(oldProxies.size());
GetReadVersionRequest req(
@ -2789,13 +2792,13 @@ void updateTagMappings(Database cx, const GetKeyServerLocationsReply& reply) {
ACTOR Future<KeyRangeLocationInfo> getKeyLocation_internal(Database cx,
Optional<TenantName> tenant,
Key key,
SpanID spanID,
SpanContext spanContext,
Optional<UID> debugID,
UseProvisionalProxies useProvisionalProxies,
Reverse isBackward,
Version version) {
state Span span("NAPI:getKeyLocation"_loc, spanID);
state Span span("NAPI:getKeyLocation"_loc, spanContext);
if (isBackward) {
ASSERT(key != allKeys.begin && key <= allKeys.end);
} else {
@ -2883,7 +2886,7 @@ Future<KeyRangeLocationInfo> getKeyLocation(Database const& cx,
Optional<TenantName> const& tenant,
Key const& key,
F StorageServerInterface::*member,
SpanID spanID,
SpanContext spanContext,
Optional<UID> debugID,
UseProvisionalProxies useProvisionalProxies,
Reverse isBackward,
@ -2891,7 +2894,8 @@ Future<KeyRangeLocationInfo> getKeyLocation(Database const& cx,
// we first check whether this range is cached
Optional<KeyRangeLocationInfo> locationInfo = cx->getCachedLocation(tenant, key, isBackward);
if (!locationInfo.present()) {
return getKeyLocation_internal(cx, tenant, key, spanID, debugID, useProvisionalProxies, isBackward, version);
return getKeyLocation_internal(
cx, tenant, key, spanContext, debugID, useProvisionalProxies, isBackward, version);
}
bool onlyEndpointFailedAndNeedRefresh = false;
@ -2905,7 +2909,8 @@ Future<KeyRangeLocationInfo> getKeyLocation(Database const& cx,
cx->invalidateCache(locationInfo.get().tenantEntry.prefix, key);
// Refresh the cache with a new getKeyLocations made to proxies.
return getKeyLocation_internal(cx, tenant, key, spanID, debugID, useProvisionalProxies, isBackward, version);
return getKeyLocation_internal(
cx, tenant, key, spanContext, debugID, useProvisionalProxies, isBackward, version);
}
return locationInfo.get();
@ -2922,7 +2927,7 @@ Future<KeyRangeLocationInfo> getKeyLocation(Reference<TransactionState> trState,
useTenant ? trState->tenant() : Optional<TenantName>(),
key,
member,
trState->spanID,
trState->spanContext,
trState->debugID,
trState->useProvisionalProxies,
isBackward,
@ -2944,11 +2949,11 @@ ACTOR Future<std::vector<KeyRangeLocationInfo>> getKeyRangeLocations_internal(
KeyRange keys,
int limit,
Reverse reverse,
SpanID spanID,
SpanContext spanContext,
Optional<UID> debugID,
UseProvisionalProxies useProvisionalProxies,
Version version) {
state Span span("NAPI:getKeyRangeLocations"_loc, spanID);
state Span span("NAPI:getKeyRangeLocations"_loc, spanContext);
if (debugID.present())
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.getKeyLocations.Before");
@ -3018,7 +3023,7 @@ Future<std::vector<KeyRangeLocationInfo>> getKeyRangeLocations(Database const& c
int limit,
Reverse reverse,
F StorageServerInterface::*member,
SpanID const& spanID,
SpanContext const& spanContext,
Optional<UID> const& debugID,
UseProvisionalProxies useProvisionalProxies,
Version version) {
@ -3028,7 +3033,7 @@ Future<std::vector<KeyRangeLocationInfo>> getKeyRangeLocations(Database const& c
std::vector<KeyRangeLocationInfo> locations;
if (!cx->getCachedLocations(tenant, keys, locations, limit, reverse)) {
return getKeyRangeLocations_internal(
cx, tenant, keys, limit, reverse, spanID, debugID, useProvisionalProxies, version);
cx, tenant, keys, limit, reverse, spanContext, debugID, useProvisionalProxies, version);
}
bool foundFailed = false;
@ -3049,7 +3054,7 @@ Future<std::vector<KeyRangeLocationInfo>> getKeyRangeLocations(Database const& c
if (foundFailed) {
// Refresh the cache with a new getKeyRangeLocations made to proxies.
return getKeyRangeLocations_internal(
cx, tenant, keys, limit, reverse, spanID, debugID, useProvisionalProxies, version);
cx, tenant, keys, limit, reverse, spanContext, debugID, useProvisionalProxies, version);
}
return locations;
@ -3069,7 +3074,7 @@ Future<std::vector<KeyRangeLocationInfo>> getKeyRangeLocations(Reference<Transac
limit,
reverse,
member,
trState->spanID,
trState->spanContext,
trState->debugID,
trState->useProvisionalProxies,
version);
@ -3098,7 +3103,7 @@ ACTOR Future<Void> warmRange_impl(Reference<TransactionState> trState, KeyRange
keys,
CLIENT_KNOBS->WARM_RANGE_SHARD_LIMIT,
Reverse::False,
trState->spanID,
trState->spanContext,
trState->debugID,
trState->useProvisionalProxies,
version));
@ -3129,38 +3134,35 @@ ACTOR Future<Void> warmRange_impl(Reference<TransactionState> trState, KeyRange
return Void();
}
SpanID generateSpanID(bool transactionTracingSample, SpanID parentContext = SpanID()) {
uint64_t txnId = deterministicRandom()->randomUInt64();
SpanContext generateSpanID(bool transactionTracingSample, SpanContext parentContext = SpanContext()) {
if (parentContext.isValid()) {
if (parentContext.first() > 0) {
txnId = parentContext.first();
}
uint64_t tokenId = parentContext.second() > 0 ? deterministicRandom()->randomUInt64() : 0;
return SpanID(txnId, tokenId);
} else if (transactionTracingSample) {
uint64_t tokenId = deterministicRandom()->random01() <= FLOW_KNOBS->TRACING_SAMPLE_RATE
? deterministicRandom()->randomUInt64()
: 0;
return SpanID(txnId, tokenId);
} else {
return SpanID(txnId, 0);
return SpanContext(parentContext.traceID, deterministicRandom()->randomUInt64(), parentContext.m_Flags);
}
if (transactionTracingSample) {
return SpanContext(deterministicRandom()->randomUniqueID(),
deterministicRandom()->randomUInt64(),
deterministicRandom()->random01() <= FLOW_KNOBS->TRACING_SAMPLE_RATE
? TraceFlags::sampled
: TraceFlags::unsampled);
}
return SpanContext(
deterministicRandom()->randomUniqueID(), deterministicRandom()->randomUInt64(), TraceFlags::unsampled);
}
TransactionState::TransactionState(Database cx,
Optional<TenantName> tenant,
TaskPriority taskID,
SpanID spanID,
SpanContext spanContext,
Reference<TransactionLogInfo> trLogInfo)
: cx(cx), trLogInfo(trLogInfo), options(cx), taskID(taskID), spanID(spanID), readVersionObtainedFromGrvProxy(true),
tenant_(tenant), tenantSet(tenant.present()) {}
: cx(cx), trLogInfo(trLogInfo), options(cx), taskID(taskID), spanContext(spanContext),
readVersionObtainedFromGrvProxy(true), tenant_(tenant), tenantSet(tenant.present()) {}
Reference<TransactionState> TransactionState::cloneAndReset(Reference<TransactionLogInfo> newTrLogInfo,
bool generateNewSpan) const {
SpanID newSpanID = generateNewSpan ? generateSpanID(cx->transactionTracingSample) : spanID;
SpanContext newSpanContext = generateNewSpan ? generateSpanID(cx->transactionTracingSample) : spanContext;
Reference<TransactionState> newState =
makeReference<TransactionState>(cx, tenant_, cx->taskID, newSpanID, newTrLogInfo);
makeReference<TransactionState>(cx, tenant_, cx->taskID, newSpanContext, newTrLogInfo);
if (!cx->apiVersionAtLeast(16)) {
newState->options = options;
@ -3218,12 +3220,12 @@ ACTOR Future<Optional<Value>> getValue(Reference<TransactionState> trState,
UseTenant useTenant,
TransactionRecordLogInfo recordLogInfo) {
state Version ver = wait(version);
state Span span("NAPI:getValue"_loc, trState->spanID);
state Span span("NAPI:getValue"_loc, trState->spanContext);
if (useTenant && trState->tenant().present()) {
span.addTag("tenant"_sr, trState->tenant().get());
span.addAttribute("tenant"_sr, trState->tenant().get());
}
span.addTag("key"_sr, key);
span.addAttribute("key"_sr, key);
trState->cx->validateVersion(ver);
loop {
@ -3349,7 +3351,7 @@ ACTOR Future<Key> getKey(Reference<TransactionState> trState,
wait(success(version));
state Optional<UID> getKeyID = Optional<UID>();
state Span span("NAPI:getKey"_loc, trState->spanID);
state Span span("NAPI:getKey"_loc, trState->spanContext);
if (trState->debugID.present()) {
getKeyID = nondeterministicRandom()->randomUniqueID();
@ -3448,8 +3450,8 @@ ACTOR Future<Key> getKey(Reference<TransactionState> trState,
}
}
ACTOR Future<Version> waitForCommittedVersion(Database cx, Version version, SpanID spanContext) {
state Span span("NAPI:waitForCommittedVersion"_loc, { spanContext });
ACTOR Future<Version> waitForCommittedVersion(Database cx, Version version, SpanContext spanContext) {
state Span span("NAPI:waitForCommittedVersion"_loc, spanContext);
try {
loop {
choose {
@ -3483,14 +3485,14 @@ ACTOR Future<Version> waitForCommittedVersion(Database cx, Version version, Span
}
ACTOR Future<Version> getRawVersion(Reference<TransactionState> trState) {
state Span span("NAPI:getRawVersion"_loc, { trState->spanID });
state Span span("NAPI:getRawVersion"_loc, trState->spanContext);
loop {
choose {
when(wait(trState->cx->onProxiesChanged())) {}
when(GetReadVersionReply v =
wait(basicLoadBalance(trState->cx->getGrvProxies(UseProvisionalProxies::False),
&GrvProxyInterface::getConsistentReadVersion,
GetReadVersionRequest(trState->spanID,
GetReadVersionRequest(trState->spanContext,
0,
TransactionPriority::IMMEDIATE,
trState->cx->ssVersionVectorCache.getMaxVersion()),
@ -3512,7 +3514,7 @@ ACTOR Future<Void> readVersionBatcher(
uint32_t flags);
ACTOR Future<Version> watchValue(Database cx, Reference<const WatchParameters> parameters) {
state Span span("NAPI:watchValue"_loc, parameters->spanID);
state Span span("NAPI:watchValue"_loc, parameters->spanContext);
state Version ver = parameters->version;
cx->validateVersion(parameters->version);
ASSERT(parameters->version != latestVersion);
@ -3522,7 +3524,7 @@ ACTOR Future<Version> watchValue(Database cx, Reference<const WatchParameters> p
parameters->tenant.name,
parameters->key,
&StorageServerInterface::watchValue,
parameters->spanID,
parameters->spanContext,
parameters->debugID,
parameters->useProvisionalProxies,
Reverse::False,
@ -3741,15 +3743,15 @@ ACTOR Future<Void> watchValueMap(Future<Version> version,
Optional<Value> value,
Database cx,
TagSet tags,
SpanID spanID,
SpanContext spanContext,
TaskPriority taskID,
Optional<UID> debugID,
UseProvisionalProxies useProvisionalProxies) {
state Version ver = wait(version);
wait(getWatchFuture(
cx,
makeReference<WatchParameters>(tenant, key, value, ver, tags, spanID, taskID, debugID, useProvisionalProxies)));
wait(getWatchFuture(cx,
makeReference<WatchParameters>(
tenant, key, value, ver, tags, spanContext, taskID, debugID, useProvisionalProxies)));
return Void();
}
@ -3795,10 +3797,11 @@ Future<RangeResultFamily> getExactRange(Reference<TransactionState> trState,
Reverse reverse,
UseTenant useTenant) {
state RangeResultFamily output;
state Span span("NAPI:getExactRange"_loc, trState->spanID);
// TODO - ljoswiak parent or link?
state Span span("NAPI:getExactRange"_loc, trState->spanContext);
if (useTenant && trState->tenant().present()) {
span.addTag("tenant"_sr, trState->tenant().get());
span.addAttribute("tenant"_sr, trState->tenant().get());
}
// printf("getExactRange( '%s', '%s' )\n", keys.begin.toString().c_str(), keys.end.toString().c_str());
@ -4155,9 +4158,9 @@ Future<RangeResultFamily> getRange(Reference<TransactionState> trState,
state KeySelector originalBegin = begin;
state KeySelector originalEnd = end;
state RangeResultFamily output;
state Span span("NAPI:getRange"_loc, trState->spanID);
state Span span("NAPI:getRange"_loc, trState->spanContext);
if (useTenant && trState->tenant().present()) {
span.addTag("tenant"_sr, trState->tenant().get());
span.addAttribute("tenant"_sr, trState->tenant().get());
}
try {
@ -4631,7 +4634,7 @@ ACTOR Future<Void> getRangeStreamFragment(Reference<TransactionState> trState,
GetRangeLimits limits,
Snapshot snapshot,
Reverse reverse,
SpanID spanContext) {
SpanContext spanContext) {
loop {
state std::vector<KeyRangeLocationInfo> locations =
wait(getKeyRangeLocations(trState,
@ -4924,7 +4927,7 @@ ACTOR Future<Void> getRangeStream(Reference<TransactionState> trState,
// FIXME: better handling to disable row limits
ASSERT(!limits.hasRowLimit());
state Span span("NAPI:getRangeStream"_loc, trState->spanID);
state Span span("NAPI:getRangeStream"_loc, trState->spanContext);
state Version version = wait(fVersion);
trState->cx->validateVersion(version);
@ -5047,7 +5050,7 @@ Transaction::Transaction(Database const& cx, Optional<TenantName> const& tenant)
cx->taskID,
generateSpanID(cx->transactionTracingSample),
createTrLogInfoProbabilistically(cx))),
span(trState->spanID, "Transaction"_loc), backoff(CLIENT_KNOBS->DEFAULT_BACKOFF), tr(trState->spanID) {
span(trState->spanContext, "Transaction"_loc), backoff(CLIENT_KNOBS->DEFAULT_BACKOFF), tr(trState->spanContext) {
if (DatabaseContext::debugUseTags) {
debugAddTags(trState);
}
@ -5182,7 +5185,7 @@ ACTOR Future<Void> watch(Reference<Watch> watch,
Database cx,
Future<TenantInfo> tenant,
TagSet tags,
SpanID spanID,
SpanContext spanContext,
TaskPriority taskID,
Optional<UID> debugID,
UseProvisionalProxies useProvisionalProxies) {
@ -5210,7 +5213,7 @@ ACTOR Future<Void> watch(Reference<Watch> watch,
watch->value,
cx,
tags,
spanID,
spanContext,
taskID,
debugID,
useProvisionalProxies);
@ -5243,7 +5246,7 @@ Future<Void> Transaction::watch(Reference<Watch> watch) {
populateAndGetTenant(
trState, watch->key, readVersion.isValid() && readVersion.isReady() ? readVersion.get() : latestVersion),
trState->options.readTags,
trState->spanID,
trState->spanContext,
trState->taskID,
trState->debugID,
trState->useProvisionalProxies);
@ -5716,7 +5719,7 @@ void TransactionOptions::reset(Database const& cx) {
void Transaction::resetImpl(bool generateNewSpan) {
flushTrLogsIfEnabled();
trState = trState->cloneAndReset(createTrLogInfoProbabilistically(trState->cx), generateNewSpan);
tr = CommitTransactionRequest(trState->spanID);
tr = CommitTransactionRequest(trState->spanContext);
readVersion = Future<Version>();
metadataVersion = Promise<Optional<Key>>();
extraConflictRanges.clear();
@ -5731,7 +5734,7 @@ void Transaction::reset() {
void Transaction::fullReset() {
resetImpl(true);
span = Span(trState->spanID, "Transaction"_loc);
span = Span(trState->spanContext, "Transaction"_loc);
backoff = CLIENT_KNOBS->DEFAULT_BACKOFF;
}
@ -5852,8 +5855,8 @@ ACTOR void checkWrites(Reference<TransactionState> trState,
ACTOR static Future<Void> commitDummyTransaction(Reference<TransactionState> trState, KeyRange range) {
state Transaction tr(trState->cx);
state int retries = 0;
state Span span("NAPI:dummyTransaction"_loc, trState->spanID);
tr.span.addParent(span.context);
state Span span("NAPI:dummyTransaction"_loc, trState->spanContext);
tr.span.setParent(span.context);
loop {
try {
TraceEvent("CommitDummyTransaction").detail("Key", range.begin).detail("Retries", retries);
@ -5896,7 +5899,7 @@ void Transaction::setupWatches() {
watches[i]->value,
trState->cx,
trState->options.readTags,
trState->spanID,
trState->spanContext,
trState->taskID,
trState->debugID,
trState->useProvisionalProxies));
@ -6019,7 +6022,7 @@ ACTOR static Future<Void> tryCommit(Reference<TransactionState> trState,
Future<Version> readVersion) {
state TraceInterval interval("TransactionCommit");
state double startTime = now();
state Span span("NAPI:tryCommit"_loc, trState->spanID);
state Span span("NAPI:tryCommit"_loc, trState->spanContext);
state Optional<UID> debugID = trState->debugID;
if (debugID.present()) {
TraceEvent(interval.begin()).detail("Parent", debugID.get());
@ -6509,10 +6512,11 @@ void Transaction::setOption(FDBTransactionOptions::Option option, Optional<Strin
case FDBTransactionOptions::SPAN_PARENT:
validateOptionValuePresent(value);
if (value.get().size() != 16) {
if (value.get().size() != 33) {
throw invalid_option_value();
}
span.addParent(BinaryReader::fromStringRef<UID>(value.get(), Unversioned()));
TEST(true); // Adding link in FDBTransactionOptions::SPAN_PARENT
span.setParent(BinaryReader::fromStringRef<SpanContext>(value.get(), IncludeVersion()));
break;
case FDBTransactionOptions::REPORT_CONFLICTING_KEYS:
@ -6555,7 +6559,7 @@ void Transaction::setOption(FDBTransactionOptions::Option option, Optional<Strin
}
}
ACTOR Future<GetReadVersionReply> getConsistentReadVersion(SpanID parentSpan,
ACTOR Future<GetReadVersionReply> getConsistentReadVersion(SpanContext parentSpan,
DatabaseContext* cx,
uint32_t transactionCount,
TransactionPriority priority,
@ -6670,7 +6674,7 @@ ACTOR Future<Void> readVersionBatcher(DatabaseContext* cx,
}
g_traceBatch.addAttach("TransactionAttachID", req.debugID.get().first(), debugID.get().first());
}
span.addParent(req.spanContext);
span.addLink(req.spanContext);
requests.push_back(req.reply);
for (auto tag : req.tags) {
++tags[tag];
@ -6726,10 +6730,10 @@ ACTOR Future<Void> readVersionBatcher(DatabaseContext* cx,
ACTOR Future<Version> extractReadVersion(Reference<TransactionState> trState,
Location location,
SpanID spanContext,
SpanContext spanContext,
Future<GetReadVersionReply> f,
Promise<Optional<Value>> metadataVersion) {
state Span span(spanContext, location, { trState->spanID });
state Span span(spanContext, location, trState->spanContext);
GetReadVersionReply rep = wait(f);
double replyTime = now();
double latency = replyTime - trState->startTime;
@ -6902,7 +6906,7 @@ Future<Version> Transaction::getReadVersion(uint32_t flags) {
}
Location location = "NAPI:getReadVersion"_loc;
UID spanContext = generateSpanID(trState->cx->transactionTracingSample, trState->spanID);
SpanContext spanContext = generateSpanID(trState->cx->transactionTracingSample, trState->spanContext);
auto const req = DatabaseContext::VersionRequest(spanContext, trState->options.tags, trState->debugID);
batcher.stream.send(req);
trState->startTime = now();
@ -7392,7 +7396,7 @@ ACTOR Future<Standalone<VectorRef<KeyRef>>> getRangeSplitPoints(Reference<Transa
KeyRange keys,
int64_t chunkSize,
Version version) {
state Span span("NAPI:GetRangeSplitPoints"_loc, trState->spanID);
state Span span("NAPI:GetRangeSplitPoints"_loc, trState->spanContext);
loop {
state std::vector<KeyRangeLocationInfo> locations =
@ -7956,14 +7960,14 @@ Reference<TransactionLogInfo> Transaction::createTrLogInfoProbabilistically(cons
return Reference<TransactionLogInfo>();
}
void Transaction::setTransactionID(uint64_t id) {
void Transaction::setTransactionID(UID id) {
ASSERT(getSize() == 0);
trState->spanID = SpanID(id, trState->spanID.second());
trState->spanContext = SpanContext(id, trState->spanContext.spanID);
}
void Transaction::setToken(uint64_t token) {
ASSERT(getSize() == 0);
trState->spanID = SpanID(trState->spanID.first(), token);
trState->spanContext = SpanContext(trState->spanContext.traceID, token);
}
void enableClientInfoLogging() {

View File

@ -243,7 +243,7 @@ struct TransactionState : ReferenceCounted<TransactionState> {
Optional<UID> debugID;
TaskPriority taskID;
SpanID spanID;
SpanContext spanContext;
UseProvisionalProxies useProvisionalProxies = UseProvisionalProxies::False;
bool readVersionObtainedFromGrvProxy;
@ -259,13 +259,14 @@ struct TransactionState : ReferenceCounted<TransactionState> {
std::shared_ptr<CoalescedKeyRangeMap<Value>> conflictingKeys;
// Only available so that Transaction can have a default constructor, for use in state variables
TransactionState(TaskPriority taskID, SpanID spanID) : taskID(taskID), spanID(spanID), tenantSet(false) {}
TransactionState(TaskPriority taskID, SpanContext spanContext)
: taskID(taskID), spanContext(spanContext), tenantSet(false) {}
// VERSION_VECTOR changed default values of readVersionObtainedFromGrvProxy
TransactionState(Database cx,
Optional<TenantName> tenant,
TaskPriority taskID,
SpanID spanID,
SpanContext spanContext,
Reference<TransactionLogInfo> trLogInfo);
Reference<TransactionState> cloneAndReset(Reference<TransactionLogInfo> newTrLogInfo, bool generateNewSpan) const;
@ -435,7 +436,7 @@ public:
void debugTransaction(UID dID) { trState->debugID = dID; }
VersionVector getVersionVector() const;
UID getSpanID() const { return trState->spanID; }
SpanContext getSpanContext() const { return trState->spanContext; }
Future<Void> commitMutations();
void setupWatches();
@ -447,7 +448,7 @@ public:
Database getDatabase() const { return trState->cx; }
static Reference<TransactionLogInfo> createTrLogInfoProbabilistically(const Database& cx);
void setTransactionID(uint64_t id);
void setTransactionID(UID id);
void setToken(uint64_t token);
const std::vector<Future<std::pair<Key, Key>>>& getExtraReadConflictRanges() const { return extraConflictRanges; }
@ -490,7 +491,7 @@ private:
Future<Void> committing;
};
ACTOR Future<Version> waitForCommittedVersion(Database cx, Version version, SpanID spanContext);
ACTOR Future<Version> waitForCommittedVersion(Database cx, Version version, SpanContext spanContext);
ACTOR Future<Standalone<VectorRef<DDMetricsRef>>> waitDataDistributionMetricsList(Database cx,
KeyRange keys,
int shardLimit);

View File

@ -430,6 +430,27 @@ TEST_CASE("/fdbclient/WriteMap/emptiness") {
return Void();
}
TEST_CASE("/fdbclient/WriteMap/VersionstampedvalueAfterSet") {
Arena arena = Arena();
SnapshotCache cache(&arena);
WriteMap writes = WriteMap(&arena);
ASSERT(writes.empty());
writes.mutate("apple"_sr, MutationRef::SetValue, "red"_sr, true);
writes.mutate("apple"_sr, MutationRef::SetVersionstampedValue, metadataVersionRequiredValue, true);
RYWIterator it(&cache, &writes);
it.bypassUnreadableProtection();
it.skip("apple"_sr);
ASSERT(it.is_unreadable());
ASSERT(it.is_kv());
const KeyValueRef* kv = it.kv(arena);
ASSERT(kv->key == "apple"_sr);
ASSERT(kv->value == metadataVersionRequiredValue);
return Void();
}
TEST_CASE("/fdbclient/WriteMap/clear") {
Arena arena = Arena();
WriteMap writes = WriteMap(&arena);
@ -655,7 +676,10 @@ TEST_CASE("/fdbclient/WriteMap/random") {
KeyRef key = RandomTestImpl::getRandomKey(arena);
ValueRef value = RandomTestImpl::getRandomValue(arena);
writes.mutate(key, MutationRef::SetVersionstampedValue, value, addConflict);
setMap[key].push(RYWMutation(value, MutationRef::SetVersionstampedValue));
if (unreadableMap[key])
setMap[key].push(RYWMutation(value, MutationRef::SetVersionstampedValue));
else
setMap[key] = OperationStack(RYWMutation(value, MutationRef::SetVersionstampedValue));
if (addConflict)
conflictMap.insert(key, true);
clearMap.insert(key, false);

View File

@ -1979,7 +1979,7 @@ void ReadYourWritesTransaction::getWriteConflicts(KeyRangeMap<bool>* result) {
}
}
void ReadYourWritesTransaction::setTransactionID(uint64_t id) {
void ReadYourWritesTransaction::setTransactionID(UID id) {
tr.setTransactionID(id);
}

View File

@ -140,7 +140,7 @@ public:
[[nodiscard]] Future<Void> commit() override;
Version getCommittedVersion() const override { return tr.getCommittedVersion(); }
VersionVector getVersionVector() const override { return tr.getVersionVector(); }
UID getSpanID() const override { return tr.getSpanID(); }
SpanContext getSpanContext() const override { return tr.getSpanContext(); }
int64_t getApproximateSize() const override { return approximateSize; }
[[nodiscard]] Future<Standalone<StringRef>> getVersionstamp() override;
@ -177,7 +177,7 @@ public:
Reference<const TransactionState> getTransactionState() const { return tr.trState; }
void setTransactionID(uint64_t id);
void setTransactionID(UID id);
void setToken(uint64_t token);
// Read from the special key space readConflictRangeKeysRange

View File

@ -1595,10 +1595,10 @@ Future<RangeResult> TracingOptionsImpl::getRange(ReadYourWritesTransaction* ryw,
if (key.endsWith(kTracingTransactionIdKey)) {
result.push_back_deep(result.arena(),
KeyValueRef(key, std::to_string(ryw->getTransactionState()->spanID.first())));
KeyValueRef(key, ryw->getTransactionState()->spanContext.traceID.toString()));
} else if (key.endsWith(kTracingTokenKey)) {
result.push_back_deep(result.arena(),
KeyValueRef(key, std::to_string(ryw->getTransactionState()->spanID.second())));
KeyValueRef(key, std::to_string(ryw->getTransactionState()->spanContext.spanID)));
}
}
return result;
@ -1612,7 +1612,7 @@ void TracingOptionsImpl::set(ReadYourWritesTransaction* ryw, const KeyRef& key,
}
if (key.endsWith(kTracingTransactionIdKey)) {
ryw->setTransactionID(std::stoul(value.toString()));
ryw->setTransactionID(UID::fromString(value.toString()));
} else if (key.endsWith(kTracingTokenKey)) {
if (value.toString() == "true") {
ryw->setToken(deterministicRandom()->randomUInt64());

View File

@ -35,6 +35,7 @@
#include "fdbclient/CommitTransaction.h"
#include "fdbclient/TagThrottle.actor.h"
#include "fdbclient/Tenant.h"
#include "flow/Tracing.h"
#include "flow/UnitTest.h"
#include "fdbclient/VersionVector.h"
@ -271,7 +272,7 @@ struct GetValueReply : public LoadBalancedReply {
struct GetValueRequest : TimedRequest {
constexpr static FileIdentifier file_identifier = 8454530;
SpanID spanContext;
SpanContext spanContext;
TenantInfo tenantInfo;
Key key;
Version version;
@ -283,7 +284,7 @@ struct GetValueRequest : TimedRequest {
// serve the given key
GetValueRequest() {}
GetValueRequest(SpanID spanContext,
GetValueRequest(SpanContext spanContext,
const TenantInfo& tenantInfo,
const Key& key,
Version ver,
@ -315,7 +316,7 @@ struct WatchValueReply {
struct WatchValueRequest {
constexpr static FileIdentifier file_identifier = 14747733;
SpanID spanContext;
SpanContext spanContext;
TenantInfo tenantInfo;
Key key;
Optional<Value> value;
@ -326,7 +327,7 @@ struct WatchValueRequest {
WatchValueRequest() {}
WatchValueRequest(SpanID spanContext,
WatchValueRequest(SpanContext spanContext,
TenantInfo tenantInfo,
const Key& key,
Optional<Value> value,
@ -360,7 +361,7 @@ struct GetKeyValuesReply : public LoadBalancedReply {
struct GetKeyValuesRequest : TimedRequest {
constexpr static FileIdentifier file_identifier = 6795746;
SpanID spanContext;
SpanContext spanContext;
Arena arena;
TenantInfo tenantInfo;
KeySelectorRef begin, end;
@ -418,7 +419,7 @@ struct GetMappedKeyValuesReply : public LoadBalancedReply {
struct GetMappedKeyValuesRequest : TimedRequest {
constexpr static FileIdentifier file_identifier = 6795747;
SpanID spanContext;
SpanContext spanContext;
Arena arena;
TenantInfo tenantInfo;
KeySelectorRef begin, end;
@ -483,7 +484,7 @@ struct GetKeyValuesStreamReply : public ReplyPromiseStreamReply {
struct GetKeyValuesStreamRequest {
constexpr static FileIdentifier file_identifier = 6795746;
SpanID spanContext;
SpanContext spanContext;
Arena arena;
TenantInfo tenantInfo;
KeySelectorRef begin, end;
@ -534,7 +535,7 @@ struct GetKeyReply : public LoadBalancedReply {
struct GetKeyRequest : TimedRequest {
constexpr static FileIdentifier file_identifier = 10457870;
SpanID spanContext;
SpanContext spanContext;
Arena arena;
TenantInfo tenantInfo;
KeySelectorRef sel;
@ -548,7 +549,7 @@ struct GetKeyRequest : TimedRequest {
GetKeyRequest() {}
GetKeyRequest(SpanID spanContext,
GetKeyRequest(SpanContext spanContext,
TenantInfo tenantInfo,
KeySelectorRef const& sel,
Version version,
@ -836,7 +837,7 @@ struct ChangeFeedStreamReply : public ReplyPromiseStreamReply {
struct ChangeFeedStreamRequest {
constexpr static FileIdentifier file_identifier = 6795746;
SpanID spanContext;
SpanContext spanContext;
Arena arena;
Key rangeID;
Version begin = 0;

View File

@ -465,8 +465,8 @@ VersionVector ThreadSafeTransaction::getVersionVector() {
return tr->getVersionVector();
}
UID ThreadSafeTransaction::getSpanID() {
return tr->getSpanID();
SpanContext ThreadSafeTransaction::getSpanContext() {
return tr->getSpanContext();
}
ThreadFuture<int64_t> ThreadSafeTransaction::getApproximateSize() {

View File

@ -167,7 +167,7 @@ public:
ThreadFuture<Void> commit() override;
Version getCommittedVersion() override;
VersionVector getVersionVector() override;
UID getSpanID() override;
SpanContext getSpanContext() override;
ThreadFuture<int64_t> getApproximateSize() override;
ThreadFuture<uint64_t> getProtocolVersion();

View File

@ -34,10 +34,13 @@ struct TransactionLineage : LineageProperties<TransactionLineage> {
GetKeyServersLocations
};
static constexpr std::string_view name = "Transaction"sv;
uint64_t txID;
UID txID;
Operation operation = Operation::Unset;
bool isSet(uint64_t TransactionLineage::*member) const { return this->*member > 0; }
bool isSet(UID TransactionLineage::*member) const {
return static_cast<UID>(this->*member).first() > 0 && static_cast<UID>(this->*member).second() > 0;
}
bool isSet(Operation TransactionLineage::*member) const { return this->*member != Operation::Unset; }
};

View File

@ -231,7 +231,8 @@ public:
is_unreadable));
}
} else {
if (!it.is_unreadable() && operation == MutationRef::SetValue) {
if (!it.is_unreadable() &&
(operation == MutationRef::SetValue || operation == MutationRef::SetVersionstampedValue)) {
it.tree.clear();
PTreeImpl::remove(writes, ver, key);
PTreeImpl::insert(writes,
@ -523,9 +524,10 @@ public:
static RYWMutation coalesce(RYWMutation existingEntry, RYWMutation newEntry, Arena& arena) {
ASSERT(newEntry.value.present());
if (newEntry.type == MutationRef::SetValue)
if (newEntry.type == MutationRef::SetValue || newEntry.type == MutationRef::SetVersionstampedValue) {
// independent mutations
return newEntry;
else if (newEntry.type == MutationRef::AddValue) {
} else if (newEntry.type == MutationRef::AddValue) {
switch (existingEntry.type) {
case MutationRef::SetValue:
return RYWMutation(doLittleEndianAdd(existingEntry.value, newEntry.value.get(), arena),

View File

@ -20,6 +20,7 @@
// Unit tests for the flow language and libraries
#include "flow/Arena.h"
#include "flow/ProtocolVersion.h"
#include "flow/UnitTest.h"
#include "flow/DeterministicRandom.h"

View File

@ -19,6 +19,7 @@
*/
#include "fdbrpc/FlowTransport.h"
#include "flow/Arena.h"
#include "flow/network.h"
#include <cstdint>
@ -278,6 +279,33 @@ struct UnauthorizedEndpointReceiver final : NetworkMessageReceiver {
bool isPublic() const override { return true; }
};
// NetworkAddressCachedString retains a cached Standalone<StringRef> of
// a NetworkAddressList.address.toString() value. This cached value is useful
// for features in the hot path (i.e. Tracing), which need the String formatted value
// frequently and do not wish to pay the formatting cost. If the underlying NetworkAddressList
// needs to change, do not attempt to update it directly, use the setNetworkAddress API as it
// will ensure the new toString() cached value is updated.
class NetworkAddressCachedString {
public:
NetworkAddressCachedString() { setAddressList(NetworkAddressList()); }
NetworkAddressCachedString(NetworkAddressList const& list) { setAddressList(list); }
NetworkAddressList const& getAddressList() const { return addressList; }
void setAddressList(NetworkAddressList const& list) {
cachedStr = Standalone<StringRef>(StringRef(list.address.toString()));
addressList = list;
}
void setNetworkAddress(NetworkAddress const& addr) {
addressList.address = addr;
setAddressList(addressList); // force the recaching of the string.
}
Standalone<StringRef> getLocalAddressAsString() const { return cachedStr; }
operator NetworkAddressList const&() { return addressList; }
private:
NetworkAddressList addressList;
Standalone<StringRef> cachedStr;
};
class TransportData {
public:
TransportData(uint64_t transportId, int maxWellKnownEndpoints, IPAllowList const* allowList);
@ -299,7 +327,7 @@ public:
// Returns true if given network address 'address' is one of the address we are listening on.
bool isLocalAddress(const NetworkAddress& address) const;
NetworkAddressList localAddresses;
NetworkAddressCachedString localAddresses;
std::vector<Future<Void>> listeners;
std::unordered_map<NetworkAddress, Reference<struct Peer>> peers;
std::unordered_map<NetworkAddress, std::pair<double, double>> closedPeers;
@ -877,12 +905,12 @@ void Peer::send(PacketBuffer* pb, ReliablePacket* rp, bool firstUnsent) {
void Peer::prependConnectPacket() {
// Send the ConnectPacket expected at the beginning of a new connection
ConnectPacket pkt;
if (transport->localAddresses.address.isTLS() == destination.isTLS()) {
pkt.canonicalRemotePort = transport->localAddresses.address.port;
pkt.setCanonicalRemoteIp(transport->localAddresses.address.ip);
} else if (transport->localAddresses.secondaryAddress.present()) {
pkt.canonicalRemotePort = transport->localAddresses.secondaryAddress.get().port;
pkt.setCanonicalRemoteIp(transport->localAddresses.secondaryAddress.get().ip);
if (transport->localAddresses.getAddressList().address.isTLS() == destination.isTLS()) {
pkt.canonicalRemotePort = transport->localAddresses.getAddressList().address.port;
pkt.setCanonicalRemoteIp(transport->localAddresses.getAddressList().address.ip);
} else if (transport->localAddresses.getAddressList().secondaryAddress.present()) {
pkt.canonicalRemotePort = transport->localAddresses.getAddressList().secondaryAddress.get().port;
pkt.setCanonicalRemoteIp(transport->localAddresses.getAddressList().secondaryAddress.get().ip);
} else {
// a "mixed" TLS/non-TLS connection is like a client/server connection - there's no way to reverse it
pkt.canonicalRemotePort = 0;
@ -919,10 +947,10 @@ void Peer::onIncomingConnection(Reference<Peer> self, Reference<IConnection> con
++self->connectIncomingCount;
if (!destination.isPublic() && !outgoingConnectionIdle)
throw address_in_use();
NetworkAddress compatibleAddr = transport->localAddresses.address;
if (transport->localAddresses.secondaryAddress.present() &&
transport->localAddresses.secondaryAddress.get().isTLS() == destination.isTLS()) {
compatibleAddr = transport->localAddresses.secondaryAddress.get();
NetworkAddress compatibleAddr = transport->localAddresses.getAddressList().address;
if (transport->localAddresses.getAddressList().secondaryAddress.present() &&
transport->localAddresses.getAddressList().secondaryAddress.get().isTLS() == destination.isTLS()) {
compatibleAddr = transport->localAddresses.getAddressList().secondaryAddress.get();
}
if (!destination.isPublic() || outgoingConnectionIdle || destination > compatibleAddr ||
@ -1455,10 +1483,10 @@ ACTOR static Future<Void> listen(TransportData* self, NetworkAddress listenAddr)
state ActorCollectionNoErrors
incoming; // Actors monitoring incoming connections that haven't yet been associated with a peer
state Reference<IListener> listener = INetworkConnections::net()->listen(listenAddr);
if (!g_network->isSimulated() && self->localAddresses.address.port == 0) {
if (!g_network->isSimulated() && self->localAddresses.getAddressList().address.port == 0) {
TraceEvent(SevInfo, "UpdatingListenAddress")
.detail("AssignedListenAddress", listener->getListenAddress().toString());
self->localAddresses.address = listener->getListenAddress();
self->localAddresses.setNetworkAddress(listener->getListenAddress());
}
state uint64_t connectionCount = 0;
try {
@ -1507,8 +1535,9 @@ Reference<Peer> TransportData::getOrOpenPeer(NetworkAddress const& address, bool
}
bool TransportData::isLocalAddress(const NetworkAddress& address) const {
return address == localAddresses.address ||
(localAddresses.secondaryAddress.present() && address == localAddresses.secondaryAddress.get());
return address == localAddresses.getAddressList().address ||
(localAddresses.getAddressList().secondaryAddress.present() &&
address == localAddresses.getAddressList().secondaryAddress.get());
}
ACTOR static Future<Void> multiVersionCleanupWorker(TransportData* self) {
@ -1554,15 +1583,21 @@ void FlowTransport::initMetrics() {
}
NetworkAddressList FlowTransport::getLocalAddresses() const {
return self->localAddresses;
return self->localAddresses.getAddressList();
}
NetworkAddress FlowTransport::getLocalAddress() const {
return self->localAddresses.address;
return self->localAddresses.getAddressList().address;
}
Standalone<StringRef> FlowTransport::getLocalAddressAsString() const {
return self->localAddresses.getLocalAddressAsString();
}
void FlowTransport::setLocalAddress(NetworkAddress const& address) {
self->localAddresses.address = address;
auto newAddress = self->localAddresses.getAddressList();
newAddress.address = address;
self->localAddresses.setAddressList(newAddress);
}
const std::unordered_map<NetworkAddress, Reference<Peer>>& FlowTransport::getAllPeers() const {
@ -1586,11 +1621,14 @@ Future<Void> FlowTransport::onIncompatibleChanged() {
Future<Void> FlowTransport::bind(NetworkAddress publicAddress, NetworkAddress listenAddress) {
ASSERT(publicAddress.isPublic());
if (self->localAddresses.address == NetworkAddress()) {
self->localAddresses.address = publicAddress;
if (self->localAddresses.getAddressList().address == NetworkAddress()) {
self->localAddresses.setNetworkAddress(publicAddress);
} else {
self->localAddresses.secondaryAddress = publicAddress;
auto addrList = self->localAddresses.getAddressList();
addrList.secondaryAddress = publicAddress;
self->localAddresses.setAddressList(addrList);
}
// reformatLocalAddress()
TraceEvent("Binding").detail("PublicAddress", publicAddress).detail("ListenAddress", listenAddress);
Future<Void> listenF = listen(self, listenAddress);
@ -1641,7 +1679,7 @@ void FlowTransport::removePeerReference(const Endpoint& endpoint, bool isStream)
void FlowTransport::addEndpoint(Endpoint& endpoint, NetworkMessageReceiver* receiver, TaskPriority taskID) {
endpoint.token = deterministicRandom()->randomUniqueID();
if (receiver->isStream()) {
endpoint.addresses = self->localAddresses;
endpoint.addresses = self->localAddresses.getAddressList();
endpoint.token = UID(endpoint.token.first() | TOKEN_STREAM_FLAG, endpoint.token.second());
} else {
endpoint.addresses = NetworkAddressList();
@ -1651,7 +1689,7 @@ void FlowTransport::addEndpoint(Endpoint& endpoint, NetworkMessageReceiver* rece
}
void FlowTransport::addEndpoints(std::vector<std::pair<FlowReceiver*, TaskPriority>> const& streams) {
self->endpoints.insert(self->localAddresses, streams);
self->endpoints.insert(self->localAddresses.getAddressList(), streams);
}
void FlowTransport::removeEndpoint(const Endpoint& endpoint, NetworkMessageReceiver* receiver) {
@ -1659,7 +1697,7 @@ void FlowTransport::removeEndpoint(const Endpoint& endpoint, NetworkMessageRecei
}
void FlowTransport::addWellKnownEndpoint(Endpoint& endpoint, NetworkMessageReceiver* receiver, TaskPriority taskID) {
endpoint.addresses = self->localAddresses;
endpoint.addresses = self->localAddresses.getAddressList();
ASSERT(receiver->isStream());
self->endpoints.insertWellKnown(receiver, endpoint.token, taskID);
}

View File

@ -20,6 +20,7 @@
#ifndef FLOW_TRANSPORT_H
#define FLOW_TRANSPORT_H
#include "flow/Arena.h"
#pragma once
#include <algorithm>
@ -215,6 +216,10 @@ public:
// Returns first local NetworkAddress.
NetworkAddress getLocalAddress() const;
// Returns first local NetworkAddress as std::string. Caches value
// to avoid unnecessary calls to toString() and fmt overhead.
Standalone<StringRef> getLocalAddressAsString() const;
// Returns first local NetworkAddress.
void setLocalAddress(NetworkAddress const&);

View File

@ -24,6 +24,7 @@
#include "contrib/fmt-8.1.1/include/fmt/format.h"
#include "fdbrpc/simulator.h"
#include "flow/Arena.h"
#define BOOST_SYSTEM_NO_LIB
#define BOOST_DATE_TIME_NO_LIB
#define BOOST_REGEX_NO_LIB

View File

@ -53,7 +53,7 @@ namespace {
class ApplyMetadataMutationsImpl {
public:
ApplyMetadataMutationsImpl(const SpanID& spanContext_,
ApplyMetadataMutationsImpl(const SpanContext& spanContext_,
const UID& dbgid_,
Arena& arena_,
const VectorRef<MutationRef>& mutations_,
@ -61,7 +61,7 @@ public:
: spanContext(spanContext_), dbgid(dbgid_), arena(arena_), mutations(mutations_), txnStateStore(txnStateStore_),
confChange(dummyConfChange) {}
ApplyMetadataMutationsImpl(const SpanID& spanContext_,
ApplyMetadataMutationsImpl(const SpanContext& spanContext_,
Arena& arena_,
const VectorRef<MutationRef>& mutations_,
ProxyCommitData& proxyCommitData_,
@ -82,7 +82,7 @@ public:
tssMapping(&proxyCommitData_.tssMapping), tenantMap(&proxyCommitData_.tenantMap),
initialCommit(initialCommit_) {}
ApplyMetadataMutationsImpl(const SpanID& spanContext_,
ApplyMetadataMutationsImpl(const SpanContext& spanContext_,
ResolverData& resolverData_,
const VectorRef<MutationRef>& mutations_)
: spanContext(spanContext_), dbgid(resolverData_.dbgid), arena(resolverData_.arena), mutations(mutations_),
@ -94,7 +94,7 @@ public:
private:
// The following variables are incoming parameters
const SpanID& spanContext;
const SpanContext& spanContext;
const UID& dbgid;
@ -1217,7 +1217,7 @@ public:
} // anonymous namespace
void applyMetadataMutations(SpanID const& spanContext,
void applyMetadataMutations(SpanContext const& spanContext,
ProxyCommitData& proxyCommitData,
Arena& arena,
Reference<ILogSystem> logSystem,
@ -1241,13 +1241,13 @@ void applyMetadataMutations(SpanID const& spanContext,
.apply();
}
void applyMetadataMutations(SpanID const& spanContext,
void applyMetadataMutations(SpanContext const& spanContext,
ResolverData& resolverData,
const VectorRef<MutationRef>& mutations) {
ApplyMetadataMutationsImpl(spanContext, resolverData, mutations).apply();
}
void applyMetadataMutations(SpanID const& spanContext,
void applyMetadataMutations(SpanContext const& spanContext,
const UID& dbgid,
Arena& arena,
const VectorRef<MutationRef>& mutations,

View File

@ -87,7 +87,7 @@ Reference<StorageInfo> getStorageInfo(UID id,
std::map<UID, Reference<StorageInfo>>* storageCache,
IKeyValueStore* txnStateStore);
void applyMetadataMutations(SpanID const& spanContext,
void applyMetadataMutations(SpanContext const& spanContext,
ProxyCommitData& proxyCommitData,
Arena& arena,
Reference<ILogSystem> logSystem,
@ -97,7 +97,7 @@ void applyMetadataMutations(SpanID const& spanContext,
Version version,
Version popVersion,
bool initialCommit);
void applyMetadataMutations(SpanID const& spanContext,
void applyMetadataMutations(SpanContext const& spanContext,
const UID& dbgid,
Arena& arena,
const VectorRef<MutationRef>& mutations,
@ -140,7 +140,7 @@ inline bool containsMetadataMutation(const VectorRef<MutationRef>& mutations) {
}
// Resolver's version
void applyMetadataMutations(SpanID const& spanContext,
void applyMetadataMutations(SpanContext const& spanContext,
ResolverData& resolverData,
const VectorRef<MutationRef>& mutations);

View File

@ -67,6 +67,10 @@ struct VersionedMessage {
return false;
if (reader.protocolVersion().hasSpanContext() && SpanContextMessage::isNextIn(reader))
return false;
if (reader.protocolVersion().hasOTELSpanContext() && OTELSpanContextMessage::isNextIn(reader)) {
TEST(true); // Returning false for OTELSpanContextMessage
return false;
}
reader >> *m;
return normalKeys.contains(m->param1) || m->param1 == metadataVersionKey;

View File

@ -88,6 +88,7 @@ set(FDBSERVER_SRCS
OldTLogServer_4_6.actor.cpp
OldTLogServer_6_0.actor.cpp
OldTLogServer_6_2.actor.cpp
OTELSpanContextMessage.h
OnDemandStore.actor.cpp
OnDemandStore.h
PaxosConfigConsumer.actor.cpp

View File

@ -1629,7 +1629,7 @@ ACTOR Future<Void> clusterRecoveryCore(Reference<ClusterRecoveryData> self) {
tr.set(recoveryCommitRequest.arena, clusterIdKey, BinaryWriter::toValue(self->clusterId, Unversioned()));
}
applyMetadataMutations(SpanID(),
applyMetadataMutations(SpanContext(),
self->dbgid,
recoveryCommitRequest.arena,
tr.mutations.slice(mmApplied, tr.mutations.size()),

View File

@ -464,7 +464,7 @@ ACTOR Future<Void> addBackupMutations(ProxyCommitData* self,
state int yieldBytes = 0;
state BinaryWriter valueWriter(Unversioned());
toCommit->addTransactionInfo(SpanID());
toCommit->addTransactionInfo(SpanContext());
// Serialize the log range mutations within the map
for (; logRangeMutation != logRangeMutations->cend(); ++logRangeMutation) {
@ -731,7 +731,7 @@ void CommitBatchContext::setupTraceBatch() {
g_traceBatch.addAttach("CommitAttachID", tr.debugID.get().first(), debugID.get().first());
}
span.addParent(tr.spanContext);
span.addLink(tr.spanContext);
}
if (debugID.present()) {
@ -960,7 +960,7 @@ void applyMetadataEffect(CommitBatchContext* self) {
committed =
committed && self->resolution[resolver].stateMutations[versionIndex][transactionIndex].committed;
if (committed) {
applyMetadataMutations(SpanID(),
applyMetadataMutations(SpanContext(),
*self->pProxyCommitData,
self->arena,
self->pProxyCommitData->logSystem,
@ -1380,8 +1380,7 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) {
// simulation
TEST(true); // Semi-committed pipeline limited by MVCC window
//TraceEvent("ProxyWaitingForCommitted", pProxyCommitData->dbgid).detail("CommittedVersion", pProxyCommitData->committedVersion.get()).detail("NeedToCommit", commitVersion);
waitVersionSpan = Span(
deterministicRandom()->randomUniqueID(), "MP:overMaxReadTransactionLifeVersions"_loc, { span.context });
waitVersionSpan = Span("MP:overMaxReadTransactionLifeVersions"_loc, span.context);
choose {
when(wait(pProxyCommitData->committedVersion.whenAtLeast(
self->commitVersion - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS))) {
@ -1777,7 +1776,7 @@ void addTagMapping(GetKeyServerLocationsReply& reply, ProxyCommitData* commitDat
ACTOR static Future<Void> doKeyServerLocationRequest(GetKeyServerLocationsRequest req, ProxyCommitData* commitData) {
// We can't respond to these requests until we have valid txnStateStore
getCurrentLineage()->modify(&TransactionLineage::operation) = TransactionLineage::Operation::GetKeyServersLocations;
getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.first();
getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.traceID;
wait(commitData->validState.getFuture());
wait(delay(0, TaskPriority::DefaultEndpoint));
@ -2297,7 +2296,7 @@ ACTOR Future<Void> processCompleteTransactionStateRequest(TransactionStateResolv
Arena arena;
bool confChanges;
applyMetadataMutations(SpanID(),
applyMetadataMutations(SpanContext(),
*pContext->pCommitData,
arena,
Reference<ILogSystem>(),

View File

@ -542,7 +542,7 @@ ACTOR Future<Void> lastCommitUpdater(GrvProxyData* self, PromiseStream<Future<Vo
}
}
ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(SpanID parentSpan,
ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(SpanContext parentSpan,
GrvProxyData* grvProxyData,
uint32_t flags,
Optional<UID> debugID,
@ -945,7 +945,7 @@ ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy,
int batchGRVProcessed = 0;
for (int i = 0; i < start.size(); i++) {
if (start[i].size()) {
Future<GetReadVersionReply> readVersionReply = getLiveCommittedVersion(UID() /*span.context*/,
Future<GetReadVersionReply> readVersionReply = getLiveCommittedVersion(SpanContext(),
grvProxyData,
i,
debugID,

View File

@ -603,7 +603,12 @@ Future<Void> logRouterPeekMessages(PromiseType replyPromise,
}
replyPromise.send(reply);
//TraceEvent("LogRouterPeek4", self->dbgid);
DisabledTraceEvent("LogRouterPeek4", self->dbgid)
.detail("Tag", reqTag.toString())
.detail("ReqBegin", reqBegin)
.detail("End", reply.end)
.detail("MessageSize", reply.messages.size())
.detail("PoppedVersion", self->poppedVersion);
return Void();
}

View File

@ -19,6 +19,9 @@
*/
#include "fdbserver/LogSystem.h"
#include "fdbclient/FDBTypes.h"
#include "fdbserver/OTELSpanContextMessage.h"
#include "fdbserver/SpanContextMessage.h"
#include "flow/serialize.h"
std::string LogSet::logRouterString() {
@ -277,8 +280,8 @@ void LogPushData::addTxsTag() {
}
}
void LogPushData::addTransactionInfo(SpanID const& context) {
TEST(!spanContext.isValid()); // addTransactionInfo with invalid SpanID
void LogPushData::addTransactionInfo(SpanContext const& context) {
TEST(!spanContext.isValid()); // addTransactionInfo with invalid SpanContext
spanContext = context;
writtenLocations.clear();
}
@ -344,13 +347,33 @@ bool LogPushData::writeTransactionInfo(int location, uint32_t subseq) {
writtenLocations.insert(location);
BinaryWriter& wr = messagesWriter[location];
SpanContextMessage contextMessage(spanContext);
int offset = wr.getLength();
wr << uint32_t(0) << subseq << uint16_t(prev_tags.size());
for (auto& tag : prev_tags)
wr << tag;
wr << contextMessage;
if (logSystem->getTLogVersion() >= TLogVersion::V7) {
OTELSpanContextMessage contextMessage(spanContext);
wr << contextMessage;
} else {
// When we're on a TLog version below 7, but the front end of the system (i.e. proxy, sequencer, resolver)
// is using OpenTelemetry tracing (i.e on or above 7.2), we need to convert the OpenTelemetry Span data model
// i.e. 16 bytes for traceId, 8 bytes for spanId, to the OpenTracing spec, which is 8 bytes for traceId
// and 8 bytes for spanId. That means we need to drop some data.
//
// As a workaround for this special case we've decided to drop is the 8 bytes
// for spanId. Therefore we're passing along the full 16 byte traceId to the storage server with 0 for spanID.
// This will result in a follows from relationship for the storage span within the trace rather than a
// parent->child.
SpanContextMessage contextMessage;
if (spanContext.isSampled()) {
TEST(true); // Converting OTELSpanContextMessage to traced SpanContextMessage
contextMessage = SpanContextMessage(UID(spanContext.traceID.first(), spanContext.traceID.second()));
} else {
TEST(true); // Converting OTELSpanContextMessage to untraced SpanContextMessage
contextMessage = SpanContextMessage(UID(0, 0));
}
wr << contextMessage;
}
int length = wr.getLength() - offset;
*(uint32_t*)((uint8_t*)wr.getData() + offset) = length - sizeof(uint32_t);
return true;

View File

@ -26,6 +26,7 @@
#include <vector>
#include "fdbserver/SpanContextMessage.h"
#include "fdbserver/OTELSpanContextMessage.h"
#include "fdbserver/TLogInterface.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbclient/DatabaseConfiguration.h"
@ -519,7 +520,7 @@ struct ILogSystem {
Version knownCommittedVersion,
Version minKnownCommittedVersion,
LogPushData& data,
SpanID const& spanContext,
SpanContext const& spanContext,
Optional<UID> debugID = Optional<UID>(),
Optional<std::unordered_map<uint16_t, Version>> tpcvMap =
Optional<std::unordered_map<uint16_t, Version>>()) = 0;
@ -762,7 +763,7 @@ struct LogPushData : NonCopyable {
}
// Add transaction info to be written before the first mutation in the transaction.
void addTransactionInfo(SpanID const& context);
void addTransactionInfo(SpanContext const& context);
// copy written_tags, after filtering, into given set
void saveTags(std::set<Tag>& filteredTags) const {
@ -832,7 +833,7 @@ private:
// field.
std::unordered_set<int> writtenLocations;
uint32_t subsequence;
SpanID spanContext;
SpanContext spanContext;
bool shardChanged = false; // if keyServers has any changes, i.e., shard boundary modifications.
// Writes transaction info to the message stream at the given location if

View File

@ -58,6 +58,7 @@ ILogSystem::ServerPeekCursor::ServerPeekCursor(Reference<AsyncVar<OptionalInterf
this->results.minKnownCommittedVersion = 0;
DisabledTraceEvent(SevDebug, "SPC_Starting", randomID)
.detail("Tag", tag.toString())
.detail("UsePeekStream", usePeekStream)
.detail("Begin", begin)
.detail("End", end);
}

View File

@ -133,14 +133,14 @@ struct GetCommitVersionReply {
struct GetCommitVersionRequest {
constexpr static FileIdentifier file_identifier = 16683181;
SpanID spanContext;
SpanContext spanContext;
uint64_t requestNum;
uint64_t mostRecentProcessedRequestNum;
UID requestingProxy;
ReplyPromise<GetCommitVersionReply> reply;
GetCommitVersionRequest() {}
GetCommitVersionRequest(SpanID spanContext,
GetCommitVersionRequest(SpanContext spanContext,
uint64_t requestNum,
uint64_t mostRecentProcessedRequestNum,
UID requestingProxy)

View File

@ -24,6 +24,7 @@
#include "fdbserver/MutationTracking.h"
#include "fdbserver/LogProtocolMessage.h"
#include "fdbserver/SpanContextMessage.h"
#include "fdbserver/OTELSpanContextMessage.h"
#include "fdbclient/SystemData.h"
#if defined(FDB_CLEAN_BUILD) && MUTATION_TRACKING_ENABLED
#error "You cannot use mutation tracking in a clean/release build."
@ -96,6 +97,11 @@ TraceEvent debugTagsAndMessageEnabled(const char* context, Version version, Stri
BinaryReader br(mutationData, AssumeVersion(rdr.protocolVersion()));
SpanContextMessage scm;
br >> scm;
} else if (OTELSpanContextMessage::startsOTELSpanContextMessage(mutationType)) {
TEST(true); // MutationTracking reading OTELSpanContextMessage
BinaryReader br(mutationData, AssumeVersion(rdr.protocolVersion()));
OTELSpanContextMessage scm;
br >> scm;
} else {
MutationRef m;
BinaryReader br(mutationData, AssumeVersion(rdr.protocolVersion()));

View File

@ -0,0 +1,66 @@
/*
* OTELSpanContextMessage.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FDBSERVER_OTELSPANCONTEXTMESSAGE_H
#define FDBSERVER_OTELSPANCONTEXTMESSAGE_H
#pragma once
#include "flow/Tracing.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/CommitTransaction.h"
struct OTELSpanContextMessage {
// This message is pushed into the the transaction logs' memory to inform
// it what transaction subsequent mutations were a part of. This allows
// transaction logs and storage servers to associate mutations with a
// transaction identifier, called a span context.
//
// This message is similar to LogProtocolMessage. Storage servers read the
// first byte of this message to uniquely identify it, meaning it will
// never be mistaken for another message. See LogProtocolMessage.h for more
// information.
SpanContext spanContext;
OTELSpanContextMessage() {}
OTELSpanContextMessage(SpanContext const& spanContext) : spanContext(spanContext) {}
std::string toString() const {
return format("code: %d, span context: %s",
MutationRef::Reserved_For_OTELSpanContextMessage,
spanContext.toString().c_str());
}
template <class Ar>
void serialize(Ar& ar) {
uint8_t poly = MutationRef::Reserved_For_OTELSpanContextMessage;
serializer(ar, poly, spanContext);
}
static bool startsOTELSpanContextMessage(uint8_t byte) {
return byte == MutationRef::Reserved_For_OTELSpanContextMessage;
}
template <class Ar>
static bool isNextIn(Ar& ar) {
return startsOTELSpanContextMessage(*(const uint8_t*)ar.peekBytes(1));
}
};
#endif

View File

@ -340,8 +340,8 @@ ACTOR Future<Void> resolveBatch(Reference<Resolver> self, ResolveTransactionBatc
// The condition here must match CommitBatch::applyMetadataToCommittedTransactions()
if (reply.committed[t] == ConflictBatch::TransactionCommitted && !self->forceRecovery &&
SERVER_KNOBS->PROXY_USE_RESOLVER_PRIVATE_MUTATIONS && (!isLocked || req.transactions[t].lock_aware)) {
SpanID spanContext =
req.transactions[t].spanContext.present() ? req.transactions[t].spanContext.get() : SpanID();
SpanContext spanContext =
req.transactions[t].spanContext.present() ? req.transactions[t].spanContext.get() : SpanContext();
applyMetadataMutations(spanContext, resolverData, req.transactions[t].mutations);
}
@ -565,7 +565,7 @@ ACTOR Future<Void> processCompleteTransactionStateRequest(TransactionStateResolv
ResolverData resolverData(
pContext->pResolverData->dbgid, pContext->pTxnStateStore, &pContext->pResolverData->keyInfo, confChanges);
applyMetadataMutations(SpanID(), resolverData, mutations);
applyMetadataMutations(SpanContext(), resolverData, mutations);
} // loop
auto lockedKey = pContext->pTxnStateStore->readValue(databaseLockedKey).get();

View File

@ -118,7 +118,7 @@ struct ResolveTransactionBatchRequest {
constexpr static FileIdentifier file_identifier = 16462858;
Arena arena;
SpanID spanContext;
SpanContext spanContext;
Version prevVersion;
Version version; // FIXME: ?
Version lastReceivedVersion;

View File

@ -18,6 +18,7 @@
* limitations under the License.
*/
#include "fdbserver/OTELSpanContextMessage.h"
#include "flow/Arena.h"
#include "fdbclient/FDBOptions.g.h"
#include "fdbclient/NativeAPI.actor.h"
@ -1897,6 +1898,10 @@ ACTOR Future<Void> pullAsyncData(StorageCacheData* data) {
SpanContextMessage::isNextIn(cloneReader)) {
SpanContextMessage scm;
cloneReader >> scm;
} else if (cloneReader.protocolVersion().hasOTELSpanContext() &&
OTELSpanContextMessage::isNextIn(cloneReader)) {
OTELSpanContextMessage scm;
cloneReader >> scm;
} else {
MutationRef msg;
cloneReader >> msg;
@ -1975,6 +1980,10 @@ ACTOR Future<Void> pullAsyncData(StorageCacheData* data) {
} else if (reader.protocolVersion().hasSpanContext() && SpanContextMessage::isNextIn(reader)) {
SpanContextMessage scm;
reader >> scm;
} else if (reader.protocolVersion().hasOTELSpanContext() && OTELSpanContextMessage::isNextIn(reader)) {
TEST(true); // StorageCache reading OTELSpanContextMessage
OTELSpanContextMessage oscm;
reader >> oscm;
} else {
MutationRef msg;
reader >> msg;

View File

@ -296,7 +296,7 @@ struct TLogCommitReply {
struct TLogCommitRequest {
constexpr static FileIdentifier file_identifier = 4022206;
SpanID spanContext;
SpanContext spanContext;
Arena arena;
Version prevVersion, version, knownCommittedVersion, minKnownCommittedVersion;
@ -307,7 +307,7 @@ struct TLogCommitRequest {
Optional<UID> debugID;
TLogCommitRequest() {}
TLogCommitRequest(const SpanID& context,
TLogCommitRequest(const SpanContext& context,
const Arena& a,
Version prevVersion,
Version version,

View File

@ -567,6 +567,8 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
TLogData* tLogData;
Promise<Void> recoveryComplete, committingQueue;
Version unrecoveredBefore, recoveredAt;
Version recoveryTxnVersion;
Promise<Void> recoveryTxnReceived;
struct PeekTrackerData {
std::map<int, Promise<std::pair<Version, bool>>>
@ -646,10 +648,11 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
blockingPeekTimeouts("BlockingPeekTimeouts", cc), emptyPeeks("EmptyPeeks", cc),
nonEmptyPeeks("NonEmptyPeeks", cc), logId(interf.id()), protocolVersion(protocolVersion),
newPersistentDataVersion(invalidVersion), tLogData(tLogData), unrecoveredBefore(1), recoveredAt(1),
logSystem(new AsyncVar<Reference<ILogSystem>>()), remoteTag(remoteTag), isPrimary(isPrimary),
logRouterTags(logRouterTags), logRouterPoppedVersion(0), logRouterPopToVersion(0), locality(tagLocalityInvalid),
recruitmentID(recruitmentID), logSpillType(logSpillType), allTags(tags.begin(), tags.end()),
terminated(tLogData->terminated.getFuture()), execOpCommitInProgress(false), txsTags(txsTags) {
recoveryTxnVersion(1), logSystem(new AsyncVar<Reference<ILogSystem>>()), remoteTag(remoteTag),
isPrimary(isPrimary), logRouterTags(logRouterTags), logRouterPoppedVersion(0), logRouterPopToVersion(0),
locality(tagLocalityInvalid), recruitmentID(recruitmentID), logSpillType(logSpillType),
allTags(tags.begin(), tags.end()), terminated(tLogData->terminated.getFuture()), execOpCommitInProgress(false),
txsTags(txsTags) {
startRole(Role::TRANSACTION_LOG,
interf.id(),
tLogData->workerID,
@ -1565,7 +1568,7 @@ Version poppedVersion(Reference<LogData> self, Tag tag) {
if (tag == txsTag || tag.locality == tagLocalityTxs) {
return 0;
}
return self->recoveredAt + 1;
return std::max(self->recoveredAt + 1, self->recoveryTxnVersion);
}
return tagData->popped;
}
@ -1743,12 +1746,24 @@ Future<Void> tLogPeekMessages(PromiseType replyPromise,
return Void();
}
//TraceEvent("TLogPeekMessages0", self->dbgid).detail("ReqBeginEpoch", reqBegin.epoch).detail("ReqBeginSeq", reqBegin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", reqTag1).detail("Tag2", reqTag2);
DisabledTraceEvent("TLogPeekMessages0", self->dbgid)
.detail("LogId", logData->logId)
.detail("Tag", reqTag.toString())
.detail("ReqBegin", reqBegin)
.detail("Version", logData->version.get())
.detail("RecoveredAt", logData->recoveredAt);
// Wait until we have something to return that the caller doesn't already have
if (logData->version.get() < reqBegin) {
wait(logData->version.whenAtLeast(reqBegin));
wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask()));
}
if (!logData->stopped && reqTag.locality != tagLocalityTxs && reqTag != txsTag) {
// Make sure the peek reply has the recovery txn for the current TLog.
// Older generation TLog has been stopped and doesn't wait here.
// Similarly during recovery, reading transaction state store
// doesn't wait here.
wait(logData->recoveryTxnReceived.getFuture());
}
if (logData->locality != tagLocalitySatellite && reqTag.locality == tagLocalityLogRouter) {
wait(self->concurrentLogRouterReads.take());
@ -1788,6 +1803,11 @@ Future<Void> tLogPeekMessages(PromiseType replyPromise,
poppedVer = poppedVersion(logData, reqTag);
}
DisabledTraceEvent("TLogPeekMessages1", self->dbgid)
.detail("LogId", logData->logId)
.detail("Tag", reqTag.toString())
.detail("ReqBegin", reqBegin)
.detail("PoppedVer", poppedVer);
if (poppedVer > reqBegin) {
TLogPeekReply rep;
rep.maxKnownVersion = logData->version.get();
@ -1832,7 +1852,9 @@ Future<Void> tLogPeekMessages(PromiseType replyPromise,
onlySpilled = false;
// grab messages from disk
//TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", reqBegin.epoch).detail("ReqBeginSeq", reqBegin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", reqTag1).detail("Tag2", reqTag2);
DisabledTraceEvent("TLogPeekMessages2", self->dbgid)
.detail("ReqBegin", reqBegin)
.detail("Tag", reqTag.toString());
if (reqBegin <= logData->persistentDataDurableVersion) {
// Just in case the durable version changes while we are waiting for the read, we grab this data from
// memory. We may or may not actually send it depending on whether we get enough data from disk. SOMEDAY:
@ -1993,13 +2015,12 @@ Future<Void> tLogPeekMessages(PromiseType replyPromise,
reply.end = endVersion;
reply.onlySpilled = onlySpilled;
// TraceEvent("TlogPeek", self->dbgid)
// .detail("LogId", logData->logId)
// .detail("Tag", req.tag.toString())
// .detail("BeginVer", req.begin)
// .detail("EndVer", reply.end)
// .detail("MsgBytes", reply.messages.expectedSize())
// .detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress());
DisabledTraceEvent("TLogPeekMessages4", self->dbgid)
.detail("LogId", logData->logId)
.detail("Tag", reqTag.toString())
.detail("ReqBegin", reqBegin)
.detail("EndVer", reply.end)
.detail("MsgBytes", reply.messages.expectedSize());
if (reqSequence.present()) {
auto& trackerData = logData->peekTracker[peekId];
@ -2221,6 +2242,9 @@ ACTOR Future<Void> tLogCommit(TLogData* self,
g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.BeforeWaitForVersion");
}
if (req.prevVersion == logData->recoveredAt) {
logData->recoveryTxnVersion = req.version;
}
logData->minKnownCommittedVersion = std::max(logData->minKnownCommittedVersion, req.minKnownCommittedVersion);
wait(logData->version.whenAtLeast(req.prevVersion));
@ -2274,6 +2298,15 @@ ACTOR Future<Void> tLogCommit(TLogData* self,
}
// Notifies the commitQueue actor to commit persistentQueue, and also unblocks tLogPeekMessages actors
logData->version.set(req.version);
if (logData->recoveryTxnReceived.canBeSet() &&
(req.prevVersion == 0 || req.prevVersion == logData->recoveredAt)) {
TraceEvent("TLogInfo", self->dbgid)
.detail("Log", logData->logId)
.detail("Prev", req.prevVersion)
.detail("RecoveredAt", logData->recoveredAt)
.detail("RecoveryTxnVersion", req.version);
logData->recoveryTxnReceived.send(Void());
}
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
self->unknownCommittedVersions.push_front(std::make_tuple(req.version, req.tLogCount));
while (!self->unknownCommittedVersions.empty() &&
@ -2777,6 +2810,7 @@ ACTOR Future<Void> pullAsyncData(TLogData* self,
state Version ver = 0;
state std::vector<TagsAndMessage> messages;
state bool pullingRecoveryData = endVersion.present() && endVersion.get() == logData->recoveredAt;
loop {
state bool foundMessage = r->hasMessage();
if (!foundMessage || r->version().version != ver) {
@ -2814,6 +2848,13 @@ ACTOR Future<Void> pullAsyncData(TLogData* self,
// Notifies the commitQueue actor to commit persistentQueue, and also unblocks tLogPeekMessages
// actors
logData->version.set(ver);
if (logData->recoveryTxnReceived.canBeSet() && !pullingRecoveryData && ver > logData->recoveredAt) {
TraceEvent("TLogInfo", self->dbgid)
.detail("Log", logData->logId)
.detail("RecoveredAt", logData->recoveredAt)
.detail("RecoveryTxnVersion", ver);
logData->recoveryTxnReceived.send(Void());
}
wait(yield(TaskPriority::TLogCommit));
}
lastVer = ver;

View File

@ -507,7 +507,7 @@ Future<Version> TagPartitionedLogSystem::push(Version prevVersion,
Version knownCommittedVersion,
Version minKnownCommittedVersion,
LogPushData& data,
SpanID const& spanContext,
SpanContext const& spanContext,
Optional<UID> debugID,
Optional<std::unordered_map<uint16_t, Version>> tpcvMap) {
// FIXME: Randomize request order as in LegacyLogSystem?

View File

@ -191,7 +191,7 @@ struct TagPartitionedLogSystem final : ILogSystem, ReferenceCounted<TagPartition
Version knownCommittedVersion,
Version minKnownCommittedVersion,
LogPushData& data,
SpanID const& spanContext,
SpanContext const& spanContext,
Optional<UID> debugID,
Optional<std::unordered_map<uint16_t, Version>> tpcvMap) final;

View File

@ -2108,7 +2108,9 @@ public:
return !reading() && !writing();
}
Future<Void> onEvictable() const { return ready(readFuture) && writeFuture; }
// Entry is evictable when its write and read futures are ready, even if they are
// errors, so any buffers they hold are no longer needed by the underlying file actors
Future<Void> onEvictable() const { return ready(readFuture) && ready(writeFuture); }
};
typedef ObjectCache<LogicalPageID, PageCacheEntry> PageCacheT;
@ -3761,7 +3763,9 @@ public:
// Must wait for pending operations to complete, canceling them can cause a crash because the underlying
// operations may be uncancellable and depend on memory from calling scope's page reference
debug_printf("DWALPager(%s) shutdown wait for operations\n", self->filename.c_str());
wait(waitForAll(self->operations));
// Pending ops must be all ready, errors are okay
wait(waitForAllReady(self->operations));
self->operations.clear();
debug_printf("DWALPager(%s) shutdown destroy page cache\n", self->filename.c_str());

View File

@ -120,7 +120,7 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
};
ACTOR Future<Void> getVersion(Reference<MasterData> self, GetCommitVersionRequest req) {
state Span span("M:getVersion"_loc, { req.spanContext });
state Span span("M:getVersion"_loc, req.spanContext);
state std::map<UID, CommitProxyVersionReplies>::iterator proxyItr =
self->lastCommitProxyVersionReplies.find(req.requestingProxy); // lastCommitProxyVersionReplies never changes

View File

@ -24,8 +24,10 @@
#include <unordered_map>
#include "contrib/fmt-8.1.1/include/fmt/format.h"
#include "fdbclient/FDBTypes.h"
#include "fdbrpc/fdbrpc.h"
#include "fdbrpc/LoadBalance.h"
#include "fdbserver/OTELSpanContextMessage.h"
#include "flow/ActorCollection.h"
#include "flow/Arena.h"
#include "flow/Error.h"
@ -1395,8 +1397,8 @@ void updateProcessStats(StorageServer* self) {
#pragma region Queries
#endif
ACTOR Future<Version> waitForVersionActor(StorageServer* data, Version version, SpanID spanContext) {
state Span span("SS.WaitForVersion"_loc, { spanContext });
ACTOR Future<Version> waitForVersionActor(StorageServer* data, Version version, SpanContext spanContext) {
state Span span("SS.WaitForVersion"_loc, spanContext);
choose {
when(wait(data->version.whenAtLeast(version))) {
// FIXME: A bunch of these can block with or without the following delay 0.
@ -1433,7 +1435,7 @@ Version getLatestCommitVersion(VersionVector& ssLatestCommitVersions, Tag& tag)
return commitVersion;
}
Future<Version> waitForVersion(StorageServer* data, Version version, SpanID spanContext) {
Future<Version> waitForVersion(StorageServer* data, Version version, SpanContext spanContext) {
if (version == latestVersion) {
version = std::max(Version(1), data->version.get());
}
@ -1454,7 +1456,10 @@ Future<Version> waitForVersion(StorageServer* data, Version version, SpanID span
return waitForVersionActor(data, version, spanContext);
}
Future<Version> waitForVersion(StorageServer* data, Version commitVersion, Version readVersion, SpanID spanContext) {
Future<Version> waitForVersion(StorageServer* data,
Version commitVersion,
Version readVersion,
SpanContext spanContext) {
ASSERT(commitVersion == invalidVersion || commitVersion < readVersion);
if (commitVersion == invalidVersion) {
@ -1528,11 +1533,11 @@ Optional<TenantMapEntry> StorageServer::getTenantEntry(Version version, TenantIn
ACTOR Future<Void> getValueQ(StorageServer* data, GetValueRequest req) {
state int64_t resultSize = 0;
Span span("SS:getValue"_loc, { req.spanContext });
Span span("SS:getValue"_loc, req.spanContext);
if (req.tenantInfo.name.present()) {
span.addTag("tenant"_sr, req.tenantInfo.name.get());
span.addAttribute("tenant"_sr, req.tenantInfo.name.get());
}
span.addTag("key"_sr, req.key);
span.addAttribute("key"_sr, req.key);
// Temporarily disabled -- this path is hit a lot
// getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.first();
@ -1665,9 +1670,9 @@ ACTOR Future<Void> getValueQ(StorageServer* data, GetValueRequest req) {
// must be kept alive until the watch is finished.
extern size_t WATCH_OVERHEAD_WATCHQ, WATCH_OVERHEAD_WATCHIMPL;
ACTOR Future<Version> watchWaitForValueChange(StorageServer* data, SpanID parent, KeyRef key) {
ACTOR Future<Version> watchWaitForValueChange(StorageServer* data, SpanContext parent, KeyRef key) {
state Location spanLocation = "SS:watchWaitForValueChange"_loc;
state Span span(spanLocation, { parent });
state Span span(spanLocation, parent);
state Reference<ServerWatchMetadata> metadata = data->getWatchMetadata(key);
if (metadata->debugID.present())
@ -1774,8 +1779,8 @@ void checkCancelWatchImpl(StorageServer* data, WatchValueRequest req) {
ACTOR Future<Void> watchValueSendReply(StorageServer* data,
WatchValueRequest req,
Future<Version> resp,
SpanID spanContext) {
state Span span("SS:watchValue"_loc, { spanContext });
SpanContext spanContext) {
state Span span("SS:watchValue"_loc, spanContext);
state double startTime = now();
++data->counters.watchQueries;
++data->numWatches;
@ -2503,7 +2508,7 @@ ACTOR Future<Void> stopChangeFeedOnMove(StorageServer* data, ChangeFeedStreamReq
}
ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamRequest req, UID streamUID) {
state Span span("SS:getChangeFeedStream"_loc, { req.spanContext });
state Span span("SS:getChangeFeedStream"_loc, req.spanContext);
state bool atLatest = false;
state bool removeUID = false;
state Optional<Version> blockedVersion;
@ -2859,7 +2864,7 @@ ACTOR Future<GetKeyValuesReply> readRange(StorageServer* data,
KeyRange range,
int limit,
int* pLimitBytes,
SpanID parentSpan,
SpanContext parentSpan,
IKeyValueStore::ReadType type,
Optional<Key> tenantPrefix) {
state GetKeyValuesReply result;
@ -3098,7 +3103,7 @@ ACTOR Future<Key> findKey(StorageServer* data,
Version version,
KeyRange range,
int* pOffset,
SpanID parentSpan,
SpanContext parentSpan,
IKeyValueStore::ReadType type)
// Attempts to find the key indicated by sel in the data at version, within range.
// Precondition: selectorInRange(sel, range)
@ -3119,7 +3124,7 @@ ACTOR Future<Key> findKey(StorageServer* data,
state int sign = forward ? +1 : -1;
state bool skipEqualKey = sel.orEqual == forward;
state int distance = forward ? sel.offset : 1 - sel.offset;
state Span span("SS.findKey"_loc, { parentSpan });
state Span span("SS.findKey"_loc, parentSpan);
// Don't limit the number of bytes if this is a trivial key selector (there will be at most two items returned from
// the read range in this case)
@ -3217,16 +3222,16 @@ ACTOR Future<Void> getKeyValuesQ(StorageServer* data, GetKeyValuesRequest req)
// Throws a wrong_shard_server if the keys in the request or result depend on data outside this server OR if a large
// selector offset prevents all data from being read in one range read
{
state Span span("SS:getKeyValues"_loc, { req.spanContext });
state Span span("SS:getKeyValues"_loc, req.spanContext);
state int64_t resultSize = 0;
state IKeyValueStore::ReadType type =
req.isFetchKeys ? IKeyValueStore::ReadType::FETCH : IKeyValueStore::ReadType::NORMAL;
if (req.tenantInfo.name.present()) {
span.addTag("tenant"_sr, req.tenantInfo.name.get());
span.addAttribute("tenant"_sr, req.tenantInfo.name.get());
}
getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.first();
getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.traceID;
++data->counters.getRangeQueries;
++data->counters.allQueries;
@ -3711,16 +3716,16 @@ ACTOR Future<Void> getMappedKeyValuesQ(StorageServer* data, GetMappedKeyValuesRe
// Throws a wrong_shard_server if the keys in the request or result depend on data outside this server OR if a large
// selector offset prevents all data from being read in one range read
{
state Span span("SS:getMappedKeyValues"_loc, { req.spanContext });
state Span span("SS:getMappedKeyValues"_loc, req.spanContext);
state int64_t resultSize = 0;
state IKeyValueStore::ReadType type =
req.isFetchKeys ? IKeyValueStore::ReadType::FETCH : IKeyValueStore::ReadType::NORMAL;
if (req.tenantInfo.name.present()) {
span.addTag("tenant"_sr, req.tenantInfo.name.get());
span.addAttribute("tenant"_sr, req.tenantInfo.name.get());
}
getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.first();
getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.traceID;
++data->counters.getMappedRangeQueries;
++data->counters.allQueries;
@ -3925,13 +3930,13 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe
// Throws a wrong_shard_server if the keys in the request or result depend on data outside this server OR if a large
// selector offset prevents all data from being read in one range read
{
state Span span("SS:getKeyValuesStream"_loc, { req.spanContext });
state Span span("SS:getKeyValuesStream"_loc, req.spanContext);
state int64_t resultSize = 0;
state IKeyValueStore::ReadType type =
req.isFetchKeys ? IKeyValueStore::ReadType::FETCH : IKeyValueStore::ReadType::NORMAL;
if (req.tenantInfo.name.present()) {
span.addTag("tenant"_sr, req.tenantInfo.name.get());
span.addAttribute("tenant"_sr, req.tenantInfo.name.get());
}
req.reply.setByteLimit(SERVER_KNOBS->RANGESTREAM_LIMIT_BYTES);
@ -4129,12 +4134,12 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe
}
ACTOR Future<Void> getKeyQ(StorageServer* data, GetKeyRequest req) {
state Span span("SS:getKey"_loc, { req.spanContext });
state Span span("SS:getKey"_loc, req.spanContext);
if (req.tenantInfo.name.present()) {
span.addTag("tenant"_sr, req.tenantInfo.name.get());
span.addAttribute("tenant"_sr, req.tenantInfo.name.get());
}
state int64_t resultSize = 0;
getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.first();
getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.traceID;
++data->counters.getKeyQueries;
++data->counters.allQueries;
@ -6797,7 +6802,9 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
}
data->tlogCursorReadsLatencyHistogram->sampleSeconds(now() - beforeTLogCursorReads);
if (cursor->popped() > 0) {
TraceEvent("StorageServerWorkerRemoved", data->thisServerID).detail("Reason", "PeekPoppedTLogData");
TraceEvent("StorageServerWorkerRemoved", data->thisServerID)
.detail("Reason", "PeekPoppedTLogData")
.detail("Version", cursor->popped());
throw worker_removed();
}
@ -6851,6 +6858,10 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
SpanContextMessage::isNextIn(cloneReader)) {
SpanContextMessage scm;
cloneReader >> scm;
} else if (cloneReader.protocolVersion().hasOTELSpanContext() &&
OTELSpanContextMessage::isNextIn(cloneReader)) {
OTELSpanContextMessage scm;
cloneReader >> scm;
} else {
MutationRef msg;
cloneReader >> msg;
@ -6933,7 +6944,7 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
state Version ver = invalidVersion;
cloneCursor2->setProtocolVersion(data->logProtocol);
state SpanID spanContext = SpanID();
state SpanContext spanContext = SpanContext();
state double beforeTLogMsgsUpdates = now();
state std::set<Key> updatedChangeFeeds;
for (; cloneCursor2->hasMessage(); cloneCursor2->nextMessage()) {
@ -6967,17 +6978,27 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
data->logProtocol = rd.protocolVersion();
data->storage.changeLogProtocol(ver, data->logProtocol);
cloneCursor2->setProtocolVersion(rd.protocolVersion());
spanContext = UID();
spanContext.traceID = UID();
} else if (rd.protocolVersion().hasSpanContext() && SpanContextMessage::isNextIn(rd)) {
SpanContextMessage scm;
rd >> scm;
TEST(true); // storageserveractor converting SpanContextMessage into OTEL SpanContext
spanContext =
SpanContext(UID(scm.spanContext.first(), scm.spanContext.second()),
0,
scm.spanContext.first() != 0 && scm.spanContext.second() != 0 ? TraceFlags::sampled
: TraceFlags::unsampled);
} else if (rd.protocolVersion().hasOTELSpanContext() && OTELSpanContextMessage::isNextIn(rd)) {
TEST(true); // storageserveractor reading OTELSpanContextMessage
OTELSpanContextMessage scm;
rd >> scm;
spanContext = scm.spanContext;
} else {
MutationRef msg;
rd >> msg;
Span span("SS:update"_loc, { spanContext });
span.addTag("key"_sr, msg.param1);
Span span("SS:update"_loc, spanContext);
span.addAttribute("key"_sr, msg.param1);
// Drop non-private mutations if TSS fault injection is enabled in simulation, or if this is a TSS in
// quarantine.
@ -8410,11 +8431,11 @@ ACTOR Future<Void> serveGetKeyRequests(StorageServer* self, FutureStream<GetKeyR
ACTOR Future<Void> watchValueWaitForVersion(StorageServer* self,
WatchValueRequest req,
PromiseStream<WatchValueRequest> stream) {
state Span span("SS:watchValueWaitForVersion"_loc, { req.spanContext });
state Span span("SS:watchValueWaitForVersion"_loc, req.spanContext);
if (req.tenantInfo.name.present()) {
span.addTag("tenant"_sr, req.tenantInfo.name.get());
span.addAttribute("tenant"_sr, req.tenantInfo.name.get());
}
getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.first();
getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.traceID;
try {
wait(success(waitForVersionNoTooOld(self, req.version)));
Optional<TenantMapEntry> entry = self->getTenantEntry(latestVersion, req.tenantInfo);
@ -8432,11 +8453,11 @@ ACTOR Future<Void> watchValueWaitForVersion(StorageServer* self,
ACTOR Future<Void> serveWatchValueRequestsImpl(StorageServer* self, FutureStream<WatchValueRequest> stream) {
loop {
getCurrentLineage()->modify(&TransactionLineage::txID) = 0;
getCurrentLineage()->modify(&TransactionLineage::txID) = UID();
state WatchValueRequest req = waitNext(stream);
state Reference<ServerWatchMetadata> metadata = self->getWatchMetadata(req.key.contents());
state Span span("SS:serveWatchValueRequestsImpl"_loc, { req.spanContext });
getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.first();
state Span span("SS:serveWatchValueRequestsImpl"_loc, req.spanContext);
getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.traceID;
// case 1: no watch set for the current key
if (!metadata.isValid()) {

View File

@ -80,8 +80,8 @@ struct TransactionWrapper : public ReferenceCounted<TransactionWrapper> {
// Gets the version vector cached in a transaction
virtual VersionVector getVersionVector() = 0;
// Gets the spanID of a transaction
virtual UID getSpanID() = 0;
// Gets the spanContext of a transaction
virtual SpanContext getSpanContext() = 0;
// Prints debugging messages for a transaction; not implemented for all transaction types
virtual void debugTransaction(UID debugId) {}
@ -161,8 +161,8 @@ struct FlowTransactionWrapper : public TransactionWrapper {
// Gets the version vector cached in a transaction
VersionVector getVersionVector() override { return transaction.getVersionVector(); }
// Gets the spanID of a transaction
UID getSpanID() override { return transaction.getSpanID(); }
// Gets the spanContext of a transaction
SpanContext getSpanContext() override { return transaction.getSpanContext(); }
// Prints debugging messages for a transaction
void debugTransaction(UID debugId) override { transaction.debugTransaction(debugId); }
@ -229,8 +229,8 @@ struct ThreadTransactionWrapper : public TransactionWrapper {
// Gets the version vector cached in a transaction
VersionVector getVersionVector() override { return transaction->getVersionVector(); }
// Gets the spanID of a transaction
UID getSpanID() override { return transaction->getSpanID(); }
// Gets the spanContext of a transaction
SpanContext getSpanContext() override { return transaction->getSpanContext(); }
void addReadConflictRange(KeyRangeRef const& keys) override { transaction->addReadConflictRange(keys); }
};

View File

@ -873,7 +873,8 @@ struct ConsistencyCheckWorkload : TestWorkload {
state Key begin = kr.begin;
state Key end = kr.end;
state int limitKeyServers = BUGGIFY ? 1 : 100;
state Span span(deterministicRandom()->randomUniqueID(), "WL:ConsistencyCheck"_loc);
state Span span(SpanContext(deterministicRandom()->randomUniqueID(), deterministicRandom()->randomUInt64()),
"WL:ConsistencyCheck"_loc);
while (begin < end) {
state Reference<CommitProxyInfo> commitProxyInfo =

View File

@ -106,10 +106,9 @@ struct CycleWorkload : TestWorkload {
state Transaction tr(cx);
if (deterministicRandom()->random01() >= self->traceParentProbability) {
state Span span("CycleClient"_loc);
// TraceEvent("CycleTracingTransaction", span.context).log();
TraceEvent("CycleTracingTransaction", span.context).log();
TraceEvent("CycleTracingTransaction", span.context.traceID).log();
tr.setOption(FDBTransactionOptions::SPAN_PARENT,
BinaryWriter::toValue(span.context, Unversioned()));
BinaryWriter::toValue(span.context, IncludeVersion()));
}
while (true) {
try {

View File

@ -174,7 +174,7 @@ struct MiniCycleWorkload : TestWorkload {
state Transaction tr(cx);
if (deterministicRandom()->random01() >= self->traceParentProbability) {
state Span span("MiniCycleClient"_loc);
TraceEvent("MiniCycleTracingTransaction", span.context).log();
TraceEvent("MiniCycleTracingTransaction", span.context.traceID).log();
tr.setOption(FDBTransactionOptions::SPAN_PARENT,
BinaryWriter::toValue(span.context, Unversioned()));
}

View File

@ -21,6 +21,7 @@
#include "boost/asio/buffer.hpp"
#include "boost/asio/ip/address.hpp"
#include "boost/system/system_error.hpp"
#include "flow/Arena.h"
#include "flow/Platform.h"
#include "flow/Trace.h"
#include <algorithm>

View File

@ -2038,16 +2038,47 @@ static void enableLargePages() {
}
#ifndef _WIN32
static void* mmapSafe(void* addr, size_t len, int prot, int flags, int fd, off_t offset) {
void* result = mmap(addr, len, prot, flags, fd, offset);
if (result == MAP_FAILED) {
int err = errno;
fprintf(stderr,
"Error calling mmap(%p, %zu, %d, %d, %d, %jd): %s\n",
addr,
len,
prot,
flags,
fd,
(intmax_t)offset,
strerror(err));
fflush(stderr);
std::abort();
}
return result;
}
static void mprotectSafe(void* p, size_t s, int prot) {
if (mprotect(p, s, prot) != 0) {
int err = errno;
fprintf(stderr, "Error calling mprotect(%p, %zu, %d): %s\n", p, s, prot, strerror(err));
fflush(stderr);
std::abort();
}
}
static void* mmapInternal(size_t length, int flags, bool guardPages) {
if (guardPages) {
constexpr size_t pageSize = 4096;
static size_t pageSize = sysconf(_SC_PAGESIZE);
length = RightAlign(length, pageSize);
length += 2 * pageSize; // Map enough for the guard pages
void* resultWithGuardPages = mmap(nullptr, length, PROT_READ | PROT_WRITE, flags, -1, 0);
mprotect(resultWithGuardPages, pageSize, PROT_NONE); // left guard page
mprotect((void*)(uintptr_t(resultWithGuardPages) + length - pageSize), pageSize, PROT_NONE); // right guard page
void* resultWithGuardPages = mmapSafe(nullptr, length, PROT_READ | PROT_WRITE, flags, -1, 0);
// left guard page
mprotectSafe(resultWithGuardPages, pageSize, PROT_NONE);
// right guard page
mprotectSafe((void*)(uintptr_t(resultWithGuardPages) + length - pageSize), pageSize, PROT_NONE);
return (void*)(uintptr_t(resultWithGuardPages) + pageSize);
} else {
return mmap(nullptr, length, PROT_READ | PROT_WRITE, flags, -1, 0);
return mmapSafe(nullptr, length, PROT_READ | PROT_WRITE, flags, -1, 0);
}
}
#endif

View File

@ -170,6 +170,7 @@ public: // introduced features
PROTOCOL_VERSION_FEATURE(0x0FDB00B071010000LL, Tenants);
PROTOCOL_VERSION_FEATURE(0x0FDB00B071010000LL, StorageInterfaceReadiness);
PROTOCOL_VERSION_FEATURE(0x0FDB00B071010000LL, ResolverPrivateMutations);
PROTOCOL_VERSION_FEATURE(0x0FDB00B072000000LL, OTELSpanContext);
PROTOCOL_VERSION_FEATURE(0x0FDB00B072000000LL, SWVersionTracking);
};

View File

@ -19,6 +19,7 @@
*/
#include "flow/Tracing.h"
#include "flow/IRandom.h"
#include "flow/UnitTest.h"
#include "flow/Knobs.h"
#include "flow/network.h"
@ -42,28 +43,11 @@ constexpr float kQueueSizeLogInterval = 5.0;
struct NoopTracer : ITracer {
TracerType type() const override { return TracerType::DISABLED; }
void trace(Span const& span) override {}
void trace(OTELSpan const& span) override {}
};
struct LogfileTracer : ITracer {
TracerType type() const override { return TracerType::LOG_FILE; }
void trace(Span const& span) override {
TraceEvent te(SevInfo, "TracingSpan", span.context);
te.detail("Location", span.location.name)
.detail("Begin", format("%.6f", span.begin))
.detail("End", format("%.6f", span.end));
if (span.parents.size() == 1) {
te.detail("Parent", *span.parents.begin());
} else {
for (auto parent : span.parents) {
TraceEvent(SevInfo, "TracingSpanAddParent", span.context).detail("AddParent", parent);
}
}
for (const auto& [key, value] : span.tags) {
TraceEvent(SevInfo, "TracingSpanTag", span.context).detail("Key", key).detail("Value", value);
}
}
void trace(OTELSpan const& span) override {
TraceEvent te(SevInfo, "TracingSpan", span.context.traceID);
te.detail("SpanID", span.context.spanID)
.detail("Location", span.location.name)
@ -183,31 +167,6 @@ struct UDPTracer : public ITracer {
// Serializes span fields as an array into the supplied TraceRequest
// buffer.
void serialize_span(const Span& span, TraceRequest& request) {
// If you change the serialization format here, make sure to update the
// fluentd filter to be able to correctly parse the updated format! See
// the msgpack specification for more info on the bit patterns used
// here.
uint8_t size = 8;
if (span.parents.size() == 0)
--size;
request.write_byte(size | 0b10010000); // write as array
serialize_string(g_network->getLocalAddress().toString(), request); // ip:port
serialize_value(span.context.first(), request, 0xcf); // trace id
serialize_value(span.context.second(), request, 0xcf); // token (span id)
serialize_value(span.begin, request, 0xcb); // start time
serialize_value(span.end - span.begin, request, 0xcb); // duration
serialize_string(span.location.name.toString(), request);
serialize_map(span.tags, request);
serialize_vector(span.parents, request);
}
void serialize_span(const OTELSpan& span, TraceRequest& request) {
uint16_t size = 14;
request.write_byte(size | 0b10010000); // write as array
serialize_value(span.context.traceID.first(), request, 0xcf); // trace id
@ -274,30 +233,6 @@ private:
serialize_string(reinterpret_cast<const uint8_t*>(str.data()), str.size(), request);
}
// Writes the given vector of SpanIDs to the request. If the vector is
// empty, the request is not modified.
inline void serialize_vector(const SmallVectorRef<SpanID>& vec, TraceRequest& request) {
int size = vec.size();
if (size == 0) {
return;
}
if (size <= 15) {
request.write_byte(static_cast<uint8_t>(size) | 0b10010000);
} else if (size <= 65535) {
request.write_byte(0xdc);
request.write_byte(reinterpret_cast<const uint8_t*>(&size)[1]);
request.write_byte(reinterpret_cast<const uint8_t*>(&size)[0]);
} else {
TraceEvent(SevWarn, "TracingSpanSerializeVector")
.detail("Failed to MessagePack encode very large vector", size);
ASSERT_WE_THINK(false);
}
for (const auto& parentContext : vec) {
serialize_value(parentContext.second(), request, 0xcf);
}
}
// Writes the given vector of linked SpanContext's to the request. If the vector is
// empty, the request is not modified.
inline void serialize_vector(const SmallVectorRef<SpanContext>& vec, TraceRequest& request) {
@ -322,7 +257,7 @@ private:
// Writes the given vector of linked SpanContext's to the request. If the vector is
// empty, the request is not modified.
inline void serialize_vector(const SmallVectorRef<OTELEventRef>& vec, TraceRequest& request) {
inline void serialize_vector(const SmallVectorRef<SpanEventRef>& vec, TraceRequest& request) {
int size = vec.size();
if (size <= 15) {
request.write_byte(static_cast<uint8_t>(size) | 0b10010000);
@ -453,12 +388,6 @@ struct FastUDPTracer : public UDPTracer {
request_.reset();
}
void trace(OTELSpan const& span) override {
prepare(span.location.name.size());
serialize_span(span, request_);
write();
}
void trace(Span const& span) override {
prepare(span.location.name.size());
serialize_span(span, request_);
@ -513,28 +442,6 @@ void openTracer(TracerType type) {
ITracer::~ITracer() {}
Span& Span::operator=(Span&& o) {
if (begin > 0.0 && context.second() > 0) {
end = g_network->now();
g_tracer->trace(*this);
}
arena = std::move(o.arena);
context = o.context;
begin = o.begin;
end = o.end;
location = o.location;
parents = std::move(o.parents);
o.begin = 0;
return *this;
}
Span::~Span() {
if (begin > 0.0 && context.second() > 0) {
end = g_network->now();
g_tracer->trace(*this);
}
}
OTELSpan& OTELSpan::operator=(OTELSpan&& o) {
if (begin > 0.0 && o.context.isSampled() > 0) {
end = g_network->now();
g_tracer->trace(*this);
@ -558,7 +465,7 @@ OTELSpan& OTELSpan::operator=(OTELSpan&& o) {
return *this;
}
OTELSpan::~OTELSpan() {
Span::~Span() {
if (begin > 0.0 && context.isSampled()) {
end = g_network->now();
g_tracer->trace(*this);
@ -567,16 +474,15 @@ OTELSpan::~OTELSpan() {
TEST_CASE("/flow/Tracing/CreateOTELSpan") {
// Sampling disabled, no parent.
OTELSpan notSampled("foo"_loc);
Span notSampled("foo"_loc);
ASSERT(!notSampled.context.isSampled());
// Force Sampling
OTELSpan sampled("foo"_loc, []() { return 1.0; });
ASSERT(sampled.context.isSampled());
// Span sampled("foo"_loc, []() { return 1.0; });
// ASSERT(sampled.context.isSampled());
// Ensure child traceID matches parent, when parent is sampled.
OTELSpan childTraceIDMatchesParent(
"foo"_loc, []() { return 1.0; }, SpanContext(UID(100, 101), 200, TraceFlags::sampled));
Span childTraceIDMatchesParent("foo"_loc, SpanContext(UID(100, 101), 200, TraceFlags::sampled));
ASSERT(childTraceIDMatchesParent.context.traceID.first() ==
childTraceIDMatchesParent.parentContext.traceID.first());
ASSERT(childTraceIDMatchesParent.context.traceID.second() ==
@ -584,22 +490,20 @@ TEST_CASE("/flow/Tracing/CreateOTELSpan") {
// When the parent isn't sampled AND it has legitimate values we should not sample a child,
// even if the child was randomly selected for sampling.
OTELSpan parentNotSampled(
"foo"_loc, []() { return 1.0; }, SpanContext(UID(1, 1), 1, TraceFlags::unsampled));
Span parentNotSampled("foo"_loc, SpanContext(UID(1, 1), 1, TraceFlags::unsampled));
ASSERT(!parentNotSampled.context.isSampled());
// When the parent isn't sampled AND it has zero values for traceID and spanID this means
// we should defer to the child as the new root of the trace as there was no actual parent.
// If the child was sampled we should send the child trace with a null parent.
OTELSpan noParent(
"foo"_loc, []() { return 1.0; }, SpanContext(UID(0, 0), 0, TraceFlags::unsampled));
ASSERT(noParent.context.isSampled());
// Span noParent("foo"_loc, SpanContext(UID(0, 0), 0, TraceFlags::unsampled));
// ASSERT(noParent.context.isSampled());
return Void();
};
TEST_CASE("/flow/Tracing/AddEvents") {
// Use helper method to add an OTELEventRef to an OTELSpan.
OTELSpan span1("span_with_event"_loc);
Span span1("span_with_event"_loc);
auto arena = span1.arena;
SmallVectorRef<KeyValueRef> attrs;
attrs.push_back(arena, KeyValueRef("foo"_sr, "bar"_sr));
@ -610,14 +514,14 @@ TEST_CASE("/flow/Tracing/AddEvents") {
ASSERT(span1.events[0].attributes.begin()->value.toString() == "bar");
// Use helper method to add an OTELEventRef with no attributes to an OTELSpan
OTELSpan span2("span_with_event"_loc);
Span span2("span_with_event"_loc);
span2.addEvent(StringRef(span2.arena, LiteralStringRef("commit_succeed")), 1234567.100);
ASSERT(span2.events[0].name.toString() == "commit_succeed");
ASSERT(span2.events[0].time == 1234567.100);
ASSERT(span2.events[0].attributes.size() == 0);
// Add fully constructed OTELEventRef to OTELSpan passed by value.
OTELSpan span3("span_with_event"_loc);
Span span3("span_with_event"_loc);
auto s3Arena = span3.arena;
SmallVectorRef<KeyValueRef> s3Attrs;
s3Attrs.push_back(s3Arena, KeyValueRef("xyz"_sr, "123"_sr));
@ -636,7 +540,10 @@ TEST_CASE("/flow/Tracing/AddEvents") {
};
TEST_CASE("/flow/Tracing/AddAttributes") {
OTELSpan span1("span_with_attrs"_loc);
Span span1("span_with_attrs"_loc,
SpanContext(deterministicRandom()->randomUniqueID(),
deterministicRandom()->randomUInt64(),
TraceFlags::sampled));
auto arena = span1.arena;
span1.addAttribute(StringRef(arena, LiteralStringRef("foo")), StringRef(arena, LiteralStringRef("bar")));
span1.addAttribute(StringRef(arena, LiteralStringRef("operation")), StringRef(arena, LiteralStringRef("grv")));
@ -644,25 +551,34 @@ TEST_CASE("/flow/Tracing/AddAttributes") {
ASSERT(span1.attributes[1] == KeyValueRef("foo"_sr, "bar"_sr));
ASSERT(span1.attributes[2] == KeyValueRef("operation"_sr, "grv"_sr));
OTELSpan span3("span_with_attrs"_loc);
auto s3Arena = span3.arena;
span3.addAttribute(StringRef(s3Arena, LiteralStringRef("a")), StringRef(s3Arena, LiteralStringRef("1")))
.addAttribute(StringRef(s3Arena, LiteralStringRef("b")), LiteralStringRef("2"))
.addAttribute(StringRef(s3Arena, LiteralStringRef("c")), LiteralStringRef("3"));
Span span2("span_with_attrs"_loc,
SpanContext(deterministicRandom()->randomUniqueID(),
deterministicRandom()->randomUInt64(),
TraceFlags::sampled));
auto s2Arena = span2.arena;
span2.addAttribute(StringRef(s2Arena, LiteralStringRef("a")), StringRef(s2Arena, LiteralStringRef("1")))
.addAttribute(StringRef(s2Arena, LiteralStringRef("b")), LiteralStringRef("2"))
.addAttribute(StringRef(s2Arena, LiteralStringRef("c")), LiteralStringRef("3"));
ASSERT_EQ(span3.attributes.size(), 4); // Includes default attribute of "address"
ASSERT(span3.attributes[1] == KeyValueRef("a"_sr, "1"_sr));
ASSERT(span3.attributes[2] == KeyValueRef("b"_sr, "2"_sr));
ASSERT(span3.attributes[3] == KeyValueRef("c"_sr, "3"_sr));
ASSERT_EQ(span2.attributes.size(), 4); // Includes default attribute of "address"
ASSERT(span2.attributes[1] == KeyValueRef("a"_sr, "1"_sr));
ASSERT(span2.attributes[2] == KeyValueRef("b"_sr, "2"_sr));
ASSERT(span2.attributes[3] == KeyValueRef("c"_sr, "3"_sr));
return Void();
};
TEST_CASE("/flow/Tracing/AddLinks") {
OTELSpan span1("span_with_links"_loc);
Span span1("span_with_links"_loc);
ASSERT(!span1.context.isSampled());
ASSERT(!span1.context.isValid());
span1.addLink(SpanContext(UID(100, 101), 200, TraceFlags::sampled));
span1.addLink(SpanContext(UID(200, 201), 300, TraceFlags::unsampled))
.addLink(SpanContext(UID(300, 301), 400, TraceFlags::sampled));
// Ensure the root span is now sampled and traceID and spanIDs are set.
ASSERT(span1.context.isSampled());
ASSERT(span1.context.isValid());
// Ensure links are present.
ASSERT(span1.links[0].traceID == UID(100, 101));
ASSERT(span1.links[0].spanID == 200);
ASSERT(span1.links[0].m_Flags == TraceFlags::sampled);
@ -673,11 +589,16 @@ TEST_CASE("/flow/Tracing/AddLinks") {
ASSERT(span1.links[2].spanID == 400);
ASSERT(span1.links[2].m_Flags == TraceFlags::sampled);
OTELSpan span2("span_with_links"_loc);
Span span2("span_with_links"_loc);
ASSERT(!span2.context.isSampled());
ASSERT(!span2.context.isValid());
auto link1 = SpanContext(UID(1, 1), 1, TraceFlags::sampled);
auto link2 = SpanContext(UID(2, 2), 2, TraceFlags::sampled);
auto link3 = SpanContext(UID(3, 3), 3, TraceFlags::sampled);
span2.addLinks({ link1, link2 }).addLinks({ link3 });
// Ensure the root span is now sampled and traceID and spanIDs are set.
ASSERT(span2.context.isSampled());
ASSERT(span2.context.isValid());
ASSERT(span2.links[0].traceID == UID(1, 1));
ASSERT(span2.links[0].spanID == 1);
ASSERT(span2.links[0].m_Flags == TraceFlags::sampled);
@ -741,7 +662,7 @@ std::string readMPString(uint8_t* index) {
// Windows doesn't like lack of header and declaration of constructor for FastUDPTracer
#ifndef WIN32
TEST_CASE("/flow/Tracing/FastUDPMessagePackEncoding") {
OTELSpan span1("encoded_span"_loc);
Span span1("encoded_span"_loc);
auto request = TraceRequest{ .buffer = std::make_unique<uint8_t[]>(kTraceBufferSize),
.data_size = 0,
.buffer_size = kTraceBufferSize };
@ -753,9 +674,9 @@ TEST_CASE("/flow/Tracing/FastUDPMessagePackEncoding") {
// Test - constructor OTELSpan(const Location& location, const SpanContext parent, const SpanContext& link)
// Will delegate to other constructors.
OTELSpan span2("encoded_span"_loc,
SpanContext(UID(100, 101), 1, TraceFlags::sampled),
SpanContext(UID(200, 201), 2, TraceFlags::sampled));
Span span2("encoded_span"_loc,
SpanContext(UID(100, 101), 1, TraceFlags::sampled),
{ SpanContext(UID(200, 201), 2, TraceFlags::sampled) });
tracer.serialize_span(span2, request);
data = request.buffer.get();
ASSERT(data[0] == 0b10011110); // 14 element array.
@ -801,7 +722,7 @@ TEST_CASE("/flow/Tracing/FastUDPMessagePackEncoding") {
request.reset();
// Exercise all fluent interfaces, include links, events, and attributes.
OTELSpan span3("encoded_span_3"_loc);
Span span3("encoded_span_3"_loc, SpanContext());
auto s3Arena = span3.arena;
SmallVectorRef<KeyValueRef> attrs;
attrs.push_back(s3Arena, KeyValueRef("foo"_sr, "bar"_sr));
@ -870,7 +791,7 @@ TEST_CASE("/flow/Tracing/FastUDPMessagePackEncoding") {
"SGKKUrpIb/7zePhBDi+gzUzyAcbQ2zUbFWI1KNi3zQk58uUG6wWJZkw+GCs7Cc3V"
"OUxOljwCJkC4QTgdsbbFhxUC+rtoHV5xAqoTQwR0FXnWigUjP7NtdL6huJUr3qRv"
"40c4yUI1a4+P5vJa";
auto span4 = OTELSpan();
Span span4;
auto location = Location();
location.name = StringRef(span4.arena, longString);
span4.location = location;

View File

@ -21,6 +21,7 @@
#pragma once
#include "fdbclient/FDBTypes.h"
#include "fdbrpc/FlowTransport.h"
#include "flow/IRandom.h"
#include <unordered_set>
#include <atomic>
@ -33,90 +34,43 @@ inline Location operator"" _loc(const char* str, size_t size) {
return Location{ StringRef(reinterpret_cast<const uint8_t*>(str), size) };
}
struct Span {
Span(SpanID context, Location location, std::initializer_list<SpanID> const& parents = {})
: context(context), begin(g_network->now()), location(location), parents(arena, parents.begin(), parents.end()) {
if (parents.size() > 0) {
// If the parents' token is 0 (meaning the trace should not be
// recorded), set the child token to 0 as well. Otherwise, generate
// a new, random token.
uint64_t traceId = 0;
if ((*parents.begin()).second() > 0) {
traceId = deterministicRandom()->randomUInt64();
}
this->context = SpanID((*parents.begin()).first(), traceId);
}
}
Span(Location location, std::initializer_list<SpanID> const& parents = {})
: Span(UID(deterministicRandom()->randomUInt64(),
deterministicRandom()->random01() < FLOW_KNOBS->TRACING_SAMPLE_RATE
? deterministicRandom()->randomUInt64()
: 0),
location,
parents) {}
Span(Location location, SpanID context) : Span(location, { context }) {}
Span(const Span&) = delete;
Span(Span&& o) {
arena = std::move(o.arena);
context = o.context;
begin = o.begin;
end = o.end;
location = o.location;
parents = std::move(o.parents);
o.context = UID();
o.begin = 0.0;
o.end = 0.0;
}
Span() {}
~Span();
Span& operator=(Span&& o);
Span& operator=(const Span&) = delete;
void swap(Span& other) {
std::swap(arena, other.arena);
std::swap(context, other.context);
std::swap(begin, other.begin);
std::swap(end, other.end);
std::swap(location, other.location);
std::swap(parents, other.parents);
}
enum class TraceFlags : uint8_t { unsampled = 0b00000000, sampled = 0b00000001 };
void addParent(SpanID span) {
if (parents.size() == 0) {
uint64_t traceId = 0;
if (span.second() > 0) {
traceId = context.second() == 0 ? deterministicRandom()->randomUInt64() : context.second();
}
// Use first parent to set trace ID. This is non-ideal for spans
// with multiple parents, because the trace ID will associate the
// span with only one trace. A workaround is to look at the parent
// relationships instead of the trace ID. Another option in the
// future is to keep a list of trace IDs.
context = SpanID(span.first(), traceId);
}
parents.push_back(arena, span);
inline TraceFlags operator&(TraceFlags lhs, TraceFlags rhs) {
return static_cast<TraceFlags>(static_cast<std::underlying_type_t<TraceFlags>>(lhs) &
static_cast<std::underlying_type_t<TraceFlags>>(rhs));
}
struct SpanContext {
UID traceID;
uint64_t spanID;
TraceFlags m_Flags;
SpanContext() : traceID(UID()), spanID(0), m_Flags(TraceFlags::unsampled) {}
SpanContext(UID traceID, uint64_t spanID, TraceFlags flags) : traceID(traceID), spanID(spanID), m_Flags(flags) {}
SpanContext(UID traceID, uint64_t spanID) : traceID(traceID), spanID(spanID), m_Flags(TraceFlags::unsampled) {}
SpanContext(Arena arena, const SpanContext& span)
: traceID(span.traceID), spanID(span.spanID), m_Flags(span.m_Flags) {}
bool isSampled() const { return (m_Flags & TraceFlags::sampled) == TraceFlags::sampled; }
std::string toString() const { return format("%016llx%016llx%016llx", traceID.first(), traceID.second(), spanID); };
bool isValid() const { return traceID.first() != 0 && traceID.second() != 0 && spanID != 0; }
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, traceID, spanID, m_Flags);
}
void addTag(const StringRef& key, const StringRef& value) { tags[key] = value; }
Arena arena;
UID context = UID();
double begin = 0.0, end = 0.0;
Location location;
SmallVectorRef<SpanID> parents;
std::unordered_map<StringRef, StringRef> tags;
};
// OTELSpan
// Span
//
// OTELSpan is a tracing implementation which, for the most part, complies with the W3C Trace Context specification
// Span is a tracing implementation which, for the most part, complies with the W3C Trace Context specification
// https://www.w3.org/TR/trace-context/ and the OpenTelemetry API
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/api.md.
//
// The major differences between OTELSpan and the current Span implementation, which is based off the OpenTracing.io
// The major differences between Span and the 7.0 Span implementation, which is based off the OpenTracing.io
// specification https://opentracing.io/ are as follows.
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/api.md#span
//
// OTELSpans have...
// OpenTelemetry Spans have...
// 1. A SpanContext which consists of 3 attributes.
//
// TraceId - A valid trace identifier is a 16-byte array with at least one non-zero byte.
@ -146,82 +100,63 @@ enum class SpanKind : uint8_t { INTERNAL = 0, CLIENT = 1, SERVER = 2, PRODUCER =
enum class SpanStatus : uint8_t { UNSET = 0, OK = 1, ERR = 2 };
struct OTELEventRef {
OTELEventRef() {}
OTELEventRef(const StringRef& name,
struct SpanEventRef {
SpanEventRef() {}
SpanEventRef(const StringRef& name,
const double& time,
const SmallVectorRef<KeyValueRef>& attributes = SmallVectorRef<KeyValueRef>())
: name(name), time(time), attributes(attributes) {}
OTELEventRef(Arena& arena, const OTELEventRef& other)
SpanEventRef(Arena& arena, const SpanEventRef& other)
: name(arena, other.name), time(other.time), attributes(arena, other.attributes) {}
StringRef name;
double time = 0.0;
SmallVectorRef<KeyValueRef> attributes;
};
class OTELSpan {
class Span {
public:
OTELSpan(const SpanContext& context,
const Location& location,
const SpanContext& parentContext,
const std::initializer_list<SpanContext>& links = {})
// Construct a Span with a given context, location, parentContext and optional links.
//
// N.B. While this constructor receives a parentContext it does not overwrite the traceId of the Span's context.
// Therefore it is the responsibility of the caller to ensure the traceID and m_Flags of both the context and
// parentContext are identical if the caller wishes to establish a parent/child relationship between these spans. We
// do this to avoid needless comparisons or copies as this constructor is only called once in NativeAPI.actor.cpp
// and from below in the by the Span(location, parent, links) constructor. The Span(location, parent, links)
// constructor is used broadly and performs the copy of the parent's traceID and m_Flags.
Span(const SpanContext& context,
const Location& location,
const SpanContext& parentContext,
const std::initializer_list<SpanContext>& links = {})
: context(context), location(location), parentContext(parentContext), links(arena, links.begin(), links.end()),
begin(g_network->now()) {
// We've simplified the logic here, essentially we're now always setting trace and span ids and relying on the
// TraceFlags to determine if we're sampling. Therefore if the parent is sampled, we simply overwrite this
// span's traceID with the parent trace id.
if (parentContext.isSampled()) {
this->context.traceID = UID(parentContext.traceID.first(), parentContext.traceID.second());
this->context.m_Flags = TraceFlags::sampled;
} else {
// However there are two other cases.
// 1. A legitamite parent span exists but it was not selected for tracing.
// 2. There is no actual parent, just a default arg parent provided by the constructor AND the "child" span
// was selected for sampling. For case 1. we handle below by marking the child as unsampled. For case 2 we
// needn't do anything, and can rely on the values in this OTELSpan
if (parentContext.traceID.first() != 0 && parentContext.traceID.second() != 0 &&
parentContext.spanID != 0) {
this->context.m_Flags = TraceFlags::unsampled;
}
}
this->kind = SpanKind::SERVER;
this->status = SpanStatus::OK;
this->attributes.push_back(
this->arena, KeyValueRef("address"_sr, StringRef(this->arena, g_network->getLocalAddress().toString())));
// this->arena, KeyValueRef("address"_sr, StringRef(this->arena, "localhost:4000")));
this->arena,
KeyValueRef("address"_sr, StringRef(this->arena, FlowTransport::transport().getLocalAddressAsString())));
}
OTELSpan(const Location& location,
const SpanContext& parent = SpanContext(),
const std::initializer_list<SpanContext>& links = {})
: OTELSpan(
SpanContext(UID(deterministicRandom()->randomUInt64(), deterministicRandom()->randomUInt64()), // traceID
deterministicRandom()->randomUInt64(), // spanID
deterministicRandom()->random01() < FLOW_KNOBS->TRACING_SAMPLE_RATE // sampled or unsampled
? TraceFlags::sampled
: TraceFlags::unsampled),
location,
parent,
links) {}
// Construct Span with a location, parent, and optional links.
// This constructor copies the parent's traceID creating a parent->child relationship between Spans.
// Additionally we inherit the m_Flags of the parent, thus enabling or disabling sampling to match the parent.
Span(const Location& location, const SpanContext& parent, const std::initializer_list<SpanContext>& links = {})
: Span(SpanContext(parent.traceID, deterministicRandom()->randomUInt64(), parent.m_Flags),
location,
parent,
links) {}
OTELSpan(const Location& location, const SpanContext parent, const SpanContext& link)
: OTELSpan(location, parent, { link }) {}
// Construct Span without parent. Used for creating a root span, or when the parent is not known at construction
// time.
Span(const SpanContext& context, const Location& location) : Span(context, location, SpanContext()) {}
// NOTE: This constructor is primarly for unit testing until we sort out how to enable/disable a Knob dynamically in
// a test.
OTELSpan(const Location& location,
const std::function<double()>& rateProvider,
const SpanContext& parent = SpanContext(),
const std::initializer_list<SpanContext>& links = {})
: OTELSpan(SpanContext(UID(deterministicRandom()->randomUInt64(), deterministicRandom()->randomUInt64()),
deterministicRandom()->randomUInt64(),
deterministicRandom()->random01() < rateProvider() ? TraceFlags::sampled
: TraceFlags::unsampled),
location,
parent,
links) {}
// We've determined for initial tracing release, spans with only a location will not be traced.
// Generally these are for background processes, some are called infrequently, while others may be high volume.
// TODO: review and address in subsequent PRs.
Span(const Location& location) : location(location), begin(g_network->now()) {}
OTELSpan(const OTELSpan&) = delete;
OTELSpan(OTELSpan&& o) {
Span(const Span&) = delete;
Span(Span&& o) {
arena = std::move(o.arena);
context = o.context;
location = o.location;
@ -239,11 +174,11 @@ public:
o.end = 0.0;
o.status = SpanStatus::UNSET;
}
OTELSpan() {}
~OTELSpan();
OTELSpan& operator=(OTELSpan&& o);
OTELSpan& operator=(const OTELSpan&) = delete;
void swap(OTELSpan& other) {
Span() {}
~Span();
Span& operator=(Span&& o);
Span& operator=(const Span&) = delete;
void swap(Span& other) {
std::swap(arena, other.arena);
std::swap(context, other.context);
std::swap(location, other.location);
@ -256,34 +191,53 @@ public:
std::swap(events, other.events);
}
OTELSpan& addLink(const SpanContext& linkContext) {
Span& addLink(const SpanContext& linkContext) {
links.push_back(arena, linkContext);
return *this;
}
OTELSpan& addLinks(const std::initializer_list<SpanContext>& linkContexts = {}) {
for (auto const& sc : linkContexts) {
links.push_back(arena, sc);
// Check if link is sampled, if so sample this span.
if (!context.isSampled() && linkContext.isSampled()) {
context.m_Flags = TraceFlags::sampled;
// If for some reason this span isn't valid, we need to give it a
// traceID and spanID. This case is currently hit in CommitProxyServer
// CommitBatchContext::CommitBatchContext and CommitBatchContext::setupTraceBatch.
if (!context.isValid()) {
context.traceID = deterministicRandom()->randomUniqueID();
context.spanID = deterministicRandom()->randomUInt64();
}
}
return *this;
}
OTELSpan& addEvent(const OTELEventRef& event) {
Span& addLinks(const std::initializer_list<SpanContext>& linkContexts = {}) {
for (auto const& sc : linkContexts) {
addLink(sc);
}
return *this;
}
Span& addEvent(const SpanEventRef& event) {
events.push_back_deep(arena, event);
return *this;
}
OTELSpan& addEvent(const StringRef& name,
const double& time,
const SmallVectorRef<KeyValueRef>& attrs = SmallVectorRef<KeyValueRef>()) {
return addEvent(OTELEventRef(name, time, attrs));
Span& addEvent(const StringRef& name,
const double& time,
const SmallVectorRef<KeyValueRef>& attrs = SmallVectorRef<KeyValueRef>()) {
return addEvent(SpanEventRef(name, time, attrs));
}
OTELSpan& addAttribute(const StringRef& key, const StringRef& value) {
Span& addAttribute(const StringRef& key, const StringRef& value) {
attributes.push_back_deep(arena, KeyValueRef(key, value));
return *this;
}
Span& setParent(const SpanContext& parent) {
parentContext = parent;
context.traceID = parent.traceID;
context.spanID = deterministicRandom()->randomUInt64();
context.m_Flags = parent.m_Flags;
return *this;
}
Arena arena;
SpanContext context;
Location location;
@ -292,7 +246,7 @@ public:
SmallVectorRef<SpanContext> links;
double begin = 0.0, end = 0.0;
SmallVectorRef<KeyValueRef> attributes; // not necessarily sorted
SmallVectorRef<OTELEventRef> events;
SmallVectorRef<SpanEventRef> events;
SpanStatus status;
};
@ -311,7 +265,6 @@ struct ITracer {
virtual TracerType type() const = 0;
// passed ownership to the tracer
virtual void trace(Span const& span) = 0;
virtual void trace(OTELSpan const& span) = 0;
};
void openTracer(TracerType type);
@ -328,16 +281,3 @@ struct SpannedDeque : Deque<T> {
span = std::move(other.span);
}
};
template <class T>
struct OTELSpannedDeque : Deque<T> {
OTELSpan span;
explicit OTELSpannedDeque(Location loc) : span(loc) {}
OTELSpannedDeque(OTELSpannedDeque&& other) : Deque<T>(std::move(other)), span(std::move(other.span)) {}
OTELSpannedDeque(OTELSpannedDeque const&) = delete;
OTELSpannedDeque& operator=(OTELSpannedDeque const&) = delete;
OTELSpannedDeque& operator=(OTELSpannedDeque&& other) {
*static_cast<Deque<T>*>(this) = std::move(other);
span = std::move(other.span);
}
};

View File

@ -518,10 +518,16 @@ class Server(BaseHTTPRequestHandler):
return
if self.path.startswith("/check_hash/"):
try:
self.send_text(check_hash(self.path[12:]), add_newline=False)
self.send_text(check_hash(os.path.basename(self.path)), add_newline=False)
except FileNotFoundError:
self.send_error(404, "Path not found")
self.end_headers()
if self.path.startswith("/is_present/"):
if is_present(os.path.basename(self.path))):
self.send_text("OK")
else:
self.send_error(404, "Path not found")
self.end_headers()
elif self.path == "/ready":
self.send_text(ready())
elif self.path == "/substitutions":
@ -599,6 +605,10 @@ def check_hash(filename):
return m.hexdigest()
def is_present(filename):
return os.path.exists(os.path.join(Config.shared().output_dir, filename))
def copy_files():
config = Config.shared()
if config.require_not_empty: