Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class ParquetProperties {
public static final boolean DEFAULT_STATISTICS_ENABLED = true;
public static final boolean DEFAULT_SIZE_STATISTICS_ENABLED = true;

public static final boolean DEFAULT_DICTIONARY_EARLY_CHECK_ENABLED = true;
public static final boolean DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED = true;

/**
Expand Down Expand Up @@ -131,6 +132,7 @@ public static WriterVersion fromString(String name) {
private final int rowGroupRowCountLimit;
private final int pageRowCountLimit;
private final boolean pageWriteChecksumEnabled;
private final boolean dictionaryEarlyCheckEnabled;
private final ColumnProperty<ByteStreamSplitMode> byteStreamSplitEnabled;
private final Map<String, String> extraMetaData;
private final ColumnProperty<Boolean> statistics;
Expand Down Expand Up @@ -163,6 +165,7 @@ private ParquetProperties(Builder builder) {
this.rowGroupRowCountLimit = builder.rowGroupRowCountLimit;
this.pageRowCountLimit = builder.pageRowCountLimit;
this.pageWriteChecksumEnabled = builder.pageWriteChecksumEnabled;
this.dictionaryEarlyCheckEnabled = builder.dictionaryEarlyCheckEnabled;
this.byteStreamSplitEnabled = builder.byteStreamSplitEnabled.build();
this.extraMetaData = builder.extraMetaData;
this.statistics = builder.statistics.build();
Expand Down Expand Up @@ -322,6 +325,10 @@ public boolean getPageWriteChecksumEnabled() {
return pageWriteChecksumEnabled;
}

public boolean isDictionaryEarlyCheckEnabled() {
return dictionaryEarlyCheckEnabled;
}

public OptionalLong getBloomFilterNDV(ColumnDescriptor column) {
Long ndv = bloomFilterNDVs.getValue(column);
return ndv == null ? OptionalLong.empty() : OptionalLong.of(ndv);
Expand Down Expand Up @@ -415,6 +422,7 @@ public static class Builder {
private int rowGroupRowCountLimit = DEFAULT_ROW_GROUP_ROW_COUNT_LIMIT;
private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED;
private boolean dictionaryEarlyCheckEnabled = DEFAULT_DICTIONARY_EARLY_CHECK_ENABLED;
private final ColumnProperty.Builder<ByteStreamSplitMode> byteStreamSplitEnabled;
private Map<String, String> extraMetaData = new HashMap<>();
private final ColumnProperty.Builder<Boolean> statistics;
Expand Down Expand Up @@ -450,6 +458,7 @@ private Builder(ParquetProperties toCopy) {
this.allocator = toCopy.allocator;
this.pageRowCountLimit = toCopy.pageRowCountLimit;
this.pageWriteChecksumEnabled = toCopy.pageWriteChecksumEnabled;
this.dictionaryEarlyCheckEnabled = toCopy.dictionaryEarlyCheckEnabled;
this.bloomFilterNDVs = ColumnProperty.builder(toCopy.bloomFilterNDVs);
this.bloomFilterFPPs = ColumnProperty.builder(toCopy.bloomFilterFPPs);
this.bloomFilterEnabled = ColumnProperty.builder(toCopy.bloomFilterEnabled);
Expand Down Expand Up @@ -709,6 +718,11 @@ public Builder withPageWriteChecksumEnabled(boolean val) {
return this;
}

public Builder withDictionaryEarlyCheckEnabled(boolean val) {
this.dictionaryEarlyCheckEnabled = val;
return this;
}

public Builder withExtraMetaData(Map<String, String> extraMetaData) {
this.extraMetaData = extraMetaData;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ static ValuesWriter dictWriterWithFallBack(
ValuesWriter writerToFallBackTo) {
if (parquetProperties.isDictionaryEnabled(path)) {
return FallbackValuesWriter.of(
dictionaryWriter(path, parquetProperties, dictPageEncoding, dataPageEncoding), writerToFallBackTo);
dictionaryWriter(path, parquetProperties, dictPageEncoding, dataPageEncoding),
writerToFallBackTo,
parquetProperties.isDictionaryEarlyCheckEnabled());
} else {
return writerToFallBackTo;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@ public class FallbackValuesWriter<I extends ValuesWriter & RequiresFallback, F e

public static <I extends ValuesWriter & RequiresFallback, F extends ValuesWriter> FallbackValuesWriter<I, F> of(
I initialWriter, F fallBackWriter) {
return new FallbackValuesWriter<>(initialWriter, fallBackWriter);
return new FallbackValuesWriter<>(initialWriter, fallBackWriter, true);
}

public static <I extends ValuesWriter & RequiresFallback, F extends ValuesWriter> FallbackValuesWriter<I, F> of(
I initialWriter, F fallBackWriter, boolean checkCompressionOnFirstPage) {
return new FallbackValuesWriter<>(initialWriter, fallBackWriter, checkCompressionOnFirstPage);
}

/**
Expand All @@ -44,6 +49,8 @@ public static <I extends ValuesWriter & RequiresFallback, F extends ValuesWriter

private boolean fellBackAlready = false;

private final boolean checkCompressionOnFirstPage;

/**
* writer currently written to
*/
Expand All @@ -63,10 +70,15 @@ public static <I extends ValuesWriter & RequiresFallback, F extends ValuesWriter
private boolean firstPage = true;

public FallbackValuesWriter(I initialWriter, F fallBackWriter) {
this(initialWriter, fallBackWriter, true);
}

public FallbackValuesWriter(I initialWriter, F fallBackWriter, boolean checkCompressionOnFirstPage) {
super();
this.initialWriter = initialWriter;
this.fallBackWriter = fallBackWriter;
this.currentWriter = initialWriter;
this.checkCompressionOnFirstPage = checkCompressionOnFirstPage;
}

@Override
Expand All @@ -82,7 +94,7 @@ public BytesInput getBytes() {
if (!fellBackAlready && firstPage) {
// we use the first page to decide if we're going to use this encoding
BytesInput bytes = initialWriter.getBytes();
if (!initialWriter.isCompressionSatisfying(rawDataByteSize, bytes.size())) {
if (checkCompressionOnFirstPage && !initialWriter.isCompressionSatisfying(rawDataByteSize, bytes.size())) {
fallBack();
} else {
return bytes;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.column.values.fallback;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import org.apache.parquet.bytes.DirectByteBufferAllocator;
import org.apache.parquet.bytes.TrackingByteBufferAllocator;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainIntegerDictionaryValuesWriter;
import org.apache.parquet.column.values.plain.PlainValuesWriter;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class TestFallbackValuesWriter {

private TrackingByteBufferAllocator allocator;

@Before
public void initAllocator() {
allocator = TrackingByteBufferAllocator.wrap(new DirectByteBufferAllocator());
}

@After
public void closeAllocator() {
allocator.close();
}

@Test
public void testEarlyCheckDisabledPreservesDictionary() throws Exception {
int dictPageSize = 1024 * 1024;
PlainIntegerDictionaryValuesWriter dictWriter = new PlainIntegerDictionaryValuesWriter(
dictPageSize, Encoding.PLAIN_DICTIONARY, Encoding.PLAIN_DICTIONARY, allocator);
PlainValuesWriter plainWriter = new PlainValuesWriter(1024, 1024 * 1024, allocator);

FallbackValuesWriter<PlainIntegerDictionaryValuesWriter, PlainValuesWriter> writer =
FallbackValuesWriter.of(dictWriter, plainWriter, false);

try {
// Write many distinct values — would normally cause isCompressionSatisfying to fail
for (int i = 0; i < 1000; i++) {
writer.writeInteger(i);
}

writer.getBytes();
Encoding encoding = writer.getEncoding();

assertTrue(
"Dictionary encoding should be preserved when early check is disabled",
encoding.usesDictionary());
} finally {
writer.close();
}
}

@Test
public void testEarlyCheckEnabledFallsBack() throws Exception {
int dictPageSize = 1024 * 1024;
PlainIntegerDictionaryValuesWriter dictWriter = new PlainIntegerDictionaryValuesWriter(
dictPageSize, Encoding.PLAIN_DICTIONARY, Encoding.PLAIN_DICTIONARY, allocator);
PlainValuesWriter plainWriter = new PlainValuesWriter(1024, 1024 * 1024, allocator);

FallbackValuesWriter<PlainIntegerDictionaryValuesWriter, PlainValuesWriter> writer =
FallbackValuesWriter.of(dictWriter, plainWriter, true);

try {
// Write many distinct values — encoded size will exceed raw size
for (int i = 0; i < 1000; i++) {
writer.writeInteger(i);
}

writer.getBytes();
Encoding encoding = writer.getEncoding();

assertFalse(
"Should fall back to plain encoding when early check is enabled with high cardinality",
encoding.usesDictionary());
} finally {
writer.close();
}
}
}