From 2977c01bceb68e6f59d777ab5c51578fa12fd065 Mon Sep 17 00:00:00 2001 From: hannahbaumann Date: Fri, 20 Feb 2026 11:08:42 +0100 Subject: [PATCH 01/34] Refactor RMSD analyses into MDAnalysis AnaysisBase classes --- src/openfe_analysis/rmsd.py | 234 ++++++++++++++++++++++-------------- 1 file changed, 144 insertions(+), 90 deletions(-) diff --git a/src/openfe_analysis/rmsd.py b/src/openfe_analysis/rmsd.py index b566ba1..808339c 100644 --- a/src/openfe_analysis/rmsd.py +++ b/src/openfe_analysis/rmsd.py @@ -5,11 +5,9 @@ import MDAnalysis as mda import netCDF4 as nc import numpy as np -import tqdm from MDAnalysis.analysis import rms -from MDAnalysis.lib.mdamath import make_whole +from MDAnalysis.analysis.base import AnalysisBase from MDAnalysis.transformations import unwrap -from numpy import typing as npt from .reader import FEReader from .transformations import Aligner, ClosestImageShift, NoJump @@ -99,6 +97,139 @@ def make_Universe(top: pathlib.Path, trj: nc.Dataset, state: int) -> mda.Univers return u +class ProteinRMSD(AnalysisBase): + """Calculate the 1D protein RMSD""" + + def __init__(self, atomgroup, **kwargs): + super(ProteinRMSD, self).__init__(atomgroup.universe.trajectory, **kwargs) + + self._ag = atomgroup + + def _prepare(self): + """ + Set the first frame as the reference + """ + self.results.rmsd = [] + self._reference = self._ag.positions + + def _single_frame(self): + rmsd = rms.rmsd( + self._ag.positions, + self._reference, + center=False, + superposition=False, + ) + self.results.rmsd.append(rmsd) + + def _conclude(self): + self.results.rmsd = np.asarray(self.results.rmsd) + + +class Protein2DRMSD(AnalysisBase): + """ + Flattened 2D RMSD matrix + + For all unique frame pairs ``(i, j)`` with ``i < j``, this function + computes the RMSD between atomic coordinates after optimal alignment. + """ + + def __init__(self, atomgroup, weights=None, **kwargs): + """ + Parameters + ---------- + atomgroup: AtomGroup + Protein atoms (e.g. CA selection) + weights: np.ndarray, optional + Per-atom weights to use in the RMSD calculation. If ``None``, + all atoms are weighted equally. + """ + super(Protein2DRMSD, self).__init__(atomgroup.universe.trajectory, **kwargs) + + self._weights = weights + self._ag = atomgroup + + def _prepare(self): + self._coords = [] + self.results.rmsd2d = [] + + def _single_frame(self): + self._coords.append(self._ag.positions) + + def _conclude(self): + positions = np.asarray(self._coords) + nframes, _, _ = positions.shape + + output = [] + for i, j in itertools.combinations(range(nframes), 2): + posi, posj = positions[i], positions[j] + rmsd = rms.rmsd( + posi, + posj, + self._weights, + center=True, + superposition=True, + ) + output.append(rmsd) + + self.results.rmsd2d = np.asarray(output) + + +class LigandRMSD(AnalysisBase): + """ + 1D RMSD time series for a ligand AtomGroup. + """ + + def __init__(self, atomgroup, **kwargs): + super(LigandRMSD, self).__init__(atomgroup.universe.trajectory, **kwargs) + + self._ag = atomgroup + + def _prepare(self): + self.results.rmsd = [] + self._reference = self._ag.positions + self._weights = self._ag.masses / np.mean(self._ag.masses) + + def _single_frame(self): + rmsd = rms.rmsd( + self._ag.positions, + self._reference, + self._weights, + center=False, + superposition=False, + ) + self.results.rmsd.append(rmsd) + + def _conclude(self): + self.results.rmsd = np.asarray(self.results.rmsd) + + +class LigandCOMDrift(AnalysisBase): + """ + Ligand center-of-mass displacement from initial position. + """ + + def __init__(self, atomgroup, **kwargs): + super(LigandCOMDrift, self).__init__(atomgroup.universe.trajectory, **kwargs) + + self._ag = atomgroup + + def _prepare(self): + self.results.com_drift = [] + self._initial_com = self._ag.center_of_mass() + + def _single_frame(self): + # distance between start and current ligand position + # ignores PBC, but we've already centered the traj + drift = mda.lib.distances.calc_bonds( + self._ag.center_of_mass(), + self._initial_com, + ) + self.results.com_drift.append(drift) + + def _conclude(self): + self.results.com_drift = np.asarray(self.results.com_drift) + + def gather_rms_data( pdb_topology: pathlib.Path, dataset: pathlib.Path, skip: Optional[int] = None ) -> dict[str, list[float]]: @@ -159,8 +290,6 @@ def gather_rms_data( # max against 1 to avoid skip=0 case skip = max(n_frames // 500, 1) - pb = tqdm.tqdm(total=int(n_frames / skip) * n_lambda) - u_top = mda.Universe(pdb_topology) for i in range(n_lambda): @@ -171,93 +300,18 @@ def gather_rms_data( prot = u.select_atoms("protein and name CA") ligand = u.select_atoms("resname UNK") - # save coordinates for 2D RMSD matrix - # TODO: Some smart guard to avoid allocating a silly amount of memory? - prot2d = np.empty((len(u.trajectory[::skip]), len(prot), 3), dtype=np.float32) - - prot_start = prot.positions - ligand_start = ligand.positions - ligand_initial_com = ligand.center_of_mass() - ligand_weights = ligand.masses / np.mean(ligand.masses) - - this_protein_rmsd = [] - this_ligand_rmsd = [] - this_ligand_wander = [] - - for ts_i, ts in enumerate(u.trajectory[::skip]): - pb.update() - - if prot: - prot2d[ts_i, :, :] = prot.positions - this_protein_rmsd.append( - rms.rmsd( - prot.positions, - prot_start, - None, # prot_weights, - center=False, - superposition=False, - ) - ) - if ligand: - this_ligand_rmsd.append( - rms.rmsd( - ligand.positions, - ligand_start, - ligand_weights, - center=False, - superposition=False, - ) - ) - this_ligand_wander.append( - # distance between start and current ligand position - # ignores PBC, but we've already centered the traj - mda.lib.distances.calc_bonds(ligand.center_of_mass(), ligand_initial_com) - ) - if prot: - # can ignore weights here as it's all Ca - rmsd2d = twoD_RMSD(prot2d, w=None) # prot_weights) - output["protein_RMSD"].append(this_protein_rmsd) - output["protein_2D_RMSD"].append(rmsd2d) - if ligand: - output["ligand_RMSD"].append(this_ligand_rmsd) - output["ligand_wander"].append(this_ligand_wander) - - output["time(ps)"] = list(np.arange(len(u.trajectory))[::skip] * u.trajectory.dt) - - return output - - -def twoD_RMSD(positions, w: Optional[npt.NDArray]) -> list[float]: - """ - Compute a flattened 2D RMSD matrix from a trajectory. - - For all unique frame pairs ``(i, j)`` with ``i < j``, this function - computes the RMSD between atomic coordinates after optimal alignment. - - Parameters - ---------- - positions : np.ndarray - Atomic coordinates for all frames in the trajectory. - w : np.ndarray, optional - Per-atom weights to use in the RMSD calculation. If ``None``, - all atoms are weighted equally. + prot_rmsd = ProteinRMSD(prot).run(step=skip) + output["protein_RMSD"].append(prot_rmsd.results.rmsd) + prot_rmsd2d = Protein2DRMSD(prot).run(step=skip) + output["protein_2D_RMSD"].append(prot_rmsd2d.results.rmsd2d) - Returns - ------- - list of float - Flattened list of RMSD values corresponding to all frame pairs - ``(i, j)`` with ``i < j``. - """ - nframes, _, _ = positions.shape - - output = [] - - for i, j in itertools.combinations(range(nframes), 2): - posi, posj = positions[i], positions[j] - - rmsd = rms.rmsd(posi, posj, w, center=True, superposition=True) + if ligand: + lig_rmsd = LigandRMSD(ligand).run(step=skip) + output["ligand_RMSD"].append(lig_rmsd.results.rmsd) + lig_com_drift = LigandCOMDrift(ligand).run(step=skip) + output["ligand_wander"].append(lig_com_drift.results.com_drift) - output.append(rmsd) + output["time(ps)"] = np.arange(len(u.trajectory))[::skip] * u.trajectory.dt return output From 4a960a02e3b92beb8d7a448ca8ef87fd8dd115bd Mon Sep 17 00:00:00 2001 From: hannahbaumann Date: Mon, 23 Feb 2026 09:52:33 +0100 Subject: [PATCH 02/34] Combine RMSD analyses --- src/openfe_analysis/rmsd.py | 59 +++++++++++++++---------------------- 1 file changed, 24 insertions(+), 35 deletions(-) diff --git a/src/openfe_analysis/rmsd.py b/src/openfe_analysis/rmsd.py index 808339c..5591da4 100644 --- a/src/openfe_analysis/rmsd.py +++ b/src/openfe_analysis/rmsd.py @@ -97,34 +97,6 @@ def make_Universe(top: pathlib.Path, trj: nc.Dataset, state: int) -> mda.Univers return u -class ProteinRMSD(AnalysisBase): - """Calculate the 1D protein RMSD""" - - def __init__(self, atomgroup, **kwargs): - super(ProteinRMSD, self).__init__(atomgroup.universe.trajectory, **kwargs) - - self._ag = atomgroup - - def _prepare(self): - """ - Set the first frame as the reference - """ - self.results.rmsd = [] - self._reference = self._ag.positions - - def _single_frame(self): - rmsd = rms.rmsd( - self._ag.positions, - self._reference, - center=False, - superposition=False, - ) - self.results.rmsd.append(rmsd) - - def _conclude(self): - self.results.rmsd = np.asarray(self.results.rmsd) - - class Protein2DRMSD(AnalysisBase): """ Flattened 2D RMSD matrix @@ -174,20 +146,32 @@ def _conclude(self): self.results.rmsd2d = np.asarray(output) -class LigandRMSD(AnalysisBase): +class RMSDAnalysis(AnalysisBase): """ - 1D RMSD time series for a ligand AtomGroup. + 1D RMSD time series for an AtomGroup. + + Parameters + ---------- + atomgroup : MDAnalysis.AtomGroup + Atoms to compute RMSD for. + mass_weighted : bool, optional + If True, compute mass-weighted RMSD. """ - def __init__(self, atomgroup, **kwargs): - super(LigandRMSD, self).__init__(atomgroup.universe.trajectory, **kwargs) + def __init__(self, atomgroup, mass_weighted=False, **kwargs): + super(RMSDAnalysis, self).__init__(atomgroup.universe.trajectory, **kwargs) self._ag = atomgroup + self._mass_weighted = mass_weighted def _prepare(self): self.results.rmsd = [] self._reference = self._ag.positions - self._weights = self._ag.masses / np.mean(self._ag.masses) + + if self._mass_weighted: + self._weights = self._ag.masses / np.mean(self._ag.masses) + else: + self._weights = None def _single_frame(self): rmsd = rms.rmsd( @@ -301,14 +285,19 @@ def gather_rms_data( ligand = u.select_atoms("resname UNK") if prot: - prot_rmsd = ProteinRMSD(prot).run(step=skip) + prot_rmsd = RMSDAnalysis(prot).run(step=skip) output["protein_RMSD"].append(prot_rmsd.results.rmsd) + # prot_rmsd = rms.RMSD(prot).run(step=skip) + # output["protein_RMSD"].append(prot_rmsd.results.rmsd.T[2]) prot_rmsd2d = Protein2DRMSD(prot).run(step=skip) output["protein_2D_RMSD"].append(prot_rmsd2d.results.rmsd2d) if ligand: - lig_rmsd = LigandRMSD(ligand).run(step=skip) + lig_rmsd = RMSDAnalysis(ligand, mass_weighted=True).run(step=skip) output["ligand_RMSD"].append(lig_rmsd.results.rmsd) + # weight = ligand.masses / np.mean(ligand.masses) + # lig_rmsd = rms.RMSD(ligand, weights=weight).run(step=skip) + # output["ligand_RMSD"].append(lig_rmsd.results.rmsd.T[2]) lig_com_drift = LigandCOMDrift(ligand).run(step=skip) output["ligand_wander"].append(lig_com_drift.results.com_drift) From 19ddaba1683d2d6739e48ea96568d2444f00c498 Mon Sep 17 00:00:00 2001 From: hannahbaumann Date: Fri, 27 Feb 2026 13:12:01 +0100 Subject: [PATCH 03/34] First attempt at implementing a symmetry corrected RMSD --- environment.yml | 1 + src/openfe_analysis/rmsd.py | 72 ++++++++++++++++++++++++++++++++++++- 2 files changed, 72 insertions(+), 1 deletion(-) diff --git a/environment.yml b/environment.yml index e48a81f..168e14b 100644 --- a/environment.yml +++ b/environment.yml @@ -16,3 +16,4 @@ dependencies: - pytest-cov - pytest-xdist - pytest-rerunfailures + - spyrmsd diff --git a/src/openfe_analysis/rmsd.py b/src/openfe_analysis/rmsd.py index 5591da4..300c310 100644 --- a/src/openfe_analysis/rmsd.py +++ b/src/openfe_analysis/rmsd.py @@ -5,9 +5,11 @@ import MDAnalysis as mda import netCDF4 as nc import numpy as np +import spyrmsd.rmsd as srmsd from MDAnalysis.analysis import rms from MDAnalysis.analysis.base import AnalysisBase from MDAnalysis.transformations import unwrap +from rdkit.Chem import rdmolops from .reader import FEReader from .transformations import Aligner, ClosestImageShift, NoJump @@ -187,6 +189,69 @@ def _conclude(self): self.results.rmsd = np.asarray(self.results.rmsd) +class SymmetryCorrectedLigandRMSD(AnalysisBase): + """ + 1D RMSD time series for an AtomGroup. + + Parameters + ---------- + atomgroup : MDAnalysis.AtomGroup + Atoms to compute RMSD for. + mass_weighted : bool, optional + If True, compute mass-weighted RMSD. + """ + + def __init__(self, atomgroup, mass_weighted=False, **kwargs): + super(SymmetryCorrectedLigandRMSD, self).__init__(atomgroup.universe.trajectory, **kwargs) + + self._ag = atomgroup + self._mass_weighted = mass_weighted + self._isomorphisms = None + vdwradii = { + "Cl": 1.75, + "CL": 1.75, + "Br": 1.85, + "BR": 1.85, + "Na": 2.27, + "NA": 2.27, + } + atomgroup.guess_bonds(vdwradii) + self._mol = atomgroup.convert_to("RDKIT") + self._aprops = np.array([atom.GetAtomicNum() for atom in self._mol.GetAtoms()]) + self._am = rdmolops.GetAdjacencyMatrix(self._mol) + + def _prepare(self): + self.results.rmsd = [] + self._reference = self._ag.positions + self._ref_aprops = self._aprops + self._ref_am = self._am + + if self._mass_weighted: + self._weights = self._ag.masses / np.mean(self._ag.masses) + else: + self._weights = None + + def _single_frame(self): + coords = self._ag.positions.copy() + rmsd, isomorphisms, _ = srmsd._rmsd_isomorphic_core( + coords1=coords, + coords2=self._reference, + aprops1=self._aprops, + aprops2=self._ref_aprops, + am1=self._am, + am2=self._ref_am, + center=False, + minimize=False, + isomorphisms=self._isomorphisms, + ) + self.results.rmsd.append(rmsd) + if self._isomorphisms is None: + self._isomorphisms = isomorphisms + + def _conclude(self): + self.results.rmsd = np.asarray(self.results.rmsd) + + class LigandCOMDrift(AnalysisBase): """ Ligand center-of-mass displacement from initial position. @@ -280,9 +345,13 @@ def gather_rms_data( # cheeky, but we can read the PDB topology once and reuse per universe # this then only hits the PDB file once for all replicas u = make_Universe(u_top._topology, ds, state=i) + bfactor = 0.25 + state_atoms = np.array([atom.ix for atom in u.atoms if atom.bfactor in (bfactor, 0.5)]) + state = u.atoms[state_atoms] prot = u.select_atoms("protein and name CA") ligand = u.select_atoms("resname UNK") + state_lig = state.select_atoms("resname UNK") if prot: prot_rmsd = RMSDAnalysis(prot).run(step=skip) @@ -293,7 +362,8 @@ def gather_rms_data( output["protein_2D_RMSD"].append(prot_rmsd2d.results.rmsd2d) if ligand: - lig_rmsd = RMSDAnalysis(ligand, mass_weighted=True).run(step=skip) + # lig_rmsd = RMSDAnalysis(ligand, mass_weighted=True).run(step=skip) + lig_rmsd = SymmetryCorrectedLigandRMSD(state_lig, mass_weighted=True).run(step=skip) output["ligand_RMSD"].append(lig_rmsd.results.rmsd) # weight = ligand.masses / np.mean(ligand.masses) # lig_rmsd = rms.RMSD(ligand, weights=weight).run(step=skip) From df017da2a32384213bf6ff0974622d74e43b28f3 Mon Sep 17 00:00:00 2001 From: hannahbaumann Date: Mon, 2 Mar 2026 15:05:53 +0100 Subject: [PATCH 04/34] Add reference and superposition option --- src/openfe_analysis/rmsd.py | 41 +++++++++++++++++++++++++++++-------- 1 file changed, 32 insertions(+), 9 deletions(-) diff --git a/src/openfe_analysis/rmsd.py b/src/openfe_analysis/rmsd.py index 5591da4..6576100 100644 --- a/src/openfe_analysis/rmsd.py +++ b/src/openfe_analysis/rmsd.py @@ -154,19 +154,26 @@ class RMSDAnalysis(AnalysisBase): ---------- atomgroup : MDAnalysis.AtomGroup Atoms to compute RMSD for. + reference: Optional[MDAnalysis.AtomGroup] + Reference AtomGroup. If None, the first mass_weighted : bool, optional If True, compute mass-weighted RMSD. """ - def __init__(self, atomgroup, mass_weighted=False, **kwargs): + def __init__( + self, atomgroup, reference=None, mass_weighted=False, superposition=False, **kwargs + ): super(RMSDAnalysis, self).__init__(atomgroup.universe.trajectory, **kwargs) self._ag = atomgroup + self._reference = reference if reference is not None else self._ag self._mass_weighted = mass_weighted + self._superposition = superposition def _prepare(self): self.results.rmsd = [] - self._reference = self._ag.positions + + self._reference_pos = self._reference.positions if self._mass_weighted: self._weights = self._ag.masses / np.mean(self._ag.masses) @@ -176,10 +183,10 @@ def _prepare(self): def _single_frame(self): rmsd = rms.rmsd( self._ag.positions, - self._reference, + self._reference_pos, self._weights, center=False, - superposition=False, + superposition=self._superposition, ) self.results.rmsd.append(rmsd) @@ -287,17 +294,33 @@ def gather_rms_data( if prot: prot_rmsd = RMSDAnalysis(prot).run(step=skip) output["protein_RMSD"].append(prot_rmsd.results.rmsd) - # prot_rmsd = rms.RMSD(prot).run(step=skip) - # output["protein_RMSD"].append(prot_rmsd.results.rmsd.T[2]) + # # Using the MDAnalysis RMSD class instead + # gs = ["protein and name CA", "protein"] + # prot_rmsd = rms.RMSD( + # u, select="protein and name CA", groupselections=gs, weights="mass") + # prot_rmsd.run(step=skip) + # # The results contain: + # # - frame number + # # - time + # # - RMSD based on select (after superimposing) + # # - RMSD based on groupselections, one array per selection + # output["protein_RMSD"].append(prot_rmsd.results.rmsd.T[3]) prot_rmsd2d = Protein2DRMSD(prot).run(step=skip) output["protein_2D_RMSD"].append(prot_rmsd2d.results.rmsd2d) if ligand: lig_rmsd = RMSDAnalysis(ligand, mass_weighted=True).run(step=skip) output["ligand_RMSD"].append(lig_rmsd.results.rmsd) - # weight = ligand.masses / np.mean(ligand.masses) - # lig_rmsd = rms.RMSD(ligand, weights=weight).run(step=skip) - # output["ligand_RMSD"].append(lig_rmsd.results.rmsd.T[2]) + # # Using the MDAnalysis RMSD class instead + # groupselections = ["resname UNK"] + # lig_rmsd = rms.RMSD( + # u, + # select="protein and name CA", + # groupselections=groupselections, + # weights="mass", + # ) + # lig_rmsd.run(step=skip) + # output["ligand_RMSD"].append(lig_rmsd.results.rmsd.T[3]) lig_com_drift = LigandCOMDrift(ligand).run(step=skip) output["ligand_wander"].append(lig_com_drift.results.com_drift) From d824ab6e65a92a3722dd738958742a1643d08399 Mon Sep 17 00:00:00 2001 From: Hannah Baumann <43765638+hannahbaumann@users.noreply.github.com> Date: Mon, 2 Mar 2026 15:18:04 +0100 Subject: [PATCH 05/34] Apply suggestion from @hannahbaumann --- src/openfe_analysis/rmsd.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/openfe_analysis/rmsd.py b/src/openfe_analysis/rmsd.py index 6576100..f653fcd 100644 --- a/src/openfe_analysis/rmsd.py +++ b/src/openfe_analysis/rmsd.py @@ -155,7 +155,7 @@ class RMSDAnalysis(AnalysisBase): atomgroup : MDAnalysis.AtomGroup Atoms to compute RMSD for. reference: Optional[MDAnalysis.AtomGroup] - Reference AtomGroup. If None, the first + Reference AtomGroup. If None, the first frame of the trajectory will be used. mass_weighted : bool, optional If True, compute mass-weighted RMSD. """ From e16b2e34cf2fe03758e7035aab1555045cdcccef Mon Sep 17 00:00:00 2001 From: hannahbaumann Date: Mon, 13 Apr 2026 10:59:10 +0200 Subject: [PATCH 06/34] Some changes --- src/openfe_analysis/rmsd.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/openfe_analysis/rmsd.py b/src/openfe_analysis/rmsd.py index a0305f4..1c84e35 100644 --- a/src/openfe_analysis/rmsd.py +++ b/src/openfe_analysis/rmsd.py @@ -5,7 +5,7 @@ import MDAnalysis as mda import netCDF4 as nc import numpy as np -from MDAnalysis.analysis import rms +from MDAnalysis.analysis import diffusionmap, rms from MDAnalysis.analysis.base import AnalysisBase from MDAnalysis.transformations import unwrap @@ -310,6 +310,9 @@ def gather_rms_data( # output["protein_RMSD"].append(prot_rmsd.results.rmsd.T[3]) prot_rmsd2d = Protein2DRMSD(prot).run(step=skip) output["protein_2D_RMSD"].append(prot_rmsd2d.results.rmsd2d) + prot_rmsd2d = diffusionmap.DistanceMatrix(u, select="protein") + prot_rmsd2d.run(step=skip) + output["protein_2D_RMSD"].append(prot_rmsd2d.results.dist_matrix) if ligand: lig_rmsd = RMSDAnalysis(ligand, mass_weighted=True).run(step=skip) From 616bf42b0f422258c36bf614dbaeb8b7f8a263d0 Mon Sep 17 00:00:00 2001 From: hannahbaumann Date: Mon, 13 Apr 2026 11:41:49 +0200 Subject: [PATCH 07/34] Fix mda 2D analysis example --- src/openfe_analysis/rmsd.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/openfe_analysis/rmsd.py b/src/openfe_analysis/rmsd.py index 1c84e35..1e068fc 100644 --- a/src/openfe_analysis/rmsd.py +++ b/src/openfe_analysis/rmsd.py @@ -298,7 +298,7 @@ def gather_rms_data( prot_rmsd = RMSDAnalysis(prot).run(step=skip) output["protein_RMSD"].append(prot_rmsd.results.rmsd) # # Using the MDAnalysis RMSD class instead - # gs = ["protein and name CA", "protein"] + # gs = ["protein and name CA"] # prot_rmsd = rms.RMSD( # u, select="protein and name CA", groupselections=gs, weights="mass") # prot_rmsd.run(step=skip) @@ -308,11 +308,16 @@ def gather_rms_data( # # - RMSD based on select (after superimposing) # # - RMSD based on groupselections, one array per selection # output["protein_RMSD"].append(prot_rmsd.results.rmsd.T[3]) + prot_rmsd2d = Protein2DRMSD(prot).run(step=skip) output["protein_2D_RMSD"].append(prot_rmsd2d.results.rmsd2d) - prot_rmsd2d = diffusionmap.DistanceMatrix(u, select="protein") - prot_rmsd2d.run(step=skip) - output["protein_2D_RMSD"].append(prot_rmsd2d.results.dist_matrix) + # # Using the MDAnalysis DistanceMatrix class + # prot_rmsd2d = diffusionmap.DistanceMatrix(u, select="protein and name CA") + # prot_rmsd2d.run(step=skip) + # dist_mat = prot_rmsd2d.results.dist_matrix + # i, j = np.triu_indices_from(dist_mat, k=1) + # flattened = dist_mat[i, j] + # output["protein_2D_RMSD"].append(flattened) if ligand: lig_rmsd = RMSDAnalysis(ligand, mass_weighted=True).run(step=skip) From 90ed39ef7d000405e09a7f83decd96b92430f7fc Mon Sep 17 00:00:00 2001 From: hannahbaumann Date: Mon, 13 Apr 2026 13:40:32 +0200 Subject: [PATCH 08/34] Add RMSD test using mda test data --- environment.yml | 1 + .../tests/test_rmsd_mda_data.py | 80 +++++++++++++++++++ 2 files changed, 81 insertions(+) create mode 100644 src/openfe_analysis/tests/test_rmsd_mda_data.py diff --git a/environment.yml b/environment.yml index e48a81f..f65c4df 100644 --- a/environment.yml +++ b/environment.yml @@ -4,6 +4,7 @@ channels: dependencies: - click - MDAnalysis + - MDAnalysisTests - netCDF4 - openff-units - pip diff --git a/src/openfe_analysis/tests/test_rmsd_mda_data.py b/src/openfe_analysis/tests/test_rmsd_mda_data.py new file mode 100644 index 0000000..83f95ef --- /dev/null +++ b/src/openfe_analysis/tests/test_rmsd_mda_data.py @@ -0,0 +1,80 @@ +import MDAnalysis as mda +import pytest +from MDAnalysisTests.datafiles import DCD, PSF +from numpy.testing import assert_allclose, assert_almost_equal + +from openfe_analysis.rmsd import RMSDAnalysis + + +@pytest.fixture +def mda_universe(): + return mda.Universe(PSF, DCD) + + +@pytest.fixture() +def correct_values(): + return [0, 4.68953] + + +@pytest.fixture() +def correct_values_mass(): + return [0, 4.74920] + + +def test_rmsd(mda_universe, correct_values): + prot = mda_universe.select_atoms("name CA") + prot_rmsd = RMSDAnalysis(prot, superposition=True).run(step=49) + assert_almost_equal( + prot_rmsd.results.rmsd, + correct_values, + 4, + err_msg="error: rmsd profile should match" + "test values", + ) + + +def test_rmsd_frames(mda_universe, correct_values): + prot = mda_universe.select_atoms("name CA") + prot_rmsd = RMSDAnalysis(prot, superposition=True).run(frames=[0, 49]) + assert_almost_equal( + prot_rmsd.results.rmsd, + correct_values, + 4, + err_msg="error: rmsd profile should match" + "test values", + ) + + +def test_rmsd_single_frame(mda_universe): + prot = mda_universe.select_atoms("name CA") + prot_rmsd = RMSDAnalysis(prot, superposition=True).run(start=5, stop=6) + single_frame = [0.91544906] + assert_almost_equal( + prot_rmsd.results.rmsd, + single_frame, + 4, + err_msg="error: rmsd profile should match" + "test values", + ) + + +def test_mass_weighted(mda_universe, correct_values): + # mass weighting the CA should give the same answer as weighing + # equally because all CA have the same mass + prot = mda_universe.select_atoms("name CA") + prot_rmsd = RMSDAnalysis(prot, superposition=True, mass_weighted=True).run(step=49) + + assert_almost_equal( + prot_rmsd.results.rmsd, + correct_values, + 4, + err_msg="error: rmsd profile should matchtest values", + ) + + +def test_custom_weighted(mda_universe, correct_values_mass): + prot = mda_universe.select_atoms("all") + prot_rmsd = RMSDAnalysis(prot, superposition=True, mass_weighted=True).run(step=49) + assert_almost_equal( + prot_rmsd.results.rmsd, + correct_values_mass, + 4, + err_msg="error: rmsd profile should matchtest values", + ) From 7d6e49fde33281964f76289e283088c40a0a52d6 Mon Sep 17 00:00:00 2001 From: hannahbaumann Date: Tue, 19 May 2026 09:51:30 +0200 Subject: [PATCH 09/34] Some fixes --- src/openfe_analysis/rmsd.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/openfe_analysis/rmsd.py b/src/openfe_analysis/rmsd.py index 1e068fc..4b5fb2e 100644 --- a/src/openfe_analysis/rmsd.py +++ b/src/openfe_analysis/rmsd.py @@ -127,7 +127,7 @@ def _prepare(self): self.results.rmsd2d = [] def _single_frame(self): - self._coords.append(self._ag.positions) + self._coords.append(self._ag.positions.copy()) def _conclude(self): positions = np.asarray(self._coords) @@ -175,7 +175,7 @@ def __init__( def _prepare(self): self.results.rmsd = [] - self._reference_pos = self._reference.positions + self._reference_pos = self._reference.positions.copy() if self._mass_weighted: self._weights = self._ag.masses / np.mean(self._ag.masses) @@ -335,6 +335,6 @@ def gather_rms_data( lig_com_drift = LigandCOMDrift(ligand).run(step=skip) output["ligand_wander"].append(lig_com_drift.results.com_drift) - output["time(ps)"] = np.arange(len(u.trajectory))[::skip] * u.trajectory.dt + output["time(ps)"] = np.arange(len(u.trajectory))[::skip] * u.trajectory.dt return output From 60478a9dba44c190420c69631b108a6bf0d44f3e Mon Sep 17 00:00:00 2001 From: hannahbaumann Date: Tue, 19 May 2026 10:19:13 +0200 Subject: [PATCH 10/34] Some updates --- src/openfe_analysis/rmsd.py | 74 +++++++++++++++++++++++-------------- 1 file changed, 47 insertions(+), 27 deletions(-) diff --git a/src/openfe_analysis/rmsd.py b/src/openfe_analysis/rmsd.py index 300c310..6098369 100644 --- a/src/openfe_analysis/rmsd.py +++ b/src/openfe_analysis/rmsd.py @@ -8,12 +8,17 @@ import spyrmsd.rmsd as srmsd from MDAnalysis.analysis import rms from MDAnalysis.analysis.base import AnalysisBase +from MDAnalysis.guesser.tables import vdwradii as MDA_VDWRADII from MDAnalysis.transformations import unwrap from rdkit.Chem import rdmolops from .reader import FEReader from .transformations import Aligner, ClosestImageShift, NoJump +# B-factor values used to identify atoms present at a given lambda state. +# 0.25 marks atoms unique to one end state, 0.5 marks atoms shared by both. +_BFACTOR_STATE_VALUES = (0.25, 0.5) + def make_Universe(top: pathlib.Path, trj: nc.Dataset, state: int) -> mda.Universe: """ @@ -202,19 +207,20 @@ class SymmetryCorrectedLigandRMSD(AnalysisBase): """ def __init__(self, atomgroup, mass_weighted=False, **kwargs): - super(SymmetryCorrectedLigandRMSD, self).__init__(atomgroup.universe.trajectory, **kwargs) - + super().__init__(atomgroup.universe.trajectory, **kwargs) self._ag = atomgroup self._mass_weighted = mass_weighted self._isomorphisms = None - vdwradii = { - "Cl": 1.75, - "CL": 1.75, - "Br": 1.85, - "BR": 1.85, - "Na": 2.27, - "NA": 2.27, - } + + vdwradii = dict(MDA_VDWRADII) + vdwradii.update( + { + "Cl": vdwradii["CL"], + "Br": vdwradii["BR"], + "Na": vdwradii["NA"], + } + ) + atomgroup.guess_bonds(vdwradii) self._mol = atomgroup.convert_to("RDKIT") self._aprops = np.array([atom.GetAtomicNum() for atom in self._mol.GetAtoms()]) @@ -222,9 +228,7 @@ def __init__(self, atomgroup, mass_weighted=False, **kwargs): def _prepare(self): self.results.rmsd = [] - self._reference = self._ag.positions - self._ref_aprops = self._aprops - self._ref_am = self._am + self._reference = self._ag.positions.copy() if self._mass_weighted: self._weights = self._ag.masses / np.mean(self._ag.masses) @@ -232,19 +236,18 @@ def _prepare(self): self._weights = None def _single_frame(self): - coords = self._ag.positions.copy() - rmsd, isomorphisms, _ = srmsd._rmsd_isomorphic_core( - coords1=coords, + frame_rmsd, isomorphisms, _ = srmsd._rmsd_isomorphic_core( + coords1=self._ag.positions.copy(), coords2=self._reference, aprops1=self._aprops, - aprops2=self._ref_aprops, + aprops2=self._aprops, am1=self._am, - am2=self._ref_am, + am2=self._am, center=False, minimize=False, isomorphisms=self._isomorphisms, ) - self.results.rmsd.append(rmsd) + self.results.rmsd.append(frame_rmsd) if self._isomorphisms is None: self._isomorphisms = isomorphisms @@ -279,6 +282,27 @@ def _conclude(self): self.results.com_drift = np.asarray(self.results.com_drift) +def _select_state_ligand(u: mda.Universe) -> mda.AtomGroup: + """ + Select ligand atoms that are present at the current lambda state. + + Atoms are identified by their b-factor values: ``0.25`` marks atoms + unique to one end state and ``0.5`` marks atoms shared by both end + states. Only atoms with these b-factor values and residue name "UNK" + are included. + + Parameters + ---------- + u : mda.Universe + + Returns + ------- + MDAnalysis.AtomGroup + """ + state_indices = np.array([atom.ix for atom in u.atoms if atom.bfactor in _BFACTOR_STATE_VALUES]) + return u.atoms[state_indices].select_atoms("resname UNK") + + def gather_rms_data( pdb_topology: pathlib.Path, dataset: pathlib.Path, skip: Optional[int] = None ) -> dict[str, list[float]]: @@ -341,17 +365,13 @@ def gather_rms_data( u_top = mda.Universe(pdb_topology) - for i in range(n_lambda): + for state_idx in range(n_lambda): # cheeky, but we can read the PDB topology once and reuse per universe # this then only hits the PDB file once for all replicas - u = make_Universe(u_top._topology, ds, state=i) - bfactor = 0.25 - state_atoms = np.array([atom.ix for atom in u.atoms if atom.bfactor in (bfactor, 0.5)]) - state = u.atoms[state_atoms] - + u = make_Universe(u_top._topology, ds, state=state_idx) prot = u.select_atoms("protein and name CA") ligand = u.select_atoms("resname UNK") - state_lig = state.select_atoms("resname UNK") + state_lig = _select_state_ligand(u) if prot: prot_rmsd = RMSDAnalysis(prot).run(step=skip) @@ -361,7 +381,7 @@ def gather_rms_data( prot_rmsd2d = Protein2DRMSD(prot).run(step=skip) output["protein_2D_RMSD"].append(prot_rmsd2d.results.rmsd2d) - if ligand: + if state_lig.n_atoms > 0: # lig_rmsd = RMSDAnalysis(ligand, mass_weighted=True).run(step=skip) lig_rmsd = SymmetryCorrectedLigandRMSD(state_lig, mass_weighted=True).run(step=skip) output["ligand_RMSD"].append(lig_rmsd.results.rmsd) From 4bf598f3901e0feccb1d1d4982cc14ede26e05cc Mon Sep 17 00:00:00 2001 From: hannahbaumann Date: Tue, 19 May 2026 10:26:24 +0200 Subject: [PATCH 11/34] Some more updates --- src/openfe_analysis/rmsd.py | 41 ++++++++++++++++++++++--------------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/src/openfe_analysis/rmsd.py b/src/openfe_analysis/rmsd.py index 4b5fb2e..23c2680 100644 --- a/src/openfe_analysis/rmsd.py +++ b/src/openfe_analysis/rmsd.py @@ -117,8 +117,7 @@ def __init__(self, atomgroup, weights=None, **kwargs): Per-atom weights to use in the RMSD calculation. If ``None``, all atoms are weighted equally. """ - super(Protein2DRMSD, self).__init__(atomgroup.universe.trajectory, **kwargs) - + super().__init__(atomgroup.universe.trajectory, **kwargs) self._weights = weights self._ag = atomgroup @@ -131,19 +130,19 @@ def _single_frame(self): def _conclude(self): positions = np.asarray(self._coords) - nframes, _, _ = positions.shape + nframes = positions.shape[0] output = [] for i, j in itertools.combinations(range(nframes), 2): posi, posj = positions[i], positions[j] - rmsd = rms.rmsd( + pair_rmsd = rms.rmsd( posi, posj, self._weights, center=True, superposition=True, ) - output.append(rmsd) + output.append(pair_rmsd) self.results.rmsd2d = np.asarray(output) @@ -157,15 +156,24 @@ class RMSDAnalysis(AnalysisBase): atomgroup : MDAnalysis.AtomGroup Atoms to compute RMSD for. reference: Optional[MDAnalysis.AtomGroup] - Reference AtomGroup. If None, the first frame of the trajectory will be used. + Reference AtomGroup. If ``None``, the reference positions are captured + from the mobile AtomGroup at the start of the run (i.e. whatever frame + the trajectory is on when ``.run()`` is called). mass_weighted : bool, optional If True, compute mass-weighted RMSD. + superposition : bool, optional + If ``True``, perform rotational superposition before computing RMSD. """ def __init__( - self, atomgroup, reference=None, mass_weighted=False, superposition=False, **kwargs + self, + atomgroup, + reference=None, + mass_weighted=False, + superposition=False, + **kwargs, ): - super(RMSDAnalysis, self).__init__(atomgroup.universe.trajectory, **kwargs) + super().__init__(atomgroup.universe.trajectory, **kwargs) self._ag = atomgroup self._reference = reference if reference is not None else self._ag @@ -183,14 +191,14 @@ def _prepare(self): self._weights = None def _single_frame(self): - rmsd = rms.rmsd( + frame_rmsd = rms.rmsd( self._ag.positions, self._reference_pos, self._weights, center=False, superposition=self._superposition, ) - self.results.rmsd.append(rmsd) + self.results.rmsd.append(frame_rmsd) def _conclude(self): self.results.rmsd = np.asarray(self.results.rmsd) @@ -202,8 +210,7 @@ class LigandCOMDrift(AnalysisBase): """ def __init__(self, atomgroup, **kwargs): - super(LigandCOMDrift, self).__init__(atomgroup.universe.trajectory, **kwargs) - + super().__init__(atomgroup.universe.trajectory, **kwargs) self._ag = atomgroup def _prepare(self): @@ -224,7 +231,9 @@ def _conclude(self): def gather_rms_data( - pdb_topology: pathlib.Path, dataset: pathlib.Path, skip: Optional[int] = None + pdb_topology: pathlib.Path, + dataset: pathlib.Path, + skip: Optional[int] = None, ) -> dict[str, list[float]]: """ Compute structural RMSD-based metrics for a multistate BFE simulation. @@ -286,10 +295,10 @@ def gather_rms_data( u_top = mda.Universe(pdb_topology) - for i in range(n_lambda): + for state_idx in range(n_lambda): # cheeky, but we can read the PDB topology once and reuse per universe # this then only hits the PDB file once for all replicas - u = make_Universe(u_top._topology, ds, state=i) + u = make_Universe(u_top._topology, ds, state=state_idx) prot = u.select_atoms("protein and name CA") ligand = u.select_atoms("resname UNK") @@ -319,7 +328,7 @@ def gather_rms_data( # flattened = dist_mat[i, j] # output["protein_2D_RMSD"].append(flattened) - if ligand: + if ligand.n_atoms > 0: lig_rmsd = RMSDAnalysis(ligand, mass_weighted=True).run(step=skip) output["ligand_RMSD"].append(lig_rmsd.results.rmsd) # # Using the MDAnalysis RMSD class instead From c73260b35fd5702dc7bd6392611aef060dbf6b72 Mon Sep 17 00:00:00 2001 From: hannahbaumann Date: Tue, 19 May 2026 11:50:09 +0200 Subject: [PATCH 12/34] Add tests symmetry RMSD --- src/openfe_analysis/tests/test_rmsd.py | 84 +++++++++++++++++++++++++- 1 file changed, 83 insertions(+), 1 deletion(-) diff --git a/src/openfe_analysis/tests/test_rmsd.py b/src/openfe_analysis/tests/test_rmsd.py index a9e8b09..4055dc8 100644 --- a/src/openfe_analysis/tests/test_rmsd.py +++ b/src/openfe_analysis/tests/test_rmsd.py @@ -4,13 +4,21 @@ import netCDF4 as nc import numpy as np import pytest +import spyrmsd.rmsd as srmsd from MDAnalysis.analysis import rms from MDAnalysis.lib.mdamath import make_whole from MDAnalysis.transformations import unwrap from numpy.testing import assert_allclose +from rdkit.Chem import rdmolops from openfe_analysis.reader import FEReader -from openfe_analysis.rmsd import gather_rms_data, make_Universe +from openfe_analysis.rmsd import ( + RMSDAnalysis, + SymmetryCorrectedLigandRMSD, + _select_state_ligand, + gather_rms_data, + make_Universe, +) from openfe_analysis.transformations import Aligner @@ -177,3 +185,77 @@ def test_ligand_com_continuity(mda_universe): assert max(jumps) < 5.0 u.trajectory.close() + + +def test_symmetry_corrected_ligand_rmsd_nonnegative(mda_universe): + """RMSD values must be non-negative for all frames.""" + u = mda_universe + state_lig = _select_state_ligand(u) + + result = SymmetryCorrectedLigandRMSD(state_lig).run() + + assert np.all(result.results.rmsd >= 0.0) + + +def test_symmetry_corrected_ligand_rmsd_zero_for_valid_swap(): + """ + For a water-like symmetric molecule, swapping the two equivalent H atoms + gives naive RMSD > 0 but SymmetryCorrectedLigandRMSD = 0. + """ + # Build a minimal universe with two frames: reference and swapped + coords_ref = np.array( + [ + [0.0, 0.0, 0.0], # O + [1.0, 0.0, 0.0], # H1 + [0.0, 1.0, 0.0], # H2 + ] + ) + coords_swapped = np.array( + [ + [0.0, 0.0, 0.0], # O + [0.0, 1.0, 0.0], # H2 in H1's slot + [1.0, 0.0, 0.0], # H1 in H2's slot + ] + ) + + u = mda.Universe.empty(3, trajectory=True) + u.add_TopologyAttr("elements", ["O", "H", "H"]) + u.add_TopologyAttr("names", ["O", "H1", "H2"]) + u.add_TopologyAttr("resnames", ["UNK"]) + u.add_TopologyAttr("resids", [1]) + u.load_new( + np.array([coords_ref, coords_swapped]), + order="fac", + ) + + ag = u.select_atoms("all") + + corrected = SymmetryCorrectedLigandRMSD(ag).run() + naive = RMSDAnalysis(ag).run() + + # Frame 0 is reference — both should be 0 + assert corrected.results.rmsd[0] == pytest.approx(0.0, abs=1e-5) + assert naive.results.rmsd[0] == pytest.approx(0.0, abs=1e-5) + + # Frame 1 is the swap — naive sees displacement, corrected sees zero + assert naive.results.rmsd[1] > 0.0 + assert corrected.results.rmsd[1] == pytest.approx(0.0, abs=1e-5) + + +def test_ligand_rmsd_mass_weighting_effect(simulation_skipped_nc, hybrid_system_skipped_pdb): + with nc.Dataset(simulation_skipped_nc) as ds: + u_top = mda.Universe(hybrid_system_skipped_pdb) + u = make_Universe(u_top._topology, ds, state=0) + ligand = u.select_atoms("resname UNK") + state_lig = _select_state_ligand(u) + + rmsd_full_mw = RMSDAnalysis(ligand, mass_weighted=True).run() + rmsd_full_no_mw = RMSDAnalysis(ligand, mass_weighted=False).run() + rmsd_state_mw = RMSDAnalysis(state_lig, mass_weighted=True).run() + rmsd_state_no_mw = RMSDAnalysis(state_lig, mass_weighted=False).run() + + print(f"Full ligand, mass weighted: {rmsd_full_mw.results.rmsd[:6]}") + print(f"Full ligand, no mass weighting: {rmsd_full_no_mw.results.rmsd[:6]}") + print(f"State ligand, mass weighted: {rmsd_state_mw.results.rmsd[:6]}") + print(f"State ligand, no mass weighting: {rmsd_state_no_mw.results.rmsd[:6]}") + print("Old expected: [0.0, 1.092039, 0.839234, 1.228383, 1.533331, 1.276798]") From 80b984d82f6a6e24cb43c7bd9dd569194e483de4 Mon Sep 17 00:00:00 2001 From: Hannah Baumann <43765638+hannahbaumann@users.noreply.github.com> Date: Fri, 22 May 2026 09:22:41 +0200 Subject: [PATCH 13/34] Update src/openfe_analysis/rmsd.py Co-authored-by: Irfan Alibay --- src/openfe_analysis/rmsd.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/openfe_analysis/rmsd.py b/src/openfe_analysis/rmsd.py index 23c2680..7caf3db 100644 --- a/src/openfe_analysis/rmsd.py +++ b/src/openfe_analysis/rmsd.py @@ -328,7 +328,7 @@ def gather_rms_data( # flattened = dist_mat[i, j] # output["protein_2D_RMSD"].append(flattened) - if ligand.n_atoms > 0: + if ligand: lig_rmsd = RMSDAnalysis(ligand, mass_weighted=True).run(step=skip) output["ligand_RMSD"].append(lig_rmsd.results.rmsd) # # Using the MDAnalysis RMSD class instead From db61dc4ef7c03fa2eab4b54614d42b051c880af9 Mon Sep 17 00:00:00 2001 From: hannahbaumann Date: Fri, 22 May 2026 11:02:40 +0200 Subject: [PATCH 14/34] Address review comments --- src/openfe_analysis/rmsd.py | 167 ++++++++++++++++++------------------ 1 file changed, 84 insertions(+), 83 deletions(-) diff --git a/src/openfe_analysis/rmsd.py b/src/openfe_analysis/rmsd.py index 7caf3db..eb98eec 100644 --- a/src/openfe_analysis/rmsd.py +++ b/src/openfe_analysis/rmsd.py @@ -52,7 +52,7 @@ def make_Universe(top: pathlib.Path, trj: nc.Dataset, state: int) -> mda.Univers - Unwraps protein and ligand atom to be made whole - Shifts protein chains and the ligand to the image closest to the first protein chain (:class:`ClosestImageShift`) - - Aligns the entire system to minimise the protein RMSD (:class:`Aligner`) + - Aligns the entire system to minimize the protein RMSD (:class:`Aligner`) If only a ligand is present: @@ -87,7 +87,7 @@ def make_Universe(top: pathlib.Path, trj: nc.Dataset, state: int) -> mda.Univers else: # if there's no protein # - make the ligand not jump periodic images between frames - # - align the ligand to minimise its RMSD + # - align the ligand to minimize its RMSD nope = NoJump(ligand) align = Aligner(ligand) @@ -105,46 +105,54 @@ class Protein2DRMSD(AnalysisBase): For all unique frame pairs ``(i, j)`` with ``i < j``, this function computes the RMSD between atomic coordinates after optimal alignment. + Alignment is performed by centering each frame on its center of geometry, + followed by rotational and translational superposition using the QCP method. + + Parameters + ---------- + atomgroup: mda.AtomGroup + Protein atoms (e.g. CA selection) + weights: np.ndarray, optional + Per-atom weights to use in the RMSD calculation. If ``None``, + all atoms are weighted equally. + + .. note:: + + All atom positions are accumulated in memory during the trajectory + iteration. For long trajectories or large systems this may result in + significant memory usage. Consider using the ``step`` argument to + ``run()`` to reduce the number of frames analyzed. """ - def __init__(self, atomgroup, weights=None, **kwargs): - """ - Parameters - ---------- - atomgroup: AtomGroup - Protein atoms (e.g. CA selection) - weights: np.ndarray, optional - Per-atom weights to use in the RMSD calculation. If ``None``, - all atoms are weighted equally. - """ + _analysis_algorithm_is_parallelizable = False + + def __init__(self, atomgroup: mda.AtomGroup, weights: Optional[np.ndarray] = None, **kwargs): super().__init__(atomgroup.universe.trajectory, **kwargs) self._weights = weights self._ag = atomgroup - def _prepare(self): - self._coords = [] - self.results.rmsd2d = [] + def _prepare(self) -> None: + self._coords = np.zeros((self.n_frames, self._ag.n_atoms, 3), dtype=np.float64) - def _single_frame(self): - self._coords.append(self._ag.positions.copy()) + def _single_frame(self) -> None: + self._coords[self._frame_index] = self._ag.positions - def _conclude(self): - positions = np.asarray(self._coords) - nframes = positions.shape[0] + def _conclude(self) -> None: + nframes = self._coords.shape[0] - output = [] - for i, j in itertools.combinations(range(nframes), 2): - posi, posj = positions[i], positions[j] - pair_rmsd = rms.rmsd( + # Pre-allocate numpy arrays + n_pairs = nframes * (nframes - 1) // 2 + self.results.rmsd2d = np.empty(n_pairs) + + for idx, (i, j) in enumerate(itertools.combinations(range(nframes), 2)): + posi, posj = self._coords[i], self._coords[j] + self.results.rmsd2d[idx] = rms.rmsd( posi, posj, self._weights, center=True, superposition=True, ) - output.append(pair_rmsd) - - self.results.rmsd2d = np.asarray(output) class RMSDAnalysis(AnalysisBase): @@ -156,21 +164,29 @@ class RMSDAnalysis(AnalysisBase): atomgroup : MDAnalysis.AtomGroup Atoms to compute RMSD for. reference: Optional[MDAnalysis.AtomGroup] - Reference AtomGroup. If ``None``, the reference positions are captured - from the mobile AtomGroup at the start of the run (i.e. whatever frame - the trajectory is on when ``.run()`` is called). + Reference AtomGroup. If ``None``, the reference positions are taken + from the first analyzed frame, so ``run(start=10)`` measures RMSD + relative to frame 10, not frame 0. mass_weighted : bool, optional If True, compute mass-weighted RMSD. + center : bool, optional + If ``True``, subtract the center of geometry before computing RMSD. + Defaults to ``False`` as the trajectory is assumed to be pre-centered. superposition : bool, optional If ``True``, perform rotational superposition before computing RMSD. + Defaults to ``False`` as the trajectory is assumed to be pre-superposed. + """ + _analysis_algorithm_is_parallelizable = False + def __init__( self, - atomgroup, - reference=None, - mass_weighted=False, - superposition=False, + atomgroup: mda.AtomGroup, + reference: Optional[mda.AtomGroup] = None, + mass_weighted: bool = False, + center: bool = False, + superposition: bool = False, **kwargs, ): super().__init__(atomgroup.universe.trajectory, **kwargs) @@ -178,11 +194,12 @@ def __init__( self._ag = atomgroup self._reference = reference if reference is not None else self._ag self._mass_weighted = mass_weighted + self._center = center self._superposition = superposition - def _prepare(self): - self.results.rmsd = [] - + def _prepare(self) -> None: + self.results.rmsd = np.zeros(self.n_frames, dtype=np.float64) + # reference is taken from the first analyzed frame, not necessarily frame 0 self._reference_pos = self._reference.positions.copy() if self._mass_weighted: @@ -190,44 +207,55 @@ def _prepare(self): else: self._weights = None - def _single_frame(self): - frame_rmsd = rms.rmsd( + def _single_frame(self) -> None: + self.results.rmsd[self._frame_index] = rms.rmsd( self._ag.positions, self._reference_pos, self._weights, - center=False, + center=self._center, superposition=self._superposition, ) - self.results.rmsd.append(frame_rmsd) - - def _conclude(self): - self.results.rmsd = np.asarray(self.results.rmsd) class LigandCOMDrift(AnalysisBase): """ Ligand center-of-mass displacement from initial position. + + Parameters + ---------- + atomgroup : mda.AtomGroup + Ligand atoms for which the center-of-mass drift is calculated. + + .. note:: + The initial position is taken from the first analyzed frame, so + ``run(start=10)`` measures drift relative to frame 10, not frame 0. + + .. note:: + PBC are not applied as the trajectory is assumed to have been + pre-processed, ensuring the ligand does not jump between periodic images. + Passing a box to apply the minimum image convention would give + incorrect results for ligands that have drifted more than half a box + length from their starting position. """ - def __init__(self, atomgroup, **kwargs): + _analysis_algorithm_is_parallelizable = False + + def __init__(self, atomgroup: mda.AtomGroup, **kwargs): super().__init__(atomgroup.universe.trajectory, **kwargs) self._ag = atomgroup - def _prepare(self): - self.results.com_drift = [] + def _prepare(self) -> None: + self.results.com_drift = np.zeros(self.n_frames, dtype=np.float64) + # initial COM is taken from the first analyzed frame, not necessarily frame 0 self._initial_com = self._ag.center_of_mass() - def _single_frame(self): - # distance between start and current ligand position - # ignores PBC, but we've already centered the traj - drift = mda.lib.distances.calc_bonds( + def _single_frame(self) -> None: + # no box argument, assumes the ligand stays in a consistent image; + # applying the minimum image convention could mask large drifts > half a box length + self.results.com_drift[self._frame_index] = mda.lib.distances.calc_bonds( self._ag.center_of_mass(), self._initial_com, ) - self.results.com_drift.append(drift) - - def _conclude(self): - self.results.com_drift = np.asarray(self.results.com_drift) def gather_rms_data( @@ -306,41 +334,14 @@ def gather_rms_data( if prot: prot_rmsd = RMSDAnalysis(prot).run(step=skip) output["protein_RMSD"].append(prot_rmsd.results.rmsd) - # # Using the MDAnalysis RMSD class instead - # gs = ["protein and name CA"] - # prot_rmsd = rms.RMSD( - # u, select="protein and name CA", groupselections=gs, weights="mass") - # prot_rmsd.run(step=skip) - # # The results contain: - # # - frame number - # # - time - # # - RMSD based on select (after superimposing) - # # - RMSD based on groupselections, one array per selection - # output["protein_RMSD"].append(prot_rmsd.results.rmsd.T[3]) prot_rmsd2d = Protein2DRMSD(prot).run(step=skip) output["protein_2D_RMSD"].append(prot_rmsd2d.results.rmsd2d) - # # Using the MDAnalysis DistanceMatrix class - # prot_rmsd2d = diffusionmap.DistanceMatrix(u, select="protein and name CA") - # prot_rmsd2d.run(step=skip) - # dist_mat = prot_rmsd2d.results.dist_matrix - # i, j = np.triu_indices_from(dist_mat, k=1) - # flattened = dist_mat[i, j] - # output["protein_2D_RMSD"].append(flattened) if ligand: lig_rmsd = RMSDAnalysis(ligand, mass_weighted=True).run(step=skip) output["ligand_RMSD"].append(lig_rmsd.results.rmsd) - # # Using the MDAnalysis RMSD class instead - # groupselections = ["resname UNK"] - # lig_rmsd = rms.RMSD( - # u, - # select="protein and name CA", - # groupselections=groupselections, - # weights="mass", - # ) - # lig_rmsd.run(step=skip) - # output["ligand_RMSD"].append(lig_rmsd.results.rmsd.T[3]) + lig_com_drift = LigandCOMDrift(ligand).run(step=skip) output["ligand_wander"].append(lig_com_drift.results.com_drift) From f22d96faf1abe7c421401882e006c045b2da9743 Mon Sep 17 00:00:00 2001 From: hannahbaumann Date: Fri, 22 May 2026 11:17:24 +0200 Subject: [PATCH 15/34] Fix docs build --- src/openfe_analysis/rmsd.py | 33 ++++++++++++++++----------------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/src/openfe_analysis/rmsd.py b/src/openfe_analysis/rmsd.py index eb98eec..3870bb1 100644 --- a/src/openfe_analysis/rmsd.py +++ b/src/openfe_analysis/rmsd.py @@ -116,12 +116,12 @@ class Protein2DRMSD(AnalysisBase): Per-atom weights to use in the RMSD calculation. If ``None``, all atoms are weighted equally. - .. note:: - - All atom positions are accumulated in memory during the trajectory - iteration. For long trajectories or large systems this may result in - significant memory usage. Consider using the ``step`` argument to - ``run()`` to reduce the number of frames analyzed. + Notes + ----- + All atom positions are accumulated in memory during the trajectory + iteration. For long trajectories or large systems this may result in + significant memory usage. Consider using the ``step`` argument to + ``run()`` to reduce the number of frames analyzed. """ _analysis_algorithm_is_parallelizable = False @@ -175,7 +175,6 @@ class RMSDAnalysis(AnalysisBase): superposition : bool, optional If ``True``, perform rotational superposition before computing RMSD. Defaults to ``False`` as the trajectory is assumed to be pre-superposed. - """ _analysis_algorithm_is_parallelizable = False @@ -226,16 +225,16 @@ class LigandCOMDrift(AnalysisBase): atomgroup : mda.AtomGroup Ligand atoms for which the center-of-mass drift is calculated. - .. note:: - The initial position is taken from the first analyzed frame, so - ``run(start=10)`` measures drift relative to frame 10, not frame 0. - - .. note:: - PBC are not applied as the trajectory is assumed to have been - pre-processed, ensuring the ligand does not jump between periodic images. - Passing a box to apply the minimum image convention would give - incorrect results for ligands that have drifted more than half a box - length from their starting position. + Notes + ----- + The initial position is taken from the first analyzed frame, so + ``run(start=10)`` measures drift relative to frame 10, not frame 0. + + PBC are not applied as the trajectory is assumed to have been + pre-processed, ensuring the ligand does not jump between periodic images. + Passing a box to apply the minimum image convention would give + incorrect results for ligands that have drifted more than half a box + length from their starting position. """ _analysis_algorithm_is_parallelizable = False From fdf4531af9378b479f9c058f265124290946919a Mon Sep 17 00:00:00 2001 From: hannahbaumann Date: Fri, 22 May 2026 12:10:16 +0200 Subject: [PATCH 16/34] Add MDAnalysisTests as test dependency --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index 4381d5b..c93e5c5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,6 +48,7 @@ test = [ "pytest-xdist", "pytest-cov", "pooch", + "MDAnalysisTests", ] [project.urls] From a1022a249a6b3f1c73421525e7d995d05539ddda Mon Sep 17 00:00:00 2001 From: hannahbaumann Date: Fri, 22 May 2026 16:19:31 +0200 Subject: [PATCH 17/34] Add utils for state atom extraction and fixing elements in a state --- src/openfe_analysis/rmsd.py | 97 +++------- .../tests/utils/test_universe_utils.py | 172 ++++++++++++++++++ src/openfe_analysis/utils/universe_utils.py | 161 ++++++++++++++++ 3 files changed, 363 insertions(+), 67 deletions(-) create mode 100644 src/openfe_analysis/tests/utils/test_universe_utils.py create mode 100644 src/openfe_analysis/utils/universe_utils.py diff --git a/src/openfe_analysis/rmsd.py b/src/openfe_analysis/rmsd.py index 5505d5c..e863ebb 100644 --- a/src/openfe_analysis/rmsd.py +++ b/src/openfe_analysis/rmsd.py @@ -8,16 +8,12 @@ import spyrmsd.rmsd as srmsd from MDAnalysis.analysis import rms from MDAnalysis.analysis.base import AnalysisBase -from MDAnalysis.guesser.tables import vdwradii as MDA_VDWRADII from MDAnalysis.transformations import unwrap -from rdkit.Chem import rdmolops +from rdkit import Chem from .reader import FEReader from .transformations import Aligner, ClosestImageShift, NoJump - -# B-factor values used to identify atoms present at a given lambda state. -# 0.25 marks atoms unique to one end state, 0.5 marks atoms shared by both. -_BFACTOR_STATE_VALUES = (0.25, 0.5) +from .utils.universe_utils import guess_ligand_bonds, select_state_atoms def make_Universe(top: pathlib.Path, trj: nc.Dataset, state: int) -> mda.Universe: @@ -225,46 +221,42 @@ def _single_frame(self) -> None: class SymmetryCorrectedLigandRMSD(AnalysisBase): """ - 1D RMSD time series for an AtomGroup. + Symmetry-corrected 1D RMSD time series for a ligand AtomGroup. Parameters ---------- - atomgroup : MDAnalysis.AtomGroup - Atoms to compute RMSD for. - mass_weighted : bool, optional - If True, compute mass-weighted RMSD. + atomgroup : mda.AtomGroup + Ligand atoms to compute RMSD for. If ``rdmol`` is not provided, + bonds must be guessed on the atomgroup before instantiating this + class; use :func:`guess_ligand_bonds` for this purpose. + rdmol : Chem.Mol, optional + RDKit molecule corresponding to ``atomgroup``. If provided, it is + used directly and ``guess_ligand_bonds`` does not need to be called. + If ``None``, the RDKit molecule is derived from ``atomgroup`` via + ``convert_to("RDKIT")``. """ - def __init__(self, atomgroup, mass_weighted=False, **kwargs): + _analysis_algorithm_is_parallelizable = False + + def __init__( + self, + atomgroup: mda.AtomGroup, + rdmol: Optional[Chem.Mol] = None, + **kwargs, + ): super().__init__(atomgroup.universe.trajectory, **kwargs) self._ag = atomgroup - self._mass_weighted = mass_weighted - self._isomorphisms = None - - vdwradii = dict(MDA_VDWRADII) - vdwradii.update( - { - "Cl": vdwradii["CL"], - "Br": vdwradii["BR"], - "Na": vdwradii["NA"], - } - ) - - atomgroup.guess_bonds(vdwradii) - self._mol = atomgroup.convert_to("RDKIT") + self._mol = rdmol if rdmol is not None else atomgroup.convert_to("RDKIT") self._aprops = np.array([atom.GetAtomicNum() for atom in self._mol.GetAtoms()]) - self._am = rdmolops.GetAdjacencyMatrix(self._mol) + self._am = Chem.rdmolops.GetAdjacencyMatrix(self._mol) def _prepare(self): - self.results.rmsd = [] + self.results.rmsd = np.zeros(self.n_frames, dtype=np.float64) + # reference is taken from the first analyzed frame, not necessarily frame 0 self._reference = self._ag.positions.copy() + self._isomorphisms: list | None = None - if self._mass_weighted: - self._weights = self._ag.masses / np.mean(self._ag.masses) - else: - self._weights = None - - def _single_frame(self): + def _single_frame(self) -> None: frame_rmsd, isomorphisms, _ = srmsd._rmsd_isomorphic_core( coords1=self._ag.positions.copy(), coords2=self._reference, @@ -276,13 +268,11 @@ def _single_frame(self): minimize=False, isomorphisms=self._isomorphisms, ) - self.results.rmsd.append(frame_rmsd) + self.results.rmsd[self._frame_index] = frame_rmsd + # cache isomorphisms after first frame to avoid redundant graph matching if self._isomorphisms is None: self._isomorphisms = isomorphisms - def _conclude(self): - self.results.rmsd = np.asarray(self.results.rmsd) - class LigandCOMDrift(AnalysisBase): """ @@ -325,27 +315,6 @@ def _single_frame(self) -> None: ) -def _select_state_ligand(u: mda.Universe) -> mda.AtomGroup: - """ - Select ligand atoms that are present at the current lambda state. - - Atoms are identified by their b-factor values: ``0.25`` marks atoms - unique to one end state and ``0.5`` marks atoms shared by both end - states. Only atoms with these b-factor values and residue name "UNK" - are included. - - Parameters - ---------- - u : mda.Universe - - Returns - ------- - MDAnalysis.AtomGroup - """ - state_indices = np.array([atom.ix for atom in u.atoms if atom.bfactor in _BFACTOR_STATE_VALUES]) - return u.atoms[state_indices].select_atoms("resname UNK") - - def gather_rms_data( pdb_topology: pathlib.Path, dataset: pathlib.Path, @@ -417,7 +386,7 @@ def gather_rms_data( u = make_Universe(u_top._topology, ds, state=state_idx) prot = u.select_atoms("protein and name CA") ligand = u.select_atoms("resname UNK") - state_lig = _select_state_ligand(u) + state_lig = select_state_atoms(u, end_state="A").select_atoms("resname UNK") if prot: prot_rmsd = RMSDAnalysis(prot).run(step=skip) @@ -425,16 +394,10 @@ def gather_rms_data( prot_rmsd2d = Protein2DRMSD(prot).run(step=skip) output["protein_2D_RMSD"].append(prot_rmsd2d.results.rmsd2d) - # # Using the MDAnalysis DistanceMatrix class - # prot_rmsd2d = diffusionmap.DistanceMatrix(u, select="protein and name CA") - # prot_rmsd2d.run(step=skip) - # dist_mat = prot_rmsd2d.results.dist_matrix - # i, j = np.triu_indices_from(dist_mat, k=1) - # flattened = dist_mat[i, j] - # output["protein_2D_RMSD"].append(flattened) if ligand: # lig_rmsd = RMSDAnalysis(ligand, mass_weighted=True).run(step=skip) + guess_ligand_bonds(state_lig, delete_existing=True) lig_rmsd = SymmetryCorrectedLigandRMSD(state_lig, mass_weighted=True).run(step=skip) output["ligand_RMSD"].append(lig_rmsd.results.rmsd) diff --git a/src/openfe_analysis/tests/utils/test_universe_utils.py b/src/openfe_analysis/tests/utils/test_universe_utils.py new file mode 100644 index 0000000..a386a59 --- /dev/null +++ b/src/openfe_analysis/tests/utils/test_universe_utils.py @@ -0,0 +1,172 @@ +import MDAnalysis as mda +import numpy as np +import pytest +from rdkit import Chem + +from openfe_analysis.rmsd import make_Universe +from openfe_analysis.utils.universe_utils import ( + correct_elements, + guess_ligand_bonds, + select_state_atoms, +) + + +@pytest.fixture +def universe(hybrid_system_skipped_pdb, simulation_skipped_nc): + u = make_Universe(hybrid_system_skipped_pdb, simulation_skipped_nc, state=0) + yield u + u.trajectory.close() + + +@pytest.fixture +def ligand_ag(universe): + return select_state_atoms(universe, end_state="A").select_atoms("resname UNK") + + +def test_guess_ligand_bonds_adds_bonds(ligand_ag): + """Bonds should be present on the atomgroup after guess_ligand_bonds.""" + original_count = len(ligand_ag.bonds) + # This also has stateB bond + assert original_count == 49 + guess_ligand_bonds(ligand_ag, delete_existing=True) + # Now only 48 stateA bonds + assert len(ligand_ag.bonds) == 48 + + +def test_guess_ligand_bonds_modifies_universe_inplace(ligand_ag): + """Bond topology should be reflected on the parent universe after guessing.""" + guess_ligand_bonds(ligand_ag) + universe_bonds = ligand_ag.universe.select_atoms("resname UNK").bonds + assert len(universe_bonds) > 0 + + +@pytest.mark.parametrize( + "end_state, expected_bfactors", + [ + ("A", (0.25, 0.5)), + ("B", (0.75, 0.5)), + ], +) +def test_select_state_atoms(universe, end_state, expected_bfactors): + """State selection should include state-unique and shared atoms.""" + state = select_state_atoms(universe, end_state=end_state) + assert len(state) > 0 + assert all(atom.bfactor in expected_bfactors for atom in state) + + +def test_select_state_atoms_invalid_state(universe): + """Invalid end_state should raise a ValueError.""" + with pytest.raises(ValueError, match="end_state must be 'A' or 'B'"): + select_state_atoms(universe, end_state="C") + + +def test_select_state_atoms_shared_atoms(universe): + """Shared atoms (bfactor 0.5) should appear in both state A and B selections.""" + state_a = select_state_atoms(universe, end_state="A") + state_b = select_state_atoms(universe, end_state="B") + shared_a = set(atom.ix for atom in state_a if atom.bfactor == 0.5) + shared_b = set(atom.ix for atom in state_b if atom.bfactor == 0.5) + assert shared_a == shared_b + + +def test_correct_elements_fixes_element(): + """correct_elements should update element where rdmol differs.""" + + # Build a minimal universe with a C atom + u = mda.Universe.empty(2, n_residues=1, trajectory=True) + u.add_TopologyAttr("elements", ["C", "C"]) # second atom is wrong + u.add_TopologyAttr("names", ["C1", "C2"]) + u.add_TopologyAttr("resnames", ["UNK"]) + u.add_TopologyAttr("resids", [1]) + u.load_new( + np.array([[[0.0, 0.0, 0.0], [1.5, 0.0, 0.0]]]), + order="fac", + ) + ag = u.select_atoms("all") + + mol = Chem.RWMol() + mol.AddAtom(Chem.Atom(6)) # C + mol.AddAtom(Chem.Atom(7)) # N + rdmol = mol.GetMol() + + with pytest.warns(UserWarning, match="No atom_mapping provided"): + correct_elements(ag, rdmol) + + assert ag[0].element == "C" + assert ag[1].element == "N" + assert ag[1].name == "N" + + +def test_correct_elements_no_change_when_correct(): + """correct_elements should not modify atoms that already have correct elements.""" + + u = mda.Universe.empty(2, n_residues=1, trajectory=True) + u.add_TopologyAttr("elements", ["C", "N"]) + u.add_TopologyAttr("names", ["C1", "N1"]) + u.add_TopologyAttr("resnames", ["UNK"]) + u.add_TopologyAttr("resids", [1]) + u.load_new( + np.array([[[0.0, 0.0, 0.0], [1.5, 0.0, 0.0]]]), + order="fac", + ) + ag = u.select_atoms("all") + + mol = Chem.RWMol() + mol.AddAtom(Chem.Atom(6)) # C + mol.AddAtom(Chem.Atom(7)) # N + rdmol = mol.GetMol() + + with pytest.warns(UserWarning, match="No atom_mapping provided"): + correct_elements(ag, rdmol) + + assert ag[0].element == "C" + assert ag[0].name == "C1" # name unchanged + assert ag[1].element == "N" + assert ag[1].name == "N1" # name unchanged + + +def test_correct_elements_with_atom_mapping(): + """correct_elements with atom_mapping should use mapping without warning.""" + + u = mda.Universe.empty(2, n_residues=1, trajectory=True) + u.add_TopologyAttr("elements", ["C", "C"]) # second atom is wrong + u.add_TopologyAttr("names", ["C1", "C2"]) + u.add_TopologyAttr("resnames", ["UNK"]) + u.add_TopologyAttr("resids", [1]) + u.load_new( + np.array([[[0.0, 0.0, 0.0], [1.5, 0.0, 0.0]]]), + order="fac", + ) + ag = u.select_atoms("all") + + # rdmol has atoms in reverse order: N, C + mol = Chem.RWMol() + mol.AddAtom(Chem.Atom(7)) # N at rdmol index 0 + mol.AddAtom(Chem.Atom(6)) # C at rdmol index 1 + rdmol = mol.GetMol() + + # explicitly map ag index 0 -> rdmol index 1 (C), ag index 1 -> rdmol index 0 (N) + correct_elements(ag, rdmol, atom_mapping={0: 1, 1: 0}) + + assert ag[0].element == "C" # mapped to rdmol index 1 (C) + assert ag[1].element == "N" # mapped to rdmol index 0 (N) + assert ag[1].name == "N" + + +def test_correct_elements_raises_size_error(): + """correct_elements should raise ValueError if atom counts don't match.""" + + u = mda.Universe.empty(2, n_residues=1, trajectory=True) + u.add_TopologyAttr("elements", ["C", "N"]) + u.add_TopologyAttr("names", ["C1", "N1"]) + u.add_TopologyAttr("resnames", ["UNK"]) + u.add_TopologyAttr("resids", [1]) + u.load_new(np.array([[[0.0, 0.0, 0.0], [1.5, 0.0, 0.0]]]), order="fac") + ag = u.select_atoms("all") + + mol = Chem.RWMol() + mol.AddAtom(Chem.Atom(6)) # only 1 atom + rdmol = mol.GetMol() + + with pytest.raises(ValueError, match="atomgroup has 2 atoms but rdmol has 1"): + correct_elements(ag, rdmol) diff --git a/src/openfe_analysis/utils/universe_utils.py b/src/openfe_analysis/utils/universe_utils.py new file mode 100644 index 0000000..7dbee44 --- /dev/null +++ b/src/openfe_analysis/utils/universe_utils.py @@ -0,0 +1,161 @@ +from __future__ import annotations + +import warnings +from typing import Literal + +import MDAnalysis as mda +import numpy as np +from MDAnalysis.guesser.tables import vdwradii as MDA_VDWRADII +from rdkit import Chem + +# B-factor values used to identify atoms present at a given lambda state. +# 0.25 marks atoms unique to state A, 0.75 marks atoms unique to state B, +# and 0.5 marks atoms shared by both end states. +_BFACTOR_STATE_A = (0.25, 0.5) +_BFACTOR_STATE_B = (0.75, 0.5) + + +def select_state_atoms( + universe: mda.Universe, + end_state: Literal["A", "B"], +) -> mda.AtomGroup: + """ + Select all atoms present at a given end state. + + Atoms are identified by their b-factor values, following the OpenFE + PDB convention: + + - ``0.25`` — unique to state A + - ``0.75`` — unique to state B + - ``0.5`` — shared by both end states + + Parameters + ---------- + universe : mda.Universe + Universe containing the hybrid topology. + end_state : {"A", "B"} + The end state to select atoms for. + + Returns + ------- + mda.AtomGroup + All atoms present at the given end state. + + Raises + ------ + ValueError + If ``end_state`` is not ``"A"`` or ``"B"``. + + Examples + -------- + Select all state A atoms, then further filter to just the ligand:: + + state_a = select_state_atoms(universe, end_state="A") + ligand_a = state_a.select_atoms("resname UNK") + """ + if end_state == "A": + bfactor_values = _BFACTOR_STATE_A + elif end_state == "B": + bfactor_values = _BFACTOR_STATE_B + else: + raise ValueError(f"end_state must be 'A' or 'B', got '{end_state}'") + + state_indices = np.array([atom.ix for atom in universe.atoms if atom.bfactor in bfactor_values]) + return universe.atoms[state_indices] + + +def guess_ligand_bonds( + atomgroup: mda.AtomGroup, + delete_existing: bool = False, +) -> None: + """ + Guess bonds for a ligand AtomGroup in-place. + + Parameters + ---------- + atomgroup : mda.AtomGroup + Ligand atoms for which bonds will be guessed. + delete_existing : bool, optional + If ``True``, delete existing bonds on the atomgroup before guessing. + This ensures a clean re-guess from scratch, removing any + bonds (e.g. cross-state bonds in hybrid topologies). Default is + ``False``. + """ + if delete_existing: + atomgroup.universe.delete_bonds(atomgroup.bonds) + # MDA vdw radii use uppercase element symbols (e.g. "CL", "BR", "NA"), + # but RDKit uses mixed case; add aliases so bond guessing works correctly + vdwradii = dict(MDA_VDWRADII) + vdwradii.update( + { + "Cl": vdwradii["CL"], + "Br": vdwradii["BR"], + "Na": vdwradii["NA"], + } + ) + atomgroup.guess_bonds(vdwradii) + + +def correct_elements( + atomgroup: mda.AtomGroup, + rdmol: Chem.Mol, + atom_mapping: dict[int, int] | None = None, +) -> None: + """ + Correct element and atom name assignments in an AtomGroup in-place + using an RDKit molecule as the source of truth. + + This is particularly useful for hybrid topologies where mapped atoms + undergoing element changes carry state A's element types, even when + state B's ligand is selected. Correcting elements ensures accurate + bond guessing and subsequent analyses. + + Parameters + ---------- + atomgroup : mda.AtomGroup + Ligand atoms whose elements and names will be corrected. Modified + in-place. + rdmol : Chem.Mol + RDKit molecule providing the correct element and atom name + information. + atom_mapping : dict[int, int], optional + A mapping of ``{atomgroup_index: rdmol_index}`` defining the + correspondence between atoms in ``atomgroup`` and ``rdmol``. If + ``None``, atoms are matched by position — the i-th atom in + ``atomgroup`` corresponds to the i-th atom in ``rdmol``. A + warning is issued in this case since positional correspondence + is not guaranteed when the RDKit molecule comes from an external + source such as an SDF file. + + Raises + ------ + ValueError + If the number of atoms in ``atomgroup`` and ``rdmol`` do not match + and no ``atom_mapping`` is provided. + """ + periodic_table = Chem.GetPeriodicTable() + + if atom_mapping is not None: + for ag_idx, rd_idx in atom_mapping.items(): + mda_atom = atomgroup[ag_idx] + rd_atom = rdmol.GetAtomWithIdx(rd_idx) + element = periodic_table.GetElementSymbol(rd_atom.GetAtomicNum()) + if mda_atom.element != element: + mda_atom.element = element + mda_atom.name = rd_atom.GetSymbol() + else: + if len(atomgroup) != rdmol.GetNumAtoms(): + raise ValueError( + f"atomgroup has {len(atomgroup)} atoms but rdmol has {rdmol.GetNumAtoms()} atoms." + ) + warnings.warn( + "No atom_mapping provided to correct_elements — assuming positional " + "correspondence between atomgroup and rdmol. This may give incorrect " + "results if the atom ordering differs between the two.", + UserWarning, + ) + for mda_atom, rd_atom in zip(atomgroup, rdmol.GetAtoms()): + element = periodic_table.GetElementSymbol(rd_atom.GetAtomicNum()) + if mda_atom.element != element: + mda_atom.element = element + mda_atom.name = rd_atom.GetSymbol() From aae4531bb7106d2a4f2ff907f0a9d3d5e9ccda45 Mon Sep 17 00:00:00 2001 From: hannahbaumann Date: Fri, 22 May 2026 16:28:13 +0200 Subject: [PATCH 18/34] small update --- src/openfe_analysis/utils/universe_utils.py | 60 ++++++++------------- 1 file changed, 23 insertions(+), 37 deletions(-) diff --git a/src/openfe_analysis/utils/universe_utils.py b/src/openfe_analysis/utils/universe_utils.py index 7dbee44..2950091 100644 --- a/src/openfe_analysis/utils/universe_utils.py +++ b/src/openfe_analysis/utils/universe_utils.py @@ -9,8 +9,9 @@ from rdkit import Chem # B-factor values used to identify atoms present at a given lambda state. -# 0.25 marks atoms unique to state A, 0.75 marks atoms unique to state B, -# and 0.5 marks atoms shared by both end states. +# 0.25 : atoms unique to state A +# 0.75 : atoms unique to state B +# 0.5 : atoms shared by both end states. _BFACTOR_STATE_A = (0.25, 0.5) _BFACTOR_STATE_B = (0.75, 0.5) @@ -22,8 +23,7 @@ def select_state_atoms( """ Select all atoms present at a given end state. - Atoms are identified by their b-factor values, following the OpenFE - PDB convention: + Atoms are identified by their b-factor values: - ``0.25`` — unique to state A - ``0.75`` — unique to state B @@ -45,13 +45,6 @@ def select_state_atoms( ------ ValueError If ``end_state`` is not ``"A"`` or ``"B"``. - - Examples - -------- - Select all state A atoms, then further filter to just the ligand:: - - state_a = select_state_atoms(universe, end_state="A") - ligand_a = state_a.select_atoms("resname UNK") """ if end_state == "A": bfactor_values = _BFACTOR_STATE_A @@ -77,9 +70,8 @@ def guess_ligand_bonds( Ligand atoms for which bonds will be guessed. delete_existing : bool, optional If ``True``, delete existing bonds on the atomgroup before guessing. - This ensures a clean re-guess from scratch, removing any - bonds (e.g. cross-state bonds in hybrid topologies). Default is - ``False``. + This may be necessary to avoid cross-state bonds in hybrid topologies. + Default is ``False``. """ if delete_existing: atomgroup.universe.delete_bonds(atomgroup.bonds) @@ -102,39 +94,37 @@ def correct_elements( atom_mapping: dict[int, int] | None = None, ) -> None: """ - Correct element and atom name assignments in an AtomGroup in-place + Correct element and atom names in an AtomGroup in-place using an RDKit molecule as the source of truth. - This is particularly useful for hybrid topologies where mapped atoms - undergoing element changes carry state A's element types, even when - state B's ligand is selected. Correcting elements ensures accurate - bond guessing and subsequent analyses. + This is needed for hybrid topologies where mapped atoms that + undergo element changes carry state A's element types, even when + state B's ligand is selected. Parameters ---------- atomgroup : mda.AtomGroup - Ligand atoms whose elements and names will be corrected. Modified - in-place. + Ligand atoms whose elements and names will be corrected. rdmol : Chem.Mol - RDKit molecule providing the correct element and atom name - information. + RDKit molecule with the correct element and atom name information. atom_mapping : dict[int, int], optional A mapping of ``{atomgroup_index: rdmol_index}`` defining the correspondence between atoms in ``atomgroup`` and ``rdmol``. If - ``None``, atoms are matched by position — the i-th atom in - ``atomgroup`` corresponds to the i-th atom in ``rdmol``. A - warning is issued in this case since positional correspondence - is not guaranteed when the RDKit molecule comes from an external - source such as an SDF file. + ``None``, atoms are matched by position which gives wrong results if + the atom order was not the same. Raises ------ ValueError - If the number of atoms in ``atomgroup`` and ``rdmol`` do not match - and no ``atom_mapping`` is provided. + If the number of atoms in ``atomgroup`` and ``rdmol`` do not match. """ periodic_table = Chem.GetPeriodicTable() + if len(atomgroup) != rdmol.GetNumAtoms(): + raise ValueError( + f"atomgroup has {len(atomgroup)} atoms but rdmol has {rdmol.GetNumAtoms()} atoms." + ) + if atom_mapping is not None: for ag_idx, rd_idx in atom_mapping.items(): mda_atom = atomgroup[ag_idx] @@ -144,14 +134,10 @@ def correct_elements( mda_atom.element = element mda_atom.name = rd_atom.GetSymbol() else: - if len(atomgroup) != rdmol.GetNumAtoms(): - raise ValueError( - f"atomgroup has {len(atomgroup)} atoms but rdmol has {rdmol.GetNumAtoms()} atoms." - ) warnings.warn( - "No atom_mapping provided to correct_elements — assuming positional " - "correspondence between atomgroup and rdmol. This may give incorrect " - "results if the atom ordering differs between the two.", + "No atom_mapping provided to correct_elements. Assuming that " + "atom ordering is the same between atomgroup and rdmol. This may " + "give incorrect results if the atom ordering differs between the two.", UserWarning, ) for mda_atom, rd_atom in zip(atomgroup, rdmol.GetAtoms()): From 3c001a1e20a104dcb59beb515b7f3f5a9908b3c6 Mon Sep 17 00:00:00 2001 From: hannahbaumann Date: Tue, 26 May 2026 09:49:06 +0200 Subject: [PATCH 19/34] Add news entry --- news/refactor_analyses_base.rst | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 news/refactor_analyses_base.rst diff --git a/news/refactor_analyses_base.rst b/news/refactor_analyses_base.rst new file mode 100644 index 0000000..7bde8cf --- /dev/null +++ b/news/refactor_analyses_base.rst @@ -0,0 +1,23 @@ +**Added:** + +* + +**Changed:** + +* Refactored the structural analyses methods into MDAnalysis AnalysisBase classes. + +**Deprecated:** + +* + +**Removed:** + +* + +**Fixed:** + +* + +**Security:** + +* From 5e55ecb44a5f18baca4b9a3c7a244cb27526bbbb Mon Sep 17 00:00:00 2001 From: hannahbaumann Date: Tue, 26 May 2026 10:15:27 +0200 Subject: [PATCH 20/34] Add spyrmsd --- docs/environment.yaml | 2 ++ environment.yml | 1 + pyproject.toml | 1 + 3 files changed, 4 insertions(+) diff --git a/docs/environment.yaml b/docs/environment.yaml index 9462526..17a03a1 100644 --- a/docs/environment.yaml +++ b/docs/environment.yaml @@ -7,3 +7,5 @@ dependencies: - python=3.12 - sphinx - furo +- spyrmsd +- rdkit diff --git a/environment.yml b/environment.yml index d7c1d44..2a60a2f 100644 --- a/environment.yml +++ b/environment.yml @@ -18,3 +18,4 @@ dependencies: - pytest-xdist - pytest-rerunfailures - spyrmsd + - rdkit diff --git a/pyproject.toml b/pyproject.toml index f1c7451..0d76399 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,6 +18,7 @@ dependencies = [ 'netCDF4', 'openff-units', 'pyyaml', + 'spyrmsd', ] description="Trajectory analysis of free energy calculations." readme="README.md" From e4c5417210b1feae1917fab8bde4a0c55acaf76e Mon Sep 17 00:00:00 2001 From: hannahbaumann Date: Tue, 26 May 2026 14:08:35 +0200 Subject: [PATCH 21/34] add more tests for classes --- src/openfe_analysis/rmsd.py | 8 +- src/openfe_analysis/tests/test_rmsd.py | 9 -- .../tests/test_rmsd_classes.py | 147 ++++++++++++++++++ .../tests/test_rmsd_mda_data.py | 80 ---------- 4 files changed, 152 insertions(+), 92 deletions(-) create mode 100644 src/openfe_analysis/tests/test_rmsd_classes.py delete mode 100644 src/openfe_analysis/tests/test_rmsd_mda_data.py diff --git a/src/openfe_analysis/rmsd.py b/src/openfe_analysis/rmsd.py index 0b705d3..1050adf 100644 --- a/src/openfe_analysis/rmsd.py +++ b/src/openfe_analysis/rmsd.py @@ -227,8 +227,10 @@ class LigandCOMDrift(AnalysisBase): Notes ----- - The initial position is taken from the first analyzed frame, so - ``run(start=10)`` measures drift relative to frame 10, not frame 0. + The reference COM is taken from whatever frame the trajectory is on when ``.run()`` is + called, not necessarily the first analyzed frame. For consistent results, + ensure the trajectory is at frame 0 (or your desired reference frame) + before calling ``.run()``. PBC are not applied as the trajectory is assumed to have been pre-processed, ensuring the ligand does not jump between periodic images. @@ -245,7 +247,7 @@ def __init__(self, atomgroup: mda.AtomGroup, **kwargs): def _prepare(self) -> None: self.results.com_drift = np.zeros(self.n_frames, dtype=np.float64) - # initial COM is taken from the first analyzed frame, not necessarily frame 0 + # COM is captured from the current trajectory frame when .run() is called self._initial_com = self._ag.center_of_mass() def _single_frame(self) -> None: diff --git a/src/openfe_analysis/tests/test_rmsd.py b/src/openfe_analysis/tests/test_rmsd.py index a9e8b09..a78a153 100644 --- a/src/openfe_analysis/tests/test_rmsd.py +++ b/src/openfe_analysis/tests/test_rmsd.py @@ -16,12 +16,6 @@ @pytest.fixture def mda_universe(hybrid_system_skipped_pdb, simulation_skipped_nc): - """ - Safely create and destroy an MDAnalysis Universe. - - Guarantees: - - NetCDF file is opened exactly once - """ u = make_Universe( hybrid_system_skipped_pdb, simulation_skipped_nc, @@ -83,14 +77,12 @@ def test_gather_rms_data_regression_skippednc(simulation_skipped_nc, hybrid_syst rtol=1e-3, ) assert len(output["ligand_RMSD"]) == 11 - # TODO: RMSD is very large as the multichain fix is not in yet assert_allclose( output["ligand_RMSD"][0][:6], [0.0, 1.092039, 0.839234, 1.228383, 1.533331, 1.276798], rtol=1e-3, ) assert len(output["ligand_wander"]) == 11 - # TODO: very large as the multichain fix is not in yet assert_allclose( output["ligand_wander"][0][:6], [0.0, 0.908097, 0.674262, 0.971328, 0.909263, 1.101882], @@ -99,7 +91,6 @@ def test_gather_rms_data_regression_skippednc(simulation_skipped_nc, hybrid_syst assert len(output["protein_2D_RMSD"]) == 11 # 15 entries because 6 * 6 frames // 2 assert len(output["protein_2D_RMSD"][0]) == 1275 - # TODO: very large as the multichain fix is not in yet assert_allclose( output["protein_2D_RMSD"][0][:6], [1.089747, 1.006143, 1.045068, 1.476353, 1.332893, 1.110507], diff --git a/src/openfe_analysis/tests/test_rmsd_classes.py b/src/openfe_analysis/tests/test_rmsd_classes.py new file mode 100644 index 0000000..5312611 --- /dev/null +++ b/src/openfe_analysis/tests/test_rmsd_classes.py @@ -0,0 +1,147 @@ +from functools import partial + +import MDAnalysis as mda +import numpy as np +import pytest +from MDAnalysis.analysis import diffusionmap, rms +from MDAnalysisTests.datafiles import DCD, PSF +from numpy.testing import assert_allclose, assert_almost_equal + +from openfe_analysis.rmsd import LigandCOMDrift, Protein2DRMSD, RMSDAnalysis, make_Universe + + +@pytest.fixture +def mda_universe(): + return mda.Universe(PSF, DCD) + + +@pytest.fixture() +def correct_values(): + return [0, 4.68953] + + +@pytest.fixture() +def correct_values_mass(): + return [0, 4.74920] + + +class TestRMSDAnalysis: + def test_rmsd(self, mda_universe, correct_values): + prot = mda_universe.select_atoms("name CA") + prot_rmsd = RMSDAnalysis(prot, superposition=True).run(step=49) + assert_almost_equal( + prot_rmsd.results.rmsd, + correct_values, + 4, + err_msg="error: rmsd profile should match test values", + ) + + def test_rmsd_frames(self, mda_universe, correct_values): + prot = mda_universe.select_atoms("name CA") + prot_rmsd = RMSDAnalysis(prot, superposition=True).run(frames=[0, 49]) + assert_almost_equal( + prot_rmsd.results.rmsd, + correct_values, + 4, + err_msg="error: rmsd profile should match test values", + ) + + def test_rmsd_single_frame(self, mda_universe): + prot = mda_universe.select_atoms("name CA") + prot_rmsd = RMSDAnalysis(prot, superposition=True).run(start=5, stop=6) + assert_almost_equal( + prot_rmsd.results.rmsd, + [0.91544906], + 4, + err_msg="error: rmsd profile should match test values", + ) + + def test_mass_weighted(self, mda_universe, correct_values): + # mass weighting the CA should give the same answer as weighing + # equally because all CA have the same mass + prot = mda_universe.select_atoms("name CA") + prot_rmsd = RMSDAnalysis(prot, superposition=True, mass_weighted=True).run(step=49) + assert_almost_equal( + prot_rmsd.results.rmsd, + correct_values, + 4, + err_msg="error: rmsd profile should match test values", + ) + + def test_custom_weighted(self, mda_universe, correct_values_mass): + prot = mda_universe.select_atoms("all") + prot_rmsd = RMSDAnalysis(prot, superposition=True, mass_weighted=True).run(step=49) + assert_almost_equal( + prot_rmsd.results.rmsd, + correct_values_mass, + 4, + err_msg="error: rmsd profile should match test values", + ) + + +class TestProtein2DRMSD: + def test_output_shape(self, mda_universe): + """Output should have n*(n-1)//2 entries for n frames.""" + prot = mda_universe.select_atoms("name CA") + result = Protein2DRMSD(prot).run(step=10) + n_frames = len(mda_universe.trajectory[::10]) + expected_pairs = n_frames * (n_frames - 1) // 2 + assert len(result.results.rmsd2d) == expected_pairs + + def test_values_nonnegative(self, mda_universe): + """All RMSD values should be non-negative.""" + prot = mda_universe.select_atoms("name CA") + result = Protein2DRMSD(prot).run(step=10) + assert np.all(result.results.rmsd2d >= 0) + + def test_symmetric_diagonal_zero(self, mda_universe): + """When reconstructed into a full matrix, diagonal should be zero + and matrix should be symmetric.""" + prot = mda_universe.select_atoms("name CA") + result = Protein2DRMSD(prot).run(step=10) + + n_frames = len(mda_universe.trajectory[::10]) + mat = np.zeros((n_frames, n_frames)) + mat[np.triu_indices_from(mat, k=1)] = result.results.rmsd2d + mat += mat.T + + assert_allclose(np.diag(mat), 0.0) + assert_allclose(mat, mat.T) + + def test_matches_mda_distance_matrix(self, mda_universe): + """Results should match MDAnalysis DistanceMatrix.""" + prot = mda_universe.select_atoms("name CA") + result = Protein2DRMSD(prot).run(step=10) + + metric = partial(rms.rmsd, center=True, superposition=True) + ref = diffusionmap.DistanceMatrix(prot, metric=metric) + ref.run(step=10) + dist_mat = ref.results.dist_matrix + i, j = np.triu_indices_from(dist_mat, k=1) + expected = dist_mat[i, j] + + assert_allclose(result.results.rmsd2d, expected, atol=1e-4) + + +class TestLigandCOMDrift: + @pytest.fixture + def ligand(self, hybrid_system_skipped_pdb, simulation_skipped_nc): + u = make_Universe(hybrid_system_skipped_pdb, simulation_skipped_nc, state=0) + yield u.select_atoms("resname UNK") + u.trajectory.close() + + def test_first_frame_is_zero(self, ligand): + """COM drift at the first frame should always be zero.""" + result = LigandCOMDrift(ligand).run(step=10) + assert result.results.com_drift[0] == pytest.approx(0.0, abs=1e-5) + + def test_output_shape(self, ligand): + """Output should have one entry per analyzed frame.""" + result = LigandCOMDrift(ligand).run(step=10) + n_frames = len(ligand.universe.trajectory[::10]) + assert len(result.results.com_drift) == n_frames + + def test_values_nonnegative(self, ligand): + """COM drift values should be non-negative distances.""" + result = LigandCOMDrift(ligand).run(step=10) + assert np.all(result.results.com_drift >= 0) diff --git a/src/openfe_analysis/tests/test_rmsd_mda_data.py b/src/openfe_analysis/tests/test_rmsd_mda_data.py deleted file mode 100644 index 83f95ef..0000000 --- a/src/openfe_analysis/tests/test_rmsd_mda_data.py +++ /dev/null @@ -1,80 +0,0 @@ -import MDAnalysis as mda -import pytest -from MDAnalysisTests.datafiles import DCD, PSF -from numpy.testing import assert_allclose, assert_almost_equal - -from openfe_analysis.rmsd import RMSDAnalysis - - -@pytest.fixture -def mda_universe(): - return mda.Universe(PSF, DCD) - - -@pytest.fixture() -def correct_values(): - return [0, 4.68953] - - -@pytest.fixture() -def correct_values_mass(): - return [0, 4.74920] - - -def test_rmsd(mda_universe, correct_values): - prot = mda_universe.select_atoms("name CA") - prot_rmsd = RMSDAnalysis(prot, superposition=True).run(step=49) - assert_almost_equal( - prot_rmsd.results.rmsd, - correct_values, - 4, - err_msg="error: rmsd profile should match" + "test values", - ) - - -def test_rmsd_frames(mda_universe, correct_values): - prot = mda_universe.select_atoms("name CA") - prot_rmsd = RMSDAnalysis(prot, superposition=True).run(frames=[0, 49]) - assert_almost_equal( - prot_rmsd.results.rmsd, - correct_values, - 4, - err_msg="error: rmsd profile should match" + "test values", - ) - - -def test_rmsd_single_frame(mda_universe): - prot = mda_universe.select_atoms("name CA") - prot_rmsd = RMSDAnalysis(prot, superposition=True).run(start=5, stop=6) - single_frame = [0.91544906] - assert_almost_equal( - prot_rmsd.results.rmsd, - single_frame, - 4, - err_msg="error: rmsd profile should match" + "test values", - ) - - -def test_mass_weighted(mda_universe, correct_values): - # mass weighting the CA should give the same answer as weighing - # equally because all CA have the same mass - prot = mda_universe.select_atoms("name CA") - prot_rmsd = RMSDAnalysis(prot, superposition=True, mass_weighted=True).run(step=49) - - assert_almost_equal( - prot_rmsd.results.rmsd, - correct_values, - 4, - err_msg="error: rmsd profile should matchtest values", - ) - - -def test_custom_weighted(mda_universe, correct_values_mass): - prot = mda_universe.select_atoms("all") - prot_rmsd = RMSDAnalysis(prot, superposition=True, mass_weighted=True).run(step=49) - assert_almost_equal( - prot_rmsd.results.rmsd, - correct_values_mass, - 4, - err_msg="error: rmsd profile should matchtest values", - ) From b4600d8c631ac0444b65c97cca8d824f6778bf9f Mon Sep 17 00:00:00 2001 From: hannahbaumann Date: Tue, 26 May 2026 14:23:59 +0200 Subject: [PATCH 22/34] Remove element fix function --- .../tests/utils/test_universe_utils.py | 106 ------------------ src/openfe_analysis/utils/universe_utils.py | 61 ---------- 2 files changed, 167 deletions(-) diff --git a/src/openfe_analysis/tests/utils/test_universe_utils.py b/src/openfe_analysis/tests/utils/test_universe_utils.py index a386a59..2447fb2 100644 --- a/src/openfe_analysis/tests/utils/test_universe_utils.py +++ b/src/openfe_analysis/tests/utils/test_universe_utils.py @@ -1,7 +1,4 @@ -import MDAnalysis as mda -import numpy as np import pytest -from rdkit import Chem from openfe_analysis.rmsd import make_Universe from openfe_analysis.utils.universe_utils import ( @@ -67,106 +64,3 @@ def test_select_state_atoms_shared_atoms(universe): shared_a = set(atom.ix for atom in state_a if atom.bfactor == 0.5) shared_b = set(atom.ix for atom in state_b if atom.bfactor == 0.5) assert shared_a == shared_b - - -def test_correct_elements_fixes_element(): - """correct_elements should update element where rdmol differs.""" - - # Build a minimal universe with a C atom - u = mda.Universe.empty(2, n_residues=1, trajectory=True) - u.add_TopologyAttr("elements", ["C", "C"]) # second atom is wrong - u.add_TopologyAttr("names", ["C1", "C2"]) - u.add_TopologyAttr("resnames", ["UNK"]) - u.add_TopologyAttr("resids", [1]) - u.load_new( - np.array([[[0.0, 0.0, 0.0], [1.5, 0.0, 0.0]]]), - order="fac", - ) - ag = u.select_atoms("all") - - mol = Chem.RWMol() - mol.AddAtom(Chem.Atom(6)) # C - mol.AddAtom(Chem.Atom(7)) # N - rdmol = mol.GetMol() - - with pytest.warns(UserWarning, match="No atom_mapping provided"): - correct_elements(ag, rdmol) - - assert ag[0].element == "C" - assert ag[1].element == "N" - assert ag[1].name == "N" - - -def test_correct_elements_no_change_when_correct(): - """correct_elements should not modify atoms that already have correct elements.""" - - u = mda.Universe.empty(2, n_residues=1, trajectory=True) - u.add_TopologyAttr("elements", ["C", "N"]) - u.add_TopologyAttr("names", ["C1", "N1"]) - u.add_TopologyAttr("resnames", ["UNK"]) - u.add_TopologyAttr("resids", [1]) - u.load_new( - np.array([[[0.0, 0.0, 0.0], [1.5, 0.0, 0.0]]]), - order="fac", - ) - ag = u.select_atoms("all") - - mol = Chem.RWMol() - mol.AddAtom(Chem.Atom(6)) # C - mol.AddAtom(Chem.Atom(7)) # N - rdmol = mol.GetMol() - - with pytest.warns(UserWarning, match="No atom_mapping provided"): - correct_elements(ag, rdmol) - - assert ag[0].element == "C" - assert ag[0].name == "C1" # name unchanged - assert ag[1].element == "N" - assert ag[1].name == "N1" # name unchanged - - -def test_correct_elements_with_atom_mapping(): - """correct_elements with atom_mapping should use mapping without warning.""" - - u = mda.Universe.empty(2, n_residues=1, trajectory=True) - u.add_TopologyAttr("elements", ["C", "C"]) # second atom is wrong - u.add_TopologyAttr("names", ["C1", "C2"]) - u.add_TopologyAttr("resnames", ["UNK"]) - u.add_TopologyAttr("resids", [1]) - u.load_new( - np.array([[[0.0, 0.0, 0.0], [1.5, 0.0, 0.0]]]), - order="fac", - ) - ag = u.select_atoms("all") - - # rdmol has atoms in reverse order: N, C - mol = Chem.RWMol() - mol.AddAtom(Chem.Atom(7)) # N at rdmol index 0 - mol.AddAtom(Chem.Atom(6)) # C at rdmol index 1 - rdmol = mol.GetMol() - - # explicitly map ag index 0 -> rdmol index 1 (C), ag index 1 -> rdmol index 0 (N) - correct_elements(ag, rdmol, atom_mapping={0: 1, 1: 0}) - - assert ag[0].element == "C" # mapped to rdmol index 1 (C) - assert ag[1].element == "N" # mapped to rdmol index 0 (N) - assert ag[1].name == "N" - - -def test_correct_elements_raises_size_error(): - """correct_elements should raise ValueError if atom counts don't match.""" - - u = mda.Universe.empty(2, n_residues=1, trajectory=True) - u.add_TopologyAttr("elements", ["C", "N"]) - u.add_TopologyAttr("names", ["C1", "N1"]) - u.add_TopologyAttr("resnames", ["UNK"]) - u.add_TopologyAttr("resids", [1]) - u.load_new(np.array([[[0.0, 0.0, 0.0], [1.5, 0.0, 0.0]]]), order="fac") - ag = u.select_atoms("all") - - mol = Chem.RWMol() - mol.AddAtom(Chem.Atom(6)) # only 1 atom - rdmol = mol.GetMol() - - with pytest.raises(ValueError, match="atomgroup has 2 atoms but rdmol has 1"): - correct_elements(ag, rdmol) diff --git a/src/openfe_analysis/utils/universe_utils.py b/src/openfe_analysis/utils/universe_utils.py index 2950091..89a50fc 100644 --- a/src/openfe_analysis/utils/universe_utils.py +++ b/src/openfe_analysis/utils/universe_utils.py @@ -1,12 +1,10 @@ from __future__ import annotations -import warnings from typing import Literal import MDAnalysis as mda import numpy as np from MDAnalysis.guesser.tables import vdwradii as MDA_VDWRADII -from rdkit import Chem # B-factor values used to identify atoms present at a given lambda state. # 0.25 : atoms unique to state A @@ -86,62 +84,3 @@ def guess_ligand_bonds( } ) atomgroup.guess_bonds(vdwradii) - - -def correct_elements( - atomgroup: mda.AtomGroup, - rdmol: Chem.Mol, - atom_mapping: dict[int, int] | None = None, -) -> None: - """ - Correct element and atom names in an AtomGroup in-place - using an RDKit molecule as the source of truth. - - This is needed for hybrid topologies where mapped atoms that - undergo element changes carry state A's element types, even when - state B's ligand is selected. - - Parameters - ---------- - atomgroup : mda.AtomGroup - Ligand atoms whose elements and names will be corrected. - rdmol : Chem.Mol - RDKit molecule with the correct element and atom name information. - atom_mapping : dict[int, int], optional - A mapping of ``{atomgroup_index: rdmol_index}`` defining the - correspondence between atoms in ``atomgroup`` and ``rdmol``. If - ``None``, atoms are matched by position which gives wrong results if - the atom order was not the same. - - Raises - ------ - ValueError - If the number of atoms in ``atomgroup`` and ``rdmol`` do not match. - """ - periodic_table = Chem.GetPeriodicTable() - - if len(atomgroup) != rdmol.GetNumAtoms(): - raise ValueError( - f"atomgroup has {len(atomgroup)} atoms but rdmol has {rdmol.GetNumAtoms()} atoms." - ) - - if atom_mapping is not None: - for ag_idx, rd_idx in atom_mapping.items(): - mda_atom = atomgroup[ag_idx] - rd_atom = rdmol.GetAtomWithIdx(rd_idx) - element = periodic_table.GetElementSymbol(rd_atom.GetAtomicNum()) - if mda_atom.element != element: - mda_atom.element = element - mda_atom.name = rd_atom.GetSymbol() - else: - warnings.warn( - "No atom_mapping provided to correct_elements. Assuming that " - "atom ordering is the same between atomgroup and rdmol. This may " - "give incorrect results if the atom ordering differs between the two.", - UserWarning, - ) - for mda_atom, rd_atom in zip(atomgroup, rdmol.GetAtoms()): - element = periodic_table.GetElementSymbol(rd_atom.GetAtomicNum()) - if mda_atom.element != element: - mda_atom.element = element - mda_atom.name = rd_atom.GetSymbol() From bf0261108f896305311d05a558c274a400543941 Mon Sep 17 00:00:00 2001 From: hannahbaumann Date: Tue, 26 May 2026 14:49:21 +0200 Subject: [PATCH 23/34] Restructure tests --- src/openfe_analysis/rmsd.py | 9 ++- src/openfe_analysis/tests/test_rmsd.py | 76 ------------------- .../tests/test_rmsd_classes.py | 76 +++++++++++++++++-- .../tests/utils/test_universe_utils.py | 1 - 4 files changed, 74 insertions(+), 88 deletions(-) diff --git a/src/openfe_analysis/rmsd.py b/src/openfe_analysis/rmsd.py index 39815e1..6b381ef 100644 --- a/src/openfe_analysis/rmsd.py +++ b/src/openfe_analysis/rmsd.py @@ -388,7 +388,6 @@ def gather_rms_data( u = make_Universe(u_top._topology, ds, state=state_idx) prot = u.select_atoms("protein and name CA") ligand = u.select_atoms("resname UNK") - state_lig = select_state_atoms(u, end_state="A").select_atoms("resname UNK") if prot: prot_rmsd = RMSDAnalysis(prot).run(step=skip) @@ -398,9 +397,11 @@ def gather_rms_data( output["protein_2D_RMSD"].append(prot_rmsd2d.results.rmsd2d) if ligand: - # lig_rmsd = RMSDAnalysis(ligand, mass_weighted=True).run(step=skip) - guess_ligand_bonds(state_lig, delete_existing=True) - lig_rmsd = SymmetryCorrectedLigandRMSD(state_lig, mass_weighted=True).run(step=skip) + # For now, leave it at the normal RMSD + lig_rmsd = RMSDAnalysis(ligand, mass_weighted=True).run(step=skip) + # state_lig = select_state_atoms(u, end_state="A").select_atoms("resname UNK") + # guess_ligand_bonds(state_lig, delete_existing=True) + # lig_rmsd = SymmetryCorrectedLigandRMSD(state_lig).run(step=skip) output["ligand_RMSD"].append(lig_rmsd.results.rmsd) lig_com_drift = LigandCOMDrift(ligand).run(step=skip) diff --git a/src/openfe_analysis/tests/test_rmsd.py b/src/openfe_analysis/tests/test_rmsd.py index 98ed2c7..b67f7c4 100644 --- a/src/openfe_analysis/tests/test_rmsd.py +++ b/src/openfe_analysis/tests/test_rmsd.py @@ -14,8 +14,6 @@ from openfe_analysis.reader import FEReader from openfe_analysis.rmsd import ( RMSDAnalysis, - SymmetryCorrectedLigandRMSD, - _select_state_ligand, gather_rms_data, make_Universe, ) @@ -176,77 +174,3 @@ def test_ligand_com_continuity(mda_universe): assert max(jumps) < 5.0 u.trajectory.close() - - -def test_symmetry_corrected_ligand_rmsd_nonnegative(mda_universe): - """RMSD values must be non-negative for all frames.""" - u = mda_universe - state_lig = _select_state_ligand(u) - - result = SymmetryCorrectedLigandRMSD(state_lig).run() - - assert np.all(result.results.rmsd >= 0.0) - - -def test_symmetry_corrected_ligand_rmsd_zero_for_valid_swap(): - """ - For a water-like symmetric molecule, swapping the two equivalent H atoms - gives naive RMSD > 0 but SymmetryCorrectedLigandRMSD = 0. - """ - # Build a minimal universe with two frames: reference and swapped - coords_ref = np.array( - [ - [0.0, 0.0, 0.0], # O - [1.0, 0.0, 0.0], # H1 - [0.0, 1.0, 0.0], # H2 - ] - ) - coords_swapped = np.array( - [ - [0.0, 0.0, 0.0], # O - [0.0, 1.0, 0.0], # H2 in H1's slot - [1.0, 0.0, 0.0], # H1 in H2's slot - ] - ) - - u = mda.Universe.empty(3, trajectory=True) - u.add_TopologyAttr("elements", ["O", "H", "H"]) - u.add_TopologyAttr("names", ["O", "H1", "H2"]) - u.add_TopologyAttr("resnames", ["UNK"]) - u.add_TopologyAttr("resids", [1]) - u.load_new( - np.array([coords_ref, coords_swapped]), - order="fac", - ) - - ag = u.select_atoms("all") - - corrected = SymmetryCorrectedLigandRMSD(ag).run() - naive = RMSDAnalysis(ag).run() - - # Frame 0 is reference — both should be 0 - assert corrected.results.rmsd[0] == pytest.approx(0.0, abs=1e-5) - assert naive.results.rmsd[0] == pytest.approx(0.0, abs=1e-5) - - # Frame 1 is the swap — naive sees displacement, corrected sees zero - assert naive.results.rmsd[1] > 0.0 - assert corrected.results.rmsd[1] == pytest.approx(0.0, abs=1e-5) - - -def test_ligand_rmsd_mass_weighting_effect(simulation_skipped_nc, hybrid_system_skipped_pdb): - with nc.Dataset(simulation_skipped_nc) as ds: - u_top = mda.Universe(hybrid_system_skipped_pdb) - u = make_Universe(u_top._topology, ds, state=0) - ligand = u.select_atoms("resname UNK") - state_lig = _select_state_ligand(u) - - rmsd_full_mw = RMSDAnalysis(ligand, mass_weighted=True).run() - rmsd_full_no_mw = RMSDAnalysis(ligand, mass_weighted=False).run() - rmsd_state_mw = RMSDAnalysis(state_lig, mass_weighted=True).run() - rmsd_state_no_mw = RMSDAnalysis(state_lig, mass_weighted=False).run() - - print(f"Full ligand, mass weighted: {rmsd_full_mw.results.rmsd[:6]}") - print(f"Full ligand, no mass weighting: {rmsd_full_no_mw.results.rmsd[:6]}") - print(f"State ligand, mass weighted: {rmsd_state_mw.results.rmsd[:6]}") - print(f"State ligand, no mass weighting: {rmsd_state_no_mw.results.rmsd[:6]}") - print("Old expected: [0.0, 1.092039, 0.839234, 1.228383, 1.533331, 1.276798]") diff --git a/src/openfe_analysis/tests/test_rmsd_classes.py b/src/openfe_analysis/tests/test_rmsd_classes.py index 5312611..05e1a1b 100644 --- a/src/openfe_analysis/tests/test_rmsd_classes.py +++ b/src/openfe_analysis/tests/test_rmsd_classes.py @@ -7,7 +7,15 @@ from MDAnalysisTests.datafiles import DCD, PSF from numpy.testing import assert_allclose, assert_almost_equal -from openfe_analysis.rmsd import LigandCOMDrift, Protein2DRMSD, RMSDAnalysis, make_Universe +from openfe_analysis.rmsd import ( + LigandCOMDrift, + Protein2DRMSD, + RMSDAnalysis, + SymmetryCorrectedLigandRMSD, + gather_rms_data, + make_Universe, +) +from openfe_analysis.utils import universe_utils @pytest.fixture @@ -15,6 +23,13 @@ def mda_universe(): return mda.Universe(PSF, DCD) +@pytest.fixture +def ligand(hybrid_system_skipped_pdb, simulation_skipped_nc): + u = make_Universe(hybrid_system_skipped_pdb, simulation_skipped_nc, state=0) + yield u.select_atoms("resname UNK") + u.trajectory.close() + + @pytest.fixture() def correct_values(): return [0, 4.68953] @@ -124,12 +139,6 @@ def test_matches_mda_distance_matrix(self, mda_universe): class TestLigandCOMDrift: - @pytest.fixture - def ligand(self, hybrid_system_skipped_pdb, simulation_skipped_nc): - u = make_Universe(hybrid_system_skipped_pdb, simulation_skipped_nc, state=0) - yield u.select_atoms("resname UNK") - u.trajectory.close() - def test_first_frame_is_zero(self, ligand): """COM drift at the first frame should always be zero.""" result = LigandCOMDrift(ligand).run(step=10) @@ -145,3 +154,56 @@ def test_values_nonnegative(self, ligand): """COM drift values should be non-negative distances.""" result = LigandCOMDrift(ligand).run(step=10) assert np.all(result.results.com_drift >= 0) + + +class TestSymmetryCorrectedLigandRMSD: + def test_values_nonnegative(self, ligand): + state_lig = universe_utils.select_state_atoms(ligand.universe, end_state="A").select_atoms( + "resname UNK" + ) + result = SymmetryCorrectedLigandRMSD(state_lig).run() + assert np.all(result.results.rmsd >= 0.0) + + def test_zero_for_valid_swap(self): + """ + For a water-like symmetric molecule, swapping the two equivalent H atoms + gives naive RMSD > 0 but SymmetryCorrectedLigandRMSD = 0. + """ + # Build a minimal universe with two frames: reference and swapped + coords_ref = np.array( + [ + [0.0, 0.0, 0.0], # O + [1.0, 0.0, 0.0], # H1 + [0.0, 1.0, 0.0], # H2 + ] + ) + coords_swapped = np.array( + [ + [0.0, 0.0, 0.0], # O + [0.0, 1.0, 0.0], # H2 in H1's slot + [1.0, 0.0, 0.0], # H1 in H2's slot + ] + ) + + u = mda.Universe.empty(3, trajectory=True) + u.add_TopologyAttr("elements", ["O", "H", "H"]) + u.add_TopologyAttr("names", ["O", "H1", "H2"]) + u.add_TopologyAttr("resnames", ["UNK"]) + u.add_TopologyAttr("resids", [1]) + u.load_new( + np.array([coords_ref, coords_swapped]), + order="fac", + ) + + ag = u.select_atoms("all") + + corrected = SymmetryCorrectedLigandRMSD(ag).run() + naive = RMSDAnalysis(ag).run() + + # Frame 0 is reference — both should be 0 + assert corrected.results.rmsd[0] == pytest.approx(0.0, abs=1e-5) + assert naive.results.rmsd[0] == pytest.approx(0.0, abs=1e-5) + + # Frame 1 is the swap — naive sees displacement, corrected sees zero + assert naive.results.rmsd[1] > 0.0 + assert corrected.results.rmsd[1] == pytest.approx(0.0, abs=1e-5) diff --git a/src/openfe_analysis/tests/utils/test_universe_utils.py b/src/openfe_analysis/tests/utils/test_universe_utils.py index 2447fb2..b344120 100644 --- a/src/openfe_analysis/tests/utils/test_universe_utils.py +++ b/src/openfe_analysis/tests/utils/test_universe_utils.py @@ -2,7 +2,6 @@ from openfe_analysis.rmsd import make_Universe from openfe_analysis.utils.universe_utils import ( - correct_elements, guess_ligand_bonds, select_state_atoms, ) From c96fd8efef4e9d7a075580e4678a9f3e7b0cee22 Mon Sep 17 00:00:00 2001 From: hannahbaumann Date: Tue, 26 May 2026 14:59:30 +0200 Subject: [PATCH 24/34] Update test symm corr --- src/openfe_analysis/tests/test_rmsd_classes.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/openfe_analysis/tests/test_rmsd_classes.py b/src/openfe_analysis/tests/test_rmsd_classes.py index 05e1a1b..02c6176 100644 --- a/src/openfe_analysis/tests/test_rmsd_classes.py +++ b/src/openfe_analysis/tests/test_rmsd_classes.py @@ -157,12 +157,13 @@ def test_values_nonnegative(self, ligand): class TestSymmetryCorrectedLigandRMSD: - def test_values_nonnegative(self, ligand): + def test_regression(self, ligand): state_lig = universe_utils.select_state_atoms(ligand.universe, end_state="A").select_atoms( "resname UNK" ) - result = SymmetryCorrectedLigandRMSD(state_lig).run() - assert np.all(result.results.rmsd >= 0.0) + result = SymmetryCorrectedLigandRMSD(state_lig).run(step=10) + expected = [0.0, 0.75138, 2.09003, 0.95125, 1.54566, 2.00029] + assert_allclose(result.results.rmsd[:6], expected, rtol=1e-3) def test_zero_for_valid_swap(self): """ From 984bcbbf60abcec50c8659deeaa3feddadf2b3b8 Mon Sep 17 00:00:00 2001 From: hannahbaumann Date: Tue, 26 May 2026 15:26:45 +0200 Subject: [PATCH 25/34] Update tests --- .../tests/test_rmsd_classes.py | 58 +++++++------------ 1 file changed, 20 insertions(+), 38 deletions(-) diff --git a/src/openfe_analysis/tests/test_rmsd_classes.py b/src/openfe_analysis/tests/test_rmsd_classes.py index 5312611..00d19d3 100644 --- a/src/openfe_analysis/tests/test_rmsd_classes.py +++ b/src/openfe_analysis/tests/test_rmsd_classes.py @@ -15,6 +15,13 @@ def mda_universe(): return mda.Universe(PSF, DCD) +@pytest.fixture +def ligand(hybrid_system_skipped_pdb, simulation_skipped_nc): + u = make_Universe(hybrid_system_skipped_pdb, simulation_skipped_nc, state=0) + yield u.select_atoms("resname UNK") + u.trajectory.close() + + @pytest.fixture() def correct_values(): return [0, 4.68953] @@ -80,23 +87,8 @@ def test_custom_weighted(self, mda_universe, correct_values_mass): class TestProtein2DRMSD: - def test_output_shape(self, mda_universe): - """Output should have n*(n-1)//2 entries for n frames.""" - prot = mda_universe.select_atoms("name CA") - result = Protein2DRMSD(prot).run(step=10) - n_frames = len(mda_universe.trajectory[::10]) - expected_pairs = n_frames * (n_frames - 1) // 2 - assert len(result.results.rmsd2d) == expected_pairs - - def test_values_nonnegative(self, mda_universe): - """All RMSD values should be non-negative.""" - prot = mda_universe.select_atoms("name CA") - result = Protein2DRMSD(prot).run(step=10) - assert np.all(result.results.rmsd2d >= 0) - def test_symmetric_diagonal_zero(self, mda_universe): - """When reconstructed into a full matrix, diagonal should be zero - and matrix should be symmetric.""" + """Diagonal should be zero and matrix should be symmetric.""" prot = mda_universe.select_atoms("name CA") result = Protein2DRMSD(prot).run(step=10) @@ -113,6 +105,13 @@ def test_matches_mda_distance_matrix(self, mda_universe): prot = mda_universe.select_atoms("name CA") result = Protein2DRMSD(prot).run(step=10) + # Check the output shape + n_frames = len(mda_universe.trajectory[::10]) + expected_pairs = n_frames * (n_frames - 1) // 2 + assert len(result.results.rmsd2d) == expected_pairs + assert np.all(result.results.rmsd2d >= 0) + + # Distancematrix doesn't do centering and superposition by default metric = partial(rms.rmsd, center=True, superposition=True) ref = diffusionmap.DistanceMatrix(prot, metric=metric) ref.run(step=10) @@ -123,25 +122,8 @@ def test_matches_mda_distance_matrix(self, mda_universe): assert_allclose(result.results.rmsd2d, expected, atol=1e-4) -class TestLigandCOMDrift: - @pytest.fixture - def ligand(self, hybrid_system_skipped_pdb, simulation_skipped_nc): - u = make_Universe(hybrid_system_skipped_pdb, simulation_skipped_nc, state=0) - yield u.select_atoms("resname UNK") - u.trajectory.close() - - def test_first_frame_is_zero(self, ligand): - """COM drift at the first frame should always be zero.""" - result = LigandCOMDrift(ligand).run(step=10) - assert result.results.com_drift[0] == pytest.approx(0.0, abs=1e-5) - - def test_output_shape(self, ligand): - """Output should have one entry per analyzed frame.""" - result = LigandCOMDrift(ligand).run(step=10) - n_frames = len(ligand.universe.trajectory[::10]) - assert len(result.results.com_drift) == n_frames - - def test_values_nonnegative(self, ligand): - """COM drift values should be non-negative distances.""" - result = LigandCOMDrift(ligand).run(step=10) - assert np.all(result.results.com_drift >= 0) +def test_ligand_com_drift(ligand): + result = LigandCOMDrift(ligand).run(step=10) + expected = [0.0, 0.38549, 0.61483, 0.54140, 1.26861, 0.92772] + assert len(result.results.com_drift) == len(ligand.universe.trajectory[::10]) + assert_allclose(result.results.com_drift[:6], expected, rtol=1e-3) From 42c30175b12470d2c4cec081c9ad45435961d49b Mon Sep 17 00:00:00 2001 From: Hannah Baumann <43765638+hannahbaumann@users.noreply.github.com> Date: Tue, 26 May 2026 15:28:15 +0200 Subject: [PATCH 26/34] Update src/openfe_analysis/utils/universe_utils.py Co-authored-by: Josh Horton --- src/openfe_analysis/utils/universe_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/openfe_analysis/utils/universe_utils.py b/src/openfe_analysis/utils/universe_utils.py index 89a50fc..c944a43 100644 --- a/src/openfe_analysis/utils/universe_utils.py +++ b/src/openfe_analysis/utils/universe_utils.py @@ -51,7 +51,7 @@ def select_state_atoms( else: raise ValueError(f"end_state must be 'A' or 'B', got '{end_state}'") - state_indices = np.array([atom.ix for atom in universe.atoms if atom.bfactor in bfactor_values]) + state = sum([universe.atoms[universe.atoms.tempfactors == i] for i in bfactor_values]) return universe.atoms[state_indices] From 26f04f520571a6f2481e2a72a7bc2d274f32de94 Mon Sep 17 00:00:00 2001 From: hannahbaumann Date: Tue, 26 May 2026 15:38:07 +0200 Subject: [PATCH 27/34] Small fix --- src/openfe_analysis/utils/universe_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/openfe_analysis/utils/universe_utils.py b/src/openfe_analysis/utils/universe_utils.py index c944a43..217d71d 100644 --- a/src/openfe_analysis/utils/universe_utils.py +++ b/src/openfe_analysis/utils/universe_utils.py @@ -52,7 +52,7 @@ def select_state_atoms( raise ValueError(f"end_state must be 'A' or 'B', got '{end_state}'") state = sum([universe.atoms[universe.atoms.tempfactors == i] for i in bfactor_values]) - return universe.atoms[state_indices] + return universe.atoms[state] def guess_ligand_bonds( From eafa23e305c27c3a9d03e84e9b6f2f8492a5854f Mon Sep 17 00:00:00 2001 From: hannahbaumann Date: Tue, 26 May 2026 16:09:39 +0200 Subject: [PATCH 28/34] Add error for missing bonds --- src/openfe_analysis/rmsd.py | 6 ++++++ src/openfe_analysis/tests/test_rmsd_classes.py | 15 +++++++++++++++ 2 files changed, 21 insertions(+) diff --git a/src/openfe_analysis/rmsd.py b/src/openfe_analysis/rmsd.py index 6b381ef..2b79d18 100644 --- a/src/openfe_analysis/rmsd.py +++ b/src/openfe_analysis/rmsd.py @@ -246,6 +246,12 @@ def __init__( ): super().__init__(atomgroup.universe.trajectory, **kwargs) self._ag = atomgroup + if rdmol is None and len(atomgroup.bonds) == 0: + raise ValueError( + "No bonds found on atomgroup. Call guess_ligand_bonds() " + "before instantiating SymmetryCorrectedLigandRMSD, or " + "pass an rdmol directly." + ) self._mol = rdmol if rdmol is not None else atomgroup.convert_to("RDKIT") self._aprops = np.array([atom.GetAtomicNum() for atom in self._mol.GetAtoms()]) self._am = Chem.rdmolops.GetAdjacencyMatrix(self._mol) diff --git a/src/openfe_analysis/tests/test_rmsd_classes.py b/src/openfe_analysis/tests/test_rmsd_classes.py index a43dd3a..632fd77 100644 --- a/src/openfe_analysis/tests/test_rmsd_classes.py +++ b/src/openfe_analysis/tests/test_rmsd_classes.py @@ -189,3 +189,18 @@ def test_zero_for_valid_swap(self): # Frame 1 is the swap — naive sees displacement, corrected sees zero assert naive.results.rmsd[1] > 0.0 assert corrected.results.rmsd[1] == pytest.approx(0.0, abs=1e-5) + + def test_raises_on_missing_bonds(self): + """Should raise ValueError if atomgroup has no bonds and no rdmol is provided.""" + u = mda.Universe.empty(3, n_residues=1, trajectory=True) + u.add_TopologyAttr("elements", ["O", "H", "H"]) + u.add_TopologyAttr("names", ["O", "H1", "H2"]) + u.add_TopologyAttr("resnames", ["UNK"]) + u.add_TopologyAttr("resids", [1]) + u.load_new( + np.array([[[0.0, 0.0, 0.0], [1.0, 0.0, 0.0], [0.0, 1.0, 0.0]]]), + order="fac", + ) + ag = u.select_atoms("all") + with pytest.raises(ValueError, match="No bonds found"): + SymmetryCorrectedLigandRMSD(ag) From ca9f3d6325cb30497070f5095fe99c025cd55589 Mon Sep 17 00:00:00 2001 From: hannahbaumann Date: Wed, 27 May 2026 12:39:55 +0200 Subject: [PATCH 29/34] fix failing tests --- src/openfe_analysis/rmsd.py | 17 +++++++++++------ src/openfe_analysis/tests/test_rmsd_classes.py | 1 + src/openfe_analysis/utils/universe_utils.py | 7 +++++-- 3 files changed, 17 insertions(+), 8 deletions(-) diff --git a/src/openfe_analysis/rmsd.py b/src/openfe_analysis/rmsd.py index ef68c2a..cf82663 100644 --- a/src/openfe_analysis/rmsd.py +++ b/src/openfe_analysis/rmsd.py @@ -246,12 +246,17 @@ def __init__( ): super().__init__(atomgroup.universe.trajectory, **kwargs) self._ag = atomgroup - if rdmol is None and len(atomgroup.bonds) == 0: - raise ValueError( - "No bonds found on atomgroup. Call guess_ligand_bonds() " - "before instantiating SymmetryCorrectedLigandRMSD, or " - "pass an rdmol directly." - ) + if rdmol is None: + try: + has_bonds = len(atomgroup.bonds) > 0 + except mda.exceptions.NoDataError: + has_bonds = False + if not has_bonds: + raise ValueError( + "No bonds found on atomgroup. Call guess_ligand_bonds() " + "before instantiating SymmetryCorrectedLigandRMSD, or " + "pass an rdmol directly." + ) self._mol = rdmol if rdmol is not None else atomgroup.convert_to("RDKIT") self._aprops = np.array([atom.GetAtomicNum() for atom in self._mol.GetAtoms()]) self._am = Chem.rdmolops.GetAdjacencyMatrix(self._mol) diff --git a/src/openfe_analysis/tests/test_rmsd_classes.py b/src/openfe_analysis/tests/test_rmsd_classes.py index 632fd77..65ade43 100644 --- a/src/openfe_analysis/tests/test_rmsd_classes.py +++ b/src/openfe_analysis/tests/test_rmsd_classes.py @@ -172,6 +172,7 @@ def test_zero_for_valid_swap(self): u.add_TopologyAttr("names", ["O", "H1", "H2"]) u.add_TopologyAttr("resnames", ["UNK"]) u.add_TopologyAttr("resids", [1]) + u.add_TopologyAttr("bonds", [(0, 1), (0, 2)]) u.load_new( np.array([coords_ref, coords_swapped]), order="fac", diff --git a/src/openfe_analysis/utils/universe_utils.py b/src/openfe_analysis/utils/universe_utils.py index 217d71d..7eb9cca 100644 --- a/src/openfe_analysis/utils/universe_utils.py +++ b/src/openfe_analysis/utils/universe_utils.py @@ -51,8 +51,11 @@ def select_state_atoms( else: raise ValueError(f"end_state must be 'A' or 'B', got '{end_state}'") - state = sum([universe.atoms[universe.atoms.tempfactors == i] for i in bfactor_values]) - return universe.atoms[state] + state = sum( + [universe.atoms[universe.atoms.tempfactors == i] for i in bfactor_values], + universe.atoms[[]], # empty AtomGroup as start value + ) + return state def guess_ligand_bonds( From c1347260688def72f893068fb035da6b0afca1b0 Mon Sep 17 00:00:00 2001 From: hannahbaumann Date: Wed, 27 May 2026 14:26:00 +0200 Subject: [PATCH 30/34] update fixture to make test more stable --- src/openfe_analysis/tests/utils/test_universe_utils.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/openfe_analysis/tests/utils/test_universe_utils.py b/src/openfe_analysis/tests/utils/test_universe_utils.py index b344120..205dee9 100644 --- a/src/openfe_analysis/tests/utils/test_universe_utils.py +++ b/src/openfe_analysis/tests/utils/test_universe_utils.py @@ -15,8 +15,11 @@ def universe(hybrid_system_skipped_pdb, simulation_skipped_nc): @pytest.fixture -def ligand_ag(universe): - return select_state_atoms(universe, end_state="A").select_atoms("resname UNK") +def ligand_ag(hybrid_system_skipped_pdb, simulation_skipped_nc): + u = make_Universe(hybrid_system_skipped_pdb, simulation_skipped_nc, state=0) + ag = select_state_atoms(u, end_state="A").select_atoms("resname UNK") + yield ag + u.trajectory.close() def test_guess_ligand_bonds_adds_bonds(ligand_ag): From b08671aa89bca97b8fdbb1208930e78b10678f9d Mon Sep 17 00:00:00 2001 From: hannahbaumann Date: Wed, 27 May 2026 14:40:36 +0200 Subject: [PATCH 31/34] fix tests --- .../tests/utils/test_universe_utils.py | 25 +++++++++++++------ 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/src/openfe_analysis/tests/utils/test_universe_utils.py b/src/openfe_analysis/tests/utils/test_universe_utils.py index 205dee9..e104cb8 100644 --- a/src/openfe_analysis/tests/utils/test_universe_utils.py +++ b/src/openfe_analysis/tests/utils/test_universe_utils.py @@ -1,7 +1,8 @@ import pytest -from openfe_analysis.rmsd import make_Universe +from openfe_analysis.utils import apply_transformations from openfe_analysis.utils.universe_utils import ( + create_universe_single_state, guess_ligand_bonds, select_state_atoms, ) @@ -9,17 +10,27 @@ @pytest.fixture def universe(hybrid_system_skipped_pdb, simulation_skipped_nc): - u = make_Universe(hybrid_system_skipped_pdb, simulation_skipped_nc, state=0) - yield u - u.trajectory.close() + universe = create_universe_single_state( + hybrid_system_skipped_pdb, simulation_skipped_nc, state=0 + ) + prot = universe.select_atoms("protein and name CA") + ligand = universe.select_atoms("resname UNK") + apply_transformations.apply_alignment_transformations(universe, prot, ligand) + yield universe + universe.trajectory.close() @pytest.fixture def ligand_ag(hybrid_system_skipped_pdb, simulation_skipped_nc): - u = make_Universe(hybrid_system_skipped_pdb, simulation_skipped_nc, state=0) - ag = select_state_atoms(u, end_state="A").select_atoms("resname UNK") + universe = create_universe_single_state( + hybrid_system_skipped_pdb, simulation_skipped_nc, state=0 + ) + prot = universe.select_atoms("protein and name CA") + ligand = universe.select_atoms("resname UNK") + apply_transformations.apply_alignment_transformations(universe, prot, ligand) + ag = select_state_atoms(universe, end_state="A").select_atoms("resname UNK") yield ag - u.trajectory.close() + universe.trajectory.close() def test_guess_ligand_bonds_adds_bonds(ligand_ag): From b6b05c07a7c5c705b0bdf304d7942db0d14bb065 Mon Sep 17 00:00:00 2001 From: Hannah Baumann <43765638+hannahbaumann@users.noreply.github.com> Date: Wed, 27 May 2026 16:10:05 +0200 Subject: [PATCH 32/34] Apply suggestion from @hannahbaumann --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index 093ffc9..f430640 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,6 +19,7 @@ dependencies = [ 'openff-units', 'pyyaml', 'spyrmsd', + 'rdkit', 'matplotlib', ] description="Trajectory analysis of free energy calculations." From e8d8a026934802703f1a9e6060f152debfd89f05 Mon Sep 17 00:00:00 2001 From: hannahbaumann Date: Thu, 28 May 2026 09:54:37 +0200 Subject: [PATCH 33/34] Add error if rdkit mol and atomgroup have different number of atoms --- src/openfe_analysis/rmsd.py | 16 ++++++-- .../tests/test_rmsd_classes.py | 38 +++++++++++++------ 2 files changed, 38 insertions(+), 16 deletions(-) diff --git a/src/openfe_analysis/rmsd.py b/src/openfe_analysis/rmsd.py index 60c68b8..12c3f38 100644 --- a/src/openfe_analysis/rmsd.py +++ b/src/openfe_analysis/rmsd.py @@ -146,6 +146,13 @@ class SymmetryCorrectedLigandRMSD(AnalysisBase): used directly and ``guess_ligand_bonds`` does not need to be called. If ``None``, the RDKit molecule is derived from ``atomgroup`` via ``convert_to("RDKIT")``. + + Raises + ------ + ValueError + If ``rdmol`` is ``None`` and no bonds are found on the atomgroup. + ValueError + If the number of atoms in ``atomgroup`` and ``rdmol`` do not match. """ _analysis_algorithm_is_parallelizable = False @@ -170,6 +177,11 @@ def __init__( "pass an rdmol directly." ) self._mol = rdmol if rdmol is not None else atomgroup.convert_to("RDKIT") + if len(atomgroup) != self._mol.GetNumAtoms(): + raise ValueError( + f"atomgroup has {len(atomgroup)} atoms but rdmol has " + f"{self._mol.GetNumAtoms()} atoms." + ) self._aprops = np.array([atom.GetAtomicNum() for atom in self._mol.GetAtoms()]) self._am = Chem.rdmolops.GetAdjacencyMatrix(self._mol) @@ -331,11 +343,7 @@ def gather_rms_data( output["protein_2D_RMSD"].append(prot_rmsd2d.results.rmsd2d) if ligand: - # For now, leave it at the normal RMSD lig_rmsd = RMSDAnalysis(ligand, mass_weighted=True).run(step=skip) - # state_lig = select_state_atoms(u, end_state="A").select_atoms("resname UNK") - # guess_ligand_bonds(state_lig, delete_existing=True) - # lig_rmsd = SymmetryCorrectedLigandRMSD(state_lig).run(step=skip) output["ligand_RMSD"].append(lig_rmsd.results.rmsd) lig_com_drift = LigandCOMDrift(ligand).run(step=skip) diff --git a/src/openfe_analysis/tests/test_rmsd_classes.py b/src/openfe_analysis/tests/test_rmsd_classes.py index f3cf15f..ec3d5ce 100644 --- a/src/openfe_analysis/tests/test_rmsd_classes.py +++ b/src/openfe_analysis/tests/test_rmsd_classes.py @@ -6,6 +6,7 @@ from MDAnalysis.analysis import diffusionmap, rms from MDAnalysisTests.datafiles import DCD, PSF from numpy.testing import assert_allclose, assert_almost_equal +from rdkit import Chem from openfe_analysis.rmsd import ( LigandCOMDrift, @@ -33,6 +34,21 @@ def ligand(hybrid_system_skipped_pdb, simulation_skipped_nc): universe.trajectory.close() +@pytest.fixture +def minimal_universe(): + """Minimal 3-atom water-like universe without bonds.""" + u = mda.Universe.empty(3, n_residues=1, trajectory=True) + u.add_TopologyAttr("elements", ["O", "H", "H"]) + u.add_TopologyAttr("names", ["O", "H1", "H2"]) + u.add_TopologyAttr("resnames", ["UNK"]) + u.add_TopologyAttr("resids", [1]) + u.load_new( + np.array([[[0.0, 0.0, 0.0], [1.0, 0.0, 0.0], [0.0, 1.0, 0.0]]]), + order="fac", + ) + return u.select_atoms("all") + + @pytest.fixture() def correct_values(): return [0, 4.68953] @@ -194,17 +210,15 @@ def test_zero_for_valid_swap(self): assert naive.results.rmsd[1] > 0.0 assert corrected.results.rmsd[1] == pytest.approx(0.0, abs=1e-5) - def test_raises_on_missing_bonds(self): + def test_raises_on_missing_bonds(self, minimal_universe): """Should raise ValueError if atomgroup has no bonds and no rdmol is provided.""" - u = mda.Universe.empty(3, n_residues=1, trajectory=True) - u.add_TopologyAttr("elements", ["O", "H", "H"]) - u.add_TopologyAttr("names", ["O", "H1", "H2"]) - u.add_TopologyAttr("resnames", ["UNK"]) - u.add_TopologyAttr("resids", [1]) - u.load_new( - np.array([[[0.0, 0.0, 0.0], [1.0, 0.0, 0.0], [0.0, 1.0, 0.0]]]), - order="fac", - ) - ag = u.select_atoms("all") with pytest.raises(ValueError, match="No bonds found"): - SymmetryCorrectedLigandRMSD(ag) + SymmetryCorrectedLigandRMSD(minimal_universe) + + def test_raises_on_atom_count_mismatch(self, minimal_universe): + """Should raise ValueError if atomgroup and rdmol have different atom counts.""" + mol = Chem.RWMol() + mol.AddAtom(Chem.Atom(8)) # O + mol.AddAtom(Chem.Atom(1)) # H + with pytest.raises(ValueError, match="atomgroup has 3 atoms but rdmol has 2"): + SymmetryCorrectedLigandRMSD(minimal_universe, rdmol=mol.GetMol()) From 53d718bc0a4d32488c29d6bf190532538f0668c4 Mon Sep 17 00:00:00 2001 From: hannahbaumann Date: Thu, 28 May 2026 10:22:52 +0200 Subject: [PATCH 34/34] Move atm number check --- src/openfe_analysis/rmsd.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/openfe_analysis/rmsd.py b/src/openfe_analysis/rmsd.py index 12c3f38..b54beb9 100644 --- a/src/openfe_analysis/rmsd.py +++ b/src/openfe_analysis/rmsd.py @@ -176,12 +176,13 @@ def __init__( "before instantiating SymmetryCorrectedLigandRMSD, or " "pass an rdmol directly." ) + else: + if len(atomgroup) != rdmol.GetNumAtoms(): + raise ValueError( + f"atomgroup has {len(atomgroup)} atoms but rdmol has " + f"{rdmol.GetNumAtoms()} atoms." + ) self._mol = rdmol if rdmol is not None else atomgroup.convert_to("RDKIT") - if len(atomgroup) != self._mol.GetNumAtoms(): - raise ValueError( - f"atomgroup has {len(atomgroup)} atoms but rdmol has " - f"{self._mol.GetNumAtoms()} atoms." - ) self._aprops = np.array([atom.GetAtomicNum() for atom in self._mol.GetAtoms()]) self._am = Chem.rdmolops.GetAdjacencyMatrix(self._mol)