[IOTDB-294]online upgrade from v0.8.0 to current version (#467)

* online upgrade from 0.8.0 to 0.9.0
This commit is contained in:
gwmh 2019-11-12 15:21:11 +08:00 committed by Jialin Qiao
parent 35be4849ee
commit 8682fe2646
31 changed files with 1783 additions and 64 deletions

View File

@ -201,6 +201,15 @@ chunk_buffer_pool_enable=false
# data.
# default_ttl=36000000
####################
### Upgrade Configurations
####################
# When there exists old version (v0.8.x) data, how many threads will be set up to perform upgrade tasks, 1 by default.
# Set to 1 when less than or equal to 0.
upgrade_thread_num=1
####################
### Merge Configurations
####################

View File

@ -0,0 +1,27 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# how many threads will be set up to perform offline upgrade tasks
upgrade_thread_num=10
# Used to specify the data dirs that need to be upgraded
# Commas could be used to separate the folder paths if there are more than one data dir that needs to be upgraded
old_version_data_dirs=/path/to/old/version/data
# Used to specify the upgrading data dirs
# It is worth noting that the length of the old_version_data_dirs and new_version_data_dirs parameters should be equal.
new_version_data_dirs=/path/to/new/version/data

View File

@ -0,0 +1,67 @@
@REM
@REM Licensed to the Apache Software Foundation (ASF) under one
@REM or more contributor license agreements. See the NOTICE file
@REM distributed with this work for additional information
@REM regarding copyright ownership. The ASF licenses this file
@REM to you under the Apache License, Version 2.0 (the
@REM "License"); you may not use this file except in compliance
@REM with the License. You may obtain a copy of the License at
@REM
@REM http://www.apache.org/licenses/LICENSE-2.0
@REM
@REM Unless required by applicable law or agreed to in writing,
@REM software distributed under the License is distributed on an
@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@REM KIND, either express or implied. See the License for the
@REM specific language governing permissions and limitations
@REM under the License.
@REM
@echo off
@REM Offline upgrade tool launcher for Windows: builds the classpath from
@REM %IOTDB_HOME%\lib and runs OfflineUpgradeTool.
if "%OS%" == "Windows_NT" setlocal
@REM Resolve IOTDB_HOME as the grandparent of this script's directory unless preset.
pushd %~dp0..\..
if NOT DEFINED IOTDB_HOME set IOTDB_HOME=%CD%
popd
set IOTDB_CONF=%IOTDB_HOME%\conf
if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.db.tools.upgrade.OfflineUpgradeTool
if NOT DEFINED JAVA_HOME goto :err
@REM -----------------------------------------------------------------------------
@REM JVM Opts we'll use in legacy run or installation
set JAVA_OPTS=-ea^
 -Dlogback.configurationFile="%IOTDB_CONF%\logback-tool.xml"^
 -DIOTDB_HOME=%IOTDB_HOME%
@REM ***** CLASSPATH library setting *****
@REM Ensure that any user defined CLASSPATH variables are not used on startup
set CLASSPATH="%IOTDB_HOME%\lib"
@REM For each jar in the IOTDB_HOME lib directory call append to build the CLASSPATH variable.
for %%i in ("%IOTDB_HOME%\lib\*.jar") do call :append "%%i"
goto okClasspath
:append
set CLASSPATH=%CLASSPATH%;%1
goto :eof
@REM -----------------------------------------------------------------------------
:okClasspath
@REM Fix: JAVA_OPTS was previously passed twice on this command line; pass it once.
"%JAVA_HOME%\bin\java" %JAVA_OPTS% -cp "%CLASSPATH%" %MAIN_CLASS% %*
goto finally
:err
echo JAVA_HOME environment variable must be set!
pause
@REM -----------------------------------------------------------------------------
:finally
ENDLOCAL

View File

@ -0,0 +1,47 @@
#!/bin/sh
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Offline upgrade tool launcher: builds the classpath from $IOTDB_HOME/lib
# and runs OfflineUpgradeTool with the tool logback configuration.
# Default IOTDB_HOME to the grandparent of this script's directory.
if [ -z "${IOTDB_HOME}" ]; then
export IOTDB_HOME="$(cd "`dirname "$0"`"/../..; pwd)"
fi
IOTDB_CONF=${IOTDB_HOME}/conf
# Collect every jar under lib/ onto the classpath.
CLASSPATH=""
for f in ${IOTDB_HOME}/lib/*.jar; do
CLASSPATH=${CLASSPATH}":"$f
done
MAIN_CLASS=org.apache.iotdb.db.tools.upgrade.OfflineUpgradeTool
# Prefer a java binary under $JAVA_HOME (amd64 variant first); otherwise
# fall back to whatever `java` is on PATH.
if [ -n "$JAVA_HOME" ]; then
for java in "$JAVA_HOME"/bin/amd64/java "$JAVA_HOME"/bin/java; do
if [ -x "$java" ]; then
JAVA="$java"
break
fi
done
else
JAVA=java
fi
iotdb_parms="-Dlogback.configurationFile=${IOTDB_CONF}/logback-tool.xml"
# Replace the shell with the JVM, forwarding all script arguments.
exec "$JAVA" $iotdb_parms -cp "$CLASSPATH" "$MAIN_CLASS" "$@"

View File

@ -336,6 +336,11 @@ public class IoTDBConfig {
*/
private long mergeMemoryBudget = (long) (Runtime.getRuntime().maxMemory() * 0.2);
/**
* How many threads will be set up to perform upgrade tasks.
*/
private int upgradeThreadNum = 1;
/**
* How many threads will be set up to perform main merge tasks.
*/
@ -1169,6 +1174,14 @@ public class IoTDBConfig {
this.hdfsPort = hdfsPort;
}
/**
 * @return the number of threads used to perform upgrade tasks
 *         (config key {@code upgrade_thread_num})
 */
public int getUpgradeThreadNum() {
return upgradeThreadNum;
}
/**
 * @param upgradeThreadNum number of threads to use for upgrade tasks; values &lt;= 0 are
 *        not rejected here — the upgrade service normalizes them to 1 at startup
 */
public void setUpgradeThreadNum(int upgradeThreadNum) {
this.upgradeThreadNum = upgradeThreadNum;
}
public String getDfsNameServices() {
return dfsNameServices;
}

View File

@ -232,6 +232,8 @@ public class IoTDBDescriptor {
conf.setExternalSortThreshold(Integer.parseInt(properties
.getProperty("external_sort_threshold",
Integer.toString(conf.getExternalSortThreshold()))));
conf.setUpgradeThreadNum(Integer.parseInt(properties.getProperty("upgrade_thread_num",
Integer.toString(conf.getUpgradeThreadNum()))));
conf.setMergeMemoryBudget(Long.parseLong(properties.getProperty("merge_memory_budget",
Long.toString(conf.getMergeMemoryBudget()))));
conf.setMergeThreadNum(Integer.parseInt(properties.getProperty("merge_thread_num",

View File

@ -58,6 +58,7 @@ import org.apache.iotdb.db.query.control.JobFileManager;
import org.apache.iotdb.db.service.IService;
import org.apache.iotdb.db.service.ServiceType;
import org.apache.iotdb.db.utils.FilePathUtils;
import org.apache.iotdb.db.utils.UpgradeUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
@ -104,6 +105,8 @@ public class StorageEngine implements IService {
throw new StorageEngineFailureException(e);
}
// recover upgrade process
UpgradeUtils.recoverUpgrade();
/*
* recover all storage group processors.
*/
@ -332,6 +335,33 @@ public class StorageEngine implements IService {
return Collections.emptyList();
}
/**
 * Counts the TsFiles that still need to be upgraded, across all storage groups.
 *
 * @return total number of TsFiles waiting to be upgraded
 */
public int countUpgradeFiles() {
  return processorMap.values().stream()
      .mapToInt(StorageGroupProcessor::countUpgradeFiles)
      .sum();
}
/**
 * Triggers an upgrade on every storage group (each storage group submits upgrade
 * tasks for its own TsFiles).
 *
 * @throws StorageEngineException if the system is currently in read-only mode,
 *         in which case file upgrade is not allowed
 */
public void upgradeAll() throws StorageEngineException {
if (IoTDBDescriptor.getInstance().getConfig().isReadOnly()) {
throw new StorageEngineException(
"Current system mode is read only, does not support file upgrade");
}
for (StorageGroupProcessor storageGroupProcessor : processorMap.values()) {
storageGroupProcessor.upgrade();
}
}
/**
* merge all storage groups.
*

View File

@ -36,14 +36,12 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.resourceRelated.CachedUnseqResourceMergeReader;
import org.apache.iotdb.db.utils.MergeUtils;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.write.chunk.ChunkBuffer;
import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@ -69,10 +67,10 @@ public class MergeResource {
private boolean cacheDeviceMeta = false;
public MergeResource(List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles) {
this.seqFiles =
seqFiles.stream().filter(this::filterResource).collect(Collectors.toList());
this.unseqFiles =
unseqFiles.stream().filter(this::filterResource).collect(Collectors.toList());
this.seqFiles = seqFiles.stream().filter(this::filterResource)
.collect(Collectors.toList());
this.unseqFiles = unseqFiles.stream().filter(this::filterResource)
.collect(Collectors.toList());
}
private boolean filterResource(TsFileResource res) {

View File

@ -32,6 +32,7 @@ import org.apache.iotdb.db.engine.merge.manage.MergeResource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.MergeException;
import org.apache.iotdb.db.utils.MergeUtils;
import org.apache.iotdb.db.utils.UpgradeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -155,6 +156,24 @@ public class MaxFileMergeFileSelector implements IMergeFileSelector {
selectOverlappedSeqFiles(unseqFile);
// skip if the unseqFile and tmpSelectedSeqFiles has TsFileResources that need to be upgraded
boolean isNeedUpgrade = false;
if (UpgradeUtils.isNeedUpgrade(unseqFile)) {
isNeedUpgrade = true;
}
for (Integer seqIdx : tmpSelectedSeqFiles) {
if (UpgradeUtils.isNeedUpgrade(resource.getSeqFiles().get(seqIdx))) {
isNeedUpgrade = true;
break;
}
}
if (isNeedUpgrade) {
tmpSelectedSeqFiles.clear();
unseqIndex++;
timeConsumption = System.currentTimeMillis() - startTime;
continue;
}
tempMaxSeqFileCost = maxSeqFileCost;
long newCost = useTightBound ? calculateTightMemoryCost(unseqFile, tmpSelectedSeqFiles,
startTime, timeLimit) :
@ -181,7 +200,7 @@ public class MaxFileMergeFileSelector implements IMergeFileSelector {
}
private void selectOverlappedSeqFiles(TsFileResource unseqFile) {
if (seqSelectedNum == resource.getSeqFiles().size()) {
if (seqSelectedNum == resource.getSeqFiles().size() || UpgradeUtils.isNeedUpgrade(unseqFile)) {
return;
}
int tmpSelectedNum = 0;
@ -200,7 +219,7 @@ public class MaxFileMergeFileSelector implements IMergeFileSelector {
if (unseqEndTime <= seqEndTime) {
// the unseqFile overlaps current seqFile
tmpSelectedSeqFiles.add(i);
tmpSelectedNum ++;
tmpSelectedNum++;
// the device of the unseqFile can not merge with later seqFiles
noMoreOverlap = true;
} else if (unseqStartTime <= seqEndTime) {
@ -251,7 +270,8 @@ public class MaxFileMergeFileSelector implements IMergeFileSelector {
private long calculateTightMemoryCost(TsFileResource tmpSelectedUnseqFile,
Collection<Integer> tmpSelectedSeqFiles, long startTime, long timeLimit) throws IOException {
return calculateMemoryCost(tmpSelectedUnseqFile, tmpSelectedSeqFiles,
this::calculateTightUnseqMemoryCost, this::calculateTightSeqMemoryCost, startTime, timeLimit);
this::calculateTightUnseqMemoryCost, this::calculateTightSeqMemoryCost, startTime,
timeLimit);
}
private long calculateMetadataSize(TsFileResource seqFile) throws IOException {
@ -264,11 +284,13 @@ public class MaxFileMergeFileSelector implements IMergeFileSelector {
return cost;
}
private long calculateTightFileMemoryCost(TsFileResource seqFile, IFileQueryMemMeasurement measurement)
private long calculateTightFileMemoryCost(TsFileResource seqFile,
IFileQueryMemMeasurement measurement)
throws IOException {
Long cost = maxSeriesQueryCostMap.get(seqFile);
if (cost == null) {
long[] chunkNums = MergeUtils.findTotalAndLargestSeriesChunkNum(seqFile, resource.getFileReader(seqFile));
long[] chunkNums = MergeUtils
.findTotalAndLargestSeriesChunkNum(seqFile, resource.getFileReader(seqFile));
long totalChunkNum = chunkNums[0];
long maxChunkNum = chunkNums[1];
cost = measurement.measure(seqFile) * maxChunkNum / totalChunkNum;

View File

@ -116,7 +116,7 @@ class MergeFileTask {
return;
}
seqFile.getMergeQueryLock().writeLock().lock();
seqFile.getWriteQueryLock().writeLock().lock();
try {
TsFileMetaDataCache.getInstance().remove(seqFile);
DeviceMetaDataCache.getInstance().remove(seqFile);
@ -163,7 +163,7 @@ class MergeFileTask {
new File(nextMergeVersionFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX));
seqFile.setFile(nextMergeVersionFile);
} finally {
seqFile.getMergeQueryLock().writeLock().unlock();
seqFile.getWriteQueryLock().writeLock().unlock();
}
}
@ -217,7 +217,7 @@ class MergeFileTask {
mergeLogger.logFileMergeEnd();
logger.debug("{} moved unmerged chunks of {} to the new file", taskName, seqFile);
seqFile.getMergeQueryLock().writeLock().lock();
seqFile.getWriteQueryLock().writeLock().lock();
try {
resource.removeFileReader(seqFile);
TsFileMetaDataCache.getInstance().remove(seqFile);
@ -232,7 +232,7 @@ class MergeFileTask {
new File(nextMergeVersionFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX));
seqFile.setFile(nextMergeVersionFile);
} finally {
seqFile.getMergeQueryLock().writeLock().unlock();
seqFile.getWriteQueryLock().writeLock().unlock();
}
}

View File

@ -71,6 +71,8 @@ import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.JobFileManager;
import org.apache.iotdb.db.utils.UpgradeUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.db.utils.CopyOnReadLinkedList;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.db.writelog.recover.TsFileRecoverPerformer;
@ -657,7 +659,7 @@ public class StorageGroupProcessor {
return;
}
// ensure that the file is not used by any queries
if (resource.getMergeQueryLock().writeLock().tryLock()) {
if (resource.getWriteQueryLock().writeLock().tryLock()) {
try {
// physical removal
resource.remove();
@ -671,7 +673,7 @@ public class StorageGroupProcessor {
unSequenceFileList.remove(resource);
}
} finally {
resource.getMergeQueryLock().writeLock().unlock();
resource.getWriteQueryLock().writeLock().unlock();
}
}
} finally {
@ -962,6 +964,35 @@ public class StorageGroupProcessor {
}
}
/**
 * Counts the TsFiles of this storage group that still need to be upgraded.
 *
 * @return number of sequence plus unsequence TsFiles waiting to be upgraded
 */
public int countUpgradeFiles() {
  long seqCount = sequenceFileList.stream()
      .filter(UpgradeUtils::isNeedUpgrade)
      .count();
  long unseqCount = unSequenceFileList.stream()
      .filter(UpgradeUtils::isNeedUpgrade)
      .count();
  return (int) (seqCount + unseqCount);
}
/**
 * Asks every TsFile of this storage group (sequence and unsequence) to upgrade
 * itself; each resource decides internally whether an upgrade task is needed.
 */
public void upgrade() {
  sequenceFileList.forEach(TsFileResource::doUpgrade);
  unSequenceFileList.forEach(TsFileResource::doUpgrade);
}
public void merge(boolean fullMerge) {
writeLock();
try {
@ -1046,17 +1077,17 @@ public class StorageGroupProcessor {
}
for (TsFileResource unseqFile : unseqFiles) {
unseqFile.getMergeQueryLock().writeLock().lock();
unseqFile.getWriteQueryLock().writeLock().lock();
try {
unseqFile.remove();
} finally {
unseqFile.getMergeQueryLock().writeLock().unlock();
unseqFile.getWriteQueryLock().writeLock().unlock();
}
}
}
private void updateMergeModification(TsFileResource seqFile) {
seqFile.getMergeQueryLock().writeLock().lock();
seqFile.getWriteQueryLock().writeLock().lock();
try {
// remove old modifications and write modifications generated during merge
seqFile.removeModFile();
@ -1069,7 +1100,7 @@ public class StorageGroupProcessor {
logger.error("{} cannot clean the ModificationFile of {} after merge", storageGroupName,
seqFile.getFile(), e);
} finally {
seqFile.getMergeQueryLock().writeLock().unlock();
seqFile.getWriteQueryLock().writeLock().unlock();
}
}
@ -1301,12 +1332,12 @@ public class StorageGroupProcessor {
if (deletedTsFileResource == null) {
return;
}
deletedTsFileResource.getMergeQueryLock().writeLock().lock();
deletedTsFileResource.getWriteQueryLock().writeLock().lock();
try {
logger.info("Delete tsfile {} in sync loading process.", deletedTsFileResource.getFile());
deletedTsFileResource.remove();
} finally {
deletedTsFileResource.getMergeQueryLock().writeLock().unlock();
deletedTsFileResource.getWriteQueryLock().writeLock().unlock();
}
}

View File

@ -18,7 +18,10 @@
*/
package org.apache.iotdb.db.engine.storagegroup;
import java.io.*;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -28,9 +31,12 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.upgrade.UpgradeTask;
import org.apache.iotdb.db.service.UpgradeSevice;
import org.apache.iotdb.db.utils.UpgradeUtils;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
public class TsFileResource {
@ -59,6 +65,7 @@ public class TsFileResource {
private volatile boolean deleted = false;
private volatile boolean isMerging = false;
/**
* Chunk metadata list of unsealed tsfile. Only be set in a temporal TsFileResource in a query
* process.
@ -70,7 +77,7 @@ public class TsFileResource {
*/
private ReadOnlyMemChunk readOnlyMemChunk;
private ReentrantReadWriteLock mergeQueryLock = new ReentrantReadWriteLock();
private ReentrantReadWriteLock writeQueryLock = new ReentrantReadWriteLock();
private FSFactory fsFactory = FSFactoryProducer.getFSFactory();
@ -230,8 +237,14 @@ public class TsFileResource {
return processor;
}
public ReentrantReadWriteLock getMergeQueryLock() {
return mergeQueryLock;
/**
 * Lock coordinating file rewrites with queries: rewriters (merge/upgrade) take the
 * write lock before replacing or removing the file, while readers (queries via
 * FileReaderManager) hold the read lock.
 */
public ReentrantReadWriteLock getWriteQueryLock() {
return writeQueryLock;
}
/**
 * Submits an asynchronous upgrade task for this file to the upgrade service, but
 * only if UpgradeUtils reports it as needing an upgrade; otherwise does nothing.
 */
public void doUpgrade() {
if (UpgradeUtils.isNeedUpgrade(this)) {
UpgradeSevice.getINSTANCE().submitUpgradeTask(new UpgradeTask(this));
}
}
public void removeModFile() throws IOException {

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.engine.upgrade;
/**
 * Stage of a single TsFile upgrade as recorded in the upgrade log. The numeric
 * code (1..3) is what gets written after the file path in each log entry.
 */
public enum UpgradeCheckStatus {
  /** Upgrade of the file has started. */
  BEGIN_UPGRADE_FILE(1),
  /** The upgraded copy has been generated. */
  AFTER_UPGRADE_FILE(2),
  /** The upgraded copy has replaced the original file. */
  UPGRADE_SUCCESS(3);

  private final int checkStatusCode;

  UpgradeCheckStatus(int checkStatusCode) {
    this.checkStatusCode = checkStatusCode;
  }

  /** Returns the numeric code written to the upgrade log for this stage. */
  public int getCheckStatusCode() {
    return checkStatusCode;
  }

  @Override
  public String toString() {
    return Integer.toString(checkStatusCode);
  }
}

View File

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.engine.upgrade;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.utils.UpgradeUtils;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Append-only log recording the progress of each TsFile upgrade, so that an
 * interrupted upgrade can be recovered after restart (StorageEngine calls
 * UpgradeUtils.recoverUpgrade() at startup). Each entry is a line of the form
 * "&lt;tsfile path&gt;,&lt;status code&gt;" — see UpgradeCheckStatus.
 */
public class UpgradeLog {
private static final Logger logger = LoggerFactory.getLogger(UpgradeLog.class);
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
// Log file location: <system dir>/upgrade/upgrade.txt
private static final String UPGRADE_DIR = "upgrade";
private static final String UPGRADE_LOG_NAME = "upgrade.txt";
private static BufferedWriter upgradeLogWriter;
private static File upgradeLogPath = SystemFileFactory.INSTANCE
.getFile(SystemFileFactory.INSTANCE.getFile(config.getSystemDir(), UPGRADE_DIR),
UPGRADE_LOG_NAME);
// Creates the log file (and its parent dirs) if absent and opens the writer.
// NOTE(review): the boolean `true` passed to getBufferedWriter presumably means
// append mode, so an existing log survives a restart — confirm against FSFactory.
// Returns false if the file or writer could not be created.
public static boolean createUpgradeLog() {
try {
if (!upgradeLogPath.getParentFile().exists()) {
upgradeLogPath.getParentFile().mkdirs();
}
upgradeLogPath.createNewFile();
upgradeLogWriter = FSFactoryProducer.getFSFactory()
.getBufferedWriter(getUpgradeLogPath(), true);
return true;
} catch (IOException e) {
logger.error("meet error when create upgrade log, file path:{}",
upgradeLogPath, e);
return false;
}
}
public static String getUpgradeLogPath() {
return upgradeLogPath.getAbsolutePath();
}
// Appends one line under the shared upgrade-log write lock and flushes
// immediately so the entry survives a crash. Returns false on I/O failure.
public static boolean writeUpgradeLogFile(String content) {
UpgradeUtils.getUpgradeLogLock().writeLock().lock();
try {
upgradeLogWriter.write(content);
upgradeLogWriter.newLine();
upgradeLogWriter.flush();
return true;
} catch (IOException e) {
logger.error("write upgrade log file failed, the log file:{}", getUpgradeLogPath(), e);
return false;
} finally {
UpgradeUtils.getUpgradeLogLock().writeLock().unlock();
}
}
// Closes the writer if it was ever opened; errors are logged, not rethrown.
public static void closeLogWriter() {
try {
if (upgradeLogWriter != null) {
upgradeLogWriter.close();
}
} catch (IOException e) {
logger.error("close upgrade log file failed, the log file:{}", getUpgradeLogPath(), e);
}
}
}

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.engine.upgrade;
import java.io.IOException;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.service.UpgradeSevice;
import org.apache.iotdb.db.utils.UpgradeUtils;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.tool.upgrade.UpgradeTool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Runnable that upgrades a single old-version TsFile in two phases:
 * 1) under the file's read lock, generate the upgraded copy (queries may still
 *    read the old file concurrently);
 * 2) under the write lock, replace the old file with the upgraded copy.
 * Progress is written to the upgrade log before/after each phase so a crash can
 * be recovered from the log.
 */
public class UpgradeTask implements Runnable {

  private final TsFileResource upgradeResource;
  private static final Logger logger = LoggerFactory.getLogger(UpgradeTask.class);
  /** Separator between the file path and the status code in each upgrade-log entry. */
  private static final String COMMA_SEPARATOR = ",";

  public UpgradeTask(TsFileResource upgradeResource) {
    this.upgradeResource = upgradeResource;
  }

  @Override
  public void run() {
    try {
      // Phase 1: generate the upgraded copy while holding the read lock.
      upgradeResource.getWriteQueryLock().readLock().lock();
      String tsfilePathBefore = upgradeResource.getFile().getAbsolutePath();
      String tsfilePathAfter = UpgradeUtils.getUpgradeFileName(upgradeResource.getFile());
      UpgradeLog.writeUpgradeLogFile(
          tsfilePathBefore + COMMA_SEPARATOR + UpgradeCheckStatus.BEGIN_UPGRADE_FILE);
      try {
        UpgradeTool.upgradeOneTsfile(tsfilePathBefore, tsfilePathAfter);
        UpgradeLog.writeUpgradeLogFile(
            tsfilePathBefore + COMMA_SEPARATOR + UpgradeCheckStatus.AFTER_UPGRADE_FILE);
      } catch (IOException e) {
        logger
            .error("generate upgrade file failed, the file to be upgraded:{}", tsfilePathBefore, e);
        return;
      } finally {
        upgradeResource.getWriteQueryLock().readLock().unlock();
      }

      // Phase 2: swap the upgraded file into place under the write lock, so no
      // query can hold a reader on the file while it is being replaced.
      upgradeResource.getWriteQueryLock().writeLock().lock();
      try {
        // Surface (but do not abort on) a failed delete: the subsequent move would
        // previously fail silently if the old file could not be removed.
        if (!FSFactoryProducer.getFSFactory().getFile(tsfilePathBefore).delete()) {
          logger.warn("failed to delete old file before replacement: {}", tsfilePathBefore);
        }
        FSFactoryProducer.getFSFactory()
            .moveFile(FSFactoryProducer.getFSFactory().getFile(tsfilePathAfter),
                FSFactoryProducer.getFSFactory().getFile(tsfilePathBefore));
        UpgradeLog.writeUpgradeLogFile(
            tsfilePathBefore + COMMA_SEPARATOR + UpgradeCheckStatus.UPGRADE_SUCCESS);
        // Best-effort cleanup of the now-empty upgrade directory.
        FSFactoryProducer.getFSFactory().getFile(tsfilePathAfter).getParentFile().delete();
      } finally {
        upgradeResource.getWriteQueryLock().writeLock().unlock();
      }

      // NOTE(review): this read-then-write decrement is not atomic across concurrent
      // upgrade tasks — two tasks can lose an update. Consider an atomic decrement
      // inside UpgradeSevice instead.
      UpgradeSevice.setCntUpgradeFileNum(UpgradeSevice.getCntUpgradeFileNum() - 1);
      logger.info("Upgrade completes, file path:{} , the remaining upgraded file num: {}",
          tsfilePathBefore, UpgradeSevice.getCntUpgradeFileNum());
    } catch (Exception e) {
      logger.error("meet error when upgrade file:{}", upgradeResource.getFile().getAbsolutePath(),
          e);
    }
  }
}

View File

@ -171,7 +171,7 @@ public class FileReaderManager implements IService {
} else {
closedReferenceMap.computeIfAbsent(tsFile, k -> new AtomicInteger()).getAndIncrement();
}
tsFile.getMergeQueryLock().readLock().lock();
tsFile.getWriteQueryLock().readLock().lock();
}
/**
@ -184,7 +184,7 @@ public class FileReaderManager implements IService {
} else if (closedReferenceMap.containsKey(tsFile)){
closedReferenceMap.get(tsFile).getAndDecrement();
}
tsFile.getMergeQueryLock().readLock().unlock();
tsFile.getWriteQueryLock().readLock().unlock();
}
/**

View File

@ -101,6 +101,7 @@ public class IoTDB implements IoTDBMBean {
registerManager.register(SyncServerManager.getInstance());
registerManager.register(TVListAllocator.getInstance());
registerManager.register(FlushManager.getInstance());
registerManager.register(UpgradeSevice.getINSTANCE());
registerManager.register(MergeManager.getINSTANCE());
registerManager.register(CacheHitRatioMonitor.getInstance());
registerManager.register(MetricsService.getInstance());

View File

@ -34,6 +34,7 @@ public enum ServiceType {
AUTHORIZATION_SERVICE("Authorization ServerService", ""),
FILE_READER_MANAGER_SERVICE("File reader manager ServerService", ""),
SYNC_SERVICE("SYNC ServerService", ""),
UPGRADE_SERVICE("UPGRADE DataService", ""),
MERGE_SERVICE("Merge Manager", ""),
PERFORMANCE_STATISTIC_SERVICE("PERFORMANCE_STATISTIC_SERVICE", "PERFORMANCE_STATISTIC_SERVICE"),
MANAGE_DYNAMIC_PARAMETERS_SERVICE("Manage Dynamic Parameters", "Manage Dynamic Parameters"),

View File

@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.service;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.upgrade.UpgradeLog;
import org.apache.iotdb.db.engine.upgrade.UpgradeTask;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.utils.UpgradeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Service driving online upgrade of old-version TsFiles: at startup it creates the
 * upgrade log, counts files needing upgrade, and asks the storage engine to submit
 * upgrade tasks to a fixed-size thread pool.
 *
 * NOTE(review): the class name contains a typo ("Sevice"); it is kept because the
 * name is referenced from other files (IoTDB service registration, TsFileResource).
 */
public class UpgradeSevice implements IService {

  private static final Logger logger = LoggerFactory.getLogger(UpgradeSevice.class);

  private static final UpgradeSevice INSTANCE = new UpgradeSevice();
  private ExecutorService upgradeThreadPool;
  private AtomicInteger threadCnt = new AtomicInteger();
  // Remaining number of files to upgrade; guarded by UpgradeUtils.getCntUpgradeFileLock().
  private static int cntUpgradeFileNum;

  private UpgradeSevice() {
  }

  public static UpgradeSevice getINSTANCE() {
    return INSTANCE;
  }

  @Override
  public void start() throws StartupException {
    // Fall back to a single worker when the configured value is non-positive.
    int updateThreadNum = IoTDBDescriptor.getInstance().getConfig().getUpgradeThreadNum();
    if (updateThreadNum <= 0) {
      updateThreadNum = 1;
    }
    upgradeThreadPool = Executors.newFixedThreadPool(updateThreadNum,
        r -> new Thread(r, "UpgradeThread-" + threadCnt.getAndIncrement()));
    UpgradeLog.createUpgradeLog();
    countUpgradeFiles();
    upgradeAll();
  }

  @Override
  public void stop() {
    UpgradeLog.closeLogWriter();
    if (upgradeThreadPool != null) {
      upgradeThreadPool.shutdownNow();
      logger.info("Waiting for upgrade task pool to shut down");
      // Sleep between polls instead of busy-spinning a CPU core; stop waiting if
      // this thread is itself interrupted.
      while (!upgradeThreadPool.isTerminated()) {
        try {
          Thread.sleep(100);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          break;
        }
      }
      upgradeThreadPool = null;
      logger.info("Upgrade service stopped");
    }
  }

  @Override
  public ServiceType getID() {
    return ServiceType.UPGRADE_SERVICE;
  }

  /** Sets the remaining-upgrade-file counter under the shared counter lock. */
  public static void setCntUpgradeFileNum(int cntUpgradeFileNum) {
    UpgradeUtils.getCntUpgradeFileLock().writeLock().lock();
    try {
      UpgradeSevice.cntUpgradeFileNum = cntUpgradeFileNum;
    } finally {
      UpgradeUtils.getCntUpgradeFileLock().writeLock().unlock();
    }
  }

  /** Reads the remaining-upgrade-file counter under the shared counter lock. */
  public static int getCntUpgradeFileNum() {
    UpgradeUtils.getCntUpgradeFileLock().readLock().lock();
    try {
      return cntUpgradeFileNum;
    } finally {
      UpgradeUtils.getCntUpgradeFileLock().readLock().unlock();
    }
  }

  /** Queues one upgrade task on the pool; must be called after start(). */
  public void submitUpgradeTask(UpgradeTask upgradeTask) {
    upgradeThreadPool.submit(upgradeTask);
  }

  private static void countUpgradeFiles() {
    cntUpgradeFileNum = StorageEngine.getInstance().countUpgradeFiles();
    logger.info("finish counting upgrading files, total num:{}", cntUpgradeFileNum);
  }

  private static void upgradeAll() {
    try {
      StorageEngine.getInstance().upgradeAll();
    } catch (StorageEngineException e) {
      logger.error("Cannot perform a global upgrade because", e);
    }
  }
}

View File

@ -36,6 +36,7 @@ import org.apache.iotdb.db.engine.merge.task.MergeTask;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.sync.conf.SyncSenderDescriptor;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -93,6 +94,9 @@ public class SyncFileManager implements ISyncFileManager {
dataDir + File.separatorChar + IoTDBConstant.SEQUENCE_FLODER_NAME)
.listFiles();
for (File sgFolder : allSGFolders) {
if (sgFolder.getName().equals(TsFileConstant.PATH_UPGRADE)){
continue;
}
allSGs.add(sgFolder.getName());
currentAllLocalFiles.putIfAbsent(sgFolder.getName(), new HashSet<>());
File[] files = sgFolder.listFiles();

View File

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.tools.upgrade;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.tool.upgrade.UpgradeTool;
/**
 * Command-line tool that upgrades v0.8.x tsfile directories offline.
 * Usage: OfflineUpgradeTool &lt;config-file&gt;, where the config file supplies
 * old_version_data_dirs, new_version_data_dirs (comma-separated, pairwise)
 * and upgrade_thread_num.
 */
public class OfflineUpgradeTool {

  // Parallel lists: oldVersionTsfileDirs.get(i) is upgraded into newVersionTsfileDirs.get(i).
  private static List<String> oldVersionTsfileDirs = new ArrayList<>();
  private static List<String> newVersionTsfileDirs = new ArrayList<>();
  private static int upgradeThreadNum;

  /**
   * Loads the upgrade configuration from the properties file at configPath.
   * Exits the JVM on any load failure instead of continuing with a
   * half-initialized configuration (the original fell through on IOException).
   */
  private static void loadProps(String configPath) {
    Properties properties = new Properties();
    // try-with-resources closes the stream; the original leaked the FileInputStream.
    try (InputStream inputStream = new FileInputStream(
        FSFactoryProducer.getFSFactory().getFile(configPath))) {
      properties.load(inputStream);
    } catch (FileNotFoundException e) {
      System.out.println(String.format("Fail to find config file:%s", configPath));
      e.printStackTrace();
      System.exit(1);
    } catch (IOException e) {
      System.out.println("Cannot load config file ");
      e.printStackTrace();
      System.exit(1);
    }
    String oldVersionTsfileDirString = properties.getProperty("old_version_data_dirs");
    Collections.addAll(oldVersionTsfileDirs, oldVersionTsfileDirString.split(","));
    String newVersionTsfileDirString = properties.getProperty("new_version_data_dirs");
    Collections.addAll(newVersionTsfileDirs, newVersionTsfileDirString.split(","));
    upgradeThreadNum = Integer.parseInt(properties.getProperty("upgrade_thread_num"));
    // The two dir lists are traversed pairwise in main(); unequal lengths would
    // otherwise fail late with an IndexOutOfBoundsException.
    if (oldVersionTsfileDirs.size() != newVersionTsfileDirs.size()) {
      System.out.println(
          "old_version_data_dirs and new_version_data_dirs must contain the same number of entries");
      System.exit(1);
    }
  }

  public static void main(String[] args) throws IOException {
    if (args.length < 1) {
      System.out.println("Usage: OfflineUpgradeTool <config-file>");
      System.exit(1);
    }
    loadProps(args[0]);
    // Upgrade each old dir into its paired new dir.
    for (int i = 0; i < oldVersionTsfileDirs.size(); i++) {
      UpgradeTool.upgradeTsfiles(oldVersionTsfileDirs.get(i), newVersionTsfileDirs.get(i),
          upgradeThreadNum);
    }
  }
}

View File

@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.utils;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.upgrade.UpgradeCheckStatus;
import org.apache.iotdb.db.engine.upgrade.UpgradeLog;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Helpers for the online upgrade from v0.8.x tsfiles: version detection,
 * upgrade-file path mapping, and crash recovery driven by the upgrade log.
 */
public class UpgradeUtils {

  private static final Logger logger = LoggerFactory.getLogger(UpgradeUtils.class);
  // Sub-directory that holds upgraded files until they replace the originals.
  private static final String TMP_STRING = "tmp";
  private static final String UPGRADE_FILE_PREFIX = "upgrade_";
  // fixed typo: SEPERATOR -> SEPARATOR (private constant, no external impact)
  private static final String COMMA_SEPARATOR = ",";
  private static final ReadWriteLock cntUpgradeFileLock = new ReentrantReadWriteLock();
  private static final ReadWriteLock upgradeLogLock = new ReentrantReadWriteLock();

  public static ReadWriteLock getCntUpgradeFileLock() {
    return cntUpgradeFileLock;
  }

  public static ReadWriteLock getUpgradeLogLock() {
    return upgradeLogLock;
  }

  /**
   * Judges whether a tsfile needs to be upgraded: true iff its version number
   * equals the old (v0.8.x) version string. Read errors are logged and treated
   * as "no upgrade needed". Takes the resource's read lock while probing.
   */
  public static boolean isNeedUpgrade(TsFileResource tsFileResource) {
    tsFileResource.getWriteQueryLock().readLock().lock();
    try (TsFileSequenceReader tsFileSequenceReader = new TsFileSequenceReader(
        tsFileResource.getFile().getAbsolutePath())) {
      if (tsFileSequenceReader.readVersionNumber().equals(TSFileConfig.OLD_VERSION)) {
        return true;
      }
    } catch (IOException e) {
      logger.error("meet error when judge whether file needs to be upgraded, the file's path:{}",
          tsFileResource.getFile().getAbsolutePath(), e);
    } finally {
      tsFileResource.getWriteQueryLock().readLock().unlock();
    }
    return false;
  }

  /**
   * Maps an old tsfile path to its upgrade-output path:
   * {@code <grandparent>/tmp/upgrade_<fileName>}.
   */
  public static String getUpgradeFileName(File upgradeResource) {
    return upgradeResource.getParentFile().getParent() + File.separator + TMP_STRING
        + File.separator + UPGRADE_FILE_PREFIX + upgradeResource.getName();
  }

  /**
   * Replays the upgrade log after a restart. The number of log entries per file
   * tells how far its upgrade had progressed: BEGIN_UPGRADE_FILE means the
   * upgrade was interrupted (discard the partial upgrade file);
   * AFTER_UPGRADE_FILE means it finished (ensure the upgraded file ends up at
   * the original path). The log file is always deleted afterwards.
   */
  public static void recoverUpgrade() {
    if (FSFactoryProducer.getFSFactory().getFile(UpgradeLog.getUpgradeLogPath()).exists()) {
      try (BufferedReader upgradeLogReader = new BufferedReader(
          new FileReader(
              FSFactoryProducer.getFSFactory().getFile(UpgradeLog.getUpgradeLogPath())))) {
        // old tsfile path -> number of log entries recorded for it
        Map<String, Integer> upgradeRecoverMap = new HashMap<>();
        String line;
        while ((line = upgradeLogReader.readLine()) != null) {
          String oldFilePath = line.split(COMMA_SEPARATOR)[0];
          // merge() replaces the original containsKey+get+put double lookup
          upgradeRecoverMap.merge(oldFilePath, 1, Integer::sum);
        }
        // entrySet iteration avoids the original keySet+get re-lookups
        for (Map.Entry<String, Integer> entry : upgradeRecoverMap.entrySet()) {
          String oldFilePath = entry.getKey();
          String upgradeFileName = getUpgradeFileName(
              FSFactoryProducer.getFSFactory().getFile(oldFilePath));
          if (entry.getValue() == UpgradeCheckStatus.BEGIN_UPGRADE_FILE
              .getCheckStatusCode()) {
            // upgrade had only started: drop the incomplete upgrade file
            if (FSFactoryProducer.getFSFactory().getFile(upgradeFileName).exists()) {
              FSFactoryProducer.getFSFactory().getFile(upgradeFileName).delete();
            }
          } else if (entry.getValue() == UpgradeCheckStatus.AFTER_UPGRADE_FILE
              .getCheckStatusCode()) {
            if (FSFactoryProducer.getFSFactory().getFile(oldFilePath).exists() && FSFactoryProducer
                .getFSFactory().getFile(upgradeFileName).exists()) {
              //if both old tsfile and upgrade file exists, replace the old tsfile with the upgrade one
              FSFactoryProducer.getFSFactory().getFile(oldFilePath).delete();
              FSFactoryProducer.getFSFactory()
                  .moveFile(FSFactoryProducer.getFSFactory().getFile(upgradeFileName),
                      FSFactoryProducer.getFSFactory().getFile(oldFilePath));
            } else if (!FSFactoryProducer.getFSFactory().getFile(oldFilePath).exists()) {
              //if the old tsfile does not exist, rename the upgrade file to the old tsfile path
              FSFactoryProducer.getFSFactory()
                  .moveFile(FSFactoryProducer.getFSFactory().getFile(upgradeFileName),
                      FSFactoryProducer.getFSFactory().getFile(oldFilePath));
            }
            // best-effort removal of the now-empty tmp directory
            FSFactoryProducer.getFSFactory().getFile(upgradeFileName).getParentFile()
                .delete();
          }
        }
      } catch (IOException e) {
        logger.error("meet error when recover upgrade process, file path:{}",
            UpgradeLog.getUpgradeLogPath(), e);
      } finally {
        FSFactoryProducer.getFSFactory().getFile(UpgradeLog.getUpgradeLogPath()).delete();
      }
    }
  }
}

View File

@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.engine.merge;
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.engine.merge.manage.MergeResource;
import org.apache.iotdb.db.engine.merge.selector.MaxFileMergeFileSelector;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.MergeException;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.write.TsFileWriter;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
 * Verifies that the merge file selector refuses to select files that still
 * carry the old (v0.8.x) version header: select() must return an empty result
 * when an old-version seq file is present.
 */
public class MergeUpgradeTest {

  private List<TsFileResource> seqResources = new ArrayList<>();
  private List<TsFileResource> unseqResources = new ArrayList<>();
  private int seqFileNum = 2;
  private TSEncoding encoding = TSEncoding.RLE;
  private MeasurementSchema[] measurementSchemas;
  private int measurementNum = 5;
  // number of data points written per file
  private long ptNum = 10;
  // true until one seq file has had its version bytes rewritten to the old
  // version; only the first prepared file is downgraded
  private boolean changeVersion = true;
  private String deviceName = "root.MergeUpgrade.device0";

  @Before
  public void setUp() throws IOException, WriteProcessException {
    prepareSeries();
    prepareFiles();
  }

  @After
  public void tearDown() {
    removeFiles();
    seqResources.clear();
    unseqResources.clear();
  }

  @Test
  public void testMergeUpgradeSelect() throws MergeException {
    MergeResource resource = new MergeResource(seqResources, unseqResources);
    MaxFileMergeFileSelector mergeFileSelector = new MaxFileMergeFileSelector(resource,
        Long.MAX_VALUE);
    // nothing may be selected: one seq file still has the old version header
    List[] result = mergeFileSelector.select();
    assertEquals(0, result.length);
  }

  /**
   * Creates two sequential files (the first downgraded to the old version)
   * plus one overlapping unsequential file.
   */
  private void prepareFiles() throws IOException, WriteProcessException {
    // prepare seqFiles
    for (int i = 0; i < seqFileNum; i++) {
      File seqfile = FSFactoryProducer.getFSFactory().getFile(
          "seq" + IoTDBConstant.TSFILE_NAME_SEPARATOR + i + IoTDBConstant.TSFILE_NAME_SEPARATOR
              + i + IoTDBConstant.TSFILE_NAME_SEPARATOR + 0
              + ".tsfile");
      TsFileResource seqTsFileResource = new TsFileResource(seqfile);
      seqResources.add(seqTsFileResource);
      prepareOldFile(seqTsFileResource, i * ptNum, ptNum, 0);
    }
    // prepare unseqFile
    File unseqfile = FSFactoryProducer.getFSFactory().getFile(
        "unseq" + IoTDBConstant.TSFILE_NAME_SEPARATOR + 0 + IoTDBConstant.TSFILE_NAME_SEPARATOR
            + 0 + IoTDBConstant.TSFILE_NAME_SEPARATOR + 0
            + ".tsfile");
    TsFileResource unseqTsFileResource = new TsFileResource(unseqfile);
    unseqResources.add(unseqTsFileResource);
    // time range overlaps the seq files, making it an unsequential file
    prepareFile(unseqTsFileResource, 0, 2 * ptNum, 10);
  }

  // Builds the shared measurement schemas used by every prepared file.
  private void prepareSeries() {
    measurementSchemas = new MeasurementSchema[measurementNum];
    for (int i = 0; i < measurementNum; i++) {
      measurementSchemas[i] = new MeasurementSchema("sensor" + i, TSDataType.DOUBLE,
          encoding, CompressionType.UNCOMPRESSED);
    }
  }

  /**
   * Writes a normal file, then (for the first call only) overwrites the
   * version bytes right after the magic string so the file reads as v0.8.x.
   */
  private void prepareOldFile(TsFileResource tsFileResource, long timeOffset, long ptNum,
      long valueOffset)
      throws IOException, WriteProcessException {
    TsFileWriter fileWriter = new TsFileWriter(tsFileResource.getFile());
    prepareData(tsFileResource, fileWriter, timeOffset, ptNum, valueOffset);
    fileWriter.close();
    if (changeVersion) {
      try (RandomAccessFile oldTsfile = new RandomAccessFile(tsFileResource.getFile(), "rw")) {
        // the version number sits immediately after the head magic string
        oldTsfile.seek(TSFileConfig.MAGIC_STRING.length());
        oldTsfile.write(TSFileConfig.OLD_VERSION.getBytes());
      }
      changeVersion = false;
    }
  }

  // Writes a current-version file with the given time/value window.
  private void prepareFile(TsFileResource tsFileResource, long timeOffset, long ptNum,
      long valueOffset) throws IOException, WriteProcessException {
    TsFileWriter fileWriter = new TsFileWriter(tsFileResource.getFile());
    prepareData(tsFileResource, fileWriter, timeOffset, ptNum, valueOffset);
    fileWriter.close();
  }

  // Deletes every file created by prepareFiles().
  private void removeFiles() {
    for (TsFileResource tsFileResource : seqResources) {
      tsFileResource.remove();
    }
    for (TsFileResource tsFileResource : unseqResources) {
      tsFileResource.remove();
    }
  }

  /**
   * Writes ptNum records starting at timeOffset for every measurement and
   * keeps the resource's start/end times in sync with what was written.
   */
  private void prepareData(TsFileResource tsFileResource, TsFileWriter fileWriter, long timeOffset,
      long ptNum, long valueOffset) throws WriteProcessException, IOException {
    for (MeasurementSchema measurementSchema : measurementSchemas) {
      fileWriter.addMeasurement(measurementSchema);
    }
    for (long i = timeOffset; i < timeOffset + ptNum; i++) {
      TSRecord record = new TSRecord(i, deviceName);
      for (int k = 0; k < measurementNum; k++) {
        record.addTuple(DataPoint.getDataPoint(measurementSchemas[k].getType(),
            measurementSchemas[k].getMeasurementId(), String.valueOf(i + valueOffset)));
      }
      fileWriter.write(record);
      tsFileResource.updateStartTime(deviceName, i);
      tsFileResource.updateEndTime(deviceName, i);
    }
  }
}

View File

@ -53,6 +53,8 @@ public class TSFileConfig {
public static final String CONFIG_FILE_NAME = "iotdb-engine.properties";
public static final String MAGIC_STRING = "TsFile";
public static final String VERSION_NUMBER = "000001";
public static final String OLD_MAGIC_STRING = "TsFilev0.8.0";
public static final String OLD_VERSION = "v0.8.0";
/**
* Bloom filter constrain

View File

@ -24,6 +24,7 @@ public class TsFileConstant {
public static final String TSFILE_HOME = "TSFILE_HOME";
public static final String TSFILE_CONF = "TSFILE_CONF";
public static final String PATH_ROOT = "root";
public static final String PATH_UPGRADE = "tmp";
public static final String PATH_SEPARATOR = ".";
public static final String PATH_SEPARATER_NO_REGEX = "\\.";
public static final String DEFAULT_DELTA_TYPE = "default_delta_type";

View File

@ -85,7 +85,8 @@ public class TsFileMetaData {
* @param inputStream input stream used to deserialize
* @return an instance of TsFileMetaData
*/
public static TsFileMetaData deserializeFrom(InputStream inputStream) throws IOException {
public static TsFileMetaData deserializeFrom(InputStream inputStream, boolean isOldVersion)
throws IOException {
TsFileMetaData fileMetaData = new TsFileMetaData();
int size = ReadWriteIOUtils.readInt(inputStream);
@ -113,11 +114,21 @@ public class TsFileMetaData {
}
}
if (isOldVersion) {
// skip the current version of file metadata
ReadWriteIOUtils.readInt(inputStream);
}
if (ReadWriteIOUtils.readIsNull(inputStream)) {
fileMetaData.createdBy = ReadWriteIOUtils.readString(inputStream);
}
fileMetaData.totalChunkNum = ReadWriteIOUtils.readInt(inputStream);
fileMetaData.invalidChunkNum = ReadWriteIOUtils.readInt(inputStream);
if (isOldVersion) {
fileMetaData.totalChunkNum = 0;
fileMetaData.invalidChunkNum = 0;
} else {
fileMetaData.totalChunkNum = ReadWriteIOUtils.readInt(inputStream);
fileMetaData.invalidChunkNum = ReadWriteIOUtils.readInt(inputStream);
}
// read bloom filter
if (!ReadWriteIOUtils.checkIfMagicString(inputStream)) {
byte[] bytes = ReadWriteIOUtils.readBytesWithSelfDescriptionLength(inputStream);
@ -135,7 +146,8 @@ public class TsFileMetaData {
* @param buffer -buffer use to deserialize
* @return -a instance of TsFileMetaData
*/
public static TsFileMetaData deserializeFrom(ByteBuffer buffer) throws IOException {
public static TsFileMetaData deserializeFrom(ByteBuffer buffer, boolean isOldVersion)
throws IOException {
TsFileMetaData fileMetaData = new TsFileMetaData();
int size = ReadWriteIOUtils.readInt(buffer);
@ -163,11 +175,21 @@ public class TsFileMetaData {
}
}
if (isOldVersion) {
// skip the current version of file metadata
ReadWriteIOUtils.readInt(buffer);
}
if (ReadWriteIOUtils.readIsNull(buffer)) {
fileMetaData.createdBy = ReadWriteIOUtils.readString(buffer);
}
fileMetaData.totalChunkNum = ReadWriteIOUtils.readInt(buffer);
fileMetaData.invalidChunkNum = ReadWriteIOUtils.readInt(buffer);
if (isOldVersion) {
fileMetaData.totalChunkNum = 0;
fileMetaData.invalidChunkNum = 0;
} else {
fileMetaData.totalChunkNum = ReadWriteIOUtils.readInt(buffer);
fileMetaData.invalidChunkNum = ReadWriteIOUtils.readInt(buffer);
}
// read bloom filter
if (buffer.hasRemaining()) {
byte[] bytes = ReadWriteIOUtils.readByteBufferWithSelfDescriptionLength(buffer).array();
@ -382,4 +404,20 @@ public class TsFileMetaData {
public List<MeasurementSchema> getMeasurementSchemaList() {
return new ArrayList<MeasurementSchema>(measurementSchema.values());
}
}
/**
* This function is just for upgrade.
*/
public void setDeviceIndexMap(
Map<String, TsDeviceMetadataIndex> deviceIndexMap) {
this.deviceIndexMap = deviceIndexMap;
}
/**
* This function is just for upgrade.
*/
public void setMeasurementSchema(
Map<String, MeasurementSchema> measurementSchema) {
this.measurementSchema = measurementSchema;
}
}

View File

@ -70,14 +70,16 @@ public class TsFileSequenceReader implements AutoCloseable {
private int totalChunkNum;
private TsFileMetaData tsFileMetaData;
private EndianType endianType = EndianType.BIG_ENDIAN;
private boolean isOldVersion = false;
private boolean cacheDeviceMetadata = false;
private Map<TsDeviceMetadataIndex, TsDeviceMetadata> deviceMetadataMap;
/**
* Create a file reader of the given file. The reader will read the tail of the file to get the
* file metadata size.Then the reader will skip the first TSFileConfig.MAGIC_STRING.getBytes().length +
* TSFileConfig.NUMBER_VERSION.getBytes().length bytes of the file for preparing reading real data.
* file metadata size.Then the reader will skip the first TSFileConfig.MAGIC_STRING.getBytes().length
* + TSFileConfig.NUMBER_VERSION.getBytes().length bytes of the file for preparing reading real
* data.
*
* @param file the data file
* @throws IOException If some I/O error occurs
@ -96,8 +98,9 @@ public class TsFileSequenceReader implements AutoCloseable {
this.file = file;
tsFileInput = FSFactoryProducer.getFileInputFactory().getTsFileInput(file);
// old version number of TsFile using little endian starts with "v"
this.endianType = this.readVersionNumber().startsWith("v")
this.endianType = this.readVersionNumber().startsWith("v")
? EndianType.LITTLE_ENDIAN : EndianType.BIG_ENDIAN;
this.isOldVersion = this.readVersionNumber().startsWith("v");
try {
if (loadMetadataSize) {
loadMetadataSize();
@ -119,8 +122,9 @@ public class TsFileSequenceReader implements AutoCloseable {
/**
* Create a file reader of the given file. The reader will read the tail of the file to get the
* file metadata size.Then the reader will skip the first TSFileConfig.MAGIC_STRING.getBytes().length +
* TSFileConfig.NUMBER_VERSION.getBytes().length bytes of the file for preparing reading real data.
* file metadata size.Then the reader will skip the first TSFileConfig.MAGIC_STRING.getBytes().length
* + TSFileConfig.NUMBER_VERSION.getBytes().length bytes of the file for preparing reading real
* data.
*
* @param input given input
*/
@ -164,13 +168,25 @@ public class TsFileSequenceReader implements AutoCloseable {
public void loadMetadataSize() throws IOException {
ByteBuffer metadataSize = ByteBuffer.allocate(Integer.BYTES);
tsFileInput.read(metadataSize,
tsFileInput.size() - TSFileConfig.MAGIC_STRING.getBytes().length - Integer.BYTES);
metadataSize.flip();
// read file metadata size and position
fileMetadataSize = ReadWriteIOUtils.readInt(metadataSize);
fileMetadataPos =
tsFileInput.size() - TSFileConfig.MAGIC_STRING.getBytes().length - Integer.BYTES - fileMetadataSize;
if (readTailMagic().equals(TSFileConfig.MAGIC_STRING)) {
tsFileInput.read(metadataSize,
tsFileInput.size() - TSFileConfig.MAGIC_STRING.getBytes().length - Integer.BYTES);
metadataSize.flip();
// read file metadata size and position
fileMetadataSize = ReadWriteIOUtils.readInt(metadataSize);
fileMetadataPos =
tsFileInput.size() - TSFileConfig.MAGIC_STRING.getBytes().length - Integer.BYTES
- fileMetadataSize;
} else if (readTailMagic().equals(TSFileConfig.OLD_VERSION)) {
tsFileInput.read(metadataSize,
tsFileInput.size() - TSFileConfig.OLD_MAGIC_STRING.getBytes().length - Integer.BYTES);
metadataSize.flip();
// read file metadata size and position
fileMetadataSize = ReadWriteIOUtils.readInt(metadataSize);
fileMetadataPos =
tsFileInput.size() - TSFileConfig.OLD_MAGIC_STRING.getBytes().length - Integer.BYTES
- fileMetadataSize;
}
}
public long getFileMetadataPos() {
@ -196,8 +212,9 @@ public class TsFileSequenceReader implements AutoCloseable {
* whether the file is a complete TsFile: only if the head magic and tail magic string exists.
*/
public boolean isComplete() throws IOException {
return tsFileInput.size() >= TSFileConfig.MAGIC_STRING.getBytes().length * 2 + TSFileConfig.VERSION_NUMBER.getBytes().length &&
readTailMagic().equals(readHeadMagic());
return tsFileInput.size()
>= TSFileConfig.MAGIC_STRING.getBytes().length * 2 + TSFileConfig.VERSION_NUMBER.getBytes().length &&
(readTailMagic().equals(readHeadMagic()) || readTailMagic().equals(TSFileConfig.OLD_VERSION));
}
/**
@ -229,13 +246,13 @@ public class TsFileSequenceReader implements AutoCloseable {
* this function reads version number and checks compatibility of TsFile.
*/
public String readVersionNumber() throws IOException, NotCompatibleException {
ByteBuffer versionNumberBytes = ByteBuffer.allocate(TSFileConfig.VERSION_NUMBER.getBytes().length);
ByteBuffer versionNumberBytes = ByteBuffer
.allocate(TSFileConfig.VERSION_NUMBER.getBytes().length);
tsFileInput.read(versionNumberBytes, TSFileConfig.MAGIC_STRING.getBytes().length);
versionNumberBytes.flip();
String versionNumberString = new String(versionNumberBytes.array());
return versionNumberString;
return new String(versionNumberBytes.array());
}
public EndianType getEndianType() {
return this.endianType;
}
@ -245,11 +262,30 @@ public class TsFileSequenceReader implements AutoCloseable {
*/
public TsFileMetaData readFileMetadata() throws IOException {
if (tsFileMetaData == null) {
tsFileMetaData = TsFileMetaData.deserializeFrom(readData(fileMetadataPos, fileMetadataSize));
tsFileMetaData = TsFileMetaData
.deserializeFrom(readData(fileMetadataPos, fileMetadataSize), isOldVersion);
}
if (isOldVersion) {
tsFileMetaData.setTotalChunkNum(countTotalChunkNum());
}
return tsFileMetaData;
}
/**
* count total chunk num
*/
private int countTotalChunkNum() throws IOException {
int count = 0;
for (TsDeviceMetadataIndex deviceIndex : tsFileMetaData.getDeviceMap().values()) {
TsDeviceMetadata deviceMetadata = readTsDeviceMetaData(deviceIndex);
for (ChunkGroupMetaData chunkGroupMetaData : deviceMetadata
.getChunkGroupMetaDataList()) {
count += chunkGroupMetaData.getChunkMetaDataList().size();
}
}
return count;
}
/**
* @return get the position after the last chunk group in the file
*/
@ -262,7 +298,8 @@ public class TsFileSequenceReader implements AutoCloseable {
return data.get();
} else {
//no real data
return TSFileConfig.MAGIC_STRING.getBytes().length + TSFileConfig.VERSION_NUMBER.getBytes().length;
return TSFileConfig.MAGIC_STRING.getBytes().length + TSFileConfig.VERSION_NUMBER
.getBytes().length;
}
}
@ -552,16 +589,19 @@ public class TsFileSequenceReader implements AutoCloseable {
long endOffsetOfChunkGroup;
long versionOfChunkGroup = 0;
if (fileSize < TSFileConfig.MAGIC_STRING.getBytes().length + TSFileConfig.VERSION_NUMBER.getBytes().length) {
if (fileSize < TSFileConfig.MAGIC_STRING.getBytes().length + TSFileConfig.VERSION_NUMBER
.getBytes().length) {
return TsFileCheckStatus.INCOMPATIBLE_FILE;
}
String magic = readHeadMagic(true);
tsFileInput.position(TSFileConfig.MAGIC_STRING.getBytes().length + TSFileConfig.VERSION_NUMBER.getBytes().length);
tsFileInput.position(TSFileConfig.MAGIC_STRING.getBytes().length + TSFileConfig.VERSION_NUMBER
.getBytes().length);
if (!magic.equals(TSFileConfig.MAGIC_STRING)) {
return TsFileCheckStatus.INCOMPATIBLE_FILE;
}
if (fileSize == TSFileConfig.MAGIC_STRING.getBytes().length + TSFileConfig.VERSION_NUMBER.getBytes().length) {
if (fileSize == TSFileConfig.MAGIC_STRING.getBytes().length + TSFileConfig.VERSION_NUMBER
.getBytes().length) {
return TsFileCheckStatus.ONLY_MAGIC_HEAD;
} else if (readTailMagic().equals(magic)) {
loadMetadataSize();
@ -719,7 +759,8 @@ public class TsFileSequenceReader implements AutoCloseable {
List<ChunkGroupMetaData> result = new ArrayList<>();
for (Map.Entry<String, TsDeviceMetadataIndex> entry : tsFileMetaData.getDeviceMap().entrySet()) {
for (Map.Entry<String, TsDeviceMetadataIndex> entry : tsFileMetaData.getDeviceMap()
.entrySet()) {
// read TsDeviceMetadata from file
TsDeviceMetadata tsDeviceMetadata = readTsDeviceMetaData(entry.getValue());
result.addAll(tsDeviceMetadata.getChunkGroupMetaDataList());

View File

@ -0,0 +1,539 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.tsfile.tool.upgrade;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.exception.write.PageException;
import org.apache.iotdb.tsfile.file.MetaMarker;
import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex;
import org.apache.iotdb.tsfile.file.metadata.TsDigest;
import org.apache.iotdb.tsfile.file.metadata.TsDigest.StatisticType;
import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.reader.DefaultTsFileInput;
import org.apache.iotdb.tsfile.read.reader.TsFileInput;
import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.write.chunk.ChunkBuffer;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.Schema;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TsfileUpgradeToolV0_8_0 implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(TsfileUpgradeToolV0_8_0.class);
private TsFileInput tsFileInput;
private long fileMetadataPos;
private int fileMetadataSize;
private ByteBuffer markerBuffer = ByteBuffer.allocate(Byte.BYTES);
protected String file;
/**
 * Create a file reader of the given file. The reader will read the tail of the file to get the
 * file metadata size. Then the reader will skip the first
 * TSFileConfig.OLD_MAGIC_STRING.length() bytes of the file for preparing reading real data.
 *
 * @param file the data file
 * @throws IOException If some I/O error occurs
 */
public TsfileUpgradeToolV0_8_0(String file) throws IOException {
  // delegate with loadMetadataSize=true: locate the metadata immediately
  this(file, true);
}
/**
 * Construct function for TsfileUpgradeToolV0_8_0.
 *
 * @param file given file name
 * @param loadMetadataSize whether to locate the file metadata immediately
 * @throws IOException If some I/O error occurs while opening or scanning the file
 */
public TsfileUpgradeToolV0_8_0(String file, boolean loadMetadataSize) throws IOException {
  this.file = file;
  final Path path = Paths.get(file);
  tsFileInput = new DefaultTsFileInput(path);
  try {
    if (loadMetadataSize) {
      // false: the file is assumed sealed with the old magic string
      loadMetadataSize(false);
    }
  } catch (Throwable e) {
    // close the input before propagating so the file handle is not leaked
    tsFileInput.close();
    throw e;
  }
}
/**
 * Locates the file metadata block. Both layouts end with
 * [file metadata][4-byte metadata size][tail magic]; only the length of the
 * tail magic differs between old and new sealing.
 *
 * @param sealedWithNewMagic true when an old version tsfile sealed with new version MAGIC_STRING
 */
public void loadMetadataSize(boolean sealedWithNewMagic) throws IOException {
  int tailMagicLength = sealedWithNewMagic
      ? TSFileConfig.MAGIC_STRING.getBytes().length
      : TSFileConfig.OLD_MAGIC_STRING.length();
  ByteBuffer metadataSize = ByteBuffer.allocate(Integer.BYTES);
  tsFileInput.read(metadataSize, tsFileInput.size() - tailMagicLength - Integer.BYTES);
  metadataSize.flip();
  // read file metadata size and position
  fileMetadataSize = ReadWriteIOUtils.readInt(metadataSize);
  fileMetadataPos =
      tsFileInput.size() - tailMagicLength - Integer.BYTES - fileMetadataSize;
  // skip the magic header
  position(TSFileConfig.OLD_MAGIC_STRING.length());
}
/**
 * Reads the magic string stored at the very end of the file (old-magic length).
 */
public String readTailMagic() throws IOException {
  int magicLength = TSFileConfig.OLD_MAGIC_STRING.length();
  ByteBuffer magicBuffer = ByteBuffer.allocate(magicLength);
  // positional read: does not move the reader's cursor
  tsFileInput.read(magicBuffer, tsFileInput.size() - magicLength);
  magicBuffer.flip();
  return new String(magicBuffer.array());
}
/**
 * whether the file is a complete TsFile: only if the head magic and tail magic string exists.
 */
public boolean isComplete() throws IOException {
  // too short to even hold both magic strings -> cannot be complete
  if (tsFileInput.size() < TSFileConfig.OLD_MAGIC_STRING.length() * 2) {
    return false;
  }
  return readTailMagic().equals(readHeadMagic());
}
/**
 * this function does not modify the position of the file reader.
 *
 * @return the magic string at the start of the file
 */
public String readHeadMagic() throws IOException {
  return readHeadMagic(false);
}
/**
 * Reads the magic string at the start of the file.
 *
 * @param movePosition whether move the position of the file reader after reading the magic header
 * to the end of the magic head string.
 */
public String readHeadMagic(boolean movePosition) throws IOException {
  ByteBuffer headBuffer = ByteBuffer.allocate(TSFileConfig.OLD_MAGIC_STRING.length());
  if (movePosition) {
    // sequential read: leaves the cursor just past the magic string
    tsFileInput.position(0);
    tsFileInput.read(headBuffer);
  } else {
    // positional read: the reader's cursor is untouched
    tsFileInput.read(headBuffer, 0);
  }
  headBuffer.flip();
  return new String(headBuffer.array());
}
/**
 * this function does not modify the position of the file reader.
 * Deserializes the v0.8.0 file metadata; the read order below mirrors the
 * old on-disk layout exactly and must not be reordered.
 */
public TsFileMetaData readFileMetadata() throws IOException {
  ByteBuffer buffer = readData(fileMetadataPos, fileMetadataSize);
  TsFileMetaData fileMetaData = new TsFileMetaData();
  // 1) device index map: device name -> location of its device metadata block
  int size = ReadWriteIOUtils.readInt(buffer);
  if (size > 0) {
    Map<String, TsDeviceMetadataIndex> deviceMap = new HashMap<>();
    String key;
    TsDeviceMetadataIndex value;
    for (int i = 0; i < size; i++) {
      key = ReadWriteIOUtils.readString(buffer);
      value = TsDeviceMetadataIndex.deserializeFrom(buffer);
      deviceMap.put(key, value);
    }
    fileMetaData.setDeviceIndexMap(deviceMap);
  }
  // 2) measurement schema map: measurement id -> schema
  size = ReadWriteIOUtils.readInt(buffer);
  if (size > 0) {
    fileMetaData.setMeasurementSchema(new HashMap<>());
    String key;
    MeasurementSchema value;
    for (int i = 0; i < size; i++) {
      key = ReadWriteIOUtils.readString(buffer);
      value = MeasurementSchema.deserializeFrom(buffer);
      fileMetaData.getMeasurementSchema().put(key, value);
    }
  }
  // skip the current version of file metadata
  ReadWriteIOUtils.readInt(buffer);
  // 3) optional createdBy string, preceded by a null flag
  if (ReadWriteIOUtils.readIsNull(buffer)) {
    fileMetaData.setCreatedBy(ReadWriteIOUtils.readString(buffer));
  }
  return fileMetaData;
}
/**
 * Reads and deserializes the TsDeviceMetadata block addressed by the given index.
 * this function does not modify the position of the file reader.
 *
 * @param index carries the absolute offset and byte length of the serialized device metadata
 */
public TsDeviceMetadata readTsDeviceMetaData(TsDeviceMetadataIndex index) throws IOException {
return TsDeviceMetadata.deserializeFrom(readData(index.getOffset(), index.getLen()));
}
/**
 * read data from current position of the input, and deserialize it to a CHUNK_GROUP_FOOTER. <br>
 * This method is not threadsafe.
 *
 * <p>Assumes the CHUNK_GROUP_FOOTER marker byte has already been consumed
 * (the 'true' argument tells the deserializer the marker was read).
 *
 * @return a CHUNK_GROUP_FOOTER
 * @throws IOException io error
 */
public ChunkGroupFooter readChunkGroupFooter() throws IOException {
return ChunkGroupFooter.deserializeFrom(tsFileInput.wrapAsInputStream(), true);
}
/**
 * read data from current position of the input, and deserialize it to a CHUNK_HEADER. <br> This
 * method is not threadsafe.
 *
 * <p>Assumes the CHUNK_HEADER marker byte has already been consumed
 * (the 'true' argument tells the deserializer the marker was read).
 *
 * @return a CHUNK_HEADER
 * @throws IOException io error
 */
public ChunkHeader readChunkHeader() throws IOException {
return ChunkHeader.deserializeFrom(tsFileInput.wrapAsInputStream(), true);
}
/**
 * Reads a page header from the current position of the input. not thread safe.
 *
 * @param type given tsfile data type; determines how the embedded statistics are decoded
 * @return the deserialized page header; the reader position advances past it
 */
public PageHeader readPageHeader(TSDataType type) throws IOException {
return PageHeader.deserializeFrom(tsFileInput.wrapAsInputStream(), type);
}
/**
 * Returns the current byte position of the underlying input.
 */
public long position() throws IOException {
return tsFileInput.position();
}
/**
 * Moves the underlying input to the given absolute byte offset.
 */
public void position(long offset) throws IOException {
tsFileInput.position(offset);
}
/**
 * read one byte from the input. <br> this method is not thread safe
 */
public byte readMarker() throws IOException {
  markerBuffer.clear();
  int bytesRead = ReadWriteIOUtils.readAsPossible(tsFileInput, markerBuffer);
  // zero bytes read means the input is exhausted
  if (bytesRead == 0) {
    throw new IOException("reach the end of the file.");
  }
  markerBuffer.flip();
  return markerBuffer.get();
}
/**
 * Closes the underlying TsFileInput; the reader must not be used afterwards.
 */
public void close() throws IOException {
this.tsFileInput.close();
}
/**
 * Returns the path of the file this reader was opened on.
 */
public String getFileName() {
return this.file;
}
/**
 * read data from tsFileInput, from the current position (if position = -1), or the given
 * position. <br> if position = -1, the tsFileInput's position will be changed to the current
 * position + real data size that been read. Other wise, the tsFileInput's position is not
 * changed.
 *
 * @param position the start position of data in the tsFileInput, or the current position if
 * position = -1
 * @param size the size of data that want to read
 * @return data that been read.
 */
private ByteBuffer readData(long position, int size) throws IOException {
  ByteBuffer dest = ByteBuffer.allocate(size);
  // sequential read when position == -1, absolute read otherwise
  int actuallyRead = (position == -1)
      ? ReadWriteIOUtils.readAsPossible(tsFileInput, dest)
      : ReadWriteIOUtils.readAsPossible(tsFileInput, dest, position, size);
  if (actuallyRead != size) {
    throw new IOException("reach the end of the data");
  }
  dest.flip();
  return dest;
}
/**
 * Rewrites this old-version (v0.8.x) TsFile into the current format at {@code updateFileName}.
 *
 * <p>The old file's data section is scanned marker by marker; each chunk group's pages are
 * buffered, PLAIN-encoded pages are converted to the new value byte order via {@code rewrite},
 * and the group is flushed into a brand-new file through a TsFileIOWriter.
 *
 * @param updateFileName absolute path of the upgraded file to be created
 * @return true if the whole file was upgraded; false if the source is missing, truncated,
 *         corrupted, or a scan error interrupts the rewrite
 * @throws IOException only from closing the reader/writer in the finally block; scan errors
 *         are caught and reported through the false return value
 */
public boolean upgradeFile(String updateFileName) throws IOException {
// make sure the source file exists before doing anything else
File checkFile = FSFactoryProducer.getFSFactory().getFile(this.file);
long fileSize;
if (!checkFile.exists()) {
logger.error("the file to be updated does not exist, file path: {}", checkFile.getPath());
return false;
} else {
fileSize = checkFile.length();
}
// create the target file (and its parent directories) for the upgraded data
// NOTE(review): mkdirs()/createNewFile() return values are ignored — a failure here
// would surface later as an IOException from the TsFileIOWriter; confirm acceptable
File upgradeFile = FSFactoryProducer.getFSFactory().getFile(updateFileName);
if (!upgradeFile.getParentFile().exists()) {
upgradeFile.getParentFile().mkdirs();
}
upgradeFile.createNewFile();
TsFileIOWriter tsFileIOWriter = new TsFileIOWriter(upgradeFile);
// per-chunk-group working buffers: headers, page headers and raw page bytes of the
// chunks collected since the last CHUNK_GROUP_FOOTER
List<ChunkHeader> chunkHeaders = new ArrayList<>();
List<List<PageHeader>> pageHeadersList = new ArrayList<>();
List<List<ByteBuffer>> pagesList = new ArrayList<>();
Schema schema = null;
// the head magic must match the old-version magic string, else this is not a v0.8.x file
String magic = readHeadMagic(true);
if (!magic.equals(TSFileConfig.OLD_MAGIC_STRING)) {
logger.error("the file's MAGIC STRING is incorrect, file path: {}", checkFile.getPath());
return false;
}
// load file-level metadata: a matching tail magic means the file was sealed normally;
// otherwise load the metadata in the alternate (unsealed) layout
if (fileSize == TSFileConfig.OLD_MAGIC_STRING.length()) {
logger.error("the file only contains magic string, file path: {}", checkFile.getPath());
return false;
} else if (readTailMagic().equals(TSFileConfig.OLD_MAGIC_STRING)) {
loadMetadataSize(false);
TsFileMetaData tsFileMetaData = readFileMetadata();
schema = new Schema(tsFileMetaData.getMeasurementSchema());
} else {
loadMetadataSize(true);
TsFileMetaData tsFileMetaData = readFileMetadata();
schema = new Schema(tsFileMetaData.getMeasurementSchema());
}
long startTimeOfChunk = 0;
long endTimeOfChunk = 0;
long numOfPoints = 0;
ChunkMetaData currentChunkMetaData;
List<ChunkMetaData> chunkMetaDataList = null;
long startOffsetOfChunkGroup = 0;
boolean newChunkGroup = true;
long versionOfChunkGroup = 0;
List<ChunkGroupMetaData> newMetaData = new ArrayList<>();
List<Statistics<?>> chunkStatisticsList = new ArrayList<>();
boolean goon = true;
byte marker;
try {
// walk the data section until the metadata separator marker
while (goon && (marker = this.readMarker()) != MetaMarker.SEPARATOR) {
switch (marker) {
case MetaMarker.CHUNK_HEADER:
// this is the first chunk of a new ChunkGroup.
if (newChunkGroup) {
newChunkGroup = false;
chunkMetaDataList = new ArrayList<>();
// -1 because the marker byte itself has already been consumed
startOffsetOfChunkGroup = this.position() - 1;
}
long fileOffsetOfChunk = this.position() - 1;
ChunkHeader header = this.readChunkHeader();
chunkHeaders.add(header);
List<PageHeader> pageHeaders = new ArrayList<>();
List<ByteBuffer> pages = new ArrayList<>();
TSDataType dataType = header.getDataType();
Statistics<?> chunkStatistics = Statistics.getStatsByType(dataType);
chunkStatisticsList.add(chunkStatistics);
// first page: establishes the chunk's start time (and provisional end time)
if (header.getNumOfPages() > 0) {
PageHeader pageHeader = this.readPageHeader(header.getDataType());
pageHeaders.add(pageHeader);
numOfPoints += pageHeader.getNumOfValues();
startTimeOfChunk = pageHeader.getMinTimestamp();
endTimeOfChunk = pageHeader.getMaxTimestamp();
chunkStatistics.mergeStatistics(pageHeader.getStatistics());
pages.add(readData(-1, pageHeader.getCompressedSize()));
}
// middle pages (2nd .. second-to-last)
for (int j = 1; j < header.getNumOfPages() - 1; j++) {
PageHeader pageHeader = this.readPageHeader(header.getDataType());
pageHeaders.add(pageHeader);
numOfPoints += pageHeader.getNumOfValues();
chunkStatistics.mergeStatistics(pageHeader.getStatistics());
pages.add(readData(-1, pageHeader.getCompressedSize()));
}
// last page: fixes the chunk's final end time
if (header.getNumOfPages() > 1) {
PageHeader pageHeader = this.readPageHeader(header.getDataType());
pageHeaders.add(pageHeader);
numOfPoints += pageHeader.getNumOfValues();
endTimeOfChunk = pageHeader.getMaxTimestamp();
chunkStatistics.mergeStatistics(pageHeader.getStatistics());
pages.add(readData(-1, pageHeader.getCompressedSize()));
}
// assemble chunk-level metadata with the aggregated statistics digest
currentChunkMetaData = new ChunkMetaData(header.getMeasurementID(), dataType,
fileOffsetOfChunk,
startTimeOfChunk, endTimeOfChunk);
currentChunkMetaData.setNumOfPoints(numOfPoints);
ByteBuffer[] statisticsArray = new ByteBuffer[StatisticType.getTotalTypeNum()];
statisticsArray[StatisticType.min_value.ordinal()] = ByteBuffer
.wrap(chunkStatistics.getMinBytes());
statisticsArray[StatisticType.max_value.ordinal()] = ByteBuffer
.wrap(chunkStatistics.getMaxBytes());
statisticsArray[StatisticType.first_value.ordinal()] = ByteBuffer
.wrap(chunkStatistics.getFirstBytes());
statisticsArray[StatisticType.last_value.ordinal()] = ByteBuffer
.wrap(chunkStatistics.getLastBytes());
statisticsArray[StatisticType.sum_value.ordinal()] = ByteBuffer
.wrap(chunkStatistics.getSumBytes());
TsDigest tsDigest = new TsDigest();
tsDigest.setStatistics(statisticsArray);
currentChunkMetaData.setDigest(tsDigest);
chunkMetaDataList.add(currentChunkMetaData);
numOfPoints = 0;
pageHeadersList.add(pageHeaders);
pagesList.add(pages);
break;
case MetaMarker.CHUNK_GROUP_FOOTER:
// a whole chunk group has been buffered: flush it into the new file
ChunkGroupFooter chunkGroupFooter = this.readChunkGroupFooter();
String deviceID = chunkGroupFooter.getDeviceID();
long endOffsetOfChunkGroup = this.position();
ChunkGroupMetaData currentChunkGroup = new ChunkGroupMetaData(deviceID,
chunkMetaDataList,
startOffsetOfChunkGroup);
currentChunkGroup.setEndOffsetOfChunkGroup(endOffsetOfChunkGroup);
currentChunkGroup.setVersion(versionOfChunkGroup++);
newMetaData.add(currentChunkGroup);
tsFileIOWriter.startChunkGroup(deviceID);
for (int i = 0; i < chunkHeaders.size(); i++) {
TSDataType tsDataType = chunkHeaders.get(i).getDataType();
TSEncoding encodingType = chunkHeaders.get(i).getEncodingType();
ChunkHeader chunkHeader = chunkHeaders.get(i);
List<PageHeader> pageHeaderList = pageHeadersList.get(i);
List<ByteBuffer> pageList = pagesList.get(i);
// chunks whose measurement is unknown to the schema are silently dropped
if (schema.getMeasurementSchema(chunkHeader.getMeasurementID()) != null) {
ChunkBuffer chunkBuffer = new ChunkBuffer(
schema.getMeasurementSchema(chunkHeader.getMeasurementID()));
for (int j = 0; j < pageHeaderList.size(); j++) {
// PLAIN-encoded pages must be converted to the new value byte order
if (encodingType.equals(TSEncoding.PLAIN)) {
pageList.set(j, rewrite(pageList.get(j), tsDataType));
}
chunkBuffer
.writePageHeaderAndDataIntoBuff(pageList.get(j), pageHeaderList.get(j));
}
chunkBuffer
.writeAllPagesOfSeriesToTsFile(tsFileIOWriter, chunkStatisticsList.get(i));
}
}
tsFileIOWriter.endChunkGroup(currentChunkGroup.getVersion());
// reset per-chunk-group buffers for the next group
chunkStatisticsList.clear();
chunkHeaders.clear();
pageHeadersList.clear();
pagesList.clear();
newChunkGroup = true;
break;
default:
// the disk file is corrupted, using this file may be dangerous
logger.error("Unrecognized marker detected, this file may be corrupted");
return false;
}
}
// seal the new file: writes the new-format metadata section and tail magic
tsFileIOWriter.endFile(schema);
return true;
} catch (IOException | PageException e2) {
logger.info("TsFile upgrade process cannot proceed at position {} after {} chunk groups "
+ "recovered, because : {}", this.position(), newMetaData.size(), e2.getMessage());
return false;
} finally {
// always release both the reader and the writer, even on failure
if (tsFileInput != null) {
tsFileInput.close();
}
if (tsFileIOWriter != null) {
tsFileIOWriter.close();
}
}
}
/**
 * Converts one uncompressed PLAIN-encoded page from the old on-disk layout to the new one:
 * the time buffer is copied verbatim, while the value section is re-written from
 * little-endian (old format) to big-endian (new format).
 *
 * <p>Page layout: an unsigned var-int time-buffer length, then the time buffer, then the
 * value buffer. The page buffer is expected to have position 0 on entry.
 *
 * @param page       raw page bytes in the old format (position 0, limit = page end)
 * @param tsDataType value type of the page; selects the element width to byte-swap
 * @return a buffer of the same capacity containing the converted page, position 0
 */
static ByteBuffer rewrite(ByteBuffer page, TSDataType tsDataType) {
  ByteBuffer modifiedPage = ByteBuffer.allocate(page.capacity());
  int timeBufferLength = ReadWriteForEncodingUtils.readUnsignedVarInt(page);
  // number of bytes the var-int length header occupied (page.position() advanced past it)
  int headerLength = page.position();
  ByteBuffer timeBuffer = page.slice();
  ByteBuffer valueBuffer = page.slice();
  timeBuffer.limit(timeBufferLength);
  valueBuffer.position(timeBufferLength);
  valueBuffer.order(ByteOrder.LITTLE_ENDIAN);
  // Copy the complete var-int header. The previous code copied only page.get(0),
  // which silently corrupted any page whose time buffer exceeds 127 bytes
  // (multi-byte var-int): the remaining header bytes were dropped.
  for (int i = 0; i < headerLength; i++) {
    modifiedPage.put(page.get(i));
  }
  modifiedPage.put(timeBuffer);
  modifiedPage.order(ByteOrder.BIG_ENDIAN);
  switch (tsDataType) {
    case BOOLEAN:
      // booleans are single bytes: no byte-order conversion needed
      modifiedPage.put(valueBuffer);
      break;
    case INT32:
      while (valueBuffer.remaining() > 0) {
        modifiedPage.putInt(valueBuffer.getInt());
      }
      break;
    case INT64:
      while (valueBuffer.remaining() > 0) {
        modifiedPage.putLong(valueBuffer.getLong());
      }
      break;
    case FLOAT:
      while (valueBuffer.remaining() > 0) {
        modifiedPage.putFloat(valueBuffer.getFloat());
      }
      break;
    case DOUBLE:
      while (valueBuffer.remaining() > 0) {
        modifiedPage.putDouble(valueBuffer.getDouble());
      }
      break;
    case TEXT:
      // each TEXT value is a little-endian int length followed by raw bytes
      while (valueBuffer.remaining() > 0) {
        int length = valueBuffer.getInt();
        byte[] buf = new byte[length];
        valueBuffer.get(buf, 0, buf.length);
        modifiedPage.putInt(length);
        modifiedPage.put(buf);
      }
      break;
    default:
      // unsupported type: leave the value section empty rather than guess a layout
      break;
  }
  // reset position/limit so callers can read the page from the start
  modifiedPage.clear();
  return modifiedPage;
}
}

View File

@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.tsfile.tool.upgrade;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
/**
 * Offline tool that upgrades v0.8.x TsFiles under a data directory into the current format,
 * writing the upgraded files (and copies of .resource files) under a parallel directory.
 */
public class UpgradeTool {

  /**
   * upgrade all tsfiles in the specific dir
   *
   * @param dir tsfile dir which needs to be upgraded
   * @param upgradeDir tsfile dir after upgraded
   * @param threadNum num of threads that perform offline upgrade tasks
   * @throws IOException if a .resource file cannot be copied to the upgrade dir
   */
  public static void upgradeTsfiles(String dir, String upgradeDir, int threadNum)
      throws IOException {
    // Traverse to find all tsfiles (iterative breadth-first walk of the tree)
    File root = FSFactoryProducer.getFSFactory().getFile(dir);
    Queue<File> pendingDirs = new LinkedList<>();
    pendingDirs.add(root);
    List<String> tsfiles = new ArrayList<>();
    if (root.exists()) {
      while (!pendingDirs.isEmpty()) {
        File current = pendingDirs.poll();
        // listFiles() returns null when 'current' is not a directory or on an I/O
        // error; the previous code dereferenced it unconditionally and threw an NPE
        // when 'dir' pointed at a plain file
        File[] children = current.listFiles();
        if (children == null) {
          continue;
        }
        for (File child : children) {
          if (child.isDirectory()) {
            pendingDirs.add(child);
          } else {
            if (child.getName().endsWith(".tsfile")) {
              tsfiles.add(child.getAbsolutePath());
            }
            // copy all the resource files to the upgradeDir
            if (child.getName().endsWith(".resource")) {
              File newFileName = FSFactoryProducer.getFSFactory()
                  .getFile(child.getAbsoluteFile().toString().replace(dir, upgradeDir));
              if (!newFileName.getParentFile().exists()) {
                newFileName.getParentFile().mkdirs();
              }
              newFileName.createNewFile();
              FileUtils.copyFile(child, newFileName);
            }
          }
        }
      }
    }
    // begin upgrade tsfiles
    System.out.println(String.format(
        "begin upgrade the data dir:%s, the total num of the tsfiles that need to be upgraded:%s",
        dir, tsfiles.size()));
    AtomicInteger dirUpgradeFileNum = new AtomicInteger(tsfiles.size());
    ExecutorService offlineUpgradeThreadPool = Executors.newFixedThreadPool(threadNum);
    // for every tsfile, submit an upgrade task to the pool
    for (String tsfile : tsfiles) {
      offlineUpgradeThreadPool.submit(() -> {
        try {
          upgradeOneTsfile(tsfile, tsfile.replace(dir, upgradeDir));
          System.out.println(
              String.format("upgrade file success, file name:%s, remaining file num:%s", tsfile,
                  dirUpgradeFileNum.decrementAndGet()));
        } catch (Exception e) {
          // best-effort: report and continue with the remaining files
          System.out.println(String.format("meet error when upgrade file:%s", tsfile));
          e.printStackTrace();
        }
      });
    }
    // shutdown() only stops accepting new tasks; the submitted upgrades keep running
    // on the pool's non-daemon threads until they complete
    offlineUpgradeThreadPool.shutdown();
  }

  /**
   * upgrade a single tsfile
   *
   * @param tsfileName old version tsfile's absolute path
   * @param updateFileName new version tsfile's absolute path
   */
  public static void upgradeOneTsfile(String tsfileName, String updateFileName) throws IOException {
    TsfileUpgradeToolV0_8_0 updater = new TsfileUpgradeToolV0_8_0(tsfileName);
    updater.upgradeFile(updateFileName);
  }
}

View File

@ -150,6 +150,46 @@ public class ChunkBuffer {
return headerSize + uncompressedSize;
}
/**
 * write the page header and data into the PageWriter's output stream.
 *
 * @param data   already-serialized page payload to append after the header
 * @param header page header carrying the time range, value count and sizes
 * @throws PageException if the page holds no valid data point, or an I/O error occurs
 *         while serializing the header or copying the payload
 */
public void writePageHeaderAndDataIntoBuff(ByteBuffer data, PageHeader header)
throws PageException {
numOfPages++;
// 1. update time statistics
// initialize the chunk's min timestamp from the first page written
if (this.minTimestamp == Long.MIN_VALUE) {
this.minTimestamp = header.getMinTimestamp();
}
// still MIN_VALUE means the header itself reported MIN_VALUE: no valid data point
if (this.minTimestamp == Long.MIN_VALUE) {
throw new PageException("No valid data point in this page");
}
this.maxTimestamp = header.getMaxTimestamp();
// write the page header to pageBuffer
try {
LOG.debug("start to flush a page header into buffer, buffer position {} ", pageBuffer.size());
header.serializeTo(pageBuffer);
LOG.debug("finish to flush a page header {} of {} into buffer, buffer position {} ", header,
schema.getMeasurementId(), pageBuffer.size());
} catch (IOException e) {
// roll back the time statistics so the failed page leaves no trace
resetTimeStamp();
throw new PageException(
"IO Exception in writeDataPageHeader,ignore this page", e);
}
// update data point num
this.totalValueCount += header.getNumOfValues();
// write page content to temp PBAOS
try (WritableByteChannel channel = Channels.newChannel(pageBuffer)) {
channel.write(data);
} catch (IOException e) {
throw new PageException(e);
}
}
private void resetTimeStamp() {
if (totalValueCount == 0) {
minTimestamp = Long.MIN_VALUE;
@ -186,8 +226,9 @@ public class ChunkBuffer {
long dataSize = writer.getPos() - dataOffset;
if (dataSize != pageBuffer.size()) {
throw new IOException("Bytes written is inconsistent with the size of data: " + dataSize +" !="
+ " " + pageBuffer.size());
throw new IOException(
"Bytes written is inconsistent with the size of data: " + dataSize + " !="
+ " " + pageBuffer.size());
}
writer.endChunk(totalValueCount);

View File

@ -60,7 +60,7 @@ public class TsFileMetaDataTest {
TsFileMetaData metaData = null;
try {
fileInputStream = new FileInputStream(new File(PATH));
metaData = TsFileMetaData.deserializeFrom(fileInputStream);
metaData = TsFileMetaData.deserializeFrom(fileInputStream, false);
return metaData;
} catch (IOException e) {
e.printStackTrace();