Merge remote-tracking branch 'origin/master' into features/udp

This commit is contained in:
Markus Pilman 2020-08-11 15:35:52 -06:00
commit 4782c7c4b1
17 changed files with 1403 additions and 709 deletions

View File

@ -0,0 +1,52 @@
# fdbcstat
`fdbcstat` is a FoundationDB client monitoring tool which collects and displays transaction operation statistics inside the C API library (`libfdb_c.so`).
## How it works
`fdbcstat` utilizes [eBPF/bcc](https://github.com/iovisor/bcc) to attach to `libfdb_c.so` shared library and insert special instructions to collect statistics in several common `fdb_transaction_*` calls, then it periodically displays the aggregated statistics.
## How to use
### Syntax
`fdbcstat <full path to libfdb_c.so> <options...>`
### Options
- `-p` or `--pid` : Only capture statistics for the functions called by the specified process
- `-i` or `--interval` : Specify the time interval in seconds between 2 outputs (Default: 1)
- `-d` or `--duration` : Specify the total duration in seconds `fdbcstat` will run (Default: Unset / Forever)
- `-f` or `--functions` : Specify the comma-separated list of functions to monitor (Default: Unset / All supported functions)
### Supported Functions
- get
- get_range
- get_read_version
- set
- clear
- clear_range
- commit
### Examples
##### Collect all statistics and display every second
`fdbcstat /usr/lib64/libfdb_c.so`
##### Collect all statistics for PID 12345 for 60 seconds with 10 second interval
`fdbcstat /usr/lib64/libfdb_c.so -p 12345 -d 60 -i 10`
##### Collect statistics only for get and commit
`fdbcstat /usr/lib64/libfdb_c.so -f get,commit`
## Output Format
Each line contains multiple fields. The first field is the timestamp. Other fields are the statistics for each operation. Each operation field contains the following statistics in a slash (/) separated format.
- Function
- Number of calls per second
- Average latency in microseconds (us)
- Maximum latency in microseconds (us)
**Note**: The latency is computed as the time difference between the start time and the end time of the `fdb_transaction_*` function call except for `get`, `get_range`, `get_read_version` and `commit`. For those 4 functions, the latency is the time difference between the start time of the function and the end time of the following `fdb_future_block_until_ready` call.
## Sample Output
```
...
15:05:31 clear/22426/2/34 commit/18290/859/15977 get/56230/1110/12748 get_range/14141/23/75 set/6276/3/19
15:05:41 clear/24147/2/38 commit/18259/894/44259 get/57978/1098/15636 get_range/13171/23/90 set/6564/3/15
15:05:51 clear/21287/2/34 commit/18386/876/17824 get/58318/1106/30539 get_range/13018/23/68 set/6559/3/13
...
```

304
contrib/fdbcstat/fdbcstat Executable file
View File

@ -0,0 +1,304 @@
#!/usr/bin/env python
from __future__ import print_function
from bcc import BPF
from time import sleep, strftime, time
import argparse
import signal
description = """The fdbcstat utility displays FDB C API statistics on terminal
that include calls-per-second, average latency and maximum latency
within the given time interval.
Each field in the output represents the following elements
in a slash-separated format:
- Operation type
- Number of calls per second
- Average latency in microseconds (us)
- Maximum latency in microseconds (us)
"""
# supported APIs
# note: the array index is important here.
# it's used in BPF as the function identifier.
# 0: get
# 1: get_range
# 2: get_read_version
# 3: set
# 4: clear
# 5: clear_range
# 6: commit
fdbfuncs = [
{ "name":"get", "waitfuture":True, "enabled":True },
{ "name":"get_range", "waitfuture":True, "enabled":True },
{ "name":"get_read_version", "waitfuture":True, "enabled":True },
{ "name":"set", "waitfuture":False, "enabled":True },
{ "name":"clear", "waitfuture":False, "enabled":True },
{ "name":"clear_range", "waitfuture":False, "enabled":True },
{ "name":"commit", "waitfuture":True, "enabled":True }
]
# arguments
parser = argparse.ArgumentParser(
description="FoundationDB client statistics collector",
formatter_class=argparse.RawTextHelpFormatter,
epilog=description)
parser.add_argument("-p", "--pid", type=int,
help="Capture for this PID only")
parser.add_argument("-i", "--interval", type=int,
help="Print interval in seconds (Default: 1 second)")
parser.add_argument("-d", "--duration", type=int,
help="Duration in seconds (Default: unset)")
parser.add_argument("-f", "--functions", type=str,
help='''Capture for specific functions (comma-separated) (Default: unset)
Supported functions: get, get_range, get_read_version,
set, clear, clear_range, commit''')
parser.add_argument("libpath",
help="Full path to libfdb_c.so")
args = parser.parse_args()
# fall back to a one-second print interval when none was given
if not args.interval:
    args.interval = 1

if args.functions:
    # enable exactly the functions named on the command line and disable
    # the rest; names that match no supported function are ignored
    requested = args.functions.split(',')
    for entry in fdbfuncs:
        entry['enabled'] = entry['name'] in requested
# check for libfdb_c.so
libpath = BPF.find_library(args.libpath) or BPF.find_exe(args.libpath)
if libpath is None:
print("Error: Can't find %s" % args.libpath)
exit(1)
# main BPF program
# we do not rely on PT_REGS_IP() and BPF.sym() to retrieve the symbol name
# because some "backward-compatible" symbols do not get resolved through BPF.sym().
bpf_text = """
#include <uapi/linux/ptrace.h>
typedef struct _stats_key_t {
u32 pid;
u32 func;
} stats_key_t;
typedef struct _stats_val_t {
u64 cnt;
u64 total;
u64 max;
} stats_val_t;
BPF_HASH(starttime, u32, u64);
BPF_HASH(startfunc, u32, u32);
BPF_HASH(stats, stats_key_t, stats_val_t);
static int trace_common_entry(struct pt_regs *ctx, u32 func)
{
u64 pid_tgid = bpf_get_current_pid_tgid();
u32 pid = pid_tgid; /* lower 32-bit = Process ID (Thread ID) */
u32 tgid = pid_tgid >> 32; /* upper 32-bit = Thread Group ID (Process ID) */
/* if PID is specified, we'll filter by tgid here */
FILTERPID
/* start time in ns */
u64 ts = bpf_ktime_get_ns();
/* function type */
u32 f = func;
startfunc.update(&pid, &f);
/* update start time */
starttime.update(&pid, &ts);
return 0;
}
int trace_get_entry(struct pt_regs *ctx)
{
return trace_common_entry(ctx, 0);
}
int trace_get_range_entry(struct pt_regs *ctx)
{
return trace_common_entry(ctx, 1);
}
int trace_get_read_version_entry(struct pt_regs *ctx)
{
return trace_common_entry(ctx, 2);
}
int trace_set_entry(struct pt_regs *ctx)
{
return trace_common_entry(ctx, 3);
}
int trace_clear_entry(struct pt_regs *ctx)
{
return trace_common_entry(ctx, 4);
}
int trace_clear_range_entry(struct pt_regs *ctx)
{
return trace_common_entry(ctx, 5);
}
int trace_commit_entry(struct pt_regs *ctx)
{
return trace_common_entry(ctx, 6);
}
int trace_func_return(struct pt_regs *ctx)
{
u64 *st; /* start time */
u64 duration;
u64 pid_tgid = bpf_get_current_pid_tgid();
u32 pid = pid_tgid;
u32 tgid = pid_tgid >> 32;
/* if PID is specified, we'll filter by tgid here */
FILTERPID
/* calculate duration in ns */
st = starttime.lookup(&pid);
if (!st || st == 0) {
return 0; /* missed start */
}
/* duration in ns */
duration = bpf_ktime_get_ns() - *st;
starttime.delete(&pid);
/* update stats */
u32 func, *funcp = startfunc.lookup(&pid);
if (funcp) {
func = *funcp;
stats_key_t key;
stats_val_t *prev;
stats_val_t cur;
key.pid = pid; /* pid here is the thread ID in user space */
key.func = func;
prev = stats.lookup(&key);
if (prev) {
cur.cnt = prev->cnt + 1;
cur.total = prev->total + duration;
cur.max = (duration > prev->max) ? duration : prev->max;
stats.update(&key, &cur);
} else {
cur.cnt = 1;
cur.total = duration;
cur.max = duration;
stats.insert(&key, &cur);
}
startfunc.delete(&pid);
}
return 0;
}
"""
# If PID is specified, insert the PID filter
# the FILTERPID placeholder becomes a tgid check when a PID was given,
# and is removed from the BPF program otherwise
pid_filter = 'if (tgid != %d) { return 0; }' % args.pid if args.pid else ''
bpf_text = bpf_text.replace('FILTERPID', pid_filter)
# signal handler
def signal_ignore(signal, frame):
pass
# load BPF program
b = BPF(text=bpf_text)

# attach probes for every enabled function
probe_pid = args.pid or -1
enabled_funcs = [f for f in fdbfuncs if f['enabled']]
for f in enabled_funcs:
    symbol = 'fdb_transaction_' + f['name']
    # entry probe records the start time and the function identifier
    b.attach_uprobe(name=libpath, sym=symbol,
                    fn_name='trace_' + f['name'] + '_entry', pid=probe_pid)
    # functions that do not wait on a future are timed up to their own return
    if not f['waitfuture']:
        b.attach_uretprobe(name=libpath, sym=symbol,
                           fn_name="trace_func_return", pid=probe_pid)

# functions that wait on a future are timed up to the return of
# fdb_future_block_until_ready; one return probe serves all of them
waitfuture = any(f['waitfuture'] for f in enabled_funcs)
if waitfuture:
    b.attach_uretprobe(name=libpath, sym='fdb_future_block_until_ready',
                       fn_name="trace_func_return", pid=probe_pid)

# open uprobes
matched = b.num_open_uprobes()
if matched == 0:
    print("0 functions matched... Exiting.")
    exit()
stats = b.get_table("stats")
# aggregated stats dictionary
agg = {}
exiting = 0
seconds = 0
prev = 0.0
now = 0.0
# main loop
while (1):
    # Each pass: wait one interval, merge the per-thread BPF stats by
    # function, print one line, then reset the table for the next interval.
    try:
        sleep(args.interval)
        seconds += args.interval
        prev = now
        now = time()
        if prev == 0:
            # the first wake-up only establishes the time baseline
            stats.clear()
            continue
    except KeyboardInterrupt:
        exiting = 1
        signal.signal(signal.SIGINT, signal_ignore)

    if args.duration and seconds >= args.duration:
        exiting = 1

    # walk through the stats and aggregate by the functions
    for k, v in stats.items():
        f = fdbfuncs[k.func]['name']
        if f in agg:
            # update an existing entry
            agg[f]['cnt'] += v.cnt
            agg[f]['total'] += v.total
            # keep the largest latency seen by any thread; the call count
            # must not leak into the max-latency field
            if v.max > agg[f]['max']:
                agg[f]['max'] = v.max
        else:
            # insert a new entry
            agg[f] = {'cnt': v.cnt, 'total': v.total, 'max': v.max}

    # elapsed is 0 when interrupted before the baseline was taken;
    # report a rate of 0 in that case instead of dividing by zero
    elapsed = now - prev

    # print out aggregated stats
    print("%-8s " % (strftime("%H:%M:%S")), end="", flush=True)
    for f in sorted(agg):
        calls_per_sec = agg[f]['cnt'] / elapsed if elapsed > 0 else 0
        print("%s/%d/%d/%d " % (f,
                                calls_per_sec,
                                agg[f]['total'] / agg[f]['cnt'] / 1000,  # us
                                agg[f]['max'] / 1000),                   # us
              end="")
    print()

    stats.clear()
    agg.clear()

    if exiting:
        exit()

View File

@ -581,6 +581,11 @@ void initHelp() {
"view and control throttled tags",
"Use `on' and `off' to manually throttle or unthrottle tags. Use `enable auto' or `disable auto' to enable or disable automatic tag throttling. Use `list' to print the list of throttled tags.\n"
);
helpMap["cache_range"] = CommandHelp(
"cache_range <set|clear> <BEGINKEY> <ENDKEY>",
"Mark a key range to add to or remove from storage caches.",
"Use the storage caches to assist in balancing hot read shards. Set the appropriate ranges when experiencing heavy load, and clear them when they are no longer necessary."
);
helpMap["lock"] = CommandHelp(
"lock",
"lock the database with a randomly generated lockUID",
@ -4316,6 +4321,24 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
}
continue;
}
if (tokencmp(tokens[0], "cache_range")) {
if (tokens.size() != 4) {
printUsage(tokens[0]);
is_error = true;
continue;
}
KeyRangeRef cacheRange(tokens[2], tokens[3]);
if (tokencmp(tokens[1], "set")) {
wait(makeInterruptable(addCachedRange(db, cacheRange)));
} else if (tokencmp(tokens[1], "clear")) {
wait(makeInterruptable(removeCachedRange(db, cacheRange)));
} else {
printUsage(tokens[0]);
is_error = true;
}
continue;
}
printf("ERROR: Unknown command `%s'. Try `help'?\n", formatStringRef(tokens[0]).c_str());
is_error = true;

