From ec76f89d97649f381d269fe2341b457b59e7469e Mon Sep 17 00:00:00 2001 From: Hui Liu Date: Wed, 5 Oct 2022 17:20:07 -0700 Subject: [PATCH] use raw ZSTD API --- flow/CompressionUtils.cpp | 106 ++++++--------------------- flow/include/flow/CompressionUtils.h | 5 -- 2 files changed, 24 insertions(+), 87 deletions(-) diff --git a/flow/CompressionUtils.cpp b/flow/CompressionUtils.cpp index 133fb775a4..554effc34d 100644 --- a/flow/CompressionUtils.cpp +++ b/flow/CompressionUtils.cpp @@ -25,24 +25,17 @@ #include "flow/IRandom.h" #include "flow/UnitTest.h" -#include -#ifdef ZLIB_LIB_SUPPORTED -#include -#endif -#include #ifdef ZSTD_LIB_SUPPORTED -#include +#define ZSTD_STATIC_LINKING_ONLY +#include +static constexpr int ZSTD_COMPRESSION_LEVEL_1 = 1; #endif -#include namespace { std::unordered_set getSupportedFilters() { std::unordered_set filters; filters.insert(CompressionFilter::NONE); -#ifdef ZLIB_LIB_SUPPORTED - filters.insert(CompressionFilter::GZIP); -#endif #ifdef ZSTD_LIB_SUPPORTED filters.insert(CompressionFilter::ZSTD); #endif @@ -59,16 +52,9 @@ StringRef CompressionUtils::compress(const CompressionFilter filter, const Strin if (filter == CompressionFilter::NONE) { return StringRef(arena, data); } - - namespace bio = boost::iostreams; -#ifdef ZLIB_LIB_SUPPORTED - if (filter == CompressionFilter::GZIP) { - return CompressionUtils::compress(filter, data, bio::gzip::default_compression, arena); - } -#endif #ifdef ZSTD_LIB_SUPPORTED if (filter == CompressionFilter::ZSTD) { - return CompressionUtils::compress(filter, data, bio::zstd::default_compression, arena); + return CompressionUtils::compress(filter, data, ZSTD_COMPRESSION_LEVEL_1, arena); } #endif @@ -81,27 +67,19 @@ StringRef CompressionUtils::compress(const CompressionFilter filter, const Strin if (filter == CompressionFilter::NONE) { return StringRef(arena, data); } - - namespace bio = boost::iostreams; - std::stringstream compStream; - std::stringstream decomStream(data.toString()); - - bio::filtering_streambuf out; -#ifdef ZLIB_LIB_SUPPORTED - if (filter == CompressionFilter::GZIP) { - out.push(bio::gzip_compressor(bio::gzip_params(level))); - } -#endif #ifdef ZSTD_LIB_SUPPORTED if (filter == CompressionFilter::ZSTD) { - out.push(bio::zstd_compressor(bio::zstd_params(level))); + const char* src = reinterpret_cast(data.begin()); + size_t destSize = ZSTD_compressBound(data.size()); + std::unique_ptr dest = std::make_unique(destSize); + size_t bytes = ZSTD_compress(dest.get(), destSize, src, data.size(), level); + if (ZSTD_isError(bytes)) { + throw internal_error(); + } + return StringRef(arena, StringRef(dest.get(), bytes)); } #endif - - out.push(decomStream); - bio::copy(out, compStream); - - return StringRef(arena, compStream.str()); + throw internal_error(); // We should never get here } StringRef CompressionUtils::decompress(const CompressionFilter filter, const StringRef& data, Arena& arena) { @@ -110,27 +88,19 @@ StringRef CompressionUtils::decompress(const CompressionFilter filter, const Str if (filter == CompressionFilter::NONE) { return StringRef(arena, data); } - - namespace bio = boost::iostreams; - std::stringstream compStream(data.toString()); - std::stringstream decompStream; - - bio::filtering_streambuf out; -#ifdef ZLIB_LIB_SUPPORTED - if (filter == CompressionFilter::GZIP) { - out.push(bio::gzip_decompressor()); - } -#endif #ifdef ZSTD_LIB_SUPPORTED if (filter == CompressionFilter::ZSTD) { - out.push(bio::zstd_decompressor()); + const char* src = reinterpret_cast(data.begin()); + size_t destSize = ZSTD_decompressBound(src, data.size()); + std::unique_ptr dest = std::make_unique(destSize); + size_t bytes = ZSTD_decompress(dest.get(), destSize, src, data.size()); + if (ZSTD_isError(bytes)) { + throw internal_error(); + } + return StringRef(arena, StringRef(dest.get(), bytes)); } #endif - - out.push(compStream); - bio::copy(out, decompStream); - - return StringRef(arena, decompStream.str()); + throw internal_error(); // We should never get here } int CompressionUtils::getDefaultCompressionLevel(CompressionFilter filter) { @@ -140,22 +110,11 @@ int CompressionUtils::getDefaultCompressionLevel(CompressionFilter filter) { return -1; } -#ifdef ZLIB_LIB_SUPPORTED - if (filter == CompressionFilter::GZIP) { - // opt for high speed compression, larger levels have a high cpu cost and not much compression ratio - // improvement, according to benchmarks - // return boost::iostream::gzip::default_compression; - // return boost::iostream::gzip::best_compression; - return boost::iostreams::gzip::best_speed; - } -#endif #ifdef ZSTD_LIB_SUPPORTED if (filter == CompressionFilter::ZSTD) { - // opt for high speed compression, larger levels have a high cpu cost and not much compression ratio + // optimize for high speed compression, larger levels have a high cpu cost and not much compression ratio // improvement, according to benchmarks - // return boost::iostreams::zstd::default_compression; - // return boost::iostreams::zstd::best_compression; - return boost::iostreams::zstd::best_speed; + return ZSTD_COMPRESSION_LEVEL_1; } #endif @@ -233,23 +192,6 @@ TEST_CASE("/CompressionUtils/noCompression") { return Void(); } - -#ifdef ZLIB_LIB_SUPPORTED -TEST_CASE("/CompressionUtils/gzipCompression") { - testCompression(CompressionFilter::GZIP); - TraceEvent("GzipCompressionDone"); - - return Void(); -} - -TEST_CASE("/CompressionUtils/gzipCompression2") { - testCompression2(CompressionFilter::GZIP); - TraceEvent("GzipCompression2Done"); - - return Void(); -} -#endif - #ifdef ZSTD_LIB_SUPPORTED TEST_CASE("/CompressionUtils/zstdCompression") { testCompression(CompressionFilter::ZSTD); diff --git a/flow/include/flow/CompressionUtils.h b/flow/include/flow/CompressionUtils.h index d7b3c5c047..807d45c60d 100644 --- a/flow/include/flow/CompressionUtils.h +++ b/flow/include/flow/CompressionUtils.h @@ -28,7 +28,6 @@ enum class CompressionFilter { NONE, - GZIP, ZSTD, LAST // Always the last member }; @@ -44,8 +43,6 @@ struct CompressionUtils { static CompressionFilter fromFilterString(const std::string& filter) { if (filter == "NONE") { return CompressionFilter::NONE; - } else if (filter == "GZIP") { - return CompressionFilter::GZIP; } else if (filter == "ZSTD") { return CompressionFilter::ZSTD; } else { @@ -56,8 +53,6 @@ struct CompressionUtils { static std::string toString(const CompressionFilter filter) { if (filter == CompressionFilter::NONE) { return "NONE"; - } else if (filter == CompressionFilter::GZIP) { - return "GZP"; } else if (filter == CompressionFilter::ZSTD) { return "ZSTD"; } else {