diff --git a/internal/api/generated.go b/internal/api/generated.go index 00e7f5a..98a564b 100644 --- a/internal/api/generated.go +++ b/internal/api/generated.go @@ -36,12 +36,13 @@ func (e CleanupScopeRequestScope) Valid() bool { // Defines values for FlowStatus. const ( - FlowStatusCancelled FlowStatus = "cancelled" - FlowStatusDone FlowStatus = "done" - FlowStatusFailed FlowStatus = "failed" - FlowStatusPending FlowStatus = "pending" - FlowStatusRunning FlowStatus = "running" - FlowStatusWaiting FlowStatus = "waiting" + FlowStatusCancelled FlowStatus = "cancelled" + FlowStatusDone FlowStatus = "done" + FlowStatusFailed FlowStatus = "failed" + FlowStatusPending FlowStatus = "pending" + FlowStatusRescheduled FlowStatus = "rescheduled" + FlowStatusRunning FlowStatus = "running" + FlowStatusWaiting FlowStatus = "waiting" ) // Valid indicates whether the value is a known member of the FlowStatus enum. @@ -55,6 +56,8 @@ func (e FlowStatus) Valid() bool { return true case FlowStatusPending: return true + case FlowStatusRescheduled: + return true case FlowStatusRunning: return true case FlowStatusWaiting: diff --git a/internal/application/tickets/orchestrator.go b/internal/application/tickets/orchestrator.go index 0e5df2f..a7a8b04 100644 --- a/internal/application/tickets/orchestrator.go +++ b/internal/application/tickets/orchestrator.go @@ -415,8 +415,13 @@ func (o *Orchestrator) runState(ctx context.Context, state *workflowstate.State, _ = os.WriteFile(rawLogPath, []byte(result.RawOutput+"\n\n[stderr]\n"+result.Stderr), 0o644) if err != nil { if errors.Is(err, providers.ErrTokensExhausted) { + // Mark the ticket as rescheduled to indicate that it should be automatically re-run when the quota resets. err = fmt.Errorf("token usage limit reached — wait for your quota to reset, then rerun this ticket to continue: %w", err) + state.FlowStatus = workflowstate.FlowStatusRescheduled + _ = o.Store.SaveState(state.TicketNumber, *state) + return err } + _ = markdown.AppendSection(logPath, stateCfg.Name+" Failed", err.Error()) return o.failState(state, err) @@ -825,3 +830,35 @@ func EnsureStateIgnored(repoRoot, stateDirName string) error { return nil } + +func (o *Orchestrator) ProbeProvider(ctx context.Context) error { + // create temp work dir (WorkDir must be a real existing path) + workDir, err := os.MkdirTemp("", "autopr-probe-work-*") + if err != nil { + return fmt.Errorf("probe: create work dir: %w", err) + } + defer os.RemoveAll(workDir) + // create temp runtime dir (RuntimeDir must be a real existing path) + runtimeDir, err := os.MkdirTemp("", "autopr-probe-runtime-*") + if err != nil { + return fmt.Errorf("probe: create runtime dir: %w", err) + } + defer os.RemoveAll(runtimeDir) + + // write minimal prompt file (PromptPath must exist on disk — CLIProvider reads it) + promptPath := filepath.Join(workDir, "probe.md") + if err = os.WriteFile(promptPath, []byte("ping"), 0o644); err != nil { + return fmt.Errorf("probe: write prompt: %w", err) + } + // call provider — discard output, only care about ErrTokensExhausted + _, err = o.Provider.Execute(ctx, providers.ExecuteRequest{ + PromptPath: promptPath, + WorkDir: workDir, + RuntimeDir: runtimeDir, + SessionData: "", // fresh call, no session + }) + if errors.Is(err, providers.ErrTokensExhausted) { + return providers.ErrTokensExhausted // quota still hit + } + return nil // any other result = quota not the issue +} diff --git a/internal/domain/workflowstate/types.go b/internal/domain/workflowstate/types.go index cc6bac9..eed89cd 100644 --- a/internal/domain/workflowstate/types.go +++ b/internal/domain/workflowstate/types.go @@ -18,6 +18,7 @@ const ( FlowStatusDone FlowStatus = "done" FlowStatusFailed FlowStatus = "failed" FlowStatusCancelled FlowStatus = "cancelled" + FlowStatusRescheduled FlowStatus = "rescheduled" ) // StateRun records a single execution of a workflow state for a ticket. diff --git a/internal/server/job_scheduled.go b/internal/server/job_scheduled.go new file mode 100644 index 0000000..922abdf --- /dev/null +++ b/internal/server/job_scheduled.go @@ -0,0 +1,40 @@ +package server + +import ( + "fmt" + "os" + "strings" + + workflowstate "github.com/Neokil/AutoPR/internal/domain/workflowstate" + "github.com/Neokil/AutoPR/internal/markdown" +) + +func (s *server) persistTicketScheduled(repoID, repoRoot, ticket string, repoRt *repoRuntime, cause error) error { + if strings.TrimSpace(ticket) == "" { + return nil + } + + ticketState, err := repoRt.store.LoadState(ticket) + if err != nil { + if !os.IsNotExist(err) { + return fmt.Errorf("load ticket state: %w", err) + } + ticketState = workflowstate.New(ticket) + } + + msg := strings.TrimSpace(cause.Error()) + ticketState.FlowStatus = workflowstate.FlowStatusRescheduled + ticketState.LastError = msg + saveErr := repoRt.store.SaveState(ticket, ticketState) + if saveErr != nil { + return fmt.Errorf("save ticket state: %w", saveErr) + } + + body := msg + if ticketState.WorktreePath != "" && ticketState.CurrentState != "" { + logPath := ticketState.CurrentRunLogPath() + _ = markdown.AppendSection(logPath, "Job Rescheduled — Quota Exhausted", body) + } + + return s.syncTicketFromRepo(repoID, repoRoot, ticket, repoRt, true) +} diff --git a/internal/server/jobs.go b/internal/server/jobs.go index b34d1cd..1d356c7 100644 --- a/internal/server/jobs.go +++ b/internal/server/jobs.go @@ -2,19 +2,35 @@ package server import ( "context" + "errors" "fmt" + "log/slog" "strings" "sync" "github.com/Neokil/AutoPR/internal/api" + "github.com/Neokil/AutoPR/internal/providers" "github.com/Neokil/AutoPR/internal/serverstate" ) func (s *server) workerLoop() { for job := range s.jobs { + s.waitIfQuotaReached() s.setJobStatus(job.record, "running", "") err := s.executeJob(job) if err != nil { + if errors.Is(err, providers.ErrTokensExhausted) { + slog.Warn("LLM quota reached during job execution. Marking quota as reached and pausing further jobs.") + s.setJobStatus(job.record, "queued", "") + + s.setQuotaReached(true) + if err := s.reQueueJob(job); err != nil { + slog.Error("quota re-queue failed", "job", job.record.ID, "err", err) + } + continue + + } + s.setJobStatus(job.record, "failed", err.Error()) continue @@ -108,9 +124,16 @@ func (s *server) executeJob(job queuedJob) error { err = fmt.Errorf("%w: %s", errUnsupportedJobAction, job.record.Action) } if err != nil && ticket != "" { - persistErr := s.persistTicketFailure(repoID, repoRoot, ticket, repoRt, job, err) - if persistErr != nil { - return fmt.Errorf("%w (also failed to persist ticket failure: %w)", err, persistErr) + if errors.Is(err, providers.ErrTokensExhausted) { + persistErr := s.persistTicketScheduled(repoID, repoRoot, ticket, repoRt, err) + if persistErr != nil { + return fmt.Errorf("%w (also failed to persist ticket failure: %w)", err, persistErr) + } + } else { + persistErr := s.persistTicketFailure(repoID, repoRoot, ticket, repoRt, job, err) + if persistErr != nil { + return fmt.Errorf("%w (also failed to persist ticket failure: %w)", err, persistErr) + } } } @@ -141,3 +164,30 @@ func (s *server) getTicketLock(repoID, ticket string) *sync.Mutex { return m } + +func (s *server) waitIfQuotaReached() { + s.quotaMu.RLock() + quotaReached := s.quotaReached + resetCh := s.quotaResetCh + s.quotaMu.RUnlock() + + if !quotaReached { + return + } + + <-resetCh + slog.Info("LLM quota reset detected. Resuming job execution.") + +} + +func (s *server) reQueueJob(job queuedJob) error { + select { + case s.jobs <- job: + return nil + // here we could also listen for a shutdown signal if we had one, to avoid trying to re-queue when the server is shutting down. For now, we'll just return an error if the job queue is full. + // case <-context.Background().Done(): + // return fmt.Errorf("re-queue aborted: server shutting down") + default: + return fmt.Errorf("re-queue failed: job queue full") + } +} diff --git a/internal/server/quota_monitor.go b/internal/server/quota_monitor.go new file mode 100644 index 0000000..d148070 --- /dev/null +++ b/internal/server/quota_monitor.go @@ -0,0 +1,72 @@ +package server + +import ( + "context" + "errors" + "log/slog" + "time" + + "github.com/Neokil/AutoPR/internal/providers" +) + +const ( + quotaMonitorInterval = 20 * time.Minute +) + +func (s *server) quotaMonitorLoop() { + ticker := time.NewTicker(quotaMonitorInterval) + + defer ticker.Stop() + for range ticker.C { + s.checkQuotaStatus() + } +} + +func (s *server) isQuotaReached() bool { + s.quotaMu.RLock() + defer s.quotaMu.RUnlock() + return s.quotaReached +} + +func (s *server) setQuotaReached(reached bool) { + s.quotaMu.Lock() + defer s.quotaMu.Unlock() + if reached && !s.quotaReached { + // Create a fresh channel that workers will block on + s.quotaResetCh = make(chan struct{}) + } + if !reached && s.quotaReached { + // Signal all waiting workers to wake up + close(s.quotaResetCh) + } + s.quotaReached = reached + +} + +func (s *server) checkQuotaStatus() { + + if !s.isQuotaReached() { + return + } + slog.Info("quota monitor: probing provider to check if quota has reset") + + repos := s.meta.ListRepos() + if len(repos) == 0 { + slog.Warn("quota monitor: no repos available for probe, skipping") + return + } + rt, err := s.runtimeForRepo(repos[0].Path) + if err != nil { + slog.Error("quota monitor: failed to get runtime for probe", "err", err) + return + } + + probeErr := rt.svc.ProbeProvider(context.Background()) + if errors.Is(probeErr, providers.ErrTokensExhausted) { + slog.Info("quota monitor: quota still reached, will check again later") + return + } + + s.setQuotaReached(false) + slog.Info("quota monitor: quota has reset, resuming operations") +} diff --git a/internal/server/server.go b/internal/server/server.go index 3e107bf..ed354fc 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -70,6 +70,10 @@ type server struct { ticketLockMu sync.Mutex ticketLocks map[string]*sync.Mutex + + quotaReached bool + quotaMu sync.RWMutex + quotaResetCh chan struct{} } var sectionHeaderRE = regexp.MustCompile(`^## (.+) \(([^)]+)\)$`) @@ -96,19 +100,22 @@ func Run(portOverride int) error { } daemon := &server{ - cfg: cfg, - meta: meta, - runtimes: map[string]*repoRuntime{}, - jobs: make(chan queuedJob, jobQueueSize), - repoLocks: map[string]*sync.RWMutex{}, - ticketLocks: map[string]*sync.Mutex{}, - webFS: distFS, - subscribers: map[string]chan api.ServerEvent{}, + cfg: cfg, + meta: meta, + runtimes: map[string]*repoRuntime{}, + jobs: make(chan queuedJob, jobQueueSize), + repoLocks: map[string]*sync.RWMutex{}, + ticketLocks: map[string]*sync.Mutex{}, + webFS: distFS, + subscribers: map[string]chan api.ServerEvent{}, + quotaMu: sync.RWMutex{}, + quotaResetCh: make(chan struct{}), } for range cfg.ServerWorkers { go daemon.workerLoop() } go daemon.prMonitorLoop() + go daemon.quotaMonitorLoop() port := cfg.ServerPort if portOverride > 0 { diff --git a/openapi/openapi.yaml b/openapi/openapi.yaml index 00cfbc2..934a7df 100644 --- a/openapi/openapi.yaml +++ b/openapi/openapi.yaml @@ -305,7 +305,7 @@ components: schemas: FlowStatus: type: string - enum: [pending, running, waiting, done, failed, cancelled] + enum: [pending, running, waiting, done, failed, cancelled, rescheduled] JobStatus: type: string enum: [queued, running, done, failed] diff --git a/web/src/generated/api.ts b/web/src/generated/api.ts index 215da1b..81cc8f3 100644 --- a/web/src/generated/api.ts +++ b/web/src/generated/api.ts @@ -233,7 +233,7 @@ export type webhooks = Record; export interface components { schemas: { /** @enum {string} */ - FlowStatus: "pending" | "running" | "waiting" | "done" | "failed" | "cancelled"; + FlowStatus: "pending" | "running" | "waiting" | "done" | "failed" | "cancelled" | "rescheduled"; /** @enum {string} */ JobStatus: "queued" | "running" | "done" | "failed"; HealthResponse: {