fix(scheduler): preserve recurring schedule progress#289
Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
WalkthroughImplements end-to-end RRULE recurring schedule lifecycle: adds three new protobuf event messages and two ChangesRRULE Schedule Occurrence Lifecycle
Sequence DiagramsequenceDiagram
participant EventStream
participant ScheduleProcessor
participant Reconciler
participant ExecutionScheduleWriter
participant RRuleWakeupProcessor
participant RecordScheduleOccurrence
participant UserSubject
EventStream->>ScheduleProcessor: ScheduleCreated (RRule)
ScheduleProcessor->>Reconciler: reconcile(Created)
Reconciler-->>ScheduleProcessor: ArmNext
ScheduleProcessor->>ScheduleNextOccurrence: execute via event_store
ScheduleNextOccurrence-->>EventStream: ScheduleOccurrenceScheduled appended
EventStream->>ScheduleProcessor: ScheduleOccurrenceScheduled
ScheduleProcessor->>Reconciler: reconcile(OccurrenceScheduled)
Reconciler-->>ScheduleProcessor: Publish(RRuleWakeupRequest)
ScheduleProcessor->>ExecutionScheduleWriter: upsert RRULE wakeup `@at` message
Note over RRuleWakeupProcessor: NATS JetStream delivers wakeup
RRuleWakeupProcessor->>RecordScheduleOccurrence: execute via event_store
RecordScheduleOccurrence-->>EventStream: ScheduleOccurrenceRecorded + ScheduleOccurrenceScheduled appended
EventStream->>ScheduleProcessor: ScheduleOccurrenceRecorded
ScheduleProcessor->>Reconciler: reconcile(OccurrenceRecorded)
Reconciler-->>ScheduleProcessor: Dispatch(DispatchRequest)
ScheduleProcessor->>ExecutionScheduleWriter: dispatch user message
ExecutionScheduleWriter->>UserSubject: publish with Trogon-Schedule-Occurrence-* headers
EventStream->>ScheduleProcessor: ScheduleCompleted (final occurrence)
ScheduleProcessor->>Reconciler: reconcile(Completed)
Reconciler-->>ScheduleProcessor: Purge + Expired checkpoint
Estimated code review effort🎯 5 (Critical) | ⏱️ ~120 minutes Possibly related PRs
Poem
✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
|
PR SummaryHigh Risk Overview Schedule state gains New commands The execution processor arms RRULE on create/resume ( Reviewed by Cursor Bugbot for commit 1c1248b. Bugbot is set up for automated code reviews on this repo. Configure here. |
Code Coverage SummaryDetailsDiff against mainResults for commit: 1c1248b Minimum allowed coverage is ♻️ This comment has been updated with latest results |
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (1)
rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/rrule.rs (1)
97-113: ⚡ Quick winAdd an explicit equality-boundary test for
RRuleCursor::AtOrAfter.The suite validates “before” and strict “after”, but not the exact-match boundary (
cursor == occurrence). Adding that case protects the core recurrence continuation contract from regressions.✅ Suggested test addition
#[test] fn strict_after_cursor_does_not_repeat_the_fired_occurrence() { @@ assert_eq!(next, Some(instant("2026-01-02T00:00:00Z"))); } + +#[test] +fn at_or_after_cursor_includes_an_occurrence_at_the_exact_cursor() { + let schedule = Schedule::rrule("2026-01-01T00:00:00Z", "FREQ=DAILY;COUNT=2", None).unwrap(); + + let next = next_rrule_occurrence(&schedule, RRuleCursor::at_or_after(instant("2026-01-01T00:00:00Z"))).unwrap(); + + assert_eq!(next, Some(instant("2026-01-01T00:00:00Z"))); +}🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/rrule.rs` around lines 97 - 113, Add a new test function to validate the exact-match boundary case for RRuleCursor::at_or_after. The test should create an rrule schedule and call next_rrule_occurrence with RRuleCursor::at_or_after pointing to a time that exactly matches one of the recurrence occurrences, then assert that the function returns that exact occurrence (not skipping to the next one). This ensures the "at or after" semantics are properly tested alongside the existing "before" and strict "after" test cases.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In
`@rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/processor.rs`:
- Around line 280-281: The metrics recording calls (record_publish and
record_purge) are being executed before checkpoint persistence, which causes
double-counting if the checkpoint save fails and the continuation is retried.
Move the self.metrics.record_publish() call at line 280-281 to execute after the
checkpoints.save() call completes successfully. Apply the same fix to the
sibling locations at lines 287-288 (move record_purge after checkpoints.save)
and lines 292-299 (move the metrics recording after checkpoints.save). Ensure
metrics are only recorded once the checkpoint has been successfully persisted.
---
Nitpick comments:
In
`@rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/rrule.rs`:
- Around line 97-113: Add a new test function to validate the exact-match
boundary case for RRuleCursor::at_or_after. The test should create an rrule
schedule and call next_rrule_occurrence with RRuleCursor::at_or_after pointing
to a time that exactly matches one of the recurrence occurrences, then assert
that the function returns that exact occurrence (not skipping to the next one).
This ensures the "at or after" semantics are properly tested alongside the
existing "before" and strict "after" test cases.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 507b9e2b-fe44-4c74-b1d4-0dd3f2643302
📒 Files selected for processing (8)
rsworkspace/crates/trogon-scheduler/src/processor/execution/nats_execution_tests.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/mod.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/reconcile.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/request.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/rrule.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/worker/mod.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/worker/processor.rsrsworkspace/crates/trogon-scheduler/src/processor/mod.rs
4783111 to
5ccf0da
Compare
| backoff: Vec::new(), | ||
| ..Default::default() | ||
| } | ||
| } |
There was a problem hiding this comment.
RRULE wakeups lack runtime consumer
High Severity
The PR schedules RRULE continuations via internal wakeup subjects and adds RRuleWakeupProcessor plus rrule_wakeup_consumer_config, but nothing in the scheduler worker starts a durable consumer or calls process outside ignored NATS integration tests. After the first NATS @at fires, ScheduleOccurrenceFired events are never appended, so the execution processor cannot dispatch or schedule the next occurrence.
Reviewed by Cursor Bugbot for commit 5ccf0da. Configure here.
| RRuleCursor::After(_) => candidates.into_iter().find(|date| date.with_timezone(&Utc) > instant), | ||
| }; | ||
|
|
||
| Ok(occurrence.map(|date| date.with_timezone(&Utc))) |
There was a problem hiding this comment.
AtOrAfter skips exact instant
Medium Severity
RRuleCursor::AtOrAfter takes the first date from RRuleSet::after(cursor), which is typically strict-after the cursor. When reconciliation uses now - PAST_AT_GRACE and that value equals a recurrence instant, expansion can return no next occurrence and mark the schedule expired while that occurrence was never scheduled or dispatched.
Reviewed by Cursor Bugbot for commit 5ccf0da. Configure here.
There was a problem hiding this comment.
Actionable comments posted: 6
🧹 Nitpick comments (4)
rsworkspace/crates/trogonai-proto/src/scheduler/schedules/codec.rs (1)
58-82: ⚡ Quick winAdd lifecycle codec assertions for
ScheduleOccurrenceFired.The new branch is wired in encode/decode/type dispatch, but the lifecycle tests still don’t explicitly cover this case. Add one assertion per lifecycle test path so this event can’t regress silently.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rsworkspace/crates/trogonai-proto/src/scheduler/schedules/codec.rs` around lines 58 - 82, The ScheduleOccurrenceFired variant has been wired into the encode/decode/type dispatch functions (encode_schedule_event_case, decode_schedule_event_case, and schedule_event_case_type), but the lifecycle codec tests do not explicitly cover this new case. Add one assertion per lifecycle test path to verify the ScheduleOccurrenceFired variant encodes, decodes, and dispatches correctly so that future changes don't cause silent regressions. Include assertions that test the encode/decode round-trip and type dispatch for this variant alongside the existing test cases for other ScheduleEventCase variants.rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/request.rs (1)
114-143: ⚡ Quick winAdd direct unit coverage for the new wakeup/dispatch builders.
Lines 114 and 213 introduce new construction paths, but the test module currently exercises only
ScheduleRequest::buildpaths. Please add explicit tests forbuild_rrule_wakeup(target/TTL/correlation headers + payload decode) andDispatchRequest::build(UnsupportedDispatchSource, internal-target rejection, user-header passthrough).Also applies to: 213-265, 317-616
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/request.rs` around lines 114 - 143, Add explicit unit tests for the new builder methods introduced in this code. Create tests for the `build_rrule_wakeup` method that verify correct setting of target headers, TTL header formatting, correlation headers (TROGON_SCHEDULE_KEY_HEADER and TROGON_SCHEDULE_ID_B64_HEADER), and correct payload encoding/decoding. Additionally, create tests for the `DispatchRequest::build` method (at line 213) that verify handling of UnsupportedDispatchSource errors, rejection of internal-target scenarios, and correct passthrough of user-supplied headers. These new test cases should complement the existing test coverage which currently only exercises the basic `ScheduleRequest::build` path.rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/recorded_events.rs (1)
230-240: ⚡ Quick winAdd focused tests for the new occurrence-fired decode branch.
This branch is the only path from persisted
ScheduleOccurrenceFiredevents into reconciliation. Please add one success test and one missing-fired_atrejection test so regressions here do not silently stop RRULE continuation.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/recorded_events.rs` around lines 230 - 240, Add two focused unit tests for the ScheduleOccurrenceFired branch in the reconciliation event decoding logic. First, create a success test that verifies a valid ScheduleOccurrenceFired event with all required fields (schedule_id and fired_at) correctly decodes into a ScheduleChange::OccurrenceFired variant with the expected schedule_id and fired_at values. Second, create a rejection test that verifies when the fired_at field is missing or None, the decoding fails with the ScheduleEventDecodeError::MissingField error for "fired_at". These tests ensure regressions in this critical event processing path are caught before they silently break RRULE continuation.rsworkspace/crates/trogon-scheduler/src/processor/execution/nats_execution_tests.rs (1)
174-191: Route test NATS interactions throughtrogon-natstrait abstractions instead of binding directly to concreteasync_natstypes.The
fetch_wakeup()helper (lines 174-191) and related test setup functions likeexecution_stream()directly depend on concreteasync_nats::jetstreamtypes. Per repository policy, NATS test code should usetrogon-natsabstractions for testability and swappability. Replace the concrete consumer parameter withtrogon_nats::jetstream::JetStreamConsumertrait bounds and use the mock implementations available introgon_natsfor testing.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rsworkspace/crates/trogon-scheduler/src/processor/execution/nats_execution_tests.rs` around lines 174 - 191, Refactor the fetch_wakeup() helper function to accept a generic parameter bound by trogon_nats::jetstream::JetStreamConsumer trait instead of the concrete async_nats::jetstream::consumer::Consumer type. Update the function signature to use trait bounds that allow for mock implementations from trogon_nats, making the test code compatible with the repository's abstraction policy. Additionally, review and update related test setup functions like execution_stream() to similarly use trogon_nats trait abstractions instead of concrete async_nats types, ensuring all NATS interactions in the test file go through trogon_nats abstractions for consistency.Source: Coding guidelines
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In
`@rsworkspace/crates/trogon-scheduler/src/commands/record_schedule_occurrence.rs`:
- Around line 9-12: The RecordScheduleOccurrence struct and other
occurrence-related types expose raw DateTime<Utc> primitives, which bypasses
invariant enforcement and weakens domain guarantees. Create a domain-specific
value object for the occurrence timestamp (similar to how AcpPrefix is used
instead of String), then update all affected locations: replace the
DateTime<Utc> field in RecordScheduleOccurrence (lines 9-12), update the
OccurrenceAlreadyRecorded error type (lines 23-27), and update any other
occurrence timestamp field (lines 40-41) to use this new value object instead of
the raw primitive. Ensure the value object's factory method guarantees
correctness at construction so invalid instances are unrepresentable.
In
`@rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/reconcile.rs`:
- Around line 277-284: The RRULE grace-boundary handling is inconsistent with
the one-shot schedule (At path) boundary handling. The `decide_rrule` function
call uses `RRuleCursor::at_or_after(now - PAST_AT_GRACE)` which includes
occurrences at the boundary, while the At path on line 288 excludes that same
boundary with `<=`. Either change the RRULE cursor to use an exclusive boundary
(after instead of at_or_after) to match the At path behavior, or update both
code paths and their tests to use the same inclusive boundary rule consistently.
In
`@rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/rrule_wakeup_payload.rs`:
- Around line 5-8: The RRuleWakeupPayload struct currently uses primitive String
fields that can be invalid at construction time, with validation deferred to
Line 22. Split the representation into wire and domain types following the
coding guidelines. At
rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/rrule_wakeup_payload.rs
lines 5-8, replace the primitive String fields with domain-specific value
objects. At lines 30-31 where deserialization occurs, deserialize into a
temporary wire type (with primitive strings) instead of directly into
RRuleWakeupPayload. At lines 22-24 where validation currently happens, implement
a conversion function (e.g., impl From or a try_into method) that validates
schedule_id and fired_at exactly once when converting from wire to domain type,
ensuring RRuleWakeupPayload cannot be constructed with invalid values.
In `@rsworkspace/crates/trogon-scheduler/src/processor/execution/wakeup.rs`:
- Around line 47-48: The `SubjectMismatch` error struct at lines 47-48 stores
untrusted subject input as raw strings, violating the coding guideline that
untrusted input must use distinct boundary types. Create a new
`RRuleWakeupSubjectInput` boundary type to wrap and validate the subject string
input, then update the `SubjectMismatch` error definition to use typed fields
from this boundary type instead of raw String fields. Apply the same boundary
type wrapper when creating `SubjectMismatch` errors at all sites where subject
validation occurs (lines 89-96 and 128-137), ensuring the subject is wrapped
before comparison rather than carrying raw strings in the error.
- Around line 90-98: The `RecordScheduleOccurrence::decide` method validates
ordering but does not confirm that the `fired_at` timestamp is a valid RRULE
occurrence for the schedule. Add validation in
`RecordScheduleOccurrence::decide` to call `next_rrule_occurrence` (or
equivalent method) with the schedule's current RRULE and exception dates to
verify that `fired_at` matches a valid occurrence. If `fired_at` does not
correspond to a valid occurrence, reject the command with an appropriate error.
This ensures that if the schedule's RRULE or exception dates change between
wakeup publication and processing, invalid timestamps are not accepted and valid
occurrences are not skipped.
In
`@rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/processor.rs`:
- Around line 505-506: The match arm handling ReconcileAction::Multiple(_) at
line 505 currently returns Ok(()) without processing the nested actions
contained within the Multiple variant, which causes those actions to be skipped
while allowing checkpoints to be persisted anyway. Extract the nested actions
from the Multiple variant and process each one appropriately (either recursively
through the same processing logic or by iterating through and handling them
sequentially) to ensure all work is actually executed before any checkpoint is
acknowledged. Do not simply return Ok(()) for the Multiple case.
---
Nitpick comments:
In
`@rsworkspace/crates/trogon-scheduler/src/processor/execution/nats_execution_tests.rs`:
- Around line 174-191: Refactor the fetch_wakeup() helper function to accept a
generic parameter bound by trogon_nats::jetstream::JetStreamConsumer trait
instead of the concrete async_nats::jetstream::consumer::Consumer type. Update
the function signature to use trait bounds that allow for mock implementations
from trogon_nats, making the test code compatible with the repository's
abstraction policy. Additionally, review and update related test setup functions
like execution_stream() to similarly use trogon_nats trait abstractions instead
of concrete async_nats types, ensuring all NATS interactions in the test file go
through trogon_nats abstractions for consistency.
In
`@rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/recorded_events.rs`:
- Around line 230-240: Add two focused unit tests for the
ScheduleOccurrenceFired branch in the reconciliation event decoding logic.
First, create a success test that verifies a valid ScheduleOccurrenceFired event
with all required fields (schedule_id and fired_at) correctly decodes into a
ScheduleChange::OccurrenceFired variant with the expected schedule_id and
fired_at values. Second, create a rejection test that verifies when the fired_at
field is missing or None, the decoding fails with the
ScheduleEventDecodeError::MissingField error for "fired_at". These tests ensure
regressions in this critical event processing path are caught before they
silently break RRULE continuation.
In
`@rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/request.rs`:
- Around line 114-143: Add explicit unit tests for the new builder methods
introduced in this code. Create tests for the `build_rrule_wakeup` method that
verify correct setting of target headers, TTL header formatting, correlation
headers (TROGON_SCHEDULE_KEY_HEADER and TROGON_SCHEDULE_ID_B64_HEADER), and
correct payload encoding/decoding. Additionally, create tests for the
`DispatchRequest::build` method (at line 213) that verify handling of
UnsupportedDispatchSource errors, rejection of internal-target scenarios, and
correct passthrough of user-supplied headers. These new test cases should
complement the existing test coverage which currently only exercises the basic
`ScheduleRequest::build` path.
In `@rsworkspace/crates/trogonai-proto/src/scheduler/schedules/codec.rs`:
- Around line 58-82: The ScheduleOccurrenceFired variant has been wired into the
encode/decode/type dispatch functions (encode_schedule_event_case,
decode_schedule_event_case, and schedule_event_case_type), but the lifecycle
codec tests do not explicitly cover this new case. Add one assertion per
lifecycle test path to verify the ScheduleOccurrenceFired variant encodes,
decodes, and dispatches correctly so that future changes don't cause silent
regressions. Include assertions that test the encode/decode round-trip and type
dispatch for this variant alongside the existing test cases for other
ScheduleEventCase variants.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 30a0afaf-a167-4781-9db9-6c091c149106
⛔ Files ignored due to path filters (9)
rsworkspace/crates/trogonai-proto/src/gen/trogonai.scheduler.schedules.state.v1.state.__view.rsis excluded by!**/gen/**rsworkspace/crates/trogonai-proto/src/gen/trogonai.scheduler.schedules.state.v1.state.rsis excluded by!**/gen/**rsworkspace/crates/trogonai-proto/src/gen/trogonai.scheduler.schedules.v1.events.__oneof.rsis excluded by!**/gen/**rsworkspace/crates/trogonai-proto/src/gen/trogonai.scheduler.schedules.v1.events.__view.rsis excluded by!**/gen/**rsworkspace/crates/trogonai-proto/src/gen/trogonai.scheduler.schedules.v1.events.__view_oneof.rsis excluded by!**/gen/**rsworkspace/crates/trogonai-proto/src/gen/trogonai.scheduler.schedules.v1.events.rsis excluded by!**/gen/**rsworkspace/crates/trogonai-proto/src/gen/trogonai.scheduler.schedules.v1.mod.rsis excluded by!**/gen/**rsworkspace/crates/trogonai-proto/src/gen/trogonai.scheduler.schedules.v1.schedule_occurrence_fired.__view.rsis excluded by!**/gen/**rsworkspace/crates/trogonai-proto/src/gen/trogonai.scheduler.schedules.v1.schedule_occurrence_fired.rsis excluded by!**/gen/**
📒 Files selected for processing (25)
proto/trogonai/scheduler/schedules/state/v1/state.protoproto/trogonai/scheduler/schedules/v1/events.protoproto/trogonai/scheduler/schedules/v1/schedule_occurrence_fired.protorsworkspace/crates/trogon-scheduler/src/commands/create_schedule.rsrsworkspace/crates/trogon-scheduler/src/commands/mod.rsrsworkspace/crates/trogon-scheduler/src/commands/pause_schedule.rsrsworkspace/crates/trogon-scheduler/src/commands/record_schedule_occurrence.rsrsworkspace/crates/trogon-scheduler/src/commands/remove_schedule.rsrsworkspace/crates/trogon-scheduler/src/commands/resume_schedule.rsrsworkspace/crates/trogon-scheduler/src/commands/state.rsrsworkspace/crates/trogon-scheduler/src/lib.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/execution_schedules/mod.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/mod.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/nats_execution_tests.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/mod.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/reconcile.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/recorded_events.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/request.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/rrule.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/rrule_wakeup_payload.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/schedule_subject.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/wakeup.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/worker/processor.rsrsworkspace/crates/trogon-scheduler/src/processor/mod.rsrsworkspace/crates/trogonai-proto/src/scheduler/schedules/codec.rs
✅ Files skipped from review due to trivial changes (7)
- proto/trogonai/scheduler/schedules/v1/events.proto
- proto/trogonai/scheduler/schedules/v1/schedule_occurrence_fired.proto
- rsworkspace/crates/trogon-scheduler/src/processor/execution/mod.rs
- rsworkspace/crates/trogon-scheduler/src/lib.rs
- rsworkspace/crates/trogon-scheduler/src/processor/mod.rs
- rsworkspace/crates/trogon-scheduler/src/commands/remove_schedule.rs
- rsworkspace/crates/trogon-scheduler/src/commands/pause_schedule.rs
🚧 Files skipped from review as they are similar to previous changes (2)
- rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/mod.rs
- rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/rrule.rs
| pub struct RecordScheduleOccurrence { | ||
| pub id: ScheduleId, | ||
| pub fired_at: DateTime<Utc>, | ||
| } |
There was a problem hiding this comment.
🛠️ Refactor suggestion | 🟠 Major | 🏗️ Heavy lift
Use a domain value object instead of raw DateTime<Utc> in command/error surfaces.
RecordScheduleOccurrence and OccurrenceAlreadyRecorded currently expose primitive timestamps directly. This bypasses invariant enforcement at construction and weakens domain guarantees for occurrence semantics.
As per coding guidelines, “Prefer domain-specific value objects over primitives (e.g., AcpPrefix instead of String). Each type's factory must guarantee correctness at construction—invalid instances should be unrepresentable.”
Also applies to: 23-27, 40-41
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@rsworkspace/crates/trogon-scheduler/src/commands/record_schedule_occurrence.rs`
around lines 9 - 12, The RecordScheduleOccurrence struct and other
occurrence-related types expose raw DateTime<Utc> primitives, which bypasses
invariant enforcement and weakens domain guarantees. Create a domain-specific
value object for the occurrence timestamp (similar to how AcpPrefix is used
instead of String), then update all affected locations: replace the
DateTime<Utc> field in RecordScheduleOccurrence (lines 9-12), update the
OccurrenceAlreadyRecorded error type (lines 23-27), and update any other
occurrence timestamp field (lines 40-41) to use this new value object instead of
the raw primitive. Ensure the value object's factory method guarantees
correctness at construction so invalid instances are unrepresentable.
Source: Coding guidelines
| pub(crate) struct RRuleWakeupPayload { | ||
| schedule_id: String, | ||
| fired_at: String, | ||
| } |
There was a problem hiding this comment.
🛠️ Refactor suggestion | 🟠 Major | 🏗️ Heavy lift
Make wakeup payload validity explicit at construction time.
Line 30 currently deserializes into primitive String fields (Lines 6-7), so malformed schedule_id/fired_at can exist inside RRuleWakeupPayload and only fail later at Line 22. Please split wire and domain representations so conversion validates once and the domain payload cannot be invalid after construction.
As per coding guidelines, “Prefer domain-specific value objects over primitives … invalid instances should be unrepresentable” and “Untrusted input must use distinct *Input/*Wire/*Request types. Convert those boundary types into domain types exactly once.”
Also applies to: 22-24, 30-31
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/rrule_wakeup_payload.rs`
around lines 5 - 8, The RRuleWakeupPayload struct currently uses primitive
String fields that can be invalid at construction time, with validation deferred
to Line 22. Split the representation into wire and domain types following the
coding guidelines. At
rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/rrule_wakeup_payload.rs
lines 5-8, replace the primitive String fields with domain-specific value
objects. At lines 30-31 where deserialization occurs, deserialize into a
temporary wire type (with primitive strings) instead of directly into
RRuleWakeupPayload. At lines 22-24 where validation currently happens, implement
a conversion function (e.g., impl From or a try_into method) that validates
schedule_id and fired_at exactly once when converting from wire to domain type,
ensuring RRuleWakeupPayload cannot be constructed with invalid values.
Source: Coding guidelines
| #[error("RRULE wakeup subject '{actual}' does not match expected subject '{expected}'")] | ||
| SubjectMismatch { expected: String, actual: String }, |
There was a problem hiding this comment.
🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win
Wrap the inbound NATS subject in a boundary type before validation.
subject: &str is untrusted transport input, and SubjectMismatch persists the raw values as String. Introduce a small RRuleWakeupSubjectInput/Wire type and carry typed fields in the error instead of raw strings. As per coding guidelines, "Untrusted input must use distinct *Input / *Wire / *Request types" and "Errors must be typed—use structs or enums, never String or format!()."
Also applies to: 89-96, 128-137
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@rsworkspace/crates/trogon-scheduler/src/processor/execution/wakeup.rs` around
lines 47 - 48, The `SubjectMismatch` error struct at lines 47-48 stores
untrusted subject input as raw strings, violating the coding guideline that
untrusted input must use distinct boundary types. Create a new
`RRuleWakeupSubjectInput` boundary type to wrap and validate the subject string
input, then update the `SubjectMismatch` error definition to use typed fields
from this boundary type instead of raw String fields. Apply the same boundary
type wrapper when creating `SubjectMismatch` errors at all sites where subject
validation occurs (lines 89-96 and 128-137), ensuring the subject is wrapped
before comparison rather than carrying raw strings in the error.
Source: Coding guidelines
| let wakeup = RRuleWakeupPayload::decode(payload).map_err(|source| RRuleWakeupError::PayloadJson { source })?; | ||
| let schedule_id = | ||
| ScheduleId::parse(wakeup.schedule_id()).map_err(|source| RRuleWakeupError::ScheduleId { source })?; | ||
| let fired_at = wakeup | ||
| .fired_at() | ||
| .map_err(|source| RRuleWakeupError::FiredAt { source })?; | ||
| ensure_subject_matches_payload(subject, &schedule_id)?; | ||
|
|
||
| let command = RecordScheduleOccurrence::new(schedule_id, fired_at); |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Description: Inspect whether RecordScheduleOccurrence validates fired_at against the RRULE schedule,
# not only duplicate/stale ordering.
# Expected: The command path should prove fired_at is a valid/expected occurrence for the current schedule
# before appending ScheduleOccurrenceFired.
set -euo pipefail
rg -n -C 6 \
'RecordScheduleOccurrence|RecordScheduleOccurrenceError|OccurrenceAlreadyRecorded|last_occurrence_fired_at|next_rrule_occurrence|RRuleCursor|Schedule::RRule' \
rsworkspace/crates/trogon-scheduler/src/commands/record_schedule_occurrence.rs \
rsworkspace/crates/trogon-scheduler/src/commands/state.rs \
rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/rrule.rsRepository: TrogonStack/trogonai
Length of output: 43270
🏁 Script executed:
cat -n rsworkspace/crates/trogon-scheduler/src/processor/execution/wakeup.rs | head -120Repository: TrogonStack/trogonai
Length of output: 5232
🏁 Script executed:
# Check if there's any RRULE validation in the wakeup generation or elsewhere in the processor
rg -n 'next_rrule_occurrence|validate.*fired|valid.*occurrence' rsworkspace/crates/trogon-scheduler/src/processor/execution/ --type rustRepository: TrogonStack/trogonai
Length of output: 1778
🏁 Script executed:
# Check what the reconciliation does and how it relates to wakeup generation
rg -n 'wakeup|WakeupPayload|generate.*wakeup|publish.*wakeup' rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/ --type rust -B 2 -A 2Repository: TrogonStack/trogonai
Length of output: 9400
🏁 Script executed:
# Check the reconcile logic that calls build_rrule_wakeup and how `at` is determined
sed -n '300,340p' rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/reconcile.rsRepository: TrogonStack/trogonai
Length of output: 1389
RecordScheduleOccurrence must validate that fired_at matches a valid RRULE occurrence for the schedule.
The wakeup processor validates the subject matches the payload schedule ID, then records the payload's fired_at directly. However, RecordScheduleOccurrence::decide only validates ordering (no duplicates/stale timestamps); it does not call next_rrule_occurrence to confirm fired_at is a valid occurrence in the current schedule. If the schedule's RRULE or exception dates change between wakeup publication and processing, an invalid fired_at could be accepted, potentially skipping valid scheduled occurrences.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@rsworkspace/crates/trogon-scheduler/src/processor/execution/wakeup.rs` around
lines 90 - 98, The `RecordScheduleOccurrence::decide` method validates ordering
but does not confirm that the `fired_at` timestamp is a valid RRULE occurrence
for the schedule. Add validation in `RecordScheduleOccurrence::decide` to call
`next_rrule_occurrence` (or equivalent method) with the schedule's current RRULE
and exception dates to verify that `fired_at` matches a valid occurrence. If
`fired_at` does not correspond to a valid occurrence, reject the command with an
appropriate error. This ensures that if the schedule's RRULE or exception dates
change between wakeup publication and processing, invalid timestamps are not
accepted and valid occurrences are not skipped.
5ccf0da to
33a9536
Compare
| pub(crate) mod checkpoints; | ||
| pub(crate) mod execution_schedules; | ||
| pub(crate) mod reconciliation; | ||
| pub mod wakeup; |
There was a problem hiding this comment.
RRULE wakeup consumer unwired
High Severity
RRULE schedules now publish one-shot NATS wakeups to scheduler.schedules.execution.v1.rrule.*, but nothing in the production worker consumes those messages. RRuleWakeupProcessor and rrule_wakeup_consumer_config exist only in integration tests, so ScheduleOccurrenceFired events are never appended and the execution processor never dispatches user payloads or schedules the next occurrence.
Additional Locations (1)
Reviewed by Cursor Bugbot for commit 33a9536. Configure here.
33a9536 to
dfcd0af
Compare
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
dfcd0af to
bad4e3d
Compare
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
rsworkspace/crates/trogon-scheduler/src/processor/execution/nats_execution_tests.rs (1)
134-151:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winAvoid deleting fixed live-test streams without serializing the tests.
execution_stream()andevent_stream()now delete shared constant streams. When the ignored live tests are run together, one test can delete a stream another test is actively using, causing flaky failures. Use per-test stream names or guard these live tests with a shared async mutex.🧪 One low-impact fix: serialize the live tests
+static LIVE_NATS_TEST_LOCK: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(()); + #[tokio::test] #[ignore = "requires NATS 2.14+ with AllowAtomicPublish and AllowMsgSchedules; set SCHEDULER_NATS_URL"] async fn rrule_command_event_is_processed_and_continued_against_live_nats() { + let _guard = LIVE_NATS_TEST_LOCK.lock().await; let Some(context) = context().await else {Apply the same guard at the start of the other live tests that share
EXECUTION_STREAM/EVENT_STREAM.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rsworkspace/crates/trogon-scheduler/src/processor/execution/nats_execution_tests.rs` around lines 134 - 151, The execution_stream() and event_stream() functions delete shared constant streams (EXECUTION_STREAM and EVENT_STREAM) without synchronization, causing race conditions when multiple live tests run concurrently. Implement a shared async mutex guard to serialize access across all live tests that use these streams. Add the mutex guard at the start of each live test function that calls execution_stream() or event_stream() to ensure only one test accesses these shared streams at a time, preventing one test from deleting a stream while another is actively using it.
♻️ Duplicate comments (1)
rsworkspace/crates/trogon-scheduler/src/commands/record_schedule_occurrence.rs (1)
9-12: 🛠️ Refactor suggestion | 🟠 Major | 🏗️ Heavy liftReplace raw occurrence timestamps with a domain value object across command/error surfaces.
RecordScheduleOccurrenceandOccurrenceAlreadyRecordedstill exposeDateTime<Utc>primitives directly. Move these to a dedicated occurrence timestamp value object and convert at boundaries once, so invariants are guaranteed at construction.Refactor sketch
-pub struct RecordScheduleOccurrence { - pub id: ScheduleId, - pub occurrence_at: DateTime<Utc>, -} +pub struct RecordScheduleOccurrence { + pub id: ScheduleId, + pub occurrence_at: ScheduleOccurrenceAt, +} pub enum RecordScheduleOccurrenceError { OccurrenceAlreadyRecorded { id: ScheduleId, - occurrence_at: DateTime<Utc>, - last_recorded_at: DateTime<Utc>, + occurrence_at: ScheduleOccurrenceAt, + last_recorded_at: ScheduleOccurrenceAt, }, } - pub fn new(id: ScheduleId, occurrence_at: DateTime<Utc>) -> Self + pub fn new(id: ScheduleId, occurrence_at: ScheduleOccurrenceAt) -> SelfAs per coding guidelines, “Prefer domain-specific value objects over primitives (e.g.,
AcpPrefixinstead ofString). Each type's factory must guarantee correctness at construction—invalid instances should be unrepresentable.”Also applies to: 23-27, 45-46
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rsworkspace/crates/trogon-scheduler/src/commands/record_schedule_occurrence.rs` around lines 9 - 12, Create a dedicated domain value object for occurrence timestamps to replace raw DateTime<Utc> primitives throughout the module. Define a new value object type (e.g., OccurrenceTimestamp) with a constructor that validates and guarantees correctness at construction time. Update the RecordScheduleOccurrence struct at lines 9-12 to use this value object instead of the occurrence_at field's raw DateTime<Utc> type. Apply the same change to the OccurrenceAlreadyRecorded error struct at lines 23-27 and its usage at lines 45-46, ensuring all command and error surfaces consistently use the domain value object rather than primitives, so timestamp invariants are enforced once at the boundaries.Source: Coding guidelines
🧹 Nitpick comments (1)
rsworkspace/crates/trogon-scheduler/src/processor/execution/nats_execution_tests.rs (1)
501-514: ⚡ Quick winAssert the final occurrence dispatch before accepting the expiry path.
This test checks the first dispatch headers, then only checks that processing the second occurrence expires the schedule. A regression that expires on the final occurrence without dispatching sequence
2could still pass. Add a second target-message assertion forTrogon-Schedule-Occurrence-Sequence: 2andTrogon-Schedule-Occurrence-At: second_occurrence_textbefore the checkpoint assertions.✅ Suggested coverage addition
let expired = processor .process(&read.events[2], Utc::now()) .await .expect("process second occurrence event"); assert_eq!(expired.outcome, ProcessedOutcome::Expired); + + let final_dispatch = execution_stream + .get_last_raw_message_by_subject(EXECUTION_TARGET_SUBJECT) + .await + .expect("second user dispatch was published"); + assert_eq!( + final_dispatch + .headers + .get("Trogon-Schedule-Occurrence-Sequence") + .unwrap() + .as_str(), + "2" + ); + assert_eq!( + final_dispatch + .headers + .get("Trogon-Schedule-Occurrence-At") + .unwrap() + .as_str(), + second_occurrence_text.as_str() + ); let checkpoint = checkpoints .load(&key)🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rsworkspace/crates/trogon-scheduler/src/processor/execution/nats_execution_tests.rs` around lines 501 - 514, The test verifies the first occurrence's dispatch but then only checks that processing the second occurrence expires the schedule, without verifying that the second occurrence was actually dispatched. This leaves a gap where a regression could expire the second occurrence without dispatching it and the test would still pass. After the assertion that expired.outcome equals ProcessedOutcome::Expired, add a second target-message assertion to verify that the dispatched message for the second occurrence contains the correct headers: Trogon-Schedule-Occurrence-Sequence: 2 and Trogon-Schedule-Occurrence-At matching the second_occurrence_text value. This assertion should come before the checkpoint read and assertions.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@proto/trogonai/scheduler/schedules/v1/schedule_occurrence_fired.proto`:
- Line 12: The occurrence_sequence field in the ScheduleOccurrenceFired message
permits missing values in the protobuf schema, but the command-state evolution
code in state.rs treats a missing occurrence_sequence as an error
(MissingOccurrenceSequence). To resolve this contract mismatch, add the required
keyword before the occurrence_sequence field declaration to enforce that this
field must always be present when the protobuf message is constructed.
In
`@rsworkspace/crates/trogon-scheduler/src/processor/execution/nats_execution_tests.rs`:
- Around line 345-349: The RRULE boundary test uses insufficient timing
intervals that are vulnerable to real system operation delays. The
first_occurrence is set only 3 seconds in the future (in the
ChronoDuration::seconds(3) call), and the second_occurrence is only 2 seconds
after that (in the ChronoDuration::seconds(2) call). Real NATS operations and
event processing can easily exceed these tight windows, causing the test to skip
occurrences or expire prematurely. Increase both duration values
substantially—aim for at least 30+ seconds for the initial offset and 20+
seconds for the interval—to provide adequate headroom for the actual scheduling
behavior to be validated without timing luck affecting the test outcome.
---
Outside diff comments:
In
`@rsworkspace/crates/trogon-scheduler/src/processor/execution/nats_execution_tests.rs`:
- Around line 134-151: The execution_stream() and event_stream() functions
delete shared constant streams (EXECUTION_STREAM and EVENT_STREAM) without
synchronization, causing race conditions when multiple live tests run
concurrently. Implement a shared async mutex guard to serialize access across
all live tests that use these streams. Add the mutex guard at the start of each
live test function that calls execution_stream() or event_stream() to ensure
only one test accesses these shared streams at a time, preventing one test from
deleting a stream while another is actively using it.
---
Duplicate comments:
In
`@rsworkspace/crates/trogon-scheduler/src/commands/record_schedule_occurrence.rs`:
- Around line 9-12: Create a dedicated domain value object for occurrence
timestamps to replace raw DateTime<Utc> primitives throughout the module. Define
a new value object type (e.g., OccurrenceTimestamp) with a constructor that
validates and guarantees correctness at construction time. Update the
RecordScheduleOccurrence struct at lines 9-12 to use this value object instead
of the occurrence_at field's raw DateTime<Utc> type. Apply the same change to
the OccurrenceAlreadyRecorded error struct at lines 23-27 and its usage at lines
45-46, ensuring all command and error surfaces consistently use the domain value
object rather than primitives, so timestamp invariants are enforced once at the
boundaries.
---
Nitpick comments:
In
`@rsworkspace/crates/trogon-scheduler/src/processor/execution/nats_execution_tests.rs`:
- Around line 501-514: The test verifies the first occurrence's dispatch but
then only checks that processing the second occurrence expires the schedule,
without verifying that the second occurrence was actually dispatched. This
leaves a gap where a regression could expire the second occurrence without
dispatching it and the test would still pass. After the assertion that
expired.outcome equals ProcessedOutcome::Expired, add a second target-message
assertion to verify that the dispatched message for the second occurrence
contains the correct headers: Trogon-Schedule-Occurrence-Sequence: 2 and
Trogon-Schedule-Occurrence-At matching the second_occurrence_text value. This
assertion should come before the checkpoint read and assertions.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: a249ff8c-e53d-49dd-b458-fc3d9c630496
⛔ Files ignored due to path filters (9)
rsworkspace/crates/trogonai-proto/src/gen/trogonai.scheduler.schedules.state.v1.state.__view.rsis excluded by!**/gen/**rsworkspace/crates/trogonai-proto/src/gen/trogonai.scheduler.schedules.state.v1.state.rsis excluded by!**/gen/**rsworkspace/crates/trogonai-proto/src/gen/trogonai.scheduler.schedules.v1.events.__oneof.rsis excluded by!**/gen/**rsworkspace/crates/trogonai-proto/src/gen/trogonai.scheduler.schedules.v1.events.__view.rsis excluded by!**/gen/**rsworkspace/crates/trogonai-proto/src/gen/trogonai.scheduler.schedules.v1.events.__view_oneof.rsis excluded by!**/gen/**rsworkspace/crates/trogonai-proto/src/gen/trogonai.scheduler.schedules.v1.events.rsis excluded by!**/gen/**rsworkspace/crates/trogonai-proto/src/gen/trogonai.scheduler.schedules.v1.mod.rsis excluded by!**/gen/**rsworkspace/crates/trogonai-proto/src/gen/trogonai.scheduler.schedules.v1.schedule_occurrence_fired.__view.rsis excluded by!**/gen/**rsworkspace/crates/trogonai-proto/src/gen/trogonai.scheduler.schedules.v1.schedule_occurrence_fired.rsis excluded by!**/gen/**
📒 Files selected for processing (27)
proto/trogonai/scheduler/schedules/state/v1/state.protoproto/trogonai/scheduler/schedules/v1/events.protoproto/trogonai/scheduler/schedules/v1/schedule_occurrence_fired.protorsworkspace/crates/trogon-scheduler/src/commands/create_schedule.rsrsworkspace/crates/trogon-scheduler/src/commands/domain/mod.rsrsworkspace/crates/trogon-scheduler/src/commands/domain/schedule_occurrence_sequence.rsrsworkspace/crates/trogon-scheduler/src/commands/mod.rsrsworkspace/crates/trogon-scheduler/src/commands/pause_schedule.rsrsworkspace/crates/trogon-scheduler/src/commands/record_schedule_occurrence.rsrsworkspace/crates/trogon-scheduler/src/commands/remove_schedule.rsrsworkspace/crates/trogon-scheduler/src/commands/resume_schedule.rsrsworkspace/crates/trogon-scheduler/src/commands/state.rsrsworkspace/crates/trogon-scheduler/src/lib.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/execution_schedules/mod.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/mod.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/nats_execution_tests.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/mod.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/reconcile.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/recorded_events.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/request.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/rrule.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/rrule_wakeup_payload.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/schedule_subject.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/wakeup.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/worker/processor.rsrsworkspace/crates/trogon-scheduler/src/processor/mod.rsrsworkspace/crates/trogonai-proto/src/scheduler/schedules/codec.rs
✅ Files skipped from review due to trivial changes (2)
- rsworkspace/crates/trogon-scheduler/src/processor/mod.rs
- rsworkspace/crates/trogon-scheduler/src/lib.rs
🚧 Files skipped from review as they are similar to previous changes (16)
- rsworkspace/crates/trogon-scheduler/src/processor/execution/mod.rs
- rsworkspace/crates/trogon-scheduler/src/commands/pause_schedule.rs
- rsworkspace/crates/trogon-scheduler/src/commands/resume_schedule.rs
- rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/rrule_wakeup_payload.rs
- rsworkspace/crates/trogonai-proto/src/scheduler/schedules/codec.rs
- rsworkspace/crates/trogon-scheduler/src/processor/execution/execution_schedules/mod.rs
- rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/mod.rs
- rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/schedule_subject.rs
- proto/trogonai/scheduler/schedules/v1/events.proto
- rsworkspace/crates/trogon-scheduler/src/commands/mod.rs
- rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/rrule.rs
- rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/reconcile.rs
- rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/recorded_events.rs
- rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/processor.rs
- rsworkspace/crates/trogon-scheduler/src/commands/remove_schedule.rs
- rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/request.rs
| // Stable NATS token used as the schedule stream id and schedule key. | ||
| string schedule_id = 1 [features.field_presence = LEGACY_REQUIRED]; | ||
| // Gapless sequence among accepted occurrences for this schedule. | ||
| uint64 occurrence_sequence = 2; |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Verify schema/runtime presence mismatch for occurrence_sequence.
rg -n 'occurrence_sequence\s*=\s*2' proto/trogonai/scheduler/schedules/v1/schedule_occurrence_fired.proto
rg -n 'MissingOccurrenceSequence|occurrence_sequence.*ok_or' rsworkspace/crates/trogon-scheduler/src/commands/state.rs
rg -n 'occurrence_sequence|opt_uint64|Option<u64>' rsworkspace/crates/trogonai-proto/src/gen/trogonai.scheduler.schedules.v1.schedule_occurrence_fired.rsRepository: TrogonStack/trogonai
Length of output: 1008
Require presence for occurrence_sequence to match runtime expectations.
The field at line 12 currently permits missing occurrence_sequence, but command-state evolution treats a missing value as an error (MissingOccurrenceSequence at line 85 of state.rs). This creates a contract mismatch where valid protobuf messages can cause runtime failures. Tighten the schema to enforce the field's presence.
Proposed fix
- uint64 occurrence_sequence = 2;
+ uint64 occurrence_sequence = 2 [features.field_presence = LEGACY_REQUIRED];📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| uint64 occurrence_sequence = 2; | |
| uint64 occurrence_sequence = 2 [features.field_presence = LEGACY_REQUIRED]; |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@proto/trogonai/scheduler/schedules/v1/schedule_occurrence_fired.proto` at
line 12, The occurrence_sequence field in the ScheduleOccurrenceFired message
permits missing values in the protobuf schema, but the command-state evolution
code in state.rs treats a missing occurrence_sequence as an error
(MissingOccurrenceSequence). To resolve this contract mismatch, add the required
keyword before the occurrence_sequence field declaration to enforce that this
field must always be present when the protobuf message is constructed.
| let first_occurrence = Utc::now() + ChronoDuration::seconds(3); | ||
| let first_occurrence = first_occurrence | ||
| .with_nanosecond(0) | ||
| .expect("valid nanosecond normalization"); | ||
| let second_occurrence = first_occurrence + ChronoDuration::seconds(2); |
There was a problem hiding this comment.
Give the RRULE boundary test more scheduling headroom.
The first wakeup is only 3 seconds out, and the second is 2 seconds later. Live NATS setup, event-store append/read, and processor work can easily cross those recurrence boundaries, making the test skip the first/second occurrence or expire early. Increase the offsets/interval so the test validates recurrence behavior instead of scheduler timing luck.
⏱️ Suggested timing adjustment
- let first_occurrence = Utc::now() + ChronoDuration::seconds(3);
+ let first_occurrence = Utc::now() + ChronoDuration::seconds(10);
@@
- let second_occurrence = first_occurrence + ChronoDuration::seconds(2);
+ let second_occurrence = first_occurrence + ChronoDuration::seconds(5);
@@
- let command = create_rrule_schedule(id.clone(), &first_occurrence_text, "FREQ=SECONDLY;INTERVAL=2;COUNT=2");
+ let command = create_rrule_schedule(id.clone(), &first_occurrence_text, "FREQ=SECONDLY;INTERVAL=5;COUNT=2");🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@rsworkspace/crates/trogon-scheduler/src/processor/execution/nats_execution_tests.rs`
around lines 345 - 349, The RRULE boundary test uses insufficient timing
intervals that are vulnerable to real system operation delays. The
first_occurrence is set only 3 seconds in the future (in the
ChronoDuration::seconds(3) call), and the second_occurrence is only 2 seconds
after that (in the ChronoDuration::seconds(2) call). Real NATS operations and
event processing can easily exceed these tight windows, causing the test to skip
occurrences or expire prematurely. Increase both duration values
substantially—aim for at least 30+ seconds for the initial offset and 20+
seconds for the interval—to provide adequate headroom for the actual scheduling
behavior to be validated without timing luck affecting the test outcome.
Recurrence progress must be durable, replayable, and deterministic; deciding the next occurrence outside the event-sourced aggregate left it able to drift or stall and unrecoverable from the stream alone. Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
| state_v1::StateValue::STATE_VALUE_PRESENT_ENABLED => { | ||
| if state.pending_occurrence_at.is_set() { | ||
| return Err(ScheduleNextOccurrenceError::AlreadyArmed { id: command.id.clone() }); | ||
| } |
There was a problem hiding this comment.
Exhausted arm lacks idempotency
Medium Severity
ScheduleNextOccurrence only rejects when pending_occurrence_at is set. After a ScheduleCompleted event, pending is cleared while the schedule stays enabled, so a retried ArmNext can append another ScheduleCompleted instead of being treated as already finished.
Additional Locations (1)
Reviewed by Cursor Bugbot for commit 2894dbe. Configure here.
There was a problem hiding this comment.
Actionable comments posted: 5
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@rsworkspace/crates/trogon-scheduler/src/commands/rrule.rs`:
- Around line 103-108: The function schedule_or_complete_event currently accepts
raw u64 primitives for next_sequence and last_sequence parameters, which allows
invalid or unchecked sequence transitions to bypass domain validation and
persist into events. Replace the u64 parameter types with
ScheduleOccurrenceSequence value objects for both next_sequence and
last_sequence to ensure sequence values are validated according to domain
invariants before being used in the helper function. The conversion from
validated domain sequences to wire-format primitives should only occur at the
point of emitting the v1::ScheduleEvent, maintaining separation between
validated domain logic and serialized output.
- Around line 45-47: The Expansion variant in the error enum is currently
storing the error as a flattened String, which discards the source error context
and chain. Replace the message: String field in the Expansion variant with a
#[source] annotated field named source that holds the concrete rrule::RRuleError
type directly. Then update the error conversion at the location where
RRuleExpansionError::Expansion is constructed (around lines 77-79) to pass the
rrule::RRuleError as the source field instead of converting it to a string via
.to_string(). This preserves the full error context and allows error reporting
tools to traverse the error chain.
In
`@rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/reconcile.rs`:
- Around line 276-293: The reconcile_completed function purges and expires
checkpoints unconditionally without verifying that the checkpoint is
specifically a scheduled RRULE checkpoint. Add a guard check after the
checkpoint_for call to validate the checkpoint status and kind, similar to how
OccurrenceRecorded and OccurrenceScheduled handle validation. Only proceed with
the Purge action and expiration if the checkpoint is confirmed to be a scheduled
RRULE checkpoint; otherwise, return an appropriate error to prevent incorrect
expiration of non-RRULE schedules from out-of-contract Completed events.
In
`@rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/recorded_events.rs`:
- Around line 257-269: The ScheduleOccurrenceScheduled decoding in the match arm
starting at the ScheduleEventCase::ScheduleOccurrenceScheduled pattern does not
extract or validate the occurrence_sequence field, which represents recurrence
identity data that should be persisted as a validated domain type. Follow the
same extraction and validation pattern used for schedule_id and occurrence_at:
extract occurrence_sequence from the scheduled object, validate it (handling
missing or invalid values with an appropriate ScheduleEventDecodeError), and
include the validated value in the returned ScheduleChange::OccurrenceScheduled
struct alongside schedule_id and occurrence_at.
In
`@rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/processor.rs`:
- Around line 495-500: The match statement on the rejection in the
CommandError::Decide error handler currently treats ScheduleNotFound and
ScheduleDeleted as success by returning Ok(()), which silently ignores these
critical errors and can cause recurrence stalls. Remove ScheduleNotFound and
ScheduleDeleted from the match arm that returns Ok(()) (which currently also
contains AlreadyArmed and SchedulePaused), allowing these variants to fall
through to the other branch that returns a proper RetrySignal error instead.
This ensures that when a schedule is not found or has been deleted, the error is
properly propagated rather than silently acknowledged.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 819a18a4-6079-48b8-b1d0-41ae1018984a
⛔ Files ignored due to path filters (13)
rsworkspace/crates/trogonai-proto/src/gen/trogonai.scheduler.schedules.state.v1.state.__view.rsis excluded by!**/gen/**rsworkspace/crates/trogonai-proto/src/gen/trogonai.scheduler.schedules.state.v1.state.rsis excluded by!**/gen/**rsworkspace/crates/trogonai-proto/src/gen/trogonai.scheduler.schedules.v1.events.__oneof.rsis excluded by!**/gen/**rsworkspace/crates/trogonai-proto/src/gen/trogonai.scheduler.schedules.v1.events.__view.rsis excluded by!**/gen/**rsworkspace/crates/trogonai-proto/src/gen/trogonai.scheduler.schedules.v1.events.__view_oneof.rsis excluded by!**/gen/**rsworkspace/crates/trogonai-proto/src/gen/trogonai.scheduler.schedules.v1.events.rsis excluded by!**/gen/**rsworkspace/crates/trogonai-proto/src/gen/trogonai.scheduler.schedules.v1.mod.rsis excluded by!**/gen/**rsworkspace/crates/trogonai-proto/src/gen/trogonai.scheduler.schedules.v1.schedule_completed.__view.rsis excluded by!**/gen/**rsworkspace/crates/trogonai-proto/src/gen/trogonai.scheduler.schedules.v1.schedule_completed.rsis excluded by!**/gen/**rsworkspace/crates/trogonai-proto/src/gen/trogonai.scheduler.schedules.v1.schedule_occurrence_recorded.__view.rsis excluded by!**/gen/**rsworkspace/crates/trogonai-proto/src/gen/trogonai.scheduler.schedules.v1.schedule_occurrence_recorded.rsis excluded by!**/gen/**rsworkspace/crates/trogonai-proto/src/gen/trogonai.scheduler.schedules.v1.schedule_occurrence_scheduled.__view.rsis excluded by!**/gen/**rsworkspace/crates/trogonai-proto/src/gen/trogonai.scheduler.schedules.v1.schedule_occurrence_scheduled.rsis excluded by!**/gen/**
📒 Files selected for processing (25)
proto/trogonai/scheduler/schedules/state/v1/state.protoproto/trogonai/scheduler/schedules/v1/events.protoproto/trogonai/scheduler/schedules/v1/schedule_completed.protoproto/trogonai/scheduler/schedules/v1/schedule_occurrence_recorded.protoproto/trogonai/scheduler/schedules/v1/schedule_occurrence_scheduled.protorsworkspace/crates/trogon-scheduler/src/commands/create_schedule.rsrsworkspace/crates/trogon-scheduler/src/commands/mod.rsrsworkspace/crates/trogon-scheduler/src/commands/pause_schedule.rsrsworkspace/crates/trogon-scheduler/src/commands/record_schedule_occurrence.rsrsworkspace/crates/trogon-scheduler/src/commands/remove_schedule.rsrsworkspace/crates/trogon-scheduler/src/commands/resume_schedule.rsrsworkspace/crates/trogon-scheduler/src/commands/rrule.rsrsworkspace/crates/trogon-scheduler/src/commands/schedule_next_occurrence.rsrsworkspace/crates/trogon-scheduler/src/commands/state.rsrsworkspace/crates/trogon-scheduler/src/lib.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/nats_execution_tests.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/mod.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/reconcile.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/recorded_events.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/request.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/wakeup.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/worker/dispatcher.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/worker/processor.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/worker/testkit.rsrsworkspace/crates/trogonai-proto/src/scheduler/schedules/codec.rs
🚧 Files skipped from review as they are similar to previous changes (8)
- proto/trogonai/scheduler/schedules/state/v1/state.proto
- rsworkspace/crates/trogon-scheduler/src/commands/remove_schedule.rs
- rsworkspace/crates/trogon-scheduler/src/commands/resume_schedule.rs
- rsworkspace/crates/trogon-scheduler/src/commands/create_schedule.rs
- rsworkspace/crates/trogon-scheduler/src/commands/pause_schedule.rs
- rsworkspace/crates/trogon-scheduler/src/processor/execution/nats_execution_tests.rs
- rsworkspace/crates/trogon-scheduler/src/processor/execution/wakeup.rs
- rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/request.rs
| #[error("RRULE set could not be expanded: {message}")] | ||
| Expansion { message: String }, | ||
| } |
There was a problem hiding this comment.
🧩 Analysis chain
🌐 Web query:
In the Rust rrulecrate version used by this repository, what concrete error type doesRRuleSet::set_from_stringreturn, and does that type implementstd::error::Error?
💡 Result:
In the rrule crate, the RRuleSet::set_from_string method returns a Result where the error type is RRuleError [1]. RRuleError is an enum defined within the crate [2][3]. It implements the std::error::Error trait [2]. The implementation is facilitated by the thiserror crate, as shown in the source code [3], which derives the Error trait for the enum [3].
Citations:
- 1: https://docs.rs/rrule/latest/rrule/struct.RRuleSet.html
- 2: https://docs.rs/rrule/latest/rrule/enum.RRuleError.html
- 3: https://docs.rs/rrule/latest/src/rrule/error.rs.html
🏁 Script executed:
cat -n rsworkspace/crates/trogon-scheduler/src/commands/rrule.rs | head -90Repository: TrogonStack/trogonai
Length of output: 3690
Preserve the RRULE expansion source error instead of flattening to String.
The Expansion variant (lines 45–47) stores message: String, and line 77–79 converts the typed RRuleError via .to_string(), discarding the error context and source chain. Change the variant to store the concrete rrule::RRuleError with #[source] instead:
Suggested fix
#[error("RRULE set could not be expanded")]
Expansion {
#[source]
source: rrule::RRuleError,
},
And update the conversion:
.map_err(|source| RRuleExpansionError::Expansion { source })?;
Per the coding guidelines: "Never discard error context by converting a typed error into a string; wrap the source error as a field or variant instead."
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@rsworkspace/crates/trogon-scheduler/src/commands/rrule.rs` around lines 45 -
47, The Expansion variant in the error enum is currently storing the error as a
flattened String, which discards the source error context and chain. Replace the
message: String field in the Expansion variant with a #[source] annotated field
named source that holds the concrete rrule::RRuleError type directly. Then
update the error conversion at the location where RRuleExpansionError::Expansion
is constructed (around lines 77-79) to pass the rrule::RRuleError as the source
field instead of converting it to a string via .to_string(). This preserves the
full error context and allows error reporting tools to traverse the error chain.
Source: Coding guidelines
| pub(crate) fn schedule_or_complete_event( | ||
| schedule_id: &str, | ||
| next_occurrence: Option<DateTime<Utc>>, | ||
| next_sequence: u64, | ||
| last_sequence: u64, | ||
| ) -> v1::ScheduleEvent { |
There was a problem hiding this comment.
Use ScheduleOccurrenceSequence in this helper contract, not raw u64.
Line 106 and Line 107 accept primitive sequence values in domain logic, which bypasses the sequence value-object invariants and allows saturated/invalid progression inputs to leak into persisted events. This helper should take validated domain sequence types and convert to wire primitives only at emit time.
[additional context] rsworkspace/crates/trogon-scheduler/src/commands/record_schedule_occurrence.rs:159-166 already passes a saturating_add(1) value into this API, which is precisely the kind of unchecked sequence transition this signature permits.
As per coding guidelines, "Prefer domain-specific value objects over primitives ... invalid instances should be unrepresentable."
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@rsworkspace/crates/trogon-scheduler/src/commands/rrule.rs` around lines 103 -
108, The function schedule_or_complete_event currently accepts raw u64
primitives for next_sequence and last_sequence parameters, which allows invalid
or unchecked sequence transitions to bypass domain validation and persist into
events. Replace the u64 parameter types with ScheduleOccurrenceSequence value
objects for both next_sequence and last_sequence to ensure sequence values are
validated according to domain invariants before being used in the helper
function. The conversion from validated domain sequences to wire-format
primitives should only occur at the point of emitting the v1::ScheduleEvent,
maintaining separation between validated domain logic and serialized output.
Source: Coding guidelines
| fn reconcile_completed( | ||
| current: Option<&ScheduleCheckpointRecord>, | ||
| schedule_id: &ScheduleId, | ||
| stream_position: StreamPosition, | ||
| event_id: Option<&str>, | ||
| ) -> Result<Reconciliation, ReconcileError> { | ||
| let current = checkpoint_for(current, schedule_id)?; | ||
|
|
||
| Ok(Reconciliation { | ||
| action: ReconcileAction::Purge(current.subject()), | ||
| next_checkpoint: advanced( | ||
| current, | ||
| ScheduleStatus::Expired, | ||
| ReconcileOutcome::Expired, | ||
| stream_position, | ||
| event_id, | ||
| ), | ||
| }) |
There was a problem hiding this comment.
Guard Completed reconciliation to scheduled RRULE checkpoints only.
On Line 285, reconcile_completed purges/expires unconditionally. Unlike OccurrenceRecorded/OccurrenceScheduled, this path doesn’t verify checkpoint status/kind, so an out-of-contract Completed event can incorrectly expire a non-RRULE schedule.
Suggested fix
fn reconcile_completed(
current: Option<&ScheduleCheckpointRecord>,
schedule_id: &ScheduleId,
stream_position: StreamPosition,
event_id: Option<&str>,
) -> Result<Reconciliation, ReconcileError> {
let current = checkpoint_for(current, schedule_id)?;
+ if current.status != ScheduleStatus::Scheduled || !matches!(current.schedule, Schedule::RRule { .. }) {
+ return Ok(Reconciliation {
+ action: ReconcileAction::CheckpointOnly,
+ next_checkpoint: advanced(
+ current,
+ current.status,
+ ReconcileOutcome::DuplicateStale,
+ stream_position,
+ event_id,
+ ),
+ });
+ }
Ok(Reconciliation {
action: ReconcileAction::Purge(current.subject()),
next_checkpoint: advanced(
current,🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/reconcile.rs`
around lines 276 - 293, The reconcile_completed function purges and expires
checkpoints unconditionally without verifying that the checkpoint is
specifically a scheduled RRULE checkpoint. Add a guard check after the
checkpoint_for call to validate the checkpoint status and kind, similar to how
OccurrenceRecorded and OccurrenceScheduled handle validation. Only proceed with
the Purge action and expiration if the checkpoint is confirmed to be a scheduled
RRULE checkpoint; otherwise, return an appropriate error to prevent incorrect
expiration of non-RRULE schedules from out-of-contract Completed events.
| ScheduleEventCase::ScheduleOccurrenceScheduled(scheduled) => { | ||
| let schedule_id = schedule_id_from(&scheduled.schedule_id)?; | ||
| let occurrence_at = timestamp_to_datetime( | ||
| scheduled | ||
| .occurrence_at | ||
| .as_option() | ||
| .ok_or(ScheduleEventDecodeError::MissingField { field: "occurrence_at" })?, | ||
| "occurrence_at", | ||
| )?; | ||
| Ok(ScheduleChange::OccurrenceScheduled { | ||
| schedule_id, | ||
| occurrence_at, | ||
| }) |
There was a problem hiding this comment.
Validate occurrence_sequence for ScheduleOccurrenceScheduled during decode.
Line 257 decodes ScheduleOccurrenceScheduled without validating occurrence_sequence. That allows missing/invalid sequence data to pass boundary decoding even though this event shape carries recurrence identity fields.
As per coding guidelines, “Prefer domain-specific value objects over primitives … invalid instances should be unrepresentable” and “Persist only validated domain types to storage, events, and runtime state.”
Suggested fix
ScheduleEventCase::ScheduleOccurrenceScheduled(scheduled) => {
let schedule_id = schedule_id_from(&scheduled.schedule_id)?;
+ let _occurrence_sequence = ScheduleOccurrenceSequence::try_new(
+ scheduled.occurrence_sequence.ok_or(ScheduleEventDecodeError::MissingField {
+ field: "occurrence_sequence",
+ })?,
+ )
+ .map_err(|source| ScheduleEventDecodeError::OccurrenceSequence { source })?;
let occurrence_at = timestamp_to_datetime(
scheduled
.occurrence_at
.as_option()
.ok_or(ScheduleEventDecodeError::MissingField { field: "occurrence_at" })?,
"occurrence_at",
)?;
Ok(ScheduleChange::OccurrenceScheduled {
schedule_id,
occurrence_at,
})
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| ScheduleEventCase::ScheduleOccurrenceScheduled(scheduled) => { | |
| let schedule_id = schedule_id_from(&scheduled.schedule_id)?; | |
| let occurrence_at = timestamp_to_datetime( | |
| scheduled | |
| .occurrence_at | |
| .as_option() | |
| .ok_or(ScheduleEventDecodeError::MissingField { field: "occurrence_at" })?, | |
| "occurrence_at", | |
| )?; | |
| Ok(ScheduleChange::OccurrenceScheduled { | |
| schedule_id, | |
| occurrence_at, | |
| }) | |
| ScheduleEventCase::ScheduleOccurrenceScheduled(scheduled) => { | |
| let schedule_id = schedule_id_from(&scheduled.schedule_id)?; | |
| let _occurrence_sequence = ScheduleOccurrenceSequence::try_new( | |
| scheduled.occurrence_sequence.ok_or(ScheduleEventDecodeError::MissingField { | |
| field: "occurrence_sequence", | |
| })?, | |
| ) | |
| .map_err(|source| ScheduleEventDecodeError::OccurrenceSequence { source })?; | |
| let occurrence_at = timestamp_to_datetime( | |
| scheduled | |
| .occurrence_at | |
| .as_option() | |
| .ok_or(ScheduleEventDecodeError::MissingField { field: "occurrence_at" })?, | |
| "occurrence_at", | |
| )?; | |
| Ok(ScheduleChange::OccurrenceScheduled { | |
| schedule_id, | |
| occurrence_at, | |
| }) | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/recorded_events.rs`
around lines 257 - 269, The ScheduleOccurrenceScheduled decoding in the match
arm starting at the ScheduleEventCase::ScheduleOccurrenceScheduled pattern does
not extract or validate the occurrence_sequence field, which represents
recurrence identity data that should be persisted as a validated domain type.
Follow the same extraction and validation pattern used for schedule_id and
occurrence_at: extract occurrence_sequence from the scheduled object, validate
it (handling missing or invalid values with an appropriate
ScheduleEventDecodeError), and include the validated value in the returned
ScheduleChange::OccurrenceScheduled struct alongside schedule_id and
occurrence_at.
Source: Coding guidelines
| Err(CommandError::Decide(rejection)) => match rejection { | ||
| ScheduleNextOccurrenceError::AlreadyArmed { .. } | ||
| | ScheduleNextOccurrenceError::SchedulePaused { .. } | ||
| | ScheduleNextOccurrenceError::ScheduleNotFound { .. } | ||
| | ScheduleNextOccurrenceError::ScheduleDeleted { .. } => Ok(()), | ||
| other => Err(RetrySignal::ArmSchedule { |
There was a problem hiding this comment.
Do not silently no-op ScheduleNotFound/ScheduleDeleted in ArmNext.
On Lines 498-499, these rejections are treated as success. In the RRULE create/resume arming path, that can acknowledge the record without planning the next occurrence, causing silent recurrence stalls.
Suggested fix
match CommandExecution::new(&self.event_store, &command).execute().await {
Ok(_) => Ok(()),
Err(CommandError::Decide(rejection)) => match rejection {
ScheduleNextOccurrenceError::AlreadyArmed { .. }
- | ScheduleNextOccurrenceError::SchedulePaused { .. }
- | ScheduleNextOccurrenceError::ScheduleNotFound { .. }
- | ScheduleNextOccurrenceError::ScheduleDeleted { .. } => Ok(()),
+ | ScheduleNextOccurrenceError::SchedulePaused { .. } => Ok(()),
other => Err(RetrySignal::ArmSchedule {
source: Box::new(other),
}),
},🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/processor.rs`
around lines 495 - 500, The match statement on the rejection in the
CommandError::Decide error handler currently treats ScheduleNotFound and
ScheduleDeleted as success by returning Ok(()), which silently ignores these
critical errors and can cause recurrence stalls. Remove ScheduleNotFound and
ScheduleDeleted from the match arm that returns Ok(()) (which currently also
contains AlreadyArmed and SchedulePaused), allowing these variants to fall
through to the other branch that returns a proper RetrySignal error instead.
This ensures that when a schedule is not found or has been deleted, the error is
properly propagated rather than silently acknowledged.
Operators need to see when an occurrence was actually recorded and planned (and the lateness between), distinct from the due instant. These wall-clock fields are audit-only and stay out of state and identity so the recurrence decision remains deterministic. Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes using default effort and found 1 potential issue.
There are 5 total unresolved issues (including 4 from previous reviews).
❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, have a team admin enable autofix in the Cursor dashboard.
Reviewed by Cursor Bugbot for commit 1c1248b. Configure here.
| } | ||
| if source.is_some() { | ||
| return Err(ScheduleRequestError::UnsupportedDispatchSource); | ||
| } |
There was a problem hiding this comment.
RRULE sampling breaks dispatch
High Severity
DispatchRequest::build_with_occurrence rejects any delivery with a sampling source, but RRULE recurrence still accepts those deliveries at create and persists them on the checkpoint. After a wakeup is recorded, reconciliation builds a dispatch for the user message and fails with UnsupportedDispatchSource, which the execution processor treats as a durable poison while the aggregate has already advanced.
Reviewed by Cursor Bugbot for commit 1c1248b. Configure here.
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
rsworkspace/crates/trogon-scheduler/src/commands/record_schedule_occurrence.rs (1)
148-150:⚠️ Potential issue | 🟠 Major | ⚡ Quick winAvoid lossy sequence defaults and saturation when advancing occurrence identity.
unwrap_or(0)can turn malformed persisted progress into sequence1, andsaturating_add(1)can schedule a follow-up with the same sequence at theu64::MAXboundary. Reject invalid progress and advance the follow-up sequence throughScheduleOccurrenceSequence::next_after(...)instead.Suggested direction
+ let prior_sequence = match (state.last_occurrence_at.as_option().is_some(), state.last_occurrence_sequence) { + (false, None) => 0, + (true, Some(sequence)) => sequence, + _ => { + return Err(RecordScheduleOccurrenceError::MalformedOccurrenceProgress { + id: command.id.clone(), + }); + } + }; + let occurrence_sequence = - ScheduleOccurrenceSequence::next_after(state.last_occurrence_sequence.unwrap_or(0)) + ScheduleOccurrenceSequence::next_after(prior_sequence) .map_err(|source| RecordScheduleOccurrenceError::OccurrenceSequence { source })?; let next_occurrence = next_rrule_occurrence(schedule, RRuleCursor::after(command.occurrence_at)) .map_err(|source| RecordScheduleOccurrenceError::NextOccurrence { source })?; + let next_occurrence_sequence = if next_occurrence.is_some() { + ScheduleOccurrenceSequence::next_after(occurrence_sequence.as_u64()) + .map_err(|source| RecordScheduleOccurrenceError::OccurrenceSequence { source })? + .as_u64() + } else { + occurrence_sequence.as_u64() + }; let follow_up = schedule_or_complete_event( command.id.as_str(), next_occurrence, - occurrence_sequence.as_u64().saturating_add(1), + next_occurrence_sequence, occurrence_sequence.as_u64(), command.recorded_at, );Also applies to: 170-176
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rsworkspace/crates/trogon-scheduler/src/commands/record_schedule_occurrence.rs` around lines 148 - 150, The code uses unwrap_or(0) which can mask malformed persisted progress and turn it into sequence 1, and saturating_add(1) which can fail to advance the sequence at the u64::MAX boundary. In both locations where occurrence_sequence is being calculated (around lines 148-150 and 170-176), replace the pattern of unwrap_or(0) combined with saturating_add(1) with a call to ScheduleOccurrenceSequence::next_after(...) that properly rejects invalid progress states. This ensures that invalid progress is rejected rather than silently defaulted, and that sequence advancement correctly handles boundary conditions instead of saturating.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Outside diff comments:
In
`@rsworkspace/crates/trogon-scheduler/src/commands/record_schedule_occurrence.rs`:
- Around line 148-150: The code uses unwrap_or(0) which can mask malformed
persisted progress and turn it into sequence 1, and saturating_add(1) which can
fail to advance the sequence at the u64::MAX boundary. In both locations where
occurrence_sequence is being calculated (around lines 148-150 and 170-176),
replace the pattern of unwrap_or(0) combined with saturating_add(1) with a call
to ScheduleOccurrenceSequence::next_after(...) that properly rejects invalid
progress states. This ensures that invalid progress is rejected rather than
silently defaulted, and that sequence advancement correctly handles boundary
conditions instead of saturating.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 9da5939d-b333-4522-8e32-6b5f90564459
⛔ Files ignored due to path filters (4)
rsworkspace/crates/trogonai-proto/src/gen/trogonai.scheduler.schedules.v1.schedule_occurrence_recorded.__view.rsis excluded by!**/gen/**rsworkspace/crates/trogonai-proto/src/gen/trogonai.scheduler.schedules.v1.schedule_occurrence_recorded.rsis excluded by!**/gen/**rsworkspace/crates/trogonai-proto/src/gen/trogonai.scheduler.schedules.v1.schedule_occurrence_scheduled.__view.rsis excluded by!**/gen/**rsworkspace/crates/trogonai-proto/src/gen/trogonai.scheduler.schedules.v1.schedule_occurrence_scheduled.rsis excluded by!**/gen/**
📒 Files selected for processing (9)
proto/trogonai/scheduler/schedules/v1/schedule_occurrence_recorded.protoproto/trogonai/scheduler/schedules/v1/schedule_occurrence_scheduled.protorsworkspace/crates/trogon-scheduler/src/commands/record_schedule_occurrence.rsrsworkspace/crates/trogon-scheduler/src/commands/rrule.rsrsworkspace/crates/trogon-scheduler/src/commands/schedule_next_occurrence.rsrsworkspace/crates/trogon-scheduler/src/commands/state.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/recorded_events.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/wakeup.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/worker/processor.rs
🚧 Files skipped from review as they are similar to previous changes (5)
- rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/recorded_events.rs
- rsworkspace/crates/trogon-scheduler/src/commands/rrule.rs
- rsworkspace/crates/trogon-scheduler/src/commands/schedule_next_occurrence.rs
- rsworkspace/crates/trogon-scheduler/src/commands/state.rs
- rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/processor.rs


Uh oh!
There was an error while loading. Please reload this page.