Merge branch 'apple:master' into redWoodMetricsEnhancements

sfc-gh-fzhao 2021-07-13 11:36:03 -07:00 committed by GitHub
commit 3b40d49a10
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 536 additions and 106 deletions

View File

@@ -201,9 +201,9 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fdbclient/BuildFlags.h.in ${CMAKE_CUR
if (CMAKE_EXPORT_COMPILE_COMMANDS AND WITH_PYTHON)
add_custom_command(
OUTPUT ${CMAKE_CURRENT_SOURCE_DIR}/compile_commands.json
COMMAND $<TARGET_FILE:Python::Interpreter> contrib/gen_compile_db.py
COMMAND $<TARGET_FILE:Python::Interpreter> ${CMAKE_CURRENT_SOURCE_DIR}/contrib/gen_compile_db.py
ARGS -b ${CMAKE_CURRENT_BINARY_DIR} -s ${CMAKE_CURRENT_SOURCE_DIR} -o ${CMAKE_CURRENT_SOURCE_DIR}/compile_commands.json ${CMAKE_CURRENT_BINARY_DIR}/compile_commands.json
DEPENDS contrib/gen_compile_db.py ${CMAKE_CURRENT_BINARY_DIR}/compile_commands.json
DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/contrib/gen_compile_db.py ${CMAKE_CURRENT_BINARY_DIR}/compile_commands.json
COMMENT "Build compile commands for IDE"
)
add_custom_target(processed_compile_commands ALL DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/compile_commands.json ${CMAKE_CURRENT_BINARY_DIR}/compile_commands.json)

View File

@@ -382,7 +382,14 @@ if(NOT OPEN_FOR_IDE)
COMMAND ${Java_JAVA_EXECUTABLE}
-classpath "${target_jar}:${integration_jar_path}:${JUNIT_CLASSPATH}"
-Djava.library.path=${CMAKE_BINARY_DIR}/lib
org.junit.platform.console.ConsoleLauncher "--details=summary" "--class-path=${integration_jar_path}" "--scan-classpath" "--disable-banner"
org.junit.platform.console.ConsoleLauncher "--details=summary" "--class-path=${integration_jar_path}" "--scan-classpath" "--disable-banner" "-T MultiClient"
)
add_multi_fdbclient_test(NAME java-multi-integration
COMMAND ${Java_JAVA_EXECUTABLE}
-classpath "${target_jar}:${integration_jar_path}:${JUNIT_CLASSPATH}"
-Djava.library.path=${CMAKE_BINARY_DIR}/lib
org.junit.platform.console.ConsoleLauncher "--details=summary" "--class-path=${integration_jar_path}" "--scan-classpath" "--disable-banner" "-t MultiClient"
)
endif()

View File

@@ -23,3 +23,18 @@ To run _only_ integration tests, run `ctest -R integration` from `${BUILD_DIR}/b
There are lots of other useful `ctest` commands, which we don't need to get into here. For more information,
see the [ctest documentation](https://cmake.org/cmake/help/v3.19/manual/ctest.1.html).
### Multi-Client tests
Multi-Client tests are integration tests that can only be executed when multiple clusters are running. To write a multi-client
test, do the following:
1. Tag all tests that require multiple clients with `@Tag("MultiClient")`
2. Ensure that your tests have the `MultiClientHelper` extension present, and registered as an extension
3. Ensure that your test class is in the JAVA_INTEGRATION_TESTS list in `test.cmake`
(see `BasicMultiClientIntegrationTest` for a good reference example)
It is important to note that it takes significant time to start and stop 3 separate clusters; if the underlying test takes a long time to run,
ctest will time out and kill the test. When that happens, there is no guarantee that the FDB clusters will be properly stopped! It is thus
in your best interest to ensure that all tests run in a relatively short amount of time, or have a longer timeout attached.
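
For a concrete picture, a minimal test following these three steps might look like the sketch below (the class name and test body are illustrative, not part of this commit; `BasicMultiClientIntegrationTest`, added in this commit, is the authoritative example):

```java
package com.apple.foundationdb;

import java.util.Collection;

import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

public class MyMultiClientTest {
    // Step 2: make the MultiClientHelper extension present and registered
    @RegisterExtension
    public static final MultiClientHelper clientHelper = new MultiClientHelper();

    @Test
    @Tag("MultiClient") // Step 1: tag the test so single-cluster runs exclude it
    void worksAgainstEveryCluster() throws Exception {
        FDB fdb = FDB.selectAPIVersion(630);
        // One Database per cluster listed in FDB_CLUSTERS; the helper closes them after each test
        Collection<Database> dbs = clientHelper.openDatabases(fdb);
        for (Database db : dbs) {
            // ... exercise each cluster here ...
        }
    }
    // Step 3: add this class to the JAVA_INTEGRATION_TESTS list in test.cmake
}
```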

View File

@@ -0,0 +1,69 @@
/*
* BasicMultiClientIntegrationTest
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.apple.foundationdb;
import java.util.Collection;
import java.util.Random;
import com.apple.foundationdb.tuple.Tuple;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
/**
* Simple class to test multi-client logic.
*
* Note that all Multi-client-only tests _must_ be tagged with "MultiClient", which will ensure that they are excluded
* from non-multi-threaded tests.
*/
public class BasicMultiClientIntegrationTest {
@RegisterExtension public static final MultiClientHelper clientHelper = new MultiClientHelper();
@Test
@Tag("MultiClient")
void testMultiClientWritesAndReadsData() throws Exception {
FDB fdb = FDB.selectAPIVersion(630);
fdb.options().setKnob("min_trace_severity=5");
Collection<Database> dbs = clientHelper.openDatabases(fdb); // the clientHelper will close the databases for us
System.out.print("Starting tests.");
Random rand = new Random();
for (int counter = 0; counter < 25; ++counter) {
for (Database db : dbs) {
String key = Integer.toString(rand.nextInt(100000000));
String val = Integer.toString(rand.nextInt(100000000));
db.run(tr -> {
tr.set(Tuple.from(key).pack(), Tuple.from(val).pack());
return null;
});
String fetchedVal = db.run(tr -> {
byte[] result = tr.get(Tuple.from(key).pack()).join();
return Tuple.fromBytes(result).getString(0);
});
Assertions.assertEquals(val, fetchedVal, "Wrong result!");
}
Thread.sleep(200);
}
}
}

View File

@@ -19,8 +19,6 @@
*/
package com.apple.foundationdb;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

View File

@@ -0,0 +1,82 @@
/*
* MultiClientHelper.java
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.apple.foundationdb;
import java.util.ArrayList;
import java.util.Collection;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
/**
* Callback to help define a multi-client scenario and ensure that
* the clients can be configured properly.
*/
public class MultiClientHelper implements BeforeAllCallback, AfterEachCallback {
private String[] clusterFiles;
private Collection<Database> openDatabases;
public static String[] readClusterFromEnv() {
/*
* Reads the list of cluster files from the
* FDB_CLUSTERS environment variable.
*/
String clusterFilesProp = System.getenv("FDB_CLUSTERS");
if (clusterFilesProp == null) {
throw new IllegalStateException("Missing FDB cluster connection file names");
}
return clusterFilesProp.split(";");
}
Collection<Database> openDatabases(FDB fdb) {
if (openDatabases != null) {
return openDatabases;
}
if (clusterFiles == null) {
clusterFiles = readClusterFromEnv();
}
Collection<Database> dbs = new ArrayList<Database>();
for (String arg : clusterFiles) {
System.out.printf("Opening Cluster: %s\n", arg);
dbs.add(fdb.open(arg));
}
this.openDatabases = dbs;
return dbs;
}
@Override
public void beforeAll(ExtensionContext arg0) throws Exception {
clusterFiles = readClusterFromEnv();
}
@Override
public void afterEach(ExtensionContext arg0) throws Exception {
// close any databases that have been opened
if (openDatabases != null) {
for (Database db : openDatabases) {
db.close();
}
}
openDatabases = null;
}
}
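
A note on the environment contract this helper assumes: `FDB_CLUSTERS` must hold a semicolon-separated list of cluster file paths, which is what `tmp_multi_cluster.py` (added later in this commit) exports. A hypothetical sketch of reading it directly (the paths are illustrative):

```java
package com.apple.foundationdb;

public class ClusterEnvCheck {
    public static void main(String[] args) {
        // Hypothetical: launched with
        //   FDB_CLUSTERS=/tmp/c1/fdb.cluster;/tmp/c2/fdb.cluster;/tmp/c3/fdb.cluster
        // readClusterFromEnv() throws IllegalStateException if the variable is unset.
        String[] clusterFiles = MultiClientHelper.readClusterFromEnv();
        System.out.printf("Found %d cluster files%n", clusterFiles.length);
        for (String file : clusterFiles) {
            System.out.println(file); // e.g. /tmp/c1/fdb.cluster
        }
    }
}
```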

View File

@@ -48,12 +48,14 @@ set(JUNIT_RESOURCES
set(JAVA_INTEGRATION_TESTS
src/integration/com/apple/foundationdb/DirectoryTest.java
src/integration/com/apple/foundationdb/RangeQueryIntegrationTest.java
src/integration/com/apple/foundationdb/BasicMultiClientIntegrationTest.java
)
# Resources that are used in integration testing, but are not explicitly test files (JUnit rules,
# utility classes, and so forth)
set(JAVA_INTEGRATION_RESOURCES
src/integration/com/apple/foundationdb/RequiresDatabase.java
src/integration/com/apple/foundationdb/MultiClientHelper.java
)

View File

@@ -394,6 +394,7 @@ function(package_bindingtester)
add_dependencies(bindingtester copy_bindingtester_binaries)
endfunction()
# Creates a single cluster before running the specified command (usually a ctest test)
function(add_fdbclient_test)
set(options DISABLED ENABLED)
set(oneValueArgs NAME)
@@ -420,6 +421,36 @@ function(add_fdbclient_test)
set_tests_properties("${T_NAME}" PROPERTIES TIMEOUT 60)
endfunction()
# Creates 3 distinct clusters before running the specified command.
# This is useful for testing features that require multiple clusters (like the
# multi-cluster FDB client)
function(add_multi_fdbclient_test)
set(options DISABLED ENABLED)
set(oneValueArgs NAME)
set(multiValueArgs COMMAND)
cmake_parse_arguments(T "${options}" "${oneValueArgs}" "${multiValueArgs}" "${ARGN}")
if(OPEN_FOR_IDE)
return()
endif()
if(NOT T_ENABLED AND T_DISABLED)
return()
endif()
if(NOT T_NAME)
message(FATAL_ERROR "NAME is a required argument for add_multi_fdbclient_test")
endif()
if(NOT T_COMMAND)
message(FATAL_ERROR "COMMAND is a required argument for add_multi_fdbclient_test")
endif()
message(STATUS "Adding Client test ${T_NAME}")
add_test(NAME "${T_NAME}"
COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/tmp_multi_cluster.py
--build-dir ${CMAKE_BINARY_DIR}
--clusters 3
--
${T_COMMAND})
set_tests_properties("${T_NAME}" PROPERTIES TIMEOUT 60)
endfunction()
function(add_java_test)
set(options DISABLED ENABLED)
set(oneValueArgs NAME CLASS)

View File

@@ -143,7 +143,7 @@ std::map<std::string, std::string> configForToken(std::string const& mode) {
}
if (key == "perpetual_storage_wiggle" && isInteger(value)) {
int ppWiggle = atoi(value.c_str());
int ppWiggle = std::stoi(value);
if (ppWiggle >= 2 || ppWiggle < 0) {
printf("Error: Only 0 and 1 are valid values of perpetual_storage_wiggle at present.\n");
return out;

View File

@@ -1956,7 +1956,7 @@ void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> valu
break;
}
case FDBNetworkOptions::ENABLE_RUN_LOOP_PROFILING: // Same as ENABLE_SLOW_TASK_PROFILING
validateOptionValuePresent(value);
validateOptionValueNotPresent(value);
networkOptions.runLoopProfilingEnabled = true;
break;
case FDBNetworkOptions::DISTRIBUTED_CLIENT_TRACER: {

View File

@@ -255,6 +255,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( DD_TEAMS_INFO_PRINT_YIELD_COUNT, 100 ); if( randomize && BUGGIFY ) DD_TEAMS_INFO_PRINT_YIELD_COUNT = deterministicRandom()->random01() * 1000 + 1;
init( DD_TEAM_ZERO_SERVER_LEFT_LOG_DELAY, 120 ); if( randomize && BUGGIFY ) DD_TEAM_ZERO_SERVER_LEFT_LOG_DELAY = 5;
init( DD_STORAGE_WIGGLE_PAUSE_THRESHOLD, 1 ); if( randomize && BUGGIFY ) DD_STORAGE_WIGGLE_PAUSE_THRESHOLD = 10;
init( DD_STORAGE_WIGGLE_STUCK_THRESHOLD, 50 );
// TeamRemover
init( TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER, false ); if( randomize && BUGGIFY ) TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER = deterministicRandom()->random01() < 0.1 ? true : false; // false by default. disable the consistency check when it's true

View File

@@ -208,6 +208,7 @@ public:
int DD_TEAMS_INFO_PRINT_YIELD_COUNT;
int DD_TEAM_ZERO_SERVER_LEFT_LOG_DELAY;
int DD_STORAGE_WIGGLE_PAUSE_THRESHOLD; // How many ongoing unhealthy relocations will pause the storage wiggle
int DD_STORAGE_WIGGLE_STUCK_THRESHOLD; // How many consecutive bestTeamStuck counts will pause the storage wiggle
// TeamRemover to remove redundant teams
bool TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER; // disable the machineTeamRemover actor

View File

@@ -836,7 +836,7 @@ void load_conf(const char* confpath, uid_t& uid, gid_t& gid, sigset_t* mask, fdb
for (auto i : id_pid) {
if (!loadedConf || ini.GetSectionSize(id_command[i.first]->ssection.c_str()) == -1) {
/* Server on this port no longer configured; deconfigure it and kill it if required */
/* Process no longer configured; deconfigure it and kill it if required */
log_msg(SevInfo, "Deconfigured %s\n", id_command[i.first]->ssection.c_str());
id_command[i.first]->deconfigured = true;

View File

@@ -1950,6 +1950,7 @@ public:
g_clogging.clogRecvFor(ip, seconds);
}
void clogPair(const IPAddress& from, const IPAddress& to, double seconds) override {
TraceEvent("CloggingPair").detail("From", from).detail("To", to).detail("Seconds", seconds);
g_clogging.clogPairFor(from, to, seconds);
}
std::vector<ProcessInfo*> getAllProcesses() const override {

View File

@@ -146,7 +146,7 @@ set(FDBSERVER_SRCS
workloads/BackgroundSelectors.actor.cpp
workloads/BackupCorrectness.actor.cpp
workloads/BackupAndParallelRestoreCorrectness.actor.cpp
workloads/ParallelRestore.actor.cpp
workloads/ClogSingleConnection.actor.cpp
workloads/BackupToBlob.actor.cpp
workloads/BackupToDBAbort.actor.cpp
workloads/BackupToDBCorrectness.actor.cpp
@@ -197,6 +197,7 @@ set(FDBSERVER_SRCS
workloads/MemoryKeyValueStore.h
workloads/MemoryLifetime.actor.cpp
workloads/MetricLogging.actor.cpp
workloads/ParallelRestore.actor.cpp
workloads/Performance.actor.cpp
workloads/Ping.actor.cpp
workloads/PopulateTPCC.actor.cpp

View File

@@ -1578,6 +1578,20 @@ public:
return result;
}
// Given a datacenter ID, returns the primary and remote regions.
std::pair<RegionInfo, RegionInfo> getPrimaryAndRemoteRegion(const std::vector<RegionInfo>& regions, Key dcId) {
RegionInfo region;
RegionInfo remoteRegion;
for (const auto& r : regions) {
if (r.dcId == dcId) {
region = r;
} else {
remoteRegion = r;
}
}
return std::make_pair(region, remoteRegion);
}
ErrorOr<RecruitFromConfigurationReply> findWorkersForConfigurationFromDC(RecruitFromConfigurationRequest const& req,
Optional<Key> dcId) {
RecruitFromConfigurationReply result;
@@ -1590,15 +1604,7 @@
primaryDC.insert(dcId);
result.dcId = dcId;
RegionInfo region;
RegionInfo remoteRegion;
for (auto& r : req.configuration.regions) {
if (r.dcId == dcId.get()) {
region = r;
} else {
remoteRegion = r;
}
}
auto [region, remoteRegion] = getPrimaryAndRemoteRegion(req.configuration.regions, dcId.get());
if (req.recruitSeedServers) {
auto primaryStorageServers =
@@ -2043,67 +2049,82 @@
RecruitFromConfigurationReply findWorkersForConfiguration(RecruitFromConfigurationRequest const& req) {
RecruitFromConfigurationReply rep = findWorkersForConfigurationDispatch(req);
if (g_network->isSimulated()) {
// FIXME: The logic to pick a satellite in a remote region is not
// deterministic and can therefore break this determinism check.
// Since satellites will generally be in the primary region,
// disable the determinism check for remote region satellites.
bool remoteDCUsedAsSatellite = false;
if (req.configuration.regions.size() > 1) {
auto [region, remoteRegion] = getPrimaryAndRemoteRegion(req.configuration.regions, req.configuration.regions[0].dcId);
for (const auto& satellite : region.satellites) {
if (satellite.dcId == remoteRegion.dcId) {
remoteDCUsedAsSatellite = true;
}
}
}
if (!remoteDCUsedAsSatellite) {
RecruitFromConfigurationReply compare = findWorkersForConfigurationDispatch(req);
std::map<Optional<Standalone<StringRef>>, int> firstUsed;
std::map<Optional<Standalone<StringRef>>, int> secondUsed;
updateKnownIds(&firstUsed);
updateKnownIds(&secondUsed);
// auto mworker = id_worker.find(masterProcessId);
//TraceEvent("CompareAddressesMaster")
// .detail("Master",
// mworker != id_worker.end() ? mworker->second.details.interf.address() : NetworkAddress());
updateIdUsed(rep.tLogs, firstUsed);
updateIdUsed(compare.tLogs, secondUsed);
compareWorkers(
req.configuration, rep.tLogs, firstUsed, compare.tLogs, secondUsed, ProcessClass::TLog, "TLog");
updateIdUsed(rep.satelliteTLogs, firstUsed);
updateIdUsed(compare.satelliteTLogs, secondUsed);
compareWorkers(req.configuration,
rep.satelliteTLogs,
firstUsed,
compare.satelliteTLogs,
secondUsed,
ProcessClass::TLog,
"Satellite");
updateIdUsed(rep.commitProxies, firstUsed);
updateIdUsed(compare.commitProxies, secondUsed);
updateIdUsed(rep.grvProxies, firstUsed);
updateIdUsed(compare.grvProxies, secondUsed);
updateIdUsed(rep.resolvers, firstUsed);
updateIdUsed(compare.resolvers, secondUsed);
compareWorkers(req.configuration,
rep.commitProxies,
firstUsed,
compare.commitProxies,
secondUsed,
ProcessClass::CommitProxy,
"CommitProxy");
compareWorkers(req.configuration,
rep.grvProxies,
firstUsed,
compare.grvProxies,
secondUsed,
ProcessClass::GrvProxy,
"GrvProxy");
compareWorkers(req.configuration,
rep.resolvers,
firstUsed,
compare.resolvers,
secondUsed,
ProcessClass::Resolver,
"Resolver");
updateIdUsed(rep.backupWorkers, firstUsed);
updateIdUsed(compare.backupWorkers, secondUsed);
compareWorkers(req.configuration,
rep.backupWorkers,
firstUsed,
compare.backupWorkers,
secondUsed,
ProcessClass::Backup,
"Backup");
}
}
return rep;
}

View File

@@ -656,7 +656,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
int optimalTeamCount;
AsyncVar<bool> zeroOptimalTeams;
bool bestTeamStuck = false;
int bestTeamKeepStuckCount = 0;
bool isTssRecruiting; // If tss recruiting is waiting on a pair, don't consider DD recruiting for the purposes of QuietDB
@@ -1011,12 +1011,12 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
// Log BestTeamStuck reason when we have healthy teams but they do not have healthy free space
if (randomTeams.empty() && !self->zeroHealthyTeams->get()) {
self->bestTeamStuck = true;
self->bestTeamKeepStuckCount++;
if (g_network->isSimulated()) {
TraceEvent(SevWarn, "GetTeamReturnEmpty").detail("HealthyTeams", self->healthyTeamCount);
}
} else {
self->bestTeamStuck = false;
self->bestTeamKeepStuckCount = 0;
}
for (int i = 0; i < randomTeams.size(); i++) {
@@ -2833,7 +2833,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
std::vector<Future<Void>> moveFutures;
if (this->pid2server_info.count(pid) != 0) {
for (auto& info : this->pid2server_info[pid]) {
AddressExclusion addr(info->lastKnownInterface.address().ip);
AddressExclusion addr(info->lastKnownInterface.address().ip, info->lastKnownInterface.address().port);
if (this->excludedServers.count(addr) &&
this->excludedServers.get(addr) != DDTeamCollection::Status::NONE) {
continue; // don't overwrite the value set by actor trackExcludedServer
@@ -3509,7 +3509,7 @@ ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> tea
bool anyUndesired = false;
bool anyWrongConfiguration = false;
bool anyWigglingServer = false;
int serversLeft = 0;
int serversLeft = 0, serverUndesired = 0, serverWrongConf = 0, serverWiggling = 0;
for (const UID& uid : team->getServerIDs()) {
change.push_back(self->server_status.onChange(uid));
@@ -3519,12 +3519,15 @@ ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> tea
}
if (status.isUndesired) {
anyUndesired = true;
serverUndesired++;
}
if (status.isWrongConfiguration) {
anyWrongConfiguration = true;
serverWrongConf++;
}
if (status.isWiggling) {
anyWigglingServer = true;
serverWiggling++;
}
}
@@ -3646,6 +3649,10 @@ ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> tea
team->setPriority(SERVER_KNOBS->PRIORITY_TEAM_2_LEFT);
else
team->setPriority(SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY);
} else if (!badTeam && anyWigglingServer && serverWiggling == serverWrongConf &&
serverWiggling == serverUndesired) {
// the wrongly configured and undesired servers are exactly the wiggling servers
team->setPriority(SERVER_KNOBS->PRIORITY_PERPETUAL_STORAGE_WIGGLE);
} else if (badTeam || anyWrongConfiguration) {
if (redundantTeam) {
team->setPriority(SERVER_KNOBS->PRIORITY_TEAM_REDUNDANT);
@@ -3654,8 +3661,6 @@ ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> tea
}
} else if (anyUndesired) {
team->setPriority(SERVER_KNOBS->PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER);
} else if (anyWigglingServer) {
team->setPriority(SERVER_KNOBS->PRIORITY_PERPETUAL_STORAGE_WIGGLE);
} else {
team->setPriority(SERVER_KNOBS->PRIORITY_TEAM_HEALTHY);
}
@@ -3972,7 +3977,7 @@ ACTOR Future<Void> perpetualStorageWiggleIterator(AsyncVar<bool>* stopSignal,
wait(delayJittered(SERVER_KNOBS->PERPETUAL_WIGGLE_DELAY));
// take a rest if there are no other teams left to place the wiggled data
takeRest = teamCollection->server_info.size() <= teamCollection->configuration.storageTeamSize ||
teamCollection->machine_info.size() < teamCollection->configuration.storageTeamSize;
}
wait(updateNextWigglingStoragePID(teamCollection));
}
@@ -4020,10 +4025,12 @@ ACTOR Future<Void> clusterHealthCheckForPerpetualWiggle(DDTeamCollection* self,
// b. healthy teams are not enough
// c. the overall disk space is not enough
if (count >= SERVER_KNOBS->DD_STORAGE_WIGGLE_PAUSE_THRESHOLD || self->healthyTeamCount <= *extraTeamCount ||
self->bestTeamStuck) {
self->bestTeamKeepStuckCount > SERVER_KNOBS->DD_STORAGE_WIGGLE_STUCK_THRESHOLD) {
// if we pause the wiggle for a reason other than (a), increase extraTeamCount. This helps avoid oscillation
// between pause and non-pause status.
if ((self->healthyTeamCount <= *extraTeamCount || self->bestTeamStuck) && !self->pauseWiggle->get()) {
if ((self->healthyTeamCount <= *extraTeamCount ||
self->bestTeamKeepStuckCount > SERVER_KNOBS->DD_STORAGE_WIGGLE_PAUSE_THRESHOLD) &&
!self->pauseWiggle->get()) {
*extraTeamCount = std::min(*extraTeamCount + pausePenalty, (int)self->teams.size());
pausePenalty = std::min(pausePenalty * 2, (int)self->teams.size());
}
@@ -4060,6 +4067,7 @@ ACTOR Future<Void> perpetualStorageWiggler(AsyncVar<bool>* stopSignal,
self->includeStorageServersForWiggle();
TraceEvent("PerpetualStorageWigglePause", self->distributorId)
.detail("ProcessId", pid)
.detail("BestTeamKeepStuckCount", self->bestTeamKeepStuckCount)
.detail("ExtraHealthyTeamCount", extraTeamCount)
.detail("HealthyTeamCount", self->healthyTeamCount)
.detail("StorageCount", movingCount);
@@ -4566,6 +4574,10 @@ ACTOR Future<Void> storageServerTracker(
DDTeamCollection::Status worstStatus = self->excludedServers.get(worstAddr);
if (worstStatus == DDTeamCollection::Status::WIGGLING && invalidWiggleServer(worstAddr, self, server)) {
TraceEvent(SevInfo, "InvalidWiggleServer", self->distributorId)
.detail("Address", worstAddr.toString())
.detail("ProcessId", server->lastKnownInterface.locality.processId())
.detail("ValidWigglingId", self->wigglingPid.present());
self->excludedServers.set(worstAddr, DDTeamCollection::Status::NONE);
worstStatus = DDTeamCollection::Status::NONE;
}
@@ -4586,6 +4598,10 @@
DDTeamCollection::Status testStatus = self->excludedServers.get(testAddr);
if (testStatus == DDTeamCollection::Status::WIGGLING && invalidWiggleServer(testAddr, self, server)) {
TraceEvent(SevInfo, "InvalidWiggleServer", self->distributorId)
.detail("Address", testAddr.toString())
.detail("ProcessId", server->lastKnownInterface.locality.processId())
.detail("ValidWigglingId", self->wigglingPid.present());
self->excludedServers.set(testAddr, DDTeamCollection::Status::NONE);
testStatus = DDTeamCollection::Status::NONE;
}

View File

@@ -234,6 +234,15 @@ vector<std::string> getOption(VectorRef<KeyValueRef> options, Key key, vector<st
return defaultValue;
}
bool hasOption(VectorRef<KeyValueRef> options, Key key) {
for (const auto& option : options) {
if (option.key == key) {
return true;
}
}
return false;
}
// returns unconsumed options
Standalone<VectorRef<KeyValueRef>> checkAllOptionsConsumed(VectorRef<KeyValueRef> options) {
static StringRef nothing = LiteralStringRef("");
@@ -1049,8 +1058,7 @@ std::map<std::string, std::function<void(const std::string&)>> testSpecGlobalKey
[](const std::string& value) { TraceEvent("TestParserTest").detail("ParsedStorageEngineExcludeTypes", ""); } },
{ "maxTLogVersion",
[](const std::string& value) { TraceEvent("TestParserTest").detail("ParsedMaxTLogVersion", ""); } },
{ "disableTss",
[](const std::string& value) { TraceEvent("TestParserTest").detail("ParsedDisableTSS", ""); } }
{ "disableTss", [](const std::string& value) { TraceEvent("TestParserTest").detail("ParsedDisableTSS", ""); } }
};
std::map<std::string, std::function<void(const std::string& value, TestSpec* spec)>> testSpecTestKeys = {
@@ -1178,6 +1186,11 @@ std::map<std::string, std::function<void(const std::string& value, TestSpec* spe
if (value == "true")
spec->phases = TestWorkload::CHECK;
} },
{ "restorePerpetualWiggleSetting",
[](const std::string& value, TestSpec* spec) {
if (value == "false")
spec->restorePerpetualWiggleSetting = false;
} },
};
vector<TestSpec> readTests(ifstream& ifs) {

View File

@@ -0,0 +1,71 @@
/*
* ClogSingleConnection.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbrpc/simulator.h"
#include "flow/actorcompiler.h" // This must be the last #include.
class ClogSingleConnectionWorkload : public TestWorkload {
double delaySeconds;
Optional<double> clogDuration; // If empty, clog forever
public:
ClogSingleConnectionWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
auto minDelay = getOption(options, "minDelay"_sr, 0.0);
auto maxDelay = getOption(options, "maxDelay"_sr, 10.0);
ASSERT_LE(minDelay, maxDelay);
delaySeconds = minDelay + deterministicRandom()->random01() * (maxDelay - minDelay);
if (hasOption(options, "clogDuration"_sr)) {
clogDuration = getOption(options, "clogDuration"_sr, 0.0); // default is unused; guarded by hasOption above
}
}
std::string description() const override {
return g_network->isSimulated() ? "ClogSingleConnection" : "NoClogging";
}
Future<Void> setup(Database const& cx) override { return Void(); }
Future<Void> start(Database const& cx) override {
if (g_network->isSimulated() && clientId == 0) {
return map(delay(delaySeconds), [this](Void _) {
clogRandomPair();
return Void();
});
} else {
return Void();
}
}
Future<bool> check(Database const& cx) override { return true; }
void getMetrics(std::vector<PerfMetric>& m) override {}
void clogRandomPair() {
auto m1 = deterministicRandom()->randomChoice(g_simulator.getAllProcesses());
auto m2 = deterministicRandom()->randomChoice(g_simulator.getAllProcesses());
if (m1->address.ip != m2->address.ip) {
g_simulator.clogPair(m1->address.ip, m2->address.ip, clogDuration.orDefault(10000));
}
}
};
WorkloadFactory<ClogSingleConnectionWorkload> ClogSingleConnectionWorkloadFactory("ClogSingleConnection");

View File

@@ -43,6 +43,7 @@ bool getOption(VectorRef<KeyValueRef> options, Key key, bool defaultValue);
vector<std::string> getOption(VectorRef<KeyValueRef> options,
Key key,
vector<std::string> defaultValue); // comma-separated strings
bool hasOption(VectorRef<KeyValueRef> options, Key key);
struct WorkloadContext {
Standalone<VectorRef<KeyValueRef>> options;
@@ -53,9 +54,7 @@ struct WorkloadContext {
WorkloadContext();
WorkloadContext(const WorkloadContext&);
~WorkloadContext();
private:
void operator=(const WorkloadContext&);
WorkloadContext& operator=(const WorkloadContext&) = delete;
};
struct TestWorkload : NonCopyable, WorkloadContext {

View File

@@ -134,6 +134,8 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES fast/LocalRatekeeper.toml)
add_fdb_test(TEST_FILES fast/LongStackWriteDuringRead.toml)
add_fdb_test(TEST_FILES fast/LowLatency.toml)
# TODO: Fix failures and reenable this test:
add_fdb_test(TEST_FILES fast/LowLatencySingleClog.toml IGNORE)
add_fdb_test(TEST_FILES fast/MemoryLifetime.toml)
add_fdb_test(TEST_FILES fast/MoveKeysCycle.toml)
add_fdb_test(TEST_FILES fast/ProtocolVersion.toml)

View File

@@ -11,7 +11,7 @@ from random import choice
from pathlib import Path
class TempCluster:
def __init__(self, build_dir: str):
def __init__(self, build_dir: str, port: str = None):
self.build_dir = Path(build_dir).resolve()
assert self.build_dir.exists(), "{} does not exist".format(build_dir)
assert self.build_dir.is_dir(), "{} is not a directory".format(build_dir)
@@ -22,7 +22,8 @@ class TempCluster:
self.cluster = LocalCluster(tmp_dir,
self.build_dir.joinpath('bin', 'fdbserver'),
self.build_dir.joinpath('bin', 'fdbmonitor'),
self.build_dir.joinpath('bin', 'fdbcli'))
self.build_dir.joinpath('bin', 'fdbcli'),
port=port)
self.log = self.cluster.log
self.etc = self.cluster.etc
self.data = self.cluster.data
@@ -37,6 +38,10 @@
self.cluster.__exit__(xc_type, exc_value, traceback)
shutil.rmtree(self.tmp_dir)
def close(self):
self.cluster.__exit__(None, None, None)
shutil.rmtree(self.tmp_dir)
if __name__ == '__main__':
parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter,

View File

@@ -0,0 +1,75 @@
#!/usr/bin/env python3
#
# tmp_multi_cluster.py
#
# This source file is part of the FoundationDB open source project
#
# Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import subprocess
import sys
import shutil
from pathlib import Path
from argparse import ArgumentParser, RawDescriptionHelpFormatter
from tmp_cluster import TempCluster
if __name__ == '__main__':
parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter, description="""
This script automatically configures N temporary local clusters on the machine and then
calls a command while these clusters are running. As soon as the command returns, all
configured clusters are killed and all generated data is deleted.
The purpose of this is to support testing a set of integration tests using multiple clusters
(i.e. using the Multi-threaded client).
""")
parser.add_argument('--build-dir', '-b', metavar='BUILD_DIRECTORY', help='FDB build directory', required=True)
parser.add_argument('--clusters', '-c', metavar='NUM_CLUSTERS', type=int, help='The number of clusters to run', required=True)
parser.add_argument('cmd', metavar='COMMAND', nargs='+', help='The command to run')
args = parser.parse_args()
errcode = 1
# spawn all the clusters
base_dir = args.build_dir
num_clusters = args.clusters
build_dir = Path(base_dir)
bin_dir = build_dir.joinpath('bin')
clusters = []
for c in range(1, num_clusters + 1):
# now start the cluster up
local_c = TempCluster(args.build_dir, port="{}501".format(c))
local_c.__enter__()
clusters.append(local_c)
# all clusters should be running now, so run the subcommand
# TODO (bfines): pass through the proper ENV commands so that the client can find everything
cluster_paths = ';'.join([str(cluster.etc.joinpath('fdb.cluster')) for cluster in clusters])
print(cluster_paths)
env = dict(**os.environ)
env['FDB_CLUSTERS'] = env.get('FDB_CLUSTERS', cluster_paths)
errcode = subprocess.run(args.cmd, stdout=sys.stdout, stderr=sys.stderr, env=env).returncode
# shutdown all the running clusters
for tc in clusters:
tc.close()
sys.exit(errcode)

View File

@@ -0,0 +1,20 @@
[configuration]
buggify = false
minimumReplication = 2
[[test]]
testTitle = 'Clogged'
connectionFailuresDisableDuration = 100000
[[test.workload]]
testName = 'Cycle'
transactionsPerSecond = 1000.0
testDuration = 30.0
expectedRate = 0
[[test.workload]]
testName = 'LowLatency'
testDuration = 30.0
[[test.workload]]
testName = 'ClogSingleConnection'