Re-checking them in to fix some stuff.

git-svn-id: https://svn.apache.org/repos/asf/incubator/cassandra/trunk@759022 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Avinash Lakshman 2009-03-27 05:21:26 +00:00
parent 2b27a9fade
commit d19877f286
4 changed files with 239 additions and 112 deletions

View File

@ -18,20 +18,24 @@
package org.apache.cassandra.dht;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.log4j.Logger;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.LogUtil;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.LogUtil;
import org.apache.log4j.Logger;
/**
@ -44,18 +48,18 @@ public class BootStrapper implements Runnable
/* endpoints that need to be bootstrapped */
protected EndPoint[] targets_ = new EndPoint[0];
/* tokens of the nodes being bootstapped. */
protected final Token[] tokens_;
protected BigInteger[] tokens_ = new BigInteger[0];
protected TokenMetadata tokenMetadata_ = null;
private List<EndPoint> filters_ = new ArrayList<EndPoint>();
public BootStrapper(EndPoint[] target, Token... token)
public BootStrapper(EndPoint[] target, BigInteger[] token)
{
targets_ = target;
tokens_ = token;
tokenMetadata_ = StorageService.instance().getTokenMetadata();
}
public BootStrapper(EndPoint[] target, Token[] token, EndPoint[] filters)
public BootStrapper(EndPoint[] target, BigInteger[] token, EndPoint[] filters)
{
this(target, token);
Collections.addAll(filters_, filters);
@ -67,14 +71,14 @@ public class BootStrapper implements Runnable
{
logger_.debug("Beginning bootstrap process for " + targets_ + " ...");
/* copy the token to endpoint map */
Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
/* remove the tokens associated with the endpoints being bootstrapped */
for (Token token : tokens_)
for ( BigInteger token : tokens_ )
{
tokenToEndPointMap.remove(token);
}
Set<Token> oldTokens = new HashSet<Token>( tokenToEndPointMap.keySet() );
Set<BigInteger> oldTokens = new HashSet<BigInteger>( tokenToEndPointMap.keySet() );
Range[] oldRanges = StorageService.instance().getAllRanges(oldTokens);
logger_.debug("Total number of old ranges " + oldRanges.length);
/*
@ -126,5 +130,21 @@ public class BootStrapper implements Runnable
logger_.debug( LogUtil.throwableToString(th) );
}
}
private Range getMyOldRange()
{
Map<EndPoint, BigInteger> oldEndPointToTokenMap = tokenMetadata_.cloneEndPointTokenMap();
Map<BigInteger, EndPoint> oldTokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
oldEndPointToTokenMap.remove(targets_);
oldTokenToEndPointMap.remove(tokens_);
BigInteger myToken = oldEndPointToTokenMap.get(StorageService.getLocalStorageEndPoint());
List<BigInteger> allTokens = new ArrayList<BigInteger>(oldTokenToEndPointMap.keySet());
Collections.sort(allTokens);
int index = Collections.binarySearch(allTokens, myToken);
/* Calculate the lhs for the range */
BigInteger lhs = (index == 0) ? allTokens.get(allTokens.size() - 1) : allTokens.get( index - 1);
return new Range( lhs, myToken );
}
}

View File

