Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 115 additions & 0 deletions compute/bulk_upload_chunk_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package compute

import (
"reflect"
"testing"
)

// TestBulkUploadChunkRanges_Tiling verifies the chunk splitter exactly tiles
// the input (no gaps, no overlaps) and respects both the byte and tensor caps,
// which is the correctness-critical part of the GB10 wedge fix (ztensor#106).
func TestBulkUploadChunkRanges_Tiling(t *testing.T) {
const elemSize = 4

cases := []struct {
name string
nelems []int
maxBytes int
maxTensors int
want [][2]int
}{
{"empty", nil, 64, 8, [][2]int{}},
{"single", []int{10}, 64, 8, [][2]int{{0, 1}}},
{"all-fit-one-chunk", []int{1, 1, 1, 1}, 1 << 20, 1024, [][2]int{{0, 4}}},
{
// 4 tensors x 4 elems x 4 bytes = 16 bytes each; cap 32 bytes -> 2 per chunk.
"byte-cap-splits", []int{4, 4, 4, 4}, 32, 1024,
[][2]int{{0, 2}, {2, 4}},
},
{
"tensor-cap-splits", []int{1, 1, 1, 1, 1}, 1 << 20, 2,
[][2]int{{0, 2}, {2, 4}, {4, 5}},
},
{
// Middle tensor alone exceeds the byte cap: it must still get its
// own range, and the split must not stall or drop tensors.
"lone-oversized-gets-own-range", []int{1, 100, 1}, 32, 1024,
[][2]int{{0, 1}, {1, 2}, {2, 3}},
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
got := bulkUploadChunkRanges(tc.nelems, elemSize, tc.maxBytes, tc.maxTensors)
if len(tc.nelems) == 0 {
if len(got) != 0 {
t.Fatalf("empty input: got %v, want no ranges", got)
}
return
}
if !reflect.DeepEqual(got, tc.want) {
t.Fatalf("ranges = %v, want %v", got, tc.want)
}

// Invariants: contiguous tiling of [0,len) and caps respected
// (except a single tensor that alone exceeds maxBytes).
prev := 0
for _, r := range got {
if r[0] != prev {
t.Fatalf("gap/overlap: range %v does not start at %d", r, prev)
}
if r[1] <= r[0] {
t.Fatalf("empty/inverted range %v", r)
}
if r[1]-r[0] > tc.maxTensors {
t.Fatalf("range %v exceeds maxTensors=%d", r, tc.maxTensors)
}
bytes := 0
for i := r[0]; i < r[1]; i++ {
bytes += tc.nelems[i] * elemSize
}
if bytes > tc.maxBytes && r[1]-r[0] > 1 {
t.Fatalf("range %v (%d bytes) exceeds maxBytes=%d with >1 tensor", r, bytes, tc.maxBytes)
}
prev = r[1]
}
if prev != len(tc.nelems) {
t.Fatalf("ranges cover [0,%d), want [0,%d)", prev, len(tc.nelems))
}
})
}
}

// TestBulkUploadChunkRanges_LargeCountIsBounded mirrors the production failure:
// a very large tensor count must split into many bounded chunks rather than one
// giant range (which wedged the GB10 driver).
func TestBulkUploadChunkRanges_LargeCountIsBounded(t *testing.T) {
const elemSize = 4
const n = 213304 // the observed hang count
nelems := make([]int, n)
for i := range nelems {
nelems[i] = 193 // one feature row
}
ranges := bulkUploadChunkRanges(nelems, elemSize, bulkUploadF32MaxChunkBytes, bulkUploadF32MaxChunkTensors)

if len(ranges) < 2 {
t.Fatalf("expected many chunks for %d tensors, got %d", n, len(ranges))
}
covered := 0
for _, r := range ranges {
if r[1]-r[0] > bulkUploadF32MaxChunkTensors {
t.Fatalf("chunk %v exceeds tensor cap %d", r, bulkUploadF32MaxChunkTensors)
}
bytes := 0
for i := r[0]; i < r[1]; i++ {
bytes += nelems[i] * elemSize
}
if bytes > bulkUploadF32MaxChunkBytes {
t.Fatalf("chunk %v (%d bytes) exceeds byte cap %d", r, bytes, bulkUploadF32MaxChunkBytes)
}
covered += r[1] - r[0]
}
if covered != n {
t.Fatalf("chunks cover %d tensors, want %d", covered, n)
}
}
65 changes: 65 additions & 0 deletions compute/bulk_upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,71 @@ func TestGPUEngine_UploadWeights_BulkPath(t *testing.T) {
}
}

