Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/cf_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ pub async fn build_map(
.get("deleted_at")
.and_then(|v| v.as_str())
.map(String::from),
status: t.get("status").and_then(|v| v.as_str()).map(String::from),
created_at: t
.get("created_at")
.and_then(|v| v.as_str())
.map(String::from),
};
tunnel_env.insert(cft.id.clone(), env.clone());
buckets.entry(env).or_default().tunnels.push(cft);
Expand Down
200 changes: 200 additions & 0 deletions src/cf_reconcile.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
//! Cross-environment Cloudflare reconcile plan (read-only / dry-run).
//!
//! Given the unified [`crate::cf_map::CfMap`] and the serving CP's view of
//! its own env, compute what a reconcile WOULD do — env-labelled — in
//! three buckets:
//! - **adopt**: live CF agent tunnels in the serving env that the CP
//! store is missing (fill-only recovery — rebuild the store from CF).
//! - **prune**: leaked resources — a dead/unclaimed orphan in the serving
//! env, OR everything belonging to an env with no live control plane
//! (e.g. a closed PR), OR the `(unattributed)` bucket.
//! - **refill**: hostnames the serving CP expects but CF has no CNAME for.
//!
//! This module only *plans*. Execution — with the in-flight-deploy + TTL +
//! zero-connection guards — is a separate, operator-gated step (next PR).

use std::collections::HashSet;
use std::time::SystemTime;

use serde::Serialize;

use crate::cf_map::CfMap;
use crate::cf_snapshot::CpState;

/// The dry-run result of a cross-environment reconcile, as produced by
/// [`plan`]. Serializable so it can be returned verbatim to the operator.
#[derive(Debug, Clone, Serialize)]
pub struct ReconcilePlan {
/// RFC3339 (UTC) timestamp taken when the plan was computed.
pub computed_at: String,
/// Env label of the control plane that computed this plan.
pub serving_env: String,
/// The CF map was partial (a list call failed); the plan is empty and
/// must not be applied.
pub degraded: bool,
/// Healthy serving-env tunnels that the CP store does not know about.
pub adopt: Vec<PlanItem>,
/// Leaked tunnels and the DNS records that follow them.
pub prune: Vec<PlanItem>,
/// Agent hostnames the CP expects but for which CF has no CNAME.
pub refill: Vec<PlanItem>,
/// Human-readable notes (e.g. live foreign envs intentionally skipped).
pub notes: Vec<String>,
}

/// One planned action on a single Cloudflare resource. Which action
/// (adopt / prune / refill) is implied by the [`ReconcilePlan`] list the
/// item sits in.
#[derive(Debug, Clone, Serialize)]
pub struct PlanItem {
/// Env label the resource is attributed to.
pub env: String,
/// Resource type discriminator.
pub kind: String, // "tunnel" | "dns"
/// Cloudflare id of the tunnel or DNS record; empty for refill items,
/// since the record does not exist yet.
pub id: String,
/// Tunnel name, or the hostname of the DNS record.
pub name: String,
/// Human-readable explanation of why the item is in the plan.
pub reason: String,
}

/// Builds a [`PlanItem`], copying each borrowed string into an owned field.
fn item(env: &str, kind: &str, id: &str, name: &str, reason: &str) -> PlanItem {
    let env = String::from(env);
    let kind = String::from(kind);
    let id = String::from(id);
    let name = String::from(name);
    let reason = String::from(reason);
    PlanItem {
        env,
        kind,
        id,
        name,
        reason,
    }
}

/// Compute the dry-run plan. Pure over the map + the serving CP state,
/// apart from reading the wall clock for `computed_at`.
///
/// When `map.degraded` is set the returned plan carries a single
/// explanatory note and empty adopt / prune / refill lists, because a
/// partial CF listing cannot be trusted to decide what is leaked.
///
/// Otherwise the plan is built in three passes, in this order:
/// 1. tunnels — adopt / prune / keep, remembering which tunnels are pruned;
/// 2. DNS — prune records that follow a pruned (or vanished) tunnel;
/// 3. refill — flag agent hostnames with no CNAME in the serving env.
pub fn plan(map: &CfMap, cp: &CpState) -> ReconcilePlan {
    let mut out = ReconcilePlan {
        computed_at: chrono::DateTime::<chrono::Utc>::from(SystemTime::now()).to_rfc3339(),
        serving_env: cp.env_label.clone(),
        degraded: map.degraded,
        adopt: vec![],
        prune: vec![],
        refill: vec![],
        notes: vec![],
    };
    if map.degraded {
        out.notes.push(
            "CF map is degraded (a list call failed); plan is empty and reconcile must not run"
                .into(),
        );
        return out;
    }

    // DNS pruning depends on the tunnel decisions, so the tunnel pass must
    // run first and hand over the set of tunnel ids it decided to prune.
    let prune_tunnel_ids = plan_tunnels(map, cp, &mut out);
    plan_dns(map, &prune_tunnel_ids, &mut out);
    plan_refill(map, cp, &mut out);

    out
}

