From 4c7157362c1aeb18151d96a91d4c561bf10cec8b Mon Sep 17 00:00:00 2001 From: Clemens Elflein Date: Tue, 26 May 2026 18:29:48 +0200 Subject: [PATCH 1/2] feat: dedicated callback thread, on_configured hook, send_config() All user-visible callbacks (on_connected, on_disconnected, on_configured, on_*_changed) now run on a per-ServiceInterface daemon thread (xbot-cb-*) instead of spawning a new thread per event. This means: - Callbacks are serialised and arrive in order - RPC calls (call_*) are safe inside any callback without deadlock New API: - on_configured: fires after connect + initial config transaction sent - send_config(): explicitly push register values when already connected - RegisterProxy no longer auto-sends on assignment while connected lcd_hello.py rewritten as a button-driven counter using on_configured and on_gpio_event_changed callbacks. gpio.json updated to two inputs. Add _join_callbacks() test helper; update tests that assert on async callback delivery to drain the queue before asserting. --- xbot_service_interface_py/examples/gpio.json | 13 +- .../examples/lcd_hello.py | 262 ++++++++++-------- .../tests/test_interface.py | 55 +++- .../xbot_service_interface/interface.py | 102 +++++-- .../xbot_service_interface/manager.py | 78 ++++++ 5 files changed, 367 insertions(+), 143 deletions(-) diff --git a/xbot_service_interface_py/examples/gpio.json b/xbot_service_interface_py/examples/gpio.json index b9b8d97..008ded2 100644 --- a/xbot_service_interface_py/examples/gpio.json +++ b/xbot_service_interface_py/examples/gpio.json @@ -1,17 +1,16 @@ { "gpios": [ + { + "direction": "input", + "id": 6, + "line": "GPIO6", + "name": "Input GPIO6" + }, { "direction": "input", "id": 7, "line": "GPIO7", "name": "Input GPIO7" - }, - { - "default": 0, - "direction": "output", - "id": 6, - "line": "GPIO6", - "name": "Output GPIO6" } ], "i2c": [ diff --git a/xbot_service_interface_py/examples/lcd_hello.py b/xbot_service_interface_py/examples/lcd_hello.py index 297c4b4..34f0aef 100644 --- a/xbot_service_interface_py/examples/lcd_hello.py +++ b/xbot_service_interface_py/examples/lcd_hello.py @@ -1,154 +1,190 @@ """ -Write "hello world" to a 1602 LCD connected via PCF8574 I2C expander. +Counter display on a 1602 LCD via RemoteGPIOService (id=10). -Uses RemoteGPIOService (id=10). Configures with gpio.json (heatshrink-compressed) -and 1000 ms periodic update interval. - -Typical PCF8574 addresses: 0x27 (A0-A2 high) or 0x3F (A0-A2 low). +GPIO6 = button → count down +GPIO7 = button → count up +Line 0: counter value +Line 1: custom icon strip (CGRAM slots 0-7) """ -import sys import time import logging +import argparse from pathlib import Path import heatshrink2 -sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) - from xbot_service_interface import XbotServiceIo, ServiceInterface logging.basicConfig(level=logging.WARNING) -# ── Config ──────────────────────────────────────────────────────────────────── - BIND_IP = '0.0.0.0' GPIO_JSON = Path(__file__).parent / 'gpio.json' -LCD_ADDR = 0x27 # PCF8574 I2C address — change to 0x3F if needed -I2C_BUS = 0 # matches "id": 0 in gpio.json - -# ── HD44780 via PCF8574 ─────────────────────────────────────────────────────── -# PCF8574 pin mapping: -# P7-P4 → D7-D4 (high nibble) -# P3 → Backlight -# P2 → Enable -# P1 → R/W (always 0 = write) -# P0 → RS (0=command, 1=data) - -_BL = 0x08 -_EN = 0x04 -_RS = 0x01 - - -def _nibble_bytes(nibble: int, flags: int) -> bytes: - """Pulse Enable for one 4-bit nibble. Returns 2 I2C bytes.""" - b = (nibble & 0xF0) | flags | _BL - return bytes([b | _EN, b & ~_EN]) - - -def _byte_bytes(value: int, rs: int) -> bytes: - """Encode a full byte as two nibble pulses (4 I2C bytes).""" - return (_nibble_bytes(value & 0xF0, rs) + - _nibble_bytes((value << 4) & 0xF0, rs)) - - -def _tx(svc, data: bytes): - svc.call_i2c_transmit(I2C_BUS, LCD_ADDR, data, timeout_ms=1500) - - -def lcd_cmd(svc, cmd: int): - _tx(svc, _byte_bytes(cmd, 0)) - - -def lcd_char(svc, ch: str): - _tx(svc, _byte_bytes(ord(ch), _RS)) - - -def lcd_init(svc): - lcd_cmd(svc, 0x33) - lcd_cmd(svc, 0x32) - lcd_cmd(svc, 0x06) - lcd_cmd(svc, 0x0C) - lcd_cmd(svc, 0x28) - lcd_cmd(svc, 0x01) - time.sleep(0.0005) - - -def lcd_write(svc, text: str, line: int = 0): - addr = 0x80 if line == 0 else 0xC0 - lcd_cmd(svc, addr) - for ch in text[:16]: - lcd_char(svc, ch) - - -def create_char(svc, location, charmap): - """Write custom char to CGRAM""" - location &= 0x7 # Only 8 slots (0–7) - lcd_cmd(svc, 0x40 | (location << 3)) - for byte in charmap: - _tx(svc, _byte_bytes(byte, _RS)) - - -def define_custom_characters(svc): - # emergency - create_char(svc, 0, [0x0E, 0x0E, 0x0E, 0x0E, 0x0E, 0x00, 0x0E, 0x0E]) - # battery empty - create_char(svc, 1, [0x0E, 0x11, 0x11, 0x11, 0x11, 0x11, 0x1F, 0x00]) - # battery 50% - create_char(svc, 2, [0x0E, 0x11, 0x11, 0x11, 0x1F, 0x1F, 0x1F, 0x00]) - # battery full - create_char(svc, 3, [0x0E, 0x1F, 0x1F, 0x1F, 0x1F, 0x1F, 0x1F, 0x00]) - # battery charging - create_char(svc, 4, [0x0E, 0x1B, 0x17, 0x11, 0x1D, 0x1B, 0x1F, 0x00]) - # gps no rtk - create_char(svc, 5, [0x00, 0x0E, 0x19, 0x15, 0x13, 0x0E, 0x00, 0x00]) - # gps rtk float - create_char(svc, 6, [0x00, 0x0E, 0x11, 0x11, 0x11, 0x0E, 0x00, 0x00]) - # gps rtk fixed - create_char(svc, 7, [0x00, 0x0E, 0x1F, 0x1B, 0x1F, 0x0E, 0x00, 0x00]) +GPIO_DOWN = 6 +GPIO_UP = 7 + + +# ── LcdDisplay ──────────────────────────────────────────────────────────────── + +class LcdDisplay: + """HD44780 1602 LCD over PCF8574 I2C expander via RemoteGPIOService. + + PCF8574 pin mapping: + P7-P4 → D7-D4 (high nibble) + P3 → Backlight + P2 → Enable + P1 → R/W (always 0 = write) + P0 → RS (0=command, 1=data) + + Call configure() each time the service connects; it initialises the + controller and uploads all custom characters to CGRAM. + """ + + # PCF8574 control bits + _BL = 0x08 + _EN = 0x04 + _RS = 0x01 + + # Typical PCF8574 addresses: 0x27 (A0-A2 high) or 0x3F (A0-A2 low) + DEFAULT_ADDR = 0x27 + DEFAULT_I2C_BUS = 0 # matches "id": 0 in gpio.json + + # Custom CGRAM characters (slots 0-7) + CUSTOM_CHARS = [ + [0x0E, 0x0E, 0x0E, 0x0E, 0x0E, 0x00, 0x0E, 0x0E], # 0: emergency + [0x0E, 0x11, 0x11, 0x11, 0x11, 0x11, 0x1F, 0x00], # 1: battery empty + [0x0E, 0x11, 0x11, 0x11, 0x1F, 0x1F, 0x1F, 0x00], # 2: battery 50% + [0x0E, 0x1F, 0x1F, 0x1F, 0x1F, 0x1F, 0x1F, 0x00], # 3: battery full + [0x0E, 0x1B, 0x17, 0x11, 0x1D, 0x1B, 0x1F, 0x00], # 4: battery charging + [0x00, 0x0E, 0x19, 0x15, 0x13, 0x0E, 0x00, 0x00], # 5: gps no rtk + [0x00, 0x0E, 0x11, 0x11, 0x11, 0x0E, 0x00, 0x00], # 6: gps rtk float + [0x00, 0x0E, 0x1F, 0x1B, 0x1F, 0x0E, 0x00, 0x00], # 7: gps rtk fixed + ] + + # Icon strip: emergency, gps no-rtk, gps float, gps fixed, + # bat empty, bat 50%, bat full, bat charging + ICON_STRIP = [0, 5, 6, 7, 1, 2, 3, 4] + + def __init__(self, svc: ServiceInterface, + i2c_bus: int = DEFAULT_I2C_BUS, + addr: int = DEFAULT_ADDR): + self._svc = svc + self._bus = i2c_bus + self._addr = addr + + # ── low-level helpers ───────────────────────────────────────────────────── + + def _tx(self, data: bytes) -> None: + self._svc.call_i2c_transmit(self._bus, self._addr, data, timeout_ms=1500) + + def _nibble(self, nibble: int, flags: int) -> bytes: + b = (nibble & 0xF0) | flags | self._BL + return bytes([b | self._EN, b & ~self._EN]) + + def _encode(self, value: int, rs: int) -> bytes: + return (self._nibble(value & 0xF0, rs) + + self._nibble((value << 4) & 0xF0, rs)) + + def _cmd(self, cmd: int) -> None: + self._tx(self._encode(cmd, 0)) + + def _data(self, value: int) -> None: + self._tx(self._encode(value, self._RS)) + + # ── public API ──────────────────────────────────────────────────────────── + + def configure(self) -> None: + """Initialise controller and upload custom characters. Call on connect.""" + self._cmd(0x33) + self._cmd(0x32) + self._cmd(0x06) + self._cmd(0x0C) + self._cmd(0x28) + self._cmd(0x01) + time.sleep(0.0005) + for slot, charmap in enumerate(self.CUSTOM_CHARS): + self._cmd(0x40 | (slot << 3)) + for byte in charmap: + self._data(byte) + + def write(self, text: str, line: int = 0) -> None: + """Write up to 16 chars of text to line 0 or 1.""" + self._cmd(0x80 if line == 0 else 0xC0) + for ch in text[:16]: + self._data(ord(ch)) + + def write_icons(self, slots: list[int], line: int = 1) -> None: + """Write CGRAM slot numbers as characters (custom icons).""" + self._cmd(0x80 if line == 0 else 0xC0) + for slot in slots[:16]: + self._data(slot) # ── Main ────────────────────────────────────────────────────────────────────── def main(): - xbot = XbotServiceIo(bind_ip=BIND_IP) + parser = argparse.ArgumentParser(description='LCD counter via RemoteGPIOService') + parser.add_argument('--bind', '-b', default=BIND_IP, metavar='IP', + help='local interface IP to bind (default: 0.0.0.0)') + args = parser.parse_args() + + xbot = XbotServiceIo(bind_ip=args.bind) svc = ServiceInterface(service_id=10) xbot.register(svc) gpio_blob = heatshrink2.compress( GPIO_JSON.read_bytes(), window_sz2=9, lookahead_sz2=5) - @svc.on_connected - def connected(): + svc.registers['gpio_configs'] = gpio_blob + svc.registers['periodic_update_interval'] = 1000 + + lcd = LcdDisplay(svc) + counter = 0 + + @svc.on_gpio_event_changed + def gpio_event(value, _ts): + nonlocal counter + gpio_id = value[0] + # value[1] = level (1 = pressed for active-high), value[2] = flags + if value[1] == 0: + return + if gpio_id == GPIO_DOWN: + counter -= 1 + elif gpio_id == GPIO_UP: + counter += 1 + try: + lcd.write(f"Count: {counter:<9}", line=0) + except Exception as e: + print(f"LCD update error: {e}") + + @svc.on_configured + def configured(): print("RemoteGPIOService connected — initialising LCD…") - svc.registers['gpio_configs'] = gpio_blob - svc.registers['periodic_update_interval'] = 1000 try: - lcd_init(svc) - define_custom_characters(svc) - print("Done.") + lcd.configure() + lcd.write(f"Count: {counter:<9}", line=0) + lcd.write_icons(LcdDisplay.ICON_STRIP, line=1) + print("LCD ready.") except Exception as e: - print(f"LCD error: {e}") + print(f"LCD init error: {e}") + return + try: + svc.call_subscribe_gpio(GPIO_DOWN, 0) + svc.call_subscribe_gpio(GPIO_UP, 0) + print(f"Subscribed to GPIO{GPIO_DOWN} (down) and GPIO{GPIO_UP} (up).") + except Exception as e: + print(f"GPIO subscribe error: {e}") @svc.on_disconnected def disconnected(): print("RemoteGPIOService disconnected.") - - xbot.start() - print("Waiting for RemoteGPIOService (id=10)…") + local_ip, local_port = xbot._io.get_endpoint() + print(f"Listening on {local_ip}:{local_port} — waiting for RemoteGPIOService (id=10)…") try: - counter = 0 while xbot.ok(): - if svc.connected: - try: - lcd_write(svc, f"Counter: {counter}", line=0) - except Exception as e: - print(f"Update error: {e}") - counter += 1 - - time.sleep(1.0) + time.sleep(0.1) except KeyboardInterrupt: pass finally: diff --git a/xbot_service_interface_py/tests/test_interface.py b/xbot_service_interface_py/tests/test_interface.py index fa498e3..e4a1db8 100644 --- a/xbot_service_interface_py/tests/test_interface.py +++ b/xbot_service_interface_py/tests/test_interface.py @@ -209,6 +209,7 @@ def test_multiple_connected_callbacks(self): si.on_connected(cb2) si.on_connected(cb3) si._on_claim_ack() + si._join_callbacks() cb1.assert_called_once() cb2.assert_called_once() cb3.assert_called_once() @@ -229,6 +230,7 @@ def test_on_disconnected_fires_callbacks(self): cb = MagicMock() si.on_disconnected(cb) si._on_disconnected() + si._join_callbacks() cb.assert_called_once() def test_connected_callback_exception_does_not_propagate(self): @@ -238,6 +240,39 @@ def bad_cb(): raise RuntimeError("boom") si._on_claim_ack() # should not raise assert si._connected + def test_on_configured_decorator(self): + si = ServiceInterface(service_id=1) + cb = MagicMock() + ret = si.on_configured(cb) + assert ret is cb + assert cb in si._configured_callbacks + + def test_on_configured_fires_on_config_request(self): + si = make_si(connected=True) + cb = MagicMock() + si.on_configured(cb) + si._on_config_request() + si._join_callbacks() + cb.assert_called_once() + + def test_on_configured_fires_even_with_no_registers(self): + si = make_si(connected=True) + cb = MagicMock() + si.on_configured(cb) + si._on_config_request() # no registers set → empty send, but callback still fires + si._join_callbacks() + cb.assert_called_once() + + def test_on_claim_ack_does_not_auto_send_config(self): + schema_obj = ServiceSchema.from_dict(ECHO_DESC) + si = ServiceInterface(service_id=1) + object.__setattr__(si, '_active_schema', schema_obj) + mock_io = MagicMock() + object.__setattr__(si, '_io', mock_io) + si._register_values['Prefix'] = 'hello' + si._on_claim_ack() + mock_io.send_transaction.assert_not_called() + # --------------------------------------------------------------------------- # _on_service_discovered — Mode 1 and Mode 2 @@ -300,6 +335,7 @@ def test_dispatches_string_output(self): si.on_echo_changed = cb raw = pack_value('char[100]', 'hello world') si._on_data(timestamp=999, target_id=0, payload=raw) + si._join_callbacks() cb.assert_called_once_with('hello world', 999) def test_dispatches_uint32_output(self): @@ -308,6 +344,7 @@ def test_dispatches_uint32_output(self): si.on_message_count_changed = cb raw = pack_value('uint32_t', 42) si._on_data(timestamp=0, target_id=1, payload=raw) + si._join_callbacks() cb.assert_called_once_with(42, 0) def test_unknown_target_id_ignored(self): @@ -333,6 +370,7 @@ def test_timestamp_passed_to_callback(self): si.on_message_count_changed = cb raw = pack_value('uint32_t', 7) si._on_data(timestamp=12345678, target_id=1, payload=raw) + si._join_callbacks() assert cb.call_args[0][1] == 12345678 @@ -411,13 +449,20 @@ def test_contains(self): assert 'X' in si.registers assert 'Y' not in si.registers - def test_set_while_connected_sends_all_registers(self): - # Setting one register must send ALL registers in a single config - # transaction because the service resets all registers before applying. + def test_set_while_connected_does_not_auto_send(self): + # Setting a register while connected does NOT auto-send — + # caller must invoke send_config() explicitly. + si = make_si(connected=True) + si.registers['EchoCount'] = 3 + si._io.send_transaction.assert_not_called() + + def test_send_config_sends_all_registers(self): + # send_config() must send ALL registers in a single config transaction + # because the service resets all registers before applying. si = make_si(connected=True) - si._register_values['Prefix'] = 'hello' # pre-populate + si._register_values['Prefix'] = 'hello' si._register_values['EchoCount'] = 2 - si.registers['EchoCount'] = 3 # update one register + si.send_config() si._io.send_transaction.assert_called_once() _, chunks, *_ = si._io.send_transaction.call_args[0] ids = {c[0] for c in chunks} diff --git a/xbot_service_interface_py/xbot_service_interface/interface.py b/xbot_service_interface_py/xbot_service_interface/interface.py index 2a42da9..110b96e 100644 --- a/xbot_service_interface_py/xbot_service_interface/interface.py +++ b/xbot_service_interface_py/xbot_service_interface/interface.py @@ -1,3 +1,4 @@ +import queue import threading import logging from pathlib import Path @@ -23,9 +24,9 @@ class RegisterProxy: iface.registers['EchoCount'] = 2 val = iface.registers['Prefix'] - Setting a register while the service is connected immediately sends a - single-register config transaction. Otherwise the value is stored and - sent on the next CONFIGURATION_REQUEST. + Values set before connection are automatically sent as a configuration + transaction when the service connects. To send config manually (e.g. after + updating registers while connected) call iface.send_config(). """ def __init__(self, si: 'ServiceInterface'): @@ -35,12 +36,6 @@ def __setitem__(self, name: str, value) -> None: si = object.__getattribute__(self, '_si') si._register_values[name] = value - # Must send ALL registers in one config transaction — the service resets - # all registers to defaults before applying the received transaction, so - # sending only the changed register leaves required registers invalid. - if si._connected and si._active_schema is not None and si._io is not None: - si._on_config_request() - def __getitem__(self, name: str): si = object.__getattribute__(self, '_si') if name not in si._register_values: @@ -152,8 +147,12 @@ class ServiceInterface: @iface.on_connected def handler(): ... + @iface.on_configured + def handler(): ... # fires after connect + initial config sent + Register access: - iface.registers['Prefix'] = "hello: " + iface.registers['Prefix'] = "hello: " # set before or after connect + iface.send_config() # push updated values manually Atomic send: with iface.transaction(): @@ -186,6 +185,7 @@ def __init__(self, service_id: int, object.__setattr__(self, '_output_callbacks_by_id', {}) # id → callable (pre-discovery) object.__setattr__(self, '_connected_callbacks', []) object.__setattr__(self, '_disconnected_callbacks', []) + object.__setattr__(self, '_configured_callbacks', []) object.__setattr__(self, '_register_values', {}) # name → python value @@ -206,6 +206,40 @@ def __init__(self, service_id: int, object.__setattr__(self, 'send_input', _ByIdProxy(self, 'input')) object.__setattr__(self, 'on_output', _ByIdProxy(self, 'output')) + # Dedicated callback thread — all user callbacks run here so they can + # safely make RPC calls without blocking the IO recv thread. + cb_queue = queue.SimpleQueue() + object.__setattr__(self, '_cb_queue', cb_queue) + t = threading.Thread(target=self._cb_worker, daemon=True, + name=f'xbot-cb-{service_id}') + object.__setattr__(self, '_cb_thread', t) + t.start() + + # ------------------------------------------------------------------ + # Callback worker + # ------------------------------------------------------------------ + + def _cb_worker(self) -> None: + q = object.__getattribute__(self, '_cb_queue') + while True: + fn = q.get() + if fn is None: + return + try: + fn() + except Exception: + log.exception("Error in service callback") + + def _dispatch(self, fn: Callable) -> None: + """Schedule fn to run on the callback thread.""" + self._cb_queue.put(fn) + + def _join_callbacks(self) -> None: + """Block until all currently queued callbacks have run. For tests.""" + done = threading.Event() + self._cb_queue.put(done.set) + done.wait() + # ------------------------------------------------------------------ # Lifecycle callbacks # ------------------------------------------------------------------ @@ -225,6 +259,25 @@ def on_disconnected(self, callback: Callable) -> Callable: self._disconnected_callbacks.append(callback) return callback + def on_configured(self, callback: Callable) -> Callable: + """Register configured callback. Use as decorator or direct call. + + Fires once the service is connected and the initial configuration + transaction has been sent (or immediately if no registers are set). + The service is fully usable when this fires. + """ + self._configured_callbacks.append(callback) + return callback + + def send_config(self) -> None: + """Send current register values as a configuration transaction. + + Call this to push updated register values while already connected. + Registers set before connection are sent automatically — no need + to call send_config() for the initial configuration. + """ + self._on_config_request() + # ------------------------------------------------------------------ # Transaction context manager # ------------------------------------------------------------------ @@ -396,8 +449,6 @@ def _on_service_discovered(self, ip: str, port: int, def _on_claim_ack(self) -> None: object.__setattr__(self, '_connected', True) log.info(f"ServiceInterface {self._service_id} connected") - # Fire callbacks in a separate thread so callers can make RPC calls - # without deadlocking the IO receive thread. callbacks = list(self._connected_callbacks) def _fire(): for cb in callbacks: @@ -405,8 +456,7 @@ def _fire(): cb() except Exception: log.exception("Error in on_connected callback") - threading.Thread(target=_fire, daemon=True, - name=f'xbot-connected-{self._service_id}').start() + self._dispatch(_fire) def _on_data(self, timestamp: int, target_id: int, payload: bytes) -> None: schema = self._active_schema @@ -422,9 +472,16 @@ def _on_data(self, timestamp: int, target_id: int, payload: bytes) -> None: return try: value = unpack_value(ch['type_str'], payload, schema.enums_dict) - cb(value, timestamp) except Exception: - log.exception(f"Error dispatching data for channel {ch['name']!r}") + log.exception(f"Error unpacking data for channel {ch['name']!r}") + return + ch_name = ch['name'] + def _fire(): + try: + cb(value, timestamp) + except Exception: + log.exception(f"Error in callback for channel {ch_name!r}") + self._dispatch(_fire) def _on_transaction_start(self, timestamp: int) -> None: pass # Reserved for future use @@ -473,6 +530,16 @@ def _on_config_request(self) -> None: elif not chunks: log.debug(f"No registers to send for service {self._service_id}") + configured_cbs = list(self._configured_callbacks) + if configured_cbs: + def _fire(): + for cb in configured_cbs: + try: + cb() + except Exception: + log.exception("Error in on_configured callback") + self._dispatch(_fire) + def _on_rpc_response(self, call_id: int, status: int, payload: bytes) -> None: with self._rpc_condition: if not self._rpc_call_active or call_id != self._pending_call_id: @@ -498,5 +565,4 @@ def _fire(): cb() except Exception: log.exception("Error in on_disconnected callback") - threading.Thread(target=_fire, daemon=True, - name=f'xbot-disconnected-{self._service_id}').start() + self._dispatch(_fire) diff --git a/xbot_service_interface_py/xbot_service_interface/manager.py b/xbot_service_interface_py/xbot_service_interface/manager.py index 026fcca..1d30d8d 100644 --- a/xbot_service_interface_py/xbot_service_interface/manager.py +++ b/xbot_service_interface_py/xbot_service_interface/manager.py @@ -1,3 +1,81 @@ +""" +Threading model +=============== + +Four daemon threads are created per XbotServiceIo + ServiceInterface pair. +All four start when XbotServiceIo.start() is called (xbot-cb-* starts at +ServiceInterface construction time). + +┌─────────────────────────────────────────────────────────────────────────────┐ +│ Thread │ Created in │ Role │ +├─────────────────────────────────────────────────────────────────────────────┤ +│ xbot-discovery │ ServiceDiscovery│ Listens on multicast for │ +│ │ │ SERVICE_ADVERTISEMENT packets. │ +│ │ │ On receipt: calls XbotServiceIo. │ +│ │ │ _on_service_found() *synchronously* │ +│ │ │ on this thread, which validates the │ +│ │ │ schema and registers the service with │ +│ │ │ ServiceIO. No user callbacks fired. │ +├─────────────────────────────────────────────────────────────────────────────┤ +│ xbot-io-recv │ ServiceIO │ Receives unicast UDP from services. │ +│ │ │ For each packet calls the matching │ +│ │ │ interface internal callback directly: │ +│ │ │ │ +│ │ │ CLAIM ack → _on_claim_ack │ +│ │ │ DATA → _on_data │ +│ │ │ TRANSACTION → _on_transaction_start, │ +│ │ │ _on_data (per chunk), │ +│ │ │ _on_transaction_end │ +│ │ │ CONFIG_REQ → _on_config_request │ +│ │ │ (sends config *here*, │ +│ │ │ then enqueues cb) │ +│ │ │ RPC_RESPONSE→ _on_rpc_response │ +│ │ │ (unblocks rpc cond.) │ +│ │ │ │ +│ │ │ Must NEVER block. Internal callbacks │ +│ │ │ enqueue work onto xbot-cb-* and │ +│ │ │ return immediately. │ +├─────────────────────────────────────────────────────────────────────────────┤ +│ xbot-io-watchdog │ ServiceIO │ 1 s tick. Retries CLAIM until ack. │ +│ │ │ On heartbeat timeout: marks service │ +│ │ │ unclaimed, calls _on_disconnected │ +│ │ │ which enqueues user callbacks. │ +├─────────────────────────────────────────────────────────────────────────────┤ +│ xbot-cb-{svc_id} │ ServiceInterface│ Single callback thread per │ +│ │ __init__ │ ServiceInterface. Drains a │ +│ │ │ SimpleQueue of callables. │ +│ │ │ │ +│ │ │ ALL user-visible callbacks run here: │ +│ │ │ on_connected, on_configured, │ +│ │ │ on_disconnected, │ +│ │ │ on_{output}_changed │ +│ │ │ │ +│ │ │ Callbacks are serialized (one at a │ +│ │ │ time, in arrival order). Blocking │ +│ │ │ calls including RPC (call_*) are safe │ +│ │ │ here — the recv thread stays free. │ +└─────────────────────────────────────────────────────────────────────────────┘ + +RPC call flow +───────────── +User code (any thread) calls svc.call_foo(args): + 1. Serializes params, sends RPC_CALL packet (xbot-cb-* or main thread). + 2. Blocks on rpc_condition (Condition.wait_for). + 3. xbot-io-recv receives RPC_RESPONSE → _on_rpc_response() notifies the + condition immediately (no enqueue needed — it's just a notify). + 4. Caller unblocks, reads response, returns. + +If called from xbot-io-recv (e.g. in a raw _on_data override) this would +deadlock. All public user callbacks run on xbot-cb-* so this cannot happen +via the normal API. + +Locks +───── +_lock (ServiceInterface) — guards _transaction_active / _transaction_chunks +_rpc_lock (ServiceInterface) — serialises concurrent RPC callers +_rpc_condition (ServiceInterface) — recv→caller signalling for RPC responses +ServiceIO._lock — guards _services dict and send path +""" import logging from .discovery import ServiceDiscovery From cd2dd9c61e8dca5f9b65ebe50c1c4764f9072277 Mon Sep 17 00:00:00 2001 From: Clemens Elflein Date: Tue, 26 May 2026 18:48:09 +0200 Subject: [PATCH 2/2] =?UTF-8?q?fix:=20review=20findings=20=E2=80=94=20clos?= =?UTF-8?q?e(),=20on=5Fconfigured=20gating,=20clear=20delay,=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add close() to stop the callback thread cleanly; guard _dispatch against calls after shutdown - Gate on_configured callbacks on successful config send: suppress when serialization errors occur or io is unavailable; fire when no registers need sending (treated as success) - Increase HD44780 clear-display post-command delay 0.5 ms → 2 ms to satisfy the ≥1.52 ms datasheet requirement before CGRAM writes - Replace set-based chunk ID assertion with len + sorted list to catch duplicate register IDs - Add test_on_configured_fires_after_config_sent: asserts send_transaction completes before the configured callback runs - Add test_on_configured_suppressed_on_serialize_error - Fix manager.py thread startup comment to unambiguously distinguish XbotServiceIo.start() threads from the ServiceInterface constructor thread --- .../examples/lcd_hello.py | 2 +- .../tests/test_interface.py | 24 +++++++++- .../xbot_service_interface/interface.py | 48 ++++++++++++------- .../xbot_service_interface/manager.py | 7 +-- 4 files changed, 58 insertions(+), 23 deletions(-) diff --git a/xbot_service_interface_py/examples/lcd_hello.py b/xbot_service_interface_py/examples/lcd_hello.py index 34f0aef..04a9261 100644 --- a/xbot_service_interface_py/examples/lcd_hello.py +++ b/xbot_service_interface_py/examples/lcd_hello.py @@ -100,7 +100,7 @@ def configure(self) -> None: self._cmd(0x0C) self._cmd(0x28) self._cmd(0x01) - time.sleep(0.0005) + time.sleep(0.002) # HD44780 clear-display requires ≥1.52 ms for slot, charmap in enumerate(self.CUSTOM_CHARS): self._cmd(0x40 | (slot << 3)) for byte in charmap: diff --git a/xbot_service_interface_py/tests/test_interface.py b/xbot_service_interface_py/tests/test_interface.py index e4a1db8..138ca3c 100644 --- a/xbot_service_interface_py/tests/test_interface.py +++ b/xbot_service_interface_py/tests/test_interface.py @@ -263,6 +263,26 @@ def test_on_configured_fires_even_with_no_registers(self): si._join_callbacks() cb.assert_called_once() + def test_on_configured_fires_after_config_sent(self): + si = make_si(connected=True) + si._register_values['Prefix'] = 'hi' + si._register_values['EchoCount'] = 1 + call_order = [] + si._io.send_transaction.side_effect = lambda *a, **kw: call_order.append('send') + si.on_configured(lambda: call_order.append('cb')) + si._on_config_request() + si._join_callbacks() + assert call_order == ['send', 'cb'] + + def test_on_configured_suppressed_on_serialize_error(self): + si = make_si(connected=True) + si._register_values['EchoCount'] = 'not-an-int' # will fail pack_value + cb = MagicMock() + si.on_configured(cb) + si._on_config_request() + si._join_callbacks() + cb.assert_not_called() + def test_on_claim_ack_does_not_auto_send_config(self): schema_obj = ServiceSchema.from_dict(ECHO_DESC) si = ServiceInterface(service_id=1) @@ -465,8 +485,8 @@ def test_send_config_sends_all_registers(self): si.send_config() si._io.send_transaction.assert_called_once() _, chunks, *_ = si._io.send_transaction.call_args[0] - ids = {c[0] for c in chunks} - assert ids == {0, 1} # both Prefix (id=0) and EchoCount (id=1) sent + assert len(chunks) == 2 + assert sorted(c[0] for c in chunks) == [0, 1] # Prefix id=0, EchoCount id=1, no dupes def test_set_while_disconnected_does_not_send(self): si = ServiceInterface(service_id=1) diff --git a/xbot_service_interface_py/xbot_service_interface/interface.py b/xbot_service_interface_py/xbot_service_interface/interface.py index 110b96e..f89f10b 100644 --- a/xbot_service_interface_py/xbot_service_interface/interface.py +++ b/xbot_service_interface_py/xbot_service_interface/interface.py @@ -230,8 +230,15 @@ def _cb_worker(self) -> None: except Exception: log.exception("Error in service callback") + def close(self) -> None: + """Stop the callback thread. Call when the ServiceInterface is no longer needed.""" + self._cb_queue.put(None) + self._cb_thread.join() + def _dispatch(self, fn: Callable) -> None: """Schedule fn to run on the callback thread.""" + if not self._cb_thread.is_alive(): + return self._cb_queue.put(fn) def _join_callbacks(self) -> None: @@ -503,6 +510,7 @@ def _on_config_request(self) -> None: chunks = [] missing_required = [] + serialize_failed = False for reg in schema.registers: value = self._register_values.get(reg['name'], self._register_values.get(reg['snake_name'], @@ -516,29 +524,35 @@ def _on_config_request(self) -> None: chunks.append((reg['id'], raw)) except Exception as e: log.error(f"Cannot serialize register {reg['name']!r}: {e}") + serialize_failed = True if missing_required: names = ', '.join(missing_required) log.debug( f"Service {self._service_id}: required registers not set: {names}") - if chunks and self._io is not None: - self._io.send_transaction(self._service_id, chunks, is_config=True) - log.info( - f"Sent configuration for service {self._service_id} " - f"({len(chunks)} registers)") - elif not chunks: - log.debug(f"No registers to send for service {self._service_id}") - - configured_cbs = list(self._configured_callbacks) - if configured_cbs: - def _fire(): - for cb in configured_cbs: - try: - cb() - except Exception: - log.exception("Error in on_configured callback") - self._dispatch(_fire) + send_ok = False + if not serialize_failed: + if chunks and self._io is not None: + self._io.send_transaction(self._service_id, chunks, is_config=True) + log.info( + f"Sent configuration for service {self._service_id} " + f"({len(chunks)} registers)") + send_ok = True + elif not chunks: + log.debug(f"No registers to send for service {self._service_id}") + send_ok = True + + if send_ok: + configured_cbs = list(self._configured_callbacks) + if configured_cbs: + def _fire(): + for cb in configured_cbs: + try: + cb() + except Exception: + log.exception("Error in on_configured callback") + self._dispatch(_fire) def _on_rpc_response(self, call_id: int, status: int, payload: bytes) -> None: with self._rpc_condition: diff --git a/xbot_service_interface_py/xbot_service_interface/manager.py b/xbot_service_interface_py/xbot_service_interface/manager.py index 1d30d8d..0400aad 100644 --- a/xbot_service_interface_py/xbot_service_interface/manager.py +++ b/xbot_service_interface_py/xbot_service_interface/manager.py @@ -2,9 +2,10 @@ Threading model =============== -Four daemon threads are created per XbotServiceIo + ServiceInterface pair. -All four start when XbotServiceIo.start() is called (xbot-cb-* starts at -ServiceInterface construction time). +Four daemon threads are created per XbotServiceIo + ServiceInterface pair: +three (xbot-discovery, xbot-io-recv, xbot-io-watchdog) start when +XbotServiceIo.start() is called; xbot-cb-{svc_id} starts at +ServiceInterface construction time. ┌─────────────────────────────────────────────────────────────────────────────┐ │ Thread │ Created in │ Role │