commit 61fcce08cc
@@ -35,6 +35,7 @@ import com.apple.foundationdb.Transaction;
  *
  */
 public class ByteArrayUtil extends FastByteComparisons {
+    private static final byte[] EMPTY_BYTES = new byte[0];

     /**
      * Joins a set of byte arrays into a larger array. The {@code interlude} is placed
@@ -45,36 +46,46 @@ public class ByteArrayUtil extends FastByteComparisons {
      *  concatenated elements.
      * @param parts the pieces to be joined. May be {@code null}, but does not allow
      *  for elements in the list to be {@code null}.
      *
      *
      * @return a newly created concatenation of the input
      */
     public static byte[] join(byte[] interlude, List<byte[]> parts) {
+        return interludeJoin(interlude, parts.toArray(new byte[0][]));
+    }
+    /**
+     * Joins a set of byte arrays into a larger array. The {@code interlude} is placed
+     * between each of the elements, but not at the beginning or end. In the case that
+     * the list is empty or {@code null}, a zero-length byte array will be returned.
+     *
+     * @param interlude can be {@code null} or zero length. Placed internally between
+     *  concatenated elements.
+     * @param parts the pieces to be joined. May be {@code null}, but does not allow
+     *  for elements in the array to be {@code null}.
+     *
+     * @return a newly created concatenation of the input
+     */
+    public static byte[] interludeJoin(byte[] interlude, byte[][] parts) {
         if(parts == null)
             return new byte[0];
-        int partCount = parts.size();
+        int partCount = parts.length;
         if(partCount == 0)
-            return new byte[0];
+            return EMPTY_BYTES;

         if(interlude == null)
-            interlude = new byte[0];
+            interlude = EMPTY_BYTES;

         int elementTotals = 0;
         int interludeSize = interlude.length;
-        for(byte[] e : parts) {
-            elementTotals += e.length;
+        for (int i = 0; i < partCount; i++) {
+            elementTotals += parts[i].length;
         }

         byte[] dest = new byte[(interludeSize * (partCount - 1)) + elementTotals];

         //System.out.println(" interlude -> " + ArrayUtils.printable(interlude));

         int startByte = 0;
         int index = 0;
-        for(byte[] part : parts) {
             //System.out.println(" section -> " + ArrayUtils.printable(parts.get(i)));
-            int length = part.length;
+        for (int i = 0; i < partCount; i++) {
+            int length = parts[i].length;
             if(length > 0) {
-                System.arraycopy(part, 0, dest, startByte, length);
+                System.arraycopy(parts[i], 0, dest, startByte, length);
                 startByte += length;
             }
             if(index < partCount - 1 && interludeSize > 0) {
@@ -84,8 +95,6 @@ public class ByteArrayUtil extends FastByteComparisons {
             }
             index++;
         }

         //System.out.println(" complete -> " + ArrayUtils.printable(dest));
         return dest;
     }

@@ -97,7 +106,7 @@ public class ByteArrayUtil extends FastByteComparisons {
      * @return a newly created concatenation of the input
      */
     public static byte[] join(byte[]... parts) {
-        return join(null, Arrays.asList(parts));
+        return interludeJoin(null, parts);
     }

     /**
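Note: a minimal usage sketch of the join variants touched above, assuming ByteArrayUtil lives in com.apple.foundationdb.tuple as in the Java bindings; the JoinExample wrapper class and the literal inputs are illustrative only, and the expected results follow the javadoc in the hunk.

import java.util.Arrays;
import java.util.List;

import com.apple.foundationdb.tuple.ByteArrayUtil;

public class JoinExample {
    public static void main(String[] args) {
        byte[] comma = { ',' };
        List<byte[]> parts = Arrays.asList(new byte[] { 'a' }, new byte[] { 'b' }, new byte[] { 'c' });

        // List-based overload; after this change it simply forwards to interludeJoin.
        byte[] joined = ByteArrayUtil.join(comma, parts);

        // New array-based method: same result without building a List first.
        byte[] joined2 = ByteArrayUtil.interludeJoin(comma, new byte[][] { { 'a' }, { 'b' }, { 'c' } });

        // Varargs overload concatenates with no interlude.
        byte[] concat = ByteArrayUtil.join(new byte[] { 'a' }, new byte[] { 'b' });

        System.out.println(new String(joined));              // a,b,c
        System.out.println(Arrays.equals(joined, joined2));  // true
        System.out.println(new String(concat));              // ab
    }
}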
@@ -74,6 +74,14 @@ services:
     <<: *snapshot-bindings-cmake


+  snapshot-cmake: &snapshot-testpackages
+    <<: *build-setup
+    command: scl enable devtoolset-8 rh-python36 rh-ruby24 -- bash -c 'mkdir -p "$${BUILD_DIR}" && cd "$${BUILD_DIR}" && cmake -G "Ninja" -DFDB_RELEASE=0 /__this_is_some_very_long_name_dir_needed_to_fix_a_bug_with_debug_rpms__/foundationdb && ninja -v -j "$${MAKEJOBS}"'
+
+  prb-testpackages:
+    <<: *snapshot-testpackages
+
+
   snapshot-ctest: &snapshot-ctest
     <<: *build-setup
     command: scl enable devtoolset-8 rh-python36 rh-ruby24 -- bash -c 'mkdir -p "$${BUILD_DIR}" && cd "$${BUILD_DIR}" && cmake -G "Ninja" -DFDB_RELEASE=1 /__this_is_some_very_long_name_dir_needed_to_fix_a_bug_with_debug_rpms__/foundationdb && ninja -v -j "$${MAKEJOBS}" && ctest -L fast -j "$${MAKEJOBS}" --output-on-failure'
@@ -883,7 +883,7 @@ public:
 	}
 	TraceEvent t(SevWarn, "FileBackupError");
 	t.error(e).detail("BackupUID", uid).detail("Description", details).detail("TaskInstance", (uint64_t)taskInstance);
-	// These should not happen
+	// key_not_found could happen
 	if(e.code() == error_code_key_not_found)
 		t.backtrace();
@@ -235,7 +235,7 @@ public:
 	}
 	TraceEvent t(SevWarn, "FileRestoreError");
 	t.error(e).detail("RestoreUID", uid).detail("Description", details).detail("TaskInstance", (uint64_t)taskInstance);
-	// These should not happen
+	// key_not_found could happen
 	if(e.code() == error_code_key_not_found)
 		t.backtrace();
@@ -629,6 +629,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
 	init( REDWOOD_DEFAULT_PAGE_SIZE, 4096 );
 	init( REDWOOD_KVSTORE_CONCURRENT_READS, 64 );
+	init( REDWOOD_COMMIT_CONCURRENT_READS, 64 );
 	init( REDWOOD_PAGE_REBUILD_FILL_FACTOR, 0.66 );
 	init( REDWOOD_LAZY_CLEAR_BATCH_SIZE_PAGES, 10 );
 	init( REDWOOD_LAZY_CLEAR_MIN_PAGES, 0 );
@@ -561,6 +561,7 @@ public:
 	int REDWOOD_DEFAULT_PAGE_SIZE; // Page size for new Redwood files
 	int REDWOOD_KVSTORE_CONCURRENT_READS; // Max number of simultaneous point or range reads in progress.
+	int REDWOOD_COMMIT_CONCURRENT_READS; // Max number of concurrent reads done to support commit operations
 	double REDWOOD_PAGE_REBUILD_FILL_FACTOR; // When rebuilding pages, start a new page after this capacity
 	int REDWOOD_LAZY_CLEAR_BATCH_SIZE_PAGES; // Number of pages to try to pop from the lazy delete queue and process at once
 	int REDWOOD_LAZY_CLEAR_MIN_PAGES; // Minimum number of pages to free before ending a lazy clear cycle, unless the queue is empty
@@ -213,21 +213,6 @@ ACTOR static Future<Void> applyClearRangeMutations(Standalone<VectorRef<KeyRange
 	return Void();
 }

-ACTOR Future<Optional<Value>> getValue(Reference<ReadYourWritesTransaction> tr, Key key, int i,
-                                       std::set<int>* keysNotFound) {
-	try {
-		Optional<Value> v = wait(tr->get(key));
-		return v;
-	} catch (Error& e) {
-		if (e.code() == error_code_key_not_found) {
-			keysNotFound->insert(i);
-			return Optional<Value>();
-		} else {
-			throw;
-		}
-	}
-}
-
 // Get keys in incompleteStagingKeys and precompute the stagingKey which is stored in batchData->stagingKeys
 ACTOR static Future<Void> getAndComputeStagingKeys(
     std::map<Key, std::map<Key, StagingKey>::iterator> incompleteStagingKeys, double delayTime, Database cx,
@@ -243,28 +228,23 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
 	    .detail("BatchIndex", batchIndex)
 	    .detail("GetKeys", incompleteStagingKeys.size())
 	    .detail("DelayTime", delayTime);
-	state std::set<int> keysNotFound;

-	state int i = 0;
 	loop {
 		try {
+			int i = 0;
 			tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
 			tr->setOption(FDBTransactionOptions::LOCK_AWARE);
-			i = 0;
 			for (auto& key : incompleteStagingKeys) {
-				if (!keysNotFound.count(i)) { // only get exist-keys
-					fValues[i] = getValue(tr, key.first, i, &keysNotFound);
-				}
-				++i;
+				fValues[i++] = tr->get(key.first);
 			}
 			wait(waitForAll(fValues));
 			break;
 		} catch (Error& e) {
-			bool ok = (e.code() != error_code_key_not_found);
-			if (!ok || retries++ > incompleteStagingKeys.size()) {
-				TraceEvent(!ok ? SevError : SevWarnAlways, "GetAndComputeStagingKeys", applierID)
+			if (retries++ > incompleteStagingKeys.size()) {
+				TraceEvent(SevWarnAlways, "GetAndComputeStagingKeys", applierID)
				    .suppressFor(1.0)
				    .detail("RandomUID", randomID)
				    .detail("BatchIndex", batchIndex)
-				    .detail("KeyIndex", i)
				    .error(e);
 			}
 			wait(tr->onError(e));
@@ -274,7 +254,7 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
 	ASSERT(fValues.size() == incompleteStagingKeys.size());
 	int i = 0;
 	for (auto& key : incompleteStagingKeys) {
-		if (keysNotFound.count(i) || (!fValues[i].get().present())) { // Key not exist in DB
+		if (!fValues[i].get().present()) { // Key not exist in DB
 			// if condition: fValues[i].Valid() && fValues[i].isReady() && !fValues[i].isError() &&
 			TraceEvent(SevWarn, "FastRestoreApplierGetAndComputeStagingKeysNoBaseValueInDB", applierID)
 			    .detail("BatchIndex", batchIndex)
@@ -122,7 +122,7 @@ Future<Void> RestoreConfigFR::logError(Database cx, Error e, std::string const&
 	}
 	TraceEvent t(SevWarn, "FileRestoreError");
 	t.error(e).detail("RestoreUID", uid).detail("Description", details).detail("TaskInstance", (uint64_t)taskInstance);
-	// These should not happen
+	// key_not_found could happen
 	if (e.code() == error_code_key_not_found) t.backtrace();

 	return updateErrorInfo(cx, e, details);
@@ -2993,7 +2993,8 @@ public:

 	VersionedBTree(IPager2* pager, std::string name)
 	  : m_pager(pager), m_writeVersion(invalidVersion), m_lastCommittedVersion(invalidVersion), m_pBuffer(nullptr),
-	    m_name(name) {
+	    m_commitReadLock(SERVER_KNOBS->REDWOOD_COMMIT_CONCURRENT_READS), m_name(name) {

 		m_lazyClearActor = 0;
 		m_init = init_impl(this);
 		m_latestCommit = m_init;
@@ -3440,6 +3441,7 @@ private:
 	Version m_writeVersion;
 	Version m_lastCommittedVersion;
 	Version m_newOldestVersion;
+	FlowLock m_commitReadLock;
 	Future<Void> m_latestCommit;
 	Future<Void> m_init;
 	std::string m_name;
@@ -4131,8 +4133,13 @@ private:
 		}

 		state Version writeVersion = self->getLastCommittedVersion() + 1;
+
+		wait(self->m_commitReadLock.take());
+		state FlowLock::Releaser readLock(self->m_commitReadLock);
 		state Reference<const IPage> page =
 		    wait(readPage(snapshot, rootID, update->decodeLowerBound, update->decodeUpperBound));
+		readLock.release();
+
 		state BTreePage* btPage = (BTreePage*)page->begin();
 		ASSERT(isLeaf == btPage->isLeaf());
 		g_redwoodMetrics.level(btPage->height).pageCommitStart += 1;
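Note: the three Redwood hunks above size a new FlowLock from the REDWOOD_COMMIT_CONCURRENT_READS knob and hold it only across the readPage call during commit, so at most that many page reads are in flight at once. A rough analogy of that take/release pattern in plain Java (java.util.concurrent.Semaphore); the class and method names below are made up for illustration and are not FoundationDB code.

import java.util.concurrent.Semaphore;

public class BoundedCommitReads {
    // Stand-in for SERVER_KNOBS->REDWOOD_COMMIT_CONCURRENT_READS (initialized to 64 in the diff).
    private static final int COMMIT_CONCURRENT_READS = 64;

    private final Semaphore commitReadLock = new Semaphore(COMMIT_CONCURRENT_READS);

    byte[] readPageDuringCommit(long pageId) throws InterruptedException {
        commitReadLock.acquire();        // analogous to wait(m_commitReadLock.take())
        try {
            return readPage(pageId);     // only the read itself is guarded
        } finally {
            commitReadLock.release();    // analogous to readLock.release()
        }
    }

    private byte[] readPage(long pageId) {
        // Placeholder for the actual page read.
        return new byte[0];
    }
}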