Merge branch 'master' of github.com:apple/foundationdb into add-fdbcli-tests

This commit is contained in:
Chaoguang Lin 2021-06-14 17:35:33 +00:00
commit 82c63c154d
44 changed files with 3833 additions and 1714 deletions

View File

@ -151,18 +151,46 @@ void* fdb_network_thread(void* args) {
return 0;
}
int genprefix(char* str, char* prefix, int prefixlen, int prefixpadding, int rows, int len) {
const int rowdigit = digits(rows);
const int paddinglen = len - (prefixlen + rowdigit) - 1;
int offset = 0;
if (prefixpadding) {
memset(str, 'x', paddinglen);
offset += paddinglen;
}
memcpy(str + offset, prefix, prefixlen);
str[len - 1] = '\0';
return offset + prefixlen;
}
/* cleanup database */
int cleanup(FDBTransaction* transaction, mako_args_t* args) {
struct timespec timer_start, timer_end;
char beginstr[7];
char endstr[7];
char* prefixstr = (char*)malloc(sizeof(char) * args->key_length + 1);
if (!prefixstr)
return -1;
char* beginstr = (char*)malloc(sizeof(char) * args->key_length + 1);
if (!beginstr) {
free(prefixstr);
return -1;
}
char* endstr = (char*)malloc(sizeof(char) * args->key_length + 1);
if (!endstr) {
free(prefixstr);
free(beginstr);
return -1;
}
int len = genprefix(prefixstr, KEYPREFIX, KEYPREFIXLEN, args->prefixpadding, args->rows, args->key_length + 1);
snprintf(beginstr, len + 2, "%s%c", prefixstr, 0x00);
snprintf(endstr, len + 2, "%s%c", prefixstr, 0xff);
free(prefixstr);
len += 1;
strncpy(beginstr, "mako", 4);
beginstr[4] = 0x00;
strncpy(endstr, "mako", 4);
endstr[4] = 0xff;
clock_gettime(CLOCK_MONOTONIC_COARSE, &timer_start);
fdb_transaction_clear_range(transaction, (uint8_t*)beginstr, 5, (uint8_t*)endstr, 5);
fdb_transaction_clear_range(transaction, (uint8_t*)beginstr, len + 1, (uint8_t*)endstr, len + 1);
if (commit_transaction(transaction) != FDB_SUCCESS)
goto failExit;
@ -172,9 +200,16 @@ int cleanup(FDBTransaction* transaction, mako_args_t* args) {
"INFO: Clear range: %6.3f sec\n",
((timer_end.tv_sec - timer_start.tv_sec) * 1000000000.0 + timer_end.tv_nsec - timer_start.tv_nsec) /
1000000000);
free(beginstr);
free(endstr);
return 0;
failExit:
free(beginstr);
free(endstr);
fprintf(stderr, "ERROR: FDB failure in cleanup()\n");
return -1;
}
@ -220,7 +255,7 @@ int populate(FDBTransaction* transaction,
for (i = begin; i <= end; i++) {
/* sequential keys */
genkey(keystr, i, args->rows, args->key_length + 1);
genkey(keystr, KEYPREFIX, KEYPREFIXLEN, args->prefixpadding, i, args->rows, args->key_length + 1);
/* random values */
randstr(valstr, args->value_length + 1);
@ -512,7 +547,7 @@ retryTxn:
} else {
keynum = urand(0, args->rows - 1);
}
genkey(keystr, keynum, args->rows, args->key_length + 1);
genkey(keystr, KEYPREFIX, KEYPREFIXLEN, args->prefixpadding, keynum, args->rows, args->key_length + 1);
/* range */
if (args->txnspec.ops[i][OP_RANGE] > 0) {
@ -520,7 +555,7 @@ retryTxn:
if (keyend > args->rows - 1) {
keyend = args->rows - 1;
}
genkey(keystr2, keyend, args->rows, args->key_length + 1);
genkey(keystr2, KEYPREFIX, KEYPREFIXLEN, args->prefixpadding, keyend, args->rows, args->key_length + 1);
}
if (stats->xacts % args->sampling == 0) {
@ -1354,6 +1389,7 @@ int init_args(mako_args_t* args) {
args->flatbuffers = 0; /* internal */
args->knobs[0] = '\0';
args->log_group[0] = '\0';
args->prefixpadding = 0;
args->trace = 0;
args->tracepath[0] = '\0';
args->traceformat = 0; /* default to client's default (XML) */
@ -1515,6 +1551,7 @@ void usage() {
printf("%-24s %s\n", "-z, --zipf", "Use zipfian distribution instead of uniform distribution");
printf("%-24s %s\n", " --commitget", "Commit GETs");
printf("%-24s %s\n", " --loggroup=LOGGROUP", "Set client log group");
printf("%-24s %s\n", " --prefix_padding", "Pad key by prefixing data (Default: postfix padding)");
printf("%-24s %s\n", " --trace", "Enable tracing");
printf("%-24s %s\n", " --tracepath=PATH", "Set trace file path");
printf("%-24s %s\n", " --trace_format <xml|json>", "Set trace format (Default: json)");
@ -1567,6 +1604,7 @@ int parse_args(int argc, char* argv[], mako_args_t* args) {
{ "zipf", no_argument, NULL, 'z' },
{ "commitget", no_argument, NULL, ARG_COMMITGET },
{ "flatbuffers", no_argument, NULL, ARG_FLATBUFFERS },
{ "prefix_padding", no_argument, NULL, ARG_PREFIXPADDING },
{ "trace", no_argument, NULL, ARG_TRACE },
{ "txntagging", required_argument, NULL, ARG_TXNTAGGING },
{ "txntagging_prefix", required_argument, NULL, ARG_TXNTAGGINGPREFIX },
@ -1670,6 +1708,9 @@ int parse_args(int argc, char* argv[], mako_args_t* args) {
case ARG_LOGGROUP:
memcpy(args->log_group, optarg, strlen(optarg) + 1);
break;
case ARG_PREFIXPADDING:
args->prefixpadding = 1;
break;
case ARG_TRACE:
args->trace = 1;
break;

View File

@ -69,6 +69,7 @@ enum Arguments {
ARG_KNOBS,
ARG_FLATBUFFERS,
ARG_LOGGROUP,
ARG_PREFIXPADDING,
ARG_TRACE,
ARG_TRACEPATH,
ARG_TRACEFORMAT,
@ -125,6 +126,7 @@ typedef struct {
mako_txnspec_t txnspec;
char cluster_file[PATH_MAX];
char log_group[LOGGROUP_MAX];
int prefixpadding;
int trace;
char tracepath[PATH_MAX];
int traceformat; /* 0 - XML, 1 - JSON */

View File

@ -3,6 +3,7 @@
#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
/* uniform-distribution random */
int urand(int low, int high) {
@ -67,15 +68,16 @@ int digits(int num) {
}
/* generate a key for a given key number */
/* prefix is "mako" by default, prefixpadding = 1 means 'x' will be in front rather than trailing the keyname */
/* len is the buffer size, key length + null */
void genkey(char* str, int num, int rows, int len) {
int i;
int rowdigit = digits(rows);
sprintf(str, KEYPREFIX "%0.*d", rowdigit, num);
for (i = (KEYPREFIXLEN + rowdigit); i < len - 1; i++) {
str[i] = 'x';
}
str[len - 1] = '\0';
void genkey(char* str, char* prefix, int prefixlen, int prefixpadding, int num, int rows, int len) {
const int rowdigit = digits(rows);
const int prefixoffset = prefixpadding ? len - (prefixlen + rowdigit) - 1 : 0;
char* prefixstr = (char*)alloca(sizeof(char) * (prefixlen + rowdigit + 1));
snprintf(prefixstr, prefixlen + rowdigit + 1, "%s%0.*d", prefix, rowdigit, num);
memset(str, 'x', len);
memcpy(str + prefixoffset, prefixstr, prefixlen + rowdigit);
str[len - 1] = '\0';
}
/* This is another sorting algorithm used to calculate latency parameters */

View File

@ -47,8 +47,9 @@ int compute_thread_portion(int val, int p_idx, int t_idx, int total_p, int total
int digits(int num);
/* generate a key for a given key number */
/* prefix is "mako" by default, prefixpadding = 1 means 'x' will be in front rather than trailing the keyname */
/* len is the buffer size, key length + null */
void genkey(char* str, int num, int rows, int len);
void genkey(char* str, char* prefix, int prefixlen, int prefixpadding, int num, int rows, int len);
#if 0
// The main function is to sort arr[] of size n using Radix Sort

View File

@ -59,7 +59,7 @@ It can be stopped and prevented from starting at boot as follows::
Start, stop and restart behavior
=================================
These commands above start and stop the master ``fdbmonitor`` process, which in turn starts ``fdbserver`` and ``backup-agent`` processes. See :ref:`administration_fdbmonitor` for details.
These commands above start and stop the ``fdbmonitor`` process, which in turn starts ``fdbserver`` and ``backup-agent`` processes. See :ref:`administration_fdbmonitor` for details.
After any child process has terminated by any reason, ``fdbmonitor`` tries to restart it. See :ref:`restarting parameters <configuration-restarting>`.

View File

@ -64,10 +64,12 @@ The ``commit`` command commits the current transaction. Any sets or clears execu
configure
---------
The ``configure`` command changes the database configuration. Its syntax is ``configure [new] [single|double|triple|three_data_hall|three_datacenter] [ssd|memory] [grv_proxies=<N>] [commit_proxies=<N>] [resolvers=<N>] [logs=<N>]``.
The ``configure`` command changes the database configuration. Its syntax is ``configure [new|tss] [single|double|triple|three_data_hall|three_datacenter] [ssd|memory] [grv_proxies=<N>] [commit_proxies=<N>] [resolvers=<N>] [logs=<N>] [count=<TSS_COUNT>] [perpetual_storage_wiggle=<WIGGLE_SPEED>]``.
The ``new`` option, if present, initializes a new database with the given configuration rather than changing the configuration of an existing one. When ``new`` is used, both a redundancy mode and a storage engine must be specified.
The ``tss`` option, if present, changes the Testing Storage Server (TSS) configuration for a cluster. When used for the first time, both a count and a storage engine must be specified. For more details, see :ref:`testing-storage-server`.
redundancy mode
^^^^^^^^^^^^^^^
@ -107,6 +109,11 @@ Set the process using ``configure [grv_proxies|commit_proxies|resolvers|logs]=<N
For recommendations on appropriate values for process types in large clusters, see :ref:`guidelines-process-class-config`.
perpetual storage wiggle
^^^^^^^^^^^^^^^^^^^^^^^^
Set the value speed (a.k.a., the number of processes that the Data Distributor should wiggle at a time). Currently, only 0 and 1 are supported. The value 0 means to disable the perpetual storage wiggle.
consistencycheck
----------------
@ -496,3 +503,21 @@ Disables writing from ``fdbcli`` (the default). In this mode, attempting to set
``writemode on``
Enables writing from ``fdbcli``.
tssq
----
Utility commands for handling quarantining Testing Storage Servers. For more information on this, see :ref:`testing-storage-server`.
``tssq start <StorageUID>``
Manually quarantines a TSS process, if it is not already quarantined.
``tssq stop <StorageUID>``
Removes a TSS process from quarantine, disposing of the TSS and allowing Data Distribution to recruit a new storage process on the worker.
``tssq list``:
Lists the storage UIDs of all TSS processes currently in quarantine.

View File

@ -530,7 +530,7 @@
"hz":0.0,
"counter":0,
"roughness":0.0
},
},
"low_priority_reads":{ // measures number of incoming low priority read requests
"hz":0.0,
"counter":0,
@ -702,7 +702,8 @@
"auto_resolvers":1,
"auto_logs":3,
"backup_worker_enabled":1,
"commit_proxies":5 // this field will be absent if a value has not been explicitly set
"commit_proxies":5, // this field will be absent if a value has not been explicitly set
"proxies":6 // this field will be absent if a value has not been explicitly set
},
"data":{
"least_operating_space_bytes_log_server":0,

View File

@ -26,6 +26,8 @@ Ready to operate an externally accessible FoundationDB cluster? You'll find what
* :doc:`transaction-tagging` gives an overview of transaction tagging, including details about throttling particular tags.
* :doc:`tss` gives an overview of the Testing Storage Server feature of FoundationDB, which allows you to safely run an untrusted storage engine in a production cluster.
.. toctree::
:maxdepth: 2
:titlesonly:
@ -41,3 +43,4 @@ Ready to operate an externally accessible FoundationDB cluster? You'll find what
disk-snapshot-backup
platforms
transaction-tagging
tss

View File

@ -10,12 +10,14 @@ Release Notes
Features
--------
* Added a new API in all bindings that can be used to get a list of split points that will split the given range into (roughly) equally sized chunks. `(PR #3394) <https://github.com/apple/foundationdb/pull/3394>`_
* Added support for writing backup files directly to Azure blob storage. This is not yet performance tested on large-scale clusters. `(PR #3961) <https://github.com/apple/foundationdb/pull/3961>`_
Performance
-----------
* Improved Deque copy performance. `(PR #3197) <https://github.com/apple/foundationdb/pull/3197>`_
* Increased performance of dr_agent when copying the mutation log. The ``COPY_LOG_BLOCK_SIZE``, ``COPY_LOG_BLOCKS_PER_TASK``, ``COPY_LOG_PREFETCH_BLOCKS``, ``COPY_LOG_READ_AHEAD_BYTES`` and ``COPY_LOG_TASK_DURATION_NANOS`` knobs can be set. `(PR #3436) <https://github.com/apple/foundationdb/pull/3436>`_
* Added multiple new microbenchmarks for PromiseStream, Reference, IRandom, and timer, as well as support for benchmarking actors. `(PR #3590) <https://github.com/apple/foundationdb/pull/3590>`_
* Use xxhash3 for SQLite page checksums. `(PR #4075) <https://github.com/apple/foundationdb/pull/4075>`_
* Reduced the number of connections required by the multi-version client when loading external clients. When connecting to 7.0 clusters, only one connection with version 6.2 or larger will be used. With older clusters, at most two connections with version 6.2 or larger will be used. Clients older than version 6.2 will continue to create an additional connection each. `(PR #4667) <https://github.com/apple/foundationdb/pull/4667>`_
* Reduce CPU overhead of load balancing on client processes. `(PR #4561) <https://github.com/apple/foundationdb/pull/4561>`_
@ -31,6 +33,7 @@ Fixes
Status
------
* Added limiting metrics (limiting_storage_durability_lag and limiting_storage_queue) to health metrics. `(PR #4067) <https://github.com/apple/foundationdb/pull/4067>`_
* Added ``commit_batching_window_size`` to the proxy roles section of status to record statistics about commit batching window size on each proxy. `(PR #4735) <https://github.com/apple/foundationdb/pull/4735>`_
* Added ``cluster.bounce_impact`` section to status to report if there will be any extra effects when bouncing the cluster, and if so, the reason for those effects. `(PR #4770) <https://github.com/apple/foundationdb/pull/4770>`_
* Added ``fetched_versions`` to the storage metrics section of status to report how fast a storage server is catching up in versions. `(PR #4770) <https://github.com/apple/foundationdb/pull/4770>`_
@ -41,15 +44,17 @@ Bindings
--------
* Python: The function ``get_estimated_range_size_bytes`` will now throw an error if the ``begin_key`` or ``end_key`` is ``None``. `(PR #3394) <https://github.com/apple/foundationdb/pull/3394>`_
* C: Added a function, ``fdb_database_reboot_worker``, to reboot or suspend the specified process. `(PR #4094) <https://github.com/apple/foundationdb/pull/4094>`_
* C: Added a function, ``fdb_database_force_recovery_with_data_loss``, to force the database to recover into the given datacenter. `(PR #4420) <https://github.com/apple/foundationdb/pull/4220>`_
* C: Added a function, ``fdb_database_create_snapshot``, to create a snapshot of the database. `(PR #4241) <https://github.com/apple/foundationdb/pull/4241>`_
* C: Added a function, ``fdb_database_force_recovery_with_data_loss``, to force the database to recover into the given datacenter. `(PR #4220) <https://github.com/apple/foundationdb/pull/4220>`_
* C: Added a function, ``fdb_database_create_snapshot``, to create a snapshot of the database. `(PR #4241) <https://github.com/apple/foundationdb/pull/4241/files>`_
* C: Added ``fdb_database_get_main_thread_busyness`` function to report how busy a client's main thread is. `(PR #4504) <https://github.com/apple/foundationdb/pull/4504>`_
* Java: Added ``Database.getMainThreadBusyness`` function to report how busy a client's main thread is. `(PR #4564) <https://github.com/apple/foundationdb/pull/4564>`_
* Java: Added ``Database.getMainThreadBusyness`` function to report how busy a client's main thread is. `(PR #4564) <https://github.com/apple/foundationdb/pull/4564>`_
Other Changes
-------------
* When ``fdbmonitor`` dies, all of its child processes are now killed. `(PR #3841) <https://github.com/apple/foundationdb/pull/3841>`_
* The ``foundationdb`` service installed by the RPM packages will now automatically restart ``fdbmonitor`` after 60 seconds when it fails. `(PR #3841) <https://github.com/apple/foundationdb/pull/3841>`_
* Capture output of forked snapshot processes in trace events. `(PR #4254) <https://github.com/apple/foundationdb/pull/4254/files>`_
* Add ErrorKind field to Severity 40 trace events. `(PR #4741) <https://github.com/apple/foundationdb/pull/4741/files>`_
Earlier release notes
---------------------

View File

@ -0,0 +1,124 @@
.. _testing-storage-server:
############################
Testing Storage Server (TSS)
############################
.. include:: guide-common.rst.inc
This document covers the operation and architecture of the Testing Storage Server feature, or TSS for short.
.. _tss-introduction:
Summary
============
The TSS feature allows FoundationDB to run an "untrusted" storage engine (the *testing storage engine*) directly in a QA or production envronment with identical workload to the current storage engine, with zero impact on durability or correctness, and minimal impact on performance.
This allows a FoundationDB cluster operator to validate the correctness and performance of a different storage engine on the exact cluster workload before migrating data to the different storage engine.
A Testing Storage Server is paired to a normal Storage Server. Both servers in a new pair start empty and take on exactly the same data and serve exactly the same read requests. The SS and TSS responses are compared client-side to ensure that they match, and performance metrics are recorded for the pair.
Configuring TSS
===============
You can configure TSS via the FDB :ref:`command line interface <command-line-interface>`.
Because of the performance overhead of duplicating and comparing read requests, it is recommended to configure the number of TSS processes to a small percentage of the number of storage processes in the cluster (at most 5% - 10%). It is also recommended to add enough servers to the cluster before enabling TSS to end up with the same number of normal Storage processes as the cluster had before enabling TSS.
Because TSS recruitment only pairs *new* storage processes, you must add processes to the cluster and/or enable the *perpetual wiggle* to actually start recruiting TSS processes.
Example commands
----------------
Set the desired TSS processes count to 4, using the redwood storage engine: ``configure tss ssd-redwood-experimental count=4``.
Change the desired TSS process count to 2: ``configure tss count=2``.
Disable TSS on the cluster: ``configure tss count=0``.
Monitoring TSS
==============
The ``status`` command in the FDB :ref:`command line interface <command-line-interface>` will show the current and desired number of TSS processes in the cluster, and the full status JSON will include the full TSS configuration parameters.
Trace Events
----------------------
Whenever a client detects a *TSS Mismatch*, or when the SS and TSS response differ, and the difference can only be explained by different storage engine contents, it will emit an error-level trace event with a type starting with ``TSSMismatch``, with a different type for each read request. This trace event will include all of the information necessary to investgate the mismatch, such as the TSS storage ID, the full request data, and the summarized replies (full keys and checksummed values) from both the SS and TSS.
Each client emits a ``TSSClientMetrics`` trace event for each TSS pair in the cluster that it has sent requests to recently, similar to the ``TransactionMetrics`` trace event.
It contains the TSS storage ID, and latency statistics for each type of read request. It also includes a count of any mismatches, and a histogram of error codes recieved by the SS and TSS to ensure the storage engines have similar error rates and types.
The ``StorageMetrics`` trace event emitted by storage servers includes the storage ID of its pair if part of a TSS pairing, and includes a ``TSSJointID`` detail with a unique id for the SS/TSS pair that enables correlating the separate StorageMetrics events from the SS and TSS.
Quarantined TSS
---------------
If a *TSS Mismatch* is detected for a given TSS, instead of killing the TSS, it will be put into a *quarantined* state. In this state, the TSS doesn't respond to any data requests, but is still recruited on the worker, preventing a new storage process from replacing it.
This is so that the cluster operator can investigate the storage engine file of the TSS's *testing storage engine* to determine the cause of the data inconsistency.
You can also manually quarantine a TSS, or dispose of a quarantined TSS once you're done investigating using the ``tssq`` command in the FDB :ref:`command line interface <command-line-interface>`.
The typical flow of operations would be
* ``tssq start <StorageUID>``: manually quarantines a TSS process, if it is not already quarantined.
* ``tssq list``: lists all TSS processes currently in quarantine to see if any were automatically quarantined.
* Investigate the quarantined TSS to determine the cause of the mismatch.
* ``tssq stop <StorageUID>``: remove a TSS process from quarantine, disposing of the TSS and allowing Data Distribution to recruit a new storage process on the worker.
The Storage Consistency Check will also check TSS processes against the rest of that shard's team, and fail if there is a mismatch, but it will not automatically quarantine the offending TSS.
Other Failure Types
-------------------
The TSS feature is designed to insulate the cluster from an incorrect storage engine, but it is also designed to insulate the cluster as much as possible from a poor-performing storage engine.
Read Requests to TSS processes are never on the critical path for a client transaction, so a slow-responding TSS will not affect client latencies.
If the TSS falls behind its pair for ingesting new mutations or processing data movements, it will be killed by the cluster.
Architecture Overview
=====================
TSS Mapping
-----------
The *TSS Mapping*, the mapping containing the (SS Storage ID, TSS Storage ID) pairs for all active TSS pairs in the cluster, is stored in the system keyspace under ``\xff/tss/``.
This information is lazily propagated to clients in the ``KeyServerLocationsRequest`` from the Commit Proxies, which read the TSS mapping from the Transaction State Store.
Clients will use this mapping to duplicate any request to a storage server that has a TSS pair.
Other cluster components that need the full, up-to-date TSS mapping (MoveKeys, ConsistencyCheck, etc...) read it directly from the system keyspace as part of a transaction.
Recruitment
-----------
TSS Pair recruitment is designed to use the *Perpetual Storage Wiggle*, but will also work if multiple storage processes are added to the cluster.
When Data Distribution detects a deficit of TSS processes in the cluster, it will begin recruiting a TSS pair.
The pair recruitment logic is as follows:
* Once DD gets a candidate worker from the Cluster Controller, hold that worker as a desired TSS process.
* Once DD gets a second candidate worker from the Cluster Controller, initialize that worker as a normal SS.
* Once the second candidate worker is successfully initialized, initialize the first candidate worker as a TSS, passing it the storage ID, starting tag + version, and other information from its SS pair. Because the TSS reads from the same tag starting at the same version, it is guaranteed to recieve the same mutations and data movements as its pair.
One implication of this is, during TSS recruitment, the cluster is effectively down one storage process until a second storage process becomes available.
While clusters should be able to handle being down a single storage process anyway to tolerate machine failure, an active TSS recruitment will be cancelled if the lack of that single storage process is causing the cluster to be unhealthy. Similarly, if the cluster is unhealthy and unable to find new teams to replicate data to, any existing TSS processes may be killed to make room for new storage servers.
TSS Process
-----------
A TSS process is almost identical to a normal Storage process, with the exception that it does not pop its tag from the TLog mutation stream, and it listens to and processes private mutations for both itself and its SS pair.
Caveats
=======
Despite its usefulness, the TSS feature does not give a perfect performance comparison of the storage engine(s) under test.
Because it is only enabled on a small percentage of the cluster and only compares single storage processes for the same workload, it may miss potential aggregate performance problems, such as the testing storage engine overall consuming more cpu/memory, especially on a host with many storage instances.
TSS testing using the recommended small number of TSS pairs may also miss performance pathologies from workloads not experienced by the specific storage teams with TSS pairs in their membership.
TSS testing is not a substitute for full-cluster performance and correctness testing or simulation testing.

View File

@ -24,6 +24,7 @@
#include "fdbclient/IClientApi.h"
#include "fdbclient/MultiVersionTransaction.h"
#include "fdbclient/Status.h"
#include "fdbclient/KeyBackedTypes.h"
#include "fdbclient/StatusClient.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/GlobalConfig.actor.h"
@ -495,10 +496,10 @@ void initHelp() {
"All keys between BEGINKEY (inclusive) and ENDKEY (exclusive) are cleared from the database. This command will "
"succeed even if the specified range is empty, but may fail because of conflicts." ESCAPINGK);
helpMap["configure"] = CommandHelp(
"configure [new] "
"configure [new|tss]"
"<single|double|triple|three_data_hall|three_datacenter|ssd|memory|memory-radixtree-beta|proxies=<PROXIES>|"
"commit_proxies=<COMMIT_PROXIES>|grv_proxies=<GRV_PROXIES>|logs=<LOGS>|resolvers=<RESOLVERS>>*|"
"perpetual_storage_wiggle=<WIGGLE_SPEED>",
"count=<TSS_COUNT>|perpetual_storage_wiggle=<WIGGLE_SPEED>",
"change the database configuration",
"The `new' option, if present, initializes a new database with the given configuration rather than changing "
"the configuration of an existing one. When used, both a redundancy mode and a storage engine must be "
@ -663,6 +664,12 @@ void initHelp() {
CommandHelp("triggerddteaminfolog",
"trigger the data distributor teams logging",
"Trigger the data distributor to log detailed information about its teams.");
helpMap["tssq"] =
CommandHelp("tssq start|stop <StorageUID>",
"start/stop tss quarantine",
"Toggles Quarantine mode for a Testing Storage Server. Quarantine will happen automatically if the "
"TSS is detected to have incorrect data, but can also be initiated manually. You can also remove a "
"TSS from quarantine once your investigation is finished, which will destroy the TSS process.");
hiddenCommands.insert("expensive_data_check");
hiddenCommands.insert("datadistribution");
@ -1861,6 +1868,75 @@ ACTOR Future<Void> triggerDDTeamInfoLog(Database db) {
}
}
ACTOR Future<Void> tssQuarantineList(Database db) {
state ReadYourWritesTransaction tr(db);
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
RangeResult result = wait(tr.getRange(tssQuarantineKeys, CLIENT_KNOBS->TOO_MANY));
// shouldn't have many quarantined TSSes
ASSERT(!result.more);
printf("Found %d quarantined TSS processes%s\n", result.size(), result.size() == 0 ? "." : ":");
for (auto& it : result) {
printf(" %s\n", decodeTssQuarantineKey(it.key).toString().c_str());
}
return Void();
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
ACTOR Future<bool> tssQuarantine(Database db, bool enable, UID tssId) {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(db);
state KeyBackedMap<UID, UID> tssMapDB = KeyBackedMap<UID, UID>(tssMappingKeys.begin);
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
// Do some validation first to make sure the command is valid
Optional<Value> serverListValue = wait(tr->get(serverListKeyFor(tssId)));
if (!serverListValue.present()) {
printf("No TSS %s found in cluster!\n", tssId.toString().c_str());
return false;
}
state StorageServerInterface ssi = decodeServerListValue(serverListValue.get());
if (!ssi.isTss()) {
printf("Cannot quarantine Non-TSS storage ID %s!\n", tssId.toString().c_str());
return false;
}
Optional<Value> currentQuarantineValue = wait(tr->get(tssQuarantineKeyFor(tssId)));
if (enable && currentQuarantineValue.present()) {
printf("TSS %s already in quarantine, doing nothing.\n", tssId.toString().c_str());
return false;
} else if (!enable && !currentQuarantineValue.present()) {
printf("TSS %s is not in quarantine, cannot remove from quarantine!.\n", tssId.toString().c_str());
return false;
}
if (enable) {
tr->set(tssQuarantineKeyFor(tssId), LiteralStringRef(""));
// remove server from TSS mapping when quarantine is enabled
tssMapDB.erase(tr, ssi.tssPairID.get());
} else {
tr->clear(tssQuarantineKeyFor(tssId));
}
wait(tr->commit());
break;
} catch (Error& e) {
wait(tr->onError(e));
}
}
printf("Successfully %s TSS %s\n", enable ? "quarantined" : "removed", tssId.toString().c_str());
return true;
}
ACTOR Future<Void> timeWarning(double when, const char* msg) {
wait(delay(when));
fputs(msg, stderr);
@ -3414,6 +3490,31 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
continue;
}
if (tokencmp(tokens[0], "tssq")) {
if (tokens.size() == 2) {
if (tokens[1] != LiteralStringRef("list")) {
printUsage(tokens[0]);
is_error = true;
} else {
wait(tssQuarantineList(db));
}
}
if (tokens.size() == 3) {
if ((tokens[1] != LiteralStringRef("start") && tokens[1] != LiteralStringRef("stop")) ||
(tokens[2].size() != 32) || !std::all_of(tokens[2].begin(), tokens[2].end(), &isxdigit)) {
printUsage(tokens[0]);
is_error = true;
} else {
bool enable = tokens[1] == LiteralStringRef("start");
UID tssId = UID::fromString(tokens[2].toString());
bool err = wait(tssQuarantine(db, enable, tssId));
if (err)
is_error = true;
}
}
continue;
}
if (tokencmp(tokens[0], "configure")) {
bool err = wait(configure(db, tokens, db->getConnectionFile(), &linenoise, warn));
if (err)

View File

@ -338,14 +338,26 @@ StatusObject DatabaseConfiguration::toJSON(bool noPolicies) const {
result["regions"] = getRegionJSON();
}
// Add to the `proxies` count for backwards compatibility with tools built before 7.0.
int32_t proxyCount = -1;
if (desiredTLogCount != -1 || isOverridden("logs")) {
result["logs"] = desiredTLogCount;
}
if (commitProxyCount != -1 || isOverridden("commit_proxies")) {
result["commit_proxies"] = commitProxyCount;
if (proxyCount != -1) {
proxyCount += commitProxyCount;
} else {
proxyCount = commitProxyCount;
}
}
if (grvProxyCount != -1 || isOverridden("grv_proxies")) {
result["grv_proxies"] = grvProxyCount;
if (proxyCount != -1) {
proxyCount += grvProxyCount;
} else {
proxyCount = grvProxyCount;
}
}
if (resolverCount != -1 || isOverridden("resolvers")) {
result["resolvers"] = resolverCount;
@ -371,6 +383,9 @@ StatusObject DatabaseConfiguration::toJSON(bool noPolicies) const {
if (autoDesiredTLogCount != CLIENT_KNOBS->DEFAULT_AUTO_LOGS || isOverridden("auto_logs")) {
result["auto_logs"] = autoDesiredTLogCount;
}
if (proxyCount != -1) {
result["proxies"] = proxyCount;
}
result["backup_worker_enabled"] = (int32_t)backupWorkerEnabled;
result["perpetual_storage_wiggle"] = perpetualStorageWiggleSpeed;

View File

@ -97,6 +97,7 @@ void ClientKnobs::initialize(bool randomize) {
init( DETAILED_HEALTH_METRICS_MAX_STALENESS, 5.0 );
init( MID_SHARD_SIZE_MAX_STALENESS, 10.0 );
init( TAG_ENCODE_KEY_SERVERS, false ); if( randomize && BUGGIFY ) TAG_ENCODE_KEY_SERVERS = true;
init( QUARANTINE_TSS_ON_MISMATCH, true ); if( randomize && BUGGIFY ) QUARANTINE_TSS_ON_MISMATCH = false; // if true, a tss mismatch will put the offending tss in quarantine. If false, it will just be killed
//KeyRangeMap
init( KRM_GET_RANGE_LIMIT, 1e5 ); if( randomize && BUGGIFY ) KRM_GET_RANGE_LIMIT = 10;

View File

@ -90,6 +90,7 @@ public:
double DETAILED_HEALTH_METRICS_MAX_STALENESS;
double MID_SHARD_SIZE_MAX_STALENESS;
bool TAG_ENCODE_KEY_SERVERS;
bool QUARANTINE_TSS_ON_MISMATCH;
// KeyRangeMap
int KRM_GET_RANGE_LIMIT;

View File

@ -838,11 +838,12 @@ ACTOR static Future<Void> handleTssMismatches(DatabaseContext* cx) {
}
}
if (found) {
TraceEvent(SevWarnAlways, "TSS_KillMismatch").detail("TSSID", tssID.toString());
TEST(true); // killing TSS because it got mismatch
state bool quarantine = CLIENT_KNOBS->QUARANTINE_TSS_ON_MISMATCH;
TraceEvent(SevWarnAlways, quarantine ? "TSS_QuarantineMismatch" : "TSS_KillMismatch")
.detail("TSSID", tssID.toString());
TEST(quarantine); // Quarantining TSS because it got mismatch
TEST(!quarantine); // Killing TSS because it got mismatch
// TODO we could write something to the system keyspace and then have DD listen to that keyspace and then DD
// do exactly this, so why not just cut out the middle man (or the middle system keys, as it were)
tr = makeReference<ReadYourWritesTransaction>(Database(Reference<DatabaseContext>::addRef(cx)));
state int tries = 0;
loop {
@ -850,7 +851,11 @@ ACTOR static Future<Void> handleTssMismatches(DatabaseContext* cx) {
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->clear(serverTagKeyFor(tssID));
if (quarantine) {
tr->set(tssQuarantineKeyFor(tssID), LiteralStringRef(""));
} else {
tr->clear(serverTagKeyFor(tssID));
}
tssMapDB.erase(tr, tssPairID);
wait(tr->commit());
@ -861,16 +866,15 @@ ACTOR static Future<Void> handleTssMismatches(DatabaseContext* cx) {
}
tries++;
if (tries > 10) {
// Give up on trying to kill the tss, it'll get another mismatch or a human will investigate
// eventually
TraceEvent("TSS_KillMismatchGaveUp").detail("TSSID", tssID.toString());
// Give up, it'll get another mismatch or a human will investigate eventually
TraceEvent("TSS_MismatchGaveUp").detail("TSSID", tssID.toString());
break;
}
}
// clear out txn so that the extra DatabaseContext ref gets decref'd and we can free cx
tr = makeReference<ReadYourWritesTransaction>();
} else {
TEST(true); // Not killing TSS with mismatch because it's already gone
TEST(true); // Not handling TSS with mismatch because it's already gone
}
}
}
@ -5878,21 +5882,20 @@ Future<Void> DatabaseContext::createSnapshot(StringRef uid, StringRef snapshot_c
}
ACTOR Future<Void> setPerpetualStorageWiggle(Database cx, bool enable, bool lock_aware) {
state ReadYourWritesTransaction tr(cx);
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
if(lock_aware) {
state ReadYourWritesTransaction tr(cx);
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
if (lock_aware) {
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
}
tr.set(perpetualStorageWiggleKey, enable ? LiteralStringRef("1") : LiteralStringRef("0"));
wait(tr.commit());
break;
}
catch (Error& e) {
wait(tr.onError(e));
}
}
return Void();
tr.set(perpetualStorageWiggleKey, enable ? LiteralStringRef("1") : LiteralStringRef("0"));
wait(tr.commit());
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
return Void();
}

View File

@ -164,7 +164,11 @@ Reference<S3BlobStoreEndpoint> S3BlobStoreEndpoint::fromString(std::string const
StringRef prefix = t.eat("://");
if (prefix != LiteralStringRef("blobstore"))
throw format("Invalid blobstore URL prefix '%s'", prefix.toString().c_str());
StringRef cred = t.eat("@");
Optional<StringRef> cred;
if (url.find("@") != std::string::npos) {
cred = t.eat("@");
}
uint8_t foundSeparator = 0;
StringRef hostPort = t.eatAny("/?", &foundSeparator);
StringRef resource;
@ -238,12 +242,15 @@ Reference<S3BlobStoreEndpoint> S3BlobStoreEndpoint::fromString(std::string const
if (resourceFromURL != nullptr)
*resourceFromURL = resource.toString();
StringRef c(cred);
StringRef key = c.eat(":");
StringRef secret = c.eat();
Optional<S3BlobStoreEndpoint::Credentials> creds;
if (cred.present()) {
StringRef c(cred.get());
StringRef key = c.eat(":");
StringRef secret = c.eat();
creds = S3BlobStoreEndpoint::Credentials{ key.toString(), secret.toString() };
}
return makeReference<S3BlobStoreEndpoint>(
host.toString(), service.toString(), key.toString(), secret.toString(), knobs, extraHeaders);
return makeReference<S3BlobStoreEndpoint>(host.toString(), service.toString(), creds, knobs, extraHeaders);
} catch (std::string& err) {
if (error != nullptr)
@ -264,12 +271,17 @@ std::string S3BlobStoreEndpoint::getResourceURL(std::string resource, std::strin
hostPort.append(service);
}
// If secret isn't being looked up from credentials files then it was passed explicitly in th URL so show it here.
std::string s;
if (!lookupSecret)
s = std::string(":") + secret;
// If secret isn't being looked up from credentials files then it was passed explicitly in the URL so show it here.
std::string credsString;
if (credentials.present()) {
credsString = credentials.get().key;
if (!lookupSecret) {
credsString += ":" + credentials.get().secret;
}
credsString += "@";
}
std::string r = format("blobstore://%s%s@%s/%s", key.c_str(), s.c_str(), hostPort.c_str(), resource.c_str());
std::string r = format("blobstore://%s%s/%s", credsString.c_str(), hostPort.c_str(), resource.c_str());
// Get params that are deviations from knob defaults
std::string knobParams = knobs.getURLParameters();
@ -484,13 +496,18 @@ ACTOR Future<Void> updateSecret_impl(Reference<S3BlobStoreEndpoint> b) {
if (pFiles == nullptr)
return Void();
if (!b->credentials.present()) {
return Void();
}
state std::vector<Future<Optional<json_spirit::mObject>>> reads;
for (auto& f : *pFiles)
reads.push_back(tryReadJSONFile(f));
wait(waitForAll(reads));
std::string key = b->key + "@" + b->host;
std::string accessKey = b->credentials.get().key;
std::string credentialsFileKey = accessKey + "@" + b->host;
int invalid = 0;
@ -504,12 +521,12 @@ ACTOR Future<Void> updateSecret_impl(Reference<S3BlobStoreEndpoint> b) {
JSONDoc doc(f.get().get());
if (doc.has("accounts") && doc.last().type() == json_spirit::obj_type) {
JSONDoc accounts(doc.last().get_obj());
if (accounts.has(key, false) && accounts.last().type() == json_spirit::obj_type) {
if (accounts.has(credentialsFileKey, false) && accounts.last().type() == json_spirit::obj_type) {
JSONDoc account(accounts.last());
std::string secret;
// Once we find a matching account, use it.
if (account.tryGet("secret", secret)) {
b->secret = secret;
b->credentials = S3BlobStoreEndpoint::Credentials{ accessKey, secret };
return Void();
}
}
@ -644,6 +661,7 @@ ACTOR Future<Reference<HTTP::Response>> doRequest_impl(Reference<S3BlobStoreEndp
// This must be done AFTER the connection is ready because if credentials are coming from disk they are
// refreshed when a new connection is established and setAuthHeaders() would need the updated secret.
bstore->setAuthHeaders(verb, resource, headers);
remoteAddress = rconn.conn->getPeerAddress();
wait(bstore->requestRate->getAllowance(1));
Reference<HTTP::Response> _r = wait(timeoutError(HTTP::doRequest(rconn.conn,
@ -1041,8 +1059,8 @@ Future<std::vector<std::string>> S3BlobStoreEndpoint::listBuckets() {
return listBuckets_impl(Reference<S3BlobStoreEndpoint>::addRef(this));
}
std::string S3BlobStoreEndpoint::hmac_sha1(std::string const& msg) {
std::string key = secret;
std::string S3BlobStoreEndpoint::hmac_sha1(Credentials const& creds, std::string const& msg) {
std::string key = creds.secret;
// Hash key to shorten it if it is longer than SHA1 block size
if (key.size() > 64) {
@ -1067,6 +1085,11 @@ std::string S3BlobStoreEndpoint::hmac_sha1(std::string const& msg) {
}
void S3BlobStoreEndpoint::setAuthHeaders(std::string const& verb, std::string const& resource, HTTP::Headers& headers) {
if (!credentials.present()) {
return;
}
Credentials creds = credentials.get();
std::string& date = headers["Date"];
char dateBuf[20];
@ -1106,11 +1129,11 @@ void S3BlobStoreEndpoint::setAuthHeaders(std::string const& verb, std::string co
msg.resize(msg.size() - (resource.size() - q));
}
std::string sig = base64::encoder::from_string(hmac_sha1(msg));
std::string sig = base64::encoder::from_string(hmac_sha1(creds, msg));
// base64 encoded blocks end in \n so remove it.
sig.resize(sig.size() - 1);
std::string auth = "AWS ";
auth.append(key);
auth.append(creds.key);
auth.append(":");
auth.append(sig);
headers["Authorization"] = auth;

View File

@ -46,6 +46,11 @@ public:
static Stats s_stats;
struct Credentials {
std::string key;
std::string secret;
};
struct BlobKnobs {
BlobKnobs();
int secure_connection, connect_tries, connect_timeout, max_connection_life, request_tries, request_timeout_min,
@ -92,12 +97,11 @@ public:
S3BlobStoreEndpoint(std::string const& host,
std::string service,
std::string const& key,
std::string const& secret,
Optional<Credentials> const& creds,
BlobKnobs const& knobs = BlobKnobs(),
HTTP::Headers extraHeaders = HTTP::Headers())
: host(host), service(service), key(key), secret(secret), lookupSecret(secret.empty()), knobs(knobs),
extraHeaders(extraHeaders), requestRate(new SpeedLimit(knobs.requests_per_second, 1)),
: host(host), service(service), credentials(creds), lookupSecret(creds.present() && creds.get().secret.empty()),
knobs(knobs), extraHeaders(extraHeaders), requestRate(new SpeedLimit(knobs.requests_per_second, 1)),
requestRateList(new SpeedLimit(knobs.list_requests_per_second, 1)),
requestRateWrite(new SpeedLimit(knobs.write_requests_per_second, 1)),
requestRateRead(new SpeedLimit(knobs.read_requests_per_second, 1)),
@ -114,7 +118,7 @@ public:
const char* resource = "";
if (withResource)
resource = "<name>";
return format("blobstore://<api_key>:<secret>@<host>[:<port>]/%s[?<param>=<value>[&<param>=<value>]...]",
return format("blobstore://[<api_key>:<secret>@]<host>[:<port>]/%s[?<param>=<value>[&<param>=<value>]...]",
resource);
}
@ -142,8 +146,7 @@ public:
std::string host;
std::string service;
std::string key;
std::string secret;
Optional<Credentials> credentials;
bool lookupSecret;
BlobKnobs knobs;
HTTP::Headers extraHeaders;
@ -163,7 +166,7 @@ public:
Future<Void> updateSecret();
// Calculates the authentication string from the secret key
std::string hmac_sha1(std::string const& msg);
static std::string hmac_sha1(Credentials const& creds, std::string const& msg);
// Sets headers needed for Authorization (including Date which will be overwritten if present)
void setAuthHeaders(std::string const& verb, std::string const& resource, HTTP::Headers& headers);

View File

@ -755,6 +755,7 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
"auto_logs":3,
"commit_proxies":5,
"grv_proxies":1,
"proxies":6,
"backup_worker_enabled":1,
"perpetual_storage_wiggle":0
},

View File

@ -348,6 +348,22 @@ uint16_t cacheChangeKeyDecodeIndex(const KeyRef& key) {
const KeyRangeRef tssMappingKeys(LiteralStringRef("\xff/tss/"), LiteralStringRef("\xff/tss0"));
const KeyRangeRef tssQuarantineKeys(LiteralStringRef("\xff/tssQ/"), LiteralStringRef("\xff/tssQ0"));
const Key tssQuarantineKeyFor(UID serverID) {
BinaryWriter wr(Unversioned());
wr.serializeBytes(tssQuarantineKeys.begin);
wr << serverID;
return wr.toValue();
}
UID decodeTssQuarantineKey(KeyRef const& key) {
UID serverID;
BinaryReader rd(key.removePrefix(tssQuarantineKeys.begin), Unversioned());
rd >> serverID;
return serverID;
}
const KeyRangeRef serverTagKeys(LiteralStringRef("\xff/serverTag/"), LiteralStringRef("\xff/serverTag0"));
const KeyRef serverTagPrefix = serverTagKeys.begin;

View File

@ -114,9 +114,16 @@ extern const KeyRef cacheChangePrefix;
const Key cacheChangeKeyFor(uint16_t idx);
uint16_t cacheChangeKeyDecodeIndex(const KeyRef& key);
// "\xff/tss/[[serverId]]" := "[[tssId]]"
// "\xff/tss/[[serverId]]" := "[[tssId]]"
extern const KeyRangeRef tssMappingKeys;
// "\xff/tssQ/[[serverId]]" := ""
// For quarantining a misbehaving TSS.
extern const KeyRangeRef tssQuarantineKeys;
const Key tssQuarantineKeyFor(UID serverID);
UID decodeTssQuarantineKey(KeyRef const&);
// "\xff/serverTag/[[serverID]]" = "[[Tag]]"
// Provides the Tag for the given serverID. Used to access a
// storage server's corresponding TLog in order to apply mutations.

View File

@ -1484,3 +1484,133 @@ TEST_CASE("/flow/flow/PromiseStream/move2") {
ASSERT(movedTracker.copied == 0);
return Void();
}
constexpr double mutexTestDelay = 0.00001;
ACTOR Future<Void> mutexTest(int id, FlowMutex* mutex, int n, bool allowError, bool* verbose) {
while (n-- > 0) {
state double d = deterministicRandom()->random01() * mutexTestDelay;
if (*verbose) {
printf("%d:%d wait %f while unlocked\n", id, n, d);
}
wait(delay(d));
if (*verbose) {
printf("%d:%d locking\n", id, n);
}
state FlowMutex::Lock lock = wait(mutex->take());
if (*verbose) {
printf("%d:%d locked\n", id, n);
}
d = deterministicRandom()->random01() * mutexTestDelay;
if (*verbose) {
printf("%d:%d wait %f while locked\n", id, n, d);
}
wait(delay(d));
// On the last iteration, send an error or drop the lock if allowError is true
if (n == 0 && allowError) {
if (deterministicRandom()->coinflip()) {
// Send explicit error
if (*verbose) {
printf("%d:%d sending error\n", id, n);
}
lock.error(end_of_stream());
} else {
// Do nothing
if (*verbose) {
printf("%d:%d dropping promise, returning without unlock\n", id, n);
}
}
} else {
if (*verbose) {
printf("%d:%d unlocking\n", id, n);
}
lock.release();
}
}
if (*verbose) {
printf("%d Returning\n", id);
}
return Void();
}
TEST_CASE("/flow/flow/FlowMutex") {
state int count = 100000;
// Default verboseness
state bool verboseSetting = false;
// Useful for debugging, enable verbose mode for this iteration number
state int verboseTestIteration = -1;
try {
state bool verbose = verboseSetting || count == verboseTestIteration;
while (--count > 0) {
if (count % 1000 == 0) {
printf("%d tests left\n", count);
}
state FlowMutex mutex;
state std::vector<Future<Void>> tests;
state bool allowErrors = deterministicRandom()->coinflip();
if (verbose) {
printf("\nTesting allowErrors=%d\n", allowErrors);
}
state Optional<Error> error;
try {
for (int i = 0; i < 10; ++i) {
tests.push_back(mutexTest(i, &mutex, 10, allowErrors, &verbose));
}
wait(waitForAll(tests));
if (allowErrors) {
if (verbose) {
printf("Final wait in case error was injected by the last actor to finish\n");
}
wait(success(mutex.take()));
}
} catch (Error& e) {
if (verbose) {
printf("Caught error %s\n", e.what());
}
error = e;
// Wait for all actors still running to finish their waits and try to take the mutex
if (verbose) {
printf("Waiting for completions\n");
}
wait(delay(2 * mutexTestDelay));
if (verbose) {
printf("Future end states:\n");
}
// All futures should be ready, some with errors.
bool allReady = true;
for (int i = 0; i < tests.size(); ++i) {
auto f = tests[i];
if (verbose) {
printf(
" %d: %s\n", i, f.isReady() ? (f.isError() ? f.getError().what() : "done") : "not ready");
}
allReady = allReady && f.isReady();
}
ASSERT(allReady);
}
// If an error was caused, one should have been detected.
// Otherwise, no errors should be detected.
ASSERT(error.present() == allowErrors);
}
} catch (Error& e) {
printf("Error at count=%d\n", count + 1);
ASSERT(false);
}
return Void();
}

View File

@ -52,7 +52,7 @@ void applyMetadataMutations(SpanID const& spanContext,
Arena& arena,
VectorRef<MutationRef> const& mutations,
IKeyValueStore* txnStateStore,
LogPushData* toCommit,
LogPushData* toCommit, // non-null if these mutations were part of a new commit handled by this commit proxy
bool& confChange,
Reference<ILogSystem> logSystem,
Version popVersion,
@ -66,7 +66,8 @@ void applyMetadataMutations(SpanID const& spanContext,
std::map<UID, Reference<StorageInfo>>* storageCache,
std::map<Tag, Version>* tag_popped,
std::unordered_map<UID, StorageServerInterface>* tssMapping,
bool initialCommit) {
bool initialCommit // true if the mutations were already written to the txnStateStore as part of recovery
) {
// std::map<keyRef, vector<uint16_t>> cacheRangeInfo;
std::map<KeyRef, MutationRef> cachedRangeInfo;
@ -245,24 +246,45 @@ void applyMetadataMutations(SpanID const& spanContext,
}
}
} else if (m.param1.startsWith(tssMappingKeys.begin)) {
// Normally uses key backed map, so have to use same unpacking code here.
UID ssId = Codec<UID>::unpack(Tuple::unpack(m.param1.removePrefix(tssMappingKeys.begin)));
UID tssId = Codec<UID>::unpack(Tuple::unpack(m.param2));
if (!initialCommit) {
txnStateStore->set(KeyValueRef(m.param1, m.param2));
if (tssMapping) {
// Normally uses key backed map, so have to use same unpacking code here.
UID ssId = Codec<UID>::unpack(Tuple::unpack(m.param1.removePrefix(tssMappingKeys.begin)));
UID tssId = Codec<UID>::unpack(Tuple::unpack(m.param2));
}
if (tssMapping) {
tssMappingToAdd.push_back(std::pair(ssId, tssId));
}
tssMappingToAdd.push_back(std::pair(ssId, tssId));
if (toCommit) {
// send private mutation to SS that it now has a TSS pair
MutationRef privatized = m;
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
// send private mutation to SS that it now has a TSS pair
if (toCommit) {
MutationRef privatized = m;
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
Optional<Value> tagV = txnStateStore->readValue(serverTagKeyFor(ssId)).get();
if (tagV.present()) {
toCommit->addTag(decodeServerTagValue(tagV.get()));
toCommit->writeTypedMessage(privatized);
}
}
} else if (m.param1.startsWith(tssQuarantineKeys.begin)) {
if (!initialCommit) {
txnStateStore->set(KeyValueRef(m.param1, m.param2));
Optional<Value> tagV = txnStateStore->readValue(serverTagKeyFor(ssId)).get();
if (tagV.present()) {
toCommit->addTag(decodeServerTagValue(tagV.get()));
toCommit->writeTypedMessage(privatized);
if (toCommit) {
UID tssId = decodeTssQuarantineKey(m.param1);
Optional<Value> ssiV = txnStateStore->readValue(serverListKeyFor(tssId)).get();
if (ssiV.present()) {
StorageServerInterface ssi = decodeServerListValue(ssiV.get());
if (ssi.isTss()) {
Optional<Value> tagV =
txnStateStore->readValue(serverTagKeyFor(ssi.tssPairID.get())).get();
if (tagV.present()) {
MutationRef privatized = m;
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
toCommit->addTag(decodeServerTagValue(tagV.get()));
toCommit->writeTypedMessage(privatized);
}
}
}
}
@ -510,15 +532,52 @@ void applyMetadataMutations(SpanID const& spanContext,
txnStateStore->clear(range & serverTagHistoryKeys);
}
if (tssMappingKeys.intersects(range)) {
KeyRangeRef rangeToClear = range & tssMappingKeys;
ASSERT(rangeToClear.singleKeyRange());
// Normally uses key backed map, so have to use same unpacking code here.
UID ssId = Codec<UID>::unpack(Tuple::unpack(m.param1.removePrefix(tssMappingKeys.begin)));
if (!initialCommit) {
KeyRangeRef rangeToClear = range & tssMappingKeys;
txnStateStore->clear(rangeToClear);
}
if (tssMapping) {
tssMapping->erase(ssId);
}
if (toCommit) {
// send private mutation to SS to notify that it no longer has a tss pair
Optional<Value> tagV = txnStateStore->readValue(serverTagKeyFor(ssId)).get();
if (tagV.present()) {
MutationRef privatized = m;
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
toCommit->addTag(decodeServerTagValue(tagV.get()));
toCommit->writeTypedMessage(privatized);
}
}
}
if (tssQuarantineKeys.intersects(range)) {
if (!initialCommit) {
KeyRangeRef rangeToClear = range & tssQuarantineKeys;
ASSERT(rangeToClear.singleKeyRange());
txnStateStore->clear(rangeToClear);
if (tssMapping) {
// Normally uses key backed map, so have to use same unpacking code here.
UID ssId =
Codec<UID>::unpack(Tuple::unpack(rangeToClear.begin.removePrefix(tssMappingKeys.begin)));
tssMapping->erase(ssId);
if (toCommit) {
UID tssId = decodeTssQuarantineKey(m.param1);
Optional<Value> ssiV = txnStateStore->readValue(serverListKeyFor(tssId)).get();
if (ssiV.present()) {
StorageServerInterface ssi = decodeServerListValue(ssiV.get());
if (ssi.isTss()) {
Optional<Value> tagV =
txnStateStore->readValue(serverTagKeyFor(ssi.tssPairID.get())).get();
if (tagV.present()) {
MutationRef privatized = m;
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
toCommit->addTag(decodeServerTagValue(tagV.get()));
toCommit->writeTypedMessage(privatized);
}
}
}
}
}
}

View File

@ -27,7 +27,6 @@ set(FDBSERVER_SRCS
IKeyValueContainer.h
IKeyValueStore.h
IPager.h
IVersionedStore.h
KeyValueStoreCompressTestData.actor.cpp
KeyValueStoreMemory.actor.cpp
KeyValueStoreRocksDB.actor.cpp

View File

@ -1439,9 +1439,9 @@ void maybeAddTssMapping(GetKeyServerLocationsReply& reply,
if (!included.count(ssId)) {
auto mappingItr = commitData->tssMapping.find(ssId);
if (mappingItr != commitData->tssMapping.end()) {
included.insert(ssId);
reply.resultsTssMapping.push_back(*mappingItr);
}
included.insert(ssId);
}
}

View File

@ -26,6 +26,14 @@
#include "fdbserver/Knobs.h"
#include <string.h>
#define DELTATREE_DEBUG 0
#if DELTATREE_DEBUG
#define deltatree_printf(...) printf(__VA_ARGS__)
#else
#define deltatree_printf(...)
#endif
typedef uint64_t Word;
// Get the number of prefix bytes that are the same between a and b, up to their common length of cl
static inline int commonPrefixLength(uint8_t const* ap, uint8_t const* bp, int cl) {
@ -198,10 +206,6 @@ struct DeltaTree {
smallOffsets.left = offset;
}
}
int size(bool large) const {
return delta(large).size() + (large ? sizeof(smallOffsets) : sizeof(largeOffsets));
}
};
static constexpr int SmallSizeLimit = std::numeric_limits<uint16_t>::max();
@ -356,8 +360,6 @@ public:
Mirror(const void* treePtr = nullptr, const T* lowerBound = nullptr, const T* upperBound = nullptr)
: tree((DeltaTree*)treePtr), lower(lowerBound), upper(upperBound) {
// TODO: Remove these copies into arena and require users of Mirror to keep prev and next alive during its
// lifetime
lower = new (arena) T(arena, *lower);
upper = new (arena) T(arena, *upper);
@ -875,7 +877,10 @@ private:
int deltaSize = item.writeDelta(node.delta(largeNodes), *base, commonPrefix);
node.delta(largeNodes).setPrefixSource(prefixSourcePrev);
// printf("Serialized %s to %p\n", item.toString().c_str(), &root.delta(largeNodes));
deltatree_printf("Serialized %s to offset %d data: %s\n",
item.toString().c_str(),
(uint8_t*)&node - (uint8_t*)this,
StringRef((uint8_t*)&node.delta(largeNodes), deltaSize).toHexString().c_str());
// Continue writing after the serialized Delta.
uint8_t* wptr = (uint8_t*)&node.delta(largeNodes) + deltaSize;
@ -899,3 +904,823 @@ private:
return wptr - (uint8_t*)&node;
}
};
// DeltaTree2 is a memory mappable binary tree of T objects such that each node's item is
// stored as a Delta which can reproduce the node's T item given either
// - The node's greatest lesser ancestor, called the "left parent"
// - The node's least greater ancestor, called the "right parent"
// One of these ancestors will also happen to be the node's direct parent.
//
// The Delta type is intended to make use of ordered prefix compression and borrow all
// available prefix bytes from the ancestor T which shares the most prefix bytes with
// the item T being encoded. If T is implemented properly, this results in perfect
// prefix compression while performing O(log n) comparisons for a seek.
//
// T requirements
//
// Must be compatible with Standalone<T> and must implement the following additional things:
//
// // Return the common prefix length between *this and T
// // skipLen is a hint, representing the length that is already known to be common.
// int getCommonPrefixLen(const T& other, int skipLen) const;
//
// // Compare *this to rhs, returns < 0 for less than, 0 for equal, > 0 for greater than
// // skipLen is a hint, representing the length that is already known to be common.
// int compare(const T &rhs, int skipLen) const;
//
// // Writes to d a delta which can create *this from base
// // commonPrefix is a hint, representing the length that is already known to be common.
// // DeltaT's size need not be static, for more details see below.
// void writeDelta(DeltaT &d, const T &base, int commonPrefix) const;
//
// // Returns the size in bytes of the DeltaT required to recreate *this from base
// int deltaSize(const T &base) const;
//
// // A type which represents the parts of T that either borrowed from the base T
// // or can be borrowed by other T's using the first T as a base
// // Partials must allocate any heap storage in the provided Arena for any operation.
// typedef Partial;
//
// // Update cache with the Partial for *this, storing any heap memory for the Partial in arena
// void updateCache(Optional<Partial> cache, Arena& arena) const;
//
// // For debugging, return a useful human-readable string representation of *this
// std::string toString() const;
//
// DeltaT requirements
//
// DeltaT can be variable sized, larger than sizeof(DeltaT), and implement the following:
//
// // Returns the size in bytes of this specific DeltaT instance
// int size();
//
// // Apply *this to base and return the resulting T
// // Store the Partial for T into cache, allocating any heap memory for the Partial in arena
// T apply(Arena& arena, const T& base, Optional<T::Partial>& cache);
//
// // Recreate T from *this and the Partial for T
// T apply(const T::Partial& cache);
//
// // Set or retrieve a boolean flag representing which base ancestor the DeltaT is to be applied to
// void setPrefixSource(bool val);
// bool getPrefixSource() const;
//
// // Set of retrieve a boolean flag representing that a DeltaTree node has been erased
// void setDeleted(bool val);
// bool getDeleted() const;
//
// // For debugging, return a useful human-readable string representation of *this
// std::string toString() const;
//
#pragma pack(push, 1)
template <typename T, typename DeltaT = typename T::Delta>
struct DeltaTree2 {
typedef typename T::Partial Partial;
struct {
uint16_t numItems; // Number of items in the tree.
uint32_t nodeBytesUsed; // Bytes used by nodes (everything after the tree header)
uint32_t nodeBytesFree; // Bytes left at end of tree to expand into
uint32_t nodeBytesDeleted; // Delta bytes deleted from tree. Note that some of these bytes could be borrowed by
// descendents.
uint8_t initialHeight; // Height of tree as originally built
uint8_t maxHeight; // Maximum height of tree after any insertion. Value of 0 means no insertions done.
bool largeNodes; // Node size, can be calculated as capacity > SmallSizeLimit but it will be used a lot
};
// Node is not fixed size. Most node methods require the context of whether the node is in small or large
// offset mode, passed as a boolean
struct Node {
// Offsets are relative to the start of the DeltaTree
union {
struct {
uint32_t leftChild;
uint32_t rightChild;
} largeOffsets;
struct {
uint16_t leftChild;
uint16_t rightChild;
} smallOffsets;
};
static int headerSize(bool large) { return large ? sizeof(largeOffsets) : sizeof(smallOffsets); }
// Delta is located after the offsets, which differs by node size
DeltaT& delta(bool large) { return large ? *(DeltaT*)(&largeOffsets + 1) : *(DeltaT*)(&smallOffsets + 1); };
// Delta is located after the offsets, which differs by node size
const DeltaT& delta(bool large) const {
return large ? *(DeltaT*)(&largeOffsets + 1) : *(DeltaT*)(&smallOffsets + 1);
};
std::string toString(DeltaTree2* tree) const {
return format("Node{offset=%d leftChild=%d rightChild=%d delta=%s}",
tree->nodeOffset(this),
getLeftChildOffset(tree->largeNodes),
getRightChildOffset(tree->largeNodes),
delta(tree->largeNodes).toString().c_str());
}
#define getMember(m) (large ? largeOffsets.m : smallOffsets.m)
#define setMember(m, v) \
if (large) { \
largeOffsets.m = v; \
} else { \
smallOffsets.m = v; \
}
void setRightChildOffset(bool large, int offset) { setMember(rightChild, offset); }
void setLeftChildOffset(bool large, int offset) { setMember(leftChild, offset); }
int getRightChildOffset(bool large) const { return getMember(rightChild); }
int getLeftChildOffset(bool large) const { return getMember(leftChild); }
int size(bool large) const { return delta(large).size() + headerSize(large); }
#undef getMember
#undef setMember
};
static constexpr int SmallSizeLimit = std::numeric_limits<uint16_t>::max();
static constexpr int LargeTreePerNodeExtraOverhead = sizeof(Node::largeOffsets) - sizeof(Node::smallOffsets);
int nodeOffset(const Node* n) const { return (uint8_t*)n - (uint8_t*)this; }
Node* nodeAt(int offset) { return offset == 0 ? nullptr : (Node*)((uint8_t*)this + offset); }
Node* root() { return numItems == 0 ? nullptr : (Node*)(this + 1); }
int rootOffset() { return sizeof(DeltaTree2); }
int size() const { return sizeof(DeltaTree2) + nodeBytesUsed; }
int capacity() const { return size() + nodeBytesFree; }
public:
// DecodedNode represents a Node of a DeltaTree and its T::Partial.
// DecodedNodes are created on-demand, as DeltaTree Nodes are visited by a Cursor.
// DecodedNodes link together to form a binary tree with the same Node relationships as their
// corresponding DeltaTree Nodes. Additionally, DecodedNodes store links to their left and
// right ancestors which correspond to possible base Nodes on which the Node's Delta is based.
//
// DecodedNode links are not pointers, but rather indices to be looked up in the DecodeCache
// defined below. An index value of -1 is uninitialized, meaning it is not yet known whether
// the corresponding DeltaTree Node link is non-null in any version of the DeltaTree which is
// using or has used the DecodeCache.
struct DecodedNode {
DecodedNode(int nodeOffset, int leftParentIndex, int rightParentIndex)
: nodeOffset(nodeOffset), leftParentIndex(leftParentIndex), rightParentIndex(rightParentIndex),
leftChildIndex(-1), rightChildIndex(-1) {}
int nodeOffset;
int16_t leftParentIndex;
int16_t rightParentIndex;
int16_t leftChildIndex;
int16_t rightChildIndex;
Optional<Partial> partial;
Node* node(DeltaTree2* tree) const { return tree->nodeAt(nodeOffset); }
std::string toString() {
return format("DecodedNode{nodeOffset=%d leftChildIndex=%d rightChildIndex=%d leftParentIndex=%d "
"rightParentIndex=%d}",
(int)nodeOffset,
(int)leftChildIndex,
(int)rightChildIndex,
(int)leftParentIndex,
(int)rightParentIndex);
}
};
#pragma pack(pop)
// The DecodeCache is a reference counted structure that stores DecodedNodes by an integer index
// and can be shared across a series of updated copies of a DeltaTree.
//
// DecodedNodes are stored in a contiguous vector, which sometimes must be expanded, so care
// must be taken to resolve DecodedNode pointers again after the DecodeCache has new entries added.
struct DecodeCache : FastAllocated<DecodeCache>, ReferenceCounted<DecodeCache> {
DecodeCache(const T& lowerBound = T(), const T& upperBound = T())
: lowerBound(arena, lowerBound), upperBound(arena, upperBound) {
decodedNodes.reserve(10);
deltatree_printf("DecodedNode size: %d\n", sizeof(DecodedNode));
}
Arena arena;
T lowerBound;
T upperBound;
// Index 0 is always the root
std::vector<DecodedNode> decodedNodes;
DecodedNode& get(int index) { return decodedNodes[index]; }
template <class... Args>
int emplace_new(Args&&... args) {
int index = decodedNodes.size();
decodedNodes.emplace_back(args...);
return index;
}
bool empty() const { return decodedNodes.empty(); }
void clear() {
decodedNodes.clear();
Arena a;
lowerBound = T(a, lowerBound);
upperBound = T(a, upperBound);
arena = a;
}
};
// Cursor provides a way to seek into a DeltaTree and iterate over its contents
// The cursor needs a DeltaTree pointer and a DecodeCache, which can be shared
// with other DeltaTrees which were incrementally modified to produce the the
// tree that this cursor is referencing.
struct Cursor {
Cursor() : cache(nullptr), nodeIndex(-1) {}
Cursor(DecodeCache* cache, DeltaTree2* tree) : cache(cache), tree(tree), nodeIndex(-1) {}
Cursor(DecodeCache* cache, DeltaTree2* tree, int nodeIndex) : cache(cache), tree(tree), nodeIndex(nodeIndex) {}
// Copy constructor does not copy item because normally a copied cursor will be immediately moved.
Cursor(const Cursor& c) : cache(c.cache), tree(c.tree), nodeIndex(c.nodeIndex) {}
Cursor next() const {
Cursor c = *this;
c.moveNext();
return c;
}
Cursor previous() const {
Cursor c = *this;
c.movePrev();
return c;
}
int rootIndex() {
if (!cache->empty()) {
return 0;
} else if (tree->numItems != 0) {
return cache->emplace_new(tree->rootOffset(), -1, -1);
}
return -1;
}
DeltaTree2* tree;
DecodeCache* cache;
int nodeIndex;
mutable Optional<T> item;
Node* node() const { return tree->nodeAt(cache->get(nodeIndex).nodeOffset); }
std::string toString() const {
if (nodeIndex == -1) {
return format("Cursor{nodeIndex=-1}");
}
return format("Cursor{item=%s indexItem=%s nodeIndex=%d decodedNode=%s node=%s ",
item.present() ? item.get().toString().c_str() : "<absent>",
get(cache->get(nodeIndex)).toString().c_str(),
nodeIndex,
cache->get(nodeIndex).toString().c_str(),
node()->toString(tree).c_str());
}
bool valid() const { return nodeIndex != -1; }
// Get T for Node n, and provide to n's delta the base and local decode cache entries to use/modify
const T get(DecodedNode& decoded) const {
DeltaT& delta = decoded.node(tree)->delta(tree->largeNodes);
// If this node's cached partial is populated, then the delta can create T from that alone
if (decoded.partial.present()) {
return delta.apply(decoded.partial.get());
}
// Otherwise, get the base T
bool basePrev = delta.getPrefixSource();
int baseIndex = basePrev ? decoded.leftParentIndex : decoded.rightParentIndex;
// If baseOffset is -1, then base T is DecodeCache's lower or upper bound
if (baseIndex == -1) {
return delta.apply(cache->arena, basePrev ? cache->lowerBound : cache->upperBound, decoded.partial);
}
// Otherwise, get the base's decoded node
DecodedNode& baseDecoded = cache->get(baseIndex);
// If the base's partial is present, apply delta to it to get result
if (baseDecoded.partial.present()) {
return delta.apply(cache->arena, baseDecoded.partial.get(), decoded.partial);
}
// Otherwise apply delta to base T
return delta.apply(cache->arena, get(baseDecoded), decoded.partial);
}
public:
// Get the item at the cursor
// Behavior is undefined if the cursor is not valid.
// If the cursor is moved, the reference object returned will be modified to
// the cursor's new current item.
const T& get() const {
if (!item.present()) {
item = get(cache->get(nodeIndex));
}
return item.get();
}
void switchTree(DeltaTree2* newTree) { tree = newTree; }
// If the cursor is valid, return a reference to the cursor's internal T.
// Otherwise, returns a reference to the cache's upper boundary.
const T& getOrUpperBound() const { return valid() ? get() : cache->upperBound; }
bool operator==(const Cursor& rhs) const { return nodeIndex == rhs.nodeIndex; }
bool operator!=(const Cursor& rhs) const { return nodeIndex != rhs.nodeIndex; }
// The seek methods, of the form seek[Less|Greater][orEqual](...) are very similar.
// They attempt move the cursor to the [Greatest|Least] item, based on the name of the function.
// Then will not "see" erased records.
// If successful, they return true, and if not then false a while making the cursor invalid.
// These methods forward arguments to the seek() overloads, see those for argument descriptions.
template <typename... Args>
bool seekLessThan(Args... args) {
int cmp = seek(args...);
if (cmp < 0 || (cmp == 0 && nodeIndex != -1)) {
movePrev();
}
return _hideDeletedBackward();
}
template <typename... Args>
bool seekLessThanOrEqual(Args... args) {
int cmp = seek(args...);
if (cmp < 0) {
movePrev();
}
return _hideDeletedBackward();
}
template <typename... Args>
bool seekGreaterThan(Args... args) {
int cmp = seek(args...);
if (cmp > 0 || (cmp == 0 && nodeIndex != -1)) {
moveNext();
}
return _hideDeletedForward();
}
template <typename... Args>
bool seekGreaterThanOrEqual(Args... args) {
int cmp = seek(args...);
if (cmp > 0) {
moveNext();
}
return _hideDeletedForward();
}
// Get the right child index for parentIndex
int getRightChildIndex(int parentIndex) {
DecodedNode* parent = &cache->get(parentIndex);
// The cache may have a child index, but since cache covers multiple versions of a DeltaTree
// it can't be used unless the node in the tree has a child.
int childOffset = parent->node(tree)->getRightChildOffset(tree->largeNodes);
if (childOffset == 0) {
return -1;
}
// parent has this child so return the index if it is in DecodedNode
if (parent->rightChildIndex != -1) {
return parent->rightChildIndex;
}
// Create the child's DecodedNode and get its index
int childIndex = cache->emplace_new(childOffset, parentIndex, parent->rightParentIndex);
// Set the index in the parent. The cache lookup is repeated because the cache has changed.
cache->get(parentIndex).rightChildIndex = childIndex;
return childIndex;
}
// Get the left child index for parentIndex
int getLeftChildIndex(int parentIndex) {
DecodedNode* parent = &cache->get(parentIndex);
// The cache may have a child index, but since cache covers multiple versions of a DeltaTree
// it can't be used unless the node in the tree has a child.
int childOffset = parent->node(tree)->getLeftChildOffset(tree->largeNodes);
if (childOffset == 0) {
return -1;
}
// parent has this child so return the index if it is in DecodedNode
if (parent->leftChildIndex != -1) {
return parent->leftChildIndex;
}
// Create the child's DecodedNode and get its index
int childIndex = cache->emplace_new(childOffset, parent->leftParentIndex, parentIndex);
// Set the index in the parent. The cache lookup is repeated because the cache has changed.
cache->get(parentIndex).leftChildIndex = childIndex;
return childIndex;
}
// seek() moves the cursor to a node containing s or the node that would be the parent of s if s were to be
// added to the tree. If the tree was empty, the cursor will be invalid and the return value will be 0.
// Otherwise, returns the result of s.compare(item at cursor position)
// Does not skip/avoid deleted nodes.
int seek(const T& s, int skipLen = 0) {
nodeIndex = -1;
item.reset();
deltatree_printf("seek(%s) start %s\n", s.toString().c_str(), toString().c_str());
int nIndex = rootIndex();
int cmp = 0;
while (nIndex != -1) {
nodeIndex = nIndex;
item.reset();
cmp = s.compare(get(), skipLen);
deltatree_printf("seek(%s) loop cmp=%d %s\n", s.toString().c_str(), cmp, toString().c_str());
if (cmp == 0) {
break;
}
if (cmp > 0) {
nIndex = getRightChildIndex(nIndex);
} else {
nIndex = getLeftChildIndex(nIndex);
}
}
return cmp;
}
bool moveFirst() {
nodeIndex = -1;
item.reset();
int nIndex = rootIndex();
deltatree_printf("moveFirst start %s\n", toString().c_str());
while (nIndex != -1) {
nodeIndex = nIndex;
deltatree_printf("moveFirst moved %s\n", toString().c_str());
nIndex = getLeftChildIndex(nIndex);
}
return _hideDeletedForward();
}
bool moveLast() {
nodeIndex = -1;
item.reset();
int nIndex = rootIndex();
deltatree_printf("moveLast start %s\n", toString().c_str());
while (nIndex != -1) {
nodeIndex = nIndex;
deltatree_printf("moveLast moved %s\n", toString().c_str());
nIndex = getRightChildIndex(nIndex);
}
return _hideDeletedBackward();
}
// Try to move to next node, sees deleted nodes.
void _moveNext() {
deltatree_printf("_moveNext start %s\n", toString().c_str());
item.reset();
// Try to go right
int nIndex = getRightChildIndex(nodeIndex);
// If we couldn't go right, then the answer is our next ancestor
if (nIndex == -1) {
nodeIndex = cache->get(nodeIndex).rightParentIndex;
deltatree_printf("_moveNext move1 %s\n", toString().c_str());
} else {
// Go left as far as possible
do {
nodeIndex = nIndex;
deltatree_printf("_moveNext move2 %s\n", toString().c_str());
nIndex = getLeftChildIndex(nodeIndex);
} while (nIndex != -1);
}
}
// Try to move to previous node, sees deleted nodes.
void _movePrev() {
deltatree_printf("_movePrev start %s\n", toString().c_str());
item.reset();
// Try to go left
int nIndex = getLeftChildIndex(nodeIndex);
// If we couldn't go left, then the answer is our prev ancestor
if (nIndex == -1) {
nodeIndex = cache->get(nodeIndex).leftParentIndex;
deltatree_printf("_movePrev move1 %s\n", toString().c_str());
} else {
// Go right as far as possible
do {
nodeIndex = nIndex;
deltatree_printf("_movePrev move2 %s\n", toString().c_str());
nIndex = getRightChildIndex(nodeIndex);
} while (nIndex != -1);
}
}
bool moveNext() {
_moveNext();
return _hideDeletedForward();
}
bool movePrev() {
_movePrev();
return _hideDeletedBackward();
}
DeltaT& getDelta() const { return cache->get(nodeIndex).node(tree)->delta(tree->largeNodes); }
bool isErased() const { return getDelta().getDeleted(); }
// Erase current item by setting its deleted flag to true.
// Tree header is updated if a change is made.
// Cursor is then moved forward to the next non-deleted node.
void erase() {
auto& delta = getDelta();
if (!delta.getDeleted()) {
delta.setDeleted(true);
--tree->numItems;
tree->nodeBytesDeleted += (delta.size() + Node::headerSize(tree->largeNodes));
}
moveNext();
}
// Erase k by setting its deleted flag to true. Returns true only if k existed
bool erase(const T& k, int skipLen = 0) {
Cursor c(cache, tree);
if (c.seek(k, skipLen) == 0 && !c.isErased()) {
c.erase();
return true;
}
return false;
}
// Try to insert k into the DeltaTree, updating byte counts and initialHeight if they
// have changed (they won't if k already exists in the tree but was deleted).
// Returns true if successful, false if k does not fit in the space available
// or if k is already in the tree (and was not already deleted).
// Insertion on an empty tree returns false as well.
// Insert does NOT change the cursor position.
bool insert(const T& k, int skipLen = 0, int maxHeightAllowed = std::numeric_limits<int>::max()) {
deltatree_printf("insert %s\n", k.toString().c_str());
int nIndex = rootIndex();
int parentIndex = nIndex;
DecodedNode* parentDecoded;
// Result of comparing node at parentIndex
int cmp = 0;
// Height of the inserted node
int height = 0;
// Find the parent to add the node to
// This is just seek but modifies parentIndex instead of nodeIndex and tracks the insertion height
deltatree_printf(
"insert(%s) start %s\n", k.toString().c_str(), Cursor(cache, tree, parentIndex).toString().c_str());
while (nIndex != -1) {
++height;
parentIndex = nIndex;
parentDecoded = &cache->get(parentIndex);
cmp = k.compare(get(*parentDecoded), skipLen);
deltatree_printf("insert(%s) moved cmp=%d %s\n",
k.toString().c_str(),
cmp,
Cursor(cache, tree, parentIndex).toString().c_str());
if (cmp == 0) {
break;
}
if (cmp > 0) {
deltatree_printf("insert(%s) move right\n", k.toString().c_str());
nIndex = getRightChildIndex(nIndex);
} else {
deltatree_printf("insert(%s) move left\n", k.toString().c_str());
nIndex = getLeftChildIndex(nIndex);
}
}
// If the item is found, mark it erased if it isn't already
if (cmp == 0) {
DeltaT& delta = tree->nodeAt(parentDecoded->nodeOffset)->delta(tree->largeNodes);
if (delta.getDeleted()) {
delta.setDeleted(false);
++tree->numItems;
tree->nodeBytesDeleted -= (delta.size() + Node::headerSize(tree->largeNodes));
deltatree_printf("insert(%s) deleted item restored %s\n",
k.toString().c_str(),
Cursor(cache, tree, parentIndex).toString().c_str());
return true;
}
deltatree_printf("insert(%s) item exists %s\n",
k.toString().c_str(),
Cursor(cache, tree, parentIndex).toString().c_str());
return false;
}
// If the tree was empty or the max insertion height is exceeded then fail
if (parentIndex == -1 || height > maxHeightAllowed) {
return false;
}
// Find the base base to borrow from, see if the resulting delta fits into the tree
int leftBaseIndex, rightBaseIndex;
bool addingRight = cmp > 0;
if (addingRight) {
leftBaseIndex = parentIndex;
rightBaseIndex = parentDecoded->rightParentIndex;
} else {
leftBaseIndex = parentDecoded->leftParentIndex;
rightBaseIndex = parentIndex;
}
T leftBase = leftBaseIndex == -1 ? cache->lowerBound : get(cache->get(leftBaseIndex));
T rightBase = rightBaseIndex == -1 ? cache->upperBound : get(cache->get(rightBaseIndex));
int common = leftBase.getCommonPrefixLen(rightBase, skipLen);
int commonWithLeftParent = k.getCommonPrefixLen(leftBase, common);
int commonWithRightParent = k.getCommonPrefixLen(rightBase, common);
bool borrowFromLeft = commonWithLeftParent >= commonWithRightParent;
const T* base;
int commonPrefix;
if (borrowFromLeft) {
base = &leftBase;
commonPrefix = commonWithLeftParent;
} else {
base = &rightBase;
commonPrefix = commonWithRightParent;
}
int deltaSize = k.deltaSize(*base, commonPrefix, false);
int nodeSpace = deltaSize + Node::headerSize(tree->largeNodes);
if (nodeSpace > tree->nodeBytesFree) {
return false;
}
int childOffset = tree->size();
Node* childNode = tree->nodeAt(childOffset);
childNode->setLeftChildOffset(tree->largeNodes, 0);
childNode->setRightChildOffset(tree->largeNodes, 0);
// Create the decoded node and link it to the parent
// Link the parent's decodednode to the child's decodednode
// Link the parent node in the tree to the new child node
// true if node is being added to right child
int childIndex = cache->emplace_new(childOffset, leftBaseIndex, rightBaseIndex);
// Get a new parentDecoded pointer as the cache may have changed allocations
parentDecoded = &cache->get(parentIndex);
if (addingRight) {
// Adding child to right of parent
parentDecoded->rightChildIndex = childIndex;
parentDecoded->node(tree)->setRightChildOffset(tree->largeNodes, childOffset);
} else {
// Adding child to left of parent
parentDecoded->leftChildIndex = childIndex;
parentDecoded->node(tree)->setLeftChildOffset(tree->largeNodes, childOffset);
}
// Give k opportunity to populate its cache partial record
k.updateCache(cache->get(childIndex).partial, cache->arena);
DeltaT& childDelta = childNode->delta(tree->largeNodes);
deltatree_printf("insert(%s) writing delta from %s\n", k.toString().c_str(), base->toString().c_str());
int written = k.writeDelta(childDelta, *base, commonPrefix);
ASSERT(deltaSize == written);
childDelta.setPrefixSource(borrowFromLeft);
tree->nodeBytesUsed += nodeSpace;
tree->nodeBytesFree -= nodeSpace;
++tree->numItems;
// Update max height of the tree if necessary
if (height > tree->maxHeight) {
tree->maxHeight = height;
}
deltatree_printf("insert(%s) done parent=%s\n",
k.toString().c_str(),
Cursor(cache, tree, parentIndex).toString().c_str());
deltatree_printf("insert(%s) done child=%s\n",
k.toString().c_str(),
Cursor(cache, tree, childIndex).toString().c_str());
return true;
}
private:
bool _hideDeletedBackward() {
while (nodeIndex != -1 && getDelta().getDeleted()) {
_movePrev();
}
return nodeIndex != -1;
}
bool _hideDeletedForward() {
while (nodeIndex != -1 && getDelta().getDeleted()) {
_moveNext();
}
return nodeIndex != -1;
}
};
// Returns number of bytes written
int build(int spaceAvailable, const T* begin, const T* end, const T* lowerBound, const T* upperBound) {
largeNodes = spaceAvailable > SmallSizeLimit;
int count = end - begin;
numItems = count;
nodeBytesDeleted = 0;
initialHeight = (uint8_t)log2(count) + 1;
maxHeight = 0;
// The boundary leading to the new page acts as the last time we branched right
if (count > 0) {
nodeBytesUsed = buildSubtree(
*root(), begin, end, lowerBound, upperBound, lowerBound->getCommonPrefixLen(*upperBound, 0));
} else {
nodeBytesUsed = 0;
}
nodeBytesFree = spaceAvailable - size();
return size();
}
private:
int buildSubtree(Node& node,
const T* begin,
const T* end,
const T* leftParent,
const T* rightParent,
int subtreeCommon) {
int count = end - begin;
// Find key to be stored in root
int mid = perfectSubtreeSplitPointCached(count);
const T& item = begin[mid];
int commonWithPrev = item.getCommonPrefixLen(*leftParent, subtreeCommon);
int commonWithNext = item.getCommonPrefixLen(*rightParent, subtreeCommon);
bool prefixSourcePrev;
int commonPrefix;
const T* base;
if (commonWithPrev >= commonWithNext) {
prefixSourcePrev = true;
commonPrefix = commonWithPrev;
base = leftParent;
} else {
prefixSourcePrev = false;
commonPrefix = commonWithNext;
base = rightParent;
}
int deltaSize = item.writeDelta(node.delta(largeNodes), *base, commonPrefix);
node.delta(largeNodes).setPrefixSource(prefixSourcePrev);
// Continue writing after the serialized Delta.
uint8_t* wptr = (uint8_t*)&node.delta(largeNodes) + deltaSize;
int leftChildOffset;
// Serialize left subtree
if (count > 1) {
leftChildOffset = wptr - (uint8_t*)this;
deltatree_printf("%p: offset=%d count=%d serialize left subtree leftChildOffset=%d\n",
this,
nodeOffset(&node),
count,
leftChildOffset);
wptr += buildSubtree(*(Node*)wptr, begin, begin + mid, leftParent, &item, commonWithPrev);
} else {
leftChildOffset = 0;
}
int rightChildOffset;
// Serialize right subtree
if (count > 2) {
rightChildOffset = wptr - (uint8_t*)this;
deltatree_printf("%p: offset=%d count=%d serialize right subtree rightChildOffset=%d\n",
this,
nodeOffset(&node),
count,
rightChildOffset);
wptr += buildSubtree(*(Node*)wptr, begin + mid + 1, end, &item, rightParent, commonWithNext);
} else {
rightChildOffset = 0;
}
node.setLeftChildOffset(largeNodes, leftChildOffset);
node.setRightChildOffset(largeNodes, rightChildOffset);
deltatree_printf("%p: Serialized %s as %s\n", this, item.toString().c_str(), node.toString(this).c_str());
return wptr - (uint8_t*)&node;
}
};

View File

@ -189,8 +189,8 @@ struct GrvTransactionRateInfo {
void disable() {
disabled = true;
rate = 0;
smoothRate.reset(0);
// Use smoothRate.setTotal(0) instead of setting rate to 0 so txns will not be throttled immediately.
smoothRate.setTotal(0);
}
void setRate(double rate) {
@ -389,13 +389,15 @@ ACTOR Future<Void> queueGetReadVersionRequests(Reference<AsyncVar<ServerDBInfo>>
TaskPriority::ProxyGRVTimer));
}
++stats->txnRequestIn;
stats->txnStartIn += req.transactionCount;
if (req.priority >= TransactionPriority::IMMEDIATE) {
++stats->txnRequestIn;
stats->txnStartIn += req.transactionCount;
stats->txnSystemPriorityStartIn += req.transactionCount;
systemQueue->push_back(req);
systemQueue->span.addParent(req.spanContext);
} else if (req.priority >= TransactionPriority::DEFAULT) {
++stats->txnRequestIn;
stats->txnStartIn += req.transactionCount;
stats->txnDefaultPriorityStartIn += req.transactionCount;
defaultQueue->push_back(req);
defaultQueue->span.addParent(req.spanContext);
@ -405,12 +407,13 @@ ACTOR Future<Void> queueGetReadVersionRequests(Reference<AsyncVar<ServerDBInfo>>
if (batchRateInfo->rate <= (1.0 / proxiesCount)) {
req.reply.sendError(batch_transaction_throttled());
stats->txnThrottled += req.transactionCount;
continue;
} else {
++stats->txnRequestIn;
stats->txnStartIn += req.transactionCount;
stats->txnBatchPriorityStartIn += req.transactionCount;
batchQueue->push_back(req);
batchQueue->span.addParent(req.spanContext);
}
stats->txnBatchPriorityStartIn += req.transactionCount;
batchQueue->push_back(req);
batchQueue->span.addParent(req.spanContext);
}
}
}

View File

@ -37,6 +37,9 @@ typedef uint32_t LogicalPageID;
typedef uint32_t PhysicalPageID;
#define invalidLogicalPageID std::numeric_limits<LogicalPageID>::max()
typedef uint32_t QueueID;
#define invalidQueueID std::numeric_limits<QueueID>::max()
// Represents a block of memory in a 4096-byte aligned location held by an Arena.
class ArenaPage : public ReferenceCounted<ArenaPage>, public FastAllocated<ArenaPage> {
public:
@ -125,10 +128,7 @@ public:
class IPagerSnapshot {
public:
virtual Future<Reference<const ArenaPage>> getPhysicalPage(LogicalPageID pageID,
bool cacheable,
bool nohit,
bool* fromCache = nullptr) = 0;
virtual Future<Reference<const ArenaPage>> getPhysicalPage(LogicalPageID pageID, bool cacheable, bool nohit) = 0;
virtual bool tryEvictPage(LogicalPageID id) = 0;
virtual Version getVersion() const = 0;
@ -150,11 +150,17 @@ public:
// For a given pager instance, separate calls to this function must return the same value.
// Only valid to call after recovery is complete.
virtual int getUsablePageSize() const = 0;
virtual int getPhysicalPageSize() const = 0;
virtual int getLogicalPageSize() const = 0;
virtual int getPagesPerExtent() const = 0;
// Allocate a new page ID for a subsequent write. The page will be considered in-use after the next commit
// regardless of whether or not it was written to.
virtual Future<LogicalPageID> newPageID() = 0;
virtual Future<LogicalPageID> newExtentPageID(QueueID queueID) = 0;
virtual QueueID newLastQueueID() = 0;
// Replace the contents of a page with new data across *all* versions.
// Existing holders of a page reference for pageID, read from any version,
// may see the effects of this write.
@ -169,6 +175,8 @@ public:
// Free pageID to be used again after the commit that moves oldestVersion past v
virtual void freePage(LogicalPageID pageID, Version v) = 0;
virtual void freeExtent(LogicalPageID pageID) = 0;
// If id is remapped, delete the original as of version v and return the page it was remapped to. The caller
// is then responsible for referencing and deleting the returned page ID.
virtual LogicalPageID detachRemappedPage(LogicalPageID id, Version v) = 0;
@ -180,10 +188,16 @@ public:
// Cacheable indicates that the page should be added to the page cache (if applicable?) as a result of this read.
// NoHit indicates that the read should not be considered a cache hit, such as when preloading pages that are
// considered likely to be needed soon.
virtual Future<Reference<ArenaPage>> readPage(LogicalPageID pageID,
bool cacheable = true,
bool noHit = false,
bool* fromCache = nullptr) = 0;
virtual Future<Reference<ArenaPage>> readPage(LogicalPageID pageID, bool cacheable = true, bool noHit = false) = 0;
virtual Future<Reference<ArenaPage>> readExtent(LogicalPageID pageID) = 0;
virtual void releaseExtentReadLock() = 0;
// Temporary methods for testing
virtual Future<Standalone<VectorRef<LogicalPageID>>> getUsedExtents(QueueID queueID) = 0;
virtual void pushExtentUsedList(QueueID queueID, LogicalPageID extID) = 0;
virtual void extentCacheClear() = 0;
virtual int64_t getPageCacheCount() = 0;
virtual int64_t getExtentCacheCount() = 0;
// Get a snapshot of the metakey and all pages as of the version v which must be >= getOldestVersion()
// Note that snapshots at any version may still see the results of updatePage() calls.
@ -204,6 +218,8 @@ public:
virtual StorageBytes getStorageBytes() const = 0;
virtual int64_t getPageCount() = 0;
// Count of pages in use by the pager client (including retained old page versions)
virtual Future<int64_t> getUserPageCount() = 0;

View File

@ -1,80 +0,0 @@
/*
* IVersionedStore.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FDBSERVER_IVERSIONEDSTORE_H
#define FDBSERVER_IVERSIONEDSTORE_H
#pragma once
#include "fdbserver/IKeyValueStore.h"
#include "flow/flow.h"
#include "fdbclient/FDBTypes.h"
class IStoreCursor {
public:
virtual Future<Void> findEqual(KeyRef key) = 0;
virtual Future<Void> findFirstEqualOrGreater(KeyRef key, int prefetchBytes = 0) = 0;
virtual Future<Void> findLastLessOrEqual(KeyRef key, int prefetchBytes = 0) = 0;
virtual Future<Void> next() = 0;
virtual Future<Void> prev() = 0;
virtual bool isValid() = 0;
virtual KeyRef getKey() = 0;
virtual ValueRef getValue() = 0;
virtual void addref() = 0;
virtual void delref() = 0;
};
class IVersionedStore : public IClosable {
public:
virtual KeyValueStoreType getType() const = 0;
virtual bool supportsMutation(int op) const = 0; // If this returns true, then mutate(op, ...) may be called
virtual StorageBytes getStorageBytes() const = 0;
// Writes are provided in an ordered stream.
// A write is considered part of (a change leading to) the version determined by the previous call to
// setWriteVersion() A write shall not become durable until the following call to commit() begins, and shall be
// durable once the following call to commit() returns
virtual void set(KeyValueRef keyValue) = 0;
virtual void clear(KeyRangeRef range) = 0;
virtual void mutate(int op, StringRef param1, StringRef param2) = 0;
virtual void setWriteVersion(Version) = 0; // The write version must be nondecreasing
virtual void setOldestVersion(Version v) = 0; // Set oldest readable version to be used in next commit
virtual Version getOldestVersion() const = 0; // Get oldest readable version
virtual Future<Void> commit() = 0;
virtual Future<Void> init() = 0;
virtual Version getLatestVersion() const = 0;
// readAtVersion() may only be called on a version which has previously been passed to setWriteVersion() and never
// previously passed
// to forgetVersion. The returned results when violating this precondition are unspecified; the store is not
// required to be able to detect violations.
// The returned read cursor provides a consistent snapshot of the versioned store, corresponding to all the writes
// done with write versions less
// than or equal to the given version.
// If readAtVersion() is called on the *current* write version, the given read cursor MAY reflect subsequent writes
// at the same
// write version, OR it may represent a snapshot as of the call to readAtVersion().
virtual Reference<IStoreCursor> readAtVersion(Version) = 0;
};
#endif

View File

@ -455,7 +455,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
StorageBytes getStorageBytes() const override {
uint64_t live = 0;
ASSERT(db->GetIntProperty(rocksdb::DB::Properties::kEstimateLiveDataSize, &live));
ASSERT(db->GetIntProperty(rocksdb::DB::Properties::kLiveSstFilesSize, &live));
int64_t free;
int64_t total;

View File

@ -97,7 +97,9 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( PEEK_STATS_INTERVAL, 10.0 );
init( PEEK_STATS_SLOW_AMOUNT, 2 );
init( PEEK_STATS_SLOW_RATIO, 0.5 );
init( PUSH_RESET_INTERVAL, 300.0 ); if ( randomize && BUGGIFY ) PUSH_RESET_INTERVAL = 20.0;
// Buggified value must be larger than the amount of simulated time taken by snapshots, to prevent repeatedly failing
// snapshots due to closed commit proxy connections
init( PUSH_RESET_INTERVAL, 300.0 ); if ( randomize && BUGGIFY ) PUSH_RESET_INTERVAL = 40.0;
init( PUSH_MAX_LATENCY, 0.5 ); if ( randomize && BUGGIFY ) PUSH_MAX_LATENCY = 0.0;
init( PUSH_STATS_INTERVAL, 10.0 );
init( PUSH_STATS_SLOW_AMOUNT, 2 );
@ -266,10 +268,6 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( DD_REMOVE_STORE_ENGINE_DELAY, 60.0 ); if( randomize && BUGGIFY ) DD_REMOVE_STORE_ENGINE_DELAY = deterministicRandom()->random01() * 60.0;
// Redwood Storage Engine
init( PREFIX_TREE_IMMEDIATE_KEY_SIZE_LIMIT, 30 );
init( PREFIX_TREE_IMMEDIATE_KEY_SIZE_MIN, 0 );
// KeyValueStore SQLITE
init( CLEAR_BUFFER_SIZE, 20000 );
init( READ_VALUE_TIME_ESTIMATE, .00005 );
@ -710,8 +708,10 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( FASTRESTORE_RATE_UPDATE_SECONDS, 1.0 ); if( randomize && BUGGIFY ) { FASTRESTORE_RATE_UPDATE_SECONDS = deterministicRandom()->random01() < 0.5 ? 0.1 : 2;}
init( REDWOOD_DEFAULT_PAGE_SIZE, 4096 );
init( REDWOOD_DEFAULT_EXTENT_SIZE, 32 * 1024 * 1024 );
init( REDWOOD_DEFAULT_EXTENT_READ_SIZE, 1024 * 1024 );
init( REDWOOD_EXTENT_CONCURRENT_READS, 4 );
init( REDWOOD_KVSTORE_CONCURRENT_READS, 64 );
init( REDWOOD_COMMIT_CONCURRENT_READS, 64 );
init( REDWOOD_PAGE_REBUILD_MAX_SLACK, 0.33 );
init( REDWOOD_LAZY_CLEAR_BATCH_SIZE_PAGES, 10 );
init( REDWOOD_LAZY_CLEAR_MIN_PAGES, 0 );

View File

@ -222,10 +222,6 @@ public:
double DD_FAILURE_TIME;
double DD_ZERO_HEALTHY_TEAM_DELAY;
// Redwood Storage Engine
int PREFIX_TREE_IMMEDIATE_KEY_SIZE_LIMIT;
int PREFIX_TREE_IMMEDIATE_KEY_SIZE_MIN;
// KeyValueStore SQLITE
int CLEAR_BUFFER_SIZE;
double READ_VALUE_TIME_ESTIMATE;
@ -644,8 +640,10 @@ public:
double FASTRESTORE_RATE_UPDATE_SECONDS; // how long to update appliers target write rate
int REDWOOD_DEFAULT_PAGE_SIZE; // Page size for new Redwood files
int REDWOOD_DEFAULT_EXTENT_SIZE; // Extent size for new Redwood files
int REDWOOD_DEFAULT_EXTENT_READ_SIZE; // Extent read size for Redwood files
int REDWOOD_EXTENT_CONCURRENT_READS; // Max number of simultaneous extent disk reads in progress.
int REDWOOD_KVSTORE_CONCURRENT_READS; // Max number of simultaneous point or range reads in progress.
int REDWOOD_COMMIT_CONCURRENT_READS; // Max number of concurrent reads done to support commit operations
double REDWOOD_PAGE_REBUILD_MAX_SLACK; // When rebuilding pages, max slack to allow in page
int REDWOOD_LAZY_CLEAR_BATCH_SIZE_PAGES; // Number of pages to try to pop from the lazy delete queue and process at
// once

View File

@ -1253,7 +1253,14 @@ ACTOR Future<Void> removeStorageServer(Database cx,
TraceEvent(SevError, "TSSIdentityMappingEnabled");
tssMapDB.erase(tr, serverID);
} else if (tssPairID.present()) {
// remove the TSS from the mapping
tssMapDB.erase(tr, tssPairID.get());
// remove the TSS from quarantine, if it was in quarantine
Key tssQuarantineKey = tssQuarantineKeyFor(serverID);
Optional<Value> tssInQuarantine = wait(tr->get(tssQuarantineKey));
if (tssInQuarantine.present()) {
tr->clear(tssQuarantineKeyFor(serverID));
}
}
retry = true;

View File

@ -2813,7 +2813,10 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
removed.push_back(errorOr(logData->removed));
logsByVersion.emplace_back(ver, id1);
TraceEvent("TLogPersistentStateRestore", self->dbgid).detail("LogId", logData->logId).detail("Ver", ver);
TraceEvent("TLogPersistentStateRestore", self->dbgid)
.detail("LogId", logData->logId)
.detail("Ver", ver)
.detail("RecoveryCount", logData->recoveryCount);
// Restore popped keys. Pop operations that took place after the last (committed) updatePersistentDataVersion
// might be lost, but that is fine because we will get the corresponding data back, too.
tagKeys = prefixRange(rawId.withPrefix(persistTagPoppedKeys.begin));
@ -3050,7 +3053,7 @@ ACTOR Future<Void> tLogStart(TLogData* self, InitializeTLogRequest req, Locality
self->popOrder.push_back(recruited.id());
self->spillOrder.push_back(recruited.id());
TraceEvent("TLogStart", logData->logId);
TraceEvent("TLogStart", logData->logId).detail("RecoveryCount", logData->recoveryCount);
state Future<Void> updater;
state bool pulledRecoveryVersions = false;

View File

@ -170,6 +170,9 @@ class TestConfig {
if (attrib == "maxTLogVersion") {
sscanf(value.c_str(), "%d", &maxTLogVersion);
}
if (attrib == "disableTss") {
sscanf(value.c_str(), "%d", &disableTss);
}
if (attrib == "restartInfoLocation") {
isFirstTestInRestart = true;
}
@ -186,6 +189,8 @@ public:
bool startIncompatibleProcess = false;
int logAntiQuorum = -1;
bool isFirstTestInRestart = false;
// 7.0 cannot be downgraded to 6.3 after enabling TSS, so disable TSS for 6.3 downgrade tests
bool disableTss = false;
// Storage Engine Types: Verify match with SimulationConfig::generateNormalConfig
// 0 = "ssd"
// 1 = "memory"
@ -234,6 +239,7 @@ public:
.add("logAntiQuorum", &logAntiQuorum)
.add("storageEngineExcludeTypes", &storageEngineExcludeTypes)
.add("maxTLogVersion", &maxTLogVersion)
.add("disableTss", &disableTss)
.add("simpleConfig", &simpleConfig)
.add("generateFearless", &generateFearless)
.add("datacenters", &datacenters)
@ -1191,7 +1197,7 @@ void SimulationConfig::generateNormalConfig(const TestConfig& testConfig) {
}
int tssCount = 0;
if (!testConfig.simpleConfig && deterministicRandom()->random01() < 0.25) {
if (!testConfig.simpleConfig && !testConfig.disableTss && deterministicRandom()->random01() < 0.25) {
// 1 or 2 tss
tssCount = deterministicRandom()->randomInt(1, 3);
}

View File

@ -2884,7 +2884,10 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
removed.push_back(errorOr(logData->removed));
logsByVersion.emplace_back(ver, id1);
TraceEvent("TLogPersistentStateRestore", self->dbgid).detail("LogId", logData->logId).detail("Ver", ver);
TraceEvent("TLogPersistentStateRestore", self->dbgid)
.detail("LogId", logData->logId)
.detail("Ver", ver)
.detail("RecoveryCount", logData->recoveryCount);
// Restore popped keys. Pop operations that took place after the last (committed) updatePersistentDataVersion
// might be lost, but that is fine because we will get the corresponding data back, too.
tagKeys = prefixRange(rawId.withPrefix(persistTagPoppedKeys.begin));
@ -3129,7 +3132,7 @@ ACTOR Future<Void> tLogStart(TLogData* self, InitializeTLogRequest req, Locality
self->popOrder.push_back(recruited.id());
self->spillOrder.push_back(recruited.id());
TraceEvent("TLogStart", logData->logId);
TraceEvent("TLogStart", logData->logId).detail("RecoveryCount", logData->recoveryCount);
state Future<Void> updater;
state bool pulledRecoveryVersions = false;

File diff suppressed because it is too large Load Diff

View File

@ -95,13 +95,25 @@ struct AddingShard : NonCopyable {
Promise<Void> fetchComplete;
Promise<Void> readWrite;
std::deque<Standalone<VerUpdateRef>>
updates; // during the Fetching phase, mutations with key in keys and version>=(fetchClient's) fetchVersion;
// During the Fetching phase, it saves newer mutations whose version is greater or equal to fetchClient's
// fetchVersion, while the shard is still busy catching up with fetchClient. It applies these updates after fetching
// completes.
std::deque<Standalone<VerUpdateRef>> updates;
struct StorageServer* server;
Version transferredVersion;
enum Phase { WaitPrevious, Fetching, Waiting };
// To learn more details of the phase transitions, see function fetchKeys(). The phases below are sorted in
// chronological order and do not go back.
enum Phase {
WaitPrevious,
// During Fetching phase, it fetches data before fetchVersion and write it to storage, then let updater know it
// is ready to update the deferred updates` (see the comment of member variable `updates` above).
Fetching,
// During Waiting phase, it sends updater the deferred updates, and wait until they are durable.
Waiting
// The shard's state is changed from adding to readWrite then.
};
Phase phase;
@ -128,6 +140,7 @@ class ShardInfo : public ReferenceCounted<ShardInfo>, NonCopyable {
: adding(std::move(adding)), readWrite(readWrite), keys(keys) {}
public:
// A shard has 3 mutual exclusive states: adding, readWrite and notAssigned.
std::unique_ptr<AddingShard> adding;
struct StorageServer* readWrite;
KeyRange keys;
@ -167,6 +180,7 @@ struct StorageServerDisk {
void makeNewStorageServerDurable();
bool makeVersionMutationsDurable(Version& prevStorageVersion, Version newStorageVersion, int64_t& bytesLeft);
void makeVersionDurable(Version version);
void makeTssQuarantineDurable();
Future<bool> restoreDurableState();
void changeLogProtocol(Version version, ProtocolVersion protocol);
@ -284,6 +298,7 @@ const int VERSION_OVERHEAD =
sizeof(Reference<VersionedMap<KeyRef, ValueOrClearToRef>::PTreeT>)); // versioned map [ x2 for
// createNewVersion(version+1) ], 64b
// overhead for map
// For both the mutation log and the versioned map.
static int mvccStorageBytes(MutationRef const& m) {
return VersionedMap<KeyRef, ValueOrClearToRef>::overheadPerItem * 2 +
(MutationRef::OVERHEAD_BYTES + m.param1.size() + m.param2.size()) * 2;
@ -526,6 +541,22 @@ public:
}
}
// If a TSS is "in quarantine", it means it has incorrect data. It is effectively in a "zombie" state where it
// rejects all read requests and ignores all non-private mutations and data movements, but otherwise is still part
// of the cluster. The purpose of this state is to "freeze" the TSS state after a mismatch so a human operator can
// investigate, but preventing a new storage process from replacing the TSS on the worker. It will still get removed
// from the cluster if it falls behind on the mutation stream, or if its tss pair gets removed and its tag is no
// longer valid.
bool isTSSInQuarantine() { return tssPairID.present() && tssInQuarantine; }
void startTssQuarantine() {
if (!tssInQuarantine) {
// persist quarantine so it's still quarantined if rebooted
storage.makeTssQuarantineDurable();
}
tssInQuarantine = true;
}
StorageServerDisk storage;
KeyRangeMap<Reference<ShardInfo>> shards;
@ -571,6 +602,8 @@ public:
Optional<UID> tssPairID; // if this server is a tss, this is the id of its (ss) pair
Optional<UID> ssPairID; // if this server is an ss, this is the id of its (tss) pair
Optional<double> tssFaultInjectTime;
bool tssInQuarantine;
Key sk;
Reference<AsyncVar<ServerDBInfo>> db;
Database cx;
@ -690,9 +723,24 @@ public:
CounterCollection cc;
Counter allQueries, getKeyQueries, getValueQueries, getRangeQueries, finishedQueries, lowPriorityQueries,
rowsQueried, bytesQueried, watchQueries, emptyQueries;
Counter bytesInput, bytesDurable, bytesFetched,
mutationBytes; // Like bytesInput but without MVCC accounting
// Bytes of the mutations that have been added to the memory of the storage server. When the data is durable
// and cleared from the memory, we do not subtract it but add it to bytesDurable.
Counter bytesInput;
// Bytes of the mutations that have been removed from memory because they durable. The counting is same as
// bytesInput, instead of the actual bytes taken in the storages, so that (bytesInput - bytesDurable) can
// reflect the current memory footprint of MVCC.
Counter bytesDurable;
// Bytes fetched by fetchKeys() for data movements. The size is counted as a collection of KeyValueRef.
Counter bytesFetched;
// Like bytesInput but without MVCC accounting. The size is counted as how much it takes when serialized. It
// is basically the size of both parameters of the mutation and a 12 bytes overhead that keeps mutation type
// and the lengths of both parameters.
Counter mutationBytes;
Counter sampledBytesCleared;
// The number of key-value pairs fetched by fetchKeys()
Counter kvFetched;
Counter mutations, setMutations, clearRangeMutations, atomicMutations;
Counter updateBatches, updateVersions;
Counter loops;
@ -712,7 +760,7 @@ public:
bytesQueried("BytesQueried", cc), watchQueries("WatchQueries", cc), emptyQueries("EmptyQueries", cc),
bytesInput("BytesInput", cc), bytesDurable("BytesDurable", cc), bytesFetched("BytesFetched", cc),
mutationBytes("MutationBytes", cc), sampledBytesCleared("SampledBytesCleared", cc),
mutations("Mutations", cc), setMutations("SetMutations", cc),
kvFetched("KVFetched", cc), mutations("Mutations", cc), setMutations("SetMutations", cc),
clearRangeMutations("ClearRangeMutations", cc), atomicMutations("AtomicMutations", cc),
updateBatches("UpdateBatches", cc), updateVersions("UpdateVersions", cc), loops("Loops", cc),
fetchWaitingMS("FetchWaitingMS", cc), fetchWaitingCount("FetchWaitingCount", cc),
@ -758,7 +806,7 @@ public:
primaryLocality(tagLocalityInvalid), updateEagerReads(0), shardChangeCounter(0),
fetchKeysParallelismLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM_BYTES), shuttingDown(false),
debug_inApplyUpdate(false), debug_lastValidateTime(0), watchBytes(0), numWatches(0), logProtocol(0),
counters(this), tag(invalidTag), maxQueryQueue(0), thisServerID(ssi.id()),
counters(this), tag(invalidTag), maxQueryQueue(0), thisServerID(ssi.id()), tssInQuarantine(false),
readQueueSizeMetric(LiteralStringRef("StorageServer.ReadQueueSize")), behind(false), versionBehind(false),
byteSampleClears(false, LiteralStringRef("\xff\xff\xff")), noRecentUpdates(false), lastUpdate(now()),
poppedAllAfter(std::numeric_limits<Version>::max()), cpuUsage(0.0), diskUsage(0.0) {
@ -885,10 +933,10 @@ public:
template <class Request>
bool shouldRead(const Request& request) {
auto rate = currentRate();
if (rate < SERVER_KNOBS->STORAGE_DURABILITY_LAG_REJECT_THRESHOLD &&
deterministicRandom()->random01() >
std::max(SERVER_KNOBS->STORAGE_DURABILITY_LAG_MIN_RATE,
rate / SERVER_KNOBS->STORAGE_DURABILITY_LAG_REJECT_THRESHOLD)) {
if (isTSSInQuarantine() || (rate < SERVER_KNOBS->STORAGE_DURABILITY_LAG_REJECT_THRESHOLD &&
deterministicRandom()->random01() >
std::max(SERVER_KNOBS->STORAGE_DURABILITY_LAG_MIN_RATE,
rate / SERVER_KNOBS->STORAGE_DURABILITY_LAG_REJECT_THRESHOLD))) {
sendErrorWithPenalty(request.reply, server_overloaded(), getPenalty());
++counters.readsRejected;
return false;
@ -2209,6 +2257,8 @@ Optional<MutationRef> clipMutation(MutationRef const& m, KeyRangeRef range) {
return Optional<MutationRef>();
}
// Return true if the mutation need to be applied, otherwise (it's a CompareAndClear mutation and failed the comparison)
// false.
bool expandMutation(MutationRef& m,
StorageServer::VersionedData const& data,
UpdateEagerReadInfo* eager,
@ -2312,6 +2362,8 @@ void applyMutation(StorageServer* self, MutationRef const& m, Arena& arena, Stor
self->metrics.notify(m.param1, metrics);
if (m.type == MutationRef::SetValue) {
// VersionedMap (data) is bookkeeping all empty ranges. If the key to be set is new, it is supposed to be in a
// range what was empty. Break the empty range into halves.
auto prev = data.atLatest().lastLessOrEqual(m.param1);
if (prev && prev->isClearTo() && prev->getEndKey() > m.param1) {
ASSERT(prev.key() <= m.param1);
@ -2542,19 +2594,28 @@ class FetchKeysMetricReporter {
int fetchedBytes;
StorageServer::FetchKeysHistograms& histograms;
StorageServer::CurrentRunningFetchKeys& currentRunning;
Counter& bytesFetchedCounter;
Counter& kvFetchedCounter;
public:
FetchKeysMetricReporter(const UID& uid_,
const double startTime_,
const KeyRange& keyRange,
StorageServer::FetchKeysHistograms& histograms_,
StorageServer::CurrentRunningFetchKeys& currentRunning_)
: uid(uid_), startTime(startTime_), fetchedBytes(0), histograms(histograms_), currentRunning(currentRunning_) {
StorageServer::CurrentRunningFetchKeys& currentRunning_,
Counter& bytesFetchedCounter,
Counter& kvFetchedCounter)
: uid(uid_), startTime(startTime_), fetchedBytes(0), histograms(histograms_), currentRunning(currentRunning_),
bytesFetchedCounter(bytesFetchedCounter), kvFetchedCounter(kvFetchedCounter) {
currentRunning.recordStart(uid, keyRange);
}
void addFetchedBytes(const int bytes) { fetchedBytes += bytes; }
void addFetchedBytes(const int bytes, const int kvCount) {
fetchedBytes += bytes;
bytesFetchedCounter += bytes;
kvFetchedCounter += kvCount;
}
~FetchKeysMetricReporter() {
double latency = now() - startTime;
@ -2580,8 +2641,13 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
state Future<Void> warningLogger = logFetchKeysWarning(shard);
state const double startTime = now();
state int fetchBlockBytes = BUGGIFY ? SERVER_KNOBS->BUGGIFY_BLOCK_BYTES : SERVER_KNOBS->FETCH_BLOCK_BYTES;
state FetchKeysMetricReporter metricReporter(
fetchKeysID, startTime, keys, data->fetchKeysHistograms, data->currentRunningFetchKeys);
state FetchKeysMetricReporter metricReporter(fetchKeysID,
startTime,
keys,
data->fetchKeysHistograms,
data->currentRunningFetchKeys,
data->counters.bytesFetched,
data->counters.kvFetched);
// delay(0) to force a return to the run loop before the work of fetchKeys is started.
// This allows adding->start() to be called inline with CSK.
@ -2673,8 +2739,8 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
for (auto k = this_block.begin(); k != this_block.end(); ++k)
DEBUG_MUTATION("fetch", fetchVersion, MutationRef(MutationRef::SetValue, k->key, k->value));
metricReporter.addFetchedBytes(expectedSize);
data->counters.bytesFetched += expectedSize;
metricReporter.addFetchedBytes(expectedSize, this_block.size());
if (fetchBlockBytes > expectedSize) {
holdingFKPL.release(fetchBlockBytes - expectedSize);
}
@ -2683,7 +2749,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
// wait( data->fetchKeysStorageWriteLock.take() );
// state FlowLock::Releaser holdingFKSWL( data->fetchKeysStorageWriteLock );
// Write this_block to storage
// Write this_block directly to storage, bypassing update() which write to MVCC in memory.
state KeyValueRef* kvItr = this_block.begin();
for (; kvItr != this_block.end(); ++kvItr) {
data->storage.writeKeyValue(*kvItr);
@ -2805,6 +2871,8 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
Promise<FetchInjectionInfo*> p;
data->readyFetchKeys.push_back(p);
// After we add to the promise readyFetchKeys, update() would provide a pointer to FetchInjectionInfo that we
// can put mutation in.
FetchInjectionInfo* batch = wait(p.getFuture());
TraceEvent(SevDebug, "FKUpdateBatch", data->thisServerID).detail("FKID", interval.pairID);
@ -2859,6 +2927,9 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
keys,
true); // keys will be available when getLatestVersion()==transferredVersion is durable
// Note that since it receives a pointer to FetchInjectionInfo, the thread does not leave this actor until this
// point.
// Wait for the transferredVersion (and therefore the shard data) to be committed and durable.
wait(data->durableVersion.whenAtLeast(shard->transferredVersion));
@ -2922,6 +2993,9 @@ void AddingShard::addMutation(Version version, MutationRef const& mutation) {
if (phase == WaitPrevious) {
// Updates can be discarded
} else if (phase == Fetching) {
// Save incoming mutations (See the comments of member variable `updates`).
// Create a new VerUpdateRef in updates queue if it is a new version.
if (!updates.size() || version > updates.end()[-1].version) {
VerUpdateRef v;
v.version = version;
@ -2930,6 +3004,7 @@ void AddingShard::addMutation(Version version, MutationRef const& mutation) {
} else {
ASSERT(version == updates.end()[-1].version);
}
// Add the mutation to the version.
updates.back().mutations.push_back_deep(updates.back().arena(), mutation);
} else if (phase == Waiting) {
server->addMutation(version, mutation, keys, server->updateEagerReads);
@ -3132,6 +3207,7 @@ static const KeyRangeRef persistFormatReadableRange(LiteralStringRef("Foundation
LiteralStringRef("FoundationDB/StorageServer/1/5"));
static const KeyRef persistID = LiteralStringRef(PERSIST_PREFIX "ID");
static const KeyRef persistTssPairID = LiteralStringRef(PERSIST_PREFIX "tssPairID");
static const KeyRef persistTssQuarantine = LiteralStringRef(PERSIST_PREFIX "tssQ");
// (Potentially) change with the durable version or when fetchKeys completes
static const KeyRef persistVersion = LiteralStringRef(PERSIST_PREFIX "Version");
@ -3208,12 +3284,16 @@ private:
ASSERT(m.type == MutationRef::SetValue && m.param1.startsWith(data->sk));
KeyRangeRef keys(startKey.removePrefix(data->sk), m.param1.removePrefix(data->sk));
// add changes in shard assignment to the mutation log
setAssignedStatus(data, keys, nowAssigned);
// ignore data movements for tss in quarantine
if (!data->isTSSInQuarantine()) {
// add changes in shard assignment to the mutation log
setAssignedStatus(data, keys, nowAssigned);
// The changes for version have already been received (and are being processed now). We need to fetch
// the data for change.version-1 (changes from versions < change.version)
changeServerKeys(data, keys, nowAssigned, currentVersion - 1, CSK_UPDATE);
}
// The changes for version have already been received (and are being processed now). We need
// to fetch the data for change.version-1 (changes from versions < change.version)
changeServerKeys(data, keys, nowAssigned, currentVersion - 1, CSK_UPDATE);
processedStartKey = false;
} else if (m.type == MutationRef::SetValue && m.param1.startsWith(data->sk)) {
// Because of the implementation of the krm* functions, we expect changes in pairs, [begin,end)
@ -3251,7 +3331,8 @@ private:
bool matchesThisServer = serverTagKey == data->thisServerID;
bool matchesTssPair = data->isTss() ? serverTagKey == data->tssPairID.get() : false;
if ((m.type == MutationRef::SetValue && !data->isTss() && !matchesThisServer) ||
(m.type == MutationRef::ClearRange && (matchesThisServer || (data->isTss() && matchesTssPair)))) {
(m.type == MutationRef::ClearRange &&
((!data->isTSSInQuarantine() && matchesThisServer) || (data->isTss() && matchesTssPair)))) {
throw worker_removed();
}
if (!data->isTss() && m.type == MutationRef::ClearRange && data->ssPairID.present() &&
@ -3267,12 +3348,32 @@ private:
data->primaryLocality = BinaryReader::fromStringRef<int8_t>(m.param2, Unversioned());
auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion());
data->addMutationToMutationLog(mLV, MutationRef(MutationRef::SetValue, persistPrimaryLocality, m.param2));
} else if (m.type == MutationRef::SetValue && m.param1.substr(1).startsWith(tssMappingKeys.begin)) {
} else if (m.param1.substr(1).startsWith(tssMappingKeys.begin) &&
(m.type == MutationRef::SetValue || m.type == MutationRef::ClearRange)) {
if (!data->isTss()) {
UID ssId = Codec<UID>::unpack(Tuple::unpack(m.param1.substr(1).removePrefix(tssMappingKeys.begin)));
UID tssId = Codec<UID>::unpack(Tuple::unpack(m.param2));
ASSERT(ssId == data->thisServerID);
data->setSSWithTssPair(tssId);
if (m.type == MutationRef::SetValue) {
UID tssId = Codec<UID>::unpack(Tuple::unpack(m.param2));
data->setSSWithTssPair(tssId);
} else {
data->clearSSWithTssPair();
}
}
} else if (m.param1.substr(1).startsWith(tssQuarantineKeys.begin) &&
(m.type == MutationRef::SetValue || m.type == MutationRef::ClearRange)) {
if (data->isTss()) {
UID ssId = decodeTssQuarantineKey(m.param1.substr(1));
ASSERT(ssId == data->thisServerID);
if (m.type == MutationRef::SetValue) {
TEST(true); // Putting TSS in quarantine
TraceEvent(SevWarn, "TSSQuarantineStart", data->thisServerID);
data->startTssQuarantine();
} else {
TraceEvent(SevWarn, "TSSQuarantineStop", data->thisServerID);
// dipose of this TSS
throw worker_removed();
}
}
} else {
ASSERT(false); // Unknown private mutation
@ -3434,6 +3535,8 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
auto fk = data->readyFetchKeys.back();
data->readyFetchKeys.pop_back();
fk.send(&fii);
// fetchKeys() would put the data it fetched into the fii. The thread will not return back to this actor
// until it was completed.
}
for (auto& c : fii.changes)
@ -3472,6 +3575,8 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
for (; mutationNum < pUpdate->mutations.size(); mutationNum++) {
updater.applyMutation(data, pUpdate->mutations[mutationNum], pUpdate->version);
mutationBytes += pUpdate->mutations[mutationNum].totalSize();
// data->counters.mutationBytes or data->counters.mutations should not be updated because they should
// have counted when the mutations arrive from cursor initially.
injectedChanges = true;
if (mutationBytes > SERVER_KNOBS->DESIRED_UPDATE_BYTES) {
mutationBytes = 0;
@ -3519,14 +3624,22 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
Span span("SS:update"_loc, { spanContext });
span.addTag("key"_sr, msg.param1);
// Drop non-private mutations if TSS fault injection is enabled in simulation, or if this is a TSS in
// quarantine.
if (g_network->isSimulated() && data->isTss() &&
g_simulator.tssMode == ISimulator::TSSMode::EnabledDropMutations &&
data->tssFaultInjectTime.present() && data->tssFaultInjectTime.get() < now() &&
(msg.type == MutationRef::SetValue || msg.type == MutationRef::ClearRange) && msg.param1.size() &&
msg.param1[0] != 0xff && deterministicRandom()->random01() < 0.05) {
(msg.type == MutationRef::SetValue || msg.type == MutationRef::ClearRange) &&
(msg.param1.size() < 2 || msg.param1[0] != 0xff || msg.param1[1] != 0xff) &&
deterministicRandom()->random01() < 0.05) {
TraceEvent(SevWarnAlways, "TSSInjectDropMutation", data->thisServerID)
.detail("Mutation", msg.toString())
.detail("Version", cloneCursor2->version().toString());
} else if (data->isTSSInQuarantine() &&
(msg.param1.size() < 2 || msg.param1[0] != 0xff || msg.param1[1] != 0xff)) {
TraceEvent("TSSQuarantineDropMutation", data->thisServerID)
.suppressFor(10.0)
.detail("Version", cloneCursor2->version().toString());
} else if (ver != invalidVersion) { // This change belongs to a version < minVersion
DEBUG_MUTATION("SSPeek", ver, msg).detail("ServerID", data->thisServerID);
if (ver == 1) {
@ -3885,6 +3998,11 @@ void StorageServerDisk::makeVersionDurable(Version version) {
// .detail("ToVersion", version);
}
// Update data->storage to persist tss quarantine state
void StorageServerDisk::makeTssQuarantineDurable() {
storage->set(KeyValueRef(persistTssQuarantine, LiteralStringRef("1")));
}
void StorageServerDisk::changeLogProtocol(Version version, ProtocolVersion protocol) {
data->addMutationToMutationLogOrStorage(
version,
@ -4001,6 +4119,7 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
state Future<Optional<Value>> fFormat = storage->readValue(persistFormat.key);
state Future<Optional<Value>> fID = storage->readValue(persistID);
state Future<Optional<Value>> ftssPairID = storage->readValue(persistTssPairID);
state Future<Optional<Value>> fTssQuarantine = storage->readValue(persistTssQuarantine);
state Future<Optional<Value>> fVersion = storage->readValue(persistVersion);
state Future<Optional<Value>> fLogProtocol = storage->readValue(persistLogProtocol);
state Future<Optional<Value>> fPrimaryLocality = storage->readValue(persistPrimaryLocality);
@ -4013,7 +4132,7 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
restoreByteSample(data, storage, byteSampleSampleRecovered, startByteSampleRestore.getFuture());
TraceEvent("ReadingDurableState", data->thisServerID);
wait(waitForAll(std::vector{ fFormat, fID, ftssPairID, fVersion, fLogProtocol, fPrimaryLocality }));
wait(waitForAll(std::vector{ fFormat, fID, ftssPairID, fTssQuarantine, fVersion, fLogProtocol, fPrimaryLocality }));
wait(waitForAll(std::vector{ fShardAssigned, fShardAvailable }));
wait(byteSampleSampleRecovered.getFuture());
TraceEvent("RestoringDurableState", data->thisServerID);
@ -4037,6 +4156,14 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
data->setTssPair(BinaryReader::fromStringRef<UID>(ftssPairID.get().get(), Unversioned()));
}
// It's a bit sketchy to rely on an untrusted storage engine to persist its quarantine state when the quarantine
// state means the storage engine already had a durability or correctness error, but it should get re-quarantined
// very quickly because of a mismatch if it starts trying to do things again
if (fTssQuarantine.get().present()) {
TEST(true); // TSS restarted while quarantined
data->tssInQuarantine = true;
}
data->sk = serverKeysPrefixFor((data->tssPairID.present()) ? data->tssPairID.get() : data->thisServerID)
.withPrefix(systemKeys.begin); // FFFF/serverKeys/[this server]/
@ -4058,6 +4185,7 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
? allKeys.end
: available[availableLoc + 1].key.removePrefix(persistShardAvailableKeys.begin));
ASSERT(!keys.empty());
bool nowAvailable = available[availableLoc].value != LiteralStringRef("0");
/*if(nowAvailable)
TraceEvent("AvailableShard", data->thisServerID).detail("RangeBegin", keys.begin).detail("RangeEnd", keys.end);*/
@ -4331,6 +4459,7 @@ ACTOR Future<Void> metricsCore(StorageServer* self, StorageServerInterface ssi)
wait(self->byteSampleRecovery);
// Logs all counters in `counters.cc` and reset the interval.
self->actors.add(traceCounters("StorageMetrics",
self->thisServerID,
SERVER_KNOBS->STORAGE_LOGGING_DELAY,
@ -4735,6 +4864,7 @@ bool storageServerTerminated(StorageServer& self, IKeyValueStore* persistentData
if (e.code() == error_code_please_reboot) {
// do nothing.
} else if (e.code() == error_code_worker_removed || e.code() == error_code_recruitment_failed) {
// SOMEDAY: could close instead of dispose if tss in quarantine gets removed so it could still be investigated?
persistentData->dispose();
} else {
persistentData->close();
@ -4965,7 +5095,9 @@ ACTOR Future<Void> replaceTSSInterface(StorageServer* self, StorageServerInterfa
tr->set(serverListKeyFor(ssi.id()), serverListValue(ssi));
// add itself back to tss mapping
tssMapDB.set(tr, self->tssPairID.get(), ssi.id());
if (!self->isTSSInQuarantine()) {
tssMapDB.set(tr, self->tssPairID.get(), ssi.id());
}
wait(tr->commit());
self->tag = myTag;

View File

@ -1047,7 +1047,9 @@ std::map<std::string, std::function<void(const std::string&)>> testSpecGlobalKey
{ "storageEngineExcludeTypes",
[](const std::string& value) { TraceEvent("TestParserTest").detail("ParsedStorageEngineExcludeTypes", ""); } },
{ "maxTLogVersion",
[](const std::string& value) { TraceEvent("TestParserTest").detail("ParsedMaxTLogVersion", ""); } }
[](const std::string& value) { TraceEvent("TestParserTest").detail("ParsedMaxTLogVersion", ""); } },
{ "disableTss",
[](const std::string& value) { TraceEvent("TestParserTest").detail("ParsedDisableTSS", ""); } }
};
std::map<std::string, std::function<void(const std::string& value, TestSpec* spec)>> testSpecTestKeys = {

View File

@ -1271,6 +1271,40 @@ Future<T> waitOrError(Future<T> f, Future<Void> errorSignal) {
}
}
// A low-overhead FIFO mutex made with no internal queue structure (no list, deque, vector, etc)
// The lock is implemented as a Promise<Void>, which is returned to callers in a convenient wrapper
// called Lock.
//
// Usage:
// Lock lock = wait(mutex.take());
// lock.release(); // Next waiter will get the lock, OR
// lock.error(e); // Next waiter will get e, future waiters will see broken_promise
// lock = Lock(); // Or let Lock and any copies go out of scope. All waiters will see broken_promise.
struct FlowMutex {
FlowMutex() { lastPromise.send(Void()); }
bool available() { return lastPromise.isSet(); }
struct Lock {
void release() { promise.send(Void()); }
void error(Error e = broken_promise()) { promise.sendError(e); }
// This is exposed in case the caller wants to use/copy it directly
Promise<Void> promise;
};
Future<Lock> take() {
Lock newLock;
Future<Lock> f = lastPromise.isSet() ? newLock : tag(lastPromise.getFuture(), newLock);
lastPromise = newLock.promise;
return f;
}
private:
Promise<Void> lastPromise;
};
struct FlowLock : NonCopyable, public ReferenceCounted<FlowLock> {
// FlowLock implements a nonblocking critical section: there can be only a limited number of clients executing code
// between wait(take()) and release(). Not thread safe. take() returns only when the number of holders of the lock
@ -1348,7 +1382,9 @@ struct FlowLock : NonCopyable, public ReferenceCounted<FlowLock> {
// Only works if broken_on_destruct.canBeSet()
void kill(Error e = broken_promise()) {
if (broken_on_destruct.canBeSet()) {
broken_on_destruct.sendError(e);
auto local = broken_on_destruct;
// It could be the case that calling broken_on_destruct destroys this FlowLock
local.sendError(e);
}
}

View File

@ -1,14 +1,18 @@
FROM amazonlinux:2.0.20210326.0 as base
RUN yum install -y \
binutils \
bind-utils \
curl \
gdb \
jq \
less \
lsof \
nc \
net-tools \
perf \
perl \
procps \
python38 \
python3-pip \
strace \
@ -16,11 +20,16 @@ RUN yum install -y \
traceroute \
telnet \
tcpdump \
unzip \
vim
#todo: nload, iperf, numademo
RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64-2.0.30.zip" -o "awscliv2.zip" \
&& unzip awscliv2.zip && ./aws/install && rm -rf aws
COPY misc/tini-amd64.sha256sum /tmp/
COPY misc/flamegraph.sha256sum /tmp/
# Adding tini as PID 1 https://github.com/krallin/tini
ARG TINI_VERSION=v0.19.0
RUN curl -sLO https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini-amd64 && \
@ -31,6 +40,13 @@ RUN curl -sLO https://github.com/krallin/tini/releases/download/${TINI_VERSION}/
COPY sidecar/requirements.txt /tmp
RUN pip3 install -r /tmp/requirements.txt
# Install flamegraph
RUN curl -sLO https://raw.githubusercontent.com/brendangregg/FlameGraph/90533539b75400297092f973163b8a7b067c66d3/stackcollapse-perf.pl && \
curl -sLO https://raw.githubusercontent.com/brendangregg/FlameGraph/90533539b75400297092f973163b8a7b067c66d3/flamegraph.pl && \
sha256sum -c /tmp/flamegraph.sha256sum && \
chmod +x stackcollapse-perf.pl flamegraph.pl && \
mv stackcollapse-perf.pl flamegraph.pl /usr/bin
# TODO: Only used by sidecar
RUN groupadd --gid 4059 fdb && \
useradd --gid 4059 --uid 4059 --no-create-home --shell /bin/bash fdb

View File

@ -0,0 +1,2 @@
a682ac46497d6fdbf9904d1e405d3aea3ad255fcb156f6b2b1a541324628dfc0 flamegraph.pl
5bcfb73ff2c2ab7bf2ad2b851125064780b58c51cc602335ec0001bec92679a5 stackcollapse-perf.pl

View File

@ -1,5 +1,6 @@
storageEngineExcludeTypes=-1,-2
maxTLogVersion=6
disableTss=true
testTitle=Clogged
clearAfterTest=false
testName=Cycle

View File

@ -1,5 +1,6 @@
storageEngineExcludeTypes=-1,-2
maxTLogVersion=6
disableTss=true
testTitle=Clogged
runSetup=false
testName=Cycle

View File

@ -7,7 +7,7 @@ clearAfterTest = false
[[test.workload]]
testName = 'DifferentClustersSameRV'
testDuration = 150
testDuration = 500
switchAfter = 50
keyToRead = 'someKey'
keyToWatch = 'anotherKey'