Assemble all related projects into one. (#450)

Xiangdong Huang 2018-11-19 11:31:03 +08:00 committed by XuYi
parent 88c191cad8
commit 167f733e8e
1101 changed files with 91296 additions and 175 deletions

57
.gitignore vendored
View File

@ -1,6 +1,15 @@
iotdb/data/*
iotdb/derby.log
iotdb/iotdb/data/*
iotdb/iotdb/derby.log
iotdb/*.pid
iotdb/iotdb/data/*
iotdb/iotdb/gc.log
iotdb/iotdb/logs/*
iotdb/iotdb/lib/*
tsfile/src/test/resources/perTestInputData
# Eclipse IDE files
**/.classpath
**/.project
@ -10,9 +19,17 @@ iotdb/*.pid
**/*.iml
**/.idea/
**/*.log
**/*.ipr
**/*.iws
# Apple OS X related
**/.DS_Store
derby-tsfile-db
# intellij IDE files
**/*.iml
**/.idea/
# Apple OS X related
**/.DS_Store
# build generated
**/target/
@ -20,7 +37,6 @@ derby-tsfile-db
# intermediately generated locally
**/logs/
src/main/resources/*
tsfile-timeseries/src/main/resources/logback.out.out.xml
tsfile-timeseries/src/main/resources/logback.out.xml
@ -34,6 +50,43 @@ tsfile-jdbc/src/main/resources/output/queryRes.csv
*.tar.gz
*.tar
/data/
src/test/resources/logback.xml
#src/test/resources/logback.xml
### Maven ###
grafana/target/
!grafana/.mvn/wrapper/maven-wrapper.jar
grafana/.mvn/
grafana/logs/
*.log
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
### NetBeans ###
**/nbproject/private/
**/build/
**/nbbuild/
**/dist/
**/nbdist/
**/.nb-gradle/
grafana/data/
**/.DS_Store
grafana/data/test.csv
**/lib/
*.jar
/target/
*.tsfile
tsfile/src/test/resources/*.ts

View File

@ -1,21 +0,0 @@
#!/bin/bash
pwd
rm -rf data
# get tsfile and jdbc
rm -rf tsfile
git clone https://github.com/thulab/tsfile.git
cd tsfile
mvn clean install -Dmaven.test.skip=true
cd ..
rm -rf iotdb-jdbc
git clone https://github.com/thulab/iotdb-jdbc.git
cd iotdb-jdbc
mvn clean install -Dmaven.test.skip=true
cd ..
#begin ...
rm -rf data
mvn clean test
rm -rf data

35
.travis.yml Normal file
View File

@ -0,0 +1,35 @@
dist: trusty
#sudo: required
language: java
jdk:
- oraclejdk8
before_install:
# - sudo apt-get install -qq libboost-program-options-dev libevent-dev automake libtool flex bison pkg-config g++ libssl-dev
# - wget http://archive.apache.org/dist/thrift/0.9.1/thrift-0.9.1.tar.gz
# - tar xfz thrift-0.9.1.tar.gz
# - cd thrift-0.9.1
# - chmod +x ./configure
# - ./configure --disable-gen-erl --disable-gen-hs --without-cpp --without-boost --without-qt4 --without-c_glib --without-csharp --without-erlang --without-python --without-perl --without-php --without-php_extension --without-ruby --without-haskell --without-go --without-d
# - sudo make install
# - cd ..
- pip install --user codecov
# we do not need to use the following commands anymore, because now we are an assembled project!
# - git clone https://github.com/thulab/tsfile.git
# - cd tsfile
# - mvn install -DskipTests=true
# - cd ..
script:
# only the SNAPSHOT version needs to do the following phase
# we do not need to run mvn cobertura:cobertura manually anymore, because we have configured it in the pom.xml
# - mvn cobertura:cobertura
- bash <(curl -s https://codecov.io/bash) -t 853796a6-a626-422f-a897-ac233e121f8f
- mvn clean test
after_success:
# - mvn cobertura:cobertura
- bash <(curl -s https://codecov.io/bash) -t 853796a6-a626-422f-a897-ac233e121f8f

200
checkstyle.xml Normal file
View File

@ -0,0 +1,200 @@
<?xml version="1.0"?>
<!DOCTYPE module PUBLIC
"-//Puppy Crawl//DTD Check Configuration 1.3//EN"
"http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
<!--
Checkstyle configuration that checks the Google coding conventions from:
- Google Java Style
https://google-styleguide.googlecode.com/svn-history/r130/trunk/javaguide.html
Checkstyle is very configurable. Be sure to read the documentation at
http://checkstyle.sf.net (or in your downloaded distribution).
Most checks are configurable; be sure to consult the documentation.
To completely disable a check, just comment it out or delete it from the file.
Authors: Max Vetrenko, Ruslan Diachenko, Roman Ivanov.
-->
<module name = "Checker">
<property name="charset" value="UTF-8"/>
<property name="severity" value="warning"/>
<!-- Checks for whitespace -->
<!-- See http://checkstyle.sf.net/config_whitespace.html -->
<module name="FileTabCharacter">
<property name="eachLine" value="true"/>
</module>
<module name="TreeWalker">
<module name="OuterTypeFilename"/>
<module name="IllegalTokenText">
<property name="tokens" value="STRING_LITERAL, CHAR_LITERAL"/>
<property name="format" value="\\u00(08|09|0(a|A)|0(c|C)|0(d|D)|22|27|5(C|c))|\\(0(10|11|12|14|15|42|47)|134)"/>
<property name="message" value="Avoid using corresponding octal or Unicode escape."/>
</module>
<module name="AvoidEscapedUnicodeCharacters">
<property name="allowEscapesForControlCharacters" value="true"/>
<property name="allowByTailComment" value="true"/>
<property name="allowNonPrintableEscapes" value="true"/>
</module>
<module name="LineLength">
<property name="max" value="100"/>
<property name="ignorePattern" value="^package.*|^import.*|a href|href|http://|https://|ftp://"/>
</module>
<module name="AvoidStarImport"/>
<module name="OneTopLevelClass"/>
<module name="NoLineWrap"/>
<module name="EmptyBlock">
<property name="option" value="TEXT"/>
<property name="tokens" value="LITERAL_TRY, LITERAL_CATCH, LITERAL_FINALLY, LITERAL_IF, LITERAL_ELSE, LITERAL_SWITCH"/>
</module>
<module name="NeedBraces"/>
<module name="LeftCurly">
<property name="maxLineLength" value="100"/>
</module>
<module name="RightCurly"/>
<module name="RightCurly">
<property name="option" value="alone"/>
<property name="tokens" value="CLASS_DEF, METHOD_DEF, CTOR_DEF, LITERAL_FOR, LITERAL_WHILE, LITERAL_DO, STATIC_INIT, INSTANCE_INIT"/>
</module>
<module name="WhitespaceAround">
<property name="allowEmptyConstructors" value="true"/>
<property name="allowEmptyMethods" value="true"/>
<property name="allowEmptyTypes" value="true"/>
<property name="allowEmptyLoops" value="true"/>
<message key="ws.notFollowed"
value="WhitespaceAround: ''{0}'' is not followed by whitespace."/>
<message key="ws.notPreceded"
value="WhitespaceAround: ''{0}'' is not preceded with whitespace."/>
</module>
<module name="OneStatementPerLine"/>
<module name="MultipleVariableDeclarations"/>
<module name="ArrayTypeStyle"/>
<module name="MissingSwitchDefault"/>
<module name="FallThrough"/>
<module name="UpperEll"/>
<module name="ModifierOrder"/>
<module name="EmptyLineSeparator">
<property name="allowNoEmptyLineBetweenFields" value="true"/>
</module>
<module name="SeparatorWrap">
<property name="tokens" value="DOT"/>
<property name="option" value="nl"/>
</module>
<module name="SeparatorWrap">
<property name="tokens" value="COMMA"/>
<property name="option" value="EOL"/>
</module>
<module name="PackageName">
<property name="format" value="^[a-z]+(\.[a-z][a-z0-9]*)*$"/>
<message key="name.invalidPattern"
value="Package name ''{0}'' must match pattern ''{1}''."/>
</module>
<module name="TypeName">
<message key="name.invalidPattern"
value="Type name ''{0}'' must match pattern ''{1}''."/>
</module>
<module name="MemberName">
<property name="format" value="^[a-z][a-z0-9][a-zA-Z0-9]*$"/>
<message key="name.invalidPattern"
value="Member name ''{0}'' must match pattern ''{1}''."/>
</module>
<module name="ParameterName">
<property name="format" value="^[a-z][a-z0-9][a-zA-Z0-9]*$"/>
<message key="name.invalidPattern"
value="Parameter name ''{0}'' must match pattern ''{1}''."/>
</module>
<module name="LocalVariableName">
<property name="tokens" value="VARIABLE_DEF"/>
<property name="format" value="^[a-z][a-z0-9][a-zA-Z0-9]*$"/>
<property name="allowOneCharVarInForLoop" value="true"/>
<message key="name.invalidPattern"
value="Local variable name ''{0}'' must match pattern ''{1}''."/>
</module>
<module name="ClassTypeParameterName">
<property name="format" value="(^[A-Z][0-9]?)$|([A-Z][a-zA-Z0-9]*[T]$)"/>
<message key="name.invalidPattern"
value="Class type name ''{0}'' must match pattern ''{1}''."/>
</module>
<module name="MethodTypeParameterName">
<property name="format" value="(^[A-Z][0-9]?)$|([A-Z][a-zA-Z0-9]*[T]$)"/>
<message key="name.invalidPattern"
value="Method type name ''{0}'' must match pattern ''{1}''."/>
</module>
<module name="NoFinalizer"/>
<module name="GenericWhitespace">
<message key="ws.followed"
value="GenericWhitespace ''{0}'' is followed by whitespace."/>
<message key="ws.preceded"
value="GenericWhitespace ''{0}'' is preceded with whitespace."/>
<message key="ws.illegalFollow"
value="GenericWhitespace ''{0}'' should followed by whitespace."/>
<message key="ws.notPreceded"
value="GenericWhitespace ''{0}'' is not preceded with whitespace."/>
</module>
<module name="Indentation">
<property name="basicOffset" value="2"/>
<property name="braceAdjustment" value="0"/>
<property name="caseIndent" value="2"/>
<property name="throwsIndent" value="4"/>
<property name="lineWrappingIndentation" value="4"/>
<property name="arrayInitIndent" value="2"/>
</module>
<module name="AbbreviationAsWordInName">
<property name="ignoreFinal" value="false"/>
<property name="allowedAbbreviationLength" value="1"/>
</module>
<module name="OverloadMethodsDeclarationOrder"/>
<module name="VariableDeclarationUsageDistance"/>
<module name="CustomImportOrder">
<property name="thirdPartyPackageRegExp" value=".*"/>
<property name="specialImportsRegExp" value="com.google"/>
<property name="sortImportsInGroupAlphabetically" value="true"/>
<property name="customImportOrderRules" value="STATIC###SPECIAL_IMPORTS###THIRD_PARTY_PACKAGE###STANDARD_JAVA_PACKAGE"/>
</module>
<module name="MethodParamPad"/>
<module name="OperatorWrap">
<property name="option" value="NL"/>
<property name="tokens" value="BAND, BOR, BSR, BXOR, DIV, EQUAL, GE, GT, LAND, LE, LITERAL_INSTANCEOF, LOR, LT, MINUS, MOD, NOT_EQUAL, PLUS, QUESTION, SL, SR, STAR "/>
</module>
<module name="AnnotationLocation">
<property name="tokens" value="CLASS_DEF, INTERFACE_DEF, ENUM_DEF, METHOD_DEF, CTOR_DEF"/>
</module>
<module name="AnnotationLocation">
<property name="tokens" value="VARIABLE_DEF"/>
<property name="allowSamelineMultipleAnnotations" value="true"/>
</module>
<module name="NonEmptyAtclauseDescription"/>
<module name="JavadocTagContinuationIndentation"/>
<module name="SummaryJavadocCheck">
<property name="forbiddenSummaryFragments" value="^@return the *|^This method returns |^A [{]@code [a-zA-Z0-9]+[}]( is a )"/>
</module>
<module name="JavadocParagraph"/>
<module name="AtclauseOrder">
<property name="tagOrder" value="@param, @return, @throws, @deprecated"/>
<property name="target" value="CLASS_DEF, INTERFACE_DEF, ENUM_DEF, METHOD_DEF, CTOR_DEF, VARIABLE_DEF"/>
</module>
<module name="JavadocMethod">
<property name="scope" value="public"/>
<property name="allowMissingParamTags" value="true"/>
<property name="allowMissingThrowsTags" value="true"/>
<property name="allowMissingReturnTag" value="true"/>
<property name="minLineCount" value="2"/>
<property name="allowedAnnotations" value="Override, Test"/>
<property name="allowThrowsTagsForSubclasses" value="true"/>
</module>
<module name="MethodName">
<property name="format" value="^[a-z][a-z0-9][a-zA-Z0-9_]*$"/>
<message key="name.invalidPattern"
value="Method name ''{0}'' must match pattern ''{1}''."/>
</module>
<module name="SingleLineJavadoc"/>
</module>
</module>
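
For orientation, the configuration above enforces the Google Java Style conventions cited in its header: 2-space indentation, a 100-character line limit, camelCase member and method names, and mandatory braces. A minimal fragment that passes these checks (an illustrative sketch only, not a file in this commit) looks like:
```
// Illustrative only, not part of this commit: a tiny class that satisfies the
// main checks above (2-space indentation, camelCase names, braces, <=100 columns).
public class ExampleCounter {
  private int currentValue;

  /** Increments the counter and returns the new value. */
  public int increment() {
    currentValue = currentValue + 1;
    return currentValue;
  }
}
```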

View File

@ -0,0 +1,6 @@
spring.datasource.url = jdbc:tsfile://127.0.0.1:6667/
spring.datasource.username = root
spring.datasource.password = root
spring.datasource.driver-class-name=cn.edu.tsinghua.iotdb.jdbc.TsfileDriver
server.port = 8888

80
grafana/pom.xml Normal file
View File

@ -0,0 +1,80 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.edu.tsinghua</groupId>
<artifactId>iotdb-grafana</artifactId>
<version>0.8.0-SNAPSHOT</version>
<packaging>war</packaging>
<name>iotdb-grafana</name>
<description>Grafana data source connector for IoTDB</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.4.RELEASE</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<json.version>20140107</json.version>
<compile.version>1.8</compile.version>
<targetJavaVersion>1.8</targetJavaVersion>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>${targetJavaVersion}</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>cn.edu.tsinghua</groupId>
<artifactId>iotdb-jdbc</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<source>${compile.version}</source>
<target>${compile.version}</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<fork>true</fork>
</configuration>
</plugin>
</plugins>
</build>
</project>

68
grafana/readme.md Normal file
View File

@ -0,0 +1,68 @@
# Grafana installation
Download Grafana from: https://grafana.com/grafana/download
Version: 4.4.1
Choose the package for your operating system, then download and install it.
# Data source plugin installation
The connection to IoTDB is based on the simple-json-datasource data source plugin.
Download the plugin from: https://github.com/grafana/simple-json-datasource
Download and unzip it, then put the files into Grafana's plugin directory:
`data/plugin/` (Windows) or `/var/lib/grafana/plugins` (Linux)
# Start Grafana
Start the Grafana server.
# IoTDB installation
See https://github.com/thulab/iotdb
# Backend data source connector installation
Download the source code:
```
git clone git@github.com:thulab/iotdb-grafana.git
```
Enter the directory and build the war package:
```
mvn clean package
```
Copy the `application.properties` file from the `conf/` directory to the `target` directory and edit the property values:
```
spring.datasource.url = jdbc:tsfile://127.0.0.1:6667/
spring.datasource.username = root
spring.datasource.password = root
spring.datasource.driver-class-name=cn.edu.tsinghua.iotdb.jdbc.TsfileDriver
server.port = 8888
```
IoTDB is used as the backend data source. The first four lines define the database properties: the default port is 6667, the username and password are both root, and the data source driver class is specified.
Edit the value of server.port to change the connector's port.
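As a quick sanity check of these settings, a minimal JDBC sketch (not part of this commit; it assumes the IoTDB server is running locally and the iotdb-jdbc artifact is on the classpath) can list the available time series with the same metadata call the connector uses:
```
// A minimal sketch, not part of this commit: verifies the JDBC settings above.
// Assumes the IoTDB server is running locally and iotdb-jdbc is on the classpath.
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;

public class ConnectionCheck {
  public static void main(String[] args) throws Exception {
    Class.forName("cn.edu.tsinghua.iotdb.jdbc.TsfileDriver");
    try (Connection connection = DriverManager.getConnection(
        "jdbc:tsfile://127.0.0.1:6667/", "root", "root")) {
      // Same metadata call the connector's BasicDaoImpl uses to list time series.
      DatabaseMetaData metaData = connection.getMetaData();
      ResultSet resultSet = metaData.getColumns(null, null, "root.*", null);
      while (resultSet.next()) {
        System.out.println(resultSet.getString(1));
      }
    }
  }
}
```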
# Run
Start the database; see https://github.com/thulab/iotdb
Run the backend data source connector by typing the following in a console:
```
cd target/
java -jar iotdb-grafana-0.7.0.war
```
Grafana's default port is 3000; visit http://localhost:3000 in a browser.
The username and password are both admin.
# Add the data source
On the home page, click the icon in the top-left corner, select `Data Sources`, then click the `Add data source` icon in the top-right corner and fill in the `data source` configuration: under `Config`, set `Type` to `SimpleJson` and set `Url` to http://localhost:8888
The port must match the data source connector's port. Once the form is complete, select `Add` and the data source is added successfully.
# Design and build a dashboard
On the home page, click the icon in the top-left corner and select `Dashboards` - `New` to create a new dashboard. Several types of charts can be added to a panel.
Taking a line chart as an example, the process of adding time series data is:
Choose the `Graph` type; a chart with no data points appears in the blank area. Click its title and select `Edit`; an area for editing properties and choosing query conditions appears below the chart. In the `Metrics` tab, click `Add Query` to add a query, then click `select metric`; the dropdown lists the names of all time series in IoTDB. Choose a time range in the top-right corner to plot the corresponding query result. A periodic refresh can be configured to display time series data in real time.
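For reference, the connector's `/query` endpoint (implemented in `DBConnectController`, included later in this commit) expects a SimpleJson-style request body with a `range` and a list of `targets`. A hedged sketch that builds such a body with org.json, using a made-up series name, is:
```
// Hedged sketch, not part of this commit: builds the JSON body that Grafana's
// SimpleJson plugin posts to /query. The series name "car.d1.s1" is a made-up
// example; the field names follow DBConnectController in this commit.
import org.json.JSONArray;
import org.json.JSONObject;

public class QueryBodyExample {
  public static void main(String[] args) {
    JSONObject range = new JSONObject();
    range.put("from", "2018-11-01T00:00:00Z"); // parsed with Instant.parse(...)
    range.put("to", "2018-11-02T00:00:00Z");

    JSONObject target = new JSONObject();
    target.put("target", "car.d1.s1");  // series path without the "root." prefix
    target.put("type", "timeserie");    // "timeserie" or "table"

    JSONObject body = new JSONObject();
    body.put("range", range);
    body.put("targets", new JSONArray().put(target));
    System.out.println(body.toString());
  }
}
```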

View File

@ -0,0 +1,12 @@
package cn.edu.tsinghua.web;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class TsfileWebDemoApplication {
public static void main(String[] args) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
SpringApplication.run(TsfileWebDemoApplication.class, args);
}
}

View File

@ -0,0 +1,34 @@
package cn.edu.tsinghua.web.bean;
/**
* Created by dell on 2017/7/18.
*/
public class TimeValues {
private long time;
private float value;
@Override
public String toString() {
return "TimeValues{" +
"time=" + time +
", values=" + value +
'}';
}
public long getTime() {
return time;
}
public void setTime(long time) {
this.time = time;
}
public float getValue() {
return value;
}
public void setValue(float value) {
this.value = value;
}
}

View File

@ -0,0 +1,24 @@
package cn.edu.tsinghua.web.conf;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter;
/**
* Created by dell on 2017/7/18.
*/
@Configuration
public class MyConfiguration {
@Bean
public WebMvcConfigurer corsConfigurer() {
return new WebMvcConfigurerAdapter() {
@Override
public void addCorsMappings(CorsRegistry registry) {
registry.addMapping("/**");
}
};
}
}

View File

@ -0,0 +1,165 @@
package cn.edu.tsinghua.web.controller;
import cn.edu.tsinghua.tsfile.common.utils.Pair;
import cn.edu.tsinghua.web.bean.TimeValues;
import cn.edu.tsinghua.web.service.DBConnectService;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.BufferedReader;
import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Created by dell on 2017/7/17.
*/
@CrossOrigin
@Controller
public class DBConnectController {
private static final Logger logger = LoggerFactory.getLogger(DBConnectController.class);
@Autowired
private DBConnectService DBConnectService;
@RequestMapping(value = "/", method = RequestMethod.GET)
@ResponseStatus(value = HttpStatus.OK)
public void testDataConnection(HttpServletResponse response) throws IOException {
logger.info("Connection is ok now!");
response.getWriter().print("I have sent a message.");
}
@RequestMapping(value = "/search")
@ResponseBody
public String metricFindQuery(HttpServletRequest request, HttpServletResponse response) {
Map<Integer, String> target = new HashMap<>();
response.setStatus(200);
List<String> columnsName = new ArrayList<>();
try {
columnsName = DBConnectService.getMetaData();
} catch (Exception e) {
logger.error("Failed to get metadata", e);
}
Collections.sort(columnsName);
int cnt = 0;
for (String columnName : columnsName) {
target.put(cnt++, columnName);
}
JSONObject ojb = new JSONObject(target);
return ojb.toString();
}
@RequestMapping(value = "/query")
@ResponseBody
public String query(HttpServletRequest request, HttpServletResponse response) {
response.setStatus(200);
try {
JSONObject jsonObject = getRequestBodyJSON(request);
Pair<ZonedDateTime, ZonedDateTime> timeRange = getTimeFromAndTo(jsonObject);
JSONArray array = (JSONArray)jsonObject.get("targets"); //[]
JSONArray result = new JSONArray();
for (int i=0; i<array.length(); i++) {
JSONObject object = (JSONObject)array.get(i); //{}
if (object.isNull("target")) return "[]";
String target = (String) object.get("target");
String type = getJSONType(jsonObject);
JSONObject obj = new JSONObject();
obj.put("target", target);
if (type.equals("table")) {
setJSONTable(obj, target, timeRange);
} else if (type.equals("timeserie")) {
setJSONTimeseries(obj, target, timeRange);
}
result.put(i, obj);
}
logger.info("query finished");
return result.toString();
} catch (Exception e) {
logger.error("/query failed", e);
}
return null;
}
private Pair<ZonedDateTime, ZonedDateTime> getTimeFromAndTo(JSONObject jsonObject) throws JSONException {
JSONObject obj = (JSONObject)jsonObject.get("range");
Instant from = Instant.parse((String)obj.get("from"));
Instant to = Instant.parse((String)obj.get("to"));
return new Pair<>(from.atZone(ZoneId.of("Asia/Shanghai")), to.atZone(ZoneId.of("Asia/Shanghai")));
}
private void setJSONTable(JSONObject obj, String target, Pair<ZonedDateTime, ZonedDateTime> timeRange) throws JSONException {
List<TimeValues> timeValues = DBConnectService.querySeries(target, timeRange);
JSONArray columns = new JSONArray();
JSONObject column = new JSONObject();
column.put("text", "Time");
column.put("type", "time");
columns.put(column);
column = new JSONObject();
column.put("text", "Number");
column.put("type", "number");
columns.put(column);
obj.put("columns", columns);
JSONArray values = new JSONArray();
for (TimeValues tv : timeValues) {
JSONArray value = new JSONArray();
value.put(tv.getTime());
value.put(tv.getValue());
values.put(value);
}
obj.put("values", values);
}
private void setJSONTimeseries(JSONObject obj, String target, Pair<ZonedDateTime, ZonedDateTime> timeRange) throws JSONException {
List<TimeValues> timeValues = DBConnectService.querySeries(target, timeRange);
logger.info("query size: {}", timeValues.size());
JSONArray dataPoints = new JSONArray();
for (TimeValues tv : timeValues) {
long time = tv.getTime();
float value = tv.getValue();
JSONArray jsonArray = new JSONArray();
jsonArray.put(value);
jsonArray.put(time);
dataPoints.put(jsonArray);
}
obj.put("datapoints", dataPoints);
}
public JSONObject getRequestBodyJSON(HttpServletRequest request) throws JSONException {
try {
BufferedReader br = request.getReader();
StringBuilder sb = new StringBuilder();
String line;
while((line = br.readLine()) != null) {
sb.append(line);
}
return new JSONObject(sb.toString());
} catch (IOException e) {
logger.error("getRequestBodyJSON failed", e);
}
return null;
}
public String getJSONType(JSONObject jsonObject) throws JSONException {
JSONArray array = (JSONArray)jsonObject.get("targets"); //[]
JSONObject object = (JSONObject)array.get(0); //{}
return (String)object.get("type");
}
}

View File

@ -0,0 +1,18 @@
package cn.edu.tsinghua.web.dao;
import cn.edu.tsinghua.tsfile.common.utils.Pair;
import cn.edu.tsinghua.web.bean.TimeValues;
import java.time.ZonedDateTime;
import java.util.List;
/**
* Created by dell on 2017/7/17.
*/
public interface BasicDao {
List<TimeValues> querySeries(String s, Pair<ZonedDateTime, ZonedDateTime> timeRange);
List<String> getMetaData();
}

View File

@ -0,0 +1,107 @@
package cn.edu.tsinghua.web.dao.impl;
import cn.edu.tsinghua.tsfile.common.utils.Pair;
import cn.edu.tsinghua.web.bean.TimeValues;
import cn.edu.tsinghua.web.dao.BasicDao;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.ConnectionCallback;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Repository;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
/**
* Created by dell on 2017/7/17.
*/
@Repository
public class BasicDaoImpl implements BasicDao {
private static final Logger logger = LoggerFactory.getLogger(BasicDaoImpl.class);
private final JdbcTemplate jdbcTemplate;
@Autowired
public BasicDaoImpl(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public List<String> getMetaData() {
ConnectionCallback<Object> connectionCallback = new ConnectionCallback<Object>() {
public Object doInConnection(Connection connection) throws SQLException {
DatabaseMetaData databaseMetaData = connection.getMetaData();
ResultSet resultSet = databaseMetaData.getColumns(null, null, "root.*", null);
System.out.println("Start to get timeseries");
List<String> columnsName = new ArrayList<>();
while (resultSet.next()) {
String timeseries = resultSet.getString(1);
columnsName.add(timeseries.substring(5));
}
return columnsName;
}
};
return (List<String>)jdbcTemplate.execute(connectionCallback);
}
@Override
public List<TimeValues> querySeries(String s, Pair<ZonedDateTime, ZonedDateTime> timeRange) {
Long from = zonedCovertToLong(timeRange.left);
Long to = zonedCovertToLong(timeRange.right);
String sql = "SELECT " + s.substring(s.lastIndexOf('.')+1) + " FROM root." + s.substring(0, s.lastIndexOf('.')) + " WHERE time > " + from + " and time < " + to;
logger.info(sql);
List<TimeValues> rows = null;
try {
rows = jdbcTemplate.query(sql, new TimeValuesRowMapper("root." + s));
} catch (Exception e) {
logger.error(e.getMessage());
}
return rows;
}
private Long zonedCovertToLong(ZonedDateTime time) {
return time.toInstant().toEpochMilli();
}
static class TimeValuesRowMapper implements RowMapper<TimeValues> {
String columnName;
final String TRUE_STR = "true";
final String FALSE_STR = "false";
TimeValuesRowMapper(String columnName) {
this.columnName = columnName;
}
@Override
public TimeValues mapRow(ResultSet resultSet, int i) throws SQLException {
TimeValues tv = new TimeValues();
tv.setTime(resultSet.getLong("Time"));
String vString = resultSet.getString(columnName);
if (vString != null) {
if (TRUE_STR.equals(vString.toLowerCase())) {
tv.setValue(1);
} else if (FALSE_STR.equals(vString.toLowerCase())) {
tv.setValue(0);
} else {
try {
tv.setValue(resultSet.getFloat(columnName));
} catch (Exception e) {
tv.setValue(0);
}
}
}
return tv;
}
}
}

View File

@ -0,0 +1,20 @@
package cn.edu.tsinghua.web.service;
import cn.edu.tsinghua.tsfile.common.utils.Pair;
import cn.edu.tsinghua.web.bean.TimeValues;
import java.time.ZonedDateTime;
import java.util.List;
/**
* Created by dell on 2017/7/17.
*/
public interface DBConnectService {
int testConnection();
List<TimeValues> querySeries(String s, Pair<ZonedDateTime, ZonedDateTime> timeRange);
List<String> getMetaData();
}

View File

@ -0,0 +1,38 @@
package cn.edu.tsinghua.web.service.impl;
import cn.edu.tsinghua.tsfile.common.utils.Pair;
import cn.edu.tsinghua.web.bean.TimeValues;
import cn.edu.tsinghua.web.dao.BasicDao;
import cn.edu.tsinghua.web.service.DBConnectService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.time.ZonedDateTime;
import java.util.List;
/**
* Created by dell on 2017/7/17.
*/
@Service
public class DBConnectServiceImpl implements DBConnectService {
@Autowired
BasicDao basicDao;
@Override
public int testConnection() {
return 0;
}
@Override
public List<TimeValues> querySeries(String s, Pair<ZonedDateTime, ZonedDateTime> timeRange) {
return basicDao.querySeries(s, timeRange);
}
@Override
public List<String> getMetaData() {
return basicDao.getMetaData();
}
}

1
hadoop/README.md Normal file
View File

@ -0,0 +1 @@
# tsfile-hadoop-connector

43
hadoop/pom.xml Normal file
View File

@ -0,0 +1,43 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.edu.tsinghua</groupId>
<artifactId>root</artifactId>
<version>0.8.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<groupId>cn.edu.thu.tsfile</groupId>
<artifactId>hadoop</artifactId>
<packaging>jar</packaging>
<name>tsfile-hadoop</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>cn.edu.tsinghua</groupId>
<artifactId>tsfile</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
</plugins>
</build>
</project>

View File

@ -0,0 +1,31 @@
package cn.edu.thu.tsfile.hadoop;
/**
* @author liukun
*/
public class TSFHadoopException extends Exception {
private static final long serialVersionUID = 9206686224701568169L;
public TSFHadoopException() {
super();
}
public TSFHadoopException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
public TSFHadoopException(String message, Throwable cause) {
super(message, cause);
}
public TSFHadoopException(String message) {
super(message);
}
public TSFHadoopException(Throwable cause) {
super(cause);
}
}

View File

@ -0,0 +1,362 @@
package cn.edu.thu.tsfile.hadoop;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cn.edu.thu.tsfile.hadoop.io.HDFSInputStream;
import cn.edu.tsinghua.tsfile.file.metadata.RowGroupMetaData;
import cn.edu.tsinghua.tsfile.timeseries.read.FileReader;
/**
* @author liukun
*/
public class TSFInputFormat extends FileInputFormat<NullWritable, ArrayWritable> {
private static final Logger LOGGER = LoggerFactory.getLogger(TSFInputFormat.class);
/**
* key to configure whether reading the time value is enabled
*/
public static final String READ_TIME_ENABLE = "tsfile.read.time.enable";
/**
* key to configure whether reading the deltaObjectId is enabled
*/
public static final String READ_DELTAOBJECT_ENABLE = "tsfile.read.deltaObjectId.enable";
/**
* key to configure the type of filter
*/
@Deprecated
public static final String FILTER_TYPE = "tsfile.filter.type";
/**
* key to configure the filter
*/
@Deprecated
public static final String FILTER_EXPRESSION = "tsfile.filter.expression";
/**
* key to configure whether filtering is enabled
*/
public static final String FILTER_EXIST = "tsfile.filter.exist";
/**
* key to configure the deltaObjectIds to read
*/
public static final String READ_DELTAOBJECTS = "tsfile.read.deltaobject";
/**
* key to configure the measurementIds to read
*/
public static final String READ_MEASUREMENTID = "tsfile.read.measurement";
private static final String SPERATOR = ",";
/**
* Set the deltaObjectIds to be read
*
* @param job hadoop job
* @param value the deltaObjectIds to be read
* @throws TSFHadoopException
*/
public static void setReadDeltaObjectIds(Job job, String[] value) throws TSFHadoopException {
if (value == null || value.length < 1) {
throw new TSFHadoopException("The devices selected is null or empty");
} else {
String deltaObjectIds = "";
for (String deltaObjectId : value) {
deltaObjectIds = deltaObjectIds + deltaObjectId + SPERATOR;
}
job.getConfiguration().set(READ_DELTAOBJECTS, (String) deltaObjectIds.subSequence(0, deltaObjectIds.length() - 1));
}
}
/**
* Get the deltaObjectIds to be read
*
* @param configuration hadoop configuration
* @return the list of deltaObjectIds if they have been set in the configuration;
*         null otherwise
*/
public static List<String> getReadDeltaObjectIds(Configuration configuration) {
String deltaObjectIds = configuration.get(READ_DELTAOBJECTS);
if (deltaObjectIds == null || deltaObjectIds.length() < 1) {
return null;
} else {
List<String> deltaObjectIdsList = Arrays.asList(deltaObjectIds.split(SPERATOR));
return deltaObjectIdsList;
}
}
/**
* Set the measurementIds to be read
*
* @param job hadoop job
* @param value the measurementIds to be read
* @throws TSFHadoopException
*/
public static void setReadMeasurementIds(Job job, String[] value) throws TSFHadoopException {
if (value == null || value.length < 1) {
throw new TSFHadoopException("The sensors selected is null or empty");
} else {
String measurementIds = "";
for (String measurementId : value) {
measurementIds = measurementIds + measurementId + SPERATOR;
}
// Get conf type
job.getConfiguration().set(READ_MEASUREMENTID, (String) measurementIds.subSequence(0, measurementIds.length() - 1));
}
}
/**
* Get the measurementIds to be read
*
* @param configuration hadoop configuration
* @return the list of measurementIds if they have been set in the configuration;
*         null otherwise
*/
public static List<String> getReadMeasurementIds(Configuration configuration) {
String measurementIds = configuration.get(READ_MEASUREMENTID);
if (measurementIds == null || measurementIds.length() < 1) {
return null;
} else {
List<String> measurementIdsList = Arrays.asList(measurementIds.split(SPERATOR));
return measurementIdsList;
}
}
/**
* @param job
* @param value
*/
public static void setReadDeltaObjectId(Job job, boolean value) {
job.getConfiguration().setBoolean(READ_DELTAOBJECT_ENABLE, value);
}
/**
* @param configuration
* @return
*/
public static boolean getReadDeltaObject(Configuration configuration) {
return configuration.getBoolean(READ_DELTAOBJECT_ENABLE, false);
}
/**
* @param job
* @param value
*/
public static void setReadTime(Job job, boolean value) {
job.getConfiguration().setBoolean(READ_TIME_ENABLE, value);
}
public static boolean getReadTime(Configuration configuration) {
return configuration.getBoolean(READ_TIME_ENABLE, false);
}
/**
* Set filter exist or not
*
* @param job
* @param value
*/
@Deprecated
public static void setHasFilter(Job job, boolean value) {
job.getConfiguration().setBoolean(FILTER_EXIST, value);
}
// note: if this key has not been set, the value will be null or empty
/**
* Get filter exist or not
*
* @param configuration
* @return
*/
@Deprecated
public static boolean getHasFilter(Configuration configuration) {
return configuration.getBoolean(FILTER_EXIST, false);
}
/**
* @param job
* @param value
*/
@Deprecated
public static void setFilterType(Job job, String value) {
job.getConfiguration().set(FILTER_TYPE, value);
}
/**
* Get the filter type
*
* @param configuration
* @return
*/
// note: if the filter type has not been set, the result will be null or empty
@Deprecated
public static String getFilterType(Configuration configuration) {
return configuration.get(FILTER_TYPE);
}
@Deprecated
public static void setFilterExp(Job job, String value) {
job.getConfiguration().set(FILTER_EXPRESSION, value);
}
@Deprecated
public static String getFilterExp(Configuration configuration) {
return configuration.get(FILTER_EXPRESSION);
}
@Override
public RecordReader<NullWritable, ArrayWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
return new TSFRecordReader();
}
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
Configuration configuration = job.getConfiguration();
BlockLocation[] blockLocations;
List<InputSplit> splits = new ArrayList<>();
// get all files in the directory
List<FileStatus> listFileStatus = super.listStatus(job);
LOGGER.info("The number of this job file is {}", listFileStatus.size());
// For each file
for (FileStatus fileStatus : listFileStatus) {
LOGGER.info("The file path is {}", fileStatus.getPath());
// Get the file path
Path path = fileStatus.getPath();
// Get the file length
long length = fileStatus.getLen();
// Check the file length; if it is not positive, skip the file
if (length > 0) {
// Get block information in the local file system or hdfs
if (fileStatus instanceof LocatedFileStatus) {
LOGGER.info("The file status is {}", LocatedFileStatus.class.getName());
blockLocations = ((LocatedFileStatus) fileStatus).getBlockLocations();
} else {
FileSystem fileSystem = path.getFileSystem(configuration);
LOGGER.info("The file status is {}", fileStatus.getClass().getName());
System.out.println("The file status is " + fileStatus.getClass().getName());
System.out.println("The file system is " + fileSystem.getClass());
blockLocations = fileSystem.getFileBlockLocations(fileStatus, 0, length);
}
LOGGER.info("The block location information is {}", Arrays.toString(blockLocations));
HDFSInputStream hdfsInputStream = new HDFSInputStream(path, configuration);
FileReader fileReader = new FileReader(hdfsInputStream);
// Generate the input splits for this file from its row group metadata
splits.addAll(generateSplits(path, fileReader, blockLocations));
fileReader.close();
} else {
LOGGER.warn("The file length is " + length);
}
}
configuration.setLong(NUM_INPUT_FILES, listFileStatus.size());
LOGGER.info("The number of splits is " + splits.size());
return splits;
}
/**
* Generate the TSFInputSplits from the tsfile row group metadata and the HDFS block
* location information
*
* @param path
* @param fileReader
* @param blockLocations
* @return
* @throws IOException
*/
private List<TSFInputSplit> generateSplits(Path path, FileReader fileReader, BlockLocation[] blockLocations)
throws IOException {
List<TSFInputSplit> splits = new ArrayList<TSFInputSplit>();
Comparator<BlockLocation> comparator = new Comparator<BlockLocation>() {
@Override
public int compare(BlockLocation o1, BlockLocation o2) {
return Long.signum(o1.getOffset() - o2.getOffset());
}
};
Arrays.sort(blockLocations, comparator);
List<RowGroupMetaData> rowGroupMetaDataList = new ArrayList<>();
int currentBlockIndex = 0;
long splitSize = 0;
long splitStart = 0;
List<String> hosts = new ArrayList<>();
for (RowGroupMetaData rowGroupMetaData : fileReader.getSortedRowGroupMetaDataList()) {
LOGGER.info("The rowGroupMetaData information is {}", rowGroupMetaData);
long start = getRowGroupStart(rowGroupMetaData);
int blkIndex = getBlockLocationIndex(blockLocations, start);
if(hosts.size() == 0)
{
hosts.addAll(Arrays.asList(blockLocations[blkIndex].getHosts()));
splitStart = start;
}
if(blkIndex != currentBlockIndex)
{
TSFInputSplit tsfInputSplit = makeSplit(path, rowGroupMetaDataList, splitStart,
splitSize, hosts);
LOGGER.info("The tsfile inputsplit information is {}", tsfInputSplit);
splits.add(tsfInputSplit);
currentBlockIndex = blkIndex;
rowGroupMetaDataList.clear();
rowGroupMetaDataList.add(rowGroupMetaData);
splitStart = start;
splitSize = rowGroupMetaData.getTotalByteSize();
hosts.clear();
}
else
{
rowGroupMetaDataList.add(rowGroupMetaData);
splitSize += rowGroupMetaData.getTotalByteSize();
}
}
TSFInputSplit tsfInputSplit = makeSplit(path, rowGroupMetaDataList, splitStart,
splitSize, hosts);
LOGGER.info("The tsfile inputsplit information is {}", tsfInputSplit);
splits.add(tsfInputSplit);
return splits;
}
private long getRowGroupStart(RowGroupMetaData rowGroupMetaData) {
return rowGroupMetaData.getMetaDatas().get(0).getProperties().getFileOffset();
}
private int getBlockLocationIndex(BlockLocation[] blockLocations, long start) {
for (int i = 0; i < blockLocations.length; i++) {
if (blockLocations[i].getOffset() <= start
&& start < blockLocations[i].getOffset() + blockLocations[i].getLength()) {
return i;
}
}
LOGGER.warn("Can't find the block. The start is {}; the last block ends at {}.", start,
blockLocations[blockLocations.length - 1].getOffset()
+ blockLocations[blockLocations.length - 1].getLength());
return -1;
}
private TSFInputSplit makeSplit(Path path, List<RowGroupMetaData> rowGroupMataDataList, long start, long length,
List<String> hosts) {
String[] hosts_str = hosts.toArray(new String[hosts.size()]);
return new TSFInputSplit(path, rowGroupMataDataList, start, length, hosts_str);
}
}

View File

@ -0,0 +1,166 @@
package cn.edu.thu.tsfile.hadoop;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import cn.edu.tsinghua.tsfile.file.metadata.RowGroupMetaData;
import cn.edu.tsinghua.tsfile.file.metadata.TsRowGroupBlockMetaData;
import cn.edu.tsinghua.tsfile.file.utils.ReadWriteThriftFormatUtils;
import cn.edu.tsinghua.tsfile.format.RowGroupBlockMetaData;
/**
* This is tsfile <code>InputSplit</code>.<br>
* Each <code>InputSplit</code> will be processed by an individual
* <code>Mapper</code> task.
*
* @author liukun
*/
public class TSFInputSplit extends InputSplit implements Writable {
private Path path;
private int numOfDeviceRowGroup;
private List<RowGroupMetaData> deviceRowGroupMetaDataList;
private long start;
private long length;
private String[] hosts;
public TSFInputSplit() {
}
/**
* @param path
* @param deviceRowGroupMetaDataList
* @param start
* @param length
* @param hosts
*/
public TSFInputSplit(Path path, List<RowGroupMetaData> deviceRowGroupMetaDataList, long start, long length,
String[] hosts) {
this.path = path;
this.deviceRowGroupMetaDataList = deviceRowGroupMetaDataList;
this.numOfDeviceRowGroup = deviceRowGroupMetaDataList.size();
this.start = start;
this.length = length;
this.hosts = hosts;
}
/**
* @return the path
*/
public Path getPath() {
return path;
}
/**
* @param path
* the path to set
*/
public void setPath(Path path) {
this.path = path;
}
/**
* @return the numOfDeviceRowGroup
*/
public int getNumOfDeviceRowGroup() {
return numOfDeviceRowGroup;
}
/**
* @param numOfDeviceRowGroup
* the numOfDeviceRowGroup to set
*/
public void setNumOfDeviceRowGroup(int numOfDeviceRowGroup) {
this.numOfDeviceRowGroup = numOfDeviceRowGroup;
}
/**
* @return the deviceRowGroupMetaDataList
*/
public List<RowGroupMetaData> getDeviceRowGroupMetaDataList() {
return deviceRowGroupMetaDataList;
}
/**
* @param deviceRowGroupMetaDataList
* the deviceRowGroupMetaDataList to set
*/
public void setDeviceRowGroupMetaDataList(List<RowGroupMetaData> deviceRowGroupMetaDataList) {
this.deviceRowGroupMetaDataList = deviceRowGroupMetaDataList;
}
/**
* @return the start
*/
public long getStart() {
return start;
}
/**
* @param start
* the start to set
*/
public void setStart(long start) {
this.start = start;
}
@Override
public long getLength() throws IOException, InterruptedException {
return this.length;
}
@Override
public String[] getLocations() throws IOException, InterruptedException {
return this.hosts;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(path.toString());
out.writeLong(start);
out.writeLong(length);
out.writeInt(hosts.length);
for (int i = 0; i < hosts.length; i++) {
String string = hosts[i];
out.writeUTF(string);
}
out.writeInt(numOfDeviceRowGroup);
RowGroupBlockMetaData rowGroupBlockMetaData = new TsRowGroupBlockMetaData(deviceRowGroupMetaDataList)
.convertToThrift();
ReadWriteThriftFormatUtils.writeRowGroupBlockMetadata(rowGroupBlockMetaData, (OutputStream) out);
}
@Override
public void readFields(DataInput in) throws IOException {
path = new Path(in.readUTF());
this.start = in.readLong();
this.length = in.readLong();
int len = in.readInt();
this.hosts = new String[len];
for (int i = 0; i < len; i++) {
hosts[i] = in.readUTF();
}
this.numOfDeviceRowGroup = in.readInt();
TsRowGroupBlockMetaData tsRowGroupBlockMetaData = new TsRowGroupBlockMetaData();
tsRowGroupBlockMetaData.convertToTSF(ReadWriteThriftFormatUtils.readRowGroupBlockMetaData((InputStream) in));
deviceRowGroupMetaDataList = tsRowGroupBlockMetaData.getRowGroups();
}
@Override
public String toString() {
return "TSFInputSplit [path=" + path + ", numOfDeviceGroup=" + numOfDeviceRowGroup
+ ", deviceRowGroupMetaDataList=" + deviceRowGroupMetaDataList + ", start=" + start + ", length="
+ length + ", hosts=" + Arrays.toString(hosts) + "]";
}
}

View File

@ -0,0 +1,58 @@
package cn.edu.thu.tsfile.hadoop;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TSFOutputFormat extends FileOutputFormat<NullWritable, TSRow> {
private static final Logger LOGGER = LoggerFactory.getLogger(TSFOutputFormat.class);
public static final String FILE_SCHEMA = "tsfile.schema";
private static final String extension = "tsfile";
public static void setWriterSchema(Job job, JSONObject schema) {
LOGGER.info("Set the write schema - {}", schema.toString());
job.getConfiguration().set(FILE_SCHEMA, schema.toString());
}
public static void setWriterSchema(Job job, String schema) {
LOGGER.info("Set the write schema - {}", schema);
job.getConfiguration().set(FILE_SCHEMA, schema);
}
public static JSONObject getWriterSchema(JobContext jobContext) throws InterruptedException {
String schema = jobContext.getConfiguration().get(FILE_SCHEMA);
if (schema == null || schema.isEmpty()) {
throw new InterruptedException("The tsfile schema is null or empty");
}
JSONObject jsonSchema = new JSONObject(schema);
return jsonSchema;
}
@Override
public RecordWriter<NullWritable, TSRow> getRecordWriter(TaskAttemptContext job)
throws IOException, InterruptedException {
Path outputPath = getDefaultWorkFile(job, extension);
LOGGER.info("The task attempt id is {}, the output path is {}", job.getTaskAttemptID(), outputPath);
JSONObject schema = getWriterSchema(job);
return new TSFRecordWriter(outputPath, schema);
}
}

View File

@ -0,0 +1,204 @@
package cn.edu.thu.tsfile.hadoop;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cn.edu.thu.tsfile.hadoop.io.HDFSInputStream;
import cn.edu.tsinghua.tsfile.file.metadata.RowGroupMetaData;
import cn.edu.tsinghua.tsfile.file.metadata.TimeSeriesChunkMetaData;
import cn.edu.tsinghua.tsfile.timeseries.read.query.HadoopQueryEngine;
import cn.edu.tsinghua.tsfile.timeseries.read.query.QueryDataSet;
import cn.edu.tsinghua.tsfile.timeseries.read.support.Field;
import cn.edu.tsinghua.tsfile.timeseries.read.support.RowRecord;
/**
* @author liukun
*/
public class TSFRecordReader extends RecordReader<NullWritable, ArrayWritable> {
private static final Logger LOGGER = LoggerFactory.getLogger(TSFRecordReader.class);
private QueryDataSet dataSet = null;
private List<Field> fields = null;
private long timestamp = 0;
private String deviceId;
private int sensorNum = 0;
private int sensorIndex = 0;
private boolean isReadDeviceId = false;
private boolean isReadTime = false;
private int arraySize = 0;
private HDFSInputStream hdfsInputStream;
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
if (split instanceof TSFInputSplit) {
TSFInputSplit tsfInputSplit = (TSFInputSplit) split;
Path path = tsfInputSplit.getPath();
List<RowGroupMetaData> rowGroupMetaDataList = tsfInputSplit.getDeviceRowGroupMetaDataList();
Configuration configuration = context.getConfiguration();
hdfsInputStream = new HDFSInputStream(path, configuration);
// Get the read columns and filter information
List<String> deltaObjectIdsList = TSFInputFormat.getReadDeltaObjectIds(configuration);
if(deltaObjectIdsList == null)deltaObjectIdsList = initDeviceIdList(rowGroupMetaDataList);
List<String> measurementIdsList = TSFInputFormat.getReadMeasurementIds(configuration);
if(measurementIdsList == null)measurementIdsList = initSensorIdList(rowGroupMetaDataList);
LOGGER.info("deltaObjectIds:" + deltaObjectIdsList);
LOGGER.info("Sensors:" + measurementIdsList);
this.sensorNum = measurementIdsList.size();
isReadDeviceId = TSFInputFormat.getReadDeltaObject(configuration);
isReadTime = TSFInputFormat.getReadTime(configuration);
if (isReadDeviceId) {
arraySize++;
}
if (isReadTime) {
arraySize++;
}
arraySize += sensorNum;
HadoopQueryEngine queryEngine = new HadoopQueryEngine(hdfsInputStream, rowGroupMetaDataList);
dataSet = queryEngine.queryWithSpecificRowGroups(deltaObjectIdsList, measurementIdsList, null, null, null);
} else {
LOGGER.error("The InputSplit class is not {}, the class is {}", TSFInputSplit.class.getName(),
split.getClass().getName());
throw new InternalError(String.format("The InputSplit class is not %s, the class is %s",
TSFInputSplit.class.getName(), split.getClass().getName()));
}
}
private List<String> initDeviceIdList(List<RowGroupMetaData> rowGroupMetaDataList) {
Set<String> deviceIdSet = new HashSet<>();
for (RowGroupMetaData rowGroupMetaData : rowGroupMetaDataList) {
deviceIdSet.add(rowGroupMetaData.getDeltaObjectID());
}
return new ArrayList<>(deviceIdSet);
}
private List<String> initSensorIdList(List<RowGroupMetaData> rowGroupMetaDataList){
Set<String> sensorIdSet = new HashSet<>();
for(RowGroupMetaData rowGroupMetaData : rowGroupMetaDataList) {
for(TimeSeriesChunkMetaData timeSeriesChunkMetaData : rowGroupMetaData.getTimeSeriesChunkMetaDataList()){
sensorIdSet.add(timeSeriesChunkMetaData.getProperties().getMeasurementUID());
}
}
return new ArrayList<>(sensorIdSet);
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
sensorIndex += sensorNum;
if(fields == null || sensorIndex >= fields.size()){
LOGGER.info("Start another row~");
if(!dataSet.next()){
LOGGER.info("Finish all rows~");
return false;
}
RowRecord rowRecord = dataSet.getCurrentRecord();
fields = rowRecord.getFields();
timestamp = rowRecord.getTime();
sensorIndex = 0;
}
deviceId = fields.get(sensorIndex).deltaObjectId;
return true;
}
@Override
public NullWritable getCurrentKey() throws IOException, InterruptedException {
return NullWritable.get();
}
@Override
public ArrayWritable getCurrentValue() throws IOException, InterruptedException {
Writable[] writables = getEmptyWritables();
Text deviceid = new Text(deviceId);
LongWritable time = new LongWritable(timestamp);
int index = 0;
if (isReadTime && isReadDeviceId) {
writables[0] = time;
writables[1] = deviceid;
index = 2;
} else if (isReadTime && !isReadDeviceId) {
writables[0] = time;
index = 1;
} else if (!isReadTime && isReadDeviceId) {
writables[0] = deviceid;
index = 1;
}
for(int i = 0;i < sensorNum;i++)
{
Field field = fields.get(sensorIndex + i);
if (field.isNull()) {
LOGGER.info("Current value is null");
writables[index] = NullWritable.get();
} else {
switch (field.dataType) {
case INT32:
writables[index] = new IntWritable(field.getIntV());
break;
case INT64:
writables[index] = new LongWritable(field.getLongV());
break;
case FLOAT:
writables[index] = new FloatWritable(field.getFloatV());
break;
case DOUBLE:
writables[index] = new DoubleWritable(field.getDoubleV());
break;
case BOOLEAN:
writables[index] = new BooleanWritable(field.getBoolV());
break;
case TEXT:
writables[index] = new Text(field.getBinaryV().getStringValue());
break;
default:
LOGGER.error("The data type is not support {}", field.dataType);
throw new InterruptedException(String.format("The data type %s is not support ", field.dataType));
}
}
index++;
}
return new ArrayWritable(Writable.class, writables);
}
@Override
public float getProgress() throws IOException, InterruptedException {
return 0;
}
@Override
public void close() throws IOException {
dataSet = null;
hdfsInputStream.close();
}
private Writable[] getEmptyWritables() {
Writable[] writables = new Writable[arraySize];
return writables;
}
}

View File

@ -0,0 +1,65 @@
package cn.edu.thu.tsfile.hadoop;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cn.edu.thu.tsfile.hadoop.io.HDFSOutputStream;
import cn.edu.tsinghua.tsfile.timeseries.basis.TsFile;
import cn.edu.tsinghua.tsfile.timeseries.write.exception.InvalidJsonSchemaException;
import cn.edu.tsinghua.tsfile.timeseries.write.exception.WriteProcessException;
import cn.edu.tsinghua.tsfile.timeseries.write.schema.FileSchema;
public class TSFRecordWriter extends RecordWriter<NullWritable, TSRow> {
private static final Logger LOGGER = LoggerFactory.getLogger(TSFRecordWriter.class);
private TsFile write = null;
public TSFRecordWriter(Path path, JSONObject schema) throws InterruptedException, IOException {
// construct the internal record writer
FileSchema fileSchema = null;
try {
fileSchema = new FileSchema(schema);
} catch (InvalidJsonSchemaException e) {
e.printStackTrace();
LOGGER.error("Construct the tsfile schema failed, the reason is {}", e.getMessage());
throw new InterruptedException(e.getMessage());
}
HDFSOutputStream hdfsOutputStream = new HDFSOutputStream(path, new Configuration(), false);
try {
write = new TsFile(hdfsOutputStream, fileSchema);
} catch (WriteProcessException e) {
e.printStackTrace();
throw new IOException(e.getMessage());
}
}
@Override
public void write(NullWritable key, TSRow value) throws IOException, InterruptedException {
try {
write.writeRecord(value.getRow());
} catch (WriteProcessException e) {
e.printStackTrace();
LOGGER.error("Write tsfile record error, the error message is {}", e.getMessage());
throw new InterruptedException(e.getMessage());
}
}
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
LOGGER.info("Close the recordwriter, the task attempt id is {}", context.getTaskAttemptID());
write.close();
}
}

View File

@ -0,0 +1,35 @@
package cn.edu.thu.tsfile.hadoop;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import cn.edu.tsinghua.tsfile.timeseries.write.record.TSRecord;
public class TSRow implements Writable {
private TSRecord row;
public TSRow(TSRecord row){
this.row = row;
}
public TSRecord getRow(){
return row;
}
@Override
public void write(DataOutput out) throws IOException {
throw new IOException("Not support");
}
@Override
public void readFields(DataInput in) throws IOException {
throw new IOException("Not support");
}
}

View File

@ -0,0 +1,111 @@
package cn.edu.thu.tsfile.hadoop.example;
import java.io.IOException;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import cn.edu.thu.tsfile.hadoop.TSFHadoopException;
import cn.edu.thu.tsfile.hadoop.TSFInputFormat;
import cn.edu.thu.tsfile.hadoop.TSFOutputFormat;
/**
* One example for reading TsFile with MapReduce.
* This MR Job is used to get the result of count("root.car.d1") in the tsfile.
* The source of tsfile can be generated by <code>TsFileHelper</code>.
* @author liukun
*
*/
public class TSFMRReadExample {
public static class TSMapper extends Mapper<NullWritable, ArrayWritable, Text, IntWritable> {
private static final IntWritable one = new IntWritable(1);
@Override
protected void map(NullWritable key, ArrayWritable value,
Mapper<NullWritable, ArrayWritable, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
Text deltaObjectId = (Text) value.get()[1];
context.write(deltaObjectId, one);
}
}
public static class TSReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Reducer<Text, IntWritable, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable intWritable : values) {
sum = sum + intWritable.get();
}
context.write(key, new IntWritable(sum));
}
}
public static void main(String[] args)
throws IOException, ClassNotFoundException, TSFHadoopException, URISyntaxException {
if (args.length != 3) {
System.out.println("Please give hdfs url, input path, output path");
return;
}
String HDFSURL = args[0];
Path inputPath = new Path(args[1]);
Path outputPath = new Path(args[2]);
Configuration configuration = new Configuration();
// set file system configuration
//configuration.set("fs.defaultFS", HDFSURL);
Job job = Job.getInstance(configuration);
job.setJobName("TsFile read jar");
job.setJarByClass(TSFMRReadExample.class);
// set mapper and reducer
job.setMapperClass(TSMapper.class);
job.setReducerClass(TSReducer.class);
// set inputformat and outputformat
job.setInputFormatClass(TSFInputFormat.class);
// set mapper output key and value
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// set reducer output key and value
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// set input file path
TSFInputFormat.setInputPaths(job, inputPath);
// set output file path
TSFOutputFormat.setOutputPath(job, outputPath);
/**
* special configuration for reading tsfile with TSFInputFormat
*/
TSFInputFormat.setReadTime(job, true); // configure reading time enable
TSFInputFormat.setReadDeltaObjectId(job, true); // configure reading deltaObjectId enable
String[] deltaObjectIds = { "device_1" };// configure reading which deltaObjectIds
TSFInputFormat.setReadDeltaObjectIds(job, deltaObjectIds);
String[] measurementIds = { "sensor_1", };// configure reading which measurementIds
TSFInputFormat.setReadMeasurementIds(job, measurementIds);
boolean isSuccess = false;
try {
isSuccess = job.waitForCompletion(true);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (isSuccess) {
System.out.println("Execute successfully");
} else {
System.out.println("Execute unsuccessfully");
}
}
}
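
For convenience, a minimal preparation sketch (not part of the original commit): it assumes the input TsFile is generated locally with TsFileHelper and then copied into HDFS before the job is submitted; the class name PrepareExampleInput, the HDFS URL, and all paths are placeholders.

package cn.edu.thu.tsfile.hadoop.example;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PrepareExampleInput {
    public static void main(String[] args) throws Exception {
        // generate the example TsFile locally (see TsFileHelper above)
        TsFileHelper.writeTsFile("example_mr.tsfile");
        // copy it to HDFS; the HDFS URL and target directory are placeholders
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        FileSystem fs = FileSystem.get(conf);
        fs.copyFromLocalFile(new Path("example_mr.tsfile"), new Path("/mr_input/example_mr.tsfile"));
        fs.close();
        // afterwards, TSFMRReadExample can be run with the arguments:
        // hdfs://localhost:9000 /mr_input /mr_output
    }
}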

View File

@ -0,0 +1,113 @@
package cn.edu.thu.tsfile.hadoop.example;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import org.json.JSONArray;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cn.edu.tsinghua.tsfile.common.conf.TSFileConfig;
import cn.edu.tsinghua.tsfile.common.conf.TSFileDescriptor;
import cn.edu.tsinghua.tsfile.common.constant.JsonFormatConstant;
import cn.edu.tsinghua.tsfile.common.utils.ITsRandomAccessFileWriter;
import cn.edu.tsinghua.tsfile.common.utils.TsRandomAccessFileWriter;
import cn.edu.tsinghua.tsfile.file.metadata.enums.TSDataType;
import cn.edu.tsinghua.tsfile.timeseries.basis.TsFile;
import cn.edu.tsinghua.tsfile.timeseries.read.TsRandomAccessLocalFileReader;
import cn.edu.tsinghua.tsfile.timeseries.write.exception.WriteProcessException;
public class TsFileHelper {
private static final Logger LOGGER = LoggerFactory.getLogger(TsFileHelper.class);
public static void deleteTsFile(String filePath){
File file = new File(filePath);
file.delete();
}
public static void writeTsFile(String filePath) {
TSFileConfig conf = TSFileDescriptor.getInstance().getConfig();
conf.pageSizeInByte=100;
conf.groupSizeInByte = 2000;
conf.pageCheckSizeThreshold = 1;
conf.maxStringLength = 2;
File file = new File(filePath);
if (file.exists())
file.delete();
JSONObject jsonSchema = getJsonSchema();
try {
ITsRandomAccessFileWriter output = new TsRandomAccessFileWriter(new File(filePath));
TsFile tsFile = new TsFile(output, jsonSchema);
String line = "";
for(int i = 1;i<1000;i++){
line = "root.car.d1,"+i+",s1," + i + ",s2,1,s3,0.1,s4,0.1";
tsFile.writeLine(line);
}
tsFile.writeLine("root.car.d2,5, s1, 5, s2, 50, s3, 200.5, s4, 0.5");
tsFile.writeLine("root.car.d2,6, s1, 6, s2, 60, s3, 200.6, s4, 0.6");
tsFile.writeLine("root.car.d2,7, s1, 7, s2, 70, s3, 200.7, s4, 0.7");
tsFile.writeLine("root.car.d2,8, s1, 8, s2, 80, s3, 200.8, s4, 0.8");
tsFile.close();
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException(e.getMessage());
} catch (WriteProcessException e) {
e.printStackTrace();
throw new RuntimeException(e.getMessage());
}
}
private static JSONObject getJsonSchema() {
TSFileConfig conf = TSFileDescriptor.getInstance().getConfig();
JSONObject s1 = new JSONObject();
s1.put(JsonFormatConstant.MEASUREMENT_UID, "s1");
s1.put(JsonFormatConstant.DATA_TYPE, TSDataType.INT32.toString());
s1.put(JsonFormatConstant.MEASUREMENT_ENCODING, conf.valueEncoder);
JSONObject s2 = new JSONObject();
s2.put(JsonFormatConstant.MEASUREMENT_UID, "s2");
s2.put(JsonFormatConstant.DATA_TYPE, TSDataType.INT64.toString());
s2.put(JsonFormatConstant.MEASUREMENT_ENCODING, conf.valueEncoder);
JSONObject s3 = new JSONObject();
s3.put(JsonFormatConstant.MEASUREMENT_UID, "s3");
s3.put(JsonFormatConstant.DATA_TYPE, TSDataType.FLOAT.toString());
s3.put(JsonFormatConstant.MEASUREMENT_ENCODING, conf.valueEncoder);
JSONObject s4 = new JSONObject();
s4.put(JsonFormatConstant.MEASUREMENT_UID, "s4");
s4.put(JsonFormatConstant.DATA_TYPE, TSDataType.DOUBLE.toString());
s4.put(JsonFormatConstant.MEASUREMENT_ENCODING, conf.valueEncoder);
JSONArray measureGroup = new JSONArray();
measureGroup.put(s1);
measureGroup.put(s2);
measureGroup.put(s3);
measureGroup.put(s4);
JSONObject jsonSchema = new JSONObject();
jsonSchema.put(JsonFormatConstant.DELTA_TYPE, "test_type");
jsonSchema.put(JsonFormatConstant.JSON_SCHEMA, measureGroup);
return jsonSchema;
}
public static void main(String[] args) throws FileNotFoundException, IOException{
String filePath = "example_mr.tsfile";
File file = new File(filePath);
file.delete();
writeTsFile(filePath);
TsFile tsFile = new TsFile(new TsRandomAccessLocalFileReader(filePath));
LOGGER.info("Get columns information: {}",tsFile.getAllColumns());
LOGGER.info("Get all deltaObjectId: {}",tsFile.getAllDeltaObject());
tsFile.close();
}
}

View File

@ -0,0 +1,95 @@
package cn.edu.thu.tsfile.hadoop.io;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import cn.edu.tsinghua.tsfile.common.utils.ITsRandomAccessFileReader;
/**
* This class wraps {@link FSDataInputStream} and implements the
* {@link ITsRandomAccessFileReader} interface.
*
* @author liukun
*/
public class HDFSInputStream implements ITsRandomAccessFileReader {
private FSDataInputStream fsDataInputStream;
private FileStatus fileStatus;
public HDFSInputStream(String filePath) throws IOException {
this(filePath, new Configuration());
}
public HDFSInputStream(String filePath, Configuration configuration) throws IOException {
this(new Path(filePath), configuration);
}
public HDFSInputStream(Path path, Configuration configuration) throws IOException {
FileSystem fs = FileSystem.get(configuration);
fsDataInputStream = fs.open(path);
fileStatus = fs.getFileStatus(path);
}
public void seek(long offset) throws IOException {
fsDataInputStream.seek(offset);
}
public int read() throws IOException {
return fsDataInputStream.read();
}
public long length() throws IOException {
return fileStatus.getLen();
}
public int readInt() throws IOException {
return fsDataInputStream.readInt();
}
public void close() throws IOException {
fsDataInputStream.close();
}
public long getPos() throws IOException {
return fsDataInputStream.getPos();
}
/**
* Read exactly len bytes into b, looping until the requested length has been read.
*
* @param b
*            the buffer into which the data is read
* @param off
*            the start offset in b at which the data is written
* @param len
*            the number of bytes to read
*/
public int read(byte[] b, int off, int len) throws IOException {
if (len < 0) {
throw new IndexOutOfBoundsException();
}
int n = 0;
while (n < len) {
int count = fsDataInputStream.read(b, off + n, len - n);
if (count < 0) {
throw new IOException("The read length is out of the length of inputstream");
}
n += count;
}
return n;
}
}
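
A hypothetical read-side usage sketch: because HDFSInputStream implements ITsRandomAccessFileReader, it can be passed straight to the TsFile reader used elsewhere in this commit; the class name HDFSInputStreamUsage and the file path are placeholders.

package cn.edu.thu.tsfile.hadoop.io;

import org.apache.hadoop.conf.Configuration;
import cn.edu.tsinghua.tsfile.timeseries.basis.TsFile;

public class HDFSInputStreamUsage {
    public static void main(String[] args) throws Exception {
        // open a TsFile stored on HDFS through the wrapper (placeholder path)
        HDFSInputStream in = new HDFSInputStream("/mr_input/example_mr.tsfile", new Configuration());
        TsFile tsFile = new TsFile(in);
        // print schema information, as the helpers in this commit do
        System.out.println(tsFile.getAllColumns());
        System.out.println(tsFile.getAllDeltaObject());
        tsFile.close();
    }
}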

View File

@ -0,0 +1,74 @@
package cn.edu.thu.tsfile.hadoop.io;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import cn.edu.tsinghua.tsfile.common.utils.ITsRandomAccessFileWriter;
/**
* This class wraps {@link FSDataOutputStream} and implements the
* {@link ITsRandomAccessFileWriter} interface.
*
* @author liukun
*/
public class HDFSOutputStream implements ITsRandomAccessFileWriter {
private FSDataOutputStream fsDataOutputStream;
public HDFSOutputStream(String filePath, boolean overwriter) throws IOException {
this(filePath, new Configuration(), overwriter);
}
public HDFSOutputStream(String filePath, Configuration configuration, boolean overwriter) throws IOException {
this(new Path(filePath), configuration, overwriter);
}
public HDFSOutputStream(Path path, Configuration configuration, boolean overwriter) throws IOException {
FileSystem fsFileSystem = FileSystem.get(configuration);
fsDataOutputStream = fsFileSystem.create(path, overwriter);
}
@Override
public OutputStream getOutputStream() {
return fsDataOutputStream;
}
@Override
public long getPos() throws IOException {
return fsDataOutputStream.getPos();
}
@Override
public void seek(long offset) throws IOException {
throw new IOException("Not support");
}
@Override
public void write(int b) throws IOException {
fsDataOutputStream.write(b);
}
@Override
public void write(byte[] b) throws IOException {
fsDataOutputStream.write(b);
}
@Override
public void close() throws IOException {
fsDataOutputStream.close();
}
}
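
Similarly, a hypothetical write-side sketch: HDFSOutputStream implements ITsRandomAccessFileWriter, so it can back the TsFile writer the same way TSFRecordWriter does above; the class name HDFSOutputStreamUsage is invented, and the JSON schema is assumed to be built as in TsFileHelper.getJsonSchema().

package cn.edu.thu.tsfile.hadoop.io;

import org.apache.hadoop.conf.Configuration;
import org.json.JSONObject;
import cn.edu.tsinghua.tsfile.timeseries.basis.TsFile;

public class HDFSOutputStreamUsage {
    public static void writeToHdfs(String hdfsPath, JSONObject jsonSchema) throws Exception {
        // create (or overwrite) a TsFile directly on HDFS through the wrapper
        HDFSOutputStream out = new HDFSOutputStream(hdfsPath, new Configuration(), true);
        TsFile tsFile = new TsFile(out, jsonSchema);
        // write a few records in the same line format used by TsFileHelper
        tsFile.writeLine("root.car.d1,1,s1,1,s2,1,s3,0.1,s4,0.1");
        tsFile.writeLine("root.car.d1,2,s1,2,s2,2,s3,0.2,s4,0.2");
        tsFile.close();
    }
}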

View File

@ -0,0 +1,76 @@
package cn.edu.thu.tsfile.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import cn.edu.thu.tsfile.hadoop.io.HDFSInputStream;
import cn.edu.thu.tsfile.hadoop.io.HDFSOutputStream;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
public class InputOutputStreamTest {
private HDFSInputStream hdfsInputStream = null;
private HDFSOutputStream hdfsOutputStream = null;
private int lenOfBytes = 50;
private byte b = 10;
private byte[] bs = new byte[lenOfBytes];
private byte[] rbs = new byte[lenOfBytes];
private String filename = "testinputandoutputstream.file";
private Path path;
private FileSystem fileSystem;
@Before
public void setUp() throws Exception {
fileSystem = FileSystem.get(new Configuration());
path = new Path(filename);
fileSystem.delete(path,true);
}
@After
public void tearDown() throws Exception {
if (fileSystem.exists(path)) {
fileSystem.delete(path, true);
}
}
@Test
public void test() throws Exception {
// write one byte
hdfsOutputStream = new HDFSOutputStream(filename, new Configuration(), true);
hdfsOutputStream.write(b);
assertEquals(1, hdfsOutputStream.getPos());
hdfsOutputStream.close();
assertEquals(true, fileSystem.exists(path));
fileSystem.delete(path, true);
assertEquals(false, fileSystem.exists(path));
// write bytes
hdfsOutputStream = new HDFSOutputStream(filename, new Configuration(), true);
hdfsOutputStream.write(bs);
assertEquals(bs.length, hdfsOutputStream.getPos());
hdfsOutputStream.close();
assertEquals(true, fileSystem.exists(path));
// read bytes using hdfs inputstream
hdfsInputStream = new HDFSInputStream(filename);
assertEquals(0, hdfsInputStream.getPos());
assertEquals(lenOfBytes, hdfsInputStream.length());
hdfsInputStream.seek(10);
assertEquals(10, hdfsInputStream.getPos());
hdfsInputStream.seek(0);
hdfsInputStream.read(rbs, 0, rbs.length);
assertEquals(lenOfBytes, hdfsInputStream.getPos());
assertArrayEquals(bs, rbs);
hdfsInputStream.close();
assertEquals(true, fileSystem.exists(path));
fileSystem.delete(path, true);
assertEquals(false, fileSystem.exists(path));
}
}

View File

@ -0,0 +1,182 @@
package cn.edu.thu.tsfile.hadoop;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import cn.edu.tsinghua.tsfile.common.utils.ITsRandomAccessFileReader;
import cn.edu.tsinghua.tsfile.timeseries.basis.TsFile;
import cn.edu.tsinghua.tsfile.timeseries.read.TsRandomAccessLocalFileReader;
public class TSFHadoopTest {
private TSFInputFormat inputformat = null;
private String tsfilePath = "tsfile";
@Before
public void setUp() throws Exception {
TsFileTestHelper.deleteTsFile(tsfilePath);
inputformat = new TSFInputFormat();
}
@After
public void tearDown() throws Exception {
TsFileTestHelper.deleteTsFile(tsfilePath);
}
@Test
public void staticMethodTest() {
Job job = null;
try {
job = Job.getInstance();
} catch (IOException e) {
e.printStackTrace();
fail(e.getMessage());
}
//
// columns
//
String[] value = { "s1", "s2", "s3" };
try {
TSFInputFormat.setReadMeasurementIds(job, value);
String[] getValue = (String[])TSFInputFormat.getReadMeasurementIds(job.getConfiguration()).toArray();
assertArrayEquals(value, getValue);
} catch (TSFHadoopException e) {
e.printStackTrace();
fail(e.getMessage());
}
//
// deviceid
//
TSFInputFormat.setReadDeltaObjectId(job, true);
assertEquals(true, TSFInputFormat.getReadDeltaObject(job.getConfiguration()));
//
// time
//
TSFInputFormat.setReadTime(job, true);
assertEquals(true, TSFInputFormat.getReadTime(job.getConfiguration()));
//
// filter
//
TSFInputFormat.setHasFilter(job, true);
assertEquals(true, TSFInputFormat.getHasFilter(job.getConfiguration()));
String filterType = "singleFilter";
TSFInputFormat.setFilterType(job, filterType);
assertEquals(filterType, TSFInputFormat.getFilterType(job.getConfiguration()));
String filterExpr = "s1>100";
TSFInputFormat.setFilterExp(job, filterExpr);
assertEquals(filterExpr, TSFInputFormat.getFilterExp(job.getConfiguration()));
}
@Test
public void InputFormatTest() {
//
// test getinputsplit method
//
TsFileTestHelper.writeTsFile(tsfilePath);
try {
Job job = Job.getInstance();
// set input path to the job
TSFInputFormat.setInputPaths(job, tsfilePath);
List<InputSplit> inputSplits = inputformat.getSplits(job);
ITsRandomAccessFileReader reader = new TsRandomAccessLocalFileReader(tsfilePath);
TsFile tsFile = new TsFile(reader);
System.out.println(tsFile.getDeltaObjectRowGroupCount());
//assertEquals(tsFile.getRowGroupPosList().size(), inputSplits.size());
for (InputSplit inputSplit : inputSplits) {
System.out.println(inputSplit);
}
reader.close();
} catch (IOException e) {
e.printStackTrace();
fail(e.getMessage());
}
}
@Test
public void RecordReaderTest() {
TsFileTestHelper.writeTsFile(tsfilePath);
try {
Job job = Job.getInstance();
// set input path to the job
TSFInputFormat.setInputPaths(job, tsfilePath);
String[] devices = {"root.car.d1"};
TSFInputFormat.setReadDeltaObjectIds(job, devices);
String[] sensors = { "s1", "s2", "s3", "s4", "s5", "s6"};
TSFInputFormat.setReadMeasurementIds(job, sensors);
TSFInputFormat.setReadDeltaObjectId(job, false);
TSFInputFormat.setReadTime(job, false);
List<InputSplit> inputSplits = inputformat.getSplits(job);
ITsRandomAccessFileReader reader = new TsRandomAccessLocalFileReader(tsfilePath);
TsFile tsFile = new TsFile(reader);
System.out.println(tsFile.getDeltaObjectRowGroupCount());
//assertEquals(tsFile.getRowGroupPosList().size(), inputSplits.size());
for (InputSplit inputSplit : inputSplits) {
System.out.println(inputSplit);
}
reader.close();
// read one split
TSFRecordReader recordReader = new TSFRecordReader();
TaskAttemptContextImpl attemptContextImpl = new TaskAttemptContextImpl(job.getConfiguration(),
new TaskAttemptID());
recordReader.initialize(inputSplits.get(0), attemptContextImpl);
while (recordReader.nextKeyValue()) {
assertEquals(recordReader.getCurrentValue().get().length, sensors.length);
for (Writable writable : recordReader.getCurrentValue().get()) {
if (writable instanceof IntWritable) {
assertEquals(writable.toString(), "1");
} else if (writable instanceof LongWritable) {
assertEquals(writable.toString(), "1");
} else if (writable instanceof FloatWritable) {
assertEquals(writable.toString(), "0.1");
} else if (writable instanceof DoubleWritable) {
assertEquals(writable.toString(), "0.1");
} else if (writable instanceof BooleanWritable) {
assertEquals(writable.toString(), "true");
} else if (writable instanceof Text) {
assertEquals(writable.toString(), "tsfile");
} else {
fail(String.format("Not support type %s", writable.getClass().getName()));
}
}
}
recordReader.close();
} catch (IOException e) {
e.printStackTrace();
fail(e.getMessage());
} catch (InterruptedException e) {
e.printStackTrace();
fail(e.getMessage());
} catch (TSFHadoopException e) {
e.printStackTrace();
fail(e.getMessage());
}
}
}

View File

@ -0,0 +1,79 @@
package cn.edu.thu.tsfile.hadoop;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.junit.Before;
import org.junit.Test;
import cn.edu.tsinghua.tsfile.file.metadata.RowGroupMetaData;
import cn.edu.tsinghua.tsfile.file.metadata.TimeSeriesChunkMetaData;
/**
* Test for {@link cn.edu.thu.tsfile.hadoop.TSFInputSplit}.
* Asserts that the readFields and write methods serialize and deserialize the split correctly.
*
* @author liukun
*/
public class TSFInputSplitTest {
private TSFInputSplit wInputSplit;
private TSFInputSplit rInputSplit;
private DataInputBuffer dataInputBuffer = new DataInputBuffer();
private DataOutputBuffer dataOutputBuffer = new DataOutputBuffer();
@Before
public void setUp() throws Exception {
// For the test data
Path path = new Path("input");
String deviceId = "d1";
int numOfRowGroupMetaData = 1;
List<RowGroupMetaData> rowGroupMetaDataList = new ArrayList<>();
rowGroupMetaDataList.add(new RowGroupMetaData("d1", 1, 10, new ArrayList<TimeSeriesChunkMetaData>(), "Int"));
rowGroupMetaDataList.add(new RowGroupMetaData("d1", 2, 20, new ArrayList<TimeSeriesChunkMetaData>(), "Int"));
rowGroupMetaDataList.add(new RowGroupMetaData("d2", 3, 30, new ArrayList<TimeSeriesChunkMetaData>(), "Float"));
long start = 5;
long length = 100;
String[] hosts = {"192.168.1.1", "192.168.1.0", "localhost"};
wInputSplit = new TSFInputSplit(path, rowGroupMetaDataList, start, length, hosts);
rInputSplit = new TSFInputSplit();
}
@Test
public void testInputSplitWriteAndRead() {
try {
// call the write method to serialize the object
wInputSplit.write(dataOutputBuffer);
dataOutputBuffer.flush();
dataInputBuffer.reset(dataOutputBuffer.getData(), dataOutputBuffer.getLength());
rInputSplit.readFields(dataInputBuffer);
dataInputBuffer.close();
dataOutputBuffer.close();
// assert
assertEquals(wInputSplit.getPath(), rInputSplit.getPath());
assertEquals(wInputSplit.getNumOfDeviceRowGroup(), rInputSplit.getNumOfDeviceRowGroup());
//assertEquals(wInputSplit.getDeviceRowGroupMetaDataList(), rInputSplit.getDeviceRowGroupMetaDataList());
assertEquals(wInputSplit.getStart(), rInputSplit.getStart());
try {
assertEquals(wInputSplit.getLength(), rInputSplit.getLength());
assertArrayEquals(wInputSplit.getLocations(), rInputSplit.getLocations());
} catch (InterruptedException e) {
e.printStackTrace();
fail(e.getMessage());
}
} catch (IOException e) {
e.printStackTrace();
fail(e.getMessage());
}
}
}
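
The round trip asserted above is the standard Hadoop Writable pattern; as a generic sketch (the helper class WritableRoundTrip is invented for illustration), the same serialize/deserialize cycle can be factored out and reused for any Writable.

package cn.edu.thu.tsfile.hadoop;

import java.io.IOException;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Writable;

public final class WritableRoundTrip {
    private WritableRoundTrip() {
    }

    /** Serialize source into a buffer and read the bytes back into target. */
    public static <T extends Writable> T roundTrip(Writable source, T target) throws IOException {
        DataOutputBuffer out = new DataOutputBuffer();
        source.write(out);
        out.flush();
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        target.readFields(in);
        in.close();
        out.close();
        return target;
    }
}

With such a helper, the test above could simply call WritableRoundTrip.roundTrip(wInputSplit, rInputSplit) before running its assertions.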

View File

@ -0,0 +1,132 @@
package cn.edu.thu.tsfile.hadoop;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import org.json.JSONArray;
import org.json.JSONObject;
import cn.edu.tsinghua.tsfile.common.conf.TSFileConfig;
import cn.edu.tsinghua.tsfile.common.conf.TSFileDescriptor;
import cn.edu.tsinghua.tsfile.common.constant.JsonFormatConstant;
import cn.edu.tsinghua.tsfile.common.utils.ITsRandomAccessFileWriter;
import cn.edu.tsinghua.tsfile.common.utils.TsRandomAccessFileWriter;
import cn.edu.tsinghua.tsfile.file.metadata.enums.TSDataType;
import cn.edu.tsinghua.tsfile.timeseries.basis.TsFile;
import cn.edu.tsinghua.tsfile.timeseries.read.TsRandomAccessLocalFileReader;
import cn.edu.tsinghua.tsfile.timeseries.write.exception.WriteProcessException;
public class TsFileTestHelper {
public static void deleteTsFile(String filePath){
File file = new File(filePath);
file.delete();
}
public static void restoreConf(){
}
public static void writeTsFile(String filePath) {
TSFileConfig conf = TSFileDescriptor.getInstance().getConfig();
conf.pageSizeInByte=100;
conf.groupSizeInByte = 3000;
conf.pageCheckSizeThreshold = 1;
conf.maxStringLength = 2;
File file = new File(filePath);
if (file.exists())
file.delete();
JSONObject jsonSchema = getJsonSchema();
try {
ITsRandomAccessFileWriter output = new TsRandomAccessFileWriter(new File(filePath));
TsFile tsFile = new TsFile(output, jsonSchema);
String line = "";
for(int i = 1;i<10;i++){
line = "root.car.d1,"+i+",s1,1,s2,1,s3,0.1,s4,0.1,s5,true,s6,tsfile";
tsFile.writeLine(line);
}
tsFile.writeLine("root.car.d2,5, s1, 5, s2, 50, s3, 200.5, s4, 0.5");
tsFile.writeLine("root.car.d2,6, s1, 6, s2, 60, s3, 200.6, s4, 0.6");
tsFile.writeLine("root.car.d2,7, s1, 7, s2, 70, s3, 200.7, s4, 0.7");
tsFile.writeLine("root.car.d2,8, s1, 8, s2, 80, s3, 200.8, s4, 0.8");
tsFile.close();
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException(e.getMessage());
} catch (WriteProcessException e) {
e.printStackTrace();
throw new RuntimeException(e.getMessage());
}
}
private static JSONObject getJsonSchema() {
TSFileConfig conf = TSFileDescriptor.getInstance().getConfig();
JSONObject s1 = new JSONObject();
s1.put(JsonFormatConstant.MEASUREMENT_UID, "s1");
s1.put(JsonFormatConstant.DATA_TYPE, TSDataType.INT32.toString());
s1.put(JsonFormatConstant.MEASUREMENT_ENCODING, conf.valueEncoder);
JSONObject s2 = new JSONObject();
s2.put(JsonFormatConstant.MEASUREMENT_UID, "s2");
s2.put(JsonFormatConstant.DATA_TYPE, TSDataType.INT64.toString());
s2.put(JsonFormatConstant.MEASUREMENT_ENCODING, conf.valueEncoder);
JSONObject s3 = new JSONObject();
s3.put(JsonFormatConstant.MEASUREMENT_UID, "s3");
s3.put(JsonFormatConstant.DATA_TYPE, TSDataType.FLOAT.toString());
s3.put(JsonFormatConstant.MEASUREMENT_ENCODING, conf.valueEncoder);
JSONObject s4 = new JSONObject();
s4.put(JsonFormatConstant.MEASUREMENT_UID, "s4");
s4.put(JsonFormatConstant.DATA_TYPE, TSDataType.DOUBLE.toString());
s4.put(JsonFormatConstant.MEASUREMENT_ENCODING, conf.valueEncoder);
JSONObject s5 = new JSONObject();
s5.put(JsonFormatConstant.MEASUREMENT_UID, "s5");
s5.put(JsonFormatConstant.DATA_TYPE, TSDataType.BOOLEAN.toString());
s5.put(JsonFormatConstant.MEASUREMENT_ENCODING, conf.valueEncoder);
JSONObject s6 = new JSONObject();
s6.put(JsonFormatConstant.MEASUREMENT_UID, "s6");
s6.put(JsonFormatConstant.DATA_TYPE, TSDataType.TEXT.toString());
s6.put(JsonFormatConstant.MEASUREMENT_ENCODING, conf.valueEncoder);
JSONArray measureGroup = new JSONArray();
measureGroup.put(s1);
measureGroup.put(s2);
measureGroup.put(s3);
measureGroup.put(s4);
measureGroup.put(s5);
measureGroup.put(s6);
JSONObject jsonSchema = new JSONObject();
jsonSchema.put(JsonFormatConstant.DELTA_TYPE, "test_type");
jsonSchema.put(JsonFormatConstant.JSON_SCHEMA, measureGroup);
return jsonSchema;
}
public static void main(String[] args) throws FileNotFoundException, IOException{
String filePath = "tsfile";
File file = new File(filePath);
System.out.println(file.exists());
file.delete();
writeTsFile(filePath);
TsFile tsFile = new TsFile(new TsRandomAccessLocalFileReader(filePath));
System.out.println(tsFile.getAllColumns());
System.out.println(tsFile.getAllDeltaObject());
System.out.println(tsFile.getAllSeries());
System.out.println(tsFile.getDeltaObjectRowGroupCount());
System.out.println(tsFile.getDeltaObjectTypes());
System.out.println(tsFile.getRowGroupPosList());
tsFile.close();
}
}

21
iotdb/.local.ci Normal file
View File

@ -0,0 +1,21 @@
#!/bin/bash
#pwd
#rm -rf data
## get tsfile and jdbc
#rm -rf tsfile
#git clone https://github.com/thulab/tsfile.git
#cd tsfile
#mvn clean install -Dmaven.test.skip=true
#cd ..
#rm -rf iotdb-jdbc
#git clone https://github.com/thulab/iotdb-jdbc.git
#cd iotdb-jdbc
#mvn clean install -Dmaven.test.skip=true
#cd ..
##begin ...
#rm -rf data
#mvn clean test
#rm -rf data

224
iotdb/pom.xml Normal file
View File

@ -0,0 +1,224 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.edu.tsinghua</groupId>
<artifactId>root</artifactId>
<version>0.8.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<groupId>cn.edu.tsinghua</groupId>
<artifactId>IoTDB</artifactId>
<packaging>jar</packaging>
<name>IoTDB</name>
<url>http://maven.apache.org</url>
<properties>
<common.cli.version>1.3.1</common.cli.version>
<antlr3.version>3.5.2</antlr3.version>
<jline.version>2.14.5</jline.version>
<it.test.includes>**/*Test.java</it.test.includes>
<it.test.excludes>**/NoTest.java</it.test.excludes>
</properties>
<dependencies>
<dependency>
<groupId>cn.edu.tsinghua</groupId>
<artifactId>iotdb-jdbc</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>cn.edu.fudan.dsm</groupId>
<artifactId>kvmatch-iotdb</artifactId>
<version>1.0.3</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>cn.edu.tsinghua</groupId>
<artifactId>tsfile</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>${common.cli.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>${commons.collections4}</version>
</dependency>
<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr3-maven-plugin</artifactId>
<version>${antlr3.version}</version>
</dependency>
<dependency>
<groupId>jline</groupId>
<artifactId>jline</artifactId>
<version>${jline.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>${compile.version}</source>
<target>${compile.version}</target>
<encoding>UTF-8</encoding>
</configuration>
<!-- compile java files in src/it/java -->
<executions>
<execution>
<id>compile-integration-test</id>
<phase>pre-integration-test</phase>
<goals>
<goal>testCompile</goal>
</goals>
<configuration>
<testIncludes>
<testInclude>**/*.java</testInclude>
</testIncludes>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.9</version>
<configuration>
<runOrder>alphabetical</runOrder>
<excludes>
<exclude>${it.test.excludes}</exclude>
</excludes>
<includes>
<include>${it.test.includes}</include>
</includes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.10</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.basedir}/iotdb/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<version>1.7</version>
<executions>
<execution>
<id>copy-native-libraries</id>
<phase>package</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<target>
<copy todir="${project.basedir}/iotdb/lib">
<fileset dir="${project.basedir}/target/">
<include name="*.jar" />
</fileset>
</copy>
</target>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.antlr</groupId>
<artifactId>antlr3-maven-plugin</artifactId>
<version>${antlr3.version}</version>
<executions>
<execution>
<goals>
<goal>antlr</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>${project.basedir}/target/generated-sources/antlr3</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
<!-- <plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>cobertura-maven-plugin</artifactId>
<version>2.7</version>
<configuration>
<quiet>true</quiet>
<formats>
<format>xml</format>
</formats>
<instrumentation>
<ignoreTrivial>true</ignoreTrivial>
</instrumentation>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>cobertura</goal>
</goals>
</execution>
</executions>
</plugin>-->
</plugins>
</build>
</project>

Some files were not shown because too many files have changed in this diff.