/// Pass 1 — tunnels. Decides adopt / prune / keep for every tunnel in the
/// map and returns the ids of the tunnels queued for pruning.
fn plan_tunnels(map: &CfMap, cp: &CpState, out: &mut ReconcilePlan) -> HashSet<String> {
    // The tunnel ids the serving CP knows it owns.
    let known_tunnel_ids: HashSet<&str> = cp
        .agents
        .iter()
        .filter(|a| !a.tunnel_id.is_empty())
        .map(|a| a.tunnel_id.as_str())
        .collect();

    let mut prune_tunnel_ids: HashSet<String> = HashSet::new();
    for inst in &map.installations {
        let serving = inst.env == cp.env_label;
        let unattributed = inst.kind == "unattributed";
        let leaked_env = unattributed || !inst.has_live_cp;

        for t in &inst.tunnels {
            if t.deleted_at.is_some() {
                continue; // already soft-deleted
            }
            if serving {
                if t.name.contains("-cp-") || known_tunnel_ids.contains(t.id.as_str()) {
                    continue; // the CP's own tunnel, or a claimed agent → keep
                }
                if t.status.as_deref() == Some("healthy") {
                    out.adopt.push(item(
                        &inst.env,
                        "tunnel",
                        &t.id,
                        &t.name,
                        "live CF agent tunnel not in the CP store — adopt (fill-only)",
                    ));
                } else {
                    prune_tunnel_ids.insert(t.id.clone());
                    out.prune.push(item(
                        &inst.env,
                        "tunnel",
                        &t.id,
                        &t.name,
                        &format!(
                            "serving-env agent tunnel unclaimed by any CP agent and not healthy (status={}) — prune",
                            t.status.as_deref().unwrap_or("unknown")
                        ),
                    ));
                }
            } else if leaked_env {
                prune_tunnel_ids.insert(t.id.clone());
                let why = if unattributed {
                    "tunnel name has no parseable env — prune"
                } else {
                    "tunnel for an env with no live control plane (torn-down install) — prune"
                };
                out.prune
                    .push(item(&inst.env, "tunnel", &t.id, &t.name, why));
            }
            // live foreign env → leave its tunnels alone (noted below)
        }
        if !serving && !leaked_env {
            out.notes.push(format!(
                "{}: live foreign env ({} tunnels, {} dns) left untouched — reconcile from its own CP",
                inst.env,
                inst.tunnels.len(),
                inst.dns.len()
            ));
        }
    }
    prune_tunnel_ids
}

/// Pass 2 — DNS, keyed purely on the tunnel a record targets.
///
/// We never guess by hostname: the CP creates more CNAMEs (agent-api,
/// oracle, shell) than it records in its store, so a name-based "orphan"
/// check would falsely prune live records. A CNAME is prunable only if its
/// target tunnel is gone (unattributed bucket) or is itself being pruned.
fn plan_dns(map: &CfMap, prune_tunnel_ids: &HashSet<String>, out: &mut ReconcilePlan) {
    for inst in &map.installations {
        let unattributed = inst.kind == "unattributed";
        for d in &inst.dns {
            let targets_pruned_tunnel = d
                .tunnel_id_ref
                .as_deref()
                .map(|t| prune_tunnel_ids.contains(t))
                .unwrap_or(false);
            if unattributed {
                out.prune.push(item(
                    &inst.env,
                    "dns",
                    &d.id,
                    &d.name,
                    "CNAME targets a tunnel that no longer exists — prune",
                ));
            } else if targets_pruned_tunnel {
                out.prune.push(item(
                    &inst.env,
                    "dns",
                    &d.id,
                    &d.name,
                    "CNAME targets a tunnel being pruned — prune",
                ));
            }
        }
    }
}

/// Pass 3 — refill, only for the reliably-known primary agent hostname.
///
/// Extras (agent-api / oracle / shell) aren't tracked in the store, so we
/// never synthesize them; a missing primary CNAME means the agent is
/// unreachable and is safe to flag.
///
/// If the map holds no installation for the serving env at all, CF has no
/// CNAME for it either, so the known-name set is simply empty and every
/// tunnel-backed agent is flagged rather than the pass being skipped.
fn plan_refill(map: &CfMap, cp: &CpState, out: &mut ReconcilePlan) {
    let cf_dns_names: HashSet<&str> = map
        .installations
        .iter()
        .filter(|i| i.env == cp.env_label)
        .flat_map(|i| i.dns.iter())
        .map(|d| d.name.as_str())
        .collect();

    for a in &cp.agents {
        if !a.tunnel_id.is_empty()
            && a.hostname != cp.control_plane_hostname
            && !cf_dns_names.contains(a.hostname.as_str())
        {
            out.refill.push(item(
                &cp.env_label,
                "dns",
                "", // no CF record exists yet, so there is no id to report
                &a.hostname,
                "CP knows this agent but its primary CNAME is missing in CF — refill",
            ));
        }
    }
}
15 changes: 14 additions & 1 deletion src/cf_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ pub struct CfTunnel {
pub id: String,
pub name: String,
pub deleted_at: Option<String>,
/// Cloudflare tunnel status: `healthy` (≥1 live connection), else
/// `degraded` / `down` / `inactive`. Used to tell a live agent (adopt)
/// from a dead/leaked tunnel (prune).
pub status: Option<String>,
/// RFC3339 creation time, for the prune age (TTL) guard.
pub created_at: Option<String>,
}

