split DD related headers

This commit is contained in:
Xiaoxi Wang 2022-08-16 14:32:55 -07:00
parent 25f1d3f4d8
commit e2e5abbc9d
8 changed files with 166 additions and 110 deletions

View File

@ -49,7 +49,7 @@
#include "flow/serialize.h"
#include "flow/Trace.h"
#include "flow/UnitTest.h"
#include "fdbserver/DDSharedContext.h"
#include "flow/actorcompiler.h" // This must be the last #include.
void DataMove::validateShard(const DDShardInfo& shard, KeyRangeRef range, int priority) {

View File

@ -29,6 +29,7 @@
#include "fdbrpc/sim_validation.h"
#include "fdbclient/SystemData.h"
#include "fdbserver/DataDistribution.actor.h"
#include "fdbserver/DDSharedContext.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbserver/MoveKeys.actor.h"
#include "fdbserver/Knobs.h"
@ -516,7 +517,7 @@ ACTOR Future<Void> dataDistributionRelocator(struct DDQueue* self,
Future<Void> prevCleanup,
const DDEnabledState* ddEnabledState);
struct DDQueue {
struct DDQueue : public IDDRelocationQueue {
struct DDDataMove {
DDDataMove() = default;
explicit DDDataMove(UID id) : id(id) {}
@ -1312,6 +1313,8 @@ struct DDQueue {
};
return recurring(f, SERVER_KNOBS->DD_QUEUE_COUNTER_REFRESH_INTERVAL);
}
int getUnhealthyRelocationCount() override { return unhealthyRelocations; }
};
ACTOR Future<Void> cancelDataMove(struct DDQueue* self, KeyRange range, const DDEnabledState* ddEnabledState) {
@ -2489,7 +2492,9 @@ ACTOR Future<Void> dataDistributionQueue(Database cx,
}
when(wait(self.error.getFuture())) {} // Propagate errors from dataDistributionRelocator
when(wait(waitForAll(ddQueueFutures))) {}
when(Promise<int> r = waitNext(getUnhealthyRelocationCount)) { r.send(self.unhealthyRelocations); }
when(Promise<int> r = waitNext(getUnhealthyRelocationCount)) {
r.send(self.getUnhealthyRelocationCount());
}
}
}
} catch (Error& e) {
@ -2502,6 +2507,8 @@ ACTOR Future<Void> dataDistributionQueue(Database cx,
}
}
ACTOR Future<Void> dataDistributionQueue(Reference<DDSharedContext> context, Database cx);
TEST_CASE("/DataDistribution/DDQueue/ServerCounterTrace") {
state double duration = 2.5 * SERVER_KNOBS->DD_QUEUE_COUNTER_REFRESH_INTERVAL;
state DDQueue self;

View File

@ -21,6 +21,7 @@
#include "fdbrpc/FailureMonitor.h"
#include "fdbclient/SystemData.h"
#include "fdbserver/DataDistribution.actor.h"
#include "fdbserver/DDSharedContext.h"
#include "fdbserver/Knobs.h"
#include "fdbclient/DatabaseContext.h"
#include "flow/ActorCollection.h"
@ -68,7 +69,7 @@ ACTOR Future<Void> updateMaxShardSize(Reference<AsyncVar<int64_t>> dbSizeEstimat
}
}
struct DataDistributionTracker {
struct DataDistributionTracker : public IDDShardTracker {
Database cx;
UID distributorId;
KeyRangeMap<ShardTrackedData>* shards;
@ -130,11 +131,13 @@ struct DataDistributionTracker {
shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), readyToStart(readyToStart),
anyZeroHealthyTeams(anyZeroHealthyTeams), trackerCancelled(trackerCancelled) {}
~DataDistributionTracker() {
~DataDistributionTracker() override {
*trackerCancelled = true;
// Cancel all actors so they aren't waiting on sizeChanged broken promise
sizeChanges.clear(false);
}
double getAverageShardBytes() override { return maxShardSize->get().get() / 2.0; }
};
void restartShardTrackers(DataDistributionTracker* self,
@ -1025,9 +1028,7 @@ ACTOR Future<Void> dataDistributionTracker(Reference<InitialDataDistribution> in
initData = Reference<InitialDataDistribution>();
loop choose {
when(Promise<int64_t> req = waitNext(getAverageShardBytes)) {
req.send(self.maxShardSize->get().get() / 2);
}
when(Promise<int64_t> req = waitNext(getAverageShardBytes)) { req.send(self.getAverageShardBytes()); }
when(wait(loggingTrigger)) {
TraceEvent("DDTrackerStats", self.distributorId)
.detail("Shards", self.shards->size())
@ -1056,3 +1057,9 @@ ACTOR Future<Void> dataDistributionTracker(Reference<InitialDataDistribution> in
throw e;
}
}
// Not used yet
ACTOR Future<Void> dataDistributionTracker(Reference<DDSharedContext> context,
Reference<InitialDataDistribution> initData,
Database cx,
KeyRangeMap<ShardTrackedData>* shards);

View File

@ -0,0 +1,34 @@
/*
* DDRelocationQueue.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FOUNDATIONDB_DDRELOCATIONQUEUE_H
#define FOUNDATIONDB_DDRELOCATIONQUEUE_H
#include "DataDistribution.actor.h"
// send request/signal to DDRelocationQueue through interface
// call synchronous method from components outside DDRelocationQueue
struct IDDRelocationQueue {
PromiseStream<RelocateShard> relocationProducer, relocationConsumer; // FIXME(xwang): not used yet
// PromiseStream<Promise<int>> getUnhealthyRelocationCount; // FIXME(xwang): change it to a synchronous call
virtual int getUnhealthyRelocationCount() = 0;
virtual ~IDDRelocationQueue() = 0;
};
#endif // FOUNDATIONDB_DDRELOCATIONQUEUE_H

View File

@ -0,0 +1,40 @@
/*
* DDShardTracker.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FOUNDATIONDB_DDSHARDTRACKER_H
#define FOUNDATIONDB_DDSHARDTRACKER_H
#include "DataDistribution.actor.h"
// send request/signal to DDTracker through interface
// call synchronous method from components outside DDShardTracker
struct IDDShardTracker {
// FIXME: the streams are not used yet
Promise<Void> readyToStart;
PromiseStream<GetMetricsRequest> getShardMetrics;
PromiseStream<GetTopKMetricsRequest> getTopKMetrics;
PromiseStream<GetMetricsListRequest> getShardMetricsList;
PromiseStream<KeyRange> restartShardTracker;
// PromiseStream<Promise<int64_t>> averageShardBytes; // FIXME(xwang): change it to a synchronous call
virtual double getAverageShardBytes() = 0;
virtual ~IDDShardTracker() = 0;
};
#endif // FOUNDATIONDB_DDSHARDTRACKER_H

View File

@ -0,0 +1,68 @@
/*
* DDSharedContext.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FOUNDATIONDB_DDSHAREDCONTEXT_H
#define FOUNDATIONDB_DDSHAREDCONTEXT_H
#include "fdbserver/MoveKeys.actor.h"
#include "fdbserver/ShardsAffectedByTeamFailure.h"
#include "fdbserver/DDShardTracker.h"
#include "fdbserver/DDRelocationQueue.h"
// The common info shared by all DD components. Normally the DD components should share the reference to the same
// context.
class DDSharedContext : public ReferenceCounted<DDSharedContext> {
// FIXME(xwang) mark fields privates
public:
std::unique_ptr<DDEnabledState>
ddEnabledState; // Note: don't operate directly because it's shared with snapshot server
IDDShardTracker* shardTracker = nullptr;
IDDRelocationQueue* relocationQueue = nullptr;
std::vector<IDDTeamCollection*> teamCollections;
// public:
UID ddId;
MoveKeysLock lock;
bool trackerCancelled = false;
DatabaseConfiguration configuration;
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure;
Reference<AsyncVar<bool>> processingUnhealthy, processingWiggle;
DDSharedContext() = default;
DDSharedContext(UID id)
: ddEnabledState(new DDEnabledState), ddId(id), shardsAffectedByTeamFailure(new ShardsAffectedByTeamFailure),
processingUnhealthy(new AsyncVar<bool>(false)), processingWiggle(new AsyncVar<bool>(false)) {}
UID id() const { return ddId; }
void markTrackerCancelled() { trackerCancelled = true; }
bool isTrackerCancelled() const { return trackerCancelled; }
decltype(auto) usableRegions() { return configuration.usableRegions; }
bool isDDEnabled() const { return ddEnabledState->isDDEnabled(); };
void proposeRelocation(const RelocateShard& rs) const { return relocationQueue->relocationProducer.send(rs); }
void requestRestartShardTracker(KeyRange keys) const { return shardTracker->restartShardTracker.send(keys); }
};
#endif // FOUNDATIONDB_DDSHAREDCONTEXT_H

View File

@ -176,9 +176,7 @@ FDB_DECLARE_BOOLEAN_PARAM(WaitWiggle);
// send request/signal to DDTeamCollection through interface
// call synchronous method from components outside DDTeamCollection
struct IDDTeamCollection {
struct Interface {
PromiseStream<GetTeamRequest> getTeam;
};
PromiseStream<GetTeamRequest> getTeam;
virtual ~IDDTeamCollection() {}
};

View File

@ -405,105 +405,6 @@ int64_t getMaxShardSize(double dbSizeEstimate);
#pragma endregion
#endif
/////////////////////////////// DD Components //////////////////////////////////////
#ifndef __INTEL_COMPILER
#pragma region DD Components
#endif
class DDTeamCollection;
// send request/signal to DDTracker through interface
// call synchronous method from components outside DDTracker
struct IDDShardTracker {
struct Interface {
Promise<Void> readyToStart;
PromiseStream<GetMetricsRequest> getShardMetrics;
PromiseStream<GetTopKMetricsRequest> getTopKMetrics;
PromiseStream<GetMetricsListRequest> getShardMetricsList;
PromiseStream<KeyRange> restartShardTracker;
PromiseStream<Promise<int64_t>> getAverageShardBytes; // FIXME(xwang): change it to a synchronous call
};
virtual double getAverageShardBytes() = 0;
virtual ~IDDShardTracker() = 0;
};
// send request/signal to DDQueue through interface
// call synchronous method from components outside DDQueue
struct IDDQueue {
struct Interface {
PromiseStream<RelocateShard> relocationProducer, relocationConsumer;
PromiseStream<Promise<int>> getUnhealthyRelocationCount; // FIXME(xwang): change it to a synchronous call
};
virtual int getUnhealthyRelocationCount() = 0;
virtual ~IDDQueue() = 0;
};
// The common info shared by all DD components. Normally the DD components should share the reference to the same
// context.
struct DDContext : public ReferenceCounted<DDContext> {
// FIXME(xwang) mark fields privates
// private:
std::shared_ptr<DDEnabledState>
ddEnabledState; // Note: don't operate directly because it's shared with snapshot server
IDDShardTracker::Interface trackerInterface;
IDDQueue::Interface queueInterface;
// public:
UID ddId;
MoveKeysLock lock;
bool trackerCancelled = false;
DatabaseConfiguration configuration;
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure;
Reference<AsyncVar<bool>> processingUnhealthy, processingWiggle;
DDContext() = default;
DDContext(UID id, std::shared_ptr<DDEnabledState> ddEnabledState)
: ddEnabledState(std::move(ddEnabledState)), ddId(id),
shardsAffectedByTeamFailure(new ShardsAffectedByTeamFailure), processingUnhealthy(new AsyncVar<bool>(false)),
processingWiggle(new AsyncVar<bool>(false)) {}
void proposeRelocation(const RelocateShard& rs) const { return queueInterface.relocationProducer.send(rs); }
void requestRestartShardTracker(KeyRange keys) const { return trackerInterface.restartShardTracker.send(keys); }
};
// provide common behavior to manage shared state. Beware of expose too much details
class DDComponent {
protected:
Reference<DDContext> context;
public:
DDComponent() : context(makeReference<DDContext>()) {}
explicit DDComponent(Reference<DDContext> context) : context(context) {}
UID id() const { return context->ddId; }
void markTrackerCancelled() { context->trackerCancelled = true; }
bool isTrackerCancelled() const { return context->trackerCancelled; }
decltype(auto) usableRegions() { return context->configuration.usableRegions; }
bool isDDEnabled() const { return context->ddEnabledState->isDDEnabled(); };
std::shared_ptr<DDEnabledState> getDDEnableState() { return context->ddEnabledState; }
Future<Standalone<VectorRef<DDMetricsRef>>> getDDMetricsList(const GetMetricsListRequest& req);
Reference<DDContext> rawContext() { return context; }
};
ACTOR Future<Void> dataDistributionTracker(Reference<DDContext> context,
Reference<InitialDataDistribution> initData,
Database cx,
KeyRangeMap<ShardTrackedData>* shards);
ACTOR Future<Void> dataDistributionQueue(Reference<DDContext> context, Database cx);
#ifndef __INTEL_COMPILER
#pragma endregion
#endif
// FIXME(xwang): Delete Old DD Actors once the refactoring is done
/////////////////////////////// Old DD Actors //////////////////////////////////////
#ifndef __INTEL_COMPILER
@ -552,6 +453,7 @@ ACTOR Future<Void> dataDistributionQueue(Database cx,
#ifndef __INTEL_COMPILER
#pragma region Perpetual Storage Wiggle
#endif
class DDTeamCollection;
struct StorageWiggleMetrics {
constexpr static FileIdentifier file_identifier = 4728961;