Merge branch 'release-5.0'

This commit is contained in:
A.J. Beamon 2017-05-26 11:21:26 -07:00
commit 62876ae731
14 changed files with 52 additions and 45 deletions

View File

@ -183,7 +183,7 @@ class FDBTransaction extends DefaultDisposableImpl implements Disposable, Transa
public CompletableFuture<Long> getReadVersion() {
pointerReadLock.lock();
try {
return new FutureVersion( Transaction_getReadVersion(getPtr()));
return new FutureVersion( Transaction_getReadVersion(getPtr()), executor);
} finally {
pointerReadLock.unlock();
}
@ -200,7 +200,7 @@ class FDBTransaction extends DefaultDisposableImpl implements Disposable, Transa
private CompletableFuture<byte[]> get_internal(byte[] key, boolean isSnapshot) {
pointerReadLock.lock();
try {
return new FutureResult( Transaction_get(getPtr(), key, isSnapshot));
return new FutureResult( Transaction_get(getPtr(), key, isSnapshot), executor);
} finally {
pointerReadLock.unlock();
}
@ -218,7 +218,7 @@ class FDBTransaction extends DefaultDisposableImpl implements Disposable, Transa
pointerReadLock.lock();
try {
return new FutureKey( Transaction_getKey(getPtr(),
selector.getKey(), selector.orEqual(), selector.getOffset(), isSnapshot));
selector.getKey(), selector.orEqual(), selector.getOffset(), isSnapshot), executor);
} finally {
pointerReadLock.unlock();
}
@ -313,7 +313,7 @@ class FDBTransaction extends DefaultDisposableImpl implements Disposable, Transa
return new FutureResults(Transaction_getRange(
getPtr(), begin.getKey(), begin.orEqual(), begin.getOffset(),
end.getKey(), end.orEqual(), end.getOffset(), rowLimit, targetBytes,
streamingMode, iteration, isSnapshot, reverse));
streamingMode, iteration, isSnapshot, reverse), executor);
} finally {
pointerReadLock.unlock();
}
@ -442,7 +442,7 @@ class FDBTransaction extends DefaultDisposableImpl implements Disposable, Transa
public CompletableFuture<Void> commit() {
pointerReadLock.lock();
try {
return new FutureVoid(Transaction_commit(getPtr()));
return new FutureVoid(Transaction_commit(getPtr()), executor);
} finally {
pointerReadLock.unlock();
}
@ -462,7 +462,7 @@ class FDBTransaction extends DefaultDisposableImpl implements Disposable, Transa
public CompletableFuture<byte[]> getVersionstamp() {
pointerReadLock.lock();
try {
return new FutureKey(Transaction_getVersionstamp(getPtr()));
return new FutureKey(Transaction_getVersionstamp(getPtr()), executor);
} finally {
pointerReadLock.unlock();
}
@ -472,7 +472,7 @@ class FDBTransaction extends DefaultDisposableImpl implements Disposable, Transa
public CompletableFuture<Void> watch(byte[] key) throws FDBException {
pointerReadLock.lock();
try {
return new FutureVoid(Transaction_watch(getPtr(), key));
return new FutureVoid(Transaction_watch(getPtr(), key), executor);
} finally {
pointerReadLock.unlock();
}
@ -490,7 +490,7 @@ class FDBTransaction extends DefaultDisposableImpl implements Disposable, Transa
}
pointerReadLock.lock();
try {
CompletableFuture<Void> f = new FutureVoid(Transaction_onError(getPtr(), ((FDBException)e).getCode()));
CompletableFuture<Void> f = new FutureVoid(Transaction_onError(getPtr(), ((FDBException)e).getCode()), executor);
final Transaction tr = transfer();
return f.thenApply(v -> tr)
.whenComplete((v, t) -> {
@ -533,7 +533,7 @@ class FDBTransaction extends DefaultDisposableImpl implements Disposable, Transa
public CompletableFuture<String[]> getAddressesForKey(byte[] key) {
pointerReadLock.lock();
try {
return new FutureStrings(Transaction_getKeyLocations(getPtr(), key));
return new FutureStrings(Transaction_getKeyLocations(getPtr(), key), executor);
} finally {
pointerReadLock.unlock();
}

View File

@ -28,7 +28,7 @@ class FutureCluster extends NativeFuture<Cluster> {
protected FutureCluster(long cPtr, Executor executor) {
super(cPtr);
this.executor = executor;
registerMarshalCallback();
registerMarshalCallback(executor);
}
@Override

View File

@ -28,7 +28,7 @@ class FutureDatabase extends NativeFuture<Database> {
FutureDatabase(long cPtr, Executor executor) {
super(cPtr);
this.executor = executor;
registerMarshalCallback();
registerMarshalCallback(executor);
}
@Override

View File

@ -20,12 +20,12 @@
package com.apple.cie.foundationdb;
import java.util.concurrent.Executor;
class FutureKey extends NativeFuture<byte[]> {
FutureKey(long cPtr) {
FutureKey(long cPtr, Executor executor) {
super(cPtr);
registerMarshalCallback();
registerMarshalCallback(executor);
}
@Override

View File

@ -20,11 +20,12 @@
package com.apple.cie.foundationdb;
import java.util.concurrent.Executor;
class FutureResult extends NativeFuture<byte[]> {
FutureResult(long cPtr) {
FutureResult(long cPtr, Executor executor) {
super(cPtr);
registerMarshalCallback();
registerMarshalCallback(executor);
}
@Override

View File

@ -20,11 +20,12 @@
package com.apple.cie.foundationdb;
import java.util.concurrent.Executor;
class FutureResults extends NativeFuture<RangeResultInfo> {
FutureResults(long cPtr) {
FutureResults(long cPtr, Executor executor) {
super(cPtr);
registerMarshalCallback();
registerMarshalCallback(executor);
}
@Override

View File

@ -20,11 +20,12 @@
package com.apple.cie.foundationdb;
import java.util.concurrent.Executor;
class FutureStrings extends NativeFuture<String[]> {
FutureStrings(long cPtr) {
FutureStrings(long cPtr, Executor executor) {
super(cPtr);
registerMarshalCallback();
registerMarshalCallback(executor);
}
@Override

View File

@ -20,11 +20,12 @@
package com.apple.cie.foundationdb;
import java.util.concurrent.Executor;
class FutureVersion extends NativeFuture<Long> {
FutureVersion(long cPtr) {
FutureVersion(long cPtr, Executor executor) {
super(cPtr);
registerMarshalCallback();
registerMarshalCallback(executor);
}
@Override

View File

@ -20,11 +20,12 @@
package com.apple.cie.foundationdb;
import java.util.concurrent.Executor;
class FutureVoid extends NativeFuture<Void> {
FutureVoid(long cPtr) {
FutureVoid(long cPtr, Executor executor) {
super(cPtr);
registerMarshalCallback();
registerMarshalCallback(executor);
}
@Override

View File

@ -21,6 +21,7 @@
package com.apple.cie.foundationdb;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
abstract class NativeFuture<T> extends CompletableFuture<T> {
protected final long cPtr;
@ -35,13 +36,8 @@ abstract class NativeFuture<T> extends CompletableFuture<T> {
// constructor of this class because a quickly completing future can
// lead to a race where the marshalWhenDone tries to run on an
// unconstructed subclass.
protected void registerMarshalCallback() {
Future_registerCallback(cPtr, new Runnable() {
@Override
public void run() {
NativeFuture.this.marshalWhenDone();
}
});
protected void registerMarshalCallback(Executor executor) {
Future_registerCallback(cPtr, () -> executor.execute(this::marshalWhenDone));
}
private void marshalWhenDone() {

View File

@ -1522,14 +1522,24 @@ THREAD_FUNC cancel(void *arg) {
ACTOR Future<Void> checkUndestroyedFutures(std::vector<ThreadSingleAssignmentVar<int>*> undestroyed) {
state int fNum;
state ThreadSingleAssignmentVar<int>* f;
state double start = now();
for(fNum = 0; fNum < undestroyed.size(); ++fNum) {
state ThreadSingleAssignmentVar<int>* f = undestroyed[fNum];
state double start = now();
while(!f->isReady()) {
ASSERT(now() < start+60);
Void _ = wait(delay(5.0));
f = undestroyed[fNum];
while(!f->isReady() && start+5 >= now()) {
Void _ = wait(delay(1.0));
}
ASSERT(f->isReady());
}
Void _ = wait(delay(1.0));
for(fNum = 0; fNum < undestroyed.size(); ++fNum) {
f = undestroyed[fNum];
ASSERT(f->debugGetReferenceCount() == 1);
ASSERT(f->isReady());
@ -1545,7 +1555,7 @@ THREAD_FUNC runSingleAssignmentVarTest(void *arg) {
volatile bool *done = (volatile bool*)arg;
try {
for(int i = 0; i < 100; ++i) {
for(int i = 0; i < 25; ++i) {
FutureInfo f = createVarOnMainThread(false);
FutureInfo tf = T::createThreadFuture(f);
tf.validate();
@ -1553,7 +1563,7 @@ THREAD_FUNC runSingleAssignmentVarTest(void *arg) {
tf.future.extractPtr(); // leaks
}
for(int numRuns = 0; numRuns < 100; ++numRuns) {
for(int numRuns = 0; numRuns < 25; ++numRuns) {
std::vector<ThreadSingleAssignmentVar<int>*> undestroyed;
std::vector<THREAD_HANDLE> threads;
for(int i = 0; i < 10; ++i) {

View File

@ -1097,7 +1097,7 @@ public:
// Add non-test processes (ie. datahall is not be set for test processes)
if (processInfo->startingClass != ProcessClass::TesterClass) {
// Add machine processes to dead group if dead or specified kill machine
if (processInfo->failed || machineRec.second.dead || (machineRec.second.zoneId == zoneId))
if (processInfo->failed || (machineRec.second.zoneId == zoneId))
processesDead.push_back(processInfo);
else
processesLeft.push_back(processInfo);
@ -1112,7 +1112,6 @@ public:
}
else if ((kt == KillInstantly) || (kt == InjectFaults)) {
TraceEvent("DeadMachine", zoneId).detailext("ZoneId", zoneId).detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("TotalZones", machines.size()).detail("processesPerMachine", processesPerMachine).detail("tLogPolicy", tLogPolicy->info()).detail("storagePolicy", storagePolicy->info());
machines[zoneId].dead = true;
}
else {
TraceEvent("ClearMachine", zoneId).detailext("ZoneId", zoneId).detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("TotalZones", machines.size()).detail("processesPerMachine", processesPerMachine).detail("tLogPolicy", tLogPolicy->info()).detail("storagePolicy", storagePolicy->info());
@ -1173,7 +1172,7 @@ public:
// Add non-test processes (ie. datahall is not be set for test processes)
if (processInfo->startingClass != ProcessClass::TesterClass) {
// Add processes from dead machines and datacenter machines to dead group
if (processInfo->failed || machineRec.second.dead || (datacenterZones.find(machineRec.second.zoneId) != datacenterZones.end()))
if (processInfo->failed || (datacenterZones.find(machineRec.second.zoneId) != datacenterZones.end()))
processesDead.push_back(processInfo);
else
processesLeft.push_back(processInfo);
@ -1245,7 +1244,6 @@ public:
for( auto process : machine.processes ) {
ASSERT( process->failed );
}
machine.dead = true;
if( machine.machineProcess ) {
killProcess_internal( machine.machineProcess, KillInstantly );
}

View File

@ -93,9 +93,8 @@ public:
std::map<std::string, Future<Reference<IAsyncFile>>> openFiles;
std::set<std::string> closingFiles;
Optional<Standalone<StringRef>> zoneId;
bool dead;
MachineInfo() : machineProcess(0), dead(false) {}
MachineInfo() : machineProcess(0) {}
};
template <class Func>

View File

@ -182,7 +182,6 @@ struct RemoveServersSafelyWorkload : TestWorkload {
// The actor final boolean argument is a hack to prevent the second part of this function from happening
// Fix Me
NOT_IN_CLEAN;
if (exitAfterInclude) return Void();
std::vector<NetworkAddress> coordinators = wait( getCoordinators(cx) );