New counters implementation

patch by Aleksey Yeschenko; reviewed by Sylvain Lebresne for
CASSANDRA-6504
This commit is contained in:
Aleksey Yeschenko 2014-01-24 02:52:43 -08:00
parent 1218bcacba
commit 714c423360
59 changed files with 1343 additions and 642 deletions

View File

@ -25,6 +25,7 @@
* CF id is changed to be non-deterministic. Data dir/key cache are created
uniquely for CF id (CASSANDRA-5202)
* Cassandra won't start by default without jna (CASSANDRA-6575)
* New counters implementation (CASSANDRA-6504)
2.0.5

View File

@ -38,6 +38,11 @@ Upgrading
cold_reads_to_omit compaction option; 0.0 omits nothing (the old
behavior) and 1.0 omits everything.
- Multithreaded compaction has been removed.
- Counters implementation has been changed, replaced by a safer one with
less caveats, but different performance characteristics. You might have
to change your data model to accomodate the new implementation.
(See https://issues.apache.org/jira/browse/CASSANDRA-6504 and the dev
blog post at http://www.datastax.com/dev/blog/<PLACEHOLDER> for details).
2.0.5
=====

View File

@ -171,6 +171,32 @@ row_cache_save_period: 0
# Disabled by default, meaning all keys are going to be saved
# row_cache_keys_to_save: 100
# Maximum size of the counter cache in memory.
#
# Counter cache helps to reduce counter locks' contention for hot counter cells.
# In case of RF = 1 a counter cache hit will cause Cassandra to skip the read before
# write entirely. With RF > 1 a counter cache hit will still help to reduce the duration
# of the lock hold, helping with hot counter cell updates, but will not allow skipping
# the read entirely. Only the local (clock, count) tuple of a counter cell is kept
# in memory, not the whole counter, so it's relatively cheap.
#
# NOTE: if you reduce the size, you may not get you hottest keys loaded on startup.
#
# Default value is empty to make it "auto" (min(2.5% of Heap (in MB), 50MB)). Set to 0 to disable counter cache.
# NOTE: if you perform counter deletes and rely on low gcgs, you should disable the counter cache.
counter_cache_size_in_mb:
# Duration in seconds after which Cassandra should
# save the counter cache (keys only). Caches are saved to saved_caches_directory as
# specified in this configuration file.
#
# Default is 7200 or 2 hours.
counter_cache_save_period: 7200
# Number of keys from the counter cache to save
# Disabled by default, meaning all keys are going to be saved
# counter_cache_keys_to_save: 100
# The off-heap memory allocator. Affects storage engine metadata as
# well as caches. Experiments show that JEMAlloc saves some memory
# than the native GCC allocator (i.e., JEMalloc is more
@ -234,13 +260,16 @@ seed_provider:
# bottleneck will be reads that need to fetch data from
# disk. "concurrent_reads" should be set to (16 * number_of_drives) in
# order to allow the operations to enqueue low enough in the stack
# that the OS and drives can reorder them.
# that the OS and drives can reorder them. Same applies to
# "concurrent_counter_writes", since counter writes read the current
# values before incrementing and writing them back.
#
# On the other hand, since writes are almost never IO bound, the ideal
# number of "concurrent_writes" is dependent on the number of cores in
# your system; (8 * number_of_cores) is a good rule of thumb.
concurrent_reads: 32
concurrent_writes: 32
concurrent_counter_writes: 32
# Total memory to use for sstable-reading buffers. Defaults to
# the smaller of 1/4 of heap or 512MB.
@ -491,6 +520,8 @@ read_request_timeout_in_ms: 5000
range_request_timeout_in_ms: 10000
# How long the coordinator should wait for writes to complete
write_request_timeout_in_ms: 2000
# How long the coordinator should wait for counter writes to complete
counter_write_request_timeout_in_ms: 5000
# How long a coordinator should continue to retry a CAS operation
# that contends with other proposals for the same row
cas_contention_timeout_in_ms: 1000

View File

@ -308,7 +308,6 @@ Table creation supports the following other @<property>@:
|@bloom_filter_fp_chance@ | _simple_ | 0.00075 | The target probability of false positive of the sstable bloom filters. Said bloom filters will be sized to provide the provided probability (thus lowering this value impact the size of bloom filters in-memory and on-disk)|
|@compaction@ | _map_ | _see below_ | The compaction options to use, see below.|
|@compression@ | _map_ | _see below_ | Compression options, see below. |
|@replicate_on_write@ | _simple_ | true | Whether to replicate data on write. This can only be set to false for tables with counters values. Disabling this is dangerous and can result in random lose of counters, don't disable unless you are sure to know what you are doing|
|@caching@ | _simple_ | keys_only | Whether to cache keys ("key cache") and/or rows ("row cache") for this table. Valid values are: @all@, @keys_only@, @rows_only@ and @none@. |

View File

@ -459,7 +459,6 @@ struct CfDef {
16: optional i32 id,
17: optional i32 min_compaction_threshold,
18: optional i32 max_compaction_threshold,
24: optional bool replicate_on_write,
26: optional string key_validation_class,
28: optional binary key_alias,
29: optional string compaction_strategy,
@ -492,6 +491,8 @@ struct CfDef {
/** @deprecated */
23: optional double memtable_operations_in_millions,
/** @deprecated */
24: optional bool replicate_on_write,
/** @deprecated */
25: optional double merge_shards_chance,
/** @deprecated */
27: optional string row_cache_provider,

View File

@ -68,7 +68,6 @@ class Cql3ParsingRuleSet(CqlParsingRuleSet):
('gc_grace_seconds', None),
('index_interval', None),
('read_repair_chance', None),
('replicate_on_write', None),
('populate_io_cache_on_flush', None),
('default_time_to_live', None),
('speculative_retry', None),

View File

@ -661,7 +661,6 @@ class TestCqlshOutput(BaseTestCase):
gc_grace_seconds=864000 AND
index_interval=128 AND
read_repair_chance=0.100000 AND
replicate_on_write='true' AND
populate_io_cache_on_flush='false' AND
default_time_to_live=0 AND
speculative_retry='NONE' AND

View File

@ -127,6 +127,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
for (Future<Pair<K, V>> future : futures)
{
Pair<K, V> entry = future.get();
if (entry != null)
put(entry.left, entry.right);
}
}

View File

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.cache;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.UUID;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.utils.*;
public class CounterCacheKey implements CacheKey
{
public final UUID cfId;
public final byte[] partitionKey;
public final byte[] cellName;
private CounterCacheKey(UUID cfId, ByteBuffer partitionKey, CellName cellName)
{
this.cfId = cfId;
this.partitionKey = ByteBufferUtil.getArray(partitionKey);
this.cellName = ByteBufferUtil.getArray(cellName.toByteBuffer());
}
public static CounterCacheKey create(UUID cfId, ByteBuffer partitionKey, CellName cellName)
{
return new CounterCacheKey(cfId, partitionKey, cellName);
}
public PathInfo getPathInfo()
{
Pair<String, String> cf = Schema.instance.getCF(cfId);
return new PathInfo(cf.left, cf.right, cfId);
}
public long memorySize()
{
return ObjectSizes.getFieldSize(3 * ObjectSizes.getReferenceSize())
+ ObjectSizes.getArraySize(partitionKey)
+ ObjectSizes.getArraySize(cellName);
}
@Override
public String toString()
{
return String.format("CounterCacheKey(%s, %s, %s)",
cfId,
ByteBufferUtil.bytesToHex(ByteBuffer.wrap(partitionKey)),
ByteBufferUtil.bytesToHex(ByteBuffer.wrap(cellName)));
}
@Override
public int hashCode()
{
return Arrays.deepHashCode(new Object[]{cfId, partitionKey, cellName});
}
@Override
public boolean equals(Object o)
{
if (this == o)
return true;
if (!(o instanceof CounterCacheKey))
return false;
CounterCacheKey cck = (CounterCacheKey) o;
return cfId.equals(cck.cfId)
&& Arrays.equals(partitionKey, cck.partitionKey)
&& Arrays.equals(cellName, cck.cellName);
}
}

View File

@ -21,6 +21,7 @@ public enum Stage
{
READ,
MUTATION,
COUNTER_MUTATION,
GOSSIP,
REQUEST_RESPONSE,
ANTI_ENTROPY,
@ -28,8 +29,7 @@ public enum Stage
MISC,
TRACING,
INTERNAL_RESPONSE,
READ_REPAIR,
REPLICATE_ON_WRITE;
READ_REPAIR;
public String getJmxType()
{
@ -43,9 +43,9 @@ public enum Stage
case INTERNAL_RESPONSE:
return "internal";
case MUTATION:
case COUNTER_MUTATION:
case READ:
case REQUEST_RESPONSE:
case REPLICATE_ON_WRITE:
case READ_REPAIR:
return "request";
default:

View File

@ -43,15 +43,13 @@ public class StageManager
public static final long KEEPALIVE = 60; // seconds to keep "extra" threads alive for when idle
public static final int MAX_REPLICATE_ON_WRITE_TASKS = 1024 * FBUtilities.getAvailableProcessors();
static
{
stages.put(Stage.MUTATION, multiThreadedConfigurableStage(Stage.MUTATION, getConcurrentWriters()));
stages.put(Stage.COUNTER_MUTATION, multiThreadedConfigurableStage(Stage.COUNTER_MUTATION, getConcurrentCounterWriters()));
stages.put(Stage.READ, multiThreadedConfigurableStage(Stage.READ, getConcurrentReaders()));
stages.put(Stage.REQUEST_RESPONSE, multiThreadedStage(Stage.REQUEST_RESPONSE, FBUtilities.getAvailableProcessors()));
stages.put(Stage.INTERNAL_RESPONSE, multiThreadedStage(Stage.INTERNAL_RESPONSE, FBUtilities.getAvailableProcessors()));
stages.put(Stage.REPLICATE_ON_WRITE, multiThreadedConfigurableStage(Stage.REPLICATE_ON_WRITE, getConcurrentReplicators(), MAX_REPLICATE_ON_WRITE_TASKS));
// the rest are all single-threaded
stages.put(Stage.GOSSIP, new JMXEnabledThreadPoolExecutor(Stage.GOSSIP));
stages.put(Stage.ANTI_ENTROPY, new JMXEnabledThreadPoolExecutor(Stage.ANTI_ENTROPY));
@ -99,16 +97,6 @@ public class StageManager
stage.getJmxType());
}
private static JMXConfigurableThreadPoolExecutor multiThreadedConfigurableStage(Stage stage, int numThreads, int maxTasksBeforeBlock)
{
return new JMXConfigurableThreadPoolExecutor(numThreads,
KEEPALIVE,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(maxTasksBeforeBlock),
new NamedThreadFactory(stage.getJmxName()),
stage.getJmxType());
}
/**
* Retrieve a stage from the StageManager
* @param stage name of the stage to be retrieved.

View File

@ -76,7 +76,6 @@ public final class CFMetaData
public final static double DEFAULT_READ_REPAIR_CHANCE = 0.1;
public final static double DEFAULT_DCLOCAL_READ_REPAIR_CHANCE = 0.0;
public final static boolean DEFAULT_REPLICATE_ON_WRITE = true;
public final static int DEFAULT_GC_GRACE_SECONDS = 864000;
public final static int DEFAULT_MIN_COMPACTION_THRESHOLD = 4;
public final static int DEFAULT_MAX_COMPACTION_THRESHOLD = 32;
@ -128,7 +127,6 @@ public final class CFMetaData
+ "comment text,"
+ "read_repair_chance double,"
+ "local_read_repair_chance double,"
+ "replicate_on_write boolean,"
+ "gc_grace_seconds int,"
+ "default_validator text,"
+ "key_validator text,"
@ -395,7 +393,6 @@ public final class CFMetaData
private volatile String comment = "";
private volatile double readRepairChance = DEFAULT_READ_REPAIR_CHANCE;
private volatile double dcLocalReadRepairChance = DEFAULT_DCLOCAL_READ_REPAIR_CHANCE;
private volatile boolean replicateOnWrite = DEFAULT_REPLICATE_ON_WRITE;
private volatile int gcGraceSeconds = DEFAULT_GC_GRACE_SECONDS;
private volatile AbstractType<?> defaultValidator = BytesType.instance;
private volatile AbstractType<?> keyValidator = BytesType.instance;
@ -437,7 +434,6 @@ public final class CFMetaData
public CFMetaData comment(String prop) { comment = enforceCommentNotNull(prop); return this;}
public CFMetaData readRepairChance(double prop) {readRepairChance = prop; return this;}
public CFMetaData dcLocalReadRepairChance(double prop) {dcLocalReadRepairChance = prop; return this;}
public CFMetaData replicateOnWrite(boolean prop) {replicateOnWrite = prop; return this;}
public CFMetaData gcGraceSeconds(int prop) {gcGraceSeconds = prop; return this;}
public CFMetaData defaultValidator(AbstractType<?> prop) {defaultValidator = prop; return this;}
public CFMetaData keyValidator(AbstractType<?> prop) {keyValidator = prop; return this;}
@ -624,7 +620,6 @@ public final class CFMetaData
.comment(oldCFMD.comment)
.readRepairChance(oldCFMD.readRepairChance)
.dcLocalReadRepairChance(oldCFMD.dcLocalReadRepairChance)
.replicateOnWrite(oldCFMD.replicateOnWrite)
.gcGraceSeconds(oldCFMD.gcGraceSeconds)
.defaultValidator(oldCFMD.defaultValidator)
.keyValidator(oldCFMD.keyValidator)
@ -691,11 +686,6 @@ public final class CFMetaData
return ReadRepairDecision.NONE;
}
public boolean getReplicateOnWrite()
{
return replicateOnWrite;
}
public boolean populateIoCacheOnFlush()
{
return populateIoCacheOnFlush;
@ -869,7 +859,6 @@ public final class CFMetaData
.append(comment, rhs.comment)
.append(readRepairChance, rhs.readRepairChance)
.append(dcLocalReadRepairChance, rhs.dcLocalReadRepairChance)
.append(replicateOnWrite, rhs.replicateOnWrite)
.append(gcGraceSeconds, rhs.gcGraceSeconds)
.append(defaultValidator, rhs.defaultValidator)
.append(keyValidator, rhs.keyValidator)
@ -902,7 +891,6 @@ public final class CFMetaData
.append(comment)
.append(readRepairChance)
.append(dcLocalReadRepairChance)
.append(replicateOnWrite)
.append(gcGraceSeconds)
.append(defaultValidator)
.append(keyValidator)
@ -956,8 +944,6 @@ public final class CFMetaData
{
if (!cf_def.isSetComment())
cf_def.setComment("");
if (!cf_def.isSetReplicate_on_write())
cf_def.setReplicate_on_write(CFMetaData.DEFAULT_REPLICATE_ON_WRITE);
if (!cf_def.isSetPopulate_io_cache_on_flush())
cf_def.setPopulate_io_cache_on_flush(CFMetaData.DEFAULT_POPULATE_IO_CACHE_ON_FLUSH);
if (!cf_def.isSetMin_compaction_threshold())
@ -1047,7 +1033,6 @@ public final class CFMetaData
return newCFMD.addAllColumnDefinitions(ColumnDefinition.fromThrift(newCFMD, cf_def.column_metadata))
.comment(cf_def.comment)
.replicateOnWrite(cf_def.replicate_on_write)
.defaultValidator(TypeParser.parse(cf_def.default_validation_class))
.compressionParameters(cp)
.rebuild();
@ -1125,7 +1110,6 @@ public final class CFMetaData
comment = enforceCommentNotNull(cfm.comment);
readRepairChance = cfm.readRepairChance;
dcLocalReadRepairChance = cfm.dcLocalReadRepairChance;
replicateOnWrite = cfm.replicateOnWrite;
gcGraceSeconds = cfm.gcGraceSeconds;
defaultValidator = cfm.defaultValidator;
keyValidator = cfm.keyValidator;
@ -1265,7 +1249,6 @@ public final class CFMetaData
def.setComment(enforceCommentNotNull(comment));
def.setRead_repair_chance(readRepairChance);
def.setDclocal_read_repair_chance(dcLocalReadRepairChance);
def.setReplicate_on_write(replicateOnWrite);
def.setPopulate_io_cache_on_flush(populateIoCacheOnFlush);
def.setGc_grace_seconds(gcGraceSeconds);
def.setDefault_validation_class(defaultValidator == null ? null : defaultValidator.toString());
@ -1628,7 +1611,6 @@ public final class CFMetaData
adder.add("comment", comment);
adder.add("read_repair_chance", readRepairChance);
adder.add("local_read_repair_chance", dcLocalReadRepairChance);
adder.add("replicate_on_write", replicateOnWrite);
adder.add("populate_io_cache_on_flush", populateIoCacheOnFlush);
adder.add("gc_grace_seconds", gcGraceSeconds);
adder.add("default_validator", defaultValidator.toString());
@ -1692,7 +1674,6 @@ public final class CFMetaData
cfm.readRepairChance(result.getDouble("read_repair_chance"));
cfm.dcLocalReadRepairChance(result.getDouble("local_read_repair_chance"));
cfm.replicateOnWrite(result.getBoolean("replicate_on_write"));
cfm.gcGraceSeconds(result.getInt("gc_grace_seconds"));
cfm.defaultValidator(TypeParser.parse(result.getString("default_validator")));
cfm.keyValidator(TypeParser.parse(result.getString("key_validator")));
@ -2172,7 +2153,6 @@ public final class CFMetaData
.append("comment", comment)
.append("readRepairChance", readRepairChance)
.append("dclocalReadRepairChance", dcLocalReadRepairChance)
.append("replicateOnWrite", replicateOnWrite)
.append("gcGraceSeconds", gcGraceSeconds)
.append("defaultValidator", defaultValidator)
.append("keyValidator", keyValidator)

View File

@ -58,6 +58,8 @@ public class Config
public volatile Long write_request_timeout_in_ms = 2000L;
public volatile Long counter_write_request_timeout_in_ms = 5000L;
public volatile Long cas_contention_timeout_in_ms = 1000L;
public volatile Long truncate_request_timeout_in_ms = 60000L;
@ -70,7 +72,10 @@ public class Config
public Integer concurrent_reads = 32;
public Integer concurrent_writes = 32;
public Integer concurrent_replicates = 32;
public Integer concurrent_counter_writes = 32;
@Deprecated
public Integer concurrent_replicates = null;
public Integer memtable_flush_writers = null; // will get set to the length of data dirs in DatabaseDescriptor
public Integer memtable_total_space_in_mb;
@ -165,7 +170,12 @@ public class Config
public long row_cache_size_in_mb = 0;
public volatile int row_cache_save_period = 0;
public int row_cache_keys_to_save = Integer.MAX_VALUE;
public volatile int row_cache_keys_to_save = Integer.MAX_VALUE;
public Long counter_cache_size_in_mb = null;
public volatile int counter_cache_save_period = 7200;
public volatile int counter_cache_keys_to_save = Integer.MAX_VALUE;
public String memory_allocator = NativeAllocator.class.getSimpleName();
public boolean populate_io_cache_on_flush = false;

View File

@ -86,6 +86,7 @@ public class DatabaseDescriptor
private static RequestSchedulerOptions requestSchedulerOptions;
private static long keyCacheSizeInMB;
private static long counterCacheSizeInMB;
private static IAllocator memoryAllocator;
private static long indexSummaryCapacityInMB;
@ -248,10 +249,11 @@ public class DatabaseDescriptor
throw new ConfigurationException("concurrent_writes must be at least 2");
}
if (conf.concurrent_replicates != null && conf.concurrent_replicates < 2)
{
throw new ConfigurationException("concurrent_replicates must be at least 2");
}
if (conf.concurrent_counter_writes != null && conf.concurrent_counter_writes < 2)
throw new ConfigurationException("concurrent_counter_writes must be at least 2");
if (conf.concurrent_replicates != null)
logger.warn("concurrent_replicates has been deprecated and should be removed from cassandra.yaml");
if (conf.file_cache_size_in_mb == null)
conf.file_cache_size_in_mb = Math.min(512, (int) (Runtime.getRuntime().maxMemory() / (4 * 1048576)));
@ -446,6 +448,22 @@ public class DatabaseDescriptor
+ conf.key_cache_size_in_mb + "', supported values are <integer> >= 0.");
}
try
{
// if counter_cache_size_in_mb option was set to "auto" then size of the cache should be "min(2.5% of Heap (in MB), 50MB)
counterCacheSizeInMB = (conf.counter_cache_size_in_mb == null)
? Math.min(Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.025 / 1024 / 1024)), 50)
: conf.counter_cache_size_in_mb;
if (counterCacheSizeInMB < 0)
throw new NumberFormatException(); // to escape duplicating error message
}
catch (NumberFormatException e)
{
throw new ConfigurationException("counter_cache_size_in_mb option was set incorrectly to '"
+ conf.counter_cache_size_in_mb + "', supported values are <integer> >= 0.");
}
// if set to empty/"auto" then use 5% of Heap size
indexSummaryCapacityInMB = (conf.index_summary_capacity_in_mb == null)
? Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.05 / 1024 / 1024))
@ -780,6 +798,16 @@ public class DatabaseDescriptor
conf.write_request_timeout_in_ms = timeOutInMillis;
}
public static long getCounterWriteRpcTimeout()
{
return conf.counter_write_request_timeout_in_ms;
}
public static void setCounterWriteRpcTimeout(Long timeOutInMillis)
{
conf.counter_write_request_timeout_in_ms = timeOutInMillis;
}
public static long getCasContentionTimeout()
{
return conf.cas_contention_timeout_in_ms;
@ -818,8 +846,9 @@ public class DatabaseDescriptor
return getTruncateRpcTimeout();
case READ_REPAIR:
case MUTATION:
case COUNTER_MUTATION:
return getWriteRpcTimeout();
case COUNTER_MUTATION:
return getCounterWriteRpcTimeout();
default:
return getRpcTimeout();
}
@ -830,7 +859,12 @@ public class DatabaseDescriptor
*/
public static long getMinRpcTimeout()
{
return Longs.min(getRpcTimeout(), getReadRpcTimeout(), getRangeRpcTimeout(), getWriteRpcTimeout(), getTruncateRpcTimeout());
return Longs.min(getRpcTimeout(),
getReadRpcTimeout(),
getRangeRpcTimeout(),
getWriteRpcTimeout(),
getCounterWriteRpcTimeout(),
getTruncateRpcTimeout());
}
public static double getPhiConvictThreshold()
@ -853,9 +887,9 @@ public class DatabaseDescriptor
return conf.concurrent_writes;
}
public static int getConcurrentReplicators()
public static int getConcurrentCounterWriters()
{
return conf.concurrent_replicates;
return conf.concurrent_counter_writes;
}
public static int getFlushWriters()
@ -1283,6 +1317,31 @@ public class DatabaseDescriptor
return conf.row_cache_keys_to_save;
}
public static long getCounterCacheSizeInMB()
{
return counterCacheSizeInMB;
}
public static int getCounterCacheSavePeriod()
{
return conf.counter_cache_save_period;
}
public static void setCounterCacheSavePeriod(int counterCacheSavePeriod)
{
conf.counter_cache_save_period = counterCacheSavePeriod;
}
public static int getCounterCacheKeysToSave()
{
return conf.counter_cache_keys_to_save;
}
public static void setCounterCacheKeysToSave(int counterCacheKeysToSave)
{
conf.counter_cache_keys_to_save = counterCacheKeysToSave;
}
public static IAllocator getoffHeapMemoryAllocator()
{
return memoryAllocator;

View File

@ -173,7 +173,6 @@ public class AlterTableStatement
cfm.readRepairChance(cfProps.getPropertyDouble(CFPropDefs.KW_READREPAIRCHANCE, cfm.getReadRepairChance()));
cfm.dcLocalReadRepairChance(cfProps.getPropertyDouble(CFPropDefs.KW_DCLOCALREADREPAIRCHANCE, cfm.getDcLocalReadRepair()));
cfm.gcGraceSeconds(cfProps.getPropertyInt(CFPropDefs.KW_GCGRACESECONDS, cfm.getGcGraceSeconds()));
cfm.replicateOnWrite(cfProps.getPropertyBoolean(CFPropDefs.KW_REPLICATEONWRITE, cfm.getReplicateOnWrite()));
int minCompactionThreshold = cfProps.getPropertyInt(CFPropDefs.KW_MINCOMPACTIONTHRESHOLD, cfm.getMinCompactionThreshold());
int maxCompactionThreshold = cfProps.getPropertyInt(CFPropDefs.KW_MAXCOMPACTIONTHRESHOLD, cfm.getMaxCompactionThreshold());
if (minCompactionThreshold <= 0 || maxCompactionThreshold <= 0)

View File

@ -47,7 +47,6 @@ public class CFPropDefs {
public static final String KW_DEFAULTVALIDATION = "default_validation";
public static final String KW_MINCOMPACTIONTHRESHOLD = "min_compaction_threshold";
public static final String KW_MAXCOMPACTIONTHRESHOLD = "max_compaction_threshold";
public static final String KW_REPLICATEONWRITE = "replicate_on_write";
public static final String KW_COMPACTION_STRATEGY_CLASS = "compaction_strategy_class";
public static final String KW_CACHING = "caching";
public static final String KW_DEFAULT_TIME_TO_LIVE = "default_time_to_live";
@ -90,7 +89,6 @@ public class CFPropDefs {
keywords.add(KW_DEFAULTVALIDATION);
keywords.add(KW_MINCOMPACTIONTHRESHOLD);
keywords.add(KW_MAXCOMPACTIONTHRESHOLD);
keywords.add(KW_REPLICATEONWRITE);
keywords.add(KW_COMPACTION_STRATEGY_CLASS);
keywords.add(KW_CACHING);
keywords.add(KW_DEFAULT_TIME_TO_LIVE);
@ -107,6 +105,7 @@ public class CFPropDefs {
obsoleteKeywords.add("memtable_operations_in_millions");
obsoleteKeywords.add("memtable_flush_after_mins");
obsoleteKeywords.add("row_cache_provider");
obsoleteKeywords.add("replicate_on_write");
allowedKeywords.addAll(keywords);
allowedKeywords.addAll(obsoleteKeywords);

View File

@ -188,7 +188,6 @@ public class CreateColumnFamilyStatement
.comment(cfProps.getProperty(CFPropDefs.KW_COMMENT))
.readRepairChance(getPropertyDouble(CFPropDefs.KW_READREPAIRCHANCE, CFMetaData.DEFAULT_READ_REPAIR_CHANCE))
.dcLocalReadRepairChance(getPropertyDouble(CFPropDefs.KW_DCLOCALREADREPAIRCHANCE, CFMetaData.DEFAULT_DCLOCAL_READ_REPAIR_CHANCE))
.replicateOnWrite(getPropertyBoolean(CFPropDefs.KW_REPLICATEONWRITE, CFMetaData.DEFAULT_REPLICATE_ON_WRITE))
.gcGraceSeconds(getPropertyInt(CFPropDefs.KW_GCGRACESECONDS, CFMetaData.DEFAULT_GC_GRACE_SECONDS))
.defaultValidator(cfProps.getValidator())
.minCompactionThreshold(minCompactionThreshold)

View File

@ -25,6 +25,7 @@ import org.github.jamm.MemoryMeter;
import org.apache.cassandra.cql3.*;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.IMutation;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
@ -174,7 +175,11 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
public ResultMessage executeInternal(QueryState queryState) throws RequestValidationException, RequestExecutionException
{
for (IMutation mutation : getMutations(Collections.<ByteBuffer>emptyList(), true, null, queryState.getTimestamp()))
mutation.apply();
{
// We don't use counters internally.
assert mutation instanceof Mutation;
((Mutation) mutation).apply();
}
return null;
}

View File

@ -34,7 +34,6 @@ public class CFPropDefs extends PropertyDefinitions
public static final String KW_GCGRACESECONDS = "gc_grace_seconds";
public static final String KW_MINCOMPACTIONTHRESHOLD = "min_threshold";
public static final String KW_MAXCOMPACTIONTHRESHOLD = "max_threshold";
public static final String KW_REPLICATEONWRITE = "replicate_on_write";
public static final String KW_CACHING = "caching";
public static final String KW_DEFAULT_TIME_TO_LIVE = "default_time_to_live";
public static final String KW_INDEX_INTERVAL = "index_interval";
@ -57,7 +56,6 @@ public class CFPropDefs extends PropertyDefinitions
keywords.add(KW_READREPAIRCHANCE);
keywords.add(KW_DCLOCALREADREPAIRCHANCE);
keywords.add(KW_GCGRACESECONDS);
keywords.add(KW_REPLICATEONWRITE);
keywords.add(KW_CACHING);
keywords.add(KW_DEFAULT_TIME_TO_LIVE);
keywords.add(KW_INDEX_INTERVAL);
@ -67,6 +65,8 @@ public class CFPropDefs extends PropertyDefinitions
keywords.add(KW_COMPACTION);
keywords.add(KW_COMPRESSION);
keywords.add(KW_MEMTABLE_FLUSH_PERIOD);
obsoleteKeywords.add("replicate_on_write");
}
private Class<? extends AbstractCompactionStrategy> compactionStrategyClass = null;
@ -146,7 +146,6 @@ public class CFPropDefs extends PropertyDefinitions
cfm.readRepairChance(getDouble(KW_READREPAIRCHANCE, cfm.getReadRepairChance()));
cfm.dcLocalReadRepairChance(getDouble(KW_DCLOCALREADREPAIRCHANCE, cfm.getDcLocalReadRepair()));
cfm.gcGraceSeconds(getInt(KW_GCGRACESECONDS, cfm.getGcGraceSeconds()));
cfm.replicateOnWrite(getBoolean(KW_REPLICATEONWRITE, cfm.getReplicateOnWrite()));
int minCompactionThreshold = toInt(KW_MINCOMPACTIONTHRESHOLD, getCompactionOptions().get(KW_MINCOMPACTIONTHRESHOLD), cfm.getMinCompactionThreshold());
int maxCompactionThreshold = toInt(KW_MAXCOMPACTIONTHRESHOLD, getCompactionOptions().get(KW_MAXCOMPACTIONTHRESHOLD), cfm.getMaxCompactionThreshold());
if (minCompactionThreshold <= 0 || maxCompactionThreshold <= 0)

View File

@ -479,7 +479,11 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
throw new UnsupportedOperationException();
for (IMutation mutation : getMutations(Collections.<ByteBuffer>emptyList(), true, null, queryState.getTimestamp(), false))
mutation.apply();
{
// We don't use counters internally.
assert mutation instanceof Mutation;
((Mutation) mutation).apply();
}
return null;
}

View File

@ -234,17 +234,13 @@ public class Cell implements OnDiskAtom
{
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
Cell cell = (Cell)o;
if (timestamp != cell.timestamp)
return false;
if (!name.equals(cell.name))
return false;
return value.equals(cell.value);
return timestamp == cell.timestamp && name.equals(cell.name) && value.equals(cell.value);
}
@Override
@ -256,11 +252,6 @@ public class Cell implements OnDiskAtom
return result;
}
public Cell localCopy(ColumnFamilyStore cfs)
{
return localCopy(cfs, HeapAllocator.instance);
}
public Cell localCopy(ColumnFamilyStore cfs, Allocator allocator)
{
return new Cell(name.copy(allocator), allocator.clone(value), timestamp);

View File

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db;
import com.google.common.base.Objects;
import org.apache.cassandra.cache.IMeasurableMemory;
import org.apache.cassandra.utils.ObjectSizes;
public class ClockAndCount implements IMeasurableMemory
{
public static ClockAndCount BLANK = ClockAndCount.create(0L, 0L);
public final long clock;
public final long count;
private ClockAndCount(long clock, long count)
{
this.clock = clock;
this.count = count;
}
public static ClockAndCount create(long clock, long count)
{
return new ClockAndCount(clock, count);
}
public long memorySize()
{
return ObjectSizes.getFieldSize(TypeSizes.NATIVE.sizeof(clock))
+ ObjectSizes.getFieldSize(TypeSizes.NATIVE.sizeof(count));
}
@Override
public boolean equals(Object o)
{
if (this == o)
return true;
if (!(o instanceof ClockAndCount))
return false;
ClockAndCount other = (ClockAndCount) o;
return clock == other.clock && count == other.count;
}
@Override
public int hashCode()
{
return Objects.hashCode(clock, count);
}
@Override
public String toString()
{
return String.format("ClockAndCount(%s,%s)", clock, count);
}
}

View File

@ -25,6 +25,7 @@ import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.regex.Pattern;
import javax.management.*;
@ -32,13 +33,12 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.collect.*;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.Striped;
import com.google.common.util.concurrent.Uninterruptibles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.cache.IRowCacheEntry;
import org.apache.cassandra.cache.RowCacheKey;
import org.apache.cassandra.cache.RowCacheSentinel;
import org.apache.cassandra.cache.*;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.config.*;
import org.apache.cassandra.config.CFMetaData.SpeculativeRetry;
@ -46,6 +46,7 @@ import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.compaction.*;
import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.composites.CellNameType;
import org.apache.cassandra.db.composites.Composite;
@ -105,6 +106,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public final ColumnFamilyMetrics metric;
public volatile long sampleLatencyNanos;
private final Striped<Lock> counterLocks = Striped.lazyWeakLock(DatabaseDescriptor.getConcurrentCounterWriters() * 128);
public void reload()
{
// metadata object has been mutated directly. make all the members jibe with new settings.
@ -332,9 +335,18 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
data.unreferenceSSTables();
indexManager.invalidate();
for (RowCacheKey key : CacheService.instance.rowCache.getKeySet())
if (key.cfId == metadata.cfId)
invalidateCachedRow(key);
invalidateCaches();
}
/**
* Obtain a lock for this CF's part of a counter mutation
* @param key the key for the CounterMutation
* @return the striped lock instance
*/
public Lock counterLockFor(ByteBuffer key)
{
assert metadata.isCounter();
return counterLocks.get(key);
}
/**
@ -562,13 +574,29 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
int cachedRowsRead = CacheService.instance.rowCache.loadSaved(this);
if (cachedRowsRead > 0)
logger.info("completed loading ({} ms; {} keys) row cache for {}.{}",
logger.info("Completed loading ({} ms; {} keys) row cache for {}.{}",
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start),
cachedRowsRead,
keyspace.getName(),
name);
}
public void initCounterCache()
{
if (!metadata.isCounter() || CacheService.instance.counterCache.getCapacity() == 0)
return;
long start = System.nanoTime();
int cachedShardsRead = CacheService.instance.counterCache.loadSaved(this);
if (cachedShardsRead > 0)
logger.info("Completed loading ({} ms; {} shards) counter cache for {}.{}",
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start),
cachedShardsRead,
keyspace.getName(),
name);
}
/**
* See #{@code StorageService.loadNewSSTables(String, String)} for more info
*
@ -1073,9 +1101,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return maxFile;
}
public void forceCleanup(CounterId.OneShotRenewer renewer) throws ExecutionException, InterruptedException
public void forceCleanup() throws ExecutionException, InterruptedException
{
CompactionManager.instance.performCleanup(ColumnFamilyStore.this, renewer);
CompactionManager.instance.performCleanup(ColumnFamilyStore.this);
}
public void scrub(boolean disableSnapshot) throws ExecutionException, InterruptedException
@ -1535,6 +1563,16 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
if (key.cfId == metadata.cfId && !Range.isInRanges(dk.token, ranges))
invalidateCachedRow(dk);
}
if (metadata.isCounter())
{
for (CounterCacheKey key : CacheService.instance.counterCache.getKeySet())
{
DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.partitionKey));
if (key.cfId == metadata.cfId && !Range.isInRanges(dk.token, ranges))
CacheService.instance.counterCache.remove(key);
}
}
}
public static abstract class AbstractScanIterator extends AbstractIterator<Row> implements CloseableIterator<Row>
@ -1886,6 +1924,18 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return cached == null || cached instanceof RowCacheSentinel ? null : (ColumnFamily) cached;
}
private void invalidateCaches()
{
for (RowCacheKey key : CacheService.instance.rowCache.getKeySet())
if (key.cfId == metadata.cfId)
invalidateCachedRow(key);
if (metadata.isCounter())
for (CounterCacheKey key : CacheService.instance.counterCache.getKeySet())
if (key.cfId == metadata.cfId)
CacheService.instance.counterCache.remove(key);
}
/**
* @return true if @param key is contained in the row cache
*/
@ -1908,6 +1958,20 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
invalidateCachedRow(new RowCacheKey(cfId, key));
}
public ClockAndCount getCachedCounter(ByteBuffer partitionKey, CellName cellName)
{
if (CacheService.instance.counterCache.getCapacity() == 0L) // counter cache disabled.
return null;
return CacheService.instance.counterCache.get(CounterCacheKey.create(metadata.cfId, partitionKey, cellName));
}
public void putCachedCounter(ByteBuffer partitionKey, CellName cellName, ClockAndCount clockAndCount)
{
if (CacheService.instance.counterCache.getCapacity() == 0L) // counter cache disabled.
return;
CacheService.instance.counterCache.put(CounterCacheKey.create(metadata.cfId, partitionKey, cellName), clockAndCount);
}
public void forceMajorCompaction() throws InterruptedException, ExecutionException
{
CompactionManager.instance.performMaximal(this);
@ -2017,12 +2081,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
SystemKeyspace.saveTruncationRecord(ColumnFamilyStore.this, truncatedAt, replayAfter);
logger.debug("cleaning out row cache");
for (RowCacheKey key : CacheService.instance.rowCache.getKeySet())
{
if (key.cfId == metadata.cfId)
invalidateCachedRow(key);
}
invalidateCaches();
}
};

View File

@ -341,18 +341,11 @@ public enum ConsistencyLevel
public void validateCounterForWrite(CFMetaData metadata) throws InvalidRequestException
{
if (this == ConsistencyLevel.ANY)
{
throw new InvalidRequestException("Consistency level ANY is not yet supported for counter columnfamily " + metadata.cfName);
}
else if (!metadata.getReplicateOnWrite() && !(this == ConsistencyLevel.ONE || this == ConsistencyLevel.LOCAL_ONE))
{
throw new InvalidRequestException("cannot achieve CL > CL.ONE without replicate_on_write on columnfamily " + metadata.cfName);
}
else if (isSerialConsistency())
{
if (isSerialConsistency())
throw new InvalidRequestException("Counter operations are inherently non-serializable");
}
}
private void requireNetworkTopologyStrategy(String keyspaceName) throws InvalidRequestException
{

View File

@ -25,7 +25,6 @@ import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.db.composites.CellNameType;
import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.db.context.IContext.ContextRelationship;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.utils.Allocator;
@ -40,16 +39,6 @@ public class CounterCell extends Cell
private final long timestampOfLastDelete;
public CounterCell(CellName name, long value, long timestamp)
{
this(name, contextManager.createLocal(value, HeapAllocator.instance), timestamp);
}
public CounterCell(CellName name, long value, long timestamp, long timestampOfLastDelete)
{
this(name, contextManager.createLocal(value, HeapAllocator.instance), timestamp, timestampOfLastDelete);
}
public CounterCell(CellName name, ByteBuffer value, long timestamp)
{
this(name, value, timestamp, Long.MIN_VALUE);
@ -68,6 +57,12 @@ public class CounterCell extends Cell
return new CounterCell(name, value, timestamp, timestampOfLastDelete);
}
// For use by tests of compatibility with pre-2.1 counter only.
public static CounterCell createLocal(CellName name, long value, long timestamp, long timestampOfLastDelete)
{
return new CounterCell(name, contextManager.createLocal(value, HeapAllocator.instance), timestamp, timestampOfLastDelete);
}
@Override
public Cell withUpdatedName(CellName newName)
{
@ -110,12 +105,12 @@ public class CounterCell extends Cell
// merging a CounterCell with a tombstone never return a tombstone
// unless that tombstone timestamp is greater that the CounterCell
// one.
assert !(cell instanceof DeletedCell) : "Wrong class type: " + cell.getClass();
assert cell instanceof CounterCell : "Wrong class type: " + cell.getClass();
if (timestampOfLastDelete() < ((CounterCell) cell).timestampOfLastDelete())
return cell;
ContextRelationship rel = contextManager.diff(cell.value(), value());
if (ContextRelationship.GREATER_THAN == rel || ContextRelationship.DISJOINT == rel)
CounterContext.Relationship rel = contextManager.diff(cell.value(), value());
if (rel == CounterContext.Relationship.GREATER_THAN || rel == CounterContext.Relationship.DISJOINT)
return cell;
return null;
}
@ -194,12 +189,6 @@ public class CounterCell extends Cell
return 31 * super.hashCode() + (int)(timestampOfLastDelete ^ (timestampOfLastDelete >>> 32));
}
@Override
public Cell localCopy(ColumnFamilyStore cfs)
{
return localCopy(cfs, HeapAllocator.instance);
}
@Override
public Cell localCopy(ColumnFamilyStore cfs, Allocator allocator)
{
@ -231,14 +220,6 @@ public class CounterCell extends Cell
contextManager.validateContext(value());
}
/**
* Check if a given counterId is found in this CounterCell context.
*/
public boolean hasCounterId(CounterId id)
{
return contextManager.hasCounterId(value(), id);
}
public Cell markLocalToBeCleared()
{
ByteBuffer marked = contextManager.markLocalToBeCleared(value);

View File

@ -21,22 +21,21 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import com.google.common.collect.Iterables;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.db.filter.NamesQueryFilter;
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.HeapAllocator;
import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.*;
public class CounterMutation implements IMutation
{
@ -76,67 +75,175 @@ public class CounterMutation implements IMutation
return consistency;
}
public Mutation makeReplicationMutation()
{
List<ReadCommand> readCommands = new LinkedList<>();
long timestamp = System.currentTimeMillis();
for (ColumnFamily columnFamily : mutation.getColumnFamilies())
{
if (!columnFamily.metadata().getReplicateOnWrite())
continue;
addReadCommandFromColumnFamily(mutation.getKeyspaceName(), mutation.key(), columnFamily, timestamp, readCommands);
}
// create a replication Mutation
Mutation replicationMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
for (ReadCommand readCommand : readCommands)
{
Keyspace keyspace = Keyspace.open(readCommand.ksName);
Row row = readCommand.getRow(keyspace);
if (row == null || row.cf == null)
continue;
ColumnFamily cf = row.cf;
replicationMutation.add(cf);
}
return replicationMutation;
}
private void addReadCommandFromColumnFamily(String keyspaceName, ByteBuffer key, ColumnFamily columnFamily, long timestamp, List<ReadCommand> commands)
{
SortedSet<CellName> s = new TreeSet<>(columnFamily.metadata().comparator);
Iterables.addAll(s, columnFamily.getColumnNames());
commands.add(new SliceByNamesReadCommand(keyspaceName, key, columnFamily.metadata().cfName, timestamp, new NamesQueryFilter(s)));
}
public MessageOut<CounterMutation> makeMutationMessage()
{
return new MessageOut<>(MessagingService.Verb.COUNTER_MUTATION, this, serializer);
}
public boolean shouldReplicateOnWrite()
/**
* Applies the counter mutation, returns the result Mutation (for replication to other nodes).
*
* 1. Grabs the striped CF-level lock(s)
* 2. Gets the current values of the counters-to-be-modified from the counter cache
* 3. Reads the rest of the current values (cache misses) from the CF
* 4. Writes the updated counter values
* 5. Updates the counter cache
* 6. Releases the lock(s)
*
* See CASSANDRA-4775 and CASSANDRA-6504 for further details.
*
* @return the applied resulting Mutation
*/
public Mutation apply() throws WriteTimeoutException
{
for (ColumnFamily cf : mutation.getColumnFamilies())
if (cf.metadata().getReplicateOnWrite())
return true;
return false;
Mutation result = new Mutation(getKeyspaceName(), ByteBufferUtil.clone(key()));
Keyspace keyspace = Keyspace.open(getKeyspaceName());
ArrayList<UUID> cfIds = new ArrayList<>(getColumnFamilyIds());
Collections.sort(cfIds); // will lock in the sorted order, to avoid a potential deadlock.
ArrayList<Lock> locks = new ArrayList<>(cfIds.size());
try
{
Tracing.trace("Acquiring {} counter locks", cfIds.size());
for (UUID cfId : cfIds)
{
Lock lock = keyspace.getColumnFamilyStore(cfId).counterLockFor(key());
if (!lock.tryLock(getTimeout(), TimeUnit.MILLISECONDS))
throw new WriteTimeoutException(WriteType.COUNTER, consistency(), 0, consistency().blockFor(keyspace));
locks.add(lock);
}
public void apply()
{
// transform all CounterUpdateCell to CounterCell: accomplished by localCopy
Mutation m = new Mutation(mutation.getKeyspaceName(), ByteBufferUtil.clone(mutation.key()));
Keyspace keyspace = Keyspace.open(m.getKeyspaceName());
for (ColumnFamily cf : getColumnFamilies())
result.add(processModifications(cf));
for (ColumnFamily cf_ : mutation.getColumnFamilies())
result.apply();
updateCounterCache(result, keyspace);
return result;
}
catch (InterruptedException e)
{
throw new WriteTimeoutException(WriteType.COUNTER, consistency(), 0, consistency().blockFor(keyspace));
}
finally
{
for (Lock lock : locks)
lock.unlock();
}
}
// Replaces all the CounterUpdateCell-s with updated regular CounterCell-s
private ColumnFamily processModifications(ColumnFamily changesCF)
{
Allocator allocator = HeapAllocator.instance;
ColumnFamilyStore cfs = Keyspace.open(getKeyspaceName()).getColumnFamilyStore(changesCF.id());
ColumnFamily resultCF = changesCF.cloneMeShallow();
List<CounterUpdateCell> counterUpdateCells = new ArrayList<>(changesCF.getColumnCount());
for (Cell cell : changesCF)
{
if (cell instanceof CounterUpdateCell)
counterUpdateCells.add((CounterUpdateCell)cell);
else
resultCF.addColumn(cell.localCopy(cfs, allocator));
}
if (counterUpdateCells.isEmpty())
return resultCF; // only DELETEs
ClockAndCount[] currentValues = getCurrentValues(counterUpdateCells, cfs);
for (int i = 0; i < counterUpdateCells.size(); i++)
{
ClockAndCount currentValue = currentValues[i];
CounterUpdateCell update = counterUpdateCells.get(i);
long clock = currentValue.clock + 1L;
long count = currentValue.count + update.delta();
resultCF.addColumn(new CounterCell(update.name().copy(allocator),
CounterContext.instance().createGlobal(CounterId.getLocalId(), clock, count, allocator),
update.timestamp()));
}
return resultCF;
}
// Attempt to load the current values(s) from cache. If that fails, read the rest from the cfs.
private ClockAndCount[] getCurrentValues(List<CounterUpdateCell> counterUpdateCells, ColumnFamilyStore cfs)
{
ClockAndCount[] currentValues = new ClockAndCount[counterUpdateCells.size()];
int remaining = counterUpdateCells.size();
if (CacheService.instance.counterCache.getCapacity() != 0)
{
Tracing.trace("Fetching {} counter values from cache", counterUpdateCells.size());
remaining = getCurrentValuesFromCache(counterUpdateCells, cfs, currentValues);
if (remaining == 0)
return currentValues;
}
Tracing.trace("Reading {} counter values from the CF", remaining);
getCurrentValuesFromCFS(counterUpdateCells, cfs, currentValues);
return currentValues;
}
// Returns the count of cache misses.
private int getCurrentValuesFromCache(List<CounterUpdateCell> counterUpdateCells,
ColumnFamilyStore cfs,
ClockAndCount[] currentValues)
{
int cacheMisses = 0;
for (int i = 0; i < counterUpdateCells.size(); i++)
{
ClockAndCount cached = cfs.getCachedCounter(key(), counterUpdateCells.get(i).name());
if (cached != null)
currentValues[i] = cached;
else
cacheMisses++;
}
return cacheMisses;
}
// Reads the missing current values from the CFS.
private void getCurrentValuesFromCFS(List<CounterUpdateCell> counterUpdateCells,
ColumnFamilyStore cfs,
ClockAndCount[] currentValues)
{
SortedSet<CellName> names = new TreeSet<>(cfs.metadata.comparator);
for (int i = 0; i < currentValues.length; i++)
if (currentValues[i] == null)
names.add(counterUpdateCells.get(i).name);
ReadCommand cmd = new SliceByNamesReadCommand(getKeyspaceName(), key(), cfs.metadata.cfName, Long.MIN_VALUE, new NamesQueryFilter(names));
Row row = cmd.getRow(cfs.keyspace);
ColumnFamily cf = row == null ? null : row.cf;
for (int i = 0; i < currentValues.length; i++)
{
if (currentValues[i] != null)
continue;
Cell cell = cf == null ? null : cf.getColumn(counterUpdateCells.get(i).name());
if (cell == null || cell.isMarkedForDelete(Long.MIN_VALUE)) // absent or a tombstone.
currentValues[i] = ClockAndCount.BLANK;
else
currentValues[i] = CounterContext.instance().getLocalClockAndCount(cell.value());
}
}
private void updateCounterCache(Mutation applied, Keyspace keyspace)
{
if (CacheService.instance.counterCache.getCapacity() == 0)
return;
for (ColumnFamily cf : applied.getColumnFamilies())
{
ColumnFamily cf = cf_.cloneMeShallow();
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cf.id());
for (Cell cell : cf_)
cf.addColumn(cell.localCopy(cfs), HeapAllocator.instance);
m.add(cf);
for (Cell cell : cf)
if (cell instanceof CounterCell)
cfs.putCachedCounter(key(), cell.name(), CounterContext.instance().getLocalClockAndCount(cell.value()));
}
m.apply();
}
public void addAll(IMutation m)
@ -147,6 +254,11 @@ public class CounterMutation implements IMutation
mutation.addAll(cm.mutation);
}
public long getTimeout()
{
return DatabaseDescriptor.getCounterWriteRpcTimeout();
}
@Override
public String toString()
{

View File

@ -21,10 +21,8 @@ import java.nio.ByteBuffer;
import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.db.composites.CellNameType;
import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.utils.Allocator;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.HeapAllocator;
/**
* A counter update while it hasn't been applied yet by the leader replica.
@ -63,13 +61,12 @@ public class CounterUpdateCell extends Cell
// The only time this could happen is if a batchAdd ships two
// increment for the same cell. Hence we simply sums the delta.
assert (cell instanceof CounterUpdateCell) || (cell instanceof DeletedCell) : "Wrong class type.";
// tombstones take precedence
if (cell.isMarkedForDelete(Long.MIN_VALUE)) // can't be an expired cell, so the current time is irrelevant
return timestamp() > cell.timestamp() ? this : cell;
// neither is tombstoned
assert cell instanceof CounterUpdateCell : "Wrong class type.";
CounterUpdateCell c = (CounterUpdateCell) cell;
return new CounterUpdateCell(name(), delta() + c.delta(), Math.max(timestamp(), c.timestamp()));
}
@ -80,22 +77,10 @@ public class CounterUpdateCell extends Cell
return ColumnSerializer.COUNTER_UPDATE_MASK;
}
@Override
public CounterCell localCopy(ColumnFamilyStore cfs)
{
return new CounterCell(name.copy(HeapAllocator.instance),
CounterContext.instance().createLocal(delta(), HeapAllocator.instance),
timestamp(),
Long.MIN_VALUE);
}
@Override
public Cell localCopy(ColumnFamilyStore cfs, Allocator allocator)
{
return new CounterCell(name.copy(allocator),
CounterContext.instance().createLocal(delta(), allocator),
timestamp(),
Long.MIN_VALUE);
throw new UnsupportedOperationException();
}
@Override

View File

@ -27,7 +27,6 @@ import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.utils.Allocator;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.HeapAllocator;
public class DeletedCell extends Cell
{
@ -97,12 +96,6 @@ public class DeletedCell extends Cell
return cell.reconcile(this, allocator);
}
@Override
public Cell localCopy(ColumnFamilyStore cfs)
{
return new DeletedCell(name.copy(HeapAllocator.instance), ByteBufferUtil.clone(value), timestamp);
}
@Override
public Cell localCopy(ColumnFamilyStore cfs, Allocator allocator)
{

View File

@ -27,7 +27,6 @@ import org.apache.cassandra.db.composites.CellNameType;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.utils.Allocator;
import org.apache.cassandra.utils.HeapAllocator;
/**
* Alternative to Cell that have an expiring time.
@ -132,12 +131,6 @@ public class ExpiringCell extends Cell
return localExpirationTime;
}
@Override
public Cell localCopy(ColumnFamilyStore cfs)
{
return localCopy(cfs, HeapAllocator.instance);
}
@Override
public Cell localCopy(ColumnFamilyStore cfs, Allocator allocator)
{

View File

@ -26,7 +26,7 @@ public interface IMutation
public String getKeyspaceName();
public Collection<UUID> getColumnFamilyIds();
public ByteBuffer key();
public void apply();
public long getTimeout();
public String toString(boolean shallow);
public void addAll(IMutation m);
public Collection<ColumnFamily> getColumnFamilies();

View File

@ -26,6 +26,7 @@ import java.util.*;
import org.apache.commons.lang3.StringUtils;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.db.composites.Composite;
@ -219,6 +220,11 @@ public class Mutation implements IMutation
return new MessageOut<>(verb, this, serializer);
}
public long getTimeout()
{
return DatabaseDescriptor.getWriteRpcTimeout();
}
public String toString()
{
return toString(false);

View File

@ -693,27 +693,6 @@ public class SystemKeyspace
forceBlockingFlush(COUNTER_ID_CF);
}
public static List<CounterId.CounterIdRecord> getOldLocalCounterIds()
{
List<CounterId.CounterIdRecord> l = new ArrayList<CounterId.CounterIdRecord>();
Keyspace keyspace = Keyspace.open(Keyspace.SYSTEM_KS);
QueryFilter filter = QueryFilter.getIdentityFilter(decorate(ALL_LOCAL_NODE_ID_KEY), COUNTER_ID_CF, System.currentTimeMillis());
ColumnFamily cf = keyspace.getColumnFamilyStore(COUNTER_ID_CF).getColumnFamily(filter);
CounterId previous = null;
for (Cell c : cf)
{
if (previous != null)
l.add(new CounterId.CounterIdRecord(previous, c.timestamp()));
// this will ignore the last column on purpose since it is the
// current local node id
previous = CounterId.wrap(c.name().toByteBuffer());
}
return l;
}
/**
* @param cfName The name of the ColumnFamily responsible for part of the schema (keyspace, ColumnFamily, columns)
* @return CFS responsible to hold low-level serialized schema

View File

@ -261,7 +261,7 @@ public class CompactionManager implements CompactionManagerMBean
});
}
public void performCleanup(ColumnFamilyStore cfStore, final CounterId.OneShotRenewer renewer) throws InterruptedException, ExecutionException
public void performCleanup(ColumnFamilyStore cfStore) throws InterruptedException, ExecutionException
{
performAllSSTableOperation(cfStore, new AllSSTablesOperation()
{
@ -272,7 +272,7 @@ public class CompactionManager implements CompactionManagerMBean
List<SSTableReader> sortedSSTables = Lists.newArrayList(sstables);
Collections.sort(sortedSSTables, new SSTableReader.SizeComparator());
doCleanupCompaction(store, sortedSSTables, renewer);
doCleanupCompaction(store, sortedSSTables);
}
});
}
@ -508,7 +508,7 @@ public class CompactionManager implements CompactionManagerMBean
*
* @throws IOException
*/
private void doCleanupCompaction(final ColumnFamilyStore cfs, Collection<SSTableReader> sstables, CounterId.OneShotRenewer renewer) throws IOException
private void doCleanupCompaction(final ColumnFamilyStore cfs, Collection<SSTableReader> sstables) throws IOException
{
assert !cfs.isIndex();
Keyspace keyspace = cfs.keyspace;
@ -520,7 +520,7 @@ public class CompactionManager implements CompactionManagerMBean
}
boolean hasIndexes = cfs.indexManager.hasIndexes();
CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfs, ranges, renewer);
CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfs, ranges);
for (SSTableReader sstable : sstables)
{
@ -614,12 +614,11 @@ public class CompactionManager implements CompactionManagerMBean
private static abstract class CleanupStrategy
{
public static CleanupStrategy get(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, CounterId.OneShotRenewer renewer)
public static CleanupStrategy get(ColumnFamilyStore cfs, Collection<Range<Token>> ranges)
{
if (cfs.indexManager.hasIndexes() || cfs.metadata.isCounter())
return new Full(cfs, ranges, renewer);
return new Bounded(cfs, ranges);
return cfs.indexManager.hasIndexes()
? new Full(cfs, ranges)
: new Bounded(cfs, ranges);
}
public abstract ICompactionScanner getScanner(SSTableReader sstable, RateLimiter limiter);
@ -660,14 +659,12 @@ public class CompactionManager implements CompactionManagerMBean
private final Collection<Range<Token>> ranges;
private final ColumnFamilyStore cfs;
private List<Cell> indexedColumnsInRow;
private final CounterId.OneShotRenewer renewer;
public Full(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, CounterId.OneShotRenewer renewer)
public Full(ColumnFamilyStore cfs, Collection<Range<Token>> ranges)
{
this.cfs = cfs;
this.ranges = ranges;
this.indexedColumnsInRow = null;
this.renewer = renewer;
}
@Override
@ -690,8 +687,6 @@ public class CompactionManager implements CompactionManagerMBean
while (row.hasNext())
{
OnDiskAtom column = row.next();
if (column instanceof CounterCell)
renewer.maybeRenew((CounterCell) column);
if (column instanceof Cell && cfs.indexManager.indexes((Cell) column))
{

View File

@ -26,6 +26,7 @@ import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ClockAndCount;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.serializers.MarshalException;
@ -72,7 +73,7 @@ import org.apache.cassandra.utils.*;
* rules work this way, see CASSANDRA-1938 - specifically the 1938_discussion
* attachment (doesn't cover global shards, see CASSANDRA-4775 for that).
*/
public class CounterContext implements IContext
public class CounterContext
{
private static final int HEADER_SIZE_LENGTH = TypeSizes.NATIVE.sizeof(Short.MAX_VALUE);
private static final int HEADER_ELT_LENGTH = TypeSizes.NATIVE.sizeof(Short.MAX_VALUE);
@ -82,6 +83,11 @@ public class CounterContext implements IContext
private static final Logger logger = LoggerFactory.getLogger(CounterContext.class);
public static enum Relationship
{
EQUAL, GREATER_THAN, LESS_THAN, DISJOINT
}
// lazy-load singleton
private static class LazyHolder
{
@ -93,8 +99,19 @@ public class CounterContext implements IContext
return LazyHolder.counterContext;
}
/**
* Creates a counter context with a single global, 2.1+ shard (a result of increment).
*/
public ByteBuffer createGlobal(CounterId id, long clock, long count, Allocator allocator)
{
ContextState state = ContextState.allocate(1, 0, 0, allocator);
state.writeGlobal(id, clock, count);
return state.context;
}
/**
* Creates a counter context with a single local shard.
* For use by tests of compatibility with pre-2.1 counters only.
*/
public ByteBuffer createLocal(long count, Allocator allocator)
{
@ -105,6 +122,7 @@ public class CounterContext implements IContext
/**
* Creates a counter context with a single remote shard.
* For use by tests of compatibility with pre-2.1 counters only.
*/
public ByteBuffer createRemote(CounterId id, long clock, long count, Allocator allocator)
{
@ -135,11 +153,11 @@ public class CounterContext implements IContext
*
* @param left counter context.
* @param right counter context.
* @return the ContextRelationship between the contexts.
* @return the Relationship between the contexts.
*/
public ContextRelationship diff(ByteBuffer left, ByteBuffer right)
public Relationship diff(ByteBuffer left, ByteBuffer right)
{
ContextRelationship relationship = ContextRelationship.EQUAL;
Relationship relationship = Relationship.EQUAL;
ContextState leftState = ContextState.wrap(left);
ContextState rightState = ContextState.wrap(right);
@ -165,45 +183,25 @@ public class CounterContext implements IContext
{
// Inconsistent shard (see the corresponding code in merge()). We return DISJOINT in this
// case so that it will be treated as a difference, allowing read-repair to work.
return ContextRelationship.DISJOINT;
}
else
{
continue;
return Relationship.DISJOINT;
}
}
else if ((leftClock >= 0 && rightClock > 0 && leftClock > rightClock)
|| (leftClock < 0 && (rightClock > 0 || leftClock < rightClock)))
{
if (relationship == ContextRelationship.EQUAL)
{
relationship = ContextRelationship.GREATER_THAN;
}
else if (relationship == ContextRelationship.GREATER_THAN)
{
continue;
if (relationship == Relationship.EQUAL)
relationship = Relationship.GREATER_THAN;
else if (relationship == Relationship.LESS_THAN)
return Relationship.DISJOINT;
// relationship == Relationship.GREATER_THAN
}
else
{
// relationship == ContextRelationship.LESS_THAN
return ContextRelationship.DISJOINT;
}
}
else
{
if (relationship == ContextRelationship.EQUAL)
{
relationship = ContextRelationship.LESS_THAN;
}
else if (relationship == ContextRelationship.GREATER_THAN)
{
return ContextRelationship.DISJOINT;
}
else
{
// relationship == ContextRelationship.LESS_THAN
continue;
}
if (relationship == Relationship.EQUAL)
relationship = Relationship.LESS_THAN;
else if (relationship == Relationship.GREATER_THAN)
return Relationship.DISJOINT;
// relationship == Relationship.LESS_THAN
}
}
else if (compareId > 0)
@ -211,63 +209,40 @@ public class CounterContext implements IContext
// only advance the right context
rightState.moveToNext();
if (relationship == ContextRelationship.EQUAL)
{
relationship = ContextRelationship.LESS_THAN;
}
else if (relationship == ContextRelationship.GREATER_THAN)
{
return ContextRelationship.DISJOINT;
}
else
{
// relationship == ContextRelationship.LESS_THAN
continue;
}
if (relationship == Relationship.EQUAL)
relationship = Relationship.LESS_THAN;
else if (relationship == Relationship.GREATER_THAN)
return Relationship.DISJOINT;
// relationship == Relationship.LESS_THAN
}
else // compareId < 0
{
// only advance the left context
leftState.moveToNext();
if (relationship == ContextRelationship.EQUAL)
{
relationship = ContextRelationship.GREATER_THAN;
}
else if (relationship == ContextRelationship.GREATER_THAN)
{
continue;
}
else
// relationship == ContextRelationship.LESS_THAN
{
return ContextRelationship.DISJOINT;
}
if (relationship == Relationship.EQUAL)
relationship = Relationship.GREATER_THAN;
else if (relationship == Relationship.LESS_THAN)
return Relationship.DISJOINT;
// relationship == Relationship.GREATER_THAN
}
}
// check final lengths
if (leftState.hasRemaining())
{
if (relationship == ContextRelationship.EQUAL)
{
return ContextRelationship.GREATER_THAN;
if (relationship == Relationship.EQUAL)
return Relationship.GREATER_THAN;
else if (relationship == Relationship.LESS_THAN)
return Relationship.DISJOINT;
}
else if (relationship == ContextRelationship.LESS_THAN)
if (rightState.hasRemaining())
{
return ContextRelationship.DISJOINT;
}
}
else if (rightState.hasRemaining())
{
if (relationship == ContextRelationship.EQUAL)
{
return ContextRelationship.LESS_THAN;
}
else if (relationship == ContextRelationship.GREATER_THAN)
{
return ContextRelationship.DISJOINT;
}
if (relationship == Relationship.EQUAL)
return Relationship.LESS_THAN;
else if (relationship == Relationship.GREATER_THAN)
return Relationship.DISJOINT;
}
return relationship;
@ -527,14 +502,9 @@ public class CounterContext implements IContext
public long total(ByteBuffer context)
{
long total = 0L;
// we could use a ContextState but it is easy enough that we avoid the object creation
for (int offset = context.position() + headerLength(context); offset < context.limit(); offset += STEP_LENGTH)
{
long count = context.getLong(offset + CounterId.LENGTH + CLOCK_LENGTH);
total += count;
}
total += context.getLong(offset + CounterId.LENGTH + CLOCK_LENGTH);
return total;
}
@ -642,24 +612,54 @@ public class CounterContext implements IContext
}
/**
* Checks whether the provided context has a count for the provided
* CounterId.
*
* TODO: since the context is sorted, we could implement a binary search.
* This is however not called in any critical path and contexts will be
* fairly small so it doesn't matter much.
* Returns the clock and the count associated with the local counter id, or (0, 0) if no such shard is present.
*/
public boolean hasCounterId(ByteBuffer context, CounterId id)
public ClockAndCount getLocalClockAndCount(ByteBuffer context)
{
// we could use a ContextState but it is easy enough that we avoid the object creation
for (int offset = context.position() + headerLength(context); offset < context.limit(); offset += STEP_LENGTH)
{
if (id.equals(CounterId.wrap(context, offset)))
{
return true;
return getClockAndCountOf(context, CounterId.getLocalId());
}
/**
* Returns the clock and the count associated with the given counter id, or (0, 0) if no such shard is present.
*/
@VisibleForTesting
public ClockAndCount getClockAndCountOf(ByteBuffer context, CounterId id)
{
int position = findPositionOf(context, id);
if (position == -1)
return ClockAndCount.BLANK;
long clock = context.getLong(position + CounterId.LENGTH);
long count = context.getLong(position + CounterId.LENGTH + CLOCK_LENGTH);
return ClockAndCount.create(clock, count);
}
return false;
/**
* Finds the position of a shard with the given id within the context (via binary search).
*/
@VisibleForTesting
public int findPositionOf(ByteBuffer context, CounterId id)
{
int headerLength = headerLength(context);
int offset = context.position() + headerLength;
int left = 0;
int right = (context.remaining() - headerLength) / STEP_LENGTH - 1;
while (right >= left)
{
int middle = (left + right) / 2;
int cmp = compareId(context, offset + middle * STEP_LENGTH, id.bytes(), id.bytes().position());
if (cmp == -1)
left = middle + 1;
else if (cmp == 0)
return offset + middle * STEP_LENGTH;
else
right = middle - 1;
}
return -1; // position not found
}
/**
@ -754,20 +754,7 @@ public class CounterContext implements IContext
public void copyTo(ContextState other)
{
ByteBufferUtil.arrayCopy(context,
context.position() + bodyOffset,
other.context,
other.context.position() + other.bodyOffset,
STEP_LENGTH);
if (currentIsGlobal)
other.context.putShort(other.context.position() + other.headerOffset, (short) (other.getElementIndex() + Short.MIN_VALUE));
else if (currentIsLocal)
context.putShort(other.context.position() + other.headerOffset, (short) other.getElementIndex());
other.currentIsGlobal = currentIsGlobal;
other.currentIsLocal = currentIsLocal;
other.moveToNext();
other.writeElement(getCounterId(), getClock(), getCount(), currentIsGlobal, currentIsLocal);
}
public int compareIdTo(ContextState other)
@ -802,17 +789,18 @@ public class CounterContext implements IContext
return context.getLong(context.position() + bodyOffset + CounterId.LENGTH + CLOCK_LENGTH);
}
// In 2.0 only used by the unit tests.
public void writeGlobal(CounterId id, long clock, long count)
{
writeElement(id, clock, count, true, false);
}
// In 2.1 only used by the unit tests.
public void writeLocal(CounterId id, long clock, long count)
{
writeElement(id, clock, count, false, true);
}
// In 2.1 only used by the unit tests.
public void writeRemote(CounterId id, long clock, long count)
{
writeElement(id, clock, count, false, false);

View File

@ -1,75 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.context;
import java.nio.ByteBuffer;
import org.apache.cassandra.utils.Allocator;
/**
* An opaque commutative context.
*
* Maintains a ByteBuffer context that represents a partitioned commutative value.
*/
public interface IContext
{
public static enum ContextRelationship
{
EQUAL,
GREATER_THAN,
LESS_THAN,
DISJOINT
};
/**
* Determine the relationship between two contexts.
*
* EQUAL: Equal set of nodes and every count is equal.
* GREATER_THAN: Superset of nodes and every count is equal or greater than its corollary.
* LESS_THAN: Subset of nodes and every count is equal or less than its corollary.
* DISJOINT: Node sets are not equal and/or counts are not all greater or less than.
*
* @param left
* context.
* @param right
* context.
* @return the ContextRelationship between the contexts.
*/
public ContextRelationship diff(ByteBuffer left, ByteBuffer right);
/**
* Return a context w/ an aggregated count for each node id.
*
* @param left
* context.
* @param right
* context.
* @param allocator
* an allocator to allocate the new context from.
*/
public ByteBuffer merge(ByteBuffer left, ByteBuffer right, Allocator allocator);
/**
* Human-readable String from context.
*
* @param context
* context.
* @return a human-readable String of the context.
*/
public String toString(ByteBuffer context);
}

View File

@ -156,7 +156,7 @@ public abstract class AbstractSSTableSimpleWriter
public void addCounterColumn(ByteBuffer name, long value)
{
addColumn(new CounterCell(metadata.comparator.cellFromByteBuffer(name),
CounterContext.instance().createRemote(counterid, 1L, value, HeapAllocator.instance),
CounterContext.instance().createGlobal(counterid, 1L, value, HeapAllocator.instance),
System.currentTimeMillis()));
}

View File

@ -130,9 +130,9 @@ public final class MessagingService implements MessagingServiceMBean
public static final EnumMap<MessagingService.Verb, Stage> verbStages = new EnumMap<MessagingService.Verb, Stage>(MessagingService.Verb.class)
{{
put(Verb.MUTATION, Stage.MUTATION);
put(Verb.COUNTER_MUTATION, Stage.COUNTER_MUTATION);
put(Verb.READ_REPAIR, Stage.MUTATION);
put(Verb.TRUNCATE, Stage.MUTATION);
put(Verb.COUNTER_MUTATION, Stage.MUTATION);
put(Verb.PAXOS_PREPARE, Stage.MUTATION);
put(Verb.PAXOS_PROPOSE, Stage.MUTATION);
put(Verb.PAXOS_COMMIT, Stage.MUTATION);

View File

@ -44,7 +44,6 @@ public abstract class AbstractWriteResponseHandler implements IAsyncCallback
private final WriteType writeType;
/**
* @param pendingEndpoints
* @param callback A callback to be called when the write is successful.
*/
protected AbstractWriteResponseHandler(Keyspace keyspace,
@ -65,7 +64,11 @@ public abstract class AbstractWriteResponseHandler implements IAsyncCallback
public void get() throws WriteTimeoutException
{
long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getWriteRpcTimeout()) - (System.nanoTime() - start);
long requestTimeout = writeType == WriteType.COUNTER
? DatabaseDescriptor.getCounterWriteRpcTimeout()
: DatabaseDescriptor.getWriteRpcTimeout();
long timeout = TimeUnit.MILLISECONDS.toNanos(requestTimeout) - (System.nanoTime() - start);
boolean success;
try

View File

@ -28,9 +28,11 @@ import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.locks.Lock;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.util.concurrent.Futures;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -42,14 +44,14 @@ import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.CounterId;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
@ -62,7 +64,8 @@ public class CacheService implements CacheServiceMBean
public static enum CacheType
{
KEY_CACHE("KeyCache"),
ROW_CACHE("RowCache");
ROW_CACHE("RowCache"),
COUNTER_CACHE("CounterCache");
private final String name;
@ -81,6 +84,7 @@ public class CacheService implements CacheServiceMBean
public final AutoSavingCache<KeyCacheKey, RowIndexEntry> keyCache;
public final AutoSavingCache<RowCacheKey, IRowCacheEntry> rowCache;
public final AutoSavingCache<CounterCacheKey, ClockAndCount> counterCache;
private CacheService()
{
@ -97,6 +101,7 @@ public class CacheService implements CacheServiceMBean
keyCache = initKeyCache();
rowCache = initRowCache();
counterCache = initCounterCache();
}
/**
@ -112,7 +117,7 @@ public class CacheService implements CacheServiceMBean
// where 48 = 40 bytes (average size of the key) + 8 bytes (size of value)
ICache<KeyCacheKey, RowIndexEntry> kc;
kc = ConcurrentLinkedHashCache.create(keyCacheInMemoryCapacity);
AutoSavingCache<KeyCacheKey, RowIndexEntry> keyCache = new AutoSavingCache<KeyCacheKey, RowIndexEntry>(kc, CacheType.KEY_CACHE, new KeyCacheSerializer());
AutoSavingCache<KeyCacheKey, RowIndexEntry> keyCache = new AutoSavingCache<>(kc, CacheType.KEY_CACHE, new KeyCacheSerializer());
int keyCacheKeysToSave = DatabaseDescriptor.getKeyCacheKeysToSave();
@ -132,7 +137,7 @@ public class CacheService implements CacheServiceMBean
// cache object
ICache<RowCacheKey, IRowCacheEntry> rc = new SerializingCacheProvider().create(rowCacheInMemoryCapacity);
AutoSavingCache<RowCacheKey, IRowCacheEntry> rowCache = new AutoSavingCache<RowCacheKey, IRowCacheEntry>(rc, CacheType.ROW_CACHE, new RowCacheSerializer());
AutoSavingCache<RowCacheKey, IRowCacheEntry> rowCache = new AutoSavingCache<>(rc, CacheType.ROW_CACHE, new RowCacheSerializer());
int rowCacheKeysToSave = DatabaseDescriptor.getRowCacheKeysToSave();
@ -141,6 +146,28 @@ public class CacheService implements CacheServiceMBean
return rowCache;
}
private AutoSavingCache<CounterCacheKey, ClockAndCount> initCounterCache()
{
logger.info("Initializing counter cache with capacity of {} MBs", DatabaseDescriptor.getCounterCacheSizeInMB());
long capacity = DatabaseDescriptor.getCounterCacheSizeInMB() * 1024 * 1024;
AutoSavingCache<CounterCacheKey, ClockAndCount> cache =
new AutoSavingCache<>(ConcurrentLinkedHashCache.<CounterCacheKey, ClockAndCount>create(capacity),
CacheType.COUNTER_CACHE,
new CounterCacheSerializer());
int keysToSave = DatabaseDescriptor.getCounterCacheKeysToSave();
logger.info("Scheduling counter cache save to every {} seconds (going to save {} keys).",
DatabaseDescriptor.getCounterCacheSavePeriod(),
keysToSave == Integer.MAX_VALUE ? "all" : keysToSave);
cache.scheduleSaving(DatabaseDescriptor.getCounterCacheSavePeriod(), keysToSave);
return cache;
}
public long getKeyCacheHits()
{
return keyCache.getMetrics().hits.count();
@ -199,6 +226,20 @@ public class CacheService implements CacheServiceMBean
keyCache.scheduleSaving(seconds, DatabaseDescriptor.getKeyCacheKeysToSave());
}
public int getCounterCacheSavePeriodInSeconds()
{
return DatabaseDescriptor.getCounterCacheSavePeriod();
}
public void setCounterCacheSavePeriodInSeconds(int seconds)
{
if (seconds < 0)
throw new RuntimeException("CounterCacheSavePeriodInSeconds must be non-negative.");
DatabaseDescriptor.setCounterCacheSavePeriod(seconds);
counterCache.scheduleSaving(seconds, DatabaseDescriptor.getCounterCacheKeysToSave());
}
public int getRowCacheKeysToSave()
{
return DatabaseDescriptor.getRowCacheKeysToSave();
@ -225,6 +266,19 @@ public class CacheService implements CacheServiceMBean
keyCache.scheduleSaving(getKeyCacheSavePeriodInSeconds(), count);
}
public int getCounterCacheKeysToSave()
{
return DatabaseDescriptor.getCounterCacheKeysToSave();
}
public void setCounterCacheKeysToSave(int count)
{
if (count < 0)
throw new RuntimeException("CounterCacheKeysToSave must be non-negative.");
DatabaseDescriptor.setCounterCacheKeysToSave(count);
counterCache.scheduleSaving(getCounterCacheSavePeriodInSeconds(), count);
}
public void invalidateKeyCache()
{
keyCache.clear();
@ -235,6 +289,11 @@ public class CacheService implements CacheServiceMBean
rowCache.clear();
}
public void invalidateCounterCache()
{
counterCache.clear();
}
public long getRowCacheCapacityInBytes()
{
return rowCache.getMetrics().capacity.value();
@ -271,6 +330,14 @@ public class CacheService implements CacheServiceMBean
keyCache.setCapacity(capacity * 1024 * 1024);
}
public void setCounterCacheCapacityInMB(long capacity)
{
if (capacity < 0)
throw new RuntimeException("capacity should not be negative.");
counterCache.setCapacity(capacity * 1024 * 1024);
}
public long getRowCacheSize()
{
return rowCache.getMetrics().size.value();
@ -293,16 +360,60 @@ public class CacheService implements CacheServiceMBean
public void saveCaches() throws ExecutionException, InterruptedException
{
List<Future<?>> futures = new ArrayList<Future<?>>(2);
List<Future<?>> futures = new ArrayList<>(3);
logger.debug("submitting cache saves");
futures.add(keyCache.submitWrite(DatabaseDescriptor.getKeyCacheKeysToSave()));
futures.add(rowCache.submitWrite(DatabaseDescriptor.getRowCacheKeysToSave()));
futures.add(counterCache.submitWrite(DatabaseDescriptor.getCounterCacheKeysToSave()));
FBUtilities.waitOnFutures(futures);
logger.debug("cache saves completed");
}
public class CounterCacheSerializer implements CacheSerializer<CounterCacheKey, ClockAndCount>
{
public void serialize(CounterCacheKey key, DataOutput out) throws IOException
{
ByteBufferUtil.writeWithLength(key.partitionKey, out);
ByteBufferUtil.writeWithLength(key.cellName, out);
}
public Future<Pair<CounterCacheKey, ClockAndCount>> deserialize(DataInputStream in, final ColumnFamilyStore cfs) throws IOException
{
final ByteBuffer partitionKey = ByteBufferUtil.readWithLength(in);
final CellName cellName = cfs.metadata.comparator.cellFromByteBuffer(ByteBufferUtil.readWithLength(in));
return StageManager.getStage(Stage.READ).submit(new Callable<Pair<CounterCacheKey, ClockAndCount>>()
{
public Pair<CounterCacheKey, ClockAndCount> call() throws Exception
{
DecoratedKey key = cfs.partitioner.decorateKey(partitionKey);
Lock lock = cfs.counterLockFor(partitionKey);
lock.lock();
try
{
QueryFilter filter = QueryFilter.getNamesFilter(key,
cfs.metadata.cfName,
FBUtilities.singleton(cellName, cfs.metadata.comparator),
Long.MIN_VALUE);
ColumnFamily cf = cfs.getTopLevelColumns(filter, Integer.MIN_VALUE);
if (cf == null)
return null;
Cell cell = cf.getColumn(cellName);
if (cell == null || cell.isMarkedForDelete(Long.MIN_VALUE))
return null;
ClockAndCount clockAndCount = CounterContext.instance().getLocalClockAndCount(cell.value());
return Pair.create(CounterCacheKey.create(cfs.metadata.cfId, partitionKey, cellName), clockAndCount);
}
finally
{
lock.unlock();
}
}
});
}
}
public class RowCacheSerializer implements CacheSerializer<RowCacheKey, IRowCacheEntry>
{
public void serialize(RowCacheKey key, DataOutput out) throws IOException

View File

@ -27,12 +27,18 @@ public interface CacheServiceMBean
public int getKeyCacheSavePeriodInSeconds();
public void setKeyCacheSavePeriodInSeconds(int kcspis);
public int getCounterCacheSavePeriodInSeconds();
public void setCounterCacheSavePeriodInSeconds(int ccspis);
public int getRowCacheKeysToSave();
public void setRowCacheKeysToSave(int rckts);
public int getKeyCacheKeysToSave();
public void setKeyCacheKeysToSave(int kckts);
public int getCounterCacheKeysToSave();
public void setCounterCacheKeysToSave(int cckts);
/**
* invalidate the key cache; for use after invalidating row cache
*/
@ -43,10 +49,14 @@ public interface CacheServiceMBean
*/
public void invalidateRowCache();
public void invalidateCounterCache();
public void setRowCacheCapacityInMB(long capacity);
public void setKeyCacheCapacityInMB(long capacity);
public void setCounterCacheCapacityInMB(long capacity);
/**
* save row and key caches
*

View File

@ -129,8 +129,8 @@ public class StorageProxy implements StorageProxyMBean
/*
* We execute counter writes in 2 places: either directly in the coordinator node if it is a replica, or
* in CounterMutationVerbHandler on a replica othewise. The write must be executed on the MUTATION stage
* but on the latter case, the verb handler already run on the MUTATION stage, so we must not execute the
* in CounterMutationVerbHandler on a replica othewise. The write must be executed on the COUNTER_MUTATION stage
* but on the latter case, the verb handler already run on the COUNTER_MUTATION stage, so we must not execute the
* underlying on the stage otherwise we risk a deadlock. Hence two different performer.
*/
counterWritePerformer = new WritePerformer()
@ -153,7 +153,8 @@ public class StorageProxy implements StorageProxyMBean
String localDataCenter,
ConsistencyLevel consistencyLevel)
{
StageManager.getStage(Stage.MUTATION).execute(counterWriteTask(mutation, targets, responseHandler, localDataCenter));
StageManager.getStage(Stage.COUNTER_MUTATION)
.execute(counterWriteTask(mutation, targets, responseHandler, localDataCenter));
}
};
}
@ -993,7 +994,7 @@ public class StorageProxy implements StorageProxyMBean
IMutation processed = SinkManager.processWriteRequest(mutation);
if (processed != null)
{
processed.apply();
((Mutation) processed).apply();
responseHandler.response(null);
}
}
@ -1102,34 +1103,22 @@ public class StorageProxy implements StorageProxyMBean
{
return new DroppableRunnable(MessagingService.Verb.COUNTER_MUTATION)
{
public void runMayThrow()
public void runMayThrow() throws OverloadedException, WriteTimeoutException
{
IMutation processed = SinkManager.processWriteRequest(mutation);
if (processed == null)
return;
assert processed instanceof CounterMutation;
final CounterMutation cm = (CounterMutation) processed;
CounterMutation cm = (CounterMutation) processed;
// apply mutation
cm.apply();
Mutation result = cm.apply();
responseHandler.response(null);
// then send to replicas, if any
final Set<InetAddress> remotes = Sets.difference(ImmutableSet.copyOf(targets), ImmutableSet.of(FBUtilities.getBroadcastAddress()));
if (!remotes.isEmpty() && cm.shouldReplicateOnWrite())
{
// We do the replication on another stage because it involves a read (see CM.makeReplicationMutation)
// and we want to avoid blocking too much the MUTATION stage
StageManager.getStage(Stage.REPLICATE_ON_WRITE).execute(new DroppableRunnable(MessagingService.Verb.READ)
{
public void runMayThrow() throws OverloadedException
{
// send mutation to other replica
sendToHintedEndpoints(cm.makeReplicationMutation(), remotes, responseHandler, localDataCenter);
}
});
}
Set<InetAddress> remotes = Sets.difference(ImmutableSet.copyOf(targets),
ImmutableSet.of(FBUtilities.getBroadcastAddress()));
if (!remotes.isEmpty())
sendToHintedEndpoints(result, remotes, responseHandler, localDataCenter);
}
};
}
@ -2146,6 +2135,9 @@ public class StorageProxy implements StorageProxyMBean
public Long getWriteRpcTimeout() { return DatabaseDescriptor.getWriteRpcTimeout(); }
public void setWriteRpcTimeout(Long timeoutInMillis) { DatabaseDescriptor.setWriteRpcTimeout(timeoutInMillis); }
public Long getCounterWriteRpcTimeout() { return DatabaseDescriptor.getCounterWriteRpcTimeout(); }
public void setCounterWriteRpcTimeout(Long timeoutInMillis) { DatabaseDescriptor.setCounterWriteRpcTimeout(timeoutInMillis); }
public Long getCasContentionTimeout() { return DatabaseDescriptor.getCasContentionTimeout(); }
public void setCasContentionTimeout(Long timeoutInMillis) { DatabaseDescriptor.setCasContentionTimeout(timeoutInMillis); }

View File

@ -85,6 +85,8 @@ public interface StorageProxyMBean
public void setReadRpcTimeout(Long timeoutInMillis);
public Long getWriteRpcTimeout();
public void setWriteRpcTimeout(Long timeoutInMillis);
public Long getCounterWriteRpcTimeout();
public void setCounterWriteRpcTimeout(Long timeoutInMillis);
public Long getCasContentionTimeout();
public void setCasContentionTimeout(Long timeoutInMillis);
public Long getRangeRpcTimeout();

View File

@ -538,14 +538,20 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
CounterId.renewLocalId();
}
// Can't do this in CassandraDaemon before the SS start b/c local counter id can be renewed afterwards.
for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
if (cfs.metadata.isCounter())
cfs.initCounterCache();
// daemon threads, like our executors', continue to run while shutdown hooks are invoked
Thread drainOnShutdown = new Thread(new WrappedRunnable()
{
@Override
public void runMayThrow() throws ExecutionException, InterruptedException, IOException
{
ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION);
ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION);
if (mutationStage.isShutdown())
if (mutationStage.isShutdown() && counterMutationStage.isShutdown())
return; // drained already
shutdownClientServers();
@ -555,7 +561,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
// In-progress writes originating here could generate hints to be written, so shut down MessagingService
// before mutation stage, so we can get all the hints saved before shutting down
MessagingService.instance().shutdown();
counterMutationStage.shutdown();
mutationStage.shutdown();
counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
mutationStage.awaitTermination(3600, TimeUnit.SECONDS);
StorageProxy.instance.verifyNoHintsInProgress();
@ -2102,10 +2110,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
if (keyspaceName.equals(Keyspace.SYSTEM_KS))
throw new RuntimeException("Cleanup of the system keyspace is neither necessary nor wise");
CounterId.OneShotRenewer counterIdRenewer = new CounterId.OneShotRenewer();
for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, keyspaceName, columnFamilies))
{
cfStore.forceCleanup(counterIdRenewer);
cfStore.forceCleanup();
}
}
@ -3335,8 +3342,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
*/
public synchronized void drain() throws IOException, InterruptedException, ExecutionException
{
ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION);
ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION);
if (mutationStage.isTerminated())
if (mutationStage.isTerminated() && counterMutationStage.isTerminated())
{
logger.warn("Cannot drain node (did it already happen?)");
return;
@ -3350,7 +3358,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
MessagingService.instance().shutdown();
setMode(Mode.DRAINING, "clearing mutation stage", false);
counterMutationStage.shutdown();
mutationStage.shutdown();
counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
mutationStage.awaitTermination(3600, TimeUnit.SECONDS);
StorageProxy.instance.verifyNoHintsInProgress();

View File

@ -33,6 +33,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.primitives.Longs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -1076,7 +1077,11 @@ public class CassandraServer implements Cassandra.Iface
if (mutations.isEmpty())
return;
schedule(DatabaseDescriptor.getWriteRpcTimeout());
long timeout = Long.MAX_VALUE;
for (IMutation m : mutations)
timeout = Longs.min(timeout, m.getTimeout());
schedule(timeout);
try
{
StorageProxy.mutateWithTriggers(mutations, consistencyLevel, mutateAtomically);

View File

@ -276,6 +276,11 @@ public class NodeProbe implements AutoCloseable
ssProxy.forceKeyspaceRepairRange(beginToken, endToken, keyspaceName, isSequential, isLocal, columnFamilies);
}
public void invalidateCounterCache()
{
cacheService.invalidateCounterCache();
}
public void invalidateKeyCache()
{
cacheService.invalidateKeyCache();
@ -540,7 +545,7 @@ public class NodeProbe implements AutoCloseable
ssProxy.setIncrementalBackupsEnabled(enabled);
}
public void setCacheCapacities(int keyCacheCapacity, int rowCacheCapacity)
public void setCacheCapacities(int keyCacheCapacity, int rowCacheCapacity, int counterCacheCapacity)
{
try
{
@ -548,6 +553,7 @@ public class NodeProbe implements AutoCloseable
CacheServiceMBean cacheMBean = JMX.newMBeanProxy(mbeanServerConn, new ObjectName(keyCachePath), CacheServiceMBean.class);
cacheMBean.setKeyCacheCapacityInMB(keyCacheCapacity);
cacheMBean.setRowCacheCapacityInMB(rowCacheCapacity);
cacheMBean.setCounterCacheCapacityInMB(counterCacheCapacity);
}
catch (MalformedObjectNameException e)
{
@ -555,7 +561,7 @@ public class NodeProbe implements AutoCloseable
}
}
public void setCacheKeysToSave(int keyCacheKeysToSave, int rowCacheKeysToSave)
public void setCacheKeysToSave(int keyCacheKeysToSave, int rowCacheKeysToSave, int counterCacheKeysToSave)
{
try
{
@ -563,6 +569,7 @@ public class NodeProbe implements AutoCloseable
CacheServiceMBean cacheMBean = JMX.newMBeanProxy(mbeanServerConn, new ObjectName(keyCachePath), CacheServiceMBean.class);
cacheMBean.setKeyCacheKeysToSave(keyCacheKeysToSave);
cacheMBean.setRowCacheKeysToSave(rowCacheKeysToSave);
cacheMBean.setCounterCacheKeysToSave(counterCacheKeysToSave);
}
catch (MalformedObjectNameException e)
{
@ -910,8 +917,8 @@ public class NodeProbe implements AutoCloseable
// JMX getters for the o.a.c.metrics API below.
/**
* Retrieve cache metrics based on the cache type (KeyCache or RowCache)
* @param cacheType KeyCache or RowCache
* Retrieve cache metrics based on the cache type (KeyCache, RowCache, or CounterCache)
* @param cacheType KeyCach, RowCache, or CounterCache
* @param metricName Capacity, Entries, HitRate, Size, Requests or Hits.
*/
public Object getCacheMetric(String cacheType, String metricName)

View File

@ -102,6 +102,7 @@ public class NodeTool
GossipInfo.class,
InvalidateKeyCache.class,
InvalidateRowCache.class,
InvalidateCounterCache.class,
Join.class,
Move.class,
PauseHandoff.class,
@ -342,6 +343,17 @@ public class NodeTool
probe.getCacheMetric("RowCache", "HitRate"),
cacheService.getRowCacheSavePeriodInSeconds());
// Counter Cache: Hits, Requests, RecentHitRate, SavePeriodInSeconds
System.out.printf("%-17s: entries %d, size %d (bytes), capacity %d (bytes), %d hits, %d requests, %.3f recent hit rate, %d save period in seconds%n",
"Counter Cache",
probe.getCacheMetric("CounterCache", "Entries"),
probe.getCacheMetric("CounterCache", "Size"),
probe.getCacheMetric("CounterCache", "Capacity"),
probe.getCacheMetric("CounterCache", "Hits"),
probe.getCacheMetric("CounterCache", "Requests"),
probe.getCacheMetric("CounterCache", "HitRate"),
cacheService.getCounterCacheSavePeriodInSeconds());
// Tokens
List<String> tokens = probe.getTokens();
if (tokens.size() == 1 || this.tokens)
@ -1331,6 +1343,16 @@ public class NodeTool
}
}
@Command(name = "invalidatecountercache", description = "Invalidate the counter cache")
public static class InvalidateCounterCache extends NodeToolCmd
{
@Override
public void execute(NodeProbe probe)
{
probe.invalidateCounterCache();
}
}
@Command(name = "join", description = "Join the ring")
public static class Join extends NodeToolCmd
{
@ -1532,17 +1554,20 @@ public class NodeTool
}
}
@Command(name = "setcachecapacity", description = "Set global key and row cache capacities (in MB units)")
@Command(name = "setcachecapacity", description = "Set global key, row, and counter cache capacities (in MB units)")
public static class SetCacheCapacity extends NodeToolCmd
{
@Arguments(title = "<key-cache-capacity> <row-cache-capacity>", usage = "<key-cache-capacity> <row-cache-capacity>", description = "Key cache and row cache (in MB)", required = true)
@Arguments(title = "<key-cache-capacity> <row-cache-capacity> <counter-cache-capacity>",
usage = "<key-cache-capacity> <row-cache-capacity> <counter-cache-capacity>",
description = "Key cache, row cache, and counter cache (in MB)",
required = true)
private List<Integer> args = new ArrayList<>();
@Override
public void execute(NodeProbe probe)
{
checkArgument(args.size() == 2, "setcachecapacity requires key-cache-capacity, and row-cache-capacity args.");
probe.setCacheCapacities(args.get(0), args.get(1));
checkArgument(args.size() == 3, "setcachecapacity requires key-cache-capacity, row-cache-capacity, and counter-cache-capacity args.");
probe.setCacheCapacities(args.get(0), args.get(1), args.get(2));
}
}
@ -2083,14 +2108,17 @@ public class NodeTool
@Command(name = "setcachekeystosave", description = "Set number of keys saved by each cache for faster post-restart warmup. 0 to disable")
public static class SetCacheKeysToSave extends NodeToolCmd
{
@Arguments(title = "<key-cache-keys-to-save> <row-cache-keys-to-save>", usage = "<key-cache-keys-to-save> <row-cache-keys-to-save>", description = "The number of keys saved by each cache. 0 to disable", required = true)
@Arguments(title = "<key-cache-keys-to-save> <row-cache-keys-to-save> <counter-cache-keys-to-save>",
usage = "<key-cache-keys-to-save> <row-cache-keys-to-save> <counter-cache-keys-to-save>",
description = "The number of keys saved by each cache. 0 to disable",
required = true)
private List<Integer> args = new ArrayList<>();
@Override
public void execute(NodeProbe probe)
{
checkArgument(args.size() == 2, "setcachekeystosave requires key-cache-keys-to-save, and row-cache-keys-to-save args.");
probe.setCacheKeysToSave(args.get(0), args.get(1));
checkArgument(args.size() == 3, "setcachekeystosave requires key-cache-keys-to-save, row-cache-keys-to-save, and counter-cache-keys-to-save args.");
probe.setCacheKeysToSave(args.get(0), args.get(1), args.get(2));
}
}

View File

@ -18,15 +18,11 @@
package org.apache.cassandra.utils;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.CopyOnWriteArrayList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Objects;
import org.apache.cassandra.db.CounterCell;
import org.apache.cassandra.db.SystemKeyspace;
public class CounterId implements Comparable<CounterId>
@ -37,21 +33,21 @@ public class CounterId implements Comparable<CounterId>
// Lazy holder because this opens the system keyspace and we want to avoid
// having this triggered during class initialization
private static class LocalIds
private static class LocalId
{
static final LocalCounterIdHistory instance = new LocalCounterIdHistory();
static final LocalCounterIdHolder instance = new LocalCounterIdHolder();
}
private final ByteBuffer id;
private static LocalCounterIdHistory localIds()
private static LocalCounterIdHolder localId()
{
return LocalIds.instance;
return LocalId.instance;
}
public static CounterId getLocalId()
{
return localIds().current.get();
return localId().get();
}
/**
@ -59,24 +55,9 @@ public class CounterId implements Comparable<CounterId>
* To use only when this strictly necessary, as using this will make all
* counter context grow with time.
*/
public static void renewLocalId()
public static synchronized void renewLocalId()
{
renewLocalId(FBUtilities.timestampMicros());
}
public static synchronized void renewLocalId(long now)
{
localIds().renewCurrent(now);
}
/**
* Return the list of old local counter id of this node.
* It is guaranteed that the returned list is sorted by growing counter id
* (and hence the first item will be the oldest counter id for this host)
*/
public static List<CounterIdRecord> getOldLocalCounterIds()
{
return localIds().olds;
localId().renew(FBUtilities.timestampMicros());
}
/**
@ -163,94 +144,39 @@ public class CounterId implements Comparable<CounterId>
return id.hashCode();
}
public static class OneShotRenewer
{
private boolean renewed;
private final CounterId initialId;
public OneShotRenewer()
{
renewed = false;
initialId = getLocalId();
}
public void maybeRenew(CounterCell column)
{
if (!renewed && column.hasCounterId(initialId))
{
renewLocalId();
renewed = true;
}
}
}
private static class LocalCounterIdHistory
private static class LocalCounterIdHolder
{
private final AtomicReference<CounterId> current;
private final List<CounterIdRecord> olds;
LocalCounterIdHistory()
LocalCounterIdHolder()
{
CounterId id = SystemKeyspace.getCurrentLocalCounterId();
if (id == null)
{
// no recorded local counter id, generating a new one and saving it
id = generate();
logger.info("No saved local counter id, using newly generated: {}", id);
SystemKeyspace.writeCurrentLocalCounterId(id, FBUtilities.timestampMicros());
current = new AtomicReference<>(id);
olds = new CopyOnWriteArrayList<>();
}
else
{
logger.info("Saved local counter id: {}", id);
current = new AtomicReference<>(id);
olds = new CopyOnWriteArrayList<>(SystemKeyspace.getOldLocalCounterIds());
}
logger.info("Using saved local counter id: {}", id);
}
synchronized void renewCurrent(long now)
current = new AtomicReference<>(id);
}
synchronized void renew(long now)
{
CounterId newCounterId = generate();
CounterId old = current.get();
SystemKeyspace.writeCurrentLocalCounterId(newCounterId, now);
current.set(newCounterId);
olds.add(new CounterIdRecord(old, now));
}
}
public static class CounterIdRecord
CounterId get()
{
public final CounterId id;
public final long timestamp;
public CounterIdRecord(CounterId id, long timestamp)
{
this.id = id;
this.timestamp = timestamp;
}
@Override
public boolean equals(Object o)
{
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
CounterIdRecord otherRecord = (CounterIdRecord)o;
return id.equals(otherRecord.id) && timestamp == otherRecord.timestamp;
}
@Override
public int hashCode()
{
return Objects.hashCode(id, timestamp);
}
public String toString()
{
return String.format("(%s, %d)", id.toString(), timestamp);
return current.get();
}
}
}

View File

@ -102,6 +102,7 @@ public class SchemaLoader
String ks6 = "Keyspace6";
String ks_kcs = "KeyCacheSpace";
String ks_rcs = "RowCacheSpace";
String ks_ccs = "CounterCacheSpace";
String ks_nocommit = "NoCommitlogSpace";
String ks_prsi = "PerRowSecondaryIndex";
String ks_cql = "cql_keyspace";
@ -234,6 +235,13 @@ public class SchemaLoader
standardCFMD(ks_rcs, "CFWithoutCache").caching(CFMetaData.Caching.NONE),
standardCFMD(ks_rcs, "CachedCF").caching(CFMetaData.Caching.ALL)));
// CounterCacheSpace
schema.add(KSMetaData.testMetadata(ks_ccs,
simple,
opts_rf1,
standardCFMD(ks_ccs, "Counter1").defaultValidator(CounterColumnType.instance),
standardCFMD(ks_ccs, "Counter2").defaultValidator(CounterColumnType.instance)));
schema.add(KSMetaData.testMetadataNotDurable(ks_nocommit,
simple,
opts_rf1,

View File

@ -188,13 +188,13 @@ public class Util
* @param mutations A group of Mutations for the same keyspace and column family.
* @return The ColumnFamilyStore that was used.
*/
public static ColumnFamilyStore writeColumnFamily(List<IMutation> mutations) throws IOException, ExecutionException, InterruptedException
public static ColumnFamilyStore writeColumnFamily(List<Mutation> mutations) throws IOException, ExecutionException, InterruptedException
{
IMutation first = mutations.get(0);
String keyspaceName = first.getKeyspaceName();
UUID cfid = first.getColumnFamilyIds().iterator().next();
for (IMutation rm : mutations)
for (Mutation rm : mutations)
rm.apply();
ColumnFamilyStore store = Keyspace.open(keyspaceName).getColumnFamilyStore(cfid);

View File

@ -64,7 +64,6 @@ public class DefsTest extends SchemaLoader
cfm.comment("No comment")
.readRepairChance(0.5)
.replicateOnWrite(false)
.gcGraceSeconds(100000)
.minCompactionThreshold(500)
.maxCompactionThreshold(500);

View File

@ -42,7 +42,6 @@ import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.CounterId;
import org.junit.Test;
public class CleanupTest extends SchemaLoader
@ -79,7 +78,7 @@ public class CleanupTest extends SchemaLoader
assertEquals(LOOPS, rows.size());
// with one token in the ring, owned by the local node, cleanup should be a no-op
CompactionManager.instance.performCleanup(cfs, new CounterId.OneShotRenewer());
CompactionManager.instance.performCleanup(cfs);
// ensure max timestamp of the sstables are retained post-cleanup
assert expectedMaxTimestamps.equals(getMaxTimestampList(cfs));
@ -125,7 +124,7 @@ public class CleanupTest extends SchemaLoader
tmd.updateNormalToken(new BytesToken(tk1), InetAddress.getByName("127.0.0.1"));
tmd.updateNormalToken(new BytesToken(tk2), InetAddress.getByName("127.0.0.2"));
CompactionManager.instance.performCleanup(cfs, new CounterId.OneShotRenewer());
CompactionManager.instance.performCleanup(cfs);
// row data should be gone
rows = Util.getRangeSlice(cfs);

View File

@ -106,9 +106,8 @@ public class ColumnFamilyStoreTest extends SchemaLoader
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
cfs.truncateBlocking();
List<IMutation> rms = new LinkedList<IMutation>();
Mutation rm;
rm = new Mutation("Keyspace1", ByteBufferUtil.bytes("key1"));
List<Mutation> rms = new LinkedList<>();
Mutation rm = new Mutation("Keyspace1", ByteBufferUtil.bytes("key1"));
rm.add("Standard1", cellname("Column1"), ByteBufferUtil.bytes("asdf"), 0);
rm.add("Standard1", cellname("Column2"), ByteBufferUtil.bytes("asdf"), 0);
rms.add(rm);
@ -850,7 +849,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
private ColumnFamilyStore insertKey1Key2() throws IOException, ExecutionException, InterruptedException
{
ColumnFamilyStore cfs = Keyspace.open("Keyspace2").getColumnFamilyStore("Standard1");
List<IMutation> rms = new LinkedList<IMutation>();
List<Mutation> rms = new LinkedList<>();
Mutation rm;
rm = new Mutation("Keyspace2", ByteBufferUtil.bytes("key1"));
rm.add("Standard1", cellname("Column1"), ByteBufferUtil.bytes("asdf"), 0);

View File

@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db;
import java.util.concurrent.ExecutionException;
import org.junit.AfterClass;
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.utils.FBUtilities;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.apache.cassandra.Util.cellname;
import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
public class CounterCacheTest extends SchemaLoader
{
private static final String KS = "CounterCacheSpace";
private static final String CF = "Counter1";
@AfterClass
public static void cleanup()
{
cleanupSavedCaches();
}
@Test
public void testReadWrite()
{
ColumnFamilyStore cfs = Keyspace.open(KS).getColumnFamilyStore(CF);
CacheService.instance.invalidateCounterCache();
assertEquals(0, CacheService.instance.counterCache.size());
assertNull(cfs.getCachedCounter(bytes(1), cellname(1)));
assertNull(cfs.getCachedCounter(bytes(1), cellname(2)));
assertNull(cfs.getCachedCounter(bytes(2), cellname(1)));
assertNull(cfs.getCachedCounter(bytes(2), cellname(2)));
cfs.putCachedCounter(bytes(1), cellname(1), ClockAndCount.create(1L, 1L));
cfs.putCachedCounter(bytes(1), cellname(2), ClockAndCount.create(1L, 2L));
cfs.putCachedCounter(bytes(2), cellname(1), ClockAndCount.create(2L, 1L));
cfs.putCachedCounter(bytes(2), cellname(2), ClockAndCount.create(2L, 2L));
assertEquals(4, CacheService.instance.counterCache.size());
assertEquals(ClockAndCount.create(1L, 1L), cfs.getCachedCounter(bytes(1), cellname(1)));
assertEquals(ClockAndCount.create(1L, 2L), cfs.getCachedCounter(bytes(1), cellname(2)));
assertEquals(ClockAndCount.create(2L, 1L), cfs.getCachedCounter(bytes(2), cellname(1)));
assertEquals(ClockAndCount.create(2L, 2L), cfs.getCachedCounter(bytes(2), cellname(2)));
}
@Test
public void testSaveLoad() throws ExecutionException, InterruptedException, WriteTimeoutException
{
ColumnFamilyStore cfs = Keyspace.open(KS).getColumnFamilyStore(CF);
CacheService.instance.invalidateCounterCache();
ColumnFamily cells = UnsortedColumns.factory.create(cfs.metadata);
cells.addColumn(new CounterUpdateCell(cellname(1), 1L, FBUtilities.timestampMicros()));
cells.addColumn(new CounterUpdateCell(cellname(2), 2L, FBUtilities.timestampMicros()));
new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
new CounterMutation(new Mutation(KS, bytes(2), cells), ConsistencyLevel.ONE).apply();
// flush the counter cache and invalidate
CacheService.instance.counterCache.submitWrite(Integer.MAX_VALUE).get();
CacheService.instance.invalidateCounterCache();
assertEquals(0, CacheService.instance.counterCache.size());
// load from cache and validate
CacheService.instance.counterCache.loadSaved(cfs);
assertEquals(4, CacheService.instance.counterCache.size());
assertEquals(ClockAndCount.create(1L, 1L), cfs.getCachedCounter(bytes(1), cellname(1)));
assertEquals(ClockAndCount.create(1L, 2L), cfs.getCachedCounter(bytes(1), cellname(2)));
assertEquals(ClockAndCount.create(1L, 1L), cfs.getCachedCounter(bytes(2), cellname(1)));
assertEquals(ClockAndCount.create(1L, 2L), cfs.getCachedCounter(bytes(2), cellname(2)));
}
}

View File

@ -65,15 +65,17 @@ public class CounterCellTest extends SchemaLoader
public void testCreate() throws UnknownHostException
{
long delta = 3L;
CounterUpdateCell cuc = new CounterUpdateCell(cellname("x"), delta, 1L);
CounterCell cell = cuc.localCopy(Keyspace.open("Keyspace5").getColumnFamilyStore("Counter1"));
CounterCell cell = new CounterCell(Util.cellname("x"),
CounterContext.instance().createLocal(delta, HeapAllocator.instance),
1L,
Long.MIN_VALUE);
assert delta == cell.total();
assert 1 == cell.value().getShort(0);
assert 0 == cell.value().getShort(2);
assert CounterId.wrap(cell.value(), 4).isLocalId();
assert 1L == cell.value().getLong(4 + idLength);
assert delta == cell.value().getLong(4 + idLength + clockLength);
Assert.assertEquals(delta, cell.total());
Assert.assertEquals(1, cell.value().getShort(0));
Assert.assertEquals(0, cell.value().getShort(2));
Assert.assertTrue(CounterId.wrap(cell.value(), 4).isLocalId());
Assert.assertEquals(1L, cell.value().getLong(4 + idLength));
Assert.assertEquals(delta, cell.value().getLong(4 + idLength + clockLength));
}
@Test
@ -96,25 +98,25 @@ public class CounterCellTest extends SchemaLoader
// tombstone > live
left = new DeletedCell(cellname("x"), 1, 2L);
right = new CounterCell(cellname("x"), 0L, 1L);
right = CounterCell.createLocal(cellname("x"), 0L, 1L, Long.MIN_VALUE);
assert left.reconcile(right) == left;
// tombstone < live last delete
left = new DeletedCell(cellname("x"), 1, 1L);
right = new CounterCell(cellname("x"), 0L, 4L, 2L);
right = CounterCell.createLocal(cellname("x"), 0L, 4L, 2L);
assert left.reconcile(right) == right;
// tombstone == live last delete
left = new DeletedCell(cellname("x"), 1, 2L);
right = new CounterCell(cellname("x"), 0L, 4L, 2L);
right = CounterCell.createLocal(cellname("x"), 0L, 4L, 2L);
assert left.reconcile(right) == right;
// tombstone > live last delete
left = new DeletedCell(cellname("x"), 1, 4L);
right = new CounterCell(cellname("x"), 0L, 9L, 1L);
right = CounterCell.createLocal(cellname("x"), 0L, 9L, 1L);
reconciled = left.reconcile(right);
assert reconciled.name() == right.name();
@ -123,25 +125,25 @@ public class CounterCellTest extends SchemaLoader
assert ((CounterCell)reconciled).timestampOfLastDelete() == left.getMarkedForDeleteAt();
// live < tombstone
left = new CounterCell(cellname("x"), 0L, 1L);
left = CounterCell.createLocal(cellname("x"), 0L, 1L, Long.MIN_VALUE);
right = new DeletedCell(cellname("x"), 1, 2L);
assert left.reconcile(right) == right;
// live last delete > tombstone
left = new CounterCell(cellname("x"), 0L, 4L, 2L);
left = CounterCell.createLocal(cellname("x"), 0L, 4L, 2L);
right = new DeletedCell(cellname("x"), 1, 1L);
assert left.reconcile(right) == left;
// live last delete == tombstone
left = new CounterCell(cellname("x"), 0L, 4L, 2L);
left = CounterCell.createLocal(cellname("x"), 0L, 4L, 2L);
right = new DeletedCell(cellname("x"), 1, 2L);
assert left.reconcile(right) == left;
// live last delete < tombstone
left = new CounterCell(cellname("x"), 0L, 9L, 1L);
left = CounterCell.createLocal(cellname("x"), 0L, 9L, 1L);
right = new DeletedCell(cellname("x"), 1, 4L);
reconciled = left.reconcile(right);
@ -213,15 +215,15 @@ public class CounterCellTest extends SchemaLoader
CounterCell rightCell;
// timestamp
leftCell = new CounterCell(cellname("x"), 0, 1L);
rightCell = new CounterCell(cellname("x"), 0, 2L);
leftCell = CounterCell.createLocal(cellname("x"), 0, 1L, Long.MIN_VALUE);
rightCell = CounterCell.createLocal(cellname("x"), 0, 2L, Long.MIN_VALUE);
assert rightCell == leftCell.diff(rightCell);
assert null == rightCell.diff(leftCell);
// timestampOfLastDelete
leftCell = new CounterCell(cellname("x"), 0, 1L, 1L);
rightCell = new CounterCell(cellname("x"), 0, 1L, 2L);
leftCell = CounterCell.createLocal(cellname("x"), 0, 1L, 1L);
rightCell = CounterCell.createLocal(cellname("x"), 0, 1L, 2L);
assert rightCell == leftCell.diff(rightCell);
assert null == rightCell.diff(leftCell);

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -17,29 +17,179 @@
*/
package org.apache.cassandra.db;
import java.io.IOException;
import java.util.List;
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.utils.*;
import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.utils.FBUtilities;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.apache.cassandra.Util.cellname;
import static org.apache.cassandra.Util.dk;
import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
public class CounterMutationTest extends SchemaLoader
{
private static final String KS = "CounterCacheSpace";
private static final String CF1 = "Counter1";
private static final String CF2 = "Counter2";
@Test
public void testGetOldShardFromSystemKeyspace() throws IOException
public void testSingleCell() throws WriteTimeoutException
{
// Renewing a bunch of times and checking we get the same thing from
// the system keyspace that what is in memory
CounterId.renewLocalId();
CounterId.renewLocalId();
CounterId.renewLocalId();
ColumnFamilyStore cfs = Keyspace.open(KS).getColumnFamilyStore(CF1);
cfs.truncateBlocking();
List<CounterId.CounterIdRecord> inMem = CounterId.getOldLocalCounterIds();
List<CounterId.CounterIdRecord> onDisk = SystemKeyspace.getOldLocalCounterIds();
// Do the initial update (+1)
ColumnFamily cells = UnsortedColumns.factory.create(cfs.metadata);
cells.addCounter(cellname(1), 1L);
new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
ColumnFamily current = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dk(bytes(1)), CF1, System.currentTimeMillis()));
assertEquals(1L, CounterContext.instance().total(current.getColumn(cellname(1)).value()));
assert inMem.equals(onDisk);
// Make another increment (+2)
cells = UnsortedColumns.factory.create(cfs.metadata);
cells.addCounter(cellname(1), 2L);
new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
current = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dk(bytes(1)), CF1, System.currentTimeMillis()));
assertEquals(3L, CounterContext.instance().total(current.getColumn(cellname(1)).value()));
// Decrement to 0 (-3)
cells = UnsortedColumns.factory.create(cfs.metadata);
cells.addCounter(cellname(1), -3L);
new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
current = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dk(bytes(1)), CF1, System.currentTimeMillis()));
assertEquals(0L, CounterContext.instance().total(current.getColumn(cellname(1)).value()));
assertEquals(ClockAndCount.create(3L, 0L), cfs.getCachedCounter(bytes(1), cellname(1)));
}
@Test
public void testTwoCells() throws WriteTimeoutException
{
ColumnFamilyStore cfs = Keyspace.open(KS).getColumnFamilyStore(CF1);
cfs.truncateBlocking();
// Do the initial update (+1, -1)
ColumnFamily cells = UnsortedColumns.factory.create(cfs.metadata);
cells.addCounter(cellname(1), 1L);
cells.addCounter(cellname(2), -1L);
new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
ColumnFamily current = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dk(bytes(1)), CF1, System.currentTimeMillis()));
assertEquals(1L, CounterContext.instance().total(current.getColumn(cellname(1)).value()));
assertEquals(-1L, CounterContext.instance().total(current.getColumn(cellname(2)).value()));
// Make another increment (+2, -2)
cells = UnsortedColumns.factory.create(cfs.metadata);
cells.addCounter(cellname(1), 2L);
cells.addCounter(cellname(2), -2L);
new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
current = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dk(bytes(1)), CF1, System.currentTimeMillis()));
assertEquals(3L, CounterContext.instance().total(current.getColumn(cellname(1)).value()));
// Decrement to 0 (-3, +3)
cells = UnsortedColumns.factory.create(cfs.metadata);
cells.addCounter(cellname(1), -3L);
cells.addCounter(cellname(2), 3L);
new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
current = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dk(bytes(1)), CF1, System.currentTimeMillis()));
assertEquals(0L, CounterContext.instance().total(current.getColumn(cellname(1)).value()));
assertEquals(0L, CounterContext.instance().total(current.getColumn(cellname(2)).value()));
// Check the caches, separately
assertEquals(ClockAndCount.create(3L, 0L), cfs.getCachedCounter(bytes(1), cellname(1)));
assertEquals(ClockAndCount.create(3L, 0L), cfs.getCachedCounter(bytes(1), cellname(2)));
}
@Test
public void testBatch() throws WriteTimeoutException
{
ColumnFamilyStore cfs1 = Keyspace.open(KS).getColumnFamilyStore(CF1);
ColumnFamilyStore cfs2 = Keyspace.open(KS).getColumnFamilyStore(CF2);
cfs1.truncateBlocking();
cfs2.truncateBlocking();
// Do the update (+1, -1), (+2, -2)
ColumnFamily cells1 = UnsortedColumns.factory.create(cfs1.metadata);
cells1.addCounter(cellname(1), 1L);
cells1.addCounter(cellname(2), -1L);
ColumnFamily cells2 = UnsortedColumns.factory.create(cfs2.metadata);
cells2.addCounter(cellname(1), 2L);
cells2.addCounter(cellname(2), -2L);
Mutation mutation = new Mutation(KS, bytes(1));
mutation.add(cells1);
mutation.add(cells2);
new CounterMutation(mutation, ConsistencyLevel.ONE).apply();
// Validate all values
ColumnFamily current1 = cfs1.getColumnFamily(QueryFilter.getIdentityFilter(dk(bytes(1)), CF1, System.currentTimeMillis()));
ColumnFamily current2 = cfs2.getColumnFamily(QueryFilter.getIdentityFilter(dk(bytes(1)), CF2, System.currentTimeMillis()));
assertEquals(1L, CounterContext.instance().total(current1.getColumn(cellname(1)).value()));
assertEquals(-1L, CounterContext.instance().total(current1.getColumn(cellname(2)).value()));
assertEquals(2L, CounterContext.instance().total(current2.getColumn(cellname(1)).value()));
assertEquals(-2L, CounterContext.instance().total(current2.getColumn(cellname(2)).value()));
// Check the caches, separately
assertEquals(ClockAndCount.create(1L, 1L), cfs1.getCachedCounter(bytes(1), cellname(1)));
assertEquals(ClockAndCount.create(1L, -1L), cfs1.getCachedCounter(bytes(1), cellname(2)));
assertEquals(ClockAndCount.create(1L, 2L), cfs2.getCachedCounter(bytes(1), cellname(1)));
assertEquals(ClockAndCount.create(1L, -2L), cfs2.getCachedCounter(bytes(1), cellname(2)));
}
@Test
public void testDeletes() throws WriteTimeoutException
{
ColumnFamilyStore cfs = Keyspace.open(KS).getColumnFamilyStore(CF1);
cfs.truncateBlocking();
// Do the initial update (+1, -1)
ColumnFamily cells = UnsortedColumns.factory.create(cfs.metadata);
cells.addCounter(cellname(1), 1L);
cells.addCounter(cellname(2), 1L);
new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
ColumnFamily current = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dk(bytes(1)), CF1, System.currentTimeMillis()));
assertEquals(1L, CounterContext.instance().total(current.getColumn(cellname(1)).value()));
assertEquals(1L, CounterContext.instance().total(current.getColumn(cellname(2)).value()));
// Remove the first counter, increment the second counter
cells = UnsortedColumns.factory.create(cfs.metadata);
cells.addTombstone(cellname(1), (int) System.currentTimeMillis() / 1000, FBUtilities.timestampMicros());
cells.addCounter(cellname(2), 1L);
new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
current = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dk(bytes(1)), CF1, System.currentTimeMillis()));
assertNull(current.getColumn(cellname(1)));
assertEquals(2L, CounterContext.instance().total(current.getColumn(cellname(2)).value()));
// Increment the first counter, make sure it's still shadowed by the tombstone
cells = UnsortedColumns.factory.create(cfs.metadata);
cells.addCounter(cellname(1), 1L);
new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
current = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dk(bytes(1)), CF1, System.currentTimeMillis()));
assertNull(current.getColumn(cellname(1)));
// Get rid of the complete partition
Mutation mutation = new Mutation(KS, bytes(1));
mutation.delete(CF1, FBUtilities.timestampMicros());
new CounterMutation(mutation, ConsistencyLevel.ONE).apply();
current = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dk(bytes(1)), CF1, System.currentTimeMillis()));
assertNull(current.getColumn(cellname(1)));
assertNull(current.getColumn(cellname(2)));
// Increment both counters, ensure that both stay dead
cells = UnsortedColumns.factory.create(cfs.metadata);
cells.addCounter(cellname(1), 1L);
cells.addCounter(cellname(2), 1L);
new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
current = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dk(bytes(1)), CF1, System.currentTimeMillis()));
assertNull(current.getColumn(cellname(1)));
assertNull(current.getColumn(cellname(2)));
}
}

