<meta charset="utf-8">
# TLog Spill-By-Reference Design
## Background
(This assumes a basic familiarity with [FoundationDB's architecture][fdbsummit-technical-overview].)
Transaction logs are a distributed Write-Ahead-Log for FoundationDB. They
receive commits from proxies that are written to a sequential *disk queue* in
version order. A commit is sent as a list of tagged mutations, where a *tag*
is a small identifier that represents one storage server as a destination. The
transaction logs are then later *peeked* by storage servers to receive data
destined for their tag. This is how committed data becomes available to
clients for reading.
Transaction logs internally handle commits by performing two operations
concurrently. First, they walk through each mutation in the commit, and push
the mutation onto an in-memory queue of mutations destined for that tag.
Second, they include the data in the next batch of pages to durably persist to
disk. These queues are popped from when the corresponding storage server has
persisted the data to its own disk.
TLogs will need to hold the last 5-7 seconds of mutations. In normal
operation, the default 1.5GB of memory is enough such that the last 5-7 seconds
of commits should almost always fit in memory. However, in the presence of
failures, the transaction log can be required to buffer significantly more
data. Most notably, when a storage server fails, its tag isn't popped until
data distribution is able to re-replicate all of the shards that storage server
was responsible for to other storage servers. Before that happens, mutations
will accumulate on the TLog destined for the failed storage server, in case it
comes back and is able to rejoin the cluster.
When this accumulation causes the memory required to hold all the unpopped data
to exceed `TLOG_SPILL_THRESHOLD` bytes, the transaction log offloads the
oldest data to disk. This writing of data to disk to reduce TLog memory
pressure is referred to as *spilling*.
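The in-memory side of this can be sketched roughly as follows. This is a toy model, not the TLog's actual data structure: `InMemoryTagQueues`, its methods, and the `Tag`/`Version` aliases are hypothetical names, and treating the pop version as an exclusive bound is an assumption.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <map>
#include <string>
#include <utility>

using Tag = std::uint16_t;     // hypothetical stand-in for FDB's Tag
using Version = std::int64_t;  // hypothetical stand-in for FDB's Version

// Toy model of the per-tag in-memory queues and the spill trigger.
class InMemoryTagQueues {
public:
    explicit InMemoryTagQueues(std::size_t spillThresholdBytes)
      : threshold_(spillThresholdBytes) {}

    // Commit path: append one mutation to the queue for its tag.
    void push(Tag tag, Version version, std::string mutation) {
        bytes_ += mutation.size();
        queues_[tag].push_back(Entry{version, std::move(mutation)});
    }

    // Pop path: the storage server has persisted everything below `upTo`
    // (exclusive bound is an assumption of this sketch).
    void pop(Tag tag, Version upTo) {
        auto it = queues_.find(tag);
        if (it == queues_.end()) return;
        std::deque<Entry>& q = it->second;
        while (!q.empty() && q.front().version < upTo) {
            assert(bytes_ >= q.front().mutation.size());
            bytes_ -= q.front().mutation.size();
            q.pop_front();
        }
    }

    // True once unpopped data exceeds the threshold and spilling should start.
    bool overThreshold() const { return bytes_ > threshold_; }
    std::size_t bytes() const { return bytes_; }

private:
    struct Entry {
        Version version;
        std::string mutation;
    };
    std::map<Tag, std::deque<Entry>> queues_;
    std::size_t bytes_ = 0;
    std::size_t threshold_;
};
```

A tag that is never popped (a failed storage server, for instance) keeps `bytes()` growing until `overThreshold()` becomes true.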
Previously, spilling would work by writing the data to a SQLite B-tree. The
key would be `(tag, version)`, and the value would be all the mutations
destined for the given tag at the given version. Peek requests carry a start
version, which is the latest version that the storage server knows about,
and the TLog responds by range-reading the B-tree from the start version. Pop
requests allow the TLog to forget all mutations for a tag until a specific
version, and the TLog thus issues a range clear from `(tag, 0)` to
`(tag, pop_version)`. After spilling, the durably written data in the disk
queue would be trimmed to only include from the spilled version on, as any
required data is now entirely, durably held in the B-tree. As the entire value
is copied into the B-tree, this method of spilling will be referred to as
*spill-by-value* in the rest of this document.
**************************************************************
*                      Transaction Log                       *
*                                                            *
*                                                            *
* +------------------+  appends   +------------+             *
* | Incoming Commits |----------->| Disk Queue |  +------+   *
* +------------------+            +------------+  |SQLite|   *
*              |                           ^      +------+   *
*              |                           |        ^        *
*              |                          pops      |        *
*              +------+------+             |     writes      *
*              |      |      |             |        |        *
*              v      v      v            +----------+       *
* in-memory  +---+  +---+  +---+          |Spill Loop|       *
* queues     | 1 |  | 2 |  | 3 |          +----------+       *
* per-tag    |   |  |   |  |   |               ^             *
*            |...|  |...|  |...|               |             *
*              |      |      |                 |             *
*              v      v      v                 |             *
*              +------+------+-----------------+             *
*                 queues spilled on overflow                 *
*                                                            *
**************************************************************
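The spill-by-value key scheme described above can be modeled with an ordered map. The sketch below uses `std::map` as a stand-in for the SQLite B-tree; `SpillByValueStore` and its method names are hypothetical, and the pop bound is treated as exclusive, matching the range clear from `(tag, 0)` to `(tag, pop_version)`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

using Tag = std::uint16_t;     // hypothetical stand-in for FDB's Tag
using Version = std::int64_t;  // hypothetical stand-in for FDB's Version

// Toy stand-in for the SQLite B-tree: an ordered map keyed by (tag, version).
class SpillByValueStore {
public:
    // Spill: copy the serialized mutations themselves into the tree.
    void spill(Tag tag, Version version, std::string mutations) {
        tree_[std::make_pair(tag, version)] = std::move(mutations);
    }

    // Peek: range-read every entry for `tag` starting at `begin`.
    std::vector<std::string> peek(Tag tag, Version begin) const {
        std::vector<std::string> out;
        for (auto it = tree_.lower_bound(std::make_pair(tag, begin));
             it != tree_.end() && it->first.first == tag; ++it) {
            out.push_back(it->second);
        }
        return out;
    }

    // Pop: range-clear [(tag, 0), (tag, popVersion)).
    void pop(Tag tag, Version popVersion) {
        assert(popVersion >= 0);
        tree_.erase(tree_.lower_bound(std::make_pair(tag, Version(0))),
                    tree_.lower_bound(std::make_pair(tag, popVersion)));
    }

    std::size_t size() const { return tree_.size(); }

private:
    std::map<std::pair<Tag, Version>, std::string> tree_;
};
```

Every spilled version costs one B-tree write here, and the value is as large as the mutations themselves.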
Unfortunately, it turned out that spilling in this fashion greatly impacts TLog
performance. A write bandwidth saturation test was run against a cluster, with
a modification to the transaction logs to have them act as if there was one
storage server that was permanently failed; it never sent pop requests to allow
the TLog to remove data from memory. After 15min, the write bandwidth had
reduced to 30% of its baseline. After 30min, that became 10%. After 60min,
that became 5%. (This is an intentional hyperbole due to the saturating write
load. See experiments at the end for more detail and data.)
With the recent multi-DC/multi-region work, a failure of a remote data center
would cause transaction logs to need to buffer all commits, as every commit is
tagged as destined for the remote datacenter. This would rapidly push
transaction logs into a spilling regime, and thus write bandwidth would begin
to rapidly degrade. It is unacceptable for a remote datacenter failure to so
drastically affect the primary datacenter's performance, so a more performant
way of spilling data is required.
## Overview
Whereas spill-by-value copies the entire mutation into the B-tree and removes
it from the disk queue, spill-by-reference leaves the mutations in the disk
queue and writes a pointer to them into the B-tree. Performance experiments
revealed that the TLog's performance while spilling was dictated more by the
number of writes done to the SQLite B-tree than by the size of those writes.
Thus, the ability of spill-by-reference to batch its writes to the B-tree much
more effectively matters more than the fact that it writes less data in
aggregate. The two are connected: because spill-by-reference significantly
reduces the volume of data written to the B-tree, more versions can be batched
into each write.
************************************************************************
*                              DiskQueue                               *
*                                                                      *
*    -------- Index on disk --------     ---- Index in memory ----     *
*  /                                 \ /                           \   *
* +-----------------------------------+-----------------------------+  *
* |           Spilled Data            |      Most Recent Data       |  *
* +-----------------------------------+-----------------------------+  *
* lowest version                                      highest version  *
*                                                                      *
************************************************************************
Spill-by-reference works by taking a larger range of versions, and building a
single key-value pair per tag that describes where in the disk queue every
relevant commit for that tag is located. Concretely, this takes the form
`(tag, last_version) -> [(version, start, end, mutation_bytes)]`, where...
* `tag` is the small integer representing the storage server this mutation batch is destined for.
* `last_version` is the last/maximum version contained in the value's batch.
* `version` is the version of the commit that this index entry points to.
* `start` is an index into the disk queue of where to find the beginning of the commit.
* `end` is an index into the disk queue of where the end of the commit is.
* `mutation_bytes` is the number of bytes in the commit that are relevant for this tag.
Spilling then writes to the B-tree only once per spilled tag on each
iteration.
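As a rough illustration of that layout, the sketch below builds one index entry per tag for a single spill iteration. It is not the TLogServer code: `SpilledEntry`, `Commit`, `SpillIndex`, and `spillOnce` are hypothetical names, `Location` stands in for `IDiskQueue::location`, and a `std::map` again plays the role of the B-tree.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <utility>
#include <vector>

using Tag = std::uint16_t;       // hypothetical stand-in for FDB's Tag
using Version = std::int64_t;    // hypothetical stand-in for FDB's Version
using Location = std::uint64_t;  // hypothetical stand-in for IDiskQueue::location

// One tuple of the value: a pointer to a commit in the disk queue.
struct SpilledEntry {
    Version version;              // version of the commit
    Location start;               // where the commit begins in the disk queue
    Location end;                 // where the commit ends in the disk queue
    std::uint32_t mutationBytes;  // bytes of the commit relevant to this tag
};

// A commit being spilled, with the relevant byte count for each tag it touches.
struct Commit {
    Version version;
    Location start;
    Location end;
    std::map<Tag, std::uint32_t> bytesPerTag;
};

// (tag, last_version) -> [(version, start, end, mutation_bytes)]
using SpillIndex = std::map<std::pair<Tag, Version>, std::vector<SpilledEntry>>;

// One spill iteration: `commits` must be in increasing version order.
// Exactly one key-value pair is written per tag, however many versions it covers.
void spillOnce(const std::vector<Commit>& commits, SpillIndex& index) {
    std::map<Tag, std::vector<SpilledEntry>> perTag;
    for (const Commit& c : commits) {
        for (const auto& tagBytes : c.bytesPerTag) {
            std::vector<SpilledEntry>& entries = perTag[tagBytes.first];
            assert(entries.empty() || entries.back().version < c.version);
            entries.push_back(SpilledEntry{c.version, c.start, c.end, tagBytes.second});
        }
    }
    for (auto& tagEntries : perTag) {
        const Version lastVersion = tagEntries.second.back().version;
        index[std::make_pair(tagEntries.first, lastVersion)] = std::move(tagEntries.second);
    }
}
```

The number of B-tree writes per iteration depends only on the number of distinct tags, not on the number of versions batched.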
Note that each tuple in the list represents a commit, and not a mutation. This
means that peeking spilled commits will involve reading mutations unrelated to
the requested tag. Alternatively, one could have each tuple represent a
mutation within a commit, to prevent over-reading when peeking. There exist
pathological workloads for each strategy. The purpose of this work is most
importantly to support spilling of log router tags. These exist on every
mutation, so that it will get copied to other datacenters. This is the exact
pathological workload for recording each mutation individually, because it only
increases the number of IO operations used to read the same amount of data.
For a wider set of workloads, there's room to establish a heuristic as to when
to record mutation(s) versus the entire commit, but performance testing hasn't
surfaced this as important enough to include in the initial version of this
work.
Peeking now works by issuing a range read to the B-tree from `(tag, peek_begin)`
to `(tag, infinity)`. This is why the key contains the last version of the
batch, rather than the beginning, so that a range read from the peek request's
version will always return all relevant batches. For each batched tuple, if
the version is greater than our peek request's version, then we read the commit
containing that mutation from disk, extract the relevant mutations, and append
them to our response. There is a target size of the response, 150KB by
default. As we iterate through the tuples, we sum `mutation_bytes`, which
already informs us how many bytes of relevant mutations we'll get from a given
commit. This allows us to make sure we won't waste disk IOs on reads that will
end up being discarded as unnecessary.
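A minimal sketch of that read planning is shown below. It is illustrative only: `planPeekReads` and `SpilledEntry` are hypothetical names, the 150KB target is written as `150000` (the exact constant is an assumption), and the peek version is treated as inclusive.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

using Version = std::int64_t;    // hypothetical stand-in for FDB's Version
using Location = std::uint64_t;  // hypothetical stand-in for IDiskQueue::location

struct SpilledEntry {
    Version version;
    Location start;
    Location end;
    std::uint32_t mutationBytes;
};

// Walks the index batches returned by the B-tree range read (oldest first)
// and picks the commits to fetch from the disk queue. Planning stops once the
// summed mutationBytes reaches targetBytes, so no disk read is issued for
// data that would not fit in the response anyway.
std::vector<SpilledEntry> planPeekReads(const std::vector<std::vector<SpilledEntry>>& batches,
                                        Version peekBegin,
                                        std::size_t targetBytes = 150000) {
    assert(targetBytes > 0);
    std::vector<SpilledEntry> toRead;
    std::size_t plannedBytes = 0;
    for (const std::vector<SpilledEntry>& batch : batches) {
        for (const SpilledEntry& entry : batch) {
            if (entry.version < peekBegin) continue;         // older than the request
            if (plannedBytes >= targetBytes) return toRead;  // response already full
            toRead.push_back(entry);
            plannedBytes += entry.mutationBytes;
        }
    }
    return toRead;
}
```

Only the commits in the returned list would then be read from the disk queue via their `start` and `end` locations.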
Popping works similarly to before, but now requires recovering information from
disk. Previously, we would maintain a map from version to location in the disk
queue for every version we hadn't yet spilled. Once spilling has copied the
value into the B-tree, knowing where the commit was in the disk queue is
useless to us, and is removed. In spill-by-reference, that information is
still needed to know how to map "pop until version 7" to "pop until byte 87" in
the disk queue. Unfortunately, keeping this information in memory would result
in TLogs slowly consuming more and more memory[^versionmap-memory] as more data
is spilled. Instead, we issue a range read of the B-tree from `(tag, pop_version)`
to `(tag, infinity)` and look at the first commit we find with a version
greater than the pop version. We then use its starting disk queue location as
the limit of how far the disk queue can be popped for this tag.
[^versionmap-memory]: Pessimistic assumptions would suggest that a TLog spilling 1TB of data would require ~50GB of memory to hold this map, which isn't acceptable.
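The pop lookup can be sketched like this, again with `std::map` standing in for the B-tree. The names `findPopLocation`, `SpillIndex`, and `SpilledEntry` are hypothetical, and the example numbers mirror the "pop until version 7" to "pop until byte 87" mapping mentioned above.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <utility>
#include <vector>

using Tag = std::uint16_t;       // hypothetical stand-in for FDB's Tag
using Version = std::int64_t;    // hypothetical stand-in for FDB's Version
using Location = std::uint64_t;  // hypothetical stand-in for IDiskQueue::location

struct SpilledEntry {
    Version version;
    Location start;
    Location end;
    std::uint32_t mutationBytes;
};

using SpillIndex = std::map<std::pair<Tag, Version>, std::vector<SpilledEntry>>;

// Range-reads the index from (tag, popVersion) onward and reports the disk
// queue location of the first commit newer than popVersion. Returns false if
// no such spilled commit exists for the tag.
bool findPopLocation(const SpillIndex& index, Tag tag, Version popVersion, Location& out) {
    for (auto it = index.lower_bound(std::make_pair(tag, popVersion));
         it != index.end() && it->first.first == tag; ++it) {
        for (const SpilledEntry& entry : it->second) {
            if (entry.version > popVersion) {
                out = entry.start;  // everything before this byte is poppable for the tag
                return true;
            }
        }
    }
    return false;
}
```

Because the key holds the last version of each batch, the range read never has to look at batches that end before `popVersion`.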
## Detailed Implementation
The rough outline of concrete changes proposed looks like:
0. Allow a new TLog and old TLog to co-exist and be configurable, upgradeable, and recoverable
0. Modify spilling in new TLogServer
0. Modify peeking in new TLogServer
0. Modify popping in new TLogServer
0. Spill txsTag specially
### Spilling
In spill-by-reference, spilling is now the act of persisting the index of
### Peeking
A `TLogPeekRequest` contains a `Tag` and a `Version`, and is a request for all
commits with the specified tag with a commit version greater than or equal to
the given version. The goal is to return a 150KB block of mutations.
**************************************************************************
*                                                                        *
* +---------+   Tag   +---------+          Tag         +--------+        *
* | Peek    |-------->| Spilled | ...----------------->| Memory |        *
* | Request | Version | Index   |        Version       | Index  |        *
* +---------+         +---------+                      +--------+        *
*                          |                                |            *
*        +-----------------+-----------------+              |            *
*       / \  Start=100   _/ \_  Start=500    +  Start=900   +  Ptr=0xF00 *
*      /   \ Length=50  /     \ Length=70   / \ Length=30  / \ Length=30 *
* +------------------------------------------------+------------------+  *
* |                   Disk Queue                   |  Also In Memory  |  *
* +------------------------------------------------+------------------+  *
*                                                                        *
**************************************************************************
Spill-by-value and the memory storage engine only ever read from the DiskQueue when
recovering, and read the entire file linearly. Therefore, `IDiskQueue` had no
API for random reads to the DiskQueue. That ability is now required for
peeking, and thus, `IDiskQueue`'s API has been enhanced correspondingly:
``` CPP
enum class CheckHashes { NO, YES };
class IDiskQueue {
// ...
Future<Standalone<StringRef>> read(location start, location end, CheckHashes ch);
// ...
};
```
Internally, the DiskQueue adds page headers every 4K, which are stripped out
from the returned data. Therefore, the length of the result will not be the
same as `end-start`, intentionally. For this reason, the API is `(start, end)`
and not `(start, length)`.
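To see why the returned length differs from `end-start`, consider the following sketch. It assumes locations are raw byte offsets into the file and that every 4K page begins with a fixed-size header; the header size is left as a parameter because the real value is not stated here, and `payloadBytes` is a hypothetical helper rather than part of `IDiskQueue`.

```cpp
#include <cassert>
#include <cstdint>

constexpr std::uint64_t kPageSize = 4096;  // page headers occur every 4K

// Payload bytes stored before raw offset `offset`, assuming each page starts
// with `headerSize` bytes of header (an assumption of this sketch).
std::uint64_t payloadBefore(std::uint64_t offset, std::uint64_t headerSize) {
    const std::uint64_t fullPages = offset / kPageSize;
    const std::uint64_t inPage = offset % kPageSize;
    const std::uint64_t inPagePayload = inPage > headerSize ? inPage - headerSize : 0;
    return fullPages * (kPageSize - headerSize) + inPagePayload;
}

// Number of bytes a read of the raw range [start, end) would return once the
// page headers have been stripped out.
std::uint64_t payloadBytes(std::uint64_t start, std::uint64_t end, std::uint64_t headerSize) {
    assert(start <= end);
    assert(headerSize < kPageSize);
    return payloadBefore(end, headerSize) - payloadBefore(start, headerSize);
}
```

A range that stays inside one page returns `end - start` bytes, while a range that crosses a page boundary returns fewer, which is why a caller cannot derive `end` from a desired result length.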
Spilled data, when using spill-by-value, was resistant to bitrot via data being
checksummed internally within SQLite's B-tree. Now that reads can be done
directly, the responsibility for verifying data integrity falls upon the
DiskQueue. `CheckHashes::YES` will cause the DiskQueue to use the checksum in
each DiskQueue page to verify data integrity. If an externally maintained
checksum exists to verify the returned data, then `CheckHashes::NO` can be
used to elide the checksumming.
### Popping
As storage servers persist data, they send `pop(tag, version)` requests to the
transaction log to notify it that it is allowed to discard data for `tag` up
through `version`. Once all the tags have been popped from the oldest commit
in the DiskQueue, the tail of the DiskQueue can be discarded to reclaim space.
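Put differently, the DiskQueue can only be trimmed up to the minimum popped location across all tags. A small sketch, assuming each tag's popped location is already known (`diskQueuePopLocation` and the map layout are hypothetical):

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <map>

using Tag = std::uint16_t;       // hypothetical stand-in for FDB's Tag
using Location = std::uint64_t;  // hypothetical stand-in for IDiskQueue::location

// Returns the location up to which the DiskQueue can be discarded: the
// smallest location that any tag still needs. With no outstanding tags,
// everything up to the current push location can go.
Location diskQueuePopLocation(const std::map<Tag, Location>& poppedLocationPerTag,
                              Location pushLocation) {
    Location result = pushLocation;
    for (const auto& tagLocation : poppedLocationPerTag) {
        assert(tagLocation.second <= pushLocation);
        result = std::min(result, tagLocation.second);
    }
    return result;
}
```

A single tag that is never popped pins the result at its location, no matter how far the other tags have advanced.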
Each time FoundationDB goes through a recovery, it will recruit a new
generation of transaction logs. This new generation of transaction logs will
often be recruited on the same worker that hosted the previous generation's
transaction log. The old generation of transaction logs will only shut down
once all the data that they have has been fully popped. This means that there
can be multiple instances of a transaction log
*********************************************************
*                      SharedTLog                       *
*                                                       *
*    +--------+--------+--------+--------+--------+     *
*    | TLog 1 | TLog 2 | TLog 3 | TLog 4 | TLog 5 |     *
*    +--------+--------+--------+--------+--------+     *
*      ^ popping         ^spilling         ^committing  *
*********************************************************
### Transaction State Store
For FDB to perform a recovery, there is information that it needs to know about
the database, such as the configuration, worker exclusions, backup status, etc.
These values are stored into the database in the `\xff` system keyspace.
However, during a recovery, FDB can't read this data from the storage servers,
because recovery hasn't completed, so it doesn't know who the storage servers
are yet. Thus, a copy of this data is held in-memory on every proxy in the
*transaction state store*, and durably persisted as a part of commits on the
transaction logs. Being durably stored on the transaction logs means the list
of transaction logs can be fetched from the coordinators, and then used to load
the rest of the information about the database.
The in-memory storage engine writes an equal amount of mutations and snapshot
data to a queue, and when a full snapshot of the data has been written, deletes
the preceding snapshot and begins writing a new one. When backing an
in-memory storage engine with the transaction logs, the
`LogSystemDiskQueueAdapter` implements writing to a queue as committing
mutations to the transaction logs with a special tag of `txsTag`, and deleting
the preceding snapshot as popping the transaction logs for the tag of `txsTag`
until the version where the last full snapshot began.
This means that unlike every other commit that is tagged and stored on the
transaction logs, `txsTag` signifies data that is:
1. Committed to infrequently
2. Only peeked on recovery
3. Popped infrequently, and a large portion of the data is popped at once
4. A small total volume of data
The most problematic of these is the infrequent popping. Unpopped data will be
spilled after some time, and if `txsTag` data is spilled and not popped, it
will prevent the DiskQueue from being popped as well. This will cause the
DiskQueue to grow continuously. The infrequent commits and small data volume
mean that the benefits of spill-by-reference over spill-by-value don't apply
for this tag.
Thus, even when configured to spill-by-reference, `txsTag` is spilled by value.
### Disk Queue Recovery
If a transaction log dies and restarts, all commits that were in memory at the
time of the crash must be loaded back into memory. Recovery is blocked on this
process, as there might have been a commit to the transaction state store
immediately before crashing, and that data needs to be fully readable during a
recovery.
In spill-by-value, the DiskQueue only ever contained commits that were also
held in memory, and thus recovery would need to read up to 1.5GB of data. With
spill-by-reference, the DiskQueue could theoretically contain terabytes of
data. To keep recovery times bounded and low, FDB must still only read the
commits that need to be loaded back into memory.
This is done by persisting the location in the DiskQueue of the last spilled
commit to the SQLite B-Tree. This is done in the same transaction as the
spilling of that commit. This provides an always accurate pointer to where
data that needs to be loaded into memory begins. The pointer is to the
beginning of the last commit rather than the end, to make sure that the pointer
is always contained within the DiskQueue. This provides extra sanity checking
on the validity of the DiskQueue's contents at recovery, at the cost of
potentially reading 10MB more than what would be required.
## Testing
Correctness bugs in spilling would manifest as data corruption, which is well covered by simulation.
The only special testing code added was to enable changing `log_spill` in `ConfigureTest`.
This covers switching between spilling methods in the presence of faults.
An `ASSERT` was added to simulation that verifies that commits read from the
DiskQueue on recovery are only the commits which have not been spilled.
The rest of the testing is to take a physical cluster and try the extremes that
can only happen at scale:
* Verify that recovery times are not impacted when a large amount of data is spilled
* Verify that long running tests hit a steady state of memory usage (and thus there are likely no leaks).
* Plot how quickly (MB/s) a remote datacenter can catch up in old vs new spilling strategy
* See what happens when there's 1 tlog and more than 100 storage servers.
* Verify that peek requests get limited
* See if tlog commits can get starved by excessive peeking
[fdbsummit-technical-overview]: https://www.youtube.com/watch?v=EMwhsGsxfPU
# TLog Spill-By-Reference Operational Guide
## Notable Behavior Changes
TL;DR: Spilling involves less IOPS and is faster. Peeking involves more IOPS and is slower. Popping involves >0 IOPS.
### Spilling
The most notable effect of the spilling changes is that the Disk Queue files
will now grow to potentially terabytes in size.
1. Spilling will occur in larger batches, which will result in a more
sawtooth-like `BytesInput - BytesDurable` value. I'm not aware that this will have any meaningful impact.
* Disk queue files will grow when spilling is happening
* Alerting based on DQ file size is no longer appropriate
As a curious aside, with spill-by-value, throughput decreased as spilled volume
increased, which quite possibly worked as accidental backpressure. That effect
no longer exists, which means write-heavy workloads can drown storage servers
faster than before.
### Peeking
Peeking has seen tremendous changes. It involves more IO operations and memory usage.
The expected implications of this are:
1. A peek of spilled data will involve a burst of IO operations.
Theoretically, this burst can drown out queued write operations to disk,
thus slowing down TLog commits. This hasn't been observed in testing.
Low IOPS devices, such as HDD or network attached storage, would struggle
more here than locally attached SSD.
2. Generating a peek response of 150KB could require reading 100MB of data, and allocating buffers to hold that 100MB.
OOMs were observed in early testing. Code has been added to specifically
limit how much memory can be allocated for serving a single peek request
and all concurrent peek requests, with knobs to allow tuning this per
deployment configuration.
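The memory limiting described in item 2 can be pictured as a simple budget with a per-request cap and a global cap. The sketch below is an illustration under assumptions, not the mechanism the TLog actually uses: `PeekMemoryBudget` and its methods are hypothetical, and a real implementation would queue a stalled request rather than return zero.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Toy budget for peek read buffers: one cap per request, one cap overall.
class PeekMemoryBudget {
public:
    PeekMemoryBudget(std::size_t totalLimit, std::size_t perRequestLimit)
      : totalLimit_(totalLimit), perRequestLimit_(perRequestLimit) {}

    // Tries to reserve memory for one peek. The request is capped at the
    // per-request limit; returns the bytes granted, or 0 if the peek would
    // have to wait for other peeks to release memory first.
    std::size_t tryReserve(std::size_t wanted) {
        const std::size_t capped = std::min(wanted, perRequestLimit_);
        if (capped > totalLimit_ - reserved_) return 0;
        reserved_ += capped;
        return capped;
    }

    // Returns memory to the budget once the peek reply has been sent.
    void release(std::size_t bytes) {
        assert(bytes <= reserved_);
        reserved_ -= bytes;
    }

    std::size_t reserved() const { return reserved_; }

private:
    std::size_t totalLimit_;
    std::size_t perRequestLimit_;
    std::size_t reserved_ = 0;
};
```

In this picture, the total limit plays the role that `TLOG_SPILL_REFERENCE_MAX_PEEK_MEMORY_BYTES` plays among the knobs listed later.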
### Popping
Popping will transition from being an only in-memory operation to one that
involves reads from disk.
Due to a strange quirk, TLogs will allocate up to 2GB of memory as a read cache
for SQLite's B-tree. The expected maximum size of the B-tree has drastically
reduced, so these reads should almost never actually hit disk. The number of
writes to disk will stay the same, so performance should stay unchanged.
### Disk Queues
This work should have a minimal impact on recovery times, which is why recovery
hasn't been significantly mentioned in this document. However, there are a few
minor impacts, mostly on recovery times:
1. Larger disk queue file means more file to zero out in the case of recovery.
This should be negligible when fallocate `ZERO_RANGE` is available, because then it's only a metadata operation.
2. A larger file means more bisection iterations to find the first page.
If we say Disk Queue files are typically ~4GB now, and people are unlikely
to have more than 4TB drives, then this means in the worst case, roughly
another 10 sequential IOs (log2 of the ~1000x growth) will need to be done
when first recovering a disk queue file to find the most recent page with a
binary search.
If this turns out to be an issue, it's trivial to address. There's no
reason to do only a binary search when drives support parallel requests. A
32-way search could reasonably be done, and would make a 4TB Disk
Queue file faster to recover than a 4GB one currently.
3. Disk queue files can now shrink.
The particular logic currently used is that:
If one file is significantly larger than the other file, then it will be
truncated to the size of the other file. This resolves situations where a
particular storage server or remote DC being down causes one DiskQueue file
to be grown to a massive size, and then the data is rapidly popped.
Otherwise, if the files are of reasonably similar size, then we'll take
`pushLocation - popLocation` as the number of "active" bytes, and then
shrink the file by `TLOG_DISK_QUEUE_SHRINK_BYTES` bytes if the file is
larger than `active + TLOG_DISK_QUEUE_EXTENSION_BYTES + TLOG_DISK_QUEUE_SHRINK_BYTES`.
!!! note
While writing this, I've realized it's probably a good idea to prevent
the disk queue from shrinking below 4GB in size, to avoid size thrashing on
bursty workloads.
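The shrink rule from item 3 can be written down as a small pure function. This is a sketch of the rule as described, not the DiskQueue code: `sizeAfterShrink` and `ShrinkKnobs` are hypothetical names, and since "significantly larger" is not quantified above, the `significantRatio` threshold is an assumption.

```cpp
#include <cassert>
#include <cstdint>

struct ShrinkKnobs {
    std::uint64_t extensionBytes;    // TLOG_DISK_QUEUE_EXTENSION_BYTES
    std::uint64_t shrinkBytes;       // TLOG_DISK_QUEUE_SHRINK_BYTES
    std::uint64_t significantRatio;  // hypothetical: "significantly larger" means > ratio x
};

// Returns the size a DiskQueue file should have after one shrink decision.
// activeBytes is pushLocation - popLocation.
std::uint64_t sizeAfterShrink(std::uint64_t fileSize,
                              std::uint64_t otherFileSize,
                              std::uint64_t activeBytes,
                              const ShrinkKnobs& knobs) {
    assert(knobs.significantRatio >= 1);
    // Case 1: one file dwarfs the other, so truncate it to the other's size.
    if (fileSize > otherFileSize * knobs.significantRatio) {
        return otherFileSize;
    }
    // Case 2: similar sizes, so shrink gradually while there is enough slack.
    if (fileSize > activeBytes + knobs.extensionBytes + knobs.shrinkBytes) {
        return fileSize - knobs.shrinkBytes;
    }
    return fileSize;
}
```

The slack term of `extensionBytes + shrinkBytes` means a file is never shrunk so far that the next extension would immediately grow it again.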
## Knobs
`REFERENCE_SPILL_UPDATE_STORAGE_BYTE_LIMIT`
: How many bytes of mutations should be spilled at once in a spill-by-reference TLog.<br>
Increasing it could increase throughput in spilling regimes.<br>
Decreasing it will decrease how sawtooth-like TLog memory usage is.<br>
`TLOG_UPDATE_STORAGE`
: How many bytes of mutations should be spilled at once in a spill-by-value TLog.<br>
This knob is pre-existing, and has only been "changed" to only apply to spill-by-value.<br>
`TLOG_SPILL_REFERENCE_MAX_BATCHES_PER_PEEK`
: How many batches of spilled data indexes should be read from disk to serve one peek request.<br>
Increasing it will potentially increase the throughput of peek requests.<br>
Decreasing it will decrease the number of read IOs done per peek request.<br>
`TLOG_SPILL_REFERENCE_MAX_BYTES_PER_BATCH`
: How many bytes a batch of spilled data indexes can be.<br>
Increasing it will increase TLog throughput while spilling.<br>
Decreasing it will decrease the latency and increase the throughput of peek requests.<br>
`TLOG_SPILL_REFERENCE_MAX_PEEK_MEMORY_BYTES`
: How many bytes of memory can be allocated to hold the results of reads from disk to respond to peek requests.<br>
Increasing it will increase the number of parallel peek requests a TLog can handle at once.<br>
Decreasing it will reduce TLog memory usage.<br>
If increased, `--max_memory` should be increased by the same amount.<br>
`TLOG_DISK_QUEUE_EXTENSION_BYTES`
: When a DiskQueue needs to extend a file, by how many bytes should it extend the file.<br>
Increasing it will reduce metadata operations done to the drive, and likely tail commit latency.<br>
Decreasing it will reduce allocated but unused space in the DiskQueue files.<br>
Note that this was previously hardcoded to 20MB, and is only being promoted to a knob.<br>
`TLOG_DISK_QUEUE_SHRINK_BYTES`
: If a DiskQueue file has extra space left when switching to the other file, by how many bytes should it be shrunk.<br>
Increasing this will cause disk space to be returned to the OS faster.<br>
Decreasing this will decrease TLog tail latency due to filesystem metadata updates.<br>
## Observability
With the new changes, we must ensure that sufficient information has been exposed such that:
1. If something goes wrong in production, we can understand what and why from trace logs.
2. We can understand if the TLog is performing suboptimally, and if so, which knob we should change and by how much.
All of the below are planned to be additions to TLogMetrics.
### Spilling
!!! warning
Only metrics above this line have been implemented.
`SpillReferenceBatchSize`
: Stats about the total size of batches written to the B-tree, excluding `txsTag`.
`SpillReferenceTagCount`
: Stats about the number of distinct tags that have been spilled on each loop iteration, excluding `txsTag`.
`SpillReferenceIterationCount`
: The number of times we committed data to the B-tree to spill.
### Peeking
`PeekMemoryRequestsStalled`
: The number of peek requests that are blocked on acquiring memory for reads.
`PeekMemoryReserved`
: The amount of memory currently reserved for serving peek requests.
!!! warning
Only metrics above this line have been implemented.
`PeekMemoryAverage`
: The average amount of memory allocated per peek of spilled data.
`PeekMemoryLimitHit`
: The number of times a peek was cut short due to hitting the maximum memory limit.
`PeekReferenceSpilledCount`
: The number of times a peek request required reading spilled data from the disk queue.
`PeekReferenceReadAmp`
: Stats about the read amplification encountered.
### Popping
!!! warning
Only metrics above this line have been implemented.
`DQOldestVersion`
: The oldest version that's still useful.
`BytesPopped`
: The total bytes discarded from the queue *and* the `IDiskQueue::location` of the first useful byte.
`OldestUnpoppedTag`
: The tag that's preventing the DiskQueue from being further popped.
### Disk Queue
!!! warning
Only metrics above this line have been implemented.
`DiskQueueExcessBytes`
: The number of bytes that the disk queue doesn't need, and will truncate to free over time. This should be roughly equal to `BytesInput - BytesPopped`, but computed at a different layer.
## Monitoring and Alerting
To answer questions like:
1. What new graphs should exist?
2. What old graphs might exist that would no longer be meaningful?
3. What alerts might exist that need to be changed?
4. What alerts should be created?
Of which I'm aware:
* Any current alerts on "Disk Queue files more than [constant size] GB" will need to be removed.
* Any alerting or monitoring of `log*.sqlite` as an indication of spilling will no longer be effective.
* A graph of `BytesInput - BytesPopped` will give an idea of the number of "active" bytes in the DiskQueue file.
# Appendix
## Experiments
### SQLite B-tree costs
| Experiment | result |
|-------------------------------|---------|
| Baseline | 60 MB/s |
| Write value to B-tree | 20 MB/s |
| Write pointer to B-tree | 20 MB/s |
| " once per spill loop | 40 MB/s |
| " spill 10x the data per loop | 60 MB/s |
### Spilling speeds
2. speed over time with 1 storage server dead
<!-- Force long-style table of contents -->
<script>window.markdeepOptions={}; window.markdeepOptions.tocStyle="long";</script>
<!-- When printed, top level section headers should force page breaks -->
<style>.md h1, .md .nonumberh1 {page-break-before:always}</style>
<!-- Markdeep: -->
<style class="fallback">body{visibility:hidden;white-space:pre;font-family:monospace}</style><script src="markdeep.min.js" charset="utf-8"></script><script src="https://casual-effects.com/markdeep/latest/markdeep.min.js" charset="utf-8"></script><script>window.alreadyProcessedMarkdeep||(document.body.style.visibility="visible")</script>