Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions libdd-data-pipeline/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,16 @@ duplicate = "2.0.1"
[features]
default = ["https", "telemetry"]
telemetry = ["libdd-telemetry"]
# Note (async-libdatadog-host-runtime): the sync entry points on `TraceExporter`
# (`send`, `send_trace_chunks`, `shutdown`) and `TraceExporterBuilder::build` are
# thin `SharedRuntime::block_on` wrappers over their `_async` counterparts. They
# are always compiled today, but they panic / return `io::Error::Unsupported` if
# invoked from inside a host tokio runtime (i.e. on a `SharedRuntime` built via
# `from_handle`). A `sync-api` cargo feature was prototyped to make that a
# compile-time error for async-only consumers like dd-trace-rs; it was reverted
# as not worth the churn for the current set of in-tree callers. Re-add it the
# day a consumer wants a build of libdd-data-pipeline that statically forbids
# the sync facade.
https = [
"libdd-common/https",
"libdd-capabilities-impl/https",
Expand Down
195 changes: 193 additions & 2 deletions libdd-data-pipeline/src/trace_buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,19 @@ impl<T: Send + BufferSize + 'static> TraceBuffer<T> {
self.tx.trigger_flush()
}

/// Trigger a flush and synchronously wait for it to be processed by the worker.
///
/// Useful at shutdown to make sure the last batch has been handed off to the export
/// operation (and therefore any side effects like spawning the stats worker have
/// happened) before tearing down the runtime. Returns immediately if the buffer is
/// empty.
pub fn flush_and_wait(&self, timeout: Duration) -> Result<(), TraceBufferError> {
let Some(flush_gen) = self.tx.trigger_flush_and_capture_gen()? else {
return Ok(());
};
self.tx.wait_flush_done(flush_gen, Some(timeout))
}

