Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions internal/api/generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 37 additions & 0 deletions internal/application/tickets/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,8 +415,13 @@ func (o *Orchestrator) runState(ctx context.Context, state *workflowstate.State,
_ = os.WriteFile(rawLogPath, []byte(result.RawOutput+"\n\n[stderr]\n"+result.Stderr), 0o644)
if err != nil {
if errors.Is(err, providers.ErrTokensExhausted) {
// Mark the ticket as rescheduled to indicate that it should be automatically re-run when the quota resets.
err = fmt.Errorf("token usage limit reached — wait for your quota to reset, then rerun this ticket to continue: %w", err)
state.FlowStatus = workflowstate.FlowStatusRescheduled
_ = o.Store.SaveState(state.TicketNumber, *state)
return err
}

_ = markdown.AppendSection(logPath, stateCfg.Name+" Failed", err.Error())

return o.failState(state, err)
Expand Down Expand Up @@ -825,3 +830,35 @@ func EnsureStateIgnored(repoRoot, stateDirName string) error {

return nil
}

// ProbeProvider sends a minimal "ping" prompt to the configured provider to
// find out whether the token quota is still exhausted.
//
// The request needs real paths on disk, so a throwaway work directory, a
// throwaway runtime directory and a one-word prompt file are created first;
// both directories are removed again when the method returns.
//
// It returns providers.ErrTokensExhausted when the provider reports exactly
// that condition, a wrapped error when the temporary files cannot be set up,
// and nil for every other provider outcome (including non-quota failures).
func (o *Orchestrator) ProbeProvider(ctx context.Context) error {
	// WorkDir must point at an existing directory.
	probeWorkDir, err := os.MkdirTemp("", "autopr-probe-work-*")
	if err != nil {
		return fmt.Errorf("probe: create work dir: %w", err)
	}
	defer os.RemoveAll(probeWorkDir)

	// RuntimeDir must point at an existing directory as well.
	probeRuntimeDir, err := os.MkdirTemp("", "autopr-probe-runtime-*")
	if err != nil {
		return fmt.Errorf("probe: create runtime dir: %w", err)
	}
	defer os.RemoveAll(probeRuntimeDir)

	// The prompt has to exist as a file because the provider is handed a path.
	promptFile := filepath.Join(probeWorkDir, "probe.md")
	if writeErr := os.WriteFile(promptFile, []byte("ping"), 0o644); writeErr != nil {
		return fmt.Errorf("probe: write prompt: %w", writeErr)
	}

	request := providers.ExecuteRequest{
		PromptPath:  promptFile,
		WorkDir:     probeWorkDir,
		RuntimeDir:  probeRuntimeDir,
		SessionData: "", // fresh call, no session
	}

	// The output is discarded on purpose: only the quota sentinel matters.
	if _, execErr := o.Provider.Execute(ctx, request); errors.Is(execErr, providers.ErrTokensExhausted) {
		return providers.ErrTokensExhausted
	}

	// Anything else means the quota is not what is blocking the provider.
	return nil
}
1 change: 1 addition & 0 deletions internal/domain/workflowstate/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const (
FlowStatusDone FlowStatus = "done"
FlowStatusFailed FlowStatus = "failed"
FlowStatusCancelled FlowStatus = "cancelled"
FlowStatusRescheduled FlowStatus = "rescheduled"
)

// StateRun records a single execution of a workflow state for a ticket.
Expand Down
40 changes: 40 additions & 0 deletions internal/server/job_scheduled.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package server

import (
"fmt"
"os"
"strings"

workflowstate "github.com/Neokil/AutoPR/internal/domain/workflowstate"
"github.com/Neokil/AutoPR/internal/markdown"
)

// persistTicketScheduled stores the "rescheduled" flow status for a ticket
// whose job was interrupted, using cause as the recorded reason.
//
// A blank ticket is a no-op. If no state exists on disk yet, a fresh one is
// created for the ticket. After the state is saved, a section describing the
// reschedule is appended to the current run log when the state has both a
// worktree path and a current state; a failure to append is ignored. Finally
// the ticket is synced from the repo and that call's result is returned.
func (s *server) persistTicketScheduled(repoID, repoRoot, ticket string, repoRt *repoRuntime, cause error) error {
	if strings.TrimSpace(ticket) == "" {
		return nil
	}

	ticketState, err := repoRt.store.LoadState(ticket)
	switch {
	case err == nil:
		// Existing state loaded; update it below.
	case os.IsNotExist(err):
		ticketState = workflowstate.New(ticket)
	default:
		return fmt.Errorf("load ticket state: %w", err)
	}

	reason := strings.TrimSpace(cause.Error())
	ticketState.FlowStatus = workflowstate.FlowStatusRescheduled
	ticketState.LastError = reason
	if err = repoRt.store.SaveState(ticket, ticketState); err != nil {
		return fmt.Errorf("save ticket state: %w", err)
	}

	hasRunLog := ticketState.WorktreePath != "" && ticketState.CurrentState != ""
	if hasRunLog {
		// Best effort: the log entry is informational only.
		_ = markdown.AppendSection(ticketState.CurrentRunLogPath(), "Job Rescheduled — Quota Exhausted", reason)
	}

	return s.syncTicketFromRepo(repoID, repoRoot, ticket, repoRt, true)
}
56 changes: 53 additions & 3 deletions internal/server/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,35 @@ package server

import (
"context"
"errors"
"fmt"
"log/slog"
"strings"
"sync"

"github.com/Neokil/AutoPR/internal/api"
"github.com/Neokil/AutoPR/internal/providers"
"github.com/Neokil/AutoPR/internal/serverstate"
)

func (s *server) workerLoop() {
for job := range s.jobs {
s.waitIfQuotaReached()
s.setJobStatus(job.record, "running", "")
err := s.executeJob(job)
if err != nil {
if errors.Is(err, providers.ErrTokensExhausted) {
slog.Warn("LLM quota reached during job execution. Marking quota as reached and pausing further jobs.")
s.setJobStatus(job.record, "queued", "")

s.setQuotaReached(true)
if err := s.reQueueJob(job); err != nil {
slog.Error("quota re-queue failed", "job", job.record.ID, "err", err)
}
continue

}

s.setJobStatus(job.record, "failed", err.Error())

continue
Expand Down Expand Up @@ -108,9 +124,16 @@ func (s *server) executeJob(job queuedJob) error {
err = fmt.Errorf("%w: %s", errUnsupportedJobAction, job.record.Action)
}
if err != nil && ticket != "" {
persistErr := s.persistTicketFailure(repoID, repoRoot, ticket, repoRt, job, err)
if persistErr != nil {
return fmt.Errorf("%w (also failed to persist ticket failure: %w)", err, persistErr)
if errors.Is(err, providers.ErrTokensExhausted) {
persistErr := s.persistTicketScheduled(repoID, repoRoot, ticket, repoRt, err)
if persistErr != nil {
return fmt.Errorf("%w (also failed to persist ticket failure: %w)", err, persistErr)
}
} else {
persistErr := s.persistTicketFailure(repoID, repoRoot, ticket, repoRt, job, err)
if persistErr != nil {
return fmt.Errorf("%w (also failed to persist ticket failure: %w)", err, persistErr)
}
}
}

Expand Down Expand Up @@ -141,3 +164,30 @@ func (s *server) getTicketLock(repoID, ticket string) *sync.Mutex {

return m
}

// waitIfQuotaReached blocks the caller while the quota flag is set.
//
// The flag and the reset channel are read together under the read lock so
// that both belong to the same snapshot. If the flag is clear the method
// returns immediately; otherwise it waits on that channel before returning.
func (s *server) waitIfQuotaReached() {
	s.quotaMu.RLock()
	paused, resume := s.quotaReached, s.quotaResetCh
	s.quotaMu.RUnlock()

	if paused {
		<-resume
		slog.Info("LLM quota reset detected. Resuming job execution.")
	}
}

// reQueueJob tries to put job back onto the jobs channel without blocking.
//
// It returns nil when the send succeeds and an error when the channel cannot
// accept the job right now. There is no shutdown signal to listen for yet, so
// a full queue is the only failure that is reported.
func (s *server) reQueueJob(job queuedJob) error {
	select {
	case s.jobs <- job:
		// Accepted; fall through to the success return.
	default:
		return fmt.Errorf("re-queue failed: job queue full")
	}

	return nil
}
72 changes: 72 additions & 0 deletions internal/server/quota_monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package server

import (
"context"
"errors"
"log/slog"
"time"

"github.com/Neokil/AutoPR/internal/providers"
)

const (
// quotaMonitorInterval is the tick period of the ticker that drives
// quotaMonitorLoop, i.e. how often the quota status is re-checked.
quotaMonitorInterval = 20 * time.Minute
)

// quotaMonitorLoop runs checkQuotaStatus once per quotaMonitorInterval tick.
//
// The loop has no exit condition, so it is meant to run in its own goroutine.
func (s *server) quotaMonitorLoop() {
	tick := time.NewTicker(quotaMonitorInterval)
	defer tick.Stop()

	for {
		<-tick.C
		s.checkQuotaStatus()
	}
}

// isQuotaReached reports the current value of the quota flag, read under the
// read lock.
func (s *server) isQuotaReached() bool {
	s.quotaMu.RLock()
	reached := s.quotaReached
	s.quotaMu.RUnlock()

	return reached
}

// setQuotaReached updates the quota flag under the write lock and manages the
// reset channel on each transition.
//
// Going from "not reached" to "reached" installs a fresh channel for workers
// to block on; going from "reached" to "not reached" closes the current
// channel, which releases every goroutine waiting on it. Setting the flag to
// the value it already has changes nothing.
func (s *server) setQuotaReached(reached bool) {
	s.quotaMu.Lock()
	defer s.quotaMu.Unlock()

	if s.quotaReached == reached {
		// No transition, so the channel must be left untouched.
		return
	}

	if reached {
		s.quotaResetCh = make(chan struct{})
	} else {
		close(s.quotaResetCh)
	}
	s.quotaReached = reached
}

// checkQuotaStatus probes the provider while the quota flag is set and clears
// the flag once the probe no longer reports exhausted tokens.
//
// It does nothing when the flag is clear. The probe runs through the runtime
// of the first repo returned by the metadata store; if there are no repos or
// the runtime cannot be obtained, the check is skipped and the flag stays
// set. Any probe result other than providers.ErrTokensExhausted is treated as
// "quota has reset".
func (s *server) checkQuotaStatus() {
	if !s.isQuotaReached() {
		return
	}
	slog.Info("quota monitor: probing provider to check if quota has reset")

	knownRepos := s.meta.ListRepos()
	if len(knownRepos) == 0 {
		slog.Warn("quota monitor: no repos available for probe, skipping")
		return
	}

	probeRuntime, err := s.runtimeForRepo(knownRepos[0].Path)
	if err != nil {
		slog.Error("quota monitor: failed to get runtime for probe", "err", err)
		return
	}

	if probeErr := probeRuntime.svc.ProbeProvider(context.Background()); errors.Is(probeErr, providers.ErrTokensExhausted) {
		slog.Info("quota monitor: quota still reached, will check again later")
		return
	}

	s.setQuotaReached(false)
	slog.Info("quota monitor: quota has reset, resuming operations")
}
23 changes: 15 additions & 8 deletions internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ type server struct {

ticketLockMu sync.Mutex
ticketLocks map[string]*sync.Mutex

quotaReached bool
quotaMu sync.RWMutex
quotaResetCh chan struct{}
}

var sectionHeaderRE = regexp.MustCompile(`^## (.+) \(([^)]+)\)$`)
Expand All @@ -96,19 +100,22 @@ func Run(portOverride int) error {
}

daemon := &server{
cfg: cfg,
meta: meta,
runtimes: map[string]*repoRuntime{},
jobs: make(chan queuedJob, jobQueueSize),
repoLocks: map[string]*sync.RWMutex{},
ticketLocks: map[string]*sync.Mutex{},
webFS: distFS,
subscribers: map[string]chan api.ServerEvent{},
cfg: cfg,
meta: meta,
runtimes: map[string]*repoRuntime{},
jobs: make(chan queuedJob, jobQueueSize),
repoLocks: map[string]*sync.RWMutex{},
ticketLocks: map[string]*sync.Mutex{},
webFS: distFS,
subscribers: map[string]chan api.ServerEvent{},
quotaMu: sync.RWMutex{},
quotaResetCh: make(chan struct{}),
}
for range cfg.ServerWorkers {
go daemon.workerLoop()
}
go daemon.prMonitorLoop()
go daemon.quotaMonitorLoop()

port := cfg.ServerPort
if portOverride > 0 {
Expand Down
2 changes: 1 addition & 1 deletion openapi/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ components:
schemas:
FlowStatus:
type: string
enum: [pending, running, waiting, done, failed, cancelled]
enum: [pending, running, waiting, done, failed, cancelled, rescheduled]
JobStatus:
type: string
enum: [queued, running, done, failed]
Expand Down
2 changes: 1 addition & 1 deletion web/src/generated/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ export type webhooks = Record<string, never>;
export interface components {
schemas: {
/** @enum {string} */
FlowStatus: "pending" | "running" | "waiting" | "done" | "failed" | "cancelled";
FlowStatus: "pending" | "running" | "waiting" | "done" | "failed" | "cancelled" | "rescheduled";
/** @enum {string} */
JobStatus: "queued" | "running" | "done" | "failed";
HealthResponse: {
Expand Down