View File

@ -50,7 +50,6 @@ struct MasterProxyInterface {
RequestStream<ReplyPromise<Void>> waitFailure;
RequestStream< struct GetRawCommittedVersionRequest > getRawCommittedVersion;
RequestStream< struct TxnStateRequest > txnState;
RequestStream< struct GetHealthMetricsRequest > getHealthMetrics;
RequestStream< struct ProxySnapRequest > proxySnapReq;
@ -71,12 +70,11 @@ struct MasterProxyInterface {
getKeyServersLocations = RequestStream< struct GetKeyServerLocationsRequest >( commit.getEndpoint().getAdjustedEndpoint(2) );
getStorageServerRejoinInfo = RequestStream< struct GetStorageServerRejoinInfoRequest >( commit.getEndpoint().getAdjustedEndpoint(3) );
waitFailure = RequestStream<ReplyPromise<Void>>( commit.getEndpoint().getAdjustedEndpoint(4) );
getRawCommittedVersion = RequestStream< struct GetRawCommittedVersionRequest >( commit.getEndpoint().getAdjustedEndpoint(5) );
txnState = RequestStream< struct TxnStateRequest >( commit.getEndpoint().getAdjustedEndpoint(6) );
getHealthMetrics = RequestStream< struct GetHealthMetricsRequest >( commit.getEndpoint().getAdjustedEndpoint(7) );
proxySnapReq = RequestStream< struct ProxySnapRequest >( commit.getEndpoint().getAdjustedEndpoint(8) );
exclusionSafetyCheckReq = RequestStream< struct ExclusionSafetyCheckRequest >( commit.getEndpoint().getAdjustedEndpoint(9) );
getDDMetrics = RequestStream< struct GetDDMetricsRequest >( commit.getEndpoint().getAdjustedEndpoint(10) );
txnState = RequestStream< struct TxnStateRequest >( commit.getEndpoint().getAdjustedEndpoint(5) );
getHealthMetrics = RequestStream< struct GetHealthMetricsRequest >( commit.getEndpoint().getAdjustedEndpoint(6) );
proxySnapReq = RequestStream< struct ProxySnapRequest >( commit.getEndpoint().getAdjustedEndpoint(7) );
exclusionSafetyCheckReq = RequestStream< struct ExclusionSafetyCheckRequest >( commit.getEndpoint().getAdjustedEndpoint(8) );
getDDMetrics = RequestStream< struct GetDDMetricsRequest >( commit.getEndpoint().getAdjustedEndpoint(9) );
}
}
@ -87,7 +85,6 @@ struct MasterProxyInterface {
streams.push_back(getKeyServersLocations.getReceiver(TaskPriority::ReadSocket)); //priority lowered to TaskPriority::DefaultEndpoint on the proxy
streams.push_back(getStorageServerRejoinInfo.getReceiver(TaskPriority::ProxyStorageRejoin));
streams.push_back(waitFailure.getReceiver());
streams.push_back(getRawCommittedVersion.getReceiver(TaskPriority::ProxyGetRawCommittedVersion));
streams.push_back(txnState.getReceiver());
streams.push_back(getHealthMetrics.getReceiver());
streams.push_back(proxySnapReq.getReceiver());

View File

@ -3888,11 +3888,11 @@ ACTOR Future<Void> readVersionBatcher( DatabaseContext *cx, FutureStream<Databas
}
}
ACTOR Future<Version> extractReadVersion(SpanID parentSpan, DatabaseContext* cx, TransactionPriority priority,
ACTOR Future<Version> extractReadVersion(Location location, SpanID spanContext, SpanID parent, DatabaseContext* cx, TransactionPriority priority,
Reference<TransactionLogInfo> trLogInfo, Future<GetReadVersionReply> f,
bool lockAware, double startTime, Promise<Optional<Value>> metadataVersion,
TagSet tags) {
// parentSpan here is only used to keep the parent alive until the request completes
state Span span(spanContext, location, { parent });
GetReadVersionReply rep = wait(f);
double latency = now() - startTime;
cx->GRVLatencies.addSample(latency);
@ -4014,12 +4014,13 @@ Future<Version> Transaction::getReadVersion(uint32_t flags) {
batcher.actor = readVersionBatcher( cx.getPtr(), batcher.stream.getFuture(), options.priority, flags );
}
Span span("NAPI:getReadVersion"_loc, info.spanID);
auto const req = DatabaseContext::VersionRequest(span.context, options.tags, info.debugID);
Location location = "NAPI:getReadVersion"_loc;
UID spanContext = deterministicRandom()->randomUniqueID();
auto const req = DatabaseContext::VersionRequest(spanContext, options.tags, info.debugID);
batcher.stream.send(req);
startTime = now();
readVersion = extractReadVersion(span.context, cx.getPtr(), options.priority, trLogInfo, req.reply.getFuture(),
options.lockAware, startTime, metadataVersion, options.tags);
readVersion = extractReadVersion(location, spanContext, info.spanID, cx.getPtr(), options.priority, trLogInfo,
req.reply.getFuture(), options.lockAware, startTime, metadataVersion, options.tags);
}
return readVersion;
}

