diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b5b7f12..cdfbe0c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -23,7 +23,9 @@ jobs: cache: pip cache-dependency-path: pyproject.toml - run: pip install --upgrade pip - - run: pip install ruff + # Pinned so the lint result is reproducible (unpinned ruff silently + # adds rules over time). Keep in sync with the ``dev`` extra in pyproject. + - run: pip install ruff==0.15.15 - run: ruff check . test: diff --git a/CHANGELOG.md b/CHANGELOG.md index 2d9bd98..44f3149 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,34 @@ Milestones in the raven-python port. For function-level status see [docs/raven_migration.md](https://github.com/SysBioChalmers/raven-python/blob/develop/docs/reference/migration.md); for open work see [docs/todo.md](https://github.com/SysBioChalmers/raven-python/blob/develop/docs/reference/todo.md). +## Unreleased + +Post-release review pass — cobra-aligned hardening (no behaviour change on +well-formed inputs). Highlights: + +* **Packaging:** `raven_python.__version__` now derives from the installed package + metadata (`importlib.metadata`) instead of a hard-coded literal that had drifted + to `0.0.1`; the docs site reported the wrong version. Pinned `ruff==0.15.15` in + both the `dev` extra and CI so the lint result is reproducible, and fixed two + lint errors the unpinned ruff had started flagging. +* **Errors aligned to cobra:** solver/feasibility failures in `run_init`, + `run_ftinit`, `fill_tasks` and `random_sampling` now raise + `cobra.exceptions.OptimizationError` (already used elsewhere in the package) + instead of a bare `RuntimeError`. +* **Consistency:** a single `utils.parse.subsystem_to_str` coerces a reaction + `subsystem` to cobra's canonical `str` everywhere it is rendered/compared + (`io.excel`, `comparison.compare`, `curation.batch`, `manipulation.add`) — fixes + a crash on non-string subsystem items and the silent drop of multi-subsystem + reactions. GPR score-aggregation (`AGGREGATORS` / `resolve_aggregators`) is now + shared by `init.score` and `init.genes`. Maintainer-side KEGG-download progress + uses a module logger instead of `print`. +* **Robustness:** path-traversal guard on bundled-ZIP extraction (`binaries.py`, + matching the tarfile `filter="data"` precedent); `connect_blocked_reactions` + rejects a non-positive `penalty`; `random_sampling` refuses a NaN-contaminated + sample matrix; `ec_data` warns on an all-zero reaction↔enzyme coupling; optional + `verify=` SHA256 re-check on `ensure_data_file` cache hits; reporter p-value + guarded against non-finite z-scores. Regression tests added for each. + ## 0.1.0a1 — 2026-05-30 First alpha release. Covers the functional scope of RAVEN built on cobrapy: diff --git a/pyproject.toml b/pyproject.toml index adb2ab7..376a961 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -49,7 +49,7 @@ dependencies = [ dev = [ "pytest>=7", "pytest-cov", - "ruff>=0.4", + "ruff==0.15.15", ] excel = [ "openpyxl>=3.1", diff --git a/src/raven_python/__init__.py b/src/raven_python/__init__.py index 591c4c1..0c62aba 100644 --- a/src/raven_python/__init__.py +++ b/src/raven_python/__init__.py @@ -7,4 +7,9 @@ sub-cellular localisation, N-model comparison, and the RAVEN-style I/O formats. """ -__version__ = "0.0.1" +from importlib.metadata import PackageNotFoundError, version + +try: + __version__ = version("raven-python") +except PackageNotFoundError: # running from a source tree without an install + __version__ = "0.0.0+unknown" diff --git a/src/raven_python/analysis/reporter.py b/src/raven_python/analysis/reporter.py index 5d96d47..5150968 100644 --- a/src/raven_python/analysis/reporter.py +++ b/src/raven_python/analysis/reporter.py @@ -60,6 +60,8 @@ def _reporter_one(model: cobra.Model, gene_z: dict[str, float], test: str) -> Re raw = zs.sum() / math.sqrt(n) # Exact background correction for sampling-with-replacement (see module doc). corrected = (raw - math.sqrt(n) * mu) / sigma if sigma > 0 else 0.0 + if not math.isfinite(corrected): # never on clamped z-scores; guards the p-value + corrected = 0.0 rows.append( { "metabolite": met.id, diff --git a/src/raven_python/analysis/sampling.py b/src/raven_python/analysis/sampling.py index 429b164..e187ffd 100644 --- a/src/raven_python/analysis/sampling.py +++ b/src/raven_python/analysis/sampling.py @@ -36,6 +36,7 @@ import cobra import numpy as np import pandas as pd +from cobra.exceptions import OptimizationError from cobra.flux_analysis import flux_variability_analysis, pfba logger = logging.getLogger(__name__) @@ -188,11 +189,18 @@ def random_sampling( model.objective = model.problem.Objective(sum(terms), direction="max") sol = model.optimize() if sol.status == "optimal" and abs(sol.objective_value) > 1e-8: - samples[i, :] = (pfba(model) if min_flux else sol).fluxes.reindex(reaction_ids).to_numpy() + fluxes = (pfba(model) if min_flux else sol).fluxes.reindex(reaction_ids) + if fluxes.isna().any(): # solver returned an unexpected reaction set + missing = fluxes.index[fluxes.isna()].tolist() + raise OptimizationError( + "solver returned fluxes missing reaction(s) " + f"{missing[:5]}; cannot assemble a NaN-free sample matrix." + ) + samples[i, :] = fluxes.to_numpy() break if attempt == max_attempts: if not suppress_errors: - raise RuntimeError( + raise OptimizationError( "Could not find a non-zero, loop-free solution after " f"{max_attempts} attempts for sample {i}. Review the model's " "constraints, or set suppress_errors=True." diff --git a/src/raven_python/binaries.py b/src/raven_python/binaries.py index ed7ec7b..d15b0b0 100644 --- a/src/raven_python/binaries.py +++ b/src/raven_python/binaries.py @@ -86,6 +86,22 @@ def _sha256(path: Path) -> str: return h.hexdigest() +def _safe_extract_zip(zf: zipfile.ZipFile, dest_dir: Path) -> None: + """Extract ``zf`` into ``dest_dir``, rejecting members that escape it. + + ``ZipFile.extractall`` has no path-traversal guard (unlike tarfile's + ``filter="data"`` used in reconstruction/kegg/download.py), so a malicious or + corrupt archive could write outside the cache via absolute paths or ``..``. + SHA256 + HTTPS already make a hostile archive unlikely; this is defence in depth. + """ + dest = dest_dir.resolve() + for member in zf.namelist(): + target = (dest_dir / member).resolve() + if target != dest and dest not in target.parents: + raise ValueError(f"unsafe path in archive (path traversal): {member!r}") + zf.extractall(dest_dir) + + def ensure_binary(executable: str, *, registry: dict | None = None) -> Path: """Download (if needed) and return the path to a bundled ``executable``. @@ -134,7 +150,7 @@ def ensure_binary(executable: str, *, registry: dict | None = None) -> Path: finally: part.unlink(missing_ok=True) with zipfile.ZipFile(archive) as zf: - zf.extractall(dest_dir) + _safe_extract_zip(zf, dest_dir) archive.unlink(missing_ok=True) if not exe.exists(): raise FileNotFoundError(f"{executable!r} not found in the extracted bundle at {dest_dir}.") diff --git a/src/raven_python/comparison/compare.py b/src/raven_python/comparison/compare.py index c7d38a1..36e8b73 100644 --- a/src/raven_python/comparison/compare.py +++ b/src/raven_python/comparison/compare.py @@ -18,6 +18,7 @@ import pandas as pd from raven_python.tasks import Task, check_tasks +from raven_python.utils.parse import subsystem_to_str @dataclass @@ -60,12 +61,9 @@ def _subsystem_counts(model: cobra.Model) -> dict[str, int]: """{subsystem_name: reaction_count}. Reactions with empty subsystem fall under '(none)'.""" counts: dict[str, int] = {} for r in model.reactions: - # cobra stores subsystem as a string; RAVEN sometimes uses cell-of-cells (we'd - # already have it as a string here, but guard against list/tuple from messy YAML). - sub = r.subsystem - if isinstance(sub, (list, tuple)): - sub = sub[0] if sub else "" - sub = (sub or "").strip() or "(none)" + # cobra stores subsystem as a string; RAVEN sometimes uses cell-of-cells. + # Coerce to the same ;-joined string used everywhere else (no data lost). + sub = subsystem_to_str(r.subsystem).strip() or "(none)" counts[sub] = counts.get(sub, 0) + 1 return counts diff --git a/src/raven_python/curation/batch.py b/src/raven_python/curation/batch.py index 1bdeb23..0f00b4d 100644 --- a/src/raven_python/curation/batch.py +++ b/src/raven_python/curation/batch.py @@ -8,6 +8,8 @@ import cobra import pandas as pd +from raven_python.utils.parse import subsystem_to_str + #: Core columns recognised in ``mets_df``. Anything else is treated as a #: MIRIAM annotation column (the header becomes the namespace key). DEFAULT_CORE_MET_COLUMNS: tuple[str, ...] = ( @@ -369,7 +371,7 @@ def _update_reaction(rxn: cobra.Reaction, row: pd.Series, miriam_cols: list[str] if _has_value(row.get("ub")): rxn.upper_bound = float(row["ub"]) if _has_value(row.get("subSystems")): - rxn.subsystem = str(row["subSystems"]) + rxn.subsystem = subsystem_to_str(row["subSystems"]) if _has_value(row.get("eccodes")): rxn.annotation["ec-code"] = str(row["eccodes"]) if _has_value(row.get("rxnNotes")): diff --git a/src/raven_python/data.py b/src/raven_python/data.py index ecb46a3..fd26c8d 100644 --- a/src/raven_python/data.py +++ b/src/raven_python/data.py @@ -75,12 +75,18 @@ def ensure_data_file( *, version: str | None = None, registry: dict | None = None, + verify: bool = False, ) -> Path: """Download (if needed) and return the cached path to one artefact file. Looks the file up in the registry for ``dataset`` (at ``version`` or the registry's default), downloads it to the version-pinned cache directory, verifies its SHA256, and returns the path. Re-uses an already-cached copy. + + A freshly downloaded file is always SHA256-checked. ``verify`` additionally + re-checks an *already-cached* file's SHA256 (a mismatch — i.e. a corrupted + cache — discards it and re-downloads); it is off by default so the common + cache-hit path stays fast. """ registry = _DATA_REGISTRY if registry is None else registry bundle = _bundle(dataset, registry) @@ -95,7 +101,9 @@ def ensure_data_file( dest_dir = _data_cache_dir() / f"{dataset}-{ver}" dest = dest_dir / filename if dest.exists(): - return dest + if not verify or _sha256(dest) == entry["sha256"]: + return dest + dest.unlink() # corrupted cache → fall through and re-download dest_dir.mkdir(parents=True, exist_ok=True) tmp = dest.with_name(dest.name + ".part") diff --git a/src/raven_python/gapfilling/fill.py b/src/raven_python/gapfilling/fill.py index ba3418d..ad0f1ab 100644 --- a/src/raven_python/gapfilling/fill.py +++ b/src/raven_python/gapfilling/fill.py @@ -137,6 +137,8 @@ def connect_blocked_reactions( The draft is expected to have exchange reactions for its nutrients (otherwise most reactions are trivially blocked). """ + if penalty <= 0: + raise ValueError(f"penalty must be > 0 (a non-positive cost malforms the MILP); got {penalty}.") templates = _as_models(templates) blocked = set(find_blocked_reactions(model)) candidates = [r for r in blocked if model.reactions.get_by_id(r).lower_bound >= 0] diff --git a/src/raven_python/init/ftinit.py b/src/raven_python/init/ftinit.py index b355e45..58e4ab9 100644 --- a/src/raven_python/init/ftinit.py +++ b/src/raven_python/init/ftinit.py @@ -51,6 +51,7 @@ from dataclasses import dataclass, field import cobra +from cobra.exceptions import OptimizationError from optlang.symbolics import Real, add, mul from raven_python.init.genes import remove_low_score_genes @@ -210,7 +211,7 @@ def add_constraint(expr, **kw): opt.optimize() # Accept a near-optimal incumbent (when a MIP gap / time limit is set), as RAVEN does. if opt.status not in ("optimal", "feasible", "suboptimal", "time_limit"): - raise RuntimeError(f"ftINIT MILP did not solve (status: {opt.status}).") + raise OptimizationError(f"ftINIT MILP did not solve (status: {opt.status}).") # RAVEN: a reaction is "on" iff its indicator ≥ 0.5 (positive indicators are # continuous and can land fractionally when a reaction can carry only tiny flux). diff --git a/src/raven_python/init/genes.py b/src/raven_python/init/genes.py index ceed3da..4076c13 100644 --- a/src/raven_python/init/genes.py +++ b/src/raven_python/init/genes.py @@ -11,13 +11,12 @@ from __future__ import annotations import ast -import statistics from collections.abc import Mapping import cobra from cobra.manipulation import remove_genes -_AGG = {"min": min, "max": max, "median": statistics.median, "average": statistics.fmean} +from raven_python.utils.gpr import resolve_aggregators def _prune(node, scores, iso, cplx) -> tuple[str | None, float | None]: @@ -64,10 +63,7 @@ def remove_low_score_genes( **deterministically** (first on a tie), unlike RAVEN's random tie-break — same quality, reproducible. """ - for name, value in (("isozyme_scoring", isozyme_scoring), ("complex_scoring", complex_scoring)): - if value not in _AGG: - raise ValueError(f"{name} must be one of {sorted(_AGG)}; got {value!r}.") - iso, cplx = _AGG[isozyme_scoring], _AGG[complex_scoring] + iso, cplx = resolve_aggregators(isozyme_scoring, complex_scoring) out = model.copy() for rxn in out.reactions: diff --git a/src/raven_python/init/init.py b/src/raven_python/init/init.py index f23e17a..3cd5942 100644 --- a/src/raven_python/init/init.py +++ b/src/raven_python/init/init.py @@ -39,6 +39,7 @@ from dataclasses import dataclass import cobra +from cobra.exceptions import OptimizationError from optlang.symbolics import Real, add, mul _EPS = 1.0 # flux an included reaction must carry (RAVEN's fake-met unit) @@ -203,7 +204,7 @@ def run_init( opt.optimize() # With a MIP gap / time limit set, accept a near-optimal incumbent (as RAVEN does). if opt.status not in ("optimal", "feasible", "suboptimal", "time_limit"): - raise RuntimeError(f"INIT MILP did not solve (status: {opt.status}).") + raise OptimizationError(f"INIT MILP did not solve (status: {opt.status}).") # A reaction is kept if any of its directed parts is essential or has x≈1. kept_origins = {d.origin for d in directed if d.essential} diff --git a/src/raven_python/init/score.py b/src/raven_python/init/score.py index 6e14f86..c380544 100644 --- a/src/raven_python/init/score.py +++ b/src/raven_python/init/score.py @@ -15,12 +15,11 @@ import ast import math -import statistics from collections.abc import Mapping import cobra -_AGG = {"min": min, "max": max, "median": statistics.median, "average": statistics.fmean} +from raven_python.utils.gpr import resolve_aggregators def gene_scores_from_expression( @@ -70,10 +69,7 @@ def score_reactions_from_genes( no_gene_score: float = -2.0, ) -> dict[str, float]: """Return ``{reaction_id: score}`` from per-gene scores via each reaction's GPR.""" - for name, value in (("isozyme_scoring", isozyme_scoring), ("complex_scoring", complex_scoring)): - if value not in _AGG: - raise ValueError(f"{name} must be one of {sorted(_AGG)}; got {value!r}.") - iso, cplx = _AGG[isozyme_scoring], _AGG[complex_scoring] + iso, cplx = resolve_aggregators(isozyme_scoring, complex_scoring) scores: dict[str, float] = {} for rxn in model.reactions: diff --git a/src/raven_python/init/taskfill.py b/src/raven_python/init/taskfill.py index 58501ce..d90cb00 100644 --- a/src/raven_python/init/taskfill.py +++ b/src/raven_python/init/taskfill.py @@ -18,6 +18,7 @@ from dataclasses import dataclass import cobra +from cobra.exceptions import OptimizationError from optlang.symbolics import Real, add, mul from raven_python.tasks import Task @@ -87,13 +88,15 @@ def _fill_one_task( right trade for tractability, exactly as for the main ftINIT MILP. """ if not candidates: # nothing left to add → task cannot be made feasible - raise RuntimeError(f"gap-filling found no candidates for task {task.id!r}.") + raise OptimizationError(f"gap-filling found no candidates for task {task.id!r}.") combined = _closed_copy(model) # task I/O via the task's b, not the model's exchanges combined.add_reactions([r.copy() for r in candidates]) name_to_id, comp_to_ids = task_name_maps(combined) _, error = apply_task_constraints(combined, task, name_to_id, comp_to_ids) if error is not None: - raise RuntimeError(f"task {task.id!r} could not be applied to the reference: {error}") + raise OptimizationError( + f"task {task.id!r} could not be applied to the reference: {error}" + ) prob = combined.problem extras = [] @@ -126,7 +129,7 @@ def _fill_one_task( # (no incumbent) means the task cannot be satisfied from the reference. if combined.solver.status not in ("optimal", "feasible", "suboptimal", "time_limit") or \ combined.variables[f"_fill_{candidates[0].id}"].primal is None: - raise RuntimeError(f"gap-filling found no way to make task {task.id!r} feasible.") + raise OptimizationError(f"gap-filling found no way to make task {task.id!r} feasible.") return [c.id for c in candidates if (combined.variables[f"_fill_{c.id}"].primal or 0.0) > 0.5] @@ -174,7 +177,7 @@ def fill_tasks( avail = [r for r in candidates if r.id not in present] try: chosen = _fill_one_task(out, avail, task, costs, mip_gap=mip_gap, time_limit=time_limit) - except RuntimeError: + except OptimizationError: failed.append(task.id) continue if chosen: diff --git a/src/raven_python/io/ec_data.py b/src/raven_python/io/ec_data.py index 850c8e8..7ef941e 100644 --- a/src/raven_python/io/ec_data.py +++ b/src/raven_python/io/ec_data.py @@ -45,6 +45,7 @@ from __future__ import annotations import math +import warnings from dataclasses import dataclass, field from typing import Any @@ -263,6 +264,14 @@ def _build_ec_data( ) mat[i, j] = float(stoich) + if n_r > 0 and mat.nnz == 0: # reactions present but none coupled to an enzyme + warnings.warn( + f"ec-rxns has {n_r} entries but none reference an enzyme; the " + "reaction↔enzyme coupling matrix is all-zero (likely malformed " + "ec-rxns/ec-enzymes).", + stacklevel=2, + ) + return EcData( gecko_light=gecko_light, rxns=rxns, diff --git a/src/raven_python/io/excel.py b/src/raven_python/io/excel.py index cf6196e..398b6da 100644 --- a/src/raven_python/io/excel.py +++ b/src/raven_python/io/excel.py @@ -12,6 +12,8 @@ import cobra +from raven_python.utils.parse import subsystem_to_str + def _miriam_string(annotation: dict, exclude: tuple[str, ...] = ()) -> str: """RAVEN MIRIAM column: ``namespace/id;namespace/id2;...`` (sorted).""" @@ -84,9 +86,7 @@ def export_to_excel( "REPLACEMENT ID", "NOTE", "REFERENCE", "CONFIDENCE SCORE"] ) for r in reactions: - subsystem = r.subsystem - if isinstance(subsystem, (list, tuple)): - subsystem = ";".join(subsystem) + subsystem = subsystem_to_str(r.subsystem) ws.append([ None, r.id, r.name, _equation(r), _ec_codes(r), r.gene_reaction_rule, r.lower_bound, r.upper_bound, diff --git a/src/raven_python/localization/predict.py b/src/raven_python/localization/predict.py index 0fb8596..3fc2369 100644 --- a/src/raven_python/localization/predict.py +++ b/src/raven_python/localization/predict.py @@ -223,7 +223,10 @@ def _met_cost(m_id: str) -> float: score_lookup = scores.df # gene_id × compartment → float for g in genes_in_scope: for c in compartments: - s = float(score_lookup.at[g, c]) if c in score_lookup.columns and not pd.isna(score_lookup.at[g, c]) else 0.0 + if c not in score_lookup.columns: + continue + val = score_lookup.at[g, c] + s = 0.0 if pd.isna(val) else float(val) if s: obj_terms.append(mul([Real(s), y[g, c]])) # − transport cost per added transport diff --git a/src/raven_python/manipulation/add.py b/src/raven_python/manipulation/add.py index f5e6495..d772acd 100644 --- a/src/raven_python/manipulation/add.py +++ b/src/raven_python/manipulation/add.py @@ -35,7 +35,7 @@ from cobra import Metabolite, Reaction from cobra.core.gene import GPR -from raven_python.utils.parse import parse_name_comp +from raven_python.utils.parse import parse_name_comp, subsystem_to_str # Reversibility arrows. ``<=>`` must be tried before ``=>`` (it contains it). _REVERSIBLE_ARROWS = ("<=>",) @@ -302,7 +302,9 @@ def add_reactions_from_equations( rxn_id = spec["id"] if rxn_id in model.reactions: raise ValueError( - f"Reaction {rxn_id!r} already exists; use changeRxns or remove it first." + f"Reaction {rxn_id!r} already exists. To change its stoichiometry use " + "change_reaction_equations; to replace it, remove it first with " + "model.remove_reactions([...])." ) if "equation" not in spec: raise ValueError(f"Reaction {rxn_id!r} spec missing required 'equation'.") @@ -324,7 +326,7 @@ def add_reactions_from_equations( lower = config.lower_bound if reversible else 0.0 rxn.bounds = (lower, config.upper_bound) if "subsystem" in spec: - rxn.subsystem = spec["subsystem"] + rxn.subsystem = subsystem_to_str(spec["subsystem"]) model.add_reactions([rxn]) rxn.add_metabolites(coeffs) diff --git a/src/raven_python/manipulation/transport.py b/src/raven_python/manipulation/transport.py index d222377..2edad74 100644 --- a/src/raven_python/manipulation/transport.py +++ b/src/raven_python/manipulation/transport.py @@ -46,11 +46,15 @@ def _transport_id_factory(model: cobra.Model, prefix: str): def next_id() -> str: nonlocal counter - while f"{prefix}{counter:04d}" in model.reactions: + # A free id must appear within len(reactions)+1 tries (only that many are + # occupied); the bound turns any pathological case into a clear error + # instead of spinning forever. + for _ in range(len(model.reactions) + 2): + rid = f"{prefix}{counter:04d}" counter += 1 - rid = f"{prefix}{counter:04d}" - counter += 1 - return rid + if rid not in model.reactions: + return rid + raise RuntimeError(f"could not allocate a free reaction id with prefix {prefix!r}.") return next_id diff --git a/src/raven_python/reconstruction/kegg/download.py b/src/raven_python/reconstruction/kegg/download.py index 8bb1826..d990b42 100644 --- a/src/raven_python/reconstruction/kegg/download.py +++ b/src/raven_python/reconstruction/kegg/download.py @@ -27,12 +27,15 @@ from __future__ import annotations import gzip +import logging import netrc import shutil import tarfile import urllib.request from pathlib import Path +logger = logging.getLogger(__name__) + KEGG_HOST = "ftp.kegg.net" BASE_URL = "https://ftp.kegg.net" @@ -116,12 +119,12 @@ def fetch_kegg_files( target = dest / Path(path).name if target.exists() and not force: if verbose: - print(f" skip (exists): {target.name}") + logger.info("skip (exists): %s", target.name) out.append(target) continue url = f"{base_url.rstrip('/')}/{path.lstrip('/')}" if verbose: - print(f" fetching {path}") + logger.info("fetching %s", path) with opener.open(url) as resp, open(target, "wb") as handle: shutil.copyfileobj(resp, handle) out.append(target) @@ -253,5 +256,5 @@ def download_kegg_dump( verbose=verbose, ) if verbose: - print(">>> Extracting and arranging KEGG dump...") + logger.info("Extracting and arranging KEGG dump...") return extract_kegg_dump(dest) diff --git a/src/raven_python/utils/gpr.py b/src/raven_python/utils/gpr.py index 2e2122d..28818da 100644 --- a/src/raven_python/utils/gpr.py +++ b/src/raven_python/utils/gpr.py @@ -17,11 +17,41 @@ from __future__ import annotations import ast +import statistics +from collections.abc import Callable from dataclasses import dataclass import cobra from cobra.core.gene import GPR +#: Score-aggregation functions for combining gene scores across a GPR: isozymes +#: (genes joined by OR) and complex subunits (joined by AND). Shared by the +#: gene→reaction scoring (:mod:`raven_python.init.score`) and low-score-gene +#: pruning (:mod:`raven_python.init.genes`) so both validate the same way. +AGGREGATORS: dict[str, Callable] = { + "min": min, + "max": max, + "median": statistics.median, + "average": statistics.fmean, +} + + +def resolve_aggregators( + isozyme_scoring: str, complex_scoring: str +) -> tuple[Callable, Callable]: + """Validate the scoring-mode names and return ``(isozyme_fn, complex_fn)``. + + Raises ``ValueError`` naming the offending argument if either mode is not one + of :data:`AGGREGATORS`. + """ + for name, value in ( + ("isozyme_scoring", isozyme_scoring), + ("complex_scoring", complex_scoring), + ): + if value not in AGGREGATORS: + raise ValueError(f"{name} must be one of {sorted(AGGREGATORS)}; got {value!r}.") + return AGGREGATORS[isozyme_scoring], AGGREGATORS[complex_scoring] + def _contains_or(node: ast.AST | None) -> bool: """True if ``node``'s subtree contains an OR operator anywhere.""" diff --git a/src/raven_python/utils/parse.py b/src/raven_python/utils/parse.py index 8068f6c..e35a0d0 100644 --- a/src/raven_python/utils/parse.py +++ b/src/raven_python/utils/parse.py @@ -31,3 +31,29 @@ def parse_name_comp(token: str) -> tuple[str, str | None]: if match: return match.group("name").strip(), match.group("comp").strip() return token.strip(), None + + +def subsystem_to_str(value) -> str: + """Coerce a reaction ``subsystem`` to cobra's canonical ``str`` form. + + cobra defines ``Reaction.subsystem`` as a plain string, but RAVEN/MATLAB and + "messy" YAML sometimes deliver a list/tuple of subsystem names (a reaction can + belong to several). This collapses any such value to a single ``;``-joined + string of its parts — losing no data, unlike taking only the first — and returns + ``""`` for an empty/None subsystem. Used wherever a subsystem is rendered or + compared so the handling is identical across modules. + + Examples + -------- + >>> subsystem_to_str("glycolysis") + 'glycolysis' + >>> subsystem_to_str(["glycolysis", "TCA cycle"]) + 'glycolysis;TCA cycle' + >>> subsystem_to_str(None) + '' + """ + if value is None: + return "" + if isinstance(value, (list, tuple)): + return ";".join(str(s) for s in value if str(s).strip()) + return str(value) diff --git a/tests/test_binaries.py b/tests/test_binaries.py index d74ce0b..9e340b5 100644 --- a/tests/test_binaries.py +++ b/tests/test_binaries.py @@ -78,3 +78,24 @@ def test_ensure_binary_unhosted_platform_raises(tmp_path): registry = {"footool": {"version": "1", "provides": ["footool"], "platforms": {}}} with pytest.raises(FileNotFoundError, match="No bundled"): binaries.ensure_binary("footool", registry=registry) + + +def test_ensure_binary_rejects_path_traversal_zip(tmp_path, monkeypatch): + # A ZIP whose member escapes the extraction dir must be refused, not written + # outside the cache (ZipFile.extractall has no traversal guard of its own). + archive = tmp_path / "evil.zip" + with zipfile.ZipFile(archive, "w") as zf: + zf.writestr("footool", "ok") + zf.writestr("../escape.txt", "pwned") + sha = hashlib.sha256(archive.read_bytes()).hexdigest() + registry = { + "footool": { + "version": "1.0", + "provides": ["footool"], + "platforms": {binaries.platform_key(): {"url": archive.as_uri(), "sha256": sha}}, + } + } + monkeypatch.setenv("XDG_CACHE_HOME", str(tmp_path / "cache")) + with pytest.raises(ValueError, match="path traversal"): + binaries.ensure_binary("footool", registry=registry) + assert not (tmp_path / "escape.txt").exists() diff --git a/tests/test_gapfilling.py b/tests/test_gapfilling.py index b92e982..1922fea 100644 --- a/tests/test_gapfilling.py +++ b/tests/test_gapfilling.py @@ -37,6 +37,12 @@ def draft_and_template(): # --------------------------------------------------------------------------- # # Connectivity gap-fill # --------------------------------------------------------------------------- # +def test_non_positive_penalty_rejected(draft_and_template): + draft, template = draft_and_template + with pytest.raises(ValueError, match="penalty must be"): + connect_blocked_reactions(draft, template, penalty=0) + + def test_fill_gaps_connects_blocked_reaction(draft_and_template): draft, template = draft_and_template assert "r1" in cobra.flux_analysis.find_blocked_reactions(draft) # precondition diff --git a/tests/test_init.py b/tests/test_init.py index a61ee19..9bfbc10 100644 --- a/tests/test_init.py +++ b/tests/test_init.py @@ -1,6 +1,7 @@ """Tests for the INIT MILP (init/init.py, Phase 4c).""" import cobra import pytest +from cobra.exceptions import OptimizationError from raven_python.init import InitResult, run_init @@ -40,6 +41,21 @@ def test_keeps_positive_drops_negative(model): assert "r3" not in kept +def test_infeasible_milp_raises_optimization_error(): + # A lone irreversible reaction that drains A (no producer) forced essential cannot + # satisfy steady state without excretion -> the solver is infeasible. raven aligns + # with cobra by raising cobra.exceptions.OptimizationError (not bare RuntimeError). + m = cobra.Model("infeasible") + A = _met("A_c") + m.add_metabolites([A]) + drain = cobra.Reaction("drain", lower_bound=0, upper_bound=1000) + drain.add_metabolites({A: -1}) + m.add_reactions([drain]) + with pytest.raises(OptimizationError): + run_init(m, {"drain": 1.0}, essential_rxns=["drain"], + prod_weight=0.0, allow_excretion=False) + + def test_negative_scores_emptied_when_no_reward(model): # All reactions negative and no production reward -> keep nothing (empty optimum). scores = {r.id: -1.0 for r in model.reactions} diff --git a/tests/test_io_yaml_ec_data.py b/tests/test_io_yaml_ec_data.py index 94c2ebb..3cc99ba 100644 --- a/tests/test_io_yaml_ec_data.py +++ b/tests/test_io_yaml_ec_data.py @@ -349,6 +349,16 @@ def test_load_dangling_enzyme_reference_raises(tmp_path): read_yaml_model(_write_yaml(doc, tmp_path / "m.yml")) +def test_load_all_zero_coupling_matrix_warns(tmp_path): + """ec-rxns present but none reference an enzyme -> the coupling matrix is + all-zero (likely malformed). Warn rather than build a silently empty model.""" + doc = _minimal_model_doc() + doc["ec-rxns"] = [{"id": "R1", "kcat": 1.0, "enzymes": {}}] + doc["ec-enzymes"] = [{"genes": "G1", "enzymes": "P1", "mw": 1.0}] + with pytest.warns(UserWarning, match="all-zero"): + read_yaml_model(_write_yaml(doc, tmp_path / "m.yml")) + + # --------------------------------------------------------------------------- # # In-memory model_from_yaml_data (no file I/O) # --------------------------------------------------------------------------- # diff --git a/tests/test_io_yaml_parity.py b/tests/test_io_yaml_parity.py index 2c89f58..32d8a6c 100644 --- a/tests/test_io_yaml_parity.py +++ b/tests/test_io_yaml_parity.py @@ -25,7 +25,6 @@ from raven_python.io import read_yaml_model, write_yaml_model - SAMPLE = { "metabolites": [ { @@ -349,9 +348,9 @@ def test_eccodes_round_trip_through_cobra_extras(src, tmp_path): pass1 = tmp_path / "via_rp.yml" write_yaml_model(model, pass1) via_cobra = cobra.io.load_yaml_model(str(pass1)) - # cobra exposes eccodes as an attribute (setattr fall-through); - # raven_python sourced it from notes, so .notes['eccodes'] should - # still be present on the reloaded model. + # cobra exposes eccodes as an attribute (setattr fall-through), proving + # the key written by write_yaml_model survives a cobra round-trip. + assert getattr(via_cobra.reactions.get_by_id("R1"), "eccodes", None) == "1.1.1.1" pass2 = tmp_path / "via_rp2.yml" # Promote cobra's setattr-eccodes back into notes for the writer # path. (Tests the documented integration: cobra preserves the YAML diff --git a/tests/test_utils_parse.py b/tests/test_utils_parse.py new file mode 100644 index 0000000..c6d2f74 --- /dev/null +++ b/tests/test_utils_parse.py @@ -0,0 +1,57 @@ +"""Tests for raven_python.utils.parse helpers.""" +from __future__ import annotations + +import cobra + +from raven_python.utils.parse import parse_name_comp, subsystem_to_str + + +def test_parse_name_comp_basic(): + assert parse_name_comp("ATP[c]") == ("ATP", "c") + assert parse_name_comp("ATP") == ("ATP", None) + assert parse_name_comp("weird[name][m]") == ("weird[name]", "m") + + +def test_subsystem_to_str_scalar_and_none(): + assert subsystem_to_str("glycolysis") == "glycolysis" + assert subsystem_to_str("") == "" + assert subsystem_to_str(None) == "" + + +def test_subsystem_to_str_joins_list_without_data_loss(): + # A multi-subsystem reaction keeps every part (unlike taking only the first). + assert subsystem_to_str(["glycolysis", "TCA cycle"]) == "glycolysis;TCA cycle" + # Empty parts are dropped; the rest survive. + assert subsystem_to_str(["", "x"]) == "x" + + +def test_subsystem_to_str_coerces_non_string_items(): + # Non-string items must not crash (the old excel.py ";".join did). + assert subsystem_to_str([1, "name"]) == "1;name" + + +def test_excel_export_handles_list_subsystem(tmp_path): + import pytest + + pytest.importorskip("openpyxl") + from openpyxl import load_workbook + + from raven_python.io.excel import export_to_excel + + model = cobra.Model("m") + a = cobra.Metabolite("a_c", compartment="c") + b = cobra.Metabolite("b_c", compartment="c") + model.add_metabolites([a, b]) + r = cobra.Reaction("R1", lower_bound=-1000, upper_bound=1000) + model.add_reactions([r]) + r.add_metabolites({a: -1, b: 1}) + r.subsystem = ["glycolysis", "TCA cycle"] # list, would crash old ";".join + + out = tmp_path / "m.xlsx" + export_to_excel(model, out) + wb = load_workbook(out) + ws = wb["RXNS"] + header = [c.value for c in next(ws.iter_rows(max_row=1))] + sub_col = header.index("SUBSYSTEM") + row1 = [c.value for c in list(ws.iter_rows(min_row=2, max_row=2))[0]] + assert row1[sub_col] == "glycolysis;TCA cycle"