diff --git a/projects/app_thermal160_camera/README.md b/projects/app_thermal160_camera/README.md index ea6cc1d7..39a83801 100644 --- a/projects/app_thermal160_camera/README.md +++ b/projects/app_thermal160_camera/README.md @@ -1,34 +1,66 @@ -# MaixCAM Thermal 160 实时热成像监控 +# MaixCAM Thermal160 实时热成像监控 - 这是一个基于 MaixPy v4 框架开发的实时热成像监控程序,专门为 MaixCAM2(及后续支持型号)设计。该程序通过 UART 接收 160x120 - 分辨率的热成像原始数据,并实时渲染为具有 Ironbow(铁红) 伪彩映射的视频流。 +这是一个基于 MaixPy v4 的 MaixCAM2 Thermal160 实时热成像应用。程序通过 UART 接收 160x120 热成像帧,解析帧尾温度 telemetry,并实时渲染伪彩画面、中心点温度、最高/最低点和 NUC 状态。 ## 硬件要求 -* 设备:MaixCAM2, 可参考该APP 代码移植到其他能够使用串口外设的平台 -* 传感器:支持 PMOD 接口或 UART 输出的 160x120 像素热成像模组。 +* 设备:MaixCAM2。 +* 传感器:Thermal160 / TN160 160x120 热成像模组。 * 连接方式: - * MaixCAM2 默认使用 /dev/ttyS2。 - * 波特率:初始 2,000,000,握手后最高可跳变至 4,000,000。 + * UART:MaixCAM2 默认使用 `/dev/ttyS2`。 + * 波特率:初始 `2,000,000`,发送 `0x44` 握手后切到 `4,000,000`。 + * 复位:MaixCAM2 `A9` 配置为 `GPIOA9`,作为 Thermal160 软件复位脚。 + +## 启动行为 + +MaixCam2 电池供电时,Thermal160 可能不会自动上电/复位到可通信状态。应用启动 UART 前会自动执行一次 A9 软件复位: + +1. `pinmap.set_pin_function("A9", "GPIOA9")` +2. `GPIOA9` 默认释放为高电平 +3. 拉低约 `120ms` +4. 拉高释放并等待约 `400ms` + +当前按“低有效复位”实现。如果硬件接法是高有效,需要在代码顶部对调: + +```python +THERMAL_RESET_IDLE_LEVEL = 0 +THERMAL_RESET_ACTIVE_LEVEL = 1 +``` + +启动后前 3 秒显示 `Initializing thermal160` 和进度条;超过 3 秒仍没有合法帧,才显示 `thermal160 device not found`。 ## 配置说明 - 在代码顶层可以根据需要调整以下常量: +在代码顶层可以根据需要调整以下常量: -* CMAP = True:设为 True 显示彩色热成像,False 则显示原始灰度图(零拷贝,性能更高)。 -* SKIP_COUNT = 10:启动时跳过的初始帧数,用于稳定传感器数据。 +* `SKIP_COUNT = 10`:启动后跳过的初始帧数。 +* `STARTUP_GRACE_SEC = 3.0`:启动等待页到 `Device not found` 的延迟。 +* `THERMAL_RESET_ASSERT_SEC = 0.12`:A9 复位保持时间。 +* `THERMAL_RESET_RELEASE_DELAY_SEC = 0.40`:释放复位后的等待时间。 ## 协议简介 - 程序期望的 UART 数据格式为: +程序期望的 UART 数据格式为: + +* 帧头:`0xFF` +* 像素:`160 * 120 = 19200` 字节,8-bit 温度线性灰度 +* telemetry:30 字节,包含 `VTEMP / t_lo / t_hi / anchor / smooth / mean_diff / NTC / NUC` 等运行状态 +* 总长度:`1 + 19200 + 30 = 19231` 字节 + +注意:像素区允许出现合法的 `0xFF`。同步逻辑不能再用“payload 中出现 `0xFF` 就重同步”的旧方案,而是通过 telemetry 合理性和下一帧帧头位置校验。 + +## 历史记录应用 + +24 小时温度趋势记录已拆成独立应用包: + +```text +../maix-thermal160_camera_history/ +``` -* 帧头:0xFF -* 负载:19,200 字节 (160 * 120) 的单字节灰度数据。 -* 校验:负载中不包含 0xFF。 -总计单包大小为 19201 bytes +该包会每秒记录温度到 CSV,并支持一键保存完整历史 PNG。 ## 注意事项 -* MaixCAM2 兼容性:程序包含针对 MaixCAM2 的波特率切换指令 (0x44),使用其他串口设备时请根据实际通讯协议修改 HardwareHAL 类。 -* 资源释放:程序通过 finally 块确保在退出时安全释放 UART 资源。 -* 当 MaixCam2 仅由电池供电时,如果 TN160 在 MaixCam2 上电前已连接到 MaixCam2,则 MaixCam2 上电后需要手动按下 TN160 的RST复位按键。 \ No newline at end of file +* A9 只能作为复位控制信号,不要用 MaixCAM2 GPIO 直接给 Thermal160 供电。 +* 软件复位、电平有效性和等待时间需要结合实际硬件上板验证。 +* 程序通过 `finally` 释放 UART 资源。 diff --git a/projects/app_thermal160_camera/README_EN.md b/projects/app_thermal160_camera/README_EN.md index 3bb3a9c8..7da988ed 100644 --- a/projects/app_thermal160_camera/README_EN.md +++ b/projects/app_thermal160_camera/README_EN.md @@ -1,33 +1,66 @@ -# MaixCAM Thermal 160 Real-time Thermal Imaging Monitoring +# MaixCAM Thermal160 Real-time Thermal Imaging -This is a real-time thermal imaging monitoring application developed based on the MaixPy v4 framework, specifically designed for MaixCAM2 (and future supported models). The application receives raw thermal imaging data at 160x120 resolution via UART and renders it as a real-time video stream with Ironbow pseudo-color mapping. +This is a MaixPy v4 Thermal160 live-view app for MaixCAM2. It receives 160x120 thermal frames over UART, parses the tail telemetry, and renders pseudo-color live video with center temperature, min/max markers, and NUC status. ## Hardware Requirements -* Device: MaixCAM2, code can be ported to other platforms that support UART peripherals -* Sensor: Thermal imaging module with PMOD interface or UART output supporting 160x120 pixels. -* Connection method: - * MaixCAM2 defaults to /dev/ttyS2. - * Baud rate: Initial 2,000,000, can jump to up to 4,000,000 after handshake. +* Device: MaixCAM2. +* Sensor: Thermal160 / TN160 160x120 thermal imaging module. +* Connections: + * UART: MaixCAM2 uses `/dev/ttyS2` by default. + * Baud rate: starts at `2,000,000`, then switches to `4,000,000` after sending the `0x44` handshake. + * Reset: MaixCAM2 `A9` is configured as `GPIOA9` and used as the Thermal160 software reset pin. + +## Startup Behavior + +When MaixCAM2 is powered from battery, Thermal160 may not automatically enter a usable powered/reset state. Before opening UART, the app sends one A9 reset pulse: + +1. `pinmap.set_pin_function("A9", "GPIOA9")` +2. Release `GPIOA9` high by default +3. Pull it low for about `120ms` +4. Release it high and wait about `400ms` + +The current implementation assumes an active-low reset. If your hardware reset is active-high, swap these constants: + +```python +THERMAL_RESET_IDLE_LEVEL = 0 +THERMAL_RESET_ACTIVE_LEVEL = 1 +``` + +During startup, the UI shows `Initializing thermal160` with a progress bar for 3 seconds. It only shows `thermal160 device not found` if no valid frame is received after that grace period. ## Configuration Adjust the following constants at the top of the code as needed: -* CMAP = True: Set to True to display color thermal imaging, False to display original grayscale (zero-copy, higher performance). -* SKIP_COUNT = 10: Number of initial frames to skip on startup, used to stabilize sensor data. +* `SKIP_COUNT = 10`: Initial frames skipped after startup. +* `STARTUP_GRACE_SEC = 3.0`: Delay before showing `Device not found`. +* `THERMAL_RESET_ASSERT_SEC = 0.12`: A9 reset assert time. +* `THERMAL_RESET_RELEASE_DELAY_SEC = 0.40`: Delay after releasing reset. ## Protocol Overview The expected UART data format is: -* Header: 0xFF -* Payload: 19,200 bytes (160 * 120) of single-byte grayscale data. -* Checksum: Payload does not contain 0xFF. -* Total packet size: 19201 bytes +* Header: `0xFF` +* Pixels: `160 * 120 = 19200` bytes of 8-bit temperature-linear grayscale data +* Telemetry: 30 bytes, including `VTEMP / t_lo / t_hi / anchor / smooth / mean_diff / NTC / NUC` runtime state +* Total size: `1 + 19200 + 30 = 19231` bytes -## Notes +Note: valid pixel data may contain `0xFF`. The parser must not resync only because `0xFF` appears in the pixel payload. It validates frame alignment using telemetry plausibility and the next-frame header position. + +## History Recorder App + +The 24-hour temperature history recorder is a separate app package: -* MaixCAM2 Compatibility: The program includes baud rate switching commands (0x44) specifically for MaixCAM2. When using other UART devices, please modify the HardwareHAL class according to the actual communication protocol. -* Resource Release: The program ensures safe UART resource release on exit through the finally block. +```text +../maix-thermal160_camera_history/ +``` + +It records one temperature sample per second to CSV and supports one-tap full-history PNG export. + +## Notes +* A9 is only a reset control signal. Do not power Thermal160 directly from a MaixCAM2 GPIO. +* Reset polarity and timing still need validation on real hardware. +* UART resources are released in the `finally` block on exit. diff --git a/projects/app_thermal160_camera/app.yaml b/projects/app_thermal160_camera/app.yaml index 0b3eea0c..8623f9f9 100644 --- a/projects/app_thermal160_camera/app.yaml +++ b/projects/app_thermal160_camera/app.yaml @@ -10,3 +10,8 @@ include: - assets/thermal.json - app.yaml - main.py +files: + - assets\thermal.json + - main.py + - README_EN.md + - README.md diff --git a/projects/app_thermal160_camera/main.py b/projects/app_thermal160_camera/main.py index 09b5a7f6..5fbe277c 100644 --- a/projects/app_thermal160_camera/main.py +++ b/projects/app_thermal160_camera/main.py @@ -3,65 +3,56 @@ import time from typing import Tuple -import numpy as np import cv2 -from maix import app, image, display, touchscreen +import numpy as np +from maix import app, display, gpio, image, pinmap, touchscreen from maix.peripheral import uart from maix.sys import device_name + PMOD_W = 160 PMOD_H = 120 FRAME_PIXEL_SIZE = PMOD_W * PMOD_H -# 物理帧尾 30 字节(与 PC 端 thermocam_gui.exe protocol.py 的 FRAME_SIZE -# = 19231 一致:1 引导 FF + 19200 像素 + 30 telemetry)。本文件只解析前 -# 6 字节 (vtemp/t_lo/t_hi),但 FRAME_SIZE 必须按物理长度算,否则下一帧 -# 紧挨校验会落在 NTC 字节上把每帧都误杀。 FRAME_TAIL_SIZE = 30 -FRAME_SIZE = FRAME_PIXEL_SIZE + FRAME_TAIL_SIZE -SKIP_COUNT = 10 -CMAP = True # 渲染管线分支路由开关:True为热成像伪彩映射,False为原生灰度零拷贝 +FRAME_BODY_SIZE = FRAME_PIXEL_SIZE + FRAME_TAIL_SIZE INT16_MAX = 0x7FFF -# 配置常量 BAUDRATE_INIT = 2000000 BAUDRATE_HIGH = 4000000 +UART_BUFFER_SIZE = 32768 +UART_READ_TIMEOUT_MS = 10 + +SKIP_COUNT = 10 +STARTUP_GRACE_SEC = 3.0 FPS_WINDOW_SIZE = 30 EMA_ALPHA = 0.2 -SERIAL_TIMEOUT = 1 + +BACK_BUTTON_WIDTH_RATIO = 0.1 +DEFAULT_ICON_PATH = "/maixapp/share/icon/ret.png" + FONT_SCALE = 2.0 CENTER_TEMP_SCALE = 1.1 CORNER_TEMP_SCALE = 0.9 -# UART 配置 -# 单次 read 容量大于一帧(19220 B),让每帧只触发 1 次 serial.read 而非 ~5 -# 次,消除多次系统调用的调度开销,是这条流水线最显著的 FPS 瓶颈之一。 -UART_BUFFER_SIZE = 32768 # 读取缓冲区大小 -RETRY_DELAY = 0.1 # 重试延迟(秒) +THERMAL_RESET_PIN = "A9" +THERMAL_RESET_GPIO = "GPIOA9" +THERMAL_RESET_IDLE_LEVEL = 1 +THERMAL_RESET_ACTIVE_LEVEL = 0 +THERMAL_RESET_ASSERT_SEC = 0.12 +THERMAL_RESET_RELEASE_DELAY_SEC = 0.40 -# UI 配置 -BACK_BUTTON_WIDTH_RATIO = 0.1 # 返回按钮宽度占比 -# 资源路径 -DEFAULT_ICON_PATH = "/maixapp/share/icon/ret.png" - -# 配置日志 logging.basicConfig( level=logging.INFO, - format="%(asctime)s - %(levelname)s - %(message)s" + format="%(asctime)s - %(levelname)s - %(message)s", ) logger = logging.getLogger(__name__) +_thermal_reset_gpio = None -def pixels_to_temp(pixels: np.ndarray, t_lo: float, t_hi: float) -> np.ndarray: - if t_hi <= t_lo: - return np.full_like(pixels, t_lo, dtype=np.float32) - norm = (pixels.astype(np.float32) / 254.0).clip(0.0, 1.0) - return (t_lo + (t_hi - t_lo) * norm).astype(np.float32) - - -def draw_cross(img_disp: image.Image, x: int, y: int, color, size: int = 12) -> None: - img_disp.draw_line(x - size, y, x + size, y, color) - img_disp.draw_line(x, y - size, x, y + size, color) +def is_in_button(x: int, y: int, btn_pos: Tuple[int, int, int, int]) -> bool: + return (btn_pos[0] < x < btn_pos[0] + btn_pos[2] and + btn_pos[1] < y < btn_pos[1] + btn_pos[3]) def draw_label(img_disp: image.Image, x: int, y: int, text: str, color, scale: float = 0.8) -> None: @@ -69,18 +60,18 @@ def draw_label(img_disp: image.Image, x: int, y: int, text: str, color, scale: f img_disp.draw_string(x, y, text, color, scale=scale) -def is_in_button(x: int, y: int, btn_pos: Tuple[int, int, int, int]) -> bool: - return (btn_pos[0] < x < btn_pos[0] + btn_pos[2] and - btn_pos[1] < y < btn_pos[1] + btn_pos[3]) +def draw_cross(img_disp: image.Image, x: int, y: int, color, size: int = 12) -> None: + img_disp.draw_line(x - size, y, x + size, y, color) + img_disp.draw_line(x, y - size, x, y + size, color) def get_back_btn_img(width: int) -> Tuple[image.Image, int, int]: ret_width = int(width * BACK_BUTTON_WIDTH_RATIO) img_back = image.load(DEFAULT_ICON_PATH) w, h = (ret_width, img_back.height() * ret_width // img_back.width()) - if w % 2 != 0: + if w % 2: w += 1 - if h % 2 != 0: + if h % 2: h += 1 img_back = img_back.resize(w, h) img_back = img_back.rotate(0) @@ -88,11 +79,6 @@ def get_back_btn_img(width: int) -> Tuple[image.Image, int, int]: class HardwareHAL: - """Hardware Abstraction Layer for thermal camera device management. - - Provides platform-specific serial port configuration and device detection. - """ - _PORT_REGISTRY = { "MaixCAM2": "/dev/ttyS2", "MaixCAM": None, @@ -104,14 +90,233 @@ class HardwareHAL: def serial_port(cls) -> str: dn = device_name() if not isinstance(dn, str) or not dn.strip(): - raise TypeError(f"Invalid device identifier received: '{dn}'") + raise TypeError("Invalid device identifier received: '%s'" % dn) port = cls._PORT_REGISTRY.get(dn) if port is None: - raise RuntimeError(f"Platform mismatch: Device '{dn}' is not supported") + raise RuntimeError("Platform mismatch: Device '%s' is not supported" % dn) cls._device = dn return port +def pulse_thermal_reset() -> None: + global _thermal_reset_gpio + if HardwareHAL._device != "MaixCAM2": + return + try: + ret = pinmap.set_pin_function(THERMAL_RESET_PIN, THERMAL_RESET_GPIO) + if ret != 0: + logger.warning( + "Thermal160 reset pinmap returned %s for %s -> %s", + ret, THERMAL_RESET_PIN, THERMAL_RESET_GPIO, + ) + _thermal_reset_gpio = gpio.GPIO(THERMAL_RESET_GPIO, gpio.Mode.OUT) + _thermal_reset_gpio.value(THERMAL_RESET_IDLE_LEVEL) + time.sleep(0.02) + _thermal_reset_gpio.value(THERMAL_RESET_ACTIVE_LEVEL) + time.sleep(THERMAL_RESET_ASSERT_SEC) + _thermal_reset_gpio.value(THERMAL_RESET_IDLE_LEVEL) + time.sleep(THERMAL_RESET_RELEASE_DELAY_SEC) + logger.info("Thermal160 reset pulse sent on %s", THERMAL_RESET_PIN) + except Exception as exc: + logger.warning("Thermal160 reset pulse skipped: %s", exc) + + +def open_thermal_uart(): + devices = uart.list_devices() + if not devices: + raise RuntimeError("No available UART devices found") + + port_name = HardwareHAL.serial_port() + pulse_thermal_reset() + serial = uart.UART(port=port_name, baudrate=BAUDRATE_INIT) + if HardwareHAL._device == "MaixCAM2": + serial.write(b"\x44") + serial.close() + time.sleep(0.1) + serial = uart.UART(port=port_name, baudrate=BAUDRATE_HIGH) + return serial + + +def read_uart(serial, size: int, timeout_ms: int): + try: + return serial.read(size, timeout=timeout_ms) + except TypeError: + return serial.read(size, timeout_ms) + + +def telemetry_plausible(body: bytes) -> bool: + if len(body) != FRAME_BODY_SIZE: + return False + telemetry = body[FRAME_PIXEL_SIZE:] + raw_vtemp = struct.unpack_from(">H", telemetry, 0)[0] + if raw_vtemp & 0xC000: + return False + + t_lo_x10 = struct.unpack_from(">h", telemetry, 2)[0] + t_hi_x10 = struct.unpack_from(">h", telemetry, 4)[0] + if t_lo_x10 == INT16_MAX and t_hi_x10 == INT16_MAX: + return True + if not (-1000 <= t_lo_x10 <= 3200): + return False + if not (-1000 <= t_hi_x10 <= 3200): + return False + if t_hi_x10 < t_lo_x10: + return False + return True + + +def parse_frame_body(body: bytes): + telemetry = body[FRAME_PIXEL_SIZE:] + pixels = np.frombuffer(body[:FRAME_PIXEL_SIZE], dtype=np.uint8).reshape(PMOD_H, PMOD_W) + return { + "pixels": pixels, + "vtemp": struct.unpack_from(">H", telemetry, 0)[0] & 0x3FFF, + "t_lo_x10": struct.unpack_from(">h", telemetry, 2)[0], + "t_hi_x10": struct.unpack_from(">h", telemetry, 4)[0], + "anchor": struct.unpack_from(">i", telemetry, 6)[0], + "smooth_low": struct.unpack_from(">H", telemetry, 10)[0], + "smooth_high": struct.unpack_from(">H", telemetry, 12)[0], + "mean_diff": struct.unpack_from(">f", telemetry, 14)[0], + "ntc_ref": struct.unpack_from(">H", telemetry, 18)[0], + "ntc": struct.unpack_from(">H", telemetry, 20)[0], + "nuc_decision": telemetry[22], + "nuc_count": telemetry[23], + "nuc_dg": struct.unpack_from(">h", telemetry, 24)[0], + "nuc_dv": struct.unpack_from(">h", telemetry, 26)[0], + "nuc_dn": struct.unpack_from(">h", telemetry, 28)[0], + } + + +def drain_latest_frame(buffer: bytearray, skip: int, frame_count: int): + latest = None + while True: + idx = buffer.find(b"\xFF") + if idx < 0: + buffer.clear() + break + + end = idx + 1 + FRAME_BODY_SIZE + if len(buffer) < end: + if idx > 0: + del buffer[:idx] + break + + body = bytes(buffer[idx + 1:end]) + if not telemetry_plausible(body): + del buffer[:idx + 1] + continue + + if len(buffer) > end and buffer[end] != 0xFF: + del buffer[:idx + 1] + continue + + del buffer[:end] + frame_count += 1 + if skip <= SKIP_COUNT: + skip += 1 + continue + latest = parse_frame_body(body) + + return latest, skip, frame_count + + +def pixels_to_temp(pixels: np.ndarray, t_lo: float, t_hi: float) -> np.ndarray: + if t_hi <= t_lo: + return np.full_like(pixels, t_lo, dtype=np.float32) + norm = (pixels.astype(np.float32) / 254.0).clip(0.0, 1.0) + return (t_lo + (t_hi - t_lo) * norm).astype(np.float32) + + +def safe_colormap(name: str, fallback_id: int): + cp = getattr(cv2, name, fallback_id) + test = np.arange(256, dtype=np.uint8).reshape(1, 256) + try: + cv2.applyColorMap(test, cp) + return (name.replace("COLORMAP_", "").lower(), cp) + except Exception: + return None + + +def build_colormap_options(): + options = [] + for name, fallback in [ + ("COLORMAP_TURBO", 20), + ("COLORMAP_HOT", 0), + ("COLORMAP_COOL", 1), + ("COLORMAP_DEEPGREEN", 15), + ("COLORMAP_MAGMA", 13), + ]: + entry = safe_colormap(name, fallback) + if entry: + options.append(entry) + if not options: + raise RuntimeError("No supported OpenCV colormap") + return options + + +def get_cv2_lut(cmap_entry): + _, cp = cmap_entry + gray = np.arange(256, dtype=np.uint8).reshape(1, 256) + colored = cv2.applyColorMap(gray, cp) + return cv2.cvtColor(colored, cv2.COLOR_BGR2RGB).reshape(256, 3) + + +def init_text_button(width: int, height: int, label: str) -> image.Image: + btn = image.Image(width, height, image.Format.FMT_RGB888) + btn.draw_rect(0, 0, width, height, image.COLOR_BLACK, -1) + text_x = max(2, (width - len(label) * 6) // 2) + text_y = max(2, (height - 15) // 2) + btn.draw_string(text_x, text_y, label, image.COLOR_WHITE, scale=0.6) + return btn.rotate(0) + + +def draw_wait_page(disp, img_back, img_cmap, elapsed: float, has_error: bool) -> None: + w, h = disp.width(), disp.height() + main_img = image.Image(w, h, image.Format.FMT_RGB888) + main_img.draw_rect(0, 0, w, h, image.COLOR_BLACK, -1) + + if has_error: + title = "thermal160 device not found" + else: + title = "Initializing thermal160" + + char_w = 10 * FONT_SCALE + char_h = 20 * FONT_SCALE + x_msg = int((w - len(title) * char_w) // 2) + y_msg = int((h - char_h) // 2) - 18 + main_img.draw_string(x_msg + 1, y_msg + 1, title, image.COLOR_BLACK, scale=FONT_SCALE) + main_img.draw_string(x_msg, y_msg, title, image.COLOR_WHITE, scale=FONT_SCALE) + + if not has_error: + bar_w = max(80, int(w * 0.58)) + bar_h = max(8, int(h * 0.035)) + bar_x = (w - bar_w) // 2 + bar_y = y_msg + int(char_h) + 22 + progress = max(0.0, min(elapsed / STARTUP_GRACE_SEC, 1.0)) + fill_w = int((bar_w - 4) * progress) + main_img.draw_rect(bar_x, bar_y, bar_w, bar_h, image.COLOR_WHITE, 1) + if fill_w > 0: + main_img.draw_rect(bar_x + 2, bar_y + 2, fill_w, bar_h - 4, image.COLOR_WHITE, -1) + + main_img.draw_image(0, 0, img_back) + if img_cmap is not None: + main_img.draw_image(w - img_cmap.width(), h - img_cmap.height(), img_cmap) + disp.show(main_img) + + +def nuc_status_text(frame) -> str: + decision = frame.get("nuc_decision", 255) + count = frame.get("nuc_count", 0) + dg = frame.get("nuc_dg", 0) + if decision == 1: + return "NUC ACC#%d dg%+d" % (count, dg) + if decision == 2: + return "NUC REJ_DG dg%+d" % dg + if decision == 3: + return "NUC REJ_JMP dg%+d" % dg + return "NUC --" + + def main() -> None: disp = display.Display() disp.set_hmirror(False) @@ -120,252 +325,95 @@ def main() -> None: img_back, img_back_w, img_back_h = get_back_btn_img(disp.width()) back_rect = [0, 0, img_back_w, img_back_h] + cmap_rect = [disp.width() - img_back_w, disp.height() - img_back_h, img_back_w, img_back_h] - img_cmap: image.Image = None - cmap_rect = [ - disp.width() - int(disp.width() * BACK_BUTTON_WIDTH_RATIO), - disp.height() - 0, - 1, - 1, - ] + cmap_options = build_colormap_options() + cmap_idx = 0 + cmap_abbrev = {"hot": "HOT", "cool": "COOL", "deepgreen": "DGRN", "magma": "MAGM", "turbo": "TRBO"} - def init_cmap_btn(label: str) -> None: - nonlocal img_cmap, cmap_rect - w, h = img_back_w, img_back_h - img_cmap = image.Image(w, h, image.Format.FMT_RGB888) - img_cmap.draw_rect(0, 0, w, h, image.COLOR_BLACK, -1) - char_h = 15 - text_y = (h - char_h) // 2 - text_x = max(2, (w - len(label) * 6) // 2) - img_cmap.draw_string(text_x, text_y, label, image.COLOR_WHITE, scale=0.6) - img_cmap = img_cmap.rotate(0) - cmap_rect = [disp.width() - w, disp.height() - h, w, h] + def current_cmap_button(): + name = cmap_options[cmap_idx][0] + label = cmap_abbrev.get(name, name[:5].upper()) + return init_text_button(img_back_w, img_back_h, label) - init_cmap_btn("CMAP") + img_cmap = current_cmap_button() + lut = get_cv2_lut(cmap_options[cmap_idx]) + color_buf = np.zeros((PMOD_H, PMOD_W, 3), dtype=np.uint8) - devices = uart.list_devices() - if not devices: - logger.error( - "Error: No available UART devices found! " - "Hardware HAL execution aborted." - ) + disp_w, disp_h = disp.width(), disp.height() + disp_buffers = [ + np.zeros((disp_h, disp_w, 3), dtype=np.uint8), + np.zeros((disp_h, disp_w, 3), dtype=np.uint8), + ] + db_idx = 0 + + try: + serial = open_thermal_uart() + except Exception as exc: + logger.error("UART init failed: %s", exc) + draw_wait_page(disp, img_back, img_cmap, STARTUP_GRACE_SEC, True) + time.sleep(3.0) return - hw = HardwareHAL() - port_name = hw.serial_port() - serial = uart.UART(port=port_name, baudrate=BAUDRATE_INIT) + buffer = bytearray() + skip = 0 + frame_count = 0 + frame_timestamps = [] + fps_ema = 0.0 + capture_started = False + t_start = time.time() + last_wait_draw = 0.0 + + logger.info("System: Maix test pipeline initialized.") try: - if HardwareHAL._device == "MaixCAM2": - serial.write(b"\x44") - serial.close() - time.sleep(0.1) - serial = uart.UART(port=port_name, baudrate=BAUDRATE_HIGH) - - def _safe_colormap(name: str, fallback_id: int): - cp = getattr(cv2, name, fallback_id) - test = np.arange(256, dtype=np.uint8).reshape(1, 256) - try: - cv2.applyColorMap(test, cp) - return (name.replace("COLORMAP_", "").lower(), cp) - except Exception: - return None - - # TURBO 放首位作为默认:中段为绿/黄,均匀场景噪声映射过去是自然 - # 过渡色,从根本上消除 HOT 默认下的「静置偏红」。HOT/COOL 等仍在列 - # 表里,CMAP 按键可循环切换,用户需要 HOT 风格随时可切回。 - cmap_options = [] - for _name, _fid in [ - ("COLORMAP_TURBO", 20), - ("COLORMAP_HOT", 0), - ("COLORMAP_COOL", 1), - ("COLORMAP_DEEPGREEN", 15), - ("COLORMAP_MAGMA", 13), - ]: - entry = _safe_colormap(_name, _fid) - if entry: - cmap_options.append(entry) - - if not cmap_options: - logger.error("Error: No supported colormaps available.") - return - - cmap_idx = 0 - - def get_cv2_lut(idx): - _, cp = cmap_options[idx] - gray = np.arange(256, dtype=np.uint8).reshape(1, 256) - colored = cv2.applyColorMap(gray, cp) - return cv2.cvtColor(colored, cv2.COLOR_BGR2RGB).reshape(256, 3) - - cmap_abbrev = {"hot": "HOT", "cool": "COOL", "deepgreen": "DGRN", "magma": "MAGM", "turbo": "TRBO"} - - def redraw_cmap_btn() -> None: - name = cmap_options[cmap_idx][0] - label = cmap_abbrev.get(name, name[:5].upper()) - init_cmap_btn(label) - - lut = get_cv2_lut(cmap_idx) - color_buf = np.zeros((PMOD_H, PMOD_W, 3), dtype=np.uint8) - - disp_w, disp_h = disp.width(), disp.height() - # 双缓冲:两个物理缓冲轮替。当前帧写 buffer[i],disp.show 异步读取 - # 的同时下一帧写 buffer[i^1],不会与 DMA 读竞争。配合下面的 - # copy=False,省掉 cv2image 每帧~900KB 的内存拷贝。 - disp_buffers = [ - np.zeros((disp_h, disp_w, 3), dtype=np.uint8), - np.zeros((disp_h, disp_w, 3), dtype=np.uint8), - ] - db_idx = 0 - - buffer = bytearray() - skip = 0 - frame_count = 0 - - frame_timestamps = [] - fps_ema = 0.0 - - logger.info( - "System: Data pump and rendering pipeline successfully initialized." - ) - - capture_start = False while not app.need_exit(): x, y, pressed = ts.read() - if is_in_button(x, y, back_rect): + if pressed and is_in_button(x, y, back_rect): app.set_exit_flag(True) break if pressed and is_in_button(x, y, cmap_rect): cmap_idx = (cmap_idx + 1) % len(cmap_options) - lut = get_cv2_lut(cmap_idx) - redraw_cmap_btn() - logger.info(f"System: Switched to colormap {cmap_options[cmap_idx][0]}") + lut = get_cv2_lut(cmap_options[cmap_idx]) + img_cmap = current_cmap_button() time.sleep(0.2) - chunk = serial.read(UART_BUFFER_SIZE, SERIAL_TIMEOUT) + chunk = read_uart(serial, UART_BUFFER_SIZE, UART_READ_TIMEOUT_MS) if chunk: - chunk = bytes(chunk) - - if not chunk and not capture_start: - main_img = image.Image(disp.width(), disp.height(), image.Format.FMT_RGB888) - main_img.draw_rect( - 0, 0, disp.width(), disp.height(), image.COLOR_BLACK, -1 - ) - msg = "thermal160 device not found" - char_w = 10 * FONT_SCALE - char_h = 20 * FONT_SCALE - x_msg = int((disp.width() - len(msg) * char_w) // 2) - y_msg = int((disp.height() - char_h) // 2) - main_img.draw_string( - x_msg, y_msg, msg, image.COLOR_WHITE, scale=FONT_SCALE - ) - main_img.draw_image(0, 0, img_back) - if img_cmap is not None: - main_img.draw_image(disp_w - img_back_w, disp_h - img_back_h, img_cmap) - disp.show(main_img) - continue - if frame_count == 0: - logger.info("System: First data chunk received.") - buffer.extend(chunk) - - # ---- Drain-to-latest 同步策略 ---- - # 同 PC 端 thermocam_gui.exe 的等价行为:解析阶段把缓冲里所有合 - # 法帧全部消费掉,但只保留 *最后一帧* 用于渲染,丢弃中间帧。 - # 单线程下 disp.show + cv2 流水线一旦落后,串口侧会堆积多帧; - # 旧版"每解析一帧就渲染一帧"会把延迟越拉越大,缓冲越拉越长, - # 既导致屏幕卡顿、也提高了伪 0xFF 假同步的命中率(就是 output/ - # 里那张半幅错乱图的成因)。drain-to-latest 让每外循环最多一 - # 次重渲染,CPU 富余、缓冲不堆积、误同步概率断崖下降。 - latest_frame_data = None - latest_t_lo_x10 = INT16_MAX - latest_t_hi_x10 = INT16_MAX - while True: - idx = buffer.find(b"\xFF") - if idx == -1: - buffer.clear() - break - if len(buffer) - (idx + 1) < FRAME_SIZE: - if idx > 0: - del buffer[:idx] - break - - frame_data = bytes(buffer[idx + 1: idx + 1 + FRAME_SIZE]) - - # 帧对齐校验走 telemetry 合理性,而非"像素区内有 0xFF 就 - # 重同步"——后者会把固件 FFC 瞬态/饱和像素产生的合法 255 - # 也误判为失步。VTEMP 是 14-bit 大端(固件对每行累加前已 - # &0x3FFF),telemetry[0] 高 2 bit 恒为 0;t_lo/t_hi 是 - # °C×10 的物理量,落在合理区间且 t_hi≥t_lo。 - telemetry = frame_data[FRAME_PIXEL_SIZE:] - if telemetry[0] & 0xC0: - del buffer[:idx + 1] - continue - t_lo_x10 = struct.unpack_from(">h", telemetry, 2)[0] - t_hi_x10 = struct.unpack_from(">h", telemetry, 4)[0] - if t_lo_x10 != INT16_MAX and t_hi_x10 != INT16_MAX: - if not (-1000 <= t_lo_x10 <= 3200 and - -1000 <= t_hi_x10 <= 3200 and - t_hi_x10 >= t_lo_x10): - del buffer[:idx + 1] - continue - - # 帧间紧挨校验:协议里相邻两帧背靠背,下一帧的 0xFF 必须正 - # 好出现在本帧消费完的位置。这条 magic 能筛掉"FF + telemetry - # 凑巧合理 + 像素区中段被 UART RX FIFO 短暂溢出/固件脏数据 - # 污染"的坏帧——这种坏帧在 drain-to-latest 下会持续显示一 - # 整个 drain 周期,比单帧渲染时显眼得多(即 output/ 里上 1/3 - # 真实图像 + 下 2/3 椒盐噪声那张图的成因)。 - # 缓冲不够长时跳过此校验,留到下一轮 drain 再判。 - next_ff_pos = idx + 1 + FRAME_SIZE - if len(buffer) > next_ff_pos and buffer[next_ff_pos] != 0xFF: - del buffer[:idx + 1] - continue - - # 通过校验,推进缓冲并记账 - del buffer[:idx + 1 + FRAME_SIZE] - frame_count += 1 - - if skip <= SKIP_COUNT: - skip += 1 - continue - - # 关键:只保留缓冲里最后一帧用于渲染,旧帧直接丢弃 - latest_frame_data = frame_data - latest_t_lo_x10 = t_lo_x10 - latest_t_hi_x10 = t_hi_x10 - - if latest_frame_data is None: + buffer.extend(bytes(chunk)) + latest, skip, frame_count = drain_latest_frame(buffer, skip, frame_count) + else: + latest = None + + if latest is None: + if not capture_started: + now = time.time() + elapsed = now - t_start + if now - last_wait_draw >= 0.15: + draw_wait_page(disp, img_back, img_cmap, elapsed, elapsed >= STARTUP_GRACE_SEC) + last_wait_draw = now continue - # ===== 渲染最新一帧(每外循环最多一次 disp.show)===== - t_lo_x10 = latest_t_lo_x10 - t_hi_x10 = latest_t_hi_x10 - gray_np = np.frombuffer(latest_frame_data[:FRAME_PIXEL_SIZE], dtype=np.uint8).reshape(PMOD_H, PMOD_W) - gray_np = cv2.flip(gray_np, -1) + if not capture_started: + logger.info("System: First valid frame received.") + capture_started = True + + gray_np = cv2.flip(latest["pixels"], -1) + t_lo_x10 = latest["t_lo_x10"] + t_hi_x10 = latest["t_hi_x10"] has_temp = ( t_lo_x10 != INT16_MAX and t_hi_x10 != INT16_MAX and t_hi_x10 > t_lo_x10 ) - # 选中当前帧要写入的物理缓冲。另一个缓冲此刻可能仍被 - # 上一帧的 disp.show DMA 读取,不会被本帧写动。 disp_buffer = disp_buffers[db_idx] - if CMAP: - gray_blurred = cv2.GaussianBlur(gray_np, (3, 3), 1) - np.take(lut, gray_blurred, axis=0, out=color_buf) - cv2.resize(color_buf, (disp_w, disp_h), dst=disp_buffer, interpolation=cv2.INTER_LINEAR) - else: - gray_resized = cv2.resize(gray_np, (disp_w, disp_h), interpolation=cv2.INTER_LINEAR) - disp_buffer[:, :, 0] = gray_resized - disp_buffer[:, :, 1] = gray_resized - disp_buffer[:, :, 2] = gray_resized + gray_blurred = cv2.GaussianBlur(gray_np, (3, 3), 1) + np.take(lut, gray_blurred, axis=0, out=color_buf) + cv2.resize(color_buf, (disp_w, disp_h), dst=disp_buffer, interpolation=cv2.INTER_LINEAR) - # 双缓冲保护下安全启用 copy=False,省掉每帧 ~900KB 的 - # cv2image 内部 memcpy。竞态由缓冲轮替天然隔离。 img_disp = image.cv2image(disp_buffer, bgr=False, copy=False) - center_x = disp_w // 2 center_y = disp_h // 2 draw_cross(img_disp, center_x, center_y, image.COLOR_WHITE, 14) @@ -374,7 +422,6 @@ def redraw_cmap_btn() -> None: t_lo = t_lo_x10 / 10.0 t_hi = t_hi_x10 / 10.0 temp_img = pixels_to_temp(gray_np, t_lo, t_hi) - min_idx = np.argmin(temp_img) max_idx = np.argmax(temp_img) min_y, min_x = np.unravel_index(min_idx, temp_img.shape) @@ -390,45 +437,27 @@ def redraw_cmap_btn() -> None: img_disp, min(max(center_x + 18, 8), disp_w - 96), min(max(center_y - 18, 8), disp_h - 24), - f"{center_temp:.1f}C", + "%.1fC" % center_temp, image.COLOR_WHITE, scale=CENTER_TEMP_SCALE, ) - text_x = 8 - text_y = disp_h - 82 - draw_label( - img_disp, - text_x, - text_y, - f"MAX {temp_img[max_y, max_x]:.1f}C", - image.COLOR_RED, - scale=CORNER_TEMP_SCALE, - ) - draw_label( - img_disp, - text_x, - text_y + 26, - f"MIN {temp_img[min_y, min_x]:.1f}C", - image.COLOR_BLUE, - scale=CORNER_TEMP_SCALE, - ) - draw_label( - img_disp, - text_x, - text_y + 52, - f"RNG {t_lo:.1f}~{t_hi:.1f}C", - image.COLOR_WHITE, - scale=0.85, - ) + text_y = disp_h - 104 + draw_label(img_disp, text_x, text_y, "MAX %.1fC" % temp_img[max_y, max_x], + image.COLOR_RED, scale=CORNER_TEMP_SCALE) + draw_label(img_disp, text_x, text_y + 24, "MIN %.1fC" % temp_img[min_y, min_x], + image.COLOR_BLUE, scale=CORNER_TEMP_SCALE) + draw_label(img_disp, text_x, text_y + 48, "RNG %.1f~%.1fC" % (t_lo, t_hi), + image.COLOR_WHITE, scale=0.85) + draw_label(img_disp, text_x, text_y + 72, nuc_status_text(latest), + image.COLOR_WHITE, scale=0.65) + else: + draw_label(img_disp, 8, disp_h - 32, "UNCALIBRATED", image.COLOR_WHITE, scale=0.9) img_disp.draw_image(0, 0, img_back) - if img_cmap is not None: - img_disp.draw_image(disp_w - img_back_w, disp_h - img_back_h, img_cmap) - - capture_start = True + img_disp.draw_image(disp_w - img_back_w, disp_h - img_back_h, img_cmap) disp.show(img_disp) - db_idx ^= 1 # 切到另一个物理缓冲供下一帧写入 + db_idx ^= 1 current_time = time.time() frame_timestamps.append(current_time) @@ -437,22 +466,21 @@ def redraw_cmap_btn() -> None: if len(frame_timestamps) > 1: window_duration = frame_timestamps[-1] - frame_timestamps[0] if window_duration > 0: - window_fps = ((len(frame_timestamps) - 1) / window_duration) + window_fps = (len(frame_timestamps) - 1) / window_duration if fps_ema == 0.0: fps_ema = window_fps else: - fps_ema = ((EMA_ALPHA * window_fps) + ((1.0 - EMA_ALPHA) * fps_ema)) + fps_ema = EMA_ALPHA * window_fps + (1.0 - EMA_ALPHA) * fps_ema if frame_count % 10 == 0: - osd_text = f"FPS: {fps_ema:.2f}" - logger.info(osd_text) + logger.info("FPS: %.2f", fps_ema) - except Exception as e: - logger.error(f"Fatal: Unhandled pipeline exception: {str(e)}") + except Exception as exc: + logger.error("Fatal: %s", exc) raise finally: if serial is not None: serial.close() - logger.info("System: UART resource securely released via interrupt vector.") + logger.info("System: UART closed.") if __name__ == "__main__": diff --git a/projects/maix-thermal160_camera_history/README.md b/projects/maix-thermal160_camera_history/README.md new file mode 100644 index 00000000..163df0bd --- /dev/null +++ b/projects/maix-thermal160_camera_history/README.md @@ -0,0 +1,51 @@ +# Thermal160 历史记录 + +这是一个独立的 MaixCam2 应用包,用于长时间记录 Thermal160 运行温度。 + +## 功能 + +- 通过 MaixCam2 `/dev/ttyS2` 读取 Thermal160 UART 数据。 +- 按 1 秒间隔采样温度历史,适合 24 小时运行观察。 +- 持续保存 `output/history_*.csv`,避免长测中途断电丢失全部数据。 +- 屏幕显示实时预览、当前温度和最近 30 分钟趋势。 +- 点击右上角 `SAVE` 保存完整历史趋势图 PNG。 +- 退出程序时,如果样本足够,会自动再保存一次 PNG。 +- 启动 UART 前通过 MaixCAM2 `A9` / `GPIOA9` 给 Thermal160 发送一次软件复位脉冲。 + +## 软件复位 + +MaixCam2 电池供电时,Thermal160 可能不会自动上电/复位到可通信状态。程序启动时会执行: + +1. `pinmap.set_pin_function("A9", "GPIOA9")` +2. `GPIOA9` 默认释放为高电平 +3. 拉低约 `120ms` +4. 拉高释放并等待约 `400ms` + +当前按“低有效复位”实现。如果硬件接法是高有效,需要在代码顶部对调: + +```python +THERMAL_RESET_IDLE_LEVEL = 0 +THERMAL_RESET_ACTIVE_LEVEL = 1 +``` + +## 输出文件 + +- MaixCam2 安装运行时目录:`/maixapp/apps/thermal160_camera_history/output/` +- CSV:`/maixapp/apps/thermal160_camera_history/output/history_YYYYmmdd_HHMMSS.csv` +- PNG:`/maixapp/apps/thermal160_camera_history/output/trend_YYYYmmdd_HHMMSS.png` + +## 协议 + +程序按当前 Thermal160 监控帧解析: + +- 帧头:`0xFF` +- 像素:`160 * 120 = 19200` 字节 +- telemetry:30 字节 +- 总长度:`1 + 19200 + 30 = 19231` 字节 + +## 注意 + +- 这是历史记录应用,不替代实时热成像主应用。 +- 需要在 MaixCam2 上实测 A9 复位、UART、触屏和 PNG 保存。 +- 如果超过 3 秒仍没有合法帧,界面会显示 `thermal160 device not found`。 +- A9 只能作为复位控制信号,不要用 MaixCAM2 GPIO 直接给 Thermal160 供电。 diff --git a/projects/maix-thermal160_camera_history/README_EN.md b/projects/maix-thermal160_camera_history/README_EN.md new file mode 100644 index 00000000..01d77bc0 --- /dev/null +++ b/projects/maix-thermal160_camera_history/README_EN.md @@ -0,0 +1,51 @@ +# Thermal160 History Recorder + +This is a standalone MaixCam2 app package for long-running Thermal160 temperature logging. + +## Features + +- Reads Thermal160 UART frames from MaixCam2 `/dev/ttyS2`. +- Samples temperature history once per second for 24-hour observation. +- Continuously writes `output/history_*.csv` so data is not lost if the run is interrupted. +- Shows live preview, current temperature values, and a rolling 30-minute trend. +- Tap `SAVE` to save a full-history PNG trend chart. +- Saves one more PNG on exit when enough samples are available. +- Sends one Thermal160 software reset pulse through MaixCAM2 `A9` / `GPIOA9` before opening UART. + +## Software Reset + +When MaixCAM2 is powered from battery, Thermal160 may not automatically enter a usable powered/reset state. On startup, the app does: + +1. `pinmap.set_pin_function("A9", "GPIOA9")` +2. Release `GPIOA9` high by default +3. Pull it low for about `120ms` +4. Release it high and wait about `400ms` + +The current implementation assumes an active-low reset. If your hardware reset is active-high, swap these constants: + +```python +THERMAL_RESET_IDLE_LEVEL = 0 +THERMAL_RESET_ACTIVE_LEVEL = 1 +``` + +## Output + +- MaixCam2 installed app output directory: `/maixapp/apps/thermal160_camera_history/output/` +- CSV: `/maixapp/apps/thermal160_camera_history/output/history_YYYYmmdd_HHMMSS.csv` +- PNG: `/maixapp/apps/thermal160_camera_history/output/trend_YYYYmmdd_HHMMSS.png` + +## Frame Format + +The app uses the current Thermal160 monitor frame format: + +- Header: `0xFF` +- Pixels: `160 * 120 = 19200` bytes +- Telemetry: 30 bytes +- Total size: `1 + 19200 + 30 = 19231` bytes + +## Notes + +- This is a history recorder app, not a replacement for the live thermal camera app. +- A9 reset, UART, touchscreen, and PNG saving still need validation on MaixCam2 hardware. +- If no valid frame is received after 3 seconds, the UI shows `thermal160 device not found`. +- A9 is only a reset control signal. Do not power Thermal160 directly from a MaixCAM2 GPIO. diff --git a/projects/maix-thermal160_camera_history/app.yaml b/projects/maix-thermal160_camera_history/app.yaml new file mode 100644 index 00000000..36a4c1c3 --- /dev/null +++ b/projects/maix-thermal160_camera_history/app.yaml @@ -0,0 +1,18 @@ +id: thermal160_camera_history +name: Thermal160 History +name[zh]: 热成像160历史记录 +version: 1.0.0 +icon: assets/thermal.json +author: Sipeed Ltd +desc: 24-hour Thermal160 temperature history recorder +desc[zh]: Thermal160 24小时温度历史记录 +include: + - assets/thermal.json + - app.yaml + - main.py +files: + - assets\thermal.json + - app.yaml + - main.py + - README_EN.md + - README.md diff --git a/projects/maix-thermal160_camera_history/assets/thermal.json b/projects/maix-thermal160_camera_history/assets/thermal.json new file mode 100644 index 00000000..9e0597a4 --- /dev/null +++ b/projects/maix-thermal160_camera_history/assets/thermal.json @@ -0,0 +1 @@ +{"v":"4.8.0","meta":{"g":"LottieFiles AE 1.0.0","a":"Bas Milius","k":"Meteocons, Weather icons, Icon set","d":"Thermometer - Meteocons.com","tc":""},"fr":60,"ip":0,"op":360,"w":512,"h":512,"nm":"thermometer","ddd":0,"assets":[],"layers":[{"ddd":0,"ind":1,"ty":4,"nm":"thermometer","sr":1,"ks":{"o":{"a":0,"k":100,"ix":11},"r":{"a":0,"k":0,"ix":10},"p":{"a":0,"k":[256,256,0],"ix":2},"a":{"a":0,"k":[256,256,0],"ix":1},"s":{"a":0,"k":[100,100,100],"ix":6}},"ao":0,"shapes":[{"ty":"gr","it":[{"ty":"gr","it":[{"ind":0,"ty":"sh","ix":1,"ks":{"a":0,"k":{"i":[[0,0],[0,0]],"o":[[0,0],[0,0]],"v":[[4,-24],[32,-24]],"c":false},"ix":2},"nm":"Path 1","mn":"ADBE Vector Shape - Group","hd":false},{"ind":1,"ty":"sh","ix":2,"ks":{"a":0,"k":{"i":[[0,0],[0,0]],"o":[[0,0],[0,0]],"v":[[4,-88],[32,-88]],"c":false},"ix":2},"nm":"Path 2","mn":"ADBE Vector Shape - Group","hd":false},{"ind":2,"ty":"sh","ix":3,"ks":{"a":0,"k":{"i":[[0,0],[0,0]],"o":[[0,0],[0,0]],"v":[[4,-56],[32,-56]],"c":false},"ix":2},"nm":"Path 3","mn":"ADBE Vector Shape - Group","hd":false},{"ind":3,"ty":"sh","ix":4,"ks":{"a":0,"k":{"i":[[0,-19.305],[30.928,0],[0,31.389],[-14.496,10.272],[0,0],[-17.673,0],[0,-17.937],[0,0]],"o":[[0,31.389],[-30.928,0],[0,-19.305],[0,0],[0,-17.937],[17.673,0],[0,0],[14.496,10.272]],"v":[[56,79.164],[0,136],[-56,79.164],[-32,32.559],[-32,-103.522],[0,-136],[32,-103.522],[32,32.559]],"c":true},"ix":2},"nm":"Path 4","mn":"ADBE Vector Shape - Group","hd":false},{"ty":"st","c":{"a":0,"k":[0.796078443527,0.835294127464,0.882352948189,1],"ix":3},"o":{"a":0,"k":100,"ix":4},"w":{"a":0,"k":6,"ix":5},"lc":2,"lj":2,"bm":0,"nm":"Stroke 1","mn":"ADBE Vector Graphic - Stroke","hd":false},{"ty":"tr","p":{"a":0,"k":[0,0],"ix":2},"a":{"a":0,"k":[0,0],"ix":1},"s":{"a":0,"k":[100,100],"ix":3},"r":{"a":0,"k":0,"ix":6},"o":{"a":0,"k":100,"ix":7},"sk":{"a":0,"k":0,"ix":4},"sa":{"a":0,"k":0,"ix":5},"nm":"Transform"}],"nm":"Group 1","np":5,"cix":2,"bm":0,"ix":1,"mn":"ADBE Vector Group","hd":false},{"ty":"tr","p":{"a":0,"k":[256,256],"ix":2},"a":{"a":0,"k":[0,0],"ix":1},"s":{"a":0,"k":[100,100],"ix":3},"r":{"a":0,"k":0,"ix":6},"o":{"a":0,"k":100,"ix":7},"sk":{"a":0,"k":0,"ix":4},"sa":{"a":0,"k":0,"ix":5},"nm":"Transform"}],"nm":"thermometer-glass","np":1,"cix":2,"bm":0,"ix":1,"mn":"ADBE Vector Group","hd":false},{"ty":"gr","it":[{"ind":0,"ty":"sh","ix":1,"ks":{"a":1,"k":[{"i":{"x":0.667,"y":1},"o":{"x":0.333,"y":0},"t":0,"s":[{"i":[[0,0],[0,0]],"o":[[0,0],[0,0]],"v":[[255.798,213],[255.798,336]],"c":false}]},{"i":{"x":0.667,"y":1},"o":{"x":0.333,"y":0},"t":30,"s":[{"i":[[0,0],[0,0]],"o":[[0,0],[0,0]],"v":[[255.985,236],[255.985,336]],"c":false}]},{"i":{"x":0.667,"y":1},"o":{"x":0.333,"y":0},"t":60,"s":[{"i":[[0,0],[0,0]],"o":[[0,0],[0,0]],"v":[[255.798,213],[255.798,336]],"c":false}]},{"i":{"x":0.667,"y":1},"o":{"x":0.333,"y":0},"t":90,"s":[{"i":[[0,0],[0,0]],"o":[[0,0],[0,0]],"v":[[255.985,236],[255.985,336]],"c":false}]},{"i":{"x":0.667,"y":1},"o":{"x":0.333,"y":0},"t":120,"s":[{"i":[[0,0],[0,0]],"o":[[0,0],[0,0]],"v":[[255.798,213],[255.798,336]],"c":false}]},{"i":{"x":0.667,"y":1},"o":{"x":0.333,"y":0},"t":150,"s":[{"i":[[0,0],[0,0]],"o":[[0,0],[0,0]],"v":[[255.985,236],[255.985,336]],"c":false}]},{"i":{"x":0.667,"y":1},"o":{"x":0.333,"y":0},"t":180,"s":[{"i":[[0,0],[0,0]],"o":[[0,0],[0,0]],"v":[[255.798,213],[255.798,336]],"c":false}]},{"i":{"x":0.667,"y":1},"o":{"x":0.333,"y":0},"t":210,"s":[{"i":[[0,0],[0,0]],"o":[[0,0],[0,0]],"v":[[255.985,236],[255.985,336]],"c":false}]},{"i":{"x":0.667,"y":1},"o":{"x":0.333,"y":0},"t":240,"s":[{"i":[[0,0],[0,0]],"o":[[0,0],[0,0]],"v":[[255.798,213],[255.798,336]],"c":false}]},{"i":{"x":0.667,"y":1},"o":{"x":0.333,"y":0},"t":270,"s":[{"i":[[0,0],[0,0]],"o":[[0,0],[0,0]],"v":[[255.985,236],[255.985,336]],"c":false}]},{"i":{"x":0.667,"y":1},"o":{"x":0.333,"y":0},"t":300,"s":[{"i":[[0,0],[0,0]],"o":[[0,0],[0,0]],"v":[[255.798,213],[255.798,336]],"c":false}]},{"i":{"x":0.667,"y":1},"o":{"x":0.333,"y":0},"t":330,"s":[{"i":[[0,0],[0,0]],"o":[[0,0],[0,0]],"v":[[255.985,236],[255.985,336]],"c":false}]},{"t":359,"s":[{"i":[[0,0],[0,0]],"o":[[0,0],[0,0]],"v":[[255.798,213],[255.798,336]],"c":false}]}],"ix":2},"nm":"Path 1","mn":"ADBE Vector Shape - Group","hd":false},{"ty":"st","c":{"a":0,"k":[0.937254905701,0.266666680574,0.266666680574,1],"ix":3},"o":{"a":0,"k":100,"ix":4},"w":{"a":0,"k":24,"ix":5},"lc":2,"lj":1,"ml":10,"bm":0,"nm":"Stroke 1","mn":"ADBE Vector Graphic - Stroke","hd":false},{"ind":2,"ty":"sh","ix":3,"ks":{"a":0,"k":{"i":[[0,19.882],[19.882,0],[0,-19.882],[-19.882,0]],"o":[[0,-19.882],[-19.882,0],[0,19.882],[19.882,0]],"v":[[292,336],[256,300],[220,336],[256,372]],"c":true},"ix":2},"nm":"Path 2","mn":"ADBE Vector Shape - Group","hd":false},{"ty":"fl","c":{"a":0,"k":[0.937254905701,0.266666680574,0.266666680574,1],"ix":4},"o":{"a":0,"k":100,"ix":5},"r":1,"bm":0,"nm":"Fill 1","mn":"ADBE Vector Graphic - Fill","hd":false},{"ty":"tr","p":{"a":0,"k":[256,336],"ix":2},"a":{"a":0,"k":[256,336],"ix":1},"s":{"a":0,"k":[100,100],"ix":3},"r":{"a":0,"k":0,"ix":6},"o":{"a":0,"k":100,"ix":7},"sk":{"a":0,"k":0,"ix":4},"sa":{"a":0,"k":0,"ix":5},"nm":"Transform"}],"nm":"thermometer-mercury","np":4,"cix":2,"bm":0,"ix":2,"mn":"ADBE Vector Group","hd":false}],"ip":0,"op":360,"st":0,"bm":0}],"markers":[]} \ No newline at end of file diff --git a/projects/maix-thermal160_camera_history/main.py b/projects/maix-thermal160_camera_history/main.py new file mode 100644 index 00000000..8d2cdfd1 --- /dev/null +++ b/projects/maix-thermal160_camera_history/main.py @@ -0,0 +1,878 @@ +import logging +import math +import os +import struct +import time +from array import array +from typing import Tuple + +import cv2 +import numpy as np +from maix import app, display, gpio, image, pinmap, touchscreen +from maix.peripheral import uart +from maix.sys import device_name + + +PMOD_W = 160 +PMOD_H = 120 +FRAME_PIXEL_SIZE = PMOD_W * PMOD_H +FRAME_TAIL_SIZE = 30 +FRAME_BODY_SIZE = FRAME_PIXEL_SIZE + FRAME_TAIL_SIZE +INT16_MAX = 0x7FFF + +BAUDRATE_INIT = 2000000 +BAUDRATE_HIGH = 4000000 +UART_BUFFER_SIZE = 32768 +UART_READ_TIMEOUT_MS = 10 + +SKIP_COUNT = 10 +STARTUP_GRACE_SEC = 3.0 +SAMPLE_INTERVAL_SEC = 1.0 +DISPLAY_REFRESH_SEC = 0.25 +LIVE_WINDOW_SEC = 30.0 * 60.0 +RUN_TARGET_SEC = 24.0 * 3600.0 + +APP_DIR = os.path.dirname(os.path.abspath(__file__)) if "__file__" in globals() else os.getcwd() +OUTPUT_DIR = os.path.join(APP_DIR, "output") +CSV_FLUSH_EVERY = 10 + +THERMAL_RESET_PIN = "A9" +THERMAL_RESET_GPIO = "GPIOA9" +THERMAL_RESET_IDLE_LEVEL = 1 +THERMAL_RESET_ACTIVE_LEVEL = 0 +THERMAL_RESET_ASSERT_SEC = 0.12 +THERMAL_RESET_RELEASE_DELAY_SEC = 0.40 + +RGB_BLACK = (0, 0, 0) +RGB_BG = (14, 16, 20) +RGB_PANEL = (28, 31, 38) +RGB_GRID = (62, 68, 78) +RGB_WHITE = (235, 238, 242) +RGB_MUTED = (148, 156, 168) +RGB_RED = (255, 120, 120) +RGB_GREEN = (125, 210, 145) +RGB_CYAN = (95, 205, 245) +RGB_YELLOW = (245, 210, 95) +RGB_BLUE = (100, 150, 255) + + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(levelname)s - %(message)s", +) +logger = logging.getLogger(__name__) +_thermal_reset_gpio = None + + +def is_in_button(x: int, y: int, btn_pos: Tuple[int, int, int, int]) -> bool: + return (btn_pos[0] < x < btn_pos[0] + btn_pos[2] and + btn_pos[1] < y < btn_pos[1] + btn_pos[3]) + + +def ensure_dir(path: str) -> None: + try: + os.makedirs(path, exist_ok=True) + except TypeError: + if not os.path.exists(path): + os.makedirs(path) + except OSError: + pass + + +def stamp() -> str: + try: + return time.strftime("%Y%m%d_%H%M%S") + except Exception: + return str(int(time.time())) + + +def finite(v) -> bool: + try: + return v == v and abs(v) != float("inf") + except Exception: + return False + + +def fmt_float(v, digits: int = 2) -> str: + if not finite(v): + return "nan" + return ("%." + str(digits) + "f") % v + + +class HardwareHAL: + _PORT_REGISTRY = { + "MaixCAM2": "/dev/ttyS2", + "MaixCAM": None, + "MaixCAM-Pro": None, + } + _device = "" + + @classmethod + def serial_port(cls) -> str: + dn = device_name() + if not isinstance(dn, str) or not dn.strip(): + raise TypeError("Invalid device identifier received: '%s'" % dn) + port = cls._PORT_REGISTRY.get(dn) + if port is None: + raise RuntimeError("Platform mismatch: Device '%s' is not supported" % dn) + cls._device = dn + return port + + +def pulse_thermal_reset() -> None: + global _thermal_reset_gpio + if HardwareHAL._device != "MaixCAM2": + return + try: + ret = pinmap.set_pin_function(THERMAL_RESET_PIN, THERMAL_RESET_GPIO) + if ret != 0: + logger.warning( + "Thermal160 reset pinmap returned %s for %s -> %s", + ret, THERMAL_RESET_PIN, THERMAL_RESET_GPIO, + ) + _thermal_reset_gpio = gpio.GPIO(THERMAL_RESET_GPIO, gpio.Mode.OUT) + _thermal_reset_gpio.value(THERMAL_RESET_IDLE_LEVEL) + time.sleep(0.02) + _thermal_reset_gpio.value(THERMAL_RESET_ACTIVE_LEVEL) + time.sleep(THERMAL_RESET_ASSERT_SEC) + _thermal_reset_gpio.value(THERMAL_RESET_IDLE_LEVEL) + time.sleep(THERMAL_RESET_RELEASE_DELAY_SEC) + logger.info("Thermal160 reset pulse sent on %s", THERMAL_RESET_PIN) + except Exception as exc: + logger.warning("Thermal160 reset pulse skipped: %s", exc) + + +def open_thermal_uart(): + devices = uart.list_devices() + if not devices: + raise RuntimeError("No available UART devices found") + + port_name = HardwareHAL.serial_port() + pulse_thermal_reset() + serial = uart.UART(port=port_name, baudrate=BAUDRATE_INIT) + if HardwareHAL._device == "MaixCAM2": + serial.write(b"\x44") + serial.close() + time.sleep(0.1) + serial = uart.UART(port=port_name, baudrate=BAUDRATE_HIGH) + return serial + + +def read_uart(serial, size: int, timeout_ms: int): + try: + return serial.read(size, timeout=timeout_ms) + except TypeError: + return serial.read(size, timeout_ms) + + +def telemetry_plausible(body: bytes) -> bool: + if len(body) != FRAME_BODY_SIZE: + return False + telemetry = body[FRAME_PIXEL_SIZE:] + raw_vtemp = struct.unpack_from(">H", telemetry, 0)[0] + if raw_vtemp & 0xC000: + return False + + t_lo_x10 = struct.unpack_from(">h", telemetry, 2)[0] + t_hi_x10 = struct.unpack_from(">h", telemetry, 4)[0] + if t_lo_x10 == INT16_MAX and t_hi_x10 == INT16_MAX: + return True + if not (-1000 <= t_lo_x10 <= 3200): + return False + if not (-1000 <= t_hi_x10 <= 3200): + return False + if t_hi_x10 < t_lo_x10: + return False + return True + + +def parse_frame_body(body: bytes): + telemetry = body[FRAME_PIXEL_SIZE:] + pixels = np.frombuffer(body[:FRAME_PIXEL_SIZE], dtype=np.uint8).reshape(PMOD_H, PMOD_W) + return { + "pixels": pixels, + "vtemp": struct.unpack_from(">H", telemetry, 0)[0] & 0x3FFF, + "t_lo_x10": struct.unpack_from(">h", telemetry, 2)[0], + "t_hi_x10": struct.unpack_from(">h", telemetry, 4)[0], + "anchor": struct.unpack_from(">i", telemetry, 6)[0], + "smooth_low": struct.unpack_from(">H", telemetry, 10)[0], + "smooth_high": struct.unpack_from(">H", telemetry, 12)[0], + "mean_diff": struct.unpack_from(">f", telemetry, 14)[0], + "ntc_ref": struct.unpack_from(">H", telemetry, 18)[0], + "ntc": struct.unpack_from(">H", telemetry, 20)[0], + "nuc_decision": telemetry[22], + "nuc_count": telemetry[23], + "nuc_dg": struct.unpack_from(">h", telemetry, 24)[0], + "nuc_dv": struct.unpack_from(">h", telemetry, 26)[0], + "nuc_dn": struct.unpack_from(">h", telemetry, 28)[0], + } + + +def drain_latest_frame(buffer: bytearray, skip: int, frame_count: int): + latest = None + while True: + idx = buffer.find(b"\xFF") + if idx < 0: + buffer.clear() + break + + end = idx + 1 + FRAME_BODY_SIZE + if len(buffer) < end: + if idx > 0: + del buffer[:idx] + break + + body = bytes(buffer[idx + 1:end]) + if not telemetry_plausible(body): + del buffer[:idx + 1] + continue + + if len(buffer) > end and buffer[end] != 0xFF: + del buffer[:idx + 1] + continue + + del buffer[:end] + frame_count += 1 + if skip <= SKIP_COUNT: + skip += 1 + continue + latest = parse_frame_body(body) + + return latest, skip, frame_count + + +def safe_colormap(name: str, fallback_id: int): + cp = getattr(cv2, name, fallback_id) + test = np.arange(256, dtype=np.uint8).reshape(1, 256) + try: + cv2.applyColorMap(test, cp) + return cp + except Exception: + return None + + +def build_lut(): + cp = safe_colormap("COLORMAP_TURBO", 20) + if cp is None: + cp = safe_colormap("COLORMAP_HOT", 0) + if cp is None: + raise RuntimeError("No supported OpenCV colormap") + gray = np.arange(256, dtype=np.uint8).reshape(1, 256) + colored = cv2.applyColorMap(gray, cp) + return cv2.cvtColor(colored, cv2.COLOR_BGR2RGB).reshape(256, 3) + + +def fpa_to_celsius(vtemp: int) -> float: + if vtemp == 0: + return float("nan") + return 25.0 + (vtemp - 8192) / 70.0 + + +def ntc_adu_to_celsius(adc: int) -> float: + if adc <= 0 or adc >= 4095: + return float("nan") + r_ntc = 10000.0 * adc / (4095.0 - adc) + inv_t = 1.0 / 298.15 + math.log(r_ntc / 10000.0) / 3435.0 + return 1.0 / inv_t - 273.15 + + +def u8_to_temp(u8_value: int, t_lo: float, t_hi: float) -> float: + return t_lo + (t_hi - t_lo) * min(max(float(u8_value) / 254.0, 0.0), 1.0) + + +def frame_to_sample(frame, elapsed_s: float): + t_lo_x10 = frame["t_lo_x10"] + t_hi_x10 = frame["t_hi_x10"] + calibrated = (t_lo_x10 != INT16_MAX and + t_hi_x10 != INT16_MAX and + t_hi_x10 > t_lo_x10) + + t_lo = float("nan") + t_hi = float("nan") + scene_mid = float("nan") + center = float("nan") + min_temp = float("nan") + max_temp = float("nan") + + if calibrated: + t_lo = t_lo_x10 / 10.0 + t_hi = t_hi_x10 / 10.0 + scene_mid = (t_lo + t_hi) / 2.0 + pixels = frame["pixels"] + center = u8_to_temp(int(pixels[PMOD_H // 2, PMOD_W // 2]), t_lo, t_hi) + min_temp = u8_to_temp(int(pixels.min()), t_lo, t_hi) + max_temp = u8_to_temp(int(pixels.max()), t_lo, t_hi) + + return { + "t": elapsed_s, + "scene_mid": scene_mid, + "center": center, + "min": min_temp, + "max": max_temp, + "fpa": fpa_to_celsius(frame["vtemp"]), + "ntc": ntc_adu_to_celsius(frame["ntc"]), + "t_lo": t_lo, + "t_hi": t_hi, + "vtemp": frame["vtemp"], + "ntc_adu": frame["ntc"], + "anchor": frame["anchor"], + "smooth_low": frame["smooth_low"], + "smooth_high": frame["smooth_high"], + "mean_diff": frame["mean_diff"], + "nuc_decision": frame["nuc_decision"], + "nuc_count": frame["nuc_count"], + "nuc_dg": frame["nuc_dg"], + "nuc_dv": frame["nuc_dv"], + "nuc_dn": frame["nuc_dn"], + } + + +class HistoryStore: + def __init__(self): + self.t = array("f") + self.scene_mid = array("f") + self.center = array("f") + self.fpa = array("f") + self.ntc = array("f") + self.min = array("f") + self.max = array("f") + + def append(self, sample) -> None: + self.t.append(float(sample["t"])) + self.scene_mid.append(float(sample["scene_mid"])) + self.center.append(float(sample["center"])) + self.fpa.append(float(sample["fpa"])) + self.ntc.append(float(sample["ntc"])) + self.min.append(float(sample["min"])) + self.max.append(float(sample["max"])) + + def __len__(self) -> int: + return len(self.t) + + +class CsvLogger: + def __init__(self, out_dir: str): + ensure_dir(out_dir) + self.path = os.path.join(out_dir, "history_%s.csv" % stamp()) + self._file = open(self.path, "w") + self._count = 0 + self._file.write( + "elapsed_s,scene_mid_c,center_c,min_c,max_c,fpa_c,ntc_c," + "t_lo_c,t_hi_c,vtemp_adu,ntc_adu,anchor,smooth_low,smooth_high," + "mean_diff,nuc_decision,nuc_count,nuc_dg,nuc_dv,nuc_dn\n" + ) + self._file.flush() + + def write(self, sample) -> None: + row = [ + fmt_float(sample["t"], 3), + fmt_float(sample["scene_mid"], 3), + fmt_float(sample["center"], 3), + fmt_float(sample["min"], 3), + fmt_float(sample["max"], 3), + fmt_float(sample["fpa"], 3), + fmt_float(sample["ntc"], 3), + fmt_float(sample["t_lo"], 3), + fmt_float(sample["t_hi"], 3), + str(sample["vtemp"]), + str(sample["ntc_adu"]), + str(sample["anchor"]), + str(sample["smooth_low"]), + str(sample["smooth_high"]), + fmt_float(sample["mean_diff"], 3), + str(sample["nuc_decision"]), + str(sample["nuc_count"]), + str(sample["nuc_dg"]), + str(sample["nuc_dv"]), + str(sample["nuc_dn"]), + ] + self._file.write(",".join(row) + "\n") + self._count += 1 + if self._count % CSV_FLUSH_EVERY == 0: + self._file.flush() + + def close(self) -> None: + self._file.flush() + self._file.close() + + +def draw_text(canvas, text: str, x: int, y: int, color=RGB_WHITE, scale: float = 0.45, thickness: int = 1): + cv2.putText(canvas, text, (int(x), int(y)), cv2.FONT_HERSHEY_SIMPLEX, + scale, color, thickness) + + +def text_size(text: str, scale: float = 0.45, thickness: int = 1): + size, baseline = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, scale, thickness) + return size[0], size[1], baseline + + +def draw_centered_text(canvas, text: str, rect, color=RGB_WHITE, + scale: float = 0.45, thickness: int = 1): + x, y, w, h = rect + tw, th, _ = text_size(text, scale, thickness) + tx = x + max(0, (w - tw) // 2) + ty = y + max(th + 1, (h + th) // 2 - 1) + draw_text(canvas, text, tx, ty, color, scale, thickness) + + +def latest_valid(values): + for i in range(len(values) - 1, -1, -1): + v = values[i] + if finite(v): + return v + return float("nan") + + +def compact_text(text: str, max_chars: int) -> str: + if not text or max_chars <= 0: + return "" + if len(text) <= max_chars: + return text + if max_chars <= 3: + return text[:max_chars] + return text[:max_chars - 3] + "..." + + +def fit_text_width(text: str, max_width: int, scale: float = 0.45, thickness: int = 1) -> str: + if not text or max_width <= 0: + return "" + if text_size(text, scale, thickness)[0] <= max_width: + return text + suffix = "..." + for length in range(len(text) - 1, 0, -1): + candidate = text[:length] + suffix + if text_size(candidate, scale, thickness)[0] <= max_width: + return candidate + return "" + + +def compact_saved_msg(saved_msg: str, max_chars: int = 28) -> str: + if not saved_msg: + return "" + if saved_msg.startswith("saved "): + saved_msg = "saved " + os.path.basename(saved_msg[6:]) + return compact_text(saved_msg, max_chars) + + +def compact_status(status: str) -> str: + if status.startswith("recording "): + return "REC " + status[10:] + if status.startswith("initializing thermal160"): + return "INIT" + status[len("initializing thermal160"):] + if status == "thermal160 device not found": + return "DEVICE NOT FOUND" + return status + + +def draw_hud_corners(canvas, rect): + x0, y0, w, h = rect + n = max(10, min(20, w // 8, h // 4)) + for sx, sy in ((x0, y0), (x0 + w, y0), (x0, y0 + h), (x0 + w, y0 + h)): + dx = 1 if sx == x0 else -1 + dy = 1 if sy == y0 else -1 + cv2.line(canvas, (sx, sy), (sx + dx * n, sy), RGB_CYAN, 1) + cv2.line(canvas, (sx, sy), (sx, sy + dy * n), RGB_CYAN, 1) + + +def finite_range_multi(series_list, start: int, end: int): + lo = None + hi = None + for values in series_list: + for i in range(start, end): + v = values[i] + if not finite(v): + continue + if lo is None or v < lo: + lo = v + if hi is None or v > hi: + hi = v + if lo is None: + return None + if hi <= lo: + lo -= 0.5 + hi += 0.5 + else: + pad = max(0.3, (hi - lo) * 0.10) + lo -= pad + hi += pad + return lo, hi + + +def nice_ticks(lo: float, hi: float, count: int = 4): + if not finite(lo) or not finite(hi) or hi <= lo: + return [] + if count <= 1: + return [lo] + return [lo + (hi - lo) * i / (count - 1) for i in range(count)] + + +def x_label(seconds: float) -> str: + if seconds < 3600.0: + return "%.0fm" % (seconds / 60.0) + return "%.1fh" % (seconds / 3600.0) + + +def draw_axis_ticks(canvas, rect, t0: float, t1: float, left_rng, right_rng): + x0, y0, w, h = rect + for k in range(5): + x = x0 + int(w * k / 4) + cv2.line(canvas, (x, y0), (x, y0 + h), RGB_GRID, 1) + label_t = t0 + (t1 - t0) * k / 4 + draw_text(canvas, x_label(label_t), x - 14, y0 + h + 16, RGB_MUTED, 0.34) + + for tick in nice_ticks(left_rng[0], left_rng[1], 4): + y = y0 + h - 1 - int((tick - left_rng[0]) / (left_rng[1] - left_rng[0]) * (h - 1)) + cv2.line(canvas, (x0, y), (x0 + w, y), RGB_GRID, 1) + draw_text(canvas, fmt_float(tick, 1), max(0, x0 - 42), y + 4, RGB_RED, 0.34) + + if right_rng is not None: + for tick in nice_ticks(right_rng[0], right_rng[1], 4): + y = y0 + h - 1 - int((tick - right_rng[0]) / (right_rng[1] - right_rng[0]) * (h - 1)) + draw_text(canvas, fmt_float(tick, 1), x0 + w + 6, y + 4, RGB_GREEN, 0.34) + + +def draw_series(canvas, rect, times, values, start: int, end: int, + value_range, color, max_points: int): + if value_range is None: + return + x0, y0, w, h = rect + t0 = times[start] + t1 = times[end - 1] + if t1 <= t0: + t1 = t0 + 1.0 + v0, v1 = value_range + step = max(1, int((end - start) / max_points)) + + prev = None + for i in range(start, end, step): + v = values[i] + if not finite(v): + prev = None + continue + x = x0 + int((times[i] - t0) / (t1 - t0) * (w - 1)) + y = y0 + h - 1 - int((v - v0) / (v1 - v0) * (h - 1)) + pt = (x, y) + if prev is not None: + cv2.line(canvas, prev, pt, color, 1) + prev = pt + + +def draw_chart(canvas, rect, history: HistoryStore, window_sec, max_points: int): + x0, y0, w, h = rect + cv2.rectangle(canvas, (x0, y0), (x0 + w, y0 + h), RGB_PANEL, -1) + cv2.rectangle(canvas, (x0, y0), (x0 + w, y0 + h), RGB_GRID, 1) + + if len(history) < 2: + draw_text(canvas, "Waiting for samples...", x0 + 10, y0 + h // 2, RGB_MUTED, 0.55) + return + + end = len(history) + start = 0 + if window_sec is not None: + threshold = history.t[end - 1] - window_sec + while start < end - 1 and history.t[start] < threshold: + start += 1 + + if end - start < 2: + draw_text(canvas, "Waiting for samples...", x0 + 10, y0 + h // 2, RGB_MUTED, 0.55) + return + + left_rng = finite_range_multi([history.fpa, history.ntc], start, end) + right_rng = finite_range_multi([history.scene_mid, history.center], start, end) + if left_rng is None and right_rng is None: + draw_text(canvas, "Waiting for valid temperature samples...", x0 + 10, y0 + h // 2, RGB_MUTED, 0.55) + return + if left_rng is None: + left_rng = right_rng + + plot_rect = (x0 + 48, y0 + 30, w - 96, h - 62) + px, py, pw, ph = plot_rect + cv2.rectangle(canvas, (px, py), (px + pw, py + ph), RGB_BLACK, -1) + + t0 = history.t[start] + t1 = history.t[end - 1] + if t1 <= t0: + t1 = t0 + 1.0 + draw_axis_ticks(canvas, plot_rect, t0, t1, left_rng, right_rng) + + draw_series(canvas, plot_rect, history.t, history.fpa, start, end, left_rng, RGB_RED, max_points) + draw_series(canvas, plot_rect, history.t, history.ntc, start, end, left_rng, RGB_CYAN, max_points) + if right_rng is not None: + draw_series(canvas, plot_rect, history.t, history.scene_mid, start, end, right_rng, RGB_BLUE, max_points) + draw_series(canvas, plot_rect, history.t, history.center, start, end, right_rng, RGB_GREEN, max_points) + + cv2.rectangle(canvas, (px, py), (px + pw, py + ph), RGB_GRID, 1) + + draw_text(canvas, "FPA/NTC C", px, y0 + 18, RGB_RED, 0.42) + draw_text(canvas, "scene/center C", px + pw - 118, y0 + 18, RGB_GREEN, 0.42) + + legend_x = px + 8 + legend_y = py + 18 + legend = [ + ("FPA", latest_valid(history.fpa), RGB_RED), + ("NTC", latest_valid(history.ntc), RGB_CYAN), + ("scene", latest_valid(history.scene_mid), RGB_BLUE), + ("center", latest_valid(history.center), RGB_GREEN), + ] + for label, val, color in legend: + text = "%s %sC" % (label, fmt_float(val, 1)) + draw_text(canvas, text, legend_x, legend_y, color, 0.42) + legend_x += max(74, len(text) * 8) + + span = history.t[end - 1] - history.t[start] + draw_text(canvas, "window %.0f min" % (span / 60.0), px + pw - 104, py + ph - 8, RGB_MUTED, 0.36) + + +def render_preview(canvas, frame, rect, lut, color_buf): + x0, y0, w, h = rect + cv2.rectangle(canvas, (x0, y0), (x0 + w, y0 + h), RGB_PANEL, -1) + if frame is None: + draw_text(canvas, "No frame", x0 + 18, y0 + h // 2, RGB_MUTED, 0.65) + return + + gray = cv2.flip(frame["pixels"], -1) + np.take(lut, gray, axis=0, out=color_buf) + preview = cv2.resize(color_buf, (w, h), interpolation=cv2.INTER_LINEAR) + canvas[y0:y0 + h, x0:x0 + w, :] = preview + cv2.rectangle(canvas, (x0, y0), (x0 + w, y0 + h), RGB_GRID, 1) + + +def draw_status_panel(canvas, rect, sample, csv_path: str, status: str, saved_msg: str): + x0, y0, w, h = rect + cv2.rectangle(canvas, (x0, y0), (x0 + w, y0 + h), RGB_PANEL, -1) + cv2.rectangle(canvas, (x0, y0), (x0 + w, y0 + h), RGB_GRID, 1) + draw_hud_corners(canvas, rect) + + elapsed = "" + if sample is not None: + elapsed = "%.1fh/%.0fh" % (sample["t"] / 3600.0, RUN_TARGET_SEC / 3600.0) + elapsed_w = text_size(elapsed, 0.46, 1)[0] + 10 if elapsed and w >= 250 else 0 + status_text = compact_status(status) + status_text = fit_text_width(status_text, w - elapsed_w - 18, 0.55, 2) + draw_text(canvas, status_text, x0 + 8, y0 + 24, RGB_WHITE, 0.55, 2) + if elapsed_w: + draw_text(canvas, elapsed, x0 + w - elapsed_w + 2, y0 + 24, RGB_MUTED, 0.46) + cv2.line(canvas, (x0 + 8, y0 + 36), (x0 + w - 8, y0 + 36), RGB_GRID, 1) + cv2.line(canvas, (x0 + 8, y0 + 38), (x0 + max(9, w // 3), y0 + 38), RGB_BLUE, 1) + + if sample is None: + draw_text(canvas, fit_text_width("Waiting for samples", w - 18, 0.56, 1), + x0 + 10, y0 + 70, RGB_MUTED, 0.56) + return + + main = "CENTER %sC" % fmt_float(sample["center"], 1) + draw_text(canvas, fit_text_width(main, w - 18, 0.90, 2), x0 + 10, y0 + 74, + RGB_GREEN, 0.90, 2) + + gauge_y = y0 + 84 + gauge_x0 = x0 + 10 + gauge_x1 = x0 + w - 10 + cv2.line(canvas, (gauge_x0, gauge_y), (gauge_x1, gauge_y), RGB_GRID, 1) + if finite(sample["min"]) and finite(sample["max"]) and sample["max"] > sample["min"]: + marker = int((sample["center"] - sample["min"]) / + (sample["max"] - sample["min"]) * (gauge_x1 - gauge_x0)) + marker = max(0, min(gauge_x1 - gauge_x0, marker)) + mx = gauge_x0 + marker + cv2.line(canvas, (gauge_x0, gauge_y), (mx, gauge_y), RGB_GREEN, 2) + cv2.line(canvas, (mx, gauge_y - 4), (mx, gauge_y + 4), RGB_CYAN, 1) + + center_line = "SCENE %sC MIN %sC MAX %sC" % ( + fmt_float(sample["scene_mid"], 1), + fmt_float(sample["min"], 1), + fmt_float(sample["max"], 1), + ) + draw_text(canvas, fit_text_width(center_line, w - 18, 0.43, 1), + x0 + 10, y0 + 106, RGB_MUTED, 0.43) + + telemetry_line = "FPA %sC NTC %sC" % ( + fmt_float(sample["fpa"], 1), + fmt_float(sample["ntc"], 1), + ) + if h >= 122: + draw_text(canvas, fit_text_width(telemetry_line, w - 18, 0.40, 1), + x0 + 10, y0 + 124, RGB_CYAN, 0.40) + + if h >= 142: + msg = fit_text_width(compact_saved_msg(saved_msg, 48), w - 20, 0.36, 1) + if msg: + draw_text(canvas, msg, x0 + 10, y0 + h - 10, RGB_BLUE, 0.36) + + +def render_screen(width: int, height: int, history: HistoryStore, frame, sample, + csv_path: str, status: str, saved_msg: str, lut, color_buf, + back_rect, save_rect): + canvas = np.zeros((height, width, 3), dtype=np.uint8) + canvas[:, :, :] = RGB_BG + + cv2.rectangle(canvas, (back_rect[0], back_rect[1]), + (back_rect[0] + back_rect[2], back_rect[1] + back_rect[3]), RGB_PANEL, -1) + cv2.rectangle(canvas, (save_rect[0], save_rect[1]), + (save_rect[0] + save_rect[2], save_rect[1] + save_rect[3]), RGB_BLUE, -1) + draw_centered_text(canvas, "BACK", back_rect, RGB_WHITE, 0.64, 2) + draw_centered_text(canvas, "SAVE", save_rect, RGB_WHITE, 0.64, 2) + title_x = back_rect[0] + back_rect[2] + 12 + title = fit_text_width("TN160 history", save_rect[0] - title_x - 8, 0.66, 2) + draw_text(canvas, title, title_x, 25, RGB_WHITE, 0.66, 2) + + top = max(back_rect[3], save_rect[3]) + 8 + preview_h = max(112, int((height - top) * 0.40)) + preview_w = max(132, int(width * 0.32)) + if width - preview_w - 24 < 250: + preview_w = max(112, width - 274) + preview_rect = (8, top, preview_w, preview_h) + status_rect = (preview_rect[0] + preview_rect[2] + 8, top, + width - preview_rect[2] - 24, preview_h) + chart_rect = (8, top + preview_h + 8, width - 16, height - top - preview_h - 14) + + render_preview(canvas, frame, preview_rect, lut, color_buf) + draw_status_panel(canvas, status_rect, sample, csv_path, status, saved_msg) + draw_chart(canvas, chart_rect, history, LIVE_WINDOW_SEC, max(120, width * 2)) + return canvas + + +def render_history_png(history: HistoryStore, csv_path: str, path: str) -> bool: + if len(history) < 2: + return False + width, height = 1400, 780 + canvas = np.zeros((height, width, 3), dtype=np.uint8) + canvas[:, :, :] = RGB_BG + draw_text(canvas, "TN160 temperature history", 28, 42, RGB_WHITE, 0.8, 2) + draw_text(canvas, "samples %d elapsed %.2f h csv %s" % + (len(history), history.t[-1] / 3600.0, csv_path), + 28, 76, RGB_MUTED, 0.5) + draw_chart(canvas, (28, 108, width - 56, height - 150), + history, None, max(1000, width * 3)) + + ensure_dir(os.path.dirname(path) or ".") + bgr = cv2.cvtColor(canvas, cv2.COLOR_RGB2BGR) + return bool(cv2.imwrite(path, bgr)) + + +def save_png(history: HistoryStore, csv_path: str): + ensure_dir(OUTPUT_DIR) + path = os.path.join(OUTPUT_DIR, "trend_%s.png" % stamp()) + if render_history_png(history, csv_path, path): + return path + return None + + +def show_error_page(disp, message: str) -> None: + w, h = disp.width(), disp.height() + canvas = np.zeros((h, w, 3), dtype=np.uint8) + canvas[:, :, :] = RGB_BLACK + draw_text(canvas, message, max(8, w // 2 - 150), h // 2, RGB_WHITE, 0.55) + disp.show(image.cv2image(canvas, bgr=False, copy=True)) + + +def main() -> None: + disp = display.Display() + disp.set_hmirror(False) + disp.set_vflip(True) + ts = touchscreen.TouchScreen() + + width, height = disp.width(), disp.height() + button_h = max(32, height // 11) + back_rect = [0, 0, max(68, width // 8), button_h] + save_rect = [width - max(76, width // 7), 0, max(76, width // 7), button_h] + + lut = build_lut() + color_buf = np.zeros((PMOD_H, PMOD_W, 3), dtype=np.uint8) + history = HistoryStore() + serial = None + csv_log = None + + try: + serial = open_thermal_uart() + except Exception as exc: + logger.error("UART init failed: %s", exc) + show_error_page(disp, "thermal160 UART init failed") + time.sleep(3.0) + return + + csv_log = CsvLogger(OUTPUT_DIR) + + buffer = bytearray() + skip = 0 + frame_count = 0 + latest_frame = None + latest_sample = None + last_frame_time = None + last_sample_time = 0.0 + last_draw_time = 0.0 + last_touch_time = 0.0 + saved_msg = "" + t_start = time.time() + + logger.info("History CSV: %s", csv_log.path) + + try: + while not app.need_exit(): + now = time.time() + x, y, pressed = ts.read() + if pressed and now - last_touch_time > 0.35: + last_touch_time = now + if is_in_button(x, y, back_rect): + app.set_exit_flag(True) + break + if is_in_button(x, y, save_rect): + path = save_png(history, csv_log.path) + if path: + saved_msg = "saved " + path + logger.info("Saved trend PNG: %s", path) + else: + saved_msg = "not enough samples" + + chunk = read_uart(serial, UART_BUFFER_SIZE, UART_READ_TIMEOUT_MS) + if chunk: + buffer.extend(bytes(chunk)) + frame, skip, frame_count = drain_latest_frame(buffer, skip, frame_count) + if frame is not None: + latest_frame = frame + last_frame_time = now + + if latest_frame is not None and now - last_sample_time >= SAMPLE_INTERVAL_SEC: + latest_sample = frame_to_sample(latest_frame, now - t_start) + history.append(latest_sample) + csv_log.write(latest_sample) + last_sample_time = now + + if now - last_draw_time >= DISPLAY_REFRESH_SEC: + elapsed = now - t_start + if last_frame_time is None: + if elapsed < STARTUP_GRACE_SEC: + dots = "." * ((int(elapsed * 3.0) % 3) + 1) + status = "initializing thermal160" + dots + else: + status = "thermal160 device not found" + else: + age = now - last_frame_time + if age > 2.0: + status = "no new frame %.1fs" % age + else: + status = "recording %.1ffps" % (frame_count / max(elapsed, 0.001)) + + canvas = render_screen(width, height, history, latest_frame, latest_sample, + csv_log.path, status, saved_msg, lut, color_buf, + back_rect, save_rect) + disp.show(image.cv2image(canvas, bgr=False, copy=True)) + last_draw_time = now + + except Exception as exc: + logger.error("Fatal: %s", exc) + raise + finally: + if csv_log is not None: + try: + csv_log.close() + except Exception: + pass + if serial is not None: + serial.close() + if csv_log is not None and len(history) >= 2: + path = save_png(history, csv_log.path) + if path: + logger.info("Saved exit trend PNG: %s", path) + + +if __name__ == "__main__": + main()