diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java index f29214b458..f808a7092b 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java @@ -67,6 +67,7 @@ public class ParquetProperties { public static final boolean DEFAULT_STATISTICS_ENABLED = true; public static final boolean DEFAULT_SIZE_STATISTICS_ENABLED = true; + public static final boolean DEFAULT_DICTIONARY_EARLY_CHECK_ENABLED = true; public static final boolean DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED = true; /** @@ -131,6 +132,7 @@ public static WriterVersion fromString(String name) { private final int rowGroupRowCountLimit; private final int pageRowCountLimit; private final boolean pageWriteChecksumEnabled; + private final boolean dictionaryEarlyCheckEnabled; private final ColumnProperty byteStreamSplitEnabled; private final Map extraMetaData; private final ColumnProperty statistics; @@ -163,6 +165,7 @@ private ParquetProperties(Builder builder) { this.rowGroupRowCountLimit = builder.rowGroupRowCountLimit; this.pageRowCountLimit = builder.pageRowCountLimit; this.pageWriteChecksumEnabled = builder.pageWriteChecksumEnabled; + this.dictionaryEarlyCheckEnabled = builder.dictionaryEarlyCheckEnabled; this.byteStreamSplitEnabled = builder.byteStreamSplitEnabled.build(); this.extraMetaData = builder.extraMetaData; this.statistics = builder.statistics.build(); @@ -322,6 +325,10 @@ public boolean getPageWriteChecksumEnabled() { return pageWriteChecksumEnabled; } + public boolean isDictionaryEarlyCheckEnabled() { + return dictionaryEarlyCheckEnabled; + } + public OptionalLong getBloomFilterNDV(ColumnDescriptor column) { Long ndv = bloomFilterNDVs.getValue(column); return ndv == null ? OptionalLong.empty() : OptionalLong.of(ndv); @@ -415,6 +422,7 @@ public static class Builder { private int rowGroupRowCountLimit = DEFAULT_ROW_GROUP_ROW_COUNT_LIMIT; private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT; private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED; + private boolean dictionaryEarlyCheckEnabled = DEFAULT_DICTIONARY_EARLY_CHECK_ENABLED; private final ColumnProperty.Builder byteStreamSplitEnabled; private Map extraMetaData = new HashMap<>(); private final ColumnProperty.Builder statistics; @@ -450,6 +458,7 @@ private Builder(ParquetProperties toCopy) { this.allocator = toCopy.allocator; this.pageRowCountLimit = toCopy.pageRowCountLimit; this.pageWriteChecksumEnabled = toCopy.pageWriteChecksumEnabled; + this.dictionaryEarlyCheckEnabled = toCopy.dictionaryEarlyCheckEnabled; this.bloomFilterNDVs = ColumnProperty.builder(toCopy.bloomFilterNDVs); this.bloomFilterFPPs = ColumnProperty.builder(toCopy.bloomFilterFPPs); this.bloomFilterEnabled = ColumnProperty.builder(toCopy.bloomFilterEnabled); @@ -709,6 +718,11 @@ public Builder withPageWriteChecksumEnabled(boolean val) { return this; } + public Builder withDictionaryEarlyCheckEnabled(boolean val) { + this.dictionaryEarlyCheckEnabled = val; + return this; + } + public Builder withExtraMetaData(Map extraMetaData) { this.extraMetaData = extraMetaData; return this; diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactory.java b/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactory.java index 4c03e6b65e..5c88eb3041 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactory.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactory.java @@ -111,7 +111,9 @@ static ValuesWriter dictWriterWithFallBack( ValuesWriter writerToFallBackTo) { if (parquetProperties.isDictionaryEnabled(path)) { return FallbackValuesWriter.of( - dictionaryWriter(path, parquetProperties, dictPageEncoding, dataPageEncoding), writerToFallBackTo); + dictionaryWriter(path, parquetProperties, dictPageEncoding, dataPageEncoding), + writerToFallBackTo, + parquetProperties.isDictionaryEarlyCheckEnabled()); } else { return writerToFallBackTo; } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/fallback/FallbackValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/fallback/FallbackValuesWriter.java index 7f56ef2192..03a2bd64b0 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/fallback/FallbackValuesWriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/fallback/FallbackValuesWriter.java @@ -30,7 +30,12 @@ public class FallbackValuesWriter FallbackValuesWriter of( I initialWriter, F fallBackWriter) { - return new FallbackValuesWriter<>(initialWriter, fallBackWriter); + return new FallbackValuesWriter<>(initialWriter, fallBackWriter, true); + } + + public static FallbackValuesWriter of( + I initialWriter, F fallBackWriter, boolean checkCompressionOnFirstPage) { + return new FallbackValuesWriter<>(initialWriter, fallBackWriter, checkCompressionOnFirstPage); } /** @@ -44,6 +49,8 @@ public static writer = + FallbackValuesWriter.of(dictWriter, plainWriter, false); + + try { + // Write many distinct values — would normally cause isCompressionSatisfying to fail + for (int i = 0; i < 1000; i++) { + writer.writeInteger(i); + } + + writer.getBytes(); + Encoding encoding = writer.getEncoding(); + + assertTrue( + "Dictionary encoding should be preserved when early check is disabled", + encoding.usesDictionary()); + } finally { + writer.close(); + } + } + + @Test + public void testEarlyCheckEnabledFallsBack() throws Exception { + int dictPageSize = 1024 * 1024; + PlainIntegerDictionaryValuesWriter dictWriter = new PlainIntegerDictionaryValuesWriter( + dictPageSize, Encoding.PLAIN_DICTIONARY, Encoding.PLAIN_DICTIONARY, allocator); + PlainValuesWriter plainWriter = new PlainValuesWriter(1024, 1024 * 1024, allocator); + + FallbackValuesWriter writer = + FallbackValuesWriter.of(dictWriter, plainWriter, true); + + try { + // Write many distinct values — encoded size will exceed raw size + for (int i = 0; i < 1000; i++) { + writer.writeInteger(i); + } + + writer.getBytes(); + Encoding encoding = writer.getEncoding(); + + assertFalse( + "Should fall back to plain encoding when early check is enabled with high cardinality", + encoding.usesDictionary()); + } finally { + writer.close(); + } + } +}