Merge branch 'main' into feature-metacluster
This commit is contained in:
commit 3ceb2a0639
@@ -96,7 +96,7 @@ if(WIN32)
 add_dependencies(fdbclient_sampling_actors fdbclient_actors)
 endif()
 
-add_flow_target(LINK_TEST NAME fdbclientlinktest SRCS ${FDBCLIENT_SRCS} LinkTest.cpp ADDL_SRCS ${options_srcs})
+add_flow_target(LINK_TEST NAME fdbclientlinktest SRCS LinkTest.cpp)
 target_link_libraries(fdbclientlinktest PRIVATE fdbclient rapidxml) # re-link rapidxml due to private link interface
 
 if(BUILD_AZURE_BACKUP)

@@ -22,6 +22,7 @@
 #include <string>
 #include <vector>
 
 #include "fdbclient/GenericManagementAPI.actor.h"
+#include "fmt/format.h"
 #include "fdbclient/Knobs.h"
 #include "flow/Arena.h"
@@ -2461,6 +2462,21 @@ bool schemaMatch(json_spirit::mValue const& schemaValue,
     }
 }
 
+void setStorageQuota(Transaction& tr, StringRef tenantName, uint64_t quota) {
+    tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
+    auto key = storageQuotaKey(tenantName);
+    tr.set(key, BinaryWriter::toValue<uint64_t>(quota, Unversioned()));
+}
+
+ACTOR Future<Optional<uint64_t>> getStorageQuota(Transaction* tr, StringRef tenantName) {
+    tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
+    state Optional<Value> v = wait(tr->get(storageQuotaKey(tenantName)));
+    if (!v.present()) {
+        return Optional<uint64_t>();
+    }
+    return BinaryReader::fromStringRef<uint64_t>(v.get(), Unversioned());
+}
+
 std::string ManagementAPI::generateErrorMessage(const CoordinatorsResult& res) {
     // Note: the error message here should not be changed if possible
     // If you do change the message here,

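Note: the quota value round-trips through Flow's unversioned binary codec, so it is stored as a raw fixed-width integer with no protocol-version prefix. A minimal standalone sketch of that symmetry (illustrative, not part of the commit):

    uint64_t quota = 300;
    Value encoded = BinaryWriter::toValue<uint64_t>(quota, Unversioned());
    uint64_t decoded = BinaryReader::fromStringRef<uint64_t>(encoded, Unversioned());
    ASSERT(decoded == quota); // same codec pair used by setStorageQuota/getStorageQuota above
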
@@ -2875,6 +2875,7 @@ ACTOR Future<KeyRangeLocationInfo> getKeyLocation_internal(Database cx,
             auto locationInfo =
                 cx->setCachedLocation(tenant, rep.tenantEntry, rep.results[0].first, rep.results[0].second);
             updateTssMappings(cx, rep);
+            updateTagMappings(cx, rep);
 
             return KeyRangeLocationInfo(
                 rep.tenantEntry,

@@ -1622,6 +1622,13 @@ BlobWorkerInterface decodeBlobWorkerListValue(ValueRef const& value) {
 
 const KeyRef tenantDataPrefixKey = "\xff/tenantDataPrefix"_sr;
 
+const KeyRangeRef storageQuotaKeys(LiteralStringRef("\xff/storageQuota/"), LiteralStringRef("\xff/storageQuota0"));
+const KeyRef storageQuotaPrefix = storageQuotaKeys.begin;
+
+Key storageQuotaKey(StringRef tenantName) {
+    return tenantName.withPrefix(storageQuotaPrefix);
+}
 
 // for tests
 void testSSISerdes(StorageServerInterface const& ssi) {
     printf("ssi=\nid=%s\nlocality=%s\nisTss=%s\ntssId=%s\nacceptingRequests=%s\naddress=%s\ngetValue=%s\n\n\n",

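Note: the range end "\xff/storageQuota0" works because '0' (0x30) is the byte after '/' (0x2F), so every key of the form "\xff/storageQuota/<tenantName>" sorts strictly inside the range. A standalone sketch (illustrative, not part of the commit):

    Key k = storageQuotaKey("name1"_sr);   // "\xff/storageQuota/name1"
    ASSERT(k >= storageQuotaKeys.begin);   // begin is the "\xff/storageQuota/" prefix
    ASSERT(k < storageQuotaKeys.end);      // end is the prefix with its last byte '/' bumped to '0'
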
@@ -159,5 +159,9 @@ bool schemaMatch(json_spirit::mValue const& schema,
 // storage nodes
 ACTOR Future<Void> mgmtSnapCreate(Database cx, Standalone<StringRef> snapCmd, UID snapUID);
 
+// Set and get the storage quota per tenant
+void setStorageQuota(Transaction& tr, StringRef tenantName, uint64_t quota);
+ACTOR Future<Optional<uint64_t>> getStorageQuota(Transaction* tr, StringRef tenantName);
+
 #include "flow/unactorcompiler.h"
 #endif

@@ -39,4 +39,6 @@ public:
     T const& operator*() const { return *impl; }
     T* operator->() { return impl.get(); }
     T const* operator->() const { return impl.get(); }
+    T* get() { return impl.get(); }
+    T const* get() const { return impl.get(); }
 };

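Note: PImpl is the project's pointer-to-implementation wrapper; this hunk only shows its accessors being extended with get(). Reconstructed from these accessors and the PImpl<CoordinatedStateImpl>::create(coord) call sites later in this diff, a wrapper along these lines would satisfy both (a hypothetical sketch, not the verbatim header):

    #include <memory>
    #include <utility>

    template <class T>
    class PImpl {
        std::unique_ptr<T> impl; // T may be incomplete here; destroy where T is complete
        struct ConstructorTag {};
        template <class... Args>
        PImpl(ConstructorTag, Args&&... args) : impl(new T(std::forward<Args>(args)...)) {}

    public:
        PImpl() = default;
        template <class... Args>
        static PImpl create(Args&&... args) { // used as PImpl<Impl>::create(coord)
            return PImpl(ConstructorTag{}, std::forward<Args>(args)...);
        }
        T& operator*() { return *impl; }
        T const& operator*() const { return *impl; }
        T* operator->() { return impl.get(); }
        T const* operator->() const { return impl.get(); }
        T* get() { return impl.get(); }             // the accessors added in this hunk
        T const* get() const { return impl.get(); }
    };
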
@@ -681,6 +681,12 @@ BlobWorkerInterface decodeBlobWorkerListValue(ValueRef const& value);
 
 extern const KeyRef tenantDataPrefixKey; // TODO: remove?
 
+// Storage quota per tenant
+// "\xff/storageQuota/[[tenantName]]" := "[[quota]]"
+extern const KeyRangeRef storageQuotaKeys;
+extern const KeyRef storageQuotaPrefix;
+Key storageQuotaKey(StringRef tenantName);
 
 #pragma clang diagnostic pop
 
 #endif

@@ -25,7 +25,7 @@ add_flow_target(STATIC_LIBRARY NAME fdbrpc_sampling
                 SRCS ${FDBRPC_SRCS}
                 DISABLE_ACTOR_DIAGNOSTICS ${FDBRPC_SRCS_DISABLE_ACTOR_DIAGNOSTICS})
 
-add_flow_target(LINK_TEST NAME fdbrpclinktest SRCS ${FDBRPC_SRCS} LinkTest.cpp DISABLE_ACTOR_DIAGNOSTICS ${FDBRPC_SRCS_DISABLE_ACTOR_DIAGNOSTICS})
+add_flow_target(LINK_TEST NAME fdbrpclinktest SRCS LinkTest.cpp)
 target_link_libraries(fdbrpclinktest PRIVATE fdbrpc rapidjson)
 target_include_directories(fdbrpclinktest PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/libeio)
 

@@ -30,6 +30,18 @@
 #include "flow/UnitTest.h"
 #include "flow/actorcompiler.h" // has to be last include
 
+// serialize change feed key as UID bytes, to use 16 bytes on disk
+Key granuleIDToCFKey(UID granuleID) {
+    BinaryWriter wr(Unversioned());
+    wr << granuleID;
+    return wr.toValue();
+}
+
+// parse change feed key back to UID, to be human-readable
+UID cfKeyToGranuleID(Key cfKey) {
+    return BinaryReader::fromStringRef<UID>(cfKey, Unversioned());
+}
 
 // Gets the latest granule history node for range that was persisted
 ACTOR Future<Optional<GranuleHistory>> getLatestGranuleHistory(Transaction* tr, KeyRange range) {
     state KeyRange historyRange = blobGranuleHistoryKeyRangeFor(range);

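Note: the two helpers are inverses by construction, so the granule UID survives the trip through its 16-byte key encoding. A quick property sketch (illustrative, not part of the commit):

    UID id = deterministicRandom()->randomUniqueID();
    Key cfKey = granuleIDToCFKey(id);
    ASSERT(cfKey.size() == 16);            // a UID is two uint64_t halves
    ASSERT(cfKeyToGranuleID(cfKey) == id); // decoding recovers the original UID
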
@@ -79,13 +79,13 @@ struct CoordinatedStateImpl {
 
     CoordinatedStateImpl(ServerCoordinators const& c)
       : coordinators(c), stage(0), conflictGen(0), doomed(false), ac(false), initial(false) {}
-    uint64_t getConflict() { return conflictGen; }
+    uint64_t getConflict() const { return conflictGen; }
 
-    bool isDoomed(GenerationRegReadReply const& rep) {
-        return rep.gen > gen // setExclusive is doomed, because there was a write at least started at a higher
-                             // generation, which means a read completed at that higher generation
-            // || rep.rgen > gen // setExclusive isn't absolutely doomed, but it may/probably will fail
-            ;
+    bool isDoomed(GenerationRegReadReply const& rep) const {
+        return rep.gen > gen;
+        // setExclusive is doomed, because there was a write at least started at a higher
+        // generation, which means a read completed at that higher generation
+        // || rep.rgen > gen // setExclusive isn't absolutely doomed, but it may/probably will fail
     }
 
     ACTOR static Future<Value> read(CoordinatedStateImpl* self) {
@@ -216,7 +216,7 @@ struct CoordinatedStateImpl {
 };
 
 CoordinatedState::CoordinatedState(ServerCoordinators const& coord)
-  : impl(std::make_unique<CoordinatedStateImpl>(coord)) {}
+  : impl(PImpl<CoordinatedStateImpl>::create(coord)) {}
 CoordinatedState::~CoordinatedState() = default;
 Future<Value> CoordinatedState::read() {
     return CoordinatedStateImpl::read(impl.get());
@@ -227,7 +227,7 @@ Future<Void> CoordinatedState::onConflict() {
 Future<Void> CoordinatedState::setExclusive(Value v) {
     return CoordinatedStateImpl::setExclusive(impl.get(), v);
 }
-uint64_t CoordinatedState::getConflict() {
+uint64_t CoordinatedState::getConflict() const {
     return impl->getConflict();
 }
 
@@ -354,7 +354,7 @@ struct MovableCoordinatedStateImpl {
 
 MovableCoordinatedState& MovableCoordinatedState::operator=(MovableCoordinatedState&&) = default;
 MovableCoordinatedState::MovableCoordinatedState(class ServerCoordinators const& coord)
-  : impl(std::make_unique<MovableCoordinatedStateImpl>(coord)) {}
+  : impl(PImpl<MovableCoordinatedStateImpl>::create(coord)) {}
 MovableCoordinatedState::~MovableCoordinatedState() = default;
 Future<Value> MovableCoordinatedState::read() {
     return MovableCoordinatedStateImpl::read(impl.get());

@@ -1395,7 +1395,6 @@ ACTOR static Future<Void> startMoveShards(Database occ,
                 physicalShardMap[ssId].emplace_back(rangeIntersectKeys, srcId);
             }
 
-            const UID checkpontId = deterministicRandom()->randomUniqueID();
             for (const UID& ssId : src) {
                 dataMove.src.insert(ssId);
                 // TODO(psm): Create checkpoint for the range.

@@ -88,16 +88,10 @@ struct GranuleFiles {
 };
 
 // serialize change feed key as UID bytes, to use 16 bytes on disk
-static Key granuleIDToCFKey(UID granuleID) {
-    BinaryWriter wr(Unversioned());
-    wr << granuleID;
-    return wr.toValue();
-}
+Key granuleIDToCFKey(UID granuleID);
 
 // parse change feed key back to UID, to be human-readable
-static UID cfKeyToGranuleID(Key cfKey) {
-    return BinaryReader::fromStringRef<UID>(cfKey, Unversioned());
-}
+UID cfKeyToGranuleID(Key cfKey);
 
 class Transaction;
 ACTOR Future<Optional<GranuleHistory>> getLatestGranuleHistory(Transaction* tr, KeyRange range);

@@ -23,6 +23,7 @@
 #pragma once
 
 #include "fdbclient/FDBTypes.h"
+#include "fdbclient/PImpl.h"
 
 class CoordinatedState : NonCopyable {
 public:
@@ -53,10 +54,10 @@ public:
     // returned from read may or may not ever have been a valid state. Probably there was a
     // call to read() or setExclusive() concurrently with this pair.
 
-    uint64_t getConflict();
+    uint64_t getConflict() const;
 
 private:
-    std::unique_ptr<struct CoordinatedStateImpl> impl;
+    PImpl<struct CoordinatedStateImpl> impl;
 };
 
 class MovableCoordinatedState : NonCopyable {
|
|||
// (and therefore the caller should die).
|
||||
|
||||
private:
|
||||
std::unique_ptr<struct MovableCoordinatedStateImpl> impl;
|
||||
PImpl<struct MovableCoordinatedStateImpl> impl;
|
||||
};
|
||||
|
||||
#endif
|
||||
|
|
|
@@ -2609,9 +2609,7 @@ ACTOR Future<Void> localChangeFeedStream(StorageServer* data,
             // would mean the stream would have finished without error
             results.send(MutationsAndVersionRef(end, invalidVersion));
         } else {
-            TraceEvent(SevError, "LocalChangeFeedError", data->thisServerID)
-                .error(e)
-                .detail("CFID", rangeID.printable());
+            TraceEvent(SevError, "LocalChangeFeedError", data->thisServerID).error(e).detail("CFID", rangeID);
         }
         throw;
     }
@@ -3911,7 +3909,7 @@ ACTOR Future<GetMappedKeyValuesReply> mapKeyValues(StorageServer* data,
     try {
         mappedKeyFormatTuple = Tuple::unpack(mapper);
     } catch (Error& e) {
-        TraceEvent("MapperNotTuple").error(e).detail("Mapper", mapper.printable());
+        TraceEvent("MapperNotTuple").error(e).detail("Mapper", mapper);
        throw mapper_not_tuple();
     }
     state std::vector<Optional<Tuple>> vt;
@@ -4560,7 +4558,7 @@ ACTOR Future<Void> getKeyQ(StorageServer* data, GetKeyRequest req) {
         auto cached = data->cachedRangeMap[absoluteKey];
         // if (cached)
         // 	TraceEvent(SevDebug, "SSGetKeyCached").detail("Key", k).detail("Begin",
-        // shard.begin.printable()).detail("End", shard.end.printable());
+        // shard.begin).detail("End", shard.end);
 
         GetKeyReply reply(updated, cached);
         reply.penalty = data->getPenalty();
@@ -5059,8 +5057,8 @@ ACTOR Future<Void> logFetchKeysWarning(AddingShard* shard) {
         TraceEvent(traceEventLevel, "FetchKeysTooLong")
             .detail("Duration", now() - startTime)
             .detail("Phase", shard->phase)
-            .detail("Begin", shard->keys.begin.printable())
-            .detail("End", shard->keys.end.printable());
+            .detail("Begin", shard->keys.begin)
+            .detail("End", shard->keys.end);
     }
 }
 
@@ -5181,7 +5179,7 @@ ACTOR Future<Void> changeFeedPopQ(StorageServer* self, ChangeFeedPopRequest req)
     }
 
     TraceEvent(SevDebug, "ChangeFeedPopQuery", self->thisServerID)
-        .detail("RangeID", req.rangeID.printable())
+        .detail("RangeID", req.rangeID)
         .detail("Version", req.version)
         .detail("SSVersion", self->version.get())
        .detail("Range", req.range);
@@ -5238,8 +5236,8 @@ ACTOR Future<Version> fetchChangeFeedApplier(StorageServer* data,
     if (startVersion >= endVersion || (changeFeedInfo->removing)) {
         TEST(true); // Change Feed popped before fetch
         TraceEvent(SevDebug, "FetchChangeFeedNoOp", data->thisServerID)
-            .detail("RangeID", rangeId.printable())
-            .detail("Range", range.toString())
+            .detail("RangeID", rangeId)
+            .detail("Range", range)
             .detail("StartVersion", startVersion)
             .detail("EndVersion", endVersion)
             .detail("Removing", changeFeedInfo->removing);
@@ -5389,8 +5387,8 @@ ACTOR Future<Version> fetchChangeFeedApplier(StorageServer* data,
         if (e.code() != error_code_end_of_stream) {
             TraceEvent(SevDebug, "FetchChangeFeedError", data->thisServerID)
                 .errorUnsuppressed(e)
-                .detail("RangeID", rangeId.printable())
-                .detail("Range", range.toString())
+                .detail("RangeID", rangeId)
+                .detail("Range", range)
                 .detail("EndVersion", endVersion)
                 .detail("Removing", changeFeedInfo->removing)
                 .detail("Destroyed", changeFeedInfo->destroyed);
@@ -5436,8 +5434,8 @@ ACTOR Future<Version> fetchChangeFeedApplier(StorageServer* data,
     }
 
     TraceEvent(SevDebug, "FetchChangeFeedDone", data->thisServerID)
-        .detail("RangeID", rangeId.printable())
-        .detail("Range", range.toString())
+        .detail("RangeID", rangeId)
+        .detail("Range", range)
         .detail("StartVersion", startVersion)
         .detail("EndVersion", endVersion)
         .detail("EmptyVersion", changeFeedInfo->emptyVersion)
@@ -5460,8 +5458,8 @@ ACTOR Future<Version> fetchChangeFeed(StorageServer* data,
     state FlowLock::Releaser holdingFCFPL(data->fetchChangeFeedParallelismLock);
 
     TraceEvent(SevDebug, "FetchChangeFeed", data->thisServerID)
-        .detail("RangeID", changeFeedInfo->id.printable())
-        .detail("Range", changeFeedInfo->range.toString())
+        .detail("RangeID", changeFeedInfo->id)
+        .detail("Range", changeFeedInfo->range)
         .detail("BeginVersion", beginVersion)
         .detail("EndVersion", endVersion);
 
@@ -5469,8 +5467,8 @@ ACTOR Future<Version> fetchChangeFeed(StorageServer* data,
     if (cleanupPending != data->changeFeedCleanupDurable.end()) {
         TEST(true); // Change feed waiting for dirty previous move to finish
         TraceEvent(SevDebug, "FetchChangeFeedWaitCleanup", data->thisServerID)
-            .detail("RangeID", changeFeedInfo->id.printable())
-            .detail("Range", changeFeedInfo->range.toString())
+            .detail("RangeID", changeFeedInfo->id)
+            .detail("Range", changeFeedInfo->range)
             .detail("CleanupVersion", cleanupPending->second)
             .detail("EmptyVersion", changeFeedInfo->emptyVersion)
             .detail("BeginVersion", beginVersion)
@@ -5482,8 +5480,8 @@ ACTOR Future<Version> fetchChangeFeed(StorageServer* data,
     if (cleanupPendingAfter != data->changeFeedCleanupDurable.end()) {
         ASSERT(cleanupPendingAfter->second >= endVersion);
         TraceEvent(SevDebug, "FetchChangeFeedCancelledByCleanup", data->thisServerID)
-            .detail("RangeID", changeFeedInfo->id.printable())
-            .detail("Range", changeFeedInfo->range.toString())
+            .detail("RangeID", changeFeedInfo->id)
+            .detail("Range", changeFeedInfo->range)
             .detail("BeginVersion", beginVersion)
             .detail("EndVersion", endVersion);
         return invalidVersion;
@@ -5526,8 +5524,8 @@ ACTOR Future<Version> fetchChangeFeed(StorageServer* data,
         Version cleanupVersion = data->data().getLatestVersion();
 
         TraceEvent(SevDebug, "DestroyingChangeFeedFromFetch", data->thisServerID)
-            .detail("RangeID", changeFeedInfo->id.printable())
-            .detail("Range", changeFeedInfo->range.toString())
+            .detail("RangeID", changeFeedInfo->id)
+            .detail("Range", changeFeedInfo->range)
             .detail("Version", cleanupVersion);
 
         if (g_network->isSimulated()) {
@@ -5606,7 +5604,7 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
             cfInfo->durableFetchVersion = NotifiedVersion();
 
             TraceEvent(SevDebug, "ResetChangeFeedInfo", data->thisServerID)
-                .detail("RangeID", cfInfo->id.printable())
+                .detail("RangeID", cfInfo->id)
                 .detail("Range", cfInfo->range)
                 .detail("FetchVersion", fetchVersion)
                 .detail("EmptyVersion", cfInfo->emptyVersion)
@@ -5641,7 +5639,7 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
         bool existing = existingEntry != data->uidChangeFeed.end();
 
         TraceEvent(SevDebug, "FetchedChangeFeedInfo", data->thisServerID)
-            .detail("RangeID", cfEntry.rangeId.printable())
+            .detail("RangeID", cfEntry.rangeId)
             .detail("Range", cfEntry.range)
             .detail("FetchVersion", fetchVersion)
             .detail("EmptyVersion", cfEntry.emptyVersion)
@@ -5746,7 +5744,7 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
                                                  existingEntry->second->emptyVersion + 1,
                                                  existingEntry->second->stopVersion)));
                 TraceEvent(SevDebug, "PersistingResetChangeFeedInfo", data->thisServerID)
-                    .detail("RangeID", existingEntry->second->id.printable())
+                    .detail("RangeID", existingEntry->second->id)
                     .detail("Range", existingEntry->second->range)
                     .detail("FetchVersion", fetchVersion)
                     .detail("EmptyVersion", existingEntry->second->emptyVersion)
@@ -5768,7 +5766,7 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
             Version cleanupVersion = data->data().getLatestVersion();
 
             TraceEvent(SevDebug, "DestroyingChangeFeedFromFetchMetadata", data->thisServerID)
-                .detail("RangeID", feedId.printable())
+                .detail("RangeID", feedId)
                 .detail("Range", existingEntry->second->range)
                 .detail("Version", cleanupVersion)
                 .detail("FKID", fetchKeysID);
@@ -6821,8 +6819,8 @@ private:
             auto feed = data->uidChangeFeed.find(changeFeedId);
 
             TraceEvent(SevDebug, "ChangeFeedPrivateMutation", data->thisServerID)
-                .detail("RangeID", changeFeedId.printable())
-                .detail("Range", changeFeedRange.toString())
+                .detail("RangeID", changeFeedId)
+                .detail("Range", changeFeedRange)
                 .detail("Version", currentVersion)
                 .detail("PopVersion", popVersion)
                 .detail("Status", status);
@@ -6850,8 +6848,8 @@ private:
                 ASSERT(feed != data->uidChangeFeed.end());
 
                 TraceEvent(SevDebug, "AddingChangeFeed", data->thisServerID)
-                    .detail("RangeID", changeFeedId.printable())
-                    .detail("Range", changeFeedRange.toString())
+                    .detail("RangeID", changeFeedId)
+                    .detail("Range", changeFeedRange)
                     .detail("EmptyVersion", feed->second->emptyVersion);
 
                 auto rs = data->keyChangeFeed.modify(changeFeedRange);
@@ -6888,24 +6886,24 @@ private:
 
             } else if (status == ChangeFeedStatus::CHANGE_FEED_CREATE && createdFeed) {
                 TraceEvent(SevDebug, "CreatingChangeFeed", data->thisServerID)
-                    .detail("RangeID", changeFeedId.printable())
-                    .detail("Range", changeFeedRange.toString())
+                    .detail("RangeID", changeFeedId)
+                    .detail("Range", changeFeedRange)
                     .detail("Version", currentVersion);
                 // no-op, already created metadata
                 addMutationToLog = true;
             }
             if (status == ChangeFeedStatus::CHANGE_FEED_STOP && currentVersion < feed->second->stopVersion) {
                 TraceEvent(SevDebug, "StoppingChangeFeed", data->thisServerID)
-                    .detail("RangeID", changeFeedId.printable())
-                    .detail("Range", changeFeedRange.toString())
+                    .detail("RangeID", changeFeedId)
+                    .detail("Range", changeFeedRange)
                     .detail("Version", currentVersion);
                 feed->second->stopVersion = currentVersion;
                 addMutationToLog = true;
             }
             if (status == ChangeFeedStatus::CHANGE_FEED_DESTROY && !createdFeed && feed != data->uidChangeFeed.end()) {
                 TraceEvent(SevDebug, "DestroyingChangeFeed", data->thisServerID)
-                    .detail("RangeID", changeFeedId.printable())
-                    .detail("Range", changeFeedRange.toString())
+                    .detail("RangeID", changeFeedId)
+                    .detail("Range", changeFeedRange)
                     .detail("Version", currentVersion);
                 Key beginClearKey = changeFeedId.withPrefix(persistChangeFeedKeys.begin);
                 Version cleanupVersion = data->data().getLatestVersion();
@@ -8395,8 +8393,8 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
             Version popVersion, stopVersion;
             std::tie(changeFeedRange, popVersion, stopVersion) = decodeChangeFeedSSValue(changeFeeds[feedLoc].value);
             TraceEvent(SevDebug, "RestoringChangeFeed", data->thisServerID)
-                .detail("RangeID", changeFeedId.printable())
-                .detail("Range", changeFeedRange.toString())
+                .detail("RangeID", changeFeedId)
+                .detail("Range", changeFeedRange)
                 .detail("StopVersion", stopVersion)
                 .detail("PopVer", popVersion);
             Reference<ChangeFeedInfo> changeFeedInfo(new ChangeFeedInfo());

@@ -0,0 +1,78 @@
+/*
+ * StorageQuota.actor.cpp
+ *
+ * This source file is part of the FoundationDB open source project
+ *
+ * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "fdbclient/ManagementAPI.actor.h"
+#include "fdbserver/workloads/workloads.actor.h"
+
+#include "flow/Trace.h"
+#include "flow/actorcompiler.h" // This must be the last #include.
+
+struct StorageQuotaWorkload : TestWorkload {
+    StorageQuotaWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {}
+
+    std::string description() const override { return "StorageQuotaWorkload"; }
+    Future<Void> setup(Database const& cx) override { return Void(); }
+    Future<Void> start(Database const& cx) override { return (clientId == 0) ? _start(cx) : Void(); }
+    Future<bool> check(Database const& cx) override { return true; }
+    void getMetrics(std::vector<PerfMetric>& m) override {}
+
+    ACTOR Future<Void> _start(Database cx) {
+        wait(setStorageQuotaHelper(cx, "name1"_sr, 100));
+        wait(setStorageQuotaHelper(cx, "name2"_sr, 200));
+        wait(setStorageQuotaHelper(cx, "name1"_sr, 300));
+
+        state Optional<uint64_t> quota1 = wait(getStorageQuotaHelper(cx, "name1"_sr));
+        ASSERT(quota1.present() && quota1.get() == 300);
+        state Optional<uint64_t> quota2 = wait(getStorageQuotaHelper(cx, "name2"_sr));
+        ASSERT(quota2.present() && quota2.get() == 200);
+        state Optional<uint64_t> quota3 = wait(getStorageQuotaHelper(cx, "name3"_sr));
+        ASSERT(!quota3.present());
+
+        return Void();
+    }
+
+    ACTOR static Future<Void> setStorageQuotaHelper(Database cx, StringRef tenantName, uint64_t quota) {
+        state Transaction tr(cx);
+        loop {
+            try {
+                setStorageQuota(tr, tenantName, quota);
+                wait(tr.commit());
+                return Void();
+            } catch (Error& e) {
+                wait(tr.onError(e));
+            }
+        }
+    }
+
+    ACTOR static Future<Optional<uint64_t>> getStorageQuotaHelper(Database cx, StringRef tenantName) {
+        state Transaction tr(cx);
+        loop {
+            try {
+                state Optional<uint64_t> quota = wait(getStorageQuota(&tr, tenantName));
+                wait(tr.commit());
+                return quota;
+            } catch (Error& e) {
+                wait(tr.onError(e));
+            }
+        }
+    }
+};
+
+WorkloadFactory<StorageQuotaWorkload> StorageQuotaWorkloadFactory("StorageQuota", true);

@@ -21,7 +21,7 @@ add_flow_target(STATIC_LIBRARY NAME flow_sampling SRCS ${FLOW_SRCS})
 # Since we want to ensure no symbols from other modules are used, create an
 # executable so the linker will throw errors if it can't find the declaration
 # of a symbol.
-add_flow_target(LINK_TEST NAME flowlinktest SRCS ${FLOW_SRCS} LinkTest.cpp)
+add_flow_target(LINK_TEST NAME flowlinktest SRCS LinkTest.cpp)
 target_link_libraries(flowlinktest PRIVATE flow stacktrace)
 
 find_package(ZLIB)

@@ -1731,15 +1731,10 @@ SystemStatistics getSystemStatistics(std::string const& dataFolder,
             0,
             returnStats.elapsed -
                 std::min<double>(returnStats.elapsed, (nowIOMilliSecs - (*statState)->lastIOMilliSecs) / 1000.0));
-        returnStats.processDiskReadSeconds = std::max<double>(
-            0,
-            returnStats.elapsed - std::min<double>(returnStats.elapsed,
-                                                   (nowReadMilliSecs - (*statState)->lastReadMilliSecs) / 1000.0));
-        returnStats.processDiskWriteSeconds =
-            std::max<double>(0,
-                             returnStats.elapsed -
-                                 std::min<double>(returnStats.elapsed,
-                                                  (nowWriteMilliSecs - (*statState)->lastWriteMilliSecs) / 1000.0));
+        returnStats.processDiskReadSeconds =
+            std::min<double>(returnStats.elapsed, (nowReadMilliSecs - (*statState)->lastReadMilliSecs) / 1000.0);
+        returnStats.processDiskWriteSeconds =
+            std::min<double>(returnStats.elapsed, (nowWriteMilliSecs - (*statState)->lastWriteMilliSecs) / 1000.0);
         returnStats.processDiskRead = (nowReads - (*statState)->lastReads);
         returnStats.processDiskWrite = (nowWrites - (*statState)->lastWrites);
         returnStats.processDiskWriteSectors = (nowWriteSectors - (*statState)->lastWriteSectors);

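Note: the removed expressions computed elapsed-minus-busy (the idle complement, clamped at zero), while the replacements report the busy delta directly, capped at the sampling interval. A worked example with illustrative numbers (not from the source):

    // 5.0 s elapsed between samples; the disk reported 1.2 s spent reading.
    double elapsed = 5.0, readDelta = 1.2;
    double oldValue = std::max<double>(0, elapsed - std::min<double>(elapsed, readDelta)); // 3.8 s: time NOT reading
    double newValue = std::min<double>(elapsed, readDelta);                                // 1.2 s: time reading
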
@@ -3469,7 +3464,12 @@ void* loadLibrary(const char* lib_path) {
     void* dlobj = nullptr;
 
 #if defined(__unixish__)
-    dlobj = dlopen(lib_path, RTLD_LAZY | RTLD_LOCAL);
+    dlobj = dlopen(lib_path,
+                   RTLD_LAZY | RTLD_LOCAL
+#ifdef USE_SANITIZER // Keep alive dlopen()-ed libs for symbolized XSAN backtrace
+                       | RTLD_NODELETE
+#endif
+    );
     if (dlobj == nullptr) {
         TraceEvent(SevWarn, "LoadLibraryFailed").detail("Library", lib_path).detail("Error", dlerror());
     }

@@ -217,6 +217,7 @@ if(WITH_PYTHON)
   add_fdb_test(TEST_FILES rare/RandomReadWriteTest.toml)
   add_fdb_test(TEST_FILES rare/ReadSkewReadWrite.toml)
   add_fdb_test(TEST_FILES rare/SpecificUnitTests.toml)
+  add_fdb_test(TEST_FILES rare/StorageQuotaTest.toml)
   add_fdb_test(TEST_FILES rare/SwizzledLargeApiCorrectness.toml)
   add_fdb_test(TEST_FILES rare/RedwoodCorrectnessBTree.toml)
   add_fdb_test(TEST_FILES rare/RedwoodDeltaTree.toml)

@@ -0,0 +1,5 @@
+[[test]]
+testTitle = 'StorageQuota'
+
+    [[test.workload]]
+    testName = 'StorageQuota'