Merge pull request #7055 from sfc-gh-tclinkenbeard/remove-tpcc

Remove TPC-C workloads
This commit is contained in:
Markus Pilman 2022-06-30 09:49:09 -06:00 committed by GitHub
commit 3d529fb145
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 0 additions and 1697 deletions

View File

@ -1,321 +0,0 @@
/*
* TPCCWorkload.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FDBSERVER_TPCCWORKLOAD_H
#define FDBSERVER_TPCCWORKLOAD_H
#pragma once
#include "flow/Arena.h"
#include "fdbclient/FDBTypes.h"
#include <boost/preprocessor.hpp>
#include <iomanip>
namespace TPCCWorkload {
// Schema
#define EXPAND(...) __VA_ARGS__
#define EMPTY()
#define DEFER(x) x EMPTY()
// An indirection macro to avoid direct recursion
#define BOOST_PP_SEQ_FOR_EACH_ID() BOOST_PP_SEQ_FOR_EACH generators
#define ROW_CONCAT(prefix, name) prefix##name
#define ROW_TO_STRING(str) #str
#define ROW_ELEMENT_NAME(prefix, element) ROW_CONCAT(prefix, element)
#define ROW_MEMBER(r, data, elem) \
BOOST_PP_TUPLE_ELEM(0, elem) \
ROW_ELEMENT_NAME(data, BOOST_PP_TUPLE_ELEM(1, elem));
#define ROW_MEMBERS_SEQ(prefix, seq) BOOST_PP_SEQ_FOR_EACH(ROW_MEMBER, prefix, seq)
#define ROW_MEMBERS(prefix, tuple) ROW_MEMBERS_SEQ(prefix, BOOST_PP_TUPLE_TO_SEQ(tuple))
#define ROW_SERIALIZE_ELEMENT(r, data, elem) , ROW_ELEMENT_NAME(data, BOOST_PP_TUPLE_ELEM(1, elem))
#define ROW_SERIALIZE_ELEMENTS(prefix, seq) BOOST_PP_SEQ_FOR_EACH(ROW_SERIALIZE_ELEMENT, prefix, seq)
#define ROW_SERIALIZE(prefix, tuple) ar ROW_SERIALIZE_ELEMENTS(prefix, BOOST_PP_TUPLE_TO_SEQ(tuple))
#define ROW_KEY_STEP(r, data, elem) , ROW_ELEMENT_NAME(data, elem)
#define ROW_KEY_LIST_SEQ_EXP(prefix, seq) BOOST_PP_SEQ_FOR_EACH(ROW_KEY_STEP, prefix, seq)
#define ROW_KEY_LIST_SEQ(prefix, seq) ROW_KEY_LIST_SEQ_EXP(prefix, seq)
#define ROW_KEY_LIST(prefix, a) ROW_KEY_LIST_SEQ(prefix, BOOST_PP_ARRAY_TO_SEQ(a))
#define ROW_KEY_LIST_TUPLE(prefix, tuple) ROW_KEY_LIST_SEQ(prefix, BOOST_PP_TUPLE_TO_SEQ(tuple))
#define ROW_KEY_HAS_KEY(Name, prefix, primary_key) \
static constexpr bool HAS_KEY = true; \
StringRef key() { \
auto s = generateKey(#Name, KEY_SIZE ROW_KEY_LIST(prefix, primary_key)); \
return StringRef(arena, s); \
} \
KeyRangeRef keyRange(int dontInclude) { \
auto s = generateKey(#Name, KEY_SIZE - dontInclude ROW_KEY_LIST(prefix, primary_key)); \
KeyRef begin = StringRef(arena, reinterpret_cast<const uint8_t*>(s.c_str()), s.size() + 1); \
KeyRef end = StringRef(arena, reinterpret_cast<const uint8_t*>(s.c_str()), s.size() + 1); \
auto sBegin = mutateString(begin); \
sBegin[s.size()] = uint8_t('/'); \
auto sEnd = mutateString(end); \
sEnd[s.size()] = uint8_t('0'); \
return KeyRangeRef(begin, end); \
}
#define ROW_KEY_NO_KEY static constexpr bool HAS_KEY = false;
#define ROW_KEY_IMPL(Name, prefix, primary_key, sz) \
BOOST_PP_IF(sz, ROW_KEY_HAS_KEY(Name, prefix, primary_key), ROW_KEY_NO_KEY)
#define ROW_KEY(Name, prefix, primary_key) ROW_KEY_IMPL(Name, prefix, primary_key, BOOST_PP_ARRAY_SIZE(primary_key))
#define ROW_INDEX_NAME_KEY(name) ROW_CONCAT(name, Key)
#define ROW_INDEX_NAME_IMPL2(name) ROW_TO_STRING(name)
#define ROW_INDEX_NAME_IMPL(indexName, name) ROW_INDEX_NAME_IMPL2(ROW_CONCAT(indexName, name))
#define ROW_INDEX_NAME(nameTuple, index) \
ROW_INDEX_NAME_IMPL(BOOST_PP_TUPLE_ELEM(0, index), BOOST_PP_TUPLE_ELEM(0, nameTuple))
#define ROW_GENERATE_INDEX(r, data, index) \
StringRef ROW_INDEX_NAME_KEY(BOOST_PP_TUPLE_ELEM(0, index))(int dontInclude = 0) { \
auto s = generateKey(ROW_INDEX_NAME(data, index), \
BOOST_PP_TUPLE_SIZE(index) - dontInclude - \
1 ROW_KEY_LIST_TUPLE(BOOST_PP_TUPLE_ELEM(1, data), BOOST_PP_TUPLE_POP_FRONT(index))); \
return StringRef(arena, s); \
}
#define ROW_GENERATE_INDEXES_LIST(Name, prefix, indexes) \
BOOST_PP_LIST_FOR_EACH(ROW_GENERATE_INDEX, (Name, prefix), indexes)
#define ROW_GENERATE_INDEXES(Name, prefix, indexes) \
ROW_GENERATE_INDEXES_LIST(Name, prefix, BOOST_PP_ARRAY_TO_LIST(indexes))
#define ROW_INDEXES(Name, prefix, indexes) \
BOOST_PP_IF(BOOST_PP_ARRAY_SIZE(indexes), ROW_GENERATE_INDEXES(Name, prefix, indexes), BOOST_PP_EMPTY())
#define ROW(Name, prefix, tuple, primary_key, indexes) \
struct Name { \
constexpr static FileIdentifier file_identifier = __COUNTER__; \
Arena arena; \
ROW_MEMBERS(prefix, tuple) \
template <class Ar> \
void serialize(Ar& ar) { \
serializer(ROW_SERIALIZE(prefix, tuple)); \
} \
static constexpr int KEY_SIZE = BOOST_PP_ARRAY_SIZE(primary_key); \
ROW_KEY(Name, prefix, primary_key) \
ROW_INDEXES(Name, prefix, indexes) \
}
template <class Value>
struct KeyStreamer {
void operator()(std::stringstream& ss, const Value& v) { ss << v; }
};
template <>
struct KeyStreamer<StringRef> {
void operator()(std::stringstream& ss, const StringRef& v) { ss << v.toString(); }
};
template <>
struct KeyStreamer<int> {
void operator()(std::stringstream& ss, const int v) { ss << std::setfill('0') << std::setw(6) << v; }
};
template <>
struct KeyStreamer<short> {
void operator()(std::stringstream& ss, const int v) { ss << std::setfill('0') << std::setw(6) << v; }
};
template <class... Values>
struct KeyGenerator;
template <class Head, class... Tail>
struct KeyGenerator<Head, Tail...> {
static void generate(std::stringstream& ss, int max, Head h, Tail... tail) {
KeyStreamer<Head> streamer;
if (max > 0) {
ss << '/';
streamer(ss, h);
KeyGenerator<Tail...>::generate(ss, max - 1, tail...);
}
}
};
template <>
struct KeyGenerator<> {
static void generate(std::stringstream&, int) {}
};
template <class... Values>
std::string generateKey(const std::string& table, int max, Values... values) {
std::stringstream ss;
ss << table;
if (max > 0) {
KeyGenerator<Values...>::generate(ss, max, values...);
}
return ss.str();
}
ROW(Warehouse,
w_,
((int, id),
(StringRef, name),
(StringRef, street_1),
(StringRef, street_2),
(StringRef, city),
(StringRef, state),
(StringRef, zip),
(double, tax),
(double, ytd)),
(1, (id)),
(0, ()));
ROW(District,
d_,
((int, id),
(int, w_id),
(StringRef, name),
(StringRef, street_1),
(StringRef, street_2),
(StringRef, city),
(StringRef, state),
(StringRef, zip),
(double, tax),
(double, ytd),
(int, next_o_id)),
(2, (w_id, id)),
(0, ()));
ROW(Customer,
c_,
((int, id),
(int, d_id),
(int, w_id),
(StringRef, first),
(StringRef, last),
(StringRef, middle),
(StringRef, street_1),
(StringRef, street_2),
(StringRef, city),
(StringRef, state),
(StringRef, zip),
(StringRef, phone),
(double, since),
(StringRef, credit),
(double, credit_lim),
(double, discount),
(double, balance),
(double, ytd_payment),
(unsigned, payment_cnt),
(unsigned, delivery_count),
(StringRef, data)),
(3, (w_id, d_id, id)),
(1, ((indexLast, w_id, d_id, last, id))));
ROW(History,
h_,
((int, c_id),
(int, c_d_id),
(int, c_w_id),
(int, d_id),
(int, w_id),
(double, date),
(double, amount),
(StringRef, data)),
(0, ()),
(0, ()));
ROW(NewOrder, no_, ((int, o_id), (int, d_id), (int, w_id)), (3, (w_id, d_id, o_id)), (0, ()));
ROW(Order,
o_,
((int, id),
(int, d_id),
(int, w_id),
(int, c_id),
(double, entry_d),
(Optional<short>, carrier_id),
(short, ol_cnt),
(bool, all_local)),
(3, (w_id, d_id, id)),
(0, ()));
ROW(OrderLine,
ol_,
((int, o_id),
(int, d_id),
(int, w_id),
(short, number),
(int, i_id),
(int, supply_w_id),
(Optional<double>, delivery_d),
(short, quantity),
(double, amount),
(StringRef, dist_info)),
(4, (w_id, d_id, o_id, number)),
(0, ()));
ROW(Item, i_, ((int, id), (int, im_id), (StringRef, name), (double, price), (StringRef, data)), (1, (id)), (0, ()));
ROW(Stock,
s_,
((int, i_id),
(int, w_id),
(short, quantity),
(StringRef, dist_01),
(StringRef, dist_02),
(StringRef, dist_03),
(StringRef, dist_04),
(StringRef, dist_05),
(StringRef, dist_06),
(StringRef, dist_07),
(StringRef, dist_08),
(StringRef, dist_09),
(StringRef, dist_10),
(int, ytd),
(short, order_cnt),
(short, remote_cnt),
(StringRef, data)),
(2, (w_id, i_id)),
(0, ()));
#undef FLOW_ACOMPILER_STATE
#define FLOW_ACOMPILER_STATE 1
struct GlobalState {
constexpr static FileIdentifier file_identifier = 1064821;
int CLoad, CRun, CDelta, CId, COlIID;
GlobalState() {
CLoad = deterministicRandom()->randomInt(0, 256);
while (true) {
CDelta = deterministicRandom()->randomInt(65, 120);
if (!(CDelta == 96 || CDelta == 112)) {
break;
}
}
if (CDelta > CLoad) {
CRun = CLoad + CDelta;
} else {
CRun = deterministicRandom()->coinflip() ? CLoad + CDelta : CLoad - CDelta;
}
CId = deterministicRandom()->randomInt(1, 3001);
COlIID = deterministicRandom()->randomInt(1, 100001);
}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, CLoad, CRun, CDelta, CId, COlIID);
}
StringRef key() const { return LiteralStringRef("GlobalState"); }
};
const std::vector<std::string> syllables = {
"BAR", "UGHT", "ABLE", "RI", "PRES", "SE", "ANTI", "ALLY", "ATION", "ING",
};
} // namespace TPCCWorkload
#endif

View File

@ -1,521 +0,0 @@
/*
* TPCC.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "flow/Arena.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbserver/QuietDatabase.h"
#include "fdbserver/workloads/TPCCWorkload.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbclient/ReadYourWrites.h"
#include "flow/actorcompiler.h" // needs to be last include
#undef FLOW_ACOMPILER_STATE
#define FLOW_ACOMPILER_STATE 1
using namespace TPCCWorkload;
namespace {
constexpr char alphaNumerics[] = { 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p',
'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D', 'E', 'F',
'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V',
'W', 'X', 'Y', 'Z', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0' };
constexpr char numerics[] = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9' };
constexpr const char* originalString = "ORIGINAL";
struct PopulateTPCC : TestWorkload {
static constexpr const char* DESCRIPTION = "PopulateTPCC";
int actorsPerClient;
int warehousesPerActor;
int clientsUsed;
GlobalState gState;
PopulateTPCC(WorkloadContext const& ctx) : TestWorkload(ctx) {
std::string workloadName = DESCRIPTION;
actorsPerClient = getOption(options, LiteralStringRef("actorsPerClient"), 10);
warehousesPerActor = getOption(options, LiteralStringRef("warehousesPerActor"), 30);
clientsUsed = getOption(options, LiteralStringRef("clientsUsed"), 2);
}
int NURand(int C, int A, int x, int y) {
return (((deterministicRandom()->randomInt(0, A + 1) | deterministicRandom()->randomInt(x, y + 1)) + C) %
(y - x + 1)) +
x;
}
StringRef aString(Arena& arena, int x, int y) {
int length = deterministicRandom()->randomInt(x, y + 1);
char* res = new (arena) char[length];
for (int i = 0; i < length; ++i) {
res[i] = alphaNumerics[deterministicRandom()->randomInt(0, sizeof(alphaNumerics))];
}
return StringRef(reinterpret_cast<uint8_t*>(res), length);
}
StringRef nString(Arena& arena, int x, int y) {
int length = deterministicRandom()->randomInt(x, y + 1);
char* res = new (arena) char[length];
for (int i = 0; i < length; ++i) {
res[i] = numerics[deterministicRandom()->randomInt(0, sizeof(numerics))];
}
return StringRef(reinterpret_cast<uint8_t*>(res), length);
}
StringRef genCLast(Arena& arena, int x) {
int l = x % 10;
x /= 10;
int m = x % 10;
x /= 10;
int f = x % 10;
std::stringstream ss;
ss << syllables[f] << syllables[m] << syllables[l];
return StringRef(arena, ss.str());
}
StringRef rndZip(Arena& arena) {
char* result = new (arena) char[9];
for (int i = 0; i < 4; ++i) {
result[i] = numerics[deterministicRandom()->randomInt(0, sizeof(numerics))];
}
for (int i = 4; i < 9; ++i) {
result[i] = '1';
}
return StringRef(reinterpret_cast<uint8_t*>(result), 9);
}
StringRef dataString(Arena& arena) {
if (deterministicRandom()->random01() < 0.1) {
auto str = aString(arena, 26, 51 - strlen(originalString));
char* r = new (arena) char[str.size() + strlen(originalString)];
int pos = deterministicRandom()->randomInt(0, str.size());
std::copy(originalString, originalString + strlen(originalString), r + pos);
auto res = reinterpret_cast<uint8_t*>(r);
std::copy(str.begin(), str.begin() + pos, res);
std::copy(str.begin() + pos, str.end(), res + pos + strlen(originalString));
return StringRef(res, str.size() + strlen(originalString));
} else {
return aString(arena, 26, 51);
}
}
ACTOR static Future<Void> writeGlobalState(PopulateTPCC* self, Database cx) {
state ReadYourWritesTransaction tr(cx);
loop {
tr.reset();
try {
BinaryWriter writer(IncludeVersion());
serializer(writer, self->gState);
tr.set(self->gState.key(), writer.toValue());
wait(tr.commit());
return Void();
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
ACTOR static Future<Void> readGlobalState(PopulateTPCC* self, Database cx) {
state ReadYourWritesTransaction tr(cx);
loop {
tr.reset();
try {
Optional<Value> val = wait(tr.get(self->gState.key()));
if (val.present()) {
BinaryReader reader(val.get(), IncludeVersion());
serializer(reader, self->gState);
} else {
wait(delay(1.0));
}
return Void();
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
std::string description() const override { return DESCRIPTION; }
ACTOR static Future<Void> populateItems(PopulateTPCC* self, Database cx) {
state Transaction tr(cx);
state int itemStart = 0;
state int i_id;
for (; itemStart < 100000; itemStart += 100) {
TraceEvent("PopulateItems").detail("Status", itemStart);
loop {
try {
tr.reset();
for (i_id = itemStart; i_id < itemStart + 100; ++i_id) {
Item item;
item.i_id = i_id;
item.i_im_id = deterministicRandom()->randomInt(1, 10001);
item.i_name = self->aString(item.arena, 14, 25);
item.i_price = deterministicRandom()->randomInt64(1.0, 100.0);
item.i_data = self->dataString(item.arena);
BinaryWriter w(IncludeVersion());
serializer(w, item);
tr.set(item.key(), w.toValue(), AddConflictRange::False);
}
wait(tr.commit());
break;
} catch (Error& e) {
TraceEvent("PopulateItemsHandleError").error(e);
wait(tr.onError(e));
}
}
}
TraceEvent("PopulateItemsDone").log();
return Void();
}
ACTOR static Future<Void> populateCustomers(PopulateTPCC* self, Database cx, int w_id, int d_id) {
state Transaction tr(cx);
state int cStart;
state int c_id;
for (cStart = 0; cStart < 3000; cStart += 100) {
TraceEvent("PopulateCustomers")
.detail("Warehouse", w_id)
.detail("District", d_id)
.detail("Customer", cStart);
loop {
for (c_id = cStart; c_id < cStart + 100; ++c_id) {
Customer c;
History h;
c.c_id = c_id;
c.c_d_id = d_id;
c.c_w_id = w_id;
if (c_id < 1000) {
c.c_last = self->genCLast(c.arena, c_id);
} else {
c.c_last = self->genCLast(c.arena, self->NURand(self->gState.CLoad, 255, 0, 999));
}
c.c_middle = LiteralStringRef("OE");
c.c_first = self->aString(c.arena, 8, 16);
c.c_street_1 = self->aString(c.arena, 10, 20);
c.c_street_2 = self->aString(c.arena, 10, 20);
c.c_city = self->aString(c.arena, 10, 20);
c.c_state = self->aString(c.arena, 2, 2);
c.c_zip = self->rndZip(c.arena);
c.c_phone = self->nString(c.arena, 16, 16);
c.c_since = g_network->now();
if (deterministicRandom()->random01() < 0.1) {
c.c_credit = LiteralStringRef("BC");
} else {
c.c_credit = LiteralStringRef("GC");
}
c.c_credit_lim = 50000;
c.c_discount = deterministicRandom()->random01() / 2.0;
c.c_balance = -10.0;
c.c_ytd_payment = 10.0;
c.c_payment_cnt = 1;
c.c_delivery_count = 0;
c.c_data = self->aString(c.arena, 300, 500);
h.h_c_id = c_id;
h.h_c_d_id = d_id;
h.h_d_id = d_id;
h.h_w_id = w_id;
h.h_c_w_id = w_id;
h.h_date = g_network->now();
h.h_amount = 10.0;
h.h_data = self->aString(c.arena, 12, 24);
{
BinaryWriter w(IncludeVersion());
serializer(w, c);
tr.set(c.key(), w.toValue(), AddConflictRange::False);
}
{
// Write index
tr.set(c.indexLastKey(), c.key(), AddConflictRange::False);
}
{
BinaryWriter w(IncludeVersion());
serializer(w, h);
UID k = deterministicRandom()->randomUniqueID();
BinaryWriter kW(Unversioned());
serializer(kW, k);
auto key = kW.toValue().withPrefix(LiteralStringRef("History/"));
tr.set(key, w.toValue(), AddConflictRange::False);
}
}
try {
wait(tr.commit());
break;
} catch (Error& e) {
TraceEvent("PopulateCustomerHandleError").error(e);
wait(tr.onError(e));
}
}
}
TraceEvent("PopulateCustomersDone").detail("Warehouse", w_id).detail("District", d_id);
return Void();
}
ACTOR static Future<Void> populateOrders(PopulateTPCC* self, Database cx, int w_id, int d_id) {
state Transaction tr(cx);
state std::vector<int> customerIds;
state int idStart;
state int o_id;
customerIds.reserve(3000);
for (int i = 0; i < 3000; ++i) {
customerIds.push_back(i);
}
deterministicRandom()->randomShuffle(customerIds);
for (idStart = 0; idStart < 3000; idStart += 100) {
TraceEvent("PopulateOrders").detail("Warehouse", w_id).detail("District", d_id).detail("Order", idStart);
loop {
tr.reset();
for (o_id = idStart; o_id < idStart + 100; ++o_id) {
Order o;
o.o_id = o_id;
o.o_c_id = customerIds[o_id];
o.o_d_id = d_id;
o.o_w_id = w_id;
o.o_entry_d = g_network->now();
if (o_id < 2100) {
o.o_carrier_id = deterministicRandom()->randomInt(1, 11);
}
o.o_ol_cnt = deterministicRandom()->randomInt(5, 16);
o.o_all_local = true;
for (int ol_number = 0; ol_number < o.o_ol_cnt; ++ol_number) {
OrderLine ol;
ol.ol_o_id = o_id;
ol.ol_d_id = d_id;
ol.ol_w_id = w_id;
ol.ol_number = ol_number;
ol.ol_i_id = deterministicRandom()->randomInt(0, 100000);
ol.ol_supply_w_id = w_id;
if (o_id < 2100) {
ol.ol_delivery_d = g_network->now();
ol.ol_amount = 0.0;
} else {
ol.ol_amount = deterministicRandom()->random01() * 10000.0;
}
ol.ol_quantity = 5;
ol.ol_dist_info = self->aString(ol.arena, 24, 24);
BinaryWriter w(IncludeVersion());
serializer(w, ol);
tr.set(ol.key(), w.toValue(), AddConflictRange::False);
}
BinaryWriter w(IncludeVersion());
serializer(w, o);
tr.set(o.key(), w.toValue(), AddConflictRange::False);
}
try {
wait(tr.commit());
break;
} catch (Error& e) {
TraceEvent("PopulateOrderHandleError").error(e);
wait(tr.onError(e));
}
}
}
TraceEvent("PopulateOrdersDone").detail("Warehouse", w_id).detail("District", d_id);
return Void();
}
ACTOR static Future<Void> populateNewOrders(PopulateTPCC* self, Database cx, int w_id, int d_id) {
state Transaction tr(cx);
TraceEvent("PopulateNewOrders").detail("Warehouse", w_id).detail("District", d_id);
loop {
tr.reset();
for (int i = 2100; i < 3000; ++i) {
NewOrder no;
no.no_o_id = i;
no.no_d_id = d_id;
no.no_w_id = w_id;
BinaryWriter w(IncludeVersion());
serializer(w, no);
tr.set(no.key(), w.toValue(), AddConflictRange::False);
}
try {
wait(tr.commit());
break;
} catch (Error& e) {
TraceEvent("PopulateNewOrderHandleError").error(e);
wait(tr.onError(e));
}
}
TraceEvent("PopulateNewOrdersDone").detail("Warehouse", w_id).detail("District", d_id);
return Void();
}
ACTOR static Future<Void> populateDistricts(PopulateTPCC* self, Database cx, int w_id) {
state Transaction tr(cx);
state int d_id;
for (d_id = 0; d_id < 10; ++d_id) {
TraceEvent("PopulateDistricts").detail("Warehouse", w_id).detail("District", d_id);
loop {
tr.reset();
District d;
d.d_id = d_id;
d.d_w_id = w_id;
d.d_name = self->aString(d.arena, 6, 10);
d.d_street_1 = self->aString(d.arena, 10, 20);
d.d_street_2 = self->aString(d.arena, 10, 20);
d.d_city = self->aString(d.arena, 10, 20);
d.d_state = self->aString(d.arena, 2, 2);
d.d_zip = self->rndZip(d.arena);
d.d_tax = deterministicRandom()->random01() * 0.2;
d.d_ytd = 30000;
d.d_next_o_id = 3000;
BinaryWriter w(IncludeVersion());
serializer(w, d);
tr.set(d.key(), w.toValue(), AddConflictRange::False);
try {
wait(tr.commit());
wait(populateCustomers(self, cx, w_id, d_id));
wait(populateOrders(self, cx, w_id, d_id));
wait(populateNewOrders(self, cx, w_id, d_id));
break;
} catch (Error& e) {
TraceEvent("PopulateDistrictHandleError").error(e);
wait(tr.onError(e));
}
}
}
TraceEvent("PopulateDistrictsDone").detail("Warehouse", w_id);
return Void();
}
ACTOR static Future<Void> populateStock(PopulateTPCC* self, Database cx, int w_id) {
state Transaction tr(cx);
state int idStart;
for (idStart = 0; idStart < 100000; idStart += 100) {
TraceEvent("PopulateStock").detail("Warehouse", w_id).detail("i_id", idStart);
loop {
tr.reset();
for (int i = idStart; i < idStart + 100; ++i) {
Stock s;
s.s_i_id = i;
s.s_w_id = w_id;
s.s_quantity = deterministicRandom()->randomInt(1, 101);
s.s_dist_01 = self->aString(s.arena, 24, 25);
s.s_dist_02 = self->aString(s.arena, 24, 25);
s.s_dist_03 = self->aString(s.arena, 24, 25);
s.s_dist_04 = self->aString(s.arena, 24, 25);
s.s_dist_05 = self->aString(s.arena, 24, 25);
s.s_dist_06 = self->aString(s.arena, 24, 25);
s.s_dist_07 = self->aString(s.arena, 24, 25);
s.s_dist_08 = self->aString(s.arena, 24, 25);
s.s_dist_09 = self->aString(s.arena, 24, 25);
s.s_dist_10 = self->aString(s.arena, 24, 25);
s.s_ytd = 0;
s.s_order_cnt = 0;
s.s_remote_cnt = 0;
s.s_data = self->dataString(s.arena);
BinaryWriter w(IncludeVersion());
serializer(w, s);
tr.set(s.key(), w.toValue(), AddConflictRange::False);
}
try {
wait(tr.commit());
break;
} catch (Error& e) {
TraceEvent("PopulateStockHandleError").error(e).detail("Warehouse", w_id);
wait(tr.onError(e));
}
}
}
TraceEvent("PopulateStockDone").detail("Warehouse", w_id);
return Void();
}
ACTOR static Future<Void> populateWarehouse(PopulateTPCC* self, Database cx, int w_id) {
state Transaction tr(cx);
TraceEvent("PopulateWarehouse").detail("W_ID", w_id);
loop {
tr.reset();
try {
Warehouse w;
w.w_id = w_id;
w.w_name = self->aString(w.arena, 6, 11);
w.w_street_1 = self->aString(w.arena, 10, 21);
w.w_street_2 = self->aString(w.arena, 10, 21);
w.w_city = self->aString(w.arena, 10, 21);
w.w_state = self->aString(w.arena, 2, 3);
w.w_tax = deterministicRandom()->random01() * 0.2;
w.w_ytd = 300000;
BinaryWriter writer(IncludeVersion());
serializer(writer, w);
tr.set(w.key(), writer.toValue(), AddConflictRange::False);
wait(tr.commit());
break;
} catch (Error& e) {
TraceEvent("PopulateWarehouseHandleError").error(e).detail("Warehouse", w_id);
wait(tr.onError(e));
}
}
wait(populateStock(self, cx, w_id));
wait(populateDistricts(self, cx, w_id));
TraceEvent("PopulateWarehouseDone").detail("W_ID", w_id);
return Void();
}
ACTOR static Future<Void> populateActor(PopulateTPCC* self, Database cx, int actorId) {
state int startWID =
self->clientId * self->actorsPerClient * self->warehousesPerActor + actorId * self->warehousesPerActor;
state int endWID = startWID + self->warehousesPerActor;
state int wid;
for (wid = startWID; wid < endWID; ++wid) {
wait(populateWarehouse(self, cx, wid));
}
return Void();
}
ACTOR static Future<Void> populate(PopulateTPCC* self, Database cx) {
if (self->clientId == 0) {
wait(writeGlobalState(self, cx));
} else {
wait(readGlobalState(self, cx));
}
if (self->clientId == 0) {
wait(populateItems(self, cx));
}
state std::vector<Future<Void>> populateActors;
state int actorId;
for (actorId = 0; actorId < self->actorsPerClient; ++actorId) {
populateActors.push_back(populateActor(self, cx, actorId));
}
wait(waitForAll(populateActors));
wait(quietDatabase(cx, self->dbInfo, "PopulateTPCC"));
return Void();
}
Future<Void> setup(Database const& cx) override {
if (clientId >= clientsUsed)
return Void();
return populate(this, cx);
}
Future<Void> start(Database const& cx) override { return Void(); }
Future<bool> check(Database const& cx) override { return true; }
void getMetrics(std::vector<PerfMetric>& m) override {}
};
} // namespace
WorkloadFactory<PopulateTPCC> PopulateTPCCWorkloadFactory(PopulateTPCC::DESCRIPTION);

View File

@ -1,825 +0,0 @@
/*
* TPCC.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbserver/workloads/TPCCWorkload.h"
#include <fdbclient/ReadYourWrites.h>
#include "flow/actorcompiler.h" // has to be last include
using namespace TPCCWorkload;
namespace {
struct TPCCMetrics {
static constexpr int latenciesStored = 1000;
uint64_t successfulStockLevelTransactions{ 0 };
uint64_t failedStockLevelTransactions{ 0 };
uint64_t successfulDeliveryTransactions{ 0 };
uint64_t failedDeliveryTransactions{ 0 };
uint64_t successfulOrderStatusTransactions{ 0 };
uint64_t failedOrderStatusTransactions{ 0 };
uint64_t successfulPaymentTransactions{ 0 };
uint64_t failedPaymentTransactions{ 0 };
uint64_t successfulNewOrderTransactions{ 0 };
uint64_t failedNewOrderTransactions{ 0 };
double stockLevelResponseTime{ 0.0 };
double deliveryResponseTime{ 0.0 };
double orderStatusResponseTime{ 0.0 };
double paymentResponseTime{ 0.0 };
double newOrderResponseTime{ 0.0 };
std::vector<double> stockLevelLatencies, deliveryLatencies, orderStatusLatencies, paymentLatencies,
newOrderLatencies;
void sort() {
std::sort(stockLevelLatencies.begin(), stockLevelLatencies.end());
std::sort(deliveryLatencies.begin(), deliveryLatencies.end());
std::sort(orderStatusLatencies.begin(), orderStatusLatencies.end());
std::sort(paymentLatencies.begin(), paymentLatencies.end());
std::sort(newOrderLatencies.begin(), newOrderLatencies.end());
}
static double median(const std::vector<double>& latencies) {
// assumes latencies is sorted
return latencies[latencies.size() / 2];
}
static double percentile_90(const std::vector<double>& latencies) {
// assumes latencies is sorted
return latencies[(9 * latencies.size()) / 10];
}
static double percentile_99(const std::vector<double>& latencies) {
// assumes latencies is sorted
return latencies[(99 * latencies.size()) / 100];
}
static void updateMetrics(bool committed,
uint64_t& successCounter,
uint64_t& failedCounter,
double txnStartTime,
std::vector<double>& latencies,
double& totalLatency,
std::string txnType) {
auto responseTime = g_network->now() - txnStartTime;
if (committed) {
totalLatency += responseTime;
++successCounter;
if (successCounter <= latenciesStored)
latencies[successCounter - 1] = responseTime;
else {
auto index = deterministicRandom()->randomInt(0, successCounter);
if (index < latenciesStored) {
latencies[index] = responseTime;
}
}
} else {
++failedCounter;
}
TraceEvent("TransactionComplete")
.detail("TransactionType", txnType)
.detail("Latency", responseTime)
.detail("Begin", txnStartTime)
.detail("End", txnStartTime + responseTime)
.detail("Success", committed);
}
};
struct TPCC : TestWorkload {
static constexpr const char* DESCRIPTION = "TPCC";
int warehousesPerClient;
int expectedTransactionsPerMinute;
int testDuration;
int warmupTime;
int clientsUsed;
double startTime;
GlobalState gState;
TPCCMetrics metrics;
TPCC(WorkloadContext const& ctx) : TestWorkload(ctx) {
std::string workloadName = DESCRIPTION;
warehousesPerClient = getOption(options, LiteralStringRef("warehousesPerClient"), 100);
expectedTransactionsPerMinute = getOption(options, LiteralStringRef("expectedTransactionsPerMinute"), 1000);
testDuration = getOption(options, LiteralStringRef("testDuration"), 600);
warmupTime = getOption(options, LiteralStringRef("warmupTime"), 30);
clientsUsed = getOption(options, LiteralStringRef("clientsUsed"), 40);
}
int NURand(int C, int A, int x, int y) {
return (((deterministicRandom()->randomInt(0, A + 1) | deterministicRandom()->randomInt(x, y + 1)) + C) %
(y - x + 1)) +
x;
}
StringRef genCLast(Arena& arena, int x) {
int l = x % 10;
x /= 10;
int m = x % 10;
x /= 10;
int f = x % 10;
std::stringstream ss;
ss << syllables[f] << syllables[m] << syllables[l];
return StringRef(arena, ss.str());
}
// Should call in setup
ACTOR static Future<Void> readGlobalState(TPCC* self, Database cx) {
state ReadYourWritesTransaction tr(cx);
loop {
tr.reset();
try {
Optional<Value> val = wait(tr.get(self->gState.key()));
if (val.present()) {
BinaryReader reader(val.get(), IncludeVersion());
serializer(reader, self->gState);
} else {
wait(delay(1.0));
}
return Void();
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
std::string description() const override { return DESCRIPTION; }
// Transactions
ACTOR static Future<bool> newOrder(TPCC* self, Database cx, int w_id) {
state int d_id = deterministicRandom()->randomInt(0, 10);
state int c_id = self->NURand(self->gState.CRun, 1023, 1, 3000) - 1;
state int ol_cnt = deterministicRandom()->randomInt(5, 16);
state bool willRollback = deterministicRandom()->randomInt(1, 100) == 1;
state ReadYourWritesTransaction tr(cx);
try {
state Warehouse warehouse;
warehouse.w_id = w_id;
Optional<Value> wValue = wait(tr.get(warehouse.key()));
ASSERT(wValue.present());
{
BinaryReader r(wValue.get(), IncludeVersion());
serializer(r, warehouse);
}
state District district;
district.d_w_id = w_id;
district.d_id = d_id;
Optional<Value> dValue = wait(tr.get(district.key()));
ASSERT(dValue.present());
{
BinaryReader r(dValue.get(), IncludeVersion());
serializer(r, district);
}
state Customer customer;
customer.c_id = c_id;
customer.c_w_id = w_id;
customer.c_d_id = d_id;
Optional<Value> cValue = wait(tr.get(customer.key()));
ASSERT(cValue.present());
{
BinaryReader r(cValue.get(), IncludeVersion());
serializer(r, customer);
}
state Order order;
order.o_entry_d = g_network->now();
order.o_c_id = c_id;
order.o_d_id = d_id;
order.o_w_id = w_id;
order.o_ol_cnt = ol_cnt;
order.o_id = district.d_next_o_id;
++district.d_next_o_id;
{
BinaryWriter w(IncludeVersion());
serializer(w, district);
tr.set(district.key(), w.toValue());
}
state NewOrder newOrder;
newOrder.no_w_id = w_id;
newOrder.no_d_id = d_id;
newOrder.no_o_id = order.o_id;
state int ol_id = 0;
state bool allLocal = true;
for (; ol_id < order.o_ol_cnt; ++ol_id) {
if (ol_id + 1 == order.o_ol_cnt && willRollback) {
// Simulated abort - order item not found
return false;
}
state OrderLine orderLine;
orderLine.ol_number = ol_id;
orderLine.ol_w_id = w_id;
orderLine.ol_d_id = d_id;
orderLine.ol_supply_w_id = w_id;
orderLine.ol_o_id = order.o_id;
orderLine.ol_i_id = self->NURand(self->gState.CRun, 8191, 1, 100000) - 1;
orderLine.ol_quantity = deterministicRandom()->randomInt(1, 11);
if (deterministicRandom()->randomInt(0, 100) == 0) {
orderLine.ol_supply_w_id =
deterministicRandom()->randomInt(0, self->clientsUsed * self->warehousesPerClient);
}
state Item item;
item.i_id = orderLine.ol_i_id;
orderLine.ol_i_id = item.i_id;
Optional<Value> iValue = wait(tr.get(item.key()));
ASSERT(iValue.present());
{
BinaryReader r(iValue.get(), IncludeVersion());
serializer(r, item);
}
state Stock stock;
stock.s_i_id = item.i_id;
stock.s_w_id = orderLine.ol_supply_w_id;
Optional<Value> sValue = wait(tr.get(stock.key()));
ASSERT(sValue.present());
{
BinaryReader r(sValue.get(), IncludeVersion());
serializer(r, stock);
}
if (stock.s_quantity - orderLine.ol_quantity >= 10) {
stock.s_quantity -= orderLine.ol_quantity;
} else {
stock.s_quantity = (stock.s_quantity - orderLine.ol_quantity) + 91;
}
stock.s_ytd += orderLine.ol_quantity;
stock.s_order_cnt += 1;
if (orderLine.ol_supply_w_id != w_id) {
stock.s_remote_cnt += 1;
allLocal = false;
}
{
BinaryWriter w(IncludeVersion());
serializer(w, stock);
tr.set(stock.key(), w.toValue());
}
orderLine.ol_amount = orderLine.ol_quantity * item.i_price;
switch (orderLine.ol_d_id) {
case 0:
orderLine.ol_dist_info = stock.s_dist_01;
break;
case 1:
orderLine.ol_dist_info = stock.s_dist_02;
break;
case 2:
orderLine.ol_dist_info = stock.s_dist_03;
break;
case 3:
orderLine.ol_dist_info = stock.s_dist_04;
break;
case 4:
orderLine.ol_dist_info = stock.s_dist_05;
break;
case 5:
orderLine.ol_dist_info = stock.s_dist_06;
break;
case 6:
orderLine.ol_dist_info = stock.s_dist_07;
break;
case 7:
orderLine.ol_dist_info = stock.s_dist_08;
break;
case 8:
orderLine.ol_dist_info = stock.s_dist_09;
break;
case 9:
orderLine.ol_dist_info = stock.s_dist_10;
break;
}
{
BinaryWriter w(IncludeVersion());
serializer(w, orderLine);
tr.set(orderLine.key(), w.toValue());
}
}
order.o_all_local = allLocal;
{
BinaryWriter w(IncludeVersion());
serializer(w, order);
tr.set(order.key(), w.toValue());
}
{
BinaryWriter w(IncludeVersion());
serializer(w, newOrder);
tr.set(newOrder.key(), w.toValue());
}
wait(tr.commit());
} catch (Error& e) {
return false;
}
return true;
}
ACTOR static Future<Customer> getRandomCustomer(TPCC* self, ReadYourWritesTransaction* tr, int w_id, int d_id) {
state Customer result;
result.c_w_id = w_id;
result.c_d_id = d_id;
if (deterministicRandom()->randomInt(0, 100) >= 85) {
result.c_d_id = deterministicRandom()->randomInt(0, 10);
result.c_w_id = deterministicRandom()->randomInt(0, self->clientsUsed * self->warehousesPerClient);
}
if (deterministicRandom()->randomInt(0, 100) < 60) {
// select through last name
result.c_last = self->genCLast(result.arena, self->NURand(self->gState.CRun, 1023, 1, 3000) - 1);
auto s = result.indexLastKey(1);
auto begin = new (result.arena) uint8_t[s.size() + 1];
auto end = new (result.arena) uint8_t[s.size() + 1];
memcpy(begin, s.begin(), s.size());
memcpy(end, s.begin(), s.size());
begin[s.size()] = '/';
end[s.size()] = '0';
state RangeResult range =
wait(tr->getRange(KeyRangeRef(StringRef(begin, s.size() + 1), StringRef(end, s.size() + 1)), 1000));
ASSERT(range.size() > 0);
state std::vector<Customer> customers;
state int i = 0;
for (; i < range.size(); ++i) {
Optional<Value> cValue = wait(tr->get(range[i].value));
ASSERT(cValue.present());
BinaryReader r(cValue.get(), IncludeVersion());
state Customer customer;
serializer(r, customer);
customers.push_back(customer);
}
// Sort customers by first name and choose median
std::sort(customers.begin(), customers.end(), [](const Customer& cus1, const Customer& cus2) {
const std::string cus1Name = cus1.c_first.toString();
const std::string cus2Name = cus2.c_first.toString();
return (cus1Name.compare(cus2Name) < 0);
});
result = customers[customers.size() / 2];
} else {
// select through random id
result.c_id = self->NURand(self->gState.CRun, 1023, 1, 3000) - 1;
Optional<Value> val = wait(tr->get(result.key()));
ASSERT(val.present());
BinaryReader r(val.get(), IncludeVersion());
serializer(r, result);
}
return result;
}
ACTOR static Future<bool> payment(TPCC* self, Database cx, int w_id) {
state ReadYourWritesTransaction tr(cx);
state int d_id = deterministicRandom()->randomInt(0, 10);
state History history;
state Warehouse warehouse;
state District district;
history.h_amount = deterministicRandom()->random01() * 4999.0 + 1.0;
history.h_date = g_network->now();
try {
// get the customer
state Customer customer = wait(getRandomCustomer(self, &tr, w_id, d_id));
warehouse.w_id = w_id;
Optional<Value> wValue = wait(tr.get(warehouse.key()));
ASSERT(wValue.present());
{
BinaryReader r(wValue.get(), IncludeVersion());
serializer(r, warehouse);
}
warehouse.w_ytd += history.h_amount;
{
BinaryWriter w(IncludeVersion());
serializer(w, warehouse);
tr.set(warehouse.key(), w.toValue());
}
district.d_w_id = w_id;
district.d_id = d_id;
Optional<Value> dValue = wait(tr.get(district.key()));
ASSERT(dValue.present());
{
BinaryReader r(dValue.get(), IncludeVersion());
serializer(r, district);
}
district.d_ytd += history.h_amount;
customer.c_balance -= history.h_amount;
customer.c_ytd_payment += history.h_amount;
customer.c_payment_cnt += 1;
if (customer.c_credit == LiteralStringRef("BC")) {
// we must update c_data
std::stringstream ss;
ss << customer.c_id << "," << customer.c_d_id << "," << customer.c_w_id << "," << district.d_id << ","
<< w_id << history.h_amount << ";";
auto s = ss.str();
auto len = std::min(int(s.size()) + customer.c_data.size(), 500);
auto data = new (customer.arena) uint8_t[len];
std::copy(s.begin(), s.end(), reinterpret_cast<char*>(data));
std::copy(customer.c_data.begin(), customer.c_data.begin() + len - s.size(), data);
customer.c_data = StringRef(data, len);
}
{
BinaryWriter w(IncludeVersion());
serializer(w, customer);
tr.set(customer.key(), w.toValue());
}
std::stringstream ss;
ss << warehouse.w_name.toString() << " " << district.d_name.toString();
history.h_data = StringRef(history.arena, ss.str());
history.h_c_id = customer.c_id;
history.h_c_d_id = customer.c_d_id;
history.h_c_w_id = customer.c_w_id;
history.h_d_id = d_id;
history.h_w_id = w_id;
{
BinaryWriter w(IncludeVersion());
serializer(w, history);
UID k = deterministicRandom()->randomUniqueID();
BinaryWriter kW(Unversioned());
serializer(kW, k);
auto key = kW.toValue().withPrefix(LiteralStringRef("History/"));
tr.set(key, w.toValue());
}
wait(tr.commit());
} catch (Error& e) {
return false;
}
return true;
}
ACTOR static Future<bool> orderStatus(TPCC* self, Database cx, int w_id) {
state ReadYourWritesTransaction tr(cx);
state int d_id = deterministicRandom()->randomInt(0, 10);
state int i;
state Order order;
state std::vector<OrderLine> orderLines;
try {
state Customer customer = wait(getRandomCustomer(self, &tr, w_id, d_id));
order.o_w_id = customer.c_w_id;
order.o_d_id = customer.c_d_id;
order.o_c_id = customer.c_id;
RangeResult range = wait(tr.getRange(order.keyRange(1), 1, Snapshot::False, Reverse::True));
ASSERT(range.size() > 0);
{
BinaryReader r(range[0].value, IncludeVersion());
serializer(r, order);
}
for (i = 0; i < order.o_ol_cnt; ++i) {
OrderLine orderLine;
orderLine.ol_w_id = order.o_w_id;
orderLine.ol_d_id = order.o_d_id;
orderLine.ol_o_id = order.o_id;
orderLine.ol_number = i;
Optional<Value> olValue = wait(tr.get(orderLine.key()));
ASSERT(olValue.present());
BinaryReader r(olValue.get(), IncludeVersion());
OrderLine ol;
serializer(r, ol);
orderLines.push_back(ol);
}
} catch (Error& e) {
return false;
}
return true;
}
ACTOR static Future<bool> delivery(TPCC* self, Database cx, int w_id) {
state ReadYourWritesTransaction tr(cx);
state int carrier_id = deterministicRandom()->randomInt(0, 10);
state int d_id;
state NewOrder newOrder;
state Order order;
state double sumAmount = 0.0;
state Customer customer;
state int i;
try {
for (d_id = 0; d_id < 10; ++d_id) {
newOrder.no_w_id = w_id;
newOrder.no_d_id = d_id;
RangeResult range = wait(tr.getRange(newOrder.keyRange(1), 1));
if (range.size() > 0) {
{
BinaryReader r(range[0].value, IncludeVersion());
serializer(r, newOrder);
}
tr.clear(newOrder.key());
order.o_w_id = w_id;
order.o_d_id = d_id;
order.o_id = newOrder.no_o_id;
Optional<Value> oValue = wait(tr.get(order.key()));
ASSERT(oValue.present());
{
BinaryReader r(oValue.get(), IncludeVersion());
serializer(r, order);
}
order.o_carrier_id = carrier_id;
{
BinaryWriter w(IncludeVersion());
serializer(w, order);
tr.set(order.key(), w.toValue());
}
for (i = 0; i < order.o_ol_cnt; ++i) {
state OrderLine orderLine;
orderLine.ol_w_id = order.o_w_id;
orderLine.ol_d_id = order.o_d_id;
orderLine.ol_o_id = order.o_id;
orderLine.ol_number = i;
Optional<Value> olV = wait(tr.get(orderLine.key()));
ASSERT(olV.present());
BinaryReader r(olV.get(), IncludeVersion());
serializer(r, orderLine);
orderLine.ol_delivery_d = g_network->now();
sumAmount += orderLine.ol_amount;
}
customer.c_w_id = w_id;
customer.c_d_id = d_id;
customer.c_id = order.o_c_id;
Optional<Value> cV = wait(tr.get(customer.key()));
ASSERT(cV.present());
{
BinaryReader r(cV.get(), IncludeVersion());
serializer(r, customer);
}
customer.c_balance += sumAmount;
customer.c_delivery_count += 1;
{
BinaryWriter w(IncludeVersion());
serializer(w, customer);
tr.set(customer.key(), w.toValue());
}
wait(tr.commit());
}
}
} catch (Error& e) {
return false;
}
return true;
}
ACTOR static Future<bool> stockLevel(TPCC* self, Database cx, int w_id, int d_id) {
state int threshold = deterministicRandom()->randomInt(10, 21);
state Transaction tr(cx);
state District district;
state OrderLine orderLine;
state Stock stock;
state int ol_o_id;
state int low_stock = 0;
state int i;
try {
district.d_w_id = w_id;
district.d_id = d_id;
Optional<Value> dV = wait(tr.get(district.key()));
ASSERT(dV.present());
{
BinaryReader r(dV.get(), IncludeVersion());
serializer(r, district);
}
for (ol_o_id = district.d_next_o_id - 20; ol_o_id < district.d_next_o_id; ++ol_o_id) {
orderLine.ol_w_id = w_id;
orderLine.ol_d_id = d_id;
orderLine.ol_o_id = ol_o_id;
state RangeResult range = wait(tr.getRange(orderLine.keyRange(1), CLIENT_KNOBS->TOO_MANY));
ASSERT(!range.more);
ASSERT(range.size() > 0);
for (i = 0; i < range.size(); ++i) {
{
BinaryReader r(range[i].value, IncludeVersion());
serializer(r, orderLine);
}
stock.s_i_id = orderLine.ol_i_id;
stock.s_w_id = orderLine.ol_w_id;
Optional<Value> sV = wait(tr.get(stock.key()));
ASSERT(sV.present());
{
BinaryReader r(sV.get(), IncludeVersion());
serializer(r, stock);
}
if (stock.s_quantity < threshold) {
++low_stock;
}
}
}
} catch (Error& e) {
return false;
}
return true;
}
ACTOR static Future<Void> emulatedUser(TPCC* self, Database cx, int w_id, int d_id) {
// stagger users
wait(delay(20.0 * deterministicRandom()->random01()));
TraceEvent("StartingEmulatedUser").detail("Warehouse", w_id).detail("District", d_id);
loop {
auto type = deterministicRandom()->randomInt(0, 100);
Future<bool> tx;
state double txnStartTime = g_network->now();
if (type < 4) {
tx = stockLevel(self, cx, w_id, d_id);
bool committed = wait(tx);
if (self->recordMetrics()) {
TPCCMetrics::updateMetrics(committed,
self->metrics.successfulStockLevelTransactions,
self->metrics.failedStockLevelTransactions,
txnStartTime,
self->metrics.stockLevelLatencies,
self->metrics.stockLevelResponseTime,
"StockLevel");
}
wait(delay(2 + deterministicRandom()->random01() * 10));
} else if (type < 8) {
tx = delivery(self, cx, w_id);
bool committed = wait(tx);
if (self->recordMetrics()) {
TPCCMetrics::updateMetrics(committed,
self->metrics.successfulDeliveryTransactions,
self->metrics.failedDeliveryTransactions,
txnStartTime,
self->metrics.deliveryLatencies,
self->metrics.deliveryResponseTime,
"Delivery");
}
wait(delay(2 + deterministicRandom()->random01() * 10));
} else if (type < 12) {
tx = orderStatus(self, cx, w_id);
bool committed = wait(tx);
if (self->recordMetrics()) {
TPCCMetrics::updateMetrics(committed,
self->metrics.successfulOrderStatusTransactions,
self->metrics.failedOrderStatusTransactions,
txnStartTime,
self->metrics.orderStatusLatencies,
self->metrics.orderStatusResponseTime,
"OrderStatus");
}
wait(delay(2 + deterministicRandom()->random01() * 20));
} else if (type < 55) {
tx = payment(self, cx, w_id);
bool committed = wait(tx);
if (self->recordMetrics()) {
TPCCMetrics::updateMetrics(committed,
self->metrics.successfulPaymentTransactions,
self->metrics.failedPaymentTransactions,
txnStartTime,
self->metrics.paymentLatencies,
self->metrics.paymentResponseTime,
"Payment");
}
wait(delay(3 + deterministicRandom()->random01() * 24));
} else {
tx = newOrder(self, cx, w_id);
bool committed = wait(tx);
if (self->recordMetrics()) {
TPCCMetrics::updateMetrics(committed,
self->metrics.successfulNewOrderTransactions,
self->metrics.failedNewOrderTransactions,
txnStartTime,
self->metrics.newOrderLatencies,
self->metrics.newOrderResponseTime,
"NewOrder");
}
wait(delay(18 + deterministicRandom()->random01() * 24));
}
}
}
double transactionsPerMinute() const {
return metrics.successfulNewOrderTransactions * 60.0 / (testDuration - 2 * warmupTime);
}
bool recordMetrics() const {
auto now = g_network->now();
return (now > startTime + warmupTime && now < startTime + testDuration - warmupTime);
}
Future<Void> start(Database const& cx) override {
if (clientId >= clientsUsed)
return Void();
return _start(cx, this);
}
ACTOR Future<Void> _start(Database cx, TPCC* self) {
wait(readGlobalState(self, cx));
self->startTime = g_network->now();
int startWID = self->clientId * self->warehousesPerClient;
int endWID = startWID + self->warehousesPerClient;
state int w_id;
state int d_id;
state std::vector<Future<Void>> emulatedUsers;
for (w_id = startWID; w_id < endWID; ++w_id) {
for (d_id = 0; d_id < 10; ++d_id) {
emulatedUsers.push_back(timeout(emulatedUser(self, cx, w_id, d_id), self->testDuration, Void()));
}
}
wait(waitForAll(emulatedUsers));
return Void();
}
Future<bool> check(Database const& cx) override {
return (transactionsPerMinute() > expectedTransactionsPerMinute);
}
void getMetrics(std::vector<PerfMetric>& m) override {
double multiplier = static_cast<double>(clientCount) / static_cast<double>(clientsUsed);
m.emplace_back("Transactions Per Minute", transactionsPerMinute(), Averaged::False);
m.emplace_back("Successful StockLevel Transactions", metrics.successfulStockLevelTransactions, Averaged::False);
m.emplace_back("Successful Delivery Transactions", metrics.successfulDeliveryTransactions, Averaged::False);
m.emplace_back(
"Successful OrderStatus Transactions", metrics.successfulOrderStatusTransactions, Averaged::False);
m.emplace_back("Successful Payment Transactions", metrics.successfulPaymentTransactions, Averaged::False);
m.emplace_back("Successful NewOrder Transactions", metrics.successfulNewOrderTransactions, Averaged::False);
m.emplace_back("Failed StockLevel Transactions", metrics.failedStockLevelTransactions, Averaged::False);
m.emplace_back("Failed Delivery Transactions", metrics.failedDeliveryTransactions, Averaged::False);
m.emplace_back("Failed OrderStatus Transactions", metrics.failedOrderStatusTransactions, Averaged::False);
m.emplace_back("Failed Payment Transactions", metrics.failedPaymentTransactions, Averaged::False);
m.emplace_back("Failed NewOrder Transactions", metrics.failedNewOrderTransactions, Averaged::False);
m.emplace_back("Mean StockLevel Latency",
(clientId < clientsUsed)
? (multiplier * metrics.stockLevelResponseTime / metrics.successfulStockLevelTransactions)
: 0.0,
Averaged::True);
m.emplace_back("Mean Delivery Latency",
(clientId < clientsUsed)
? (multiplier * metrics.deliveryResponseTime / metrics.successfulDeliveryTransactions)
: 0.0,
Averaged::True);
m.emplace_back("Mean OrderStatus Repsonse Time",
(clientId < clientsUsed)
? (multiplier * metrics.orderStatusResponseTime / metrics.successfulOrderStatusTransactions)
: 0.0,
Averaged::True);
m.emplace_back("Mean Payment Latency",
(clientId < clientsUsed)
? (multiplier * metrics.paymentResponseTime / metrics.successfulPaymentTransactions)
: 0.0,
Averaged::True);
m.emplace_back("Mean NewOrder Latency",
(clientId < clientsUsed)
? (multiplier * metrics.newOrderResponseTime / metrics.successfulNewOrderTransactions)
: 0.0,
Averaged::True);
metrics.sort();
m.emplace_back(
"Median StockLevel Latency", multiplier * TPCCMetrics::median(metrics.stockLevelLatencies), Averaged::True);
m.emplace_back(
"Median Delivery Latency", multiplier * TPCCMetrics::median(metrics.deliveryLatencies), Averaged::True);
m.emplace_back("Median OrderStatus Latency",
multiplier * TPCCMetrics::median(metrics.orderStatusLatencies),
Averaged::True);
m.emplace_back(
"Median Payment Latency", multiplier * TPCCMetrics::median(metrics.paymentLatencies), Averaged::True);
m.emplace_back(
"Median NewOrder Latency", multiplier * TPCCMetrics::median(metrics.newOrderLatencies), Averaged::True);
m.emplace_back("90th Percentile StockLevel Latency",
multiplier * TPCCMetrics::percentile_90(metrics.stockLevelLatencies),
Averaged::True);
m.emplace_back("90th Percentile Delivery Latency",
multiplier * TPCCMetrics::percentile_90(metrics.deliveryLatencies),
Averaged::True);
m.emplace_back("90th Percentile OrderStatus Latency",
multiplier * TPCCMetrics::percentile_90(metrics.orderStatusLatencies),
Averaged::True);
m.emplace_back("90th Percentile Payment Latency",
multiplier * TPCCMetrics::percentile_90(metrics.paymentLatencies),
Averaged::True);
m.emplace_back("90th Percentile NewOrder Latency",
multiplier * TPCCMetrics::percentile_90(metrics.newOrderLatencies),
Averaged::True);
m.emplace_back("99th Percentile StockLevel Latency",
multiplier * TPCCMetrics::percentile_99(metrics.stockLevelLatencies),
Averaged::True);
m.emplace_back("99th Percentile Delivery Latency",
multiplier * TPCCMetrics::percentile_99(metrics.deliveryLatencies),
Averaged::True);
m.emplace_back("99th Percentile OrderStatus Latency",
multiplier * TPCCMetrics::percentile_99(metrics.orderStatusLatencies),
Averaged::True);
m.emplace_back("99th Percentile Payment Latency",
multiplier * TPCCMetrics::percentile_99(metrics.paymentLatencies),
Averaged::True);
m.emplace_back("99th Percentile NewOrder Latency",
multiplier * TPCCMetrics::percentile_99(metrics.newOrderLatencies),
Averaged::True);
}
};
} // namespace
WorkloadFactory<TPCC> TPCCWorkloadFactory(TPCC::DESCRIPTION);

View File

@ -104,8 +104,6 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES SystemData.txt)
add_fdb_test(TEST_FILES ThreadSafety.txt IGNORE)
add_fdb_test(TEST_FILES TraceEventMetrics.txt IGNORE)
add_fdb_test(TEST_FILES PopulateTPCC.txt IGNORE)
add_fdb_test(TEST_FILES TPCC.txt IGNORE)
add_fdb_test(TEST_FILES default.txt IGNORE)
add_fdb_test(TEST_FILES errors.txt IGNORE)
add_fdb_test(TEST_FILES fail.txt IGNORE)

View File

@ -1,9 +0,0 @@
testTitle=PopulateTPCCTest
timeout=3600000
clearAfterTest=false
runConsistencyCheck=false
testName=PopulateTPCC
clientsUsed=2
actors=1
warehousesPerActor=200

View File

@ -1,19 +0,0 @@
testTitle=PopulateTPCCTest
clearAfterTest=false
runConsistencyCheck=false
timeout=3600000
testName=PopulateTPCC
clientsUsed=2
actorsPerClient=1
warehousesPerActor=80
testTitle=TPCCTest
timeout=14400
testName=TPCC
warehousesPerClient=4
testDuration=3600
warmupTime=300
clientsUsed=40
expectedTransactionsPerMinute=1000