Merge branch 'main' of https://github.com/apple/foundationdb into refactor/main/ddEnabledState

Xiaoxi Wang 2022-08-10 17:03:18 -07:00
commit fda922eb1a
21 changed files with 295 additions and 77 deletions

View File

@ -2,6 +2,19 @@
Release Notes
#############
7.1.19
======
* Same as 7.1.18 release with AVX enabled.
7.1.18
======
* Released with AVX disabled.
* Added knobs for the minimum and the maximum of the Ratekeeper's default priority. `(PR #7820) <https://github.com/apple/foundationdb/pull/7820>`_
* Fixed bugs in ``getRange`` of the special key space. `(PR #7778) <https://github.com/apple/foundationdb/pull/7778>`_, `(PR #7720) <https://github.com/apple/foundationdb/pull/7720>`_
* Added debug ID for secondary queries in index prefetching. `(PR #7755) <https://github.com/apple/foundationdb/pull/7755>`_
* Changed hostname resolving to prefer IPv6 addresses. `(PR #7750) <https://github.com/apple/foundationdb/pull/7750>`_
* Added more transaction debug events for prefetch queries. `(PR #7732) <https://github.com/apple/foundationdb/pull/7732>`_
7.1.17
======
* Same as 7.1.16 release with AVX enabled.
@ -15,7 +28,7 @@ Release Notes
* Fixed ScopeEventFieldTypeMismatch error for TLogMetrics. `(PR #7640) <https://github.com/apple/foundationdb/pull/7640>`_
* Added getMappedRange latency metrics. `(PR #7632) <https://github.com/apple/foundationdb/pull/7632>`_
* Fixed a version vector performance bug due to not updating client side tag cache. `(PR #7616) <https://github.com/apple/foundationdb/pull/7616>`_
* Fixed DiskReadSeconds and DiskWriteSeconds calculaion in ProcessMetrics. `(PR #7609) <https://github.com/apple/foundationdb/pull/7609>`_
* Fixed DiskReadSeconds and DiskWriteSeconds calculation in ProcessMetrics. `(PR #7609) <https://github.com/apple/foundationdb/pull/7609>`_
* Added Rocksdb compression and data size stats. `(PR #7596) <https://github.com/apple/foundationdb/pull/7596>`_
7.1.15
@ -74,7 +87,7 @@ Release Notes
* Added support of the reboot command in go bindings. `(PR #7270) <https://github.com/apple/foundationdb/pull/7270>`_
* Fixed several issues in profiling special keys using GlobalConfig. `(PR #7120) <https://github.com/apple/foundationdb/pull/7120>`_
* Fixed a stuck transaction system bug due to inconsistent recovery transaction version. `(PR #7261) <https://github.com/apple/foundationdb/pull/7261>`_
* Fixed a unknown_error crash due to not resolving hostnames. `(PR #7254) <https://github.com/apple/foundationdb/pull/7254>`_
* Fixed an unknown_error crash due to not resolving hostnames. `(PR #7254) <https://github.com/apple/foundationdb/pull/7254>`_
* Fixed a heap-use-after-free bug. `(PR #7250) <https://github.com/apple/foundationdb/pull/7250>`_
* Fixed a performance issue that remote TLogs are sending too many pops to log routers. `(PR #7235) <https://github.com/apple/foundationdb/pull/7235>`_
* Fixed an issue that SharedTLogs are not displaced and leaking disk space. `(PR #7246) <https://github.com/apple/foundationdb/pull/7246>`_

View File

@ -257,11 +257,9 @@ ACTOR Future<bool> metaclusterGetCommand(Reference<IDatabase> db, std::vector<St
fmt::print("{}\n", json_spirit::write_string(json_spirit::mValue(obj), json_spirit::pretty_print).c_str());
} else {
fmt::print(" connection string: {}\n", metadata.connectionString.toString().c_str());
fmt::print(" cluster state: {}\n", DataClusterEntry::clusterStateToString(metadata.entry.clusterState));
fmt::print(" tenant group capacity: {}\n", metadata.entry.capacity.numTenantGroups);
fmt::print(" allocated tenant groups: {}\n", metadata.entry.allocated.numTenantGroups);
if (metadata.entry.locked) {
fmt::print(" locked: true\n");
}
}
} catch (Error& e) {
if (useJson) {

View File

@ -346,7 +346,9 @@ struct IndexBlockRef {
decrypt(cipherKeysCtx.get(), *this, arena);
} else {
TraceEvent("IndexBlockSize").detail("Sz", buffer.size());
if (BG_ENCRYPT_COMPRESS_DEBUG) {
TraceEvent("IndexBlockSize").detail("Sz", buffer.size());
}
ObjectReader dataReader(buffer.begin(), IncludeVersion());
dataReader.deserialize(FileIdentifierFor<IndexBlock>::value, block, arena);
@ -368,7 +370,11 @@ struct IndexBlockRef {
arena, ObjectWriter::toValue(block, IncludeVersion(ProtocolVersion::withBlobGranuleFile())).contents());
}
TraceEvent(SevDebug, "IndexBlockSize").detail("Sz", buffer.size()).detail("Encrypted", cipherKeysCtx.present());
if (BG_ENCRYPT_COMPRESS_DEBUG) {
TraceEvent(SevDebug, "IndexBlockSize")
.detail("Sz", buffer.size())
.detail("Encrypted", cipherKeysCtx.present());
}
}
template <class Ar>

View File

@ -24,6 +24,35 @@
FDB_DEFINE_BOOLEAN_PARAM(AddNewTenants);
FDB_DEFINE_BOOLEAN_PARAM(RemoveMissingTenants);
std::string DataClusterEntry::clusterStateToString(DataClusterState clusterState) {
switch (clusterState) {
case DataClusterState::READY:
return "ready";
case DataClusterState::REMOVING:
return "removing";
default:
UNREACHABLE();
}
}
DataClusterState DataClusterEntry::stringToClusterState(std::string stateStr) {
if (stateStr == "ready") {
return DataClusterState::READY;
} else if (stateStr == "removing") {
return DataClusterState::REMOVING;
}
UNREACHABLE();
}
json_spirit::mObject DataClusterEntry::toJson() const {
json_spirit::mObject obj;
obj["capacity"] = capacity.toJson();
obj["allocated"] = allocated.toJson();
obj["cluster_state"] = DataClusterEntry::clusterStateToString(clusterState);
return obj;
}
json_spirit::mObject ClusterUsage::toJson() const {
json_spirit::mObject obj;
obj["num_tenant_groups"] = numTenantGroups;

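The two conversion helpers added above are easy to exercise on their own. Below is a minimal, self-contained sketch of the same round-trip; the enum and strings mirror the diff, while UNREACHABLE() is replaced by a thrown std::invalid_argument so the snippet compiles outside the FDB tree.

```cpp
#include <cassert>
#include <iostream>
#include <stdexcept>
#include <string>

// Mirrors the DataClusterState enum added in this commit.
enum class DataClusterState { READY, REMOVING };

std::string clusterStateToString(DataClusterState clusterState) {
    switch (clusterState) {
    case DataClusterState::READY:
        return "ready";
    case DataClusterState::REMOVING:
        return "removing";
    default:
        // Stand-in for UNREACHABLE() so this compiles standalone.
        throw std::invalid_argument("unknown DataClusterState");
    }
}

DataClusterState stringToClusterState(const std::string& stateStr) {
    if (stateStr == "ready") {
        return DataClusterState::READY;
    } else if (stateStr == "removing") {
        return DataClusterState::REMOVING;
    }
    throw std::invalid_argument("unknown cluster state string: " + stateStr);
}

int main() {
    // Round-trip both states, as a consumer of the metacluster JSON output would see them.
    for (auto state : { DataClusterState::READY, DataClusterState::REMOVING }) {
        std::string s = clusterStateToString(state);
        assert(stringToClusterState(s) == state);
        std::cout << "cluster_state: " << s << "\n";
    }
    return 0;
}
```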
View File

@ -9180,10 +9180,21 @@ ACTOR Future<Void> singleChangeFeedStreamInternal(KeyRange range,
// update lastReturned once the previous mutation has been consumed
if (*begin - 1 > results->lastReturnedVersion.get()) {
results->lastReturnedVersion.set(*begin - 1);
if (!refresh.canBeSet()) {
try {
// refresh is set if and only if this actor is cancelled
wait(Future<Void>(Void()));
// Catch any unexpected behavior if the above contract is broken
ASSERT(false);
} catch (Error& e) {
ASSERT(e.code() == error_code_actor_cancelled);
throw;
}
}
}
loop {
ASSERT(refresh.canBeSet());
state ChangeFeedStreamReply feedReply = waitNext(results->streams[0].getFuture());
*begin = feedReply.mutations.back().version + 1;

View File

@ -929,6 +929,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( BG_MERGE_CANDIDATE_DELAY_SECONDS, BG_MERGE_CANDIDATE_THRESHOLD_SECONDS / 10.0 );
init( BLOB_WORKER_INITIAL_SNAPSHOT_PARALLELISM, 8 ); if( randomize && BUGGIFY ) BLOB_WORKER_INITIAL_SNAPSHOT_PARALLELISM = 1;
init( BLOB_WORKER_RESNAPSHOT_PARALLELISM, 100 ); if( randomize && BUGGIFY ) BLOB_WORKER_RESNAPSHOT_PARALLELISM = deterministicRandom()->randomInt(1, 10);
init( BLOB_WORKER_DELTA_FILE_WRITE_PARALLELISM, 2000 ); if( randomize && BUGGIFY ) BLOB_WORKER_DELTA_FILE_WRITE_PARALLELISM = deterministicRandom()->randomInt(10, 100);
init( BLOB_WORKER_TIMEOUT, 10.0 ); if( randomize && BUGGIFY ) BLOB_WORKER_TIMEOUT = 1.0;
init( BLOB_WORKER_REQUEST_TIMEOUT, 5.0 ); if( randomize && BUGGIFY ) BLOB_WORKER_REQUEST_TIMEOUT = 1.0;
init( BLOB_WORKERLIST_FETCH_INTERVAL, 1.0 );

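The new blob worker knobs above follow the usual initialization pattern: a production default, then an optional simulation-only override when randomize && BUGGIFY is set, so tests exercise low-parallelism and short-timeout configurations. A tiny standalone imitation of that pattern using the defaults from this diff; BUGGIFY and deterministicRandom are FDB machinery, so the stand-ins below are illustrative only.

```cpp
#include <iostream>
#include <random>

// Stand-ins for FDB's deterministicRandom() and the BUGGIFY machinery.
std::mt19937 simRng(7);
int randomInt(int lo, int hi) { // value in [lo, hi), like deterministicRandom()->randomInt
    return std::uniform_int_distribution<int>(lo, hi - 1)(simRng);
}

struct Knobs {
    int BLOB_WORKER_DELTA_FILE_WRITE_PARALLELISM;
    double BLOB_WORKER_REQUEST_TIMEOUT;

    void initialize(bool randomize, bool buggify) {
        // Production default first, then a simulation-only override that shrinks the value
        // so low-parallelism and short-timeout paths get exercised in tests.
        BLOB_WORKER_DELTA_FILE_WRITE_PARALLELISM = 2000;
        if (randomize && buggify)
            BLOB_WORKER_DELTA_FILE_WRITE_PARALLELISM = randomInt(10, 100);

        BLOB_WORKER_REQUEST_TIMEOUT = 5.0;
        if (randomize && buggify)
            BLOB_WORKER_REQUEST_TIMEOUT = 1.0;
    }
};

int main() {
    Knobs k;
    // In FDB the simulator decides randomize/BUGGIFY; here they are simply passed in.
    k.initialize(/*randomize=*/true, /*buggify=*/true);
    std::cout << k.BLOB_WORKER_DELTA_FILE_WRITE_PARALLELISM << " "
              << k.BLOB_WORKER_REQUEST_TIMEOUT << "\n";
    return 0;
}
```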
View File

@ -49,10 +49,18 @@ struct BlobWorkerStats {
int activeReadRequests;
int granulesPendingSplitCheck;
Reference<FlowLock> initialSnapshotLock;
Reference<FlowLock> resnapshotLock;
Reference<FlowLock> deltaWritesLock;
Future<Void> logger;
// Current stats maintained for a given blob worker process
explicit BlobWorkerStats(UID id, double interval)
explicit BlobWorkerStats(UID id,
double interval,
Reference<FlowLock> initialSnapshotLock,
Reference<FlowLock> resnapshotLock,
Reference<FlowLock> deltaWritesLock)
: cc("BlobWorkerStats", id.toString()),
s3PutReqs("S3PutReqs", cc), s3GetReqs("S3GetReqs", cc), s3DeleteReqs("S3DeleteReqs", cc),
@ -68,11 +76,18 @@ struct BlobWorkerStats {
readRequestsWithBegin("ReadRequestsWithBegin", cc), readRequestsCollapsed("ReadRequestsCollapsed", cc),
flushGranuleReqs("FlushGranuleReqs", cc), compressionBytesRaw("CompressionBytesRaw", cc),
compressionBytesFinal("CompressionBytesFinal", cc), numRangesAssigned(0), mutationBytesBuffered(0),
activeReadRequests(0), granulesPendingSplitCheck(0) {
activeReadRequests(0), granulesPendingSplitCheck(0), initialSnapshotLock(initialSnapshotLock),
resnapshotLock(resnapshotLock), deltaWritesLock(deltaWritesLock) {
specialCounter(cc, "NumRangesAssigned", [this]() { return this->numRangesAssigned; });
specialCounter(cc, "MutationBytesBuffered", [this]() { return this->mutationBytesBuffered; });
specialCounter(cc, "ActiveReadRequests", [this]() { return this->activeReadRequests; });
specialCounter(cc, "GranulesPendingSplitCheck", [this]() { return this->granulesPendingSplitCheck; });
specialCounter(cc, "InitialSnapshotsActive", [this]() { return this->initialSnapshotLock->activePermits(); });
specialCounter(cc, "InitialSnapshotsWaiting", [this]() { return this->initialSnapshotLock->waiters(); });
specialCounter(cc, "ReSnapshotsActive", [this]() { return this->resnapshotLock->activePermits(); });
specialCounter(cc, "ReSnapshotsWaiting", [this]() { return this->resnapshotLock->waiters(); });
specialCounter(cc, "DeltaFileWritesActive", [this]() { return this->deltaWritesLock->activePermits(); });
specialCounter(cc, "DeltaFileWritesWaiting", [this]() { return this->deltaWritesLock->waiters(); });
logger = traceCounters("BlobWorkerMetrics", id, interval, &cc, "BlobWorkerMetrics");
}

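The BlobWorkerStats change above passes the three FlowLocks into the stats object and registers specialCounter gauges so each metrics interval reports the locks' active permits and waiters. A rough standalone analogue of that lambda-gauge pattern, using a toy permit lock and registry in place of FDB's FlowLock and CounterCollection (the names and types below are simplifications, not the FDB API):

```cpp
#include <cstdint>
#include <functional>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

// Toy stand-ins; the real FlowLock and specialCounter live in flow/ and fdbrpc/.
struct PermitLock {
    int active = 0;  // analogue of FlowLock::activePermits()
    int waiting = 0; // analogue of FlowLock::waiters()
};

struct GaugeRegistry {
    std::vector<std::pair<std::string, std::function<int64_t()>>> gauges;
    void add(std::string name, std::function<int64_t()> fn) {
        gauges.emplace_back(std::move(name), std::move(fn));
    }
    void report() const {
        for (const auto& [name, fn] : gauges)
            std::cout << name << "=" << fn() << "\n";
    }
};

int main() {
    PermitLock deltaWrites;
    GaugeRegistry metrics;

    // Same idea as specialCounter(cc, "DeltaFileWritesActive", ...): the lambda samples
    // the lock's current state whenever metrics are reported, so no manual bookkeeping.
    metrics.add("DeltaFileWritesActive", [&] { return deltaWrites.active; });
    metrics.add("DeltaFileWritesWaiting", [&] { return deltaWrites.waiting; });

    deltaWrites.active = 3;
    deltaWrites.waiting = 1;
    metrics.report(); // DeltaFileWritesActive=3, DeltaFileWritesWaiting=1
    return 0;
}
```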
View File

@ -25,6 +25,7 @@
#include "fdbclient/ClientBooleanParams.h"
#include "fdbclient/CommitTransaction.h"
#include "fdbclient/FDBOptions.g.h"
#include "fdbclient/GenericTransactionHelper.h"
#include "fdbclient/Subspace.h"
#include "flow/ObjectSerializer.h"

View File

@ -53,16 +53,23 @@ struct Traceable<ClusterUsage> : std::true_type {
}
};
// Represents the various states that a data cluster could be in.
//
// READY - the data cluster is active
// REMOVING - the data cluster is being removed and cannot have its configuration changed or any tenants created
enum class DataClusterState { READY, REMOVING };
struct DataClusterEntry {
constexpr static FileIdentifier file_identifier = 929511;
static std::string clusterStateToString(DataClusterState clusterState);
static DataClusterState stringToClusterState(std::string stateStr);
UID id;
ClusterUsage capacity;
ClusterUsage allocated;
// If true, then tenant groups cannot be assigned to this cluster. This is used when a cluster is being forcefully
// removed.
bool locked = false;
DataClusterState clusterState = DataClusterState::READY;
DataClusterEntry() = default;
DataClusterEntry(ClusterUsage capacity) : capacity(capacity) {}
@ -81,19 +88,11 @@ struct DataClusterEntry {
return ObjectReader::fromStringRef<DataClusterEntry>(value, IncludeVersion());
}
json_spirit::mObject toJson() const {
json_spirit::mObject obj;
obj["capacity"] = capacity.toJson();
obj["allocated"] = allocated.toJson();
if (locked) {
obj["locked"] = locked;
}
return obj;
}
json_spirit::mObject toJson() const;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, id, capacity, allocated, locked);
serializer(ar, id, capacity, allocated, clusterState);
}
};

View File

@ -541,8 +541,8 @@ void updateClusterMetadata(Transaction tr,
Optional<DataClusterEntry> const& updatedEntry) {
if (updatedEntry.present()) {
if (previousMetadata.entry.locked) {
throw cluster_locked();
if (previousMetadata.entry.clusterState == DataClusterState::REMOVING) {
throw cluster_removed();
}
ManagementClusterMetadata::dataClusters().set(tr, name, updatedEntry.get());
updateClusterCapacityIndex(tr, name, previousMetadata.entry, updatedEntry.get());
@ -709,21 +709,21 @@ struct RemoveClusterImpl {
// Initialization parameters
bool forceRemove;
// Parameters set in lockDataCluster
// Parameters set in markClusterRemoving
Optional<int64_t> lastTenantId;
RemoveClusterImpl(Reference<DB> managementDb, ClusterName clusterName, bool forceRemove)
: ctx(managementDb, clusterName), forceRemove(forceRemove) {}
// Returns false if the cluster is no longer present, or true if it is present and the removal should proceed.
ACTOR static Future<bool> lockDataCluster(RemoveClusterImpl* self, Reference<typename DB::TransactionT> tr) {
ACTOR static Future<bool> markClusterRemoving(RemoveClusterImpl* self, Reference<typename DB::TransactionT> tr) {
if (!self->forceRemove && self->ctx.dataClusterMetadata.get().entry.allocated.numTenantGroups > 0) {
throw cluster_not_empty();
} else if (!self->ctx.dataClusterMetadata.get().entry.locked) {
// Lock the cluster while we finish the remaining removal steps to prevent new tenants from being
// assigned to it.
} else if (self->ctx.dataClusterMetadata.get().entry.clusterState != DataClusterState::REMOVING) {
// Mark the cluster in a removing state while we finish the remaining removal steps. This prevents new
// tenants from being assigned to it.
DataClusterEntry updatedEntry = self->ctx.dataClusterMetadata.get().entry;
updatedEntry.locked = true;
updatedEntry.clusterState = DataClusterState::REMOVING;
updatedEntry.capacity.numTenantGroups = 0;
updateClusterMetadata(tr,
@ -744,7 +744,7 @@ struct RemoveClusterImpl {
self->lastTenantId = lastId;
}
TraceEvent("LockedDataCluster")
TraceEvent("MarkedDataClusterRemoving")
.detail("Name", self->ctx.clusterName.get())
.detail("Version", tr->getCommittedVersion());
@ -780,6 +780,8 @@ struct RemoveClusterImpl {
ACTOR static Future<bool> purgeTenants(RemoveClusterImpl* self,
Reference<typename DB::TransactionT> tr,
std::pair<Tuple, Tuple> clusterTupleRange) {
ASSERT(self->ctx.dataClusterMetadata.get().entry.clusterState == DataClusterState::REMOVING);
// Get the list of tenants
state Future<KeyBackedRangeResult<Tuple>> tenantEntriesFuture =
ManagementClusterMetadata::clusterTenantIndex.getRange(
@ -813,6 +815,8 @@ struct RemoveClusterImpl {
ACTOR static Future<bool> purgeTenantGroupsAndDataCluster(RemoveClusterImpl* self,
Reference<typename DB::TransactionT> tr,
std::pair<Tuple, Tuple> clusterTupleRange) {
ASSERT(self->ctx.dataClusterMetadata.get().entry.clusterState == DataClusterState::REMOVING);
// Get the list of tenant groups
state Future<KeyBackedRangeResult<Tuple>> tenantGroupEntriesFuture =
ManagementClusterMetadata::clusterTenantGroupIndex.getRange(
@ -880,8 +884,21 @@ struct RemoveClusterImpl {
}
ACTOR static Future<Void> run(RemoveClusterImpl* self) {
bool clusterIsPresent = wait(self->ctx.runManagementTransaction(
[self = self](Reference<typename DB::TransactionT> tr) { return lockDataCluster(self, tr); }));
state bool clusterIsPresent;
try {
wait(store(clusterIsPresent,
self->ctx.runManagementTransaction([self = self](Reference<typename DB::TransactionT> tr) {
return markClusterRemoving(self, tr);
})));
} catch (Error& e) {
// If the transaction retries after success or if we are trying a second time to remove the cluster, it will
// throw an error indicating that the removal has already started
if (e.code() == error_code_cluster_removed) {
clusterIsPresent = true;
} else {
throw;
}
}
if (clusterIsPresent) {
try {
@ -1224,8 +1241,8 @@ struct CreateTenantImpl {
// If we are part of a tenant group that is assigned to a cluster being removed from the metacluster,
// then we fail with an error.
if (self->ctx.dataClusterMetadata.get().entry.locked) {
throw cluster_locked();
if (self->ctx.dataClusterMetadata.get().entry.clusterState == DataClusterState::REMOVING) {
throw cluster_removed();
}
managementClusterAddTenantToGroup(

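The run() change above catches cluster_removed from the marking transaction and treats it as "the removal already started", so a transaction retry after a successful commit, or a second removal attempt, still proceeds to the purge steps. A compact sketch of that idempotent-retry shape, with a plain exception standing in for FDB's cluster_removed error and the transaction reduced to a hypothetical bool-returning function (all names below are illustrative):

```cpp
#include <iostream>
#include <stdexcept>

// Stand-in for FDB's cluster_removed error (error_code_cluster_removed).
struct ClusterRemovedError : std::runtime_error {
    ClusterRemovedError() : std::runtime_error("cluster_removed") {}
};

// Hypothetical stand-in for the markClusterRemoving transaction: returns true if the
// cluster exists and was just marked REMOVING, false if it is no longer registered,
// and throws if the removal was already started.
bool markRemovingTxn(bool present, bool alreadyRemoving) {
    if (!present)
        return false;
    if (alreadyRemoving)
        throw ClusterRemovedError();
    return true;
}

bool runRemoval(bool present, bool alreadyRemoving) {
    bool clusterIsPresent;
    try {
        clusterIsPresent = markRemovingTxn(present, alreadyRemoving);
    } catch (const ClusterRemovedError&) {
        // A retry after a successful commit, or a second removal attempt: the cluster is
        // still registered and already marked, so continue with the purge steps.
        clusterIsPresent = true;
    }
    return clusterIsPresent; // caller purges tenants/tenant groups if true
}

int main() {
    std::cout << runRemoval(true, false) << "\n";  // fresh removal: 1
    std::cout << runRemoval(true, true) << "\n";   // retried removal: still 1
    std::cout << runRemoval(false, false) << "\n"; // cluster already gone: 0
    return 0;
}
```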
View File

@ -907,6 +907,9 @@ public:
int BG_KEY_TUPLE_TRUNCATE_OFFSET;
int BLOB_WORKER_INITIAL_SNAPSHOT_PARALLELISM;
int BLOB_WORKER_RESNAPSHOT_PARALLELISM;
int BLOB_WORKER_DELTA_FILE_WRITE_PARALLELISM;
double BLOB_WORKER_TIMEOUT; // Blob Manager's reaction time to a blob worker failure
double BLOB_WORKER_REQUEST_TIMEOUT; // Blob Worker's server-side request timeout
double BLOB_WORKERLIST_FETCH_INTERVAL;

View File

@ -44,14 +44,23 @@ typedef Standalone<TenantGroupNameRef> TenantGroupName;
// REMOVING - the tenant has been marked for removal and is being removed on the data cluster
// UPDATING_CONFIGURATION - the tenant configuration has changed on the management cluster and is being applied to the
// data cluster
// ERROR - currently unused
// RENAMING_FROM - the tenant is being renamed to a new name and is awaiting the rename to complete on the data cluster
// RENAMING_TO - the tenant is being created as a rename from an existing tenant and is awaiting the rename to complete
// on the data cluster
// ERROR - the tenant is in an error state
//
// A tenant in any configuration is allowed to be removed. Only tenants in the READY or UPDATING_CONFIGURATION phases
// can have their configuration updated. A tenant must not exist or be in the REGISTERING phase to be created.
// can have their configuration updated. A tenant must not exist or be in the REGISTERING phase to be created. To be
// renamed, a tenant must be in the READY or RENAMING_FROM state. In the latter case, the rename destination must match
// the original rename attempt.
//
// If an operation fails and the tenant is left in a non-ready state, re-running the same operation is legal. If
// successful, the tenant will return to the READY state.
enum class TenantState { REGISTERING, READY, REMOVING, UPDATING_CONFIGURATION, ERROR };
enum class TenantState { REGISTERING, READY, REMOVING, UPDATING_CONFIGURATION, RENAMING_FROM, RENAMING_TO, ERROR };
// Represents the lock state the tenant could be in.
// Can be used in conjunction with the other tenant states above.
enum class TenantLockState { UNLOCKED, READ_ONLY, LOCKED };
struct TenantMapEntry {
constexpr static FileIdentifier file_identifier = 12247338;
@ -65,10 +74,15 @@ struct TenantMapEntry {
int64_t id = -1;
Key prefix;
TenantState tenantState = TenantState::READY;
TenantLockState tenantLockState = TenantLockState::UNLOCKED;
Optional<TenantGroupName> tenantGroup;
bool encrypted = false;
Optional<ClusterName> assignedCluster;
int64_t configurationSequenceNum = 0;
Optional<TenantName> renamePair;
// Can be set to an error string if the tenant is in the ERROR state
std::string error;
constexpr static int PREFIX_SIZE = sizeof(id);
@ -89,7 +103,16 @@ struct TenantMapEntry {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, id, tenantState, tenantGroup, encrypted, assignedCluster, configurationSequenceNum);
serializer(ar,
id,
tenantState,
tenantLockState,
tenantGroup,
encrypted,
assignedCluster,
configurationSequenceNum,
renamePair,
error);
if constexpr (Ar::isDeserializing) {
if (id >= 0) {
prefix = idToPrefix(id);

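Per the comment block above, a rename is only legal from READY, or from RENAMING_FROM when the destination matches the original attempt (tracked by the new renamePair field). A small self-contained sketch of that precondition check; TenantEntry and canRename here are illustrative stand-ins, not the actual helpers in Tenant.h:

```cpp
#include <cassert>
#include <optional>
#include <string>

enum class TenantState { REGISTERING, READY, REMOVING, UPDATING_CONFIGURATION, RENAMING_FROM, RENAMING_TO, ERROR };

struct TenantEntry {
    TenantState tenantState = TenantState::READY;
    std::optional<std::string> renamePair; // destination of an in-flight rename, if any
};

// A rename may start on a READY tenant, or be retried on a RENAMING_FROM tenant
// as long as the destination matches the original attempt.
bool canRename(const TenantEntry& t, const std::string& newName) {
    if (t.tenantState == TenantState::READY)
        return true;
    if (t.tenantState == TenantState::RENAMING_FROM)
        return t.renamePair && *t.renamePair == newName;
    return false;
}

int main() {
    TenantEntry ready;
    assert(canRename(ready, "b"));

    TenantEntry midRename{ TenantState::RENAMING_FROM, std::string("b") };
    assert(canRename(midRename, "b"));  // retrying the same rename is legal
    assert(!canRename(midRename, "c")); // a different destination is not

    TenantEntry removing{ TenantState::REMOVING, std::nullopt };
    assert(!canRename(removing, "b"));
    return 0;
}
```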
View File

@ -426,7 +426,24 @@ struct BlobManagerData : NonCopyable, ReferenceCounted<BlobManagerData> {
return v;
}
// FIXME: is it possible for merge/split/re-merge to call this with same range but a different granule id or
// startVersion? Unlikely but could cause weird history problems
void setMergeCandidate(const KeyRangeRef& range, UID granuleID, Version startVersion) {
// if this granule is not an active granule, it can't be merged
auto gIt = workerAssignments.rangeContaining(range.begin);
if (gIt->begin() != range.begin || gIt->end() != range.end) {
CODE_PROBE(true, "non-active granule reported merge eligible, ignoring");
if (BM_DEBUG) {
fmt::print(
"BM {0} Ignoring Merge Candidate [{1} - {2}): range mismatch with active granule [{3} - {4})\n",
epoch,
range.begin.printable(),
range.end.printable(),
gIt->begin().printable(),
gIt->end().printable());
}
return;
}
// Want this to be idempotent. If a granule was already reported as merge-eligible, we want to use the existing
// merge and mergeNow state.
auto it = mergeCandidates.rangeContaining(range.begin);
@ -649,17 +666,29 @@ ACTOR Future<BlobGranuleSplitPoints> splitRange(Reference<BlobManagerData> bmDat
// Picks a worker with the fewest number of already assigned ranges.
// If there is a tie, picks one such worker at random.
// TODO: add desired per-blob-worker limit? don't assign ranges to each worker past that limit?
ACTOR Future<UID> pickWorkerForAssign(Reference<BlobManagerData> bmData) {
// wait until there are BWs to pick from
while (bmData->workerStats.size() == 0) {
CODE_PROBE(true, "BM wants to assign range, but no workers available");
if (BM_DEBUG) {
fmt::print("BM {0} waiting for blob workers before assigning granules\n", bmData->epoch);
loop {
state bool wasZeroWorkers = false;
while (bmData->workerStats.size() == 0) {
wasZeroWorkers = true;
CODE_PROBE(true, "BM wants to assign range, but no workers available");
if (BM_DEBUG) {
fmt::print("BM {0} waiting for blob workers before assigning granules\n", bmData->epoch);
}
bmData->restartRecruiting.trigger();
wait(bmData->recruitingStream.onChange() || bmData->foundBlobWorkers.getFuture());
}
bmData->restartRecruiting.trigger();
wait(bmData->recruitingStream.onChange() || bmData->foundBlobWorkers.getFuture());
// FIXME: may want to have some buffer here so zero-worker recruiting case doesn't assign every single pending
// range to the first worker recruited
if (wasZeroWorkers) {
// Add a bit of delay. If we were at zero workers, don't immediately assign all granules to the first worker
// we recruit
wait(delay(0.1));
}
if (bmData->workerStats.size() != 0) {
break;
}
// if in the post-zero workers delay, we went back down to zero workers, re-loop
}
int minGranulesAssigned = INT_MAX;
@ -1536,12 +1565,15 @@ ACTOR Future<Void> maybeSplitRange(Reference<BlobManagerData> bmData,
coalescedRanges.push_back(coalescedRanges.arena(), splitPoints.keys.back());
ASSERT(coalescedRanges.size() == SERVER_KNOBS->BG_MAX_SPLIT_FANOUT + 1);
if (BM_DEBUG) {
fmt::print("Downsampled split from {0} -> {1} granules\n",
fmt::print("Downsampled split [{0} - {1}) from {2} -> {3} granules\n",
granuleRange.begin.printable(),
granuleRange.end.printable(),
splitPoints.keys.size() - 1,
SERVER_KNOBS->BG_MAX_SPLIT_FANOUT);
}
splitPoints.keys = coalescedRanges;
// TODO probably do something better here?
wait(store(splitPoints, alignKeys(bmData, granuleRange, coalescedRanges)));
ASSERT(splitPoints.keys.size() <= SERVER_KNOBS->BG_MAX_SPLIT_FANOUT + 1);
}
@ -1935,6 +1967,9 @@ ACTOR Future<std::pair<UID, Version>> persistMergeGranulesStart(Reference<BlobMa
wait(checkManagerLock(tr, bmData));
// FIXME: extra safeguard: check that granuleID of active lock == parentGranuleID for each parent, abort
// merge if so
tr->atomicOp(
blobGranuleMergeKeyFor(mergeGranuleID),
blobGranuleMergeValueFor(mergeRange, parentGranuleIDs, parentGranuleRanges, parentGranuleStartVersions),
@ -2231,7 +2266,7 @@ ACTOR Future<Void> attemptMerges(Reference<BlobManagerData> bmData,
}
CODE_PROBE(true, "Candidate ranges to merge");
wait(bmData->concurrentMergeChecks.take());
state FlowLock::Releaser holdingDVL(bmData->concurrentMergeChecks);
state FlowLock::Releaser holdingLock(bmData->concurrentMergeChecks);
// start merging any set of 2+ consecutive granules that can be merged
state int64_t currentBytes = 0;
@ -3756,7 +3791,7 @@ ACTOR Future<Void> fullyDeleteGranule(Reference<BlobManagerData> self,
canDeleteHistoryKey ? "" : " ignoring history key!");
}
TraceEvent("GranuleFullPurge", self->id)
TraceEvent(SevDebug, "GranuleFullPurge", self->id)
.detail("Epoch", self->epoch)
.detail("GranuleID", granuleId)
.detail("PurgeVersion", purgeVersion)
@ -3876,7 +3911,7 @@ ACTOR Future<Void> partiallyDeleteGranule(Reference<BlobManagerData> self,
if (BM_PURGE_DEBUG) {
fmt::print("BM {0} Partially deleting granule {1}: success\n", self->epoch, granuleId.toString());
}
TraceEvent("GranulePartialPurge", self->id)
TraceEvent(SevDebug, "GranulePartialPurge", self->id)
.detail("Epoch", self->epoch)
.detail("GranuleID", granuleId)
.detail("PurgeVersion", purgeVersion)

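pickWorkerForAssign above chooses the worker with the fewest assigned ranges, breaking ties at random, and the new loop adds a short delay after recovering from zero workers so the first recruit does not absorb every pending range. One way to implement the selection itself is a single-pass minimum with reservoir-style tie-breaking, sketched here with toy types (the real code operates on BlobManagerData's workerStats):

```cpp
#include <cstdint>
#include <iostream>
#include <limits>
#include <map>
#include <random>

// Toy stand-in: worker id -> number of granule ranges currently assigned.
using WorkerStats = std::map<uint64_t, int>;

// Pick a worker with the fewest assigned ranges; break ties uniformly at random.
// Assumes at least one worker (the real actor waits for recruits first).
uint64_t pickWorkerForAssign(const WorkerStats& workers, std::mt19937& rng) {
    int minAssigned = std::numeric_limits<int>::max();
    uint64_t chosen = 0;
    int numTied = 0;
    for (const auto& [id, assigned] : workers) {
        if (assigned < minAssigned) {
            minAssigned = assigned;
            chosen = id;
            numTied = 1;
        } else if (assigned == minAssigned) {
            // Keep each tied worker with probability 1/numTied (reservoir sampling).
            ++numTied;
            if (std::uniform_int_distribution<int>(0, numTied - 1)(rng) == 0)
                chosen = id;
        }
    }
    return chosen;
}

int main() {
    std::mt19937 rng(42);
    WorkerStats workers{ { 1, 4 }, { 2, 2 }, { 3, 2 }, { 4, 7 } };
    // Workers 2 and 3 are tied at 2 ranges; one of them is returned.
    std::cout << "assign to worker " << pickWorkerForAssign(workers, rng) << "\n";
    return 0;
}
```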
View File

@ -175,8 +175,6 @@ struct BlobWorkerData : NonCopyable, ReferenceCounted<BlobWorkerData> {
UID id;
Database db;
BlobWorkerStats stats;
PromiseStream<Future<Void>> addActor;
LocalityData locality;
@ -203,16 +201,24 @@ struct BlobWorkerData : NonCopyable, ReferenceCounted<BlobWorkerData> {
NotifiedVersion grvVersion;
Promise<Void> fatalError;
FlowLock initialSnapshotLock;
Reference<FlowLock> initialSnapshotLock;
Reference<FlowLock> resnapshotLock;
Reference<FlowLock> deltaWritesLock;
BlobWorkerStats stats;
bool shuttingDown = false;
int changeFeedStreamReplyBufferSize = SERVER_KNOBS->BG_DELTA_FILE_TARGET_BYTES / 2;
int changeFeedStreamReplyBufferSize = SERVER_KNOBS->BG_DELTA_FILE_TARGET_BYTES / 4;
bool isEncryptionEnabled = false;
BlobWorkerData(UID id, Reference<AsyncVar<ServerDBInfo> const> dbInf, Database db)
: id(id), db(db), stats(id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL), tenantData(BGTenantMap(dbInf)), dbInfo(dbInf),
initialSnapshotLock(SERVER_KNOBS->BLOB_WORKER_INITIAL_SNAPSHOT_PARALLELISM),
BlobWorkerData(UID id, Reference<AsyncVar<ServerDBInfo> const> dbInfo, Database db)
: id(id), db(db), tenantData(BGTenantMap(dbInfo)), dbInfo(dbInfo),
initialSnapshotLock(new FlowLock(SERVER_KNOBS->BLOB_WORKER_INITIAL_SNAPSHOT_PARALLELISM)),
resnapshotLock(new FlowLock(SERVER_KNOBS->BLOB_WORKER_RESNAPSHOT_PARALLELISM)),
deltaWritesLock(new FlowLock(SERVER_KNOBS->BLOB_WORKER_DELTA_FILE_WRITE_PARALLELISM)),
stats(id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, initialSnapshotLock, resnapshotLock, deltaWritesLock),
isEncryptionEnabled(
isEncryptionOpSupported(EncryptOperationType::BLOB_GRANULE_ENCRYPTION, db->clientInfo->get())) {}
@ -621,7 +627,11 @@ ACTOR Future<BlobFileIndex> writeDeltaFile(Reference<BlobWorkerData> bwData,
Version currentDeltaVersion,
Future<BlobFileIndex> previousDeltaFileFuture,
Future<Void> waitCommitted,
Optional<std::pair<KeyRange, UID>> oldGranuleComplete) {
Optional<std::pair<KeyRange, UID>> oldGranuleComplete,
Future<Void> startDeltaFileWrite) {
wait(startDeltaFileWrite);
state FlowLock::Releaser holdingLock(*bwData->deltaWritesLock);
wait(delay(0, TaskPriority::BlobWorkerUpdateStorage));
state std::string fileName = randomBGFilename(bwData->id, granuleID, currentDeltaVersion, ".delta");
@ -665,6 +675,10 @@ ACTOR Future<BlobFileIndex> writeDeltaFile(Reference<BlobWorkerData> bwData,
// free serialized since it is persisted in blob
serialized = Value();
// now that all buffered memory from file is gone, we can release memory flow lock
// we must unblock here to allow feed to continue to consume, so that waitCommitted returns
holdingLock.release();
state int numIterations = 0;
try {
// before updating FDB, wait for the delta file version to be committed and previous delta files to finish
@ -946,8 +960,8 @@ ACTOR Future<BlobFileIndex> dumpInitialSnapshotFromFDB(Reference<BlobWorkerData>
metadata->keyRange.begin.printable(),
metadata->keyRange.end.printable());
}
wait(bwData->initialSnapshotLock.take());
state FlowLock::Releaser holdingDVL(bwData->initialSnapshotLock);
wait(bwData->initialSnapshotLock->take());
state FlowLock::Releaser holdingLock(*bwData->initialSnapshotLock);
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(bwData->db);
state int retries = 0;
@ -1026,6 +1040,8 @@ ACTOR Future<BlobFileIndex> compactFromBlob(Reference<BlobWorkerData> bwData,
UID granuleID,
std::vector<GranuleFiles> fileSet,
Version version) {
wait(bwData->resnapshotLock->take());
state FlowLock::Releaser holdingLock(*bwData->resnapshotLock);
wait(delay(0, TaskPriority::BlobWorkerUpdateStorage));
if (BW_DEBUG) {
fmt::print("Compacting snapshot from blob for [{0} - {1}) @ {2}\n",
@ -1747,6 +1763,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
state std::deque<std::pair<Version, Version>> rollbacksInProgress;
state std::deque<std::pair<Version, Version>> rollbacksCompleted;
state Future<Void> startDeltaFileWrite = Future<Void>(Void());
state bool snapshotEligible; // just wrote a delta file or just took granule over from another worker
state bool justDidRollback = false;
@ -2276,6 +2293,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
} else {
previousFuture = Future<BlobFileIndex>(BlobFileIndex());
}
startDeltaFileWrite = bwData->deltaWritesLock->take();
Future<BlobFileIndex> dfFuture =
writeDeltaFile(bwData,
bstore,
@ -2287,7 +2305,8 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
lastDeltaVersion,
previousFuture,
waitVersionCommitted(bwData, metadata, lastDeltaVersion),
oldChangeFeedDataComplete);
oldChangeFeedDataComplete,
startDeltaFileWrite);
inFlightFiles.push_back(InFlightFile(dfFuture, lastDeltaVersion, metadata->bufferedDeltaBytes, false));
oldChangeFeedDataComplete.reset();
@ -2311,6 +2330,10 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
// exhaust old change feed before compacting - otherwise we could end up with an endlessly
// growing list of previous change feeds in the worst case.
snapshotEligible = true;
// Wait on delta file starting here. If we have too many pending delta file writes, we need to not
// continue to consume from the change feed, as that will pile on even more delta files to write
wait(startDeltaFileWrite);
}
// FIXME: if we're still reading from old change feed, we should probably compact if we're
@ -4441,6 +4464,48 @@ ACTOR Future<Void> handleFlushGranuleReq(Reference<BlobWorkerData> self, FlushGr
}
}
ACTOR Future<Void> simForceFileWriteContention(Reference<BlobWorkerData> bwData) {
// take the file write contention lock down to just 1 or 2 open writes
int numToLeave = deterministicRandom()->randomInt(1, 3);
state int numToTake = SERVER_KNOBS->BLOB_WORKER_DELTA_FILE_WRITE_PARALLELISM - numToLeave;
ASSERT(bwData->deltaWritesLock->available() >= numToTake);
if (numToTake <= 0) {
return Void();
}
if (BW_DEBUG) {
fmt::print("BW {0} forcing file contention down to {1}\n", bwData->id.toString().substr(0, 5), numToTake);
}
wait(bwData->deltaWritesLock->take(TaskPriority::DefaultYield, numToTake));
if (BW_DEBUG) {
fmt::print("BW {0} force acquired {1} file writes\n", bwData->id.toString().substr(0, 5), numToTake);
}
state FlowLock::Releaser holdingLock(*bwData->deltaWritesLock, numToTake);
state Future<Void> delayFor = delay(deterministicRandom()->randomInt(10, 60));
loop {
choose {
when(wait(delayFor)) {
if (BW_DEBUG) {
fmt::print("BW {0} releasing {1} file writes\n", bwData->id.toString().substr(0, 5), numToTake);
}
return Void();
}
// check for speed up sim
when(wait(delay(5.0))) {
if (g_simulator.speedUpSimulation) {
if (BW_DEBUG) {
fmt::print("BW {0} releasing {1} file writes b/c speed up simulation\n",
bwData->id.toString().substr(0, 5),
numToTake);
}
return Void();
}
}
}
}
}
ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf,
ReplyPromise<InitializeBlobWorkerReply> recruitReply,
Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
@ -4491,6 +4556,9 @@ ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf,
self->addActor.send(runGRVChecks(self));
self->addActor.send(monitorTenants(self));
state Future<Void> selfRemoved = monitorRemoval(self);
if (g_network->isSimulated() && BUGGIFY_WITH_PROB(0.25)) {
self->addActor.send(simForceFileWriteContention(self));
}
TraceEvent("BlobWorkerInit", self->id).log();

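The delta-file changes above thread a permit from the new deltaWritesLock through writeDeltaFile: the permit is requested when the write is queued, the writer releases it once the serialized bytes have been handed off, and blobGranuleUpdateFiles also waits on the pending take before consuming more of the change feed, so a backlog of delta file writes throttles feed consumption. A simplified analogue of that backpressure idea using C++20's std::counting_semaphore (FDB uses FlowLock and futures; the limit of 2 and the names below are illustrative):

```cpp
#include <iostream>
#include <queue>
#include <semaphore>
#include <string>

// Allow at most 2 delta file writes in flight (the knob in this diff defaults to 2000,
// and simulation can force it down to 1-2; 2 keeps the example small).
std::counting_semaphore<2> deltaWritePermits(2);

struct DeltaFile { std::string name; };
std::queue<DeltaFile> inFlight;

// "Queue" a delta file write: a permit must be held before buffering the file.
bool tryStartDeltaFileWrite(const std::string& name) {
    if (!deltaWritePermits.try_acquire())
        return false; // too many writes buffered; caller must stop consuming the feed
    inFlight.push({ name });
    return true;
}

void completeOldestWrite() {
    inFlight.pop();
    deltaWritePermits.release(); // memory for that file is gone; feed may resume
}

int main() {
    std::cout << tryStartDeltaFileWrite("delta.1") << "\n"; // 1
    std::cout << tryStartDeltaFileWrite("delta.2") << "\n"; // 1
    std::cout << tryStartDeltaFileWrite("delta.3") << "\n"; // 0: backpressure kicks in
    completeOldestWrite();
    std::cout << tryStartDeltaFileWrite("delta.3") << "\n"; // 1 again
    return 0;
}
```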
View File

@ -69,7 +69,7 @@ inline bool isDataMovementForValleyFiller(DataMovementReason reason) {
typedef std::map<DataMovementReason, int> DmReasonPriorityMapping;
typedef std::map<int, DataMovementReason> PriorityDmReasonMapping;
std::pair<const DmReasonPriorityMapping&, const PriorityDmReasonMapping&> buildPriorityMappings() {
std::pair<const DmReasonPriorityMapping*, const PriorityDmReasonMapping*> buildPriorityMappings() {
static DmReasonPriorityMapping reasonPriority{
{ DataMovementReason::INVALID, -1 },
{ DataMovementReason::RECOVER_MOVE, SERVER_KNOBS->PRIORITY_RECOVER_MOVE },
@ -103,17 +103,17 @@ std::pair<const DmReasonPriorityMapping&, const PriorityDmReasonMapping&> buildP
}
}
return std::make_pair(reasonPriority, priorityReason);
return std::make_pair(&reasonPriority, &priorityReason);
}
int dataMovementPriority(DataMovementReason reason) {
const auto& [reasonPriority, _] = buildPriorityMappings();
return reasonPriority.at(reason);
auto [reasonPriority, _] = buildPriorityMappings();
return reasonPriority->at(reason);
}
DataMovementReason priorityToDataMovementReason(int priority) {
const auto& [_, priorityReason] = buildPriorityMappings();
return priorityReason.at(priority);
auto [_, priorityReason] = buildPriorityMappings();
return priorityReason->at(priority);
}
struct RelocateData {

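buildPriorityMappings above switches from returning a pair of const references to a pair of const pointers. With the old signature, std::make_pair deduces value types and copies both static maps into a temporary pair, and the returned references then bind to that temporary, which is gone before the caller can use it; pointers to the function-local statics avoid both the copies and the dangling. A minimal illustration of the difference (the maps here are toys, not the real priority tables):

```cpp
#include <cassert>
#include <iostream>
#include <map>
#include <utility>

using Mapping = std::map<int, int>;

// Returning pointers to function-local statics: the statics are built once
// (thread-safe since C++11) and every caller sees the same objects.
std::pair<const Mapping*, const Mapping*> buildMappingsByPointer() {
    static Mapping forward{ { 1, 10 }, { 2, 20 } };
    static Mapping reverse{ { 10, 1 }, { 20, 2 } };
    return std::make_pair(&forward, &reverse);
}

// For contrast: make_pair here deduces std::pair<Mapping, Mapping>, so both maps are
// copied on every call. (Binding such a temporary to a pair of const references, as the
// pre-change signature did, would additionally leave the references dangling.)
std::pair<Mapping, Mapping> buildMappingsByCopy() {
    static Mapping forward{ { 1, 10 }, { 2, 20 } };
    static Mapping reverse{ { 10, 1 }, { 20, 2 } };
    return std::make_pair(forward, reverse);
}

int main() {
    auto [fwd1, rev1] = buildMappingsByPointer();
    auto [fwd2, rev2] = buildMappingsByPointer();
    assert(fwd1 == fwd2 && rev1 == rev2); // same underlying statics, no copies
    std::cout << fwd1->at(2) << " " << rev1->at(10) << "\n"; // 20 1

    auto copies = buildMappingsByCopy();
    std::cout << copies.first.at(1) << "\n"; // works, but each call copies both maps
    return 0;
}
```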
View File

@ -2889,8 +2889,6 @@ ACTOR Future<Void> localChangeFeedStream(StorageServer* data,
// Change feed stream must be sent an error as soon as it is moved away, or change feed can get incorrect results
ACTOR Future<Void> stopChangeFeedOnMove(StorageServer* data, ChangeFeedStreamRequest req, UID streamUID) {
wait(delay(0, TaskPriority::DefaultEndpoint));
auto feed = data->uidChangeFeed.find(req.rangeID);
if (feed == data->uidChangeFeed.end() || feed->second->removing) {
req.reply.sendError(unknown_change_feed());

View File

@ -3,7 +3,7 @@
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View File

@ -3,7 +3,7 @@
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View File

@ -3,7 +3,7 @@
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View File

@ -248,7 +248,7 @@ ERROR( cluster_already_registered, 2165, "Data cluster is already registered wit
ERROR( metacluster_no_capacity, 2166, "Metacluster does not have capacity to create new tenants" )
ERROR( management_cluster_invalid_access, 2167, "Standard transactions cannot be run against the management cluster" )
ERROR( tenant_creation_permanently_failed, 2168, "The tenant creation did not complete in a timely manner and has permanently failed" )
ERROR( cluster_locked, 2169, "The cluster has been locked" )
ERROR( cluster_removed, 2169, "The cluster is being removed from the metacluster" )
// 2200 - errors from bindings and official APIs
ERROR( api_version_unset, 2200, "API version is not set" )

View File

@ -331,7 +331,7 @@ class UpgradeTest:
# correct asan annotations so that it shouldn't produce any false positives.
if line.endswith(
"WARNING: ASan doesn't fully support makecontext/swapcontext functions and may produce false "
"positives in some cases! "
"positives in some cases!"
):
continue
if err_cnt < error_limit: