diff --git a/CHANGELOG.md b/CHANGELOG.md index 3f86f69e383..8050b78ad6e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ * [FEATURE] Memberlist: Add `-memberlist.cluster-label` and `-memberlist.cluster-label-verification-disabled` to prevent accidental cross-cluster gossip joins and support rolling label rollout. #7385 * [FEATURE] Querier: Add timeout classification to classify query timeouts as 4XX (user error) or 5XX (system error) based on phase timing. When enabled, queries that spend most of their time in PromQL evaluation return `422 Unprocessable Entity` instead of `503 Service Unavailable`. #7374 * [FEATURE] Querier: Implement Resource Based Throttling in Querier. #7442 +* [ENHANCEMENT] Parquet Converter: Add `parquet-converter.max-block-label-names` limit to skip conversion of TSDB blocks with too many label names. #7524 * [ENHANCEMENT] Upgrade prometheus alertmanager version to v0.32.1. #7462 * [ENHANCEMENT] Tenant Federation: Avoid purging the regex resolver LRU cache on user-sync ticks when the set of known users has not changed. #7489 * [ENHANCEMENT] Parquet Converter: Add a ring status page to expose the ring status. #7455 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index a605dd798ba..1a67655824c 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4468,6 +4468,12 @@ query_rejection: # CLI flag: -parquet-converter.sort-columns [parquet_converter_sort_columns: | default = []] +# [Experimental] Maximum number of distinct label names allowed in a TSDB block +# for parquet conversion. If exceeded, the converter writes a no-convert marker. +# 0 to disable. +# CLI flag: -parquet-converter.max-block-label-names +[parquet_converter_max_block_label_names: | default = 0] + # S3 server-side encryption type. Required to enable server-side encryption # overrides for a specific tenant. If not set, the default S3 client settings # are used. diff --git a/docs/configuration/v1-guarantees.md b/docs/configuration/v1-guarantees.md index e52c65a8a5b..fb79456cb06 100644 --- a/docs/configuration/v1-guarantees.md +++ b/docs/configuration/v1-guarantees.md @@ -133,3 +133,4 @@ Currently experimental features are: - Ingester: Active Series Tracker - Per-tenant `active_series_trackers` configuration in runtime config overrides - Counts active series matching PromQL label matchers and exposes `cortex_ingester_active_series_per_tracker` metric +- Parquet Converter: `-parquet-converter.max-block-label-names` (int) - If enabled, adds a no-convert mark and skips blocks with too many labels. diff --git a/integration/parquet_converter_test.go b/integration/parquet_converter_test.go new file mode 100644 index 00000000000..4a10d3f1e0f --- /dev/null +++ b/integration/parquet_converter_test.go @@ -0,0 +1,142 @@ +//go:build integration + +package integration + +import ( + "context" + "fmt" + "math/rand" + "path/filepath" + "testing" + "time" + + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" + "github.com/thanos-io/objstore" + "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/block/metadata" + + "github.com/cortexproject/cortex/integration/e2e" + e2ecache "github.com/cortexproject/cortex/integration/e2e/cache" + e2edb "github.com/cortexproject/cortex/integration/e2e/db" + "github.com/cortexproject/cortex/integration/e2ecortex" + "github.com/cortexproject/cortex/pkg/storage/bucket" + "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/util/log" + cortex_testutil "github.com/cortexproject/cortex/pkg/util/test" +) + +func TestParquetConverter_NoConvertMarkWithTooManyLabels(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + consul := e2edb.NewConsulWithName("consul") + memcached := e2ecache.NewMemcached() + require.NoError(t, s.StartAndWaitReady(consul, memcached)) + + baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags()) + flags := mergeFlags( + baseFlags, + map[string]string{ + "-target": "all,parquet-converter", + "-blocks-storage.tsdb.block-ranges-period": "1m,24h", + "-blocks-storage.tsdb.ship-interval": "1s", + "-blocks-storage.bucket-store.sync-interval": "1s", + "-blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl": "1s", + "-blocks-storage.bucket-store.bucket-index.idle-timeout": "1s", + "-blocks-storage.bucket-store.bucket-index.enabled": "true", + "-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory, + // compactor + "-compactor.cleanup-interval": "1s", + // Ingester. + "-ring.store": "consul", + "-consul.hostname": consul.NetworkHTTPEndpoint(), + // Distributor. + "-distributor.replication-factor": "1", + // Store-gateway. + "-store-gateway.sharding-enabled": "false", + "--querier.store-gateway-addresses": "nonExistent", // Make sure we do not call Store gateways + // alert manager + "-alertmanager.web.external-url": "http://localhost/alertmanager", + // Enable vertical sharding. + "-frontend.query-vertical-shard-size": "3", + "-frontend.max-cache-freshness": "1m", + // enable experimental promQL funcs + "-querier.enable-promql-experimental-functions": "true", + // parquet-converter + "-parquet-converter.ring.consul.hostname": consul.NetworkHTTPEndpoint(), + "-parquet-converter.conversion-interval": "1s", + "-parquet-converter.enabled": "true", + "-parquet-converter.max-block-label-names": "1", + // Querier + "-querier.enable-parquet-queryable": "true", + // Enable cache for parquet labels and chunks + "-blocks-storage.bucket-store.parquet-labels-cache.backend": "inmemory,memcached", + "-blocks-storage.bucket-store.parquet-labels-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort), + "-blocks-storage.bucket-store.chunks-cache.backend": "inmemory,memcached", + "-blocks-storage.bucket-store.chunks-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort), + }, + ) + + // make alert manager config dir + require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{})) + + ctx := context.Background() + rnd := rand.New(rand.NewSource(time.Now().Unix())) + dir := filepath.Join(s.SharedDir(), "data") + lbls := []labels.Labels{ + labels.FromStrings("__name__", "test_series_a", "job", "test"), + } + + numSamples := 60 + scrapeInterval := time.Minute + now := time.Now() + start := now.Add(-time.Hour * 24) + end := now.Add(-time.Hour) + + minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, s.StartAndWaitReady(minio)) + + cortex := e2ecortex.NewSingleBinary("cortex", flags, "") + require.NoError(t, s.StartAndWaitReady(cortex)) + storage, err := e2ecortex.NewS3ClientForMinio(minio, flags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, err) + bkt := bucket.NewUserBucketClient("user-1", storage.GetBucket(), nil) + + id, err := e2e.CreateBlock(ctx, rnd, dir, lbls, numSamples, + start.UnixMilli(), + end.UnixMilli(), + scrapeInterval.Milliseconds(), 10, + ) + require.NoError(t, err) + + err = block.Upload(ctx, log.Logger, bkt, filepath.Join(dir, id.String()), metadata.NoneFunc) + require.NoError(t, err) + + // Wait for the converter to write the no-convert marker + cortex_testutil.Poll(t, 30*time.Second, true, func() interface{} { + noConvertMarkerPath := fmt.Sprintf("%s/parquet-no-convert-mark.json", id.String()) + found := false + err := bkt.Iter(ctx, "", func(name string) error { + if name == noConvertMarkerPath { + found = true + } + return nil + }, objstore.WithRecursiveIter()) + require.NoError(t, err) + return found + }) + + // confirm the conversion did not happen (check both paths) + blockID := id.String() + markerPaths := []string{ + fmt.Sprintf("%s/parquet-converter-mark.json", blockID), + fmt.Sprintf("parquet-markers/%s-parquet-converter-mark.json", blockID), + } + for _, markerPath := range markerPaths { + exists, err := bkt.Exists(ctx, markerPath) + require.NoError(t, err) + require.False(t, exists, "converter mark should not exist at %s", markerPath) + } +} diff --git a/pkg/parquetconverter/converter.go b/pkg/parquetconverter/converter.go index a5ed6ce0c05..a0c4a05b1fd 100644 --- a/pkg/parquetconverter/converter.go +++ b/pkg/parquetconverter/converter.go @@ -389,6 +389,8 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin // We don't convert blocks again if they already have a valid converter mark. if cortex_parquet.ValidConverterMarkVersion(marker.Version) { + level.Debug(logger).Log("msg", "skipping block, no-convert marker already exists", "block", b.ULID.String()) + c.metrics.skippedBlocks.WithLabelValues(userID, cortex_parquet.NoConvertReasonTooManyLabels).Inc() continue } @@ -396,6 +398,21 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin continue } + maxBlockLabelNames := c.limits.ParquetConverterMaxBlockLabelNames(userID) + + // If the threshold is enabled, check for no-convert mark + if maxBlockLabelNames > 0 { + + noConvertMark, err := cortex_parquet.ReadNoConvertMark(ctx, b.ULID, uBucket, logger) + if err != nil { + level.Error(logger).Log("msg", "failed to read parquet no-convert marker", "block", b.ULID.String(), "err", err) + continue + } + if cortex_parquet.ValidNoConvertMarkVersion(noConvertMark.Version) { + continue + } + } + if err := os.RemoveAll(c.compactRootDir()); err != nil { level.Error(logger).Log("msg", "failed to remove work directory", "path", c.compactRootDir(), "err", err) if c.checkConvertError(userID, err) { @@ -425,6 +442,33 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin continue } + if maxBlockLabelNames > 0 { + labelNames, err := tsdbBlock.LabelNames(ctx) + if err != nil { + _ = tsdbBlock.Close() + level.Error(logger).Log("msg", "failed to get label names", "block", b.ULID.String(), "err", err) + if c.checkConvertError(userID, err) { + return err + } + continue + } + labelNamesCount := len(labelNames) + if labelNamesCount > maxBlockLabelNames { + if err := cortex_parquet.WriteNoConvertMark(ctx, b.ULID, uBucket, labelNamesCount, maxBlockLabelNames); err != nil { + _ = tsdbBlock.Close() + level.Error(logger).Log("msg", "failed to write parquet no-convert marker", "block", b.ULID.String(), "err", err) + if c.checkConvertError(userID, err) { + return err + } + continue + } + level.Debug(logger).Log("msg", "skipping parquet conversion for block with too many label names", "block", b.ULID.String(), "label_names", labelNamesCount, "limit", maxBlockLabelNames) + c.metrics.skippedBlocks.WithLabelValues(userID, cortex_parquet.NoConvertReasonTooManyLabels).Inc() + _ = tsdbBlock.Close() + continue + } + } + level.Info(logger).Log("msg", "converting block", "block", b.ULID.String(), "dir", bdir) start := time.Now() diff --git a/pkg/parquetconverter/converter_test.go b/pkg/parquetconverter/converter_test.go index bdcf46b3d36..bb588f3df62 100644 --- a/pkg/parquetconverter/converter_test.go +++ b/pkg/parquetconverter/converter_test.go @@ -488,3 +488,128 @@ func TestConverter_SkipBlocksWithExistingValidMarker(t *testing.T) { // It should be 0 since the block was already converted assert.Equal(t, 0.0, testutil.ToFloat64(c.metrics.convertedBlocks.WithLabelValues(user))) } + +func TestConverter_WriteNoConvertMarkForBlockWithTooManyLabels(t *testing.T) { + cfg := prepareConfig() + user := "user" + ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil) + t.Cleanup(func() { assert.NoError(t, closer.Close()) }) + dir := t.TempDir() + + cfg.Ring.InstanceID = "parquet-converter-1" + cfg.Ring.InstanceAddr = "1.2.3.4" + cfg.Ring.KVStore.Mock = ringStore + bucketClient, err := filesystem.NewBucket(t.TempDir()) + require.NoError(t, err) + userBucket := bucket.NewPrefixedBucketClient(bucketClient, user) + limits := &validation.Limits{} + flagext.DefaultValues(limits) + limits.ParquetConverterEnabled = true + limits.ParquetConverterMaxBlockLabelNames = 1 + + c, logger, _ := prepare(t, cfg, objstore.WithNoopInstr(bucketClient), limits, nil) + + ctx := context.Background() + + lbls := labels.FromStrings("__name__", "test", "job", "foo") + + // Create a block + rnd := rand.New(rand.NewSource(time.Now().Unix())) + + // 2h blocks are skipped by ShouldConvertBlockToParquet + blockID, err := e2e.CreateBlock(ctx, rnd, dir, []labels.Labels{lbls}, 2, 0, 4*time.Hour.Milliseconds(), time.Minute.Milliseconds(), 10) + require.NoError(t, err) + + // Upload the block to the bucket + blockDir := fmt.Sprintf("%s/%s", dir, blockID.String()) + b, err := tsdb.OpenBlock(nil, blockDir, nil, nil) + require.NoError(t, err) + err = block.Upload(ctx, logger, userBucket, b.Dir(), metadata.NoneFunc) + require.NoError(t, err) + + err = services.StartAndAwaitRunning(context.Background(), c) + require.NoError(t, err) + defer services.StopAndAwaitTerminated(ctx, c) // nolint:errcheck + + // Start the converter + err = c.convertUser(ctx, logger, c.ring, user) + require.NoError(t, err) + + // Verify the marker was written correctly + readNoConvertMark, err := parquet.ReadNoConvertMark(ctx, blockID, userBucket, logger) + require.NoError(t, err) + require.True(t, parquet.ValidNoConvertMarkVersion(readNoConvertMark.Version)) + require.Equal(t, parquet.NoConvertReasonTooManyLabels, readNoConvertMark.Reason) + require.Equal(t, 1, readNoConvertMark.Threshold) + require.Equal(t, 2, readNoConvertMark.LabelNamesCount) + + // Confirm conversion did not happen + assert.Equal(t, 0.0, testutil.ToFloat64(c.metrics.convertedBlocks.WithLabelValues(user))) +} + +func TestConverter_SkipBlockWhenNoConvertMarkAlreadyExists(t *testing.T) { + cfg := prepareConfig() + user := "user" + ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil) + t.Cleanup(func() { assert.NoError(t, closer.Close()) }) + dir := t.TempDir() + + cfg.Ring.InstanceID = "parquet-converter-1" + cfg.Ring.InstanceAddr = "1.2.3.4" + cfg.Ring.KVStore.Mock = ringStore + bucketClient, err := filesystem.NewBucket(t.TempDir()) + require.NoError(t, err) + userBucket := bucket.NewPrefixedBucketClient(bucketClient, user) + limits := &validation.Limits{} + flagext.DefaultValues(limits) + limits.ParquetConverterEnabled = true + limits.ParquetConverterMaxBlockLabelNames = 1 + + c, logger, _ := prepare(t, cfg, objstore.WithNoopInstr(bucketClient), limits, nil) + + ctx := context.Background() + + lbls := labels.FromStrings("__name__", "test", "job", "foo") + rnd := rand.New(rand.NewSource(time.Now().Unix())) + + // 2h blocks are skipped by ShouldConvertBlockToParquet + blockID, err := e2e.CreateBlock(ctx, rnd, dir, []labels.Labels{lbls}, 2, 0, + 4*time.Hour.Milliseconds(), time.Minute.Milliseconds(), 10) + require.NoError(t, err) + + blockDir := fmt.Sprintf("%s/%s", dir, blockID.String()) + b, err := tsdb.OpenBlock(nil, blockDir, nil, nil) + require.NoError(t, err) + err = block.Upload(ctx, logger, userBucket, b.Dir(), metadata.NoneFunc) + require.NoError(t, err) + + markerV1 := parquet.NoConvertMark{ + Version: parquet.CurrentNoConvertMarkVersion, + Reason: parquet.NoConvertReasonTooManyLabels, + LabelNamesCount: 2, + Threshold: 1, + } + markerBytes, err := json.Marshal(markerV1) + require.NoError(t, err) + markerPath := path.Join(blockID.String(), parquet.NoConvertMarkerFileName) + err = userBucket.Upload(ctx, markerPath, bytes.NewReader(markerBytes)) + require.NoError(t, err) + + err = services.StartAndAwaitRunning(context.Background(), c) + require.NoError(t, err) + defer services.StopAndAwaitTerminated(ctx, c) // nolint:errcheck + + // start converter + err = c.convertUser(ctx, logger, c.ring, user) + require.NoError(t, err) + + // confirm conversion was skipped + assert.Equal(t, 0.0, testutil.ToFloat64(c.metrics.convertedBlocks.WithLabelValues(user))) + + markerAfter, err := parquet.ReadNoConvertMark(ctx, blockID, userBucket, logger) + require.NoError(t, err) + require.True(t, parquet.ValidNoConvertMarkVersion(markerAfter.Version)) + require.Equal(t, parquet.NoConvertReasonTooManyLabels, markerAfter.Reason) + require.Equal(t, 1, markerAfter.Threshold) + require.Equal(t, 2, markerAfter.LabelNamesCount) +} diff --git a/pkg/parquetconverter/metrics.go b/pkg/parquetconverter/metrics.go index 2b3e80b0cfd..dcbfaf4a20f 100644 --- a/pkg/parquetconverter/metrics.go +++ b/pkg/parquetconverter/metrics.go @@ -8,6 +8,7 @@ import ( type metrics struct { convertedBlocks *prometheus.CounterVec convertBlockFailures *prometheus.CounterVec + skippedBlocks *prometheus.CounterVec convertBlockDuration *prometheus.GaugeVec convertParquetBlockDelay prometheus.Histogram ownedUsers prometheus.Gauge @@ -23,6 +24,10 @@ func newMetrics(reg prometheus.Registerer) *metrics { Name: "cortex_parquet_converter_block_convert_failures_total", Help: "Total number of failed block conversions per user.", }, []string{"user"}), + skippedBlocks: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_parquet_converter_blocks_skipped_total", + Help: "Total number of blocks skipped during parquet conversion per user and reason.", + }, []string{"user", "reason"}), convertBlockDuration: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ Name: "cortex_parquet_converter_convert_block_duration_seconds", Help: "Time taken to for the latest block conversion for the user.", diff --git a/pkg/storage/parquet/no_convert_marker.go b/pkg/storage/parquet/no_convert_marker.go new file mode 100644 index 00000000000..c5d7d10e0cf --- /dev/null +++ b/pkg/storage/parquet/no_convert_marker.go @@ -0,0 +1,74 @@ +package parquet + +import ( + "bytes" + "context" + "encoding/json" + "io" + "path" + + "github.com/efficientgo/core/errors" + "github.com/go-kit/log" + "github.com/oklog/ulid/v2" + "github.com/thanos-io/objstore" + "github.com/thanos-io/thanos/pkg/runutil" + + "github.com/cortexproject/cortex/pkg/storage/bucket" +) + +const ( + NoConvertMarkerFileName = "parquet-no-convert-mark.json" + + CurrentNoConvertMarkVersion = NoConvertMarkVersion1 + NoConvertMarkVersion1 = 1 + + NoConvertReasonTooManyLabels = "too_many_labels" +) + +type NoConvertMark struct { + Version int `json:"version"` + Reason string `json:"reason"` + LabelNamesCount int `json:"label_names_count"` + Threshold int `json:"threshold"` +} + +func ReadNoConvertMark(ctx context.Context, id ulid.ULID, userBkt objstore.InstrumentedBucket, logger log.Logger) (*NoConvertMark, error) { + markerPath := path.Join(id.String(), NoConvertMarkerFileName) + reader, err := userBkt.WithExpectedErrs(bucket.IsOneOfTheExpectedErrors(userBkt.IsAccessDeniedErr, userBkt.IsObjNotFoundErr)).Get(ctx, markerPath) + if err != nil { + if userBkt.IsObjNotFoundErr(err) || userBkt.IsAccessDeniedErr(err) { + return &NoConvertMark{}, nil + } + + return &NoConvertMark{}, err + } + defer runutil.CloseWithLogOnErr(logger, reader, "close parquet no-convert marker file reader") + + markerContent, err := io.ReadAll(reader) + if err != nil { + return &NoConvertMark{}, errors.Wrapf(err, "read file: %s", NoConvertMarkerFileName) + } + + marker := NoConvertMark{} + err = json.Unmarshal(markerContent, &marker) + return &marker, err +} + +func WriteNoConvertMark(ctx context.Context, id ulid.ULID, userBkt objstore.Bucket, labelNamesCount int, maxBlockLabelNames int) error { + noConvertMarker := NoConvertMark{ + Version: CurrentNoConvertMarkVersion, + Reason: NoConvertReasonTooManyLabels, + LabelNamesCount: labelNamesCount, + Threshold: maxBlockLabelNames, + } + noConvertMarkerPath := path.Join(id.String(), NoConvertMarkerFileName) + b, err := json.Marshal(noConvertMarker) + if err != nil { + return err + } + return userBkt.Upload(ctx, noConvertMarkerPath, bytes.NewReader(b)) +} + +func ValidNoConvertMarkVersion(version int) bool { + return version == NoConvertMarkVersion1 +} diff --git a/pkg/util/validation/exporter_test.go b/pkg/util/validation/exporter_test.go index 6067ed96067..1414d3f2bb5 100644 --- a/pkg/util/validation/exporter_test.go +++ b/pkg/util/validation/exporter_test.go @@ -101,6 +101,7 @@ func TestOverridesExporter_withConfig(t *testing.T) { cortex_overrides{limit_name="out_of_order_results_cache_ttl",user="tenant-a"} 0 cortex_overrides{limit_name="out_of_order_time_window",user="tenant-a"} 0 cortex_overrides{limit_name="parquet_converter_enabled",user="tenant-a"} 0 + cortex_overrides{limit_name="parquet_converter_max_block_label_names",user="tenant-a"} 0 cortex_overrides{limit_name="parquet_converter_tenant_shard_size",user="tenant-a"} 0 cortex_overrides{limit_name="parquet_max_fetched_chunk_bytes",user="tenant-a"} 0 cortex_overrides{limit_name="parquet_max_fetched_data_bytes",user="tenant-a"} 0 diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 019a5adc3ed..fc37e4a4054 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -241,9 +241,10 @@ type Limits struct { CompactorPartitionSeriesCount int64 `yaml:"compactor_partition_series_count" json:"compactor_partition_series_count"` // Parquet converter - ParquetConverterEnabled bool `yaml:"parquet_converter_enabled" json:"parquet_converter_enabled"` - ParquetConverterTenantShardSize float64 `yaml:"parquet_converter_tenant_shard_size" json:"parquet_converter_tenant_shard_size"` - ParquetConverterSortColumns []string `yaml:"parquet_converter_sort_columns" json:"parquet_converter_sort_columns"` + ParquetConverterEnabled bool `yaml:"parquet_converter_enabled" json:"parquet_converter_enabled"` + ParquetConverterTenantShardSize float64 `yaml:"parquet_converter_tenant_shard_size" json:"parquet_converter_tenant_shard_size"` + ParquetConverterSortColumns []string `yaml:"parquet_converter_sort_columns" json:"parquet_converter_sort_columns"` + ParquetConverterMaxBlockLabelNames int `yaml:"parquet_converter_max_block_label_names" json:"parquet_converter_max_block_label_names"` // This config doesn't have a CLI flag registered here because they're registered in // their own original config struct. S3SSEType string `yaml:"s3_sse_type" json:"s3_sse_type" doc:"nocli|description=S3 server-side encryption type. Required to enable server-side encryption overrides for a specific tenant. If not set, the default S3 client settings are used."` @@ -369,6 +370,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.Float64Var(&l.ParquetConverterTenantShardSize, "parquet-converter.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used by the parquet converter. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant. If the value is < 1 and > 0 the shard size will be a percentage of the total parquet converters.") f.BoolVar(&l.ParquetConverterEnabled, "parquet-converter.enabled", false, "If set, enables the Parquet converter to create the parquet files.") f.Var((*flagext.StringSlice)(&l.ParquetConverterSortColumns), "parquet-converter.sort-columns", "Additional label names for specific tenants to sort by after metric name, in order of precedence. These are applied during Parquet file generation.") + f.IntVar(&l.ParquetConverterMaxBlockLabelNames, "parquet-converter.max-block-label-names", 0, "[Experimental] Maximum number of distinct label names allowed in a TSDB block for parquet conversion. If exceeded, the converter writes a no-convert marker. 0 to disable.") // Parquet Queryable enforced limits. f.IntVar(&l.ParquetMaxFetchedRowCount, "querier.parquet-queryable.max-fetched-row-count", 0, "The maximum number of rows that can be fetched when querying parquet storage. Each row maps to a series in a parquet file. This limit applies before materializing chunks. 0 to disable.") @@ -1029,6 +1031,11 @@ func (o *Overrides) ParquetConverterSortColumns(userID string) []string { return o.GetOverridesForUser(userID).ParquetConverterSortColumns } +// ParquetConverterMaxBlockLabelNames returns the maximum number of distinct label names allowed in a TSDB block for parquet conversion. +func (o *Overrides) ParquetConverterMaxBlockLabelNames(userID string) int { + return o.GetOverridesForUser(userID).ParquetConverterMaxBlockLabelNames +} + // ParquetMaxFetchedRowCount returns the maximum number of rows that can be fetched when querying parquet storage. func (o *Overrides) ParquetMaxFetchedRowCount(userID string) int { return o.GetOverridesForUser(userID).ParquetMaxFetchedRowCount diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index 51aee2c0f59..9081ddaa683 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -5415,6 +5415,12 @@ "type": "boolean", "x-cli-flag": "parquet-converter.enabled" }, + "parquet_converter_max_block_label_names": { + "default": 0, + "description": "[Experimental] Maximum number of distinct label names allowed in a TSDB block for parquet conversion. If exceeded, the converter writes a no-convert marker. 0 to disable.", + "type": "number", + "x-cli-flag": "parquet-converter.max-block-label-names" + }, "parquet_converter_sort_columns": { "default": [], "description": "Additional label names for specific tenants to sort by after metric name, in order of precedence. These are applied during Parquet file generation.",