Merge pull request #3997 from ajbeamon/grv-proxy-model

A model used to quickly simulate various GRV scenarios and algorithms
commit 64d01e9303
Jingyu Zhou, 2021-05-03 13:52:49 -07:00 (committed by GitHub)
8 changed files with 1023 additions and 0 deletions
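For example (a minimal invocation sketch, assuming the scripts are run from contrib/grv_proxy_model/; the names below are among the predefined models added in this PR):

    # List the available workloads, ratekeeper models, proxies, and limiters
    ./grv_test.py --list

    # Simulate 240 seconds (the default duration) of a fixed 200 tps workload
    # under the default200_batch100 ratekeeper, using the SmoothingLimiter,
    # with text-only output
    ./grv_test.py -w default200 -r default200_batch100 -L Smoothing --no-graph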

contrib/grv_proxy_model/grv_test.py Executable file

@@ -0,0 +1,134 @@
#!/usr/bin/env python3
#
# grv_test.py
#
# This source file is part of the FoundationDB open source project
#
# Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import argparse
import inspect
import sys
import rate_model
import workload_model
import proxy_model
import ratekeeper_model
from priority import Priority
from plot import Plotter
parser = argparse.ArgumentParser()
parser.add_argument('-w', '--workload', type=str, help='Name of workload to run')
parser.add_argument('-r', '--ratekeeper', type=str, help='Name of ratekeeper model')
parser.add_argument('-d', '--duration', type=int, default=240, help='Duration of simulated test, in seconds. Defaults to 240.')
parser.add_argument('-L', '--limiter', type=str, default='Original', help='Name of limiter implementation. Defaults to \'Original\'.')
parser.add_argument('-p', '--proxy', type=str, default='ProxyModel', help='Name of proxy implementation. Defaults to \'ProxyModel\'.')
parser.add_argument('--list', action='store_true', default=False, help='List options for all models.')
parser.add_argument('--no-graph', action='store_true', default=False, help='Disable graphical output.')
args = parser.parse_args()
def print_choices_list(context=None):
if context == 'workload' or context is None:
print('Workloads:')
for w in workload_model.predefined_workloads.keys():
print(' %s' % w)
if context == 'ratekeeper' or context is None:
print('\nRatekeeper models:')
for r in ratekeeper_model.predefined_ratekeeper.keys():
print(' %s' % r)
proxy_model_classes = [c for c in [getattr(proxy_model, a) for a in dir(proxy_model)] if inspect.isclass(c)]
if context == 'proxy' or context is None:
print('\nProxy models:')
for p in proxy_model_classes:
if issubclass(p, proxy_model.ProxyModel):
print(' %s' % p.__name__)
if context == 'limiter' or context is None:
print('\nProxy limiters:')
for p in proxy_model_classes:
if issubclass(p, proxy_model.Limiter) and p != proxy_model.Limiter:
name = p.__name__
if name.endswith('Limiter'):
name = name[0:-len('Limiter')]
print(' %s' % name)
if args.list:
    print_choices_list()
    sys.exit(0)

if args.workload is None or args.ratekeeper is None:
    print('ERROR: A workload (-w/--workload) and ratekeeper model (-r/--ratekeeper) must be specified.\n')
    print_choices_list()
    sys.exit(1)
def validate_class_type(var, name, superclass):
cls = getattr(var, name, None)
return cls is not None and inspect.isclass(cls) and issubclass(cls, superclass)
if args.ratekeeper not in ratekeeper_model.predefined_ratekeeper:
print('Invalid ratekeeper model `%s\'' % args.ratekeeper)
print_choices_list('ratekeeper')
sys.exit(1)
if args.workload not in workload_model.predefined_workloads:
print('Invalid workload model `%s\'' % args.workload)
print_choices_list('workload')
sys.exit(1)
if not validate_class_type(proxy_model, args.proxy, proxy_model.ProxyModel):
print('Invalid proxy model `%s\'' % args.proxy)
print_choices_list('proxy')
sys.exit(1)
limiter_name = args.limiter
if not validate_class_type(proxy_model, limiter_name, proxy_model.Limiter):
limiter_name += 'Limiter'
if not validate_class_type(proxy_model, limiter_name, proxy_model.Limiter):
print('Invalid proxy limiter `%s\'' % args.limiter)
print_choices_list('limiter')
sys.exit(1)
ratekeeper = ratekeeper_model.predefined_ratekeeper[args.ratekeeper]
workload = workload_model.predefined_workloads[args.workload]
limiter = getattr(proxy_model, limiter_name)
proxy = getattr(proxy_model, args.proxy)(args.duration, ratekeeper, workload, limiter)
proxy.run()
for priority in workload.priorities():
latencies = sorted([p for t in proxy.results.latencies[priority].values() for p in t])
total_started = sum(proxy.results.started[priority].values())
still_queued = sum([r.count for r in proxy.request_queue if r.priority == priority])
if len(latencies) > 0:
print('\n%s: %d requests in %d seconds (rate=%f). %d still queued.' % (priority, total_started, proxy.time, float(total_started)/proxy.time, still_queued))
print(' Median latency: %f' % latencies[len(latencies)//2])
print(' 90%% latency: %f' % latencies[int(0.9*len(latencies))])
print(' 99%% latency: %f' % latencies[int(0.99*len(latencies))])
print(' 99.9%% latency: %f' % latencies[int(0.999*len(latencies))])
print(' Max latency: %f' % latencies[-1])
print('')
if not args.no_graph:
plotter = Plotter(proxy.results)
plotter.display()

contrib/grv_proxy_model/plot.py Executable file

@@ -0,0 +1,107 @@
#
# plot.py
#
# This source file is part of the FoundationDB open source project
#
# Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import matplotlib.pyplot as plt
class Plotter:
def __init__(self, results):
self.results = results
    @staticmethod
    def add_plot(data, time_resolution, label, use_avg=False):
        # Aggregate raw samples into fixed-width time buckets; with use_avg the
        # bucket value is the mean of its samples rather than their sum.
        out_data = {}
        counts = {}
        for t, v in data.items():
            bucket = t // time_resolution * time_resolution
            out_data[bucket] = out_data.get(bucket, 0) + v
            counts[bucket] = counts.get(bucket, 0) + 1
        if use_avg:
            out_data = {t: v / counts[t] for t, v in out_data.items()}
        plt.plot(list(out_data.keys()), list(out_data.values()), label=label)
    @staticmethod
    def add_plot_with_times(data, label):
        plt.plot(list(data.keys()), list(data.values()), label=label)
def display(self, time_resolution=0.1):
plt.figure(figsize=(40,9))
plt.subplot(3, 3, 1)
for priority in self.results.started.keys():
Plotter.add_plot(self.results.started[priority], time_resolution, priority)
plt.xlabel('Time (s)')
plt.ylabel('Released/s')
plt.legend()
plt.subplot(3, 3, 2)
for priority in self.results.queued.keys():
Plotter.add_plot(self.results.queued[priority], time_resolution, priority)
plt.xlabel('Time (s)')
plt.ylabel('Requests/s')
plt.legend()
plt.subplot(3, 3, 3)
for priority in self.results.unprocessed_queue_sizes.keys():
data = {k: max(v) for (k,v) in self.results.unprocessed_queue_sizes[priority].items()}
Plotter.add_plot(data, time_resolution, priority)
plt.xlabel('Time (s)')
plt.ylabel('Max queue size')
plt.legend()
num = 4
for priority in self.results.latencies.keys():
plt.subplot(3, 3, num)
median_latencies = {k: v[int(0.5*len(v))] if len(v) > 0 else 0 for (k,v) in self.results.latencies[priority].items()}
percentile90_latencies = {k: v[int(0.9*len(v))] if len(v) > 0 else 0 for (k,v) in self.results.latencies[priority].items()}
max_latencies = {k: max(v) if len(v) > 0 else 0 for (k,v) in self.results.latencies[priority].items()}
Plotter.add_plot(median_latencies, time_resolution, 'median')
Plotter.add_plot(percentile90_latencies, time_resolution, '90th percentile')
Plotter.add_plot(max_latencies, time_resolution, 'max')
plt.xlabel('Time (s)')
plt.ylabel(str(priority) + ' Latency (s)')
plt.yscale('log')
plt.legend()
num += 1
for priority in self.results.rate.keys():
plt.subplot(3, 3, num)
if len(self.results.rate[priority]) > 0:
Plotter.add_plot(self.results.rate[priority], time_resolution, 'Rate', use_avg=True)
if len(self.results.released[priority]) > 0:
Plotter.add_plot(self.results.released[priority], time_resolution, 'Released', use_avg=True)
if len(self.results.limit[priority]) > 0:
Plotter.add_plot(self.results.limit[priority], time_resolution, 'Limit', use_avg=True)
if len(self.results.limit_and_budget[priority]) > 0:
Plotter.add_plot(self.results.limit_and_budget[priority], time_resolution, 'Limit and budget', use_avg=True)
if len(self.results.budget[priority]) > 0:
Plotter.add_plot(self.results.budget[priority], time_resolution, 'Budget', use_avg=True)
plt.xlabel('Time (s)')
plt.ylabel('Value (' + str(priority) + ')')
plt.legend()
num += 1
plt.show()

contrib/grv_proxy_model/priority.py

@@ -0,0 +1,40 @@
#
# priority.py
#
# This source file is part of the FoundationDB open source project
#
# Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import functools
@functools.total_ordering
class Priority:
def __init__(self, priority_value, label):
self.priority_value = priority_value
self.label = label
def __lt__(self, other):
return self.priority_value < other.priority_value
def __str__(self):
return self.label
def __repr__(self):
return repr(self.label)
Priority.SYSTEM = Priority(0, "System")
Priority.DEFAULT = Priority(1, "Default")
Priority.BATCH = Priority(2, "Batch")

contrib/grv_proxy_model/proxy_model.py

@@ -0,0 +1,338 @@
#
# proxy_model.py
#
# This source file is part of the FoundationDB open source project
#
# Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import copy
import functools
import heapq
from priority import Priority
from smoother import Smoother
@functools.total_ordering
class Task:
def __init__(self, time, fxn):
self.time = time
self.fxn = fxn
def __lt__(self, other):
return self.time < other.time
class Limiter:
class UpdateRateParams:
def __init__(self, time):
self.time = time
class UpdateLimitParams:
def __init__(self, time, elapsed):
self.time = time
self.elapsed = elapsed
class CanStartParams:
def __init__(self, time, num_started, count):
self.time = time
self.num_started = num_started
self.count = count
class UpdateBudgetParams:
def __init__(self, time, num_started, num_started_at_priority, min_priority, last_batch, queue_empty, elapsed):
self.time = time
self.num_started = num_started
self.num_started_at_priority = num_started_at_priority
self.min_priority = min_priority
self.last_batch = last_batch
self.queue_empty = queue_empty
self.elapsed = elapsed
def __init__(self, priority, ratekeeper_model, proxy_model):
self.priority = priority
self.ratekeeper_model = ratekeeper_model
self.proxy_model = proxy_model
self.limit = 0
self.rate = self.ratekeeper_model.get_limit(0, self.priority)
def update_rate(self, params):
pass
def update_limit(self, params):
pass
def can_start(self, params):
pass
def update_budget(self, params):
pass
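# Baseline limiter modeling the original GRV algorithm: the budget accrues at
# the ratekeeper rate, any deficit carries over via min(0, limit), and the
# budget is capped at rate * 0.01 and an absolute ceiling of 100000.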
class OriginalLimiter(Limiter):
def __init__(self, priority, limit_rate_model, proxy_model):
Limiter.__init__(self, priority, limit_rate_model, proxy_model)
def update_rate(self, params):
self.rate = self.ratekeeper_model.get_limit(params.time, self.priority)
def update_limit(self, params):
self.limit = min(0, self.limit) + params.elapsed * self.rate
self.limit = min(self.limit, self.rate * 0.01)
self.limit = min(self.limit, 100000)
self.proxy_model.results.rate[self.priority][params.time] = self.rate
self.proxy_model.results.limit[self.priority][params.time] = self.limit
def can_start(self, params):
return params.num_started < self.limit
def update_budget(self, params):
self.limit -= params.num_started
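# Lets unused budget accumulate rather than clamping it each update, up to two
# seconds' worth of releases (2.0 * rate).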
class PositiveBudgetLimiter(OriginalLimiter):
def __init__(self, priority, limit_rate_model, proxy_model):
OriginalLimiter.__init__(self, priority, limit_rate_model, proxy_model)
def update_limit(self, params):
self.limit += params.elapsed * self.rate
self.limit = min(self.limit, 2.0 * self.rate)
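# Like PositiveBudgetLimiter, but the budget can never fall below five
# seconds' worth of debt (-5.0 * rate).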
class ClampedBudgetLimiter(PositiveBudgetLimiter):
def __init__(self, priority, limit_rate_model, proxy_model):
PositiveBudgetLimiter.__init__(self, priority, limit_rate_model, proxy_model)
def update_budget(self, params):
min_budget = -self.rate * 5.0
if self.limit > min_budget:
self.limit = max(self.limit - params.num_started, min_budget)
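# Extends PositiveBudgetLimiter with a lockout: when starts overdraw the
# budget, the carried debt is bounded by the size of the last started batch
# and the overdraft is converted into a locked_until time (at most two seconds
# ahead) before which can_start() refuses new requests.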
class TimeLimiter(PositiveBudgetLimiter):
def __init__(self, priority, limit_rate_model, proxy_model):
PositiveBudgetLimiter.__init__(self, priority, limit_rate_model, proxy_model)
self.locked_until = 0
def can_start(self, params):
return params.time >= self.locked_until and PositiveBudgetLimiter.can_start(self, params)
def update_budget(self, params):
#print('Start update budget: time=%f, limit=%f, locked_until=%f, num_started=%d, priority=%s, min_priority=%s, last_batch=%d' % (params.time, self.limit, self.locked_until, params.num_started, self.priority, params.min_priority, params.last_batch))
if params.min_priority >= self.priority or params.num_started < self.limit:
self.limit -= params.num_started
else:
self.limit = min(self.limit, max(self.limit - params.num_started, -params.last_batch))
self.locked_until = min(params.time + 2.0, max(params.time, self.locked_until) + (params.num_started - self.limit)/self.rate)
#print('End update budget: time=%f, limit=%f, locked_until=%f, num_started=%d, priority=%s, min_priority=%s' % (params.time, self.limit, self.locked_until, params.num_started, self.priority, params.min_priority))
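# Like TimeLimiter, but the budget never goes negative: overdrawing zeroes the
# budget and pushes locked_until forward, and because update_limit() does not
# accrue budget until the lockout expires, no further batches can start in the
# meantime.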
class TimePositiveBudgetLimiter(PositiveBudgetLimiter):
def __init__(self, priority, limit_rate_model, proxy_model):
PositiveBudgetLimiter.__init__(self, priority, limit_rate_model, proxy_model)
self.locked_until = 0
def update_limit(self, params):
if params.time >= self.locked_until:
PositiveBudgetLimiter.update_limit(self, params)
def can_start(self, params):
return params.num_started + params.count <= self.limit
def update_budget(self, params):
#if params.num_started > 0:
#print('Start update budget: time=%f, limit=%f, locked_until=%f, num_started=%d, priority=%s, min_priority=%s, last_batch=%d' % (params.time, self.limit, self.locked_until, params.num_started, self.priority, params.min_priority, params.last_batch))
        if params.num_started > self.limit:
            # Convert the overshoot beyond the limit into lockout time, as in TimeLimiter
            penalty = params.num_started - self.limit
            self.locked_until = min(params.time + 2.0, max(params.time, self.locked_until) + penalty / self.rate)
            self.limit = 0
        else:
            self.limit -= params.num_started
#if params.num_started > 0:
#print('End update budget: time=%f, limit=%f, locked_until=%f, num_started=%d, priority=%s, min_priority=%s' % (params.time, self.limit, self.locked_until, params.num_started, self.priority, params.min_priority))
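# Smooths both the ratekeeper rate and the observed release rate with 2-second
# Smoothers and sets the limit to twice their difference; can_start() admits a
# batch only if the entire batch fits under the limit.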
class SmoothingLimiter(OriginalLimiter):
def __init__(self, priority, limit_rate_model, proxy_model):
OriginalLimiter.__init__(self, priority, limit_rate_model, proxy_model)
self.smooth_released = Smoother(2)
self.smooth_rate_limit = Smoother(2)
self.rate_set = False
def update_rate(self, params):
OriginalLimiter.update_rate(self, params)
if not self.rate_set:
self.rate_set = True
self.smooth_rate_limit.reset(self.rate)
else:
self.smooth_rate_limit.set_total(params.time, self.rate)
def update_limit(self, params):
self.limit = 2.0 * (self.smooth_rate_limit.smooth_total(params.time) - self.smooth_released.smooth_rate(params.time))
def can_start(self, params):
return params.num_started + params.count <= self.limit
def update_budget(self, params):
self.smooth_released.add_delta(params.time, params.num_started)
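# Adds a budget on top of SmoothingLimiter: limit left unused by this priority
# accrues as budget (at half the surplus rate), and the budget is trimmed to
# at most 10 whenever this priority's queue is empty.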
class SmoothingBudgetLimiter(SmoothingLimiter):
def __init__(self, priority, limit_rate_model, proxy_model):
SmoothingLimiter.__init__(self, priority, limit_rate_model, proxy_model)
#self.smooth_filled = Smoother(2)
self.budget = 0
def update_limit(self, params):
release_rate = (self.smooth_rate_limit.smooth_total(params.time) - self.smooth_released.smooth_rate(params.time))
#self.smooth_filled.set_total(params.time, 1 if release_rate > 0 else 0)
self.limit = 2.0 * release_rate
self.proxy_model.results.rate[self.priority][params.time] = self.smooth_rate_limit.smooth_total(params.time)
self.proxy_model.results.released[self.priority][params.time] = self.smooth_released.smooth_rate(params.time)
self.proxy_model.results.limit[self.priority][params.time] = self.limit
self.proxy_model.results.limit_and_budget[self.priority][params.time] = self.limit + self.budget
self.proxy_model.results.budget[self.priority][params.time] = self.budget
#self.budget = max(0, self.budget + params.elapsed * self.smooth_rate_limit.smooth_total(params.time))
#if self.smooth_filled.smooth_total(params.time) >= 0.1:
#self.budget += params.elapsed * self.smooth_rate_limit.smooth_total(params.time)
#print('Update limit: time=%f, priority=%s, limit=%f, rate=%f, released=%f, budget=%f' % (params.time, self.priority, self.limit, self.smooth_rate_limit.smooth_total(params.time), self.smooth_released.smooth_rate(params.time), self.budget))
def can_start(self, params):
return params.num_started + params.count <= self.limit + self.budget #or params.num_started + params.count <= self.budget
def update_budget(self, params):
self.budget = max(0, self.budget + (self.limit - params.num_started_at_priority) / 2 * params.elapsed)
if params.queue_empty:
self.budget = min(10, self.budget)
self.smooth_released.add_delta(params.time, params.num_started_at_priority)
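# Discrete-event simulation of a GRV proxy: request arrivals, 10ms rate
# updates, and 1ms request-processing passes are tasks on a time-ordered heap,
# and per-priority results (starts, queueing, latencies, rates) are recorded
# for the Plotter.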
class ProxyModel:
class Results:
def __init__(self, priorities, duration):
self.started = self.init_result(priorities, 0, duration)
self.queued = self.init_result(priorities, 0, duration)
self.latencies = self.init_result(priorities, [], duration)
self.unprocessed_queue_sizes = self.init_result(priorities, [], duration)
self.rate = {p:{} for p in priorities}
self.released = {p:{} for p in priorities}
self.limit = {p:{} for p in priorities}
self.limit_and_budget = {p:{} for p in priorities}
self.budget = {p:{} for p in priorities}
def init_result(self, priorities, starting_value, duration):
return {p: {s: copy.copy(starting_value) for s in range(0, duration)} for p in priorities}
def __init__(self, duration, ratekeeper_model, workload_model, Limiter):
self.time = 0
self.log_time = 0
self.duration = duration
self.priority_limiters = { priority: Limiter(priority, ratekeeper_model, self) for priority in workload_model.priorities() }
self.workload_model = workload_model
self.request_scheduled = { p: False for p in self.workload_model.priorities()}
self.tasks = []
self.request_queue = []
self.results = ProxyModel.Results(self.workload_model.priorities(), duration)
def run(self):
self.update_rate()
self.process_requests(self.time)
for priority in self.workload_model.priorities():
next_request = self.workload_model.next_request(self.time, priority)
assert next_request is not None
heapq.heappush(self.tasks, Task(next_request.time, lambda next_request=next_request: self.receive_request(next_request)))
self.request_scheduled[priority] = True
        while True:
if int(self.time) > self.log_time:
self.log_time = int(self.time)
#print(self.log_time)
task = heapq.heappop(self.tasks)
self.time = task.time
if self.time >= self.duration:
break
task.fxn()
def update_rate(self):
for limiter in self.priority_limiters.values():
limiter.update_rate(Limiter.UpdateRateParams(self.time))
heapq.heappush(self.tasks, Task(self.time + 0.01, lambda: self.update_rate()))
def receive_request(self, request):
heapq.heappush(self.request_queue, request)
self.results.queued[request.priority][int(self.time)] += request.count
next_request = self.workload_model.next_request(self.time, request.priority)
if next_request is not None and next_request.time < self.duration:
heapq.heappush(self.tasks, Task(next_request.time, lambda: self.receive_request(next_request)))
else:
self.request_scheduled[request.priority] = False
def process_requests(self, last_time):
elapsed = self.time - last_time
for limiter in self.priority_limiters.values():
limiter.update_limit(Limiter.UpdateLimitParams(self.time, elapsed))
current_started = 0
started = {p:0 for p in self.workload_model.priorities()}
min_priority = Priority.SYSTEM
last_batch = 0
while len(self.request_queue) > 0:
request = self.request_queue[0]
if not self.priority_limiters[request.priority].can_start(Limiter.CanStartParams(self.time, current_started, request.count)):
break
min_priority = request.priority
last_batch = request.count
if self.workload_model.request_completed(request) and not self.request_scheduled[request.priority]:
next_request = self.workload_model.next_request(self.time, request.priority)
assert next_request is not None
heapq.heappush(self.tasks, Task(next_request.time, lambda next_request=next_request: self.receive_request(next_request)))
self.request_scheduled[request.priority] = True
current_started += request.count
started[request.priority] += request.count
heapq.heappop(self.request_queue)
self.results.started[request.priority][int(self.time)] += request.count
self.results.latencies[request.priority][int(self.time)].append(self.time-request.time)
if len(self.request_queue) == 0:
min_priority = Priority.BATCH
for priority, limiter in self.priority_limiters.items():
started_at_priority = sum([v for p,v in started.items() if p <= priority])
limiter.update_budget(Limiter.UpdateBudgetParams(self.time, current_started, started_at_priority, min_priority, last_batch, len(self.request_queue) == 0 or self.request_queue[0].priority > priority, elapsed))
for priority in self.workload_model.priorities():
self.results.unprocessed_queue_sizes[priority][int(self.time)].append(self.workload_model.workload_models[priority].outstanding)
current_time = self.time
delay = 0.001
heapq.heappush(self.tasks, Task(self.time + delay, lambda: self.process_requests(current_time)))

contrib/grv_proxy_model/rate_model.py

@@ -0,0 +1,83 @@
#
# rate_model.py
#
# This source file is part of the FoundationDB open source project
#
# Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
class RateModel:
def __init__(self):
pass
def get_rate(self, time):
pass
class FixedRateModel(RateModel):
def __init__(self, rate):
RateModel.__init__(self)
self.rate = rate
def get_rate(self, time):
return self.rate
class UnlimitedRateModel(FixedRateModel):
    def __init__(self):
        FixedRateModel.__init__(self, 1e9)
class IntervalRateModel(RateModel):
def __init__(self, intervals):
self.intervals = sorted(intervals)
def get_rate(self, time):
if len(self.intervals) == 0 or time < self.intervals[0][0]:
return 0
target_interval = len(self.intervals)-1
for i in range(1, len(self.intervals)):
if time < self.intervals[i][0]:
target_interval = i-1
break
self.intervals = self.intervals[target_interval:]
return self.intervals[0][1]
class SawtoothRateModel(RateModel):
def __init__(self, low, high, frequency):
self.low = low
self.high = high
self.frequency = frequency
def get_rate(self, time):
if int(2*time/self.frequency) % 2 == 0:
return self.low
else:
return self.high
class DistributionRateModel(RateModel):
def __init__(self, distribution, frequency):
self.distribution = distribution
self.frequency = frequency
self.last_change = 0
self.rate = None
def get_rate(self, time):
if self.frequency == 0 or int((time - self.last_change) / self.frequency) > int(self.last_change / self.frequency) or self.rate is None:
self.last_change = time
self.rate = self.distribution()
return self.rate

contrib/grv_proxy_model/ratekeeper_model.py

@@ -0,0 +1,67 @@
#
# ratekeeper_model.py
#
# This source file is part of the FoundationDB open source project
#
# Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import numpy
import rate_model
from priority import Priority
class RatekeeperModel:
def __init__(self, limit_models):
self.limit_models = limit_models
def get_limit(self, time, priority):
return self.limit_models[priority].get_rate(time)
predefined_ratekeeper = {}
predefined_ratekeeper['default200_batch100'] = RatekeeperModel(
{
Priority.SYSTEM: rate_model.UnlimitedRateModel(),
Priority.DEFAULT: rate_model.FixedRateModel(200),
Priority.BATCH: rate_model.FixedRateModel(100)
})
predefined_ratekeeper['default_sawtooth'] = RatekeeperModel(
{
Priority.SYSTEM: rate_model.UnlimitedRateModel(),
Priority.DEFAULT: rate_model.SawtoothRateModel(10, 200, 1),
Priority.BATCH: rate_model.FixedRateModel(0)
})
predefined_ratekeeper['default_uniform_random'] = RatekeeperModel(
{
Priority.SYSTEM: rate_model.UnlimitedRateModel(),
Priority.DEFAULT: rate_model.DistributionRateModel(lambda: numpy.random.uniform(10, 200), 1),
Priority.BATCH: rate_model.FixedRateModel(0)
})
predefined_ratekeeper['default_trickle'] = RatekeeperModel(
{
Priority.SYSTEM: rate_model.UnlimitedRateModel(),
Priority.DEFAULT: rate_model.FixedRateModel(3),
Priority.BATCH: rate_model.FixedRateModel(0)
})
predefined_ratekeeper['default1000'] = RatekeeperModel(
{
Priority.SYSTEM: rate_model.UnlimitedRateModel(),
Priority.DEFAULT: rate_model.FixedRateModel(1000),
Priority.BATCH: rate_model.FixedRateModel(500)
})

contrib/grv_proxy_model/smoother.py

@@ -0,0 +1,53 @@
#
# smoother.py
#
# This source file is part of the FoundationDB open source project
#
# Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import math
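# Exponentially-smoothed estimate of a running total: on each update the
# estimate moves toward the true total by a factor 1 - exp(-elapsed/folding_time),
# so it converges with time constant folding_time. smooth_rate() reports the
# implied rate of change, (total - estimate) / folding_time.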
class Smoother:
def __init__(self, folding_time):
self.folding_time = folding_time
self.reset(0)
def reset(self, value):
self.time = 0
self.total = value
self.estimate = value
def set_total(self, time, total):
self.add_delta(time, total-self.total)
def add_delta(self, time, delta):
self.update(time)
self.total += delta
def smooth_total(self, time):
self.update(time)
return self.estimate
def smooth_rate(self, time):
self.update(time)
return (self.total-self.estimate) / self.folding_time
def update(self, time):
elapsed = time - self.time
if elapsed > 0:
self.time = time
self.estimate += (self.total-self.estimate) * (1-math.exp(-elapsed/self.folding_time))

contrib/grv_proxy_model/workload_model.py

@@ -0,0 +1,201 @@
#
# workload_model.py
#
# This source file is part of the FoundationDB open source project
#
# Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import functools
import numpy
import math
import rate_model
from priority import Priority
@functools.total_ordering
class Request:
def __init__(self, time, count, priority):
self.time = time
self.count = count
self.priority = priority
def __lt__(self, other):
return self.priority < other.priority
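# Generates requests at a single priority: batch sizes come from batch_model,
# inter-arrival intervals from generator at the current rate_model rate, and
# generation pauses while max_outstanding requests are in flight.
# request_completed() returns True when the model drops back below capacity,
# signaling the proxy to resume scheduling arrivals.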
class PriorityWorkloadModel:
def __init__(self, priority, rate_model, batch_model, generator, max_outstanding=1e9):
self.priority = priority
self.rate_model = rate_model
self.batch_model = batch_model
self.generator = generator
self.max_outstanding = max_outstanding
self.outstanding = 0
def next_request(self, time):
if self.outstanding >= self.max_outstanding:
return None
batch_size = self.batch_model.next_batch()
self.outstanding += batch_size
interval = self.generator.next_request_interval(self.rate_model.get_rate(time))
return Request(time + interval, batch_size, self.priority)
def request_completed(self, request):
was_full = self.max_outstanding <= self.outstanding
self.outstanding -= request.count
return was_full and self.outstanding < self.max_outstanding
class WorkloadModel:
def __init__(self, workload_models):
self.workload_models = workload_models
def priorities(self):
return list(self.workload_models.keys())
def next_request(self, time, priority):
return self.workload_models[priority].next_request(time)
def request_completed(self, request):
return self.workload_models[request.priority].request_completed(request)
class Distribution:
EXPONENTIAL = lambda x: numpy.random.exponential(x)
UNIFORM = lambda x: numpy.random.uniform(0, 2.0*x)
FIXED = lambda x: x
class BatchGenerator:
def __init__(self):
pass
def next_batch(self):
pass
class DistributionBatchGenerator(BatchGenerator):
def __init__(self, distribution, size):
BatchGenerator.__init__(self)
self.distribution = distribution
self.size = size
def next_batch(self):
return math.ceil(self.distribution(self.size))
class RequestGenerator:
def __init__(self):
pass
def next_request_interval(self, rate):
pass
class DistributionRequestGenerator(RequestGenerator):
def __init__(self, distribution):
RequestGenerator.__init__(self)
self.distribution = distribution
def next_request_interval(self, rate):
if rate == 0:
return 1e9
return self.distribution(1.0/rate)
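# Note: with Distribution.EXPONENTIAL the inter-arrival intervals have mean
# 1/rate, so arrivals form a Poisson process at the requested rate; a rate of
# zero is modeled as an effectively infinite (1e9 second) interval.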
predefined_workloads = {}
predefined_workloads['slow_exponential'] = WorkloadModel(
{
Priority.DEFAULT: PriorityWorkloadModel(Priority.DEFAULT,
rate_model.FixedRateModel(100),
DistributionBatchGenerator(Distribution.FIXED, 1),
DistributionRequestGenerator(Distribution.EXPONENTIAL),
max_outstanding=100
)
})
predefined_workloads['fixed_uniform'] = WorkloadModel(
{
Priority.SYSTEM: PriorityWorkloadModel(Priority.SYSTEM,
rate_model.FixedRateModel(0),
DistributionBatchGenerator(Distribution.FIXED, 1),
DistributionRequestGenerator(Distribution.UNIFORM),
max_outstanding=10
),
Priority.DEFAULT: PriorityWorkloadModel(Priority.DEFAULT,
rate_model.FixedRateModel(95),
DistributionBatchGenerator(Distribution.FIXED, 10),
DistributionRequestGenerator(Distribution.UNIFORM),
max_outstanding=200
),
Priority.BATCH: PriorityWorkloadModel(Priority.BATCH,
rate_model.FixedRateModel(1),
DistributionBatchGenerator(Distribution.UNIFORM, 500),
DistributionRequestGenerator(Distribution.UNIFORM),
max_outstanding=200
)
})
predefined_workloads['batch_starvation'] = WorkloadModel(
{
Priority.SYSTEM: PriorityWorkloadModel(Priority.SYSTEM,
rate_model.FixedRateModel(1),
DistributionBatchGenerator(Distribution.FIXED, 1),
DistributionRequestGenerator(Distribution.UNIFORM),
max_outstanding=10
),
Priority.DEFAULT: PriorityWorkloadModel(Priority.DEFAULT,
rate_model.IntervalRateModel([(0,50), (60,150), (120,90)]),
DistributionBatchGenerator(Distribution.FIXED, 1),
DistributionRequestGenerator(Distribution.UNIFORM),
max_outstanding=200
),
Priority.BATCH: PriorityWorkloadModel(Priority.BATCH,
rate_model.FixedRateModel(100),
DistributionBatchGenerator(Distribution.FIXED, 1),
DistributionRequestGenerator(Distribution.UNIFORM),
max_outstanding=200
)
})
predefined_workloads['default_low_high_low'] = WorkloadModel(
{
Priority.SYSTEM: PriorityWorkloadModel(Priority.SYSTEM,
rate_model.FixedRateModel(0),
DistributionBatchGenerator(Distribution.FIXED, 1),
DistributionRequestGenerator(Distribution.UNIFORM),
max_outstanding=10
),
Priority.DEFAULT: PriorityWorkloadModel(Priority.DEFAULT,
rate_model.IntervalRateModel([(0,100), (60,300), (120,100)]),
DistributionBatchGenerator(Distribution.FIXED, 1),
DistributionRequestGenerator(Distribution.UNIFORM),
max_outstanding=200
),
Priority.BATCH: PriorityWorkloadModel(Priority.BATCH,
rate_model.FixedRateModel(0),
DistributionBatchGenerator(Distribution.FIXED, 1),
DistributionRequestGenerator(Distribution.UNIFORM),
max_outstanding=200
)
})
for rate in [83, 100, 180, 190, 200]:
predefined_workloads['default%d' % rate] = WorkloadModel(
{
Priority.DEFAULT: PriorityWorkloadModel(Priority.DEFAULT,
rate_model.FixedRateModel(rate),
DistributionBatchGenerator(Distribution.FIXED, 1),
DistributionRequestGenerator(Distribution.EXPONENTIAL),
max_outstanding=1000
)
})