fix: removed storage servers must be popped on remote logs from the proxy
This commit is contained in:
parent
471e7b9ab9
commit
fa9089c2e8
|
@ -46,8 +46,8 @@ struct applyMutationsData {
|
||||||
// the same operations will be done on all proxies at the same time. Otherwise, the data stored in
|
// the same operations will be done on all proxies at the same time. Otherwise, the data stored in
|
||||||
// txnStateStore will become corrupted.
|
// txnStateStore will become corrupted.
|
||||||
static void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRef<MutationRef> const& mutations, IKeyValueStore* txnStateStore, LogPushData* toCommit, bool *confChange, Reference<ILogSystem> logSystem = Reference<ILogSystem>(), Version popVersion = 0,
|
static void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRef<MutationRef> const& mutations, IKeyValueStore* txnStateStore, LogPushData* toCommit, bool *confChange, Reference<ILogSystem> logSystem = Reference<ILogSystem>(), Version popVersion = 0,
|
||||||
KeyRangeMap<std::set<Key> >* vecBackupKeys = NULL, KeyRangeMap<ServerCacheInfo>* keyInfo = NULL, std::map<Key, applyMutationsData>* uid_applyMutationsData = NULL,
|
KeyRangeMap<std::set<Key> >* vecBackupKeys = NULL, KeyRangeMap<ServerCacheInfo>* keyInfo = NULL, std::map<Key, applyMutationsData>* uid_applyMutationsData = NULL, RequestStream<CommitTransactionRequest> commit = RequestStream<CommitTransactionRequest>(),
|
||||||
RequestStream<CommitTransactionRequest> commit = RequestStream<CommitTransactionRequest>(), Database cx = Database(), NotifiedVersion* commitVersion = NULL, std::map<UID, Reference<StorageInfo>>* storageCache = NULL, bool initialCommit = false ) {
|
Database cx = Database(), NotifiedVersion* commitVersion = NULL, std::map<UID, Reference<StorageInfo>>* storageCache = NULL, std::map<Tag, Version>* tag_popped = NULL, bool initialCommit = false ) {
|
||||||
for (auto const& m : mutations) {
|
for (auto const& m : mutations) {
|
||||||
//TraceEvent("MetadataMutation", dbgid).detail("M", m.toString());
|
//TraceEvent("MetadataMutation", dbgid).detail("M", m.toString());
|
||||||
|
|
||||||
|
@ -291,8 +291,10 @@ static void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRef<Mut
|
||||||
if (logSystem && popVersion) {
|
if (logSystem && popVersion) {
|
||||||
auto serverKeysCleared = txnStateStore->readRange( range & serverTagKeys ).get(); // read is expected to be immediately available
|
auto serverKeysCleared = txnStateStore->readRange( range & serverTagKeys ).get(); // read is expected to be immediately available
|
||||||
for(auto &kv : serverKeysCleared) {
|
for(auto &kv : serverKeysCleared) {
|
||||||
TraceEvent("ServerTagRemove").detail("popVersion", popVersion).detail("tag", decodeServerTagValue(kv.value).toString()).detail("server", decodeServerTagKey(kv.key));
|
Tag tag = decodeServerTagValue(kv.value);
|
||||||
|
TraceEvent("ServerTagRemove").detail("popVersion", popVersion).detail("tag", tag.toString()).detail("server", decodeServerTagKey(kv.key));
|
||||||
logSystem->pop( popVersion, decodeServerTagValue(kv.value) );
|
logSystem->pop( popVersion, decodeServerTagValue(kv.value) );
|
||||||
|
(*tag_popped)[tag] = popVersion;
|
||||||
|
|
||||||
if(toCommit) {
|
if(toCommit) {
|
||||||
MutationRef privatized = m;
|
MutationRef privatized = m;
|
||||||
|
@ -317,8 +319,10 @@ static void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRef<Mut
|
||||||
if (logSystem && popVersion) {
|
if (logSystem && popVersion) {
|
||||||
auto serverKeysCleared = txnStateStore->readRange( range & serverTagHistoryKeys ).get(); // read is expected to be immediately available
|
auto serverKeysCleared = txnStateStore->readRange( range & serverTagHistoryKeys ).get(); // read is expected to be immediately available
|
||||||
for(auto &kv : serverKeysCleared) {
|
for(auto &kv : serverKeysCleared) {
|
||||||
TraceEvent("ServerTagHistoryRemove").detail("popVersion", popVersion).detail("tag", decodeServerTagValue(kv.value).toString()).detail("version", decodeServerTagHistoryKey(kv.key));
|
Tag tag = decodeServerTagValue(kv.value);
|
||||||
logSystem->pop( popVersion, decodeServerTagValue(kv.value) );
|
TraceEvent("ServerTagHistoryRemove").detail("popVersion", popVersion).detail("tag", tag.toString()).detail("version", decodeServerTagHistoryKey(kv.key));
|
||||||
|
logSystem->pop( popVersion, tag );
|
||||||
|
(*tag_popped)[tag] = popVersion;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if(!initialCommit) txnStateStore->clear( range & serverTagHistoryKeys );
|
if(!initialCommit) txnStateStore->clear( range & serverTagHistoryKeys );
|
||||||
|
|
|
@ -199,6 +199,7 @@ struct ProxyCommitData {
|
||||||
EventMetricHandle<SingleKeyMutation> singleKeyMutationEvent;
|
EventMetricHandle<SingleKeyMutation> singleKeyMutationEvent;
|
||||||
|
|
||||||
std::map<UID, Reference<StorageInfo>> storageCache;
|
std::map<UID, Reference<StorageInfo>> storageCache;
|
||||||
|
std::map<Tag, Version> tag_popped;
|
||||||
|
|
||||||
//The tag related to a storage server rarely change, so we keep a vector of tags for each key range to be slightly more CPU efficient.
|
//The tag related to a storage server rarely change, so we keep a vector of tags for each key range to be slightly more CPU efficient.
|
||||||
//When a tag related to a storage server does change, we empty out all of these vectors to signify they must be repopulated.
|
//When a tag related to a storage server does change, we empty out all of these vectors to signify they must be repopulated.
|
||||||
|
@ -446,7 +447,7 @@ ACTOR Future<Void> commitBatch(
|
||||||
for (int resolver = 0; resolver < resolution.size(); resolver++)
|
for (int resolver = 0; resolver < resolution.size(); resolver++)
|
||||||
committed = committed && resolution[resolver].stateMutations[versionIndex][transactionIndex].committed;
|
committed = committed && resolution[resolver].stateMutations[versionIndex][transactionIndex].committed;
|
||||||
if (committed)
|
if (committed)
|
||||||
applyMetadataMutations( self->dbgid, arena, resolution[0].stateMutations[versionIndex][transactionIndex].mutations, self->txnStateStore, NULL, &forceRecovery, self->logSystem, 0, &self->vecBackupKeys, &self->keyInfo, self->firstProxy ? &self->uid_applyMutationsData : NULL, self->commit, self->cx, &self->committedVersion, &self->storageCache );
|
applyMetadataMutations( self->dbgid, arena, resolution[0].stateMutations[versionIndex][transactionIndex].mutations, self->txnStateStore, NULL, &forceRecovery, self->logSystem, 0, &self->vecBackupKeys, &self->keyInfo, self->firstProxy ? &self->uid_applyMutationsData : NULL, self->commit, self->cx, &self->committedVersion, &self->storageCache, &self->tag_popped);
|
||||||
|
|
||||||
if( resolution[0].stateMutations[versionIndex][transactionIndex].mutations.size() && firstStateMutations ) {
|
if( resolution[0].stateMutations[versionIndex][transactionIndex].mutations.size() && firstStateMutations ) {
|
||||||
ASSERT(committed);
|
ASSERT(committed);
|
||||||
|
@ -508,7 +509,7 @@ ACTOR Future<Void> commitBatch(
|
||||||
{
|
{
|
||||||
if (committed[t] == ConflictBatch::TransactionCommitted && (!locked || trs[t].isLockAware())) {
|
if (committed[t] == ConflictBatch::TransactionCommitted && (!locked || trs[t].isLockAware())) {
|
||||||
commitCount++;
|
commitCount++;
|
||||||
applyMetadataMutations(self->dbgid, arena, trs[t].transaction.mutations, self->txnStateStore, &toCommit, &forceRecovery, self->logSystem, commitVersion+1, &self->vecBackupKeys, &self->keyInfo, self->firstProxy ? &self->uid_applyMutationsData : NULL, self->commit, self->cx, &self->committedVersion, &self->storageCache);
|
applyMetadataMutations(self->dbgid, arena, trs[t].transaction.mutations, self->txnStateStore, &toCommit, &forceRecovery, self->logSystem, commitVersion+1, &self->vecBackupKeys, &self->keyInfo, self->firstProxy ? &self->uid_applyMutationsData : NULL, self->commit, self->cx, &self->committedVersion, &self->storageCache, &self->tag_popped);
|
||||||
}
|
}
|
||||||
if(firstStateMutations) {
|
if(firstStateMutations) {
|
||||||
ASSERT(committed[t] == ConflictBatch::TransactionCommitted);
|
ASSERT(committed[t] == ConflictBatch::TransactionCommitted);
|
||||||
|
@ -1208,6 +1209,9 @@ ACTOR Future<Void> masterProxyServerCore(
|
||||||
dbInfoChange = db->onChange();
|
dbInfoChange = db->onChange();
|
||||||
if(db->get().master.id() == master.id() && db->get().recoveryState >= RecoveryState::RECOVERY_TRANSACTION) {
|
if(db->get().master.id() == master.id() && db->get().recoveryState >= RecoveryState::RECOVERY_TRANSACTION) {
|
||||||
commitData.logSystem = ILogSystem::fromServerDBInfo(proxy.id(), db->get());
|
commitData.logSystem = ILogSystem::fromServerDBInfo(proxy.id(), db->get());
|
||||||
|
for(auto it : commitData.tag_popped) {
|
||||||
|
commitData.logSystem->pop(it.second, it.first);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
when(Void _ = wait(onError)) {}
|
when(Void _ = wait(onError)) {}
|
||||||
|
@ -1305,7 +1309,7 @@ ACTOR Future<Void> masterProxyServerCore(
|
||||||
|
|
||||||
Arena arena;
|
Arena arena;
|
||||||
bool confChanges;
|
bool confChanges;
|
||||||
applyMetadataMutations(commitData.dbgid, arena, mutations, commitData.txnStateStore, NULL, &confChanges, Reference<ILogSystem>(), 0, &commitData.vecBackupKeys, &commitData.keyInfo, commitData.firstProxy ? &commitData.uid_applyMutationsData : NULL, commitData.commit, commitData.cx, &commitData.committedVersion, &commitData.storageCache, true);
|
applyMetadataMutations(commitData.dbgid, arena, mutations, commitData.txnStateStore, NULL, &confChanges, Reference<ILogSystem>(), 0, &commitData.vecBackupKeys, &commitData.keyInfo, commitData.firstProxy ? &commitData.uid_applyMutationsData : NULL, commitData.commit, commitData.cx, &commitData.committedVersion, &commitData.storageCache, &commitData.tag_popped, true );
|
||||||
}
|
}
|
||||||
|
|
||||||
auto lockedKey = commitData.txnStateStore->readValue(databaseLockedKey).get();
|
auto lockedKey = commitData.txnStateStore->readValue(databaseLockedKey).get();
|
||||||
|
|
Loading…
Reference in New Issue