Merge branch 'main' of https://github.com/apple/foundationdb into feature/main/clearRange

This commit is contained in:
Xiaoxi Wang 2023-02-06 14:28:35 -08:00
commit 6df5f1fa56
53 changed files with 1242 additions and 593 deletions

View File

@ -70,12 +70,15 @@ void ApiWorkload::start() {
schedule([this]() {
// 1. Clear data
clearData([this]() {
// 2. Workload setup
setup([this]() {
// 3. Populate initial data
populateData([this]() {
// 4. Generate random workload
runTests();
// 2. Create tenants if necessary.
createTenantsIfNecessary([this] {
// 3. Workload setup.
setup([this]() {
// 4. Populate initial data
populateData([this]() {
// 5. Generate random workload
runTests();
});
});
});
});
@ -259,9 +262,17 @@ void ApiWorkload::createTenants(TTaskFct cont) {
[this, cont]() { schedule(cont); });
}
void ApiWorkload::createTenantsIfNecessary(TTaskFct cont) {
if (tenants.size() > 0) {
createTenants(cont);
} else {
schedule(cont);
}
}
void ApiWorkload::populateData(TTaskFct cont) {
if (tenants.size() > 0) {
createTenants([this, cont]() { populateTenantData(cont, std::make_optional(0)); });
populateTenantData(cont, std::make_optional(0));
} else {
populateTenantData(cont, {});
}

View File

@ -141,6 +141,7 @@ private:
void populateDataTx(TTaskFct cont, std::optional<int> tenantId);
void populateTenantData(TTaskFct cont, std::optional<int> tenantId);
void createTenants(TTaskFct cont);
void createTenantsIfNecessary(TTaskFct cont);
void clearTenantData(TTaskFct cont, std::optional<int> tenantId);

View File

@ -40,7 +40,6 @@ public:
}
private:
// FIXME: add tenant support for DB operations
// FIXME: use other new blob granule apis!
enum OpType {
OP_INSERT,
@ -58,15 +57,8 @@ private:
void setup(TTaskFct cont) override { setupBlobGranules(cont); }
// FIXME: get rid of readSuccess* in this test now that setup is verify()-ing
// Allow reads at the start to get blob_granule_transaction_too_old if BG data isn't initialized yet
std::unordered_set<std::optional<int>> tenantsWithReadSuccess;
std::set<fdb::ByteString> validatedFiles;
inline void setReadSuccess(std::optional<int> tenantId) { tenantsWithReadSuccess.insert(tenantId); }
inline bool seenReadSuccess(std::optional<int> tenantId) { return tenantsWithReadSuccess.count(tenantId); }
void debugOp(std::string opName, fdb::KeyRange keyRange, std::optional<int> tenantId, std::string message) {
if (BG_API_DEBUG_VERBOSE) {
info(fmt::format("{0}: [{1} - {2}) {3}: {4}",
@ -99,30 +91,15 @@ private:
granuleContext);
auto out = fdb::Result::KeyValueRefArray{};
fdb::Error err = res.getKeyValueArrayNothrow(out);
if (err.code() == error_code_blob_granule_transaction_too_old) {
bool previousSuccess = seenReadSuccess(tenantId);
if (previousSuccess) {
error("Read bg too old after read success!\n");
} else {
info("Read bg too old\n");
}
ASSERT(!previousSuccess);
*tooOld = true;
ctx->done();
} else if (err.code() != error_code_success) {
ASSERT(err.code() != error_code_blob_granule_transaction_too_old);
if (err.code() != error_code_success) {
ctx->onError(err);
} else {
auto resCopy = copyKeyValueArray(out);
auto& [resVector, out_more] = resCopy;
ASSERT(!out_more);
results.get()->assign(resVector.begin(), resVector.end());
bool previousSuccess = seenReadSuccess(tenantId);
if (!previousSuccess) {
info(fmt::format("Read {0}: first success\n", debugTenantStr(tenantId)));
setReadSuccess(tenantId);
} else {
debugOp("Read", keyRange, tenantId, "complete");
}
debugOp("Read", keyRange, tenantId, "complete");
ctx->done();
}
},
@ -183,19 +160,13 @@ private:
},
[this, keyRange, tenantId, results, cont]() {
debugOp("GetGranules", keyRange, tenantId, fmt::format("complete with {0} granules", results->size()));
this->validateRanges(results, keyRange, seenReadSuccess(tenantId));
this->validateRanges(results, keyRange);
schedule(cont);
},
getTenant(tenantId));
}
void randomSummarizeOp(TTaskFct cont, std::optional<int> tenantId) {
if (!seenReadSuccess(tenantId)) {
// tester can't handle this throwing bg_txn_too_old, so just don't call it unless we have already seen a
// read success
schedule(cont);
return;
}
fdb::KeyRange keyRange = randomNonEmptyKeyRange();
auto results = std::make_shared<std::vector<fdb::GranuleSummary>>();
@ -231,33 +202,29 @@ private:
ranges->push_back((*results)[i].keyRange);
}
this->validateRanges(ranges, keyRange, true);
this->validateRanges(ranges, keyRange);
schedule(cont);
},
getTenant(tenantId));
}
void validateRanges(std::shared_ptr<std::vector<fdb::KeyRange>> results,
fdb::KeyRange keyRange,
bool shouldBeRanges) {
if (shouldBeRanges) {
if (results->size() == 0) {
error(fmt::format("ValidateRanges: [{0} - {1}): No ranges returned!",
fdb::toCharsRef(keyRange.beginKey),
fdb::toCharsRef(keyRange.endKey)));
}
ASSERT(results->size() > 0);
if (results->front().beginKey > keyRange.beginKey || results->back().endKey < keyRange.endKey) {
error(fmt::format("ValidateRanges: [{0} - {1}): Incomplete range(s) returned [{2} - {3})!",
fdb::toCharsRef(keyRange.beginKey),
fdb::toCharsRef(keyRange.endKey),
fdb::toCharsRef(results->front().beginKey),
fdb::toCharsRef(results->back().endKey)));
}
ASSERT(results->front().beginKey <= keyRange.beginKey);
ASSERT(results->back().endKey >= keyRange.endKey);
void validateRanges(std::shared_ptr<std::vector<fdb::KeyRange>> results, fdb::KeyRange keyRange) {
if (results->size() == 0) {
error(fmt::format("ValidateRanges: [{0} - {1}): No ranges returned!",
fdb::toCharsRef(keyRange.beginKey),
fdb::toCharsRef(keyRange.endKey)));
}
ASSERT(results->size() > 0);
if (results->front().beginKey > keyRange.beginKey || results->back().endKey < keyRange.endKey) {
error(fmt::format("ValidateRanges: [{0} - {1}): Incomplete range(s) returned [{2} - {3})!",
fdb::toCharsRef(keyRange.beginKey),
fdb::toCharsRef(keyRange.endKey),
fdb::toCharsRef(results->front().beginKey),
fdb::toCharsRef(results->back().endKey)));
}
ASSERT(results->front().beginKey <= keyRange.beginKey);
ASSERT(results->back().endKey >= keyRange.endKey);
for (int i = 0; i < results->size(); i++) {
// no empty or inverted ranges
if ((*results)[i].beginKey >= (*results)[i].endKey) {
@ -293,7 +260,6 @@ private:
execOperation(
[keyRange, results](auto ctx) {
// FIXME: add tenant!
fdb::Future f =
ctx->dbOps()->listBlobbifiedRanges(keyRange.beginKey, keyRange.endKey, 1000).eraseType();
ctx->continueAfter(f, [ctx, f, results]() {
@ -303,7 +269,7 @@ private:
},
[this, keyRange, tenantId, results, cont]() {
debugOp("GetBlobRanges", keyRange, tenantId, fmt::format("complete with {0} ranges", results->size()));
this->validateRanges(results, keyRange, seenReadSuccess(tenantId));
this->validateRanges(results, keyRange);
schedule(cont);
},
getTenant(tenantId),
@ -319,7 +285,6 @@ private:
auto verifyVersion = std::make_shared<int64_t>(-1);
execOperation(
[keyRange, verifyVersion](auto ctx) {
// FIXME: add tenant!!
fdb::Future f = ctx->dbOps()
->verifyBlobRange(keyRange.beginKey, keyRange.endKey, -2 /* latest version*/)
.eraseType();
@ -330,13 +295,6 @@ private:
},
[this, keyRange, tenantId, verifyVersion, cont]() {
debugOp("Verify", keyRange, tenantId, fmt::format("Complete @ {0}", *verifyVersion));
bool previousSuccess = seenReadSuccess(tenantId);
if (*verifyVersion == -1) {
ASSERT(!previousSuccess);
} else if (!previousSuccess) {
info(fmt::format("Verify {0}: first success\n", debugTenantStr(tenantId)));
setReadSuccess(tenantId);
}
schedule(cont);
},
getTenant(tenantId),
@ -475,11 +433,6 @@ private:
std::optional<int> tenantId,
int64_t readVersion) {
ASSERT(!results.empty());
ASSERT(results.front().keyRange.beginKey <= keyRange.beginKey);
ASSERT(keyRange.endKey <= results.back().keyRange.endKey);
for (int i = 0; i < results.size() - 1; i++) {
ASSERT(results[i].keyRange.endKey == results[i + 1].keyRange.beginKey);
}
if (tenantId) {
// FIXME: support tenants!!
@ -487,6 +440,12 @@ private:
return;
}
ASSERT(results.front().keyRange.beginKey <= keyRange.beginKey);
ASSERT(keyRange.endKey <= results.back().keyRange.endKey);
for (int i = 0; i < results.size() - 1; i++) {
ASSERT(results[i].keyRange.endKey == results[i + 1].keyRange.beginKey);
}
TesterGranuleContext testerContext(ctx->getBGBasePath());
fdb::native::FDBReadBlobGranuleContext bgCtx = createGranuleContext(&testerContext);
for (int i = 0; i < results.size(); i++) {
@ -495,9 +454,6 @@ private:
}
void randomReadDescription(TTaskFct cont, std::optional<int> tenantId) {
if (!seenReadSuccess(tenantId)) {
return;
}
fdb::KeyRange keyRange = randomNonEmptyKeyRange();
auto results = std::make_shared<std::vector<fdb::GranuleDescription>>();
auto readVersionOut = std::make_shared<int64_t>();

View File

@ -0,0 +1,24 @@
[[test]]
title = 'Blob Granule API Tenant Correctness Multi Threaded'
multiThreaded = true
buggify = true
minFdbThreads = 2
maxFdbThreads = 8
minClients = 1
maxClients = 8
minTenants = 1
maxTenants = 5
[[server]]
blob_granules_enabled = true
[[test.workload]]
name = 'ApiBlobGranuleCorrectness'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
# TODO - increase initialSize and/or buggify down BG_SNAPSHOT_FILE_TARGET_BYTES to force multiple granules
initialSize = 100
numRandomOperations = 100

View File

@ -60,7 +60,7 @@ class StatFetcher:
class TestPicker:
def __init__(self, test_dir: Path):
if not test_dir.exists():
raise RuntimeError('{} is neither a directory nor a file'.format(test_dir))
raise RuntimeError("{} is neither a directory nor a file".format(test_dir))
self.include_files_regex = re.compile(config.include_test_files)
self.exclude_files_regex = re.compile(config.exclude_test_files)
self.include_tests_regex = re.compile(config.include_test_classes)
@ -78,6 +78,7 @@ class TestPicker:
self.stat_fetcher = StatFetcher(self.tests)
else:
from test_harness.fdb import FDBStatFetcher
self.stat_fetcher = FDBStatFetcher(self.tests)
if config.stats is not None:
self.load_stats(config.stats)
@ -106,50 +107,60 @@ class TestPicker:
break
assert test_name is not None and test_desc is not None
self.stat_fetcher.add_run_time(test_name, run_time, out)
out.attributes['TotalTestTime'] = str(test_desc.total_runtime)
out.attributes['TestRunCount'] = str(test_desc.num_runs)
out.attributes["TotalTestTime"] = str(test_desc.total_runtime)
out.attributes["TestRunCount"] = str(test_desc.num_runs)
def dump_stats(self) -> str:
res = array.array('I')
res = array.array("I")
for _, spec in self.tests.items():
res.append(spec.total_runtime)
return base64.standard_b64encode(res.tobytes()).decode('utf-8')
return base64.standard_b64encode(res.tobytes()).decode("utf-8")
def fetch_stats(self):
self.stat_fetcher.read_stats()
def load_stats(self, serialized: str):
times = array.array('I')
times = array.array("I")
times.frombytes(base64.standard_b64decode(serialized))
assert len(times) == len(self.tests.items())
for idx, (_, spec) in enumerate(self.tests.items()):
spec.total_runtime = times[idx]
def parse_txt(self, path: Path):
if self.include_files_regex.search(str(path)) is None or self.exclude_files_regex.search(str(path)) is not None:
if (
self.include_files_regex.search(str(path)) is None
or self.exclude_files_regex.search(str(path)) is not None
):
return
with path.open('r') as f:
with path.open("r") as f:
test_name: str | None = None
test_class: str | None = None
priority: float | None = None
for line in f:
line = line.strip()
kv = line.split('=')
kv = line.split("=")
if len(kv) != 2:
continue
kv[0] = kv[0].strip()
kv[1] = kv[1].strip(' \r\n\t\'"')
if kv[0] == 'testTitle' and test_name is None:
kv[1] = kv[1].strip(" \r\n\t'\"")
if kv[0] == "testTitle" and test_name is None:
test_name = kv[1]
if kv[0] == 'testClass' and test_class is None:
if kv[0] == "testClass" and test_class is None:
test_class = kv[1]
if kv[0] == 'testPriority' and priority is None:
if kv[0] == "testPriority" and priority is None:
try:
priority = float(kv[1])
except ValueError:
raise RuntimeError("Can't parse {} -- testPriority in {} should be set to a float".format(kv[1],
path))
if test_name is not None and test_class is not None and priority is not None:
raise RuntimeError(
"Can't parse {} -- testPriority in {} should be set to a float".format(
kv[1], path
)
)
if (
test_name is not None
and test_class is not None
and priority is not None
):
break
if test_name is None:
return
@ -157,8 +168,10 @@ class TestPicker:
test_class = test_name
if priority is None:
priority = 1.0
if self.include_tests_regex.search(test_class) is None \
or self.exclude_tests_regex.search(test_class) is not None:
if (
self.include_tests_regex.search(test_class) is None
or self.exclude_tests_regex.search(test_class) is not None
):
return
if test_class not in self.tests:
self.tests[test_class] = TestDescription(path, test_class, priority)
@ -173,12 +186,12 @@ class TestPicker:
# check whether we're looking at a restart test
if self.follow_test.match(test.name) is not None:
return
if test.suffix == '.txt' or test.suffix == '.toml':
if test.suffix == ".txt" or test.suffix == ".toml":
self.parse_txt(test)
@staticmethod
def list_restart_files(start_file: Path) -> List[Path]:
name = re.sub(r'-\d+.(txt|toml)', '', start_file.name)
name = re.sub(r"-\d+.(txt|toml)", "", start_file.name)
res: List[Path] = []
for test_file in start_file.parent.iterdir():
if test_file.name.startswith(name):
@ -209,12 +222,12 @@ class TestPicker:
class OldBinaries:
def __init__(self):
self.first_file_expr = re.compile(r'.*-1\.(txt|toml)')
self.first_file_expr = re.compile(r".*-1\.(txt|toml)")
self.old_binaries_path: Path = config.old_binaries_path
self.binaries: OrderedDict[Version, Path] = collections.OrderedDict()
if not self.old_binaries_path.exists() or not self.old_binaries_path.is_dir():
return
exec_pattern = re.compile(r'fdbserver-\d+\.\d+\.\d+(\.exe)?')
exec_pattern = re.compile(r"fdbserver-\d+\.\d+\.\d+(\.exe)?")
for file in self.old_binaries_path.iterdir():
if not file.is_file() or not os.access(file, os.X_OK):
continue
@ -222,9 +235,9 @@ class OldBinaries:
self._add_file(file)
def _add_file(self, file: Path):
version_str = file.name.split('-')[1]
if version_str.endswith('.exe'):
version_str = version_str[0:-len('.exe')]
version_str = file.name.split("-")[1]
if version_str.endswith(".exe"):
version_str = version_str[0 : -len(".exe")]
ver = Version.parse(version_str)
self.binaries[ver] = file
@ -232,21 +245,21 @@ class OldBinaries:
if len(self.binaries) == 0:
return config.binary
max_version = Version.max_version()
min_version = Version.parse('5.0.0')
min_version = Version.parse("5.0.0")
dirs = test_file.parent.parts
if 'restarting' not in dirs:
if "restarting" not in dirs:
return config.binary
version_expr = dirs[-1].split('_')
version_expr = dirs[-1].split("_")
first_file = self.first_file_expr.match(test_file.name) is not None
if first_file and version_expr[0] == 'to':
if first_file and version_expr[0] == "to":
# downgrade test -- first binary should be current one
return config.binary
if not first_file and version_expr[0] == 'from':
if not first_file and version_expr[0] == "from":
# upgrade test -- we only return an old version for the first test file
return config.binary
if version_expr[0] == 'from' or version_expr[0] == 'to':
if version_expr[0] == "from" or version_expr[0] == "to":
min_version = Version.parse(version_expr[1])
if len(version_expr) == 4 and version_expr[2] == 'until':
if len(version_expr) == 4 and version_expr[2] == "until":
max_version = Version.parse(version_expr[3])
candidates: List[Path] = []
for ver, binary in self.binaries.items():
@ -259,13 +272,13 @@ class OldBinaries:
def is_restarting_test(test_file: Path):
for p in test_file.parts:
if p == 'restarting':
if p == "restarting":
return True
return False
def is_no_sim(test_file: Path):
return test_file.parts[-2] == 'noSim'
return test_file.parts[-2] == "noSim"
class ResourceMonitor(threading.Thread):
@ -291,9 +304,19 @@ class ResourceMonitor(threading.Thread):
class TestRun:
def __init__(self, binary: Path, test_file: Path, random_seed: int, uid: uuid.UUID,
restarting: bool = False, test_determinism: bool = False, buggify_enabled: bool = False,
stats: str | None = None, expected_unseed: int | None = None, will_restart: bool = False):
def __init__(
self,
binary: Path,
test_file: Path,
random_seed: int,
uid: uuid.UUID,
restarting: bool = False,
test_determinism: bool = False,
buggify_enabled: bool = False,
stats: str | None = None,
expected_unseed: int | None = None,
will_restart: bool = False,
):
self.binary = binary
self.test_file = test_file
self.random_seed = random_seed
@ -313,23 +336,31 @@ class TestRun:
self.temp_path = config.run_dir / str(self.uid)
# state for the run
self.retryable_error: bool = False
self.summary: Summary = Summary(binary, uid=self.uid, stats=self.stats, expected_unseed=self.expected_unseed,
will_restart=will_restart, long_running=config.long_running)
self.summary: Summary = Summary(
binary,
uid=self.uid,
stats=self.stats,
expected_unseed=self.expected_unseed,
will_restart=will_restart,
long_running=config.long_running,
)
self.run_time: int = 0
self.success = self.run()
def log_test_plan(self, out: SummaryTree):
test_plan: SummaryTree = SummaryTree('TestPlan')
test_plan.attributes['TestUID'] = str(self.uid)
test_plan.attributes['RandomSeed'] = str(self.random_seed)
test_plan.attributes['TestFile'] = str(self.test_file)
test_plan.attributes['Buggify'] = '1' if self.buggify_enabled else '0'
test_plan.attributes['FaultInjectionEnabled'] = '1' if self.fault_injection_enabled else '0'
test_plan.attributes['DeterminismCheck'] = '1' if self.test_determinism else '0'
test_plan: SummaryTree = SummaryTree("TestPlan")
test_plan.attributes["TestUID"] = str(self.uid)
test_plan.attributes["RandomSeed"] = str(self.random_seed)
test_plan.attributes["TestFile"] = str(self.test_file)
test_plan.attributes["Buggify"] = "1" if self.buggify_enabled else "0"
test_plan.attributes["FaultInjectionEnabled"] = (
"1" if self.fault_injection_enabled else "0"
)
test_plan.attributes["DeterminismCheck"] = "1" if self.test_determinism else "0"
out.append(test_plan)
def delete_simdir(self):
shutil.rmtree(self.temp_path / Path('simfdb'))
shutil.rmtree(self.temp_path / Path("simfdb"))
def run(self):
command: List[str] = []
@ -341,47 +372,68 @@ class TestRun:
# the test take longer. Also old binaries weren't built with
# USE_VALGRIND=ON, and we have seen false positives with valgrind in
# such binaries.
command.append('valgrind')
valgrind_file = self.temp_path / Path('valgrind-{}.xml'.format(self.random_seed))
dbg_path = os.getenv('FDB_VALGRIND_DBGPATH')
command.append("valgrind")
valgrind_file = self.temp_path / Path(
"valgrind-{}.xml".format(self.random_seed)
)
dbg_path = os.getenv("FDB_VALGRIND_DBGPATH")
if dbg_path is not None:
command.append('--extra-debuginfo-path={}'.format(dbg_path))
command += ['--xml=yes', '--xml-file={}'.format(valgrind_file.absolute()), '-q']
command += [str(self.binary.absolute()),
'-r', 'test' if is_no_sim(self.test_file) else 'simulation',
'-f', str(self.test_file),
'-s', str(self.random_seed)]
command.append("--extra-debuginfo-path={}".format(dbg_path))
command += [
"--xml=yes",
"--xml-file={}".format(valgrind_file.absolute()),
"-q",
]
command += [
str(self.binary.absolute()),
"-r",
"test" if is_no_sim(self.test_file) else "simulation",
"-f",
str(self.test_file),
"-s",
str(self.random_seed),
]
if self.trace_format is not None:
command += ['--trace_format', self.trace_format]
command += ["--trace_format", self.trace_format]
if self.use_tls_plugin:
command += ['--tls_plugin', str(config.tls_plugin_path)]
command += ["--tls_plugin", str(config.tls_plugin_path)]
env["FDB_TLS_PLUGIN"] = str(config.tls_plugin_path)
if config.disable_kaio:
command += ['--knob-disable-posix-kernel-aio=1']
if Version.of_binary(self.binary) >= '7.1.0':
command += ['-fi', 'on' if self.fault_injection_enabled else 'off']
command += ["--knob-disable-posix-kernel-aio=1"]
if Version.of_binary(self.binary) >= "7.1.0":
command += ["-fi", "on" if self.fault_injection_enabled else "off"]
if self.restarting:
command.append('--restarting')
command.append("--restarting")
if self.buggify_enabled:
command += ['-b', 'on']
command += ["-b", "on"]
if config.crash_on_error:
command.append('--crash')
command.append("--crash")
if config.long_running:
# disable simulation speedup
command += ['--knob-sim-speedup-after-seconds=36000']
command += ["--knob-sim-speedup-after-seconds=36000"]
# disable traceTooManyLines Error MAX_TRACE_LINES
command += ['--knob-max-trace-lines=1000000000']
command += ["--knob-max-trace-lines=1000000000"]
self.temp_path.mkdir(parents=True, exist_ok=True)
# self.log_test_plan(out)
resources = ResourceMonitor()
resources.start()
process = subprocess.Popen(command, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, cwd=self.temp_path,
text=True, env=env)
process = subprocess.Popen(
command,
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
cwd=self.temp_path,
text=True,
env=env,
)
did_kill = False
# No timeout for long running tests
timeout = 20 * config.kill_seconds if self.use_valgrind else (None if config.long_running else config.kill_seconds)
timeout = (
20 * config.kill_seconds
if self.use_valgrind
else (None if config.long_running else config.kill_seconds)
)
err_out: str
try:
_, err_out = process.communicate(timeout=timeout)
@ -398,7 +450,7 @@ class TestRun:
self.summary.was_killed = did_kill
self.summary.valgrind_out_file = valgrind_file
self.summary.error_out = err_out
self.summary.summarize(self.temp_path, ' '.join(command))
self.summary.summarize(self.temp_path, " ".join(command))
return self.summary.ok()
@ -407,18 +459,18 @@ def decorate_summary(out: SummaryTree, test_file: Path, seed: int, buggify: bool
tests are then hard to reproduce (they can be reproduced through TestHarness but
require the user to run in the joshua docker container). To account for this we
will write the necessary information into the attributes if it is missing."""
if 'TestFile' not in out.attributes:
out.attributes['TestFile'] = str(test_file)
if 'RandomSeed' not in out.attributes:
out.attributes['RandomSeed'] = str(seed)
if 'BuggifyEnabled' not in out.attributes:
out.attributes['BuggifyEnabled'] = '1' if buggify else '0'
if "TestFile" not in out.attributes:
out.attributes["TestFile"] = str(test_file)
if "RandomSeed" not in out.attributes:
out.attributes["RandomSeed"] = str(seed)
if "BuggifyEnabled" not in out.attributes:
out.attributes["BuggifyEnabled"] = "1" if buggify else "0"
class TestRunner:
def __init__(self):
self.uid = uuid.uuid4()
self.test_path: Path = Path('tests')
self.test_path: Path = Path("tests")
self.cluster_file: str | None = None
self.fdb_app_dir: str | None = None
self.binary_chooser = OldBinaries()
@ -426,32 +478,43 @@ class TestRunner:
def backup_sim_dir(self, seed: int):
temp_dir = config.run_dir / str(self.uid)
src_dir = temp_dir / 'simfdb'
src_dir = temp_dir / "simfdb"
assert src_dir.is_dir()
dest_dir = temp_dir / 'simfdb.{}'.format(seed)
dest_dir = temp_dir / "simfdb.{}".format(seed)
assert not dest_dir.exists()
shutil.copytree(src_dir, dest_dir)
def restore_sim_dir(self, seed: int):
temp_dir = config.run_dir / str(self.uid)
src_dir = temp_dir / 'simfdb.{}'.format(seed)
src_dir = temp_dir / "simfdb.{}".format(seed)
assert src_dir.exists()
dest_dir = temp_dir / 'simfdb'
dest_dir = temp_dir / "simfdb"
shutil.rmtree(dest_dir)
shutil.move(src_dir, dest_dir)
def run_tests(self, test_files: List[Path], seed: int, test_picker: TestPicker) -> bool:
def run_tests(
self, test_files: List[Path], seed: int, test_picker: TestPicker
) -> bool:
result: bool = True
for count, file in enumerate(test_files):
will_restart = count + 1 < len(test_files)
binary = self.binary_chooser.choose_binary(file)
unseed_check = not is_no_sim(file) and config.random.random() < config.unseed_check_ratio
unseed_check = (
not is_no_sim(file)
and config.random.random() < config.unseed_check_ratio
)
buggify_enabled: bool = config.random.random() < config.buggify_on_ratio
if unseed_check and count != 0:
# for restarting tests we will need to restore the sim2 after the first run
self.backup_sim_dir(seed + count - 1)
run = TestRun(binary, file.absolute(), seed + count, self.uid, restarting=count != 0,
stats=test_picker.dump_stats(), will_restart=will_restart, buggify_enabled=buggify_enabled)
# FIXME: support unseed checks for restarting tests
run = TestRun(
binary,
file.absolute(),
seed + count,
self.uid,
restarting=count != 0,
stats=test_picker.dump_stats(),
will_restart=will_restart,
buggify_enabled=buggify_enabled,
)
result = result and run.success
test_picker.add_time(test_files[0], run.run_time, run.summary.out)
decorate_summary(run.summary.out, file, seed + count, run.buggify_enabled)
@ -460,14 +523,22 @@ class TestRunner:
run.summary.out.dump(sys.stdout)
if not result:
return False
if unseed_check and run.summary.unseed is not None:
if count != 0:
self.restore_sim_dir(seed + count - 1)
run2 = TestRun(binary, file.absolute(), seed + count, self.uid, restarting=count != 0,
stats=test_picker.dump_stats(), expected_unseed=run.summary.unseed,
will_restart=will_restart, buggify_enabled=buggify_enabled)
if count == 0 and unseed_check and run.summary.unseed is not None:
run2 = TestRun(
binary,
file.absolute(),
seed + count,
self.uid,
restarting=count != 0,
stats=test_picker.dump_stats(),
expected_unseed=run.summary.unseed,
will_restart=will_restart,
buggify_enabled=buggify_enabled,
)
test_picker.add_time(file, run2.run_time, run.summary.out)
decorate_summary(run2.summary.out, file, seed + count, run.buggify_enabled)
decorate_summary(
run2.summary.out, file, seed + count, run.buggify_enabled
)
run2.summary.out.dump(sys.stdout)
result = result and run2.success
if not result:
@ -475,7 +546,11 @@ class TestRunner:
return result
def run(self) -> bool:
seed = config.random_seed if config.random_seed is not None else config.random.randint(0, 2 ** 32 - 1)
seed = (
config.random_seed
if config.random_seed is not None
else config.random.randint(0, 2**32 - 1)
)
test_files = self.test_picker.choose_test()
success = self.run_tests(test_files, seed, self.test_picker)
if config.clean_up:

View File

@ -21,7 +21,9 @@
#include "fdbcli/FlowLineNoise.h"
#include "flow/IThreadPool.h"
#ifndef BOOST_SYSTEM_NO_LIB
#define BOOST_SYSTEM_NO_LIB
#endif
#define BOOST_DATE_TIME_NO_LIB
#define BOOST_REGEX_NO_LIB
#include "boost/asio.hpp"

View File

@ -21,7 +21,9 @@
#include "flow/Platform.h"
#include <algorithm>
#ifndef BOOST_SYSTEM_NO_LIB
#define BOOST_SYSTEM_NO_LIB
#endif
#define BOOST_DATE_TIME_NO_LIB
#define BOOST_REGEX_NO_LIB
#include "boost/asio.hpp"

View File

@ -70,7 +70,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( MAX_MESSAGE_SIZE, std::max<int>(LOG_SYSTEM_PUSHED_DATA_BLOCK_SIZE, 1e5 + 2e4 + 1) + 8 ); // VALUE_SIZE_LIMIT + SYSTEM_KEY_SIZE_LIMIT + 9 bytes (4 bytes for length, 4 bytes for sequence number, and 1 byte for mutation type)
init( TLOG_MESSAGE_BLOCK_BYTES, 10e6 );
init( TLOG_MESSAGE_BLOCK_OVERHEAD_FACTOR, double(TLOG_MESSAGE_BLOCK_BYTES) / (TLOG_MESSAGE_BLOCK_BYTES - MAX_MESSAGE_SIZE) ); //1.0121466709838096006362758832473
init( PEEK_TRACKER_EXPIRATION_TIME, 600 ); if( randomize && BUGGIFY ) PEEK_TRACKER_EXPIRATION_TIME = deterministicRandom()->coinflip() ? 0.1 : 120;
init( PEEK_TRACKER_EXPIRATION_TIME, 600 ); if( randomize && BUGGIFY ) PEEK_TRACKER_EXPIRATION_TIME = 120; // Cannot be buggified lower without changing the following assert in LogSystemPeekCursor.actor.cpp: ASSERT_WE_THINK(e.code() == error_code_operation_obsolete || SERVER_KNOBS->PEEK_TRACKER_EXPIRATION_TIME < 10);
init( PEEK_USING_STREAMING, false ); if( randomize && isSimulated && BUGGIFY ) PEEK_USING_STREAMING = true;
init( PARALLEL_GET_MORE_REQUESTS, 32 ); if( randomize && BUGGIFY ) PARALLEL_GET_MORE_REQUESTS = 2;
init( MULTI_CURSOR_PRE_FETCH_LIMIT, 10 );
@ -975,7 +975,6 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( REDWOOD_EVICT_UPDATED_PAGES, true ); if( randomize && BUGGIFY ) { REDWOOD_EVICT_UPDATED_PAGES = false; }
init( REDWOOD_DECODECACHE_REUSE_MIN_HEIGHT, 2 ); if( randomize && BUGGIFY ) { REDWOOD_DECODECACHE_REUSE_MIN_HEIGHT = deterministicRandom()->randomInt(1, 7); }
init( REDWOOD_IO_PRIORITIES, "32,32,32,32" );
init( REDWOOD_SPLIT_ENCRYPTED_PAGES_BY_TENANT, false );
// Server request latency measurement
init( LATENCY_SKETCH_ACCURACY, 0.01 );

View File

@ -512,10 +512,11 @@ struct GetStorageServerRejoinInfoReply {
Optional<Tag> newTag;
bool newLocality;
std::vector<std::pair<Version, Tag>> history;
EncryptionAtRestMode encryptMode;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, version, tag, newTag, newLocality, history);
serializer(ar, version, tag, newTag, newLocality, history, encryptMode);
}
};

View File

@ -1520,6 +1520,8 @@ struct EncryptionAtRestMode {
bool operator==(const EncryptionAtRestMode& e) const { return isEquals(e); }
bool operator!=(const EncryptionAtRestMode& e) const { return !isEquals(e); }
bool operator==(Mode m) const { return mode == m; }
bool operator!=(Mode m) const { return mode != m; }
bool isEncryptionEnabled() const { return mode != EncryptionAtRestMode::DISABLED; }
@ -1548,6 +1550,11 @@ struct EncryptionAtRestMode {
uint32_t mode;
};
template <>
struct Traceable<EncryptionAtRestMode> : std::true_type {
static std::string toString(const EncryptionAtRestMode& mode) { return mode.toString(); }
};
typedef StringRef ClusterNameRef;
typedef Standalone<ClusterNameRef> ClusterName;

View File

@ -938,7 +938,6 @@ public:
double REDWOOD_HISTOGRAM_INTERVAL;
bool REDWOOD_EVICT_UPDATED_PAGES; // Whether to prioritize eviction of updated pages from cache.
int REDWOOD_DECODECACHE_REUSE_MIN_HEIGHT; // Minimum height for which to keep and reuse page decode caches
bool REDWOOD_SPLIT_ENCRYPTED_PAGES_BY_TENANT; // Whether to split pages by tenant if encryption is enabled
std::string REDWOOD_IO_PRIORITIES;

View File

@ -22,7 +22,9 @@
// Define boost::asio::io_service
#include <algorithm>
#ifndef BOOST_SYSTEM_NO_LIB
#define BOOST_SYSTEM_NO_LIB
#endif
#define BOOST_DATE_TIME_NO_LIB
#define BOOST_REGEX_NO_LIB
#include <boost/asio.hpp>

View File

@ -18,7 +18,9 @@
* limitations under the License.
*/
#ifndef BOOST_SYSTEM_NO_LIB
#define BOOST_SYSTEM_NO_LIB
#endif
#define BOOST_DATE_TIME_NO_LIB
#define BOOST_REGEX_NO_LIB
#include <boost/asio.hpp>

View File

@ -26,7 +26,9 @@
#include "fmt/format.h"
#include "fdbrpc/simulator.h"
#include "flow/Arena.h"
#ifndef BOOST_SYSTEM_NO_LIB
#define BOOST_SYSTEM_NO_LIB
#endif
#define BOOST_DATE_TIME_NO_LIB
#define BOOST_REGEX_NO_LIB
#include "fdbrpc/SimExternalConnection.h"

View File

@ -300,7 +300,6 @@ ACTOR Future<Void> validateGranuleSummaries(Database cx,
// same invariant isn't always true for delta version because of force flushing around granule
// merges
if (it.keyRange == itLast.range()) {
ASSERT(it.deltaVersion >= last.deltaVersion);
if (it.snapshotVersion == last.snapshotVersion) {
ASSERT(it.snapshotSize == last.snapshotSize);
}
@ -308,7 +307,11 @@ ACTOR Future<Void> validateGranuleSummaries(Database cx,
ASSERT(it.snapshotSize == last.snapshotSize);
ASSERT(it.deltaSize == last.deltaSize);
} else if (it.snapshotVersion == last.snapshotVersion) {
ASSERT(it.deltaSize > last.deltaSize);
// empty delta files can cause version to decrease or size to remain same with a version
// increase
if (it.deltaVersion >= last.deltaVersion) {
ASSERT(it.deltaSize >= last.deltaSize);
} // else can happen because of empty delta file version bump
}
break;
}

View File

@ -914,8 +914,7 @@ ACTOR Future<BlobFileIndex> writeDeltaFile(Reference<BlobWorkerData> bwData,
}
if (BUGGIFY && bwData->maybeInjectTargetedRestart()) {
wait(delay(0)); // should be cancelled
ASSERT(false);
wait(Never());
}
if (BUGGIFY_WITH_PROB(0.01)) {
@ -953,6 +952,84 @@ ACTOR Future<BlobFileIndex> writeDeltaFile(Reference<BlobWorkerData> bwData,
}
}
ACTOR Future<BlobFileIndex> writeEmptyDeltaFile(Reference<BlobWorkerData> bwData,
KeyRange keyRange,
UID granuleID,
int64_t epoch,
int64_t seqno,
Version previousVersion,
Version currentDeltaVersion,
Future<BlobFileIndex> previousDeltaFileFuture,
Future<Void> waitCommitted,
Optional<std::pair<KeyRange, UID>> oldGranuleComplete) {
ASSERT(previousVersion < currentDeltaVersion);
wait(delay(0, TaskPriority::BlobWorkerUpdateStorage));
// before updating FDB, wait for the delta file version to be committed and previous delta files to finish
wait(waitCommitted);
BlobFileIndex prev = wait(previousDeltaFileFuture);
wait(delay(0, TaskPriority::BlobWorkerUpdateFDB));
// update FDB with new file
state Key oldDFKey = blobGranuleFileKeyFor(granuleID, previousVersion, 'D');
state Key newDFKey = blobGranuleFileKeyFor(granuleID, currentDeltaVersion, 'D');
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(bwData->db);
state Optional<Value> dfValue;
loop {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
try {
wait(readAndCheckGranuleLock(tr, keyRange, epoch, seqno));
// FIXME: could construct this value from the prev BlobFileIndex, but checking that the key exists in the DB
// is a good sanity check anyway
if (!dfValue.present()) {
// Only check if not seen yet. If we get commit unknown result and then retry, we'd see our own delete
wait(store(dfValue, tr->get(oldDFKey)));
ASSERT(dfValue.present());
} else {
tr->addReadConflictRange(singleKeyRange(oldDFKey));
}
tr->clear(oldDFKey);
tr->set(newDFKey, dfValue.get());
if (oldGranuleComplete.present()) {
wait(updateGranuleSplitState(&tr->getTransaction(),
oldGranuleComplete.get().first,
oldGranuleComplete.get().second,
granuleID,
BlobGranuleSplitState::Done));
}
wait(tr->commit());
if (BW_DEBUG) {
fmt::print(
"Granule {0} [{1} - {2}) empty delta file bumped version last delta file from {3} -> {4}, cv={5}\n",
granuleID.toString(),
keyRange.begin.printable(),
keyRange.end.printable(),
previousVersion,
currentDeltaVersion,
tr->getCommittedVersion());
}
if (BUGGIFY && bwData->maybeInjectTargetedRestart()) {
wait(Never());
}
if (BUGGIFY_WITH_PROB(0.01)) {
wait(delay(deterministicRandom()->random01()));
}
return BlobFileIndex(currentDeltaVersion, "", 0, 0, 0, {});
} catch (Error& e) {
wait(tr->onError(e));
}
}
}
ACTOR Future<Void> reevaluateInitialSplit(Reference<BlobWorkerData> bwData,
UID granuleID,
KeyRange keyRange,
@ -1158,8 +1235,7 @@ ACTOR Future<BlobFileIndex> writeSnapshot(Reference<BlobWorkerData> bwData,
}
if (BUGGIFY && bwData->maybeInjectTargetedRestart()) {
wait(delay(0)); // should be cancelled
ASSERT(false);
wait(Never());
}
// FIXME: change when we implement multiplexing
@ -1213,8 +1289,7 @@ ACTOR Future<BlobFileIndex> dumpInitialSnapshotFromFDB(Reference<BlobWorkerData>
DEBUG_KEY_RANGE("BlobWorkerFDBSnapshot", readVersion, metadata->keyRange, bwData->id);
if (BUGGIFY && bwData->maybeInjectTargetedRestart()) {
wait(delay(0)); // should be cancelled
ASSERT(false);
wait(Never());
}
// initial snapshot is committed in fdb, we can pop the change feed up to this version
@ -1633,8 +1708,7 @@ ACTOR Future<Void> reevaluateInitialSplit(Reference<BlobWorkerData> bwData,
reply.proposedSplitKey = proposedSplitKey;
bwData->currentManagerStatusStream.get().send(reply);
if (BUGGIFY && bwData->maybeInjectTargetedRestart()) {
wait(delay(0)); // should be cancelled
ASSERT(false);
wait(Never());
}
// if a new manager appears, also tell it about this granule being splittable, or retry after a certain
// amount of time of not hearing back
@ -1755,8 +1829,16 @@ void handleCompletedDeltaFile(Reference<BlobWorkerData> bwData,
Key cfKey,
Version cfStartVersion,
std::deque<std::pair<Version, Version>>* rollbacksCompleted,
std::deque<Future<Void>>& inFlightPops) {
metadata->files.deltaFiles.push_back(completedDeltaFile);
std::deque<Future<Void>>& inFlightPops,
bool emptyDeltaFile) {
if (emptyDeltaFile) {
ASSERT(!metadata->files.deltaFiles.empty());
ASSERT(completedDeltaFile.length == 0);
ASSERT(metadata->files.deltaFiles.back().version < completedDeltaFile.version);
metadata->files.deltaFiles.back().version = completedDeltaFile.version;
} else {
metadata->files.deltaFiles.push_back(completedDeltaFile);
}
ASSERT(metadata->durableDeltaVersion.get() < completedDeltaFile.version);
metadata->durableDeltaVersion.set(completedDeltaFile.version);
@ -1813,9 +1895,10 @@ struct InFlightFile {
Version version;
uint64_t bytes;
bool snapshot;
bool emptyDeltaFile;
InFlightFile(Future<BlobFileIndex> future, Version version, uint64_t bytes, bool snapshot)
: future(future), version(version), bytes(bytes), snapshot(snapshot) {}
InFlightFile(Future<BlobFileIndex> future, Version version, uint64_t bytes, bool snapshot, bool emptyDeltaFile)
: future(future), version(version), bytes(bytes), snapshot(snapshot), emptyDeltaFile(emptyDeltaFile) {}
};
namespace {
@ -2233,7 +2316,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
startVersion = startState.previousDurableVersion;
Future<BlobFileIndex> inFlightBlobSnapshot = compactFromBlob(
bwData, bstore, metadata, startState.granuleID, startState.blobFilesToSnapshot, startVersion);
inFlightFiles.push_back(InFlightFile(inFlightBlobSnapshot, startVersion, 0, true));
inFlightFiles.push_back(InFlightFile(inFlightBlobSnapshot, startVersion, 0, true, false));
pendingSnapshots++;
metadata->durableSnapshotVersion.set(minDurableSnapshotV);
@ -2260,6 +2343,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
if (inFlightFiles.front().future.isReady()) {
BlobFileIndex completedFile = wait(inFlightFiles.front().future);
if (inFlightFiles.front().snapshot) {
ASSERT(!inFlightFiles.front().emptyDeltaFile);
if (metadata->files.deltaFiles.empty()) {
ASSERT(completedFile.version == metadata->initialSnapshotVersion);
} else {
@ -2343,6 +2427,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
if (inFlightFiles.front().future.isReady()) {
BlobFileIndex completedFile = wait(inFlightFiles.front().future);
if (inFlightFiles.front().snapshot) {
ASSERT(!inFlightFiles.front().emptyDeltaFile);
if (metadata->files.deltaFiles.empty()) {
ASSERT(completedFile.version == metadata->initialSnapshotVersion);
} else {
@ -2360,7 +2445,8 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
cfKey,
startState.changeFeedStartVersion,
&rollbacksCompleted,
inFlightPops);
inFlightPops,
inFlightFiles.front().emptyDeltaFile);
}
inFlightFiles.pop_front();
@ -2733,8 +2819,6 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
ASSERT(lastDeltaVersion >= metadata->currentDeltas.back().version);
ASSERT(metadata->pendingDeltaVersion < metadata->currentDeltas.front().version);
} else {
// FIXME: could always write special metadata for empty file, so we don't actually
// write/read a bunch of empty blob files
ASSERT(forceFlush);
ASSERT(!forceFlushVersions.empty());
CODE_PROBE(true, "Force flushing empty delta file!");
@ -2767,24 +2851,40 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
oldChangeFeedDataComplete.present() ? ". Finalizing " : "");
}
int64_t deltaFileBudget =
std::min((int64_t)metadata->bufferedDeltaBytes, SERVER_KNOBS->BLOB_WORKER_DELTA_WRITE_BUDGET_BYTES);
startDeltaFileWrite = bwData->deltaWritesBudget->take(TaskPriority::DefaultYield, deltaFileBudget);
Future<BlobFileIndex> dfFuture =
writeDeltaFile(bwData,
bstore,
metadata->keyRange,
startState.granuleID,
metadata->originalEpoch,
metadata->originalSeqno,
metadata->currentDeltas,
lastDeltaVersion,
previousFuture,
waitVersionCommitted(bwData, metadata, lastDeltaVersion),
oldChangeFeedDataComplete,
startDeltaFileWrite,
deltaFileBudget);
inFlightFiles.push_back(InFlightFile(dfFuture, lastDeltaVersion, metadata->bufferedDeltaBytes, false));
Future<BlobFileIndex> dfFuture;
bool emptyDeltaFile = metadata->bytesInNewDeltaFiles > 0 && metadata->currentDeltas.empty();
if (emptyDeltaFile) {
// Optimization to do a metadata-only update if flushing an empty delta file
dfFuture = writeEmptyDeltaFile(bwData,
metadata->keyRange,
startState.granuleID,
metadata->originalEpoch,
metadata->originalSeqno,
metadata->pendingDeltaVersion,
lastDeltaVersion,
previousFuture,
waitVersionCommitted(bwData, metadata, lastDeltaVersion),
oldChangeFeedDataComplete);
} else {
int64_t deltaFileBudget = std::min((int64_t)metadata->bufferedDeltaBytes,
SERVER_KNOBS->BLOB_WORKER_DELTA_WRITE_BUDGET_BYTES);
startDeltaFileWrite = bwData->deltaWritesBudget->take(TaskPriority::DefaultYield, deltaFileBudget);
dfFuture = writeDeltaFile(bwData,
bstore,
metadata->keyRange,
startState.granuleID,
metadata->originalEpoch,
metadata->originalSeqno,
metadata->currentDeltas,
lastDeltaVersion,
previousFuture,
waitVersionCommitted(bwData, metadata, lastDeltaVersion),
oldChangeFeedDataComplete,
startDeltaFileWrite,
deltaFileBudget);
}
inFlightFiles.push_back(
InFlightFile(dfFuture, lastDeltaVersion, metadata->bufferedDeltaBytes, false, emptyDeltaFile));
// add new pending delta file
ASSERT(metadata->pendingDeltaVersion < lastDeltaVersion);
@ -2864,7 +2964,8 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
reSnapshotNoCheck(bwData, bstore, metadata, startState.granuleID, previousFuture);
writeAmpTarget.decrease(metadata->bytesInNewDeltaFiles);
}
inFlightFiles.push_back(InFlightFile(inFlightBlobSnapshot, metadata->pendingDeltaVersion, 0, true));
inFlightFiles.push_back(
InFlightFile(inFlightBlobSnapshot, metadata->pendingDeltaVersion, 0, true, false));
pendingSnapshots++;
metadata->pendingSnapshotVersion = metadata->pendingDeltaVersion;
@ -2903,6 +3004,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
// TODO don't duplicate code
BlobFileIndex completedFile = wait(inFlightFiles.front().future);
if (inFlightFiles.front().snapshot) {
ASSERT(!inFlightFiles.front().emptyDeltaFile);
if (metadata->files.deltaFiles.empty()) {
ASSERT(completedFile.version == metadata->initialSnapshotVersion);
} else {
@ -2919,7 +3021,8 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
cfKey,
startState.changeFeedStartVersion,
&rollbacksCompleted,
inFlightPops);
inFlightPops,
inFlightFiles.front().emptyDeltaFile);
}
inFlightFiles.pop_front();
@ -4377,8 +4480,7 @@ ACTOR Future<GranuleStartState> openGranule(Reference<BlobWorkerData> bwData, As
}
if (BUGGIFY && bwData->maybeInjectTargetedRestart()) {
wait(delay(0)); // should be cancelled
ASSERT(false);
wait(Never());
}
return info;

View File

@ -363,6 +363,7 @@ ACTOR Future<Void> newSeedServers(Reference<ClusterRecoveryData> self,
isr.reqId = deterministicRandom()->randomUniqueID();
isr.interfaceId = deterministicRandom()->randomUniqueID();
isr.initialClusterVersion = self->recoveryTransactionVersion;
isr.encryptMode = self->configuration.encryptionAtRestMode;
ErrorOr<InitializeStorageReply> newServer = wait(recruits.storageServers[idx].storage.tryGetReply(isr));

View File

@ -203,6 +203,7 @@ struct ResolutionRequestBuilder {
ASSERT(transactionNumberInBatch >= 0 && transactionNumberInBatch < 32768);
bool isTXNStateTransaction = false;
DisabledTraceEvent("AddTransaction", self->dbgid).detail("TenantMode", (int)self->getTenantMode());
bool needParseTenantId = !trRequest.tenantInfo.hasTenant() && self->getTenantMode() == TenantMode::REQUIRED;
VectorRef<int64_t> tenantIds;
for (auto& m : trIn.mutations) {
@ -2697,6 +2698,7 @@ ACTOR static Future<Void> rejoinServer(CommitProxyInterface proxy, ProxyCommitDa
}
rep.newTag = Tag(maxTagLocality + 1, 0);
}
rep.encryptMode = commitData->encryptMode;
req.reply.send(rep);
} else {
req.reply.sendError(worker_removed());
@ -3284,7 +3286,7 @@ ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
state GetHealthMetricsReply healthMetricsReply;
state GetHealthMetricsReply detailedHealthMetricsReply;
TraceEvent("CPEncryptionAtRestMode").detail("Mode", commitData.encryptMode.toString());
TraceEvent("CPEncryptionAtRestMode", proxy.id()).detail("Mode", commitData.encryptMode);
addActor.send(waitFailureServer(proxy.waitFailure.getFuture()));
addActor.send(traceRole(Role::COMMIT_PROXY, proxy.id()));
@ -3449,7 +3451,16 @@ ACTOR Future<Void> updateLocalDbInfo(Reference<AsyncVar<ServerDBInfo> const> in,
// only update the db info if this is the current CP, or before we received first one including current CP.
// Several db infos at the beginning just contain the provisional CP
if (isIncluded || !firstValidDbInfo) {
out->set(in->get());
DisabledTraceEvent("UpdateLocalDbInfo", myInterface.id())
.detail("Provisional", myInterface.provisional)
.detail("Included", isIncluded)
.detail("FirstValid", firstValidDbInfo)
.detail("ReceivedRC", in->get().recoveryCount)
.detail("RecoveryCount", recoveryCount)
.detail("TenantMode", (int)in->get().client.tenantMode);
if (in->get().recoveryCount >= out->get().recoveryCount) {
out->set(in->get());
}
}
wait(in->onChange());

View File

@ -2355,6 +2355,7 @@ public:
isr.seedTag = invalidTag;
isr.reqId = deterministicRandom()->randomUniqueID();
isr.interfaceId = interfaceId;
isr.encryptMode = self->configuration.encryptionAtRestMode;
// if tss, wait for pair ss to finish and add its id to isr. If pair fails, don't recruit tss
state bool doRecruit = true;

View File

@ -19,7 +19,9 @@
*/
#if !defined(_WIN32) && !defined(__APPLE__) && !defined(__INTEL_COMPILER)
#ifndef BOOST_SYSTEM_NO_LIB
#define BOOST_SYSTEM_NO_LIB
#endif
#define BOOST_DATE_TIME_NO_LIB
#define BOOST_REGEX_NO_LIB
#include <boost/process.hpp>

View File

@ -81,6 +81,10 @@ struct KeyValueStoreCompressTestData final : IKeyValueStore {
return doReadRange(store, keys, rowLimit, byteLimit, options);
}
Future<EncryptionAtRestMode> encryptionMode() override {
return EncryptionAtRestMode(EncryptionAtRestMode::DISABLED);
}
private:
ACTOR static Future<Optional<Value>> doReadValue(IKeyValueStore* store, Key key, Optional<ReadOptions> options) {
Optional<Value> v = wait(store->readValue(key, options));

View File

@ -19,6 +19,7 @@
*/
#include "fdbclient/BlobCipher.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/Knobs.h"
#include "fdbclient/Notified.h"
#include "fdbclient/SystemData.h"
@ -290,6 +291,12 @@ public:
int uncommittedBytes() { return queue.totalSize(); }
// KeyValueStoreMemory does not support encryption-at-rest in general, despite it supports encryption
// when being used as TxnStateStore backend.
Future<EncryptionAtRestMode> encryptionMode() override {
return EncryptionAtRestMode(EncryptionAtRestMode::DISABLED);
}
private:
enum OpType {
OpSet,

View File

@ -18,6 +18,7 @@
* limitations under the License.
*/
#include "fdbclient/FDBTypes.h"
#ifdef SSD_ROCKSDB_EXPERIMENTAL
#include <rocksdb/c.h>
@ -2267,6 +2268,10 @@ struct RocksDBKeyValueStore : IKeyValueStore {
return Void();
}
Future<EncryptionAtRestMode> encryptionMode() override {
return EncryptionAtRestMode(EncryptionAtRestMode::DISABLED);
}
DB db = nullptr;
std::shared_ptr<SharedRocksDBState> sharedState;
std::shared_ptr<PerfContextMetrics> perfContextMetrics;

View File

@ -18,6 +18,7 @@
* limitations under the License.
*/
#include "fdbclient/FDBTypes.h"
#define SQLITE_THREADSAFE 0 // also in sqlite3.amalgamation.c!
#include "fmt/format.h"
#include "crc32/crc32c.h"
@ -1639,6 +1640,10 @@ public:
Future<SpringCleaningWorkPerformed> doClean();
void startReadThreads();
Future<EncryptionAtRestMode> encryptionMode() override {
return EncryptionAtRestMode(EncryptionAtRestMode::DISABLED);
}
private:
KeyValueStoreType type;
UID logID;

View File

@ -1,3 +1,4 @@
#include "fdbclient/FDBTypes.h"
#ifdef SSD_ROCKSDB_EXPERIMENTAL
#include "fdbclient/KeyRangeMap.h"
@ -3102,6 +3103,10 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
// Used for debugging shard mapping issue.
std::vector<std::pair<KeyRange, std::string>> getDataMapping() { return shardManager.getDataMapping(); }
Future<EncryptionAtRestMode> encryptionMode() override {
return EncryptionAtRestMode(EncryptionAtRestMode::DISABLED);
}
std::shared_ptr<ShardedRocksDBState> rState;
rocksdb::Options dbOptions;
ShardManager shardManager;

View File

@ -1472,6 +1472,8 @@ private:
void setSimpleConfig();
void setSpecificConfig(const TestConfig& testConfig);
void setDatacenters(const TestConfig& testConfig);
void setTenantMode(const TestConfig& testConfig);
void setEncryptionAtRestMode(const TestConfig& testConfig);
void setStorageEngine(const TestConfig& testConfig);
void setRegions(const TestConfig& testConfig);
void setReplicationType(const TestConfig& testConfig);
@ -1579,12 +1581,60 @@ void SimulationConfig::setDatacenters(const TestConfig& testConfig) {
}
}
void SimulationConfig::setTenantMode(const TestConfig& testConfig) {
TenantMode tenantMode = TenantMode::DISABLED;
if (testConfig.tenantModes.size() > 0) {
tenantMode = TenantMode::fromString(deterministicRandom()->randomChoice(testConfig.tenantModes));
} else if (testConfig.allowDefaultTenant && deterministicRandom()->coinflip()) {
tenantMode = deterministicRandom()->random01() < 0.9 ? TenantMode::REQUIRED : TenantMode::OPTIONAL_TENANT;
} else if (deterministicRandom()->coinflip()) {
tenantMode = TenantMode::OPTIONAL_TENANT;
}
set_config("tenant_mode=" + tenantMode.toString());
}
void SimulationConfig::setEncryptionAtRestMode(const TestConfig& testConfig) {
EncryptionAtRestMode encryptionMode = EncryptionAtRestMode::DISABLED;
// Only Redwood support encryption. Disable encryption if non-Redwood storage engine is explicitly specified.
bool disableEncryption = testConfig.disableEncryption ||
(testConfig.storageEngineType.present() && testConfig.storageEngineType.get() != 3);
// TODO: Remove check on the ENABLE_ENCRYPTION knob once the EKP can start using the db config
if (!disableEncryption && (SERVER_KNOBS->ENABLE_ENCRYPTION || !testConfig.encryptModes.empty())) {
TenantMode tenantMode = db.tenantMode;
if (!testConfig.encryptModes.empty()) {
std::vector<EncryptionAtRestMode> validEncryptModes;
// Get the subset of valid encrypt modes given the tenant mode
for (int i = 0; i < testConfig.encryptModes.size(); i++) {
EncryptionAtRestMode encryptMode = EncryptionAtRestMode::fromString(testConfig.encryptModes.at(i));
if (encryptMode != EncryptionAtRestMode::DOMAIN_AWARE || tenantMode == TenantMode::REQUIRED) {
validEncryptModes.push_back(encryptMode);
}
}
if (validEncryptModes.size() > 0) {
encryptionMode = deterministicRandom()->randomChoice(validEncryptModes);
}
} else {
// TODO: These cases should only trigger with probability (BUGGIFY) once the server knob is removed
if (tenantMode == TenantMode::DISABLED || tenantMode == TenantMode::OPTIONAL_TENANT || BUGGIFY) {
// optional and disabled tenant modes currently only support cluster aware encryption
encryptionMode = EncryptionAtRestMode::CLUSTER_AWARE;
} else {
encryptionMode = EncryptionAtRestMode::DOMAIN_AWARE;
}
}
}
set_config("encryption_at_rest_mode=" + encryptionMode.toString());
}
// Sets storage engine based on testConfig details
void SimulationConfig::setStorageEngine(const TestConfig& testConfig) {
// Using [0, 4) to disable the RocksDB storage engine.
// TODO: Figure out what is broken with the RocksDB engine in simulation.
int storage_engine_type = deterministicRandom()->randomInt(0, 6);
if (testConfig.storageEngineType.present()) {
if (db.encryptionAtRestMode.isEncryptionEnabled()) {
// Only storage engine supporting encryption is Redwood.
storage_engine_type = 3;
} else if (testConfig.storageEngineType.present()) {
storage_engine_type = testConfig.storageEngineType.get();
} else {
// Continuously re-pick the storage engine type if it's the one we want to exclude
@ -2038,7 +2088,8 @@ void SimulationConfig::generateNormalConfig(const TestConfig& testConfig) {
setSimpleConfig();
}
setSpecificConfig(testConfig);
setTenantMode(testConfig);
setEncryptionAtRestMode(testConfig);
setStorageEngine(testConfig);
setReplicationType(testConfig);
if (generateFearless || (datacenters == 2 && deterministicRandom()->random01() < 0.5)) {
@ -2059,15 +2110,6 @@ void SimulationConfig::generateNormalConfig(const TestConfig& testConfig) {
setConfigDB(testConfig);
}
bool validateEncryptAndTenantModePair(EncryptionAtRestMode encryptMode, TenantMode tenantMode) {
// Domain aware encryption is only allowed when the tenant mode is required. Other encryption modes (disabled or
// cluster aware) are allowed regardless of the tenant mode
if (encryptMode.mode == EncryptionAtRestMode::DISABLED || encryptMode.mode == EncryptionAtRestMode::CLUSTER_AWARE) {
return true;
}
return tenantMode == TenantMode::REQUIRED;
}
// Configures the system according to the given specifications in order to run
// simulation under the correct conditions
void setupSimulatedSystem(std::vector<Future<Void>>* systemActors,
@ -2078,49 +2120,22 @@ void setupSimulatedSystem(std::vector<Future<Void>>* systemActors,
std::string whitelistBinPaths,
TestConfig testConfig,
ProtocolVersion protocolVersion,
TenantMode tenantMode) {
Optional<TenantMode>* tenantMode) {
auto& g_knobs = IKnobCollection::getMutableGlobalKnobCollection();
// SOMEDAY: this does not test multi-interface configurations
SimulationConfig simconfig(testConfig);
*tenantMode = simconfig.db.tenantMode;
if (testConfig.logAntiQuorum != -1) {
simconfig.db.tLogWriteAntiQuorum = testConfig.logAntiQuorum;
}
simconfig.db.tenantMode = tenantMode;
simconfig.db.encryptionAtRestMode = EncryptionAtRestMode::DISABLED;
// TODO: Remove check on the ENABLE_ENCRYPTION knob once the EKP can start using the db config
if (!testConfig.disableEncryption && (SERVER_KNOBS->ENABLE_ENCRYPTION || !testConfig.encryptModes.empty())) {
if (!testConfig.encryptModes.empty()) {
std::vector<EncryptionAtRestMode> validEncryptModes;
// Get the subset of valid encrypt modes given the tenant mode
for (int i = 0; i < testConfig.encryptModes.size(); i++) {
EncryptionAtRestMode encryptMode = EncryptionAtRestMode::fromString(testConfig.encryptModes.at(i));
if (validateEncryptAndTenantModePair(encryptMode, tenantMode)) {
validEncryptModes.push_back(encryptMode);
}
}
if (validEncryptModes.size() > 0) {
simconfig.db.encryptionAtRestMode = deterministicRandom()->randomChoice(validEncryptModes);
}
} else {
// TODO: These cases should only trigger with probability (BUGGIFY) once the server knob is removed
if (tenantMode == TenantMode::DISABLED || tenantMode == TenantMode::OPTIONAL_TENANT || BUGGIFY) {
// optional and disabled tenant modes currently only support cluster aware encryption
simconfig.db.encryptionAtRestMode = EncryptionAtRestMode::CLUSTER_AWARE;
} else {
simconfig.db.encryptionAtRestMode = EncryptionAtRestMode::DOMAIN_AWARE;
}
}
}
// TODO: remove knob hanlding once we move off from encrypption knobs to db config
if (simconfig.db.encryptionAtRestMode.mode == EncryptionAtRestMode::DISABLED) {
g_knobs.setKnob("enable_encryption", KnobValueRef::create(bool{ false }));
CODE_PROBE(true, "Disabled encryption in simulation");
} else {
g_knobs.setKnob("enable_encryption", KnobValueRef::create(bool{ true }));
g_knobs.setKnob(
"redwood_split_encrypted_pages_by_tenant",
KnobValueRef::create(bool{ simconfig.db.encryptionAtRestMode.mode == EncryptionAtRestMode::DOMAIN_AWARE }));
CODE_PROBE(simconfig.db.encryptionAtRestMode.mode == EncryptionAtRestMode::CLUSTER_AWARE,
"Enabled cluster-aware encryption in simulation");
CODE_PROBE(simconfig.db.encryptionAtRestMode.mode == EncryptionAtRestMode::DOMAIN_AWARE,
@ -2698,28 +2713,7 @@ ACTOR void setupAndRun(std::string dataFolder,
state Optional<TenantName> defaultTenant;
state Standalone<VectorRef<TenantNameRef>> tenantsToCreate;
state TenantMode tenantMode = TenantMode::DISABLED;
// If this is a restarting test, restartInfo.ini is read in restartSimulatedSystem
// where we update the defaultTenant and tenantMode in the testConfig
// Defer setting tenant mode and default tenant until later
if (!rebooting) {
if (testConfig.tenantModes.size()) {
auto randomPick = deterministicRandom()->randomChoice(testConfig.tenantModes);
tenantMode = TenantMode::fromString(randomPick);
if (tenantMode == TenantMode::REQUIRED && allowDefaultTenant) {
defaultTenant = "SimulatedDefaultTenant"_sr;
}
} else if (allowDefaultTenant && deterministicRandom()->coinflip()) {
defaultTenant = "SimulatedDefaultTenant"_sr;
if (deterministicRandom()->random01() < 0.9) {
tenantMode = TenantMode::REQUIRED;
} else {
tenantMode = TenantMode::OPTIONAL_TENANT;
}
} else if (deterministicRandom()->coinflip()) {
tenantMode = TenantMode::OPTIONAL_TENANT;
}
}
state Optional<TenantMode> tenantMode;
try {
// systemActors.push_back( startSystemMonitor(dataFolder) );
@ -2747,17 +2741,28 @@ ACTOR void setupAndRun(std::string dataFolder,
whitelistBinPaths,
testConfig,
protocolVersion,
tenantMode);
&tenantMode);
wait(delay(1.0)); // FIXME: WHY!!! //wait for machines to boot
}
// restartSimulatedSystem can adjust some testConfig params related to tenants
// so set/overwrite those options if necessary here
if (rebooting && testConfig.tenantModes.size()) {
tenantMode = TenantMode::fromString(testConfig.tenantModes[0]);
if (rebooting) {
if (testConfig.tenantModes.size()) {
tenantMode = TenantMode::fromString(testConfig.tenantModes[0]);
} else {
tenantMode = TenantMode::DISABLED;
}
}
if (testConfig.defaultTenant.present() && tenantMode != TenantMode::DISABLED && allowDefaultTenant) {
// setupSimulatedSystem/restartSimulatedSystem should fill tenantMode with valid value.
ASSERT(tenantMode.present());
if (tenantMode != TenantMode::DISABLED && allowDefaultTenant) {
// Default tenant set by testConfig or restarting data in restartInfo.ini
defaultTenant = testConfig.defaultTenant.get();
if (testConfig.defaultTenant.present()) {
defaultTenant = testConfig.defaultTenant.get();
} else if (!rebooting && (tenantMode == TenantMode::REQUIRED || deterministicRandom()->coinflip())) {
defaultTenant = "SimulatedDefaultTenant"_sr;
}
}
if (!rebooting) {
if (defaultTenant.present() && allowDefaultTenant) {
@ -2773,7 +2778,7 @@ ACTOR void setupAndRun(std::string dataFolder,
}
TraceEvent("SimulatedClusterTenantMode")
.detail("UsingTenant", defaultTenant)
.detail("TenantMode", tenantMode.toString())
.detail("TenantMode", tenantMode.get().toString())
.detail("TotalTenants", tenantsToCreate.size());
std::string clusterFileDir = joinPath(dataFolder, deterministicRandom()->randomUniqueID().toString());
platform::createDirectory(clusterFileDir);

View File

@ -2017,18 +2017,12 @@ public:
int64_t remapCleanupWindowBytes,
int concurrentExtentReads,
bool memoryOnly,
Reference<IPageEncryptionKeyProvider> keyProvider,
Promise<Void> errorPromise = {})
: keyProvider(keyProvider),
ioLock(makeReference<PriorityMultiLock>(FLOW_KNOBS->MAX_OUTSTANDING, SERVER_KNOBS->REDWOOD_IO_PRIORITIES)),
: ioLock(makeReference<PriorityMultiLock>(FLOW_KNOBS->MAX_OUTSTANDING, SERVER_KNOBS->REDWOOD_IO_PRIORITIES)),
pageCacheBytes(pageCacheSizeBytes), desiredPageSize(desiredPageSize), desiredExtentSize(desiredExtentSize),
filename(filename), memoryOnly(memoryOnly), errorPromise(errorPromise),
remapCleanupWindowBytes(remapCleanupWindowBytes), concurrentExtentReads(new FlowLock(concurrentExtentReads)) {
if (!keyProvider) {
keyProvider = makeReference<NullKeyProvider>();
}
// This sets the page cache size for all PageCacheT instances using the same evictor
pageCache.evictor().sizeLimit = pageCacheBytes;
@ -2043,6 +2037,11 @@ public:
std::string getName() const override { return filename; }
void setEncryptionKeyProvider(Reference<IPageEncryptionKeyProvider> kp) override {
keyProvider = kp;
keyProviderInitialized.send(Void());
}
void setPageSize(int size) {
// Conservative maximum for number of records that can fit in this page size
g_redwoodMetrics.updateMaxRecordCount(315.0 * size / 4096);
@ -2812,6 +2811,10 @@ public:
try {
page->postReadHeader(pageID);
if (page->isEncrypted()) {
if (!self->keyProvider.isValid()) {
wait(self->keyProviderInitialized.getFuture());
ASSERT(self->keyProvider.isValid());
}
ArenaPage::EncryptionKey k = wait(self->keyProvider->getEncryptionKey(page->getEncodingHeader()));
page->encryptionKey = k;
}
@ -3807,6 +3810,7 @@ private:
int pagesPerExtent;
Reference<IPageEncryptionKeyProvider> keyProvider;
Promise<Void> keyProviderInitialized;
Reference<PriorityMultiLock> ioLock;
@ -4889,18 +4893,20 @@ public:
uint8_t height;
LazyClearQueueT::QueueState lazyDeleteQueue;
BTreeNodeLink root;
EncryptionAtRestMode encryptionMode = EncryptionAtRestMode::DISABLED; // since 7.3
std::string toString() {
return format("{formatVersion=%d height=%d root=%s lazyDeleteQueue=%s}",
return format("{formatVersion=%d height=%d root=%s lazyDeleteQueue=%s encryptionMode=%s}",
(int)formatVersion,
(int)height,
::toString(root).c_str(),
lazyDeleteQueue.toString().c_str());
lazyDeleteQueue.toString().c_str(),
encryptionMode.toString().c_str());
}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, formatVersion, encodingType, height, lazyDeleteQueue, root);
serializer(ar, formatVersion, encodingType, height, lazyDeleteQueue, root, encryptionMode);
}
};
@ -4964,35 +4970,21 @@ public:
// VersionedBTree takes ownership of pager
VersionedBTree(IPager2* pager,
std::string name,
EncodingType defaultEncodingType,
Reference<IPageEncryptionKeyProvider> keyProvider)
: m_pager(pager), m_encodingType(defaultEncodingType), m_enforceEncodingType(false), m_keyProvider(keyProvider),
m_pBuffer(nullptr), m_mutationCount(0), m_name(name) {
// For encrypted encoding types, enforce that BTree nodes read from disk use the default encoding type
// This prevents an attack where an encrypted page is replaced by an attacker with an unencrypted page
// or an encrypted page fabricated using a compromised scheme.
if (ArenaPage::isEncodingTypeEncrypted(m_encodingType)) {
ASSERT(keyProvider.isValid());
m_enforceEncodingType = true;
}
// If key provider isn't given, instantiate the null provider
if (!m_keyProvider) {
m_keyProvider = makeReference<NullKeyProvider>();
}
m_pBoundaryVerifier = DecodeBoundaryVerifier::getVerifier(name);
if (m_pBoundaryVerifier != nullptr) {
m_pBoundaryVerifier->setKeyProvider(m_keyProvider);
}
Reference<AsyncVar<ServerDBInfo> const> db,
Optional<EncryptionAtRestMode> expectedEncryptionMode,
EncodingType encodingType = EncodingType::MAX_ENCODING_TYPE,
Reference<IPageEncryptionKeyProvider> keyProvider = {})
: m_pager(pager), m_db(db), m_expectedEncryptionMode(expectedEncryptionMode), m_encodingType(encodingType),
m_enforceEncodingType(false), m_keyProvider(keyProvider), m_pBuffer(nullptr), m_mutationCount(0), m_name(name),
m_pBoundaryVerifier(DecodeBoundaryVerifier::getVerifier(name)) {
m_pDecodeCacheMemory = m_pager->getPageCachePenaltySource();
m_lazyClearActor = 0;
m_init = init_impl(this);
m_latestCommit = m_init;
}
Future<EncryptionAtRestMode> encryptionMode() { return m_encryptionMode.getFuture(); }
ACTOR static Future<Reference<ArenaPage>> makeEmptyRoot(VersionedBTree* self) {
state Reference<ArenaPage> page = self->m_pager->newPageBuffer();
page->init(self->m_encodingType, PageType::BTreeNode, 1);
@ -5115,6 +5107,71 @@ public:
return freedPages;
}
void checkOrUpdateEncodingType(const std::string& event,
const EncryptionAtRestMode& encryptionMode,
EncodingType& encodingType) {
EncodingType expectedEncodingType = EncodingType::MAX_ENCODING_TYPE;
if (encryptionMode == EncryptionAtRestMode::DISABLED) {
expectedEncodingType = EncodingType::XXHash64;
} else {
expectedEncodingType = FLOW_KNOBS->ENCRYPT_HEADER_AUTH_TOKEN_ENABLED ? EncodingType::AESEncryptionWithAuth
: EncodingType::AESEncryption;
}
// Randomly enable XOR encryption in simulation. Also ignore encoding type mismatch if XOR encryption is set but
// default encoding is expected.
if (encodingType == EncodingType::MAX_ENCODING_TYPE) {
encodingType = expectedEncodingType;
if (encodingType == EncodingType::XXHash64 && g_network->isSimulated() && BUGGIFY) {
encodingType = EncodingType::XOREncryption_TestOnly;
}
} else if (encodingType != expectedEncodingType) {
// In simulation we could enable xor encryption for testing. Ignore encoding type mismatch in such a case.
if (!(g_network->isSimulated() && encodingType == EncodingType::XOREncryption_TestOnly &&
expectedEncodingType == EncodingType::XXHash64)) {
TraceEvent(SevWarnAlways, "RedwoodBTreeMismatchEncryptionModeAndEncodingType")
.detail("InstanceName", m_pager->getName())
.detail("Event", event)
.detail("EncryptionMode", encryptionMode)
.detail("ExpectedEncodingType", expectedEncodingType)
.detail("ActualEncodingType", encodingType);
throw encrypt_mode_mismatch();
}
}
}
void initEncryptionKeyProvider() {
if (!m_keyProvider.isValid()) {
switch (m_encodingType) {
case EncodingType::XXHash64:
m_keyProvider = makeReference<NullEncryptionKeyProvider>();
break;
case EncodingType::XOREncryption_TestOnly:
m_keyProvider = makeReference<XOREncryptionKeyProvider_TestOnly>(m_name);
break;
case EncodingType::AESEncryption:
ASSERT(m_expectedEncryptionMode.present());
ASSERT(m_db.isValid());
m_keyProvider =
makeReference<AESEncryptionKeyProvider<AESEncryption>>(m_db, m_expectedEncryptionMode.get());
break;
case EncodingType::AESEncryptionWithAuth:
ASSERT(m_expectedEncryptionMode.present());
ASSERT(m_db.isValid());
m_keyProvider = makeReference<AESEncryptionKeyProvider<AESEncryptionWithAuth>>(
m_db, m_expectedEncryptionMode.get());
break;
default:
ASSERT(false);
}
} else {
ASSERT_EQ(m_encodingType, m_keyProvider->expectedEncodingType());
}
m_pager->setEncryptionKeyProvider(m_keyProvider);
if (m_pBoundaryVerifier != nullptr) {
m_pBoundaryVerifier->setKeyProvider(m_keyProvider);
}
}
ACTOR static Future<Void> init_impl(VersionedBTree* self) {
wait(self->m_pager->init());
self->m_pBuffer.reset(new MutationBuffer());
@ -5134,9 +5191,17 @@ public:
state Value btreeHeader = self->m_pager->getCommitRecord();
if (btreeHeader.size() == 0) {
// Create new BTree
ASSERT(self->m_expectedEncryptionMode.present());
self->m_encryptionMode.send(self->m_expectedEncryptionMode.get());
self->checkOrUpdateEncodingType("NewBTree", self->m_expectedEncryptionMode.get(), self->m_encodingType);
self->initEncryptionKeyProvider();
self->m_enforceEncodingType = isEncodingTypeEncrypted(self->m_encodingType);
self->m_header.formatVersion = BTreeCommitHeader::FORMAT_VERSION;
self->m_header.encodingType = self->m_encodingType;
self->m_header.height = 1;
self->m_header.encryptionMode = self->m_expectedEncryptionMode.get();
LogicalPageID id = wait(self->m_pager->newPageID());
self->m_header.root = BTreeNodeLinkRef((LogicalPageID*)&id, 1);
@ -5166,28 +5231,39 @@ public:
throw e;
}
if (self->m_expectedEncryptionMode.present()) {
if (self->m_header.encryptionMode != self->m_expectedEncryptionMode.get()) {
TraceEvent(SevWarnAlways, "RedwoodBTreeEncryptionModeMismatched")
.detail("InstanceName", self->m_pager->getName())
.detail("ExpectedEncryptionMode", self->m_expectedEncryptionMode)
.detail("StoredEncryptionMode", self->m_header.encryptionMode);
throw encrypt_mode_mismatch();
} else {
self->m_expectedEncryptionMode = self->m_header.encryptionMode;
}
} else {
self->m_expectedEncryptionMode = self->m_header.encryptionMode;
}
self->m_encryptionMode.send(self->m_header.encryptionMode);
ASSERT_NE(EncodingType::MAX_ENCODING_TYPE, self->m_header.encodingType);
if (self->m_encodingType == EncodingType::MAX_ENCODING_TYPE) {
self->m_encodingType = self->m_header.encodingType;
} else if (self->m_encodingType != self->m_header.encodingType) {
TraceEvent(SevWarn, "RedwoodBTreeUnexpectedEncodingType")
.detail("InstanceName", self->m_pager->getName())
.detail("UsingEncodingType", self->m_encodingType)
.detail("ExistingEncodingType", self->m_header.encodingType);
}
// Verify if encryption mode and encoding type in the header are consistent.
// This check can also fail in case of authentication mode mismatch.
self->checkOrUpdateEncodingType("ExistingBTree", self->m_header.encryptionMode, self->m_encodingType);
self->initEncryptionKeyProvider();
self->m_enforceEncodingType = isEncodingTypeEncrypted(self->m_encodingType);
self->m_lazyClearQueue.recover(self->m_pager, self->m_header.lazyDeleteQueue, "LazyClearQueueRecovered");
debug_printf("BTree recovered.\n");
if (ArenaPage::isEncodingTypeEncrypted(self->m_header.encodingType) &&
self->m_encodingType == EncodingType::XXHash64) {
// On restart the encryption config of the cluster could be unknown. In that case if we find the Redwood
// instance is encrypted, we should use the same encryption encoding.
self->m_encodingType = self->m_header.encodingType;
self->m_enforceEncodingType = true;
TraceEvent("RedwoodBTreeNodeForceEncryption")
.detail("InstanceName", self->m_pager->getName())
.detail("EncodingFound", self->m_header.encodingType)
.detail("EncodingDesired", self->m_encodingType);
}
if (self->m_header.encodingType != self->m_encodingType) {
TraceEvent(SevWarn, "RedwoodBTreeNodeEncodingMismatch")
.detail("InstanceName", self->m_pager->getName())
.detail("EncodingFound", self->m_header.encodingType)
.detail("EncodingDesired", self->m_encodingType);
}
}
self->m_lazyClearActor = 0;
TraceEvent e(SevInfo, "RedwoodRecoveredBTree");
@ -5487,6 +5563,10 @@ private:
*/
IPager2* m_pager;
Reference<AsyncVar<ServerDBInfo> const> m_db;
Optional<EncryptionAtRestMode> m_expectedEncryptionMode;
Promise<EncryptionAtRestMode> m_encryptionMode;
EncodingType m_encodingType;
bool m_enforceEncodingType;
Reference<IPageEncryptionKeyProvider> m_keyProvider;
@ -5526,12 +5606,12 @@ private:
int blockSize,
EncodingType encodingType,
unsigned int height,
bool useEncryptionDomain,
bool enableEncryptionDomain,
bool splitByDomain,
IPageEncryptionKeyProvider* keyProvider)
: startIndex(index), count(0), pageSize(blockSize),
largeDeltaTree(pageSize > BTreePage::BinaryTree::SmallSizeLimit), blockSize(blockSize), blockCount(1),
kvBytes(0), encodingType(encodingType), height(height), useEncryptionDomain(useEncryptionDomain),
kvBytes(0), encodingType(encodingType), height(height), enableEncryptionDomain(enableEncryptionDomain),
splitByDomain(splitByDomain), keyProvider(keyProvider) {
// Subtract Page header overhead, BTreePage overhead, and DeltaTree (BTreePage::BinaryTree) overhead.
@ -5541,7 +5621,7 @@ private:
PageToBuild next() {
return PageToBuild(
endIndex(), blockSize, encodingType, height, useEncryptionDomain, splitByDomain, keyProvider);
endIndex(), blockSize, encodingType, height, enableEncryptionDomain, splitByDomain, keyProvider);
}
int startIndex; // Index of the first record
@ -5556,7 +5636,7 @@ private:
EncodingType encodingType;
unsigned int height;
bool useEncryptionDomain;
bool enableEncryptionDomain;
bool splitByDomain;
IPageEncryptionKeyProvider* keyProvider;
@ -5635,7 +5715,7 @@ private:
return false;
}
if (useEncryptionDomain) {
if (enableEncryptionDomain) {
int64_t defaultDomainId = keyProvider->getDefaultEncryptionDomainId();
int64_t currentDomainId;
size_t prefixLength;
@ -5709,7 +5789,7 @@ private:
}
void finish() {
if (useEncryptionDomain && canUseDefaultDomain) {
if (enableEncryptionDomain && canUseDefaultDomain) {
domainId = keyProvider->getDefaultEncryptionDomainId();
}
}
@ -5735,12 +5815,12 @@ private:
std::vector<PageToBuild> pages;
// Whether encryption is used and we need to set encryption domain for a page.
bool useEncryptionDomain =
ArenaPage::isEncodingTypeEncrypted(m_encodingType) && m_keyProvider->enableEncryptionDomain();
bool enableEncryptionDomain =
isEncodingTypeEncrypted(m_encodingType) && m_keyProvider->enableEncryptionDomain();
// Whether we may need to split by encryption domain. It is meant to be an optimization to avoid
// unnecessary domain checks and may not cover all cases.
bool splitByDomain = false;
if (useEncryptionDomain && records.size() > 1) {
if (enableEncryptionDomain && records.size() > 1) {
int64_t firstDomain = std::get<0>(m_keyProvider->getEncryptionDomain(records[0].key));
int64_t lastDomain = std::get<0>(m_keyProvider->getEncryptionDomain(records[records.size() - 1].key));
// If the two record falls in the same non-default domain, we know all the records fall in the
@ -5759,7 +5839,7 @@ private:
}
PageToBuild p(
0, m_blockSize, m_encodingType, height, useEncryptionDomain, splitByDomain, m_keyProvider.getPtr());
0, m_blockSize, m_encodingType, height, enableEncryptionDomain, splitByDomain, m_keyProvider.getPtr());
for (int i = 0; i < records.size();) {
bool force = p.count < minRecords || p.slackFraction() > maxSlack;
@ -5799,8 +5879,8 @@ private:
PageToBuild& b = pages.back();
// We can rebalance the two pages only if they are in the same encryption domain.
ASSERT(!useEncryptionDomain || (a.domainId.present() && b.domainId.present()));
if (!useEncryptionDomain || a.domainId.get() == b.domainId.get()) {
ASSERT(!enableEncryptionDomain || (a.domainId.present() && b.domainId.present()));
if (!enableEncryptionDomain || a.domainId.get() == b.domainId.get()) {
// While the last page page has too much slack and the second to last page
// has more than the minimum record count, shift a record from the second
@ -5835,8 +5915,8 @@ private:
state int prefixLen = lowerBound->getCommonPrefixLen(*upperBound);
// Whether encryption is used and we need to set encryption domain for a page.
state bool useEncryptionDomain =
ArenaPage::isEncodingTypeEncrypted(self->m_encodingType) && self->m_keyProvider->enableEncryptionDomain();
state bool enableEncryptionDomain =
isEncodingTypeEncrypted(self->m_encodingType) && self->m_keyProvider->enableEncryptionDomain();
state std::vector<PageToBuild> pagesToBuild =
self->splitPages(lowerBound, upperBound, prefixLen, entries, height);
@ -5850,7 +5930,7 @@ private:
state int pageIndex;
if (useEncryptionDomain) {
if (enableEncryptionDomain) {
ASSERT(pagesToBuild[0].domainId.present());
int64_t domainId = pagesToBuild[0].domainId.get();
// We make sure the page lower bound fits in the domain of the page.
@ -5885,7 +5965,7 @@ private:
pageUpperBound.truncate(commonPrefix + 1);
}
if (useEncryptionDomain && pageUpperBound.key != dbEnd.key) {
if (enableEncryptionDomain && pageUpperBound.key != dbEnd.key) {
int64_t ubDomainId;
KeyRef ubDomainPrefix;
if (lastPage) {
@ -5916,10 +5996,10 @@ private:
--p->count;
debug_printf("Skipping first null record, new count=%d\n", p->count);
// In case encryption or encryption domain is not enabled, if the page is now empty then it must be the
// last page in pagesToBuild, otherwise there would be more than 1 item since internal pages need to
// have multiple children. In case encryption and encryption domain is enabled, however, because of the
// page split by encryption domain, it may not be the last page.
// In case encryption or encryption domain is not enabled, if the page is now empty then it must be
// the last page in pagesToBuild, otherwise there would be more than 1 item since internal pages
// need to have multiple children. In case encryption and encryption domain is enabled, however,
// because of the page split by encryption domain, it may not be the last page.
//
// Either way, a record must be added to the output set because the upper boundary of the last
// page built does not match the upper boundary of the original page that this call to writePages() is
@ -5927,7 +6007,7 @@ private:
// built does not match the upper boundary of the original page that the page set is replacing, so
// adding the extra null link fixes this.
if (p->count == 0) {
ASSERT(useEncryptionDomain || lastPage);
ASSERT(enableEncryptionDomain || lastPage);
records.push_back_deep(records.arena(), pageLowerBound);
pageLowerBound = pageUpperBound;
continue;
@ -5940,8 +6020,8 @@ private:
self->m_encodingType, (p->blockCount == 1) ? PageType::BTreeNode : PageType::BTreeSuperNode, height);
if (page->isEncrypted()) {
ArenaPage::EncryptionKey k =
wait(useEncryptionDomain ? self->m_keyProvider->getLatestEncryptionKey(p->domainId.get())
: self->m_keyProvider->getLatestDefaultEncryptionKey());
wait(enableEncryptionDomain ? self->m_keyProvider->getLatestEncryptionKey(p->domainId.get())
: self->m_keyProvider->getLatestDefaultEncryptionKey());
page->encryptionKey = k;
}
@ -6091,8 +6171,9 @@ private:
records[0].key != dbBegin.key) {
CODE_PROBE(records.size() == 1, "Writing a new root because the current root pointer would be too large");
if (records[0].key != dbBegin.key) {
ASSERT(self->m_keyProvider.isValid() && self->m_keyProvider->enableEncryption() &&
self->m_keyProvider->enableEncryptionDomain());
ASSERT(self->m_expectedEncryptionMode.present() &&
self->m_expectedEncryptionMode.get().isEncryptionEnabled());
ASSERT(self->m_keyProvider.isValid() && self->m_keyProvider->enableEncryptionDomain());
int64_t domainId;
size_t prefixLength;
std::tie(domainId, prefixLength) = self->m_keyProvider->getEncryptionDomain(records[0].key);
@ -6678,9 +6759,9 @@ private:
// TryToUpdate indicates insert and erase operations should be tried on the existing page first
state bool tryToUpdate = btPage->tree()->numItems > 0 && update->boundariesNormal();
state bool useEncryptionDomain = page->isEncrypted() && self->m_keyProvider->enableEncryptionDomain();
state bool enableEncryptionDomain = page->isEncrypted() && self->m_keyProvider->enableEncryptionDomain();
state Optional<int64_t> pageDomainId;
if (useEncryptionDomain) {
if (enableEncryptionDomain) {
pageDomainId = page->getEncryptionDomainId();
}
@ -6803,7 +6884,7 @@ private:
// If updating, first try to add the record to the page
if (updatingDeltaTree) {
bool canInsert = true;
if (useEncryptionDomain) {
if (enableEncryptionDomain) {
ASSERT(pageDomainId.present());
canInsert = self->m_keyProvider->keyFitsInDomain(pageDomainId.get(), rec.key, false);
}
@ -6957,9 +7038,9 @@ private:
debug_print(addPrefix(context, update->toString()));
return Void();
} else {
debug_printf(
"%s Changes were made, writing, but subtree may still be unchanged from parent's perspective.\n",
context.c_str());
debug_printf("%s Changes were made, writing, but subtree may still be unchanged from parent's "
"perspective.\n",
context.c_str());
}
if (updatingDeltaTree) {
@ -7582,7 +7663,7 @@ public:
#if REDWOOD_DEBUG
path.push_back({ p, cursor, link.get().getChildPage() });
#else
path.push_back({ p, cursor });
path.push_back({ p, cursor });
#endif
if (btree->m_pBoundaryVerifier != nullptr) {
@ -7608,7 +7689,7 @@ public:
#if REDWOOD_DEBUG
path.push_back({ p, btree->getCursor(p.getPtr(), dbBegin, dbEnd), id });
#else
path.push_back({ p, btree->getCursor(p.getPtr(), dbBegin, dbEnd) });
path.push_back({ p, btree->getCursor(p.getPtr(), dbBegin, dbEnd) });
#endif
return Void();
});
@ -7848,8 +7929,16 @@ RedwoodRecordRef VersionedBTree::dbEnd("\xff\xff\xff\xff\xff"_sr);
class KeyValueStoreRedwood : public IKeyValueStore {
public:
KeyValueStoreRedwood(std::string filename, UID logID, Reference<IPageEncryptionKeyProvider> encryptionKeyProvider)
KeyValueStoreRedwood(std::string filename,
UID logID,
Reference<AsyncVar<ServerDBInfo> const> db,
Optional<EncryptionAtRestMode> encryptionMode,
EncodingType encodingType = EncodingType::MAX_ENCODING_TYPE,
Reference<IPageEncryptionKeyProvider> keyProvider = {})
: m_filename(filename), prefetch(SERVER_KNOBS->REDWOOD_KVSTORE_RANGE_PREFETCH) {
if (!encryptionMode.present() || encryptionMode.get().isEncryptionEnabled()) {
ASSERT(keyProvider.isValid() || db.isValid());
}
int pageSize =
BUGGIFY ? deterministicRandom()->randomInt(1000, 4096 * 4) : SERVER_KNOBS->REDWOOD_DEFAULT_PAGE_SIZE;
@ -7868,25 +7957,6 @@ public:
: 100 * 1024 * 1024) // 100M
: SERVER_KNOBS->REDWOOD_REMAP_CLEANUP_WINDOW_BYTES;
EncodingType encodingType = EncodingType::XXHash64;
// When reopening Redwood on restart, the cluser encryption config could be unknown at this point,
// for which shouldEnableEncryption will return false. In that case, if the Redwood instance was encrypted
// before, the encoding type in the header page will be used instead.
//
// TODO(yiwu): When the cluster encryption config is available later, fail if the cluster is configured to
// enable encryption, but the Redwood instance is unencrypted.
if (encryptionKeyProvider && encryptionKeyProvider->enableEncryption()) {
encodingType = FLOW_KNOBS->ENCRYPT_HEADER_AUTH_TOKEN_ENABLED ? EncodingType::AESEncryptionWithAuth
: EncodingType::AESEncryption;
ASSERT_EQ(encodingType, encryptionKeyProvider->expectedEncodingType());
m_keyProvider = encryptionKeyProvider;
} else if (g_allowXOREncryptionInSimulation && g_network->isSimulated() && logID.hash() % 2 == 0) {
// Simulation only. Deterministically enable encryption based on uid
encodingType = EncodingType::XOREncryption_TestOnly;
m_keyProvider = makeReference<XOREncryptionKeyProvider_TestOnly>(filename);
}
IPager2* pager = new DWALPager(pageSize,
extentSize,
filename,
@ -7894,9 +7964,8 @@ public:
remapCleanupWindowBytes,
SERVER_KNOBS->REDWOOD_EXTENT_CONCURRENT_READS,
false,
m_keyProvider,
m_error);
m_tree = new VersionedBTree(pager, filename, encodingType, m_keyProvider);
m_tree = new VersionedBTree(pager, filename, db, encryptionMode, encodingType, keyProvider);
m_init = catchError(init_impl(this));
}
@ -7912,6 +7981,8 @@ public:
return Void();
}
Future<EncryptionAtRestMode> encryptionMode() override { return m_tree->encryptionMode(); }
ACTOR void shutdown(KeyValueStoreRedwood* self, bool dispose) {
TraceEvent(SevInfo, "RedwoodShutdown").detail("Filename", self->m_filename).detail("Dispose", dispose);
@ -8189,8 +8260,9 @@ private:
IKeyValueStore* keyValueStoreRedwoodV1(std::string const& filename,
UID logID,
Reference<IPageEncryptionKeyProvider> encryptionKeyProvider) {
return new KeyValueStoreRedwood(filename, logID, encryptionKeyProvider);
Reference<AsyncVar<ServerDBInfo> const> db,
Optional<EncryptionAtRestMode> encryptionMode) {
return new KeyValueStoreRedwood(filename, logID, db, encryptionMode);
}
int randomSize(int max) {
@ -9998,13 +10070,23 @@ TEST_CASE("Lredwood/correctness/btree") {
state int64_t maxRecordsRead = params.getInt("maxRecordsRead").orDefault(300e6);
state EncodingType encodingType = static_cast<EncodingType>(encoding);
state EncryptionAtRestMode encryptionMode =
!isEncodingTypeAESEncrypted(encodingType)
? EncryptionAtRestMode::DISABLED
: (encryptionDomainMode == RandomEncryptionKeyProvider<AESEncryption>::EncryptionDomainMode::DISABLED
? EncryptionAtRestMode::CLUSTER_AWARE
: EncryptionAtRestMode::DOMAIN_AWARE);
state Reference<IPageEncryptionKeyProvider> keyProvider;
auto& g_knobs = IKnobCollection::getMutableGlobalKnobCollection();
if (encodingType == EncodingType::AESEncryption) {
keyProvider = makeReference<RandomEncryptionKeyProvider<AESEncryption>>(
RandomEncryptionKeyProvider<AESEncryption>::EncryptionDomainMode(encryptionDomainMode));
g_knobs.setKnob("encrypt_header_auth_token_enabled", KnobValueRef::create(bool{ false }));
} else if (encodingType == EncodingType::AESEncryptionWithAuth) {
keyProvider = makeReference<RandomEncryptionKeyProvider<AESEncryptionWithAuth>>(
RandomEncryptionKeyProvider<AESEncryptionWithAuth>::EncryptionDomainMode(encryptionDomainMode));
g_knobs.setKnob("encrypt_header_auth_token_enabled", KnobValueRef::create(bool{ true }));
g_knobs.setKnob("encrypt_header_auth_token_algo", KnobValueRef::create(int{ 1 }));
} else if (encodingType == EncodingType::XOREncryption_TestOnly) {
keyProvider = makeReference<XOREncryptionKeyProvider_TestOnly>(file);
}
@ -10042,15 +10124,9 @@ TEST_CASE("Lredwood/correctness/btree") {
deleteFile(file);
printf("Initializing...\n");
pager = new DWALPager(pageSize,
extentSize,
file,
pageCacheBytes,
remapCleanupWindowBytes,
concurrentExtentReads,
pagerMemoryOnly,
keyProvider);
state VersionedBTree* btree = new VersionedBTree(pager, file, encodingType, keyProvider);
pager = new DWALPager(
pageSize, extentSize, file, pageCacheBytes, remapCleanupWindowBytes, concurrentExtentReads, pagerMemoryOnly);
state VersionedBTree* btree = new VersionedBTree(pager, file, {}, encryptionMode, encodingType, keyProvider);
wait(btree->init());
state DecodeBoundaryVerifier* pBoundaries = DecodeBoundaryVerifier::getVerifier(file);
@ -10153,7 +10229,8 @@ TEST_CASE("Lredwood/correctness/btree") {
while (e != eEnd) {
auto w = *e;
++e;
// If e key is different from last and last was present then insert clear for last's key at version
// If e key is different from last and last was present then insert clear for last's key at
// version
if (last != eEnd &&
((e == eEnd || e->first.first != last->first.first) && last->second.present())) {
debug_printf(
@ -10221,8 +10298,8 @@ TEST_CASE("Lredwood/correctness/btree") {
keyBytesCleared.rate() / 1e6,
mutationBytes.rate() / 1e6);
// Sometimes advance the oldest version to close the gap between the oldest and latest versions by a random
// amount.
// Sometimes advance the oldest version to close the gap between the oldest and latest versions by a
// random amount.
if (deterministicRandom()->random01() < advanceOldVersionProbability) {
btree->setOldestReadableVersion(
btree->getLastCommittedVersion() -
@ -10286,15 +10363,9 @@ TEST_CASE("Lredwood/correctness/btree") {
wait(closedFuture);
printf("Reopening btree from disk.\n");
IPager2* pager = new DWALPager(pageSize,
extentSize,
file,
pageCacheBytes,
remapCleanupWindowBytes,
concurrentExtentReads,
false,
keyProvider);
btree = new VersionedBTree(pager, file, encodingType, keyProvider);
IPager2* pager = new DWALPager(
pageSize, extentSize, file, pageCacheBytes, remapCleanupWindowBytes, concurrentExtentReads, false);
btree = new VersionedBTree(pager, file, {}, encryptionMode, encodingType, keyProvider);
wait(btree->init());
@ -10341,9 +10412,10 @@ TEST_CASE("Lredwood/correctness/btree") {
pageCacheBytes,
(BUGGIFY ? 0 : remapCleanupWindowBytes),
concurrentExtentReads,
pagerMemoryOnly,
keyProvider),
pagerMemoryOnly),
file,
{},
{},
encodingType,
keyProvider);
wait(btree->init());
@ -10482,15 +10554,9 @@ TEST_CASE(":/redwood/performance/extentQueue") {
// Do random pushes into the queue and commit periodically
if (reload) {
pager = new DWALPager(pageSize,
extentSize,
fileName,
cacheSizeBytes,
remapCleanupWindowBytes,
concurrentExtentReads,
false,
Reference<IPageEncryptionKeyProvider>());
pager = new DWALPager(
pageSize, extentSize, fileName, cacheSizeBytes, remapCleanupWindowBytes, concurrentExtentReads, false);
pager->setEncryptionKeyProvider(makeReference<NullEncryptionKeyProvider>());
wait(success(pager->init()));
LogicalPageID extID = pager->newLastExtentID();
@ -10540,14 +10606,9 @@ TEST_CASE(":/redwood/performance/extentQueue") {
}
printf("Reopening pager file from disk.\n");
pager = new DWALPager(pageSize,
extentSize,
fileName,
cacheSizeBytes,
remapCleanupWindowBytes,
concurrentExtentReads,
false,
Reference<IPageEncryptionKeyProvider>());
pager = new DWALPager(
pageSize, extentSize, fileName, cacheSizeBytes, remapCleanupWindowBytes, concurrentExtentReads, false);
pager->setEncryptionKeyProvider(makeReference<NullEncryptionKeyProvider>());
wait(success(pager->init()));
printf("Starting ExtentQueue FastPath Recovery from Disk.\n");
@ -10687,16 +10748,10 @@ TEST_CASE(":/redwood/performance/set") {
deleteFile(file);
}
DWALPager* pager = new DWALPager(pageSize,
extentSize,
file,
pageCacheBytes,
remapCleanupWindowBytes,
concurrentExtentReads,
pagerMemoryOnly,
Reference<IPageEncryptionKeyProvider>());
DWALPager* pager = new DWALPager(
pageSize, extentSize, file, pageCacheBytes, remapCleanupWindowBytes, concurrentExtentReads, pagerMemoryOnly);
state VersionedBTree* btree =
new VersionedBTree(pager, file, EncodingType::XXHash64, Reference<IPageEncryptionKeyProvider>());
new VersionedBTree(pager, file, {}, {}, EncodingType::XXHash64, makeReference<NullEncryptionKeyProvider>());
wait(btree->init());
printf("Initialized. StorageBytes=%s\n", btree->getStorageBytes().toString().c_str());
@ -11062,8 +11117,9 @@ ACTOR Future<Void> prefixClusteredInsert(IKeyValueStore* kvs,
}
wait(commit);
// TODO is it desired that not all records are committed? This could commit again to ensure any records set() since
// the last commit are persisted. For the purposes of how this is used currently, I don't think it matters though
// TODO is it desired that not all records are committed? This could commit again to ensure any records set()
// since the last commit are persisted. For the purposes of how this is used currently, I don't think it matters
// though
stats();
printf("\n");
@ -11349,10 +11405,11 @@ void setAuthMode(EncodingType encodingType) {
TEST_CASE("/redwood/correctness/EnforceEncodingType") {
state const std::vector<std::pair<EncodingType, EncodingType>> testCases = {
{ XXHash64, AESEncryption }, { AESEncryption, AESEncryptionWithAuth }
{ XXHash64, XOREncryption_TestOnly }, { AESEncryption, AESEncryptionWithAuth }
};
state const std::map<EncodingType, Reference<IPageEncryptionKeyProvider>> encryptionKeyProviders = {
{ XXHash64, makeReference<NullKeyProvider>() },
{ XXHash64, makeReference<NullEncryptionKeyProvider>() },
{ XOREncryption_TestOnly, makeReference<XOREncryptionKeyProvider_TestOnly>("test.redwood-v1") },
{ AESEncryption, makeReference<RandomEncryptionKeyProvider<AESEncryption>>() },
{ AESEncryptionWithAuth, makeReference<RandomEncryptionKeyProvider<AESEncryptionWithAuth>>() }
};
@ -11362,18 +11419,18 @@ TEST_CASE("/redwood/correctness/EnforceEncodingType") {
state EncodingType initialEncodingType = testCase.first;
state EncodingType reopenEncodingType = testCase.second;
ASSERT_NE(initialEncodingType, reopenEncodingType);
ASSERT(ArenaPage::isEncodingTypeEncrypted(reopenEncodingType));
ASSERT(isEncodingTypeEncrypted(reopenEncodingType));
deleteFile("test.redwood-v1");
printf("Create KV store with encoding type %d\n", initialEncodingType);
setAuthMode(initialEncodingType);
kvs = openKVStore(KeyValueStoreType::SSD_REDWOOD_V1,
"test.redwood-v1",
UID(),
0,
false,
false,
false,
encryptionKeyProviders.at(initialEncodingType));
kvs = new KeyValueStoreRedwood("test.redwood-v1",
UID(),
{}, // db
isEncodingTypeAESEncrypted(initialEncodingType)
? EncryptionAtRestMode::CLUSTER_AWARE
: EncryptionAtRestMode::DISABLED,
initialEncodingType,
encryptionKeyProviders.at(initialEncodingType));
wait(kvs->init());
kvs->set(KeyValueRef("foo"_sr, "bar"_sr));
wait(kvs->commit());
@ -11381,14 +11438,12 @@ TEST_CASE("/redwood/correctness/EnforceEncodingType") {
// Reopen
printf("Reopen KV store with encoding type %d\n", reopenEncodingType);
setAuthMode(reopenEncodingType);
kvs = openKVStore(KeyValueStoreType::SSD_REDWOOD_V1,
"test.redwood-v1",
UID(),
0,
false,
false,
false,
encryptionKeyProviders.at(reopenEncodingType));
kvs = new KeyValueStoreRedwood("test.redwood-v1",
UID(),
{}, // db
{}, // encryptionMode
reopenEncodingType,
encryptionKeyProviders.at(reopenEncodingType));
wait(kvs->init());
try {
Optional<Value> v = wait(kvs->readValue("foo"_sr));

View File

@ -2265,10 +2265,6 @@ int main(int argc, char* argv[]) {
KnobValue::create(ini.GetBoolValue("META", "enableBlobGranuleEncryption", false)));
g_knobs.setKnob("enable_blob_granule_compression",
KnobValue::create(ini.GetBoolValue("META", "enableBlobGranuleEncryption", false)));
// Restart test does not preserve encryption mode (tenant-aware or domain-aware).
// Disable domain-aware encryption in Redwood until encryption mode from db config is being handled.
// TODO(yiwu): clean it up once we cleanup the knob.
g_knobs.setKnob("redwood_split_encrypted_pages_by_tenant", KnobValue::create(bool{ false }));
g_knobs.setKnob("encrypt_header_auth_token_enabled",
KnobValue::create(ini.GetBoolValue("META", "encryptHeaderAuthTokenEnabled", false)));
g_knobs.setKnob("encrypt_header_auth_token_algo",

View File

@ -20,6 +20,7 @@
#ifndef FDBSERVER_IKEYVALUESTORE_H
#define FDBSERVER_IKEYVALUESTORE_H
#include "flow/Trace.h"
#pragma once
#include "fdbclient/FDBTypes.h"
@ -133,6 +134,9 @@ public:
// of a rollback.
virtual Future<Void> init() { return Void(); }
// Obtain the encryption mode of the storage. The encryption mode needs to match the encryption mode of the cluster.
virtual Future<EncryptionAtRestMode> encryptionMode() = 0;
protected:
virtual ~IKeyValueStore() {}
};
@ -144,7 +148,8 @@ extern IKeyValueStore* keyValueStoreSQLite(std::string const& filename,
bool checkIntegrity = false);
extern IKeyValueStore* keyValueStoreRedwoodV1(std::string const& filename,
UID logID,
Reference<IPageEncryptionKeyProvider> encryptionKeyProvider = {});
Reference<AsyncVar<ServerDBInfo> const> db = {},
Optional<EncryptionAtRestMode> encryptionMode = {});
extern IKeyValueStore* keyValueStoreRocksDB(std::string const& path,
UID logID,
KeyValueStoreType storeType,
@ -183,7 +188,16 @@ inline IKeyValueStore* openKVStore(KeyValueStoreType storeType,
bool checkChecksums = false,
bool checkIntegrity = false,
bool openRemotely = false,
Reference<IPageEncryptionKeyProvider> encryptionKeyProvider = {}) {
Reference<AsyncVar<ServerDBInfo> const> db = {},
Optional<EncryptionAtRestMode> encryptionMode = {}) {
// Only Redwood support encryption currently.
if (encryptionMode.present() && encryptionMode.get().isEncryptionEnabled() &&
storeType != KeyValueStoreType::SSD_REDWOOD_V1) {
TraceEvent(SevWarn, "KVStoreTypeNotSupportingEncryption")
.detail("KVStoreType", storeType)
.detail("EncryptionMode", encryptionMode);
throw encrypt_mode_mismatch();
}
if (openRemotely) {
return openRemoteKVStore(storeType, filename, logID, memoryLimit, checkChecksums, checkIntegrity);
}
@ -195,7 +209,7 @@ inline IKeyValueStore* openKVStore(KeyValueStoreType storeType,
case KeyValueStoreType::MEMORY:
return keyValueStoreMemory(filename, logID, memoryLimit);
case KeyValueStoreType::SSD_REDWOOD_V1:
return keyValueStoreRedwoodV1(filename, logID, encryptionKeyProvider);
return keyValueStoreRedwoodV1(filename, logID, db, encryptionMode);
case KeyValueStoreType::SSD_ROCKSDB_V1:
return keyValueStoreRocksDB(filename, logID, storeType);
case KeyValueStoreType::SSD_SHARDED_ROCKSDB:

View File

@ -18,6 +18,8 @@
* limitations under the License.
*/
#include "fdbclient/TenantManagement.actor.h"
#include "fdbrpc/TenantInfo.h"
#if defined(NO_INTELLISENSE) && !defined(FDBSERVER_IPAGEENCRYPTIONKEYPROVIDER_ACTOR_G_H)
#define FDBSERVER_IPAGEENCRYPTIONKEYPROVIDER_ACTOR_G_H
#include "fdbserver/IPageEncryptionKeyProvider.actor.g.h"
@ -68,12 +70,8 @@ public:
// Expected encoding type being used with the encryption key provider.
virtual EncodingType expectedEncodingType() const = 0;
// Checks whether encryption should be enabled. If not, the encryption key provider will not be used by
// the pager, and instead the default non-encrypted encoding type (XXHash64) is used.
virtual bool enableEncryption() const = 0;
// Whether encryption domain is enabled.
virtual bool enableEncryptionDomain() const { return false; }
virtual bool enableEncryptionDomain() const = 0;
// Get an encryption key from given encoding header.
virtual Future<EncryptionKey> getEncryptionKey(const void* encodingHeader) { throw not_implemented(); }
@ -110,11 +108,11 @@ public:
// The null key provider is useful to simplify page decoding.
// It throws an error for any key info requested.
class NullKeyProvider : public IPageEncryptionKeyProvider {
class NullEncryptionKeyProvider : public IPageEncryptionKeyProvider {
public:
virtual ~NullKeyProvider() {}
virtual ~NullEncryptionKeyProvider() {}
EncodingType expectedEncodingType() const override { return EncodingType::XXHash64; }
bool enableEncryption() const override { return false; }
bool enableEncryptionDomain() const override { return false; }
};
// Key provider for dummy XOR encryption scheme
@ -139,7 +137,7 @@ public:
EncodingType expectedEncodingType() const override { return EncodingType::XOREncryption_TestOnly; }
bool enableEncryption() const override { return true; }
bool enableEncryptionDomain() const override { return false; }
Future<EncryptionKey> getEncryptionKey(const void* encodingHeader) override {
@ -188,9 +186,7 @@ public:
EncodingType expectedEncodingType() const override { return encodingType; }
bool enableEncryption() const override { return true; }
bool enableEncryptionDomain() const override { return mode > 1; }
bool enableEncryptionDomain() const override { return mode > 0; }
Future<EncryptionKey> getEncryptionKey(const void* encodingHeader) override {
using Header = typename ArenaPage::AESEncryptionEncoder<encodingType>::Header;
@ -284,26 +280,29 @@ private:
template <EncodingType encodingType,
typename std::enable_if<encodingType == AESEncryption || encodingType == AESEncryptionWithAuth, bool>::type =
true>
class TenantAwareEncryptionKeyProvider : public IPageEncryptionKeyProvider {
class AESEncryptionKeyProvider : public IPageEncryptionKeyProvider {
public:
using EncodingHeader = typename ArenaPage::AESEncryptionEncoder<encodingType>::Header;
const StringRef systemKeysPrefix = systemKeys.begin;
TenantAwareEncryptionKeyProvider(Reference<AsyncVar<ServerDBInfo> const> db) : db(db) {}
AESEncryptionKeyProvider(Reference<AsyncVar<ServerDBInfo> const> db, EncryptionAtRestMode encryptionMode)
: db(db), encryptionMode(encryptionMode) {
ASSERT(encryptionMode != EncryptionAtRestMode::DISABLED);
ASSERT(db.isValid());
}
virtual ~TenantAwareEncryptionKeyProvider() = default;
virtual ~AESEncryptionKeyProvider() = default;
EncodingType expectedEncodingType() const override { return encodingType; }
bool enableEncryption() const override {
return isEncryptionOpSupported(EncryptOperationType::STORAGE_SERVER_ENCRYPTION);
bool enableEncryptionDomain() const override {
// Regardless of encryption mode, system keys always encrypted using system key space domain.
// Because of this, AESEncryptionKeyProvider always appears to be domain-aware.
return true;
}
bool enableEncryptionDomain() const override { return SERVER_KNOBS->REDWOOD_SPLIT_ENCRYPTED_PAGES_BY_TENANT; }
ACTOR static Future<EncryptionKey> getEncryptionKey(TenantAwareEncryptionKeyProvider* self,
const void* encodingHeader) {
ACTOR static Future<EncryptionKey> getEncryptionKey(AESEncryptionKeyProvider* self, const void* encodingHeader) {
const BlobCipherEncryptHeader& header = reinterpret_cast<const EncodingHeader*>(encodingHeader)->encryption;
TextAndHeaderCipherKeys cipherKeys =
wait(getEncryptCipherKeys(self->db, header, BlobCipherMetrics::KV_REDWOOD));
@ -320,9 +319,8 @@ public:
return getLatestEncryptionKey(getDefaultEncryptionDomainId());
}
ACTOR static Future<EncryptionKey> getLatestEncryptionKey(TenantAwareEncryptionKeyProvider* self,
int64_t domainId) {
ACTOR static Future<EncryptionKey> getLatestEncryptionKey(AESEncryptionKeyProvider* self, int64_t domainId) {
ASSERT(self->encryptionMode == EncryptionAtRestMode::DOMAIN_AWARE || domainId < 0);
TextAndHeaderCipherKeys cipherKeys =
wait(getLatestEncryptCipherKeysForDomain(self->db, domainId, BlobCipherMetrics::KV_REDWOOD));
EncryptionKey encryptionKey;
@ -341,14 +339,16 @@ public:
if (key.startsWith(systemKeysPrefix)) {
return { SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID, systemKeysPrefix.size() };
}
// Cluster-aware encryption.
if (encryptionMode == EncryptionAtRestMode::CLUSTER_AWARE) {
return { FDB_DEFAULT_ENCRYPT_DOMAIN_ID, 0 };
}
// Key smaller than tenant prefix in size belongs to the default domain.
if (key.size() < TenantAPI::PREFIX_SIZE) {
return { FDB_DEFAULT_ENCRYPT_DOMAIN_ID, 0 };
}
StringRef prefix = key.substr(0, TenantAPI::PREFIX_SIZE);
int64_t tenantId = TenantAPI::prefixToId(prefix, EnforceValidTenantId::False);
// Tenant id must be non-negative.
if (tenantId < 0) {
int64_t tenantId = TenantAPI::extractTenantIdFromKeyRef(key);
if (tenantId == TenantInfo::INVALID_TENANT) {
return { FDB_DEFAULT_ENCRYPT_DOMAIN_ID, 0 };
}
return { tenantId, TenantAPI::PREFIX_SIZE };
@ -362,6 +362,7 @@ public:
private:
Reference<AsyncVar<ServerDBInfo> const> db;
EncryptionAtRestMode encryptionMode;
};
#include "flow/unactorcompiler.h"

View File

@ -22,8 +22,6 @@
#ifndef FDBSERVER_IPAGER_H
#define FDBSERVER_IPAGER_H
#include <cstddef>
#include <stdint.h>
#include "fdbclient/BlobCipher.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/GetEncryptCipherKeys.actor.h"
@ -39,6 +37,10 @@
#define XXH_INLINE_ALL
#include "flow/xxhash.h"
#include <array>
#include <cstddef>
#include <stdint.h>
typedef uint32_t LogicalPageID;
typedef uint32_t PhysicalPageID;
#define invalidLogicalPageID std::numeric_limits<LogicalPageID>::max()
@ -102,6 +104,15 @@ enum EncodingType : uint8_t {
MAX_ENCODING_TYPE = 4
};
static constexpr std::array EncryptedEncodingTypes = { AESEncryption, AESEncryptionWithAuth, XOREncryption_TestOnly };
inline bool isEncodingTypeEncrypted(EncodingType encoding) {
return std::count(EncryptedEncodingTypes.begin(), EncryptedEncodingTypes.end(), encoding) > 0;
}
inline bool isEncodingTypeAESEncrypted(EncodingType encoding) {
return encoding == AESEncryption || encoding == AESEncryptionWithAuth;
}
enum PageType : uint8_t {
HeaderPage = 0,
BackupHeaderPage = 1,
@ -615,11 +626,6 @@ public:
const Arena& getArena() const { return arena; }
static bool isEncodingTypeEncrypted(EncodingType t) {
return t == EncodingType::AESEncryption || t == EncodingType::AESEncryptionWithAuth ||
t == EncodingType::XOREncryption_TestOnly;
}
// Returns true if the page's encoding type employs encryption
bool isEncrypted() const { return isEncodingTypeEncrypted(getEncodingType()); }
@ -707,11 +713,16 @@ public:
ArbitraryObject extra;
};
class IPageEncryptionKeyProvider;
// This API is probably too customized to the behavior of DWALPager and probably needs some changes to be more generic.
class IPager2 : public IClosable {
public:
virtual std::string getName() const = 0;
// Set an encryption key provider.
virtual void setEncryptionKeyProvider(Reference<IPageEncryptionKeyProvider> keyProvider) = 0;
// Returns an ArenaPage that can be passed to writePage. The data in the returned ArenaPage might not be zeroed.
virtual Reference<ArenaPage> newPageBuffer(size_t blocks = 1) = 0;

View File

@ -492,6 +492,10 @@ struct RemoteIKeyValueStore : public IKeyValueStore {
}
return Void();
}
Future<EncryptionAtRestMode> encryptionMode() override {
return EncryptionAtRestMode(EncryptionAtRestMode::DISABLED);
}
};
Future<Void> runFlowProcess(std::string const& name, Endpoint endpoint);

View File

@ -866,10 +866,12 @@ struct InitializeStorageRequest {
tssPairIDAndVersion; // Only set if recruiting a tss. Will be the UID and Version of its SS pair.
Version initialClusterVersion;
ReplyPromise<InitializeStorageReply> reply;
EncryptionAtRestMode encryptMode;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, seedTag, reqId, interfaceId, storeType, reply, tssPairIDAndVersion, initialClusterVersion);
serializer(
ar, seedTag, reqId, interfaceId, storeType, reply, tssPairIDAndVersion, initialClusterVersion, encryptMode);
}
};
@ -1186,7 +1188,6 @@ ACTOR Future<Void> encryptKeyProxyServer(EncryptKeyProxyInterface ei, Reference<
class IKeyValueStore;
class ServerCoordinators;
class IDiskQueue;
class IPageEncryptionKeyProvider;
ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
StorageServerInterface ssi,
Tag seedTag,
@ -1194,8 +1195,7 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
Version tssSeedVersion,
ReplyPromise<InitializeStorageReply> recruitReply,
Reference<AsyncVar<ServerDBInfo> const> db,
std::string folder,
Reference<IPageEncryptionKeyProvider> encryptionKeyProvider);
std::string folder);
ACTOR Future<Void> storageServer(
IKeyValueStore* persistentData,
StorageServerInterface ssi,
@ -1203,8 +1203,7 @@ ACTOR Future<Void> storageServer(
std::string folder,
Promise<Void> recovered,
Reference<IClusterConnectionRecord>
connRecord, // changes pssi->id() to be the recovered ID); // changes pssi->id() to be the recovered ID
Reference<IPageEncryptionKeyProvider> encryptionKeyProvider);
connRecord); // changes pssi->id() to be the recovered ID); // changes pssi->id() to be the recovered ID
ACTOR Future<Void> masterServer(MasterInterface mi,
Reference<AsyncVar<ServerDBInfo> const> db,
Reference<AsyncVar<Optional<ClusterControllerFullInterface>> const> ccInterface,

View File

@ -437,6 +437,8 @@ struct StorageServerDisk {
StorageBytes getStorageBytes() const { return storage->getStorageBytes(); }
std::tuple<size_t, size_t, size_t> getSize() const { return storage->getSize(); }
Future<EncryptionAtRestMode> encryptionMode() { return storage->encryptionMode(); }
// The following are pointers to the Counters in StorageServer::counters of the same names.
Counter* kvCommitLogicalBytes;
Counter* kvClearRanges;
@ -797,8 +799,6 @@ public:
std::map<Version, std::vector<KeyRange>>
pendingRemoveRanges; // Pending requests to remove ranges from physical shards
Reference<IPageEncryptionKeyProvider> encryptionKeyProvider;
bool shardAware; // True if the storage server is aware of the physical shards.
// Histograms
@ -1162,6 +1162,8 @@ public:
Optional<LatencyBandConfig> latencyBandConfig;
Optional<EncryptionAtRestMode> encryptionMode;
struct Counters {
CounterCollection cc;
Counter allQueries, systemKeyQueries, getKeyQueries, getValueQueries, getRangeQueries, getRangeSystemKeyQueries,
@ -1374,12 +1376,10 @@ public:
StorageServer(IKeyValueStore* storage,
Reference<AsyncVar<ServerDBInfo> const> const& db,
StorageServerInterface const& ssi,
Reference<IPageEncryptionKeyProvider> encryptionKeyProvider)
: encryptionKeyProvider(encryptionKeyProvider), shardAware(false),
tlogCursorReadsLatencyHistogram(Histogram::getHistogram(STORAGESERVER_HISTOGRAM_GROUP,
TLOG_CURSOR_READS_LATENCY_HISTOGRAM,
Histogram::Unit::milliseconds)),
StorageServerInterface const& ssi)
: shardAware(false), tlogCursorReadsLatencyHistogram(Histogram::getHistogram(STORAGESERVER_HISTOGRAM_GROUP,
TLOG_CURSOR_READS_LATENCY_HISTOGRAM,
Histogram::Unit::milliseconds)),
ssVersionLockLatencyHistogram(Histogram::getHistogram(STORAGESERVER_HISTOGRAM_GROUP,
SS_VERSION_LOCK_LATENCY_HISTOGRAM,
Histogram::Unit::milliseconds)),
@ -9197,11 +9197,13 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
} else {
MutationRef msg;
cloneReader >> msg;
if (g_network && g_network->isSimulated() &&
isEncryptionOpSupported(EncryptOperationType::TLOG_ENCRYPTION) && !msg.isEncrypted() &&
!(isSingleKeyMutation((MutationRef::Type)msg.type) &&
(backupLogKeys.contains(msg.param1) || (applyLogKeys.contains(msg.param1))))) {
ASSERT(false);
if (g_network && g_network->isSimulated()) {
bool isBackupLogMutation =
isSingleKeyMutation((MutationRef::Type)msg.type) &&
(backupLogKeys.contains(msg.param1) || applyLogKeys.contains(msg.param1));
ASSERT(data->encryptionMode.present());
ASSERT(!data->encryptionMode.get().isEncryptionEnabled() || msg.isEncrypted() ||
isBackupLogMutation);
}
if (msg.isEncrypted()) {
if (!cipherKeys.present()) {
@ -9358,11 +9360,13 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
MutationRef msg;
MutationRefAndCipherKeys encryptedMutation;
rd >> msg;
if (g_network && g_network->isSimulated() &&
isEncryptionOpSupported(EncryptOperationType::TLOG_ENCRYPTION) && !msg.isEncrypted() &&
!(isSingleKeyMutation((MutationRef::Type)msg.type) &&
(backupLogKeys.contains(msg.param1) || (applyLogKeys.contains(msg.param1))))) {
ASSERT(false);
if (g_network && g_network->isSimulated()) {
bool isBackupLogMutation =
isSingleKeyMutation((MutationRef::Type)msg.type) &&
(backupLogKeys.contains(msg.param1) || applyLogKeys.contains(msg.param1));
ASSERT(data->encryptionMode.present());
ASSERT(!data->encryptionMode.get().isEncryptionEnabled() || msg.isEncrypted() ||
isBackupLogMutation);
}
if (msg.isEncrypted()) {
ASSERT(cipherKeys.present());
@ -11402,6 +11406,7 @@ ACTOR Future<Void> initTenantMap(StorageServer* self) {
ACTOR Future<Void> replaceInterface(StorageServer* self, StorageServerInterface ssi) {
ASSERT(!ssi.isTss());
state EncryptionAtRestMode encryptionMode = wait(self->storage.encryptionMode());
state Transaction tr(self->cx);
loop {
@ -11415,6 +11420,12 @@ ACTOR Future<Void> replaceInterface(StorageServer* self, StorageServerInterface
GetStorageServerRejoinInfoRequest(ssi.id(), ssi.locality.dcId()))
: Never())) {
state GetStorageServerRejoinInfoReply rep = _rep;
if (rep.encryptMode != encryptionMode) {
TraceEvent(SevWarnAlways, "SSEncryptModeMismatch", self->thisServerID)
.detail("StorageEncryptionMode", encryptionMode)
.detail("ClusterEncryptionMode", rep.encryptMode);
throw encrypt_mode_mismatch();
}
try {
tr.reset();
@ -11569,9 +11580,8 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
Version tssSeedVersion,
ReplyPromise<InitializeStorageReply> recruitReply,
Reference<AsyncVar<ServerDBInfo> const> db,
std::string folder,
Reference<IPageEncryptionKeyProvider> encryptionKeyProvider) {
state StorageServer self(persistentData, db, ssi, encryptionKeyProvider);
std::string folder) {
state StorageServer self(persistentData, db, ssi);
self.shardAware = SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA && persistentData->shardAware();
state Future<Void> ssCore;
self.initialClusterVersion = startVersion;
@ -11589,6 +11599,9 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
wait(self.storage.commit());
++self.counters.kvCommits;
EncryptionAtRestMode encryptionMode = wait(self.storage.encryptionMode());
self.encryptionMode = encryptionMode;
if (seedTag == invalidTag) {
ssi.startAcceptingRequests();
self.registerInterfaceAcceptingRequests.send(Void());
@ -11661,9 +11674,8 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
Reference<AsyncVar<ServerDBInfo> const> db,
std::string folder,
Promise<Void> recovered,
Reference<IClusterConnectionRecord> connRecord,
Reference<IPageEncryptionKeyProvider> encryptionKeyProvider) {
state StorageServer self(persistentData, db, ssi, encryptionKeyProvider);
Reference<IClusterConnectionRecord> connRecord) {
state StorageServer self(persistentData, db, ssi);
state Future<Void> ssCore;
self.folder = folder;
@ -11684,6 +11696,9 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
}
++self.counters.kvCommits;
EncryptionAtRestMode encryptionMode = wait(self.storage.encryptionMode());
self.encryptionMode = encryptionMode;
bool ok = wait(self.storage.restoreDurableState());
if (!ok) {
if (recovered.canBeSet())

View File

@ -1350,8 +1350,7 @@ ACTOR Future<Void> storageServerRollbackRebooter(std::set<std::pair<UID, KeyValu
int64_t memoryLimit,
IKeyValueStore* store,
bool validateDataFiles,
Promise<Void>* rebootKVStore,
Reference<IPageEncryptionKeyProvider> encryptionKeyProvider) {
Promise<Void>* rebootKVStore) {
state TrackRunningStorage _(id, storeType, runningStorages);
loop {
ErrorOr<Void> e = wait(errorOr(prevStorageServer));
@ -1418,13 +1417,8 @@ ACTOR Future<Void> storageServerRollbackRebooter(std::set<std::pair<UID, KeyValu
DUMPTOKEN(recruited.changeFeedPop);
DUMPTOKEN(recruited.changeFeedVersionUpdate);
prevStorageServer = storageServer(store,
recruited,
db,
folder,
Promise<Void>(),
Reference<IClusterConnectionRecord>(nullptr),
encryptionKeyProvider);
prevStorageServer =
storageServer(store, recruited, db, folder, Promise<Void>(), Reference<IClusterConnectionRecord>(nullptr));
prevStorageServer = handleIOErrors(prevStorageServer, store, id, store->onClosed());
}
}
@ -1888,14 +1882,6 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
LocalLineage _;
getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::Storage;
Reference<IPageEncryptionKeyProvider> encryptionKeyProvider;
if (FLOW_KNOBS->ENCRYPT_HEADER_AUTH_TOKEN_ENABLED) {
encryptionKeyProvider =
makeReference<TenantAwareEncryptionKeyProvider<EncodingType::AESEncryptionWithAuth>>(dbInfo);
} else {
encryptionKeyProvider =
makeReference<TenantAwareEncryptionKeyProvider<EncodingType::AESEncryption>>(dbInfo);
}
IKeyValueStore* kv = openKVStore(
s.storeType,
s.filename,
@ -1909,7 +1895,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
s.storeType != KeyValueStoreType::SSD_SHARDED_ROCKSDB &&
deterministicRandom()->coinflip())
: true),
encryptionKeyProvider);
dbInfo);
Future<Void> kvClosed =
kv->onClosed() ||
rebootKVSPromise.getFuture() /* clear the onClosed() Future in actorCollection when rebooting */;
@ -1957,8 +1943,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
DUMPTOKEN(recruited.changeFeedVersionUpdate);
Promise<Void> recovery;
Future<Void> f =
storageServer(kv, recruited, dbInfo, folder, recovery, connRecord, encryptionKeyProvider);
Future<Void> f = storageServer(kv, recruited, dbInfo, folder, recovery, connRecord);
recoveries.push_back(recovery.getFuture());
f = handleIOErrors(f, kv, s.storeID, kvClosed);
f = storageServerRollbackRebooter(&runningStorages,
@ -1974,8 +1959,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
memoryLimit,
kv,
validateDataFiles,
&rebootKVSPromise,
encryptionKeyProvider);
&rebootKVSPromise);
errorForwarders.add(forwardError(errors, ssRole, recruited.id(), f));
} else if (s.storedComponent == DiskStore::TLogData) {
LocalLineage _;
@ -2581,15 +2565,6 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
folder,
isTss ? testingStoragePrefix.toString() : fileStoragePrefix.toString(),
recruited.id());
Reference<IPageEncryptionKeyProvider> encryptionKeyProvider;
if (FLOW_KNOBS->ENCRYPT_HEADER_AUTH_TOKEN_ENABLED) {
encryptionKeyProvider =
makeReference<TenantAwareEncryptionKeyProvider<EncodingType::AESEncryptionWithAuth>>(
dbInfo);
} else {
encryptionKeyProvider =
makeReference<TenantAwareEncryptionKeyProvider<EncodingType::AESEncryption>>(dbInfo);
}
IKeyValueStore* data = openKVStore(
req.storeType,
filename,
@ -2603,7 +2578,8 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
req.storeType != KeyValueStoreType::SSD_SHARDED_ROCKSDB &&
deterministicRandom()->coinflip())
: true),
encryptionKeyProvider);
dbInfo,
req.encryptMode);
Future<Void> kvClosed =
data->onClosed() ||
@ -2619,8 +2595,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
isTss ? req.tssPairIDAndVersion.get().second : 0,
storageReady,
dbInfo,
folder,
encryptionKeyProvider);
folder);
s = handleIOErrors(s, data, recruited.id(), kvClosed);
s = storageCache.removeOnReady(req.reqId, s);
s = storageServerRollbackRebooter(&runningStorages,
@ -2636,8 +2611,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
memoryLimit,
data,
false,
&rebootKVSPromise2,
encryptionKeyProvider);
&rebootKVSPromise2);
errorForwarders.add(forwardError(errors, ssRole, recruited.id(), s));
} else if (storageCache.exists(req.reqId)) {
forwardPromise(req.reply, storageCache.get(req.reqId));

View File

@ -111,7 +111,7 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
sharedRandomNumber /= 3;
// randomly some tests write data first and then turn on blob granules later, to test conversion of existing DB
initAtEnd = !enablePurging && sharedRandomNumber % 10 == 0;
initAtEnd = getOption(options, "initAtEnd"_sr, sharedRandomNumber % 10 == 0);
sharedRandomNumber /= 10;
// FIXME: enable and fix bugs!
// granuleSizeCheck = initAtEnd;

View File

@ -273,18 +273,17 @@ struct ConfigureDatabaseWorkload : TestWorkload {
ACTOR Future<Void> _setup(Database cx, ConfigureDatabaseWorkload* self) {
wait(success(ManagementAPI::changeConfig(cx.getReference(), "single storage_migration_type=aggressive", true)));
// Redwood is the only storage engine type supporting encryption.
DatabaseConfiguration config = wait(getDatabaseConfiguration(cx));
if (config.encryptionAtRestMode.isEncryptionEnabled()) {
self->storageEngineExcludeTypes = { 0, 1, 2, 4, 5 };
wait(success(ManagementAPI::changeConfig(cx.getReference(), "ssd-redwood-1-experimental", true)));
}
return Void();
}
ACTOR Future<Void> _start(ConfigureDatabaseWorkload* self, Database cx) {
// Redwood is the only storage engine type supporting encryption.
DatabaseConfiguration config = wait(getDatabaseConfiguration(cx));
TraceEvent("ConfigureDatabase_Config").detail("Config", config.toString());
if (config.encryptionAtRestMode.isEncryptionEnabled()) {
TraceEvent("ConfigureDatabase_EncryptionEnabled");
self->storageEngineExcludeTypes = { 0, 1, 2, 4, 5 };
}
if (self->clientId == 0) {
self->clients.push_back(timeout(self->singleDB(self, cx), self->testDuration, Void()));
wait(waitForAll(self->clients));

View File

@ -25,10 +25,11 @@
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbrpc/simulator.h"
#include "flow/Knobs.h"
#include "boost/algorithm/string/predicate.hpp"
#include "flow/IConnection.h"
#include "fdbrpc/SimulatorProcessInfo.h"
#include "flow/Knobs.h"
#undef state
#include "fdbclient/SimpleIni.h"

View File

@ -21,7 +21,9 @@
#include "flow/IThreadPool.h"
#include <algorithm>
#ifndef BOOST_SYSTEM_NO_LIB
#define BOOST_SYSTEM_NO_LIB
#endif
#define BOOST_DATE_TIME_NO_LIB
#define BOOST_REGEX_NO_LIB
#include "boost/asio.hpp"

View File

@ -26,7 +26,9 @@
#include "flow/Trace.h"
#include <algorithm>
#include <memory>
#ifndef BOOST_SYSTEM_NO_LIB
#define BOOST_SYSTEM_NO_LIB
#endif
#define BOOST_DATE_TIME_NO_LIB
#define BOOST_REGEX_NO_LIB
#include <boost/asio.hpp>

View File

@ -743,7 +743,26 @@ Future<T> safeThreadFutureToFutureImpl(ThreadFuture<T> threadFuture) {
return threadFuture.get();
}
// The allow anonymous_future type is used to prevent misuse of ThreadFutures.
// The removeArenaFromStandalone() actors simulate the behavior of DLApi. In this case,
// the memory is not owned by the Standalone. If the `future` goes out of scope, subsequent
// access to the memory via the returned standalone will be invalid.
ACTOR template <typename T>
Future<Standalone<T>> removeArenaFromStandalone(Future<Standalone<T>> future) {
Standalone<T> _ = wait(future);
return Standalone<T>(future.get(), Arena());
}
ACTOR template <typename T>
Future<Optional<Standalone<T>>> removeArenaFromStandalone(Future<Optional<Standalone<T>>> future) {
Optional<Standalone<T>> val = wait(future);
if (val.present()) {
return Standalone<T>(future.get().get(), Arena());
} else {
return Optional<Standalone<T>>();
}
}
// The allow_anonymous_future type is used to prevent misuse of ThreadFutures.
// For Standalone types, the memory in some cases is actually stored in the ThreadFuture object,
// in which case we expect the caller to keep that ThreadFuture around until the result is no
// longer needed.
@ -768,7 +787,11 @@ typename std::enable_if<allow_anonymous_future<T>::value, Future<T>>::type safeT
template <class T>
typename std::enable_if<!allow_anonymous_future<T>::value, Future<T>>::type safeThreadFutureToFuture(
ThreadFuture<T>& threadFuture) {
return safeThreadFutureToFutureImpl(threadFuture);
Future<T> f = safeThreadFutureToFutureImpl(threadFuture);
if (BUGGIFY) {
return removeArenaFromStandalone(f);
}
return f;
}
template <class T>
@ -781,7 +804,9 @@ typename std::enable_if<allow_anonymous_future<T>::value, Future<T>>::type safeT
template <class T>
typename std::enable_if<!allow_anonymous_future<T>::value, Future<T>>::type safeThreadFutureToFuture(
Future<T>& future) {
// Do nothing
if (BUGGIFY) {
return removeArenaFromStandalone(future);
}
return future;
}

View File

@ -356,6 +356,7 @@ ERROR( encrypt_invalid_id, 2706, "Invalid encryption cipher details" )
ERROR( encrypt_keys_fetch_failed, 2707, "Encryption keys fetch from external KMS failed" )
ERROR( encrypt_invalid_kms_config, 2708, "Invalid encryption/kms configuration: discovery-url, validation-token, endpoint etc." )
ERROR( encrypt_unsupported, 2709, "Encryption not supported" )
ERROR( encrypt_mode_mismatch, 2710, "Encryption mode mismatch with configuration")
// 4xxx Internal errors (those that should be generated only by bugs) are decimal 4xxx
ERROR( unknown_error, 4000, "An unknown error occurred" ) // C++ exception not of type Error

View File

@ -286,8 +286,8 @@ if(WITH_PYTHON)
TEST_FILES restarting/from_6.3.13_until_7.2.0/DrUpgradeRestart-1.txt
restarting/from_6.3.13_until_7.2.0/DrUpgradeRestart-2.txt)
add_fdb_test(
TEST_FILES restarting/from_7.0.0_until_7.2.0/UpgradeAndBackupRestore-1.toml
restarting/from_7.0.0_until_7.2.0/UpgradeAndBackupRestore-2.toml)
TEST_FILES restarting/from_7.0.0_until_7.1.0/UpgradeAndBackupRestore-1.toml
restarting/from_7.0.0_until_7.1.0/UpgradeAndBackupRestore-2.toml)
add_fdb_test(
TEST_FILES restarting/from_7.1.0_until_7.2.0/ConfigureTestRestart-1.toml
restarting/from_7.1.0_until_7.2.0/ConfigureTestRestart-2.toml)
@ -309,6 +309,9 @@ if(WITH_PYTHON)
add_fdb_test(
TEST_FILES restarting/from_7.1.0_until_7.2.0/ConfigureStorageMigrationTestRestart-1.toml
restarting/from_7.1.0_until_7.2.0/ConfigureStorageMigrationTestRestart-2.toml)
add_fdb_test(
TEST_FILES restarting/from_7.1.0_until_7.2.0/UpgradeAndBackupRestore-1.toml
restarting/from_7.1.0_until_7.2.0/UpgradeAndBackupRestore-2.toml)
add_fdb_test(
TEST_FILES restarting/from_7.1.0_until_7.2.0/VersionVectorDisableRestart-1.toml
restarting/from_7.1.0_until_7.2.0/VersionVectorDisableRestart-2.toml)
@ -331,20 +334,26 @@ if(WITH_PYTHON)
TEST_FILES restarting/from_7.2.0/DrUpgradeRestart-1.toml
restarting/from_7.2.0/DrUpgradeRestart-2.toml)
add_fdb_test(
TEST_FILES restarting/from_7.2.4/UpgradeAndBackupRestore-1.toml
restarting/from_7.2.4/UpgradeAndBackupRestore-2.toml)
TEST_FILES restarting/from_7.2.4_until_7.3.0/UpgradeAndBackupRestore-1.toml
restarting/from_7.2.4_until_7.3.0/UpgradeAndBackupRestore-2.toml)
add_fdb_test(
TEST_FILES restarting/from_7.3.0/ConfigureTestRestart-1.toml
restarting/from_7.3.0/ConfigureTestRestart-2.toml)
add_fdb_test(
TEST_FILES restarting/from_7.3.0/ConfigureStorageMigrationTestRestart-1.toml
restarting/from_7.3.0/ConfigureStorageMigrationTestRestart-2.toml)
add_fdb_test(
TEST_FILES restarting/from_7.3.0/UpgradeAndBackupRestore-1.toml
restarting/from_7.3.0/UpgradeAndBackupRestore-2.toml)
add_fdb_test(
TEST_FILES restarting/from_7.3.0/VersionVectorDisableRestart-1.toml
restarting/from_7.3.0/VersionVectorDisableRestart-2.toml)
add_fdb_test(
TEST_FILES restarting/from_7.3.0/VersionVectorEnableRestart-1.toml
restarting/from_7.3.0/VersionVectorEnableRestart-2.toml)
add_fdb_test(
TEST_FILES restarting/from_7.3.0/BlobGranuleRestartCycle-1.toml
restarting/from_7.3.0/BlobGranuleRestartCycle-2.toml)
add_fdb_test(
TEST_FILES restarting/to_7.1.0_until_7.2.0/ConfigureStorageMigrationTestRestart-1.toml
restarting/to_7.1.0_until_7.2.0/ConfigureStorageMigrationTestRestart-2.toml)

View File

@ -0,0 +1,57 @@
storageEngineExcludeTypes=3
[[test]]
testTitle = 'SubmitBackup'
simBackupAgents= 'BackupToFile'
clearAfterTest = false
runConsistencyCheck=false
[[test.workload]]
testName = 'SubmitBackup'
delayFor = 0
stopWhenDone = false
[[test]]
testTitle = 'FirstCycleTest'
clearAfterTest=false
runConsistencyCheck = false
[[test.workload]]
testName = 'Cycle'
nodeCount = 30000
transactionsPerSecond = 2500.0
testDuration = 30.0
expectedRate = 0
keyPrefix = 'BeforeRestart'
[[test.workload]]
testName = 'RandomClogging'
testDuration = 90.0
[[test.workload]]
testName = 'Rollback'
meanDelay = 90.0
testDuration = 90.0
[[test.workload]]
testName = 'Attrition'
machinesToKill = 10
machinesToLeave = 3
reboot = true
testDuration = 90.0
[[test.workload]]
testName='Attrition'
machinesToKill = 10
machinesToLeave = 3
reboot = true
testDuration = 90.0
[[test]]
testTitle = 'SaveDatabase'
clearAfterTest = false
[[test.workload]]
testName = 'SaveAndKill'
restartInfoLocation = 'simfdb/restartInfo.ini'
testDuration=30.0

View File

@ -0,0 +1,61 @@
[[test]]
testTitle = 'SecondCycleTest'
simBackupAgents = 'BackupToFile'
clearAfterTest=false
runConsistencyCheck=false
[[test.workload]]
testName = 'Cycle'
nodeCount = 30000
transactionsPerSecond = 2500.0
testDuration = 30.0
expectedRate = 0
keyPrefix = 'AfterRestart'
[[test.workload]]
testName = 'RandomClogging'
testDuration = 90.0
[[test.workload]]
testName = 'Rollback'
meanDelay = 90.0
testDuration = 90.0
[[test.workload]]
testName = 'Attrition'
machinesToKill = 10
machinesToLeave = 3
reboot = true
testDuration = 90.0
[[test.workload]]
testName = 'Attrition'
machinesToKill = 10
machinesToLeave = 3
reboot = true
testDuration = 90.0
[[test]]
testTitle= 'RestoreBackup'
simBackupAgents = 'BackupToFile'
clearAfterTest=false
[[test.workload]]
testName = 'RestoreBackup'
tag = 'default'
[[test]]
testTitle = 'CheckCycles'
checkOnly=true
[[test.workload]]
testName = 'Cycle'
nodeCount=30000
keyPrefix = 'AfterRestart'
expectedRate=0
[[test.workload]]
testName = 'Cycle'
nodeCount = 30000
keyPrefix= 'BeforeRestart'
expectedRate = 0

View File

@ -0,0 +1,68 @@
[configuration]
storageEngineExcludeTypes = [3]
disableEncryption = true
[[test]]
testTitle = 'SubmitBackup'
simBackupAgents= 'BackupToFile'
clearAfterTest = false
runConsistencyCheck=false
disabledFailureInjectionWorkloads = 'Attrition'
[[test.workload]]
testName = 'SubmitBackup'
delayFor = 0
stopWhenDone = false
[[test.workload]]
testName = 'Attrition'
machinesToKill = 10
machinesToLeave = 3
reboot = true
testDuration = 30.0
[[test]]
testTitle = 'FirstCycleTest'
clearAfterTest=false
runConsistencyCheck = false
disabledFailureInjectionWorkloads = 'Attrition'
[[test.workload]]
testName = 'Cycle'
nodeCount = 30000
transactionsPerSecond = 2500.0
testDuration = 30.0
expectedRate = 0
keyPrefix = 'BeforeRestart'
[[test.workload]]
testName = 'RandomClogging'
testDuration = 90.0
[[test.workload]]
testName = 'Rollback'
meanDelay = 90.0
testDuration = 90.0
[[test.workload]]
testName = 'Attrition'
machinesToKill = 10
machinesToLeave = 3
reboot = true
testDuration = 90.0
[[test.workload]]
testName='Attrition'
machinesToKill = 10
machinesToLeave = 3
reboot = true
testDuration = 90.0
[[test]]
testTitle = 'SaveDatabase'
clearAfterTest = false
[[test.workload]]
testName = 'SaveAndKill'
restartInfoLocation = 'simfdb/restartInfo.ini'
testDuration=30.0

View File

@ -0,0 +1,33 @@
# Blob Granules are only upgrade-able as of snowflake/release-71.2.3 and release
[configuration]
testClass = "BlobGranuleRestart"
blobGranulesEnabled = true
allowDefaultTenant = false
injectTargetedSSRestart = true
injectSSDelay = true
# FIXME: re-enable rocks at some point
storageEngineExcludeTypes = [4, 5]
[[test]]
testTitle = 'BlobGranuleRestartCycle'
clearAfterTest=false
[[test.workload]]
testName = 'Cycle'
transactionsPerSecond = 250.0
testDuration = 30.0
expectedRate = 0
[[test.workload]]
testName = 'BlobGranuleVerifier'
testDuration = 30.0
# don't delete state after test
clearAndMergeCheck = false
doForcePurge = false
initAtEnd = false
[[test.workload]]
testName='SaveAndKill'
restartInfoLocation='simfdb/restartInfo.ini'
testDuration=90.0

View File

@ -0,0 +1,27 @@
# Blob Granules are only upgrade-able as of snowflake/release-71.2.3 and release-7.2
[configuration]
testClass = "BlobGranuleRestart"
blobGranulesEnabled = true
allowDefaultTenant = false
injectTargetedSSRestart = true
injectSSDelay = true
# FIXME: re-enable rocks at some point
storageEngineExcludeTypes = [4, 5]
[[test]]
testTitle = 'BlobGranuleRestartCycle'
clearAfterTest=false
runSetup=false
[[test.workload]]
testName = 'Cycle'
transactionsPerSecond = 250.0
testDuration = 30.0
expectedRate = 0
[[test.workload]]
testName = 'BlobGranuleVerifier'
testDuration = 30.0
# cycle does its own workload checking, don't want clear racing with its checking
clearAndMergeCheck = false

View File

@ -0,0 +1,61 @@
[[test]]
testTitle = 'SecondCycleTest'
simBackupAgents = 'BackupToFile'
clearAfterTest=false
runConsistencyCheck=false
[[test.workload]]
testName = 'Cycle'
nodeCount = 30000
transactionsPerSecond = 2500.0
testDuration = 30.0
expectedRate = 0
keyPrefix = 'AfterRestart'
[[test.workload]]
testName = 'RandomClogging'
testDuration = 90.0
[[test.workload]]
testName = 'Rollback'
meanDelay = 90.0
testDuration = 90.0
[[test.workload]]
testName = 'Attrition'
machinesToKill = 10
machinesToLeave = 3
reboot = true
testDuration = 90.0
[[test.workload]]
testName = 'Attrition'
machinesToKill = 10
machinesToLeave = 3
reboot = true
testDuration = 90.0
[[test]]
testTitle= 'RestoreBackup'
simBackupAgents = 'BackupToFile'
clearAfterTest=false
[[test.workload]]
testName = 'RestoreBackup'
tag = 'default'
[[test]]
testTitle = 'CheckCycles'
checkOnly=true
[[test.workload]]
testName = 'Cycle'
nodeCount=30000
keyPrefix = 'AfterRestart'
expectedRate=0
[[test.workload]]
testName = 'Cycle'
nodeCount = 30000
keyPrefix= 'BeforeRestart'
expectedRate = 0