Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,120 +19,128 @@
package org.apache.parquet.column.values.plain;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.LittleEndianDataInputStream;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.ParquetDecodingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Plain encoding for float, double, int, long.
 *
 * <p>Reads directly from a {@link ByteBuffer} with {@link ByteOrder#LITTLE_ENDIAN} byte order,
 * bypassing the {@link LittleEndianDataInputStream} wrapper to avoid per-value virtual dispatch
 * overhead. The underlying page data is obtained as a single contiguous {@link ByteBuffer} via
 * {@link ByteBufferInputStream#slice(int)}.
 */
public abstract class PlainValuesReader extends ValuesReader {
  private static final Logger LOG = LoggerFactory.getLogger(PlainValuesReader.class);

  // Little-endian view over the page bytes; all subclasses read relative to its position.
  // Package-private (not protected): internal state, not part of the public API.
  ByteBuffer buffer;

  /**
   * Initializes this reader over the remaining bytes of the given page stream.
   *
   * @param valueCount the number of values encoded in the page (unused here; the plain
   *     encoding is self-delimiting by fixed value width)
   * @param stream the page data, positioned at the start of the plain-encoded section
   * @throws IOException if the underlying stream cannot be sliced
   */
  @Override
  public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException {
    LOG.debug("init from page at offset {} for length {}", stream.position(), stream.available());
    int available = stream.available();
    if (available > 0) {
      // Single contiguous slice: avoids per-read stream dispatch on the hot decode path.
      this.buffer = stream.slice(available).order(ByteOrder.LITTLE_ENDIAN);
    } else {
      // Empty page section: use a zero-capacity buffer so skip(0)/bulk reads of 0 still work.
      this.buffer = ByteBuffer.allocate(0).order(ByteOrder.LITTLE_ENDIAN);
    }
  }

  /** Skips a single value by delegating to the width-aware {@link #skip(int)}. */
  @Override
  public void skip() {
    skip(1);
  }

  /** Plain decoder for 8-byte IEEE-754 doubles. */
  public static class DoublePlainValuesReader extends PlainValuesReader {

    /**
     * Skips {@code n} doubles by advancing the buffer position.
     *
     * @param n the number of values to skip
     */
    @Override
    public void skip(int n) {
      buffer.position(buffer.position() + n * 8);
    }

    /** Reads the next little-endian double from the page buffer. */
    @Override
    public double readDouble() {
      return buffer.getDouble();
    }

    /**
     * Reads {@code count} doubles into {@code dest} starting at {@code offset}.
     * Uses a bulk {@link java.nio.DoubleBuffer} view read for reduced per-value overhead.
     */
    public void readDoubles(double[] dest, int offset, int count) {
      // The view shares position origin with `buffer` but tracks its own position,
      // so the source buffer must be advanced manually after the bulk copy.
      buffer.asDoubleBuffer().get(dest, offset, count);
      buffer.position(buffer.position() + count * Double.BYTES);
    }
  }

  /** Plain decoder for 4-byte IEEE-754 floats. */
  public static class FloatPlainValuesReader extends PlainValuesReader {

    /**
     * Skips {@code n} floats by advancing the buffer position.
     *
     * @param n the number of values to skip
     */
    @Override
    public void skip(int n) {
      buffer.position(buffer.position() + n * 4);
    }

    /** Reads the next little-endian float from the page buffer. */
    @Override
    public float readFloat() {
      return buffer.getFloat();
    }

    /**
     * Reads {@code count} floats into {@code dest} starting at {@code offset}.
     * Uses a bulk {@link java.nio.FloatBuffer} view read for reduced per-value overhead.
     */
    public void readFloats(float[] dest, int offset, int count) {
      buffer.asFloatBuffer().get(dest, offset, count);
      buffer.position(buffer.position() + count * Float.BYTES);
    }
  }

  /** Plain decoder for 4-byte little-endian integers. */
  public static class IntegerPlainValuesReader extends PlainValuesReader {

    /**
     * Skips {@code n} ints by advancing the buffer position.
     *
     * @param n the number of values to skip
     */
    @Override
    public void skip(int n) {
      buffer.position(buffer.position() + n * 4);
    }

    /** Reads the next little-endian int from the page buffer. */
    @Override
    public int readInteger() {
      return buffer.getInt();
    }

    /**
     * Reads {@code count} integers into {@code dest} starting at {@code offset}.
     * Uses a bulk {@link java.nio.IntBuffer} view read for reduced per-value overhead.
     */
    public void readIntegers(int[] dest, int offset, int count) {
      buffer.asIntBuffer().get(dest, offset, count);
      buffer.position(buffer.position() + count * Integer.BYTES);
    }
  }

  /** Plain decoder for 8-byte little-endian longs. */
  public static class LongPlainValuesReader extends PlainValuesReader {

    /**
     * Skips {@code n} longs by advancing the buffer position.
     *
     * @param n the number of values to skip
     */
    @Override
    public void skip(int n) {
      buffer.position(buffer.position() + n * 8);
    }

    /** Reads the next little-endian long from the page buffer. */
    @Override
    public long readLong() {
      return buffer.getLong();
    }

    /**
     * Reads {@code count} longs into {@code dest} starting at {@code offset}.
     * Uses a bulk {@link java.nio.LongBuffer} view read for reduced per-value overhead.
     */
    public void readLongs(long[] dest, int offset, int count) {
      buffer.asLongBuffer().get(dest, offset, count);
      buffer.position(buffer.position() + count * Long.BYTES);
    }
  }
}
2 changes: 2 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,8 @@
<exclude>org.apache.parquet.internal.column.columnindex.IndexIterator</exclude>
<!-- Removal of a protected method in a class that's not supposed to be subclassed by third-party code -->
<exclude>org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReader#gatherElementDataFromStreams(byte[])</exclude>
<!-- Removal of a protected internal field that should not have been part of the public API -->
<exclude>org.apache.parquet.column.values.plain.PlainValuesReader#in</exclude>
<!-- Due to the removal of deprecated methods -->
<exclude>org.apache.parquet.arrow.schema.SchemaMapping$TypeMappingVisitor#visit(org.apache.parquet.arrow.schema.SchemaMapping$MapTypeMapping)</exclude>
<!-- Intentional removal of deprecated Pig support from parquet-thrift -->
Expand Down
Loading