Merge branch 'release-5.1' of github.com:apple/foundationdb into release-5.1
This commit is contained in:
commit
ae7d8e90b2
|
@ -735,9 +735,8 @@ public:
|
|||
// These should not happen
|
||||
if(e.code() == error_code_key_not_found)
|
||||
t.backtrace();
|
||||
std::string msg = format("ERROR: %s %s", e.what(), details.c_str());
|
||||
|
||||
return updateErrorInfo(cx, e, msg);
|
||||
return updateErrorInfo(cx, e, details);
|
||||
}
|
||||
};
|
||||
#endif
|
||||
|
|
|
@ -35,6 +35,49 @@
|
|||
#include <boost/algorithm/string/classification.hpp>
|
||||
#include <algorithm>
|
||||
|
||||
// Renders a boolean as the human-readable word "Yes" (true) or "No" (false).
static std::string boolToYesOrNo(bool val) {
	if (val)
		return "Yes";
	return "No";
}
|
||||
|
||||
// Formats an optional version for display: its decimal representation when a
// value is present, otherwise the placeholder "N/A".
static std::string versionToString(Optional<Version> version) {
	return version.present() ? std::to_string(version.get()) : std::string("N/A");
}
|
||||
|
||||
// Formats an optional timestamp (interpreted as a time_t) as local time using
// the strftime pattern "%D %T".
// Returns "N/A" when the timestamp is absent, or when it cannot be converted to
// a calendar time or formatted.
static std::string timeStampToString(Optional<int64_t> ts) {
	if (!ts.present())
		return "N/A";

	time_t curTs = ts.get();

	// localtime() returns NULL when the value cannot be represented as a calendar
	// time; handing that pointer to strftime() would be undefined behavior.
	struct tm* timeinfo = localtime(&curTs);
	if (timeinfo == NULL)
		return "N/A";

	char buffer[128];
	// strftime() returns 0 on failure and leaves the buffer contents unspecified,
	// so the buffer must not be used in that case.
	if (strftime(buffer, sizeof(buffer), "%D %T", timeinfo) == 0)
		return "N/A";

	return std::string(buffer);
}
|
||||
|
||||
// Resolves an optional version to an optional timestamp.
// When a version is present the lookup is delegated to timeKeeperEpochsFromVersion()
// on the given transaction; when it is absent an empty Optional is returned
// without consulting the transaction.
static Future<Optional<int64_t>> getTimestampFromVersion(Optional<Version> ver, Reference<ReadYourWritesTransaction> tr) {
	if (ver.present())
		return timeKeeperEpochsFromVersion(ver.get(), tr);

	return Optional<int64_t>();
}
|
||||
|
||||
// Time format :
|
||||
// <= 59 seconds
|
||||
// <= 59.99 minutes
|
||||
// <= 23.99 hours
|
||||
// N.NN days
|
||||
std::string secondsToTimeFormat(int64_t seconds) {
|
||||
if (seconds >= 86400)
|
||||
return format("%.2f day(s)", seconds / 86400.0);
|
||||
else if (seconds >= 3600)
|
||||
return format("%.2f hour(s)", seconds / 3600.0);
|
||||
else if (seconds >= 60)
|
||||
return format("%.2f minute(s)", seconds / 60.0);
|
||||
else
|
||||
return format("%ld second(s)", seconds);
|
||||
}
|
||||
|
||||
const Key FileBackupAgent::keyLastRestorable = LiteralStringRef("last_restorable");
|
||||
|
||||
// For convenience
|
||||
|
@ -177,9 +220,8 @@ public:
|
|||
// These should not happen
|
||||
if(e.code() == error_code_key_not_found)
|
||||
t.backtrace();
|
||||
std::string msg = format("ERROR: %s (%s)", details.c_str(), e.what());
|
||||
|
||||
return updateErrorInfo(cx, e, msg);
|
||||
return updateErrorInfo(cx, e, details);
|
||||
}
|
||||
|
||||
Key mutationLogPrefix() {
|
||||
|
@ -790,7 +832,7 @@ namespace fileBackup {
|
|||
// servers to catch and log to the appropriate config any error that execute/finish didn't catch and log.
|
||||
struct RestoreTaskFuncBase : TaskFuncBase {
|
||||
virtual Future<Void> handleError(Database cx, Reference<Task> task, Error const &error) {
|
||||
return RestoreConfig(task).logError(cx, error, format("Task '%s' UID '%s' %s failed", task->params[Task::reservedTaskParamKeyType].printable().c_str(), task->key.printable().c_str(), toString(task).c_str()));
|
||||
return RestoreConfig(task).logError(cx, error, format("'%s' on '%s'", error.what(), task->params[Task::reservedTaskParamKeyType].printable().c_str()));
|
||||
}
|
||||
virtual std::string toString(Reference<Task> task)
|
||||
{
|
||||
|
@ -800,7 +842,7 @@ namespace fileBackup {
|
|||
|
||||
struct BackupTaskFuncBase : TaskFuncBase {
|
||||
virtual Future<Void> handleError(Database cx, Reference<Task> task, Error const &error) {
|
||||
return BackupConfig(task).logError(cx, error, format("Task '%s' UID '%s' %s failed", task->params[Task::reservedTaskParamKeyType].printable().c_str(), task->key.printable().c_str(), toString(task).c_str()));
|
||||
return BackupConfig(task).logError(cx, error, format("'%s' on '%s'", error.what(), task->params[Task::reservedTaskParamKeyType].printable().c_str()));
|
||||
}
|
||||
virtual std::string toString(Reference<Task> task)
|
||||
{
|
||||
|
@ -3538,17 +3580,50 @@ public:
|
|||
state int64_t snapshotInterval;
|
||||
state Version snapshotBeginVersion;
|
||||
state Version snapshotTargetEndVersion;
|
||||
state Optional<Version> latestSnapshotEndVersion;
|
||||
state Optional<Version> latestLogEndVersion;
|
||||
state Optional<int64_t> logBytesWritten;
|
||||
state Optional<int64_t> rangeBytesWritten;
|
||||
state Optional<int64_t> latestSnapshotEndVersionTimestamp;
|
||||
state Optional<int64_t> latestLogEndVersionTimestamp;
|
||||
state Optional<int64_t> snapshotBeginVersionTimestamp;
|
||||
state Optional<int64_t> snapshotTargetEndVersionTimestamp;
|
||||
state bool stopWhenDone;
|
||||
|
||||
Void _ = wait( store(config.snapshotBeginVersion().getOrThrow(tr), snapshotBeginVersion)
|
||||
&& store(config.snapshotTargetEndVersion().getOrThrow(tr), snapshotTargetEndVersion)
|
||||
&& store(config.snapshotIntervalSeconds().getOrThrow(tr), snapshotInterval)
|
||||
);
|
||||
&& store(config.logBytesWritten().get(tr), logBytesWritten)
|
||||
&& store(config.rangeBytesWritten().get(tr), rangeBytesWritten)
|
||||
&& store(config.latestLogEndVersion().get(tr), latestLogEndVersion)
|
||||
&& store(config.latestSnapshotEndVersion().get(tr), latestSnapshotEndVersion)
|
||||
&& store(config.stopWhenDone().getOrThrow(tr), stopWhenDone)
|
||||
);
|
||||
|
||||
Void _ = wait( store(getTimestampFromVersion(latestSnapshotEndVersion, tr), latestSnapshotEndVersionTimestamp)
|
||||
&& store(getTimestampFromVersion(latestLogEndVersion, tr), latestLogEndVersionTimestamp)
|
||||
&& store(timeKeeperEpochsFromVersion(snapshotBeginVersion, tr), snapshotBeginVersionTimestamp)
|
||||
&& store(timeKeeperEpochsFromVersion(snapshotTargetEndVersion, tr), snapshotTargetEndVersionTimestamp)
|
||||
);
|
||||
|
||||
statusText += format("Snapshot interval is %lld seconds. ", snapshotInterval);
|
||||
if(backupState == BackupAgentBase::STATE_DIFFERENTIAL)
|
||||
statusText += format("Current snapshot progress target is %3.2f%% (>100%% means the snapshot is supposed to be done)\n", 100.0 * (recentReadVersion - snapshotBeginVersion) / (snapshotTargetEndVersion - snapshotBeginVersion)) ;
|
||||
else
|
||||
statusText += "The initial snapshot is still running.\n";
|
||||
|
||||
statusText += format("\nDetails:\n LogBytes written - %ld\n RangeBytes written - %ld\n "
|
||||
"Last complete log version and timestamp - %s, %s\n "
|
||||
"Last complete snapshot version and timestamp - %s, %s\n "
|
||||
"Current Snapshot start version and timestamp - %s, %s\n "
|
||||
"Expected snapshot end version and timestamp - %s, %s\n "
|
||||
"Backup supposed to stop at next snapshot completion - %s\n",
|
||||
logBytesWritten.orDefault(0), rangeBytesWritten.orDefault(0),
|
||||
versionToString(latestLogEndVersion).c_str(), timeStampToString(latestLogEndVersionTimestamp).c_str(),
|
||||
versionToString(latestSnapshotEndVersion).c_str(), timeStampToString(latestSnapshotEndVersionTimestamp).c_str(),
|
||||
versionToString(snapshotBeginVersion).c_str(), timeStampToString(snapshotBeginVersionTimestamp).c_str(),
|
||||
versionToString(snapshotTargetEndVersion).c_str(), timeStampToString(snapshotTargetEndVersionTimestamp).c_str(),
|
||||
boolToYesOrNo(stopWhenDone).c_str());
|
||||
}
|
||||
|
||||
// Append the errors, if requested
|
||||
|
@ -3559,8 +3634,7 @@ public:
|
|||
|
||||
for(auto &e : errors) {
|
||||
Version v = e.second.second;
|
||||
std::string msg = format("%s (%lld seconds ago)\n", e.second.first.c_str(),
|
||||
(recentReadVersion - v) / CLIENT_KNOBS->CORE_VERSIONSPERSECOND);
|
||||
std::string msg = format("%s ago : %s\n", secondsToTimeFormat((recentReadVersion - v) / CLIENT_KNOBS->CORE_VERSIONSPERSECOND).c_str(), e.second.first.c_str());
|
||||
|
||||
// If error version is at or more recent than the latest restorable version then it could be inhibiting progress
|
||||
if(v >= latestRestorableVersion.orDefault(0)) {
|
||||
|
@ -3571,10 +3645,16 @@ public:
|
|||
}
|
||||
}
|
||||
|
||||
if(!recentErrors.empty())
|
||||
statusText += "Errors possibly preventing progress:\n" + recentErrors;
|
||||
if (!recentErrors.empty()) {
|
||||
if (latestRestorableVersion.present())
|
||||
statusText += format("Recent Errors (since latest restorable point %s ago)\n",
|
||||
secondsToTimeFormat((recentReadVersion - latestRestorableVersion.get()) / CLIENT_KNOBS->CORE_VERSIONSPERSECOND).c_str())
|
||||
+ recentErrors;
|
||||
else
|
||||
statusText += "Recent Errors (since initialization)\n" + recentErrors;
|
||||
}
|
||||
if(!pastErrors.empty())
|
||||
statusText += "Older errors:\n" + pastErrors;
|
||||
statusText += "Older Errors\n" + pastErrors;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -885,12 +885,12 @@ Reference<ProxyInfo> DatabaseContext::getMasterProxies() {
|
|||
}
|
||||
|
||||
//Actor which will wait until the ProxyInfo returned by the DatabaseContext cx is not NULL
|
||||
ACTOR Future<Reference<ProxyInfo>> getMasterProxiesFuture(DatabaseContext *cx) {
|
||||
loop{
|
||||
Reference<ProxyInfo> proxies = cx->getMasterProxies();
|
||||
if (proxies)
|
||||
return proxies;
|
||||
Void _ = wait( cx->onMasterProxiesChanged() );
|
||||
ACTOR Future<Reference<ProxyInfo>> getMasterProxiesFuture(DatabaseContext *cx) {
|
||||
loop{
|
||||
Reference<ProxyInfo> proxies = cx->getMasterProxies();
|
||||
if (proxies)
|
||||
return proxies;
|
||||
Void _ = wait( cx->onMasterProxiesChanged() );
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -343,27 +343,26 @@ ACTOR Future<Void> runProfiler(ProfilerRequest req) {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> storageServerRollbackRebooter( Future<Void> prevStorageServer, KeyValueStoreType storeType, std::string filename, StorageServerInterface ssi, Reference<AsyncVar<ServerDBInfo>> db, std::string folder, ActorCollection* filesClosed, int64_t memoryLimit ) {
|
||||
ACTOR Future<Void> storageServerRollbackRebooter( Future<Void> prevStorageServer, KeyValueStoreType storeType, std::string filename, UID id, LocalityData locality, Reference<AsyncVar<ServerDBInfo>> db, std::string folder, ActorCollection* filesClosed, int64_t memoryLimit ) {
|
||||
loop {
|
||||
ErrorOr<Void> e = wait( errorOr( prevStorageServer) );
|
||||
if (!e.isError()) return Void();
|
||||
else if (e.getError().code() != error_code_please_reboot) throw e.getError();
|
||||
|
||||
TraceEvent("StorageServerRequestedReboot", ssi.id());
|
||||
TraceEvent("StorageServerRequestedReboot", id);
|
||||
|
||||
//if (BUGGIFY) Void _ = wait(delay(1.0)); // This does the same thing as zombie()
|
||||
// We need a new interface, since the new storageServer will do replaceInterface(). And we need to destroy
|
||||
// the old one so the failure detector will know it is gone.
|
||||
StorageServerInterface ssi_new;
|
||||
ssi_new.uniqueID = ssi.uniqueID;
|
||||
ssi_new.locality = ssi.locality;
|
||||
ssi = ssi_new;
|
||||
StorageServerInterface ssi;
|
||||
ssi.uniqueID = id;
|
||||
ssi.locality = locality;
|
||||
ssi.initEndpoints();
|
||||
auto* kv = openKVStore( storeType, filename, ssi.uniqueID, memoryLimit );
|
||||
Future<Void> kvClosed = kv->onClosed();
|
||||
filesClosed->add( kvClosed );
|
||||
prevStorageServer = storageServer( kv, ssi, db, folder, Promise<Void>() );
|
||||
prevStorageServer = handleIOErrors( prevStorageServer, kv, ssi.id(), kvClosed );
|
||||
prevStorageServer = handleIOErrors( prevStorageServer, kv, id, kvClosed );
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -610,7 +609,7 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
|
|||
Future<Void> f = storageServer( kv, recruited, dbInfo, folder, recovery );
|
||||
recoveries.push_back(recovery.getFuture());
|
||||
f = handleIOErrors( f, kv, s.storeID, kvClosed );
|
||||
f = storageServerRollbackRebooter( f, s.storeType, s.filename, recruited, dbInfo, folder, &filesClosed, memoryLimit );
|
||||
f = storageServerRollbackRebooter( f, s.storeType, s.filename, recruited.id(), recruited.locality, dbInfo, folder, &filesClosed, memoryLimit );
|
||||
errorForwarders.add( forwardError( errors, "StorageServer", recruited.id(), f ) );
|
||||
} else if( s.storedComponent == DiskStore::TLogData ) {
|
||||
IKeyValueStore* kv = openKVStore( s.storeType, s.filename, s.storeID, memoryLimit, validateDataFiles );
|
||||
|
@ -763,7 +762,7 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
|
|||
Future<Void> s = storageServer( data, recruited, req.seedTag, storageReady, dbInfo, folder );
|
||||
s = handleIOErrors(s, data, recruited.id(), kvClosed);
|
||||
s = storageCache.removeOnReady( req.reqId, s );
|
||||
s = storageServerRollbackRebooter( s, req.storeType, filename, recruited, dbInfo, folder, &filesClosed, memoryLimit );
|
||||
s = storageServerRollbackRebooter( s, req.storeType, filename, recruited.id(), recruited.locality, dbInfo, folder, &filesClosed, memoryLimit );
|
||||
errorForwarders.add( forwardError( errors, "StorageServer", recruited.id(), s ) );
|
||||
} else
|
||||
forwardPromise( req.reply, storageCache.get( req.reqId ) );
|
||||
|
|
|
@ -682,6 +682,7 @@ struct ConsistencyCheckWorkload : TestWorkload
|
|||
state Key lastSampleKey;
|
||||
state Key lastStartSampleKey;
|
||||
state int64_t totalReadAmount = 0;
|
||||
state int64_t rangeBytes = 0;
|
||||
|
||||
state KeySelector begin = firstGreaterOrEqual(range.begin);
|
||||
|
||||
|
@ -725,6 +726,9 @@ struct ConsistencyCheckWorkload : TestWorkload
|
|||
{
|
||||
state GetKeyValuesReply current = rangeResult.get();
|
||||
totalReadAmount += current.data.expectedSize();
|
||||
if (j == 0) {
|
||||
rangeBytes += current.data.expectedSize();
|
||||
}
|
||||
//If we haven't encountered a valid storage server yet, then mark this as the baseline to compare against
|
||||
if(firstValidServer == -1)
|
||||
firstValidServer = j;
|
||||
|
@ -911,6 +915,8 @@ struct ConsistencyCheckWorkload : TestWorkload
|
|||
}
|
||||
}
|
||||
|
||||
TraceEvent("ConsistencyCheck_CheckedRange").detail("Range", printable(range)).detail("Bytes", rangeBytes);
|
||||
|
||||
canSplit = canSplit && sampledBytes - splitBytes >= shardBounds.min.bytes && sampledBytes > splitBytes;
|
||||
|
||||
//Update the size of all storage servers containing this shard
|
||||
|
|
|
@ -32,7 +32,7 @@
|
|||
|
||||
<Wix xmlns='http://schemas.microsoft.com/wix/2006/wi'>
|
||||
<Product Name='$(var.Title)'
|
||||
Id='{61C46988-7589-4B8A-9BB9-D850FD5B8B05}'
|
||||
Id='{3A9AB74C-F787-4A85-9D50-C0E18E2A1CA7}'
|
||||
UpgradeCode='{A95EA002-686E-4164-8356-C715B7F8B1C8}'
|
||||
Version='$(var.Version)'
|
||||
Manufacturer='$(var.Manufacturer)'
|
||||
|
|
Loading…
Reference in New Issue