// TestGPUEngine_UploadWeights_MultiChunk exercises the bounded-chunk upload
// path on real hardware (zerfoo/ztensor#106). It uploads a payload large enough
// to span several bulkUploadF32MaxChunkBytes (64 MiB) chunks, proving that (a) a
// real 64 MiB cudaMalloc + H2D copy does not wedge the GB10 driver, (b) the
// bulk buffer slice holds one pointer per chunk, and (c) tensor data round-trips
// correctly across chunk boundaries. Skips without CUDA.
func TestGPUEngine_UploadWeights_MultiChunk(t *testing.T) {
if !cuda.Available() {
t.Skip("CUDA not available")
}

ops := numeric.Float32Ops{}
gpuEng, err := NewGPUEngine[float32](ops)
if err != nil {
t.Fatalf("NewGPUEngine: %v", err)
}
defer func() { _ = gpuEng.Close() }()

// 256 tensors of 1 MiB each = 256 MiB total. With a 64 MiB byte cap this
// tiles into 4 chunks (the tensor-count cap of 4096 is not reached), so the
// upload issues 4 bounded device allocations + copies instead of one 256 MiB
// allocation that would risk wedging the driver.
const elemsPer = 256 * 1024 // 1 MiB per tensor
const N = 256
const wantChunks = 4

tensors := make([]*tensor.TensorNumeric[float32], N)
for i := range N {
data := make([]float32, elemsPer)
// Sentinel at both ends of each tensor to catch chunk-boundary offset bugs.
data[0] = float32(i*1_000_000 + 1)
data[elemsPer-1] = float32(i*1_000_000 + 2)
tt, _ := tensor.New[float32]([]int{elemsPer}, data)
tensors[i] = tt
}

if err := gpuEng.UploadWeights(tensors); err != nil {
t.Fatalf("UploadWeights (multi-chunk): %v", err)
}

if got := len(gpuEng.bulkUploadBuffers); got != wantChunks {
t.Fatalf("bulkUploadBuffers after multi-chunk upload = %d, want %d", got, wantChunks)
}

for i, tt := range tensors {
if _, ok := tt.GetStorage().(*tensor.GPUStorage[float32]); !ok {
t.Fatalf("tensor[%d] storage = %T, want *GPUStorage[float32]", i, tt.GetStorage())
}
}

// Round-trip the first and last element of tensors at and around each chunk
// boundary (every 64th tensor) to confirm views point at the right offsets.
for _, i := range []int{0, 63, 64, 127, 128, 191, 192, N - 1} {
got := tensors[i].Data()
wantHead := float32(i*1_000_000 + 1)
wantTail := float32(i*1_000_000 + 2)
if math.Abs(float64(got[0]-wantHead)) > 1e-6 {
t.Errorf("tensor[%d][0] = %f, want %f", i, got[0], wantHead)
}
if math.Abs(float64(got[elemsPer-1]-wantTail)) > 1e-6 {
t.Errorf("tensor[%d][last] = %f, want %f", i, got[elemsPer-1], wantTail)
}
}
}

