diff --git a/Cargo.toml b/Cargo.toml index b4e735d5e3..b500d68e92 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -146,6 +146,8 @@ redundant_else = "allow" # less clear option_if_let_else = "allow" # less clear unreadable_literal = "allow" # annoying for all the ids in the codebase cognitive_complexity = "allow" # using clippy::too_many_lines already +inline_always = "allow" # hot-path perf hints are intentional; let the author decide +missing_errors_doc = "allow" # error types are self-descriptive; boilerplate # Errors sections add noise [workspace.lints.rust] future_incompatible = { level = "warn", priority = -1 } diff --git a/libdd-data-pipeline/Cargo.toml b/libdd-data-pipeline/Cargo.toml index ba2d7f21ea..0509d12755 100644 --- a/libdd-data-pipeline/Cargo.toml +++ b/libdd-data-pipeline/Cargo.toml @@ -108,3 +108,6 @@ fips = [ ] test-utils = [] regex-lite = ["libdd-common/regex-lite"] + +[lints] +workspace = true diff --git a/libdd-data-pipeline/benches/trace_buffer.rs b/libdd-data-pipeline/benches/trace_buffer.rs index e06f4b0a02..68dd9d9e73 100644 --- a/libdd-data-pipeline/benches/trace_buffer.rs +++ b/libdd-data-pipeline/benches/trace_buffer.rs @@ -1,5 +1,6 @@ // Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +#![allow(clippy::expect_used, reason = "bench harness: panics are acceptable")] use std::collections::HashMap; use std::pin::Pin; @@ -18,7 +19,7 @@ use libdd_trace_utils::span::v04::SpanBytes; // Number of chunks each sender thread sends per benchmark iteration. const CHUNKS_PER_SENDER: usize = 900; -fn bs(s: &'static str) -> BytesString { +const fn bs(s: &'static str) -> BytesString { BytesString::from_static(s) } @@ -101,15 +102,14 @@ fn bench_trace_buffer(c: &mut Criterion) { )); group.bench_function( - BenchmarkId::new(format!("{}_senders", num_senders), delay_label), + BenchmarkId::new(format!("{num_senders}_senders"), delay_label), |b| { b.iter_batched( || { - Vec::from_iter( - (0..num_senders) - .map(|_| (0..CHUNKS_PER_SENDER).map(|_| vec![make_span()])) - .map(Vec::from_iter), - ) + (0..num_senders) + .map(|_| (0..CHUNKS_PER_SENDER).map(|_| vec![make_span()])) + .map(std::iter::Iterator::collect::>) + .collect::>() }, |input| { std::thread::scope(|s| { diff --git a/libdd-data-pipeline/examples/send-traces-with-stats.rs b/libdd-data-pipeline/examples/send-traces-with-stats.rs index 460018c587..b4a66985cb 100644 --- a/libdd-data-pipeline/examples/send-traces-with-stats.rs +++ b/libdd-data-pipeline/examples/send-traces-with-stats.rs @@ -1,5 +1,11 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +#![allow( + clippy::expect_used, + clippy::cast_possible_wrap, + clippy::cast_possible_truncation, + reason = "example binary: panics and lossy casts are acceptable" +)] use clap::Parser; use libdd_capabilities_impl::NativeCapabilities; @@ -27,7 +33,7 @@ fn get_span(now: i64, trace_id: u64, span_id: u64) -> pb::Span { service: "data-pipeline-test".to_string(), name: format!("test-name-{}", span_id % 2), resource: format!("test-resource-{}", (span_id + trace_id) % 3), - error: if trace_id % 10 == 0 { 1 } else { 0 }, + error: i32::from(trace_id % 10 == 0), metrics: HashMap::from([ ("_sampling_priority_v1".to_string(), 1.0), ("_dd.measured".to_string(), 1.0), diff --git a/libdd-data-pipeline/src/agent_info/fetcher.rs b/libdd-data-pipeline/src/agent_info/fetcher.rs index 43e594caab..0f8a5866e2 100644 --- a/libdd-data-pipeline/src/agent_info/fetcher.rs +++ b/libdd-data-pipeline/src/agent_info/fetcher.rs @@ -34,6 +34,10 @@ pub enum FetchInfoStatus { /// If either the agent state hash or container tags hash is different from the current one: /// - Return a `FetchInfoStatus::NewState` of the info struct /// - Else return `FetchInfoStatus::SameState` +#[allow( + clippy::future_not_send, + reason = "FIXME: remove when MSRV > 1.84.1 — regression in 1.84.1 patch: future_not_send fires spuriously on async fns holding non-Send/Sync generics across .await even when the future is never required to be Send; fixed in 1.85.0" +)] async fn fetch_info_with_state_and_container_tags( info_endpoint: &Endpoint, current_state_hash: Option<&str>, @@ -59,11 +63,15 @@ async fn fetch_info_with_state_and_container_tags( info_endpoint: &Endpoint, current_state_hash: Option<&str>, @@ -92,6 +100,10 @@ pub async fn fetch_info_with_state( /// # Ok(()) /// # } /// ``` +#[allow( + clippy::future_not_send, + reason = "FIXME: remove when MSRV > 1.84.1 — regression in 1.84.1 patch: future_not_send fires spuriously on async fns holding non-Send/Sync generics across .await even when the future is never required to be Send; fixed in 1.85.0" +)] pub async fn fetch_info( info_endpoint: &Endpoint, ) -> Result> { @@ -104,8 +116,12 @@ pub async fn fetch_info( /// Fetch and hash the response from the agent info endpoint. /// -/// Returns a tuple of (state_hash, response_body_bytes, container_tags_hash). +/// Returns a tuple of (`state_hash`, `response_body_bytes`, `container_tags_hash`). /// The hash is calculated using SHA256 to match the agent's calculation method. +#[allow( + clippy::future_not_send, + reason = "FIXME: remove when MSRV > 1.84.1 — regression in 1.84.1 patch: future_not_send fires spuriously on async fns holding non-Send/Sync generics across .await even when the future is never required to be Send; fixed in 1.85.0" +)] async fn fetch_and_hash_response( info_endpoint: &Endpoint, ) -> Result<(String, bytes::Bytes, Option)> { @@ -120,28 +136,28 @@ async fn fetch_and_hash_response( // Runtime-agnostic timeout: race the request against a capability-driven // sleep instead of `tokio::time::timeout`, which requires a tokio reactor // (not available on wasm where we run on the JS event loop). - let res = tokio::select! { + let response = tokio::select! { biased; result = client.request(req) => result?, - _ = sleeper.sleep(timeout) => { + () = sleeper.sleep(timeout) => { return Err(anyhow!("Request to /info timed out after {:?}", timeout)); } }; // Extract the Datadog-Container-Tags-Hash header - let container_tags_hash = res + let container_tags_hash = response .headers() .get("Datadog-Container-Tags-Hash") .and_then(|v| v.to_str().ok()) - .map(|s| s.to_string()); + .map(std::string::ToString::to_string); - let body_data = res.into_body(); + let body_data = response.into_body(); let hash = format!("{:x}", Sha256::digest(&body_data)); Ok((hash, body_data, container_tags_hash)) } -/// Fetch the info endpoint and update an ArcSwap keeping it up-to-date. +/// Fetch the info endpoint and update an `ArcSwap` keeping it up-to-date. /// /// This type implements [`libdd_shared_runtime::Worker`] and is intended to be driven by a worker /// runner such as [`libdd_shared_runtime::SharedRuntime`]. @@ -191,7 +207,7 @@ async fn fetch_and_hash_response( /// `C` is the capability bundle, see [`HttpClientCapability`] and [`SleepCapability`]. /// Leaf crates pin it to a concrete type. #[derive(Debug)] -pub struct AgentInfoFetcher { +pub struct AgentInfoFetcher { info_endpoint: Endpoint, refresh_interval: Duration, trigger_rx: Option>, @@ -201,13 +217,13 @@ pub struct AgentInfoFetcher { _phantom: PhantomData, } -impl AgentInfoFetcher { +impl AgentInfoFetcher { /// Return a new `AgentInfoFetcher` fetching the `info_endpoint` on each `refresh_interval` /// and updating the stored info. /// - /// Returns a tuple of (fetcher, trigger_component) where: - /// - `fetcher`: The AgentInfoFetcher to be run in a background task - /// - `response_observer`: The ResponseObserver component for checking HTTP responses + /// Returns a tuple of (fetcher, `trigger_component`) where: + /// - `fetcher`: The `AgentInfoFetcher` to be run in a background task + /// - `response_observer`: The `ResponseObserver` component for checking HTTP responses pub fn new(info_endpoint: Endpoint, refresh_interval: Duration) -> (Self, ResponseObserver) { // The trigger channel stores a single message to avoid multiple triggers. let (trigger_tx, trigger_rx) = mpsc::channel(1); @@ -244,7 +260,7 @@ impl Wor if AGENT_INFO_CACHE.load().is_none() { return; } - self.trigger().await + self.trigger().await; } async fn trigger(&mut self) { @@ -261,7 +277,7 @@ impl Wor } } // Regular periodic fetch timer - _ = sleeper.sleep(self.refresh_interval) => {} + () = sleeper.sleep(self.refresh_interval) => {} } } None => { @@ -275,10 +291,14 @@ impl Wor // Release the IoStack waker stored in trigger_rx by waking the channel and drain the // message to avoid a spurious fetch on restart. If the channel is not empty then it has // already been waked. - if self.trigger_rx.as_ref().is_some_and(|rx| rx.is_empty()) { + if self + .trigger_rx + .as_ref() + .is_some_and(tokio::sync::mpsc::Receiver::is_empty) + { let _ = self.trigger_tx.try_send(()); self.drain(); - }; + } } async fn run(&mut self) { @@ -286,8 +306,12 @@ impl Wor } } -impl AgentInfoFetcher { +impl AgentInfoFetcher { /// Fetch agent info and update cache if needed + #[allow( + clippy::future_not_send, + reason = "FIXME: remove when MSRV > 1.84.1 — regression in 1.84.1 patch: future_not_send fires spuriously on async fns holding non-Send/Sync generics across .await even when the future is never required to be Send; fixed in 1.85.0" + )] async fn fetch_and_update(&self) { let current_info = AGENT_INFO_CACHE.load(); let current_hash = current_info.as_ref().map(|info| info.state_hash.as_str()); @@ -306,7 +330,7 @@ impl AgentInfoFetcher { AGENT_INFO_CACHE.store(Some(Arc::new(*new_info))); } Ok(FetchInfoStatus::SameState) => { - debug!("Agent info is up-to-date") + debug!("Agent info is up-to-date"); } Err(err) => { warn!(?err, "Error while fetching /info"); @@ -325,8 +349,9 @@ pub struct ResponseObserver { } impl ResponseObserver { - /// Create a new ResponseObserver with the given channel sender. - pub fn new(trigger_tx: mpsc::Sender<()>) -> Self { + /// Create a new `ResponseObserver` with the given channel sender. + #[must_use] + pub const fn new(trigger_tx: mpsc::Sender<()>) -> Self { Self { trigger_tx } } @@ -344,11 +369,11 @@ impl ResponseObserver { let current_state = AGENT_INFO_CACHE.load(); if current_state.as_ref().map(|s| s.state_hash.as_str()) != Some(state_str) { match self.trigger_tx.try_send(()) { - Ok(_) => {} - Err(mpsc::error::TrySendError::Full(_)) => { + Ok(()) => {} + Err(mpsc::error::TrySendError::Full(())) => { debug!("Response observer channel full, fetch has already been triggered"); } - Err(mpsc::error::TrySendError::Closed(_)) => { + Err(mpsc::error::TrySendError::Closed(())) => { debug!("Agent info fetcher channel closed, unable to trigger refresh"); } } @@ -583,10 +608,8 @@ mod single_threaded_tests { .body(r#"{"version":"1"}"#); }); let endpoint = Endpoint::from_url(server.url("/info").parse().unwrap()); - let (fetcher, _response_observer) = AgentInfoFetcher::::new( - endpoint.clone(), - Duration::from_millis(100), - ); + let (fetcher, _response_observer) = + AgentInfoFetcher::::new(endpoint, Duration::from_millis(100)); assert!(agent_info::get_agent_info().is_none()); let shared_runtime = SharedRuntime::new().unwrap(); let _ = shared_runtime.spawn_worker(fetcher, true).unwrap(); @@ -652,6 +675,8 @@ mod single_threaded_tests { #[cfg_attr(miri, ignore)] #[test] fn test_agent_info_trigger_different_state() { + const MAX_ATTEMPTS: u32 = 500; + const SLEEP_DURATION_MS: u64 = 10; let server = MockServer::start(); let mock = server.mock(|when, then| { when.path("/info"); @@ -685,9 +710,6 @@ mod single_threaded_tests { response_observer.check_response(&response); // Wait for the fetch to complete - const MAX_ATTEMPTS: u32 = 500; - const SLEEP_DURATION_MS: u64 = 10; - let mut attempts = 0; while mock.calls() == 0 && attempts < MAX_ATTEMPTS { attempts += 1; diff --git a/libdd-data-pipeline/src/agent_info/schema.rs b/libdd-data-pipeline/src/agent_info/schema.rs index f0eedc97e1..f7cd5b5544 100644 --- a/libdd-data-pipeline/src/agent_info/schema.rs +++ b/libdd-data-pipeline/src/agent_info/schema.rs @@ -48,7 +48,7 @@ pub struct AgentInfoStruct { } /// Require/reject lists for tag-based trace filters exposed by the agent /info endpoint. -#[derive(Clone, Serialize, Deserialize, Default, Debug, PartialEq)] +#[derive(Clone, Serialize, Deserialize, Default, Debug, PartialEq, Eq)] pub struct FilterTagsConfig { /// All listed filters must match at least one root-span tag for the trace to be accepted. pub require: Option>, @@ -75,7 +75,7 @@ pub struct Config { } #[allow(missing_docs)] -#[derive(Clone, Serialize, Deserialize, Default, Debug, PartialEq)] +#[derive(Clone, Serialize, Deserialize, Default, Debug, PartialEq, Eq)] pub struct ObfuscationConfig { pub elastic_search: bool, pub mongo: bool, @@ -91,21 +91,21 @@ pub struct ObfuscationConfig { } #[allow(missing_docs)] -#[derive(Clone, Serialize, Deserialize, Default, Debug, PartialEq)] +#[derive(Clone, Serialize, Deserialize, Default, Debug, PartialEq, Eq)] pub struct HttpObfuscationConfig { pub remove_query_string: bool, pub remove_path_digits: bool, } #[allow(missing_docs)] -#[derive(Clone, Serialize, Deserialize, Default, Debug, PartialEq)] +#[derive(Clone, Serialize, Deserialize, Default, Debug, PartialEq, Eq)] pub struct RedisObfuscationConfig { pub enabled: bool, pub remove_all_args: bool, } #[allow(missing_docs)] -#[derive(Clone, Serialize, Deserialize, Default, Debug, PartialEq)] +#[derive(Clone, Serialize, Deserialize, Default, Debug, PartialEq, Eq)] pub struct MemcachedObfuscationConfig { pub enabled: bool, pub keep_command: bool, diff --git a/libdd-data-pipeline/src/health_metrics.rs b/libdd-data-pipeline/src/health_metrics.rs index af7b3344a7..8ee7fd817f 100644 --- a/libdd-data-pipeline/src/health_metrics.rs +++ b/libdd-data-pipeline/src/health_metrics.rs @@ -4,12 +4,12 @@ //! # Trace Exporter Health Metrics //! //! This module defines all health metrics emitted by the libdatadog trace exporter. These metrics -//! are sent to DogStatsD to provide visibility for Datadog support. +//! are sent to `DogStatsD` to provide visibility for Datadog support. //! //! ## Overview //! //! Health metrics help monitor the trace exporter's behavior, including successful operations, -//! error conditions, and performance characteristics. They are emitted via DogStatsD and follow +//! error conditions, and performance characteristics. They are emitted via `DogStatsD` and follow //! consistent naming conventions. //! //! **Note**: Health metrics are **disabled by default**. They must be explicitly enabled using @@ -77,24 +77,24 @@ use std::borrow::Cow; /// Number of trace chunks successfully deserialized from input. /// -/// **Type**: Count -/// **When Emitted**: After successful deserialization of trace data from msgpack format +/// **Type**: Count\ +/// **When Emitted**: After successful deserialization of trace data from msgpack format\ /// **Tags**: `libdatadog_version` -pub(crate) const DESERIALIZE_TRACES: &str = "datadog.tracer.exporter.deserialize.traces"; +pub const DESERIALIZE_TRACES: &str = "datadog.tracer.exporter.deserialize.traces"; /// Number of trace deserialization errors. /// -/// **Type**: Count -/// **When Emitted**: When msgpack deserialization fails due to invalid format or corrupted data +/// **Type**: Count\ +/// **When Emitted**: When msgpack deserialization fails due to invalid format or corrupted data\ /// **Tags**: `libdatadog_version` -pub(crate) const DESERIALIZE_TRACES_ERRORS: &str = "datadog.tracer.exporter.deserialize.errors"; +pub const DESERIALIZE_TRACES_ERRORS: &str = "datadog.tracer.exporter.deserialize.errors"; /// Number of trace serialization errors. /// -/// **Type**: Count -/// **When Emitted**: When msgpack serialization fails +/// **Type**: Count\ +/// **When Emitted**: When msgpack serialization fails\ /// **Tags**: `libdatadog_version` -pub(crate) const SERIALIZE_TRACES_ERRORS: &str = "datadog.tracer.exporter.serialize.errors"; +pub const SERIALIZE_TRACES_ERRORS: &str = "datadog.tracer.exporter.serialize.errors"; // ============================================================================= // Transport - Trace Metrics @@ -102,22 +102,21 @@ pub(crate) const SERIALIZE_TRACES_ERRORS: &str = "datadog.tracer.exporter.serial /// Number of trace chunks included in HTTP requests to the agent (all attempts). /// -/// **Type**: Distribution -/// **When Emitted**: Always emitted for every send attempt, regardless of success or failure +/// **Type**: Distribution\ +/// **When Emitted**: Always emitted for every send attempt, regardless of success or failure\ /// **Tags**: `libdatadog_version` -pub(crate) const TRANSPORT_TRACES_SENT: &str = "datadog.tracer.exporter.transport.traces.sent"; +pub const TRANSPORT_TRACES_SENT: &str = "datadog.tracer.exporter.transport.traces.sent"; /// Number of trace chunks successfully sent to the agent. /// -/// **Type**: Count -/// **When Emitted**: After successful HTTP response from the agent (2xx status codes) +/// **Type**: Count\ +/// **When Emitted**: After successful HTTP response from the agent (2xx status codes)\ /// **Tags**: `libdatadog_version` -pub(crate) const TRANSPORT_TRACES_SUCCESSFUL: &str = - "datadog.tracer.exporter.transport.traces.successful"; +pub const TRANSPORT_TRACES_SUCCESSFUL: &str = "datadog.tracer.exporter.transport.traces.successful"; /// Number of errors encountered while sending traces to the agent. /// -/// **Type**: Count +/// **Type**: Count\ /// **When Emitted**: /// - HTTP error responses (4xx, 5xx status codes) /// - Network/connection errors @@ -133,11 +132,11 @@ pub(crate) const TRANSPORT_TRACES_SUCCESSFUL: &str = /// - `type:response_body`: Response body read errors /// - `type:build`: Request build errors /// - `type:unknown`: Fallback for unrecognized error types -pub(crate) const TRANSPORT_TRACES_FAILED: &str = "datadog.tracer.exporter.transport.traces.failed"; +pub const TRANSPORT_TRACES_FAILED: &str = "datadog.tracer.exporter.transport.traces.failed"; /// Number of trace chunks dropped due to errors. /// -/// **Type**: Distribution +/// **Type**: Distribution\ /// **When Emitted**: /// - HTTP error responses (excluding 404 Not Found and 415 Unsupported Media Type) /// - Network/connection errors @@ -147,8 +146,7 @@ pub(crate) const TRANSPORT_TRACES_FAILED: &str = "datadog.tracer.exporter.transp /// /// **Note**: 404 and 415 status codes are excluded as they represent endpoint/format issues rather /// than dropped payloads. While they aren't counted as dropped traces, they may still be dropped. -pub(crate) const TRANSPORT_TRACES_DROPPED: &str = - "datadog.tracer.exporter.transport.traces.dropped"; +pub const TRANSPORT_TRACES_DROPPED: &str = "datadog.tracer.exporter.transport.traces.dropped"; // ============================================================================= // Transport - Payload Metrics @@ -156,14 +154,14 @@ pub(crate) const TRANSPORT_TRACES_DROPPED: &str = /// Size in bytes of HTTP payloads sent to the agent. /// -/// **Type**: Distribution -/// **When Emitted**: Always emitted for every send attempt, regardless of success or failure +/// **Type**: Distribution\ +/// **When Emitted**: Always emitted for every send attempt, regardless of success or failure\ /// **Tags**: `libdatadog_version` -pub(crate) const TRANSPORT_SENT_BYTES: &str = "datadog.tracer.exporter.transport.sent.bytes"; +pub const TRANSPORT_SENT_BYTES: &str = "datadog.tracer.exporter.transport.sent.bytes"; /// Size in bytes of HTTP payloads dropped due to errors. /// -/// **Type**: Distribution +/// **Type**: Distribution\ /// **When Emitted**: /// - HTTP error responses (excluding 404 Not Found and 415 Unsupported Media Type) /// - Network/connection errors @@ -173,30 +171,30 @@ pub(crate) const TRANSPORT_SENT_BYTES: &str = "datadog.tracer.exporter.transport /// /// **Note**: 404 and 415 status codes are excluded as they represent endpoint/format issues rather /// than dropped payloads -pub(crate) const TRANSPORT_DROPPED_BYTES: &str = "datadog.tracer.exporter.transport.dropped.bytes"; +pub const TRANSPORT_DROPPED_BYTES: &str = "datadog.tracer.exporter.transport.dropped.bytes"; /// Number of HTTP requests made to the agent. /// -/// **Type**: Distribution +/// **Type**: Distribution\ /// **When Emitted**: Always emitted after each send operation, counting all HTTP attempts including /// retries **Tags**: `libdatadog_version` /// /// **Note**: Value represents total request attempts (1 for success without retries, >1 when /// retries occur) -pub(crate) const TRANSPORT_REQUESTS: &str = "datadog.tracer.exporter.transport.requests"; +pub const TRANSPORT_REQUESTS: &str = "datadog.tracer.exporter.transport.requests"; -#[derive(Debug)] -#[cfg_attr(test, derive(PartialEq))] -pub(crate) enum HealthMetric { +#[derive(Debug, Copy, Clone)] +#[cfg_attr(test, derive(PartialEq, Eq))] +pub enum HealthMetric { Count(&'static str, i64), Distribution(&'static str, i64), } /// Categorization of errors from different sources (direct hyper responses vs -/// send_with_retry results) for consistent metric emission +/// `send_with_retry` results) for consistent metric emission #[derive(Debug, Clone, Copy)] #[cfg_attr(test, derive(PartialEq, Eq))] -pub(crate) enum TransportErrorType { +pub enum TransportErrorType { /// HTTP error with a specific status code (4xx, 5xx) Http(u16), /// Network/connection error @@ -210,24 +208,21 @@ pub(crate) enum TransportErrorType { } impl TransportErrorType { - pub(crate) fn as_tag_value(&self) -> Cow<'static, str> { + pub(crate) fn as_tag_value(self) -> Cow<'static, str> { match self { - TransportErrorType::Http(code) => Cow::Owned(code.to_string()), - TransportErrorType::Network => Cow::Borrowed("network"), - TransportErrorType::Timeout => Cow::Borrowed("timeout"), - TransportErrorType::ResponseBody => Cow::Borrowed("response_body"), - TransportErrorType::Build => Cow::Borrowed("build"), + Self::Http(code) => Cow::Owned(code.to_string()), + Self::Network => Cow::Borrowed("network"), + Self::Timeout => Cow::Borrowed("timeout"), + Self::ResponseBody => Cow::Borrowed("response_body"), + Self::Build => Cow::Borrowed("build"), } } /// Per the health metrics specification: /// - 404 and 415 status codes do NOT emit dropped metrics /// - All other HTTP errors and non-HTTP errors emit dropped metrics - pub(crate) fn should_emit_dropped_metrics(&self) -> bool { - !matches!( - self, - TransportErrorType::Http(404) | TransportErrorType::Http(415) - ) + pub(crate) const fn should_emit_dropped_metrics(self) -> bool { + !matches!(self, Self::Http(404 | 415)) } } @@ -236,8 +231,8 @@ impl TransportErrorType { /// This structure captures all the information needed to emit the appropriate /// health metric for a send operation regardless whence it came #[derive(Debug)] -#[cfg_attr(test, derive(Clone, PartialEq))] -pub(crate) struct SendResult { +#[cfg_attr(test, derive(Clone, PartialEq, Eq))] +pub struct SendResult { /// The error type if the operation failed, or `None` if it succeeded. pub error_type: Option, /// Size of the payload in bytes @@ -288,7 +283,7 @@ impl SendResult { /// Returns whether the send operation was successful #[cfg(test)] - pub(crate) fn is_success(&self) -> bool { + pub(crate) const fn is_success(&self) -> bool { self.error_type.is_none() } @@ -296,13 +291,17 @@ impl SendResult { /// /// This method encapsulates all the logic for determining which metrics to /// emit based on the send operation. It returns a vector of metrics that - /// should be sent to DogStatsD + /// should be sent to `DogStatsD` /// /// # Returns /// /// A vector of `(HealthMetric, Option>)` tuples where: /// - The first element is the metric to emit /// - The second element is an optional tag value for error classification + #[allow( + clippy::cast_possible_wrap, + reason = "trace/byte counts never realistically exceed i64::MAX" + )] pub(crate) fn collect_metrics(&self) -> Vec<(HealthMetric, Option>)> { // Max capacity: 3 base + 1 outcome + 2 dropped let mut metrics = Vec::with_capacity(6); @@ -317,7 +316,7 @@ impl SendResult { None, )); metrics.push(( - HealthMetric::Distribution(TRANSPORT_REQUESTS, self.request_attempts as i64), + HealthMetric::Distribution(TRANSPORT_REQUESTS, i64::from(self.request_attempts)), None, )); @@ -363,7 +362,7 @@ impl SendResult { mod tests { use super::*; - /// Test-only extension methods for SendResult + /// Test-only extension methods for `SendResult` impl SendResult { /// Create a `SendResult` from a `SendWithRetryResult`. /// @@ -646,17 +645,17 @@ mod tests { let has_sent_bytes = metrics.iter().any(|(m, _)| { matches!(m, HealthMetric::Distribution(name, _) if *name == TRANSPORT_SENT_BYTES) }); - assert!(has_sent_bytes, "Missing sent_bytes for {:?}", result); + assert!(has_sent_bytes, "Missing sent_bytes for {result:?}"); let has_sent_traces = metrics.iter().any(|(m, _)| { matches!(m, HealthMetric::Distribution(name, _) if *name == TRANSPORT_TRACES_SENT) }); - assert!(has_sent_traces, "Missing sent_traces for {:?}", result); + assert!(has_sent_traces, "Missing sent_traces for {result:?}"); let has_requests = metrics.iter().any(|(m, _)| { matches!(m, HealthMetric::Distribution(name, _) if *name == TRANSPORT_REQUESTS) }); - assert!(has_requests, "Missing requests for {:?}", result); + assert!(has_requests, "Missing requests for {result:?}"); } } @@ -800,7 +799,7 @@ mod tests { assert_eq!(name, SERIALIZE_TRACES_ERRORS); assert_eq!(count, 1); } - _ => panic!("Expected Count metric"), + HealthMetric::Distribution(..) => panic!("Expected Count metric"), } } } diff --git a/libdd-data-pipeline/src/lib.rs b/libdd-data-pipeline/src/lib.rs index 2b9955ce3d..cb2cb8e4fd 100644 --- a/libdd-data-pipeline/src/lib.rs +++ b/libdd-data-pipeline/src/lib.rs @@ -1,13 +1,7 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -#![cfg_attr(not(test), deny(clippy::panic))] -#![cfg_attr(not(test), deny(clippy::unwrap_used))] -#![cfg_attr(not(test), deny(clippy::expect_used))] -#![cfg_attr(not(test), deny(clippy::unreachable))] -#![cfg_attr(not(test), deny(clippy::todo))] -#![cfg_attr(not(test), deny(clippy::unimplemented))] -//! TraceExporter provides a minimum viable product (MVP) to send traces to agents. The aim of the +//! `TraceExporter` provides a minimum viable product (MVP) to send traces to agents. The aim of the //! project at this state is to provide a basic API in order to test its viability and integration //! in different languages. diff --git a/libdd-data-pipeline/src/otlp/config.rs b/libdd-data-pipeline/src/otlp/config.rs index 02d7a45f80..4624995514 100644 --- a/libdd-data-pipeline/src/otlp/config.rs +++ b/libdd-data-pipeline/src/otlp/config.rs @@ -8,7 +8,7 @@ use std::time::Duration; /// OTLP trace export protocol. HTTP/JSON is currently supported. #[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] -pub(crate) enum OtlpProtocol { +pub enum OtlpProtocol { /// HTTP with JSON body (Content-Type: application/json). Default for HTTP. #[default] HttpJson, @@ -32,7 +32,7 @@ pub struct OtlpTraceConfig { pub headers: HeaderMap, /// Request timeout. pub timeout: Duration, - /// Protocol (for future use; currently only HttpJson is supported). + /// Protocol (for future use; currently only `HttpJson` is supported). #[allow(dead_code)] pub(crate) protocol: OtlpProtocol, } diff --git a/libdd-data-pipeline/src/otlp/exporter.rs b/libdd-data-pipeline/src/otlp/exporter.rs index 08c849392e..91efe127c8 100644 --- a/libdd-data-pipeline/src/otlp/exporter.rs +++ b/libdd-data-pipeline/src/otlp/exporter.rs @@ -22,6 +22,10 @@ const OTLP_RETRY_DELAY_MS: u64 = 100; /// /// `test_token` is forwarded as `X-Datadog-Test-Session-Token` when set, enabling snapshot tests /// against the Datadog test agent's OTLP endpoint. +#[allow( + clippy::future_not_send, + reason = "&C is not Send because C lacks Sync bound; adding it would be an API break" +)] pub async fn send_otlp_traces_http( capabilities: &C, config: &OtlpTraceConfig, @@ -30,13 +34,16 @@ pub async fn send_otlp_traces_http( ) -> Result<(), TraceExporterError> { let url = libdd_common::parse_uri(&config.endpoint_url).map_err(|e| { TraceExporterError::Internal(InternalErrorKind::InvalidWorkerState(format!( - "Invalid OTLP endpoint URL: {}", - e + "Invalid OTLP endpoint URL: {e}" ))) })?; let target = Endpoint { url, + #[allow( + clippy::cast_possible_truncation, + reason = "timeout in millis fits comfortably in u64" + )] timeout_ms: config.timeout.as_millis() as u64, ..Endpoint::default() }; @@ -64,11 +71,11 @@ pub async fn send_otlp_traces_http( match send_with_retry(capabilities, &target, json_body, &headers, &retry_strategy).await { Ok(_) => Ok(()), - Err(e) => Err(map_send_error(e).await), + Err(e) => Err(map_send_error(e)), } } -async fn map_send_error(err: SendWithRetryError) -> TraceExporterError { +fn map_send_error(err: SendWithRetryError) -> TraceExporterError { match err { SendWithRetryError::Http(response, _) => { let status = response.status(); diff --git a/libdd-data-pipeline/src/telemetry/error.rs b/libdd-data-pipeline/src/telemetry/error.rs index 21dc6ef6b2..a12aeb5b4f 100644 --- a/libdd-data-pipeline/src/telemetry/error.rs +++ b/libdd-data-pipeline/src/telemetry/error.rs @@ -5,7 +5,7 @@ use std::error::Error; use std::fmt::Display; -/// TelemetryError holds different types of errors that occur when sending metrics. +/// `TelemetryError` holds different types of errors that occur when sending metrics. #[derive(Debug)] pub enum TelemetryError { /// Invalid configuration during Telemetry client creation. @@ -17,15 +17,15 @@ pub enum TelemetryError { impl Display for TelemetryError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - TelemetryError::Builder(e) => write!(f, "Telemetry client builder failed: {e}"), - TelemetryError::Send(e) => write!(f, "Send metric failed: {e}"), + Self::Builder(e) => write!(f, "Telemetry client builder failed: {e}"), + Self::Send(e) => write!(f, "Send metric failed: {e}"), } } } impl From for TelemetryError { fn from(value: anyhow::Error) -> Self { - TelemetryError::Send(value.to_string()) + Self::Send(value.to_string()) } } diff --git a/libdd-data-pipeline/src/telemetry/metrics.rs b/libdd-data-pipeline/src/telemetry/metrics.rs index feeb74e492..79660c3133 100644 --- a/libdd-data-pipeline/src/telemetry/metrics.rs +++ b/libdd-data-pipeline/src/telemetry/metrics.rs @@ -1,7 +1,7 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -//! Provides an abstraction layer to hold metrics that comes from 'SendDataResult'. +//! Provides an abstraction layer to hold metrics that comes from '`SendDataResult`'. use libdd_common::tag; use libdd_telemetry::data::metrics::{MetricNamespace, MetricType}; use libdd_telemetry::metrics::ContextKey; @@ -11,30 +11,30 @@ use std::ops::Index; /// Used as identifier to match the different metrics. #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] pub enum MetricKind { - /// trace_api.requests metric + /// `trace_api.requests` metric ApiRequest, - /// trace_api.errors (network) metric + /// `trace_api.errors` (network) metric ApiErrorsNetwork, - /// trace_api.errors (timeout) metric + /// `trace_api.errors` (timeout) metric ApiErrorsTimeout, - /// trace_api.errors (status_code) metric + /// `trace_api.errors` (`status_code`) metric ApiErrorsStatusCode, - /// trace_api.bytes metric + /// `trace_api.bytes` metric ApiBytes, - /// trace_api.responses metric + /// `trace_api.responses` metric ApiResponses, - /// trace_chunks_sent metric + /// `trace_chunks_sent` metric ChunksSent, - /// trace_chunks_dropped metric (reason: p0_drop) + /// `trace_chunks_dropped` metric (reason: `p0_drop`) ChunksDroppedP0, - /// trace_chunks_dropped metric (reason: serialization_error) + /// `trace_chunks_dropped` metric (reason: `serialization_error`) ChunksDroppedSerializationError, - /// trace_chunks_dropped metric (reason: send_failure) + /// `trace_chunks_dropped` metric (reason: `send_failure`) ChunksDroppedSendFailure, } /// Constants for metric names -/// These must match https://github.com/DataDog/dd-go/blob/prod/trace/apps/tracer-telemetry-intake/telemetry-metrics/static/common_metrics.json +/// These must match const API_REQUEST_STR: &str = "trace_api.requests"; const API_ERRORS_STR: &str = "trace_api.errors"; const API_BYTES_STR: &str = "trace_api.bytes"; @@ -45,7 +45,7 @@ const CHUNKS_DROPPED_STR: &str = "trace_chunks_dropped"; #[derive(Debug)] struct Metric { name: &'static str, - metric_type: MetricType, + kind: MetricType, namespace: MetricNamespace, tags: &'static [libdd_common::tag::Tag], } @@ -53,25 +53,25 @@ struct Metric { const METRICS: &[Metric] = &[ Metric { name: API_REQUEST_STR, - metric_type: MetricType::Count, + kind: MetricType::Count, namespace: MetricNamespace::Tracers, tags: &[tag!["src_library", "libdatadog"]], }, Metric { name: API_ERRORS_STR, - metric_type: MetricType::Count, + kind: MetricType::Count, namespace: MetricNamespace::Tracers, tags: &[tag!["src_library", "libdatadog"], tag!["type", "network"]], }, Metric { name: API_ERRORS_STR, - metric_type: MetricType::Count, + kind: MetricType::Count, namespace: MetricNamespace::Tracers, tags: &[tag!["src_library", "libdatadog"], tag!["type", "timeout"]], }, Metric { name: API_ERRORS_STR, - metric_type: MetricType::Count, + kind: MetricType::Count, namespace: MetricNamespace::Tracers, tags: &[ tag!["src_library", "libdatadog"], @@ -80,31 +80,31 @@ const METRICS: &[Metric] = &[ }, Metric { name: API_BYTES_STR, - metric_type: MetricType::Distribution, + kind: MetricType::Distribution, namespace: MetricNamespace::Tracers, tags: &[tag!["src_library", "libdatadog"]], }, Metric { name: API_RESPONSES_STR, - metric_type: MetricType::Count, + kind: MetricType::Count, namespace: MetricNamespace::Tracers, tags: &[tag!["src_library", "libdatadog"]], }, Metric { name: CHUNKS_SENT_STR, - metric_type: MetricType::Count, + kind: MetricType::Count, namespace: MetricNamespace::Tracers, tags: &[tag!["src_library", "libdatadog"]], }, Metric { name: CHUNKS_DROPPED_STR, - metric_type: MetricType::Count, + kind: MetricType::Count, namespace: MetricNamespace::Tracers, tags: &[tag!["src_library", "libdatadog"], tag!["reason", "p0_drop"]], }, Metric { name: CHUNKS_DROPPED_STR, - metric_type: MetricType::Count, + kind: MetricType::Count, namespace: MetricNamespace::Tracers, tags: &[ tag!["src_library", "libdatadog"], @@ -113,7 +113,7 @@ const METRICS: &[Metric] = &[ }, Metric { name: CHUNKS_DROPPED_STR, - metric_type: MetricType::Count, + kind: MetricType::Count, namespace: MetricNamespace::Tracers, tags: &[ tag!["src_library", "libdatadog"], @@ -141,7 +141,7 @@ impl Metrics { let key = worker.register_metric_context( metric.name.to_string(), metric.tags.to_vec(), - metric.metric_type, + metric.kind, true, metric.namespace, ); diff --git a/libdd-data-pipeline/src/telemetry/mod.rs b/libdd-data-pipeline/src/telemetry/mod.rs index 9bf4177b5a..80858059a1 100644 --- a/libdd-data-pipeline/src/telemetry/mod.rs +++ b/libdd-data-pipeline/src/telemetry/mod.rs @@ -79,7 +79,7 @@ impl TelemetryClientBuilder { } /// Sets the heartbeat notification interval in millis. - pub fn set_heartbeat(mut self, interval: u64) -> Self { + pub const fn set_heartbeat(mut self, interval: u64) -> Self { if interval > 0 { self.config.telemetry_heartbeat_interval = Duration::from_millis(interval); } @@ -111,7 +111,7 @@ impl TelemetryClientBuilder { } /// Sets the debug enabled flag for the telemetry client. - pub fn set_debug_enabled(mut self, debug: bool) -> Self { + pub const fn set_debug_enabled(mut self, debug: bool) -> Self { self.config.debug_enabled = debug; self } @@ -156,7 +156,7 @@ pub struct TelemetryClient { /// Telemetry describing the sending of a trace payload /// It can be produced from a [`SendWithRetryResult`] or from a [`SendDataResult`]. -#[derive(PartialEq, Debug, Default)] +#[derive(PartialEq, Eq, Debug, Default)] pub struct SendPayloadTelemetry { requests_count: u64, errors_network: u64, @@ -211,7 +211,7 @@ impl SendPayloadTelemetry { telemetry .responses_count_per_code .insert(response.status().as_u16(), 1); - telemetry.requests_count = *attempts as u64; + telemetry.requests_count = u64::from(*attempts); } Err(err) => match err { SendWithRetryError::Http(response, attempts) => { @@ -220,29 +220,25 @@ impl SendPayloadTelemetry { telemetry .responses_count_per_code .insert(response.status().as_u16(), 1); - telemetry.requests_count = *attempts as u64; + telemetry.requests_count = u64::from(*attempts); } SendWithRetryError::Timeout(attempts) => { telemetry.chunks_dropped_send_failure = chunks; telemetry.errors_timeout = 1; - telemetry.requests_count = *attempts as u64; + telemetry.requests_count = u64::from(*attempts); } - SendWithRetryError::Network(_, attempts) => { + SendWithRetryError::Network(_, attempts) + | SendWithRetryError::ResponseBody(attempts) => { telemetry.chunks_dropped_send_failure = chunks; telemetry.errors_network = 1; - telemetry.requests_count = *attempts as u64; - } - SendWithRetryError::ResponseBody(attempts) => { - telemetry.chunks_dropped_send_failure = chunks; - telemetry.errors_network = 1; - telemetry.requests_count = *attempts as u64; + telemetry.requests_count = u64::from(*attempts); } SendWithRetryError::Build(attempts) => { telemetry.chunks_dropped_serialization_error = chunks; - telemetry.requests_count = *attempts as u64; + telemetry.requests_count = u64::from(*attempts); } }, - }; + } telemetry } } @@ -253,6 +249,10 @@ impl TelemetryClient { /// # Arguments: /// /// * `telemetry_handle`: telemetry worker handle used to enqueue metrics. + #[allow( + clippy::cast_precision_loss, + reason = "metric counter values losing last few bits at >2^53 has no observable effect on dashboards" + )] pub fn send(&self, data: &SendPayloadTelemetry) -> Result<(), TelemetryError> { if data.requests_count > 0 { let key = self.metrics.get(metrics::MetricKind::ApiRequest); @@ -714,7 +714,7 @@ mod tests { responses_count_per_code: HashMap::from([(200, 1)]), ..Default::default() } - ) + ); } #[test] @@ -737,7 +737,7 @@ mod tests { responses_count_per_code: HashMap::from([(200, 1)]), ..Default::default() } - ) + ); } #[test] @@ -757,7 +757,7 @@ mod tests { responses_count_per_code: HashMap::from([(400, 1)]), ..Default::default() } - ) + ); } #[test] @@ -775,7 +775,7 @@ mod tests { errors_network: 1, ..Default::default() } - ) + ); } #[test] @@ -790,7 +790,7 @@ mod tests { errors_timeout: 1, ..Default::default() } - ) + ); } #[cfg_attr(miri, ignore)] @@ -805,7 +805,7 @@ mod tests { requests_count: 5, ..Default::default() } - ) + ); } #[test] @@ -834,7 +834,7 @@ mod tests { ..Default::default() }; - assert_eq!(SendPayloadTelemetry::from(&result), expected_telemetry) + assert_eq!(SendPayloadTelemetry::from(&result), expected_telemetry); } #[cfg_attr(miri, ignore)] diff --git a/libdd-data-pipeline/src/trace_buffer/mod.rs b/libdd-data-pipeline/src/trace_buffer/mod.rs index 09698eef18..66e21cc079 100644 --- a/libdd-data-pipeline/src/trace_buffer/mod.rs +++ b/libdd-data-pipeline/src/trace_buffer/mod.rs @@ -2,12 +2,13 @@ // SPDX-License-Identifier: Apache-2.0 //! Trace buffer that batches trace chunks and periodically flushes them through a -//! [`TraceExporter`]. A background worker handles the actual export, allowing callers to +//! [`TraceExporter`]. +//! +//! A background worker handles the actual export, allowing callers to //! enqueue traces without blocking on network I/O (unless synchronous mode is enabled). use std::{ fmt::{self, Debug}, - ops::DerefMut, pin::Pin, sync::{Arc, Condvar, Mutex, MutexGuard}, time::{Duration, Instant}, @@ -82,8 +83,7 @@ where match v { AttributeArrayValue::String(s) => s.as_ref().len(), AttributeArrayValue::Boolean(_) => 1, - AttributeArrayValue::Integer(_) => 8, - AttributeArrayValue::Double(_) => 8, + AttributeArrayValue::Integer(_) | AttributeArrayValue::Double(_) => 8, } } @@ -97,24 +97,27 @@ pub struct TraceBufferConfig { } impl TraceBufferConfig { + #[must_use] pub fn new() -> Self { Self::default() } /// Whether the async exporter waits for the trace chunks to be exported before returning from - /// export_chunk - pub fn synchronous_export(self, synchronous_writes: bool) -> Self { + /// `export_chunk` + #[must_use] + pub const fn synchronous_export(self, synchronous_writes: bool) -> Self { Self { synchronous_export: synchronous_writes, ..self } } - /// The maximum amount of time the export_chunk waits for a flush if synchronous_writes is - /// enabled. If this is zero send_chunk will always return an error + /// The maximum amount of time the `export_chunk` waits for a flush if `synchronous_writes` is + /// enabled. If this is zero `send_chunk` will always return an error /// /// If it is None, the send will wait forever - pub fn synchronous_export_timeout(self, timeout: Option) -> Self { + #[must_use] + pub const fn synchronous_export_timeout(self, timeout: Option) -> Self { Self { synchronous_export_timeout: timeout, ..self @@ -122,7 +125,8 @@ impl TraceBufferConfig { } /// The maximum amount of time between two flushes - pub fn max_flush_interval(self, interval: Duration) -> Self { + #[must_use] + pub const fn max_flush_interval(self, interval: Duration) -> Self { Self { max_flush_interval: interval, ..self @@ -130,7 +134,8 @@ impl TraceBufferConfig { } /// The maximum number of bytes that will be buffered before we drop data - pub fn max_buffered_bytes(self, max: usize) -> Self { + #[must_use] + pub const fn max_buffered_bytes(self, max: usize) -> Self { Self { max_buffered_bytes: max, ..self @@ -138,7 +143,8 @@ impl TraceBufferConfig { } /// The number of bytes that will be buffered before we decide to flush - pub fn flush_threshold_bytes(self, threshold: usize) -> Self { + #[must_use] + pub const fn flush_threshold_bytes(self, threshold: usize) -> Self { Self { flush_threshold_bytes: threshold, ..self @@ -189,7 +195,7 @@ struct Batch { last_flush: Instant, byte_count: usize, max_buffered_bytes: usize, - batch_gen: BatchGeneration, + generation: BatchGeneration, } // Pre-allocate the batch buffer to avoid reallocations on small sizes. @@ -198,13 +204,13 @@ const PRE_ALLOCATE_CHUNKS: usize = 400; impl Batch { fn new(max_buffered_bytes: usize) -> Self { - let mut batch_gen = BatchGeneration::default(); - batch_gen.incr(); + let mut generation = BatchGeneration::default(); + generation.incr(); Self { chunks: Vec::with_capacity(PRE_ALLOCATE_CHUNKS), last_flush: Instant::now(), byte_count: 0, - batch_gen, + generation, max_buffered_bytes, } } @@ -214,17 +220,17 @@ impl Batch { chunks, last_flush, byte_count, - batch_gen, + generation, max_buffered_bytes: _max_buffered_bytes, } = self; chunks.clear(); *last_flush = Instant::now(); *byte_count = 0; - *batch_gen = { - let mut batch_gen = BatchGeneration::default(); - batch_gen.incr(); - batch_gen + *generation = { + let mut generation = BatchGeneration::default(); + generation.incr(); + generation }; } @@ -248,7 +254,7 @@ impl Batch { return Ok(()); } - self.byte_count += chunk.iter().map(|s| s.byte_size()).sum::(); + self.byte_count += chunk.iter().map(BufferSize::byte_size).sum::(); self.chunks.push(chunk); Ok(()) } @@ -259,16 +265,16 @@ impl Batch { self.byte_count = 0; self.last_flush = Instant::now(); if !chunks.is_empty() { - self.batch_gen.incr(); + self.generation.incr(); } chunks } } -/// # TraceBuffer +/// # `TraceBuffer` /// -/// Creating an instance of the TraceBuffer will spawn a background task that -/// periodically sends trace chunks through the TraceExporter +/// Creating an instance of the `TraceBuffer` will spawn a background task that +/// periodically sends trace chunks through the `TraceExporter` /// /// # Buffering behavior /// @@ -291,8 +297,8 @@ pub struct TraceBuffer { /// /// Each batch in the queue will get a generation associated. Generations are strictly /// incremental and processed in order by the background thread. - /// When the background thread processes a batch it will increment it's 'last_flushed_batch' - /// and an export can wait until the 'last_flushed_batch' is equal to the batch it added it's + /// When the background thread processes a batch it will increment it's '`last_flushed_batch`' + /// and an export can wait until the '`last_flushed_batch`' is equal to the batch it added it's /// trace chunks to. synchronous_export: bool, synchronous_export_timeout: Option, @@ -301,6 +307,7 @@ pub struct TraceBuffer { pub type ResponseHandler = Box) + Send + Sync>; impl TraceBuffer { + #[must_use] pub fn new( config: TraceBufferConfig, response_handler: ResponseHandler, @@ -322,6 +329,10 @@ impl TraceBuffer { ) } + #[allow( + clippy::significant_drop_tightening, + reason = "guard must be held across the condvar wait; early drop would be incorrect" + )] pub fn send_chunk(&self, trace_chunk: Vec) -> Result<(), TraceBufferError> { if trace_chunk.is_empty() { return Ok(()); @@ -343,12 +354,17 @@ impl TraceBuffer { self.tx.trigger_flush() } + #[must_use] pub fn queue_metrics(&self) -> QueueMetricsFetcher { QueueMetricsFetcher { waiter: self.tx.waiter.clone(), } } + #[allow( + clippy::significant_drop_tightening, + reason = "guard must be held across the condvar wait; early drop would be incorrect" + )] pub fn wait_shutdown_done(&self, timeout: Duration) -> Result<(), TraceBufferError> { self.tx.wait_shutdown_done(timeout) } @@ -365,6 +381,7 @@ pub struct QueueMetricsFetcher { } impl QueueMetricsFetcher { + #[must_use] pub fn get_metrics(&self) -> QueueMetrics { let Some(mut state) = self.waiter.state.lock().ok() else { return QueueMetrics::default(); @@ -412,6 +429,10 @@ struct Sender { } impl Sender { + #[allow( + clippy::significant_drop_tightening, + reason = "guard must be held across the condvar wait; early drop would be incorrect" + )] fn wait_flush_done( &self, flush_gen: BatchGeneration, @@ -426,21 +447,23 @@ impl Sender { return Err(TraceBufferError::TimedOut(Duration::ZERO)); } let state = self.lock_state()?; - let (_state, res) = self + let (state, res) = self .waiter .sender_notifier .wait_timeout_while(state, timeout, cond) .map_err(|_| TraceBufferError::MutexPoisoned)?; + drop(state); if res.timed_out() { return Err(TraceBufferError::TimedOut(timeout)); } } else { let state = self.lock_state()?; - let _state = self + let state = self .waiter .sender_notifier .wait_while(state, cond) .map_err(|_| TraceBufferError::MutexPoisoned)?; + drop(state); } Ok(()) } @@ -471,16 +494,19 @@ impl Sender { return Err(TraceBufferError::BatchFull(e)); } state.metrics.spans_queued += chunk_len; - let gen = state.batch.batch_gen; + let gen = state.batch.generation; if !state.flush_needed && (state.batch.byte_count > self.flush_trigger_bytes || self.synchronous_write) { state.flush_needed = true; self.waiter.notify_receiver(state); + } else { + drop(state); } Ok(gen) } + #[allow(clippy::significant_drop_tightening)] // notify_receiver consumes the guard by value fn trigger_flush(&self) -> Result<(), TraceBufferError> { let mut state = self.get_running_state()?; state.flush_needed = true; @@ -488,16 +514,21 @@ impl Sender { Ok(()) } + #[allow( + clippy::significant_drop_tightening, + reason = "guard must be held across the condvar wait; early drop would be incorrect" + )] fn wait_shutdown_done(&self, timeout: Duration) -> Result<(), TraceBufferError> { if timeout.is_zero() { return Err(TraceBufferError::TimedOut(Duration::ZERO)); } let state = self.lock_state()?; - let (_state, res) = self + let (state, res) = self .waiter .sender_notifier .wait_timeout_while(state, timeout, |state| !state.has_shutdown) .map_err(|_| TraceBufferError::MutexPoisoned)?; + drop(state); if res.timed_out() { return Err(TraceBufferError::TimedOut(timeout)); } @@ -514,6 +545,7 @@ impl Receiver { self.waiter.state.lock().map_err(|_| MutexPoisonedError) } + #[allow(clippy::significant_drop_tightening)] // notify_sender consumes the guard by value fn shutdown_done(&self) -> Result<(), MutexPoisonedError> { let mut state = self.lock_state()?; state.has_shutdown = true; @@ -529,15 +561,20 @@ impl Receiver { has_shutdown, batch, metrics, - } = state.deref_mut(); + } = &mut *state; *flush_needed = false; *last_flush_generation = BatchGeneration::default(); *has_shutdown = false; batch.reset(); *metrics = QueueMetrics::default(); + drop(state); Ok(()) } + #[allow( + clippy::future_not_send, + reason = "FIXME: remove when MSRV > 1.84.1 — regression in 1.84.1 patch: future_not_send fires spuriously on async fns holding non-Send/Sync generics across .await even when the future is never required to be Send; fixed in 1.85.0" + )] async fn receive(&self, timeout: Duration) -> Result>, MutexPoisonedError> { loop { // Enable the notify future BEFORE acquiring the lock to avoid lost wakeups: @@ -563,8 +600,8 @@ impl Receiver { tokio::select! { biased; - _ = notified.as_mut() => {} // woken by sender; loop to re-check state - _ = tokio::time::sleep(leftover) => { + () = notified.as_mut() => {} // woken by sender; loop to re-check state + () = tokio::time::sleep(leftover) => { let mut state = self.lock_state()?; return Ok(state.batch.export()); } @@ -572,6 +609,7 @@ impl Receiver { } } + #[allow(clippy::significant_drop_tightening)] // notify_sender consumes the guard by value fn ack_export(&self) -> Result<(), MutexPoisonedError> { let mut state = self.lock_state()?; state.last_flush_generation.incr(); @@ -584,7 +622,7 @@ impl Receiver { struct BatchGeneration(u64); impl BatchGeneration { - fn incr(&mut self) { + const fn incr(&mut self) { self.0 = self.0.wrapping_add(1); } } @@ -645,7 +683,7 @@ pub struct DefaultExport DefaultExport { - pub fn new(trace_exporter: TraceExporter) -> Self { + pub const fn new(trace_exporter: TraceExporter) -> Self { Self { trace_exporter } } } @@ -699,7 +737,7 @@ impl std::fmt::Debug for TraceExporterWorker { .field("export_operation", &self.export_operation) .field("config", &self.config) .field("run_input", &self.run_input) - .finish() + .finish_non_exhaustive() } } @@ -738,7 +776,7 @@ impl Worker for TraceExporterWorker { }; if !trace_chunks.is_empty() { self.export_trace_chunks(trace_chunks).await; - if let Err(MutexPoisonedError) = self.rx.ack_export() {} + if matches!(self.rx.ack_export(), Err(MutexPoisonedError)) {} } } @@ -748,7 +786,7 @@ impl Worker for TraceExporterWorker { #[allow(clippy::unwrap_used)] self.export_operation.wait_ready().await.unwrap(); } - self.trigger().await + self.trigger().await; } async fn trigger(&mut self) { @@ -849,13 +887,13 @@ mod tests { Box::new(|chunks| { assert_eq!(chunks.len(), 2); let mut lengths = chunks.into_iter().map(|c| c.len()).collect::>(); - lengths.sort(); + lengths.sort_unstable(); assert_eq!(lengths, &[1, 2]); }), TraceBufferConfig::default() .max_buffered_bytes(4) .flush_threshold_bytes(2) - .max_flush_interval(Duration::from_secs(u32::MAX as u64)), + .max_flush_interval(Duration::from_secs(u64::from(u32::MAX))), ); std::thread::scope(|s| { @@ -884,7 +922,7 @@ mod tests { TraceBufferConfig::default() .max_buffered_bytes(4) .flush_threshold_bytes(3) - .max_flush_interval(Duration::from_secs(u32::MAX as u64)), + .max_flush_interval(Duration::from_secs(u64::from(u32::MAX))), ); // pause @@ -980,7 +1018,7 @@ mod tests { TraceBufferConfig::default() .max_buffered_bytes(100) .flush_threshold_bytes(100) - .max_flush_interval(Duration::from_secs(u32::MAX as u64)), + .max_flush_interval(Duration::from_secs(u64::from(u32::MAX))), ); sender.send_chunk(vec![()]).unwrap(); diff --git a/libdd-data-pipeline/src/trace_exporter/agent_response.rs b/libdd-data-pipeline/src/trace_exporter/agent_response.rs index 02b4ae3fa6..89c235150d 100644 --- a/libdd-data-pipeline/src/trace_exporter/agent_response.rs +++ b/libdd-data-pipeline/src/trace_exporter/agent_response.rs @@ -10,7 +10,7 @@ pub const DATADOG_RATES_PAYLOAD_VERSION: HeaderName = HeaderName::from_static("datadog-rates-payload-version"); /// `AgentResponse` structure holds agent response information upon successful request. -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Eq)] pub enum AgentResponse { Unchanged, Changed { body: String }, @@ -23,7 +23,7 @@ pub(crate) struct AgentResponsePayloadVersion { impl AgentResponsePayloadVersion { pub fn new() -> Self { - AgentResponsePayloadVersion { + Self { payload_version: ArcSwap::new(Arc::new("0".to_string())), } } diff --git a/libdd-data-pipeline/src/trace_exporter/builder.rs b/libdd-data-pipeline/src/trace_exporter/builder.rs index ecc8dad0f8..a57e7c062a 100644 --- a/libdd-data-pipeline/src/trace_exporter/builder.rs +++ b/libdd-data-pipeline/src/trace_exporter/builder.rs @@ -28,6 +28,10 @@ use std::time::Duration; const DEFAULT_AGENT_URL: &str = "http://127.0.0.1:8126"; #[allow(missing_docs)] +#[allow( + clippy::struct_excessive_bools, + reason = "builder flags are independent feature toggles; enum refactor would break the API" +)] #[derive(Debug, Default)] pub struct TraceExporterBuilder { url: Option, @@ -162,26 +166,29 @@ impl TraceExporterBuilder { } #[allow(missing_docs)] - pub fn set_input_format(&mut self, input_format: TraceExporterInputFormat) -> &mut Self { + pub const fn set_input_format(&mut self, input_format: TraceExporterInputFormat) -> &mut Self { self.input_format = input_format; self } #[allow(missing_docs)] - pub fn set_output_format(&mut self, output_format: TraceExporterOutputFormat) -> &mut Self { + pub const fn set_output_format( + &mut self, + output_format: TraceExporterOutputFormat, + ) -> &mut Self { self.output_format = output_format; self } /// Set the header indicating the tracer has computed the top-level tag - pub fn set_client_computed_top_level(&mut self) -> &mut Self { + pub const fn set_client_computed_top_level(&mut self) -> &mut Self { self.client_computed_top_level = true; self } /// Set the header indicating the tracer has already computed stats. /// This should not be used when stats computation is enabled. - pub fn set_client_computed_stats(&mut self) -> &mut Self { + pub const fn set_client_computed_stats(&mut self) -> &mut Self { self.client_computed_stats = true; self } @@ -193,7 +200,7 @@ impl TraceExporterBuilder { } /// Enable stats computation on traces sent through this exporter - pub fn enable_stats(&mut self, bucket_size: Duration) -> &mut Self { + pub const fn enable_stats(&mut self, bucket_size: Duration) -> &mut Self { self.stats_bucket_size = Some(bucket_size); self } @@ -208,7 +215,7 @@ impl TraceExporterBuilder { /// Enable stats eligibility by span kind (requires stats computation to be /// enabled) - pub fn enable_compute_stats_by_span_kind(&mut self) -> &mut Self { + pub const fn enable_compute_stats_by_span_kind(&mut self) -> &mut Self { self.compute_stats_by_span_kind = true; self } @@ -219,7 +226,7 @@ impl TraceExporterBuilder { /// `obfuscation_version` via the `/info` endpoint. When disabled, no /// `datadog-obfuscation-version` header is sent on stats payloads. #[cfg(feature = "stats-obfuscation")] - pub fn enable_client_side_stats_obfuscation(&mut self) -> &mut Self { + pub const fn enable_client_side_stats_obfuscation(&mut self) -> &mut Self { self.client_side_stats_obfuscation_enabled = true; self } @@ -247,19 +254,19 @@ impl TraceExporterBuilder { } /// Enables health metrics emission. - pub fn enable_health_metrics(&mut self) -> &mut Self { + pub const fn enable_health_metrics(&mut self) -> &mut Self { self.health_metrics_enabled = true; self } /// Enables storing and checking the agent payload - pub fn enable_agent_rates_payload_version(&mut self) -> &mut Self { + pub const fn enable_agent_rates_payload_version(&mut self) -> &mut Self { self.agent_rates_payload_version_enabled = true; self } /// Sets the agent's connection timeout. - pub fn set_connection_timeout(&mut self, timeout_ms: Option) -> &mut Self { + pub const fn set_connection_timeout(&mut self, timeout_ms: Option) -> &mut Self { self.connection_timeout = timeout_ms; self } @@ -287,6 +294,11 @@ impl TraceExporterBuilder { } #[allow(missing_docs)] + #[allow(clippy::items_after_statements)] + #[allow( + clippy::too_many_lines, + reason = "linear wiring function; splitting adds helpers with no independent meaning" + )] pub fn build( self, ) -> Result, TraceExporterError> { @@ -359,6 +371,10 @@ impl TraceExporterBuilder { let _ = info_fetcher_handle; #[allow(unused_mut)] + #[allow( + clippy::useless_let_if_seq, + reason = "cfg guard on the if-block prevents the if-let-else rewrite clippy wants" + )] let mut stats = StatsComputationStatus::Disabled; #[cfg(not(target_arch = "wasm32"))] if let Some(bucket_size) = self.stats_bucket_size { @@ -432,8 +448,7 @@ impl TraceExporterBuilder { headers, timeout: self .connection_timeout - .map(Duration::from_millis) - .unwrap_or(DEFAULT_OTLP_TIMEOUT), + .map_or(DEFAULT_OTLP_TIMEOUT, Duration::from_millis), protocol: OtlpProtocol::HttpJson, } }); @@ -441,10 +456,10 @@ impl TraceExporterBuilder { Ok(TraceExporter { endpoint: Endpoint { url: agent_url, - test_token: self.test_session_token.map(|token| token.into()), + test_token: self.test_session_token.map(std::convert::Into::into), timeout_ms: self .connection_timeout - .unwrap_or(Endpoint::default().timeout_ms), + .unwrap_or_else(|| Endpoint::default().timeout_ms), ..Default::default() }, metadata: TracerMetadata { @@ -498,7 +513,7 @@ impl TraceExporterBuilder { }) } - fn is_inputs_outputs_formats_compatible( + const fn is_inputs_outputs_formats_compatible( input: TraceExporterInputFormat, output: TraceExporterOutputFormat, ) -> bool { diff --git a/libdd-data-pipeline/src/trace_exporter/error.rs b/libdd-data-pipeline/src/trace_exporter/error.rs index 1e83445da5..ca9bc857af 100644 --- a/libdd-data-pipeline/src/trace_exporter/error.rs +++ b/libdd-data-pipeline/src/trace_exporter/error.rs @@ -11,7 +11,7 @@ use std::error::Error; use std::fmt::{Debug, Display}; /// Represents different kinds of errors that can occur when interacting with the agent. -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Eq)] pub enum AgentErrorKind { /// Indicates that the agent returned an empty response. EmptyResponse, @@ -20,13 +20,13 @@ pub enum AgentErrorKind { impl Display for AgentErrorKind { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - AgentErrorKind::EmptyResponse => write!(f, "Agent empty response"), + Self::EmptyResponse => write!(f, "Agent empty response"), } } } /// Represents different kinds of errors that can occur during the builder process. -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Eq)] pub enum BuilderErrorKind { /// Represents an error when an invalid URI is provided. /// The associated `String` contains underlying error message. @@ -40,11 +40,11 @@ pub enum BuilderErrorKind { impl Display for BuilderErrorKind { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - BuilderErrorKind::InvalidUri(msg) => write!(f, "Invalid URI: {msg}"), - BuilderErrorKind::InvalidTelemetryConfig(msg) => { + Self::InvalidUri(msg) => write!(f, "Invalid URI: {msg}"), + Self::InvalidTelemetryConfig(msg) => { write!(f, "Invalid telemetry configuration: {msg}") } - BuilderErrorKind::InvalidConfiguration(msg) => { + Self::InvalidConfiguration(msg) => { write!(f, "Invalid configuration: {msg}") } } @@ -52,7 +52,7 @@ impl Display for BuilderErrorKind { } /// Represents different kinds of internal errors. -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Eq)] pub enum InternalErrorKind { /// Indicates that some background workers are in an invalid state. The associated `String` /// contains the error message. @@ -62,7 +62,7 @@ pub enum InternalErrorKind { impl Display for InternalErrorKind { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - InternalErrorKind::InvalidWorkerState(msg) => { + Self::InvalidWorkerState(msg) => { write!(f, "Invalid worker state: {msg}") } } @@ -111,7 +111,8 @@ impl NetworkError { } } - pub fn kind(&self) -> NetworkErrorKind { + #[must_use] + pub const fn kind(&self) -> NetworkErrorKind { self.kind } } @@ -123,7 +124,7 @@ impl Display for NetworkError { } } -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Eq)] pub struct RequestError { code: StatusCode, msg: String, @@ -140,6 +141,7 @@ impl Display for RequestError { } impl RequestError { + #[must_use] pub fn new(code: StatusCode, msg: &str) -> Self { Self { code, @@ -147,10 +149,12 @@ impl RequestError { } } - pub fn status(&self) -> StatusCode { + #[must_use] + pub const fn status(&self) -> StatusCode { self.code } + #[must_use] pub fn msg(&self) -> &str { &self.msg } @@ -164,14 +168,14 @@ pub enum ShutdownError { impl Display for ShutdownError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - ShutdownError::TimedOut(dur) => { + Self::TimedOut(dur) => { write!(f, "Shutdown timed out after {}s", dur.as_secs_f32()) } } } } -/// TraceExporterError holds different types of errors that occur when handling traces. +/// `TraceExporterError` holds different types of errors that occur when handling traces. #[derive(Debug)] pub enum TraceExporterError { /// Error in agent response processing. @@ -199,18 +203,18 @@ pub enum TraceExporterError { impl Display for TraceExporterError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - TraceExporterError::Agent(e) => write!(f, "Agent response processing: {e}"), - TraceExporterError::Builder(e) => write!(f, "Invalid builder input: {e}"), - TraceExporterError::Internal(e) => write!(f, "Internal: {e}"), - TraceExporterError::Deserialization(e) => { + Self::Agent(e) => write!(f, "Agent response processing: {e}"), + Self::Builder(e) => write!(f, "Invalid builder input: {e}"), + Self::Internal(e) => write!(f, "Internal: {e}"), + Self::Deserialization(e) => { write!(f, "Deserialization of incoming payload: {e}") } - TraceExporterError::Io(e) => write!(f, "IO: {e}"), - TraceExporterError::Shutdown(e) => write!(f, "Shutdown: {e}"), - TraceExporterError::Telemetry(e) => write!(f, "Telemetry: {e}"), - TraceExporterError::Network(e) => write!(f, "Network: {e}"), - TraceExporterError::Request(e) => write!(f, "Agent responded with an error code: {e}"), - TraceExporterError::Serialization(e) => { + Self::Io(e) => write!(f, "IO: {e}"), + Self::Shutdown(e) => write!(f, "Shutdown: {e}"), + Self::Telemetry(e) => write!(f, "Telemetry: {e}"), + Self::Network(e) => write!(f, "Network: {e}"), + Self::Request(e) => write!(f, "Agent responded with an error code: {e}"), + Self::Serialization(e) => { write!(f, "Serialization of trace payload payload: {e}") } } @@ -219,21 +223,21 @@ impl Display for TraceExporterError { impl From for TraceExporterError { fn from(value: EncodeError) -> Self { - TraceExporterError::Serialization(value) + Self::Serialization(value) } } impl From for TraceExporterError { fn from(value: http::uri::InvalidUri) -> Self { - TraceExporterError::Builder(BuilderErrorKind::InvalidUri(value.to_string())) + Self::Builder(BuilderErrorKind::InvalidUri(value.to_string())) } } impl From for TraceExporterError { fn from(err: http_common::Error) -> Self { match err { - http_common::Error::Client(e) => TraceExporterError::from(e), - http_common::Error::Other(e) => TraceExporterError::Network(NetworkError { + http_common::Error::Client(e) => Self::from(e), + http_common::Error::Other(e) => Self::Network(NetworkError { kind: NetworkErrorKind::Unknown, source: e, }), @@ -254,25 +258,25 @@ impl From for TraceExporterError { ErrorKind::Timeout => NetworkErrorKind::TimedOut, ErrorKind::Other => NetworkErrorKind::Unknown, }; - TraceExporterError::Network(NetworkError::new(network_kind, err)) + Self::Network(NetworkError::new(network_kind, err)) } } impl From for TraceExporterError { fn from(err: DecodeError) -> Self { - TraceExporterError::Deserialization(err) + Self::Deserialization(err) } } impl From for TraceExporterError { fn from(err: std::io::Error) -> Self { - TraceExporterError::Io(err) + Self::Io(err) } } impl From for TraceExporterError { fn from(err: http::Error) -> Self { - TraceExporterError::Network(NetworkError { + Self::Network(NetworkError { kind: NetworkErrorKind::Parse, source: err.into(), }) @@ -281,7 +285,7 @@ impl From for TraceExporterError { impl From for TraceExporterError { fn from(err: libdd_capabilities::HttpError) -> Self { - TraceExporterError::Network(NetworkError { + Self::Network(NetworkError { kind: match &err { libdd_capabilities::HttpError::Timeout => NetworkErrorKind::TimedOut, libdd_capabilities::HttpError::Network(_) => NetworkErrorKind::ConnectionClosed, @@ -299,9 +303,9 @@ impl From for TraceExporterError { fn from(value: TelemetryError) -> Self { match value { TelemetryError::Builder(e) => { - TraceExporterError::Builder(BuilderErrorKind::InvalidTelemetryConfig(e)) + Self::Builder(BuilderErrorKind::InvalidTelemetryConfig(e)) } - TelemetryError::Send(e) => TraceExporterError::Telemetry(e), + TelemetryError::Send(e) => Self::Telemetry(e), } } } @@ -316,6 +320,6 @@ mod tests { fn test_request_error() { let error = RequestError::new(StatusCode::NOT_FOUND, "Not found"); assert_eq!(error.status(), StatusCode::NOT_FOUND); - assert_eq!(error.msg(), "Not found") + assert_eq!(error.msg(), "Not found"); } } diff --git a/libdd-data-pipeline/src/trace_exporter/metrics.rs b/libdd-data-pipeline/src/trace_exporter/metrics.rs index 776ddee598..01774b7969 100644 --- a/libdd-data-pipeline/src/trace_exporter/metrics.rs +++ b/libdd-data-pipeline/src/trace_exporter/metrics.rs @@ -7,7 +7,7 @@ use libdd_common::tag::Tag; use libdd_dogstatsd_client::{Client, DogStatsDAction}; use tracing::debug; -/// Handles emission of health metrics to DogStatsD +/// Handles emission of health metrics to `DogStatsD` #[derive(Debug)] pub(crate) struct MetricsEmitter<'a> { dogstatsd: Option<&'a Client>, @@ -15,8 +15,8 @@ pub(crate) struct MetricsEmitter<'a> { } impl<'a> MetricsEmitter<'a> { - /// Create a new MetricsEmitter - pub(crate) fn new(dogstatsd: Option<&'a Client>, common_tags: &'a [Tag]) -> Self { + /// Create a new `MetricsEmitter` + pub(crate) const fn new(dogstatsd: Option<&'a Client>, common_tags: &'a [Tag]) -> Self { Self { dogstatsd, common_tags, @@ -24,6 +24,10 @@ impl<'a> MetricsEmitter<'a> { } /// Emit a health metric to dogstatsd + #[allow( + clippy::cast_precision_loss, + reason = "metric values losing mantissa precision at >2^53 has no observable effect" + )] pub(crate) fn emit(&self, metric: HealthMetric, custom_tags: Option>) { let has_custom_tags = custom_tags.is_some(); if let Some(flusher) = self.dogstatsd { @@ -39,7 +43,7 @@ impl<'a> MetricsEmitter<'a> { has_custom_tags = has_custom_tags, "Emitting health metric to dogstatsd" ); - flusher.send(vec![DogStatsDAction::Count(name, c, tags.into_iter())]) + flusher.send(vec![DogStatsDAction::Count(name, c, tags.into_iter())]); } HealthMetric::Distribution(name, value) => { debug!( @@ -52,7 +56,7 @@ impl<'a> MetricsEmitter<'a> { name, value as f64, tags.into_iter(), - )]) + )]); } } } else { @@ -63,9 +67,9 @@ impl<'a> MetricsEmitter<'a> { } } - /// Emit all health metrics from a SendResult + /// Emit all health metrics from a `SendResult` /// - /// This method processes the SendResult and emits all appropriate metrics + /// This method processes the `SendResult` and emits all appropriate metrics /// based on the operation's outcome (success/failure, error type, etc.) pub(crate) fn emit_from_send_result(&self, result: &SendResult) { for (metric, type_tag_value) in result.collect_metrics() { diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index 561bc56e88..506f867be8 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -63,9 +63,9 @@ pub struct TelemetryInstrumentationSessions { pub parent_session_id: Option, } -/// TraceExporterInputFormat represents the format of the input traces. +/// `TraceExporterInputFormat` represents the format of the input traces. /// The input format can be either Proxy or V0.4, where V0.4 is the default. -#[derive(Copy, Clone, Debug, Default, PartialEq)] +#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)] #[repr(C)] pub enum TraceExporterInputFormat { #[allow(missing_docs)] @@ -74,9 +74,9 @@ pub enum TraceExporterInputFormat { V05, } -/// TraceExporterOutputFormat represents the format of the output traces. +/// `TraceExporterOutputFormat` represents the format of the output traces. /// The output format can be either V0.4 or v0.5, where V0.4 is the default. -#[derive(Copy, Clone, Debug, Default, PartialEq)] +#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)] #[repr(C)] pub enum TraceExporterOutputFormat { #[allow(missing_docs)] @@ -87,12 +87,12 @@ pub enum TraceExporterOutputFormat { impl TraceExporterOutputFormat { /// Add the agent trace endpoint path to the URL. - fn add_path(&self, url: &Uri) -> Uri { + fn add_path(self, url: &Uri) -> Uri { add_path( url, match self { - TraceExporterOutputFormat::V04 => "/v0.4/traces", - TraceExporterOutputFormat::V05 => "/v0.5/traces", + Self::V04 => "/v0.4/traces", + Self::V05 => "/v0.5/traces", }, ) } @@ -146,7 +146,7 @@ pub struct TracerMetadata { } impl<'a> From<&'a TracerMetadata> for TracerHeaderTags<'a> { - fn from(tags: &'a TracerMetadata) -> TracerHeaderTags<'a> { + fn from(tags: &'a TracerMetadata) -> Self { TracerHeaderTags::<'_> { lang: &tags.language, lang_version: &tags.language_version, @@ -161,7 +161,7 @@ impl<'a> From<&'a TracerMetadata> for TracerHeaderTags<'a> { } impl<'a> From<&'a TracerMetadata> for HeaderMap { - fn from(tags: &'a TracerMetadata) -> HeaderMap { + fn from(tags: &'a TracerMetadata) -> Self { TracerHeaderTags::from(tags).into() } } @@ -175,7 +175,7 @@ pub(crate) struct TraceExporterWorkers { telemetry: Option, } -/// The TraceExporter ingest traces from the tracers serialized as messagepack and forward them to +/// The `TraceExporter` ingest traces from the tracers serialized as messagepack and forward them to /// the agent while applying some transformation. /// /// # Proxy @@ -183,8 +183,9 @@ pub(crate) struct TraceExporterWorkers { /// deserializing them. /// /// # Features -/// When the input format is set to `V04` the TraceExporter will deserialize the traces and perform -/// some operation before sending them to the agent. The available operations are described below. +/// When the input format is set to `V04` the `TraceExporter` will deserialize the traces and +/// perform some operation before sending them to the agent. The available operations are described +/// below. /// /// ## V07 Serialization /// The Trace exporter can serialize the traces to V07 before sending them to the agent. @@ -192,8 +193,9 @@ pub(crate) struct TraceExporterWorkers { /// ## Stats computation /// The Trace Exporter can compute stats on traces. In this case the trace exporter will start /// another task to send stats when a time bucket expire. When this feature is enabled the -/// TraceExporter drops all spans that may not be sampled by the agent. +/// `TraceExporter` drops all spans that may not be sampled by the agent. #[allow(missing_docs)] +#[derive(Copy, Clone)] enum DeserInputFormat { V04, V05, @@ -202,8 +204,8 @@ enum DeserInputFormat { impl From for DeserInputFormat { fn from(f: TraceExporterInputFormat) -> Self { match f { - TraceExporterInputFormat::V04 => DeserInputFormat::V04, - TraceExporterInputFormat::V05 => DeserInputFormat::V05, + TraceExporterInputFormat::V04 => Self::V04, + TraceExporterInputFormat::V05 => Self::V05, } } } @@ -240,6 +242,7 @@ pub struct TraceExporter TraceExporter { #[allow(missing_docs)] + #[must_use] pub fn builder() -> TraceExporterBuilder { TraceExporterBuilder::default() } @@ -311,12 +314,16 @@ impl Tra /// # Arguments /// /// * data: A slice containing the serialized traces. This slice should be encoded following the - /// input_format passed to the TraceExporter on creating. + /// `input_format` passed to the `TraceExporter` on creating. /// /// # Returns /// * Ok(AgentResponse): The response from the agent /// * Err(TraceExporterError): An error detailing what went wrong in the process #[cfg(not(target_arch = "wasm32"))] + #[allow( + clippy::cast_possible_wrap, + reason = "trace counts never realistically exceed i64::MAX" + )] pub fn send(&self, data: &[u8]) -> Result { self.check_agent_info(); @@ -353,6 +360,10 @@ impl Tra trace_count = traces.len(), "Trace deserialization completed successfully" ); + #[allow( + clippy::cast_possible_wrap, + reason = "trace counts never realistically exceed i64::MAX" + )] self.emit_metric( HealthMetric::Count(health_metrics::DESERIALIZE_TRACES, traces.len() as i64), None, @@ -375,7 +386,7 @@ impl Tra .previous_info_state .load() .as_deref() - .map(|s| s.as_str()) + .map(std::string::String::as_str) } #[cfg(not(target_arch = "wasm32"))] @@ -414,7 +425,7 @@ impl Tra } } self.previous_info_state - .store(Some(agent_info.state_hash.clone().into())) + .store(Some(agent_info.state_hash.clone().into())); } } } @@ -427,7 +438,7 @@ impl Tra /// !!! This function is only for testing purposes !!! /// - /// Waits the agent info to be ready by checking the agent_info state. + /// Waits the agent info to be ready by checking the `agent_info` state. /// It will only return Ok after the agent info has been fetched at least once or Err if timeout /// has been reached /// @@ -436,9 +447,9 @@ impl Tra /// 2) It's not guaranteed to not block forever, since the /info endpoint might not be /// available. /// - /// The `send` function will check agent_info when running, which will only be available if the - /// fetcher had time to reach to the agent. - /// Since agent_info can enable CSS computation, waiting for this during testing can make + /// The `send` function will check `agent_info` when running, which will only be available if + /// the fetcher had time to reach to the agent. + /// Since `agent_info` can enable CSS computation, waiting for this during testing can make /// snapshots non-deterministic. #[cfg(feature = "test-utils")] pub async fn wait_agent_info_ready(&self, timeout: Duration) -> anyhow::Result<()> { @@ -462,7 +473,7 @@ impl Tra } } - /// Emit all health metrics from a SendResult + /// Emit all health metrics from a `SendResult` fn emit_send_result(&self, result: &SendResult) { if self.health_metrics_enabled { let emitter = MetricsEmitter::new(self.dogstatsd.as_ref(), &self.common_stats_tags); @@ -473,7 +484,7 @@ impl Tra /// Send a list of trace chunks to the agent (or OTLP endpoint when configured). /// /// # Arguments - /// * trace_chunks: A list of trace chunks. Each trace chunk is a list of spans. + /// * `trace_chunks`: A list of trace chunks. Each trace chunk is a list of spans. /// /// # Returns /// * Ok(AgentResponse): The response from the agent (or Unchanged for OTLP) @@ -491,11 +502,15 @@ impl Tra /// Send a list of trace chunks to the agent, asynchronously (or OTLP when configured). /// /// # Arguments - /// * trace_chunks: A list of trace chunks. Each trace chunk is a list of spans. + /// * `trace_chunks`: A list of trace chunks. Each trace chunk is a list of spans. /// /// # Returns /// * Ok(AgentResponse): The response from the agent (or Unchanged for OTLP) /// * Err(TraceExporterError): An error detailing what went wrong in the process + #[allow( + clippy::future_not_send, + reason = "FIXME: remove when MSRV > 1.84.1 — regression in 1.84.1 patch: future_not_send fires spuriously on async fns holding non-Send/Sync generics across .await even when the future is never required to be Send; fixed in 1.85.0" + )] pub async fn send_trace_chunks_async( &self, trace_chunks: Vec>>, @@ -505,6 +520,10 @@ impl Tra } /// Sends trace chunks via OTLP HTTP/JSON when OTLP config is enabled. + #[allow( + clippy::future_not_send, + reason = "FIXME: remove when MSRV > 1.84.1 — regression in 1.84.1 patch: future_not_send fires spuriously on async fns holding non-Send/Sync generics across .await even when the future is never required to be Send; fixed in 1.85.0" + )] async fn send_otlp_traces_inner( &self, traces: Vec>>, @@ -512,12 +531,12 @@ impl Tra ) -> Result { let resource_info = { let mut r = OtlpResourceInfo::default(); - r.service = self.metadata.service.clone(); - r.env = self.metadata.env.clone(); - r.app_version = self.metadata.app_version.clone(); - r.language = self.metadata.language.clone(); - r.tracer_version = self.metadata.tracer_version.clone(); - r.runtime_id = self.metadata.runtime_id.clone(); + r.service.clone_from(&self.metadata.service); + r.env.clone_from(&self.metadata.env); + r.app_version.clone_from(&self.metadata.app_version); + r.language.clone_from(&self.metadata.language); + r.tracer_version.clone_from(&self.metadata.tracer_version); + r.runtime_id.clone_from(&self.metadata.runtime_id); r }; let request = map_traces_to_otlp(traces, &resource_info); @@ -537,6 +556,10 @@ impl Tra /// Deserializes, processes and sends trace chunks to the agent #[cfg(not(target_arch = "wasm32"))] + #[allow( + clippy::cast_possible_wrap, + reason = "trace counts never realistically exceed i64::MAX" + )] fn send_deser( &self, data: &[u8], @@ -601,14 +624,18 @@ impl Tra } } - self.handle_send_result(result, chunks, payload_len).await + self.handle_send_result(result, chunks, payload_len) } + #[allow( + clippy::future_not_send, + reason = "FIXME: remove when MSRV > 1.84.1 — regression in 1.84.1 patch: future_not_send fires spuriously on async fns holding non-Send/Sync generics across .await even when the future is never required to be Send; fixed in 1.85.0" + )] async fn send_trace_chunks_inner( &self, mut traces: Vec>>, ) -> Result { - let mut header_tags: TracerHeaderTags = self.metadata.borrow().into(); + let mut header_tags: TracerHeaderTags<'_> = self.metadata.borrow().into(); // Process stats computation and drop non-sampled (p0) chunks. // This must run before the OTLP path so that unsampled spans are not exported. @@ -662,7 +689,7 @@ impl Tra } /// Handle the result of sending traces to the agent - async fn handle_send_result( + fn handle_send_result( &self, result: SendWithRetryResult, chunks: usize, @@ -670,15 +697,14 @@ impl Tra ) -> Result { match result { Ok((response, attempts)) => { - self.handle_agent_response(chunks, response, payload_len, attempts) - .await + self.handle_agent_response(chunks, &response, payload_len, attempts) } - Err(err) => self.handle_send_error(err, payload_len, chunks).await, + Err(err) => self.handle_send_error(err, payload_len, chunks), } } /// Handle errors from send with retry operation - async fn handle_send_error( + fn handle_send_error( &self, err: SendWithRetryError, payload_len: usize, @@ -688,8 +714,7 @@ impl Tra match err { SendWithRetryError::Http(response, attempts) => { - self.handle_http_send_error(response, payload_len, chunks, attempts) - .await + self.handle_http_send_error(&response, payload_len, chunks, attempts) } SendWithRetryError::Timeout(attempts) => { let send_result = @@ -730,9 +755,9 @@ impl Tra } /// Handle HTTP error responses from send with retry - async fn handle_http_send_error( + fn handle_http_send_error( &self, - response: http::Response, + response: &http::Response, payload_len: usize, chunks: usize, attempts: u32, @@ -740,7 +765,7 @@ impl Tra let status = response.status(); // Check if the agent state has changed for error responses - self.info_response_observer.check_response(&response); + self.info_response_observer.check_response(response); let send_result = SendResult::failure( TransportErrorType::Http(status.as_u16()), @@ -768,10 +793,6 @@ impl Tra self.agent_payload_response_version.as_ref(), version_header, ) { - (false, _, _) => { - // If the status is not success, the rates are considered unchanged - false - } (true, None, _) => { // if the agent_payload_response_version fingerprint is not enabled we always // return the new rates @@ -792,30 +813,30 @@ impl Tra attempts: u32, body: String, payload_version_changed: bool, - ) -> Result { + ) -> AgentResponse { debug!(chunks = chunks, "Trace chunks sent successfully to agent"); let send_result = SendResult::success(payload_len, chunks, attempts); self.emit_send_result(&send_result); - Ok(if payload_version_changed { + if payload_version_changed { AgentResponse::Changed { body } } else { AgentResponse::Unchanged - }) + } } - async fn handle_agent_response( + fn handle_agent_response( &self, chunks: usize, - response: http::Response, + response: &http::Response, payload_len: usize, attempts: u32, ) -> Result { // Check if the agent state has changed - self.info_response_observer.check_response(&response); + self.info_response_observer.check_response(response); let status = response.status(); - let payload_version_changed = self.check_payload_version_changed(&response); + let payload_version_changed = self.check_payload_version_changed(response); let body = String::from_utf8_lossy(response.body()).to_string(); if !status.is_success() { @@ -835,13 +856,13 @@ impl Tra ))); } - self.handle_successful_trace_response( + Ok(self.handle_successful_trace_response( chunks, payload_len, attempts, body, payload_version_changed, - ) + )) } fn get_agent_url(&self) -> Uri { @@ -899,7 +920,7 @@ mod tests { ..Default::default() }; - let tracer_header_tags: TracerHeaderTags = (&tracer_tags).into(); + let tracer_header_tags: TracerHeaderTags<'_> = (&tracer_tags).into(); assert_eq!(tracer_header_tags.tracer_version, "v0.1"); assert_eq!(tracer_header_tags.lang, "rust"); @@ -943,7 +964,7 @@ mod tests { } fn build_test_exporter( - url: String, + url: &str, dogstatsd_url: Option, input: TraceExporterInputFormat, output: TraceExporterOutputFormat, @@ -952,7 +973,7 @@ mod tests { ) -> TraceExporter { let mut builder = TraceExporterBuilder::default(); builder - .set_url(&url) + .set_url(url) .set_service("test") .set_env("staging") .set_tracer_version("v0.1") @@ -968,7 +989,7 @@ mod tests { if let Some(url) = dogstatsd_url { builder.set_dogstatsd_url(&url); - }; + } if enable_telemetry { builder.enable_telemetry(TelemetryConfig { @@ -994,7 +1015,7 @@ mod tests { }); let exporter = build_test_exporter( - fake_agent.url("/v0.4/traces"), + &fake_agent.url("/v0.4/traces"), Some(stats_socket.local_addr().unwrap().to_string()), TraceExporterInputFormat::V04, TraceExporterOutputFormat::V04, @@ -1064,7 +1085,7 @@ mod tests { let fake_agent = MockServer::start(); let exporter = build_test_exporter( - fake_agent.url("/v0.4/traces"), + &fake_agent.url("/v0.4/traces"), Some(stats_socket.local_addr().unwrap().to_string()), TraceExporterInputFormat::V04, TraceExporterOutputFormat::V04, @@ -1100,7 +1121,7 @@ mod tests { }); let exporter = build_test_exporter( - fake_agent.url("/v0.4/traces"), + &fake_agent.url("/v0.4/traces"), Some(stats_socket.local_addr().unwrap().to_string()), TraceExporterInputFormat::V04, TraceExporterOutputFormat::V04, @@ -1208,7 +1229,7 @@ mod tests { }); let exporter = build_test_exporter( - fake_agent.url("/v0.4/traces"), + &fake_agent.url("/v0.4/traces"), Some(stats_socket.local_addr().unwrap().to_string()), TraceExporterInputFormat::V04, TraceExporterOutputFormat::V04, @@ -1312,7 +1333,7 @@ mod tests { }); let exporter = build_test_exporter( - fake_agent.url("/v0.4/traces"), + &fake_agent.url("/v0.4/traces"), Some(stats_socket.local_addr().unwrap().to_string()), TraceExporterInputFormat::V04, TraceExporterOutputFormat::V04, @@ -1556,7 +1577,7 @@ mod tests { }); let exporter = build_test_exporter( - server.url("/"), + &server.url("/"), None, TraceExporterInputFormat::V05, TraceExporterOutputFormat::V05, @@ -1642,7 +1663,7 @@ mod tests { #[test] #[cfg_attr(miri, ignore)] - /// Tests that if agent_response_payload_version is not enabled + /// Tests that if `agent_response_payload_version` is not enabled /// the exporter always returns the response body fn test_agent_response_payload_version_disabled() { let server = MockServer::start(); @@ -1676,7 +1697,7 @@ mod tests { #[test] #[cfg_attr(miri, ignore)] - /// Tests that if agent_response_payload_version is enabled + /// Tests that if `agent_response_payload_version` is enabled /// the exporter returns the response body only once /// and then returns Unchanged response until the payload version header changes fn test_agent_response_payload_version() { @@ -1962,9 +1983,10 @@ mod single_threaded_tests { // tests on CI where we shutdown before the stats worker had time to start let start_time = std::time::Instant::now(); while !exporter.is_stats_worker_active() { - if start_time.elapsed() > Duration::from_secs(10) { - panic!("Timeout waiting for stats worker to become active"); - } + assert!( + start_time.elapsed() <= Duration::from_secs(10), + "Timeout waiting for stats worker to become active" + ); std::thread::sleep(Duration::from_millis(10)); } @@ -2060,9 +2082,10 @@ mod single_threaded_tests { // tests on CI where we shutdown before the stats worker had time to start let start_time = std::time::Instant::now(); while !exporter.is_stats_worker_active() { - if start_time.elapsed() > Duration::from_secs(10) { - panic!("Timeout waiting for stats worker to become active"); - } + assert!( + start_time.elapsed() <= Duration::from_secs(10), + "Timeout waiting for stats worker to become active" + ); std::thread::sleep(Duration::from_millis(10)); } @@ -2075,13 +2098,13 @@ mod single_threaded_tests { #[cfg(feature = "stats-obfuscation")] fn build_obfuscation_test_exporter( - url: String, + url: &str, runtime: Arc, opt_in: bool, ) -> TraceExporter { let mut builder = TraceExporter::::builder(); builder - .set_url(&url) + .set_url(url) .set_service("test") .set_env("staging") .set_tracer_version("v0.1") @@ -2133,7 +2156,7 @@ mod single_threaded_tests { }); let runtime = Arc::new(SharedRuntime::new().unwrap()); - let exporter = build_obfuscation_test_exporter(server.url("/"), runtime.clone(), opt_in); + let exporter = build_obfuscation_test_exporter(&server.url("/"), runtime.clone(), opt_in); while agent_info::get_agent_info().is_none() { std::thread::sleep(Duration::from_millis(100)); @@ -2148,9 +2171,10 @@ mod single_threaded_tests { let start = std::time::Instant::now(); while !exporter.is_stats_worker_active() { - if start.elapsed() > Duration::from_secs(10) { - panic!("Timeout waiting for stats worker to become active"); - } + assert!( + start.elapsed() <= Duration::from_secs(10), + "Timeout waiting for stats worker to become active" + ); std::thread::sleep(Duration::from_millis(10)); } diff --git a/libdd-data-pipeline/src/trace_exporter/stats.rs b/libdd-data-pipeline/src/trace_exporter/stats.rs index 6e1d8a4f49..0c56afcd73 100644 --- a/libdd-data-pipeline/src/trace_exporter/stats.rs +++ b/libdd-data-pipeline/src/trace_exporter/stats.rs @@ -105,7 +105,7 @@ fn get_span_kinds_for_stats(agent_info: &Arc) -> Vec { pub(crate) fn start_stats_computation< C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static, >( - ctx: &StatsContext, + ctx: &StatsContext<'_>, span_kinds: Vec, peer_tags: Vec, capabilities: C, @@ -132,7 +132,7 @@ pub(crate) fn start_stats_computation< fn create_and_start_stats_worker< C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static, >( - ctx: &StatsContext, + ctx: &StatsContext<'_>, stats_concentrator: &Arc>, capabilities: C, client_side_stats: &StatsComputationConfig, @@ -143,7 +143,7 @@ fn create_and_start_stats_worker< stats_concentrator.clone(), StatsMetadata::from(ctx.metadata.clone()), Endpoint::from_url(add_path(ctx.endpoint_url, STATS_ENDPOINT)), - capabilities.clone(), + capabilities, #[cfg(feature = "stats-obfuscation")] client_side_stats.obfuscation_config.clone(), #[cfg(feature = "stats-obfuscation")] @@ -170,7 +170,7 @@ fn create_and_start_stats_worker< /// /// Used when client-side stats is disabled by the agent pub(crate) fn stop_stats_computation( - ctx: &StatsContext, + ctx: &StatsContext<'_>, client_side_stats: &ArcSwap, ) { if let StatsComputationStatus::Enabled { @@ -196,7 +196,7 @@ pub(crate) fn stop_stats_computation( pub(crate) fn handle_stats_disabled_by_agent< C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static, >( - ctx: &StatsContext, + ctx: &StatsContext<'_>, agent_info: &Arc, capabilities: C, client_side_stats: &StatsComputationConfig, @@ -209,16 +209,15 @@ pub(crate) fn handle_stats_disabled_by_agent< capabilities, client_side_stats, ); - match status { - Ok(()) => { - #[cfg(feature = "stats-obfuscation")] - update_obfuscation_config(agent_info, client_side_stats); - debug!("Client-side stats enabled"); - } - Err(_) => error!("Failed to start stats computation"), + if matches!(status, Ok(())) { + #[cfg(feature = "stats-obfuscation")] + update_obfuscation_config(agent_info, client_side_stats); + debug!("Client-side stats enabled"); + } else { + error!("Failed to start stats computation"); } } else { - debug!("Client-side stats computation has been disabled by the agent") + debug!("Client-side stats computation has been disabled by the agent"); } } @@ -256,7 +255,7 @@ fn update_obfuscation_config( #[cfg(not(target_arch = "wasm32"))] /// Handle stats computation when it's already enabled pub(crate) fn handle_stats_enabled( - ctx: &StatsContext, + ctx: &StatsContext<'_>, agent_info: &Arc, stats_concentrator: &Arc>, client_side_stats: &StatsComputationConfig, @@ -265,11 +264,12 @@ pub(crate) fn handle_stats_enabled( let mut concentrator = stats_concentrator.lock_or_panic(); concentrator.set_span_kinds(get_span_kinds_for_stats(agent_info)); concentrator.set_peer_tags(agent_info.info.peer_tags.clone().unwrap_or_default()); + drop(concentrator); #[cfg(feature = "stats-obfuscation")] update_obfuscation_config(agent_info, client_side_stats); } else { stop_stats_computation(ctx, &client_side_stats.status); - debug!("Client-side stats computation has been disabled by the agent") + debug!("Client-side stats computation has been disabled by the agent"); } } @@ -294,7 +294,7 @@ fn add_spans_to_stats( /// Returns the number of P0 traces and spans that were dropped. pub(crate) fn process_traces_for_stats( traces: &mut Vec>>, - header_tags: &mut libdd_trace_utils::trace_utils::TracerHeaderTags, + header_tags: &mut libdd_trace_utils::trace_utils::TracerHeaderTags<'_>, client_side_stats: &ArcSwap, client_computed_top_level: bool, ) -> libdd_trace_utils::span::trace_utils::DroppedP0Stats { @@ -341,8 +341,8 @@ pub(crate) fn is_stats_worker_active(client_side_stats: &ArcSwap for StatsMetadata { - fn from(m: TracerMetadata) -> StatsMetadata { - StatsMetadata { + fn from(m: TracerMetadata) -> Self { + Self { hostname: m.hostname, env: m.env, app_version: m.app_version, diff --git a/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs b/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs index 005050024f..99e26c39b4 100644 --- a/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs +++ b/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs @@ -40,7 +40,7 @@ pub(super) struct TraceSerializer { impl TraceSerializer { /// Create a new trace serializer - pub(super) fn new(output_format: TraceExporterOutputFormat) -> Self { + pub(super) const fn new(output_format: TraceExporterOutputFormat) -> Self { Self { previous_serialised_len: AtomicUsize::new(MIN_BUFFER_CAPACITY), output_format, @@ -51,13 +51,13 @@ impl TraceSerializer { pub(super) fn prepare_traces_payload( &self, traces: Vec>>, - header_tags: TracerHeaderTags, + header_tags: TracerHeaderTags<'_>, agent_payload_response_version: Option<&AgentResponsePayloadVersion>, ) -> Result { let payload = self.collect_and_process_traces(traces)?; let chunks = payload.size(); let headers = - self.build_traces_headers(header_tags, chunks, agent_payload_response_version); + Self::build_traces_headers(header_tags, chunks, agent_payload_response_version); let mp_payload = self.serialize_payload(&payload)?; Ok(PreparedTracesPayload { @@ -83,8 +83,7 @@ impl TraceSerializer { /// Build HTTP headers for traces request fn build_traces_headers( - &self, - header_tags: TracerHeaderTags, + header_tags: TracerHeaderTags<'_>, chunk_count: usize, agent_payload_response_version: Option<&AgentResponsePayloadVersion>, ) -> HeaderMap { @@ -111,9 +110,10 @@ impl TraceSerializer { .load(Ordering::Relaxed) .max(MIN_BUFFER_CAPACITY); let buff = match payload { - tracer_payload::TraceChunks::V04(p) => { - msgpack_encoder::v04::to_vec_with_capacity(p, capacity as u32) - } + tracer_payload::TraceChunks::V04(p) => msgpack_encoder::v04::to_vec_with_capacity( + p, + u32::try_from(capacity).unwrap_or(u32::MAX), + ), tracer_payload::TraceChunks::V05(p) => { let mut buff = Vec::with_capacity(capacity); rmp_serde::encode::write(&mut buff, p) @@ -186,9 +186,9 @@ mod tests { #[test] fn test_build_traces_headers() { - let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04); + let _serializer = TraceSerializer::new(TraceExporterOutputFormat::V04); let header_tags = create_test_header_tags(); - let headers = serializer.build_traces_headers(header_tags, 3, None); + let headers = TraceSerializer::build_traces_headers(header_tags, 3, None); // Check basic headers are present assert_eq!(headers.get(DATADOG_SEND_REAL_HTTP_STATUS).unwrap(), "1"); @@ -216,9 +216,9 @@ mod tests { #[test] fn test_build_traces_headers_with_agent_version() { let agent_version = AgentResponsePayloadVersion::new(); - let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04); + let _serializer = TraceSerializer::new(TraceExporterOutputFormat::V04); let header_tags = create_test_header_tags(); - let headers = serializer.build_traces_headers(header_tags, 2, Some(&agent_version)); + let headers = TraceSerializer::build_traces_headers(header_tags, 2, Some(&agent_version)); // Check that agent payload version header is included assert!(headers.contains_key(DATADOG_RATES_PAYLOAD_VERSION)); @@ -278,12 +278,12 @@ mod tests { let result = serializer.serialize_payload(&payload); assert!(result.is_ok()); - let serialized = result.unwrap(); - assert!(!serialized.is_empty()); + let serialized_bytes = result.unwrap(); + assert!(!serialized_bytes.is_empty()); // Verify we can deserialize it back and data integrity is preserved let (deserialized_traces, _) = - libdd_trace_utils::msgpack_decoder::v04::from_slice(&serialized).unwrap(); + libdd_trace_utils::msgpack_decoder::v04::from_slice(&serialized_bytes).unwrap(); assert_eq!(deserialized_traces.len(), 1); assert_eq!(deserialized_traces[0].len(), 1); @@ -313,12 +313,12 @@ mod tests { let result = serializer.serialize_payload(&payload); assert!(result.is_ok()); - let serialized = result.unwrap(); - assert!(!serialized.is_empty()); + let serialized_bytes = result.unwrap(); + assert!(!serialized_bytes.is_empty()); // Verify we can deserialize it back and data integrity is preserved let (deserialized_traces, _) = - libdd_trace_utils::msgpack_decoder::v05::from_slice(&serialized).unwrap(); + libdd_trace_utils::msgpack_decoder::v05::from_slice(&serialized_bytes).unwrap(); assert_eq!(deserialized_traces.len(), 1); assert_eq!(deserialized_traces[0].len(), 1); @@ -417,8 +417,8 @@ mod tests { ..Default::default() }; - let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04); - let headers = serializer.build_traces_headers(header_tags, 1, None); + let _serializer = TraceSerializer::new(TraceExporterOutputFormat::V04); + let headers = TraceSerializer::build_traces_headers(header_tags, 1, None); assert_eq!(headers.get("datadog-meta-lang").unwrap(), "python"); assert_eq!(headers.get("datadog-meta-lang-version").unwrap(), "3.9.0"); diff --git a/libdd-data-pipeline/tests/test_fetch_info.rs b/libdd-data-pipeline/tests/test_fetch_info.rs index 81ec7acea8..f62585b743 100644 --- a/libdd-data-pipeline/tests/test_fetch_info.rs +++ b/libdd-data-pipeline/tests/test_fetch_info.rs @@ -19,11 +19,11 @@ mod tracing_integration_tests { let info = fetch_info::(&endpoint) .await .expect("Failed to fetch agent info"); - assert!( + assert_eq!( info.info .version - .expect("Missing version field in agent response") - == "test" + .expect("Missing version field in agent response"), + "test" ); } @@ -46,12 +46,12 @@ mod tracing_integration_tests { .await .expect("Agent request timed out"); - assert!( + assert_eq!( info.info .version .as_ref() - .expect("Missing version field in agent response") - == "test" + .expect("Missing version field in agent response"), + "test" ); } } diff --git a/libdd-trace-normalization/src/normalizer.rs b/libdd-trace-normalization/src/normalizer.rs index 7450dad908..73edfd5937 100644 --- a/libdd-trace-normalization/src/normalizer.rs +++ b/libdd-trace-normalization/src/normalizer.rs @@ -101,7 +101,7 @@ pub fn normalize_chunk(chunk: &mut pb::TraceChunk, root_span_index: usize) -> an if chunk.origin.is_empty() { if let Some(origin) = root_span.meta.get(TAG_ORIGIN) { // Older tracers set origin in the root span. - chunk.origin = origin.to_string(); + chunk.origin = origin.clone(); } } Ok(()) diff --git a/libdd-trace-obfuscation/src/sql.rs b/libdd-trace-obfuscation/src/sql.rs index df5451973e..35e12af4b3 100644 --- a/libdd-trace-obfuscation/src/sql.rs +++ b/libdd-trace-obfuscation/src/sql.rs @@ -35,7 +35,7 @@ impl TryFrom<&str> for DbmsKind { } #[allow(deprecated)] -#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize, PartialEq)] +#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "snake_case")] #[non_exhaustive] pub enum SqlObfuscationMode {