ReplicationPolicy:Add trace for the name of each keyIndex
This commit is contained in:
parent
4fae510633
commit
c6e42d6119
|
@ -477,6 +477,8 @@ public:
|
|||
virtual Reference<StringToIntMap> const& getGroupKeyMap() const
|
||||
{ return _localitygroup->getGroupKeyMap(); }
|
||||
|
||||
Reference<StringToIntMap> _keymap;
|
||||
|
||||
protected:
|
||||
virtual Reference<StringToIntMap>& getGroupValueMap()
|
||||
{ return _localitygroup->getGroupValueMap(); }
|
||||
|
@ -491,7 +493,7 @@ protected:
|
|||
|
||||
std::vector<AttribKey> _keyIndexArray;
|
||||
std::vector<LocalityCacheRecord> _cacheArray;
|
||||
Reference<StringToIntMap> _keymap;
|
||||
|
||||
LocalitySet* _localitygroup;
|
||||
long long unsigned int _cachehits;
|
||||
long long unsigned int _cachemisses;
|
||||
|
|
|
@ -113,18 +113,14 @@ bool PolicyOne::selectReplicas(
|
|||
int itemsUsed = 0;
|
||||
if (alsoServers.size()) {
|
||||
totalUsed ++;
|
||||
}
|
||||
else if (fromServers->size()) {
|
||||
} else if (fromServers->size()) {
|
||||
auto randomEntry = fromServers->random();
|
||||
results.push_back(randomEntry);
|
||||
itemsUsed ++;
|
||||
totalUsed ++;
|
||||
if (g_replicationdebug > 5) {
|
||||
printf("One added:%4d %33s entry: %s\n", itemsUsed, "", fromServers->getEntryInfo(randomEntry).c_str());
|
||||
}
|
||||
}
|
||||
if (g_replicationdebug > 2) {
|
||||
printf("One used:%5d results:%3d from %3d servers\n", totalUsed, itemsUsed, fromServers->size());
|
||||
if (g_replicationdebug > 0) {
|
||||
printf("PolicyOne used:%5d results:%3d from %3d servers\n", totalUsed, itemsUsed, fromServers->size());
|
||||
}
|
||||
return (totalUsed > 0);
|
||||
}
|
||||
|
@ -263,6 +259,8 @@ bool PolicyAcross::validate(
|
|||
// fromserverse are the servers that have already been chosen and
|
||||
// that should be excluded from being selected as replicas.
|
||||
// FIXME: Simplify this function, such as removing unnecessary printf
|
||||
// fromServers are the servers that must have;
|
||||
// alsoServers are the servers you can choose.
|
||||
bool PolicyAcross::selectReplicas(
|
||||
Reference<LocalitySet> & fromServers,
|
||||
std::vector<LocalityEntry> const& alsoServers,
|
||||
|
@ -279,9 +277,10 @@ bool PolicyAcross::selectReplicas(
|
|||
_newResults.clear();
|
||||
_addedResults.resize(_arena, 0);
|
||||
|
||||
if ((g_replicationdebug > 3) && (alsoServers.size())) {
|
||||
if (g_replicationdebug > 0) {
|
||||
printf("Across !also:%4lu key: %-7s policy: %-10s => %s\n", alsoServers.size(), _attribKey.c_str(), _policy->name().c_str(), _policy->info().c_str());
|
||||
}
|
||||
// Q: What does this for loop do???
|
||||
for (auto& alsoServer : alsoServers) {
|
||||
auto value = fromServers->getValueViaGroupKey(alsoServer, groupIndexKey);
|
||||
if (value.present()) {
|
||||
|
@ -289,14 +288,18 @@ bool PolicyAcross::selectReplicas(
|
|||
if ((lowerBound == _usedValues.end()) || (*lowerBound != value.get())) {
|
||||
//_selected is a set of processes that have the same indexKey and indexValue (value)
|
||||
_selected = fromServers->restrict(indexKey, value.get());
|
||||
if (_selected->size()) {
|
||||
// Pass only the also array item which are valid for the value
|
||||
if (g_replicationdebug > 5) {
|
||||
if (g_replicationdebug > 0) {
|
||||
if (_selected->size() > 0) {
|
||||
// entry is the locality entry info (entryValue) from the to-be-selected team member alsoServer
|
||||
printf("Across !select key: %-7s value: (%3d) %-10s entry: %s\n", _attribKey.c_str(),
|
||||
value.get()._id, fromServers->valueText(value.get()).c_str(),
|
||||
fromServers->getEntryInfo(alsoServer).c_str());
|
||||
value.get()._id, fromServers->valueText(value.get()).c_str(),
|
||||
fromServers->getEntryInfo(alsoServer).c_str());
|
||||
} else {
|
||||
printf("Across !select empty\n");
|
||||
}
|
||||
}
|
||||
if (_selected->size()) {
|
||||
// Pass only the also array item which are valid for the value
|
||||
resultsSize = _newResults.size();
|
||||
if (_policy->selectReplicas(_selected, alsoServers, _newResults))
|
||||
{
|
||||
|
@ -307,30 +310,11 @@ bool PolicyAcross::selectReplicas(
|
|||
else {
|
||||
_addedResults.push_back(_arena, std::pair<int, int>(resultsAdded, resultsSize));
|
||||
}
|
||||
if (g_replicationdebug > 5) {
|
||||
printf("Across !added:%3d key: %-7s count:%3d of%3d value: (%3d) %-10s entry: %s\n",
|
||||
resultsAdded, _attribKey.c_str(), count, _count, value.get()._id,
|
||||
fromServers->valueText(value.get()).c_str(),
|
||||
fromServers->getEntryInfo(alsoServer).c_str());
|
||||
}
|
||||
if (count >= _count) break;
|
||||
_usedValues.insert(lowerBound, value.get());
|
||||
}
|
||||
else if (g_replicationdebug > 5) {
|
||||
printf("Across !no answer key: %-7s value: (%3d) %-10s entry: %s\n", _attribKey.c_str(), value.get()._id, fromServers->valueText(value.get()).c_str(), fromServers->getEntryInfo(alsoServer).c_str());
|
||||
}
|
||||
}
|
||||
else if (g_replicationdebug > 5) {
|
||||
printf("Across !empty set key: %-7s value: (%3d) %-10s entry: %s\n", _attribKey.c_str(), value.get()._id, fromServers->valueText(value.get()).c_str(), fromServers->getEntryInfo(alsoServer).c_str());
|
||||
}
|
||||
|
||||
}
|
||||
else if (g_replicationdebug > 5) {
|
||||
printf("Across !duplicate key: %-7s value: (%3d) %-10s entry: %s\n", _attribKey.c_str(), value.get()._id, fromServers->valueText(value.get()).c_str(), fromServers->getEntryInfo(alsoServer).c_str());
|
||||
}
|
||||
}
|
||||
else if (g_replicationdebug > 5) {
|
||||
printf("Across !no value key: %-7s %21s entry: %s\n", _attribKey.c_str(), "", fromServers->getEntryInfo(alsoServer).c_str());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -339,11 +323,11 @@ bool PolicyAcross::selectReplicas(
|
|||
// Sort the added results array
|
||||
std::sort(_addedResults.begin(), _addedResults.end(), PolicyAcross::compareAddedResults);
|
||||
|
||||
if (g_replicationdebug > 2) {
|
||||
if (g_replicationdebug > 0) {
|
||||
printf("Across !add sets key: %-7s sets:%3d results:%3lu count:%3d of%3d\n", _attribKey.c_str(), _addedResults.size(), _newResults.size(), count, _count);
|
||||
}
|
||||
|
||||
if (g_replicationdebug > 6) {
|
||||
if (g_replicationdebug > 0) {
|
||||
LocalitySet::staticDisplayEntries(fromServers, alsoServers, "also");
|
||||
LocalitySet::staticDisplayEntries(fromServers, results, "results");
|
||||
LocalitySet::staticDisplayEntries(fromServers, _newResults, "add items");
|
||||
|
@ -351,14 +335,14 @@ bool PolicyAcross::selectReplicas(
|
|||
|
||||
for (auto& addedResult : _addedResults) {
|
||||
count ++;
|
||||
if (g_replicationdebug > 2) {
|
||||
if (g_replicationdebug > 0) {
|
||||
printf("Across !add set key: %-7s count:%3d of%3d results:%3d index:%3d\n", _attribKey.c_str(), count, _count, addedResult.first, addedResult.second);
|
||||
}
|
||||
results.reserve(results.size() + addedResult.first);
|
||||
results.insert(results.end(), _newResults.begin()+addedResult.second, _newResults.begin()+addedResult.second+addedResult.first);
|
||||
if (count >= _count) break;
|
||||
}
|
||||
if (g_replicationdebug > 7) {
|
||||
if (g_replicationdebug > 0) {
|
||||
LocalitySet::staticDisplayEntries(fromServers, results, "results");
|
||||
}
|
||||
}
|
||||
|
@ -366,14 +350,14 @@ bool PolicyAcross::selectReplicas(
|
|||
// Cannot find replica from the least used alsoServers, now try to find replicas from all servers
|
||||
// Process the remaining values
|
||||
if (count < _count) {
|
||||
if (g_replicationdebug > 3) {
|
||||
if (g_replicationdebug > 0) {
|
||||
printf("Across items:%4d key: %-7s policy: %-10s => %s count:%3d of%3d\n", fromServers->size(), _attribKey.c_str(), _policy->name().c_str(), _policy->info().c_str(), count, _count);
|
||||
}
|
||||
int recordIndex;
|
||||
// Use mutable array so that swaps does not affect actual element array
|
||||
auto& mutableArray = fromServers->getMutableEntries();
|
||||
for (int checksLeft = fromServers->size(); checksLeft > 0; checksLeft --) {
|
||||
if (g_replicationdebug > 6) {
|
||||
if (g_replicationdebug > 0) {
|
||||
LocalitySet::staticDisplayEntries(fromServers, mutableArray, "mutable");
|
||||
}
|
||||
recordIndex = deterministicRandom()->randomInt(0, checksLeft);
|
||||
|
@ -402,36 +386,21 @@ bool PolicyAcross::selectReplicas(
|
|||
if (count >= _count) break;
|
||||
_usedValues.insert(lowerBound, value.get());
|
||||
}
|
||||
else if (g_replicationdebug > 5) {
|
||||
printf("Across no answer key: %-7s value: (%3d) %-10s entry: %s\n", _attribKey.c_str(), value.get()._id, fromServers->valueText(value.get()).c_str(), fromServers->getEntryInfo(entry).c_str());
|
||||
}
|
||||
}
|
||||
else if (g_replicationdebug > 5) {
|
||||
printf("Across empty set:%3d key: %-7s value: (%3d) %-10s entry: %s index:%4d\n", fromServers->size()-checksLeft+1, _attribKey.c_str(), value.get()._id, fromServers->valueText(value.get()).c_str(), fromServers->getEntryInfo(entry).c_str(), recordIndex);
|
||||
}
|
||||
}
|
||||
else if (g_replicationdebug > 5) {
|
||||
printf("Across duplicate key: %-7s value: (%3d) %-10s entry: %s attempt:%3d index:%4d\n", _attribKey.c_str(), value.get()._id, fromServers->valueText(value.get()).c_str(), fromServers->getEntryInfo(entry).c_str(), fromServers->size()-checksLeft+1, recordIndex);
|
||||
}
|
||||
}
|
||||
else if (g_replicationdebug > 5) {
|
||||
printf("Across no value key: %-7s %21s entry: %s attempt:%3d index:%4d\n", _attribKey.c_str(), "", fromServers->getEntryInfo(entry).c_str(), fromServers->size()-checksLeft+1, recordIndex);
|
||||
}
|
||||
if (recordIndex != checksLeft-1) {
|
||||
if (g_replicationdebug > 5) {
|
||||
printf("Across swap key: %-7s index:%4d last:%4d entry: %s\n", _attribKey.c_str(), recordIndex, checksLeft-1, fromServers->getEntryInfo(entry).c_str());
|
||||
}
|
||||
fromServers->swapMutableRecords(recordIndex, checksLeft-1);
|
||||
}
|
||||
}
|
||||
}
|
||||
// Clear the return array, if not satified
|
||||
if (count < _count) {
|
||||
if (g_replicationdebug > 4) printf("Across result count: %d < %d requested\n", count, _count);
|
||||
if (g_replicationdebug > 0) printf("Across result count: %d < %d requested\n", count, _count);
|
||||
results.resize(resultsInit);
|
||||
count = 0;
|
||||
}
|
||||
if (g_replicationdebug > 2) {
|
||||
if (g_replicationdebug > 0) {
|
||||
printf("Across used:%5lu results:%3d from %3d items key: %-7s policy: %-10s => %s\n", results.size()-resultsInit, count, fromServers->size(), _attribKey.c_str(), _policy->name().c_str(), _policy->info().c_str());
|
||||
}
|
||||
return (count >= _count);
|
||||
|
@ -465,14 +434,8 @@ bool PolicyAnd::selectReplicas(
|
|||
}
|
||||
|
||||
for (auto& policy : _sortedPolicies) {
|
||||
if (g_replicationdebug > 3) {
|
||||
printf("And also:%5lu used: %4lu from %3d items policy: %-10s => %s\n", newResults.size(), newResults.size()-alsoServers.size(), fromServers->size(), policy->name().c_str(), policy->info().c_str());
|
||||
}
|
||||
if (!policy->selectReplicas(fromServers, newResults, newResults))
|
||||
{
|
||||
if (g_replicationdebug > 3) {
|
||||
printf("And failed set:%4d policy: %-10s => %s\n", fromServers->size(), policy->name().c_str(), policy->info().c_str());
|
||||
}
|
||||
passed = false;
|
||||
break;
|
||||
}
|
||||
|
@ -482,9 +445,6 @@ bool PolicyAnd::selectReplicas(
|
|||
results.insert(results.end(), newResults.begin()+alsoServers.size(), newResults.end());
|
||||
}
|
||||
|
||||
if (g_replicationdebug > 2) {
|
||||
printf("And used:%5lu results:%3lu from %3d items\n", newResults.size()-alsoServers.size(), results.size(), fromServers->size());
|
||||
}
|
||||
return passed;
|
||||
}
|
||||
|
||||
|
|
|
@ -1249,10 +1249,27 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
|
|||
}
|
||||
}
|
||||
|
||||
void traceLocalityArrayIndexName(Reference<LocalityRecord> record) {
|
||||
TraceEvent("LocalityRecordKeyName");
|
||||
for (int i = 0; i < machineLocalityMap._keymap->_lookuparray.size(); ++i) {
|
||||
TraceEvent("LocalityRecordKeyIndexName")
|
||||
.detail("KeyIndex", i)
|
||||
.detail("KeyName", machineLocalityMap._keymap->_lookuparray[i]);
|
||||
}
|
||||
}
|
||||
|
||||
void traceMachineLocalityMap() {
|
||||
int i = 0;
|
||||
|
||||
TraceEvent("MachineLocalityMap").detail("Size", machineLocalityMap.size());
|
||||
for (auto& uid : machineLocalityMap.getObjects()) {
|
||||
Reference<LocalityRecord> record = machineLocalityMap.getRecord(i);
|
||||
if (record.isValid()) {
|
||||
// Record the Locality KeyIndex and name so that we know what each key means
|
||||
traceLocalityArrayIndexName(record);
|
||||
break;
|
||||
}
|
||||
}
|
||||
for (auto& uid : machineLocalityMap.getObjects()) {
|
||||
Reference<LocalityRecord> record = machineLocalityMap.getRecord(i);
|
||||
if (record.isValid()) {
|
||||
|
@ -1270,7 +1287,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
|
|||
}
|
||||
|
||||
// To enable verbose debug info, set shouldPrint to true
|
||||
void traceAllInfo(bool shouldPrint = false) {
|
||||
void traceAllInfo(bool shouldPrint = true) {
|
||||
if (!shouldPrint) return;
|
||||
|
||||
TraceEvent("TraceAllInfo").detail("Primary", primary);
|
||||
|
@ -1302,6 +1319,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
|
|||
machine->second->localityEntry = localityEntry;
|
||||
++numHealthyMachine;
|
||||
}
|
||||
TraceEvent("RebuildMachineLocalityMapDebug").detail("NumHealthyMachine", numHealthyMachine);
|
||||
}
|
||||
|
||||
// Create machineTeamsToBuild number of machine teams
|
||||
|
@ -1354,33 +1372,39 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
|
|||
std::vector<UID*> team;
|
||||
std::vector<LocalityEntry> forcedAttributes;
|
||||
|
||||
// Step 3: Create a representative process for each machine.
|
||||
// Construct forcedAttribute from leastUsedMachines.
|
||||
// We will use forcedAttribute to call existing function to form a team
|
||||
if (leastUsedMachines.size()) {
|
||||
// Randomly choose 1 least used machine
|
||||
Reference<TCMachineInfo> tcMachineInfo = deterministicRandom()->randomChoice(leastUsedMachines);
|
||||
ASSERT(!tcMachineInfo->serversOnMachine.empty());
|
||||
LocalityEntry process = tcMachineInfo->localityEntry;
|
||||
forcedAttributes.push_back(process);
|
||||
} else {
|
||||
// when leastUsedMachine is empty, we will never find a team later, so we can simply return.
|
||||
return addedMachineTeams;
|
||||
}
|
||||
|
||||
// Step 4: Reuse Policy's selectReplicas() to create team for the representative process.
|
||||
std::vector<UID*> bestTeam;
|
||||
int bestScore = std::numeric_limits<int>::max();
|
||||
int maxAttempts = SERVER_KNOBS->BEST_OF_AMT; // BEST_OF_AMT = 4
|
||||
for (int i = 0; i < maxAttempts && i < 100; ++i) {
|
||||
// Step 3: Create a representative process for each machine.
|
||||
// Construct forcedAttribute from leastUsedMachines.
|
||||
// We will use forcedAttribute to call existing function to form a team
|
||||
if (leastUsedMachines.size()) {
|
||||
forcedAttributes.clear();
|
||||
// Randomly choose 1 least used machine
|
||||
Reference<TCMachineInfo> tcMachineInfo = deterministicRandom()->randomChoice(leastUsedMachines);
|
||||
ASSERT(!tcMachineInfo->serversOnMachine.empty());
|
||||
LocalityEntry process = tcMachineInfo->localityEntry;
|
||||
forcedAttributes.push_back(process);
|
||||
TraceEvent("ChosenMachine").detail("MachineInfo", tcMachineInfo->machineID).detail("LeaseUsedMachinesSize", leastUsedMachines.size()).detail("ForcedAttributesSize", forcedAttributes.size());
|
||||
} else {
|
||||
// when leastUsedMachine is empty, we will never find a team later, so we can simply return.
|
||||
return addedMachineTeams;
|
||||
}
|
||||
|
||||
// Choose a team that balances the # of teams per server among the teams
|
||||
// that have the least-utilized server
|
||||
team.clear();
|
||||
ASSERT_WE_THINK(forcedAttributes.size() == 1);
|
||||
auto success = machineLocalityMap.selectReplicas(configuration.storagePolicy, forcedAttributes, team);
|
||||
TraceEvent("SelectReplicasDebug").detail("Success", success).detail("PolicyInfo", configuration.storagePolicy->info());
|
||||
// NOTE: selectReplicas() should always return success when storageTeamSize = 1
|
||||
ASSERT_WE_THINK(configuration.storageTeamSize > 1 || (configuration.storageTeamSize == 1 && success));
|
||||
if (!success) {
|
||||
break;
|
||||
traceAllInfo(true);
|
||||
TraceEvent("SelectReplicasDebug").detail("FoundTeamSize", team.size() + 1).detail("ExpectedTeamSize", configuration.storageTeamSize);
|
||||
continue; // Try up to maxAttempts, since next time we may choose a different forcedAttributes
|
||||
}
|
||||
ASSERT(forcedAttributes.size() > 0);
|
||||
team.push_back((UID*)machineLocalityMap.getObject(forcedAttributes[0]));
|
||||
|
@ -1395,12 +1419,15 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
|
|||
|
||||
int score = 0;
|
||||
vector<Standalone<StringRef>> machineIDs;
|
||||
TraceEvent("MachineTeamDebug").detail("TeamSize", team.size());
|
||||
for (auto process = team.begin(); process != team.end(); process++) {
|
||||
Reference<TCServerInfo> server = server_info[**process];
|
||||
score += server->machine->machineTeams.size();
|
||||
Standalone<StringRef> machine_id = server->lastKnownInterface.locality.zoneId().get();
|
||||
machineIDs.push_back(machine_id);
|
||||
TraceEvent("MachineTeamDebug\n").detail("MachineTeamMember", machine_id);
|
||||
}
|
||||
|
||||
|
||||
// Only choose healthy machines into machine team
|
||||
ASSERT_WE_THINK(isMachineTeamHealthy(machineIDs));
|
||||
|
@ -2166,8 +2193,8 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
|
|||
Standalone<StringRef> machine_id = locality.zoneId().get(); // locality to machine_id with std::string type
|
||||
|
||||
Reference<TCMachineInfo> machineInfo;
|
||||
if (machine_info.find(machine_id) ==
|
||||
machine_info.end()) { // uid is the first storage server process on the machine
|
||||
if (machine_info.find(machine_id) == machine_info.end()) {
|
||||
// uid is the first storage server process on the machine
|
||||
TEST(true);
|
||||
// For each machine, store the first server's localityEntry into machineInfo for later use.
|
||||
LocalityEntry localityEntry = machineLocalityMap.add(locality, &server->id);
|
||||
|
@ -3243,7 +3270,8 @@ ACTOR Future<Void> storageServerTracker(
|
|||
TraceEvent("StorageServerInterfaceChanged", self->distributorId).detail("ServerID", server->id)
|
||||
.detail("NewWaitFailureToken", newInterface.first.waitFailure.getEndpoint().token)
|
||||
.detail("OldWaitFailureToken", server->lastKnownInterface.waitFailure.getEndpoint().token)
|
||||
.detail("LocalityChanged", localityChanged);
|
||||
.detail("LocalityChanged", localityChanged)
|
||||
.detail("MachineLocalityChanged", machineLocalityChanged);
|
||||
|
||||
server->lastKnownInterface = newInterface.first;
|
||||
server->lastKnownClass = newInterface.second;
|
||||
|
@ -3267,6 +3295,7 @@ ACTOR Future<Void> storageServerTracker(
|
|||
int serverIndex = -1;
|
||||
for (int i = 0; i < machine->serversOnMachine.size(); ++i) {
|
||||
if (machine->serversOnMachine[i].getPtr() == server) {
|
||||
// NOTE: now the machine's locality is wrong. Need update it whenever uses it.
|
||||
serverIndex = i;
|
||||
machine->serversOnMachine[i] = machine->serversOnMachine.back();
|
||||
machine->serversOnMachine.pop_back();
|
||||
|
|
Loading…
Reference in New Issue