use raw ZSTD API

This commit is contained in:
Hui Liu 2022-10-05 17:20:07 -07:00
parent 06d9ebd620
commit ec76f89d97
2 changed files with 24 additions and 87 deletions

View File

@ -25,24 +25,17 @@
#include "flow/IRandom.h"
#include "flow/UnitTest.h"
#include <boost/iostreams/copy.hpp>
#ifdef ZLIB_LIB_SUPPORTED
#include <boost/iostreams/filter/gzip.hpp>
#endif
#include <boost/iostreams/filtering_streambuf.hpp>
#ifdef ZSTD_LIB_SUPPORTED
#include <boost/iostreams/filter/zstd.hpp>
#define ZSTD_STATIC_LINKING_ONLY
#include <zstd.h>
static constexpr int ZSTD_COMPRESSION_LEVEL_1 = 1;
#endif
#include <sstream>
namespace {
std::unordered_set<CompressionFilter> getSupportedFilters() {
std::unordered_set<CompressionFilter> filters;
filters.insert(CompressionFilter::NONE);
#ifdef ZLIB_LIB_SUPPORTED
filters.insert(CompressionFilter::GZIP);
#endif
#ifdef ZSTD_LIB_SUPPORTED
filters.insert(CompressionFilter::ZSTD);
#endif
@ -59,16 +52,9 @@ StringRef CompressionUtils::compress(const CompressionFilter filter, const Strin
if (filter == CompressionFilter::NONE) {
return StringRef(arena, data);
}
namespace bio = boost::iostreams;
#ifdef ZLIB_LIB_SUPPORTED
if (filter == CompressionFilter::GZIP) {
return CompressionUtils::compress(filter, data, bio::gzip::default_compression, arena);
}
#endif
#ifdef ZSTD_LIB_SUPPORTED
if (filter == CompressionFilter::ZSTD) {
return CompressionUtils::compress(filter, data, bio::zstd::default_compression, arena);
return CompressionUtils::compress(filter, data, ZSTD_COMPRESSION_LEVEL_1, arena);
}
#endif
@ -81,27 +67,19 @@ StringRef CompressionUtils::compress(const CompressionFilter filter, const Strin
if (filter == CompressionFilter::NONE) {
return StringRef(arena, data);
}
namespace bio = boost::iostreams;
std::stringstream compStream;
std::stringstream decomStream(data.toString());
bio::filtering_streambuf<bio::input> out;
#ifdef ZLIB_LIB_SUPPORTED
if (filter == CompressionFilter::GZIP) {
out.push(bio::gzip_compressor(bio::gzip_params(level)));
}
#endif
#ifdef ZSTD_LIB_SUPPORTED
if (filter == CompressionFilter::ZSTD) {
out.push(bio::zstd_compressor(bio::zstd_params(level)));
const char* src = reinterpret_cast<const char*>(data.begin());
size_t destSize = ZSTD_compressBound(data.size());
std::unique_ptr<uint8_t[]> dest = std::make_unique<uint8_t[]>(destSize);
size_t bytes = ZSTD_compress(dest.get(), destSize, src, data.size(), level);
if (ZSTD_isError(bytes)) {
throw internal_error();
}
return StringRef(arena, StringRef(dest.get(), bytes));
}
#endif
out.push(decomStream);
bio::copy(out, compStream);
return StringRef(arena, compStream.str());
throw internal_error(); // We should never get here
}
StringRef CompressionUtils::decompress(const CompressionFilter filter, const StringRef& data, Arena& arena) {
@ -110,27 +88,19 @@ StringRef CompressionUtils::decompress(const CompressionFilter filter, const Str
if (filter == CompressionFilter::NONE) {
return StringRef(arena, data);
}
namespace bio = boost::iostreams;
std::stringstream compStream(data.toString());
std::stringstream decompStream;
bio::filtering_streambuf<bio::input> out;
#ifdef ZLIB_LIB_SUPPORTED
if (filter == CompressionFilter::GZIP) {
out.push(bio::gzip_decompressor());
}
#endif
#ifdef ZSTD_LIB_SUPPORTED
if (filter == CompressionFilter::ZSTD) {
out.push(bio::zstd_decompressor());
const char* src = reinterpret_cast<const char*>(data.begin());
size_t destSize = ZSTD_decompressBound(src, data.size());
std::unique_ptr<uint8_t[]> dest = std::make_unique<uint8_t[]>(destSize);
size_t bytes = ZSTD_decompress(dest.get(), destSize, src, data.size());
if (ZSTD_isError(bytes)) {
throw internal_error();
}
return StringRef(arena, StringRef(dest.get(), bytes));
}
#endif
out.push(compStream);
bio::copy(out, decompStream);
return StringRef(arena, decompStream.str());
throw internal_error(); // We should never get here
}
int CompressionUtils::getDefaultCompressionLevel(CompressionFilter filter) {
@ -140,22 +110,11 @@ int CompressionUtils::getDefaultCompressionLevel(CompressionFilter filter) {
return -1;
}
#ifdef ZLIB_LIB_SUPPORTED
if (filter == CompressionFilter::GZIP) {
// opt for high speed compression, larger levels have a high cpu cost and not much compression ratio
// improvement, according to benchmarks
// return boost::iostream::gzip::default_compression;
// return boost::iostream::gzip::best_compression;
return boost::iostreams::gzip::best_speed;
}
#endif
#ifdef ZSTD_LIB_SUPPORTED
if (filter == CompressionFilter::ZSTD) {
// opt for high speed compression, larger levels have a high cpu cost and not much compression ratio
// optimize for high speed compression, larger levels have a high cpu cost and not much compression ratio
// improvement, according to benchmarks
// return boost::iostreams::zstd::default_compression;
// return boost::iostreams::zstd::best_compression;
return boost::iostreams::zstd::best_speed;
return ZSTD_COMPRESSION_LEVEL_1;
}
#endif
@ -233,23 +192,6 @@ TEST_CASE("/CompressionUtils/noCompression") {
return Void();
}
#ifdef ZLIB_LIB_SUPPORTED
TEST_CASE("/CompressionUtils/gzipCompression") {
testCompression(CompressionFilter::GZIP);
TraceEvent("GzipCompressionDone");
return Void();
}
TEST_CASE("/CompressionUtils/gzipCompression2") {
testCompression2(CompressionFilter::GZIP);
TraceEvent("GzipCompression2Done");
return Void();
}
#endif
#ifdef ZSTD_LIB_SUPPORTED
TEST_CASE("/CompressionUtils/zstdCompression") {
testCompression(CompressionFilter::ZSTD);

View File

@ -28,7 +28,6 @@
enum class CompressionFilter {
NONE,
GZIP,
ZSTD,
LAST // Always the last member
};
@ -44,8 +43,6 @@ struct CompressionUtils {
static CompressionFilter fromFilterString(const std::string& filter) {
if (filter == "NONE") {
return CompressionFilter::NONE;
} else if (filter == "GZIP") {
return CompressionFilter::GZIP;
} else if (filter == "ZSTD") {
return CompressionFilter::ZSTD;
} else {
@ -56,8 +53,6 @@ struct CompressionUtils {
static std::string toString(const CompressionFilter filter) {
if (filter == CompressionFilter::NONE) {
return "NONE";
} else if (filter == CompressionFilter::GZIP) {
return "GZP";
} else if (filter == CompressionFilter::ZSTD) {
return "ZSTD";
} else {