View File

@ -1391,7 +1391,7 @@ Future< Standalone<VectorRef<const char*> >> ReadYourWritesTransaction::getAddre
return result;
}
Future<int64_t> ReadYourWritesTransaction::getEstimatedRangeSizeBytes(const KeyRangeRef& keys) {
Future<int64_t> ReadYourWritesTransaction::getEstimatedRangeSizeBytes(const KeyRange& keys) {
if(checkUsedDuringCommit()) {
throw used_during_commit();
}

View File

@ -86,7 +86,7 @@ public:
}
[[nodiscard]] Future<Standalone<VectorRef<const char*>>> getAddressesForKey(const Key& key);
Future<int64_t> getEstimatedRangeSizeBytes( const KeyRangeRef& keys );
Future<int64_t> getEstimatedRangeSizeBytes(const KeyRange& keys);
void addReadConflictRange( KeyRangeRef const& keys );
void makeSelfConflicting() { tr.makeSelfConflicting(); }

View File

@ -45,9 +45,14 @@ Reference<StorageInfo> getStorageInfo(UID id, std::map<UID, Reference<StorageInf
// It is incredibly important that any modifications to txnStateStore are done in such a way that
// the same operations will be done on all proxies at the same time. Otherwise, the data stored in
// txnStateStore will become corrupted.
void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRef<MutationRef> const& mutations, IKeyValueStore* txnStateStore, LogPushData* toCommit, bool *confChange, Reference<ILogSystem> logSystem, Version popVersion,
KeyRangeMap<std::set<Key> >* vecBackupKeys, KeyRangeMap<ServerCacheInfo>* keyInfo, KeyRangeMap<bool>* cacheInfo, std::map<Key, applyMutationsData>* uid_applyMutationsData, RequestStream<CommitTransactionRequest> commit,
Database cx, NotifiedVersion* commitVersion, std::map<UID, Reference<StorageInfo>>* storageCache, std::map<Tag, Version>* tag_popped, bool initialCommit ) {
void applyMetadataMutations(UID const& dbgid, Arena& arena, VectorRef<MutationRef> const& mutations,
IKeyValueStore* txnStateStore, LogPushData* toCommit, bool& confChange,
Reference<ILogSystem> logSystem, Version popVersion,
KeyRangeMap<std::set<Key>>* vecBackupKeys, KeyRangeMap<ServerCacheInfo>* keyInfo,
KeyRangeMap<bool>* cacheInfo, std::map<Key, ApplyMutationsData>* uid_applyMutationsData,
RequestStream<CommitTransactionRequest> commit, Database cx, NotifiedVersion* commitVersion,
std::map<UID, Reference<StorageInfo>>* storageCache, std::map<Tag, Version>* tag_popped,
bool initialCommit) {
//std::map<keyRef, vector<uint16_t>> cacheRangeInfo;
std::map<KeyRef, MutationRef> cachedRangeInfo;
for (auto const& m : mutations) {
@ -175,7 +180,7 @@ void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRef<MutationRe
.detail("M", m.toString())
.detail("PrevValue", t.present() ? t.get() : LiteralStringRef("(none)"))
.detail("ToCommit", toCommit!=nullptr);
if(confChange) *confChange = true;
confChange = true;
}
}
if(!initialCommit) txnStateStore->set(KeyValueRef(m.param1, m.param2));
@ -293,7 +298,7 @@ void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRef<MutationRe
Version requested = BinaryReader::fromStringRef<Version>(m.param2, Unversioned());
TraceEvent("MinRequiredCommitVersion", dbgid).detail("Min", requested).detail("Current", popVersion).detail("HasConf", !!confChange);
if(!initialCommit) txnStateStore->set(KeyValueRef(m.param1, m.param2));
if (confChange) *confChange = true;
confChange = true;
TEST(true); // Recovering at a higher version.
}
}
@ -313,7 +318,7 @@ void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRef<MutationRe
if(!initialCommit) txnStateStore->clear(range & configKeys);
if(!excludedServersKeys.contains(range) && !failedServersKeys.contains(range)) {
TraceEvent("MutationRequiresRestart", dbgid).detail("M", m.toString());
if(confChange) *confChange = true;
confChange = true;
}
}
if ( serverListKeys.intersects( range )) {
@ -329,11 +334,14 @@ void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRef<MutationRe
auto serverKeysCleared = txnStateStore->readRange( range & serverTagKeys ).get(); // read is expected to be immediately available
for(auto &kv : serverKeysCleared) {
Tag tag = decodeServerTagValue(kv.value);
TraceEvent("ServerTagRemove").detail("PopVersion", popVersion).detail("Tag", tag.toString()).detail("Server", decodeServerTagKey(kv.key));
logSystem->pop( popVersion, decodeServerTagValue(kv.value) );
TraceEvent("ServerTagRemove")
.detail("PopVersion", popVersion)
.detail("Tag", tag.toString())
.detail("Server", decodeServerTagKey(kv.key));
logSystem->pop(popVersion, decodeServerTagValue(kv.value));
(*tag_popped)[tag] = popVersion;
if(toCommit) {
if (toCommit) {
MutationRef privatized = m;
privatized.param1 = kv.key.withPrefix(systemKeys.begin, arena);
privatized.param2 = keyAfter(kv.key, arena).withPrefix(systemKeys.begin, arena);
@ -536,3 +544,31 @@ void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRef<MutationRe
}
}
}
// Convenience overload: forwards to the full applyMetadataMutations() using the state held in proxyCommitData.
void applyMetadataMutations(ProxyCommitData& proxyCommitData, Arena& arena, Reference<ILogSystem> logSystem,
                            const VectorRef<MutationRef>& mutations, LogPushData* toCommit, bool& confChange,
                            Version popVersion, bool initialCommit) {
	// Only the first proxy passes its uid_applyMutationsData map along; every other proxy passes nullptr.
	std::map<Key, ApplyMutationsData>* const uid_applyMutationsData =
	    proxyCommitData.firstProxy ? &proxyCommitData.uid_applyMutationsData : nullptr;

	applyMetadataMutations(proxyCommitData.dbgid,
	                       arena,
	                       mutations,
	                       proxyCommitData.txnStateStore,
	                       toCommit,
	                       confChange,
	                       logSystem,
	                       popVersion,
	                       &proxyCommitData.vecBackupKeys,
	                       &proxyCommitData.keyInfo,
	                       &proxyCommitData.cacheInfo,
	                       uid_applyMutationsData,
	                       proxyCommitData.commit,
	                       proxyCommitData.cx,
	                       &proxyCommitData.committedVersion,
	                       &proxyCommitData.storageCache,
	                       &proxyCommitData.tag_popped,
	                       initialCommit);
}
// Minimal overload: applies the mutations with only a txnStateStore. No commit data, log system or caches are
// supplied; every optional pointer is passed as nullptr and initialCommit is false.
void applyMetadataMutations(const UID& dbgid, Arena& arena, const VectorRef<MutationRef>& mutations,
                            IKeyValueStore* txnStateStore) {
	// The caller has no use for the result, but the full overload takes this by reference and reads it
	// (it is logged in a trace event), so it must not be left uninitialized.
	bool confChange = false;

	applyMetadataMutations(dbgid, arena, mutations, txnStateStore, /* toCommit= */ nullptr, confChange,
	                       Reference<ILogSystem>(), /* popVersion= */ 0, /* vecBackupKeys= */ nullptr,
	                       /* keyInfo= */ nullptr, /* cacheInfo= */ nullptr, /* uid_applyMutationsData= */ nullptr,
	                       RequestStream<CommitTransactionRequest>(), Database(), /* commitVersion= */ nullptr,
	                       /* storageCache= */ nullptr, /* tag_popped= */ nullptr, /* initialCommit= */ false);
}

View File

@ -29,6 +29,7 @@
#include "fdbserver/IKeyValueStore.h"
#include "fdbserver/LogSystem.h"
#include "fdbserver/LogProtocolMessage.h"
#include "fdbserver/ProxyCommitData.actor.h"
inline bool isMetadataMutation(MutationRef const& m) {
// FIXME: This is conservative - not everything in system keyspace is necessarily processed by applyMetadataMutations
@ -36,16 +37,12 @@ inline bool isMetadataMutation(MutationRef const& m) {
(m.type == MutationRef::ClearRange && m.param2.size() && m.param2[0] == systemKeys.begin[0] && !nonMetadataSystemKeys.contains(KeyRangeRef(m.param1, m.param2)) );
}
struct applyMutationsData {
Future<Void> worker;
Version endVersion;
Reference<KeyRangeMap<Version>> keyVersion;
};
Reference<StorageInfo> getStorageInfo(UID id, std::map<UID, Reference<StorageInfo>>* storageCache, IKeyValueStore* txnStateStore);
void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRef<MutationRef> const& mutations, IKeyValueStore* txnStateStore, LogPushData* toCommit, bool *confChange, Reference<ILogSystem> logSystem = Reference<ILogSystem>(), Version popVersion = 0,
KeyRangeMap<std::set<Key> >* vecBackupKeys = nullptr, KeyRangeMap<ServerCacheInfo>* keyInfo = nullptr, KeyRangeMap<bool>* cacheInfo = nullptr, std::map<Key, applyMutationsData>* uid_applyMutationsData = nullptr, RequestStream<CommitTransactionRequest> commit = RequestStream<CommitTransactionRequest>(),
Database cx = Database(), NotifiedVersion* commitVersion = nullptr, std::map<UID, Reference<StorageInfo>>* storageCache = nullptr, std::map<Tag, Version>* tag_popped = nullptr, bool initialCommit = false );
void applyMetadataMutations(ProxyCommitData& proxyCommitData, Arena& arena, Reference<ILogSystem> logSystem,
const VectorRef<MutationRef>& mutations, LogPushData* pToCommit, bool& confChange,
Version popVersion, bool initialCommit);
void applyMetadataMutations(const UID& dbgid, Arena& arena, const VectorRef<MutationRef>& mutations,
IKeyValueStore* txnStateStore);
#endif

View File

@ -57,6 +57,7 @@ set(FDBSERVER_SRCS
OldTLogServer_6_0.actor.cpp
OldTLogServer_6_2.actor.cpp
Orderer.actor.h
ProxyCommitData.actor.h
pubsub.actor.cpp
pubsub.h
QuietDatabase.actor.cpp

View File

@ -365,7 +365,6 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( PROXY_COMPUTE_BUCKETS, 20000 );
init( PROXY_COMPUTE_GROWTH_RATE, 0.01 );
init( TXN_STATE_SEND_AMOUNT, 4 );
init( ASK_READ_VERSION_FROM_MASTER, true );
// Master Server
// masterCommitter() in the master server will allow lower priority tasks (e.g. DataDistibution)

View File

@ -295,7 +295,6 @@ public:
int PROXY_COMPUTE_BUCKETS;
double PROXY_COMPUTE_GROWTH_RATE;
int TXN_STATE_SEND_AMOUNT;
bool ASK_READ_VERSION_FROM_MASTER;
// Master Server
double COMMIT_SLEEP_TIME;

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,269 @@
/*
* ProxyCommitData.actor.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#if defined(NO_INTELLISENSE) && !defined(FDBSERVER_PROXYCOMMITDATA_ACTOR_G_H)
#define FDBSERVER_PROXYCOMMITDATA_ACTOR_G_H
#include "fdbserver/ProxyCommitData.actor.g.h"
#elif !defined(FDBSERVER_PROXYCOMMITDATA_ACTOR_H)
#define FDBSERVER_PROXYCOMMITDATA_ACTOR_H
#include "fdbclient/FDBTypes.h"
#include "fdbrpc/Stats.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/LogSystemDiskQueueAdapter.h"
#include "flow/IRandom.h"
#include "flow/actorcompiler.h" // This must be the last #include.
// Event payload type; ProxyCommitData holds an EventMetricHandle<SingleKeyMutation> named singleKeyMutationEvent.
// NOTE(review): the field meanings (bounds of a shard plus up to three tags) are inferred from the names only —
// confirm against the code that fills in the event.
DESCR struct SingleKeyMutation {
Standalone<StringRef> shardBegin;
Standalone<StringRef> shardEnd;
int64_t tag1;
int64_t tag2;
int64_t tag3;
};
// Value type of the uid_applyMutationsData maps (std::map<Key, ApplyMutationsData>).
// NOTE(review): endVersion has no initializer here, so it presumably must be set by whoever creates an entry
// before it is read — confirm.
struct ApplyMutationsData {
Future<Void> worker;
Version endVersion;
Reference<KeyRangeMap<Version>> keyVersion;
};
// Counters, latency metrics and a sliding window of recent request counts for one proxy.
// Every Counter member is registered with the CounterCollection `cc` (named "ProxyStats") by the constructor.
struct ProxyStats {
CounterCollection cc;
Counter txnRequestIn, txnRequestOut, txnRequestErrors;
Counter txnStartIn, txnStartOut, txnStartBatch;
Counter txnSystemPriorityStartIn, txnSystemPriorityStartOut;
Counter txnBatchPriorityStartIn, txnBatchPriorityStartOut;
Counter txnDefaultPriorityStartIn, txnDefaultPriorityStartOut;
Counter txnCommitIn, txnCommitVersionAssigned, txnCommitResolving, txnCommitResolved, txnCommitOut,
txnCommitOutSuccess, txnCommitErrors;
Counter txnConflicts;
Counter txnThrottled;
Counter commitBatchIn, commitBatchOut;
Counter mutationBytes;
Counter mutations;
Counter conflictRanges;
Counter keyServerLocationIn, keyServerLocationOut, keyServerLocationErrors;
// Exposed through the "LastAssignedCommitVersion" special counter.
Version lastCommitVersionAssigned;
LatencySample commitLatencySample;
LatencySample grvLatencySample;
LatencyBands commitLatencyBands;
LatencyBands grvLatencyBands;
// Holds the future returned by traceCounters() in the constructor.
Future<Void> logger;
// Sliding window of request counts: requestBuckets holds one count per bucket (newest at the back) and
// recentRequests is the running sum of all buckets currently in the window.
int recentRequests;
Deque<int> requestBuckets;
// Start time of the newest bucket and the length of one bucket, on the same clock as now().
double lastBucketBegin;
double bucketInterval;
// Advances the window to the current time: for every full bucketInterval elapsed since lastBucketBegin, the
// oldest bucket is dropped (its count is subtracted from recentRequests) and an empty bucket is appended.
void updateRequestBuckets() {
while (now() - lastBucketBegin > bucketInterval) {
lastBucketBegin += bucketInterval;
recentRequests -= requestBuckets.front();
requestBuckets.pop_front();
requestBuckets.push_back(0);
}
}
// Records one request in the newest bucket after bringing the window up to date.
void addRequest() {
updateRequestBuckets();
++recentRequests;
++requestBuckets.back();
}
// Returns recentRequests scaled by RATE / (RATE - t), where RATE is BASIC_LOAD_BALANCE_UPDATE_RATE and
// t = lastBucketBegin + bucketInterval - now() is the time left in the newest bucket; the result is
// converted to int.
// NOTE(review): the scaling presumably compensates for the newest bucket being only partially elapsed — confirm.
int getRecentRequests() {
updateRequestBuckets();
return recentRequests * FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE /
(FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE - (lastBucketBegin + bucketInterval - now()));
}
// Registers all counters under `cc`, sets up the latency samples and bands, starts the "ProxyMetrics" counter
// logger and fills the request window with BASIC_LOAD_BALANCE_BUCKETS empty buckets.
// pVersion, pCommittedVersion and commitBatchesMemBytesCountPtr are captured by the special-counter lambdas and
// dereferenced when those lambdas run.
// NOTE(review): the pointed-to objects presumably must outlive this ProxyStats — confirm.
// The initializer list is not written in member declaration order; C++ initializes members in declaration order
// regardless, so `cc` (declared first) is constructed before every Counter that is registered with it.
explicit ProxyStats(UID id, Version* pVersion, NotifiedVersion* pCommittedVersion,
int64_t* commitBatchesMemBytesCountPtr)
: cc("ProxyStats", id.toString()), recentRequests(0), lastBucketBegin(now()),
bucketInterval(FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE / FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS),
txnRequestIn("TxnRequestIn", cc), txnRequestOut("TxnRequestOut", cc), txnRequestErrors("TxnRequestErrors", cc),
txnStartIn("TxnStartIn", cc), txnStartOut("TxnStartOut", cc), txnStartBatch("TxnStartBatch", cc),
txnSystemPriorityStartIn("TxnSystemPriorityStartIn", cc),
txnSystemPriorityStartOut("TxnSystemPriorityStartOut", cc),
txnBatchPriorityStartIn("TxnBatchPriorityStartIn", cc),
txnBatchPriorityStartOut("TxnBatchPriorityStartOut", cc),
txnDefaultPriorityStartIn("TxnDefaultPriorityStartIn", cc),
txnDefaultPriorityStartOut("TxnDefaultPriorityStartOut", cc), txnCommitIn("TxnCommitIn", cc),
txnCommitVersionAssigned("TxnCommitVersionAssigned", cc), txnCommitResolving("TxnCommitResolving", cc),
txnCommitResolved("TxnCommitResolved", cc), txnCommitOut("TxnCommitOut", cc),
txnCommitOutSuccess("TxnCommitOutSuccess", cc), txnCommitErrors("TxnCommitErrors", cc),
txnConflicts("TxnConflicts", cc), txnThrottled("TxnThrottled", cc), commitBatchIn("CommitBatchIn", cc),
commitBatchOut("CommitBatchOut", cc), mutationBytes("MutationBytes", cc), mutations("Mutations", cc),
conflictRanges("ConflictRanges", cc), keyServerLocationIn("KeyServerLocationIn", cc),
keyServerLocationOut("KeyServerLocationOut", cc), keyServerLocationErrors("KeyServerLocationErrors", cc),
lastCommitVersionAssigned(0),
commitLatencySample("CommitLatencyMetrics", id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
grvLatencySample("GRVLatencyMetrics", id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
commitLatencyBands("CommitLatencyMetrics", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY),
grvLatencyBands("GRVLatencyMetrics", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY) {
specialCounter(cc, "LastAssignedCommitVersion", [this]() { return this->lastCommitVersionAssigned; });
specialCounter(cc, "Version", [pVersion]() { return *pVersion; });
specialCounter(cc, "CommittedVersion", [pCommittedVersion]() { return pCommittedVersion->get(); });
specialCounter(cc, "CommitBatchesMemBytesCount",
[commitBatchesMemBytesCountPtr]() { return *commitBatchesMemBytesCountPtr; });
logger = traceCounters("ProxyMetrics", id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "ProxyMetrics");
// Start with a full window of empty buckets so that front()/back() are always valid.
for (int i = 0; i < FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS; i++) {
requestBuckets.push_back(0);
}
}
};
struct ProxyCommitData {
UID dbgid;
int64_t commitBatchesMemBytesCount;
ProxyStats stats;
MasterInterface master;
vector<ResolverInterface> resolvers;
LogSystemDiskQueueAdapter* logAdapter;
Reference<ILogSystem> logSystem;
IKeyValueStore* txnStateStore;
NotifiedVersion committedVersion; // Provided that this recovery has succeeded or will succeed, this version is
// fully committed (durable)
Version minKnownCommittedVersion; // No version smaller than this one will be used as the known committed version
// during recovery
Version version; // The version at which txnStateStore is up to date
Promise<Void> validState; // Set once txnStateStore and version are valid
double lastVersionTime;
KeyRangeMap<std::set<Key>> vecBackupKeys;
uint64_t commitVersionRequestNumber;
uint64_t mostRecentProcessedRequestNumber;
KeyRangeMap<Deque<std::pair<Version, int>>> keyResolvers;
KeyRangeMap<ServerCacheInfo> keyInfo;
KeyRangeMap<bool> cacheInfo;
std::map<Key, ApplyMutationsData> uid_applyMutationsData;
bool firstProxy;
double lastCoalesceTime;
bool locked;
Optional<Value> metadataVersion;
double commitBatchInterval;
int64_t localCommitBatchesStarted;
NotifiedVersion latestLocalCommitBatchResolving;
NotifiedVersion latestLocalCommitBatchLogging;
RequestStream<GetReadVersionRequest> getConsistentReadVersion;
RequestStream<CommitTransactionRequest> commit;
Database cx;
Reference<AsyncVar<ServerDBInfo>> db;
EventMetricHandle<SingleKeyMutation> singleKeyMutationEvent;
std::map<UID, Reference<StorageInfo>> storageCache;
std::map<Tag, Version> tag_popped;
Deque<std::pair<Version, Version>> txsPopVersions;
Version lastTxsPop;
bool popRemoteTxs;
vector<Standalone<StringRef>> whitelistedBinPathVec;
Optional<LatencyBandConfig> latencyBandConfig;
double lastStartCommit;
double lastCommitLatency;
int updateCommitRequests = 0;
NotifiedDouble lastCommitTime;
vector<double> commitComputePerOperation;
TransactionTagMap<TransactionCommitCostEstimation> transactionTagCommitCostEst;
// Returns the tags cached for the key range containing `key`, rebuilding the cached vector if it is empty.
//
// The tags related to a storage server rarely change, so we keep a vector of tags for each key range to be slightly
// more CPU efficient. When a tag related to a storage server does change, we empty out all of these vectors to
// signify they must be repopulated. We do not repopulate them immediately to avoid a slow task; the rebuild happens
// here, on the first lookup that finds an empty vector.
const vector<Tag>& tagsForKey(StringRef key) {
	// Fast path: the cached vector for this range is already populated.
	const auto& cachedTags = keyInfo[key].tags;
	if (!cachedTags.empty()) {
		return cachedTags;
	}

	// Slow path: fetch the range entry through rangeContaining() so it can be modified, then rebuild its tag
	// list from the tags of the source entries followed by the destination entries.
	auto& rangeInfo = keyInfo.rangeContaining(key).value();
	// Bind by const reference so each element is only read, not copied, per iteration
	// (elements are presumably ref-counted handles -- TODO confirm).
	for (const auto& info : rangeInfo.src_info) {
		rangeInfo.tags.push_back(info->tag);
	}
	for (const auto& info : rangeInfo.dest_info) {
		rangeInfo.tags.push_back(info->tag);
	}
	// uniquify() is defined elsewhere; presumably it drops duplicate tags (e.g. a tag present in both src and
	// dest) -- verify against its definition.
	uniquify(rangeInfo.tags);
	return rangeInfo.tags;
}
// Reports whether at least one entry of cacheInfo that intersects `range` holds a true value.
bool needsCacheTag(KeyRangeRef range) {
	bool anyCached = false;
	for (auto overlap : cacheInfo.intersectingRanges(range)) {
		if (overlap.value()) {
			// One true entry is enough; no need to look at the remaining ranges.
			anyCached = true;
			break;
		}
	}
	return anyCached;
}
// Replaces latencyBandConfig with `newLatencyBandConfig`, first rebuilding the GRV and commit latency bands in
// `stats` for whichever of the two sub-configurations differs from the one currently stored.
//
// A sub-configuration counts as changed when the presence of the overall config flips, or when both the old and
// the new config are present and the respective grvConfig / commitConfig compare unequal.
void updateLatencyBandConfig(Optional<LatencyBandConfig> newLatencyBandConfig) {
	const bool nowPresent = newLatencyBandConfig.present();
	const bool presenceChanged = (nowPresent != latencyBandConfig.present());

	// When presence did not change and the new config is present, the stored config is present too, so calling
	// get() on it is safe (the || short-circuits otherwise).
	const bool grvChanged =
	    presenceChanged || (nowPresent && newLatencyBandConfig.get().grvConfig != latencyBandConfig.get().grvConfig);
	if (grvChanged) {
		TraceEvent("LatencyBandGrvUpdatingConfig").detail("Present", nowPresent);
		stats.grvLatencyBands.clearBands();
		if (nowPresent) {
			for (auto threshold : newLatencyBandConfig.get().grvConfig.bands) {
				stats.grvLatencyBands.addThreshold(threshold);
			}
		}
	}

	const bool commitChanged =
	    presenceChanged ||
	    (nowPresent && newLatencyBandConfig.get().commitConfig != latencyBandConfig.get().commitConfig);
	if (commitChanged) {
		TraceEvent("LatencyBandCommitUpdatingConfig").detail("Present", nowPresent);
		stats.commitLatencyBands.clearBands();
		if (nowPresent) {
			for (auto threshold : newLatencyBandConfig.get().commitConfig.bands) {
				stats.commitLatencyBands.addThreshold(threshold);
			}
		}
	}

	// Remember the new configuration so the next update is compared against it.
	latencyBandConfig = newLatencyBandConfig;
}
// Constructs the ProxyCommitData identified by `dbgid`.
//
//   dbgid                      - stored as this->dbgid and passed to `stats`.
//   master                     - stored as this->master.
//   getConsistentReadVersion   - stored request stream for GetReadVersionRequest.
//   recoveryTransactionVersion - initial value of committedVersion.
//   commit                     - stored request stream for CommitTransactionRequest.
//   db                         - stored as this->db and also used to open `cx` via openDBOnServer().
//   firstProxy                 - stored as this->firstProxy.
//
// The mem-initializers are listed in member declaration order, which is the order C++ actually runs them in
// regardless of how the list is written. Members not mentioned here are default-constructed or use their in-class
// initializer.
ProxyCommitData(UID dbgid, MasterInterface master, RequestStream<GetReadVersionRequest> getConsistentReadVersion,
                Version recoveryTransactionVersion, RequestStream<CommitTransactionRequest> commit,
                Reference<AsyncVar<ServerDBInfo>> db, bool firstProxy)
  : // NOTE(review): dbgid, stats and commitBatchesMemBytesCount are declared earlier in the struct; confirm that
    // their relative order here matches their declaration order.
    // `stats` is handed the addresses of version, committedVersion and commitBatchesMemBytesCount.
    dbgid(dbgid), stats(dbgid, &version, &committedVersion, &commitBatchesMemBytesCount),
    commitBatchesMemBytesCount(0),
    master(master), logAdapter(nullptr), txnStateStore(nullptr), committedVersion(recoveryTransactionVersion),
    minKnownCommittedVersion(0), version(0), lastVersionTime(0), commitVersionRequestNumber(1),
    mostRecentProcessedRequestNumber(0), firstProxy(firstProxy), lastCoalesceTime(0), locked(false),
    commitBatchInterval(SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_MIN), localCommitBatchesStarted(0),
    getConsistentReadVersion(getConsistentReadVersion), commit(commit),
    // `cx` is declared before the member `db`; the `db` named in this initializer is the constructor parameter,
    // so it is valid here even though the member has not been initialized yet.
    cx(openDBOnServer(db, TaskPriority::DefaultEndpoint, true, true)), db(db),
    singleKeyMutationEvent(LiteralStringRef("SingleKeyMutation")), lastTxsPop(0), popRemoteTxs(false),
    lastStartCommit(0), lastCommitLatency(SERVER_KNOBS->REQUIRED_MIN_RECOVERY_DURATION), lastCommitTime(0) {
	// Start with PROXY_COMPUTE_BUCKETS entries, each 0.0.
	commitComputePerOperation.resize(SERVER_KNOBS->PROXY_COMPUTE_BUCKETS, 0.0);
}
};
#include "flow/unactorcompiler.h"
#endif // FDBSERVER_PROXYCOMMITDATA_H

View File

@ -18,31 +18,34 @@
* limitations under the License.
*/
#include "flow/ActorCollection.h"
#include "fdbrpc/PerfMetric.h"
#include "flow/Trace.h"
#include "fdbrpc/FailureMonitor.h"
#include <iterator>
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/Notified.h"
#include "fdbclient/SystemData.h"
#include "fdbserver/ConflictSet.h"
#include "fdbserver/DataDistribution.actor.h"
#include "fdbserver/Knobs.h"
#include <iterator>
#include "fdbrpc/FailureMonitor.h"
#include "fdbrpc/PerfMetric.h"
#include "fdbrpc/sim_validation.h"
#include "fdbserver/ApplyMetadataMutation.h"
#include "fdbserver/BackupProgress.actor.h"
#include "fdbserver/MasterInterface.h"
#include "fdbserver/WaitFailure.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/ConflictSet.h"
#include "fdbserver/CoordinatedState.h"
#include "fdbserver/CoordinationInterface.h" // copy constructors for ServerCoordinators class
#include "fdbrpc/sim_validation.h"
#include "fdbserver/DBCoreState.h"
#include "fdbserver/DataDistribution.actor.h"
#include "fdbserver/IKeyValueStore.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/LogSystem.h"
#include "fdbserver/LogSystemDiskQueueAdapter.h"
#include "fdbserver/IKeyValueStore.h"
#include "fdbserver/ApplyMetadataMutation.h"
#include "fdbserver/MasterInterface.h"
#include "fdbserver/ProxyCommitData.actor.h"
#include "fdbserver/RecoveryState.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/WaitFailure.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "flow/ActorCollection.h"
#include "flow/Trace.h"
#include "flow/actorcompiler.h" // This must be the last #include.
using std::vector;
@ -1561,7 +1564,8 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self ) {
}
}
applyMetadataMutations(self->dbgid, recoveryCommitRequest.arena, tr.mutations.slice(mmApplied, tr.mutations.size()), self->txnStateStore, nullptr, nullptr);
applyMetadataMutations(self->dbgid, recoveryCommitRequest.arena, tr.mutations.slice(mmApplied, tr.mutations.size()),
self->txnStateStore);
mmApplied = tr.mutations.size();
tr.read_snapshot = self->recoveryTransactionVersion; // lastEpochEnd would make more sense, but isn't in the initial window of the resolver(s)

