From 50781a4d2316ab3a13f83079e13d0835f8c32e67 Mon Sep 17 00:00:00 2001 From: Mark Saroufim Date: Thu, 18 Jun 2026 16:23:34 -0700 Subject: [PATCH] Submit through background jobs --- src/models/mod.rs | 7 ++ src/service/mod.rs | 193 ++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 199 insertions(+), 1 deletion(-) diff --git a/src/models/mod.rs b/src/models/mod.rs index 4853d23..4ade4e5 100644 --- a/src/models/mod.rs +++ b/src/models/mod.rs @@ -80,6 +80,13 @@ pub struct SubmissionDetails { pub done: bool, pub code: String, pub runs: Vec, + pub job: Option, +} + +#[derive(Clone, Debug)] +pub struct SubmissionJobStatus { + pub status: Option, + pub error: Option, } /// A single run within a submission diff --git a/src/service/mod.rs b/src/service/mod.rs index bcdfa80..37abe1b 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -9,11 +9,16 @@ use std::env; use std::path::Path; use std::time::Duration; use tokio::io::AsyncWriteExt; +use tokio::time::sleep; use crate::models::{ - GpuItem, LeaderboardItem, SubmissionDetails, SubmissionRun, UserSubmission, UserSubmissionRun, + GpuItem, LeaderboardItem, SubmissionDetails, SubmissionJobStatus, SubmissionRun, + UserSubmission, UserSubmissionRun, }; +const SUBMISSION_POLL_INTERVAL_SECONDS: u64 = 5; +const SUBMISSION_POLL_TIMEOUT_SECONDS: u64 = 60 * 60; + // Helper function to create a reusable reqwest client pub fn create_client(cli_id: Option) -> Result { let mut default_headers = HeaderMap::new(); @@ -465,6 +470,17 @@ pub async fn get_user_submission(client: &Client, submission_id: i64) -> Result< }) .unwrap_or_default(); + let job = sub.get("job").and_then(|job| { + if job.is_null() { + None + } else { + Some(SubmissionJobStatus { + status: job["status"].as_str().map(str::to_string), + error: job["error"].as_str().map(str::to_string), + }) + } + }); + Ok(SubmissionDetails { id: sub["id"].as_i64().unwrap_or(0), leaderboard_id: sub["leaderboard_id"].as_i64().unwrap_or(0), @@ -475,6 +491,7 @@ pub async fn get_user_submission(client: &Client, submission_id: i64) -> Result< done: sub["done"].as_bool().unwrap_or(false), code: sub["code"].as_str().unwrap_or("").to_string(), runs, + job, }) } @@ -547,6 +564,180 @@ pub async fn submit_solution>( gpu: &str, submission_mode: &str, on_log: Option>, +) -> Result { + if submission_mode.eq_ignore_ascii_case("profile") { + return submit_solution_streaming( + client, + filepath, + file_content, + leaderboard, + gpu, + submission_mode, + on_log, + ) + .await; + } + + submit_solution_background( + client, + filepath, + file_content, + leaderboard, + gpu, + submission_mode, + on_log, + ) + .await +} + +async fn submit_solution_background>( + client: &Client, + filepath: P, + file_content: &[u8], + leaderboard: &str, + gpu: &str, + submission_mode: &str, + on_log: Option>, +) -> Result { + let base_url = + env::var("POPCORN_API_URL").map_err(|_| anyhow!("POPCORN_API_URL is not set"))?; + + let filename = filepath + .as_ref() + .file_name() + .ok_or_else(|| anyhow!("Invalid filepath"))? + .to_string_lossy(); + + let part = Part::bytes(file_content.to_vec()).file_name(filename.to_string()); + let form = Form::new().part("file", part); + let url = format!( + "{}/submission/{}/{}/{}", + base_url, + leaderboard.to_lowercase(), + gpu, + submission_mode.to_lowercase() + ); + + let resp = client + .post(&url) + .multipart(form) + .timeout(Duration::from_secs(60)) + .send() + .await?; + + let status = resp.status(); + if !status.is_success() { + let error_text = resp.text().await?; + let detail = serde_json::from_str::(&error_text) + .ok() + .and_then(|v| v.get("detail").and_then(|d| d.as_str()).map(str::to_string)); + + return Err(anyhow!( + "Server returned status {}: {}", + status, + detail.unwrap_or(error_text) + )); + } + + let accepted: Value = resp.json().await?; + let submission_id = accepted + .get("details") + .and_then(|v| v.get("id")) + .and_then(|v| v.as_i64()) + .ok_or_else(|| anyhow!("Server did not return a submission id"))?; + + if let Some(ref cb) = on_log { + cb(format!( + "Submission {} accepted. Waiting for results...", + submission_id + )); + } + + let mut elapsed = 0; + loop { + let details = get_user_submission(client, submission_id).await?; + let job_status = details + .job + .as_ref() + .and_then(|job| job.status.as_deref()) + .unwrap_or(if details.done { "done" } else { "pending" }); + + if let Some(ref cb) = on_log { + cb(format!( + "Submission {} status: {} ({}s)", + submission_id, job_status, elapsed + )); + } + + match job_status { + "failed" | "timed_out" | "hacked" => { + let error = details + .job + .as_ref() + .and_then(|job| job.error.as_deref()) + .unwrap_or("No error details were provided"); + return Err(anyhow!( + "Submission {} {}: {}", + submission_id, + job_status, + error + )); + } + _ => {} + } + + if details.done { + return format_submission_details(&details); + } + + if elapsed >= SUBMISSION_POLL_TIMEOUT_SECONDS { + return Err(anyhow!( + "Timed out waiting for submission {} after {} seconds", + submission_id, + SUBMISSION_POLL_TIMEOUT_SECONDS + )); + } + + sleep(Duration::from_secs(SUBMISSION_POLL_INTERVAL_SECONDS)).await; + elapsed += SUBMISSION_POLL_INTERVAL_SECONDS; + } +} + +fn format_submission_details(details: &SubmissionDetails) -> Result { + let runs: Vec = details + .runs + .iter() + .map(|run| { + serde_json::json!({ + "mode": run.mode, + "secret": run.secret, + "runner": run.runner, + "score": run.score, + "passed": run.passed, + "start_time": run.start_time, + "end_time": run.end_time, + }) + }) + .collect(); + + serde_json::to_string_pretty(&serde_json::json!({ + "submission_id": details.id, + "leaderboard": details.leaderboard_name, + "file_name": details.file_name, + "done": details.done, + "runs": runs, + })) + .map_err(|e| anyhow!("Failed to format submission result: {}", e)) +} + +async fn submit_solution_streaming>( + client: &Client, + filepath: P, + file_content: &[u8], + leaderboard: &str, + gpu: &str, + submission_mode: &str, + on_log: Option>, ) -> Result { let base_url = env::var("POPCORN_API_URL").map_err(|_| anyhow!("POPCORN_API_URL is not set"))?;