syste log logSize++ when getPhysicalPlan

This commit is contained in:
CGF 2017-05-24 11:01:41 +08:00
parent 63edc001ac
commit d538db4764
5 changed files with 117 additions and 53 deletions

View File

@ -42,7 +42,7 @@ public class TSFileDBConfig {
public int defaultFetchSize = 1000000;
public String writeLogPath = "src/main/resources/writeLog.log";
public int LogCompactSize = 100000;
public int LogCompactSize = 10000;
public int LogMemorySize = 1;
public TSFileDBConfig() {

View File

@ -22,7 +22,7 @@ public class WriteLogManager {
private static HashMap<String, WriteLogNode> logNodeMaps;
public static final int BUFFERWRITER = 0, OVERFLOW = 1;
private static List<String> recoveryPathList = new ArrayList<>();
public static boolean isRecovering;
public static boolean isRecovering = false;
private WriteLogManager() {
logNodeMaps = new HashMap<>();
@ -85,6 +85,7 @@ public class WriteLogManager {
Iterator<String> iterator = recoveryPathList.iterator();
while (iterator.hasNext()) {
WriteLogNode node = getWriteLogNode(iterator.next());
node.recovery();
PhysicalPlan plan = node.getPhysicalPlan();
if (plan != null) {
return plan;

View File

@ -21,7 +21,6 @@ import java.util.ArrayList;
import java.util.List;
/**
*
* System log persist interface.
*
* @author CGF
@ -103,7 +102,7 @@ public class WriteLogNode {
byte[] flushStart = new byte[1];
flushStart[0] = (byte) Operator.OperatorType.OVERFLOWFLUSHSTART.ordinal();
writer.write(flushStart);
writer.write(BytesUtils.intToTwoBytes(flushStart.length)); // 2 bytes to represent the content size
writer.write(BytesUtils.intToBytes(flushStart.length)); // 2 bytes to represent the content size
LOG.info("Write overflow log start.");
}
@ -116,7 +115,7 @@ public class WriteLogNode {
byte[] flushEnd = new byte[1];
flushEnd[0] = (byte) Operator.OperatorType.OVERFLOWFLUSHEND.ordinal();
writer.write(flushEnd);
writer.write(BytesUtils.intToTwoBytes(flushEnd.length));
writer.write(BytesUtils.intToBytes(flushEnd.length));
hasOverflowFlush = true;
LOG.info("Write overflow log end.");
checkLogsCompactFileSize(false);
@ -131,7 +130,7 @@ public class WriteLogNode {
byte[] flushStart = new byte[1];
flushStart[0] = (byte) Operator.OperatorType.BUFFERFLUSHSTART.ordinal();
writer.write(flushStart);
writer.write(BytesUtils.intToTwoBytes(flushStart.length));
writer.write(BytesUtils.intToBytes(flushStart.length));
LOG.info("Write bufferwrite log start.");
}
@ -144,7 +143,7 @@ public class WriteLogNode {
byte[] flushEnd = new byte[1];
flushEnd[0] = (byte) Operator.OperatorType.BUFFERFLUSHEND.ordinal();
writer.write(flushEnd);
writer.write(BytesUtils.intToTwoBytes(flushEnd.length));
writer.write(BytesUtils.intToBytes(flushEnd.length));
LOG.info("Write bufferwrite log end.");
hasBufferWriteFlush = true;
checkLogsCompactFileSize(false);
@ -153,7 +152,7 @@ public class WriteLogNode {
}
/**
* Compact logs in path.log.
* Compact logs in path.log.
*
* @throws IOException
*/
@ -186,14 +185,14 @@ public class WriteLogNode {
bytesInMemory.add(planBytes);
totalBytes += planBytes.length;
}
byte[] bytesToSerialize = new byte[totalBytes + 2*plansInMemory.size()];
byte[] bytesToSerialize = new byte[totalBytes + 4 * plansInMemory.size()];
int pos = 0;
for (byte[] bs : bytesInMemory) {
System.arraycopy(bs, 0, bytesToSerialize, pos, bs.length);
pos += bs.length;
byte[] len = BytesUtils.intToTwoBytes(bs.length);
byte[] len = BytesUtils.intToBytes(bs.length);
System.arraycopy(len, 0, bytesToSerialize, pos, len.length);
pos += 2;
pos += 4;
}
if (writer == null) {
@ -225,6 +224,9 @@ public class WriteLogNode {
}
PhysicalPlan plan = reader.getPhysicalPlan();
if (plan != null) {
logSize++;
}
return plan;
}

View File

@ -46,9 +46,9 @@ public class LocalFileLogReader implements WriteLogReadable {
return false;
}
raf.seek(pos - 2);
byte[] opeContentLengthBytes = new byte[2];
byte[] opeContentLengthBytes = new byte[4];
raf.read(opeContentLengthBytes);
int opeContentLength = BytesUtils.twoBytesToInt(opeContentLengthBytes);
int opeContentLength = BytesUtils.bytesToInt(opeContentLengthBytes);
byte[] opeTypeBytes = new byte[1];
raf.seek(pos - 2 - opeContentLength);
@ -70,9 +70,9 @@ public class LocalFileLogReader implements WriteLogReadable {
public byte[] nextOperator() throws IOException {
raf.seek(pos - 2);
byte[] opeContentLengthBytes = new byte[2];
byte[] opeContentLengthBytes = new byte[4];
raf.read(opeContentLengthBytes);
int opeContentLength = BytesUtils.twoBytesToInt(opeContentLengthBytes);
int opeContentLength = BytesUtils.bytesToInt(opeContentLengthBytes);
byte[] opeContent = new byte[opeContentLength];
raf.seek(pos - 2 - opeContentLength);
@ -103,37 +103,37 @@ public class LocalFileLogReader implements WriteLogReadable {
int bufferVis = -1;
while (i > 0) {
lraf.seek(i - 2);
byte[] opeContentLengthBytes = new byte[2];
lraf.seek(i - 4);
byte[] opeContentLengthBytes = new byte[4];
lraf.read(opeContentLengthBytes);
int opeContentLength = BytesUtils.twoBytesToInt(opeContentLengthBytes);
int opeContentLength = BytesUtils.bytesToInt(opeContentLengthBytes);
byte[] opeTypeBytes = new byte[1];
lraf.seek(i - 2 - opeContentLength);
lraf.seek(i - 4 - opeContentLength);
lraf.read(opeTypeBytes);
int opeType = (int) opeTypeBytes[0];
if (opeType == OperatorType.OVERFLOWFLUSHEND.ordinal()) {
overflowVis = 1;
i -= (2 + opeContentLength);
i -= (4 + opeContentLength);
continue;
} else if (opeType == OperatorType.OVERFLOWFLUSHSTART.ordinal()) {
if (overflowVis == 1)
overflowVis = 2;
else
overflowVis = 3;
i -= (2 + opeContentLength);
i -= (4 + opeContentLength);
continue;
} else if (opeType == OperatorType.BUFFERFLUSHEND.ordinal()) {
bufferVis = 1;
i -= (2 + opeContentLength);
i -= (4 + opeContentLength);
continue;
} else if (opeType == OperatorType.BUFFERFLUSHSTART.ordinal()) {
if (bufferVis == 1)
bufferVis = 2;
else
bufferVis = 3;
i -= (2 + opeContentLength);
i -= (4 + opeContentLength);
continue;
}
@ -146,20 +146,20 @@ public class LocalFileLogReader implements WriteLogReadable {
lraf.read(insertTypeBytes);
int insertType = (int) insertTypeBytes[0];
if (insertType == 1 && bufferVis != 2) { // bufferwrite insert
bufferStartList.add(i - 2 - opeContentLength);
bufferStartList.add(i - 4 - opeContentLength);
bufferLengthList.add(opeContentLength);
bufferTailCount++;
} else if (insertType == 2 && overflowVis != 2) { // overflow insert
overflowStartList.add(i - 2 - opeContentLength);
overflowStartList.add(i - 4 - opeContentLength);
overflowLengthList.add(opeContentLength);
overflowTailCount++;
}
} else if (overflowVis != 2) { // overflow update/delete
overflowStartList.add(i - 2 - opeContentLength);
overflowStartList.add(i - 4 - opeContentLength);
overflowLengthList.add(opeContentLength);
overflowTailCount++;
}
i -= (2 + opeContentLength);
i -= (4 + opeContentLength);
}
}
@ -217,38 +217,38 @@ public class LocalFileLogReader implements WriteLogReadable {
int backUpTotalLength = 0;
while (i > 0) {
lraf.seek(i - 2);
byte[] opeContentLengthBytes = new byte[2];
lraf.seek(i - 4);
byte[] opeContentLengthBytes = new byte[4];
lraf.read(opeContentLengthBytes);
int opeContentLength = BytesUtils.twoBytesToInt(opeContentLengthBytes);
int opeContentLength = BytesUtils.bytesToInt(opeContentLengthBytes);
byte[] opeTypeBytes = new byte[1];
int backUpPos = i - 2 - opeContentLength;
int backUpPos = i - 4 - opeContentLength;
lraf.seek(backUpPos);
lraf.read(opeTypeBytes);
int opeType = (int) opeTypeBytes[0];
if (opeType == OperatorType.OVERFLOWFLUSHEND.ordinal()) {
overflowVis = 1;
i -= (2 + opeContentLength);
i -= (4 + opeContentLength);
continue;
} else if (opeType == OperatorType.OVERFLOWFLUSHSTART.ordinal()) {
if (overflowVis == 1)
overflowVis = 2;
else
overflowVis = 3;
i -= (2 + opeContentLength);
i -= (4 + opeContentLength);
continue;
} else if (opeType == OperatorType.BUFFERFLUSHEND.ordinal()) {
bufferVis = 1;
i -= (2 + opeContentLength);
i -= (4 + opeContentLength);
continue;
} else if (opeType == OperatorType.BUFFERFLUSHSTART.ordinal()) {
if (bufferVis == 1)
bufferVis = 2;
else
bufferVis = 3;
i -= (2 + opeContentLength);
i -= (4 + opeContentLength);
continue;
}
@ -261,26 +261,26 @@ public class LocalFileLogReader implements WriteLogReadable {
lraf.read(insertTypeBytes);
int insertType = (int) insertTypeBytes[0];
if (insertType == 1 && bufferVis != 2) { // bufferwrite insert
byte[] dataBackUp = new byte[opeContentLength + 2];
byte[] dataBackUp = new byte[opeContentLength + 4];
lraf.seek(backUpPos);
lraf.read(dataBackUp);
backUpBytesList.add(dataBackUp);
backUpTotalLength += dataBackUp.length;
} else if (insertType == 2 && overflowVis != 2) { // overflow insert
byte[] dataBackUp = new byte[opeContentLength + 2];
byte[] dataBackUp = new byte[opeContentLength + 4];
lraf.seek(backUpPos);
lraf.read(dataBackUp);
backUpBytesList.add(dataBackUp);
backUpTotalLength += dataBackUp.length;
}
} else if (overflowVis != 2) { // overflow update/delete
byte[] dataBackUp = new byte[opeContentLength + 2];
byte[] dataBackUp = new byte[opeContentLength + 4];
lraf.seek(backUpPos);
lraf.read(dataBackUp);
backUpBytesList.add(dataBackUp);
backUpTotalLength += dataBackUp.length;
}
i -= (2 + opeContentLength);
i -= (4 + opeContentLength);
}
byte[] ans = new byte[backUpTotalLength];

View File

@ -14,13 +14,11 @@ import java.util.List;
*/
public class WriteLogNodeTest {
private static String device, sensor;
private static Path path = new Path("d1.s1");
private static String fileNode = "root.vehicle.d1";
@Test
public void bufferWriteOverflowFlushTest() throws IOException {
System.out.println("============bufferWriteOverflowFlushTest");
WriteLogNode node = new WriteLogNode(fileNode);
node.resetFileStatus();
node.write(new InsertPlan(1, 100L, "1.0", path));
@ -54,12 +52,13 @@ public class WriteLogNodeTest {
Assert.assertEquals(updatePlan.getValue(), "4.0");
}
cnt++;
output(plan);
}
node.resetFileStatus();
}
@Test
public void logMemorySizeTest() throws IOException {
System.out.println("============logMemorySizeTest");
WriteLogNode node = new WriteLogNode(fileNode);
node.resetFileStatus();
node.setLogMemorySize(100);
@ -99,11 +98,12 @@ public class WriteLogNodeTest {
// output(plan);
}
Assert.assertEquals(cnt, 201);
node.resetFileStatus();
}
@Test
public void logCompactTest() throws IOException {
System.out.println("============logCompactTest");
// need test bufferwrite, overflow flush
WriteLogNode node = new WriteLogNode(fileNode);
node.resetFileStatus();
node.setLogMemorySize(10);
@ -115,34 +115,95 @@ public class WriteLogNodeTest {
node.write(new UpdatePlan(i, i * 2, "2.0", path));
}
//node.write(new InsertPlan(1, 300L, "3.0", path));
node.overflowFlushStart();
node.overflowFlushEnd();
PhysicalPlan plan;
Assert.assertEquals(node.getPhysicalPlan(), null);
node.resetFileStatus();
List<String> measurementList = new ArrayList<>();
List<String> valueList = new ArrayList<>();
for (int i = 0; i <= 100; i++) {
measurementList.add("s0");
valueList.add(String.valueOf(i));
}
MultiInsertPlan multiInsertPlan = new MultiInsertPlan(1, fileNode, 1L, measurementList, valueList);
node.write(multiInsertPlan);
node.bufferFlushStart();
node.bufferFlushEnd();
PhysicalPlan plan = null;
while ((plan = node.getPhysicalPlan()) != null) {
// cnt++;
output(plan);
for (int i = 300; i <= 500; i++) {
if (i == 409) {
node.overflowFlushStart();
}
if (i == 470) {
node.overflowFlushEnd();
}
node.write(new UpdatePlan(i, i * 2, "8.0", path));
}
int cnt = 1;
while ((plan = node.getPhysicalPlan()) != null) {
cnt ++;
// output(plan);
}
Assert.assertEquals(cnt, 92);
node = new WriteLogNode(fileNode);
// test bufferwrite
node.setLogMemorySize(1);
node.setLogCompactSize(10);
node.resetFileStatus();
for (int i = 1;i <= 10;i++) {
measurementList = new ArrayList<>();
valueList = new ArrayList<>();
for (int j = 1; j <= 10; j++) {
measurementList.add("s"+i+"-"+j);
valueList.add(String.valueOf(i));
}
multiInsertPlan = new MultiInsertPlan(1, fileNode, 1L, measurementList, valueList);
node.write(multiInsertPlan);
}
node.bufferFlushStart();
node.bufferFlushEnd();
for (int i = 1;i <= 1;i++) {
measurementList = new ArrayList<>();
valueList = new ArrayList<>();
for (int j = 1; j <= 10; j++) {
measurementList.add("s"+i+"-"+j);
valueList.add(String.valueOf(i));
}
multiInsertPlan = new MultiInsertPlan(1, fileNode, 1L, measurementList, valueList);
node.write(multiInsertPlan);
}
while ((plan = node.getPhysicalPlan()) != null) {
cnt ++;
// output(plan);
}
node.resetFileStatus();
}
@Test
public void multiInsertTest() throws IOException {
System.out.println("============multiInsertTest");
WriteLogNode node = new WriteLogNode(fileNode);
node.resetFileStatus();
node.setLogMemorySize(1);
node.setLogCompactSize(100);
List<String> measurementList = new ArrayList<>();
List<String> valueList = new ArrayList<>();
int cnt = 1;
for (int i = 0;i <= 10;i++) {
for (int i = 0; i <= 10000; i++) {
measurementList.add("s0");
valueList.add(String.valueOf(i));
}
MultiInsertPlan multiInsertPlan = new MultiInsertPlan(1, fileNode, 1, measurementList, valueList);
MultiInsertPlan multiInsertPlan = new MultiInsertPlan(1, fileNode, 1L, measurementList, valueList);
node.write(multiInsertPlan);
PhysicalPlan plan;
while ((plan = node.getPhysicalPlan()) != null) {
//multiInsertPlan = (MultiInsertPlan) plan;
//Assert.assertEquals(multiInsertPlan.getMeasurementList().size(), 11);
output(plan);
}
node.resetFileStatus();
}
@Test
@ -162,8 +223,8 @@ public class WriteLogNodeTest {
System.out.println("Delete: " + p.getPath() + " " + p.getDeleteTime());
} else if (plan instanceof MultiInsertPlan) {
MultiInsertPlan multiInsertPlan = (MultiInsertPlan) plan;
System.out.println("MultiInsert: " + multiInsertPlan.getPath() + multiInsertPlan.getTime());
for (int i = 0;i < multiInsertPlan.getMeasurementList().size();i++) {
System.out.println("MultiInsert: " + multiInsertPlan.getDeltaObject() + multiInsertPlan.getTime());
for (int i = 0; i < multiInsertPlan.getMeasurementList().size(); i++) {
System.out.println(multiInsertPlan.getMeasurementList().get(i) + " " + multiInsertPlan.getInsertValues().get(i));
}
}