diff --git a/src/client.zig b/src/client.zig index 40c2f21..ce2f2c3 100644 --- a/src/client.zig +++ b/src/client.zig @@ -364,11 +364,15 @@ pub const ControlResult = struct { text: []u8, }; -/// Send a single control command (-X) and wait for the reply. +/// Send a single control command (-X) and wait for the reply. When +/// `timeout_ms` is non-null, give up with `error.Timeout` if no reply +/// arrives in that window, so a caller (notably `boo ui`) cannot hang +/// forever on a daemon that has stopped answering. pub fn control( alloc: std.mem.Allocator, socket_path: []const u8, argv: []const []const u8, + timeout_ms: ?u32, ) !ControlResult { const sock = try connect(alloc, socket_path); defer posix.close(sock); @@ -380,7 +384,20 @@ pub fn control( var decoder: protocol.Decoder = .init(alloc); defer decoder.deinit(); var buf: [4096]u8 = undefined; + const deadline: ?i64 = if (timeout_ms) |ms| + std.time.milliTimestamp() + ms + else + null; while (true) { + if (deadline) |dl| { + const now = std.time.milliTimestamp(); + if (now >= dl) return error.Timeout; + var fds = [_]posix.pollfd{ + .{ .fd = sock, .events = posix.POLL.IN, .revents = 0 }, + }; + const ready = posix.poll(&fds, @intCast(dl - now)) catch return error.Timeout; + if (ready == 0) return error.Timeout; + } const n = posix.read(sock, &buf) catch 0; if (n == 0) return error.ConnectionLost; try decoder.feed(buf[0..n]); @@ -431,3 +448,34 @@ test "ReleaseScan: non-release CSI and plain bytes never trigger" { try std.testing.expect(!scan.feed("dddd")); try std.testing.expect(!scan.feed("\x1b[A\x1b[100;1:2u")); } + +test "control times out when the daemon never answers" { + const alloc = std.testing.allocator; + + var name_buf: [64]u8 = undefined; + const path = try std.fmt.bufPrint( + &name_buf, + "/tmp/boo-control-timeout-{x}.sock", + .{std.crypto.random.int(u32)}, + ); + std.fs.cwd().deleteFile(path) catch {}; + + // A listener that never accepts: connect() still succeeds via the + // backlog and the command write is buffered, so the reply read is + // what blocks. Without the timeout this call would hang forever + // (the boo ls / boo ui freeze); with it, it must give up. + const lfd = try posix.socket(posix.AF.UNIX, posix.SOCK.STREAM | posix.SOCK.CLOEXEC, 0); + defer { + posix.close(lfd); + std.fs.cwd().deleteFile(path) catch {}; + } + var addr: posix.sockaddr.un = .{ .family = posix.AF.UNIX, .path = undefined }; + @memset(&addr.path, 0); + @memcpy(addr.path[0..path.len], path); + try posix.bind(lfd, @ptrCast(&addr), @sizeOf(posix.sockaddr.un)); + try posix.listen(lfd, 1); + + const start = std.time.milliTimestamp(); + try std.testing.expectError(error.Timeout, control(alloc, path, &.{"info"}, 150)); + try std.testing.expect(std.time.milliTimestamp() - start >= 100); +} diff --git a/src/daemon.zig b/src/daemon.zig index 957e8e5..3ca5a2c 100644 --- a/src/daemon.zig +++ b/src/daemon.zig @@ -5,6 +5,12 @@ //! Single-threaded poll(2) loop. One client may be attached at a time //! (attaching steals); any number of transient control connections //! may come and go. +//! +//! Client sockets are non-blocking and per-connection output is queued +//! and flushed on POLLOUT, so a client that reads slowly never blocks +//! the loop. A blocking write here would stall the PTY and every other +//! client until the slow reader caught up (the boo-ui freeze this +//! design guards against). const std = @import("std"); const posix = std.posix; @@ -32,20 +38,90 @@ pub const Options = struct { cwd: ?[]const u8 = null, }; +/// Hard cap on a single connection's queued-but-unsent output. The +/// daemon never blocks on a client write (see `Conn.flush`), so a +/// client that stops reading would otherwise grow this without bound. +/// A client that falls this far behind is dropped instead; it +/// reconnects and gets a fresh repaint. Generous, so an ordinary burst +/// (a full-screen repaint is kilobytes) never trips it. +const max_conn_out: usize = 8 * 1024 * 1024; + +/// Upper bound on how long teardown waits for queued finals to drain. +const shutdown_drain_ms: i64 = 250; + const Conn = struct { + alloc: std.mem.Allocator, fd: posix.fd_t, decoder: protocol.Decoder, attached: bool = false, /// A ui view (vs a plain attach): gets its scrollback history /// replayed on attach so a wheel-up can page it. ui: bool = false, + /// The connection is finished: its fd is dead or it was dropped. + /// `sweep` closes the fd and frees it. closed: bool = false, - + /// A final frame (detach or exit) is queued: read no more input and + /// close once `out` has drained, so the client still sees it. + shutdown: bool = false, + /// Frames queued for the client but not yet written to the socket. + /// The socket is non-blocking, so a slow reader backs up here + /// instead of blocking the loop. Drained on POLLOUT. + out: std.ArrayList(u8) = .empty, + /// Drop threshold for `out`; a field so tests can shrink it. + out_cap: usize = max_conn_out, + + /// Queue a frame and write what the socket will take right now. + /// Never blocks: the unsent remainder waits for the next POLLOUT. A + /// client that backs up past `out_cap` is dropped rather than + /// buffered without bound. fn send(self: *Conn, msg_type: protocol.MsgType, payload: []const u8) void { - if (self.closed) return; - protocol.writeMsg(self.fd, msg_type, payload) catch { - self.closed = true; + if (self.closed or self.shutdown) return; + protocol.appendMsg(self.alloc, &self.out, msg_type, payload) catch { + self.drop(); + return; }; + if (self.out.items.len > self.out_cap) { + self.drop(); + return; + } + self.flush(); + } + + /// Write as much of `out` as the socket accepts without blocking. A + /// full send buffer (WouldBlock) just leaves the remainder for the + /// next POLLOUT; any other error drops the connection. + fn flush(self: *Conn) void { + if (self.closed) return; + var off: usize = 0; + while (off < self.out.items.len) { + const n = posix.write(self.fd, self.out.items[off..]) catch |err| switch (err) { + error.WouldBlock => break, + else => { + self.drop(); + return; + }, + }; + if (n == 0) break; + off += n; + } + if (off == self.out.items.len) { + self.out.clearRetainingCapacity(); + } else if (off > 0) { + const remaining = self.out.items.len - off; + std.mem.copyForwards(u8, self.out.items[0..remaining], self.out.items[off..]); + self.out.shrinkRetainingCapacity(remaining); + } + } + + /// Discard queued output and mark the connection dead. + fn drop(self: *Conn) void { + self.closed = true; + self.out.clearRetainingCapacity(); + } + + fn deinit(self: *Conn) void { + self.out.deinit(self.alloc); + self.decoder.deinit(); } }; @@ -133,7 +209,7 @@ pub const Daemon = struct { if (self.win) |w| w.destroy(); for (self.conns.items) |c| { posix.close(c.fd); - c.decoder.deinit(); + c.deinit(); self.alloc.destroy(c); } self.conns.deinit(self.alloc); @@ -179,7 +255,20 @@ pub const Daemon = struct { try fds.append(self.alloc, .{ .fd = self.sig_read, .events = posix.POLL.IN, .revents = 0 }); try refs.append(self.alloc, .sigchld); for (self.conns.items) |c| { - try fds.append(self.alloc, .{ .fd = c.fd, .events = posix.POLL.IN, .revents = 0 }); + if (c.closed) continue; + // A connection whose final frame has drained is done; + // close it so sweep can free it this cycle. + if (c.shutdown and c.out.items.len == 0) { + c.closed = true; + continue; + } + var events: i16 = 0; + // Read input until shutdown; a shutdown conn only + // drains its queued tail. + if (!c.shutdown) events |= posix.POLL.IN; + // Watch for writability whenever output is queued. + if (c.out.items.len > 0) events |= posix.POLL.OUT; + try fds.append(self.alloc, .{ .fd = c.fd, .events = events, .revents = 0 }); try refs.append(self.alloc, .{ .conn = c }); } if (self.liveWindow()) |w| { @@ -196,7 +285,16 @@ pub const Daemon = struct { switch (ref) { .listen => self.acceptConn(), .sigchld => self.reapChildren(&buf), - .conn => |c| self.serviceConn(c, &buf), + .conn => |c| { + // Drain queued output first so a writable socket + // makes room before any new input is handled. + if ((pfd.revents & posix.POLL.OUT) != 0) c.flush(); + if (!c.closed and !c.shutdown and + (pfd.revents & (posix.POLL.IN | posix.POLL.HUP | posix.POLL.ERR)) != 0) + { + self.serviceConn(c, &buf); + } + }, .window => |w| self.serviceWindow(w, &buf), } if (self.quitting) break; @@ -208,10 +306,17 @@ pub const Daemon = struct { break; } } + + // Give queued finals (detach, exit, command replies) a bounded + // chance to reach clients before the fds close in deinit. + self.drainOutbound(); } fn acceptConn(self: *Daemon) void { - const fd = posix.accept(self.opts.listen_fd, null, null, posix.SOCK.CLOEXEC) catch |err| { + // Non-blocking so the daemon's writes to this client never + // block the loop; output the socket cannot take yet is queued + // on the Conn and flushed on POLLOUT. + const fd = posix.accept(self.opts.listen_fd, null, null, posix.SOCK.CLOEXEC | posix.SOCK.NONBLOCK) catch |err| { log.warn("accept failed: {}", .{err}); return; }; @@ -219,7 +324,7 @@ pub const Daemon = struct { posix.close(fd); return; }; - conn.* = .{ .fd = fd, .decoder = .init(self.alloc) }; + conn.* = .{ .alloc = self.alloc, .fd = fd, .decoder = .init(self.alloc) }; self.conns.append(self.alloc, conn) catch { posix.close(fd); self.alloc.destroy(conn); @@ -236,7 +341,15 @@ pub const Daemon = struct { } fn serviceConn(self: *Daemon, conn: *Conn, buf: []u8) void { - const n = posix.read(conn.fd, buf) catch 0; + const n = posix.read(conn.fd, buf) catch |err| switch (err) { + // A POLLOUT-only wake (or spurious readiness) has nothing + // to read yet; that is not an EOF. + error.WouldBlock => return, + else => { + conn.closed = true; + return; + }, + }; if (n == 0) { conn.closed = true; return; @@ -269,7 +382,9 @@ pub const Daemon = struct { for (self.conns.items) |other| { if (other != conn and other.attached) { other.send(.detached, "stolen"); - other.closed = true; + other.attached = false; + // Let the steal notice drain before closing. + other.shutdown = true; } } conn.attached = true; @@ -602,6 +717,7 @@ pub const Daemon = struct { /// Remove closed conns. Runs after every poll dispatch so /// iteration above never sees mutation. fn sweep(self: *Daemon) void { + var removed = false; var ci: usize = 0; while (ci < self.conns.items.len) { const c = self.conns.items[ci]; @@ -610,10 +726,16 @@ pub const Daemon = struct { continue; } posix.close(c.fd); - c.decoder.deinit(); + c.deinit(); self.alloc.destroy(c); _ = self.conns.swapRemove(ci); + removed = true; } + // A dropped or abruptly closed client may have been the + // attached view; recompute passthrough so the window resumes + // answering its own queries once no client remains, instead of + // staying silent waiting for a terminal that is gone. + if (removed) self.updatePassthrough(); } // -- Window management ------------------------------------------------ @@ -700,19 +822,50 @@ pub const Daemon = struct { fn detachConn(self: *Daemon, conn: *Conn, reason: []const u8) void { conn.send(.detached, reason); conn.attached = false; - conn.closed = true; + // Close once the detach notice has drained, not before, so the + // client still receives it. + conn.shutdown = true; self.updatePassthrough(); } fn broadcastExit(self: *Daemon, reason: []const u8) void { for (self.conns.items) |c| { - if (c.closed) continue; + if (c.closed or c.shutdown) continue; c.send(.exit, reason); - c.closed = true; + // Drain the exit notice before closing (see drainOutbound). + c.shutdown = true; } self.quitting = true; } + /// Best-effort flush of every connection's queued output before + /// teardown, so a final frame (detach, exit, command reply) still + /// reaches a client that is reading. Bounded by `shutdown_drain_ms` + /// so a client that stopped reading cannot delay shutdown. + fn drainOutbound(self: *Daemon) void { + const deadline = std.time.milliTimestamp() + shutdown_drain_ms; + var fds: std.ArrayList(posix.pollfd) = .empty; + defer fds.deinit(self.alloc); + while (true) { + fds.clearRetainingCapacity(); + for (self.conns.items) |c| { + if (c.closed) continue; + c.flush(); + if (!c.closed and c.out.items.len > 0) { + fds.append(self.alloc, .{ + .fd = c.fd, + .events = posix.POLL.OUT, + .revents = 0, + }) catch {}; + } + } + if (fds.items.len == 0) break; + const now = std.time.milliTimestamp(); + if (now >= deadline) break; + _ = posix.poll(fds.items, @intCast(deadline - now)) catch break; + } + } + fn message(self: *Daemon, conn: *Conn, comptime fmt: []const u8, args: anytype) void { const text = std.fmt.allocPrint(self.alloc, fmt, args) catch return; defer self.alloc.free(text); @@ -729,3 +882,85 @@ pub const Daemon = struct { conn.send(.output, seq); } }; + +test "Conn.send queues output without blocking and flushes in order" { + const alloc = std.testing.allocator; + + // A non-blocking pipe stands in for the client socket: writing to a + // full pipe yields WouldBlock just as a full socket send buffer + // does, so this exercises the queue/flush path without a client. + const fds = try posix.pipe2(.{ .NONBLOCK = true }); + defer posix.close(fds[0]); + + var conn: Conn = .{ .alloc = alloc, .fd = fds[1], .decoder = .init(alloc) }; + defer { + posix.close(fds[1]); + conn.deinit(); + } + + const payload = "x" ** 1024; + const frame_count: usize = 1000; + + // Nothing reads during this loop, so the pipe fills and the surplus + // must land in conn.out rather than block the caller. (A blocking + // write here would hang the test, which is the regression.) + for (0..frame_count) |_| conn.send(.output, payload); + try std.testing.expect(!conn.closed); + try std.testing.expect(conn.out.items.len > 0); + + // Drain: flush what the pipe will take, read it back, repeat. Every + // frame must arrive intact and in order. + var decoder: protocol.Decoder = .init(alloc); + defer decoder.deinit(); + var rbuf: [8192]u8 = undefined; + var got: usize = 0; + var guard: usize = 0; + while (got < frame_count) { + conn.flush(); + const n = posix.read(fds[0], &rbuf) catch |err| switch (err) { + error.WouldBlock => { + guard += 1; + try std.testing.expect(guard < 1_000_000); + continue; + }, + else => return err, + }; + try std.testing.expect(n != 0); + try decoder.feed(rbuf[0..n]); + while (try decoder.next()) |msg| { + try std.testing.expectEqual(protocol.MsgType.output, msg.type); + try std.testing.expectEqualStrings(payload, msg.payload); + got += 1; + } + } + try std.testing.expectEqual(@as(usize, 0), conn.out.items.len); +} + +test "Conn.send drops a client that exceeds its output cap" { + const alloc = std.testing.allocator; + + const fds = try posix.pipe2(.{ .NONBLOCK = true }); + defer posix.close(fds[0]); + + var conn: Conn = .{ + .alloc = alloc, + .fd = fds[1], + .decoder = .init(alloc), + .out_cap = 64 * 1024, + }; + defer { + posix.close(fds[1]); + conn.deinit(); + } + + // Nothing reads, so once the pipe is full the queue grows until it + // trips out_cap and the connection is dropped instead of buffering + // without bound. The bound on the loop keeps a regression from + // hanging the test. + const payload = "y" ** 4096; + var i: usize = 0; + while (!conn.closed and i < 10_000) : (i += 1) conn.send(.output, payload); + + try std.testing.expect(conn.closed); + try std.testing.expectEqual(@as(usize, 0), conn.out.items.len); +} diff --git a/src/main.zig b/src/main.zig index 017a2e2..c487274 100644 --- a/src/main.zig +++ b/src/main.zig @@ -177,10 +177,15 @@ pub const SessionInfo = struct { }; /// Query a session daemon, deleting the socket when the daemon is gone. -pub fn sessionInfo(alloc: std.mem.Allocator, dir: []const u8, name: []const u8) !?SessionInfo { +/// A non-null `timeout_ms` bounds the control round-trip so a slow daemon +/// cannot hang the caller; a timeout reports no info but leaves the socket +/// in place, because a slow daemon is not a dead one. +pub fn sessionInfo(alloc: std.mem.Allocator, dir: []const u8, name: []const u8, timeout_ms: ?u32) !?SessionInfo { const sock = try paths.socketPath(alloc, dir, name); defer alloc.free(sock); - const result = client.control(alloc, sock, &.{"info"}) catch { + const result = client.control(alloc, sock, &.{"info"}, timeout_ms) catch |err| { + // A timeout means the daemon is busy, not gone; keep the socket. + if (err == error.Timeout) return null; // Stale socket: the daemon is gone. std.fs.cwd().deleteFile(sock) catch {}; return null; @@ -237,7 +242,7 @@ fn mustControl( ) !client.ControlResult { const sock = try paths.socketPath(alloc, dir, name); defer alloc.free(sock); - return client.control(alloc, sock, argv) catch |err| switch (err) { + return client.control(alloc, sock, argv, null) catch |err| switch (err) { error.FileNotFound, error.ConnectionRefused, error.ConnectionLost => fail( exit_no_session, "no session named {s}", @@ -452,7 +457,7 @@ fn cmdLs(alloc: std.mem.Allocator, args: []const [:0]const u8) !void { infos.deinit(alloc); } for (sessions) |name| { - const info = sessionInfo(alloc, dir, name) catch continue orelse continue; + const info = sessionInfo(alloc, dir, name, null) catch continue orelse continue; try infos.append(alloc, .{ .name = name, .info = info }); name_width = @max(name_width, name.len); live += 1; @@ -767,7 +772,7 @@ fn cmdWait(alloc: std.mem.Allocator, args: []const [:0]const u8) !void { fail(exit_runtime, "malformed peek response", .{}); if (std.mem.indexOf(u8, peek.screen, needle) != null) return; } else { - const info = try sessionInfo(alloc, dir, name) orelse + const info = try sessionInfo(alloc, dir, name, null) orelse fail(exit_no_session, "no session named {s}", .{name}); defer alloc.free(info.text); if (info.out_idle_ms >= idle_settle_ms) return; @@ -810,7 +815,7 @@ fn cmdKill(alloc: std.mem.Allocator, args: []const [:0]const u8) !void { for (sessions) |name| { const sock = try paths.socketPath(alloc, dir, name); defer alloc.free(sock); - const result = client.control(alloc, sock, &.{"quit"}) catch { + const result = client.control(alloc, sock, &.{"quit"}, null) catch { std.fs.cwd().deleteFile(sock) catch {}; continue; }; diff --git a/src/protocol.zig b/src/protocol.zig index f618c35..e5e097a 100644 --- a/src/protocol.zig +++ b/src/protocol.zig @@ -79,6 +79,23 @@ pub fn writeMsg(fd: std.posix.fd_t, msg_type: MsgType, payload: []const u8) !voi try writeAll(fd, payload); } +/// Append a full frame (header + payload) to a growable buffer, for +/// callers that queue output to flush later instead of writing it to a +/// fd right away (e.g. a non-blocking socket with backpressure). +pub fn appendMsg( + alloc: std.mem.Allocator, + list: *std.ArrayList(u8), + msg_type: MsgType, + payload: []const u8, +) !void { + std.debug.assert(payload.len <= max_payload); + var header: [header_len]u8 = undefined; + header[0] = @intFromEnum(msg_type); + std.mem.writeInt(u32, header[1..5], @intCast(payload.len), .little); + try list.appendSlice(alloc, &header); + try list.appendSlice(alloc, payload); +} + pub fn writeAll(fd: std.posix.fd_t, bytes: []const u8) !void { var i: usize = 0; while (i < bytes.len) i += try std.posix.write(fd, bytes[i..]); diff --git a/src/ui.zig b/src/ui.zig index 81bb444..ba31165 100644 --- a/src/ui.zig +++ b/src/ui.zig @@ -38,6 +38,12 @@ const log = std.log.scoped(.ui); /// immediate re-poll whenever the focused session changes its own /// title), so this bounds how stale a background row can look. const refresh_interval_ms: i64 = 250; +/// Upper bound on a sidebar control round-trip (info, cwd, rename, +/// quit). The daemon write-deadlock fix keeps these replies prompt, so +/// this is a safety net: if a daemon ever stops answering, the ui gives +/// up instead of freezing. A timed-out poll leaves the session in place +/// because a slow daemon is not a dead one. +const control_timeout_ms: u32 = 3000; /// Transient status messages stay visible this long. const message_ttl_ms: i64 = 4000; /// Render coalescing: at most one repaint per interval while output @@ -2157,7 +2163,7 @@ const Ui = struct { const main = @import("main.zig"); for (names) |name| { - const info = main.sessionInfo(self.alloc, self.dir, name) catch continue orelse continue; + const info = main.sessionInfo(self.alloc, self.dir, name, control_timeout_ms) catch continue orelse continue; defer self.alloc.free(info.text); try fresh.append(self.alloc, .{ .name = try self.alloc.dupe(u8, name), @@ -2642,7 +2648,7 @@ const Ui = struct { if (idx >= self.sessions.items.len) return null; const sock = paths.socketPath(self.alloc, self.dir, self.sessions.items[idx].name) catch return null; defer self.alloc.free(sock); - const result = client.control(self.alloc, sock, &.{"cwd"}) catch return null; + const result = client.control(self.alloc, sock, &.{"cwd"}, control_timeout_ms) catch return null; if (!result.ok or result.text.len == 0) { self.alloc.free(result.text); return null; @@ -2716,7 +2722,7 @@ const Ui = struct { const sock = paths.socketPath(self.alloc, self.dir, entry.name) catch return; defer self.alloc.free(sock); - const result = client.control(self.alloc, sock, &.{ "rename", new_name }) catch { + const result = client.control(self.alloc, sock, &.{ "rename", new_name }, control_timeout_ms) catch { self.setMessage("rename failed", .{}); return; }; @@ -2739,7 +2745,12 @@ const Ui = struct { const sock = paths.socketPath(self.alloc, self.dir, name) catch return; defer self.alloc.free(sock); - const result = client.control(self.alloc, sock, &.{"quit"}) catch { + const result = client.control(self.alloc, sock, &.{"quit"}, control_timeout_ms) catch |err| { + if (err == error.Timeout) { + // Slow, not dead: keep the socket and tell the user. + self.setMessage("kill {s} timed out", .{name}); + return; + } // The daemon is already gone; remove the stale socket. std.fs.cwd().deleteFile(sock) catch {}; self.refreshSessions() catch {};