Merge branch 'master' into ratekeeper-batch-priority-limits
commit e2bcecb08f
CMakeLists.txt

@@ -21,7 +21,7 @@ project(foundationdb
   VERSION 6.1.0
   DESCRIPTION "FoundationDB is a scalable, fault-tolerant, ordered key-value store with full ACID transactions."
   HOMEPAGE_URL "http://www.foundationdb.org/"
-  LANGUAGES ASM C CXX)
+  LANGUAGES C CXX ASM)

 set(CMAKE_MODULE_PATH "${CMAKE_MODULE_PATH};${PROJECT_SOURCE_DIR}/cmake")
 message (STATUS "${PROJECT_SOURCE_DIR} ${PROJECT_BINARY_DIR}")
README.md
@@ -73,9 +73,10 @@ based build system.

 To build with CMake, generally the following is required (works on Linux and
 Mac OS - for Windows see below):

 1. Check out this repository.
-2. Install cmake Version 3.12 or higher [CMake](https://cmake.org/)
-1. Download version 1.67 of [Boost](https://sourceforge.net/projects/boost/files/boost/1.52.0/).
+1. Install cmake Version 3.12 or higher [CMake](https://cmake.org/)
+1. Download version 1.67 of [Boost](https://sourceforge.net/projects/boost/files/boost/1.67.0/).
 1. Unpack boost (you don't need to compile it)
 1. Install [Mono](http://www.mono-project.com/download/stable/).
 1. Install a [JDK](http://www.oracle.com/technetwork/java/javase/downloads/index.html). FoundationDB currently builds with Java 8.
@@ -135,7 +136,7 @@ edit and navigation features your IDE supports.
 For example, if you want to use XCode to make changes to FoundationDB you can
 create a XCode-project with the following command:

-```
+```sh
 cmake -G Xcode -DOPEN_FOR_IDE=ON <FDB_SOURCE_DIRECTORY>
 ```
@@ -165,14 +166,14 @@ can use [Hombrew](https://brew.sh/). LibreSSL will not be installed in
 `/usr/local` instead it will stay in `/usr/local/Cellar`. So the cmake command
 will look somethink like this:

-```
+```sh
 cmake -DLibreSSL_ROOT=/usr/local/Cellar/libressl/2.8.3 <PATH_TO_FOUNDATIONDB_SOURCE>
 ```

 To generate a installable package, you have to call CMake with the corresponding
 arguments and then use cpack to generate the package:

-```
+```sh
 cmake -DINSTALL_LAYOUT=OSX <FDB_SOURCE_DIR>
 make
 cpack
@@ -184,13 +185,13 @@ Under Windows, the build instructions are very similar, with the main difference
 that Visual Studio is used to compile.

 1. Install Visual Studio 2017 (Community Edition is tested)
-2. Install cmake Version 3.12 or higher [CMake](https://cmake.org/)
-1. Download version 1.67 of [Boost](https://sourceforge.net/projects/boost/files/boost/1.52.0/).
+1. Install cmake Version 3.12 or higher [CMake](https://cmake.org/)
+1. Download version 1.67 of [Boost](https://sourceforge.net/projects/boost/files/boost/1.67.0/).
 1. Unpack boost (you don't need to compile it)
 1. Install [Mono](http://www.mono-project.com/download/stable/).
 1. Install a [JDK](http://www.oracle.com/technetwork/java/javase/downloads/index.html). FoundationDB currently builds with Java 8.
-1. Set JAVA_HOME to the unpacked location and JAVA_COMPILE to
-   $JAVA_HOME/bin/javac
+1. Set `JAVA_HOME` to the unpacked location and JAVA_COMPILE to
+   `$JAVA_HOME/bin/javac`.
 1. (Optional) Install [WIX](http://wixtoolset.org/). Without it Visual Studio
    won't build the Windows installer.
 1. Create a build directory (you can have the build directory anywhere you
@@ -198,10 +199,11 @@ that Visual Studio is used to compile.
 1. `cd build`
 1. `cmake -G "Visual Studio 15 2017 Win64" -DBOOST_ROOT=<PATH_TO_BOOST> <PATH_TO_FOUNDATIONDB_DIRECTORY>`
 1. This should succeed. In which case you can build using msbuild:
-   `msbuild /p:Configuration=Release fdb.sln`. You can also open the resulting
+   `msbuild /p:Configuration=Release foundationdb.sln`. You can also open the resulting
    solution in Visual Studio and compile from there. However, be aware that
    using Visual Studio for development is currently not supported as Visual
-   Studio will only know about the generated files.
+   Studio will only know about the generated files. `msbuild` is located at
+   `c:\Program Files (x86)\MSBuild\14.0\Bin\MSBuild.exe` for Visual Studio 15.

 If you want TLS support to be enabled under Windows you currently have to build
 and install LibreSSL yourself as the newer LibreSSL versions are not provided
Dockerfile

@@ -1,9 +1,9 @@
 FROM ubuntu:15.04
-LABEL version=0.0.5
+LABEL version=0.0.6

 RUN sed -i -e 's/archive.ubuntu.com\|security.ubuntu.com/old-releases.ubuntu.com/g' -e 's/us\.old/old/g' /etc/apt/sources.list && apt-get clean

-RUN apt-get update && apt-get --no-install-recommends install -y --force-yes bzip2 ca-certificates=20141019 adduser apt base-files base-passwd bash binutils build-essential cpp cpp-4.9 dpkg dos2unix fakeroot findutils g++=4:4.9.2-2ubuntu2 g++-4.9=4.9.2-10ubuntu13 gawk=1:4.1.1+dfsg-1 gcc-5-base gcc=4:4.9.2-2ubuntu2 gcc-4.9=4.9.2-10ubuntu13 gcc-4.9-base:amd64=4.9.2-10ubuntu13 gcc-5-base:amd64=5.1~rc1-0ubuntu1 gdb git golang golang-go golang-go-linux-amd64 golang-src grep gzip hostname java-common libasan1 liblsan0 libtsan0 libubsan0 libcilkrts5 libgcc-4.9-dev libstdc++-4.9-dev libgl1-mesa-dri libgl1-mesa-glx libmono-system-xml-linq4.0-cil libmono-system-data-datasetextensions4.0-cil libstdc++-4.9-pic locales login m4 make makedev mawk mono-dmcs npm openjdk-8-jdk passwd python-distlib python-gevent python-greenlet python-html5lib python-minimal python-pip python-pkg-resources python-requests python-setuptools python-six python-urllib3 python-yaml python2.7 python2.7-minimal rpm rpm2cpio ruby ruby2.1 rubygems-integration sed tar texinfo tzdata-java udev unzip util-linux valgrind vim wget golang-go.tools curl sphinx-common gnupg python-dev python3 python3-dev
+RUN apt-get update && apt-get --no-install-recommends install -y --force-yes bzip2 ca-certificates=20141019 adduser apt base-files base-passwd bash binutils build-essential cpp cpp-4.9 dpkg dos2unix elfutils fakeroot findutils g++=4:4.9.2-2ubuntu2 g++-4.9=4.9.2-10ubuntu13 gawk=1:4.1.1+dfsg-1 gcc-5-base gcc=4:4.9.2-2ubuntu2 gcc-4.9=4.9.2-10ubuntu13 gcc-4.9-base:amd64=4.9.2-10ubuntu13 gcc-5-base:amd64=5.1~rc1-0ubuntu1 gdb git golang golang-go golang-go-linux-amd64 golang-src grep gzip hostname java-common libasan1 liblsan0 libtsan0 libubsan0 libcilkrts5 libgcc-4.9-dev libstdc++-4.9-dev libgl1-mesa-dri libgl1-mesa-glx libmono-system-xml-linq4.0-cil libmono-system-data-datasetextensions4.0-cil libstdc++-4.9-pic locales login m4 make makedev mawk mono-dmcs npm openjdk-8-jdk passwd python-distlib python-gevent python-greenlet python-html5lib python-minimal python-pip python-pkg-resources python-requests python-setuptools python-six python-urllib3 python-yaml python2.7 python2.7-minimal rpm rpm2cpio ruby ruby2.1 rubygems-integration sed tar texinfo tzdata-java udev unzip util-linux valgrind vim wget golang-go.tools curl sphinx-common gnupg python-dev python3 python3-dev

 RUN adduser --disabled-password --gecos '' fdb && chown -R fdb /opt && chmod -R 0777 /opt
@@ -381,8 +381,16 @@ bool DatabaseConfiguration::setInternal(KeyRef key, ValueRef value) {
 	else if (ck == LiteralStringRef("proxies")) parse(&masterProxyCount, value);
 	else if (ck == LiteralStringRef("resolvers")) parse(&resolverCount, value);
 	else if (ck == LiteralStringRef("logs")) parse(&desiredTLogCount, value);
-	else if (ck == LiteralStringRef("log_replicas")) parse(&tLogReplicationFactor, value);
-	else if (ck == LiteralStringRef("log_anti_quorum")) parse(&tLogWriteAntiQuorum, value);
+	else if (ck == LiteralStringRef("log_replicas")) {
+		parse(&tLogReplicationFactor, value);
+		tLogWriteAntiQuorum = std::min(tLogWriteAntiQuorum, tLogReplicationFactor/2);
+	}
+	else if (ck == LiteralStringRef("log_anti_quorum")) {
+		parse(&tLogWriteAntiQuorum, value);
+		if(tLogReplicationFactor > 0) {
+			tLogWriteAntiQuorum = std::min(tLogWriteAntiQuorum, tLogReplicationFactor/2);
+		}
+	}
 	else if (ck == LiteralStringRef("storage_replicas")) parse(&storageTeamSize, value);
 	else if (ck == LiteralStringRef("log_version")) {
 		parse((&type), value);
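The intent of this hunk is an ordering-independent invariant: whichever of `log_replicas` and `log_anti_quorum` is parsed first, the anti-quorum never exceeds half the replication factor. A minimal standalone sketch of that clamping rule (names here are illustrative, not the FoundationDB API):

```cpp
#include <algorithm>
#include <cassert>

// Illustrative restatement of the invariant enforced above:
// anti-quorum <= floor(replication_factor / 2), regardless of the
// order in which the two configuration keys arrive.
struct LogConfig {
	int replicationFactor = 0;
	int writeAntiQuorum = 0;

	void setReplicas(int n) {
		replicationFactor = n;
		writeAntiQuorum = std::min(writeAntiQuorum, replicationFactor / 2);
	}
	void setAntiQuorum(int n) {
		writeAntiQuorum = n;
		if (replicationFactor > 0) // replication factor may not be known yet
			writeAntiQuorum = std::min(writeAntiQuorum, replicationFactor / 2);
	}
};

int main() {
	LogConfig c;
	c.setAntiQuorum(3); // no replication factor yet, kept as-is
	c.setReplicas(4);   // now clamped: 3 -> 2
	assert(c.writeAntiQuorum == 2);
}
```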
@@ -115,8 +115,8 @@ public:

 	// Client status updater
 	struct ClientStatusUpdater {
-		std::vector<BinaryWriter> inStatusQ;
-		std::vector<BinaryWriter> outStatusQ;
+		std::vector< std::pair<std::string, BinaryWriter> > inStatusQ;
+		std::vector< std::pair<std::string, BinaryWriter> > outStatusQ;
 		Future<Void> actor;
 	};
 	ClientStatusUpdater clientStatusUpdater;
@@ -186,6 +186,7 @@ ClientKnobs::ClientKnobs(bool randomize) {
 	}
 	init(CSI_STATUS_DELAY, 10.0 );

-	init( CONSISTENCY_CHECK_RATE_LIMIT, 50e6 );
-	init( CONSISTENCY_CHECK_RATE_WINDOW, 1.0 );
+	init( CONSISTENCY_CHECK_RATE_LIMIT_MAX, 50e6 );
+	init( CONSISTENCY_CHECK_ONE_ROUND_TARGET_COMPLETION_TIME, 7 * 24 * 60 * 60 ); // 7 days
+	init( CONSISTENCY_CHECK_RATE_WINDOW, 1.0 );
 }
@@ -176,7 +176,8 @@ public:
 	int BLOBSTORE_MAX_SEND_BYTES_PER_SECOND;
 	int BLOBSTORE_MAX_RECV_BYTES_PER_SECOND;

-	int CONSISTENCY_CHECK_RATE_LIMIT;
+	int CONSISTENCY_CHECK_RATE_LIMIT_MAX;
+	int CONSISTENCY_CHECK_ONE_ROUND_TARGET_COMPLETION_TIME;
 	int CONSISTENCY_CHECK_RATE_WINDOW;

 	ClientKnobs(bool randomize = false);
@@ -362,15 +362,17 @@ ACTOR static Future<Void> clientStatusUpdateActor(DatabaseContext *cx) {
 			cx->clientStatusUpdater.inStatusQ.swap(cx->clientStatusUpdater.outStatusQ);
 			// Split Transaction Info into chunks
 			state std::vector<TrInfoChunk> trChunksQ;
-			for (auto &bw : cx->clientStatusUpdater.outStatusQ) {
+			for (auto &entry : cx->clientStatusUpdater.outStatusQ) {
+				auto &bw = entry.second;
 				int64_t value_size_limit = BUGGIFY ? g_random->randomInt(1e3, CLIENT_KNOBS->VALUE_SIZE_LIMIT) : CLIENT_KNOBS->VALUE_SIZE_LIMIT;
 				int num_chunks = (bw.getLength() + value_size_limit - 1) / value_size_limit;
 				std::string random_id = g_random->randomAlphaNumeric(16);
+				std::string user_provided_id = entry.first.size() ? entry.first + "/" : "";
 				for (int i = 0; i < num_chunks; i++) {
 					TrInfoChunk chunk;
 					BinaryWriter chunkBW(Unversioned());
 					chunkBW << bigEndian32(i+1) << bigEndian32(num_chunks);
-					chunk.key = KeyRef(clientLatencyName + std::string(10, '\x00') + "/" + random_id + "/" + chunkBW.toStringRef().toString() + "/" + std::string(4, '\x00'));
+					chunk.key = KeyRef(clientLatencyName + std::string(10, '\x00') + "/" + random_id + "/" + chunkBW.toStringRef().toString() + "/" + user_provided_id + std::string(4, '\x00'));
 					int32_t pos = littleEndian32(clientLatencyName.size());
 					memcpy(mutateString(chunk.key) + chunk.key.size() - sizeof(int32_t), &pos, sizeof(int32_t));
 					if (i == num_chunks - 1) {
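The chunk count above uses the standard ceiling-division idiom: `(len + limit - 1) / limit` yields the fewest chunks of at most `limit` bytes that cover a `len`-byte payload. A small self-contained check (illustrative values, not FoundationDB code):

```cpp
#include <cassert>
#include <cstdint>

// Ceiling division: fewest chunks of size <= limit covering len bytes.
int64_t numChunks(int64_t len, int64_t limit) {
	return (len + limit - 1) / limit;
}

int main() {
	assert(numChunks(100, 100) == 1); // exact fit
	assert(numChunks(101, 100) == 2); // one byte spills into a second chunk
	assert(numChunks(1, 100) == 1);
	// Chunks are numbered 1..num_chunks in the key, big-endian, so an
	// ordered key scan returns them in sequence.
}
```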
@@ -492,7 +494,7 @@ DatabaseContext::DatabaseContext( const Error &err ) : deferredError(err), laten

 ACTOR static Future<Void> monitorClientInfo( Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface, Reference<ClusterConnectionFile> ccf, Reference<AsyncVar<ClientDBInfo>> outInfo ) {
 	try {
-		state Optional<std::string> incorrectConnectionString;
+		state Optional<double> incorrectTime;
 		loop {
 			OpenDatabaseRequest req;
 			req.knownClientInfoID = outInfo->get().id;
@@ -503,17 +505,19 @@ ACTOR static Future<Void> monitorClientInfo( Reference<AsyncVar<Optional<Cluster
 			if (ccf && !ccf->fileContentsUpToDate(fileConnectionString)) {
 				req.issues = LiteralStringRef("incorrect_cluster_file_contents");
 				std::string connectionString = ccf->getConnectionString().toString();
+				if(!incorrectTime.present()) {
+					incorrectTime = now();
+				}
 				if(ccf->canGetFilename()) {
-					// Don't log a SevWarnAlways the first time to account for transient issues (e.g. someone else changing the file right before us)
-					TraceEvent(incorrectConnectionString.present() && incorrectConnectionString.get() == connectionString ? SevWarnAlways : SevWarn, "IncorrectClusterFileContents")
+					// Don't log a SevWarnAlways initially to account for transient issues (e.g. someone else changing the file right before us)
+					TraceEvent(now() - incorrectTime.get() > 300 ? SevWarnAlways : SevWarn, "IncorrectClusterFileContents")
 						.detail("Filename", ccf->getFilename())
 						.detail("ConnectionStringFromFile", fileConnectionString.toString())
 						.detail("CurrentConnectionString", connectionString);
 				}
-				incorrectConnectionString = connectionString;
 			}
 			else {
-				incorrectConnectionString = Optional<std::string>();
+				incorrectTime = Optional<double>();
 			}

 			choose {
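The change swaps "same wrong string seen twice" for a grace period: a mismatch logs at SevWarn until it has persisted past 300 seconds, and only then escalates to SevWarnAlways. A minimal sketch of that pattern (simplified types, not the Flow API):

```cpp
#include <optional>
#include <string>
#include <iostream>

// Escalate a warning only after the condition has persisted for a grace
// period, so transient mismatches don't escalate.
struct MismatchTracker {
	std::optional<double> firstSeen;          // when the mismatch was first observed
	static constexpr double graceSeconds = 300;

	// Returns the severity to log at, given the current time in seconds.
	std::string observeMismatch(double now) {
		if (!firstSeen) firstSeen = now;
		return (now - *firstSeen > graceSeconds) ? "SevWarnAlways" : "SevWarn";
	}
	void observeMatch() { firstSeen.reset(); } // condition cleared; reset the clock
};

int main() {
	MismatchTracker t;
	std::cout << t.observeMismatch(0.0) << "\n";   // SevWarn
	std::cout << t.observeMismatch(301.0) << "\n"; // SevWarnAlways
	t.observeMatch();
	std::cout << t.observeMismatch(400.0) << "\n"; // SevWarn again
}
```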
@@ -1902,7 +1906,7 @@ void Transaction::operator=(Transaction&& r) noexcept(true) {
 void Transaction::flushTrLogsIfEnabled() {
 	if (trLogInfo && trLogInfo->logsAdded && trLogInfo->trLogWriter.getData()) {
 		ASSERT(trLogInfo->flushed == false);
-		cx->clientStatusUpdater.inStatusQ.push_back(std::move(trLogInfo->trLogWriter));
+		cx->clientStatusUpdater.inStatusQ.push_back({ trLogInfo->identifier, std::move(trLogInfo->trLogWriter) });
 		trLogInfo->flushed = true;
 	}
 }
@@ -2678,25 +2682,40 @@ void Transaction::setOption( FDBTransactionOptions::Option option, Optional<Stri
 			break;

 		case FDBTransactionOptions::TRANSACTION_LOGGING_ENABLE:
+			setOption(FDBTransactionOptions::DEBUG_TRANSACTION_IDENTIFIER, value);
+			setOption(FDBTransactionOptions::LOG_TRANSACTION);
+			break;
+
+		case FDBTransactionOptions::DEBUG_TRANSACTION_IDENTIFIER:
 			validateOptionValue(value, true);

-			if(value.get().size() > 100) {
+			if (value.get().size() > 100) {
 				throw invalid_option_value();
 			}

-			if(trLogInfo) {
-				if(!trLogInfo->identifier.present()) {
+			if (trLogInfo) {
+				if (trLogInfo->identifier.empty()) {
 					trLogInfo->identifier = printable(value.get());
 				}
-				else if(trLogInfo->identifier.get() != printable(value.get())) {
-					TraceEvent(SevWarn, "CannotChangeTransactionLoggingIdentifier").detail("PreviousIdentifier", trLogInfo->identifier.get()).detail("NewIdentifier", printable(value.get()));
+				else if (trLogInfo->identifier != printable(value.get())) {
+					TraceEvent(SevWarn, "CannotChangeDebugTransactionIdentifier").detail("PreviousIdentifier", trLogInfo->identifier).detail("NewIdentifier", printable(value.get()));
 					throw client_invalid_operation();
 				}
 			}
 			else {
-				trLogInfo = Reference<TransactionLogInfo>(new TransactionLogInfo(printable(value.get())));
+				trLogInfo = Reference<TransactionLogInfo>(new TransactionLogInfo(printable(value.get()), TransactionLogInfo::DONT_LOG));
 			}
 			break;

+		case FDBTransactionOptions::LOG_TRANSACTION:
+			validateOptionValue(value, false);
+			if (trLogInfo) {
+				trLogInfo->logTo(TransactionLogInfo::TRACE_LOG);
+			}
+			else {
+				TraceEvent(SevWarn, "DebugTransactionIdentifierNotSet").detail("Error", "Debug Transaction Identifier option must be set before logging the transaction");
+				throw client_invalid_operation();
+			}
+			break;
+
 		case FDBTransactionOptions::MAX_RETRY_DELAY:
@@ -3069,7 +3088,7 @@ Reference<TransactionLogInfo> Transaction::createTrLogInfoProbabilistically(cons
 	if(!cx->isError()) {
 		double clientSamplingProbability = std::isinf(cx->clientInfo->get().clientTxnInfoSampleRate) ? CLIENT_KNOBS->CSI_SAMPLING_PROBABILITY : cx->clientInfo->get().clientTxnInfoSampleRate;
 		if (((networkOptions.logClientInfo.present() && networkOptions.logClientInfo.get()) || BUGGIFY) && g_random->random01() < clientSamplingProbability && (!g_network->isSimulated() || !g_simulator.speedUpSimulation)) {
-			return Reference<TransactionLogInfo>(new TransactionLogInfo());
+			return Reference<TransactionLogInfo>(new TransactionLogInfo(TransactionLogInfo::DATABASE));
 		}
 	}
@@ -171,34 +171,37 @@ struct TransactionInfo {
 };

 struct TransactionLogInfo : public ReferenceCounted<TransactionLogInfo>, NonCopyable {
-	TransactionLogInfo() : logToDatabase(true) {}
-	TransactionLogInfo(std::string identifier) : logToDatabase(false), identifier(identifier) {}
+	enum LoggingLocation { DONT_LOG = 0, TRACE_LOG = 1, DATABASE = 2 };

-	BinaryWriter trLogWriter{ IncludeVersion() };
-	bool logsAdded{ false };
-	bool flushed{ false };
+	TransactionLogInfo() : logLocation(DONT_LOG) {}
+	TransactionLogInfo(LoggingLocation location) : logLocation(location) {}
+	TransactionLogInfo(std::string id, LoggingLocation location) : logLocation(location), identifier(id) {}

+	void setIdentifier(std::string id) { identifier = id; }
+	void logTo(LoggingLocation loc) { logLocation = logLocation | loc; }
 	template <typename T>
 	void addLog(const T& event) {
-		ASSERT(logToDatabase || identifier.present());
-
-		if(identifier.present()) {
-			event.logEvent(identifier.get());
+		if(logLocation & TRACE_LOG) {
+			ASSERT(!identifier.empty())
+			event.logEvent(identifier);
 		}

 		if (flushed) {
 			return;
 		}

-		if(logToDatabase) {
+		if(logLocation & DATABASE) {
 			logsAdded = true;
 			static_assert(std::is_base_of<FdbClientLogEvents::Event, T>::value, "Event should be derived class of FdbClientLogEvents::Event");
 			trLogWriter << event;
 		}
 	}

-	bool logToDatabase;
-	Optional<std::string> identifier;
+	BinaryWriter trLogWriter{ IncludeVersion() };
+	bool logsAdded{ false };
+	bool flushed{ false };
+	int logLocation;
+	std::string identifier;
 };

 struct Watch : public ReferenceCounted<Watch>, NonCopyable {
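The struct replaces two booleans with a small bitmask: `logTo()` ORs in a destination and each destination bit is tested independently, so a transaction can log to the trace file, the database, both, or neither. A stripped-down sketch of the same pattern (illustrative, outside the Flow type system):

```cpp
#include <cassert>
#include <string>

// Destinations as bit flags so several can be active at once.
enum LoggingLocation { DONT_LOG = 0, TRACE_LOG = 1, DATABASE = 2 };

struct LogSink {
	int logLocation = DONT_LOG;
	std::string identifier;

	// OR in a destination; never turns an existing one off.
	void logTo(LoggingLocation loc) { logLocation = logLocation | loc; }

	bool tracesEnabled() const { return logLocation & TRACE_LOG; }
	bool databaseEnabled() const { return logLocation & DATABASE; }
};

int main() {
	LogSink s;
	s.logTo(TRACE_LOG);
	s.logTo(DATABASE); // both bits now set
	assert(s.tracesEnabled() && s.databaseEnabled());
}
```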
@@ -170,7 +170,11 @@ description is not currently required but encouraged.
 			hidden="true" />
 		<Option name="debug_retry_logging" code="401" paramType="String" paramDescription="Optional transaction name" />
 		<Option name="transaction_logging_enable" code="402" paramType="String" paramDescription="String identifier to be used in the logs when tracing this transaction. The identifier must not exceed 100 characters."
-			description="Enables tracing for this transaction and logs results to the client trace logs. Client trace logging must be enabled to get log output." />
+			description="Deprecated" />
+		<Option name="debug_transaction_identifier" code="403" paramType="String" paramDescription="String identifier to be used when tracing or profiling this transaction. The identifier must not exceed 100 characters."
+			description="Sets a client provided identifier for the transaction that will be used in scenarios like tracing or profiling. Client trace logging or transaction profiling must be separately enabled." />
+		<Option name="log_transaction" code="404"
+			description="Enables tracing for this transaction and logs results to the client trace logs. The DEBUG_TRANSACTION_IDENTIFIER option must be set before using this option, and client trace logging must be enabled and to get log output." />
 		<Option name="timeout" code="500"
 			paramType="Int" paramDescription="value in milliseconds of timeout"
 			description="Set a timeout in milliseconds which, when elapsed, will cause the transaction automatically to be cancelled. Valid parameter values are ``[0, INT_MAX]``. If set to 0, will disable all timeouts. All pending and any future uses of the transaction will throw an exception. The transaction can be used again after it is reset. Like all transaction options, a timeout must be reset after a call to onError. This behavior allows the user to make the timeout dynamic." />
@@ -796,6 +796,8 @@ public:
 	// FIXME: getNextReadLocation should ASSERT( initialized ), but the memory storage engine needs
 	// to be changed to understand the new intiailizeRecovery protocol.
 	virtual location getNextReadLocation() { return nextReadLocation; }
+	virtual location getNextCommitLocation() { ASSERT( initialized ); return lastCommittedSeq + sizeof(Page); }
+	virtual location getNextPushLocation() { ASSERT( initialized ); return endLocation(); }

 	virtual Future<Void> getError() { return rawQueue->getError(); }
 	virtual Future<Void> onClosed() { return rawQueue->onClosed(); }
@@ -1247,6 +1249,9 @@ public:
 	virtual location getNextReadLocation() { return queue->getNextReadLocation(); }

+	virtual Future<Standalone<StringRef>> read( location start, location end ) { return queue->read( start, end ); }
+	virtual location getNextCommitLocation() { return queue->getNextCommitLocation(); }
+	virtual location getNextPushLocation() { return queue->getNextPushLocation(); }

 	virtual location push( StringRef contents ) {
 		pushed = queue->push(contents);
@@ -34,6 +34,11 @@ public:
 		location(int64_t hi, int64_t lo) : hi(hi), lo(lo) {}
 		operator std::string() { return format("%lld.%lld", hi, lo); } // FIXME: Return a 'HumanReadableDescription' instead of std::string, make TraceEvent::detail accept that (for safety)

+		template<class Ar>
+		void serialize_unversioned(Ar& ar) {
+			serializer(ar, hi, lo);
+		}
+
 		bool operator < (location const& r) const {
 			if (hi<r.hi) return true;
 			if (hi>r.hi) return false;
@@ -53,6 +58,8 @@ public:
 	// Thereafter it may not be called again.
 	virtual Future<Standalone<StringRef>> readNext( int bytes ) = 0; // Return the next bytes in the queue (beginning, the first time called, with the first unpopped byte)
 	virtual location getNextReadLocation() = 0; // Returns a location >= the location of all bytes previously returned by readNext(), and <= the location of all bytes subsequently returned
+	virtual location getNextCommitLocation() = 0; // If commit() were to be called, all buffered writes would be written starting at `location`.
+	virtual location getNextPushLocation() = 0; // If push() were to be called, the pushed data would be written starting at `location`.

+	virtual Future<Standalone<StringRef>> read( location start, location end ) = 0;
 	virtual location push( StringRef contents ) = 0; // Appends the given bytes to the byte stream. Returns a location token representing the *end* of the contents.
@@ -64,6 +71,10 @@ public:
 	virtual StorageBytes getStorageBytes() = 0;
 };

+// FIXME: One should be able to use SFINAE to choose between serialize and serialize_unversioned.
+template <class Ar> void load( Ar& ar, IDiskQueue::location& loc ) { loc.serialize_unversioned(ar); }
+template <class Ar> void save( Ar& ar, const IDiskQueue::location& loc ) { const_cast<IDiskQueue::location&>(loc).serialize_unversioned(ar); }
+
 IDiskQueue* openDiskQueue( std::string basename, std::string ext, UID dbgid, int64_t fileSizeWarningLimit = -1); // opens basename+"0."+ext and basename+"1."+ext

 #endif
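The free `load`/`save` functions route `location` through its unversioned member template instead of the default versioned protocol. A toy sketch of that hook pattern, with stand-in reader/writer types in place of Flow's serializers (all names here are illustrative):

```cpp
#include <cstdint>
#include <cstring>
#include <cassert>

// Minimal stand-ins for the writer and reader halves of a serializer.
struct Writer { uint8_t buf[64]; int n = 0; };
struct Reader { const uint8_t* p; };

// serializer() forwards each field; here we just copy raw bytes.
void serializer(Writer& w, int64_t& v) { std::memcpy(w.buf + w.n, &v, 8); w.n += 8; }
void serializer(Reader& r, int64_t& v) { std::memcpy(&v, r.p, 8); r.p += 8; }
template <class Ar, class A, class B>
void serializer(Ar& ar, A& a, B& b) { serializer(ar, a); serializer(ar, b); }

struct Location {
	int64_t hi = 0, lo = 0;
	// One member template handles both directions, as in the hunk above.
	template <class Ar> void serialize_unversioned(Ar& ar) { serializer(ar, hi, lo); }
};

// Free-function hooks select the unversioned path for this one type.
template <class Ar> void load(Ar& ar, Location& loc) { loc.serialize_unversioned(ar); }
template <class Ar> void save(Ar& ar, const Location& loc) { const_cast<Location&>(loc).serialize_unversioned(ar); }

int main() {
	Location a{7, 42}, b;
	Writer w; save(w, a);
	Reader r{w.buf}; load(r, b);
	assert(b.hi == 7 && b.lo == 42); // round-trips without a version prefix
}
```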
@@ -82,6 +82,8 @@ public:
 	virtual Future<bool> initializeRecovery() { return false; }
 	virtual Future<Standalone<StringRef>> readNext( int bytes );
 	virtual IDiskQueue::location getNextReadLocation();
+	virtual IDiskQueue::location getNextCommitLocation() { ASSERT(false); throw internal_error(); }
+	virtual IDiskQueue::location getNextPushLocation() { ASSERT(false); throw internal_error(); }
+	virtual Future<Standalone<StringRef>> read( location start, location end ) { ASSERT(false); throw internal_error(); }
 	virtual IDiskQueue::location push( StringRef contents );
 	virtual void pop( IDiskQueue::location upTo );
@@ -842,8 +842,9 @@ void SimulationConfig::generateNormalConfig(int minimumReplication, int minimumR
 			break;
 		}
 	} else {
-		set_config("log_version:=3"); // 6.1
-		set_config("log_spill:=2"); // REFERENCE
+		// FIXME: Temporarily disable spill-by-reference.
+		//set_config("log_version:=3"); // 6.1
+		//set_config("log_spill:=2"); // REFERENCE
 	}

 	if(generateFearless || (datacenters == 2 && g_random->random01() < 0.5)) {
@@ -132,13 +132,18 @@ private:
 	IDiskQueue* queue;
 	UID dbgid;

-	void updateVersionSizes( const TLogQueueEntry& result, TLogData* tLog );
+	void updateVersionSizes( const TLogQueueEntry& result, TLogData* tLog, IDiskQueue::location start, IDiskQueue::location end );

 	ACTOR static Future<TLogQueueEntry> readNext( TLogQueue* self, TLogData* tLog ) {
 		state TLogQueueEntry result;
 		state int zeroFillSize = 0;

+		// FIXME: initializeRecovery should probably be called on its own by the caller of readNext.
+		bool recoveryFinished = wait( self->queue->initializeRecovery() );
+		if (recoveryFinished) throw end_of_stream();
+
 		loop {
+			state IDiskQueue::location startloc = self->queue->getNextReadLocation();
 			Standalone<StringRef> h = wait( self->queue->readNext( sizeof(uint32_t) ) );
 			if (h.size() != sizeof(uint32_t)) {
 				if (h.size()) {
@@ -162,10 +167,12 @@ private:
 			}

 			if (e[payloadSize]) {
 				ASSERT( e[payloadSize] == 1 );
 				Arena a = e.arena();
 				ArenaReader ar( a, e.substr(0, payloadSize), IncludeVersion() );
 				ar >> result;
-				self->updateVersionSizes(result, tLog);
+				const IDiskQueue::location endloc = self->queue->getNextReadLocation();
+				self->updateVersionSizes(result, tLog, startloc, endloc);
 				return result;
 			}
 		}
@@ -192,12 +199,12 @@ static const KeyRangeRef persistCurrentVersionKeys = KeyRangeRef( LiteralStringR
 static const KeyRangeRef persistKnownCommittedVersionKeys = KeyRangeRef( LiteralStringRef( "knownCommitted/" ), LiteralStringRef( "knownCommitted0" ) );
 static const KeyRangeRef persistLocalityKeys = KeyRangeRef( LiteralStringRef( "Locality/" ), LiteralStringRef( "Locality0" ) );
 static const KeyRangeRef persistLogRouterTagsKeys = KeyRangeRef( LiteralStringRef( "LogRouterTags/" ), LiteralStringRef( "LogRouterTags0" ) );
-static const KeyRange persistTagMessagesKeys = prefixRange(LiteralStringRef("TagMsg/"));
+static const KeyRange persistTagMessageRefsKeys = prefixRange(LiteralStringRef("TagMsgRef/"));
 static const KeyRange persistTagPoppedKeys = prefixRange(LiteralStringRef("TagPop/"));

-static Key persistTagMessagesKey( UID id, Tag tag, Version version ) {
+static Key persistTagMessageRefsKey( UID id, Tag tag, Version version ) {
 	BinaryWriter wr( Unversioned() );
-	wr.serializeBytes(persistTagMessagesKeys.begin);
+	wr.serializeBytes(persistTagMessageRefsKeys.begin);
 	wr << id;
 	wr << tag;
 	wr << bigEndian64( version );
@@ -227,12 +234,12 @@ static Version decodeTagPoppedValue( ValueRef value ) {
 	return BinaryReader::fromStringRef<Version>( value, Unversioned() );
 }

-static StringRef stripTagMessagesKey( StringRef key ) {
-	return key.substr( sizeof(UID) + sizeof(Tag) + persistTagMessagesKeys.begin.size() );
+static StringRef stripTagMessageRefsKey( StringRef key ) {
+	return key.substr( sizeof(UID) + sizeof(Tag) + persistTagMessageRefsKeys.begin.size() );
 }

-static Version decodeTagMessagesKey( StringRef key ) {
-	return bigEndian64( BinaryReader::fromStringRef<Version>( stripTagMessagesKey(key), Unversioned() ) );
+static Version decodeTagMessageRefsKey( StringRef key ) {
+	return bigEndian64( BinaryReader::fromStringRef<Version>( stripTagMessageRefsKey(key), Unversioned() ) );
 }

 struct TLogData : NonCopyable {
@@ -339,7 +346,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
 		}
 	};

-	Map<Version, IDiskQueue::location> versionLocation; // For the version of each entry that was push()ed, the end location of the serialized bytes
+	Map<Version, std::pair<IDiskQueue::location, IDiskQueue::location>> versionLocation; // For the version of each entry that was push()ed, the [start, end) location of the serialized bytes

 	/*
 	Popped version tracking contract needed by log system to implement ILogCursor::popped():
@@ -459,7 +466,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
 		tLogData->persistentData->clear( singleKeyRange(logIdKey.withPrefix(persistLocalityKeys.begin)) );
 		tLogData->persistentData->clear( singleKeyRange(logIdKey.withPrefix(persistLogRouterTagsKeys.begin)) );
 		tLogData->persistentData->clear( singleKeyRange(logIdKey.withPrefix(persistRecoveryCountKeys.begin)) );
-		Key msgKey = logIdKey.withPrefix(persistTagMessagesKeys.begin);
+		Key msgKey = logIdKey.withPrefix(persistTagMessageRefsKeys.begin);
 		tLogData->persistentData->clear( KeyRangeRef( msgKey, strinc(msgKey) ) );
 		Key poppedKey = logIdKey.withPrefix(persistTagPoppedKeys.begin);
 		tLogData->persistentData->clear( KeyRangeRef( poppedKey, strinc(poppedKey) ) );
@@ -477,10 +484,15 @@ void TLogQueue::push( T const& qe, Reference<LogData> logData ) {
 	wr << qe;
 	wr << uint8_t(1);
 	*(uint32_t*)wr.getData() = wr.getLength() - sizeof(uint32_t) - sizeof(uint8_t);
-	auto loc = queue->push( wr.toStringRef() );
+	const IDiskQueue::location startloc = queue->getNextPushLocation();
+	// FIXME: push shouldn't return anything. We should call getNextPushLocation() again.
+	const IDiskQueue::location endloc = queue->push( wr.toStringRef() );
 	//TraceEvent("TLogQueueVersionWritten", dbgid).detail("Size", wr.getLength() - sizeof(uint32_t) - sizeof(uint8_t)).detail("Loc", loc);
-	logData->versionLocation[qe.version] = loc;
+	logData->versionLocation[qe.version] = std::make_pair(startloc, endloc);
 }

+// FIXME: Split pop into forgetBefore and pop, and maintain the poppedLocation per tag.
+// There's no need for us to remember spilled version locations, as we're spilling that information also.
 void TLogQueue::pop( Version upTo, Reference<LogData> logData ) {
 	// Keep only the given and all subsequent version numbers
 	// Find the first version >= upTo
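The queue now remembers, per version, the half-open byte range [start, end) that its serialized entry occupies on disk, so later peeks can read the bytes back by reference instead of duplicating them into the key-value store. A toy sketch of that bookkeeping (illustrative types, not the FDB classes):

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <utility>

using Version = int64_t;
using Location = int64_t; // stand-in for IDiskQueue::location (a hi/lo pair in FDB)

// Append-only queue that reports where the next push will land, mirroring
// getNextPushLocation()/push() in the hunk above; push() returns the end.
struct ToyQueue {
	Location next = 0;
	Location nextPushLocation() const { return next; }
	Location push(int64_t nbytes) { next += nbytes; return next; }
};

int main() {
	ToyQueue q;
	std::map<Version, std::pair<Location, Location>> versionLocation;

	for (Version v = 1; v <= 3; v++) {
		Location start = q.nextPushLocation();
		Location end = q.push(/*nbytes=*/100);
		versionLocation[v] = { start, end }; // [start, end) of version v's bytes
	}
	assert(versionLocation[2] == std::make_pair(Location(100), Location(200)));
}
```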
@@ -494,13 +506,14 @@ void TLogQueue::pop( Version upTo, Reference<LogData> logData ) {
 		v.decrementNonEnd();
 	}

-	queue->pop( v->value );
+	queue->pop( v->value.second );
 	logData->versionLocation.erase( logData->versionLocation.begin(), v ); // ... and then we erase that previous version and all prior versions
 }
-void TLogQueue::updateVersionSizes( const TLogQueueEntry& result, TLogData* tLog ) {
+void TLogQueue::updateVersionSizes( const TLogQueueEntry& result, TLogData* tLog,
+                                    IDiskQueue::location start, IDiskQueue::location end) {
 	auto it = tLog->id_data.find(result.id);
 	if(it != tLog->id_data.end()) {
-		it->second->versionLocation[result.version] = queue->getNextReadLocation();
+		it->second->versionLocation[result.version] = std::make_pair(start, end);
 	}
 }
@@ -541,12 +554,53 @@ void updatePersistentPopped( TLogData* self, Reference<LogData> logData, Referen
 	if (data->nothingPersistent) return;

 	self->persistentData->clear( KeyRangeRef(
-		persistTagMessagesKey( logData->logId, data->tag, Version(0) ),
-		persistTagMessagesKey( logData->logId, data->tag, data->popped ) ) );
+		persistTagMessageRefsKey( logData->logId, data->tag, Version(0) ),
+		persistTagMessageRefsKey( logData->logId, data->tag, data->popped ) ) );
 	if (data->popped > logData->persistentDataVersion)
 		data->nothingPersistent = true;
 }

+struct VerifyState {
+	std::vector<std::pair<IDiskQueue::location, IDiskQueue::location>> locations;
+	std::vector<Version> versions;
+	std::vector<Future<Standalone<StringRef>>> readfutures;
+};
+
+ACTOR void verifyPersistentData( TLogData* self, VerifyState* vs ) {
+	for (auto iter = vs->locations.begin(); iter != vs->locations.end(); iter++) {
+		vs->readfutures.push_back( self->rawPersistentQueue->read( iter->first, iter->second.lo ) );
+	}
+	try {
+		wait( waitForAll(vs->readfutures) );
+	} catch (Error& e) {
+		if (e.code() != error_code_io_error) {
+			delete vs;
+			throw;
+		} else {
+			delete vs;
+			return;
+		}
+	}
+	state int i = 0;
+	for (; i < vs->readfutures.size(); i++) {
+		state TLogQueueEntry entry;
+		state StringRef rawdata = vs->readfutures[i].get();
+		state uint32_t length = *(uint32_t*)rawdata.begin();
+		state uint8_t valid;
+		rawdata = rawdata.substr( 4, rawdata.size() - 4);
+		try {
+			BinaryReader rd( rawdata, IncludeVersion() );
+			rd >> entry >> valid;
+		} catch (Error& e) {
+			ASSERT(false);
+		}
+		// ASSERT( length == rawdata.size() );
+		ASSERT( entry.version == vs->versions[i] );
+		ASSERT( valid == 1 );
+	}
+	delete vs;
+}
+
 ACTOR Future<Void> updatePersistentData( TLogData* self, Reference<LogData> logData, Version newPersistentDataVersion ) {
 	// PERSIST: Changes self->persistentDataVersion and writes and commits the relevant changes
 	ASSERT( newPersistentDataVersion <= logData->version.get() );
@@ -561,12 +615,17 @@ ACTOR Future<Void> updatePersistentData( TLogData* self, Reference<LogData> logD
 	// For all existing tags
 	state int tagLocality = 0;
 	state int tagId = 0;
+	state VerifyState *vs = nullptr;
+	if (g_network->isSimulated()) {
+		vs = new VerifyState;
+	}

 	for(tagLocality = 0; tagLocality < logData->tag_data.size(); tagLocality++) {
 		for(tagId = 0; tagId < logData->tag_data[tagLocality].size(); tagId++) {
 			state Reference<LogData::TagData> tagData = logData->tag_data[tagLocality][tagId];
 			if(tagData) {
 				state Version currentVersion = 0;
+				state Version lastVersion = 0;
 				// Clear recently popped versions from persistentData if necessary
 				updatePersistentPopped( self, logData, tagData );
 				// Transfer unpopped messages with version numbers less than newPersistentDataVersion to persistentData
@@ -577,10 +636,19 @@ ACTOR Future<Void> updatePersistentData( TLogData* self, Reference<LogData> logD
 					tagData->nothingPersistent = false;
 					BinaryWriter wr( Unversioned() );

-					for(; msg != tagData->versionMessages.end() && msg->first == currentVersion; ++msg)
-						wr << msg->second.toStringRef();
+					const IDiskQueue::location begin = logData->versionLocation[currentVersion].first;
+					const IDiskQueue::location end = logData->versionLocation[currentVersion].second;
+					wr << begin << end;
+					self->persistentData->set( KeyValueRef( persistTagMessageRefsKey( logData->logId, tagData->tag, currentVersion ), wr.toStringRef() ) );

-					self->persistentData->set( KeyValueRef( persistTagMessagesKey( logData->logId, tagData->tag, currentVersion ), wr.toStringRef() ) );
+					if (vs && (vs->versions.empty() || vs->versions.back() != currentVersion)) {
+						vs->versions.push_back( currentVersion );
+						vs->locations.push_back( std::make_pair( begin, end ) );
+					}
+
+					for(; msg != tagData->versionMessages.end() && msg->first == currentVersion; ++msg) {
+						// Fast forward until we find a new version.
+					}

 					Future<Void> f = yield(TaskUpdateStorage);
 					if(!f.isReady()) {
@@ -594,6 +662,11 @@ ACTOR Future<Void> updatePersistentData( TLogData* self, Reference<LogData> logD
 		}
 	}

+	// FIXME remove temporary verification
+	if (vs) {
+		verifyPersistentData( self, vs );
+	}
+
 	self->persistentData->set( KeyValueRef( BinaryWriter::toValue(logData->logId,Unversioned()).withPrefix(persistCurrentVersionKeys.begin), BinaryWriter::toValue(newPersistentDataVersion, Unversioned()) ) );
 	self->persistentData->set( KeyValueRef( BinaryWriter::toValue(logData->logId,Unversioned()).withPrefix(persistKnownCommittedVersionKeys.begin), BinaryWriter::toValue(logData->knownCommittedVersion, Unversioned()) ) );
 	logData->persistentDataVersion = newPersistentDataVersion;
@@ -634,8 +707,21 @@ ACTOR Future<Void> updatePersistentData( TLogData* self, Reference<LogData> logD
 	ASSERT(logData->bytesDurable.getValue() <= logData->bytesInput.getValue());
 	ASSERT(self->bytesDurable <= self->bytesInput);

-	if( self->queueCommitEnd.get() > 0 )
-		self->persistentQueue->pop( newPersistentDataVersion+1, logData ); // SOMEDAY: this can cause a slow task (~0.5ms), presumably from erasing too many versions. Should we limit the number of versions cleared at a time?
+	if( self->queueCommitEnd.get() > 0 ) {
+		// FIXME: Maintain a heap of tags ordered by version to make this O(1) instead of O(n).
+		Version minVersion = std::numeric_limits<Version>::max();
+		for(tagLocality = 0; tagLocality < logData->tag_data.size(); tagLocality++) {
+			for(tagId = 0; tagId < logData->tag_data[tagLocality].size(); tagId++) {
+				Reference<LogData::TagData> tagData = logData->tag_data[tagLocality][tagId];
+				if (tagData) {
+					minVersion = std::min(minVersion, tagData->popped);
+				}
+			}
+		}
+		if (minVersion != std::numeric_limits<Version>::max()) {
+			self->persistentQueue->pop( minVersion, logData ); // SOMEDAY: this can cause a slow task (~0.5ms), presumably from erasing too many versions. Should we limit the number of versions cleared at a time?
+		}
+	}

 	return Void();
 }
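The pop target changes from "everything at or below the new persistent version" to "the minimum popped version across all tags": a spilled-by-reference entry must survive on disk until every tag that references it has been popped. A small sketch of that min-fold (illustrative structure, not the FDB tag containers):

```cpp
#include <cassert>
#include <cstdint>
#include <limits>
#include <vector>

using Version = int64_t;

// The disk queue may only be popped up to the smallest popped version over
// all tags: any byte at or beyond that version may still be referenced.
Version safePopVersion(const std::vector<std::vector<Version>>& poppedByLocality) {
	Version minVersion = std::numeric_limits<Version>::max();
	for (const auto& locality : poppedByLocality)
		for (Version popped : locality)
			minVersion = std::min(minVersion, popped);
	return minVersion;
}

int main() {
	// Two localities; one laggard tag at version 7 holds everything back.
	std::vector<std::vector<Version>> popped = { { 12, 7 }, { 30 } };
	assert(safePopVersion(popped) == 7);
}
```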
@@ -931,6 +1017,42 @@ void peekMessagesFromMemory( Reference<LogData> self, TLogPeekRequest const& req
 	}
 }

+std::vector<StringRef> parseMessagesForTag( StringRef commitBlob, Tag tag, int logRouters ) {
+	// See the comment in LogSystem.cpp for the binary format of commitBlob.
+	std::vector<StringRef> relevantMessages;
+	BinaryReader rd(commitBlob, AssumeVersion(currentProtocolVersion));
+	while (!rd.empty()) {
+		uint32_t messageLength = 0;
+		uint32_t subsequence = 0;
+		uint16_t tagCount = 0;
+		rd >> messageLength;
+		rd.checkpoint();
+		rd >> subsequence >> tagCount;
+		Tag msgtag;
+		bool match = false;
+		for (int i = 0; i < tagCount; i++) {
+			rd >> msgtag;
+			if (msgtag == tag) {
+				match = true;
+				break;
+			} else if (tag.locality == tagLocalityLogRouter && msgtag.locality == tagLocalityLogRouter &&
+			           msgtag.id % logRouters == tag.id) {
+				// Mutations that are in the partially durable span between known comitted version and
+				// recovery version get copied to the new log generation. These commits might have had more
+				// log router tags than what now exist, so we mod them down to what we have.
+				match = true;
+			}
+		}
+		rd.rewind();
+		const void* begin = rd.readBytes(messageLength);
+		if (match) {
+			relevantMessages.push_back( StringRef((uint8_t*)begin, messageLength) );
+		}
+		// FIXME: Yield here so Evan doesn't have to
+	}
+	return relevantMessages;
+}
+
 ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Reference<LogData> logData ) {
 	state BinaryWriter messages(Unversioned());
 	state BinaryWriter messages2(Unversioned());
@@ -1033,21 +1155,58 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere

 			peekMessagesFromMemory( logData, req, messages2, endVersion );

-			Standalone<VectorRef<KeyValueRef>> kvs = wait(
-				self->persistentData->readRange(KeyRangeRef(
-					persistTagMessagesKey(logData->logId, req.tag, req.begin),
-					persistTagMessagesKey(logData->logId, req.tag, logData->persistentDataDurableVersion + 1)), SERVER_KNOBS->DESIRED_TOTAL_BYTES, SERVER_KNOBS->DESIRED_TOTAL_BYTES));
+			// FIXME: Limit to approximately DESIRED_TOTATL_BYTES somehow.
+			state Standalone<VectorRef<KeyValueRef>> kvrefs = wait(
+				self->persistentData->readRange(KeyRangeRef(
+					persistTagMessageRefsKey(logData->logId, req.tag, req.begin),
+					persistTagMessageRefsKey(logData->logId, req.tag, logData->persistentDataDurableVersion + 1))));

 			//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? printable(kv1[0].key) : "").detail("Tag2ResultsLast", kv2.size() ? printable(kv2[0].key) : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence());

-			for (auto &kv : kvs) {
-				auto ver = decodeTagMessagesKey(kv.key);
-				messages << int32_t(-1) << ver;
-				messages.serializeBytes(kv.value);
-			}
-
-			if (kvs.expectedSize() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES)
-				endVersion = decodeTagMessagesKey(kvs.end()[-1].key) + 1;
+			state std::vector<Future<Standalone<StringRef>>> messageReads;
+			for (auto &kv : kvrefs) {
+				IDiskQueue::location begin, end;
+				BinaryReader r(kv.value, Unversioned());
+				r >> begin >> end;
+				messageReads.push_back( self->rawPersistentQueue->read(begin, end) );
+			}
+			wait( waitForAll( messageReads ) );
+
+			ASSERT( messageReads.size() == kvrefs.size() );
+			Version lastRefMessageVersion = 0;
+			for (int i = 0; i < messageReads.size(); i++ ) {
+				Standalone<StringRef> queueEntryData = messageReads[i].get();
+				uint8_t valid;
+				const uint32_t length = *(uint32_t*)queueEntryData.begin();
+				queueEntryData = queueEntryData.substr( 4, queueEntryData.size() - 4);
+				BinaryReader rd( queueEntryData, IncludeVersion() );
+				TLogQueueEntry entry;
+				rd >> entry >> valid;
+				Version version = decodeTagMessageRefsKey(kvrefs[i].key);
+				ASSERT( valid == 0x01 );
+				ASSERT( length + sizeof(valid) == queueEntryData.size() );
+
+				messages << int32_t(-1) << version;
+
+				// FIXME: push DESIRED_TOTAL_BYTES into parseMessagesForTag
+				// FIXME: maybe push this copy in as well
+				std::vector<StringRef> parsedMessages = parseMessagesForTag(entry.messages, req.tag, logData->logRouterTags);
+				for (StringRef msg : parsedMessages) {
+					messages << msg;
+				}
+
+				lastRefMessageVersion = version;
+
+				if (messages.getLength() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES) {
+					break;
+				}
+			}
+
+			messageReads.clear();
+			kvrefs = Standalone<VectorRef<KeyValueRef>>();
+
+			if (messages.getLength() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES)
+				endVersion = lastRefMessageVersion + 1;
 			else
 				messages.serializeBytes( messages2.toStringRef() );
 		} else {
@@ -1227,7 +1227,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
 			int safe_range_end = logSet->tLogReplicationFactor - absent;

 			if( !lastEnd.present() || ((safe_range_end > 0) && (safe_range_end-1 < results.size()) && results[ safe_range_end-1 ].end < lastEnd.get()) ) {
-				Version knownCommittedVersion = results[ new_safe_range_begin ].end - (g_network->isSimulated() ? 10*SERVER_KNOBS->VERSIONS_PER_SECOND : SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS); //In simulation this must be the maximum MAX_READ_TRANSACTION_LIFE_VERSIONS
+				Version knownCommittedVersion = 0;
+				for(int i = 0; i < results.size(); i++) {
+					knownCommittedVersion = std::max(knownCommittedVersion, results[i].knownCommittedVersion);
+				}
@@ -1140,13 +1140,13 @@ ACTOR Future<Void> runTests( Reference<ClusterConnectionFile> connFile, test_typ
 		spec.timeout = 0;
 		spec.waitForQuiescenceBegin = false;
 		spec.waitForQuiescenceEnd = false;
-		std::string rateLimit = format("%d", CLIENT_KNOBS->CONSISTENCY_CHECK_RATE_LIMIT);
+		std::string rateLimitMax = format("%d", CLIENT_KNOBS->CONSISTENCY_CHECK_RATE_LIMIT_MAX);
 		options.push_back_deep(options.arena(), KeyValueRef(LiteralStringRef("testName"), LiteralStringRef("ConsistencyCheck")));
 		options.push_back_deep(options.arena(), KeyValueRef(LiteralStringRef("performQuiescentChecks"), LiteralStringRef("false")));
 		options.push_back_deep(options.arena(), KeyValueRef(LiteralStringRef("distributed"), LiteralStringRef("false")));
 		options.push_back_deep(options.arena(), KeyValueRef(LiteralStringRef("failureIsError"), LiteralStringRef("true")));
 		options.push_back_deep(options.arena(), KeyValueRef(LiteralStringRef("indefinite"), LiteralStringRef("true")));
-		options.push_back_deep(options.arena(), KeyValueRef(LiteralStringRef("rateLimit"), StringRef(rateLimit)));
+		options.push_back_deep(options.arena(), KeyValueRef(LiteralStringRef("rateLimitMax"), StringRef(rateLimitMax)));
 		options.push_back_deep(options.arena(), KeyValueRef(LiteralStringRef("shuffleShards"), LiteralStringRef("true")));
 		spec.options.push_back_deep(spec.options.arena(), options);
 		testSpecs.push_back(spec);
@@ -556,7 +556,7 @@ ACTOR Future<Void> monitorServerDBInfo( Reference<AsyncVar<Optional<ClusterContr
 			localInfo.myLocality = locality;
 			dbInfo->set(localInfo);

-	state Optional<std::string> incorrectConnectionString;
+	state Optional<double> incorrectTime;
 	loop {
 		GetServerDBInfoRequest req;
 		req.knownServerInfoID = dbInfo->get().id;
@@ -565,17 +565,19 @@ ACTOR Future<Void> monitorServerDBInfo( Reference<AsyncVar<Optional<ClusterContr
 		if (connFile && !connFile->fileContentsUpToDate(fileConnectionString)) {
 			req.issues = LiteralStringRef("incorrect_cluster_file_contents");
 			std::string connectionString = connFile->getConnectionString().toString();
+			if(!incorrectTime.present()) {
+				incorrectTime = now();
+			}
 			if(connFile->canGetFilename()) {
-				// Don't log a SevWarnAlways the first time to account for transient issues (e.g. someone else changing the file right before us)
-				TraceEvent(incorrectConnectionString.present() && incorrectConnectionString.get() == connectionString ? SevWarnAlways : SevWarn, "IncorrectClusterFileContents")
+				// Don't log a SevWarnAlways initially to account for transient issues (e.g. someone else changing the file right before us)
+				TraceEvent(now() - incorrectTime.get() > 300 ? SevWarnAlways : SevWarn, "IncorrectClusterFileContents")
 					.detail("Filename", connFile->getFilename())
 					.detail("ConnectionStringFromFile", fileConnectionString.toString())
 					.detail("CurrentConnectionString", connectionString);
 			}
-			incorrectConnectionString = connectionString;
 		}
 		else {
-			incorrectConnectionString = Optional<std::string>();
+			incorrectTime = Optional<double>();
 		}

 		auto peers = FlowTransport::transport().getIncompatiblePeers();
@@ -14,8 +14,9 @@ SSSSSSSSSS - 10 bytes Version Stamp
 RRRRRRRRRRRRRRRR - 16 bytes Transaction id
 NNNN - 4 Bytes Chunk number (Big Endian)
 TTTT - 4 Bytes Total number of chunks (Big Endian)
+XXXX - Variable length user provided transaction identifier
 */
-StringRef sampleTrInfoKey = LiteralStringRef("\xff\x02/fdbClientInfo/client_latency/SSSSSSSSSS/RRRRRRRRRRRRRRRR/NNNNTTTT/");
+StringRef sampleTrInfoKey = LiteralStringRef("\xff\x02/fdbClientInfo/client_latency/SSSSSSSSSS/RRRRRRRRRRRRRRRR/NNNNTTTT/XXXX/");
 static const auto chunkNumStartIndex = sampleTrInfoKey.toString().find('N');
 static const auto numChunksStartIndex = sampleTrInfoKey.toString().find('T');
 static const int chunkFormatSize = 4;
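The profiling key embeds fixed-width fields at known offsets, and those offsets are recovered from the template string itself by searching for the placeholder characters. A sketch of that lookup (the template here is shortened for illustration; the real key carries the `\xff\x02/fdbClientInfo/` system-key prefix):

```cpp
#include <cassert>
#include <string>

int main() {
	// Placeholder fields: version stamp (S), transaction id (R),
	// chunk number (N), chunk total (T), then the optional user id (X).
	const std::string keyTemplate =
	    "client_latency/SSSSSSSSSS/RRRRRRRRRRRRRRRR/NNNNTTTT/XXXX/";

	// Field offsets come from the template itself, exactly as the hunk
	// above does with find('N') / find('T').
	const auto chunkNumStartIndex = keyTemplate.find('N');
	const auto numChunksStartIndex = keyTemplate.find('T');
	const int chunkFormatSize = 4;

	assert(numChunksStartIndex == chunkNumStartIndex + chunkFormatSize);
	// A real key substitutes big-endian binary integers into the N and T
	// slots, so chunks of one transaction sort in chunk order under a scan.
}
```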
@@ -18,6 +18,8 @@
  * limitations under the License.
  */

+#include <math.h>
+
 #include "flow/IRandom.h"
 #include "fdbclient/NativeAPI.actor.h"
 #include "fdbserver/TesterInterface.actor.h"
@@ -56,8 +58,11 @@ struct ConsistencyCheckWorkload : TestWorkload
 	//If true, then any failure of the consistency check will be logged as SevError. Otherwise, it will be logged as SevWarn
 	bool failureIsError;

-	//Ideal number of bytes per second to read from each storage server
-	int rateLimit;
+	//Max number of bytes per second to read from each storage server
+	int rateLimitMax;
+
+	// DataSet Size
+	int64_t bytesReadInPreviousRound;

 	//Randomize shard order with each iteration if true
 	bool shuffleShards;
@@ -78,7 +83,7 @@ struct ConsistencyCheckWorkload : TestWorkload
 		distributed = getOption(options, LiteralStringRef("distributed"), true);
 		shardSampleFactor = std::max(getOption(options, LiteralStringRef("shardSampleFactor"), 1), 1);
 		failureIsError = getOption(options, LiteralStringRef("failureIsError"), false);
-		rateLimit = getOption(options, LiteralStringRef("rateLimit"), 0);
+		rateLimitMax = getOption(options, LiteralStringRef("rateLimitMax"), 0);
 		shuffleShards = getOption(options, LiteralStringRef("shuffleShards"), false);
 		indefinite = getOption(options, LiteralStringRef("indefinite"), false);
@@ -87,6 +92,7 @@ struct ConsistencyCheckWorkload : TestWorkload
 		firstClient = clientId == 0;

 		repetitions = 0;
+		bytesReadInPreviousRound = 0;
 	}

 	virtual std::string description()
@@ -585,7 +591,12 @@ struct ConsistencyCheckWorkload : TestWorkload
 		state int effectiveClientCount = (self->distributed) ? self->clientCount : 1;
 		state int i = self->clientId * (self->shardSampleFactor + 1);
 		state int increment = (self->distributed && !self->firstClient) ? effectiveClientCount * self->shardSampleFactor : 1;
-		state Reference<IRateControl> rateLimiter = Reference<IRateControl>( new SpeedLimit(self->rateLimit, CLIENT_KNOBS->CONSISTENCY_CHECK_RATE_WINDOW) );
+		state int rateLimitForThisRound = self->bytesReadInPreviousRound == 0 ? self->rateLimitMax :
+			std::min(self->rateLimitMax, static_cast<int>(ceil(self->bytesReadInPreviousRound / (float) CLIENT_KNOBS->CONSISTENCY_CHECK_ONE_ROUND_TARGET_COMPLETION_TIME)));
+		ASSERT(rateLimitForThisRound >= 0 && rateLimitForThisRound <= self->rateLimitMax);
+		TraceEvent("ConsistencyCheck_RateLimitForThisRound").detail("RateLimit", rateLimitForThisRound);
+		state Reference<IRateControl> rateLimiter = Reference<IRateControl>( new SpeedLimit(rateLimitForThisRound, CLIENT_KNOBS->CONSISTENCY_CHECK_RATE_WINDOW) );
+		state int64_t bytesReadInthisRound = 0;

 		state double dbSize = 100e12;
 		if(g_network->isSimulated()) {
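The adaptive limit aims each round at a target wall-clock duration: take the previous round's byte count, divide by the target completion time, round up, and cap at the configured maximum. A worked arithmetic sketch (illustrative constants matching the knob defaults above):

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>
#include <cstdint>

// Rate for the next consistency-check round: previous round's bytes spread
// over the target completion time, capped at rateLimitMax. A first round
// (no history) runs at the cap.
int rateForRound(int64_t bytesPreviousRound, int rateLimitMax, double targetSeconds) {
	if (bytesPreviousRound == 0) return rateLimitMax;
	return std::min(rateLimitMax,
	                static_cast<int>(std::ceil(bytesPreviousRound / targetSeconds)));
}

int main() {
	const double week = 7.0 * 24 * 60 * 60; // 7-day target, as in the knob
	assert(rateForRound(0, 50'000'000, week) == 50'000'000);
	// ~60.48 TB read last round -> 100 MB/s needed -> capped at 50 MB/s.
	assert(rateForRound(60'480'000'000'000LL, 50'000'000, week) == 50'000'000);
	// ~6.05 GB last round -> 10,000 B/s, well under the cap.
	assert(rateForRound(6'048'000'000LL, 50'000'000, week) == 10'000);
}
```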
@@ -912,11 +923,12 @@ struct ConsistencyCheckWorkload : TestWorkload
 								shardKeys += data.size();
 							}
 							//after requesting each shard, enforce rate limit based on how much data will likely be read
-							if(self->rateLimit > 0)
+							if(rateLimitForThisRound > 0)
 							{
 								wait(rateLimiter->getAllowance(totalReadAmount));
 							}
 							bytesReadInRange += totalReadAmount;
+							bytesReadInthisRound += totalReadAmount;

 							//Advance to the next set of entries
 							if(firstValidServer >= 0 && keyValueFutures[firstValidServer].get().get().more)
@@ -1029,7 +1041,7 @@ struct ConsistencyCheckWorkload : TestWorkload
 			}
 		}*/

-
+		self->bytesReadInPreviousRound = bytesReadInthisRound;
 		return true;
 	}
@@ -511,6 +511,11 @@ public:
 		return b;
 	}

+	const void* peekBytes( int bytes ) {
+		ASSERT( begin + bytes <= end );
+		return begin;
+	}
+
 	void serializeBytes(void* data, int bytes) {
 		memcpy(data, readBytes(bytes), bytes);
 	}
@@ -534,6 +539,7 @@ public:
 	BinaryReader( const void* data, int length, VersionOptions vo ) {
 		begin = (const char*)data;
 		end = begin + length;
+		check = nullptr;
 		vo.read(*this);
 	}
 	template <class VersionOptions>
@@ -558,8 +564,19 @@ public:

 	bool empty() const { return begin == end; }

+	void checkpoint() {
+		check = begin;
+	}
+
+	void rewind() {
+		ASSERT(check != nullptr);
+		begin = check;
+		check = nullptr;
+	}
+
+
 private:
-	const char *begin, *end;
+	const char *begin, *end, *check;
 	Arena m_pool;
 	uint64_t m_protocolVersion;
 };
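`checkpoint()`/`rewind()` give the reader one-deep backtracking: `parseMessagesForTag` above checkpoints after reading a record's length, scans the tag list, then rewinds so the whole record can be consumed as a unit. A minimal sketch of the same cursor pattern (plain pointers, not the Flow BinaryReader; little-endian byte order assumed for the length prefix):

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>

// One-deep backtracking cursor over a byte buffer, mirroring
// checkpoint()/rewind() in the hunk above.
struct Cursor {
	const uint8_t *begin, *end, *check = nullptr;

	void read(void* out, int n) { assert(begin + n <= end); std::memcpy(out, begin, n); begin += n; }
	void checkpoint() { check = begin; }
	void rewind() { assert(check != nullptr); begin = check; check = nullptr; }
	bool empty() const { return begin == end; }
};

int main() {
	// One length-prefixed record: 4-byte length (little-endian), then payload.
	uint8_t buf[8] = { 4, 0, 0, 0, 'a', 'b', 'c', 'd' };
	Cursor c{ buf, buf + sizeof(buf) };

	uint32_t len;
	c.read(&len, 4);      // consume the length prefix
	c.checkpoint();       // remember the start of the payload
	uint8_t first;
	c.read(&first, 1);    // peek into the payload to decide if we want it
	c.rewind();           // back to the payload start...
	uint8_t payload[4];
	c.read(payload, len); // ...so the record can be consumed whole
	assert(first == 'a' && payload[3] == 'd' && c.empty());
}
```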