From 5a2c58e1a0ea2646f3f1480f9d27afe0dd4c35d4 Mon Sep 17 00:00:00 2001 From: Morteza Iravani Date: Fri, 29 May 2026 20:29:41 +0200 Subject: [PATCH 1/3] add quota monitor and auto-retry on usage limit (#18) --- internal/application/tickets/orchestrator.go | 38 ++++++++++++ internal/domain/workflowstate/types.go | 1 + internal/server/job_scheduled.go | 40 ++++++++++++ internal/server/jobs.go | 47 +++++++++++++- internal/server/quota_monitor.go | 65 ++++++++++++++++++++ internal/server/server.go | 22 ++++--- 6 files changed, 202 insertions(+), 11 deletions(-) create mode 100644 internal/server/job_scheduled.go create mode 100644 internal/server/quota_monitor.go diff --git a/internal/application/tickets/orchestrator.go b/internal/application/tickets/orchestrator.go index 0e5df2f..7d16005 100644 --- a/internal/application/tickets/orchestrator.go +++ b/internal/application/tickets/orchestrator.go @@ -68,6 +68,7 @@ func (o *Orchestrator) StartFlow(ctx context.Context, ticketNumber string) error } state, loadErr := o.Store.LoadState(ticketNumber) + fmt.Println("-- StartFlow: load state of ticket", ticketNumber, state, loadErr) if os.IsNotExist(loadErr) { state = workflowstate.New(ticketNumber) saveErr := o.Store.SaveState(ticketNumber, state) @@ -415,8 +416,13 @@ func (o *Orchestrator) runState(ctx context.Context, state *workflowstate.State, _ = os.WriteFile(rawLogPath, []byte(result.RawOutput+"\n\n[stderr]\n"+result.Stderr), 0o644) if err != nil { if errors.Is(err, providers.ErrTokensExhausted) { + // Mark the ticket as rescheduled to indicate that it should be automatically re-run when the quota resets. 
err = fmt.Errorf("token usage limit reached — wait for your quota to reset, then rerun this ticket to continue: %w", err) + state.FlowStatus = workflowstate.FlowStatusRescheduled + _ = o.Store.SaveState(state.TicketNumber, *state) + return err } + _ = markdown.AppendSection(logPath, stateCfg.Name+" Failed", err.Error()) return o.failState(state, err) @@ -825,3 +831,35 @@ func EnsureStateIgnored(repoRoot, stateDirName string) error { return nil } + +func (o *Orchestrator) ProbeProvider(ctx context.Context) error { + // create temp work dir (WorkDir must be a real existing path) + workDir, err := os.MkdirTemp("", "autopr-probe-work-*") + if err != nil { + return fmt.Errorf("probe: create work dir: %w", err) + } + defer os.RemoveAll(workDir) + // create temp runtime dir (RuntimeDir must be a real existing path) + runtimeDir, err := os.MkdirTemp("", "autopr-probe-runtime-*") + if err != nil { + return fmt.Errorf("probe: create runtime dir: %w", err) + } + defer os.RemoveAll(runtimeDir) + + // write minimal prompt file (PromptPath must exist on disk — CLIProvider reads it) + promptPath := filepath.Join(workDir, "probe.md") + if err = os.WriteFile(promptPath, []byte("ping"), 0o644); err != nil { + return fmt.Errorf("probe: write prompt: %w", err) + } + // call provider — discard output, only care about ErrTokensExhausted + _, err = o.Provider.Execute(ctx, providers.ExecuteRequest{ + PromptPath: promptPath, + WorkDir: workDir, + RuntimeDir: runtimeDir, + SessionData: "", // fresh call, no session + }) + if errors.Is(err, providers.ErrTokensExhausted) { + return providers.ErrTokensExhausted // quota still hit + } + return nil // any other result = quota not the issue +} diff --git a/internal/domain/workflowstate/types.go b/internal/domain/workflowstate/types.go index cc6bac9..eed89cd 100644 --- a/internal/domain/workflowstate/types.go +++ b/internal/domain/workflowstate/types.go @@ -18,6 +18,7 @@ const ( FlowStatusDone FlowStatus = "done" FlowStatusFailed FlowStatus = 
"failed" FlowStatusCancelled FlowStatus = "cancelled" + FlowStatusRescheduled FlowStatus = "rescheduled" ) // StateRun records a single execution of a workflow state for a ticket. diff --git a/internal/server/job_scheduled.go b/internal/server/job_scheduled.go new file mode 100644 index 0000000..922abdf --- /dev/null +++ b/internal/server/job_scheduled.go @@ -0,0 +1,40 @@ +package server + +import ( + "fmt" + "os" + "strings" + + workflowstate "github.com/Neokil/AutoPR/internal/domain/workflowstate" + "github.com/Neokil/AutoPR/internal/markdown" +) + +func (s *server) persistTicketScheduled(repoID, repoRoot, ticket string, repoRt *repoRuntime, cause error) error { + if strings.TrimSpace(ticket) == "" { + return nil + } + + ticketState, err := repoRt.store.LoadState(ticket) + if err != nil { + if !os.IsNotExist(err) { + return fmt.Errorf("load ticket state: %w", err) + } + ticketState = workflowstate.New(ticket) + } + + msg := strings.TrimSpace(cause.Error()) + ticketState.FlowStatus = workflowstate.FlowStatusRescheduled + ticketState.LastError = msg + saveErr := repoRt.store.SaveState(ticket, ticketState) + if saveErr != nil { + return fmt.Errorf("save ticket state: %w", saveErr) + } + + body := msg + if ticketState.WorktreePath != "" && ticketState.CurrentState != "" { + logPath := ticketState.CurrentRunLogPath() + _ = markdown.AppendSection(logPath, "Job Rescheduled — Quota Exhausted", body) + } + + return s.syncTicketFromRepo(repoID, repoRoot, ticket, repoRt, true) +} diff --git a/internal/server/jobs.go b/internal/server/jobs.go index b34d1cd..c473bf5 100644 --- a/internal/server/jobs.go +++ b/internal/server/jobs.go @@ -2,19 +2,36 @@ package server import ( "context" + "errors" "fmt" + "log/slog" "strings" "sync" + "time" "github.com/Neokil/AutoPR/internal/api" + "github.com/Neokil/AutoPR/internal/providers" "github.com/Neokil/AutoPR/internal/serverstate" ) func (s *server) workerLoop() { for job := range s.jobs { + s.waitIfQuotaReached() 
s.setJobStatus(job.record, "running", "") err := s.executeJob(job) if err != nil { + if errors.Is(err, providers.ErrTokensExhausted) { + slog.Warn("LLM quota reached during job execution. Marking quota as reached and pausing further jobs.") + s.setJobStatus(job.record, "queued", "") + + s.setQuotaReached(true) + if err := s.reQueueJob(job); err != nil { + slog.Error("quota re-queue failed", "job", job.record.ID, "err", err) + } + continue + + } + s.setJobStatus(job.record, "failed", err.Error()) continue @@ -108,9 +125,16 @@ func (s *server) executeJob(job queuedJob) error { err = fmt.Errorf("%w: %s", errUnsupportedJobAction, job.record.Action) } if err != nil && ticket != "" { - persistErr := s.persistTicketFailure(repoID, repoRoot, ticket, repoRt, job, err) - if persistErr != nil { - return fmt.Errorf("%w (also failed to persist ticket failure: %w)", err, persistErr) + if errors.Is(err, providers.ErrTokensExhausted) { + persistErr := s.persistTicketScheduled(repoID, repoRoot, ticket, repoRt, err) + if persistErr != nil { + return fmt.Errorf("%w (also failed to persist ticket failure: %w)", err, persistErr) + } + } else { + persistErr := s.persistTicketFailure(repoID, repoRoot, ticket, repoRt, job, err) + if persistErr != nil { + return fmt.Errorf("%w (also failed to persist ticket failure: %w)", err, persistErr) + } } } @@ -141,3 +165,20 @@ func (s *server) getTicketLock(repoID, ticket string) *sync.Mutex { return m } + +func (s *server) waitIfQuotaReached() { + for s.isQuotaReached() { + slog.Info("LLM quota reached. Pausing job execution until quota is reset.") + time.Sleep(2 * time.Minute) // Wait before checking again + } +} + +func (s *server) reQueueJob(job queuedJob) error { + // This method is intended to be used when a job fails due to quota limits. It re-queues the job to be retried later. 
+ select { + case s.jobs <- job: + return nil + default: + return fmt.Errorf("re-queue failed: job queue full") + } +} diff --git a/internal/server/quota_monitor.go b/internal/server/quota_monitor.go new file mode 100644 index 0000000..26fd460 --- /dev/null +++ b/internal/server/quota_monitor.go @@ -0,0 +1,65 @@ +package server + +import ( + "context" + "errors" + "log/slog" + "time" + + "github.com/Neokil/AutoPR/internal/providers" +) + +const ( + quotaMonitorInterval = 20 * time.Minute + quotaMonitorInitialWait = 20 * time.Second +) + +func (s *server) quotaMonitorLoop() { + ticker := time.NewTicker(quotaMonitorInterval) + + defer ticker.Stop() + time.Sleep(quotaMonitorInitialWait) + for range ticker.C { + s.checkQuotaStatus() + } +} + +func (s *server) isQuotaReached() bool { + s.quotaMu.RLock() + defer s.quotaMu.RUnlock() + return s.quotaReached +} + +func (s *server) setQuotaReached(reached bool) { + s.quotaMu.Lock() + defer s.quotaMu.Unlock() + s.quotaReached = reached +} + +func (s *server) checkQuotaStatus() { + + if !s.isQuotaReached() { + return + } + slog.Info("quota monitor: probing provider to check if quota has reset") + + repos := s.meta.ListRepos() + if len(repos) == 0 { + slog.Warn("quota monitor: no repos available for probe, skipping") + return + } + rt, err := s.runtimeForRepo(repos[0].Path) + if err != nil { + slog.Error("quota monitor: failed to get runtime for probe", "err", err) + return + } + + probeErr := rt.svc.ProbeProvider(context.Background()) + if errors.Is(probeErr, providers.ErrTokensExhausted) { + slog.Info("quota monitor: quota still reached, will check again later") + return + } + + s.setQuotaReached(false) + slog.Info("quota monitor: quota has reset, resuming operations") +} diff --git a/internal/server/server.go b/internal/server/server.go index 3e107bf..36740a4 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -70,6 +70,9 @@ type server struct { ticketLockMu sync.Mutex ticketLocks map[string]*sync.Mutex 
+ + quotaReached bool + quotaMu sync.RWMutex } var sectionHeaderRE = regexp.MustCompile(`^## (.+) \(([^)]+)\)$`) @@ -96,19 +99,22 @@ func Run(portOverride int) error { } daemon := &server{ - cfg: cfg, - meta: meta, - runtimes: map[string]*repoRuntime{}, - jobs: make(chan queuedJob, jobQueueSize), - repoLocks: map[string]*sync.RWMutex{}, - ticketLocks: map[string]*sync.Mutex{}, - webFS: distFS, - subscribers: map[string]chan api.ServerEvent{}, + cfg: cfg, + meta: meta, + runtimes: map[string]*repoRuntime{}, + jobs: make(chan queuedJob, jobQueueSize), + repoLocks: map[string]*sync.RWMutex{}, + ticketLocks: map[string]*sync.Mutex{}, + webFS: distFS, + subscribers: map[string]chan api.ServerEvent{}, + quotaReached: false, + quotaMu: sync.RWMutex{}, } for range cfg.ServerWorkers { go daemon.workerLoop() } go daemon.prMonitorLoop() + go daemon.quotaMonitorLoop() port := cfg.ServerPort if portOverride > 0 { From cf39e71ff0dbe89807361c70effdb11757ae14fb Mon Sep 17 00:00:00 2001 From: Morteza Iravani Date: Sun, 31 May 2026 17:50:54 +0200 Subject: [PATCH 2/3] fix(#18): auto-resume jobs by blocking re-queue and signaling workers on quota reset --- internal/application/tickets/orchestrator.go | 1 - internal/server/jobs.go | 19 ++++++++++++++----- internal/server/quota_monitor.go | 11 +++++++++-- internal/server/server.go | 3 ++- 4 files changed, 25 insertions(+), 9 deletions(-) diff --git a/internal/application/tickets/orchestrator.go b/internal/application/tickets/orchestrator.go index 7d16005..a7a8b04 100644 --- a/internal/application/tickets/orchestrator.go +++ b/internal/application/tickets/orchestrator.go @@ -68,7 +68,6 @@ func (o *Orchestrator) StartFlow(ctx context.Context, ticketNumber string) error } state, loadErr := o.Store.LoadState(ticketNumber) - fmt.Println("-- StartFlow: load state of ticket", ticketNumber, state, loadErr) if os.IsNotExist(loadErr) { state = workflowstate.New(ticketNumber) saveErr := o.Store.SaveState(ticketNumber, state) diff --git 
a/internal/server/jobs.go b/internal/server/jobs.go index c473bf5..1d356c7 100644 --- a/internal/server/jobs.go +++ b/internal/server/jobs.go @@ -7,7 +7,6 @@ import ( "log/slog" "strings" "sync" - "time" "github.com/Neokil/AutoPR/internal/api" "github.com/Neokil/AutoPR/internal/providers" @@ -167,17 +166,27 @@ func (s *server) getTicketLock(repoID, ticket string) *sync.Mutex { } func (s *server) waitIfQuotaReached() { - for s.isQuotaReached() { - slog.Info("LLM quota reached. Pausing job execution until quota is reset.") - time.Sleep(2 * time.Minute) // Wait before checking again + s.quotaMu.RLock() + quotaReached := s.quotaReached + resetCh := s.quotaResetCh + s.quotaMu.RUnlock() + + if !quotaReached { + return } + + <-resetCh + slog.Info("LLM quota reset detected. Resuming job execution.") + } func (s *server) reQueueJob(job queuedJob) error { - // This method is intended to be used when a job fails due to quota limits. It re-queues the job to be retried later. select { case s.jobs <- job: return nil + // TODO: once the server has a shutdown signal, also select on it here so that a re-queue is not attempted while the server is shutting down. For now, return an error if the job queue is full. 
+ // case <-context.Background().Done(): + // return fmt.Errorf("re-queue aborted: server shutting down") default: return fmt.Errorf("re-queue failed: job queue full") } diff --git a/internal/server/quota_monitor.go b/internal/server/quota_monitor.go index 26fd460..d148070 100644 --- a/internal/server/quota_monitor.go +++ b/internal/server/quota_monitor.go @@ -11,14 +11,12 @@ import ( const ( quotaMonitorInterval = 20 * time.Minute - quotaMonitorInitialWait = 20 * time.Second ) func (s *server) quotaMonitorLoop() { ticker := time.NewTicker(quotaMonitorInterval) defer ticker.Stop() - time.Sleep(quotaMonitorInitialWait) for range ticker.C { s.checkQuotaStatus() } @@ -33,7 +31,16 @@ func (s *server) isQuotaReached() bool { func (s *server) setQuotaReached(reached bool) { s.quotaMu.Lock() defer s.quotaMu.Unlock() + if reached && !s.quotaReached { + // Create a fresh channel that workers will block on + s.quotaResetCh = make(chan struct{}) + } + if !reached && s.quotaReached { + // Signal all waiting workers to wake up + close(s.quotaResetCh) + } s.quotaReached = reached + } func (s *server) checkQuotaStatus() { diff --git a/internal/server/server.go b/internal/server/server.go index 36740a4..ed354fc 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -73,6 +73,7 @@ type server struct { quotaReached bool quotaMu sync.RWMutex + quotaResetCh chan struct{} } var sectionHeaderRE = regexp.MustCompile(`^## (.+) \(([^)]+)\)$`) @@ -107,8 +108,8 @@ func Run(portOverride int) error { ticketLocks: map[string]*sync.Mutex{}, webFS: distFS, subscribers: map[string]chan api.ServerEvent{}, - quotaReached: false, quotaMu: sync.RWMutex{}, + quotaResetCh: make(chan struct{}), } for range cfg.ServerWorkers { go daemon.workerLoop() From ad96e6e9184e01a27211268abffd22f454685e83 Mon Sep 17 00:00:00 2001 From: Morteza Iravani Date: Sun, 31 May 2026 19:58:22 +0200 Subject: [PATCH 3/3] add rescheduled status to OpenAPI schema --- internal/api/generated.go | 15 
+++++++++------ openapi/openapi.yaml | 2 +- web/src/generated/api.ts | 2 +- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/internal/api/generated.go b/internal/api/generated.go index 00e7f5a..98a564b 100644 --- a/internal/api/generated.go +++ b/internal/api/generated.go @@ -36,12 +36,13 @@ func (e CleanupScopeRequestScope) Valid() bool { // Defines values for FlowStatus. const ( - FlowStatusCancelled FlowStatus = "cancelled" - FlowStatusDone FlowStatus = "done" - FlowStatusFailed FlowStatus = "failed" - FlowStatusPending FlowStatus = "pending" - FlowStatusRunning FlowStatus = "running" - FlowStatusWaiting FlowStatus = "waiting" + FlowStatusCancelled FlowStatus = "cancelled" + FlowStatusDone FlowStatus = "done" + FlowStatusFailed FlowStatus = "failed" + FlowStatusPending FlowStatus = "pending" + FlowStatusRescheduled FlowStatus = "rescheduled" + FlowStatusRunning FlowStatus = "running" + FlowStatusWaiting FlowStatus = "waiting" ) // Valid indicates whether the value is a known member of the FlowStatus enum. 
@@ -55,6 +56,8 @@ func (e FlowStatus) Valid() bool { return true case FlowStatusPending: return true + case FlowStatusRescheduled: + return true case FlowStatusRunning: return true case FlowStatusWaiting: diff --git a/openapi/openapi.yaml b/openapi/openapi.yaml index 00cfbc2..934a7df 100644 --- a/openapi/openapi.yaml +++ b/openapi/openapi.yaml @@ -305,7 +305,7 @@ components: schemas: FlowStatus: type: string - enum: [pending, running, waiting, done, failed, cancelled] + enum: [pending, running, waiting, done, failed, cancelled, rescheduled] JobStatus: type: string enum: [queued, running, done, failed] diff --git a/web/src/generated/api.ts b/web/src/generated/api.ts index 215da1b..81cc8f3 100644 --- a/web/src/generated/api.ts +++ b/web/src/generated/api.ts @@ -233,7 +233,7 @@ export type webhooks = Record; export interface components { schemas: { /** @enum {string} */ - FlowStatus: "pending" | "running" | "waiting" | "done" | "failed" | "cancelled"; + FlowStatus: "pending" | "running" | "waiting" | "done" | "failed" | "cancelled" | "rescheduled"; /** @enum {string} */ JobStatus: "queued" | "running" | "done" | "failed"; HealthResponse: {