Multiplex filtering #362
Refactored the code so that a separate trigger function is no longer needed. The multiplex recipe has been updated so that a new output channel "filtering" can trigger xia2.multiplex_filtering. This saves moving parameters between multiplex and multiplex_filtering.
```python
if parameters.cluster_num != "None":
    is_cluster = True
else:
    is_cluster = False

if is_cluster:
    self.log.info(
        f"Incoming multiplex is cluster {parameters.cluster_num}. Filtering not currently supported for clusters."
    )
    return {"success": True}
```
Suggested change:

```diff
-if parameters.cluster_num != "None":
-    is_cluster = True
-else:
-    is_cluster = False
-if is_cluster:
-    self.log.info(
-        f"Incoming multiplex is cluster {parameters.cluster_num}. Filtering not currently supported for clusters."
-    )
-    return {"success": True}
+if parameters.cluster_num != "None":
+    self.log.info(
+        f"Incoming multiplex is cluster {parameters.cluster_num}. Filtering not currently supported for clusters."
+    )
+    return {"success": True}
```
```python
        f"Incoming multiplex is cluster {parameters.cluster_num}. Filtering not currently supported for clusters."
    )
    return {"success": True}
else:
```
The else statement isn't needed here, because the function will have already returned for clusters.
```python
multiplex_dir = parameters.multiplex_job
if not multiplex_dir.is_dir():
    self.log.error(
        f"Given multiplex directory {multiplex_dir} does not exist. Aborting job."
    )
    return {"success": True}
else:
    self.log.info(f"Previous multiplex job at {multiplex_dir}")
```
The multiplex directory not existing is not expected behaviour. I'd suggest not handling it gracefully: let the pipeline crash or raise an error so that any issues are more obvious.
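A minimal sketch of the fail-loudly approach, assuming `FileNotFoundError` is an appropriate exception; the helper name is hypothetical and not part of this PR:

```python
from pathlib import Path


def require_multiplex_dir(multiplex_dir: Path) -> Path:
    # Hypothetical helper: raise instead of returning {"success": True},
    # so a missing multiplex directory surfaces as a visible failure
    # rather than a silently "successful" no-op job.
    if not multiplex_dir.is_dir():
        raise FileNotFoundError(
            f"Given multiplex directory {multiplex_dir} does not exist"
        )
    return multiplex_dir
```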
```python
query = (
    (
        session.query(AutoProcProgram, ProcessingJob.dataCollectionId).join(
            ProcessingJob,
            ProcessingJob.processingJobId == AutoProcProgram.processingJobId,
        )
    )
    .filter(ProcessingJob.dataCollectionId == parameters.dcid)
    .filter(ProcessingJob.automatic == True)  # noqa E712
    .filter(ProcessingJob.recordTimestamp > min_start_time)  # noqa E711
    .filter(
        AutoProcProgram.processingJobId == parameters.multiplex_id
    )  # check only parent multiplex
    .filter(
        or_(
            AutoProcProgram.processingStatus == None,  # noqa E711
            AutoProcProgram.processingStartTime == None,  # noqa E711
        )
    )
)

# If there are any running (or yet to start) jobs, then checkpoint with delay
waiting_processing_jobs = query.all()
```
Waiting for the multiplex job to finish should not be necessary, as it only triggers downstream pipelines once it has finished (technically slightly before the wrapper has finished, but xia2.multiplex will have finished and the necessary files will have been created).
```python
dc = (
    session.query(DataCollection)
    .filter(DataCollection.dataCollectionId == parameters.dcid)
    .one()
)
```
This query doesn't get used
```python
for idx, dcid in enumerate(dcids):
    job_parameters.append((f"group_dcid_{idx}", str(dcid)))
```
It would be better to upload these all as a single list. This is possible by uploading multiple job processing parameters under the same key (see how this is handled in the dimple trigger, for example). This will save you having to wrestle with this one-key-per-dcid format elsewhere in the code.
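A sketch of the single-key approach the comment describes; the key name "group_dcids" is an assumption (the dimple trigger's actual key may differ):

```python
def build_group_dcid_parameters(dcids):
    # Repeating the same key lets all DCIDs be retrieved later as one
    # list, instead of a separate group_dcid_0, group_dcid_1, ... key
    # per DCID that consuming code then has to reassemble.
    return [("group_dcids", str(dcid)) for dcid in dcids]
```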
```python
diffraction_plan_info: Optional[DiffractionPlanInfo] = None
recipe: Optional[str] = None
use_clustering: Optional[List[str]] = None
use_filtering: Optional[List[str]] = None
```
Suggested change:

```diff
-use_filtering: Optional[List[str]] = None
+use_filtering: List[str] = []
```
Use of Optional isn't necessary here, the default value is what truly makes a parameter optional in pydantic terms. The Optional type just means that a None value can be supplied, which I don't think would match the use case.
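A plain-Python sketch of the difference (pydantic applies the same principle: the default value, not the Optional annotation, is what makes a field optional). With a list-like default, the beamline check needs no None guard:

```python
# With Optional[...] = None, callers must guard against None first:
def filtering_enabled_optional(beamline, use_filtering=None):
    return bool(use_filtering) and beamline in use_filtering


# With a sequence default, the membership test alone is enough.
# An immutable empty tuple stands in here for pydantic's per-instance [].
def filtering_enabled(beamline, use_filtering=()):
    return beamline in use_filtering
```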
```python
self.log.info(f"xia2.multiplex trigger: Processing job {jobid} triggered")
self.log.info(
    f"xia2.multiplex_filtering trigger: Processing job {jobid} triggered"
```
Suggested change:

```diff
-f"xia2.multiplex_filtering trigger: Processing job {jobid} triggered"
+f"xia2.multiplex trigger: Processing job {jobid} triggered"
```
```python
if (
    parameters.use_filtering
    and parameters.beamline in parameters.use_filtering
):
```
Suggested change:

```diff
-if (
-    parameters.use_filtering
-    and parameters.beamline in parameters.use_filtering
-):
+if parameters.beamline in parameters.use_filtering:
```
If you remove the Optional typing as described above, this check simplifies.
```python
# Place holder code for future iterations where may run filtering jobs on clusters

if cluster_num is not None:
```
I don't think it makes sense to retain cluster logic here (and possibly elsewhere in this wrapper). The filtering job is triggered as a separate job by the multiplex wrapper. With the way the recipe is structured, if you were running filtering on clusters, a separate call of the filtering wrapper would be made for each cluster, so you wouldn't need the logic that loops over and distinguishes between clusters and non-clusters.
xia2.multiplex has a filtering option built in which can greatly improve data reduction quality. VMXm, in particular, always manually reprocesses datasets with xia2.multiplex to turn on these filtering parameters. Therefore, it would be nice to include this as part of the auto-processing infrastructure. The issue has always been that the filtering can be slow, and this can impede rapid feedback. In xia2, we recently made a new command line program, xia2.multiplex_filtering, which performs the same filtering algorithms on a completed multiplex job. Breaking the algorithm into two separate programs allows for rapid feedback as well as providing a filtered mtz later. This PR attempts to provide triggers/wrappers for such a filtering pipeline.

The cluster number is passed through from multiplex to multiplex_filtering to ensure that filtering is not triggered on clusters (an implementation for clusters is possible in the future, but would need slightly different triggering requirements). As this pipeline relies on a finished multiplex directory (specific files are needed that are not user-interesting), checks are done to make sure data is available where expected. This is done using the same delay multipliers as multiplex.

The sample group information is also passed through from multiplex. This is important, as there can be multiple sample groups related to a single DCID. Multiplex also passes through the actual DCIDs it used in processing. This is also important, as the stored list of related DCIDs can include rotation/grid scans or other datasets that should not be used. Given that all the relevant queries are already done in the multiplex trigger, it seemed easiest to pass these through rather than repeating all the queries.
The filtering itself is set to image_group mode, which means all the images are grouped into batches and a deltacchalf algorithm is used to see if any of these batches do not correlate well with the rest of the data. A group size of 50 is set as the default, as this corresponds to 5 deg of rotation (following standard 0.1 deg fine slicing). However, VMXm have had success using a group size of 10, so that is specified for their beamline.

The general intent here is to test on VMXm first via staging, then roll it out live just for VMXm initially. This will be useful stress testing prior to deployment on other beamlines. Eventually, it is expected that this will be triggered on all beamlines after multiplex.
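The group-size arithmetic above can be sketched as a small hypothetical helper (not part of this PR), assuming a fixed per-image rotation width:

```python
def images_per_group(rotation_per_group_deg, image_width_deg=0.1):
    # 5 deg of rotation at 0.1 deg fine slicing -> 50 images per batch,
    # matching the default group size described above.
    return round(rotation_per_group_deg / image_width_deg)
```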
NOTE: this will need dials/latest to run, as it includes xia2.multiplex_filtering bug fixes which are not in the latest release.