Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ redundant_else = "allow" # less clear
option_if_let_else = "allow" # less clear
unreadable_literal = "allow" # annoying for all the ids in the codebase
cognitive_complexity = "allow" # using clippy::too_many_lines already
inline_always = "allow" # hot-path perf hints are intentional; let the author decide
missing_errors_doc = "allow" # error types are self-descriptive; boilerplate # Errors sections add noise

[workspace.lints.rust]
future_incompatible = { level = "warn", priority = -1 }
Expand Down
3 changes: 3 additions & 0 deletions libdd-data-pipeline/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,6 @@ fips = [
]
test-utils = []
regex-lite = ["libdd-common/regex-lite"]

[lints]
workspace = true
14 changes: 7 additions & 7 deletions libdd-data-pipeline/benches/trace_buffer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0
#![allow(clippy::expect_used, reason = "bench harness: panics are acceptable")]

use std::collections::HashMap;
use std::pin::Pin;
Expand All @@ -18,7 +19,7 @@ use libdd_trace_utils::span::v04::SpanBytes;
// Number of chunks each sender thread sends per benchmark iteration.
const CHUNKS_PER_SENDER: usize = 900;

fn bs(s: &'static str) -> BytesString {
const fn bs(s: &'static str) -> BytesString {
BytesString::from_static(s)
}

Expand Down Expand Up @@ -101,15 +102,14 @@ fn bench_trace_buffer(c: &mut Criterion) {
));

