foundationdb/fdbserver/SkipList.cpp

1478 lines
46 KiB
C++
Raw Normal View History

2017-05-26 04:48:44 +08:00
/*
* SkipList.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
2017-05-26 04:48:44 +08:00
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
2017-05-26 04:48:44 +08:00
* http://www.apache.org/licenses/LICENSE-2.0
*
2017-05-26 04:48:44 +08:00
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <stdint.h>
#include <memory.h>
#include <stdio.h>
#include <algorithm>
#include <numeric>
#include <string>
#include <vector>
2017-05-26 04:48:44 +08:00
/*
#ifdef __GNUG__
#include <smmintrin.h>
#endif
*/
#include "flow/Platform.h"
#include "fdbrpc/fdbrpc.h"
#include "fdbrpc/PerfMetric.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/KeyRangeMap.h"
#include "fdbclient/SystemData.h"
#include "fdbserver/Knobs.h"
2017-05-26 04:48:44 +08:00
2020-01-31 06:24:37 +08:00
#define PARALLEL_THREAD_COUNT \
0 // FIXME: When >1, program execution (e.g. random numbers) is/was nondeterministic. Why?
2017-05-26 04:48:44 +08:00
using std::max;
2020-01-31 06:24:37 +08:00
using std::min;
2017-05-26 04:48:44 +08:00
static std::vector<PerfDoubleCounter*> skc;
2017-05-26 04:48:44 +08:00
static thread_local uint32_t g_seed = 0;
static inline int skfastrand() {
g_seed = g_seed * 1664525L + 1013904223L;
return g_seed;
}
void setAffinity(int proc);
class SlowConflictSet {
public:
2020-01-31 06:24:37 +08:00
bool is_conflict(const VectorRef<KeyRangeRef>& readRanges, Version read_snapshot);
void add(const VectorRef<KeyRangeRef>& clearRanges, const VectorRef<KeyValueRef>& setValues, Version now);
void clear(Version now);
2017-05-26 04:48:44 +08:00
private:
KeyRangeMap<Version> age;
};
2020-01-31 06:24:37 +08:00
bool SlowConflictSet::is_conflict(const VectorRef<KeyRangeRef>& readRanges, Version read_snapshot) {
for (auto range = readRanges.begin(); range != readRanges.end(); ++range) {
auto intersecting = age.intersectingRanges(*range);
for (auto it = intersecting.begin(); it != intersecting.end(); ++it)
if (it.value() > read_snapshot) return true;
2017-05-26 04:48:44 +08:00
}
return false;
}
2020-01-31 06:24:37 +08:00
void SlowConflictSet::clear(Version now) {
2017-05-26 04:48:44 +08:00
age.insert(allKeys, now);
}
2020-01-31 06:24:37 +08:00
void SlowConflictSet::add(const VectorRef<KeyRangeRef>& clearRanges, const VectorRef<KeyValueRef>& setValues,
Version now) {
for (auto c = clearRanges.begin(); c != clearRanges.end(); ++c) age.insert(*c, now);
for (auto s = setValues.begin(); s != setValues.end(); ++s) age.insert(s->key, now);
2017-05-26 04:48:44 +08:00
}
2020-01-31 06:24:37 +08:00
PerfDoubleCounter g_buildTest("Build", skc), g_add("Add", skc), g_add_sort("A.Sort", skc),
g_detectConflicts("Detect", skc), g_sort("D.Sort", skc), g_combine("D.Combine", skc),
g_checkRead("D.CheckRead", skc), g_checkBatch("D.CheckIntraBatch", skc), g_merge("D.MergeWrite", skc),
g_merge_launch("D.Merge.Launch", skc), g_merge_fork("D.Merge.Fork", skc),
g_merge_start_var("D.Merge.StartVariance", skc), g_merge_end_var("D.Merge.EndVariance", skc),
g_merge_run_var("D.Merge.RunVariance", skc), g_merge_run_shortest("D.Merge.ShortestRun", skc),
g_merge_run_longest("D.Merge.LongestRun", skc), g_merge_run_total("D.Merge.TotalRun", skc),
g_merge_join("D.Merge.Join", skc), g_removeBefore("D.RemoveBefore", skc);
static force_inline int compare(const StringRef& a, const StringRef& b) {
int c = memcmp(a.begin(), b.begin(), min(a.size(), b.size()));
if (c < 0) return -1;
if (c > 0) return +1;
2017-05-26 04:48:44 +08:00
if (a.size() < b.size()) return -1;
if (a.size() == b.size()) return 0;
return +1;
}
struct ReadConflictRange {
StringRef begin, end;
Version version;
int transaction;
2020-01-31 06:24:37 +08:00
ReadConflictRange(StringRef begin, StringRef end, Version version, int transaction)
: begin(begin), end(end), version(version), transaction(transaction) {}
bool operator<(const ReadConflictRange& rhs) const { return compare(begin, rhs.begin) < 0; }
2017-05-26 04:48:44 +08:00
};
struct KeyInfo {
StringRef key;
int* pIndex;
bool nextKey;
bool begin;
bool write;
int transaction;
2020-01-31 06:24:37 +08:00
KeyInfo(){};
KeyInfo(StringRef key, bool nextKey, bool begin, bool write, int transaction, int* pIndex)
: key(key), nextKey(nextKey), begin(begin), write(write), transaction(transaction), pIndex(pIndex) {}
2017-05-26 04:48:44 +08:00
};
// returns true if done with string
2020-01-31 06:24:37 +08:00
force_inline bool getCharacter(const KeyInfo& ki, int character, int& outputCharacter) {
2017-05-26 04:48:44 +08:00
// normal case
2020-01-31 06:24:37 +08:00
if (character < ki.key.size()) {
2017-05-26 04:48:44 +08:00
outputCharacter = 5 + ki.key.begin()[character];
return false;
}
// nextKey append a zero
2020-01-31 06:24:37 +08:00
if (ki.nextKey && character >= ki.key.size()) {
if (character == ki.key.size()) {
outputCharacter = 5; // extra '0' character
2017-05-26 04:48:44 +08:00
return false;
}
character--;
}
// termination
2020-01-31 06:24:37 +08:00
if (character == ki.key.size()) {
2017-05-26 04:48:44 +08:00
outputCharacter = 0;
2020-01-31 06:24:37 +08:00
return false;
2017-05-26 04:48:44 +08:00
}
2020-01-31 06:24:37 +08:00
if (character == ki.key.size() + 1) {
2017-05-26 04:48:44 +08:00
// end/begin+read/write relative sorting
2020-01-31 06:24:37 +08:00
outputCharacter = ki.begin * 2 + (ki.write ^ ki.begin);
2017-05-26 04:48:44 +08:00
return false;
}
outputCharacter = 0;
return true;
}
2020-01-31 06:24:37 +08:00
bool operator<(const KeyInfo& lhs, const KeyInfo& rhs) {
2017-05-26 04:48:44 +08:00
int i = min(lhs.key.size(), rhs.key.size());
2020-01-31 06:24:37 +08:00
int c = memcmp(lhs.key.begin(), rhs.key.begin(), i);
if (c != 0) return c < 0;
2017-05-26 04:48:44 +08:00
// SOMEDAY: This is probably not very fast. Slows D.Sort by ~20% relative to previous (incorrect) version.
bool lDone, rDone;
int lc, rc;
while (true) {
lDone = getCharacter(lhs, i, lc);
rDone = getCharacter(rhs, i, rc);
2020-01-31 06:24:37 +08:00
if (lDone && rDone) return false; // equality
2017-05-26 04:48:44 +08:00
if (lc < rc) return true;
if (lc > rc) return false;
i++;
}
}
2020-01-31 06:24:37 +08:00
bool operator==(const KeyInfo& lhs, const KeyInfo& rhs) {
return !(lhs < rhs || rhs < lhs);
2017-05-26 04:48:44 +08:00
}
2020-01-31 06:24:37 +08:00
void swapSort(std::vector<KeyInfo>& points, int a, int b) {
if (points[b] < points[a]) {
2017-05-26 04:48:44 +08:00
KeyInfo temp;
temp = points[a];
points[a] = points[b];
points[b] = temp;
}
}
2020-01-31 06:24:37 +08:00
void smallSort(std::vector<KeyInfo>& points, int start, int N) {
for (int i = 1; i < N; i++)
for (int j = i; j > 0; j -= 2) swapSort(points, start + j - 1, start + j);
for (int i = N - 2; i > 0; i--)
for (int j = i; j > 0; j -= 2) swapSort(points, start + j - 1, start + j);
2017-05-26 04:48:44 +08:00
}
struct SortTask {
int begin;
int size;
int character;
SortTask(int begin, int size, int character) : begin(begin), size(size), character(character) {}
};
2020-01-31 06:24:37 +08:00
void sortPoints(std::vector<KeyInfo>& points) {
std::vector<SortTask> tasks;
std::vector<KeyInfo> newPoints;
std::vector<int> counts;
2017-05-26 04:48:44 +08:00
tasks.emplace_back(0, points.size(), 0);
2017-05-26 04:48:44 +08:00
2020-01-31 06:24:37 +08:00
while (tasks.size()) {
2017-05-26 04:48:44 +08:00
SortTask st = tasks.back();
tasks.pop_back();
2020-01-31 06:24:37 +08:00
if (st.size < 10) {
// smallSort(points, st.begin, st.size);
std::sort(points.begin() + st.begin, points.begin() + st.begin + st.size);
2017-05-26 04:48:44 +08:00
continue;
}
newPoints.resize(st.size);
2020-01-31 06:24:37 +08:00
counts.assign(256 + 5, 0);
2017-05-26 04:48:44 +08:00
// get counts
int c;
bool allDone = true;
2020-01-31 06:24:37 +08:00
for (int i = st.begin; i < st.begin + st.size; i++) {
2017-05-26 04:48:44 +08:00
allDone &= getCharacter(points[i], st.character, c);
counts[c]++;
}
2020-01-31 06:24:37 +08:00
if (allDone) continue;
2017-05-26 04:48:44 +08:00
// calculate offsets from counts and build next level of tasks
2020-01-31 06:24:37 +08:00
int total = 0;
for (int i = 0; i < counts.size(); i++) {
2017-05-26 04:48:44 +08:00
int temp = counts[i];
2020-01-31 06:24:37 +08:00
if (temp > 1) tasks.emplace_back(st.begin + total, temp, st.character + 1);
2017-05-26 04:48:44 +08:00
counts[i] = total;
total += temp;
}
// put in their places
2020-01-31 06:24:37 +08:00
for (int i = st.begin; i < st.begin + st.size; i++) {
2017-05-26 04:48:44 +08:00
getCharacter(points[i], st.character, c);
newPoints[counts[c]++] = points[i];
}
2020-01-31 06:24:37 +08:00
// copy back into original points array
for (int i = 0; i < st.size; i++) points[st.begin + i] = newPoints[i];
2017-05-26 04:48:44 +08:00
}
2020-01-31 06:24:37 +08:00
// cout << endl << "Radix sort done" << endl;
2017-05-26 04:48:44 +08:00
}
2020-01-31 06:24:37 +08:00
class SkipList : NonCopyable {
2017-05-26 04:48:44 +08:00
private:
static const int MaxLevels = 26;
int randomLevel() {
/*int l = 0;
while (deterministicRandom()->random01() < 0.5 && l < MaxLevels-1) l++;
2017-05-26 04:48:44 +08:00
return l; */
2020-01-31 06:24:37 +08:00
// deterministicRandom()->randomInt(0, 1<<(MaxLevels-1));
uint32_t i = uint32_t(skfastrand()) >> (32 - (MaxLevels - 1));
2017-05-26 04:48:44 +08:00
int level = 0;
2020-01-31 06:24:37 +08:00
while (i & 1) {
i >>= 1;
2017-05-26 04:48:44 +08:00
level++;
}
2020-01-31 06:24:37 +08:00
ASSERT(level < MaxLevels);
2017-05-26 04:48:44 +08:00
return level;
}
/*
struct Node {
2020-01-31 06:24:37 +08:00
int nPointers, valueLength;
Node *pointers[nPointers];
Version maxVersions[nPointers];
char value[valueLength];
2017-05-26 04:48:44 +08:00
};
*/
struct Node {
2020-01-31 06:24:37 +08:00
int level() { return nPointers - 1; }
uint8_t* value() { return end() + nPointers * (sizeof(Node*) + sizeof(Version)); }
2017-05-26 04:48:44 +08:00
int length() { return valueLength; }
Node* getNext(int i) { return *((Node**)end() + i); }
2020-01-31 06:24:37 +08:00
void setNext(int i, Node* n) {
*((Node**)end() + i) = n;
#if defined(_DEBUG) || 1
/*if (n && n->level() < i)
*(volatile int*)0 = 0;*/
#endif
2017-05-26 04:48:44 +08:00
}
2020-01-31 06:24:37 +08:00
Version getMaxVersion(int i) { return ((Version*)(end() + nPointers * sizeof(Node*)))[i]; }
void setMaxVersion(int i, Version v) { ((Version*)(end() + nPointers * sizeof(Node*)))[i] = v; }
2017-05-26 04:48:44 +08:00
// Return a node with initialized value but uninitialized pointers
2020-01-31 06:24:37 +08:00
static Node* create(const StringRef& value, int level) {
int nodeSize = sizeof(Node) + value.size() + (level + 1) * (sizeof(Node*) + sizeof(Version));
2017-05-26 04:48:44 +08:00
Node* n;
if (nodeSize <= 64) {
n = (Node*)FastAllocator<64>::allocate();
INSTRUMENT_ALLOCATE("SkipListNode64");
} else if (nodeSize <= 128) {
n = (Node*)FastAllocator<128>::allocate();
INSTRUMENT_ALLOCATE("SkipListNode128");
} else {
2020-01-31 06:24:37 +08:00
n = (Node*)new char[nodeSize];
2017-05-26 04:48:44 +08:00
INSTRUMENT_ALLOCATE("SkipListNodeLarge");
}
2020-01-31 06:24:37 +08:00
n->nPointers = level + 1;
2017-05-26 04:48:44 +08:00
n->valueLength = value.size();
if (value.size() > 0) {
memcpy(n->value(), value.begin(), value.size());
}
2017-05-26 04:48:44 +08:00
return n;
}
// pre: level>0, all lower level nodes between this and getNext(level) have correct maxversions
2020-01-31 06:24:37 +08:00
void calcVersionForLevel(int level) {
Node* end = getNext(level);
Version v = getMaxVersion(level - 1);
for (Node* x = getNext(level - 1); x != end; x = x->getNext(level - 1))
v = max(v, x->getMaxVersion(level - 1));
2017-05-26 04:48:44 +08:00
setMaxVersion(level, v);
}
void destroy() {
int nodeSize = getNodeSize();
if (nodeSize <= 64) {
FastAllocator<64>::release(this);
INSTRUMENT_RELEASE("SkipListNode64");
} else if (nodeSize <= 128) {
FastAllocator<128>::release(this);
INSTRUMENT_RELEASE("SkipListNode128");
} else {
2020-01-31 06:24:37 +08:00
delete[](char*) this;
2017-05-26 04:48:44 +08:00
INSTRUMENT_RELEASE("SkipListNodeLarge");
}
}
2020-01-31 06:24:37 +08:00
2017-05-26 04:48:44 +08:00
private:
2020-01-31 06:24:37 +08:00
int getNodeSize() { return sizeof(Node) + valueLength + nPointers * (sizeof(Node*) + sizeof(Version)); }
uint8_t* end() { return (uint8_t*)(this + 1); }
int nPointers, valueLength;
2017-05-26 04:48:44 +08:00
};
2020-01-31 06:24:37 +08:00
static force_inline bool less(const uint8_t* a, int aLen, const uint8_t* b, int bLen) {
2017-05-26 04:48:44 +08:00
int len = min(aLen, bLen);
2020-01-31 06:24:37 +08:00
for (int i = 0; i < len; i++)
2017-05-26 04:48:44 +08:00
if (a[i] < b[i])
return true;
else if (a[i] > b[i])
return false;
/*int c = memcmp(a,b,min(aLen,bLen));
if (c<0) return true;
if (c>0) return false;*/
return aLen < bLen;
}
2020-01-31 06:24:37 +08:00
Node* header;
2017-05-26 04:48:44 +08:00
void destroy() {
Node *next, *x;
2020-01-31 06:24:37 +08:00
for (x = header; x; x = next) {
2017-05-26 04:48:44 +08:00
next = x->getNext(0);
x->destroy();
}
}
2020-01-31 06:24:37 +08:00
2017-05-26 04:48:44 +08:00
public:
2020-01-31 06:24:37 +08:00
struct Finger {
Node* finger[MaxLevels]; // valid for levels >= level
2017-05-26 04:48:44 +08:00
int level;
Node* x;
2020-01-31 06:24:37 +08:00
Node* alreadyChecked;
2017-05-26 04:48:44 +08:00
StringRef value;
Finger() : level(MaxLevels), x(NULL), alreadyChecked(NULL) {}
2020-01-31 06:24:37 +08:00
Finger(Node* header, const StringRef& ptr) : value(ptr), level(MaxLevels), alreadyChecked(NULL), x(header) {}
2017-05-26 04:48:44 +08:00
2020-01-31 06:24:37 +08:00
void init(const StringRef& value, Node* header) {
2017-05-26 04:48:44 +08:00
this->value = value;
x = header;
alreadyChecked = NULL;
level = MaxLevels;
}
// pre: !finished()
force_inline void prefetch() {
2020-01-31 06:24:37 +08:00
Node* next = x->getNext(level - 1);
_mm_prefetch((const char*)next, _MM_HINT_T0);
// if ( (((intptr_t)next) & 64) == 0 )
_mm_prefetch((const char*)next + 64, _MM_HINT_T0);
2017-05-26 04:48:44 +08:00
//_mm_prefetch( (const char*)next+128, _MM_HINT_T0 );
//_mm_prefetch( (const char*)next+192, _MM_HINT_T0 );
//_mm_prefetch( (const char*)next+256, _MM_HINT_T0 );
//_mm_prefetch( (const char*)next+320, _MM_HINT_T0 );
}
// pre: !finished()
// Returns true if we have advanced to the next level
force_inline bool advance() {
2020-01-31 06:24:37 +08:00
Node* next = x->getNext(level - 1);
2017-05-26 04:48:44 +08:00
if (next == alreadyChecked || !less(next->value(), next->length(), value.begin(), value.size())) {
alreadyChecked = next;
level--;
finger[level] = x;
return true;
} else {
x = next;
return false;
}
}
// pre: !finished()
force_inline void nextLevel() {
2020-01-31 06:24:37 +08:00
while (!advance())
;
2017-05-26 04:48:44 +08:00
}
2020-01-31 06:24:37 +08:00
force_inline bool finished() { return level == 0; }
2017-05-26 04:48:44 +08:00
force_inline Node* found() const {
// valid after finished returns true
2020-01-31 06:24:37 +08:00
Node* n = finger[0]->getNext(0); // or alreadyChecked, but that is more easily invalidated
2017-05-26 04:48:44 +08:00
if (n && n->length() == value.size() && !memcmp(n->value(), value.begin(), value.size()))
return n;
2020-01-31 06:24:37 +08:00
else
2017-05-26 04:48:44 +08:00
return NULL;
}
StringRef getValue() const {
Node* n = finger[0]->getNext(0);
2020-01-31 06:24:37 +08:00
return n ? StringRef(n->value(), n->length()) : StringRef();
2017-05-26 04:48:44 +08:00
}
};
int count() {
int count = 0;
Node* x = header->getNext(0);
2020-01-31 06:24:37 +08:00
while (x) {
x = x->getNext(0);
count++;
2017-05-26 04:48:44 +08:00
}
return count;
}
2020-01-31 06:24:37 +08:00
explicit SkipList(Version version = 0) {
header = Node::create(StringRef(), MaxLevels - 1);
for (int l = 0; l < MaxLevels; l++) {
2017-05-26 04:48:44 +08:00
header->setNext(l, NULL);
header->setMaxVersion(l, version);
}
}
2020-01-31 06:24:37 +08:00
~SkipList() { destroy(); }
SkipList(SkipList&& other) BOOST_NOEXCEPT : header(other.header) { other.header = NULL; }
void operator=(SkipList&& other) BOOST_NOEXCEPT {
2017-05-26 04:48:44 +08:00
destroy();
header = other.header;
other.header = NULL;
}
2020-01-31 06:24:37 +08:00
void swap(SkipList& other) { std::swap(header, other.header); }
2017-05-26 04:48:44 +08:00
2020-01-31 06:24:37 +08:00
void addConflictRanges(const Finger* fingers, int rangeCount, Version version) {
for (int r = rangeCount - 1; r >= 0; r--) {
const Finger& startF = fingers[r * 2];
const Finger& endF = fingers[r * 2 + 1];
2017-05-26 04:48:44 +08:00
2020-01-31 06:24:37 +08:00
if (endF.found() == NULL) insert(endF, endF.finger[0]->getMaxVersion(0));
2017-05-26 04:48:44 +08:00
2020-01-31 06:24:37 +08:00
remove(startF, endF);
insert(startF, version);
2017-05-26 04:48:44 +08:00
}
}
2020-01-31 06:24:37 +08:00
void detectConflicts(ReadConflictRange* ranges, int count, bool* transactionConflictStatus) {
2017-05-26 04:48:44 +08:00
const int M = 16;
int nextJob[M];
2020-01-31 06:24:37 +08:00
CheckMax inProgress[M];
2017-05-26 04:48:44 +08:00
if (!count) return;
2020-01-31 06:24:37 +08:00
int started = min(M, count);
for (int i = 0; i < started; i++) {
inProgress[i].init(ranges[i], header, transactionConflictStatus);
nextJob[i] = i + 1;
2017-05-26 04:48:44 +08:00
}
2020-01-31 06:24:37 +08:00
nextJob[started - 1] = 0;
2017-05-26 04:48:44 +08:00
2020-01-31 06:24:37 +08:00
int prevJob = started - 1;
2017-05-26 04:48:44 +08:00
int job = 0;
// vtune: 340 parts
while (true) {
if (inProgress[job].advance()) {
2020-01-31 06:24:37 +08:00
if (started == count) {
2017-05-26 04:48:44 +08:00
if (prevJob == job) break;
nextJob[prevJob] = nextJob[job];
job = prevJob;
2020-01-31 06:24:37 +08:00
} else
inProgress[job].init(ranges[started++], header, transactionConflictStatus);
2017-05-26 04:48:44 +08:00
}
prevJob = job;
job = nextJob[job];
}
}
// Splits the version history represented by this skiplist into separate key ranges
// delimited by the given array of keys. This SkipList is left empty. this->partition
// is intended to be followed by a call to this->concatenate() recombining the same
// partitions. In between, operations on each partition must not touch any keys outside
// the partition. Specifically, the partition to the left of 'key' must not have a range
// [...,key) inserted, since that would insert an entry at 'key'.
2020-01-31 06:24:37 +08:00
void partition(StringRef* begin, int splitCount, SkipList* output) {
for (int i = splitCount - 1; i >= 0; i--) {
Finger f(header, begin[i]);
while (!f.finished()) f.nextLevel();
split(f, output[i + 1]);
2017-05-26 04:48:44 +08:00
}
swap(output[0]);
}
2020-01-31 06:24:37 +08:00
void concatenate(SkipList* input, int count) {
std::vector<Finger> ends(count - 1);
for (int i = 0; i < ends.size(); i++) input[i].getEnd(ends[i]);
for (int l = 0; l < MaxLevels; l++) {
for (int i = ends.size() - 1; i >= 0; i--) {
ends[i].finger[l]->setNext(l, input[i + 1].header->getNext(l));
if (l && (!i || ends[i].finger[l] != input[i].header)) ends[i].finger[l]->calcVersionForLevel(l);
input[i + 1].header->setNext(l, NULL);
2017-05-26 04:48:44 +08:00
}
}
swap(input[0]);
}
2020-01-31 06:24:37 +08:00
void find(const StringRef* values, Finger* results, int* temp, int count) {
// Relying on the ordering of values, descend until the values aren't all in the
2017-05-26 04:48:44 +08:00
// same part of the tree
// vtune: 11 parts
2020-01-31 06:24:37 +08:00
results[0].init(values[0], header);
const StringRef& endValue = values[count - 1];
while (results[0].level > 1) {
2017-05-26 04:48:44 +08:00
results[0].nextLevel();
Node* ac = results[0].alreadyChecked;
2020-01-31 06:24:37 +08:00
if (ac && less(ac->value(), ac->length(), endValue.begin(), endValue.size())) break;
2017-05-26 04:48:44 +08:00
}
// Init all the other fingers to start descending where we stopped
// the first one
// SOMEDAY: this loop showed up on vtune, could be faster?
// vtune: 8 parts
2020-01-31 06:24:37 +08:00
int startLevel = results[0].level + 1;
Node* x = startLevel < MaxLevels ? results[0].finger[startLevel] : header;
for (int i = 1; i < count; i++) {
2017-05-26 04:48:44 +08:00
results[i].level = startLevel;
results[i].x = x;
results[i].alreadyChecked = NULL;
results[i].value = values[i];
2020-01-31 06:24:37 +08:00
for (int j = startLevel; j < MaxLevels; j++) results[i].finger[j] = results[0].finger[j];
2017-05-26 04:48:44 +08:00
}
int* nextJob = temp;
2020-01-31 06:24:37 +08:00
for (int i = 0; i < count - 1; i++) nextJob[i] = i + 1;
nextJob[count - 1] = 0;
2017-05-26 04:48:44 +08:00
2020-01-31 06:24:37 +08:00
int prevJob = count - 1;
2017-05-26 04:48:44 +08:00
int job = 0;
// vtune: 225 parts
while (true) {
Finger* f = &results[job];
f->advance();
if (f->finished()) {
if (prevJob == job) break;
nextJob[prevJob] = nextJob[job];
2020-01-31 06:24:37 +08:00
} else {
2017-05-26 04:48:44 +08:00
f->prefetch();
prevJob = job;
}
job = nextJob[job];
}
}
/*Finger randomFinger() {
2020-01-31 06:24:37 +08:00
// Written, not exactly uniform, not tested
Finger f( header, StringRef() );
Node* begin = header, *end = 0;
for(int lev = MaxLevels-1; lev>=0; lev--) {
int length = 0;
for( Node* x = begin; x != end; x=x->getNext(lev) )
length++;
if (length == 1) { // forced down
f.finger[lev] = begin;
} else {
int c = deterministicRandom()->randomInt(0, length);
for( Node* x = begin; x != end; x=x->getNext(lev) )
if (!c--) {
f.finger[lev] = begin = x;
end = x->getNext(lev);
break;
}
}
}
f.level = 0;
return f;
2017-05-26 04:48:44 +08:00
}*/
2020-01-31 06:24:37 +08:00
int removeBefore(Version v, Finger& f, int nodeCount) {
2017-05-26 04:48:44 +08:00
/*Finger f( header, StringRef() );
for(int i=0; i<MaxLevels; i++)
2020-01-31 06:24:37 +08:00
f.finger[i] = header;
2017-05-26 04:48:44 +08:00
f.level = 0;*/
// f.x, f.alreadyChecked?
int removedCount = 0;
bool wasAbove = true;
while (nodeCount--) {
Node* x = f.finger[0]->getNext(0);
if (!x) break;
2020-01-31 06:24:37 +08:00
2017-05-26 04:48:44 +08:00
// double prefetch gives +25% speed (single threaded)
Node* next = x->getNext(0);
2020-01-31 06:24:37 +08:00
_mm_prefetch((const char*)next, _MM_HINT_T0);
2017-05-26 04:48:44 +08:00
//_mm_prefetch( (const char*)next+64, _MM_HINT_T0 );
next = x->getNext(1);
2020-01-31 06:24:37 +08:00
_mm_prefetch((const char*)next, _MM_HINT_T0);
2017-05-26 04:48:44 +08:00
//_mm_prefetch( (const char*)next+64, _MM_HINT_T0 );
bool isAbove = x->getMaxVersion(0) >= v;
2020-01-31 06:24:37 +08:00
if (isAbove || wasAbove) { // f.nextItem
for (int l = 0; l <= x->level(); l++) f.finger[l] = x;
} else { // f.eraseItem
2017-05-26 04:48:44 +08:00
removedCount++;
2020-01-31 06:24:37 +08:00
for (int l = 0; l <= x->level(); l++) f.finger[l]->setNext(l, x->getNext(l));
for (int i = 1; i <= x->level(); i++)
f.finger[i]->setMaxVersion(i, max(f.finger[i]->getMaxVersion(i), x->getMaxVersion(i)));
2017-05-26 04:48:44 +08:00
x->destroy();
}
wasAbove = isAbove;
}
return removedCount;
}
private:
2020-01-31 06:24:37 +08:00
void remove(const Finger& start, const Finger& end) {
if (start.finger[0] == end.finger[0]) return;
2017-05-26 04:48:44 +08:00
2020-01-31 06:24:37 +08:00
Node* x = start.finger[0]->getNext(0);
2017-05-26 04:48:44 +08:00
// vtune says: this loop is the expensive parts (6 parts)
2020-01-31 06:24:37 +08:00
for (int i = 0; i < MaxLevels; i++)
if (start.finger[i] != end.finger[i]) start.finger[i]->setNext(i, end.finger[i]->getNext(i));
2017-05-26 04:48:44 +08:00
while (true) {
Node* next = x->getNext(0);
x->destroy();
if (x == end.finger[0]) break;
x = next;
}
}
2020-01-31 06:24:37 +08:00
// void insert( const std::string& v, Version version ) { insert(StringRef(v), version); }
2017-05-26 04:48:44 +08:00
2020-01-31 06:24:37 +08:00
void insert(const Finger& f, Version version) {
2017-05-26 04:48:44 +08:00
int level = randomLevel();
2020-01-31 06:24:37 +08:00
// cout << std::string((const char*)value,length) << " level: " << level << endl;
Node* x = Node::create(f.value, level);
2017-05-26 04:48:44 +08:00
x->setMaxVersion(0, version);
2020-01-31 06:24:37 +08:00
for (int i = 0; i <= level; i++) {
2017-05-26 04:48:44 +08:00
x->setNext(i, f.finger[i]->getNext(i));
f.finger[i]->setNext(i, x);
}
// vtune says: this loop is the costly part of this function
2020-01-31 06:24:37 +08:00
for (int i = 1; i <= level; i++) {
2017-05-26 04:48:44 +08:00
f.finger[i]->calcVersionForLevel(i);
x->calcVersionForLevel(i);
}
2020-01-31 06:24:37 +08:00
for (int i = level + 1; i < MaxLevels; i++) {
2017-05-26 04:48:44 +08:00
Version v = f.finger[i]->getMaxVersion(i);
if (v >= version) break;
f.finger[i]->setMaxVersion(i, version);
}
}
2020-01-31 06:24:37 +08:00
void insert(const StringRef& value, Version version) {
2017-05-26 04:48:44 +08:00
Finger f(header, value);
2020-01-31 06:24:37 +08:00
while (!f.finished()) f.nextLevel();
2017-05-26 04:48:44 +08:00
// SOMEDAY: equality?
2020-01-31 06:24:37 +08:00
insert(f, version);
2017-05-26 04:48:44 +08:00
}
struct CheckMax {
Finger start, end;
Version version;
2020-01-31 06:24:37 +08:00
bool* result;
2017-05-26 04:48:44 +08:00
int state;
2020-01-31 06:24:37 +08:00
void init(const ReadConflictRange& r, Node* header, bool* tCS) {
this->start.init(r.begin, header);
this->end.init(r.end, header);
2017-05-26 04:48:44 +08:00
this->version = r.version;
2020-01-31 06:24:37 +08:00
result = &tCS[r.transaction];
2017-05-26 04:48:44 +08:00
this->state = 0;
}
bool noConflict() { return true; }
2020-01-31 06:24:37 +08:00
bool conflict() {
*result = true;
return true;
}
2017-05-26 04:48:44 +08:00
// Return true if finished
force_inline bool advance() {
switch (state) {
case 0:
// find where start and end fingers diverge
while (true) {
if (!start.advance()) {
start.prefetch();
return false;
}
end.x = start.x;
2020-01-31 06:24:37 +08:00
while (!end.advance())
;
2017-05-26 04:48:44 +08:00
int l = start.level;
2020-01-31 06:24:37 +08:00
if (start.finger[l] != end.finger[l]) break;
2017-05-26 04:48:44 +08:00
// accept if the range spans the check range, but does not have a greater version
2020-01-31 06:24:37 +08:00
if (start.finger[l]->getMaxVersion(l) <= version) return noConflict();
if (l == 0) return conflict();
2017-05-26 04:48:44 +08:00
}
state = 1;
2020-01-31 06:24:37 +08:00
case 1: {
// check the end side of the pyramid
Node* e = end.finger[end.level];
while (e->getMaxVersion(end.level) > version) {
if (end.finished()) return conflict();
end.nextLevel();
Node* f = end.finger[end.level];
while (e != f) {
if (e->getMaxVersion(end.level) > version) return conflict();
e = e->getNext(end.level);
2017-05-26 04:48:44 +08:00
}
2020-01-31 06:24:37 +08:00
}
2017-05-26 04:48:44 +08:00
2020-01-31 06:24:37 +08:00
// check the start side of the pyramid
Node* s = end.finger[start.level];
while (true) {
Node* nextS = start.finger[start.level]->getNext(start.level);
Node* p = nextS;
while (p != s) {
if (p->getMaxVersion(start.level) > version) return conflict();
p = p->getNext(start.level);
}
if (start.finger[start.level]->getMaxVersion(start.level) <= version) return noConflict();
s = nextS;
if (start.finished()) {
if (nextS->length() == start.value.size() &&
!memcmp(nextS->value(), start.value.begin(), start.value.size()))
2017-05-26 04:48:44 +08:00
return noConflict();
2020-01-31 06:24:37 +08:00
else
return conflict();
2017-05-26 04:48:44 +08:00
}
2020-01-31 06:24:37 +08:00
start.nextLevel();
2017-05-26 04:48:44 +08:00
}
2020-01-31 06:24:37 +08:00
}
2017-05-26 04:48:44 +08:00
default:
__assume(false);
}
}
};
2020-01-31 06:24:37 +08:00
void split(const Finger& f, SkipList& right) {
ASSERT(!right.header->getNext(0)); // right must be empty
2017-05-26 04:48:44 +08:00
right.header->setMaxVersion(0, f.finger[0]->getMaxVersion(0));
2020-01-31 06:24:37 +08:00
for (int l = 0; l < MaxLevels; l++) {
2017-05-26 04:48:44 +08:00
right.header->setNext(l, f.finger[l]->getNext(l));
f.finger[l]->setNext(l, NULL);
/*if (l) {
2020-01-31 06:24:37 +08:00
// SOMEDAY: Do we actually need these?
right.header->calcVersionForLevel(l);
f.finger[l]->calcVersionForLevel(l);
2017-05-26 04:48:44 +08:00
}*/
}
2020-01-31 06:24:37 +08:00
}
2017-05-26 04:48:44 +08:00
2020-01-31 06:24:37 +08:00
void getEnd(Finger& end) {
2017-05-26 04:48:44 +08:00
Node* node = header;
2020-01-31 06:24:37 +08:00
for (int l = MaxLevels - 1; l >= 0; l--) {
2017-05-26 04:48:44 +08:00
Node* next;
2020-01-31 06:24:37 +08:00
while ((next = node->getNext(l)) != NULL) node = next;
2017-05-26 04:48:44 +08:00
end.finger[l] = node;
}
end.level = 0;
// SOMEDAY: end.x? end.alreadyChecked?
/*end = Finger(header, (const uint8_t*)"\xff\xff\xff\xff\xff\xff", 6);
while (!end.finished())
2020-01-31 06:24:37 +08:00
end.nextLevel();*/
2017-05-26 04:48:44 +08:00
}
};
2020-01-31 06:24:37 +08:00
struct Action {
virtual void operator()() = 0; // self-destructs
2017-05-26 04:48:44 +08:00
};
typedef Action* PAction;
template <class F>
2020-01-31 06:24:37 +08:00
PAction action(F&& f) {
2017-05-26 04:48:44 +08:00
struct FAction : Action, F, FastAllocated<FAction> {
2020-01-31 06:24:37 +08:00
FAction(F&& f) : F(std::move(f)) {}
virtual void operator()() {
F::operator()();
delete this;
}
2017-05-26 04:48:44 +08:00
};
2020-01-31 06:24:37 +08:00
return new FAction(std::move(f));
2017-05-26 04:48:44 +08:00
};
2020-01-31 06:24:37 +08:00
void workerThread(PAction* nextAction, Event* nextActionReady, int index, Event* whenFinished) {
2017-05-26 04:48:44 +08:00
ASSERT(false);
/*
inThread<Void>( [nextAction,nextActionReady,index,whenFinished]()->Void {
2020-01-31 06:24:37 +08:00
g_seed = index*123; fastrand();
setAffinity( index );
while (true) {
try {
nextActionReady->block(); // auto-reset
Action* action = *nextAction;
*nextAction = 0;
if (!action) break;
(*action)();
} catch (Error& e) {
fprintf(stderr, "Error in worker thread: %s\n", e.what());
} catch (...) {
fprintf(stderr, "Error in worker thread: %s\n", unknown_error().what());
}
}
//cout << "Worker thread finished" << endl;
whenFinished->set();
return Void();
2017-05-26 04:48:44 +08:00
});*/
}
2020-01-31 06:24:37 +08:00
StringRef setK(Arena& arena, int i) {
char t[sizeof(i)];
2017-05-26 04:48:44 +08:00
*(int*)t = i;
const int keySize = 16;
2020-01-31 06:24:37 +08:00
char* ss = new (arena) char[keySize];
for (int c = 0; c < keySize - sizeof(i); c++) ss[c] = '.';
for (int c = 0; c < sizeof(i); c++) ss[c + keySize - sizeof(i)] = t[sizeof(i) - 1 - c];
2017-05-26 04:48:44 +08:00
2020-01-31 06:24:37 +08:00
return StringRef((const uint8_t*)ss, keySize);
2017-05-26 04:48:44 +08:00
}
#include "fdbserver/ConflictSet.h"
2017-05-26 04:48:44 +08:00
struct ConflictSet {
ConflictSet() : oldestVersion(0) {
static_assert(PARALLEL_THREAD_COUNT == 0, "workerThread() not implemented");
2020-01-31 06:24:37 +08:00
static_assert(PARALLEL_THREAD_COUNT == 0 || FASTALLOC_THREAD_SAFE,
"Thread safe fast allocator required for multithreaded conflict set");
2017-05-26 04:48:44 +08:00
for (int i = 0; i < PARALLEL_THREAD_COUNT; i++) {
2020-01-31 06:24:37 +08:00
worker_nextAction.push_back(NULL);
worker_ready.push_back(new Event);
worker_finished.push_back(new Event);
2017-05-26 04:48:44 +08:00
}
2020-01-31 06:24:37 +08:00
for (int t = 0; t < worker_nextAction.size(); t++)
workerThread(&worker_nextAction[t], worker_ready[t], (t)*2, worker_finished[t]);
2017-05-26 04:48:44 +08:00
}
~ConflictSet() {
2020-01-31 06:24:37 +08:00
for (int i = 0; i < worker_nextAction.size(); i++) {
2017-05-26 04:48:44 +08:00
worker_nextAction[i] = 0;
worker_ready[i]->set();
}
// Wait for workers to terminate; otherwise can get crashes at shutdown time
2020-01-31 06:24:37 +08:00
for (int i = 0; i < worker_finished.size(); i++) worker_finished[i]->block();
2017-05-26 04:48:44 +08:00
}
SkipList versionHistory;
Key removalKey;
Version oldestVersion;
std::vector<PAction> worker_nextAction;
std::vector<Event*> worker_ready;
std::vector<Event*> worker_finished;
2017-05-26 04:48:44 +08:00
};
2020-01-31 06:24:37 +08:00
ConflictSet* newConflictSet() {
return new ConflictSet;
}
void clearConflictSet(ConflictSet* cs, Version v) {
SkipList(v).swap(cs->versionHistory);
2017-05-26 04:48:44 +08:00
}
void destroyConflictSet(ConflictSet* cs) {
delete cs;
}
2020-01-31 06:24:37 +08:00
ConflictBatch::ConflictBatch(ConflictSet* cs) : cs(cs), transactionCount(0) {}
2017-05-26 04:48:44 +08:00
2020-01-31 06:24:37 +08:00
ConflictBatch::~ConflictBatch() {}
2017-05-26 04:48:44 +08:00
struct TransactionInfo {
2020-01-31 06:24:37 +08:00
VectorRef<std::pair<int, int>> readRanges;
VectorRef<std::pair<int, int>> writeRanges;
2017-05-26 04:48:44 +08:00
bool tooOld;
};
2020-01-31 06:24:37 +08:00
void ConflictBatch::addTransaction(const CommitTransactionRef& tr) {
2017-05-26 04:48:44 +08:00
int t = transactionCount++;
Arena& arena = transactionInfo.arena();
TransactionInfo* info = new (arena) TransactionInfo;
if (tr.read_snapshot < cs->oldestVersion && tr.read_conflict_ranges.size()) {
info->tooOld = true;
} else {
info->tooOld = false;
2020-01-31 06:24:37 +08:00
info->readRanges.resize(arena, tr.read_conflict_ranges.size());
info->writeRanges.resize(arena, tr.write_conflict_ranges.size());
2017-05-26 04:48:44 +08:00
std::vector<KeyInfo>& points = this->points;
2020-01-31 06:24:37 +08:00
for (int r = 0; r < tr.read_conflict_ranges.size(); r++) {
2017-05-26 04:48:44 +08:00
const KeyRangeRef& range = tr.read_conflict_ranges[r];
points.emplace_back(range.begin, false, true, false, t, &info->readRanges[r].first);
2020-01-31 06:24:37 +08:00
// points.back().keyEnd = StringRef(buf,range.second);
points.emplace_back(range.end, false, false, false, t, &info->readRanges[r].second);
combinedReadConflictRanges.emplace_back(range.begin, range.end, tr.read_snapshot, t);
2017-05-26 04:48:44 +08:00
}
2020-01-31 06:24:37 +08:00
for (int r = 0; r < tr.write_conflict_ranges.size(); r++) {
2017-05-26 04:48:44 +08:00
const KeyRangeRef& range = tr.write_conflict_ranges[r];
points.emplace_back(range.begin, false, true, true, t, &info->writeRanges[r].first);
points.emplace_back(range.end, false, false, true, t, &info->writeRanges[r].second);
2017-05-26 04:48:44 +08:00
}
}
2020-01-31 06:24:37 +08:00
this->transactionInfo.push_back(arena, info);
2017-05-26 04:48:44 +08:00
}
class MiniConflictSet2 : NonCopyable {
std::vector<bool> values;
2020-01-31 06:24:37 +08:00
2017-05-26 04:48:44 +08:00
public:
2020-01-31 06:24:37 +08:00
explicit MiniConflictSet2(int size) { values.assign(size, false); }
void set(int begin, int end) {
for (int i = begin; i < end; i++) values[i] = true;
2017-05-26 04:48:44 +08:00
}
2020-01-31 06:24:37 +08:00
bool any(int begin, int end) {
for (int i = begin; i < end; i++)
if (values[i]) return true;
2017-05-26 04:48:44 +08:00
return false;
}
};
class MiniConflictSet : NonCopyable {
typedef uint64_t wordType;
2020-01-31 06:24:37 +08:00
enum { bucketShift = 6, bucketMask = sizeof(wordType) * 8 - 1 };
std::vector<wordType> values; // undefined when andValues is true for a range of values
std::vector<wordType> orValues;
std::vector<wordType> andValues;
2020-01-31 06:24:37 +08:00
MiniConflictSet2 debug; // SOMEDAY: Test on big ranges, eliminate this
2017-05-26 04:48:44 +08:00
2019-12-06 07:03:04 +08:00
wordType bitMask(unsigned int bit) { // computes results for bit%word
2019-12-06 03:12:59 +08:00
return (((wordType)1) << (bit & bucketMask));
2017-05-26 04:48:44 +08:00
}
2020-01-31 06:24:37 +08:00
void setNthBit(std::vector<wordType>& v, const unsigned int bit) { v[bit >> bucketShift] |= bitMask(bit); }
void clearNthBit(std::vector<wordType>& v, const unsigned int bit) { v[bit >> bucketShift] &= ~(bitMask(bit)); }
bool getNthBit(const std::vector<wordType>& v, const unsigned int bit) {
return (v[bit >> bucketShift] & bitMask(bit)) != 0;
2017-05-26 04:48:44 +08:00
}
2020-01-31 06:24:37 +08:00
int wordsForNBits(unsigned int bits) { return (bits + ((1 << bucketShift) - 1)) >> bucketShift; }
wordType highBits(int b) { // bits (b&bucketMask) and higher are 1
#pragma warning(disable : 4146)
return -bitMask(b);
2020-01-31 06:24:37 +08:00
#pragma warning(default : 4146)
2017-05-26 04:48:44 +08:00
}
2019-12-06 03:12:59 +08:00
wordType lowBits(int b) { // bits lower than (b&bucketMask) are 1
return bitMask(b) - 1;
2017-05-26 04:48:44 +08:00
}
2020-01-31 06:24:37 +08:00
wordType lowBits2(int b) { return (b & bucketMask) ? lowBits(b) : -1; }
2017-05-26 04:48:44 +08:00
2020-01-31 06:24:37 +08:00
void setBits(std::vector<wordType>& v, int bitBegin, int bitEnd, bool fillMiddle) {
if (bitBegin >= bitEnd) return;
int beginWord = bitBegin >> bucketShift;
int lastWord = ((bitEnd + bucketMask) >> bucketShift) - 1;
if (beginWord == lastWord) {
2017-05-26 04:48:44 +08:00
v[beginWord] |= highBits(bitBegin) & lowBits2(bitEnd);
} else {
v[beginWord] |= highBits(bitBegin);
if (fillMiddle)
2020-01-31 06:24:37 +08:00
for (int w = beginWord + 1; w < lastWord; w++) v[w] = wordType(-1);
2017-05-26 04:48:44 +08:00
v[lastWord] |= lowBits2(bitEnd);
}
}
bool orBits(std::vector<wordType>& v, int bitBegin, int bitEnd, bool getMiddle) {
2017-05-26 04:48:44 +08:00
if (bitBegin >= bitEnd) return false;
int beginWord = bitBegin >> bucketShift;
2020-01-31 06:24:37 +08:00
int lastWord = ((bitEnd + bucketMask) >> bucketShift) - 1;
2017-05-26 04:48:44 +08:00
if (beginWord == lastWord)
return (v[beginWord] & highBits(bitBegin) & lowBits2(bitEnd)) != 0;
else {
if (getMiddle)
2020-01-31 06:24:37 +08:00
for (int w = beginWord + 1; w < lastWord; w++)
if (v[w]) return true;
2017-05-26 04:48:44 +08:00
return ((v[beginWord] & highBits(bitBegin)) | (v[lastWord] & lowBits2(bitEnd))) != 0;
}
}
bool orImpl(int begin, int end) {
if (begin == end) return false;
int beginWord = begin >> bucketShift;
int lastWord = ((end + bucketMask) >> bucketShift) - 1;
return orBits(orValues, beginWord + 1, lastWord, true) || getNthBit(andValues, beginWord) ||
getNthBit(andValues, lastWord) || orBits(values, begin, end, false);
}
2017-05-26 04:48:44 +08:00
public:
2020-01-31 06:24:37 +08:00
explicit MiniConflictSet(int size) : debug(size) {
static_assert((1 << bucketShift) == sizeof(wordType) * 8, "BucketShift incorrect");
2017-05-26 04:48:44 +08:00
2020-01-31 06:24:37 +08:00
values.assign(wordsForNBits(size), false);
orValues.assign(wordsForNBits(wordsForNBits(size)), false);
andValues.assign(wordsForNBits(wordsForNBits(size)), false);
2017-05-26 04:48:44 +08:00
}
2020-01-31 06:24:37 +08:00
void set(int begin, int end) {
debug.set(begin, end);
2017-05-26 04:48:44 +08:00
if (begin == end) return;
2020-01-31 06:24:37 +08:00
int beginWord = begin >> bucketShift;
int lastWord = ((end + bucketMask) >> bucketShift) - 1;
2017-05-26 04:48:44 +08:00
setBits(values, begin, end, false);
2020-01-31 06:24:37 +08:00
setBits(andValues, beginWord + 1, lastWord, true);
setBits(orValues, beginWord, lastWord + 1, true);
2017-05-26 04:48:44 +08:00
}
bool any(int begin, int end) {
2020-01-31 06:24:37 +08:00
bool a = orImpl(begin, end);
bool b = debug.any(begin, end);
ASSERT(a == b);
2017-05-26 04:48:44 +08:00
return b;
}
};
void ConflictBatch::checkIntraBatchConflicts() {
int index = 0;
2020-01-31 06:24:37 +08:00
for (int p = 0; p < points.size(); p++) *points[p].pIndex = index++;
2017-05-26 04:48:44 +08:00
2020-01-31 06:24:37 +08:00
MiniConflictSet mcs(index);
for (int t = 0; t < transactionInfo.size(); t++) {
2017-05-26 04:48:44 +08:00
const TransactionInfo& tr = *transactionInfo[t];
if (transactionConflictStatus[t]) continue;
bool conflict = tr.tooOld;
2020-01-31 06:24:37 +08:00
for (int i = 0; i < tr.readRanges.size(); i++)
if (mcs.any(tr.readRanges[i].first, tr.readRanges[i].second)) {
2017-05-26 04:48:44 +08:00
conflict = true;
break;
}
transactionConflictStatus[t] = conflict;
if (!conflict)
2020-01-31 06:24:37 +08:00
for (int i = 0; i < tr.writeRanges.size(); i++) mcs.set(tr.writeRanges[i].first, tr.writeRanges[i].second);
2017-05-26 04:48:44 +08:00
}
}
void ConflictBatch::GetTooOldTransactions(std::vector<int>& tooOldTransactions) {
2020-01-31 06:24:37 +08:00
for (int i = 0; i < transactionInfo.size(); i++) {
2017-05-26 04:48:44 +08:00
if (transactionInfo[i]->tooOld) {
tooOldTransactions.push_back(i);
}
}
}
2020-01-31 06:24:37 +08:00
void ConflictBatch::detectConflicts(Version now, Version newOldestVersion, std::vector<int>& nonConflicting,
std::vector<int>* tooOldTransactions) {
2017-05-26 04:48:44 +08:00
double t = timer();
2020-01-31 06:24:37 +08:00
sortPoints(points);
// std::sort( combinedReadConflictRanges.begin(), combinedReadConflictRanges.end() );
g_sort += timer() - t;
2017-05-26 04:48:44 +08:00
2020-01-31 06:24:37 +08:00
transactionConflictStatus = new bool[transactionCount];
memset(transactionConflictStatus, 0, transactionCount * sizeof(bool));
2017-05-26 04:48:44 +08:00
t = timer();
checkReadConflictRanges();
2020-01-31 06:24:37 +08:00
g_checkRead += timer() - t;
2017-05-26 04:48:44 +08:00
t = timer();
checkIntraBatchConflicts();
2020-01-31 06:24:37 +08:00
g_checkBatch += timer() - t;
2017-05-26 04:48:44 +08:00
t = timer();
combineWriteConflictRanges();
2020-01-31 06:24:37 +08:00
g_combine += timer() - t;
2017-05-26 04:48:44 +08:00
t = timer();
mergeWriteConflictRanges(now);
2020-01-31 06:24:37 +08:00
g_merge += timer() - t;
for (int i = 0; i < transactionCount; i++) {
if (!transactionConflictStatus[i]) nonConflicting.push_back(i);
if (tooOldTransactions && transactionInfo[i]->tooOld) tooOldTransactions->push_back(i);
2017-05-26 04:48:44 +08:00
}
delete[] transactionConflictStatus;
t = timer();
if (newOldestVersion > cs->oldestVersion) {
cs->oldestVersion = newOldestVersion;
2020-01-31 06:24:37 +08:00
SkipList::Finger finger;
2017-05-26 04:48:44 +08:00
int temp;
2020-01-31 06:24:37 +08:00
cs->versionHistory.find(&cs->removalKey, &finger, &temp, 1);
cs->versionHistory.removeBefore(cs->oldestVersion, finger, combinedWriteConflictRanges.size() * 3 + 10);
2017-05-26 04:48:44 +08:00
cs->removalKey = finger.getValue();
}
2020-01-31 06:24:37 +08:00
g_removeBefore += timer() - t;
2017-05-26 04:48:44 +08:00
}
void ConflictBatch::checkReadConflictRanges() {
2020-01-31 06:24:37 +08:00
if (!combinedReadConflictRanges.size()) return;
2017-05-26 04:48:44 +08:00
if (PARALLEL_THREAD_COUNT) {
2020-01-31 06:24:37 +08:00
Event done[PARALLEL_THREAD_COUNT ? PARALLEL_THREAD_COUNT : 1];
for (int t = 0; t < PARALLEL_THREAD_COUNT; t++) {
cs->worker_nextAction[t] = action([&, t] {
2017-05-26 04:48:44 +08:00
#pragma GCC diagnostic push
2020-01-31 06:24:37 +08:00
DISABLE_ZERO_DIVISION_FLAG
auto begin =
&combinedReadConflictRanges[0] + t * combinedReadConflictRanges.size() / PARALLEL_THREAD_COUNT;
auto end = &combinedReadConflictRanges[0] +
(t + 1) * combinedReadConflictRanges.size() / PARALLEL_THREAD_COUNT;
2017-05-26 04:48:44 +08:00
#pragma GCC diagnostic pop
2020-01-31 06:24:37 +08:00
cs->versionHistory.detectConflicts(begin, end - begin, transactionConflictStatus);
2017-05-26 04:48:44 +08:00
done[t].set();
});
cs->worker_ready[t]->set();
}
2020-01-31 06:24:37 +08:00
for (int i = 0; i < PARALLEL_THREAD_COUNT; i++) done[i].block();
2017-05-26 04:48:44 +08:00
} else {
2020-01-31 06:24:37 +08:00
cs->versionHistory.detectConflicts(&combinedReadConflictRanges[0], combinedReadConflictRanges.size(),
transactionConflictStatus);
2017-05-26 04:48:44 +08:00
}
}
2020-01-31 06:24:37 +08:00
void ConflictBatch::addConflictRanges(Version now, std::vector<std::pair<StringRef, StringRef>>::iterator begin,
std::vector<std::pair<StringRef, StringRef>>::iterator end, SkipList* part) {
int count = end - begin;
2017-05-26 04:48:44 +08:00
#if 0
//for(auto w = begin; w != end; ++w)
for(auto w = end-1; w != begin-1; --w)
part->addConflictRange( w->first, w->second, now );
#else
2020-01-31 06:24:37 +08:00
static_assert(sizeof(begin[0]) == sizeof(StringRef) * 2,
"Write Conflict Range type not convertible to two StringPtrs");
const StringRef* strings = reinterpret_cast<const StringRef*>(&*begin);
int stringCount = count * 2;
2017-05-26 04:48:44 +08:00
static const int stripeSize = 16;
2020-01-31 06:24:37 +08:00
SkipList::Finger fingers[stripeSize];
int temp[stripeSize];
int stripes = (stringCount + stripeSize - 1) / stripeSize;
int ss = stringCount - (stripes - 1) * stripeSize;
for (int s = stripes - 1; s >= 0; s--) {
part->find(&strings[s * stripeSize], &fingers[0], temp, ss);
part->addConflictRanges(&fingers[0], ss / 2, now);
2017-05-26 04:48:44 +08:00
ss = stripeSize;
}
#endif
}
void ConflictBatch::mergeWriteConflictRanges(Version now) {
2020-01-31 06:24:37 +08:00
if (!combinedWriteConflictRanges.size()) return;
2017-05-26 04:48:44 +08:00
if (PARALLEL_THREAD_COUNT) {
std::vector<SkipList> parts;
2020-01-31 06:24:37 +08:00
for (int i = 0; i < PARALLEL_THREAD_COUNT; i++) parts.emplace_back();
2017-05-26 04:48:44 +08:00
2020-01-31 06:24:37 +08:00
std::vector<StringRef> splits(parts.size() - 1);
for (int s = 0; s < splits.size(); s++)
splits[s] = combinedWriteConflictRanges[(s + 1) * combinedWriteConflictRanges.size() / parts.size()].first;
2017-05-26 04:48:44 +08:00
2020-01-31 06:24:37 +08:00
cs->versionHistory.partition(splits.size() ? &splits[0] : NULL, splits.size(), &parts[0]);
std::vector<double> tstart(PARALLEL_THREAD_COUNT), tend(PARALLEL_THREAD_COUNT);
2017-05-26 04:48:44 +08:00
Event done[PARALLEL_THREAD_COUNT ? PARALLEL_THREAD_COUNT : 1];
double before = timer();
2020-01-31 06:24:37 +08:00
for (int t = 0; t < parts.size(); t++) {
cs->worker_nextAction[t] = action([&, t] {
2017-05-26 04:48:44 +08:00
tstart[t] = timer();
2020-01-31 06:24:37 +08:00
auto begin =
combinedWriteConflictRanges.begin() + (t * combinedWriteConflictRanges.size() / parts.size());
auto end =
combinedWriteConflictRanges.begin() + ((t + 1) * combinedWriteConflictRanges.size() / parts.size());
2017-05-26 04:48:44 +08:00
addConflictRanges(now, begin, end, &parts[t]);
tend[t] = timer();
done[t].set();
});
cs->worker_ready[t]->set();
}
double launch = timer();
2020-01-31 06:24:37 +08:00
for (int i = 0; i < PARALLEL_THREAD_COUNT; i++) done[i].block();
2017-05-26 04:48:44 +08:00
double after = timer();
2020-01-31 06:24:37 +08:00
g_merge_launch += launch - before;
// g_merge_start_var += *std::max_element(tstart.begin(), tstart.end()) - before;
2017-05-26 04:48:44 +08:00
g_merge_fork += *std::min_element(tstart.begin(), tstart.end()) - before;
2020-01-31 06:24:37 +08:00
g_merge_start_var +=
*std::max_element(tstart.begin(), tstart.end()) - *std::min_element(tstart.begin(), tstart.end());
2017-05-26 04:48:44 +08:00
g_merge_end_var += *std::max_element(tend.begin(), tend.end()) - *std::min_element(tend.begin(), tend.end());
g_merge_join += after - *std::max_element(tend.begin(), tend.end());
double run_max = 0, run_min = 1e9;
2020-01-31 06:24:37 +08:00
for (int i = 0; i < tend.size(); i++) {
run_max = max(run_max, tend[i] - tstart[i]);
run_min = min(run_min, tend[i] - tstart[i]);
2017-05-26 04:48:44 +08:00
}
2020-01-31 06:24:37 +08:00
g_merge_run_var += run_max - run_min;
2017-05-26 04:48:44 +08:00
g_merge_run_shortest += run_min;
g_merge_run_longest += run_max;
2020-01-31 06:24:37 +08:00
g_merge_run_total +=
std::accumulate(tend.begin(), tend.end(), 0.0) - std::accumulate(tstart.begin(), tstart.end(), 0.0);
2017-05-26 04:48:44 +08:00
2020-01-31 06:24:37 +08:00
cs->versionHistory.concatenate(&parts[0], parts.size());
2017-05-26 04:48:44 +08:00
} else {
2020-01-31 06:24:37 +08:00
addConflictRanges(now, combinedWriteConflictRanges.begin(), combinedWriteConflictRanges.end(),
&cs->versionHistory);
2017-05-26 04:48:44 +08:00
}
2020-01-31 06:24:37 +08:00
// for(auto w = combinedWriteConflictRanges.begin(); w != combinedWriteConflictRanges.end(); ++w)
2017-05-26 04:48:44 +08:00
// versionHistory.addConflictRange( w->first.begin(), w->first.size(), w->second.begin(), w->second.size(), now );
}
2020-01-31 06:24:37 +08:00
void ConflictBatch::combineWriteConflictRanges() {
2017-05-26 04:48:44 +08:00
int activeWriteCount = 0;
2020-01-31 06:24:37 +08:00
for (int i = 0; i < points.size(); i++) {
2017-05-26 04:48:44 +08:00
KeyInfo& point = points[i];
2020-01-31 06:24:37 +08:00
if (point.write && !transactionConflictStatus[point.transaction]) {
2017-05-26 04:48:44 +08:00
if (point.begin) {
2020-01-31 06:24:37 +08:00
activeWriteCount++;
if (activeWriteCount == 1) combinedWriteConflictRanges.emplace_back(point.key, KeyRef());
2017-05-26 04:48:44 +08:00
} else /*if (point.end)*/ {
activeWriteCount--;
2020-01-31 06:24:37 +08:00
if (activeWriteCount == 0) combinedWriteConflictRanges.back().second = point.key;
2017-05-26 04:48:44 +08:00
}
}
}
}
2020-01-31 06:24:37 +08:00
// void showNumaStatus();
2017-05-26 04:48:44 +08:00
/*
bool sse4Less( const uint8_t* a, int aLen, const uint8_t* b, int bLen ) {
2020-01-31 06:24:37 +08:00
while (true) {
int res = _mm_cmpestri(*(__m128i*)a, aLen, *(__m128i*)b, bLen, _SIDD_UBYTE_OPS | _SIDD_CMP_EQUAL_EACH |
_SIDD_NEGATIVE_POLARITY | _SIDD_LEAST_SIGNIFICANT ); printf("%d ", res); if (res == 16) { if (bLen < 16) return false;
a += 16; b += 16; aLen -= 16; bLen -= 16;
}
if (res == bLen) return false;
if (res == aLen) return true;
return a[res] < b[res];
}
2017-05-26 04:48:44 +08:00
}
void tless( const char* a, const char* b ) {
2020-01-31 06:24:37 +08:00
bool x = sse4Less( (const uint8_t*)a, strlen(a), (const uint8_t*)b, strlen(b) );
if (x)
printf("%s < %s\n", a, b);
else
printf("%s >= %s\n", a, b);
2017-05-26 04:48:44 +08:00
}
void sse4Test(){
2020-01-31 06:24:37 +08:00
tless("hello", "world");
tless("a", "a");
tless("world", "hello");
tless("world", "worry");
tless("worry", "world");
tless("hello", "hello1");
tless("hello1", "hello");
2017-05-26 04:48:44 +08:00
2020-01-31 06:24:37 +08:00
tless("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaahello", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaworld");
tless("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaworld", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaahello");
tless("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaworld", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaworry");
tless("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaworry", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaworld");
tless("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaahello", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaahello1");
tless("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaahello1", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaahello");
2017-05-26 04:48:44 +08:00
2020-01-31 06:24:37 +08:00
char *a = "hello1worldthisisalonglonglongstring";
char *b = "hello";
__m128i aa = *(__m128i*)a;
__m128i bb = *(__m128i*)a;
2017-05-26 04:48:44 +08:00
2020-01-31 06:24:37 +08:00
int res = _mm_cmpestri(aa, 2, bb, 2, _SIDD_UBYTE_OPS | _SIDD_CMP_EQUAL_EACH | _SIDD_NEGATIVE_POLARITY |
_SIDD_LEAST_SIGNIFICANT );
2017-05-26 04:48:44 +08:00
2020-01-31 06:24:37 +08:00
cout << res << endl;
2017-05-26 04:48:44 +08:00
}
*/
void miniConflictSetTest() {
2020-01-31 06:24:37 +08:00
for (int i = 0; i < 2000000; i++) {
int size = 64 * 5; // Also run 64*64*5 to test multiple words of andValues and orValues
2017-05-26 04:48:44 +08:00
MiniConflictSet mini(size);
2020-01-31 06:24:37 +08:00
for (int j = 0; j < 2; j++) {
int a = deterministicRandom()->randomInt(0, size);
int b = deterministicRandom()->randomInt(a, size);
2020-01-31 06:24:37 +08:00
mini.set(a, b);
2017-05-26 04:48:44 +08:00
}
2020-01-31 06:24:37 +08:00
for (int j = 0; j < 4; j++) {
int a = deterministicRandom()->randomInt(0, size);
int b = deterministicRandom()->randomInt(a, size);
2020-01-31 06:24:37 +08:00
mini.any(a, b); // Tests correctness internally
2017-05-26 04:48:44 +08:00
}
}
printf("miniConflictSetTest complete\n");
}
void skipListTest() {
printf("Skip list test\n");
2020-01-31 06:24:37 +08:00
// sse4Test();
2017-05-26 04:48:44 +08:00
2020-01-31 06:24:37 +08:00
// A test case that breaks the old operator<
// KeyInfo a( LiteralStringRef("hello"), true, false, true, -1 );
// KeyInfo b( LiteralStringRef("hello\0"), false, false, false, 0 );
2017-05-26 04:48:44 +08:00
miniConflictSetTest();
setAffinity(0);
2020-01-31 06:24:37 +08:00
// showNumaStatus();
2017-05-26 04:48:44 +08:00
double start;
ConflictSet* cs = newConflictSet();
Arena testDataArena;
2020-01-31 06:24:37 +08:00
VectorRef<VectorRef<KeyRangeRef>> testData;
2017-05-26 04:48:44 +08:00
testData.resize(testDataArena, 500);
2020-01-31 06:24:37 +08:00
std::vector<std::vector<uint8_t>> success(testData.size());
std::vector<std::vector<uint8_t>> success2(testData.size());
for (int i = 0; i < testData.size(); i++) {
2017-05-26 04:48:44 +08:00
testData[i].resize(testDataArena, 5000);
2020-01-31 06:24:37 +08:00
success[i].assign(testData[i].size(), false);
success2[i].assign(testData[i].size(), false);
for (int j = 0; j < testData[i].size(); j++) {
int key = deterministicRandom()->randomInt(0, 20000000);
int key2 = key + 1 + deterministicRandom()->randomInt(0, 10);
2020-01-31 06:24:37 +08:00
testData[i][j] = KeyRangeRef(setK(testDataArena, key), setK(testDataArena, key2));
2017-05-26 04:48:44 +08:00
}
}
2020-01-31 06:24:37 +08:00
printf("Test data generated (%d)\n", deterministicRandom()->randomInt(0, 100000));
2017-05-26 04:48:44 +08:00
printf(" %d threads, %d batches, %d/batch\n", PARALLEL_THREAD_COUNT, testData.size(), testData[0].size());
printf("Running\n");
int readCount = 1, writeCount = 1;
int cranges = 0, tcount = 0;
start = timer();
2020-01-31 06:24:37 +08:00
std::vector<std::vector<int>> nonConflict(testData.size());
for (int i = 0; i < testData.size(); i++) {
2017-05-26 04:48:44 +08:00
Arena buf;
std::vector<CommitTransactionRef> trs;
2017-05-26 04:48:44 +08:00
double t = timer();
2020-01-31 06:24:37 +08:00
for (int j = 0; j + readCount + writeCount <= testData[i].size(); j += readCount + writeCount) {
2017-05-26 04:48:44 +08:00
CommitTransactionRef tr;
2020-01-31 06:24:37 +08:00
for (int k = 0; k < readCount; k++) {
KeyRangeRef r(buf, testData[i][j + k]);
tr.read_conflict_ranges.push_back(buf, r);
2017-05-26 04:48:44 +08:00
}
2020-01-31 06:24:37 +08:00
for (int k = 0; k < writeCount; k++) {
KeyRangeRef r(buf, testData[i][j + readCount + k]);
tr.write_conflict_ranges.push_back(buf, r);
2017-05-26 04:48:44 +08:00
}
cranges += tr.read_conflict_ranges.size() + tr.write_conflict_ranges.size();
tr.read_snapshot = i;
trs.push_back(tr);
}
tcount += trs.size();
2020-01-31 06:24:37 +08:00
g_buildTest += timer() - t;
2017-05-26 04:48:44 +08:00
t = timer();
2020-01-31 06:24:37 +08:00
ConflictBatch batch(cs);
for (int j = 0; j < trs.size(); j++) batch.addTransaction(trs[j]);
g_add += timer() - t;
2017-05-26 04:48:44 +08:00
t = timer();
2020-01-31 06:24:37 +08:00
batch.detectConflicts(i + 50, i, nonConflict[i]);
g_detectConflicts += timer() - t;
2017-05-26 04:48:44 +08:00
}
2020-01-31 06:24:37 +08:00
double elapsed = timer() - start;
2017-05-26 04:48:44 +08:00
printf("New conflict set: %0.3f sec\n", elapsed);
2020-01-31 06:24:37 +08:00
printf(" %0.3f Mtransactions/sec\n", tcount / elapsed / 1e6);
printf(" %0.3f Mkeys/sec\n", cranges * 2 / elapsed / 1e6);
2017-05-26 04:48:44 +08:00
elapsed = g_detectConflicts.getValue();
printf("Detect only: %0.3f sec\n", elapsed);
2020-01-31 06:24:37 +08:00
printf(" %0.3f Mtransactions/sec\n", tcount / elapsed / 1e6);
printf(" %0.3f Mkeys/sec\n", cranges * 2 / elapsed / 1e6);
2017-05-26 04:48:44 +08:00
elapsed = g_checkRead.getValue() + g_merge.getValue();
printf("Skiplist only: %0.3f sec\n", elapsed);
2020-01-31 06:24:37 +08:00
printf(" %0.3f Mtransactions/sec\n", tcount / elapsed / 1e6);
printf(" %0.3f Mkeys/sec\n", cranges * 2 / elapsed / 1e6);
2017-05-26 04:48:44 +08:00
printf("Performance counters:\n");
2020-01-31 06:24:37 +08:00
for (int c = 0; c < skc.size(); c++) {
2017-05-26 04:48:44 +08:00
printf("%20s: %s\n", skc[c]->getMetric().name().c_str(), skc[c]->getMetric().formatted().c_str());
}
2020-01-31 06:24:37 +08:00
// showNumaStatus();
2017-05-26 04:48:44 +08:00
printf("%d entries in version history\n", cs->versionHistory.count());
/*start = timer();
vector<vector<int>> nonConflict2( testData.size() );
SlowConflictSet scs;
Standalone<VectorRef<KeyRangeRef>> ranges;
ranges.resize( ranges.arena(), 1 );
for(int i=0; i<testData.size(); i++) {
2020-01-31 06:24:37 +08:00
for(int j=0; j<testData[i].size(); j++) {
ranges[0] = testData[i][j];
if (!scs.is_conflict( ranges, i )) {
nonConflict2[i].push_back( j );
scs.add( ranges, VectorRef<KeyValueRef>(), i + 50 );
}
}
2017-05-26 04:48:44 +08:00
}
printf("Old conflict set: %0.3f sec\n", timer()-start);
int aminusb=0, bminusa=0, atotal=0;
for(int i=0; i<testData.size(); i++) {
2020-01-31 06:24:37 +08:00
vector<bool> a( testData[i].size() ), b( testData[i].size() );
for(int j=0; j<nonConflict[i].size(); j++)
a[ nonConflict[i][j] ] = true;
for(int j=0; j<nonConflict2[i].size(); j++)
b[ nonConflict2[i][j] ] = true;
for(int j=0; j<a.size(); j++) {
if (a[j]) atotal++;
if (a[j] && !b[j]) aminusb++;
else if (b[j] && !a[j]) bminusa++;
}
2017-05-26 04:48:44 +08:00
}
printf("%d transactions accepted\n", atotal);
if (bminusa)
2020-01-31 06:24:37 +08:00
printf("ERROR: %d transactions unnecessarily rejected!\n", bminusa);
2017-05-26 04:48:44 +08:00
if (aminusb)
2020-01-31 06:24:37 +08:00
printf("ERROR: %d transactions incorrectly accepted!\n", aminusb);
*/
// for(int i=0; i<testData.size(); i++)
// printf("%d %d %d %d\n", i, nonConflict[i].size(), nonConflict2[i].size()-nonConflict[i].size(), nonConflict[i]
//!= nonConflict2[i]);
2017-05-26 04:48:44 +08:00
}