Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions parquet-benchmarks/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,18 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<!-- Put the JMH annotation processor on the compiler's processor path so the
     benchmark harness sources for @Benchmark methods are generated at compile time. -->
<annotationProcessorPaths>
<path>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<version>${jmh.version}</version>
</path>
</annotationProcessorPaths>
<!-- Name the processor explicitly instead of relying on service-loader
     discovery from the processor path. -->
<annotationProcessors>
<annotationProcessor>org.openjdk.jmh.generators.BenchmarkProcessor</annotationProcessor>
</annotationProcessors>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.benchmarks;

import java.io.IOException;
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter;

/**
 * Helper routines shared by the encode/decode micro-benchmarks.
 */
final class BenchmarkEncodingUtils {

  private BenchmarkEncodingUtils() {}

  /**
   * Pair of artefacts emitted by a dictionary-encoded page: the encoded
   * dictionary indices ({@link #dictData}) together with the dictionary page
   * itself ({@link #dictPage}). The page is {@code null} whenever the writer
   * abandoned dictionary encoding and fell back to plain (for instance when
   * the dictionary grew past its configured maximum size).
   */
  static final class EncodedDictionary {
    final byte[] dictData;
    final DictionaryPage dictPage;

    EncodedDictionary(byte[] dictData, DictionaryPage dictPage) {
      this.dictData = dictData;
      this.dictPage = dictPage;
    }

    boolean fellBackToPlain() {
      return dictPage == null;
    }
  }

