Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion src/cmd/submit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use ratatui::prelude::*;
use ratatui::style::{Color, Style, Stylize};
use ratatui::text::{Line, Span};
use ratatui::widgets::{Block, Borders, List, ListItem, ListState};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

use crate::models::{AppState, GpuItem, LeaderboardItem, SubmissionModeItem};
Expand Down Expand Up @@ -41,6 +42,7 @@ pub struct App {
pub submission_task: Option<JoinHandle<Result<String, anyhow::Error>>>,
pub leaderboards_task: Option<JoinHandle<Result<Vec<LeaderboardItem>, anyhow::Error>>>,
pub gpus_task: Option<JoinHandle<Result<Vec<GpuItem>, anyhow::Error>>>,
pub submission_log_rx: Option<mpsc::UnboundedReceiver<String>>,

pub loading_page_state: LoadingPageState,

Expand Down Expand Up @@ -300,6 +302,9 @@ impl App {
let mut file_content = Vec::new();
file.read_to_end(&mut file_content)?;

let (log_tx, log_rx) = mpsc::unbounded_channel();
self.submission_log_rx = Some(log_rx);

self.submission_task = Some(tokio::spawn(async move {
service::submit_solution(
&client,
Expand All @@ -308,13 +313,23 @@ impl App {
&leaderboard,
&gpu,
&mode,
None,
Some(Box::new(move |msg| {
let _ = log_tx.send(msg);
})),
)
.await
}));
Ok(())
}

pub fn drain_submission_logs(&mut self) {
if let Some(rx) = &mut self.submission_log_rx {
while let Ok(message) = rx.try_recv() {
self.loading_page_state.status_text = Some(message);
}
}
}

pub async fn check_leaderboard_task(&mut self) {
if let Some(handle) = &mut self.leaderboards_task {
if handle.is_finished() {
Expand Down Expand Up @@ -610,6 +625,7 @@ pub async fn run_submit_tui(

app.check_leaderboard_task().await;
app.check_gpu_task().await;
app.drain_submission_logs();
app.check_submission_task().await;

app.update_loading_page_state(terminal.size()?.width);
Expand Down
9 changes: 9 additions & 0 deletions src/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ pub struct SubmissionDetails {
pub code: String,
pub runs: Vec<SubmissionRun>,
pub job: Option<SubmissionJobStatus>,
pub queue: Option<SubmissionQueueStatus>,
}

#[derive(Clone, Debug)]
Expand All @@ -89,6 +90,14 @@ pub struct SubmissionJobStatus {
pub error: Option<String>,
}

#[derive(Clone, Debug)]
pub struct SubmissionQueueStatus {
pub stage: Option<String>,
pub message: Option<String>,
pub position: Option<i64>,
pub jobs_ahead: Option<i64>,
}

/// A single run within a submission
#[derive(Clone, Debug)]
pub struct SubmissionRun {
Expand Down
98 changes: 88 additions & 10 deletions src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use tokio::io::AsyncWriteExt;
use tokio::time::sleep;

use crate::models::{
GpuItem, LeaderboardItem, SubmissionDetails, SubmissionJobStatus, SubmissionRun,
UserSubmission, UserSubmissionRun,
GpuItem, LeaderboardItem, SubmissionDetails, SubmissionJobStatus, SubmissionQueueStatus,
SubmissionRun, UserSubmission, UserSubmissionRun,
};

const SUBMISSION_POLL_INTERVAL_SECONDS: u64 = 5;
Expand All @@ -31,6 +31,49 @@ fn parse_score(value: &Value) -> Option<f64> {
}
}

fn parse_submission_queue(value: Option<&Value>) -> Option<SubmissionQueueStatus> {
let queue = value?;
if queue.is_null() {
return None;
}

Some(SubmissionQueueStatus {
stage: queue
.get("stage")
.and_then(|v| v.as_str())
.map(str::to_string),
message: queue
.get("message")
.and_then(|v| v.as_str())
.map(str::to_string),
position: queue.get("position").and_then(|v| v.as_i64()),
jobs_ahead: queue.get("jobs_ahead").and_then(|v| v.as_i64()),
})
}

fn submission_queue_summary(queue: Option<&SubmissionQueueStatus>) -> Option<String> {
let queue = queue?;
let message = queue
.message
.as_deref()
.or(match queue.stage.as_deref() {
Some("queued") => Some("In KernelBot queue"),
Some("dispatched") => Some("Job dispatched to Modal/GitHub runner"),
_ => None,
})
.unwrap_or("Submission status unknown");

match queue.position {
Some(position) => Some(format!(
"{} (position {}, {} ahead)",
message,
position,
queue.jobs_ahead.unwrap_or(position.saturating_sub(1)),
)),
None => Some(message.to_string()),
}
}

// Helper function to create a reusable reqwest client
pub fn create_client(cli_id: Option<String>) -> Result<Client> {
let mut default_headers = HeaderMap::new();
Expand Down Expand Up @@ -504,6 +547,7 @@ pub async fn get_user_submission(client: &Client, submission_id: i64) -> Result<
code: sub["code"].as_str().unwrap_or("").to_string(),
runs,
job,
queue: parse_submission_queue(sub.get("queue")),
})
}

Expand Down Expand Up @@ -659,10 +703,18 @@ async fn submit_solution_background<P: AsRef<Path>>(
.ok_or_else(|| anyhow!("Server did not return a submission id"))?;

if let Some(ref cb) = on_log {
cb(format!(
"Submission {} accepted. Waiting for results...",
submission_id
));
let queue = parse_submission_queue(accepted.get("queue"));
if let Some(queue_summary) = submission_queue_summary(queue.as_ref()) {
cb(format!(
"Submission {} accepted: {}. Waiting for results...",
submission_id, queue_summary
));
} else {
cb(format!(
"Submission {} accepted. Waiting for results...",
submission_id
));
}
}

let mut elapsed = 0;
Expand All @@ -675,10 +727,17 @@ async fn submit_solution_background<P: AsRef<Path>>(
.unwrap_or(if details.done { "done" } else { "pending" });

if let Some(ref cb) = on_log {
cb(format!(
"Submission {} status: {} ({}s)",
submission_id, job_status, elapsed
));
if let Some(queue_summary) = submission_queue_summary(details.queue.as_ref()) {
cb(format!(
"Submission {} status: {} - {} ({}s)",
submission_id, job_status, queue_summary, elapsed
));
} else {
cb(format!(
"Submission {} status: {} ({}s)",
submission_id, job_status, elapsed
));
}
}

match job_status {
Expand Down Expand Up @@ -1318,9 +1377,28 @@ mod tests {
code: String::new(),
runs,
job: None,
queue: None,
}
}

#[test]
fn test_submission_queue_summary_reports_position() {
use serde_json::json;

let queue = parse_submission_queue(Some(&json!({
"stage": "queued",
"message": "In KernelBot queue",
"position": 3,
"jobs_ahead": 2
})))
.expect("expected queue");

assert_eq!(
submission_queue_summary(Some(&queue)).unwrap(),
"In KernelBot queue (position 3, 2 ahead)"
);
}

#[test]
fn test_leaderboard_score_summary_reports_geomean_scores() {
// Only the scored `leaderboard` runs are reported; test/benchmark and
Expand Down
8 changes: 7 additions & 1 deletion src/views/loading_page.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ use ratatui::{
buffer::Buffer,
layout::{Alignment, Layout, Rect},
style::{Color, Stylize},
widgets::{Block, Gauge, Padding, Paragraph, StatefulWidget, Widget},
widgets::{Block, Gauge, Padding, Paragraph, StatefulWidget, Widget, Wrap},
};

#[derive(Debug, Default, Clone)]
pub struct LoadingPageState {
pub loop_count: u16,
pub progress_column: u16,
pub progress_bar: f64,
pub status_text: Option<String>,
}

#[derive(Default, Debug, PartialEq, Eq, Clone)]
Expand Down Expand Up @@ -56,6 +57,10 @@ fn render_gauge(area: Rect, buf: &mut Buffer, state: &mut LoadingPageState) {
}

fn get_footer_text(state: &LoadingPageState) -> String {
if let Some(status_text) = &state.status_text {
return status_text.clone();
}

let percentage = state.progress_bar;

if state.loop_count > 0 {
Expand All @@ -77,6 +82,7 @@ fn render_footer(area: Rect, buf: &mut Buffer, state: &LoadingPageState) {
.alignment(Alignment::Center)
.fg(Color::White)
.bold()
.wrap(Wrap { trim: true })
.block(blk);

text.render(area, buf);
Expand Down
Loading