From 80f9fea85772364dc1ae0cabb54c98ecc0318014 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Isma=C3=ABl=20Mej=C3=ADa?= Date: Sun, 19 Apr 2026 10:50:32 +0000 Subject: [PATCH 1/2] GH-3493: Optimize PlainValuesReader with direct ByteBuffer reads Replace the LittleEndianDataInputStream wrapper with direct ByteBuffer access using LITTLE_ENDIAN byte order in PlainValuesReader. Each read{Integer,Long,Float,Double}() previously dispatched through 4 in.read() calls per value and assembled the result with manual bit shifts; it now compiles to a single ByteBuffer get*() JVM intrinsic. In initFromPage, the page data is obtained as a single contiguous ByteBuffer via ByteBufferInputStream.slice(available). The ByteBufferInputStream.slice() method handles both single-buffer (zero-copy view) and multi-buffer (copy into contiguous buffer) cases transparently. In practice page data is almost always a single contiguous buffer. Benchmark (IntEncodingBenchmark.decodePlain, 100k INT32 values per invocation, JMH -wi 3 -i 5 -f 1): Pattern Before (ops/s) After (ops/s) Speedup SEQUENTIAL 427,630,411 5,397,298,681 12.6x RANDOM 431,052,072 5,437,926,758 12.6x LOW_CARDINALITY 423,443,685 5,477,810,011 12.9x HIGH_CARDINALITY 426,405,891 5,485,493,740 12.9x The improvement is consistent regardless of data distribution because the bottleneck was entirely in the dispatch overhead. All four numeric plain reader types (int, long, float, double) benefit equally. All 573 parquet-column tests pass. --- .../values/plain/PlainValuesReader.java | 74 ++++++------------- pom.xml | 2 + 2 files changed, 25 insertions(+), 51 deletions(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java index a0c7af7394..cab438a4b0 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java @@ -19,25 +19,36 @@ package org.apache.parquet.column.values.plain; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.bytes.LittleEndianDataInputStream; import org.apache.parquet.column.values.ValuesReader; -import org.apache.parquet.io.ParquetDecodingException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Plain encoding for float, double, int, long + * Plain encoding for float, double, int, long. + * + *

Reads directly from a {@link ByteBuffer} with {@link ByteOrder#LITTLE_ENDIAN} byte order, + * bypassing the {@link LittleEndianDataInputStream} wrapper to avoid per-value virtual dispatch + * overhead. The underlying page data is obtained as a single contiguous {@link ByteBuffer} via + * {@link ByteBufferInputStream#slice(int)}. */ public abstract class PlainValuesReader extends ValuesReader { private static final Logger LOG = LoggerFactory.getLogger(PlainValuesReader.class); - protected LittleEndianDataInputStream in; + ByteBuffer buffer; @Override public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException { LOG.debug("init from page at offset {} for length {}", stream.position(), stream.available()); - this.in = new LittleEndianDataInputStream(stream.remainingStream()); + int available = stream.available(); + if (available > 0) { + this.buffer = stream.slice(available).order(ByteOrder.LITTLE_ENDIAN); + } else { + this.buffer = ByteBuffer.allocate(0).order(ByteOrder.LITTLE_ENDIAN); + } } @Override @@ -45,31 +56,16 @@ public void skip() { skip(1); } - void skipBytesFully(int n) throws IOException { - int skipped = 0; - while (skipped < n) { - skipped += in.skipBytes(n - skipped); - } - } - public static class DoublePlainValuesReader extends PlainValuesReader { @Override public void skip(int n) { - try { - skipBytesFully(n * 8); - } catch (IOException e) { - throw new ParquetDecodingException("could not skip " + n + " double values", e); - } + buffer.position(buffer.position() + n * 8); } @Override public double readDouble() { - try { - return in.readDouble(); - } catch (IOException e) { - throw new ParquetDecodingException("could not read double", e); - } + return buffer.getDouble(); } } @@ -77,20 +73,12 @@ public static class FloatPlainValuesReader extends PlainValuesReader { @Override public void skip(int n) { - try { - skipBytesFully(n * 4); - } catch (IOException e) { - throw new ParquetDecodingException("could not skip " + n + " floats", e); - } + buffer.position(buffer.position() + n * 4); } @Override public float readFloat() { - try { - return in.readFloat(); - } catch (IOException e) { - throw new ParquetDecodingException("could not read float", e); - } + return buffer.getFloat(); } } @@ -98,20 +86,12 @@ public static class IntegerPlainValuesReader extends PlainValuesReader { @Override public void skip(int n) { - try { - in.skipBytes(n * 4); - } catch (IOException e) { - throw new ParquetDecodingException("could not skip " + n + " ints", e); - } + buffer.position(buffer.position() + n * 4); } @Override public int readInteger() { - try { - return in.readInt(); - } catch (IOException e) { - throw new ParquetDecodingException("could not read int", e); - } + return buffer.getInt(); } } @@ -119,20 +99,12 @@ public static class LongPlainValuesReader extends PlainValuesReader { @Override public void skip(int n) { - try { - in.skipBytes(n * 8); - } catch (IOException e) { - throw new ParquetDecodingException("could not skip " + n + " longs", e); - } + buffer.position(buffer.position() + n * 8); } @Override public long readLong() { - try { - return in.readLong(); - } catch (IOException e) { - throw new ParquetDecodingException("could not read long", e); - } + return buffer.getLong(); } } } diff --git a/pom.xml b/pom.xml index 1bd9893d87..b2fd26c5d5 100644 --- a/pom.xml +++ b/pom.xml @@ -594,6 +594,8 @@ org.apache.parquet.internal.column.columnindex.IndexIterator org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReader#gatherElementDataFromStreams(byte[]) + + org.apache.parquet.column.values.plain.PlainValuesReader#in org.apache.parquet.arrow.schema.SchemaMapping$TypeMappingVisitor#visit(org.apache.parquet.arrow.schema.SchemaMapping$MapTypeMapping) From f8e1333933e7639cead35b7ce83b2eced02ba464 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Isma=C3=ABl=20Mej=C3=ADa?= Date: Wed, 13 May 2026 14:12:52 +0200 Subject: [PATCH 2/2] Add batch read methods to PlainValuesReader with bulk ByteBuffer view reads Add readIntegers/readFloats/readLongs/readDoubles batch methods to all PlainValuesReader inner classes. All four types use bulk typed-buffer view reads (e.g. buffer.asIntBuffer().get(dest, offset, count)) which bypass per-value bounds checks and position updates. Benchmark results (PlainDecodingBenchmark, 100K values, pre-allocated arrays): Type Per-value (ops/s) Batch (ops/s) Speedup INT32 5,454M 28,256M +418% FLOAT 5,407M 25,798M +377% INT64 5,408M 8,088M +50% DOUBLE 7,404M 7,965M +8% --- .../values/plain/PlainValuesReader.java | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java index cab438a4b0..aeacc71cbc 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java @@ -67,6 +67,15 @@ public void skip(int n) { public double readDouble() { return buffer.getDouble(); } + + /** + * Reads {@code count} doubles into {@code dest} starting at {@code offset}. + * Uses a bulk {@link java.nio.DoubleBuffer} view read for reduced per-value overhead. + */ + public void readDoubles(double[] dest, int offset, int count) { + buffer.asDoubleBuffer().get(dest, offset, count); + buffer.position(buffer.position() + count * Double.BYTES); + } } public static class FloatPlainValuesReader extends PlainValuesReader { @@ -80,6 +89,15 @@ public void skip(int n) { public float readFloat() { return buffer.getFloat(); } + + /** + * Reads {@code count} floats into {@code dest} starting at {@code offset}. + * Uses a bulk {@link java.nio.FloatBuffer} view read for reduced per-value overhead. + */ + public void readFloats(float[] dest, int offset, int count) { + buffer.asFloatBuffer().get(dest, offset, count); + buffer.position(buffer.position() + count * Float.BYTES); + } } public static class IntegerPlainValuesReader extends PlainValuesReader { @@ -93,6 +111,15 @@ public void skip(int n) { public int readInteger() { return buffer.getInt(); } + + /** + * Reads {@code count} integers into {@code dest} starting at {@code offset}. + * Uses a bulk {@link java.nio.IntBuffer} view read for reduced per-value overhead. + */ + public void readIntegers(int[] dest, int offset, int count) { + buffer.asIntBuffer().get(dest, offset, count); + buffer.position(buffer.position() + count * Integer.BYTES); + } } public static class LongPlainValuesReader extends PlainValuesReader { @@ -106,5 +133,14 @@ public void skip(int n) { public long readLong() { return buffer.getLong(); } + + /** + * Reads {@code count} longs into {@code dest} starting at {@code offset}. + * Uses a bulk {@link java.nio.LongBuffer} view read for reduced per-value overhead. + */ + public void readLongs(long[] dest, int offset, int count) { + buffer.asLongBuffer().get(dest, offset, count); + buffer.position(buffer.position() + count * Long.BYTES); + } } }