Add IConfigTransaction::getRange (not yet tested)
This commit is contained in:
parent
1a6dcd6677
commit
d6fa06afdd
|
@ -92,10 +92,40 @@ struct ConfigTransactionCommitRequest {
|
|||
}
|
||||
};
|
||||
|
||||
struct ConfigTransactionGetRangeReply {
|
||||
static constexpr FileIdentifier file_identifier = 430263;
|
||||
Standalone<RangeResultRef> range;
|
||||
|
||||
ConfigTransactionGetRangeReply() = default;
|
||||
explicit ConfigTransactionGetRangeReply(Standalone<RangeResultRef> range) : range(range) {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, range);
|
||||
}
|
||||
};
|
||||
|
||||
struct ConfigTransactionGetRangeRequest {
|
||||
static constexpr FileIdentifier file_identifier = 987410;
|
||||
Version version;
|
||||
Standalone<KeyRangeRef> keys;
|
||||
ReplyPromise<ConfigTransactionGetRangeReply> reply;
|
||||
|
||||
ConfigTransactionGetRangeRequest() = default;
|
||||
explicit ConfigTransactionGetRangeRequest(Version version, Standalone<KeyRangeRef> keys)
|
||||
: version(version), keys(keys) {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, version, keys, reply);
|
||||
}
|
||||
};
|
||||
|
||||
struct ConfigTransactionInterface {
|
||||
static constexpr FileIdentifier file_identifier = 982485;
|
||||
struct RequestStream<ConfigTransactionGetVersionRequest> getVersion;
|
||||
struct RequestStream<ConfigTransactionGetRequest> get;
|
||||
struct RequestStream<ConfigTransactionGetRangeRequest> getRange;
|
||||
struct RequestStream<ConfigTransactionCommitRequest> commit;
|
||||
|
||||
ConfigTransactionInterface() = default;
|
||||
|
@ -103,15 +133,17 @@ struct ConfigTransactionInterface {
|
|||
void setupWellKnownEndpoints() {
|
||||
getVersion.makeWellKnownEndpoint(WLTOKEN_CONFIGTXN_GETVERSION, TaskPriority::Coordination);
|
||||
get.makeWellKnownEndpoint(WLTOKEN_CONFIGTXN_GET, TaskPriority::Coordination);
|
||||
getRange.makeWellKnownEndpoint(WLTOKEN_CONFIGTXN_GETRANGE, TaskPriority::Coordination);
|
||||
commit.makeWellKnownEndpoint(WLTOKEN_CONFIGTXN_COMMIT, TaskPriority::Coordination);
|
||||
}
|
||||
|
||||
ConfigTransactionInterface(NetworkAddress const& remote)
|
||||
: getVersion(Endpoint({ remote }, WLTOKEN_CONFIGTXN_GETVERSION)),
|
||||
get(Endpoint({ remote }, WLTOKEN_CONFIGTXN_GET)), commit(Endpoint({ remote }, WLTOKEN_CONFIGTXN_COMMIT)) {}
|
||||
get(Endpoint({ remote }, WLTOKEN_CONFIGTXN_GET)), getRange(Endpoint({ remote }, WLTOKEN_CONFIGTXN_GETRANGE)),
|
||||
commit(Endpoint({ remote }, WLTOKEN_CONFIGTXN_COMMIT)) {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, getVersion, get, commit);
|
||||
serializer(ar, getVersion, get, getRange, commit);
|
||||
}
|
||||
};
|
||||
|
|
|
@ -36,7 +36,8 @@ constexpr UID WLTOKEN_CLIENTLEADERREG_OPENDATABASE(-1, 3);
|
|||
constexpr UID WLTOKEN_PROTOCOL_INFO(-1, 10);
|
||||
constexpr UID WLTOKEN_CONFIGTXN_GETVERSION(-1, 11);
|
||||
constexpr UID WLTOKEN_CONFIGTXN_GET(-1, 12);
|
||||
constexpr UID WLTOKEN_CONFIGTXN_COMMIT(-1, 13);
|
||||
constexpr UID WLTOKEN_CONFIGTXN_GETRANGE(-1, 13);
|
||||
constexpr UID WLTOKEN_CONFIGTXN_COMMIT(-1, 14);
|
||||
|
||||
struct ClientLeaderRegInterface {
|
||||
RequestStream<struct GetLeaderRequest> getLeader;
|
||||
|
|
|
@ -33,6 +33,7 @@ public:
|
|||
virtual void set(KeyRef key, ValueRef value) = 0;
|
||||
virtual void clearRange(KeyRef begin, KeyRef end) = 0;
|
||||
virtual Future<Optional<Value>> get(KeyRef) = 0;
|
||||
virtual Future<Standalone<RangeResultRef>> getRange(KeyRangeRef) = 0;
|
||||
virtual Future<Void> commit() = 0;
|
||||
virtual Future<Void> onError(Error const&) = 0;
|
||||
virtual Future<Version> getVersion() = 0;
|
||||
|
@ -49,6 +50,7 @@ public:
|
|||
void set(KeyRef begin, KeyRef end) override;
|
||||
void clearRange(KeyRef begin, KeyRef end) override;
|
||||
Future<Optional<Value>> get(KeyRef) override;
|
||||
Future<Standalone<RangeResultRef>> getRange(KeyRangeRef) override;
|
||||
Future<Void> commit() override;
|
||||
Future<Void> onError(Error const&) override;
|
||||
Future<Version> getVersion() override;
|
||||
|
|
|
@ -47,6 +47,16 @@ class SimpleConfigTransactionImpl {
|
|||
return result.value;
|
||||
}
|
||||
|
||||
ACTOR static Future<Standalone<RangeResultRef>> getRange(SimpleConfigTransactionImpl* self, KeyRangeRef keys) {
|
||||
if (!self->version.isValid()) {
|
||||
self->version = getVersion(self);
|
||||
}
|
||||
Version version = wait(self->version);
|
||||
ConfigTransactionGetRangeReply result =
|
||||
wait(self->cti.getRange.getReply(ConfigTransactionGetRangeRequest(version, keys)));
|
||||
return result.range;
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> commit(SimpleConfigTransactionImpl* self) {
|
||||
if (!self->version.isValid()) {
|
||||
self->version = getVersion(self);
|
||||
|
@ -73,6 +83,8 @@ public:
|
|||
|
||||
Future<Optional<Value>> get(KeyRef key) { return get(this, key); }
|
||||
|
||||
Future<Standalone<RangeResultRef>> getRange(KeyRangeRef keys) { return getRange(this, keys); }
|
||||
|
||||
Future<Void> commit() { return commit(this); }
|
||||
|
||||
Future<Void> onError(Error const& e) {
|
||||
|
@ -113,6 +125,10 @@ Future<Optional<Value>> SimpleConfigTransaction::get(KeyRef key) {
|
|||
return impl->get(key);
|
||||
}
|
||||
|
||||
Future<Standalone<RangeResultRef>> SimpleConfigTransaction::getRange(KeyRangeRef keys) {
|
||||
return impl->getRange(keys);
|
||||
}
|
||||
|
||||
Future<Void> SimpleConfigTransaction::commit() {
|
||||
return impl->commit();
|
||||
}
|
||||
|
|
|
@ -334,7 +334,7 @@ ACTOR Future<Void> pingLatencyLogger(TransportData* self) {
|
|||
}
|
||||
|
||||
TransportData::TransportData(uint64_t transportId)
|
||||
: endpoints(/*wellKnownTokenCount*/ 18), endpointNotFoundReceiver(endpoints), pingReceiver(endpoints),
|
||||
: endpoints(/*wellKnownTokenCount*/ 19), endpointNotFoundReceiver(endpoints), pingReceiver(endpoints),
|
||||
warnAlwaysForLargePacket(true), lastIncompatibleMessage(0), transportId(transportId),
|
||||
numIncompatibleConnections(0) {
|
||||
degraded = makeReference<AsyncVar<bool>>(false);
|
||||
|
|
|
@ -31,10 +31,10 @@ constexpr UID WLTOKEN_LEADERELECTIONREG_FORWARD(-1, 7);
|
|||
constexpr UID WLTOKEN_GENERATIONREG_READ(-1, 8);
|
||||
constexpr UID WLTOKEN_GENERATIONREG_WRITE(-1, 9);
|
||||
|
||||
constexpr UID WLTOKEN_CONFIGFOLLOWER_GETVERSION(-1, 14);
|
||||
constexpr UID WLTOKEN_CONFIGFOLLOWER_GETFULLDB(-1, 15);
|
||||
constexpr UID WLTOKEN_CONFIGFOLLOWER_GETCHANGES(-1, 16);
|
||||
constexpr UID WLTOKEN_CONFIGFOLLOWER_COMPACT(-1, 17);
|
||||
constexpr UID WLTOKEN_CONFIGFOLLOWER_GETVERSION(-1, 15);
|
||||
constexpr UID WLTOKEN_CONFIGFOLLOWER_GETFULLDB(-1, 16);
|
||||
constexpr UID WLTOKEN_CONFIGFOLLOWER_GETCHANGES(-1, 17);
|
||||
constexpr UID WLTOKEN_CONFIGFOLLOWER_COMPACT(-1, 18);
|
||||
|
||||
struct GenerationRegInterface {
|
||||
constexpr static FileIdentifier file_identifier = 16726744;
|
||||
|
|
|
@ -177,6 +177,41 @@ class SimpleConfigDatabaseNodeImpl {
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> getRange(SimpleConfigDatabaseNodeImpl* self, ConfigTransactionGetRangeRequest req) {
|
||||
wait(self->globalLock.take());
|
||||
state FlowLock::Releaser releaser(self->globalLock);
|
||||
Version currentVersion = wait(getLiveTransactionVersion(self));
|
||||
if (req.version != currentVersion) {
|
||||
req.reply.sendError(transaction_too_old());
|
||||
return Void();
|
||||
}
|
||||
state Standalone<RangeResultRef> range = wait(self->kvStore->readRange(req.keys));
|
||||
Standalone<VectorRef<VersionedMutationRef>> versionedMutations = wait(getMutations(self, 0, req.version));
|
||||
for (const auto& versionedMutation : versionedMutations) {
|
||||
const auto& mutation = versionedMutation.mutation;
|
||||
if (mutation.type == MutationRef::Type::SetValue) {
|
||||
for (auto& kv : range) {
|
||||
if (kv.key == mutation.param1) {
|
||||
kv.value = mutation.param2;
|
||||
}
|
||||
}
|
||||
} else if (mutation.type == MutationRef::Type::ClearRange) {
|
||||
// FIXME: This is very inefficient
|
||||
Standalone<RangeResultRef> newRange;
|
||||
for (const auto& kv : range) {
|
||||
if (kv.key < mutation.param1 || kv.key > mutation.param2) {
|
||||
newRange.push_back_deep(newRange.arena(), kv);
|
||||
}
|
||||
}
|
||||
range = std::move(newRange);
|
||||
} else {
|
||||
ASSERT(false);
|
||||
}
|
||||
}
|
||||
req.reply.send(ConfigTransactionGetRangeReply(range));
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> commit(SimpleConfigDatabaseNodeImpl* self, ConfigTransactionCommitRequest req) {
|
||||
wait(self->globalLock.take());
|
||||
state FlowLock::Releaser releaser(self->globalLock);
|
||||
|
@ -227,6 +262,9 @@ class SimpleConfigDatabaseNodeImpl {
|
|||
when(ConfigTransactionCommitRequest req = waitNext(cti->commit.getFuture())) {
|
||||
self->actors.add(commit(self, req));
|
||||
}
|
||||
when(ConfigTransactionGetRangeRequest req = waitNext(cti->getRange.getFuture())) {
|
||||
self->actors.add(getRange(self, req));
|
||||
}
|
||||
when(wait(self->actors.getResult())) { ASSERT(false); }
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue