Seek reworking.

This commit is contained in:
Pavel Kovalenko 2020-01-28 15:48:01 +03:00 committed by Pavel Kovalenko
parent 861ddbcadd
commit d4707ca911
7 changed files with 101 additions and 9 deletions

View File

@ -92,6 +92,41 @@ TYPED_TEST(DiskTest, writeFile)
}
TYPED_TEST(DiskTest, readFile)
{
const auto & disk = this->getDisk();
{
std::unique_ptr<DB::WriteBuffer> out = disk->writeFile("test_file");
writeString("test data", *out);
}
// Test SEEK_SET
{
DB::String data;
std::unique_ptr<DB::SeekableReadBuffer> in = disk->readFile("test_file");
in->seek(5, SEEK_SET);
readString(data, *in);
EXPECT_EQ("data", data);
}
// Test SEEK_CUR
{
std::unique_ptr<DB::SeekableReadBuffer> in = disk->readFile("test_file");
String buf(4, '0');
in->readStrict(buf.data(), 4);
EXPECT_EQ("test", buf);
// Skip whitespace
in->seek(1, SEEK_CUR);
in->readStrict(buf.data(), 4);
EXPECT_EQ("data", buf);
}
}
TYPED_TEST(DiskTest, iterateDirectory)
{
const auto & disk = this->getDisk();

View File

@ -0,0 +1,45 @@
#include "ReadBufferFromMemory.h"
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_SEEK_THROUGH_FILE;
extern const int SEEK_POSITION_OUT_OF_BOUND;
}
off_t ReadBufferFromMemory::seek(off_t offset, int whence)
{
if (whence == SEEK_SET)
{
if (offset >= 0 && working_buffer.begin() + offset < working_buffer.end())
{
pos = working_buffer.begin() + offset;
return size_t(pos - working_buffer.begin());
}
else
throw Exception("Seek position is out of bounds. "
"Offset: " + std::to_string(offset) +
", Max: " + std::to_string(size_t(working_buffer.end() - working_buffer.begin())),
ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
}
else if (whence == SEEK_CUR)
{
Position new_pos = pos + offset;
if (new_pos >= working_buffer.begin() && new_pos < working_buffer.end())
{
pos = new_pos;
return size_t(pos - working_buffer.begin());
}
else
throw Exception("Seek position is out of bounds. "
"Offset: " + std::to_string(offset) +
", Max: " + std::to_string(size_t(working_buffer.end() - working_buffer.begin())),
ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
}
else
throw Exception("Only SEEK_SET and SEEK_CUR seek modes allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
}
}

View File

@ -1,6 +1,5 @@
#pragma once
#include <IO/ReadBuffer.h>
#include "SeekableReadBuffer.h"
@ -23,7 +22,7 @@ public:
ReadBufferFromMemory(const signed char * buf, size_t size)
: SeekableReadBuffer(const_cast<char *>(reinterpret_cast<const char *>(buf)), size, 0) {}
off_t seek(off_t, int) override { return 0; }
off_t seek(off_t off, int whence) override;
};
}

View File

@ -17,6 +17,8 @@ namespace DB
namespace ErrorCodes
{
extern const int S3_ERROR;
extern const int CANNOT_SEEK_THROUGH_FILE;
extern const int SEEK_POSITION_OUT_OF_BOUND;
}
@ -49,10 +51,21 @@ bool ReadBufferFromS3::nextImpl()
return true;
}
off_t ReadBufferFromS3::seek(off_t offset_, int)
off_t ReadBufferFromS3::seek(off_t offset_, int whence)
{
if (!initialized && offset_)
offset = offset_;
if (initialized)
throw Exception("Seek is allowed only before first read attempt from the buffer.",
ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
if (whence != SEEK_SET)
throw Exception("Only SEEK_SET mode is allowed.",
ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
if (offset_ < 0)
throw Exception("Seek position is out of bounds. Offset: " + std::to_string(offset_),
ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
offset = offset_;
return offset;
}

View File

@ -50,7 +50,7 @@ TEST(ReadBufferAIOTest, TestReadAfterAIO)
EXPECT_TRUE(testbuf.eof());
testbuf.seek(data.length() - 100);
testbuf.seek(data.length() - 100, SEEK_SET);
std::string smalldata;
smalldata.resize(100);
@ -59,7 +59,7 @@ TEST(ReadBufferAIOTest, TestReadAfterAIO)
EXPECT_TRUE(testbuf.eof());
testbuf.seek(0);
testbuf.seek(0, SEEK_SET);
std::string repeatdata;
repeatdata.resize(data.length());
size_t read_after_eof_big = testbuf.read(repeatdata.data(), repeatdata.size());

View File

@ -30,7 +30,7 @@ try
context.setPath("./");
DiskPtr disk = std::make_unique<DiskLocal>("default", "./", 0);
StoragePtr table = StorageLog::create(disk, "./", StorageID("test", "test"), ColumnsDescription{names_and_types}, ConstraintsDescription{}, 1048576);
StoragePtr table = StorageLog::create(disk, "table", StorageID("test", "test"), ColumnsDescription{names_and_types}, ConstraintsDescription{}, 1048576);
table->startup();

View File

@ -65,7 +65,7 @@ void checkCompressedHeaders(const std::string & mrk_path, const std::string & bi
out << "Mark " << mark_num << ", points to " << offset_in_compressed_file << ", " << offset_in_decompressed_block << ". ";
bin_in.seek(offset_in_compressed_file);
bin_in.seek(offset_in_compressed_file, SEEK_SET);
auto sizes = stat(bin_in, out);
out << "Block sizes: " << sizes.first << ", " << sizes.second << '\n' << DB::flush;