diff --git a/parquet-benchmarks/pom.xml b/parquet-benchmarks/pom.xml index d5a288b677..d687b80053 100644 --- a/parquet-benchmarks/pom.xml +++ b/parquet-benchmarks/pom.xml @@ -94,6 +94,18 @@ org.apache.maven.plugins maven-compiler-plugin + + + + org.openjdk.jmh + jmh-generator-annprocess + ${jmh.version} + + + + org.openjdk.jmh.generators.BenchmarkProcessor + + org.apache.maven.plugins diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BenchmarkEncodingUtils.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BenchmarkEncodingUtils.java new file mode 100644 index 0000000000..c79dedce28 --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BenchmarkEncodingUtils.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.benchmarks; + +import java.io.IOException; +import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter; + +/** + * Shared helpers for encode/decode micro-benchmarks. + */ +final class BenchmarkEncodingUtils { + + private BenchmarkEncodingUtils() {} + + /** + * Container for the two artefacts produced by a dictionary-encoded page: + * the encoded dictionary indices ({@link #dictData}) and the dictionary + * page itself ({@link #dictPage}). The dictionary page may be {@code null} + * if the writer fell back to plain encoding (for example, when the + * dictionary exceeded its configured maximum size). + */ + static final class EncodedDictionary { + final byte[] dictData; + final DictionaryPage dictPage; + + EncodedDictionary(byte[] dictData, DictionaryPage dictPage) { + this.dictData = dictData; + this.dictPage = dictPage; + } + + boolean fellBackToPlain() { + return dictPage == null; + } + } + + /** + * Drains a {@link DictionaryValuesWriter} into an {@link EncodedDictionary}. + * + *

The writer's data bytes (the RLE-encoded indices) and the dictionary + * page are returned separately so both pieces can be measured or fed to a + * decoder symmetrically. The dictionary page buffer is copied so it remains + * valid after the writer's allocator is released. + * + *

The writer is closed via {@code toDictPageAndClose()}; callers must not + * call {@link DictionaryValuesWriter#close()} again afterwards. + */ + static EncodedDictionary drainDictionary(DictionaryValuesWriter writer) throws IOException { + byte[] dictData = writer.getBytes().toByteArray(); + DictionaryPage rawPage = writer.toDictPageAndClose(); + DictionaryPage dictPage = rawPage == null ? null : rawPage.copy(); + return new EncodedDictionary(dictData, dictPage); + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BlackHoleOutputFile.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BlackHoleOutputFile.java new file mode 100644 index 0000000000..690ddc2bbe --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BlackHoleOutputFile.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.benchmarks; + +import java.io.IOException; +import org.apache.parquet.io.OutputFile; +import org.apache.parquet.io.PositionOutputStream; + +/** + * A no-op {@link OutputFile} that discards all written data. + * Useful for isolating CPU/encoding cost from filesystem I/O in write benchmarks. + */ +public final class BlackHoleOutputFile implements OutputFile { + + public static final BlackHoleOutputFile INSTANCE = new BlackHoleOutputFile(); + + private BlackHoleOutputFile() {} + + @Override + public boolean supportsBlockSize() { + return false; + } + + @Override + public long defaultBlockSize() { + return -1L; + } + + @Override + public PositionOutputStream createOrOverwrite(long blockSizeHint) { + return create(blockSizeHint); + } + + @Override + public PositionOutputStream create(long blockSizeHint) { + return new PositionOutputStream() { + private long pos; + + @Override + public long getPos() throws IOException { + return pos; + } + + @Override + public void write(int b) throws IOException { + ++pos; + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + pos += len; + } + }; + } + + @Override + public String getPath() { + return "/dev/null"; + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/FileReadBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/FileReadBenchmark.java new file mode 100644 index 0000000000..de133f4607 --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/FileReadBenchmark.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.benchmarks; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import org.apache.parquet.column.ParquetProperties.WriterVersion; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.api.ReadSupport; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.example.GroupReadSupport; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.io.LocalInputFile; +import org.apache.parquet.io.LocalOutputFile; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * File-level read benchmarks measuring end-to-end Parquet read throughput through the + * example {@link Group} API. A temporary file is generated once during setup from + * pre-generated rows using {@link LocalOutputFile}, then read repeatedly during the + * benchmark. + * + *

Parameterized across compression codec and writer version. The footer parse + * (via {@link LocalInputFile} open) is included in the timed section so the result + * reflects the full open-and-read cost a typical caller would observe. + * + *

{@link Mode#SingleShotTime} is used because each invocation does enough work + * (a full read of {@value TestDataFactory#DEFAULT_ROW_COUNT} rows) that JIT + * amortization across invocations is unnecessary. Ten measurement iterations + * provide stable statistics for SS mode. + */ +@BenchmarkMode(Mode.SingleShotTime) +@Fork(1) +@Warmup(iterations = 5, batchSize = 1) +@Measurement(iterations = 10, batchSize = 1) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@State(Scope.Benchmark) +public class FileReadBenchmark { + + @Param({"UNCOMPRESSED", "SNAPPY", "ZSTD", "GZIP"}) + public String codec; + + @Param({"PARQUET_1_0", "PARQUET_2_0"}) + public String writerVersion; + + private File tempFile; + + @Setup(Level.Trial) + public void setup() throws IOException { + tempFile = File.createTempFile("parquet-read-bench-", ".parquet"); + tempFile.deleteOnExit(); + tempFile.delete(); // remove so the writer can create it + + Group[] rows = TestDataFactory.generateRows( + TestDataFactory.newGroupFactory(), TestDataFactory.DEFAULT_ROW_COUNT, TestDataFactory.DEFAULT_SEED); + try (ParquetWriter writer = ExampleParquetWriter.builder(new LocalOutputFile(tempFile.toPath())) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + .withType(TestDataFactory.FILE_BENCHMARK_SCHEMA) + .withCompressionCodec(CompressionCodecName.valueOf(codec)) + .withWriterVersion(WriterVersion.valueOf(writerVersion)) + .withDictionaryEncoding(true) + .build()) { + for (Group row : rows) { + writer.write(row); + } + } + } + + @TearDown(Level.Trial) + public void tearDown() { + if (tempFile != null && tempFile.exists()) { + tempFile.delete(); + } + } + + @Benchmark + public void readFile(Blackhole bh) throws IOException { + InputFile inputFile = new LocalInputFile(tempFile.toPath()); + try (ParquetReader reader = new ParquetReader.Builder(inputFile) { + @Override + protected ReadSupport getReadSupport() { + return new GroupReadSupport(); + } + }.build()) { + Group group; + while ((group = reader.read()) != null) { + bh.consume(group); + } + } + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/FileWriteBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/FileWriteBenchmark.java new file mode 100644 index 0000000000..6716010cc3 --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/FileWriteBenchmark.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.benchmarks; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import org.apache.parquet.column.ParquetProperties.WriterVersion; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +/** + * File-level write benchmarks measuring end-to-end Parquet write throughput through the + * example {@link Group} API. Row contents are pre-generated during setup so compression + * and writer settings dominate the timed section, while writes still flow through the + * full Parquet writer path. + * + *

Writes are sent to a {@link BlackHoleOutputFile} to isolate CPU and encoding cost + * from filesystem I/O. Parameterized across compression codec, writer version, and + * dictionary encoding. + * + *

{@link Mode#SingleShotTime} is used because each invocation does enough work + * (a full write of {@value TestDataFactory#DEFAULT_ROW_COUNT} rows) that JIT + * amortization across invocations is unnecessary. Ten measurement iterations + * provide stable statistics for SS mode. + */ +@BenchmarkMode(Mode.SingleShotTime) +@Fork(1) +@Warmup(iterations = 5, batchSize = 1) +@Measurement(iterations = 10, batchSize = 1) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@State(Scope.Benchmark) +public class FileWriteBenchmark { + + @Param({"UNCOMPRESSED", "SNAPPY", "ZSTD", "GZIP"}) + public String codec; + + @Param({"PARQUET_1_0", "PARQUET_2_0"}) + public String writerVersion; + + @Param({"true", "false"}) + public String dictionary; + + private Group[] rows; + + @Setup(Level.Trial) + public void setup() { + rows = TestDataFactory.generateRows( + TestDataFactory.newGroupFactory(), TestDataFactory.DEFAULT_ROW_COUNT, TestDataFactory.DEFAULT_SEED); + } + + @Benchmark + public void writeFile() throws IOException { + try (ParquetWriter writer = ExampleParquetWriter.builder(BlackHoleOutputFile.INSTANCE) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + .withType(TestDataFactory.FILE_BENCHMARK_SCHEMA) + .withCompressionCodec(CompressionCodecName.valueOf(codec)) + .withWriterVersion(WriterVersion.valueOf(writerVersion)) + .withDictionaryEncoding(Boolean.parseBoolean(dictionary)) + .build()) { + for (Group row : rows) { + writer.write(row); + } + } + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/FixedLenByteArrayEncodingBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/FixedLenByteArrayEncodingBenchmark.java new file mode 100644 index 0000000000..bc964b9f25 --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/FixedLenByteArrayEncodingBenchmark.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.benchmarks; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.TimeUnit; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.column.Dictionary; +import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.values.ValuesWriter; +import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForFLBA; +import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesWriter; +import org.apache.parquet.column.values.deltastrings.DeltaByteArrayReader; +import org.apache.parquet.column.values.deltastrings.DeltaByteArrayWriter; +import org.apache.parquet.column.values.dictionary.DictionaryValuesReader; +import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter; +import org.apache.parquet.column.values.dictionary.PlainValuesDictionary; +import org.apache.parquet.column.values.plain.FixedLenByteArrayPlainValuesReader; +import org.apache.parquet.column.values.plain.FixedLenByteArrayPlainValuesWriter; +import org.apache.parquet.io.api.Binary; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * Encoding-level micro-benchmarks for FIXED_LEN_BYTE_ARRAY (FLBA) values across + * all supported encodings: PLAIN, DELTA_BYTE_ARRAY, BYTE_STREAM_SPLIT, and DICTIONARY. + * + *

Each benchmark invocation processes {@value #VALUE_COUNT} values; throughput is + * reported per-value via {@link OperationsPerInvocation}. + * + *

The {@code fixedLength} parameter exercises key FLBA sizes: + *

+ * + *

The {@code dataPattern} parameter controls cardinality: + *

+ */ +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(1) +@Warmup(iterations = 3, time = 1) +@Measurement(iterations = 5, time = 1) +@State(Scope.Thread) +public class FixedLenByteArrayEncodingBenchmark { + + static final int VALUE_COUNT = 100_000; + private static final int INIT_SLAB_SIZE = 64 * 1024; + private static final int PAGE_SIZE = 4 * 1024 * 1024; + private static final int MAX_DICT_BYTE_SIZE = 4 * 1024 * 1024; + + @Param({"2", "12", "16"}) + public int fixedLength; + + @Param({"RANDOM", "LOW_CARDINALITY"}) + public String dataPattern; + + private Binary[] data; + + // Pre-encoded pages for decode benchmarks + private byte[] plainEncoded; + private byte[] deltaEncoded; + private byte[] bssEncoded; + private byte[] dictDataEncoded; + private Dictionary flbaDictionary; + private boolean dictAvailable; + + @Setup(Level.Trial) + public void setup() throws IOException { + int distinct = "LOW_CARDINALITY".equals(dataPattern) ? TestDataFactory.LOW_CARDINALITY_DISTINCT : 0; + data = TestDataFactory.generateFixedLenByteArrays( + VALUE_COUNT, fixedLength, distinct, TestDataFactory.DEFAULT_SEED); + + // Pre-encode for decode benchmarks + plainEncoded = encodeWith(newPlainWriter()); + deltaEncoded = encodeWith(newDeltaWriter()); + bssEncoded = encodeWith(newBssWriter()); + setupDict(); + } + + private byte[] encodeWith(ValuesWriter writer) throws IOException { + for (Binary v : data) { + writer.writeBytes(v); + } + byte[] bytes = writer.getBytes().toByteArray(); + writer.close(); + return bytes; + } + + private void setupDict() throws IOException { + DictionaryValuesWriter.PlainFixedLenArrayDictionaryValuesWriter w = + new DictionaryValuesWriter.PlainFixedLenArrayDictionaryValuesWriter( + MAX_DICT_BYTE_SIZE, + fixedLength, + Encoding.PLAIN_DICTIONARY, + Encoding.PLAIN, + new HeapByteBufferAllocator()); + for (Binary v : data) { + w.writeBytes(v); + } + BenchmarkEncodingUtils.EncodedDictionary enc = BenchmarkEncodingUtils.drainDictionary(w); + dictDataEncoded = enc.dictData; + dictAvailable = !enc.fellBackToPlain(); + if (dictAvailable) { + flbaDictionary = new PlainValuesDictionary.PlainBinaryDictionary(enc.dictPage, fixedLength); + } + } + + // ---- Writer factories ---- + + private FixedLenByteArrayPlainValuesWriter newPlainWriter() { + return new FixedLenByteArrayPlainValuesWriter( + fixedLength, INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + } + + private DeltaByteArrayWriter newDeltaWriter() { + return new DeltaByteArrayWriter(INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + } + + private ByteStreamSplitValuesWriter.FixedLenByteArrayByteStreamSplitValuesWriter newBssWriter() { + return new ByteStreamSplitValuesWriter.FixedLenByteArrayByteStreamSplitValuesWriter( + fixedLength, INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + } + + // ==== ENCODE BENCHMARKS ==== + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public byte[] encodePlain() throws IOException { + return encodeWith(newPlainWriter()); + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public byte[] encodePlainBatch() throws IOException { + FixedLenByteArrayPlainValuesWriter writer = newPlainWriter(); + writer.writeBinaries(data, 0, data.length); + byte[] bytes = writer.getBytes().toByteArray(); + writer.close(); + return bytes; + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public byte[] encodeDelta() throws IOException { + return encodeWith(newDeltaWriter()); + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public byte[] encodeBss() throws IOException { + return encodeWith(newBssWriter()); + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void encodeDictionary(Blackhole bh) throws IOException { + DictionaryValuesWriter.PlainFixedLenArrayDictionaryValuesWriter w = + new DictionaryValuesWriter.PlainFixedLenArrayDictionaryValuesWriter( + MAX_DICT_BYTE_SIZE, + fixedLength, + Encoding.PLAIN_DICTIONARY, + Encoding.PLAIN, + new HeapByteBufferAllocator()); + for (Binary v : data) { + w.writeBytes(v); + } + BenchmarkEncodingUtils.EncodedDictionary enc = BenchmarkEncodingUtils.drainDictionary(w); + bh.consume(enc.dictData); + bh.consume(enc.dictPage); + } + + // ==== DECODE BENCHMARKS ==== + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodePlain(Blackhole bh) throws IOException { + FixedLenByteArrayPlainValuesReader reader = new FixedLenByteArrayPlainValuesReader(fixedLength); + reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(plainEncoded))); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(reader.readBytes()); + } + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodePlainBatch(Blackhole bh) throws IOException { + FixedLenByteArrayPlainValuesReader reader = new FixedLenByteArrayPlainValuesReader(fixedLength); + reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(plainEncoded))); + Binary[] batch = new Binary[VALUE_COUNT]; + reader.readBinaries(batch, 0, VALUE_COUNT); + bh.consume(batch); + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeDelta(Blackhole bh) throws IOException { + DeltaByteArrayReader reader = new DeltaByteArrayReader(); + reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(deltaEncoded))); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(reader.readBytes()); + } + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeBss(Blackhole bh) throws IOException { + ByteStreamSplitValuesReaderForFLBA reader = new ByteStreamSplitValuesReaderForFLBA(fixedLength); + reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(bssEncoded))); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(reader.readBytes()); + } + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeBssBatch(Blackhole bh) throws IOException { + ByteStreamSplitValuesReaderForFLBA reader = new ByteStreamSplitValuesReaderForFLBA(fixedLength); + reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(bssEncoded))); + Binary[] batch = new Binary[VALUE_COUNT]; + reader.readBinaries(batch, 0, VALUE_COUNT); + bh.consume(batch); + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeDictionary(Blackhole bh) throws IOException { + if (!dictAvailable) return; + DictionaryValuesReader reader = new DictionaryValuesReader(flbaDictionary); + reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(dictDataEncoded))); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(reader.readBytes()); + } + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RleDictionaryIndexDecodingBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RleDictionaryIndexDecodingBenchmark.java new file mode 100644 index 0000000000..a330a59edc --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RleDictionaryIndexDecodingBenchmark.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.benchmarks; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.concurrent.TimeUnit; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder; +import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder; +import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesReader; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * Decoding micro-benchmark for synthetic dictionary-id pages encoded with + * {@link RunLengthBitPackingHybridEncoder}. This isolates the dictionary-id + * decode path and is intentionally separate from {@link IntEncodingBenchmark}, + * which measures full INT32 value decode paths. + * + *

Per-invocation overhead (decoder construction and {@link ByteBufferInputStream} + * wrapping) is amortized over {@value #VALUE_COUNT} reads via + * {@link OperationsPerInvocation}. + */ +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(1) +@Warmup(iterations = 3, time = 1) +@Measurement(iterations = 5, time = 1) +@State(Scope.Thread) +public class RleDictionaryIndexDecodingBenchmark { + + static final int VALUE_COUNT = 100_000; + private static final int INIT_SLAB_SIZE = 64 * 1024; + private static final int PAGE_SIZE = 1024 * 1024; + private static final int BIT_WIDTH = 10; + private static final int MAX_ID = 1 << BIT_WIDTH; + + static { + if (TestDataFactory.LOW_CARDINALITY_DISTINCT > MAX_ID) { + throw new IllegalStateException("LOW_CARDINALITY_DISTINCT (" + TestDataFactory.LOW_CARDINALITY_DISTINCT + + ") must fit within BIT_WIDTH=" + BIT_WIDTH + " (MAX_ID=" + MAX_ID + ")"); + } + } + + @Param({"SEQUENTIAL", "RANDOM", "LOW_CARDINALITY"}) + public String indexPattern; + + private byte[] encoded; + + // encoded with 4-byte LE length prefix, as expected by ValuesReader.initFromPage() + private byte[] encodedWithLengthPrefix; + + private int[] ids; + + @Setup(Level.Trial) + public void setup() throws IOException { + ids = generateDictionaryIds(); + try (RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder( + BIT_WIDTH, INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator())) { + for (int id : ids) { + encoder.writeInt(id); + } + encoded = encoder.toBytes().toByteArray(); + } + + // Prepend 4-byte LE length for ValuesReader.initFromPage() format + encodedWithLengthPrefix = new byte[4 + encoded.length]; + ByteBuffer.wrap(encodedWithLengthPrefix).order(ByteOrder.LITTLE_ENDIAN).putInt(encoded.length); + System.arraycopy(encoded, 0, encodedWithLengthPrefix, 4, encoded.length); + } + + private int[] generateDictionaryIds() { + switch (indexPattern) { + case "SEQUENTIAL": + int[] sequential = new int[VALUE_COUNT]; + for (int i = 0; i < VALUE_COUNT; i++) { + sequential[i] = i % MAX_ID; + } + return sequential; + case "RANDOM": + return TestDataFactory.generateLowCardinalityInts(VALUE_COUNT, MAX_ID, TestDataFactory.DEFAULT_SEED); + case "LOW_CARDINALITY": + return TestDataFactory.generateLowCardinalityInts( + VALUE_COUNT, TestDataFactory.LOW_CARDINALITY_DISTINCT, TestDataFactory.DEFAULT_SEED); + default: + throw new IllegalArgumentException("Unknown index pattern: " + indexPattern); + } + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public byte[] encodeDictionaryIds() throws IOException { + try (RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder( + BIT_WIDTH, INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator())) { + for (int id : ids) { + encoder.writeInt(id); + } + return encoder.toBytes().toByteArray(); + } + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeDictionaryIds(Blackhole bh) throws IOException { + RunLengthBitPackingHybridDecoder decoder = + new RunLengthBitPackingHybridDecoder(BIT_WIDTH, new ByteArrayInputStream(encoded)); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(decoder.readInt()); + } + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public int[] decodeDictionaryIdsBatch() throws IOException { + RunLengthBitPackingHybridDecoder decoder = + new RunLengthBitPackingHybridDecoder(BIT_WIDTH, new ByteArrayInputStream(encoded)); + int[] result = new int[VALUE_COUNT]; + decoder.readInts(result, 0, VALUE_COUNT); + return result; + } + + // ---- ValuesReader-level benchmarks ---- + // These go through the RunLengthBitPackingHybridValuesReader wrapper, + // which is the path used by ColumnReader in production. + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeValuesReader(Blackhole bh) throws IOException { + RunLengthBitPackingHybridValuesReader reader = new RunLengthBitPackingHybridValuesReader(BIT_WIDTH); + reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(encodedWithLengthPrefix))); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(reader.readInteger()); + } + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public int[] decodeValuesReaderBatch() throws IOException { + RunLengthBitPackingHybridValuesReader reader = new RunLengthBitPackingHybridValuesReader(BIT_WIDTH); + reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(encodedWithLengthPrefix))); + int[] result = new int[VALUE_COUNT]; + reader.readIntegers(result, 0, VALUE_COUNT); + return result; + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RowGroupFlushBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RowGroupFlushBenchmark.java new file mode 100644 index 0000000000..9bc5cab0a8 --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RowGroupFlushBenchmark.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.benchmarks; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.parquet.bytes.ByteBufferAllocator; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.column.ParquetProperties.WriterVersion; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Types; +import org.openjdk.jmh.annotations.AuxCounters; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +/** + * Benchmark measuring row group flush performance and peak buffer memory. + * + *

Uses a wide schema (20 BINARY columns, 200 bytes each) to produce + * substantial per-column page buffers. A {@link PeakTrackingAllocator} + * wraps the heap allocator to precisely track the peak bytes outstanding + * across all parquet-managed ByteBuffers (independent of JVM GC behavior). + * + *

The key metric is {@code peakAllocatorMB}: with the interleaved flush + * optimization, each column's pages are finalized, written, and released + * before the next column is processed, so peak buffer memory is roughly + * 1/N of the total row group size (N = number of columns). + * + *

Writes to {@link BlackHoleOutputFile} to isolate flush cost from + * filesystem I/O. + */ +@BenchmarkMode({Mode.AverageTime}) +@Fork( + value = 1, + jvmArgs = {"-Xms512m", "-Xmx1g"}) +@Warmup(iterations = 3) +@Measurement(iterations = 5) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@State(Scope.Thread) +public class RowGroupFlushBenchmark { + + private static final int COLUMN_COUNT = 20; + private static final int BINARY_VALUE_LENGTH = 200; + private static final int ROW_COUNT = 100_000; + + /** Row group sizes: 8MB and 64MB. */ + @Param({"8388608", "67108864"}) + public int rowGroupSize; + + /** Wide schema: 20 required BINARY columns. */ + private static final MessageType WIDE_SCHEMA; + + static { + Types.MessageTypeBuilder builder = Types.buildMessage(); + for (int c = 0; c < COLUMN_COUNT; c++) { + builder.required(PrimitiveTypeName.BINARY).named("col_" + c); + } + WIDE_SCHEMA = builder.named("wide_record"); + } + + /** Pre-generated column values (one unique value per column). */ + private Binary[] columnValues; + + @Setup(Level.Trial) + public void setup() { + Random random = new Random(42); + columnValues = new Binary[COLUMN_COUNT]; + for (int c = 0; c < COLUMN_COUNT; c++) { + byte[] value = new byte[BINARY_VALUE_LENGTH]; + random.nextBytes(value); + columnValues[c] = Binary.fromConstantByteArray(value); + } + } + + /** + * Auxiliary counters reported alongside timing. JMH collects these after + * each iteration. + */ + @AuxCounters(AuxCounters.Type.EVENTS) + @State(Scope.Thread) + public static class MemoryCounters { + /** Peak bytes outstanding in the parquet ByteBufferAllocator. */ + public long peakAllocatorBytes; + + /** Convenience: peak in MB (peakAllocatorBytes / 1048576). */ + public double peakAllocatorMB; + + @Setup(Level.Iteration) + public void reset() { + peakAllocatorBytes = 0; + peakAllocatorMB = 0; + } + } + + /** + * ByteBufferAllocator wrapper that tracks current and peak allocated bytes. + * Thread-safe (uses AtomicLong) although the write path is single-threaded. + */ + static class PeakTrackingAllocator implements ByteBufferAllocator { + private final ByteBufferAllocator delegate = new HeapByteBufferAllocator(); + private final AtomicLong currentBytes = new AtomicLong(); + private final AtomicLong peakBytes = new AtomicLong(); + + @Override + public ByteBuffer allocate(int size) { + ByteBuffer buf = delegate.allocate(size); + long current = currentBytes.addAndGet(buf.capacity()); + peakBytes.accumulateAndGet(current, Math::max); + return buf; + } + + @Override + public void release(ByteBuffer buf) { + currentBytes.addAndGet(-buf.capacity()); + delegate.release(buf); + } + + @Override + public boolean isDirect() { + return delegate.isDirect(); + } + + long getPeakBytes() { + return peakBytes.get(); + } + } + + @Benchmark + public void writeWithFlush(MemoryCounters counters) throws IOException { + PeakTrackingAllocator allocator = new PeakTrackingAllocator(); + SimpleGroupFactory factory = new SimpleGroupFactory(WIDE_SCHEMA); + + try (ParquetWriter writer = ExampleParquetWriter.builder(BlackHoleOutputFile.INSTANCE) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + .withType(WIDE_SCHEMA) + .withCompressionCodec(CompressionCodecName.UNCOMPRESSED) + .withWriterVersion(WriterVersion.PARQUET_1_0) + .withRowGroupSize(rowGroupSize) + .withDictionaryEncoding(false) + .withAllocator(allocator) + .build()) { + for (int i = 0; i < ROW_COUNT; i++) { + Group group = factory.newGroup(); + for (int c = 0; c < COLUMN_COUNT; c++) { + group.append("col_" + c, columnValues[c]); + } + writer.write(group); + } + } + + counters.peakAllocatorBytes = allocator.getPeakBytes(); + counters.peakAllocatorMB = allocator.getPeakBytes() / (1024.0 * 1024.0); + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/TestDataFactory.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/TestDataFactory.java new file mode 100644 index 0000000000..93cc714730 --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/TestDataFactory.java @@ -0,0 +1,377 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.benchmarks; + +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; + +import java.nio.charset.StandardCharsets; +import java.util.Random; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Types; + +/** + * Utility class for generating test schemas and data for benchmarks. + */ +public final class TestDataFactory { + + /** Default number of rows for file-level benchmarks. */ + public static final int DEFAULT_ROW_COUNT = 100_000; + + /** Number of distinct values for low-cardinality data patterns. */ + public static final int LOW_CARDINALITY_DISTINCT = 100; + + /** Default RNG seed used across benchmarks for deterministic data. */ + public static final long DEFAULT_SEED = 42L; + + /** A standard multi-type schema used by file-level benchmarks. */ + public static final MessageType FILE_BENCHMARK_SCHEMA = Types.buildMessage() + .required(INT32) + .named("int32_field") + .required(INT64) + .named("int64_field") + .required(FLOAT) + .named("float_field") + .required(DOUBLE) + .named("double_field") + .required(BOOLEAN) + .named("boolean_field") + .required(BINARY) + .named("binary_field") + .named("benchmark_record"); + + private TestDataFactory() {} + + /** + * Creates a {@link SimpleGroupFactory} for the standard benchmark schema. + */ + public static SimpleGroupFactory newGroupFactory() { + return new SimpleGroupFactory(FILE_BENCHMARK_SCHEMA); + } + + /** + * Generates a single row of benchmark data. + * + * @param factory the group factory + * @param index the row index (used for deterministic data) + * @param random the random source + * @return a populated Group + */ + public static Group generateRow(SimpleGroupFactory factory, int index, Random random) { + return factory.newGroup() + .append("int32_field", index) + .append("int64_field", (long) index * 100) + .append("float_field", random.nextFloat()) + .append("double_field", random.nextDouble()) + .append("boolean_field", index % 2 == 0) + .append("binary_field", "value_" + (index % 1000)); + } + + /** + * Generates a deterministic set of rows for file-level benchmarks. + */ + public static Group[] generateRows(SimpleGroupFactory factory, int rowCount, long seed) { + Group[] rows = new Group[rowCount]; + Random random = new Random(seed); + for (int i = 0; i < rowCount; i++) { + rows[i] = generateRow(factory, i, random); + } + return rows; + } + + // ---- Integer data generation for encoding benchmarks ---- + + /** + * Generates sequential integers: 0, 1, 2, ... + */ + public static int[] generateSequentialInts(int count) { + int[] data = new int[count]; + for (int i = 0; i < count; i++) { + data[i] = i; + } + return data; + } + + /** + * Generates uniformly random integers using the given seed. + */ + public static int[] generateRandomInts(int count, long seed) { + return generateRandomInts(count, new Random(seed)); + } + + /** + * Generates uniformly random integers. + * + *

Note: prefer {@link #generateRandomInts(int, long)} when call ordering between + * generators in the same setup must not influence the produced data. + */ + public static int[] generateRandomInts(int count, Random random) { + int[] data = new int[count]; + for (int i = 0; i < count; i++) { + data[i] = random.nextInt(); + } + return data; + } + + /** + * Generates low-cardinality integers (values drawn from a small set) using the given seed. + */ + public static int[] generateLowCardinalityInts(int count, int distinctValues, long seed) { + return generateLowCardinalityInts(count, distinctValues, new Random(seed)); + } + + /** + * Generates low-cardinality integers (values drawn from a small set). + */ + public static int[] generateLowCardinalityInts(int count, int distinctValues, Random random) { + int[] data = new int[count]; + for (int i = 0; i < count; i++) { + data[i] = random.nextInt(distinctValues); + } + return data; + } + + /** + * Generates high-cardinality integers (all unique in randomized order) using the given seed. + */ + public static int[] generateHighCardinalityInts(int count, long seed) { + return generateHighCardinalityInts(count, new Random(seed)); + } + + /** + * Generates high-cardinality integers (all unique in randomized order). + */ + public static int[] generateHighCardinalityInts(int count, Random random) { + int[] data = generateSequentialInts(count); + for (int i = count - 1; i > 0; i--) { + int swapIndex = random.nextInt(i + 1); + int tmp = data[i]; + data[i] = data[swapIndex]; + data[swapIndex] = tmp; + } + return data; + } + + // ---- Long data generation for encoding benchmarks ---- + + /** + * Generates sequential longs: 0, 1, 2, ... + */ + public static long[] generateSequentialLongs(int count) { + long[] data = new long[count]; + for (int i = 0; i < count; i++) { + data[i] = i; + } + return data; + } + + /** + * Generates uniformly random longs using the given seed. + */ + public static long[] generateRandomLongs(int count, long seed) { + Random random = new Random(seed); + long[] data = new long[count]; + for (int i = 0; i < count; i++) { + data[i] = random.nextLong(); + } + return data; + } + + /** + * Generates low-cardinality longs (values drawn from a small set). + */ + public static long[] generateLowCardinalityLongs(int count, int distinctValues, long seed) { + Random random = new Random(seed); + long[] palette = new long[distinctValues]; + for (int i = 0; i < distinctValues; i++) { + palette[i] = random.nextLong(); + } + long[] data = new long[count]; + for (int i = 0; i < count; i++) { + data[i] = palette[random.nextInt(distinctValues)]; + } + return data; + } + + /** + * Generates high-cardinality longs (all unique, shuffled). + */ + public static long[] generateHighCardinalityLongs(int count, long seed) { + Random random = new Random(seed); + long[] data = generateSequentialLongs(count); + for (int i = count - 1; i > 0; i--) { + int swapIndex = random.nextInt(i + 1); + long tmp = data[i]; + data[i] = data[swapIndex]; + data[swapIndex] = tmp; + } + return data; + } + + // ---- Float data generation for encoding benchmarks ---- + + /** + * Generates uniformly random floats using the given seed. + */ + public static float[] generateRandomFloats(int count, long seed) { + Random random = new Random(seed); + float[] data = new float[count]; + for (int i = 0; i < count; i++) { + data[i] = random.nextFloat() * 1000.0f; + } + return data; + } + + /** + * Generates low-cardinality floats (values drawn from a small set). + */ + public static float[] generateLowCardinalityFloats(int count, int distinctValues, long seed) { + Random random = new Random(seed); + float[] palette = new float[distinctValues]; + for (int i = 0; i < distinctValues; i++) { + palette[i] = random.nextFloat() * 1000.0f; + } + float[] data = new float[count]; + for (int i = 0; i < count; i++) { + data[i] = palette[random.nextInt(distinctValues)]; + } + return data; + } + + // ---- Double data generation for encoding benchmarks ---- + + /** + * Generates uniformly random doubles using the given seed. + */ + public static double[] generateRandomDoubles(int count, long seed) { + Random random = new Random(seed); + double[] data = new double[count]; + for (int i = 0; i < count; i++) { + data[i] = random.nextDouble() * 1000.0; + } + return data; + } + + /** + * Generates low-cardinality doubles (values drawn from a small set). + */ + public static double[] generateLowCardinalityDoubles(int count, int distinctValues, long seed) { + Random random = new Random(seed); + double[] palette = new double[distinctValues]; + for (int i = 0; i < distinctValues; i++) { + palette[i] = random.nextDouble() * 1000.0; + } + double[] data = new double[count]; + for (int i = 0; i < count; i++) { + data[i] = palette[random.nextInt(distinctValues)]; + } + return data; + } + + // ---- Fixed-length byte array data generation for encoding benchmarks ---- + + /** + * Generates fixed-length byte arrays with the specified cardinality. + * + * @param count number of values + * @param length byte length of each value + * @param distinct number of distinct values (0 means all unique) + * @param seed RNG seed + */ + public static Binary[] generateFixedLenByteArrays(int count, int length, int distinct, long seed) { + Random random = new Random(seed); + if (distinct > 0) { + Binary[] palette = new Binary[distinct]; + for (int i = 0; i < distinct; i++) { + byte[] bytes = new byte[length]; + random.nextBytes(bytes); + palette[i] = Binary.fromConstantByteArray(bytes); + } + Binary[] data = new Binary[count]; + for (int i = 0; i < count; i++) { + data[i] = palette[random.nextInt(distinct)]; + } + return data; + } else { + Binary[] data = new Binary[count]; + for (int i = 0; i < count; i++) { + byte[] bytes = new byte[length]; + random.nextBytes(bytes); + data[i] = Binary.fromConstantByteArray(bytes); + } + return data; + } + } + + // ---- Binary data generation for encoding benchmarks ---- + + /** + * Generates binary strings of the given length with the specified cardinality, using + * a deterministic seed. + */ + public static Binary[] generateBinaryData(int count, int stringLength, int distinct, long seed) { + return generateBinaryData(count, stringLength, distinct, new Random(seed)); + } + + /** + * Generates binary strings of the given length with the specified cardinality. + * + * @param count number of values + * @param stringLength length of each string + * @param distinct number of distinct values (0 means all unique) + * @param random random source + * @return array of Binary values + */ + public static Binary[] generateBinaryData(int count, int stringLength, int distinct, Random random) { + Binary[] data = new Binary[count]; + if (distinct > 0) { + // Pre-generate the distinct values + Binary[] dictionary = new Binary[distinct]; + for (int i = 0; i < distinct; i++) { + dictionary[i] = Binary.fromConstantByteArray( + randomString(stringLength, random).getBytes(StandardCharsets.UTF_8)); + } + for (int i = 0; i < count; i++) { + data[i] = dictionary[random.nextInt(distinct)]; + } + } else { + // All unique + for (int i = 0; i < count; i++) { + data[i] = Binary.fromConstantByteArray( + randomString(stringLength, random).getBytes(StandardCharsets.UTF_8)); + } + } + return data; + } + + private static String randomString(int length, Random random) { + StringBuilder sb = new StringBuilder(length); + for (int i = 0; i < length; i++) { + sb.append((char) ('a' + random.nextInt(26))); + } + return sb.toString(); + } +} diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java index 1713acc012..114936d153 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java @@ -185,6 +185,88 @@ public long readLong() { throw new UnsupportedOperationException(); } + // ---- Batch read methods ---- + // Default implementations loop over the per-value methods. + // Subclasses should override with bulk/memcpy-style implementations. + + /** + * Reads {@code count} integers into {@code dest} starting at {@code offset}. + * + * @param dest destination array + * @param offset start index in dest + * @param count number of values to read + */ + public void readIntegers(int[] dest, int offset, int count) { + for (int i = 0; i < count; i++) { + dest[offset + i] = readInteger(); + } + } + + /** + * Reads {@code count} longs into {@code dest} starting at {@code offset}. + * + * @param dest destination array + * @param offset start index in dest + * @param count number of values to read + */ + public void readLongs(long[] dest, int offset, int count) { + for (int i = 0; i < count; i++) { + dest[offset + i] = readLong(); + } + } + + /** + * Reads {@code count} floats into {@code dest} starting at {@code offset}. + * + * @param dest destination array + * @param offset start index in dest + * @param count number of values to read + */ + public void readFloats(float[] dest, int offset, int count) { + for (int i = 0; i < count; i++) { + dest[offset + i] = readFloat(); + } + } + + /** + * Reads {@code count} doubles into {@code dest} starting at {@code offset}. + * + * @param dest destination array + * @param offset start index in dest + * @param count number of values to read + */ + public void readDoubles(double[] dest, int offset, int count) { + for (int i = 0; i < count; i++) { + dest[offset + i] = readDouble(); + } + } + + /** + * Reads {@code count} booleans into {@code dest} starting at {@code offset}. + * + * @param dest destination array + * @param offset start index in dest + * @param count number of values to read + */ + public void readBooleans(boolean[] dest, int offset, int count) { + for (int i = 0; i < count; i++) { + dest[offset + i] = readBoolean(); + } + } + + /** + * Reads {@code count} Binary values into {@code dest} starting at {@code offset}. + * + * @param dest destination array + * @param offset start index in dest + * @param count number of values to read + */ + public void readBinaries(Binary[] dest, int offset, int count) { + for (int i = 0; i < count; i++) { + dest[offset + i] = readBytes(); + } + } + /** * Skips the next value in the page */ diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesWriter.java index ecea4a7520..bbe9230397 100755 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesWriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesWriter.java @@ -98,6 +98,19 @@ public void writeBoolean(boolean v) { throw new UnsupportedOperationException(getClass().getName()); } + /** + * Writes a batch of boolean values. Subclasses may override for optimized bulk encoding. + * + * @param values the boolean array to read from + * @param offset the start position in the array + * @param length the number of values to write + */ + public void writeBooleans(boolean[] values, int offset, int length) { + for (int i = offset; i < offset + length; i++) { + writeBoolean(values[i]); + } + } + /** * @param v the value to encode */ @@ -105,6 +118,19 @@ public void writeBytes(Binary v) { throw new UnsupportedOperationException(getClass().getName()); } + /** + * Writes a batch of Binary values. Subclasses may override for optimized bulk encoding. + * + * @param values the Binary array to read from + * @param offset the start position in the array + * @param length the number of values to write + */ + public void writeBinaries(Binary[] values, int offset, int length) { + for (int i = offset; i < offset + length; i++) { + writeBytes(values[i]); + } + } + /** * @param v the value to encode */ @@ -112,6 +138,19 @@ public void writeInteger(int v) { throw new UnsupportedOperationException(getClass().getName()); } + /** + * Writes a batch of int values. Subclasses may override for optimized bulk encoding. + * + * @param values the int array to read from + * @param offset the start position in the array + * @param length the number of values to write + */ + public void writeIntegers(int[] values, int offset, int length) { + for (int i = offset; i < offset + length; i++) { + writeInteger(values[i]); + } + } + /** * @param v the value to encode */ @@ -119,6 +158,19 @@ public void writeLong(long v) { throw new UnsupportedOperationException(getClass().getName()); } + /** + * Writes a batch of long values. Subclasses may override for optimized bulk encoding. + * + * @param values the long array to read from + * @param offset the start position in the array + * @param length the number of values to write + */ + public void writeLongs(long[] values, int offset, int length) { + for (int i = offset; i < offset + length; i++) { + writeLong(values[i]); + } + } + /** * @param v the value to encode */ @@ -126,6 +178,19 @@ public void writeDouble(double v) { throw new UnsupportedOperationException(getClass().getName()); } + /** + * Writes a batch of double values. Subclasses may override for optimized bulk encoding. + * + * @param values the double array to read from + * @param offset the start position in the array + * @param length the number of values to write + */ + public void writeDoubles(double[] values, int offset, int length) { + for (int i = offset; i < offset + length; i++) { + writeDouble(values[i]); + } + } + /** * @param v the value to encode */ @@ -133,5 +198,18 @@ public void writeFloat(float v) { throw new UnsupportedOperationException(getClass().getName()); } + /** + * Writes a batch of float values. Subclasses may override for optimized bulk encoding. + * + * @param values the float array to read from + * @param offset the start position in the array + * @param length the number of values to write + */ + public void writeFloats(float[] values, int offset, int length) { + for (int i = offset; i < offset + length; i++) { + writeFloat(values[i]); + } + } + public abstract String memUsageString(String prefix); } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReader.java index c8ab3043bd..8edda45cd5 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReader.java @@ -49,17 +49,151 @@ protected int nextElementByteOffset() { return offset; } - // Decode an entire data page + /** + * Advances the stream position by {@code count} elements and returns the byte offset + * of the first element. Used by batch read methods in subclasses. + */ + protected int advanceByteOffset(int count) { + if (indexInStream + count > valuesCount) { + throw new ParquetDecodingException("Byte-stream data was already exhausted."); + } + int offset = indexInStream * elementSizeInBytes; + indexInStream += count; + return offset; + } + + // Decode an entire data page by transposing from stream-split layout to interleaved layout. private byte[] decodeData(ByteBuffer encoded, int valuesCount) { - assert encoded.limit() == valuesCount * elementSizeInBytes; - byte[] decoded = new byte[encoded.limit()]; - int destByteIndex = 0; - for (int srcValueIndex = 0; srcValueIndex < valuesCount; ++srcValueIndex) { - for (int stream = 0; stream < elementSizeInBytes; ++stream, ++destByteIndex) { - decoded[destByteIndex] = encoded.get(srcValueIndex + stream * valuesCount); + int totalBytes = valuesCount * elementSizeInBytes; + assert encoded.remaining() >= totalBytes; + + // Bulk access: use the backing array directly if available, otherwise copy once. + byte[] src; + int srcBase; + if (encoded.hasArray()) { + src = encoded.array(); + srcBase = encoded.arrayOffset() + encoded.position(); + } else { + src = new byte[totalBytes]; + encoded.get(src); + srcBase = 0; + } + + byte[] decoded = new byte[totalBytes]; + + // Specialized single-pass loops for common element sizes. + if (elementSizeInBytes == 2) { + int s0 = srcBase, s1 = srcBase + valuesCount; + for (int i = 0; i < valuesCount; ++i) { + int di = i * 2; + decoded[di] = src[s0 + i]; + decoded[di + 1] = src[s1 + i]; + } + } else if (elementSizeInBytes == 4) { + int s0 = srcBase, + s1 = srcBase + valuesCount, + s2 = srcBase + 2 * valuesCount, + s3 = srcBase + 3 * valuesCount; + for (int i = 0; i < valuesCount; ++i) { + int di = i * 4; + decoded[di] = src[s0 + i]; + decoded[di + 1] = src[s1 + i]; + decoded[di + 2] = src[s2 + i]; + decoded[di + 3] = src[s3 + i]; + } + } else if (elementSizeInBytes == 8) { + int s0 = srcBase, + s1 = srcBase + valuesCount, + s2 = srcBase + 2 * valuesCount, + s3 = srcBase + 3 * valuesCount, + s4 = srcBase + 4 * valuesCount, + s5 = srcBase + 5 * valuesCount, + s6 = srcBase + 6 * valuesCount, + s7 = srcBase + 7 * valuesCount; + for (int i = 0; i < valuesCount; ++i) { + int di = i * 8; + decoded[di] = src[s0 + i]; + decoded[di + 1] = src[s1 + i]; + decoded[di + 2] = src[s2 + i]; + decoded[di + 3] = src[s3 + i]; + decoded[di + 4] = src[s4 + i]; + decoded[di + 5] = src[s5 + i]; + decoded[di + 6] = src[s6 + i]; + decoded[di + 7] = src[s7 + i]; + } + } else if (elementSizeInBytes == 12) { + int s0 = srcBase, + s1 = srcBase + valuesCount, + s2 = srcBase + 2 * valuesCount, + s3 = srcBase + 3 * valuesCount, + s4 = srcBase + 4 * valuesCount, + s5 = srcBase + 5 * valuesCount, + s6 = srcBase + 6 * valuesCount, + s7 = srcBase + 7 * valuesCount, + s8 = srcBase + 8 * valuesCount, + s9 = srcBase + 9 * valuesCount, + s10 = srcBase + 10 * valuesCount, + s11 = srcBase + 11 * valuesCount; + for (int i = 0; i < valuesCount; ++i) { + int di = i * 12; + decoded[di] = src[s0 + i]; + decoded[di + 1] = src[s1 + i]; + decoded[di + 2] = src[s2 + i]; + decoded[di + 3] = src[s3 + i]; + decoded[di + 4] = src[s4 + i]; + decoded[di + 5] = src[s5 + i]; + decoded[di + 6] = src[s6 + i]; + decoded[di + 7] = src[s7 + i]; + decoded[di + 8] = src[s8 + i]; + decoded[di + 9] = src[s9 + i]; + decoded[di + 10] = src[s10 + i]; + decoded[di + 11] = src[s11 + i]; + } + } else if (elementSizeInBytes == 16) { + int s0 = srcBase, + s1 = srcBase + valuesCount, + s2 = srcBase + 2 * valuesCount, + s3 = srcBase + 3 * valuesCount, + s4 = srcBase + 4 * valuesCount, + s5 = srcBase + 5 * valuesCount, + s6 = srcBase + 6 * valuesCount, + s7 = srcBase + 7 * valuesCount, + s8 = srcBase + 8 * valuesCount, + s9 = srcBase + 9 * valuesCount, + s10 = srcBase + 10 * valuesCount, + s11 = srcBase + 11 * valuesCount, + s12 = srcBase + 12 * valuesCount, + s13 = srcBase + 13 * valuesCount, + s14 = srcBase + 14 * valuesCount, + s15 = srcBase + 15 * valuesCount; + for (int i = 0; i < valuesCount; ++i) { + int di = i * 16; + decoded[di] = src[s0 + i]; + decoded[di + 1] = src[s1 + i]; + decoded[di + 2] = src[s2 + i]; + decoded[di + 3] = src[s3 + i]; + decoded[di + 4] = src[s4 + i]; + decoded[di + 5] = src[s5 + i]; + decoded[di + 6] = src[s6 + i]; + decoded[di + 7] = src[s7 + i]; + decoded[di + 8] = src[s8 + i]; + decoded[di + 9] = src[s9 + i]; + decoded[di + 10] = src[s10 + i]; + decoded[di + 11] = src[s11 + i]; + decoded[di + 12] = src[s12 + i]; + decoded[di + 13] = src[s13 + i]; + decoded[di + 14] = src[s14 + i]; + decoded[di + 15] = src[s15 + i]; + } + } else { + // Generic fallback for arbitrary element sizes + for (int stream = 0; stream < elementSizeInBytes; ++stream) { + int srcOffset = srcBase + stream * valuesCount; + for (int i = 0; i < valuesCount; ++i) { + decoded[i * elementSizeInBytes + stream] = src[srcOffset + i]; + } } } - assert destByteIndex == decoded.length; return decoded; } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForDouble.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForDouble.java index e725dc9fce..0917cd3902 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForDouble.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForDouble.java @@ -27,4 +27,11 @@ public ByteStreamSplitValuesReaderForDouble() { public double readDouble() { return decodedDataBuffer.getDouble(nextElementByteOffset()); } + + @Override + public void readDoubles(double[] dest, int offset, int count) { + int byteOffset = advanceByteOffset(count); + decodedDataBuffer.position(byteOffset); + decodedDataBuffer.asDoubleBuffer().get(dest, offset, count); + } } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForFLBA.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForFLBA.java index d8613dd8b9..b026a7d76e 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForFLBA.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForFLBA.java @@ -30,4 +30,17 @@ public ByteStreamSplitValuesReaderForFLBA(int length) { public Binary readBytes() { return Binary.fromConstantByteBuffer(decodedDataBuffer, nextElementByteOffset(), elementSizeInBytes); } + + /** + * Batch read: advances the stream by {@code count} elements in a single bounds check, + * then creates Binary views at sequential offsets — eliminating per-value bounds checking. + */ + @Override + public void readBinaries(Binary[] dest, int offset, int count) { + int byteOffset = advanceByteOffset(count); + for (int i = 0; i < count; i++) { + dest[offset + i] = Binary.fromConstantByteBuffer(decodedDataBuffer, byteOffset, elementSizeInBytes); + byteOffset += elementSizeInBytes; + } + } } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForFloat.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForFloat.java index cecb7925d8..bb28ef0ac2 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForFloat.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForFloat.java @@ -27,4 +27,11 @@ public ByteStreamSplitValuesReaderForFloat() { public float readFloat() { return decodedDataBuffer.getFloat(nextElementByteOffset()); } + + @Override + public void readFloats(float[] dest, int offset, int count) { + int byteOffset = advanceByteOffset(count); + decodedDataBuffer.position(byteOffset); + decodedDataBuffer.asFloatBuffer().get(dest, offset, count); + } } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForInteger.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForInteger.java index 57f9bfdf03..e71079d2f6 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForInteger.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForInteger.java @@ -27,4 +27,11 @@ public ByteStreamSplitValuesReaderForInteger() { public int readInteger() { return decodedDataBuffer.getInt(nextElementByteOffset()); } + + @Override + public void readIntegers(int[] dest, int offset, int count) { + int byteOffset = advanceByteOffset(count); + decodedDataBuffer.position(byteOffset); + decodedDataBuffer.asIntBuffer().get(dest, offset, count); + } } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForLong.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForLong.java index c7711d8919..f73c46e972 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForLong.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForLong.java @@ -27,4 +27,11 @@ public ByteStreamSplitValuesReaderForLong() { public long readLong() { return decodedDataBuffer.getLong(nextElementByteOffset()); } + + @Override + public void readLongs(long[] dest, int offset, int count) { + int byteOffset = advanceByteOffset(count); + decodedDataBuffer.position(byteOffset); + decodedDataBuffer.asLongBuffer().get(dest, offset, count); + } } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesWriter.java index c197a4fd6f..e62126ed4d 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesWriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesWriter.java @@ -29,9 +29,15 @@ public abstract class ByteStreamSplitValuesWriter extends ValuesWriter { + /** + * Batch size for buffered scatter writes. Values are accumulated in a batch buffer + * and flushed as bulk {@code write(byte[], off, len)} calls to each stream. + */ + private static final int BATCH_SIZE = 64; + protected final int numStreams; protected final int elementSizeInBytes; - private final CapacityByteArrayOutputStream[] byteStreams; + protected final CapacityByteArrayOutputStream[] byteStreams; public ByteStreamSplitValuesWriter( int elementSizeInBytes, int initialCapacity, int pageSize, ByteBufferAllocator allocator) { @@ -176,6 +182,8 @@ public String memUsageString(String prefix) { public static class FixedLenByteArrayByteStreamSplitValuesWriter extends ByteStreamSplitValuesWriter { private final int length; + private byte[][] batchBufs; // [stream][batchIndex] scratch buffers + private int flbaBatchCount; public FixedLenByteArrayByteStreamSplitValuesWriter( int length, int initialCapacity, int pageSize, ByteBufferAllocator allocator) { @@ -187,7 +195,69 @@ public FixedLenByteArrayByteStreamSplitValuesWriter( public final void writeBytes(Binary v) { assert (v.length() == length) : ("Fixed Binary size " + v.length() + " does not match field type length " + length); - super.scatterBytes(v.getBytesUnsafe()); + if (batchBufs == null) { + batchBufs = new byte[length][BATCH_SIZE]; + } + byte[] bytes = v.getBytesUnsafe(); + for (int stream = 0; stream < length; stream++) { + batchBufs[stream][flbaBatchCount] = bytes[stream]; + } + flbaBatchCount++; + if (flbaBatchCount == BATCH_SIZE) { + flushFlbaBatch(); + } + } + + @Override + public void writeBinaries(Binary[] values, int offset, int len) { + if (batchBufs == null) { + batchBufs = new byte[length][BATCH_SIZE]; + } + for (int i = offset; i < offset + len; i++) { + Binary v = values[i]; + assert (v.length() == length) + : ("Fixed Binary size " + v.length() + " does not match field type length " + length); + byte[] bytes = v.getBytesUnsafe(); + for (int stream = 0; stream < length; stream++) { + batchBufs[stream][flbaBatchCount] = bytes[stream]; + } + flbaBatchCount++; + if (flbaBatchCount == BATCH_SIZE) { + flushFlbaBatch(); + } + } + } + + private void flushFlbaBatch() { + if (flbaBatchCount == 0) return; + final int count = flbaBatchCount; + for (int stream = 0; stream < length; stream++) { + byteStreams[stream].write(batchBufs[stream], 0, count); + } + flbaBatchCount = 0; + } + + @Override + public BytesInput getBytes() { + flushFlbaBatch(); + return super.getBytes(); + } + + @Override + public void reset() { + flbaBatchCount = 0; + super.reset(); + } + + @Override + public void close() { + flbaBatchCount = 0; + super.close(); + } + + @Override + public long getBufferedSize() { + return super.getBufferedSize() + (long) flbaBatchCount * length; } @Override diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java index 259ebc09c0..6726614460 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java @@ -112,6 +112,22 @@ public long readLong() { return valuesBuffer[valuesRead++]; } + @Override + public void readIntegers(int[] dest, int offset, int count) { + checkRead(); + for (int i = 0; i < count; i++) { + dest[offset + i] = (int) valuesBuffer[valuesRead + i]; + } + valuesRead += count; + } + + @Override + public void readLongs(long[] dest, int offset, int count) { + checkRead(); + System.arraycopy(valuesBuffer, valuesRead, dest, offset, count); + valuesRead += count; + } + private void checkRead() { if (valuesRead >= totalValueCount) { throw new ParquetDecodingException("no more value to read, total value count is " + totalValueCount); diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java index 53fafc55dc..db344c3e63 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java @@ -117,6 +117,59 @@ public long readLong() { } } + @Override + public void readIntegers(int[] dest, int offset, int count) { + try { + // Batch-decode dictionary IDs, then batch-lookup + int[] ids = new int[count]; + decoder.readInts(ids, 0, count); + for (int i = 0; i < count; i++) { + dest[offset + i] = dictionary.decodeToInt(ids[i]); + } + } catch (IOException e) { + throw new ParquetDecodingException(e); + } + } + + @Override + public void readLongs(long[] dest, int offset, int count) { + try { + int[] ids = new int[count]; + decoder.readInts(ids, 0, count); + for (int i = 0; i < count; i++) { + dest[offset + i] = dictionary.decodeToLong(ids[i]); + } + } catch (IOException e) { + throw new ParquetDecodingException(e); + } + } + + @Override + public void readFloats(float[] dest, int offset, int count) { + try { + int[] ids = new int[count]; + decoder.readInts(ids, 0, count); + for (int i = 0; i < count; i++) { + dest[offset + i] = dictionary.decodeToFloat(ids[i]); + } + } catch (IOException e) { + throw new ParquetDecodingException(e); + } + } + + @Override + public void readDoubles(double[] dest, int offset, int count) { + try { + int[] ids = new int[count]; + decoder.readInts(ids, 0, count); + for (int i = 0; i < count; i++) { + dest[offset + i] = dictionary.decodeToDouble(ids[i]); + } + } catch (IOException e) { + throw new ParquetDecodingException(e); + } + } + @Override public void skip() { try { diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java index 22ca2d567c..3843e3b6f0 100755 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java @@ -18,57 +18,122 @@ */ package org.apache.parquet.column.values.plain; -import static org.apache.parquet.column.values.bitpacking.Packer.LITTLE_ENDIAN; - import java.io.IOException; +import java.nio.ByteBuffer; import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.column.values.ValuesReader; -import org.apache.parquet.column.values.bitpacking.ByteBitPackingValuesReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * encodes boolean for the plain encoding: one bit at a time (0 = false) + * Decodes PLAIN-encoded booleans: one bit per value, packed 8 per byte, little-endian + * bit order (bit 0 of each byte is the first value). + * + *

Direct bit extraction from the page ByteBuffer avoids the overhead of the generic + * bit-packing machinery ({@code ByteBitPackingValuesReader}) and intermediate + * {@code int[8]} buffers. + * + *

The batch path uses a static 256-entry lookup table that maps each byte value to + * its 8 pre-decoded booleans. This enables {@code System.arraycopy} of 8 booleans per + * byte (a single 64-bit memory operation in HotSpot) instead of 8 individual + * comparison+store operations. */ public class BooleanPlainValuesReader extends ValuesReader { private static final Logger LOG = LoggerFactory.getLogger(BooleanPlainValuesReader.class); - private ByteBitPackingValuesReader in = new ByteBitPackingValuesReader(1, LITTLE_ENDIAN); - /** - * {@inheritDoc} - * - * @see org.apache.parquet.column.values.ValuesReader#readBoolean() + * Lookup table: BYTE_TO_BOOLS[b] contains the 8 boolean values for byte value b, + * in little-endian bit order (bit 0 = index 0). */ + private static final boolean[][] BYTE_TO_BOOLS = new boolean[256][8]; + + static { + for (int b = 0; b < 256; b++) { + for (int bit = 0; bit < 8; bit++) { + BYTE_TO_BOOLS[b][bit] = ((b >>> bit) & 1) != 0; + } + } + } + + private byte[] pageData; + private int pageOffset; + private int bitIndex; + + @Override + public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException { + LOG.debug("init from page at offset {} for length {}", stream.position(), stream.available()); + int effectiveBitLength = valueCount; // bitWidth = 1 + int length = BytesUtils.paddedByteCountFromBits(effectiveBitLength); + length = Math.min(length, stream.available()); + ByteBuffer buf = stream.slice(length); + + // Bulk access: use backing array directly if available, otherwise copy once. + if (buf.hasArray()) { + pageData = buf.array(); + pageOffset = buf.arrayOffset() + buf.position(); + } else { + pageData = new byte[length]; + buf.get(pageData); + pageOffset = 0; + } + bitIndex = 0; + updateNextOffset(length); + } + @Override public boolean readBoolean() { - return in.readInteger() == 0 ? false : true; + int byteIdx = pageOffset + (bitIndex >>> 3); + int bitPos = bitIndex & 7; + bitIndex++; + return ((pageData[byteIdx] >>> bitPos) & 1) != 0; } - /** - * {@inheritDoc} - * - * @see org.apache.parquet.column.values.ValuesReader#skip() - */ @Override - public void skip() { - in.readInteger(); + public void readBooleans(boolean[] dest, int offset, int count) { + int i = 0; + + // Handle partial byte at current position + int bitPos = bitIndex & 7; + if (bitPos != 0) { + int byteIdx = pageOffset + (bitIndex >>> 3); + byte b = pageData[byteIdx]; + while (bitPos < 8 && i < count) { + dest[offset + i] = ((b >>> bitPos) & 1) != 0; + bitPos++; + i++; + } + } + + // Process full bytes: 8 booleans per byte via lookup table + arraycopy + int byteIdx = pageOffset + ((bitIndex + i) >>> 3); + while (i + 8 <= count) { + System.arraycopy(BYTE_TO_BOOLS[pageData[byteIdx] & 0xFF], 0, dest, offset + i, 8); + byteIdx++; + i += 8; + } + + // Handle remaining bits in the last partial byte + if (i < count) { + byte b = pageData[byteIdx]; + int bp = 0; + while (i < count) { + dest[offset + i] = ((b >>> bp) & 1) != 0; + bp++; + i++; + } + } + + bitIndex += count; } - /** - * {@inheritDoc} - * - * @see org.apache.parquet.column.values.ValuesReader#initFromPage(int, ByteBufferInputStream) - */ @Override - public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException { - LOG.debug("init from page at offset {} for length {}", stream.position(), stream.available()); - this.in.initFromPage(valueCount, stream); + public void skip() { + bitIndex++; } - @Deprecated @Override - public int getNextOffset() { - return in.getNextOffset(); + public void skip(int n) { + bitIndex += n; } } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesWriter.java index 7f80ec150a..ae3b43c63b 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesWriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesWriter.java @@ -19,52 +19,118 @@ package org.apache.parquet.column.values.plain; import static org.apache.parquet.column.Encoding.PLAIN; -import static org.apache.parquet.column.values.bitpacking.Packer.LITTLE_ENDIAN; import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.bytes.CapacityByteArrayOutputStream; import org.apache.parquet.column.Encoding; import org.apache.parquet.column.values.ValuesWriter; -import org.apache.parquet.column.values.bitpacking.ByteBitPackingValuesWriter; /** - * An implementation of the PLAIN encoding + * An implementation of the PLAIN encoding for BOOLEAN values. + * + *

Packs booleans directly into bytes (8 per byte, LSB first) without + * going through the generic int-based bit-packing encoder. */ public class BooleanPlainValuesWriter extends ValuesWriter { - private ByteBitPackingValuesWriter bitPackingWriter; + private static final int INITIAL_SLAB_SIZE = 1024; + private static final int MAX_CAPACITY = 64 * 1024; + + private final CapacityByteArrayOutputStream baos; + private int currentByte; + private int bitsWritten; public BooleanPlainValuesWriter() { - bitPackingWriter = new ByteBitPackingValuesWriter(1, LITTLE_ENDIAN); + this.baos = new CapacityByteArrayOutputStream(INITIAL_SLAB_SIZE, MAX_CAPACITY); + this.currentByte = 0; + this.bitsWritten = 0; } @Override public final void writeBoolean(boolean v) { - bitPackingWriter.writeInteger(v ? 1 : 0); + currentByte |= ((v ? 1 : 0) << bitsWritten); + bitsWritten++; + if (bitsWritten == 8) { + baos.write(currentByte); + currentByte = 0; + bitsWritten = 0; + } + } + + @Override + public void writeBooleans(boolean[] values, int offset, int length) { + int pos = offset; + int end = offset + length; + + // Fill current partial byte + while (bitsWritten > 0 && bitsWritten < 8 && pos < end) { + if (values[pos]) { + currentByte |= (1 << bitsWritten); + } + bitsWritten++; + pos++; + if (bitsWritten == 8) { + baos.write(currentByte); + currentByte = 0; + bitsWritten = 0; + } + } + + // Process 8 values at a time — pack directly into a byte + while (pos + 8 <= end) { + int b = 0; + if (values[pos]) b |= 0x01; + if (values[pos + 1]) b |= 0x02; + if (values[pos + 2]) b |= 0x04; + if (values[pos + 3]) b |= 0x08; + if (values[pos + 4]) b |= 0x10; + if (values[pos + 5]) b |= 0x20; + if (values[pos + 6]) b |= 0x40; + if (values[pos + 7]) b |= 0x80; + baos.write(b); + pos += 8; + } + + // Handle remaining values (< 8) + while (pos < end) { + if (values[pos]) { + currentByte |= (1 << bitsWritten); + } + bitsWritten++; + pos++; + } } @Override public long getBufferedSize() { - return bitPackingWriter.getBufferedSize(); + return baos.size() + (bitsWritten > 0 ? 1 : 0); } @Override public BytesInput getBytes() { - return bitPackingWriter.getBytes(); + if (bitsWritten > 0) { + baos.write(currentByte); + currentByte = 0; + bitsWritten = 0; + } + return BytesInput.from(baos); } @Override public void reset() { - bitPackingWriter.reset(); + baos.reset(); + currentByte = 0; + bitsWritten = 0; } @Override public void close() { - bitPackingWriter.close(); + baos.close(); } @Override public long getAllocatedSize() { - return bitPackingWriter.getAllocatedSize(); + return baos.getCapacity(); } @Override @@ -74,6 +140,6 @@ public Encoding getEncoding() { @Override public String memUsageString(String prefix) { - return bitPackingWriter.memUsageString(prefix); + return String.format("%s BooleanPlainValuesWriter %d bytes", prefix, getAllocatedSize()); } } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java index adfc488924..6200ae4477 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java @@ -19,6 +19,7 @@ package org.apache.parquet.column.values.plain; import java.io.IOException; +import java.nio.ByteBuffer; import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.io.ParquetDecodingException; @@ -62,6 +63,25 @@ public void skip(int n) { } } + /** + * Batch read: slices the entire block of {@code count * length} bytes in one call, + * then creates Binary views at fixed offsets within the single ByteBuffer — eliminating + * per-value slice overhead. + */ + @Override + public void readBinaries(Binary[] dest, int offset, int count) { + try { + int totalBytes = count * length; + ByteBuffer block = in.slice(totalBytes); + int baseOffset = block.position(); + for (int i = 0; i < count; i++) { + dest[offset + i] = Binary.fromConstantByteBuffer(block, baseOffset + i * length, length); + } + } catch (IOException | RuntimeException e) { + throw new ParquetDecodingException("could not read bytes at offset " + in.position(), e); + } + } + @Override public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException { LOG.debug("init from page at offset {} for length {}", stream.position(), stream.available()); diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java index dec4d1be1b..9d8c7e464b 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java @@ -62,6 +62,42 @@ public final void writeBytes(Binary v) { } } + /** + * Batch write: copies Binary values into a temporary buffer and writes them in a single + * bulk {@code write()} call to the output stream, amortizing stream overhead across + * the entire batch. + */ + @Override + public void writeBinaries(Binary[] values, int offset, int length) { + final int fixedLen = this.length; + // Process in chunks to avoid excessive temp allocation + final int CHUNK = 1024; + byte[] buf = new byte[Math.min(length, CHUNK) * fixedLen]; + try { + int remaining = length; + int srcIdx = offset; + while (remaining > 0) { + int batch = Math.min(remaining, CHUNK); + int bufPos = 0; + for (int i = 0; i < batch; i++) { + Binary v = values[srcIdx++]; + if (v.length() != fixedLen) { + throw new IllegalArgumentException( + "Fixed Binary size " + v.length() + " does not match field type length " + fixedLen); + } + // Copy bytes from the Binary's backing store into the batch buffer + byte[] bytes = v.getBytesUnsafe(); + System.arraycopy(bytes, 0, buf, bufPos, fixedLen); + bufPos += fixedLen; + } + arrayOut.write(buf, 0, bufPos); + remaining -= batch; + } + } catch (RuntimeException e) { + throw new ParquetEncodingException("could not write fixed bytes", e); + } + } + @Override public long getBufferedSize() { return arrayOut.size(); diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java index a0c7af7394..a3d0d06923 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java @@ -71,6 +71,17 @@ public double readDouble() { throw new ParquetDecodingException("could not read double", e); } } + + @Override + public void readDoubles(double[] dest, int offset, int count) { + try { + for (int i = 0; i < count; i++) { + dest[offset + i] = in.readDouble(); + } + } catch (IOException e) { + throw new ParquetDecodingException("could not read doubles", e); + } + } } public static class FloatPlainValuesReader extends PlainValuesReader { @@ -92,6 +103,17 @@ public float readFloat() { throw new ParquetDecodingException("could not read float", e); } } + + @Override + public void readFloats(float[] dest, int offset, int count) { + try { + for (int i = 0; i < count; i++) { + dest[offset + i] = in.readFloat(); + } + } catch (IOException e) { + throw new ParquetDecodingException("could not read floats", e); + } + } } public static class IntegerPlainValuesReader extends PlainValuesReader { @@ -113,6 +135,17 @@ public int readInteger() { throw new ParquetDecodingException("could not read int", e); } } + + @Override + public void readIntegers(int[] dest, int offset, int count) { + try { + for (int i = 0; i < count; i++) { + dest[offset + i] = in.readInt(); + } + } catch (IOException e) { + throw new ParquetDecodingException("could not read ints", e); + } + } } public static class LongPlainValuesReader extends PlainValuesReader { @@ -134,5 +167,16 @@ public long readLong() { throw new ParquetDecodingException("could not read long", e); } } + + @Override + public void readLongs(long[] dest, int offset, int count) { + try { + for (int i = 0; i < count; i++) { + dest[offset + i] = in.readLong(); + } + } catch (IOException e) { + throw new ParquetDecodingException("could not read longs", e); + } + } } } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesWriter.java index c7069bc092..0802f46d2a 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesWriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesWriter.java @@ -94,6 +94,26 @@ public final void writeDouble(double v) { } } + @Override + public final void writeIntegers(int[] values, int offset, int length) { + arrayOut.writeInts(values, offset, length); + } + + @Override + public final void writeLongs(long[] values, int offset, int length) { + arrayOut.writeLongs(values, offset, length); + } + + @Override + public final void writeFloats(float[] values, int offset, int length) { + arrayOut.writeFloats(values, offset, length); + } + + @Override + public final void writeDoubles(double[] values, int offset, int length) { + arrayOut.writeDoubles(values, offset, length); + } + @Override public void writeByte(int value) { try { diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java index e55b276b29..f2dd50d623 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java @@ -48,6 +48,8 @@ private static enum MODE { private int currentCount; private int currentValue; private int[] currentBuffer; + // Saved packed bytes for bitWidth=1 boolean optimization (lookup table decode) + private byte[] packedBytesBuffer; public RunLengthBitPackingHybridDecoder(int bitWidth, InputStream in) { LOG.debug("decoding bitWidth {}", bitWidth); @@ -77,6 +79,121 @@ public int readInt() throws IOException { return result; } + /** + * Reads {@code count} int values into {@code dest} starting at {@code offset}. + * This avoids per-value virtual dispatch overhead by batching across RLE runs + * and packed groups. + * + * @param dest destination array + * @param offset start index in dest + * @param count number of values to read + */ + public void readInts(int[] dest, int offset, int count) throws IOException { + int remaining = count; + int pos = offset; + while (remaining > 0) { + if (currentCount == 0) { + readNext(); + } + int batchSize = Math.min(remaining, currentCount); + switch (mode) { + case RLE: + java.util.Arrays.fill(dest, pos, pos + batchSize, currentValue); + break; + case PACKED: + int startIdx = currentBuffer.length - currentCount; + System.arraycopy(currentBuffer, startIdx, dest, pos, batchSize); + break; + default: + throw new ParquetDecodingException("not a valid mode " + mode); + } + currentCount -= batchSize; + remaining -= batchSize; + pos += batchSize; + } + } + + /** + * Lookup table for bitWidth=1: maps each byte to its 8 boolean values. + * Used by {@link #readBooleans} PACKED path to bypass the int[] intermediate. + */ + private static final boolean[][] BYTE_TO_BOOLS = new boolean[256][8]; + + static { + for (int b = 0; b < 256; b++) { + for (int bit = 0; bit < 8; bit++) { + BYTE_TO_BOOLS[b][bit] = ((b >>> bit) & 1) != 0; + } + } + } + + /** + * Reads {@code count} boolean values into {@code dest} starting at {@code offset}. + * For RLE runs, uses {@code Arrays.fill} with a single boolean value. + * For packed groups, uses a lookup table to decode 8 booleans per byte directly + * from the raw packed bytes, bypassing the int[] intermediate buffer. + * + * @param dest destination array + * @param offset start index in dest + * @param count number of values to read + */ + public void readBooleans(boolean[] dest, int offset, int count) throws IOException { + int remaining = count; + int pos = offset; + while (remaining > 0) { + if (currentCount == 0) { + readNext(); + } + int batchSize = Math.min(remaining, currentCount); + switch (mode) { + case RLE: + java.util.Arrays.fill(dest, pos, pos + batchSize, currentValue != 0); + break; + case PACKED: + // For bitWidth=1, read directly from packedBytesBuffer via lookup table + int bitOff = currentBuffer.length - currentCount; + int written = 0; + + // Handle partial byte alignment + int bitPos = bitOff & 7; + if (bitPos != 0) { + int byteIdx = bitOff >>> 3; + byte b = packedBytesBuffer[byteIdx]; + while (bitPos < 8 && written < batchSize) { + dest[pos + written] = ((b >>> bitPos) & 1) != 0; + bitPos++; + written++; + } + } + + // Process full bytes via lookup table + int byteIdx = (bitOff + written) >>> 3; + while (written + 8 <= batchSize) { + System.arraycopy(BYTE_TO_BOOLS[packedBytesBuffer[byteIdx] & 0xFF], 0, dest, pos + written, 8); + byteIdx++; + written += 8; + } + + // Handle remaining bits + if (written < batchSize) { + byte b = packedBytesBuffer[byteIdx]; + int bp = 0; + while (written < batchSize) { + dest[pos + written] = ((b >>> bp) & 1) != 0; + bp++; + written++; + } + } + break; + default: + throw new ParquetDecodingException("not a valid mode " + mode); + } + currentCount -= batchSize; + remaining -= batchSize; + pos += batchSize; + } + } + private void readNext() throws IOException { Preconditions.checkArgument(in.available() > 0, "Reading past RLE/BitPacking stream."); final int header = BytesUtils.readUnsignedVarInt(in); @@ -97,6 +214,7 @@ private void readNext() throws IOException { int bytesToRead = (int) Math.ceil(currentCount * bitWidth / 8.0); bytesToRead = Math.min(bytesToRead, in.available()); new DataInputStream(in).readFully(bytes, 0, bytesToRead); + packedBytesBuffer = bytes; for (int valueIndex = 0, byteIndex = 0; valueIndex < currentCount; valueIndex += 8, byteIndex += bitWidth) { diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java index e33824bff1..fc83e85963 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java @@ -272,6 +272,71 @@ public BytesInput toBytes() throws IOException { return BytesInput.from(baos); } + /** + * Batch-encodes boolean values (bitWidth must be 1). Pre-scans for runs to emit + * RLE runs directly and packs remaining groups into bit-packed runs, bypassing + * the per-value state machine. + * + *

This method may only be called when the encoder is in its initial state + * (no values have been written via {@link #writeInt}). If called after scalar + * writes, behavior is undefined. + * + * @param values the boolean array + * @param offset start position in the array + * @param length number of values to encode + */ + public void writeBooleans(boolean[] values, int offset, int length) throws IOException { + Preconditions.checkArgument(bitWidth == 1, "writeBooleans requires bitWidth == 1"); + + int pos = offset; + int end = offset + length; + + while (pos < end) { + // Scan for run of consecutive identical values + boolean val = values[pos]; + int runStart = pos; + pos++; + while (pos < end && values[pos] == val) { + pos++; + } + int runLen = pos - runStart; + int intVal = val ? 1 : 0; + + // If we have a pending partial buffer, fill it first from this run + if (numBufferedValues > 0 && runLen >= 8) { + int fill = 8 - numBufferedValues; + for (int i = 0; i < fill; i++) { + bufferedValues[numBufferedValues] = intVal; + numBufferedValues++; + } + writeOrAppendBitPackedRun(); + runLen -= fill; + } + + if (runLen >= 8) { + // Buffer is empty now, emit RLE run for the remaining + endPreviousBitPackedRun(); + BytesUtils.writeUnsignedVarInt(runLen << 1, baos); + BytesUtils.writeIntLittleEndianPaddedOnBitWidth(baos, intVal, bitWidth); + } else { + // Buffer values for bit-packing + for (int i = 0; i < runLen; i++) { + bufferedValues[numBufferedValues] = intVal; + numBufferedValues++; + if (numBufferedValues == 8) { + writeOrAppendBitPackedRun(); + } + } + } + } + + // Update state so toBytes() handles the tail correctly + repeatCount = 0; + if (numBufferedValues > 0) { + previousValue = bufferedValues[numBufferedValues - 1]; + } + } + /** * Reset this encoder for re-use */ diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java index 0bd5a18d2b..9ee70add6d 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java @@ -54,6 +54,24 @@ public int readInteger() { } } + @Override + public void readIntegers(int[] dest, int offset, int count) { + try { + decoder.readInts(dest, offset, count); + } catch (IOException e) { + throw new ParquetDecodingException(e); + } + } + + @Override + public void readBooleans(boolean[] dest, int offset, int count) { + try { + decoder.readBooleans(dest, offset, count); + } catch (IOException e) { + throw new ParquetDecodingException(e); + } + } + @Override public boolean readBoolean() { return readInteger() == 0 ? false : true; diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java index e869b0f2a3..b6609b1d43 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java @@ -52,6 +52,15 @@ public void writeBoolean(boolean v) { writeInteger(v ? 1 : 0); } + @Override + public void writeBooleans(boolean[] values, int offset, int length) { + try { + encoder.writeBooleans(values, offset, length); + } catch (IOException e) { + throw new ParquetEncodingException(e); + } + } + @Override public long getBufferedSize() { return encoder.getBufferedSize(); diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java index d3d8b1b6de..7dbe22a6b3 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.ArrayList; import java.util.List; import org.apache.parquet.OutputStreamCloseException; @@ -201,6 +202,7 @@ private void addSlab(int minimumSize) { LOG.debug("used {} slabs, adding new slab of size {}", slabs.size(), nextSlabSize); this.currentSlab = allocator.allocate(nextSlabSize); + this.currentSlab.order(ByteOrder.LITTLE_ENDIAN); this.slabs.add(currentSlab); this.bytesAllocated = Math.addExact(this.bytesAllocated, nextSlabSize); } @@ -232,6 +234,114 @@ public void write(byte b[], int off, int len) { bytesUsed = Math.addExact(bytesUsed, len); } + /** + * Writes multiple int values in little-endian byte order using bulk {@code IntBuffer} transfer. + * Amortizes capacity checks across the entire batch and leverages platform-optimized bulk put. + * + * @param values source array + * @param offset start index in values + * @param length number of ints to write + */ + public void writeInts(int[] values, int offset, int length) { + int bytesNeeded = length * 4; + if (bytesNeeded <= currentSlab.remaining()) { + currentSlab.asIntBuffer().put(values, offset, length); + currentSlab.position(currentSlab.position() + bytesNeeded); + } else { + // Fill current slab, then continue into a new one + int fits = currentSlab.remaining() / 4; + if (fits > 0) { + currentSlab.asIntBuffer().put(values, offset, fits); + currentSlab.position(currentSlab.position() + fits * 4); + } + int remaining = length - fits; + addSlab(remaining * 4); + currentSlab.asIntBuffer().put(values, offset + fits, remaining); + currentSlab.position(currentSlab.position() + remaining * 4); + } + bytesUsed = Math.addExact(bytesUsed, bytesNeeded); + } + + /** + * Writes multiple long values in little-endian byte order using bulk {@code LongBuffer} transfer. + * + * @param values source array + * @param offset start index in values + * @param length number of longs to write + */ + public void writeLongs(long[] values, int offset, int length) { + int bytesNeeded = length * 8; + if (bytesNeeded <= currentSlab.remaining()) { + currentSlab.asLongBuffer().put(values, offset, length); + currentSlab.position(currentSlab.position() + bytesNeeded); + } else { + int fits = currentSlab.remaining() / 8; + if (fits > 0) { + currentSlab.asLongBuffer().put(values, offset, fits); + currentSlab.position(currentSlab.position() + fits * 8); + } + int remaining = length - fits; + addSlab(remaining * 8); + currentSlab.asLongBuffer().put(values, offset + fits, remaining); + currentSlab.position(currentSlab.position() + remaining * 8); + } + bytesUsed = Math.addExact(bytesUsed, bytesNeeded); + } + + /** + * Writes multiple float values in little-endian byte order using bulk {@code FloatBuffer} transfer. + * The slab's LE byte order ensures correct IEEE 754 encoding without explicit + * {@code Float.floatToIntBits()} conversion. + * + * @param values source array + * @param offset start index in values + * @param length number of floats to write + */ + public void writeFloats(float[] values, int offset, int length) { + int bytesNeeded = length * 4; + if (bytesNeeded <= currentSlab.remaining()) { + currentSlab.asFloatBuffer().put(values, offset, length); + currentSlab.position(currentSlab.position() + bytesNeeded); + } else { + int fits = currentSlab.remaining() / 4; + if (fits > 0) { + currentSlab.asFloatBuffer().put(values, offset, fits); + currentSlab.position(currentSlab.position() + fits * 4); + } + int remaining = length - fits; + addSlab(remaining * 4); + currentSlab.asFloatBuffer().put(values, offset + fits, remaining); + currentSlab.position(currentSlab.position() + remaining * 4); + } + bytesUsed = Math.addExact(bytesUsed, bytesNeeded); + } + + /** + * Writes multiple double values in little-endian byte order using bulk {@code DoubleBuffer} transfer. + * + * @param values source array + * @param offset start index in values + * @param length number of doubles to write + */ + public void writeDoubles(double[] values, int offset, int length) { + int bytesNeeded = length * 8; + if (bytesNeeded <= currentSlab.remaining()) { + currentSlab.asDoubleBuffer().put(values, offset, length); + currentSlab.position(currentSlab.position() + bytesNeeded); + } else { + int fits = currentSlab.remaining() / 8; + if (fits > 0) { + currentSlab.asDoubleBuffer().put(values, offset, fits); + currentSlab.position(currentSlab.position() + fits * 8); + } + int remaining = length - fits; + addSlab(remaining * 8); + currentSlab.asDoubleBuffer().put(values, offset + fits, remaining); + currentSlab.position(currentSlab.position() + remaining * 8); + } + bytesUsed = Math.addExact(bytesUsed, bytesNeeded); + } + private void writeToOutput(OutputStream out, ByteBuffer buf, int len) throws IOException { if (buf.hasArray()) { out.write(buf.array(), buf.arrayOffset(), len);