Merge branch 'release-6.2' of github.com:apple/foundationdb into feature-redwood

This commit is contained in:
Steve Atherton 2020-02-23 00:30:27 -08:00
commit 712aa27896
5 changed files with 157 additions and 28 deletions

View File

@ -817,9 +817,9 @@ public class DirectoryLayer implements Directory {
private static long unpackLittleEndian(byte[] bytes) {
assert bytes.length == 8;
int value = 0;
long value = 0;
for(int i = 0; i < 8; ++i) {
value += (bytes[i] << (i * 8));
value += (Byte.toUnsignedLong(bytes[i]) << (i * 8));
}
return value;
}

View File

@ -2544,8 +2544,8 @@ ACTOR void checkWrites( Database cx, Future<Void> committed, Promise<Void> outCo
} else {
Optional<Value> val = wait( tr.get( it->range().begin ) );
if( !val.present() || val.get() != m.setValue ) {
TraceEvent evt = TraceEvent(SevError, "CheckWritesFailed")
.detail("Class", "Set")
TraceEvent evt(SevError, "CheckWritesFailed");
evt.detail("Class", "Set")
.detail("Key", it->range().begin)
.detail("Expected", m.setValue);
if( !val.present() )

View File

@ -1127,8 +1127,10 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
}
// Move a random shard of sourceTeam's to destTeam if sourceTeam has much more data than destTeam
ACTOR Future<bool> rebalanceTeams( DDQueueData* self, int priority, Reference<IDataDistributionTeam> sourceTeam, Reference<IDataDistributionTeam> destTeam, bool primary ) {
ACTOR Future<bool> rebalanceTeams( DDQueueData* self, int priority, Reference<IDataDistributionTeam> sourceTeam,
Reference<IDataDistributionTeam> destTeam, bool primary, TraceEvent *traceEvent ) {
if(g_network->isSimulated() && g_simulator.speedUpSimulation) {
traceEvent->detail("CancelingDueToSimulationSpeedup", true);
return false;
}
@ -1138,6 +1140,9 @@ ACTOR Future<bool> rebalanceTeams( DDQueueData* self, int priority, Reference<ID
state int64_t averageShardBytes = wait(req.getFuture());
state std::vector<KeyRange> shards = self->shardsAffectedByTeamFailure->getShardsFor( ShardsAffectedByTeamFailure::Team( sourceTeam->getServerIDs(), primary ) );
traceEvent->detail("AverageShardBytes", averageShardBytes)
.detail("ShardsInSource", shards.size());
if( !shards.size() )
return false;
@ -1159,28 +1164,28 @@ ACTOR Future<bool> rebalanceTeams( DDQueueData* self, int priority, Reference<ID
int64_t sourceBytes = sourceTeam->getLoadBytes(false);
int64_t destBytes = destTeam->getLoadBytes();
if( sourceBytes - destBytes <= 3 * std::max<int64_t>( SERVER_KNOBS->MIN_SHARD_BYTES, metrics.bytes ) || metrics.bytes == 0 )
bool sourceAndDestTooSimilar = sourceBytes - destBytes <= 3 * std::max<int64_t>(SERVER_KNOBS->MIN_SHARD_BYTES, metrics.bytes);
traceEvent->detail("SourceBytes", sourceBytes)
.detail("DestBytes", destBytes)
.detail("ShardBytes", metrics.bytes)
.detail("SourceAndDestTooSimilar", sourceAndDestTooSimilar);
if( sourceAndDestTooSimilar || metrics.bytes == 0 ) {
return false;
}
{
//verify the shard is still in sabtf
std::vector<KeyRange> shards = self->shardsAffectedByTeamFailure->getShardsFor( ShardsAffectedByTeamFailure::Team( sourceTeam->getServerIDs(), primary ) );
for( int i = 0; i < shards.size(); i++ ) {
if( moveShard == shards[i] ) {
TraceEvent(priority == SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM ? "BgDDMountainChopper" : "BgDDValleyFiller", self->distributorId)
.detail("SourceBytes", sourceBytes)
.detail("DestBytes", destBytes)
.detail("ShardBytes", metrics.bytes)
.detail("AverageShardBytes", averageShardBytes)
.detail("SourceTeam", sourceTeam->getDesc())
.detail("DestTeam", destTeam->getDesc());
self->output.send( RelocateShard( moveShard, priority ) );
return true;
}
//verify the shard is still in sabtf
shards = self->shardsAffectedByTeamFailure->getShardsFor( ShardsAffectedByTeamFailure::Team( sourceTeam->getServerIDs(), primary ) );
for( int i = 0; i < shards.size(); i++ ) {
if( moveShard == shards[i] ) {
traceEvent->detail("ShardStillPresent", true);
self->output.send( RelocateShard( moveShard, priority ) );
return true;
}
}
traceEvent->detail("ShardStillPresent", false);
return false;
}
@ -1191,6 +1196,15 @@ ACTOR Future<Void> BgDDMountainChopper( DDQueueData* self, int teamCollectionInd
state double lastRead = 0;
state bool skipCurrentLoop = false;
loop {
state bool moved = false;
state TraceEvent traceEvent("BgDDMountainChopper", self->distributorId);
traceEvent.suppressFor(5.0)
.detail("PollingInterval", rebalancePollingInterval);
if(*self->lastLimited > 0) {
traceEvent.detail("SecondsSinceLastLimited", now() - *self->lastLimited);
}
try {
state Future<Void> delayF = delay(rebalancePollingInterval, TaskPriority::DataDistributionLaunch);
if ((now() - lastRead) > SERVER_KNOBS->BG_REBALANCE_SWITCH_CHECK_INTERVAL) {
@ -1203,6 +1217,9 @@ ACTOR Future<Void> BgDDMountainChopper( DDQueueData* self, int teamCollectionInd
}
skipCurrentLoop = val.present();
}
traceEvent.detail("Enabled", !skipCurrentLoop);
wait(delayF);
if (skipCurrentLoop) {
// set loop interval to avoid busy wait here.
@ -1210,18 +1227,31 @@ ACTOR Future<Void> BgDDMountainChopper( DDQueueData* self, int teamCollectionInd
std::max(rebalancePollingInterval, SERVER_KNOBS->BG_REBALANCE_SWITCH_CHECK_INTERVAL);
continue;
}
traceEvent.detail("QueuedRelocations", self->priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM]);
if (self->priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM] <
SERVER_KNOBS->DD_REBALANCE_PARALLELISM) {
state Optional<Reference<IDataDistributionTeam>> randomTeam = wait(brokenPromiseToNever(
self->teamCollections[teamCollectionIndex].getTeam.getReply(GetTeamRequest(true, false, true, false))));
traceEvent.detail("DestTeam", printable(randomTeam.map<std::string>([](const Reference<IDataDistributionTeam>& team){
return team->getDesc();
})));
if (randomTeam.present()) {
state Optional<Reference<IDataDistributionTeam>> loadedTeam =
wait(brokenPromiseToNever(self->teamCollections[teamCollectionIndex].getTeam.getReply(
GetTeamRequest(true, true, false, true))));
traceEvent.detail("SourceTeam", printable(loadedTeam.map<std::string>([](const Reference<IDataDistributionTeam>& team){
return team->getDesc();
})));
if (loadedTeam.present()) {
bool moved =
bool _moved =
wait(rebalanceTeams(self, SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM, loadedTeam.get(),
randomTeam.get(), teamCollectionIndex == 0));
randomTeam.get(), teamCollectionIndex == 0, &traceEvent));
moved = _moved;
if (moved) {
resetCount = 0;
} else {
@ -1244,10 +1274,16 @@ ACTOR Future<Void> BgDDMountainChopper( DDQueueData* self, int teamCollectionInd
rebalancePollingInterval = SERVER_KNOBS->BG_REBALANCE_POLLING_INTERVAL;
resetCount = SERVER_KNOBS->DD_REBALANCE_RESET_AMOUNT;
}
traceEvent.detail("ResetCount", resetCount);
tr.reset();
} catch (Error& e) {
traceEvent.error(e, true); // Log actor_cancelled because it's not legal to suppress an event that's initialized
wait(tr.onError(e));
}
traceEvent.detail("Moved", moved);
traceEvent.log();
}
}
@ -1258,6 +1294,15 @@ ACTOR Future<Void> BgDDValleyFiller( DDQueueData* self, int teamCollectionIndex)
state double lastRead = 0;
state bool skipCurrentLoop = false;
loop {
state bool moved = false;
state TraceEvent traceEvent("BgDDValleyFiller", self->distributorId);
traceEvent.suppressFor(5.0)
.detail("PollingInterval", rebalancePollingInterval);
if(*self->lastLimited > 0) {
traceEvent.detail("SecondsSinceLastLimited", now() - *self->lastLimited);
}
try {
state Future<Void> delayF = delay(rebalancePollingInterval, TaskPriority::DataDistributionLaunch);
if ((now() - lastRead) > SERVER_KNOBS->BG_REBALANCE_SWITCH_CHECK_INTERVAL) {
@ -1270,6 +1315,9 @@ ACTOR Future<Void> BgDDValleyFiller( DDQueueData* self, int teamCollectionIndex)
}
skipCurrentLoop = val.present();
}
traceEvent.detail("Enabled", !skipCurrentLoop);
wait(delayF);
if (skipCurrentLoop) {
// set loop interval to avoid busy wait here.
@ -1277,17 +1325,30 @@ ACTOR Future<Void> BgDDValleyFiller( DDQueueData* self, int teamCollectionIndex)
std::max(rebalancePollingInterval, SERVER_KNOBS->BG_REBALANCE_SWITCH_CHECK_INTERVAL);
continue;
}
traceEvent.detail("QueuedRelocations", self->priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM]);
if (self->priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM] <
SERVER_KNOBS->DD_REBALANCE_PARALLELISM) {
state Optional<Reference<IDataDistributionTeam>> randomTeam = wait(brokenPromiseToNever(
self->teamCollections[teamCollectionIndex].getTeam.getReply(GetTeamRequest(true, false, false, true))));
traceEvent.detail("SourceTeam", printable(randomTeam.map<std::string>([](const Reference<IDataDistributionTeam>& team){
return team->getDesc();
})));
if (randomTeam.present()) {
state Optional<Reference<IDataDistributionTeam>> unloadedTeam = wait(brokenPromiseToNever(
self->teamCollections[teamCollectionIndex].getTeam.getReply(GetTeamRequest(true, true, true, false))));
traceEvent.detail("DestTeam", printable(unloadedTeam.map<std::string>([](const Reference<IDataDistributionTeam>& team){
return team->getDesc();
})));
if (unloadedTeam.present()) {
bool moved =
bool _moved =
wait(rebalanceTeams(self, SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM, randomTeam.get(),
unloadedTeam.get(), teamCollectionIndex == 0));
unloadedTeam.get(), teamCollectionIndex == 0, &traceEvent));
moved = _moved;
if (moved) {
resetCount = 0;
} else {
@ -1310,10 +1371,16 @@ ACTOR Future<Void> BgDDValleyFiller( DDQueueData* self, int teamCollectionIndex)
rebalancePollingInterval = SERVER_KNOBS->BG_REBALANCE_POLLING_INTERVAL;
resetCount = SERVER_KNOBS->DD_REBALANCE_RESET_AMOUNT;
}
traceEvent.detail("ResetCount", resetCount);
tr.reset();
} catch (Error& e) {
traceEvent.error(e, true); // Log actor_cancelled because it's not legal to suppress an event that's initialized
wait(tr.onError(e));
}
traceEvent.detail("Moved", moved);
traceEvent.log();
}
}

View File

@ -653,6 +653,50 @@ void removeTraceRole(std::string role) {
g_traceLog.removeRole(role);
}
// Default-constructed TraceEvent is inert: flagged initialized/logged with
// enabled=false so nothing will be emitted. Intended as a target for
// move-assignment (see TraceEvent::operator=(TraceEvent&&)).
TraceEvent::TraceEvent() : initialized(true), enabled(false), logged(true) {}
// Move constructor: transfers every field from `ev`, then neutralizes `ev`
// so that its destructor behaves like that of a default-constructed event
// (initialized/logged, disabled) and cannot log again or touch the metric.
TraceEvent::TraceEvent(TraceEvent &&ev) {
enabled = ev.enabled;
err = ev.err;
fields = std::move(ev.fields);
id = ev.id;
initialized = ev.initialized;
logged = ev.logged;
maxEventLength = ev.maxEventLength;
maxFieldLength = ev.maxFieldLength;
severity = ev.severity;
// Raw pointer is transferred, not copied; source is cleared below so only
// one TraceEvent ever owns it.
tmpEventMetric = ev.tmpEventMetric;
trackingKey = ev.trackingKey;
type = ev.type;
// Leave the moved-from event in the inert default-constructed state.
ev.initialized = true;
ev.enabled = false;
ev.logged = true;
ev.tmpEventMetric = nullptr;
}
// Move assignment: same member-wise transfer as the move constructor, then
// neutralizes `ev` (inert default-constructed state, metric pointer cleared).
// NOTE(review): no self-move guard, and the target's existing tmpEventMetric
// pointer is overwritten without being released -- if *this could already own
// a metric when assigned to, that would leak; confirm ownership/lifecycle of
// tmpEventMetric with the class definition.
TraceEvent& TraceEvent::operator=(TraceEvent &&ev) {
enabled = ev.enabled;
err = ev.err;
fields = std::move(ev.fields);
id = ev.id;
initialized = ev.initialized;
logged = ev.logged;
maxEventLength = ev.maxEventLength;
maxFieldLength = ev.maxFieldLength;
severity = ev.severity;
tmpEventMetric = ev.tmpEventMetric;
trackingKey = ev.trackingKey;
type = ev.type;
// Leave the moved-from event in the inert default-constructed state.
ev.initialized = true;
ev.enabled = false;
ev.logged = true;
ev.tmpEventMetric = nullptr;
return *this;
}
TraceEvent::TraceEvent( const char* type, UID id ) : id(id), type(type), severity(SevInfo), initialized(false), enabled(true), logged(false) {
g_trace_depth++;
setMaxFieldLength(0);
@ -729,7 +773,9 @@ bool TraceEvent::init() {
}
detail("Severity", int(severity));
detailf("Time", "%.6f", getCurrentTime());
detail("Time", "0.000000");
timeIndex = fields.size() - 1;
detail("Type", type);
if(g_network && g_network->isSimulated()) {
NetworkAddress local = g_network->getLocalAddress();
@ -937,6 +983,8 @@ void TraceEvent::log() {
init();
try {
if (enabled) {
fields.mutate(timeIndex).second = format("%.6f", TraceEvent::getCurrentTime());
if (this->severity == SevError) {
severity = SevInfo;
backtrace();
@ -1150,6 +1198,10 @@ std::string TraceEventFields::getValue(std::string key) const {
}
}
// Mutable, bounds-checked access to the field stored at position `index`
// (at() semantics: throws on an out-of-range index).
TraceEventFields::Field& TraceEventFields::mutate(int index) {
	auto& field = fields.at(index);
	return field;
}
namespace {
void parseNumericValue(std::string const& s, double &outValue, bool permissive = false) {
double d = 0;
@ -1275,6 +1327,9 @@ void TraceEventFields::validateFormat() const {
}
std::string traceableStringToString(const char* value, size_t S) {
ASSERT_WE_THINK(S > 0 && value[S - 1] == '\0');
if(g_network) {
ASSERT_WE_THINK(S > 0 && value[S - 1] == '\0');
}
return std::string(value, S - 1); // Exclude trailing \0 byte
}

View File

@ -80,6 +80,8 @@ public:
int64_t getInt64(std::string key, bool permissive=false) const;
double getDouble(std::string key, bool permissive=false) const;
Field &mutate(int index);
std::string toString() const;
void validateFormat() const;
template<class Archiver>
@ -373,11 +375,15 @@ struct SpecialTraceMetricType
TRACE_METRIC_TYPE(double, double);
struct TraceEvent {
TraceEvent();
TraceEvent( const char* type, UID id = UID() ); // Assumes SevInfo severity
TraceEvent( Severity, const char* type, UID id = UID() );
TraceEvent( struct TraceInterval&, UID id = UID() );
TraceEvent( Severity severity, struct TraceInterval& interval, UID id = UID() );
TraceEvent( TraceEvent &&ev );
TraceEvent& operator=( TraceEvent &&ev );
static void setNetworkThread();
static bool isNetworkThread();
@ -489,6 +495,7 @@ private:
int maxFieldLength;
int maxEventLength;
int timeIndex;
void setSizeLimits();