diff --git a/tools/launcher/common/smoke/hostname.sh b/tools/launcher/common/smoke/hostname.sh new file mode 100755 index 00000000000..e48ba548c2b --- /dev/null +++ b/tools/launcher/common/smoke/hostname.sh @@ -0,0 +1,21 @@ +#!/bin/bash +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -euo pipefail + +echo "MODEL_OPT_HOSTNAME_SMOKE_START" +hostname +echo "MODEL_OPT_HOSTNAME_SMOKE_DONE" diff --git a/tools/mcp/modelopt_mcp/bridge.py b/tools/mcp/modelopt_mcp/bridge.py index 8d8965163d2..5ac251e83b1 100644 --- a/tools/mcp/modelopt_mcp/bridge.py +++ b/tools/mcp/modelopt_mcp/bridge.py @@ -34,6 +34,7 @@ from __future__ import annotations import hashlib +import json import os import re import shutil @@ -68,6 +69,25 @@ _GIT_SHA_RE = re.compile(r"^[0-9a-fA-F]{7,40}$") _SAFE_PATH_TOKEN_RE = re.compile(r"[^A-Za-z0-9_.-]+") +_SLURM_TERMINAL_STATES: frozenset[str] = frozenset( + { + "COMPLETED", + "FAILED", + "CANCELLED", + "TIMEOUT", + "OUT_OF_MEMORY", + "NODE_FAIL", + "PREEMPTED", + "BOOT_FAIL", + } +) +_SLURM_FAILURE_STATES: frozenset[str] = frozenset( + {"FAILED", "CANCELLED", "TIMEOUT", "OUT_OF_MEMORY", "NODE_FAIL", "PREEMPTED", "BOOT_FAIL"} +) +_SLURM_RUNNING_STATES: frozenset[str] = frozenset( + {"PENDING", "RUNNING", "CONFIGURING", "COMPLETING", "REQUEUED", "RESIZING", "SUSPENDED"} +) +_SLURM_STATUS_META = ".modelopt_mcp_slurm.json" @dataclass @@ -551,9 +571,7 @@ def _launcher_argv(abs_yaml: Path, checkout: SourceCheckout | None, *flags: str) return [ _uv_binary(), "run", - "--reinstall-package", - "modelopt-launcher", - "--project", + "--with-editable", str(checkout.launcher_dir), "modelopt-launcher", "--yaml", @@ -1345,6 +1363,16 @@ def submit_job_impl( **_source_result_fields(checkout), } + _write_slurm_status_metadata( + experiment_id=experiment_id, + slurm_job_id=slurm_job_id, + cluster_host=cluster_host, + cluster_user=cluster_user, + identity=identity, + control_socket=control_socket, + reconnect_command=reconnect_command, + ) + return { "ok": True, "executor": "slurm", @@ -1593,6 +1621,172 @@ def _experiment_not_found_diagnostic() -> str: ) +def _local_completion_status(done_marker: Path, any_failed: bool) -> str: + """Return status based on local nemo_run completion markers.""" + if done_marker.exists(): + return "failed" if any_failed else "done" + return "running" + + +def _write_slurm_status_metadata( + *, + experiment_id: str, + slurm_job_id: str | None, + cluster_host: str | None, + cluster_user: str | None, + identity: str | None, + control_socket: str | None, + reconnect_command: str | None, +) -> None: + """Persist Slurm submit metadata so status polling can query Slurm.""" + if not slurm_job_id or not cluster_host: + return + exp_dir = _resolve_experiment_dir(experiment_id) + if exp_dir is None: + return + payload = { + "executor": "slurm", + "experiment_id": experiment_id, + "slurm_job_id": slurm_job_id, + "cluster_host": cluster_host, + "cluster_user": cluster_user, + "identity": identity, + "control_socket": os.path.expanduser(control_socket) if control_socket else None, + "reconnect_command": reconnect_command, + } + try: + (exp_dir / _SLURM_STATUS_META).write_text( + json.dumps(payload, sort_keys=True, indent=2), + encoding="utf-8", + ) + except OSError: + # Status remains filesystem-based if the sidecar cannot be written. + return + + +def _load_slurm_status_metadata(exp_dir: Path) -> dict | None: + """Load Slurm status metadata written by submit_job_impl.""" + try: + payload = json.loads((exp_dir / _SLURM_STATUS_META).read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError): + return None + if payload.get("executor") != "slurm": + return None + if not payload.get("slurm_job_id") or not payload.get("cluster_host"): + return None + return payload + + +def _ssh_argv_for_slurm(meta: dict, remote_command: str) -> list[str]: + """Build a fixed SSH argv for querying Slurm status.""" + argv = ["ssh", "-o", "BatchMode=yes"] + if meta.get("identity"): + argv += ["-i", str(meta["identity"])] + if meta.get("control_socket"): + argv += [ + "-o", + f"ControlPath={os.path.expanduser(str(meta['control_socket']))}", + "-o", + "ControlMaster=no", + ] + user = meta.get("cluster_user") + host = meta["cluster_host"] + target = f"{user}@{host}" if user else host + return [*argv, target, remote_command] + + +def _parse_slurm_records(text: str, job_id: str) -> dict[str, str] | None: + """Return the parent Slurm job record from sacct -P output.""" + lines = [line for line in text.splitlines() if line.strip()] + if len(lines) < 2: + return None + headers = lines[0].split("|") + for line in lines[1:]: + values = line.split("|") + record = dict(zip(headers, values)) + if record.get("JobID") == job_id: + return record + return None + + +def _query_slurm_status(meta: dict) -> dict: + """Query remote Slurm for a detached job's authoritative status.""" + job_id = str(meta["slurm_job_id"]) + sacct_cmd = f"sacct -j {job_id} --format=JobID,State,ExitCode,Elapsed,NodeList -P" + try: + proc = subprocess.run( # nosec B603 - fixed ssh argv, no shell. + _ssh_argv_for_slurm(meta, sacct_cmd), + capture_output=True, + text=True, + timeout=30, + check=False, + ) + except (FileNotFoundError, subprocess.TimeoutExpired) as e: + return { + "ok": False, + "reason": "slurm_status_query_failed", + "diagnostic": f"Unable to query Slurm status via SSH: {e}.", + } + + record = _parse_slurm_records(proc.stdout, job_id) if proc.returncode == 0 else None + if record is None: + squeue_cmd = f"squeue -j {job_id} -h -o '%T|%M|%R'" + try: + squeue = subprocess.run( # nosec B603 - fixed ssh argv, no shell. + _ssh_argv_for_slurm(meta, squeue_cmd), + capture_output=True, + text=True, + timeout=30, + check=False, + ) + except (FileNotFoundError, subprocess.TimeoutExpired) as e: + return { + "ok": False, + "reason": "slurm_status_query_failed", + "diagnostic": f"Unable to query Slurm queue via SSH: {e}.", + "sacct_stderr_tail": _tail(proc.stderr), + } + if squeue.returncode == 0 and squeue.stdout.strip(): + state = squeue.stdout.strip().split("|", 1)[0].upper() + return { + "ok": True, + "status": "running", + "slurm_state": state, + "slurm_job_id": job_id, + "slurm_source": "squeue", + } + return { + "ok": False, + "reason": "slurm_status_not_found", + "diagnostic": "Slurm job was not found by sacct or squeue.", + "slurm_job_id": job_id, + "sacct_stdout_tail": _tail(proc.stdout), + "sacct_stderr_tail": _tail(proc.stderr), + "squeue_stdout_tail": _tail(squeue.stdout), + "squeue_stderr_tail": _tail(squeue.stderr), + } + + state = (record.get("State", "").split() or [""])[0].upper() + if state in _SLURM_FAILURE_STATES: + status = "failed" + elif state in _SLURM_TERMINAL_STATES: + status = "done" + elif state in _SLURM_RUNNING_STATES or state: + status = "running" + else: + status = "unknown" + return { + "ok": True, + "status": status, + "slurm_state": state, + "slurm_job_id": job_id, + "slurm_exit_code": record.get("ExitCode"), + "slurm_elapsed": record.get("Elapsed"), + "slurm_node_list": record.get("NodeList"), + "slurm_source": "sacct", + } + + def job_status_impl(experiment_id: str) -> dict: """Read filesystem-based status from a nemo_run experiment dir. @@ -1636,10 +1830,26 @@ def job_status_impl(experiment_id: str) -> dict: if first_word in _STATUS_FAILURE_WORDS: any_failed = True - if done_marker.exists(): - overall = "failed" if any_failed else "done" - else: - overall = "running" + slurm_meta = _load_slurm_status_metadata(exp_dir) + if slurm_meta is not None: + slurm_status = _query_slurm_status(slurm_meta) + if slurm_status.get("ok"): + overall = slurm_status.get("status", "unknown") + elif slurm_status.get("reason") == "slurm_status_not_found": + overall = _local_completion_status(done_marker, any_failed) + else: + overall = "running" + return { + "ok": True, + "experiment_id": experiment_id, + "experiment_dir": str(exp_dir), + "status": overall, + "task_statuses": task_statuses, + "has_done_marker": done_marker.exists(), + "slurm_status": slurm_status, + } + + overall = _local_completion_status(done_marker, any_failed) return { "ok": True, diff --git a/tools/mcp/tests/test_bridge.py b/tools/mcp/tests/test_bridge.py index bca1e92f217..69d57822119 100644 --- a/tools/mcp/tests/test_bridge.py +++ b/tools/mcp/tests/test_bridge.py @@ -17,6 +17,7 @@ from __future__ import annotations +import json import subprocess import pytest @@ -391,7 +392,7 @@ def fake_resolve(repo, ref): def test_submit_job_dry_run_uses_managed_source_checkout(monkeypatch, tmp_path): - """Managed source routes launcher execution through uv --project .""" + """Managed source routes launcher execution through an editable checkout.""" checkout_root = tmp_path / "checkout" yaml_dir = checkout_root / "tools" / "launcher" / "examples" / "fam" / "model" yaml_dir.mkdir(parents=True) @@ -440,12 +441,10 @@ def fake_run(argv, **kwargs): assert result["ok"] is True assert result["source_ref"] == "feature/ref" assert result["source_sha"] == "b" * 40 - assert captured["argv"][:7] == [ + assert captured["argv"][:5] == [ "uv", "run", - "--reinstall-package", - "modelopt-launcher", - "--project", + "--with-editable", str(checkout_root / "tools" / "launcher"), "modelopt-launcher", ] @@ -664,6 +663,9 @@ def test_submit_job_slurm_parses_nemo_job_id(monkeypatch, tmp_path): yaml_path = yaml_dir / "config.yaml" yaml_path.write_text("job_name: t\npipeline: []\n") monkeypatch.setenv("MODELOPT_LAUNCHER_EXAMPLES_DIR", str(yaml_dir)) + monkeypatch.setenv("NEMORUN_HOME", str(tmp_path)) + exp_dir = tmp_path / "experiments" / "cicd" / "cicd_1782173197" + exp_dir.mkdir(parents=True) monkeypatch.setattr( bridge, "verify_slurm_setup_impl", @@ -698,6 +700,10 @@ def fake_run(argv, **kwargs): assert result["ok"] is True assert result["slurm_job_id"] == "13049989" assert result["experiment_id"] == "cicd_1782173197" + meta = json.loads((exp_dir / bridge._SLURM_STATUS_META).read_text()) + assert meta["slurm_job_id"] == "13049989" + assert meta["cluster_host"] == "cluster.example.com" + assert meta["cluster_user"] == "user" def test_submit_job_slurm_accepts_nmm_cluster_fields(monkeypatch, tmp_path): @@ -1127,6 +1133,149 @@ def test_job_status_nested_nemo_title_dir(tmp_path, monkeypatch): assert result["status"] == "running" +def test_job_status_slurm_sidecar_overrides_local_done_marker(tmp_path, monkeypatch): + """Detached Slurm jobs are not done until Slurm says they are done.""" + exp = tmp_path / "experiments" / "cicd" / "exp_slurm_running" + exp.mkdir(parents=True) + (exp / "_DONE").touch() + (exp / bridge._SLURM_STATUS_META).write_text( + json.dumps( + { + "executor": "slurm", + "experiment_id": "exp_slurm_running", + "slurm_job_id": "12345", + "cluster_host": "cluster.example.com", + "cluster_user": "alice", + } + ) + ) + monkeypatch.setenv("NEMORUN_HOME", str(tmp_path)) + + def fake_run(argv, **kwargs): + assert argv[:2] == ["ssh", "-o"] + return subprocess.CompletedProcess( + args=argv, + returncode=0, + stdout=("JobID|State|ExitCode|Elapsed|NodeList\n12345|RUNNING|0:0|00:00:30|node001\n"), + stderr="", + ) + + monkeypatch.setattr(subprocess, "run", fake_run) + + result = bridge.job_status_impl("exp_slurm_running") + + assert result["ok"] is True + assert result["status"] == "running" + assert result["has_done_marker"] is True + assert result["slurm_status"]["slurm_state"] == "RUNNING" + + +def test_job_status_slurm_sidecar_reports_terminal_state(tmp_path, monkeypatch): + """Slurm terminal state becomes the MCP terminal status.""" + exp = tmp_path / "experiments" / "cicd" / "exp_slurm_done" + exp.mkdir(parents=True) + (exp / "_DONE").touch() + (exp / bridge._SLURM_STATUS_META).write_text( + json.dumps( + { + "executor": "slurm", + "experiment_id": "exp_slurm_done", + "slurm_job_id": "12346", + "cluster_host": "cluster.example.com", + "cluster_user": "alice", + } + ) + ) + monkeypatch.setenv("NEMORUN_HOME", str(tmp_path)) + + monkeypatch.setattr( + subprocess, + "run", + lambda argv, **kwargs: subprocess.CompletedProcess( + args=argv, + returncode=0, + stdout=( + "JobID|State|ExitCode|Elapsed|NodeList\n12346|COMPLETED|0:0|00:02:42|node001\n" + ), + stderr="", + ), + ) + + result = bridge.job_status_impl("exp_slurm_done") + + assert result["ok"] is True + assert result["status"] == "done" + assert result["slurm_status"]["slurm_exit_code"] == "0:0" + + +def test_job_status_slurm_not_found_falls_back_to_local_done_marker(tmp_path, monkeypatch): + """Aged-out Slurm records should not make completed local experiments run forever.""" + exp = tmp_path / "experiments" / "cicd" / "exp_slurm_aged_out" + exp.mkdir(parents=True) + (exp / "_DONE").touch() + (exp / bridge._SLURM_STATUS_META).write_text( + json.dumps( + { + "executor": "slurm", + "experiment_id": "exp_slurm_aged_out", + "slurm_job_id": "12348", + "cluster_host": "cluster.example.com", + "cluster_user": "alice", + } + ) + ) + monkeypatch.setenv("NEMORUN_HOME", str(tmp_path)) + monkeypatch.setattr( + bridge, + "_query_slurm_status", + lambda meta: { + "ok": False, + "reason": "slurm_status_not_found", + "slurm_job_id": meta["slurm_job_id"], + }, + ) + + result = bridge.job_status_impl("exp_slurm_aged_out") + + assert result["ok"] is True + assert result["status"] == "done" + assert result["slurm_status"]["reason"] == "slurm_status_not_found" + + +def test_job_status_slurm_not_found_falls_back_to_local_failed_marker(tmp_path, monkeypatch): + """Aged-out Slurm records still preserve local task failure status.""" + exp = tmp_path / "experiments" / "cicd" / "exp_slurm_aged_out_failed" + exp.mkdir(parents=True) + (exp / "_DONE").touch() + (exp / "status_task_0.out").write_text("failed (rc=1)\n") + (exp / bridge._SLURM_STATUS_META).write_text( + json.dumps( + { + "executor": "slurm", + "experiment_id": "exp_slurm_aged_out_failed", + "slurm_job_id": "12349", + "cluster_host": "cluster.example.com", + "cluster_user": "alice", + } + ) + ) + monkeypatch.setenv("NEMORUN_HOME", str(tmp_path)) + monkeypatch.setattr( + bridge, + "_query_slurm_status", + lambda meta: { + "ok": False, + "reason": "slurm_status_not_found", + "slurm_job_id": meta["slurm_job_id"], + }, + ) + + result = bridge.job_status_impl("exp_slurm_aged_out_failed") + + assert result["ok"] is True + assert result["status"] == "failed" + + def test_job_status_launcher_experiments_fallback(tmp_path, monkeypatch): """Resolve experiments under the installed launcher's package directory.""" launcher_dir = tmp_path / "launcher" @@ -1271,6 +1420,49 @@ def fake_status(experiment_id): assert call_count["n"] >= 2 +def test_wait_for_experiment_polls_slurm_despite_local_done_marker(tmp_path, monkeypatch): + """Detached Slurm local _DONE does not stop wait before Slurm is terminal.""" + exp = tmp_path / "experiments" / "cicd" / "exp_slurm_wait" + exp.mkdir(parents=True) + (exp / "_DONE").touch() + (exp / bridge._SLURM_STATUS_META).write_text( + json.dumps( + { + "executor": "slurm", + "experiment_id": "exp_slurm_wait", + "slurm_job_id": "12347", + "cluster_host": "cluster.example.com", + "cluster_user": "alice", + } + ) + ) + monkeypatch.setenv("NEMORUN_HOME", str(tmp_path)) + + calls = {"n": 0} + + def fake_run(argv, **kwargs): + calls["n"] += 1 + state = "RUNNING" if calls["n"] == 1 else "COMPLETED" + return subprocess.CompletedProcess( + args=argv, + returncode=0, + stdout=(f"JobID|State|ExitCode|Elapsed|NodeList\n12347|{state}|0:0|00:00:30|node001\n"), + stderr="", + ) + + monkeypatch.setattr(subprocess, "run", fake_run) + + result = bridge.wait_for_experiment_impl( + "exp_slurm_wait", + timeout_sec=10, + poll_interval_sec=0, + ) + + assert result["ok"] is True + assert result["status"] == "done" + assert calls["n"] == 2 + + def test_wait_for_experiment_timeout(tmp_path, monkeypatch): """Never reaches terminal → wait_timeout with last_status.""" exp = tmp_path / "experiments" / "exp_stuck"