parse tenant id during getResolution phase
This commit is contained in:
parent
9cb7df318b
commit
33bddb31c7
|
@ -277,6 +277,8 @@ struct CommitTransactionRef {
|
|||
bool report_conflicting_keys = false;
|
||||
bool lock_aware = false; // set when metadata mutations are present
|
||||
Optional<SpanContext> spanContext;
|
||||
Optional<VectorRef<int64_t>> tenantIds; // The tenants associated with this transaction. This field only existing
|
||||
// when tenant mode is required and this transaction has metadata mutations
|
||||
|
||||
template <class Ar>
|
||||
force_inline void serialize(Ar& ar) {
|
||||
|
@ -288,7 +290,8 @@ struct CommitTransactionRef {
|
|||
read_snapshot,
|
||||
report_conflicting_keys,
|
||||
lock_aware,
|
||||
spanContext);
|
||||
spanContext,
|
||||
tenantIds);
|
||||
} else {
|
||||
serializer(ar, read_conflict_ranges, write_conflict_ranges, mutations, read_snapshot);
|
||||
if (ar.protocolVersion().hasReportConflictingKeys()) {
|
||||
|
@ -324,7 +327,8 @@ struct CommitTransactionRef {
|
|||
}
|
||||
|
||||
size_t expectedSize() const {
|
||||
return read_conflict_ranges.expectedSize() + write_conflict_ranges.expectedSize() + mutations.expectedSize();
|
||||
return read_conflict_ranges.expectedSize() + write_conflict_ranges.expectedSize() + mutations.expectedSize() +
|
||||
(tenantIds.present() ? tenantIds.expectedSize() : 0);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -203,6 +203,8 @@ struct ResolutionRequestBuilder {
|
|||
ASSERT(transactionNumberInBatch >= 0 && transactionNumberInBatch < 32768);
|
||||
|
||||
bool isTXNStateTransaction = false;
|
||||
bool needParseTenantId = !trRequest.tenantInfo.hasTenant() && self->getTenantMode() == TenantMode::REQUIRED;
|
||||
VectorRef<int64_t> tenantIds;
|
||||
for (auto& m : trIn.mutations) {
|
||||
DEBUG_MUTATION("AddTr", ver, m, self->dbgid).detail("Idx", transactionNumberInBatch);
|
||||
if (m.type == MutationRef::SetVersionstampedKey) {
|
||||
|
@ -216,6 +218,8 @@ struct ResolutionRequestBuilder {
|
|||
auto& tr = getOutTransaction(0, trIn.read_snapshot);
|
||||
tr.mutations.push_back(requests[0].arena, m);
|
||||
tr.lock_aware = trRequest.isLockAware();
|
||||
} else if (needParseTenantId && !isSystemKey(m.param1) && isSingleKeyMutation((MutationRef::Type)m.type)) {
|
||||
tenantIds.push_back(requests[0].arena, TenantAPI::extractTenantIdFromMutation(m));
|
||||
}
|
||||
}
|
||||
if (isTXNStateTransaction && !trRequest.isLockAware()) {
|
||||
|
@ -239,7 +243,9 @@ struct ResolutionRequestBuilder {
|
|||
}
|
||||
// Note only Resolver 0 got the correct spanContext, which means
|
||||
// the reply from Resolver 0 has the right one back.
|
||||
getOutTransaction(0, trIn.read_snapshot).spanContext = trRequest.spanContext;
|
||||
auto& tr = getOutTransaction(0, trIn.read_snapshot);
|
||||
tr.spanContext = trRequest.spanContext;
|
||||
tr.tenantIds = tenantIds;
|
||||
}
|
||||
|
||||
std::vector<int> resolversUsed;
|
||||
|
@ -960,9 +966,6 @@ ACTOR Future<Void> getResolution(CommitBatchContext* self) {
|
|||
int conflictRangeCount = 0;
|
||||
self->maxTransactionBytes = 0;
|
||||
for (int t = 0; t < trs.size(); t++) {
|
||||
// detect invalid tenant operation
|
||||
|
||||
|
||||
requests.addTransaction(trs[t], self->commitVersion, t);
|
||||
conflictRangeCount +=
|
||||
trs[t].transaction.read_conflict_ranges.size() + trs[t].transaction.write_conflict_ranges.size();
|
||||
|
@ -1082,7 +1085,7 @@ bool validTenantAccess(MutationRef m,
|
|||
return true;
|
||||
}
|
||||
|
||||
inline bool tenantMapChanging(MutationRef mutation) {
|
||||
inline bool tenantMapChanging(MutationRef const& mutation) {
|
||||
const KeyRangeRef tenantMapRange = TenantMetadata::tenantMap().subspace;
|
||||
if (mutation.type == MutationRef::SetValue && mutation.param1.startsWith(tenantMapRange.begin)) {
|
||||
return true;
|
||||
|
@ -1171,13 +1174,26 @@ void applyMetadataEffect(CommitBatchContext* self) {
|
|||
committed && self->resolution[resolver].stateMutations[versionIndex][transactionIndex].committed;
|
||||
}
|
||||
|
||||
if (committed) {
|
||||
Error e = validateAndProcessTenantAccess(
|
||||
self->resolution[0].stateMutations[versionIndex][transactionIndex].mutations,
|
||||
self->pProxyCommitData,
|
||||
self->pProxyCommitData->dbgid,
|
||||
"applyMetadataEffect");
|
||||
committed = committed && (e.code() == error_code_success);
|
||||
if (committed && self->pProxyCommitData->getTenantMode() == TenantMode::REQUIRED) {
|
||||
auto& tenantIds = self->resolution[0].stateMutations[versionIndex][transactionIndex].tenantIds;
|
||||
ASSERT(tenantIds.present());
|
||||
// check whether contains tenant changes and normal key writing
|
||||
auto& mutations = self->resolution[0].stateMutations[versionIndex][transactionIndex].mutations;
|
||||
committed =
|
||||
tenantIds.get().empty() || std::none_of(mutations.begin(), mutations.end(), tenantMapChanging);
|
||||
|
||||
// check if all tenant ids are valid if committed == true
|
||||
committed = committed &&
|
||||
std::all_of(tenantIds.get().begin(), tenantIds.get().end(), [self](const int64_t& tid) {
|
||||
return self->pProxyCommitData->tenantMap.count(tid);
|
||||
});
|
||||
|
||||
// TODO debug trace
|
||||
if (self->debugID.present()) {
|
||||
TraceEvent e(SevDebug, "TenantCheck_ApplyMetadataEffect", self->debugID.get());
|
||||
e.detail("TenantIds", tenantIds);
|
||||
e.detail("Mutations", mutations);
|
||||
}
|
||||
}
|
||||
|
||||
if (committed) {
|
||||
|
|
|
@ -352,7 +352,8 @@ ACTOR Future<Void> resolveBatch(Reference<Resolver> self,
|
|||
stateTransactions.push_back_deep(
|
||||
stateTransactions.arena(),
|
||||
StateTransactionRef(reply.committed[t] == ConflictBatch::TransactionCommitted,
|
||||
req.transactions[t].mutations));
|
||||
req.transactions[t].mutations,
|
||||
req.transactions[t].tenantIds));
|
||||
|
||||
// for (const auto& m : req.transactions[t].mutations)
|
||||
// DEBUG_MUTATION("Resolver", req.version, m, self->dbgid);
|
||||
|
|
|
@ -69,17 +69,29 @@ struct ResolverInterface {
|
|||
struct StateTransactionRef {
|
||||
constexpr static FileIdentifier file_identifier = 6150271;
|
||||
StateTransactionRef() {}
|
||||
StateTransactionRef(const bool committed, VectorRef<MutationRef> const& mutations)
|
||||
: committed(committed), mutations(mutations) {}
|
||||
StateTransactionRef(const bool committed,
|
||||
VectorRef<MutationRef> const& mutations,
|
||||
Optional<VectorRef<int64_t>> tenantIds)
|
||||
: committed(committed), mutations(mutations), tenantIds(tenantIds) {}
|
||||
StateTransactionRef(Arena& p, const StateTransactionRef& toCopy)
|
||||
: committed(toCopy.committed), mutations(p, toCopy.mutations) {}
|
||||
: committed(toCopy.committed), mutations(p, toCopy.mutations) {
|
||||
if (toCopy.tenantIds.present()) {
|
||||
tenantIds = VectorRef<int64_t>(p, toCopy.tenantIds.get());
|
||||
}
|
||||
}
|
||||
bool committed;
|
||||
VectorRef<MutationRef> mutations;
|
||||
size_t expectedSize() const { return mutations.expectedSize(); }
|
||||
|
||||
// The tenants associated with this transaction. This field only existing when tenant mode is required. Because the
|
||||
// applyMetadataEffect need to know whether the tenant access is valid to decide whether to apply metadata
|
||||
Optional<VectorRef<int64_t>> tenantIds;
|
||||
size_t expectedSize() const {
|
||||
return mutations.expectedSize() + (tenantIds.present() ? tenantIds.expectedSize() : 0);
|
||||
}
|
||||
|
||||
template <class Archive>
|
||||
void serialize(Archive& ar) {
|
||||
serializer(ar, committed, mutations);
|
||||
serializer(ar, committed, mutations, tenantIds);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -65,7 +65,7 @@ struct RawTenantAccessWorkload : TestWorkload {
|
|||
// create N tenant through special key space
|
||||
wait(runRYWTransaction(cx, [workload](Reference<ReadYourWritesTransaction> tr) {
|
||||
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
||||
for (int i = 0; i < workload->tenantCount ; i += 2) {
|
||||
for (int i = 0; i < workload->tenantCount; i += 2) {
|
||||
tr->set(workload->specialKeysTenantMapPrefix.withSuffix(workload->indexToTenantName(i)), ""_sr);
|
||||
}
|
||||
return Future<Void>(Void());
|
||||
|
@ -77,13 +77,9 @@ struct RawTenantAccessWorkload : TestWorkload {
|
|||
return Void();
|
||||
}
|
||||
|
||||
bool hasNonexistentTenant() const {
|
||||
return lastCreatedTenants.size() + idx2Tid.size() < tenantCount;
|
||||
}
|
||||
bool hasNonexistentTenant() const { return lastCreatedTenants.size() + idx2Tid.size() < tenantCount; }
|
||||
|
||||
bool hasExistingTenant() const {
|
||||
return idx2Tid.size() - lastDeletedTenants.size() > 0;
|
||||
}
|
||||
bool hasExistingTenant() const { return idx2Tid.size() - lastDeletedTenants.size() > 0; }
|
||||
|
||||
int64_t extractTenantId(ValueRef value) {
|
||||
int64_t id;
|
||||
|
@ -282,6 +278,8 @@ struct RawTenantAccessWorkload : TestWorkload {
|
|||
state bool committed = false;
|
||||
|
||||
loop {
|
||||
self->lastDeletedTenants.clear();
|
||||
self->lastCreatedTenants.clear();
|
||||
tr->reset();
|
||||
tr->debugTransaction(traceId);
|
||||
try {
|
||||
|
|
|
@ -15,4 +15,4 @@ runSetup = true
|
|||
[[test.workload]]
|
||||
testName = 'RawTenantAccess'
|
||||
tenantCount = 1000
|
||||
testDuration = 200
|
||||
testDuration = 120
|
||||
|
|
Loading…
Reference in New Issue