diff --git a/CHANGELOG.md b/CHANGELOG.md index c539eb4973..06e5efcc55 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,6 +39,7 @@ * [BUGFIX] Security: Fix stored XSS vulnerability in Alertmanager and Store Gateway status pages by replacing `text/template` with `html/template`. #7512 * [BUGFIX] Security: Limit decompressed gzip output in `ParseProtoReader` and OTLP ingestion path. The decompressed body is now capped by `-distributor.otlp-max-recv-msg-size`. #7515 * [BUGFIX] Tenant Federation: Fix regex resolver clearing known users list when user scan fails. #7534 +* [BUGFIX] Ingester: Release the TSDB appender on every early-return path in `Push` (e.g. out-of-order label set) by deferring `Rollback`. Previously such requests leaked TSDB head series references, mmap'd chunks and pending state per request, causing the `cortex_ingester_tsdb_head_active_appenders` gauge to grow unbounded. #7528 ## 1.21.0 2026-04-24 diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 6116a31899..c6b8430d36 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -1437,6 +1437,22 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte // Walk the samples, appending them to the users database app := db.Appender(ctx).(extendedAppender) + // Ensure the appender is always released so that we don't leak TSDB head + // series references, mmap'd chunks and pending state on early returns. + // `committed` is flipped to true immediately before app.Commit() because + // Prometheus closes the appender even on Commit failure (it self-rolls + // back internally on WAL error), so the deferred Rollback must not run + // afterwards. + committed := false + defer func() { + if committed { + return + } + if rollbackErr := app.Rollback(); rollbackErr != nil { + level.Warn(logutil.WithContext(ctx, i.logger)).Log("msg", "failed to rollback appender on early return", "user", userID, "err", rollbackErr) + } + }() + // Even when OOO is enabled globally, we want to reject OOO samples in some cases. // prometheus implementation: https://github.com/prometheus/prometheus/pull/14710 if req.DiscardOutOfOrder { @@ -1505,11 +1521,8 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte if rollback := handleAppendFailure(err, s.TimestampMs, ts.Labels, copiedLabels, matchedLabelSetLimits); !rollback { continue } - // The error looks an issue on our side, so we should rollback - if rollbackErr := app.Rollback(); rollbackErr != nil { - level.Warn(logutil.WithContext(ctx, i.logger)).Log("msg", "failed to rollback on error", "user", userID, "err", rollbackErr) - } - + // The error looks an issue on our side, so we should rollback. + // The deferred rollback above will close the appender; nothing to do here. return nil, wrapWithUser(err, userID) } @@ -1560,10 +1573,8 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte if rollback := handleAppendFailure(err, hp.TimestampMs, ts.Labels, copiedLabels, matchedLabelSetLimits); !rollback { continue } - // The error looks an issue on our side, so we should rollback - if rollbackErr := app.Rollback(); rollbackErr != nil { - level.Warn(logutil.WithContext(ctx, i.logger)).Log("msg", "failed to rollback on error", "user", userID, "err", rollbackErr) - } + // The error looks an issue on our side, so we should rollback. + // The deferred rollback above will close the appender; nothing to do here. return nil, wrapWithUser(err, userID) } } else { @@ -1626,6 +1637,10 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte } startCommit := time.Now() + // Mark committed before calling Commit: Prometheus closes the appender on + // both success and failure of Commit (it self-rolls-back on WAL error), so + // the deferred Rollback must not fire afterwards. + committed = true if err := app.Commit(); err != nil { return nil, wrapWithUser(err, userID) } diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 5dd0d1ec36..5c2b66270c 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -2833,6 +2833,64 @@ func TestIngester_Push_OutOfOrderLabels(t *testing.T) { require.NoError(t, err) } +// TestIngester_Push_OutOfOrderLabels_AppenderNotLeaked verifies that when Push +// returns early because of an out-of-order label set (and on any other early +// return after the appender is acquired) the appender is released via +// Rollback. Otherwise the TSDB head leaks series references, mmap'd chunks and +// pending state on every such request; observable via the +// cortex_ingester_tsdb_head_active_appenders gauge. +func TestIngester_Push_OutOfOrderLabels_AppenderNotLeaked(t *testing.T) { + cfg := defaultIngesterTestConfig(t) + r := prometheus.NewRegistry() + i, err := prepareIngesterWithBlocksStorage(t, cfg, r) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + + // Wait until it's ACTIVE. + test.Poll(t, time.Second, ring.ACTIVE, func() any { + return i.lifecycler.GetState() + }) + + ctx := user.InjectOrgID(context.Background(), "test-user") + + // First push a valid sample to initialise the user TSDB head so that + // subsequent Push() calls take the headAppender path. + validLabels := labels.FromStrings(labels.MetricName, "test_metric", "a", "1", "b", "2") + validReq, _ := mockWriteRequest(t, validLabels, 1, 1) + _, err = i.Push(ctx, validReq) + require.NoError(t, err) + + // Sanity check: no appenders are currently active. + const activeAppendersMetric = "cortex_ingester_tsdb_head_active_appenders" + expectedZero := ` + # HELP cortex_ingester_tsdb_head_active_appenders Number of currently active TSDB appender transactions. + # TYPE cortex_ingester_tsdb_head_active_appenders gauge + cortex_ingester_tsdb_head_active_appenders 0 +` + require.NoError(t, testutil.GatherAndCompare(r, bytes.NewBufferString(expectedZero), activeAppendersMetric)) + + // Now push a series of requests with an out-of-order label set. Each + // such request creates an appender that, without the leak fix, is never + // released, leaving the active-appenders gauge growing. + outOfOrderLabels := []cortexpb.LabelAdapter{ + {Name: labels.MetricName, Value: "test_metric"}, + {Name: "c", Value: "3"}, + {Name: "a", Value: "1"}, + } + const numLeakyPushes = 5 + for n := range numLeakyPushes { + req, _ := mockWriteRequest(t, cortexpb.FromLabelAdaptersToLabels(outOfOrderLabels), 1, int64(2+n)) + _, err = i.Push(ctx, req) + require.Error(t, err) + require.Contains(t, err.Error(), "out-of-order label set found") + } + + // The active-appenders gauge must still be 0 — every appender created by + // the early-returning Push() must have been released. + require.NoError(t, testutil.GatherAndCompare(r, bytes.NewBufferString(expectedZero), activeAppendersMetric)) +} + func BenchmarkIngesterPush(b *testing.B) { limits := defaultLimitsTestConfig() benchmarkIngesterPush(b, limits, false)