Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions src/raven_python/binaries.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,21 @@ def _safe_extract_zip(zf: zipfile.ZipFile, dest_dir: Path) -> None:

``ZipFile.extractall`` has no path-traversal guard (unlike tarfile's
``filter="data"`` used in reconstruction/kegg/download.py), so a malicious or
corrupt archive could write outside the cache via absolute paths or ``..``.
SHA256 + HTTPS already make a hostile archive unlikely; this is defence in depth.
corrupt archive could write outside the cache via absolute paths, ``..``, or
symlink members. SHA256 + HTTPS already make a hostile archive unlikely; this
is defence in depth.
"""
dest = dest_dir.resolve()
for member in zf.namelist():
for info in zf.infolist():
member = info.filename
target = (dest_dir / member).resolve()
if target != dest and dest not in target.parents:
raise ValueError(f"unsafe path in archive (path traversal): {member!r}")
# Reject symlink members: extractall recreates them as real symlinks whose
# target the path check above never validates, so one could point outside
# dest_dir (or a later member be written through it).
if (info.external_attr >> 16) & 0o170000 == 0o120000: # stat.S_IFLNK
raise ValueError(f"unsafe symlink in archive: {member!r}")
zf.extractall(dest_dir)


Expand Down Expand Up @@ -138,7 +145,7 @@ def ensure_binary(executable: str, *, registry: dict | None = None) -> Path:
# that a later run might mistake for a finished one. Mirrors data.py.
part = archive.with_suffix(archive.suffix + ".part")
try:
with urlopen(entry["url"]) as resp, open(part, "wb") as out: # noqa: S310
with urlopen(entry["url"], timeout=60) as resp, open(part, "wb") as out: # noqa: S310
shutil.copyfileobj(resp, out)
digest = _sha256(part)
if digest != entry["sha256"]:
Expand Down
2 changes: 1 addition & 1 deletion src/raven_python/curation/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ def _build_new_reactions(
if _has_value(row.get("grRules")):
rxn.gene_reaction_rule = str(row["grRules"])
if _has_value(row.get("subSystems")):
rxn.subsystem = str(row["subSystems"])
rxn.subsystem = subsystem_to_str(row["subSystems"])
if _has_value(row.get("eccodes")):
rxn.annotation["ec-code"] = str(row["eccodes"])
if _has_value(row.get("rxnNotes")):
Expand Down
2 changes: 1 addition & 1 deletion src/raven_python/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def ensure_data_file(

dest_dir.mkdir(parents=True, exist_ok=True)
tmp = dest.with_name(dest.name + ".part")
with urlopen(entry["url"]) as resp, open(tmp, "wb") as out: # noqa: S310 (trusted registry URLs)
with urlopen(entry["url"], timeout=60) as resp, open(tmp, "wb") as out: # noqa: S310 (trusted registry URLs)
shutil.copyfileobj(resp, out)
digest = _sha256(tmp)
if digest != entry["sha256"]:
Expand Down
2 changes: 1 addition & 1 deletion src/raven_python/io/ec_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,5 +388,5 @@ def _eccodes_to_yaml(eccodes: str):
a scalar string for one EC, a list for multiple."""
parts = [p for p in eccodes.split(";") if p]
if len(parts) <= 1:
return eccodes
return ";".join(parts)
return parts
54 changes: 54 additions & 0 deletions tests/test_binaries.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Tests for raven_python.binaries (binary resolution + bundled-ZIP provisioning)."""
import hashlib
import shutil
import stat
import zipfile
from pathlib import Path

Expand Down Expand Up @@ -99,3 +100,56 @@ def test_ensure_binary_rejects_path_traversal_zip(tmp_path, monkeypatch):
with pytest.raises(ValueError, match="path traversal"):
binaries.ensure_binary("footool", registry=registry)
assert not (tmp_path / "escape.txt").exists()


def test_ensure_binary_rejects_symlink_zip(tmp_path, monkeypatch):
# A ZIP member flagged as a symlink must be refused: extractall would
# recreate it as a real symlink whose target the path guard never validates,
# so it could point outside the cache dir.
archive = tmp_path / "evil.zip"
with zipfile.ZipFile(archive, "w") as zf:
zf.writestr("footool", "ok")
link = zipfile.ZipInfo("escape")
link.external_attr = (stat.S_IFLNK | 0o777) << 16
zf.writestr(link, "/etc/passwd")
sha = hashlib.sha256(archive.read_bytes()).hexdigest()
registry = {
"footool": {
"version": "1.0",
"provides": ["footool"],
"platforms": {binaries.platform_key(): {"url": archive.as_uri(), "sha256": sha}},
}
}
monkeypatch.setenv("XDG_CACHE_HOME", str(tmp_path / "cache"))
with pytest.raises(ValueError, match="symlink"):
binaries.ensure_binary("footool", registry=registry)


def test_ensure_binary_passes_download_timeout(tmp_path, monkeypatch):
# The download must use a socket timeout so a stalled server can't hang the
# process forever. Spy on urlopen to confirm a positive timeout is forwarded.
exe = tmp_path / "footool"
exe.write_text("#!/bin/sh\necho hi\n")
archive = tmp_path / "footool.zip"
with zipfile.ZipFile(archive, "w") as zf:
zf.write(exe, "footool")
sha = hashlib.sha256(archive.read_bytes()).hexdigest()
registry = {
"footool": {
"version": "1.0",
"provides": ["footool"],
"platforms": {binaries.platform_key(): {"url": archive.as_uri(), "sha256": sha}},
}
}
monkeypatch.setenv("XDG_CACHE_HOME", str(tmp_path / "cache"))

seen = {}
real_urlopen = binaries.urlopen

def spy(url, *args, **kwargs):
seen["timeout"] = kwargs.get("timeout")
return real_urlopen(url, *args, **kwargs)

monkeypatch.setattr(binaries, "urlopen", spy)
binaries.ensure_binary("footool", registry=registry)
assert isinstance(seen["timeout"], (int, float)) and seen["timeout"] > 0
23 changes: 23 additions & 0 deletions tests/test_curation.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,29 @@ def test_add_new_reaction_with_existing_mets():
assert new.notes.get("rxnConfidenceScores") == "3"


def test_add_new_reaction_joins_list_subsystems():
# RAVEN allows a reaction in several subsystems (a list). The new-reaction
# path must ``;``-join it like the update path does, not emit a str(list) repr.
m = _base_model()
rxns_df = pd.DataFrame([
{"rxnNames": "ADP phosphorylation", "grRules": "YBR456W",
"lb": -1000, "ub": 1000, "rev": 1,
"subSystems": ["glycolysis", "energy"],
"eccodes": "2.7.4.6", "rxnNotes": "", "rxnReferences": "",
"rxnConfidenceScores": 3, "kegg.reaction": "R00187"},
])
coeffs_df = pd.DataFrame([
{"rxnNames": "ADP phosphorylation", "metNames": "ADP", "comps": "c",
"coefficient": -1.0},
{"rxnNames": "ADP phosphorylation", "metNames": "ATP", "comps": "c",
"coefficient": 1.0},
])
result = batch_curate(m, rxns_df=rxns_df, rxns_coeffs_df=coeffs_df,
rxn_id_prefix="r_")
assert result.added_reactions == ["r_0002"]
assert m.reactions.get_by_id("r_0002").subsystem == "glycolysis;energy"


def test_update_existing_reaction_by_stoichiometry():
m = _base_model()
rxns_df = pd.DataFrame([
Expand Down
11 changes: 11 additions & 0 deletions tests/test_io_yaml_ec_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from scipy import sparse

from raven_python.io import EcData, read_yaml_model, write_yaml_model
from raven_python.io.ec_data import _eccodes_to_yaml
from raven_python.io.yaml import model_from_yaml_data

# --------------------------------------------------------------------------- #
Expand Down Expand Up @@ -133,6 +134,16 @@ def test_load_eccodes_scalar_or_list_both_round_trip(tmp_path):
assert m2.ec.eccodes == ["1.1.1.1;1.1.99.40"]


def test_eccodes_to_yaml_strips_trailing_separator():
# The write-path helper must clean the internal `;`-joined form: a single EC
# round-trips to a bare scalar, and stray separators never leak into the YAML.
assert _eccodes_to_yaml("1.1.1.1") == "1.1.1.1"
assert _eccodes_to_yaml("1.1.1.1;") == "1.1.1.1"
assert _eccodes_to_yaml(";") == ""
assert _eccodes_to_yaml("") == ""
assert _eccodes_to_yaml("1.1.1.1;1.1.99.40") == ["1.1.1.1", "1.1.99.40"]


# --------------------------------------------------------------------------- #
# Save: model.ec serialised back to top-level sections
# --------------------------------------------------------------------------- #
Expand Down
Loading