Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ channels:
dependencies:
- click
- MDAnalysis
- MDAnalysisTests
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add this to the test dependencies in the pyproject.toml.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

- netCDF4
- openff-units
- pip
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ test = [
"pytest-xdist",
"pytest-cov",
"pooch",
"MDAnalysisTests",
]

[project.urls]
Expand Down
272 changes: 178 additions & 94 deletions src/openfe_analysis/rmsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@
import MDAnalysis as mda
import netCDF4 as nc
import numpy as np
import tqdm
from MDAnalysis.analysis import rms
from MDAnalysis.analysis import diffusionmap, rms
from MDAnalysis.analysis.base import AnalysisBase
from MDAnalysis.transformations import unwrap
from numpy import typing as npt

from .reader import FEReader
from .transformations import Aligner, ClosestImageShift, NoJump
Expand Down Expand Up @@ -53,7 +52,7 @@ def make_Universe(top: pathlib.Path, trj: nc.Dataset, state: int) -> mda.Univers
- Unwraps protein and ligand atom to be made whole
- Shifts protein chains and the ligand to the image closest to the first
protein chain (:class:`ClosestImageShift`)
- Aligns the entire system to minimise the protein RMSD (:class:`Aligner`)
- Aligns the entire system to minimize the protein RMSD (:class:`Aligner`)

If only a ligand is present:

Expand Down Expand Up @@ -88,7 +87,7 @@ def make_Universe(top: pathlib.Path, trj: nc.Dataset, state: int) -> mda.Univers
else:
# if there's no protein
# - make the ligand not jump periodic images between frames
# - align the ligand to minimise its RMSD
# - align the ligand to minimize its RMSD
nope = NoJump(ligand)
align = Aligner(ligand)

Expand All @@ -100,9 +99,169 @@ def make_Universe(top: pathlib.Path, trj: nc.Dataset, state: int) -> mda.Univers
return u