#[derive(Debug, Clone, Serialize)]
Expand Down Expand Up @@ -161,7 +167,7 @@ pub async fn snapshot(
}
}

async fn build_cp_state(
pub(crate) async fn build_cp_state(
env_label: &str,
cp_hostname: &str,
store: &Arc<tokio::sync::Mutex<HashMap<String, collector::Agent>>>,
Expand Down Expand Up @@ -243,6 +249,11 @@ async fn build_cf_state(
.get("deleted_at")
.and_then(|v| v.as_str())
.map(String::from),
status: t.get("status").and_then(|v| v.as_str()).map(String::from),
created_at: t
.get("created_at")
.and_then(|v| v.as_str())
.map(String::from),
})
})
.collect();
Expand Down Expand Up @@ -568,6 +579,8 @@ mod tests {
id: id.into(),
name: name.into(),
deleted_at: None,
status: Some("healthy".into()),
created_at: None,
}
}

Expand Down
36 changes: 36 additions & 0 deletions src/cp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@ pub async fn run() -> Result<()> {
.route("/api/fleet", get(fleet_fragment))
.route("/admin/cf/snapshot", get(cf_snapshot_handler))
.route("/admin/cf/map", get(cf_map_handler))
.route("/admin/cf/reconcile", post(cf_reconcile_handler))
.route("/api/v1/admin/export", get(export_state))
.route("/admin/enroll", get(enroll_page))
.with_state(state);
Expand Down Expand Up @@ -1332,6 +1333,41 @@ async fn cf_map_handler(
Ok(Json(map))
}

/// Query-string parameters accepted by `POST /admin/cf/reconcile`.
#[derive(Debug, Deserialize)]
struct ReconcileParams {
/// Whether the caller asked for the plan to be applied. Defaults to
/// `false` when the parameter is omitted.
#[serde(default)]
apply: bool,
}

/// POST /admin/cf/reconcile — cross-env reconcile, **dry-run only**.
///
/// Builds the unified CF map and the serving CP's own state, runs
/// [`crate::cf_reconcile::plan`] over them and returns the resulting plan
/// (adopt / prune / refill, each env-labelled) as JSON.
///
/// `?apply=true` is echoed back as `apply_requested` but performs NO
/// mutations in this build — the guarded, operator-gated apply path lands
/// in a follow-up. Callers that fail `agents_auth_ok` (the same check as
/// the other `/admin/cf/*` surfaces) get `Error::Unauthorized`.
async fn cf_reconcile_handler(
    State(state): State<St>,
    axum::extract::ConnectInfo(peer): axum::extract::ConnectInfo<std::net::SocketAddr>,
    axum::extract::Query(params): axum::extract::Query<ReconcileParams>,
    headers: axum::http::HeaderMap,
) -> Result<Json<serde_json::Value>> {
    let authorized = agents_auth_ok(&state, peer, &headers).await;
    if !authorized {
        return Err(Error::Unauthorized);
    }

    // Both inputs are keyed on the same serving-env label.
    let env_label = state.cfg.common.env.label();

    let client = cf::http_client();
    let cf_map =
        crate::cf_map::build_map(&client, &state.cfg.cf, env_label, &state.store).await;
    let cp_state =
        crate::cf_snapshot::build_cp_state(env_label, &state.cfg.hostname, &state.store).await;

    let reconcile_plan = crate::cf_reconcile::plan(&cf_map, &cp_state);
    let body = serde_json::json!({
        "dry_run": true,
        "applied": false,
        "apply_requested": params.apply,
        "plan": reconcile_plan,
    });
    Ok(Json(body))
}

/// Accept the request if the caller is on the loopback interface
/// (same-VM trust — any CP-VM workload / dd-agent-proxy) or presents a valid
/// bearer that verifies as either a GitHub Actions OIDC token for
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod agent;
pub mod auth;
pub mod cf;
pub mod cf_map;
pub mod cf_reconcile;
pub mod cf_snapshot;
pub mod collector;
pub mod config;
Expand Down
Loading