Merge branch 'release-6.3' into mengxu/fr-sched-PR

Resolve conflict at BackupContainer.actor.cpp
Meng Xu 2020-08-27 16:53:50 -07:00
commit ca9b1f5b34
15 changed files with 783 additions and 223 deletions

View File

@ -22,6 +22,8 @@ set(JAVA_BINDING_SRCS
src/main/com/apple/foundationdb/directory/NoSuchDirectoryException.java
src/main/com/apple/foundationdb/directory/package-info.java
src/main/com/apple/foundationdb/directory/PathUtil.java
src/main/com/apple/foundationdb/DirectBufferIterator.java
src/main/com/apple/foundationdb/DirectBufferPool.java
src/main/com/apple/foundationdb/FDB.java
src/main/com/apple/foundationdb/FDBDatabase.java
src/main/com/apple/foundationdb/FDBTransaction.java

View File

@ -305,42 +305,6 @@ JNIEXPORT jobject JNICALL Java_com_apple_foundationdb_FutureStrings_FutureString
return arr;
}
JNIEXPORT jobject JNICALL Java_com_apple_foundationdb_FutureResults_FutureResults_1getSummary(JNIEnv *jenv, jobject, jlong future) {
if( !future ) {
throwParamNotNull(jenv);
return JNI_NULL;
}
FDBFuture *f = (FDBFuture *)future;
const FDBKeyValue *kvs;
int count;
fdb_bool_t more;
fdb_error_t err = fdb_future_get_keyvalue_array( f, &kvs, &count, &more );
if( err ) {
safeThrow( jenv, getThrowable( jenv, err ) );
return JNI_NULL;
}
jbyteArray lastKey = JNI_NULL;
if(count) {
lastKey = jenv->NewByteArray(kvs[count - 1].key_length);
if( !lastKey ) {
if( !jenv->ExceptionOccurred() )
throwOutOfMem(jenv);
return JNI_NULL;
}
jenv->SetByteArrayRegion(lastKey, 0, kvs[count - 1].key_length, (jbyte *)kvs[count - 1].key);
}
jobject result = jenv->NewObject(range_result_summary_class, range_result_summary_init, lastKey, count, (jboolean)more);
if( jenv->ExceptionOccurred() )
return JNI_NULL;
return result;
}
// SOMEDAY: explore doing this more efficiently with Direct ByteBuffers
JNIEXPORT jobject JNICALL Java_com_apple_foundationdb_FutureResults_FutureResults_1get(JNIEnv *jenv, jobject, jlong future) {
if( !future ) {
@ -640,6 +604,68 @@ JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1
return (jlong)f;
}
JNIEXPORT void JNICALL Java_com_apple_foundationdb_FutureResults_FutureResults_1getDirect(
JNIEnv* jenv, jobject, jlong future, jobject jbuffer, jint bufferCapacity) {
if( !future ) {
throwParamNotNull(jenv);
return;
}
uint8_t* buffer = (uint8_t*)jenv->GetDirectBufferAddress(jbuffer);
if (!buffer) {
if (!jenv->ExceptionOccurred())
throwRuntimeEx(jenv, "Error getting handle to native resources");
return;
}
FDBFuture* f = (FDBFuture*)future;
const FDBKeyValue *kvs;
int count;
fdb_bool_t more;
fdb_error_t err = fdb_future_get_keyvalue_array( f, &kvs, &count, &more );
if( err ) {
safeThrow( jenv, getThrowable( jenv, err ) );
return;
}
// Capacity for Metadata+Keys+Values
// => sizeof(jint) for total key/value pairs
// => sizeof(jint) to store more flag
// => sizeof(jint) to store key length per KV pair
// => sizeof(jint) to store value length per KV pair
int totalCapacityNeeded = 2 * sizeof(jint);
for(int i = 0; i < count; i++) {
totalCapacityNeeded += kvs[i].key_length + kvs[i].value_length + 2*sizeof(jint);
if (bufferCapacity < totalCapacityNeeded) {
count = i; /* Only fit first `i` K/V pairs */
more = true;
break;
}
}
int offset = 0;
// First copy RangeResultSummary, i.e. [keyCount, more]
memcpy(buffer + offset, &count, sizeof(jint));
offset += sizeof(jint);
memcpy(buffer + offset, &more, sizeof(jint));
offset += sizeof(jint);
for (int i = 0; i < count; i++) {
memcpy(buffer + offset, &kvs[i].key_length, sizeof(jint));
memcpy(buffer + offset + sizeof(jint), &kvs[i].value_length, sizeof(jint));
offset += 2 * sizeof(jint);
memcpy(buffer + offset, kvs[i].key, kvs[i].key_length);
offset += kvs[i].key_length;
memcpy(buffer + offset, kvs[i].value, kvs[i].value_length);
offset += kvs[i].value_length;
}
}
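As a quick check of the capacity accounting in the loop above, here is a small self-contained Java sketch (not part of the commit; the pair count and lengths are hypothetical): the header costs two ints, and each pair costs two ints plus its key and value bytes.

public class CapacityCheck {
    public static void main(String[] args) {
        // Hypothetical sizes, mirroring the capacity loop in getDirect above.
        int count = 3, keyLength = 10, valueLength = 100;
        int needed = 2 * Integer.BYTES; // keyCount + more flag
        needed += count * (2 * Integer.BYTES + keyLength + valueLength); // per-pair lengths + payload
        System.out.println(needed + " bytes"); // 8 + 3 * (8 + 110) = 362 bytes
    }
}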
JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1getEstimatedRangeSizeBytes(JNIEnv *jenv, jobject, jlong tPtr,
jbyteArray beginKeyBytes, jbyteArray endKeyBytes) {
if( !tPtr || !beginKeyBytes || !endKeyBytes) {

View File

@ -0,0 +1,108 @@
/*
* DirectBufferIterator.java
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2015-2020 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.apple.foundationdb;
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Iterator;
import java.util.NoSuchElementException;
/**
* Holds the direct buffer that is shared with the JNI wrapper.
*
* The serialization format of the result is:
* [int keyCount, boolean more, ListOf<(int keyLen, int valueLen, byte[] key, byte[] value)>]
*/
class DirectBufferIterator implements Iterator<KeyValue>, AutoCloseable {
private ByteBuffer byteBuffer;
private int current = 0;
private int keyCount = -1;
private boolean more = false;
public DirectBufferIterator(ByteBuffer buffer) {
byteBuffer = buffer;
byteBuffer.order(ByteOrder.nativeOrder());
}
@Override
public void close() {
if (byteBuffer != null) {
DirectBufferPool.getInstance().add(byteBuffer);
byteBuffer = null;
}
}
public boolean hasResultReady() {
return keyCount > -1;
}
@Override
public boolean hasNext() {
assert (hasResultReady());
return current < keyCount;
}
@Override
public KeyValue next() {
assert (hasResultReady()); // Must be called once it's ready.
if (!hasNext()) {
throw new NoSuchElementException();
}
final int keyLen = byteBuffer.getInt();
final int valueLen = byteBuffer.getInt();
byte[] key = new byte[keyLen];
byteBuffer.get(key);
byte[] value = new byte[valueLen];
byteBuffer.get(value);
current += 1;
return new KeyValue(key, value);
}
public ByteBuffer getBuffer() {
return byteBuffer;
}
public int count() {
assert (hasResultReady());
return keyCount;
}
public boolean hasMore() {
assert (hasResultReady());
return more;
}
public int currentIndex() {
return current;
}
public void readResultsSummary() {
byteBuffer.rewind();
byteBuffer.position(0);
keyCount = byteBuffer.getInt();
more = byteBuffer.getInt() > 0;
}
}
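Below is a minimal, self-contained Java sketch (not part of the commit) showing how the wire format documented above round-trips; the class name and sample bytes are hypothetical, and native byte order matches the constructor above.

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class WireFormatDemo {
    public static void main(String[] args) {
        byte[] key = { 'k', '1' };
        byte[] value = { 'v', 'a', 'l' };
        ByteBuffer buf = ByteBuffer.allocateDirect(64).order(ByteOrder.nativeOrder());
        // Encode what the JNI side writes: [count, more, (keyLen, valueLen, key, value)...]
        buf.putInt(1).putInt(0);
        buf.putInt(key.length).putInt(value.length).put(key).put(value);
        buf.rewind();
        // Decode the way DirectBufferIterator does.
        int keyCount = buf.getInt();
        boolean more = buf.getInt() > 0;
        for (int i = 0; i < keyCount; ++i) {
            byte[] k = new byte[buf.getInt()];
            byte[] v = new byte[buf.getInt()];
            buf.get(k);
            buf.get(v);
            System.out.printf("%s -> %s (more=%b)%n", new String(k), new String(v), more);
        }
    }
}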

View File

@ -0,0 +1,89 @@
/*
* DirectBufferPool.java
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2015-2020 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.apple.foundationdb;
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
/**
* A singleton that manages a pool of direct {@link ByteBuffer}s that are
* shared by the {@link DirectBufferIterator} instances. It is the responsibility
* of the user to return the borrowed buffers.
*/
class DirectBufferPool {
static final DirectBufferPool __instance = new DirectBufferPool();
// When tuning this, make sure that the size of the buffer
// is always greater than the maximum KV size allowed by FDB.
// Current limits are:
// 10kB for key + 100kB for value + 1 int for count + 1 int for more + 2 ints for KV sizes
static public final int MIN_BUFFER_SIZE = (10 + 100) * 1000 + Integer.BYTES * 4;
static private final int DEFAULT_NUM_BUFFERS = 128;
static private final int DEFAULT_BUFFER_SIZE = 1024 * 512;
private ArrayBlockingQueue<ByteBuffer> buffers;
private int currentBufferCapacity;
public DirectBufferPool() {
resize(DEFAULT_NUM_BUFFERS, DEFAULT_BUFFER_SIZE);
}
public static DirectBufferPool getInstance() {
return __instance;
}
/**
* Resizes the buffer pool to the given pool size and buffer size. Throws
* {@link OutOfMemoryError} if unable to allocate as requested.
*/
public synchronized void resize(int newPoolSize, int bufferSize) {
if (bufferSize < MIN_BUFFER_SIZE) {
throw new IllegalArgumentException("'bufferSize' must be at least " + MIN_BUFFER_SIZE + " bytes");
}
buffers = new ArrayBlockingQueue<>(newPoolSize);
currentBufferCapacity = bufferSize;
while (buffers.size() < newPoolSize) {
ByteBuffer buffer = ByteBuffer.allocateDirect(bufferSize);
buffers.add(buffer);
}
}
/**
* Requests a direct {@link ByteBuffer} from our pool. Returns null if the pool is empty.
*/
public synchronized ByteBuffer poll() {
return buffers.poll();
}
/**
* Returns the direct {@link ByteBuffer} that was borrowed from our pool.
*/
public synchronized void add(ByteBuffer buffer) {
if (buffer.capacity() != currentBufferCapacity) {
// This can happen when a resize is called while there are outstanding requests,
// older buffers will be returned eventually.
return;
}
buffers.offer(buffer);
}
}
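The minimum works out to (10 + 100) * 1000 + 4 * Integer.BYTES = 110,016 bytes, enough for one maximum-size key/value pair plus the four length/flag ints. Borrowers follow a poll/return discipline; a hedged sketch of a caller in the same package (the JNI call is elided, mirroring FutureResults.getResults() later in this diff):

ByteBuffer buffer = DirectBufferPool.getInstance().poll();
if (buffer == null) {
    // Pool exhausted: fall back to the non-direct getRange() path.
} else {
    try {
        // ... pass `buffer` to the native side, then decode with DirectBufferIterator ...
    } finally {
        DirectBufferPool.getInstance().add(buffer); // always return what was borrowed
    }
}

Note that add() silently drops buffers whose capacity no longer matches the pool's, so a resize() with outstanding borrows never mixes buffer sizes.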

View File

@ -85,6 +85,8 @@ public class FDB {
private volatile boolean netStarted = false;
private volatile boolean netStopped = false;
volatile boolean warnOnUnclosed = true;
private boolean enableDirectBufferQueries = false;
private boolean useShutdownHook = true;
private Thread shutdownHook;
private final Semaphore netRunning = new Semaphore(1);
@ -229,6 +231,35 @@ public class FDB {
return apiVersion;
}
/**
* Enables or disables use of DirectByteBuffers for getRange() queries.
*
* @param enabled whether direct buffers should be used for getRange() queries.
*/
public void enableDirectBufferQuery(boolean enabled) {
enableDirectBufferQueries = enabled;
}
/**
* Determines whether getRange() queries can use a direct {@link ByteBuffer} from
* {@link DirectBufferPool} to copy results.
*
* @return {@code true} if direct buffer queries have been enabled and {@code false} otherwise
*/
public boolean isDirectBufferQueriesEnabled() {
return enableDirectBufferQueries;
}
/**
* Resizes the {@link DirectBufferPool} used by getRange() requests.
*
* @param poolSize Number of buffers in pool
* @param bufferSize Size of each buffer in bytes
*/
public void resizeDirectBufferPool(int poolSize, int bufferSize) {
DirectBufferPool.getInstance().resize(poolSize, bufferSize);
}
/**
* Connects to the cluster specified by the
* <a href="/foundationdb/administration.html#default-cluster-file" target="_blank">default fdb.cluster file</a>.
@ -507,4 +538,4 @@ public class FDB {
private native boolean Error_predicate(int predicate, int code);
private native long Database_create(String clusterFilePath) throws FDBException;
}
}
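Taken together, the new public API enables the direct-buffer path end to end. A hedged usage sketch (not part of the commit; the class name and pool numbers are illustrative, and API version 630 matches the release-6.3 line being merged):

import java.util.List;

import com.apple.foundationdb.Database;
import com.apple.foundationdb.FDB;
import com.apple.foundationdb.KeyValue;

public class DirectBufferExample {
    public static void main(String[] args) {
        FDB fdb = FDB.selectAPIVersion(630);          // 630 matches the 6.3 release line
        fdb.enableDirectBufferQuery(true);            // opt in to pooled direct buffers
        fdb.resizeDirectBufferPool(256, 1024 * 1024); // optional: bigger buffers for large scans
        try (Database db = fdb.open()) {
            List<KeyValue> kvs = db.read(tr ->
                tr.getRange(new byte[] { 'a' }, new byte[] { 'z' }).asList().join());
            System.out.println("Read " + kvs.size() + " pairs");
        }
    }
}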

View File

@ -25,6 +25,7 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.nio.ByteBuffer;
import com.apple.foundationdb.async.AsyncIterable;
import com.apple.foundationdb.async.AsyncUtil;
@ -36,7 +37,6 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC
private final TransactionOptions options;
private boolean transactionOwner;
public final ReadTransaction snapshot;
class ReadSnapshot implements ReadTransaction {
@ -369,10 +369,11 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC
" -- range get: (%s, %s) limit: %d, bytes: %d, mode: %d, iteration: %d, snap: %s, reverse %s",
begin.toString(), end.toString(), rowLimit, targetBytes, streamingMode,
iteration, Boolean.toString(isSnapshot), Boolean.toString(reverse)));*/
return new FutureResults(Transaction_getRange(
getPtr(), begin.getKey(), begin.orEqual(), begin.getOffset(),
end.getKey(), end.orEqual(), end.getOffset(), rowLimit, targetBytes,
streamingMode, iteration, isSnapshot, reverse), executor);
return new FutureResults(
Transaction_getRange(getPtr(), begin.getKey(), begin.orEqual(), begin.getOffset(),
end.getKey(), end.orEqual(), end.getOffset(), rowLimit, targetBytes,
streamingMode, iteration, isSnapshot, reverse),
FDB.instance().isDirectBufferQueriesEnabled(), executor);
} finally {
pointerReadLock.unlock();
}

View File

@ -20,12 +20,14 @@
package com.apple.foundationdb;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
class FutureResults extends NativeFuture<RangeResultInfo> {
FutureResults(long cPtr, Executor executor) {
FutureResults(long cPtr, boolean enableDirectBufferQueries, Executor executor) {
super(cPtr);
registerMarshalCallback(executor);
this.enableDirectBufferQueries = enableDirectBufferQueries;
}
@Override
@ -44,26 +46,28 @@ class FutureResults extends NativeFuture<RangeResultInfo> {
return new RangeResultInfo(this);
}
public RangeResultSummary getSummary() {
try {
pointerReadLock.lock();
return FutureResults_getSummary(getPtr());
}
finally {
pointerReadLock.unlock();
}
}
public RangeResult getResults() {
ByteBuffer buffer = enableDirectBufferQueries
? DirectBufferPool.getInstance().poll()
: null;
try {
pointerReadLock.lock();
return FutureResults_get(getPtr());
}
finally {
if (buffer != null) {
try (DirectBufferIterator directIterator = new DirectBufferIterator(buffer)) {
FutureResults_getDirect(getPtr(), directIterator.getBuffer(), directIterator.getBuffer().capacity());
return new RangeResult(directIterator);
}
} else {
return FutureResults_get(getPtr());
}
} finally {
pointerReadLock.unlock();
}
}
private native RangeResultSummary FutureResults_getSummary(long ptr) throws FDBException;
private boolean enableDirectBufferQueries = false;
private native RangeResult FutureResults_get(long cPtr) throws FDBException;
private native void FutureResults_getDirect(long cPtr, ByteBuffer buffer, int capacity)
throws FDBException;
}

View File

@ -152,8 +152,6 @@ class RangeQuery implements AsyncIterable<KeyValue>, Iterable<KeyValue> {
@Override
public void accept(RangeResultInfo data, Throwable error) {
try {
final RangeResultSummary summary;
if(error != null) {
promise.completeExceptionally(error);
if(error instanceof Error) {
@ -163,7 +161,8 @@ class RangeQuery implements AsyncIterable<KeyValue>, Iterable<KeyValue> {
return;
}
summary = data.getSummary();
final RangeResult rangeResult = data.get();
final RangeResultSummary summary = rangeResult.getSummary();
if(summary.lastKey == null) {
promise.complete(Boolean.FALSE);
return;
@ -186,11 +185,11 @@ class RangeQuery implements AsyncIterable<KeyValue>, Iterable<KeyValue> {
// If this is the first fetch or the main chunk is exhausted
if(chunk == null || index == chunk.values.size()) {
nextChunk = null;
chunk = data.get();
chunk = rangeResult;
index = 0;
}
else {
nextChunk = data.get();
nextChunk = rangeResult;
}
}

View File

@ -51,4 +51,22 @@ class RangeResult {
}
this.more = more;
}
RangeResult(DirectBufferIterator iterator) {
iterator.readResultsSummary();
more = iterator.hasMore();
int count = iterator.count();
values = new ArrayList<KeyValue>(count);
for (int i = 0; i < count; ++i) {
values.add(iterator.next());
}
}
public RangeResultSummary getSummary() {
final int keyCount = values.size();
final byte[] lastKey = keyCount > 0 ? values.get(keyCount - 1).getKey() : null;
return new RangeResultSummary(lastKey, keyCount, more);
}
}

View File

@ -21,10 +21,6 @@
package com.apple.foundationdb;
class RangeResultInfo {
RangeResultSummary getSummary() {
return f.getSummary();
}
RangeResult get() {
return f.getResults();
}

View File

@ -1610,9 +1610,10 @@ public:
std::string fullPath = joinPath(m_path, path);
#ifndef _WIN32
if(g_network->isSimulated()) {
if(!fileExists(fullPath))
if(!fileExists(fullPath)) {
throw file_not_found();
// std::string uniquePath = fullPath + "." + deterministicRandom()->randomUniqueID().toString() + ".lnk";
}
if (g_simulator.getCurrentProcess()->uid == UID()) {
TraceEvent(SevError, "BackupContainerReadFileOnUnsetProcessID");
}

View File

@ -4817,6 +4817,21 @@ ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq, Reference<As
return Void();
}
ACTOR Future<Void> ddGetMetrics(GetDataDistributorMetricsRequest req, PromiseStream<GetMetricsListRequest> getShardMetricsList) {
ErrorOr<Standalone<VectorRef<DDMetricsRef>>> result = wait(errorOr(brokenPromiseToNever(
getShardMetricsList.getReply(GetMetricsListRequest(req.keys, req.shardLimit)))));
if(result.isError()) {
req.reply.sendError(result.getError());
} else {
GetDataDistributorMetricsReply rep;
rep.storageMetricsList = result.get();
req.reply.send(rep);
}
return Void();
}
ACTOR Future<Void> ddSnapCreate(DistributorSnapRequest snapReq, Reference<AsyncVar<struct ServerDBInfo>> db ) {
state Future<Void> dbInfoChange = db->onChange();
if (!setDDEnabled(false, snapReq.snapUID)) {
@ -4940,16 +4955,8 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncV
TraceEvent("DataDistributorHalted", di.id()).detail("ReqID", req.requesterID);
break;
}
when ( state GetDataDistributorMetricsRequest req = waitNext(di.dataDistributorMetrics.getFuture()) ) {
ErrorOr<Standalone<VectorRef<DDMetricsRef>>> result = wait(errorOr(brokenPromiseToNever(
getShardMetricsList.getReply(GetMetricsListRequest(req.keys, req.shardLimit)))));
if ( result.isError() ) {
req.reply.sendError(result.getError());
} else {
GetDataDistributorMetricsReply rep;
rep.storageMetricsList = result.get();
req.reply.send(rep);
}
when(GetDataDistributorMetricsRequest req = waitNext(di.dataDistributorMetrics.getFuture())) {
actors.add(ddGetMetrics(req, getShardMetricsList));
}
when(DistributorSnapRequest snapReq = waitNext(di.distributorSnapReq.getFuture())) {
actors.add(ddSnapCreate(snapReq, db));

View File

@ -105,6 +105,10 @@ public:
// Free pageID to be used again after the commit that moves oldestVersion past v
virtual void freePage(LogicalPageID pageID, Version v) = 0;
// If id is remapped, delete the original as of version v and return the page it was remapped to. The caller
// is then responsible for referencing and deleting the returned page ID.
virtual LogicalPageID detachRemappedPage(LogicalPageID id, Version v) = 0;
// Returns the latest data (regardless of version) for a page by LogicalPageID
// The data returned will be the later of
// - the most recent committed atomic
@ -133,7 +137,7 @@ public:
virtual StorageBytes getStorageBytes() = 0;
// Count of pages in use by the pager client
// Count of pages in use by the pager client (including retained old page versions)
virtual Future<int64_t> getUserPageCount() = 0;
// Future returned is ready when pager has been initialized from disk and is ready for reads and writes.

View File

@ -141,8 +141,8 @@ struct ProxyStats {
lastCommitVersionAssigned(0),
commitLatencySample("CommitLatencyMetrics", id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
grvLatencySample("GRVLatencyMetrics", id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
commitLatencyBands("CommitLatencyMetrics", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY),
grvLatencyBands("GRVLatencyMetrics", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY) {
commitLatencyBands("CommitLatencyBands", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY),
grvLatencyBands("GRVLatencyBands", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY) {
specialCounter(cc, "LastAssignedCommitVersion", [this](){return this->lastCommitVersionAssigned;});
specialCounter(cc, "Version", [pVersion](){return *pVersion; });
specialCounter(cc, "CommittedVersion", [pCommittedVersion](){ return pCommittedVersion->get(); });

View File

@ -92,7 +92,18 @@ std::string toString(LogicalPageID id) {
if (id == invalidLogicalPageID) {
return "LogicalPageID{invalid}";
}
return format("LogicalPageID{%" PRId64 "}", id);
return format("LogicalPageID{%u}", id);
}
std::string toString(Version v) {
if (v == invalidVersion) {
return "invalidVersion";
}
return format("@%" PRId64, v);
}
std::string toString(bool b) {
return b ? "true" : "false";
}
template <typename T>
@ -136,6 +147,11 @@ std::string toString(const Optional<T>& o) {
return "<not present>";
}
template <typename F, typename S>
std::string toString(const std::pair<F, S>& o) {
return format("{%s, %s}", toString(o.first).c_str(), toString(o.second).c_str());
}
// A FIFO queue of T stored as a linked list of pages.
// Main operations are pop(), pushBack(), pushFront(), and flush().
//
@ -765,6 +781,8 @@ struct RedwoodMetrics {
unsigned int lazyClearRequeueExt;
unsigned int lazyClearFree;
unsigned int lazyClearFreeExt;
unsigned int forceUpdate;
unsigned int detachChild;
double buildStoredPct;
double buildFillPct;
unsigned int buildItemCount;
@ -797,6 +815,12 @@ struct RedwoodMetrics {
unsigned int btreeLeafPreload;
unsigned int btreeLeafPreloadExt;
// Return number of pages read or written, from cache or disk
unsigned int pageOps() const {
// All page reads are either a cache hit, probe hit, or a disk read
return pagerDiskWrite + pagerDiskRead + pagerCacheHit + pagerProbeHit;
}
double startTime;
Level& level(unsigned int level) {
@ -807,9 +831,9 @@ struct RedwoodMetrics {
return levels[level - 1];
}
// This will populate a trace event and/or a string with Redwood metrics. The string is a
// reasonably well formatted page of information
void getFields(TraceEvent* e, std::string* s = nullptr) {
// This will populate a trace event and/or a string with Redwood metrics.
// The string is a reasonably well formatted page of information
void getFields(TraceEvent* e, std::string* s = nullptr, bool skipZeroes = false) {
std::pair<const char*, unsigned int> metrics[] = { { "BTreePreload", btreeLeafPreload },
{ "BTreePreloadExt", btreeLeafPreloadExt },
{ "", 0 },
@ -837,21 +861,26 @@ struct RedwoodMetrics {
{ "PagerRemapCopy", pagerRemapCopy },
{ "PagerRemapSkip", pagerRemapSkip } };
double elapsed = now() - startTime;
for (auto& m : metrics) {
if (*m.first == '\0') {
if (s != nullptr) {
*s += "\n";
}
} else {
if (s != nullptr) {
*s += format("%-15s %-8u %8u/s ", m.first, m.second, int(m.second / elapsed));
}
if (e != nullptr) {
if (e != nullptr) {
for (auto& m : metrics) {
char c = m.first[0];
if(c != 0 && (!skipZeroes || m.second != 0) ) {
e->detail(m.first, m.second);
}
}
}
if(s != nullptr) {
for (auto& m : metrics) {
if (*m.first == '\0') {
*s += "\n";
} else if(!skipZeroes || m.second != 0) {
*s += format("%-15s %-8u %8u/s ", m.first, m.second, int(m.second / elapsed));
}
}
}
for (int i = 0; i < btreeLevels; ++i) {
auto& level = levels[i];
std::pair<const char*, unsigned int> metrics[] = {
@ -869,37 +898,44 @@ struct RedwoodMetrics {
{ "LazyClear", level.lazyClearFree },
{ "LazyClearExt", level.lazyClearFreeExt },
{ "", 0 },
{ "ForceUpdate", level.forceUpdate },
{ "DetachChild", level.detachChild },
{ "", 0 },
{ "-BldAvgCount", level.pageBuild ? level.buildItemCount / level.pageBuild : 0 },
{ "-BldAvgFillPct", level.pageBuild ? level.buildFillPct / level.pageBuild * 100 : 0 },
{ "-BldAvgStoredPct", level.pageBuild ? level.buildStoredPct / level.pageBuild * 100 : 0 },
{ "", 0 },
{ "-ModAvgCount", level.pageModify ? level.modifyItemCount / level.pageModify : 0 },
{ "-ModAvgFillPct", level.pageModify ? level.modifyFillPct / level.pageModify * 100 : 0 },
{ "-ModAvgStoredPct", level.pageModify ? level.modifyStoredPct / level.pageModify * 100 : 0 }
{ "-ModAvgStoredPct", level.pageModify ? level.modifyStoredPct / level.pageModify * 100 : 0 },
{ "", 0 },
};
if(e != nullptr) {
for (auto& m : metrics) {
char c = m.first[0];
if(c != 0 && (!skipZeroes || m.second != 0) ) {
e->detail(format("L%d%s", i + 1, m.first + (c == '-' ? 1 : 0)), m.second);
}
}
}
if (s != nullptr) {
*s += format("\nLevel %d\n\t", i + 1);
}
for (auto& m : metrics) {
const char* name = m.first;
bool rate = elapsed != 0;
if (*name == '-') {
++name;
rate = false;
}
if (*name == '\0') {
if (s != nullptr) {
for (auto& m : metrics) {
const char* name = m.first;
bool rate = elapsed != 0;
if (*name == '-') {
++name;
rate = false;
}
if (*name == '\0') {
*s += "\n\t";
}
} else {
if (s != nullptr) {
} else if(!skipZeroes || m.second != 0) {
*s += format("%-15s %8u %8u/s ", name, m.second, rate ? int(m.second / elapsed) : 0);
}
if (e != nullptr) {
e->detail(format("L%d%s", i + 1, name), m.second);
}
}
}
}
@ -1124,22 +1160,32 @@ public:
};
struct RemappedPage {
RemappedPage() : version(invalidVersion) {}
RemappedPage(Version v, LogicalPageID o, LogicalPageID n) : version(v), originalPageID(o), newPageID(n) {}
enum Type { NONE = 'N', REMAP = 'R', FREE = 'F', DETACH = 'D' };
RemappedPage(Version v = invalidVersion, LogicalPageID o = invalidLogicalPageID, LogicalPageID n = invalidLogicalPageID) : version(v), originalPageID(o), newPageID(n) {}
Version version;
LogicalPageID originalPageID;
LogicalPageID newPageID;
bool isFree() const {
return newPageID == invalidLogicalPageID;
static Type getTypeOf(LogicalPageID newPageID) {
if(newPageID == invalidLogicalPageID) {
return FREE;
}
if(newPageID == 0) {
return DETACH;
}
return REMAP;
}
Type getType() const {
return getTypeOf(newPageID);
}
bool operator<(const RemappedPage& rhs) { return version < rhs.version; }
std::string toString() const {
return format("RemappedPage(%s -> %s @%" PRId64 "}", ::toString(originalPageID).c_str(),
::toString(newPageID).c_str(), version);
return format("RemappedPage(%c: %s -> %s %s}", getType(), ::toString(originalPageID).c_str(),
::toString(newPageID).c_str(), ::toString(version).c_str());
}
};
@ -1484,6 +1530,35 @@ public:
}
}
LogicalPageID detachRemappedPage(LogicalPageID pageID, Version v) override {
auto i = remappedPages.find(pageID);
if(i == remappedPages.end()) {
// Page is not remapped
return invalidLogicalPageID;
}
// Get the page that id was most recently remapped to
auto iLast = i->second.rbegin();
LogicalPageID newID = iLast->second;
ASSERT(RemappedPage::getTypeOf(newID) == RemappedPage::REMAP);
// If the last change remap was also at v then change the remap to a delete, as it's essentially
// the same as the original page being deleted at that version and newID being used from then on.
if(iLast->first == v) {
debug_printf("DWALPager(%s) op=detachDelete originalID=%s newID=%s @%" PRId64 " oldestVersion=%" PRId64 "\n", filename.c_str(),
toString(pageID).c_str(), toString(newID).c_str(), v, pLastCommittedHeader->oldestVersion);
iLast->second = invalidLogicalPageID;
remapQueue.pushBack(RemappedPage{ v, pageID, invalidLogicalPageID });
} else {
debug_printf("DWALPager(%s) op=detach originalID=%s newID=%s @%" PRId64 " oldestVersion=%" PRId64 "\n", filename.c_str(),
toString(pageID).c_str(), toString(newID).c_str(), v, pLastCommittedHeader->oldestVersion);
// Mark id as converted to its last remapped location as of v
i->second[v] = 0;
remapQueue.pushBack(RemappedPage{ v, pageID, 0 });
}
return newID;
}
void freePage(LogicalPageID pageID, Version v) override {
// If pageID has been remapped, then it can't be freed until all existing remaps for that page have been undone,
// so queue it for later deletion
@ -1588,13 +1663,13 @@ public:
auto j = i->second.upper_bound(v);
if (j != i->second.begin()) {
--j;
debug_printf("DWALPager(%s) read %s @%" PRId64 " -> %s\n", filename.c_str(), toString(pageID).c_str(),
debug_printf("DWALPager(%s) op=readAtVersionRemapped %s @%" PRId64 " -> %s\n", filename.c_str(), toString(pageID).c_str(),
v, toString(j->second).c_str());
pageID = j->second;
ASSERT(pageID != invalidLogicalPageID);
}
} else {
debug_printf("DWALPager(%s) read %s @%" PRId64 " (not remapped)\n", filename.c_str(),
debug_printf("DWALPager(%s) op=readAtVersionNotRemapped %s @%" PRId64 " (not remapped)\n", filename.c_str(),
toString(pageID).c_str(), v);
}
@ -1623,29 +1698,126 @@ public:
return std::min(pLastCommittedHeader->oldestVersion, snapshots.front().version);
}
ACTOR static Future<Void> remapCopyAndFree(DWALPager* self, RemappedPage p, VersionToPageMapT *m, VersionToPageMapT::iterator i) {
debug_printf("DWALPager(%s) remapCleanup copyAndFree %s\n", self->filename.c_str(), p.toString().c_str());
ACTOR static Future<Void> removeRemapEntry(DWALPager* self, RemappedPage p, Version oldestRetainedVersion) {
// Get iterator to the versioned page map entry for the original page
state PageToVersionedMapT::iterator iPageMapPair = self->remappedPages.find(p.originalPageID);
// The iterator must be valid and not empty and its first page map entry must match p's version
ASSERT(iPageMapPair != self->remappedPages.end());
ASSERT(!iPageMapPair->second.empty());
state VersionToPageMapT::iterator iVersionPagePair = iPageMapPair->second.find(p.version);
ASSERT(iVersionPagePair != iPageMapPair->second.end());
// Read the data from the page that the original was mapped to
Reference<IPage> data = wait(self->readPage(p.newPageID, false));
RemappedPage::Type firstType = p.getType();
state RemappedPage::Type secondType;
bool secondAfterOldestRetainedVersion = false;
state bool deleteAtSameVersion = false;
if(p.newPageID == iVersionPagePair->second) {
auto nextEntry = iVersionPagePair;
++nextEntry;
if(nextEntry == iPageMapPair->second.end()) {
secondType = RemappedPage::NONE;
} else {
secondType = RemappedPage::getTypeOf(nextEntry->second);
secondAfterOldestRetainedVersion = nextEntry->first >= oldestRetainedVersion;
}
} else {
ASSERT(iVersionPagePair->second == invalidLogicalPageID);
secondType = RemappedPage::FREE;
deleteAtSameVersion = true;
}
ASSERT(firstType == RemappedPage::REMAP || secondType == RemappedPage::NONE);
// Write the data to the original page so it can be read using its original pageID
self->updatePage(p.originalPageID, data);
++g_redwoodMetrics.pagerRemapCopy;
// Scenarios and actions to take:
//
// The first letter (firstType) is the type of the entry just popped from the remap queue.
// The second letter (secondType) is the type of the next item in the queue for the same
// original page ID, if present. If not present, secondType will be NONE.
//
// Since the next item can be arbitrarily ahead in the queue, secondType is determined by
// looking at the remappedPages structure.
//
// R == Remap F == Free D == Detach | == oldestRetainedVersion
//
// R R | free new ID
// R F | free new ID if R and F are at different versions
// R D | do nothing
// R | R copy new to original ID, free new ID
// R | F copy new to original ID, free new ID
// R | D copy new to original ID
// R | copy new to original ID, free new ID
// F | free original ID
// D | free original ID
//
// Note that
//
// Special case: Page is detached while it is being read in remapCopyAndFree()
// Initial state: R |
// Start remapCopyAndFree(), intending to copy newID to originalID and free newID
// New state: R | D
// Read of newID completes.
// Copy new contents over original, do NOT free new ID
// Later popped state: D |
// free original ID
//
state bool freeNewID = (firstType == RemappedPage::REMAP && secondType != RemappedPage::DETACH && !deleteAtSameVersion);
state bool copyNewToOriginal = (firstType == RemappedPage::REMAP && (secondAfterOldestRetainedVersion || secondType == RemappedPage::NONE));
state bool freeOriginalID = (firstType == RemappedPage::FREE || firstType == RemappedPage::DETACH);
// Now that the page data has been copied to the original page, the versioned page map entry is no longer
// needed and the new page ID can be freed as of the next commit.
m->erase(i);
self->freeUnmappedPage(p.newPageID, 0);
++g_redwoodMetrics.pagerRemapFree;
debug_printf("DWALPager(%s) remapCleanup %s secondType=%c mapEntry=%s oldestRetainedVersion=%" PRId64 " \n",
self->filename.c_str(), p.toString().c_str(), secondType, ::toString(*iVersionPagePair).c_str(), oldestRetainedVersion);
if(copyNewToOriginal) {
debug_printf("DWALPager(%s) remapCleanup copy %s\n", self->filename.c_str(), p.toString().c_str());
// Read the data from the page that the original was mapped to
Reference<IPage> data = wait(self->readPage(p.newPageID, false, true));
// Write the data to the original page so it can be read using its original pageID
self->updatePage(p.originalPageID, data);
++g_redwoodMetrics.pagerRemapCopy;
} else if (firstType == RemappedPage::REMAP) {
++g_redwoodMetrics.pagerRemapSkip;
}
// Now that the page contents have been copied to the original page, if the corresponding map entry
// represented the remap and there wasn't a delete queued later at the same version, then
// erase the entry.
if(!deleteAtSameVersion) {
debug_printf("DWALPager(%s) remapCleanup deleting map entry %s\n", self->filename.c_str(), p.toString().c_str());
// Erase the entry and set iVersionPagePair to the next entry or end
iVersionPagePair = iPageMapPair->second.erase(iVersionPagePair);
// If the map is now empty, delete it
if(iPageMapPair->second.empty()) {
debug_printf("DWALPager(%s) remapCleanup deleting empty map %s\n", self->filename.c_str(), p.toString().c_str());
self->remappedPages.erase(iPageMapPair);
} else if(freeNewID && secondType == RemappedPage::NONE && iVersionPagePair != iPageMapPair->second.end() && RemappedPage::getTypeOf(iVersionPagePair->second) == RemappedPage::DETACH) {
// If we intend to free the new ID and there was no map entry, one could have been added during the wait above.
// If so, and if it was a detach operation, then we can't free the new page ID as its lifetime will be managed
// by the client starting at some later version.
freeNewID = false;
}
}
if(freeNewID) {
debug_printf("DWALPager(%s) remapCleanup freeNew %s\n", self->filename.c_str(), p.toString().c_str());
self->freeUnmappedPage(p.newPageID, 0);
++g_redwoodMetrics.pagerRemapFree;
}
if(freeOriginalID) {
debug_printf("DWALPager(%s) remapCleanup freeOriginal %s\n", self->filename.c_str(), p.toString().c_str());
self->freeUnmappedPage(p.originalPageID, 0);
++g_redwoodMetrics.pagerRemapFree;
}
return Void();
}
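The scenario table above boils down to the three booleans computed in removeRemapEntry() (freeNewID, copyNewToOriginal, freeOriginalID). As a reading aid, here is a hedged restatement of that decision logic in Java (not part of the commit; names are hypothetical):

public class RemapCleanupDecision {
    enum Type { NONE, REMAP, FREE, DETACH }

    static boolean freeNewID(Type first, Type second, boolean deleteAtSameVersion) {
        // An R followed by a D must keep the new ID alive; a same-version free also keeps it.
        return first == Type.REMAP && second != Type.DETACH && !deleteAtSameVersion;
    }

    static boolean copyNewToOriginal(Type first, Type second, boolean secondAfterOldestRetainedVersion) {
        // Copy back only when no later entry shadows the original before the retained horizon.
        return first == Type.REMAP && (secondAfterOldestRetainedVersion || second == Type.NONE);
    }

    static boolean freeOriginalID(Type first) {
        // "F |" and "D |" both end the original page ID's life.
        return first == Type.FREE || first == Type.DETACH;
    }
}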
ACTOR static Future<Void> remapCleanup(DWALPager* self) {
state ActorCollection copies(true);
state ActorCollection tasks(true);
state Promise<Void> signal;
copies.add(signal.getFuture());
tasks.add(signal.getFuture());
self->remapCleanupStop = false;
@ -1654,8 +1826,7 @@ public:
state Version oldestRetainedVersion = self->effectiveOldestVersion();
// Cutoff is the version we can pop to
state RemappedPage cutoff;
cutoff.version = oldestRetainedVersion - self->remapCleanupWindow;
state RemappedPage cutoff(oldestRetainedVersion - self->remapCleanupWindow);
// Minimum version we must pop to before obeying stop command.
state Version minStopVersion = cutoff.version - (self->remapCleanupWindow * SERVER_KNOBS->REDWOOD_REMAP_CLEANUP_LAG);
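To make the windowing concrete, a hedged arithmetic sketch in Java (all numbers hypothetical; the lag variable stands in for SERVER_KNOBS->REDWOOD_REMAP_CLEANUP_LAG):

// Hypothetical numbers: retain up to version 10,000, window of 100 versions, lag knob 2.0.
long oldestRetainedVersion = 10_000;
long remapCleanupWindow = 100;
double remapCleanupLag = 2.0;
long cutoffVersion = oldestRetainedVersion - remapCleanupWindow;                      // pop entries up to 9,900
long minStopVersion = cutoffVersion - (long) (remapCleanupWindow * remapCleanupLag); // 9,700: ignore stop until reached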
@ -1663,46 +1834,15 @@ public:
loop {
state Optional<RemappedPage> p = wait(self->remapQueue.pop(cutoff));
debug_printf("DWALPager(%s) remapCleanup popped %s\n", self->filename.c_str(), ::toString(p).c_str());
// Stop if we have reached the cutoff version, which is the start of the cleanup coalescing window
if (!p.present()) {
break;
}
// Get iterator to the versioned page map entry for the original page
auto iPageMapPair = self->remappedPages.find(p.get().originalPageID);
// The iterator must be valid and not empty and its first page map entry must match p's version
ASSERT(iPageMapPair != self->remappedPages.end());
ASSERT(!iPageMapPair->second.empty());
auto iVersionPagePair = iPageMapPair->second.begin();
ASSERT(iVersionPagePair->first == p.get().version);
// If this is a free page entry then free the original page ID
if(p.get().isFree()) {
debug_printf("DWALPager(%s) remapCleanup free %s\n", self->filename.c_str(),
p.get().toString().c_str());
self->freeUnmappedPage(p.get().originalPageID, 0);
++g_redwoodMetrics.pagerRemapFree;
// There can't be any more entries in the page map after this one so verify that
// the map size is 1 and erase the map for p's original page ID.
ASSERT(iPageMapPair->second.size() == 1);
self->remappedPages.erase(iPageMapPair);
}
else {
// If there is no next page map entry or there is but it is after the oldest retained version
// then p must be copied to unmap it.
auto iNextVersionPagePair = iVersionPagePair;
++iNextVersionPagePair;
if(iNextVersionPagePair == iPageMapPair->second.end() || iNextVersionPagePair->first > oldestRetainedVersion) {
// Copy the remapped page to the original so it can be freed.
copies.add(remapCopyAndFree(self, p.get(), &iPageMapPair->second, iVersionPagePair));
}
else {
debug_printf("DWALPager(%s) remapCleanup skipAndFree %s\n", self->filename.c_str(), p.get().toString().c_str());
self->freeUnmappedPage(p.get().newPageID, 0);
++g_redwoodMetrics.pagerRemapFree;
++g_redwoodMetrics.pagerRemapSkip;
iPageMapPair->second.erase(iVersionPagePair);
}
Future<Void> task = removeRemapEntry(self, p.get(), oldestRetainedVersion);
if(!task.isReady()) {
tasks.add(task);
}
// If the stop flag is set and we've reached the minimum stop version according to the allowed lag then stop.
@ -1713,7 +1853,7 @@ public:
debug_printf("DWALPager(%s) remapCleanup stopped (stop=%d)\n", self->filename.c_str(), self->remapCleanupStop);
signal.send(Void());
wait(copies.getResult());
wait(tasks.getResult());
return Void();
}
@ -1889,8 +2029,7 @@ public:
Future<int64_t> getUserPageCount() override {
return map(getUserPageCount_cleanup(this), [=](Void) {
int64_t userPages = pHeader->pageCount - 2 - freeList.numPages - freeList.numEntries -
delayedFreeList.numPages - delayedFreeList.numEntries - remapQueue.numPages
- remapQueue.numEntries;
delayedFreeList.numPages - delayedFreeList.numEntries - remapQueue.numPages;
debug_printf("DWALPager(%s) userPages=%" PRId64 " totalPageCount=%" PRId64 " freeQueuePages=%" PRId64
" freeQueueCount=%" PRId64 " delayedFreeQueuePages=%" PRId64 " delayedFreeQueueCount=%" PRId64
@ -2871,6 +3010,38 @@ public:
typedef FIFOQueue<LazyClearQueueEntry> LazyClearQueueT;
struct ParentInfo {
ParentInfo() {
count = 0;
bits = 0;
}
void clear() {
count = 0;
bits = 0;
}
static uint32_t mask(LogicalPageID id) {
return 1 << (id & 31);
}
void pageUpdated(LogicalPageID child) {
auto m = mask(child);
if((bits & m) == 0) {
bits |= m;
++count;
}
}
bool maybeUpdated(LogicalPageID child) {
return (mask(child) & bits) != 0;
}
uint32_t bits;
int count;
};
typedef std::unordered_map<LogicalPageID, ParentInfo> ParentInfoMapT;
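ParentInfo above is a 32-bucket, Bloom-filter-style summary of which children of an internal page may have been updated in place: maybeUpdated() can report false positives on bucket collisions but never false negatives. A hedged Java port for illustration (not part of the commit):

public class ParentInfoSketch {
    private int bits;  // one bit per bucket; bucket = pageID & 31
    private int count; // number of distinct buckets set

    private static int mask(long childPageID) {
        return 1 << (int) (childPageID & 31);
    }

    public void pageUpdated(long child) {
        int m = mask(child);
        if ((bits & m) == 0) {
            bits |= m;
            ++count;
        }
    }

    // May report a never-updated child as updated (bucket collision), never the reverse.
    public boolean maybeUpdated(long child) {
        return (mask(child) & bits) != 0;
    }

    public void clear() { bits = 0; count = 0; }
}

Later in this diff, commitSubtree() consults the counter (detachChildren = parentInfo->count > 2) to decide when an internal page has seen enough in-place child updates to be worth force-rewriting with detached children.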
#pragma pack(push, 1)
struct MetaKey {
static constexpr int FORMAT_VERSION = 8;
@ -2924,8 +3095,8 @@ public:
// durable once the following call to commit() returns
void set(KeyValueRef keyValue) {
++g_redwoodMetrics.opSet;
++g_redwoodMetrics.opSetKeyBytes += keyValue.key.size();
++g_redwoodMetrics.opSetValueBytes += keyValue.value.size();
g_redwoodMetrics.opSetKeyBytes += keyValue.key.size();
g_redwoodMetrics.opSetValueBytes += keyValue.value.size();
m_pBuffer->insert(keyValue.key).mutation().setBoundaryValue(m_pBuffer->copyToArena(keyValue.value));
}
@ -3025,7 +3196,7 @@ public:
// If this page is height 2, then the children are leaves so free them directly
if (btPage.height == 2) {
debug_printf("LazyClear: freeing child %s\n", toString(btChildPageID).c_str());
self->freeBtreePage(btChildPageID, v);
self->freeBTreePage(btChildPageID, v);
freedPages += btChildPageID.size();
metrics.lazyClearFree += 1;
metrics.lazyClearFreeExt += (btChildPageID.size() - 1);
@ -3044,7 +3215,7 @@ public:
// Free the page, now that its children have either been freed or queued
debug_printf("LazyClear: freeing queue entry %s\n", toString(entry.pageID).c_str());
self->freeBtreePage(entry.pageID, v);
self->freeBTreePage(entry.pageID, v);
freedPages += entry.pageID.size();
metrics.lazyClearFree += 1;
metrics.lazyClearFreeExt += entry.pageID.size() - 1;
@ -3149,7 +3320,7 @@ public:
return commit_impl(this);
}
ACTOR static Future<Void> destroyAndCheckSanity_impl(VersionedBTree* self) {
ACTOR static Future<Void> clearAllAndCheckSanity_impl(VersionedBTree* self) {
ASSERT(g_network->isSimulated());
debug_printf("Clearing tree.\n");
@ -3194,7 +3365,7 @@ public:
return Void();
}
Future<Void> destroyAndCheckSanity() { return destroyAndCheckSanity_impl(this); }
Future<Void> clearAllAndCheckSanity() { return clearAllAndCheckSanity_impl(this); }
private:
// Represents a change to a single key - set, clear, or atomic op
@ -3415,6 +3586,8 @@ private:
Future<Void> m_init;
std::string m_name;
int m_blockSize;
std::unordered_map<LogicalPageID, ParentInfo> parents;
ParentInfoMapT childUpdateTracker;
// MetaKey changes size so allocate space for it to expand into
union {
@ -3606,7 +3779,7 @@ private:
// must be rewritten anyway to count for the change in child count or child links.
// Free the old IDs, but only once (before the first output record is added).
if (records.empty()) {
self->freeBtreePage(previousID, v);
self->freeBTreePage(previousID, v);
}
for (p = 0; p < pages.size(); ++p) {
LogicalPageID id = wait(self->m_pager->newPageID());
@ -3774,7 +3947,7 @@ private:
}
}
void freeBtreePage(BTreePageIDRef btPageID, Version v) {
void freeBTreePage(BTreePageIDRef btPageID, Version v) {
// Free individual pages at v
for (LogicalPageID id : btPageID) {
m_pager->freePage(id, v);
@ -3783,7 +3956,7 @@ private:
// Write new version of pageID at version v using page as its data.
// Attempts to reuse original id(s) in btPageID, returns BTreePageID.
ACTOR static Future<BTreePageIDRef> updateBtreePage(VersionedBTree* self, BTreePageIDRef oldID, Arena* arena,
ACTOR static Future<BTreePageIDRef> updateBTreePage(VersionedBTree* self, BTreePageIDRef oldID, Arena* arena,
Reference<IPage> page, Version writeVersion) {
state BTreePageIDRef newID;
newID.resize(*arena, oldID.size());
@ -3881,19 +4054,23 @@ private:
// If the last record in the range has a null link then this will be null.
const RedwoodRecordRef* expectedUpperBound;
bool inPlaceUpdate;
// CommitSubtree will call one of the following three functions based on its exit path
// Subtree was cleared.
void cleared() {
inPlaceUpdate = false;
childrenChanged = true;
expectedUpperBound = nullptr;
}
// Page was updated in-place through edits and written to maybeNewID
void updatedInPlace(BTreePageIDRef maybeNewID, BTreePage* btPage, int capacity) {
inPlaceUpdate = true;
auto& metrics = g_redwoodMetrics.level(btPage->height);
metrics.pageModify += 1;
metrics.pageModify += (maybeNewID.size() - 1);
metrics.pageModifyExt += (maybeNewID.size() - 1);
metrics.modifyFillPct += (double)btPage->size() / capacity;
metrics.modifyStoredPct += (double)btPage->kvBytes / capacity;
metrics.modifyItemCount += btPage->tree().numItems;
@ -3915,6 +4092,7 @@ private:
// writePages() was used to build 1 or more replacement pages.
void rebuilt(Standalone<VectorRef<RedwoodRecordRef>> newRecords) {
inPlaceUpdate = false;
newLinks = newRecords;
childrenChanged = true;
@ -3955,14 +4133,15 @@ private:
struct InternalPageModifier {
InternalPageModifier() {}
InternalPageModifier(BTreePage* p, BTreePage::BinaryTree::Mirror* m, bool updating)
: btPage(p), m(m), updating(updating), changesMade(false) {}
InternalPageModifier(BTreePage* p, BTreePage::BinaryTree::Mirror* m, bool updating, ParentInfo *parentInfo)
: btPage(p), m(m), updating(updating), changesMade(false), parentInfo(parentInfo) {}
bool updating;
BTreePage* btPage;
BTreePage::BinaryTree::Mirror* m;
Standalone<VectorRef<RedwoodRecordRef>> rebuild;
bool changesMade;
ParentInfo *parentInfo;
bool empty() const {
if (updating) {
@ -4058,6 +4237,13 @@ private:
// endpoint.
changesMade = true;
} else {
if(u.inPlaceUpdate) {
for(auto id : u.decodeLowerBound->getChildPage()) {
parentInfo->pageUpdated(id);
}
}
keep(u.cBegin, u.cEnd);
}
@ -4229,7 +4415,7 @@ private:
debug_printf("%s Inserted %s [mutation, boundary start]\n", context.c_str(),
rec.toString().c_str());
} else {
debug_printf("%s Inserted failed for %s [mutation, boundary start]\n", context.c_str(),
debug_printf("%s Insert failed for %s [mutation, boundary start]\n", context.c_str(),
rec.toString().c_str());
switchToLinearMerge();
}
@ -4342,12 +4528,12 @@ private:
// If the tree is now empty, delete the page
if (deltaTree.numItems == 0) {
update->cleared();
self->freeBtreePage(rootID, writeVersion);
self->freeBTreePage(rootID, writeVersion);
debug_printf("%s Page updates cleared all entries, returning %s\n", context.c_str(),
toString(*update).c_str());
} else {
// Otherwise update it.
BTreePageIDRef newID = wait(self->updateBtreePage(self, rootID, &update->newLinks.arena(),
BTreePageIDRef newID = wait(self->updateBTreePage(self, rootID, &update->newLinks.arena(),
page.castTo<IPage>(), writeVersion));
update->updatedInPlace(newID, btPage, newID.size() * self->m_blockSize);
@ -4360,7 +4546,7 @@ private:
// If everything in the page was deleted then this page should be deleted as of the new version
if (merged.empty()) {
update->cleared();
self->freeBtreePage(rootID, writeVersion);
self->freeBTreePage(rootID, writeVersion);
debug_printf("%s All leaf page contents were cleared, returning %s\n", context.c_str(),
toString(*update).c_str());
@ -4514,7 +4700,7 @@ private:
if (btPage->height == 2) {
debug_printf("%s: freeing child page in cleared subtree range: %s\n",
context.c_str(), ::toString(rec.getChildPage()).c_str());
self->freeBtreePage(rec.getChildPage(), writeVersion);
self->freeBTreePage(rec.getChildPage(), writeVersion);
} else {
debug_printf("%s: queuing subtree deletion cleared subtree range: %s\n",
context.c_str(), ::toString(rec.getChildPage()).c_str());
@ -4550,7 +4736,10 @@ private:
wait(waitForAll(recursions));
debug_printf("%s Recursions done, processing slice updates.\n", context.c_str());
state InternalPageModifier m(btPage, cursor.mirror, tryToUpdate);
// Note: parentInfo could be invalid after a wait and must be re-initialized.
// All uses below occur before waits so no reinitialization is done.
state ParentInfo *parentInfo = &self->childUpdateTracker[rootID.front()];
state InternalPageModifier m(btPage, cursor.mirror, tryToUpdate, parentInfo);
// Apply the possible changes for each subtree range recursed to, except the last one.
// For each range, the expected next record, if any, is checked against the first boundary
@ -4568,25 +4757,103 @@ private:
context.c_str(), m.changesMade, update->toString().c_str());
m.applyUpdate(*slices.back(), m.changesMade ? update->subtreeUpperBound : update->decodeUpperBound);
state bool detachChildren = (parentInfo->count > 2);
state bool forceUpdate = false;
if(!m.changesMade && detachChildren) {
debug_printf("%s Internal page forced rewrite because at least %d children have been updated in-place.\n", context.c_str(), parentInfo->count);
forceUpdate = true;
if(!m.updating) {
page = self->cloneForUpdate(page);
cursor = getCursor(page);
btPage = (BTreePage*)page->begin();
m.btPage = btPage;
m.m = cursor.mirror;
m.updating = true;
}
++g_redwoodMetrics.level(btPage->height).forceUpdate;
}
// If page contents have changed
if (m.changesMade) {
if ((m.empty())) {
if (m.changesMade || forceUpdate) {
if (m.empty()) {
update->cleared();
debug_printf("%s All internal page children were deleted so deleting this page too, returning %s\n",
context.c_str(), toString(*update).c_str());
self->freeBtreePage(rootID, writeVersion);
self->freeBTreePage(rootID, writeVersion);
self->childUpdateTracker.erase(rootID.front());
} else {
if (m.updating) {
// Page was updated in place
BTreePageIDRef newID = wait(self->updateBtreePage(self, rootID, &update->newLinks.arena(),
// Page was updated in place (or being forced to be updated in place to update child page ids)
debug_printf("%s Internal page modified in-place tryUpdate=%d forceUpdate=%d detachChildren=%d\n", context.c_str(), tryToUpdate, forceUpdate, detachChildren);
if(detachChildren) {
int detached = 0;
cursor.moveFirst();
auto &stats = g_redwoodMetrics.level(btPage->height);
while(cursor.valid()) {
if(cursor.get().value.present()) {
for(auto &p : cursor.get().getChildPage()) {
if(parentInfo->maybeUpdated(p)) {
LogicalPageID newID = self->m_pager->detachRemappedPage(p, writeVersion);
if(newID != invalidLogicalPageID) {
debug_printf("%s Detach updated %u -> %u\n", context.c_str(), p, newID);
p = newID;
++stats.detachChild;
++detached;
}
}
}
}
cursor.moveNext();
}
parentInfo->clear();
if(forceUpdate && detached == 0) {
debug_printf("%s No children detached during forced update, returning %s\n", context.c_str(), toString(*update).c_str());
return Void();
}
}
BTreePageIDRef newID = wait(self->updateBTreePage(self, rootID, &update->newLinks.arena(),
page.castTo<IPage>(), writeVersion));
debug_printf(
"%s commitSubtree(): Internal page updated in-place at version %s, new contents: %s\n", context.c_str(), toString(writeVersion).c_str(),
btPage->toString(false, newID, snapshot->getVersion(), update->decodeLowerBound, update->decodeUpperBound)
.c_str());
update->updatedInPlace(newID, btPage, newID.size() * self->m_blockSize);
debug_printf("%s Internal page updated in-place, returning %s\n", context.c_str(),
toString(*update).c_str());
} else {
// Page was rebuilt, possibly split.
debug_printf("%s Internal page modified, creating replacements.\n", context.c_str());
debug_printf("%s Internal page could not be modified, rebuilding replacement(s).\n", context.c_str());
if(detachChildren) {
auto &stats = g_redwoodMetrics.level(btPage->height);
for(auto &rec : m.rebuild) {
if(rec.value.present()) {
BTreePageIDRef oldPages = rec.getChildPage();
BTreePageIDRef newPages;
for(int i = 0; i < oldPages.size(); ++i) {
LogicalPageID p = oldPages[i];
if(parentInfo->maybeUpdated(p)) {
LogicalPageID newID = self->m_pager->detachRemappedPage(p, writeVersion);
if(newID != invalidLogicalPageID) {
// Rebuild record values reference original page memory so make a copy
if(newPages.empty()) {
newPages = BTreePageIDRef(m.rebuild.arena(), oldPages);
rec.setChildPage(newPages);
}
debug_printf("%s Detach updated %u -> %u\n", context.c_str(), p, newID);
newPages[i] = newID;
++stats.detachChild;
}
}
}
}
}
parentInfo->clear();
}
Standalone<VectorRef<RedwoodRecordRef>> newChildEntries =
wait(writePages(self, update->subtreeLowerBound, update->subtreeUpperBound, m.rebuild,
@ -4988,7 +5255,7 @@ public:
bool isValid() const { return valid; }
std::string toString() const {
std::string r;
std::string r = format("{ptr=%p %s ", this, ::toString(pager->getVersion()).c_str());
for (int i = 0; i < path.size(); ++i) {
r += format("[%d/%d: %s] ", i + 1, path.size(),
path[i].cursor.valid() ? path[i].cursor.get().toString(path[i].btPage->isLeaf()).c_str()
@ -4997,6 +5264,7 @@ public:
if (!valid) {
r += " (invalid) ";
}
r += "}";
return r;
}
@ -5017,6 +5285,8 @@ public:
const RedwoodRecordRef& upperBound) {
Reference<const IPage>& page = pages[id.front()];
if (page.isValid()) {
// The pager won't see this access so count it as a cache hit
++g_redwoodMetrics.pagerCacheHit;
path.push_back(arena, { (BTreePage*)page->begin(), getCursor(page) });
return Void();
}
@ -6958,24 +7228,23 @@ TEST_CASE("!/redwood/correctness/btree") {
state int pageSize =
shortTest ? 200 : (deterministicRandom()->coinflip() ? 4096 : deterministicRandom()->randomInt(200, 400));
state int64_t targetPageOps = shortTest ? 50000 : 1000000;
state bool pagerMemoryOnly = shortTest && (deterministicRandom()->random01() < .01);
state int maxKeySize = deterministicRandom()->randomInt(1, pageSize * 2);
state int maxValueSize = randomSize(pageSize * 25);
state int maxCommitSize = shortTest ? 1000 : randomSize(std::min<int>((maxKeySize + maxValueSize) * 20000, 10e6));
state int mutationBytesTarget =
shortTest ? 100000 : randomSize(std::min<int>(maxCommitSize * 100, pageSize * 100000));
state double clearProbability = deterministicRandom()->random01() * .1;
state double clearSingleKeyProbability = deterministicRandom()->random01();
state double clearPostSetProbability = deterministicRandom()->random01() * .1;
state double coldStartProbability = pagerMemoryOnly ? 0 : (deterministicRandom()->random01() * 0.3);
state double advanceOldVersionProbability = deterministicRandom()->random01();
state double maxDuration = 60;
state int64_t cacheSizeBytes =
pagerMemoryOnly ? 2e9 : (BUGGIFY ? deterministicRandom()->randomInt(1, 10 * pageSize) : 0);
state Version versionIncrement = deterministicRandom()->randomInt64(1, 1e8);
state Version remapCleanupWindow = deterministicRandom()->randomInt64(0, versionIncrement * 50);
printf("\n");
printf("targetPageOps: %" PRId64 "\n", targetPageOps);
printf("pagerMemoryOnly: %d\n", pagerMemoryOnly);
printf("serialTest: %d\n", serialTest);
printf("shortTest: %d\n", shortTest);
@ -6983,7 +7252,6 @@ TEST_CASE("!/redwood/correctness/btree") {
printf("maxKeySize: %d\n", maxKeySize);
printf("maxValueSize: %d\n", maxValueSize);
printf("maxCommitSize: %d\n", maxCommitSize);
printf("mutationBytesTarget: %d\n", mutationBytesTarget);
printf("clearProbability: %f\n", clearProbability);
printf("clearSingleKeyProbability: %f\n", clearSingleKeyProbability);
printf("clearPostSetProbability: %f\n", clearPostSetProbability);
@ -6998,8 +7266,6 @@ TEST_CASE("!/redwood/correctness/btree") {
deleteFile(pagerFile);
printf("Initializing...\n");
state double startTime = now();
pager = new DWALPager(pageSize, pagerFile, cacheSizeBytes, remapCleanupWindow, pagerMemoryOnly);
state VersionedBTree* btree = new VersionedBTree(pager, pagerFile);
wait(btree->init());
@ -7026,14 +7292,12 @@ TEST_CASE("!/redwood/correctness/btree") {
state PromiseStream<Version> committedVersions;
state Future<Void> verifyTask = verify(btree, committedVersions.getFuture(), &written, &errorCount, serialTest);
state Future<Void> randomTask = serialTest ? Void() : (randomReader(btree) || btree->getError());
committedVersions.send(lastVer);
state Future<Void> commit = Void();
state int64_t totalPageOps = 0;
while (mutationBytes.get() < mutationBytesTarget && (now() - startTime) < maxDuration) {
if (now() - startTime > 600) {
mutationBytesTarget = mutationBytes.get();
}
while (totalPageOps < targetPageOps) {
// Sometimes increment the version
if (deterministicRandom()->random01() < 0.10) {
++version;
@ -7129,14 +7393,12 @@ TEST_CASE("!/redwood/correctness/btree") {
}
// Commit at end or after this commit's mutation bytes are reached
if (mutationBytes.get() >= mutationBytesTarget || mutationBytesThisCommit >= mutationBytesTargetThisCommit) {
if (totalPageOps >= targetPageOps || mutationBytesThisCommit >= mutationBytesTargetThisCommit) {
// Wait for previous commit to finish
wait(commit);
printf("Committed. Next commit %d bytes, %" PRId64
"/%d (%.2f%%) Stats: Insert %.2f MB/s ClearedKeys %.2f MB/s Total %.2f\n",
mutationBytesThisCommit, mutationBytes.get(), mutationBytesTarget,
(double)mutationBytes.get() / mutationBytesTarget * 100,
(keyBytesInserted.rate() + valueBytesInserted.rate()) / 1e6, keyBytesCleared.rate() / 1e6,
printf("Committed. Next commit %d bytes, %" PRId64 " bytes.", mutationBytesThisCommit, mutationBytes.get());
printf(" Stats: Insert %.2f MB/s ClearedKeys %.2f MB/s Total %.2f\n",
(keyBytesInserted.rate() + valueBytesInserted.rate()) / 1e6, keyBytesCleared.rate() / 1e6,
mutationBytes.rate() / 1e6);
Version v = version; // Avoid capture of version as a member of *this
@ -7149,8 +7411,12 @@ TEST_CASE("!/redwood/correctness/btree") {
btree->getOldestVersion() + 1));
}
commit = map(btree->commit(), [=](Void) {
commit = map(btree->commit(), [=,&ops=totalPageOps](Void) {
// Update pager ops before clearing metrics
ops += g_redwoodMetrics.pageOps();
printf("PageOps %" PRId64 "/%" PRId64 " (%.2f%%)\n", ops, targetPageOps, ops * 100.0 / targetPageOps);
printf("Committed:\n%s\n", g_redwoodMetrics.toString(true).c_str());
// Notify the background verifier that version is committed and therefore readable
committedVersions.send(v);
return Void();
@ -7200,6 +7466,7 @@ TEST_CASE("!/redwood/correctness/btree") {
committedVersions = PromiseStream<Version>();
verifyTask = verify(btree, committedVersions.getFuture(), &written, &errorCount, serialTest);
randomTask = randomReader(btree) || btree->getError();
committedVersions.send(v);
}
version += versionIncrement;
@ -7207,7 +7474,7 @@ TEST_CASE("!/redwood/correctness/btree") {
}
// Check for errors
if (errorCount != 0) throw internal_error();
ASSERT(errorCount == 0);
}
debug_printf("Waiting for outstanding commit\n");
@ -7218,11 +7485,18 @@ TEST_CASE("!/redwood/correctness/btree") {
wait(verifyTask);
// Check for errors
if (errorCount != 0) throw internal_error();
ASSERT(errorCount == 0);
wait(btree->destroyAndCheckSanity());
// Reopen pager and btree with a remap cleanup window of 0 to reclaim all old pages
state Future<Void> closedFuture = btree->onClosed();
btree->close();
wait(closedFuture);
btree = new VersionedBTree(new DWALPager(pageSize, pagerFile, cacheSizeBytes, 0), pagerFile);
wait(btree->init());
Future<Void> closedFuture = btree->onClosed();
wait(btree->clearAllAndCheckSanity());
closedFuture = btree->onClosed();
btree->close();
debug_printf("Closing.\n");
wait(closedFuture);
@ -7328,7 +7602,7 @@ TEST_CASE("!/redwood/performance/set") {
state int minValueSize = 100;
state int maxValueSize = 500;
state int minConsecutiveRun = 1;
state int maxConsecutiveRun = 10;
state int maxConsecutiveRun = 100000;
state char firstKeyChar = 'a';
state char lastKeyChar = 'm';
state Version remapCleanupWindow = SERVER_KNOBS->REDWOOD_REMAP_CLEANUP_WINDOW;