Introduced fetchCheckpointKeyValues API in StorageServerInterface.

This commit is contained in:
He Liu 2022-04-08 15:37:53 -07:00 committed by He Liu
parent 01234720e7
commit bdc22646c1
1 changed files with 35 additions and 0 deletions

View File

@ -89,6 +89,7 @@ struct StorageServerInterface {
RequestStream<struct ChangeFeedVersionUpdateRequest> changeFeedVersionUpdate;
RequestStream<struct GetCheckpointRequest> checkpoint;
RequestStream<struct FetchCheckpointRequest> fetchCheckpoint;
RequestStream<struct FetchCheckpointKeyValuesRequest> fetchCheckpointKeyValues;
private:
bool acceptingRequests;
@ -155,6 +156,8 @@ public:
checkpoint = RequestStream<struct GetCheckpointRequest>(getValue.getEndpoint().getAdjustedEndpoint(19));
fetchCheckpoint =
RequestStream<struct FetchCheckpointRequest>(getValue.getEndpoint().getAdjustedEndpoint(20));
fetchCheckpointKeyValues = RequestStream<struct FetchCheckpointKeyValuesRequest>(
getValue.getEndpoint().getAdjustedEndpoint(21));
}
} else {
ASSERT(Ar::isDeserializing);
@ -204,6 +207,7 @@ public:
streams.push_back(changeFeedVersionUpdate.getReceiver());
streams.push_back(checkpoint.getReceiver());
streams.push_back(fetchCheckpoint.getReceiver());
streams.push_back(fetchCheckpointKeyValues.getReceiver());
FlowTransport::transport().addEndpoints(streams);
}
};
@ -920,6 +924,37 @@ struct FetchCheckpointRequest {
}
};
struct FetchCheckpointKeyValuesStreamReply : public ReplyPromiseStreamReply {
constexpr static FileIdentifier file_identifier = 13804353;
Arena arena;
VectorRef<KeyValueRef> data;
FetchCheckpointKeyValuesStreamReply() = default;
int expectedSize() const { return data.expectedSize(); }
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, ReplyPromiseStreamReply::acknowledgeToken, ReplyPromiseStreamReply::sequence, data, arena);
}
};
// Fetch checkpoint in the format of key-value pairs.
struct FetchCheckpointKeyValuesRequest {
constexpr static FileIdentifier file_identifier = 13804354;
UID checkpointID;
KeyRange range;
ReplyPromiseStream<FetchCheckpointKeyValuesStreamReply> reply;
FetchCheckpointKeyValuesRequest() = default;
FetchCheckpointKeyValuesRequest(UID checkpointID, KeyRange range) : checkpointID(checkpointID), range(range) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, checkpointID, range, reply);
}
};
struct OverlappingChangeFeedEntry {
Key rangeId;
KeyRange range;