Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ pub struct SubmissionDetails {
pub code: String,
pub runs: Vec<SubmissionRun>,
pub job: Option<SubmissionJobStatus>,
pub runner_queue: Option<RunnerQueueStatus>,
}

#[derive(Clone, Debug)]
Expand All @@ -89,6 +90,13 @@ pub struct SubmissionJobStatus {
pub error: Option<String>,
}

#[derive(Clone, Debug)]
pub struct RunnerQueueStatus {
pub runner: Option<String>,
pub gpu: Option<String>,
pub queued_jobs: Option<i64>,
}

/// A single run within a submission
#[derive(Clone, Debug)]
pub struct SubmissionRun {
Expand Down
84 changes: 74 additions & 10 deletions src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use tokio::io::AsyncWriteExt;
use tokio::time::sleep;

use crate::models::{
GpuItem, LeaderboardItem, SubmissionDetails, SubmissionJobStatus, SubmissionRun,
UserSubmission, UserSubmissionRun,
GpuItem, LeaderboardItem, RunnerQueueStatus, SubmissionDetails, SubmissionJobStatus,
SubmissionRun, UserSubmission, UserSubmissionRun,
};

const SUBMISSION_POLL_INTERVAL_SECONDS: u64 = 5;
Expand All @@ -31,6 +31,41 @@ fn parse_score(value: &Value) -> Option<f64> {
}
}

fn parse_runner_queue(value: Option<&Value>) -> Option<RunnerQueueStatus> {
let queue = value?;
if queue.is_null() {
return None;
}

Some(RunnerQueueStatus {
runner: queue
.get("runner")
.and_then(|v| v.as_str())
.map(str::to_string),
gpu: queue
.get("gpu")
.and_then(|v| v.as_str())
.map(str::to_string),
queued_jobs: queue.get("queued_jobs").and_then(|v| v.as_i64()),
})
}

fn runner_queue_summary(queue: Option<&RunnerQueueStatus>) -> Option<String> {
let queue = queue?;
let queued_jobs = queue.queued_jobs?;
let runner = queue.runner.as_deref().unwrap_or("Runner");
let gpu = queue.gpu.as_deref().unwrap_or("selected GPU");
let suffix = match queued_jobs {
1 => "job",
_ => "jobs",
};

Some(format!(
"{} {} queue: {} queued {}",
runner, gpu, queued_jobs, suffix
))
}

// Helper function to create a reusable reqwest client
pub fn create_client(cli_id: Option<String>) -> Result<Client> {
let mut default_headers = HeaderMap::new();
Expand Down Expand Up @@ -504,6 +539,7 @@ pub async fn get_user_submission(client: &Client, submission_id: i64) -> Result<
code: sub["code"].as_str().unwrap_or("").to_string(),
runs,
job,
runner_queue: parse_runner_queue(sub.get("runner_queue")),
})
}

Expand Down Expand Up @@ -657,12 +693,19 @@ async fn submit_solution_background<P: AsRef<Path>>(
.and_then(|v| v.get("id"))
.and_then(|v| v.as_i64())
.ok_or_else(|| anyhow!("Server did not return a submission id"))?;
let accepted_runner_queue = parse_runner_queue(accepted.get("runner_queue"));

if let Some(ref cb) = on_log {
cb(format!(
"Submission {} accepted. Waiting for results...",
submission_id
));
match runner_queue_summary(accepted_runner_queue.as_ref()) {
Some(queue_summary) => cb(format!(
"Submission {} accepted. {}. Waiting for results...",
submission_id, queue_summary
)),
None => cb(format!(
"Submission {} accepted. Waiting for results...",
submission_id
)),
}
}

let mut elapsed = 0;
Expand All @@ -675,10 +718,16 @@ async fn submit_solution_background<P: AsRef<Path>>(
.unwrap_or(if details.done { "done" } else { "pending" });

if let Some(ref cb) = on_log {
cb(format!(
"Submission {} status: {} ({}s)",
submission_id, job_status, elapsed
));
match runner_queue_summary(details.runner_queue.as_ref()) {
Some(queue_summary) => cb(format!(
"Submission {} status: {} - {} ({}s)",
submission_id, job_status, queue_summary, elapsed
)),
None => cb(format!(
"Submission {} status: {} ({}s)",
submission_id, job_status, elapsed
)),
}
}

match job_status {
Expand Down Expand Up @@ -1318,9 +1367,24 @@ mod tests {
code: String::new(),
runs,
job: None,
runner_queue: None,
}
}

#[test]
fn test_runner_queue_summary_reports_queued_jobs() {
let queue = RunnerQueueStatus {
runner: Some("Modal".to_string()),
gpu: Some("B200".to_string()),
queued_jobs: Some(5),
};

assert_eq!(
runner_queue_summary(Some(&queue)),
Some("Modal B200 queue: 5 queued jobs".to_string())
);
}

#[test]
fn test_leaderboard_score_summary_reports_geomean_scores() {
// Only the scored `leaderboard` runs are reported; test/benchmark and
Expand Down
Loading