From 57c610e7a4f660c38890a1c231521e5c0a9e81fc Mon Sep 17 00:00:00 2001 From: Paul Naughton Date: Fri, 8 May 2026 14:10:31 +0100 Subject: [PATCH 01/13] estimate_transmission wrapper --- src/dlstbx/services/strategy.py | 5 +- src/dlstbx/wrapper/estimate_transmission.py | 121 ++++++++++++++++++++ 2 files changed, 124 insertions(+), 2 deletions(-) create mode 100644 src/dlstbx/wrapper/estimate_transmission.py diff --git a/src/dlstbx/services/strategy.py b/src/dlstbx/services/strategy.py index cd67c86a7..48120f81a 100644 --- a/src/dlstbx/services/strategy.py +++ b/src/dlstbx/services/strategy.py @@ -193,12 +193,13 @@ def generate_strategy( ) beamline_config = parse_config_file(beamline_config_file) + minimum_beamline_transmission = get_beamline_param(beamline_config, ("gda.mx.udc.minTransmission",), 0.0), scaled_transmission = parameters.get("scaled_transmission", 1.0) transmission_limits = ( - get_beamline_param(beamline_config, ("gda.mx.udc.minTransmission",), 0.0), + minimum_beamline_transmission, min(get_beamline_param(beamline_config, ("gda.mx.udc.maxTransmission",), 1.0), - scaled_transmission) + max(scaled_transmission, minimum_beamline_transmission)) ) exposure_time_limits = ( get_beamline_param( diff --git a/src/dlstbx/wrapper/estimate_transmission.py b/src/dlstbx/wrapper/estimate_transmission.py new file mode 100644 index 000000000..5064c9ad4 --- /dev/null +++ b/src/dlstbx/wrapper/estimate_transmission.py @@ -0,0 +1,121 @@ +from __future__ import annotations +from dials.array_family import flex + +import math +import subprocess +import json +from pathlib import Path +import shutil +from collections import Counter +from itertools import accumulate + +from dlstbx.wrapper import Wrapper + +def build_hist_from_reflections(reflections): + "Iterate through the shoeboxes to a reflection and generate a pixel histogram" + + shoeboxes = reflections["shoebox"] + counter = Counter() + for sbox in shoeboxes: + counter.update(sbox.data.as_numpy_array().ravel()) + + sorted_counter = sorted(counter.items()) + return zip(*sorted_counter) + +def get_percentile_index(num_pixels, percentile): + threshold = sum(num_pixels) * percentile + + for i, cum_sum in enumerate(accumulate(num_pixels)): + if cum_sum >= threshold: + return i + + return len(num_pixels) + + +class EstimateTransmissionWrapper(Wrapper): + _logger_name = "dlstbx.wrap.estimate_transmission" + + def run(self): + assert hasattr(self, "recwrap"), "No recipewrapper object found" + + params = self.recwrap.recipe_step["job_parameters"] + working_directory = Path(params["working_directory"]) + results_directory = Path(params["results_directory"]) + + beamline = params["beamline"] + pixel_percentile = params["pixel_percentile"].get(beamline, 100) / 100 + target_countrate_pct = params["target_countrate_pct"].get(beamline, 25) / 100 + oscillation = float(params["oscillation"]) + transmission = float(params["transmission"]) + file = params["input_file"] + + commands = { + "dials.import": ["dials.import", file], + "dials.find_spots": ["dials.find_spots", "imported.expt", "ice_rings.filter=True"], + } + + for command, script in commands.items(): + result = subprocess.Popen(script, cwd=working_directory) + result.wait() + if result.returncode: + self.log.info(f"{command} failed with return code {result.returncode}") + self.log.info(result.stderr) + + self.log.debug(f"Command output:\n{result.stdout}") + self.log.debug(f"From command: {script}") + return False + + results_directory.mkdir(parents=True, exist_ok=True) + output_files = ["dials.find_spots.log", "strong.refl", "imported.expt"] + + # Copy files to + for output_file in output_files: + source_file = working_directory / output_file + destination = results_directory / output_file + + if not source_file.exists(): + self.log.info(f"{source_file=} does not exsist") + return False + + self.log.info(f"Copying {str(source_file)} to {str(destination)}") + shutil.copy(source_file, destination) + + experiment_file = results_directory / "imported.expt" + with experiment_file.open("r") as f: + experiment = json.load(f) + trusted_range = experiment["detector"][0]["panels"][0]["trusted_range"][1] + + reflection_file = results_directory / "strong.refl" + reflections = flex.reflection_table.from_file(reflection_file) + num_counts, num_pixels = build_hist_from_reflections(reflections) + + index_of_percentile = get_percentile_index(num_pixels, pixel_percentile) + counts_at_percentile = num_counts[index_of_percentile] + + mosaicity_corr = params.get("mosaicity_correction", False) + average_to_peak = ( + self.mosaicity_correction(mosaicity_corr, oscillation) + if mosaicity_corr + else 1 + ) + + countrate_saturation = counts_at_percentile / trusted_range + self.log.info(f"Countrate saturation is : {counts_at_percentile / trusted_range}") + scale_factor = average_to_peak * target_countrate_pct / countrate_saturation + + scaled_transmission = min(1, (transmission * scale_factor) / 100) + self.log.info(f"Scaled transmission is : {scaled_transmission}") + + self.recwrap.send_to("strategy", {"parameters": {"scaled_transmission": float(scaled_transmission)}}) + self.log.info("Done.") + return True + + def mosaicity_correction(self, moscaicity_coefficent: float, oscillation: float): + delta_z = oscillation / (moscaicity_coefficent) * math.sqrt(2) + average_to_peak = ( + math.sqrt(math.pi) * delta_z * math.erf(delta_z) + + math.exp(-(delta_z * delta_z)) + - 1 + ) / (delta_z * delta_z) + self.log.info("Average-to-peak intensity ratio: %f", average_to_peak) + return average_to_peak From 8f33f296d413f5eaf5601b675c7cf71a0e4df7cc Mon Sep 17 00:00:00 2001 From: Paul Naughton Date: Fri, 8 May 2026 14:11:25 +0100 Subject: [PATCH 02/13] Removing xia2.overload wrapper --- src/dlstbx/wrapper/xia2_overload.py | 92 ----------------------------- 1 file changed, 92 deletions(-) delete mode 100644 src/dlstbx/wrapper/xia2_overload.py diff --git a/src/dlstbx/wrapper/xia2_overload.py b/src/dlstbx/wrapper/xia2_overload.py deleted file mode 100644 index e8e3866c6..000000000 --- a/src/dlstbx/wrapper/xia2_overload.py +++ /dev/null @@ -1,92 +0,0 @@ -from __future__ import annotations - -import json -import math -import subprocess - -from pathlib import Path -import shutil - -from dlstbx.wrapper import Wrapper - - -class Xia2OverloadWrapper(Wrapper): - _logger_name = "dlstbx.wrap.xia2_overload" - - def run(self): - assert hasattr(self, "recwrap"), "No recipewrapper object found" - - params = self.recwrap.recipe_step["job_parameters"] - working_directory = Path(params["working_directory"]) - results_directory = Path(params["results_directory"]) - - target_countrate_pct = float(params["target_countrate_pct"]) - oscillation = float(params["oscillation"]) - transmission = float(params["transmission"]) - - file = params["input_file"] - - command = [f"xia2.overload {file}"] - - result = subprocess.run( - command, shell=True, cwd=working_directory, capture_output=True - ) - - if result.returncode: - self.log.info(f"xia2.overload failed with return code {result.returncode}") - self.log.info(result.stderr) - self.log.debug(f"Command output:\n{result.stdout}") - return False - - results_directory.mkdir(parents=True, exist_ok=True) - output_file_name = "overload.json" - - source_file = working_directory / output_file_name - destination = results_directory / output_file_name - - if not source_file.exists(): - return False - - self.log.debug(f"Copying {str(source_file)} to {str(destination)}") - shutil.copy2(source_file, destination) - - self.record_result_individual_file( - { - "file_path": str(destination.parent), - "file_name": destination.name, - "file_type": "result", - } - ) - - with source_file.open("r") as f: - data = json.load(f) - counts = data["counts"] - overload_limit = float(data["overload_limit"]) - - max_count = float(list(counts)[-1]) - - mosaicity_corr = params.get("mosaicity_correction", False) - average_to_peak = ( - self.mosaicity_correction(mosaicity_corr, oscillation) - if mosaicity_corr - else 1 - ) - - saturation = (max_count / overload_limit) * average_to_peak - scale_factor = target_countrate_pct / saturation - - scaled_transmission = min(1, ( transmission * scale_factor ) / 100) - - self.recwrap.send_to("strategy", {"parameters": {"scaled_transmission": scaled_transmission}}) - self.log.info("Done.") - return True - - def mosaicity_correction(self, moscaicity_coefficent: float, oscillation: float): - delta_z = oscillation / (moscaicity_coefficent) * math.sqrt(2) - average_to_peak = ( - math.sqrt(math.pi) * delta_z * math.erf(delta_z) - + math.exp(-(delta_z * delta_z)) - - 1 - ) / (delta_z * delta_z) - self.log.info("Average-to-peak intensity ratio: %f", average_to_peak) - return average_to_peak From bc6af9ebba7031e5e14a807c80e6cb6ed0f8567c Mon Sep 17 00:00:00 2001 From: Paul Naughton Date: Mon, 11 May 2026 14:00:28 +0100 Subject: [PATCH 03/13] Moving to use subprocess.run --- src/dlstbx/services/strategy.py | 14 ++-- src/dlstbx/wrapper/estimate_transmission.py | 79 +++++++++++++-------- 2 files changed, 60 insertions(+), 33 deletions(-) diff --git a/src/dlstbx/services/strategy.py b/src/dlstbx/services/strategy.py index 48120f81a..49171e6c7 100644 --- a/src/dlstbx/services/strategy.py +++ b/src/dlstbx/services/strategy.py @@ -193,13 +193,19 @@ def generate_strategy( ) beamline_config = parse_config_file(beamline_config_file) - minimum_beamline_transmission = get_beamline_param(beamline_config, ("gda.mx.udc.minTransmission",), 0.0), - scaled_transmission = parameters.get("scaled_transmission", 1.0) + minimum_beamline_transmission = get_beamline_param( + beamline_config, ("gda.mx.udc.minTransmission",), 0.0 + ) + scaled_transmission = parameters.get("scaled_transmission", 1.0) transmission_limits = ( minimum_beamline_transmission, - min(get_beamline_param(beamline_config, ("gda.mx.udc.maxTransmission",), 1.0), - max(scaled_transmission, minimum_beamline_transmission)) + min( + get_beamline_param( + beamline_config, ("gda.mx.udc.maxTransmission",), 1.0 + ), + max(scaled_transmission, minimum_beamline_transmission), + ), ) exposure_time_limits = ( get_beamline_param( diff --git a/src/dlstbx/wrapper/estimate_transmission.py b/src/dlstbx/wrapper/estimate_transmission.py index 5064c9ad4..ad2f709df 100644 --- a/src/dlstbx/wrapper/estimate_transmission.py +++ b/src/dlstbx/wrapper/estimate_transmission.py @@ -1,16 +1,20 @@ from __future__ import annotations -from dials.array_family import flex -import math -import subprocess import json -from pathlib import Path +import math +import os import shutil +import subprocess +import time from collections import Counter from itertools import accumulate +from pathlib import Path + +from dials.array_family import flex from dlstbx.wrapper import Wrapper + def build_hist_from_reflections(reflections): "Iterate through the shoeboxes to a reflection and generate a pixel histogram" @@ -19,19 +23,20 @@ def build_hist_from_reflections(reflections): for sbox in shoeboxes: counter.update(sbox.data.as_numpy_array().ravel()) - sorted_counter = sorted(counter.items()) + sorted_counter = sorted(counter.items()) return zip(*sorted_counter) + def get_percentile_index(num_pixels, percentile): threshold = sum(num_pixels) * percentile - + for i, cum_sum in enumerate(accumulate(num_pixels)): if cum_sum >= threshold: return i - + return len(num_pixels) - - + + class EstimateTransmissionWrapper(Wrapper): _logger_name = "dlstbx.wrap.estimate_transmission" @@ -43,32 +48,42 @@ def run(self): results_directory = Path(params["results_directory"]) beamline = params["beamline"] - pixel_percentile = params["pixel_percentile"].get(beamline, 100) / 100 + pixel_percentile = params["pixel_percentile"].get(beamline, 100) / 100 target_countrate_pct = params["target_countrate_pct"].get(beamline, 25) / 100 oscillation = float(params["oscillation"]) transmission = float(params["transmission"]) file = params["input_file"] - commands = { - "dials.import": ["dials.import", file], - "dials.find_spots": ["dials.find_spots", "imported.expt", "ice_rings.filter=True"], - } - - for command, script in commands.items(): - result = subprocess.Popen(script, cwd=working_directory) - result.wait() + commands = [ + ("dials.import", ["dials.import", file], "imported.expt"), + ( + "dials.find_spots", + [ + "dials.find_spots", + "imported.expt", + "ice_rings.filter=True", + ], + "strong.refl", + ), + ] + + for command, script, output_file in commands: + result = subprocess.run(script, cwd=working_directory, check=True) + + while not os.path.exists(output_file): + time.sleep(1) + if result.returncode: self.log.info(f"{command} failed with return code {result.returncode}") self.log.info(result.stderr) - + self.log.debug(f"Command output:\n{result.stdout}") self.log.debug(f"From command: {script}") return False results_directory.mkdir(parents=True, exist_ok=True) output_files = ["dials.find_spots.log", "strong.refl", "imported.expt"] - - # Copy files to + for output_file in output_files: source_file = working_directory / output_file destination = results_directory / output_file @@ -79,34 +94,40 @@ def run(self): self.log.info(f"Copying {str(source_file)} to {str(destination)}") shutil.copy(source_file, destination) - + experiment_file = results_directory / "imported.expt" with experiment_file.open("r") as f: experiment = json.load(f) trusted_range = experiment["detector"][0]["panels"][0]["trusted_range"][1] reflection_file = results_directory / "strong.refl" - reflections = flex.reflection_table.from_file(reflection_file) + reflections = flex.reflection_table.from_file(reflection_file) num_counts, num_pixels = build_hist_from_reflections(reflections) index_of_percentile = get_percentile_index(num_pixels, pixel_percentile) counts_at_percentile = num_counts[index_of_percentile] - + mosaicity_corr = params.get("mosaicity_correction", False) average_to_peak = ( - self.mosaicity_correction(mosaicity_corr, oscillation) - if mosaicity_corr - else 1 + self.mosaicity_correction(mosaicity_corr, oscillation) + if mosaicity_corr + else 1 ) countrate_saturation = counts_at_percentile / trusted_range - self.log.info(f"Countrate saturation is : {counts_at_percentile / trusted_range}") + self.log.info( + f"Countrate saturation is : {counts_at_percentile / trusted_range}" + ) scale_factor = average_to_peak * target_countrate_pct / countrate_saturation scaled_transmission = min(1, (transmission * scale_factor) / 100) self.log.info(f"Scaled transmission is : {scaled_transmission}") - self.recwrap.send_to("strategy", {"parameters": {"scaled_transmission": float(scaled_transmission)}}) + self.recwrap.send_to( + "strategy", + {"parameters": {"scaled_transmission": float(scaled_transmission)}}, + ) + self.log.info("Done.") return True From c9c153ffe9a88ebd85d58a093adb007250fc443c Mon Sep 17 00:00:00 2001 From: Paul Naughton Date: Tue, 12 May 2026 15:07:37 +0100 Subject: [PATCH 04/13] Using max pixel to scale the transmission, making strategy service use relative transmission --- src/dlstbx/services/strategy.py | 42 ++++++------ src/dlstbx/wrapper/estimate_transmission.py | 76 ++++++++------------- 2 files changed, 49 insertions(+), 69 deletions(-) diff --git a/src/dlstbx/services/strategy.py b/src/dlstbx/services/strategy.py index 49171e6c7..d5238f7a0 100644 --- a/src/dlstbx/services/strategy.py +++ b/src/dlstbx/services/strategy.py @@ -151,10 +151,11 @@ def generate_strategy( ): """Generate a strategy from the results of an upstream pipeline""" self.log.info("Received strategy request, generating strategy") - + + recipe_params = rw.recipe_step["parameters"] parameters = ChainMapWithReplacement( message.get("parameters", {}) if isinstance(message, dict) else {}, - rw.recipe_step["parameters"].get("ispyb_parameters", {}), + recipe_params, substitutions=rw.environment, ) self.log.info(f"Received parameters for strategy generation:\n{parameters}") @@ -162,20 +163,24 @@ def generate_strategy( txn = self._transport.transaction_begin(subscription_id=header["subscription"]) self._transport.ack(header, transaction=txn) + ispyb_parameters = parameters.get("ispyb_parameters", {}) beamline = ( - parameters["beamline"][0] - if isinstance(parameters["beamline"], list) - else parameters["beamline"] + ispyb_parameters["beamline"][0] + if isinstance(ispyb_parameters["beamline"], list) + else ispyb_parameters["beamline"] ) wavelength = ( - float(parameters["wavelength"][0]) - if isinstance(parameters["wavelength"], list) - else float(parameters["wavelength"]) + float(ispyb_parameters["wavelength"][0]) + if isinstance(ispyb_parameters["wavelength"], list) + else float(ispyb_parameters["wavelength"]) ) resolution_estimate = ( - float(parameters["resolution"][0]) - if isinstance(parameters["resolution"], list) - else float(parameters["resolution"]) + float(ispyb_parameters["resolution"][0]) + if isinstance(ispyb_parameters["resolution"], list) + else float(ispyb_parameters["resolution"]) + ) + dc_transmission = ( + float(recipe_params.get("transmission", 100)) / 100 ) resolution_offset = 0.5 min_resolution = 0.9 @@ -193,18 +198,15 @@ def generate_strategy( ) beamline_config = parse_config_file(beamline_config_file) - minimum_beamline_transmission = get_beamline_param( - beamline_config, ("gda.mx.udc.minTransmission",), 0.0 - ) - scaled_transmission = parameters.get("scaled_transmission", 1.0) + recommended_max_transmission = parameters.get("scaled_transmission", 1.0) - transmission_limits = ( - minimum_beamline_transmission, + transmission_limits = (get_beamline_param( + beamline_config, ("gda.mx.udc.minTransmission",), 0.0), min( get_beamline_param( beamline_config, ("gda.mx.udc.maxTransmission",), 1.0 ), - max(scaled_transmission, minimum_beamline_transmission), + recommended_max_transmission ), ) exposure_time_limits = ( @@ -311,7 +313,7 @@ def generate_strategy( ispyb_command_list.append(d) # Convert transmission to percentage for ISPyB - transmission_pct = transmission * 100 + relative_transmission_pct = transmission / dc_transmission * 100 axis_end = ( rotation_start + rotation_increment * recipe_step.number_of_images @@ -325,7 +327,7 @@ def generate_strategy( "axisstart": rotation_start, "axisend": axis_end, "exposuretime": exposure_time, - "transmission": transmission_pct, + "transmission": relative_transmission_pct, "oscillationrange": rotation_increment, "noimages": recipe_step.number_of_images, "resolution": resolution, diff --git a/src/dlstbx/wrapper/estimate_transmission.py b/src/dlstbx/wrapper/estimate_transmission.py index ad2f709df..105f15761 100644 --- a/src/dlstbx/wrapper/estimate_transmission.py +++ b/src/dlstbx/wrapper/estimate_transmission.py @@ -14,29 +14,6 @@ from dlstbx.wrapper import Wrapper - -def build_hist_from_reflections(reflections): - "Iterate through the shoeboxes to a reflection and generate a pixel histogram" - - shoeboxes = reflections["shoebox"] - counter = Counter() - for sbox in shoeboxes: - counter.update(sbox.data.as_numpy_array().ravel()) - - sorted_counter = sorted(counter.items()) - return zip(*sorted_counter) - - -def get_percentile_index(num_pixels, percentile): - threshold = sum(num_pixels) * percentile - - for i, cum_sum in enumerate(accumulate(num_pixels)): - if cum_sum >= threshold: - return i - - return len(num_pixels) - - class EstimateTransmissionWrapper(Wrapper): _logger_name = "dlstbx.wrap.estimate_transmission" @@ -48,9 +25,8 @@ def run(self): results_directory = Path(params["results_directory"]) beamline = params["beamline"] - pixel_percentile = params["pixel_percentile"].get(beamline, 100) / 100 - target_countrate_pct = params["target_countrate_pct"].get(beamline, 25) / 100 - oscillation = float(params["oscillation"]) + trusted_countrate_pct = params["trusted_countrate_pct"].get(beamline, 100) / 100 + target_countrate_pct = params["target_max_pixel_countrate_pct"].get(beamline, 50) / 100 transmission = float(params["transmission"]) file = params["input_file"] @@ -102,23 +78,24 @@ def run(self): reflection_file = results_directory / "strong.refl" reflections = flex.reflection_table.from_file(reflection_file) - num_counts, num_pixels = build_hist_from_reflections(reflections) + num_counts, _ = self.build_hist_from_reflections(reflections) - index_of_percentile = get_percentile_index(num_pixels, pixel_percentile) - counts_at_percentile = num_counts[index_of_percentile] + max_counts = num_counts[-1] - mosaicity_corr = params.get("mosaicity_correction", False) - average_to_peak = ( - self.mosaicity_correction(mosaicity_corr, oscillation) - if mosaicity_corr - else 1 - ) + max_pixel_countrate_pct = max_counts / trusted_range + if max_pixel_countrate_pct < trusted_countrate_pct: + self.log.info(f"Max pixel is less intense than the trusted value of {trusted_countrate_pct}. Pixel intensity is {max_pixel_countrate_pct}") + self.log.info(f"Recommended transmission will not be scaled") - countrate_saturation = counts_at_percentile / trusted_range - self.log.info( - f"Countrate saturation is : {counts_at_percentile / trusted_range}" - ) - scale_factor = average_to_peak * target_countrate_pct / countrate_saturation + self.recwrap.send_to( + "strategy", + {"parameters": {"scaled_transmission": transmission}}, + ) + + self.log.info("Done.") + return True + + scale_factor = target_countrate_pct / max_pixel_countrate_pct scaled_transmission = min(1, (transmission * scale_factor) / 100) self.log.info(f"Scaled transmission is : {scaled_transmission}") @@ -131,12 +108,13 @@ def run(self): self.log.info("Done.") return True - def mosaicity_correction(self, moscaicity_coefficent: float, oscillation: float): - delta_z = oscillation / (moscaicity_coefficent) * math.sqrt(2) - average_to_peak = ( - math.sqrt(math.pi) * delta_z * math.erf(delta_z) - + math.exp(-(delta_z * delta_z)) - - 1 - ) / (delta_z * delta_z) - self.log.info("Average-to-peak intensity ratio: %f", average_to_peak) - return average_to_peak + def build_hist_from_reflections(self, reflections): + "Iterate through the shoeboxes to a reflection and generate a pixel histogram" + + shoeboxes = reflections["shoebox"] + counter = Counter() + for sbox in shoeboxes: + counter.update(sbox.data.as_numpy_array().ravel()) + + sorted_counter = sorted(counter.items()) + return zip(*sorted_counter) From 0f0be5cbbc2290e1d0dc223cc96c90dfbb339960 Mon Sep 17 00:00:00 2001 From: Paul Naughton Date: Tue, 12 May 2026 16:35:39 +0100 Subject: [PATCH 05/13] Changing chainmap logic to collect all parameters from recipe --- src/dlstbx/services/strategy.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/dlstbx/services/strategy.py b/src/dlstbx/services/strategy.py index d5238f7a0..e7a877ed2 100644 --- a/src/dlstbx/services/strategy.py +++ b/src/dlstbx/services/strategy.py @@ -155,6 +155,7 @@ def generate_strategy( recipe_params = rw.recipe_step["parameters"] parameters = ChainMapWithReplacement( message.get("parameters", {}) if isinstance(message, dict) else {}, + recipe_params.get("ispyb_parameters", {}), recipe_params, substitutions=rw.environment, ) @@ -163,21 +164,20 @@ def generate_strategy( txn = self._transport.transaction_begin(subscription_id=header["subscription"]) self._transport.ack(header, transaction=txn) - ispyb_parameters = parameters.get("ispyb_parameters", {}) beamline = ( - ispyb_parameters["beamline"][0] - if isinstance(ispyb_parameters["beamline"], list) - else ispyb_parameters["beamline"] + parameters["beamline"][0] + if isinstance(parameters["beamline"], list) + else parameters["beamline"] ) wavelength = ( - float(ispyb_parameters["wavelength"][0]) - if isinstance(ispyb_parameters["wavelength"], list) - else float(ispyb_parameters["wavelength"]) + float(parameters["wavelength"][0]) + if isinstance(parameters["wavelength"], list) + else float(parameters["wavelength"]) ) resolution_estimate = ( - float(ispyb_parameters["resolution"][0]) - if isinstance(ispyb_parameters["resolution"], list) - else float(ispyb_parameters["resolution"]) + float(parameters["resolution"][0]) + if isinstance(parameters["resolution"], list) + else float(parameters["resolution"]) ) dc_transmission = ( float(recipe_params.get("transmission", 100)) / 100 From 2a83cdeac558d1d7901bc7dc4fa7eaa15dcd4014 Mon Sep 17 00:00:00 2001 From: Paul Naughton Date: Tue, 12 May 2026 16:47:28 +0100 Subject: [PATCH 06/13] fixing typo --- src/dlstbx/services/strategy.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dlstbx/services/strategy.py b/src/dlstbx/services/strategy.py index e7a877ed2..d0173cbe7 100644 --- a/src/dlstbx/services/strategy.py +++ b/src/dlstbx/services/strategy.py @@ -180,7 +180,7 @@ def generate_strategy( else float(parameters["resolution"]) ) dc_transmission = ( - float(recipe_params.get("transmission", 100)) / 100 + float(parameters.get("transmission", 100)) / 100 ) resolution_offset = 0.5 min_resolution = 0.9 From 7aa9261c4c23f60ef7bcff8ae36b9bf5634b2cc8 Mon Sep 17 00:00:00 2001 From: Paul Naughton Date: Wed, 13 May 2026 14:20:50 +0100 Subject: [PATCH 07/13] Saving the histogram to a json file --- src/dlstbx/wrapper/estimate_transmission.py | 58 +++++++++++---------- 1 file changed, 30 insertions(+), 28 deletions(-) diff --git a/src/dlstbx/wrapper/estimate_transmission.py b/src/dlstbx/wrapper/estimate_transmission.py index 105f15761..33f00f41b 100644 --- a/src/dlstbx/wrapper/estimate_transmission.py +++ b/src/dlstbx/wrapper/estimate_transmission.py @@ -1,15 +1,11 @@ from __future__ import annotations import json -import math -import os import shutil import subprocess -import time from collections import Counter -from itertools import accumulate from pathlib import Path - +from itertools import accumulate from dials.array_family import flex from dlstbx.wrapper import Wrapper @@ -31,23 +27,15 @@ def run(self): file = params["input_file"] commands = [ - ("dials.import", ["dials.import", file], "imported.expt"), - ( - "dials.find_spots", - [ - "dials.find_spots", - "imported.expt", - "ice_rings.filter=True", - ], - "strong.refl", + ("dials.import", ["dials.import", file]), + ("dials.find_spots", [ "dials.find_spots", + "imported.expt", + "ice_rings.filter=True"], ), ] - for command, script, output_file in commands: - result = subprocess.run(script, cwd=working_directory, check=True) - - while not os.path.exists(output_file): - time.sleep(1) + for command, script in commands: + result = subprocess.run(script, cwd=working_directory, check=True) if result.returncode: self.log.info(f"{command} failed with return code {result.returncode}") @@ -57,8 +45,18 @@ def run(self): self.log.debug(f"From command: {script}") return False + experiment_file = results_directory / "imported.expt" + with experiment_file.open("r") as f: + experiment = json.load(f) + trusted_range = experiment["detector"][0]["panels"][0]["trusted_range"][1] + + reflection_file = results_directory / "strong.refl" + reflections = flex.reflection_table.from_file(reflection_file) + counts_hist = self.build_hist_from_reflections(reflections) + self.save_hist_to_json(counts_hist, trusted_range, results_directory) + results_directory.mkdir(parents=True, exist_ok=True) - output_files = ["dials.find_spots.log", "strong.refl", "imported.expt"] + output_files = ["dials.find_spots.log"] for output_file in output_files: source_file = working_directory / output_file @@ -71,15 +69,8 @@ def run(self): self.log.info(f"Copying {str(source_file)} to {str(destination)}") shutil.copy(source_file, destination) - experiment_file = results_directory / "imported.expt" - with experiment_file.open("r") as f: - experiment = json.load(f) - trusted_range = experiment["detector"][0]["panels"][0]["trusted_range"][1] - - reflection_file = results_directory / "strong.refl" - reflections = flex.reflection_table.from_file(reflection_file) - num_counts, _ = self.build_hist_from_reflections(reflections) + num_counts, num_spots = counts_hist max_counts = num_counts[-1] max_pixel_countrate_pct = max_counts / trusted_range @@ -118,3 +109,14 @@ def build_hist_from_reflections(self, reflections): sorted_counter = sorted(counter.items()) return zip(*sorted_counter) + + def save_hist_to_json(self, hist, max_trusted_value, results_dir): + counts_pixel_data = dict(hist) + + results_path = results_dir / "overload.json" + self.log.info("Saving counts histogram to", results_path) + with open(results_path, 'w') as f: + json.dump({ "counts": counts_pixel_data, + "overload_limit": max_trusted_value}, f) + + self.log.info("Saved.") From ae25331c2f47423135c1c6986d21586cddf9524a Mon Sep 17 00:00:00 2001 From: Paul Naughton Date: Wed, 13 May 2026 16:06:34 +0100 Subject: [PATCH 08/13] Using percentile pixel --- src/dlstbx/wrapper/estimate_transmission.py | 74 ++++++++++----------- 1 file changed, 36 insertions(+), 38 deletions(-) diff --git a/src/dlstbx/wrapper/estimate_transmission.py b/src/dlstbx/wrapper/estimate_transmission.py index 33f00f41b..4d5ecb53f 100644 --- a/src/dlstbx/wrapper/estimate_transmission.py +++ b/src/dlstbx/wrapper/estimate_transmission.py @@ -21,8 +21,8 @@ def run(self): results_directory = Path(params["results_directory"]) beamline = params["beamline"] - trusted_countrate_pct = params["trusted_countrate_pct"].get(beamline, 100) / 100 - target_countrate_pct = params["target_max_pixel_countrate_pct"].get(beamline, 50) / 100 + pixel_percentile = params["pixel_percentile"].get(beamline, 100) / 100 + target_countrate_pct = params["target_countrate_pct"].get(beamline, 50) / 100 transmission = float(params["transmission"]) file = params["input_file"] @@ -45,19 +45,35 @@ def run(self): self.log.debug(f"From command: {script}") return False - experiment_file = results_directory / "imported.expt" + experiment_file = working_directory / "imported.expt" with experiment_file.open("r") as f: experiment = json.load(f) trusted_range = experiment["detector"][0]["panels"][0]["trusted_range"][1] - reflection_file = results_directory / "strong.refl" + reflection_file = working_directory / "strong.refl" reflections = flex.reflection_table.from_file(reflection_file) counts_hist = self.build_hist_from_reflections(reflections) - self.save_hist_to_json(counts_hist, trusted_range, results_directory) + + num_counts = list(counts_hist.keys()) + num_pixels = list(counts_hist.values()) + + index_of_pixel_percentile = self.get_percentile_index(num_pixels, pixel_percentile) + counts_at_percentile = int(num_counts[index_of_pixel_percentile]) + + pixel_countrate_pct = counts_at_percentile / trusted_range + self.log.info(f"The countrate percentage of the {pixel_percentile}% most intense pixel is {pixel_countrate_pct}") + scale_factor = target_countrate_pct / pixel_countrate_pct + + scaled_transmission = min(1, (transmission * scale_factor) / 100) + self.log.info(f"Scaled transmission is : {scaled_transmission}") + + self.recwrap.send_to( + "strategy", + {"parameters": {"scaled_transmission": float(scaled_transmission)}}, + ) results_directory.mkdir(parents=True, exist_ok=True) output_files = ["dials.find_spots.log"] - for output_file in output_files: source_file = working_directory / output_file destination = results_directory / output_file @@ -69,32 +85,7 @@ def run(self): self.log.info(f"Copying {str(source_file)} to {str(destination)}") shutil.copy(source_file, destination) - - num_counts, num_spots = counts_hist - max_counts = num_counts[-1] - - max_pixel_countrate_pct = max_counts / trusted_range - if max_pixel_countrate_pct < trusted_countrate_pct: - self.log.info(f"Max pixel is less intense than the trusted value of {trusted_countrate_pct}. Pixel intensity is {max_pixel_countrate_pct}") - self.log.info(f"Recommended transmission will not be scaled") - - self.recwrap.send_to( - "strategy", - {"parameters": {"scaled_transmission": transmission}}, - ) - - self.log.info("Done.") - return True - - scale_factor = target_countrate_pct / max_pixel_countrate_pct - - scaled_transmission = min(1, (transmission * scale_factor) / 100) - self.log.info(f"Scaled transmission is : {scaled_transmission}") - - self.recwrap.send_to( - "strategy", - {"parameters": {"scaled_transmission": float(scaled_transmission)}}, - ) + self.save_hist_to_json(counts_hist, trusted_range, results_directory) self.log.info("Done.") return True @@ -108,15 +99,22 @@ def build_hist_from_reflections(self, reflections): counter.update(sbox.data.as_numpy_array().ravel()) sorted_counter = sorted(counter.items()) - return zip(*sorted_counter) + return {str(int(k)): v for k, v in sorted_counter} - def save_hist_to_json(self, hist, max_trusted_value, results_dir): - counts_pixel_data = dict(hist) + def get_percentile_index(self, num_pixels, percentile): + threshold = sum(num_pixels) * percentile + for i, cum_sum in enumerate(accumulate(num_pixels)): + if cum_sum >= threshold: + return i + + return len(num_pixels) + + def save_hist_to_json(self, hist, max_trusted_value, results_dir): results_path = results_dir / "overload.json" - self.log.info("Saving counts histogram to", results_path) + self.log.info(f"Saving counts histogram to {str(results_path)}") with open(results_path, 'w') as f: - json.dump({ "counts": counts_pixel_data, - "overload_limit": max_trusted_value}, f) + json.dump({ "counts": hist, + "overload_limit": max_trusted_value}, f, indent=2) self.log.info("Saved.") From 14b479ff87994750a91c4acb6863df374e6f9c28 Mon Sep 17 00:00:00 2001 From: Paul Naughton Date: Thu, 14 May 2026 11:28:01 +0100 Subject: [PATCH 09/13] Adding phasing agamemnon recipe to strategy --- src/dlstbx/services/strategy.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dlstbx/services/strategy.py b/src/dlstbx/services/strategy.py index d0173cbe7..3d94ce9cc 100644 --- a/src/dlstbx/services/strategy.py +++ b/src/dlstbx/services/strategy.py @@ -222,7 +222,7 @@ def generate_strategy( ), ) - recipes = {"OSC.yaml": "Native", "Ligand binding.yaml": "Ligand"} + recipes = {"OSC.yaml": "Native", "Ligand binding.yaml": "Ligand", "SAD.yaml": "Phasing"} ispyb_command_list = [] for recipe, recipe_alias in recipes.items(): From 70e592242336bf371348fa2c7a944e5586127122 Mon Sep 17 00:00:00 2001 From: Paul Naughton Date: Thu, 14 May 2026 11:41:08 +0100 Subject: [PATCH 10/13] to make udc strategy not be triggered for multiple programs and for xia2 dials --- src/dlstbx/services/trigger.py | 29 ++++++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/src/dlstbx/services/trigger.py b/src/dlstbx/services/trigger.py index d74ff5d1e..cab3cdd19 100644 --- a/src/dlstbx/services/trigger.py +++ b/src/dlstbx/services/trigger.py @@ -6,6 +6,7 @@ from datetime import datetime, timedelta from time import time from typing import Any, Dict, List, Literal, Mapping, Optional +from itertools import chain import gemmi import ispyb @@ -2880,12 +2881,38 @@ def trigger_strategy( ) return {"success": True} - if parameters.beamline not in ["i03", "i04", "i04-1"]: + if parameters.beamline not in ["i04"]: self.log.info( f"Skipping strategy trigger: beamline {parameters.beamline} not supported" ) return {"success": True} + find_process_program = ( + session.query(AutoProcProgram.processingPrograms) + .join(ProcessingJob, AutoProcProgram.processingJobId == ProcessingJob.processingJobId) + .filter(ProcessingJob.dataCollectionId == parameters.dcid) + ) + + curr_program = ( + find_process_program + .filter(AutoProcProgram.autoProcProgramId == parameters.program_id) + .scalar() + ) + # xia2 dials occassionaly gives optimistic estimate for resolution + if 'curr_program' == "xia2 dials": + self.log.info(f"Skipping strategy trigger for dcid={parameters.dcid} from program: xia2 dials.") + return {"success": True} + + udc_strategy_previously_triggered = ( + find_process_program + .filter(AutoProcProgram.processingPrograms == "UDC strategy") + .filter(AutoProcProgram.processingStatus == 1) + .all() + ) + if udc_strategy_previously_triggered: + self.log.info(f"Skipping strategy trigger: UDC Strategy has already been triggered for dcid={parameters.dcid}.") + return {"success": True} + # Get resolution estimate from ispyb records for upstream pipeline - returns None if not found. resolution = ( session.query(AutoProcScalingStatistics.resolutionLimitHigh) From 81064e632d20c279df803eefdad385e612286577 Mon Sep 17 00:00:00 2001 From: Paul Naughton Date: Fri, 15 May 2026 11:59:51 +0100 Subject: [PATCH 11/13] Plotting the data into the logfile --- src/dlstbx/wrapper/estimate_transmission.py | 94 ++++++++++++++++----- 1 file changed, 73 insertions(+), 21 deletions(-) diff --git a/src/dlstbx/wrapper/estimate_transmission.py b/src/dlstbx/wrapper/estimate_transmission.py index 4d5ecb53f..f0840531d 100644 --- a/src/dlstbx/wrapper/estimate_transmission.py +++ b/src/dlstbx/wrapper/estimate_transmission.py @@ -1,15 +1,18 @@ from __future__ import annotations import json +import math import shutil import subprocess from collections import Counter -from pathlib import Path from itertools import accumulate +from pathlib import Path + from dials.array_family import flex from dlstbx.wrapper import Wrapper + class EstimateTransmissionWrapper(Wrapper): _logger_name = "dlstbx.wrap.estimate_transmission" @@ -28,14 +31,14 @@ def run(self): commands = [ ("dials.import", ["dials.import", file]), - ("dials.find_spots", [ "dials.find_spots", - "imported.expt", - "ice_rings.filter=True"], + ( + "dials.find_spots", + ["dials.find_spots", "imported.expt", "ice_rings.filter=True"], ), ] for command, script in commands: - result = subprocess.run(script, cwd=working_directory, check=True) + result = subprocess.run(script, cwd=working_directory, check=True) if result.returncode: self.log.info(f"{command} failed with return code {result.returncode}") @@ -52,17 +55,21 @@ def run(self): reflection_file = working_directory / "strong.refl" reflections = flex.reflection_table.from_file(reflection_file) - counts_hist = self.build_hist_from_reflections(reflections) + counts_hist = self.collect_counts_from_reflections(reflections) num_counts = list(counts_hist.keys()) num_pixels = list(counts_hist.values()) - index_of_pixel_percentile = self.get_percentile_index(num_pixels, pixel_percentile) + index_of_pixel_percentile = self.get_percentile_index( + num_pixels, pixel_percentile + ) counts_at_percentile = int(num_counts[index_of_pixel_percentile]) - - pixel_countrate_pct = counts_at_percentile / trusted_range - self.log.info(f"The countrate percentage of the {pixel_percentile}% most intense pixel is {pixel_countrate_pct}") - scale_factor = target_countrate_pct / pixel_countrate_pct + + pixel_countrate_pct = counts_at_percentile / trusted_range + self.log.info( + f"The countrate percentage of the {pixel_percentile * 100}% most intense pixel is {pixel_countrate_pct * 100}% of the trusted value" + ) + scale_factor = target_countrate_pct / pixel_countrate_pct scaled_transmission = min(1, (transmission * scale_factor) / 100) self.log.info(f"Scaled transmission is : {scaled_transmission}") @@ -71,12 +78,18 @@ def run(self): "strategy", {"parameters": {"scaled_transmission": float(scaled_transmission)}}, ) - + results_directory.mkdir(parents=True, exist_ok=True) - output_files = ["dials.find_spots.log"] - for output_file in output_files: - source_file = working_directory / output_file - destination = results_directory / output_file + log_file_name = [ + f for f in working_directory.glob("*.out") if "slurm" in str(f) + ][0] + output_files = { + "dials.find_spots.log": "dials.find_spots.log", + log_file_name: "estimate_transmission.log", + } + for src_file_name, dest_file_name in output_files.items(): + source_file = working_directory / src_file_name + destination = results_directory / dest_file_name if not source_file.exists(): self.log.info(f"{source_file=} does not exsist") @@ -85,12 +98,13 @@ def run(self): self.log.info(f"Copying {str(source_file)} to {str(destination)}") shutil.copy(source_file, destination) + self.draw_plot(num_counts, num_pixels) self.save_hist_to_json(counts_hist, trusted_range, results_directory) self.log.info("Done.") return True - def build_hist_from_reflections(self, reflections): + def collect_counts_from_reflections(self, reflections): "Iterate through the shoeboxes to a reflection and generate a pixel histogram" shoeboxes = reflections["shoebox"] @@ -113,8 +127,46 @@ def get_percentile_index(self, num_pixels, percentile): def save_hist_to_json(self, hist, max_trusted_value, results_dir): results_path = results_dir / "overload.json" self.log.info(f"Saving counts histogram to {str(results_path)}") - with open(results_path, 'w') as f: - json.dump({ "counts": hist, - "overload_limit": max_trusted_value}, f, indent=2) - + with open(results_path, "w") as f: + json.dump( + {"counts": hist, "overload_limit": max_trusted_value}, f, indent=2 + ) + self.log.info("Saved.") + + def draw_plot(self, counts, pixels): + """Create ASCII art histogram""" + + self.log.info("Plotting pixel intensities...") + + width, height = 60, 20 + title = "'Pixel intensity distribution'" + xlabel = "'Num counts'" + ylabel = "'Counts'" + + command = ["gnuplot"] + plot_commands = [ + "set term dumb %d %d" % (width, height - 2), + "set logscale y", + "set logscale x", + "set ytics out", + "set title %s" % title, + "set xlabel %s" % xlabel, + "set ylabel %s offset character %d,0" % (ylabel, len(ylabel) // 2), + ] + + data_string = "\n".join(f"{x} {y}" for x, y in zip(counts[1:], pixels[1:])) + + plot_commands.append("plot '-' using 1:2 title '' with lines") + plot_commands.append(data_string) + + try: + result = subprocess.run( + command, input="\n".join(plot_commands), text=True, capture_output=True + ) + except (OSError, subprocess.TimeoutExpired) as e: + self.log.info("Error plotting counts vs pixels") + self.log.info(e) + return + else: + self.log.info(result.stdout) From 7b4ccd875e70fce77684fa1081a406ce788ce9c0 Mon Sep 17 00:00:00 2001 From: Paul Naughton Date: Fri, 15 May 2026 12:01:28 +0100 Subject: [PATCH 12/13] populating dose in subwedge table --- src/dlstbx/services/strategy.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/dlstbx/services/strategy.py b/src/dlstbx/services/strategy.py index 3d94ce9cc..bd7030ccf 100644 --- a/src/dlstbx/services/strategy.py +++ b/src/dlstbx/services/strategy.py @@ -151,11 +151,11 @@ def generate_strategy( ): """Generate a strategy from the results of an upstream pipeline""" self.log.info("Received strategy request, generating strategy") - + recipe_params = rw.recipe_step["parameters"] parameters = ChainMapWithReplacement( message.get("parameters", {}) if isinstance(message, dict) else {}, - recipe_params.get("ispyb_parameters", {}), + recipe_params.get("ispyb_parameters", {}), recipe_params, substitutions=rw.environment, ) @@ -331,6 +331,7 @@ def generate_strategy( "oscillationrange": rotation_increment, "noimages": recipe_step.number_of_images, "resolution": resolution, + "doseTotal": dose, "ispyb_command": "insert_screening_strategy_sub_wedge", "screening_strategy_wedge_id": f"$ispyb_screening_strategy_wedge_id_{n_step}", "store_result": f"ispyb_screening_strategy_sub_wedge_id_{n_step}", From 5afd1676b52b7e4f441056addc0f69212987b9b4 Mon Sep 17 00:00:00 2001 From: Paul Naughton Date: Fri, 15 May 2026 12:04:16 +0100 Subject: [PATCH 13/13] Stopping a second udc strategy from running regardless of processing status --- src/dlstbx/services/trigger.py | 34 ++++++++++++++++++---------------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/src/dlstbx/services/trigger.py b/src/dlstbx/services/trigger.py index cab3cdd19..f8f69e07f 100644 --- a/src/dlstbx/services/trigger.py +++ b/src/dlstbx/services/trigger.py @@ -4,9 +4,9 @@ import pathlib import re from datetime import datetime, timedelta +from itertools import chain from time import time from typing import Any, Dict, List, Literal, Mapping, Optional -from itertools import chain import gemmi import ispyb @@ -2889,28 +2889,30 @@ def trigger_strategy( find_process_program = ( session.query(AutoProcProgram.processingPrograms) - .join(ProcessingJob, AutoProcProgram.processingJobId == ProcessingJob.processingJobId) + .join( + ProcessingJob, + AutoProcProgram.processingJobId == ProcessingJob.processingJobId, + ) .filter(ProcessingJob.dataCollectionId == parameters.dcid) ) - curr_program = ( - find_process_program - .filter(AutoProcProgram.autoProcProgramId == parameters.program_id) - .scalar() - ) + curr_program = find_process_program.filter( + AutoProcProgram.autoProcProgramId == parameters.program_id + ).scalar() # xia2 dials occassionaly gives optimistic estimate for resolution - if 'curr_program' == "xia2 dials": - self.log.info(f"Skipping strategy trigger for dcid={parameters.dcid} from program: xia2 dials.") + if curr_program == "xia2 dials": + self.log.info( + f"Skipping strategy trigger for dcid={parameters.dcid} from program: xia2 dials." + ) return {"success": True} - udc_strategy_previously_triggered = ( - find_process_program - .filter(AutoProcProgram.processingPrograms == "UDC strategy") - .filter(AutoProcProgram.processingStatus == 1) - .all() - ) + udc_strategy_previously_triggered = find_process_program.filter( + AutoProcProgram.processingPrograms == "UDC strategy" + ).all() if udc_strategy_previously_triggered: - self.log.info(f"Skipping strategy trigger: UDC Strategy has already been triggered for dcid={parameters.dcid}.") + self.log.info( + f"Skipping strategy trigger: UDC Strategy has already been triggered for dcid={parameters.dcid}." + ) return {"success": True} # Get resolution estimate from ispyb records for upstream pipeline - returns None if not found.