@ -18,19 +18,19 @@
package org.apache.cassandra.dht;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.log4j.Logger;
import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.log4j.Logger;
class LeaveJoinProtocolHelper
@ -42,20 +42,20 @@ class LeaveJoinProtocolHelper
* a-----x-----y-----b then we want a mapping from
* (a, b] --> (a, x], (x, y], (y, b]
*/
protected static Map<Range, List<Range>> getRangeSplitRangeMapping(Range[] oldRanges, Token[] allTokens)
protected static Map<Range, List<Range>> getRangeSplitRangeMapping(Range[] oldRanges, BigInteger[] allTokens)
{
Map<Range, List<Range>> splitRanges = new HashMap<Range, List<Range>>();
Token[] tokens = new Token[allTokens.length];
BigInteger[] tokens = new BigInteger[allTokens.length];
System.arraycopy(allTokens, 0, tokens, 0, tokens.length);
Arrays.sort(tokens);
Range prevRange = null;
Token prevToken = null;
BigInteger prevToken = null;
boolean bVal = false;
for ( Range oldRange : oldRanges )
{
if (bVal)
if ( bVal && prevRange != null )
{
bVal = false;
List<Range> subRanges = splitRanges.get(prevRange);
@ -65,7 +65,7 @@ class LeaveJoinProtocolHelper
prevRange = oldRange;
prevToken = oldRange.left();
for (Token token : tokens)
for ( BigInteger token : tokens )
{
List<Range> subRanges = splitRanges.get(oldRange);
if ( oldRange.contains(token) )

View File

@ -18,20 +18,21 @@
package org.apache.cassandra.dht;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.log4j.Logger;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.LogUtil;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.LogUtil;
import org.apache.log4j.Logger;
/**
@ -47,11 +48,11 @@ public class LeaveJoinProtocolImpl implements Runnable
/* endpoints that are to be moved. */
protected EndPoint[] targets_ = new EndPoint[0];
/* position where they need to be moved */
protected final Token[] tokens_;
protected BigInteger[] tokens_ = new BigInteger[0];
/* token metadata information */
protected TokenMetadata tokenMetadata_ = null;
public LeaveJoinProtocolImpl(EndPoint[] targets, Token[] tokens)
public LeaveJoinProtocolImpl(EndPoint[] targets, BigInteger[] tokens)
{
targets_ = targets;
tokens_ = tokens;
@ -64,24 +65,24 @@ public class LeaveJoinProtocolImpl implements Runnable
{
logger_.debug("Beginning leave/join process for ...");
/* copy the token to endpoint map */
Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
/* copy the endpoint to token map */
Map<EndPoint, Token> endpointToTokenMap = tokenMetadata_.cloneEndPointTokenMap();
Map<EndPoint, BigInteger> endpointToTokenMap = tokenMetadata_.cloneEndPointTokenMap();
Set<Token> oldTokens = new HashSet<Token>( tokenToEndPointMap.keySet() );
Set<BigInteger> oldTokens = new HashSet<BigInteger>( tokenToEndPointMap.keySet() );
Range[] oldRanges = StorageService.instance().getAllRanges(oldTokens);
logger_.debug("Total number of old ranges " + oldRanges.length);
/* Calculate the list of nodes that handle the old ranges */
Map<Range, List<EndPoint>> oldRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(oldRanges);
/* Remove the tokens of the nodes leaving the ring */
Set<Token> tokens = getTokensForLeavingNodes();
Set<BigInteger> tokens = getTokensForLeavingNodes();
oldTokens.removeAll(tokens);
Range[] rangesAfterNodesLeave = StorageService.instance().getAllRanges(oldTokens);
/* Get expanded range to initial range mapping */
Map<Range, List<Range>> expandedRangeToOldRangeMap = getExpandedRangeToOldRangeMapping(oldRanges, rangesAfterNodesLeave);
/* add the new token positions to the old tokens set */
for (Token token : tokens_)
for ( BigInteger token : tokens_ )
oldTokens.add(token);
Range[] rangesAfterNodesJoin = StorageService.instance().getAllRanges(oldTokens);
/* replace the ranges that were split with the split ranges in the old configuration */
@ -195,12 +196,12 @@ public class LeaveJoinProtocolImpl implements Runnable
}
}
private Set<Token> getTokensForLeavingNodes()
private Set<BigInteger> getTokensForLeavingNodes()
{
Set<Token> tokens = new HashSet<Token>();
Set<BigInteger> tokens = new HashSet<BigInteger>();
for ( EndPoint target : targets_ )
{
tokens.add(tokenMetadata_.getToken(target));
tokens.add( tokenMetadata_.getToken(target) );
}
return tokens;
}
@ -275,16 +276,16 @@ public class LeaveJoinProtocolImpl implements Runnable
public static void main(String[] args) throws Throwable
{
StorageService ss = StorageService.instance();
ss.updateTokenMetadata(new BigIntegerToken("3"), new EndPoint("A", 7000));
ss.updateTokenMetadata(new BigIntegerToken("6"), new EndPoint("B", 7000));
ss.updateTokenMetadata(new BigIntegerToken("9"), new EndPoint("C", 7000));
ss.updateTokenMetadata(new BigIntegerToken("12"), new EndPoint("D", 7000));
ss.updateTokenMetadata(new BigIntegerToken("15"), new EndPoint("E", 7000));
ss.updateTokenMetadata(new BigIntegerToken("18"), new EndPoint("F", 7000));
ss.updateTokenMetadata(new BigIntegerToken("21"), new EndPoint("G", 7000));
ss.updateTokenMetadata(new BigIntegerToken("24"), new EndPoint("H", 7000));
ss.updateTokenMetadata(BigInteger.valueOf(3), new EndPoint("A", 7000));
ss.updateTokenMetadata(BigInteger.valueOf(6), new EndPoint("B", 7000));
ss.updateTokenMetadata(BigInteger.valueOf(9), new EndPoint("C", 7000));
ss.updateTokenMetadata(BigInteger.valueOf(12), new EndPoint("D", 7000));
ss.updateTokenMetadata(BigInteger.valueOf(15), new EndPoint("E", 7000));
ss.updateTokenMetadata(BigInteger.valueOf(18), new EndPoint("F", 7000));
ss.updateTokenMetadata(BigInteger.valueOf(21), new EndPoint("G", 7000));
ss.updateTokenMetadata(BigInteger.valueOf(24), new EndPoint("H", 7000));
Runnable runnable = new LeaveJoinProtocolImpl( new EndPoint[]{new EndPoint("C", 7000), new EndPoint("D", 7000)}, new Token[]{new BigIntegerToken("22"), new BigIntegerToken("23")} );
Runnable runnable = new LeaveJoinProtocolImpl( new EndPoint[]{new EndPoint("C", 7000), new EndPoint("D", 7000)}, new BigInteger[]{BigInteger.valueOf(22), BigInteger.valueOf(23)} );
runnable.run();
}
}

View File

@ -21,9 +21,17 @@ package org.apache.cassandra.dht;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.cassandra.gms.GossipDigest;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.io.IFileReader;
import org.apache.cassandra.io.IFileWriter;
import org.apache.cassandra.net.CompactEndPointSerializationHelper;
import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.service.StorageService;
@ -43,12 +51,28 @@ public class Range implements Comparable<Range>
public static ICompactSerializer<Range> serializer()
{
return serializer_;
}
private Token left_;
private Token right_;
}
public Range(Token left, Token right)
public static boolean isKeyInRanges(List<Range> ranges, String key)
{
if(ranges == null )
return false;
for ( Range range : ranges)
{
if(range.contains(StorageService.hash(key)))
{
return true ;
}
}
return false;
}
private BigInteger left_;
private BigInteger right_;
public Range(BigInteger left, BigInteger right)
{
left_ = left;
right_ = right;
@ -58,7 +82,7 @@ public class Range implements Comparable<Range>
* Returns the left endpoint of a range.
* @return left endpoint
*/
public Token left()
public BigInteger left()
{
return left_;
}
@ -67,20 +91,19 @@ public class Range implements Comparable<Range>
* Returns the right endpoint of a range.
* @return right endpoint
*/
public Token right()
public BigInteger right()
{
return right_;
}
/**
* Helps determine if a given point on the DHT ring is contained
* in the range in question.
* @param bi point in question
* @return true if the point contains within the range else false.
*/
public boolean contains(Token bi)
boolean isSplitRequired()
{
if ( left_.compareTo(right_) > 0 )
return ( left_.subtract(right_).signum() >= 0 );
}
public boolean isSplitBy(BigInteger bi)
{
if ( left_.subtract(right_).signum() > 0 )
{
/*
* left is greater than right we are wrapping around.
@ -90,31 +113,125 @@ public class Range implements Comparable<Range>
* (2) k < b -- return true
* (3) b < k < a -- return false
*/
if ( bi.compareTo(left_) >= 0 )
if ( bi.subtract(left_).signum() > 0 )
return true;
else return right_.compareTo(bi) > 0;
else if (right_.subtract(bi).signum() > 0 )
return true;
else
return false;
}
else if ( left_.compareTo(right_) < 0 )
else if ( left_.subtract(right_).signum() < 0 )
{
/*
* This is the range [a, b) where a < b.
*/
return ( bi.compareTo(left_) >= 0 && right_.compareTo(bi) >=0 );
return ( bi.subtract(left_).signum() > 0 && right_.subtract(bi).signum() > 0 );
}
else
{
// should never be here.
return true;
}
}
/**
* Helps determine if a given point on the DHT ring is contained
* in the range in question.
* @param bi point in question
* @return true if the point contains within the range else false.
*/
public boolean contains(BigInteger bi)
{
if ( left_.subtract(right_).signum() > 0 )
{
/*
* left is greater than right we are wrapping around.
* So if the interval is [a,b) where a > b then we have
* 3 cases one of which holds for any given token k.
* (1) k > a -- return true
* (2) k < b -- return true
* (3) b < k < a -- return false
*/
if ( bi.subtract(left_).signum() >= 0 )
return true;
else if (right_.subtract(bi).signum() > 0 )
return true;
else
return false;
}
else if ( left_.subtract(right_).signum() < 0 )
{
/*
* This is the range [a, b) where a < b.
*/
return ( bi.subtract(left_).signum() >= 0 && right_.subtract(bi).signum() >=0 );
}
else
{
return true;
}
}
/**
* Helps determine if a given range on the DHT ring is contained
* within the range associated with the <i>this</i> pointer.
* @param rhs rhs in question
* @return true if the point contains within the range else false.
*/
public boolean contains(Range rhs)
{
/*
* If (a, b] and (c, d} are not wrap arounds
* then return true if a <= c <= d <= b.
*/
if ( !isWrapAround(this) && !isWrapAround(rhs) )
{
if ( rhs.left_.subtract(left_).signum() >= 0 && right_.subtract(rhs.right_).signum() >= 0 )
return true;
else
return false;
}
/*
* If lhs is a wrap around and rhs is not then
* rhs.left >= lhs.left and rhs.right >= lhs.left.
*/
if ( isWrapAround(this) && !isWrapAround(rhs) )
{
if ( rhs.left_.subtract(left_).signum() >= 0 && rhs.right_.subtract(right_).signum() >= 0 )
return true;
else
return false;
}
/*
* If lhs is not a wrap around and rhs is a wrap
* around then we just return false.
*/
if ( !isWrapAround(this) && isWrapAround(rhs) )
return false;
if( isWrapAround(this) && isWrapAround(rhs) )
{
if ( rhs.left_.subtract(left_).signum() >= 0 && right_.subtract(right_).signum() >= 0 )
return true;
else
return false;
}
/* should never be here */
return false;
}
/**
* Tells if the given range is a wrap around.
* @param range
* @return
*/
private static boolean isWrapAround(Range range)
private boolean isWrapAround(Range range)
{
return range.left_.compareTo(range.right_) > 0;
boolean bVal = ( range.left_.subtract(range.right_).signum() > 0 ) ? true : false;
return bVal;
}
public int compareTo(Range rhs)
@ -132,28 +249,15 @@ public class Range implements Comparable<Range>
return right_.compareTo(rhs.right_);
}
public static boolean isKeyInRanges(String key, List<Range> ranges)
{
assert ranges != null;
Token token = StorageService.token(key);
for (Range range : ranges)
{
if(range.contains(token))
{
return true;
}
}
return false;
}
public boolean equals(Object o)
{
if ( !(o instanceof Range) )
return false;
Range rhs = (Range)o;
return left_.equals(rhs.left_) && right_.equals(rhs.right_);
if ( left_.equals(rhs.left_) && right_.equals(rhs.right_) )
return true;
else
return false;
}
public int hashCode()
@ -170,13 +274,15 @@ public class Range implements Comparable<Range>
class RangeSerializer implements ICompactSerializer<Range>
{
public void serialize(Range range, DataOutputStream dos) throws IOException
{
Token.serializer().serialize(range.left(), dos);
Token.serializer().serialize(range.right(), dos);
{
dos.writeUTF(range.left().toString());
dos.writeUTF(range.right().toString());
}
public Range deserialize(DataInputStream dis) throws IOException
{
return new Range(Token.serializer().deserialize(dis), Token.serializer().deserialize(dis));
BigInteger left = new BigInteger(dis.readUTF());
BigInteger right = new BigInteger(dis.readUTF());
return new Range(left, right);
}
}