Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
788 changes: 784 additions & 4 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ members = [
"crates/apl-delegator-biscuit",
"crates/apl-pii-scanner",
"crates/apl-audit-logger",
"crates/apl-session-valkey",
"examples/go-demo/ffi",
]

Expand Down
1 change: 1 addition & 0 deletions crates/apl-cpex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ apl-cmf = { path = "../apl-cmf" }
cpex-core = { path = "../cpex-core" }
async-trait = { workspace = true }
chrono = { workspace = true }
thiserror = { workspace = true }
serde_json = { workspace = true }
serde_yaml = { workspace = true }
tokio = { workspace = true }
Expand Down
68 changes: 39 additions & 29 deletions crates/apl-cpex/src/cmf_invoker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,10 @@ use cpex_core::manager::PluginManager;
use apl_core::attributes::AttributeBag;
use apl_core::evaluator::Decision;
use apl_core::pipeline::{TaintEvent, TaintScope};
use apl_core::step::{
DispatchPhase, PluginError, PluginInvocation, PluginInvoker, PluginOutcome,
};
use apl_core::step::{DispatchPhase, PluginError, PluginInvocation, PluginInvoker, PluginOutcome};

use crate::dispatch_plan::RouteDispatchPlan;
use crate::session_store::SessionStore;
use crate::session_store::{SessionStore, SessionStoreError};

/// Bridges APL plugin dispatch to CMF-family CPEX hooks.
///
Expand Down Expand Up @@ -118,35 +116,39 @@ impl CmfPluginInvoker {
payload: MessagePayload,
plan: Arc<RouteDispatchPlan>,
session_store: Arc<dyn SessionStore>,
) -> Self {
) -> Result<Self, SessionStoreError> {
// Resolve session id via the 4-tier resolver (token claim →
// header → identity-derived → none). Snapshotted before
// hydration so the lookup is independent of the COW write
// that hydration performs.
let session_id: Option<String> = crate::session_resolver::resolve_session(&extensions)
.map(|(sid, _src)| sid);
let session_id: Option<String> =
crate::session_resolver::resolve_session(&extensions).map(|(sid, _src)| sid);

// Hydration: union the session's accumulated labels into the
// request's security labels. Skipped when there's no session_id
// OR no stored labels (avoid the COW clone for nothing).
// (anonymous/sessionless traffic has no state to load and is
// unaffected by a store outage). A load error propagates so the
// caller fails the request closed *before* any decision is made
// — a distributed store being unreachable must never silently
// present as "no accumulated labels".
if let Some(sid) = &session_id {
let stored = session_store.load_labels(sid).await;
let stored = session_store.load_labels(sid).await?;
if !stored.is_empty() {
extensions = hydrate_labels(extensions, &stored);
}
}

let initial_labels = snapshot_labels(&extensions);

Self {
Ok(Self {
manager,
extensions: Arc::new(Mutex::new(extensions)),
payload: Arc::new(Mutex::new(payload)),
plan,
session_id,
session_store,
initial_labels,
}
})
}

