add SuperColumn support to forceFlush. split out recovery flushing into flushOnRecovery.

git-svn-id: https://svn.apache.org/repos/asf/incubator/cassandra/trunk@759209 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jonathan Ellis 2009-03-27 16:35:33 +00:00
parent 6ba824e430
commit 66871216ce
5 changed files with 53 additions and 36 deletions

View File

@ -395,11 +395,11 @@ public class ColumnFamilyStore
binaryMemtable_.get().put(key, buffer);
}
void forceFlush(boolean fRecovery) throws IOException
void forceFlush() throws IOException
{
//MemtableManager.instance().submit(getColumnFamilyName(), memtable_.get() , CommitLog.CommitLogContext.NULL);
//memtable_.get().flush(true, CommitLog.CommitLogContext.NULL);
memtable_.get().forceflush(this, fRecovery);
memtable_.get().forceflush(this);
}
void forceFlushBinary() throws IOException
@ -1512,4 +1512,14 @@ public class ColumnFamilyStore
+ totalBytesWritten + " Total keys read ..." + totalkeysRead);
return;
}
public boolean isSuper()
{
return DatabaseDescriptor.getColumnType(getColumnFamilyName()).equals("Super");
}
public void flushMemtableOnRecovery() throws IOException
{
memtable_.get().flushOnRecovery();
}
}

View File

@ -142,7 +142,7 @@ public class HintedHandOffManager implements IComponentShutdown
if(hintedColumnFamily == null)
{
// Force flush now
columnFamilyStore_.forceFlush(false);
columnFamilyStore_.forceFlush();
return;
}
Collection<IColumn> keys = hintedColumnFamily.getAllColumns();
@ -177,7 +177,7 @@ public class HintedHandOffManager implements IComponentShutdown
}
}
// Force flush now
columnFamilyStore_.forceFlush(false);
columnFamilyStore_.forceFlush();
// Now do a major compaction
columnFamilyStore_.forceCompaction(null, null, 0, null);

View File

@ -20,7 +20,12 @@ package org.apache.cassandra.db;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@ -124,7 +129,7 @@ public class Memtable implements MemtableMBean, Comparable<Memtable>
key_ = key;
columnFamilyName_ = cfName;
}
Getter(String key, String cfName, IFilter filter)
{
this(key, cfName);
@ -133,7 +138,7 @@ public class Memtable implements MemtableMBean, Comparable<Memtable>
public ColumnFamily call()
{
ColumnFamily cf = getLocalCopy(key_, columnFamilyName_, filter_);
ColumnFamily cf = getLocalCopy(key_, columnFamilyName_, filter_);
return cf;
}
}
@ -179,7 +184,7 @@ public class Memtable implements MemtableMBean, Comparable<Memtable>
}
/**
* Compares two Memtable based on creation time.
* Compares two Memtable based on creation time.
* @param rhs
* @return
*/
@ -283,27 +288,30 @@ public class Memtable implements MemtableMBean, Comparable<Memtable>
/*
* This version is used to switch memtable and force flush.
*/
void forceflush(ColumnFamilyStore cfStore, boolean fRecovery) throws IOException
public void forceflush(ColumnFamilyStore cfStore) throws IOException
{
if(!fRecovery)
RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), flushKey_);
try
{
RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), flushKey_);
try
{
rm.add(cfStore.columnFamily_ + ":Column","0".getBytes());
rm.apply();
}
catch(ColumnFamilyNotDefinedException ex)
{
logger_.debug(LogUtil.throwableToString(ex));
}
if (cfStore.isSuper())
{
rm.add(cfStore.getColumnFamilyName() + ":SC1:Column", "0".getBytes(), 0);
} else {
rm.add(cfStore.getColumnFamilyName() + ":Column", "0".getBytes(), 0);
}
rm.apply();
}
else
catch(ColumnFamilyNotDefinedException ex)
{
flush(CommitLog.CommitLogContext.NULL);
logger_.debug(LogUtil.throwableToString(ex));
}
}
void flushOnRecovery() throws IOException {
flush(CommitLog.CommitLogContext.NULL);
}
private void resolve(String key, ColumnFamily columnFamily)
{
ColumnFamily oldCf = columnFamilies_.get(key);
@ -397,7 +405,7 @@ public class Memtable implements MemtableMBean, Comparable<Memtable>
}
return cf;
}
ColumnFamily get(String key, String cfName, IFilter filter)
{
printExecutorStats();
@ -431,10 +439,6 @@ public class Memtable implements MemtableMBean, Comparable<Memtable>
apartments_.get(cfName_).submit(deleter);
}
/*
* param recoveryMode - indicates if this was invoked during
* recovery.
*/
void flush(CommitLog.CommitLogContext cLogCtx) throws IOException
{
ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
@ -451,18 +455,18 @@ public class Memtable implements MemtableMBean, Comparable<Memtable>
String directory = DatabaseDescriptor.getDataFileLocation();
String filename = cfStore.getNextFileName();
SSTable ssTable = new SSTable(directory, filename, pType);
switch (pType)
switch (pType)
{
case OPHF:
flushForOrderPreservingPartitioner(ssTable, cfStore, cLogCtx);
break;
default:
flushForRandomPartitioner(ssTable, cfStore, cLogCtx);
break;
}
}
}
private void flushForRandomPartitioner(SSTable ssTable, ColumnFamilyStore cfStore, CommitLog.CommitLogContext cLogCtx) throws IOException
{
/* List of primary keys in sorted order */
@ -489,7 +493,7 @@ public class Memtable implements MemtableMBean, Comparable<Memtable>
cfStore.storeLocation( ssTable.getDataFileLocation(), bf );
buffer.close();
}
private void flushForOrderPreservingPartitioner(SSTable ssTable, ColumnFamilyStore cfStore, CommitLog.CommitLogContext cLogCtx) throws IOException
{
List<String> keys = new ArrayList<String>( columnFamilies_.keySet() );

View File

@ -27,10 +27,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.ThreadFactoryImpl;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.utils.LogUtil;
import org.apache.log4j.Logger;
import org.apache.cassandra.utils.*;
/**
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )

View File

@ -847,10 +847,15 @@ public class Table
Set<String> cfNames = columnFamilyStores_.keySet();
for ( String cfName : cfNames )
{
columnFamilyStores_.get(cfName).forceFlush(fRecovery);
if (fRecovery) {
columnFamilyStores_.get(cfName).flushMemtableOnRecovery();
} else {
columnFamilyStores_.get(cfName).forceFlush();
}
}
}
void delete(Row row) throws IOException
{
String key = row.key();
@ -895,7 +900,7 @@ public class Table
}
else if(column.timestamp() == 3)
{
cfStore.forceFlush(false);
cfStore.forceFlush();
}
else if(column.timestamp() == 4)
{