format code; fix DD ignore option bug"

This commit is contained in:
Xiaoxi Wang 2022-03-14 10:49:19 -07:00
parent d38b827906
commit 10eb082888
2 changed files with 26 additions and 24 deletions

View File

@ -1449,10 +1449,11 @@ ACTOR Future<Void> BgDDMountainChopper(DDQueueData* self, int teamCollectionInde
state Transaction tr(self->cx); state Transaction tr(self->cx);
state double lastRead = 0; state double lastRead = 0;
state bool skipCurrentLoop = false; state bool skipCurrentLoop = false;
state bool disableReadBalance = false;
state bool disableDiskBalance = false;
loop { loop {
state bool moved = false; state bool moved = false;
state bool disableReadBalance = false;
state bool disableDiskBalance = false;
state Reference<IDataDistributionTeam> sourceTeam; state Reference<IDataDistributionTeam> sourceTeam;
state Reference<IDataDistributionTeam> destTeam; state Reference<IDataDistributionTeam> destTeam;
state GetTeamRequest srcReq; state GetTeamRequest srcReq;
@ -1471,19 +1472,19 @@ ACTOR Future<Void> BgDDMountainChopper(DDQueueData* self, int teamCollectionInde
tr.setOption(FDBTransactionOptions::LOCK_AWARE); tr.setOption(FDBTransactionOptions::LOCK_AWARE);
Optional<Value> val = wait(tr.get(rebalanceDDIgnoreKey)); Optional<Value> val = wait(tr.get(rebalanceDDIgnoreKey));
lastRead = now(); lastRead = now();
if (skipCurrentLoop && !val.present()) { if (!val.present()) {
// reset loop interval // reset loop interval
rebalancePollingInterval = SERVER_KNOBS->BG_REBALANCE_POLLING_INTERVAL; if (skipCurrentLoop) {
rebalancePollingInterval = SERVER_KNOBS->BG_REBALANCE_POLLING_INTERVAL;
}
skipCurrentLoop = false; skipCurrentLoop = false;
} else if (val.present()) { disableReadBalance = false;
disableDiskBalance = false;
} else {
if (val.get().size() > 0) { if (val.get().size() > 0) {
int ddIgnore = BinaryReader::fromStringRef<int>(val.get(), Unversioned()); int ddIgnore = BinaryReader::fromStringRef<int>(val.get(), Unversioned());
if (ddIgnore & DDIgnore::REBALANCE_DISK) { disableDiskBalance = (ddIgnore & DDIgnore::REBALANCE_DISK) > 0;
disableDiskBalance = true; disableReadBalance = (ddIgnore & DDIgnore::REBALANCE_READ) > 0;
}
if (ddIgnore & DDIgnore::REBALANCE_READ) {
disableReadBalance = true;
}
skipCurrentLoop = disableReadBalance && disableDiskBalance; skipCurrentLoop = disableReadBalance && disableDiskBalance;
} else { } else {
skipCurrentLoop = true; skipCurrentLoop = true;
@ -1564,11 +1565,11 @@ ACTOR Future<Void> BgDDValleyFiller(DDQueueData* self, int teamCollectionIndex)
state Transaction tr(self->cx); state Transaction tr(self->cx);
state double lastRead = 0; state double lastRead = 0;
state bool skipCurrentLoop = false; state bool skipCurrentLoop = false;
state bool disableReadBalance = false;
state bool disableDiskBalance = false;
loop { loop {
state bool moved = false; state bool moved = false;
state bool disableReadBalance = false;
state bool disableDiskBalance = false;
state Reference<IDataDistributionTeam> sourceTeam; state Reference<IDataDistributionTeam> sourceTeam;
state Reference<IDataDistributionTeam> destTeam; state Reference<IDataDistributionTeam> destTeam;
state GetTeamRequest srcReq; state GetTeamRequest srcReq;
@ -1587,19 +1588,19 @@ ACTOR Future<Void> BgDDValleyFiller(DDQueueData* self, int teamCollectionIndex)
tr.setOption(FDBTransactionOptions::LOCK_AWARE); tr.setOption(FDBTransactionOptions::LOCK_AWARE);
Optional<Value> val = wait(tr.get(rebalanceDDIgnoreKey)); Optional<Value> val = wait(tr.get(rebalanceDDIgnoreKey));
lastRead = now(); lastRead = now();
if (skipCurrentLoop && !val.present()) { if (!val.present()) {
// reset loop interval // reset loop interval
rebalancePollingInterval = SERVER_KNOBS->BG_REBALANCE_POLLING_INTERVAL; if (skipCurrentLoop) {
rebalancePollingInterval = SERVER_KNOBS->BG_REBALANCE_POLLING_INTERVAL;
}
skipCurrentLoop = false;
disableReadBalance = false;
disableDiskBalance = false;
} else if (val.present()) { } else if (val.present()) {
// FIXME: better way for upgrade? for example, using a new key to record mode if (val.get().size() > 0) {
if (val.get().size() > sizeof(int)) {
int ddIgnore = BinaryReader::fromStringRef<int>(val.get(), Unversioned()); int ddIgnore = BinaryReader::fromStringRef<int>(val.get(), Unversioned());
if (ddIgnore & DDIgnore::REBALANCE_DISK) { disableDiskBalance = (ddIgnore & DDIgnore::REBALANCE_DISK) > 0;
disableDiskBalance = true; disableReadBalance = (ddIgnore & DDIgnore::REBALANCE_READ) > 0;
}
if (ddIgnore & DDIgnore::REBALANCE_READ) {
disableReadBalance = true;
}
skipCurrentLoop = disableReadBalance && disableDiskBalance; skipCurrentLoop = disableReadBalance && disableDiskBalance;
} else { } else {
skipCurrentLoop = true; skipCurrentLoop = true;

View File

@ -294,7 +294,8 @@ Future<Void> bulkSetup(Database cx,
// Here we wait for data in flight to go to 0 (this will not work on a database with other users) // Here we wait for data in flight to go to 0 (this will not work on a database with other users)
if (postSetupWarming != 0) { if (postSetupWarming != 0) {
try { try {
wait(delay(5.0) >> waitForLowInFlight(cx, workload)); // Wait for the data distribution in a small test to start wait(delay(5.0) >>
waitForLowInFlight(cx, workload)); // Wait for the data distribution in a small test to start
} catch (Error& e) { } catch (Error& e) {
if (e.code() == error_code_actor_cancelled) if (e.code() == error_code_actor_cancelled)
throw; throw;