解决update的路径拼接问题,并在多于一个路径时抛出异常。暂时的解决方案,因为要和带value filter的总和设计

This commit is contained in:
softpoeter 2017-10-10 19:13:25 +08:00
parent b8d9f3eb15
commit 6de11422f6
3 changed files with 88 additions and 86 deletions

View File

@ -7,6 +7,9 @@ import cn.edu.tsinghua.iotdb.qp.physical.PhysicalPlan;
import cn.edu.tsinghua.iotdb.qp.logical.Operator;
import cn.edu.tsinghua.tsfile.common.utils.Pair;
import cn.edu.tsinghua.tsfile.timeseries.read.qp.Path;
import cn.edu.tsinghua.tsfile.timeseries.utils.StringContainer;
import static cn.edu.tsinghua.iotdb.qp.constant.SQLConstant.lineFeedSignal;
/**
* @author kangrong
@ -66,4 +69,16 @@ public class UpdatePlan extends PhysicalPlan {
ret.add(path);
return ret;
}
@Override
public String printQueryPlan() {
StringContainer sc = new StringContainer();
String preSpace = " ";
sc.addTail("UpdatePlan:");
sc.addTail(preSpace, "paths: ", path.toString(), lineFeedSignal);
sc.addTail(preSpace, "value:", value, lineFeedSignal);
sc.addTail(preSpace, "filter: ", lineFeedSignal);
intervals.forEach(p->sc.addTail(preSpace, preSpace, p.left,p.right, lineFeedSignal));
return sc.toString();
}
}

View File

@ -300,17 +300,20 @@ public class LogicalGenerator {
InsertOp.setValueList(valueList);
}
private void analyzeUpdate(ASTNode astNode) throws QueryProcessorException {
if (astNode.getChildCount() != 3)
throw new LogicalOperatorException("error format in UPDATE statement, please check whether SQL statement is correct." );
UpdateOperator updateOp = new UpdateOperator(SQLConstant.TOK_UPDATE);
initializedOperator = updateOp;
analyzeSelect(astNode.getChild(0));
if (astNode.getChild(1).getType() != TSParser.TOK_VALUE)
throw new LogicalOperatorException("error format in UPDATE statement, please check whether SQL statement is correct.");
updateOp.setValue(astNode.getChild(1).getChild(0).getText());
analyzeWhere(astNode.getChild(2));
}
private void analyzeUpdate(ASTNode astNode) throws QueryProcessorException {
if (astNode.getChildCount() > 3)
throw new LogicalOperatorException("UPDATE clause doesn't support multi-update yet.");
UpdateOperator updateOp = new UpdateOperator(SQLConstant.TOK_UPDATE);
initializedOperator = updateOp;
FromOperator fromOp = new FromOperator(TSParser.TOK_SELECT);
fromOp.addPrefixTablePath(parsePath(astNode.getChild(0)));
updateOp.setFromOperator(fromOp);
SelectOperator selectOp = new SelectOperator(TSParser.TOK_SELECT);
selectOp.addSelectPath(parsePath(astNode.getChild(1).getChild(0)));
updateOp.setSelectOperator(selectOp);
updateOp.setValue(astNode.getChild(1).getChild(1).getText());
analyzeWhere(astNode.getChild(2));
}
private void analyzeDelete(ASTNode astNode) throws LogicalOperatorException {
initializedOperator = new DeleteOperator(SQLConstant.TOK_DELETE);

View File

@ -1,14 +1,20 @@
package cn.edu.tsinghua.iotdb.qp.cud;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import cn.edu.tsinghua.iotdb.exception.ArgsErrorException;
import cn.edu.tsinghua.iotdb.qp.executor.OverflowQPExecutor;
import cn.edu.tsinghua.iotdb.qp.utils.MemIntQpExecutor;
import org.junit.After;
import cn.edu.tsinghua.iotdb.qp.QueryProcessor;
@ -17,92 +23,70 @@ import cn.edu.tsinghua.iotdb.qp.physical.PhysicalPlan;
import cn.edu.tsinghua.iotdb.query.management.RecordReaderFactory;
import cn.edu.tsinghua.tsfile.common.exception.ProcessorException;
import cn.edu.tsinghua.tsfile.timeseries.read.query.QueryDataSet;
import org.junit.Before;
import org.junit.Test;
public class QPUpdateTest {
QueryProcessor processor = new QueryProcessor(new OverflowQPExecutor());
private QueryProcessor processor;
@After
public void after() throws ProcessorException {
// try {
// FileUtils.deleteDirectory(new File(TsfileDBDescriptor.getInstance().getConfig().metadataDir));
// FileUtils.forceDelete(new File(TsfileDBDescriptor.getInstance().getConfig().overflowDataDir+File.separator+"root.qp_update_test"));
// FileUtils.forceDelete(new File(TsfileDBDescriptor.getInstance().getConfig().bufferWriteDir+File.separator+"root.qp_update_test"));
// FileUtils.forceDelete(new File(TsfileDBDescriptor.getInstance().getConfig().fileNodeDir+File.separator+"root.qp_update_test"));
// } catch (IOException e) {
// // TODO Auto-generated catch block
// e.printStackTrace();
// }
}
// @Test
@Test
public void test() throws QueryProcessorException, ArgsErrorException, ProcessorException, IOException {
init();
testUpdate();
// testUpdate();
testUpdate2();
testDelete();
testInsert();
testDeletePaths();
// testDelete();
// testInsert();
// testDeletePaths();
}
private void init() throws QueryProcessorException, ArgsErrorException, ProcessorException{
PhysicalPlan plan = processor.parseSQLToPhysicalPlan("CREATE TIMESERIES root.qp_update_test.device_1.sensor_1 WITH DATATYPE=INT32, ENCODING=RLE");
processor.getExecutor().processNonQuery(plan);
plan = processor.parseSQLToPhysicalPlan("CREATE TIMESERIES root.qp_update_test.device_1.sensor_2 WITH DATATYPE=INT32, ENCODING=RLE");
processor.getExecutor().processNonQuery(plan);
plan = processor.parseSQLToPhysicalPlan("SET STORAGE GROUP TO root.qp_update_test");
processor.getExecutor().processNonQuery(plan);
plan = processor.parseSQLToPhysicalPlan("insert into root.qp_update_test.device_1(timestamp,sensor_1) values(10,10)");
processor.getExecutor().processNonQuery(plan);
plan = processor.parseSQLToPhysicalPlan("insert into root.qp_update_test.device_1(timestamp,sensor_2) values(20,10)");
processor.getExecutor().processNonQuery(plan);
}
private void testUpdate2() throws QueryProcessorException, ArgsErrorException, ProcessorException, IOException {
String sql = "update root.qp_update_test.device_1.sensor_1 set value=100 where time>100 or (time<=50 and time>10)";
PhysicalPlan plan = processor.parseSQLToPhysicalPlan(sql);
boolean upRet = processor.getExecutor().processNonQuery(plan);
assertTrue(upRet);
sql = "update root.qp_update_test.device_1.sensor_1 set value=100 where time>100 and time<20";
try {
plan = processor.parseSQLToPhysicalPlan(sql);
} catch (QueryProcessorException e) {
assertEquals("For update command, time filter is invalid", e.getMessage());
}
sql = "update root.qp_update_test.device_1.sensor_1 set value=100 where time<-10 or time>1000";
try {
plan = processor.parseSQLToPhysicalPlan(sql);
} catch (QueryProcessorException e) {
assertEquals("update time must be greater than 0.", e.getMessage());
}
sql = "update root.qp_update_test.device_1.sensor_1 set value=100 where time<100";
// remove read cache compulsory
RecordReaderFactory.getInstance().removeRecordReader("root.qp_update_test.device_1", "sensor_1");
try {
plan = processor.parseSQLToPhysicalPlan(sql);
processor.getExecutor().processNonQuery(plan);
String sqlStr = "select sensor_1,sensor_2 from root.qp_update_test.device_1";
PhysicalPlan plan2 = processor.parseSQLToPhysicalPlan(sqlStr);
Iterator<QueryDataSet> iter = processor.getExecutor().processQuery(plan2);
String[] expect = {
"10 100 null",
"20 null 10" };
int i = 0;
while (iter.hasNext()) {
QueryDataSet set = iter.next();
while (set.hasNextRecord()) {
String result = set.getNextRecord().toString();
assertEquals(expect[i++], result);
}
}
assertEquals(expect.length, i);
} catch (QueryProcessorException e) {
fail(e.getMessage());
}
private void init() throws QueryProcessorException, ArgsErrorException, ProcessorException{
MemIntQpExecutor memProcessor = new MemIntQpExecutor();
Map<String, List<String>> fakeAllPaths = new HashMap<String, List<String>>() {{
put("root.laptop.d1.s1", new ArrayList<String>() {{
add("root.laptop.d1.s1");
}});
put("root.laptop.d2.s1", new ArrayList<String>() {{
add("root.laptop.d1.s1");
}});
put("root.laptop.d2.s2", new ArrayList<String>() {{
add("root.laptop.d1.s2");
}});
put("root.laptop.*.s1", new ArrayList<String>() {{
add("root.laptop.d1.s1");
add("root.laptop.d2.s1");
}});
}};
memProcessor.setFakeAllPaths(fakeAllPaths);
processor = new QueryProcessor(memProcessor);
}
private void testUpdate2() throws ArgsErrorException, ProcessorException, IOException {
PhysicalPlan plan = null;
// String sql = "update root.qp_update_test.device_1.sensor_1 set value=100 where time>100 or (time<=50 and time>10)";
String sql = "UPDATE root.laptop SET d1.s1 = -33000, d2.s1 = 'string' WHERE time < 100";
try {
plan = processor.parseSQLToPhysicalPlan(sql);
} catch (QueryProcessorException e) {
assertEquals("UPDATE clause doesn't support multi-update yet.",e.getMessage());
}
sql = "UPDATE root.laptop SET d1.s1 = -33000 WHERE time < 100";
try {
plan = processor.parseSQLToPhysicalPlan(sql);
} catch (QueryProcessorException e) {
assertTrue(false);
}
assertEquals(
"UpdatePlan: paths: root.laptop.d1.s1\n" +
" value:-33000\n" +
" filter: \n" +
" 199\n",
plan.printQueryPlan());
// System.out.println(plan.printQueryPlan());
}
private void testUpdate() throws QueryProcessorException, ArgsErrorException, ProcessorException {