Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,13 @@ pub struct SubmissionDetails {
pub done: bool,
pub code: String,
pub runs: Vec<SubmissionRun>,
pub job: Option<SubmissionJobStatus>,
}

#[derive(Clone, Debug)]
pub struct SubmissionJobStatus {
pub status: Option<String>,
pub error: Option<String>,
}

/// A single run within a submission
Expand Down
193 changes: 192 additions & 1 deletion src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,16 @@ use std::env;
use std::path::Path;
use std::time::Duration;
use tokio::io::AsyncWriteExt;
use tokio::time::sleep;

use crate::models::{
GpuItem, LeaderboardItem, SubmissionDetails, SubmissionRun, UserSubmission, UserSubmissionRun,
GpuItem, LeaderboardItem, SubmissionDetails, SubmissionJobStatus, SubmissionRun,
UserSubmission, UserSubmissionRun,
};

const SUBMISSION_POLL_INTERVAL_SECONDS: u64 = 5;
const SUBMISSION_POLL_TIMEOUT_SECONDS: u64 = 60 * 60;

// Helper function to create a reusable reqwest client
pub fn create_client(cli_id: Option<String>) -> Result<Client> {
let mut default_headers = HeaderMap::new();
Expand Down Expand Up @@ -465,6 +470,17 @@ pub async fn get_user_submission(client: &Client, submission_id: i64) -> Result<
})
.unwrap_or_default();

let job = sub.get("job").and_then(|job| {
if job.is_null() {
None
} else {
Some(SubmissionJobStatus {
status: job["status"].as_str().map(str::to_string),
error: job["error"].as_str().map(str::to_string),
})
}
});

Ok(SubmissionDetails {
id: sub["id"].as_i64().unwrap_or(0),
leaderboard_id: sub["leaderboard_id"].as_i64().unwrap_or(0),
Expand All @@ -475,6 +491,7 @@ pub async fn get_user_submission(client: &Client, submission_id: i64) -> Result<
done: sub["done"].as_bool().unwrap_or(false),
code: sub["code"].as_str().unwrap_or("").to_string(),
runs,
job,
})
}

Expand Down Expand Up @@ -547,6 +564,180 @@ pub async fn submit_solution<P: AsRef<Path>>(
gpu: &str,
submission_mode: &str,
on_log: Option<Box<dyn Fn(String) + Send + Sync>>,
) -> Result<String> {
if submission_mode.eq_ignore_ascii_case("profile") {
return submit_solution_streaming(
client,
filepath,
file_content,
leaderboard,
gpu,
submission_mode,
on_log,
)
.await;
}

submit_solution_background(
client,
filepath,
file_content,
leaderboard,
gpu,
submission_mode,
on_log,
)
.await
}

async fn submit_solution_background<P: AsRef<Path>>(
client: &Client,
filepath: P,
file_content: &[u8],
leaderboard: &str,
gpu: &str,
submission_mode: &str,
on_log: Option<Box<dyn Fn(String) + Send + Sync>>,
) -> Result<String> {
let base_url =
env::var("POPCORN_API_URL").map_err(|_| anyhow!("POPCORN_API_URL is not set"))?;

let filename = filepath
.as_ref()
.file_name()
.ok_or_else(|| anyhow!("Invalid filepath"))?
.to_string_lossy();

let part = Part::bytes(file_content.to_vec()).file_name(filename.to_string());
let form = Form::new().part("file", part);
let url = format!(
"{}/submission/{}/{}/{}",
base_url,
leaderboard.to_lowercase(),
gpu,
submission_mode.to_lowercase()
);

let resp = client
.post(&url)
.multipart(form)
.timeout(Duration::from_secs(60))
.send()
.await?;

let status = resp.status();
if !status.is_success() {
let error_text = resp.text().await?;
let detail = serde_json::from_str::<Value>(&error_text)
.ok()
.and_then(|v| v.get("detail").and_then(|d| d.as_str()).map(str::to_string));

return Err(anyhow!(
"Server returned status {}: {}",
status,
detail.unwrap_or(error_text)
));
}

let accepted: Value = resp.json().await?;
let submission_id = accepted
.get("details")
.and_then(|v| v.get("id"))
.and_then(|v| v.as_i64())
.ok_or_else(|| anyhow!("Server did not return a submission id"))?;

if let Some(ref cb) = on_log {
cb(format!(
"Submission {} accepted. Waiting for results...",
submission_id
));
}

let mut elapsed = 0;
loop {
let details = get_user_submission(client, submission_id).await?;
let job_status = details
.job
.as_ref()
.and_then(|job| job.status.as_deref())
.unwrap_or(if details.done { "done" } else { "pending" });

if let Some(ref cb) = on_log {
cb(format!(
"Submission {} status: {} ({}s)",
submission_id, job_status, elapsed
));
}

match job_status {
"failed" | "timed_out" | "hacked" => {
let error = details
.job
.as_ref()
.and_then(|job| job.error.as_deref())
.unwrap_or("No error details were provided");
return Err(anyhow!(
"Submission {} {}: {}",
submission_id,
job_status,
error
));
}
_ => {}
}

if details.done {
return format_submission_details(&details);
}

if elapsed >= SUBMISSION_POLL_TIMEOUT_SECONDS {
return Err(anyhow!(
"Timed out waiting for submission {} after {} seconds",
submission_id,
SUBMISSION_POLL_TIMEOUT_SECONDS
));
}

sleep(Duration::from_secs(SUBMISSION_POLL_INTERVAL_SECONDS)).await;
elapsed += SUBMISSION_POLL_INTERVAL_SECONDS;
}
}

fn format_submission_details(details: &SubmissionDetails) -> Result<String> {
let runs: Vec<Value> = details
.runs
.iter()
.map(|run| {
serde_json::json!({
"mode": run.mode,
"secret": run.secret,
"runner": run.runner,
"score": run.score,
"passed": run.passed,
"start_time": run.start_time,
"end_time": run.end_time,
})
})
.collect();

serde_json::to_string_pretty(&serde_json::json!({
"submission_id": details.id,
"leaderboard": details.leaderboard_name,
"file_name": details.file_name,
"done": details.done,
"runs": runs,
}))
.map_err(|e| anyhow!("Failed to format submission result: {}", e))
}

async fn submit_solution_streaming<P: AsRef<Path>>(
client: &Client,
filepath: P,
file_content: &[u8],
leaderboard: &str,
gpu: &str,
submission_mode: &str,
on_log: Option<Box<dyn Fn(String) + Send + Sync>>,
) -> Result<String> {
let base_url =
env::var("POPCORN_API_URL").map_err(|_| anyhow!("POPCORN_API_URL is not set"))?;
Expand Down
Loading