diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java index a0c7af7394..aeacc71cbc 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java @@ -19,25 +19,36 @@ package org.apache.parquet.column.values.plain; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.bytes.LittleEndianDataInputStream; import org.apache.parquet.column.values.ValuesReader; -import org.apache.parquet.io.ParquetDecodingException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Plain encoding for float, double, int, long + * Plain encoding for float, double, int, long. + * + *
<p>Reads directly from a {@link ByteBuffer} with {@link ByteOrder#LITTLE_ENDIAN} byte order,
+ * bypassing the {@link LittleEndianDataInputStream} wrapper to avoid per-value virtual dispatch
+ * overhead. The underlying page data is obtained as a single contiguous {@link ByteBuffer} via
+ * {@link ByteBufferInputStream#slice(int)}.
*/
public abstract class PlainValuesReader extends ValuesReader {
private static final Logger LOG = LoggerFactory.getLogger(PlainValuesReader.class);
- protected LittleEndianDataInputStream in;
+ ByteBuffer buffer;
@Override
public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException {
LOG.debug("init from page at offset {} for length {}", stream.position(), stream.available());
- this.in = new LittleEndianDataInputStream(stream.remainingStream());
+ int available = stream.available();
+ if (available > 0) {
+ this.buffer = stream.slice(available).order(ByteOrder.LITTLE_ENDIAN);
+ } else {
+ this.buffer = ByteBuffer.allocate(0).order(ByteOrder.LITTLE_ENDIAN);
+ }
}
@Override
@@ -45,31 +56,41 @@ public void skip() {
skip(1);
}
- void skipBytesFully(int n) throws IOException {
- int skipped = 0;
- while (skipped < n) {
- skipped += in.skipBytes(n - skipped);
- }
- }
-
public static class DoublePlainValuesReader extends PlainValuesReader {
+ /**
+ * Skips {@code n} doubles by advancing the buffer position by {@code n * Double.BYTES} bytes.
+ *
+ * @param n the number of values to skip
+ * @throws ArithmeticException if the byte offset overflows an {@code int}
+ * @throws IllegalArgumentException if the target position lies outside the buffer
+ */
@Override
public void skip(int n) {
- try {
- skipBytesFully(n * 8);
- } catch (IOException e) {
- throw new ParquetDecodingException("could not skip " + n + " double values", e);
- }
+ // Exact arithmetic: an overflowing byte count must fail instead of wrapping to a bogus position.
+ buffer.position(Math.addExact(buffer.position(), Math.multiplyExact(n, Double.BYTES)));
}
@Override
public double readDouble() {
- try {
- return in.readDouble();
- } catch (IOException e) {
- throw new ParquetDecodingException("could not read double", e);
- }
+ return buffer.getDouble();
+ }
+
+ /**
+ * Reads {@code count} doubles into {@code dest} starting at {@code offset}.
+ * Uses a bulk {@link java.nio.DoubleBuffer} view read for reduced per-value overhead.
+ * The reader's position advances only after the copy succeeds.
+ *
+ * @param dest the array that receives the values
+ * @param offset the index in {@code dest} of the first value written
+ * @param count the number of values to read
+ * @throws java.nio.BufferUnderflowException if fewer than {@code count} doubles remain
+ * @throws IndexOutOfBoundsException if {@code offset} and {@code count} do not fit {@code dest}
+ */
+ public void readDoubles(double[] dest, int offset, int count) {
+ buffer.asDoubleBuffer().get(dest, offset, count);
+ // The bulk get succeeded, so count values were available and this byte offset cannot overflow.
+ buffer.position(buffer.position() + count * Double.BYTES);
}
}
@@ -77,20 +82,37 @@ public static class FloatPlainValuesReader extends PlainValuesReader {
+ /**
+ * Skips {@code n} floats by advancing the buffer position by {@code n * Float.BYTES} bytes.
+ *
+ * @param n the number of values to skip
+ * @throws ArithmeticException if the byte offset overflows an {@code int}
+ * @throws IllegalArgumentException if the target position lies outside the buffer
+ */
@Override
public void skip(int n) {
- try {
- skipBytesFully(n * 4);
- } catch (IOException e) {
- throw new ParquetDecodingException("could not skip " + n + " floats", e);
- }
+ // Exact arithmetic: an overflowing byte count must fail instead of wrapping to a bogus position.
+ buffer.position(Math.addExact(buffer.position(), Math.multiplyExact(n, Float.BYTES)));
}
@Override
public float readFloat() {
- try {
- return in.readFloat();
- } catch (IOException e) {
- throw new ParquetDecodingException("could not read float", e);
- }
+ return buffer.getFloat();
+ }
+
+ /**
+ * Reads {@code count} floats into {@code dest} starting at {@code offset}.
+ * Uses a bulk {@link java.nio.FloatBuffer} view read for reduced per-value overhead.
+ * The reader's position advances only after the copy succeeds.
+ *
+ * @param dest the array that receives the values
+ * @param offset the index in {@code dest} of the first value written
+ * @param count the number of values to read
+ * @throws java.nio.BufferUnderflowException if fewer than {@code count} floats remain
+ * @throws IndexOutOfBoundsException if {@code offset} and {@code count} do not fit {@code dest}
+ */
+ public void readFloats(float[] dest, int offset, int count) {
+ buffer.asFloatBuffer().get(dest, offset, count);
+ // The bulk get succeeded, so count values were available and this byte offset cannot overflow.
+ buffer.position(buffer.position() + count * Float.BYTES);
}
}
@@ -98,20 +104,37 @@ public static class IntegerPlainValuesReader extends PlainValuesReader {
+ /**
+ * Skips {@code n} integers by advancing the buffer position by {@code n * Integer.BYTES} bytes.
+ *
+ * @param n the number of values to skip
+ * @throws ArithmeticException if the byte offset overflows an {@code int}
+ * @throws IllegalArgumentException if the target position lies outside the buffer
+ */
@Override
public void skip(int n) {
- try {
- in.skipBytes(n * 4);
- } catch (IOException e) {
- throw new ParquetDecodingException("could not skip " + n + " ints", e);
- }
+ // Exact arithmetic: an overflowing byte count must fail instead of wrapping to a bogus position.
+ buffer.position(Math.addExact(buffer.position(), Math.multiplyExact(n, Integer.BYTES)));
}
@Override
public int readInteger() {
- try {
- return in.readInt();
- } catch (IOException e) {
- throw new ParquetDecodingException("could not read int", e);
- }
+ return buffer.getInt();
+ }
+
+ /**
+ * Reads {@code count} integers into {@code dest} starting at {@code offset}.
+ * Uses a bulk {@link java.nio.IntBuffer} view read for reduced per-value overhead.
+ * The reader's position advances only after the copy succeeds.
+ *
+ * @param dest the array that receives the values
+ * @param offset the index in {@code dest} of the first value written
+ * @param count the number of values to read
+ * @throws java.nio.BufferUnderflowException if fewer than {@code count} integers remain
+ * @throws IndexOutOfBoundsException if {@code offset} and {@code count} do not fit {@code dest}
+ */
+ public void readIntegers(int[] dest, int offset, int count) {
+ buffer.asIntBuffer().get(dest, offset, count);
+ // The bulk get succeeded, so count values were available and this byte offset cannot overflow.
+ buffer.position(buffer.position() + count * Integer.BYTES);
}
}
@@ -119,20 +126,37 @@ public static class LongPlainValuesReader extends PlainValuesReader {
+ /**
+ * Skips {@code n} longs by advancing the buffer position by {@code n * Long.BYTES} bytes.
+ *
+ * @param n the number of values to skip
+ * @throws ArithmeticException if the byte offset overflows an {@code int}
+ * @throws IllegalArgumentException if the target position lies outside the buffer
+ */
@Override
public void skip(int n) {
- try {
- in.skipBytes(n * 8);
- } catch (IOException e) {
- throw new ParquetDecodingException("could not skip " + n + " longs", e);
- }
+ // Exact arithmetic: an overflowing byte count must fail instead of wrapping to a bogus position.
+ buffer.position(Math.addExact(buffer.position(), Math.multiplyExact(n, Long.BYTES)));
}
@Override
public long readLong() {
- try {
- return in.readLong();
- } catch (IOException e) {
- throw new ParquetDecodingException("could not read long", e);
- }
+ return buffer.getLong();
+ }
+
+ /**
+ * Reads {@code count} longs into {@code dest} starting at {@code offset}.
+ * Uses a bulk {@link java.nio.LongBuffer} view read for reduced per-value overhead.
+ * The reader's position advances only after the copy succeeds.
+ *
+ * @param dest the array that receives the values
+ * @param offset the index in {@code dest} of the first value written
+ * @param count the number of values to read
+ * @throws java.nio.BufferUnderflowException if fewer than {@code count} longs remain
+ * @throws IndexOutOfBoundsException if {@code offset} and {@code count} do not fit {@code dest}
+ */
+ public void readLongs(long[] dest, int offset, int count) {
+ buffer.asLongBuffer().get(dest, offset, count);
+ // The bulk get succeeded, so count values were available and this byte offset cannot overflow.
+ buffer.position(buffer.position() + count * Long.BYTES);
}
}
}
diff --git a/pom.xml b/pom.xml
index 1bd9893d87..b2fd26c5d5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -594,6 +594,8 @@