Improve transaction log under FS corruption

To address concerns with recovery from file system
failures, we consolidate the new and old log
files into a single file, add checksums to the
file, and record on-disk sstable state to permit
validation of the log file on restart. This allows
us to be pessimistic in the case of any failure,
leaving all affected files on disk.

patch by stefania; reviewed by benedict for CASSANDRA-7066
Stefania Alborghetti 2015-07-29 12:01:01 +08:00 committed by Benedict Elliott Smith
parent 47b66d7c1e
commit 5726625a5c
49 changed files with 2331 additions and 1829 deletions

View File

@ -38,6 +38,24 @@ Upgrading
- The default JVM flag -XX:+PerfDisableSharedMem will cause the following JVM tools
to stop working: jps, jstack, jinfo, jmc, jcmd, as well as 3rd party tools like Jolokia.
If you wish to use these tools you can comment this flag out in cassandra-env.{sh,ps1}
- New transaction log files have been introduced to replace the compactions_in_progress
system table, temporary file markers (tmp and tmplink) and sstable ancestors.
Therefore, compaction metadata no longer contains ancestors. Transaction log files
list sstable descriptors involved in compactions and other operations such as flushing
and streaming. Use the sstableutil tool to list any sstable files currently involved
in operations not yet completed, which previously would have been marked as temporary.
A transaction log file contains one sstable per line, with the prefix "add:" or "remove:".
It also contains a special "commit" line, inserted at the end only when the transaction
is committed. On startup we use these files to clean up any partial transactions that were
in progress when the process exited. If the commit line is found, we keep the new sstables
(those with the "add" prefix) and delete the old sstables (those with the "remove" prefix);
if the commit line is missing, we do the opposite. Should you lose or delete these log files,
both old and new sstable files will be kept as live files, which will result in duplicated
sstables. These files are protected by incremental checksums, so you should not manually
edit them. When restoring a full backup or moving sstable files, you should first clean up
any leftover transactions and their temporary files. You can use this command:
===> sstableutil -c ks table
See CASSANDRA-7066 for full details; a minimal sketch of the cleanup logic follows this list.
- New write stages have been added for batchlog and materialized view mutations;
you can set their size in cassandra.yaml.
- User defined functions are now executed in a sandbox.
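For illustration, here is a minimal, hypothetical sketch (not part of this patch) of the startup cleanup logic described above. It assumes the log is plain text whose only structure is the "add:"/"remove:" prefixes and the trailing "commit" line, and it prints the files that would be deleted rather than deleting them.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

public class TxnLogCleanupSketch
{
    public static void main(String[] args) throws IOException
    {
        // One sstable per line, prefixed "add:" or "remove:"; a trailing
        // "commit" line is present only if the transaction committed.
        List<String> lines = Files.readAllLines(Paths.get(args[0]));
        boolean committed = lines.contains("commit");

        for (String line : lines)
        {
            // Committed: old sstables are deleted, new ones are kept.
            if (committed && line.startsWith("remove:"))
                System.out.println("would delete " + line.substring("remove:".length()));
            // Not committed: new sstables are deleted, old ones are kept.
            else if (!committed && line.startsWith("add:"))
                System.out.println("would delete " + line.substring("add:".length()));
        }
        // Anything not listed above stays on disk as a live file.
    }
}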

View File

@ -50,6 +50,6 @@ fi
"$JAVA" $JAVA_AGENT -ea -cp "$CLASSPATH" $JVM_OPTS -Xmx$MAX_HEAP_SIZE \
-Dcassandra.storagedir="$cassandra_storagedir" \
-Dlogback.configurationFile=logback-tools.xml \
org.apache.cassandra.tools.StandaloneLister "$@"
org.apache.cassandra.tools.StandaloneSSTableUtil "$@"
# vi:ai sw=4 ts=4 tw=0 et

View File

@ -20,7 +20,7 @@ if "%OS%" == "Windows_NT" setlocal
pushd "%~dp0"
call cassandra.in.bat
if NOT DEFINED CASSANDRA_MAIN set CASSANDRA_MAIN=org.apache.cassandra.tools.StandaloneLister
if NOT DEFINED CASSANDRA_MAIN set CASSANDRA_MAIN=org.apache.cassandra.tools.StandaloneSSTableUtil
if NOT DEFINED JAVA_HOME goto :err
REM ***** JAVA options *****

View File

@ -353,7 +353,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
if (data.loadsstables)
{
Directories.SSTableLister sstableFiles = directories.sstableLister().skipTemporary(true);
Directories.SSTableLister sstableFiles = directories.sstableLister(Directories.OnTxnErr.IGNORE).skipTemporary(true);
Collection<SSTableReader> sstables = SSTableReader.openAll(sstableFiles.list().entrySet(), metadata);
data.addInitialSSTables(sstables);
}
@ -459,7 +459,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
SystemKeyspace.removeTruncationRecord(metadata.cfId);
data.dropSSTables();
TransactionLogs.waitForDeletions();
TransactionLog.waitForDeletions();
indexManager.invalidate();
materializedViewManager.invalidate();
@ -503,7 +503,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
// get the max generation number, to prevent generation conflicts
Directories directories = new Directories(metadata);
Directories.SSTableLister lister = directories.sstableLister().includeBackups(true);
Directories.SSTableLister lister = directories.sstableLister(Directories.OnTxnErr.IGNORE).includeBackups(true);
List<Integer> generations = new ArrayList<Integer>();
for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet())
{
@ -534,7 +534,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
LifecycleTransaction.removeUnfinishedLeftovers(metadata);
logger.debug("Further extra check for orphan sstable files for {}", metadata.cfName);
for (Map.Entry<Descriptor,Set<Component>> sstableFiles : directories.sstableLister().list().entrySet())
for (Map.Entry<Descriptor,Set<Component>> sstableFiles : directories.sstableLister(Directories.OnTxnErr.IGNORE).list().entrySet())
{
Descriptor desc = sstableFiles.getKey();
Set<Component> components = sstableFiles.getValue();
@ -642,7 +642,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
currentDescriptors.add(sstable.descriptor);
Set<SSTableReader> newSSTables = new HashSet<>();
Directories.SSTableLister lister = directories.sstableLister().skipTemporary(true);
Directories.SSTableLister lister = directories.sstableLister(Directories.OnTxnErr.IGNORE).skipTemporary(true);
for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet())
{
Descriptor descriptor = entry.getKey();
@ -1644,7 +1644,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
Map<Integer, SSTableReader> active = new HashMap<>();
for (SSTableReader sstable : getSSTables(SSTableSet.CANONICAL))
active.put(sstable.descriptor.generation, sstable);
Map<Descriptor, Set<Component>> snapshots = directories.sstableLister().snapshots(tag).list();
Map<Descriptor, Set<Component>> snapshots = directories.sstableLister(Directories.OnTxnErr.IGNORE).snapshots(tag).list();
Refs<SSTableReader> refs = new Refs<>();
try
{

View File

@ -31,6 +31,7 @@ import java.nio.file.attribute.BasicFileAttributes;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
@ -92,7 +93,6 @@ public class Directories
public static final String BACKUPS_SUBDIR = "backups";
public static final String SNAPSHOT_SUBDIR = "snapshots";
public static final String TRANSACTIONS_SUBDIR = "transactions";
public static final String SECONDARY_INDEX_NAME_SEPARATOR = ".";
public static final DataDirectory[] dataDirectories;
@ -142,7 +142,7 @@ public class Directories
{
X, W, XW, R, XR, RW, XRW;
private FileAction()
FileAction()
{
}
@ -468,40 +468,6 @@ public class Directories
}
}
public static File getTransactionsDirectory(File folder)
{
return getOrCreate(folder, TRANSACTIONS_SUBDIR);
}
public List<File> getExistingDirectories(String subFolder)
{
List<File> ret = new ArrayList<>();
for (File dir : dataPaths)
{
File subDir = getExistingDirectory(dir, subFolder);
if (subDir != null)
ret.add(subDir);
}
return ret;
}
public static File getExistingDirectory(File folder, String subFolder)
{
File subDir = new File(folder, join(subFolder));
if (subDir.exists())
{
assert(subDir.isDirectory());
return subDir;
}
return null;
}
public SSTableLister sstableLister()
{
return new SSTableLister();
}
public static class DataDirectory
{
public final File location;
@ -549,10 +515,42 @@ public class Directories
}
}
/** The type of files that can be listed by SSTableLister. We never return txn logs;
* use LifecycleTransaction.getFiles() if you need txn logs. */
public enum FileType
{
/** A permanent sstable file that is safe to use. */
FINAL,
/** A temporary sstable file that will soon be deleted. */
TEMPORARY,
/** A transaction log file (contains information on final and temporary files). */
TXN_LOG
}
/**
* How to handle a failure to read a txn log file. Note that we will try a few
* times before giving up.
**/
public enum OnTxnErr
{
/** Throw the exception */
THROW,
/** Ignore the txn log file */
IGNORE
}
public SSTableLister sstableLister(OnTxnErr onTxnErr)
{
return new SSTableLister(onTxnErr);
}
public class SSTableLister
{
private final OnTxnErr onTxnErr;
private boolean skipTemporary;
private boolean onlyTemporary;
private boolean includeBackups;
private boolean onlyBackups;
private int nbFiles;
@ -560,6 +558,11 @@ public class Directories
private boolean filtered;
private String snapshotName;
private SSTableLister(OnTxnErr onTxnErr)
{
this.onTxnErr = onTxnErr;
}
public SSTableLister skipTemporary(boolean b)
{
if (filtered)
@ -568,14 +571,6 @@ public class Directories
return this;
}
public SSTableLister onlyTemporary(boolean b)
{
if (filtered)
throw new IllegalStateException("list() has already been called");
onlyTemporary = b;
return this;
}
public SSTableLister includeBackups(boolean b)
{
if (filtered)
@ -633,56 +628,54 @@ public class Directories
if (snapshotName != null)
{
getSnapshotDirectory(location, snapshotName).listFiles(getFilter(location));
LifecycleTransaction.getFiles(getSnapshotDirectory(location, snapshotName).toPath(), getFilter(), onTxnErr);
continue;
}
if (!onlyBackups)
location.listFiles(getFilter(location));
LifecycleTransaction.getFiles(location.toPath(), getFilter(), onTxnErr);
if (includeBackups)
getBackupsDirectory(location).listFiles(getFilter(location));
LifecycleTransaction.getFiles(getBackupsDirectory(location).toPath(), getFilter(), onTxnErr);
}
filtered = true;
}
private FileFilter getFilter(File location)
private BiFunction<File, FileType, Boolean> getFilter()
{
final Set<File> temporaryFiles = skipTemporary || onlyTemporary
? LifecycleTransaction.getTemporaryFiles(metadata, location)
: Collections.<File>emptySet();
return new FileFilter()
// This function always returns false since it adds to the components map
return (file, type) ->
{
// This function always return false since accepts adds to the components map
public boolean accept(File file)
switch (type)
{
if (file.isDirectory())
case TXN_LOG:
return false;
case TEMPORARY:
if (skipTemporary)
return false;
case FINAL:
Pair<Descriptor, Component> pair = SSTable.tryComponentFromFilename(file.getParentFile(), file.getName());
if (pair == null)
return false;
// we are only interested in the SSTable files that belong to the specific ColumnFamily
if (!pair.left.ksname.equals(metadata.ksName) || !pair.left.cfname.equals(metadata.cfName))
return false;
Set<Component> previous = components.get(pair.left);
if (previous == null)
{
previous = new HashSet<>();
components.put(pair.left, previous);
}
previous.add(pair.right);
nbFiles++;
return false;
Pair<Descriptor, Component> pair = SSTable.tryComponentFromFilename(file.getParentFile(), file.getName());
if (pair == null)
return false;
// we are only interested in the SSTable files that belong to the specific ColumnFamily
if (!pair.left.ksname.equals(metadata.ksName) || !pair.left.cfname.equals(metadata.cfName))
return false;
if (skipTemporary && temporaryFiles.contains(file))
return false;
if (onlyTemporary && !temporaryFiles.contains(file))
return false;
Set<Component> previous = components.get(pair.left);
if (previous == null)
{
previous = new HashSet<>();
components.put(pair.left, previous);
}
previous.add(pair.right);
nbFiles++;
return false;
default:
throw new AssertionError();
}
};
}
@ -917,7 +910,7 @@ public class Directories
{
super();
Builder<String> builder = ImmutableSet.builder();
for (File file : sstableLister().listFiles())
for (File file : sstableLister(Directories.OnTxnErr.THROW).listFiles())
builder.add(file.getName());
alive = builder.build();
}
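As a usage illustration, here is a hedged sketch of the new lister factory, which now takes an explicit OnTxnErr policy. The helper method and its name are assumptions for illustration; Directories, CFMetaData, Descriptor and Component are the existing Cassandra types.

// Sketch: enumerate live sstable components, ignoring unreadable txn log
// files and skipping sstables that belong to in-flight transactions.
static void listLiveComponents(CFMetaData metadata)
{
    Directories directories = new Directories(metadata);
    Directories.SSTableLister lister = directories.sstableLister(Directories.OnTxnErr.IGNORE)
                                                  .skipTemporary(true);
    for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet())
        System.out.println(entry.getKey() + " -> " + entry.getValue());
}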

View File

@ -58,7 +58,8 @@ public interface CompactionManagerMBean
/**
* Stop an individual running compaction using the compactionId.
* @param compactionId Compaction ID of compaction to stop. Such IDs can be found in
* the compactions_in_progress table of the system keyspace.
* the transaction log files whose names start with compaction_,
* located in the table transactions folder.
*/
public void stopCompactionById(String compactionId);
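As a hedged illustration of calling this MBean remotely (the MBean object name and the default JMX port 7199 follow standard Cassandra conventions; this sketch is not part of the patch):

import javax.management.JMX;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import org.apache.cassandra.db.compaction.CompactionManagerMBean;

public class StopCompactionSketch
{
    public static void main(String[] args) throws Exception
    {
        // args[0]: the compaction ID, taken from a transaction log file whose
        // name starts with "compaction_" in the table's transactions folder.
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:7199/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url))
        {
            MBeanServerConnection connection = connector.getMBeanServerConnection();
            CompactionManagerMBean proxy = JMX.newMBeanProxy(connection,
                new ObjectName("org.apache.cassandra.db:type=CompactionManager"),
                CompactionManagerMBean.class);
            proxy.stopCompactionById(args[0]);
        }
    }
}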

View File

@ -69,15 +69,6 @@ public class Upgrader
private SSTableWriter createCompactionWriter(long repairedAt)
{
MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.getComparator());
// Get the max timestamp of the precompacted sstables
// and adds generation of live ancestors
sstableMetadataCollector.addAncestor(sstable.descriptor.generation);
for (Integer i : sstable.getAncestors())
{
if (new File(sstable.descriptor.withGeneration(i).filenameFor(Component.DATA)).exists())
sstableMetadataCollector.addAncestor(i);
}
sstableMetadataCollector.sstableLevel(sstable.getSSTableLevel());
return SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(directory)),
estimatedRows,

View File

@ -45,7 +45,6 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
private long partitionsWritten = 0;
private long totalWrittenInLevel = 0;
private int sstablesWritten = 0;
private final boolean skipAncestors;
public MajorLeveledCompactionWriter(ColumnFamilyStore cfs,
LifecycleTransaction txn,
@ -70,17 +69,13 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(nonExpiredSSTables) / maxSSTableSize);
long keysPerSSTable = estimatedTotalKeys / estimatedSSTables;
File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize));
skipAncestors = estimatedSSTables * allSSTables.size() > 200000; // magic number, avoid storing too much ancestor information since allSSTables are ancestors to *all* resulting sstables
if (skipAncestors)
logger.warn("Many sstables involved in compaction, skipping storing ancestor information to avoid running out of memory");
@SuppressWarnings("resource")
SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
keysPerSSTable,
minRepairedAt,
cfs.metadata,
new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel, skipAncestors),
new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel),
SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
txn);
sstableWriter.switchWriter(writer);
@ -108,7 +103,7 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
averageEstimatedKeysPerSSTable,
minRepairedAt,
cfs.metadata,
new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel, skipAncestors),
new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel),
SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
txn);
sstableWriter.switchWriter(writer);

View File

@ -106,12 +106,12 @@ class Helpers
assert !reader.isReplaced();
}
static Throwable markObsolete(List<TransactionLogs.Obsoletion> obsoletions, Throwable accumulate)
static Throwable markObsolete(List<TransactionLog.Obsoletion> obsoletions, Throwable accumulate)
{
if (obsoletions == null || obsoletions.isEmpty())
return accumulate;
for (TransactionLogs.Obsoletion obsoletion : obsoletions)
for (TransactionLog.Obsoletion obsoletion : obsoletions)
{
try
{
@ -125,13 +125,13 @@ class Helpers
return accumulate;
}
static Throwable prepareForObsoletion(Iterable<SSTableReader> readers, TransactionLogs txnLogs, List<TransactionLogs.Obsoletion> obsoletions, Throwable accumulate)
static Throwable prepareForObsoletion(Iterable<SSTableReader> readers, TransactionLog txnLogs, List<TransactionLog.Obsoletion> obsoletions, Throwable accumulate)
{
for (SSTableReader reader : readers)
{
try
{
obsoletions.add(new TransactionLogs.Obsoletion(reader, txnLogs.obsoleted(reader)));
obsoletions.add(new TransactionLog.Obsoletion(reader, txnLogs.obsoleted(reader)));
}
catch (Throwable t)
{
@ -141,12 +141,12 @@ class Helpers
return accumulate;
}
static Throwable abortObsoletion(List<TransactionLogs.Obsoletion> obsoletions, Throwable accumulate)
static Throwable abortObsoletion(List<TransactionLog.Obsoletion> obsoletions, Throwable accumulate)
{
if (obsoletions == null || obsoletions.isEmpty())
return accumulate;
for (TransactionLogs.Obsoletion obsoletion : obsoletions)
for (TransactionLog.Obsoletion obsoletion : obsoletions)
{
try
{

View File

@ -18,10 +18,11 @@
package org.apache.cassandra.db.lifecycle;
import java.io.File;
import java.nio.file.Path;
import java.util.*;
import java.util.function.BiFunction;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.*;
@ -29,6 +30,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.format.SSTableReader;
@ -96,7 +98,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
public final Tracker tracker;
// The transaction logs keep track of new and old sstable files
private final TransactionLogs transactionLogs;
private final TransactionLog transactionLog;
// the original readers this transaction was opened over, and that it guards
// (no other transactions may operate over these readers concurrently)
private final Set<SSTableReader> originals = new HashSet<>();
@ -113,7 +115,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
private final State staged = new State();
// the tidier and their readers, to be used for marking readers obsoleted during a commit
private List<TransactionLogs.Obsoletion> obsoletions;
private List<TransactionLog.Obsoletion> obsoletions;
/**
* construct a Transaction for use in an offline operation
@ -141,7 +143,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
public static LifecycleTransaction offline(OperationType operationType, CFMetaData metadata)
{
Tracker dummy = new Tracker(null, false);
return new LifecycleTransaction(dummy, new TransactionLogs(operationType, metadata, dummy), Collections.emptyList());
return new LifecycleTransaction(dummy, new TransactionLog(operationType, metadata, dummy), Collections.emptyList());
}
/**
@ -150,18 +152,18 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
public static LifecycleTransaction offline(OperationType operationType, File operationFolder)
{
Tracker dummy = new Tracker(null, false);
return new LifecycleTransaction(dummy, new TransactionLogs(operationType, operationFolder, dummy), Collections.emptyList());
return new LifecycleTransaction(dummy, new TransactionLog(operationType, operationFolder, dummy), Collections.emptyList());
}
LifecycleTransaction(Tracker tracker, OperationType operationType, Iterable<SSTableReader> readers)
{
this(tracker, new TransactionLogs(operationType, getMetadata(tracker, readers), tracker), readers);
this(tracker, new TransactionLog(operationType, getMetadata(tracker, readers), tracker), readers);
}
LifecycleTransaction(Tracker tracker, TransactionLogs transactionLogs, Iterable<SSTableReader> readers)
LifecycleTransaction(Tracker tracker, TransactionLog transactionLog, Iterable<SSTableReader> readers)
{
this.tracker = tracker;
this.transactionLogs = transactionLogs;
this.transactionLog = transactionLog;
for (SSTableReader reader : readers)
{
originals.add(reader);
@ -185,19 +187,19 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
return null;
}
public TransactionLogs logs()
public TransactionLog log()
{
return transactionLogs;
return transactionLog;
}
public OperationType opType()
{
return transactionLogs.getType();
return transactionLog.getType();
}
public UUID opId()
{
return transactionLogs.getId();
return transactionLog.getId();
}
public void doPrepare()
@ -210,8 +212,8 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
// prepare for compaction obsolete readers as long as they were part of the original set
// since those that are not original are early readers that share the same desc with the finals
maybeFail(prepareForObsoletion(filterIn(logged.obsolete, originals), transactionLogs, obsoletions = new ArrayList<>(), null));
transactionLogs.prepareToCommit();
maybeFail(prepareForObsoletion(filterIn(logged.obsolete, originals), transactionLog, obsoletions = new ArrayList<>(), null));
transactionLog.prepareToCommit();
}
/**
@ -226,7 +228,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
maybeFail(accumulate);
// transaction log commit failure means we must abort; safe commit is not possible
maybeFail(transactionLogs.commit(null));
maybeFail(transactionLog.commit(null));
// this is now the point of no return; we cannot safely rollback, so we ignore exceptions until we're done
// we restore state by obsoleting our obsolete files, releasing our references to them, and updating our size
@ -235,7 +237,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
accumulate = markObsolete(obsoletions, accumulate);
accumulate = tracker.updateSizeTracking(logged.obsolete, logged.update, accumulate);
accumulate = release(selfRefs(logged.obsolete), accumulate);
accumulate = tracker.notifySSTablesChanged(originals, logged.update, transactionLogs.getType(), accumulate);
accumulate = tracker.notifySSTablesChanged(originals, logged.update, transactionLog.getType(), accumulate);
return accumulate;
}
@ -251,16 +253,16 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
accumulate = abortObsoletion(obsoletions, accumulate);
if (logged.isEmpty() && staged.isEmpty())
return transactionLogs.abort(accumulate);
return transactionLog.abort(accumulate);
// mark obsolete all readers that are not versions of those present in the original set
Iterable<SSTableReader> obsolete = filterOut(concatUniq(staged.update, logged.update), originals);
logger.debug("Obsoleting {}", obsolete);
accumulate = prepareForObsoletion(obsolete, transactionLogs, obsoletions = new ArrayList<>(), accumulate);
accumulate = prepareForObsoletion(obsolete, transactionLog, obsoletions = new ArrayList<>(), accumulate);
// it's safe to abort even if committed, see maybeFail in doCommit() above, in this case it will just report
// a failure to abort, which is useful information to have for debug
accumulate = transactionLogs.abort(accumulate);
accumulate = transactionLog.abort(accumulate);
accumulate = markObsolete(obsoletions, accumulate);
// replace all updated readers with a version restored to its original state
@ -491,7 +493,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
originals.remove(reader);
marked.remove(reader);
}
return new LifecycleTransaction(tracker, transactionLogs.getType(), readers);
return new LifecycleTransaction(tracker, transactionLog.getType(), readers);
}
/**
@ -524,27 +526,34 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
public void trackNew(SSTable table)
{
transactionLogs.trackNew(table);
transactionLog.trackNew(table);
}
public void untrackNew(SSTable table)
{
transactionLogs.untrackNew(table);
transactionLog.untrackNew(table);
}
public static void removeUnfinishedLeftovers(CFMetaData metadata)
{
TransactionLogs.removeUnfinishedLeftovers(metadata);
TransactionLog.removeUnfinishedLeftovers(metadata);
}
public static Set<File> getTemporaryFiles(CFMetaData metadata, File folder)
/**
* Get the files in the folder specified, provided that the filter returns true.
* A filter is given each file and its type, and decides which files should be returned
* and which should be discarded. To classify files into their type, we read transaction
* log files. Should we fail to read these log files after a few attempts, we look at onTxnErr
* to determine what to do.
*
* @param folder - the folder to scan
* @param onTxnErr - how to handle a failure to read a txn log file
* @param filter - a function that receives each file and its type; it should return true for the file to be returned
* @return - the list of files that were scanned and for which the filter returned true
*/
public static List<File> getFiles(Path folder, BiFunction<File, Directories.FileType, Boolean> filter, Directories.OnTxnErr onTxnErr)
{
return TransactionLogs.getTemporaryFiles(metadata, folder);
}
public static Set<File> getLogFiles(CFMetaData metadata)
{
return TransactionLogs.getLogFiles(metadata);
return new TransactionLog.FileLister(folder, filter, onTxnErr).list();
}
// a class representing the current state of the reader within this transaction, encoding the actions both logged and staged
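A brief, hedged usage sketch of the new getFiles entry point (the helper name is hypothetical):

// Sketch: collect only FINAL sstable files from a data folder; temporary
// files and the txn log files themselves are excluded by their FileType.
static List<File> finalFilesIn(Path folder)
{
    return LifecycleTransaction.getFiles(folder,
                                         (file, type) -> type == Directories.FileType.FINAL,
                                         Directories.OnTxnErr.IGNORE);
}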

View File

@ -227,7 +227,7 @@ public class Tracker
*/
public Throwable dropSSTables(final Predicate<SSTableReader> remove, OperationType operationType, Throwable accumulate)
{
try (TransactionLogs txnLogs = new TransactionLogs(operationType, cfstore.metadata, this))
try (TransactionLog txnLogs = new TransactionLog(operationType, cfstore.metadata, this))
{
Pair<View, View> result = apply(view -> {
Set<SSTableReader> toremove = copyOf(filter(view.sstables, and(remove, notIn(view.compacting))));
@ -239,7 +239,7 @@ public class Tracker
// It is important that any method accepting/returning a Throwable never throws an exception, and does its best
// to complete the instructions given to it
List<TransactionLogs.Obsoletion> obsoletions = new ArrayList<>();
List<TransactionLog.Obsoletion> obsoletions = new ArrayList<>();
accumulate = prepareForObsoletion(removed, txnLogs, obsoletions, accumulate);
try
{

File diff suppressed because it is too large

View File

@ -1,786 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.lifecycle;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.cassandra.utils.Throwables.merge;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.CLibrary;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.concurrent.Blocker;
import org.apache.cassandra.utils.concurrent.Ref;
import org.apache.cassandra.utils.concurrent.RefCounted;
import org.apache.cassandra.utils.concurrent.Transactional;
/**
* IMPORTANT: When this object is involved in a transactional graph, and is not encapsulated in a LifecycleTransaction,
* for correct behaviour its commit MUST occur before any others, since it may legitimately fail. This is consistent
* with the Transactional API, which permits one failing action to occur at the beginning of the commit phase, but also
* *requires* that the prepareToCommit() phase only take actions that can be rolled back.
*
* A class that tracks sstable files involved in a transaction across sstables:
* if the transaction succeeds the old files should be deleted and the new ones kept; vice-versa if it fails.
*
* Two log files, NEW and OLD, contain new and old sstable files respectively. The log files also track each
* other by referencing each others path in the contents.
*
* If the transaction finishes successfully:
* - the OLD transaction file is deleted along with its contents, this includes the NEW transaction file.
* Before deleting we must let the SSTableTidier instances run first for any old readers that are being obsoleted
* (mark as compacted) by the transaction, see LifecycleTransaction
*
* If the transaction is aborted:
* - the NEW transaction file and its contents are deleted, this includes the OLD transaction file
*
* On start-up:
* - If we find a NEW transaction file, it means the transaction did not complete and we delete the NEW file and its contents
* - If we find an OLD transaction file but not a NEW file, it means the transaction must have completed and so we delete
* all the contents of the OLD file, if they still exist, and the OLD file itself.
*
* See CASSANDRA-7066 for full details.
*/
public class TransactionLogs extends Transactional.AbstractTransactional implements Transactional
{
private static final Logger logger = LoggerFactory.getLogger(TransactionLogs.class);
/**
* A single transaction log file, either NEW or OLD.
*/
final static class TransactionFile
{
static String EXT = ".log";
static char SEP = '_';
static String REGEX_STR = String.format("^(.*)_(.*)_(%s|%s)%s$", Type.NEW.txt, Type.OLD.txt, EXT);
static Pattern REGEX = Pattern.compile(REGEX_STR); //(opname)_(id)_(new|old).data
public enum Type
{
NEW (0, "new"),
OLD (1, "old");
public final int idx;
public final String txt;
Type(int idx, String txt)
{
this.idx = idx;
this.txt = txt;
}
};
public final Type type;
public final File file;
public final TransactionData parent;
public final Set<String> lines = new HashSet<>();
public TransactionFile(Type type, TransactionData parent)
{
this.type = type;
this.file = new File(parent.getFileName(type));
this.parent = parent;
if (exists())
lines.addAll(FileUtils.readLines(file));
}
public boolean add(SSTable table)
{
return add(table.descriptor.baseFilename());
}
private boolean add(String path)
{
String relativePath = FileUtils.getRelativePath(parent.getParentFolder(), path);
if (lines.contains(relativePath))
return false;
lines.add(relativePath);
FileUtils.append(file, relativePath);
return true;
}
public void remove(SSTable table)
{
String relativePath = FileUtils.getRelativePath(parent.getParentFolder(), table.descriptor.baseFilename());
assert lines.contains(relativePath) : String.format("%s is not tracked by %s", relativePath, file);
lines.remove(relativePath);
delete(relativePath);
}
public boolean contains(SSTable table)
{
String relativePath = FileUtils.getRelativePath(parent.getParentFolder(), table.descriptor.baseFilename());
return lines.contains(relativePath);
}
private void deleteContents()
{
deleteOpposite();
// we sync the parent file descriptor between opposite log deletion and
// contents deletion to ensure there is a happens before edge between them
parent.sync();
lines.forEach(line -> delete(line));
lines.clear();
}
private void deleteOpposite()
{
Type oppositeType = type == Type.NEW ? Type.OLD : Type.NEW;
String oppositeFile = FileUtils.getRelativePath(parent.getParentFolder(), parent.getFileName(oppositeType));
assert lines.contains(oppositeFile) : String.format("Could not find %s amongst lines", oppositeFile);
delete(oppositeFile);
lines.remove(oppositeFile);
}
private void delete(String relativePath)
{
getTrackedFiles(relativePath).forEach(file -> TransactionLogs.delete(file));
}
public Set<File> getTrackedFiles()
{
Set<File> ret = new HashSet<>();
FileUtils.readLines(file).forEach(line -> ret.addAll(getTrackedFiles(line)));
ret.add(file);
return ret;
}
private List<File> getTrackedFiles(String relativePath)
{
List<File> ret = new ArrayList<>();
File file = new File(StringUtils.join(parent.getParentFolder(), File.separator, relativePath));
if (file.exists())
ret.add(file);
else
ret.addAll(Arrays.asList(new File(parent.getParentFolder()).listFiles((dir, name) -> {
return name.startsWith(relativePath);
})));
return ret;
}
public void delete(boolean deleteContents)
{
assert file.exists() : String.format("Expected %s to exist", file);
if (deleteContents)
deleteContents();
// we sync the parent file descriptor between contents and log deletion
// to ensure there is a happens before edge between them
parent.sync();
TransactionLogs.delete(file);
}
public boolean exists()
{
return file.exists();
}
}
/**
* We split the transaction data from the behavior because we need
* to reconstruct any left-overs and clean them up, as well as work
* out which files are temporary. So for these cases we don't want the full
* transactional behavior, plus it's handy for the TransactionTidier.
*/
final static class TransactionData implements AutoCloseable
{
private final OperationType opType;
private final UUID id;
private final File folder;
private final TransactionFile[] files;
private int folderDescriptor;
private boolean succeeded;
static TransactionData make(File logFile)
{
Matcher matcher = TransactionFile.REGEX.matcher(logFile.getName());
assert matcher.matches();
OperationType operationType = OperationType.fromFileName(matcher.group(1));
UUID id = UUID.fromString(matcher.group(2));
return new TransactionData(operationType, logFile.getParentFile(), id);
}
TransactionData(OperationType opType, File folder, UUID id)
{
this.opType = opType;
this.id = id;
this.folder = folder;
this.files = new TransactionFile[TransactionFile.Type.values().length];
for (TransactionFile.Type t : TransactionFile.Type.values())
this.files[t.idx] = new TransactionFile(t, this);
this.folderDescriptor = CLibrary.tryOpenDirectory(folder.getPath());
this.succeeded = !newLog().exists() && oldLog().exists();
}
public void succeeded(boolean succeeded)
{
this.succeeded = succeeded;
}
public void close()
{
if (folderDescriptor > 0)
{
CLibrary.tryCloseFD(folderDescriptor);
folderDescriptor = -1;
}
}
void crossReference()
{
newLog().add(oldLog().file.getPath());
oldLog().add(newLog().file.getPath());
}
void sync()
{
if (folderDescriptor > 0)
CLibrary.trySync(folderDescriptor);
}
TransactionFile newLog()
{
return files[TransactionFile.Type.NEW.idx];
}
TransactionFile oldLog()
{
return files[TransactionFile.Type.OLD.idx];
}
OperationType getType()
{
return opType;
}
UUID getId()
{
return id;
}
Throwable removeUnfinishedLeftovers(Throwable accumulate)
{
try
{
if (succeeded)
oldLog().delete(true);
else
newLog().delete(true);
}
catch (Throwable t)
{
accumulate = merge(accumulate, t);
}
return accumulate;
}
Set<File> getTemporaryFiles()
{
sync();
if (newLog().exists())
return newLog().getTrackedFiles();
else
return oldLog().getTrackedFiles();
}
String getFileName(TransactionFile.Type type)
{
String fileName = StringUtils.join(opType.fileName,
TransactionFile.SEP,
id.toString(),
TransactionFile.SEP,
type.txt,
TransactionFile.EXT);
return StringUtils.join(folder, File.separator, fileName);
}
String getParentFolder()
{
return folder.getParent();
}
static boolean isLogFile(String name)
{
return TransactionFile.REGEX.matcher(name).matches();
}
}
private final Tracker tracker;
private final TransactionData data;
private final Ref<TransactionLogs> selfRef;
// Deleting sstables is tricky because the mmapping might not have been finalized yet,
// and delete will fail (on Windows) until it is (we only force the unmapping on SUN VMs).
// Additionally, we need to make sure to delete the data file first, so on restart the others
// will be recognized as GCable.
private static final Queue<Runnable> failedDeletions = new ConcurrentLinkedQueue<>();
private static final Blocker blocker = new Blocker();
TransactionLogs(OperationType opType, CFMetaData metadata)
{
this(opType, metadata, null);
}
TransactionLogs(OperationType opType, CFMetaData metadata, Tracker tracker)
{
this(opType, new Directories(metadata), tracker);
}
TransactionLogs(OperationType opType, Directories directories, Tracker tracker)
{
this(opType, directories.getDirectoryForNewSSTables(), tracker);
}
TransactionLogs(OperationType opType, File folder, Tracker tracker)
{
this.tracker = tracker;
this.data = new TransactionData(opType,
Directories.getTransactionsDirectory(folder),
UUIDGen.getTimeUUID());
this.selfRef = new Ref<>(this, new TransactionTidier(data));
data.crossReference();
if (logger.isDebugEnabled())
logger.debug("Created transaction logs with id {}", data.id);
}
/**
* Track a reader as new.
**/
void trackNew(SSTable table)
{
if (!data.newLog().add(table))
throw new IllegalStateException(table + " is already tracked as new");
data.newLog().add(table);
}
/**
* Stop tracking a reader as new.
*/
void untrackNew(SSTable table)
{
data.newLog().remove(table);
}
/**
* Schedule a reader for deletion as soon as it is fully unreferenced and the transaction
* has been committed.
*/
SSTableTidier obsoleted(SSTableReader reader)
{
if (data.newLog().contains(reader))
{
if (data.oldLog().contains(reader))
throw new IllegalArgumentException();
return new SSTableTidier(reader, true, this);
}
if (!data.oldLog().add(reader))
throw new IllegalStateException();
if (tracker != null)
tracker.notifyDeleting(reader);
return new SSTableTidier(reader, false, this);
}
OperationType getType()
{
return data.getType();
}
UUID getId()
{
return data.getId();
}
@VisibleForTesting
String getDataFolder()
{
return data.getParentFolder();
}
@VisibleForTesting
String getLogsFolder()
{
return StringUtils.join(getDataFolder(), File.separator, Directories.TRANSACTIONS_SUBDIR);
}
@VisibleForTesting
TransactionData getData()
{
return data;
}
private static void delete(File file)
{
try
{
if (logger.isDebugEnabled())
logger.debug("Deleting {}", file);
Files.delete(file.toPath());
}
catch (NoSuchFileException e)
{
logger.error("Unable to delete {} as it does not exist", file);
}
catch (IOException e)
{
logger.error("Unable to delete {}", file, e);
throw new RuntimeException(e);
}
}
/**
* The transaction tidier.
*
* When the transaction reference is fully released we try to delete all the obsolete files
* depending on the transaction result.
*/
private static class TransactionTidier implements RefCounted.Tidy, Runnable
{
private final TransactionData data;
public TransactionTidier(TransactionData data)
{
this.data = data;
}
public void tidy() throws Exception
{
run();
}
public String name()
{
return data.id.toString();
}
public void run()
{
if (logger.isDebugEnabled())
logger.debug("Removing files for transaction {}", name());
Throwable err = data.removeUnfinishedLeftovers(null);
if (err != null)
{
logger.info("Failed deleting files for transaction {}, we'll retry after GC and on on server restart", name(), err);
failedDeletions.add(this);
}
else
{
if (logger.isDebugEnabled())
logger.debug("Closing file transaction {}", name());
data.close();
}
}
}
static class Obsoletion
{
final SSTableReader reader;
final SSTableTidier tidier;
public Obsoletion(SSTableReader reader, SSTableTidier tidier)
{
this.reader = reader;
this.tidier = tidier;
}
}
/**
* The SSTableReader tidier. When a reader is fully released and no longer referenced
* by anyone, we run this. It keeps a reference to the parent transaction and releases
* it when done, so that the final transaction cleanup can run when all obsolete readers
* are released.
*/
public static class SSTableTidier implements Runnable
{
// must not retain a reference to the SSTableReader, else leak detection cannot kick in
private final Descriptor desc;
private final long sizeOnDisk;
private final Tracker tracker;
private final boolean wasNew;
private final Ref<TransactionLogs> parentRef;
public SSTableTidier(SSTableReader referent, boolean wasNew, TransactionLogs parent)
{
this.desc = referent.descriptor;
this.sizeOnDisk = referent.bytesOnDisk();
this.tracker = parent.tracker;
this.wasNew = wasNew;
this.parentRef = parent.selfRef.tryRef();
}
public void run()
{
blocker.ask();
SystemKeyspace.clearSSTableReadMeter(desc.ksname, desc.cfname, desc.generation);
try
{
// If we can't successfully delete the DATA component, set the task to be retried later: see TransactionTidier
File datafile = new File(desc.filenameFor(Component.DATA));
delete(datafile);
// let the remainder be cleaned up by delete
SSTable.delete(desc, SSTable.discoverComponentsFor(desc));
}
catch (Throwable t)
{
logger.error("Failed deletion for {}, we'll retry after GC and on server restart", desc);
failedDeletions.add(this);
return;
}
if (tracker != null && tracker.cfstore != null && !wasNew)
tracker.cfstore.metric.totalDiskSpaceUsed.dec(sizeOnDisk);
// release the referent to the parent so that the all transaction files can be released
parentRef.release();
}
public void abort()
{
parentRef.release();
}
}
/**
* Retry all deletions that failed the first time around (presumably b/c the sstable was still mmap'd.)
* Useful because there are times when we know GC has been invoked; also exposed as an mbean.
*/
public static void rescheduleFailedDeletions()
{
Runnable task;
while ( null != (task = failedDeletions.poll()))
ScheduledExecutors.nonPeriodicTasks.submit(task);
}
/**
* Deletions run on the nonPeriodicTasks executor, (both failedDeletions or global tidiers in SSTableReader)
* so by scheduling a new empty task and waiting for it we ensure any prior deletion has completed.
*/
public static void waitForDeletions()
{
FBUtilities.waitOnFuture(ScheduledExecutors.nonPeriodicTasks.schedule(() -> {
}, 0, TimeUnit.MILLISECONDS));
}
@VisibleForTesting
public static void pauseDeletions(boolean stop)
{
blocker.block(stop);
}
private Throwable complete(Throwable accumulate)
{
try
{
try
{
if (data.succeeded)
data.newLog().delete(false);
else
data.oldLog().delete(false);
}
catch (Throwable t)
{
accumulate = merge(accumulate, t);
}
accumulate = selfRef.ensureReleased(accumulate);
return accumulate;
}
catch (Throwable t)
{
logger.error("Failed to complete file transaction {}", getId(), t);
return Throwables.merge(accumulate, t);
}
}
protected Throwable doCommit(Throwable accumulate)
{
data.succeeded(true);
return complete(accumulate);
}
protected Throwable doAbort(Throwable accumulate)
{
data.succeeded(false);
return complete(accumulate);
}
protected void doPrepare() { }
/**
* Called on startup to scan existing folders for any unfinished leftovers of
* operations that were ongoing when the process exited.
*
* We check if the new transaction file exists first, and if so we clean it up
* along with its contents, which includes the old file, else if only the old file exists
* it means the operation has completed and we only cleanup the old file with its contents.
*/
static void removeUnfinishedLeftovers(CFMetaData metadata)
{
Throwable accumulate = null;
Set<UUID> ids = new HashSet<>();
for (File dir : getFolders(metadata, null))
{
File[] logs = dir.listFiles((dir1, name) -> {
return TransactionData.isLogFile(name);
});
for (File log : logs)
{
try (TransactionData data = TransactionData.make(log))
{
// we need to check this because there are potentially 2 log files per operation
if (ids.contains(data.id))
continue;
ids.add(data.id);
accumulate = data.removeUnfinishedLeftovers(accumulate);
}
}
}
if (accumulate != null)
logger.error("Failed to remove unfinished transaction leftovers", accumulate);
}
/**
* Return a set of files that are temporary, that is they are involved with
* a transaction that hasn't completed yet.
*
* Only return the files that exist and that are located in the folder
* specified as a parameter or its sub-folders.
*/
static Set<File> getTemporaryFiles(CFMetaData metadata, File folder)
{
Set<File> ret = new HashSet<>();
Set<UUID> ids = new HashSet<>();
for (File dir : getFolders(metadata, folder))
{
File[] logs = dir.listFiles((dir1, name) -> {
return TransactionData.isLogFile(name);
});
for (File log : logs)
{
try(TransactionData data = TransactionData.make(log))
{
// we need to check this because there are potentially 2 log files per transaction
if (ids.contains(data.id))
continue;
ids.add(data.id);
ret.addAll(data.getTemporaryFiles()
.stream()
.filter(file -> FileUtils.isContained(folder, file))
.collect(Collectors.toSet()));
}
}
}
return ret;
}
/**
* Return the transaction log files that currently exist for this table.
*/
static Set<File> getLogFiles(CFMetaData metadata)
{
Set<File> ret = new HashSet<>();
for (File dir : getFolders(metadata, null))
ret.addAll(Arrays.asList(dir.listFiles((dir1, name) -> {
return TransactionData.isLogFile(name);
})));
return ret;
}
/**
* A utility method to work out the existing transaction sub-folders
* either for a table, or a specific parent folder, or both.
*/
private static List<File> getFolders(CFMetaData metadata, File folder)
{
List<File> ret = new ArrayList<>();
if (metadata != null)
{
Directories directories = new Directories(metadata);
ret.addAll(directories.getExistingDirectories(Directories.TRANSACTIONS_SUBDIR));
}
if (folder != null)
{
File opDir = Directories.getExistingDirectory(folder, Directories.TRANSACTIONS_SUBDIR);
if (opDir != null)
ret.add(opDir);
}
return ret;
}
}

View File

@ -18,7 +18,6 @@
package org.apache.cassandra.io.sstable;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.net.InetAddress;
import java.util.*;
@ -27,16 +26,13 @@ import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.streaming.*;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.OutputHandler;
import org.apache.cassandra.utils.Pair;
@ -77,89 +73,79 @@ public class SSTableLoader implements StreamEventHandler
{
outputHandler.output("Opening sstables and calculating sections to stream");
directory.list(new FilenameFilter()
{
final Map<File, Set<File>> allTemporaryFiles = new HashMap<>();
public boolean accept(File dir, String name)
{
File file = new File(dir, name);
LifecycleTransaction.getFiles(directory.toPath(),
(file, type) ->
{
File dir = file.getParentFile();
String name = file.getName();
if (file.isDirectory())
return false;
if (type != Directories.FileType.FINAL)
{
outputHandler.output(String.format("Skipping temporary file %s", name));
return false;
}
Pair<Descriptor, Component> p = SSTable.tryComponentFromFilename(dir, name);
Descriptor desc = p == null ? null : p.left;
if (p == null || !p.right.equals(Component.DATA))
return false;
Pair<Descriptor, Component> p = SSTable.tryComponentFromFilename(dir, name);
Descriptor desc = p == null ? null : p.left;
if (p == null || !p.right.equals(Component.DATA))
return false;
if (!new File(desc.filenameFor(Component.PRIMARY_INDEX)).exists())
{
outputHandler.output(String.format("Skipping file %s because index is missing", name));
return false;
}
if (!new File(desc.filenameFor(Component.PRIMARY_INDEX)).exists())
{
outputHandler.output(String.format("Skipping file %s because index is missing", name));
return false;
}
CFMetaData metadata = client.getTableMetadata(desc.cfname);
if (metadata == null)
{
outputHandler.output(String.format("Skipping file %s: table %s.%s doesn't exist", name, keyspace, desc.cfname));
return false;
}
CFMetaData metadata = client.getTableMetadata(desc.cfname);
if (metadata == null)
{
outputHandler.output(String.format("Skipping file %s: table %s.%s doesn't exist", name, keyspace, desc.cfname));
return false;
}
Set<File> temporaryFiles = allTemporaryFiles.get(dir);
if (temporaryFiles == null)
{
temporaryFiles = LifecycleTransaction.getTemporaryFiles(metadata, dir);
allTemporaryFiles.put(dir, temporaryFiles);
}
Set<Component> components = new HashSet<>();
components.add(Component.DATA);
components.add(Component.PRIMARY_INDEX);
if (new File(desc.filenameFor(Component.SUMMARY)).exists())
components.add(Component.SUMMARY);
if (new File(desc.filenameFor(Component.COMPRESSION_INFO)).exists())
components.add(Component.COMPRESSION_INFO);
if (new File(desc.filenameFor(Component.STATS)).exists())
components.add(Component.STATS);
if (temporaryFiles.contains(file))
{
outputHandler.output(String.format("Skipping temporary file %s", name));
return false;
}
try
{
// To conserve memory, open SSTableReaders without bloom filters and discard
// the index summary after calculating the file sections to stream and the estimated
// number of keys for each endpoint. See CASSANDRA-5555 for details.
SSTableReader sstable = SSTableReader.openForBatch(desc, components, metadata);
sstables.add(sstable);
Set<Component> components = new HashSet<>();
components.add(Component.DATA);
components.add(Component.PRIMARY_INDEX);
if (new File(desc.filenameFor(Component.SUMMARY)).exists())
components.add(Component.SUMMARY);
if (new File(desc.filenameFor(Component.COMPRESSION_INFO)).exists())
components.add(Component.COMPRESSION_INFO);
if (new File(desc.filenameFor(Component.STATS)).exists())
components.add(Component.STATS);
// calculate the sstable sections to stream as well as the estimated number of
// keys per host
for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : ranges.entrySet())
{
InetAddress endpoint = entry.getKey();
Collection<Range<Token>> tokenRanges = entry.getValue();
try
{
// To conserve memory, open SSTableReaders without bloom filters and discard
// the index summary after calculating the file sections to stream and the estimated
// number of keys for each endpoint. See CASSANDRA-5555 for details.
SSTableReader sstable = SSTableReader.openForBatch(desc, components, metadata);
sstables.add(sstable);
List<Pair<Long, Long>> sstableSections = sstable.getPositionsForRanges(tokenRanges);
long estimatedKeys = sstable.estimatedKeysForRanges(tokenRanges);
Ref<SSTableReader> ref = sstable.ref();
StreamSession.SSTableStreamingSections details = new StreamSession.SSTableStreamingSections(ref, sstableSections, estimatedKeys, ActiveRepairService.UNREPAIRED_SSTABLE);
streamingDetails.put(endpoint, details);
}
// calculate the sstable sections to stream as well as the estimated number of
// keys per host
for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : ranges.entrySet())
{
InetAddress endpoint = entry.getKey();
Collection<Range<Token>> tokenRanges = entry.getValue();
// to conserve heap space when bulk loading
sstable.releaseSummary();
}
catch (IOException e)
{
outputHandler.output(String.format("Skipping file %s, error opening it: %s", name, e.getMessage()));
}
return false;
},
Directories.OnTxnErr.IGNORE);
List<Pair<Long, Long>> sstableSections = sstable.getPositionsForRanges(tokenRanges);
long estimatedKeys = sstable.estimatedKeysForRanges(tokenRanges);
Ref<SSTableReader> ref = sstable.ref();
StreamSession.SSTableStreamingSections details = new StreamSession.SSTableStreamingSections(ref, sstableSections, estimatedKeys, ActiveRepairService.UNREPAIRED_SSTABLE);
streamingDetails.put(endpoint, details);
}
// to conserve heap space when bulk loading
sstable.releaseSummary();
}
catch (IOException e)
{
outputHandler.output(String.format("Skipping file %s, error opening it: %s", name, e.getMessage()));
}
return false;
}
});
return sstables;
}

View File

@ -45,7 +45,7 @@ import org.apache.cassandra.db.*;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.db.lifecycle.TransactionLogs;
import org.apache.cassandra.db.lifecycle.TransactionLog;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.io.FSError;
@ -1647,7 +1647,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
* @return true if the this is the first time the file was marked obsolete. Calling this
* multiple times is usually buggy (see exceptions in Tracker.unmarkCompacting and removeOldSSTablesSize).
*/
public void markObsolete(TransactionLogs.SSTableTidier tidier)
public void markObsolete(TransactionLog.SSTableTidier tidier)
{
if (logger.isDebugEnabled())
logger.debug("Marking {} compacted", getFilename());
@ -1903,22 +1903,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
: (sstableMetadata.totalRows == 0 ? 0 : (int)(sstableMetadata.totalColumnsSet / sstableMetadata.totalRows));
}
public Set<Integer> getAncestors()
{
try
{
CompactionMetadata compactionMetadata = (CompactionMetadata) descriptor.getMetadataSerializer().deserialize(descriptor, MetadataType.COMPACTION);
if (compactionMetadata != null)
return compactionMetadata.ancestors;
return Collections.emptySet();
}
catch (IOException e)
{
SSTableReader.logOpenException(descriptor, e);
return Collections.emptySet();
}
}
public int getSSTableLevel()
{
return sstableMetadata.sstableLevel;
@ -2191,7 +2175,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
// sstable have been released
private ScheduledFuture readMeterSyncFuture;
// shared state managing if the logical sstable has been compacted; this is used in cleanup
private volatile TransactionLogs.SSTableTidier obsoletion;
private volatile TransactionLog.SSTableTidier obsoletion;
GlobalTidy(final SSTableReader reader)
{

View File

@ -66,6 +66,8 @@ public abstract class Version
public abstract boolean hasOldBfHashOrder();
public abstract boolean hasCompactionAncestors();
public String getVersion()
{
return version;

View File

@ -140,6 +140,11 @@ public class BigFormat implements SSTableFormat
*/
private final boolean hasOldBfHashOrder;
/**
* CASSANDRA-7066: compaction ancestors are no longer used and have been removed.
*/
private final boolean hasCompactionAncestors;
BigVersion(String version)
{
super(instance, version);
@ -166,6 +171,7 @@ public class BigFormat implements SSTableFormat
newFileName = version.compareTo("la") >= 0;
hasOldBfHashOrder = version.compareTo("ma") < 0;
hasCompactionAncestors = version.compareTo("ma") < 0;
storeRows = version.compareTo("ma") >= 0;
correspondingMessagingVersion = storeRows
? MessagingService.VERSION_30
@ -220,6 +226,12 @@ public class BigFormat implements SSTableFormat
return hasOldBfHashOrder;
}
@Override
public boolean hasCompactionAncestors()
{
return hasCompactionAncestors;
}
@Override
public boolean hasNewFileName()
{

View File

@ -18,8 +18,6 @@
package org.apache.cassandra.io.sstable.metadata;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
import com.clearspring.analytics.stream.cardinality.ICardinality;
@ -39,13 +37,10 @@ public class CompactionMetadata extends MetadataComponent
{
public static final IMetadataComponentSerializer serializer = new CompactionMetadataSerializer();
public final Set<Integer> ancestors;
public final ICardinality cardinalityEstimator;
public CompactionMetadata(Set<Integer> ancestors, ICardinality cardinalityEstimator)
public CompactionMetadata(ICardinality cardinalityEstimator)
{
this.ancestors = ancestors;
this.cardinalityEstimator = cardinalityEstimator;
}
@ -57,48 +52,46 @@ public class CompactionMetadata extends MetadataComponent
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (this == o)
return true;
CompactionMetadata that = (CompactionMetadata) o;
return ancestors == null ? that.ancestors == null : ancestors.equals(that.ancestors);
if (o == null || getClass() != o.getClass())
return false;
// keeping equals and hashCode since all classes inheriting from MetadataComponent
// implement them, but we really have nothing to compare
return true;
}
@Override
public int hashCode()
{
return ancestors != null ? ancestors.hashCode() : 0;
// see comment in equals
return 31;
}
public static class CompactionMetadataSerializer implements IMetadataComponentSerializer<CompactionMetadata>
{
public int serializedSize(CompactionMetadata component) throws IOException
{
int size = 0;
size += TypeSizes.sizeof(component.ancestors.size());
for (int g : component.ancestors)
size += TypeSizes.sizeof(g);
byte[] serializedCardinality = component.cardinalityEstimator.getBytes();
size += TypeSizes.sizeof(serializedCardinality.length) + serializedCardinality.length;
return size;
return TypeSizes.sizeof(serializedCardinality.length) + serializedCardinality.length;
}
public void serialize(CompactionMetadata component, DataOutputPlus out) throws IOException
{
out.writeInt(component.ancestors.size());
for (int g : component.ancestors)
out.writeInt(g);
ByteBufferUtil.writeWithLength(component.cardinalityEstimator.getBytes(), out);
}
public CompactionMetadata deserialize(Version version, DataInputPlus in) throws IOException
{
int nbAncestors = in.readInt();
Set<Integer> ancestors = new HashSet<>(nbAncestors);
for (int i = 0; i < nbAncestors; i++)
ancestors.add(in.readInt());
if (version.hasCompactionAncestors())
{ // skip ancestors
int nbAncestors = in.readInt();
in.skipBytes(nbAncestors * TypeSizes.sizeof(nbAncestors));
}
ICardinality cardinality = HyperLogLogPlus.Builder.build(ByteBufferUtil.readBytes(in, in.readInt()));
return new CompactionMetadata(ancestors, cardinality);
return new CompactionMetadata(cardinality);
}
}
}
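The legacy on-disk layout that deserialize now skips was a 4-byte count followed by that many 4-byte generation ints, which is why a single skipBytes call consumes the whole block. A self-contained sketch of that round trip using plain java.io streams; all values are invented for illustration, and "rest" stands in for the cardinality bytes that follow in the real format:
import java.io.*;

public class SkipAncestorsSketch
{
    public static void main(String[] args) throws IOException
    {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(2);      // legacy ancestor count
        out.writeInt(17);     // ancestor generation 17
        out.writeInt(42);     // ancestor generation 42
        out.writeUTF("rest"); // stand-in for what follows the ancestor block

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        int nbAncestors = in.readInt();            // 2
        in.skipBytes(nbAncestors * Integer.BYTES); // 4 bytes per serialized int
        System.out.println(in.readUTF());          // prints "rest"
    }
}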

View File

@ -23,6 +23,7 @@ import java.util.*;
import com.google.common.collect.Maps;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
@ -60,9 +61,7 @@ public class LegacyMetadataSerializer extends MetadataSerializer
out.writeDouble(validation.bloomFilterFPChance);
out.writeDouble(stats.compressionRatio);
out.writeUTF(validation.partitioner);
out.writeInt(compaction.ancestors.size());
for (Integer g : compaction.ancestors)
out.writeInt(g);
out.writeInt(0); // compaction ancestors
StreamingHistogram.serializer.serialize(stats.estimatedTombstoneDropTime, out);
out.writeInt(stats.sstableLevel);
out.writeInt(stats.minClusteringValues.size());
@ -99,10 +98,8 @@ public class LegacyMetadataSerializer extends MetadataSerializer
double bloomFilterFPChance = in.readDouble();
double compressionRatio = in.readDouble();
String partitioner = in.readUTF();
int nbAncestors = in.readInt();
Set<Integer> ancestors = new HashSet<>(nbAncestors);
for (int i = 0; i < nbAncestors; i++)
ancestors.add(in.readInt());
int nbAncestors = in.readInt(); // skip compaction ancestors
in.skipBytes(nbAncestors * TypeSizes.sizeof(nbAncestors));
StreamingHistogram tombstoneHistogram = StreamingHistogram.serializer.deserialize(in);
int sstableLevel = 0;
if (in.available() > 0)
@ -143,7 +140,7 @@ public class LegacyMetadataSerializer extends MetadataSerializer
-1));
if (types.contains(MetadataType.COMPACTION))
components.put(MetadataType.COMPACTION,
new CompactionMetadata(ancestors, null));
new CompactionMetadata(null));
}
}
return components;

View File

@ -94,7 +94,6 @@ public class MetadataCollector implements PartitionStatisticsCollector
protected final MinMaxIntTracker localDeletionTimeTracker = new MinMaxIntTracker(Cell.NO_DELETION_TIME, Cell.NO_DELETION_TIME);
protected final MinMaxIntTracker ttlTracker = new MinMaxIntTracker(Cell.NO_TTL, Cell.NO_TTL);
protected double compressionRatio = NO_COMPRESSION_RATIO;
protected Set<Integer> ancestors = new HashSet<>();
protected StreamingHistogram estimatedTombstoneDropTime = defaultTombstoneDropTimeHistogram();
protected int sstableLevel;
protected ByteBuffer[] minClusteringValues;
@ -120,29 +119,12 @@ public class MetadataCollector implements PartitionStatisticsCollector
this.maxClusteringValues = new ByteBuffer[comparator.size()];
}
public MetadataCollector(Iterable<SSTableReader> sstables, ClusteringComparator comparator, int level, boolean skipAncestors)
public MetadataCollector(Iterable<SSTableReader> sstables, ClusteringComparator comparator, int level)
{
this(comparator);
replayPosition(ReplayPosition.getReplayPosition(sstables));
sstableLevel(level);
// Get the max timestamp of the precompacted sstables
// and adds generation of live ancestors
if (!skipAncestors)
{
for (SSTableReader sstable : sstables)
{
addAncestor(sstable.descriptor.generation);
for (Integer i : sstable.getAncestors())
if (new File(sstable.descriptor.withGeneration(i).filenameFor(Component.DATA)).exists())
addAncestor(i);
}
}
}
public MetadataCollector(Iterable<SSTableReader> sstables, ClusteringComparator comparator, int level)
{
this(sstables, comparator, level, false);
}
public MetadataCollector addKey(ByteBuffer key)
@ -237,12 +219,6 @@ public class MetadataCollector implements PartitionStatisticsCollector
return this;
}
public MetadataCollector addAncestor(int generation)
{
this.ancestors.add(generation);
return this;
}
public MetadataCollector sstableLevel(int sstableLevel)
{
this.sstableLevel = sstableLevel;
@ -313,7 +289,7 @@ public class MetadataCollector implements PartitionStatisticsCollector
repairedAt,
totalColumnsSet,
totalRows));
components.put(MetadataType.COMPACTION, new CompactionMetadata(ancestors, cardinality));
components.put(MetadataType.COMPACTION, new CompactionMetadata(cardinality));
components.put(MetadataType.HEADER, header.toComponent());
return components;
}

View File

@ -616,11 +616,6 @@ public class FileUtils
{
return Files.readAllLines(file.toPath(), Charset.forName("utf-8"));
}
catch (NoSuchFileException ex)
{
logger.warn("Tried to read non existing file: {}", file);
return Collections.emptyList();
}
catch (IOException ex)
{
throw new RuntimeException(ex);

View File

@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory;
import com.sun.management.GarbageCollectionNotificationInfo;
import com.sun.management.GcInfo;
import org.apache.cassandra.db.lifecycle.TransactionLogs;
import org.apache.cassandra.db.lifecycle.TransactionLog;
import org.apache.cassandra.utils.StatusLogger;
public class GCInspector implements NotificationListener, GCInspectorMXBean
@ -284,7 +284,7 @@ public class GCInspector implements NotificationListener, GCInspectorMXBean
// if we just finished an old gen collection and we're still using a lot of memory, try to reduce the pressure
if (gcState.assumeGCIsOldGen)
TransactionLogs.rescheduleFailedDeletions();
TransactionLog.rescheduleFailedDeletions();
}
}

View File

@ -237,8 +237,7 @@ public class StartupChecks
{
String name = dir.getFileName().toString();
return (name.equals(Directories.SNAPSHOT_SUBDIR)
|| name.equals(Directories.BACKUPS_SUBDIR)
|| name.equals(Directories.TRANSACTIONS_SUBDIR))
|| name.equals(Directories.BACKUPS_SUBDIR))
? FileVisitResult.SKIP_SUBTREE
: FileVisitResult.CONTINUE;
}

View File

@ -70,7 +70,7 @@ import org.apache.cassandra.config.*;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.lifecycle.TransactionLogs;
import org.apache.cassandra.db.lifecycle.TransactionLog;
import org.apache.cassandra.dht.BootStrapper;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
@ -4241,7 +4241,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public void rescheduleFailedDeletions()
{
TransactionLogs.rescheduleFailedDeletions();
TransactionLog.rescheduleFailedDeletions();
}
/**

View File

@ -34,6 +34,7 @@ import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.format.SSTableReader;
@ -68,7 +69,7 @@ public class SSTableExpiredBlockers
Keyspace ks = Keyspace.openWithoutSSTables(keyspace);
ColumnFamilyStore cfs = ks.getColumnFamilyStore(columnfamily);
Directories.SSTableLister lister = cfs.directories.sstableLister().skipTemporary(true);
Directories.SSTableLister lister = cfs.directories.sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true);
Set<SSTableReader> sstables = new HashSet<>();
for (Map.Entry<Descriptor, Set<Component>> sstable : lister.list().entrySet())
{

View File

@ -23,6 +23,7 @@ import java.util.Set;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
@ -75,7 +76,7 @@ public class SSTableLevelResetter
Keyspace keyspace = Keyspace.openWithoutSSTables(keyspaceName);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(columnfamily);
boolean foundSSTable = false;
for (Map.Entry<Descriptor, Set<Component>> sstable : cfs.directories.sstableLister().list().entrySet())
for (Map.Entry<Descriptor, Set<Component>> sstable : cfs.directories.sstableLister(Directories.OnTxnErr.THROW).list().entrySet())
{
if (sstable.getValue().contains(Component.STATS))
{

View File

@ -78,9 +78,7 @@ public class SSTableMetadataViewer
}
if (compaction != null)
{
out.printf("Ancestors: %s%n", compaction.ancestors.toString());
out.printf("Estimated cardinality: %s%n", compaction.cardinalityEstimator.cardinality());
}
}
else

View File

@ -95,7 +95,7 @@ public class SSTableOfflineRelevel
Keyspace ks = Keyspace.openWithoutSSTables(keyspace);
ColumnFamilyStore cfs = ks.getColumnFamilyStore(columnfamily);
Directories.SSTableLister lister = cfs.directories.sstableLister().skipTemporary(true);
Directories.SSTableLister lister = cfs.directories.sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true);
Set<SSTableReader> sstables = new HashSet<>();
for (Map.Entry<Descriptor, Set<Component>> sstable : lister.list().entrySet())
{

View File

@ -26,17 +26,20 @@ import org.apache.cassandra.utils.OutputHandler;
import org.apache.commons.cli.*;
import java.io.File;
import java.io.IOException;
import java.util.function.BiFunction;
import static org.apache.cassandra.tools.BulkLoader.CmdLineOptions;
public class StandaloneLister
public class StandaloneSSTableUtil
{
private static final String TOOL_NAME = "sstablelister";
private static final String TOOL_NAME = "sstableutil";
private static final String TYPE_OPTION = "type";
private static final String OP_LOG_OPTION = "oplog";
private static final String VERBOSE_OPTION = "verbose";
private static final String DEBUG_OPTION = "debug";
private static final String HELP_OPTION = "help";
private static final String CLEANUP_OPTION = "cleanup";
public static void main(String args[])
{
@ -54,23 +57,15 @@ public class StandaloneLister
OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, options.debug);
Directories directories = new Directories(metadata);
Directories.SSTableLister lister = directories.sstableLister();
if (options.type == Options.FileType.FINAL)
lister.skipTemporary(true);
else if (options.type == Options.FileType.TMP)
lister.onlyTemporary(true);
for (File file : lister.listFiles())
handler.output(file.getCanonicalPath());
if (options.oplogs)
if (options.cleanup)
{
for (File file : LifecycleTransaction.getLogFiles(metadata))
{
handler.output(file.getCanonicalPath());
}
handler.output("Cleanuping up...");
LifecycleTransaction.removeUnfinishedLeftovers(metadata);
}
else
{
handler.output("Listing files...");
listFiles(options, metadata, handler);
}
System.exit(0);
@ -84,6 +79,35 @@ public class StandaloneLister
}
}
private static void listFiles(Options options, CFMetaData metadata, OutputHandler handler) throws IOException
{
Directories directories = new Directories(metadata);
for (File dir : directories.getCFDirectories())
{
for (File file : LifecycleTransaction.getFiles(dir.toPath(), getFilter(options), Directories.OnTxnErr.THROW))
handler.output(file.getCanonicalPath());
}
}
private static BiFunction<File, Directories.FileType, Boolean> getFilter(Options options)
{
return (file, type) ->
{
switch(type)
{
case FINAL:
return options.type != Options.FileType.TMP;
case TEMPORARY:
return options.type != Options.FileType.FINAL;
case TXN_LOG:
return options.oplogs;
default:
throw new AssertionError();
}
};
}
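A hedged usage sketch of the filter contract above, written against the same LifecycleTransaction.getFiles call used in listFiles; the lambda, directory and handler are illustrative, and the loop assumes an enclosing method declared to throw IOException, as listFiles is:
// Keep only final sstable files, skipping temporaries and txn logs;
// OnTxnErr.IGNORE tolerates an unreadable txn log instead of throwing.
BiFunction<File, Directories.FileType, Boolean> onlyFinal =
    (file, type) -> type == Directories.FileType.FINAL;
for (File f : LifecycleTransaction.getFiles(dir.toPath(), onlyFinal, Directories.OnTxnErr.IGNORE))
    handler.output(f.getCanonicalPath());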
private static class Options
{
public enum FileType
@ -131,6 +155,7 @@ public class StandaloneLister
public boolean debug;
public boolean verbose;
public boolean oplogs;
public boolean cleanup;
public FileType type;
private Options(String keyspaceName, String cfName)
@ -171,6 +196,7 @@ public class StandaloneLister
opts.verbose = cmd.hasOption(VERBOSE_OPTION);
opts.type = FileType.fromOption(cmd.getOptionValue(TYPE_OPTION));
opts.oplogs = cmd.hasOption(OP_LOG_OPTION);
opts.cleanup = cmd.hasOption(CLEANUP_OPTION);
return opts;
}
@ -191,6 +217,7 @@ public class StandaloneLister
private static CmdLineOptions getCmdLineOptions()
{
CmdLineOptions options = new CmdLineOptions();
options.addOption("c", CLEANUP_OPTION, "clean-up any outstanding transactions");
options.addOption("d", DEBUG_OPTION, "display stack traces");
options.addOption("h", HELP_OPTION, "display this help message");
options.addOption("o", OP_LOG_OPTION, "include operation logs");

View File

@ -34,7 +34,7 @@ import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.compaction.*;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.lifecycle.TransactionLogs;
import org.apache.cassandra.db.lifecycle.TransactionLog;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.utils.JVMStabilityInspector;
@ -84,7 +84,7 @@ public class StandaloneScrubber
String snapshotName = "pre-scrub-" + System.currentTimeMillis();
OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, options.debug);
Directories.SSTableLister lister = cfs.directories.sstableLister().skipTemporary(true);
Directories.SSTableLister lister = cfs.directories.sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true);
List<SSTableReader> sstables = new ArrayList<>();
@ -145,7 +145,7 @@ public class StandaloneScrubber
// Check (and repair) manifests
checkManifest(cfs.getCompactionStrategyManager(), cfs, sstables);
CompactionManager.instance.finishCompactionsAndShutdown(5, TimeUnit.MINUTES);
TransactionLogs.waitForDeletions();
TransactionLog.waitForDeletions();
System.exit(0); // We need that to stop non daemonized threads
}
catch (Exception e)

View File

@ -23,7 +23,7 @@ import java.util.*;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.lifecycle.TransactionLogs;
import org.apache.cassandra.db.lifecycle.TransactionLog;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.commons.cli.*;
@ -164,7 +164,7 @@ public class StandaloneSplitter
}
}
CompactionManager.instance.finishCompactionsAndShutdown(5, TimeUnit.MINUTES);
TransactionLogs.waitForDeletions();
TransactionLog.waitForDeletions();
System.exit(0); // We need that to stop non daemonized threads
}
catch (Exception e)

View File

@ -20,7 +20,7 @@ package org.apache.cassandra.tools;
import java.util.*;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.db.lifecycle.TransactionLogs;
import org.apache.cassandra.db.lifecycle.TransactionLog;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.commons.cli.*;
@ -63,7 +63,7 @@ public class StandaloneUpgrader
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(options.cf);
OutputHandler handler = new OutputHandler.SystemOutput(false, options.debug);
Directories.SSTableLister lister = cfs.directories.sstableLister();
Directories.SSTableLister lister = cfs.directories.sstableLister(Directories.OnTxnErr.THROW);
if (options.snapshot != null)
lister.onlyBackups(true).snapshots(options.snapshot);
else
@ -120,7 +120,7 @@ public class StandaloneUpgrader
}
}
CompactionManager.instance.finishCompactionsAndShutdown(5, TimeUnit.MINUTES);
TransactionLogs.waitForDeletions();
TransactionLog.waitForDeletions();
System.exit(0);
}
catch (Exception e)

View File

@ -69,7 +69,7 @@ public class StandaloneVerifier
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(options.cfName);
OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, options.debug);
Directories.SSTableLister lister = cfs.directories.sstableLister().skipTemporary(true);
Directories.SSTableLister lister = cfs.directories.sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true);
boolean extended = options.extended;

View File

@ -36,7 +36,7 @@ public class Stop extends NodeToolCmd
@Option(title = "compactionId",
name = {"-id", "--compaction-id"},
description = "Use -id to stop a compaction by the specified id. Ids can be found in the system.compactions_in_progress table.",
description = "Use -id to stop a compaction by the specified id. Ids can be found in the transaction log files whose name starts with compaction_, located in the table transactions folder.",
required = false)
private String compactionId = "";
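An illustrative invocation, where the id is a placeholder rather than a real value:
nodetool stop -id <compaction-id>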

View File

@ -532,7 +532,7 @@ public class ColumnFamilyStoreTest
ColumnFamilyStore.scrubDataDirectories(cfs.metadata);
List<File> ssTableFiles = new Directories(cfs.metadata).sstableLister().listFiles();
List<File> ssTableFiles = new Directories(cfs.metadata).sstableLister(Directories.OnTxnErr.THROW).listFiles();
assertNotNull(ssTableFiles);
assertEquals(0, ssTableFiles.size());
}

View File

@ -23,7 +23,6 @@ import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.junit.AfterClass;
@ -236,7 +235,7 @@ public class DirectoriesTest
Set<File> listed;
// List all but no snapshot, backup
lister = directories.sstableLister();
lister = directories.sstableLister(Directories.OnTxnErr.THROW);
listed = new HashSet<>(lister.listFiles());
for (File f : files.get(cfm.cfName))
{
@ -247,7 +246,7 @@ public class DirectoriesTest
}
// List all but including backup (but no snapshot)
lister = directories.sstableLister().includeBackups(true);
lister = directories.sstableLister(Directories.OnTxnErr.THROW).includeBackups(true);
listed = new HashSet<>(lister.listFiles());
for (File f : files.get(cfm.cfName))
{
@ -258,7 +257,7 @@ public class DirectoriesTest
}
// Skip temporary and compacted
lister = directories.sstableLister().skipTemporary(true);
lister = directories.sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true);
listed = new HashSet<>(lister.listFiles());
for (File f : files.get(cfm.cfName))
{

View File

@ -37,6 +37,7 @@ import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.lifecycle.TransactionLog;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.KeyspaceParams;
@ -163,10 +164,7 @@ public class KeyCacheTest
refs.release();
while (ScheduledExecutors.nonPeriodicTasks.getActiveCount() + ScheduledExecutors.nonPeriodicTasks.getQueue().size() > 0)
{
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);;
}
TransactionLog.waitForDeletions();
// after releasing the reference this should drop to 2
assertKeyCacheSize(2, KEYSPACE1, COLUMN_FAMILY1);

View File

@ -43,7 +43,7 @@ import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.compaction.Scrubber;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.lifecycle.TransactionLogs;
import org.apache.cassandra.db.lifecycle.TransactionLog;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.db.partitions.Partition;
@ -371,7 +371,7 @@ public class ScrubTest
{
scrubber.scrub();
}
TransactionLogs.waitForDeletions();
TransactionLog.waitForDeletions();
cfs.loadNewSSTables();
assertOrderedAll(cfs, 7);
}

View File

@ -32,14 +32,9 @@ import org.junit.Test;
import junit.framework.Assert;
import org.apache.cassandra.MockSchema;
import org.apache.cassandra.Util;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.big.BigTableReader;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.Refs;
import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertNotNull;
@ -164,10 +159,10 @@ public class HelpersTest
public void testMarkObsolete()
{
ColumnFamilyStore cfs = MockSchema.newCFS();
TransactionLogs txnLogs = new TransactionLogs(OperationType.UNKNOWN, cfs.metadata);
TransactionLog txnLogs = new TransactionLog(OperationType.UNKNOWN, cfs.metadata);
Iterable<SSTableReader> readers = Lists.newArrayList(MockSchema.sstable(1, cfs), MockSchema.sstable(2, cfs));
List<TransactionLogs.Obsoletion> obsoletions = new ArrayList<>();
List<TransactionLog.Obsoletion> obsoletions = new ArrayList<>();
Assert.assertNull(Helpers.prepareForObsoletion(readers, txnLogs, obsoletions, null));
assertNotNull(obsoletions);
assertEquals(2, obsoletions.size());

View File

@ -249,7 +249,7 @@ public class LifecycleTransactionTest extends AbstractTransactionalTest
protected TestableTransaction newTest()
{
TransactionLogs.waitForDeletions();
TransactionLog.waitForDeletions();
SSTableReader.resetTidying();
return new TxnTest();
}

View File

@ -89,10 +89,9 @@ public class RealTransactionsTest extends SchemaLoader
SSTableReader oldSSTable = getSSTable(cfs, 1);
LifecycleTransaction txn = cfs.getTracker().tryModify(oldSSTable, OperationType.COMPACTION);
SSTableReader newSSTable = replaceSSTable(cfs, txn, false);
TransactionLogs.waitForDeletions();
TransactionLog.waitForDeletions();
assertFiles(txn.logs().getDataFolder(), new HashSet<>(newSSTable.getAllFilePaths()));
assertFiles(txn.logs().getLogsFolder(), Collections.<String>emptySet());
assertFiles(txn.log().getDataFolder(), new HashSet<>(newSSTable.getAllFilePaths()));
}
@Test
@ -105,10 +104,9 @@ public class RealTransactionsTest extends SchemaLoader
LifecycleTransaction txn = cfs.getTracker().tryModify(oldSSTable, OperationType.COMPACTION);
replaceSSTable(cfs, txn, true);
TransactionLogs.waitForDeletions();
TransactionLog.waitForDeletions();
assertFiles(txn.logs().getDataFolder(), new HashSet<>(oldSSTable.getAllFilePaths()));
assertFiles(txn.logs().getLogsFolder(), Collections.<String>emptySet());
assertFiles(txn.log().getDataFolder(), new HashSet<>(oldSSTable.getAllFilePaths()));
}
@Test
@ -120,11 +118,6 @@ public class RealTransactionsTest extends SchemaLoader
SSTableReader ssTableReader = getSSTable(cfs, 100);
String dataFolder = cfs.getLiveSSTables().iterator().next().descriptor.directory.getPath();
String transactionLogsFolder = StringUtils.join(dataFolder, File.separator, Directories.TRANSACTIONS_SUBDIR);
assertTrue(new File(transactionLogsFolder).exists());
assertFiles(transactionLogsFolder, Collections.<String>emptySet());
assertFiles(dataFolder, new HashSet<>(ssTableReader.getAllFilePaths()));
}

View File

@ -187,9 +187,9 @@ public class TrackerTest
public void testDropSSTables()
{
testDropSSTables(false);
TransactionLogs.waitForDeletions();
TransactionLog.waitForDeletions();
testDropSSTables(true);
TransactionLogs.waitForDeletions();
TransactionLog.waitForDeletions();
}
private void testDropSSTables(boolean invalidate)
@ -203,62 +203,54 @@ public class TrackerTest
MockSchema.sstable(2, 71, true, cfs));
tracker.addInitialSSTables(copyOf(readers));
try
try (LifecycleTransaction txn = tracker.tryModify(readers.get(0), OperationType.COMPACTION))
{
// TransactionLogs.pauseDeletions(true);
try (LifecycleTransaction txn = tracker.tryModify(readers.get(0), OperationType.COMPACTION))
if (invalidate)
{
if (invalidate)
{
cfs.invalidate(false);
}
else
{
tracker.dropSSTables();
TransactionLogs.waitForDeletions();
}
Assert.assertEquals(9, cfs.metric.totalDiskSpaceUsed.getCount());
Assert.assertEquals(9, cfs.metric.liveDiskSpaceUsed.getCount());
Assert.assertEquals(1, tracker.getView().sstables.size());
}
if (!invalidate)
{
Assert.assertEquals(1, tracker.getView().sstables.size());
Assert.assertEquals(readers.get(0), Iterables.getFirst(tracker.getView().sstables, null));
Assert.assertEquals(1, readers.get(0).selfRef().globalCount());
Assert.assertFalse(readers.get(0).isMarkedCompacted());
for (SSTableReader reader : readers.subList(1, 3))
{
Assert.assertEquals(0, reader.selfRef().globalCount());
Assert.assertTrue(reader.isMarkedCompacted());
}
Assert.assertNull(tracker.dropSSTables(reader -> reader != readers.get(0), OperationType.UNKNOWN, null));
Assert.assertEquals(1, tracker.getView().sstables.size());
Assert.assertEquals(3, listener.received.size());
Assert.assertEquals(tracker, listener.senders.get(0));
Assert.assertTrue(listener.received.get(0) instanceof SSTableDeletingNotification);
Assert.assertTrue(listener.received.get(1) instanceof SSTableDeletingNotification);
Assert.assertTrue(listener.received.get(2) instanceof SSTableListChangedNotification);
Assert.assertEquals(readers.get(1), ((SSTableDeletingNotification) listener.received.get(0)).deleting);
Assert.assertEquals(readers.get(2), ((SSTableDeletingNotification)listener.received.get(1)).deleting);
Assert.assertEquals(2, ((SSTableListChangedNotification) listener.received.get(2)).removed.size());
Assert.assertEquals(0, ((SSTableListChangedNotification) listener.received.get(2)).added.size());
Assert.assertEquals(9, cfs.metric.liveDiskSpaceUsed.getCount());
readers.get(0).selfRef().release();
cfs.invalidate(false);
}
else
{
Assert.assertEquals(0, tracker.getView().sstables.size());
Assert.assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount());
for (SSTableReader reader : readers)
Assert.assertTrue(reader.isMarkedCompacted());
tracker.dropSSTables();
TransactionLog.waitForDeletions();
}
Assert.assertEquals(9, cfs.metric.totalDiskSpaceUsed.getCount());
Assert.assertEquals(9, cfs.metric.liveDiskSpaceUsed.getCount());
Assert.assertEquals(1, tracker.getView().sstables.size());
}
finally
if (!invalidate)
{
// TransactionLogs.pauseDeletions(false);
Assert.assertEquals(1, tracker.getView().sstables.size());
Assert.assertEquals(readers.get(0), Iterables.getFirst(tracker.getView().sstables, null));
Assert.assertEquals(1, readers.get(0).selfRef().globalCount());
Assert.assertFalse(readers.get(0).isMarkedCompacted());
for (SSTableReader reader : readers.subList(1, 3))
{
Assert.assertEquals(0, reader.selfRef().globalCount());
Assert.assertTrue(reader.isMarkedCompacted());
}
Assert.assertNull(tracker.dropSSTables(reader -> reader != readers.get(0), OperationType.UNKNOWN, null));
Assert.assertEquals(1, tracker.getView().sstables.size());
Assert.assertEquals(3, listener.received.size());
Assert.assertEquals(tracker, listener.senders.get(0));
Assert.assertTrue(listener.received.get(0) instanceof SSTableDeletingNotification);
Assert.assertTrue(listener.received.get(1) instanceof SSTableDeletingNotification);
Assert.assertTrue(listener.received.get(2) instanceof SSTableListChangedNotification);
Assert.assertEquals(readers.get(1), ((SSTableDeletingNotification) listener.received.get(0)).deleting);
Assert.assertEquals(readers.get(2), ((SSTableDeletingNotification)listener.received.get(1)).deleting);
Assert.assertEquals(2, ((SSTableListChangedNotification) listener.received.get(2)).removed.size());
Assert.assertEquals(0, ((SSTableListChangedNotification) listener.received.get(2)).added.size());
Assert.assertEquals(9, cfs.metric.liveDiskSpaceUsed.getCount());
readers.get(0).selfRef().release();
}
else
{
Assert.assertEquals(0, tracker.getView().sstables.size());
Assert.assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount());
for (SSTableReader reader : readers)
Assert.assertTrue(reader.isMarkedCompacted());
}
}

View File

@ -0,0 +1,791 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.lifecycle;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import org.junit.BeforeClass;
import org.junit.Test;
import static junit.framework.Assert.assertNotNull;
import static junit.framework.Assert.assertNull;
import static junit.framework.Assert.fail;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import junit.framework.Assert;
import org.apache.cassandra.MockSchema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.compaction.*;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.sstable.metadata.MetadataType;
import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
import org.apache.cassandra.io.util.BufferedSegmentedFile;
import org.apache.cassandra.io.util.ChannelProxy;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.io.util.SegmentedFile;
import org.apache.cassandra.utils.AlwaysPresentFilter;
import org.apache.cassandra.utils.concurrent.AbstractTransactionalTest;
import org.apache.cassandra.utils.concurrent.Transactional;
public class TransactionLogTest extends AbstractTransactionalTest
{
private static final String KEYSPACE = "TransactionLogsTest";
@BeforeClass
public static void setUp()
{
MockSchema.cleanup();
}
protected AbstractTransactionalTest.TestableTransaction newTest() throws Exception
{
TransactionLog.waitForDeletions();
SSTableReader.resetTidying();
return new TxnTest();
}
private static final class TxnTest extends TestableTransaction
{
private final static class Transaction extends Transactional.AbstractTransactional implements Transactional
{
final ColumnFamilyStore cfs;
final TransactionLog txnLogs;
final SSTableReader sstableOld;
final SSTableReader sstableNew;
final TransactionLog.SSTableTidier tidier;
public Transaction(ColumnFamilyStore cfs, TransactionLog txnLogs) throws IOException
{
this.cfs = cfs;
this.txnLogs = txnLogs;
this.sstableOld = sstable(cfs, 0, 128);
this.sstableNew = sstable(cfs, 1, 128);
assertNotNull(txnLogs);
assertNotNull(txnLogs.getId());
Assert.assertEquals(OperationType.COMPACTION, txnLogs.getType());
txnLogs.trackNew(sstableNew);
tidier = txnLogs.obsoleted(sstableOld);
assertNotNull(tidier);
}
protected Throwable doCommit(Throwable accumulate)
{
sstableOld.markObsolete(tidier);
sstableOld.selfRef().release();
TransactionLog.waitForDeletions();
Throwable ret = txnLogs.commit(accumulate);
sstableNew.selfRef().release();
return ret;
}
protected Throwable doAbort(Throwable accumulate)
{
tidier.abort();
TransactionLog.waitForDeletions();
Throwable ret = txnLogs.abort(accumulate);
sstableNew.selfRef().release();
sstableOld.selfRef().release();
return ret;
}
protected void doPrepare()
{
txnLogs.prepareToCommit();
}
protected void assertInProgress() throws Exception
{
assertFiles(txnLogs.getDataFolder(), Sets.newHashSet(Iterables.concat(sstableNew.getAllFilePaths(),
sstableOld.getAllFilePaths(),
Collections.singleton(txnLogs.getData().getLogFile().file.getPath()))));
}
protected void assertPrepared() throws Exception
{
}
protected void assertAborted() throws Exception
{
assertFiles(txnLogs.getDataFolder(), new HashSet<>(sstableOld.getAllFilePaths()));
}
protected void assertCommitted() throws Exception
{
assertFiles(txnLogs.getDataFolder(), new HashSet<>(sstableNew.getAllFilePaths()));
}
}
final Transaction txn;
private TxnTest() throws IOException
{
this(MockSchema.newCFS(KEYSPACE));
}
private TxnTest(ColumnFamilyStore cfs) throws IOException
{
this(cfs, new TransactionLog(OperationType.COMPACTION, cfs.metadata));
}
private TxnTest(ColumnFamilyStore cfs, TransactionLog txnLogs) throws IOException
{
this(new Transaction(cfs, txnLogs));
}
private TxnTest(Transaction txn)
{
super(txn);
this.txn = txn;
}
protected void assertInProgress() throws Exception
{
txn.assertInProgress();
}
protected void assertPrepared() throws Exception
{
txn.assertPrepared();
}
protected void assertAborted() throws Exception
{
txn.assertAborted();
}
protected void assertCommitted() throws Exception
{
txn.assertCommitted();
}
}
@Test
public void testUntrack() throws Throwable
{
ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
SSTableReader sstableNew = sstable(cfs, 1, 128);
// complete a transaction without keeping the new files since they were untracked
TransactionLog transactionLog = new TransactionLog(OperationType.COMPACTION, cfs.metadata);
assertNotNull(transactionLog);
transactionLog.trackNew(sstableNew);
transactionLog.untrackNew(sstableNew);
transactionLog.finish();
sstableNew.selfRef().release();
Thread.sleep(1);
TransactionLog.waitForDeletions();
assertFiles(transactionLog.getDataFolder(), Collections.<String>emptySet());
}
@Test
public void testCommitSameDesc() throws Throwable
{
ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
SSTableReader sstableOld1 = sstable(cfs, 0, 128);
SSTableReader sstableOld2 = sstable(cfs, 0, 256);
SSTableReader sstableNew = sstable(cfs, 1, 128);
TransactionLog transactionLog = new TransactionLog(OperationType.COMPACTION, cfs.metadata);
assertNotNull(transactionLog);
transactionLog.trackNew(sstableNew);
sstableOld1.setReplaced();
TransactionLog.SSTableTidier tidier = transactionLog.obsoleted(sstableOld2);
assertNotNull(tidier);
transactionLog.finish();
sstableOld2.markObsolete(tidier);
sstableOld1.selfRef().release();
sstableOld2.selfRef().release();
assertFiles(transactionLog.getDataFolder(), new HashSet<>(sstableNew.getAllFilePaths()));
sstableNew.selfRef().release();
}
@Test
public void testCommitOnlyNew() throws Throwable
{
ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
SSTableReader sstable = sstable(cfs, 0, 128);
TransactionLog transactionLog = new TransactionLog(OperationType.COMPACTION, cfs.metadata);
assertNotNull(transactionLog);
transactionLog.trackNew(sstable);
transactionLog.finish();
assertFiles(transactionLog.getDataFolder(), new HashSet<>(sstable.getAllFilePaths()));
sstable.selfRef().release();
}
@Test
public void testCommitOnlyOld() throws Throwable
{
ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
SSTableReader sstable = sstable(cfs, 0, 128);
TransactionLog transactionLog = new TransactionLog(OperationType.COMPACTION, cfs.metadata);
assertNotNull(transactionLog);
TransactionLog.SSTableTidier tidier = transactionLog.obsoleted(sstable);
assertNotNull(tidier);
transactionLog.finish();
sstable.markObsolete(tidier);
sstable.selfRef().release();
assertFiles(transactionLog.getDataFolder(), new HashSet<>());
}
@Test
public void testAbortOnlyNew() throws Throwable
{
ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
SSTableReader sstable = sstable(cfs, 0, 128);
TransactionLog transactionLog = new TransactionLog(OperationType.COMPACTION, cfs.metadata);
assertNotNull(transactionLog);
transactionLog.trackNew(sstable);
transactionLog.abort();
sstable.selfRef().release();
assertFiles(transactionLog.getDataFolder(), new HashSet<>());
}
@Test
public void testAbortOnlyOld() throws Throwable
{
ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
SSTableReader sstable = sstable(cfs, 0, 128);
TransactionLog transactionLog = new TransactionLog(OperationType.COMPACTION, cfs.metadata);
assertNotNull(transactionLog);
TransactionLog.SSTableTidier tidier = transactionLog.obsoleted(sstable);
assertNotNull(tidier);
tidier.abort();
transactionLog.abort();
sstable.selfRef().release();
assertFiles(transactionLog.getDataFolder(), new HashSet<>(sstable.getAllFilePaths()));
}
@Test
public void testRemoveUnfinishedLeftovers_abort() throws Throwable
{
ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
SSTableReader sstableOld = sstable(cfs, 0, 128);
SSTableReader sstableNew = sstable(cfs, 1, 128);
// simulate tracking sstables with a failed transaction (new log file NOT deleted)
TransactionLog transactionLog = new TransactionLog(OperationType.COMPACTION, cfs.metadata);
assertNotNull(transactionLog);
transactionLog.trackNew(sstableNew);
TransactionLog.SSTableTidier tidier = transactionLog.obsoleted(sstableOld);
Set<File> tmpFiles = Sets.newHashSet(Iterables.concat(sstableNew.getAllFilePaths().stream().map(p -> new File(p)).collect(Collectors.toList()),
Collections.singleton(transactionLog.getData().getLogFile().file)));
sstableNew.selfRef().release();
sstableOld.selfRef().release();
Assert.assertEquals(tmpFiles, TransactionLog.getTemporaryFiles(cfs.metadata, sstableNew.descriptor.directory));
// normally called at startup
TransactionLog.removeUnfinishedLeftovers(cfs.metadata);
// sstableOld should be the only sstable left
Directories directories = new Directories(cfs.metadata);
Map<Descriptor, Set<Component>> sstables = directories.sstableLister(Directories.OnTxnErr.THROW).list();
assertEquals(1, sstables.size());
assertFiles(transactionLog.getDataFolder(), new HashSet<>(sstableOld.getAllFilePaths()));
tidier.run();
// complete the transaction to avoid LEAK errors
transactionLog.close();
}
@Test
public void testRemoveUnfinishedLeftovers_commit() throws Throwable
{
ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
SSTableReader sstableOld = sstable(cfs, 0, 128);
SSTableReader sstableNew = sstable(cfs, 1, 128);
// simulate tracking sstables with a committed transaction (new log file deleted)
TransactionLog transactionLog = new TransactionLog(OperationType.COMPACTION, cfs.metadata);
assertNotNull(transactionLog);
transactionLog.trackNew(sstableNew);
TransactionLog.SSTableTidier tidier = transactionLog.obsoleted(sstableOld);
//Fake a commit
transactionLog.getData().getLogFile().commit();
Set<File> tmpFiles = Sets.newHashSet(Iterables.concat(sstableOld.getAllFilePaths().stream().map(p -> new File(p)).collect(Collectors.toList()),
Collections.singleton(transactionLog.getData().getLogFile().file)));
sstableNew.selfRef().release();
sstableOld.selfRef().release();
Assert.assertEquals(tmpFiles, TransactionLog.getTemporaryFiles(cfs.metadata, sstableOld.descriptor.directory));
// normally called at startup
TransactionLog.removeUnfinishedLeftovers(cfs.metadata);
// sstableNew should be the only sstable left
Directories directories = new Directories(cfs.metadata);
Map<Descriptor, Set<Component>> sstables = directories.sstableLister(Directories.OnTxnErr.THROW).list();
assertEquals(1, sstables.size());
assertFiles(transactionLog.getDataFolder(), new HashSet<>(sstableNew.getAllFilePaths()));
tidier.run();
// complete the transaction to avoid LEAK errors
assertNull(transactionLog.complete(null));
}
@Test
public void testGetTemporaryFiles() throws IOException
{
ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
SSTableReader sstable1 = sstable(cfs, 0, 128);
File dataFolder = sstable1.descriptor.directory;
Set<File> tmpFiles = TransactionLog.getTemporaryFiles(cfs.metadata, dataFolder);
assertNotNull(tmpFiles);
assertEquals(0, tmpFiles.size());
TransactionLog transactionLog = new TransactionLog(OperationType.WRITE, cfs.metadata);
Directories directories = new Directories(cfs.metadata);
File[] beforeSecondSSTable = dataFolder.listFiles(pathname -> !pathname.isDirectory());
SSTableReader sstable2 = sstable(cfs, 1, 128);
transactionLog.trackNew(sstable2);
Map<Descriptor, Set<Component>> sstables = directories.sstableLister(Directories.OnTxnErr.THROW).list();
assertEquals(2, sstables.size());
// this should contain sstable1, sstable2 and the transaction log file
File[] afterSecondSSTable = dataFolder.listFiles(pathname -> !pathname.isDirectory());
int numNewFiles = afterSecondSSTable.length - beforeSecondSSTable.length;
assertEquals(numNewFiles - 1, sstable2.getAllFilePaths().size()); // new files except for transaction log file
tmpFiles = TransactionLog.getTemporaryFiles(cfs.metadata, dataFolder);
assertNotNull(tmpFiles);
assertEquals(numNewFiles, tmpFiles.size());
File ssTable2DataFile = new File(sstable2.descriptor.filenameFor(Component.DATA));
File ssTable2IndexFile = new File(sstable2.descriptor.filenameFor(Component.PRIMARY_INDEX));
assertTrue(tmpFiles.contains(ssTable2DataFile));
assertTrue(tmpFiles.contains(ssTable2IndexFile));
List<File> files = directories.sstableLister(Directories.OnTxnErr.THROW).listFiles();
List<File> filesNoTmp = directories.sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true).listFiles();
assertNotNull(files);
assertNotNull(filesNoTmp);
assertTrue(files.contains(ssTable2DataFile));
assertTrue(files.contains(ssTable2IndexFile));
assertFalse(filesNoTmp.contains(ssTable2DataFile));
assertFalse(filesNoTmp.contains(ssTable2IndexFile));
transactionLog.finish();
//Now it should be empty since the transaction has finished
tmpFiles = TransactionLog.getTemporaryFiles(cfs.metadata, dataFolder);
assertNotNull(tmpFiles);
assertEquals(0, tmpFiles.size());
filesNoTmp = directories.sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true).listFiles();
assertNotNull(filesNoTmp);
assertTrue(filesNoTmp.contains(ssTable2DataFile));
assertTrue(filesNoTmp.contains(ssTable2IndexFile));
sstable1.selfRef().release();
sstable2.selfRef().release();
}
@Test
public void testWrongChecksumLastLine() throws IOException
{
testCorruptRecord((t, s) ->
{ // Fake a commit with invalid checksum
FileUtils.append(t.getData().getLogFile().file,
String.format("commit:[%d,0,0][%d]",
System.currentTimeMillis(),
12345678L));
},
true);
}
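The format strings used throughout these tests imply one record per line, shaped as prefix:[payload][checksum]. An illustrative, entirely made-up well-formed tail of a log file, matching the add and commit shapes exercised here; the add payload appears to carry the sstable descriptor, a last-modified timestamp and a component count, with the trailing bracket holding the per-line checksum (an inference from the test strings, not a format specification):
add:[ma-3-big,1438142461000,4][2463478568]
commit:[1438142461000,0,0][3621015104]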
@Test
public void testWrongChecksumSecondFromLastLine() throws IOException
{
testCorruptRecord((t, s) ->
{ // Fake two lines with invalid checksum
FileUtils.append(t.getData().getLogFile().file,
String.format("add:[ma-3-big,%d,4][%d]",
System.currentTimeMillis(),
12345678L));
FileUtils.append(t.getData().getLogFile().file,
String.format("commit:[%d,0,0][%d]",
System.currentTimeMillis(),
12345678L));
},
false);
}
@Test
public void testWrongChecksumLastLineMissingFile() throws IOException
{
testCorruptRecord((t, s) ->
{ // Fake a commit with invalid checksum and also delete one of the old files
for (String filePath : s.getAllFilePaths())
{
if (filePath.endsWith("Data.db"))
{
FileUtils.delete(filePath);
break;
}
}
FileUtils.append(t.getData().getLogFile().file,
String.format("commit:[%d,0,0][%d]",
System.currentTimeMillis(),
12345678L));
},
false);
}
@Test
public void testWrongChecksumLastLineWrongRecordFormat() throws IOException
{
testCorruptRecord((t, s) ->
{ // Fake a commit with invalid checksum and a wrong record format (extra spaces)
FileUtils.append(t.getData().getLogFile().file,
String.format("commit:[%d ,0 ,0 ][%d]",
System.currentTimeMillis(),
12345678L));
},
true);
}
@Test
public void testMissingChecksumLastLine() throws IOException
{
testCorruptRecord((t, s) ->
{
// Fake a commit without a checksum
FileUtils.append(t.getData().getLogFile().file,
String.format("commit:[%d,0,0]",
System.currentTimeMillis()));
},
true);
}
@Test
public void testMissingChecksumSecondFromLastLine() throws IOException
{
testCorruptRecord((t, s) ->
{ // Fake two lines without a checksum
FileUtils.append(t.getData().getLogFile().file,
String.format("add:[ma-3-big,%d,4]",
System.currentTimeMillis()));
FileUtils.append(t.getData().getLogFile().file,
String.format("commit:[%d,0,0]",
System.currentTimeMillis()));
},
false);
}
private void testCorruptRecord(BiConsumer<TransactionLog, SSTableReader> modifier, boolean isRecoverable) throws IOException
{
ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
SSTableReader sstableOld = sstable(cfs, 0, 128);
SSTableReader sstableNew = sstable(cfs, 1, 128);
File dataFolder = sstableOld.descriptor.directory;
// simulate tracking sstables with a committed transaction except the checksum will be wrong
TransactionLog transactionLog = new TransactionLog(OperationType.COMPACTION, cfs.metadata);
assertNotNull(transactionLog);
transactionLog.trackNew(sstableNew);
transactionLog.obsoleted(sstableOld);
//Modify the transaction log in some way
modifier.accept(transactionLog, sstableOld);
String txnFilePath = transactionLog.getData().getLogFile().file.getPath();
transactionLog.complete(null);
sstableOld.selfRef().release();
sstableNew.selfRef().release();
if (isRecoverable)
{ // the corruption is recoverable, we assume there is a commit record
//This should return the old files and the tx log
assertFiles(Iterables.concat(sstableOld.getAllFilePaths(), Collections.singleton(txnFilePath)),
TransactionLog.getTemporaryFiles(cfs.metadata, dataFolder));
//This should remove old files
TransactionLog.removeUnfinishedLeftovers(cfs.metadata);
assertFiles(dataFolder.getPath(), Sets.newHashSet(sstableNew.getAllFilePaths()));
}
else
{ // if an intermediate line was modified, we cannot tell,
// it should just throw and handle the exception with a log message
//This should not return any files
assertEquals(Collections.emptyList(), new TransactionLog.FileLister(dataFolder.toPath(),
(file, type) -> type != Directories.FileType.FINAL,
Directories.OnTxnErr.IGNORE).list());
try
{
//This should throw a RuntimeException
new TransactionLog.FileLister(dataFolder.toPath(),
(file, type) -> type != Directories.FileType.FINAL,
Directories.OnTxnErr.THROW).list();
fail("Expected exception");
}
catch (RuntimeException ex)
{
// pass
ex.printStackTrace();
}
//This should not remove any files
TransactionLog.removeUnfinishedLeftovers(cfs.metadata);
assertFiles(dataFolder.getPath(), Sets.newHashSet(Iterables.concat(sstableNew.getAllFilePaths(),
sstableOld.getAllFilePaths(),
Collections.singleton(txnFilePath))),
true);
}
}
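A minimal sketch of per-line checksum verification in the spirit of these tests, assuming a CRC32 over the record body; both the framing and the algorithm are assumptions for illustration only, since the real implementation lives in TransactionLog, which is not part of this excerpt:
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Returns true iff a record line of the assumed form "body[checksum]"
// carries a trailing decimal checksum matching the CRC32 of its body.
static boolean verifyRecord(String line)
{
    int idx = line.lastIndexOf('[');
    if (idx < 0 || !line.endsWith("]"))
        return false; // no checksum framing: treat as corrupt

    String body = line.substring(0, idx);
    long expected = Long.parseLong(line.substring(idx + 1, line.length() - 1));

    CRC32 crc = new CRC32(); // CRC32 is an assumption for this sketch
    crc.update(body.getBytes(StandardCharsets.UTF_8));
    return crc.getValue() == expected;
}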
@Test
public void testObsoletedDataFileUpdateTimeChanged() throws IOException
{
testObsoletedFilesChanged(sstable ->
{
// increase the modification time of the Data file
for (String filePath : sstable.getAllFilePaths())
{
if (filePath.endsWith("Data.db"))
assertTrue(new File(filePath).setLastModified(System.currentTimeMillis() + 60000)); //one minute later
}
});
}
private void testObsoletedFilesChanged(Consumer<SSTableReader> modifier) throws IOException
{
ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
SSTableReader sstableOld = sstable(cfs, 0, 128);
SSTableReader sstableNew = sstable(cfs, 1, 128);
// simulate tracking sstables with a committed transaction, except the obsoleted files will have changed on disk
TransactionLog transactionLog = new TransactionLog(OperationType.COMPACTION, cfs.metadata);
assertNotNull(transactionLog);
transactionLog.trackNew(sstableNew);
/*TransactionLog.SSTableTidier tidier =*/ transactionLog.obsoleted(sstableOld);
//modify the old sstable files
modifier.accept(sstableOld);
//Fake a commit
transactionLog.getData().getLogFile().commit();
//This should not remove the old files
TransactionLog.removeUnfinishedLeftovers(cfs.metadata);
assertFiles(transactionLog.getDataFolder(), Sets.newHashSet(Iterables.concat(
sstableNew.getAllFilePaths(),
sstableOld.getAllFilePaths(),
Collections.singleton(transactionLog.getData().getLogFile().file.getPath()))));
sstableOld.selfRef().release();
sstableNew.selfRef().release();
// complete the transaction to avoid LEAK errors
assertNull(transactionLog.complete(null));
assertFiles(transactionLog.getDataFolder(), Sets.newHashSet(Iterables.concat(
sstableNew.getAllFilePaths(),
sstableOld.getAllFilePaths(),
Collections.singleton(transactionLog.getData().getLogFile().file.getPath()))));
}
@Test
public void testGetTemporaryFilesSafeAfterObsoletion() throws Throwable
{
ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
SSTableReader sstable = sstable(cfs, 0, 128);
File dataFolder = sstable.descriptor.directory;
TransactionLog transactionLogs = new TransactionLog(OperationType.COMPACTION, cfs.metadata);
assertNotNull(transactionLogs);
TransactionLog.SSTableTidier tidier = transactionLogs.obsoleted(sstable);
transactionLogs.finish();
sstable.markObsolete(tidier);
sstable.selfRef().release();
for (int i = 0; i < 1000; i++)
{
// This should race with the asynchronous deletion of txn log files
// It doesn't matter what it returns but it should not throw
TransactionLog.getTemporaryFiles(cfs.metadata, dataFolder);
}
}
private static SSTableReader sstable(ColumnFamilyStore cfs, int generation, int size) throws IOException
{
Directories dir = new Directories(cfs.metadata);
Descriptor descriptor = new Descriptor(dir.getDirectoryForNewSSTables(), cfs.keyspace.getName(), cfs.getColumnFamilyName(), generation);
Set<Component> components = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.TOC);
for (Component component : components)
{
File file = new File(descriptor.filenameFor(component));
file.createNewFile();
try (RandomAccessFile raf = new RandomAccessFile(file, "rw"))
{
raf.setLength(size);
}
}
SegmentedFile dFile = new BufferedSegmentedFile(new ChannelProxy(new File(descriptor.filenameFor(Component.DATA))), RandomAccessReader.DEFAULT_BUFFER_SIZE, 0);
SegmentedFile iFile = new BufferedSegmentedFile(new ChannelProxy(new File(descriptor.filenameFor(Component.PRIMARY_INDEX))), RandomAccessReader.DEFAULT_BUFFER_SIZE, 0);
SerializationHeader header = SerializationHeader.make(cfs.metadata, Collections.emptyList());
StatsMetadata metadata = (StatsMetadata) new MetadataCollector(cfs.metadata.comparator)
.finalizeMetadata(cfs.metadata.partitioner.getClass().getCanonicalName(), 0.01f, -1, header)
.get(MetadataType.STATS);
SSTableReader reader = SSTableReader.internalOpen(descriptor,
components,
cfs.metadata,
dFile,
iFile,
MockSchema.indexSummary.sharedCopy(),
new AlwaysPresentFilter(),
1L,
metadata,
SSTableReader.OpenReason.NORMAL,
header);
reader.first = reader.last = MockSchema.readerBounds(generation);
return reader;
}
private static void assertFiles(String dirPath, Set<String> expectedFiles)
{
assertFiles(dirPath, expectedFiles, false);
}
private static void assertFiles(String dirPath, Set<String> expectedFiles, boolean excludeNonExistingFiles)
{
TransactionLog.waitForDeletions();
File dir = new File(dirPath);
for (File file : dir.listFiles())
{
if (file.isDirectory())
continue;
String filePath = file.getPath();
assertTrue(filePath, expectedFiles.contains(filePath));
expectedFiles.remove(filePath);
}
if (excludeNonExistingFiles)
{
    // removeIf prunes missing files without the ConcurrentModificationException
    // that removing inside a for-each loop over the same set would trigger
    expectedFiles.removeIf(filePath -> !new File(filePath).exists());
}
assertTrue(expectedFiles.toString(), expectedFiles.isEmpty());
}
private static void assertFiles(Iterable<String> filePaths, Set<File> expectedFiles)
{
for (String filePath : filePaths)
{
File file = new File(filePath);
assertTrue(filePath, expectedFiles.contains(file));
expectedFiles.remove(file);
}
assertTrue(expectedFiles.isEmpty());
}
}

View File

@ -1,581 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.lifecycle;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.*;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import org.junit.BeforeClass;
import org.junit.Test;
import static junit.framework.Assert.assertNotNull;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import junit.framework.Assert;
import org.apache.cassandra.MockSchema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.compaction.*;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.sstable.metadata.MetadataType;
import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
import org.apache.cassandra.io.util.BufferedSegmentedFile;
import org.apache.cassandra.io.util.ChannelProxy;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.io.util.SegmentedFile;
import org.apache.cassandra.utils.AlwaysPresentFilter;
import org.apache.cassandra.utils.concurrent.AbstractTransactionalTest;
import org.apache.cassandra.utils.concurrent.Transactional;
public class TransactionLogsTest extends AbstractTransactionalTest
{
private static final String KEYSPACE = "TransactionLogsTest";
@BeforeClass
public static void setUp()
{
MockSchema.cleanup();
}
protected AbstractTransactionalTest.TestableTransaction newTest() throws Exception
{
TransactionLogs.waitForDeletions();
SSTableReader.resetTidying();
return new TxnTest();
}
private static final class TxnTest extends TestableTransaction
{
private final static class Transaction extends Transactional.AbstractTransactional implements Transactional
{
final ColumnFamilyStore cfs;
final TransactionLogs txnLogs;
final SSTableReader sstableOld;
final SSTableReader sstableNew;
final TransactionLogs.SSTableTidier tidier;
public Transaction(ColumnFamilyStore cfs, TransactionLogs txnLogs) throws IOException
{
this.cfs = cfs;
this.txnLogs = txnLogs;
this.sstableOld = sstable(cfs, 0, 128);
this.sstableNew = sstable(cfs, 1, 128);
assertNotNull(txnLogs);
assertNotNull(txnLogs.getId());
Assert.assertEquals(OperationType.COMPACTION, txnLogs.getType());
txnLogs.trackNew(sstableNew);
tidier = txnLogs.obsoleted(sstableOld);
assertNotNull(tidier);
}
protected Throwable doCommit(Throwable accumulate)
{
sstableOld.markObsolete(tidier);
sstableOld.selfRef().release();
TransactionLogs.waitForDeletions();
Throwable ret = txnLogs.commit(accumulate);
sstableNew.selfRef().release();
return ret;
}
protected Throwable doAbort(Throwable accumulate)
{
tidier.abort();
TransactionLogs.waitForDeletions();
Throwable ret = txnLogs.abort(accumulate);
sstableNew.selfRef().release();
sstableOld.selfRef().release();
return ret;
}
protected void doPrepare()
{
txnLogs.prepareToCommit();
}
protected void assertInProgress() throws Exception
{
assertFiles(txnLogs.getDataFolder(), Sets.newHashSet(Iterables.concat(sstableNew.getAllFilePaths(),
sstableOld.getAllFilePaths())));
assertFiles(txnLogs.getLogsFolder(), Sets.newHashSet(txnLogs.getData().oldLog().file.getPath(),
txnLogs.getData().newLog().file.getPath()));
assertEquals(2, TransactionLogs.getLogFiles(cfs.metadata).size());
}
protected void assertPrepared() throws Exception
{
}
protected void assertAborted() throws Exception
{
assertFiles(txnLogs.getDataFolder(), new HashSet<>(sstableOld.getAllFilePaths()));
assertFiles(txnLogs.getLogsFolder(), Collections.<String>emptySet());
assertEquals(0, TransactionLogs.getLogFiles(cfs.metadata).size());
}
protected void assertCommitted() throws Exception
{
assertFiles(txnLogs.getDataFolder(), new HashSet<>(sstableNew.getAllFilePaths()));
assertFiles(txnLogs.getLogsFolder(), Collections.<String>emptySet());
assertEquals(0, TransactionLogs.getLogFiles(cfs.metadata).size());
}
}
final Transaction txn;
private TxnTest() throws IOException
{
this(MockSchema.newCFS(KEYSPACE));
}
private TxnTest(ColumnFamilyStore cfs) throws IOException
{
this(cfs, new TransactionLogs(OperationType.COMPACTION, cfs.metadata));
}
private TxnTest(ColumnFamilyStore cfs, TransactionLogs txnLogs) throws IOException
{
this(new Transaction(cfs, txnLogs));
}
private TxnTest(Transaction txn)
{
super(txn);
this.txn = txn;
}
protected void assertInProgress() throws Exception
{
txn.assertInProgress();
}
protected void assertPrepared() throws Exception
{
txn.assertPrepared();
}
protected void assertAborted() throws Exception
{
txn.assertAborted();
}
protected void assertCommitted() throws Exception
{
txn.assertCommitted();
}
}
@Test
public void testUntrack() throws Throwable
{
ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
SSTableReader sstableNew = sstable(cfs, 1, 128);
// complete a transaction without keeping the new files, since they were untracked
TransactionLogs transactionLogs = new TransactionLogs(OperationType.COMPACTION, cfs.metadata);
assertNotNull(transactionLogs);
transactionLogs.trackNew(sstableNew);
transactionLogs.untrackNew(sstableNew);
transactionLogs.finish();
assertFiles(transactionLogs.getDataFolder(), Collections.<String>emptySet());
assertFiles(transactionLogs.getLogsFolder(), Collections.<String>emptySet());
assertEquals(0, TransactionLogs.getLogFiles(cfs.metadata).size());
sstableNew.selfRef().release();
}
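// Two old sstables share the same descriptor (generation 0); one is replaced and the
// other obsoleted. After commit, only the new sstable should remain on disk.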
@Test
public void testCommitSameDesc() throws Throwable
{
ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
SSTableReader sstableOld1 = sstable(cfs, 0, 128);
SSTableReader sstableOld2 = sstable(cfs, 0, 256);
SSTableReader sstableNew = sstable(cfs, 1, 128);
TransactionLogs transactionLogs = new TransactionLogs(OperationType.COMPACTION, cfs.metadata);
assertNotNull(transactionLogs);
transactionLogs.trackNew(sstableNew);
sstableOld1.setReplaced();
TransactionLogs.SSTableTidier tidier = transactionLogs.obsoleted(sstableOld2);
assertNotNull(tidier);
transactionLogs.finish();
sstableOld2.markObsolete(tidier);
sstableOld1.selfRef().release();
sstableOld2.selfRef().release();
TransactionLogs.waitForDeletions();
assertFiles(transactionLogs.getDataFolder(), new HashSet<>(sstableNew.getAllFilePaths()));
assertFiles(transactionLogs.getLogsFolder(), Collections.<String>emptySet());
assertEquals(0, TransactionLogs.getLogFiles(cfs.metadata).size());
sstableNew.selfRef().release();
}
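// A transaction that only adds a new sstable: after commit, its files remain on disk.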
@Test
public void testCommitOnlyNew() throws Throwable
{
ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
SSTableReader sstable = sstable(cfs, 0, 128);
TransactionLogs transactionLogs = new TransactionLogs(OperationType.COMPACTION, cfs.metadata);
assertNotNull(transactionLogs);
transactionLogs.trackNew(sstable);
transactionLogs.finish();
assertFiles(transactionLogs.getDataFolder(), new HashSet<>(sstable.getAllFilePaths()));
assertFiles(transactionLogs.getLogsFolder(), Collections.<String>emptySet());
assertEquals(0, TransactionLogs.getLogFiles(cfs.metadata).size());
sstable.selfRef().release();
}
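// A transaction that only obsoletes an old sstable: after commit, its files are deleted.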
@Test
public void testCommitOnlyOld() throws Throwable
{
ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
SSTableReader sstable = sstable(cfs, 0, 128);
TransactionLogs transactionLogs = new TransactionLogs(OperationType.COMPACTION, cfs.metadata);
assertNotNull(transactionLogs);
TransactionLogs.SSTableTidier tidier = transactionLogs.obsoleted(sstable);
assertNotNull(tidier);
transactionLogs.finish();
sstable.markObsolete(tidier);
sstable.selfRef().release();
TransactionLogs.waitForDeletions();
assertFiles(transactionLogs.getDataFolder(), new HashSet<>());
assertFiles(transactionLogs.getLogsFolder(), Collections.<String>emptySet());
assertEquals(0, TransactionLogs.getLogFiles(cfs.metadata).size());
}
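// Aborting a transaction that only added a new sstable must delete the new files.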
@Test
public void testAbortOnlyNew() throws Throwable
{
ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
SSTableReader sstable = sstable(cfs, 0, 128);
TransactionLogs transactionLogs = new TransactionLogs(OperationType.COMPACTION, cfs.metadata);
assertNotNull(transactionLogs);
transactionLogs.trackNew(sstable);
transactionLogs.abort();
sstable.selfRef().release();
assertFiles(transactionLogs.getDataFolder(), new HashSet<>());
assertFiles(transactionLogs.getLogsFolder(), Collections.<String>emptySet());
assertEquals(0, TransactionLogs.getLogFiles(cfs.metadata).size());
}
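// Aborting a transaction that only obsoleted an old sstable must leave its files on disk.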
@Test
public void testAbortOnlyOld() throws Throwable
{
ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
SSTableReader sstable = sstable(cfs, 0, 128);
TransactionLogs transactionLogs = new TransactionLogs(OperationType.COMPACTION, cfs.metadata);
assertNotNull(transactionLogs);
TransactionLogs.SSTableTidier tidier = transactionLogs.obsoleted(sstable);
assertNotNull(tidier);
tidier.abort();
transactionLogs.abort();
sstable.selfRef().release();
assertFiles(transactionLogs.getDataFolder(), new HashSet<>(sstable.getAllFilePaths()));
assertFiles(transactionLogs.getLogsFolder(), Collections.<String>emptySet());
assertEquals(0, TransactionLogs.getLogFiles(cfs.metadata).size());
}
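// Saves a copy of a log file so it can be restored after the test tampers with it.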
private File copyToTmpFile(File file) throws IOException
{
File ret = File.createTempFile(file.getName(), ".tmp");
ret.deleteOnExit();
Files.copy(file.toPath(), ret.toPath(), StandardCopyOption.REPLACE_EXISTING);
return ret;
}
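// If the new log file is still present at startup, the transaction is considered
// failed: the new sstable is deleted and the old sstable is kept.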
@Test
public void testRemoveUnfinishedLeftovers_newLogFound() throws Throwable
{
ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
SSTableReader sstableOld = sstable(cfs, 0, 128);
SSTableReader sstableNew = sstable(cfs, 1, 128);
// simulate tracking sstables with a failed transaction (new log file NOT deleted)
TransactionLogs transactionLogs = new TransactionLogs(OperationType.COMPACTION, cfs.metadata);
assertNotNull(transactionLogs);
transactionLogs.trackNew(sstableNew);
TransactionLogs.SSTableTidier tidier = transactionLogs.obsoleted(sstableOld);
File tmpNewLog = copyToTmpFile(transactionLogs.getData().newLog().file);
File tmpOldLog = copyToTmpFile(transactionLogs.getData().oldLog().file);
Set<File> tmpFiles = new HashSet<>(TransactionLogs.getLogFiles(cfs.metadata));
for (String p : sstableNew.getAllFilePaths())
tmpFiles.add(new File(p));
sstableNew.selfRef().release();
sstableOld.selfRef().release();
Assert.assertEquals(tmpFiles, TransactionLogs.getTemporaryFiles(cfs.metadata, sstableNew.descriptor.directory));
// normally called at startup
TransactionLogs.removeUnfinishedLeftovers(cfs.metadata);
// the old sstable should not have been removed because the new log was found
// (the transaction is treated as failed)
Directories directories = new Directories(cfs.metadata);
Map<Descriptor, Set<Component>> sstables = directories.sstableLister().list();
assertEquals(1, sstables.size());
assertFiles(transactionLogs.getDataFolder(), new HashSet<>(sstableOld.getAllFilePaths()));
assertFiles(transactionLogs.getLogsFolder(), Collections.<String>emptySet());
tidier.run();
// move the saved transaction log files back, or TransactionLogs will throw assertions when closed
Files.move(tmpNewLog.toPath(), transactionLogs.getData().newLog().file.toPath());
Files.move(tmpOldLog.toPath(), transactionLogs.getData().oldLog().file.toPath());
transactionLogs.close();
}
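// If the new log file is missing at startup, the transaction is considered
// committed: the old sstable is deleted and the new sstable is kept.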
@Test
public void testRemoveUnfinishedLeftovers_oldLogFound() throws Throwable
{
ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
SSTableReader sstableOld = sstable(cfs, 0, 128);
SSTableReader sstableNew = sstable(cfs, 1, 128);
// simulate tracking sstables with a committed transaction (new log file deleted)
TransactionLogs transactionLogs = new TransactionLogs(OperationType.COMPACTION, cfs.metadata);
assertNotNull(transactionLogs);
transactionLogs.trackNew(sstableNew);
TransactionLogs.SSTableTidier tidier = transactionLogs.obsoleted(sstableOld);
File tmpNewLog = copyToTmpFile(transactionLogs.getData().newLog().file);
File tmpOldLog = copyToTmpFile(transactionLogs.getData().oldLog().file);
transactionLogs.getData().newLog().delete(false);
Set<File> tmpFiles = new HashSet<>(TransactionLogs.getLogFiles(cfs.metadata));
for (String p : sstableOld.getAllFilePaths())
tmpFiles.add(new File(p));
sstableNew.selfRef().release();
sstableOld.selfRef().release();
Assert.assertEquals(tmpFiles, TransactionLogs.getTemporaryFiles(cfs.metadata, sstableOld.descriptor.directory));
// normally called at startup
TransactionLogs.removeUnfinishedLeftovers(cfs.metadata);
// the old sstable should have been removed because the new log was missing
// (the transaction is treated as committed)
Directories directories = new Directories(cfs.metadata);
Map<Descriptor, Set<Component>> sstables = directories.sstableLister().list();
assertEquals(1, sstables.size());
assertFiles(transactionLogs.getDataFolder(), new HashSet<>(sstableNew.getAllFilePaths()));
assertFiles(transactionLogs.getLogsFolder(), Collections.<String>emptySet());
tidier.run();
// move the saved transaction log files back, or TransactionLogs will throw assertions when closed
Files.move(tmpNewLog.toPath(), transactionLogs.getData().newLog().file.toPath());
Files.move(tmpOldLog.toPath(), transactionLogs.getData().oldLog().file.toPath());
transactionLogs.close();
}
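// Sstables tracked by an in-flight transaction, and the log files themselves, must be
// reported as temporary; once the transaction finishes, nothing is temporary anymore.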
@Test
public void testGetTemporaryFiles() throws IOException
{
ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
SSTableReader sstable1 = sstable(cfs, 0, 128);
File dataFolder = sstable1.descriptor.directory;
Set<File> tmpFiles = TransactionLogs.getTemporaryFiles(cfs.metadata, dataFolder);
assertNotNull(tmpFiles);
assertEquals(0, tmpFiles.size());
TransactionLogs transactionLogs = new TransactionLogs(OperationType.WRITE, cfs.metadata);
Directories directories = new Directories(cfs.metadata);
File[] beforeSecondSSTable = dataFolder.listFiles(pathname -> !pathname.isDirectory());
SSTableReader sstable2 = sstable(cfs, 1, 128);
transactionLogs.trackNew(sstable2);
Map<Descriptor, Set<Component>> sstables = directories.sstableLister().list();
assertEquals(2, sstables.size());
File[] afterSecondSSTable = dataFolder.listFiles(pathname -> !pathname.isDirectory());
int numNewFiles = afterSecondSSTable.length - beforeSecondSSTable.length;
assertEquals(sstable2.getAllFilePaths().size(), numNewFiles);
tmpFiles = TransactionLogs.getTemporaryFiles(cfs.metadata, dataFolder);
assertNotNull(tmpFiles);
assertEquals(numNewFiles + 2, tmpFiles.size()); // the two extra files are the old and new transaction log files
File ssTable2DataFile = new File(sstable2.descriptor.filenameFor(Component.DATA));
File ssTable2IndexFile = new File(sstable2.descriptor.filenameFor(Component.PRIMARY_INDEX));
assertTrue(tmpFiles.contains(ssTable2DataFile));
assertTrue(tmpFiles.contains(ssTable2IndexFile));
List<File> files = directories.sstableLister().listFiles();
List<File> filesNoTmp = directories.sstableLister().skipTemporary(true).listFiles();
assertNotNull(files);
assertNotNull(filesNoTmp);
assertTrue(files.contains(ssTable2DataFile));
assertTrue(files.contains(ssTable2IndexFile));
assertFalse(filesNoTmp.contains(ssTable2DataFile));
assertFalse(filesNoTmp.contains(ssTable2IndexFile));
transactionLogs.finish();
// Now it should be empty since the transaction has finished
tmpFiles = TransactionLogs.getTemporaryFiles(cfs.metadata, dataFolder);
assertNotNull(tmpFiles);
assertEquals(0, tmpFiles.size());
filesNoTmp = directories.sstableLister().skipTemporary(true).listFiles();
assertNotNull(filesNoTmp);
assertTrue(filesNoTmp.contains(ssTable2DataFile));
assertTrue(filesNoTmp.contains(ssTable2IndexFile));
sstable1.selfRef().release();
sstable2.selfRef().release();
}
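// getTemporaryFiles() must be safe to call while txn log files are being deleted asynchronously.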
@Test
public void testGetTemporaryFilesSafeAfterObsoletion() throws Throwable
{
ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
SSTableReader sstable = sstable(cfs, 0, 128);
File dataFolder = sstable.descriptor.directory;
TransactionLogs transactionLogs = new TransactionLogs(OperationType.COMPACTION, cfs.metadata);
assertNotNull(transactionLogs);
TransactionLogs.SSTableTidier tidier = transactionLogs.obsoleted(sstable);
transactionLogs.finish();
sstable.markObsolete(tidier);
sstable.selfRef().release();
for (int i = 0; i < 1000; i++)
{
// This should race with the asynchronous deletion of txn log files
// It doesn't matter what it returns but it should not throw
TransactionLogs.getTemporaryFiles(cfs.metadata, dataFolder);
}
}
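// Creates a minimal sstable of the given generation and size on disk and opens a reader for it.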
private static SSTableReader sstable(ColumnFamilyStore cfs, int generation, int size) throws IOException
{
Directories dir = new Directories(cfs.metadata);
Descriptor descriptor = new Descriptor(dir.getDirectoryForNewSSTables(), cfs.keyspace.getName(), cfs.getColumnFamilyName(), generation);
Set<Component> components = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.TOC);
for (Component component : components)
{
File file = new File(descriptor.filenameFor(component));
file.createNewFile();
try (RandomAccessFile raf = new RandomAccessFile(file, "rw"))
{
raf.setLength(size);
}
}
SegmentedFile dFile = new BufferedSegmentedFile(new ChannelProxy(new File(descriptor.filenameFor(Component.DATA))), RandomAccessReader.DEFAULT_BUFFER_SIZE, 0);
SegmentedFile iFile = new BufferedSegmentedFile(new ChannelProxy(new File(descriptor.filenameFor(Component.PRIMARY_INDEX))), RandomAccessReader.DEFAULT_BUFFER_SIZE, 0);
SerializationHeader header = SerializationHeader.make(cfs.metadata, Collections.EMPTY_LIST);
StatsMetadata metadata = (StatsMetadata) new MetadataCollector(cfs.metadata.comparator)
.finalizeMetadata(cfs.metadata.partitioner.getClass().getCanonicalName(), 0.01f, -1, header)
.get(MetadataType.STATS);
SSTableReader reader = SSTableReader.internalOpen(descriptor,
components,
cfs.metadata,
dFile,
iFile,
MockSchema.indexSummary.sharedCopy(),
new AlwaysPresentFilter(),
1L,
metadata,
SSTableReader.OpenReason.NORMAL,
header);
reader.first = reader.last = MockSchema.readerBounds(generation);
return reader;
}
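// Asserts that the given folder contains exactly the expected files (ignoring directories).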
private static void assertFiles(String dirPath, Set<String> expectedFiles)
{
File dir = new File(dirPath);
for (File file : dir.listFiles())
{
if (file.isDirectory())
continue;
String filePath = file.getPath();
assertTrue(filePath, expectedFiles.contains(filePath));
expectedFiles.remove(filePath);
}
assertTrue(expectedFiles.isEmpty());
}
}

View File

@ -99,12 +99,5 @@ public class CQLSSTableWriterClientTest
File[] dataFiles = this.testDirectory.listFiles(filter);
assertEquals(2, dataFiles.length);
File transactionsFolder = Directories.getTransactionsDirectory(testDirectory);
assertTrue(transactionsFolder.exists());
File[] opFiles = transactionsFolder.listFiles();
assertEquals(0, opFiles.length);
}
}

View File

@ -45,7 +45,6 @@ import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.OutputHandler;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
public class CQLSSTableWriterTest
{

View File

@ -44,7 +44,7 @@ import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.RowUpdateBuilder;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.rows.EncodingStats;
import org.apache.cassandra.db.lifecycle.TransactionLogs;
import org.apache.cassandra.db.lifecycle.TransactionLog;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
import org.apache.cassandra.db.compaction.CompactionController;
@ -109,7 +109,7 @@ public class SSTableRewriterTest extends SchemaLoader
Keyspace keyspace = Keyspace.open(KEYSPACE);
ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
store.truncateBlocking();
TransactionLogs.waitForDeletions();
TransactionLog.waitForDeletions();
}
@Test
@ -145,7 +145,7 @@ public class SSTableRewriterTest extends SchemaLoader
}
writer.finish();
}
TransactionLogs.waitForDeletions();
TransactionLog.waitForDeletions();
validateCFS(cfs);
int filecounts = assertFileCounts(sstables.iterator().next().descriptor.directory.list());
assertEquals(1, filecounts);
@ -177,7 +177,7 @@ public class SSTableRewriterTest extends SchemaLoader
}
writer.finish();
}
TransactionLogs.waitForDeletions();
TransactionLog.waitForDeletions();
validateCFS(cfs);
int filecounts = assertFileCounts(sstables.iterator().next().descriptor.directory.list());
assertEquals(1, filecounts);
@ -232,7 +232,7 @@ public class SSTableRewriterTest extends SchemaLoader
assertTrue(checked);
writer.finish();
}
TransactionLogs.waitForDeletions();
TransactionLog.waitForDeletions();
validateCFS(cfs);
int filecounts = assertFileCounts(sstables.iterator().next().descriptor.directory.list());
assertEquals(1, filecounts);
@ -277,12 +277,12 @@ public class SSTableRewriterTest extends SchemaLoader
// open till .abort() is called (via the builder)
if (!FBUtilities.isWindows())
{
TransactionLogs.waitForDeletions();
TransactionLog.waitForDeletions();
assertFileCounts(dir.list());
}
writer.abort();
txn.abort();
TransactionLogs.waitForDeletions();
TransactionLog.waitForDeletions();
int datafiles = assertFileCounts(dir.list());
assertEquals(datafiles, 0);
validateCFS(cfs);
@ -328,7 +328,7 @@ public class SSTableRewriterTest extends SchemaLoader
sstables = rewriter.finish();
}
TransactionLogs.waitForDeletions();
TransactionLog.waitForDeletions();
long sum = 0;
for (SSTableReader x : cfs.getLiveSSTables())
@ -337,7 +337,7 @@ public class SSTableRewriterTest extends SchemaLoader
assertEquals(startStorageMetricsLoad - sBytesOnDisk + sum, StorageMetrics.load.getCount());
assertEquals(files, sstables.size());
assertEquals(files, cfs.getLiveSSTables().size());
TransactionLogs.waitForDeletions();
TransactionLog.waitForDeletions();
// tmplink and tmp files should be gone:
assertEquals(sum, cfs.metric.totalDiskSpaceUsed.getCount());
@ -382,7 +382,7 @@ public class SSTableRewriterTest extends SchemaLoader
assertEquals(files, sstables.size());
assertEquals(files, cfs.getLiveSSTables().size());
TransactionLogs.waitForDeletions();
TransactionLog.waitForDeletions();
assertFileCounts(s.descriptor.directory.list());
validateCFS(cfs);
@ -519,7 +519,7 @@ public class SSTableRewriterTest extends SchemaLoader
test.run(scanner, controller, s, cfs, rewriter, txn);
}
TransactionLogs.waitForDeletions();
TransactionLog.waitForDeletions();
assertEquals(startSize, cfs.metric.liveDiskSpaceUsed.getCount());
assertEquals(1, cfs.getLiveSSTables().size());
@ -567,7 +567,7 @@ public class SSTableRewriterTest extends SchemaLoader
}
}
TransactionLogs.waitForDeletions();
TransactionLog.waitForDeletions();
assertEquals(files - 1, cfs.getLiveSSTables().size()); // we never wrote anything to the last file
assertFileCounts(s.descriptor.directory.list());
@ -609,7 +609,7 @@ public class SSTableRewriterTest extends SchemaLoader
sstables = rewriter.finish();
}
TransactionLogs.waitForDeletions();
TransactionLog.waitForDeletions();
assertFileCounts(s.descriptor.directory.list());
validateCFS(cfs);
}
@ -650,7 +650,7 @@ public class SSTableRewriterTest extends SchemaLoader
}
assertEquals(files, sstables.size());
assertEquals(files, cfs.getLiveSSTables().size());
TransactionLogs.waitForDeletions();
TransactionLog.waitForDeletions();
assertFileCounts(s.descriptor.directory.list());
validateCFS(cfs);
@ -670,7 +670,7 @@ public class SSTableRewriterTest extends SchemaLoader
splitter.split();
assertFileCounts(s.descriptor.directory.list());
TransactionLogs.waitForDeletions();
TransactionLog.waitForDeletions();
for (File f : s.descriptor.directory.listFiles())
{
@ -746,7 +746,7 @@ public class SSTableRewriterTest extends SchemaLoader
s.selfRef().release();
}
TransactionLogs.waitForDeletions();
TransactionLog.waitForDeletions();
int filecount = assertFileCounts(s.descriptor.directory.list());
assertEquals(filecount, 1);
@ -825,7 +825,7 @@ public class SSTableRewriterTest extends SchemaLoader
rewriter.finish();
}
validateKeys(keyspace);
TransactionLogs.waitForDeletions();
TransactionLog.waitForDeletions();
validateCFS(cfs);
truncate(cfs);
}
@ -923,7 +923,7 @@ public class SSTableRewriterTest extends SchemaLoader
public static void truncate(ColumnFamilyStore cfs)
{
cfs.truncateBlocking();
TransactionLogs.waitForDeletions();
TransactionLog.waitForDeletions();
Uninterruptibles.sleepUninterruptibly(10L, TimeUnit.MILLISECONDS);
assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount());
assertEquals(0, cfs.metric.totalDiskSpaceUsed.getCount());

View File

@ -51,17 +51,10 @@ public class MetadataSerializerTest
CFMetaData cfm = SchemaLoader.standardCFMD("ks1", "cf1");
ReplayPosition rp = new ReplayPosition(11L, 12);
MetadataCollector collector = new MetadataCollector(cfm.comparator).replayPosition(rp);
Set<Integer> ancestors = Sets.newHashSet(1, 2, 3, 4);
for (int i : ancestors)
collector.addAncestor(i);
String partitioner = RandomPartitioner.class.getCanonicalName();
double bfFpChance = 0.1;
Map<MetadataType, MetadataComponent> originalMetadata = collector.finalizeMetadata(partitioner, bfFpChance, 0, SerializationHeader.make(cfm, Collections.EMPTY_LIST));

View File

@ -39,8 +39,9 @@ import org.apache.cassandra.config.Schema;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.lifecycle.TransactionLogs;
import org.apache.cassandra.db.lifecycle.TransactionLog;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.exceptions.ConfigurationException;
@ -205,7 +206,7 @@ public class DefsTest
ColumnFamilyStore store = Keyspace.open(cfm.ksName).getColumnFamilyStore(cfm.cfName);
assertNotNull(store);
store.forceBlockingFlush();
assertTrue(store.directories.sstableLister().list().size() > 0);
assertTrue(store.directories.sstableLister(Directories.OnTxnErr.THROW).list().size() > 0);
MigrationManager.announceColumnFamilyDrop(ks.name, cfm.cfName);
@ -227,7 +228,7 @@ public class DefsTest
// verify that the files are gone.
Supplier<Object> lambda = () -> {
for (File file : store.directories.sstableLister().listFiles())
for (File file : store.directories.sstableLister(Directories.OnTxnErr.THROW).listFiles())
{
if (file.getPath().endsWith("Data.db") && !new File(file.getPath().replace("Data.db", "Compacted")).exists())
return false;
@ -276,7 +277,7 @@ public class DefsTest
ColumnFamilyStore cfs = Keyspace.open(cfm.ksName).getColumnFamilyStore(cfm.cfName);
assertNotNull(cfs);
cfs.forceBlockingFlush();
assertTrue(!cfs.directories.sstableLister().list().isEmpty());
assertTrue(!cfs.directories.sstableLister(Directories.OnTxnErr.THROW).list().isEmpty());
MigrationManager.announceKeyspaceDrop(ks.name);
@ -521,7 +522,7 @@ public class DefsTest
// check
assertTrue(cfs.indexManager.getIndexes().isEmpty());
TransactionLogs.waitForDeletions();
TransactionLog.waitForDeletions();
assertFalse(new File(desc.filenameFor(Component.DATA)).exists());
}