Skip to content

Multiplex filtering#362

Open
amyjaynethompson wants to merge 6 commits into
mainfrom
multiplex_filtering
Open

Multiplex filtering#362
amyjaynethompson wants to merge 6 commits into
mainfrom
multiplex_filtering

Conversation

@amyjaynethompson
Copy link
Copy Markdown
Contributor

@amyjaynethompson amyjaynethompson commented Apr 28, 2026

xia2.multiplex has a filtering option built in which can greatly improve data reduction quality. VMXm, in particular, always manually reprocess datasets with xia2.multiplex to turn on these filtering parameters. Therefore, it would be nice to include this as a part of the auto processing infrastructure.

The issue has always been that the filtering can be slow, and this can impede rapid feedback. In xia2, we recently made a new command line program, xia2.multiplex_filtering. This performs the same filtering algorithms on a completed multiplex job. By breaking the algorithm into two separate programs, this would allow for rapid feedback as well as providing a filtered mtz later. This PR attempts to provide trigger/wrappers for such a filtering pipeline.

The cluster number is passed through from multiplex to multiplex_filtering to ensure that it is not triggered on clusters (possible implementation for clusters in the future, but would need slightly different triggering requirements).

As this pipeline relies on a finished multiplex directory (specific files needed that are not user-interesting), checks are done to make sure data is available where expected. This is done using the same delay multipliers as multiplex.

The sample group information is also passed through from multiplex. This is important, as there can be multiple sample groups related to a single DCID. Multiplex also passes through the actual DCID's it used in processing. This is also important, as the stored list of related DCID's can include both rotation/grid scans or other datasets that should not be used. Given all the relevant queries are already done in the multiplex trigger, it seemed easiest to pass these through rather than repeating all these queries.

The filtering itself is set to image_group mode, which means all the images are grouped into batches and a deltacchalf algorithm is used to see if any of these batches do not correlate well with the rest of the data. A group size of 50 is set as default, as this corresponds to 5deg rotation (following standard 0.1 deg fine slicing). However, VMXm have had success using a group size of 10, so they have this specified for their beamline.

General intent here is to test on VMXm first via staging, then roll it out live just for VMXm initially. This will be useful stress testing prior to deployment on other beam lines. Eventually, it is expected that this is triggered on all beam lines after multiplex.

NOTE: will need dials/latest to run -> this includes xia2.multiplex_filtering bug fixes which are not in the latest release.

@amyjaynethompson
Copy link
Copy Markdown
Contributor Author

Refactored the code so that a separate trigger function was no longer needed. The multiplex recipe has been updated so that a new output channel "filtering" is able to trigger xia2.multiplex_filtering. This saves moving parameters between multiplex and multiplex_filtering.

Comment thread src/dlstbx/services/trigger.py Outdated
Comment on lines +2366 to +2375
if parameters.cluster_num != "None":
is_cluster = True
else:
is_cluster = False

if is_cluster:
self.log.info(
f"Incoming multiplex is cluster {parameters.cluster_num}. Filtering not currently supported for clusters."
)
return {"success": True}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if parameters.cluster_num != "None":
is_cluster = True
else:
is_cluster = False
if is_cluster:
self.log.info(
f"Incoming multiplex is cluster {parameters.cluster_num}. Filtering not currently supported for clusters."
)
return {"success": True}
if parameters.cluster_num != "None":
self.log.info(
f"Incoming multiplex is cluster {parameters.cluster_num}. Filtering not currently supported for clusters."
)
return {"success": True}

Comment thread src/dlstbx/services/trigger.py Outdated
f"Incoming multiplex is cluster {parameters.cluster_num}. Filtering not currently supported for clusters."
)
return {"success": True}
else:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

else statement isn't needed here because function will have already returned for clusters

Comment thread src/dlstbx/services/trigger.py Outdated
Comment on lines +2377 to +2384
multiplex_dir = parameters.multiplex_job
if not multiplex_dir.is_dir():
self.log.error(
f"Given multiplex directory {multiplex_dir} does not exist. Aborting job."
)
return {"success": True}
else:
self.log.info(f"Previous multiplex job at {multiplex_dir}")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The multiplex directory not existing is not an expected behaviour. I'd suggest not handling it gracefully and just let the pipeline crash or raise an error so that any issues would be more obvious.

Comment thread src/dlstbx/services/trigger.py Outdated
Comment on lines +2402 to +2425
query = (
(
session.query(AutoProcProgram, ProcessingJob.dataCollectionId).join(
ProcessingJob,
ProcessingJob.processingJobId
== AutoProcProgram.processingJobId,
)
)
.filter(ProcessingJob.dataCollectionId == parameters.dcid)
.filter(ProcessingJob.automatic == True) # noqa E712
.filter(ProcessingJob.recordTimestamp > min_start_time) # noqa E711
.filter(
AutoProcProgram.processingJobId == parameters.multiplex_id
) # check only parent multiplex
.filter(
or_(
AutoProcProgram.processingStatus == None, # noqa E711
AutoProcProgram.processingStartTime == None, # noqa E711
)
)
)

# If there are any running (or yet to start) jobs, then checkpoint with delay
waiting_processing_jobs = query.all()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Waiting for a the multiplex job to finish should not be necessary as it only trigger pipelines downstream of it when it has finished (technically it is slightly before the wrapper has finished but xia2.multiplex will have finished and the necessary files will have been created)

Comment thread src/dlstbx/services/trigger.py Outdated
Comment on lines +2467 to +2471
dc = (
session.query(DataCollection)
.filter(DataCollection.dataCollectionId == parameters.dcid)
.one()
)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This query doesn't get used

Comment thread src/dlstbx/services/trigger.py Outdated
Comment on lines +2201 to +2202
for idx, dcid in enumerate(dcids):
job_parameters.append((f"group_dcid_{idx}", str(dcid)))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to upload these all into a single list. This is possible by uploading multiple job processing parameters under the same key (see how this is handled in the dimple trigger for example). This will save you having to wrestle with this format of having a separate key per dcid elsewhere in the code.

diffraction_plan_info: Optional[DiffractionPlanInfo] = None
recipe: Optional[str] = None
use_clustering: Optional[List[str]] = None
use_filtering: Optional[List[str]] = None
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
use_filtering: Optional[List[str]] = None
use_filtering: List[str] = []

Use of Optional isn't necessary here, the default value is what truly makes a parameter optional in pydantic terms. The Optional type just means that a None value can be supplied, which I don't think would match the use case.


self.log.info(f"xia2.multiplex trigger: Processing job {jobid} triggered")
self.log.info(
f"xia2.multiplex_filtering trigger: Processing job {jobid} triggered"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
f"xia2.multiplex_filtering trigger: Processing job {jobid} triggered"
f"xia2.multiplex trigger: Processing job {jobid} triggered"

Comment on lines +2200 to +2203
if (
parameters.use_filtering
and parameters.beamline in parameters.use_filtering
):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (
parameters.use_filtering
and parameters.beamline in parameters.use_filtering
):
if parameters.beamline in parameters.use_filtering:

If you remove the Optional typing as described above, this check simplifies.


# Place holder code for future iterations where may run filtering jobs on clusters

if cluster_num is not None:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it makes sense to retain cluster logic here (and possibly elsewhere in this wrapper). The flitering job is triggered as a separate job by the multiplex wrapper. With the way the recipe is structured, if you were running filtering on clusters, a separate call of the filtering wrapper would get made for each cluster so you wouldn't need the same logic that loops over and distinguishes between clusters and non-clusters.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants