Fix places that are incompatible with mock DD

This commit is contained in:
Xiaoxi Wang 2023-05-03 16:43:52 -07:00
parent bef639ab81
commit ac16dbd0d8
7 changed files with 41 additions and 18 deletions

View File

@ -532,7 +532,7 @@ ACTOR Future<Void> getSourceServersForRange(DDQueue* self,
}
DDQueue::DDQueue(DDQueueInitParams const& params)
: IDDRelocationQueue(), distributorId(params.id), lock(params.lock), cx(params.db->context()),
: IDDRelocationQueue(), distributorId(params.id), lock(params.lock),
txnProcessor(params.db), teamCollections(params.teamCollections),
shardsAffectedByTeamFailure(params.shardsAffectedByTeamFailure),
physicalShardCollection(params.physicalShardCollection), getAverageShardBytes(params.getAverageShardBytes),
@ -1130,7 +1130,7 @@ void DDQueue::enqueueCancelledDataMove(UID dataMoveId, KeyRange range, const DDE
}
DDQueue::DDDataMove dataMove(dataMoveId);
dataMove.cancel = cleanUpDataMove(this->cx,
dataMove.cancel = cleanUpDataMove(txnProcessor->context(),
dataMoveId,
this->lock,
&this->cleanUpDataMoveParallelismLock,
@ -1173,7 +1173,7 @@ ACTOR Future<Void> cancelDataMove(class DDQueue* self, KeyRange range, const DDE
.detail("DataMoveRange", keys)
.detail("Range", range);
if (!it->value().cancel.isValid()) {
it->value().cancel = cleanUpDataMove(self->cx,
it->value().cancel = cleanUpDataMove(self->txnProcessor->context(),
it->value().id,
self->lock,
&self->cleanUpDataMoveParallelismLock,

View File

@ -2999,12 +2999,12 @@ public:
// The following actors (e.g. storageRecruiter) do not need to be assigned to a variable because
// they are always running.
self->addActor.send(self->storageRecruiter(recruitStorage, *ddEnabledState));
self->addActor.send(self->monitorStorageServerRecruitment());
self->addActor.send(self->waitServerListChange(serverRemoved.getFuture(), *ddEnabledState));
self->addActor.send(self->monitorHealthyTeams());
if (!self->db->isMocked()) {
self->addActor.send(self->storageRecruiter(recruitStorage, *ddEnabledState));
self->addActor.send(self->monitorStorageServerRecruitment());
self->addActor.send(self->waitServerListChange(serverRemoved.getFuture(), *ddEnabledState));
self->addActor.send(self->trackExcludedServers());
self->addActor.send(self->waitHealthyZoneChange());
self->addActor.send(self->monitorPerpetualStorageWiggle());
@ -3815,6 +3815,8 @@ Future<Void> DDTeamCollection::readStorageWiggleMap() {
}
// Kick off the storage-metadata update for `server`.
// When the underlying database is mocked, the real update path is not
// supported, so return a future that never fires instead of running it.
Future<Void> DDTeamCollection::updateStorageMetadata(TCServerInfo* server) {
	// NOTE(review): Never() (rather than Void()) presumably keeps callers
	// waiting forever instead of treating the update as done — confirm that
	// is the intended mock behavior.
	if (db->isMocked()) {
		return Never();
	}
	return DDTeamCollectionImpl::updateStorageMetadata(this, server);
}

View File

@ -1066,3 +1066,7 @@ Future<Optional<HealthMetrics::StorageStats>> DDMockTxnProcessor::getStorageStat
}
return Optional<HealthMetrics::StorageStats>(it->second->getStorageStats());
}
// Answer configuration queries from the mock global state: the configuration
// held by `mgs` is returned directly as an already-available future, so no
// transaction against a real cluster is needed.
Future<DatabaseConfiguration> DDMockTxnProcessor::getDatabaseConfiguration() const {
	DatabaseConfiguration const& mockConfig = mgs->configuration;
	return Future<DatabaseConfiguration>(mockConfig);
}

View File

@ -657,8 +657,9 @@ public:
}
// Trigger background cleanup for datamove tombstones
self->addActor.send((self->removeDataMoveTombstoneBackground(self)));
if (!self->txnProcessor->isMocked()) {
self->addActor.send(self->removeDataMoveTombstoneBackground(self));
}
return Void();
}
@ -695,6 +696,10 @@ public:
};
Future<Void> DataDistributor::initDDConfigWatch() {
if (txnProcessor->isMocked()) {
onConfigChange = Never();
return Void();
}
onConfigChange = map(DDConfiguration().trigger.onChange(
SystemDBWriteLockedNow(txnProcessor->context().getReference()), {}, configChangeWatching),
[](Version v) {
@ -988,17 +993,16 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
// &SERVER_KNOBS->DD_LOCATION_CACHE_SIZE, 8) ); ASSERT( cx->locationCacheSize ==
// SERVER_KNOBS->DD_LOCATION_CACHE_SIZE
// );
self->txnProcessor = Reference<IDDTxnProcessor>(new DDTxnProcessor(cx));
// wait(debugCheckCoalescing(self->txnProcessor->context()));
// Make sure that the watcher has established a baseline before init() below so the watcher will
// see any changes that occur after init() has read the config state.
wait(self->initDDConfigWatch());
} else {
ASSERT(self->txnProcessor.isValid() && self->txnProcessor->isMocked());
}
// Make sure that the watcher has established a baseline before init() below so the watcher will
// see any changes that occur after init() has read the config state.
wait(self->initDDConfigWatch());
loop {
self->context->trackerCancelled = false;
// whether all initial shard are tracked
@ -1010,6 +1014,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
state KeyRangeMap<ShardTrackedData> shards;
state Promise<UID> removeFailedServer;
try {
wait(DataDistributor::init(self));
// When/If this assertion fails, Evan owes Ben a pat on the back for his foresight
@ -1023,6 +1028,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
state Reference<AsyncVar<bool>> processingWiggle(new AsyncVar<bool>(false));
if (SERVER_KNOBS->DD_TENANT_AWARENESS_ENABLED || SERVER_KNOBS->STORAGE_QUOTA_ENABLED) {
ASSERT(!isMocked);
wait(self->initTenantCache());
}
@ -1137,8 +1143,11 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
getUnhealthyRelocationCount,
getAverageShardBytes });
teamCollectionsPtrs.push_back(self->context->primaryTeamCollection.getPtr());
auto recruitStorage = IAsyncListener<RequestStream<RecruitStorageRequest>>::create(
self->dbInfo, [](auto const& info) { return info.clusterInterface.recruitStorage; });
Reference<IAsyncListener<RequestStream<RecruitStorageRequest>>> recruitStorage;
if (!isMocked) {
recruitStorage = IAsyncListener<RequestStream<RecruitStorageRequest>>::create(
self->dbInfo, [](auto const& info) { return info.clusterInterface.recruitStorage; });
}
if (self->configuration.usableRegions > 1) {
self->context->remoteTeamCollection = makeReference<DDTeamCollection>(
DDTeamCollectionInitParams{ self->txnProcessor,

View File

@ -236,7 +236,7 @@ public:
ActorCollectionNoErrors noErrorActors; // has to be the last one to be destroyed because other Actors may use it.
UID distributorId;
MoveKeysLock lock;
Database cx;
// Should always use txnProcessor to access Database object
Reference<IDDTxnProcessor> txnProcessor;
std::vector<TeamCollectionInterface> teamCollections;

View File

@ -79,7 +79,7 @@ public:
[[nodiscard]] virtual Future<MoveKeysLock> takeMoveKeysLock(const UID& ddId) const { return MoveKeysLock(); }
virtual Future<DatabaseConfiguration> getDatabaseConfiguration() const { return DatabaseConfiguration(); }
virtual Future<DatabaseConfiguration> getDatabaseConfiguration() const = 0;
virtual Future<Void> updateReplicaKeys(const std::vector<Optional<Key>>& primaryIds,
const std::vector<Optional<Key>>& remoteIds,
@ -133,7 +133,7 @@ public:
virtual Future<HealthMetrics> getHealthMetrics(bool detailed = false) const = 0;
virtual Future<Optional<Value>> readRebalanceDDIgnoreKey() const { return {}; }
virtual Future<Optional<Value>> readRebalanceDDIgnoreKey() const { return Optional<Value>(); }
virtual Future<Void> waitDDTeamInfoPrintSignal() const { return Never(); }
@ -301,6 +301,8 @@ public:
Future<Optional<HealthMetrics::StorageStats>> getStorageStats(const UID& id, double maxStaleness) const override;
Future<DatabaseConfiguration> getDatabaseConfiguration() const override;
protected:
Future<Void> rawStartMovement(const MoveKeysParams& params, std::map<UID, StorageServerInterface>& tssMapping);

View File

@ -1,3 +1,9 @@
[[knobs]]
enable_dd_physical_shard = false
shard_encode_location_metadata = false
dd_tenant_awareness_enabled = false
storage_quota_enabled = false
[[test]]
testTitle = 'MockDDReadWriteTest'
useDB = false