Prevent grvProxyServer from modifying ServerDBInfo object
This commit is contained in:
parent
1a20cf9579
commit
b2bbdf0d7f
|
@ -222,7 +222,7 @@ struct GrvProxyData {
|
||||||
Reference<ILogSystem> logSystem;
|
Reference<ILogSystem> logSystem;
|
||||||
|
|
||||||
Database cx;
|
Database cx;
|
||||||
Reference<AsyncVar<ServerDBInfo>> db;
|
Reference<AsyncVar<ServerDBInfo> const> db;
|
||||||
|
|
||||||
Optional<LatencyBandConfig> latencyBandConfig;
|
Optional<LatencyBandConfig> latencyBandConfig;
|
||||||
double lastStartCommit;
|
double lastStartCommit;
|
||||||
|
@ -251,7 +251,7 @@ struct GrvProxyData {
|
||||||
GrvProxyData(UID dbgid,
|
GrvProxyData(UID dbgid,
|
||||||
MasterInterface master,
|
MasterInterface master,
|
||||||
RequestStream<GetReadVersionRequest> getConsistentReadVersion,
|
RequestStream<GetReadVersionRequest> getConsistentReadVersion,
|
||||||
Reference<AsyncVar<ServerDBInfo>> db)
|
Reference<AsyncVar<ServerDBInfo> const> db)
|
||||||
: dbgid(dbgid), stats(dbgid), master(master), getConsistentReadVersion(getConsistentReadVersion),
|
: dbgid(dbgid), stats(dbgid), master(master), getConsistentReadVersion(getConsistentReadVersion),
|
||||||
cx(openDBOnServer(db, TaskPriority::DefaultEndpoint, LockAware::TRUE)), db(db), lastStartCommit(0),
|
cx(openDBOnServer(db, TaskPriority::DefaultEndpoint, LockAware::TRUE)), db(db), lastStartCommit(0),
|
||||||
lastCommitLatency(SERVER_KNOBS->REQUIRED_MIN_RECOVERY_DURATION), updateCommitRequests(0), lastCommitTime(0),
|
lastCommitLatency(SERVER_KNOBS->REQUIRED_MIN_RECOVERY_DURATION), updateCommitRequests(0), lastCommitTime(0),
|
||||||
|
@ -275,7 +275,7 @@ ACTOR Future<Void> healthMetricsRequestServer(GrvProxyInterface grvProxy,
|
||||||
|
|
||||||
// Get transaction rate info from RateKeeper.
|
// Get transaction rate info from RateKeeper.
|
||||||
ACTOR Future<Void> getRate(UID myID,
|
ACTOR Future<Void> getRate(UID myID,
|
||||||
Reference<AsyncVar<ServerDBInfo>> db,
|
Reference<AsyncVar<ServerDBInfo> const> db,
|
||||||
int64_t* inTransactionCount,
|
int64_t* inTransactionCount,
|
||||||
int64_t* inBatchTransactionCount,
|
int64_t* inBatchTransactionCount,
|
||||||
GrvTransactionRateInfo* transactionRateInfo,
|
GrvTransactionRateInfo* transactionRateInfo,
|
||||||
|
@ -375,7 +375,7 @@ void dropRequestFromQueue(Deque<GetReadVersionRequest>* queue, GrvProxyStats* st
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put a GetReadVersion request into the queue corresponding to its priority.
|
// Put a GetReadVersion request into the queue corresponding to its priority.
|
||||||
ACTOR Future<Void> queueGetReadVersionRequests(Reference<AsyncVar<ServerDBInfo>> db,
|
ACTOR Future<Void> queueGetReadVersionRequests(Reference<AsyncVar<ServerDBInfo> const> db,
|
||||||
SpannedDeque<GetReadVersionRequest>* systemQueue,
|
SpannedDeque<GetReadVersionRequest>* systemQueue,
|
||||||
SpannedDeque<GetReadVersionRequest>* defaultQueue,
|
SpannedDeque<GetReadVersionRequest>* defaultQueue,
|
||||||
SpannedDeque<GetReadVersionRequest>* batchQueue,
|
SpannedDeque<GetReadVersionRequest>* batchQueue,
|
||||||
|
@ -634,7 +634,7 @@ ACTOR Future<Void> sendGrvReplies(Future<GetReadVersionReply> replyFuture,
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
|
|
||||||
ACTOR Future<Void> monitorDDMetricsChanges(int64_t* midShardSize, Reference<AsyncVar<ServerDBInfo>> db) {
|
ACTOR Future<Void> monitorDDMetricsChanges(int64_t* midShardSize, Reference<AsyncVar<ServerDBInfo> const> db) {
|
||||||
state Future<Void> nextRequestTimer = Never();
|
state Future<Void> nextRequestTimer = Never();
|
||||||
state Future<GetDataDistributorMetricsReply> nextReply = Never();
|
state Future<GetDataDistributorMetricsReply> nextReply = Never();
|
||||||
|
|
||||||
|
@ -680,7 +680,7 @@ ACTOR Future<Void> monitorDDMetricsChanges(int64_t* midShardSize, Reference<Asyn
|
||||||
}
|
}
|
||||||
|
|
||||||
ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy,
|
ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy,
|
||||||
Reference<AsyncVar<ServerDBInfo>> db,
|
Reference<AsyncVar<ServerDBInfo> const> db,
|
||||||
PromiseStream<Future<Void>> addActor,
|
PromiseStream<Future<Void>> addActor,
|
||||||
GrvProxyData* grvProxyData,
|
GrvProxyData* grvProxyData,
|
||||||
GetHealthMetricsReply* healthMetricsReply,
|
GetHealthMetricsReply* healthMetricsReply,
|
||||||
|
@ -898,7 +898,7 @@ ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy,
|
||||||
|
|
||||||
ACTOR Future<Void> grvProxyServerCore(GrvProxyInterface proxy,
|
ACTOR Future<Void> grvProxyServerCore(GrvProxyInterface proxy,
|
||||||
MasterInterface master,
|
MasterInterface master,
|
||||||
Reference<AsyncVar<ServerDBInfo>> db) {
|
Reference<AsyncVar<ServerDBInfo> const> db) {
|
||||||
state GrvProxyData grvProxyData(proxy.id(), master, proxy.getConsistentReadVersion, db);
|
state GrvProxyData grvProxyData(proxy.id(), master, proxy.getConsistentReadVersion, db);
|
||||||
|
|
||||||
state PromiseStream<Future<Void>> addActor;
|
state PromiseStream<Future<Void>> addActor;
|
||||||
|
@ -945,7 +945,7 @@ ACTOR Future<Void> grvProxyServerCore(GrvProxyInterface proxy,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo>> db,
|
ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo> const> db,
|
||||||
uint64_t recoveryCount,
|
uint64_t recoveryCount,
|
||||||
GrvProxyInterface myInterface) {
|
GrvProxyInterface myInterface) {
|
||||||
loop {
|
loop {
|
||||||
|
@ -959,7 +959,7 @@ ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo>> db,
|
||||||
|
|
||||||
ACTOR Future<Void> grvProxyServer(GrvProxyInterface proxy,
|
ACTOR Future<Void> grvProxyServer(GrvProxyInterface proxy,
|
||||||
InitializeGrvProxyRequest req,
|
InitializeGrvProxyRequest req,
|
||||||
Reference<AsyncVar<ServerDBInfo>> db) {
|
Reference<AsyncVar<ServerDBInfo> const> db) {
|
||||||
try {
|
try {
|
||||||
state Future<Void> core = grvProxyServerCore(proxy, req.master, db);
|
state Future<Void> core = grvProxyServerCore(proxy, req.master, db);
|
||||||
wait(core || checkRemoved(db, req.recoveryCount, proxy));
|
wait(core || checkRemoved(db, req.recoveryCount, proxy));
|
||||||
|
|
|
@ -890,7 +890,7 @@ ACTOR Future<Void> commitProxyServer(CommitProxyInterface proxy,
|
||||||
std::string whitelistBinPaths);
|
std::string whitelistBinPaths);
|
||||||
ACTOR Future<Void> grvProxyServer(GrvProxyInterface proxy,
|
ACTOR Future<Void> grvProxyServer(GrvProxyInterface proxy,
|
||||||
InitializeGrvProxyRequest req,
|
InitializeGrvProxyRequest req,
|
||||||
Reference<AsyncVar<ServerDBInfo>> db);
|
Reference<AsyncVar<ServerDBInfo> const> db);
|
||||||
ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
|
ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
|
||||||
IDiskQueue* persistentQueue,
|
IDiskQueue* persistentQueue,
|
||||||
Reference<AsyncVar<ServerDBInfo>> db,
|
Reference<AsyncVar<ServerDBInfo>> db,
|
||||||
|
|
Loading…
Reference in New Issue