The writer's data bytes (the RLE-encoded indices) and the dictionary + * page are returned separately so both pieces can be measured or fed to a + * decoder symmetrically. The dictionary page buffer is copied so it remains + * valid after the writer's allocator is released. + * + *
The writer is closed via {@code toDictPageAndClose()}; callers must not + * call {@link DictionaryValuesWriter#close()} again afterwards. + */ + static EncodedDictionary drainDictionary(DictionaryValuesWriter writer) throws IOException { + byte[] dictData = writer.getBytes().toByteArray(); + DictionaryPage rawPage = writer.toDictPageAndClose(); + DictionaryPage dictPage = rawPage == null ? null : rawPage.copy(); + return new EncodedDictionary(dictData, dictPage); + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BlackHoleOutputFile.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BlackHoleOutputFile.java new file mode 100644 index 0000000000..690ddc2bbe --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BlackHoleOutputFile.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.benchmarks; + +import java.io.IOException; +import org.apache.parquet.io.OutputFile; +import org.apache.parquet.io.PositionOutputStream; + +/** + * A no-op {@link OutputFile} that discards all written data. + * Useful for isolating CPU/encoding cost from filesystem I/O in write benchmarks. 
+ */ +public final class BlackHoleOutputFile implements OutputFile { + + public static final BlackHoleOutputFile INSTANCE = new BlackHoleOutputFile(); + + private BlackHoleOutputFile() {} + + @Override + public boolean supportsBlockSize() { + return false; + } + + @Override + public long defaultBlockSize() { + return -1L; + } + + @Override + public PositionOutputStream createOrOverwrite(long blockSizeHint) { + return create(blockSizeHint); + } + + @Override + public PositionOutputStream create(long blockSizeHint) { + return new PositionOutputStream() { + private long pos; + + @Override + public long getPos() throws IOException { + return pos; + } + + @Override + public void write(int b) throws IOException { + ++pos; + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + pos += len; + } + }; + } + + @Override + public String getPath() { + return "/dev/null"; + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/FileReadBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/FileReadBenchmark.java new file mode 100644 index 0000000000..de133f4607 --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/FileReadBenchmark.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.benchmarks; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import org.apache.parquet.column.ParquetProperties.WriterVersion; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.api.ReadSupport; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.example.GroupReadSupport; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.io.LocalInputFile; +import org.apache.parquet.io.LocalOutputFile; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * File-level read benchmarks measuring end-to-end Parquet read throughput through the + * example {@link Group} API. A temporary file is generated once during setup from + * pre-generated rows using {@link LocalOutputFile}, then read repeatedly during the + * benchmark. + * + *
Parameterized across compression codec and writer version. The footer parse + * (via {@link LocalInputFile} open) is included in the timed section so the result + * reflects the full open-and-read cost a typical caller would observe. + * + *
{@link Mode#SingleShotTime} is used because each invocation does enough work
+ * (a full read of {@value TestDataFactory#DEFAULT_ROW_COUNT} rows) that JIT
+ * amortization across invocations is unnecessary. Ten measurement iterations
+ * provide stable statistics for SS mode.
+ */
+@BenchmarkMode(Mode.SingleShotTime)
+@Fork(1)
+@Warmup(iterations = 5, batchSize = 1)
+@Measurement(iterations = 10, batchSize = 1)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@State(Scope.Benchmark)
+public class FileReadBenchmark {
+
+ @Param({"UNCOMPRESSED", "SNAPPY", "ZSTD", "GZIP"})
+ public String codec;
+
+ @Param({"PARQUET_1_0", "PARQUET_2_0"})
+ public String writerVersion;
+
+ private File tempFile;
+
+ @Setup(Level.Trial)
+ public void setup() throws IOException {
+ tempFile = File.createTempFile("parquet-read-bench-", ".parquet");
+ tempFile.deleteOnExit();
+ tempFile.delete(); // remove so the writer can create it
+
+ Group[] rows = TestDataFactory.generateRows(
+ TestDataFactory.newGroupFactory(), TestDataFactory.DEFAULT_ROW_COUNT, TestDataFactory.DEFAULT_SEED);
+ try (ParquetWriter Writes are sent to a {@link BlackHoleOutputFile} to isolate CPU and encoding cost
+ * from filesystem I/O. Parameterized across compression codec, writer version, and
+ * dictionary encoding.
+ *
+ * {@link Mode#SingleShotTime} is used because each invocation does enough work
+ * (a full write of {@value TestDataFactory#DEFAULT_ROW_COUNT} rows) that JIT
+ * amortization across invocations is unnecessary. Ten measurement iterations
+ * provide stable statistics for SS mode.
+ */
+@BenchmarkMode(Mode.SingleShotTime)
+@Fork(1)
+@Warmup(iterations = 5, batchSize = 1)
+@Measurement(iterations = 10, batchSize = 1)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@State(Scope.Benchmark)
+public class FileWriteBenchmark {
+
+ @Param({"UNCOMPRESSED", "SNAPPY", "ZSTD", "GZIP"})
+ public String codec;
+
+ @Param({"PARQUET_1_0", "PARQUET_2_0"})
+ public String writerVersion;
+
+ @Param({"true", "false"})
+ public String dictionary;
+
  /** Rows generated once per trial and reused by every write invocation. */
  private Group[] rows;

  @Setup(Level.Trial)
  public void setup() {
    // Deterministic input: same factory, row count, and seed on every fork so
    // all codec/version/dictionary parameter combinations write identical data.
    rows = TestDataFactory.generateRows(
        TestDataFactory.newGroupFactory(), TestDataFactory.DEFAULT_ROW_COUNT, TestDataFactory.DEFAULT_SEED);
  }
+
+ @Benchmark
+ public void writeFile() throws IOException {
+ try (ParquetWriter Each benchmark invocation processes {@value #VALUE_COUNT} values; throughput is
+ * reported per-value via {@link OperationsPerInvocation}.
+ *
+ * The {@code fixedLength} parameter exercises key FLBA sizes:
+ * The {@code dataPattern} parameter controls cardinality:
+ * Per-invocation overhead (decoder construction and {@link ByteBufferInputStream}
+ * wrapping) is amortized over {@value #VALUE_COUNT} reads via
+ * {@link OperationsPerInvocation}.
+ */
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
@Fork(1)
@Warmup(iterations = 3, time = 1)
@Measurement(iterations = 5, time = 1)
@State(Scope.Thread)
public class RleDictionaryIndexDecodingBenchmark {

  // Number of dictionary ids encoded/decoded per benchmark invocation.
  static final int VALUE_COUNT = 100_000;
  private static final int INIT_SLAB_SIZE = 64 * 1024;
  private static final int PAGE_SIZE = 1024 * 1024;
  // Every id fits in BIT_WIDTH bits: generated values are in [0, MAX_ID).
  private static final int BIT_WIDTH = 10;
  private static final int MAX_ID = 1 << BIT_WIDTH;

  static {
    // Fail fast at class load if the shared low-cardinality constant would
    // produce ids that cannot be represented with BIT_WIDTH bits.
    if (TestDataFactory.LOW_CARDINALITY_DISTINCT > MAX_ID) {
      throw new IllegalStateException("LOW_CARDINALITY_DISTINCT (" + TestDataFactory.LOW_CARDINALITY_DISTINCT
          + ") must fit within BIT_WIDTH=" + BIT_WIDTH + " (MAX_ID=" + MAX_ID + ")");
    }
  }

  @Param({"SEQUENTIAL", "RANDOM", "LOW_CARDINALITY"})
  public String indexPattern;

  // Raw RLE/bit-packed bytes produced once in setup() and decoded repeatedly.
  private byte[] encoded;

  // encoded with 4-byte LE length prefix, as expected by ValuesReader.initFromPage()
  private byte[] encodedWithLengthPrefix;

  // The plaintext ids; re-encoded by encodeDictionaryIds().
  private int[] ids;

  /**
   * Generates the id sequence for the chosen pattern and encodes it once, so
   * the decode benchmarks measure decoding cost only.
   */
  @Setup(Level.Trial)
  public void setup() throws IOException {
    ids = generateDictionaryIds();
    try (RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(
        BIT_WIDTH, INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator())) {
      for (int id : ids) {
        encoder.writeInt(id);
      }
      encoded = encoder.toBytes().toByteArray();
    }

    // Prepend 4-byte LE length for ValuesReader.initFromPage() format
    encodedWithLengthPrefix = new byte[4 + encoded.length];
    ByteBuffer.wrap(encodedWithLengthPrefix).order(ByteOrder.LITTLE_ENDIAN).putInt(encoded.length);
    System.arraycopy(encoded, 0, encodedWithLengthPrefix, 4, encoded.length);
  }

  /**
   * Produces the dictionary-id array for the current {@code indexPattern}:
   * SEQUENTIAL wraps around at MAX_ID, RANDOM draws uniformly from
   * [0, MAX_ID), LOW_CARDINALITY draws from the shared small distinct set.
   */
  private int[] generateDictionaryIds() {
    switch (indexPattern) {
      case "SEQUENTIAL":
        int[] sequential = new int[VALUE_COUNT];
        for (int i = 0; i < VALUE_COUNT; i++) {
          sequential[i] = i % MAX_ID;
        }
        return sequential;
      case "RANDOM":
        return TestDataFactory.generateLowCardinalityInts(VALUE_COUNT, MAX_ID, TestDataFactory.DEFAULT_SEED);
      case "LOW_CARDINALITY":
        return TestDataFactory.generateLowCardinalityInts(
            VALUE_COUNT, TestDataFactory.LOW_CARDINALITY_DISTINCT, TestDataFactory.DEFAULT_SEED);
      default:
        throw new IllegalArgumentException("Unknown index pattern: " + indexPattern);
    }
  }

  /** Measures RLE/bit-packed encoding throughput for the id sequence. */
  @Benchmark
  @OperationsPerInvocation(VALUE_COUNT)
  public byte[] encodeDictionaryIds() throws IOException {
    try (RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(
        BIT_WIDTH, INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator())) {
      for (int id : ids) {
        encoder.writeInt(id);
      }
      return encoder.toBytes().toByteArray();
    }
  }

  /** Decodes one value at a time through the low-level decoder. */
  @Benchmark
  @OperationsPerInvocation(VALUE_COUNT)
  public void decodeDictionaryIds(Blackhole bh) throws IOException {
    RunLengthBitPackingHybridDecoder decoder =
        new RunLengthBitPackingHybridDecoder(BIT_WIDTH, new ByteArrayInputStream(encoded));
    for (int i = 0; i < VALUE_COUNT; i++) {
      bh.consume(decoder.readInt());
    }
  }

  /** Decodes all values in a single bulk readInts() call on the low-level decoder. */
  @Benchmark
  @OperationsPerInvocation(VALUE_COUNT)
  public int[] decodeDictionaryIdsBatch() throws IOException {
    RunLengthBitPackingHybridDecoder decoder =
        new RunLengthBitPackingHybridDecoder(BIT_WIDTH, new ByteArrayInputStream(encoded));
    int[] result = new int[VALUE_COUNT];
    decoder.readInts(result, 0, VALUE_COUNT);
    return result;
  }

  // ---- ValuesReader-level benchmarks ----
  // These go through the RunLengthBitPackingHybridValuesReader wrapper,
  // which is the path used by ColumnReader in production.

  /** Per-value decode through the production ValuesReader wrapper. */
  @Benchmark
  @OperationsPerInvocation(VALUE_COUNT)
  public void decodeValuesReader(Blackhole bh) throws IOException {
    RunLengthBitPackingHybridValuesReader reader = new RunLengthBitPackingHybridValuesReader(BIT_WIDTH);
    reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(encodedWithLengthPrefix)));
    for (int i = 0; i < VALUE_COUNT; i++) {
      bh.consume(reader.readInteger());
    }
  }

  /** Bulk decode through the production ValuesReader wrapper. */
  @Benchmark
  @OperationsPerInvocation(VALUE_COUNT)
  public int[] decodeValuesReaderBatch() throws IOException {
    RunLengthBitPackingHybridValuesReader reader = new RunLengthBitPackingHybridValuesReader(BIT_WIDTH);
    reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(encodedWithLengthPrefix)));
    int[] result = new int[VALUE_COUNT];
    reader.readIntegers(result, 0, VALUE_COUNT);
    return result;
  }
}
diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RowGroupFlushBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RowGroupFlushBenchmark.java
new file mode 100644
index 0000000000..9bc5cab0a8
--- /dev/null
+++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RowGroupFlushBenchmark.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.benchmarks;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.parquet.bytes.ByteBufferAllocator;
+import org.apache.parquet.bytes.HeapByteBufferAllocator;
+import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Types;
+import org.openjdk.jmh.annotations.AuxCounters;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+/**
+ * Benchmark measuring row group flush performance and peak buffer memory.
+ *
+ * Uses a wide schema (20 BINARY columns, 200 bytes each) to produce
+ * substantial per-column page buffers. A {@link PeakTrackingAllocator}
+ * wraps the heap allocator to precisely track the peak bytes outstanding
+ * across all parquet-managed ByteBuffers (independent of JVM GC behavior).
+ *
+ * The key metric is {@code peakAllocatorMB}: with the interleaved flush
+ * optimization, each column's pages are finalized, written, and released
+ * before the next column is processed, so peak buffer memory is roughly
+ * 1/N of the total row group size (N = number of columns).
+ *
+ * Writes to {@link BlackHoleOutputFile} to isolate flush cost from
+ * filesystem I/O.
+ */
+@BenchmarkMode({Mode.AverageTime})
+@Fork(
+ value = 1,
+ jvmArgs = {"-Xms512m", "-Xmx1g"})
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@State(Scope.Thread)
+public class RowGroupFlushBenchmark {
+
+ private static final int COLUMN_COUNT = 20;
+ private static final int BINARY_VALUE_LENGTH = 200;
+ private static final int ROW_COUNT = 100_000;
+
+ /** Row group sizes: 8MB and 64MB. */
+ @Param({"8388608", "67108864"})
+ public int rowGroupSize;
+
  /** Wide schema: 20 required BINARY columns. */
  private static final MessageType WIDE_SCHEMA;

  static {
    // Build col_0..col_19, all required BINARY, under a single message type.
    Types.MessageTypeBuilder builder = Types.buildMessage();
    for (int c = 0; c < COLUMN_COUNT; c++) {
      builder.required(PrimitiveTypeName.BINARY).named("col_" + c);
    }
    WIDE_SCHEMA = builder.named("wide_record");
  }
+
+ /** Pre-generated column values (one unique value per column). */
+ private Binary[] columnValues;
+
+ @Setup(Level.Trial)
+ public void setup() {
+ Random random = new Random(42);
+ columnValues = new Binary[COLUMN_COUNT];
+ for (int c = 0; c < COLUMN_COUNT; c++) {
+ byte[] value = new byte[BINARY_VALUE_LENGTH];
+ random.nextBytes(value);
+ columnValues[c] = Binary.fromConstantByteArray(value);
+ }
+ }
+
  /**
   * Auxiliary counters reported alongside timing. JMH collects these after
   * each iteration.
   */
  @AuxCounters(AuxCounters.Type.EVENTS)
  @State(Scope.Thread)
  public static class MemoryCounters {
    /** Peak bytes outstanding in the parquet ByteBufferAllocator. */
    public long peakAllocatorBytes;

    /** Convenience: peak in MB (peakAllocatorBytes / 1048576). */
    public double peakAllocatorMB;

    // Zero both counters so each iteration reports only its own peak.
    @Setup(Level.Iteration)
    public void reset() {
      peakAllocatorBytes = 0;
      peakAllocatorMB = 0;
    }
  }
+
+ /**
+ * ByteBufferAllocator wrapper that tracks current and peak allocated bytes.
+ * Thread-safe (uses AtomicLong) although the write path is single-threaded.
+ */
+ static class PeakTrackingAllocator implements ByteBufferAllocator {
+ private final ByteBufferAllocator delegate = new HeapByteBufferAllocator();
+ private final AtomicLong currentBytes = new AtomicLong();
+ private final AtomicLong peakBytes = new AtomicLong();
+
+ @Override
+ public ByteBuffer allocate(int size) {
+ ByteBuffer buf = delegate.allocate(size);
+ long current = currentBytes.addAndGet(buf.capacity());
+ peakBytes.accumulateAndGet(current, Math::max);
+ return buf;
+ }
+
+ @Override
+ public void release(ByteBuffer buf) {
+ currentBytes.addAndGet(-buf.capacity());
+ delegate.release(buf);
+ }
+
+ @Override
+ public boolean isDirect() {
+ return delegate.isDirect();
+ }
+
+ long getPeakBytes() {
+ return peakBytes.get();
+ }
+ }
+
+ @Benchmark
+ public void writeWithFlush(MemoryCounters counters) throws IOException {
+ PeakTrackingAllocator allocator = new PeakTrackingAllocator();
+ SimpleGroupFactory factory = new SimpleGroupFactory(WIDE_SCHEMA);
+
+ try (ParquetWriter Note: prefer {@link #generateRandomInts(int, long)} when call ordering between
+ * generators in the same setup must not influence the produced data.
+ */
+ public static int[] generateRandomInts(int count, Random random) {
+ int[] data = new int[count];
+ for (int i = 0; i < count; i++) {
+ data[i] = random.nextInt();
+ }
+ return data;
+ }
+
  /**
   * Generates low-cardinality integers (values drawn from a small set) using the given seed.
   * Deterministic: the same arguments always yield the same array.
   */
  public static int[] generateLowCardinalityInts(int count, int distinctValues, long seed) {
    return generateLowCardinalityInts(count, distinctValues, new Random(seed));
  }
+
+ /**
+ * Generates low-cardinality integers (values drawn from a small set).
+ */
+ public static int[] generateLowCardinalityInts(int count, int distinctValues, Random random) {
+ int[] data = new int[count];
+ for (int i = 0; i < count; i++) {
+ data[i] = random.nextInt(distinctValues);
+ }
+ return data;
+ }
+
  /**
   * Generates high-cardinality integers (all unique in randomized order) using the given seed.
   * Deterministic: the same arguments always yield the same permutation.
   */
  public static int[] generateHighCardinalityInts(int count, long seed) {
    return generateHighCardinalityInts(count, new Random(seed));
  }
+
  /**
   * Generates high-cardinality integers (all unique in randomized order).
   * Produces a permutation of {@code 0..count-1} via an in-place
   * Fisher-Yates shuffle.
   */
  public static int[] generateHighCardinalityInts(int count, Random random) {
    int[] data = generateSequentialInts(count);
    for (int i = count - 1; i > 0; i--) {
      int swapIndex = random.nextInt(i + 1);
      int tmp = data[i];
      data[i] = data[swapIndex];
      data[swapIndex] = tmp;
    }
    return data;
  }
+
+ // ---- Long data generation for encoding benchmarks ----
+
+ /**
+ * Generates sequential longs: 0, 1, 2, ...
+ */
+ public static long[] generateSequentialLongs(int count) {
+ long[] data = new long[count];
+ for (int i = 0; i < count; i++) {
+ data[i] = i;
+ }
+ return data;
+ }
+
+ /**
+ * Generates uniformly random longs using the given seed.
+ */
+ public static long[] generateRandomLongs(int count, long seed) {
+ Random random = new Random(seed);
+ long[] data = new long[count];
+ for (int i = 0; i < count; i++) {
+ data[i] = random.nextLong();
+ }
+ return data;
+ }
+
  /**
   * Generates low-cardinality longs (values drawn from a small set).
   * A palette of {@code distinctValues} random longs is built first, then
   * each output element is sampled uniformly from that palette.
   */
  public static long[] generateLowCardinalityLongs(int count, int distinctValues, long seed) {
    Random random = new Random(seed);
    long[] palette = new long[distinctValues];
    for (int i = 0; i < distinctValues; i++) {
      palette[i] = random.nextLong();
    }
    long[] data = new long[count];
    for (int i = 0; i < count; i++) {
      data[i] = palette[random.nextInt(distinctValues)];
    }
    return data;
  }
+
  /**
   * Generates high-cardinality longs (all unique, shuffled).
   * Produces a permutation of {@code 0..count-1} via an in-place
   * Fisher-Yates shuffle, deterministically from the given seed.
   */
  public static long[] generateHighCardinalityLongs(int count, long seed) {
    Random random = new Random(seed);
    long[] data = generateSequentialLongs(count);
    for (int i = count - 1; i > 0; i--) {
      int swapIndex = random.nextInt(i + 1);
      long tmp = data[i];
      data[i] = data[swapIndex];
      data[swapIndex] = tmp;
    }
    return data;
  }
+
+ // ---- Float data generation for encoding benchmarks ----
+
+ /**
+ * Generates uniformly random floats using the given seed.
+ */
+ public static float[] generateRandomFloats(int count, long seed) {
+ Random random = new Random(seed);
+ float[] data = new float[count];
+ for (int i = 0; i < count; i++) {
+ data[i] = random.nextFloat() * 1000.0f;
+ }
+ return data;
+ }
+
  /**
   * Generates low-cardinality floats (values drawn from a small set).
   * A palette of {@code distinctValues} floats in {@code [0, 1000)} is built
   * first, then each output element is sampled uniformly from it.
   */
  public static float[] generateLowCardinalityFloats(int count, int distinctValues, long seed) {
    Random random = new Random(seed);
    float[] palette = new float[distinctValues];
    for (int i = 0; i < distinctValues; i++) {
      palette[i] = random.nextFloat() * 1000.0f;
    }
    float[] data = new float[count];
    for (int i = 0; i < count; i++) {
      data[i] = palette[random.nextInt(distinctValues)];
    }
    return data;
  }
+
+ // ---- Double data generation for encoding benchmarks ----
+
  /**
   * Generates uniformly random doubles in {@code [0, 1000)}, deterministically
   * from the given seed.
   */
  public static double[] generateRandomDoubles(int count, long seed) {
    Random random = new Random(seed);
    double[] data = new double[count];
    for (int i = 0; i < count; i++) {
      data[i] = random.nextDouble() * 1000.0;
    }
    return data;
  }
+
+ /**
+ * Generates low-cardinality doubles (values drawn from a small set).
+ */
+ public static double[] generateLowCardinalityDoubles(int count, int distinctValues, long seed) {
+ Random random = new Random(seed);
+ double[] palette = new double[distinctValues];
+ for (int i = 0; i < distinctValues; i++) {
+ palette[i] = random.nextDouble() * 1000.0;
+ }
+ double[] data = new double[count];
+ for (int i = 0; i < count; i++) {
+ data[i] = palette[random.nextInt(distinctValues)];
+ }
+ return data;
+ }
+
+ // ---- Fixed-length byte array data generation for encoding benchmarks ----
+
+ /**
+ * Generates fixed-length byte arrays with the specified cardinality.
+ *
+ * @param count number of values
+ * @param length byte length of each value
+ * @param distinct number of distinct values (0 means all unique)
+ * @param seed RNG seed
+ */
+ public static Binary[] generateFixedLenByteArrays(int count, int length, int distinct, long seed) {
+ Random random = new Random(seed);
+ if (distinct > 0) {
+ Binary[] palette = new Binary[distinct];
+ for (int i = 0; i < distinct; i++) {
+ byte[] bytes = new byte[length];
+ random.nextBytes(bytes);
+ palette[i] = Binary.fromConstantByteArray(bytes);
+ }
+ Binary[] data = new Binary[count];
+ for (int i = 0; i < count; i++) {
+ data[i] = palette[random.nextInt(distinct)];
+ }
+ return data;
+ } else {
+ Binary[] data = new Binary[count];
+ for (int i = 0; i < count; i++) {
+ byte[] bytes = new byte[length];
+ random.nextBytes(bytes);
+ data[i] = Binary.fromConstantByteArray(bytes);
+ }
+ return data;
+ }
+ }
+
+ // ---- Binary data generation for encoding benchmarks ----
+
  /**
   * Generates binary strings of the given length with the specified cardinality, using
   * a deterministic seed. Same arguments always yield the same data.
   */
  public static Binary[] generateBinaryData(int count, int stringLength, int distinct, long seed) {
    return generateBinaryData(count, stringLength, distinct, new Random(seed));
  }
+
  /**
   * Generates binary strings of the given length with the specified cardinality.
   * Strings are lowercase a-z, UTF-8 encoded.
   *
   * @param count number of values
   * @param stringLength length of each string
   * @param distinct number of distinct values (0 means all unique)
   * @param random random source
   * @return array of Binary values
   */
  public static Binary[] generateBinaryData(int count, int stringLength, int distinct, Random random) {
    Binary[] data = new Binary[count];
    if (distinct > 0) {
      // Pre-generate the distinct values
      Binary[] dictionary = new Binary[distinct];
      for (int i = 0; i < distinct; i++) {
        dictionary[i] = Binary.fromConstantByteArray(
            randomString(stringLength, random).getBytes(StandardCharsets.UTF_8));
      }
      for (int i = 0; i < count; i++) {
        data[i] = dictionary[random.nextInt(distinct)];
      }
    } else {
      // All unique
      for (int i = 0; i < count; i++) {
        data[i] = Binary.fromConstantByteArray(
            randomString(stringLength, random).getBytes(StandardCharsets.UTF_8));
      }
    }
    return data;
  }
+
+ private static String randomString(int length, Random random) {
+ StringBuilder sb = new StringBuilder(length);
+ for (int i = 0; i < length; i++) {
+ sb.append((char) ('a' + random.nextInt(26)));
+ }
+ return sb.toString();
+ }
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java
index 1713acc012..114936d153 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java
@@ -185,6 +185,88 @@ public long readLong() {
throw new UnsupportedOperationException();
}
+ // ---- Batch read methods ----
+ // Default implementations loop over the per-value methods.
+ // Subclasses should override with bulk/memcpy-style implementations.
+
+ /**
+ * Reads {@code count} integers into {@code dest} starting at {@code offset}.
+ *
+ * @param dest destination array
+ * @param offset start index in dest
+ * @param count number of values to read
+ */
+ public void readIntegers(int[] dest, int offset, int count) {
+ for (int i = 0; i < count; i++) {
+ dest[offset + i] = readInteger();
+ }
+ }
+
+ /**
+ * Reads {@code count} longs into {@code dest} starting at {@code offset}.
+ *
+ * @param dest destination array
+ * @param offset start index in dest
+ * @param count number of values to read
+ */
+ public void readLongs(long[] dest, int offset, int count) {
+ for (int i = 0; i < count; i++) {
+ dest[offset + i] = readLong();
+ }
+ }
+
+ /**
+ * Reads {@code count} floats into {@code dest} starting at {@code offset}.
+ *
+ * @param dest destination array
+ * @param offset start index in dest
+ * @param count number of values to read
+ */
+ public void readFloats(float[] dest, int offset, int count) {
+ for (int i = 0; i < count; i++) {
+ dest[offset + i] = readFloat();
+ }
+ }
+
+ /**
+ * Reads {@code count} doubles into {@code dest} starting at {@code offset}.
+ *
+ * @param dest destination array
+ * @param offset start index in dest
+ * @param count number of values to read
+ */
+ public void readDoubles(double[] dest, int offset, int count) {
+ for (int i = 0; i < count; i++) {
+ dest[offset + i] = readDouble();
+ }
+ }
+
+ /**
+ * Reads {@code count} booleans into {@code dest} starting at {@code offset}.
+ *
+ * @param dest destination array
+ * @param offset start index in dest
+ * @param count number of values to read
+ */
+ public void readBooleans(boolean[] dest, int offset, int count) {
+ for (int i = 0; i < count; i++) {
+ dest[offset + i] = readBoolean();
+ }
+ }
+
+ /**
+ * Reads {@code count} Binary values into {@code dest} starting at {@code offset}.
+ *
+ * @param dest destination array
+ * @param offset start index in dest
+ * @param count number of values to read
+ */
+ public void readBinaries(Binary[] dest, int offset, int count) {
+ for (int i = 0; i < count; i++) {
+ dest[offset + i] = readBytes();
+ }
+ }
+
/**
* Skips the next value in the page
*/
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesWriter.java
index ecea4a7520..bbe9230397 100755
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesWriter.java
@@ -98,6 +98,19 @@ public void writeBoolean(boolean v) {
throw new UnsupportedOperationException(getClass().getName());
}
+ /**
+ * Writes a batch of boolean values. Subclasses may override for optimized bulk encoding.
+ *
+ * @param values the boolean array to read from
+ * @param offset the start position in the array
+ * @param length the number of values to write
+ */
+ public void writeBooleans(boolean[] values, int offset, int length) {
+ for (int i = offset; i < offset + length; i++) {
+ writeBoolean(values[i]);
+ }
+ }
+
/**
* @param v the value to encode
*/
@@ -105,6 +118,19 @@ public void writeBytes(Binary v) {
throw new UnsupportedOperationException(getClass().getName());
}
+ /**
+ * Writes a batch of Binary values. Subclasses may override for optimized bulk encoding.
+ *
+ * @param values the Binary array to read from
+ * @param offset the start position in the array
+ * @param length the number of values to write
+ */
+ public void writeBinaries(Binary[] values, int offset, int length) {
+ for (int i = offset; i < offset + length; i++) {
+ writeBytes(values[i]);
+ }
+ }
+
/**
* @param v the value to encode
*/
@@ -112,6 +138,19 @@ public void writeInteger(int v) {
throw new UnsupportedOperationException(getClass().getName());
}
+ /**
+ * Writes a batch of int values. Subclasses may override for optimized bulk encoding.
+ *
+ * @param values the int array to read from
+ * @param offset the start position in the array
+ * @param length the number of values to write
+ */
+ public void writeIntegers(int[] values, int offset, int length) {
+ for (int i = offset; i < offset + length; i++) {
+ writeInteger(values[i]);
+ }
+ }
+
/**
* @param v the value to encode
*/
@@ -119,6 +158,19 @@ public void writeLong(long v) {
throw new UnsupportedOperationException(getClass().getName());
}
+ /**
+ * Writes a batch of long values. Subclasses may override for optimized bulk encoding.
+ *
+ * @param values the long array to read from
+ * @param offset the start position in the array
+ * @param length the number of values to write
+ */
+ public void writeLongs(long[] values, int offset, int length) {
+ for (int i = offset; i < offset + length; i++) {
+ writeLong(values[i]);
+ }
+ }
+
/**
* @param v the value to encode
*/
@@ -126,6 +178,19 @@ public void writeDouble(double v) {
throw new UnsupportedOperationException(getClass().getName());
}
+ /**
+ * Writes a batch of double values. Subclasses may override for optimized bulk encoding.
+ *
+ * @param values the double array to read from
+ * @param offset the start position in the array
+ * @param length the number of values to write
+ */
+ public void writeDoubles(double[] values, int offset, int length) {
+ for (int i = offset; i < offset + length; i++) {
+ writeDouble(values[i]);
+ }
+ }
+
/**
* @param v the value to encode
*/
@@ -133,5 +198,18 @@ public void writeFloat(float v) {
throw new UnsupportedOperationException(getClass().getName());
}
+ /**
+ * Writes a batch of float values. Subclasses may override for optimized bulk encoding.
+ *
+ * @param values the float array to read from
+ * @param offset the start position in the array
+ * @param length the number of values to write
+ */
+ public void writeFloats(float[] values, int offset, int length) {
+ for (int i = offset; i < offset + length; i++) {
+ writeFloat(values[i]);
+ }
+ }
+
public abstract String memUsageString(String prefix);
}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReader.java
index c8ab3043bd..8edda45cd5 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReader.java
@@ -49,17 +49,151 @@ protected int nextElementByteOffset() {
return offset;
}
- // Decode an entire data page
+ /**
+ * Advances the stream position by {@code count} elements and returns the byte offset
+ * of the first element. Used by batch read methods in subclasses.
+ */
+ protected int advanceByteOffset(int count) {
+ if (indexInStream + count > valuesCount) {
+ throw new ParquetDecodingException("Byte-stream data was already exhausted.");
+ }
+ int offset = indexInStream * elementSizeInBytes;
+ indexInStream += count;
+ return offset;
+ }
+
+ // Decode an entire data page by transposing from stream-split layout to interleaved layout.
private byte[] decodeData(ByteBuffer encoded, int valuesCount) {
- assert encoded.limit() == valuesCount * elementSizeInBytes;
- byte[] decoded = new byte[encoded.limit()];
- int destByteIndex = 0;
- for (int srcValueIndex = 0; srcValueIndex < valuesCount; ++srcValueIndex) {
- for (int stream = 0; stream < elementSizeInBytes; ++stream, ++destByteIndex) {
- decoded[destByteIndex] = encoded.get(srcValueIndex + stream * valuesCount);
+ int totalBytes = valuesCount * elementSizeInBytes;
+ assert encoded.remaining() >= totalBytes;
+
+ // Bulk access: use the backing array directly if available, otherwise copy once.
+ byte[] src;
+ int srcBase;
+ if (encoded.hasArray()) {
+ src = encoded.array();
+ srcBase = encoded.arrayOffset() + encoded.position();
+ } else {
+ src = new byte[totalBytes];
+ encoded.get(src);
+ srcBase = 0;
+ }
+
+ byte[] decoded = new byte[totalBytes];
+
+ // Specialized single-pass loops for common element sizes.
+ if (elementSizeInBytes == 2) {
+ int s0 = srcBase, s1 = srcBase + valuesCount;
+ for (int i = 0; i < valuesCount; ++i) {
+ int di = i * 2;
+ decoded[di] = src[s0 + i];
+ decoded[di + 1] = src[s1 + i];
+ }
+ } else if (elementSizeInBytes == 4) {
+ int s0 = srcBase,
+ s1 = srcBase + valuesCount,
+ s2 = srcBase + 2 * valuesCount,
+ s3 = srcBase + 3 * valuesCount;
+ for (int i = 0; i < valuesCount; ++i) {
+ int di = i * 4;
+ decoded[di] = src[s0 + i];
+ decoded[di + 1] = src[s1 + i];
+ decoded[di + 2] = src[s2 + i];
+ decoded[di + 3] = src[s3 + i];
+ }
+ } else if (elementSizeInBytes == 8) {
+ int s0 = srcBase,
+ s1 = srcBase + valuesCount,
+ s2 = srcBase + 2 * valuesCount,
+ s3 = srcBase + 3 * valuesCount,
+ s4 = srcBase + 4 * valuesCount,
+ s5 = srcBase + 5 * valuesCount,
+ s6 = srcBase + 6 * valuesCount,
+ s7 = srcBase + 7 * valuesCount;
+ for (int i = 0; i < valuesCount; ++i) {
+ int di = i * 8;
+ decoded[di] = src[s0 + i];
+ decoded[di + 1] = src[s1 + i];
+ decoded[di + 2] = src[s2 + i];
+ decoded[di + 3] = src[s3 + i];
+ decoded[di + 4] = src[s4 + i];
+ decoded[di + 5] = src[s5 + i];
+ decoded[di + 6] = src[s6 + i];
+ decoded[di + 7] = src[s7 + i];
+ }
+ } else if (elementSizeInBytes == 12) {
+ int s0 = srcBase,
+ s1 = srcBase + valuesCount,
+ s2 = srcBase + 2 * valuesCount,
+ s3 = srcBase + 3 * valuesCount,
+ s4 = srcBase + 4 * valuesCount,
+ s5 = srcBase + 5 * valuesCount,
+ s6 = srcBase + 6 * valuesCount,
+ s7 = srcBase + 7 * valuesCount,
+ s8 = srcBase + 8 * valuesCount,
+ s9 = srcBase + 9 * valuesCount,
+ s10 = srcBase + 10 * valuesCount,
+ s11 = srcBase + 11 * valuesCount;
+ for (int i = 0; i < valuesCount; ++i) {
+ int di = i * 12;
+ decoded[di] = src[s0 + i];
+ decoded[di + 1] = src[s1 + i];
+ decoded[di + 2] = src[s2 + i];
+ decoded[di + 3] = src[s3 + i];
+ decoded[di + 4] = src[s4 + i];
+ decoded[di + 5] = src[s5 + i];
+ decoded[di + 6] = src[s6 + i];
+ decoded[di + 7] = src[s7 + i];
+ decoded[di + 8] = src[s8 + i];
+ decoded[di + 9] = src[s9 + i];
+ decoded[di + 10] = src[s10 + i];
+ decoded[di + 11] = src[s11 + i];
+ }
+ } else if (elementSizeInBytes == 16) {
+ int s0 = srcBase,
+ s1 = srcBase + valuesCount,
+ s2 = srcBase + 2 * valuesCount,
+ s3 = srcBase + 3 * valuesCount,
+ s4 = srcBase + 4 * valuesCount,
+ s5 = srcBase + 5 * valuesCount,
+ s6 = srcBase + 6 * valuesCount,
+ s7 = srcBase + 7 * valuesCount,
+ s8 = srcBase + 8 * valuesCount,
+ s9 = srcBase + 9 * valuesCount,
+ s10 = srcBase + 10 * valuesCount,
+ s11 = srcBase + 11 * valuesCount,
+ s12 = srcBase + 12 * valuesCount,
+ s13 = srcBase + 13 * valuesCount,
+ s14 = srcBase + 14 * valuesCount,
+ s15 = srcBase + 15 * valuesCount;
+ for (int i = 0; i < valuesCount; ++i) {
+ int di = i * 16;
+ decoded[di] = src[s0 + i];
+ decoded[di + 1] = src[s1 + i];
+ decoded[di + 2] = src[s2 + i];
+ decoded[di + 3] = src[s3 + i];
+ decoded[di + 4] = src[s4 + i];
+ decoded[di + 5] = src[s5 + i];
+ decoded[di + 6] = src[s6 + i];
+ decoded[di + 7] = src[s7 + i];
+ decoded[di + 8] = src[s8 + i];
+ decoded[di + 9] = src[s9 + i];
+ decoded[di + 10] = src[s10 + i];
+ decoded[di + 11] = src[s11 + i];
+ decoded[di + 12] = src[s12 + i];
+ decoded[di + 13] = src[s13 + i];
+ decoded[di + 14] = src[s14 + i];
+ decoded[di + 15] = src[s15 + i];
+ }
+ } else {
+ // Generic fallback for arbitrary element sizes
+ for (int stream = 0; stream < elementSizeInBytes; ++stream) {
+ int srcOffset = srcBase + stream * valuesCount;
+ for (int i = 0; i < valuesCount; ++i) {
+ decoded[i * elementSizeInBytes + stream] = src[srcOffset + i];
+ }
}
}
- assert destByteIndex == decoded.length;
return decoded;
}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForDouble.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForDouble.java
index e725dc9fce..0917cd3902 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForDouble.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForDouble.java
@@ -27,4 +27,11 @@ public ByteStreamSplitValuesReaderForDouble() {
public double readDouble() {
return decodedDataBuffer.getDouble(nextElementByteOffset());
}
+
+ @Override
+ public void readDoubles(double[] dest, int offset, int count) {
+ int byteOffset = advanceByteOffset(count);
+ decodedDataBuffer.position(byteOffset);
+ decodedDataBuffer.asDoubleBuffer().get(dest, offset, count);
+ }
}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForFLBA.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForFLBA.java
index d8613dd8b9..b026a7d76e 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForFLBA.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForFLBA.java
@@ -30,4 +30,17 @@ public ByteStreamSplitValuesReaderForFLBA(int length) {
public Binary readBytes() {
return Binary.fromConstantByteBuffer(decodedDataBuffer, nextElementByteOffset(), elementSizeInBytes);
}
+
+ /**
+ * Batch read: advances the stream by {@code count} elements with a single bounds check,
+ * then creates Binary views at sequential offsets — avoiding a bounds check per value.
+ */
+ @Override
+ public void readBinaries(Binary[] dest, int offset, int count) {
+ int byteOffset = advanceByteOffset(count);
+ for (int i = 0; i < count; i++) {
+ dest[offset + i] = Binary.fromConstantByteBuffer(decodedDataBuffer, byteOffset, elementSizeInBytes);
+ byteOffset += elementSizeInBytes;
+ }
+ }
}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForFloat.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForFloat.java
index cecb7925d8..bb28ef0ac2 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForFloat.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForFloat.java
@@ -27,4 +27,11 @@ public ByteStreamSplitValuesReaderForFloat() {
public float readFloat() {
return decodedDataBuffer.getFloat(nextElementByteOffset());
}
+
+ @Override
+ public void readFloats(float[] dest, int offset, int count) {
+ int byteOffset = advanceByteOffset(count);
+ decodedDataBuffer.position(byteOffset);
+ decodedDataBuffer.asFloatBuffer().get(dest, offset, count);
+ }
}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForInteger.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForInteger.java
index 57f9bfdf03..e71079d2f6 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForInteger.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForInteger.java
@@ -27,4 +27,11 @@ public ByteStreamSplitValuesReaderForInteger() {
public int readInteger() {
return decodedDataBuffer.getInt(nextElementByteOffset());
}
+
+ @Override
+ public void readIntegers(int[] dest, int offset, int count) {
+ int byteOffset = advanceByteOffset(count);
+ decodedDataBuffer.position(byteOffset);
+ decodedDataBuffer.asIntBuffer().get(dest, offset, count);
+ }
}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForLong.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForLong.java
index c7711d8919..f73c46e972 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForLong.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForLong.java
@@ -27,4 +27,11 @@ public ByteStreamSplitValuesReaderForLong() {
public long readLong() {
return decodedDataBuffer.getLong(nextElementByteOffset());
}
+
+ @Override
+ public void readLongs(long[] dest, int offset, int count) {
+ int byteOffset = advanceByteOffset(count);
+ decodedDataBuffer.position(byteOffset);
+ decodedDataBuffer.asLongBuffer().get(dest, offset, count);
+ }
}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesWriter.java
index c197a4fd6f..e62126ed4d 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesWriter.java
@@ -29,9 +29,15 @@
public abstract class ByteStreamSplitValuesWriter extends ValuesWriter {
+ /**
+ * Batch size for buffered scatter writes. Values are accumulated in a batch buffer
+ * and flushed as bulk {@code write(byte[], off, len)} calls to each stream.
+ */
+ private static final int BATCH_SIZE = 64;
+
protected final int numStreams;
protected final int elementSizeInBytes;
- private final CapacityByteArrayOutputStream[] byteStreams;
+ protected final CapacityByteArrayOutputStream[] byteStreams;
public ByteStreamSplitValuesWriter(
int elementSizeInBytes, int initialCapacity, int pageSize, ByteBufferAllocator allocator) {
@@ -176,6 +182,8 @@ public String memUsageString(String prefix) {
public static class FixedLenByteArrayByteStreamSplitValuesWriter extends ByteStreamSplitValuesWriter {
private final int length;
+ private byte[][] batchBufs; // [stream][batchIndex] scratch buffers
+ private int flbaBatchCount;
public FixedLenByteArrayByteStreamSplitValuesWriter(
int length, int initialCapacity, int pageSize, ByteBufferAllocator allocator) {
@@ -187,7 +195,69 @@ public FixedLenByteArrayByteStreamSplitValuesWriter(
public final void writeBytes(Binary v) {
assert (v.length() == length)
: ("Fixed Binary size " + v.length() + " does not match field type length " + length);
- super.scatterBytes(v.getBytesUnsafe());
+ if (batchBufs == null) {
+ batchBufs = new byte[length][BATCH_SIZE];
+ }
+ byte[] bytes = v.getBytesUnsafe();
+ for (int stream = 0; stream < length; stream++) {
+ batchBufs[stream][flbaBatchCount] = bytes[stream];
+ }
+ flbaBatchCount++;
+ if (flbaBatchCount == BATCH_SIZE) {
+ flushFlbaBatch();
+ }
+ }
+
+ @Override
+ public void writeBinaries(Binary[] values, int offset, int len) {
+ if (batchBufs == null) {
+ batchBufs = new byte[length][BATCH_SIZE];
+ }
+ for (int i = offset; i < offset + len; i++) {
+ Binary v = values[i];
+ assert (v.length() == length)
+ : ("Fixed Binary size " + v.length() + " does not match field type length " + length);
+ byte[] bytes = v.getBytesUnsafe();
+ for (int stream = 0; stream < length; stream++) {
+ batchBufs[stream][flbaBatchCount] = bytes[stream];
+ }
+ flbaBatchCount++;
+ if (flbaBatchCount == BATCH_SIZE) {
+ flushFlbaBatch();
+ }
+ }
+ }
+
+ private void flushFlbaBatch() {
+ if (flbaBatchCount == 0) return;
+ final int count = flbaBatchCount;
+ for (int stream = 0; stream < length; stream++) {
+ byteStreams[stream].write(batchBufs[stream], 0, count);
+ }
+ flbaBatchCount = 0;
+ }
+
+ @Override
+ public BytesInput getBytes() {
+ flushFlbaBatch();
+ return super.getBytes();
+ }
+
+ @Override
+ public void reset() {
+ flbaBatchCount = 0;
+ super.reset();
+ }
+
+ @Override
+ public void close() {
+ flbaBatchCount = 0;
+ super.close();
+ }
+
+ @Override
+ public long getBufferedSize() {
+ return super.getBufferedSize() + (long) flbaBatchCount * length;
}
@Override
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java
index 259ebc09c0..6726614460 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java
@@ -112,6 +112,22 @@ public long readLong() {
return valuesBuffer[valuesRead++];
}
+ @Override
+ public void readIntegers(int[] dest, int offset, int count) {
+ checkRead();
+ for (int i = 0; i < count; i++) {
+ dest[offset + i] = (int) valuesBuffer[valuesRead + i];
+ }
+ valuesRead += count;
+ }
+
+ @Override
+ public void readLongs(long[] dest, int offset, int count) {
+ checkRead();
+ System.arraycopy(valuesBuffer, valuesRead, dest, offset, count);
+ valuesRead += count;
+ }
+
private void checkRead() {
if (valuesRead >= totalValueCount) {
throw new ParquetDecodingException("no more value to read, total value count is " + totalValueCount);
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java
index 53fafc55dc..db344c3e63 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java
@@ -117,6 +117,59 @@ public long readLong() {
}
}
+ @Override
+ public void readIntegers(int[] dest, int offset, int count) {
+ try {
+ // Batch-decode dictionary IDs, then batch-lookup
+ int[] ids = new int[count];
+ decoder.readInts(ids, 0, count);
+ for (int i = 0; i < count; i++) {
+ dest[offset + i] = dictionary.decodeToInt(ids[i]);
+ }
+ } catch (IOException e) {
+ throw new ParquetDecodingException(e);
+ }
+ }
+
+ @Override
+ public void readLongs(long[] dest, int offset, int count) {
+ try {
+ int[] ids = new int[count];
+ decoder.readInts(ids, 0, count);
+ for (int i = 0; i < count; i++) {
+ dest[offset + i] = dictionary.decodeToLong(ids[i]);
+ }
+ } catch (IOException e) {
+ throw new ParquetDecodingException(e);
+ }
+ }
+
+ @Override
+ public void readFloats(float[] dest, int offset, int count) {
+ try {
+ int[] ids = new int[count];
+ decoder.readInts(ids, 0, count);
+ for (int i = 0; i < count; i++) {
+ dest[offset + i] = dictionary.decodeToFloat(ids[i]);
+ }
+ } catch (IOException e) {
+ throw new ParquetDecodingException(e);
+ }
+ }
+
+ @Override
+ public void readDoubles(double[] dest, int offset, int count) {
+ try {
+ int[] ids = new int[count];
+ decoder.readInts(ids, 0, count);
+ for (int i = 0; i < count; i++) {
+ dest[offset + i] = dictionary.decodeToDouble(ids[i]);
+ }
+ } catch (IOException e) {
+ throw new ParquetDecodingException(e);
+ }
+ }
+
@Override
public void skip() {
try {
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java
index 22ca2d567c..3843e3b6f0 100755
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java
@@ -18,57 +18,122 @@
*/
package org.apache.parquet.column.values.plain;
-import static org.apache.parquet.column.values.bitpacking.Packer.LITTLE_ENDIAN;
-
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.values.ValuesReader;
-import org.apache.parquet.column.values.bitpacking.ByteBitPackingValuesReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * encodes boolean for the plain encoding: one bit at a time (0 = false)
+ * Decodes PLAIN-encoded booleans: one bit per value, packed 8 per byte, little-endian
+ * bit order (bit 0 of each byte is the first value).
+ *
+ * Direct bit extraction from the page ByteBuffer avoids the overhead of the generic
+ * bit-packing machinery ({@code ByteBitPackingValuesReader}) and intermediate
+ * {@code int[8]} buffers.
+ *
+ * The batch path uses a static 256-entry lookup table that maps each byte value to
+ * its 8 pre-decoded booleans. This enables {@code System.arraycopy} of 8 booleans per
+ * byte (which the JIT can typically compile to a single wide copy) instead of 8
+ * individual comparison+store operations.
*/
public class BooleanPlainValuesReader extends ValuesReader {
private static final Logger LOG = LoggerFactory.getLogger(BooleanPlainValuesReader.class);
- private ByteBitPackingValuesReader in = new ByteBitPackingValuesReader(1, LITTLE_ENDIAN);
-
/**
- * {@inheritDoc}
- *
- * @see org.apache.parquet.column.values.ValuesReader#readBoolean()
+ * Lookup table: BYTE_TO_BOOLS[b] contains the 8 boolean values for byte value b,
+ * in little-endian bit order (bit 0 = index 0).
*/
+ private static final boolean[][] BYTE_TO_BOOLS = new boolean[256][8];
+
+ static {
+ for (int b = 0; b < 256; b++) {
+ for (int bit = 0; bit < 8; bit++) {
+ BYTE_TO_BOOLS[b][bit] = ((b >>> bit) & 1) != 0;
+ }
+ }
+ }
+
+ private byte[] pageData;
+ private int pageOffset;
+ private int bitIndex;
+
+ @Override
+ public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException {
+ LOG.debug("init from page at offset {} for length {}", stream.position(), stream.available());
+ int effectiveBitLength = valueCount; // bitWidth = 1
+ int length = BytesUtils.paddedByteCountFromBits(effectiveBitLength);
+ length = Math.min(length, stream.available());
+ ByteBuffer buf = stream.slice(length);
+
+ // Bulk access: use backing array directly if available, otherwise copy once.
+ if (buf.hasArray()) {
+ pageData = buf.array();
+ pageOffset = buf.arrayOffset() + buf.position();
+ } else {
+ pageData = new byte[length];
+ buf.get(pageData);
+ pageOffset = 0;
+ }
+ bitIndex = 0;
+ updateNextOffset(length);
+ }
+
@Override
public boolean readBoolean() {
- return in.readInteger() == 0 ? false : true;
+ int byteIdx = pageOffset + (bitIndex >>> 3);
+ int bitPos = bitIndex & 7;
+ bitIndex++;
+ return ((pageData[byteIdx] >>> bitPos) & 1) != 0;
}
- /**
- * {@inheritDoc}
- *
- * @see org.apache.parquet.column.values.ValuesReader#skip()
- */
@Override
- public void skip() {
- in.readInteger();
+ public void readBooleans(boolean[] dest, int offset, int count) {
+ int i = 0;
+
+ // Handle partial byte at current position
+ int bitPos = bitIndex & 7;
+ if (bitPos != 0) {
+ int byteIdx = pageOffset + (bitIndex >>> 3);
+ byte b = pageData[byteIdx];
+ while (bitPos < 8 && i < count) {
+ dest[offset + i] = ((b >>> bitPos) & 1) != 0;
+ bitPos++;
+ i++;
+ }
+ }
+
+ // Process full bytes: 8 booleans per byte via lookup table + arraycopy
+ int byteIdx = pageOffset + ((bitIndex + i) >>> 3);
+ while (i + 8 <= count) {
+ System.arraycopy(BYTE_TO_BOOLS[pageData[byteIdx] & 0xFF], 0, dest, offset + i, 8);
+ byteIdx++;
+ i += 8;
+ }
+
+ // Handle remaining bits in the last partial byte
+ if (i < count) {
+ byte b = pageData[byteIdx];
+ int bp = 0;
+ while (i < count) {
+ dest[offset + i] = ((b >>> bp) & 1) != 0;
+ bp++;
+ i++;
+ }
+ }
+
+ bitIndex += count;
}
- /**
- * {@inheritDoc}
- *
- * @see org.apache.parquet.column.values.ValuesReader#initFromPage(int, ByteBufferInputStream)
- */
@Override
- public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException {
- LOG.debug("init from page at offset {} for length {}", stream.position(), stream.available());
- this.in.initFromPage(valueCount, stream);
+ public void skip() {
+ bitIndex++;
}
- @Deprecated
@Override
- public int getNextOffset() {
- return in.getNextOffset();
+ public void skip(int n) {
+ bitIndex += n;
}
}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesWriter.java
index 7f80ec150a..ae3b43c63b 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesWriter.java
@@ -19,52 +19,118 @@
package org.apache.parquet.column.values.plain;
import static org.apache.parquet.column.Encoding.PLAIN;
-import static org.apache.parquet.column.values.bitpacking.Packer.LITTLE_ENDIAN;
import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.values.ValuesWriter;
-import org.apache.parquet.column.values.bitpacking.ByteBitPackingValuesWriter;
/**
- * An implementation of the PLAIN encoding
+ * An implementation of the PLAIN encoding for BOOLEAN values.
+ *
+ * Packs booleans directly into bytes (8 per byte, LSB first) without
+ * going through the generic int-based bit-packing encoder.
*/
public class BooleanPlainValuesWriter extends ValuesWriter {
- private ByteBitPackingValuesWriter bitPackingWriter;
+ private static final int INITIAL_SLAB_SIZE = 1024;
+ private static final int MAX_CAPACITY = 64 * 1024;
+
+ private final CapacityByteArrayOutputStream baos;
+ private int currentByte;
+ private int bitsWritten;
public BooleanPlainValuesWriter() {
- bitPackingWriter = new ByteBitPackingValuesWriter(1, LITTLE_ENDIAN);
+ this.baos = new CapacityByteArrayOutputStream(INITIAL_SLAB_SIZE, MAX_CAPACITY);
+ this.currentByte = 0;
+ this.bitsWritten = 0;
}
@Override
public final void writeBoolean(boolean v) {
- bitPackingWriter.writeInteger(v ? 1 : 0);
+ currentByte |= ((v ? 1 : 0) << bitsWritten);
+ bitsWritten++;
+ if (bitsWritten == 8) {
+ baos.write(currentByte);
+ currentByte = 0;
+ bitsWritten = 0;
+ }
+ }
+
+ @Override
+ public void writeBooleans(boolean[] values, int offset, int length) {
+ int pos = offset;
+ int end = offset + length;
+
+ // Fill current partial byte
+ while (bitsWritten > 0 && bitsWritten < 8 && pos < end) {
+ if (values[pos]) {
+ currentByte |= (1 << bitsWritten);
+ }
+ bitsWritten++;
+ pos++;
+ if (bitsWritten == 8) {
+ baos.write(currentByte);
+ currentByte = 0;
+ bitsWritten = 0;
+ }
+ }
+
+ // Process 8 values at a time — pack directly into a byte
+ while (pos + 8 <= end) {
+ int b = 0;
+ if (values[pos]) b |= 0x01;
+ if (values[pos + 1]) b |= 0x02;
+ if (values[pos + 2]) b |= 0x04;
+ if (values[pos + 3]) b |= 0x08;
+ if (values[pos + 4]) b |= 0x10;
+ if (values[pos + 5]) b |= 0x20;
+ if (values[pos + 6]) b |= 0x40;
+ if (values[pos + 7]) b |= 0x80;
+ baos.write(b);
+ pos += 8;
+ }
+
+ // Handle remaining values (< 8)
+ while (pos < end) {
+ if (values[pos]) {
+ currentByte |= (1 << bitsWritten);
+ }
+ bitsWritten++;
+ pos++;
+ }
}
@Override
public long getBufferedSize() {
- return bitPackingWriter.getBufferedSize();
+ return baos.size() + (bitsWritten > 0 ? 1 : 0);
}
@Override
public BytesInput getBytes() {
- return bitPackingWriter.getBytes();
+ if (bitsWritten > 0) {
+ baos.write(currentByte);
+ currentByte = 0;
+ bitsWritten = 0;
+ }
+ return BytesInput.from(baos);
}
@Override
public void reset() {
- bitPackingWriter.reset();
+ baos.reset();
+ currentByte = 0;
+ bitsWritten = 0;
}
@Override
public void close() {
- bitPackingWriter.close();
+ baos.close();
}
@Override
public long getAllocatedSize() {
- return bitPackingWriter.getAllocatedSize();
+ return baos.getCapacity();
}
@Override
@@ -74,6 +140,6 @@ public Encoding getEncoding() {
@Override
public String memUsageString(String prefix) {
- return bitPackingWriter.memUsageString(prefix);
+ return String.format("%s BooleanPlainValuesWriter %d bytes", prefix, getAllocatedSize());
}
}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java
index adfc488924..6200ae4477 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java
@@ -19,6 +19,7 @@
package org.apache.parquet.column.values.plain;
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.ParquetDecodingException;
@@ -62,6 +63,25 @@ public void skip(int n) {
}
}
+ /**
+ * Batch read: slices the entire block of {@code count * length} bytes in one call,
+ * then creates Binary views at fixed offsets within that single ByteBuffer, avoiding
+ * a separate stream slice per value.
+ */
+ @Override
+ public void readBinaries(Binary[] dest, int offset, int count) {
+ try {
+ int totalBytes = count * length;
+ ByteBuffer block = in.slice(totalBytes);
+ int baseOffset = block.position();
+ for (int i = 0; i < count; i++) {
+ dest[offset + i] = Binary.fromConstantByteBuffer(block, baseOffset + i * length, length);
+ }
+ } catch (IOException | RuntimeException e) {
+ throw new ParquetDecodingException("could not read bytes at offset " + in.position(), e);
+ }
+ }
+
@Override
public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException {
LOG.debug("init from page at offset {} for length {}", stream.position(), stream.available());
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java
index dec4d1be1b..9d8c7e464b 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java
@@ -62,6 +62,42 @@ public final void writeBytes(Binary v) {
}
}
+ /**
+ * Batch write: copies Binary values into a temporary buffer and writes them in a single
+ * bulk {@code write()} call to the output stream, amortizing stream overhead across
+ * the entire batch.
+ */
+ @Override
+ public void writeBinaries(Binary[] values, int offset, int length) {
+ final int fixedLen = this.length;
+ // Process in chunks to avoid excessive temp allocation
+ final int CHUNK = 1024;
+ byte[] buf = new byte[Math.min(length, CHUNK) * fixedLen];
+ try {
+ int remaining = length;
+ int srcIdx = offset;
+ while (remaining > 0) {
+ int batch = Math.min(remaining, CHUNK);
+ int bufPos = 0;
+ for (int i = 0; i < batch; i++) {
+ Binary v = values[srcIdx++];
+ if (v.length() != fixedLen) {
+ throw new IllegalArgumentException(
+ "Fixed Binary size " + v.length() + " does not match field type length " + fixedLen);
+ }
+ // Copy bytes from the Binary's backing store into the batch buffer
+ byte[] bytes = v.getBytesUnsafe();
+ System.arraycopy(bytes, 0, buf, bufPos, fixedLen);
+ bufPos += fixedLen;
+ }
+ arrayOut.write(buf, 0, bufPos);
+ remaining -= batch;
+ }
+ } catch (RuntimeException e) {
+ throw new ParquetEncodingException("could not write fixed bytes", e);
+ }
+ }
+
@Override
public long getBufferedSize() {
return arrayOut.size();
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java
index a0c7af7394..a3d0d06923 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java
@@ -71,6 +71,17 @@ public double readDouble() {
throw new ParquetDecodingException("could not read double", e);
}
}
+
+ @Override
+ public void readDoubles(double[] dest, int offset, int count) {
+ try {
+ for (int i = 0; i < count; i++) {
+ dest[offset + i] = in.readDouble();
+ }
+ } catch (IOException e) {
+ throw new ParquetDecodingException("could not read doubles", e);
+ }
+ }
}
public static class FloatPlainValuesReader extends PlainValuesReader {
@@ -92,6 +103,17 @@ public float readFloat() {
throw new ParquetDecodingException("could not read float", e);
}
}
+
+ @Override
+ public void readFloats(float[] dest, int offset, int count) {
+ try {
+ for (int i = 0; i < count; i++) {
+ dest[offset + i] = in.readFloat();
+ }
+ } catch (IOException e) {
+ throw new ParquetDecodingException("could not read floats", e);
+ }
+ }
}
public static class IntegerPlainValuesReader extends PlainValuesReader {
@@ -113,6 +135,17 @@ public int readInteger() {
throw new ParquetDecodingException("could not read int", e);
}
}
+
+ @Override
+ public void readIntegers(int[] dest, int offset, int count) {
+ try {
+ for (int i = 0; i < count; i++) {
+ dest[offset + i] = in.readInt();
+ }
+ } catch (IOException e) {
+ throw new ParquetDecodingException("could not read ints", e);
+ }
+ }
}
public static class LongPlainValuesReader extends PlainValuesReader {
@@ -134,5 +167,16 @@ public long readLong() {
throw new ParquetDecodingException("could not read long", e);
}
}
+
+ @Override
+ public void readLongs(long[] dest, int offset, int count) {
+ try {
+ for (int i = 0; i < count; i++) {
+ dest[offset + i] = in.readLong();
+ }
+ } catch (IOException e) {
+ throw new ParquetDecodingException("could not read longs", e);
+ }
+ }
}
}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesWriter.java
index c7069bc092..0802f46d2a 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesWriter.java
@@ -94,6 +94,26 @@ public final void writeDouble(double v) {
}
}
+ @Override
+ public final void writeIntegers(int[] values, int offset, int length) {
+ arrayOut.writeInts(values, offset, length);
+ }
+
+ @Override
+ public final void writeLongs(long[] values, int offset, int length) {
+ arrayOut.writeLongs(values, offset, length);
+ }
+
+ @Override
+ public final void writeFloats(float[] values, int offset, int length) {
+ arrayOut.writeFloats(values, offset, length);
+ }
+
+ @Override
+ public final void writeDoubles(double[] values, int offset, int length) {
+ arrayOut.writeDoubles(values, offset, length);
+ }
+
@Override
public void writeByte(int value) {
try {
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java
index e55b276b29..f2dd50d623 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java
@@ -48,6 +48,8 @@ private static enum MODE {
private int currentCount;
private int currentValue;
private int[] currentBuffer;
+ // Saved packed bytes for bitWidth=1 boolean optimization (lookup table decode)
+ private byte[] packedBytesBuffer;
public RunLengthBitPackingHybridDecoder(int bitWidth, InputStream in) {
LOG.debug("decoding bitWidth {}", bitWidth);
@@ -77,6 +79,121 @@ public int readInt() throws IOException {
return result;
}
+ /**
+ * Reads {@code count} int values into {@code dest} starting at {@code offset}.
+ * This avoids per-value virtual dispatch overhead by batching across RLE runs
+ * and packed groups.
+ *
+ * @param dest destination array
+ * @param offset start index in dest
+ * @param count number of values to read
+ */
+ public void readInts(int[] dest, int offset, int count) throws IOException {
+ int remaining = count;
+ int pos = offset;
+ while (remaining > 0) {
+ if (currentCount == 0) {
+ readNext();
+ }
+ int batchSize = Math.min(remaining, currentCount);
+ switch (mode) {
+ case RLE:
+ java.util.Arrays.fill(dest, pos, pos + batchSize, currentValue);
+ break;
+ case PACKED:
+ int startIdx = currentBuffer.length - currentCount;
+ System.arraycopy(currentBuffer, startIdx, dest, pos, batchSize);
+ break;
+ default:
+ throw new ParquetDecodingException("not a valid mode " + mode);
+ }
+ currentCount -= batchSize;
+ remaining -= batchSize;
+ pos += batchSize;
+ }
+ }
+
+ /**
+ * Lookup table for bitWidth=1: maps each byte to its 8 boolean values.
+ * Used by {@link #readBooleans} PACKED path to bypass the int[] intermediate.
+ */
+ private static final boolean[][] BYTE_TO_BOOLS = new boolean[256][8];
+
+ static {
+ for (int b = 0; b < 256; b++) {
+ for (int bit = 0; bit < 8; bit++) {
+ BYTE_TO_BOOLS[b][bit] = ((b >>> bit) & 1) != 0;
+ }
+ }
+ }
+
+ /**
+ * Reads {@code count} boolean values into {@code dest} starting at {@code offset}.
+ * For RLE runs, uses {@code Arrays.fill} with a single boolean value.
+ * For packed groups, uses a lookup table to decode 8 booleans per byte directly
+ * from the raw packed bytes, bypassing the int[] intermediate buffer.
+ *
+ * @param dest destination array
+ * @param offset start index in dest
+ * @param count number of values to read
+ */
+ public void readBooleans(boolean[] dest, int offset, int count) throws IOException {
+ int remaining = count;
+ int pos = offset;
+ while (remaining > 0) {
+ if (currentCount == 0) {
+ readNext();
+ }
+ int batchSize = Math.min(remaining, currentCount);
+ switch (mode) {
+ case RLE:
+ java.util.Arrays.fill(dest, pos, pos + batchSize, currentValue != 0);
+ break;
+        case PACKED:
+          if (bitWidth != 1) throw new ParquetDecodingException("readBooleans requires bitWidth == 1 but was " + bitWidth);
+          int bitOff = currentBuffer.length - currentCount; // values already consumed from this packed group
+ int written = 0;
+
+ // Handle partial byte alignment
+ int bitPos = bitOff & 7;
+ if (bitPos != 0) {
+ int byteIdx = bitOff >>> 3;
+ byte b = packedBytesBuffer[byteIdx];
+ while (bitPos < 8 && written < batchSize) {
+ dest[pos + written] = ((b >>> bitPos) & 1) != 0;
+ bitPos++;
+ written++;
+ }
+ }
+
+ // Process full bytes via lookup table
+ int byteIdx = (bitOff + written) >>> 3;
+ while (written + 8 <= batchSize) {
+ System.arraycopy(BYTE_TO_BOOLS[packedBytesBuffer[byteIdx] & 0xFF], 0, dest, pos + written, 8);
+ byteIdx++;
+ written += 8;
+ }
+
+ // Handle remaining bits
+ if (written < batchSize) {
+ byte b = packedBytesBuffer[byteIdx];
+ int bp = 0;
+ while (written < batchSize) {
+ dest[pos + written] = ((b >>> bp) & 1) != 0;
+ bp++;
+ written++;
+ }
+ }
+ break;
+ default:
+ throw new ParquetDecodingException("not a valid mode " + mode);
+ }
+ currentCount -= batchSize;
+ remaining -= batchSize;
+ pos += batchSize;
+ }
+ }
+
private void readNext() throws IOException {
Preconditions.checkArgument(in.available() > 0, "Reading past RLE/BitPacking stream.");
final int header = BytesUtils.readUnsignedVarInt(in);
@@ -97,6 +214,7 @@ private void readNext() throws IOException {
int bytesToRead = (int) Math.ceil(currentCount * bitWidth / 8.0);
bytesToRead = Math.min(bytesToRead, in.available());
new DataInputStream(in).readFully(bytes, 0, bytesToRead);
+ packedBytesBuffer = bytes;
for (int valueIndex = 0, byteIndex = 0;
valueIndex < currentCount;
valueIndex += 8, byteIndex += bitWidth) {
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java
index e33824bff1..fc83e85963 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java
@@ -272,6 +272,71 @@ public BytesInput toBytes() throws IOException {
return BytesInput.from(baos);
}
+ /**
+ * Batch-encodes boolean values (bitWidth must be 1). Pre-scans for runs to emit
+ * RLE runs directly and packs remaining groups into bit-packed runs, bypassing
+ * the per-value state machine.
+ *
+ * This method may only be called when the encoder is in its initial state
+ * (no values have been written via {@link #writeInt}). If called after scalar
+ * writes, behavior is undefined.
+ *
+ * @param values the boolean array
+ * @param offset start position in the array
+ * @param length number of values to encode
+ */
+ public void writeBooleans(boolean[] values, int offset, int length) throws IOException {
+ Preconditions.checkArgument(bitWidth == 1, "writeBooleans requires bitWidth == 1");
+
+ int pos = offset;
+ int end = offset + length;
+
+ while (pos < end) {
+ // Scan for run of consecutive identical values
+ boolean val = values[pos];
+ int runStart = pos;
+ pos++;
+ while (pos < end && values[pos] == val) {
+ pos++;
+ }
+ int runLen = pos - runStart;
+ int intVal = val ? 1 : 0;
+
+ // If we have a pending partial buffer, fill it first from this run
+ if (numBufferedValues > 0 && runLen >= 8) {
+ int fill = 8 - numBufferedValues;
+ for (int i = 0; i < fill; i++) {
+ bufferedValues[numBufferedValues] = intVal;
+ numBufferedValues++;
+ }
+ writeOrAppendBitPackedRun();
+ runLen -= fill;
+ }
+
+ if (runLen >= 8) {
+ // Buffer is empty now, emit RLE run for the remaining
+ endPreviousBitPackedRun();
+ BytesUtils.writeUnsignedVarInt(runLen << 1, baos);
+ BytesUtils.writeIntLittleEndianPaddedOnBitWidth(baos, intVal, bitWidth);
+ } else {
+ // Buffer values for bit-packing
+ for (int i = 0; i < runLen; i++) {
+ bufferedValues[numBufferedValues] = intVal;
+ numBufferedValues++;
+ if (numBufferedValues == 8) {
+ writeOrAppendBitPackedRun();
+ }
+ }
+ }
+ }
+
+ // Update state so toBytes() handles the tail correctly
+ repeatCount = 0;
+ if (numBufferedValues > 0) {
+ previousValue = bufferedValues[numBufferedValues - 1];
+ }
+ }
+
/**
* Reset this encoder for re-use
*/
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java
index 0bd5a18d2b..9ee70add6d 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java
@@ -54,6 +54,24 @@ public int readInteger() {
}
}
+ @Override
+ public void readIntegers(int[] dest, int offset, int count) {
+ try {
+ decoder.readInts(dest, offset, count);
+ } catch (IOException e) {
+ throw new ParquetDecodingException(e);
+ }
+ }
+
+ @Override
+ public void readBooleans(boolean[] dest, int offset, int count) {
+ try {
+ decoder.readBooleans(dest, offset, count);
+ } catch (IOException e) {
+ throw new ParquetDecodingException(e);
+ }
+ }
+
@Override
public boolean readBoolean() {
return readInteger() == 0 ? false : true;
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java
index e869b0f2a3..b6609b1d43 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java
@@ -52,6 +52,15 @@ public void writeBoolean(boolean v) {
writeInteger(v ? 1 : 0);
}
+ @Override
+ public void writeBooleans(boolean[] values, int offset, int length) {
+ try {
+ encoder.writeBooleans(values, offset, length);
+ } catch (IOException e) {
+ throw new ParquetEncodingException(e);
+ }
+ }
+
@Override
public long getBufferedSize() {
return encoder.getBufferedSize();
diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java
index d3d8b1b6de..7dbe22a6b3 100644
--- a/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java
@@ -27,6 +27,7 @@
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.List;
import org.apache.parquet.OutputStreamCloseException;
@@ -201,6 +202,7 @@ private void addSlab(int minimumSize) {
LOG.debug("used {} slabs, adding new slab of size {}", slabs.size(), nextSlabSize);
this.currentSlab = allocator.allocate(nextSlabSize);
+ this.currentSlab.order(ByteOrder.LITTLE_ENDIAN);
this.slabs.add(currentSlab);
this.bytesAllocated = Math.addExact(this.bytesAllocated, nextSlabSize);
}
@@ -232,6 +234,114 @@ public void write(byte b[], int off, int len) {
bytesUsed = Math.addExact(bytesUsed, len);
}
+ /**
+ * Writes multiple int values in little-endian byte order using bulk {@code IntBuffer} transfer.
+   * Amortizes capacity checks across the batch. NOTE(review): relies on every slab being LITTLE_ENDIAN — addSlab sets this, but confirm the initial-slab allocation path does too.
+ *
+ * @param values source array
+ * @param offset start index in values
+ * @param length number of ints to write
+ */
+ public void writeInts(int[] values, int offset, int length) {
+    int bytesNeeded = Math.multiplyExact(length, 4); // overflow-safe, consistent with Math.addExact below
+ if (bytesNeeded <= currentSlab.remaining()) {
+ currentSlab.asIntBuffer().put(values, offset, length);
+ currentSlab.position(currentSlab.position() + bytesNeeded);
+ } else {
+ // Fill current slab, then continue into a new one
+ int fits = currentSlab.remaining() / 4;
+ if (fits > 0) {
+ currentSlab.asIntBuffer().put(values, offset, fits);
+ currentSlab.position(currentSlab.position() + fits * 4);
+ }
+ int remaining = length - fits;
+ addSlab(remaining * 4);
+ currentSlab.asIntBuffer().put(values, offset + fits, remaining);
+ currentSlab.position(currentSlab.position() + remaining * 4);
+ }
+ bytesUsed = Math.addExact(bytesUsed, bytesNeeded);
+ }
+
+ /**
+ * Writes multiple long values in little-endian byte order using bulk {@code LongBuffer} transfer.
+ *
+ * @param values source array
+ * @param offset start index in values
+ * @param length number of longs to write
+ */
+ public void writeLongs(long[] values, int offset, int length) {
+    int bytesNeeded = Math.multiplyExact(length, 8); // overflow-safe, consistent with Math.addExact below
+ if (bytesNeeded <= currentSlab.remaining()) {
+ currentSlab.asLongBuffer().put(values, offset, length);
+ currentSlab.position(currentSlab.position() + bytesNeeded);
+ } else {
+ int fits = currentSlab.remaining() / 8;
+ if (fits > 0) {
+ currentSlab.asLongBuffer().put(values, offset, fits);
+ currentSlab.position(currentSlab.position() + fits * 8);
+ }
+ int remaining = length - fits;
+ addSlab(remaining * 8);
+ currentSlab.asLongBuffer().put(values, offset + fits, remaining);
+ currentSlab.position(currentSlab.position() + remaining * 8);
+ }
+ bytesUsed = Math.addExact(bytesUsed, bytesNeeded);
+ }
+
+ /**
+ * Writes multiple float values in little-endian byte order using bulk {@code FloatBuffer} transfer.
+ * The slab's LE byte order ensures correct IEEE 754 encoding without explicit
+ * {@code Float.floatToIntBits()} conversion.
+ *
+ * @param values source array
+ * @param offset start index in values
+ * @param length number of floats to write
+ */
+ public void writeFloats(float[] values, int offset, int length) {
+    int bytesNeeded = Math.multiplyExact(length, 4); // overflow-safe, consistent with Math.addExact below
+ if (bytesNeeded <= currentSlab.remaining()) {
+ currentSlab.asFloatBuffer().put(values, offset, length);
+ currentSlab.position(currentSlab.position() + bytesNeeded);
+ } else {
+ int fits = currentSlab.remaining() / 4;
+ if (fits > 0) {
+ currentSlab.asFloatBuffer().put(values, offset, fits);
+ currentSlab.position(currentSlab.position() + fits * 4);
+ }
+ int remaining = length - fits;
+ addSlab(remaining * 4);
+ currentSlab.asFloatBuffer().put(values, offset + fits, remaining);
+ currentSlab.position(currentSlab.position() + remaining * 4);
+ }
+ bytesUsed = Math.addExact(bytesUsed, bytesNeeded);
+ }
+
+ /**
+ * Writes multiple double values in little-endian byte order using bulk {@code DoubleBuffer} transfer.
+ *
+ * @param values source array
+ * @param offset start index in values
+ * @param length number of doubles to write
+ */
+ public void writeDoubles(double[] values, int offset, int length) {
+    int bytesNeeded = Math.multiplyExact(length, 8); // overflow-safe, consistent with Math.addExact below
+ if (bytesNeeded <= currentSlab.remaining()) {
+ currentSlab.asDoubleBuffer().put(values, offset, length);
+ currentSlab.position(currentSlab.position() + bytesNeeded);
+ } else {
+ int fits = currentSlab.remaining() / 8;
+ if (fits > 0) {
+ currentSlab.asDoubleBuffer().put(values, offset, fits);
+ currentSlab.position(currentSlab.position() + fits * 8);
+ }
+ int remaining = length - fits;
+ addSlab(remaining * 8);
+ currentSlab.asDoubleBuffer().put(values, offset + fits, remaining);
+ currentSlab.position(currentSlab.position() + remaining * 8);
+ }
+ bytesUsed = Math.addExact(bytesUsed, bytesNeeded);
+ }
+
private void writeToOutput(OutputStream out, ByteBuffer buf, int len) throws IOException {
if (buf.hasArray()) {
out.write(buf.array(), buf.arrayOffset(), len);
+ *
+ *
+ *
+ *
+ */
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(TimeUnit.SECONDS)
+@Fork(1)
+@Warmup(iterations = 3, time = 1)
+@Measurement(iterations = 5, time = 1)
+@State(Scope.Thread)
+public class FixedLenByteArrayEncodingBenchmark {
+
+ static final int VALUE_COUNT = 100_000;
+ private static final int INIT_SLAB_SIZE = 64 * 1024;
+ private static final int PAGE_SIZE = 4 * 1024 * 1024;
+ private static final int MAX_DICT_BYTE_SIZE = 4 * 1024 * 1024;
+
+ @Param({"2", "12", "16"})
+ public int fixedLength;
+
+ @Param({"RANDOM", "LOW_CARDINALITY"})
+ public String dataPattern;
+
+ private Binary[] data;
+
+ // Pre-encoded pages for decode benchmarks
+ private byte[] plainEncoded;
+ private byte[] deltaEncoded;
+ private byte[] bssEncoded;
+ private byte[] dictDataEncoded;
+ private Dictionary flbaDictionary;
+ private boolean dictAvailable;
+
+ @Setup(Level.Trial)
+ public void setup() throws IOException {
+ int distinct = "LOW_CARDINALITY".equals(dataPattern) ? TestDataFactory.LOW_CARDINALITY_DISTINCT : 0;
+ data = TestDataFactory.generateFixedLenByteArrays(
+ VALUE_COUNT, fixedLength, distinct, TestDataFactory.DEFAULT_SEED);
+
+ // Pre-encode for decode benchmarks
+ plainEncoded = encodeWith(newPlainWriter());
+ deltaEncoded = encodeWith(newDeltaWriter());
+ bssEncoded = encodeWith(newBssWriter());
+ setupDict();
+ }
+
+ private byte[] encodeWith(ValuesWriter writer) throws IOException {
+ for (Binary v : data) {
+ writer.writeBytes(v);
+ }
+ byte[] bytes = writer.getBytes().toByteArray();
+ writer.close();
+ return bytes;
+ }
+
+ private void setupDict() throws IOException {
+ DictionaryValuesWriter.PlainFixedLenArrayDictionaryValuesWriter w =
+ new DictionaryValuesWriter.PlainFixedLenArrayDictionaryValuesWriter(
+ MAX_DICT_BYTE_SIZE,
+ fixedLength,
+ Encoding.PLAIN_DICTIONARY,
+ Encoding.PLAIN,
+ new HeapByteBufferAllocator());
+ for (Binary v : data) {
+ w.writeBytes(v);
+ }
+ BenchmarkEncodingUtils.EncodedDictionary enc = BenchmarkEncodingUtils.drainDictionary(w);
+ dictDataEncoded = enc.dictData;
+ dictAvailable = !enc.fellBackToPlain();
+ if (dictAvailable) {
+ flbaDictionary = new PlainValuesDictionary.PlainBinaryDictionary(enc.dictPage, fixedLength);
+ }
+ }
+
+ // ---- Writer factories ----
+
+ private FixedLenByteArrayPlainValuesWriter newPlainWriter() {
+ return new FixedLenByteArrayPlainValuesWriter(
+ fixedLength, INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator());
+ }
+
+ private DeltaByteArrayWriter newDeltaWriter() {
+ return new DeltaByteArrayWriter(INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator());
+ }
+
+ private ByteStreamSplitValuesWriter.FixedLenByteArrayByteStreamSplitValuesWriter newBssWriter() {
+ return new ByteStreamSplitValuesWriter.FixedLenByteArrayByteStreamSplitValuesWriter(
+ fixedLength, INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator());
+ }
+
+ // ==== ENCODE BENCHMARKS ====
+
+ @Benchmark
+ @OperationsPerInvocation(VALUE_COUNT)
+ public byte[] encodePlain() throws IOException {
+ return encodeWith(newPlainWriter());
+ }
+
+ @Benchmark
+ @OperationsPerInvocation(VALUE_COUNT)
+ public byte[] encodePlainBatch() throws IOException {
+ FixedLenByteArrayPlainValuesWriter writer = newPlainWriter();
+ writer.writeBinaries(data, 0, data.length);
+ byte[] bytes = writer.getBytes().toByteArray();
+ writer.close();
+ return bytes;
+ }
+
+ @Benchmark
+ @OperationsPerInvocation(VALUE_COUNT)
+ public byte[] encodeDelta() throws IOException {
+ return encodeWith(newDeltaWriter());
+ }
+
+ @Benchmark
+ @OperationsPerInvocation(VALUE_COUNT)
+ public byte[] encodeBss() throws IOException {
+ return encodeWith(newBssWriter());
+ }
+
+ @Benchmark
+ @OperationsPerInvocation(VALUE_COUNT)
+ public void encodeDictionary(Blackhole bh) throws IOException {
+ DictionaryValuesWriter.PlainFixedLenArrayDictionaryValuesWriter w =
+ new DictionaryValuesWriter.PlainFixedLenArrayDictionaryValuesWriter(
+ MAX_DICT_BYTE_SIZE,
+ fixedLength,
+ Encoding.PLAIN_DICTIONARY,
+ Encoding.PLAIN,
+ new HeapByteBufferAllocator());
+ for (Binary v : data) {
+ w.writeBytes(v);
+ }
+ BenchmarkEncodingUtils.EncodedDictionary enc = BenchmarkEncodingUtils.drainDictionary(w);
+ bh.consume(enc.dictData);
+ bh.consume(enc.dictPage);
+ }
+
+ // ==== DECODE BENCHMARKS ====
+
+ @Benchmark
+ @OperationsPerInvocation(VALUE_COUNT)
+ public void decodePlain(Blackhole bh) throws IOException {
+ FixedLenByteArrayPlainValuesReader reader = new FixedLenByteArrayPlainValuesReader(fixedLength);
+ reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(plainEncoded)));
+ for (int i = 0; i < VALUE_COUNT; i++) {
+ bh.consume(reader.readBytes());
+ }
+ }
+
+ @Benchmark
+ @OperationsPerInvocation(VALUE_COUNT)
+ public void decodePlainBatch(Blackhole bh) throws IOException {
+ FixedLenByteArrayPlainValuesReader reader = new FixedLenByteArrayPlainValuesReader(fixedLength);
+ reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(plainEncoded)));
+ Binary[] batch = new Binary[VALUE_COUNT];
+ reader.readBinaries(batch, 0, VALUE_COUNT);
+ bh.consume(batch);
+ }
+
+ @Benchmark
+ @OperationsPerInvocation(VALUE_COUNT)
+ public void decodeDelta(Blackhole bh) throws IOException {
+ DeltaByteArrayReader reader = new DeltaByteArrayReader();
+ reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(deltaEncoded)));
+ for (int i = 0; i < VALUE_COUNT; i++) {
+ bh.consume(reader.readBytes());
+ }
+ }
+
+ @Benchmark
+ @OperationsPerInvocation(VALUE_COUNT)
+ public void decodeBss(Blackhole bh) throws IOException {
+ ByteStreamSplitValuesReaderForFLBA reader = new ByteStreamSplitValuesReaderForFLBA(fixedLength);
+ reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(bssEncoded)));
+ for (int i = 0; i < VALUE_COUNT; i++) {
+ bh.consume(reader.readBytes());
+ }
+ }
+
+ @Benchmark
+ @OperationsPerInvocation(VALUE_COUNT)
+ public void decodeBssBatch(Blackhole bh) throws IOException {
+ ByteStreamSplitValuesReaderForFLBA reader = new ByteStreamSplitValuesReaderForFLBA(fixedLength);
+ reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(bssEncoded)));
+ Binary[] batch = new Binary[VALUE_COUNT];
+ reader.readBinaries(batch, 0, VALUE_COUNT);
+ bh.consume(batch);
+ }
+
+ @Benchmark
+ @OperationsPerInvocation(VALUE_COUNT)
+ public void decodeDictionary(Blackhole bh) throws IOException {
+    if (!dictAvailable) return; // no-op when dictionary fell back to plain; reported throughput is meaningless in that case
+ DictionaryValuesReader reader = new DictionaryValuesReader(flbaDictionary);
+ reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(dictDataEncoded)));
+ for (int i = 0; i < VALUE_COUNT; i++) {
+ bh.consume(reader.readBytes());
+ }
+ }
+}
diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RleDictionaryIndexDecodingBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RleDictionaryIndexDecodingBenchmark.java
new file mode 100644
index 0000000000..a330a59edc
--- /dev/null
+++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RleDictionaryIndexDecodingBenchmark.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.benchmarks;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.concurrent.TimeUnit;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.HeapByteBufferAllocator;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesReader;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+/**
+ * Decoding micro-benchmark for synthetic dictionary-id pages encoded with
+ * {@link RunLengthBitPackingHybridEncoder}. This isolates the dictionary-id
+ * decode path and is intentionally separate from {@link IntEncodingBenchmark},
+ * which measures full INT32 value decode paths.
+ *
+ *