View File

@ -1379,7 +1379,6 @@ ACTOR Future<Void> workerServer(
DUMPTOKEN(recruited.getKeyServersLocations);
DUMPTOKEN(recruited.getStorageServerRejoinInfo);
DUMPTOKEN(recruited.waitFailure);
DUMPTOKEN(recruited.getRawCommittedVersion);
DUMPTOKEN(recruited.txnState);
//printf("Recruited as masterProxyServer\n");

View File

@ -23,6 +23,7 @@
#pragma once
#include <atomic>
#include <array>
#include "flow/Error.h"
#include "flow/Trace.h"
@ -44,7 +45,11 @@
#include <drd.h>
#endif
class ThreadSpinLock {
// Size in bytes used both as the alignment of ThreadSpinLock and as the total
// size its padding fills it up to. Currently hard-coded to 64.
// TODO: We should make this dependent on the CPU. Maybe cmake
// can set this variable properly?
constexpr size_t CACHE_LINE_SIZE = 64;
class alignas(CACHE_LINE_SIZE) ThreadSpinLock {
public:
// #ifdef _WIN32
ThreadSpinLock() {
@ -83,6 +88,9 @@ private:
ThreadSpinLock(const ThreadSpinLock&);
void operator=(const ThreadSpinLock&);
std::atomic_flag isLocked = ATOMIC_FLAG_INIT;
// We want a spin lock to occupy a cache line in order to
// prevent false sharing.
std::array<uint8_t, CACHE_LINE_SIZE - sizeof(isLocked)> padding;
};
class ThreadSpinLockHolder {