generalized read guards, allow for penalty+error
This commit is contained in:
parent
207049e852
commit
b944e0b116
|
@ -67,11 +67,12 @@ struct ModelHolder : NonCopyable, public ReferenceCounted<ModelHolder> {
|
|||
|
||||
struct LoadBalancedReply {
|
||||
double penalty;
|
||||
Optional<Error> error;
|
||||
LoadBalancedReply() : penalty(1.0) {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar &ar) {
|
||||
serializer(ar, penalty);
|
||||
serializer(ar, penalty, error);
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -94,6 +95,10 @@ bool checkAndProcessResult(ErrorOr<T> result, Reference<ModelHolder> holder, boo
|
|||
}
|
||||
|
||||
holder->release(receivedResponse, futureVersion, loadBalancedReply.present() ? loadBalancedReply.get().penalty : -1.0);
|
||||
|
||||
if (loadBalancedReply.present() && loadBalancedReply.get().error.present()) {
|
||||
throw loadBalancedReply.get().error.get();
|
||||
}
|
||||
|
||||
if(result.present()) {
|
||||
return true;
|
||||
|
|
|
@ -592,7 +592,23 @@ public:
|
|||
}
|
||||
|
||||
double getPenalty() {
|
||||
return std::max(1.0, (queueSize() - (SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER - 2.0*SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER)) / SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER);
|
||||
return std::max(std::max(1.0, (queueSize() - (SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER -
|
||||
2.0 * SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER)) /
|
||||
SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER),
|
||||
1.0 - currentRate);
|
||||
}
|
||||
|
||||
template<class Request, class HandleFunction>
|
||||
Future<Void> readGuard(const Request& request, const HandleFunction& fun) {
|
||||
if (currentRate < 0.999 && g_random->random01() > currentRate) {
|
||||
//request.error = future_version();
|
||||
REPLY_TYPE(Request) reply;
|
||||
reply.error = future_version();
|
||||
reply.penalty = 1.0 - currentRate;
|
||||
request.reply.send(reply);
|
||||
return Void();
|
||||
}
|
||||
return fun(this, request);
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -3432,14 +3448,9 @@ ACTOR Future<Void> storageServerCore( StorageServer* self, StorageServerInterfac
|
|||
if( req.debugID.present() )
|
||||
g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "storageServer.recieved"); //.detail("TaskID", g_network->getCurrentTask());
|
||||
|
||||
if (g_random->random01() > self->currentRate) {
|
||||
req.reply.sendError(future_version());
|
||||
} else {
|
||||
if (SHORT_CIRCUT_ACTUAL_STORAGE && normalKeys.contains(req.key))
|
||||
req.reply.send(GetValueReply());
|
||||
else
|
||||
actors.add( getValueQ( self, req ) );
|
||||
}
|
||||
if (SHORT_CIRCUT_ACTUAL_STORAGE && normalKeys.contains(req.key))
|
||||
req.reply.send(GetValueReply());
|
||||
actors.add(self->readGuard(req , getValueQ));
|
||||
}
|
||||
when( WatchValueRequest req = waitNext(ssi.watchValue.getFuture()) ) {
|
||||
// TODO: fast load balancing?
|
||||
|
@ -3452,19 +3463,11 @@ ACTOR Future<Void> storageServerCore( StorageServer* self, StorageServerInterfac
|
|||
}
|
||||
when (GetKeyRequest req = waitNext(ssi.getKey.getFuture())) {
|
||||
// Warning: This code is executed at extremely high priority (TaskLoadBalancedEndpoint), so downgrade before doing real work
|
||||
if (g_random->random01() > self->currentRate) {
|
||||
req.reply.sendError(future_version());
|
||||
} else {
|
||||
actors.add( getKey( self, req ) );
|
||||
}
|
||||
actors.add(self->readGuard(req , getKey));
|
||||
}
|
||||
when (GetKeyValuesRequest req = waitNext(ssi.getKeyValues.getFuture()) ) {
|
||||
// Warning: This code is executed at extremely high priority (TaskLoadBalancedEndpoint), so downgrade before doing real work
|
||||
if (g_random->random01() > self->currentRate) {
|
||||
req.reply.sendError(future_version());
|
||||
} else {
|
||||
actors.add( getKeyValues( self, req ) );
|
||||
}
|
||||
actors.add(self->readGuard(req , getKeyValues));
|
||||
}
|
||||
when (GetShardStateRequest req = waitNext(ssi.getShardState.getFuture()) ) {
|
||||
if (req.mode == GetShardStateRequest::NO_WAIT ) {
|
||||
|
|
Loading…
Reference in New Issue