fix: The delay inside the disabledMap was causing the storage server updateStorage actor to run on the client process
This commit is contained in:
parent
dccb9bc26d
commit
55f7e7d372
|
@ -84,19 +84,6 @@ void ISimulator::displayWorkers() const
|
|||
return;
|
||||
}
|
||||
|
||||
void ISimulator::disableFor(const std::string& desc, double time) {
|
||||
disabledMap[desc] = map(::delay(time), [this, desc](Void v){ disabledMap.erase(desc); return Void(); });
|
||||
}
|
||||
|
||||
Future<Void> ISimulator::checkDisabled(const std::string& desc) const
|
||||
{
|
||||
auto iter = disabledMap.find(desc);
|
||||
if (iter != disabledMap.end()) {
|
||||
return iter->second;
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
namespace std {
|
||||
template<>
|
||||
class hash<Endpoint> {
|
||||
|
|
|
@ -311,8 +311,18 @@ public:
|
|||
virtual flowGlobalType global(int id) { return getCurrentProcess()->global(id); };
|
||||
virtual void setGlobal(size_t id, flowGlobalType v) { getCurrentProcess()->setGlobal(id,v); };
|
||||
|
||||
Future<Void> checkDisabled(const std::string& desc) const;
|
||||
void disableFor(const std::string& desc, double time);
|
||||
virtual void disableFor(const std::string& desc, double time) {
|
||||
disabledMap[desc] = time;
|
||||
}
|
||||
|
||||
virtual double checkDisabled(const std::string& desc) const
|
||||
{
|
||||
auto iter = disabledMap.find(desc);
|
||||
if (iter != disabledMap.end()) {
|
||||
return iter->second;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static thread_local ProcessInfo* currentProcess;
|
||||
protected:
|
||||
|
@ -323,7 +333,7 @@ private:
|
|||
std::map<NetworkAddress, int> excludedAddresses;
|
||||
std::map<NetworkAddress, int> clearedAddresses;
|
||||
std::map<NetworkAddress, std::map<std::string, int>> roleAddresses;
|
||||
std::map<std::string, Future<Void>> disabledMap;
|
||||
std::map<std::string, double> disabledMap;
|
||||
bool allSwapsDisabled;
|
||||
};
|
||||
|
||||
|
|
|
@ -2863,7 +2863,10 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
|
|||
loop {
|
||||
ASSERT( data->durableVersion.get() == data->storageVersion() );
|
||||
if (g_network->isSimulated()) {
|
||||
wait(g_pSimulator->checkDisabled(format("%s/updateStorage", data->thisServerID.toString().c_str())));
|
||||
double endTime = g_simulator.checkDisabled(format("%s/updateStorage", data->thisServerID.toString().c_str()));
|
||||
if(endTime > now()) {
|
||||
wait(delay(endTime - now(), TaskUpdateStorage));
|
||||
}
|
||||
}
|
||||
wait( data->desiredOldestVersion.whenAtLeast( data->storageVersion()+1 ) );
|
||||
wait( delay(0, TaskUpdateStorage) );
|
||||
|
|
|
@ -115,7 +115,7 @@ struct LocalRatekeeperWorkload : TestWorkload {
|
|||
ACTOR static Future<Void> _start(LocalRatekeeperWorkload* self, Database cx) {
|
||||
wait(delay(self->startAfter));
|
||||
state StorageServerInterface ssi = wait(getRandomStorage(cx));
|
||||
g_simulator.disableFor(format("%s/updateStorage", ssi.id().toString().c_str()), self->blockWritesFor);
|
||||
g_simulator.disableFor(format("%s/updateStorage", ssi.id().toString().c_str()), now() + self->blockWritesFor);
|
||||
state Future<Void> done = delay(self->blockWritesFor);
|
||||
// not much will happen until the storage goes over the soft limit
|
||||
wait(delay(double(SERVER_KNOBS->STORAGE_DURABILITY_LAG_SOFT_MAX/1e6)));
|
||||
|
|
Loading…
Reference in New Issue