Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions server_api/workflow/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
"""Workflow proposal helpers."""

from .proposals import PROPOSAL_TYPE_PREVIEW_CORRECTION_IMPACT, build_preview_correction_impact_proposal

__all__ = [
"PROPOSAL_TYPE_PREVIEW_CORRECTION_IMPACT",
"build_preview_correction_impact_proposal",
]
94 changes: 94 additions & 0 deletions server_api/workflow/proposals.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
"""Rule-based workflow proposals for proofreading and retraining decisions."""

from __future__ import annotations

from collections import Counter
from typing import Iterable, Mapping, Sequence

PROPOSAL_TYPE_PREVIEW_CORRECTION_IMPACT = "preview_correction_impact"

# Explicit thresholds for transparent recommendation logic.
LOW_CORRECTION_THRESHOLD = 5
HIGH_CORRECTION_THRESHOLD = 20
RECENT_EXPORT_WINDOW = 3

CORRECTION_EVENT_TYPES = {
"correction_saved",
"mask_edited",
"instance_fixed",
"proofread_correction",
}
EXPORT_EVENT_TYPES = {"mask_exported", "export_masks", "export_completed"}


def _event_type(event: Mapping[str, object]) -> str:
raw = event.get("type", "")
return raw if isinstance(raw, str) else ""


def _recent_export_events(events: Sequence[Mapping[str, object]]) -> list[Mapping[str, object]]:
exports = [event for event in events if _event_type(event) in EXPORT_EVENT_TYPES]
return exports[-RECENT_EXPORT_WINDOW:]


def build_preview_correction_impact_proposal(
recent_events: Iterable[Mapping[str, object]],
) -> dict[str, object]:
"""Build a no-side-effect proposal that previews correction impact on retraining.

Args:
recent_events: Event dictionaries with a string ``type`` key.

Returns:
Proposal payload containing summary, recommendation, and rationale.
"""
event_list = list(recent_events)
event_type_counts = Counter(_event_type(event) for event in event_list)

correction_count = sum(
event_type_counts[event_type] for event_type in CORRECTION_EVENT_TYPES
)
recent_exports = _recent_export_events(event_list)

if correction_count <= LOW_CORRECTION_THRESHOLD:
recommendation = "proceed"
rationale = (
f"Detected {correction_count} correction events (<= {LOW_CORRECTION_THRESHOLD}); "
"current proofreading changes are limited, so retraining can proceed."
)
elif correction_count >= HIGH_CORRECTION_THRESHOLD:
recommendation = "continue_proofreading"
rationale = (
f"Detected {correction_count} correction events (>= {HIGH_CORRECTION_THRESHOLD}); "
"large correction volume suggests continuing proofreading before retraining."
)
else:
recommendation = "continue_proofreading"
rationale = (
f"Detected {correction_count} correction events between explicit thresholds "
f"({LOW_CORRECTION_THRESHOLD} and {HIGH_CORRECTION_THRESHOLD}); "
"collect more proofreading updates before retraining."
)

export_summary = [
{
"type": _event_type(event),
"timestamp": event.get("timestamp"),
"target": event.get("target"),
}
for event in recent_exports
]

return {
"type": PROPOSAL_TYPE_PREVIEW_CORRECTION_IMPACT,
"summary": {
"correction_event_counts": {
event_type: event_type_counts[event_type]
for event_type in sorted(CORRECTION_EVENT_TYPES)
},
"total_correction_events": correction_count,
"recent_exports": export_summary,
},
"recommendation": recommendation,
"rationale": rationale,
}
57 changes: 57 additions & 0 deletions tests/test_workflow_correction_preview.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import unittest

from server_api.workflow.proposals import (
HIGH_CORRECTION_THRESHOLD,
LOW_CORRECTION_THRESHOLD,
PROPOSAL_TYPE_PREVIEW_CORRECTION_IMPACT,
build_preview_correction_impact_proposal,
)


class CorrectionImpactPreviewTests(unittest.TestCase):
def test_low_correction_scenario_recommends_proceed(self):
events = [
{"type": "correction_saved", "timestamp": "2026-04-13T12:00:00Z"},
{"type": "mask_edited", "timestamp": "2026-04-13T12:05:00Z"},
{
"type": "export_masks",
"timestamp": "2026-04-13T12:10:00Z",
"target": "session_a",
},
]

proposal = build_preview_correction_impact_proposal(events)

self.assertEqual(proposal["type"], PROPOSAL_TYPE_PREVIEW_CORRECTION_IMPACT)
self.assertEqual(proposal["recommendation"], "proceed")
self.assertIn(str(LOW_CORRECTION_THRESHOLD), proposal["rationale"])
self.assertEqual(proposal["summary"]["total_correction_events"], 2)
self.assertEqual(len(proposal["summary"]["recent_exports"]), 1)

def test_high_correction_scenario_recommends_continue_proofreading(self):
events = [
{"type": "correction_saved", "timestamp": f"2026-04-13T12:{index:02d}:00Z"}
for index in range(HIGH_CORRECTION_THRESHOLD + 2)
] + [
{
"type": "export_completed",
"timestamp": "2026-04-13T13:00:00Z",
"target": "session_b",
}
]

original_events_snapshot = [dict(event) for event in events]
proposal = build_preview_correction_impact_proposal(events)

self.assertEqual(proposal["type"], PROPOSAL_TYPE_PREVIEW_CORRECTION_IMPACT)
self.assertEqual(proposal["recommendation"], "continue_proofreading")
self.assertIn(str(HIGH_CORRECTION_THRESHOLD), proposal["rationale"])
self.assertEqual(
proposal["summary"]["total_correction_events"],
HIGH_CORRECTION_THRESHOLD + 2,
)
self.assertEqual(events, original_events_snapshot)


if __name__ == "__main__":
unittest.main()
Loading