diff --git a/app/domain/pusher/pusher.go b/app/domain/pusher/pusher.go index 7073843d..03152fca 100644 --- a/app/domain/pusher/pusher.go +++ b/app/domain/pusher/pusher.go @@ -8,23 +8,15 @@ package pusher import ( - "bytes" "context" "errors" "fmt" - "math" - "math/rand/v2" - "net/http" - "strconv" "sync" "time" - "github.com/golang/snappy" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/prompb" "github.com/rs/zerolog/log" - "google.golang.org/protobuf/proto" - "google.golang.org/protobuf/protoadapt" config "github.com/cloudzero/cloudzero-agent/app/config/webhook" "github.com/cloudzero/cloudzero-agent/app/types" @@ -335,133 +327,14 @@ func (h *MetricsPusher) Flush() error { return nil } -func (h *MetricsPusher) formatMetrics(records []*types.ResourceTags) []prompb.TimeSeries { - timeSeries := []prompb.TimeSeries{} - for _, record := range records { - metricName := h.constructMetricTagName(record, "labels") - recordCreatedOrUpdated := h.maxTime(record.RecordUpdated, record.RecordCreated) - timeSeries = append(timeSeries, h.createTimeseries(metricName, *record.Labels, *record.MetricLabels, recordCreatedOrUpdated)) - if record.Annotations != nil { - metricName := h.constructMetricTagName(record, "annotations") - timeSeries = append(timeSeries, h.createTimeseries(metricName, *record.Annotations, *record.MetricLabels, recordCreatedOrUpdated)) - } - } - return timeSeries -} - -func (h *MetricsPusher) constructMetricTagName(record *types.ResourceTags, metricType string) string { - return fmt.Sprintf("cloudzero_%s_%s", config.ResourceTypeToMetricName[record.Type], metricType) -} - -func (h *MetricsPusher) createTimeseries( - metricName string, metricTags config.MetricLabelTags, - additionalMetricLabels config.MetricLabels, - recordCreatedOrUpdated time.Time, -) prompb.TimeSeries { - ts := prompb.TimeSeries{ - Labels: []prompb.Label{ - { - Name: "__name__", - Value: metricName, - }, - }, - Samples: []prompb.Sample{ - { - Value: 1, - Timestamp: recordCreatedOrUpdated.UnixNano() / int64(time.Millisecond), - }, - }, - } - for labelKey, labelValue := range additionalMetricLabels { - ts.Labels = append(ts.Labels, prompb.Label{ - Name: labelKey, - Value: labelValue, - }) - } - for labelKey, labelValue := range metricTags { - ts.Labels = append(ts.Labels, prompb.Label{ - Name: "label_" + labelKey, - Value: labelValue, - }) - } +// formatMetrics and pushMetrics delegate to the public package-level +// functions in sender.go so the same logic can be reused by the streaming +// store (which doesn't have a MetricsPusher instance). - return ts +func (h *MetricsPusher) formatMetrics(records []*types.ResourceTags) []prompb.TimeSeries { + return FormatMetrics(records) } func (h *MetricsPusher) pushMetrics(remoteWriteURL string, apiKey string, timeSeries []prompb.TimeSeries) error { - writeRequest := &prompb.WriteRequest{ - Timeseries: timeSeries, - } - - data, err := proto.Marshal(protoadapt.MessageV2Of(writeRequest)) - if err != nil { - return fmt.Errorf("error marshaling WriteRequest: %v", err) - } - - compressed := snappy.Encode(nil, data) - - endpoint := remoteWriteURL - start := time.Now() - - // Instrument: Observe payload size - RemoteWritePayloadSizeBytes.WithLabelValues(endpoint).Observe(float64(len(compressed))) - - var resp *http.Response - var req *http.Request - - for attempt := range h.maxRetries { - ctx, cancel := context.WithTimeout(context.Background(), h.sendTimeout) - defer cancel() - - req, err = http.NewRequestWithContext(ctx, "POST", remoteWriteURL, bytes.NewBuffer(compressed)) - if err != nil { - return fmt.Errorf("error creating HTTP request: %v", err) - } - - req.Header.Set("Content-Type", "application/x-protobuf") - req.Header.Set("Content-Encoding", "snappy") - req.Header.Set("Authorization", "Bearer "+apiKey) - - client := &http.Client{} - resp, err = client.Do(req) - if err != nil { - log.Ctx(ctx).Err(err).Msg("post metric failure") - } - - // Instrument: measure duration after each attempt - duration := time.Since(start).Seconds() - RemoteWriteRequestDuration.WithLabelValues(endpoint).Observe(duration) - - if err == nil && (resp.StatusCode >= http.StatusOK && resp.StatusCode < http.StatusMultipleChoices) { - defer resp.Body.Close() - // Instrument: response code 200 - RemoteWriteResponseCodes.WithLabelValues(endpoint, "2xx").Inc() - return nil - } - - if resp != nil { - statusCode := strconv.Itoa(resp.StatusCode) - RemoteWriteResponseCodes.WithLabelValues(endpoint, statusCode).Inc() - resp.Body.Close() - log.Ctx(h.ctx).Error(). - Int("statusCode", resp.StatusCode). - Str("statusText", resp.Status). - Msg("Received non-2xx response, retrying...") - } else { - // If resp is nil, we can track it as a failure as well - RemoteWriteResponseCodes.WithLabelValues(endpoint, "no_response").Inc() - } - backoff := time.Duration(math.Pow(2, float64(attempt))) * time.Second - jitter := time.Duration(rand.Int64N(int64(time.Second))) //nolint:gosec // cryptographically secure PRNG here is not necessary - time.Sleep(backoff + jitter) - } - - return fmt.Errorf("received non-2xx response: %v after %d retries", err, h.maxRetries) -} - -func (h *MetricsPusher) maxTime(t1, t2 time.Time) time.Time { - if t1.After(t2) { - return t1 - } - return t2 + return PushMetrics(h.ctx, remoteWriteURL, apiKey, timeSeries, h.maxRetries, h.sendTimeout) } diff --git a/app/domain/pusher/sender.go b/app/domain/pusher/sender.go new file mode 100644 index 00000000..3de615ef --- /dev/null +++ b/app/domain/pusher/sender.go @@ -0,0 +1,152 @@ +// SPDX-FileCopyrightText: Copyright (c) 2016-2026, CloudZero, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +package pusher + +import ( + "bytes" + "context" + "fmt" + "math" + "math/rand/v2" + "net/http" + "strconv" + "time" + + "github.com/golang/snappy" + "github.com/prometheus/prometheus/prompb" + "github.com/rs/zerolog/log" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/protoadapt" + + config "github.com/cloudzero/cloudzero-agent/app/config/webhook" + "github.com/cloudzero/cloudzero-agent/app/types" +) + +// FormatMetrics converts ResourceTags records into Prometheus TimeSeries +// suitable for remote_write. Each record produces one labels timeseries +// and optionally one annotations timeseries. +func FormatMetrics(records []*types.ResourceTags) []prompb.TimeSeries { + timeSeries := []prompb.TimeSeries{} + for _, record := range records { + metricName := fmt.Sprintf("cloudzero_%s_labels", config.ResourceTypeToMetricName[record.Type]) + recordTime := maxTime(record.RecordUpdated, record.RecordCreated) + timeSeries = append(timeSeries, createTimeseries(metricName, *record.Labels, *record.MetricLabels, recordTime)) + if record.Annotations != nil { + metricName := fmt.Sprintf("cloudzero_%s_annotations", config.ResourceTypeToMetricName[record.Type]) + timeSeries = append(timeSeries, createTimeseries(metricName, *record.Annotations, *record.MetricLabels, recordTime)) + } + } + return timeSeries +} + +// PushMetrics serializes timeseries to a snappy-compressed protobuf and +// POSTs them to the remote_write endpoint with retry + exponential backoff. +func PushMetrics(ctx context.Context, url, apiKey string, timeSeries []prompb.TimeSeries, maxRetries int, sendTimeout time.Duration) error { + writeRequest := &prompb.WriteRequest{ + Timeseries: timeSeries, + } + + data, err := proto.Marshal(protoadapt.MessageV2Of(writeRequest)) + if err != nil { + return fmt.Errorf("error marshaling WriteRequest: %v", err) + } + + compressed := snappy.Encode(nil, data) + + endpoint := url + start := time.Now() + + RemoteWritePayloadSizeBytes.WithLabelValues(endpoint).Observe(float64(len(compressed))) + + var resp *http.Response + var req *http.Request + + for attempt := range maxRetries { + reqCtx, cancel := context.WithTimeout(ctx, sendTimeout) + defer cancel() + + req, err = http.NewRequestWithContext(reqCtx, "POST", url, bytes.NewBuffer(compressed)) + if err != nil { + return fmt.Errorf("error creating HTTP request: %v", err) + } + + req.Header.Set("Content-Type", "application/x-protobuf") + req.Header.Set("Content-Encoding", "snappy") + req.Header.Set("Authorization", "Bearer "+apiKey) + + client := &http.Client{} + resp, err = client.Do(req) + if err != nil { + log.Ctx(reqCtx).Err(err).Msg("post metric failure") + } + + duration := time.Since(start).Seconds() + RemoteWriteRequestDuration.WithLabelValues(endpoint).Observe(duration) + + if err == nil && (resp.StatusCode >= http.StatusOK && resp.StatusCode < http.StatusMultipleChoices) { + defer resp.Body.Close() + RemoteWriteResponseCodes.WithLabelValues(endpoint, "2xx").Inc() + return nil + } + + if resp != nil { + statusCode := strconv.Itoa(resp.StatusCode) + RemoteWriteResponseCodes.WithLabelValues(endpoint, statusCode).Inc() + resp.Body.Close() + log.Ctx(ctx).Error(). + Int("statusCode", resp.StatusCode). + Str("statusText", resp.Status). + Msg("Received non-2xx response, retrying...") + } else { + RemoteWriteResponseCodes.WithLabelValues(endpoint, "no_response").Inc() + } + backoff := time.Duration(math.Pow(2, float64(attempt))) * time.Second + jitter := time.Duration(rand.Int64N(int64(time.Second))) //nolint:gosec // cryptographically secure PRNG is not necessary for backoff jitter + time.Sleep(backoff + jitter) + } + + return fmt.Errorf("received non-2xx response: %v after %d retries", err, maxRetries) +} + +func createTimeseries( + metricName string, metricTags config.MetricLabelTags, + additionalMetricLabels config.MetricLabels, + recordTime time.Time, +) prompb.TimeSeries { + ts := prompb.TimeSeries{ + Labels: []prompb.Label{ + { + Name: "__name__", + Value: metricName, + }, + }, + Samples: []prompb.Sample{ + { + Value: 1, + Timestamp: recordTime.UnixNano() / int64(time.Millisecond), + }, + }, + } + for labelKey, labelValue := range additionalMetricLabels { + ts.Labels = append(ts.Labels, prompb.Label{ + Name: labelKey, + Value: labelValue, + }) + } + for labelKey, labelValue := range metricTags { + ts.Labels = append(ts.Labels, prompb.Label{ + Name: "label_" + labelKey, + Value: labelValue, + }) + } + + return ts +} + +func maxTime(t1, t2 time.Time) time.Time { + if t1.After(t2) { + return t1 + } + return t2 +} diff --git a/app/functions/webhook/main.go b/app/functions/webhook/main.go index 2b27e3f1..4c561418 100644 --- a/app/functions/webhook/main.go +++ b/app/functions/webhook/main.go @@ -30,6 +30,7 @@ import ( "github.com/cloudzero/cloudzero-agent/app/http/middleware" "github.com/cloudzero/cloudzero-agent/app/logging" "github.com/cloudzero/cloudzero-agent/app/storage/repo" + "github.com/cloudzero/cloudzero-agent/app/storage/streaming" "github.com/cloudzero/cloudzero-agent/app/utils" "github.com/cloudzero/cloudzero-agent/app/utils/k8s" ) @@ -84,13 +85,37 @@ func main() { fmt.Println(string(enc)) } - // setup database + if backfill { + log.Info().Msg("Starting backfill mode") + streamStore := streaming.New(settings) + wd, err2 := webhook.NewWebhookFactory(streamStore, settings, clock) + if err2 != nil { + log.Fatal().Err(err2).Msg("failed to create webhook domain controller") + } + k8sClient, err2 := k8s.NewClient(settings.K8sClient.KubeConfig) + if err2 != nil { + log.Fatal().Err(err2).Msg("Failed to build k8s client") + } + enum := backfiller.NewKubernetesObjectEnumerator(k8sClient, wd, settings) + if backfillNoWait { + enum.DisableServiceWait() + } + if err2 = enum.Start(context.Background()); err2 != nil { + log.Fatal().Err(err2).Msg("Failed to start Kubernetes object enumerator") + } + if err2 = streamStore.Flush(); err2 != nil { + log.Fatal().Err(err2).Msg("failed to flush remaining backfill data") + } + return + } + + // --- webhook server mode --- + store, err := repo.NewInMemoryResourceRepository(clock) if err != nil { log.Fatal().Err(err).Msg("Failed to create in-memory resource repository") } - // Start a monitor that can pickup secrets changes and update the settings ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -104,7 +129,6 @@ func main() { } }() - // create remote metrics writer dataPusher := pusher.New(ctx, store, clock, settings) if err = dataPusher.Run(); err != nil { log.Fatal().Err(err).Msg("failed to start remote metrics writer") @@ -113,13 +137,10 @@ func main() { log.Ctx(ctx).Debug().Msg("Starting main shutdown process") if innerErr := dataPusher.Shutdown(); innerErr != nil { log.Err(innerErr).Msg("failed to flush data") - // Exit with a non-zero status code to indicate failure because we - // are potentially losing data. os.Exit(1) } }() - // start the housekeeper to delete old data hk := housekeeper.New(ctx, store, clock, settings) if err = hk.Run(); err != nil { log.Fatal().Err(err).Msg("failed to start database housekeeper") @@ -135,23 +156,6 @@ func main() { log.Fatal().Err(err).Msg("failed to create webhook domain controller") } - if backfill { - log.Ctx(ctx).Info().Msg("Starting backfill mode") - // setup k8s client - k8sClient, err2 := k8s.NewClient(settings.K8sClient.KubeConfig) - if err2 != nil { - log.Fatal().Err(err2).Msg("Failed to build k8s client") - } - enum := backfiller.NewKubernetesObjectEnumerator(k8sClient, wd, settings) - if backfillNoWait { - enum.DisableServiceWait() - } - if err3 := enum.Start(context.Background()); err3 != nil { - log.Fatal().Err(err3).Msg("Failed to start Kubernetes object enumerator") - } - return - } - defer func() { if r := recover(); r != nil { logger.Panic().Interface("panic", r).Msg("application panicked, exiting") diff --git a/app/profiling/profiling.go b/app/profiling/profiling.go new file mode 100644 index 00000000..85feb445 --- /dev/null +++ b/app/profiling/profiling.go @@ -0,0 +1,239 @@ +// SPDX-FileCopyrightText: Copyright (c) 2016-2026, CloudZero, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +// Package profiling provides a self-contained pprof profiling server and +// periodic profile dump facility for CloudZero Agent components. +// +// Usage: +// +// stop := profiling.Start(profiling.Options{ +// Enabled: true, +// Port: 6060, +// Dir: "/var/lib/agent/profiles", +// HeapInterval: 30 * time.Second, +// CPUInterval: 5 * time.Minute, +// CPUDuration: 30 * time.Second, +// }) +// defer stop() +// +// When Enabled is false, Start returns immediately with a no-op stop function +// and starts no goroutines or listeners. +package profiling + +import ( + "context" + "fmt" + "net/http" + _ "net/http/pprof" //nolint:gosec // intentional: we proxy DefaultServeMux through a private mux, not exposed unless profiling is enabled + "os" + "path/filepath" + "runtime/pprof" + "time" + + "github.com/rs/zerolog/log" +) + +// Options configures the profiling server and periodic profile dumps. +type Options struct { + // Enabled is the master switch. Nothing runs unless this is true. + Enabled bool + + // Port is the TCP port for the pprof HTTP server. Defaults to 6060 when zero. + Port int + + // Dir is the output directory for periodic profile dumps. Heap and CPU dumps + // are disabled when Dir is empty. + Dir string + + // HeapInterval is the interval between heap profile snapshots. Defaults to + // 30 seconds when zero. Heap dumps are only written when Dir is set. + HeapInterval time.Duration + + // CPUInterval is the interval between CPU capture sessions. CPU profiling is + // disabled when CPUInterval is zero or Dir is empty. + CPUInterval time.Duration + + // CPUDuration is the length of each CPU capture window. Defaults to 30 seconds + // when zero (only meaningful when CPUInterval > 0). + CPUDuration time.Duration +} + +const ( + defaultProfilingPort = 6060 + defaultHeapInterval = 30 * time.Second + defaultCPUDuration = 30 * time.Second +) + +// applyDefaults fills in zero-value fields with their documented defaults. +func applyDefaults(opts *Options) { + if opts.Port == 0 { + opts.Port = defaultProfilingPort + } + if opts.HeapInterval == 0 { + opts.HeapInterval = defaultHeapInterval + } + if opts.CPUDuration == 0 { + opts.CPUDuration = defaultCPUDuration + } +} + +// Start launches the profiling subsystem according to opts and returns a stop +// function that shuts everything down cleanly. +// +// If opts.Enabled is false, Start is a no-op and the returned function does +// nothing. No goroutines are started and no ports are opened. +func Start(opts Options) (stop func()) { + if !opts.Enabled { + return func() {} + } + + applyDefaults(&opts) + + done := make(chan struct{}) + + // --- pprof HTTP server --- + addr := fmt.Sprintf(":%d", opts.Port) + mux := http.NewServeMux() + // net/http/pprof registers its handlers on http.DefaultServeMux, so we + // proxy through to it so the caller's server mux is unaffected. + mux.Handle("/debug/pprof/", http.DefaultServeMux) + + srv := &http.Server{ //nolint:gosec // G112: profiling server is internal-only, not exposed to untrusted clients + Addr: addr, + Handler: mux, + } + + go func() { + log.Info().Str("addr", addr).Msg("profiling: pprof HTTP server starting") + if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Error().Err(err).Msg("profiling: pprof HTTP server error") + } + }() + + // --- heap dump goroutine --- + if opts.Dir != "" { + go func() { + ticker := time.NewTicker(opts.HeapInterval) + defer ticker.Stop() + for { + select { + case <-done: + return + case t := <-ticker.C: + writeHeapProfile(opts.Dir, t) + } + } + }() + log.Info(). + Str("dir", opts.Dir). + Dur("interval", opts.HeapInterval). + Msg("profiling: heap dump goroutine started") + } + + // --- CPU profile goroutine --- + if opts.CPUInterval > 0 && opts.Dir != "" { + go func() { + ticker := time.NewTicker(opts.CPUInterval) + defer ticker.Stop() + for { + select { + case <-done: + return + case t := <-ticker.C: + writeCPUProfile(opts.Dir, t, opts.CPUDuration, done) + } + } + }() + log.Info(). + Str("dir", opts.Dir). + Dur("interval", opts.CPUInterval). + Dur("duration", opts.CPUDuration). + Msg("profiling: CPU profile goroutine started") + } + + return func() { + log.Info().Msg("profiling: shutting down") + close(done) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := srv.Shutdown(ctx); err != nil { + log.Error().Err(err).Msg("profiling: error during HTTP server shutdown") + } + } +} + +// writeHeapProfile writes a heap profile to opts.Dir with a timestamped filename. +// The file is written under a `.tmp` suffix and atomically renamed on success so +// readers never observe a partially-written profile. +func writeHeapProfile(dir string, t time.Time) { + name := filepath.Join(dir, "heap-"+t.UTC().Format("20060102-150405.000")+".pb.gz") + tmp := name + ".tmp" + f, err := os.Create(tmp) + if err != nil { + log.Error().Err(err).Str("path", name).Msg("profiling: failed to create heap profile file") + return + } + + if err := pprof.WriteHeapProfile(f); err != nil { + _ = f.Close() + _ = os.Remove(tmp) + log.Error().Err(err).Str("path", name).Msg("profiling: failed to write heap profile") + return + } + if err := f.Close(); err != nil { + _ = os.Remove(tmp) + log.Error().Err(err).Str("path", name).Msg("profiling: failed to close heap profile") + return + } + if err := os.Rename(tmp, name); err != nil { + _ = os.Remove(tmp) + log.Error().Err(err).Str("path", name).Msg("profiling: failed to rename heap profile") + return + } + log.Debug().Str("path", name).Msg("profiling: heap profile written") +} + +// writeCPUProfile captures a CPU profile of length duration and writes it to dir. +// done is checked before starting so a shutdown during the inter-tick gap is clean. +func writeCPUProfile(dir string, t time.Time, duration time.Duration, done <-chan struct{}) { + // Don't start a new CPU capture if we've been asked to stop. + select { + case <-done: + return + default: + } + + name := filepath.Join(dir, "cpu-"+t.UTC().Format("20060102-150405.000")+".pb.gz") + tmp := name + ".tmp" + f, err := os.Create(tmp) + if err != nil { + log.Error().Err(err).Str("path", name).Msg("profiling: failed to create CPU profile file") + return + } + + if err := pprof.StartCPUProfile(f); err != nil { + _ = f.Close() + _ = os.Remove(tmp) + log.Error().Err(err).Str("path", name).Msg("profiling: failed to start CPU profile") + return + } + + // Wait for the capture window, but honour an early stop request. + select { + case <-done: + case <-time.After(duration): + } + + pprof.StopCPUProfile() + if err := f.Close(); err != nil { + _ = os.Remove(tmp) + log.Error().Err(err).Str("path", name).Msg("profiling: failed to close CPU profile") + return + } + if err := os.Rename(tmp, name); err != nil { + _ = os.Remove(tmp) + log.Error().Err(err).Str("path", name).Msg("profiling: failed to rename CPU profile") + return + } + log.Debug().Str("path", name).Msg("profiling: CPU profile written") +} diff --git a/app/profiling/profiling_test.go b/app/profiling/profiling_test.go new file mode 100644 index 00000000..cdde23d3 --- /dev/null +++ b/app/profiling/profiling_test.go @@ -0,0 +1,175 @@ +// SPDX-FileCopyrightText: Copyright (c) 2016-2026, CloudZero, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +package profiling_test + +import ( + "fmt" + "net/http" + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/cloudzero/cloudzero-agent/app/profiling" +) + +// waitForPort polls addr until it either accepts a connection or the deadline +// is exceeded. Returns true if the port is reachable within the timeout. +func waitForPort(addr string, timeout time.Duration) bool { + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + resp, err := http.Get("http://" + addr + "/debug/pprof/") + if err == nil { + resp.Body.Close() + return true + } + time.Sleep(10 * time.Millisecond) + } + return false +} + +// portOpen returns true if the given address is currently serving HTTP. +func portOpen(addr string) bool { + resp, err := http.Get("http://" + addr + "/debug/pprof/") + if err != nil { + return false + } + resp.Body.Close() + return true +} + +// TestStart_Disabled_DoesNothing verifies that calling Start with Enabled:false +// starts no HTTP listener and returns a callable (no-op) stop function. +func TestStart_Disabled_DoesNothing(t *testing.T) { + stop := profiling.Start(profiling.Options{}) // Enabled defaults to false + require.NotNil(t, stop) + + // Default port must NOT be listening. + assert.False(t, portOpen("localhost:6060"), "port 6060 should not be listening when disabled") + + // stop must not panic. + stop() +} + +// TestStart_Enabled_StartsPprofServer verifies that enabling profiling causes +// the pprof HTTP server to start and serve the index page. +func TestStart_Enabled_StartsPprofServer(t *testing.T) { + const port = 16061 + addr := fmt.Sprintf("localhost:%d", port) + + stop := profiling.Start(profiling.Options{ + Enabled: true, + Port: port, + }) + require.NotNil(t, stop) + t.Cleanup(stop) + + require.True(t, waitForPort(addr, 2*time.Second), "pprof server did not start on port %d", port) + + resp, err := http.Get("http://" + addr + "/debug/pprof/") + require.NoError(t, err) + defer resp.Body.Close() + assert.Equal(t, http.StatusOK, resp.StatusCode) +} + +// TestStart_WritesHeapDumps verifies that heap profile files are written to +// opts.Dir at approximately HeapInterval. +func TestStart_WritesHeapDumps(t *testing.T) { + const port = 16062 + dir := t.TempDir() + + stop := profiling.Start(profiling.Options{ + Enabled: true, + Port: port, + Dir: dir, + HeapInterval: 200 * time.Millisecond, + }) + require.NotNil(t, stop) + t.Cleanup(stop) + + // Poll for at least 2 heap dumps. HeapInterval is 200ms, so under a + // healthy runner this completes in ~400ms; the generous deadline absorbs + // scheduling jitter on loaded CI without making the happy path slow. + var matches []string + require.Eventually(t, func() bool { + m, err := filepath.Glob(filepath.Join(dir, "heap-*.pb.gz")) + if err != nil { + return false + } + matches = m + return len(matches) >= 2 + }, 5*time.Second, 50*time.Millisecond, "expected at least 2 heap dump files within deadline") + + for _, m := range matches { + info, err := os.Stat(m) + require.NoError(t, err) + assert.Greater(t, info.Size(), int64(0), "heap file %s should not be empty", m) + } +} + +// TestStart_WritesCPUProfiles verifies that CPU profile files are written to +// opts.Dir after the first CPUInterval elapses. +func TestStart_WritesCPUProfiles(t *testing.T) { + const port = 16063 + dir := t.TempDir() + + stop := profiling.Start(profiling.Options{ + Enabled: true, + Port: port, + Dir: dir, + CPUInterval: 300 * time.Millisecond, + CPUDuration: 100 * time.Millisecond, + }) + require.NotNil(t, stop) + t.Cleanup(stop) + + // Poll for at least one CPU capture. First capture finishes at + // CPUInterval+CPUDuration = 400ms; the generous deadline absorbs runner + // scheduling jitter. + var matches []string + require.Eventually(t, func() bool { + m, err := filepath.Glob(filepath.Join(dir, "cpu-*.pb.gz")) + if err != nil { + return false + } + matches = m + return len(matches) >= 1 + }, 5*time.Second, 50*time.Millisecond, "expected at least 1 CPU profile file within deadline") + + for _, m := range matches { + info, err := os.Stat(m) + require.NoError(t, err) + assert.Greater(t, info.Size(), int64(0), "cpu file %s should not be empty", m) + } +} + +// TestStart_StopShutsDownServer verifies that calling the stop function causes +// the pprof HTTP server to stop accepting connections. +func TestStart_StopShutsDownServer(t *testing.T) { + const port = 16064 + addr := fmt.Sprintf("localhost:%d", port) + + stop := profiling.Start(profiling.Options{ + Enabled: true, + Port: port, + }) + require.NotNil(t, stop) + + require.True(t, waitForPort(addr, 2*time.Second), "pprof server did not start on port %d", port) + + stop() + + // Give the server a moment to finish shutting down. + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + if !portOpen(addr) { + return // success + } + time.Sleep(20 * time.Millisecond) + } + t.Errorf("pprof server on port %d is still reachable after stop()", port) +} diff --git a/app/storage/streaming/store.go b/app/storage/streaming/store.go new file mode 100644 index 00000000..d411213e --- /dev/null +++ b/app/storage/streaming/store.go @@ -0,0 +1,122 @@ +// SPDX-FileCopyrightText: Copyright (c) 2016-2026, CloudZero, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +// Package streaming provides a ResourceStore implementation that sends records +// directly to a remote_write endpoint instead of persisting them. Designed for +// the backfill job where the SQLite store is unnecessary overhead. +package streaming + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/rs/zerolog/log" + + config "github.com/cloudzero/cloudzero-agent/app/config/webhook" + "github.com/cloudzero/cloudzero-agent/app/domain/pusher" + "github.com/cloudzero/cloudzero-agent/app/types" +) + +// Store implements types.ResourceStore by buffering records in memory and +// flushing them to the collector via remote_write when a batch threshold +// is reached. It does not persist data to disk or support queries. +const defaultBatchRecords = 500 + +type Store struct { + settings *config.Settings + mu sync.Mutex + batch []*types.ResourceTags + maxBatchCount int + maxRetries int + sendTimeout time.Duration +} + +// New creates a streaming store that sends records directly to the +// collector endpoint configured in settings.RemoteWrite. +func New(settings *config.Settings) *Store { + return &Store{ + settings: settings, + batch: make([]*types.ResourceTags, 0, defaultBatchRecords), + maxBatchCount: defaultBatchRecords, + maxRetries: settings.RemoteWrite.MaxRetries, + sendTimeout: settings.RemoteWrite.SendTimeout, + } +} + +func (s *Store) Create(_ context.Context, record *types.ResourceTags) error { + s.mu.Lock() + defer s.mu.Unlock() + + s.batch = append(s.batch, record) + + if len(s.batch) >= s.maxBatchCount { + return s.flushLocked() + } + return nil +} + +func (s *Store) Update(ctx context.Context, record *types.ResourceTags) error { + return s.Create(ctx, record) +} + +// Flush sends any buffered records to the collector. +func (s *Store) Flush() error { + s.mu.Lock() + defer s.mu.Unlock() + return s.flushLocked() +} + +func (s *Store) flushLocked() error { + if len(s.batch) == 0 { + return nil + } + + ts := pusher.FormatMetrics(s.batch) + log.Info(). + Int("records", len(s.batch)). + Int("timeseries", len(ts)). + Msg("streaming store: flushing batch to collector") + + err := pusher.PushMetrics( + context.Background(), + s.settings.RemoteWrite.Host, + s.settings.GetAPIKey(), + ts, + s.maxRetries, + s.settings.RemoteWrite.SendTimeout, + ) + + // Always clear the batch — if the send fails, the data is lost, which + // is acceptable for backfill (it will be rediscovered on the next run). + s.batch = s.batch[:0] + + if err != nil { + return fmt.Errorf("streaming store flush: %w", err) + } + return nil +} + +// The remaining ResourceStore methods are no-ops or return not-found. The +// backfill path writes records but never queries them. + +func (s *Store) Tx(_ context.Context, block func(context.Context) error) error { + return block(context.Background()) +} + +func (s *Store) Get(_ context.Context, _ string) (*types.ResourceTags, error) { + return nil, types.ErrNotFound +} + +func (s *Store) Delete(_ context.Context, _ string) error { return nil } +func (s *Store) Count(_ context.Context) (int, error) { return 0, nil } +func (s *Store) DeleteAll(_ context.Context) error { return nil } + +func (s *Store) FindFirstBy(_ context.Context, _ ...interface{}) (*types.ResourceTags, error) { + return nil, types.ErrNotFound +} + +func (s *Store) FindAllBy(_ context.Context, _ ...interface{}) ([]*types.ResourceTags, error) { + return nil, nil +} diff --git a/app/storage/streaming/store_test.go b/app/storage/streaming/store_test.go new file mode 100644 index 00000000..2e778c1c --- /dev/null +++ b/app/storage/streaming/store_test.go @@ -0,0 +1,277 @@ +// SPDX-FileCopyrightText: Copyright (c) 2016-2026, CloudZero, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +package streaming_test + +import ( + "context" + "fmt" + "io" + "net/http" + "net/http/httptest" + "regexp" + "sync" + "testing" + "time" + + "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" + "github.com/microcosm-cc/bluemonday" + "github.com/prometheus/prometheus/prompb" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + config "github.com/cloudzero/cloudzero-agent/app/config/webhook" + "github.com/cloudzero/cloudzero-agent/app/domain/webhook" + "github.com/cloudzero/cloudzero-agent/app/domain/webhook/backfiller" + "github.com/cloudzero/cloudzero-agent/app/storage/streaming" + "github.com/cloudzero/cloudzero-agent/app/utils" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8sruntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes/fake" +) + +func TestStreamingStoreEndToEnd(t *testing.T) { + zerolog.SetGlobalLevel(zerolog.WarnLevel) + log.Logger = zerolog.Nop() + + sink := newCollectorSink(t) + defer sink.server.Close() + + settings := makeSettings(t, sink.server.URL) + clock := &utils.Clock{} + + store := streaming.New(settings) + wc, err := webhook.NewWebhookFactory(store, settings, clock) + require.NoError(t, err) + + fakeClient := fake.NewClientset( + &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "production", + Labels: map[string]string{ + "environment": "prod", + "team": "platform", + "unmatched": "should-be-filtered", + }, + }, + }, + &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "staging", + Labels: map[string]string{ + "environment": "staging", + "team": "qa", + }, + }, + }, + &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "web-1", + Namespace: "production", + Labels: map[string]string{ + "app": "web", + "team": "platform", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "c", Image: "nginx"}}, + }, + }, + ) + + enum := backfiller.NewKubernetesObjectEnumerator(fakeClient, wc, settings) + enum.DisableServiceWait() + require.NoError(t, enum.Start(context.Background())) + require.NoError(t, store.Flush()) + + received := sink.timeseries() + require.NotEmpty(t, received, "collector should have received timeseries") + + // Index received timeseries by __name__ for easier lookup. + type ts struct { + labels map[string]string + } + byName := map[string][]ts{} + for _, series := range received { + entry := ts{labels: map[string]string{}} + var name string + for _, l := range series.Labels { + if l.Name == "__name__" { + name = l.Value + } else { + entry.labels[l.Name] = l.Value + } + } + byName[name] = append(byName[name], entry) + } + + t.Logf("received %d timeseries across %d WriteRequests", len(received), sink.requestCount()) + for name, entries := range byName { + t.Logf(" %s: %d entries", name, len(entries)) + for i, e := range entries { + if i < 3 { + t.Logf(" %v", e.labels) + } + } + } + + // We should have namespace and pod label timeseries. + nsLabels := byName["cloudzero_namespace_labels"] + require.NotEmpty(t, nsLabels, "should have cloudzero_namespace_labels timeseries") + + // Find the "production" namespace entry. + var prodEntry *ts + for i := range nsLabels { + if nsLabels[i].labels["namespace"] == "production" { + prodEntry = &nsLabels[i] + break + } + } + require.NotNil(t, prodEntry, "should have namespace labels for 'production'") + assert.Equal(t, "prod", prodEntry.labels["label_environment"]) + assert.Equal(t, "platform", prodEntry.labels["label_team"]) + assert.NotContains(t, prodEntry.labels, "label_unmatched", + "unmatched label should be filtered by pattern") + + // Verify pod labels. + podLabels := byName["cloudzero_pod_labels"] + require.NotEmpty(t, podLabels, "should have cloudzero_pod_labels timeseries") + var podEntry *ts + for i := range podLabels { + if podLabels[i].labels["pod"] == "web-1" { + podEntry = &podLabels[i] + break + } + } + require.NotNil(t, podEntry, "should have pod labels for 'web-1'") + assert.Equal(t, "web", podEntry.labels["label_app"]) + assert.Equal(t, "platform", podEntry.labels["label_team"]) +} + +// collectorSink is an httptest server that captures Prometheus WriteRequests. +type collectorSink struct { + server *httptest.Server + mu sync.Mutex + reqs []prompb.WriteRequest +} + +func newCollectorSink(t *testing.T) *collectorSink { + t.Helper() + s := &collectorSink{} + mux := http.NewServeMux() + mux.HandleFunc("/v1/container-metrics", func(w http.ResponseWriter, r *http.Request) { + body, err := io.ReadAll(r.Body) + if err != nil { + t.Errorf("sink: read body: %v", err) + http.Error(w, "read error", 500) + return + } + r.Body.Close() + + decoded, err := snappy.Decode(nil, body) + if err != nil { + t.Errorf("sink: snappy decode: %v", err) + http.Error(w, "decode error", 500) + return + } + + var wr prompb.WriteRequest + if err := proto.Unmarshal(decoded, &wr); err != nil { + t.Errorf("sink: proto unmarshal: %v", err) + http.Error(w, "unmarshal error", 500) + return + } + + s.mu.Lock() + s.reqs = append(s.reqs, wr) + s.mu.Unlock() + + w.WriteHeader(http.StatusOK) + }) + s.server = httptest.NewServer(mux) + return s +} + +func (s *collectorSink) timeseries() []prompb.TimeSeries { + s.mu.Lock() + defer s.mu.Unlock() + var all []prompb.TimeSeries + for _, wr := range s.reqs { + all = append(all, wr.Timeseries...) + } + return all +} + +func (s *collectorSink) requestCount() int { + s.mu.Lock() + defer s.mu.Unlock() + return len(s.reqs) +} + +func makeSettings(t *testing.T, sinkURL string) *config.Settings { + t.Helper() + collectorURL := fmt.Sprintf("%s/v1/container-metrics?cloud_account_id=test&cluster_name=test®ion=us-west-2", sinkURL) + s := &config.Settings{ + CloudAccountID: "test", + Region: "us-west-2", + ClusterName: "test", + Destination: collectorURL, + Logging: config.Logging{Level: "error"}, + RemoteWrite: config.RemoteWrite{ + Host: collectorURL, + SendInterval: 1 * time.Second, + MaxBytesPerSend: 500000, + SendTimeout: 10 * time.Second, + MaxRetries: 3, + }, + K8sClient: config.K8sClient{PaginationLimit: 500}, + Database: config.Database{ + RetentionTime: 24 * time.Hour, + CleanupInterval: 3 * time.Hour, + BatchUpdateSize: 500, + }, + Server: config.Server{ + Port: 0, + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + IdleTimeout: 120 * time.Second, + }, + Filters: config.Filters{ + Labels: config.Labels{ + Enabled: true, + Patterns: []string{`^environment$`, `^team$`, `^app$`}, + Resources: config.Resources{ + Namespaces: true, + Pods: true, + }, + }, + Annotations: config.Annotations{ + Enabled: false, + Resources: config.Resources{}, + }, + }, + } + s.Filters.Policy = *bluemonday.UGCPolicy() + s.LabelMatches = compilePatterns(s.Filters.Labels.Patterns) + s.AnnotationMatches = compilePatterns(s.Filters.Annotations.Patterns) + return s +} + +func compilePatterns(patterns []string) []regexp.Regexp { + out := make([]regexp.Regexp, 0, len(patterns)) + for _, p := range patterns { + if re, err := regexp.Compile(p); err == nil { + out = append(out, *re) + } + } + return out +} + +// Ensure we don't accidentally reference the unused runtime import from +// the fake clientset's k8sruntime alias. +var _ k8sruntime.Object = (*corev1.Pod)(nil) diff --git a/tests/stress/backfill/backfill_test.go b/tests/stress/backfill/backfill_test.go index dda971e7..d38cd3ca 100644 --- a/tests/stress/backfill/backfill_test.go +++ b/tests/stress/backfill/backfill_test.go @@ -454,10 +454,13 @@ users: func writeWebhookConfig(t *testing.T, cfgPath, kubeconfigPath, serverURL string) { t.Helper() + apiKeyPath := filepath.Join(filepath.Dir(cfgPath), "api-key") + require.NoError(t, os.WriteFile(apiKeyPath, []byte("test-key"), 0o600)) sinkURL := fmt.Sprintf("%s/v1/container-metrics?cloud_account_id=test&cluster_name=stress®ion=us-west-2", serverURL) content := fmt.Sprintf(`cloud_account_id: test region: us-west-2 cluster_name: stress +api_key_path: %s host: %s destination: %s logging: @@ -476,11 +479,11 @@ database: cleanup_interval: 3h batch_update_size: 500 server: - port: 18099 + port: 0 read_timeout: 10s write_timeout: 10s idle_timeout: 120s - profiling: true + profiling: false filters: labels: enabled: true @@ -504,7 +507,7 @@ filters: resources: namespaces: false pods: false -`, serverURL, sinkURL, sinkURL, kubeconfigPath) +`, apiKeyPath, serverURL, sinkURL, sinkURL, kubeconfigPath) require.NoError(t, os.WriteFile(cfgPath, []byte(content), 0o644)) }