// TestGPUEngine_UploadWeights_BelowBulkThreshold verifies that small inputs
// stay on the per-tensor path and the bulk allocation slice remains empty.
func TestGPUEngine_UploadWeights_BelowBulkThreshold(t *testing.T) {
Expand Down
131 changes: 94 additions & 37 deletions compute/gpu_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,41 @@ func (e *GPUEngine[T]) checkVRAMBounds(op string, allocBytes int) error {
// round-trips regardless of input size.
const bulkUploadF32MinTensors = 64

// bulkUploadF32MaxChunkBytes / bulkUploadF32MaxChunkTensors bound a single
// device allocation + H2D copy inside bulkUploadF32. A single unbounded
// allocation/copy of all eligible tensors (hundreds of thousands -> multi-GB)
// wedges the GB10 (sm_121) CUDA driver in an uninterruptible ioctl, which also
// makes the container unkillable (zerfoo/ztensor#106). Chunking keeps every
// driver call bounded while preserving the few-round-trips win over the
// per-tensor path. maxChunkBytes is a var so tests can force multi-chunk paths.
var bulkUploadF32MaxChunkBytes = 64 << 20 // 64 MiB

const bulkUploadF32MaxChunkTensors = 4096

// bulkUploadChunkRanges splits a sequence of tensors (given their per-tensor
// element counts) into contiguous [start,end) ranges, each bounded by maxBytes
// (sum of nelem*elemSize) and maxTensors. Every range holds at least one tensor,
// so a lone tensor whose size exceeds maxBytes still gets its own range rather
// than stalling. The ranges exactly tile [0,len(nelems)) with no gaps/overlaps.
func bulkUploadChunkRanges(nelems []int, elemSize, maxBytes, maxTensors int) [][2]int {
ranges := make([][2]int, 0, 1)
for start := 0; start < len(nelems); {
end := start
chunkBytes := 0
for end < len(nelems) {
tb := nelems[end] * elemSize
if end > start && (chunkBytes+tb > maxBytes || end-start >= maxTensors) {
break
}
chunkBytes += tb
end++
}
ranges = append(ranges, [2]int{start, end})
start = end
}
return ranges
}

// bulkUploadF32 fast-paths the F32 weight upload by allocating one device
// buffer for all eligible tensors and performing one H2D copy. Each tensor
// receives a non-owning GPUStorage view into the bulk buffer; the engine
Expand All @@ -382,12 +417,10 @@ func (e *GPUEngine[T]) bulkUploadF32(tensors []*tensor.TensorNumeric[float32]) (
}

type entry struct {
t *tensor.TensorNumeric[float32]
offset int
nelem int
t *tensor.TensorNumeric[float32]
nelem int
}
eligible := make([]entry, 0, len(tensors))
total := 0
for _, t := range tensors {
if t == nil {
continue
Expand All @@ -404,8 +437,7 @@ func (e *GPUEngine[T]) bulkUploadF32(tensors []*tensor.TensorNumeric[float32]) (
if n == 0 {
continue
}
eligible = append(eligible, entry{t: t, offset: total, nelem: n})
total += n * f32Size
eligible = append(eligible, entry{t: t, nelem: n})
}
if len(eligible) < bulkUploadF32MinTensors {
return 0, nil
Expand All @@ -414,41 +446,66 @@ func (e *GPUEngine[T]) bulkUploadF32(tensors []*tensor.TensorNumeric[float32]) (
return 0, err
}

var devPtr unsafe.Pointer
var err error
if e.managedMem {
devPtr, err = mallocManagedFn(total)
} else {
devPtr, err = e.runtime.Malloc(total)
}
if err != nil {
return 0, fmt.Errorf("bulk alloc f32 (%d tensors, %d bytes): %w",
len(eligible), total, err)
// Upload in bounded chunks. A single unbounded allocation + H2D copy of
// all eligible tensors (hundreds of thousands -> multi-GB) wedges the GB10
// (sm_121) CUDA driver in an uninterruptible ioctl, which also makes the
// container unkillable (zerfoo/ztensor#106). Cap each device allocation +
// copy at bulkUploadF32MaxChunkBytes / MaxChunkTensors; each tensor gets a
// non-owning view into its chunk's buffer.
nelems := make([]int, len(eligible))
for i, en := range eligible {
nelems[i] = en.nelem
}
for _, r := range bulkUploadChunkRanges(nelems, f32Size,
bulkUploadF32MaxChunkBytes, bulkUploadF32MaxChunkTensors) {
chunk := eligible[r[0]:r[1]]
chunkBytes := 0
for _, en := range chunk {
chunkBytes += en.nelem * f32Size
}

if e.managedMem {
dst := unsafe.Slice((*byte)(devPtr), total)
for _, en := range eligible {
src := unsafe.Slice((*byte)(unsafe.Pointer(&en.t.Data()[0])), en.nelem*f32Size)
copy(dst[en.offset:en.offset+en.nelem*f32Size], src)
}
} else {
host := make([]byte, total)
for _, en := range eligible {
src := unsafe.Slice((*byte)(unsafe.Pointer(&en.t.Data()[0])), en.nelem*f32Size)
copy(host[en.offset:en.offset+en.nelem*f32Size], src)
}
if err := e.runtime.Memcpy(devPtr, unsafe.Pointer(&host[0]), total, gpuapi.MemcpyHostToDevice); err != nil {
_ = e.runtime.Free(devPtr)
return 0, fmt.Errorf("bulk H2D f32 (%d bytes): %w", total, err)
var devPtr unsafe.Pointer
var err error
if e.managedMem {
devPtr, err = mallocManagedFn(chunkBytes)
} else {
devPtr, err = e.runtime.Malloc(chunkBytes)
}
if err != nil {
return 0, fmt.Errorf("bulk alloc f32 chunk (%d tensors, %d bytes): %w",
len(chunk), chunkBytes, err)
}

if e.managedMem {
dst := unsafe.Slice((*byte)(devPtr), chunkBytes)
off := 0
for _, en := range chunk {
src := unsafe.Slice((*byte)(unsafe.Pointer(&en.t.Data()[0])), en.nelem*f32Size)
copy(dst[off:off+en.nelem*f32Size], src)
off += en.nelem * f32Size
}
} else {
host := make([]byte, chunkBytes)
off := 0
for _, en := range chunk {
src := unsafe.Slice((*byte)(unsafe.Pointer(&en.t.Data()[0])), en.nelem*f32Size)
copy(host[off:off+en.nelem*f32Size], src)
off += en.nelem * f32Size
}
if err := e.runtime.Memcpy(devPtr, unsafe.Pointer(&host[0]), chunkBytes, gpuapi.MemcpyHostToDevice); err != nil {
_ = e.runtime.Free(devPtr)
return 0, fmt.Errorf("bulk H2D f32 chunk (%d bytes): %w", chunkBytes, err)
}
}
}

e.bulkUploadBuffers = append(e.bulkUploadBuffers, devPtr)
for _, en := range eligible {
sub := unsafe.Add(devPtr, en.offset)
view := tensor.NewGPUStorageViewFromPtr[float32](sub, en.nelem, e.deviceID)
en.t.SetStorage(view)
e.bulkUploadBuffers = append(e.bulkUploadBuffers, devPtr)
off := 0
for _, en := range chunk {
sub := unsafe.Add(devPtr, off)
view := tensor.NewGPUStorageViewFromPtr[float32](sub, en.nelem, e.deviceID)
en.t.SetStorage(view)
off += en.nelem * f32Size
}
}
return len(eligible), nil
}
Expand Down
Loading
Loading