Merge branch 'master' into mengxu/release-6.3-conflict-PR

Has conflicts with master;
the next commit will fix the conflicts.
Meng Xu 2020-05-25 12:01:49 -07:00
commit 1c35ad884f
37 changed files with 675 additions and 415 deletions


@@ -18,7 +18,7 @@
# limitations under the License.
cmake_minimum_required(VERSION 3.13)
project(foundationdb
VERSION 6.3.0
VERSION 7.0.0
DESCRIPTION "FoundationDB is a scalable, fault-tolerant, ordered key-value store with full ACID transactions."
HOMEPAGE_URL "http://www.foundationdb.org/"
LANGUAGES C CXX ASM)


@@ -39,7 +39,9 @@ from json import JSONEncoder
import logging
import struct
from bisect import bisect_left
from bisect import bisect_right
import time
import datetime
PROTOCOL_VERSION_5_2 = 0x0FDB00A552000001
PROTOCOL_VERSION_6_0 = 0x0FDB00A570010001
@@ -414,7 +416,7 @@ class TransactionInfoLoader(object):
else:
end_key = self.client_latency_end_key_selector
valid_transaction_infos = 0
transaction_infos = 0
invalid_transaction_infos = 0
def build_client_transaction_info(v):
@@ -446,11 +448,12 @@ class TransactionInfoLoader(object):
info = build_client_transaction_info(v)
if info.has_types():
buffer.append(info)
valid_transaction_infos += 1
except UnsupportedProtocolVersionError as e:
invalid_transaction_infos += 1
except ValueError:
invalid_transaction_infos += 1
transaction_infos += 1
else:
if chunk_num == 1:
# first chunk
@@ -476,14 +479,15 @@ class TransactionInfoLoader(object):
info = build_client_transaction_info(b''.join([chunk.value for chunk in c_list]))
if info.has_types():
buffer.append(info)
valid_transaction_infos += 1
except UnsupportedProtocolVersionError as e:
invalid_transaction_infos += 1
except ValueError:
invalid_transaction_infos += 1
transaction_infos += 1
self._check_and_adjust_chunk_cache_size()
if (valid_transaction_infos + invalid_transaction_infos) % 1000 == 0:
print("Processed valid: %d, invalid: %d" % (valid_transaction_infos, invalid_transaction_infos))
if transaction_infos % 1000 == 0:
print("Processed %d transactions, %d invalid" % (transaction_infos, invalid_transaction_infos))
if found == 0:
more = False
except fdb.FDBError as e:
@@ -495,13 +499,15 @@ class TransactionInfoLoader(object):
for item in buffer:
yield item
print("Processed %d transactions, %d invalid\n" % (transaction_infos, invalid_transaction_infos))
def has_sortedcontainers():
try:
import sortedcontainers
return True
except ImportError:
logger.warn("Can't find sortedcontainers so disabling RangeCounter")
logger.warn("Can't find sortedcontainers so disabling ReadCounter")
return False
@@ -513,155 +519,197 @@ def has_dateparser():
logger.warn("Can't find dateparser so disabling human date parsing")
return False
class RangeCounter(object):
def __init__(self, k):
self.k = k
class ReadCounter(object):
def __init__(self):
from sortedcontainers import SortedDict
self.ranges = SortedDict()
self.reads = SortedDict()
self.reads[b''] = [0, 0]
self.read_counts = {}
self.hit_count=0
def process(self, transaction_info):
for get in transaction_info.gets:
self._insert_read(get.key, None)
for get_range in transaction_info.get_ranges:
self._insert_range(get_range.key_range.start_key, get_range.key_range.end_key)
self._insert_read(get_range.key_range.start_key, get_range.key_range.end_key)
def _insert_range(self, start_key, end_key):
keys = self.ranges.keys()
if len(keys) == 0:
self.ranges[start_key] = end_key, 1
return
def _insert_read(self, start_key, end_key):
self.read_counts.setdefault((start_key, end_key), 0)
self.read_counts[(start_key, end_key)] += 1
start_pos = bisect_left(keys, start_key)
end_pos = bisect_left(keys, end_key)
#print("start_pos=%d, end_pos=%d" % (start_pos, end_pos))
self.reads.setdefault(start_key, [0, 0])[0] += 1
if end_key is not None:
self.reads.setdefault(end_key, [0, 0])[1] += 1
else:
self.reads.setdefault(start_key+b'\x00', [0, 0])[1] += 1
possible_intersection_keys = keys[max(0, start_pos - 1):min(len(keys), end_pos+1)]
def get_total_reads(self):
return sum([v for v in self.read_counts.values()])
def matches_filter(addresses, required_addresses):
for addr in required_addresses:
if addr not in addresses:
return False
return True
start_range_left = start_key
def get_top_k_reads(self, num, filter_addresses, shard_finder=None):
count_pairs = sorted([(v, k) for (k, v) in self.read_counts.items()], reverse=True, key=lambda item: item[0])
if not filter_addresses:
count_pairs = count_pairs[0:num]
for key in possible_intersection_keys:
cur_end_key, cur_count = self.ranges[key]
#logger.debug("key=%s, cur_end_key=%s, cur_count=%d, start_range_left=%s" % (key, cur_end_key, cur_count, start_range_left))
if start_range_left < key:
if end_key <= key:
self.ranges[start_range_left] = end_key, 1
return
self.ranges[start_range_left] = key, 1
start_range_left = key
assert start_range_left >= key
if start_range_left >= cur_end_key:
continue
if shard_finder:
results = []
for (count, (start, end)) in count_pairs:
results.append((start, end, count, shard_finder.get_addresses_for_key(start)))
# [key, start_range_left) = cur_count
# if key == start_range_left this will get overwritten below
self.ranges[key] = start_range_left, cur_count
shard_finder.wait_for_shard_addresses(results, 0, 3)
if end_key <= cur_end_key:
# [start_range_left, end_key) = cur_count+1
# [end_key, cur_end_key) = cur_count
self.ranges[start_range_left] = end_key, cur_count + 1
if end_key != cur_end_key:
self.ranges[end_key] = cur_end_key, cur_count
start_range_left = end_key
break
else:
# [start_range_left, cur_end_key) = cur_count+1
self.ranges[start_range_left] = cur_end_key, cur_count+1
start_range_left = cur_end_key
assert start_range_left <= end_key
if filter_addresses:
filter_addresses = set(filter_addresses)
results = [r for r in results if filter_addresses.issubset(set(r[3]))][0:num]
else:
results = [(start, end, count) for (count, (start, end)) in count_pairs[0:num]]
# there may be some range left
if start_range_left < end_key:
self.ranges[start_range_left] = end_key, 1
return results
def get_count_for_key(self, key):
if key in self.ranges:
return self.ranges[key][1]
keys = self.ranges.keys()
index = bisect_left(keys, key)
if index == 0:
return 0
index_key = keys[index-1]
if index_key <= key < self.ranges[index_key][0]:
return self.ranges[index_key][1]
return 0
def get_range_boundaries(self, shard_finder=None):
total = sum([count for _, (_, count) in self.ranges.items()])
range_size = total // self.k
def get_range_boundaries(self, num_buckets, shard_finder=None):
total = sum([start_count for (start_count, end_count) in self.reads.values()])
range_size = total // num_buckets
output_range_counts = []
def add_boundary(start, end, count):
def add_boundary(start, end, started_count, total_count):
if shard_finder:
shard_count = shard_finder.get_shard_count(start, end)
if shard_count == 1:
addresses = shard_finder.get_addresses_for_key(start)
else:
addresses = None
output_range_counts.append((start, end, count, shard_count, addresses))
output_range_counts.append((start, end, started_count, total_count, shard_count, addresses))
else:
output_range_counts.append((start, end, count, None, None))
output_range_counts.append((start, end, started_count, total_count, None, None))
this_range_start_key = None
last_end = None
open_count = 0
opened_this_range = 0
count_this_range = 0
for (start_key, (end_key, count)) in self.ranges.items():
if not this_range_start_key:
this_range_start_key = start_key
count_this_range += count
if count_this_range >= range_size:
add_boundary(this_range_start_key, end_key, count_this_range)
count_this_range = 0
this_range_start_key = None
if count_this_range > 0:
add_boundary(this_range_start_key, end_key, count_this_range)
for (start_key, (start_count, end_count)) in self.reads.items():
open_count -= end_count
if opened_this_range >= range_size:
add_boundary(this_range_start_key, start_key, opened_this_range, count_this_range)
count_this_range = open_count
opened_this_range = 0
this_range_start_key = None
count_this_range += start_count
opened_this_range += start_count
open_count += start_count
if count_this_range > 0 and this_range_start_key is None:
this_range_start_key = start_key
if end_count > 0:
last_end = start_key
if last_end is None:
last_end = b'\xff'
if count_this_range > 0:
add_boundary(this_range_start_key, last_end, opened_this_range, count_this_range)
shard_finder.wait_for_shard_addresses(output_range_counts, 0, 5)
return output_range_counts
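The sweep above replaces RangeCounter's interval arithmetic: each key in self.reads stores how many sampled reads start and end there, and one in-order pass emits a boundary whenever a bucket's quota of started reads is reached. A standalone C++ analogue of that sweep, with toy data and assumed names (not code from this commit; the real code additionally reports reads carried over from earlier buckets):

#include <iostream>
#include <map>
#include <string>
#include <utility>

int main() {
    // key -> {reads starting here, reads ending here}
    std::map<std::string, std::pair<int, int>> reads = {
        {"a", {3, 0}}, {"c", {2, 1}}, {"f", {1, 2}}, {"k", {0, 3}},
    };
    int total = 0;
    for (auto& [key, se] : reads) total += se.first;
    int quota = total / 2;  // aim for two buckets

    int open = 0, openedThisRange = 0;
    std::string bucketStart;
    for (auto& [key, se] : reads) {
        open -= se.second;  // ranges ending at this key are no longer open
        if (openedThisRange >= quota) {
            std::cout << "[" << bucketStart << ", " << key << ") "
                      << openedThisRange << " started, " << open << " still open\n";
            openedThisRange = 0;
            bucketStart.clear();
        }
        openedThisRange += se.first;
        open += se.first;
        if (openedThisRange > 0 && bucketStart.empty()) bucketStart = key;
    }
    if (openedThisRange > 0)
        std::cout << "[" << bucketStart << ", end) " << openedThisRange << " started\n";
}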
class ShardFinder(object):
def __init__(self, db):
def __init__(self, db, exclude_ports):
self.db = db
self.exclude_ports = exclude_ports
self.tr = db.create_transaction()
self.refresh_tr()
self.outstanding = []
self.boundary_keys = list(fdb.locality.get_boundary_keys(db, b'', b'\xff\xff'))
self.shard_cache = {}
def _get_boundary_keys(self, begin, end):
start_pos = max(0, bisect_right(self.boundary_keys, begin)-1)
end_pos = max(0, bisect_right(self.boundary_keys, end)-1)
return self.boundary_keys[start_pos:end_pos]
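ShardFinder now caches the cluster's shard boundary keys once and maps a key to its shard with bisect_right(boundary_keys, key) - 1, i.e. the last boundary at or before the key. A minimal C++ equivalent using std::upper_bound, with toy boundaries (assumed names, not code from this commit):

#include <algorithm>
#include <iostream>
#include <string>
#include <vector>

int main() {
    // Sorted first keys of three shards: ["", "g"), ["g", "p"), ["p", ...)
    std::vector<std::string> boundaries = {"", "g", "p"};
    for (std::string key : {"a", "g", "z"}) {
        // upper_bound is bisect_right; step back one to the containing shard.
        auto it = std::upper_bound(boundaries.begin(), boundaries.end(), key);
        size_t shard = (it == boundaries.begin()) ? 0 : (it - boundaries.begin()) - 1;
        std::cout << key << " -> shard " << shard
                  << " starting at \"" << boundaries[shard] << "\"\n";
    }
}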
def refresh_tr(self):
self.tr.options.set_read_lock_aware()
if not self.exclude_ports:
self.tr.options.set_include_port_in_address()
@staticmethod
@fdb.transactional
def _get_boundary_keys(tr, begin, end):
tr.options.set_read_lock_aware()
return fdb.locality.get_boundary_keys(tr, begin, end)
@staticmethod
@fdb.transactional
def _get_addresses_for_key(tr, key):
tr.options.set_read_lock_aware()
return fdb.locality.get_addresses_for_key(tr, key)
def get_shard_count(self, start_key, end_key):
return len(list(self._get_boundary_keys(self.db, start_key, end_key))) + 1
return len(self._get_boundary_keys(start_key, end_key)) + 1
def get_addresses_for_key(self, key):
return [a.decode('ascii') for a in self._get_addresses_for_key(self.db, key).wait()]
shard = self.boundary_keys[max(0, bisect_right(self.boundary_keys, key)-1)]
do_load = False
if not shard in self.shard_cache:
do_load = True
elif self.shard_cache[shard].is_ready():
try:
self.shard_cache[shard].wait()
except fdb.FDBError as e:
self.tr.on_error(e).wait()
self.refresh_tr()
do_load = True
if do_load:
if len(self.outstanding) > 1000:
for f in self.outstanding:
try:
f.wait()
except fdb.FDBError as e:
pass
class TopKeysCounter(object):
self.outstanding = []
self.tr.reset()
self.refresh_tr()
self.outstanding.append(self._get_addresses_for_key(self.tr, shard))
self.shard_cache[shard] = self.outstanding[-1]
return self.shard_cache[shard]
def wait_for_shard_addresses(self, ranges, key_idx, addr_idx):
for index in range(len(ranges)):
item = ranges[index]
if item[addr_idx] is not None:
while True:
try:
ranges[index] = item[0:addr_idx] + ([a.decode('ascii') for a in item[addr_idx].wait()],) + item[addr_idx+1:]
break
except fdb.FDBError as e:
ranges[index] = item[0:addr_idx] + (self.get_addresses_for_key(item[key_idx]),) + item[addr_idx+1:]
class WriteCounter(object):
mutation_types_to_consider = frozenset([MutationType.SET_VALUE, MutationType.ADD_VALUE])
def __init__(self, k):
self.k = k
self.reads = defaultdict(lambda: 0)
def __init__(self):
self.writes = defaultdict(lambda: 0)
def process(self, transaction_info):
for get in transaction_info.gets:
self.reads[get.key] += 1
if transaction_info.commit:
for mutation in transaction_info.commit.mutations:
if mutation.code in self.mutation_types_to_consider:
self.writes[mutation.param_one] += 1
def _get_range_boundaries(self, counts, shard_finder=None):
total = sum([v for (k, v) in counts.items()])
range_size = total // self.k
key_counts_sorted = sorted(counts.items())
def get_range_boundaries(self, num_buckets, shard_finder=None):
total = sum([v for (k, v) in self.writes.items()])
range_size = total // num_buckets
key_counts_sorted = sorted(self.writes.items())
output_range_counts = []
def add_boundary(start, end, count):
@@ -671,9 +719,9 @@ class TopKeysCounter(object):
addresses = shard_finder.get_addresses_for_key(start)
else:
addresses = None
output_range_counts.append((start, end, count, shard_count, addresses))
output_range_counts.append((start, end, count, None, shard_count, addresses))
else:
output_range_counts.append((start, end, count, None, None))
output_range_counts.append((start, end, count, None, None, None))
start_key = None
count_this_range = 0
@@ -688,24 +736,31 @@ class TopKeysCounter(object):
if count_this_range > 0:
add_boundary(start_key, k, count_this_range)
shard_finder.wait_for_shard_addresses(output_range_counts, 0, 5)
return output_range_counts
def _get_top_k(self, counts):
count_key_pairs = sorted([(v, k) for (k, v) in counts.items()], reverse=True)
return count_key_pairs[0:self.k]
def get_total_writes(self):
return sum([v for v in self.writes.values()])
def get_top_k_reads(self):
return self._get_top_k(self.reads)
def get_top_k_writes(self, num, filter_addresses, shard_finder=None):
count_pairs = sorted([(v, k) for (k, v) in self.writes.items()], reverse=True)
if not filter_addresses:
count_pairs = count_pairs[0:num]
def get_top_k_writes(self):
return self._get_top_k(self.writes)
if shard_finder:
results = []
for (count, key) in count_pairs:
results.append((key, None, count, shard_finder.get_addresses_for_key(key)))
def get_k_read_range_boundaries(self, shard_finder=None):
return self._get_range_boundaries(self.reads, shard_finder)
shard_finder.wait_for_shard_addresses(results, 0, 3)
def get_k_write_range_boundaries(self, shard_finder=None):
return self._get_range_boundaries(self.writes, shard_finder)
if filter_addresses:
filter_addresses = set(filter_addresses)
results = [r for r in results if filter_addresses.issubset(set(r[3]))][0:num]
else:
results = [(key, end, count) for (count, key) in count_pairs[0:num]]
return results
def connect(cluster_file=None):
db = fdb.open(cluster_file=cluster_file)
@@ -722,6 +777,8 @@ def main():
help="Include get type. If no filter args are given all will be returned.")
parser.add_argument("--filter-get-range", action="store_true",
help="Include get_range type. If no filter args are given all will be returned.")
parser.add_argument("--filter-reads", action="store_true",
help="Include get and get_range type. If no filter args are given all will be returned.")
parser.add_argument("--filter-commit", action="store_true",
help="Include commit type. If no filter args are given all will be returned.")
parser.add_argument("--filter-error-get", action="store_true",
@@ -737,21 +794,34 @@ def main():
end_time_group = parser.add_mutually_exclusive_group()
end_time_group.add_argument("--max-timestamp", type=int, help="Don't return events newer than this epoch time")
end_time_group.add_argument("-e", "--end-time", type=str, help="Don't return events older than this parsed time")
parser.add_argument("--top-keys", type=int, help="If specified will output this many top keys for reads or writes", default=0)
parser.add_argument("--num-buckets", type=int, help="The number of buckets to partition the key-space into for operation counts", default=100)
parser.add_argument("--top-requests", type=int, help="If specified will output this many top keys for reads or writes", default=0)
parser.add_argument("--exclude-ports", action="store_true", help="Print addresses without the port number. Only works in versions older than 6.3, and is required in versions older than 6.2.")
parser.add_argument("--single-shard-ranges-only", action="store_true", help="Only print range boundaries that exist in a single shard")
parser.add_argument("-a", "--filter-address", action="append", help="Only print range boundaries that include the given address. This option can used multiple times to include more than one address in the filter, in which case all addresses must match.")
args = parser.parse_args()
type_filter = set()
if args.filter_get_version: type_filter.add("get_version")
if args.filter_get: type_filter.add("get")
if args.filter_get_range: type_filter.add("get_range")
if args.filter_get or args.filter_reads: type_filter.add("get")
if args.filter_get_range or args.filter_reads: type_filter.add("get_range")
if args.filter_commit: type_filter.add("commit")
if args.filter_error_get: type_filter.add("error_get")
if args.filter_error_get_range: type_filter.add("error_get_range")
if args.filter_error_commit: type_filter.add("error_commit")
top_keys = args.top_keys
key_counter = TopKeysCounter(top_keys) if top_keys else None
range_counter = RangeCounter(top_keys) if (has_sortedcontainers() and top_keys) else None
full_output = args.full_output or (top_keys is not None)
if (not type_filter or "commit" in type_filter):
write_counter = WriteCounter() if args.num_buckets else None
else:
write_counter = None
if (not type_filter or "get" in type_filter or "get_range" in type_filter):
read_counter = ReadCounter() if (has_sortedcontainers() and args.num_buckets) else None
else:
read_counter = None
full_output = args.full_output or (args.num_buckets is not None)
if args.min_timestamp:
min_timestamp = args.min_timestamp
@@ -784,48 +854,128 @@ def main():
db = connect(cluster_file=args.cluster_file)
loader = TransactionInfoLoader(db, full_output=full_output, type_filter=type_filter,
min_timestamp=min_timestamp, max_timestamp=max_timestamp)
for info in loader.fetch_transaction_info():
if info.has_types():
if not key_counter and not range_counter:
if not write_counter and not read_counter:
print(info.to_json())
else:
if key_counter:
key_counter.process(info)
if range_counter:
range_counter.process(info)
if write_counter:
write_counter.process(info)
if read_counter:
read_counter.process(info)
if key_counter:
def print_top(top):
for (count, key) in top:
print("%s %d" % (key, count))
def print_range_boundaries(range_boundaries):
for (start, end, count, shard_count, addresses) in range_boundaries:
if not shard_count:
print("[%s, %s] %d" % (start, end, count))
def print_top(top, total, context):
if top:
running_count = 0
for (idx, (start, end, count, addresses)) in enumerate(top):
running_count += count
if end is not None:
op_str = 'Range %r - %r' % (start, end)
else:
addresses_string = "addresses=%s" % ','.join(addresses) if addresses else ''
print("[%s, %s] %d shards=%d %s" % (start, end, count, shard_count, addresses_string))
op_str = 'Key %r' % start
print(" %d. %s\n %d sampled %s (%.2f%%, %.2f%% cumulative)" % (idx+1, op_str, count, context, 100*count/total, 100*running_count/total))
print(" shard addresses: %s\n" % ", ".join(addresses))
else:
print(" No %s found" % context)
def print_range_boundaries(range_boundaries, context):
omit_start = None
for (idx, (start, end, start_count, total_count, shard_count, addresses)) in enumerate(range_boundaries):
omit = args.single_shard_ranges_only and shard_count is not None and shard_count > 1
if args.filter_address:
if not addresses:
omit = True
else:
for addr in args.filter_address:
if addr not in addresses:
omit = True
break
if not omit:
if omit_start is not None:
if omit_start == idx-1:
print(" %d. Omitted\n" % (idx))
else:
print(" %d - %d. Omitted\n" % (omit_start+1, idx))
omit_start = None
if total_count is None:
count_str = '%d sampled %s' % (start_count, context)
else:
count_str = '%d sampled %s (%d intersecting)' % (start_count, context, total_count)
if not shard_count:
print(" %d. [%s, %s]\n %d sampled %s\n" % (idx+1, start, end, count, context))
else:
addresses_string = "; addresses=%s" % ', '.join(addresses) if addresses else ''
print(" %d. [%s, %s]\n %s spanning %d shard(s)%s\n" % (idx+1, start, end, count_str, shard_count, addresses_string))
elif omit_start is None:
omit_start = idx
if omit_start is not None:
if omit_start == len(range_boundaries)-1:
print(" %d. Omitted\n" % len(range_boundaries))
else:
print(" %d - %d. Omitted\n" % (omit_start+1, len(range_boundaries)))
shard_finder = ShardFinder(db, args.exclude_ports)
print("NOTE: shard locations are current and may not reflect where an operation was performed in the past\n")
if write_counter:
if args.top_requests:
top_writes = write_counter.get_top_k_writes(args.top_requests, args.filter_address, shard_finder=shard_finder)
range_boundaries = write_counter.get_range_boundaries(args.num_buckets, shard_finder=shard_finder)
num_writes = write_counter.get_total_writes()
if args.top_requests or range_boundaries:
print("WRITES")
print("------\n")
print("Processed %d total writes\n" % num_writes)
if args.top_requests:
suffix = ""
if args.filter_address:
suffix = " (%s)" % ", ".join(args.filter_address)
print("Top %d writes%s:\n" % (args.top_requests, suffix))
print_top(top_writes, write_counter.get_total_writes(), "writes")
print("")
shard_finder = ShardFinder(db)
top_reads = key_counter.get_top_k_reads()
if top_reads:
print("Top %d reads:" % min(top_keys, len(top_reads)))
print_top(top_reads)
print("Approx equal sized gets range boundaries:")
print_range_boundaries(key_counter.get_k_read_range_boundaries(shard_finder=shard_finder))
top_writes = key_counter.get_top_k_writes()
if top_writes:
print("Top %d writes:" % min(top_keys, len(top_writes)))
print_top(top_writes)
print("Approx equal sized commits range boundaries:")
print_range_boundaries(key_counter.get_k_write_range_boundaries(shard_finder=shard_finder))
if range_counter:
range_boundaries = range_counter.get_range_boundaries(shard_finder=shard_finder)
if range_boundaries:
print("Approx equal sized get_ranges boundaries:")
print_range_boundaries(range_boundaries)
print("Key-space boundaries with approximately equal mutation counts:\n")
print_range_boundaries(range_boundaries, "writes")
if args.top_requests or range_boundaries:
print("")
if read_counter:
if args.top_requests:
top_reads = read_counter.get_top_k_reads(args.top_requests, args.filter_address, shard_finder=shard_finder)
range_boundaries = read_counter.get_range_boundaries(args.num_buckets, shard_finder=shard_finder)
num_reads = read_counter.get_total_reads()
if args.top_requests or range_boundaries:
print("READS")
print("-----\n")
print("Processed %d total reads\n" % num_reads)
if args.top_requests:
suffix = ""
if args.filter_address:
suffix = " (%s)" % ", ".join(args.filter_address)
print("Top %d reads%s:\n" % (args.top_requests, suffix))
print_top(top_reads, num_reads, "reads")
print("")
if range_boundaries:
print("Key-space boundaries with approximately equal read counts:\n")
print_range_boundaries(range_boundaries, "reads")
if __name__ == "__main__":
main()


@@ -88,11 +88,19 @@ We introduce this `module` concept after a [discussion](https://forums.foundatio
- `\xff\xff/transaction/read_conflict_range/, \xff\xff/transaction/read_conflict_range0` : read conflict ranges of the transaction
- `\xff\xff/transaction/write_conflict_range/, \xff\xff/transaction/write_conflict_range0` : write conflict ranges of the transaction
- METRICS: `\xff\xff/metrics/, \xff\xff/metrics0`, all metrics, such as data-distribution metrics or health metrics, are planned to be put here. All of them need to call an RPC, so `time_out` errors may happen. Right now we have:
<<<<<<< HEAD
- `\xff\xff/metrics/data_distribution_stats/, \xff\xff/metrics/data_distribution_stats0` : stats info about data-distribution
=======
- `\xff\xff/metrics/data_distribution_stats, \xff\xff/metrics/data_distribution_stats` : stats info about data-distribution
>>>>>>> master
- WORKERINTERFACE : `\xff\xff/worker_interfaces/, \xff\xff/worker_interfaces0`, which is compatible with the previous implementation and thus should not be used to add new functions.
In addition, all singleKeyRanges are formatted as modules and their key ranges cannot be reused. In particular, you should call `get`, not `getRange`, on these keys. Below are the existing ones:
- STATUSJSON : `\xff\xff/status/json`
- CONNECTIONSTRING : `\xff\xff/connection_string`
<<<<<<< HEAD
- CLUSTERFILEPATH : `\xff\xff/cluster_file_path`
=======
- CLUSTERFILEPATH : `\xff\xff/cluster_file_path`
>>>>>>> master


@@ -3465,16 +3465,6 @@ int main(int argc, char* argv[]) {
std::set_new_handler( &platform::outOfMemory );
setMemoryQuota( memLimit );
int total = 0;
for(auto i = Error::errorCounts().begin(); i != Error::errorCounts().end(); ++i)
total += i->second;
if (total)
printf("%d errors:\n", total);
for(auto i = Error::errorCounts().begin(); i != Error::errorCounts().end(); ++i)
if (i->second > 0)
printf(" %d: %d %s\n", i->second, i->first, Error::fromCode(i->first).what());
Reference<ClusterConnectionFile> ccf;
Database db;
Reference<ClusterConnectionFile> sourceCcf;


@@ -2523,7 +2523,11 @@ void throttleGenerator(const char* text, const char *line, std::vector<std::stri
const char* opts[] = { "all", "auto", "manual", "tag", "default", "immediate", "batch", nullptr };
arrayGenerator(text, line, opts, lc);
}
<<<<<<< HEAD
else if(tokens.size() == 2 && (tokencmp(tokens[1], "enable") || tokencmp(tokens[1], "disable"))) {
=======
else if(tokens.size() == 2 && tokencmp(tokens[1], "enable") || tokencmp(tokens[1], "disable")) {
>>>>>>> master
const char* opts[] = { "auto", nullptr };
arrayGenerator(text, line, opts, lc);
}
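The two sides of this conflict differ only in parentheses, and that changes the meaning: && binds tighter than ||, so master's unparenthesized condition parses as (tokens.size() == 2 && enable) || disable and can fire for any token count. A minimal demonstration with assumed toy values (compilers typically flag the first form with -Wparentheses):

#include <iostream>

int main() {
    int n = 3;  // not 2
    bool enable = false, disable = true;
    std::cout << (n == 2 && enable || disable) << "\n";     // 1: taken anyway
    std::cout << ((n == 2) && (enable || disable)) << "\n"; // 0: correctly skipped
}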


@@ -132,6 +132,13 @@ struct MutationRef {
};
};
template<>
struct Traceable<MutationRef> : std::true_type {
static std::string toString(MutationRef const& value) {
return value.toString();
}
};
static inline std::string getTypeString(MutationRef::Type type) {
return type < MutationRef::MAX_ATOMIC_OP ? typeString[(int)type] : "Unset";
}
@@ -206,7 +213,4 @@ struct CommitTransactionRef {
}
};
bool debugMutation( const char* context, Version version, MutationRef const& m );
bool debugKeyRange( const char* context, Version version, KeyRangeRef const& keyRange );
#endif


@@ -108,6 +108,13 @@ struct struct_like_traits<Tag> : std::true_type {
}
};
template<>
struct Traceable<Tag> : std::true_type {
static std::string toString(const Tag& value) {
return value.toString();
}
};
static const Tag invalidTag {tagLocalitySpecial, 0};
static const Tag txsTag {tagLocalitySpecial, 1};
static const Tag cacheTag {tagLocalitySpecial, 2};
@@ -222,11 +229,25 @@ std::string describe( std::vector<T> const& items, int max_items = -1 ) {
return describeList(items, max_items);
}
template<typename T>
struct Traceable<std::vector<T>> : std::true_type {
static std::string toString(const std::vector<T>& value) {
return describe(value);
}
};
template <class T>
std::string describe( std::set<T> const& items, int max_items = -1 ) {
return describeList(items, max_items);
}
template<typename T>
struct Traceable<std::set<T>> : std::true_type {
static std::string toString(const std::set<T>& value) {
return describe(value);
}
};
std::string printable( const StringRef& val );
std::string printable( const std::string& val );
std::string printable( const KeyRangeRef& range );
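These specializations let trace events stringify vectors and sets through describe(). A standalone sketch of the Traceable trait pattern with stub names (not flow's actual Trace.h): a type opts in by specializing the trait with a static toString, and generic code dispatches on it.

#include <iostream>
#include <string>
#include <type_traits>
#include <vector>

template <class T>
struct Traceable : std::false_type {};

template <>
struct Traceable<int> : std::true_type {
    static std::string toString(int v) { return std::to_string(v); }
};

// Container specialization, mirroring the vector/set ones added above.
template <class T>
struct Traceable<std::vector<T>> : std::true_type {
    static std::string toString(const std::vector<T>& v) {
        std::string s = "[";
        for (size_t i = 0; i < v.size(); ++i)
            s += (i ? " " : "") + Traceable<T>::toString(v[i]);
        return s + "]";
    }
};

template <class T>
std::string detail(const T& value) {
    static_assert(Traceable<T>::value, "type is not traceable");
    return Traceable<T>::toString(value);
}

int main() {
    std::vector<int> tags = {1, 2, 3};
    std::cout << detail(tags) << "\n";  // prints [1 2 3]
}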


@@ -316,7 +316,12 @@ Future<Standalone<RangeResultRef>> ConflictingKeysImpl::getRange(ReadYourWritesT
return result;
}
<<<<<<< HEAD
ACTOR Future<Standalone<RangeResultRef>> ddStatsGetRangeActor(ReadYourWritesTransaction* ryw, KeyRangeRef kr) {
=======
ACTOR Future<Standalone<RangeResultRef>> ddStatsGetRangeActor(Reference<ReadYourWritesTransaction> ryw,
KeyRangeRef kr) {
>>>>>>> master
try {
auto keys = kr.removePrefix(ddStatsRange.begin);
Standalone<VectorRef<DDMetricsRef>> resultWithoutPrefix =
@@ -341,7 +346,12 @@ ACTOR Future<Standalone<RangeResultRef>> ddStatsGetRangeActor(ReadYourWritesTran
DDStatsRangeImpl::DDStatsRangeImpl(KeyRangeRef kr) : SpecialKeyRangeBaseImpl(kr) {}
<<<<<<< HEAD
Future<Standalone<RangeResultRef>> DDStatsRangeImpl::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const {
=======
Future<Standalone<RangeResultRef>> DDStatsRangeImpl::getRange(Reference<ReadYourWritesTransaction> ryw,
KeyRangeRef kr) const {
>>>>>>> master
return ddStatsGetRangeActor(ryw, kr);
}


@@ -154,5 +154,12 @@ public:
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;
};
class DDStatsRangeImpl : public SpecialKeyRangeBaseImpl {
public:
explicit DDStatsRangeImpl(KeyRangeRef kr);
Future<Standalone<RangeResultRef>> getRange(Reference<ReadYourWritesTransaction> ryw,
KeyRangeRef kr) const override;
};
#include "flow/unactorcompiler.h"
#endif


@@ -47,7 +47,11 @@ const KeyRef keyServersKey( const KeyRef& k, Arena& arena ) {
}
const Value keyServersValue( Standalone<RangeResultRef> result, const std::vector<UID>& src, const std::vector<UID>& dest ) {
if(!CLIENT_KNOBS->TAG_ENCODE_KEY_SERVERS) {
<<<<<<< HEAD
BinaryWriter wr(IncludeVersion(ProtocolVersion::withKeyServerValue())); wr << src << dest;
=======
BinaryWriter wr(IncludeVersion()); wr << src << dest;
>>>>>>> master
return wr.toValue();
}


@@ -246,20 +246,13 @@ public:
// stream.send( request )
// Unreliable at most once delivery: Delivers request unless there is a connection failure (zero or one times)
void send(const T& value) const {
template<class U>
void send(U && value) const {
if (queue->isRemoteEndpoint()) {
FlowTransport::transport().sendUnreliable(SerializeSource<T>(value), getEndpoint(), true);
FlowTransport::transport().sendUnreliable(SerializeSource<T>(std::forward<U>(value)), getEndpoint(), true);
}
else
queue->send(value);
}
void send(T&& value) const {
if (queue->isRemoteEndpoint()) {
FlowTransport::transport().sendUnreliable(SerializeSource<T>(std::move(value)), getEndpoint(), true);
}
else
queue->send(std::move(value));
queue->send(std::forward<U>(value));
}
/*void sendError(const Error& error) const {
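This hunk collapses the separate send(const T&) and send(T&&) overloads into one forwarding member template, so lvalues are copied and rvalues are moved through a single code path. A minimal standalone sketch of the same refactor with assumed names (not FDB's RequestStream):

#include <iostream>
#include <string>
#include <utility>

struct Queue {
    void send(const std::string& v) { std::cout << "copy: " << v << "\n"; }
    void send(std::string&& v) { std::cout << "move: " << v << "\n"; }
};

struct Stream {
    Queue queue;
    template <class U>
    void send(U&& value) {
        // std::forward preserves the caller's value category.
        queue.send(std::forward<U>(value));
    }
};

int main() {
    Stream s;
    std::string msg = "hello";
    s.send(msg);             // U = std::string& -> copies
    s.send(std::move(msg));  // U = std::string  -> moves
}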


@@ -1650,15 +1650,8 @@ public:
this->currentProcess = t.machine;
try {
//auto before = getCPUTicks();
t.action.send(Void());
ASSERT( this->currentProcess == t.machine );
/*auto elapsed = getCPUTicks() - before;
currentProcess->cpuTicks += elapsed;
if (deterministicRandom()->random01() < 0.01){
TraceEvent("TaskDuration").detail("CpuTicks", currentProcess->cpuTicks);
currentProcess->cpuTicks = 0;
}*/
} catch (Error& e) {
TraceEvent(SevError, "UnhandledSimulationEventError").error(e, true);
killProcess(t.machine, KillInstantly);


@@ -58,7 +58,6 @@ public:
bool failed;
bool excluded;
bool cleared;
int64_t cpuTicks;
bool rebooting;
std::vector<flowGlobalType> globals;
@@ -68,12 +67,11 @@ public:
double fault_injection_p1, fault_injection_p2;
ProcessInfo(const char* name, LocalityData locality, ProcessClass startingClass, NetworkAddressList addresses,
INetworkConnections *net, const char* dataFolder, const char* coordinationFolder )
: name(name), locality(locality), startingClass(startingClass),
addresses(addresses), address(addresses.address), dataFolder(dataFolder),
network(net), coordinationFolder(coordinationFolder), failed(false), excluded(false), cpuTicks(0),
rebooting(false), fault_injection_p1(0), fault_injection_p2(0),
fault_injection_r(0), machine(0), cleared(false) {}
INetworkConnections* net, const char* dataFolder, const char* coordinationFolder)
: name(name), locality(locality), startingClass(startingClass), addresses(addresses),
address(addresses.address), dataFolder(dataFolder), network(net), coordinationFolder(coordinationFolder),
failed(false), excluded(false), rebooting(false), fault_injection_p1(0), fault_injection_p2(0),
fault_injection_r(0), machine(0), cleared(false) {}
Future<KillType> onShutdown() { return shutdownSignal.getFuture(); }


@@ -247,7 +247,10 @@ struct BackupData {
specialCounter(cc, "MinKnownCommittedVersion", [this]() { return this->minKnownCommittedVersion; });
specialCounter(cc, "MsgQ", [this]() { return this->messages.size(); });
specialCounter(cc, "BufferedBytes", [this]() { return this->lock->activePermits(); });
<<<<<<< HEAD
specialCounter(cc, "AvailableBytes", [this]() { return this->lock->available(); });
=======
>>>>>>> master
logger = traceCounters("BackupWorkerMetrics", myId, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc,
"BackupWorkerMetrics");
}
@@ -734,13 +737,11 @@ ACTOR Future<Void> saveMutationsToFile(BackupData* self, Version popVersion, int
MutationRef m;
if (!message.isBackupMessage(&m)) continue;
if (debugMutation("addMutation", message.version.version, m)) {
TraceEvent("BackupWorkerDebug", self->myId)
DEBUG_MUTATION("addMutation", message.version.version, m)
.detail("Version", message.version.toString())
.detail("Mutation", m.toString())
.detail("Mutation", m)
.detail("KCV", self->minKnownCommittedVersion)
.detail("SavedVersion", self->savedVersion);
}
std::vector<Future<Void>> adds;
if (m.type != MutationRef::Type::ClearRange) {
@@ -847,7 +848,11 @@ ACTOR Future<Void> uploadData(BackupData* self) {
}
// If transition into NOOP mode, should clear messages
<<<<<<< HEAD
if (!self->pulling && self->backupEpoch == self->recruitedEpoch) {
=======
if (!self->pulling) {
>>>>>>> master
self->eraseMessages(self->messages.size());
}


@@ -46,8 +46,10 @@ set(FDBSERVER_SRCS
MasterInterface.h
MasterProxyServer.actor.cpp
masterserver.actor.cpp
MoveKeys.actor.cpp
MutationTracking.h
MutationTracking.cpp
MoveKeys.actor.h
MoveKeys.actor.cpp
networktest.actor.cpp
NetworkTest.h
OldTLogServer_4_6.actor.cpp


@@ -27,6 +27,7 @@
#include "fdbserver/TLogInterface.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbclient/DatabaseConfiguration.h"
#include "fdbserver/MutationTracking.h"
#include "flow/IndexedSet.h"
#include "fdbrpc/ReplicationPolicy.h"
#include "fdbrpc/Locality.h"
@@ -877,16 +878,27 @@ struct LogPushData : NonCopyable {
msg_locations.clear();
logSystem->getPushLocations(prev_tags, msg_locations, allLocations);
BinaryWriter bw(AssumeVersion(currentProtocolVersion));
uint32_t subseq = this->subsequence++;
bool first = true;
int firstOffset=-1, firstLength=-1;
for(int loc : msg_locations) {
// FIXME: memcpy after the first time
BinaryWriter& wr = messagesWriter[loc];
int offset = wr.getLength();
wr << uint32_t(0) << subseq << uint16_t(prev_tags.size());
for(auto& tag : prev_tags)
wr << tag;
wr << item;
*(uint32_t*)((uint8_t*)wr.getData() + offset) = wr.getLength() - offset - sizeof(uint32_t);
if (first) {
BinaryWriter& wr = messagesWriter[loc];
firstOffset = wr.getLength();
wr << uint32_t(0) << subseq << uint16_t(prev_tags.size());
for(auto& tag : prev_tags)
wr << tag;
wr << item;
firstLength = wr.getLength() - firstOffset;
*(uint32_t*)((uint8_t*)wr.getData() + firstOffset) = firstLength - sizeof(uint32_t);
DEBUG_TAGS_AND_MESSAGE("ProxyPushLocations", invalidVersion, StringRef(((uint8_t*)wr.getData() + firstOffset), firstLength)).detail("PushLocations", msg_locations);
first = false;
} else {
BinaryWriter& wr = messagesWriter[loc];
BinaryWriter& from = messagesWriter[msg_locations[0]];
wr.serializeBytes( (uint8_t*)from.getData() + firstOffset, firstLength );
}
}
next_message_tags.clear();
}
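This change implements the old FIXME above: the subsequence/tags/message header is serialized once into the first location's writer, and every other location just copies the finished bytes. A standalone sketch of the pattern with toy buffers (assumed names, not FDB's BinaryWriter):

#include <string>
#include <vector>

int main() {
    std::vector<std::string> buffers(3);  // one buffer per location
    std::vector<int> locations = {0, 1, 2};

    const std::string payload = "tags+message";  // stand-in for the serialized message
    size_t firstOffset = 0, firstLength = 0;
    bool first = true;

    for (int loc : locations) {
        if (first) {
            firstOffset = buffers[loc].size();  // expensive serialization, done once
            buffers[loc] += payload;
            firstLength = buffers[loc].size() - firstOffset;
            first = false;
        } else {
            // Cheap path: copy the already-serialized bytes verbatim.
            buffers[loc].append(buffers[locations[0]], firstOffset, firstLength);
        }
    }
    return buffers[1] == payload ? 0 : 1;  // all locations end up identical
}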


@@ -21,6 +21,7 @@
#include "fdbserver/LogSystem.h"
#include "fdbrpc/FailureMonitor.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/MutationTracking.h"
#include "fdbrpc/ReplicationUtils.h"
#include "flow/actorcompiler.h" // has to be last include
@@ -90,6 +91,7 @@ void ILogSystem::ServerPeekCursor::nextMessage() {
}
messageAndTags.loadFromArena(&rd, &messageVersion.sub);
DEBUG_TAGS_AND_MESSAGE("ServerPeekCursor", messageVersion.version, messageAndTags.getRawMessage()).detail("CursorID", this->randomID);
// Rewind and consume the header so that reader() starts from the message.
rd.rewind();
rd.readBytes(messageAndTags.getHeaderSize());


@@ -38,6 +38,7 @@
#include "fdbserver/LogSystem.h"
#include "fdbserver/LogSystemDiskQueueAdapter.h"
#include "fdbserver/MasterInterface.h"
#include "fdbserver/MutationTracking.h"
#include "fdbserver/RecoveryState.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/WaitFailure.h"
@@ -759,7 +760,7 @@ ACTOR Future<Void> addBackupMutations(ProxyCommitData* self, std::map<Key, Mutat
toCommit->addTags(tags);
toCommit->addTypedMessage(backupMutation);
// if (debugMutation("BackupProxyCommit", commitVersion, backupMutation)) {
// if (DEBUG_MUTATION("BackupProxyCommit", commitVersion, backupMutation)) {
// TraceEvent("BackupProxyCommitTo", self->dbgid).detail("To", describe(tags)).detail("BackupMutation", backupMutation.toString())
// .detail("BackupMutationSize", val.size()).detail("Version", commitVersion).detail("DestPath", logRangeMutation.first)
// .detail("PartIndex", part).detail("PartIndexEndian", bigEndian32(part)).detail("PartData", backupMutation.param1);
@@ -1079,8 +1080,7 @@ ACTOR Future<Void> commitBatch(
self->singleKeyMutationEvent->log();
}
if (debugMutation("ProxyCommit", commitVersion, m))
TraceEvent("ProxyCommitTo", self->dbgid).detail("To", describe(tags)).detail("Mutation", m.toString()).detail("Version", commitVersion);
DEBUG_MUTATION("ProxyCommit", commitVersion, m).detail("Dbgid", self->dbgid).detail("To", tags).detail("Mutation", m);
toCommit.addTags(tags);
if(self->cacheInfo[m.param1]) {
@@ -1095,8 +1095,7 @@ ACTOR Future<Void> commitBatch(
++firstRange;
if (firstRange == ranges.end()) {
// Fast path
if (debugMutation("ProxyCommit", commitVersion, m))
TraceEvent("ProxyCommitTo", self->dbgid).detail("To", describe(ranges.begin().value().tags)).detail("Mutation", m.toString()).detail("Version", commitVersion);
DEBUG_MUTATION("ProxyCommit", commitVersion, m).detail("Dbgid", self->dbgid).detail("To", ranges.begin().value().tags).detail("Mutation", m);
ranges.begin().value().populateTags();
toCommit.addTags(ranges.begin().value().tags);
@@ -1108,8 +1107,7 @@ ACTOR Future<Void> commitBatch(
r.value().populateTags();
allSources.insert(r.value().tags.begin(), r.value().tags.end());
}
if (debugMutation("ProxyCommit", commitVersion, m))
TraceEvent("ProxyCommitTo", self->dbgid).detail("To", describe(allSources)).detail("Mutation", m.toString()).detail("Version", commitVersion);
DEBUG_MUTATION("ProxyCommit", commitVersion, m).detail("Dbgid", self->dbgid).detail("To", allSources).detail("Mutation", m);
toCommit.addTags(allSources);
}


@@ -0,0 +1,101 @@
/*
* MutationTracking.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <vector>
#include "fdbserver/MutationTracking.h"
#include "fdbserver/LogProtocolMessage.h"
#if defined(FDB_CLEAN_BUILD) && MUTATION_TRACKING_ENABLED
#error "You cannot use mutation tracking in a clean/release build."
#endif
// Track up to two keys in simulation by enabling MUTATION_TRACKING_ENABLED and setting the keys here.
StringRef debugKey = LiteralStringRef( "" );
StringRef debugKey2 = LiteralStringRef( "\xff\xff\xff\xff" );
TraceEvent debugMutationEnabled( const char* context, Version version, MutationRef const& mutation ) {
if ((mutation.type == mutation.ClearRange || mutation.type == mutation.DebugKeyRange) &&
((mutation.param1<=debugKey && mutation.param2>debugKey) || (mutation.param1<=debugKey2 && mutation.param2>debugKey2))) {
return std::move(TraceEvent("MutationTracking").detail("At", context).detail("Version", version).detail("MutationType", typeString[mutation.type]).detail("KeyBegin", mutation.param1).detail("KeyEnd", mutation.param2));
} else if (mutation.param1 == debugKey || mutation.param1 == debugKey2) {
return std::move(TraceEvent("MutationTracking").detail("At", context).detail("Version", version).detail("MutationType", typeString[mutation.type]).detail("Key", mutation.param1).detail("Value", mutation.param2));
} else {
return std::move(TraceEvent());
}
}
TraceEvent debugKeyRangeEnabled( const char* context, Version version, KeyRangeRef const& keys ) {
if (keys.contains(debugKey) || keys.contains(debugKey2)) {
return std::move(debugMutation(context, version, MutationRef(MutationRef::DebugKeyRange, keys.begin, keys.end) ));
} else {
return std::move(TraceEvent());
}
}
TraceEvent debugTagsAndMessageEnabled( const char* context, Version version, StringRef commitBlob ) {
BinaryReader rdr(commitBlob, AssumeVersion(currentProtocolVersion));
while (!rdr.empty()) {
if (*(int32_t*)rdr.peekBytes(4) == VERSION_HEADER) {
int32_t dummy;
rdr >> dummy >> version;
continue;
}
TagsAndMessage msg;
msg.loadFromArena(&rdr, nullptr);
bool logAdapterMessage = std::any_of(
msg.tags.begin(), msg.tags.end(), [](const Tag& t) { return t == txsTag || t.locality == tagLocalityTxs; });
StringRef mutationData = msg.getMessageWithoutTags();
uint8_t mutationType = *mutationData.begin();
if (logAdapterMessage) {
// Skip the message, as there will always be an identical non-logAdapterMessage mutation
// that we can match against in the same commit.
} else if (LogProtocolMessage::startsLogProtocolMessage(mutationType)) {
BinaryReader br(mutationData, AssumeVersion(rdr.protocolVersion()));
LogProtocolMessage lpm;
br >> lpm;
rdr.setProtocolVersion(br.protocolVersion());
} else {
MutationRef m;
BinaryReader br(mutationData, AssumeVersion(rdr.protocolVersion()));
br >> m;
TraceEvent&& event = debugMutation(context, version, m);
if (event.isEnabled()) {
return std::move(event.detail("MessageTags", msg.tags));
}
}
}
return std::move(TraceEvent());
}
#if MUTATION_TRACKING_ENABLED
TraceEvent debugMutation( const char* context, Version version, MutationRef const& mutation ) {
return debugMutationEnabled( context, version, mutation );
}
TraceEvent debugKeyRange( const char* context, Version version, KeyRangeRef const& keys ) {
return debugKeyRangeEnabled( context, version, keys );
}
TraceEvent debugTagsAndMessage( const char* context, Version version, StringRef commitBlob ) {
return debugTagsAndMessageEnabled( context, version, commitBlob );
}
#else
TraceEvent debugMutation( const char* context, Version version, MutationRef const& mutation ) { return std::move(TraceEvent()); }
TraceEvent debugKeyRange( const char* context, Version version, KeyRangeRef const& keys ) { return std::move(TraceEvent()); }
TraceEvent debugTagsAndMessage( const char* context, Version version, StringRef commitBlob ) { return std::move(TraceEvent()); }
#endif


@@ -0,0 +1,49 @@
/*
* MutationTracking.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef _FDBSERVER_MUTATIONTRACKING_H_
#define _FDBSERVER_MUTATIONTRACKING_H_
#pragma once
#include "fdbclient/FDBTypes.h"
#include "fdbclient/CommitTransaction.h"
#define MUTATION_TRACKING_ENABLED 0
// The keys to track are defined in the .cpp file to limit recompilation.
#define DEBUG_MUTATION(context, version, mutation) MUTATION_TRACKING_ENABLED && debugMutation(context, version, mutation)
TraceEvent debugMutation( const char* context, Version version, MutationRef const& mutation );
// debugKeyRange and debugTagsAndMessage only log the *first* occurrence of a key in their range/commit.
// TODO: Create a TraceEventGroup that forwards all calls to each element of a vector<TraceEvent>,
// to allow "multiple" TraceEvents to be returned.
#define DEBUG_KEY_RANGE(context, version, keys) MUTATION_TRACKING_ENABLED && debugKeyRange(context, version, keys)
TraceEvent debugKeyRange( const char* context, Version version, KeyRangeRef const& keys );
#define DEBUG_TAGS_AND_MESSAGE(context, version, commitBlob) MUTATION_TRACKING_ENABLED && debugTagsAndMessage(context, version, commitBlob)
TraceEvent debugTagsAndMessage( const char* context, Version version, StringRef commitBlob );
// TODO: Version Tracking. If the bug is in handling a version rather than a key, then it'd be good to be able to log each time
// that version is handled within simulation. A similar set of functions should be implemented.
#endif
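The macros above rely on `.` binding tighter than `&&`: DEBUG_MUTATION(...).detail(...) expands to MUTATION_TRACKING_ENABLED && debugMutation(...).detail(...), so when tracking is compiled out the event and its entire detail chain are never evaluated. A standalone sketch of the trick with a stub event type (assumed names, not flow's TraceEvent):

#include <iostream>
#include <string>

struct Event {
    bool enabled;
    Event& detail(const std::string& k, const std::string& v) {
        if (enabled) std::cout << "  " << k << "=" << v << "\n";
        return *this;
    }
    operator bool() const { return enabled; }  // lets the event terminate the && expression
};

Event debugEvent(const char* context) {
    std::cout << "event at " << context << "\n";
    return Event{true};
}

#define TRACKING_ENABLED 1
#define DEBUG_EVENT(context) TRACKING_ENABLED && debugEvent(context)

int main() {
    // Expands to: 1 && debugEvent("commit").detail("Key", "k1");
    // with TRACKING_ENABLED 0, debugEvent is never even called.
    DEBUG_EVENT("commit").detail("Key", "k1");
}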


@@ -36,6 +36,7 @@
#include "fdbrpc/Locality.h"
#include "fdbserver/CoordinationInterface.h"
#include "fdbclient/RestoreWorkerInterface.actor.h"
#include "fdbserver/MutationTracking.h"
#include "fdbserver/RestoreUtil.h"
#include "fdbserver/RestoreRoleCommon.actor.h"
@@ -60,19 +61,17 @@ struct StagingKey {
// Assume: SetVersionstampedKey and SetVersionstampedValue have been converted to set
void add(const MutationRef& m, LogMessageVersion newVersion) {
ASSERT(m.type != MutationRef::SetVersionstampedKey && m.type != MutationRef::SetVersionstampedValue);
if (debugMutation("StagingKeyAdd", newVersion.version, m)) {
TraceEvent("StagingKeyAdd")
.detail("Version", version.toString())
.detail("NewVersion", newVersion.toString())
.detail("Mutation", m.toString());
}
DEBUG_MUTATION("StagingKeyAdd", newVersion.version, m)
.detail("Version", version.toString())
.detail("NewVersion", newVersion.toString())
.detail("Mutation", m);
if (version == newVersion) {
// This could happen because the same mutation can be present in
// overlapping mutation logs, because new TLogs can copy mutations
// from old generation TLogs (or backup worker is recruited without
// knowing previously saved progress).
ASSERT(type == m.type && key == m.param1 && val == m.param2);
TraceEvent("SameVersion").detail("Version", version.toString()).detail("Mutation", m.toString());
TraceEvent("SameVersion").detail("Version", version.toString()).detail("Mutation", m);
return;
}
@@ -84,15 +83,13 @@ struct StagingKey {
ASSERT(m.param1 == m.param2);
}
if (version < newVersion) {
if (debugMutation("StagingKeyAdd", newVersion.version, m)) {
TraceEvent("StagingKeyAdd")
DEBUG_MUTATION("StagingKeyAdd", newVersion.version, m)
.detail("Version", version.toString())
.detail("NewVersion", newVersion.toString())
.detail("MType", getTypeString(type))
.detail("Key", key)
.detail("Val", val)
.detail("NewMutation", m.toString());
}
key = m.param1;
val = m.param2;
type = (MutationRef::Type)m.type;
@@ -108,8 +105,8 @@ struct StagingKey {
TraceEvent("SameVersion")
.detail("Version", version.toString())
.detail("NewVersion", newVersion.toString())
.detail("OldMutation", it->second.toString())
.detail("NewMutation", m.toString());
.detail("OldMutation", it->second)
.detail("NewMutation", m);
ASSERT(it->second.type == m.type && it->second.param1 == m.param1 && it->second.param2 == m.param2);
}
}


@@ -26,6 +26,7 @@
#include "fdbclient/BackupAgent.actor.h"
#include "fdbserver/RestoreLoader.actor.h"
#include "fdbserver/RestoreRoleCommon.actor.h"
#include "fdbserver/MutationTracking.h"
#include "flow/actorcompiler.h" // This must be the last #include.
@@ -508,23 +509,23 @@ ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat
nodeIDs.contents());
ASSERT(mvector.size() == nodeIDs.size());
if (debugMutation("RestoreLoader", commitVersion.version, kvm)) {
TraceEvent e("DebugSplit");
int i = 0;
for (auto& [key, uid] : *pRangeToApplier) {
e.detail(format("Range%d", i).c_str(), printable(key))
.detail(format("UID%d", i).c_str(), uid.toString());
i++;
if (MUTATION_TRACKING_ENABLED) {
TraceEvent&& e = debugMutation("RestoreLoaderDebugSplit", commitVersion.version, kvm);
if (e.isEnabled()) {
int i = 0;
for (auto& [key, uid] : *pRangeToApplier) {
e.detail(format("Range%d", i).c_str(), printable(key))
.detail(format("UID%d", i).c_str(), uid.toString());
i++;
}
}
}
for (splitMutationIndex = 0; splitMutationIndex < mvector.size(); splitMutationIndex++) {
MutationRef mutation = mvector[splitMutationIndex];
UID applierID = nodeIDs[splitMutationIndex];
if (debugMutation("RestoreLoader", commitVersion.version, mutation)) {
TraceEvent("SplittedMutation")
.detail("Version", commitVersion.toString())
.detail("Mutation", mutation.toString());
}
DEBUG_MUTATION("RestoreLoaderSplittedMutation", commitVersion.version, mutation)
.detail("Version", commitVersion.toString())
.detail("Mutation", mutation);
// CAREFUL: The split mutations' lifetime is shorter than the for-loop;
// we must use deep copy for the split mutations.
applierVersionedMutationsBuffer[applierID].push_back_deep(
@@ -540,12 +541,10 @@ ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat
UID applierID = itlow->second;
kvCount++;
if (debugMutation("RestoreLoader", commitVersion.version, kvm)) {
TraceEvent("SendMutation")
.detail("Applier", applierID)
.detail("Version", commitVersion.toString())
.detail("Mutation", kvm.toString());
}
DEBUG_MUTATION("RestoreLoaderSendMutation", commitVersion.version, kvm)
.detail("Applier", applierID)
.detail("Version", commitVersion.toString())
.detail("Mutation", kvm);
// kvm data is saved in pkvOps in batchData, so shallow copy is ok here.
applierVersionedMutationsBuffer[applierID].push_back(applierVersionedMutationsBuffer[applierID].arena(),
VersionedMutation(kvm, commitVersion));
@@ -1057,4 +1056,4 @@ TEST_CASE("/FastRestore/RestoreLoader/splitMutation") {
}
return Void();
}
}


@@ -76,4 +76,4 @@ bool isRangeMutation(MutationRef m) {
ASSERT(m.type == MutationRef::Type::SetValue || isAtomicOp((MutationRef::Type)m.type));
return false;
}
}
}


@@ -26,6 +26,7 @@
#include "fdbclient/Atomic.h"
#include "fdbclient/Notified.h"
#include "fdbserver/LogSystem.h"
#include "fdbserver/MutationTracking.h"
#include "fdbserver/WaitFailure.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
@@ -267,8 +268,8 @@ ACTOR Future<Void> getValueQ( StorageCacheData* data, GetValueRequest req ) {
path = 1;
}
//debugMutation("CacheGetValue", version, MutationRef(MutationRef::DebugKey, req.key, v.present()?v.get():LiteralStringRef("<null>")));
//debugMutation("CacheGetPath", version, MutationRef(MutationRef::DebugKey, req.key, path==0?LiteralStringRef("0"):path==1?LiteralStringRef("1"):LiteralStringRef("2")));
//DEBUG_MUTATION("CacheGetValue", version, MutationRef(MutationRef::DebugKey, req.key, v.present()?v.get():LiteralStringRef("<null>")));
//DEBUG_MUTATION("CacheGetPath", version, MutationRef(MutationRef::DebugKey, req.key, path==0?LiteralStringRef("0"):path==1?LiteralStringRef("1"):LiteralStringRef("2")));
if (v.present()) {
++data->counters.rowsQueried;
@@ -710,22 +711,10 @@ void StorageCacheData::addMutation(KeyRangeRef const& cachedKeyRange, Version ve
return;
}
expanded = addMutationToMutationLog(mLog, expanded);
if (debugMutation("expandedMutation", version, expanded)) {
const char* type =
mutation.type == MutationRef::SetValue ? "SetValue" :
mutation.type == MutationRef::ClearRange ? "ClearRange" :
mutation.type == MutationRef::DebugKeyRange ? "DebugKeyRange" :
mutation.type == MutationRef::DebugKey ? "DebugKey" :
"UnknownMutation";
printf("DEBUGMUTATION:\t%.6f\t%s\t%s\t%s\t%s\t%s\n",
now(), g_network->getLocalAddress().toString().c_str(), "originalMutation",
type, printable(mutation.param1).c_str(), printable(mutation.param2).c_str());
printf(" Cached Key-range: %s - %s\n", printable(cachedKeyRange.begin).c_str(), printable(cachedKeyRange.end).c_str());
}
DEBUG_MUTATION("expandedMutation", version, expanded).detail("Begin", cachedKeyRange.begin).detail("End", cachedKeyRange.end);
applyMutation( this, expanded, mLog.arena(), mutableData() );
printf("\nSCUpdate: Printing versioned tree after applying mutation\n");
mutableData().printTree(version);
}
// Helper class for updating the storage cache (i.e. applying mutations)
@@ -742,15 +731,11 @@ public:
data->mutableData().createNewVersion(ver);
}
DEBUG_MUTATION("SCUpdateMutation", ver, m);
if (m.param1.startsWith( systemKeys.end )) {
//TraceEvent("PrivateData", data->thisServerID).detail("Mutation", m.toString()).detail("Version", ver);
applyPrivateCacheData( data, m );
} else {
// FIXME: enable when debugMutation is active
//for(auto m = changes[c].mutations.begin(); m; ++m) {
// debugMutation("SCUpdateMutation", changes[c].version, *m);
//}
splitMutation(data, data->cachedRangeMap, m, ver);
}
@@ -768,7 +753,7 @@ private:
//that this cache server is responsible for
// TODO Revisit during failure handling. Might we lose some private mutations?
void applyPrivateCacheData( StorageCacheData* data, MutationRef const& m ) {
TraceEvent(SevDebug, "SCPrivateCacheMutation", data->thisServerID).detail("Mutation", m.toString());
TraceEvent(SevDebug, "SCPrivateCacheMutation", data->thisServerID).detail("Mutation", m);
if (processedCacheStartKey) {
// we expect changes in pairs, [begin,end). This mutation is for end key of the range
@@ -914,7 +899,7 @@ ACTOR Future<Void> pullAsyncData( StorageCacheData *data ) {
}
if(ver != invalidVersion && ver > data->version.get()) {
debugKeyRange("SCUpdate", ver, allKeys);
DEBUG_KEY_RANGE("SCUpdate", ver, allKeys);
data->mutableData().createNewVersion(ver);


@@ -31,6 +31,7 @@
#include "fdbserver/TLogInterface.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/IKeyValueStore.h"
#include "fdbserver/MutationTracking.h"
#include "flow/ActorCollection.h"
#include "fdbrpc/FailureMonitor.h"
#include "fdbserver/IDiskQueue.h"
@@ -1260,6 +1261,7 @@ void commitMessages( TLogData* self, Reference<LogData> logData, Version version
block.reserve(block.arena(), std::max<int64_t>(SERVER_KNOBS->TLOG_MESSAGE_BLOCK_BYTES, msgSize));
}
DEBUG_TAGS_AND_MESSAGE("TLogCommitMessages", version, msg.getRawMessage()).detail("UID", self->dbgid).detail("LogId", logData->logId);
block.append(block.arena(), msg.message.begin(), msg.message.size());
for(auto tag : msg.tags) {
if(logData->locality == tagLocalitySatellite) {
@@ -1374,7 +1376,12 @@ void peekMessagesFromMemory( Reference<LogData> self, TLogPeekRequest const& req
messages << VERSION_HEADER << currentVersion;
}
// We need the 4 byte length prefix to be a TagsAndMessage format, but that prefix is added as part of StringRef serialization.
int offset = messages.getLength();
messages << it->second.toStringRef();
void* data = messages.getData();
DEBUG_TAGS_AND_MESSAGE("TLogPeek", currentVersion, StringRef((uint8_t*)data+offset, messages.getLength()-offset))
.detail("LogId", self->logId).detail("PeekTag", req.tag);
}
}
@@ -1636,6 +1643,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
wait(parseMessagesForTag(entry.messages, req.tag, logData->logRouterTags));
for (const StringRef& msg : rawMessages) {
messages.serializeBytes(msg);
DEBUG_TAGS_AND_MESSAGE("TLogPeekFromDisk", entry.version, msg).detail("UID", self->dbgid).detail("LogId", logData->logId).detail("PeekTag", req.tag);
}
lastRefMessageVersion = entry.version;


@@ -619,26 +619,6 @@ struct EventLogRequest {
}
};
struct DebugEntryRef {
double time;
NetworkAddress address;
StringRef context;
Version version;
MutationRef mutation;
DebugEntryRef() {}
DebugEntryRef( const char* c, Version v, MutationRef const& m ) : context((const uint8_t*)c,strlen(c)), version(v), mutation(m), time(now()), address( g_network->getLocalAddress() ) {}
DebugEntryRef( Arena& a, DebugEntryRef const& d ) : time(d.time), address(d.address), context(d.context), version(d.version), mutation(a, d.mutation) {}
size_t expectedSize() const {
return context.expectedSize() + mutation.expectedSize();
}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, time, address, context, version, mutation);
}
};
struct DiskStoreRequest {
constexpr static FileIdentifier file_identifier = 1986262;
bool includePartialStores;


@@ -196,63 +196,6 @@ bool enableFailures = true;
#define test_assert(x) if (!(x)) { cout << "Test failed: " #x << endl; return false; }
vector< Standalone<VectorRef<DebugEntryRef>> > debugEntries;
int64_t totalDebugEntriesSize = 0;
#if CENABLED(0, NOT_IN_CLEAN)
StringRef debugKey = LiteralStringRef("");
StringRef debugKey2 = LiteralStringRef("\xff\xff\xff\xff");
bool debugMutation( const char* context, Version version, MutationRef const& mutation ) {
if ((mutation.type == mutation.SetValue || mutation.type == mutation.AddValue || mutation.type==mutation.DebugKey) && (mutation.param1 == debugKey || mutation.param1 == debugKey2))
;//TraceEvent("MutationTracking").detail("At", context).detail("Version", version).detail("MutationType", "SetValue").detail("Key", mutation.param1).detail("Value", mutation.param2);
else if ((mutation.type == mutation.ClearRange || mutation.type == mutation.DebugKeyRange) && ((mutation.param1<=debugKey && mutation.param2>debugKey) || (mutation.param1<=debugKey2 && mutation.param2>debugKey2)))
;//TraceEvent("MutationTracking").detail("At", context).detail("Version", version).detail("MutationType", "ClearRange").detail("KeyBegin", mutation.param1).detail("KeyEnd", mutation.param2);
else
return false;
const char* type =
mutation.type == MutationRef::SetValue ? "SetValue" :
mutation.type == MutationRef::ClearRange ? "ClearRange" :
mutation.type == MutationRef::AddValue ? "AddValue" :
mutation.type == MutationRef::DebugKeyRange ? "DebugKeyRange" :
mutation.type == MutationRef::DebugKey ? "DebugKey" :
"UnknownMutation";
printf("DEBUGMUTATION:\t%.6f\t%s\t%s\t%lld\t%s\t%s\t%s\n", now(), g_network->getLocalAddress().toString().c_str(), context, version, type, printable(mutation.param1).c_str(), printable(mutation.param2).c_str());
return true;
}
bool debugKeyRange( const char* context, Version version, KeyRangeRef const& keys ) {
if (keys.contains(debugKey) || keys.contains(debugKey2)) {
debugMutation(context, version, MutationRef(MutationRef::DebugKeyRange, keys.begin, keys.end) );
//TraceEvent("MutationTracking").detail("At", context).detail("Version", version).detail("KeyBegin", keys.begin).detail("KeyEnd", keys.end);
return true;
} else
return false;
}
#elif CENABLED(0, NOT_IN_CLEAN)
bool debugMutation( const char* context, Version version, MutationRef const& mutation ) {
if (!debugEntries.size() || debugEntries.back().size() >= 1000) {
if (debugEntries.size()) totalDebugEntriesSize += debugEntries.back().arena().getSize() + sizeof(debugEntries.back());
debugEntries.push_back(Standalone<VectorRef<DebugEntryRef>>());
TraceEvent("DebugMutationBuffer").detail("Bytes", totalDebugEntriesSize);
}
auto& v = debugEntries.back();
v.push_back_deep( v.arena(), DebugEntryRef(context, version, mutation) );
return false; // No auxiliary logging
}
bool debugKeyRange( const char* context, Version version, KeyRangeRef const& keys ) {
return debugMutation( context, version, MutationRef(MutationRef::DebugKeyRange, keys.begin, keys.end) );
}
#else // Default implementation.
bool debugMutation( const char* context, Version version, MutationRef const& mutation ) { return false; }
bool debugKeyRange( const char* context, Version version, KeyRangeRef const& keys ) { return false; }
#endif
#ifdef _WIN32
#include <sddl.h>
@ -1978,20 +1921,6 @@ int main(int argc, char* argv[]) {
cout << " " << i->second << " " << i->first << endl;*/
// cout << " " << Actor::allActors[i]->getName() << endl;
int total = 0;
for(auto i = Error::errorCounts().begin(); i != Error::errorCounts().end(); ++i)
total += i->second;
if (total)
printf("%d errors:\n", total);
for(auto i = Error::errorCounts().begin(); i != Error::errorCounts().end(); ++i)
if (i->second > 0)
printf(" %d: %d %s\n", i->second, i->first, Error::fromCode(i->first).what());
if (&g_simulator == g_network) {
auto processes = g_simulator.getAllProcesses();
for(auto i = processes.begin(); i != processes.end(); ++i)
printf("%s %s: %0.3f Mclocks\n", (*i)->name, (*i)->address.toString().c_str(), (*i)->cpuTicks / 1e6);
}
if (role == Simulation) {
unsigned long sevErrorEventsLogged = TraceEvent::CountEventsLoggedAt(SevError);
if (sevErrorEventsLogged > 0) {

View File

@ -42,6 +42,7 @@
#include "fdbserver/LogProtocolMessage.h"
#include "fdbserver/LogSystem.h"
#include "fdbserver/MoveKeys.actor.h"
#include "fdbserver/MutationTracking.h"
#include "fdbserver/RecoveryState.h"
#include "fdbserver/StorageMetrics.h"
#include "fdbserver/ServerDBInfo.h"
@ -951,8 +952,8 @@ ACTOR Future<Void> getValueQ( StorageServer* data, GetValueRequest req ) {
v = vv;
}
debugMutation("ShardGetValue", version, MutationRef(MutationRef::DebugKey, req.key, v.present()?v.get():LiteralStringRef("<null>")));
debugMutation("ShardGetPath", version, MutationRef(MutationRef::DebugKey, req.key, path==0?LiteralStringRef("0"):path==1?LiteralStringRef("1"):LiteralStringRef("2")));
DEBUG_MUTATION("ShardGetValue", version, MutationRef(MutationRef::DebugKey, req.key, v.present()?v.get():LiteralStringRef("<null>")));
DEBUG_MUTATION("ShardGetPath", version, MutationRef(MutationRef::DebugKey, req.key, path==0?LiteralStringRef("0"):path==1?LiteralStringRef("1"):LiteralStringRef("2")));
/*
StorageMetrics m;
@ -1026,7 +1027,7 @@ ACTOR Future<Void> watchValue_impl( StorageServer* data, WatchValueRequest req )
throw reply.error.get();
}
debugMutation("ShardWatchValue", latest, MutationRef(MutationRef::DebugKey, req.key, reply.value.present() ? StringRef( reply.value.get() ) : LiteralStringRef("<null>") ) );
DEBUG_MUTATION("ShardWatchValue", latest, MutationRef(MutationRef::DebugKey, req.key, reply.value.present() ? StringRef( reply.value.get() ) : LiteralStringRef("<null>") ) );
if( req.debugID.present() )
g_traceBatch.addEvent("WatchValueDebug", req.debugID.get().first(), "watchValueQ.AfterRead"); //.detail("TaskID", g_network->getCurrentTask());
@ -2082,7 +2083,7 @@ ACTOR Future<Void> fetchKeys( StorageServer *data, AddingShard* shard ) {
wait( data->coreStarted.getFuture() && delay( 0 ) );
try {
debugKeyRange("fetchKeysBegin", data->version.get(), shard->keys);
DEBUG_KEY_RANGE("fetchKeysBegin", data->version.get(), shard->keys);
TraceEvent(SevDebug, interval.begin(), data->thisServerID)
.detail("KeyBegin", shard->keys.begin)
@ -2150,8 +2151,8 @@ ACTOR Future<Void> fetchKeys( StorageServer *data, AddingShard* shard ) {
.detail("KeyBegin", keys.begin).detail("KeyEnd", keys.end)
.detail("Last", this_block.size() ? this_block.end()[-1].key : std::string())
.detail("Version", fetchVersion).detail("More", this_block.more);
debugKeyRange("fetchRange", fetchVersion, keys);
for(auto k = this_block.begin(); k != this_block.end(); ++k) debugMutation("fetch", fetchVersion, MutationRef(MutationRef::SetValue, k->key, k->value));
DEBUG_KEY_RANGE("fetchRange", fetchVersion, keys);
for(auto k = this_block.begin(); k != this_block.end(); ++k) DEBUG_MUTATION("fetch", fetchVersion, MutationRef(MutationRef::SetValue, k->key, k->value));
data->counters.bytesFetched += expectedSize;
if( fetchBlockBytes > expectedSize ) {
@ -2298,7 +2299,7 @@ ACTOR Future<Void> fetchKeys( StorageServer *data, AddingShard* shard ) {
ASSERT( b->version >= checkv );
checkv = b->version;
for(auto& m : b->mutations)
debugMutation("fetchKeysFinalCommitInject", batch->changes[0].version, m);
DEBUG_MUTATION("fetchKeysFinalCommitInject", batch->changes[0].version, m);
}
shard->updates.clear();
@ -2407,7 +2408,8 @@ void changeServerKeys( StorageServer* data, const KeyRangeRef& keys, bool nowAss
// .detail("Context", changeServerKeysContextName[(int)context]);
validate(data);
debugKeyRange( nowAssigned ? "KeysAssigned" : "KeysUnassigned", version, keys );
// TODO(alexmiller): Figure out how to selectively enable spammy data distribution events.
//DEBUG_KEY_RANGE( nowAssigned ? "KeysAssigned" : "KeysUnassigned", version, keys );
bool isDifferent = false;
auto existingShards = data->shards.intersectingRanges(keys);
@ -2512,7 +2514,7 @@ void changeServerKeys( StorageServer* data, const KeyRangeRef& keys, bool nowAss
void rollback( StorageServer* data, Version rollbackVersion, Version nextVersion ) {
TEST(true); // call to shard rollback
debugKeyRange("Rollback", rollbackVersion, allKeys);
DEBUG_KEY_RANGE("Rollback", rollbackVersion, allKeys);
// We used to do a complicated dance to roll back in MVCC history. It's much simpler, and more testable,
// to simply restart the storage server actor and restore from the persistent disk state, and then roll
@ -2533,18 +2535,7 @@ void StorageServer::addMutation(Version version, MutationRef const& mutation, Ke
return;
}
expanded = addMutationToMutationLog(mLog, expanded);
if (debugMutation("expandedMutation", version, expanded)) {
const char* type =
mutation.type == MutationRef::SetValue ? "SetValue" :
mutation.type == MutationRef::ClearRange ? "ClearRange" :
mutation.type == MutationRef::DebugKeyRange ? "DebugKeyRange" :
mutation.type == MutationRef::DebugKey ? "DebugKey" :
"UnknownMutation";
printf("DEBUGMUTATION:\t%.6f\t%s\t%s\t%" PRId64 "\t%s\t%s\t%s\n", now(), g_network->getLocalAddress().toString().c_str(), "originalMutation", version, type, printable(mutation.param1).c_str(), printable(mutation.param2).c_str());
printf(" shard: %s - %s\n", printable(shard.begin).c_str(), printable(shard.end).c_str());
if (mutation.type == MutationRef::ClearRange && mutation.param2 != shard.end)
printf(" eager: %s\n", printable( eagerReads->getKeyEnd( mutation.param2 ) ).c_str() );
}
DEBUG_MUTATION("applyMutation", version, expanded).detail("UID", thisServerID).detail("ShardBegin", shard.begin).detail("ShardEnd", shard.end);
applyMutation( this, expanded, mLog.arena(), mutableData() );
//printf("\nSSUpdate: Printing versioned tree after applying mutation\n");
//mutableData().printTree(version);
@ -2597,9 +2588,9 @@ public:
applyPrivateData( data, m );
}
} else {
// FIXME: enable when debugMutation is active
// FIXME: enable when DEBUG_MUTATION is active
//for(auto m = changes[c].mutations.begin(); m; ++m) {
// debugMutation("SSUpdateMutation", changes[c].version, *m);
// DEBUG_MUTATION("SSUpdateMutation", changes[c].version, *m);
//}
splitMutation(data, data->shards, m, ver);
@ -2885,7 +2876,8 @@ ACTOR Future<Void> update( StorageServer* data, bool* pReceivedUpdate )
rd >> msg;
if (ver != invalidVersion) { // This change belongs to a version < minVersion
if (debugMutation("SSPeek", ver, msg) || ver == 1) {
DEBUG_MUTATION("SSPeek", ver, msg).detail("ServerID", data->thisServerID);
if (ver == 1) {
TraceEvent("SSPeekMutation", data->thisServerID);
// The following trace event may produce a value with special characters
//TraceEvent("SSPeekMutation", data->thisServerID).detail("Mutation", msg.toString()).detail("Version", cloneCursor2->version().toString());
@ -2938,7 +2930,8 @@ ACTOR Future<Void> update( StorageServer* data, bool* pReceivedUpdate )
}
if(ver != invalidVersion && ver > data->version.get()) {
debugKeyRange("SSUpdate", ver, allKeys);
// TODO(alexmiller): Update to version tracking.
DEBUG_KEY_RANGE("SSUpdate", ver, KeyRangeRef());
data->mutableData().createNewVersion(ver);
if (data->otherError.getFuture().isReady()) data->otherError.getFuture().get();
@ -3141,7 +3134,7 @@ void StorageServerDisk::writeKeyValue( KeyValueRef kv ) {
}
void StorageServerDisk::writeMutation( MutationRef mutation ) {
// FIXME: debugMutation(debugContext, debugVersion, *m);
// FIXME: DEBUG_MUTATION(debugContext, debugVersion, *m);
if (mutation.type == MutationRef::SetValue) {
storage->set( KeyValueRef(mutation.param1, mutation.param2) );
} else if (mutation.type == MutationRef::ClearRange) {
@ -3152,7 +3145,7 @@ void StorageServerDisk::writeMutation( MutationRef mutation ) {
void StorageServerDisk::writeMutations( MutationListRef mutations, Version debugVersion, const char* debugContext ) {
for(auto m = mutations.begin(); m; ++m) {
debugMutation(debugContext, debugVersion, *m);
DEBUG_MUTATION(debugContext, debugVersion, *m).detail("UID", data->thisServerID);
if (m->type == MutationRef::SetValue) {
storage->set( KeyValueRef(m->param1, m->param2) );
} else if (m->type == MutationRef::ClearRange) {
@ -3169,7 +3162,8 @@ bool StorageServerDisk::makeVersionMutationsDurable( Version& prevStorageVersion
if (u != data->getMutationLog().end() && u->first <= newStorageVersion) {
VersionUpdateRef const& v = u->second;
ASSERT( v.version > prevStorageVersion && v.version <= newStorageVersion );
debugKeyRange("makeVersionMutationsDurable", v.version, allKeys);
// TODO(alexmiller): Update to version tracking.
DEBUG_KEY_RANGE("makeVersionMutationsDurable", v.version, KeyRangeRef());
writeMutations(v.mutations, v.version, "makeVersionDurable");
for(auto m=v.mutations.begin(); m; ++m)
bytesLeft -= mvccStorageBytes(*m);
@ -3355,7 +3349,8 @@ ACTOR Future<bool> restoreDurableState( StorageServer* data, IKeyValueStore* sto
for(auto it = data->newestAvailableVersion.ranges().begin(); it != data->newestAvailableVersion.ranges().end(); ++it) {
if (it->value() == invalidVersion) {
KeyRangeRef clearRange(it->begin(), it->end());
debugKeyRange("clearInvalidVersion", invalidVersion, clearRange);
// TODO(alexmiller): Figure out how to selectively enable spammy data distribution events.
//DEBUG_KEY_RANGE("clearInvalidVersion", invalidVersion, clearRange);
storage->clear( clearRange );
data->byteSampleApplyClear( clearRange, invalidVersion );
}
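
The sweep from boolean debugMutation(...) to DEBUG_MUTATION(...).detail(...) throughout this file suggests the macro now yields a TraceEvent so call sites can chain details, while still compiling down to nothing when mutation tracking is off. A hypothetical, self-contained sketch of that shape (not the actual fdbserver/MutationTracking.h):

#include <cstdio>
#include <string>

// Stand-in for TraceEvent: chainable .detail() that no-ops when disabled.
struct DebugEvent {
	bool enabled;
	DebugEvent& detail(const char* k, const std::string& v) {
		if (enabled) std::printf("  %s=%s\n", k, v.c_str());
		return *this;
	}
};

inline DebugEvent debugMutationEvent(const char* context, long version) {
	std::printf("%s @ %ld\n", context, version);
	return DebugEvent{ true };
}

#define MUTATION_TRACKING_ENABLED 0
#if MUTATION_TRACKING_ENABLED
#define DEBUG_MUTATION(context, version) debugMutationEvent(context, version)
#else
// Dead-branch trick: the .detail() chain still type-checks but never runs.
#define DEBUG_MUTATION(context, version) \
	if (false) debugMutationEvent(context, version)
#endif

int main() {
	DEBUG_MUTATION("applyMutation", 42).detail("ShardBegin", "a").detail("ShardEnd", "z");
	return 0;
}

One hazard of the if (false) expansion is the dangling-else it can create inside a caller's unbraced if; the call sites in the hunks above all use the macro as a standalone statement, where it is safe.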

View File

@ -20,6 +20,7 @@
#include "fdbserver/QuietDatabase.h"
#include "fdbserver/MutationTracking.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbserver/workloads/ApiWorkload.h"
#include "fdbserver/workloads/MemoryKeyValueStore.h"
@ -328,7 +329,7 @@ public:
wait(transaction->commit());
for(int i = currentIndex; i < std::min(currentIndex + self->maxKeysPerTransaction, data.size()); i++)
debugMutation("ApiCorrectnessSet", transaction->getCommittedVersion(), MutationRef(MutationRef::DebugKey, data[i].key, data[i].value));
DEBUG_MUTATION("ApiCorrectnessSet", transaction->getCommittedVersion(), MutationRef(MutationRef::DebugKey, data[i].key, data[i].value));
currentIndex += self->maxKeysPerTransaction;
break;
@ -660,7 +661,7 @@ public:
wait(transaction->commit());
for(int i = currentIndex; i < std::min(currentIndex + self->maxKeysPerTransaction, keys.size()); i++)
debugMutation("ApiCorrectnessClear", transaction->getCommittedVersion(), MutationRef(MutationRef::DebugKey, keys[i], StringRef()));
DEBUG_MUTATION("ApiCorrectnessClear", transaction->getCommittedVersion(), MutationRef(MutationRef::DebugKey, keys[i], StringRef()));
currentIndex += self->maxKeysPerTransaction;
break;
@ -711,7 +712,7 @@ public:
}
transaction->clear(range);
wait(transaction->commit());
debugKeyRange("ApiCorrectnessClear", transaction->getCommittedVersion(), range);
DEBUG_KEY_RANGE("ApiCorrectnessClear", transaction->getCommittedVersion(), range);
break;
}
catch(Error &e) {

View File

@ -28,8 +28,13 @@
#include <stdint.h>
#include <time.h>
<<<<<<< HEAD
#include "..\flow\SimpleOpt.h"
#include "..\fdbmonitor\SimpleIni.h"
=======
#include "flow/SimpleOpt.h"
#include "fdbmonitor/SimpleIni.h"
>>>>>>> master
#include "fdbclient/versions.h"
// For PathFileExists

View File

@ -41,21 +41,25 @@ public:
Deque() : arr(0), begin(0), end(0), mask(-1) {}
// TODO: iterator construction, other constructors
Deque(Deque const& r) : arr(0), begin(0), end(r.size()), mask(r.mask) {
Deque(Deque const& r) : arr(nullptr), begin(0), end(r.size()), mask(r.mask) {
if (r.capacity() > 0) {
arr = (T*)aligned_alloc(std::max(__alignof(T), sizeof(void*)), capacity() * sizeof(T));
ASSERT(arr != nullptr);
}
ASSERT(capacity() >= end || end == 0);
for (uint32_t i=0; i<end; i++)
new (&arr[i]) T(r[i]);
// FIXME: Specialization for POD types using memcpy?
if (r.end >= r.begin) {
std::copy(r.arr + r.begin, r.arr + r.begin + r.size(), arr);
} else {
auto partOneSize = r.capacity() - r.begin;
std::copy(r.arr + r.begin, r.arr + r.begin + partOneSize, arr);
std::copy(r.arr, r.arr + r.end, arr + partOneSize);
}
}
void operator=(Deque const& r) {
cleanup();
arr = 0;
arr = nullptr;
begin = 0;
end = r.size();
mask = r.mask;
@ -64,13 +68,17 @@ public:
ASSERT(arr != nullptr);
}
ASSERT(capacity() >= end || end == 0);
for (uint32_t i=0; i<end; i++)
new (&arr[i]) T(r[i]);
// FIXME: Specialization for POD types using memcpy?
if (r.end >= r.begin) {
std::copy(r.arr + r.begin, r.arr + r.begin + r.size(), arr);
} else {
auto partOneSize = r.capacity() - r.begin;
std::copy(r.arr + r.begin, r.arr + r.begin + partOneSize, arr);
std::copy(r.arr, r.arr + r.end, arr + partOneSize);
}
}
Deque(Deque&& r) BOOST_NOEXCEPT : begin(r.begin), end(r.end), mask(r.mask), arr(r.arr) {
r.arr = 0;
r.arr = nullptr;
r.begin = r.end = 0;
r.mask = -1;
}
@ -82,8 +90,8 @@ public:
end = r.end;
mask = r.mask;
arr = r.arr;
r.arr = 0;
r.arr = nullptr;
r.begin = r.end = 0;
r.mask = -1;
}
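
The rewritten Deque copy replaces a per-element placement-new loop with at most two bulk std::copy calls, split at the point where the ring buffer wraps. A simplified, hypothetical illustration over a trivially copyable element type:

#include <algorithm>
#include <cstdint>

// Copy `size` live elements out of a ring buffer of capacity `cap` whose
// occupied region starts at physical index `begin` and ends before `end`.
void copyRing(const int* src, uint32_t cap, uint32_t begin, uint32_t end,
              int* dst, uint32_t size) {
	if (end >= begin) {
		std::copy(src + begin, src + begin + size, dst); // contiguous case
	} else {
		uint32_t partOneSize = cap - begin;           // up to the wrap point
		std::copy(src + begin, src + cap, dst);       // tail segment
		std::copy(src, src + end, dst + partOneSize); // wrapped head segment
	}
}

This also largely answers the remaining FIXME: for trivially copyable types, standard libraries lower std::copy to memmove, so an explicit memcpy specialization buys little. Note, though, that bulk assignment into freshly allocated, unconstructed storage is only well-defined for trivially copyable element types.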

View File

@ -28,11 +28,6 @@ using std::make_pair;
bool g_crashOnError = false;
std::map<int, int>& Error::errorCounts() {
static std::map<int, int> counts;
return counts;
}
#include <iostream>
Error Error::fromUnvalidatedCode(int code) {
@ -70,8 +65,6 @@ Error::Error(int error_code)
crashAndDie();
}
}
/*if (error_code)
errorCounts()[error_code]++;*/
}
ErrorCodeTable& Error::errorCodeTable() {

View File

@ -58,9 +58,12 @@ public:
explicit Error(int error_code);
static void init();
static std::map<int, int>& errorCounts();
static ErrorCodeTable& errorCodeTable();
static Error fromCode(int error_code) { Error e; e.error_code = error_code; return e; } // Doesn't change errorCounts
static Error fromCode(int error_code) {
Error e;
e.error_code = error_code;
return e;
}
static Error fromUnvalidatedCode(int error_code); // Converts codes that are outside the legal range (but not necessarily individually unknown error codes) to unknown_error()
Error asInjectedFault() const; // Returns an error with the same code() as this but isInjectedFault() is true

View File

@ -131,7 +131,7 @@ public: // introduced features
//
// xyzdev
// vvvv
constexpr ProtocolVersion currentProtocolVersion(0x0FDB00B063010001LL);
constexpr ProtocolVersion currentProtocolVersion(0x0FDB00B070010001LL);
// This assert is intended to help prevent incrementing the leftmost digits accidentally. It will probably need to
// change when we reach version 10.
static_assert(currentProtocolVersion.version() < 0x0FDB00B100000000LL, "Unexpected protocol version");
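
Given the xyzdev/vvvv digit map above, the bump from 0x0FDB00B063010001 to 0x0FDB00B070010001 moves the major.minor byte from 6.3 to 7.0, matching the project version change elsewhere in this commit. A quick check of that reading (my interpretation of the digit positions, not code from the tree):

// The byte six hex digits from the right encodes major.minor.
static_assert(((0x0FDB00B063010001LL >> 24) & 0xFF) == 0x63, "protocol 6.3");
static_assert(((0x0FDB00B070010001LL >> 24) & 0xFF) == 0x70, "protocol 7.0");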

View File

@ -783,6 +783,7 @@ TraceEvent::TraceEvent(TraceEvent &&ev) {
tmpEventMetric = ev.tmpEventMetric;
trackingKey = ev.trackingKey;
type = ev.type;
timeIndex = ev.timeIndex;
ev.initialized = true;
ev.enabled = false;
@ -803,6 +804,7 @@ TraceEvent& TraceEvent::operator=(TraceEvent &&ev) {
tmpEventMetric = ev.tmpEventMetric;
trackingKey = ev.trackingKey;
type = ev.type;
timeIndex = ev.timeIndex;
ev.initialized = true;
ev.enabled = false;
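
Both timeIndex lines above patch a hand-written move constructor and move assignment that predate the member: every member added later has to be threaded through both by hand, which is the usual argument for = default moves when no extra bookkeeping is needed (TraceEvent does need it, since it must disable the moved-from event). A toy illustration of the bug class, with hypothetical types:

struct Event {
	int type = 0;
	int timeIndex = 0; // member added after the move operations were written
	Event() = default;
	// Hand-written move: would silently zero timeIndex if not listed here,
	// exactly the omission the hunks above correct.
	Event(Event&& ev) : type(ev.type), timeIndex(ev.timeIndex) {}
};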

View File

@ -479,6 +479,10 @@ public:
return enabled;
}
explicit operator bool() const {
return enabled;
}
void log();
~TraceEvent(); // Actually logs the event
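
The new explicit operator bool mirrors the enabled accessor just above it, letting a TraceEvent be tested directly in boolean contexts (for example to guard expensive detail computation) without permitting accidental integer conversion. A minimal sketch:

#include <cstdio>

struct Event {
	bool enabled = false;
	explicit operator bool() const { return enabled; }
};

int main() {
	Event ev{ true };
	if (ev) std::printf("enabled\n"); // boolean test allowed
	// int x = ev;                    // explicit blocks this conversion
	return 0;
}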

View File

@ -79,7 +79,7 @@ if(WITH_PYTHON)
if (NOT USE_UBSAN) # TODO re-enable in UBSAN after https://github.com/apple/foundationdb/issues/2410 is resolved
add_fdb_test(TEST_FILES SimpleExternalTest.txt)
else()
message(WARNING "Python not found, won't configure ctest")
add_fdb_test(TEST_FILES SimpleExternalTest.txt IGNORE)
endif()
add_fdb_test(TEST_FILES SlowTask.txt IGNORE)
add_fdb_test(TEST_FILES SpecificUnitTest.txt IGNORE)