CASSANALYTICS-167: Regenerate bloom filters for CQLSSTableWriter#211
lukasz-antoniak wants to merge 10 commits into
Solid patch. Just a few things - nothing major.
rustyrazorblade left a comment:
Just a handful of small questions.
    {
        FileSystemSSTable ssTable = new FileSystemSSTable(dataFile, false, BufferingInputStreamStats::doNothingStats);
        writerContext.bridge().rebuildBloomFilter(layer.partitioner(), layer.cqlTable(), ssTable, outputDirectory);
        LOGGER.error("Rebuilt bloom filter for sstable {}", dataFile);
I think this should be INFO level. It'll be confusing to users why every one of their jobs has errors.
    String table = cqltable.table();

    Map<MetadataType, MetadataComponent> componentMap = ReaderUtils.deserializeStatsMetadata(keyspace, table, ssTable, EnumSet.of(MetadataType.STATS));
    StatsMetadata statsMetadata = (StatsMetadata) componentMap.get(MetadataType.STATS);
Is there a possibility this could be NULL?
From what I have reviewed in CQLSSTableWriter, all files (including statistics) have to be present once an SSTable is considered produced. Did I understand your concern correctly?
Yep, just wanted to be sure.
    try (FileOutputStream fos = new FileOutputStream(filterFile, false);
         DataOutputStreamPlus stream = new BufferedDataOutputStreamPlus(fos))
    {
        BloomFilterSerializer.serialize((BloomFilter) filter, stream);
I'm not sure about this, but if the user has bloom filters disabled, I think this might throw an exception. In 5.0 it uses IFilter.
For 5.0, we are using the five-zero bridge and this code.
Adding a condition to skip filter regeneration when bloom_filter_fp_chance == 1.0.
    ImmutableList.of(ImmutableList.of("1", "1"), ImmutableList.of("7", "2")));

    assertThat(bloomFilter.mightContain(searchKeys.get(0))).isTrue();
    assertThat(bloomFilter.doesNotContain(searchKeys.get(1))).isTrue();
Since we're using a limited set of data in here, it's very unlikely we'll ever hit this... so maybe just leave a comment. If the test changes, we could hit a bloom filter false positive, and it'll cause the test to fail. I know it's really, really unlikely, so please just document it.
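To make the concern concrete, here is a minimal sketch of why only the negative assertion is at risk. `TinyBloomFilter` is a hypothetical toy class written for this illustration, not Cassandra's `BloomFilter`, and its hash function is an arbitrary choice. A key that was added is always reported as possibly present, while an absent key can still be reported as present when its bits collide with those of stored keys.

```java
import java.nio.charset.StandardCharsets;
import java.util.BitSet;

// Toy Bloom filter for illustration only; this is NOT Cassandra's BloomFilter class.
final class TinyBloomFilter
{
    private final BitSet bits;
    private final int size;
    private final int hashCount;

    TinyBloomFilter(int size, int hashCount)
    {
        this.bits = new BitSet(size);
        this.size = size;
        this.hashCount = hashCount;
    }

    void add(String key)
    {
        for (int seed = 0; seed < hashCount; seed++)
            bits.set(index(key, seed));
    }

    // true means "possibly present"; false means "definitely absent"
    boolean mightContain(String key)
    {
        for (int seed = 0; seed < hashCount; seed++)
            if (!bits.get(index(key, seed)))
                return false;
        return true;
    }

    // Simple seeded polynomial hash, chosen for brevity rather than quality
    private int index(String key, int seed)
    {
        int hash = 0x9E3779B9 * (seed + 1);
        for (byte b : key.getBytes(StandardCharsets.UTF_8))
            hash = 31 * hash + b;
        return Math.floorMod(hash, size);
    }

    public static void main(String[] args)
    {
        TinyBloomFilter roomy = new TinyBloomFilter(1024, 3);
        roomy.add("1");
        System.out.println(roomy.mightContain("1")); // true: added keys are never reported absent

        // Degenerate one-bit filter: every key maps to bit 0
        TinyBloomFilter saturated = new TinyBloomFilter(1, 1);
        saturated.add("1");
        System.out.println(saturated.mightContain("7")); // true: a false positive
    }
}
```

The one-bit filter forces the collision that a realistic filter only hits rarely, which is the case the requested test comment would document.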
    }
    catch (Exception e)
    {
        LOGGER.warn("Failed to rebuild bloom filter for sstable {}", dataFile, e);
Is it safe to swallow this exception? How would it fail? Maybe make it an error?
I initially thought to ignore failures, but producing an unparseable file (and uploading it) may indeed cause an issue. The original 16B file is overwritten, so in case of failure we would publish not the empty file but a potentially broken one.
Ok. Mind just leaving a comment so it's clear that we're deliberately not uploading a bloom filter in case of a failure? I don't expect we'll ever hit this path, I just want the intention to be clear.
    import com.google.common.base.Preconditions;
    import com.google.common.collect.ImmutableMap;
    import com.google.common.collect.Lists;
    import org.apache.commons.lang.StringUtils;
nit / question: Is this commons lang3?
    }

    protected void rebuildFilterComponents(@NotNull BulkWriterContext writerContext, @NotNull Path outputDirectory,
    @NotNull DirectoryStream.Filter<Path> filter) throws IOException
nit: indentation is off by 2 spaces.
    File filterFile = new File(directory.toFile(), descriptor.relativeFilenameFor(Component.FILTER));
    try (FileOutputStream fos = new FileOutputStream(filterFile, false);
         DataOutputStreamPlus stream = new BufferedDataOutputStreamPlus(fos))
    {
        BloomFilterSerializer.serialize((BloomFilter) filter, stream);
        stream.flush();
        SyncUtil.sync(fos);
    }
Should we delete the filterFile if it fails to rebuild, instead of throwing an exception?
I think we are fine to upload without the filter component (after deleting the likely corrupted filter file). The server will rebuild the filter. Throwing an exception fails the Spark task, which causes a more expensive retry and delays the completion of the Spark job.
+1 to throwing it away. Better than having a corrupt one, it'll rebuild on its own this way.
Done, error handling updated.
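For readers following the thread, the agreed behaviour (drop a possibly corrupt filter file rather than fail the task) can be sketched roughly as below. This is not the code from the patch: `FilterRebuildSketch`, `FilterWriter`, and `rebuildOrDiscard` are hypothetical names, and the serialization step is reduced to a callback so the sketch stays self-contained.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch of "delete on failure"; names here are not from the patch.
final class FilterRebuildSketch
{
    // Stand-in for the real serialization step (assumption for illustration)
    interface FilterWriter
    {
        void writeTo(OutputStream out) throws IOException;
    }

    // Returns true if the filter file was written, false if writing failed
    // and the partially written file was removed instead.
    static boolean rebuildOrDiscard(Path filterFile, FilterWriter writer)
    {
        // Default options create the file or truncate an existing one
        try (OutputStream out = Files.newOutputStream(filterFile))
        {
            writer.writeTo(out);
            return true;
        }
        catch (IOException | RuntimeException e)
        {
            // The stream is already closed here, so the file can be deleted safely.
            // Deliberately no rethrow: the file set is published without a filter.
            try
            {
                Files.deleteIfExists(filterFile);
            }
            catch (IOException deleteFailure)
            {
                e.addSuppressed(deleteFailure);
            }
            System.err.println("Discarded filter " + filterFile + ": " + e);
            return false;
        }
    }
}
```

The trade-off discussed above is visible in the catch block: the caller gets a boolean instead of an exception, so a failed rebuild costs one missing component rather than a task retry.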
@rustyrazorblade, @yifan-c, I think I have addressed all comments. Can you do the final review?
    };
    Set<Path> dataFilePaths = new HashSet<>();
    Map<Path, Digest> fileDigests = new HashMap<>();
    // FIXME: CQLSSTableWriter may produce incomplete Filter.db file, rebuilding it manually (see CASSANALYTICS-167).
If I understand correctly, we are rebuilding the complete bloom filter instead of fixing the actual cause of the empty bloom filter, because the latter requires changes in both C* 4.0 and 5.0, plus a new release, backporting, etc. in Cassandra. Am I right? Can we create a Jira to fix this in upcoming C* versions, so we can avoid the full rebuild there, and reference that Jira in the comment here so we don't forget?
    protected void rebuildFilterComponents(@NotNull BulkWriterContext writerContext, @NotNull Path outputDirectory,
                                           @NotNull DirectoryStream.Filter<Path> filter) throws IOException
    {
        LocalDataLayer layer = buildLocalDataLayer(writerContext, outputDirectory, null);
[Minor] The LocalDataLayer class description says 'Mostly used for testing', but we are using it in prod. We need to update this comment.
    SchemaBuilder schemaBuilder = new SchemaBuilder(cqltable, partitioner);
    TableMetadata tableMetadata = schemaBuilder.tableMetaData();

    if (tableMetadata.params.bloomFilterFpChance == 1.0)
I guess the == comparison on doubles is safe here, as 1.0 doesn't involve rounding.
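A small sketch of the reasoning: 1.0 is exactly representable as an IEEE 754 double, so a value that was set or parsed as 1.0 compares equal with `==`, whereas values produced by arithmetic on inexact decimals may not. `FpChanceCheck` and `filterDisabled` are hypothetical names for illustration, and the sketch assumes the configured value is not the result of a computation.

```java
// Hypothetical helper; the name and placement are not from the patch.
final class FpChanceCheck
{
    // Per the thread, an fp chance of exactly 1.0 is the case where regeneration is skipped
    static boolean filterDisabled(double bloomFilterFpChance)
    {
        return bloomFilterFpChance == 1.0;
    }

    public static void main(String[] args)
    {
        System.out.println(filterDisabled(Double.parseDouble("1.0"))); // true: 1.0 is exactly representable
        System.out.println(filterDisabled(0.01));                      // false
        System.out.println(0.1 + 0.2 == 0.3);                          // false: rounding after arithmetic
    }
}
```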
    // rebuild Filter.db files before calculating their digest
    try (DirectoryStream<Path> stream = getDataFileStream(sstableFilter))
    {
        for (Path path : stream)
Can we just pass sstableFilter to rebuildFilterComponents as it is already calling getDataFileStream and looping through it?
Fixes CASSANALYTICS-167.