From ed3f1a39190467b08dcceb7c800a07d2aa015e2f Mon Sep 17 00:00:00 2001 From: Mark Saroufim Date: Wed, 24 Jun 2026 22:20:39 -0700 Subject: [PATCH] Show submission queue status --- src/cmd/submit.rs | 18 ++++++- src/models/mod.rs | 9 ++++ src/service/mod.rs | 98 +++++++++++++++++++++++++++++++++++---- src/views/loading_page.rs | 8 +++- 4 files changed, 121 insertions(+), 12 deletions(-) diff --git a/src/cmd/submit.rs b/src/cmd/submit.rs index ec8e6fb..4700cbe 100644 --- a/src/cmd/submit.rs +++ b/src/cmd/submit.rs @@ -9,6 +9,7 @@ use ratatui::prelude::*; use ratatui::style::{Color, Style, Stylize}; use ratatui::text::{Line, Span}; use ratatui::widgets::{Block, Borders, List, ListItem, ListState}; +use tokio::sync::mpsc; use tokio::task::JoinHandle; use crate::models::{AppState, GpuItem, LeaderboardItem, SubmissionModeItem}; @@ -41,6 +42,7 @@ pub struct App { pub submission_task: Option>>, pub leaderboards_task: Option, anyhow::Error>>>, pub gpus_task: Option, anyhow::Error>>>, + pub submission_log_rx: Option>, pub loading_page_state: LoadingPageState, @@ -300,6 +302,9 @@ impl App { let mut file_content = Vec::new(); file.read_to_end(&mut file_content)?; + let (log_tx, log_rx) = mpsc::unbounded_channel(); + self.submission_log_rx = Some(log_rx); + self.submission_task = Some(tokio::spawn(async move { service::submit_solution( &client, @@ -308,13 +313,23 @@ impl App { &leaderboard, &gpu, &mode, - None, + Some(Box::new(move |msg| { + let _ = log_tx.send(msg); + })), ) .await })); Ok(()) } + pub fn drain_submission_logs(&mut self) { + if let Some(rx) = &mut self.submission_log_rx { + while let Ok(message) = rx.try_recv() { + self.loading_page_state.status_text = Some(message); + } + } + } + pub async fn check_leaderboard_task(&mut self) { if let Some(handle) = &mut self.leaderboards_task { if handle.is_finished() { @@ -610,6 +625,7 @@ pub async fn run_submit_tui( app.check_leaderboard_task().await; app.check_gpu_task().await; + app.drain_submission_logs(); app.check_submission_task().await; app.update_loading_page_state(terminal.size()?.width); diff --git a/src/models/mod.rs b/src/models/mod.rs index 4ade4e5..f2a5286 100644 --- a/src/models/mod.rs +++ b/src/models/mod.rs @@ -81,6 +81,7 @@ pub struct SubmissionDetails { pub code: String, pub runs: Vec, pub job: Option, + pub queue: Option, } #[derive(Clone, Debug)] @@ -89,6 +90,14 @@ pub struct SubmissionJobStatus { pub error: Option, } +#[derive(Clone, Debug)] +pub struct SubmissionQueueStatus { + pub stage: Option, + pub message: Option, + pub position: Option, + pub jobs_ahead: Option, +} + /// A single run within a submission #[derive(Clone, Debug)] pub struct SubmissionRun { diff --git a/src/service/mod.rs b/src/service/mod.rs index 07a43d2..b33a42b 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -12,8 +12,8 @@ use tokio::io::AsyncWriteExt; use tokio::time::sleep; use crate::models::{ - GpuItem, LeaderboardItem, SubmissionDetails, SubmissionJobStatus, SubmissionRun, - UserSubmission, UserSubmissionRun, + GpuItem, LeaderboardItem, SubmissionDetails, SubmissionJobStatus, SubmissionQueueStatus, + SubmissionRun, UserSubmission, UserSubmissionRun, }; const SUBMISSION_POLL_INTERVAL_SECONDS: u64 = 5; @@ -31,6 +31,49 @@ fn parse_score(value: &Value) -> Option { } } +fn parse_submission_queue(value: Option<&Value>) -> Option { + let queue = value?; + if queue.is_null() { + return None; + } + + Some(SubmissionQueueStatus { + stage: queue + .get("stage") + .and_then(|v| v.as_str()) + .map(str::to_string), + message: queue + .get("message") + .and_then(|v| v.as_str()) + .map(str::to_string), + position: queue.get("position").and_then(|v| v.as_i64()), + jobs_ahead: queue.get("jobs_ahead").and_then(|v| v.as_i64()), + }) +} + +fn submission_queue_summary(queue: Option<&SubmissionQueueStatus>) -> Option { + let queue = queue?; + let message = queue + .message + .as_deref() + .or(match queue.stage.as_deref() { + Some("queued") => Some("In KernelBot queue"), + Some("dispatched") => Some("Job dispatched to Modal/GitHub runner"), + _ => None, + }) + .unwrap_or("Submission status unknown"); + + match queue.position { + Some(position) => Some(format!( + "{} (position {}, {} ahead)", + message, + position, + queue.jobs_ahead.unwrap_or(position.saturating_sub(1)), + )), + None => Some(message.to_string()), + } +} + // Helper function to create a reusable reqwest client pub fn create_client(cli_id: Option) -> Result { let mut default_headers = HeaderMap::new(); @@ -504,6 +547,7 @@ pub async fn get_user_submission(client: &Client, submission_id: i64) -> Result< code: sub["code"].as_str().unwrap_or("").to_string(), runs, job, + queue: parse_submission_queue(sub.get("queue")), }) } @@ -659,10 +703,18 @@ async fn submit_solution_background>( .ok_or_else(|| anyhow!("Server did not return a submission id"))?; if let Some(ref cb) = on_log { - cb(format!( - "Submission {} accepted. Waiting for results...", - submission_id - )); + let queue = parse_submission_queue(accepted.get("queue")); + if let Some(queue_summary) = submission_queue_summary(queue.as_ref()) { + cb(format!( + "Submission {} accepted: {}. Waiting for results...", + submission_id, queue_summary + )); + } else { + cb(format!( + "Submission {} accepted. Waiting for results...", + submission_id + )); + } } let mut elapsed = 0; @@ -675,10 +727,17 @@ async fn submit_solution_background>( .unwrap_or(if details.done { "done" } else { "pending" }); if let Some(ref cb) = on_log { - cb(format!( - "Submission {} status: {} ({}s)", - submission_id, job_status, elapsed - )); + if let Some(queue_summary) = submission_queue_summary(details.queue.as_ref()) { + cb(format!( + "Submission {} status: {} - {} ({}s)", + submission_id, job_status, queue_summary, elapsed + )); + } else { + cb(format!( + "Submission {} status: {} ({}s)", + submission_id, job_status, elapsed + )); + } } match job_status { @@ -1318,9 +1377,28 @@ mod tests { code: String::new(), runs, job: None, + queue: None, } } + #[test] + fn test_submission_queue_summary_reports_position() { + use serde_json::json; + + let queue = parse_submission_queue(Some(&json!({ + "stage": "queued", + "message": "In KernelBot queue", + "position": 3, + "jobs_ahead": 2 + }))) + .expect("expected queue"); + + assert_eq!( + submission_queue_summary(Some(&queue)).unwrap(), + "In KernelBot queue (position 3, 2 ahead)" + ); + } + #[test] fn test_leaderboard_score_summary_reports_geomean_scores() { // Only the scored `leaderboard` runs are reported; test/benchmark and diff --git a/src/views/loading_page.rs b/src/views/loading_page.rs index 02a809c..f995960 100644 --- a/src/views/loading_page.rs +++ b/src/views/loading_page.rs @@ -2,7 +2,7 @@ use ratatui::{ buffer::Buffer, layout::{Alignment, Layout, Rect}, style::{Color, Stylize}, - widgets::{Block, Gauge, Padding, Paragraph, StatefulWidget, Widget}, + widgets::{Block, Gauge, Padding, Paragraph, StatefulWidget, Widget, Wrap}, }; #[derive(Debug, Default, Clone)] @@ -10,6 +10,7 @@ pub struct LoadingPageState { pub loop_count: u16, pub progress_column: u16, pub progress_bar: f64, + pub status_text: Option, } #[derive(Default, Debug, PartialEq, Eq, Clone)] @@ -56,6 +57,10 @@ fn render_gauge(area: Rect, buf: &mut Buffer, state: &mut LoadingPageState) { } fn get_footer_text(state: &LoadingPageState) -> String { + if let Some(status_text) = &state.status_text { + return status_text.clone(); + } + let percentage = state.progress_bar; if state.loop_count > 0 { @@ -77,6 +82,7 @@ fn render_footer(area: Rect, buf: &mut Buffer, state: &LoadingPageState) { .alignment(Alignment::Center) .fg(Color::White) .bold() + .wrap(Wrap { trim: true }) .block(blk); text.render(area, buf);