From 8af8002ad71894fffa4d22d491b03dc0bda6f7fa Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Sat, 6 Jun 2026 14:17:49 +0000 Subject: [PATCH] fix(deps): update module github.com/riverqueue/rivercontrib/otelriver to v0.10.0 --- go.mod | 2 +- go.sum | 2 + .../rivercontrib/otelriver/middleware.go | 71 ++++++++++++++----- vendor/modules.txt | 2 +- 4 files changed, 59 insertions(+), 18 deletions(-) diff --git a/go.mod b/go.mod index 514a61fa..e5778811 100644 --- a/go.mod +++ b/go.mod @@ -42,7 +42,7 @@ require ( github.com/riverqueue/river v0.38.0 github.com/riverqueue/river/riverdriver/riverpgxv5 v0.38.0 github.com/riverqueue/river/rivertype v0.38.0 - github.com/riverqueue/rivercontrib/otelriver v0.8.0 + github.com/riverqueue/rivercontrib/otelriver v0.10.0 github.com/rodaine/table v1.3.1 github.com/sourcegraph/jsonrpc2 v0.2.1 github.com/spf13/cobra v1.10.2 diff --git a/go.sum b/go.sum index 8f639176..f488d2a6 100644 --- a/go.sum +++ b/go.sum @@ -362,6 +362,8 @@ github.com/riverqueue/river/rivertype v0.38.0 h1:Tzu0OhRojFhuwcARVz7C2lYG8wE4zH+ github.com/riverqueue/river/rivertype v0.38.0/go.mod h1:D1Ad+EaZiaXbQbJcJcfeicXJMBKno0n6UcfKI5Q7DIQ= github.com/riverqueue/rivercontrib/otelriver v0.8.0 h1:zBFuoMhcGq0P1rrNl+kbxW6A2EI3/eRROAkY6bnNCZE= github.com/riverqueue/rivercontrib/otelriver v0.8.0/go.mod h1:qwnMagI9IsFGEaKlp5oMecLGxE4byhOk8kqis7oeomo= +github.com/riverqueue/rivercontrib/otelriver v0.10.0 h1:cOtwJ6PyVGQWN45XfuSQSMJAV60KozgES8vaHE9u1F0= +github.com/riverqueue/rivercontrib/otelriver v0.10.0/go.mod h1:Ewb2HiCy9yoltuomU4yZcdvBRHQBNo2683qhChq2JOQ= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rodaine/protogofakeit v0.1.1 h1:ZKouljuRM3A+TArppfBqnH8tGZHOwM/pjvtXe9DaXH8= diff --git a/vendor/github.com/riverqueue/rivercontrib/otelriver/middleware.go b/vendor/github.com/riverqueue/rivercontrib/otelriver/middleware.go index e47a3fd8..63173f3b 100644 --- a/vendor/github.com/riverqueue/rivercontrib/otelriver/middleware.go +++ b/vendor/github.com/riverqueue/rivercontrib/otelriver/middleware.go @@ -4,6 +4,7 @@ import ( "cmp" "context" "errors" + "slices" "time" "go.opentelemetry.io/otel" @@ -156,12 +157,42 @@ func (m *Middleware) InsertMany(ctx context.Context, manyParams []*rivertype.Job duration := m.durationInPreferredUnit(time.Since(begin)) setStatus(attrs, statusIndex, span, panicked, err) + + var skipped int64 + for _, r := range insertRes { + if r != nil && r.UniqueSkippedAsDuplicate { + skipped++ + } + } + + kinds := make([]string, 0, len(manyParams)) + for _, p := range manyParams { + kinds = append(kinds, p.Kind) + } + slices.Sort(kinds) + kinds = slices.Compact(kinds) + span.SetAttributes(attrs...) // set after finalizing status + span.SetAttributes( + attribute.StringSlice("kinds", kinds), + attribute.Int64("unique_skipped_as_duplicate_count", skipped), + ) // This allocates a new slice, so make sure to do it as few times as possible. measurementOpt := metric.WithAttributes(attrs...) - m.metrics.insertCount.Add(ctx, int64(len(manyParams)), measurementOpt) + // Partition insert_count by unique_skipped_as_duplicate so the + // metric shows how many of the submitted jobs were dropped by + // UniqueOpts vs. actually enqueued. Sum across both data points + // still equals len(manyParams). + if inserted := int64(len(manyParams)) - skipped; inserted > 0 { + m.metrics.insertCount.Add(ctx, inserted, + metric.WithAttributes(append(attrs, attribute.Bool("unique_skipped_as_duplicate", false))...)) + } + if skipped > 0 { + m.metrics.insertCount.Add(ctx, skipped, + metric.WithAttributes(append(attrs, attribute.Bool("unique_skipped_as_duplicate", true))...)) + } m.metrics.insertManyCount.Add(ctx, 1, measurementOpt) m.metrics.insertManyDuration.Record(ctx, duration, measurementOpt) m.metrics.insertManyDurationHistogram.Record(ctx, duration, measurementOpt) @@ -210,6 +241,11 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu defer func() { duration := m.durationInPreferredUnit(time.Since(begin)) + var ( + cancelErr *river.JobCancelError + snoozeErr *river.JobSnoozeError + ) + if err != nil { var batchResult interface { // To be superseded if riverbatch.MultiError is moved to rivertype. ErrorsByID() map[int64]error @@ -218,11 +254,6 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu err = batchResult.ErrorsByID()[job.ID] } - var ( - cancelErr *river.JobCancelError - snoozeErr *river.JobSnoozeError - ) - switch { case errors.As(err, &cancelErr): attrs = append(attrs, attribute.Bool("cancel", true)) @@ -233,16 +264,17 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu setStatus(attrs, statusIndex, span, panicked, err) - { - // Add some higher cardinality attributes to spans, but keep them - // out of metrics given it's been traditional wisdom that metric - // attribute sets shouldn't be too large. - attrs := append(attrs, - attribute.Int64("id", job.ID), - attribute.String("created_at", job.CreatedAt.Format(time.RFC3339)), - attribute.String("scheduled_at", job.ScheduledAt.Format(time.RFC3339)), - ) - span.SetAttributes(attrs...) // set after finalizing status + // Add some higher cardinality attributes to spans, but keep them + // out of metrics given it's been traditional wisdom that metric + // attribute sets shouldn't be too large. + span.SetAttributes( + attribute.Int64("id", job.ID), + attribute.String("created_at", job.CreatedAt.Format(time.RFC3339)), + attribute.String("scheduled_at", job.ScheduledAt.Format(time.RFC3339)), + ) + span.SetAttributes(attrs...) // set after finalizing status + if snoozeErr != nil { + span.SetAttributes(attribute.String("snooze.duration", snoozeErr.Duration.String())) } // This allocates a new slice, so make sure to do it as few times as possible. @@ -316,6 +348,13 @@ func setStatus(attrs []attribute.KeyValue, statusIndex int, span trace.Span, pan case panicked: attrs[statusIndex] = attribute.String("status", "panic") span.SetStatus(codes.Error, "panic") + case errors.Is(err, &river.JobSnoozeError{}): + // Snooze is flow control, not failure: the job will be retried + // later. Record as ok so it doesn't pollute error rates; the + // snooze:true span/metric attribute (set by the caller) keeps + // snoozes queryable as a dimension. + attrs[statusIndex] = attribute.String("status", "ok") + span.SetStatus(codes.Ok, "") case err != nil: attrs[statusIndex] = attribute.String("status", "error") span.SetStatus(codes.Error, err.Error()) diff --git a/vendor/modules.txt b/vendor/modules.txt index 101c4a7e..7999902f 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -804,7 +804,7 @@ github.com/riverqueue/river/rivershared/util/valutil # github.com/riverqueue/river/rivertype v0.38.0 ## explicit; go 1.25.0 github.com/riverqueue/river/rivertype -# github.com/riverqueue/rivercontrib/otelriver v0.8.0 +# github.com/riverqueue/rivercontrib/otelriver v0.10.0 ## explicit; go 1.25.0 github.com/riverqueue/rivercontrib/otelriver # github.com/rodaine/table v1.3.1