Merge branch 'cassandra-4.1' into cassandra-5.0

* cassandra-4.1:
  Add configurable batchlog endpoint strategies
mck 2024-10-15 13:38:43 +02:00
commit dcff5a0fad
12 changed files with 1207 additions and 69 deletions

View File

@@ -7,6 +7,7 @@
Merged from 4.1:
* Fix race condition in DecayingEstimatedHistogramReservoir during rescale (CASSANDRA-19365)
Merged from 4.0:
* Add configurable batchlog endpoint strategies: random_remote, prefer_local, dynamic_remote, and dynamic (CASSANDRA-18120)
* Fix bash-completion for debian distro (CASSANDRA-19999)
* Ensure thread-safety for CommitLogArchiver in CommitLog (CASSANDRA-19960)
* Fix text containing "/*" being interpreted as multiline comment in cqlsh (CASSANDRA-17667)

View File

@@ -156,6 +156,32 @@ auto_hints_cleanup_enabled: false
# Min unit: KiB
batchlog_replay_throttle: 1024KiB
# Strategy to choose the batchlog storage endpoints.
#
# Available options:
#
# - random_remote
# Default, purely random; avoids the local rack, if possible.
#
# - prefer_local
# Similar to random_remote, except that one of the replicas will go to the local rack,
# which means it offers a lower availability guarantee than random_remote or dynamic_remote.
#
# - dynamic_remote
# Uses the DynamicEndpointSnitch to select batchlog storage endpoints and avoids the
# local rack, if possible. This strategy offers the same availability guarantees
# as random_remote but selects the fastest endpoints according to the DynamicEndpointSnitch.
# (DynamicEndpointSnitch currently tracks only reads, not writes - i.e. write-only
# (or mostly-write) workloads might not benefit from this strategy.)
# Note: this strategy will fall back to random_remote if dynamic_snitch is not enabled.
#
# - dynamic
# Mostly the same as dynamic_remote, except that the local rack is not excluded, which means it offers a lower
# availability guarantee than random_remote or dynamic_remote.
# Note: this strategy will fall back to random_remote if dynamic_snitch is not enabled.
#
# batchlog_endpoint_strategy: random_remote
# Authentication backend, implementing IAuthenticator; used to identify users
# Out of the box, Cassandra provides org.apache.cassandra.auth.{AllowAllAuthenticator,
# PasswordAuthenticator}.

View File

@@ -159,6 +159,32 @@ auto_hints_cleanup_enabled: false
# Min unit: KiB
batchlog_replay_throttle: 1024KiB
# Strategy to choose the batchlog storage endpoints.
#
# Available options:
#
# - random_remote
# Default, purely random; avoids the local rack, if possible.
#
# - prefer_local
# Similar to random_remote, except that one of the replicas will go to the local rack,
# which means it offers a lower availability guarantee than random_remote or dynamic_remote.
#
# - dynamic_remote
# Uses the DynamicEndpointSnitch to select batchlog storage endpoints and avoids the
# local rack, if possible. This strategy offers the same availability guarantees
# as random_remote but selects the fastest endpoints according to the DynamicEndpointSnitch.
# (DynamicEndpointSnitch currently tracks only reads, not writes - i.e. write-only
# (or mostly-write) workloads might not benefit from this strategy.)
# Note: this strategy will fall back to random_remote if dynamic_snitch is not enabled.
#
# - dynamic
# Mostly the same as dynamic_remote, except that the local rack is not excluded, which means it offers a lower
# availability guarantee than random_remote or dynamic_remote.
# Note: this strategy will fall back to random_remote if dynamic_snitch is not enabled.
#
batchlog_endpoint_strategy: dynamic_remote
# Authentication backend, implementing IAuthenticator; used to identify users
# Out of the box, Cassandra provides org.apache.cassandra.auth.{AllowAllAuthenticator,
# PasswordAuthenticator}.

View File

@@ -416,6 +416,10 @@ public enum CassandraRelevantProperties
REPLACE_ADDRESS_FIRST_BOOT("cassandra.replace_address_first_boot"),
REPLACE_NODE("cassandra.replace_node"),
REPLACE_TOKEN("cassandra.replace_token"),
/**
* Number of replicas required to store batchlog for atomicity, only accepts values of 1 or 2.
*/
REQUIRED_BATCHLOG_REPLICA_COUNT("cassandra.batchlog.required_replica_count", "2"),
/**
* Whether we reset any found data from previously run bootstraps.
*/
@@ -612,7 +616,7 @@ public enum CassandraRelevantProperties
prev = next;
}
}
CassandraRelevantProperties(String key, String defaultVal)
{
this.key = key;
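
The new cassandra.batchlog.required_replica_count property only accepts 1 or 2; out-of-range values are clamped (with a warning) by the static initializer added to ReplicaPlans further down. A minimal sketch of that clamping, reading the property directly for illustration only (hypothetical class, not part of this commit):

public class BatchlogReplicaCountSketch
{
    public static void main(String[] args)
    {
        // Mirrors the Math.max(1, Math.min(2, ...)) pattern used in ReplicaPlans:
        // e.g. -Dcassandra.batchlog.required_replica_count=3 effectively runs with 2.
        int requested = Integer.getInteger("cassandra.batchlog.required_replica_count", 2);
        int effective = Math.max(1, Math.min(2, requested));
        System.out.println("requested=" + requested + ", effective=" + effective);
    }
}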

View File

@ -433,6 +433,7 @@ public class Config
public DataStorageSpec.IntKibibytesBound hinted_handoff_throttle = new DataStorageSpec.IntKibibytesBound("1024KiB");
@Replaces(oldName = "batchlog_replay_throttle_in_kb", converter = Converters.KIBIBYTES_DATASTORAGE, deprecated = true)
public DataStorageSpec.IntKibibytesBound batchlog_replay_throttle = new DataStorageSpec.IntKibibytesBound("1024KiB");
public BatchlogEndpointStrategy batchlog_endpoint_strategy = BatchlogEndpointStrategy.random_remote;
public int max_hints_delivery_threads = 2;
@Replaces(oldName = "hints_flush_period_in_ms", converter = Converters.MILLIS_DURATION_INT, deprecated = true)
public DurationSpec.IntMillisecondsBound hints_flush_period = new DurationSpec.IntMillisecondsBound("10s");
@@ -1249,6 +1250,68 @@ public class Config
exception
}
public enum BatchlogEndpointStrategy
{
/**
* Old, conventional strategy to select batchlog storage endpoints.
* Purely random; avoids the local rack, if possible.
*/
random_remote(false, false),
/**
* Random, except that one of the replicas will go to the local rack,
* which means this strategy offers lower availability guarantees than
* {@link #random_remote} or {@link #dynamic_remote}.
*/
prefer_local(false, true),
/**
* Strategy using {@link Config#dynamic_snitch} ({@link org.apache.cassandra.locator.DynamicEndpointSnitch})
* to select batchlog storage endpoints. Avoids the local rack, if possible.
*
* This strategy offers the same availability guarantees as {@link #random_remote} but selects the
* fastest endpoints according to the {@link org.apache.cassandra.locator.DynamicEndpointSnitch}.
*
* Hint: {@link org.apache.cassandra.locator.DynamicEndpointSnitch} tracks reads and not writes - i.e.
* write-only (or mostly-write) workloads might not benefit from this strategy.
*
* Note: this strategy will fall back to {@link #random_remote} if {@link Config#dynamic_snitch} is not enabled.
*/
dynamic_remote(true, false),
/**
* Strategy using {@link Config#dynamic_snitch} ({@link org.apache.cassandra.locator.DynamicEndpointSnitch})
* to select batchlog storage endpoints. Does not exclude the local rack.
*
* Since the local rack is not excluded, this strategy offers lower availability guarantees than
* {@link #random_remote} or {@link #dynamic_remote}.
*
* Hint: {@link org.apache.cassandra.locator.DynamicEndpointSnitch} tracks reads and not writes - i.e.
* write-only (or mostly-write) workloads might not benefit from this strategy.
*
* Note: this strategy will fall back to {@link #random_remote} if {@link Config#dynamic_snitch} is not enabled.
*/
dynamic(true, true);
/**
* If true, dynamic snitch response times will be used to select more responsive nodes to write the batchlog to.
* If false, nodes will be randomly selected.
*/
public final boolean useDynamicSnitchScores;
/**
* If true, one of the selected nodes will come from the local rack.
* If false, the local rack will not be used except as a last resort when no other racks are available.
*/
public final boolean preferLocalRack;
BatchlogEndpointStrategy(boolean useDynamicSnitchScores, boolean preferLocalRack)
{
this.useDynamicSnitchScores = useDynamicSnitchScores;
this.preferLocalRack = preferLocalRack;
}
}
private static final Set<String> SENSITIVE_KEYS = new HashSet<String>() {{
add("client_encryption_options");
add("server_encryption_options");

View File

@@ -3618,6 +3618,22 @@ public class DatabaseDescriptor
conf.batchlog_replay_throttle = new DataStorageSpec.IntKibibytesBound(throttleInKiB);
}
public static boolean isDynamicEndpointSnitch()
{
// not using config.dynamic_snitch because snitch can be changed via JMX
return snitch instanceof DynamicEndpointSnitch;
}
public static Config.BatchlogEndpointStrategy getBatchlogEndpointStrategy()
{
return conf.batchlog_endpoint_strategy;
}
public static void setBatchlogEndpointStrategy(Config.BatchlogEndpointStrategy batchlogEndpointStrategy)
{
conf.batchlog_endpoint_strategy = batchlogEndpointStrategy;
}
public static int getMaxHintsDeliveryThreads()
{
return conf.max_hints_delivery_threads;
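
The accessors above make the strategy switchable at runtime (the JMX methods added to StorageService below delegate to them). A hedged usage sketch, e.g. from test code, assuming DatabaseDescriptor has already been initialised (tests typically call DatabaseDescriptor.daemonInitialization() first):

import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;

public class BatchlogStrategyToggleSketch
{
    public static void main(String[] args)
    {
        // Sketch only: assumes configuration has been loaded before this point.
        DatabaseDescriptor.setBatchlogEndpointStrategy(Config.BatchlogEndpointStrategy.dynamic_remote);
        assert DatabaseDescriptor.getBatchlogEndpointStrategy() == Config.BatchlogEndpointStrategy.dynamic_remote;
        System.out.println("strategy now: " + DatabaseDescriptor.getBatchlogEndpointStrategy());
    }
}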

View File

@@ -29,8 +29,8 @@ import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.codahale.metrics.ExponentiallyDecayingReservoir;
import com.codahale.metrics.Snapshot;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.gms.ApplicationState;

View File

@@ -18,16 +18,37 @@
package org.apache.cassandra.locator;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import com.carrotsearch.hppc.ObjectIntHashMap;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.DecoratedKey;
@@ -44,23 +65,8 @@ import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.reads.AlwaysSpeculativeRetryPolicy;
import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
import org.apache.cassandra.utils.FBUtilities;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import static com.google.common.collect.Iterables.any;
import static com.google.common.collect.Iterables.filter;
@@ -79,6 +85,16 @@ public class ReplicaPlans
private static final Range<Token> FULL_TOKEN_RANGE = new Range<>(DatabaseDescriptor.getPartitioner().getMinimumToken(), DatabaseDescriptor.getPartitioner().getMinimumToken());
private static final int REQUIRED_BATCHLOG_REPLICA_COUNT
= Math.max(1, Math.min(2, CassandraRelevantProperties.REQUIRED_BATCHLOG_REPLICA_COUNT.getInt()));
static
{
int batchlogReplicaCount = CassandraRelevantProperties.REQUIRED_BATCHLOG_REPLICA_COUNT.getInt();
if (batchlogReplicaCount < 1 || 2 < batchlogReplicaCount)
logger.warn("System property {} was set to {} but must be 1 or 2. Running with {}", CassandraRelevantProperties.REQUIRED_BATCHLOG_REPLICA_COUNT.getKey(), batchlogReplicaCount, REQUIRED_BATCHLOG_REPLICA_COUNT);
}
public static boolean isSufficientLiveReplicasForRead(AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, Endpoints<?> liveReplicas)
{
switch (consistencyLevel)
@@ -233,10 +249,22 @@ public class ReplicaPlans
// - replicas should be in the local datacenter
// - choose min(2, number of qualifying candidates above)
// - allow the local node to be the only replica only if it's a single-node DC
Collection<InetAddressAndPort> chosenEndpoints = filterBatchlogEndpoints(snitch.getLocalRack(), localEndpoints);
Collection<InetAddressAndPort> chosenEndpoints = filterBatchlogEndpoints(false, snitch.getLocalRack(), localEndpoints);
if (chosenEndpoints.isEmpty() && isAny)
chosenEndpoints = Collections.singleton(FBUtilities.getBroadcastAddressAndPort());
// Batchlog is hosted by either one node or two nodes from different racks.
ConsistencyLevel consistencyLevel = chosenEndpoints.size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO;
if (chosenEndpoints.isEmpty())
{
if (isAny)
chosenEndpoints = Collections.singleton(FBUtilities.getBroadcastAddressAndPort());
else
// throw UnavailableException rather than letting the batchlog write time out unnecessarily
throw new UnavailableException("Cannot achieve consistency level " + consistencyLevel
+ " for batchlog in local DC, required:" + REQUIRED_BATCHLOG_REPLICA_COUNT
+ ", available:" + 0,
consistencyLevel, REQUIRED_BATCHLOG_REPLICA_COUNT, 0);
}
Keyspace systemKeypsace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(
@@ -244,36 +272,34 @@ public class ReplicaPlans
SystemReplicas.getSystemReplicas(chosenEndpoints).forToken(token),
EndpointsForToken.empty(token)
);
// Batchlog is hosted by either one node or two nodes from different racks.
ConsistencyLevel consistencyLevel = liveAndDown.all().size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO;
// assume that we have already been given live endpoints, and skip applying the failure detector
return forWrite(systemKeypsace, consistencyLevel, liveAndDown, liveAndDown, writeAll);
}
private static Collection<InetAddressAndPort> filterBatchlogEndpoints(String localRack,
Multimap<String, InetAddressAndPort> endpoints)
@VisibleForTesting
public static Collection<InetAddressAndPort> filterBatchlogEndpoints(boolean preferLocalRack, String localRack,
Multimap<String, InetAddressAndPort> endpoints)
{
return filterBatchlogEndpoints(localRack,
endpoints,
Collections::shuffle,
FailureDetector.isEndpointAlive,
ThreadLocalRandom.current()::nextInt);
return DatabaseDescriptor.getBatchlogEndpointStrategy().useDynamicSnitchScores && DatabaseDescriptor.isDynamicEndpointSnitch()
? filterBatchlogEndpointsDynamic(preferLocalRack, localRack, endpoints, FailureDetector.isEndpointAlive)
: filterBatchlogEndpointsRandom(preferLocalRack, localRack, endpoints,
Collections::shuffle,
FailureDetector.isEndpointAlive,
ThreadLocalRandom.current()::nextInt);
}
// Collect a list of candidates for batchlog hosting. If possible these will be two nodes from different racks.
@VisibleForTesting
public static Collection<InetAddressAndPort> filterBatchlogEndpoints(String localRack,
Multimap<String, InetAddressAndPort> endpoints,
Consumer<List<?>> shuffle,
Predicate<InetAddressAndPort> isAlive,
Function<Integer, Integer> indexPicker)
private static ListMultimap<String, InetAddressAndPort> validate(boolean preferLocalRack, String localRack,
Multimap<String, InetAddressAndPort> endpoints,
Predicate<InetAddressAndPort> isAlive)
{
int endpointCount = endpoints.values().size();
// special case for single-node data centers
if (endpoints.values().size() == 1)
return endpoints.values();
if (endpointCount <= REQUIRED_BATCHLOG_REPLICA_COUNT)
return ArrayListMultimap.create(endpoints);
// strip out dead endpoints and localhost
ListMultimap<String, InetAddressAndPort> validated = ArrayListMultimap.create();
int rackCount = endpoints.keySet().size();
ListMultimap<String, InetAddressAndPort> validated = ArrayListMultimap.create(rackCount, endpointCount / rackCount);
for (Map.Entry<String, InetAddressAndPort> entry : endpoints.entries())
{
InetAddressAndPort addr = entry.getValue();
@@ -281,15 +307,45 @@ public class ReplicaPlans
validated.put(entry.getKey(), entry.getValue());
}
if (validated.size() <= 2)
return validated.values();
// return early if no more than 2 nodes:
if (validated.size() <= REQUIRED_BATCHLOG_REPLICA_COUNT)
return validated;
if (validated.size() - validated.get(localRack).size() >= 2)
// if the local rack is not preferred and there are enough nodes in other racks, remove it:
if (!(DatabaseDescriptor.getBatchlogEndpointStrategy().preferLocalRack || preferLocalRack)
&& validated.size() - validated.get(localRack).size() >= REQUIRED_BATCHLOG_REPLICA_COUNT)
{
// we have enough endpoints in other racks
validated.removeAll(localRack);
}
return validated;
}
// Collect a list of candidates for batchlog hosting. If possible these will be two nodes from different racks.
// Replicas are picked manually:
// - replicas should be alive according to the failure detector
// - replicas should be in the local datacenter
// - choose min(2, number of qualifying candidates above)
// - allow the local node to be the only replica only if it's a single-node DC
@VisibleForTesting
public static Collection<InetAddressAndPort> filterBatchlogEndpointsRandom(boolean preferLocalRack, String localRack,
Multimap<String, InetAddressAndPort> endpoints,
Consumer<List<?>> shuffle,
Predicate<InetAddressAndPort> isAlive,
Function<Integer, Integer> indexPicker)
{
ListMultimap<String, InetAddressAndPort> validated = validate(preferLocalRack, localRack, endpoints, isAlive);
// return early if no more than 2 nodes:
if (validated.size() <= REQUIRED_BATCHLOG_REPLICA_COUNT)
return validated.values();
/*
* if we have only 1 `other` rack to select replicas from (whether it be the local rack or a single non-local rack),
* pick two random nodes from there and return early;
* we are guaranteed to have at least two nodes in the single remaining rack because of the above if block.
*/
if (validated.keySet().size() == 1)
{
/*
@@ -299,24 +355,32 @@ public class ReplicaPlans
*/
List<InetAddressAndPort> otherRack = Lists.newArrayList(validated.values());
shuffle.accept(otherRack);
return otherRack.subList(0, 2);
return otherRack.subList(0, REQUIRED_BATCHLOG_REPLICA_COUNT);
}
// randomize which racks we pick from if more than 2 remaining
Collection<String> racks;
if (validated.keySet().size() == 2)
if (validated.keySet().size() == REQUIRED_BATCHLOG_REPLICA_COUNT)
{
racks = validated.keySet();
}
else if (preferLocalRack || DatabaseDescriptor.getBatchlogEndpointStrategy().preferLocalRack)
{
List<String> nonLocalRacks = Lists.newArrayList(Sets.difference(validated.keySet(), ImmutableSet.of(localRack)));
racks = new LinkedHashSet<>();
racks.add(localRack);
racks.add(nonLocalRacks.get(indexPicker.apply(nonLocalRacks.size())));
}
else
{
racks = Lists.newArrayList(validated.keySet());
shuffle.accept((List<?>) racks);
}
// grab a random member of up to two racks
List<InetAddressAndPort> result = new ArrayList<>(2);
for (String rack : Iterables.limit(racks, 2))
// grab two random nodes from two different racks
List<InetAddressAndPort> result = new ArrayList<>(REQUIRED_BATCHLOG_REPLICA_COUNT);
for (String rack : Iterables.limit(racks, REQUIRED_BATCHLOG_REPLICA_COUNT))
{
List<InetAddressAndPort> rackMembers = validated.get(rack);
result.add(rackMembers.get(indexPicker.apply(rackMembers.size())));
@@ -325,6 +389,56 @@ public class ReplicaPlans
return result;
}
@VisibleForTesting
public static Collection<InetAddressAndPort> filterBatchlogEndpointsDynamic(boolean preferLocalRack, String localRack,
Multimap<String, InetAddressAndPort> endpoints,
Predicate<InetAddressAndPort> isAlive)
{
ListMultimap<String, InetAddressAndPort> validated = validate(preferLocalRack, localRack, endpoints, isAlive);
// return early if no more than 2 nodes:
if (validated.size() <= REQUIRED_BATCHLOG_REPLICA_COUNT)
return validated.values();
// sort _all_ nodes to pick the best racks
List<InetAddressAndPort> sorted = sortByProximity(validated.values());
List<InetAddressAndPort> result = new ArrayList<>(REQUIRED_BATCHLOG_REPLICA_COUNT);
Set<String> racks = new HashSet<>();
while (result.size() < REQUIRED_BATCHLOG_REPLICA_COUNT)
{
for (InetAddressAndPort endpoint : sorted)
{
if (result.size() == REQUIRED_BATCHLOG_REPLICA_COUNT)
break;
if (racks.isEmpty())
racks.addAll(validated.keySet());
String rack = DatabaseDescriptor.getEndpointSnitch().getRack(endpoint);
if (!racks.remove(rack))
continue;
if (result.contains(endpoint))
continue;
result.add(endpoint);
}
}
return result;
}
@VisibleForTesting
public static List<InetAddressAndPort> sortByProximity(Collection<InetAddressAndPort> endpoints)
{
EndpointsForRange endpointsForRange = SystemReplicas.getSystemReplicas(endpoints);
return DatabaseDescriptor.getEndpointSnitch()
.sortedByProximity(FBUtilities.getBroadcastAddressAndPort(), endpointsForRange)
.endpointList();
}
public static ReplicaPlan.ForWrite forReadRepair(Token token, ReplicaPlan<?, ?> readPlan) throws UnavailableException
{
return forWrite(readPlan.keyspace(), readPlan.consistencyLevel(), token, writeReadRepair(readPlan));
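
The @VisibleForTesting entry points above allow the selection to be exercised deterministically. An illustrative sketch (not the commit's test code) that feeds filterBatchlogEndpointsRandom six live endpoints in three racks, with a no-op shuffle and a fixed index picker; it assumes DatabaseDescriptor is initialised so the configured strategy (random_remote by default) can be consulted, and the default required replica count of 2:

import java.net.UnknownHostException;
import java.util.Collection;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.ReplicaPlans;

public class BatchlogEndpointFilterSketch
{
    public static void main(String[] args) throws UnknownHostException
    {
        Multimap<String, InetAddressAndPort> byRack = ArrayListMultimap.create();
        byRack.put("rack1", InetAddressAndPort.getByName("127.0.0.11"));
        byRack.put("rack1", InetAddressAndPort.getByName("127.0.0.12"));
        byRack.put("rack2", InetAddressAndPort.getByName("127.0.0.13"));
        byRack.put("rack2", InetAddressAndPort.getByName("127.0.0.14"));
        byRack.put("rack3", InetAddressAndPort.getByName("127.0.0.15"));
        byRack.put("rack3", InetAddressAndPort.getByName("127.0.0.16"));

        Collection<InetAddressAndPort> chosen =
            ReplicaPlans.filterBatchlogEndpointsRandom(false,            // do not prefer the local rack
                                                       "rack1",          // local rack; excluded when other racks suffice
                                                       byRack,
                                                       list -> {},       // no-op shuffle keeps the sketch deterministic
                                                       endpoint -> true, // treat every endpoint as alive
                                                       bound -> 0);      // always pick a rack's first member

        // Expected here: two endpoints from two different non-local racks.
        System.out.println(chosen);
    }
}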

View File

@@ -6304,6 +6304,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
updateTopology();
}
public String getBatchlogEndpointStrategy()
{
return DatabaseDescriptor.getBatchlogEndpointStrategy().name();
}
public void setBatchlogEndpointStrategy(String batchlogEndpointStrategy)
{
DatabaseDescriptor.setBatchlogEndpointStrategy(Config.BatchlogEndpointStrategy.valueOf(batchlogEndpointStrategy));
}
/**
* Send data to the endpoints that will be responsible for it in the future
*

View File

@@ -650,6 +650,13 @@ public interface StorageServiceMBean extends NotificationEmitter
*/
public int getDynamicUpdateInterval();
public String getBatchlogEndpointStrategy();
/**
* See {@link org.apache.cassandra.config.Config.BatchlogEndpointStrategy} for valid values.
*/
public void setBatchlogEndpointStrategy(String batchlogEndpointStrategy);
// allows a user to forcibly 'kill' a sick node
public void stopGossiping();
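
Because the strategy is exposed through StorageServiceMBean, it can be inspected and changed on a live node without a restart. A hedged JMX client sketch (assumptions: the conventional org.apache.cassandra.db:type=StorageService object name and the default JMX port 7199; not part of this commit):

import javax.management.JMX;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import org.apache.cassandra.service.StorageServiceMBean;

public class SetBatchlogStrategyViaJmx
{
    public static void main(String[] args) throws Exception
    {
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:7199/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url))
        {
            StorageServiceMBean ss = JMX.newMBeanProxy(connector.getMBeanServerConnection(),
                                                       new ObjectName("org.apache.cassandra.db:type=StorageService"),
                                                       StorageServiceMBean.class);
            System.out.println("current strategy: " + ss.getBatchlogEndpointStrategy());
            ss.setBatchlogEndpointStrategy("dynamic_remote"); // value must match a BatchlogEndpointStrategy constant
        }
    }
}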

View File

@@ -100,6 +100,9 @@ public class DatabaseDescriptorRefTest
"org.apache.cassandra.config.ConfigBeanInfo",
"org.apache.cassandra.config.ConfigCustomizer",
"org.apache.cassandra.config.ConfigurationLoader",
"org.apache.cassandra.config.Config$CorruptedTombstoneStrategy",
"org.apache.cassandra.config.Config$BatchlogEndpointStrategy",
"org.apache.cassandra.config.DatabaseDescriptor$ByteUnit",
"org.apache.cassandra.config.DataRateSpec",
"org.apache.cassandra.config.DataRateSpec$DataRateUnit",
"org.apache.cassandra.config.DataRateSpec$DataRateUnit$1",