class Protein2DRMSD(AnalysisBase):
"""
Flattened 2D RMSD matrix

For all unique frame pairs ``(i, j)`` with ``i < j``, this function
computes the RMSD between atomic coordinates after optimal alignment.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you maybe expand this to mention you're doing a center of geometry fit as well as a rotational and translational superposition usingg QCP?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added this!

Alignment is performed by centering each frame on its center of geometry,
followed by rotational and translational superposition using the QCP method.

Parameters
----------
atomgroup: mda.AtomGroup
Protein atoms (e.g. CA selection)
weights: np.ndarray, optional
Per-atom weights to use in the RMSD calculation. If ``None``,
all atoms are weighted equally.

Notes
-----
All atom positions are accumulated in memory during the trajectory
iteration. For long trajectories or large systems this may result in
significant memory usage. Consider using the ``step`` argument to
``run()`` to reduce the number of frames analyzed.
"""

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explicitly define _analysis_algorithm_is_parallelizable = False (it's inherited by default, but it would be good to have it explicitly defined here) in these classes and then raise an issue about looking into parallism?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added this and also opened an issue.

_analysis_algorithm_is_parallelizable = False

def __init__(self, atomgroup: mda.AtomGroup, weights: Optional[np.ndarray] = None, **kwargs):
super().__init__(atomgroup.universe.trajectory, **kwargs)
self._weights = weights
self._ag = atomgroup

def _prepare(self) -> None:
self._coords = np.zeros((self.n_frames, self._ag.n_atoms, 3), dtype=np.float64)

def _single_frame(self) -> None:
self._coords[self._frame_index] = self._ag.positions

def _conclude(self) -> None:
nframes = self._coords.shape[0]

# Pre-allocate numpy arrays
n_pairs = nframes * (nframes - 1) // 2
self.results.rmsd2d = np.empty(n_pairs)

for idx, (i, j) in enumerate(itertools.combinations(range(nframes), 2)):
posi, posj = self._coords[i], self._coords[j]
self.results.rmsd2d[idx] = rms.rmsd(
posi,
posj,
self._weights,
center=True,
superposition=True,
)


class RMSDAnalysis(AnalysisBase):
"""
1D RMSD time series for an AtomGroup.

Parameters
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If possible, try to make where you put parameters consistent between the docstrings of different classes. I prefer it in this location (since it reflects what MDAnalysis does), but Protein2DRMSD has it in the __init__.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved Parameters in the Protein2DRMSD to the class.

----------
atomgroup : MDAnalysis.AtomGroup
Atoms to compute RMSD for.
reference: Optional[MDAnalysis.AtomGroup]
Reference AtomGroup. If ``None``, the reference positions are taken
from the first analyzed frame, so ``run(start=10)`` measures RMSD
relative to frame 10, not frame 0.
mass_weighted : bool, optional
If True, compute mass-weighted RMSD.
center : bool, optional
If ``True``, subtract the center of geometry before computing RMSD.
Defaults to ``False`` as the trajectory is assumed to be pre-centered.
superposition : bool, optional
If ``True``, perform rotational superposition before computing RMSD.
Defaults to ``False`` as the trajectory is assumed to be pre-superposed.
"""

_analysis_algorithm_is_parallelizable = False

def __init__(
self,
atomgroup: mda.AtomGroup,
reference: Optional[mda.AtomGroup] = None,
mass_weighted: bool = False,
center: bool = False,
superposition: bool = False,
**kwargs,
):
super().__init__(atomgroup.universe.trajectory, **kwargs)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_analysis_algorithm_is_parallelizable

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remember to add.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added this!

self._ag = atomgroup
self._reference = reference if reference is not None else self._ag
self._mass_weighted = mass_weighted
self._center = center
self._superposition = superposition

def _prepare(self) -> None:
self.results.rmsd = np.zeros(self.n_frames, dtype=np.float64)
# reference is taken from the first analyzed frame, not necessarily frame 0
self._reference_pos = self._reference.positions.copy()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that if you call .run(start=10), this will mean your reference is frame 10 not frame 0. This should probably be documented.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the doc string and also inline comment


if self._mass_weighted:
self._weights = self._ag.masses / np.mean(self._ag.masses)
else:
self._weights = None

def _single_frame(self) -> None:
self.results.rmsd[self._frame_index] = rms.rmsd(
self._ag.positions,
self._reference_pos,
self._weights,
center=self._center,
superposition=self._superposition,
)


class LigandCOMDrift(AnalysisBase):
"""
Ligand center-of-mass displacement from initial position.

Parameters
----------
atomgroup : mda.AtomGroup
Ligand atoms for which the center-of-mass drift is calculated.

Notes
-----
The initial position is taken from the first analyzed frame, so
``run(start=10)`` measures drift relative to frame 10, not frame 0.

PBC are not applied as the trajectory is assumed to have been
pre-processed, ensuring the ligand does not jump between periodic images.
Passing a box to apply the minimum image convention would give
incorrect results for ligands that have drifted more than half a box
length from their starting position.
"""

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above re: parallelizable

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

_analysis_algorithm_is_parallelizable = False

def __init__(self, atomgroup: mda.AtomGroup, **kwargs):
super().__init__(atomgroup.universe.trajectory, **kwargs)
self._ag = atomgroup

def _prepare(self) -> None:
self.results.com_drift = np.zeros(self.n_frames, dtype=np.float64)
# initial COM is taken from the first analyzed frame, not necessarily frame 0
self._initial_com = self._ag.center_of_mass()

def _single_frame(self) -> None:
# no box argument, assumes the ligand stays in a consistent image;
# applying the minimum image convention could mask large drifts > half a box length
self.results.com_drift[self._frame_index] = mda.lib.distances.calc_bonds(
self._ag.center_of_mass(),
self._initial_com,
)


def gather_rms_data(
pdb_topology: pathlib.Path, dataset: pathlib.Path, skip: Optional[int] = None
) -> dict[str, list[Any]]:
pdb_topology: pathlib.Path,
dataset: pathlib.Path,
skip: Optional[int] = None,
) -> dict[str, list[float]]:
"""
Compute structural RMSD-based metrics for a multistate BFE simulation.

Expand Down Expand Up @@ -161,105 +320,30 @@ def gather_rms_data(
# max against 1 to avoid skip=0 case
skip = max(n_frames // 500, 1)

pb = tqdm.tqdm(total=int(n_frames / skip) * n_lambda)

u_top = mda.Universe(pdb_topology)

for i in range(n_lambda):
for state_idx in range(n_lambda):
# cheeky, but we can read the PDB topology once and reuse per universe
# this then only hits the PDB file once for all replicas
u = make_Universe(u_top._topology, ds, state=i)
u = make_Universe(u_top._topology, ds, state=state_idx)

prot = u.select_atoms("protein and name CA")
ligand = u.select_atoms("resname UNK")

# save coordinates for 2D RMSD matrix
# TODO: Some smart guard to avoid allocating a silly amount of memory?
prot2d = np.empty((len(u.trajectory[::skip]), len(prot), 3), dtype=np.float32)

prot_start = prot.positions
ligand_start = ligand.positions
ligand_initial_com = ligand.center_of_mass()
ligand_weights = ligand.masses / np.mean(ligand.masses)

this_protein_rmsd = []
this_ligand_rmsd = []
this_ligand_wander = []

for ts_i, ts in enumerate(u.trajectory[::skip]):
pb.update()

if prot:
prot2d[ts_i, :, :] = prot.positions
this_protein_rmsd.append(
rms.rmsd(
prot.positions,
prot_start,
None, # prot_weights,
center=False,
superposition=False,
)
)
if ligand:
this_ligand_rmsd.append(
rms.rmsd(
ligand.positions,
ligand_start,
ligand_weights,
center=False,
superposition=False,
)
)
this_ligand_wander.append(
# distance between start and current ligand position
# ignores PBC, but we've already centered the traj
mda.lib.distances.calc_bonds(ligand.center_of_mass(), ligand_initial_com)
)

if prot:
# can ignore weights here as it's all Ca
rmsd2d = twoD_RMSD(prot2d, w=None) # prot_weights)
output["protein_RMSD"].append(this_protein_rmsd)
output["protein_2D_RMSD"].append(rmsd2d)
if ligand:
output["ligand_RMSD"].append(this_ligand_rmsd)
output["ligand_wander"].append(this_ligand_wander)

output["time(ps)"] = list(np.arange(len(u.trajectory))[::skip] * u.trajectory.dt)

return output

prot_rmsd = RMSDAnalysis(prot).run(step=skip)
output["protein_RMSD"].append(prot_rmsd.results.rmsd)

def twoD_RMSD(positions, w: Optional[npt.NDArray]) -> list[float]:
"""
Compute a flattened 2D RMSD matrix from a trajectory.
prot_rmsd2d = Protein2DRMSD(prot).run(step=skip)
output["protein_2D_RMSD"].append(prot_rmsd2d.results.rmsd2d)

