Merge branch 'master' into getrange-perf-improvements

A.J. Beamon 2017-10-26 09:25:04 -07:00
commit 0d68db0dac
120 changed files with 1184 additions and 360 deletions

View File

@ -25,7 +25,7 @@ sys.path[:0]=[os.path.join(os.path.dirname(__file__), '..', '..', 'bindings', 'p
import util
FDB_API_VERSION = 500
FDB_API_VERSION = 510
LOGGING = {
'version' : 1,

View File

@ -133,7 +133,7 @@ def choose_api_version(selected_api_version, tester_min_version, tester_max_vers
elif random.random() < 0.7:
api_version = min_version
elif random.random() < 0.9:
api_version = random.choice([v for v in [13, 14, 16, 21, 22, 23, 100, 200, 300, 400, 410, 420, 430, 440, 450, 460, 500] if v >= min_version and v <= max_version])
api_version = random.choice([v for v in [13, 14, 16, 21, 22, 23, 100, 200, 300, 400, 410, 420, 430, 440, 450, 460, 500, 510] if v >= min_version and v <= max_version])
else:
api_version = random.randint(min_version, max_version)

View File

@ -20,7 +20,7 @@
import os
MAX_API_VERSION = 500
MAX_API_VERSION = 510
class Tester:
def __init__(self, name, cmd, max_int_bits=64, min_api_version=0, max_api_version=MAX_API_VERSION, threads_enabled=True):

View File

@ -164,7 +164,7 @@ class ApiTest(Test):
op_choices += write_conflicts
op_choices += resets
idempotent_atomic_ops = [u'BIT_AND', u'BIT_OR', u'MAX', u'MIN']
idempotent_atomic_ops = [u'BIT_AND', u'BIT_OR', u'MAX', u'MIN', u'BYTE_MIN', u'BYTE_MAX']
atomic_ops = idempotent_atomic_ops + [u'ADD', u'BIT_XOR']
if args.concurrency > 1:

View File

@ -32,7 +32,7 @@ fdb.api_version(FDB_API_VERSION)
# SOMEDAY: This should probably be broken up into smaller tests
class ScriptedTest(Test):
TEST_API_VERSION = 500
TEST_API_VERSION = 510
def __init__(self, subspace):
super(ScriptedTest, self).__init__(subspace, ScriptedTest.TEST_API_VERSION, ScriptedTest.TEST_API_VERSION)

View File

@ -18,7 +18,7 @@
* limitations under the License.
*/
#define FDB_API_VERSION 500
#define FDB_API_VERSION 510
#include "fdbclient/MultiVersionTransaction.h"
#include "foundationdb/fdb_c.h"

View File

@ -28,10 +28,10 @@
#endif
#if !defined(FDB_API_VERSION)
#error You must #define FDB_API_VERSION prior to including fdb_c.h (current version is 500)
#error You must #define FDB_API_VERSION prior to including fdb_c.h (current version is 510)
#elif FDB_API_VERSION < 13
#error API version no longer supported (upgrade to 13)
#elif FDB_API_VERSION > 500
#elif FDB_API_VERSION > 510
#error Requested API version requires a newer version of this header
#endif
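
These two guards are the compile-time half of version selection; the runtime half is fdb_select_api_version, as used by the C tests below. A minimal consumer of the bumped header looks like this (a sketch; error handling is reduced to the exit code):

#define FDB_API_VERSION 510 /* must precede the include, per the #error above */
#include <foundationdb/fdb_c.h>

int main(void) {
    /* The runtime check mirrors the compile-time one: requesting a version
       newer than the loaded client library fails with a nonzero error. */
    fdb_error_t err = fdb_select_api_version(510);
    return err != 0;
}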

View File

@ -602,7 +602,7 @@ void runTests(struct ResultSet *rs) {
int main(int argc, char **argv) {
srand(time(NULL));
struct ResultSet *rs = newResultSet();
checkError(fdb_select_api_version(500), "select API version", rs);
checkError(fdb_select_api_version(510), "select API version", rs);
printf("Running performance test at client version: %s\n", fdb_get_client_version());
valueStr = (uint8_t*)malloc((sizeof(uint8_t))*valueSize);

View File

@ -243,7 +243,7 @@ void runTests(struct ResultSet *rs) {
int main(int argc, char **argv) {
srand(time(NULL));
struct ResultSet *rs = newResultSet();
checkError(fdb_select_api_version(500), "select API version", rs);
checkError(fdb_select_api_version(510), "select API version", rs);
printf("Running RYW Benchmark test at client version: %s\n", fdb_get_client_version());
keys = generateKeys(numKeys, keySize);

View File

@ -27,7 +27,7 @@
#include <pthread.h>
#ifndef FDB_API_VERSION
#define FDB_API_VERSION 500
#define FDB_API_VERSION 510
#endif
#include <foundationdb/fdb_c.h>

View File

@ -33,7 +33,7 @@ THREAD_FUNC networkThread(void* fdb) {
}
ACTOR Future<Void> _test() {
API *fdb = FDB::API::selectAPIVersion(500);
API *fdb = FDB::API::selectAPIVersion(510);
auto c = fdb->createCluster( std::string() );
auto db = c->createDatabase();
state Reference<Transaction> tr( new Transaction(db) );
@ -77,7 +77,7 @@ ACTOR Future<Void> _test() {
}
void fdb_flow_test() {
API *fdb = FDB::API::selectAPIVersion(500);
API *fdb = FDB::API::selectAPIVersion(510);
fdb->setupNetwork();
startThread(networkThread, fdb);

View File

@ -23,7 +23,7 @@
#include <flow/flow.h>
#define FDB_API_VERSION 500
#define FDB_API_VERSION 510
#include <bindings/c/foundationdb/fdb_c.h>
#undef DLLEXPORT

View File

@ -1631,6 +1631,8 @@ void populateAtomicOpMap() {
optionInfo["MIN"] = FDBMutationType::FDB_MUTATION_TYPE_MIN;
optionInfo["SET_VERSIONSTAMPED_KEY"] = FDBMutationType::FDB_MUTATION_TYPE_SET_VERSIONSTAMPED_KEY;
optionInfo["SET_VERSIONSTAMPED_VALUE"] = FDBMutationType::FDB_MUTATION_TYPE_SET_VERSIONSTAMPED_VALUE;
optionInfo["BYTE_MIN"] = FDBMutationType::FDB_MUTATION_TYPE_BYTE_MIN;
optionInfo["BYTE_MAX"] = FDBMutationType::FDB_MUTATION_TYPE_BYTE_MAX;
}
void populateOpsThatCreateDirectories() {
@ -1685,7 +1687,7 @@ ACTOR void _test_versionstamp() {
try {
g_network = newNet2(NetworkAddress(), false);
API *fdb = FDB::API::selectAPIVersion(500);
API *fdb = FDB::API::selectAPIVersion(510);
fdb->setupNetwork();
startThread(networkThread, fdb);
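
The BYTE_MIN and BYTE_MAX entries added to the tester's atomic-op map above differ from MIN and MAX in that they compare the raw byte strings lexicographically, with no zero-extension or truncation. A rough model of the two operations (std::optional and std::string stand in for the database value; the function names are illustrative):

#include <algorithm>
#include <optional>
#include <string>

// An absent value never wins: the operand is stored unchanged.
std::string byteMin(const std::optional<std::string>& existing, const std::string& operand) {
    if (!existing) return operand;
    return std::min(*existing, operand); // std::string compares lexicographically
}

std::string byteMax(const std::optional<std::string>& existing, const std::string& operand) {
    if (!existing) return operand;
    return std::max(*existing, operand);
}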

View File

@ -8,7 +8,7 @@ This package requires:
- Go 1.1+ with CGO enabled
- FoundationDB C API 2.0.x, 3.0.x, or 4.x.y (part of the [FoundationDB clients package](https://files.foundationdb.org/fdb-c/))
Use of this package requires the selection of a FoundationDB API version at runtime. This package currently supports FoundationDB API versions 200-500.
Use of this package requires the selection of a FoundationDB API version at runtime. This package currently supports FoundationDB API versions 200-510.
To build this package, in the top level of this repository run:

View File

@ -23,7 +23,7 @@
package fdb
/*
#define FDB_API_VERSION 500
#define FDB_API_VERSION 510
#include <foundationdb/fdb_c.h>
*/
import "C"

View File

@ -23,7 +23,7 @@
package fdb
/*
#define FDB_API_VERSION 500
#define FDB_API_VERSION 510
#include <foundationdb/fdb_c.h>
*/
import "C"

View File

@ -23,7 +23,7 @@
package fdb
/*
#define FDB_API_VERSION 500
#define FDB_API_VERSION 510
#include <foundationdb/fdb_c.h>
*/
import "C"

View File

@ -23,7 +23,7 @@
package fdb
/*
#define FDB_API_VERSION 500
#define FDB_API_VERSION 510
#include <foundationdb/fdb_c.h>
#include <stdlib.h>
*/
@ -109,7 +109,7 @@ func (opt NetworkOptions) setOpt(code int, param []byte) error {
// library, an error will be returned. APIVersion must be called prior to any
// other functions in the fdb package.
//
// Currently, this package supports API versions 200 through 500.
// Currently, this package supports API versions 200 through 510.
//
// Warning: When using the multi-version client API, setting an API version that
// is not supported by a particular client library will prevent that client from
@ -117,7 +117,7 @@ func (opt NetworkOptions) setOpt(code int, param []byte) error {
// the API version of your application after upgrading your client until the
// cluster has also been upgraded.
func APIVersion(version int) error {
headerVersion := 500
headerVersion := 510
networkMutex.Lock()
defer networkMutex.Unlock()
@ -129,7 +129,7 @@ func APIVersion(version int) error {
return errAPIVersionAlreadySet
}
if version < 200 || version > 500 {
if version < 200 || version > 510 {
return errAPIVersionNotSupported
}

View File

@ -24,7 +24,7 @@ package fdb
/*
#cgo LDFLAGS: -lfdb_c -lm
#define FDB_API_VERSION 500
#define FDB_API_VERSION 510
#include <foundationdb/fdb_c.h>
#include <string.h>

View File

@ -23,7 +23,7 @@
package fdb
/*
#define FDB_API_VERSION 500
#define FDB_API_VERSION 510
#include <foundationdb/fdb_c.h>
*/
import "C"

View File

@ -23,7 +23,7 @@
package fdb
/*
#define FDB_API_VERSION 500
#define FDB_API_VERSION 510
#include <foundationdb/fdb_c.h>
*/
import "C"

View File

@ -21,7 +21,7 @@
#include <jni.h>
#include <string.h>
#define FDB_API_VERSION 500
#define FDB_API_VERSION 510
#include <foundationdb/fdb_c.h>

View File

@ -34,7 +34,7 @@ import java.util.concurrent.ThreadFactory;
* This call is required before using any other part of the API. The call allows
* an error to be thrown at this point to prevent client code from accessing a later library
* with incorrect assumptions from the current version. The API version documented here is version
* {@code 500}.<br><br>
* {@code 510}.<br><br>
* FoundationDB encapsulates multiple versions of its interface by requiring
* the client to explicitly specify the version of the API it uses. The purpose
* of this design is to allow you to upgrade the server, client libraries, or
@ -157,8 +157,8 @@ public class FDB {
}
if(version < 500)
throw new IllegalArgumentException("API version not supported (minimum 500)");
if(version > 500)
throw new IllegalArgumentException("API version not supported (maximum 500)");
if(version > 510)
throw new IllegalArgumentException("API version not supported (maximum 510)");
Select_API_version(version);
return singleton = new FDB(version);
}

View File

@ -110,6 +110,9 @@ public class Subspace
}
/**
* Create a human-readable string representation of this subspace. This is
* really only useful for debugging purposes, but it includes information
* on what raw prefix the subspace is using.
* @return a printable representation of the subspace
*/
@Override
@ -117,6 +120,16 @@ public class Subspace
return "Subspace(rawPrefix=" + printable(rawPrefix) + ")";
}
/**
* Returns a hash-table compatible hash of this subspace. This is based off
* of the hash of the underlying byte-array prefix.
* @return a hash of this subspace
*/
@Override
public int hashCode() {
return Arrays.hashCode(rawPrefix);
}
/**
* Gets a new subspace which is equivalent to this subspace with its prefix {@link Tuple} extended by
* the specified {@code Object}. The object will be inserted into a {@link Tuple} and passed to {@link #get(Tuple)}.

View File

@ -25,7 +25,7 @@ and add it to your classpath.<br>
<h3>Getting started</h3>
To start using FoundationDB from Java, create an instance of the
{@link com.apple.foundationdb.FDB FoundationDB API interface} with the version of the
API that you want to use (this release of the FoundationDB Java API supports only version {@code 500}).
API that you want to use (this release of the FoundationDB Java API supports versions between {@code 500} and {@code 510}).
With this API object you can then open {@link com.apple.foundationdb.Cluster Cluster}s and
{@link com.apple.foundationdb.Database Database}s and start using
{@link com.apple.foundationdb.Transaction Transactions}s.
@ -41,7 +41,7 @@ import Tuple;
public class Example {
public static void main(String[] args) {
FDB fdb = FDB.selectAPIVersion(500);
FDB fdb = FDB.selectAPIVersion(510);
Database db = fdb.open();
// Run an operation on the database

View File

@ -27,7 +27,7 @@ import java.nio.charset.Charset;
import java.util.Random;
public abstract class AbstractTester {
public static final int API_VERSION = 500;
public static final int API_VERSION = 510;
protected static final int NUM_RUNS = 25;
protected static final Charset ASCII = Charset.forName("ASCII");

View File

@ -33,7 +33,7 @@ public class BlockingBenchmark {
private static final int PARALLEL = 100;
public static void main(String[] args) throws InterruptedException {
FDB fdb = FDB.selectAPIVersion(500);
FDB fdb = FDB.selectAPIVersion(510);
// The cluster file DOES NOT need to be valid, although it must exist.
// This is because the database is never really contacted in this test.

View File

@ -51,7 +51,7 @@ public class ConcurrentGetSetGet {
}
public static void main(String[] args) {
Database database = FDB.selectAPIVersion(500).open();
Database database = FDB.selectAPIVersion(510).open();
new ConcurrentGetSetGet().apply(database);
}

View File

@ -39,7 +39,7 @@ public class DirectoryTest {
public static void main(String[] args) throws Exception {
try {
Cluster c = FDB.selectAPIVersion(500).createCluster(CLUSTER_FILE);
Cluster c = FDB.selectAPIVersion(510).createCluster(CLUSTER_FILE);
Database db = c.openDatabase();
runTests(db);
} catch(Throwable t) {

View File

@ -30,7 +30,7 @@ import com.apple.foundationdb.tuple.Tuple;
public class Example {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FDB fdb = FDB.selectAPIVersion(500);
FDB fdb = FDB.selectAPIVersion(510);
Database db = fdb.open();
// Run an operation on the database

View File

@ -36,7 +36,7 @@ public class IterableTest {
public static void main(String[] args) throws InterruptedException {
final int reps = 1000;
try {
Cluster cluster = FDB.selectAPIVersion(500).createCluster(CLUSTER_FILE);
Cluster cluster = FDB.selectAPIVersion(510).createCluster(CLUSTER_FILE);
Database db = cluster.openDatabase();
runTests(reps, db);
} catch(Throwable t) {

View File

@ -34,7 +34,7 @@ import com.apple.foundationdb.tuple.ByteArrayUtil;
public class LocalityTests {
public static void main(String[] args) {
FDB fdb = FDB.selectAPIVersion(500);
FDB fdb = FDB.selectAPIVersion(510);
Database database = fdb.open(args[0]);
{
Transaction tr = database.createTransaction();

View File

@ -45,7 +45,7 @@ public class ParallelRandomScan {
private static final int PARALLELISM_STEP = 5;
public static void main(String[] args) throws InterruptedException {
FDB api = FDB.selectAPIVersion(500);
FDB api = FDB.selectAPIVersion(510);
Database database = api.open(args[0]);
for(int i = PARALLELISM_MIN; i <= PARALLELISM_MAX; i += PARALLELISM_STEP) {

View File

@ -34,7 +34,7 @@ import com.apple.foundationdb.Transaction;
import com.apple.foundationdb.async.AsyncIterable;
public class RangeTest {
private static final int API_VERSION = 500;
private static final int API_VERSION = 510;
public static void main(String[] args) {
System.out.println("About to use version " + API_VERSION);

View File

@ -34,7 +34,7 @@ public class SerialInsertion {
private static final int NODES = 1000000;
public static void main(String[] args) {
FDB api = FDB.selectAPIVersion(500);
FDB api = FDB.selectAPIVersion(510);
Database database = api.open("T:\\circus\\tags\\RebarCluster-bbc\\cluster_id.txt");
long start = System.currentTimeMillis();

View File

@ -39,7 +39,7 @@ public class SerialIteration {
private static final int THREAD_COUNT = 1;
public static void main(String[] args) throws InterruptedException {
FDB api = FDB.selectAPIVersion(500);
FDB api = FDB.selectAPIVersion(510);
Database database = api.open(args[0]);
for(int i = 1; i <= THREAD_COUNT; i++) {

View File

@ -36,7 +36,7 @@ public class SerialTest {
public static void main(String[] args) throws InterruptedException {
final int reps = 1000;
try {
Cluster c = FDB.selectAPIVersion(500).createCluster(CLUSTER_FILE);
Cluster c = FDB.selectAPIVersion(510).createCluster(CLUSTER_FILE);
Database db = c.openDatabase();
runTests(reps, db);

View File

@ -35,7 +35,7 @@ public class TupleTest {
public static void main(String[] args) throws InterruptedException {
final int reps = 1000;
try {
Cluster c = FDB.selectAPIVersion(500).createCluster(CLUSTER_FILE);
Cluster c = FDB.selectAPIVersion(510).createCluster(CLUSTER_FILE);
Database db = c.openDatabase();
runTests(reps, db);
} catch(Throwable t) {

View File

@ -34,7 +34,7 @@ import com.apple.foundationdb.Transaction;
public class WatchTest {
public static void main(String[] args) {
FDB fdb = FDB.selectAPIVersion(500);
FDB fdb = FDB.selectAPIVersion(510);
Database database = fdb.open(args[0]);
database.options().setLocationCacheSize(42);
Transaction tr = database.createTransaction();

View File

@ -34,7 +34,7 @@ import java.util.concurrent.ThreadFactory;
* This call is required before using any other part of the API. The call allows
* an error to be thrown at this point to prevent client code from accessing a later library
* with incorrect assumptions from the current version. The API version documented here is version
* {@code 500}.<br><br>
* {@code 510}.<br><br>
* FoundationDB encapsulates multiple versions of its interface by requiring
* the client to explicitly specify the version of the API it uses. The purpose
* of this design is to allow you to upgrade the server, client libraries, or
@ -157,8 +157,8 @@ public class FDB {
}
if(version < 500)
throw new IllegalArgumentException("API version not supported (minimum 500)");
if(version > 500)
throw new IllegalArgumentException("API version not supported (maximum 500)");
if(version > 510)
throw new IllegalArgumentException("API version not supported (maximum 510)");
Select_API_version(version);
return singleton = new FDB(version);
}

View File

@ -110,6 +110,9 @@ public class Subspace
}
/**
* Create a human-readable string representation of this subspace. This is
* really only useful for debugging purposes, but it includes information
* on what raw prefix the subspace is using.
* @return a printable representation of the subspace
*/
@Override
@ -117,6 +120,16 @@ public class Subspace
return "Subspace(rawPrefix=" + printable(rawPrefix) + ")";
}
/**
* Returns a hash-table compatible hash of this subspace. This is based off
* of the hash of the underlying byte-array prefix.
* @return a hash of this subspace
*/
@Override
public int hashCode() {
return Arrays.hashCode(rawPrefix);
}
/**
* Gets a new subspace which is equivalent to this subspace with its prefix {@link Tuple} extended by
* the specified {@code Object}. The object will be inserted into a {@link Tuple} and passed to {@link #get(Tuple)}.

View File

@ -25,7 +25,7 @@ and add it to your classpath.<br>
<h3>Getting started</h3>
To start using FoundationDB from Java, create an instance of the
{@link com.apple.foundationdb.FDB FoundationDB API interface} with the version of the
API that you want to use (this release of the FoundationDB Java API supports only version {@code 500}).
API that you want to use (this release of the FoundationDB Java API supports versions between {@code 500} and {@code 510}).
With this API object you can then open {@link com.apple.foundationdb.Cluster}s and
{@link com.apple.foundationdb.Database}s and start using
{@link com.apple.foundationdb.Transaction}s. Here we give an example. The example relies on a
@ -41,7 +41,7 @@ import Tuple;
public class Example {
public static void main(String[] args) {
FDB fdb = FDB.selectAPIVersion(500);
FDB fdb = FDB.selectAPIVersion(510);
Database db = fdb.open();
// Run an operation on the database

View File

@ -37,7 +37,7 @@ public class AsListTest {
* a bug caused the addition of the clear to make the result return 0 items.
*/
public static void main(String[] args) {
FDB fdb = FDB.selectAPIVersion(500);
FDB fdb = FDB.selectAPIVersion(510);
Database database = fdb.open("T:\\circus\\tags\\RebarCluster-bbc\\cluster_id.txt");
database.options().setLocationCacheSize(42);
Transaction tr = database.createTransaction();

View File

@ -33,7 +33,7 @@ public class BlockingBenchmark {
private static final int PARALLEL = 100;
public static void main(String[] args) throws InterruptedException {
FDB fdb = FDB.selectAPIVersion(500);
FDB fdb = FDB.selectAPIVersion(510);
// The cluster file DOES NOT need to be valid, although it must exist.
// This is because the database is never really contacted in this test.

View File

@ -52,7 +52,7 @@ public class ConcurrentGetSetGet {
}
public static void main(String[] args) {
Database database = FDB.selectAPIVersion(500).open();
Database database = FDB.selectAPIVersion(510).open();
new ConcurrentGetSetGet().apply(database);
}

View File

@ -38,7 +38,7 @@ public class DirectoryTest {
public static void main(String[] args) throws Exception {
try {
Cluster c = FDB.selectAPIVersion(500).createCluster(CLUSTER_FILE);
Cluster c = FDB.selectAPIVersion(510).createCluster(CLUSTER_FILE);
Database db = c.openDatabase();
runTests(db);
} catch(Throwable t) {

View File

@ -29,7 +29,7 @@ import com.apple.foundationdb.tuple.Tuple;
public class Example {
public static void main(String[] args) {
FDB fdb = FDB.selectAPIVersion(500);
FDB fdb = FDB.selectAPIVersion(510);
Database db = fdb.open();
// Run an operation on the database

View File

@ -36,7 +36,7 @@ public class IterableTest {
public static void main(String[] args) throws InterruptedException {
final int reps = 1000;
try {
Cluster cluster = FDB.selectAPIVersion(500).createCluster(CLUSTER_FILE);
Cluster cluster = FDB.selectAPIVersion(510).createCluster(CLUSTER_FILE);
Database db = cluster.openDatabase();
runTests(reps, db);
} catch(Throwable t) {

View File

@ -34,7 +34,7 @@ import com.apple.foundationdb.tuple.ByteArrayUtil;
public class LocalityTests {
public static void main(String[] args) {
FDB fdb = FDB.selectAPIVersion(500);
FDB fdb = FDB.selectAPIVersion(510);
Database database = fdb.open(args[0]);
{

View File

@ -44,7 +44,7 @@ public class ParallelRandomScan {
private static final int PARALLELISM_STEP = 5;
public static void main(String[] args) throws InterruptedException {
FDB api = FDB.selectAPIVersion(500);
FDB api = FDB.selectAPIVersion(510);
Database database = api.open(args[0]);
for(int i = PARALLELISM_MIN; i <= PARALLELISM_MAX; i += PARALLELISM_STEP) {

View File

@ -33,7 +33,7 @@ import com.apple.foundationdb.async.AsyncIterable;
import com.apple.foundationdb.async.Function;
public class RangeTest {
private static final int API_VERSION = 500;
private static final int API_VERSION = 510;
public static void main(String[] args) throws Exception {
System.out.println("About to use version " + API_VERSION);

View File

@ -34,7 +34,7 @@ public class SerialInsertion {
private static final int NODES = 1000000;
public static void main(String[] args) {
FDB api = FDB.selectAPIVersion(500);
FDB api = FDB.selectAPIVersion(510);
Database database = api.open("T:\\circus\\tags\\RebarCluster-bbc\\cluster_id.txt");
long start = System.currentTimeMillis();
@ -94,4 +94,4 @@ public class SerialInsertion {
}
}
}
}

View File

@ -39,7 +39,7 @@ public class SerialIteration {
private static final int THREAD_COUNT = 1;
public static void main(String[] args) throws InterruptedException {
FDB api = FDB.selectAPIVersion(500);
FDB api = FDB.selectAPIVersion(510);
Database database = api.open(args[0]);
for(int i = 1; i <= THREAD_COUNT; i++) {

View File

@ -36,7 +36,7 @@ public class SerialTest {
public static void main(String[] args) throws InterruptedException {
final int reps = 1000;
try {
Cluster c = FDB.selectAPIVersion(500).createCluster(CLUSTER_FILE);
Cluster c = FDB.selectAPIVersion(510).createCluster(CLUSTER_FILE);
Database db = c.openDatabase();
runTests(reps, db);

View File

@ -31,7 +31,7 @@ public class TestApp {
public static void main(String[] args) throws Exception {
try {
Cluster cluster = FDB.selectAPIVersion(500).createCluster("C:\\Users\\Ben\\workspace\\fdb\\fdb.cluster");
Cluster cluster = FDB.selectAPIVersion(510).createCluster("C:\\Users\\Ben\\workspace\\fdb\\fdb.cluster");
System.out.println("I now have the cluster");
Database db = cluster.openDatabase();

View File

@ -34,7 +34,7 @@ public class TupleTest {
public static void main(String[] args) throws InterruptedException {
final int reps = 1000;
try {
Cluster c = FDB.selectAPIVersion(500).createCluster(CLUSTER_FILE);
Cluster c = FDB.selectAPIVersion(510).createCluster(CLUSTER_FILE);
Database db = c.openDatabase();
runTests(reps, db);
} catch(Throwable t) {

View File

@ -43,7 +43,7 @@ public class VersionstampExample {
}
public static void main(String[] args) {
FDB fdb = FDB.selectAPIVersion(500);
FDB fdb = FDB.selectAPIVersion(510);
Database db = fdb.open();
Transaction tr = db.createTransaction();
tr.mutate(MutationType.SET_VERSIONSTAMPED_VALUE, "foo".getBytes(), "blahblahbl".getBytes());

View File

@ -34,7 +34,7 @@ import com.apple.foundationdb.async.Future;
public class WatchTest {
public static void main(String[] args) {
FDB fdb = FDB.selectAPIVersion(500);
FDB fdb = FDB.selectAPIVersion(510);
Database database = fdb.open(args[0]);
database.options().setLocationCacheSize(42);
Transaction tr = database.createTransaction();

View File

@ -43,8 +43,8 @@ module.exports = {
throw new Error('Cannot select multiple different FDB API versions');
if(version < 500)
throw new RangeError('FDB API versions before 500 are not supported');
if(version > 500)
throw new RangeError('Latest known FDB API version is 500');
if(version > 510)
throw new RangeError('Latest known FDB API version is 510');
if(!selectedApiVersion.value) {
fdb.apiVersion(version);

View File

@ -22,6 +22,6 @@
#ifndef FDB_NODE_VERSION_H
#define FDB_NODE_VERSION_H
#define FDB_API_VERSION 500
#define FDB_API_VERSION 510
#endif

View File

@ -18,7 +18,7 @@
* limitations under the License.
*/
var fdb = require('../lib/fdb.js').apiVersion(500);
var fdb = require('../lib/fdb.js').apiVersion(510);
var fdbModule = require('../lib/fdbModule.js');
console.log(fdb.tuple.pack([-Math.pow(2,53)]));

View File

@ -36,7 +36,7 @@ def _add_symbols(module, symbols):
globals()[symbol] = getattr(module, symbol)
def api_version(ver):
header_version = 500
header_version = 510
if '_version' in globals():
if globals()['_version'] != ver:

View File

@ -26,7 +26,7 @@
module FDB
@@chosen_version = -1
def self.api_version(version)
header_version = 500
header_version = 510
if @@chosen_version >= 0
if @@chosen_version != version
raise "FDB API already loaded at version #{@@chosen_version}."

View File

@ -698,6 +698,8 @@ void printStatus(StatusObjectReader statusObj, StatusClient::StatusLevel level,
description += format("\nNeed at least %d log servers, %d proxies and %d resolvers.", recoveryState["required_logs"].get_int(), recoveryState["required_proxies"].get_int(), recoveryState["required_resolvers"].get_int());
if (statusObjCluster.has("machines") && statusObjCluster.has("processes"))
description += format("\nHave %d processes on %d machines.", statusObjCluster["processes"].get_obj().size(), statusObjCluster["machines"].get_obj().size());
} else if (name == "locking_old_transaction_servers" && recoveryState["missing_logs"].get_str().size()) {
description += format("\nNeed one or more of the following log servers: %s", recoveryState["missing_logs"].get_str().c_str());
}
description = lineWrap(description.c_str(), 80);
if (!printedCoordinators && (
@ -1658,6 +1660,7 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
StatusObject status = wait( makeInterruptable( StatusClient::statusFetcher( ccf ) ) );
state std::string errorString = "ERROR: Could not calculate the impact of this exclude on the total free space in the cluster.\n"
"Please try the exclude again in 30 seconds.\n"
"Type `exclude FORCE <ADDRESS>*' to exclude without checking free space.\n";
StatusObjectReader statusObj(status);

View File

@ -24,7 +24,8 @@
#include "CommitTransaction.h"
static ValueRef doLittleEndianAdd(const ValueRef& existingValue, const ValueRef& otherOperand, Arena& ar) {
static ValueRef doLittleEndianAdd(const Optional<ValueRef>& existingValueOptional, const ValueRef& otherOperand, Arena& ar) {
const ValueRef& existingValue = existingValueOptional.present() ? existingValueOptional.get() : StringRef();
if(!existingValue.size()) return otherOperand;
if(!otherOperand.size()) return otherOperand;
@ -46,7 +47,8 @@ static ValueRef doLittleEndianAdd(const ValueRef& existingValue, const ValueRef&
return StringRef(buf, i);
}
static ValueRef doAnd(const ValueRef& existingValue, const ValueRef& otherOperand, Arena& ar) {
static ValueRef doAnd(const Optional<ValueRef>& existingValueOptional, const ValueRef& otherOperand, Arena& ar) {
const ValueRef& existingValue = existingValueOptional.present() ? existingValueOptional.get() : StringRef();
if(!otherOperand.size()) return otherOperand;
uint8_t* buf = new (ar) uint8_t [otherOperand.size()];
@ -60,7 +62,15 @@ static ValueRef doAnd(const ValueRef& existingValue, const ValueRef& otherOperan
return StringRef(buf, i);
}
static ValueRef doOr(const ValueRef& existingValue, const ValueRef& otherOperand, Arena& ar) {
static ValueRef doAndV2(const Optional<ValueRef>& existingValueOptional, const ValueRef& otherOperand, Arena& ar) {
if (!existingValueOptional.present())
return otherOperand;
return doAnd(existingValueOptional, otherOperand, ar);
}
static ValueRef doOr(const Optional<ValueRef>& existingValueOptional, const ValueRef& otherOperand, Arena& ar) {
const ValueRef& existingValue = existingValueOptional.present() ? existingValueOptional.get() : StringRef();
if(!existingValue.size()) return otherOperand;
if(!otherOperand.size()) return otherOperand;
@ -75,7 +85,8 @@ static ValueRef doOr(const ValueRef& existingValue, const ValueRef& otherOperand
return StringRef(buf, i);
}
static ValueRef doXor(const ValueRef& existingValue, const ValueRef& otherOperand, Arena& ar) {
static ValueRef doXor(const Optional<ValueRef>& existingValueOptional, const ValueRef& otherOperand, Arena& ar) {
const ValueRef& existingValue = existingValueOptional.present() ? existingValueOptional.get() : StringRef();
if(!existingValue.size()) return otherOperand;
if(!otherOperand.size()) return otherOperand;
@ -91,7 +102,8 @@ static ValueRef doXor(const ValueRef& existingValue, const ValueRef& otherOperan
return StringRef(buf, i);
}
static ValueRef doAppendIfFits(const ValueRef& existingValue, const ValueRef& otherOperand, Arena& ar) {
static ValueRef doAppendIfFits(const Optional<ValueRef>& existingValueOptional, const ValueRef& otherOperand, Arena& ar) {
const ValueRef& existingValue = existingValueOptional.present() ? existingValueOptional.get() : StringRef();
if(!existingValue.size()) return otherOperand;
if(!otherOperand.size()) return existingValue;
if(existingValue.size() + otherOperand.size() > CLIENT_KNOBS->VALUE_SIZE_LIMIT) {
@ -111,7 +123,8 @@ static ValueRef doAppendIfFits(const ValueRef& existingValue, const ValueRef& ot
return StringRef(buf, i+j);
}
static ValueRef doMax(const ValueRef& existingValue, const ValueRef& otherOperand, Arena& ar) {
static ValueRef doMax(const Optional<ValueRef>& existingValueOptional, const ValueRef& otherOperand, Arena& ar) {
const ValueRef& existingValue = existingValueOptional.present() ? existingValueOptional.get() : StringRef();
if (!existingValue.size()) return otherOperand;
if (!otherOperand.size()) return otherOperand;
@ -142,9 +155,20 @@ static ValueRef doMax(const ValueRef& existingValue, const ValueRef& otherOperan
return otherOperand;
}
static ValueRef doMin(const ValueRef& existingValue, const ValueRef& otherOperand, Arena& ar) {
static ValueRef doByteMax(const Optional<ValueRef>& existingValueOptional, const ValueRef& otherOperand, Arena& ar) {
if (!existingValueOptional.present()) return otherOperand;
const ValueRef& existingValue = existingValueOptional.get();
if (existingValue > otherOperand)
return existingValue;
return otherOperand;
}
static ValueRef doMin(const Optional<ValueRef>& existingValueOptional, const ValueRef& otherOperand, Arena& ar) {
if (!otherOperand.size()) return otherOperand;
const ValueRef& existingValue = existingValueOptional.present() ? existingValueOptional.get() : StringRef();
int i,j;
for (i = otherOperand.size() - 1; i >= existingValue.size(); i--) {
@ -179,34 +203,22 @@ static ValueRef doMin(const ValueRef& existingValue, const ValueRef& otherOperan
return otherOperand;
}
static ValueRef doAtomicOp(const ValueRef& existingValue, const ValueRef& otherOperand, MutationRef::Type mutationType, Arena& ar) {
switch(mutationType) {
case MutationRef::AddValue:
return doLittleEndianAdd(existingValue, otherOperand, ar);
break;
case MutationRef::AppendIfFits:
return doAppendIfFits(existingValue, otherOperand, ar);
break;
case MutationRef::And:
return doAnd(existingValue, otherOperand, ar);
break;
case MutationRef::Or:
return doOr(existingValue, otherOperand, ar);
break;
case MutationRef::Xor:
return doXor(existingValue, otherOperand, ar);
break;
case MutationRef::Max:
return doMax(existingValue, otherOperand, ar);
break;
case MutationRef::Min:
return doMin(existingValue, otherOperand, ar);
break;
default:
throw operation_failed();
}
static ValueRef doMinV2(const Optional<ValueRef>& existingValueOptional, const ValueRef& otherOperand, Arena& ar) {
if (!existingValueOptional.present())
return otherOperand;
return doMin(existingValueOptional, otherOperand, ar);
}
static ValueRef doByteMin(const Optional<ValueRef>& existingValueOptional, const ValueRef& otherOperand, Arena& ar) {
if (!existingValueOptional.present()) return otherOperand;
const ValueRef& existingValue = existingValueOptional.get();
if (existingValue < otherOperand)
return existingValue;
return otherOperand;
}
/*
* Returns the range corresponding to the specified versionstamp key.
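
The change of signature from ValueRef to Optional<ValueRef> is the point of this hunk: each operation can now distinguish a missing key from an empty value. MinV2 and AndV2 use that to store the operand outright when the key is absent, whereas the old Min zero-extended a phantom empty value and so stored zero bytes. A self-contained illustration of just that difference (std::optional and std::string stand in for the Flow types):

#include <cassert>
#include <optional>
#include <string>

// Pre-510 MIN: a missing key behaves like an all-zero value, and zero is the
// minimum, so the stored result is operand.size() zero bytes.
std::string oldMinOnAbsent(const std::string& operand) {
    return std::string(operand.size(), '\0');
}

// MinV2: a missing key means the operand is stored as-is.
std::string minV2OnAbsent(const std::string& operand) {
    return operand;
}

int main() {
    assert(oldMinOnAbsent("\x05") == std::string(1, '\0'));
    assert(minV2OnAbsent("\x05") == "\x05");
}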

View File

@ -26,8 +26,8 @@
struct MutationRef {
static const int OVERHEAD_BYTES = 12; //12 is the size of Header in MutationList entries
enum Type : uint8_t { SetValue=0, ClearRange, AddValue, DebugKeyRange, DebugKey, NoOp, And, Or, Xor, AppendIfFits, AvailableForReuse, Reserved_For_LogProtocolMessage /* See fdbserver/LogProtocolMessage.h */, Max, Min, SetVersionstampedKey, SetVersionstampedValue, MAX_ATOMIC_OP };
const char * typeString[MAX_ATOMIC_OP] = { "SetValue", "ClearRange", "AddValue", "DebugKeyRange", "DebugKey", "NoOp", "And", "Or", "Xor", "AppendIfFits", "AvailableForReuse", "Reserved_For_LogProtocolMessage", "Max", "Min", "SetVersionstampedKey", "SetVersionstampedValue" };
enum Type : uint8_t { SetValue=0, ClearRange, AddValue, DebugKeyRange, DebugKey, NoOp, And, Or, Xor, AppendIfFits, AvailableForReuse, Reserved_For_LogProtocolMessage /* See fdbserver/LogProtocolMessage.h */, Max, Min, SetVersionstampedKey, SetVersionstampedValue, ByteMin, ByteMax, MinV2, AndV2, MAX_ATOMIC_OP };
const char * typeString[MAX_ATOMIC_OP] = { "SetValue", "ClearRange", "AddValue", "DebugKeyRange", "DebugKey", "NoOp", "And", "Or", "Xor", "AppendIfFits", "AvailableForReuse", "Reserved_For_LogProtocolMessage", "Max", "Min", "SetVersionstampedKey", "SetVersionstampedValue", "ByteMin", "ByteMax", "MinV2", "AndV2" };
// This is stored this way for serialization purposes.
uint8_t type;
StringRef param1, param2;
@ -54,9 +54,9 @@ struct MutationRef {
// These masks define which mutation types have particular properties (they are used to implement isSingleKeyMutation() etc)
enum {
ATOMIC_MASK = (1 << AddValue) | (1 << And) | (1 << Or) | (1 << Xor) | (1 << AppendIfFits) | (1 << Max) | (1 << Min) | (1 << SetVersionstampedKey) | (1 << SetVersionstampedValue),
ATOMIC_MASK = (1 << AddValue) | (1 << And) | (1 << Or) | (1 << Xor) | (1 << AppendIfFits) | (1 << Max) | (1 << Min) | (1 << SetVersionstampedKey) | (1 << SetVersionstampedValue) | (1 << ByteMin) | (1 << ByteMax) | (1 << MinV2) | (1 << AndV2),
SINGLE_KEY_MASK = ATOMIC_MASK | (1<<SetValue),
NON_ASSOCIATIVE_MASK = (1 << AddValue) | (1 << Or) | (1 << Xor) | (1 << Max) | (1 << Min) | (1 << SetVersionstampedKey) | (1 << SetVersionstampedValue)
NON_ASSOCIATIVE_MASK = (1 << AddValue) | (1 << Or) | (1 << Xor) | (1 << Max) | (1 << Min) | (1 << SetVersionstampedKey) | (1 << SetVersionstampedValue) | (1 << MinV2)
};
};
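
Each mask is a bitset indexed by the Type enum, so adding the four new types is just OR-ing in their bits, and membership tests such as isSingleKeyMutation() (named in the comment above) remain a single AND. A compact model (the enum subset is illustrative, though ByteMin really is code 16):

#include <cstdint>

enum Type : uint8_t { SetValue = 0, ClearRange, AddValue, ByteMin = 16 };

constexpr uint32_t SINGLE_KEY_MASK = (1u << SetValue) | (1u << AddValue) | (1u << ByteMin);

constexpr bool isSingleKeyMutation(Type t) {
    return (SINGLE_KEY_MASK & (1u << t)) != 0;
}

static_assert(isSingleKeyMutation(ByteMin), "new atomic ops are single-key");
static_assert(!isSingleKeyMutation(ClearRange), "range clears are not");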

View File

@ -998,7 +998,7 @@ namespace fileBackup {
state Key nextKey = keyAfter(lastKey);
Void _ = wait(saveAndExtendIncrementally(cx, taskBucket, task,
endKeyRangeFile(cx, backup, &rangeFile, backupContainer, &outFileName, endKey, outVersion),
endKeyRangeFile(cx, backup, &rangeFile, backupContainer, &outFileName, nextKey, outVersion),
timeout // time at which to do the first saveAndExtend
)
);

View File

@ -158,7 +158,7 @@ public:
Future<Void> set(Database cx, T const &val) {
auto _key = key;
auto _val = Codec<T>::pack(val).pack();
Value _val = Codec<T>::pack(val).pack();
return runRYWTransaction(cx, [_key, _val](Reference<ReadYourWritesTransaction> tr) {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);

View File

@ -144,6 +144,7 @@ ClientKnobs::ClientKnobs(bool randomize) {
init( BLOBSTORE_CONCURRENT_UPLOADS, BACKUP_TASKS_PER_AGENT*2 );
init( BLOBSTORE_CONCURRENT_WRITES_PER_FILE, 5 );
init( BLOBSTORE_CONCURRENT_READS_PER_FILE, 3 );
init( BLOBSTORE_READ_BLOCK_SIZE, 1024 * 1024 );
init( BLOBSTORE_READ_AHEAD_BLOCKS, 0 );

View File

@ -150,6 +150,7 @@ public:
int BLOBSTORE_MULTIPART_MAX_PART_SIZE;
int BLOBSTORE_MULTIPART_MIN_PART_SIZE;
int BLOBSTORE_CONCURRENT_UPLOADS;
int BLOBSTORE_CONCURRENT_WRITES_PER_FILE;
int BLOBSTORE_CONCURRENT_READS_PER_FILE;
int BLOBSTORE_READ_BLOCK_SIZE;
int BLOBSTORE_READ_AHEAD_BLOCKS;

View File

@ -2208,6 +2208,12 @@ void Transaction::atomicOp(const KeyRef& key, const ValueRef& operand, MutationR
if(operand.size() > CLIENT_KNOBS->VALUE_SIZE_LIMIT)
throw value_too_large();
if (apiVersionAtLeast(510)) {
if (operationType == MutationRef::Min)
operationType = MutationRef::MinV2;
else if (operationType == MutationRef::And)
operationType = MutationRef::AndV2;
}
auto &req = tr;
auto &t = req.transaction;
auto r = singleKeyRange( key, req.arena );
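
The rewrite is version-gated at the client: callers keep passing the public Min and And codes, and only at API version 510 or later are they upgraded to the fixed variants, so applications that have not bumped their API version keep the legacy zero-extending semantics. The gate in isolation (a sketch; the enum subset is hypothetical):

#include <cstdint>

enum class Op : uint8_t { And, Min, MinV2, AndV2 }; // subset of MutationRef::Type

Op remapForApiVersion(Op t, int apiVersion) {
    if (apiVersion >= 510) {
        if (t == Op::Min) return Op::MinV2;
        if (t == Op::And) return Op::AndV2;
    }
    return t; // pre-510 callers keep the legacy behavior
}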

View File

@ -127,6 +127,9 @@ public:
void checkDeferredError() { if (deferred_error.code() != invalid_error_code) throw deferred_error; }
private: friend class ThreadSafeCluster;
friend class AtomicOpsApiCorrectnessWorkload; // This is just for testing purposes. It needs to change apiVersion
friend class AtomicOpsWorkload; // This is just for testing purposes. It needs to change apiVersion
Cluster( Reference<ClusterConnectionFile> connFile, int apiVersion = API_VERSION_LATEST );
Reference<AsyncVar<Optional<struct ClusterInterface>>> clusterInterface;

View File

@ -55,9 +55,9 @@ KeyValueRef const& RYWIterator::kv( Arena& arena ) {
if (writes.is_unmodified_range())
return cache.kv( arena );
else if (writes.is_independent() || cache.is_empty_range())
return temp = KeyValueRef( writes.beginKey().assertRef(), WriteMap::coalesceUnder( writes.op(), Optional<ValueRef>(), arena ).value );
return temp = KeyValueRef( writes.beginKey().assertRef(), WriteMap::coalesceUnder( writes.op(), Optional<ValueRef>(), arena ).value.get() );
else
return temp = KeyValueRef( writes.beginKey().assertRef(), WriteMap::coalesceUnder( writes.op(), cache.kv(arena).value, arena ).value );
return temp = KeyValueRef( writes.beginKey().assertRef(), WriteMap::coalesceUnder( writes.op(), cache.kv(arena).value, arena ).value.get() );
}
RYWIterator& RYWIterator::operator++() {
@ -651,11 +651,11 @@ TEST_CASE("fdbclient/WriteMap/random") {
TraceEvent("RWMT_checkOperation")
.detail("wm_key", printable(it.beginKey().toStandaloneStringRef()))
.detail("wm_size", it.op().size())
.detail("wm_value", it.op().top().value.size())
.detail("wm_value", it.op().top().value.present() ? std::to_string(it.op().top().value.get().size()) : "Not Found")
.detail("wm_type", (int)it.op().top().type)
.detail("sm_key", printable(setIter->first))
.detail("sm_size", setIter->second.size())
.detail("sm_value", setIter->second.top().value.size())
.detail("sm_value", setIter->second.top().value.present() ? std::to_string(setIter->second.top().value.get().size()) : "Not Found")
.detail("sm_type", (int)setIter->second.top().type);
ASSERT(it.beginKey() == setIter->first && it.op() == setIter->second);
++setIter;

View File

@ -1406,7 +1406,7 @@ void ReadYourWritesTransaction::writeRangeToNativeTransaction( KeyRangeRef const
for( int i = 0; i < op.size(); ++i) {
switch(op[i].type) {
case MutationRef::SetValue:
tr.set( it.beginKey().assertRef(), op[i].value, false );
tr.set( it.beginKey().assertRef(), op[i].value.get(), false );
break;
case MutationRef::AddValue:
case MutationRef::AppendIfFits:
@ -1417,7 +1417,11 @@ void ReadYourWritesTransaction::writeRangeToNativeTransaction( KeyRangeRef const
case MutationRef::Min:
case MutationRef::SetVersionstampedKey:
case MutationRef::SetVersionstampedValue:
tr.atomicOp( it.beginKey().assertRef(), op[i].value, op[i].type, false );
case MutationRef::ByteMin:
case MutationRef::ByteMax:
case MutationRef::MinV2:
case MutationRef::AndV2:
tr.atomicOp( it.beginKey().assertRef(), op[i].value.get(), op[i].type, false );
break;
default:
break;
@ -1488,6 +1492,13 @@ void ReadYourWritesTransaction::atomicOp( const KeyRef& key, const ValueRef& ope
if (operationType == MutationRef::SetVersionstampedValue && operand.size() < 10)
throw client_invalid_operation();
if (tr.apiVersionAtLeast(510)) {
if (operationType == MutationRef::Min)
operationType = MutationRef::MinV2;
else if (operationType == MutationRef::And)
operationType = MutationRef::AndV2;
}
if(options.readYourWritesDisabled) {
return tr.atomicOp(key, operand, (MutationRef::Type) operationType, addWriteConflict);
}

View File

@ -64,4 +64,12 @@ runRYWTransactionFailIfLocked(Database cx, Function func) {
}
}
ACTOR template < class Function >
Future<decltype(fake<Function>()(Reference<ReadYourWritesTransaction>()).getValue())>
runRYWTransactionNoRetry(Database cx, Function func) {
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
state decltype(fake<Function>()(Reference<ReadYourWritesTransaction>()).getValue()) result = wait(func(tr));
Void _ = wait(tr->commit());
return result;
}
#endif
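
Unlike runRYWTransaction just above it, the new helper makes exactly one attempt: func runs once, the commit is awaited once, and retryable errors (conflicts, commit_unknown_result) surface to the caller instead of being looped on. A sketch of a call site in the same Flow actor style (the actor and keys here are hypothetical):

ACTOR Future<Void> writeOnce(Database cx) {
    // Any not_committed or commit_unknown_result error propagates out of the
    // wait() rather than being retried.
    Optional<Value> v = wait(runRYWTransactionNoRetry(cx,
        [=](Reference<ReadYourWritesTransaction> tr) {
            tr->set(LiteralStringRef("hello"), LiteralStringRef("world"));
            return tr->get(LiteralStringRef("hello"));
        }));
    return Void();
}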

View File

@ -304,7 +304,10 @@ public:
}
ACTOR static Future<bool> doTask(Database cx, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
if (task && TaskFuncBase::isValidTask(task)) {
if (!task || !TaskFuncBase::isValidTask(task))
return false;
try {
state Reference<TaskFuncBase> taskFunc = TaskFuncBase::create(task->params[Task::reservedTaskParamKeyType]);
if (taskFunc) {
state bool verifyTask = (task->params.find(Task::reservedTaskParamValidKey) != task->params.end());
@ -358,10 +361,8 @@ public:
state Reference<ReadYourWritesTransaction> tr3(new ReadYourWritesTransaction(cx));
taskBucket->setOptions(tr3);
Version version = wait(tr3->getReadVersion());
if(version >= task->timeout) {
TraceEvent(SevWarn, "TB_ExecuteTimedOut").detail("TaskType", printable(task->params[Task::reservedTaskParamKeyType]));
return true;
}
if(version >= task->timeout)
throw timed_out();
// Otherwise reset the timeout
timeout = delay((BUGGIFY ? (2 * g_random->random01()) : 1.0) * (double)(task->timeout - (uint64_t)versionNow) / CLIENT_KNOBS->CORE_VERSIONSPERSECOND);
}
@ -370,12 +371,19 @@ public:
if (BUGGIFY) Void _ = wait(delay(10.0));
Void _ = wait(runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) {
return finishTaskRun(tr, taskBucket, futureBucket, task, taskFunc, verifyTask); }));
return true;
return finishTaskRun(tr, taskBucket, futureBucket, task, taskFunc, verifyTask);
}));
}
} catch(Error &e) {
TraceEvent(SevWarn, "TB_ExecuteFailure")
.detail("TaskUID", task->key.printable())
.detail("TaskType", task->params[Task::reservedTaskParamKeyType].printable())
.detail("Priority", task->getPriority())
.error(e);
}
return false;
// Return true to indicate that we did work.
return true;
}
ACTOR static Future<Void> run(Database cx, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, double *pollDelay, int maxConcurrentTasks) {
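
The restructure also changes what the boolean result means: false is now returned only when there was no valid task to run, while execution failures are traced and still count as work done, and timing out while extending now throws timed_out() instead of reporting success. A condensed model of the resulting contract (illustrative, not the actor itself):

bool doTaskModel(bool haveValidTask) {
    if (!haveValidTask)
        return false;      // nothing to run; the caller can back off and poll
    try {
        // run the task; extending past task->timeout throws timed_out()
    } catch (...) {
        // traced as TB_ExecuteFailure, then swallowed
    }
    return true;           // work was attempted, even if it failed
}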

View File

@ -28,11 +28,11 @@
#include "Atomic.h"
struct RYWMutation {
ValueRef value;
Optional<ValueRef> value;
enum MutationRef::Type type;
RYWMutation( ValueRef const& entry, MutationRef::Type type ) : value(entry), type(type) {}
RYWMutation() : value(ValueRef()), type(MutationRef::NoOp) {}
RYWMutation(Optional<ValueRef> const& entry, MutationRef::Type type ) : value(entry), type(type) {}
RYWMutation() : value(), type(MutationRef::NoOp) {}
bool operator == (const RYWMutation& r) const {
return value == r.value && type == r.type;
@ -148,7 +148,7 @@ public:
if (it.entry().key != key) {
if( it.is_cleared_range() && is_dependent ) {
it.tree.clear();
OperationStack op( RYWMutation( StringRef(), MutationRef::SetValue ) );
OperationStack op( RYWMutation( Optional<StringRef>(), MutationRef::SetValue ) );
coalesceOver(op, RYWMutation(param, operation), *arena);
PTreeImpl::insert( writes, ver, WriteMapEntry( key, std::move(op), true, following_conflict, is_conflict, following_unreadable, is_unreadable ) );
} else {
@ -165,7 +165,7 @@ public:
e.is_conflict = is_conflict;
e.is_unreadable = is_unreadable;
if (e.stack.size() == 0 && it.is_cleared_range() && is_dependent) {
e.stack.push(RYWMutation(StringRef(), MutationRef::SetValue));
e.stack.push(RYWMutation(Optional<StringRef>(), MutationRef::SetValue));
coalesceOver(e.stack, RYWMutation(param, operation), *arena);
} else if( !is_unreadable && e.stack.size() > 0 )
coalesceOver( e.stack, RYWMutation( param, operation ), *arena );
@ -381,14 +381,16 @@ public:
bool empty() const { return writeMapEmpty; }
static RYWMutation coalesce(RYWMutation existingEntry, RYWMutation newEntry, Arena& arena) {
ASSERT(newEntry.value.present());
if (newEntry.type == MutationRef::SetValue)
return newEntry;
else if (newEntry.type == MutationRef::AddValue) {
switch(existingEntry.type) {
case MutationRef::SetValue:
return RYWMutation(doLittleEndianAdd(existingEntry.value, newEntry.value, arena), MutationRef::SetValue);
return RYWMutation(doLittleEndianAdd(existingEntry.value, newEntry.value.get(), arena), MutationRef::SetValue);
case MutationRef::AddValue:
return RYWMutation(doLittleEndianAdd(existingEntry.value, newEntry.value, arena), MutationRef::AddValue);
return RYWMutation(doLittleEndianAdd(existingEntry.value, newEntry.value.get(), arena), MutationRef::AddValue);
default:
throw operation_failed();
}
@ -396,9 +398,9 @@ public:
else if (newEntry.type == MutationRef::AppendIfFits) {
switch(existingEntry.type) {
case MutationRef::SetValue:
return RYWMutation(doAppendIfFits(existingEntry.value, newEntry.value, arena), MutationRef::SetValue);
return RYWMutation(doAppendIfFits(existingEntry.value, newEntry.value.get(), arena), MutationRef::SetValue);
case MutationRef::AppendIfFits:
return RYWMutation(doAppendIfFits(existingEntry.value, newEntry.value, arena), MutationRef::AppendIfFits);
return RYWMutation(doAppendIfFits(existingEntry.value, newEntry.value.get(), arena), MutationRef::AppendIfFits);
default:
throw operation_failed();
}
@ -406,9 +408,9 @@ public:
else if (newEntry.type == MutationRef::And) {
switch(existingEntry.type) {
case MutationRef::SetValue:
return RYWMutation(doAnd(existingEntry.value, newEntry.value, arena), MutationRef::SetValue);
return RYWMutation(doAnd(existingEntry.value, newEntry.value.get(), arena), MutationRef::SetValue);
case MutationRef::And:
return RYWMutation(doAnd(existingEntry.value, newEntry.value, arena), MutationRef::And);
return RYWMutation(doAnd(existingEntry.value, newEntry.value.get(), arena), MutationRef::And);
default:
throw operation_failed();
}
@ -416,9 +418,9 @@ public:
else if (newEntry.type == MutationRef::Or) {
switch(existingEntry.type) {
case MutationRef::SetValue:
return RYWMutation(doOr(existingEntry.value, newEntry.value, arena), MutationRef::SetValue);
return RYWMutation(doOr(existingEntry.value, newEntry.value.get(), arena), MutationRef::SetValue);
case MutationRef::Or:
return RYWMutation(doOr(existingEntry.value, newEntry.value, arena), MutationRef::Or);
return RYWMutation(doOr(existingEntry.value, newEntry.value.get(), arena), MutationRef::Or);
default:
throw operation_failed();
}
@ -426,9 +428,9 @@ public:
else if (newEntry.type == MutationRef::Xor) {
switch(existingEntry.type) {
case MutationRef::SetValue:
return RYWMutation(doXor(existingEntry.value, newEntry.value, arena), MutationRef::SetValue);
return RYWMutation(doXor(existingEntry.value, newEntry.value.get(), arena), MutationRef::SetValue);
case MutationRef::Xor:
return RYWMutation(doXor(existingEntry.value, newEntry.value, arena), MutationRef::Xor);
return RYWMutation(doXor(existingEntry.value, newEntry.value.get(), arena), MutationRef::Xor);
default:
throw operation_failed();
}
@ -436,9 +438,9 @@ public:
else if (newEntry.type == MutationRef::Max) {
switch (existingEntry.type) {
case MutationRef::SetValue:
return RYWMutation(doMax(existingEntry.value, newEntry.value, arena), MutationRef::SetValue);
return RYWMutation(doMax(existingEntry.value, newEntry.value.get(), arena), MutationRef::SetValue);
case MutationRef::Max:
return RYWMutation(doMax(existingEntry.value, newEntry.value, arena), MutationRef::Max);
return RYWMutation(doMax(existingEntry.value, newEntry.value.get(), arena), MutationRef::Max);
default:
throw operation_failed();
}
@ -446,20 +448,60 @@ public:
else if (newEntry.type == MutationRef::Min) {
switch (existingEntry.type) {
case MutationRef::SetValue:
return RYWMutation(doMin(existingEntry.value, newEntry.value, arena), MutationRef::SetValue);
return RYWMutation(doMin(existingEntry.value, newEntry.value.get(), arena), MutationRef::SetValue);
case MutationRef::Min:
return RYWMutation(doMin(existingEntry.value, newEntry.value, arena), MutationRef::Min);
return RYWMutation(doMin(existingEntry.value, newEntry.value.get(), arena), MutationRef::Min);
default:
throw operation_failed();
}
}
else if (newEntry.type == MutationRef::ByteMin) {
switch (existingEntry.type) {
case MutationRef::SetValue:
return RYWMutation(doByteMin(existingEntry.value, newEntry.value.get(), arena), MutationRef::SetValue);
case MutationRef::ByteMin:
return RYWMutation(doByteMin(existingEntry.value, newEntry.value.get(), arena), MutationRef::ByteMin);
default:
throw operation_failed();
}
}
else if (newEntry.type == MutationRef::ByteMax) {
switch (existingEntry.type) {
case MutationRef::SetValue:
return RYWMutation(doByteMax(existingEntry.value, newEntry.value.get(), arena), MutationRef::SetValue);
case MutationRef::ByteMax:
return RYWMutation(doByteMax(existingEntry.value, newEntry.value.get(), arena), MutationRef::ByteMax);
default:
throw operation_failed();
}
}
else if (newEntry.type == MutationRef::MinV2) {
switch (existingEntry.type) {
case MutationRef::SetValue:
return RYWMutation(doMinV2(existingEntry.value, newEntry.value.get(), arena), MutationRef::SetValue);
case MutationRef::MinV2:
return RYWMutation(doMinV2(existingEntry.value, newEntry.value.get(), arena), MutationRef::MinV2);
default:
throw operation_failed();
}
}
else if (newEntry.type == MutationRef::AndV2) {
switch (existingEntry.type) {
case MutationRef::SetValue:
return RYWMutation(doAndV2(existingEntry.value, newEntry.value.get(), arena), MutationRef::SetValue);
case MutationRef::AndV2:
return RYWMutation(doAndV2(existingEntry.value, newEntry.value.get(), arena), MutationRef::AndV2);
default:
throw operation_failed();
}
}
else throw operation_failed();
}
static void coalesceOver(OperationStack& stack, RYWMutation newEntry, Arena& arena) {
RYWMutation existingEntry = stack.top();
if (existingEntry.type == newEntry.type) {
if (isNonAssociativeOp(existingEntry.type) && existingEntry.value.size() != newEntry.value.size()) {
if (isNonAssociativeOp(existingEntry.type) && existingEntry.value.present() && existingEntry.value.get().size() != newEntry.value.get().size()) {
stack.push(newEntry);
}
else {
@ -480,7 +522,7 @@ public:
if( !stack.isDependent() && stack.size() == 1 )
return stack.at(0);
RYWMutation currentEntry = RYWMutation( value.present() ? value.get() : StringRef(), MutationRef::SetValue);
RYWMutation currentEntry = RYWMutation( value, MutationRef::SetValue);
for(int i = 0; i < stack.size(); ++i) {
currentEntry = coalesce(currentEntry, stack.at(i), arena);
}
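
The added ASSERT(newEntry.value.present()) captures the invariant behind all the .get() calls here: an incoming mutation always carries an operand, while only the base value may be absent. The fold performed just above, starting from the snapshot value as a SetValue and coalescing each stacked mutation over it, can be modeled in miniature (one-byte values; names illustrative):

#include <cassert>
#include <cstdint>
#include <optional>
#include <string>
#include <vector>

struct Mutation { bool isAdd; std::string value; }; // SetValue or AddValue only

// One-byte little-endian add, standing in for doLittleEndianAdd.
std::string add1(const std::string& a, const std::string& b) {
    uint8_t x = a.empty() ? 0 : uint8_t(a[0]);
    return std::string(1, char(uint8_t(x + uint8_t(b[0]))));
}

std::string fold(const std::optional<std::string>& snapshot, const std::vector<Mutation>& stack) {
    std::string cur = snapshot.value_or(""); // AddValue still zero-extends an absent base
    for (const Mutation& m : stack)
        cur = m.isAdd ? add1(cur, m.value) : m.value;
    return cur;
}

int main() {
    // SetValue \x01 followed by two AddValue \x01 collapses to \x03.
    assert(fold(std::nullopt, { {false, "\x01"}, {true, "\x01"}, {true, "\x01"} }) == "\x03");
}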

View File

@ -202,7 +202,7 @@ description is not currently required but encouraged.
description="Deprecated"/>
<Option name="bit_and" code="6"
paramType="Bytes" paramDescription="value with which to perform bitwise and"
description="Performs a bitwise ``and`` operation. If the existing value in the database is not present or shorter than ``param``, it is first extended to the length of ``param`` with zero bytes. If ``param`` is shorter than the existing value in the database, the existing value is truncated to match the length of ``param``."/>
description="Performs a bitwise ``and`` operation. If the existing value in the database is not present, then ``param`` is stored in the database. If the existing value in the database is shorter than ``param``, it is first extended to the length of ``param`` with zero bytes. If ``param`` is shorter than the existing value in the database, the existing value is truncated to match the length of ``param``."/>
<Option name="or" code="7"
paramType="Bytes" paramDescription="value with which to perform bitwise or"
description="Deprecated"/>
@ -220,13 +220,19 @@ description is not currently required but encouraged.
description="Performs a little-endian comparison of byte strings. If the existing value in the database is not present or shorter than ``param``, it is first extended to the length of ``param`` with zero bytes. If ``param`` is shorter than the existing value in the database, the existing value is truncated to match the length of ``param``. The larger of the two values is then stored in the database."/>
<Option name="min" code="13"
paramType="Bytes" paramDescription="value to check against database value"
description="Performs a little-endian comparison of byte strings. If the existing value in the database is not present or shorter than ``param``, it is first extended to the length of ``param`` with zero bytes. If ``param`` is shorter than the existing value in the database, the existing value is truncated to match the length of ``param``. The smaller of the two values is then stored in the database."/>
description="Performs a little-endian comparison of byte strings. If the existing value in the database is not present, then ``param`` is stored in the database. If the existing value in the database is shorter than ``param``, it is first extended to the length of ``param`` with zero bytes. If ``param`` is shorter than the existing value in the database, the existing value is truncated to match the length of ``param``. The smaller of the two values is then stored in the database."/>
<Option name="set_versionstamped_key" code="14"
paramType="Bytes" paramDescription="value to which to set the transformed key"
description="Transforms ``key`` using a versionstamp for the transaction. Sets the transformed key in the database to ``param``. A versionstamp is a 10 byte, unique, monotonically (but not sequentially) increasing value for each committed transaction. The first 8 bytes are the committed version of the database. The last 2 bytes are monotonic in the serialization order for transactions. WARNING: At this time versionstamps are not compatible with the Tuple layer. Note that this implies versionstamped keys may not be used with the Subspace and Directory layers." />
<Option name="set_versionstamped_value" code="15"
paramType="Bytes" paramDescription="value to versionstamp and set"
description="Transforms ``param`` using a versionstamp for the transaction. Sets ``key`` in the database to the transformed parameter. A versionstamp is a 10 byte, unique, monotonically (but not sequentially) increasing value for each committed transaction. The first 8 bytes are the committed version of the database. The last 2 bytes are monotonic in the serialization order for transactions. WARNING: At this time versionstamps are not compatible with the Tuple layer." />
<Option name="byte_min" code="16"
paramType="Bytes" paramDescription="value to check against database value"
description="Performs lexicographic comparison of byte strings. If the existing value in the database is not present, then ``param`` is stored. Otherwise the smaller of the two values is then stored in the database."/>
<Option name="byte_max" code="17"
paramType="Bytes" paramDescription="value to check against database value"
description="Performs lexicographic comparison of byte strings. If the existing value in the database is not present, then ``param`` is stored. Otherwise the larger of the two values is then stored in the database."/>
</Scope>
<Scope name="ConflictRangeType">
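
A worked instance of the extend/truncate rules in the ``min`` description, since they are easy to misread (the code is only an illustration of the documented rule, not the server implementation):

#include <algorithm>
#include <cassert>
#include <string>

int main() {
    // existing = {0x05, 0x01} (little-endian 261), param = {0x10} (16).
    std::string existing{'\x05', '\x01'};
    std::string param{'\x10'};
    // param is shorter, so existing is first truncated to param's length: {0x05} = 5.
    existing.resize(param.size());
    // For single-byte values the little-endian comparison is a byte compare:
    // min(5, 16) = 5, so {0x05} is stored.
    assert(std::min(existing, param) == std::string{'\x05'});
}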

View File

@ -38,6 +38,16 @@
#include "md5/md5.h"
#include "libb64/encode.h"
ACTOR template<typename T> static Future<T> joinErrorGroup(Future<T> f, Promise<Void> p) {
try {
Void _ = wait(success(f) || p.getFuture());
return f.get();
} catch(Error &e) {
if(p.canBeSet())
p.sendError(e);
throw;
}
}
// This class represents a write-only file that lives in an S3-style blob store. It writes using the REST API,
// using multi-part upload and beginning to transfer each part as soon as it is large enough.
// All write operations file operations must be sequential and contiguous.
@ -96,7 +106,7 @@ public:
data = (const uint8_t *)data + finishlen;
// End current part (and start new one)
Void _ = wait(f->endCurrentPart(true));
Void _ = wait(f->endCurrentPart(f.getPtr(), true));
p = f->m_parts.back().getPtr();
}
@ -109,7 +119,7 @@ public:
throw non_sequential_op();
m_cursor += length;
return write_impl(Reference<AsyncFileBlobStoreWrite>::addRef(this), (const uint8_t *)data, length);
return m_error.getFuture() || write_impl(Reference<AsyncFileBlobStoreWrite>::addRef(this), (const uint8_t *)data, length);
}
virtual Future<Void> truncate( int64_t size ) {
@ -119,14 +129,10 @@ public:
}
ACTOR static Future<std::string> doPartUpload(AsyncFileBlobStoreWrite *f, Part *p) {
try {
p->finalizeMD5();
std::string upload_id = wait(f->getUploadID());
std::string etag = wait(f->m_bstore->uploadPart(f->m_bucket, f->m_object, upload_id, p->number, &p->content, p->length, p->md5string));
return etag;
} catch(Error &e) {
throw;
}
p->finalizeMD5();
std::string upload_id = wait(f->getUploadID());
std::string etag = wait(f->m_bstore->uploadPart(f->m_bucket, f->m_object, upload_id, p->number, &p->content, p->length, p->md5string));
return etag;
}
ACTOR static Future<Void> doFinishUpload(AsyncFileBlobStoreWrite* f) {
@ -139,7 +145,7 @@ public:
}
// There are at least 2 parts. End the last part (which could be empty)
Void _ = wait(f->endCurrentPart());
Void _ = wait(f->endCurrentPart(f));
state BlobStoreEndpoint::MultiPartSetT partSet;
state std::vector<Reference<Part>>::iterator p;
@ -208,17 +214,26 @@ private:
Future<std::string> m_upload_id;
Future<Void> m_finished;
std::vector<Reference<Part>> m_parts;
Promise<Void> m_error;
FlowLock m_concurrentUploads;
Future<Void> endCurrentPart(bool startNew = false) {
if(m_parts.back()->length == 0)
// End the current part and start uploading it, but also wait for a part to finish if too many are in transit.
ACTOR static Future<Void> endCurrentPart(AsyncFileBlobStoreWrite *f, bool startNew = false) {
if(f->m_parts.back()->length == 0)
return Void();
// Start the upload
m_parts.back()->etag = doPartUpload(this, m_parts.back().getPtr());
// Wait for an upload slot to be available
Void _ = wait(f->m_concurrentUploads.take(1));
// Do the upload; if it fails, forward the error to m_error, and stop early if anything else sends an error to m_error.
// Also, hold a releaser for the concurrent upload slot while all of that is going on.
f->m_parts.back()->etag = holdWhile(std::shared_ptr<FlowLock::Releaser>(new FlowLock::Releaser(f->m_concurrentUploads, 1)),
joinErrorGroup(doPartUpload(f, f->m_parts.back().getPtr()), f->m_error)
);
// Make a new part to write to
if(startNew)
m_parts.push_back(Reference<Part>(new Part(m_parts.size() + 1)));
f->m_parts.push_back(Reference<Part>(new Part(f->m_parts.size() + 1)));
return Void();
}
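m_concurrentUploads plus the holdWhile'd Releaser above behaves like a counting semaphore held for the entire lifetime of each part upload. A rough standard-C++20 analogue of the pattern (illustrative only; the names and the fixed limit of 5 are assumptions, not Flow code):

#include <semaphore>
#include <thread>
#include <vector>

// At most 5 part uploads in flight for one file; each holds a slot for its
// whole duration, so the submitting loop blocks once the limit is reached.
std::counting_semaphore<5> uploadSlots(5); // stands in for concurrent_writes_per_file

int main() {
	std::vector<std::jthread> uploads;
	for (int part = 1; part <= 20; part++) {
		uploadSlots.acquire(); // like wait(m_concurrentUploads.take(1))
		uploads.emplace_back([part] {
			// ... upload part `part` and record its etag ...
			uploadSlots.release(); // like the FlowLock::Releaser being destroyed
		});
	}
} // jthreads join on destruction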
@ -231,7 +246,7 @@ private:
public:
AsyncFileBlobStoreWrite(Reference<BlobStoreEndpoint> bstore, std::string bucket, std::string object)
: m_bstore(bstore), m_bucket(bucket), m_object(object), m_cursor(0) {
: m_bstore(bstore), m_bucket(bucket), m_object(object), m_cursor(0), m_concurrentUploads(bstore->knobs.concurrent_writes_per_file) {
// Add first part
m_parts.push_back(Reference<Part>(new Part(1)));

View File

@ -59,6 +59,7 @@ BlobStoreEndpoint::BlobKnobs::BlobKnobs() {
multipart_min_part_size = CLIENT_KNOBS->BLOBSTORE_MULTIPART_MIN_PART_SIZE;
concurrent_uploads = CLIENT_KNOBS->BLOBSTORE_CONCURRENT_UPLOADS;
concurrent_reads_per_file = CLIENT_KNOBS->BLOBSTORE_CONCURRENT_READS_PER_FILE;
concurrent_writes_per_file = CLIENT_KNOBS->BLOBSTORE_CONCURRENT_WRITES_PER_FILE;
read_block_size = CLIENT_KNOBS->BLOBSTORE_READ_BLOCK_SIZE;
read_ahead_blocks = CLIENT_KNOBS->BLOBSTORE_READ_AHEAD_BLOCKS;
read_cache_blocks_per_file = CLIENT_KNOBS->BLOBSTORE_READ_CACHE_BLOCKS_PER_FILE;
@ -80,6 +81,7 @@ bool BlobStoreEndpoint::BlobKnobs::set(StringRef name, int value) {
TRY_PARAM(multipart_min_part_size, minps);
TRY_PARAM(concurrent_uploads, cu);
TRY_PARAM(concurrent_reads_per_file, crpf);
TRY_PARAM(concurrent_writes_per_file, cwpf);
TRY_PARAM(read_block_size, rbs);
TRY_PARAM(read_ahead_blocks, rab);
TRY_PARAM(read_cache_blocks_per_file, rcb);
@ -105,6 +107,7 @@ std::string BlobStoreEndpoint::BlobKnobs::getURLParameters() const {
_CHECK_PARAM(multipart_min_part_size, minps);
_CHECK_PARAM(concurrent_uploads, cu);
_CHECK_PARAM(concurrent_reads_per_file, crpf);
_CHECK_PARAM(concurrent_writes_per_file, cwpf);
_CHECK_PARAM(read_block_size, rbs);
_CHECK_PARAM(read_ahead_blocks, rab);
_CHECK_PARAM(read_cache_blocks_per_file, rcb);
@ -195,7 +198,7 @@ Reference<BlobStoreEndpoint> BlobStoreEndpoint::fromString(std::string const &ur
} catch(std::string &err) {
if(error != nullptr)
*error = err;
TraceEvent(SevWarn, "BlobStoreEndpoint").detail("Description", err).detail("Format", getURLFormat()).detail("URL", url);
TraceEvent(SevWarnAlways, "BlobStoreEndpoint").detail("Description", err).detail("Format", getURLFormat()).detail("URL", url);
throw file_not_found();
}
}
@ -247,13 +250,8 @@ ACTOR Future<bool> objectExists_impl(Reference<BlobStoreEndpoint> b, std::string
std::string resource = std::string("/") + bucket + "/" + object;
HTTP::Headers headers;
Reference<HTTP::Response> r = wait(b->doRequest("HEAD", resource, headers, NULL, 0));
if(r->code == 200)
return true;
if(r->code == 404)
return false;
throw http_bad_response();
Reference<HTTP::Response> r = wait(b->doRequest("HEAD", resource, headers, NULL, 0, {200, 404}));
return r->code == 200;
}
Future<bool> BlobStoreEndpoint::objectExists(std::string const &bucket, std::string const &object) {
@ -263,11 +261,9 @@ Future<bool> BlobStoreEndpoint::objectExists(std::string const &bucket, std::str
ACTOR Future<Void> deleteObject_impl(Reference<BlobStoreEndpoint> b, std::string bucket, std::string object) {
std::string resource = std::string("/") + bucket + "/" + object;
HTTP::Headers headers;
Reference<HTTP::Response> r = wait(b->doRequest("DELETE", resource, headers, NULL, 0));
// 200 means object deleted, 404 means it doesn't exist, so we'll call that success as well
if(r->code == 200 || r->code == 404)
return Void();
throw http_bad_response();
Reference<HTTP::Response> r = wait(b->doRequest("DELETE", resource, headers, NULL, 0, {200, 404}));
// 200 means the object was deleted, 404 means it didn't exist in the first place, so either success code passed above is fine.
return Void();
}
Future<Void> BlobStoreEndpoint::deleteObject(std::string const &bucket, std::string const &object) {
@ -304,9 +300,7 @@ ACTOR Future<int64_t> objectSize_impl(Reference<BlobStoreEndpoint> b, std::strin
std::string resource = std::string("/") + bucket + "/" + object;
HTTP::Headers headers;
Reference<HTTP::Response> r = wait(b->doRequest("HEAD", resource, headers, NULL, 0));
if(r->code != 200)
throw io_error();
Reference<HTTP::Response> r = wait(b->doRequest("HEAD", resource, headers, NULL, 0, {200}));
return r->contentLen;
}
@ -335,7 +329,7 @@ Future<Reference<IConnection>> BlobStoreEndpoint::connect( NetworkAddress addres
// Do a request, get a Response.
// Request content is provided as UnsentPacketQueue *pContent, which will be depleted as bytes are sent, but the queue itself must live for the life of this actor
// and be destroyed by the caller
ACTOR Future<Reference<HTTP::Response>> doRequest_impl(Reference<BlobStoreEndpoint> bstore, std::string verb, std::string resource, HTTP::Headers headers, UnsentPacketQueue *pContent, int contentLen) {
ACTOR Future<Reference<HTTP::Response>> doRequest_impl(Reference<BlobStoreEndpoint> bstore, std::string verb, std::string resource, HTTP::Headers headers, UnsentPacketQueue *pContent, int contentLen, std::set<unsigned int> successCodes) {
state UnsentPacketQueue contentCopy;
// Set content length header if there is content
@ -345,11 +339,13 @@ ACTOR Future<Reference<HTTP::Response>> doRequest_impl(Reference<BlobStoreEndpoi
Void _ = wait(bstore->concurrentRequests.take(1));
state FlowLock::Releaser globalReleaser(bstore->concurrentRequests, 1);
state int tries = std::min(bstore->knobs.request_tries, bstore->knobs.connect_tries);
state double retryDelay = 2.0;
state int maxTries = std::min(bstore->knobs.request_tries, bstore->knobs.connect_tries);
state int thisTry = 1;
state double nextRetryDelay = 2.0;
state NetworkAddress address;
loop {
state Optional<Error> err;
try {
// Pick an address
address = bstore->addresses[g_random->randomInt(0, bstore->addresses.size())];
@ -382,56 +378,85 @@ ACTOR Future<Reference<HTTP::Response>> doRequest_impl(Reference<BlobStoreEndpoi
Void _ = wait(bstore->requestRate->getAllowance(1));
state Reference<HTTP::Response> r = wait(timeoutError(HTTP::doRequest(conn, verb, resource, headers, &contentCopy, contentLen, bstore->sendRate, &bstore->s_stats.bytes_sent, bstore->recvRate), bstore->knobs.request_timeout));
std::string connectionHeader;
HTTP::Headers::iterator i = r->headers.find("Connection");
if(i != r->headers.end())
connectionHeader = i->second;
// If the response parsed successfully (which is why we reached this point) and the connection can be reused, put the connection in the connection_pool
if(connectionHeader != "close")
// Since the response was parsed successfully (which is why we are here) reuse the connection unless we received the "Connection: close" header.
if(r->headers["Connection"] != "close")
bstore->connectionPool[address].push_back(BlobStoreEndpoint::ConnPoolEntry(conn, now()));
// Handle retry-after response code
if(r->code == 429) {
bstore->s_stats.requests_failed++;
conn = Reference<IConnection>();
double d = 60;
if(r->headers.count("Retry-After"))
d = atof(r->headers["Retry-After"].c_str());
Void _ = wait(delay(d));
// Just continue, don't throw an error, don't decrement tries
}
else if(r->code == 406) {
// Blob returns this when the account doesn't exist
throw http_not_accepted();
}
else if(r->code == 500 || r->code == 503) {
// For these errors just treat it like connection_failed
throw connection_failed();
}
else
break;
conn.clear();
} catch(Error &e) {
// If the error is connection failed and a retry is allowed then ignore the error
if((e.code() == error_code_connection_failed || e.code() == error_code_timed_out) && --tries > 0) {
bstore->s_stats.requests_failed++;
TraceEvent(SevWarn, "BlobStoreHTTPConnectionFailed").detail("RemoteEndpoint", address).detail("Verb", verb).detail("Resource", resource).suppressFor(5.0, true);
Void _ = wait(delay(retryDelay));
retryDelay *= 2;
retryDelay = std::min(retryDelay, 60.0);
}
// For timeouts, connection failure, or a bad response reported by HTTP::doRequest, save the error and handle it / possibly retry below.
// Any other error is rethrown.
if(e.code() == error_code_connection_failed || e.code() == error_code_timed_out || e.code() == error_code_http_bad_response)
err = e;
else
throw;
}
// If err is not present then r is valid.
// If r->code is in successCodes then record the successful request and return r.
if(!err.present() && successCodes.count(r->code) != 0) {
bstore->s_stats.requests_successful++;
return r;
}
// Otherwise, this request is considered failed. Update failure count.
bstore->s_stats.requests_failed++;
// Any error in err is potentially retryable, as are certain HTTP response codes...
bool retryable = err.present() || r->code == 500 || r->code == 503;
// But only if our previous attempt was not the last allowable try.
retryable = retryable && (thisTry < maxTries);
TraceEvent event(retryable ? SevWarn : SevWarnAlways, retryable ? "BlobStoreEndpointRequestFailedRetryable" : "BlobStoreEndpointRequestFailed");
event.detail("RemoteEndpoint", address)
.detail("Verb", verb)
.detail("Resource", resource)
.detail("ThisTry", thisTry)
.suppressFor(5, true);
// We will wait delay seconds before the next retry, starting with nextRetryDelay.
double delay = nextRetryDelay;
// Double the *next* retry delay, but cap it at 60 seconds.
nextRetryDelay = std::min(nextRetryDelay * 2, 60.0);
// Attach err to the trace event if present, otherwise extract details from the response
if(err.present())
event.error(err.get());
else {
event.detail("ResponseCode", r->code);
// Check for the Retry-After header which is present with certain types of errors
auto iRetryAfter = r->headers.find("Retry-After");
if(iRetryAfter != r->headers.end()) {
event.detail("RetryAfterHeader", iRetryAfter->second);
char *pEnd;
double retryAfter = strtod(iRetryAfter->second.c_str(), &pEnd);
if(*pEnd) // If there were trailing characters then don't trust the parsed value; fall back to a conservative 5 minutes.
retryAfter = 300;
delay = std::max(delay, retryAfter);
}
}
// For retryable errors, log the delay then wait.
if(retryable) {
event.detail("RetryDelay", delay);
Void _ = wait(::delay(delay));
}
else {
// We can't retry, so throw something.
// This error code means the authentication header was not accepted, likely the account or key is wrong.
if(r->code == 406)
throw http_not_accepted();
throw http_request_failed();
}
}
bstore->s_stats.requests_successful++;
return r;
}
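Taken together, the failure path above implements capped exponential backoff that a valid Retry-After header can only lengthen, never shorten. A self-contained sketch of just that delay computation (plain C++; nextDelay is a hypothetical name):

#include <algorithm>
#include <cstdio>
#include <cstdlib>

// Mirror of the retry schedule above: start at 2s, double up to a 60s cap,
// and never wait less than a valid Retry-After header asks for (an
// unparseable header falls back to 300s).
double nextDelay(double& nextRetryDelay, const char* retryAfterHeader) {
	double delay = nextRetryDelay;
	nextRetryDelay = std::min(nextRetryDelay * 2, 60.0);
	if (retryAfterHeader != nullptr) {
		char* end;
		double retryAfter = std::strtod(retryAfterHeader, &end);
		if (*end) // trailing characters: don't trust the parse
			retryAfter = 300;
		delay = std::max(delay, retryAfter);
	}
	return delay;
}

int main() {
	double next = 2.0;
	for (int attempt = 1; attempt <= 6; attempt++)
		std::printf("attempt %d: wait %.0fs\n", attempt, nextDelay(next, nullptr)); // 2,4,8,16,32,60
}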
Future<Reference<HTTP::Response>> BlobStoreEndpoint::doRequest(std::string const &verb, std::string const &resource, const HTTP::Headers &headers, UnsentPacketQueue *pContent, int contentLen) {
return doRequest_impl(Reference<BlobStoreEndpoint>::addRef(this), verb, resource, headers, pContent, contentLen);
Future<Reference<HTTP::Response>> BlobStoreEndpoint::doRequest(std::string const &verb, std::string const &resource, const HTTP::Headers &headers, UnsentPacketQueue *pContent, int contentLen, std::set<unsigned int> successCodes) {
return doRequest_impl(Reference<BlobStoreEndpoint>::addRef(this), verb, resource, headers, pContent, contentLen, successCodes);
}
ACTOR Future<Void> getBucketContentsStream_impl(Reference<BlobStoreEndpoint> bstore, std::string bucket, PromiseStream<BlobStoreEndpoint::ObjectInfo> results) {
@ -442,7 +467,7 @@ ACTOR Future<Void> getBucketContentsStream_impl(Reference<BlobStoreEndpoint> bst
while(more) {
HTTP::Headers headers;
Reference<HTTP::Response> r = wait(bstore->doRequest("GET", resource + HTTP::urlEncode(lastFile), headers, NULL, 0));
Reference<HTTP::Response> r = wait(bstore->doRequest("GET", resource + HTTP::urlEncode(lastFile), headers, NULL, 0, {200}));
try {
// Parse the json assuming it is valid and contains the right stuff. If any exceptions are thrown, throw http_bad_response
@ -573,12 +598,10 @@ void BlobStoreEndpoint::setAuthHeaders(std::string const &verb, std::string cons
ACTOR Future<std::string> readEntireFile_impl(Reference<BlobStoreEndpoint> bstore, std::string bucket, std::string object) {
std::string resource = std::string("/") + bucket + "/" + object;
HTTP::Headers headers;
Reference<HTTP::Response> r = wait(bstore->doRequest("GET", resource, headers, NULL, 0));
if(r->code == 200)
return r->content;
Reference<HTTP::Response> r = wait(bstore->doRequest("GET", resource, headers, NULL, 0, {200, 404}));
if(r->code == 404)
throw file_not_found();
throw http_bad_response();
return r->content;
}
Future<std::string> BlobStoreEndpoint::readEntireFile(std::string const &bucket, std::string const &object) {
@ -599,16 +622,13 @@ ACTOR Future<Void> writeEntireFileFromBuffer_impl(Reference<BlobStoreEndpoint> b
HTTP::Headers headers;
// Send MD5 sum for content so blobstore can verify it
headers["Content-MD5"] = contentMD5;
state Reference<HTTP::Response> r = wait(bstore->doRequest("PUT", resource, headers, pContent, contentLen));
state Reference<HTTP::Response> r = wait(bstore->doRequest("PUT", resource, headers, pContent, contentLen, {200}));
// For uploads, Blobstore returns an MD5 sum of uploaded content so check that too.
auto sum = r->headers.find("Content-MD5");
if(sum == r->headers.end() || sum->second != contentMD5)
throw http_bad_response();
// For uploads, Blobstore returns an MD5 sum of uploaded content so check it.
if(r->headers["Content-MD5"] != contentMD5)
throw checksum_failed();
if(r->code == 200)
return Void();
throw http_bad_response();
return Void();
}
ACTOR Future<Void> writeEntireFile_impl(Reference<BlobStoreEndpoint> bstore, std::string bucket, std::string object, std::string content) {
@ -649,9 +669,7 @@ ACTOR Future<int> readObject_impl(Reference<BlobStoreEndpoint> bstore, std::stri
std::string resource = std::string("/") + bucket + "/" + object;
HTTP::Headers headers;
headers["Range"] = format("bytes=%lld-%lld", offset, offset + length - 1);
Reference<HTTP::Response> r = wait(bstore->doRequest("GET", resource, headers, NULL, 0));
if(r->code != 200 && r->code != 206)
throw file_not_readable();
Reference<HTTP::Response> r = wait(bstore->doRequest("GET", resource, headers, NULL, 0, {200, 206}));
if(r->contentLen != r->content.size()) // Double check that this wasn't a header-only response, probably unnecessary
throw io_error();
// Copy the output bytes, server could have sent more or less bytes than requested so copy at most length bytes
@ -666,9 +684,7 @@ Future<int> BlobStoreEndpoint::readObject(std::string const &bucket, std::string
ACTOR static Future<std::string> beginMultiPartUpload_impl(Reference<BlobStoreEndpoint> bstore, std::string bucket, std::string object) {
std::string resource = std::string("/") + bucket + "/" + object + "?uploads";
HTTP::Headers headers;
Reference<HTTP::Response> r = wait(bstore->doRequest("POST", resource, headers, NULL, 0));
if(r->code != 200)
throw file_not_writable();
Reference<HTTP::Response> r = wait(bstore->doRequest("POST", resource, headers, NULL, 0, {200}));
int start = r->content.find("<UploadId>");
if(start == std::string::npos)
throw http_bad_response();
@ -691,16 +707,15 @@ ACTOR Future<std::string> uploadPart_impl(Reference<BlobStoreEndpoint> bstore, s
HTTP::Headers headers;
// Send MD5 sum for content so blobstore can verify it
headers["Content-MD5"] = contentMD5;
state Reference<HTTP::Response> r = wait(bstore->doRequest("PUT", resource, headers, pContent, contentLen));
state Reference<HTTP::Response> r = wait(bstore->doRequest("PUT", resource, headers, pContent, contentLen, {200}));
// TODO: In the event that the client times out just before the request completes (so the client is unaware) then the next retry
// will see error 400. That could be detected and handled gracefully by retrieving the etag for the successful request.
// For uploads, Blobstore returns an MD5 sum of uploaded content so check that too.
auto sum = r->headers.find("Content-MD5");
if(sum == r->headers.end() || sum->second != contentMD5)
throw http_bad_response();
if(r->code != 200)
throw http_bad_response();
// For uploads, Blobstore returns an MD5 sum of uploaded content so check it.
if(r->headers["Content-MD5"] != contentMD5)
throw checksum_failed();
// No etag -> bad response.
std::string etag = r->headers["ETag"];
if(etag.empty())
throw http_bad_response();
@ -724,10 +739,10 @@ ACTOR Future<Void> finishMultiPartUpload_impl(Reference<BlobStoreEndpoint> bstor
HTTP::Headers headers;
PacketWriter pw(part_list.getWriteBuffer(), NULL, Unversioned());
pw.serializeBytes(manifest);
Reference<HTTP::Response> r = wait(bstore->doRequest("POST", resource, headers, &part_list, manifest.size()));
if(r->code != 200)
throw http_bad_response();
Reference<HTTP::Response> r = wait(bstore->doRequest("POST", resource, headers, &part_list, manifest.size(), {200}));
// TODO: In the event that the client times out just before the request completes (so the client is unaware) then the next retry
// will see error 400. That could be detected and handled gracefully by HEAD'ing the object before upload to get its (possibly
// nonexistent) eTag, then if an error 400 is seen then retrieve the eTag again and if it has changed then consider the finish complete.
return Void();
}

View File

@ -57,6 +57,7 @@ public:
multipart_min_part_size,
concurrent_uploads,
concurrent_reads_per_file,
concurrent_writes_per_file,
read_block_size,
read_ahead_blocks,
read_cache_blocks_per_file,
@ -77,6 +78,7 @@ public:
"multipart_min_part_size (or minps) Min part size for multipart uploads.",
"concurrent_uploads (or cu) Max concurrent uploads (part or whole) that can be in progress at once.",
"concurrent_reads_per_file (or crps) Max concurrent reads in progress for any one file.",
"concurrent_writes_per_file (or cwps) Max concurrent uploads in progress for any one file.",
"read_block_size (or rbs) Block size in bytes to be used for reads.",
"read_ahead_blocks (or rab) Number of blocks to read ahead of requested offset.",
"read_cache_blocks_per_file (or rcb) Size of the read cache for a file in blocks.",
@ -149,8 +151,8 @@ public:
// Do an HTTP request to the Blob Store, read the response. Handles authentication.
// Every blob store interaction should ultimately go through this function
Future<Reference<HTTP::Response>> doRequest(std::string const &verb, std::string const &resource, const HTTP::Headers &headers, UnsentPacketQueue *pContent, int contentLen);
Future<Reference<HTTP::Response>> doRequest(std::string const &verb, std::string const &resource, const HTTP::Headers &headers, UnsentPacketQueue *pContent, int contentLen, std::set<unsigned int> successCodes);
struct ObjectInfo {
std::string bucket;
std::string name;

View File

@ -192,6 +192,7 @@ public:
std::pair<WorkerInterface, ProcessClass> getMasterWorker( DatabaseConfiguration const& conf, bool checkStable = false ) {
ProcessClass::Fitness bestFit = ProcessClass::NeverAssign;
Optional<std::pair<WorkerInterface, ProcessClass>> bestInfo;
bool bestIsClusterController = false;
int numEquivalent = 1;
for( auto& it : id_worker ) {
auto fit = it.second.processClass.machineClassFitness( ProcessClass::Master );
@ -199,12 +200,13 @@ public:
fit = std::max(fit, ProcessClass::WorstFit);
}
if( workerAvailable(it.second, checkStable) && fit != ProcessClass::NeverAssign ) {
if( fit < bestFit ) {
if( fit < bestFit || (fit == bestFit && bestIsClusterController) ) {
bestInfo = std::make_pair(it.second.interf, it.second.processClass);
bestFit = fit;
numEquivalent = 1;
bestIsClusterController = clusterControllerProcessId == it.first;
}
else if( fit != ProcessClass::NeverAssign && fit == bestFit && g_random->random01() < 1.0/++numEquivalent )
else if( fit == bestFit && clusterControllerProcessId != it.first && g_random->random01() < 1.0/++numEquivalent )
bestInfo = std::make_pair(it.second.interf, it.second.processClass);
}
}
@ -563,6 +565,18 @@ public:
return resolverCount > r.resolverCount;
}
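// Compare the (worst, best) pair of the proxy/resolver fitnesses lexicographically: the
// datacenter whose worse fitness value is lower wins, with ties broken by the better value.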
bool betterInDatacenterFitness (InDatacenterFitness const& r) const {
int lmax = std::max(resolverFit,proxyFit);
int lmin = std::min(resolverFit,proxyFit);
int rmax = std::max(r.resolverFit,r.proxyFit);
int rmin = std::min(r.resolverFit,r.proxyFit);
if( lmax != rmax ) return lmax < rmax;
if( lmin != rmin ) return lmin < rmin;
return false;
}
bool operator == (InDatacenterFitness const& r) const { return proxyFit == r.proxyFit && resolverFit == r.resolverFit && proxyCount == r.proxyCount && resolverCount == r.resolverCount; }
};
@ -621,6 +635,7 @@ public:
result.storageServers.push_back(storageServers[i].first);
}
id_used[clusterControllerProcessId]++;
id_used[masterProcessId]++;
auto tlogs = getWorkersForTlogsAcrossDatacenters( req.configuration, id_used );
for(int i = 0; i < tlogs.size(); i++)
@ -686,9 +701,14 @@ public:
if(masterWorker == id_worker.end())
return false;
id_used[clusterControllerProcessId]++;
id_used[masterProcessId]++;
ProcessClass::Fitness oldMasterFit = masterWorker->second.processClass.machineClassFitness( ProcessClass::Master );
if(db.config.isExcludedServer(dbi.master.address())) {
oldMasterFit = std::max(oldMasterFit, ProcessClass::WorstFit);
}
ProcessClass::Fitness newMasterFit = getMasterWorker(db.config, true).second.machineClassFitness( ProcessClass::Master );
if(dbi.recoveryState < RecoveryState::FULLY_RECOVERED) {
@ -751,7 +771,8 @@ public:
newInFit = fitness;
}
if(oldInFit < newInFit) return false;
if(oldInFit.betterInDatacenterFitness(newInFit)) return false;
if(oldMasterFit > newMasterFit || oldAcrossFit > newAcrossFit || oldInFit > newInFit) {
TraceEvent("BetterMasterExists", id).detail("oldMasterFit", oldMasterFit).detail("newMasterFit", newMasterFit)
.detail("oldAcrossFitC", oldAcrossFit.tlogCount).detail("newAcrossFitC", newAcrossFit.tlogCount)
@ -770,6 +791,7 @@ public:
Standalone<RangeResultRef> lastProcessClasses;
bool gotProcessClasses;
Optional<Standalone<StringRef>> masterProcessId;
Optional<Standalone<StringRef>> clusterControllerProcessId;
UID id;
std::vector<RecruitFromConfigurationRequest> outstandingRecruitmentRequests;
std::vector<std::pair<RecruitStorageRequest, double>> outstandingStorageRequests;
@ -1325,6 +1347,10 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
TraceEvent("ClusterControllerActualWorkers", self->id).detail("WorkerID",w.id()).detailext("ProcessID", w.locality.processId()).detailext("ZoneId", w.locality.zoneId()).detailext("DataHall", w.locality.dataHallId()).detail("pClass", req.processClass.toString()).detail("Workers", self->id_worker.size()).detail("Registered", (info == self->id_worker.end() ? "False" : "True")).backtrace();
if ( w.address() == g_network->getLocalAddress() ) {
self->clusterControllerProcessId = w.locality.processId();
}
// Check process class if needed
if (self->gotProcessClasses && (info == self->id_worker.end() || info->second.interf.id() != w.id() || req.generation >= info->second.gen)) {
auto classIter = self->id_class.find(w.locality.processId());
@ -1663,18 +1689,22 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
registerWorker( req, &self );
}
when( GetWorkersRequest req = waitNext( interf.getWorkers.getFuture() ) ) {
if ( req.flags & GetWorkersRequest::FLAG_TESTER_CLASS ) {
vector<std::pair<WorkerInterface, ProcessClass>> testers;
for(auto& it : self.id_worker)
if (it.second.processClass.classType() == ProcessClass::TesterClass)
testers.push_back(std::make_pair(it.second.interf, it.second.processClass));
req.reply.send( testers );
} else {
vector<std::pair<WorkerInterface, ProcessClass>> workers;
for(auto& it : self.id_worker)
workers.push_back(std::make_pair(it.second.interf, it.second.processClass));
req.reply.send( workers );
vector<std::pair<WorkerInterface, ProcessClass>> workers;
auto masterAddr = self.db.serverInfo->get().master.address();
for(auto& it : self.id_worker) {
if ( (req.flags & GetWorkersRequest::NON_EXCLUDED_PROCESSES_ONLY) && self.db.config.isExcludedServer(it.second.interf.address()) ) {
continue;
}
if ( (req.flags & GetWorkersRequest::TESTER_CLASS_ONLY) && it.second.processClass.classType() != ProcessClass::TesterClass ) {
continue;
}
workers.push_back(std::make_pair(it.second.interf, it.second.processClass));
}
req.reply.send( workers );
}
when( GetClientWorkersRequest req = waitNext( interf.clientInterface.getClientWorkers.getFuture() ) ) {
vector<ClientWorkerInterface> workers;

View File

@ -130,7 +130,7 @@ struct RegisterWorkerRequest {
};
struct GetWorkersRequest {
enum { FLAG_TESTER_CLASS = 1 };
enum { TESTER_CLASS_ONLY = 0x1, NON_EXCLUDED_PROCESSES_ONLY = 0x2 };
int flags;
ReplyPromise<vector<std::pair<WorkerInterface, ProcessClass>>> reply;

View File

@ -567,7 +567,7 @@ namespace oldTLog {
nextVersion = std::max<Version>(nextVersion, self->persistentDataVersion);
TraceEvent("UpdateStorageVer", self->dbgid).detail("nextVersion", nextVersion).detail("persistentDataVersion", self->persistentDataVersion).detail("totalSize", totalSize);
//TraceEvent("UpdateStorageVer", self->dbgid).detail("nextVersion", nextVersion).detail("persistentDataVersion", self->persistentDataVersion).detail("totalSize", totalSize);
Void _ = wait( self->queueCommittedVersion.whenAtLeast( nextVersion ) );
Void _ = wait( delay(0, TaskUpdateStorage) );

View File

@ -30,10 +30,10 @@
#include "Status.h"
#include "fdbclient/ManagementAPI.h"
ACTOR Future<vector<std::pair<WorkerInterface, ProcessClass>>> getWorkers( Reference<AsyncVar<ServerDBInfo>> dbInfo ) {
ACTOR Future<vector<std::pair<WorkerInterface, ProcessClass>>> getWorkers( Reference<AsyncVar<ServerDBInfo>> dbInfo, int flags = 0 ) {
loop {
choose {
when( vector<std::pair<WorkerInterface, ProcessClass>> w = wait( brokenPromiseToNever( dbInfo->get().clusterInterface.getWorkers.getReply( GetWorkersRequest() ) ) ) ) {
when( vector<std::pair<WorkerInterface, ProcessClass>> w = wait( brokenPromiseToNever( dbInfo->get().clusterInterface.getWorkers.getReply( GetWorkersRequest( flags ) ) ) ) ) {
return w;
}
when( Void _ = wait( dbInfo->onChange() ) ) {}

View File

@ -33,7 +33,7 @@ Future<int64_t> getMaxTLogQueueSize( Database const& cx, Reference<AsyncVar<stru
Future<int64_t> getMaxStorageServerQueueSize( Database const& cx, Reference<AsyncVar<struct ServerDBInfo>> const& );
Future<int64_t> getDataDistributionQueueSize( Database const &cx, Reference<AsyncVar<struct ServerDBInfo>> const&, bool const& reportInFlight );
Future<vector<StorageServerInterface>> getStorageServers( Database const& cx, bool const &use_system_priority = false);
Future<vector<std::pair<WorkerInterface, ProcessClass>>> getWorkers( Reference<AsyncVar<ServerDBInfo>> const& dbInfo );
Future<vector<std::pair<WorkerInterface, ProcessClass>>> getWorkers( Reference<AsyncVar<ServerDBInfo>> const& dbInfo, int const& flags = 0 );
Future<WorkerInterface> getMasterWorker( Database const& cx, Reference<AsyncVar<ServerDBInfo>> const& dbInfo );
//Waits for f to complete. If simulated, disables connection failures after waiting a specified amount of time

View File

@ -904,11 +904,11 @@ static StatusObject clientStatusFetcher(ClientVersionMap clientVersionMap) {
return clientStatus;
}
ACTOR static Future<StatusObject> recoveryStateStatusFetcher(std::pair<WorkerInterface, ProcessClass> mWorker, std::string dbName, int workerCount, std::set<std::string> *incomplete_reasons) {
ACTOR static Future<StatusObject> recoveryStateStatusFetcher(std::pair<WorkerInterface, ProcessClass> mWorker, int workerCount, std::set<std::string> *incomplete_reasons) {
state StatusObject message;
try {
Standalone<StringRef> md = wait( timeoutError(mWorker.first.eventLogRequest.getReply( EventLogRequest(StringRef(dbName+"/MasterRecoveryState") ) ), 1.0) );
Standalone<StringRef> md = wait( timeoutError(mWorker.first.eventLogRequest.getReply( EventLogRequest( LiteralStringRef("MasterRecoveryState") ) ), 1.0) );
state int mStatusCode = parseInt( extractAttribute(md, LiteralStringRef("StatusCode")) );
if (mStatusCode < 0 || mStatusCode >= RecoveryStatus::END)
throw attribute_not_found();
@ -926,6 +926,8 @@ ACTOR static Future<StatusObject> recoveryStateStatusFetcher(std::pair<WorkerInt
message["required_logs"] = requiredLogs;
message["required_proxies"] = requiredProxies;
message["required_resolvers"] = requiredResolvers;
} else if (mStatusCode == RecoveryStatus::locking_old_transaction_servers) {
message["missing_logs"] = extractAttribute(md, LiteralStringRef("MissingIDs")).c_str();
}
// TODO: time_in_recovery: 0.5
// time_in_state: 0.1
@ -1744,7 +1746,7 @@ ACTOR Future<StatusReply> clusterGetStatus(
}
// construct status information for cluster subsections
state StatusObject recoveryStateStatus = wait(recoveryStateStatusFetcher(mWorker, dbName, workers.size(), &status_incomplete_reasons));
state StatusObject recoveryStateStatus = wait(recoveryStateStatusFetcher(mWorker, workers.size(), &status_incomplete_reasons));
// machine metrics
state WorkerEvents mMetrics = workerEventsVec[0].present() ? workerEventsVec[0].get().first : WorkerEvents();

View File

@ -663,7 +663,7 @@ ACTOR Future<Void> updateStorage( TLogData* self ) {
nextVersion = std::max<Version>(nextVersion, logData->persistentDataVersion);
TraceEvent("UpdateStorageVer", logData->logId).detail("nextVersion", nextVersion).detail("persistentDataVersion", logData->persistentDataVersion).detail("totalSize", totalSize);
//TraceEvent("UpdateStorageVer", logData->logId).detail("nextVersion", nextVersion).detail("persistentDataVersion", logData->persistentDataVersion).detail("totalSize", totalSize);
Void _ = wait( logData->queueCommittedVersion.whenAtLeast( nextVersion ) );
Void _ = wait( delay(0, TaskUpdateStorage) );

View File

@ -28,6 +28,7 @@
#include "fdbrpc/simulator.h"
#include "fdbrpc/Replication.h"
#include "fdbrpc/ReplicationUtils.h"
#include "RecoveryState.h"
template <class Collection>
void uniquify( Collection& c ) {
@ -648,6 +649,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
std::vector<TLogLockResult> results;
std::string sServerState;
LocalityGroup unResponsiveSet;
std::string missingServerIds;
double t = timer();
cycles ++;
@ -660,6 +663,10 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
else {
unResponsiveSet.add(prevState.tLogLocalities[t]);
sServerState += 'f';
if(missingServerIds.size()) {
missingServerIds += ", ";
}
missingServerIds += logServers[t]->get().toString();
}
}
@ -773,22 +780,12 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
.detail("LogZones", ::describeZones(prevState.tLogLocalities))
.detail("LogDataHalls", ::describeDataHalls(prevState.tLogLocalities));
}
}
// Too many failures
else {
TraceEvent("LogSystemWaitingForRecovery", dbgid).detail("Cycles", cycles)
.detail("AvailableServers", results.size())
.detail("TotalServers", logServers.size())
.detail("Present", results.size())
.detail("Available", availableItems.size())
.detail("Absent", logServers.size() - results.size())
.detail("ServerState", sServerState)
.detail("ReplicationFactor", prevState.tLogReplicationFactor)
.detail("AntiQuorum", prevState.tLogWriteAntiQuorum)
.detail("Policy", prevState.tLogPolicy->info())
.detail("TooManyFailures", bTooManyFailures)
.detail("LogZones", ::describeZones(prevState.tLogLocalities))
.detail("LogDataHalls", ::describeDataHalls(prevState.tLogLocalities));
} else {
TraceEvent("MasterRecoveryState", dbgid)
.detail("StatusCode", RecoveryStatus::locking_old_transaction_servers)
.detail("Status", RecoveryStatus::names[RecoveryStatus::locking_old_transaction_servers])
.detail("MissingIDs", missingServerIds)
.trackLatest("MasterRecoveryState");
}
// Wait for anything relevant to change

View File

@ -92,6 +92,7 @@
<ActorCompiler Include="workloads\DummyWorkload.actor.cpp" />
<ActorCompiler Include="workloads\BackupCorrectness.actor.cpp" />
<ActorCompiler Include="workloads\AtomicOps.actor.cpp" />
<ActorCompiler Include="workloads\AtomicOpsApiCorrectness.actor.cpp" />
<ActorCompiler Include="workloads\BackupToDBAbort.actor.cpp" />
<ActorCompiler Include="workloads\BackupToDBCorrectness.actor.cpp" />
<ActorCompiler Include="workloads\AtomicSwitchover.actor.cpp" />

View File

@ -221,6 +221,9 @@
<ActorCompiler Include="workloads\AtomicOps.actor.cpp">
<Filter>workloads</Filter>
</ActorCompiler>
<ActorCompiler Include="workloads\AtomicOpsApiCorrectness.actor.cpp">
<Filter>workloads</Filter>
</ActorCompiler>
<ActorCompiler Include="workloads\StatusWorkload.actor.cpp">
<Filter>workloads</Filter>
</ActorCompiler>

View File

@ -452,7 +452,7 @@ ACTOR Future<Void> recruitEverything( Reference<MasterData> self, vector<Storage
TraceEvent("MasterRecoveryState", self->dbgid)
.detail("StatusCode", status)
.detail("Status", RecoveryStatus::names[status])
.trackLatest(format("%s/MasterRecoveryState", printable(self->dbName).c_str() ).c_str());
.trackLatest("MasterRecoveryState");
return Never();
} else
TraceEvent("MasterRecoveryState", self->dbgid)
@ -465,7 +465,7 @@ ACTOR Future<Void> recruitEverything( Reference<MasterData> self, vector<Storage
.detail("RequiredResolvers", 1)
.detail("DesiredResolvers", self->configuration.getDesiredResolvers())
.detail("storeType", self->configuration.storageServerStoreType)
.trackLatest(format("%s/MasterRecoveryState", printable(self->dbName).c_str() ).c_str());
.trackLatest("MasterRecoveryState");
RecruitFromConfigurationReply recruits = wait(
brokenPromiseToNever( self->clusterController.recruitFromConfiguration.getReply(
@ -477,7 +477,7 @@ ACTOR Future<Void> recruitEverything( Reference<MasterData> self, vector<Storage
.detail("Proxies", recruits.proxies.size())
.detail("TLogs", recruits.tLogs.size())
.detail("Resolvers", recruits.resolvers.size())
.trackLatest(format("%s/MasterRecoveryState", printable(self->dbName).c_str() ).c_str());
.trackLatest("MasterRecoveryState");
// Actually, newSeedServers does both the recruiting and initialization of the seed servers; so if this is a brand new database we are sort of lying that we are
// past the recruitment phase. In a perfect world we would split that up so that the recruitment part happens above (in parallel with recruiting the transaction servers?).
@ -637,7 +637,7 @@ ACTOR Future<Void> recoverFrom( Reference<MasterData> self, Reference<ILogSystem
TraceEvent("MasterRecoveryState", self->dbgid)
.detail("StatusCode", RecoveryStatus::reading_transaction_system_state)
.detail("Status", RecoveryStatus::names[RecoveryStatus::reading_transaction_system_state])
.trackLatest(format("%s/MasterRecoveryState", printable(self->dbName).c_str() ).c_str());
.trackLatest("MasterRecoveryState");
self->hasConfiguration = false;
if(BUGGIFY)
@ -966,7 +966,7 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self, PromiseStream<Future<
TraceEvent("MasterRecoveryState", self->dbgid)
.detail("StatusCode", RecoveryStatus::reading_coordinated_state)
.detail("Status", RecoveryStatus::names[RecoveryStatus::reading_coordinated_state])
.trackLatest(format("%s/MasterRecoveryState", printable(self->dbName).c_str() ).c_str());
.trackLatest("MasterRecoveryState");
Value prevDBStateRaw = wait( self->cstate1.read() );
addActor.send( masterTerminateOnConflict( self, self->cstate1.onConflict() ) );
@ -981,7 +981,7 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self, PromiseStream<Future<
.detail("TLogs", self->prevDBState.tLogs.size())
.detail("MyRecoveryCount", self->prevDBState.recoveryCount+2)
.detail("StateSize", prevDBStateRaw.size())
.trackLatest(format("%s/MasterRecoveryState", printable(self->dbName).c_str() ).c_str());
.trackLatest("MasterRecoveryState");
state Reference<AsyncVar<Reference<ILogSystem>>> oldLogSystems( new AsyncVar<Reference<ILogSystem>> );
state Future<Void> recoverAndEndEpoch = ILogSystem::recoverAndEndEpoch(oldLogSystems, self->dbgid, self->prevDBState, self->myInterface.tlogRejoin.getFuture(), self->myInterface.locality);
@ -999,7 +999,8 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self, PromiseStream<Future<
TraceEvent("MasterRecoveryState", self->dbgid)
.detail("StatusCode", RecoveryStatus::locking_old_transaction_servers)
.detail("Status", RecoveryStatus::names[RecoveryStatus::locking_old_transaction_servers])
.trackLatest(format("%s/MasterRecoveryState", printable(self->dbName).c_str() ).c_str());
.detail("MissingIDs", "")
.trackLatest("MasterRecoveryState");
loop {
Reference<ILogSystem> oldLogSystem = oldLogSystems->get();
@ -1025,7 +1026,7 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self, PromiseStream<Future<
TraceEvent("MasterRecoveryState", self->dbgid)
.detail("StatusCode", RecoveryStatus::recovery_transaction)
.detail("Status", RecoveryStatus::names[RecoveryStatus::recovery_transaction])
.trackLatest(format("%s/MasterRecoveryState", printable(self->dbName).c_str() ).c_str());
.trackLatest("MasterRecoveryState");
// Recovery transaction
state bool debugResult = debug_checkMinRestoredVersion( UID(), self->lastEpochEnd, "DBRecovery", SevWarn );
@ -1101,7 +1102,7 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self, PromiseStream<Future<
.detail("Status", RecoveryStatus::names[RecoveryStatus::writing_coordinated_state])
.detail("TLogs", self->logSystem->getLogServerCount())
.detail("TLogList", self->logSystem->describe())
.trackLatest(format("%s/MasterRecoveryState", printable(self->dbName).c_str() ).c_str());
.trackLatest("MasterRecoveryState");
// Multiple masters prevent conflicts between themselves via CoordinatedState (self->cstate)
// 1. If SetMaster succeeds, then by CS's contract, these "new" Tlogs are the immediate
@ -1137,7 +1138,7 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self, PromiseStream<Future<
.detail("StatusCode", RecoveryStatus::fully_recovered)
.detail("Status", RecoveryStatus::names[RecoveryStatus::fully_recovered])
.detail("storeType", self->configuration.storageServerStoreType)
.trackLatest(format("%s/MasterRecoveryState", printable(self->dbName).c_str() ).c_str());
.trackLatest("MasterRecoveryState");
// Now that recovery is complete, we register ourselves with the cluster controller, so that the client and server information
// it hands out can be updated

View File

@ -196,7 +196,7 @@ struct UpdateEagerReadInfo {
// SOMEDAY: Theoretically we can avoid a read if there is an earlier overlapping ClearRange
if (m.type == MutationRef::ClearRange && !m.param2.startsWith(systemKeys.end))
keyBegin.push_back( m.param2 );
else if (m.type == MutationRef::AppendIfFits)
else if ((m.type == MutationRef::AppendIfFits) || (m.type == MutationRef::ByteMin) || (m.type == MutationRef::ByteMax))
keys.push_back(pair<KeyRef, int>(m.param1, CLIENT_KNOBS->VALUE_SIZE_LIMIT));
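// ByteMin/ByteMax can be won by the existing value, whose length is bounded only by the
// value size limit (not by param2.size()), so eagerly read up to VALUE_SIZE_LIMIT bytes.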
else if (isAtomicOp((MutationRef::Type) m.type))
keys.push_back(pair<KeyRef, int>(m.param1, m.param2.size()));
@ -1461,17 +1461,17 @@ bool expandMutation( MutationRef& m, StorageServer::VersionedData const& data, U
}
else if (m.type != MutationRef::SetValue && (m.type)) {
StringRef oldVal;
Optional<StringRef> oldVal;
auto it = data.atLatest().lastLessOrEqual(m.param1);
if (it != data.atLatest().end() && it->isValue() && it.key() == m.param1)
oldVal = it->getValue();
else if (it != data.atLatest().end() && it->isClearTo() && it->getEndKey() > m.param1) {
TEST(true); // Atomic op right after a clear.
oldVal = StringRef();
}
else {
Optional<Value>& oldThing = eager->getValue(m.param1);
oldVal = oldThing.present() ? oldThing.get() : StringRef();
if (oldThing.present())
oldVal = oldThing.get();
}
switch(m.type) {
@ -1496,6 +1496,18 @@ bool expandMutation( MutationRef& m, StorageServer::VersionedData const& data, U
case MutationRef::Min:
m.param2 = doMin(oldVal, m.param2, ar);
break;
case MutationRef::ByteMin:
m.param2 = doByteMin(oldVal, m.param2, ar);
break;
case MutationRef::ByteMax:
m.param2 = doByteMax(oldVal, m.param2, ar);
break;
case MutationRef::MinV2:
m.param2 = doMinV2(oldVal, m.param2, ar);
break;
case MutationRef::AndV2:
m.param2 = doAndV2(oldVal, m.param2, ar);
break;
}
m.type = MutationRef::SetValue;
}

View File

@ -1079,7 +1079,7 @@ ACTOR Future<Void> runTests( Reference<AsyncVar<Optional<struct ClusterControlle
ACTOR Future<Void> runTests( Reference<AsyncVar<Optional<struct ClusterControllerFullInterface>>> cc,
Reference<AsyncVar<Optional<struct ClusterInterface>>> ci, vector<TestSpec> tests, test_location_t at,
int minTestersExpected, StringRef startingConfiguration, LocalityData locality ) {
state int flags = at == TEST_ON_SERVERS ? 0 : GetWorkersRequest::FLAG_TESTER_CLASS;
state int flags = (at == TEST_ON_SERVERS ? 0 : GetWorkersRequest::TESTER_CLASS_ONLY) | GetWorkersRequest::NON_EXCLUDED_PROCESSES_ONLY;
state Future<Void> testerTimeout = delay(60.0); // wait 60 sec for testers to show up
state vector<std::pair<WorkerInterface, ProcessClass>> workers;

View File

@ -29,6 +29,7 @@
struct AtomicOpsWorkload : TestWorkload {
int opNum, actorCount, nodeCount;
uint32_t opType;
bool apiVersion500 = false;
double testDuration, transactionsPerSecond;
vector<Future<Void>> clients;
@ -41,8 +42,14 @@ struct AtomicOpsWorkload : TestWorkload {
actorCount = getOption( options, LiteralStringRef("actorsPerClient"), transactionsPerSecond / 5 );
opType = getOption( options, LiteralStringRef("opType"), -1 );
nodeCount = getOption( options, LiteralStringRef("nodeCount"), 1000 );
// Atomic ops Min and And have modified behavior from API version 510. Hence we allow testing against the older version (500) with a 10% probability.
// The actual change of API version happens in setup.
apiVersion500 = ((sharedRandomNumber % 10) == 0);
TraceEvent("AtomicOpsApiVersion500").detail("apiVersion500", apiVersion500);
int64_t randNum = sharedRandomNumber / 10;
if(opType == -1)
opType = sharedRandomNumber % 6;
opType = randNum % 8;
switch(opType) {
case 0:
@ -69,6 +76,14 @@ struct AtomicOpsWorkload : TestWorkload {
TEST(true); //Testing atomic Min
opType = MutationRef::Min;
break;
case 6:
TEST(true); //Testing atomic ByteMin
opType = MutationRef::ByteMin;
break;
case 7:
TEST(true); //Testing atomic ByteMax
opType = MutationRef::ByteMax;
break;
default:
ASSERT(false);
}
@ -77,7 +92,10 @@ struct AtomicOpsWorkload : TestWorkload {
virtual std::string description() { return "AtomicOps"; }
virtual Future<Void> setup( Database const& cx ) {
virtual Future<Void> setup( Database const& cx ) {
if (apiVersion500)
cx->cluster->apiVersion = 500;
if(clientId != 0)
return Void();
return _setup( cx, this );

View File

@ -0,0 +1,381 @@
/*
* AtomicOpsApiCorrectness.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "flow/actorcompiler.h"
#include "fdbserver/TesterInterface.h"
#include "fdbclient/ReadYourWrites.h"
#include "fdbclient/RunTransaction.actor.h"
#include "workloads.h"
struct AtomicOpsApiCorrectnessWorkload : TestWorkload {
bool testFailed = false;
uint32_t opType;
private:
static int getApiVersion(const Database &cx) {
return cx->cluster->apiVersion;
}
static void setApiVersion(Database *cx, int version) {
(*cx)->cluster->apiVersion = version;
}
Key getTestKey(std::string prefix) {
std::string key = prefix + std::to_string(clientId);
return StringRef(key);
}
public:
AtomicOpsApiCorrectnessWorkload(WorkloadContext const& wcx)
: TestWorkload(wcx)
{
opType = getOption(options, LiteralStringRef("opType"), -1);
}
virtual std::string description() { return "AtomicOpsApiCorrectness"; }
virtual Future<Void> setup(Database const& cx) {
return Void();
}
virtual Future<Void> start(Database const& cx) {
if (opType == -1)
opType = sharedRandomNumber % 8;
switch (opType) {
case 0:
TEST(true); //Testing atomic Min
return testMin(cx->clone(), this);
case 1:
TEST(true); //Testing atomic And
return testAnd(cx->clone(), this);
case 2:
TEST(true); //Testing atomic ByteMin
return testByteMin(cx->clone(), this);
case 3:
TEST(true); //Testing atomic ByteMax
return testByteMax(cx->clone(), this);
case 4:
TEST(true); //Testing atomic Or
return testOr(cx->clone(), this);
case 5:
TEST(true); //Testing atomic Max
return testMax(cx->clone(), this);
case 6:
TEST(true); //Testing atomic Xor
return testXor(cx->clone(), this);
case 7:
TEST(true); //Testing atomic Add
return testAdd(cx->clone(), this);
default:
ASSERT(false);
}
return Void();
}
virtual Future<bool> check(Database const& cx) {
return !testFailed;
}
virtual void getMetrics(vector<PerfMetric>& m) {
}
// Test atomic ops on non-existing keys that result in a set
ACTOR Future<Void> testAtomicOpSetOnNonExistingKey(Database cx, AtomicOpsApiCorrectnessWorkload* self, uint32_t opType, Key key) {
state uint64_t intValue = g_random->randomInt(0, 10000000);
state Value val = StringRef((const uint8_t*)&intValue, sizeof(intValue));
// Do operation on Storage Server
loop{
try {
Void _ = wait(runRYWTransactionNoRetry(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Void> { tr->clear(key); return Void(); }));
Void _ = wait(runRYWTransactionNoRetry(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Void> { tr->atomicOp(key, val, opType); return Void(); }));
break;
}
catch (Error& e) {
TraceEvent(SevInfo, "AtomicOpApiThrow").detail("errCode", e.code());
Void _ = wait(delay(1));
}
}
Optional<Value> outputVal = wait(runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Optional<Value>> { return tr->get(key); }));
uint64_t output = 0;
ASSERT(outputVal.present() && outputVal.get().size() == sizeof(uint64_t));
memcpy(&output, outputVal.get().begin(), outputVal.get().size());
if (output != intValue) {
TraceEvent(SevError, "AtomicOpSetOnNonExistingKeyUnexpectedOutput").detail("opOn", "StorageServer").detail("op", opType).detail("ExpectedOutput", intValue).detail("ActualOutput", output);
self->testFailed = true;
}
// Do operation on RYW Layer
Optional<Value> outputVal = wait(runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Optional<Value>> { tr->clear(key); tr->atomicOp(key, val, opType); return tr->get(key); }));
uint64_t output = 0;
ASSERT(outputVal.present() && outputVal.get().size() == sizeof(uint64_t));
memcpy(&output, outputVal.get().begin(), outputVal.get().size());
if (output != intValue) {
TraceEvent(SevError, "AtomicOpSetOnNonExistingKeyUnexpectedOutput").detail("opOn", "RYWLayer").detail("op", opType).detail("ExpectedOutput", intValue).detail("ActualOutput", output);
self->testFailed = true;
}
return Void();
}
// Test atomic ops on non-existing keys that result in no value being set
ACTOR Future<Void> testAtomicOpUnsetOnNonExistingKey(Database cx, AtomicOpsApiCorrectnessWorkload* self, uint32_t opType, Key key) {
state uint64_t intValue = g_random->randomInt(0, 10000000);
state Value val = StringRef((const uint8_t*)&intValue, sizeof(intValue));
// Do operation on Storage Server
loop {
try {
Void _ = wait(runRYWTransactionNoRetry(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Void> { tr->clear(key); return Void(); }));
Void _ = wait(runRYWTransactionNoRetry(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Void> { tr->atomicOp(key, val, opType); return Void(); }));
break;
}
catch (Error& e) {
TraceEvent(SevInfo, "AtomicOpApiThrow").detail("errCode", e.code());
Void _ = wait(delay(1));
}
}
Optional<Value> outputVal = wait(runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Optional<Value>> { return tr->get(key); }));
uint64_t output = 0;
ASSERT(outputVal.present() && outputVal.get().size() == sizeof(uint64_t));
memcpy(&output, outputVal.get().begin(), outputVal.get().size());
if (output != 0) {
TraceEvent(SevError, "AtomicOpUnsetOnNonExistingKeyUnexpectedOutput").detail("opOn", "StorageServer").detail("op", opType).detail("ExpectedOutput", 0).detail("ActualOutput", output);
self->testFailed = true;
}
// Do operation on RYW Layer
Optional<Value> outputVal = wait(runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Optional<Value>> { tr->clear(key); tr->atomicOp(key, val, opType); return tr->get(key); }));
uint64_t output = 0;
ASSERT(outputVal.present() && outputVal.get().size() == sizeof(uint64_t));
memcpy(&output, outputVal.get().begin(), outputVal.get().size());
if (output != 0) {
TraceEvent(SevError, "AtomicOpUnsetOnNonExistingKeyUnexpectedOutput").detail("opOn", "RYWLayer").detail("op", opType).detail("ExpectedOutput", 0).detail("ActualOutput", output);
self->testFailed = true;
}
return Void();
}
typedef std::function<Value(Value, Value)> DoAtomicOpOnEmptyValueFunction;
// Test atomic ops when one of the values is empty
ACTOR Future<Void> testAtomicOpOnEmptyValue(Database cx, AtomicOpsApiCorrectnessWorkload* self, uint32_t opType, Key key, DoAtomicOpOnEmptyValueFunction opFunc) {
state Value existingVal;
state Value otherVal;
state uint64_t val = g_random->randomInt(0, 10000000);
if (g_random->random01() < 0.5) {
existingVal = StringRef((const uint8_t*)&val, sizeof(val));
otherVal = StringRef();
}
else {
otherVal = StringRef((const uint8_t*)&val, sizeof(val));
existingVal = StringRef();
}
// Do operation on Storage Server
loop{
try {
Void _ = wait(runRYWTransactionNoRetry(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Void> { tr->set(key, existingVal); return Void(); }));
Void _ = wait(runRYWTransactionNoRetry(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Void> { tr->atomicOp(key, otherVal, opType); return Void(); }));
break;
}
catch (Error& e) {
TraceEvent(SevInfo, "AtomicOpApiThrow").detail("errCode", e.code());
Void _ = wait(delay(1));
}
}
Optional<Value> outputVal = wait(runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Optional<Value>> { return tr->get(key); }));
ASSERT(outputVal.present());
Value output = outputVal.get();
if (output != opFunc(existingVal, otherVal)) {
TraceEvent(SevError, "AtomicOpOnEmptyValueUnexpectedOutput").detail("opOn", "StorageServer").detail("op", opType).detail("ExpectedOutput", opFunc(existingVal, otherVal).toString()).detail("ActualOutput", output.toString());
self->testFailed = true;
}
// Do operation on RYW Layer
Optional<Value> outputVal = wait(runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Optional<Value>> { tr->set(key, existingVal); tr->atomicOp(key, otherVal, opType); return tr->get(key); }));
ASSERT(outputVal.present());
Value output = outputVal.get();
if (output != opFunc(existingVal, otherVal)) {
TraceEvent(SevError, "AtomicOpOnEmptyValueUnexpectedOutput").detail("opOn", "RYWLayer").detail("op", opType).detail("ExpectedOutput", opFunc(existingVal, otherVal).toString()).detail("ActualOutput", output.toString());
self->testFailed = true;
}
return Void();
}
typedef std::function<uint64_t(uint64_t, uint64_t)> DoAtomicOpFunction;
// Test atomic ops in the normal case when the existing value is present
ACTOR Future<Void> testAtomicOpApi(Database cx, AtomicOpsApiCorrectnessWorkload* self, uint32_t opType, Key key, DoAtomicOpFunction opFunc) {
state uint64_t intValue1 = g_random->randomInt(0, 10000000);
state uint64_t intValue2 = g_random->randomInt(0, 10000000);
state Value val1 = StringRef((const uint8_t*)&intValue1, sizeof(intValue1));
state Value val2 = StringRef((const uint8_t *)&intValue2, sizeof(intValue2));
// Do operation on Storage Server
loop{
try {
// Set the key to a random value
Void _ = wait(runRYWTransactionNoRetry(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Void> { tr->set(key, val1); return Void(); }));
// Do atomic op
Void _ = wait(runRYWTransactionNoRetry(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Void> { tr->atomicOp(key, val2, opType); return Void(); }));
break;
}
catch (Error& e) {
TraceEvent(SevInfo, "AtomicOpApiThrow").detail("errCode", e.code());
Void _ = wait(delay(1));
}
}
// Compare result
Optional<Value> outputVal = wait(runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Optional<Value>> { return tr->get(key); }));
uint64_t output = 0;
ASSERT(outputVal.present() && outputVal.get().size() == sizeof(uint64_t));
memcpy(&output, outputVal.get().begin(), outputVal.get().size());
if (output != opFunc(intValue1, intValue2)) {
TraceEvent(SevError, "AtomicOpApiCorrectnessUnexpectedOutput").detail("opOn", "StorageServer").detail("InValue1", intValue1).detail("InValue2", intValue2).detail("AtomicOp", opType).detail("ExpectedOutput", opFunc(intValue1, intValue2)).detail("ActualOutput", output);
self->testFailed = true;
}
// Do operation at RYW layer
Optional<Value> outputVal = wait(runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Optional<Value>> { tr->set(key, val1); tr->atomicOp(key, val2, opType); return tr->get(key); }));
// Compare result
uint64_t output = 0;
ASSERT(outputVal.present() && outputVal.get().size() == sizeof(uint64_t));
memcpy(&output, outputVal.get().begin(), outputVal.get().size());
if (output != opFunc(intValue1, intValue2)) {
TraceEvent(SevError, "AtomicOpApiCorrectnessUnexpectedOutput").detail("opOn", "RYWLayer").detail("InValue1", intValue1).detail("InValue2", intValue2).detail("AtomicOp", opType).detail("ExpectedOutput", opFunc(intValue1, intValue2)).detail("ActualOutput", output);
self->testFailed = true;
}
return Void();
}
ACTOR Future<Void> testMin(Database cx, AtomicOpsApiCorrectnessWorkload* self) {
state int currentApiVersion = getApiVersion(cx);
state Key key = self->getTestKey("test_key_min_");
TraceEvent("AtomicOpCorrectnessApiWorkload").detail("opType", "MIN");
// API Version 500
setApiVersion(&cx, 500);
TraceEvent(SevInfo, "Running Atomic Op Min Correctness Test Api Version 500");
Void _ = wait(self->testAtomicOpUnsetOnNonExistingKey(cx, self, MutationRef::Min, key));
Void _ = wait(self->testAtomicOpApi(cx, self, MutationRef::Min, key, [](uint64_t val1, uint64_t val2) { return val1 < val2 ? val1 : val2; }));
Void _ = wait(self->testAtomicOpOnEmptyValue(cx, self, MutationRef::Min, key, [](Value v1, Value v2) -> Value { uint64_t zeroVal = 0; if (v2.size() == 0) return StringRef(); else return StringRef((const uint8_t*)&zeroVal, sizeof(zeroVal)); }));
// Current API Version
setApiVersion(&cx, currentApiVersion);
TraceEvent(SevInfo, "Running Atomic Op Min Correctness Current Api Version").detail("Version", currentApiVersion);
Void _ = wait(self->testAtomicOpSetOnNonExistingKey(cx, self, MutationRef::Min, key));
Void _ = wait(self->testAtomicOpApi(cx, self, MutationRef::Min, key, [](uint64_t val1, uint64_t val2) { return val1 < val2 ? val1 : val2; }));
Void _ = wait(self->testAtomicOpOnEmptyValue(cx, self, MutationRef::Min, key, [](Value v1, Value v2) -> Value { uint64_t zeroVal = 0; if (v2.size() == 0) return StringRef(); else return StringRef((const uint8_t*)&zeroVal, sizeof(zeroVal)); }));
return Void();
}
ACTOR Future<Void> testMax(Database cx, AtomicOpsApiCorrectnessWorkload* self) {
state Key key = self->getTestKey("test_key_max_");
TraceEvent(SevInfo, "Running Atomic Op MAX Correctness Current Api Version");
Void _ = wait(self->testAtomicOpSetOnNonExistingKey(cx, self, MutationRef::Max, key));
Void _ = wait(self->testAtomicOpApi(cx, self, MutationRef::Max, key, [](uint64_t val1, uint64_t val2) { return val1 > val2 ? val1 : val2; }));
Void _ = wait(self->testAtomicOpOnEmptyValue(cx, self, MutationRef::Max, key, [](Value v1, Value v2) -> Value { return v2.size() ? v2 : StringRef(); }));
return Void();
}
ACTOR Future<Void> testAnd(Database cx, AtomicOpsApiCorrectnessWorkload* self) {
state int currentApiVersion = getApiVersion(cx);
state Key key = self->getTestKey("test_key_and_");
TraceEvent("AtomicOpCorrectnessApiWorkload").detail("opType", "AND");
// API Version 500
setApiVersion(&cx, 500);
TraceEvent(SevInfo, "Running Atomic Op AND Correctness Test Api Version 500");
Void _ = wait(self->testAtomicOpUnsetOnNonExistingKey(cx, self, MutationRef::And, key));
Void _ = wait(self->testAtomicOpApi(cx, self, MutationRef::And, key, [](uint64_t val1, uint64_t val2) { return val1 & val2; }));
Void _ = wait(self->testAtomicOpOnEmptyValue(cx, self, MutationRef::And, key, [](Value v1, Value v2) -> Value { uint64_t zeroVal = 0; if (v2.size() == 0) return StringRef(); else return StringRef((const uint8_t*)&zeroVal, sizeof(zeroVal)); }));
// Current API Version
setApiVersion(&cx, currentApiVersion);
TraceEvent(SevInfo, "Running Atomic Op AND Correctness Current Api Version").detail("Version", currentApiVersion);
Void _ = wait(self->testAtomicOpSetOnNonExistingKey(cx, self, MutationRef::And, key));
Void _ = wait(self->testAtomicOpApi(cx, self, MutationRef::And, key, [](uint64_t val1, uint64_t val2) { return val1 & val2; }));
Void _ = wait(self->testAtomicOpOnEmptyValue(cx, self, MutationRef::And, key, [](Value v1, Value v2) -> Value { uint64_t zeroVal = 0; if (v2.size() == 0) return StringRef(); else return StringRef((const uint8_t*)&zeroVal, sizeof(zeroVal)); }));
return Void();
}
ACTOR Future<Void> testOr(Database cx, AtomicOpsApiCorrectnessWorkload* self) {
state Key key = self->getTestKey("test_key_or_");
TraceEvent(SevInfo, "Running Atomic Op OR Correctness Current Api Version");
Void _ = wait(self->testAtomicOpSetOnNonExistingKey(cx, self, MutationRef::Or, key));
Void _ = wait(self->testAtomicOpApi(cx, self, MutationRef::Or, key, [](uint64_t val1, uint64_t val2) { return val1 | val2; }));
Void _ = wait(self->testAtomicOpOnEmptyValue(cx, self, MutationRef::Or, key, [](Value v1, Value v2) -> Value { return v2.size() ? v2 : StringRef(); }));
return Void();
}
ACTOR Future<Void> testXor(Database cx, AtomicOpsApiCorrectnessWorkload* self) {
state Key key = self->getTestKey("test_key_xor_");
TraceEvent(SevInfo, "Running Atomic Op XOR Correctness Current Api Version");
Void _ = wait(self->testAtomicOpSetOnNonExistingKey(cx, self, MutationRef::Xor, key));
Void _ = wait(self->testAtomicOpApi(cx, self, MutationRef::Xor, key, [](uint64_t val1, uint64_t val2) { return val1 ^ val2; }));
Void _ = wait(self->testAtomicOpOnEmptyValue(cx, self, MutationRef::Xor, key, [](Value v1, Value v2) -> Value { return v2.size() ? v2 : StringRef(); }));
return Void();
}
ACTOR Future<Void> testAdd(Database cx, AtomicOpsApiCorrectnessWorkload* self) {
state Key key = self->getTestKey("test_key_add_");
TraceEvent(SevInfo, "Running Atomic Op ADD Correctness Current Api Version");
Void _ = wait(self->testAtomicOpSetOnNonExistingKey(cx, self, MutationRef::AddValue, key));
Void _ = wait(self->testAtomicOpApi(cx, self, MutationRef::AddValue, key, [](uint64_t val1, uint64_t val2) { return val1 + val2; }));
Void _ = wait(self->testAtomicOpOnEmptyValue(cx, self, MutationRef::AddValue, key, [](Value v1, Value v2) -> Value { return v2.size() ? v2 : StringRef(); }));
return Void();
}
ACTOR Future<Void> testByteMin(Database cx, AtomicOpsApiCorrectnessWorkload* self) {
state Key key = self->getTestKey("test_key_byte_min_");
TraceEvent(SevInfo, "Running Atomic Op BYTE_MIN Correctness Current Api Version");
Void _ = wait(self->testAtomicOpSetOnNonExistingKey(cx, self, MutationRef::ByteMin, key));
Void _ = wait(self->testAtomicOpApi(cx, self, MutationRef::ByteMin, key, [](uint64_t val1, uint64_t val2) { return StringRef((const uint8_t *)&val1, sizeof(val1)) < StringRef((const uint8_t *)&val2, sizeof(val2)) ? val1 : val2; }));
Void _ = wait(self->testAtomicOpOnEmptyValue(cx, self, MutationRef::ByteMin, key, [](Value v1, Value v2) -> Value { return StringRef(); }));
return Void();
}
ACTOR Future<Void> testByteMax(Database cx, AtomicOpsApiCorrectnessWorkload* self) {
state Key key = self->getTestKey("test_key_byte_max_");
TraceEvent(SevInfo, "Running Atomic Op BYTE_MAX Correctness Current Api Version");
Void _ = wait(self->testAtomicOpSetOnNonExistingKey(cx, self, MutationRef::ByteMax, key));
Void _ = wait(self->testAtomicOpApi(cx, self, MutationRef::ByteMax, key, [](uint64_t val1, uint64_t val2) { return StringRef((const uint8_t *)&val1, sizeof(val1)) > StringRef((const uint8_t *)&val2, sizeof(val2)) ? val1 : val2; }));
Void _ = wait(self->testAtomicOpOnEmptyValue(cx, self, MutationRef::ByteMax, key, [](Value v1, Value v2) -> Value { return v1.size() ? v1 : v2; }));
return Void();
}
};
WorkloadFactory<AtomicOpsApiCorrectnessWorkload> AtomicOpsApiCorrectnessWorkloadFactory("AtomicOpsApiCorrectness");
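BYTE_MIN and BYTE_MAX order values lexicographically over their raw bytes — exactly what the StringRef comparisons in testByteMin/testByteMax encode — and on a little-endian machine that ordering can disagree with the arithmetic MIN/MAX. A standalone sketch (std::string comparison is bytewise, so it stands in for StringRef):

    #include <cassert>
    #include <cstdint>
    #include <string>

    std::string byteMin(const std::string& a, const std::string& b) { return a < b ? a : b; }
    std::string byteMax(const std::string& a, const std::string& b) { return a > b ? a : b; }

    int main() {
        // Little-endian: 256 encodes as {0x00,0x01,...} and 1 as {0x01,0x00,...},
        // so BYTE_MIN picks 256 even though arithmetic MIN would pick 1.
        uint64_t x = 256, y = 1;
        std::string bx(reinterpret_cast<const char*>(&x), sizeof(x));
        std::string by(reinterpret_cast<const char*>(&y), sizeof(y));
        assert(byteMin(bx, by) == bx);
        // The empty value is lexicographically smallest, hence testByteMin's
        // empty-value lambda always returns StringRef(), while testByteMax keeps
        // whichever operand is non-empty.
        assert(byteMin("", by).empty());
        assert(byteMax("", by) == by);
    }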

@@ -241,11 +241,10 @@ struct ConsistencyCheckWorkload : TestWorkload
bool hasStorage = wait( self->checkForStorage(cx, configuration, self) );
bool hasExtraStores = wait( self->checkForExtraDataStores(cx, self) );
//SOMEDAY: enable this check when background reassignment of server types is supported
//Check that each machine is operating as its desired class
/*bool usingDesiredClasses = wait(self->checkUsingDesiredClasses(cx, self));
bool usingDesiredClasses = wait(self->checkUsingDesiredClasses(cx, self));
if(!usingDesiredClasses)
self->testFailure("Cluster has machine(s) not using requested classes");*/
self->testFailure("Cluster has machine(s) not using requested classes");
bool workerListCorrect = wait( self->checkWorkerList(cx, self) );
if(!workerListCorrect)
@@ -1176,32 +1175,53 @@ struct ConsistencyCheckWorkload : TestWorkload
return true;
}
static ProcessClass::Fitness getBestAvailableFitness(std::set<ProcessClass::ClassType>& availableClassTypes, ProcessClass::ClusterRole role) {
ProcessClass::Fitness bestAvailableFitness = ProcessClass::NeverAssign;
for (auto classType : availableClassTypes) {
bestAvailableFitness = std::min(bestAvailableFitness, ProcessClass(classType, ProcessClass::InvalidSource).machineClassFitness(role));
}
return bestAvailableFitness;
}
//Returns true if each cluster role is filled by a process whose class gives the best fitness available for that role
ACTOR Future<bool> checkUsingDesiredClasses(Database cx, ConsistencyCheckWorkload *self)
{
state vector<std::pair<WorkerInterface, ProcessClass>> workers = wait( getWorkers( self->dbInfo ) );
state vector<std::pair<WorkerInterface, ProcessClass>> workers = wait( getWorkers( self->dbInfo, GetWorkersRequest::NON_EXCLUDED_PROCESSES_ONLY ) );
state vector<StorageServerInterface> storageServers = wait( getStorageServers( cx ) );
auto& db = self->dbInfo->get();
//Check master server
if(!self->workerHasClass(workers, db.master.address(), ProcessClass::ResolutionClass, "Master"))
std::set<ProcessClass::ClassType> availableClassTypes;
std::map<NetworkAddress, ProcessClass> workerProcessMap;
for (auto worker : workers) {
availableClassTypes.insert(worker.second.classType());
workerProcessMap[worker.first.address()] = worker.second;
}
// Check master
ProcessClass::Fitness bestMasterFitness = getBestAvailableFitness(availableClassTypes, ProcessClass::Master);
if (!workerProcessMap.count(db.master.address()) || workerProcessMap[db.master.address()].machineClassFitness(ProcessClass::Master) != bestMasterFitness) {
return false;
}
//Check master proxies
for(int i = 0; i < db.client.proxies.size(); i++)
if(!self->workerHasClass(workers, db.client.proxies[i].address(), ProcessClass::TransactionClass, "MasterProxy"))
// Check master proxies
ProcessClass::Fitness bestMasterProxyFitness = getBestAvailableFitness(availableClassTypes, ProcessClass::Proxy);
for (auto masterProxy : db.client.proxies) {
if (!workerProcessMap.count(masterProxy.address()) || workerProcessMap[masterProxy.address()].machineClassFitness(ProcessClass::Proxy) != bestMasterProxyFitness) {
return false;
}
}
//Check storage servers
for(int i = 0; i < storageServers.size(); i++)
if(!self->workerHasClass(workers, storageServers[i].address(), ProcessClass::StorageClass, "StorageServer"))
// Check resolvers
ProcessClass::Fitness bestResolverFitness = getBestAvailableFitness(availableClassTypes, ProcessClass::Resolver);
for (auto resolver : db.resolvers) {
if (!workerProcessMap.count(resolver.address()) || workerProcessMap[resolver.address()].machineClassFitness(ProcessClass::Resolver) != bestResolverFitness) {
return false;
}
}
//Check tlogs
std::vector<TLogInterface> logs = db.logSystemConfig.allPresentLogs();
for(int i = 0; i < logs.size(); i++)
if(!self->workerHasClass(workers, logs[i].address(), ProcessClass::TransactionClass, "TLog"))
return false;
// TODO: Check Tlog and cluster controller
return true;
}
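The rewritten check no longer demands one fixed class per role; instead, for each role it computes the best (numerically lowest) fitness that any class present in the cluster could provide, and requires every recruited process to attain it. A simplified model of that logic — the fitness table below is invented for illustration; the real code derives fitness from ProcessClass::machineClassFitness:

    #include <algorithm>
    #include <cassert>
    #include <map>
    #include <set>
    #include <string>
    #include <utility>

    using ClassType = std::string;

    // Hypothetical fitness table; lower is a better fit, 100 stands in for NeverAssign.
    int fitness(const ClassType& c, const std::string& role) {
        static const std::map<std::pair<ClassType, std::string>, int> table = {
            {{"transaction", "proxy"}, 0}, {{"unset", "proxy"}, 2}, {{"storage", "proxy"}, 3}};
        auto it = table.find({c, role});
        return it == table.end() ? 100 : it->second;
    }

    int bestAvailableFitness(const std::set<ClassType>& available, const std::string& role) {
        int best = 100;
        for (const ClassType& c : available) best = std::min(best, fitness(c, role));
        return best;
    }

    int main() {
        std::set<ClassType> classes = {"unset", "transaction", "storage"};
        int best = bestAvailableFitness(classes, "proxy");
        assert(fitness("transaction", "proxy") == best); // proxy on a transaction-class process passes
        assert(fitness("storage", "proxy") != best);     // proxy on a storage-class process is flagged
    }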

@@ -107,7 +107,7 @@ struct PerformanceWorkload : TestWorkload {
loop {
choose {
when( vector<std::pair<WorkerInterface, ProcessClass>> w = wait( brokenPromiseToNever( self->dbInfo->get().clusterInterface.getWorkers.getReply( GetWorkersRequest( GetWorkersRequest::FLAG_TESTER_CLASS ) ) ) ) ) {
when( vector<std::pair<WorkerInterface, ProcessClass>> w = wait( brokenPromiseToNever( self->dbInfo->get().clusterInterface.getWorkers.getReply( GetWorkersRequest( GetWorkersRequest::TESTER_CLASS_ONLY | GetWorkersRequest::NON_EXCLUDED_PROCESSES_ONLY ) ) ) ) ) {
workers = w;
break;
}

@@ -138,7 +138,7 @@ struct RandomSelectorWorkload : TestWorkload {
try {
for(i = 0; i < g_random->randomInt(self->minOperationsPerTransaction,self->maxOperationsPerTransaction+1); i++) {
j = g_random->randomInt(0,13);
j = g_random->randomInt(0,15);
if( j < 3 ) {
myKeyA = format( "%010d", g_random->randomInt( 0, self->maxKeySpace+1 ) );
myValue = format("%d", g_random->randomInt( 0, 10000000 ) );
@@ -377,6 +377,54 @@ struct RandomSelectorWorkload : TestWorkload {
}
}
}
else if (j < 13) {
myKeyA = format("%010d", g_random->randomInt(0, self->maxKeySpace + 1));
myRandomIDKey = format("%010d", g_random->randomInt(0, 1000000000));
myValue = format("%d", g_random->randomInt(0, 10000000));
//TraceEvent("RYOWbytemin").detail("Key",myKeyA).detail("Value", myValue);
trRYOW.atomicOp(StringRef(clientID + "b/" + myKeyA), myValue, MutationRef::ByteMin);
loop{
try {
tr.set(StringRef(clientID + "z/" + myRandomIDKey), StringRef());
tr.atomicOp(StringRef(clientID + "d/" + myKeyA), myValue, MutationRef::ByteMin);
Void _ = wait(tr.commit());
break;
}
catch (Error& e) {
error = e;
Void _ = wait(tr.onError(e));
if (error.code() == error_code_commit_unknown_result) {
Optional<Value> thing = wait(tr.get(StringRef(clientID + "z/" + myRandomIDKey)));
if (thing.present()) break;
}
}
}
}
else if (j < 14) {
myKeyA = format("%010d", g_random->randomInt(0, self->maxKeySpace + 1));
myRandomIDKey = format("%010d", g_random->randomInt(0, 1000000000));
myValue = format("%d", g_random->randomInt(0, 10000000));
//TraceEvent("RYOWbytemax").detail("Key",myKeyA).detail("Value", myValue);
trRYOW.atomicOp(StringRef(clientID + "b/" + myKeyA), myValue, MutationRef::ByteMax);
loop{
try {
tr.set(StringRef(clientID + "z/" + myRandomIDKey), StringRef());
tr.atomicOp(StringRef(clientID + "d/" + myKeyA), myValue, MutationRef::ByteMax);
Void _ = wait(tr.commit());
break;
}
catch (Error& e) {
error = e;
Void _ = wait(tr.onError(e));
if (error.code() == error_code_commit_unknown_result) {
Optional<Value> thing = wait(tr.get(StringRef(clientID + "z/" + myRandomIDKey)));
if (thing.present()) break;
}
}
}
}
else {
int a = g_random->randomInt( 1, self->maxKeySpace+1 );
int b = g_random->randomInt( 1, self->maxKeySpace+1 );
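The two new ByteMin/ByteMax branches above reuse the workload's guard for retrying non-idempotent commits: each attempt also writes a unique marker key under the z/ prefix, and when a commit fails with commit_unknown_result the marker is read back to learn whether the mutation actually landed before retrying. A toy model of that decision (the in-memory store and the lost-acknowledgement flag are stand-ins, not FDB API):

    #include <cassert>
    #include <map>
    #include <string>

    // Committing the mutation together with a unique marker makes an
    // "unknown result" commit decidable on retry.
    struct ToyStore {
        std::map<std::string, std::string> kv;
        bool ackLost = false; // simulate a commit whose acknowledgement never arrived

        // Returns true if the outcome is known to the client; the write itself always applies here.
        bool commitWithMarker(const std::string& key, const std::string& value, const std::string& marker) {
            kv[marker] = "";
            kv[key] = value; // stands in for tr.atomicOp(..., MutationRef::ByteMin)
            return !ackLost;
        }
        bool markerPresent(const std::string& marker) const { return kv.count(marker) > 0; }
    };

    int main() {
        ToyStore store;
        store.ackLost = true;
        bool known = store.commitWithMarker("d/0000000001", "42", "z/0000000007");
        if (!known) {
            // The workload re-reads the marker on commit_unknown_result; since it is
            // present, the commit applied, so the retry loop breaks instead of
            // re-applying a non-idempotent mutation.
            assert(store.markerPresent("z/0000000007"));
        }
    }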

@@ -171,7 +171,7 @@ struct SerializabilityWorkload : TestWorkload {
Key key = getRandomKey();
Value value = getRandomValue();
MutationRef::Type opType;
switch( g_random->randomInt(0,6) ) {
switch( g_random->randomInt(0,8) ) {
case 0:
opType = MutationRef::AddValue;
break;
@@ -190,6 +190,12 @@ struct SerializabilityWorkload : TestWorkload {
case 5:
opType = MutationRef::Min;
break;
case 6:
opType = MutationRef::ByteMin;
break;
case 7:
opType = MutationRef::ByteMax;
break;
}
op.mutationOp = MutationRef(opType, key, value);
hasMutation = true;
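Widening g_random->randomInt(0,6) to (0,8) has to stay in lockstep with the number of switch cases. A table-driven sketch makes that coupling structural; the AND/OR/XOR/MAX entries are assumptions about the cases elided from this hunk, inferred from the 0..5 ordering:

    #include <cassert>
    #include <cstdlib>

    enum class MutationType { AddValue, And, Or, Xor, Max, Min, ByteMin, ByteMax };

    // Deriving the random bound from the table itself means newly added ops
    // (here BYTE_MIN/BYTE_MAX) cannot drift out of sync with the upper bound.
    static const MutationType kOps[] = { MutationType::AddValue, MutationType::And,
                                         MutationType::Or,       MutationType::Xor,
                                         MutationType::Max,      MutationType::Min,
                                         MutationType::ByteMin,  MutationType::ByteMax };

    MutationType randomOp() {
        return kOps[std::rand() % (sizeof(kOps) / sizeof(kOps[0]))];
    }

    int main() {
        for (int i = 0; i < 100; i++) {
            int op = static_cast<int>(randomOp());
            assert(op >= 0 && op <= 7);
        }
    }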
