Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
9e288de
sdk_v2/cpp: cross-process lock + skip-existing for downloads
bmehta001 Jun 9, 2026
8225ce7
sdk_v2/cpp: per-chunk resumable downloads with linked cancellation
bmehta001 Jun 9, 2026
2735570
Stream blob chunks via per-worker scratch + IFileWriter (caps per-wor…
bmehta001 Jun 9, 2026
aa5d30b
Fix Linux/macOS -Werror build break + MutexFstream sticky-failbit
bmehta001 Jun 10, 2026
036f7a6
Preserve resume progress on transient stat / rename failures
bmehta001 Jun 10, 2026
a0b944b
CrossProcessFileLock: drop self-cleanup; persist lock file across rel…
bmehta001 Jun 10, 2026
d7a3072
Revert "CrossProcessFileLock: drop self-cleanup; persist lock file ac…
bmehta001 Jun 10, 2026
38b14ad
refactor(download): collapse file writer to a single concrete FileWriter
bmehta001 Jun 17, 2026
7b1dd4f
fix(download): per-model serialization + airtight cross-process lock
bmehta001 Jun 18, 2026
905b6a1
fixup(rebase): resolve semantic conflicts against main's download ref…
bmehta001 Jun 18, 2026
3e7382f
fix(download): correct SaveState prefix advance across a 64-bit word …
bmehta001 Jun 18, 2026
42a7737
fix(download): persist sidecar before pre-allocating, so a crash stay…
bmehta001 Jun 18, 2026
1ee720c
fix(download): serialize the user progress callback across chunk workers
bmehta001 Jun 18, 2026
6396c8a
feat(download): also flush the resume sidecar on a wall-clock cap
bmehta001 Jun 18, 2026
4a961b5
Fix comment
bmehta001 Jun 18, 2026
f5b377f
download: clarify state-sidecar serialization naming and docs
bmehta001 Jun 18, 2026
94809d4
download: fixed-size sidecar interval; drop test-only cancel plumbing
bmehta001 Jun 18, 2026
92f8605
Rename function
bmehta001 Jun 18, 2026
f4567a4
test(download): add a true cross-process lock test (POSIX)
bmehta001 Jun 18, 2026
124f3f3
download: serialize model downloads with one process-wide mutex
bmehta001 Jun 19, 2026
faa9902
Nits
bmehta001 Jun 19, 2026
5443756
download: skip the periodic sidecar save once the download is complete
bmehta001 Jun 19, 2026
6f823f7
download: fix stale comments flagged in Copilot review
bmehta001 Jun 19, 2026
74a9938
download: include <type_traits> explicitly in blob_download_state.cc
bmehta001 Jun 19, 2026
9dff27b
download: address Copilot review — resume integrity + non-blocking lo…
bmehta001 Jun 19, 2026
defc6ed
test(download): fix misleading comment in PersistsSidecarOnChunkFailure
bmehta001 Jun 19, 2026
cc99205
fix(download): fail loud if the initial resume sidecar can't be persi…
bmehta001 Jun 19, 2026
06a92cc
test(download): add cross-process-lock wait-then-serve-cached test
bmehta001 Jun 19, 2026
eeb7278
test(download): add DISABLED live resume-after-cancel test + fix canc…
bmehta001 Jun 19, 2026
4721485
test(download): skip 0% heartbeat in resume assertion
bmehta001 Jun 19, 2026
e4f9dfc
fix(download): drain in-flight peers promptly on user cancel
bmehta001 Jun 20, 2026
0404ff2
Make some things references
skottmckay Jun 20, 2026
fdfa267
fix(download): report global running total for monotonic progress
bmehta001 Jun 20, 2026
cfeedb8
Merge skottmckay/resumable-downloads-review: logger references + cons…
bmehta001 Jun 22, 2026
19b2f2b
Address review: simplify serialization, escalate save errors, formatt…
bmehta001 Jun 22, 2026
35729e9
Wrap initial-percent assignment to stay within the 120-char line limit
bmehta001 Jun 22, 2026
5cbaddd
Drop unused includes from download_manager.h
bmehta001 Jun 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions sdk_v2/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,11 @@ set(FOUNDRY_LOCAL_SOURCES
src/inferencing/generative/chat/chat_session.cc
src/inferencing/generative/chat/chat_template.cc
src/configuration.cc
src/download/blob_download_state.cc
src/download/blob_downloader.cc
src/download/cross_process_file_lock.cc
src/download/download_manager.cc
src/download/file_writer.cc
src/download/inference_model_writer.cc
src/download/model_registry_client.cc
src/ep_detection/cuda_ep_bootstrapper.cc
Expand Down
367 changes: 367 additions & 0 deletions sdk_v2/cpp/src/download/blob_download_state.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,367 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
#include "download/blob_download_state.h"
#include "logger.h"

#include <chrono>
#include <cstring>
#include <fstream>
#include <system_error>
Comment thread
Copilot marked this conversation as resolved.
#include <type_traits>

namespace fl {

namespace {

// Suffix appended to the download target's path to form the path of its
// resume sidecar (see BlobDownloadState::GetStateFilePath).
constexpr const char* kStateFileExtension = ".dlstate";

// On-disk format. Scalar fields use host byte order (little-endian on every
// target we build for); see WriteNative/ReadNative below. The bitmap suffix is
// a raw byte copy of the in-memory uint64_t words, so it inherits the same
// host byte order: byte k of the bitmap holds chunks 8k..8k+7 only on a
// little-endian host. The sidecar is therefore not portable across hosts of
// differing endianness.
//
//   bytes  | field
//   -------|--------------------------------------------------------
//   0..3   | magic "FLDS"
//   4      | version (currently 1)
//   5..12  | blob_size (int64)
//   13..16 | chunk_size (int32)
//   17..20 | total_chunks (int32)
//   21..24 | bitmap_byte_aligned_start (int32)
//   25..28 | highest_completed_chunk (int32)
//   29..32 | completed_count (int32)
//   33..40 | last_modified_unix_ms (int64)
//   41..44 | trunc_bitmap_byte_len (uint32)
//   45..   | trunc_bitmap_byte_len bytes of bitmap data, copied directly out of
//          | full_completion_bitmap starting at the byte offset implied by
//          | bitmap_byte_aligned_start (i.e. bitmap_byte_aligned_start / 8).
//
// The fields are written back-to-back with no padding, which is why the
// offsets above are not naturally aligned.
constexpr char kMagic[4] = {'F', 'L', 'D', 'S'};
// Format version written by SaveState; LoadState ignores a sidecar carrying
// any other value.
constexpr uint8_t kVersion = 1;

// Number of chunk-completion bits packed into each uint64_t bitmap word.
constexpr int32_t kBitsPerWord = 64;

// Emits the object representation of `value` to `out`, byte for byte, in host
// byte order. All supported targets (x64 / arm64) are little-endian, so the
// resulting file layout is little-endian in practice. The stream state is not
// inspected here; a failed write leaves `out` in a failed state for the caller
// to check.
template <typename T>
void WriteNative(std::ostream& out, T value) {
  static_assert(std::is_trivially_copyable_v<T>);
  const auto* raw_bytes = reinterpret_cast<const char*>(&value);
  out.write(raw_bytes, static_cast<std::streamsize>(sizeof(T)));
}

// Reads sizeof(T) bytes from `in` straight into `out_value` (host byte order,
// the mirror image of WriteNative).
// Returns true when the stream is still usable after the read, false when the
// read failed or came up short; in the failure case `out_value` may have been
// partially overwritten.
template <typename T>
bool ReadNative(std::istream& in, T& out_value) {
  static_assert(std::is_trivially_copyable_v<T>);
  auto* raw_bytes = reinterpret_cast<char*>(&out_value);
  in.read(raw_bytes, static_cast<std::streamsize>(sizeof(T)));
  return !in.fail();
}

// Current wall-clock time (std::chrono::system_clock) expressed as whole
// milliseconds since that clock's epoch.
int64_t NowUnixMs() {
  using std::chrono::milliseconds;
  const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();
  return std::chrono::duration_cast<milliseconds>(since_epoch).count();
}

} // namespace

// Returns the path of the resume sidecar that belongs to `local_file_path`:
// the same path text with kStateFileExtension concatenated onto the end (no
// separator is inserted and any existing extension is kept).
std::filesystem::path BlobDownloadState::GetStateFilePath(const std::filesystem::path& local_file_path) {
  std::filesystem::path sidecar_path(local_file_path);
  sidecar_path.concat(kStateFileExtension);
  return sidecar_path;
}

std::unique_ptr<BlobDownloadState> BlobDownloadState::CreateNew(std::string blob_name,
std::filesystem::path local_file_path,
int64_t blob_size,
int32_t chunk_size,
int32_t total_chunks) {
auto state = std::make_unique<BlobDownloadState>();
state->blob_name = std::move(blob_name);
state->local_file_path = local_file_path.string();
state->blob_size = blob_size;
state->chunk_size = chunk_size;
state->total_chunks = total_chunks;
state->bitmap_byte_aligned_start = 0;
state->highest_completed_chunk = -1;
state->completed_count = 0;
state->last_modified_unix_ms = NowUnixMs();
auto words = static_cast<size_t>((total_chunks + kBitsPerWord - 1) / kBitsPerWord);
state->full_completion_bitmap.assign(words, 0);
return state;
}

std::unique_ptr<BlobDownloadState> BlobDownloadState::LoadState(std::string blob_name,
std::filesystem::path local_file_path,
int64_t expected_blob_size,
int32_t expected_chunk_size,
int32_t expected_total_chunks,
ILogger& logger) {
auto state_path = GetStateFilePath(local_file_path);
std::error_code ec;
if (!std::filesystem::exists(state_path, ec)) {
return nullptr;
}

std::ifstream in(state_path, std::ios::binary);
if (!in) {
logger.Log(LogLevel::Warning, "Could not open download state file: " + state_path.string());
return nullptr;
}

char magic[4]{};
in.read(magic, 4);
uint8_t version = 0;
if (!in || std::memcmp(magic, kMagic, 4) != 0 || !ReadNative(in, version) || version != kVersion) {
logger.Log(LogLevel::Warning,
"Download state file " + state_path.string() + " has unexpected magic/version; ignoring");
return nullptr;
}

int64_t blob_size = 0;
int32_t chunk_size = 0;
int32_t total_chunks = 0;
int32_t bitmap_byte_aligned_start = 0;
int32_t highest_completed_chunk = 0;
int32_t completed_count = 0;
int64_t last_modified_unix_ms = 0;
uint32_t trunc_len = 0;
if (!ReadNative(in, blob_size) || !ReadNative(in, chunk_size) || !ReadNative(in, total_chunks) ||
!ReadNative(in, bitmap_byte_aligned_start) || !ReadNative(in, highest_completed_chunk) ||
!ReadNative(in, completed_count) || !ReadNative(in, last_modified_unix_ms) || !ReadNative(in, trunc_len)) {
logger.Log(LogLevel::Warning, "Download state header truncated: " + state_path.string());
return nullptr;
}

// Sanity / compatibility checks.
if (blob_size != expected_blob_size || chunk_size != expected_chunk_size ||
total_chunks != expected_total_chunks) {
logger.Log(LogLevel::Information,
"Download state for " + state_path.string() +
" is incompatible with current blob layout; starting fresh");
return nullptr;
}
if (bitmap_byte_aligned_start < 0 || bitmap_byte_aligned_start % 8 != 0 ||
bitmap_byte_aligned_start > total_chunks || completed_count < 0 ||
completed_count > total_chunks || highest_completed_chunk < -1 ||
highest_completed_chunk >= total_chunks) {
logger.Log(LogLevel::Warning, "Download state header values out of range: " + state_path.string());
return nullptr;
}

auto words_total = static_cast<size_t>((total_chunks + kBitsPerWord - 1) / kBitsPerWord);
std::vector<uint64_t> bitmap(words_total, 0);

// The prefix of fully-completed chunks below bitmap_byte_aligned_start is
// implied — fill those bits.
size_t implicit_full_words = static_cast<size_t>(bitmap_byte_aligned_start) / kBitsPerWord;
for (size_t i = 0; i < implicit_full_words && i < bitmap.size(); ++i) {
bitmap[i] = ~uint64_t{0};
}
// Any remaining "implicit" bits inside a partial word (between
// implicit_full_words*64 and bitmap_byte_aligned_start).
if (size_t partial_bits = static_cast<size_t>(bitmap_byte_aligned_start) % kBitsPerWord;
partial_bits > 0 && implicit_full_words < bitmap.size()) {
bitmap[implicit_full_words] |= (uint64_t{1} << partial_bits) - 1;
}

if (trunc_len > 0) {
// Copy serialized bytes directly into the bitmap starting at the byte
// position implied by bitmap_byte_aligned_start.
size_t byte_offset = static_cast<size_t>(bitmap_byte_aligned_start) / 8;
auto* dest = reinterpret_cast<unsigned char*>(bitmap.data()) + byte_offset;
auto dest_capacity = bitmap.size() * sizeof(uint64_t) - byte_offset;
if (trunc_len > dest_capacity) {
logger.Log(LogLevel::Warning,
"Download state bitmap length exceeds expected capacity: " + state_path.string());
return nullptr;
}
in.read(reinterpret_cast<char*>(dest), trunc_len);
if (!in) {
logger.Log(LogLevel::Warning,
"Download state bitmap payload truncated: " + state_path.string());
return nullptr;
}
}

auto state = std::make_unique<BlobDownloadState>();
state->blob_name = std::move(blob_name);
state->local_file_path = local_file_path.string();
state->blob_size = blob_size;
state->chunk_size = chunk_size;
state->total_chunks = total_chunks;
state->bitmap_byte_aligned_start = bitmap_byte_aligned_start;
state->highest_completed_chunk = highest_completed_chunk;
state->completed_count = completed_count;
state->last_modified_unix_ms = last_modified_unix_ms;
state->full_completion_bitmap = std::move(bitmap);

logger.Log(LogLevel::Information,
"Loaded download state " + state_path.string() + ": " +
std::to_string(completed_count) + "/" + std::to_string(total_chunks) +
" chunks already done");
return state;
}

// Number of payload bytes represented by the completed chunks.
// Every completed chunk is counted as a full chunk_size; when the final chunk
// is among them and the blob size is not a multiple of chunk_size, the excess
// counted for that shorter last chunk is subtracted again.
int64_t BlobDownloadState::CalculateDownloadedSize() const noexcept {
  const int64_t counted_as_full = static_cast<int64_t>(completed_count) * chunk_size;

  const bool final_chunk_done = highest_completed_chunk == total_chunks - 1;
  if (!final_chunk_done || chunk_size <= 0) {
    return counted_as_full;
  }

  const auto tail_bytes = blob_size % chunk_size;
  if (tail_bytes == 0) {
    // The blob divides evenly into chunks; nothing was overcounted.
    return counted_as_full;
  }
  return counted_as_full - (chunk_size - tail_bytes);
}

// Reports whether chunk `chunk_idx` has been recorded as downloaded.
// Indices outside [0, total_chunks) are never complete; indices below
// bitmap_byte_aligned_start are complete by definition (that prefix is no
// longer tracked bit-by-bit); everything else is looked up in the bitmap.
bool BlobDownloadState::IsChunkComplete(int32_t chunk_idx) const noexcept {
  const bool in_range = chunk_idx >= 0 && chunk_idx < total_chunks;
  if (!in_range) {
    return false;
  }
  if (chunk_idx < bitmap_byte_aligned_start) {
    return true;
  }

  const auto bit_position = static_cast<size_t>(chunk_idx);
  const size_t word = bit_position / kBitsPerWord;
  if (word >= full_completion_bitmap.size()) {
    // Bitmap does not extend this far; treat the chunk as not downloaded.
    return false;
  }
  const uint64_t mask = uint64_t{1} << (bit_position % kBitsPerWord);
  return (full_completion_bitmap[word] & mask) != 0;
}

// Records chunk `chunk_idx` as downloaded.
//
// The call is a no-op when the index is outside [0, total_chunks), when the
// chunk is already complete (so completed_count is never double-counted), or
// when the bitmap is too small to hold the chunk's bit. The last case mirrors
// the guard in IsChunkComplete: without it, a state whose bitmap does not
// match total_chunks would be written out of bounds here.
//
// No field is modified unless the bit can actually be set, so
// highest_completed_chunk, completed_count and the bitmap always move together.
void BlobDownloadState::MarkChunkComplete(int32_t chunk_idx) {
  if (chunk_idx < 0 || chunk_idx >= total_chunks) {
    return;
  }
  if (IsChunkComplete(chunk_idx)) {
    return;
  }

  const auto word_idx = static_cast<size_t>(chunk_idx) / kBitsPerWord;
  const auto bit_idx = static_cast<size_t>(chunk_idx) % kBitsPerWord;
  if (word_idx >= full_completion_bitmap.size()) {
    // IsChunkComplete reports such a chunk as "not complete", so we get here;
    // refuse to index past the end of the vector.
    return;
  }

  full_completion_bitmap[word_idx] |= (uint64_t{1} << bit_idx);
  if (chunk_idx > highest_completed_chunk) {
    highest_completed_chunk = chunk_idx;
  }
  ++completed_count;
}

// Returns, in ascending order, the indices of all chunks not yet marked
// complete. The scan starts at bitmap_byte_aligned_start because every chunk
// below it is complete by definition.
std::vector<int32_t> BlobDownloadState::GetPendingChunks() const {
  std::vector<int32_t> remaining;
  remaining.reserve(static_cast<size_t>(total_chunks - completed_count));

  int32_t candidate = bitmap_byte_aligned_start;
  while (candidate < total_chunks) {
    if (!IsChunkComplete(candidate)) {
      remaining.push_back(candidate);
    }
    ++candidate;
  }
  return remaining;
}

// Persists the current progress to the resume sidecar.
//
// Steps:
//   1. Advance bitmap_byte_aligned_start over the leading run of completed
//      chunks, so only the unfinished tail of the bitmap is serialized.
//   2. Write header + bitmap tail to "<sidecar>.tmp".
//   3. Rename the tmp file over the real sidecar.
//
// Returns true only when the new sidecar is in place. On any failure the
// previous sidecar (if one exists) is left untouched, the error is logged, and
// false is returned; the in-memory state stays valid, so a later call can
// simply try again.
bool BlobDownloadState::SaveState(ILogger& logger) {
  // --- 1. Advance the implied-complete prefix --------------------------------
  // Find the first word that is not fully complete. Every word below it is
  // implicitly complete and need not be serialized again.
  size_t word_idx = static_cast<size_t>(bitmap_byte_aligned_start) / kBitsPerWord;
  while (word_idx < full_completion_bitmap.size() &&
         full_completion_bitmap[word_idx] == ~uint64_t{0}) {
    ++word_idx;
  }

  // Default: every word is fully complete.
  int32_t new_start = total_chunks;
  if (word_idx < full_completion_bitmap.size()) {
    // Within the first not-fully-set word, advance to the lowest 0 bit. Derive
    // the absolute chunk index from the word base (word_idx * 64), NOT by
    // accumulating 64 per word onto the (possibly unaligned) previous start —
    // the latter overshoots by (bitmap_byte_aligned_start % 64) and would mark
    // never-downloaded chunks complete on reload.
    const uint64_t inverted = ~full_completion_bitmap[word_idx];
    int trailing_zero = 0;
    while (trailing_zero < kBitsPerWord && ((inverted >> trailing_zero) & 1) == 0) {
      ++trailing_zero;
    }
    new_start = static_cast<int32_t>(word_idx) * kBitsPerWord + trailing_zero;
  }
  // Round down to a byte boundary so the serialized tail starts on a whole
  // byte of the bitmap, and never point past the last chunk.
  new_start = (new_start / 8) * 8;
  if (new_start > total_chunks) {
    new_start = (total_chunks / 8) * 8;
  }
  // The marker only ever moves forward.
  if (new_start > bitmap_byte_aligned_start) {
    bitmap_byte_aligned_start = new_start;
  }

  last_modified_unix_ms = NowUnixMs();

  const auto state_path = GetStateFilePath(local_file_path);
  auto tmp_path = state_path;
  tmp_path += ".tmp";

  // Serialized bitmap payload: the bytes covering chunks
  // [bitmap_byte_aligned_start, highest_completed_chunk], rounded up to a
  // whole byte. Empty when nothing above the marker has completed.
  uint32_t trunc_len = 0;
  if (highest_completed_chunk >= bitmap_byte_aligned_start) {
    const int32_t bit_count = highest_completed_chunk - bitmap_byte_aligned_start + 1;
    trunc_len = static_cast<uint32_t>((bit_count + 7) / 8);
  }
  const size_t byte_offset = static_cast<size_t>(bitmap_byte_aligned_start) / 8;

  // --- 2. Write the tmp file --------------------------------------------------
  {
    std::ofstream out(tmp_path, std::ios::binary | std::ios::trunc);
    if (!out) {
      logger.Log(LogLevel::Error, "Failed to open download state tmp file: " + tmp_path.string());
      return false;
    }
    out.write(kMagic, 4);
    WriteNative(out, kVersion);
    WriteNative(out, blob_size);
    WriteNative(out, chunk_size);
    WriteNative(out, total_chunks);
    WriteNative(out, bitmap_byte_aligned_start);
    WriteNative(out, highest_completed_chunk);
    WriteNative(out, completed_count);
    WriteNative(out, last_modified_unix_ms);
    WriteNative(out, trunc_len);
    if (trunc_len > 0) {
      const auto* src = reinterpret_cast<const unsigned char*>(full_completion_bitmap.data()) + byte_offset;
      out.write(reinterpret_cast<const char*>(src), trunc_len);
    }

    // Close explicitly BEFORE checking the stream. The writes above normally
    // land in the stream buffer only; the real I/O happens on flush/close. If
    // that were left to the destructor, a failure there (e.g. disk full) would
    // go unnoticed and the rename below would replace the last good sidecar
    // with a truncated one. close() sets failbit when the flush or close fails.
    out.close();
    if (!out) {
      logger.Log(LogLevel::Error, "Failed to write download state tmp file: " + tmp_path.string());
      // Best effort: do not leave a partial tmp file behind.
      std::error_code rm_ec;
      std::filesystem::remove(tmp_path, rm_ec);
      return false;
    }
  }

  // --- 3. Commit ----------------------------------------------------------------
  std::error_code ec;
  std::filesystem::rename(tmp_path, state_path, ec);
  if (ec) {
    // std::filesystem::rename atomically replaces the destination on every
    // platform we target (POSIX rename(2); Windows MoveFileExW with
    // MOVEFILE_REPLACE_EXISTING). If it still fails, the cause is transient
    // (e.g. a brief sharing violation on Windows or a flaky network FS) —
    // do NOT delete state_path as a fallback; that loses the only intact
    // copy of the resume bitmap. Instead, drop the tmp file and let the
    // next SaveState call retry from the up-to-date in-memory state.
    std::error_code rm_ec;
    std::filesystem::remove(tmp_path, rm_ec);
    logger.Log(LogLevel::Error,
               "Failed to commit download state file: " + tmp_path.string() + " -> " +
                   state_path.string() + " (" + ec.message() +
                   "); previous state retained, will retry on next save");
    return false;
  }
  return true;
}

// Removes the resume sidecar that belongs to `local_file_path`.
// Uses the non-throwing overload of std::filesystem::remove; if it reports an
// error, a warning is logged and the function returns normally.
void BlobDownloadState::DeleteState(const std::filesystem::path& local_file_path, ILogger& logger) {
  const auto sidecar_path = GetStateFilePath(local_file_path);

  std::error_code remove_ec;
  std::filesystem::remove(sidecar_path, remove_ec);
  if (!remove_ec) {
    return;
  }

  logger.Log(LogLevel::Warning,
             "Failed to delete download state file: " + sidecar_path.string() + " (" +
                 remove_ec.message() + ")");
}

} // namespace fl
Loading
Loading