  /**
   * Drains a {@link DictionaryValuesWriter} into an {@link EncodedDictionary}.
   *
   * <p>The writer's data bytes (the RLE-encoded indices) and the dictionary
   * page come back as separate pieces so each can be measured, or handed to a
   * decoder, symmetrically. The dictionary page buffer is copied so that it
   * stays valid once the writer's allocator has been released.
   *
   * <p>{@code toDictPageAndClose()} closes the writer; callers must not invoke
   * {@link DictionaryValuesWriter#close()} on it again.
   */
  static EncodedDictionary drainDictionary(DictionaryValuesWriter writer) throws IOException {
    final byte[] encodedIndices = writer.getBytes().toByteArray();
    final DictionaryPage rawPage = writer.toDictPageAndClose();
    if (rawPage == null) {
      // Writer fell back to plain encoding: there is no dictionary page to copy.
      return new EncodedDictionary(encodedIndices, null);
    }
    return new EncodedDictionary(encodedIndices, rawPage.copy());
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.benchmarks;

import java.io.IOException;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.PositionOutputStream;

/**
* A no-op {@link OutputFile} that discards all written data.
* Useful for isolating CPU/encoding cost from filesystem I/O in write benchmarks.
*/
/**
 * An {@link OutputFile} that silently discards everything written to it.
 * Handy in write benchmarks for separating CPU/encoding cost from filesystem I/O.
 */
public final class BlackHoleOutputFile implements OutputFile {

  public static final BlackHoleOutputFile INSTANCE = new BlackHoleOutputFile();

  private BlackHoleOutputFile() {}

  @Override
  public PositionOutputStream create(long blockSizeHint) {
    return newSink();
  }

  @Override
  public PositionOutputStream createOrOverwrite(long blockSizeHint) {
    // Nothing is ever kept, so "overwrite" and "create" are the same operation.
    return newSink();
  }

  @Override
  public boolean supportsBlockSize() {
    return false;
  }

  @Override
  public long defaultBlockSize() {
    return -1L;
  }

  @Override
  public String getPath() {
    return "/dev/null";
  }

  /** Builds a position-tracking stream that drops every byte it receives. */
  private static PositionOutputStream newSink() {
    return new PositionOutputStream() {
      // Count of bytes "written" so far; only the position is retained, never the data.
      private long bytesWritten;

      @Override
      public void write(int b) throws IOException {
        bytesWritten++;
      }

      @Override
      public void write(byte[] b, int off, int len) throws IOException {
        bytesWritten += len;
      }

      @Override
      public long getPos() throws IOException {
        return bytesWritten;
      }
    };
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.benchmarks;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.parquet.column.ParquetProperties.WriterVersion;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.LocalInputFile;
import org.apache.parquet.io.LocalOutputFile;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;

/**
 * End-to-end read benchmarks for whole Parquet files driven through the example
 * {@link Group} API. Setup writes one temporary file (via {@link LocalOutputFile})
 * from pre-generated rows; every benchmark invocation then re-opens and fully
 * reads that file.
 *
 * <p>Parameterized over compression codec and writer version. Opening the file
 * through {@link LocalInputFile} — and therefore parsing the footer — happens
 * inside the timed section, so the measurement reflects the complete
 * open-and-read cost a typical caller observes.
 *
 * <p>{@link Mode#SingleShotTime} fits here because one invocation already reads
 * all {@value TestDataFactory#DEFAULT_ROW_COUNT} rows, making JIT amortization
 * across invocations unnecessary; ten measurement iterations yield stable
 * statistics in SS mode.
 */
@BenchmarkMode(Mode.SingleShotTime)
@Fork(1)
@Warmup(iterations = 5, batchSize = 1)
@Measurement(iterations = 10, batchSize = 1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
public class FileReadBenchmark {

  @Param({"UNCOMPRESSED", "SNAPPY", "ZSTD", "GZIP"})
  public String codec;

  @Param({"PARQUET_1_0", "PARQUET_2_0"})
  public String writerVersion;

  // Temporary Parquet file produced once per trial and read by every invocation.
  private File dataFile;

  @Setup(Level.Trial)
  public void setup() throws IOException {
    dataFile = File.createTempFile("parquet-read-bench-", ".parquet");
    dataFile.deleteOnExit();
    dataFile.delete(); // the writer must be the one to create the file

    Group[] rows = TestDataFactory.generateRows(
        TestDataFactory.newGroupFactory(), TestDataFactory.DEFAULT_ROW_COUNT, TestDataFactory.DEFAULT_SEED);
    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(new LocalOutputFile(dataFile.toPath()))
        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
        .withType(TestDataFactory.FILE_BENCHMARK_SCHEMA)
        .withCompressionCodec(CompressionCodecName.valueOf(codec))
        .withWriterVersion(WriterVersion.valueOf(writerVersion))
        .withDictionaryEncoding(true)
        .build()) {
      for (Group row : rows) {
        writer.write(row);
      }
    }
  }

  @TearDown(Level.Trial)
  public void tearDown() {
    if (dataFile != null && dataFile.exists()) {
      dataFile.delete();
    }
  }

  @Benchmark
  public void readFile(Blackhole bh) throws IOException {
    // Footer parse is intentionally part of the timed section.
    InputFile source = new LocalInputFile(dataFile.toPath());
    ParquetReader.Builder<Group> builder = new ParquetReader.Builder<Group>(source) {
      @Override
      protected ReadSupport<Group> getReadSupport() {
        return new GroupReadSupport();
      }
    };
    try (ParquetReader<Group> reader = builder.build()) {
      for (Group row = reader.read(); row != null; row = reader.read()) {
        bh.consume(row);
      }
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.benchmarks;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.parquet.column.ParquetProperties.WriterVersion;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;

/**
 * End-to-end write benchmarks for whole Parquet files driven through the example
 * {@link Group} API. Row contents are generated once during setup, so the timed
 * section is dominated by compression and writer settings while still exercising
 * the full Parquet writer path.
 *
 * <p>All output goes to a {@link BlackHoleOutputFile}, isolating CPU and encoding
 * cost from filesystem I/O. Parameterized over compression codec, writer version,
 * and dictionary encoding.
 *
 * <p>{@link Mode#SingleShotTime} fits here because one invocation already writes
 * all {@value TestDataFactory#DEFAULT_ROW_COUNT} rows, making JIT amortization
 * across invocations unnecessary; ten measurement iterations yield stable
 * statistics in SS mode.
 */
@BenchmarkMode(Mode.SingleShotTime)
@Fork(1)
@Warmup(iterations = 5, batchSize = 1)
@Measurement(iterations = 10, batchSize = 1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
public class FileWriteBenchmark {

  @Param({"UNCOMPRESSED", "SNAPPY", "ZSTD", "GZIP"})
  public String codec;

  @Param({"PARQUET_1_0", "PARQUET_2_0"})
  public String writerVersion;

  @Param({"true", "false"})
  public String dictionary;

  // Rows built once per trial; invocations only pay for encoding and writing them.
  private Group[] preparedRows;

  @Setup(Level.Trial)
  public void setup() {
    preparedRows = TestDataFactory.generateRows(
        TestDataFactory.newGroupFactory(), TestDataFactory.DEFAULT_ROW_COUNT, TestDataFactory.DEFAULT_SEED);
  }

  @Benchmark
  public void writeFile() throws IOException {
    boolean useDictionary = Boolean.parseBoolean(dictionary);
    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(BlackHoleOutputFile.INSTANCE)
        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
        .withType(TestDataFactory.FILE_BENCHMARK_SCHEMA)
        .withCompressionCodec(CompressionCodecName.valueOf(codec))
        .withWriterVersion(WriterVersion.valueOf(writerVersion))
        .withDictionaryEncoding(useDictionary)
        .build()) {
      for (int i = 0; i < preparedRows.length; i++) {
        writer.write(preparedRows[i]);
      }
    }
  }
}
Loading