Measures the performance of {@link CompressionCodecFactory.BytesInputCompressor} + * and {@link CompressionCodecFactory.BytesInputDecompressor} for each supported codec, + * using the direct-memory {@link CodecFactory} path (same as actual Parquet file I/O). + * Input data is generated to approximate realistic Parquet page content (a mix of + * sequential, repeated, and random byte patterns). + * + *
This benchmark isolates the codec hot path from file I/O, encoding, and other + * Parquet overhead, making it ideal for measuring compression-specific optimizations. + */ +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(1) +@Warmup(iterations = 2, time = 1) +@Measurement(iterations = 3, time = 2) +@State(Scope.Thread) +public class CompressionBenchmark { + + @Param({"SNAPPY", "ZSTD", "LZ4_RAW", "GZIP", "BROTLI"}) + public String codec; + + @Param({"65536", "131072", "262144", "1048576"}) + public int pageSize; + + private byte[] uncompressedData; + private byte[] compressedData; + private int decompressedSize; + + private CompressionCodecFactory.BytesInputCompressor compressor; + private CompressionCodecFactory.BytesInputDecompressor decompressor; + private CodecFactory factory; + + @Setup(Level.Trial) + public void setup() throws IOException { + uncompressedData = generatePageData(pageSize, 42L); + decompressedSize = uncompressedData.length; + + Configuration conf = new Configuration(); + factory = CodecFactory.createDirectCodecFactory(conf, DirectByteBufferAllocator.getInstance(), pageSize); + CompressionCodecName codecName = CompressionCodecName.valueOf(codec); + + compressor = factory.getCompressor(codecName); + decompressor = factory.getDecompressor(codecName); + + // Pre-compress for decompression benchmark; copy to a stable byte array + // since the compressor may reuse its internal buffer. + BytesInput compressed = compressor.compress(BytesInput.from(uncompressedData)); + compressedData = compressed.toByteArray(); + } + + @TearDown(Level.Trial) + public void tearDown() { + factory.release(); + } + + @Benchmark + public BytesInput compress() throws IOException { + return compressor.compress(BytesInput.from(uncompressedData)); + } + + @Benchmark + public byte[] decompress() throws IOException { + // Force materialization of the decompressed data. Without this, codecs using + // the stream-based HeapBytesDecompressor (e.g. GZIP) would return a lazy + // StreamBytesInput, deferring the actual work. toByteArray() is essentially + // free for our optimized implementations (returns the existing byte[]). + return decompressor + .decompress(BytesInput.from(compressedData), decompressedSize) + .toByteArray(); + } + + /** + * Generates byte data that approximates realistic Parquet page content. + * Mixes sequential runs, repeated values, low-range random, and full random + * to produce a realistic compression ratio (~2-4x for fast codecs). 
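+   * Chunk lengths are drawn uniformly from 64-319 bytes, and the pattern type is chosen
+   * independently at random for each chunk.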
+ */ + static byte[] generatePageData(int size, long seed) { + Random random = new Random(seed); + byte[] data = new byte[size]; + int i = 0; + while (i < size) { + int patternType = random.nextInt(4); + int chunkSize = Math.min(random.nextInt(256) + 64, size - i); + switch (patternType) { + case 0: // Sequential bytes (highly compressible) + for (int j = 0; j < chunkSize && i < size; j++) { + data[i++] = (byte) (j & 0xFF); + } + break; + case 1: // Repeated value (highly compressible) + byte val = (byte) random.nextInt(256); + for (int j = 0; j < chunkSize && i < size; j++) { + data[i++] = val; + } + break; + case 2: // Small range random (moderately compressible) + for (int j = 0; j < chunkSize && i < size; j++) { + data[i++] = (byte) random.nextInt(16); + } + break; + case 3: // Full random (low compressibility) + byte[] randomChunk = new byte[chunkSize]; + random.nextBytes(randomChunk); + int toCopy = Math.min(chunkSize, size - i); + System.arraycopy(randomChunk, 0, data, i, toCopy); + i += toCopy; + break; + } + } + return data; + } +} diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java index 0e66140744..722b7e892f 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java @@ -643,6 +643,20 @@ public ByteBuffer toByteBuffer() throws IOException { return java.nio.ByteBuffer.wrap(in, offset, length); } + /** + * Zero-copy override: returns the backing array directly when fully used, + * skipping the base-class BAOS allocation + copy on every decompressor call. + * Returning the mutable array is safe — the base class already exposes a + * mutable {@code BAOS.getBuf()}. 
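+   * A defensive copy is made only when this input covers a sub-range of the backing array;
+   * callers that need an independent copy must make one themselves.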
+ */ + @Override + public byte[] toByteArray() { + if (offset == 0 && length == in.length) { + return in; + } + return Arrays.copyOfRange(in, offset, offset + length); + } + @Override public long size() { return length; diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java index 98b49835a6..28a40ed3b3 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java @@ -18,6 +18,14 @@ */ package org.apache.parquet.hadoop; +import com.github.luben.zstd.BufferPool; +import com.github.luben.zstd.NoPool; +import com.github.luben.zstd.RecyclingBufferPool; +import com.github.luben.zstd.ZstdInputStreamNoFinalizer; +import com.github.luben.zstd.ZstdOutputStreamNoFinalizer; +import io.airlift.compress.lz4.Lz4Compressor; +import io.airlift.compress.lz4.Lz4Decompressor; +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; @@ -26,6 +34,9 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.zip.CRC32; +import java.util.zip.Deflater; +import java.util.zip.Inflater; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.compress.CodecPool; import org.apache.hadoop.io.compress.CompressionCodec; @@ -43,6 +54,7 @@ import org.apache.parquet.hadoop.codec.ZstandardCodec; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.util.ConfigurationUtil; +import org.xerial.snappy.Snappy; public class CodecFactory implements CompressionCodecFactory { @@ -271,13 +283,58 @@ public BytesDecompressor getDecompressor(CompressionCodecName codecName) { } protected BytesCompressor createCompressor(CompressionCodecName codecName) { - CompressionCodec codec = getCodec(codecName); - return codec == null ? NO_OP_COMPRESSOR : new HeapBytesCompressor(codecName, codec); + switch (codecName) { + case UNCOMPRESSED: + return NO_OP_COMPRESSOR; + case SNAPPY: + return new SnappyBytesCompressor(); + case ZSTD: + BufferPool zstdCompressPool = conf.getBoolean( + ZstandardCodec.PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED, + ZstandardCodec.DEFAULT_PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED) + ? RecyclingBufferPool.INSTANCE + : NoPool.INSTANCE; + return new ZstdBytesCompressor( + conf.getInt( + ZstandardCodec.PARQUET_COMPRESS_ZSTD_LEVEL, + ZstandardCodec.DEFAULT_PARQUET_COMPRESS_ZSTD_LEVEL), + conf.getInt( + ZstandardCodec.PARQUET_COMPRESS_ZSTD_WORKERS, + ZstandardCodec.DEFAULTPARQUET_COMPRESS_ZSTD_WORKERS), + pageSize, + zstdCompressPool); + case LZ4_RAW: + return new Lz4RawBytesCompressor(); + case GZIP: + int gzipLevel = conf.getInt("zlib.compress.level", Deflater.DEFAULT_COMPRESSION); + return new GzipBytesCompressor(gzipLevel, pageSize); + default: + CompressionCodec codec = getCodec(codecName); + return codec == null ? NO_OP_COMPRESSOR : new HeapBytesCompressor(codecName, codec); + } } protected BytesDecompressor createDecompressor(CompressionCodecName codecName) { - CompressionCodec codec = getCodec(codecName); - return codec == null ? 
NO_OP_DECOMPRESSOR : new HeapBytesDecompressor(codec); + switch (codecName) { + case UNCOMPRESSED: + return NO_OP_DECOMPRESSOR; + case SNAPPY: + return new SnappyBytesDecompressor(); + case ZSTD: + BufferPool zstdDecompressPool = conf.getBoolean( + ZstandardCodec.PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED, + ZstandardCodec.DEFAULT_PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED) + ? RecyclingBufferPool.INSTANCE + : NoPool.INSTANCE; + return new ZstdBytesDecompressor(zstdDecompressPool); + case LZ4_RAW: + return new Lz4RawBytesDecompressor(); + case GZIP: + return new GzipBytesDecompressor(); + default: + CompressionCodec codec = getCodec(codecName); + return codec == null ? NO_OP_DECOMPRESSOR : new HeapBytesDecompressor(codec); + } } /** @@ -315,15 +372,9 @@ protected CompressionCodec getCodec(CompressionCodecName codecName) { private String cacheKey(CompressionCodecName codecName) { String level = null; switch (codecName) { - case GZIP: - level = conf.get("zlib.compress.level"); - break; case BROTLI: level = conf.get("compression.brotli.quality"); break; - case ZSTD: - level = conf.get("parquet.compression.codec.zstd.level"); - break; default: // compression level is not supported; ignore it } @@ -367,4 +418,447 @@ public abstract void decompress(ByteBuffer input, int compressedSize, ByteBuffer public abstract void release(); } + + // ---- Optimized Snappy compressor/decompressor using direct JNI calls ---- + + /** + * Compresses using Snappy's byte-array JNI API directly, bypassing the Hadoop + * stream abstraction. This avoids intermediate direct ByteBuffer copies and + * reduces the compression to a single native call per page. + */ + static class SnappyBytesCompressor extends BytesCompressor { + private byte[] outputBuffer; + + @Override + public BytesInput compress(BytesInput bytes) throws IOException { + byte[] input = bytes.toByteArray(); + int maxLen = Snappy.maxCompressedLength(input.length); + if (outputBuffer == null || outputBuffer.length < maxLen) { + outputBuffer = new byte[maxLen]; + } + int compressed = Snappy.compress(input, 0, input.length, outputBuffer, 0); + return BytesInput.from(outputBuffer, 0, compressed); + } + + @Override + public CompressionCodecName getCodecName() { + return CompressionCodecName.SNAPPY; + } + + @Override + public void release() { + outputBuffer = null; + } + } + + /** + * Decompresses using Snappy's byte-array JNI API directly. + */ + static class SnappyBytesDecompressor extends BytesDecompressor { + @Override + public BytesInput decompress(BytesInput bytes, int decompressedSize) throws IOException { + byte[] input = bytes.toByteArray(); + byte[] output = new byte[decompressedSize]; + Snappy.uncompress(input, 0, input.length, output, 0); + return BytesInput.from(output); + } + + @Override + public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int decompressedSize) + throws IOException { + byte[] inputBytes = new byte[compressedSize]; + input.get(inputBytes); + byte[] outputBytes = new byte[decompressedSize]; + Snappy.uncompress(inputBytes, 0, compressedSize, outputBytes, 0); + output.put(outputBytes); + } + + @Override + public void release() {} + } + + // ---- Optimized ZSTD compressor/decompressor using zstd-jni streaming API directly ---- + + /** + * Compresses using zstd-jni's {@link ZstdOutputStreamNoFinalizer} directly, + * bypassing the Hadoop codec framework ({@code ZstandardCodec}, {@code CodecPool}, + * {@code CompressionOutputStream} wrapper). 
Uses a configurable {@link BufferPool}
+   * (defaulting to {@link RecyclingBufferPool}) for the internal 128KB output buffer,
+   * matching the streaming API's natural buffer size. The buffer pool strategy is
+   * controlled by the {@code parquet.compression.codec.zstd.bufferPool.enabled} config.
+   * This avoids the overhead of Hadoop codec instantiation and compressor pool management
+   * while using the same underlying ZSTD streaming path, which is well-optimized for all
+   * input sizes including large pages (256KB+).
+   */
+  static class ZstdBytesCompressor extends BytesCompressor {
+    private final int level;
+    private final int workers;
+    private final BufferPool bufferPool;
+    private final ByteArrayOutputStream compressedOutBuffer;
+
+    ZstdBytesCompressor(int level, int workers, int pageSize, BufferPool bufferPool) {
+      this.level = level;
+      this.workers = workers;
+      this.bufferPool = bufferPool;
+      this.compressedOutBuffer = new ByteArrayOutputStream(pageSize);
+    }
+
+    @Override
+    public BytesInput compress(BytesInput bytes) throws IOException {
+      compressedOutBuffer.reset();
+      try (ZstdOutputStreamNoFinalizer zos =
+          new ZstdOutputStreamNoFinalizer(compressedOutBuffer, bufferPool, level)) {
+        if (workers > 0) {
+          zos.setWorkers(workers);
+        }
+        bytes.writeAllTo(zos);
+      }
+      return BytesInput.from(compressedOutBuffer);
+    }
+
+    @Override
+    public CompressionCodecName getCodecName() {
+      return CompressionCodecName.ZSTD;
+    }
+
+    @Override
+    public void release() {
+      // ByteArrayOutputStream does not hold native resources
+    }
+  }
+
+  /**
+   * Decompresses using zstd-jni's {@link ZstdInputStreamNoFinalizer} directly,
+   * bypassing the Hadoop codec framework. Uses a configurable {@link BufferPool}
+   * for internal buffers, matching the streaming decompression path. The buffer pool
+   * strategy is controlled by the {@code parquet.compression.codec.zstd.bufferPool.enabled}
+   * config. Reads the full decompressed output by looping over
+   * {@link InputStream#read(byte[], int, int)} until {@code decompressedSize} bytes have been read.
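+   * A new stream is opened for every page; when {@link RecyclingBufferPool} is enabled its
+   * internal buffers are reused across calls, so the per-page allocation cost stays small.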
+ */ + static class ZstdBytesDecompressor extends BytesDecompressor { + private final BufferPool bufferPool; + + ZstdBytesDecompressor(BufferPool bufferPool) { + this.bufferPool = bufferPool; + } + + @Override + public BytesInput decompress(BytesInput bytes, int decompressedSize) throws IOException { + try (ZstdInputStreamNoFinalizer zis = new ZstdInputStreamNoFinalizer(bytes.toInputStream(), bufferPool)) { + byte[] output = new byte[decompressedSize]; + int offset = 0; + while (offset < decompressedSize) { + int read = zis.read(output, offset, decompressedSize - offset); + if (read < 0) { + throw new IOException( + "Unexpected end of ZSTD stream at offset " + offset + " of " + decompressedSize); + } + offset += read; + } + return BytesInput.from(output); + } + } + + @Override + public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int decompressedSize) + throws IOException { + byte[] inputBytes = new byte[compressedSize]; + input.get(inputBytes); + ByteArrayInputStream bais = new ByteArrayInputStream(inputBytes); + try (ZstdInputStreamNoFinalizer zis = new ZstdInputStreamNoFinalizer(bais, bufferPool)) { + byte[] outputBytes = new byte[decompressedSize]; + int offset = 0; + while (offset < decompressedSize) { + int read = zis.read(outputBytes, offset, decompressedSize - offset); + if (read < 0) { + throw new IOException( + "Unexpected end of ZSTD stream at offset " + offset + " of " + decompressedSize); + } + offset += read; + } + output.put(outputBytes); + } + } + + @Override + public void release() { + // No persistent resources - streams are closed per call + } + } + + // ---- Optimized LZ4_RAW compressor/decompressor using airlift LZ4 directly ---- + + /** + * Compresses using airlift's LZ4 compressor directly with heap ByteBuffers, + * bypassing the Hadoop stream abstraction and NonBlockedCompressor's direct + * buffer copies. + */ + static class Lz4RawBytesCompressor extends BytesCompressor { + private final Lz4Compressor compressor = new Lz4Compressor(); + private byte[] outputBuffer; + + @Override + public BytesInput compress(BytesInput bytes) throws IOException { + byte[] input = bytes.toByteArray(); + int maxLen = compressor.maxCompressedLength(input.length); + if (outputBuffer == null || outputBuffer.length < maxLen) { + outputBuffer = new byte[maxLen]; + } + ByteBuffer inputBuf = ByteBuffer.wrap(input); + ByteBuffer outputBuf = ByteBuffer.wrap(outputBuffer); + compressor.compress(inputBuf, outputBuf); + int compressedSize = outputBuf.position(); + return BytesInput.from(outputBuffer, 0, compressedSize); + } + + @Override + public CompressionCodecName getCodecName() { + return CompressionCodecName.LZ4_RAW; + } + + @Override + public void release() { + outputBuffer = null; + } + } + + /** + * Decompresses using airlift's LZ4 decompressor directly with heap ByteBuffers. 
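+   * The LZ4 raw block format carries no decompressed-size header, so the output buffer is
+   * sized from the caller-supplied {@code decompressedSize}.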
+ */ + static class Lz4RawBytesDecompressor extends BytesDecompressor { + private final Lz4Decompressor decompressor = new Lz4Decompressor(); + + @Override + public BytesInput decompress(BytesInput bytes, int decompressedSize) throws IOException { + byte[] input = bytes.toByteArray(); + byte[] output = new byte[decompressedSize]; + ByteBuffer inputBuf = ByteBuffer.wrap(input); + ByteBuffer outputBuf = ByteBuffer.wrap(output); + decompressor.decompress(inputBuf, outputBuf); + return BytesInput.from(output); + } + + @Override + public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int decompressedSize) + throws IOException { + byte[] inputBytes = new byte[compressedSize]; + input.get(inputBytes); + byte[] outputBytes = new byte[decompressedSize]; + ByteBuffer inputBuf = ByteBuffer.wrap(inputBytes); + ByteBuffer outputBuf = ByteBuffer.wrap(outputBytes); + decompressor.decompress(inputBuf, outputBuf); + output.put(outputBytes); + } + + @Override + public void release() {} + } + + // ---- Optimized GZIP compressor/decompressor using Deflater/Inflater directly ---- + + /** GZIP magic number: 0x1f 0x8b. */ + private static final int GZIP_MAGIC = 0x8b1f; + + /** Minimal 10-byte GZIP header: magic, method=8 (deflate), flags=0, mtime=0, xfl=0, os=0. */ + private static final byte[] GZIP_HEADER = { + 0x1f, + (byte) 0x8b, // magic + 0x08, // method: deflate + 0x00, // flags: none + 0x00, + 0x00, + 0x00, + 0x00, // mtime: not set + 0x00, // extra flags + 0x00 // OS: FAT (matches Java's GZIPOutputStream default) + }; + + /** + * Compresses using {@link Deflater} directly with a reusable instance, + * bypassing Hadoop's GzipCodec and the stream overhead of + * {@link java.util.zip.GZIPOutputStream}. The Deflater is kept across + * calls and reset via {@link Deflater#reset()}, avoiding native zlib + * state allocation per page. Writes a minimal GZIP header and trailer + * (CRC32 + original size) manually. + * + *
Note: this implementation always uses Java's built-in {@link Deflater} + * (java.util.zip / JDK zlib). It does not use Hadoop native libraries, + * so hardware-accelerated compression via Intel ISA-L will not be used even if + * the native libraries are installed. The overhead reduction from bypassing the + * Hadoop codec framework typically outweighs the ISA-L advantage for the page + * sizes used by Parquet. + */ + static class GzipBytesCompressor extends BytesCompressor { + private final Deflater deflater; + private final CRC32 crc = new CRC32(); + private final ByteArrayOutputStream baos; + + GzipBytesCompressor(int level, int pageSize) { + this.deflater = new Deflater(level, true); + this.baos = new ByteArrayOutputStream(pageSize); + } + + @Override + public BytesInput compress(BytesInput bytes) throws IOException { + byte[] input = bytes.toByteArray(); + + deflater.reset(); + crc.reset(); + crc.update(input); + + baos.reset(); + // GZIP header + baos.write(GZIP_HEADER); + + // Deflate + deflater.setInput(input); + deflater.finish(); + byte[] buf = new byte[4096]; + while (!deflater.finished()) { + int n = deflater.deflate(buf); + baos.write(buf, 0, n); + } + + // GZIP trailer: CRC32 + original size (little-endian) + writeInt(baos, (int) crc.getValue()); + writeInt(baos, input.length); + + return BytesInput.from(baos); + } + + @Override + public CompressionCodecName getCodecName() { + return CompressionCodecName.GZIP; + } + + @Override + public void release() { + deflater.end(); + } + } + + /** + * Decompresses using {@link Inflater} directly with a reusable instance, + * bypassing Hadoop's GzipCodec and the stream overhead of + * {@link java.util.zip.GZIPInputStream}. Skips the GZIP header, inflates + * into the output buffer, and verifies the CRC32 + size trailer. + * + *
Note: this implementation always uses Java's built-in {@link Inflater} + * (java.util.zip / JDK zlib). It does not use Hadoop native libraries, + * so hardware-accelerated decompression via Intel ISA-L will not be used even if + * the native libraries are installed. + */ + static class GzipBytesDecompressor extends BytesDecompressor { + private final Inflater inflater = new Inflater(true); + private final CRC32 crc = new CRC32(); + + @Override + public BytesInput decompress(BytesInput bytes, int decompressedSize) throws IOException { + byte[] compressed = bytes.toByteArray(); + int headerLen = readGzipHeaderLength(compressed); + + inflater.reset(); + inflater.setInput(compressed, headerLen, compressed.length - headerLen - 8); + + byte[] output = new byte[decompressedSize]; + try { + int inflated = 0; + while (inflated < decompressedSize) { + int n = inflater.inflate(output, inflated, decompressedSize - inflated); + if (n == 0 && inflater.finished()) { + break; + } + if (n == 0 && inflater.needsInput()) { + throw new IOException( + "Unexpected end of GZIP stream at offset " + inflated + " of " + decompressedSize); + } + inflated += n; + } + } catch (java.util.zip.DataFormatException e) { + throw new IOException("Invalid GZIP data", e); + } + + // Verify CRC32 and original size from trailer + int trailerOffset = compressed.length - 8; + int expectedCrc = readInt(compressed, trailerOffset); + int expectedSize = readInt(compressed, trailerOffset + 4); + + crc.reset(); + crc.update(output); + if ((int) crc.getValue() != expectedCrc) { + throw new IOException("GZIP CRC32 mismatch"); + } + if (decompressedSize != (expectedSize & 0xFFFFFFFFL)) { + throw new IOException("GZIP size mismatch"); + } + + return BytesInput.from(output); + } + + @Override + public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int decompressedSize) + throws IOException { + byte[] inputBytes = new byte[compressedSize]; + input.get(inputBytes); + BytesInput result = decompress(BytesInput.from(inputBytes), decompressedSize); + output.put(result.toByteArray()); + } + + @Override + public void release() { + inflater.end(); + } + } + + /** + * Reads the length of a GZIP header, handling optional extra, name, + * comment, and header CRC fields per RFC 1952. + */ + private static int readGzipHeaderLength(byte[] data) throws IOException { + if (data.length < 10 || (data[0] & 0xFF) != 0x1f || (data[1] & 0xFF) != 0x8b) { + throw new IOException("Not a GZIP stream"); + } + int flags = data[3] & 0xFF; + int offset = 10; + + if ((flags & 0x04) != 0) { // FEXTRA + if (offset + 2 > data.length) { + throw new IOException("Truncated GZIP FEXTRA"); + } + int extraLen = (data[offset] & 0xFF) | ((data[offset + 1] & 0xFF) << 8); + offset += 2 + extraLen; + } + if ((flags & 0x08) != 0) { // FNAME + while (offset < data.length && data[offset] != 0) { + offset++; + } + offset++; // skip null terminator + } + if ((flags & 0x10) != 0) { // FCOMMENT + while (offset < data.length && data[offset] != 0) { + offset++; + } + offset++; // skip null terminator + } + if ((flags & 0x02) != 0) { // FHCRC + offset += 2; + } + return offset; + } + + /** Writes a 32-bit integer in little-endian byte order. */ + private static void writeInt(ByteArrayOutputStream out, int value) { + out.write(value & 0xFF); + out.write((value >> 8) & 0xFF); + out.write((value >> 16) & 0xFF); + out.write((value >> 24) & 0xFF); + } + + /** Reads a 32-bit little-endian integer from a byte array. 
*/
+  private static int readInt(byte[] data, int offset) {
+    return (data[offset] & 0xFF)
+        | ((data[offset + 1] & 0xFF) << 8)
+        | ((data[offset + 2] & 0xFF) << 16)
+        | ((data[offset + 3] & 0xFF) << 24);
+  }
 }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java
index b2b5233eeb..0971ee71dd 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java
@@ -21,6 +21,7 @@
 import com.github.luben.zstd.ZstdCompressCtx;
 import com.github.luben.zstd.ZstdDecompressCtx;
 import java.io.IOException;
+import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
@@ -61,6 +62,14 @@ class DirectCodecFactory extends CodecFactory implements AutoCloseable {
   private static final Method DECOMPRESS_METHOD;
   private static final Method CREATE_DIRECT_DECOMPRESSOR_METHOD;
 
+  // Brotli JNI bypass via reflection (brotli-codec is a runtime-only dependency)
+  private static final boolean BROTLI_NATIVE_AVAILABLE;
+  private static final Method BROTLI_DECOMPRESS_METHOD; // BrotliDeCompressor.deCompress(ByteBuffer, ByteBuffer)
+  private static final Method BROTLI_COMPRESS_METHOD; // BrotliCompressor.compress(Parameter, ByteBuffer, ByteBuffer)
+  private static final Constructor<?> BROTLI_DECOMPRESSOR_CTOR; // BrotliDeCompressor()
+  private static final Constructor<?> BROTLI_COMPRESSOR_CTOR; // BrotliCompressor()
+  private static final Object BROTLI_COMPRESS_PARAMETER; // Brotli.Parameter instance (quality=1)
+
   static {
     Class<?> tempClass = null;
     Method tempCreateMethod = null;
@@ -76,6 +85,46 @@ class DirectCodecFactory extends CodecFactory implements AutoCloseable {
     DIRECT_DECOMPRESSION_CODEC_CLASS = tempClass;
     CREATE_DIRECT_DECOMPRESSOR_METHOD = tempCreateMethod;
     DECOMPRESS_METHOD = tempDecompressMethod;
+
+    // Initialize Brotli JNI bypass via reflection
+    boolean brotliLoaded = false;
+    Method brotliDecompress = null;
+    Method brotliCompress = null;
+    Constructor<?> brotliDecompressorCtor = null;
+    Constructor<?> brotliCompressorCtor = null;
+    Object brotliParam = null;
+    try {
+      // Load native library
+      Class<?> loaderClass = Class.forName("org.meteogroup.jbrotli.libloader.BrotliLibraryLoader");
+      loaderClass.getMethod("loadBrotli").invoke(null);
+
+      // BrotliDeCompressor: no-arg ctor + deCompress(ByteBuffer, ByteBuffer) -> int
+      Class<?> decompClass = Class.forName("org.meteogroup.jbrotli.BrotliDeCompressor");
+      brotliDecompressorCtor = decompClass.getConstructor();
+      brotliDecompress = decompClass.getMethod("deCompress", ByteBuffer.class, ByteBuffer.class);
+
+      // BrotliCompressor: no-arg ctor + compress(Parameter, ByteBuffer, ByteBuffer) -> int
+      Class<?> compClass = Class.forName("org.meteogroup.jbrotli.BrotliCompressor");
+      Class<?> paramClass = Class.forName("org.meteogroup.jbrotli.Brotli$Parameter");
+      Class<?> modeClass = Class.forName("org.meteogroup.jbrotli.Brotli$Mode");
+      brotliCompressorCtor = compClass.getConstructor();
+      brotliCompress = compClass.getMethod("compress", paramClass, ByteBuffer.class, ByteBuffer.class);
+
+      // Create Parameter(Mode.GENERIC, quality=1, lgwin=22, lgblock=0)
+      Object genericMode = modeClass.getField("GENERIC").get(null);
+      Constructor<?> paramCtor = paramClass.getConstructor(modeClass, int.class, int.class, int.class);
+      brotliParam = paramCtor.newInstance(genericMode, 1, 22, 0);
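+      // quality=1 favors speed over ratio (matching Hadoop's BrotliCompressor default);
+      // lgwin=22 selects a 2^22-byte (4 MiB) sliding window and lgblock=0 lets Brotli pick the block size.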
+ + brotliLoaded = true; + } catch (Throwable t) { + LOG.debug("Brotli native library not available, falling back to Hadoop codec", t); + } + BROTLI_NATIVE_AVAILABLE = brotliLoaded; + BROTLI_DECOMPRESS_METHOD = brotliDecompress; + BROTLI_COMPRESS_METHOD = brotliCompress; + BROTLI_DECOMPRESSOR_CTOR = brotliDecompressorCtor; + BROTLI_COMPRESSOR_CTOR = brotliCompressorCtor; + BROTLI_COMPRESS_PARAMETER = brotliParam; } /** @@ -103,8 +152,13 @@ protected BytesCompressor createCompressor(final CompressionCodecName codecName) return new SnappyCompressor(); case ZSTD: return new ZstdCompressor(); - // todo: create class similar to the SnappyCompressor for zlib and exclude it as - // snappy is above since it also generates allocateDirect calls. + case LZ4_RAW: + return new Lz4RawCompressor(); + case BROTLI: + if (BROTLI_NATIVE_AVAILABLE) { + return new BrotliDirectCompressor(); + } + return super.createCompressor(codecName); default: return super.createCompressor(codecName); } @@ -117,6 +171,16 @@ protected BytesDecompressor createDecompressor(final CompressionCodecName codecN return new SnappyDecompressor(); case ZSTD: return new ZstdDecompressor(); + case LZ4_RAW: + return new Lz4RawDecompressor(); + case BROTLI: + if (BROTLI_NATIVE_AVAILABLE) { + return new BrotliDirectDecompressor(); + } + // fall through to default Hadoop codec path + case GZIP: + case UNCOMPRESSED: + return super.createDecompressor(codecName); default: CompressionCodec codec = getCodec(codecName); if (codec == null) { @@ -405,6 +469,26 @@ void closeDecompressor() { } } + /** + * Direct-memory LZ4_RAW decompressor using airlift's LZ4 decompressor with + * direct ByteBuffers, avoiding reflection-based {@link FullDirectDecompressor}. + */ + private class Lz4RawDecompressor extends BaseDecompressor { + private final io.airlift.compress.lz4.Lz4Decompressor decompressor = + new io.airlift.compress.lz4.Lz4Decompressor(); + + @Override + int decompress(ByteBuffer input, ByteBuffer output) { + decompressor.decompress(input, output); + return output.position(); + } + + @Override + void closeDecompressor() { + // no-op + } + } + private class ZstdCompressor extends BaseCompressor { private final ZstdCompressCtx context; @@ -437,6 +521,112 @@ void closeCompressor() { } } + /** + * Direct-memory LZ4_RAW compressor using airlift's LZ4 compressor with + * direct ByteBuffers, avoiding the stream-based heap path. + */ + private class Lz4RawCompressor extends BaseCompressor { + private final io.airlift.compress.lz4.Lz4Compressor compressor = new io.airlift.compress.lz4.Lz4Compressor(); + + @Override + public CompressionCodecName getCodecName() { + return CompressionCodecName.LZ4_RAW; + } + + @Override + int maxCompressedSize(int size) { + return compressor.maxCompressedLength(size); + } + + @Override + int compress(ByteBuffer input, ByteBuffer output) { + compressor.compress(input, output); + return output.position(); + } + + @Override + void closeCompressor() { + // no-op + } + } + + /** + * Direct-memory Brotli decompressor using jbrotli's native JNI bindings via reflection, + * bypassing the Hadoop BrotliCodec/stream wrapper overhead. 
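+   * Only instantiated when the jbrotli native library was loaded successfully during class
+   * initialization; otherwise the factory falls back to the Hadoop Brotli codec path.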
+ */ + private class BrotliDirectDecompressor extends BaseDecompressor { + private final Object decompressor; + + BrotliDirectDecompressor() { + try { + this.decompressor = BROTLI_DECOMPRESSOR_CTOR.newInstance(); + } catch (ReflectiveOperationException e) { + throw new DirectCodecPool.ParquetCompressionCodecException("Failed to create Brotli decompressor", e); + } + } + + @Override + int decompress(ByteBuffer input, ByteBuffer output) throws IOException { + try { + return (int) BROTLI_DECOMPRESS_METHOD.invoke(decompressor, input, output); + } catch (InvocationTargetException e) { + throw new IOException("Brotli decompression failed", e.getCause()); + } catch (IllegalAccessException e) { + throw new IOException("Brotli decompression failed", e); + } + } + + @Override + void closeDecompressor() { + // no-op: BrotliDeCompressor has no resources to release + } + } + + /** + * Direct-memory Brotli compressor using jbrotli's native JNI bindings via reflection, + * bypassing the Hadoop BrotliCodec/stream wrapper overhead. + * Uses quality=1 by default (fast compression, matching Hadoop's BrotliCompressor default). + */ + private class BrotliDirectCompressor extends BaseCompressor { + private final Object compressor; + + BrotliDirectCompressor() { + try { + this.compressor = BROTLI_COMPRESSOR_CTOR.newInstance(); + } catch (ReflectiveOperationException e) { + throw new DirectCodecPool.ParquetCompressionCodecException("Failed to create Brotli compressor", e); + } + } + + @Override + public CompressionCodecName getCodecName() { + return CompressionCodecName.BROTLI; + } + + @Override + int maxCompressedSize(int size) { + // Brotli worst case: input size + (input size >> 2) + 1K overhead for small inputs + // This is a conservative upper bound matching the Brotli spec + return size + (size >> 2) + 1024; + } + + @Override + int compress(ByteBuffer input, ByteBuffer output) throws IOException { + try { + return (int) BROTLI_COMPRESS_METHOD.invoke(compressor, BROTLI_COMPRESS_PARAMETER, input, output); + } catch (InvocationTargetException e) { + throw new IOException("Brotli compression failed", e.getCause()); + } catch (IllegalAccessException e) { + throw new IOException("Brotli compression failed", e); + } + } + + @Override + void closeCompressor() { + // no-op: BrotliCompressor has no resources to release + } + } + /** * @deprecated Use {@link CodecFactory#NO_OP_COMPRESSOR} instead */ diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java index c78ee09ecc..d1c0e29568 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java @@ -28,7 +28,6 @@ import java.util.Random; import java.util.Set; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.ByteBufferReleaser; import org.apache.parquet.bytes.BytesInput; @@ -81,11 +80,10 @@ private void test(int size, CompressionCodecName codec, boolean useOnHeapCompres final BytesInputDecompressor heapDecompressor = heapCodecFactory.getDecompressor(codec); if (codec == LZ4_RAW) { - // Hadoop codecs support direct decompressors only if the related native libraries are available. - // This is not the case for our CI so let's rely on LZ4_RAW where the implementation is our own. 
-      Assert.assertTrue(
+      // LZ4_RAW should use a direct decompression path, not the heap-copy IndirectDecompressor.
+      Assert.assertFalse(
           String.format("The hadoop codec %s should support direct decompression", codec),
-          directDecompressor instanceof DirectCodecFactory.FullDirectDecompressor);
+          directDecompressor instanceof DirectCodecFactory.IndirectDecompressor);
     }
 
     final BytesInput directCompressed;
@@ -236,53 +234,65 @@ public void compressionCodecs() {
     }
   }
 
-  static class PublicCodecFactory extends CodecFactory {
-    // To make getCodec public
+  @Test
+  public void compressionLevelGzip() throws IOException {
+    Configuration config_zlib_1 = new Configuration();
+    config_zlib_1.set("zlib.compress.level", "1");
 
-    public PublicCodecFactory(Configuration configuration, int pageSize) {
-      super(configuration, pageSize);
-    }
+    Configuration config_zlib_9 = new Configuration();
+    config_zlib_9.set("zlib.compress.level", "9");
 
-    public org.apache.hadoop.io.compress.CompressionCodec getCodec(CompressionCodecName name) {
-      return super.getCodec(name);
-    }
-  }
+    // Generate compressible, low-entropy data so different levels produce different sizes
+    byte[] data = new byte[64 * 1024];
+    Random random = new Random(42);
+    for (int i = 0; i < data.length; i++) {
+      data[i] = (byte) random.nextInt(16);
+    }
 
-  @Test
-  public void cachingKeysGzip() {
-    Configuration config_zlib_2 = new Configuration();
-    config_zlib_2.set("zlib.compress.level", "2");
+    final CodecFactory codecFactory_1 = new CodecFactory(config_zlib_1, pageSize);
+    final CodecFactory codecFactory_9 = new CodecFactory(config_zlib_9, pageSize);
 
-    Configuration config_zlib_5 = new Configuration();
-    config_zlib_5.set("zlib.compress.level", "5");
+    BytesInputCompressor compressor_1 = codecFactory_1.getCompressor(CompressionCodecName.GZIP);
+    BytesInputCompressor compressor_9 = codecFactory_9.getCompressor(CompressionCodecName.GZIP);
 
-    final CodecFactory codecFactory_2 = new PublicCodecFactory(config_zlib_2, pageSize);
-    final CodecFactory codecFactory_5 = new PublicCodecFactory(config_zlib_5, pageSize);
+    long size_1 = compressor_1.compress(BytesInput.from(data)).size();
+    long size_9 = compressor_9.compress(BytesInput.from(data)).size();
 
-    CompressionCodec codec_2_1 = codecFactory_2.getCodec(CompressionCodecName.GZIP);
-    CompressionCodec codec_2_2 = codecFactory_2.getCodec(CompressionCodecName.GZIP);
-    CompressionCodec codec_5_1 = codecFactory_5.getCodec(CompressionCodecName.GZIP);
+    // Level 9 should produce smaller (or equal) output than level 1
+    Assert.assertTrue("Expected level 9 (" + size_9 + ") <= level 1 (" + size_1 + ")", size_9 <= size_1);
 
-    Assert.assertEquals(codec_2_1, codec_2_2);
-    Assert.assertNotEquals(codec_2_1, codec_5_1);
+    codecFactory_1.release();
+    codecFactory_9.release();
   }
 
   @Test
-  public void cachingKeysZstd() {
-    Configuration config_zstd_2 = new Configuration();
-    config_zstd_2.set("parquet.compression.codec.zstd.level", "2");
+  public void compressionLevelZstd() throws IOException {
+    Configuration config_zstd_1 = new Configuration();
+    config_zstd_1.set("parquet.compression.codec.zstd.level", "1");
+
+    Configuration config_zstd_19 = new Configuration();
+    config_zstd_19.set("parquet.compression.codec.zstd.level", "19");
+
+    // Generate compressible, low-entropy data so different levels produce different sizes
+    byte[] data = new byte[64 * 1024];
+    Random random = new Random(42);
+    for (int i = 0; i < data.length; i++) {
+      data[i] = (byte) random.nextInt(16);
+    }
+
+    final CodecFactory codecFactory_1 = new CodecFactory(config_zstd_1, pageSize);
+    final CodecFactory codecFactory_19 = new CodecFactory(config_zstd_19, pageSize);
 
-    Configuration config_zstd_5 = new Configuration();
-    config_zstd_5.set("parquet.compression.codec.zstd.level", "5");
+    BytesInputCompressor compressor_1 = codecFactory_1.getCompressor(CompressionCodecName.ZSTD);
+    BytesInputCompressor compressor_19 = codecFactory_19.getCompressor(CompressionCodecName.ZSTD);
 
-    final CodecFactory codecFactory_2 = new PublicCodecFactory(config_zstd_2, pageSize);
-    final CodecFactory codecFactory_5 = new PublicCodecFactory(config_zstd_5, pageSize);
+    long size_1 = compressor_1.compress(BytesInput.from(data)).size();
+    long size_19 = compressor_19.compress(BytesInput.from(data)).size();
 
-    CompressionCodec codec_2_1 = codecFactory_2.getCodec(CompressionCodecName.ZSTD);
-    CompressionCodec codec_2_2 = codecFactory_2.getCodec(CompressionCodecName.ZSTD);
-    CompressionCodec codec_5_1 = codecFactory_5.getCodec(CompressionCodecName.ZSTD);
+    // Level 19 should produce smaller (or equal) output than level 1
+    Assert.assertTrue("Expected level 19 (" + size_19 + ") <= level 1 (" + size_1 + ")", size_19 <= size_1);
 
-    Assert.assertEquals(codec_2_1, codec_2_2);
-    Assert.assertNotEquals(codec_2_1, codec_5_1);
+    codecFactory_1.release();
+    codecFactory_19.release();
   }
 }