Fixed printf field width specifier to reduce compilation warnings within OS X
This commit is contained in:
parent
0b9ed67e12
commit
b28ed397a2
|
@ -489,7 +489,7 @@ void initHelp() {
|
|||
"If KEY is not already present in the database, it will be created." ESCAPINGKV);
|
||||
helpMap["option"] = CommandHelp(
|
||||
"option <STATE> <OPTION> <ARG>",
|
||||
"enables or disables an option",
|
||||
"enables or disables an option",
|
||||
"If STATE is `on', then the option OPTION will be enabled with optional parameter ARG, if required. If STATE is `off', then OPTION will be disabled.\n\nIf there is no active transaction, then the option will be applied to all operations as well as all subsequently created transactions (using `begin').\n\nIf there is an active transaction (one created with `begin'), then enabled options apply only to that transaction. Options cannot be disabled on an active transaction.\n\nCalling `option' with no parameters prints a list of all enabled options.\n\nFor information about specific options that can be set, type `help options'.");
|
||||
helpMap["help"] = CommandHelp(
|
||||
"help [<topic>]",
|
||||
|
@ -584,8 +584,8 @@ std::string getProcessAddressByServerID(StatusObjectReader processesMap, std::st
|
|||
}
|
||||
}
|
||||
catch (std::exception &e) {
|
||||
// If an entry in the process map is badly formed then something will throw. Since we are
|
||||
// looking for a positive match, just ignore any read execeptions and move on to the next proc
|
||||
// If an entry in the process map is badly formed then something will throw. Since we are
|
||||
// looking for a positive match, just ignore any read execeptions and move on to the next proc
|
||||
}
|
||||
}
|
||||
return "unknown";
|
||||
|
@ -681,9 +681,9 @@ void printStatus(StatusObjectReader statusObj, StatusClient::StatusLevel level,
|
|||
if (statusObjCluster.get("recovery_state", recoveryState)) {
|
||||
std::string name;
|
||||
std::string description;
|
||||
if (recoveryState.get("name", name) &&
|
||||
recoveryState.get("description", description) &&
|
||||
name != "fully_recovered")
|
||||
if (recoveryState.get("name", name) &&
|
||||
recoveryState.get("description", description) &&
|
||||
name != "fully_recovered")
|
||||
{
|
||||
fatalRecoveryState = true;
|
||||
|
||||
|
@ -713,10 +713,10 @@ void printStatus(StatusObjectReader statusObj, StatusClient::StatusLevel level,
|
|||
|
||||
|
||||
// Check if cluster controllable is reachable
|
||||
try {
|
||||
try {
|
||||
// print any cluster messages
|
||||
if (statusObjCluster.has("messages") && statusObjCluster.last().get_array().size()){
|
||||
|
||||
|
||||
// any messages we don't want to display
|
||||
std::set<std::string> skipMsgs = { "unreachable_process", "" };
|
||||
if (fatalRecoveryState){
|
||||
|
@ -728,7 +728,7 @@ void printStatus(StatusObjectReader statusObj, StatusClient::StatusLevel level,
|
|||
skipMsgs.insert("read_probe_timeout");
|
||||
skipMsgs.insert("commit_probe_timeout");
|
||||
}
|
||||
|
||||
|
||||
for (StatusObjectReader msgObj : statusObjCluster.last().get_array()){
|
||||
std::string messageName;
|
||||
if(!msgObj.get("name", messageName)){
|
||||
|
@ -776,12 +776,12 @@ void printStatus(StatusObjectReader statusObj, StatusClient::StatusLevel level,
|
|||
if (msgObj.has("description"))
|
||||
outputString += "\n" + lineWrap(msgObj.last().get_str().c_str(), 80);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (std::runtime_error& e){}
|
||||
|
||||
|
||||
if (fatalRecoveryState){
|
||||
printf("%s", outputString.c_str());
|
||||
return;
|
||||
|
@ -826,10 +826,10 @@ void printStatus(StatusObjectReader statusObj, StatusClient::StatusLevel level,
|
|||
if (excludedServersArr.size()) {
|
||||
outputString += format("\n Exclusions - %d (type `exclude' for details)", excludedServersArr.size());
|
||||
}
|
||||
|
||||
|
||||
if (statusObjConfig.get("proxies", intVal))
|
||||
outputString += format("\n Desired Proxies - %d", intVal);
|
||||
|
||||
|
||||
if (statusObjConfig.get("resolvers", intVal))
|
||||
outputString += format("\n Desired Resolvers - %d", intVal);
|
||||
|
||||
|
@ -847,7 +847,7 @@ void printStatus(StatusObjectReader statusObj, StatusClient::StatusLevel level,
|
|||
StatusObjectReader machinesMap;
|
||||
|
||||
outputStringCache = outputString;
|
||||
// this bool removed code duplication when there's an else (usually due to a missing field) that should print some error message
|
||||
// this bool removed code duplication when there's an else (usually due to a missing field) that should print some error message
|
||||
// which would be the same error message if the catch block was hit
|
||||
bool success = false;
|
||||
try {
|
||||
|
@ -932,7 +932,7 @@ void printStatus(StatusObjectReader statusObj, StatusClient::StatusLevel level,
|
|||
outputString += "1 machine";
|
||||
else
|
||||
outputString += format("%d machines", minLoss);
|
||||
|
||||
|
||||
if (dataLoss > availLoss){
|
||||
outputString += format(" (%d without data loss)", dataLoss);
|
||||
}
|
||||
|
@ -1074,7 +1074,7 @@ void printStatus(StatusObjectReader statusObj, StatusClient::StatusLevel level,
|
|||
bool unknownMCT = false;
|
||||
bool unknownRP = false;
|
||||
|
||||
// Print performance limit details if known.
|
||||
// Print performance limit details if known.
|
||||
try {
|
||||
StatusObjectReader limit = statusObjCluster["qos.performance_limited_by"];
|
||||
std::string name = limit["name"].get_str();
|
||||
|
@ -1153,7 +1153,7 @@ void printStatus(StatusObjectReader statusObj, StatusClient::StatusLevel level,
|
|||
StatusObjectReader procObj(proc.second);
|
||||
std::string address;
|
||||
procObj.get("address", address);
|
||||
|
||||
|
||||
std::string line;
|
||||
|
||||
// Windows does not support the "hh" width specifier so just using unsigned int to be safe.
|
||||
|
@ -1271,7 +1271,7 @@ void printStatus(StatusObjectReader statusObj, StatusClient::StatusLevel level,
|
|||
|
||||
// status minimal
|
||||
else if (level == StatusClient::MINIMAL) {
|
||||
// Checking for field exsistence is not necessary here because if a field is missing there is no additional information
|
||||
// Checking for field exsistence is not necessary here because if a field is missing there is no additional information
|
||||
// that we would be able to display if we continued execution. Instead, any missing fields will throw and the catch will display the proper message.
|
||||
try {
|
||||
// If any of these throw, can't get status because the result makes no sense.
|
||||
|
@ -1366,7 +1366,7 @@ ACTOR Future<Void> checkStatus(Future<Void> f, Reference<ClusterConnectionFile>
|
|||
Void _ = wait(f);
|
||||
StatusObject s = wait(StatusClient::statusFetcher(clusterFile));
|
||||
printf("\n");
|
||||
printStatus(s, StatusClient::MINIMAL, displayDatabaseAvailable);
|
||||
printStatus(s, StatusClient::MINIMAL, displayDatabaseAvailable);
|
||||
printf("\n");
|
||||
return Void();
|
||||
}
|
||||
|
@ -1418,16 +1418,16 @@ ACTOR Future<bool> configure( Database db, std::vector<StringRef> tokens, Refere
|
|||
return true;
|
||||
}
|
||||
|
||||
bool noChanges = conf.get().old_replication == conf.get().auto_replication &&
|
||||
conf.get().old_logs == conf.get().auto_logs &&
|
||||
conf.get().old_proxies == conf.get().auto_proxies &&
|
||||
conf.get().old_resolvers == conf.get().auto_resolvers &&
|
||||
conf.get().old_processes_with_transaction == conf.get().auto_processes_with_transaction &&
|
||||
bool noChanges = conf.get().old_replication == conf.get().auto_replication &&
|
||||
conf.get().old_logs == conf.get().auto_logs &&
|
||||
conf.get().old_proxies == conf.get().auto_proxies &&
|
||||
conf.get().old_resolvers == conf.get().auto_resolvers &&
|
||||
conf.get().old_processes_with_transaction == conf.get().auto_processes_with_transaction &&
|
||||
conf.get().old_machines_with_transaction == conf.get().auto_machines_with_transaction;
|
||||
|
||||
bool noDesiredChanges = noChanges &&
|
||||
conf.get().old_logs == conf.get().desired_logs &&
|
||||
conf.get().old_proxies == conf.get().desired_proxies &&
|
||||
bool noDesiredChanges = noChanges &&
|
||||
conf.get().old_logs == conf.get().desired_logs &&
|
||||
conf.get().old_proxies == conf.get().desired_proxies &&
|
||||
conf.get().old_resolvers == conf.get().desired_resolvers;
|
||||
|
||||
std::string outputString;
|
||||
|
@ -1435,7 +1435,7 @@ ACTOR Future<bool> configure( Database db, std::vector<StringRef> tokens, Refere
|
|||
outputString += "\nYour cluster has:\n\n";
|
||||
outputString += format(" processes %d\n", conf.get().processes);
|
||||
outputString += format(" machines %d\n", conf.get().machines);
|
||||
|
||||
|
||||
if(noDesiredChanges) outputString += "\nConfigure recommends keeping your current configuration:\n\n";
|
||||
else if(noChanges) outputString += "\nConfigure cannot modify the configuration because some parameters have been set manually:\n\n";
|
||||
else outputString += "\nConfigure recommends the following changes:\n\n";
|
||||
|
@ -1452,7 +1452,7 @@ ACTOR Future<bool> configure( Database db, std::vector<StringRef> tokens, Refere
|
|||
outputString += format("| transaction-class processes | %16d | %16d |\n", conf.get().old_processes_with_transaction, conf.get().auto_processes_with_transaction);
|
||||
outputString += format("| transaction-class machines | %16d | %16d |\n", conf.get().old_machines_with_transaction, conf.get().auto_machines_with_transaction);
|
||||
outputString += " ------------------------------------------------------------------- \n\n";
|
||||
|
||||
|
||||
std::printf("%s", outputString.c_str());
|
||||
|
||||
if(noChanges)
|
||||
|
@ -1465,7 +1465,7 @@ ACTOR Future<bool> configure( Database db, std::vector<StringRef> tokens, Refere
|
|||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
ConfigurationResult::Type r = wait( makeInterruptable( changeConfig( db, std::vector<StringRef>(tokens.begin()+1,tokens.end()), conf) ) );
|
||||
result = r;
|
||||
}
|
||||
|
@ -1527,7 +1527,7 @@ ACTOR Future<bool> coordinators( Database db, std::vector<StringRef> tokens, boo
|
|||
state std::vector<StringRef>::iterator t;
|
||||
for(t = tokens.begin()+1; t != tokens.end(); ++t) {
|
||||
try {
|
||||
// SOMEDAY: Check for keywords
|
||||
// SOMEDAY: Check for keywords
|
||||
auto const& addr = NetworkAddress::parse( t->toString() );
|
||||
if( addr.isTLS() != isClusterTLS ) {
|
||||
printf("ERROR: cannot use coordinator with incompatible TLS state: `%s'\n", t->toString().c_str());
|
||||
|
@ -1654,13 +1654,13 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
|
|||
"Type `exclude FORCE <ADDRESS>*' to exclude without checking free space.\n";
|
||||
|
||||
StatusObjectReader statusObj(status);
|
||||
|
||||
|
||||
StatusObjectReader statusObjCluster;
|
||||
if (!statusObj.get("cluster", statusObjCluster)) {
|
||||
printf("%s", errorString.c_str());
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
StatusObjectReader processesMap;
|
||||
if (!statusObjCluster.get("processes", processesMap)) {
|
||||
printf("%s", errorString.c_str());
|
||||
|
@ -1681,14 +1681,14 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
|
|||
}
|
||||
NetworkAddress addr = NetworkAddress::parse(addrStr);
|
||||
bool excluded = (process.has("excluded") && process.last().get_bool()) || addressExcluded(exclusions, addr);
|
||||
|
||||
|
||||
if(!excluded) {
|
||||
StatusObjectReader disk;
|
||||
if (!process.get("disk", disk)) {
|
||||
printf("%s", errorString.c_str());
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
int64_t total_bytes;
|
||||
if (!disk.get("total_bytes", total_bytes)) {
|
||||
printf("%s", errorString.c_str());
|
||||
|
@ -1703,7 +1703,7 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
|
|||
|
||||
worstFreeSpaceRatio = std::min(worstFreeSpaceRatio, double(free_bytes)/total_bytes);
|
||||
}
|
||||
|
||||
|
||||
for (StatusObjectReader role : rolesArray) {
|
||||
if (role["role"].get_str() == "storage") {
|
||||
if (excluded)
|
||||
|
@ -1769,7 +1769,7 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
|
|||
bool foundCoordinator = false;
|
||||
auto ccs = ClusterConnectionFile( ccf->getFilename() ).getConnectionString();
|
||||
for( auto& c : ccs.coordinators()) {
|
||||
if (std::count( addresses.begin(), addresses.end(), AddressExclusion(c.ip, c.port) ) ||
|
||||
if (std::count( addresses.begin(), addresses.end(), AddressExclusion(c.ip, c.port) ) ||
|
||||
std::count( addresses.begin(), addresses.end(), AddressExclusion(c.ip) )) {
|
||||
printf("WARNING: %s is a coordinator!\n", c.toString().c_str());
|
||||
foundCoordinator = true;
|
||||
|
@ -1934,7 +1934,7 @@ void fdbcli_comp_cmd(std::string const& text, std::vector<std::string>& lc) {
|
|||
}
|
||||
|
||||
// printf("final text (%d tokens): `%s' & `%s'\n", count, base_input.c_str(), ntext.c_str());
|
||||
|
||||
|
||||
if (!count) {
|
||||
cmd_generator(ntext.c_str(), lc);
|
||||
return;
|
||||
|
@ -1990,7 +1990,7 @@ struct CLIOptions {
|
|||
std::string tlsKeyPath;
|
||||
std::string tlsVerifyPeers;
|
||||
|
||||
CLIOptions( int argc, char* argv[] )
|
||||
CLIOptions( int argc, char* argv[] )
|
||||
: trace(false),
|
||||
exit_timeout(0),
|
||||
initialStatusCheck(true),
|
||||
|
@ -2116,7 +2116,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
|
|||
state const char *database = "DB";
|
||||
state Standalone<StringRef> openDbName = StringRef(database);
|
||||
|
||||
state Reference<ClusterConnectionFile> ccf;
|
||||
state Reference<ClusterConnectionFile> ccf;
|
||||
|
||||
state std::pair<std::string, bool> resolvedClusterFile = ClusterConnectionFile::lookupClusterFileName( opt.clusterFile );
|
||||
try {
|
||||
|
@ -2484,7 +2484,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
|
|||
} else if(address_interface.size() == 1) {
|
||||
printf("\nThe following address can be killed:\n");
|
||||
} else {
|
||||
printf("\nThe following %d addresses can be killed:\n", address_interface.size());
|
||||
printf("\nThe following %lu addresses can be killed:\n", address_interface.size());
|
||||
}
|
||||
for( auto it : address_interface ) {
|
||||
printf("%s\n", printable(it.first).c_str());
|
||||
|
@ -2532,7 +2532,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
|
|||
} else if(address_interface.size() == 1) {
|
||||
printf("\nThe following address can be checked:\n");
|
||||
} else {
|
||||
printf("\nThe following %d addresses can be checked:\n", address_interface.size());
|
||||
printf("\nThe following %lu addresses can be checked:\n", address_interface.size());
|
||||
}
|
||||
for( auto it : address_interface ) {
|
||||
printf("%s\n", printable(it.first).c_str());
|
||||
|
@ -2660,7 +2660,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
|
|||
is_error = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
if (tokens.size() != 3) {
|
||||
printUsage(tokens[0]);
|
||||
is_error = true;
|
||||
|
@ -2681,7 +2681,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
|
|||
is_error = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
if (tokens.size() != 2) {
|
||||
printUsage(tokens[0]);
|
||||
is_error = true;
|
||||
|
@ -2702,7 +2702,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
|
|||
is_error = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
if (tokens.size() != 3) {
|
||||
printUsage(tokens[0]);
|
||||
is_error = true;
|
||||
|
@ -2800,7 +2800,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
|
|||
}
|
||||
|
||||
ACTOR Future<int> runCli(CLIOptions opt) {
|
||||
state LineNoise linenoise(
|
||||
state LineNoise linenoise(
|
||||
[](std::string const& line, std::vector<std::string>& completions) {
|
||||
fdbcli_comp_cmd(line, completions);
|
||||
},
|
||||
|
|
|
@ -178,13 +178,13 @@ bool PolicyAcross::validate(
|
|||
if (g_replicationdebug > 3) {
|
||||
printf("Across check values:%9lu key: %-7s solutions:%2lu count:%2d policy: %-10s => %s\n", validMap.size(), _attribKey.c_str(), solutionSet.size(), _count, _policy->name().c_str(), _policy->info().c_str());
|
||||
for (auto& itValue : validMap) {
|
||||
printf(" value: (%3lu) %-10s\n", itValue.first._id, fromServers->valueText(itValue.first).c_str());
|
||||
printf(" value: (%3d) %-10s\n", itValue.first._id, fromServers->valueText(itValue.first).c_str());
|
||||
}
|
||||
}
|
||||
for (auto& itValid : validMap) {
|
||||
if (_policy->validate(itValid.second, fromServers)) {
|
||||
if (g_replicationdebug > 4) {
|
||||
printf("Across valid solution: %6lu key: %-7s count:%3d of%3d value: (%3lu) %-10s policy: %-10s => %s\n", itValid.second.size(), _attribKey.c_str(), count+1, _count, itValid.first._id, fromServers->valueText(itValid.first).c_str(), _policy->name().c_str(), _policy->info().c_str());
|
||||
printf("Across valid solution: %6lu key: %-7s count:%3d of%3d value: (%3d) %-10s policy: %-10s => %s\n", itValid.second.size(), _attribKey.c_str(), count+1, _count, itValid.first._id, fromServers->valueText(itValid.first).c_str(), _policy->name().c_str(), _policy->info().c_str());
|
||||
if (g_replicationdebug > 5) {
|
||||
for (auto& entry : itValid.second) {
|
||||
printf(" entry: %s\n", fromServers->getEntryInfo(entry).c_str());
|
||||
|
@ -256,7 +256,7 @@ bool PolicyAcross::selectReplicas(
|
|||
_addedResults.push_back(_arena, std::pair<int, int>(resultsAdded, resultsSize));
|
||||
}
|
||||
if (g_replicationdebug > 5) {
|
||||
printf("Across !added:%3d key: %-7s count:%3d of%3d value: (%3lu) %-10s entry: %s\n", resultsAdded, _attribKey.c_str(), count, _count, value.get()._id, fromServers->valueText(value.get()).c_str(), fromServers->getEntryInfo(alsoServer).c_str());
|
||||
printf("Across !added:%3d key: %-7s count:%3d of%3d value: (%3d) %-10s entry: %s\n", resultsAdded, _attribKey.c_str(), count, _count, value.get()._id, fromServers->valueText(value.get()).c_str(), fromServers->getEntryInfo(alsoServer).c_str());
|
||||
}
|
||||
if (count >= _count) break;
|
||||
_usedValues.insert(lowerBound, value.get());
|
||||
|
@ -404,7 +404,7 @@ bool PolicyAnd::selectReplicas(
|
|||
|
||||
for (auto& policy : _sortedPolicies) {
|
||||
if (g_replicationdebug > 3) {
|
||||
printf("And also:%5d used: %4lu from %3d items policy: %-10s => %s\n", newResults.size(), newResults.size()-alsoServers.size(), fromServers->size(), policy->name().c_str(), policy->info().c_str());
|
||||
printf("And also:%5lu used: %4lu from %3d items policy: %-10s => %s\n", newResults.size(), newResults.size()-alsoServers.size(), fromServers->size(), policy->name().c_str(), policy->info().c_str());
|
||||
}
|
||||
if (!policy->selectReplicas(fromServers, newResults, newResults))
|
||||
{
|
||||
|
@ -421,7 +421,7 @@ bool PolicyAnd::selectReplicas(
|
|||
}
|
||||
|
||||
if (g_replicationdebug > 2) {
|
||||
printf("And used:%5d results:%3lu from %3d items\n", newResults.size()-alsoServers.size(), results.size(), fromServers->size());
|
||||
printf("And used:%5lu results:%3lu from %3d items\n", newResults.size()-alsoServers.size(), results.size(), fromServers->size());
|
||||
}
|
||||
return passed;
|
||||
}
|
||||
|
|
|
@ -134,7 +134,7 @@ bool findBestPolicySet(
|
|||
testRate = ratePolicy(testLocalitySet, policy, nSelectTests);
|
||||
|
||||
if (g_replicationdebug > 3) {
|
||||
printf(" rate: %7.5\n", testRate);
|
||||
printf(" rate: %7.5f\n", testRate);
|
||||
}
|
||||
|
||||
if (bestRate < 0.0)
|
||||
|
@ -466,7 +466,7 @@ bool testPolicy(
|
|||
solved = serverMap->selectReplicas(policy, including, entryResults, results);
|
||||
|
||||
if (g_replicationdebug > 1) {
|
||||
printf("%-10s solution:%3lu policy: %-10s => %s include:%4d\n", ((solved) ? "Solved" : "Unsolved"), results.size(), policy->name().c_str(), policy->info().c_str(), including.size());
|
||||
printf("%-10s solution:%3lu policy: %-10s => %s include:%4lu\n", ((solved) ? "Solved" : "Unsolved"), results.size(), policy->name().c_str(), policy->info().c_str(), including.size());
|
||||
}
|
||||
if (g_replicationdebug > 2) {
|
||||
for (auto& entry : entryResults) {
|
||||
|
|
|
@ -80,7 +80,7 @@ struct AddingShard : NonCopyable {
|
|||
Promise<Void> fetchComplete;
|
||||
Promise<Void> readWrite;
|
||||
|
||||
std::deque< Standalone<VerUpdateRef> > updates; // during the Fetching phase, mutations with key in keys and version>=(fetchClient's) fetchVersion;
|
||||
std::deque< Standalone<VerUpdateRef> > updates; // during the Fetching phase, mutations with key in keys and version>=(fetchClient's) fetchVersion;
|
||||
|
||||
struct StorageServer* server;
|
||||
Version transferredVersion;
|
||||
|
@ -113,12 +113,12 @@ struct ShardInfo : ReferenceCounted<ShardInfo>, NonCopyable {
|
|||
KeyRange keys;
|
||||
uint64_t changeCounter;
|
||||
|
||||
ShardInfo(KeyRange keys, AddingShard* adding, StorageServer* readWrite)
|
||||
: adding(adding), readWrite(readWrite), keys(keys)
|
||||
ShardInfo(KeyRange keys, AddingShard* adding, StorageServer* readWrite)
|
||||
: adding(adding), readWrite(readWrite), keys(keys)
|
||||
{
|
||||
}
|
||||
|
||||
~ShardInfo() {
|
||||
~ShardInfo() {
|
||||
delete adding;
|
||||
}
|
||||
|
||||
|
@ -144,7 +144,7 @@ struct ShardInfo : ReferenceCounted<ShardInfo>, NonCopyable {
|
|||
|
||||
struct StorageServerDisk {
|
||||
explicit StorageServerDisk( struct StorageServer* data, IKeyValueStore* storage ) : data(data), storage(storage) {}
|
||||
|
||||
|
||||
void makeNewStorageServerDurable();
|
||||
bool makeVersionMutationsDurable( Version& prevStorageVersion, Version newStorageVersion, int64_t& bytesLeft );
|
||||
void makeVersionDurable( Version version );
|
||||
|
@ -236,13 +236,13 @@ struct StorageServer {
|
|||
typedef VersionedMap<KeyRef, ValueOrClearToRef> VersionedData;
|
||||
|
||||
private:
|
||||
// versionedData contains sets and clears.
|
||||
|
||||
// versionedData contains sets and clears.
|
||||
|
||||
// * Nonoverlapping: No clear overlaps a set or another clear, or adjoins another clear.
|
||||
// ~ Clears are maximal: If versionedData.at(v) contains a clear [b,e) then
|
||||
// ~ Clears are maximal: If versionedData.at(v) contains a clear [b,e) then
|
||||
// there is a key data[e]@v, or e==allKeys.end, or a shard boundary or former boundary at e
|
||||
|
||||
// * Reads are possible: When k is in a readable shard, for any v in [storageVersion, version.get()],
|
||||
// * Reads are possible: When k is in a readable shard, for any v in [storageVersion, version.get()],
|
||||
// storage[k] + versionedData.at(v)[k] = database[k] @ v (storage[k] might be @ any version in [durableVersion, storageVersion])
|
||||
|
||||
// * Transferred shards are partially readable: When k is in an adding, transferred shard, for any v in [transferredVersion, version.get()],
|
||||
|
@ -254,7 +254,7 @@ private:
|
|||
// * Old shards are erased: versionedData.atLatest() has entries (sets or intersecting clears) only for keys in readable or adding,transferred shards.
|
||||
// Earlier versions may have extra entries for shards that *were* readable or adding,transferred when those versions were the latest, but they eventually are forgotten.
|
||||
|
||||
// * Old mutations are erased: All items in versionedData.atLatest() have insertVersion() > durableVersion(), but views
|
||||
// * Old mutations are erased: All items in versionedData.atLatest() have insertVersion() > durableVersion(), but views
|
||||
// at older versions may contain older items which are also in storage (this is OK because of idempotency)
|
||||
|
||||
VersionedData versionedData;
|
||||
|
@ -342,7 +342,7 @@ public:
|
|||
std::string folder;
|
||||
|
||||
// defined only during splitMutations()/addMutation()
|
||||
UpdateEagerReadInfo *updateEagerReads;
|
||||
UpdateEagerReadInfo *updateEagerReads;
|
||||
|
||||
FlowLock durableVersionLock;
|
||||
FlowLock fetchKeysParallelismLock;
|
||||
|
@ -353,7 +353,7 @@ public:
|
|||
Promise<Void> otherError;
|
||||
Promise<Void> coreStarted;
|
||||
bool shuttingDown;
|
||||
|
||||
|
||||
Smoother readReplyRate; //FIXME: very similar to counters.finishedQueries, new fast load balancing smoother
|
||||
|
||||
bool behind;
|
||||
|
@ -371,23 +371,23 @@ public:
|
|||
struct Counters {
|
||||
CounterCollection cc;
|
||||
Counter allQueries, getKeyQueries, getValueQueries, getRangeQueries, finishedQueries, rowsQueried, bytesQueried;
|
||||
Counter bytesInput, bytesDurable, bytesFetched,
|
||||
Counter bytesInput, bytesDurable, bytesFetched,
|
||||
mutationBytes; // Like bytesInput but without MVCC accounting
|
||||
Counter updateBatches, updateVersions;
|
||||
Counter loops;
|
||||
|
||||
Counters(StorageServer* self)
|
||||
: cc("StorageServer", self->thisServerID.toString()),
|
||||
getKeyQueries("getKeyQueries", cc),
|
||||
getValueQueries("getValueQueries",cc),
|
||||
getRangeQueries("getRangeQueries", cc),
|
||||
getKeyQueries("getKeyQueries", cc),
|
||||
getValueQueries("getValueQueries",cc),
|
||||
getRangeQueries("getRangeQueries", cc),
|
||||
allQueries("QueryQueue", cc),
|
||||
finishedQueries("finishedQueries", cc),
|
||||
rowsQueried("rowsQueried", cc),
|
||||
finishedQueries("finishedQueries", cc),
|
||||
rowsQueried("rowsQueried", cc),
|
||||
bytesQueried("bytesQueried", cc),
|
||||
bytesInput("bytesInput", cc),
|
||||
bytesDurable("bytesDurable", cc),
|
||||
bytesFetched("bytesFetched", cc),
|
||||
bytesInput("bytesInput", cc),
|
||||
bytesDurable("bytesDurable", cc),
|
||||
bytesFetched("bytesFetched", cc),
|
||||
mutationBytes("mutationBytes", cc),
|
||||
updateBatches("updateBatches", cc),
|
||||
updateVersions("updateVersions", cc),
|
||||
|
@ -417,10 +417,10 @@ public:
|
|||
: instanceID(g_random->randomUniqueID().first()),
|
||||
storage(this, storage), db(db),
|
||||
lastTLogVersion(0), lastVersionWithData(0),
|
||||
updateEagerReads(0),
|
||||
updateEagerReads(0),
|
||||
shardChangeCounter(0),
|
||||
fetchKeysParallelismLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM_BYTES),
|
||||
shuttingDown(false), readReplyRate(SERVER_KNOBS->STORAGE_LOGGING_DELAY / 2.0),
|
||||
shuttingDown(false), readReplyRate(SERVER_KNOBS->STORAGE_LOGGING_DELAY / 2.0),
|
||||
debug_inApplyUpdate(false), debug_lastValidateTime(0), watchBytes(0),
|
||||
logProtocol(0), counters(this), tag(invalidTag), maxQueryQueue(0), thisServerID(ssi.id()),
|
||||
readQueueSizeMetric(LiteralStringRef("StorageServer.ReadQueueSize")),
|
||||
|
@ -503,8 +503,8 @@ public:
|
|||
}
|
||||
};
|
||||
|
||||
// If and only if key:=value is in (storage+versionedData), // NOT ACTUALLY: and key < allKeys.end,
|
||||
// and H(key) < |key+value|/bytesPerSample,
|
||||
// If and only if key:=value is in (storage+versionedData), // NOT ACTUALLY: and key < allKeys.end,
|
||||
// and H(key) < |key+value|/bytesPerSample,
|
||||
// let sampledSize = max(|key+value|,bytesPerSample)
|
||||
// persistByteSampleKeys.begin()+key := sampledSize is in storage
|
||||
// (key,sampledSize) is in byteSample
|
||||
|
@ -552,7 +552,7 @@ void validate(StorageServer* data, bool force = false) {
|
|||
if (force || (EXPENSIVE_VALIDATION)) {
|
||||
data->newestAvailableVersion.validateCoalesced();
|
||||
data->newestDirtyVersion.validateCoalesced();
|
||||
|
||||
|
||||
for(auto s = data->shards.ranges().begin(); s != data->shards.ranges().end(); ++s) {
|
||||
ASSERT( s->value()->keys == s->range() );
|
||||
ASSERT( !s->value()->keys.empty() );
|
||||
|
@ -611,7 +611,7 @@ ACTOR Future<Version> waitForVersion( StorageServer* data, Version version ) {
|
|||
}
|
||||
|
||||
if(g_random->random01() < 0.001)
|
||||
TraceEvent("WaitForVersion1000x");
|
||||
TraceEvent("WaitForVersion1000x");
|
||||
choose {
|
||||
when ( Void _ = wait( data->version.whenAtLeast(version) ) ) {
|
||||
//FIXME: A bunch of these can block with or without the following delay 0.
|
||||
|
@ -697,9 +697,9 @@ ACTOR Future<Void> getValueQ( StorageServer* data, GetValueRequest req ) {
|
|||
|
||||
debugMutation("ShardGetValue", version, MutationRef(MutationRef::DebugKey, req.key, v.present()?v.get():LiteralStringRef("<null>")));
|
||||
debugMutation("ShardGetPath", version, MutationRef(MutationRef::DebugKey, req.key, path==0?LiteralStringRef("0"):path==1?LiteralStringRef("1"):LiteralStringRef("2")));
|
||||
|
||||
|
||||
/*
|
||||
StorageMetrics m;
|
||||
StorageMetrics m;
|
||||
m.bytesPerKSecond = req.key.size() + (v.present() ? v.get().size() : 0);
|
||||
m.iosPerKSecond = 1;
|
||||
data->metrics.notify(req.key, m);
|
||||
|
@ -738,7 +738,7 @@ ACTOR Future<Void> watchValue_impl( StorageServer* data, WatchValueRequest req )
|
|||
if( req.debugID.present() )
|
||||
g_traceBatch.addEvent("WatchValueDebug", req.debugID.get().first(), "watchValueQ.AfterVersion"); //.detail("TaskID", g_network->getCurrentTask());
|
||||
|
||||
loop {
|
||||
loop {
|
||||
try {
|
||||
state Version latest = data->data().latestVersion;
|
||||
state Future<Void> watchFuture = data->watches.onChange(req.key);
|
||||
|
@ -795,19 +795,19 @@ ACTOR Future<Void> watchValueQ( StorageServer* data, WatchValueRequest req ) {
|
|||
|
||||
ACTOR Future<Void> getShardState_impl( StorageServer* data, GetShardStateRequest req ) {
|
||||
ASSERT( req.mode != GetShardStateRequest::NO_WAIT );
|
||||
|
||||
|
||||
loop {
|
||||
std::vector<Future<Void>> onChange;
|
||||
|
||||
|
||||
for( auto t : data->shards.intersectingRanges( req.keys ) ) {
|
||||
if( !t.value()->assigned() ) {
|
||||
onChange.push_back( delay( SERVER_KNOBS->SHARD_READY_DELAY ) );
|
||||
break;
|
||||
}
|
||||
|
||||
if( req.mode == GetShardStateRequest::READABLE && !t.value()->isReadable() )
|
||||
if( req.mode == GetShardStateRequest::READABLE && !t.value()->isReadable() )
|
||||
onChange.push_back( t.value()->adding->readWrite.getFuture() );
|
||||
|
||||
|
||||
if( req.mode == GetShardStateRequest::FETCHING && !t.value()->isFetched() )
|
||||
onChange.push_back( t.value()->adding->fetchComplete.getFuture() );
|
||||
}
|
||||
|
@ -832,8 +832,8 @@ ACTOR Future<Void> getShardStateQ( StorageServer* data, GetShardStateRequest req
|
|||
return Void();
|
||||
}
|
||||
|
||||
void merge( Arena& arena, VectorRef<KeyValueRef>& output, VectorRef<KeyValueRef> const& base,
|
||||
StorageServer::VersionedData::iterator& start, StorageServer::VersionedData::iterator const& end,
|
||||
void merge( Arena& arena, VectorRef<KeyValueRef>& output, VectorRef<KeyValueRef> const& base,
|
||||
StorageServer::VersionedData::iterator& start, StorageServer::VersionedData::iterator const& end,
|
||||
int versionedDataCount, int limit, bool stopAtEndOfBase, int limitBytes = 1<<30 )
|
||||
// Combines data from base (at an older version) with sets from newer versions in [start, end) and appends the first (up to) |limit| rows to output
|
||||
// If limit<0, base and output are in descending order, and start->key()>end->key(), but start is still inclusive and end is exclusive
|
||||
|
@ -891,9 +891,9 @@ ACTOR Future<GetKeyValuesReply> readRange( StorageServer* data, Version version,
|
|||
// if (limit >= 0) we are reading forward, else backward
|
||||
|
||||
if (limit >= 0) {
|
||||
// We might care about a clear beginning before start that
|
||||
// We might care about a clear beginning before start that
|
||||
// runs into range
|
||||
vStart = view.lastLessOrEqual(range.begin);
|
||||
vStart = view.lastLessOrEqual(range.begin);
|
||||
if (vStart && vStart->isClearTo() && vStart->getEndKey() > range.begin)
|
||||
readBegin = vStart->getEndKey();
|
||||
else
|
||||
|
@ -931,7 +931,7 @@ ACTOR Future<GetKeyValuesReply> readRange( StorageServer* data, Version version,
|
|||
|
||||
// Read the data on disk up to vEnd (or the end of the range)
|
||||
readEnd = vEnd ? std::min( vEnd.key(), range.end ) : range.end;
|
||||
Standalone<VectorRef<KeyValueRef>> atStorageVersion = wait(
|
||||
Standalone<VectorRef<KeyValueRef>> atStorageVersion = wait(
|
||||
data->storage.readRange( KeyRangeRef(readBegin, readEnd), limit, *pLimitBytes ) );
|
||||
|
||||
/*if (track) {
|
||||
|
@ -963,7 +963,7 @@ ACTOR Future<GetKeyValuesReply> readRange( StorageServer* data, Version version,
|
|||
.detail("limit", limit).detail("*pLimitBytes", *pLimitBytes).detail("resultSize", result.data.size()).detail("prevSize", prevSize);
|
||||
readBegin = readBeginTemp = keyAfter( result.data.end()[-1].key );
|
||||
ASSERT( limit<=0 || *pLimitBytes<=0 || result.data.end()[-1].key == atStorageVersion.end()[-1].key );
|
||||
} else if (vStart && vStart->isClearTo()){ // if vStart is a clear, skip it.
|
||||
} else if (vStart && vStart->isClearTo()){ // if vStart is a clear, skip it.
|
||||
//if (track) printf("skip clear\n");
|
||||
readBegin = vStart->getEndKey(); // next disk read should start at the end of the clear
|
||||
++vStart;
|
||||
|
@ -981,7 +981,7 @@ ACTOR Future<GetKeyValuesReply> readRange( StorageServer* data, Version version,
|
|||
.detail("end", printable(range.end) )
|
||||
.detail("limitReamin", limit)
|
||||
.detail("lBytesRemain", *pLimitBytes); */
|
||||
|
||||
|
||||
/*GetKeyValuesReply correct = wait( readRangeOld(data, version, range, originalLimit, originalLimitBytes) );
|
||||
bool prefix_equal = true;
|
||||
int totalsize = 0;
|
||||
|
@ -1100,7 +1100,7 @@ ACTOR Future<Key> findKey( StorageServer* data, KeySelectorRef sel, Version vers
|
|||
}
|
||||
|
||||
int index = distance-1;
|
||||
if (skipEqualKey && rep.data.size() && rep.data[0].key == sel.getKey() )
|
||||
if (skipEqualKey && rep.data.size() && rep.data[0].key == sel.getKey() )
|
||||
++index;
|
||||
|
||||
if (index < rep.data.size()) {
|
||||
|
@ -1128,7 +1128,7 @@ ACTOR Future<Key> findKey( StorageServer* data, KeySelectorRef sel, Version vers
|
|||
}
|
||||
}
|
||||
|
||||
KeyRange getShardKeyRange( StorageServer* data, const KeySelectorRef& sel )
|
||||
KeyRange getShardKeyRange( StorageServer* data, const KeySelectorRef& sel )
|
||||
// Returns largest range such that the shard state isReadable and selectorInRange(sel, range) or wrong_shard_server if no such range exists
|
||||
{
|
||||
auto i = sel.isBackward() ? data->shards.rangeContainingKeyBefore( sel.getKey() ) : data->shards.rangeContaining( sel.getKey() );
|
||||
|
@ -1137,7 +1137,7 @@ KeyRange getShardKeyRange( StorageServer* data, const KeySelectorRef& sel )
|
|||
return i->range();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> getKeyValues( StorageServer* data, GetKeyValuesRequest req )
|
||||
ACTOR Future<Void> getKeyValues( StorageServer* data, GetKeyValuesRequest req )
|
||||
// Throws a wrong_shard_server if the keys in the request or result depend on data outside this server OR if a large selector offset prevents
|
||||
// all data from being read in one range read
|
||||
{
|
||||
|
@ -1221,7 +1221,7 @@ ACTOR Future<Void> getKeyValues( StorageServer* data, GetKeyValuesRequest req )
|
|||
}
|
||||
|
||||
/*for( int i = 0; i < r.data.size(); i++ ) {
|
||||
StorageMetrics m;
|
||||
StorageMetrics m;
|
||||
m.bytesPerKSecond = r.data[i].expectedSize();
|
||||
m.iosPerKSecond = 1; //FIXME: this should be 1/r.data.size(), but we cannot do that because it is an int
|
||||
data->metrics.notify(r.data[i].key, m);
|
||||
|
@ -1262,13 +1262,13 @@ ACTOR Future<Void> getKey( StorageServer* data, GetKeyRequest req ) {
|
|||
|
||||
state int offset;
|
||||
Key k = wait( findKey( data, req.sel, version, shard, &offset ) );
|
||||
|
||||
|
||||
data->checkChangeCounter( changeCounter, KeyRangeRef( std::min<KeyRef>(req.sel.getKey(), k), std::max<KeyRef>(req.sel.getKey(), k) ) );
|
||||
data->readReplyRate.addDelta(1);
|
||||
|
||||
KeySelector updated;
|
||||
if (offset < 0)
|
||||
updated = firstGreaterOrEqual(k)+offset; // first thing on this shard OR (large offset case) smallest key retrieved in range read
|
||||
updated = firstGreaterOrEqual(k)+offset; // first thing on this shard OR (large offset case) smallest key retrieved in range read
|
||||
else if (offset > 0)
|
||||
updated = firstGreaterOrEqual(k)+offset-1; // first thing on next shard OR (large offset case) keyAfter largest key retrieved in range read
|
||||
else
|
||||
|
@ -1301,7 +1301,7 @@ void getQueuingMetrics( StorageServer* self, StorageQueuingMetricsRequest const&
|
|||
reply.readReplyRate = self->readReplyRate.smoothRate();
|
||||
|
||||
reply.storageBytes = self->storage.getStorageBytes();
|
||||
|
||||
|
||||
reply.v = self->version.get();
|
||||
req.reply.send( reply );
|
||||
}
|
||||
|
@ -1408,12 +1408,12 @@ bool changeDurableVersion( StorageServer* data, Version desiredDurableVersion )
|
|||
Optional<MutationRef> clipMutation( MutationRef const& m, KeyRangeRef range ) {
|
||||
if (isSingleKeyMutation((MutationRef::Type) m.type)) {
|
||||
if (range.contains(m.param1)) return m;
|
||||
}
|
||||
}
|
||||
else if (m.type == MutationRef::ClearRange) {
|
||||
KeyRangeRef i = range & KeyRangeRef(m.param1, m.param2);
|
||||
if (!i.empty())
|
||||
return MutationRef( (MutationRef::Type)m.type, i.begin, i.end );
|
||||
}
|
||||
}
|
||||
else
|
||||
ASSERT(false);
|
||||
return Optional<MutationRef>();
|
||||
|
@ -1460,7 +1460,7 @@ bool expandMutation( MutationRef& m, StorageServer::VersionedData const& data, U
|
|||
ASSERT( m.param2 > m.param1 );
|
||||
}
|
||||
else if (m.type != MutationRef::SetValue && (m.type)) {
|
||||
|
||||
|
||||
StringRef oldVal;
|
||||
auto it = data.atLatest().lastLessOrEqual(m.param1);
|
||||
if (it != data.atLatest().end() && it->isValue() && it.key() == m.param1)
|
||||
|
@ -1475,7 +1475,7 @@ bool expandMutation( MutationRef& m, StorageServer::VersionedData const& data, U
|
|||
}
|
||||
|
||||
switch(m.type) {
|
||||
case MutationRef::AddValue:
|
||||
case MutationRef::AddValue:
|
||||
m.param2 = doLittleEndianAdd(oldVal, m.param2, ar);
|
||||
break;
|
||||
case MutationRef::And:
|
||||
|
@ -1511,7 +1511,7 @@ bool isClearContaining( StorageServer::VersionedData::ViewAtVersion const& view,
|
|||
void applyMutation( StorageServer *self, MutationRef const& m, Arena& arena, StorageServer::VersionedData &data ) {
|
||||
// m is expected to be in arena already
|
||||
// Clear split keys are added to arena
|
||||
StorageMetrics metrics;
|
||||
StorageMetrics metrics;
|
||||
metrics.bytesPerKSecond = mvccStorageBytes( m ) / 2;
|
||||
metrics.iosPerKSecond = 1;
|
||||
self->metrics.notify(m.param1, metrics);
|
||||
|
@ -1568,7 +1568,7 @@ void removeDataRange( StorageServer *ss, Standalone<VersionUpdateRef> &mLV, KeyR
|
|||
|
||||
auto beginClear = data.atLatest().lastLess( range.begin );
|
||||
if (beginClear && beginClear->isClearTo() && beginClear->getEndKey() > range.begin ) {
|
||||
// We don't need any special mutationLog entry - because the begin key and insert version are unchanged the original clear
|
||||
// We don't need any special mutationLog entry - because the begin key and insert version are unchanged the original clear
|
||||
// mutation works to forget this one - but we need range.begin in the right arena
|
||||
KeyRef rb( mLV.arena(), range.begin );
|
||||
// insert() invalidates beginClear, so beginClear.key() is not safe to pass to it by reference
|
||||
|
@ -1639,7 +1639,7 @@ ACTOR Future<Standalone<RangeResultRef>> tryGetRange( Database cx, Version versi
|
|||
}
|
||||
|
||||
output.more = limits.isReached();
|
||||
|
||||
|
||||
return output;
|
||||
} else if( rep.readThrough.present() ) {
|
||||
output.arena().dependsOn( rep.arena() );
|
||||
|
@ -1745,7 +1745,7 @@ ACTOR Future<Void> fetchKeys( StorageServer *data, AddingShard* shard ) {
|
|||
|
||||
Void _ = wait( data->fetchKeysParallelismLock.take( TaskDefaultYield, fetchBlockBytes ) );
|
||||
state FlowLock::Releaser holdingFKPL( data->fetchKeysParallelismLock, fetchBlockBytes );
|
||||
|
||||
|
||||
Void _ = wait(delay(0));
|
||||
|
||||
shard->phase = AddingShard::Fetching;
|
||||
|
@ -1761,11 +1761,11 @@ ACTOR Future<Void> fetchKeys( StorageServer *data, AddingShard* shard ) {
|
|||
loop {
|
||||
try {
|
||||
TEST(true); // Fetching keys for transferred shard
|
||||
|
||||
|
||||
state Standalone<RangeResultRef> this_block = wait( tryGetRange( data->cx, fetchVersion, keys, GetRangeLimits( CLIENT_KNOBS->ROW_LIMIT_UNLIMITED, fetchBlockBytes ), &isPastVersion ) );
|
||||
|
||||
|
||||
int expectedSize = (int)this_block.expectedSize() + (8-(int)sizeof(KeyValueRef))*this_block.size();
|
||||
|
||||
|
||||
TraceEvent(SevDebug, "FetchKeysBlock", data->thisServerID).detail("FKID", interval.pairID)
|
||||
.detail("BlockRows", this_block.size()).detail("BlockBytes", expectedSize)
|
||||
.detail("KeyBegin", printable(keys.begin)).detail("KeyEnd", printable(keys.end))
|
||||
|
@ -1857,10 +1857,10 @@ ACTOR Future<Void> fetchKeys( StorageServer *data, AddingShard* shard ) {
|
|||
TraceEvent(SevDebug, "FKBeforeFinalCommit", data->thisServerID).detail("FKID", interval.pairID).detail("SV", data->storageVersion()).detail("DV", data->durableVersion.get());
|
||||
// Directly commit()ing the IKVS would interfere with updateStorage, possibly resulting in an incomplete version being recovered.
|
||||
// Instead we wait for the updateStorage loop to commit something (and consequently also what we have written)
|
||||
|
||||
|
||||
Void _ = wait( data->durableVersion.whenAtLeast( data->storageVersion()+1 ) );
|
||||
holdingFKPL.release();
|
||||
|
||||
|
||||
TraceEvent(SevDebug, "FKAfterFinalCommit", data->thisServerID).detail("FKID", interval.pairID).detail("SV", data->storageVersion()).detail("DV", data->durableVersion.get());
|
||||
|
||||
// Wait to run during update(), after a new batch of versions is received from the tlog but before eager reads take place.
|
||||
|
@ -1928,7 +1928,7 @@ ACTOR Future<Void> fetchKeys( StorageServer *data, AddingShard* shard ) {
|
|||
TraceEvent(SevDebug, interval.end(), data->thisServerID);
|
||||
} catch (Error &e){
|
||||
TraceEvent(SevDebug, interval.end(), data->thisServerID).error(e, true).detail("Version", data->version.get());
|
||||
|
||||
|
||||
if (e.code() == error_code_actor_cancelled && !data->shuttingDown && shard->phase >= AddingShard::Fetching) {
|
||||
if (shard->phase < AddingShard::Waiting) {
|
||||
data->storage.clearRange( keys );
|
||||
|
@ -1958,7 +1958,7 @@ ACTOR Future<Void> fetchKeys( StorageServer *data, AddingShard* shard ) {
|
|||
AddingShard::AddingShard( StorageServer* server, KeyRangeRef const& keys )
|
||||
: server(server), keys(keys), transferredVersion(invalidVersion), phase(WaitPrevious)
|
||||
{
|
||||
fetchClient = fetchKeys(server, this);
|
||||
fetchClient = fetchKeys(server, this);
|
||||
}
|
||||
|
||||
void AddingShard::addMutation( Version version, MutationRef const& mutation ){
|
||||
|
@ -2121,7 +2121,7 @@ void rollback( StorageServer* data, Version rollbackVersion, Version nextVersion
|
|||
debugKeyRange("Rollback", rollbackVersion, allKeys);
|
||||
|
||||
// We used to do a complicated dance to roll back in MVCC history. It's much simpler, and more testable,
|
||||
// to simply restart the storage server actor and restore from the persistent disk state, and then roll
|
||||
// to simply restart the storage server actor and restore from the persistent disk state, and then roll
|
||||
// forward from the TLog's history. It's not quite as efficient, but we rarely have to do this in practice.
|
||||
|
||||
// FIXME: This code is relying for liveness on an undocumented property of the log system implementation: that after a rollback the rolled back versions will
|
||||
|
@ -2140,7 +2140,7 @@ void StorageServer::addMutation(Version version, MutationRef const& mutation, Ke
|
|||
}
|
||||
expanded = addMutationToMutationLog(mLog, expanded);
|
||||
if (debugMutation("expandedMutation", version, expanded)) {
|
||||
const char* type =
|
||||
const char* type =
|
||||
mutation.type == MutationRef::SetValue ? "SetValue" :
|
||||
mutation.type == MutationRef::ClearRange ? "ClearRange" :
|
||||
mutation.type == MutationRef::DebugKeyRange ? "DebugKeyRange" :
|
||||
|
@ -2178,7 +2178,7 @@ public:
|
|||
|
||||
void applyMutation(StorageServer* data, MutationRef const& m, Version ver) {
|
||||
//TraceEvent("SSNewVersion", data->thisServerID).detail("VerWas", data->mutableData().latestVersion).detail("ChVer", ver);
|
||||
|
||||
|
||||
if(currentVersion != ver) {
|
||||
fromVersion = currentVersion;
|
||||
currentVersion = ver;
|
||||
|
@ -2307,7 +2307,7 @@ ACTOR Future<Void> update( StorageServer* data, bool* pReceivedUpdate )
|
|||
state FlowLock::Releaser holdingDVL( data->durableVersionLock );
|
||||
if(now() - start > 0.1)
|
||||
TraceEvent("SSSlowTakeLock1", data->thisServerID).detailf("From", "%016llx", debug_lastLoadBalanceResultEndpointToken).detail("Duration", now() - start).detail("Version", data->version.get());
|
||||
|
||||
|
||||
start = now();
|
||||
state UpdateEagerReadInfo eager;
|
||||
state FetchInjectionInfo fii;
|
||||
|
@ -2325,7 +2325,7 @@ ACTOR Future<Void> update( StorageServer* data, bool* pReceivedUpdate )
|
|||
cloneCursor2 = cursor->cloneNoMore();
|
||||
|
||||
cloneCursor1->setProtocolVersion(data->logProtocol);
|
||||
|
||||
|
||||
for (; cloneCursor1->hasMessage(); cloneCursor1->nextMessage()) {
|
||||
ArenaReader& cloneReader = *cloneCursor1->reader();
|
||||
|
||||
|
@ -2376,7 +2376,7 @@ ACTOR Future<Void> update( StorageServer* data, bool* pReceivedUpdate )
|
|||
|
||||
data->updateEagerReads = &eager;
|
||||
data->debug_inApplyUpdate = true;
|
||||
|
||||
|
||||
StorageUpdater updater(data->lastVersionWithData, std::max( std::max(data->desiredOldestVersion.get(), data->oldestVersion.get()), minNewOldestVersion ));
|
||||
|
||||
if (EXPENSIVE_VALIDATION) data->data().atLatest().validate();
|
||||
|
@ -2459,7 +2459,7 @@ ACTOR Future<Void> update( StorageServer* data, bool* pReceivedUpdate )
|
|||
|
||||
validate(data);
|
||||
|
||||
data->logCursor->advanceTo( cloneCursor2->version() );
|
||||
data->logCursor->advanceTo( cloneCursor2->version() );
|
||||
if(cursor->version().version >= data->lastTLogVersion) {
|
||||
if(data->behind) {
|
||||
TraceEvent("StorageServerNoLongerBehind", data->thisServerID).detail("CursorVersion", cursor->version().version).detail("TLogVersion", data->lastTLogVersion);
|
||||
|
@ -2581,11 +2581,11 @@ void setAssignedStatus( StorageServer* self, KeyRangeRef keys, bool nowAssigned
|
|||
ASSERT( !keys.empty() );
|
||||
auto& mLV = self->addVersionToMutationLog( self->data().getLatestVersion() );
|
||||
KeyRange assignedKeys = KeyRangeRef(
|
||||
persistShardAssignedKeys.begin.toString() + keys.begin.toString(),
|
||||
persistShardAssignedKeys.begin.toString() + keys.begin.toString(),
|
||||
persistShardAssignedKeys.begin.toString() + keys.end.toString() );
|
||||
//TraceEvent("SetAssignedStatus", self->thisServerID).detail("Version", mLV.version).detail("RangeBegin", printable(assignedKeys.begin)).detail("RangeEnd", printable(assignedKeys.end));
|
||||
self->addMutationToMutationLog( mLV, MutationRef( MutationRef::ClearRange, assignedKeys.begin, assignedKeys.end ) );
|
||||
self->addMutationToMutationLog( mLV, MutationRef( MutationRef::SetValue, assignedKeys.begin,
|
||||
self->addMutationToMutationLog( mLV, MutationRef( MutationRef::SetValue, assignedKeys.begin,
|
||||
nowAssigned ? LiteralStringRef("1") : LiteralStringRef("0") ) );
|
||||
if (keys.end != allKeys.end) {
|
||||
bool endAssigned = self->shards.rangeContaining( keys.end )->value()->assigned();
|
||||
|
@ -2669,9 +2669,9 @@ ACTOR Future<Void> applyByteSampleResult( StorageServer* data, KeyRange range, F
|
|||
|
||||
ACTOR Future<Void> restoreByteSample(StorageServer* data, IKeyValueStore* storage, Standalone<VectorRef<KeyValueRef>> bsSample) {
|
||||
Void _ = wait( delay( BUGGIFY ? g_random->random01() * 2.0 : 0.0001 ) );
|
||||
|
||||
|
||||
TraceEvent("RecoveredByteSampleSample", data->thisServerID).detail("Keys", bsSample.size()).detail("ReadBytes", bsSample.expectedSize());
|
||||
|
||||
|
||||
size_t bytes_per_fetch = 0;
|
||||
// Since the expected size also includes (as of now) the space overhead of the container, we calculate our own number here
|
||||
for( int i = 0; i < bsSample.size(); i++ )
|
||||
|
@ -2698,7 +2698,7 @@ ACTOR Future<Void> restoreByteSample(StorageServer* data, IKeyValueStore* storag
|
|||
|
||||
Void _ = wait( waitForAll( sampleRanges ) );
|
||||
TraceEvent("RecoveredByteSampleChunkedRead", data->thisServerID).detail("Ranges",sampleRanges.size());
|
||||
|
||||
|
||||
if( BUGGIFY )
|
||||
Void _ = wait( delay( g_random->random01() * 10.0 ) );
|
||||
|
||||
|
@ -2806,14 +2806,14 @@ ByteSampleInfo isKeyValueInSample(KeyValueRef keyValue) {
|
|||
const KeyRef key = keyValue.key;
|
||||
info.size = key.size() + keyValue.value.size();
|
||||
|
||||
uint32_t a = 0;
|
||||
uint32_t a = 0;
|
||||
uint32_t b = 0;
|
||||
hashlittle2( key.begin(), key.size(), &a, &b );
|
||||
|
||||
double probability = (double)info.size / (key.size() + SERVER_KNOBS->BYTE_SAMPLING_OVERHEAD) / SERVER_KNOBS->BYTE_SAMPLING_FACTOR;
|
||||
info.inSample = a / ((1 << 30) * 4.0) < probability;
|
||||
info.sampledSize = info.size / std::min(1.0, probability);
|
||||
|
||||
|
||||
return info;
|
||||
}
|
||||
|
||||
|
@ -3030,7 +3030,7 @@ ACTOR Future<Void> logLongByteSampleRecovery(Future<Void> recovery) {
|
|||
TraceEvent(g_network->isSimulated() ? SevWarn : SevWarnAlways, "LongByteSampleRecovery");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
@ -3047,7 +3047,7 @@ ACTOR Future<Void> storageServerCore( StorageServer* self, StorageServerInterfac
|
|||
actors.add(self->otherError.getFuture());
|
||||
actors.add(metricsCore(self, ssi));
|
||||
actors.add(logLongByteSampleRecovery(self->byteSampleRecovery));
|
||||
|
||||
|
||||
self->coreStarted.send( Void() );
|
||||
|
||||
loop {
|
||||
|
@ -3122,7 +3122,7 @@ ACTOR Future<Void> storageServerCore( StorageServer* self, StorageServerInterfac
|
|||
}
|
||||
when( Void _ = wait(doUpdate) ) {
|
||||
updateReceived = false;
|
||||
if (!self->logSystem)
|
||||
if (!self->logSystem)
|
||||
doUpdate = Never();
|
||||
else
|
||||
doUpdate = update( self, &updateReceived );
|
||||
|
@ -3139,13 +3139,13 @@ bool storageServerTerminated(StorageServer& self, IKeyValueStore* persistentData
|
|||
self.shards.insert( allKeys, Reference<ShardInfo>() );
|
||||
|
||||
// Dispose the IKVS (destroying its data permanently) only if this shutdown is definitely permanent. Otherwise just close it.
|
||||
if (e.code() == error_code_worker_removed || e.code() == error_code_recruitment_failed)
|
||||
if (e.code() == error_code_worker_removed || e.code() == error_code_recruitment_failed)
|
||||
persistentData->dispose();
|
||||
else
|
||||
persistentData->close();
|
||||
|
||||
if ( e.code() == error_code_worker_removed ||
|
||||
e.code() == error_code_recruitment_failed ||
|
||||
|
||||
if ( e.code() == error_code_worker_removed ||
|
||||
e.code() == error_code_recruitment_failed ||
|
||||
e.code() == error_code_file_not_found ||
|
||||
e.code() == error_code_actor_cancelled )
|
||||
{
|
||||
|
@ -3220,7 +3220,7 @@ ACTOR Future<Void> replaceInterface( StorageServer* self, StorageServerInterface
|
|||
when ( Void _ = wait(infoChanged) ) {}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
@ -3300,7 +3300,7 @@ Possibilities:
|
|||
void versionedMapTest() {
|
||||
VersionedMap<int,int> vm;
|
||||
|
||||
printf("SS Ptree node is %d bytes\n", sizeof( StorageServer::VersionedData::PTreeT ) );
|
||||
printf("SS Ptree node is %lu bytes\n", sizeof( StorageServer::VersionedData::PTreeT ) );
|
||||
|
||||
const int NSIZE = sizeof(VersionedMap<int,int>::PTreeT);
|
||||
const int ASIZE = NSIZE<=64 ? 64 : NextPowerOfTwo<NSIZE>::Result;
|
||||
|
@ -3327,6 +3327,6 @@ void versionedMapTest() {
|
|||
|
||||
printf("PTree node is %d bytes, allocated as %d bytes\n", NSIZE, ASIZE);
|
||||
printf("%d distinct after %d insertions\n", count, 1000*1000);
|
||||
printf("Memory used: %f MB\n",
|
||||
printf("Memory used: %f MB\n",
|
||||
(after - before)/ 1e6);
|
||||
}
|
||||
|
|
|
@ -90,7 +90,7 @@ struct AsyncFileCorrectnessWorkload : public AsyncFileWorkload
|
|||
if(maxOperationSize * numSimultaneousOperations > targetFileSize * 0.25)
|
||||
{
|
||||
targetFileSize *= (int)ceil((maxOperationSize * numSimultaneousOperations * 4.0) / targetFileSize);
|
||||
printf("Target file size is insufficient to support %d simultaneous operations of size %d; changing to %ld\n", numSimultaneousOperations, maxOperationSize, targetFileSize);
|
||||
printf("Target file size is insufficient to support %d simultaneous operations of size %d; changing to %lld\n", numSimultaneousOperations, maxOperationSize, targetFileSize);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -207,7 +207,7 @@ struct AsyncFileCorrectnessWorkload : public AsyncFileWorkload
|
|||
//If we know what data should be in a particular range, then compare the result with what we know
|
||||
if(isValid && memcmp(&self->fileValidityMask[info.offset + start], &info.data->buffer[start], i - start))
|
||||
{
|
||||
printf("Read returned incorrect results at %d of length %d\n", info.offset, info.length);
|
||||
printf("Read returned incorrect results at %llu of length %llu\n", info.offset, info.length);
|
||||
|
||||
self->success = false;
|
||||
return Void();
|
||||
|
@ -416,7 +416,7 @@ struct AsyncFileCorrectnessWorkload : public AsyncFileWorkload
|
|||
|
||||
if(numRead != std::min(info.length, self->fileSize - info.offset))
|
||||
{
|
||||
printf("Read reported incorrect number of bytes at %d of length %d\n", info.offset, info.length);
|
||||
printf("Read reported incorrect number of bytes at %llu of length %llu\n", info.offset, info.length);
|
||||
self->success = false;
|
||||
}
|
||||
}
|
||||
|
@ -457,12 +457,12 @@ struct AsyncFileCorrectnessWorkload : public AsyncFileWorkload
|
|||
int64_t fileSizeChange = fileSize - self->fileSize;
|
||||
if(fileSizeChange >= _PAGE_SIZE)
|
||||
{
|
||||
printf("Reopened file increased in size by %d bytes (at most %d allowed)\n", fileSizeChange, _PAGE_SIZE - 1);
|
||||
printf("Reopened file increased in size by %lld bytes (at most %d allowed)\n", fileSizeChange, _PAGE_SIZE - 1);
|
||||
self->success = false;
|
||||
}
|
||||
else if(fileSizeChange < 0)
|
||||
{
|
||||
printf("Reopened file decreased in size by %d bytes\n", -fileSizeChange);
|
||||
printf("Reopened file decreased in size by %lld bytes\n", -fileSizeChange);
|
||||
self->success = false;
|
||||
}
|
||||
|
||||
|
@ -510,4 +510,3 @@ struct AsyncFileCorrectnessWorkload : public AsyncFileWorkload
|
|||
};
|
||||
|
||||
WorkloadFactory<AsyncFileCorrectnessWorkload> AsyncFileCorrectnessWorkloadFactory("AsyncFileCorrectness");
|
||||
|
||||
|
|
Loading…
Reference in New Issue