Fix problem with PROXY_USE_RESOLVER_PRIVATE_MUTATIONS in resolver.

This commit is contained in:
Dan Lambright 2022-01-21 10:39:06 -05:00
parent 9544379cdf
commit 1f67250402
4 changed files with 22 additions and 10 deletions

View File

@ -226,6 +226,7 @@ ACTOR Future<Void> newResolvers(Reference<ClusterRecoveryData> self, RecruitFrom
std::vector<Future<ResolverInterface>> initializationReplies;
for (int i = 0; i < recr.resolvers.size(); i++) {
InitializeResolverRequest req;
req.masterLifetime = self->masterLifetime;
req.recoveryCount = self->cstate.myDBState.recoveryCount + 1;
req.commitProxyCount = recr.commitProxies.size();
req.resolverCount = recr.resolvers.size();
@ -578,7 +579,9 @@ std::pair<KeyRangeRef, bool> findRange(CoalescedKeyRangeMap<int>& key_resolver,
}
ACTOR Future<Void> resolutionBalancing(Reference<ClusterRecoveryData> self) {
state CoalescedKeyRangeMap<int> key_resolver;
state CoalescedKeyRangeMap<int> key_resolver(
0, SERVER_KNOBS->PROXY_USE_RESOLVER_PRIVATE_MUTATIONS ? normalKeys.end : allKeys.end);
key_resolver.insert(SERVER_KNOBS->PROXY_USE_RESOLVER_PRIVATE_MUTATIONS ? normalKeys : allKeys, 0);
key_resolver.insert(allKeys, 0);
loop {
wait(delay(SERVER_KNOBS->MIN_BALANCE_TIME, TaskPriority::ResolutionMetrics));
@ -1282,7 +1285,12 @@ ACTOR Future<Void> sendInitialCommitToResolvers(Reference<ClusterRecoveryData> s
for (auto& it : self->commitProxies) {
endpoints.push_back(it.txnState.getEndpoint());
}
if (SERVER_KNOBS->PROXY_USE_RESOLVER_PRIVATE_MUTATIONS) {
// Broadcasts transaction state store to resolvers.
for (auto& it : self->resolvers) {
endpoints.push_back(it.txnState.getEndpoint());
}
}
loop {
if (!data.size())
break;
@ -1885,4 +1893,4 @@ std::string& getRecoveryEventName(ClusterRecoveryEventType type) {
auto iter = recoveryEventNameMap.find(type);
ASSERT(iter != recoveryEventNameMap.end());
return iter->second;
}
}

View File

@ -632,15 +632,14 @@ ACTOR Future<Void> resolverCore(ResolverInterface resolver,
TraceEvent("ResolverInit", resolver.id()).detail("RecoveryCount", initReq.recoveryCount);
// Wait until we can load the "real" logsystem, since we don't support switching them currently
while (!(db->get().master.id() == initReq.masterId &&
while (!(initReq.masterLifetime.isEqual(db->get().masterLifetime) &&
db->get().recoveryState >= RecoveryState::RECOVERY_TRANSACTION)) {
//TraceEvent("ResolverInit2", resolver.id()).detail("LSEpoch", db->get().logSystemConfig.epoch);
// TraceEvent("ResolverInit2", resolver.id()).detail("LSEpoch", db->get().logSystemConfig.epoch);
wait(db->onChange());
}
// Initialize txnStateStore
self->logSystem = ILogSystem::fromServerDBInfo(resolver.id(), db->get(), false, addActor);
state PromiseStream<Future<Void>> addActor;
state Future<Void> onError =
transformError(actorCollection(addActor.getFuture()), broken_promise(), resolver_failed());

View File

@ -732,6 +732,7 @@ struct InitializeBlobManagerRequest {
struct InitializeResolverRequest {
constexpr static FileIdentifier file_identifier = 7413317;
LifetimeToken masterLifetime;
uint64_t recoveryCount;
int commitProxyCount;
int resolverCount;
@ -740,7 +741,7 @@ struct InitializeResolverRequest {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, recoveryCount, commitProxyCount, resolverCount, masterId, reply);
serializer(ar, masterLifetime, recoveryCount, commitProxyCount, resolverCount, masterId, reply);
}
};

View File

@ -2132,8 +2132,12 @@ ACTOR Future<Optional<Value>> quickGetValue(StorageServer* data,
if (data->shards[key]->isReadable()) {
try {
// TODO: Would using a lower-level API be better? Or should priorities be tweaked?
GetValueRequest req(
pOriginalReq->spanContext, key, version, pOriginalReq->tags, pOriginalReq->debugID, pOriginalReq->ssLatestCommitVersions);
GetValueRequest req(pOriginalReq->spanContext,
key,
version,
pOriginalReq->tags,
pOriginalReq->debugID,
pOriginalReq->ssLatestCommitVersions);
// Note that it does not use readGuard to avoid server being overloaded here. Throttling is enforced at the
// original request level, rather than on individual underlying lookups. The reason is that throttling any
// individual underlying lookup would fail the original request, which is not productive.
@ -2666,7 +2670,7 @@ ACTOR Future<RangeResult> quickGetKeyValues(StorageServer* data,
req.end = firstGreaterOrEqual(strinc(prefix, req.arena));
req.version = version;
req.tags = pOriginalReq->tags;
req.ssLatestCommitVersions = pOriginalReq->ssLatestCommitVersions;
req.ssLatestCommitVersions = pOriginalReq->ssLatestCommitVersions;
req.debugID = pOriginalReq->debugID;
// Note that it does not use readGuard to avoid server being overloaded here. Throttling is enforced at the