diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java index a0c7af7394..aeacc71cbc 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java @@ -19,25 +19,36 @@ package org.apache.parquet.column.values.plain; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.bytes.LittleEndianDataInputStream; import org.apache.parquet.column.values.ValuesReader; -import org.apache.parquet.io.ParquetDecodingException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Plain encoding for float, double, int, long + * Plain encoding for float, double, int, long. + * + *

Reads directly from a {@link ByteBuffer} with {@link ByteOrder#LITTLE_ENDIAN} byte order, + * bypassing the {@link LittleEndianDataInputStream} wrapper to avoid per-value virtual dispatch + * overhead. The underlying page data is obtained as a single contiguous {@link ByteBuffer} via + * {@link ByteBufferInputStream#slice(int)}. */ public abstract class PlainValuesReader extends ValuesReader { private static final Logger LOG = LoggerFactory.getLogger(PlainValuesReader.class); - protected LittleEndianDataInputStream in; + ByteBuffer buffer; @Override public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException { LOG.debug("init from page at offset {} for length {}", stream.position(), stream.available()); - this.in = new LittleEndianDataInputStream(stream.remainingStream()); + int available = stream.available(); + if (available > 0) { + this.buffer = stream.slice(available).order(ByteOrder.LITTLE_ENDIAN); + } else { + this.buffer = ByteBuffer.allocate(0).order(ByteOrder.LITTLE_ENDIAN); + } } @Override @@ -45,31 +56,25 @@ public void skip() { skip(1); } - void skipBytesFully(int n) throws IOException { - int skipped = 0; - while (skipped < n) { - skipped += in.skipBytes(n - skipped); - } - } - public static class DoublePlainValuesReader extends PlainValuesReader { @Override public void skip(int n) { - try { - skipBytesFully(n * 8); - } catch (IOException e) { - throw new ParquetDecodingException("could not skip " + n + " double values", e); - } + buffer.position(buffer.position() + n * 8); } @Override public double readDouble() { - try { - return in.readDouble(); - } catch (IOException e) { - throw new ParquetDecodingException("could not read double", e); - } + return buffer.getDouble(); + } + + /** + * Reads {@code count} doubles into {@code dest} starting at {@code offset}. + * Uses a bulk {@link java.nio.DoubleBuffer} view read for reduced per-value overhead. + */ + public void readDoubles(double[] dest, int offset, int count) { + buffer.asDoubleBuffer().get(dest, offset, count); + buffer.position(buffer.position() + count * Double.BYTES); } } @@ -77,20 +82,21 @@ public static class FloatPlainValuesReader extends PlainValuesReader { @Override public void skip(int n) { - try { - skipBytesFully(n * 4); - } catch (IOException e) { - throw new ParquetDecodingException("could not skip " + n + " floats", e); - } + buffer.position(buffer.position() + n * 4); } @Override public float readFloat() { - try { - return in.readFloat(); - } catch (IOException e) { - throw new ParquetDecodingException("could not read float", e); - } + return buffer.getFloat(); + } + + /** + * Reads {@code count} floats into {@code dest} starting at {@code offset}. + * Uses a bulk {@link java.nio.FloatBuffer} view read for reduced per-value overhead. + */ + public void readFloats(float[] dest, int offset, int count) { + buffer.asFloatBuffer().get(dest, offset, count); + buffer.position(buffer.position() + count * Float.BYTES); } } @@ -98,20 +104,21 @@ public static class IntegerPlainValuesReader extends PlainValuesReader { @Override public void skip(int n) { - try { - in.skipBytes(n * 4); - } catch (IOException e) { - throw new ParquetDecodingException("could not skip " + n + " ints", e); - } + buffer.position(buffer.position() + n * 4); } @Override public int readInteger() { - try { - return in.readInt(); - } catch (IOException e) { - throw new ParquetDecodingException("could not read int", e); - } + return buffer.getInt(); + } + + /** + * Reads {@code count} integers into {@code dest} starting at {@code offset}. + * Uses a bulk {@link java.nio.IntBuffer} view read for reduced per-value overhead. + */ + public void readIntegers(int[] dest, int offset, int count) { + buffer.asIntBuffer().get(dest, offset, count); + buffer.position(buffer.position() + count * Integer.BYTES); } } @@ -119,20 +126,21 @@ public static class LongPlainValuesReader extends PlainValuesReader { @Override public void skip(int n) { - try { - in.skipBytes(n * 8); - } catch (IOException e) { - throw new ParquetDecodingException("could not skip " + n + " longs", e); - } + buffer.position(buffer.position() + n * 8); } @Override public long readLong() { - try { - return in.readLong(); - } catch (IOException e) { - throw new ParquetDecodingException("could not read long", e); - } + return buffer.getLong(); + } + + /** + * Reads {@code count} longs into {@code dest} starting at {@code offset}. + * Uses a bulk {@link java.nio.LongBuffer} view read for reduced per-value overhead. + */ + public void readLongs(long[] dest, int offset, int count) { + buffer.asLongBuffer().get(dest, offset, count); + buffer.position(buffer.position() + count * Long.BYTES); } } } diff --git a/pom.xml b/pom.xml index 1bd9893d87..b2fd26c5d5 100644 --- a/pom.xml +++ b/pom.xml @@ -594,6 +594,8 @@ org.apache.parquet.internal.column.columnindex.IndexIterator org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReader#gatherElementDataFromStreams(byte[]) + + org.apache.parquet.column.values.plain.PlainValuesReader#in org.apache.parquet.arrow.schema.SchemaMapping$TypeMappingVisitor#visit(org.apache.parquet.arrow.schema.SchemaMapping$MapTypeMapping)