/// Snapshot the current payload. Call after route evaluation to
Expand Down Expand Up @@ -218,13 +220,22 @@ impl CmfPluginInvoker {

/// Persist session-scoped state added during this request. Diffs
/// current `security.labels` against the post-hydration snapshot
/// and appends new labels to the session store. No-op when there
/// was no session ID. Host calls this exactly once after route
/// evaluation completes.
pub async fn persist_session(&self) {
let Some(sid) = &self.session_id else { return };
/// and appends new labels to the session store. No-op (returns
/// `Ok`) when there was no session ID or no new labels. Host calls
/// this exactly once after route evaluation completes.
///
/// An append error is returned so the caller can fail the request
/// closed (R18). Because this runs after the policy decision is
/// computed, the route handler converts an append error into a Deny
/// outcome rather than dropping the accumulated taint silently.
pub async fn persist_session(&self) -> Result<(), SessionStoreError> {
let Some(sid) = &self.session_id else {
return Ok(());
};
let current = self.extensions.lock().await;
let Some(security) = current.security.as_ref() else { return };
let Some(security) = current.security.as_ref() else {
return Ok(());
};
let new_labels: Vec<String> = security
.labels
.iter()
Expand All @@ -233,8 +244,9 @@ impl CmfPluginInvoker {
.collect();
drop(current); // release the lock before the await
if !new_labels.is_empty() {
self.session_store.append_labels(sid, &new_labels).await;
self.session_store.append_labels(sid, &new_labels).await?;
}
Ok(())
}
}

Expand Down Expand Up @@ -305,7 +317,10 @@ impl PluginInvoker for CmfPluginInvoker {
Some(v) => (Some(v.reason), v.code),
None => (None, "policy.forbidden".to_string()),
};
Decision::Deny { reason, rule_source }
Decision::Deny {
reason,
rule_source,
}
} else {
Decision::Allow
};
Expand All @@ -319,11 +334,9 @@ impl PluginInvoker for CmfPluginInvoker {
Some(modified) => {
*self.payload.lock().await = modified.clone();
match invocation {
PluginInvocation::Field { .. } => {
Some(serde_json::Value::String(
modified.message.get_text_content(),
))
}
PluginInvocation::Field { .. } => Some(serde_json::Value::String(
modified.message.get_text_content(),
)),
PluginInvocation::Step { .. } => None,
}
}
Expand All @@ -347,10 +360,8 @@ impl PluginInvoker for CmfPluginInvoker {
// already validated label monotonicity on the way out.
let taints = if let Some(modified_ext) = result.modified_extensions {
let after_labels = snapshot_labels(&modified_ext);
let new_labels: Vec<String> = after_labels
.difference(&before_labels)
.cloned()
.collect();
let new_labels: Vec<String> =
after_labels.difference(&before_labels).cloned().collect();
*self.extensions.lock().await = modified_ext;
new_labels
.into_iter()
Expand Down Expand Up @@ -407,4 +418,3 @@ fn hydrate_labels(mut extensions: Extensions, labels: &[String]) -> Extensions {
extensions.security = Some(Arc::new(security));
extensions
}

2 changes: 1 addition & 1 deletion crates/apl-cpex/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,5 @@ pub use dispatch_plan::{DispatchCache, RouteDispatchPlan, RoutePluginEntry};
pub use pdp_router::PdpRouter;
pub use register::{register_apl, AplOptions};
pub use route_handler::{AplRouteHandler, Phase};
pub use session_store::{MemorySessionStore, SessionStore};
pub use session_store::{MemorySessionStore, SessionStore, SessionStoreError, SessionStoreFactory};
pub use visitor::AplConfigVisitor;
28 changes: 17 additions & 11 deletions crates/apl-cpex/src/register.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,9 @@ use cpex_core::visitor::ConfigVisitor;
use apl_core::step::{PdpFactory, PdpResolver};

use crate::dispatch_plan::DispatchCache;
use crate::session_store::SessionStore;
use crate::session_store::{SessionStore, SessionStoreFactory};
use crate::visitor::AplConfigVisitor;


/// Configuration for [`register_apl`]. All runtime collaborators APL
/// needs to do its work are funneled through here so the call site
/// reads as a single block instead of a multi-step builder.
Expand Down Expand Up @@ -75,6 +74,14 @@ pub struct AplOptions {
/// `pdps`.
pub pdp_factories: Vec<Arc<dyn PdpFactory>>,

/// Session-store factories the visitor consults when it encounters a
/// `global.apl.session_store` block. Each factory advertises a
/// `kind()` string matching the block's `kind:` field — e.g.
/// `valkey`. An empty list keeps the constructor-supplied
/// `session_store` (the `MemorySessionStore` default) active, so
/// existing deployments are unaffected.
pub session_store_factories: Vec<Arc<dyn SessionStoreFactory>>,

/// Override the visitor's baseline capabilities for installed
/// `AplRouteHandler`s. `None` uses the visitor's default
/// (read-only across the common attribute namespaces); `Some(set)`
Expand All @@ -96,6 +103,7 @@ impl AplOptions {
session_store: Arc::new(crate::session_store::MemorySessionStore::new()),
pdps: Vec::new(),
pdp_factories: Vec::new(),
session_store_factories: Vec::new(),
base_capabilities: None,
}
}
Expand Down Expand Up @@ -142,15 +150,13 @@ impl AplOptions {
/// mgr.load_config_yaml(&yaml_string)?;
/// mgr.initialize().await?;
/// ```
pub fn register_apl(
mgr: &Arc<PluginManager>,
opts: AplOptions,
) -> Arc<AplConfigVisitor> {
pub fn register_apl(mgr: &Arc<PluginManager>, opts: AplOptions) -> Arc<AplConfigVisitor> {
let AplOptions {
dispatch_cache,
session_store,
pdps,
pdp_factories,
session_store_factories,
base_capabilities,
} = opts;

Expand All @@ -160,11 +166,7 @@ pub fn register_apl(
// handle to the manager. Code-supplied PDPs go through
// `register_pdp(&self, ...)` which uses interior mutability, so
// they're registered after the `Arc` wrap.
let mut visitor = AplConfigVisitor::new(
dispatch_cache,
session_store,
Arc::downgrade(mgr),
);
let mut visitor = AplConfigVisitor::new(dispatch_cache, session_store, Arc::downgrade(mgr));

if let Some(caps) = base_capabilities {
visitor = visitor.with_base_capabilities(caps);
Expand All @@ -174,6 +176,10 @@ pub fn register_apl(
visitor.register_pdp_factory(factory);
}

for factory in session_store_factories {
visitor.register_session_store_factory(factory);
}

let arc = Arc::new(visitor);

for pdp in pdps {
Expand Down
Loading