group.bench_function(
BenchmarkId::new(format!("{}_senders", num_senders), delay_label),
BenchmarkId::new(format!("{num_senders}_senders"), delay_label),
|b| {
b.iter_batched(
|| {
Vec::from_iter(
(0..num_senders)
.map(|_| (0..CHUNKS_PER_SENDER).map(|_| vec![make_span()]))
.map(Vec::from_iter),
)
(0..num_senders)
.map(|_| (0..CHUNKS_PER_SENDER).map(|_| vec![make_span()]))
.map(std::iter::Iterator::collect::<Vec<_>>)
.collect::<Vec<_>>()
},
|input| {
std::thread::scope(|s| {
Expand Down
8 changes: 7 additions & 1 deletion libdd-data-pipeline/examples/send-traces-with-stats.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0
#![allow(
clippy::expect_used,
clippy::cast_possible_wrap,
clippy::cast_possible_truncation,
reason = "example binary: panics and lossy casts are acceptable"
)]

use clap::Parser;
use libdd_capabilities_impl::NativeCapabilities;
Expand Down Expand Up @@ -27,7 +33,7 @@ fn get_span(now: i64, trace_id: u64, span_id: u64) -> pb::Span {
service: "data-pipeline-test".to_string(),
name: format!("test-name-{}", span_id % 2),
resource: format!("test-resource-{}", (span_id + trace_id) % 3),
error: if trace_id % 10 == 0 { 1 } else { 0 },
error: i32::from(trace_id % 10 == 0),
metrics: HashMap::from([
("_sampling_priority_v1".to_string(), 1.0),
("_dd.measured".to_string(), 1.0),
Expand Down
84 changes: 53 additions & 31 deletions libdd-data-pipeline/src/agent_info/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ pub enum FetchInfoStatus {
/// If either the agent state hash or container tags hash is different from the current one:
/// - Return a `FetchInfoStatus::NewState` of the info struct
/// - Else return `FetchInfoStatus::SameState`
#[allow(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is a clippy bug, I wonder if we shouldn't just disable it for the whole file and call it a day... that will be less place to update when we bump to 1.87, and this will happen soon ™️

clippy::future_not_send,
reason = "FIXME: remove when MSRV > 1.84.1 — regression in 1.84.1 patch: future_not_send fires spuriously on async fns holding non-Send/Sync generics across .await even when the future is never required to be Send; fixed in 1.85.0"
)]
async fn fetch_info_with_state_and_container_tags<C: HttpClientCapability + SleepCapability>(
info_endpoint: &Endpoint,
current_state_hash: Option<&str>,
Expand All @@ -59,11 +63,15 @@ async fn fetch_info_with_state_and_container_tags<C: HttpClientCapability + Slee
Ok(FetchInfoStatus::NewState(info))
}

/// Fetch info from the given info_endpoint and compare its state to the current state hash.
/// Fetch info from the given `info_endpoint` and compare its state to the current state hash.
///
/// If the state hash is different from the current one:
/// - Return a `FetchInfoStatus::NewState` of the info struct
/// - Else return `FetchInfoStatus::SameState`
#[allow(
clippy::future_not_send,
reason = "FIXME: remove when MSRV > 1.84.1 — regression in 1.84.1 patch: future_not_send fires spuriously on async fns holding non-Send/Sync generics across .await even when the future is never required to be Send; fixed in 1.85.0"
)]
pub async fn fetch_info_with_state<C: HttpClientCapability + SleepCapability>(
info_endpoint: &Endpoint,
current_state_hash: Option<&str>,
Expand Down Expand Up @@ -92,6 +100,10 @@ pub async fn fetch_info_with_state<C: HttpClientCapability + SleepCapability>(
/// # Ok(())
/// # }
/// ```
#[allow(
clippy::future_not_send,
reason = "FIXME: remove when MSRV > 1.84.1 — regression in 1.84.1 patch: future_not_send fires spuriously on async fns holding non-Send/Sync generics across .await even when the future is never required to be Send; fixed in 1.85.0"
)]
pub async fn fetch_info<C: HttpClientCapability + SleepCapability>(
info_endpoint: &Endpoint,
) -> Result<Box<AgentInfo>> {
Expand All @@ -104,8 +116,12 @@ pub async fn fetch_info<C: HttpClientCapability + SleepCapability>(

/// Fetch and hash the response from the agent info endpoint.
///
/// Returns a tuple of (state_hash, response_body_bytes, container_tags_hash).
/// Returns a tuple of (`state_hash`, `response_body_bytes`, `container_tags_hash`).
/// The hash is calculated using SHA256 to match the agent's calculation method.
#[allow(
clippy::future_not_send,
reason = "FIXME: remove when MSRV > 1.84.1 — regression in 1.84.1 patch: future_not_send fires spuriously on async fns holding non-Send/Sync generics across .await even when the future is never required to be Send; fixed in 1.85.0"
)]
async fn fetch_and_hash_response<C: HttpClientCapability + SleepCapability>(
info_endpoint: &Endpoint,
) -> Result<(String, bytes::Bytes, Option<String>)> {
Expand All @@ -120,28 +136,28 @@ async fn fetch_and_hash_response<C: HttpClientCapability + SleepCapability>(
// Runtime-agnostic timeout: race the request against a capability-driven
// sleep instead of `tokio::time::timeout`, which requires a tokio reactor
// (not available on wasm where we run on the JS event loop).
let res = tokio::select! {
let response = tokio::select! {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it really clippy or unrelated claude changes ? 😛

biased;
result = client.request(req) => result?,
_ = sleeper.sleep(timeout) => {
() = sleeper.sleep(timeout) => {
return Err(anyhow!("Request to /info timed out after {:?}", timeout));
}
};

// Extract the Datadog-Container-Tags-Hash header
let container_tags_hash = res
let container_tags_hash = response
.headers()
.get("Datadog-Container-Tags-Hash")
.and_then(|v| v.to_str().ok())
.map(|s| s.to_string());
.map(std::string::ToString::to_string);

let body_data = res.into_body();
let body_data = response.into_body();
let hash = format!("{:x}", Sha256::digest(&body_data));

Ok((hash, body_data, container_tags_hash))
}

/// Fetch the info endpoint and update an ArcSwap keeping it up-to-date.
/// Fetch the info endpoint and update an `ArcSwap` keeping it up-to-date.
///
/// This type implements [`libdd_shared_runtime::Worker`] and is intended to be driven by a worker
/// runner such as [`libdd_shared_runtime::SharedRuntime`].
Expand Down Expand Up @@ -191,7 +207,7 @@ async fn fetch_and_hash_response<C: HttpClientCapability + SleepCapability>(
/// `C` is the capability bundle, see [`HttpClientCapability`] and [`SleepCapability`].
/// Leaf crates pin it to a concrete type.
#[derive(Debug)]
pub struct AgentInfoFetcher<C: HttpClientCapability + SleepCapability> {
pub struct AgentInfoFetcher<C: HttpClientCapability + SleepCapability + Sync> {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Aaalibaba42 is that really ok to change a sync? This doesn't look like a clippy trivial fix.

info_endpoint: Endpoint,
refresh_interval: Duration,
trigger_rx: Option<mpsc::Receiver<()>>,
Expand All @@ -201,13 +217,13 @@ pub struct AgentInfoFetcher<C: HttpClientCapability + SleepCapability> {
_phantom: PhantomData<C>,
}

impl<C: HttpClientCapability + SleepCapability> AgentInfoFetcher<C> {
impl<C: HttpClientCapability + SleepCapability + Sync> AgentInfoFetcher<C> {
/// Return a new `AgentInfoFetcher` fetching the `info_endpoint` on each `refresh_interval`
/// and updating the stored info.
///
/// Returns a tuple of (fetcher, trigger_component) where:
/// - `fetcher`: The AgentInfoFetcher to be run in a background task
/// - `response_observer`: The ResponseObserver component for checking HTTP responses
/// Returns a tuple of (fetcher, `trigger_component`) where:
/// - `fetcher`: The `AgentInfoFetcher` to be run in a background task
/// - `response_observer`: The `ResponseObserver` component for checking HTTP responses
pub fn new(info_endpoint: Endpoint, refresh_interval: Duration) -> (Self, ResponseObserver) {
// The trigger channel stores a single message to avoid multiple triggers.
let (trigger_tx, trigger_rx) = mpsc::channel(1);
Expand Down Expand Up @@ -244,7 +260,7 @@ impl<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static> Wor
if AGENT_INFO_CACHE.load().is_none() {
return;
}
self.trigger().await
self.trigger().await;
}

async fn trigger(&mut self) {
Expand All @@ -261,7 +277,7 @@ impl<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static> Wor
}
}
// Regular periodic fetch timer
_ = sleeper.sleep(self.refresh_interval) => {}
() = sleeper.sleep(self.refresh_interval) => {}
}
}
None => {
Expand All @@ -275,19 +291,27 @@ impl<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static> Wor
// Release the IoStack waker stored in trigger_rx by waking the channel and drain the
// message to avoid a spurious fetch on restart. If the channel is not empty then it has
// already been waked.
if self.trigger_rx.as_ref().is_some_and(|rx| rx.is_empty()) {
if self
.trigger_rx
.as_ref()
.is_some_and(tokio::sync::mpsc::Receiver::is_empty)
{
let _ = self.trigger_tx.try_send(());
self.drain();
};
}
}

async fn run(&mut self) {
self.fetch_and_update().await;
}
}

impl<C: HttpClientCapability + SleepCapability> AgentInfoFetcher<C> {
impl<C: HttpClientCapability + SleepCapability + Sync> AgentInfoFetcher<C> {
/// Fetch agent info and update cache if needed
#[allow(
clippy::future_not_send,
reason = "FIXME: remove when MSRV > 1.84.1 — regression in 1.84.1 patch: future_not_send fires spuriously on async fns holding non-Send/Sync generics across .await even when the future is never required to be Send; fixed in 1.85.0"
)]
async fn fetch_and_update(&self) {
let current_info = AGENT_INFO_CACHE.load();
let current_hash = current_info.as_ref().map(|info| info.state_hash.as_str());
Expand All @@ -306,7 +330,7 @@ impl<C: HttpClientCapability + SleepCapability> AgentInfoFetcher<C> {
AGENT_INFO_CACHE.store(Some(Arc::new(*new_info)));
}
Ok(FetchInfoStatus::SameState) => {
debug!("Agent info is up-to-date")
debug!("Agent info is up-to-date");
}
Err(err) => {
warn!(?err, "Error while fetching /info");
Expand All @@ -325,8 +349,9 @@ pub struct ResponseObserver {
}

impl ResponseObserver {
/// Create a new ResponseObserver with the given channel sender.
pub fn new(trigger_tx: mpsc::Sender<()>) -> Self {
/// Create a new `ResponseObserver` with the given channel sender.
#[must_use]
pub const fn new(trigger_tx: mpsc::Sender<()>) -> Self {
Self { trigger_tx }
}

Expand All @@ -344,11 +369,11 @@ impl ResponseObserver {
let current_state = AGENT_INFO_CACHE.load();
if current_state.as_ref().map(|s| s.state_hash.as_str()) != Some(state_str) {
match self.trigger_tx.try_send(()) {
Ok(_) => {}
Err(mpsc::error::TrySendError::Full(_)) => {
Ok(()) => {}
Err(mpsc::error::TrySendError::Full(())) => {
debug!("Response observer channel full, fetch has already been triggered");
}
Err(mpsc::error::TrySendError::Closed(_)) => {
Err(mpsc::error::TrySendError::Closed(())) => {
debug!("Agent info fetcher channel closed, unable to trigger refresh");
}
}
Expand Down Expand Up @@ -583,10 +608,8 @@ mod single_threaded_tests {
.body(r#"{"version":"1"}"#);
});
let endpoint = Endpoint::from_url(server.url("/info").parse().unwrap());
let (fetcher, _response_observer) = AgentInfoFetcher::<NativeCapabilities>::new(
endpoint.clone(),
Duration::from_millis(100),
);
let (fetcher, _response_observer) =
AgentInfoFetcher::<NativeCapabilities>::new(endpoint, Duration::from_millis(100));
assert!(agent_info::get_agent_info().is_none());
let shared_runtime = SharedRuntime::new().unwrap();
let _ = shared_runtime.spawn_worker(fetcher, true).unwrap();
Expand Down Expand Up @@ -652,6 +675,8 @@ mod single_threaded_tests {
#[cfg_attr(miri, ignore)]
#[test]
fn test_agent_info_trigger_different_state() {
const MAX_ATTEMPTS: u32 = 500;
const SLEEP_DURATION_MS: u64 = 10;
let server = MockServer::start();
let mock = server.mock(|when, then| {
when.path("/info");
Expand Down Expand Up @@ -685,9 +710,6 @@ mod single_threaded_tests {
response_observer.check_response(&response);

// Wait for the fetch to complete
const MAX_ATTEMPTS: u32 = 500;
const SLEEP_DURATION_MS: u64 = 10;

let mut attempts = 0;
while mock.calls() == 0 && attempts < MAX_ATTEMPTS {
attempts += 1;
Expand Down
10 changes: 5 additions & 5 deletions libdd-data-pipeline/src/agent_info/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub struct AgentInfoStruct {
}

/// Require/reject lists for tag-based trace filters exposed by the agent /info endpoint.
#[derive(Clone, Serialize, Deserialize, Default, Debug, PartialEq)]
#[derive(Clone, Serialize, Deserialize, Default, Debug, PartialEq, Eq)]
pub struct FilterTagsConfig {
/// All listed filters must match at least one root-span tag for the trace to be accepted.
pub require: Option<Vec<String>>,
Expand All @@ -75,7 +75,7 @@ pub struct Config {
}

#[allow(missing_docs)]
#[derive(Clone, Serialize, Deserialize, Default, Debug, PartialEq)]
#[derive(Clone, Serialize, Deserialize, Default, Debug, PartialEq, Eq)]
pub struct ObfuscationConfig {
pub elastic_search: bool,
pub mongo: bool,
Expand All @@ -91,21 +91,21 @@ pub struct ObfuscationConfig {
}

#[allow(missing_docs)]
#[derive(Clone, Serialize, Deserialize, Default, Debug, PartialEq)]
#[derive(Clone, Serialize, Deserialize, Default, Debug, PartialEq, Eq)]
pub struct HttpObfuscationConfig {
pub remove_query_string: bool,
pub remove_path_digits: bool,
}

#[allow(missing_docs)]
#[derive(Clone, Serialize, Deserialize, Default, Debug, PartialEq)]
#[derive(Clone, Serialize, Deserialize, Default, Debug, PartialEq, Eq)]
pub struct RedisObfuscationConfig {
pub enabled: bool,
pub remove_all_args: bool,
}

#[allow(missing_docs)]
#[derive(Clone, Serialize, Deserialize, Default, Debug, PartialEq)]
#[derive(Clone, Serialize, Deserialize, Default, Debug, PartialEq, Eq)]
pub struct MemcachedObfuscationConfig {
pub enabled: bool,
pub keep_command: bool,
Expand Down
Loading
Loading