View File

@ -84,7 +84,7 @@ public class RecoveryManagerTest extends SchemaLoader
for (int i = 0; i < 10; ++i)
{
cf = TreeMapBackedSortedColumns.factory.create("Keyspace1", "Counter1");
cf.addColumn(new CounterCell(cellname("col"), 1L, 1L));
cf.addColumn(CounterCell.createLocal(cellname("col"), 1L, 1L, Long.MIN_VALUE));
rm = new Mutation("Keyspace1", dk.key, cf);
rm.apply();
}

View File

@ -26,7 +26,8 @@ import java.nio.ByteBuffer;
import org.junit.Test;
import org.apache.cassandra.db.context.IContext.ContextRelationship;
import org.apache.cassandra.db.ClockAndCount;
import org.apache.cassandra.db.context.CounterContext.Relationship;
import org.apache.cassandra.Util;
import org.apache.cassandra.utils.*;
@ -92,7 +93,7 @@ public class CounterContextTest
left.writeRemote(CounterId.fromInt(9), 1L, 0L);
right = ContextState.wrap(ByteBufferUtil.clone(left.context));
assertEquals(ContextRelationship.EQUAL, cc.diff(left.context, right.context));
assertEquals(Relationship.EQUAL, cc.diff(left.context, right.context));
// greater than: left has superset of nodes (counts equal)
left = ContextState.allocate(0, 0, 4, allocator);
@ -106,7 +107,7 @@ public class CounterContextTest
right.writeRemote(CounterId.fromInt(6), 2L, 0L);
right.writeRemote(CounterId.fromInt(9), 1L, 0L);
assertEquals(ContextRelationship.GREATER_THAN, cc.diff(left.context, right.context));
assertEquals(Relationship.GREATER_THAN, cc.diff(left.context, right.context));
// less than: left has subset of nodes (counts equal)
left = ContextState.allocate(0, 0, 3, allocator);
@ -120,7 +121,7 @@ public class CounterContextTest
right.writeRemote(CounterId.fromInt(9), 1L, 0L);
right.writeRemote(CounterId.fromInt(12), 0L, 0L);
assertEquals(ContextRelationship.LESS_THAN, cc.diff(left.context, right.context));
assertEquals(Relationship.LESS_THAN, cc.diff(left.context, right.context));
// greater than: equal nodes, but left has higher counts
left = ContextState.allocate(0, 0, 3, allocator);
@ -133,7 +134,7 @@ public class CounterContextTest
right.writeRemote(CounterId.fromInt(6), 2L, 0L);
right.writeRemote(CounterId.fromInt(9), 1L, 0L);
assertEquals(ContextRelationship.GREATER_THAN, cc.diff(left.context, right.context));
assertEquals(Relationship.GREATER_THAN, cc.diff(left.context, right.context));
// less than: equal nodes, but right has higher counts
left = ContextState.allocate(0, 0, 3, allocator);
@ -146,7 +147,7 @@ public class CounterContextTest
right.writeRemote(CounterId.fromInt(6), 9L, 0L);
right.writeRemote(CounterId.fromInt(9), 3L, 0L);
assertEquals(ContextRelationship.LESS_THAN, cc.diff(left.context, right.context));
assertEquals(Relationship.LESS_THAN, cc.diff(left.context, right.context));
// disjoint: right and left have disjoint node sets
left = ContextState.allocate(0, 0, 3, allocator);
@ -159,7 +160,7 @@ public class CounterContextTest
right.writeRemote(CounterId.fromInt(6), 1L, 0L);
right.writeRemote(CounterId.fromInt(9), 1L, 0L);
assertEquals(ContextRelationship.DISJOINT, cc.diff(left.context, right.context));
assertEquals(Relationship.DISJOINT, cc.diff(left.context, right.context));
left = ContextState.allocate(0, 0, 3, allocator);
left.writeRemote(CounterId.fromInt(3), 1L, 0L);
@ -171,7 +172,7 @@ public class CounterContextTest
right.writeRemote(CounterId.fromInt(6), 1L, 0L);
right.writeRemote(CounterId.fromInt(12), 1L, 0L);
assertEquals(ContextRelationship.DISJOINT, cc.diff(left.context, right.context));
assertEquals(Relationship.DISJOINT, cc.diff(left.context, right.context));
// disjoint: equal nodes, but right and left have higher counts in differing nodes
left = ContextState.allocate(0, 0, 3, allocator);
@ -184,7 +185,7 @@ public class CounterContextTest
right.writeRemote(CounterId.fromInt(6), 1L, 0L);
right.writeRemote(CounterId.fromInt(9), 5L, 0L);
assertEquals(ContextRelationship.DISJOINT, cc.diff(left.context, right.context));
assertEquals(Relationship.DISJOINT, cc.diff(left.context, right.context));
left = ContextState.allocate(0, 0, 3, allocator);
left.writeRemote(CounterId.fromInt(3), 2L, 0L);
@ -196,7 +197,7 @@ public class CounterContextTest
right.writeRemote(CounterId.fromInt(6), 9L, 0L);
right.writeRemote(CounterId.fromInt(9), 5L, 0L);
assertEquals(ContextRelationship.DISJOINT, cc.diff(left.context, right.context));
assertEquals(Relationship.DISJOINT, cc.diff(left.context, right.context));
// disjoint: left has more nodes, but lower counts
left = ContextState.allocate(0, 0, 4, allocator);
@ -210,7 +211,7 @@ public class CounterContextTest
right.writeRemote(CounterId.fromInt(6), 9L, 0L);
right.writeRemote(CounterId.fromInt(9), 5L, 0L);
assertEquals(ContextRelationship.DISJOINT, cc.diff(left.context, right.context));
assertEquals(Relationship.DISJOINT, cc.diff(left.context, right.context));
// disjoint: left has less nodes, but higher counts
left = ContextState.allocate(0, 0, 3, allocator);
@ -224,7 +225,7 @@ public class CounterContextTest
right.writeRemote(CounterId.fromInt(9), 2L, 0L);
right.writeRemote(CounterId.fromInt(12), 1L, 0L);
assertEquals(ContextRelationship.DISJOINT, cc.diff(left.context, right.context));
assertEquals(Relationship.DISJOINT, cc.diff(left.context, right.context));
// disjoint: mixed nodes and counts
left = ContextState.allocate(0, 0, 3, allocator);
@ -238,7 +239,7 @@ public class CounterContextTest
right.writeRemote(CounterId.fromInt(9), 2L, 0L);
right.writeRemote(CounterId.fromInt(12), 1L, 0L);
assertEquals(ContextRelationship.DISJOINT, cc.diff(left.context, right.context));
assertEquals(Relationship.DISJOINT, cc.diff(left.context, right.context));
left = ContextState.allocate(0, 0, 4, allocator);
left.writeRemote(CounterId.fromInt(3), 5L, 0L);
@ -251,7 +252,7 @@ public class CounterContextTest
right.writeRemote(CounterId.fromInt(6), 3L, 0L);
right.writeRemote(CounterId.fromInt(9), 2L, 0L);
assertEquals(ContextRelationship.DISJOINT, cc.diff(left.context, right.context));
assertEquals(Relationship.DISJOINT, cc.diff(left.context, right.context));
}
@Test
@ -499,4 +500,67 @@ public class CounterContextTest
cleared = cc.clearAllLocal(marked);
assertSame(cleared, marked);
}
@Test
public void testFindPositionOf()
{
ContextState state = ContextState.allocate(3, 3, 3, HeapAllocator.instance);
state.writeGlobal(CounterId.fromInt(1), 1L, 1L);
state.writeRemote(CounterId.fromInt(2), 2L, 2L);
state.writeLocal( CounterId.fromInt(3), 3L, 3L);
state.writeGlobal(CounterId.fromInt(4), 4L, 4L);
state.writeRemote(CounterId.fromInt(5), 5L, 5L);
state.writeLocal( CounterId.fromInt(6), 6L, 6L);
state.writeGlobal(CounterId.fromInt(7), 7L, 7L);
state.writeRemote(CounterId.fromInt(8), 8L, 8L);
state.writeLocal(CounterId.fromInt(9), 9L, 9L);
int headerLength = headerSizeLength + 6 * headerEltLength;
assertEquals(headerLength, cc.findPositionOf(state.context, CounterId.fromInt(1)));
assertEquals(headerLength + stepLength, cc.findPositionOf(state.context, CounterId.fromInt(2)));
assertEquals(headerLength + 2 * stepLength, cc.findPositionOf(state.context, CounterId.fromInt(3)));
assertEquals(headerLength + 3 * stepLength, cc.findPositionOf(state.context, CounterId.fromInt(4)));
assertEquals(headerLength + 4 * stepLength, cc.findPositionOf(state.context, CounterId.fromInt(5)));
assertEquals(headerLength + 5 * stepLength, cc.findPositionOf(state.context, CounterId.fromInt(6)));
assertEquals(headerLength + 6 * stepLength, cc.findPositionOf(state.context, CounterId.fromInt(7)));
assertEquals(headerLength + 7 * stepLength, cc.findPositionOf(state.context, CounterId.fromInt(8)));
assertEquals(headerLength + 8 * stepLength, cc.findPositionOf(state.context, CounterId.fromInt(9)));
assertEquals(-1, cc.findPositionOf(state.context, CounterId.fromInt(0)));
assertEquals(-1, cc.findPositionOf(state.context, CounterId.fromInt(10)));
assertEquals(-1, cc.findPositionOf(state.context, CounterId.fromInt(15)));
assertEquals(-1, cc.findPositionOf(state.context, CounterId.fromInt(20)));
}
@Test
public void testGetGlockAndCountOf()
{
ContextState state = ContextState.allocate(3, 3, 3, HeapAllocator.instance);
state.writeGlobal(CounterId.fromInt(1), 1L, 1L);
state.writeRemote(CounterId.fromInt(2), 2L, 2L);
state.writeLocal( CounterId.fromInt(3), 3L, 3L);
state.writeGlobal(CounterId.fromInt(4), 4L, 4L);
state.writeRemote(CounterId.fromInt(5), 5L, 5L);
state.writeLocal( CounterId.fromInt(6), 6L, 6L);
state.writeGlobal(CounterId.fromInt(7), 7L, 7L);
state.writeRemote(CounterId.fromInt(8), 8L, 8L);
state.writeLocal(CounterId.fromInt(9), 9L, 9L);
assertEquals(ClockAndCount.create(1L, 1L), cc.getClockAndCountOf(state.context, CounterId.fromInt(1)));
assertEquals(ClockAndCount.create(2L, 2L), cc.getClockAndCountOf(state.context, CounterId.fromInt(2)));
assertEquals(ClockAndCount.create(3L, 3L), cc.getClockAndCountOf(state.context, CounterId.fromInt(3)));
assertEquals(ClockAndCount.create(4L, 4L), cc.getClockAndCountOf(state.context, CounterId.fromInt(4)));
assertEquals(ClockAndCount.create(5L, 5L), cc.getClockAndCountOf(state.context, CounterId.fromInt(5)));
assertEquals(ClockAndCount.create(6L, 6L), cc.getClockAndCountOf(state.context, CounterId.fromInt(6)));
assertEquals(ClockAndCount.create(7L, 7L), cc.getClockAndCountOf(state.context, CounterId.fromInt(7)));
assertEquals(ClockAndCount.create(8L, 8L), cc.getClockAndCountOf(state.context, CounterId.fromInt(8)));
assertEquals(ClockAndCount.create(9L, 9L), cc.getClockAndCountOf(state.context, CounterId.fromInt(9)));
assertEquals(ClockAndCount.create(0L, 0L), cc.getClockAndCountOf(state.context, CounterId.fromInt(0)));
assertEquals(ClockAndCount.create(0L, 0L), cc.getClockAndCountOf(state.context, CounterId.fromInt(10)));
assertEquals(ClockAndCount.create(0L, 0L), cc.getClockAndCountOf(state.context, CounterId.fromInt(15)));
assertEquals(ClockAndCount.create(0L, 0L), cc.getClockAndCountOf(state.context, CounterId.fromInt(20)));
}
}

