s/subcription/subscription
This commit is contained in:
parent
c32c3b6ec4
commit
27c08b04e5
|
@ -35,10 +35,10 @@ uint64_t valueToUInt64(const StringRef& v) {
|
|||
Key keyForInbox(uint64_t inbox) {
|
||||
return StringRef(format("i/%016llx", inbox));
|
||||
}
|
||||
Key keyForInboxSubcription(uint64_t inbox, uint64_t feed) {
|
||||
Key keyForInboxSubscription(uint64_t inbox, uint64_t feed) {
|
||||
return StringRef(format("i/%016llx/subs/%016llx", inbox, feed));
|
||||
}
|
||||
Key keyForInboxSubcriptionCount(uint64_t inbox) {
|
||||
Key keyForInboxSubscriptionCount(uint64_t inbox) {
|
||||
return StringRef(format("i/%016llx/subsCnt", inbox));
|
||||
}
|
||||
Key keyForInboxStalePrefix(uint64_t inbox) {
|
||||
|
@ -142,7 +142,7 @@ ACTOR Future<uint64_t> _createInbox(Database cx, Standalone<StringRef> metadata)
|
|||
val = v;
|
||||
}
|
||||
tr.set(keyForInbox(id), metadata);
|
||||
tr.set(keyForInboxSubcriptionCount(id), uInt64ToValue(0));
|
||||
tr.set(keyForInboxSubscriptionCount(id), uInt64ToValue(0));
|
||||
wait(tr.commit());
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
|
@ -156,13 +156,13 @@ Future<uint64_t> PubSub::createInbox(Standalone<StringRef> metadata) {
|
|||
return _createInbox(cx, metadata);
|
||||
}
|
||||
|
||||
ACTOR Future<bool> _createSubcription(Database cx, uint64_t feed, uint64_t inbox) {
|
||||
ACTOR Future<bool> _createSubscription(Database cx, uint64_t feed, uint64_t inbox) {
|
||||
state Transaction tr(cx);
|
||||
TraceEvent("PubSubCreateSubscription").detail("Feed", feed).detail("Inbox", inbox);
|
||||
loop {
|
||||
try {
|
||||
Optional<Value> subcription = wait(tr.get(keyForInboxSubcription(inbox, feed)));
|
||||
if (subcription.present()) {
|
||||
Optional<Value> subscription = wait(tr.get(keyForInboxSubscription(inbox, feed)));
|
||||
if (subscription.present()) {
|
||||
// For idempotency, this could exist from a previous transaction from us that succeeded
|
||||
return true;
|
||||
}
|
||||
|
@ -176,10 +176,10 @@ ACTOR Future<bool> _createSubcription(Database cx, uint64_t feed, uint64_t inbox
|
|||
}
|
||||
|
||||
// Update the subscriptions of the inbox
|
||||
Optional<Value> subcriptionCountVal = wait(tr.get(keyForInboxSubcriptionCount(inbox)));
|
||||
uint64_t subcriptionCount = valueToUInt64(subcriptionCountVal.get()); // throws if count not present
|
||||
tr.set(keyForInboxSubcription(inbox, feed), StringRef());
|
||||
tr.set(keyForInboxSubcriptionCount(inbox), uInt64ToValue(subcriptionCount + 1));
|
||||
Optional<Value> subscriptionCountVal = wait(tr.get(keyForInboxSubscriptionCount(inbox)));
|
||||
uint64_t subscriptionCount = valueToUInt64(subscriptionCountVal.get()); // throws if count not present
|
||||
tr.set(keyForInboxSubscription(inbox, feed), StringRef());
|
||||
tr.set(keyForInboxSubscriptionCount(inbox), uInt64ToValue(subscriptionCount + 1));
|
||||
|
||||
// Update the subcribers of the feed
|
||||
Optional<Value> subcriberCountVal = wait(tr.get(keyForFeedSubcriberCount(feed)));
|
||||
|
@ -199,8 +199,8 @@ ACTOR Future<bool> _createSubcription(Database cx, uint64_t feed, uint64_t inbox
|
|||
return true;
|
||||
}
|
||||
|
||||
Future<bool> PubSub::createSubcription(uint64_t feed, uint64_t inbox) {
|
||||
return _createSubcription(cx, feed, inbox);
|
||||
Future<bool> PubSub::createSubscription(uint64_t feed, uint64_t inbox) {
|
||||
return _createSubscription(cx, feed, inbox);
|
||||
}
|
||||
|
||||
// Since we are not relying on "read-your-own-writes", we need to keep track of
|
||||
|
@ -410,7 +410,7 @@ ACTOR Future<std::vector<Message>> _listInboxMessages(Database cx, uint64_t inbo
|
|||
state std::map<MessageId, Feed> feedLatest;
|
||||
try {
|
||||
// Fetch all cached entries for all the feeds to which we are subscribed
|
||||
Optional<Value> cntValue = wait(tr.get(keyForInboxSubcriptionCount(inbox)));
|
||||
Optional<Value> cntValue = wait(tr.get(keyForInboxSubscriptionCount(inbox)));
|
||||
uint64_t subscriptions = valueToUInt64(cntValue.get());
|
||||
state RangeResult feeds = wait(tr.getRange(firstGreaterOrEqual(keyForInboxCacheByID(inbox, 0)),
|
||||
firstGreaterOrEqual(keyForInboxCacheByID(inbox, UINT64_MAX)),
|
||||
|
|
|
@ -81,7 +81,7 @@ public:
|
|||
|
||||
Future<Inbox> createInbox(Standalone<StringRef> metadata);
|
||||
|
||||
Future<bool> createSubcription(Feed feed, Inbox inbox);
|
||||
Future<bool> createSubscription(Feed feed, Inbox inbox);
|
||||
|
||||
Future<MessageId> postMessage(Feed feed, Standalone<StringRef> data);
|
||||
|
||||
|
|
Loading…
Reference in New Issue