66 changes: 66 additions & 0 deletions docs/integrations/cave-integration-spike.md
@@ -0,0 +1,66 @@
# CAVE Integration Spike (Non-invasive)

## Purpose
This spike defines a **CAVE-compatible data flow contract** for future export of
workflow artifacts produced by this app, without changing any current runtime
behavior or existing routes.

## Target use case in this app
Primary target: export app-generated workflow artifacts (for example, synapse
annotation outputs, mesh/segment summaries, or inference-derived tabular
results) as CAVE-aligned table payloads for downstream analysis and shared
annotation ecosystems.

This spike intentionally does **not** perform any live CAVE writes.

## Expected inputs and outputs

### Input: workflow artifact
A workflow artifact is expected to include:
- `artifact_id` (string): unique identifier in this app
- `artifact_type` (string): category used for mapping rules
- `payload` (object): app-native structured data
- `metadata` (object, optional): project/run context and provenance

### Output: CAVE payload
A CAVE payload is expected to include:
- `table_name` (string): target CAVE table identifier
- `records` (array of objects): normalized row-level entries
- `provenance` (object): transformation metadata
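
As a rough illustration of the two shapes above, the sketch below builds one artifact and one payload as plain dictionaries and checks them for required keys. The values (`artifact-1`, `synapse_table`, the `confidence` field) and the `missing_keys` helper are placeholders chosen for illustration; none of them are part of an agreed schema.

```python
from typing import Any

# Hypothetical workflow artifact; every value below is a placeholder.
example_artifact: dict[str, Any] = {
    "artifact_id": "artifact-1",
    "artifact_type": "synapse_annotations",
    "payload": {"rows": [{"id": 1, "confidence": 0.92}]},
    "metadata": {"project": "demo"},  # optional per the contract
}

# Hypothetical CAVE payload that could be derived from the artifact above.
example_cave_payload: dict[str, Any] = {
    "table_name": "synapse_table",
    "records": [{"id": 1, "confidence": 0.92}],
    "provenance": {"source_artifact_id": example_artifact["artifact_id"]},
}

# Required keys as listed in this section ("metadata" is optional).
REQUIRED_ARTIFACT_KEYS = {"artifact_id", "artifact_type", "payload"}
REQUIRED_PAYLOAD_KEYS = {"table_name", "records", "provenance"}


def missing_keys(obj: dict[str, Any], required: set[str]) -> list[str]:
    """Return the required keys absent from obj, sorted for stable output."""
    return sorted(required - set(obj))
```

A check like `missing_keys` is only a first-pass shape test; validation against real CAVE table schemas is still an open item.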

## Mapping from workflow artifacts to CAVE concepts

| App workflow concept | CAVE concept | Notes |
| --- | --- | --- |
| `artifact_id` | provenance field (`source_artifact_id`) | retain source traceability |
| `artifact_type` | table routing key | determines target `table_name` |
| `payload.rows[*]` | `records[*]` | row-level normalization and field mapping |
| app run metadata | provenance fields | include model version, timestamp, operator |
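
One way the routing and provenance rows of this table might look in code is sketched below. The `TABLE_ROUTES` entries, the `resolve_table_name` and `build_provenance` helpers, and the metadata key names (`model_version`, `timestamp`, `operator`) are assumptions for illustration; the real per-type mappings are still to be finalized.

```python
from typing import Any

# Hypothetical routing table: artifact_type -> target CAVE table_name.
TABLE_ROUTES: dict[str, str] = {
    "synapse_annotations": "synapse_table",
    "mesh_summary": "mesh_table",
}


def resolve_table_name(artifact_type: str) -> str:
    """Look up the target table, failing loudly for unmapped artifact types."""
    try:
        return TABLE_ROUTES[artifact_type]
    except KeyError:
        raise ValueError(
            f"No CAVE table mapping for artifact_type={artifact_type!r}"
        ) from None


def build_provenance(artifact_id: str, metadata: dict[str, Any]) -> dict[str, Any]:
    """Collect traceability fields; absent metadata keys come through as None."""
    return {
        "source_artifact_id": artifact_id,
        "model_version": metadata.get("model_version"),
        "timestamp": metadata.get("timestamp"),
        "operator": metadata.get("operator"),
    }
```

Raising on an unknown `artifact_type` rather than falling back to a default table keeps misrouted exports visible early.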

### Proposed mapping flow
1. Receive a `WorkflowArtifact` from an explicit, non-default integration path.
2. Select table mapping logic by `artifact_type`.
3. Transform artifact payload rows into normalized CAVE records.
4. Attach provenance data from artifact and runtime context.
5. Return a `CavePayload` for an optional downstream publisher step.
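
The five steps above can be sketched end to end as follows. This is a minimal sketch, not the adapter's implementation: the dataclasses mirror the scaffold's contracts, while `TableMapping`, `map_artifact`, and the `runtime_context` argument are hypothetical names introduced here. It assumes Python 3.9+ for built-in generic annotations.

```python
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass(frozen=True)
class WorkflowArtifact:
    artifact_id: str
    artifact_type: str
    payload: dict[str, Any]
    metadata: dict[str, Any] = field(default_factory=dict)


@dataclass(frozen=True)
class CavePayload:
    table_name: str
    records: list[dict[str, Any]]
    provenance: dict[str, Any] = field(default_factory=dict)


@dataclass(frozen=True)
class TableMapping:
    """Hypothetical per-type mapping: a target table plus a row transformer."""

    table_name: str
    map_row: Callable[[dict[str, Any]], dict[str, Any]]


def map_artifact(
    artifact: WorkflowArtifact,  # step 1: received from an explicit caller
    mappings: dict[str, TableMapping],
    runtime_context: dict[str, Any],
) -> CavePayload:
    # Step 2: select the mapping logic by artifact_type.
    mapping = mappings.get(artifact.artifact_type)
    if mapping is None:
        raise ValueError(f"Unmapped artifact_type: {artifact.artifact_type!r}")

    # Step 3: transform each payload row into a normalized record.
    records = [mapping.map_row(row) for row in artifact.payload.get("rows", [])]

    # Step 4: merge provenance; later entries override earlier ones.
    provenance = {
        **artifact.metadata,
        **runtime_context,
        "source_artifact_id": artifact.artifact_id,
    }

    # Step 5: return the payload; publishing is a separate, optional step.
    return CavePayload(mapping.table_name, records, provenance)
```

Passing `mappings` in as an argument, rather than reading a module-level registry, keeps the function free of global state and easy to exercise in contract tests.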

## Non-invasive guardrails
- Adapter code lives in `server_api/workflow/cave_adapter.py` and is not wired
into existing startup, routes, or default workflow execution.
- Adapter methods are stubs and raise `NotImplementedError`.
- No network calls, auth dependencies, or deployment config are active.

## TODO assumptions to resolve before implementation
- **Auth TODO:** decide token model (service token vs user token), rotation, and
secret storage location.
- **Network TODO:** define endpoint discovery, TLS requirements, retry/backoff,
and timeout policy.
- **Deployment TODO:** define environment config contract, feature flags, and
rollback strategy for staged rollout.
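
To make the network and deployment TODOs more concrete, here is one possible shape for an environment config contract with a default-off feature flag and a capped exponential backoff schedule. Every name and default below (`CaveExportConfig`, the `CAVE_*` variables, the 10-second timeout, the 0.5-second base delay) is an assumption for discussion, not a decision.

```python
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class CaveExportConfig:
    """Hypothetical config contract; export stays disabled unless opted in."""

    enabled: bool = False
    endpoint_url: Optional[str] = None
    timeout_seconds: float = 10.0
    max_retries: int = 3


def load_config(env: Mapping[str, str]) -> CaveExportConfig:
    """Build the config from an environment-like mapping (e.g. os.environ)."""
    flag = env.get("CAVE_EXPORT_ENABLED", "false").strip().lower()
    return CaveExportConfig(
        enabled=flag in {"1", "true", "yes"},
        endpoint_url=env.get("CAVE_ENDPOINT_URL") or None,
        timeout_seconds=float(env.get("CAVE_TIMEOUT_SECONDS", "10")),
        max_retries=int(env.get("CAVE_MAX_RETRIES", "3")),
    )


def backoff_delays(
    max_retries: int, base_seconds: float = 0.5, cap_seconds: float = 8.0
) -> list[float]:
    """Delay before each retry: doubles per attempt, never exceeding the cap."""
    return [min(cap_seconds, base_seconds * 2**attempt) for attempt in range(max_retries)]
```

Taking `env` as a parameter instead of reading `os.environ` directly keeps the loader testable, and rollback reduces to unsetting the flag.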

## Actionable next steps
1. Finalize per-`artifact_type` mapping schemas and validation rules.
2. Add contract tests for each mapping variant with realistic fixture payloads.
3. Implement publisher client behind an explicit feature flag.
4. Add integration tests with mocked transport/auth layers.
5. Add observability for export attempts, latency, and error classes.
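
Steps 3 and 4 could start from something like the sketch below: a publisher that does nothing unless a flag is set and that receives its transport by injection, so tests can substitute a fake. `FlaggedPublisher`, the `Transport` signature, and the returned status dictionaries are hypothetical; no real CAVE client API is assumed.

```python
from typing import Any, Callable

# Hypothetical transport signature: (table_name, records) -> response dict.
Transport = Callable[[str, list[dict[str, Any]]], dict[str, Any]]


class FlaggedPublisher:
    """Publisher sketch that is a no-op unless explicitly enabled."""

    def __init__(self, enabled: bool, transport: Transport) -> None:
        self._enabled = enabled
        self._transport = transport

    def publish(self, table_name: str, records: list[dict[str, Any]]) -> dict[str, Any]:
        if not self._enabled:
            # Flag off: report the skip and never touch the transport.
            return {"status": "skipped", "reason": "feature flag disabled"}
        return self._transport(table_name, records)


class RecordingTransport:
    """Test double that records calls instead of performing network I/O."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, list[dict[str, Any]]]] = []

    def __call__(self, table_name: str, records: list[dict[str, Any]]) -> dict[str, Any]:
        self.calls.append((table_name, records))
        return {"status": "ok", "written": len(records)}
```

In an integration test, a `RecordingTransport` instance stands in for the network layer, and asserting on its `calls` list confirms that a disabled flag produces no outbound traffic.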
54 changes: 54 additions & 0 deletions server_api/workflow/cave_adapter.py
@@ -0,0 +1,54 @@
"""CAVE integration scaffold for future workflow interoperability.

This module is intentionally inert unless explicitly instantiated and called by a
future integration path. It introduces data contracts and stub interfaces only.
"""

from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass(frozen=True)
class WorkflowArtifact:
    """Represents a workflow output produced by this app."""

    artifact_id: str
    artifact_type: str
    payload: dict[str, Any]
    metadata: dict[str, Any] = field(default_factory=dict)


@dataclass(frozen=True)
class CavePayload:
    """Represents a CAVE-compatible payload shape."""

    table_name: str
    records: list[dict[str, Any]]
    provenance: dict[str, Any] = field(default_factory=dict)


class CaveAdapter:
    """Adapter stub for converting app workflow artifacts into CAVE payloads.

    This class does not perform network calls, authentication, or runtime side
    effects. It only defines an interface for future implementation.
    """

    def to_cave_payload(self, artifact: WorkflowArtifact) -> CavePayload:
        """Convert a workflow artifact to a CAVE payload.

        TODO: Implement artifact-type specific mappings.
        TODO: Validate payload schema against target CAVE tables.
        """
        raise NotImplementedError("Spike scaffold only; mapping not implemented.")

    def publish(self, payload: CavePayload) -> dict[str, Any]:
        """Publish a payload to a CAVE endpoint.

        TODO: Add authentication strategy assumptions (token/source/refresh flow).
        TODO: Add network transport and retry policy assumptions.
        TODO: Add deployment/runtime configuration assumptions.
        """
        raise NotImplementedError("Spike scaffold only; publishing not implemented.")
49 changes: 49 additions & 0 deletions tests/test_cave_adapter_contract.py
@@ -0,0 +1,49 @@
from server_api.workflow.cave_adapter import CaveAdapter, CavePayload, WorkflowArtifact


def test_workflow_artifact_contract() -> None:
    artifact = WorkflowArtifact(
        artifact_id="artifact-1",
        artifact_type="synapse_annotations",
        payload={"rows": [{"id": 1}]},
        metadata={"project": "demo"},
    )

    assert artifact.artifact_id == "artifact-1"
    assert artifact.payload["rows"][0]["id"] == 1


def test_cave_payload_contract() -> None:
    payload = CavePayload(
        table_name="synapse_table",
        records=[{"id": 1, "confidence": 0.92}],
        provenance={"source": "spike"},
    )

    assert payload.table_name == "synapse_table"
    assert payload.records[0]["confidence"] == 0.92


def test_adapter_stubs_are_explicitly_unimplemented() -> None:
    adapter = CaveAdapter()
    artifact = WorkflowArtifact(
        artifact_id="artifact-2",
        artifact_type="mesh_summary",
        payload={"rows": []},
    )

    try:
        adapter.to_cave_payload(artifact)
    except NotImplementedError as exc:
        assert "Spike scaffold only" in str(exc)
    else:
        raise AssertionError("to_cave_payload should remain unimplemented in spike")

    payload = CavePayload(table_name="mesh_table", records=[])

    try:
        adapter.publish(payload)
    except NotImplementedError as exc:
        assert "Spike scaffold only" in str(exc)
    else:
        raise AssertionError("publish should remain unimplemented in spike")