View File

@ -23,7 +23,6 @@ import static org.junit.Assert.assertNotNull;
import static org.apache.cassandra.io.sstable.SSTableUtils.tempSSTableFile;
import static org.apache.cassandra.utils.ByteBufferUtil.bytesToHex;
import static org.apache.cassandra.utils.ByteBufferUtil.hexToBytes;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.FileReader;
@ -216,7 +215,7 @@ public class SSTableExportTest extends SchemaLoader
SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2);
// Add rowA
cfamily.addColumn(new CounterCell(Util.cellname("colA"), 42, System.currentTimeMillis()));
cfamily.addColumn(CounterCell.createLocal(Util.cellname("colA"), 42, System.currentTimeMillis(), Long.MIN_VALUE));
writer.append(Util.dk("rowA"), cfamily);
cfamily.clear();

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.utils;
import java.io.IOException;
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.db.SystemKeyspace;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class CounterIdTest extends SchemaLoader
{
@Test
public void testGetCurrentIdFromSystemKeyspace() throws IOException
{
// Renewing a bunch of times and checking we get the same thing from
// the system keyspace that what is in memory
CounterId id0 = CounterId.getLocalId();
assertEquals(id0, SystemKeyspace.getCurrentLocalCounterId());
CounterId.renewLocalId();
CounterId id1 = CounterId.getLocalId();
assertEquals(id1, SystemKeyspace.getCurrentLocalCounterId());
assertTrue(id1.compareTo(id0) == 1);
CounterId.renewLocalId();
CounterId id2 = CounterId.getLocalId();
assertEquals(id2, SystemKeyspace.getCurrentLocalCounterId());
assertTrue(id2.compareTo(id1) == 1);
}
}