Merge pull request #2308 from etschannen/master

Merge 6.2 into master
Evan Tschannen 2019-11-01 15:54:32 -07:00 committed by GitHub
commit b326d26a47
12 changed files with 93 additions and 48 deletions

View File

@ -30,7 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger;
/**
* The starting point for accessing FoundationDB.
* <br>
* <h3>Setting API version</h3>
* <h2>Setting API version</h2>
* The FoundationDB API is accessed with a call to {@link #selectAPIVersion(int)}.
* This call is required before using any other part of the API. The call allows
* an error to be thrown at this point to prevent client code from accessing a later library
@ -49,11 +49,11 @@ import java.util.concurrent.atomic.AtomicInteger;
* being used to connect to the cluster. In particular, you should not advance
* the API version of your application after upgrading your client until the
* cluster has also been upgraded.<br>
* <h3>Getting a database</h3>
* <h2>Getting a database</h2>
* Once the API version has been set, the easiest way to get a {@link Database} object to use is
* to call {@link #open}.
* <br>
* <h3>Client networking</h3>
* <h2>Client networking</h2>
* The network is started either implicitly with a call to a variant of {@link #open()}
* or started explicitly with a call to {@link #startNetwork()}.
* <br>
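
As a point of reference for the Javadoc above, here is a minimal sketch of the usage pattern it describes. The API version 620, the class name, and the key/value literals are illustrative choices, not taken from this commit:

import com.apple.foundationdb.Database;
import com.apple.foundationdb.FDB;
import com.apple.foundationdb.tuple.Tuple;

public class FdbHello {
    public static void main(String[] args) {
        // Must be called before any other use of the API; the network is then
        // started implicitly by the first call to open().
        FDB fdb = FDB.selectAPIVersion(620);

        // open() with no arguments uses the default cluster file.
        try (Database db = fdb.open()) {
            // run() retries the transaction body on retryable errors.
            db.run(tr -> {
                tr.set(Tuple.from("hello").pack(), Tuple.from("world").pack());
                return null;
            });
        }
    }
}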

View File

@ -39,7 +39,7 @@ import com.apple.foundationdb.Range;
* the same order in which they would sort in FoundationDB. {@code Tuple}s sort
* first by the first element, then by the second, etc. This makes the tuple layer
* ideal for building a variety of higher-level data models.<br>
* <h3>Types</h3>
* <h2>Types</h2>
* A {@code Tuple} can
* contain byte arrays ({@code byte[]}), {@link String}s, {@link Number}s, {@link UUID}s,
* {@code boolean}s, {@link List}s, {@link Versionstamp}s, other {@code Tuple}s, and {@code null}.
@ -50,7 +50,7 @@ import com.apple.foundationdb.Range;
* a {@code long} integral value, so the range will be constrained to
* [{@code -2^63}, {@code 2^63-1}]. Note that for numbers outside this range the way that Java
* truncates integral values may yield unexpected results.<br>
* <h3>{@code null} values</h3>
* <h2>{@code null} values</h2>
* The FoundationDB tuple specification has a special type-code for {@code None}; {@code nil}; or,
* as Java would understand it, {@code null}.
* The behavior of the layer in the presence of {@code null} varies by type with the intention
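
To make the type handling and null handling described above concrete, a small illustrative sketch (the tuple contents are arbitrary):

import com.apple.foundationdb.tuple.Tuple;

public class TupleExample {
    public static void main(String[] args) {
        // A mixed-type tuple; integral values are always handled as longs.
        Tuple t = Tuple.from("users", 42L, true, null);

        // pack() produces an order-preserving byte encoding of the tuple.
        byte[] key = t.pack();

        // fromBytes() reverses the encoding.
        Tuple decoded = Tuple.fromBytes(key);
        System.out.println(decoded.getString(0));   // users
        System.out.println(decoded.getLong(1));     // 42
        System.out.println(decoded.getBoolean(2));  // true
        // Index 3 decodes back to null, matching the tuple spec's None type-code.
        System.out.println(decoded.get(3));         // null
    }
}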

View File

@ -2,7 +2,7 @@
<BODY>
This documents the client API for using FoundationDB from Java.<br>
<br>
<h3>Installation</h3>
<h1>Installation</h1>
FoundationDB's Java bindings rely on native libraries that are installed as part of the
FoundationDB client binaries installation (see
<a href="/foundationdb/api-general.html#installing-client-binaries" target="_blank">
@ -10,7 +10,7 @@ Installing FoundationDB client binaries</a>). The JAR can be downloaded from
<a href="https://www.foundationdb.org/download/">our website</a>
and then added to your classpath.<br>
<br>
<h3>Getting started</h3>
<h1>Getting started</h1>
To start using FoundationDB from Java, create an instance of the
{@link com.apple.foundationdb.FDB FoundationDB API interface} with the version of the
API that you want to use (this release of the FoundationDB Java API supports versions between {@code 510} and {@code 620}).
@ -50,7 +50,7 @@ public class Example {
}
}
</pre>
<h3>FoundationDB {@link com.apple.foundationdb.tuple Tuple API}</h3>
<h1>FoundationDB {@link com.apple.foundationdb.tuple Tuple API}</h1>
The {@link com.apple.foundationdb.tuple Tuple API} is provided with the core Java API for FoundationDB.
This layer is provided in some form in all official language bindings. It enables
cross-language support for storing and retrieving typed data from the
@ -60,7 +60,7 @@ binary data that FoundationDB supports. And, just as importantly, data packed in
and <a href="/foundationdb/data-modeling.html#data-modeling-tuples">general Tuple documentation</a>
for information about how Tuples sort and can be used to efficiently model data.
<br>
<h3>FoundationDB {@link com.apple.foundationdb.directory Directory API}</h3>
<h1>FoundationDB {@link com.apple.foundationdb.directory Directory API}</h1>
The {@link com.apple.foundationdb.directory Directory API} is provided with the core
Java API for FoundationDB. This layer is provided in some form in all official
language bindings. The FoundationDB API provides directories as a tool for
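
As a hedged illustration of the Directory API referenced above: the path elements below are placeholders, and DirectoryLayer.getDefault() is assumed as the entry point.

import java.util.Arrays;

import com.apple.foundationdb.Database;
import com.apple.foundationdb.FDB;
import com.apple.foundationdb.directory.DirectoryLayer;
import com.apple.foundationdb.directory.DirectorySubspace;
import com.apple.foundationdb.tuple.Tuple;

public class DirectoryExample {
    public static void main(String[] args) {
        FDB fdb = FDB.selectAPIVersion(620);
        try (Database db = fdb.open()) {
            // createOrOpen returns a subspace whose short allocated prefix
            // stands in for the human-readable path in every stored key.
            DirectorySubspace users = DirectoryLayer.getDefault()
                    .createOrOpen(db, Arrays.asList("example-app", "users"))
                    .join();

            db.run(tr -> {
                tr.set(users.pack(Tuple.from(1L, "name")), Tuple.from("alice").pack());
                return null;
            });
        }
    }
}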

View File

@ -2,6 +2,15 @@
Release Notes
#############
6.2.8
=====
Fixes
-----
* Significantly improved the rate at which the transaction logs in a remote region can pull data from the primary region. `(PR #2307) <https://github.com/apple/foundationdb/pull/2307>`_.
* The ``system_kv_size_bytes`` status field could report a size much larger than the actual size of the system keyspace. `(PR #2305) <https://github.com/apple/foundationdb/pull/2305>`_.
6.2.7
=====
@ -133,7 +142,6 @@ Fixes only impacting 6.2.0+
* The cluster controller would saturate its CPU for a few seconds when sending configuration information to all of the worker processes. [6.2.4] `(PR #2086) <https://github.com/apple/foundationdb/pull/2086>`_.
* The data distributor would build all possible team combinations if it was tracking an unhealthy server with less than 10 teams. [6.2.4] `(PR #2099) <https://github.com/apple/foundationdb/pull/2099>`_.
* The cluster controller could crash if a coordinator was unreachable when compiling cluster status. [6.2.4] `(PR #2065) <https://github.com/apple/foundationdb/pull/2065>`_.
* The cluster controller could crash if a coordinator was unreachable when compiling cluster status. [6.2.4] `(PR #2065) <https://github.com/apple/foundationdb/pull/2065>`_.
* A storage server could crash if it took longer than 10 minutes to fetch a key range from another server. [6.2.5] `(PR #2170) <https://github.com/apple/foundationdb/pull/2170>`_.
* Excluding or including servers would restart the data distributor. [6.2.5] `(PR #2170) <https://github.com/apple/foundationdb/pull/2170>`_.
* The data distributor could read invalid memory when estimating database size. [6.2.6] `(PR #2225) <https://github.com/apple/foundationdb/pull/2225>`_.

View File

@ -913,7 +913,7 @@ void printBackupContainerInfo() {
static void printBackupUsage(bool devhelp) {
printf("FoundationDB " FDB_VT_PACKAGE_NAME " (v" FDB_VT_VERSION ")\n");
printf("Usage: %s (start | status | abort | wait | discontinue | pause | resume | expire | delete | describe | list) [OPTIONS]\n\n", exeBackup.toString().c_str());
printf("Usage: %s (start | status | abort | wait | discontinue | pause | resume | expire | delete | describe | list | cleanup) [OPTIONS]\n\n", exeBackup.toString().c_str());
printf(" -C CONNFILE The path of a file containing the connection string for the\n"
" FoundationDB cluster. The default is first the value of the\n"
" FDB_CLUSTER_FILE environment variable, then `./fdb.cluster',\n"
@ -964,6 +964,11 @@ static void printBackupUsage(bool devhelp) {
printf(" --trace_format FORMAT\n"
" Select the format of the trace files. xml (the default) and json are supported.\n"
" Has no effect unless --log is specified.\n");
printf(" --max_cleanup_seconds SECONDS\n"
" Specifies the amount of time a backup or DR needs to be stale before cleanup will\n"
" remove mutations for it. By default this is set to one hour.\n");
printf(" --delete_data\n"
" This flag will cause cleanup to remove mutations for the most stale backup or DR.\n");
#ifndef TLS_DISABLED
printf(TLS_HELP);
#endif

View File

@ -862,29 +862,33 @@ ACTOR Future<Void> cleanupLogMutations(Database cx, Value destUidValue, bool del
wait(success(foundDRKey) && success(foundBackupKey));
if(foundDRKey.get().present() && foundBackupKey.get().present()) {
printf("WARNING: Found a tag which looks like both a backup and a DR. This tag was %.4f hours behind.\n", (readVer - currVersion)/(3600.0*CLIENT_KNOBS->CORE_VERSIONSPERSECOND));
printf("WARNING: Found a tag that looks like both a backup and a DR. This tag is %.4f hours behind.\n", (readVer - currVersion)/(3600.0*CLIENT_KNOBS->CORE_VERSIONSPERSECOND));
} else if(foundDRKey.get().present() && !foundBackupKey.get().present()) {
printf("Found a DR which was %.4f hours behind.\n", (readVer - currVersion)/(3600.0*CLIENT_KNOBS->CORE_VERSIONSPERSECOND));
printf("Found a DR that is %.4f hours behind.\n", (readVer - currVersion)/(3600.0*CLIENT_KNOBS->CORE_VERSIONSPERSECOND));
} else if(!foundDRKey.get().present() && foundBackupKey.get().present()) {
printf("Found a Backup which was %.4f hours behind.\n", (readVer - currVersion)/(3600.0*CLIENT_KNOBS->CORE_VERSIONSPERSECOND));
printf("Found a Backup that is %.4f hours behind.\n", (readVer - currVersion)/(3600.0*CLIENT_KNOBS->CORE_VERSIONSPERSECOND));
} else {
printf("WARNING: Found a unknown tag which was %.4f hours behind.\n", (readVer - currVersion)/(3600.0*CLIENT_KNOBS->CORE_VERSIONSPERSECOND));
printf("WARNING: Found an unknown tag that is %.4f hours behind.\n", (readVer - currVersion)/(3600.0*CLIENT_KNOBS->CORE_VERSIONSPERSECOND));
}
loggedLogUids.insert(currLogUid);
}
}
if( readVer - minVersion > CLIENT_KNOBS->MIN_CLEANUP_SECONDS*CLIENT_KNOBS->CORE_VERSIONSPERSECOND && deleteData && (!removingLogUid.present() || minVersionLogUid == removingLogUid.get()) ) {
if(deleteData) {
if(readVer - minVersion > CLIENT_KNOBS->MIN_CLEANUP_SECONDS*CLIENT_KNOBS->CORE_VERSIONSPERSECOND && (!removingLogUid.present() || minVersionLogUid == removingLogUid.get())) {
removingLogUid = minVersionLogUid;
wait(eraseLogData(tr, minVersionLogUid, destUidValue));
wait(tr->commit());
printf("\nSuccessfully removed the tag which was %.4f hours behind.\n", (readVer - minVersion)/(3600.0*CLIENT_KNOBS->CORE_VERSIONSPERSECOND));
printf("\nSuccessfully removed the tag that was %.4f hours behind.\n\n", (readVer - minVersion)/(3600.0*CLIENT_KNOBS->CORE_VERSIONSPERSECOND));
} else if(removingLogUid.present() && minVersionLogUid != removingLogUid.get()) {
printf("\nWARNING: The oldest tag was possibly removed, run again without `--delete_data' to check.\n");
} else if( deleteData ) {
printf("\nWARNING: Did not delete data because the tag was not at least %.4f hours behind. Change `--min_cleanup_seconds' to adjust this threshold.\n", CLIENT_KNOBS->MIN_CLEANUP_SECONDS/3600.0);
printf("\nWARNING: The oldest tag was possibly removed, run again without `--delete_data' to check.\n\n");
} else {
printf("\nPassing `--delete_data' would delete the tag which was %.4f hours behind.\n", (readVer - minVersion)/(3600.0*CLIENT_KNOBS->CORE_VERSIONSPERSECOND));
printf("\nWARNING: Did not delete data because the tag is not at least %.4f hours behind. Change `--min_cleanup_seconds' to adjust this threshold.\n\n", CLIENT_KNOBS->MIN_CLEANUP_SECONDS/3600.0);
}
} else if(readVer - minVersion > CLIENT_KNOBS->MIN_CLEANUP_SECONDS*CLIENT_KNOBS->CORE_VERSIONSPERSECOND) {
printf("\nPassing `--delete_data' would delete the tag that is %.4f hours behind.\n\n", (readVer - minVersion)/(3600.0*CLIENT_KNOBS->CORE_VERSIONSPERSECOND));
} else {
printf("\nPassing `--delete_data' would not delete the tag that is %.4f hours behind. Change `--min_cleanup_seconds' to adjust the cleanup threshold.\n\n", (readVer - minVersion)/(3600.0*CLIENT_KNOBS->CORE_VERSIONSPERSECOND));
}
return Void();
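
For readers interpreting the "hours behind" messages above: versions normally advance at roughly one million per second of wall-clock time (CLIENT_KNOBS->CORE_VERSIONSPERSECOND), so the conversion the tool performs is simple arithmetic. A small illustrative sketch, with the rate hard-coded rather than read from the knobs:

public class StalenessExample {
    // Mirrors CLIENT_KNOBS->CORE_VERSIONSPERSECOND: versions advance at
    // roughly one million per second of wall-clock time.
    static final double VERSIONS_PER_SECOND = 1e6;

    static double hoursBehind(long readVersion, long tagVersion) {
        return (readVersion - tagVersion) / (3600.0 * VERSIONS_PER_SECOND);
    }

    public static void main(String[] args) {
        // A tag whose version trails the read version by 3.6 billion versions
        // is about one hour behind, which is the default cleanup threshold.
        System.out.printf("%.4f hours behind%n", hoursBehind(7_200_000_000L, 3_600_000_000L));
    }
}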

View File

@ -443,6 +443,7 @@ Future<Void> shardMerger(
bool forwardComplete = false;
KeyRangeRef merged;
StorageMetrics endingStats = shardSize->get().get();
int64_t systemBytes = keys.begin >= systemKeys.begin ? shardSize->get().get().bytes : 0;
loop {
Optional<StorageMetrics> newMetrics;
@ -480,6 +481,9 @@ Future<Void> shardMerger(
merged = KeyRangeRef( prevIter->range().begin, nextIter->range().end );
endingStats += newMetrics.get();
if((forwardComplete ? prevIter->range().begin : nextIter->range().begin) >= systemKeys.begin) {
systemBytes += newMetrics.get().bytes;
}
shardsMerged++;
auto shardBounds = getShardSizeBounds( merged, maxShardSize );
@ -498,6 +502,9 @@ Future<Void> shardMerger(
// If going forward, remove most recently added range
endingStats -= newMetrics.get();
if(nextIter->range().begin >= systemKeys.begin) {
systemBytes -= newMetrics.get().bytes;
}
shardsMerged--;
--nextIter;
merged = KeyRangeRef( prevIter->range().begin, nextIter->range().end );
@ -514,6 +521,9 @@ Future<Void> shardMerger(
.detail("EndingSize", endingStats.bytes)
.detail("BatchedMerges", shardsMerged);
if(mergeRange.begin < systemKeys.begin) {
self->systemSizeEstimate -= systemBytes;
}
restartShardTrackers( self, mergeRange, endingStats );
self->shardsAffectedByTeamFailure->defineShard( mergeRange );
self->output.send( RelocateShard( mergeRange, SERVER_KNOBS->PRIORITY_MERGE_SHARD ) );

View File

@ -337,6 +337,9 @@ ACTOR Future<Void> logRouterPeekMessages( LogRouterData* self, TLogPeekRequest r
try {
peekId = req.sequence.get().first;
sequence = req.sequence.get().second;
if (sequence >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && self->peekTracker.find(peekId) == self->peekTracker.end()) {
throw timed_out();
}
auto& trackerData = self->peekTracker[peekId];
if (sequence == 0 && trackerData.sequence_version.find(0) == trackerData.sequence_version.end()) {
trackerData.sequence_version[0].send(std::make_pair(req.begin, req.onlySpilled));

View File

@ -1719,7 +1719,7 @@ void removeLog( TLogData* self, Reference<LogData> logData ) {
}
}
ACTOR Future<Void> pullAsyncData( TLogData* self, Reference<LogData> logData, std::vector<Tag> tags, Version beginVersion, Optional<Version> endVersion, bool poppedIsKnownCommitted, bool parallelGetMore ) {
ACTOR Future<Void> pullAsyncData( TLogData* self, Reference<LogData> logData, std::vector<Tag> tags, Version beginVersion, Optional<Version> endVersion, bool poppedIsKnownCommitted ) {
state Future<Void> dbInfoChange = Void();
state Reference<ILogSystem::IPeekCursor> r;
state Version tagAt = beginVersion;
@ -1733,7 +1733,7 @@ ACTOR Future<Void> pullAsyncData( TLogData* self, Reference<LogData> logData, st
}
when( wait( dbInfoChange ) ) {
if( logData->logSystem->get() ) {
r = logData->logSystem->get()->peek( logData->logId, tagAt, endVersion, tags, parallelGetMore );
r = logData->logSystem->get()->peek( logData->logId, tagAt, endVersion, tags, true );
} else {
r = Reference<ILogSystem::IPeekCursor>();
}
@ -1870,7 +1870,7 @@ ACTOR Future<Void> tLogCore( TLogData* self, Reference<LogData> logData, TLogInt
if(!logData->isPrimary) {
std::vector<Tag> tags;
tags.push_back(logData->remoteTag);
logData->addActor.send( pullAsyncData(self, logData, tags, pulledRecoveryVersions ? logData->recoveredAt + 1 : logData->unrecoveredBefore, Optional<Version>(), true, true) );
logData->addActor.send( pullAsyncData(self, logData, tags, pulledRecoveryVersions ? logData->recoveredAt + 1 : logData->unrecoveredBefore, Optional<Version>(), true) );
}
try {
@ -2233,10 +2233,10 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
logData->logRouterPopToVersion = req.recoverAt;
std::vector<Tag> tags;
tags.push_back(logData->remoteTag);
wait(pullAsyncData(self, logData, tags, logData->unrecoveredBefore, req.recoverAt, true, false) || logData->removed);
wait(pullAsyncData(self, logData, tags, logData->unrecoveredBefore, req.recoverAt, true) || logData->removed);
} else if(!req.recoverTags.empty()) {
ASSERT(logData->unrecoveredBefore > req.knownCommittedVersion);
wait(pullAsyncData(self, logData, req.recoverTags, req.knownCommittedVersion + 1, req.recoverAt, false, true) || logData->removed);
wait(pullAsyncData(self, logData, req.recoverTags, req.knownCommittedVersion + 1, req.recoverAt, false) || logData->removed);
}
pulledRecoveryVersions = true;
logData->knownCommittedVersion = req.recoverAt;

View File

@ -2131,8 +2131,7 @@ void removeLog( TLogData* self, Reference<LogData> logData ) {
}
}
// copy data from old generation to new generation without deserializing
ACTOR Future<Void> pullAsyncData( TLogData* self, Reference<LogData> logData, std::vector<Tag> tags, Version beginVersion, Optional<Version> endVersion, bool poppedIsKnownCommitted, bool parallelGetMore ) {
ACTOR Future<Void> pullAsyncData( TLogData* self, Reference<LogData> logData, std::vector<Tag> tags, Version beginVersion, Optional<Version> endVersion, bool poppedIsKnownCommitted ) {
state Future<Void> dbInfoChange = Void();
state Reference<ILogSystem::IPeekCursor> r;
state Version tagAt = beginVersion;
@ -2150,7 +2149,7 @@ ACTOR Future<Void> pullAsyncData( TLogData* self, Reference<LogData> logData, st
}
when( wait( dbInfoChange ) ) {
if( logData->logSystem->get() ) {
r = logData->logSystem->get()->peek( logData->logId, tagAt, endVersion, tags, parallelGetMore );
r = logData->logSystem->get()->peek( logData->logId, tagAt, endVersion, tags, true );
} else {
r = Reference<ILogSystem::IPeekCursor>();
}
@ -2287,7 +2286,7 @@ ACTOR Future<Void> tLogCore( TLogData* self, Reference<LogData> logData, TLogInt
if(!logData->isPrimary) {
std::vector<Tag> tags;
tags.push_back(logData->remoteTag);
logData->addActor.send( pullAsyncData(self, logData, tags, pulledRecoveryVersions ? logData->recoveredAt + 1 : logData->unrecoveredBefore, Optional<Version>(), true, true) );
logData->addActor.send( pullAsyncData(self, logData, tags, pulledRecoveryVersions ? logData->recoveredAt + 1 : logData->unrecoveredBefore, Optional<Version>(), true) );
}
try {
@ -2682,10 +2681,10 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
logData->logRouterPopToVersion = req.recoverAt;
std::vector<Tag> tags;
tags.push_back(logData->remoteTag);
wait(pullAsyncData(self, logData, tags, logData->unrecoveredBefore, req.recoverAt, true, false) || logData->removed);
wait(pullAsyncData(self, logData, tags, logData->unrecoveredBefore, req.recoverAt, true) || logData->removed);
} else if(!req.recoverTags.empty()) {
ASSERT(logData->unrecoveredBefore > req.knownCommittedVersion);
wait(pullAsyncData(self, logData, req.recoverTags, req.knownCommittedVersion + 1, req.recoverAt, false, true) || logData->removed);
wait(pullAsyncData(self, logData, req.recoverTags, req.knownCommittedVersion + 1, req.recoverAt, false) || logData->removed);
}
pulledRecoveryVersions = true;
logData->knownCommittedVersion = req.recoverAt;

View File

@ -553,21 +553,21 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
if(bestSet == -1) {
TraceEvent("TLogPeekRemoteNoBestSet", dbgid).detail("Tag", tag.toString()).detail("Begin", begin);
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, false ) );
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, parallelGetMore ) );
}
if(begin >= lastBegin) {
TraceEvent("TLogPeekRemoteBestOnly", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("BestSet", bestSet).detail("BestSetStart", lastBegin).detail("LogRouterIds", tLogs[bestSet]->logRouterString());
return Reference<ILogSystem::MergedPeekCursor>( new ILogSystem::MergedPeekCursor( tLogs[bestSet]->logRouters, -1, (int)tLogs[bestSet]->logRouters.size(), tag, begin, getPeekEnd(), false, std::vector<LocalityData>(), Reference<IReplicationPolicy>(), 0 ) );
return Reference<ILogSystem::MergedPeekCursor>( new ILogSystem::MergedPeekCursor( tLogs[bestSet]->logRouters, -1, (int)tLogs[bestSet]->logRouters.size(), tag, begin, getPeekEnd(), parallelGetMore, std::vector<LocalityData>(), Reference<IReplicationPolicy>(), 0 ) );
} else {
std::vector< Reference<ILogSystem::IPeekCursor> > cursors;
std::vector< LogMessageVersion > epochEnds;
TraceEvent("TLogPeekRemoteAddingBest", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("BestSet", bestSet).detail("BestSetStart", lastBegin).detail("LogRouterIds", tLogs[bestSet]->logRouterString());
cursors.emplace_back(new ILogSystem::MergedPeekCursor( tLogs[bestSet]->logRouters, -1, (int)tLogs[bestSet]->logRouters.size(), tag, lastBegin, getPeekEnd(), false, std::vector<LocalityData>(), Reference<IReplicationPolicy>(), 0 ) );
cursors.emplace_back(new ILogSystem::MergedPeekCursor( tLogs[bestSet]->logRouters, -1, (int)tLogs[bestSet]->logRouters.size(), tag, lastBegin, getPeekEnd(), parallelGetMore, std::vector<LocalityData>(), Reference<IReplicationPolicy>(), 0 ) );
int i = 0;
while(begin < lastBegin) {
if(i == oldLogData.size()) {
TraceEvent("TLogPeekRemoteDead", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("LastBegin", lastBegin).detail("OldLogDataSize", oldLogData.size());
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, false ) );
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, parallelGetMore ) );
}
int bestOldSet = -1;
@ -584,14 +584,14 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
if(bestOldSet == -1) {
TraceEvent("TLogPeekRemoteNoOldBestSet", dbgid).detail("Tag", tag.toString()).detail("Begin", begin);
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, false ) );
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, parallelGetMore ) );
}
if(thisBegin < lastBegin) {
TraceEvent("TLogPeekRemoteAddingOldBest", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("BestOldSet", bestOldSet).detail("LogRouterIds", oldLogData[i].tLogs[bestOldSet]->logRouterString())
.detail("LastBegin", lastBegin).detail("ThisBegin", thisBegin).detail("BestStartVer", oldLogData[i].tLogs[bestOldSet]->startVersion);
cursors.emplace_back(new ILogSystem::MergedPeekCursor(oldLogData[i].tLogs[bestOldSet]->logRouters, -1, (int)oldLogData[i].tLogs[bestOldSet]->logRouters.size(), tag,
thisBegin, lastBegin, false, std::vector<LocalityData>(), Reference<IReplicationPolicy>(), 0));
thisBegin, lastBegin, parallelGetMore, std::vector<LocalityData>(), Reference<IReplicationPolicy>(), 0));
epochEnds.emplace_back(lastBegin);
lastBegin = thisBegin;
}

View File

@ -1092,6 +1092,10 @@ public:
// If the user-chosen physical page size is larger, then there will be a gap of unused space
// between the end of page 1 and the start of page 2.
ACTOR static Future<Reference<IPage>> readHeaderPage(COWPager *self, PhysicalPageID pageID) {
if(g_network->getCurrentTask() > TaskPriority::DiskRead) {
wait(delay(0, TaskPriority::DiskRead));
}
state Reference<IPage> page(new FastAllocatedPage(smallestPhysicalBlock, smallestPhysicalBlock));
int readBytes = wait(self->pageFile->read(page->mutate(), smallestPhysicalBlock, (int64_t)pageID * smallestPhysicalBlock));
debug_printf("COWPager(%s) header op=read_complete %s bytes=%d\n", self->filename.c_str(), toString(pageID).c_str(), readBytes);
@ -1100,6 +1104,10 @@ public:
}
ACTOR static Future<Reference<IPage>> readPhysicalPage(COWPager *self, PhysicalPageID pageID) {
if(g_network->getCurrentTask() > TaskPriority::DiskRead) {
wait(delay(0, TaskPriority::DiskRead));
}
state Reference<IPage> page = self->newPageBuffer();
debug_printf("COWPager(%s) op=read_physical_start %s\n", self->filename.c_str(), toString(pageID).c_str());
int readBytes = wait(self->pageFile->read(page->mutate(), self->physicalPageSize, (int64_t)pageID * self->physicalPageSize));
@ -1200,11 +1208,17 @@ public:
debug_printf("COWPager(%s) Syncing\n", self->filename.c_str());
// Sync everything except the header
if(g_network->getCurrentTask() > TaskPriority::DiskWrite) {
wait(delay(0, TaskPriority::DiskWrite));
}
wait(self->pageFile->sync());
debug_printf("COWPager(%s) commit version %" PRId64 " sync 1\n", self->filename.c_str(), self->pHeader->committedVersion);
// Update header on disk and sync again.
wait(self->writeHeaderPage(0, self->headerPage));
if(g_network->getCurrentTask() > TaskPriority::DiskWrite) {
wait(delay(0, TaskPriority::DiskWrite));
}
wait(self->pageFile->sync());
debug_printf("COWPager(%s) commit version %" PRId64 " sync 2\n", self->filename.c_str(), self->pHeader->committedVersion);
@ -2271,10 +2285,10 @@ struct BTreePage {
}
};
static void makeEmptyPage(Reference<IPage> page, uint8_t newFlags) {
static void makeEmptyRoot(Reference<IPage> page) {
BTreePage *btpage = (BTreePage *)page->begin();
btpage->formatVersion = BTreePage::FORMAT_VERSION;
btpage->flags = newFlags;
btpage->flags = BTreePage::IS_LEAF;
btpage->height = 1;
btpage->kvBytes = 0;
btpage->itemCount = 0;
@ -2637,7 +2651,7 @@ public:
self->m_header.height = 1;
++latest;
Reference<IPage> page = self->m_pager->newPageBuffer();
makeEmptyPage(page, BTreePage::IS_LEAF);
makeEmptyRoot(page);
self->m_pager->updatePage(id, page);
self->m_pager->setCommitVersion(latest);
@ -3228,6 +3242,7 @@ private:
childPageID.push_back(records.arena(), id);
}
}
wait(yield());
// Update activity counts
++counts.pageWrites;
@ -3327,7 +3342,7 @@ private:
debug_printf("readPage() op=readForDeferredClear %s @%" PRId64 " \n", toString(id).c_str(), snapshot->getVersion());
}
wait(delay(0, TaskPriority::DiskRead));
wait(yield());
state Reference<const IPage> page;
@ -3811,7 +3826,8 @@ private:
debug_printf("Writing new empty root.\n");
LogicalPageID newRootID = wait(self->m_pager->newPageID());
Reference<IPage> page = self->m_pager->newPageBuffer();
makeEmptyPage(page, BTreePage::IS_LEAF);
makeEmptyRoot(page);
self->m_header.height = 1;
self->m_pager->updatePage(newRootID, page);
rootPageID = BTreePageID((LogicalPageID *)&newRootID, 1);
}
@ -4509,7 +4525,7 @@ public:
KeyValueRef kv(KeyRef(result.arena(), cur->getKey()), ValueRef(result.arena(), cur->getValue()));
accumulatedBytes += kv.expectedSize();
result.push_back(result.arena(), kv);
if(--rowLimit == 0 || accumulatedBytes >= byteLimit) {
if(++rowLimit == 0 || accumulatedBytes >= byteLimit) {
break;
}
wait(cur->prev(true));