Flow transport uses an ordered delay to avoid out of order reply promise stream messages
This commit is contained in:
parent
26d886c600
commit
256a18e43b
|
@ -234,6 +234,8 @@ struct YieldMockNetwork final : INetwork, ReferenceCounted<YieldMockNetwork> {
|
|||
|
||||
Future<class Void> delay(double seconds, TaskPriority taskID) override { return nextTick.getFuture(); }
|
||||
|
||||
Future<class Void> orderedDelay(double seconds, TaskPriority taskID) override { return nextTick.getFuture(); }
|
||||
|
||||
Future<class Void> yield(TaskPriority taskID) override {
|
||||
if (check_yield(taskID))
|
||||
return delay(0, taskID);
|
||||
|
|
|
@ -922,9 +922,9 @@ ACTOR static void deliver(TransportData* self,
|
|||
// We want to run the task at the right priority. If the priority is higher than the current priority (which is
|
||||
// ReadSocket) we can just upgrade. Otherwise we'll context switch so that we don't block other tasks that might run
|
||||
// with a higher priority. ReplyPromiseStream needs to guarentee that messages are recieved in the order they were
|
||||
// sent, so even in the case of local delivery those messages need to skip this delay.
|
||||
if (priority < TaskPriority::ReadSocket || (priority != TaskPriority::NoDeliverDelay && !inReadSocket)) {
|
||||
wait(delay(0, priority));
|
||||
// sent, so we are using orderedDelay.
|
||||
if (priority < TaskPriority::ReadSocket || !inReadSocket) {
|
||||
wait(orderedDelay(0, priority));
|
||||
} else {
|
||||
g_network->setCurrentTask(priority);
|
||||
}
|
||||
|
|
|
@ -361,7 +361,7 @@ struct NetNotifiedQueueWithAcknowledgements final : NotifiedQueue<T>,
|
|||
FlowTransport::transport().sendUnreliable(
|
||||
SerializeSource<ErrorOr<AcknowledgementReply>>(
|
||||
AcknowledgementReply(acknowledgements.bytesAcknowledged)),
|
||||
acknowledgements.getEndpoint(TaskPriority::NoDeliverDelay),
|
||||
acknowledgements.getEndpoint(TaskPriority::ReadSocket),
|
||||
false);
|
||||
}
|
||||
}
|
||||
|
@ -378,7 +378,7 @@ struct NetNotifiedQueueWithAcknowledgements final : NotifiedQueue<T>,
|
|||
acknowledgements.bytesAcknowledged += res.expectedSize();
|
||||
FlowTransport::transport().sendUnreliable(SerializeSource<ErrorOr<AcknowledgementReply>>(
|
||||
AcknowledgementReply(acknowledgements.bytesAcknowledged)),
|
||||
acknowledgements.getEndpoint(TaskPriority::NoDeliverDelay),
|
||||
acknowledgements.getEndpoint(TaskPriority::ReadSocket),
|
||||
false);
|
||||
}
|
||||
return res;
|
||||
|
@ -389,13 +389,13 @@ struct NetNotifiedQueueWithAcknowledgements final : NotifiedQueue<T>,
|
|||
// Notify the server that a client is not using this ReplyPromiseStream anymore
|
||||
FlowTransport::transport().sendUnreliable(
|
||||
SerializeSource<ErrorOr<AcknowledgementReply>>(operation_obsolete()),
|
||||
acknowledgements.getEndpoint(TaskPriority::NoDeliverDelay),
|
||||
acknowledgements.getEndpoint(TaskPriority::ReadSocket),
|
||||
false);
|
||||
}
|
||||
if (isRemoteEndpoint() && !sentError && !acknowledgements.failures.isReady()) {
|
||||
// The ReplyPromiseStream was cancelled before sending an error, so the storage server must have died
|
||||
FlowTransport::transport().sendUnreliable(SerializeSource<ErrorOr<EnsureTable<T>>>(broken_promise()),
|
||||
getEndpoint(TaskPriority::NoDeliverDelay),
|
||||
getEndpoint(TaskPriority::ReadSocket),
|
||||
false);
|
||||
}
|
||||
}
|
||||
|
@ -406,9 +406,6 @@ struct NetNotifiedQueueWithAcknowledgements final : NotifiedQueue<T>,
|
|||
template <class T>
|
||||
class ReplyPromiseStream {
|
||||
public:
|
||||
// The endpoints of a ReplyPromiseStream must be initialized at Task::NoDeliverDelay, because a
|
||||
// delay(0) in FlowTransport deliver can cause out of order delivery.
|
||||
|
||||
// stream.send( request )
|
||||
// Unreliable at most once delivery: Delivers request unless there is a connection failure (zero or one times)
|
||||
|
||||
|
@ -416,7 +413,7 @@ public:
|
|||
void send(U&& value) const {
|
||||
if (queue->isRemoteEndpoint()) {
|
||||
if (!queue->acknowledgements.getRawEndpoint().isValid()) {
|
||||
value.acknowledgeToken = queue->acknowledgements.getEndpoint(TaskPriority::NoDeliverDelay).token;
|
||||
value.acknowledgeToken = queue->acknowledgements.getEndpoint(TaskPriority::ReadSocket).token;
|
||||
}
|
||||
queue->acknowledgements.bytesSent += value.expectedSize();
|
||||
FlowTransport::transport().sendUnreliable(
|
||||
|
@ -477,7 +474,7 @@ public:
|
|||
errors->delPromiseRef();
|
||||
}
|
||||
|
||||
const Endpoint& getEndpoint() const { return queue->getEndpoint(TaskPriority::NoDeliverDelay); }
|
||||
const Endpoint& getEndpoint() const { return queue->getEndpoint(TaskPriority::ReadSocket); }
|
||||
|
||||
bool operator==(const ReplyPromiseStream<T>& rhs) const { return queue == rhs.queue; }
|
||||
bool isEmpty() const { return !queue->isReady(); }
|
||||
|
|
|
@ -858,13 +858,17 @@ public:
|
|||
ASSERT(taskID >= TaskPriority::Min && taskID <= TaskPriority::Max);
|
||||
return delay(seconds, taskID, currentProcess);
|
||||
}
|
||||
Future<class Void> delay(double seconds, TaskPriority taskID, ProcessInfo* machine) {
|
||||
Future<class Void> orderedDelay(double seconds, TaskPriority taskID) override {
|
||||
ASSERT(taskID >= TaskPriority::Min && taskID <= TaskPriority::Max);
|
||||
return delay(seconds, taskID, currentProcess, true);
|
||||
}
|
||||
Future<class Void> delay(double seconds, TaskPriority taskID, ProcessInfo* machine, bool ordered = false) {
|
||||
ASSERT(seconds >= -0.0001);
|
||||
seconds = std::max(0.0, seconds);
|
||||
Future<Void> f;
|
||||
|
||||
if (!currentProcess->rebooting && machine == currentProcess && !currentProcess->shutdownSignal.isSet() &&
|
||||
FLOW_KNOBS->MAX_BUGGIFIED_DELAY > 0 &&
|
||||
if (!ordered && !currentProcess->rebooting && machine == currentProcess &&
|
||||
!currentProcess->shutdownSignal.isSet() && FLOW_KNOBS->MAX_BUGGIFIED_DELAY > 0 &&
|
||||
deterministicRandom()->random01() < 0.25) { // FIXME: why doesnt this work when we are changing machines?
|
||||
seconds += FLOW_KNOBS->MAX_BUGGIFIED_DELAY * pow(deterministicRandom()->random01(), 1000.0);
|
||||
}
|
||||
|
|
|
@ -161,6 +161,7 @@ public:
|
|||
double timer() override { return ::timer(); };
|
||||
double timer_monotonic() override { return ::timer_monotonic(); };
|
||||
Future<Void> delay(double seconds, TaskPriority taskId) override;
|
||||
Future<Void> orderedDelay(double seconds, TaskPriority taskId) override;
|
||||
Future<class Void> yield(TaskPriority taskID) override;
|
||||
bool check_yield(TaskPriority taskId) override;
|
||||
TaskPriority getCurrentTask() const override { return currentTaskID; }
|
||||
|
@ -1750,6 +1751,11 @@ Future<Void> Net2::delay(double seconds, TaskPriority taskId) {
|
|||
return t->promise.getFuture();
|
||||
}
|
||||
|
||||
Future<Void> Net2::orderedDelay(double seconds, TaskPriority taskId) {
|
||||
// The regular delay already provides the required ordering property
|
||||
return delay(seconds, taskId);
|
||||
}
|
||||
|
||||
void Net2::onMainThread(Promise<Void>&& signal, TaskPriority taskID) {
|
||||
if (stopped)
|
||||
return;
|
||||
|
|
|
@ -1087,6 +1087,9 @@ inline double now() {
|
|||
inline Future<Void> delay(double seconds, TaskPriority taskID = TaskPriority::DefaultDelay) {
|
||||
return g_network->delay(seconds, taskID);
|
||||
}
|
||||
inline Future<Void> orderedDelay(double seconds, TaskPriority taskID = TaskPriority::DefaultDelay) {
|
||||
return g_network->orderedDelay(seconds, taskID);
|
||||
}
|
||||
inline Future<Void> delayUntil(double time, TaskPriority taskID = TaskPriority::DefaultDelay) {
|
||||
return g_network->delay(std::max(0.0, time - g_network->now()), taskID);
|
||||
}
|
||||
|
|
|
@ -45,7 +45,6 @@ enum class TaskPriority {
|
|||
WriteSocket = 10000,
|
||||
PollEIO = 9900,
|
||||
DiskIOComplete = 9150,
|
||||
NoDeliverDelay = 9100,
|
||||
LoadBalancedEndpoint = 9000,
|
||||
ReadSocket = 9000,
|
||||
AcceptSocket = 8950,
|
||||
|
@ -507,6 +506,10 @@ public:
|
|||
virtual Future<class Void> delay(double seconds, TaskPriority taskID) = 0;
|
||||
// The given future will be set after seconds have elapsed
|
||||
|
||||
virtual Future<class Void> orderedDelay(double seconds, TaskPriority taskID) = 0;
|
||||
// The given future will be set after seconds have elapsed, delays with the same time and TaskPriority will be
|
||||
// executed in the order they were issues
|
||||
|
||||
virtual Future<class Void> yield(TaskPriority taskID) = 0;
|
||||
// The given future will be set immediately or after higher-priority tasks have executed
|
||||
|
||||
|
|
Loading…
Reference in New Issue