diff --git a/CMakeLists.txt b/CMakeLists.txt index 00bdde8e1e..fa762949a1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -18,7 +18,7 @@ # limitations under the License. cmake_minimum_required(VERSION 3.13) project(foundationdb - VERSION 6.3.0 + VERSION 7.0.0 DESCRIPTION "FoundationDB is a scalable, fault-tolerant, ordered key-value store with full ACID transactions." HOMEPAGE_URL "http://www.foundationdb.org/" LANGUAGES C CXX ASM) diff --git a/contrib/transaction_profiling_analyzer/transaction_profiling_analyzer.py b/contrib/transaction_profiling_analyzer/transaction_profiling_analyzer.py index 5374475bed..58e2cf2548 100755 --- a/contrib/transaction_profiling_analyzer/transaction_profiling_analyzer.py +++ b/contrib/transaction_profiling_analyzer/transaction_profiling_analyzer.py @@ -39,7 +39,9 @@ from json import JSONEncoder import logging import struct from bisect import bisect_left +from bisect import bisect_right import time +import datetime PROTOCOL_VERSION_5_2 = 0x0FDB00A552000001 PROTOCOL_VERSION_6_0 = 0x0FDB00A570010001 @@ -414,7 +416,7 @@ class TransactionInfoLoader(object): else: end_key = self.client_latency_end_key_selector - valid_transaction_infos = 0 + transaction_infos = 0 invalid_transaction_infos = 0 def build_client_transaction_info(v): @@ -446,11 +448,12 @@ class TransactionInfoLoader(object): info = build_client_transaction_info(v) if info.has_types(): buffer.append(info) - valid_transaction_infos += 1 except UnsupportedProtocolVersionError as e: invalid_transaction_infos += 1 except ValueError: invalid_transaction_infos += 1 + + transaction_infos += 1 else: if chunk_num == 1: # first chunk @@ -476,14 +479,15 @@ class TransactionInfoLoader(object): info = build_client_transaction_info(b''.join([chunk.value for chunk in c_list])) if info.has_types(): buffer.append(info) - valid_transaction_infos += 1 except UnsupportedProtocolVersionError as e: invalid_transaction_infos += 1 except ValueError: invalid_transaction_infos += 1 + + transaction_infos += 1 self._check_and_adjust_chunk_cache_size() - if (valid_transaction_infos + invalid_transaction_infos) % 1000 == 0: - print("Processed valid: %d, invalid: %d" % (valid_transaction_infos, invalid_transaction_infos)) + if transaction_infos % 1000 == 0: + print("Processed %d transactions, %d invalid" % (transaction_infos, invalid_transaction_infos)) if found == 0: more = False except fdb.FDBError as e: @@ -495,13 +499,15 @@ class TransactionInfoLoader(object): for item in buffer: yield item + print("Processed %d transactions, %d invalid\n" % (transaction_infos, invalid_transaction_infos)) + def has_sortedcontainers(): try: import sortedcontainers return True except ImportError: - logger.warn("Can't find sortedcontainers so disabling RangeCounter") + logger.warn("Can't find sortedcontainers so disabling ReadCounter") return False @@ -513,155 +519,197 @@ def has_dateparser(): logger.warn("Can't find dateparser so disabling human date parsing") return False - -class RangeCounter(object): - def __init__(self, k): - self.k = k +class ReadCounter(object): + def __init__(self): from sortedcontainers import SortedDict - self.ranges = SortedDict() + self.reads = SortedDict() + self.reads[b''] = [0, 0] + + self.read_counts = {} + self.hit_count=0 def process(self, transaction_info): + for get in transaction_info.gets: + self._insert_read(get.key, None) for get_range in transaction_info.get_ranges: - self._insert_range(get_range.key_range.start_key, get_range.key_range.end_key) + self._insert_read(get_range.key_range.start_key, 
get_range.key_range.end_key) - def _insert_range(self, start_key, end_key): - keys = self.ranges.keys() - if len(keys) == 0: - self.ranges[start_key] = end_key, 1 - return + def _insert_read(self, start_key, end_key): + self.read_counts.setdefault((start_key, end_key), 0) + self.read_counts[(start_key, end_key)] += 1 - start_pos = bisect_left(keys, start_key) - end_pos = bisect_left(keys, end_key) - #print("start_pos=%d, end_pos=%d" % (start_pos, end_pos)) + self.reads.setdefault(start_key, [0, 0])[0] += 1 + if end_key is not None: + self.reads.setdefault(end_key, [0, 0])[1] += 1 + else: + self.reads.setdefault(start_key+b'\x00', [0, 0])[1] += 1 - possible_intersection_keys = keys[max(0, start_pos - 1):min(len(keys), end_pos+1)] + def get_total_reads(self): + return sum([v for v in self.read_counts.values()]) + + def matches_filter(addresses, required_addresses): + for addr in required_addresses: + if addr not in addresses: + return False + return True - start_range_left = start_key + def get_top_k_reads(self, num, filter_addresses, shard_finder=None): + count_pairs = sorted([(v, k) for (k, v) in self.read_counts.items()], reverse=True, key=lambda item: item[0]) + if not filter_addresses: + count_pairs = count_pairs[0:num] - for key in possible_intersection_keys: - cur_end_key, cur_count = self.ranges[key] - #logger.debug("key=%s, cur_end_key=%s, cur_count=%d, start_range_left=%s" % (key, cur_end_key, cur_count, start_range_left)) - if start_range_left < key: - if end_key <= key: - self.ranges[start_range_left] = end_key, 1 - return - self.ranges[start_range_left] = key, 1 - start_range_left = key - assert start_range_left >= key - if start_range_left >= cur_end_key: - continue + if shard_finder: + results = [] + for (count, (start, end)) in count_pairs: + results.append((start, end, count, shard_finder.get_addresses_for_key(start))) - # [key, start_range_left) = cur_count - # if key == start_range_left this will get overwritten below - self.ranges[key] = start_range_left, cur_count + shard_finder.wait_for_shard_addresses(results, 0, 3) - if end_key <= cur_end_key: - # [start_range_left, end_key) = cur_count+1 - # [end_key, cur_end_key) = cur_count - self.ranges[start_range_left] = end_key, cur_count + 1 - if end_key != cur_end_key: - self.ranges[end_key] = cur_end_key, cur_count - start_range_left = end_key - break - else: - # [start_range_left, cur_end_key) = cur_count+1 - self.ranges[start_range_left] = cur_end_key, cur_count+1 - start_range_left = cur_end_key - assert start_range_left <= end_key + if filter_addresses: + filter_addresses = set(filter_addresses) + results = [r for r in results if filter_addresses.issubset(set(r[3]))][0:num] + else: + results = [(start, end, count) for (count, (start, end)) in count_pairs[0:num]] - # there may be some range left - if start_range_left < end_key: - self.ranges[start_range_left] = end_key, 1 + return results - def get_count_for_key(self, key): - if key in self.ranges: - return self.ranges[key][1] - - keys = self.ranges.keys() - index = bisect_left(keys, key) - if index == 0: - return 0 - - index_key = keys[index-1] - if index_key <= key < self.ranges[index_key][0]: - return self.ranges[index_key][1] - return 0 - - def get_range_boundaries(self, shard_finder=None): - total = sum([count for _, (_, count) in self.ranges.items()]) - range_size = total // self.k + def get_range_boundaries(self, num_buckets, shard_finder=None): + total = sum([start_count for (start_count, end_count) in self.reads.values()]) + range_size = total // num_buckets 
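# (Descriptive sketch of the sweep below; comment added for clarity, not upstream code.)
# self.reads maps each boundary key to a [started_here, ended_here] pair. Walking the
# keys in sorted order while maintaining open_count -- reads begun but not yet ended --
# lets each bucket report both opened_this_range (reads that started inside the bucket)
# and count_this_range (those plus reads already open when the bucket began, i.e. every
# read intersecting it). For example, with reads [a,c) and [b,d), a bucket starting at b
# has one read still open, so it reports 1 started read and 2 intersecting reads.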
output_range_counts = [] - def add_boundary(start, end, count): + def add_boundary(start, end, started_count, total_count): if shard_finder: shard_count = shard_finder.get_shard_count(start, end) if shard_count == 1: addresses = shard_finder.get_addresses_for_key(start) else: addresses = None - output_range_counts.append((start, end, count, shard_count, addresses)) + output_range_counts.append((start, end, started_count, total_count, shard_count, addresses)) else: - output_range_counts.append((start, end, count, None, None)) + output_range_counts.append((start, end, started_count, total_count, None, None)) this_range_start_key = None + last_end = None + open_count = 0 + opened_this_range = 0 count_this_range = 0 - for (start_key, (end_key, count)) in self.ranges.items(): - if not this_range_start_key: - this_range_start_key = start_key - count_this_range += count - if count_this_range >= range_size: - add_boundary(this_range_start_key, end_key, count_this_range) - count_this_range = 0 - this_range_start_key = None - if count_this_range > 0: - add_boundary(this_range_start_key, end_key, count_this_range) + for (start_key, (start_count, end_count)) in self.reads.items(): + open_count -= end_count + + if opened_this_range >= range_size: + add_boundary(this_range_start_key, start_key, opened_this_range, count_this_range) + count_this_range = open_count + opened_this_range = 0 + this_range_start_key = None + + count_this_range += start_count + opened_this_range += start_count + open_count += start_count + + if count_this_range > 0 and this_range_start_key is None: + this_range_start_key = start_key + + if end_count > 0: + last_end = start_key + + if last_end is None: + last_end = b'\xff' + if count_this_range > 0: + add_boundary(this_range_start_key, last_end, opened_this_range, count_this_range) + + shard_finder.wait_for_shard_addresses(output_range_counts, 0, 5) return output_range_counts class ShardFinder(object): - def __init__(self, db): + def __init__(self, db, exclude_ports): self.db = db + self.exclude_ports = exclude_ports + + self.tr = db.create_transaction() + self.refresh_tr() + + self.outstanding = [] + self.boundary_keys = list(fdb.locality.get_boundary_keys(db, b'', b'\xff\xff')) + self.shard_cache = {} + + def _get_boundary_keys(self, begin, end): + start_pos = max(0, bisect_right(self.boundary_keys, begin)-1) + end_pos = max(0, bisect_right(self.boundary_keys, end)-1) + + return self.boundary_keys[start_pos:end_pos] + + def refresh_tr(self): + self.tr.options.set_read_lock_aware() + if not self.exclude_ports: + self.tr.options.set_include_port_in_address() @staticmethod - @fdb.transactional - def _get_boundary_keys(tr, begin, end): - tr.options.set_read_lock_aware() - return fdb.locality.get_boundary_keys(tr, begin, end) - - @staticmethod - @fdb.transactional def _get_addresses_for_key(tr, key): - tr.options.set_read_lock_aware() return fdb.locality.get_addresses_for_key(tr, key) def get_shard_count(self, start_key, end_key): - return len(list(self._get_boundary_keys(self.db, start_key, end_key))) + 1 + return len(self._get_boundary_keys(start_key, end_key)) + 1 def get_addresses_for_key(self, key): - return [a.decode('ascii') for a in self._get_addresses_for_key(self.db, key).wait()] + shard = self.boundary_keys[max(0, bisect_right(self.boundary_keys, key)-1)] + do_load = False + if not shard in self.shard_cache: + do_load = True + elif self.shard_cache[shard].is_ready(): + try: + self.shard_cache[shard].wait() + except fdb.FDBError as e: + self.tr.on_error(e).wait() + 
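# (Clarifying note: tr.on_error() waits out the appropriate retry backoff and resets
# the transaction, after which its options must be reapplied -- hence refresh_tr() below.)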
self.refresh_tr() + do_load = True + if do_load: + if len(self.outstanding) > 1000: + for f in self.outstanding: + try: + f.wait() + except fdb.FDBError as e: + pass -class TopKeysCounter(object): + self.outstanding = [] + self.tr.reset() + self.refresh_tr() + + self.outstanding.append(self._get_addresses_for_key(self.tr, shard)) + self.shard_cache[shard] = self.outstanding[-1] + + return self.shard_cache[shard] + + def wait_for_shard_addresses(self, ranges, key_idx, addr_idx): + for index in range(len(ranges)): + item = ranges[index] + if item[addr_idx] is not None: + while True: + try: + ranges[index] = item[0:addr_idx] + ([a.decode('ascii') for a in item[addr_idx].wait()],) + item[addr_idx+1:] + break + except fdb.FDBError as e: + ranges[index] = item[0:addr_idx] + (self.get_addresses_for_key(item[key_idx]),) + item[addr_idx+1:] + +class WriteCounter(object): mutation_types_to_consider = frozenset([MutationType.SET_VALUE, MutationType.ADD_VALUE]) - def __init__(self, k): - self.k = k - self.reads = defaultdict(lambda: 0) + def __init__(self): self.writes = defaultdict(lambda: 0) def process(self, transaction_info): - for get in transaction_info.gets: - self.reads[get.key] += 1 if transaction_info.commit: for mutation in transaction_info.commit.mutations: if mutation.code in self.mutation_types_to_consider: self.writes[mutation.param_one] += 1 - def _get_range_boundaries(self, counts, shard_finder=None): - total = sum([v for (k, v) in counts.items()]) - range_size = total // self.k - key_counts_sorted = sorted(counts.items()) + def get_range_boundaries(self, num_buckets, shard_finder=None): + total = sum([v for (k, v) in self.writes.items()]) + range_size = total // num_buckets + key_counts_sorted = sorted(self.writes.items()) output_range_counts = [] def add_boundary(start, end, count): @@ -671,9 +719,9 @@ class TopKeysCounter(object): addresses = shard_finder.get_addresses_for_key(start) else: addresses = None - output_range_counts.append((start, end, count, shard_count, addresses)) + output_range_counts.append((start, end, count, None, shard_count, addresses)) else: - output_range_counts.append((start, end, count, None, None)) + output_range_counts.append((start, end, count, None, None, None)) start_key = None count_this_range = 0 @@ -688,24 +736,31 @@ class TopKeysCounter(object): if count_this_range > 0: add_boundary(start_key, k, count_this_range) + shard_finder.wait_for_shard_addresses(output_range_counts, 0, 5) return output_range_counts - def _get_top_k(self, counts): - count_key_pairs = sorted([(v, k) for (k, v) in counts.items()], reverse=True) - return count_key_pairs[0:self.k] + def get_total_writes(self): + return sum([v for v in self.writes.values()]) - def get_top_k_reads(self): - return self._get_top_k(self.reads) + def get_top_k_writes(self, num, filter_addresses, shard_finder=None): + count_pairs = sorted([(v, k) for (k, v) in self.writes.items()], reverse=True) + if not filter_addresses: + count_pairs = count_pairs[0:num] - def get_top_k_writes(self): - return self._get_top_k(self.writes) + if shard_finder: + results = [] + for (count, key) in count_pairs: + results.append((key, None, count, shard_finder.get_addresses_for_key(key))) - def get_k_read_range_boundaries(self, shard_finder=None): - return self._get_range_boundaries(self.reads, shard_finder) + shard_finder.wait_for_shard_addresses(results, 0, 3) - def get_k_write_range_boundaries(self, shard_finder=None): - return self._get_range_boundaries(self.writes, shard_finder) + if filter_addresses: + 
filter_addresses = set(filter_addresses) + results = [r for r in results if filter_addresses.issubset(set(r[3]))][0:num] + else: + results = [(key, None, count) for (count, key) in count_pairs[0:num]] + return results def connect(cluster_file=None): db = fdb.open(cluster_file=cluster_file) @@ -722,6 +777,8 @@ def main(): help="Include get type. If no filter args are given all will be returned.") parser.add_argument("--filter-get-range", action="store_true", help="Include get_range type. If no filter args are given all will be returned.") + parser.add_argument("--filter-reads", action="store_true", + help="Include get and get_range type. If no filter args are given all will be returned.") parser.add_argument("--filter-commit", action="store_true", help="Include commit type. If no filter args are given all will be returned.") parser.add_argument("--filter-error-get", action="store_true", @@ -737,21 +794,34 @@ end_time_group = parser.add_mutually_exclusive_group() end_time_group.add_argument("--max-timestamp", type=int, help="Don't return events newer than this epoch time") end_time_group.add_argument("-e", "--end-time", type=str, help="Don't return events newer than this parsed time") - parser.add_argument("--top-keys", type=int, help="If specified will output this many top keys for reads or writes", default=0) + parser.add_argument("--num-buckets", type=int, help="The number of buckets to partition the key-space into for operation counts", default=100) + parser.add_argument("--top-requests", type=int, help="If specified will output this many top keys for reads or writes", default=0) + parser.add_argument("--exclude-ports", action="store_true", help="Print addresses without the port number. Only works in versions older than 6.3, and is required in versions older than 6.2.") + parser.add_argument("--single-shard-ranges-only", action="store_true", help="Only print range boundaries that exist in a single shard") + parser.add_argument("-a", "--filter-address", action="append", help="Only print range boundaries that include the given address. 
This option can be used multiple times to include more than one address in the filter, in which case all addresses must match.") + args = parser.parse_args() type_filter = set() if args.filter_get_version: type_filter.add("get_version") - if args.filter_get: type_filter.add("get") - if args.filter_get_range: type_filter.add("get_range") + if args.filter_get or args.filter_reads: type_filter.add("get") + if args.filter_get_range or args.filter_reads: type_filter.add("get_range") if args.filter_commit: type_filter.add("commit") if args.filter_error_get: type_filter.add("error_get") if args.filter_error_get_range: type_filter.add("error_get_range") if args.filter_error_commit: type_filter.add("error_commit") - top_keys = args.top_keys - key_counter = TopKeysCounter(top_keys) if top_keys else None - range_counter = RangeCounter(top_keys) if (has_sortedcontainers() and top_keys) else None - full_output = args.full_output or (top_keys is not None) + + if (not type_filter or "commit" in type_filter): + write_counter = WriteCounter() if args.num_buckets else None + else: + write_counter = None + + if (not type_filter or "get" in type_filter or "get_range" in type_filter): + read_counter = ReadCounter() if (has_sortedcontainers() and args.num_buckets) else None + else: + read_counter = None + + full_output = args.full_output or (args.num_buckets is not None) if args.min_timestamp: min_timestamp = args.min_timestamp @@ -784,48 +854,128 @@ db = connect(cluster_file=args.cluster_file) loader = TransactionInfoLoader(db, full_output=full_output, type_filter=type_filter, min_timestamp=min_timestamp, max_timestamp=max_timestamp) + for info in loader.fetch_transaction_info(): if info.has_types(): - if not key_counter and not range_counter: + if not write_counter and not read_counter: print(info.to_json()) else: - if key_counter: - key_counter.process(info) - if range_counter: - range_counter.process(info) + if write_counter: + write_counter.process(info) + if read_counter: + read_counter.process(info) - if key_counter: - def print_top(top): - for (count, key) in top: - print("%s %d" % (key, count)) - - def print_range_boundaries(range_boundaries): - for (start, end, count, shard_count, addresses) in range_boundaries: - if not shard_count: - print("[%s, %s] %d" % (start, end, count)) + def print_top(top, total, context): + if top: + running_count = 0 + for (idx, (start, end, count, addresses)) in enumerate(top): + running_count += count + if end is not None: + op_str = 'Range %r - %r' % (start, end) else: - addresses_string = "addresses=%s" % ','.join(addresses) if addresses else '' - print("[%s, %s] %d shards=%d %s" % (start, end, count, shard_count, addresses_string)) + op_str = 'Key %r' % start + + print(" %d. %s\n %d sampled %s (%.2f%%, %.2f%% cumulative)" % (idx+1, op_str, count, context, 100*count/total, 100*running_count/total)) + print(" shard addresses: %s\n" % ", ".join(addresses)) + + else: + print(" No %s found" % context) + + def print_range_boundaries(range_boundaries, context): + omit_start = None + for (idx, (start, end, start_count, total_count, shard_count, addresses)) in enumerate(range_boundaries): + omit = args.single_shard_ranges_only and shard_count is not None and shard_count > 1 + if args.filter_address: + if not addresses: + omit = True + else: + for addr in args.filter_address: + if addr not in addresses: + omit = True + break + + if not omit: + if omit_start is not None: + if omit_start == idx-1: + print(" %d. Omitted\n" % (idx)) + else: + print(" %d - %d. 
Omitted\n" % (omit_start+1, idx)) + omit_start = None + + if total_count is None: + count_str = '%d sampled %s' % (start_count, context) + else: + count_str = '%d sampled %s (%d intersecting)' % (start_count, context, total_count) + if not shard_count: + print(" %d. [%s, %s]\n %d sampled %s\n" % (idx+1, start, end, count, context)) + else: + addresses_string = "; addresses=%s" % ', '.join(addresses) if addresses else '' + print(" %d. [%s, %s]\n %s spanning %d shard(s)%s\n" % (idx+1, start, end, count_str, shard_count, addresses_string)) + elif omit_start is None: + omit_start = idx + + if omit_start is not None: + if omit_start == len(range_boundaries)-1: + print(" %d. Omitted\n" % len(range_boundaries)) + else: + print(" %d - %d. Omitted\n" % (omit_start+1, len(range_boundaries))) + + shard_finder = ShardFinder(db, args.exclude_ports) + + print("NOTE: shard locations are current and may not reflect where an operation was performed in the past\n") + + if write_counter: + if args.top_requests: + top_writes = write_counter.get_top_k_writes(args.top_requests, args.filter_address, shard_finder=shard_finder) + + range_boundaries = write_counter.get_range_boundaries(args.num_buckets, shard_finder=shard_finder) + num_writes = write_counter.get_total_writes() + + if args.top_requests or range_boundaries: + print("WRITES") + print("------\n") + print("Processed %d total writes\n" % num_writes) + + if args.top_requests: + suffix = "" + if args.filter_address: + suffix = " (%s)" % ", ".join(args.filter_address) + print("Top %d writes%s:\n" % (args.top_requests, suffix)) + + print_top(top_writes, write_counter.get_total_writes(), "writes") + print("") - shard_finder = ShardFinder(db) - top_reads = key_counter.get_top_k_reads() - if top_reads: - print("Top %d reads:" % min(top_keys, len(top_reads))) - print_top(top_reads) - print("Approx equal sized gets range boundaries:") - print_range_boundaries(key_counter.get_k_read_range_boundaries(shard_finder=shard_finder)) - top_writes = key_counter.get_top_k_writes() - if top_writes: - print("Top %d writes:" % min(top_keys, len(top_writes))) - print_top(top_writes) - print("Approx equal sized commits range boundaries:") - print_range_boundaries(key_counter.get_k_write_range_boundaries(shard_finder=shard_finder)) - if range_counter: - range_boundaries = range_counter.get_range_boundaries(shard_finder=shard_finder) if range_boundaries: - print("Approx equal sized get_ranges boundaries:") - print_range_boundaries(range_boundaries) + print("Key-space boundaries with approximately equal mutation counts:\n") + print_range_boundaries(range_boundaries, "writes") + if args.top_requests or range_boundaries: + print("") + + if read_counter: + if args.top_requests: + top_reads = read_counter.get_top_k_reads(args.top_requests, args.filter_address, shard_finder=shard_finder) + + range_boundaries = read_counter.get_range_boundaries(args.num_buckets, shard_finder=shard_finder) + num_reads = read_counter.get_total_reads() + + if args.top_requests or range_boundaries: + print("READS") + print("-----\n") + print("Processed %d total reads\n" % num_reads) + + if args.top_requests: + suffix = "" + if args.filter_address: + suffix = " (%s)" % ", ".join(args.filter_address) + print("Top %d reads%s:\n" % (args.top_requests, suffix)) + + print_top(top_reads, num_reads, "reads") + print("") + + if range_boundaries: + print("Key-space boundaries with approximately equal read counts:\n") + print_range_boundaries(range_boundaries, "reads") if __name__ == "__main__": main() diff --git 
a/design/special-key-space.md b/design/special-key-space.md index c54cfb065f..d9fb328c65 100644 --- a/design/special-key-space.md +++ b/design/special-key-space.md @@ -88,11 +88,19 @@ We introduce this `module` concept after a [discussion](https://forums.foundatio - `\xff\xff/transaction/read_conflict_range/, \xff\xff/transaction/read_conflict_range0` : read conflict ranges of the transaction - `\xff\xff/transaction/write_conflict_range/, \xff\xff/transaction/write_conflict_range0` : write conflict ranges of the transaction - METRICS: `\xff\xff/metrics/, \xff\xff/metrics0`, all metrics like data-distribution metrics or health metrics are planned to be put here. All need to call the rpc, so timeout errors may happen. Right now we have: - `\xff\xff/metrics/data_distribution_stats/, \xff\xff/metrics/data_distribution_stats0` : stats info about data-distribution - WORKERINTERFACE : `\xff\xff/worker_interfaces/, \xff\xff/worker_interfaces0`, which is compatible with previous implementation, thus should not be used to add new functions. In addition, all singleKeyRanges are formatted as modules and cannot be used again. In particular, you should call `get` not `getRange` on these keys. Below are existing ones: - STATUSJSON : `\xff\xff/status/json` - CONNECTIONSTRING : `\xff\xff/connection_string` - CLUSTERFILEPATH : `\xff\xff/cluster_file_path` diff --git a/fdbbackup/backup.actor.cpp b/fdbbackup/backup.actor.cpp index 3b5518730d..128470674d 100644 --- a/fdbbackup/backup.actor.cpp +++ b/fdbbackup/backup.actor.cpp @@ -3465,16 +3465,6 @@ int main(int argc, char* argv[]) { std::set_new_handler( &platform::outOfMemory ); setMemoryQuota( memLimit ); - int total = 0; - for(auto i = Error::errorCounts().begin(); i != Error::errorCounts().end(); ++i) - total += i->second; - if (total) - printf("%d errors:\n", total); - for(auto i = Error::errorCounts().begin(); i != Error::errorCounts().end(); ++i) - if (i->second > 0) - printf(" %d: %d %s\n", i->second, i->first, Error::fromCode(i->first).what()); - - Reference<ClusterConnectionFile> ccf; Database db; Reference<ClusterConnectionFile> sourceCcf; diff --git a/fdbcli/fdbcli.actor.cpp b/fdbcli/fdbcli.actor.cpp index 1af54f9d48..37ee20ec8c 100644 --- a/fdbcli/fdbcli.actor.cpp +++ b/fdbcli/fdbcli.actor.cpp @@ -2523,7 +2523,11 @@ void throttleGenerator(const char* text, const char *line, std::vector<std::string>& lc) { const char* opts[] = { "auto", nullptr }; arrayGenerator(text, line, opts, lc); } diff --git a/fdbclient/CommitTransaction.h b/fdbclient/CommitTransaction.h index 9670a2a6ac..f1c78806ef 100644 --- a/fdbclient/CommitTransaction.h +++ b/fdbclient/CommitTransaction.h @@ -132,6 +132,13 @@ struct MutationRef { }; }; +template<> +struct Traceable<MutationRef> : std::true_type { + static std::string toString(MutationRef const& value) { + return value.toString(); + } +}; + static inline std::string getTypeString(MutationRef::Type type) { return type < MutationRef::MAX_ATOMIC_OP ? 
typeString[(int)type] : "Unset"; } @@ -206,7 +213,4 @@ struct CommitTransactionRef { } }; -bool debugMutation( const char* context, Version version, MutationRef const& m ); -bool debugKeyRange( const char* context, Version version, KeyRangeRef const& keyRange ); - #endif diff --git a/fdbclient/FDBTypes.h b/fdbclient/FDBTypes.h index c6aff2a804..d8cd7ab55c 100644 --- a/fdbclient/FDBTypes.h +++ b/fdbclient/FDBTypes.h @@ -108,6 +108,13 @@ struct struct_like_traits : std::true_type { } }; +template<> +struct Traceable : std::true_type { + static std::string toString(const Tag& value) { + return value.toString(); + } +}; + static const Tag invalidTag {tagLocalitySpecial, 0}; static const Tag txsTag {tagLocalitySpecial, 1}; static const Tag cacheTag {tagLocalitySpecial, 2}; @@ -222,11 +229,25 @@ std::string describe( std::vector const& items, int max_items = -1 ) { return describeList(items, max_items); } +template +struct Traceable> : std::true_type { + static std::string toString(const std::vector& value) { + return describe(value); + } +}; + template std::string describe( std::set const& items, int max_items = -1 ) { return describeList(items, max_items); } +template +struct Traceable> : std::true_type { + static std::string toString(const std::set& value) { + return describe(value); + } +}; + std::string printable( const StringRef& val ); std::string printable( const std::string& val ); std::string printable( const KeyRangeRef& range ); diff --git a/fdbclient/SpecialKeySpace.actor.cpp b/fdbclient/SpecialKeySpace.actor.cpp index b23b5b8d87..6950f8ca74 100644 --- a/fdbclient/SpecialKeySpace.actor.cpp +++ b/fdbclient/SpecialKeySpace.actor.cpp @@ -316,7 +316,12 @@ Future> ConflictingKeysImpl::getRange(ReadYourWritesT return result; } +<<<<<<< HEAD ACTOR Future> ddStatsGetRangeActor(ReadYourWritesTransaction* ryw, KeyRangeRef kr) { +======= +ACTOR Future> ddStatsGetRangeActor(Reference ryw, + KeyRangeRef kr) { +>>>>>>> master try { auto keys = kr.removePrefix(ddStatsRange.begin); Standalone> resultWithoutPrefix = @@ -341,7 +346,12 @@ ACTOR Future> ddStatsGetRangeActor(ReadYourWritesTran DDStatsRangeImpl::DDStatsRangeImpl(KeyRangeRef kr) : SpecialKeyRangeBaseImpl(kr) {} +<<<<<<< HEAD Future> DDStatsRangeImpl::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const { +======= +Future> DDStatsRangeImpl::getRange(Reference ryw, + KeyRangeRef kr) const { +>>>>>>> master return ddStatsGetRangeActor(ryw, kr); } diff --git a/fdbclient/SpecialKeySpace.actor.h b/fdbclient/SpecialKeySpace.actor.h index a33ff666a4..2c8242935f 100644 --- a/fdbclient/SpecialKeySpace.actor.h +++ b/fdbclient/SpecialKeySpace.actor.h @@ -154,5 +154,12 @@ public: Future> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override; }; +class DDStatsRangeImpl : public SpecialKeyRangeBaseImpl { +public: + explicit DDStatsRangeImpl(KeyRangeRef kr); + Future> getRange(Reference ryw, + KeyRangeRef kr) const override; +}; + #include "flow/unactorcompiler.h" #endif diff --git a/fdbclient/SystemData.cpp b/fdbclient/SystemData.cpp index e72b99dce7..1dc20884a5 100644 --- a/fdbclient/SystemData.cpp +++ b/fdbclient/SystemData.cpp @@ -47,7 +47,11 @@ const KeyRef keyServersKey( const KeyRef& k, Arena& arena ) { } const Value keyServersValue( Standalone result, const std::vector& src, const std::vector& dest ) { if(!CLIENT_KNOBS->TAG_ENCODE_KEY_SERVERS) { +<<<<<<< HEAD BinaryWriter wr(IncludeVersion(ProtocolVersion::withKeyServerValue())); wr << src << dest; +======= + BinaryWriter wr(IncludeVersion()); wr << src << dest; 
+>>>>>>> master return wr.toValue(); } diff --git a/fdbrpc/fdbrpc.h b/fdbrpc/fdbrpc.h index 655bf25a14..4989bf28cc 100644 --- a/fdbrpc/fdbrpc.h +++ b/fdbrpc/fdbrpc.h @@ -246,20 +246,13 @@ public: // stream.send( request ) // Unreliable at most once delivery: Delivers request unless there is a connection failure (zero or one times) - void send(const T& value) const { + template + void send(U && value) const { if (queue->isRemoteEndpoint()) { - FlowTransport::transport().sendUnreliable(SerializeSource(value), getEndpoint(), true); + FlowTransport::transport().sendUnreliable(SerializeSource(std::forward(value)), getEndpoint(), true); } else - queue->send(value); - } - - void send(T&& value) const { - if (queue->isRemoteEndpoint()) { - FlowTransport::transport().sendUnreliable(SerializeSource(std::move(value)), getEndpoint(), true); - } - else - queue->send(std::move(value)); + queue->send(std::forward(value)); } /*void sendError(const Error& error) const { diff --git a/fdbrpc/sim2.actor.cpp b/fdbrpc/sim2.actor.cpp index e920d9b185..6114956dc9 100644 --- a/fdbrpc/sim2.actor.cpp +++ b/fdbrpc/sim2.actor.cpp @@ -1650,15 +1650,8 @@ public: this->currentProcess = t.machine; try { - //auto before = getCPUTicks(); t.action.send(Void()); ASSERT( this->currentProcess == t.machine ); - /*auto elapsed = getCPUTicks() - before; - currentProcess->cpuTicks += elapsed; - if (deterministicRandom()->random01() < 0.01){ - TraceEvent("TaskDuration").detail("CpuTicks", currentProcess->cpuTicks); - currentProcess->cpuTicks = 0; - }*/ } catch (Error& e) { TraceEvent(SevError, "UnhandledSimulationEventError").error(e, true); killProcess(t.machine, KillInstantly); diff --git a/fdbrpc/simulator.h b/fdbrpc/simulator.h index d81e5763a1..6f8164a0f4 100644 --- a/fdbrpc/simulator.h +++ b/fdbrpc/simulator.h @@ -58,7 +58,6 @@ public: bool failed; bool excluded; bool cleared; - int64_t cpuTicks; bool rebooting; std::vector globals; @@ -68,12 +67,11 @@ public: double fault_injection_p1, fault_injection_p2; ProcessInfo(const char* name, LocalityData locality, ProcessClass startingClass, NetworkAddressList addresses, - INetworkConnections *net, const char* dataFolder, const char* coordinationFolder ) - : name(name), locality(locality), startingClass(startingClass), - addresses(addresses), address(addresses.address), dataFolder(dataFolder), - network(net), coordinationFolder(coordinationFolder), failed(false), excluded(false), cpuTicks(0), - rebooting(false), fault_injection_p1(0), fault_injection_p2(0), - fault_injection_r(0), machine(0), cleared(false) {} + INetworkConnections* net, const char* dataFolder, const char* coordinationFolder) + : name(name), locality(locality), startingClass(startingClass), addresses(addresses), + address(addresses.address), dataFolder(dataFolder), network(net), coordinationFolder(coordinationFolder), + failed(false), excluded(false), rebooting(false), fault_injection_p1(0), fault_injection_p2(0), + fault_injection_r(0), machine(0), cleared(false) {} Future onShutdown() { return shutdownSignal.getFuture(); } diff --git a/fdbserver/BackupWorker.actor.cpp b/fdbserver/BackupWorker.actor.cpp index d9c341e629..b07beb4cfe 100644 --- a/fdbserver/BackupWorker.actor.cpp +++ b/fdbserver/BackupWorker.actor.cpp @@ -247,7 +247,10 @@ struct BackupData { specialCounter(cc, "MinKnownCommittedVersion", [this]() { return this->minKnownCommittedVersion; }); specialCounter(cc, "MsgQ", [this]() { return this->messages.size(); }); specialCounter(cc, "BufferedBytes", [this]() { return this->lock->activePermits(); }); 
+<<<<<<< HEAD specialCounter(cc, "AvailableBytes", [this]() { return this->lock->available(); }); +======= +>>>>>>> master logger = traceCounters("BackupWorkerMetrics", myId, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "BackupWorkerMetrics"); } @@ -734,13 +737,11 @@ ACTOR Future saveMutationsToFile(BackupData* self, Version popVersion, int MutationRef m; if (!message.isBackupMessage(&m)) continue; - if (debugMutation("addMutation", message.version.version, m)) { - TraceEvent("BackupWorkerDebug", self->myId) + DEBUG_MUTATION("addMutation", message.version.version, m) .detail("Version", message.version.toString()) - .detail("Mutation", m.toString()) + .detail("Mutation", m) .detail("KCV", self->minKnownCommittedVersion) .detail("SavedVersion", self->savedVersion); - } std::vector> adds; if (m.type != MutationRef::Type::ClearRange) { @@ -847,7 +848,11 @@ ACTOR Future uploadData(BackupData* self) { } // If transition into NOOP mode, should clear messages +<<<<<<< HEAD if (!self->pulling && self->backupEpoch == self->recruitedEpoch) { +======= + if (!self->pulling) { +>>>>>>> master self->eraseMessages(self->messages.size()); } diff --git a/fdbserver/CMakeLists.txt b/fdbserver/CMakeLists.txt index f812a97861..52a08a6ef4 100644 --- a/fdbserver/CMakeLists.txt +++ b/fdbserver/CMakeLists.txt @@ -46,8 +46,10 @@ set(FDBSERVER_SRCS MasterInterface.h MasterProxyServer.actor.cpp masterserver.actor.cpp - MoveKeys.actor.cpp + MutationTracking.h + MutationTracking.cpp MoveKeys.actor.h + MoveKeys.actor.cpp networktest.actor.cpp NetworkTest.h OldTLogServer_4_6.actor.cpp diff --git a/fdbserver/LogSystem.h b/fdbserver/LogSystem.h index b6ed3c0493..f2ee81bc5f 100644 --- a/fdbserver/LogSystem.h +++ b/fdbserver/LogSystem.h @@ -27,6 +27,7 @@ #include "fdbserver/TLogInterface.h" #include "fdbserver/WorkerInterface.actor.h" #include "fdbclient/DatabaseConfiguration.h" +#include "fdbserver/MutationTracking.h" #include "flow/IndexedSet.h" #include "fdbrpc/ReplicationPolicy.h" #include "fdbrpc/Locality.h" @@ -877,16 +878,27 @@ struct LogPushData : NonCopyable { msg_locations.clear(); logSystem->getPushLocations(prev_tags, msg_locations, allLocations); + BinaryWriter bw(AssumeVersion(currentProtocolVersion)); uint32_t subseq = this->subsequence++; + bool first = true; + int firstOffset=-1, firstLength=-1; for(int loc : msg_locations) { - // FIXME: memcpy after the first time - BinaryWriter& wr = messagesWriter[loc]; - int offset = wr.getLength(); - wr << uint32_t(0) << subseq << uint16_t(prev_tags.size()); - for(auto& tag : prev_tags) - wr << tag; - wr << item; - *(uint32_t*)((uint8_t*)wr.getData() + offset) = wr.getLength() - offset - sizeof(uint32_t); + if (first) { + BinaryWriter& wr = messagesWriter[loc]; + firstOffset = wr.getLength(); + wr << uint32_t(0) << subseq << uint16_t(prev_tags.size()); + for(auto& tag : prev_tags) + wr << tag; + wr << item; + firstLength = wr.getLength() - firstOffset; + *(uint32_t*)((uint8_t*)wr.getData() + firstOffset) = firstLength - sizeof(uint32_t); + DEBUG_TAGS_AND_MESSAGE("ProxyPushLocations", invalidVersion, StringRef(((uint8_t*)wr.getData() + firstOffset), firstLength)).detail("PushLocations", msg_locations); + first = false; + } else { + BinaryWriter& wr = messagesWriter[loc]; + BinaryWriter& from = messagesWriter[msg_locations[0]]; + wr.serializeBytes( (uint8_t*)from.getData() + firstOffset, firstLength ); + } } next_message_tags.clear(); } diff --git a/fdbserver/LogSystemPeekCursor.actor.cpp b/fdbserver/LogSystemPeekCursor.actor.cpp index 51880f5064..6f2d15cedd 100644 --- 
a/fdbserver/LogSystemPeekCursor.actor.cpp +++ b/fdbserver/LogSystemPeekCursor.actor.cpp @@ -21,6 +21,7 @@ #include "fdbserver/LogSystem.h" #include "fdbrpc/FailureMonitor.h" #include "fdbserver/Knobs.h" +#include "fdbserver/MutationTracking.h" #include "fdbrpc/ReplicationUtils.h" #include "flow/actorcompiler.h" // has to be last include @@ -90,6 +91,7 @@ void ILogSystem::ServerPeekCursor::nextMessage() { } messageAndTags.loadFromArena(&rd, &messageVersion.sub); + DEBUG_TAGS_AND_MESSAGE("ServerPeekCursor", messageVersion.version, messageAndTags.getRawMessage()).detail("CursorID", this->randomID); // Rewind and consume the header so that reader() starts from the message. rd.rewind(); rd.readBytes(messageAndTags.getHeaderSize()); diff --git a/fdbserver/MasterProxyServer.actor.cpp b/fdbserver/MasterProxyServer.actor.cpp index 7d50d077e6..f29428d598 100644 --- a/fdbserver/MasterProxyServer.actor.cpp +++ b/fdbserver/MasterProxyServer.actor.cpp @@ -38,6 +38,7 @@ #include "fdbserver/LogSystem.h" #include "fdbserver/LogSystemDiskQueueAdapter.h" #include "fdbserver/MasterInterface.h" +#include "fdbserver/MutationTracking.h" #include "fdbserver/RecoveryState.h" #include "fdbserver/ServerDBInfo.h" #include "fdbserver/WaitFailure.h" @@ -759,7 +760,7 @@ ACTOR Future addBackupMutations(ProxyCommitData* self, std::mapaddTags(tags); toCommit->addTypedMessage(backupMutation); -// if (debugMutation("BackupProxyCommit", commitVersion, backupMutation)) { +// if (DEBUG_MUTATION("BackupProxyCommit", commitVersion, backupMutation)) { // TraceEvent("BackupProxyCommitTo", self->dbgid).detail("To", describe(tags)).detail("BackupMutation", backupMutation.toString()) // .detail("BackupMutationSize", val.size()).detail("Version", commitVersion).detail("DestPath", logRangeMutation.first) // .detail("PartIndex", part).detail("PartIndexEndian", bigEndian32(part)).detail("PartData", backupMutation.param1); @@ -1079,8 +1080,7 @@ ACTOR Future commitBatch( self->singleKeyMutationEvent->log(); } - if (debugMutation("ProxyCommit", commitVersion, m)) - TraceEvent("ProxyCommitTo", self->dbgid).detail("To", describe(tags)).detail("Mutation", m.toString()).detail("Version", commitVersion); + DEBUG_MUTATION("ProxyCommit", commitVersion, m).detail("Dbgid", self->dbgid).detail("To", tags).detail("Mutation", m); toCommit.addTags(tags); if(self->cacheInfo[m.param1]) { @@ -1095,8 +1095,7 @@ ACTOR Future commitBatch( ++firstRange; if (firstRange == ranges.end()) { // Fast path - if (debugMutation("ProxyCommit", commitVersion, m)) - TraceEvent("ProxyCommitTo", self->dbgid).detail("To", describe(ranges.begin().value().tags)).detail("Mutation", m.toString()).detail("Version", commitVersion); + DEBUG_MUTATION("ProxyCommit", commitVersion, m).detail("Dbgid", self->dbgid).detail("To", ranges.begin().value().tags).detail("Mutation", m); ranges.begin().value().populateTags(); toCommit.addTags(ranges.begin().value().tags); @@ -1108,8 +1107,7 @@ ACTOR Future commitBatch( r.value().populateTags(); allSources.insert(r.value().tags.begin(), r.value().tags.end()); } - if (debugMutation("ProxyCommit", commitVersion, m)) - TraceEvent("ProxyCommitTo", self->dbgid).detail("To", describe(allSources)).detail("Mutation", m.toString()).detail("Version", commitVersion); + DEBUG_MUTATION("ProxyCommit", commitVersion, m).detail("Dbgid", self->dbgid).detail("To", allSources).detail("Mutation", m); toCommit.addTags(allSources); } diff --git a/fdbserver/MutationTracking.cpp b/fdbserver/MutationTracking.cpp new file mode 100644 index 0000000000..ddd17437a3 --- 
/dev/null +++ b/fdbserver/MutationTracking.cpp @@ -0,0 +1,101 @@ +/* + * MutationTracking.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2020 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include "fdbserver/MutationTracking.h" +#include "fdbserver/LogProtocolMessage.h" + +#if defined(FDB_CLEAN_BUILD) && MUTATION_TRACKING_ENABLED +#error "You cannot use mutation tracking in a clean/release build." +#endif + +// Track up to 2 keys in simulation via enabling MUTATION_TRACKING_ENABLED and setting the keys here. +StringRef debugKey = LiteralStringRef( "" ); +StringRef debugKey2 = LiteralStringRef( "\xff\xff\xff\xff" ); + +TraceEvent debugMutationEnabled( const char* context, Version version, MutationRef const& mutation ) { + if ((mutation.type == mutation.ClearRange || mutation.type == mutation.DebugKeyRange) && + ((mutation.param1<=debugKey && mutation.param2>debugKey) || (mutation.param1<=debugKey2 && mutation.param2>debugKey2))) { + return std::move(TraceEvent("MutationTracking").detail("At", context).detail("Version", version).detail("MutationType", typeString[mutation.type]).detail("KeyBegin", mutation.param1).detail("KeyEnd", mutation.param2)); + } else if (mutation.param1 == debugKey || mutation.param1 == debugKey2) { + return std::move(TraceEvent("MutationTracking").detail("At", context).detail("Version", version).detail("MutationType", typeString[mutation.type]).detail("Key", mutation.param1).detail("Value", mutation.param2)); + } else { + return std::move(TraceEvent()); + } +} + +TraceEvent debugKeyRangeEnabled( const char* context, Version version, KeyRangeRef const& keys ) { + if (keys.contains(debugKey) || keys.contains(debugKey2)) { + return std::move(debugMutation(context, version, MutationRef(MutationRef::DebugKeyRange, keys.begin, keys.end) )); + } else { + return std::move(TraceEvent()); + } +} + +TraceEvent debugTagsAndMessageEnabled( const char* context, Version version, StringRef commitBlob ) { + BinaryReader rdr(commitBlob, AssumeVersion(currentProtocolVersion)); + while (!rdr.empty()) { + if (*(int32_t*)rdr.peekBytes(4) == VERSION_HEADER) { + int32_t dummy; + rdr >> dummy >> version; + continue; + } + TagsAndMessage msg; + msg.loadFromArena(&rdr, nullptr); + bool logAdapterMessage = std::any_of( + msg.tags.begin(), msg.tags.end(), [](const Tag& t) { return t == txsTag || t.locality == tagLocalityTxs; }); + StringRef mutationData = msg.getMessageWithoutTags(); + uint8_t mutationType = *mutationData.begin(); + if (logAdapterMessage) { + // Skip the message, as there will always be an identical non-logAdapterMessage mutation + // that we can match against in the same commit. 
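+ // (Clarifying note: the txsTag / tagLocalityTxs tags mark the log-adapter copy of a
+ // mutation; reporting only the storage-tagged copy keeps MutationTracking to one
+ // event per mutation per commit.)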
+ } else if (LogProtocolMessage::startsLogProtocolMessage(mutationType)) { + BinaryReader br(mutationData, AssumeVersion(rdr.protocolVersion())); + LogProtocolMessage lpm; + br >> lpm; + rdr.setProtocolVersion(br.protocolVersion()); + } else { + MutationRef m; + BinaryReader br(mutationData, AssumeVersion(rdr.protocolVersion())); + br >> m; + TraceEvent&& event = debugMutation(context, version, m); + if (event.isEnabled()) { + return std::move(event.detail("MessageTags", msg.tags)); + } + } + } + return std::move(TraceEvent()); +} + +#if MUTATION_TRACKING_ENABLED +TraceEvent debugMutation( const char* context, Version version, MutationRef const& mutation ) { + return debugMutationEnabled( context, version, mutation ); +} +TraceEvent debugKeyRange( const char* context, Version version, KeyRangeRef const& keys ) { + return debugKeyRangeEnabled( context, version, keys ); +} +TraceEvent debugTagsAndMessage( const char* context, Version version, StringRef commitBlob ) { + return debugTagsAndMessageEnabled( context, version, commitBlob ); +} +#else +TraceEvent debugMutation( const char* context, Version version, MutationRef const& mutation ) { return std::move(TraceEvent()); } +TraceEvent debugKeyRange( const char* context, Version version, KeyRangeRef const& keys ) { return std::move(TraceEvent()); } +TraceEvent debugTagsAndMessage( const char* context, Version version, StringRef commitBlob ) { return std::move(TraceEvent()); } +#endif diff --git a/fdbserver/MutationTracking.h b/fdbserver/MutationTracking.h new file mode 100644 index 0000000000..978ddca6a3 --- /dev/null +++ b/fdbserver/MutationTracking.h @@ -0,0 +1,49 @@ +/* + * MutationTracking.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2020 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _FDBSERVER_MUTATIONTRACKING_H_ +#define _FDBSERVER_MUTATIONTRACKING_H_ +#pragma once + +#include "fdbclient/FDBTypes.h" +#include "fdbclient/CommitTransaction.h" + +#define MUTATION_TRACKING_ENABLED 0 +// The keys to track are defined in the .cpp file to limit recompilation. + + +#define DEBUG_MUTATION(context, version, mutation) MUTATION_TRACKING_ENABLED && debugMutation(context, version, mutation) +TraceEvent debugMutation( const char* context, Version version, MutationRef const& mutation ); + +// debugKeyRange and debugTagsAndMessage only log the *first* occurrence of a key in their range/commit. +// TODO: Create a TraceEventGroup that forwards all calls to each element of a vector, +// to allow "multiple" TraceEvents to be returned. 
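// Illustrative usage (a sketch, not part of this header): since `.` binds tighter
// than `&&`, a call like
//   DEBUG_MUTATION("SomeContext", version, mutation).detail("Extra", 1);
// parses as MUTATION_TRACKING_ENABLED && (debugMutation(...).detail("Extra", 1)),
// so when MUTATION_TRACKING_ENABLED is 0 the right-hand side is never evaluated and
// no TraceEvent is constructed. "SomeContext" and "Extra" are hypothetical names.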
+ +#define DEBUG_KEY_RANGE(context, version, keys) MUTATION_TRACKING_ENABLED && debugKeyRange(context, version, keys) +TraceEvent debugKeyRange( const char* context, Version version, KeyRangeRef const& keys ); + +#define DEBUG_TAGS_AND_MESSAGE(context, version, commitBlob) MUTATION_TRACKING_ENABLED && debugTagsAndMessage(context, version, commitBlob) +TraceEvent debugTagsAndMessage( const char* context, Version version, StringRef commitBlob ); + + +// TODO: Version Tracking. If the bug is in handling a version rather than a key, then it'd be good to be able to log each time +// that version is handled within simulation. A similar set of functions should be implemented. + +#endif diff --git a/fdbserver/RestoreApplier.actor.h b/fdbserver/RestoreApplier.actor.h index 06cff36b75..7aaf44263a 100644 --- a/fdbserver/RestoreApplier.actor.h +++ b/fdbserver/RestoreApplier.actor.h @@ -36,6 +36,7 @@ #include "fdbrpc/Locality.h" #include "fdbserver/CoordinationInterface.h" #include "fdbclient/RestoreWorkerInterface.actor.h" +#include "fdbserver/MutationTracking.h" #include "fdbserver/RestoreUtil.h" #include "fdbserver/RestoreRoleCommon.actor.h" @@ -60,19 +61,17 @@ struct StagingKey { // Assume: SetVersionstampedKey and SetVersionstampedValue have been converted to set void add(const MutationRef& m, LogMessageVersion newVersion) { ASSERT(m.type != MutationRef::SetVersionstampedKey && m.type != MutationRef::SetVersionstampedValue); - if (debugMutation("StagingKeyAdd", newVersion.version, m)) { - TraceEvent("StagingKeyAdd") - .detail("Version", version.toString()) - .detail("NewVersion", newVersion.toString()) - .detail("Mutation", m.toString()); - } + DEBUG_MUTATION("StagingKeyAdd", newVersion.version, m) + .detail("Version", version.toString()) + .detail("NewVersion", newVersion.toString()) + .detail("Mutation", m); if (version == newVersion) { // This could happen because the same mutation can be present in // overlapping mutation logs, because new TLogs can copy mutations // from old generation TLogs (or backup worker is recruited without // knowing previously saved progress). 
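// (Clarifying note: the ASSERT below verifies the duplicate is identical in type,
// key, and value, so silently dropping the repeated mutation is safe.)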
ASSERT(type == m.type && key == m.param1 && val == m.param2); - TraceEvent("SameVersion").detail("Version", version.toString()).detail("Mutation", m.toString()); + TraceEvent("SameVersion").detail("Version", version.toString()).detail("Mutation", m); return; } @@ -84,15 +83,13 @@ struct StagingKey { ASSERT(m.param1 == m.param2); } if (version < newVersion) { - if (debugMutation("StagingKeyAdd", newVersion.version, m)) { - TraceEvent("StagingKeyAdd") + DEBUG_MUTATION("StagingKeyAdd", newVersion.version, m) .detail("Version", version.toString()) .detail("NewVersion", newVersion.toString()) .detail("MType", getTypeString(type)) .detail("Key", key) .detail("Val", val) .detail("NewMutation", m.toString()); - } key = m.param1; val = m.param2; type = (MutationRef::Type)m.type; @@ -108,8 +105,8 @@ struct StagingKey { TraceEvent("SameVersion") .detail("Version", version.toString()) .detail("NewVersion", newVersion.toString()) - .detail("OldMutation", it->second.toString()) - .detail("NewMutation", m.toString()); + .detail("OldMutation", it->second) + .detail("NewMutation", m); ASSERT(it->second.type == m.type && it->second.param1 == m.param1 && it->second.param2 == m.param2); } } diff --git a/fdbserver/RestoreLoader.actor.cpp b/fdbserver/RestoreLoader.actor.cpp index c919e77778..ccc1dfa9f3 100644 --- a/fdbserver/RestoreLoader.actor.cpp +++ b/fdbserver/RestoreLoader.actor.cpp @@ -26,6 +26,7 @@ #include "fdbclient/BackupAgent.actor.h" #include "fdbserver/RestoreLoader.actor.h" #include "fdbserver/RestoreRoleCommon.actor.h" +#include "fdbserver/MutationTracking.h" #include "flow/actorcompiler.h" // This must be the last #include. @@ -508,23 +509,23 @@ ACTOR Future sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat nodeIDs.contents()); ASSERT(mvector.size() == nodeIDs.size()); - if (debugMutation("RestoreLoader", commitVersion.version, kvm)) { - TraceEvent e("DebugSplit"); - int i = 0; - for (auto& [key, uid] : *pRangeToApplier) { - e.detail(format("Range%d", i).c_str(), printable(key)) - .detail(format("UID%d", i).c_str(), uid.toString()); - i++; + if (MUTATION_TRACKING_ENABLED) { + TraceEvent&& e = debugMutation("RestoreLoaderDebugSplit", commitVersion.version, kvm); + if (e.isEnabled()) { + int i = 0; + for (auto& [key, uid] : *pRangeToApplier) { + e.detail(format("Range%d", i).c_str(), printable(key)) + .detail(format("UID%d", i).c_str(), uid.toString()); + i++; + } } } for (splitMutationIndex = 0; splitMutationIndex < mvector.size(); splitMutationIndex++) { MutationRef mutation = mvector[splitMutationIndex]; UID applierID = nodeIDs[splitMutationIndex]; - if (debugMutation("RestoreLoader", commitVersion.version, mutation)) { - TraceEvent("SplittedMutation") - .detail("Version", commitVersion.toString()) - .detail("Mutation", mutation.toString()); - } + DEBUG_MUTATION("RestoreLoaderSplittedMutation", commitVersion.version, mutation) + .detail("Version", commitVersion.toString()) + .detail("Mutation", mutation); // CAREFUL: The splitted mutations' lifetime is shorter than the for-loop // Must use deep copy for splitted mutations applierVersionedMutationsBuffer[applierID].push_back_deep( @@ -540,12 +541,10 @@ ACTOR Future sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat UID applierID = itlow->second; kvCount++; - if (debugMutation("RestoreLoader", commitVersion.version, kvm)) { - TraceEvent("SendMutation") - .detail("Applier", applierID) - .detail("Version", commitVersion.toString()) - .detail("Mutation", kvm.toString()); - } + DEBUG_MUTATION("RestoreLoaderSendMutation", 
commitVersion.version, kvm) + .detail("Applier", applierID) + .detail("Version", commitVersion.toString()) + .detail("Mutation", kvm); // kvm data is saved in pkvOps in batchData, so shallow copy is ok here. applierVersionedMutationsBuffer[applierID].push_back(applierVersionedMutationsBuffer[applierID].arena(), VersionedMutation(kvm, commitVersion)); @@ -1057,4 +1056,4 @@ TEST_CASE("/FastRestore/RestoreLoader/splitMutation") { } return Void(); -} \ No newline at end of file +} diff --git a/fdbserver/RestoreUtil.actor.cpp b/fdbserver/RestoreUtil.actor.cpp index 7965ab60e4..7451f16570 100644 --- a/fdbserver/RestoreUtil.actor.cpp +++ b/fdbserver/RestoreUtil.actor.cpp @@ -76,4 +76,4 @@ bool isRangeMutation(MutationRef m) { ASSERT(m.type == MutationRef::Type::SetValue || isAtomicOp((MutationRef::Type)m.type)); return false; } -} \ No newline at end of file +} diff --git a/fdbserver/StorageCache.actor.cpp b/fdbserver/StorageCache.actor.cpp index 2ded825e95..6a77858e14 100644 --- a/fdbserver/StorageCache.actor.cpp +++ b/fdbserver/StorageCache.actor.cpp @@ -26,6 +26,7 @@ #include "fdbclient/Atomic.h" #include "fdbclient/Notified.h" #include "fdbserver/LogSystem.h" +#include "fdbserver/MutationTracking.h" #include "fdbserver/WaitFailure.h" #include "fdbserver/WorkerInterface.actor.h" #include "flow/actorcompiler.h" // This must be the last #include. @@ -267,8 +268,8 @@ ACTOR Future getValueQ( StorageCacheData* data, GetValueRequest req ) { path = 1; } - //debugMutation("CacheGetValue", version, MutationRef(MutationRef::DebugKey, req.key, v.present()?v.get():LiteralStringRef(""))); - //debugMutation("CacheGetPath", version, MutationRef(MutationRef::DebugKey, req.key, path==0?LiteralStringRef("0"):path==1?LiteralStringRef("1"):LiteralStringRef("2"))); + //DEBUG_MUTATION("CacheGetValue", version, MutationRef(MutationRef::DebugKey, req.key, v.present()?v.get():LiteralStringRef(""))); + //DEBUG_MUTATION("CacheGetPath", version, MutationRef(MutationRef::DebugKey, req.key, path==0?LiteralStringRef("0"):path==1?LiteralStringRef("1"):LiteralStringRef("2"))); if (v.present()) { ++data->counters.rowsQueried; @@ -710,22 +711,10 @@ void StorageCacheData::addMutation(KeyRangeRef const& cachedKeyRange, Version ve return; } expanded = addMutationToMutationLog(mLog, expanded); - if (debugMutation("expandedMutation", version, expanded)) { - const char* type = - mutation.type == MutationRef::SetValue ? "SetValue" : - mutation.type == MutationRef::ClearRange ? "ClearRange" : - mutation.type == MutationRef::DebugKeyRange ? "DebugKeyRange" : - mutation.type == MutationRef::DebugKey ? "DebugKey" : - "UnknownMutation"; - printf("DEBUGMUTATION:\t%.6f\t%s\t%s\t%s\t%s\t%s\n", - now(), g_network->getLocalAddress().toString().c_str(), "originalMutation", - type, printable(mutation.param1).c_str(), printable(mutation.param2).c_str()); - printf(" Cached Key-range: %s - %s\n", printable(cachedKeyRange.begin).c_str(), printable(cachedKeyRange.end).c_str()); - } + DEBUG_MUTATION("expandedMutation", version, expanded).detail("Begin", cachedKeyRange.begin).detail("End", cachedKeyRange.end); applyMutation( this, expanded, mLog.arena(), mutableData() ); printf("\nSCUpdate: Printing versioned tree after applying mutation\n"); mutableData().printTree(version); - } // Helper class for updating the storage cache (i.e. 
applying mutations) @@ -742,15 +731,11 @@ public: data->mutableData().createNewVersion(ver); } + DEBUG_MUTATION("SCUpdateMutation", ver, m); if (m.param1.startsWith( systemKeys.end )) { //TraceEvent("PrivateData", data->thisServerID).detail("Mutation", m.toString()).detail("Version", ver); applyPrivateCacheData( data, m ); } else { - // FIXME: enable when debugMutation is active - //for(auto m = changes[c].mutations.begin(); m; ++m) { - // debugMutation("SCUpdateMutation", changes[c].version, *m); - //} - splitMutation(data, data->cachedRangeMap, m, ver); } @@ -768,7 +753,7 @@ private: //that this cache server is responsible for // TODO Revisit during failure handling. Might we loose some private mutations? void applyPrivateCacheData( StorageCacheData* data, MutationRef const& m ) { - TraceEvent(SevDebug, "SCPrivateCacheMutation", data->thisServerID).detail("Mutation", m.toString()); + TraceEvent(SevDebug, "SCPrivateCacheMutation", data->thisServerID).detail("Mutation", m); if (processedCacheStartKey) { // we expect changes in pairs, [begin,end). This mutation is for end key of the range @@ -914,7 +899,7 @@ ACTOR Future pullAsyncData( StorageCacheData *data ) { } if(ver != invalidVersion && ver > data->version.get()) { - debugKeyRange("SCUpdate", ver, allKeys); + DEBUG_KEY_RANGE("SCUpdate", ver, allKeys); data->mutableData().createNewVersion(ver); diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index 5f22090664..72cbcfad12 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -31,6 +31,7 @@ #include "fdbserver/TLogInterface.h" #include "fdbserver/Knobs.h" #include "fdbserver/IKeyValueStore.h" +#include "fdbserver/MutationTracking.h" #include "flow/ActorCollection.h" #include "fdbrpc/FailureMonitor.h" #include "fdbserver/IDiskQueue.h" @@ -1260,6 +1261,7 @@ void commitMessages( TLogData* self, Reference logData, Version version block.reserve(block.arena(), std::max(SERVER_KNOBS->TLOG_MESSAGE_BLOCK_BYTES, msgSize)); } + DEBUG_TAGS_AND_MESSAGE("TLogCommitMessages", version, msg.getRawMessage()).detail("UID", self->dbgid).detail("LogId", logData->logId); block.append(block.arena(), msg.message.begin(), msg.message.size()); for(auto tag : msg.tags) { if(logData->locality == tagLocalitySatellite) { @@ -1374,7 +1376,12 @@ void peekMessagesFromMemory( Reference self, TLogPeekRequest const& req messages << VERSION_HEADER << currentVersion; } + // We need the 4 byte length prefix to be a TagsAndMessage format, but that prefix is added as part of StringRef serialization. 
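+ // (Clarifying note: capturing the writer's length immediately before and after the
+ // append brackets exactly the bytes of this one message, so the StringRef passed to
+ // DEBUG_TAGS_AND_MESSAGE below spans the complete length-prefixed TagsAndMessage.)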
+		int offset = messages.getLength();
 		messages << it->second.toStringRef();
+		void* data = messages.getData();
+		DEBUG_TAGS_AND_MESSAGE("TLogPeek", currentVersion, StringRef((uint8_t*)data+offset, messages.getLength()-offset))
+			.detail("LogId", self->logId).detail("PeekTag", req.tag);
 	}
 }
@@ -1636,6 +1643,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
 			wait(parseMessagesForTag(entry.messages, req.tag, logData->logRouterTags));
 			for (const StringRef& msg : rawMessages) {
 				messages.serializeBytes(msg);
+				DEBUG_TAGS_AND_MESSAGE("TLogPeekFromDisk", entry.version, msg).detail("UID", self->dbgid).detail("LogId", logData->logId).detail("PeekTag", req.tag);
 			}
 
 			lastRefMessageVersion = entry.version;
diff --git a/fdbserver/WorkerInterface.actor.h b/fdbserver/WorkerInterface.actor.h
index 905e9b1fd7..dbf7f4bd74 100644
--- a/fdbserver/WorkerInterface.actor.h
+++ b/fdbserver/WorkerInterface.actor.h
@@ -619,26 +619,6 @@ struct EventLogRequest {
 	}
 };
 
-struct DebugEntryRef {
-	double time;
-	NetworkAddress address;
-	StringRef context;
-	Version version;
-	MutationRef mutation;
-	DebugEntryRef() {}
-	DebugEntryRef( const char* c, Version v, MutationRef const& m ) : context((const uint8_t*)c,strlen(c)), version(v), mutation(m), time(now()), address( g_network->getLocalAddress() ) {}
-	DebugEntryRef( Arena& a, DebugEntryRef const& d ) : time(d.time), address(d.address), context(d.context), version(d.version), mutation(a, d.mutation) {}
-
-	size_t expectedSize() const {
-		return context.expectedSize() + mutation.expectedSize();
-	}
-
-	template <class Ar>
-	void serialize(Ar& ar) {
-		serializer(ar, time, address, context, version, mutation);
-	}
-};
-
 struct DiskStoreRequest {
 	constexpr static FileIdentifier file_identifier = 1986262;
 	bool includePartialStores;
diff --git a/fdbserver/fdbserver.actor.cpp b/fdbserver/fdbserver.actor.cpp
index 3c1ec52643..b43ddf11b3 100644
--- a/fdbserver/fdbserver.actor.cpp
+++ b/fdbserver/fdbserver.actor.cpp
@@ -196,63 +196,6 @@ bool enableFailures = true;
 
 #define test_assert(x) if (!(x)) { cout << "Test failed: " #x << endl; return false; }
 
-vector< Standalone<VectorRef<DebugEntryRef>> > debugEntries;
-int64_t totalDebugEntriesSize = 0;
-
-#if CENABLED(0, NOT_IN_CLEAN)
-StringRef debugKey = LiteralStringRef("");
-StringRef debugKey2 = LiteralStringRef("\xff\xff\xff\xff");
-
-bool debugMutation( const char* context, Version version, MutationRef const& mutation ) {
-	if ((mutation.type == mutation.SetValue || mutation.type == mutation.AddValue || mutation.type==mutation.DebugKey) && (mutation.param1 == debugKey || mutation.param1 == debugKey2))
-		;//TraceEvent("MutationTracking").detail("At", context).detail("Version", version).detail("MutationType", "SetValue").detail("Key", mutation.param1).detail("Value", mutation.param2);
-	else if ((mutation.type == mutation.ClearRange || mutation.type == mutation.DebugKeyRange) && ((mutation.param1<=debugKey && mutation.param2>debugKey) || (mutation.param1<=debugKey2 && mutation.param2>debugKey2)))
-		;//TraceEvent("MutationTracking").detail("At", context).detail("Version", version).detail("MutationType", "ClearRange").detail("KeyBegin", mutation.param1).detail("KeyEnd", mutation.param2);
-	else
-		return false;
-
-	const char* type =
-		mutation.type == MutationRef::SetValue ? "SetValue" :
-		mutation.type == MutationRef::ClearRange ? "ClearRange" :
-		mutation.type == MutationRef::AddValue ? "AddValue" :
-		mutation.type == MutationRef::DebugKeyRange ? "DebugKeyRange" :
-		mutation.type == MutationRef::DebugKey ? "DebugKey" :
-		"UnknownMutation";
-	printf("DEBUGMUTATION:\t%.6f\t%s\t%s\t%lld\t%s\t%s\t%s\n", now(), g_network->getLocalAddress().toString().c_str(), context, version, type, printable(mutation.param1).c_str(), printable(mutation.param2).c_str());
-
-	return true;
-}
-
-bool debugKeyRange( const char* context, Version version, KeyRangeRef const& keys ) {
-	if (keys.contains(debugKey) || keys.contains(debugKey2)) {
-		debugMutation(context, version, MutationRef(MutationRef::DebugKeyRange, keys.begin, keys.end) );
-		//TraceEvent("MutationTracking").detail("At", context).detail("Version", version).detail("KeyBegin", keys.begin).detail("KeyEnd", keys.end);
-		return true;
-	} else
-		return false;
-}
-
-#elif CENABLED(0, NOT_IN_CLEAN)
-bool debugMutation( const char* context, Version version, MutationRef const& mutation ) {
-	if (!debugEntries.size() || debugEntries.back().size() >= 1000) {
-		if (debugEntries.size()) totalDebugEntriesSize += debugEntries.back().arena().getSize() + sizeof(debugEntries.back());
-		debugEntries.push_back(Standalone<VectorRef<DebugEntryRef>>());
-		TraceEvent("DebugMutationBuffer").detail("Bytes", totalDebugEntriesSize);
-	}
-	auto& v = debugEntries.back();
-	v.push_back_deep( v.arena(), DebugEntryRef(context, version, mutation) );
-
-	return false; // No auxiliary logging
-}
-
-bool debugKeyRange( const char* context, Version version, KeyRangeRef const& keys ) {
-	return debugMutation( context, version, MutationRef(MutationRef::DebugKeyRange, keys.begin, keys.end) );
-}
-
-#else // Default implementation.
-bool debugMutation( const char* context, Version version, MutationRef const& mutation ) { return false; }
-bool debugKeyRange( const char* context, Version version, KeyRangeRef const& keys ) { return false; }
-#endif
-
 #ifdef _WIN32
 #include
@@ -1978,20 +1921,6 @@ int main(int argc, char* argv[]) {
 			cout << "  " << i->second << " " << i->first << endl;*/
 		//		cout << "  " << Actor::allActors[i]->getName() << endl;
 
-		int total = 0;
-		for(auto i = Error::errorCounts().begin(); i != Error::errorCounts().end(); ++i)
-			total += i->second;
-		if (total)
-			printf("%d errors:\n", total);
-		for(auto i = Error::errorCounts().begin(); i != Error::errorCounts().end(); ++i)
-			if (i->second > 0)
-				printf("  %d: %d %s\n", i->second, i->first, Error::fromCode(i->first).what());
-
-		if (&g_simulator == g_network) {
-			auto processes = g_simulator.getAllProcesses();
-			for(auto i = processes.begin(); i != processes.end(); ++i)
-				printf("%s %s: %0.3f Mclocks\n", (*i)->name, (*i)->address.toString().c_str(), (*i)->cpuTicks / 1e6);
-		}
-
 		if (role == Simulation) {
 			unsigned long sevErrorEventsLogged = TraceEvent::CountEventsLoggedAt(SevError);
 			if (sevErrorEventsLogged > 0) {
diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp
index d9a8eac716..3c335d753e 100644
--- a/fdbserver/storageserver.actor.cpp
+++ b/fdbserver/storageserver.actor.cpp
@@ -42,6 +42,7 @@
 #include "fdbserver/LogProtocolMessage.h"
 #include "fdbserver/LogSystem.h"
 #include "fdbserver/MoveKeys.actor.h"
+#include "fdbserver/MutationTracking.h"
 #include "fdbserver/RecoveryState.h"
 #include "fdbserver/StorageMetrics.h"
 #include "fdbserver/ServerDBInfo.h"
@@ -951,8 +952,8 @@ ACTOR Future<Void> getValueQ( StorageServer* data, GetValueRequest req ) {
 		v = vv;
 	}
 
-	debugMutation("ShardGetValue", version, MutationRef(MutationRef::DebugKey, req.key, v.present()?v.get():LiteralStringRef("")));
-	debugMutation("ShardGetPath", version, MutationRef(MutationRef::DebugKey, req.key, path==0?LiteralStringRef("0"):path==1?LiteralStringRef("1"):LiteralStringRef("2")));
+	DEBUG_MUTATION("ShardGetValue", version, MutationRef(MutationRef::DebugKey, req.key, v.present()?v.get():LiteralStringRef("")));
+	DEBUG_MUTATION("ShardGetPath", version, MutationRef(MutationRef::DebugKey, req.key, path==0?LiteralStringRef("0"):path==1?LiteralStringRef("1"):LiteralStringRef("2")));
 
 	/*
 	StorageMetrics m;
@@ -1026,7 +1027,7 @@ ACTOR Future<Void> watchValue_impl( StorageServer* data, WatchValueRequest req )
 					throw reply.error.get();
 				}
 
-				debugMutation("ShardWatchValue", latest, MutationRef(MutationRef::DebugKey, req.key, reply.value.present() ? StringRef( reply.value.get() ) : LiteralStringRef("") ) );
+				DEBUG_MUTATION("ShardWatchValue", latest, MutationRef(MutationRef::DebugKey, req.key, reply.value.present() ? StringRef( reply.value.get() ) : LiteralStringRef("") ) );
 
 				if( req.debugID.present() )
 					g_traceBatch.addEvent("WatchValueDebug", req.debugID.get().first(), "watchValueQ.AfterRead"); //.detail("TaskID", g_network->getCurrentTask());
@@ -2082,7 +2083,7 @@ ACTOR Future<Void> fetchKeys( StorageServer *data, AddingShard* shard ) {
 	wait( data->coreStarted.getFuture() && delay( 0 ) );
 
 	try {
-		debugKeyRange("fetchKeysBegin", data->version.get(), shard->keys);
+		DEBUG_KEY_RANGE("fetchKeysBegin", data->version.get(), shard->keys);
 
 		TraceEvent(SevDebug, interval.begin(), data->thisServerID)
 			.detail("KeyBegin", shard->keys.begin)
@@ -2150,8 +2151,8 @@ ACTOR Future<Void> fetchKeys( StorageServer *data, AddingShard* shard ) {
 					.detail("KeyBegin", keys.begin).detail("KeyEnd", keys.end)
 					.detail("Last", this_block.size() ? this_block.end()[-1].key : std::string())
 					.detail("Version", fetchVersion).detail("More", this_block.more);
-				debugKeyRange("fetchRange", fetchVersion, keys);
-				for(auto k = this_block.begin(); k != this_block.end(); ++k) debugMutation("fetch", fetchVersion, MutationRef(MutationRef::SetValue, k->key, k->value));
+				DEBUG_KEY_RANGE("fetchRange", fetchVersion, keys);
+				for(auto k = this_block.begin(); k != this_block.end(); ++k) DEBUG_MUTATION("fetch", fetchVersion, MutationRef(MutationRef::SetValue, k->key, k->value));
 
 				data->counters.bytesFetched += expectedSize;
 				if( fetchBlockBytes > expectedSize ) {
@@ -2298,7 +2299,7 @@ ACTOR Future<Void> fetchKeys( StorageServer *data, AddingShard* shard ) {
 					ASSERT( b->version >= checkv );
 					checkv = b->version;
 					for(auto& m : b->mutations)
-						debugMutation("fetchKeysFinalCommitInject", batch->changes[0].version, m);
+						DEBUG_MUTATION("fetchKeysFinalCommitInject", batch->changes[0].version, m);
 				}
 
 				shard->updates.clear();
@@ -2407,7 +2408,8 @@ void changeServerKeys( StorageServer* data, const KeyRangeRef& keys, bool nowAss
 	//	.detail("Context", changeServerKeysContextName[(int)context]);
 	validate(data);
 
-	debugKeyRange( nowAssigned ? "KeysAssigned" : "KeysUnassigned", version, keys );
+	// TODO(alexmiller): Figure out how to selectively enable spammy data distribution events.
+	//DEBUG_KEY_RANGE( nowAssigned ? "KeysAssigned" : "KeysUnassigned", version, keys );
 
 	bool isDifferent = false;
 	auto existingShards = data->shards.intersectingRanges(keys);
@@ -2512,7 +2514,7 @@ void changeServerKeys( StorageServer* data, const KeyRangeRef& keys, bool nowAss
 
 void rollback( StorageServer* data, Version rollbackVersion, Version nextVersion ) {
 	TEST(true); // call to shard rollback
-	debugKeyRange("Rollback", rollbackVersion, allKeys);
+	DEBUG_KEY_RANGE("Rollback", rollbackVersion, allKeys);
 
 	// We used to do a complicated dance to roll back in MVCC history.  It's much simpler, and more testable,
 	// to simply restart the storage server actor and restore from the persistent disk state, and then roll
@@ -2533,18 +2535,7 @@ void StorageServer::addMutation(Version version, MutationRef const& mutation, Ke
 		return;
 	}
 	expanded = addMutationToMutationLog(mLog, expanded);
-	if (debugMutation("expandedMutation", version, expanded)) {
-		const char* type =
-			mutation.type == MutationRef::SetValue ? "SetValue" :
-			mutation.type == MutationRef::ClearRange ? "ClearRange" :
-			mutation.type == MutationRef::DebugKeyRange ? "DebugKeyRange" :
-			mutation.type == MutationRef::DebugKey ? "DebugKey" :
-			"UnknownMutation";
-		printf("DEBUGMUTATION:\t%.6f\t%s\t%s\t%" PRId64 "\t%s\t%s\t%s\n", now(), g_network->getLocalAddress().toString().c_str(), "originalMutation", version, type, printable(mutation.param1).c_str(), printable(mutation.param2).c_str());
-		printf("  shard: %s - %s\n", printable(shard.begin).c_str(), printable(shard.end).c_str());
-		if (mutation.type == MutationRef::ClearRange && mutation.param2 != shard.end)
-			printf("  eager: %s\n", printable( eagerReads->getKeyEnd( mutation.param2 ) ).c_str() );
-	}
+	DEBUG_MUTATION("applyMutation", version, expanded).detail("UID", thisServerID).detail("ShardBegin", shard.begin).detail("ShardEnd", shard.end);
 	applyMutation( this, expanded, mLog.arena(), mutableData() );
 	//printf("\nSSUpdate: Printing versioned tree after applying mutation\n");
 	//mutableData().printTree(version);
@@ -2597,9 +2588,9 @@ public:
 				applyPrivateData( data, m );
 			}
 		} else {
-			// FIXME: enable when debugMutation is active
+			// FIXME: enable when DEBUG_MUTATION is active
 			//for(auto m = changes[c].mutations.begin(); m; ++m) {
-			//	debugMutation("SSUpdateMutation", changes[c].version, *m);
+			//	DEBUG_MUTATION("SSUpdateMutation", changes[c].version, *m);
 			//}
 
 			splitMutation(data, data->shards, m, ver);
@@ -2885,7 +2876,8 @@ ACTOR Future<Void> update( StorageServer* data, bool* pReceivedUpdate )
 					rd >> msg;
 
 					if (ver != invalidVersion) { // This change belongs to a version < minVersion
-						if (debugMutation("SSPeek", ver, msg) || ver == 1) {
+						DEBUG_MUTATION("SSPeek", ver, msg).detail("ServerID", data->thisServerID);
+						if (ver == 1) {
 							TraceEvent("SSPeekMutation", data->thisServerID);
 							// The following trace event may produce a value with special characters
 							//TraceEvent("SSPeekMutation", data->thisServerID).detail("Mutation", msg.toString()).detail("Version", cloneCursor2->version().toString());
@@ -2938,7 +2930,8 @@ ACTOR Future<Void> update( StorageServer* data, bool* pReceivedUpdate )
 			}
 
 			if(ver != invalidVersion && ver > data->version.get()) {
-				debugKeyRange("SSUpdate", ver, allKeys);
+				// TODO(alexmiller): Update to version tracking.
+ DEBUG_KEY_RANGE("SSUpdate", ver, KeyRangeRef()); data->mutableData().createNewVersion(ver); if (data->otherError.getFuture().isReady()) data->otherError.getFuture().get(); @@ -3141,7 +3134,7 @@ void StorageServerDisk::writeKeyValue( KeyValueRef kv ) { } void StorageServerDisk::writeMutation( MutationRef mutation ) { - // FIXME: debugMutation(debugContext, debugVersion, *m); + // FIXME: DEBUG_MUTATION(debugContext, debugVersion, *m); if (mutation.type == MutationRef::SetValue) { storage->set( KeyValueRef(mutation.param1, mutation.param2) ); } else if (mutation.type == MutationRef::ClearRange) { @@ -3152,7 +3145,7 @@ void StorageServerDisk::writeMutation( MutationRef mutation ) { void StorageServerDisk::writeMutations( MutationListRef mutations, Version debugVersion, const char* debugContext ) { for(auto m = mutations.begin(); m; ++m) { - debugMutation(debugContext, debugVersion, *m); + DEBUG_MUTATION(debugContext, debugVersion, *m).detail("UID", data->thisServerID); if (m->type == MutationRef::SetValue) { storage->set( KeyValueRef(m->param1, m->param2) ); } else if (m->type == MutationRef::ClearRange) { @@ -3169,7 +3162,8 @@ bool StorageServerDisk::makeVersionMutationsDurable( Version& prevStorageVersion if (u != data->getMutationLog().end() && u->first <= newStorageVersion) { VersionUpdateRef const& v = u->second; ASSERT( v.version > prevStorageVersion && v.version <= newStorageVersion ); - debugKeyRange("makeVersionMutationsDurable", v.version, allKeys); + // TODO(alexmiller): Update to version tracking. + DEBUG_KEY_RANGE("makeVersionMutationsDurable", v.version, KeyRangeRef()); writeMutations(v.mutations, v.version, "makeVersionDurable"); for(auto m=v.mutations.begin(); m; ++m) bytesLeft -= mvccStorageBytes(*m); @@ -3355,7 +3349,8 @@ ACTOR Future restoreDurableState( StorageServer* data, IKeyValueStore* sto for(auto it = data->newestAvailableVersion.ranges().begin(); it != data->newestAvailableVersion.ranges().end(); ++it) { if (it->value() == invalidVersion) { KeyRangeRef clearRange(it->begin(), it->end()); - debugKeyRange("clearInvalidVersion", invalidVersion, clearRange); + // TODO(alexmiller): Figure out how to selectively enable spammy data distribution events. 
+ //DEBUG_KEY_RANGE("clearInvalidVersion", invalidVersion, clearRange); storage->clear( clearRange ); data->byteSampleApplyClear( clearRange, invalidVersion ); } diff --git a/fdbserver/workloads/ApiCorrectness.actor.cpp b/fdbserver/workloads/ApiCorrectness.actor.cpp index c122c7c550..afd04d6f97 100644 --- a/fdbserver/workloads/ApiCorrectness.actor.cpp +++ b/fdbserver/workloads/ApiCorrectness.actor.cpp @@ -20,6 +20,7 @@ #include "fdbserver/QuietDatabase.h" +#include "fdbserver/MutationTracking.h" #include "fdbserver/workloads/workloads.actor.h" #include "fdbserver/workloads/ApiWorkload.h" #include "fdbserver/workloads/MemoryKeyValueStore.h" @@ -328,7 +329,7 @@ public: wait(transaction->commit()); for(int i = currentIndex; i < std::min(currentIndex + self->maxKeysPerTransaction, data.size()); i++) - debugMutation("ApiCorrectnessSet", transaction->getCommittedVersion(), MutationRef(MutationRef::DebugKey, data[i].key, data[i].value)); + DEBUG_MUTATION("ApiCorrectnessSet", transaction->getCommittedVersion(), MutationRef(MutationRef::DebugKey, data[i].key, data[i].value)); currentIndex += self->maxKeysPerTransaction; break; @@ -660,7 +661,7 @@ public: wait(transaction->commit()); for(int i = currentIndex; i < std::min(currentIndex + self->maxKeysPerTransaction, keys.size()); i++) - debugMutation("ApiCorrectnessClear", transaction->getCommittedVersion(), MutationRef(MutationRef::DebugKey, keys[i], StringRef())); + DEBUG_MUTATION("ApiCorrectnessClear", transaction->getCommittedVersion(), MutationRef(MutationRef::DebugKey, keys[i], StringRef())); currentIndex += self->maxKeysPerTransaction; break; @@ -711,7 +712,7 @@ public: } transaction->clear(range); wait(transaction->commit()); - debugKeyRange("ApiCorrectnessClear", transaction->getCommittedVersion(), range); + DEBUG_KEY_RANGE("ApiCorrectnessClear", transaction->getCommittedVersion(), range); break; } catch(Error &e) { diff --git a/fdbservice/FDBService.cpp b/fdbservice/FDBService.cpp index 8efee25593..22e4ddc5ab 100644 --- a/fdbservice/FDBService.cpp +++ b/fdbservice/FDBService.cpp @@ -28,8 +28,13 @@ #include #include +<<<<<<< HEAD #include "..\flow\SimpleOpt.h" #include "..\fdbmonitor\SimpleIni.h" +======= +#include "flow/SimpleOpt.h" +#include "fdbmonitor/SimpleIni.h" +>>>>>>> master #include "fdbclient/versions.h" // For PathFileExists diff --git a/flow/Deque.h b/flow/Deque.h index c5c05fb895..6148e00d0c 100644 --- a/flow/Deque.h +++ b/flow/Deque.h @@ -41,21 +41,25 @@ public: Deque() : arr(0), begin(0), end(0), mask(-1) {} // TODO: iterator construction, other constructors - Deque(Deque const& r) : arr(0), begin(0), end(r.size()), mask(r.mask) { + Deque(Deque const& r) : arr(nullptr), begin(0), end(r.size()), mask(r.mask) { if (r.capacity() > 0) { arr = (T*)aligned_alloc(std::max(__alignof(T), sizeof(void*)), capacity() * sizeof(T)); ASSERT(arr != nullptr); } ASSERT(capacity() >= end || end == 0); - for (uint32_t i=0; i= r.begin) { + std::copy(r.arr + r.begin, r.arr + r.begin + r.size(), arr); + } else { + auto partOneSize = r.capacity() - r.begin; + std::copy(r.arr + r.begin, r.arr + r.begin + partOneSize, arr); + std::copy(r.arr, r.arr + r.end, arr + partOneSize); + } } void operator=(Deque const& r) { cleanup(); - arr = 0; + arr = nullptr; begin = 0; end = r.size(); mask = r.mask; @@ -64,13 +68,17 @@ public: ASSERT(arr != nullptr); } ASSERT(capacity() >= end || end == 0); - for (uint32_t i=0; i= r.begin) { + std::copy(r.arr + r.begin, r.arr + r.begin + r.size(), arr); + } else { + auto partOneSize = r.capacity() - r.begin; + 
std::copy(r.arr + r.begin, r.arr + r.begin + partOneSize, arr); + std::copy(r.arr, r.arr + r.end, arr + partOneSize); + } } Deque(Deque&& r) BOOST_NOEXCEPT : begin(r.begin), end(r.end), mask(r.mask), arr(r.arr) { - r.arr = 0; + r.arr = nullptr; r.begin = r.end = 0; r.mask = -1; } @@ -82,8 +90,8 @@ public: end = r.end; mask = r.mask; arr = r.arr; - - r.arr = 0; + + r.arr = nullptr; r.begin = r.end = 0; r.mask = -1; } diff --git a/flow/Error.cpp b/flow/Error.cpp index 3edb81adf9..cf2fa34b63 100644 --- a/flow/Error.cpp +++ b/flow/Error.cpp @@ -28,11 +28,6 @@ using std::make_pair; bool g_crashOnError = false; -std::map& Error::errorCounts() { - static std::map counts; - return counts; -} - #include Error Error::fromUnvalidatedCode(int code) { @@ -70,8 +65,6 @@ Error::Error(int error_code) crashAndDie(); } } - /*if (error_code) - errorCounts()[error_code]++;*/ } ErrorCodeTable& Error::errorCodeTable() { diff --git a/flow/Error.h b/flow/Error.h index 0afe4d0d99..98cb35c576 100644 --- a/flow/Error.h +++ b/flow/Error.h @@ -58,9 +58,12 @@ public: explicit Error(int error_code); static void init(); - static std::map& errorCounts(); static ErrorCodeTable& errorCodeTable(); - static Error fromCode(int error_code) { Error e; e.error_code = error_code; return e; } // Doesn't change errorCounts + static Error fromCode(int error_code) { + Error e; + e.error_code = error_code; + return e; + } static Error fromUnvalidatedCode(int error_code); // Converts codes that are outside the legal range (but not necessarily individually unknown error codes) to unknown_error() Error asInjectedFault() const; // Returns an error with the same code() as this but isInjectedFault() is true diff --git a/flow/ProtocolVersion.h b/flow/ProtocolVersion.h index b63c91561f..a858105e61 100644 --- a/flow/ProtocolVersion.h +++ b/flow/ProtocolVersion.h @@ -131,7 +131,7 @@ public: // introduced features // // xyzdev // vvvv -constexpr ProtocolVersion currentProtocolVersion(0x0FDB00B063010001LL); +constexpr ProtocolVersion currentProtocolVersion(0x0FDB00B070010001LL); // This assert is intended to help prevent incrementing the leftmost digits accidentally. It will probably need to // change when we reach version 10. 
static_assert(currentProtocolVersion.version() < 0x0FDB00B100000000LL, "Unexpected protocol version");
diff --git a/flow/Trace.cpp b/flow/Trace.cpp
index adf1921fe3..2f15483b30 100644
--- a/flow/Trace.cpp
+++ b/flow/Trace.cpp
@@ -783,6 +783,7 @@ TraceEvent::TraceEvent(TraceEvent &&ev) {
 	tmpEventMetric = ev.tmpEventMetric;
 	trackingKey = ev.trackingKey;
 	type = ev.type;
+	timeIndex = ev.timeIndex;
 
 	ev.initialized = true;
 	ev.enabled = false;
@@ -803,6 +804,7 @@ TraceEvent& TraceEvent::operator=(TraceEvent &&ev) {
 	tmpEventMetric = ev.tmpEventMetric;
 	trackingKey = ev.trackingKey;
 	type = ev.type;
+	timeIndex = ev.timeIndex;
 
 	ev.initialized = true;
 	ev.enabled = false;
diff --git a/flow/Trace.h b/flow/Trace.h
index 3aeb5a9a8d..ae32302a74 100644
--- a/flow/Trace.h
+++ b/flow/Trace.h
@@ -479,6 +479,10 @@ public:
 		return enabled;
 	}
 
+	explicit operator bool() const {
+		return enabled;
+	}
+
 	void log();
 
 	~TraceEvent();  // Actually logs the event
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index f1397e0b60..c4e8697fb7 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -79,7 +79,7 @@ if(WITH_PYTHON)
   if (NOT USE_UBSAN) # TODO re-enable in UBSAN after https://github.com/apple/foundationdb/issues/2410 is resolved
     add_fdb_test(TEST_FILES SimpleExternalTest.txt)
   else()
-    message(WARNING "Python not found, won't configure ctest")
+    add_fdb_test(TEST_FILES SimpleExternalTest.txt IGNORE)
   endif()
   add_fdb_test(TEST_FILES SlowTask.txt IGNORE)
   add_fdb_test(TEST_FILES SpecificUnitTest.txt IGNORE)
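
The conversion above assumes one property everywhere: DEBUG_MUTATION and DEBUG_KEY_RANGE must evaluate to a TraceEvent so call sites can chain .detail() onto them, and the new explicit operator bool() on TraceEvent lets a site still ask whether the event fired. fdbserver/MutationTracking.h itself is not part of this diff, so the following is only a minimal self-contained sketch of that pattern; the Event class, TRACKING_ENABLED flag, and DEBUG_MUTATION_SKETCH macro are hypothetical stand-ins, not FoundationDB's implementation.

#include <iostream>
#include <sstream>

// Chainable, compile-time-switchable event logger: detail() is a no-op and
// nothing is printed when the event is constructed disabled.
class Event {
	bool enabledFlag;
	std::ostringstream out;
public:
	Event(bool enabled, const char* type) : enabledFlag(enabled) {
		if (enabledFlag) out << type;
	}
	template <class T>
	Event& detail(const char* key, const T& value) {
		if (enabledFlag) out << " " << key << "=" << value;
		return *this;
	}
	explicit operator bool() const { return enabledFlag; } // mirrors the Trace.h change
	~Event() {
		if (enabledFlag) std::cout << out.str() << "\n"; // log on destruction
	}
};

#define TRACKING_ENABLED 1
#define DEBUG_MUTATION_SKETCH(context, version) \
	Event(TRACKING_ENABLED, "MutationTracking").detail("At", context).detail("Version", version)

int main() {
	// Call sites chain extra details, like the converted sites in this diff:
	DEBUG_MUTATION_SKETCH("applyMutation", 42).detail("ShardBegin", "a").detail("ShardEnd", "b");
}

With the flag at 0, the formatting work disappears behind a branch the compiler can eliminate, which is presumably why the calls can be left in hot paths such as commitMessages and update.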
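
The peekMessagesFromMemory hunk relies on a small serialization trick: record the writer's length before appending a message, then wrap exactly the bytes just written (including the 4-byte length prefix that StringRef serialization adds) so they can be logged as a TagsAndMessage. A sketch of that record-offset-then-slice pattern, with a hypothetical Writer standing in for the real serializer:

#include <cstdint>
#include <iostream>
#include <string>
#include <string_view>

// Hypothetical stand-in for the serializer: append() writes a 4-byte length
// prefix followed by the payload, mimicking StringRef serialization.
struct Writer {
	std::string buf;
	int getLength() const { return (int)buf.size(); }
	void append(const std::string& s) {
		uint32_t len = (uint32_t)s.size();
		buf.append(reinterpret_cast<const char*>(&len), 4);
		buf.append(s);
	}
};

int main() {
	Writer messages;
	messages.append("header");

	// Record the offset, serialize, then view exactly the bytes just written.
	int offset = messages.getLength();
	messages.append("tags-and-message-payload");
	std::string_view justWritten(messages.buf.data() + offset, messages.getLength() - offset);

	std::cout << "captured " << justWritten.size() << " bytes\n"; // 4-byte prefix + 24-byte payload
}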
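
The flow/Deque.h hunks replace an element-by-element copy with bulk std::copy calls: one call when the live region [begin, end) is contiguous in the backing array, two calls when it wraps past the physical end. A standalone illustration of the same two-segment copy (the buffer contents and indices here are invented for the example):

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <vector>

int main() {
	const uint32_t cap = 8; // power-of-two capacity, as Deque requires
	std::vector<int> src(cap), dst(cap);
	for (uint32_t i = 0; i < cap; i++) src[i] = (int)i;

	uint32_t begin = 6, end = 3;               // live elements wrap around: 6,7,0,1,2
	uint32_t size = (end - begin) & (cap - 1); // mask arithmetic, like Deque's size()

	if (end >= begin) {
		// Contiguous: one copy suffices.
		std::copy(src.data() + begin, src.data() + begin + size, dst.data());
	} else {
		// Wrapped: copy the tail segment up to the physical end, then the head.
		auto partOneSize = cap - begin;
		std::copy(src.data() + begin, src.data() + begin + partOneSize, dst.data());
		std::copy(src.data(), src.data() + end, dst.data() + partOneSize);
	}

	for (uint32_t i = 0; i < size; i++) std::cout << dst[i] << " "; // prints: 6 7 0 1 2
	std::cout << "\n";
}

Note that std::copy assigns rather than constructs; the diff applies it to raw aligned_alloc storage, which is fine for the trivially copyable payloads this Deque holds but would not placement-construct non-trivial types the way the removed loop did.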