contrib/transaction_profiling_analyzer Applied flake8

Lukas Molleman 2022-11-22 19:20:38 +01:00
parent 0b7baf4437
commit 71585dee62
2 changed files with 85 additions and 67 deletions

View File

@@ -28,7 +28,6 @@ optional packages:
sortedcontainers (for estimating key range read/write density)
"""
import argparse
from collections import defaultdict
from enum import Enum
@@ -55,7 +54,6 @@ supported_protocol_versions = frozenset([PROTOCOL_VERSION_5_2, PROTOCOL_VERSION_
PROTOCOL_VERSION_6_2, PROTOCOL_VERSION_6_3, PROTOCOL_VERSION_7_0,
PROTOCOL_VERSION_7_1, PROTOCOL_VERSION_7_2])
fdb.api_version(520)
BASIC_FORMAT = "%(asctime)s - %(levelname)-8s %(message)s"
@@ -188,6 +186,7 @@ class BaseInfo(object):
"""
Corresponds to FdbClientLogEvents::Event
"""
def __init__(self, bb, protocol_version):
# we already read the EventType, so go straight to start_timestamp
self.start_timestamp = bb.get_double()
@@ -197,6 +196,7 @@
if bb.get_bool():
self.tenant = bb.get_bytes_with_length()
class GetVersionInfo(BaseInfo):
def __init__(self, bb, protocol_version):
super().__init__(bb, protocol_version)
@@ -206,6 +206,7 @@ class GetVersionInfo(BaseInfo):
if protocol_version >= PROTOCOL_VERSION_6_3:
self.read_version = bb.get_long()
class GetInfo(BaseInfo):
def __init__(self, bb, protocol_version):
super().__init__(bb, protocol_version)
@@ -244,11 +245,6 @@ class CommitInfo(BaseInfo):
self.read_snapshot_version = bb.get_long()
if protocol_version >= PROTOCOL_VERSION_6_3:
self.report_conflicting_keys = bb.get_bool()
if protocol_version >= PROTOCOL_VERSION_7_1:
lock_aware = bb.get_bool()
if bb.get_bool():
spanId = bb.get_bytes(16)
class ErrorGetInfo(BaseInfo):
@@ -284,11 +280,6 @@ class ErrorCommitInfo(BaseInfo):
if protocol_version >= PROTOCOL_VERSION_6_3:
self.report_conflicting_keys = bb.get_bool()
if protocol_version >= PROTOCOL_VERSION_7_1:
lock_aware = bb.get_bool()
if bb.get_bool():
spanId = bb.get_bytes(16)
class UnsupportedProtocolVersionError(Exception):
def __init__(self, protocol_version):
@@ -314,52 +305,57 @@ class ClientTransactionInfo:
if event == 0:
# we need to read it to consume the buffer even if we don't want to store it
get_version = GetVersionInfo(bb, protocol_version)
if (not type_filter or "get_version" in type_filter):
if not type_filter or "get_version" in type_filter:
self.get_version = get_version
elif event == 1:
get = GetInfo(bb, protocol_version)
if (not type_filter or "get" in type_filter):
if not type_filter or "get" in type_filter:
# because of the crappy json serialization using __dict__ we have to set the list here otherwise
# it doesn't print
if not self.gets: self.gets = []
if not self.gets:
self.gets = []
self.gets.append(get)
elif event == 2:
get_range = GetRangeInfo(bb, protocol_version)
if (not type_filter or "get_range" in type_filter):
if not self.get_ranges: self.get_ranges = []
if not type_filter or "get_range" in type_filter:
if not self.get_ranges:
self.get_ranges = []
self.get_ranges.append(get_range)
elif event == 3:
commit = CommitInfo(bb, protocol_version, full_output=full_output)
if (not type_filter or "commit" in type_filter):
if not type_filter or "commit" in type_filter:
self.commit = commit
elif event == 4:
error_get = ErrorGetInfo(bb, protocol_version)
if (not type_filter or "error_gets" in type_filter):
if not self.error_gets: self.error_gets = []
if not type_filter or "error_gets" in type_filter:
if not self.error_gets:
self.error_gets = []
self.error_gets.append(error_get)
elif event == 5:
error_get_range = ErrorGetRangeInfo(bb, protocol_version)
if (not type_filter or "error_get_range" in type_filter):
if not self.error_get_ranges: self.error_get_ranges = []
if not type_filter or "error_get_range" in type_filter:
if not self.error_get_ranges:
self.error_get_ranges = []
self.error_get_ranges.append(error_get_range)
elif event == 6:
error_commit = ErrorCommitInfo(bb, protocol_version, full_output=full_output)
if (not type_filter or "error_commit" in type_filter):
if not self.error_commits: self.error_commits = []
if not type_filter or "error_commit" in type_filter:
if not self.error_commits:
self.error_commits = []
self.error_commits.append(error_commit)
else:
raise Exception("Unknown event type %d" % event)
def has_types(self):
return self.get_version or self.gets or self.get_ranges or self.commit or self.error_gets \
or self.error_get_ranges or self.error_commits
return self.get_version or self.gets or self.get_ranges or self.commit \
or self.error_gets or self.error_get_ranges or self.error_commits
def to_json(self):
return json.dumps(self, cls=ObjJsonEncoder, sort_keys=True)
class TransactionInfoLoader(object):
max_num_chunks_to_store = 1000 # Each chunk would be 100 KB in size
max_num_chunks_to_store = 1000 # Each chunk would be 100 KB in size
def __init__(self, db, full_output=True, type_filter=None, min_timestamp=None, max_timestamp=None):
self.db = db
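For context on the __dict__ serialization mentioned in the hunk above: ObjJsonEncoder is not part of this diff, but the comment implies it serializes objects through their __dict__, which is why list attributes must be created (even empty) before they show up in the JSON. A minimal sketch under that assumption, with a hypothetical name:

import json

class ObjJsonEncoderSketch(json.JSONEncoder):
    # Hypothetical stand-in for ObjJsonEncoder: fall back to an object's
    # __dict__, so only attributes that were actually assigned get emitted.
    def default(self, o):
        if hasattr(o, '__dict__'):
            return o.__dict__
        return super().default(o)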
@@ -433,7 +429,7 @@ class TransactionInfoLoader(object):
reverse = False
for k, v in tr.snapshot.get_range(start_key, end_key, limit=1, reverse=reverse):
return fdb.tuple.unpack(v)[0]
return 0 if start else 0x8000000000000000 # we didn't find any timekeeper data so find the max range
return 0 if start else 0x8000000000000000 # we didn't find any timekeeper data so find the max range
def fetch_transaction_info(self):
if self.min_timestamp:
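The timekeeper lookup above scans for the version sample closest to a wall-clock time. A minimal standalone sketch of the same lookup, assuming the standard b'\xff\x02/timeKeeper/map/' system keyspace layout (the prefix and transaction options are not shown in this hunk, so treat them as assumptions):

import fdb
fdb.api_version(520)

TIMEKEEPER_PREFIX = b'\xff\x02/timeKeeper/map/'  # assumed timekeeper key prefix

@fdb.transactional
def version_for_timestamp(tr, epoch_seconds):
    # Timekeeper periodically records (epoch time -> commit version) samples;
    # take the newest sample at or before the requested time.
    tr.options.set_read_system_keys()
    begin = TIMEKEEPER_PREFIX
    end = TIMEKEEPER_PREFIX + fdb.tuple.pack((epoch_seconds + 1,))
    for _k, v in tr.snapshot.get_range(begin, end, limit=1, reverse=True):
        return fdb.tuple.unpack(v)[0]
    return 0  # no sample that old

Called as version_for_timestamp(db, some_epoch_seconds) after fdb.open().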
@@ -469,12 +465,12 @@
streaming_mode=fdb.impl.StreamingMode.want_all)
for k, v in transaction_info_range:
found += 1
#logger.debug(k)
# logger.debug(k)
start_key = fdb.KeySelector.first_greater_than(k)
_, tr_id, num_chunks, chunk_num = self.parse_key(k)
#logger.debug("num_chunks=%d, chunk_num=%d" % (num_chunks,chunk_num))
# logger.debug("num_chunks=%d, chunk_num=%d" % (num_chunks,chunk_num))
if num_chunks == 1:
assert chunk_num == 1
@@ -482,7 +478,7 @@
info = build_client_transaction_info(v)
if info.has_types():
buffer.append(info)
except UnsupportedProtocolVersionError as e:
except UnsupportedProtocolVersionError:
invalid_transaction_infos += 1
except ValueError:
invalid_transaction_infos += 1
@@ -497,7 +493,8 @@
self._check_and_adjust_chunk_cache_size()
else:
if tr_id not in self.tr_info_map:
logger.error("Got a middle chunk without getting beginning part. Discarding transaction id: %s\n" % tr_id)
logger.error(
"Got a middle chunk without getting beginning part. Discarding transaction id: %s\n" % tr_id)
continue
c_list = self.tr_info_map[tr_id]
if c_list[-1].num_chunks != num_chunks or c_list[-1].chunk_num != chunk_num - 1:
@@ -513,7 +510,7 @@
info = build_client_transaction_info(b''.join([chunk.value for chunk in c_list]))
if info.has_types():
buffer.append(info)
except UnsupportedProtocolVersionError as e:
except UnsupportedProtocolVersionError:
invalid_transaction_infos += 1
except ValueError:
invalid_transaction_infos += 1
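For context, the chunk handling in this hunk and the ones above boils down to a small reassembly rule: payloads larger than one key-value pair arrive as numbered chunks that must start at 1, arrive in order, and agree on the total count, or the whole transaction is dropped. A condensed sketch of that rule (names are illustrative, not the loader's API):

from collections import namedtuple

Chunk = namedtuple('Chunk', ['num_chunks', 'chunk_num', 'value'])

def add_chunk(chunks_by_tr_id, tr_id, num_chunks, chunk_num, value):
    # Returns the reassembled payload once the last chunk arrives, else None.
    c_list = chunks_by_tr_id.get(tr_id)
    if c_list is None:
        if chunk_num != 1:
            return None  # middle chunk without a beginning: discard
        c_list = chunks_by_tr_id[tr_id] = []
    elif c_list[-1].num_chunks != num_chunks or c_list[-1].chunk_num != chunk_num - 1:
        del chunks_by_tr_id[tr_id]  # out of order or inconsistent: discard
        return None
    c_list.append(Chunk(num_chunks, chunk_num, value))
    if chunk_num == num_chunks:
        del chunks_by_tr_id[tr_id]
        return b''.join(c.value for c in c_list)
    return None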
@@ -553,6 +550,7 @@ def has_dateparser():
logger.warn("Can't find dateparser so disabling human date parsing")
return False
class ReadCounter(object):
def __init__(self):
from sortedcontainers import SortedDict
@@ -560,7 +558,7 @@ class ReadCounter(object):
self.reads[b''] = [0, 0]
self.read_counts = {}
self.hit_count=0
self.hit_count = 0
def process(self, transaction_info):
for get in transaction_info.gets:
@@ -576,7 +574,7 @@ class ReadCounter(object):
if end_key is not None:
self.reads.setdefault(end_key, [0, 0])[1] += 1
else:
self.reads.setdefault(start_key+b'\x00', [0, 0])[1] += 1
self.reads.setdefault(start_key + b'\x00', [0, 0])[1] += 1
def get_total_reads(self):
return sum([v for v in self.read_counts.values()])
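The ReadCounter bookkeeping above stores, per boundary key, how many sampled ranges start and end there; the read depth between boundaries then falls out of a running sum. A simplified sketch of the idea using sortedcontainers (the real class tracks more than this):

from sortedcontainers import SortedDict

reads = SortedDict()  # boundary key -> [ranges starting here, ranges ending here]
reads[b''] = [0, 0]

def record_range(start_key, end_key):
    reads.setdefault(start_key, [0, 0])[0] += 1
    reads.setdefault(end_key, [0, 0])[1] += 1

def coverage():
    # Sweep boundaries in key order; opens minus closes gives how many
    # sampled ranges cover the interval that starts at each boundary.
    depth = 0
    for key, (starts, ends) in reads.items():
        depth += starts - ends
        yield key, depth

record_range(b'a', b'c')
record_range(b'b', b'd')
print(list(coverage()))  # [(b'', 0), (b'a', 1), (b'b', 2), (b'c', 1), (b'd', 0)]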
@@ -673,8 +671,8 @@ class ShardFinder(object):
self.shard_cache = {}
def _get_boundary_keys(self, begin, end):
start_pos = max(0, bisect_right(self.boundary_keys, begin)-1)
end_pos = max(0, bisect_right(self.boundary_keys, end)-1)
start_pos = max(0, bisect_right(self.boundary_keys, begin) - 1)
end_pos = max(0, bisect_right(self.boundary_keys, end) - 1)
return self.boundary_keys[start_pos:end_pos]
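The bisect_right arithmetic above is the standard pattern for mapping a key to the shard whose begin key is the nearest boundary at or below it; a tiny self-contained illustration with made-up boundaries:

from bisect import bisect_right

boundary_keys = [b'', b'apple', b'mango', b'zebra']  # sorted shard begin keys (illustrative)

def shard_for_key(key):
    # The first boundary strictly greater than key, stepped back by one,
    # is the begin key of the shard containing it.
    return boundary_keys[max(0, bisect_right(boundary_keys, key) - 1)]

assert shard_for_key(b'banana') == b'apple'
assert shard_for_key(b'mango') == b'mango'  # begin keys are inclusive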
@@ -691,9 +689,9 @@
return len(self._get_boundary_keys(start_key, end_key)) + 1
def get_addresses_for_key(self, key):
shard = self.boundary_keys[max(0, bisect_right(self.boundary_keys, key)-1)]
shard = self.boundary_keys[max(0, bisect_right(self.boundary_keys, key) - 1)]
do_load = False
if not shard in self.shard_cache:
if shard not in self.shard_cache:
do_load = True
elif self.shard_cache[shard].is_ready():
try:
@@ -708,7 +706,7 @@ class ShardFinder(object):
for f in self.outstanding:
try:
f.wait()
except fdb.FDBError as e:
except fdb.FDBError:
pass
self.outstanding = []
@@ -726,10 +724,13 @@ class ShardFinder(object):
if item[addr_idx] is not None:
while True:
try:
ranges[index] = item[0:addr_idx] + ([a.decode('ascii') for a in item[addr_idx].wait()],) + item[addr_idx+1:]
ranges[index] = item[0:addr_idx] + ([a.decode('ascii') for a in item[addr_idx].wait()],) \
+ item[addr_idx + 1:]
break
except fdb.FDBError as e:
ranges[index] = item[0:addr_idx] + (self.get_addresses_for_key(item[key_idx]),) + item[addr_idx+1:]
except fdb.FDBError:
ranges[index] = item[0:addr_idx] + (self.get_addresses_for_key(item[key_idx]),) \
+ item[addr_idx + 1:]
class WriteCounter(object):
mutation_types_to_consider = frozenset([MutationType.SET_VALUE, MutationType.ADD_VALUE])
@@ -795,10 +796,11 @@ class WriteCounter(object):
filter_addresses = set(filter_addresses)
results = [r for r in results if filter_addresses.issubset(set(r[3]))][0:num]
else:
results = [(key, end, count) for (count, key) in count_pairs[0:num]]
results = [(key, count) for (count, key) in count_pairs[0:num]]
return results
def connect(cluster_file=None):
db = fdb.open(cluster_file=cluster_file)
return db
@@ -831,22 +833,34 @@ def main():
end_time_group = parser.add_mutually_exclusive_group()
end_time_group.add_argument("--max-timestamp", type=int, help="Don't return events newer than this epoch time")
end_time_group.add_argument("-e", "--end-time", type=str, help="Don't return events older than this parsed time")
parser.add_argument("--num-buckets", type=int, help="The number of buckets to partition the key-space into for operation counts", default=100)
parser.add_argument("--top-requests", type=int, help="If specified will output this many top keys for reads or writes", default=0)
parser.add_argument("--exclude-ports", action="store_true", help="Print addresses without the port number. Only works in versions older than 6.3, and is required in versions older than 6.2.")
parser.add_argument("--single-shard-ranges-only", action="store_true", help="Only print range boundaries that exist in a single shard")
parser.add_argument("-a", "--filter-address", action="append", help="Only print range boundaries that include the given address. This option can used multiple times to include more than one address in the filter, in which case all addresses must match.")
parser.add_argument("--num-buckets", type=int,
help="The number of buckets to partition the key-space into for operation counts", default=100)
parser.add_argument("--top-requests", type=int,
help="If specified will output this many top keys for reads or writes", default=0)
parser.add_argument("--exclude-ports", action="store_true",
help="Print addresses without the port number. Only works in versions older than 6.3, and is required in versions older than 6.2.")
parser.add_argument("--single-shard-ranges-only", action="store_true",
help="Only print range boundaries that exist in a single shard")
parser.add_argument("-a", "--filter-address", action="append",
help="Only print range boundaries that include the given address. This option can used multiple times to include more than one address in the filter, in which case all addresses must match.")
args = parser.parse_args()
type_filter = set()
if args.filter_get_version: type_filter.add("get_version")
if args.filter_get or args.filter_reads: type_filter.add("get")
if args.filter_get_range or args.filter_reads: type_filter.add("get_range")
if args.filter_commit: type_filter.add("commit")
if args.filter_error_get: type_filter.add("error_get")
if args.filter_error_get_range: type_filter.add("error_get_range")
if args.filter_error_commit: type_filter.add("error_commit")
if args.filter_get_version:
type_filter.add("get_version")
if args.filter_get or args.filter_reads:
type_filter.add("get")
if args.filter_get_range or args.filter_reads:
type_filter.add("get_range")
if args.filter_commit:
type_filter.add("commit")
if args.filter_error_get:
type_filter.add("error_get")
if args.filter_error_get_range:
type_filter.add("error_get_range")
if args.filter_error_commit:
type_filter.add("error_commit")
if (not type_filter or "commit" in type_filter):
write_counter = WriteCounter() if args.num_buckets else None
@@ -912,7 +926,8 @@ def main():
else:
op_str = 'Key %r' % start
print(" %d. %s\n %d sampled %s (%.2f%%, %.2f%% cumulative)" % (idx+1, op_str, count, context, 100*count/total, 100*running_count/total))
print(" %d. %s\n %d sampled %s (%.2f%%, %.2f%% cumulative)" % (
idx + 1, op_str, count, context, 100 * count / total, 100 * running_count / total))
print(" shard addresses: %s\n" % ", ".join(addresses))
else:
@@ -933,10 +948,10 @@ def main():
if not omit:
if omit_start is not None:
if omit_start == idx-1:
if omit_start == idx - 1:
print(" %d. Omitted\n" % (idx))
else:
print(" %d - %d. Omitted\n" % (omit_start+1, idx))
print(" %d - %d. Omitted\n" % (omit_start + 1, idx))
omit_start = None
if total_count is None:
@@ -944,18 +959,19 @@
else:
count_str = '%d sampled %s (%d intersecting)' % (start_count, context, total_count)
if not shard_count:
print(" %d. [%s, %s]\n %d sampled %s\n" % (idx+1, start, end, count, context))
print(" %d. [%s, %s]\n %d sampled %s\n" % (idx + 1, start, end, total_count, context))
else:
addresses_string = "; addresses=%s" % ', '.join(addresses) if addresses else ''
print(" %d. [%s, %s]\n %s spanning %d shard(s)%s\n" % (idx+1, start, end, count_str, shard_count, addresses_string))
print(" %d. [%s, %s]\n %s spanning %d shard(s)%s\n" % (
idx + 1, start, end, count_str, shard_count, addresses_string))
elif omit_start is None:
omit_start = idx
if omit_start is not None:
if omit_start == len(range_boundaries)-1:
if omit_start == len(range_boundaries) - 1:
print(" %d. Omitted\n" % len(range_boundaries))
else:
print(" %d - %d. Omitted\n" % (omit_start+1, len(range_boundaries)))
print(" %d - %d. Omitted\n" % (omit_start + 1, len(range_boundaries)))
shard_finder = ShardFinder(db, args.exclude_ports)
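The omit_start bookkeeping above collapses runs of consecutive omitted rows into a single "N. Omitted" or "N - M. Omitted" line. The same idea as a standalone generator (illustrative only):

def omitted_runs(flags):
    # Yield (first, last) 1-based inclusive index ranges of consecutive
    # True entries in flags.
    start = None
    for idx, omitted in enumerate(flags):
        if omitted and start is None:
            start = idx
        elif not omitted and start is not None:
            yield start + 1, idx
            start = None
    if start is not None:
        yield start + 1, len(flags)

print(list(omitted_runs([False, True, True, False, True])))  # [(2, 3), (5, 5)]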
@@ -963,7 +979,8 @@ def main():
if write_counter:
if args.top_requests:
top_writes = write_counter.get_top_k_writes(args.top_requests, args.filter_address, shard_finder=shard_finder)
top_writes = write_counter.get_top_k_writes(args.top_requests, args.filter_address,
shard_finder=shard_finder)
range_boundaries = write_counter.get_range_boundaries(args.num_buckets, shard_finder=shard_finder)
num_writes = write_counter.get_total_writes()
@@ -1014,5 +1031,6 @@ def main():
print("Key-space boundaries with approximately equal read counts:\n")
print_range_boundaries(range_boundaries, "reads")
if __name__ == "__main__":
main()

View File

@@ -105,8 +105,8 @@ class RangeCounterTest(unittest.TestCase):
assert rc_count == v, "Counts for %s mismatch. Expected %d got %d" % (k, v, rc_count)
for _ in range(0, 100):
i = random.randint(0, len(letters)-1)
j = random.randint(0, len(letters)-2)
i = random.randint(0, len(letters) - 1)
j = random.randint(0, len(letters) - 2)
if i == j:
j += 1
start_index = min(i, j)
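The index dance above just needs two distinct indices: draw j from a range one smaller and bump it on collision. For a strictly uniform choice over pairs, the usual variant bumps whenever j >= i instead, as in this standalone sketch:

import random

def distinct_pair(n):
    # Two distinct indices in [0, n), uniformly: sample j from one fewer
    # value, then shift it past i whenever the draws would collide.
    i = random.randint(0, n - 1)
    j = random.randint(0, n - 2)
    if j >= i:
        j += 1
    return min(i, j), max(i, j)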
@@ -123,4 +123,4 @@ class RangeCounterTest(unittest.TestCase):
if __name__ == "__main__":
unittest.main() # run all tests
unittest.main() # run all tests