Merge remote-tracking branch 'apple/main' into fix

This commit is contained in:
Jingyu Zhou 2023-01-06 14:16:28 -08:00
commit d1e5018f60
13 changed files with 157 additions and 34 deletions

View File

@ -407,6 +407,11 @@ func (o DatabaseOptions) SetTransactionIncludePortInAddress() error {
return o.setOpt(505, nil)
}
// Set a random idempotency id for all transactions. See the transaction option description for more information. This feature is in development and not ready for general use.
func (o DatabaseOptions) SetTransactionAutomaticIdempotency() error {
return o.setOpt(506, nil)
}
// Allows ``get`` operations to read from sections of keyspace that have become unreadable because of versionstamp operations. This sets the ``bypass_unreadable`` option of each transaction created by this database. See the transaction option description for more information.
func (o DatabaseOptions) SetTransactionBypassUnreadable() error {
return o.setOpt(700, nil)
@ -571,6 +576,11 @@ func (o TransactionOptions) SetSizeLimit(param int64) error {
return o.setOpt(503, int64ToBytes(param))
}
// Automatically assign a random 16 byte idempotency id for this transaction. Prevents commits from failing with ``commit_unknown_result``. WARNING: If you are also using the multiversion client or transaction timeouts, if either cluster_version_changed or transaction_timed_out was thrown during a commit, then that commit may have already succeeded or may succeed in the future. This feature is in development and not ready for general use.
func (o TransactionOptions) SetAutomaticIdempotency() error {
return o.setOpt(505, nil)
}
// Snapshot read operations will see the results of writes done in the same transaction. This is the default behavior.
func (o TransactionOptions) SetSnapshotRywEnable() error {
return o.setOpt(600, nil)

View File

@ -6,12 +6,22 @@ As an intermediate goal, I plan to introduce this feature disabled by default. T
# API
Introduce a new transaction option `IDEMPOTENCY_ID`, which will be validated to be at most 255 bytes.
Introduce a new transaction option `AUTOMATIC_IDEMPOTENCY`, which sets the transaction's idempotency id to 16 random bytes if there's no idempotency id yet. Setting this also instructs the fdb client to manage the full lifecycle of the idempotency id (expiring the id automatically after a commit acknowledgement).
Introduce a new transaction option `IDEMPOTENCY_ID`, which will be validated to be at most 255 bytes. This option sets the idempotency id for the transaction, and also instructs the fdb client _not to_ expire this id automatically. The user is responsible for calling `fdb_database_expire_idempotency_id` to expire it.
Add
```
FDBFuture* fdb_transaction_commit_result(FDBTransaction* tr, uint8_t const* idempotency_id, int idempotency_id_length)
FDBFuture* fdb_database_commit_result(FDBDatabase* db, uint8_t const* idempotency_id, int idempotency_id_length, int64_t read_snapshot)
```
, which can be used to determine the result of a commit that failed with `transaction_timed_out`.
, which can be used to determine the result of a commit with the given read snapshot and idempotency id. The read snapshot is the read version of the last transaction attempt; it is used to narrow down the keyspace that must be searched for the idempotency id.
Add
```
void fdb_database_expire_idempotency_id(FDBDatabase* db, uint8_t const* idempotency_id, int idempotency_id_length)
```
This lets the client know that the caller is done with this idempotency id and that the cluster may purge it.
Commits for transactions with idempotency ids would not fail with `commit_unknown_result`, but in (extremely) rare cases could fail with a new error that clients are expected to handle by restarting the process.
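The lifecycle the API above describes (assign an id, commit, determine the commit result, expire the id) can be sketched with a minimal in-memory model. The `MiniCluster` class and its method names below are hypothetical illustrations of the semantics, not part of the fdb API:

```python
import os

# Hypothetical in-memory stand-in for a cluster, illustrating the
# idempotency-id lifecycle: commit once, detect duplicates, expire.
class MiniCluster:
    def __init__(self):
        self.data = {}
        self.idempotency_ids = set()  # ids of successfully committed transactions

    def commit(self, idempotency_id, mutations):
        # A retry of an already-committed transaction is a no-op.
        if idempotency_id in self.idempotency_ids:
            return False  # already committed; duplicate suppressed
        self.data.update(mutations)
        self.idempotency_ids.add(idempotency_id)
        return True

    def commit_result(self, idempotency_id):
        # Analogue of fdb_database_commit_result: did this id commit?
        return idempotency_id in self.idempotency_ids

    def expire(self, idempotency_id):
        # Analogue of fdb_database_expire_idempotency_id: the caller is done
        # with this id, so the cluster may reclaim its storage.
        self.idempotency_ids.discard(idempotency_id)

cluster = MiniCluster()
tr_id = os.urandom(16)  # AUTOMATIC_IDEMPOTENCY uses 16 random bytes
cluster.commit(tr_id, {b"k": b"v"})
# Simulated retry after an unknown commit result: the id reveals success,
# and replaying the commit does not apply the mutations a second time.
assert cluster.commit_result(tr_id)
assert cluster.commit(tr_id, {b"k": b"v"}) is False
cluster.expire(tr_id)
assert not cluster.commit_result(tr_id)
```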
# Background
@ -49,10 +59,10 @@ in that id and the cluster can reclaim the space used to store the idempotency
id. The commit proxy that committed a batch is responsible for cleaning all
idempotency kv pairs from that batch, so clients must tell that specific proxy
that they're done with the id. The first proxy will also periodically clean up
the oldest idempotency ids, based on a policy determined by two knobs. One knob
will control the minimum lifetime of an idempotency id (i.e. don't delete
anything younger than 1 day), and the other will control the target byte size of
the idempotency keys (e.g. keep 100 MB of idempotency keys around).
the oldest idempotency ids, based on a policy determined by knobs. The knob
`IDEMPOTENCY_IDS_MIN_AGE_SECONDS` controls the minimum lifetime of an
idempotency id (i.e. don't delete anything younger than 1 day). More knobs may
be considered in the future.
# Commit protocol
@ -76,13 +86,29 @@ If a transaction learns that it has been in-flight so long that its idempotency
# Considerations
- Additional storage space on the cluster. This can be controlled directly via an idempotency id target bytes knob/config.
- Potential write hot spot.
- Additional storage space on the cluster for the idempotency ids.
- Potential write hot spot, since all idempotency id writes are adjacent and monotonically increasing.
# Multi-version client
The multi-version client will generate its own idempotency id for a transaction and manage its lifecycle. It will duplicate the logic in NativeApi to achieve the same guarantees. As part of this change we will also ensure that the previous commit attempt is no longer in-flight before allowing the commit future to become ready. This will fix a potential "causal-write-risky" issue if a commit attempt fails with `cluster_version_changed`.
# Implementation
```
FDBFuture* fdb_database_commit_result(FDBDatabase* db, uint8_t const* idempotency_id, int idempotency_id_length, int64_t read_snapshot)
```
The implementation will commit a dummy transaction that conflicts with transactions that have this idempotency_id. (Recall that the idempotency_id is added to the conflict ranges, so any transaction with the same idempotency id conflicts with the dummy transaction.) This ensures that no transactions with this idempotency_id are in flight.
The implementation would then search storage servers for the idempotency id to determine the commit result. The range of keys that need to be read is bounded by the range of possible commit versions for a transaction with `read_snapshot`.
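The bounded search can be sketched over a sorted list of (commit version, idempotency id) pairs, mirroring keys that sort by commit version on storage servers. The names `MAX_COMMIT_WINDOW` and `find_commit` are illustrative, not part of the implementation:

```python
import bisect

# Idempotency entries as (commit_version, idempotency_id), kept sorted by
# version -- mirroring keys that sort by commit version on storage servers.
entries = [(100, b"a"), (150, b"b"), (220, b"c"), (400, b"d")]

# Illustrative bound: a transaction that read at version `read_snapshot`
# can only have committed within a limited window of versions after it.
MAX_COMMIT_WINDOW = 200

def find_commit(entries, idempotency_id, read_snapshot):
    # A commit version is strictly greater than the transaction's read
    # version, so only (read_snapshot, read_snapshot + MAX_COMMIT_WINDOW]
    # needs to be scanned, rather than the whole idempotency keyspace.
    high_id = b"\xff" * 255  # ids are at most 255 bytes
    lo = bisect.bisect_right(entries, (read_snapshot, high_id))
    hi = bisect.bisect_right(entries, (read_snapshot + MAX_COMMIT_WINDOW, high_id))
    for version, tid in entries[lo:hi]:
        if tid == idempotency_id:
            return version  # the transaction committed at this version
    return None  # not found in the window: the transaction did not commit

assert find_commit(entries, b"c", 120) == 220
assert find_commit(entries, b"d", 120) is None  # version 400 is outside the window
```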
```
void fdb_database_expire_idempotency_id(FDBDatabase* db, uint8_t const* idempotency_id, int idempotency_id_length)
```
The fdb client would need to keep track of which commit proxies are responsible for recently committed transactions with idempotency ids, so it knows where to direct the expire requests. Proactively expiring idempotency ids is done on a best-effort basis, but this needs to work well enough that the storage used by the cluster for idempotency ids remains acceptable. Users who are not using `AUTOMATIC_IDEMPOTENCY` are expected to call `fdb_database_expire_idempotency_id` to do the proactive cleanup.
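The client-side bookkeeping this implies can be sketched as a small routing table; `ExpireRouter` and its method names are hypothetical, and the fallback on a miss is the periodic age-based cleanup described earlier:

```python
# Hypothetical client-side bookkeeping: remember which commit proxy
# committed each idempotency id so expire requests can be routed to it.
class ExpireRouter:
    def __init__(self):
        self.proxy_for_id = {}

    def record_commit(self, idempotency_id, proxy):
        self.proxy_for_id[idempotency_id] = proxy

    def expire(self, idempotency_id, send):
        # Best effort: if we no longer know the responsible proxy, skip the
        # request and rely on the cluster's periodic age-based cleanup.
        proxy = self.proxy_for_id.pop(idempotency_id, None)
        if proxy is not None:
            send(proxy, idempotency_id)
            return True
        return False

sent = []
router = ExpireRouter()
router.record_commit(b"id-1", "proxy-0")
assert router.expire(b"id-1", lambda p, i: sent.append((p, i)))
assert sent == [("proxy-0", b"id-1")]
assert not router.expire(b"id-2", lambda p, i: sent.append((p, i)))
```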
# Experiments
- Initial experiments show about a 1% overhead for the worst-case workload, which is transactions that only update a single key.

View File

@ -0,0 +1,81 @@
#####################
Automatic Idempotency
#####################
.. _automatic-idempotency:
.. warning:: Automatic idempotency is currently experimental and not recommended for use in production.
Synopsis
~~~~~~~~
Use the ``automatic_idempotency`` transaction option to prevent commits from
failing with ``commit_unknown_result`` at a small performance cost.
``transaction_timed_out`` and ``cluster_version_changed`` still indicate an
unknown commit status.
Details
~~~~~~~
Transactions are generally run in retry loops that retry the error
``commit_unknown_result``. When an attempt fails with
``commit_unknown_result``, it's possible that the attempt succeeded, and the
retry will perform the effect of the transaction twice! This behavior can be
surprising at first, and difficult to reason about.
As an example, consider this simple transaction::

    @fdb.transactional
    def atomic_increment(tr, key):
        tr.add(key, struct.pack("<q", 1))
This transaction appears to be correct, and behaves as expected most of the
time, but incorrectly adds to the key multiple times if
``commit_unknown_result`` is thrown.
To mitigate this, it's common to write something unique to a transaction, so
that retries can detect whether or not a commit succeeded by reading the
database and looking for the unique change.
For example::

    def atomic_increment(db, key):
        tr_id = uuid.UUID(int=random.getrandbits(128)).bytes
        # tr_id can be used to detect whether or not this transaction has already committed
        @fdb.transactional
        def do_it(tr):
            if tr[b"ids/" + tr_id] == None:
                tr[b"ids/" + tr_id] = b""
                tr.add(key, struct.pack("<q", 1))
        do_it(db)
        # Clean up unique state
        del db[b"ids/" + tr_id]
This approach has several problems:
#. We're now doing about twice as much work as before, and we need an extra transaction to clean up.
#. This will slowly leak space over time if clients fail before cleaning up their unique state.
Automatic Idempotency is an implementation of a conceptually similar pattern,
but taking advantage of fdb implementation details to provide better
performance and clean up after itself. Here's our atomic increment with Automatic Idempotency::

    @fdb.transactional
    def atomic_increment(tr, key):
        tr.options.set_automatic_idempotency()
        tr.add(key, struct.pack("<q", 1))
This will correctly add exactly once, provided you allow it to retry until success.
Caveats
~~~~~~~
Automatic Idempotency only prevents ``commit_unknown_result``, and it does not
prevent ``transaction_timed_out`` or ``cluster_version_changed``.
Each of these can indicate an unknown commit status. To mitigate this, the
current recommendation is *do not* retry transactions that fail with
``transaction_timed_out``. The default retry loop does not retry
``transaction_timed_out``, so this is mostly relevant if you have a custom retry
loop. This feature is not recommended for users of the :ref:`multi-version-client-api` at
this time since it does not prevent ``cluster_version_changed``. Support may be added in the future.

View File

@ -28,6 +28,8 @@ FoundationDB supports language bindings for application development using the or
* :doc:`tenants` describes the use of the tenants feature to define named transaction domains.
* :doc:`automatic-idempotency` describes the use of a transaction option to prevent transactions from failing with ``commit_unknown_result``.
.. toctree::
:maxdepth: 1
:titlesonly:
@ -45,3 +47,4 @@ FoundationDB supports language bindings for application development using the or
transaction-profiler-analyzer
api-version-upgrade-guide
tenants
automatic-idempotency

View File

@ -606,6 +606,8 @@ The following example illustrates both techniques. Together, they make a transac
balanceKey = fdb.tuple.pack(('account', acctId))
tr.add(balanceKey, amount)
There is experimental support for preventing ``commit_unknown_result`` altogether using a transaction option. See :doc:`automatic-idempotency` for more details. Note: there are other errors which indicate an unknown commit status. See :ref:`non-retryable errors`.
.. _conflict-ranges:
Conflict ranges
@ -954,6 +956,10 @@ The ``commit_unknown_result`` Error
However, there is one guarantee FoundationDB gives to the caller: at the point of time where you receive this error, the transaction either committed or not and if it didn't commit, it will never commit in the future. Or: it is guaranteed that the transaction is not in-flight anymore. This is an important guarantee as it means that if your transaction is idempotent you can simply retry. For more explanations see developer-guide-unknown-results_.
There is experimental support for preventing ``commit_unknown_result`` altogether using a transaction option. See :doc:`automatic-idempotency` for more details. Note: there are other errors which indicate an unknown commit status. See :ref:`non-retryable errors`.
.. _non-retryable errors:
Non-Retryable Errors
~~~~~~~~~~~~~~~~~~~~

View File

@ -123,7 +123,6 @@ struct ClientDBInfo {
Optional<Value> forward;
std::vector<VersionHistory> history;
UID clusterId;
bool isEncryptionEnabled = false;
Optional<EncryptKeyProxyInterface> encryptKeyProxy;
TenantMode tenantMode;
@ -147,7 +146,6 @@ struct ClientDBInfo {
forward,
history,
tenantMode,
isEncryptionEnabled,
encryptKeyProxy,
clusterId,
clusterType,

View File

@ -209,8 +209,7 @@ description is not currently required but encouraged.
defaultFor="23"/>
<Option name="transaction_automatic_idempotency" code="506"
description="Set a random idempotency id for all transactions. See the transaction option description for more information. This feature is in development and not ready for general use."
defaultFor="505"
hidden="true"/>
defaultFor="505" />
<Option name="transaction_bypass_unreadable" code="700"
description="Allows ``get`` operations to read from sections of keyspace that have become unreadable because of versionstamp operations. This sets the ``bypass_unreadable`` option of each transaction created by this database. See the transaction option description for more information."
defaultFor="1100"/>
@ -290,8 +289,7 @@ description is not currently required but encouraged.
description="Associate this transaction with this ID for the purpose of checking whether or not this transaction has already committed. Must be at least 16 bytes and less than 256 bytes. This feature is in development and not ready for general use. Unless the automatic_idempotency option is set after this option, the client will not automatically attempt to remove this id from the cluster after a successful commit."
hidden="true" />
<Option name="automatic_idempotency" code="505"
description="Automatically assign a random 16 byte idempotency id for this transaction. Prevents commits from failing with ``commit_unknown_result``. WARNING: If you are also using the multiversion client or transaction timeouts, if either cluster_version_changed or transaction_timed_out was thrown during a commit, then that commit may have already succeeded or may succeed in the future. This feature is in development and not ready for general use."
hidden="true" />
description="Automatically assign a random 16 byte idempotency id for this transaction. Prevents commits from failing with ``commit_unknown_result``. WARNING: If you are also using the multiversion client or transaction timeouts, if either cluster_version_changed or transaction_timed_out was thrown during a commit, then that commit may have already succeeded or may succeed in the future. This feature is in development and not ready for general use." />
<Option name="snapshot_ryw_enable" code="600"
description="Snapshot read operations will see the results of writes done in the same transaction. This is the default behavior." />
<Option name="snapshot_ryw_disable" code="601"

View File

@ -135,7 +135,6 @@ ACTOR Future<Void> clusterWatchDatabase(ClusterControllerData* cluster,
dbInfo.myLocality = db->serverInfo->get().myLocality;
dbInfo.client = ClientDBInfo();
dbInfo.client.encryptKeyProxy = db->serverInfo->get().encryptKeyProxy;
dbInfo.client.isEncryptionEnabled = SERVER_KNOBS->ENABLE_ENCRYPTION;
dbInfo.client.tenantMode = TenantAPI::tenantModeForClusterType(db->clusterType, db->config.tenantMode);
dbInfo.client.clusterId = db->serverInfo->get().client.clusterId;
dbInfo.client.clusterType = db->clusterType;
@ -949,7 +948,6 @@ void clusterRegisterMaster(ClusterControllerData* self, RegisterMasterRequest co
if (db->clientInfo->get().commitProxies != req.commitProxies ||
db->clientInfo->get().grvProxies != req.grvProxies ||
db->clientInfo->get().tenantMode != db->config.tenantMode ||
db->clientInfo->get().isEncryptionEnabled != SERVER_KNOBS->ENABLE_ENCRYPTION ||
db->clientInfo->get().clusterId != db->serverInfo->get().client.clusterId ||
db->clientInfo->get().clusterType != db->clusterType ||
db->clientInfo->get().metaclusterName != db->metaclusterName ||
@ -962,7 +960,6 @@ void clusterRegisterMaster(ClusterControllerData* self, RegisterMasterRequest co
.detail("ReqCPs", req.commitProxies)
.detail("TenantMode", db->clientInfo->get().tenantMode.toString())
.detail("ReqTenantMode", db->config.tenantMode.toString())
.detail("EncryptionEnabled", SERVER_KNOBS->ENABLE_ENCRYPTION)
.detail("ClusterId", db->serverInfo->get().client.clusterId)
.detail("ClientClusterId", db->clientInfo->get().clusterId)
.detail("ClusterType", db->clientInfo->get().clusterType)
@ -974,7 +971,6 @@ void clusterRegisterMaster(ClusterControllerData* self, RegisterMasterRequest co
ClientDBInfo clientInfo;
clientInfo.encryptKeyProxy = db->serverInfo->get().encryptKeyProxy;
clientInfo.id = deterministicRandom()->randomUniqueID();
clientInfo.isEncryptionEnabled = SERVER_KNOBS->ENABLE_ENCRYPTION;
clientInfo.commitProxies = req.commitProxies;
clientInfo.grvProxies = req.grvProxies;
clientInfo.tenantMode = TenantAPI::tenantModeForClusterType(db->clusterType, db->config.tenantMode);
@ -3571,8 +3567,10 @@ TEST_CASE("/fdbserver/clustercontroller/shouldTriggerRecoveryDueToDegradedServer
data.degradationInfo.degradedServers.insert(satelliteTlog);
ASSERT(!data.shouldTriggerRecoveryDueToDegradedServers());
data.degradationInfo.degradedServers.clear();
// Trigger recovery when satellite Tlog is disconnected.
data.degradationInfo.disconnectedServers.insert(satelliteTlog);
ASSERT(!data.shouldTriggerRecoveryDueToDegradedServers());
ASSERT(data.shouldTriggerRecoveryDueToDegradedServers());
data.degradationInfo.disconnectedServers.clear();
// No recovery when remote tlog is degraded.

View File

@ -402,7 +402,6 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
notify[i].send(newInfo);
notify.clear();
ClientDBInfo outInfo;
outInfo.isEncryptionEnabled = SERVER_KNOBS->ENABLE_ENCRYPTION;
outInfo.id = deterministicRandom()->randomUniqueID();
outInfo.forward = req.conn.toString();
clientData.clientInfo->set(CachedSerialization<ClientDBInfo>(outInfo));
@ -642,7 +641,6 @@ ACTOR Future<Void> leaderServer(LeaderElectionRegInterface interf,
Optional<LeaderInfo> forward = regs.getForward(req.clusterKey);
if (forward.present()) {
ClientDBInfo info;
info.isEncryptionEnabled = SERVER_KNOBS->ENABLE_ENCRYPTION;
info.id = deterministicRandom()->randomUniqueID();
info.forward = forward.get().serializedInfo;
req.reply.send(CachedSerialization<ClientDBInfo>(info));

View File

@ -1297,7 +1297,7 @@ ACTOR static Future<Void> startMoveShards(Database occ,
dataMove.setPhase(DataMoveMetaData::Deleting);
tr.set(dataMoveKeyFor(dataMoveId), dataMoveValue(dataMove));
wait(tr.commit());
throw data_move_cancelled();
throw movekeys_conflict();
}
if (dataMove.getPhase() == DataMoveMetaData::Deleting) {
TraceEvent(SevVerbose, "StartMoveShardsDataMove", relocationIntervalId)
@ -1313,7 +1313,7 @@ ACTOR static Future<Void> startMoveShards(Database occ,
begin = dataMove.ranges.front().end;
} else {
if (cancelDataMove) {
throw data_move_cancelled();
throw movekeys_conflict();
}
dataMove = DataMoveMetaData();
dataMove.id = dataMoveId;

View File

@ -8030,7 +8030,8 @@ public:
// Run the destructive sanity check, but don't throw.
ErrorOr<Void> err = wait(errorOr(self->m_tree->clearAllAndCheckSanity()));
// If the test threw an error, it must be an injected fault or something has gone wrong.
ASSERT(!err.isError() || err.getError().isInjectedFault());
ASSERT(!err.isError() || err.getError().isInjectedFault() ||
err.getError().code() == error_code_unexpected_encoding_type);
}
} else {
// The KVS user shouldn't be holding a commit future anymore so self shouldn't either.

View File

@ -3168,14 +3168,20 @@ public:
// Whether the transaction system (in primary DC if in HA setting) contains degraded servers.
bool transactionSystemContainsDegradedServers() {
const ServerDBInfo& dbi = db.serverInfo->get();
auto transactionWorkerInList = [&dbi](const std::unordered_set<NetworkAddress>& serverList) -> bool {
auto transactionWorkerInList = [&dbi](const std::unordered_set<NetworkAddress>& serverList,
bool skipSatellite) -> bool {
for (const auto& server : serverList) {
if (dbi.master.addresses().contains(server)) {
return true;
}
for (const auto& logSet : dbi.logSystemConfig.tLogs) {
if (!logSet.isLocal || logSet.locality == tagLocalitySatellite) {
if (!logSet.isLocal) {
// We don't check server degradation for remote TLogs since it is not on the transaction system
// critical path.
continue;
}
if (skipSatellite && logSet.locality == tagLocalitySatellite) {
continue;
}
for (const auto& tlog : logSet.tLogs) {
@ -3207,8 +3213,10 @@ public:
return false;
};
return transactionWorkerInList(degradationInfo.degradedServers) ||
transactionWorkerInList(degradationInfo.disconnectedServers);
// Check if the transaction system contains degraded/disconnected servers. For satellites, we only check for
// disconnection, since the latency between the primary and satellite is across a WAN and may not be very stable.
return transactionWorkerInList(degradationInfo.degradedServers, /*skipSatellite=*/true) ||
transactionWorkerInList(degradationInfo.disconnectedServers, /*skipSatellite=*/false);
}
// Whether transaction system in the remote DC, e.g. log router and tlogs in the remote DC, contains degraded
@ -3450,7 +3458,6 @@ public:
serverInfo.masterLifetime.ccID = id;
serverInfo.clusterInterface = ccInterface;
serverInfo.myLocality = locality;
serverInfo.client.isEncryptionEnabled = SERVER_KNOBS->ENABLE_ENCRYPTION;
db.serverInfo->set(serverInfo);
cx = openDBOnServer(db.serverInfo, TaskPriority::DefaultEndpoint, LockAware::True);

View File

@ -2141,9 +2141,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
.detail("BlobMigratorID",
localInfo.blobMigrator.present() ? localInfo.blobMigrator.get().id() : UID())
.detail("EncryptKeyProxyID",
localInfo.encryptKeyProxy.present() ? localInfo.encryptKeyProxy.get().id() : UID())
.detail("IsEncryptionEnabled", localInfo.client.isEncryptionEnabled);
localInfo.encryptKeyProxy.present() ? localInfo.encryptKeyProxy.get().id() : UID());
dbInfo->set(localInfo);
}
errorForwarders.add(
@ -3654,7 +3652,6 @@ ACTOR Future<Void> fdbd(Reference<IClusterConnectionRecord> connRecord,
auto asyncPriorityInfo =
makeReference<AsyncVar<ClusterControllerPriorityInfo>>(getCCPriorityInfo(fitnessFilePath, processClass));
auto serverDBInfo = ServerDBInfo();
serverDBInfo.client.isEncryptionEnabled = SERVER_KNOBS->ENABLE_ENCRYPTION;
serverDBInfo.myLocality = localities;
auto dbInfo = makeReference<AsyncVar<ServerDBInfo>>(serverDBInfo);
Reference<AsyncVar<Optional<UID>>> clusterId(