Merge pull request #183 from cie/add-new-atomic-ops

Add new atomic ops
This commit is contained in:
Balachandar Namasivayam 2017-10-25 18:03:52 -07:00 committed by GitHub Enterprise
commit ef307427e9
88 changed files with 735 additions and 163 deletions

View File

@ -25,7 +25,7 @@ sys.path[:0]=[os.path.join(os.path.dirname(__file__), '..', '..', 'bindings', 'p
import util
FDB_API_VERSION = 500
FDB_API_VERSION = 510
LOGGING = {
'version' : 1,

View File

@ -133,7 +133,7 @@ def choose_api_version(selected_api_version, tester_min_version, tester_max_vers
elif random.random() < 0.7:
api_version = min_version
elif random.random() < 0.9:
api_version = random.choice([v for v in [13, 14, 16, 21, 22, 23, 100, 200, 300, 400, 410, 420, 430, 440, 450, 460, 500] if v >= min_version and v <= max_version])
api_version = random.choice([v for v in [13, 14, 16, 21, 22, 23, 100, 200, 300, 400, 410, 420, 430, 440, 450, 460, 500, 510] if v >= min_version and v <= max_version])
else:
api_version = random.randint(min_version, max_version)

View File

@ -20,7 +20,7 @@
import os
MAX_API_VERSION = 500
MAX_API_VERSION = 510
class Tester:
def __init__(self, name, cmd, max_int_bits=64, min_api_version=0, max_api_version=MAX_API_VERSION, threads_enabled=True):

View File

@ -164,7 +164,7 @@ class ApiTest(Test):
op_choices += write_conflicts
op_choices += resets
idempotent_atomic_ops = [u'BIT_AND', u'BIT_OR', u'MAX', u'MIN']
idempotent_atomic_ops = [u'BIT_AND', u'BIT_OR', u'MAX', u'MIN', u'BYTE_MIN', u'BYTE_MAX']
atomic_ops = idempotent_atomic_ops + [u'ADD', u'BIT_XOR']
if args.concurrency > 1:

View File

@ -32,7 +32,7 @@ fdb.api_version(FDB_API_VERSION)
# SOMEDAY: This should probably be broken up into smaller tests
class ScriptedTest(Test):
TEST_API_VERSION = 500
TEST_API_VERSION = 510
def __init__(self, subspace):
super(ScriptedTest, self).__init__(subspace, ScriptedTest.TEST_API_VERSION, ScriptedTest.TEST_API_VERSION)

View File

@ -18,7 +18,7 @@
* limitations under the License.
*/
#define FDB_API_VERSION 500
#define FDB_API_VERSION 510
#include "fdbclient/MultiVersionTransaction.h"
#include "foundationdb/fdb_c.h"

View File

@ -28,10 +28,10 @@
#endif
#if !defined(FDB_API_VERSION)
#error You must #define FDB_API_VERSION prior to including fdb_c.h (current version is 500)
#error You must #define FDB_API_VERSION prior to including fdb_c.h (current version is 510)
#elif FDB_API_VERSION < 13
#error API version no longer supported (upgrade to 13)
#elif FDB_API_VERSION > 500
#elif FDB_API_VERSION > 510
#error Requested API version requires a newer version of this header
#endif

View File

@ -602,7 +602,7 @@ void runTests(struct ResultSet *rs) {
int main(int argc, char **argv) {
srand(time(NULL));
struct ResultSet *rs = newResultSet();
checkError(fdb_select_api_version(500), "select API version", rs);
checkError(fdb_select_api_version(510), "select API version", rs);
printf("Running performance test at client version: %s\n", fdb_get_client_version());
valueStr = (uint8_t*)malloc((sizeof(uint8_t))*valueSize);

View File

@ -243,7 +243,7 @@ void runTests(struct ResultSet *rs) {
int main(int argc, char **argv) {
srand(time(NULL));
struct ResultSet *rs = newResultSet();
checkError(fdb_select_api_version(500), "select API version", rs);
checkError(fdb_select_api_version(510), "select API version", rs);
printf("Running RYW Benchmark test at client version: %s\n", fdb_get_client_version());
keys = generateKeys(numKeys, keySize);

View File

@ -27,7 +27,7 @@
#include <pthread.h>
#ifndef FDB_API_VERSION
#define FDB_API_VERSION 500
#define FDB_API_VERSION 510
#endif
#include <foundationdb/fdb_c.h>

View File

@ -33,7 +33,7 @@ THREAD_FUNC networkThread(void* fdb) {
}
ACTOR Future<Void> _test() {
API *fdb = FDB::API::selectAPIVersion(500);
API *fdb = FDB::API::selectAPIVersion(510);
auto c = fdb->createCluster( std::string() );
auto db = c->createDatabase();
state Reference<Transaction> tr( new Transaction(db) );
@ -77,7 +77,7 @@ ACTOR Future<Void> _test() {
}
void fdb_flow_test() {
API *fdb = FDB::API::selectAPIVersion(500);
API *fdb = FDB::API::selectAPIVersion(510);
fdb->setupNetwork();
startThread(networkThread, fdb);

View File

@ -23,7 +23,7 @@
#include <flow/flow.h>
#define FDB_API_VERSION 500
#define FDB_API_VERSION 510
#include <bindings/c/foundationdb/fdb_c.h>
#undef DLLEXPORT

View File

@ -1631,6 +1631,8 @@ void populateAtomicOpMap() {
optionInfo["MIN"] = FDBMutationType::FDB_MUTATION_TYPE_MIN;
optionInfo["SET_VERSIONSTAMPED_KEY"] = FDBMutationType::FDB_MUTATION_TYPE_SET_VERSIONSTAMPED_KEY;
optionInfo["SET_VERSIONSTAMPED_VALUE"] = FDBMutationType::FDB_MUTATION_TYPE_SET_VERSIONSTAMPED_VALUE;
optionInfo["BYTE_MIN"] = FDBMutationType::FDB_MUTATION_TYPE_BYTE_MIN;
optionInfo["BYTE_MAX"] = FDBMutationType::FDB_MUTATION_TYPE_BYTE_MAX;
}
void populateOpsThatCreateDirectories() {
@ -1685,7 +1687,7 @@ ACTOR void _test_versionstamp() {
try {
g_network = newNet2(NetworkAddress(), false);
API *fdb = FDB::API::selectAPIVersion(500);
API *fdb = FDB::API::selectAPIVersion(510);
fdb->setupNetwork();
startThread(networkThread, fdb);

View File

@ -8,7 +8,7 @@ This package requires:
- Go 1.1+ with CGO enabled
- FoundationDB C API 2.0.x, 3.0.x, or 4.x.y (part of the [FoundationDB clients package](https://files.foundationdb.org/fdb-c/))
Use of this package requires the selection of a FoundationDB API version at runtime. This package currently supports FoundationDB API versions 200-500.
Use of this package requires the selection of a FoundationDB API version at runtime. This package currently supports FoundationDB API versions 200-510.
To build this package, in the top level of this repository run:

View File

@ -23,7 +23,7 @@
package fdb
/*
#define FDB_API_VERSION 500
#define FDB_API_VERSION 510
#include <foundationdb/fdb_c.h>
*/
import "C"

View File

@ -23,7 +23,7 @@
package fdb
/*
#define FDB_API_VERSION 500
#define FDB_API_VERSION 510
#include <foundationdb/fdb_c.h>
*/
import "C"

View File

@ -23,7 +23,7 @@
package fdb
/*
#define FDB_API_VERSION 500
#define FDB_API_VERSION 510
#include <foundationdb/fdb_c.h>
*/
import "C"

View File

@ -23,7 +23,7 @@
package fdb
/*
#define FDB_API_VERSION 500
#define FDB_API_VERSION 510
#include <foundationdb/fdb_c.h>
#include <stdlib.h>
*/
@ -109,7 +109,7 @@ func (opt NetworkOptions) setOpt(code int, param []byte) error {
// library, an error will be returned. APIVersion must be called prior to any
// other functions in the fdb package.
//
// Currently, this package supports API versions 200 through 500.
// Currently, this package supports API versions 200 through 510.
//
// Warning: When using the multi-version client API, setting an API version that
// is not supported by a particular client library will prevent that client from
@ -117,7 +117,7 @@ func (opt NetworkOptions) setOpt(code int, param []byte) error {
// the API version of your application after upgrading your client until the
// cluster has also been upgraded.
func APIVersion(version int) error {
headerVersion := 500
headerVersion := 510
networkMutex.Lock()
defer networkMutex.Unlock()
@ -129,7 +129,7 @@ func APIVersion(version int) error {
return errAPIVersionAlreadySet
}
if version < 200 || version > 500 {
if version < 200 || version > 510 {
return errAPIVersionNotSupported
}

View File

@ -24,7 +24,7 @@ package fdb
/*
#cgo LDFLAGS: -lfdb_c -lm
#define FDB_API_VERSION 500
#define FDB_API_VERSION 510
#include <foundationdb/fdb_c.h>
#include <string.h>

View File

@ -23,7 +23,7 @@
package fdb
/*
#define FDB_API_VERSION 500
#define FDB_API_VERSION 510
#include <foundationdb/fdb_c.h>
*/
import "C"

View File

@ -23,7 +23,7 @@
package fdb
/*
#define FDB_API_VERSION 500
#define FDB_API_VERSION 510
#include <foundationdb/fdb_c.h>
*/
import "C"

View File

@ -21,7 +21,7 @@
#include <jni.h>
#include <string.h>
#define FDB_API_VERSION 500
#define FDB_API_VERSION 510
#include <foundationdb/fdb_c.h>

View File

@ -34,7 +34,7 @@ import java.util.concurrent.ThreadFactory;
* This call is required before using any other part of the API. The call allows
* an error to be thrown at this point to prevent client code from accessing a later library
* with incorrect assumptions from the current version. The API version documented here is version
* {@code 500}.<br><br>
* {@code 510}.<br><br>
* FoundationDB encapsulates multiple versions of its interface by requiring
* the client to explicitly specify the version of the API it uses. The purpose
* of this design is to allow you to upgrade the server, client libraries, or
@ -157,8 +157,8 @@ public class FDB {
}
if(version < 500)
throw new IllegalArgumentException("API version not supported (minimum 500)");
if(version > 500)
throw new IllegalArgumentException("API version not supported (maximum 500)");
if(version > 510)
throw new IllegalArgumentException("API version not supported (maximum 510)");
Select_API_version(version);
return singleton = new FDB(version);
}

View File

@ -25,7 +25,7 @@ and add it to your classpath.<br>
<h3>Getting started</h3>
To start using FoundationDB from Java, create an instance of the
{@link com.apple.foundationdb.FDB FoundationDB API interface} with the version of the
API that you want to use (this release of the FoundationDB Java API supports only version {@code 500}).
API that you want to use (this release of the FoundationDB Java API supports versions between {@code 500} and {@code 510}).
With this API object you can then open {@link com.apple.foundationdb.Cluster Cluster}s and
{@link com.apple.foundationdb.Database Database}s and start using
{@link com.apple.foundationdb.Transaction Transactions}s.
@ -41,7 +41,7 @@ import Tuple;
public class Example {
public static void main(String[] args) {
FDB fdb = FDB.selectAPIVersion(500);
FDB fdb = FDB.selectAPIVersion(510);
Database db = fdb.open();
// Run an operation on the database

View File

@ -27,7 +27,7 @@ import java.nio.charset.Charset;
import java.util.Random;
public abstract class AbstractTester {
public static final int API_VERSION = 500;
public static final int API_VERSION = 510;
protected static final int NUM_RUNS = 25;
protected static final Charset ASCII = Charset.forName("ASCII");

View File

@ -33,7 +33,7 @@ public class BlockingBenchmark {
private static final int PARALLEL = 100;
public static void main(String[] args) throws InterruptedException {
FDB fdb = FDB.selectAPIVersion(500);
FDB fdb = FDB.selectAPIVersion(510);
// The cluster file DOES NOT need to be valid, although it must exist.
// This is because the database is never really contacted in this test.

View File

@ -51,7 +51,7 @@ public class ConcurrentGetSetGet {
}
public static void main(String[] args) {
Database database = FDB.selectAPIVersion(500).open();
Database database = FDB.selectAPIVersion(510).open();
new ConcurrentGetSetGet().apply(database);
}

View File

@ -39,7 +39,7 @@ public class DirectoryTest {
public static void main(String[] args) throws Exception {
try {
Cluster c = FDB.selectAPIVersion(500).createCluster(CLUSTER_FILE);
Cluster c = FDB.selectAPIVersion(510).createCluster(CLUSTER_FILE);
Database db = c.openDatabase();
runTests(db);
} catch(Throwable t) {

View File

@ -30,7 +30,7 @@ import com.apple.foundationdb.tuple.Tuple;
public class Example {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FDB fdb = FDB.selectAPIVersion(500);
FDB fdb = FDB.selectAPIVersion(510);
Database db = fdb.open();
// Run an operation on the database

View File

@ -36,7 +36,7 @@ public class IterableTest {
public static void main(String[] args) throws InterruptedException {
final int reps = 1000;
try {
Cluster cluster = FDB.selectAPIVersion(500).createCluster(CLUSTER_FILE);
Cluster cluster = FDB.selectAPIVersion(510).createCluster(CLUSTER_FILE);
Database db = cluster.openDatabase();
runTests(reps, db);
} catch(Throwable t) {

View File

@ -34,7 +34,7 @@ import com.apple.foundationdb.tuple.ByteArrayUtil;
public class LocalityTests {
public static void main(String[] args) {
FDB fdb = FDB.selectAPIVersion(500);
FDB fdb = FDB.selectAPIVersion(510);
Database database = fdb.open(args[0]);
{
Transaction tr = database.createTransaction();

View File

@ -45,7 +45,7 @@ public class ParallelRandomScan {
private static final int PARALLELISM_STEP = 5;
public static void main(String[] args) throws InterruptedException {
FDB api = FDB.selectAPIVersion(500);
FDB api = FDB.selectAPIVersion(510);
Database database = api.open(args[0]);
for(int i = PARALLELISM_MIN; i <= PARALLELISM_MAX; i += PARALLELISM_STEP) {

View File

@ -34,7 +34,7 @@ import com.apple.foundationdb.Transaction;
import com.apple.foundationdb.async.AsyncIterable;
public class RangeTest {
private static final int API_VERSION = 500;
private static final int API_VERSION = 510;
public static void main(String[] args) {
System.out.println("About to use version " + API_VERSION);

View File

@ -34,7 +34,7 @@ public class SerialInsertion {
private static final int NODES = 1000000;
public static void main(String[] args) {
FDB api = FDB.selectAPIVersion(500);
FDB api = FDB.selectAPIVersion(510);
Database database = api.open("T:\\circus\\tags\\RebarCluster-bbc\\cluster_id.txt");
long start = System.currentTimeMillis();

View File

@ -39,7 +39,7 @@ public class SerialIteration {
private static final int THREAD_COUNT = 1;
public static void main(String[] args) throws InterruptedException {
FDB api = FDB.selectAPIVersion(500);
FDB api = FDB.selectAPIVersion(510);
Database database = api.open(args[0]);
for(int i = 1; i <= THREAD_COUNT; i++) {

View File

@ -36,7 +36,7 @@ public class SerialTest {
public static void main(String[] args) throws InterruptedException {
final int reps = 1000;
try {
Cluster c = FDB.selectAPIVersion(500).createCluster(CLUSTER_FILE);
Cluster c = FDB.selectAPIVersion(510).createCluster(CLUSTER_FILE);
Database db = c.openDatabase();
runTests(reps, db);

View File

@ -35,7 +35,7 @@ public class TupleTest {
public static void main(String[] args) throws InterruptedException {
final int reps = 1000;
try {
Cluster c = FDB.selectAPIVersion(500).createCluster(CLUSTER_FILE);
Cluster c = FDB.selectAPIVersion(510).createCluster(CLUSTER_FILE);
Database db = c.openDatabase();
runTests(reps, db);
} catch(Throwable t) {

View File

@ -34,7 +34,7 @@ import com.apple.foundationdb.Transaction;
public class WatchTest {
public static void main(String[] args) {
FDB fdb = FDB.selectAPIVersion(500);
FDB fdb = FDB.selectAPIVersion(510);
Database database = fdb.open(args[0]);
database.options().setLocationCacheSize(42);
Transaction tr = database.createTransaction();

View File

@ -34,7 +34,7 @@ import java.util.concurrent.ThreadFactory;
* This call is required before using any other part of the API. The call allows
* an error to be thrown at this point to prevent client code from accessing a later library
* with incorrect assumptions from the current version. The API version documented here is version
* {@code 500}.<br><br>
* {@code 510}.<br><br>
* FoundationDB encapsulates multiple versions of its interface by requiring
* the client to explicitly specify the version of the API it uses. The purpose
* of this design is to allow you to upgrade the server, client libraries, or
@ -157,8 +157,8 @@ public class FDB {
}
if(version < 500)
throw new IllegalArgumentException("API version not supported (minimum 500)");
if(version > 500)
throw new IllegalArgumentException("API version not supported (maximum 500)");
if(version > 510)
throw new IllegalArgumentException("API version not supported (maximum 510)");
Select_API_version(version);
return singleton = new FDB(version);
}

View File

@ -25,7 +25,7 @@ and add it to your classpath.<br>
<h3>Getting started</h3>
To start using FoundationDB from Java, create an instance of the
{@link com.apple.foundationdb.FDB FoundationDB API interface} with the version of the
API that you want to use (this release of the FoundationDB Java API supports only version {@code 500}).
API that you want to use (this release of the FoundationDB Java API supports versions between {@code 500} and {@code 510}).
With this API object you can then open {@link com.apple.foundationdb.Cluster}s and
{@link com.apple.foundationdb.Database}s and start using
{@link com.apple.foundationdb.Transaction}s. Here we give an example. The example relies on a
@ -41,7 +41,7 @@ import Tuple;
public class Example {
public static void main(String[] args) {
FDB fdb = FDB.selectAPIVersion(500);
FDB fdb = FDB.selectAPIVersion(510);
Database db = fdb.open();
// Run an operation on the database

View File

@ -37,7 +37,7 @@ public class AsListTest {
* a bug made the the addition of the clear into the result returning 0 items.
*/
public static void main(String[] args) {
FDB fdb = FDB.selectAPIVersion(500);
FDB fdb = FDB.selectAPIVersion(510);
Database database = fdb.open("T:\\circus\\tags\\RebarCluster-bbc\\cluster_id.txt");
database.options().setLocationCacheSize(42);
Transaction tr = database.createTransaction();

View File

@ -33,7 +33,7 @@ public class BlockingBenchmark {
private static final int PARALLEL = 100;
public static void main(String[] args) throws InterruptedException {
FDB fdb = FDB.selectAPIVersion(500);
FDB fdb = FDB.selectAPIVersion(510);
// The cluster file DOES NOT need to be valid, although it must exist.
// This is because the database is never really contacted in this test.

View File

@ -52,7 +52,7 @@ public class ConcurrentGetSetGet {
}
public static void main(String[] args) {
Database database = FDB.selectAPIVersion(500).open();
Database database = FDB.selectAPIVersion(510).open();
new ConcurrentGetSetGet().apply(database);
}

View File

@ -38,7 +38,7 @@ public class DirectoryTest {
public static void main(String[] args) throws Exception {
try {
Cluster c = FDB.selectAPIVersion(500).createCluster(CLUSTER_FILE);
Cluster c = FDB.selectAPIVersion(510).createCluster(CLUSTER_FILE);
Database db = c.openDatabase();
runTests(db);
} catch(Throwable t) {

View File

@ -29,7 +29,7 @@ import com.apple.foundationdb.tuple.Tuple;
public class Example {
public static void main(String[] args) {
FDB fdb = FDB.selectAPIVersion(500);
FDB fdb = FDB.selectAPIVersion(510);
Database db = fdb.open();
// Run an operation on the database

View File

@ -36,7 +36,7 @@ public class IterableTest {
public static void main(String[] args) throws InterruptedException {
final int reps = 1000;
try {
Cluster cluster = FDB.selectAPIVersion(500).createCluster(CLUSTER_FILE);
Cluster cluster = FDB.selectAPIVersion(510).createCluster(CLUSTER_FILE);
Database db = cluster.openDatabase();
runTests(reps, db);
} catch(Throwable t) {

View File

@ -34,7 +34,7 @@ import com.apple.foundationdb.tuple.ByteArrayUtil;
public class LocalityTests {
public static void main(String[] args) {
FDB fdb = FDB.selectAPIVersion(500);
FDB fdb = FDB.selectAPIVersion(510);
Database database = fdb.open(args[0]);
{

View File

@ -44,7 +44,7 @@ public class ParallelRandomScan {
private static final int PARALLELISM_STEP = 5;
public static void main(String[] args) throws InterruptedException {
FDB api = FDB.selectAPIVersion(500);
FDB api = FDB.selectAPIVersion(510);
Database database = api.open(args[0]);
for(int i = PARALLELISM_MIN; i <= PARALLELISM_MAX; i += PARALLELISM_STEP) {

View File

@ -33,7 +33,7 @@ import com.apple.foundationdb.async.AsyncIterable;
import com.apple.foundationdb.async.Function;
public class RangeTest {
private static final int API_VERSION = 500;
private static final int API_VERSION = 510;
public static void main(String[] args) throws Exception {
System.out.println("About to use version " + API_VERSION);

View File

@ -34,7 +34,7 @@ public class SerialInsertion {
private static final int NODES = 1000000;
public static void main(String[] args) {
FDB api = FDB.selectAPIVersion(500);
FDB api = FDB.selectAPIVersion(510);
Database database = api.open("T:\\circus\\tags\\RebarCluster-bbc\\cluster_id.txt");
long start = System.currentTimeMillis();
@ -94,4 +94,4 @@ public class SerialInsertion {
}
}
}
}

View File

@ -39,7 +39,7 @@ public class SerialIteration {
private static final int THREAD_COUNT = 1;
public static void main(String[] args) throws InterruptedException {
FDB api = FDB.selectAPIVersion(500);
FDB api = FDB.selectAPIVersion(510);
Database database = api.open(args[0]);
for(int i = 1; i <= THREAD_COUNT; i++) {

View File

@ -36,7 +36,7 @@ public class SerialTest {
public static void main(String[] args) throws InterruptedException {
final int reps = 1000;
try {
Cluster c = FDB.selectAPIVersion(500).createCluster(CLUSTER_FILE);
Cluster c = FDB.selectAPIVersion(510).createCluster(CLUSTER_FILE);
Database db = c.openDatabase();
runTests(reps, db);

View File

@ -31,7 +31,7 @@ public class TestApp {
public static void main(String[] args) throws Exception {
try {
Cluster cluster = FDB.selectAPIVersion(500).createCluster("C:\\Users\\Ben\\workspace\\fdb\\fdb.cluster");
Cluster cluster = FDB.selectAPIVersion(510).createCluster("C:\\Users\\Ben\\workspace\\fdb\\fdb.cluster");
System.out.println("I now have the cluster");
Database db = cluster.openDatabase();

View File

@ -34,7 +34,7 @@ public class TupleTest {
public static void main(String[] args) throws InterruptedException {
final int reps = 1000;
try {
Cluster c = FDB.selectAPIVersion(500).createCluster(CLUSTER_FILE);
Cluster c = FDB.selectAPIVersion(510).createCluster(CLUSTER_FILE);
Database db = c.openDatabase();
runTests(reps, db);
} catch(Throwable t) {

View File

@ -43,7 +43,7 @@ public class VersionstampExample {
}
public static void main(String[] args) {
FDB fdb = FDB.selectAPIVersion(500);
FDB fdb = FDB.selectAPIVersion(510);
Database db = fdb.open();
Transaction tr = db.createTransaction();
tr.mutate(MutationType.SET_VERSIONSTAMPED_VALUE, "foo".getBytes(), "blahblahbl".getBytes());

View File

@ -34,7 +34,7 @@ import com.apple.foundationdb.async.Future;
public class WatchTest {
public static void main(String[] args) {
FDB fdb = FDB.selectAPIVersion(500);
FDB fdb = FDB.selectAPIVersion(510);
Database database = fdb.open(args[0]);
database.options().setLocationCacheSize(42);
Transaction tr = database.createTransaction();

View File

@ -43,8 +43,8 @@ module.exports = {
throw new Error('Cannot select multiple different FDB API versions');
if(version < 500)
throw new RangeError('FDB API versions before 500 are not supported');
if(version > 500)
throw new RangeError('Latest known FDB API version is 500');
if(version > 510)
throw new RangeError('Latest known FDB API version is 510');
if(!selectedApiVersion.value) {
fdb.apiVersion(version);

View File

@ -22,6 +22,6 @@
#ifndef FDB_NODE_VERSION_H
#define FDB_NODE_VERSION_H
#define FDB_API_VERSION 500
#define FDB_API_VERSION 510
#endif

View File

@ -18,7 +18,7 @@
* limitations under the License.
*/
var fdb = require('../lib/fdb.js').apiVersion(500);
var fdb = require('../lib/fdb.js').apiVersion(510);
var fdbModule = require('../lib/fdbModule.js');
console.log(fdb.tuple.pack([-Math.pow(2,53)]));

View File

@ -36,7 +36,7 @@ def _add_symbols(module, symbols):
globals()[symbol] = getattr(module, symbol)
def api_version(ver):
header_version = 500
header_version = 510
if '_version' in globals():
if globals()['_version'] != ver:

View File

@ -26,7 +26,7 @@
module FDB
@@chosen_version = -1
def self.api_version(version)
header_version = 500
header_version = 510
if @@chosen_version >= 0
if@@chosen_version != version
raise "FDB API already loaded at version #{@@chosen_version}."

View File

@ -24,7 +24,8 @@
#include "CommitTransaction.h"
static ValueRef doLittleEndianAdd(const ValueRef& existingValue, const ValueRef& otherOperand, Arena& ar) {
static ValueRef doLittleEndianAdd(const Optional<ValueRef>& existingValueOptional, const ValueRef& otherOperand, Arena& ar) {
const ValueRef& existingValue = existingValueOptional.present() ? existingValueOptional.get() : StringRef();
if(!existingValue.size()) return otherOperand;
if(!otherOperand.size()) return otherOperand;
@ -46,7 +47,8 @@ static ValueRef doLittleEndianAdd(const ValueRef& existingValue, const ValueRef&
return StringRef(buf, i);
}
static ValueRef doAnd(const ValueRef& existingValue, const ValueRef& otherOperand, Arena& ar) {
static ValueRef doAnd(const Optional<ValueRef>& existingValueOptional, const ValueRef& otherOperand, Arena& ar) {
const ValueRef& existingValue = existingValueOptional.present() ? existingValueOptional.get() : StringRef();
if(!otherOperand.size()) return otherOperand;
uint8_t* buf = new (ar) uint8_t [otherOperand.size()];
@ -60,7 +62,15 @@ static ValueRef doAnd(const ValueRef& existingValue, const ValueRef& otherOperan
return StringRef(buf, i);
}
static ValueRef doOr(const ValueRef& existingValue, const ValueRef& otherOperand, Arena& ar) {
static ValueRef doAndV2(const Optional<ValueRef>& existingValueOptional, const ValueRef& otherOperand, Arena& ar) {
if (!existingValueOptional.present())
return otherOperand;
return doAnd(existingValueOptional, otherOperand, ar);
}
static ValueRef doOr(const Optional<ValueRef>& existingValueOptional, const ValueRef& otherOperand, Arena& ar) {
const ValueRef& existingValue = existingValueOptional.present() ? existingValueOptional.get() : StringRef();
if(!existingValue.size()) return otherOperand;
if(!otherOperand.size()) return otherOperand;
@ -75,7 +85,8 @@ static ValueRef doOr(const ValueRef& existingValue, const ValueRef& otherOperand
return StringRef(buf, i);
}
static ValueRef doXor(const ValueRef& existingValue, const ValueRef& otherOperand, Arena& ar) {
static ValueRef doXor(const Optional<ValueRef>& existingValueOptional, const ValueRef& otherOperand, Arena& ar) {
const ValueRef& existingValue = existingValueOptional.present() ? existingValueOptional.get() : StringRef();
if(!existingValue.size()) return otherOperand;
if(!otherOperand.size()) return otherOperand;
@ -91,7 +102,8 @@ static ValueRef doXor(const ValueRef& existingValue, const ValueRef& otherOperan
return StringRef(buf, i);
}
static ValueRef doAppendIfFits(const ValueRef& existingValue, const ValueRef& otherOperand, Arena& ar) {
static ValueRef doAppendIfFits(const Optional<ValueRef>& existingValueOptional, const ValueRef& otherOperand, Arena& ar) {
const ValueRef& existingValue = existingValueOptional.present() ? existingValueOptional.get() : StringRef();
if(!existingValue.size()) return otherOperand;
if(!otherOperand.size()) return existingValue;
if(existingValue.size() + otherOperand.size() > CLIENT_KNOBS->VALUE_SIZE_LIMIT) {
@ -111,7 +123,8 @@ static ValueRef doAppendIfFits(const ValueRef& existingValue, const ValueRef& ot
return StringRef(buf, i+j);
}
static ValueRef doMax(const ValueRef& existingValue, const ValueRef& otherOperand, Arena& ar) {
static ValueRef doMax(const Optional<ValueRef>& existingValueOptional, const ValueRef& otherOperand, Arena& ar) {
const ValueRef& existingValue = existingValueOptional.present() ? existingValueOptional.get() : StringRef();
if (!existingValue.size()) return otherOperand;
if (!otherOperand.size()) return otherOperand;
@ -142,9 +155,20 @@ static ValueRef doMax(const ValueRef& existingValue, const ValueRef& otherOperan
return otherOperand;
}
static ValueRef doMin(const ValueRef& existingValue, const ValueRef& otherOperand, Arena& ar) {
static ValueRef doByteMax(const Optional<ValueRef>& existingValueOptional, const ValueRef& otherOperand, Arena& ar) {
if (!existingValueOptional.present()) return otherOperand;
const ValueRef& existingValue = existingValueOptional.get();
if (existingValue > otherOperand)
return existingValue;
return otherOperand;
}
static ValueRef doMin(const Optional<ValueRef>& existingValueOptional, const ValueRef& otherOperand, Arena& ar) {
if (!otherOperand.size()) return otherOperand;
const ValueRef& existingValue = existingValueOptional.present() ? existingValueOptional.get() : StringRef();
int i,j;
for (i = otherOperand.size() - 1; i >= existingValue.size(); i--) {
@ -179,34 +203,22 @@ static ValueRef doMin(const ValueRef& existingValue, const ValueRef& otherOperan
return otherOperand;
}
static ValueRef doAtomicOp(const ValueRef& existingValue, const ValueRef& otherOperand, MutationRef::Type mutationType, Arena& ar) {
switch(mutationType) {
case MutationRef::AddValue:
return doLittleEndianAdd(existingValue, otherOperand, ar);
break;
case MutationRef::AppendIfFits:
return doAppendIfFits(existingValue, otherOperand, ar);
break;
case MutationRef::And:
return doAnd(existingValue, otherOperand, ar);
break;
case MutationRef::Or:
return doOr(existingValue, otherOperand, ar);
break;
case MutationRef::Xor:
return doXor(existingValue, otherOperand, ar);
break;
case MutationRef::Max:
return doMax(existingValue, otherOperand, ar);
break;
case MutationRef::Min:
return doMin(existingValue, otherOperand, ar);
break;
default:
throw operation_failed();
}
static ValueRef doMinV2(const Optional<ValueRef>& existingValueOptional, const ValueRef& otherOperand, Arena& ar) {
if (!existingValueOptional.present())
return otherOperand;
return doMin(existingValueOptional, otherOperand, ar);
}
static ValueRef doByteMin(const Optional<ValueRef>& existingValueOptional, const ValueRef& otherOperand, Arena& ar) {
if (!existingValueOptional.present()) return otherOperand;
const ValueRef& existingValue = existingValueOptional.get();
if (existingValue < otherOperand)
return existingValue;
return otherOperand;
}
/*
* Returns the range corresponding to the specified versionstamp key.

View File

@ -26,8 +26,8 @@
struct MutationRef {
static const int OVERHEAD_BYTES = 12; //12 is the size of Header in MutationList entries
enum Type : uint8_t { SetValue=0, ClearRange, AddValue, DebugKeyRange, DebugKey, NoOp, And, Or, Xor, AppendIfFits, AvailableForReuse, Reserved_For_LogProtocolMessage /* See fdbserver/LogProtocolMessage.h */, Max, Min, SetVersionstampedKey, SetVersionstampedValue, MAX_ATOMIC_OP };
const char * typeString[MAX_ATOMIC_OP] = { "SetValue", "ClearRange", "AddValue", "DebugKeyRange", "DebugKey", "NoOp", "And", "Or", "Xor", "AppendIfFits", "AvailableForReuse", "Reserved_For_LogProtocolMessage", "Max", "Min", "SetVersionstampedKey", "SetVersionstampedValue" };
enum Type : uint8_t { SetValue=0, ClearRange, AddValue, DebugKeyRange, DebugKey, NoOp, And, Or, Xor, AppendIfFits, AvailableForReuse, Reserved_For_LogProtocolMessage /* See fdbserver/LogProtocolMessage.h */, Max, Min, SetVersionstampedKey, SetVersionstampedValue, ByteMin, ByteMax, MinV2, AndV2, MAX_ATOMIC_OP };
const char * typeString[MAX_ATOMIC_OP] = { "SetValue", "ClearRange", "AddValue", "DebugKeyRange", "DebugKey", "NoOp", "And", "Or", "Xor", "AppendIfFits", "AvailableForReuse", "Reserved_For_LogProtocolMessage", "Max", "Min", "SetVersionstampedKey", "SetVersionstampedValue", "ByteMin", "ByteMax", "MinV2", "AndV2" };
// This is stored this way for serialization purposes.
uint8_t type;
StringRef param1, param2;
@ -54,9 +54,9 @@ struct MutationRef {
// These masks define which mutation types have particular properties (they are used to implement isSingleKeyMutation() etc)
enum {
ATOMIC_MASK = (1 << AddValue) | (1 << And) | (1 << Or) | (1 << Xor) | (1 << AppendIfFits) | (1 << Max) | (1 << Min) | (1 << SetVersionstampedKey) | (1 << SetVersionstampedValue),
ATOMIC_MASK = (1 << AddValue) | (1 << And) | (1 << Or) | (1 << Xor) | (1 << AppendIfFits) | (1 << Max) | (1 << Min) | (1 << SetVersionstampedKey) | (1 << SetVersionstampedValue) | (1 << ByteMin) | (1 << ByteMax) | (1 << MinV2) | (1 << AndV2),
SINGLE_KEY_MASK = ATOMIC_MASK | (1<<SetValue),
NON_ASSOCIATIVE_MASK = (1 << AddValue) | (1 << Or) | (1 << Xor) | (1 << Max) | (1 << Min) | (1 << SetVersionstampedKey) | (1 << SetVersionstampedValue)
NON_ASSOCIATIVE_MASK = (1 << AddValue) | (1 << Or) | (1 << Xor) | (1 << Max) | (1 << Min) | (1 << SetVersionstampedKey) | (1 << SetVersionstampedValue) | (1 << MinV2)
};
};

View File

@ -2206,6 +2206,12 @@ void Transaction::atomicOp(const KeyRef& key, const ValueRef& operand, MutationR
if(operand.size() > CLIENT_KNOBS->VALUE_SIZE_LIMIT)
throw value_too_large();
if (apiVersionAtLeast(510)) {
if (operationType == MutationRef::Min)
operationType = MutationRef::MinV2;
else if (operationType == MutationRef::And)
operationType = MutationRef::AndV2;
}
auto &req = tr;
auto &t = req.transaction;
auto r = singleKeyRange( key, req.arena );

View File

@ -127,6 +127,9 @@ public:
void checkDeferredError() { if (deferred_error.code() != invalid_error_code) throw deferred_error; }
private: friend class ThreadSafeCluster;
friend class AtomicOpsApiCorrectnessWorkload; // This is just for testing purposes. It needs to change apiVersion
friend class AtomicOpsWorkload; // This is just for testing purposes. It needs to change apiVersion
Cluster( Reference<ClusterConnectionFile> connFile, int apiVersion = API_VERSION_LATEST );
Reference<AsyncVar<Optional<struct ClusterInterface>>> clusterInterface;

View File

@ -55,9 +55,9 @@ KeyValueRef const& RYWIterator::kv( Arena& arena ) {
if (writes.is_unmodified_range())
return cache.kv( arena );
else if (writes.is_independent() || cache.is_empty_range())
return temp = KeyValueRef( writes.beginKey().assertRef(), WriteMap::coalesceUnder( writes.op(), Optional<ValueRef>(), arena ).value );
return temp = KeyValueRef( writes.beginKey().assertRef(), WriteMap::coalesceUnder( writes.op(), Optional<ValueRef>(), arena ).value.get() );
else
return temp = KeyValueRef( writes.beginKey().assertRef(), WriteMap::coalesceUnder( writes.op(), cache.kv(arena).value, arena ).value );
return temp = KeyValueRef( writes.beginKey().assertRef(), WriteMap::coalesceUnder( writes.op(), cache.kv(arena).value, arena ).value.get() );
}
RYWIterator& RYWIterator::operator++() {
@ -651,11 +651,11 @@ TEST_CASE("fdbclient/WriteMap/random") {
TraceEvent("RWMT_checkOperation")
.detail("wm_key", printable(it.beginKey().toStandaloneStringRef()))
.detail("wm_size", it.op().size())
.detail("wm_value", it.op().top().value.size())
.detail("wm_value", it.op().top().value.present() ? std::to_string(it.op().top().value.get().size()) : "Not Found")
.detail("wm_type", (int)it.op().top().type)
.detail("sm_key", printable(setIter->first))
.detail("sm_size", setIter->second.size())
.detail("sm_value", setIter->second.top().value.size())
.detail("sm_value", setIter->second.top().value.present() ? std::to_string(setIter->second.top().value.get().size()) : "Not Found")
.detail("sm_type", (int)setIter->second.top().type);
ASSERT(it.beginKey() == setIter->first && it.op() == setIter->second);
++setIter;

View File

@ -1406,7 +1406,7 @@ void ReadYourWritesTransaction::writeRangeToNativeTransaction( KeyRangeRef const
for( int i = 0; i < op.size(); ++i) {
switch(op[i].type) {
case MutationRef::SetValue:
tr.set( it.beginKey().assertRef(), op[i].value, false );
tr.set( it.beginKey().assertRef(), op[i].value.get(), false );
break;
case MutationRef::AddValue:
case MutationRef::AppendIfFits:
@ -1417,7 +1417,11 @@ void ReadYourWritesTransaction::writeRangeToNativeTransaction( KeyRangeRef const
case MutationRef::Min:
case MutationRef::SetVersionstampedKey:
case MutationRef::SetVersionstampedValue:
tr.atomicOp( it.beginKey().assertRef(), op[i].value, op[i].type, false );
case MutationRef::ByteMin:
case MutationRef::ByteMax:
case MutationRef::MinV2:
case MutationRef::AndV2:
tr.atomicOp( it.beginKey().assertRef(), op[i].value.get(), op[i].type, false );
break;
default:
break;
@ -1488,6 +1492,13 @@ void ReadYourWritesTransaction::atomicOp( const KeyRef& key, const ValueRef& ope
if (operationType == MutationRef::SetVersionstampedValue && operand.size() < 10)
throw client_invalid_operation();
if (tr.apiVersionAtLeast(510)) {
if (operationType == MutationRef::Min)
operationType = MutationRef::MinV2;
else if (operationType == MutationRef::And)
operationType = MutationRef::AndV2;
}
if(options.readYourWritesDisabled) {
return tr.atomicOp(key, operand, (MutationRef::Type) operationType, addWriteConflict);
}

View File

@ -64,4 +64,12 @@ runRYWTransactionFailIfLocked(Database cx, Function func) {
}
}
ACTOR template < class Function >
Future<decltype(fake<Function>()(Reference<ReadYourWritesTransaction>()).getValue())>
runRYWTransactionNoRetry(Database cx, Function func) {
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
state decltype(fake<Function>()(Reference<ReadYourWritesTransaction>()).getValue()) result = wait(func(tr));
Void _ = wait(tr->commit());
return result;
}
#endif

View File

@ -28,11 +28,11 @@
#include "Atomic.h"
struct RYWMutation {
ValueRef value;
Optional<ValueRef> value;
enum MutationRef::Type type;
RYWMutation( ValueRef const& entry, MutationRef::Type type ) : value(entry), type(type) {}
RYWMutation() : value(ValueRef()), type(MutationRef::NoOp) {}
RYWMutation(Optional<ValueRef> const& entry, MutationRef::Type type ) : value(entry), type(type) {}
RYWMutation() : value(), type(MutationRef::NoOp) {}
bool operator == (const RYWMutation& r) const {
return value == r.value && type == r.type;
@ -148,7 +148,7 @@ public:
if (it.entry().key != key) {
if( it.is_cleared_range() && is_dependent ) {
it.tree.clear();
OperationStack op( RYWMutation( StringRef(), MutationRef::SetValue ) );
OperationStack op( RYWMutation( Optional<StringRef>(), MutationRef::SetValue ) );
coalesceOver(op, RYWMutation(param, operation), *arena);
PTreeImpl::insert( writes, ver, WriteMapEntry( key, std::move(op), true, following_conflict, is_conflict, following_unreadable, is_unreadable ) );
} else {
@ -165,7 +165,7 @@ public:
e.is_conflict = is_conflict;
e.is_unreadable = is_unreadable;
if (e.stack.size() == 0 && it.is_cleared_range() && is_dependent) {
e.stack.push(RYWMutation(StringRef(), MutationRef::SetValue));
e.stack.push(RYWMutation(Optional<StringRef>(), MutationRef::SetValue));
coalesceOver(e.stack, RYWMutation(param, operation), *arena);
} else if( !is_unreadable && e.stack.size() > 0 )
coalesceOver( e.stack, RYWMutation( param, operation ), *arena );
@ -381,14 +381,16 @@ public:
bool empty() const { return writeMapEmpty; }
static RYWMutation coalesce(RYWMutation existingEntry, RYWMutation newEntry, Arena& arena) {
ASSERT(newEntry.value.present());
if (newEntry.type == MutationRef::SetValue)
return newEntry;
else if (newEntry.type == MutationRef::AddValue) {
switch(existingEntry.type) {
case MutationRef::SetValue:
return RYWMutation(doLittleEndianAdd(existingEntry.value, newEntry.value, arena), MutationRef::SetValue);
return RYWMutation(doLittleEndianAdd(existingEntry.value, newEntry.value.get(), arena), MutationRef::SetValue);
case MutationRef::AddValue:
return RYWMutation(doLittleEndianAdd(existingEntry.value, newEntry.value, arena), MutationRef::AddValue);
return RYWMutation(doLittleEndianAdd(existingEntry.value, newEntry.value.get(), arena), MutationRef::AddValue);
default:
throw operation_failed();
}
@ -396,9 +398,9 @@ public:
else if (newEntry.type == MutationRef::AppendIfFits) {
switch(existingEntry.type) {
case MutationRef::SetValue:
return RYWMutation(doAppendIfFits(existingEntry.value, newEntry.value, arena), MutationRef::SetValue);
return RYWMutation(doAppendIfFits(existingEntry.value, newEntry.value.get(), arena), MutationRef::SetValue);
case MutationRef::AppendIfFits:
return RYWMutation(doAppendIfFits(existingEntry.value, newEntry.value, arena), MutationRef::AppendIfFits);
return RYWMutation(doAppendIfFits(existingEntry.value, newEntry.value.get(), arena), MutationRef::AppendIfFits);
default:
throw operation_failed();
}
@ -406,9 +408,9 @@ public:
else if (newEntry.type == MutationRef::And) {
switch(existingEntry.type) {
case MutationRef::SetValue:
return RYWMutation(doAnd(existingEntry.value, newEntry.value, arena), MutationRef::SetValue);
return RYWMutation(doAnd(existingEntry.value, newEntry.value.get(), arena), MutationRef::SetValue);
case MutationRef::And:
return RYWMutation(doAnd(existingEntry.value, newEntry.value, arena), MutationRef::And);
return RYWMutation(doAnd(existingEntry.value, newEntry.value.get(), arena), MutationRef::And);
default:
throw operation_failed();
}
@ -416,9 +418,9 @@ public:
else if (newEntry.type == MutationRef::Or) {
switch(existingEntry.type) {
case MutationRef::SetValue:
return RYWMutation(doOr(existingEntry.value, newEntry.value, arena), MutationRef::SetValue);
return RYWMutation(doOr(existingEntry.value, newEntry.value.get(), arena), MutationRef::SetValue);
case MutationRef::Or:
return RYWMutation(doOr(existingEntry.value, newEntry.value, arena), MutationRef::Or);
return RYWMutation(doOr(existingEntry.value, newEntry.value.get(), arena), MutationRef::Or);
default:
throw operation_failed();
}
@ -426,9 +428,9 @@ public:
else if (newEntry.type == MutationRef::Xor) {
switch(existingEntry.type) {
case MutationRef::SetValue:
return RYWMutation(doXor(existingEntry.value, newEntry.value, arena), MutationRef::SetValue);
return RYWMutation(doXor(existingEntry.value, newEntry.value.get(), arena), MutationRef::SetValue);
case MutationRef::Xor:
return RYWMutation(doXor(existingEntry.value, newEntry.value, arena), MutationRef::Xor);
return RYWMutation(doXor(existingEntry.value, newEntry.value.get(), arena), MutationRef::Xor);
default:
throw operation_failed();
}
@ -436,9 +438,9 @@ public:
else if (newEntry.type == MutationRef::Max) {
switch (existingEntry.type) {
case MutationRef::SetValue:
return RYWMutation(doMax(existingEntry.value, newEntry.value, arena), MutationRef::SetValue);
return RYWMutation(doMax(existingEntry.value, newEntry.value.get(), arena), MutationRef::SetValue);
case MutationRef::Max:
return RYWMutation(doMax(existingEntry.value, newEntry.value, arena), MutationRef::Max);
return RYWMutation(doMax(existingEntry.value, newEntry.value.get(), arena), MutationRef::Max);
default:
throw operation_failed();
}
@ -446,20 +448,60 @@ public:
else if (newEntry.type == MutationRef::Min) {
switch (existingEntry.type) {
case MutationRef::SetValue:
return RYWMutation(doMin(existingEntry.value, newEntry.value, arena), MutationRef::SetValue);
return RYWMutation(doMin(existingEntry.value, newEntry.value.get(), arena), MutationRef::SetValue);
case MutationRef::Min:
return RYWMutation(doMin(existingEntry.value, newEntry.value, arena), MutationRef::Min);
return RYWMutation(doMin(existingEntry.value, newEntry.value.get(), arena), MutationRef::Min);
default:
throw operation_failed();
}
}
else if (newEntry.type == MutationRef::ByteMin) {
switch (existingEntry.type) {
case MutationRef::SetValue:
return RYWMutation(doByteMin(existingEntry.value, newEntry.value.get(), arena), MutationRef::SetValue);
case MutationRef::ByteMin:
return RYWMutation(doByteMin(existingEntry.value, newEntry.value.get(), arena), MutationRef::ByteMin);
default:
throw operation_failed();
}
}
else if (newEntry.type == MutationRef::ByteMax) {
switch (existingEntry.type) {
case MutationRef::SetValue:
return RYWMutation(doByteMax(existingEntry.value, newEntry.value.get(), arena), MutationRef::SetValue);
case MutationRef::ByteMax:
return RYWMutation(doByteMax(existingEntry.value, newEntry.value.get(), arena), MutationRef::ByteMax);
default:
throw operation_failed();
}
}
else if (newEntry.type == MutationRef::MinV2) {
switch (existingEntry.type) {
case MutationRef::SetValue:
return RYWMutation(doMinV2(existingEntry.value, newEntry.value.get(), arena), MutationRef::SetValue);
case MutationRef::MinV2:
return RYWMutation(doMinV2(existingEntry.value, newEntry.value.get(), arena), MutationRef::MinV2);
default:
throw operation_failed();
}
}
else if (newEntry.type == MutationRef::AndV2) {
switch (existingEntry.type) {
case MutationRef::SetValue:
return RYWMutation(doAndV2(existingEntry.value, newEntry.value.get(), arena), MutationRef::SetValue);
case MutationRef::AndV2:
return RYWMutation(doAndV2(existingEntry.value, newEntry.value.get(), arena), MutationRef::AndV2);
default:
throw operation_failed();
}
}
else throw operation_failed();
}
static void coalesceOver(OperationStack& stack, RYWMutation newEntry, Arena& arena) {
RYWMutation existingEntry = stack.top();
if (existingEntry.type == newEntry.type) {
if (isNonAssociativeOp(existingEntry.type) && existingEntry.value.size() != newEntry.value.size()) {
if (isNonAssociativeOp(existingEntry.type) && existingEntry.value.present() && existingEntry.value.get().size() != newEntry.value.get().size()) {
stack.push(newEntry);
}
else {
@ -480,7 +522,7 @@ public:
if( !stack.isDependent() && stack.size() == 1 )
return stack.at(0);
RYWMutation currentEntry = RYWMutation( value.present() ? value.get() : StringRef(), MutationRef::SetValue);
RYWMutation currentEntry = RYWMutation( value, MutationRef::SetValue);
for(int i = 0; i < stack.size(); ++i) {
currentEntry = coalesce(currentEntry, stack.at(i), arena);
}

View File

@ -202,7 +202,7 @@ description is not currently required but encouraged.
description="Deprecated"/>
<Option name="bit_and" code="6"
paramType="Bytes" paramDescription="value with which to perform bitwise and"
description="Performs a bitwise ``and`` operation. If the existing value in the database is not present or shorter than ``param``, it is first extended to the length of ``param`` with zero bytes. If ``param`` is shorter than the existing value in the database, the existing value is truncated to match the length of ``param``."/>
description="Performs a bitwise ``and`` operation. If the existing value in the database is not present, then ``param`` is stored in the database. If the existing value in the database is shorter than ``param``, it is first extended to the length of ``param`` with zero bytes. If ``param`` is shorter than the existing value in the database, the existing value is truncated to match the length of ``param``."/>
<Option name="or" code="7"
paramType="Bytes" paramDescription="value with which to perform bitwise or"
description="Deprecated"/>
@ -220,13 +220,19 @@ description is not currently required but encouraged.
description="Performs a little-endian comparison of byte strings. If the existing value in the database is not present or shorter than ``param``, it is first extended to the length of ``param`` with zero bytes. If ``param`` is shorter than the existing value in the database, the existing value is truncated to match the length of ``param``. The larger of the two values is then stored in the database."/>
<Option name="min" code="13"
paramType="Bytes" paramDescription="value to check against database value"
description="Performs a little-endian comparison of byte strings. If the existing value in the database is not present or shorter than ``param``, it is first extended to the length of ``param`` with zero bytes. If ``param`` is shorter than the existing value in the database, the existing value is truncated to match the length of ``param``. The smaller of the two values is then stored in the database."/>
description="Performs a little-endian comparison of byte strings. If the existing value in the database is not present, then ``param`` is stored in the database. If the existing value in the database is shorter than ``param``, it is first extended to the length of ``param`` with zero bytes. If ``param`` is shorter than the existing value in the database, the existing value is truncated to match the length of ``param``. The smaller of the two values is then stored in the database."/>
<Option name="set_versionstamped_key" code="14"
paramType="Bytes" paramDescription="value to which to set the transformed key"
description="Transforms ``key`` using a versionstamp for the transaction. Sets the transformed key in the database to ``param``. A versionstamp is a 10 byte, unique, monotonically (but not sequentially) increasing value for each committed transaction. The first 8 bytes are the committed version of the database. The last 2 bytes are monotonic in the serialization order for transactions. WARNING: At this time versionstamps are not compatible with the Tuple layer. Note that this implies versionstamped keys may not be used with the Subspace and Directory layers." />
<Option name="set_versionstamped_value" code="15"
paramType="Bytes" paramDescription="value to versionstamp and set"
description="Transforms ``param`` using a versionstamp for the transaction. Sets ``key`` in the database to the transformed parameter. A versionstamp is a 10 byte, unique, monotonically (but not sequentially) increasing value for each committed transaction. The first 8 bytes are the committed version of the database. The last 2 bytes are monotonic in the serialization order for transactions. WARNING: At this time versionstamps are not compatible with the Tuple layer." />
<Option name="byte_min" code="16"
paramType="Bytes" paramDescription="value to check against database value"
description="Performs lexicographic comparison of byte strings. If the existing value in the database is not present, then ``param`` is stored. Otherwise the smaller of the two values is then stored in the database."/>
<Option name="byte_max" code="17"
paramType="Bytes" paramDescription="value to check against database value"
description="Performs lexicographic comparison of byte strings. If the existing value in the database is not present, then ``param`` is stored. Otherwise the larger of the two values is then stored in the database."/>
</Scope>
<Scope name="ConflictRangeType">

View File

@ -92,6 +92,7 @@
<ActorCompiler Include="workloads\DummyWorkload.actor.cpp" />
<ActorCompiler Include="workloads\BackupCorrectness.actor.cpp" />
<ActorCompiler Include="workloads\AtomicOps.actor.cpp" />
<ActorCompiler Include="workloads\AtomicOpsApiCorrectness.actor.cpp" />
<ActorCompiler Include="workloads\BackupToDBAbort.actor.cpp" />
<ActorCompiler Include="workloads\BackupToDBCorrectness.actor.cpp" />
<ActorCompiler Include="workloads\AtomicSwitchover.actor.cpp" />

View File

@ -221,6 +221,9 @@
<ActorCompiler Include="workloads\AtomicOps.actor.cpp">
<Filter>workloads</Filter>
</ActorCompiler>
<ActorCompiler Include="workloads\AtomicOpsApiCorrectness.actor.cpp">
<Filter>workloads</Filter>
</ActorCompiler>
<ActorCompiler Include="workloads\StatusWorkload.actor.cpp">
<Filter>workloads</Filter>
</ActorCompiler>

View File

@ -196,7 +196,7 @@ struct UpdateEagerReadInfo {
// SOMEDAY: Theoretically we can avoid a read if there is an earlier overlapping ClearRange
if (m.type == MutationRef::ClearRange && !m.param2.startsWith(systemKeys.end))
keyBegin.push_back( m.param2 );
else if (m.type == MutationRef::AppendIfFits)
else if ((m.type == MutationRef::AppendIfFits) || (m.type == MutationRef::ByteMin) || (m.type == MutationRef::ByteMax))
keys.push_back(pair<KeyRef, int>(m.param1, CLIENT_KNOBS->VALUE_SIZE_LIMIT));
else if (isAtomicOp((MutationRef::Type) m.type))
keys.push_back(pair<KeyRef, int>(m.param1, m.param2.size()));
@ -1461,17 +1461,17 @@ bool expandMutation( MutationRef& m, StorageServer::VersionedData const& data, U
}
else if (m.type != MutationRef::SetValue && (m.type)) {
StringRef oldVal;
Optional<StringRef> oldVal;
auto it = data.atLatest().lastLessOrEqual(m.param1);
if (it != data.atLatest().end() && it->isValue() && it.key() == m.param1)
oldVal = it->getValue();
else if (it != data.atLatest().end() && it->isClearTo() && it->getEndKey() > m.param1) {
TEST(true); // Atomic op right after a clear.
oldVal = StringRef();
}
else {
Optional<Value>& oldThing = eager->getValue(m.param1);
oldVal = oldThing.present() ? oldThing.get() : StringRef();
if (oldThing.present())
oldVal = oldThing.get();
}
switch(m.type) {
@ -1496,6 +1496,18 @@ bool expandMutation( MutationRef& m, StorageServer::VersionedData const& data, U
case MutationRef::Min:
m.param2 = doMin(oldVal, m.param2, ar);
break;
case MutationRef::ByteMin:
m.param2 = doByteMin(oldVal, m.param2, ar);
break;
case MutationRef::ByteMax:
m.param2 = doByteMax(oldVal, m.param2, ar);
break;
case MutationRef::MinV2:
m.param2 = doMinV2(oldVal, m.param2, ar);
break;
case MutationRef::AndV2:
m.param2 = doAndV2(oldVal, m.param2, ar);
break;
}
m.type = MutationRef::SetValue;
}

View File

@ -29,6 +29,7 @@
struct AtomicOpsWorkload : TestWorkload {
int opNum, actorCount, nodeCount;
uint32_t opType;
bool apiVersion500 = false;
double testDuration, transactionsPerSecond;
vector<Future<Void>> clients;
@ -41,8 +42,14 @@ struct AtomicOpsWorkload : TestWorkload {
actorCount = getOption( options, LiteralStringRef("actorsPerClient"), transactionsPerSecond / 5 );
opType = getOption( options, LiteralStringRef("opType"), -1 );
nodeCount = getOption( options, LiteralStringRef("nodeCount"), 1000 );
// Atomic OPs Min and And have modified behavior from api version 510. Hence allowing testing for older version (500) with a 10% probability
// Actual change of api Version happens in setup
apiVersion500 = ((sharedRandomNumber % 10) == 0);
TraceEvent("AtomicOpsApiVersion500").detail("apiVersion500", apiVersion500);
int64_t randNum = sharedRandomNumber / 10;
if(opType == -1)
opType = sharedRandomNumber % 6;
opType = randNum % 8;
switch(opType) {
case 0:
@ -69,6 +76,14 @@ struct AtomicOpsWorkload : TestWorkload {
TEST(true); //Testing atomic Min
opType = MutationRef::Min;
break;
case 6:
TEST(true); //Testing atomic ByteMin
opType = MutationRef::ByteMin;
break;
case 7:
TEST(true); //Testing atomic ByteMax
opType = MutationRef::ByteMax;
break;
default:
ASSERT(false);
}
@ -77,7 +92,10 @@ struct AtomicOpsWorkload : TestWorkload {
virtual std::string description() { return "AtomicOps"; }
virtual Future<Void> setup( Database const& cx ) {
virtual Future<Void> setup( Database const& cx ) {
if (apiVersion500)
cx->cluster->apiVersion = 500;
if(clientId != 0)
return Void();
return _setup( cx, this );

View File

@ -0,0 +1,381 @@
/*
* AtomicOpsApiCorrectness.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "flow/actorcompiler.h"
#include "fdbserver/TesterInterface.h"
#include "fdbclient/ReadYourWrites.h"
#include "fdbclient/RunTransaction.actor.h"
#include "workloads.h"
struct AtomicOpsApiCorrectnessWorkload : TestWorkload {
bool testFailed = false;
uint32_t opType;
private:
static int getApiVersion(const Database &cx) {
return cx->cluster->apiVersion;
}
static void setApiVersion(Database *cx, int version) {
(*cx)->cluster->apiVersion = version;
}
Key getTestKey(std::string prefix) {
std::string key = prefix + std::to_string(clientId);
return StringRef(key);
}
public:
AtomicOpsApiCorrectnessWorkload(WorkloadContext const& wcx)
: TestWorkload(wcx)
{
opType = getOption(options, LiteralStringRef("opType"), -1);
}
virtual std::string description() { return "AtomicOpsApiCorrectness"; }
virtual Future<Void> setup(Database const& cx) {
return Void();
}
virtual Future<Void> start(Database const& cx) {
if (opType == -1)
opType = sharedRandomNumber % 8;
switch (opType) {
case 0:
TEST(true); //Testing atomic Min
return testMin(cx->clone(), this);
case 1:
TEST(true); //Testing atomic And
return testAnd(cx->clone(), this);
case 2:
TEST(true); //Testing atomic ByteMin
return testByteMin(cx->clone(), this);
case 3:
TEST(true); //Testing atomic ByteMax
return testByteMax(cx->clone(), this);
case 4:
TEST(true); //Testing atomic Or
return testOr(cx->clone(), this);
case 5:
TEST(true); //Testing atomic Max
return testMax(cx->clone(), this);
case 6:
TEST(true); //Testing atomic Xor
return testXor(cx->clone(), this);
case 7:
TEST(true); //Testing atomic Add
return testAdd(cx->clone(), this);
default:
ASSERT(false);
}
return Void();
}
virtual Future<bool> check(Database const& cx) {
return !testFailed;
}
virtual void getMetrics(vector<PerfMetric>& m) {
}
// Test Atomic ops on non existing keys that results in a set
ACTOR Future<Void> testAtomicOpSetOnNonExistingKey(Database cx, AtomicOpsApiCorrectnessWorkload* self, uint32_t opType, Key key) {
state uint64_t intValue = g_random->randomInt(0, 10000000);
state Value val = StringRef((const uint8_t*)&intValue, sizeof(intValue));
// Do operation on Storage Server
loop{
try {
Void _ = wait(runRYWTransactionNoRetry(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Void> { tr->clear(key); return Void(); }));
Void _ = wait(runRYWTransactionNoRetry(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Void> { tr->atomicOp(key, val, opType); return Void(); }));
break;
}
catch (Error& e) {
TraceEvent(SevInfo, "AtomicOpApiThrow").detail("errCode", e.code());
Void _ = wait(delay(1));
}
}
Optional<Value> outputVal = wait(runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Optional<Value>> { return tr->get(key); }));
uint64_t output = 0;
ASSERT(outputVal.present() && outputVal.get().size() == sizeof(uint64_t));
memcpy(&output, outputVal.get().begin(), outputVal.get().size());
if (output != intValue) {
TraceEvent(SevError, "AtomicOpSetOnNonExistingKeyUnexpectedOutput").detail("opOn", "StorageServer").detail("op", opType).detail("ExpectedOutput", intValue).detail("ActualOutput", output);
self->testFailed = true;
}
// Do operation on RYW Layer
Optional<Value> outputVal = wait(runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Optional<Value>> { tr->clear(key); tr->atomicOp(key, val, opType); return tr->get(key); }));
uint64_t output = 0;
ASSERT(outputVal.present() && outputVal.get().size() == sizeof(uint64_t));
memcpy(&output, outputVal.get().begin(), outputVal.get().size());
if (output != intValue) {
TraceEvent(SevError, "AtomicOpSetOnNonExistingKeyUnexpectedOutput").detail("opOn", "RYWLayer").detail("op", opType).detail("ExpectedOutput", intValue).detail("ActualOutput", output);
self->testFailed = true;
}
return Void();
}
// Test Atomic ops on non existing keys that results in a unset
ACTOR Future<Void> testAtomicOpUnsetOnNonExistingKey(Database cx, AtomicOpsApiCorrectnessWorkload* self, uint32_t opType, Key key) {
state uint64_t intValue = g_random->randomInt(0, 10000000);
state Value val = StringRef((const uint8_t*)&intValue, sizeof(intValue));
// Do operation on Storage Server
loop {
try {
Void _ = wait(runRYWTransactionNoRetry(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Void> { tr->clear(key); return Void(); }));
Void _ = wait(runRYWTransactionNoRetry(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Void> { tr->atomicOp(key, val, opType); return Void(); }));
break;
}
catch (Error& e) {
TraceEvent(SevInfo, "AtomicOpApiThrow").detail("errCode", e.code());
Void _ = wait(delay(1));
}
}
Optional<Value> outputVal = wait(runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Optional<Value>> { return tr->get(key); }));
uint64_t output = 0;
ASSERT(outputVal.present() && outputVal.get().size() == sizeof(uint64_t));
memcpy(&output, outputVal.get().begin(), outputVal.get().size());
if (output != 0) {
TraceEvent(SevError, "AtomicOpUnsetOnNonExistingKeyUnexpectedOutput").detail("opOn", "StorageServer").detail("op", opType).detail("ExpectedOutput", 0).detail("ActualOutput", output);
self->testFailed = true;
}
// Do operation on RYW Layer
Optional<Value> outputVal = wait(runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Optional<Value>> { tr->clear(key); tr->atomicOp(key, val, opType); return tr->get(key); }));
uint64_t output = 0;
ASSERT(outputVal.present() && outputVal.get().size() == sizeof(uint64_t));
memcpy(&output, outputVal.get().begin(), outputVal.get().size());
if (output != 0) {
TraceEvent(SevError, "AtomicOpUnsetOnNonExistingKeyUnexpectedOutput").detail("opOn", "RYWLayer").detail("op", opType).detail("ExpectedOutput", 0).detail("ActualOutput", output);
self->testFailed = true;
}
return Void();
}
typedef std::function<Value(Value, Value)> DoAtomicOpOnEmptyValueFunction;
// Test Atomic Ops when one of the value is empty
ACTOR Future<Void> testAtomicOpOnEmptyValue(Database cx, AtomicOpsApiCorrectnessWorkload* self, uint32_t opType, Key key, DoAtomicOpOnEmptyValueFunction opFunc) {
state Value existingVal;
state Value otherVal;
state uint64_t val = g_random->randomInt(0, 10000000);
if (g_random->random01() < 0.5) {
existingVal = StringRef((const uint8_t*)&val, sizeof(val));
otherVal = StringRef();
}
else {
otherVal = StringRef((const uint8_t*)&val, sizeof(val));
existingVal = StringRef();
}
// Do operation on Storage Server
loop{
try {
Void _ = wait(runRYWTransactionNoRetry(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Void> { tr->set(key, existingVal); return Void(); }));
Void _ = wait(runRYWTransactionNoRetry(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Void> { tr->atomicOp(key, otherVal, opType); return Void(); }));
break;
}
catch (Error& e) {
TraceEvent(SevInfo, "AtomicOpApiThrow").detail("errCode", e.code());
Void _ = wait(delay(1));
}
}
Optional<Value> outputVal = wait(runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Optional<Value>> { return tr->get(key); }));
ASSERT(outputVal.present());
Value output = outputVal.get();
if (output != opFunc(existingVal, otherVal)) {
TraceEvent(SevError, "AtomicOpOnEmptyValueUnexpectedOutput").detail("opOn", "StorageServer").detail("op", opType).detail("ExpectedOutput", opFunc(existingVal, otherVal).toString()).detail("ActualOutput", output.toString());
self->testFailed = true;
}
// Do operation on RYW Layer
Optional<Value> outputVal = wait(runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Optional<Value>> { tr->set(key, existingVal); tr->atomicOp(key, otherVal, opType); return tr->get(key); }));
ASSERT(outputVal.present());
Value output = outputVal.get();
if (output != opFunc(existingVal, otherVal)) {
TraceEvent(SevError, "AtomicOpOnEmptyValueUnexpectedOutput").detail("opOn", "RYWLayer").detail("op", opType).detail("ExpectedOutput", opFunc(existingVal, otherVal).toString()).detail("ActualOutput", output.toString());
self->testFailed = true;
}
return Void();
}
typedef std::function<uint64_t(uint64_t, uint64_t)> DoAtomicOpFunction;
// Test atomic ops in the normal case when the existing value is present
ACTOR Future<Void> testAtomicOpApi(Database cx, AtomicOpsApiCorrectnessWorkload* self, uint32_t opType, Key key, DoAtomicOpFunction opFunc) {
state uint64_t intValue1 = g_random->randomInt(0, 10000000);
state uint64_t intValue2 = g_random->randomInt(0, 10000000);
state Value val1 = StringRef((const uint8_t*)&intValue1, sizeof(intValue1));
state Value val2 = StringRef((const uint8_t *)&intValue2, sizeof(intValue2));
// Do operation on Storage Server
loop{
try {
// Set the key to a random value
Void _ = wait(runRYWTransactionNoRetry(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Void> { tr->set(key, val1); return Void(); }));
// Do atomic op
Void _ = wait(runRYWTransactionNoRetry(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Void> { tr->atomicOp(key, val2, opType); return Void(); }));
break;
}
catch (Error& e) {
TraceEvent(SevInfo, "AtomicOpApiThrow").detail("errCode", e.code());
Void _ = wait(delay(1));
}
}
// Compare result
Optional<Value> outputVal = wait(runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Optional<Value>> { return tr->get(key); }));
uint64_t output = 0;
ASSERT(outputVal.present() && outputVal.get().size() == sizeof(uint64_t));
memcpy(&output, outputVal.get().begin(), outputVal.get().size());
if (output != opFunc(intValue1, intValue2)) {
TraceEvent(SevError, "AtomicOpApiCorrectnessUnexpectedOutput").detail("opOn", "StorageServer").detail("InValue1", intValue1).detail("InValue2", intValue2).detail("AtomicOp", opType).detail("ExpectedOutput", opFunc(intValue1, intValue2)).detail("ActualOutput", output);
self->testFailed = true;
}
// Do operation at RYW layer
Optional<Value> outputVal = wait(runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Optional<Value>> { tr->set(key, val1); tr->atomicOp(key, val2, opType); return tr->get(key); }));
// Compare result
uint64_t output = 0;
ASSERT(outputVal.present() && outputVal.get().size() == sizeof(uint64_t));
memcpy(&output, outputVal.get().begin(), outputVal.get().size());
if (output != opFunc(intValue1, intValue2)) {
TraceEvent(SevError, "AtomicOpApiCorrectnessUnexpectedOutput").detail("opOn", "RYWLayer").detail("InValue1", intValue1).detail("InValue2", intValue2).detail("AtomicOp", opType).detail("ExpectedOutput", opFunc(intValue1, intValue2)).detail("ActualOutput", output);
self->testFailed = true;
}
return Void();
}
ACTOR Future<Void> testMin(Database cx, AtomicOpsApiCorrectnessWorkload* self) {
state int currentApiVersion = getApiVersion(cx);
state Key key = self->getTestKey("test_key_min_");
TraceEvent("AtomicOpCorrectnessApiWorkload").detail("opType", "MIN");
// API Version 500
setApiVersion(&cx, 500);
TraceEvent(SevInfo, "Running Atomic Op Min Correctness Test Api Version 500");
Void _ = wait(self->testAtomicOpUnsetOnNonExistingKey(cx, self, MutationRef::Min, key));
Void _ = wait(self->testAtomicOpApi(cx, self, MutationRef::Min, key, [](uint64_t val1, uint64_t val2) { return val1 < val2 ? val1 : val2; }));
Void _ = wait(self->testAtomicOpOnEmptyValue(cx, self, MutationRef::Min, key, [](Value v1, Value v2) -> Value { uint64_t zeroVal = 0; if (v2.size() == 0) return StringRef(); else return StringRef((const uint8_t*)&zeroVal, sizeof(zeroVal)); }));
// Current API Version
setApiVersion(&cx, currentApiVersion);
TraceEvent(SevInfo, "Running Atomic Op Min Correctness Current Api Version").detail("Version", currentApiVersion);
Void _ = wait(self->testAtomicOpSetOnNonExistingKey(cx, self, MutationRef::Min, key));
Void _ = wait(self->testAtomicOpApi(cx, self, MutationRef::Min, key, [](uint64_t val1, uint64_t val2) { return val1 < val2 ? val1 : val2; }));
Void _ = wait(self->testAtomicOpOnEmptyValue(cx, self, MutationRef::Min, key, [](Value v1, Value v2) -> Value { uint64_t zeroVal = 0; if (v2.size() == 0) return StringRef(); else return StringRef((const uint8_t*)&zeroVal, sizeof(zeroVal)); }));
return Void();
}
ACTOR Future<Void> testMax(Database cx, AtomicOpsApiCorrectnessWorkload* self) {
state Key key = self->getTestKey("test_key_max_");
TraceEvent(SevInfo, "Running Atomic Op MAX Correctness Current Api Version");
Void _ = wait(self->testAtomicOpSetOnNonExistingKey(cx, self, MutationRef::Max, key));
Void _ = wait(self->testAtomicOpApi(cx, self, MutationRef::Max, key, [](uint64_t val1, uint64_t val2) { return val1 > val2 ? val1 : val2; }));
Void _ = wait(self->testAtomicOpOnEmptyValue(cx, self, MutationRef::Max, key, [](Value v1, Value v2) -> Value { return v2.size() ? v2 : StringRef(); }));
return Void();
}
ACTOR Future<Void> testAnd(Database cx, AtomicOpsApiCorrectnessWorkload* self) {
state int currentApiVersion = getApiVersion(cx);
state Key key = self->getTestKey("test_key_and_");
TraceEvent("AtomicOpCorrectnessApiWorkload").detail("opType", "AND");
// API Version 500
setApiVersion(&cx, 500);
TraceEvent(SevInfo, "Running Atomic Op AND Correctness Test Api Version 500");
Void _ = wait(self->testAtomicOpUnsetOnNonExistingKey(cx, self, MutationRef::And, key));
Void _ = wait(self->testAtomicOpApi(cx, self, MutationRef::And, key, [](uint64_t val1, uint64_t val2) { return val1 & val2; }));
Void _ = wait(self->testAtomicOpOnEmptyValue(cx, self, MutationRef::And, key, [](Value v1, Value v2) -> Value { uint64_t zeroVal = 0; if (v2.size() == 0) return StringRef(); else return StringRef((const uint8_t*)&zeroVal, sizeof(zeroVal)); }));
// Current API Version
setApiVersion(&cx, currentApiVersion);
TraceEvent(SevInfo, "Running Atomic Op AND Correctness Current Api Version").detail("Version", currentApiVersion);
Void _ = wait(self->testAtomicOpSetOnNonExistingKey(cx, self, MutationRef::And, key));
Void _ = wait(self->testAtomicOpApi(cx, self, MutationRef::And, key, [](uint64_t val1, uint64_t val2) { return val1 & val2; }));
Void _ = wait(self->testAtomicOpOnEmptyValue(cx, self, MutationRef::And, key, [](Value v1, Value v2) -> Value { uint64_t zeroVal = 0; if (v2.size() == 0) return StringRef(); else return StringRef((const uint8_t*)&zeroVal, sizeof(zeroVal)); }));
return Void();
}
ACTOR Future<Void> testOr(Database cx, AtomicOpsApiCorrectnessWorkload* self) {
state Key key = self->getTestKey("test_key_or_");
TraceEvent(SevInfo, "Running Atomic Op OR Correctness Current Api Version");
Void _ = wait(self->testAtomicOpSetOnNonExistingKey(cx, self, MutationRef::Or, key));
Void _ = wait(self->testAtomicOpApi(cx, self, MutationRef::Or, key, [](uint64_t val1, uint64_t val2) { return val1 | val2; }));
Void _ = wait(self->testAtomicOpOnEmptyValue(cx, self, MutationRef::Or, key, [](Value v1, Value v2) -> Value { return v2.size() ? v2 : StringRef(); }));
return Void();
}
ACTOR Future<Void> testXor(Database cx, AtomicOpsApiCorrectnessWorkload* self) {
state Key key = self->getTestKey("test_key_xor_");
TraceEvent(SevInfo, "Running Atomic Op XOR Correctness Current Api Version");
Void _ = wait(self->testAtomicOpSetOnNonExistingKey(cx, self, MutationRef::Xor, key));
Void _ = wait(self->testAtomicOpApi(cx, self, MutationRef::Xor, key, [](uint64_t val1, uint64_t val2) { return val1 ^ val2; }));
Void _ = wait(self->testAtomicOpOnEmptyValue(cx, self, MutationRef::Xor, key, [](Value v1, Value v2) -> Value { return v2.size() ? v2 : StringRef(); }));
return Void();
}
ACTOR Future<Void> testAdd(Database cx, AtomicOpsApiCorrectnessWorkload* self) {
state Key key = self->getTestKey("test_key_add_");
TraceEvent(SevInfo, "Running Atomic Op ADD Correctness Current Api Version");
Void _ = wait(self->testAtomicOpSetOnNonExistingKey(cx, self, MutationRef::AddValue, key));
Void _ = wait(self->testAtomicOpApi(cx, self, MutationRef::AddValue, key, [](uint64_t val1, uint64_t val2) { return val1 + val2; }));
Void _ = wait(self->testAtomicOpOnEmptyValue(cx, self, MutationRef::AddValue, key, [](Value v1, Value v2) -> Value { return v2.size() ? v2 : StringRef(); }));
return Void();
}
ACTOR Future<Void> testByteMin(Database cx, AtomicOpsApiCorrectnessWorkload* self) {
state Key key = self->getTestKey("test_key_byte_min_");
TraceEvent(SevInfo, "Running Atomic Op BYTE_MIN Correctness Current Api Version");
Void _ = wait(self->testAtomicOpSetOnNonExistingKey(cx, self, MutationRef::ByteMin, key));
Void _ = wait(self->testAtomicOpApi(cx, self, MutationRef::ByteMin, key, [](uint64_t val1, uint64_t val2) { return StringRef((const uint8_t *)&val1, sizeof(val1)) < StringRef((const uint8_t *)&val2, sizeof(val2)) ? val1 : val2; }));
Void _ = wait(self->testAtomicOpOnEmptyValue(cx, self, MutationRef::ByteMin, key, [](Value v1, Value v2) -> Value { return StringRef(); }));
return Void();
}
ACTOR Future<Void> testByteMax(Database cx, AtomicOpsApiCorrectnessWorkload* self) {
state Key key = self->getTestKey("test_key_byte_max_");
TraceEvent(SevInfo, "Running Atomic Op BYTE_MAX Correctness Current Api Version");
Void _ = wait(self->testAtomicOpSetOnNonExistingKey(cx, self, MutationRef::ByteMax, key));
Void _ = wait(self->testAtomicOpApi(cx, self, MutationRef::ByteMax, key, [](uint64_t val1, uint64_t val2) { return StringRef((const uint8_t *)&val1, sizeof(val1)) > StringRef((const uint8_t *)&val2, sizeof(val2)) ? val1 : val2; }));
Void _ = wait(self->testAtomicOpOnEmptyValue(cx, self, MutationRef::ByteMax, key, [](Value v1, Value v2) -> Value { return v1.size() ? v1 : v2; }));
return Void();
}
};
WorkloadFactory<AtomicOpsApiCorrectnessWorkload> AtomicOpsApiCorrectnessWorkloadFactory("AtomicOpsApiCorrectness");

View File

@ -138,7 +138,7 @@ struct RandomSelectorWorkload : TestWorkload {
try {
for(i = 0; i < g_random->randomInt(self->minOperationsPerTransaction,self->maxOperationsPerTransaction+1); i++) {
j = g_random->randomInt(0,13);
j = g_random->randomInt(0,15);
if( j < 3 ) {
myKeyA = format( "%010d", g_random->randomInt( 0, self->maxKeySpace+1 ) );
myValue = format("%d", g_random->randomInt( 0, 10000000 ) );
@ -377,6 +377,54 @@ struct RandomSelectorWorkload : TestWorkload {
}
}
}
else if (j < 13) {
myKeyA = format("%010d", g_random->randomInt(0, self->maxKeySpace + 1));
myRandomIDKey = format("%010d", g_random->randomInt(0, 1000000000));
myValue = format("%d", g_random->randomInt(0, 10000000));
//TraceEvent("RYOWbytemin").detail("Key",myKeyA).detail("Value", myValue);
trRYOW.atomicOp(StringRef(clientID + "b/" + myKeyA), myValue, MutationRef::ByteMin);
loop{
try {
tr.set(StringRef(clientID + "z/" + myRandomIDKey), StringRef());
tr.atomicOp(StringRef(clientID + "d/" + myKeyA), myValue, MutationRef::ByteMin);
Void _ = wait(tr.commit());
break;
}
catch (Error& e) {
error = e;
Void _ = wait(tr.onError(e));
if (error.code() == error_code_commit_unknown_result) {
Optional<Value> thing = wait(tr.get(StringRef(clientID + "z/" + myRandomIDKey)));
if (thing.present()) break;
}
}
}
}
else if (j < 14) {
myKeyA = format("%010d", g_random->randomInt(0, self->maxKeySpace + 1));
myRandomIDKey = format("%010d", g_random->randomInt(0, 1000000000));
myValue = format("%d", g_random->randomInt(0, 10000000));
//TraceEvent("RYOWbytemax").detail("Key",myKeyA).detail("Value", myValue);
trRYOW.atomicOp(StringRef(clientID + "b/" + myKeyA), myValue, MutationRef::ByteMax);
loop{
try {
tr.set(StringRef(clientID + "z/" + myRandomIDKey), StringRef());
tr.atomicOp(StringRef(clientID + "d/" + myKeyA), myValue, MutationRef::ByteMax);
Void _ = wait(tr.commit());
break;
}
catch (Error& e) {
error = e;
Void _ = wait(tr.onError(e));
if (error.code() == error_code_commit_unknown_result) {
Optional<Value> thing = wait(tr.get(StringRef(clientID + "z/" + myRandomIDKey)));
if (thing.present()) break;
}
}
}
}
else {
int a = g_random->randomInt( 1, self->maxKeySpace+1 );
int b = g_random->randomInt( 1, self->maxKeySpace+1 );

View File

@ -171,7 +171,7 @@ struct SerializabilityWorkload : TestWorkload {
Key key = getRandomKey();
Value value = getRandomValue();
MutationRef::Type opType;
switch( g_random->randomInt(0,6) ) {
switch( g_random->randomInt(0,8) ) {
case 0:
opType = MutationRef::AddValue;
break;
@ -190,6 +190,12 @@ struct SerializabilityWorkload : TestWorkload {
case 5:
opType = MutationRef::Min;
break;
case 6:
opType = MutationRef::ByteMin;
break;
case 7:
opType = MutationRef::ByteMax;
break;
}
op.mutationOp = MutationRef(opType, key, value);
hasMutation = true;

View File

@ -545,16 +545,16 @@ struct WriteDuringReadWorkload : TestWorkload {
return KeyRangeRef( getKeyForIndex( startLocation ), getKeyForIndex( endLocation ) );
}
Value applyAtomicOp(Value existingValue, Value value, MutationRef::Type type) {
Value applyAtomicOp(Optional<StringRef> existingValue, Value value, MutationRef::Type type) {
Arena arena;
if (type == MutationRef::SetValue)
return value;
else if (type == MutationRef::AddValue)
return doLittleEndianAdd(existingValue, value, arena);
else if (type == MutationRef::AppendIfFits)
else if (type == MutationRef::AppendIfFits)
return doAppendIfFits(existingValue, value, arena);
else if (type == MutationRef::And)
return doAnd(existingValue, value, arena);
return doAndV2(existingValue, value, arena);
else if (type == MutationRef::Or)
return doOr(existingValue, value, arena);
else if (type == MutationRef::Xor)
@ -562,7 +562,11 @@ struct WriteDuringReadWorkload : TestWorkload {
else if (type == MutationRef::Max)
return doMax(existingValue, value, arena);
else if (type == MutationRef::Min)
return doMin(existingValue, value, arena);
return doMinV2(existingValue, value, arena);
else if (type == MutationRef::ByteMin)
return doByteMin(existingValue, value, arena);
else if (type == MutationRef::ByteMax)
return doByteMax(existingValue, value, arena);
ASSERT(false);
return Value();
}
@ -730,7 +734,7 @@ struct WriteDuringReadWorkload : TestWorkload {
Key key = self->getRandomKey();
Value value = self->getRandomValue();
MutationRef::Type opType;
switch( g_random->randomInt(0,6) ) {
switch( g_random->randomInt(0,8) ) {
case 0:
opType = MutationRef::AddValue;
break;
@ -749,6 +753,12 @@ struct WriteDuringReadWorkload : TestWorkload {
case 5:
opType = MutationRef::Min;
break;
case 6:
opType = MutationRef::ByteMin;
break;
case 7:
opType = MutationRef::ByteMax;
break;
}
self->changeCount.insert( key, changeNum++ );
bool noConflict = g_random->random01() < 0.5;
@ -760,7 +770,7 @@ struct WriteDuringReadWorkload : TestWorkload {
if( !noConflict && key.size() <= (key.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT) )
self->addedConflicts.insert(key, true);
Optional<Value> existing = self->memoryGet( &self->memoryDatabase, key );
self->memoryDatabase[ key ] = self->applyAtomicOp( existing.present() ? existing.get() : Value(), value, opType );
self->memoryDatabase[ key ] = self->applyAtomicOp( existing.present() ? Optional<StringRef>(existing.get()) : Optional<StringRef>(), value, opType );
}
} else if( operationType > 11 && !disableSet ) {
Key key = self->getRandomKey();

View File

@ -51,7 +51,7 @@ using namespace boost::asio::ip;
// These impact both communications and the deserialization of certain database and IKeyValueStore keys
// xyzdev
// vvvv
uint64_t currentProtocolVersion = 0x0FDB00A551010001LL;
uint64_t currentProtocolVersion = 0x0FDB00A551020001LL;
uint64_t compatibleProtocolVersionMask = 0xffffffffffff0000LL;
uint64_t minValidProtocolVersion = 0x0FDB00A200060001LL;

View File

@ -76,7 +76,7 @@ func read_blob(t fdb.ReadTransactor, blob_subspace subspace.Subspace) ([]byte, e
}
func main() {
fdb.MustAPIVersion(500)
fdb.MustAPIVersion(510)
db := fdb.MustOpenDefault()

View File

@ -198,7 +198,7 @@ func (doc Doc) GetDoc(trtr fdb.Transactor, doc_id int) interface{} {
}
func main() {
fdb.MustAPIVersion(500)
fdb.MustAPIVersion(510)
db := fdb.MustOpenDefault()

View File

@ -116,7 +116,7 @@ func (graph *Graph) get_in_neighbors(trtr fdb.Transactor, node int) ([]int, erro
}
func main() {
fdb.MustAPIVersion(500)
fdb.MustAPIVersion(510)
db := fdb.MustOpenDefault()

View File

@ -87,7 +87,7 @@ func (wrkspc Workspace) Session(foo func(directory.DirectorySubspace)) (err erro
}
func main() {
fdb.MustAPIVersion(500)
fdb.MustAPIVersion(510)
db := fdb.MustOpenDefault()

View File

@ -124,7 +124,7 @@ func (multi MultiMap) MultiIsElement(trtr fdb.Transactor, index, value interface
func main() {
fdb.MustAPIVersion(500)
fdb.MustAPIVersion(510)
db := fdb.MustOpenDefault()

View File

@ -100,7 +100,7 @@ func (prty Priority) Peek(trtr fdb.Transactor, max bool) interface{} {
func main() {
fdb.MustAPIVersion(500)
fdb.MustAPIVersion(510)
db := fdb.MustOpenDefault()

View File

@ -101,7 +101,7 @@ func (q *Queue) FirstItem(trtr fdb.Transactor) (interface{}, error) {
func main() {
fmt.Println("Queue Example Program")
fdb.MustAPIVersion(500)
fdb.MustAPIVersion(510)
db := fdb.MustOpenDefault()

View File

@ -128,7 +128,7 @@ func (tbl Table) TableGetCol(tr fdb.ReadTransactor, col int) ([]interface{}, err
}
func main() {
fdb.MustAPIVersion(500)
fdb.MustAPIVersion(510)
db := fdb.MustOpenDefault()

View File

@ -0,0 +1,3 @@
testTitle=AtomicOpsCorrectnessTest
testName=AtomicOpsApiCorrectness