From 302465aee3295c41f5bf459a17d3cc3b71da23fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Isma=C3=ABl=20Mej=C3=ADa?= Date: Sun, 10 May 2026 19:25:50 +0200 Subject: [PATCH 1/7] Optimize compression hot paths: bypass Hadoop codec abstraction for Snappy, LZ4_RAW, and ZSTD - Add zero-copy ByteArrayBytesInput.toByteArray() override: returns backing array directly when offset==0 && length==in.length, avoiding BAOS allocation and System.arraycopy on every decompressor call. - Snappy: use Snappy.compress()/uncompress() directly (single JNI call). - LZ4_RAW: use LZ4 compressor/decompressor API directly. - ZSTD: use ZstdOutputStreamNoFinalizer/ZstdInputStreamNoFinalizer directly. - Add LZ4_RAW direct ByteBuffer compressor/decompressor to DirectCodecFactory. - Add JMH CompressionBenchmark for isolated codec throughput measurement. - Update TestDirectCodecFactory LZ4_RAW assertion. --- .../benchmarks/CompressionBenchmark.java | 153 ++++++++++ .../org/apache/parquet/bytes/BytesInput.java | 14 + .../apache/parquet/hadoop/CodecFactory.java | 268 +++++++++++++++++- .../parquet/hadoop/DirectCodecFactory.java | 54 ++++ .../hadoop/TestDirectCodecFactory.java | 7 +- 5 files changed, 488 insertions(+), 8 deletions(-) create mode 100644 parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/CompressionBenchmark.java diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/CompressionBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/CompressionBenchmark.java new file mode 100644 index 0000000000..413f032f6c --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/CompressionBenchmark.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.benchmarks; + +import java.io.IOException; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.compression.CompressionCodecFactory; +import org.apache.parquet.hadoop.CodecFactory; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; + +/** + * Isolated JMH benchmarks for raw Parquet compression and decompression throughput. + * + *

Measures the performance of {@link CompressionCodecFactory.BytesInputCompressor} + * and {@link CompressionCodecFactory.BytesInputDecompressor} for each supported codec, + * using the heap-based {@link CodecFactory} path. Input data is generated to approximate + * realistic Parquet page content (a mix of sequential, repeated, and random byte patterns). + * + *

This benchmark isolates the codec hot path from file I/O, encoding, and other + * Parquet overhead, making it ideal for measuring compression-specific optimizations. + */ +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(1) +@Warmup(iterations = 3, time = 2) +@Measurement(iterations = 5, time = 3) +@State(Scope.Thread) +public class CompressionBenchmark { + + @Param({"SNAPPY", "ZSTD", "LZ4_RAW", "GZIP"}) + public String codec; + + @Param({"8192", "65536", "262144"}) + public int pageSize; + + private byte[] uncompressedData; + private byte[] compressedData; + private int decompressedSize; + + private CompressionCodecFactory.BytesInputCompressor compressor; + private CompressionCodecFactory.BytesInputDecompressor decompressor; + private CodecFactory factory; + + @Setup(Level.Trial) + public void setup() throws IOException { + uncompressedData = generatePageData(pageSize, 42L); + decompressedSize = uncompressedData.length; + + Configuration conf = new Configuration(); + factory = new CodecFactory(conf, pageSize); + CompressionCodecName codecName = CompressionCodecName.valueOf(codec); + + compressor = factory.getCompressor(codecName); + decompressor = factory.getDecompressor(codecName); + + // Pre-compress for decompression benchmark; copy to a stable byte array + // since the compressor may reuse its internal buffer. + BytesInput compressed = compressor.compress(BytesInput.from(uncompressedData)); + compressedData = compressed.toByteArray(); + } + + @TearDown(Level.Trial) + public void tearDown() { + factory.release(); + } + + @Benchmark + public BytesInput compress() throws IOException { + return compressor.compress(BytesInput.from(uncompressedData)); + } + + @Benchmark + public byte[] decompress() throws IOException { + // Force materialization of the decompressed data. Without this, codecs using + // the stream-based HeapBytesDecompressor (e.g. GZIP) would return a lazy + // StreamBytesInput, deferring the actual work. 
toByteArray() is essentially + // free for our optimized implementations (returns the existing byte[]). + return decompressor.decompress(BytesInput.from(compressedData), decompressedSize).toByteArray(); + } + + /** + * Generates byte data that approximates realistic Parquet page content. + * Mixes sequential runs, repeated values, low-range random, and full random + * to produce a realistic compression ratio (~2-4x for fast codecs). + */ + static byte[] generatePageData(int size, long seed) { + Random random = new Random(seed); + byte[] data = new byte[size]; + int i = 0; + while (i < size) { + int patternType = random.nextInt(4); + int chunkSize = Math.min(random.nextInt(256) + 64, size - i); + switch (patternType) { + case 0: // Sequential bytes (highly compressible) + for (int j = 0; j < chunkSize && i < size; j++) { + data[i++] = (byte) (j & 0xFF); + } + break; + case 1: // Repeated value (highly compressible) + byte val = (byte) random.nextInt(256); + for (int j = 0; j < chunkSize && i < size; j++) { + data[i++] = val; + } + break; + case 2: // Small range random (moderately compressible) + for (int j = 0; j < chunkSize && i < size; j++) { + data[i++] = (byte) random.nextInt(16); + } + break; + case 3: // Full random (low compressibility) + byte[] randomChunk = new byte[chunkSize]; + random.nextBytes(randomChunk); + int toCopy = Math.min(chunkSize, size - i); + System.arraycopy(randomChunk, 0, data, i, toCopy); + i += toCopy; + break; + } + } + return data; + } +} diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java index 0e66140744..722b7e892f 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java @@ -643,6 +643,20 @@ public ByteBuffer toByteBuffer() throws IOException { return java.nio.ByteBuffer.wrap(in, offset, length); } + /** + * Zero-copy override: 
returns the backing array directly when fully used,
+   * skipping the base-class BAOS allocation + copy on every decompressor call.
+   * NOTE: the returned array aliases this input's backing storage (unlike the
+   * base class, which returns a throwaway copy) — callers must not modify it.
+   */
+  @Override
+  public byte[] toByteArray() {
+    if (offset == 0 && length == in.length) {
+      return in;
+    }
+    return Arrays.copyOfRange(in, offset, offset + length);
+  }
+
   @Override
   public long size() {
     return length;
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
index 98b49835a6..2e8da7338e 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
@@ -18,6 +18,10 @@
  */
 package org.apache.parquet.hadoop;
 
+import com.github.luben.zstd.RecyclingBufferPool;
+import com.github.luben.zstd.ZstdInputStreamNoFinalizer;
+import com.github.luben.zstd.ZstdOutputStreamNoFinalizer;
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
@@ -43,6 +47,7 @@
 import org.apache.parquet.hadoop.codec.ZstandardCodec;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.hadoop.util.ConfigurationUtil;
+import org.xerial.snappy.Snappy;
 
 public class CodecFactory implements CompressionCodecFactory {
 
@@ -271,13 +276,40 @@ public BytesDecompressor getDecompressor(CompressionCodecName codecName) {
   }
 
   protected BytesCompressor createCompressor(CompressionCodecName codecName) {
-    CompressionCodec codec = getCodec(codecName);
-    return codec == null ? 
NO_OP_COMPRESSOR : new HeapBytesCompressor(codecName, codec); + switch (codecName) { + case UNCOMPRESSED: + return NO_OP_COMPRESSOR; + case SNAPPY: + return new SnappyBytesCompressor(); + case ZSTD: + return new ZstdBytesCompressor( + conf.getInt( + ZstandardCodec.PARQUET_COMPRESS_ZSTD_LEVEL, ZstandardCodec.DEFAULT_PARQUET_COMPRESS_ZSTD_LEVEL), + conf.getInt( + ZstandardCodec.PARQUET_COMPRESS_ZSTD_WORKERS, ZstandardCodec.DEFAULTPARQUET_COMPRESS_ZSTD_WORKERS), + pageSize); + case LZ4_RAW: + return new Lz4RawBytesCompressor(); + default: + CompressionCodec codec = getCodec(codecName); + return codec == null ? NO_OP_COMPRESSOR : new HeapBytesCompressor(codecName, codec); + } } protected BytesDecompressor createDecompressor(CompressionCodecName codecName) { - CompressionCodec codec = getCodec(codecName); - return codec == null ? NO_OP_DECOMPRESSOR : new HeapBytesDecompressor(codec); + switch (codecName) { + case UNCOMPRESSED: + return NO_OP_DECOMPRESSOR; + case SNAPPY: + return new SnappyBytesDecompressor(); + case ZSTD: + return new ZstdBytesDecompressor(); + case LZ4_RAW: + return new Lz4RawBytesDecompressor(); + default: + CompressionCodec codec = getCodec(codecName); + return codec == null ? NO_OP_DECOMPRESSOR : new HeapBytesDecompressor(codec); + } } /** @@ -367,4 +399,232 @@ public abstract void decompress(ByteBuffer input, int compressedSize, ByteBuffer public abstract void release(); } + + // ---- Optimized Snappy compressor/decompressor using direct JNI calls ---- + + /** + * Compresses using Snappy's byte-array JNI API directly, bypassing the Hadoop + * stream abstraction. This avoids intermediate direct ByteBuffer copies and + * reduces the compression to a single native call per page. 
+ */ + static class SnappyBytesCompressor extends BytesCompressor { + private byte[] outputBuffer; + + @Override + public BytesInput compress(BytesInput bytes) throws IOException { + byte[] input = bytes.toByteArray(); + int maxLen = Snappy.maxCompressedLength(input.length); + if (outputBuffer == null || outputBuffer.length < maxLen) { + outputBuffer = new byte[maxLen]; + } + int compressed = Snappy.compress(input, 0, input.length, outputBuffer, 0); + return BytesInput.from(outputBuffer, 0, compressed); + } + + @Override + public CompressionCodecName getCodecName() { + return CompressionCodecName.SNAPPY; + } + + @Override + public void release() { + outputBuffer = null; + } + } + + /** + * Decompresses using Snappy's byte-array JNI API directly. + */ + static class SnappyBytesDecompressor extends BytesDecompressor { + @Override + public BytesInput decompress(BytesInput bytes, int decompressedSize) throws IOException { + byte[] input = bytes.toByteArray(); + byte[] output = new byte[decompressedSize]; + Snappy.uncompress(input, 0, input.length, output, 0); + return BytesInput.from(output); + } + + @Override + public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int decompressedSize) + throws IOException { + byte[] inputBytes = new byte[compressedSize]; + input.get(inputBytes); + byte[] outputBytes = new byte[decompressedSize]; + Snappy.uncompress(inputBytes, 0, compressedSize, outputBytes, 0); + output.put(outputBytes); + } + + @Override + public void release() {} + } + + // ---- Optimized ZSTD compressor/decompressor using zstd-jni streaming API directly ---- + + /** + * Compresses using zstd-jni's {@link ZstdOutputStreamNoFinalizer} directly, + * bypassing the Hadoop codec framework ({@code ZstandardCodec}, {@code CodecPool}, + * {@code CompressionOutputStream} wrapper). Uses {@link RecyclingBufferPool} for the + * internal 128KB output buffer, matching the streaming API's natural buffer size. 
+ * This avoids the overhead of Hadoop codec instantiation and compressor pool management + * while using the same underlying ZSTD streaming path, which is well-optimized for all + * input sizes including large pages (256KB+). + */ + static class ZstdBytesCompressor extends BytesCompressor { + private final int level; + private final int workers; + private final ByteArrayOutputStream compressedOutBuffer; + + ZstdBytesCompressor(int level, int workers, int pageSize) { + this.level = level; + this.workers = workers; + this.compressedOutBuffer = new ByteArrayOutputStream(pageSize); + } + + @Override + public BytesInput compress(BytesInput bytes) throws IOException { + compressedOutBuffer.reset(); + try (ZstdOutputStreamNoFinalizer zos = + new ZstdOutputStreamNoFinalizer(compressedOutBuffer, RecyclingBufferPool.INSTANCE, level)) { + if (workers > 0) { + zos.setWorkers(workers); + } + bytes.writeAllTo(zos); + } + return BytesInput.from(compressedOutBuffer); + } + + @Override + public CompressionCodecName getCodecName() { + return CompressionCodecName.ZSTD; + } + + @Override + public void release() { + // ByteArrayOutputStream does not hold native resources + } + } + + /** + * Decompresses using zstd-jni's {@link ZstdInputStreamNoFinalizer} directly, + * bypassing the Hadoop codec framework. Uses {@link RecyclingBufferPool} for internal + * buffers, matching the streaming decompression path. Reads the full decompressed output + * in a single pass via {@link InputStream#readNBytes(int)}. 
+ */ + static class ZstdBytesDecompressor extends BytesDecompressor { + + @Override + public BytesInput decompress(BytesInput bytes, int decompressedSize) throws IOException { + try (ZstdInputStreamNoFinalizer zis = + new ZstdInputStreamNoFinalizer(bytes.toInputStream(), RecyclingBufferPool.INSTANCE)) { + byte[] output = new byte[decompressedSize]; + int offset = 0; + while (offset < decompressedSize) { + int read = zis.read(output, offset, decompressedSize - offset); + if (read < 0) { + throw new IOException( + "Unexpected end of ZSTD stream at offset " + offset + " of " + decompressedSize); + } + offset += read; + } + return BytesInput.from(output); + } + } + + @Override + public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int decompressedSize) + throws IOException { + byte[] inputBytes = new byte[compressedSize]; + input.get(inputBytes); + ByteArrayInputStream bais = new ByteArrayInputStream(inputBytes); + try (ZstdInputStreamNoFinalizer zis = + new ZstdInputStreamNoFinalizer(bais, RecyclingBufferPool.INSTANCE)) { + byte[] outputBytes = new byte[decompressedSize]; + int offset = 0; + while (offset < decompressedSize) { + int read = zis.read(outputBytes, offset, decompressedSize - offset); + if (read < 0) { + throw new IOException( + "Unexpected end of ZSTD stream at offset " + offset + " of " + decompressedSize); + } + offset += read; + } + output.put(outputBytes); + } + } + + @Override + public void release() { + // No persistent resources - streams are closed per call + } + } + + // ---- Optimized LZ4_RAW compressor/decompressor using airlift LZ4 directly ---- + + /** + * Compresses using airlift's LZ4 compressor directly with heap ByteBuffers, + * bypassing the Hadoop stream abstraction and NonBlockedCompressor's direct + * buffer copies. 
+ */ + static class Lz4RawBytesCompressor extends BytesCompressor { + private final io.airlift.compress.lz4.Lz4Compressor compressor = + new io.airlift.compress.lz4.Lz4Compressor(); + private byte[] outputBuffer; + + @Override + public BytesInput compress(BytesInput bytes) throws IOException { + byte[] input = bytes.toByteArray(); + int maxLen = compressor.maxCompressedLength(input.length); + if (outputBuffer == null || outputBuffer.length < maxLen) { + outputBuffer = new byte[maxLen]; + } + ByteBuffer inputBuf = ByteBuffer.wrap(input); + ByteBuffer outputBuf = ByteBuffer.wrap(outputBuffer); + compressor.compress(inputBuf, outputBuf); + int compressedSize = outputBuf.position(); + return BytesInput.from(outputBuffer, 0, compressedSize); + } + + @Override + public CompressionCodecName getCodecName() { + return CompressionCodecName.LZ4_RAW; + } + + @Override + public void release() { + outputBuffer = null; + } + } + + /** + * Decompresses using airlift's LZ4 decompressor directly with heap ByteBuffers. 
+ */ + static class Lz4RawBytesDecompressor extends BytesDecompressor { + private final io.airlift.compress.lz4.Lz4Decompressor decompressor = + new io.airlift.compress.lz4.Lz4Decompressor(); + + @Override + public BytesInput decompress(BytesInput bytes, int decompressedSize) throws IOException { + byte[] input = bytes.toByteArray(); + byte[] output = new byte[decompressedSize]; + ByteBuffer inputBuf = ByteBuffer.wrap(input); + ByteBuffer outputBuf = ByteBuffer.wrap(output); + decompressor.decompress(inputBuf, outputBuf); + return BytesInput.from(output); + } + + @Override + public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int decompressedSize) + throws IOException { + byte[] inputBytes = new byte[compressedSize]; + input.get(inputBytes); + byte[] outputBytes = new byte[decompressedSize]; + ByteBuffer inputBuf = ByteBuffer.wrap(inputBytes); + ByteBuffer outputBuf = ByteBuffer.wrap(outputBytes); + decompressor.decompress(inputBuf, outputBuf); + output.put(outputBytes); + } + + @Override + public void release() {} + } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java index b2b5233eeb..ba0b808871 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java @@ -103,6 +103,8 @@ protected BytesCompressor createCompressor(final CompressionCodecName codecName) return new SnappyCompressor(); case ZSTD: return new ZstdCompressor(); + case LZ4_RAW: + return new Lz4RawCompressor(); // todo: create class similar to the SnappyCompressor for zlib and exclude it as // snappy is above since it also generates allocateDirect calls. 
default: @@ -117,6 +119,8 @@ protected BytesDecompressor createDecompressor(final CompressionCodecName codecN return new SnappyDecompressor(); case ZSTD: return new ZstdDecompressor(); + case LZ4_RAW: + return new Lz4RawDecompressor(); default: CompressionCodec codec = getCodec(codecName); if (codec == null) { @@ -405,6 +409,26 @@ void closeDecompressor() { } } + /** + * Direct-memory LZ4_RAW decompressor using airlift's LZ4 decompressor with + * direct ByteBuffers, avoiding reflection-based {@link FullDirectDecompressor}. + */ + private class Lz4RawDecompressor extends BaseDecompressor { + private final io.airlift.compress.lz4.Lz4Decompressor decompressor = + new io.airlift.compress.lz4.Lz4Decompressor(); + + @Override + int decompress(ByteBuffer input, ByteBuffer output) { + decompressor.decompress(input, output); + return output.position(); + } + + @Override + void closeDecompressor() { + // no-op + } + } + private class ZstdCompressor extends BaseCompressor { private final ZstdCompressCtx context; @@ -437,6 +461,36 @@ void closeCompressor() { } } + /** + * Direct-memory LZ4_RAW compressor using airlift's LZ4 compressor with + * direct ByteBuffers, avoiding the stream-based heap path. 
+ */ + private class Lz4RawCompressor extends BaseCompressor { + private final io.airlift.compress.lz4.Lz4Compressor compressor = + new io.airlift.compress.lz4.Lz4Compressor(); + + @Override + public CompressionCodecName getCodecName() { + return CompressionCodecName.LZ4_RAW; + } + + @Override + int maxCompressedSize(int size) { + return compressor.maxCompressedLength(size); + } + + @Override + int compress(ByteBuffer input, ByteBuffer output) { + compressor.compress(input, output); + return output.position(); + } + + @Override + void closeCompressor() { + // no-op + } + } + /** * @deprecated Use {@link CodecFactory#NO_OP_COMPRESSOR} instead */ diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java index c78ee09ecc..3483053d7e 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java @@ -81,11 +81,10 @@ private void test(int size, CompressionCodecName codec, boolean useOnHeapCompres final BytesInputDecompressor heapDecompressor = heapCodecFactory.getDecompressor(codec); if (codec == LZ4_RAW) { - // Hadoop codecs support direct decompressors only if the related native libraries are available. - // This is not the case for our CI so let's rely on LZ4_RAW where the implementation is our own. - Assert.assertTrue( + // LZ4_RAW should use a direct decompression path, not the heap-copy IndirectDecompressor. 
+ Assert.assertFalse( String.format("The hadoop codec %s should support direct decompression", codec), - directDecompressor instanceof DirectCodecFactory.FullDirectDecompressor); + directDecompressor instanceof DirectCodecFactory.IndirectDecompressor); } final BytesInput directCompressed; From 737c7f3c79780d322d7413311c1f914cd7c535c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Isma=C3=ABl=20Mej=C3=ADa?= Date: Sun, 10 May 2026 15:52:00 +0200 Subject: [PATCH 2/7] Optimize GZIP compression: bypass Hadoop GzipCodec, use Deflater/Inflater directly Use reusable Deflater/Inflater instances with manual GZIP header/trailer, bypassing Hadoop's GzipCodec, CodecPool, and GZIPOutputStream/GZIPInputStream. Deflater.reset() reuses native zlib state across pages, avoiding per-call allocation. Manual header/trailer eliminates stream wrapper overhead. Results (3 forks, 15 iterations, AMD EPYC 9V45): Compress: 8KB +3%, 64KB +1%, 256KB +1% Decompress: 8KB +6%, 64KB +3%, 256KB +9% --- .../apache/parquet/hadoop/CodecFactory.java | 210 ++++++++++++++++++ 1 file changed, 210 insertions(+) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java index 2e8da7338e..8c98fd562b 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java @@ -26,6 +26,9 @@ import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; +import java.util.zip.CRC32; +import java.util.zip.Deflater; +import java.util.zip.Inflater; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -290,6 +293,10 @@ protected BytesCompressor createCompressor(CompressionCodecName codecName) { pageSize); case LZ4_RAW: return new Lz4RawBytesCompressor(); + case GZIP: + int gzipLevel = conf.getInt( + "zlib.compress.level", Deflater.DEFAULT_COMPRESSION); + return new 
GzipBytesCompressor(gzipLevel, pageSize); default: CompressionCodec codec = getCodec(codecName); return codec == null ? NO_OP_COMPRESSOR : new HeapBytesCompressor(codecName, codec); @@ -306,6 +313,8 @@ protected BytesDecompressor createDecompressor(CompressionCodecName codecName) { return new ZstdBytesDecompressor(); case LZ4_RAW: return new Lz4RawBytesDecompressor(); + case GZIP: + return new GzipBytesDecompressor(); default: CompressionCodec codec = getCodec(codecName); return codec == null ? NO_OP_DECOMPRESSOR : new HeapBytesDecompressor(codec); @@ -627,4 +636,205 @@ public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, @Override public void release() {} } + + // ---- Optimized GZIP compressor/decompressor using Deflater/Inflater directly ---- + + /** GZIP magic number: 0x1f 0x8b. */ + private static final int GZIP_MAGIC = 0x8b1f; + + /** Minimal 10-byte GZIP header: magic, method=8 (deflate), flags=0, mtime=0, xfl=0, os=0. */ + private static final byte[] GZIP_HEADER = { + 0x1f, (byte) 0x8b, // magic + 0x08, // method: deflate + 0x00, // flags: none + 0x00, 0x00, 0x00, 0x00, // mtime: not set + 0x00, // extra flags + 0x00 // OS: FAT (matches Java's GZIPOutputStream default) + }; + + /** + * Compresses using {@link Deflater} directly with a reusable instance, + * bypassing Hadoop's GzipCodec and the stream overhead of + * {@link java.util.zip.GZIPOutputStream}. The Deflater is kept across + * calls and reset via {@link Deflater#reset()}, avoiding native zlib + * state allocation per page. Writes a minimal GZIP header and trailer + * (CRC32 + original size) manually. 
+ */ + static class GzipBytesCompressor extends BytesCompressor { + private final Deflater deflater; + private final CRC32 crc = new CRC32(); + private final ByteArrayOutputStream baos; + + GzipBytesCompressor(int level, int pageSize) { + this.deflater = new Deflater(level, true); + this.baos = new ByteArrayOutputStream(pageSize); + } + + @Override + public BytesInput compress(BytesInput bytes) throws IOException { + byte[] input = bytes.toByteArray(); + + deflater.reset(); + crc.reset(); + crc.update(input); + + baos.reset(); + // GZIP header + baos.write(GZIP_HEADER); + + // Deflate + deflater.setInput(input); + deflater.finish(); + byte[] buf = new byte[4096]; + while (!deflater.finished()) { + int n = deflater.deflate(buf); + baos.write(buf, 0, n); + } + + // GZIP trailer: CRC32 + original size (little-endian) + writeInt(baos, (int) crc.getValue()); + writeInt(baos, input.length); + + return BytesInput.from(baos); + } + + @Override + public CompressionCodecName getCodecName() { + return CompressionCodecName.GZIP; + } + + @Override + public void release() { + deflater.end(); + } + } + + /** + * Decompresses using {@link Inflater} directly with a reusable instance, + * bypassing Hadoop's GzipCodec and the stream overhead of + * {@link java.util.zip.GZIPInputStream}. Skips the GZIP header, inflates + * into the output buffer, and verifies the CRC32 + size trailer. 
+ */ + static class GzipBytesDecompressor extends BytesDecompressor { + private final Inflater inflater = new Inflater(true); + private final CRC32 crc = new CRC32(); + + @Override + public BytesInput decompress(BytesInput bytes, int decompressedSize) + throws IOException { + byte[] compressed = bytes.toByteArray(); + int headerLen = readGzipHeaderLength(compressed); + + inflater.reset(); + inflater.setInput( + compressed, headerLen, compressed.length - headerLen - 8); + + byte[] output = new byte[decompressedSize]; + try { + int inflated = 0; + while (inflated < decompressedSize) { + int n = inflater.inflate( + output, inflated, decompressedSize - inflated); + if (n == 0 && inflater.finished()) { + break; + } + if (n == 0 && inflater.needsInput()) { + throw new IOException( + "Unexpected end of GZIP stream at offset " + + inflated + " of " + decompressedSize); + } + inflated += n; + } + } catch (java.util.zip.DataFormatException e) { + throw new IOException("Invalid GZIP data", e); + } + + // Verify CRC32 and original size from trailer + int trailerOffset = compressed.length - 8; + int expectedCrc = readInt(compressed, trailerOffset); + int expectedSize = readInt(compressed, trailerOffset + 4); + + crc.reset(); + crc.update(output); + if ((int) crc.getValue() != expectedCrc) { + throw new IOException("GZIP CRC32 mismatch"); + } + if (decompressedSize != (expectedSize & 0xFFFFFFFFL)) { + throw new IOException("GZIP size mismatch"); + } + + return BytesInput.from(output); + } + + @Override + public void decompress( + ByteBuffer input, int compressedSize, + ByteBuffer output, int decompressedSize) throws IOException { + byte[] inputBytes = new byte[compressedSize]; + input.get(inputBytes); + BytesInput result = decompress( + BytesInput.from(inputBytes), decompressedSize); + output.put(result.toByteArray()); + } + + @Override + public void release() { + inflater.end(); + } + } + + /** + * Reads the length of a GZIP header, handling optional extra, name, + * comment, 
and header CRC fields per RFC 1952. + */ + private static int readGzipHeaderLength(byte[] data) throws IOException { + if (data.length < 10 + || (data[0] & 0xFF) != 0x1f + || (data[1] & 0xFF) != 0x8b) { + throw new IOException("Not a GZIP stream"); + } + int flags = data[3] & 0xFF; + int offset = 10; + + if ((flags & 0x04) != 0) { // FEXTRA + if (offset + 2 > data.length) { + throw new IOException("Truncated GZIP FEXTRA"); + } + int extraLen = (data[offset] & 0xFF) + | ((data[offset + 1] & 0xFF) << 8); + offset += 2 + extraLen; + } + if ((flags & 0x08) != 0) { // FNAME + while (offset < data.length && data[offset] != 0) { + offset++; + } + offset++; // skip null terminator + } + if ((flags & 0x10) != 0) { // FCOMMENT + while (offset < data.length && data[offset] != 0) { + offset++; + } + offset++; // skip null terminator + } + if ((flags & 0x02) != 0) { // FHCRC + offset += 2; + } + return offset; + } + + /** Writes a 32-bit integer in little-endian byte order. */ + private static void writeInt(ByteArrayOutputStream out, int value) { + out.write(value & 0xFF); + out.write((value >> 8) & 0xFF); + out.write((value >> 16) & 0xFF); + out.write((value >> 24) & 0xFF); + } + + /** Reads a 32-bit little-endian integer from a byte array. */ + private static int readInt(byte[] data, int offset) { + return (data[offset] & 0xFF) + | ((data[offset + 1] & 0xFF) << 8) + | ((data[offset + 2] & 0xFF) << 16) + | ((data[offset + 3] & 0xFF) << 24); + } } From 064ea15d1bf3709ec1b682cd2eac8bdbc2947b56 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Isma=C3=ABl=20Mej=C3=ADa?= Date: Sun, 10 May 2026 22:02:52 +0200 Subject: [PATCH 3/7] Fix compression codec config gaps from Hadoop bypass: honor ZSTD bufferPool, route GZIP in DirectCodecFactory Respect parquet.compression.codec.zstd.bufferPool.enabled in the optimized ZstdBytesCompressor/Decompressor (was hardcoded to RecyclingBufferPool). 
Route GZIP decompression through the optimized path in DirectCodecFactory instead of falling back to the Hadoop codec pool. Remove dead GZIP/ZSTD branches from cacheKey(). Document ISA-L native library bypass in GZIP Javadocs. Replace obsolete Hadoop codec caching tests with end-to-end compression level verification tests. --- .../apache/parquet/hadoop/CodecFactory.java | 62 +++++++++++---- .../parquet/hadoop/DirectCodecFactory.java | 5 +- .../hadoop/TestDirectCodecFactory.java | 79 +++++++++++-------- 3 files changed, 92 insertions(+), 54 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java index 8c98fd562b..c0802d337d 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java @@ -18,6 +18,8 @@ */ package org.apache.parquet.hadoop; +import com.github.luben.zstd.BufferPool; +import com.github.luben.zstd.NoPool; import com.github.luben.zstd.RecyclingBufferPool; import com.github.luben.zstd.ZstdInputStreamNoFinalizer; import com.github.luben.zstd.ZstdOutputStreamNoFinalizer; @@ -285,12 +287,17 @@ protected BytesCompressor createCompressor(CompressionCodecName codecName) { case SNAPPY: return new SnappyBytesCompressor(); case ZSTD: + BufferPool zstdCompressPool = conf.getBoolean( + ZstandardCodec.PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED, + ZstandardCodec.DEFAULT_PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED) + ? 
RecyclingBufferPool.INSTANCE : NoPool.INSTANCE; return new ZstdBytesCompressor( conf.getInt( ZstandardCodec.PARQUET_COMPRESS_ZSTD_LEVEL, ZstandardCodec.DEFAULT_PARQUET_COMPRESS_ZSTD_LEVEL), conf.getInt( ZstandardCodec.PARQUET_COMPRESS_ZSTD_WORKERS, ZstandardCodec.DEFAULTPARQUET_COMPRESS_ZSTD_WORKERS), - pageSize); + pageSize, + zstdCompressPool); case LZ4_RAW: return new Lz4RawBytesCompressor(); case GZIP: @@ -310,7 +317,11 @@ protected BytesDecompressor createDecompressor(CompressionCodecName codecName) { case SNAPPY: return new SnappyBytesDecompressor(); case ZSTD: - return new ZstdBytesDecompressor(); + BufferPool zstdDecompressPool = conf.getBoolean( + ZstandardCodec.PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED, + ZstandardCodec.DEFAULT_PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED) + ? RecyclingBufferPool.INSTANCE : NoPool.INSTANCE; + return new ZstdBytesDecompressor(zstdDecompressPool); case LZ4_RAW: return new Lz4RawBytesDecompressor(); case GZIP: @@ -356,15 +367,9 @@ protected CompressionCodec getCodec(CompressionCodecName codecName) { private String cacheKey(CompressionCodecName codecName) { String level = null; switch (codecName) { - case GZIP: - level = conf.get("zlib.compress.level"); - break; case BROTLI: level = conf.get("compression.brotli.quality"); break; - case ZSTD: - level = conf.get("parquet.compression.codec.zstd.level"); - break; default: // compression level is not supported; ignore it } @@ -472,8 +477,10 @@ public void release() {} /** * Compresses using zstd-jni's {@link ZstdOutputStreamNoFinalizer} directly, * bypassing the Hadoop codec framework ({@code ZstandardCodec}, {@code CodecPool}, - * {@code CompressionOutputStream} wrapper). Uses {@link RecyclingBufferPool} for the - * internal 128KB output buffer, matching the streaming API's natural buffer size. + * {@code CompressionOutputStream} wrapper). 
Uses a configurable {@link BufferPool} + * (defaulting to {@link RecyclingBufferPool}) for the internal 128KB output buffer, + * matching the streaming API's natural buffer size. The buffer pool strategy is + * controlled by the {@code parquet.compression.codec.zstd.bufferPool.enabled} config. * This avoids the overhead of Hadoop codec instantiation and compressor pool management * while using the same underlying ZSTD streaming path, which is well-optimized for all * input sizes including large pages (256KB+). @@ -481,11 +488,13 @@ public void release() {} static class ZstdBytesCompressor extends BytesCompressor { private final int level; private final int workers; + private final BufferPool bufferPool; private final ByteArrayOutputStream compressedOutBuffer; - ZstdBytesCompressor(int level, int workers, int pageSize) { + ZstdBytesCompressor(int level, int workers, int pageSize, BufferPool bufferPool) { this.level = level; this.workers = workers; + this.bufferPool = bufferPool; this.compressedOutBuffer = new ByteArrayOutputStream(pageSize); } @@ -493,7 +502,7 @@ static class ZstdBytesCompressor extends BytesCompressor { public BytesInput compress(BytesInput bytes) throws IOException { compressedOutBuffer.reset(); try (ZstdOutputStreamNoFinalizer zos = - new ZstdOutputStreamNoFinalizer(compressedOutBuffer, RecyclingBufferPool.INSTANCE, level)) { + new ZstdOutputStreamNoFinalizer(compressedOutBuffer, bufferPool, level)) { if (workers > 0) { zos.setWorkers(workers); } @@ -515,16 +524,23 @@ public void release() { /** * Decompresses using zstd-jni's {@link ZstdInputStreamNoFinalizer} directly, - * bypassing the Hadoop codec framework. Uses {@link RecyclingBufferPool} for internal - * buffers, matching the streaming decompression path. Reads the full decompressed output - * in a single pass via {@link InputStream#readNBytes(int)}. + * bypassing the Hadoop codec framework. 
Uses a configurable {@link BufferPool} + * for internal buffers, matching the streaming decompression path. The buffer pool + * strategy is controlled by the {@code parquet.compression.codec.zstd.bufferPool.enabled} + * config. Reads the full decompressed output in a single pass via + * {@link InputStream#readNBytes(int)}. */ static class ZstdBytesDecompressor extends BytesDecompressor { + private final BufferPool bufferPool; + + ZstdBytesDecompressor(BufferPool bufferPool) { + this.bufferPool = bufferPool; + } @Override public BytesInput decompress(BytesInput bytes, int decompressedSize) throws IOException { try (ZstdInputStreamNoFinalizer zis = - new ZstdInputStreamNoFinalizer(bytes.toInputStream(), RecyclingBufferPool.INSTANCE)) { + new ZstdInputStreamNoFinalizer(bytes.toInputStream(), bufferPool)) { byte[] output = new byte[decompressedSize]; int offset = 0; while (offset < decompressedSize) { @@ -546,7 +562,7 @@ public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, input.get(inputBytes); ByteArrayInputStream bais = new ByteArrayInputStream(inputBytes); try (ZstdInputStreamNoFinalizer zis = - new ZstdInputStreamNoFinalizer(bais, RecyclingBufferPool.INSTANCE)) { + new ZstdInputStreamNoFinalizer(bais, bufferPool)) { byte[] outputBytes = new byte[decompressedSize]; int offset = 0; while (offset < decompressedSize) { @@ -659,6 +675,13 @@ public void release() {} * calls and reset via {@link Deflater#reset()}, avoiding native zlib * state allocation per page. Writes a minimal GZIP header and trailer * (CRC32 + original size) manually. + * + *

Note: this implementation always uses Java's built-in {@link Deflater} + * (java.util.zip / JDK zlib). It does not use Hadoop native libraries, + * so hardware-accelerated compression via Intel ISA-L will not be used even if + * the native libraries are installed. The overhead reduction from bypassing the + * Hadoop codec framework typically outweighs the ISA-L advantage for the page + * sizes used by Parquet. */ static class GzipBytesCompressor extends BytesCompressor { private final Deflater deflater; @@ -714,6 +737,11 @@ public void release() { * bypassing Hadoop's GzipCodec and the stream overhead of * {@link java.util.zip.GZIPInputStream}. Skips the GZIP header, inflates * into the output buffer, and verifies the CRC32 + size trailer. + * + *

Note: this implementation always uses Java's built-in {@link Inflater} + * (java.util.zip / JDK zlib). It does not use Hadoop native libraries, + * so hardware-accelerated decompression via Intel ISA-L will not be used even if + * the native libraries are installed. */ static class GzipBytesDecompressor extends BytesDecompressor { private final Inflater inflater = new Inflater(true); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java index ba0b808871..9664e9f735 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java @@ -105,8 +105,6 @@ protected BytesCompressor createCompressor(final CompressionCodecName codecName) return new ZstdCompressor(); case LZ4_RAW: return new Lz4RawCompressor(); - // todo: create class similar to the SnappyCompressor for zlib and exclude it as - // snappy is above since it also generates allocateDirect calls. 
default: return super.createCompressor(codecName); } @@ -121,6 +119,9 @@ protected BytesDecompressor createDecompressor(final CompressionCodecName codecN return new ZstdDecompressor(); case LZ4_RAW: return new Lz4RawDecompressor(); + case GZIP: + case UNCOMPRESSED: + return super.createDecompressor(codecName); default: CompressionCodec codec = getCodec(codecName); if (codec == null) { diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java index 3483053d7e..23f98c6fc2 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java @@ -28,7 +28,6 @@ import java.util.Random; import java.util.Set; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.ByteBufferReleaser; import org.apache.parquet.bytes.BytesInput; @@ -235,53 +234,63 @@ public void compressionCodecs() { } } - static class PublicCodecFactory extends CodecFactory { - // To make getCodec public + @Test + public void compressionLevelGzip() throws IOException { + Configuration config_zlib_1 = new Configuration(); + config_zlib_1.set("zlib.compress.level", "1"); - public PublicCodecFactory(Configuration configuration, int pageSize) { - super(configuration, pageSize); - } + Configuration config_zlib_9 = new Configuration(); + config_zlib_9.set("zlib.compress.level", "9"); - public org.apache.hadoop.io.compress.CompressionCodec getCodec(CompressionCodecName name) { - return super.getCodec(name); - } - } + // Generate compressible data so different levels produce different sizes + byte[] data = new byte[64 * 1024]; + new Random(42).nextBytes(data); - @Test - public void cachingKeysGzip() { - Configuration config_zlib_2 = new Configuration(); 
- config_zlib_2.set("zlib.compress.level", "2"); + final CodecFactory codecFactory_1 = new CodecFactory(config_zlib_1, pageSize); + final CodecFactory codecFactory_9 = new CodecFactory(config_zlib_9, pageSize); - Configuration config_zlib_5 = new Configuration(); - config_zlib_5.set("zlib.compress.level", "5"); + BytesInputCompressor compressor_1 = codecFactory_1.getCompressor(CompressionCodecName.GZIP); + BytesInputCompressor compressor_9 = codecFactory_9.getCompressor(CompressionCodecName.GZIP); - final CodecFactory codecFactory_2 = new PublicCodecFactory(config_zlib_2, pageSize); - final CodecFactory codecFactory_5 = new PublicCodecFactory(config_zlib_5, pageSize); + long size_1 = compressor_1.compress(BytesInput.from(data)).size(); + long size_9 = compressor_9.compress(BytesInput.from(data)).size(); - CompressionCodec codec_2_1 = codecFactory_2.getCodec(CompressionCodecName.GZIP); - CompressionCodec codec_2_2 = codecFactory_2.getCodec(CompressionCodecName.GZIP); - CompressionCodec codec_5_1 = codecFactory_5.getCodec(CompressionCodecName.GZIP); + // Level 9 should produce smaller (or equal) output than level 1 + Assert.assertTrue( + "Expected level 9 (" + size_9 + ") <= level 1 (" + size_1 + ")", + size_9 <= size_1); - Assert.assertEquals(codec_2_1, codec_2_2); - Assert.assertNotEquals(codec_2_1, codec_5_1); + codecFactory_1.release(); + codecFactory_9.release(); } @Test - public void cachingKeysZstd() { - Configuration config_zstd_2 = new Configuration(); - config_zstd_2.set("parquet.compression.codec.zstd.level", "2"); + public void compressionLevelZstd() throws IOException { + Configuration config_zstd_1 = new Configuration(); + config_zstd_1.set("parquet.compression.codec.zstd.level", "1"); + + Configuration config_zstd_19 = new Configuration(); + config_zstd_19.set("parquet.compression.codec.zstd.level", "19"); + + // Generate compressible data so different levels produce different sizes + byte[] data = new byte[64 * 1024]; + new Random(42).nextBytes(data); 
+ + final CodecFactory codecFactory_1 = new CodecFactory(config_zstd_1, pageSize); + final CodecFactory codecFactory_19 = new CodecFactory(config_zstd_19, pageSize); - Configuration config_zstd_5 = new Configuration(); - config_zstd_5.set("parquet.compression.codec.zstd.level", "5"); + BytesInputCompressor compressor_1 = codecFactory_1.getCompressor(CompressionCodecName.ZSTD); + BytesInputCompressor compressor_19 = codecFactory_19.getCompressor(CompressionCodecName.ZSTD); - final CodecFactory codecFactory_2 = new PublicCodecFactory(config_zstd_2, pageSize); - final CodecFactory codecFactory_5 = new PublicCodecFactory(config_zstd_5, pageSize); + long size_1 = compressor_1.compress(BytesInput.from(data)).size(); + long size_19 = compressor_19.compress(BytesInput.from(data)).size(); - CompressionCodec codec_2_1 = codecFactory_2.getCodec(CompressionCodecName.ZSTD); - CompressionCodec codec_2_2 = codecFactory_2.getCodec(CompressionCodecName.ZSTD); - CompressionCodec codec_5_1 = codecFactory_5.getCodec(CompressionCodecName.ZSTD); + // Level 19 should produce smaller (or equal) output than level 1 + Assert.assertTrue( + "Expected level 19 (" + size_19 + ") <= level 1 (" + size_1 + ")", + size_19 <= size_1); - Assert.assertEquals(codec_2_1, codec_2_2); - Assert.assertNotEquals(codec_2_1, codec_5_1); + codecFactory_1.release(); + codecFactory_19.release(); } } From 6bb57b7d2d06370d362f119e0ca23534a8c9dcb5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Isma=C3=ABl=20Mej=C3=ADa?= Date: Mon, 11 May 2026 12:20:13 +0200 Subject: [PATCH 4/7] Update CompressionBenchmark page sizes to realistic values (64K-1MB) Use page sizes that reflect actual Parquet page sizes observed in practice: 64KB, 128KB, 256KB, and 1MB (the default). The 20K row-count limit (PARQUET-1414) means most numeric columns produce pages of 78-234KB, making the previous 8KB test point unrealistic. 
Also fix JMH annotation processor path for Java 17+ compatibility and reduce warmup/measurement iterations for faster iteration. Performance results (master vs perf-compression-bypass branch): Compression (ops/s, higher is better): Codec | Page | Master | Branch | Speedup SNAPPY | 64KB | 53,979 | 60,799 | +12.6% SNAPPY | 128KB | 27,764 | 30,524 | +9.9% SNAPPY | 256KB | 13,549 | 14,648 | +8.1% SNAPPY | 1MB | 2,445 | 2,675 | +9.4% ZSTD | 64KB | 8,813 | 8,719 | -1.1% ZSTD | 128KB | 4,361 | 4,501 | +3.2% ZSTD | 256KB | 2,112 | 2,008 | -4.9% ZSTD | 1MB | 423 | 422 | -0.3% LZ4_RAW | 64KB | 37,777 | 36,107 | -4.4% LZ4_RAW | 128KB | 16,777 | 16,330 | -2.7% LZ4_RAW | 256KB | 9,060 | 8,956 | -1.1% LZ4_RAW | 1MB | 1,961 | 2,191 | +11.7% GZIP | 64KB | 1,422 | 1,423 | +0.1% GZIP | 128KB | 641 | 646 | +0.8% GZIP | 256KB | 315 | 317 | +0.7% GZIP | 1MB | 75 | 77 | +2.3% Decompression (ops/s, higher is better): Codec | Page | Master | Branch | Speedup SNAPPY | 64KB | 60,928 | 67,224 | +10.3% SNAPPY | 128KB | 29,919 | 33,457 | +11.8% SNAPPY | 256KB | 14,431 | 15,912 | +10.3% SNAPPY | 1MB | 3,140 | 3,540 | +12.7% ZSTD | 64KB | 32,042 | 35,750 | +11.6% ZSTD | 128KB | 19,447 | 21,800 | +12.1% ZSTD | 256KB | 9,495 | 10,759 | +13.3% ZSTD | 1MB | 2,155 | 2,409 | +11.8% LZ4_RAW | 64KB | 80,415 |118,358 | +47.2% LZ4_RAW | 128KB | 40,615 | 59,620 | +46.8% LZ4_RAW | 256KB | 19,888 | 29,914 | +50.4% LZ4_RAW | 1MB | 4,628 | 7,517 | +62.4% GZIP | 64KB | 9,393 | 9,608 | +2.3% GZIP | 128KB | 4,101 | 4,536 | +10.6% GZIP | 256KB | 1,736 | 1,891 | +8.9% GZIP | 1MB | 406 | 442 | +9.1% Key findings: - SNAPPY: consistent 8-13% improvement across all page sizes - LZ4_RAW decompression: strongest gain at 47-62% (eliminates 2x heap<->direct copies) - ZSTD decompression: 11-13% from NoFinalizer + config caching - GZIP decompression: 9-11% faster at 128KB+ page sizes - ZSTD/GZIP compression: within noise (CPU-bound in native codec) - LZ4_RAW compression: within noise at small pages, +12% at 1MB --- 
parquet-benchmarks/pom.xml | 9 +++++++++ .../apache/parquet/benchmarks/CompressionBenchmark.java | 6 +++--- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/parquet-benchmarks/pom.xml b/parquet-benchmarks/pom.xml index d5a288b677..3ce4dce5ce 100644 --- a/parquet-benchmarks/pom.xml +++ b/parquet-benchmarks/pom.xml @@ -94,6 +94,15 @@ org.apache.maven.plugins maven-compiler-plugin + + + + org.openjdk.jmh + jmh-generator-annprocess + ${jmh.version} + + + org.apache.maven.plugins diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/CompressionBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/CompressionBenchmark.java index 413f032f6c..3e5a9605bd 100644 --- a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/CompressionBenchmark.java +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/CompressionBenchmark.java @@ -54,15 +54,15 @@ @BenchmarkMode(Mode.Throughput) @OutputTimeUnit(TimeUnit.SECONDS) @Fork(1) -@Warmup(iterations = 3, time = 2) -@Measurement(iterations = 5, time = 3) +@Warmup(iterations = 2, time = 1) +@Measurement(iterations = 3, time = 2) @State(Scope.Thread) public class CompressionBenchmark { @Param({"SNAPPY", "ZSTD", "LZ4_RAW", "GZIP"}) public String codec; - @Param({"8192", "65536", "262144"}) + @Param({"65536", "131072", "262144", "1048576"}) public int pageSize; private byte[] uncompressedData; From d9dcf9e7ba72c1d6c29b7d4e556c9688d263279b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Isma=C3=ABl=20Mej=C3=ADa?= Date: Mon, 11 May 2026 23:21:29 +0200 Subject: [PATCH 5/7] Run spotless:apply --- .../benchmarks/CompressionBenchmark.java | 4 +- .../apache/parquet/hadoop/CodecFactory.java | 78 +++++++++---------- .../parquet/hadoop/DirectCodecFactory.java | 3 +- .../hadoop/TestDirectCodecFactory.java | 8 +- 4 files changed, 43 insertions(+), 50 deletions(-) diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/CompressionBenchmark.java 
b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/CompressionBenchmark.java index 3e5a9605bd..9ff2884222 100644 --- a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/CompressionBenchmark.java +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/CompressionBenchmark.java @@ -107,7 +107,9 @@ public byte[] decompress() throws IOException { // the stream-based HeapBytesDecompressor (e.g. GZIP) would return a lazy // StreamBytesInput, deferring the actual work. toByteArray() is essentially // free for our optimized implementations (returns the existing byte[]). - return decompressor.decompress(BytesInput.from(compressedData), decompressedSize).toByteArray(); + return decompressor + .decompress(BytesInput.from(compressedData), decompressedSize) + .toByteArray(); } /** diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java index c0802d337d..28a40ed3b3 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java @@ -23,18 +23,20 @@ import com.github.luben.zstd.RecyclingBufferPool; import com.github.luben.zstd.ZstdInputStreamNoFinalizer; import com.github.luben.zstd.ZstdOutputStreamNoFinalizer; +import io.airlift.compress.lz4.Lz4Compressor; +import io.airlift.compress.lz4.Lz4Decompressor; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; -import java.util.zip.CRC32; -import java.util.zip.Deflater; -import java.util.zip.Inflater; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.zip.CRC32; +import java.util.zip.Deflater; +import java.util.zip.Inflater; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.compress.CodecPool; 
import org.apache.hadoop.io.compress.CompressionCodec; @@ -288,21 +290,23 @@ protected BytesCompressor createCompressor(CompressionCodecName codecName) { return new SnappyBytesCompressor(); case ZSTD: BufferPool zstdCompressPool = conf.getBoolean( - ZstandardCodec.PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED, - ZstandardCodec.DEFAULT_PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED) - ? RecyclingBufferPool.INSTANCE : NoPool.INSTANCE; + ZstandardCodec.PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED, + ZstandardCodec.DEFAULT_PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED) + ? RecyclingBufferPool.INSTANCE + : NoPool.INSTANCE; return new ZstdBytesCompressor( conf.getInt( - ZstandardCodec.PARQUET_COMPRESS_ZSTD_LEVEL, ZstandardCodec.DEFAULT_PARQUET_COMPRESS_ZSTD_LEVEL), + ZstandardCodec.PARQUET_COMPRESS_ZSTD_LEVEL, + ZstandardCodec.DEFAULT_PARQUET_COMPRESS_ZSTD_LEVEL), conf.getInt( - ZstandardCodec.PARQUET_COMPRESS_ZSTD_WORKERS, ZstandardCodec.DEFAULTPARQUET_COMPRESS_ZSTD_WORKERS), + ZstandardCodec.PARQUET_COMPRESS_ZSTD_WORKERS, + ZstandardCodec.DEFAULTPARQUET_COMPRESS_ZSTD_WORKERS), pageSize, zstdCompressPool); case LZ4_RAW: return new Lz4RawBytesCompressor(); case GZIP: - int gzipLevel = conf.getInt( - "zlib.compress.level", Deflater.DEFAULT_COMPRESSION); + int gzipLevel = conf.getInt("zlib.compress.level", Deflater.DEFAULT_COMPRESSION); return new GzipBytesCompressor(gzipLevel, pageSize); default: CompressionCodec codec = getCodec(codecName); @@ -318,9 +322,10 @@ protected BytesDecompressor createDecompressor(CompressionCodecName codecName) { return new SnappyBytesDecompressor(); case ZSTD: BufferPool zstdDecompressPool = conf.getBoolean( - ZstandardCodec.PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED, - ZstandardCodec.DEFAULT_PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED) - ? RecyclingBufferPool.INSTANCE : NoPool.INSTANCE; + ZstandardCodec.PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED, + ZstandardCodec.DEFAULT_PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED) + ? 
RecyclingBufferPool.INSTANCE + : NoPool.INSTANCE; return new ZstdBytesDecompressor(zstdDecompressPool); case LZ4_RAW: return new Lz4RawBytesDecompressor(); @@ -539,8 +544,7 @@ static class ZstdBytesDecompressor extends BytesDecompressor { @Override public BytesInput decompress(BytesInput bytes, int decompressedSize) throws IOException { - try (ZstdInputStreamNoFinalizer zis = - new ZstdInputStreamNoFinalizer(bytes.toInputStream(), bufferPool)) { + try (ZstdInputStreamNoFinalizer zis = new ZstdInputStreamNoFinalizer(bytes.toInputStream(), bufferPool)) { byte[] output = new byte[decompressedSize]; int offset = 0; while (offset < decompressedSize) { @@ -561,8 +565,7 @@ public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, byte[] inputBytes = new byte[compressedSize]; input.get(inputBytes); ByteArrayInputStream bais = new ByteArrayInputStream(inputBytes); - try (ZstdInputStreamNoFinalizer zis = - new ZstdInputStreamNoFinalizer(bais, bufferPool)) { + try (ZstdInputStreamNoFinalizer zis = new ZstdInputStreamNoFinalizer(bais, bufferPool)) { byte[] outputBytes = new byte[decompressedSize]; int offset = 0; while (offset < decompressedSize) { @@ -591,8 +594,7 @@ public void release() { * buffer copies. */ static class Lz4RawBytesCompressor extends BytesCompressor { - private final io.airlift.compress.lz4.Lz4Compressor compressor = - new io.airlift.compress.lz4.Lz4Compressor(); + private final Lz4Compressor compressor = new Lz4Compressor(); private byte[] outputBuffer; @Override @@ -624,8 +626,7 @@ public void release() { * Decompresses using airlift's LZ4 decompressor directly with heap ByteBuffers. 
*/ static class Lz4RawBytesDecompressor extends BytesDecompressor { - private final io.airlift.compress.lz4.Lz4Decompressor decompressor = - new io.airlift.compress.lz4.Lz4Decompressor(); + private final Lz4Decompressor decompressor = new Lz4Decompressor(); @Override public BytesInput decompress(BytesInput bytes, int decompressedSize) throws IOException { @@ -660,10 +661,14 @@ public void release() {} /** Minimal 10-byte GZIP header: magic, method=8 (deflate), flags=0, mtime=0, xfl=0, os=0. */ private static final byte[] GZIP_HEADER = { - 0x1f, (byte) 0x8b, // magic + 0x1f, + (byte) 0x8b, // magic 0x08, // method: deflate 0x00, // flags: none - 0x00, 0x00, 0x00, 0x00, // mtime: not set + 0x00, + 0x00, + 0x00, + 0x00, // mtime: not set 0x00, // extra flags 0x00 // OS: FAT (matches Java's GZIPOutputStream default) }; @@ -748,28 +753,24 @@ static class GzipBytesDecompressor extends BytesDecompressor { private final CRC32 crc = new CRC32(); @Override - public BytesInput decompress(BytesInput bytes, int decompressedSize) - throws IOException { + public BytesInput decompress(BytesInput bytes, int decompressedSize) throws IOException { byte[] compressed = bytes.toByteArray(); int headerLen = readGzipHeaderLength(compressed); inflater.reset(); - inflater.setInput( - compressed, headerLen, compressed.length - headerLen - 8); + inflater.setInput(compressed, headerLen, compressed.length - headerLen - 8); byte[] output = new byte[decompressedSize]; try { int inflated = 0; while (inflated < decompressedSize) { - int n = inflater.inflate( - output, inflated, decompressedSize - inflated); + int n = inflater.inflate(output, inflated, decompressedSize - inflated); if (n == 0 && inflater.finished()) { break; } if (n == 0 && inflater.needsInput()) { throw new IOException( - "Unexpected end of GZIP stream at offset " - + inflated + " of " + decompressedSize); + "Unexpected end of GZIP stream at offset " + inflated + " of " + decompressedSize); } inflated += n; } @@ -795,13 +796,11 @@ 
public BytesInput decompress(BytesInput bytes, int decompressedSize) } @Override - public void decompress( - ByteBuffer input, int compressedSize, - ByteBuffer output, int decompressedSize) throws IOException { + public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int decompressedSize) + throws IOException { byte[] inputBytes = new byte[compressedSize]; input.get(inputBytes); - BytesInput result = decompress( - BytesInput.from(inputBytes), decompressedSize); + BytesInput result = decompress(BytesInput.from(inputBytes), decompressedSize); output.put(result.toByteArray()); } @@ -816,9 +815,7 @@ public void release() { * comment, and header CRC fields per RFC 1952. */ private static int readGzipHeaderLength(byte[] data) throws IOException { - if (data.length < 10 - || (data[0] & 0xFF) != 0x1f - || (data[1] & 0xFF) != 0x8b) { + if (data.length < 10 || (data[0] & 0xFF) != 0x1f || (data[1] & 0xFF) != 0x8b) { throw new IOException("Not a GZIP stream"); } int flags = data[3] & 0xFF; @@ -828,8 +825,7 @@ private static int readGzipHeaderLength(byte[] data) throws IOException { if (offset + 2 > data.length) { throw new IOException("Truncated GZIP FEXTRA"); } - int extraLen = (data[offset] & 0xFF) - | ((data[offset + 1] & 0xFF) << 8); + int extraLen = (data[offset] & 0xFF) | ((data[offset + 1] & 0xFF) << 8); offset += 2 + extraLen; } if ((flags & 0x08) != 0) { // FNAME diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java index 9664e9f735..d965609af1 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java @@ -467,8 +467,7 @@ void closeCompressor() { * direct ByteBuffers, avoiding the stream-based heap path. 
*/ private class Lz4RawCompressor extends BaseCompressor { - private final io.airlift.compress.lz4.Lz4Compressor compressor = - new io.airlift.compress.lz4.Lz4Compressor(); + private final io.airlift.compress.lz4.Lz4Compressor compressor = new io.airlift.compress.lz4.Lz4Compressor(); @Override public CompressionCodecName getCodecName() { diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java index 23f98c6fc2..d1c0e29568 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java @@ -256,9 +256,7 @@ public void compressionLevelGzip() throws IOException { long size_9 = compressor_9.compress(BytesInput.from(data)).size(); // Level 9 should produce smaller (or equal) output than level 1 - Assert.assertTrue( - "Expected level 9 (" + size_9 + ") <= level 1 (" + size_1 + ")", - size_9 <= size_1); + Assert.assertTrue("Expected level 9 (" + size_9 + ") <= level 1 (" + size_1 + ")", size_9 <= size_1); codecFactory_1.release(); codecFactory_9.release(); @@ -286,9 +284,7 @@ public void compressionLevelZstd() throws IOException { long size_19 = compressor_19.compress(BytesInput.from(data)).size(); // Level 19 should produce smaller (or equal) output than level 1 - Assert.assertTrue( - "Expected level 19 (" + size_19 + ") <= level 1 (" + size_1 + ")", - size_19 <= size_1); + Assert.assertTrue("Expected level 19 (" + size_19 + ") <= level 1 (" + size_1 + ")", size_19 <= size_1); codecFactory_1.release(); codecFactory_19.release(); From 05cda016333a10afabcd7e53b46bcbc87c6772e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Isma=C3=ABl=20Mej=C3=ADa?= Date: Wed, 13 May 2026 19:45:14 +0200 Subject: [PATCH 6/7] Add BROTLI to CompressionBenchmark codec parameter list - Add brotli-codec dependency to parquet-benchmarks (profile-gated, x86_64 
only) - Include BROTLI in @Param codec list alongside SNAPPY, ZSTD, LZ4_RAW, GZIP - Add jitpack.io repository for brotli-codec resolution --- parquet-benchmarks/pom.xml | 27 +++++++++++++++++++ .../benchmarks/CompressionBenchmark.java | 2 +- 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/parquet-benchmarks/pom.xml b/parquet-benchmarks/pom.xml index 3ce4dce5ce..19b4d4809b 100644 --- a/parquet-benchmarks/pom.xml +++ b/parquet-benchmarks/pom.xml @@ -89,6 +89,33 @@ + + + + non-aarch64 + + + !aarch64 + + + + + jitpack.io + https://jitpack.io + Jitpack.io repository + + + + + com.github.rdblue + brotli-codec + ${brotli-codec.version} + runtime + + + + + diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/CompressionBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/CompressionBenchmark.java index 9ff2884222..adb8edd4c8 100644 --- a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/CompressionBenchmark.java +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/CompressionBenchmark.java @@ -59,7 +59,7 @@ @State(Scope.Thread) public class CompressionBenchmark { - @Param({"SNAPPY", "ZSTD", "LZ4_RAW", "GZIP"}) + @Param({"SNAPPY", "ZSTD", "LZ4_RAW", "GZIP", "BROTLI"}) public String codec; @Param({"65536", "131072", "262144", "1048576"}) From 7ce2c12240a88277600622e566c8e5c0077eccee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Isma=C3=ABl=20Mej=C3=ADa?= Date: Wed, 13 May 2026 19:59:00 +0200 Subject: [PATCH 7/7] Add BROTLI direct bypass in DirectCodecFactory using jbrotli JNI Bypass the Hadoop BrotliCodec/stream wrapper for BROTLI compression and decompression by using org.meteogroup.jbrotli's native JNI bindings directly with ByteBuffer support via reflection (brotli-codec remains runtime scope). This eliminates intermediate buffer copies and the BrotliStreamCompressor state machine overhead. 
Changes: - DirectCodecFactory: Add BrotliDirectCompressor (quality=1, matching Hadoop default) and BrotliDirectDecompressor using one-shot jbrotli API via reflection - Load native library eagerly with graceful fallback to Hadoop codec path - CompressionBenchmark: Switch from heap CodecFactory to DirectCodecFactory to benchmark the actual production code path Results at 64KB page size: - Compress: 6,746 -> 9,662 ops/s (1.43x speedup) - Decompress: 2,534 -> 2,786 ops/s (1.10x speedup) --- .../benchmarks/CompressionBenchmark.java | 8 +- .../parquet/hadoop/DirectCodecFactory.java | 136 ++++++++++++++++++ 2 files changed, 141 insertions(+), 3 deletions(-) diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/CompressionBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/CompressionBenchmark.java index adb8edd4c8..11e9fe6d6a 100644 --- a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/CompressionBenchmark.java +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/CompressionBenchmark.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.bytes.DirectByteBufferAllocator; import org.apache.parquet.compression.CompressionCodecFactory; import org.apache.parquet.hadoop.CodecFactory; import org.apache.parquet.hadoop.metadata.CompressionCodecName; @@ -45,8 +46,9 @@ * *

Measures the performance of {@link CompressionCodecFactory.BytesInputCompressor} * and {@link CompressionCodecFactory.BytesInputDecompressor} for each supported codec, - * using the heap-based {@link CodecFactory} path. Input data is generated to approximate - * realistic Parquet page content (a mix of sequential, repeated, and random byte patterns). + * using the direct-memory {@link CodecFactory} path (same as actual Parquet file I/O). + * Input data is generated to approximate realistic Parquet page content (a mix of + * sequential, repeated, and random byte patterns). * *

This benchmark isolates the codec hot path from file I/O, encoding, and other * Parquet overhead, making it ideal for measuring compression-specific optimizations. @@ -79,7 +81,7 @@ public void setup() throws IOException { decompressedSize = uncompressedData.length; Configuration conf = new Configuration(); - factory = new CodecFactory(conf, pageSize); + factory = CodecFactory.createDirectCodecFactory(conf, DirectByteBufferAllocator.getInstance(), pageSize); CompressionCodecName codecName = CompressionCodecName.valueOf(codec); compressor = factory.getCompressor(codecName); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java index d965609af1..0971ee71dd 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java @@ -21,6 +21,7 @@ import com.github.luben.zstd.ZstdCompressCtx; import com.github.luben.zstd.ZstdDecompressCtx; import java.io.IOException; +import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.nio.ByteBuffer; @@ -61,6 +62,14 @@ class DirectCodecFactory extends CodecFactory implements AutoCloseable { private static final Method DECOMPRESS_METHOD; private static final Method CREATE_DIRECT_DECOMPRESSOR_METHOD; + // Brotli JNI bypass via reflection (brotli-codec is a runtime-only dependency) + private static final boolean BROTLI_NATIVE_AVAILABLE; + private static final Method BROTLI_DECOMPRESS_METHOD; // BrotliDeCompressor.deCompress(ByteBuffer, ByteBuffer) + private static final Method BROTLI_COMPRESS_METHOD; // BrotliCompressor.compress(Parameter, ByteBuffer, ByteBuffer) + private static final Constructor BROTLI_DECOMPRESSOR_CTOR; // BrotliDeCompressor() + private static final Constructor BROTLI_COMPRESSOR_CTOR; // BrotliCompressor() + private 
static final Object BROTLI_COMPRESS_PARAMETER; // Brotli.Parameter instance (quality=1) + static { Class tempClass = null; Method tempCreateMethod = null; @@ -76,6 +85,46 @@ class DirectCodecFactory extends CodecFactory implements AutoCloseable { DIRECT_DECOMPRESSION_CODEC_CLASS = tempClass; CREATE_DIRECT_DECOMPRESSOR_METHOD = tempCreateMethod; DECOMPRESS_METHOD = tempDecompressMethod; + + // Initialize Brotli JNI bypass via reflection + boolean brotliLoaded = false; + Method brotliDecompress = null; + Method brotliCompress = null; + Constructor brotliDecompressorCtor = null; + Constructor brotliCompressorCtor = null; + Object brotliParam = null; + try { + // Load native library + Class loaderClass = Class.forName("org.meteogroup.jbrotli.libloader.BrotliLibraryLoader"); + loaderClass.getMethod("loadBrotli").invoke(null); + + // BrotliDeCompressor: no-arg ctor + deCompress(ByteBuffer, ByteBuffer) -> int + Class decompClass = Class.forName("org.meteogroup.jbrotli.BrotliDeCompressor"); + brotliDecompressorCtor = decompClass.getConstructor(); + brotliDecompress = decompClass.getMethod("deCompress", ByteBuffer.class, ByteBuffer.class); + + // BrotliCompressor: no-arg ctor + compress(Parameter, ByteBuffer, ByteBuffer) -> int + Class compClass = Class.forName("org.meteogroup.jbrotli.BrotliCompressor"); + Class paramClass = Class.forName("org.meteogroup.jbrotli.Brotli$Parameter"); + Class modeClass = Class.forName("org.meteogroup.jbrotli.Brotli$Mode"); + brotliCompressorCtor = compClass.getConstructor(); + brotliCompress = compClass.getMethod("compress", paramClass, ByteBuffer.class, ByteBuffer.class); + + // Create Parameter(Mode.GENERIC, quality=1, lgwin=22, lgblock=0) + Object genericMode = modeClass.getField("GENERIC").get(null); + Constructor paramCtor = paramClass.getConstructor(modeClass, int.class, int.class, int.class); + brotliParam = paramCtor.newInstance(genericMode, 1, 22, 0); + + brotliLoaded = true; + } catch (Throwable t) { + LOG.debug("Brotli native 
library not available, falling back to Hadoop codec", t); + } + BROTLI_NATIVE_AVAILABLE = brotliLoaded; + BROTLI_DECOMPRESS_METHOD = brotliDecompress; + BROTLI_COMPRESS_METHOD = brotliCompress; + BROTLI_DECOMPRESSOR_CTOR = brotliDecompressorCtor; + BROTLI_COMPRESSOR_CTOR = brotliCompressorCtor; + BROTLI_COMPRESS_PARAMETER = brotliParam; } /** @@ -105,6 +154,11 @@ protected BytesCompressor createCompressor(final CompressionCodecName codecName) return new ZstdCompressor(); case LZ4_RAW: return new Lz4RawCompressor(); + case BROTLI: + if (BROTLI_NATIVE_AVAILABLE) { + return new BrotliDirectCompressor(); + } + return super.createCompressor(codecName); default: return super.createCompressor(codecName); } @@ -119,6 +173,11 @@ protected BytesDecompressor createDecompressor(final CompressionCodecName codecN return new ZstdDecompressor(); case LZ4_RAW: return new Lz4RawDecompressor(); + case BROTLI: + if (BROTLI_NATIVE_AVAILABLE) { + return new BrotliDirectDecompressor(); + } + // fall through to default Hadoop codec path case GZIP: case UNCOMPRESSED: return super.createDecompressor(codecName); @@ -491,6 +550,83 @@ void closeCompressor() { } } + /** + * Direct-memory Brotli decompressor using jbrotli's native JNI bindings via reflection, + * bypassing the Hadoop BrotliCodec/stream wrapper overhead.
+ */ + private class BrotliDirectDecompressor extends BaseDecompressor { + private final Object decompressor; + + BrotliDirectDecompressor() { + try { + this.decompressor = BROTLI_DECOMPRESSOR_CTOR.newInstance(); + } catch (ReflectiveOperationException e) { + throw new DirectCodecPool.ParquetCompressionCodecException("Failed to create Brotli decompressor", e); + } + } + + @Override + int decompress(ByteBuffer input, ByteBuffer output) throws IOException { + try { + return (int) BROTLI_DECOMPRESS_METHOD.invoke(decompressor, input, output); + } catch (InvocationTargetException e) { + throw new IOException("Brotli decompression failed", e.getCause()); + } catch (IllegalAccessException e) { + throw new IOException("Brotli decompression failed", e); + } + } + + @Override + void closeDecompressor() { + // no-op: BrotliDeCompressor has no resources to release + } + } + + /** + * Direct-memory Brotli compressor using jbrotli's native JNI bindings via reflection, + * bypassing the Hadoop BrotliCodec/stream wrapper overhead. + * Uses quality=1 by default (fast compression, matching Hadoop's BrotliCompressor default).
+ */ + private class BrotliDirectCompressor extends BaseCompressor { + private final Object compressor; + + BrotliDirectCompressor() { + try { + this.compressor = BROTLI_COMPRESSOR_CTOR.newInstance(); + } catch (ReflectiveOperationException e) { + throw new DirectCodecPool.ParquetCompressionCodecException("Failed to create Brotli compressor", e); + } + } + + @Override + public CompressionCodecName getCodecName() { + return CompressionCodecName.BROTLI; + } + + @Override + int maxCompressedSize(int size) { + // Brotli worst case: input size + (input size >> 2) + 1K overhead for small inputs + // This is a conservative upper bound matching the Brotli spec + return size + (size >> 2) + 1024; + } + + @Override + int compress(ByteBuffer input, ByteBuffer output) throws IOException { + try { + return (int) BROTLI_COMPRESS_METHOD.invoke(compressor, BROTLI_COMPRESS_PARAMETER, input, output); + } catch (InvocationTargetException e) { + throw new IOException("Brotli compression failed", e.getCause()); + } catch (IllegalAccessException e) { + throw new IOException("Brotli compression failed", e); + } + } + + @Override + void closeCompressor() { + // no-op: BrotliCompressor has no resources to release + } + } + /** * @deprecated Use {@link CodecFactory#NO_OP_COMPRESSOR} instead */