pub fn queue_metrics(&self) -> QueueMetricsFetcher<T> {
QueueMetricsFetcher {
waiter: self.tx.waiter.clone(),
Expand All @@ -360,6 +373,19 @@ impl<T> fmt::Debug for TraceBuffer<T> {
}
}

impl<T> Drop for TraceBuffer<T> {
/// Best-effort flush so any buffered chunks are handed to the worker before the producer
/// end disappears. The worker itself is owned by the [`SharedRuntime`] and continues to
/// run independently — its own [`Worker::shutdown`] hook (invoked by `SharedRuntime`)
/// will drain whatever remains after this notify.
///
/// Errors are intentionally ignored: a `TraceBuffer` dropped after the runtime has
/// already been shut down has nothing useful to do here.
fn drop(&mut self) {
let _ = self.tx.trigger_flush();
}
}

pub struct QueueMetricsFetcher<T> {
waiter: Arc<Waiter<T>>,
}
Expand Down Expand Up @@ -488,6 +514,20 @@ impl<T> Sender<T> {
Ok(())
}

/// Trigger a flush and capture the batch generation that the worker must overtake
/// before the flush can be considered done. Returns `Ok(None)` if the batch is
/// currently empty (nothing to flush, no need to wait).
fn trigger_flush_and_capture_gen(&self) -> Result<Option<BatchGeneration>, TraceBufferError> {
let mut state = self.get_running_state()?;
if state.batch.byte_count == 0 {
return Ok(None);
}
state.flush_needed = true;
let gen = state.batch.batch_gen;
self.waiter.notify_receiver(state);
Ok(Some(gen))
}

fn wait_shutdown_done(&self, timeout: Duration) -> Result<(), TraceBufferError> {
if timeout.is_zero() {
return Err(TraceBufferError::TimedOut(Duration::ZERO));
Expand Down Expand Up @@ -578,6 +618,16 @@ impl<T> Receiver<T> {
self.waiter.notify_sender(state);
Ok(())
}

/// Synchronously drain the current batch without waiting for a flush trigger.
///
/// Used during shutdown to recover any chunks that the sender accumulated but never had
/// the chance to flush (e.g. the worker loop was cancelled before the next timeout tick).
fn drain(&self) -> Result<Vec<TraceChunk<T>>, MutexPoisonedError> {
let mut state = self.lock_state()?;
state.flush_needed = false;
Ok(state.batch.export())
}
}

#[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Default)]
Expand Down Expand Up @@ -765,6 +815,19 @@ impl<T: Send + Debug + 'static> Worker for TraceExporterWorker<T> {
}

async fn shutdown(&mut self) {
// Drain any chunks the sender has buffered but not yet flushed. Without this the
// final partial batch is silently dropped on shutdown — including the common case
// where a tokio app calls `tracer.shutdown()` immediately after producing spans.
match self.rx.drain() {
Ok(trace_chunks) if !trace_chunks.is_empty() => {
self.export_trace_chunks(trace_chunks).await;
let _ = self.rx.ack_export();
}
Ok(_) => {}
Err(MutexPoisonedError) => {
tracing::error!("TraceExporterWorker mailbox poisoned during shutdown drain");
}
}
let _ = self.rx.shutdown_done();
}

Expand Down Expand Up @@ -888,7 +951,7 @@ mod tests {
);

// pause
rt.before_fork();
rt.before_fork().expect("error pausing");

for i in 1..=3 {
sender.send_chunk(vec![(); i]).unwrap();
Expand Down Expand Up @@ -968,6 +1031,69 @@ mod tests {
rt.shutdown(None).unwrap();
}

#[test]
#[cfg_attr(miri, ignore)]
fn test_shutdown_drains_pending_batch() {
// Set thresholds high enough that send_chunk alone never triggers a flush,
// and the timer long enough that it won't fire during the test. The only way
// the assert_export closure should be invoked is via the shutdown drain path.
let exported = Arc::new(std::sync::Mutex::new(Vec::<usize>::new()));
let exported_clone = exported.clone();
let (rt, _, sender) = make_buffer(
Box::new(move |chunks| {
let mut lengths = chunks.into_iter().map(|c| c.len()).collect::<Vec<_>>();
lengths.sort();
exported_clone.lock().unwrap().extend(lengths);
}),
TraceBufferConfig::default()
.max_buffered_bytes(100)
.flush_threshold_bytes(100)
.max_flush_interval(Duration::from_secs(u32::MAX as u64)),
);

sender.send_chunk(vec![()]).unwrap();
sender.send_chunk(vec![(), ()]).unwrap();

// Shutdown must export the two buffered chunks even though no flush ever fired.
rt.shutdown(Some(Duration::from_secs(10))).unwrap();
sender.wait_shutdown_done(Duration::from_secs(10)).unwrap();

let exported = exported.lock().unwrap().clone();
assert_eq!(exported, vec![1, 2]);
}

#[test]
#[cfg_attr(miri, ignore)]
fn test_flush_and_wait() {
// Same setup as test_force_flush, but verify flush_and_wait blocks until the
// worker has actually processed the batch.
let (rt, sem, sender) = make_buffer(
Box::new(|chunks| assert_eq!(chunks.len(), 2)),
TraceBufferConfig::default()
.max_buffered_bytes(100)
.flush_threshold_bytes(100)
.max_flush_interval(Duration::from_secs(u32::MAX as u64)),
);

sender.send_chunk(vec![()]).unwrap();
sender.send_chunk(vec![(), ()]).unwrap();
assert_eq!(sem.available_permits(), 0);

sender
.flush_and_wait(Duration::from_secs(10))
.expect("flush_and_wait failed");
// After flush_and_wait returns, the worker must have actually exported.
assert_eq!(sem.available_permits(), 1);

// Calling flush_and_wait on an empty buffer is a no-op and must not block.
sender
.flush_and_wait(Duration::from_secs(10))
.expect("flush_and_wait on empty buffer should not error");

rt.shutdown(None).unwrap();
sender.wait_shutdown_done(Duration::from_secs(10)).unwrap();
}

#[test]
#[cfg_attr(miri, ignore)]
fn test_force_flush() {
Expand Down Expand Up @@ -1006,7 +1132,7 @@ mod tests {
sender.send_chunk(vec![()]).unwrap();
assert_eq!(sem.available_permits(), 0);

rt.before_fork();
rt.before_fork().unwrap();
rt.after_fork_child().unwrap();

sender.send_chunk(vec![(), ()]).unwrap();
Expand All @@ -1015,4 +1141,69 @@ mod tests {
assert_eq!(sender.queue_metrics().get_metrics().spans_queued, 2);
rt.shutdown(None).unwrap();
}

/// Regression coverage for the [`Drop`] impl on [`TraceBuffer`]: dropping the producer
/// after spans have been buffered (but without an explicit `force_flush`) should
/// trigger one final flush so the worker exports the pending chunks instead of losing
/// them.
#[test]
#[cfg_attr(miri, ignore)]
fn test_drop_triggers_flush() {
let exported = Arc::new(std::sync::Mutex::new(Vec::<usize>::new()));
let exported_clone = exported.clone();
let (rt, sem, sender) = make_buffer(
Box::new(move |chunks| {
let mut lengths = chunks.into_iter().map(|c| c.len()).collect::<Vec<_>>();
lengths.sort();
exported_clone.lock().unwrap().extend(lengths);
}),
TraceBufferConfig::default()
.max_buffered_bytes(100)
.flush_threshold_bytes(100)
.max_flush_interval(Duration::from_secs(u32::MAX as u64)),
);

sender.send_chunk(vec![()]).unwrap();
sender.send_chunk(vec![(), ()]).unwrap();
assert_eq!(sem.available_permits(), 0);

// Drop the sender — no `force_flush`/`flush_and_wait` call — and verify the
// background worker still receives the pending chunks because Drop notifies it.
drop(sender);
let _ = rt.block_on(sem.acquire_many(1)).unwrap().unwrap();

let exported = exported.lock().unwrap().clone();
assert_eq!(exported, vec![1, 2]);
rt.shutdown(None).unwrap();
}

/// Dropping a [`TraceBuffer`] (and shutting down the surrounding [`SharedRuntime`])
/// from inside a host tokio runtime must not panic. Uses the borrowed-mode
/// `SharedRuntime::from_handle` so the trigger + Condvar wait path is exercised
/// instead of sync `block_on` (which would panic "Cannot start a runtime from
/// within a runtime" on a host worker thread).
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_drop_inside_tokio_runtime() {
let rt = Arc::new(SharedRuntime::from_handle(tokio::runtime::Handle::current()));
let (sender, worker) = TraceBuffer::new(
TraceBufferConfig::default(),
Box::new(
|_r: Result<AgentResponse, crate::trace_exporter::error::TraceExporterError>| {},
),
Box::new(AssertExporter(
Box::new(|_chunks| {}),
Arc::new(tokio::sync::Semaphore::new(0)),
)),
);
let _ = rt.spawn_worker(worker, true).unwrap();

// Send a chunk so the Drop has something to flush — exercises the full
// notify path under a tokio scheduler.
sender.send_chunk(vec![()]).unwrap();
drop(sender);

// Borrowed mode: use trigger + Condvar wait, never block_on.
rt.trigger_shutdown_signal().unwrap();
rt.wait_shutdown_done(Duration::from_secs(5)).unwrap();
}
}
70 changes: 57 additions & 13 deletions libdd-data-pipeline/src/trace_exporter/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,49 @@ impl TraceExporterBuilder {
self
}

/// Build the [`TraceExporter`] synchronously.
///
/// Sync facade over [`Self::build_async`]. It materializes the [`SharedRuntime`]
/// up-front (creating a fresh owned one if none was supplied) and drives the async
/// builder to completion via `block_on`. This is the only place in the builder that
/// blocks; all internal I/O — including telemetry start-up — happens inside
/// `build_async`.
///
/// Calling this from within an existing tokio context will panic. Async callers
/// should use [`Self::build_async`] directly.
#[allow(missing_docs)]
pub fn build<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static>(
mut self,
) -> Result<TraceExporter<C>, TraceExporterError> {
// Materialize the SharedRuntime here so we have something to block_on with and
// so build_async reuses the same instance rather than creating a second one.
let shared_runtime = match &self.shared_runtime {
Some(rt) => rt.clone(),
None => {
let rt = Arc::new(SharedRuntime::new().map_err(|e| {
TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration(
e.to_string(),
))
})?);
self.shared_runtime = Some(rt.clone());
rt
}
};
shared_runtime
.block_on(self.build_async::<C>())
.map_err(TraceExporterError::Io)?
}

/// Build the [`TraceExporter`] asynchronously.
///
/// This is the async-internal entry point: every operation that used to live behind
/// `SharedRuntime::block_on` (currently just telemetry start-up) is awaited directly.
/// It is safe to drive from any async context, but note that on native targets
/// `C::new_client()` may capture `tokio::runtime::Handle::current()` — the caller
/// must be inside a tokio runtime that should own the resulting HTTP client.
pub async fn build_async<
C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static,
>(
self,
) -> Result<TraceExporter<C>, TraceExporterError> {
if !Self::is_inputs_outputs_formats_compatible(self.input_format, self.output_format) {
Expand Down Expand Up @@ -322,14 +363,21 @@ impl TraceExporterBuilder {
// internally (e.g. `NativeCapabilities`). Enter the SharedRuntime's tokio context
// so that handle is available. On wasm this is a no-op — the JS event loop is
// always the implicit executor.
#[cfg(not(target_arch = "wasm32"))]
let _guard = shared_runtime
.runtime_handle()
.map_err(|e| {
TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration(e.to_string()))
})?
.enter();
let capabilities = C::new_client();
//
// The `EnterGuard` returned by `Handle::enter()` is `!Send`, so we scope it to a
// block that drops it before any later `.await` to keep this future `Send`.
let capabilities = {
#[cfg(not(target_arch = "wasm32"))]
let _guard = shared_runtime
.runtime_handle()
.map_err(|e| {
TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration(
e.to_string(),
))
})?
.enter();
C::new_client()
};

// --- Platform-specific worker setup ---
// The blocks below spawn background workers via `SharedRuntime`. On
Expand Down Expand Up @@ -400,11 +448,7 @@ impl TraceExporterBuilder {
e.to_string(),
))
})?;
shared_runtime.block_on(client_tel.start()).map_err(|e| {
TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration(
e.to_string(),
))
})?;
client_tel.start().await;
(Some(client_tel), Some(handle))
}
Some(Err(e)) => return Err(e),
Expand Down
Loading
Loading