For all unique frame pairs ``(i, j)`` with ``i < j``, this function
computes the RMSD between atomic coordinates after optimal alignment.

Parameters
----------
positions : np.ndarray
Atomic coordinates for all frames in the trajectory.
w : np.ndarray, optional
Per-atom weights to use in the RMSD calculation. If ``None``,
all atoms are weighted equally.

Returns
-------
list of float
Flattened list of RMSD values corresponding to all frame pairs
``(i, j)`` with ``i < j``.
"""
nframes, _, _ = positions.shape

output = []

for i, j in itertools.combinations(range(nframes), 2):
posi, posj = positions[i], positions[j]
if ligand:
lig_rmsd = RMSDAnalysis(ligand, mass_weighted=True).run(step=skip)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ligand RMSD is currently calculated on the hybrid topology, which may not be what we want long term.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For a separate PR - the atom selection (or atomgroup) should really be user defined rather than defaulting to UNK.

This might be a good argument for letting Protocols deal with this rather than making it uniform.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opened an issue here: #103

output["ligand_RMSD"].append(lig_rmsd.results.rmsd)

rmsd = rms.rmsd(posi, posj, w, center=True, superposition=True)
lig_com_drift = LigandCOMDrift(ligand).run(step=skip)
output["ligand_wander"].append(lig_com_drift.results.com_drift)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this is historical, so it doesn't have to be here, but can we please renamed this to ligand_com_drift or anything else? wander is such an unspecific name 😅

Copy link
Copy Markdown
Contributor Author

@hannahbaumann hannahbaumann May 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I would do this in a separate PR, since it would require an update in openfe? Raised an issue here #104


output.append(rmsd)
output["time(ps)"] = np.arange(len(u.trajectory))[::skip] * u.trajectory.dt

return output
Loading
Loading