diff --git a/.github/instructions/scenarios.instructions.md b/.github/instructions/scenarios.instructions.md index 9261dca942..0c491da2fe 100644 --- a/.github/instructions/scenarios.instructions.md +++ b/.github/instructions/scenarios.instructions.md @@ -11,11 +11,15 @@ Scenarios orchestrate multi-attack security testing campaigns. Each scenario gro All scenarios inherit from `Scenario` (ABC) and must: 1. **Define `VERSION`** as a class constant (increment on breaking changes) -2. **Implement three abstract methods:** +2. **Optionally declare `BASELINE_POLICY`** (defaults to `BaselinePolicy.Enabled` — a baseline `PromptSendingAttack` is prepended and callers can opt out per run via `initialize_async(include_baseline=False)`): + - `BaselinePolicy.Disabled` — baseline supported but off by default (e.g. `Jailbreak`, where templates dominate the run). + - `BaselinePolicy.Forbidden` — baseline is meaningless for this scenario's comparison axis (e.g. `AdversarialBenchmark`, which compares attack success rates across adversarial models). Explicit `include_baseline=True` raises `ValueError`. +3. **Implement three abstract methods:** ```python class MyScenario(Scenario): VERSION: int = 1 + BASELINE_POLICY: ClassVar[BaselinePolicy] = BaselinePolicy.Enabled @classmethod def get_strategy_class(cls) -> type[ScenarioStrategy]: @@ -30,7 +34,7 @@ class MyScenario(Scenario): return DatasetConfiguration(dataset_names=["my_dataset"]) ``` -3. **Optionally override `_get_atomic_attacks_async()`** — the base class provides a default +4. **Optionally override `_get_atomic_attacks_async()`** — the base class provides a default that uses the factory/registry pattern (see "AtomicAttack Construction" below). Only override if your scenario needs custom attack construction logic. 
@@ -154,6 +158,8 @@ The default implementation: Only override when the scenario **cannot** use the factory/registry pattern — e.g., scenarios with custom composite logic, per-strategy converter stacks, or non-standard attack construction. +Overrides that want baseline support must emit it themselves: when `self._include_baseline` is true, call `self._build_baseline_atomic_attack(seed_groups=...)` with the same seeds used for the strategy attacks and prepend the result. (The base implementation already does this automatically.) Do not re-resolve seeds for the baseline: passing freshly resolved seeds reintroduces ADO 9012 (baseline-vs-strategy population divergence under `max_dataset_size`). + ### Manual AtomicAttack construction (for overrides): ```python diff --git a/doc/code/scenarios/0_scenarios.ipynb b/doc/code/scenarios/0_scenarios.ipynb index 7cb24539a8..82d604b935 100644 --- a/doc/code/scenarios/0_scenarios.ipynb +++ b/doc/code/scenarios/0_scenarios.ipynb @@ -74,7 +74,6 @@ " - `version`: Integer version number\n", " - `strategy_class`: The strategy enum class for this scenario\n", " - `objective_scorer_identifier`: Identifier dict for the scoring mechanism (optional)\n", - " - `include_default_baseline`: Whether to include a baseline attack (default: True)\n", " - `scenario_result_id`: Optional ID to resume an existing scenario (optional)\n", "\n", "5. 
**Initialization**: Call `await scenario.initialize_async()` to populate atomic attacks:\n", @@ -83,6 +82,8 @@ " - `max_concurrency`: Number of concurrent operations (default: 1)\n", " - `max_retries`: Number of retry attempts on failure (default: 0)\n", " - `memory_labels`: Optional labels for tracking (optional)\n", + " - `include_baseline`: Whether to prepend a baseline attack (defaults to the scenario type's\n", + " `BASELINE_POLICY`; most scenarios default it on, `Jailbreak` defaults it off)\n", "\n", "### Example Structure\n", "\n", @@ -101,9 +102,15 @@ "name": "stdout", "output_type": "stream", "text": [ - "Found default environment files: ['./.pyrit/.env', './.pyrit/.env.local']\n", - "Loaded environment file: ./.pyrit/.env\n", - "Loaded environment file: ./.pyrit/.env.local\n" + "Found default environment files: ['./.pyrit/.env']\n", + "Loaded environment file: ./.pyrit/.env\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "No new upgrade operations detected.\n" ] } ], @@ -193,34 +200,13 @@ "name": "stdout", "output_type": "stream", "text": [ - "Found default environment files: ['./.pyrit/.env', './.pyrit/.env.local']\n", + "Loading default configuration file: ./.pyrit/.pyrit_conf\n", + "Found default environment files: ['./.pyrit/.env']\n", "Loaded environment file: ./.pyrit/.env\n", - "Loaded environment file: ./.pyrit/.env.local\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ "\n", "Available Scenarios:\n", "================================================================================\n", "\u001b[1m\u001b[36m\n", - " airt.content_harms\u001b[0m\n", - " Class: ContentHarms\n", - " Description:\n", - " Content Harms Scenario implementation for PyRIT. 
This scenario contains\n", - " various harm-based checks that you can run to get a quick idea about\n", - " model behavior with respect to certain harm categories.\n", - " Aggregate Strategies:\n", - " - all\n", - " Available Strategies (7):\n", - " hate, fairness, violence, sexual, harassment, misinformation, leakage\n", - " Default Strategy: all\n", - " Default Datasets (7, max 4 per dataset):\n", - " airt_hate, airt_fairness, airt_violence, airt_sexual, airt_harassment,\n", - " airt_misinformation, airt_leakage\n", - "\u001b[1m\u001b[36m\n", " airt.cyber\u001b[0m\n", " Class: Cyber\n", " Description:\n", @@ -229,9 +215,9 @@ " Cyber class contains different variations of the malware generation\n", " techniques.\n", " Aggregate Strategies:\n", - " - all\n", + " - all, single_turn, multi_turn\n", " Available Strategies (2):\n", - " single_turn, multi_turn\n", + " prompt_sending, red_teaming\n", " Default Strategy: all\n", " Default Datasets (1, max 4 per dataset):\n", " airt_malware\n", @@ -256,14 +242,14 @@ " Description:\n", " Leakage scenario implementation for PyRIT. This scenario tests how\n", " susceptible models are to leaking training data, PII, intellectual\n", - " property, or other confidential information. The Leakage class\n", - " contains different attack variations designed to extract sensitive\n", - " information from models.\n", + " property, or other confidential information. 
Uses the registry/factory\n", + " pattern to construct attack techniques.\n", " Aggregate Strategies:\n", - " - all, single_turn, multi_turn, ip, sensitive_data\n", - " Available Strategies (4):\n", - " first_letter, image, role_play, crescendo\n", - " Default Strategy: all\n", + " - all, default, single_turn, multi_turn\n", + " Available Strategies (9):\n", + " prompt_sending, role_play, many_shot, tap, crescendo_simulated,\n", + " red_teaming, context_compliance, first_letter, image\n", + " Default Strategy: default\n", " Default Datasets (1, max 4 per dataset):\n", " airt_leakage\n", "\u001b[1m\u001b[36m\n", @@ -296,6 +282,21 @@ " Default Datasets (1, max 4 per dataset):\n", " airt_imminent_crisis\n", "\u001b[1m\u001b[36m\n", + " airt.rapid_response\u001b[0m\n", + " Class: RapidResponse\n", + " Description:\n", + " Rapid Response scenario for content-harms testing. Tests model behavior\n", + " across multiple harm categories using selectable attack techniques.\n", + " Aggregate Strategies:\n", + " - all, default, single_turn, multi_turn\n", + " Available Strategies (7):\n", + " prompt_sending, role_play, many_shot, tap, crescendo_simulated,\n", + " red_teaming, context_compliance\n", + " Default Strategy: default\n", + " Default Datasets (7, max 4 per dataset):\n", + " airt_hate, airt_fairness, airt_violence, airt_sexual, airt_harassment,\n", + " airt_misinformation, airt_leakage\n", + "\u001b[1m\u001b[36m\n", " airt.scam\u001b[0m\n", " Class: Scam\n", " Description:\n", @@ -309,6 +310,21 @@ " Default Strategy: all\n", " Default Datasets (1, max 4 per dataset):\n", " airt_scams\n", + " Supported Parameters:\n", + " - max_turns (int) [default: 5]: Maximum conversation turns for the persuasive_rta strategy.\n", + "\u001b[1m\u001b[36m\n", + " benchmark.adversarial\u001b[0m\n", + " Class: AdversarialBenchmark\n", + " Description:\n", + " Benchmarking scenario that compares the attack success rate (ASR) of\n", + " several different adversarial models.\n", + " 
Aggregate Strategies:\n", + " - all, default, single_turn, multi_turn, light\n", + " Available Strategies (4):\n", + " role_play, tap, red_teaming, context_compliance\n", + " Default Strategy: light\n", + " Default Datasets (1, max 8 per dataset):\n", + " harmbench\n", "\u001b[1m\u001b[36m\n", " foundry.red_team_agent\u001b[0m\n", " Class: RedTeamAgent\n", @@ -359,7 +375,7 @@ "\n", "================================================================================\n", "\n", - "Total scenarios: 8\n" + "Total scenarios: 9\n" ] }, { @@ -389,11 +405,22 @@ "\n", "Every scenario can optionally include a **baseline attack** — a `PromptSendingAttack` that sends\n", "each objective directly to the target without any converters or multi-turn techniques. This is\n", - "controlled by the `include_default_baseline` parameter (default: `True` for most scenarios).\n", - "\n", - "To run *only* the baseline (no attack strategies), create a `RedTeamAgent` with\n", - "`include_baseline=True` (the default) and pass `scenario_strategies=None`. See\n", - "[Common Scenario Parameters](./1_common_scenario_parameters.ipynb) for a working example." + "controlled by the `include_baseline` parameter on `initialize_async`; when omitted, each\n", + "scenario falls back to its own `BASELINE_POLICY` class attribute (most scenarios default\n", + "it on; `Jailbreak` defaults it off). See\n", + "[Common Scenario Parameters](./1_common_scenario_parameters.ipynb) for a worked example.\n", + "\n", + "Custom scenarios should choose their `BASELINE_POLICY` based on whether an unmodified\n", + "prompt is a meaningful comparator for the scenario's strategies:\n", + "\n", + "- **`Enabled`** — the baseline is prepended by default and the caller can opt out. Use when an\n", + " unmodified-prompt run is a meaningful comparison point (most scenarios).\n", + "- **`Disabled`** — the baseline is supported but omitted by default; the caller must opt in. 
Use\n", + " when the scenario is already dominated by a large set of templates/strategies that already\n", + " exercise the unmodified surface (e.g., `Jailbreak`).\n", + "- **`Forbidden`** — the baseline is unavailable and passing `include_baseline=True` raises. Use\n", + " when the scenario's semantics make a single-shot unmodified prompt meaningless as a comparator\n", + " (e.g., benchmarks comparing across adversarial models, or multi-turn-only scenarios)." ] }, { @@ -436,7 +463,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.11.15" + "version": "3.12.13" } }, "nbformat": 4, diff --git a/doc/code/scenarios/0_scenarios.py b/doc/code/scenarios/0_scenarios.py index 6788630572..9e86edf32c 100644 --- a/doc/code/scenarios/0_scenarios.py +++ b/doc/code/scenarios/0_scenarios.py @@ -76,7 +76,6 @@ # - `version`: Integer version number # - `strategy_class`: The strategy enum class for this scenario # - `objective_scorer_identifier`: Identifier dict for the scoring mechanism (optional) -# - `include_default_baseline`: Whether to include a baseline attack (default: True) # - `scenario_result_id`: Optional ID to resume an existing scenario (optional) # # 5. 
**Initialization**: Call `await scenario.initialize_async()` to populate atomic attacks: @@ -85,6 +84,8 @@ # - `max_concurrency`: Number of concurrent operations (default: 1) # - `max_retries`: Number of retry attempts on failure (default: 0) # - `memory_labels`: Optional labels for tracking (optional) +# - `include_baseline`: Whether to prepend a baseline attack (defaults to the scenario type's +# `BASELINE_POLICY`; most scenarios default it on, `Jailbreak` defaults it off) # # ### Example Structure # @@ -174,11 +175,22 @@ def _build_display_group(self, *, technique_name: str, seed_group_name: str) -> # # Every scenario can optionally include a **baseline attack** — a `PromptSendingAttack` that sends # each objective directly to the target without any converters or multi-turn techniques. This is -# controlled by the `include_default_baseline` parameter (default: `True` for most scenarios). -# -# To run *only* the baseline (no attack strategies), create a `RedTeamAgent` with -# `include_baseline=True` (the default) and pass `scenario_strategies=None`. See -# [Common Scenario Parameters](./1_common_scenario_parameters.ipynb) for a working example. +# controlled by the `include_baseline` parameter on `initialize_async`; when omitted, each +# scenario falls back to its own `BASELINE_POLICY` class attribute (most scenarios default +# it on; `Jailbreak` defaults it off). See +# [Common Scenario Parameters](./1_common_scenario_parameters.ipynb) for a worked example. +# +# Custom scenarios should choose their `BASELINE_POLICY` based on whether an unmodified +# prompt is a meaningful comparator for the scenario's strategies: +# +# - **`Enabled`** — the baseline is prepended by default and the caller can opt out. Use when an +# unmodified-prompt run is a meaningful comparison point (most scenarios). +# - **`Disabled`** — the baseline is supported but omitted by default; the caller must opt in. 
Use +# when the scenario is already dominated by a large set of templates/strategies that already +# exercise the unmodified surface (e.g., `Jailbreak`). +# - **`Forbidden`** — the baseline is unavailable and passing `include_baseline=True` raises. Use +# when the scenario's semantics make a single-shot unmodified prompt meaningless as a comparator +# (e.g., benchmarks comparing across adversarial models, or multi-turn-only scenarios). # %% [markdown] # diff --git a/doc/code/scenarios/1_common_scenario_parameters.ipynb b/doc/code/scenarios/1_common_scenario_parameters.ipynb index 4c9923e2a3..fc5d8f12a5 100644 --- a/doc/code/scenarios/1_common_scenario_parameters.ipynb +++ b/doc/code/scenarios/1_common_scenario_parameters.ipynb @@ -32,9 +32,15 @@ "name": "stdout", "output_type": "stream", "text": [ - "Found default environment files: ['./.pyrit/.env', './.pyrit/.env.local']\n", - "Loaded environment file: ./.pyrit/.env\n", - "Loaded environment file: ./.pyrit/.env.local\n" + "Found default environment files: ['./.pyrit/.env']\n", + "Loaded environment file: ./.pyrit/.env\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "No new upgrade operations detected.\n" ] }, { @@ -210,8 +216,8 @@ "## Baseline Execution\n", "\n", "The baseline sends each objective directly to the target without any converters or multi-turn\n", - "strategies. It is included automatically when `include_baseline=True` (the default). This is\n", - "useful for:\n", + "strategies. It is included automatically when `initialize_async` is called with\n", + "`include_baseline=True` (the default for scenarios that support a baseline). 
This is useful for:\n", "\n", "- **Measuring default defenses** — how does the target respond to unmodified harmful prompts?\n", "- **Establishing comparison points** — compare baseline refusal rates against attack-enhanced runs\n", @@ -227,12 +233,12 @@ { "data": { "application/vnd.jupyter.widget-view+json": { - "model_id": "bd475b00d8c845048a8a85d817baee53", + "model_id": "83d7df3d03e644e786db59f46dba22ef", "version_major": 2, "version_minor": 0 }, "text/plain": [ - "Executing RedTeamAgent: 0%| | 0/1 [00:00 None: """ Raise if any value in ``params`` cannot round-trip through JSON. @@ -117,6 +139,14 @@ class Scenario(ABC): #: what the scenario needs. Validated in ``initialize_async`` once the target is supplied. TARGET_REQUIREMENTS: ClassVar[TargetRequirements] = TargetRequirements() + #: How this scenario type treats the default baseline atomic attack. Subclasses override + #: when their semantics call for a different default (``Disabled``) or when a baseline + #: is meaningless for the comparison the scenario performs (``Forbidden``). Resolved in + #: ``initialize_async`` and overridable per run via ``include_baseline`` for the + #: ``Enabled`` and ``Disabled`` states; ``Forbidden`` is a hard constraint and a + #: caller-supplied ``include_baseline=True`` raises ``ValueError``. + BASELINE_POLICY: ClassVar[BaselinePolicy] = BaselinePolicy.Enabled + @classmethod def _get_additional_scoring_questions(cls) -> Sequence[Path]: """ @@ -136,8 +166,8 @@ def __init__( version: int, strategy_class: type[ScenarioStrategy], objective_scorer: Scorer, - include_default_baseline: bool = True, scenario_result_id: Optional[Union[uuid.UUID, str]] = None, + include_default_baseline: bool | None = None, # Deprecated. Will be removed in 0.16.0. ) -> None: """ Initialize a scenario. @@ -147,14 +177,14 @@ def __init__( version (int): Version number of the scenario. strategy_class (Type[ScenarioStrategy]): The strategy enum class for this scenario. 
objective_scorer (Scorer): The objective scorer used to evaluate attack results. - include_default_baseline (bool): Whether to include a baseline atomic attack that sends all objectives - without modifications. Most scenarios should have some kind of baseline so users can understand - the impact of strategies, but subclasses can optionally write their own custom baselines. - Defaults to True. scenario_result_id (Optional[Union[uuid.UUID, str]]): Optional ID of an existing scenario result to resume. Can be either a UUID object or a string representation of a UUID. If provided and found in memory, the scenario will resume from prior progress. All other parameters must still match the stored scenario configuration. + include_default_baseline (bool | None): **Deprecated.** Will be removed in 0.16.0. + Pass ``include_baseline`` to ``initialize_async`` instead. When set, the value is + used as the effective ``include_baseline`` for the next ``initialize_async`` call + unless that call passes its own ``include_baseline``. Note: Attack runs are populated by calling initialize_async(), which invokes the @@ -190,8 +220,6 @@ def __init__( self._scenario_result_id: Optional[str] = str(scenario_result_id) if scenario_result_id else None self._result_lock = asyncio.Lock() - self._include_baseline = include_default_baseline - # Store prepared strategies for use in _get_atomic_attacks_async self._scenario_strategies: list[ScenarioStrategy] = [] @@ -206,6 +234,22 @@ def __init__( self.params: dict[str, Any] = {} self._declarations_validated: bool = False + # Resolved effective baseline inclusion for the current run. Set in initialize_async + # before _get_atomic_attacks_async is awaited so overrides can read it. + self._include_baseline: bool = False + + # Deprecated constructor-time baseline override. Will be removed in 0.16.0, along + # with the include_default_baseline kwarg above and the legacy fallback branch in + # initialize_async. 
Subclass shims set this attribute directly to avoid double-warning. + self._legacy_include_baseline: bool | None = None + if include_default_baseline is not None: + print_deprecation_message( + old_item="Scenario(include_default_baseline=...)", + new_item="Scenario.initialize_async(include_baseline=...)", + removed_in="0.16.0", + ) + self._legacy_include_baseline = include_default_baseline + @property def name(self) -> str: """Get the name of the scenario.""" @@ -546,6 +590,7 @@ async def initialize_async( max_concurrency: int = 10, max_retries: int = 0, memory_labels: Optional[dict[str, str]] = None, + include_baseline: bool | None = None, ) -> None: """ Initialize the scenario by populating self._atomic_attacks and creating the ScenarioResult. @@ -573,9 +618,17 @@ async def initialize_async( For example, max_retries=3 allows up to 4 total attempts (1 initial + 3 retries). memory_labels (Optional[Dict[str, str]]): Additional labels to apply to all attack runs in the scenario. These help track and categorize the scenario. + include_baseline (bool | None): Whether to prepend a baseline atomic attack that sends + all objectives without modifications, allowing comparison between unmodified prompts + and the scenario's strategies. If None (the default), the scenario type's + ``BASELINE_POLICY`` class attribute decides: ``Enabled`` includes it, + ``Disabled`` omits it, and ``Forbidden`` always omits it (and rejects an + explicit ``True``). Passing ``True`` to a scenario whose ``BASELINE_POLICY`` + is ``Forbidden`` raises ``ValueError``. Raises: - ValueError: If no objective_target is provided. + ValueError: If no objective_target is provided, or if ``include_baseline=True`` is passed + to a scenario whose ``BASELINE_POLICY`` is ``Forbidden``. """ # Validate required parameters if objective_target is None: @@ -594,6 +647,28 @@ async def initialize_async( self._max_retries = max_retries self._memory_labels = memory_labels or {} + # Deprecated. Will be removed in 0.16.0. 
Honor the legacy constructor-time + # include_default_baseline (or subclass include_baseline) only when the caller did + # not supply a runtime value. + if include_baseline is None and self._legacy_include_baseline is not None: + include_baseline = self._legacy_include_baseline + + # Resolve the effective include_baseline. Forbidden is checked first so a forbidden + # scenario type never silently inherits a True default; explicit-True on a forbidden + # type is a hard error rather than a silent ignore. For the Enabled / Disabled states, + # a None runtime value defers to the policy. + if self.BASELINE_POLICY is BaselinePolicy.Forbidden: + if include_baseline is True: + raise ValueError( + f"{type(self).__name__} does not support a default baseline " + f"(BASELINE_POLICY = Forbidden); pass include_baseline=False or omit the argument." + ) + include_baseline = False + elif include_baseline is None: + include_baseline = self.BASELINE_POLICY is BaselinePolicy.Enabled + + self._include_baseline = include_baseline + # Prepare scenario strategies using the stored configuration self._scenario_strategies = self._prepare_strategies(scenario_strategies) @@ -606,9 +681,23 @@ async def initialize_async( self._atomic_attacks = await self._get_atomic_attacks_async() - if self._include_baseline: - baseline_attack = self._get_baseline() - self._atomic_attacks.insert(0, baseline_attack) + # Deprecation rescue. Will be removed in 0.16.0. If the override didn't emit baseline, + # warn and inject. Migrated overrides emit baseline themselves and bypass this branch. + # Reuse seeds from the first existing attack rather than re-resolving from + # dataset_config; re-resolution under max_dataset_size would draw a fresh sample + # (the very ADO 9012 bug this PR fixes). When no atomic attacks exist yet the + # rescue falls back to the dataset_config one-time resolution. 
+ if include_baseline and (not self._atomic_attacks or self._atomic_attacks[0].atomic_attack_name != "baseline"): + print_deprecation_message( + old_item=f"Implicit baseline injection for {type(self).__name__}._get_atomic_attacks_async()", + new_item="explicit emission via self._build_baseline_atomic_attack(seed_groups=...) in the override", + removed_in="0.16.0", + ) + if self._atomic_attacks: + seed_groups = self._atomic_attacks[0].seed_groups + else: + seed_groups = self._dataset_config.get_all_seed_attack_groups() + self._atomic_attacks.insert(0, self._build_baseline_atomic_attack(seed_groups=seed_groups)) # Store original objectives for each atomic attack (before any mutations during execution) self._original_objectives_map = { @@ -659,26 +748,34 @@ async def initialize_async( self._scenario_result_id = str(result.id) logger.info(f"Created new scenario result with ID: {self._scenario_result_id}") - def _get_baseline(self) -> AtomicAttack: + def _build_baseline_atomic_attack(self, *, seed_groups: list[SeedAttackGroup]) -> AtomicAttack: """ - Get a baseline AtomicAttack, which simply sends all the objectives without any modifications. + Build the baseline AtomicAttack from pre-resolved seed groups. - If other atomic attacks exist, derives baseline data from the first attack. - Otherwise, creates a standalone baseline from the dataset configuration and scenario settings. + The baseline sends each objective unmodified, providing a comparison point + against the scenario's strategy attacks. Pass the same ``seed_groups`` used + to build the strategy attacks so both populations match. + + Args: + seed_groups: Seed groups to attack. Used as-is, no further sampling. Returns: - AtomicAttack: The baseline AtomicAttack instance. + AtomicAttack: The baseline atomic attack. Raises: - ValueError: If required data (seed_groups, objective_target, attack_scoring_config) - is not available. 
+ ValueError: If ``initialize_async`` has not been called (no objective + target or scorer set). """ - seed_groups, attack_scoring_config, objective_target = self._get_baseline_data() + if self._objective_target is None: + raise ValueError("Objective target is required to create baseline attack.") + if self._objective_scorer is None: + raise ValueError("Objective scorer is required to create baseline attack.") + + from pyrit.executor.attack.core.attack_config import AttackScoringConfig - # Create baseline attack with no converters attack = PromptSendingAttack( - objective_target=objective_target, - attack_scoring_config=attack_scoring_config, + objective_target=self._objective_target, + attack_scoring_config=AttackScoringConfig(objective_scorer=cast("TrueFalseScorer", self._objective_scorer)), ) return AtomicAttack( @@ -688,40 +785,6 @@ def _get_baseline(self) -> AtomicAttack: memory_labels=self._memory_labels, ) - def _get_baseline_data(self) -> tuple[list["SeedAttackGroup"], "AttackScoringConfig", PromptTarget]: - """ - Get the data needed to create a baseline attack. - - Returns the scenario-level data - - Returns: - Tuple containing (seed_groups, attack_scoring_config, objective_target) - - Raises: - ValueError: If required data is not available. 
- """ - # Create from scenario-level settings - if not self._objective_target: - raise ValueError("Objective target is required to create baseline attack.") - if not self._dataset_config: - raise ValueError("Dataset config is required to create baseline attack.") - if not self._objective_scorer: - raise ValueError("Objective scorer is required to create baseline attack.") - - seed_groups = self._dataset_config.get_all_seed_attack_groups() - if not seed_groups or len(seed_groups) == 0: - raise ValueError("Seed groups are required to create baseline attack.") - - # Import here to avoid circular imports - from pyrit.executor.attack.core.attack_config import AttackScoringConfig - - attack_scoring_config = AttackScoringConfig(objective_scorer=cast("TrueFalseScorer", self._objective_scorer)) - - if not attack_scoring_config: - raise ValueError("Attack scoring config is required to create baseline attack.") - - return seed_groups, attack_scoring_config, self._objective_target - def _raise_dataset_exception(self) -> None: error_msg = textwrap.dedent( f""" @@ -902,7 +965,9 @@ async def _get_atomic_attacks_async(self) -> list[AtomicAttack]: ``_build_display_group()``. Subclasses that do **not** use the factory/registry pattern should - override this method entirely. + override this method entirely. Overrides that want baseline support + must call ``self._build_baseline_atomic_attack`` with the strategy + seeds. Returns: list[AtomicAttack]: The generated atomic attacks. 
@@ -972,6 +1037,10 @@ async def _get_atomic_attacks_async(self) -> list[AtomicAttack]: ) ) + if self._include_baseline: + all_seed_groups = [g for groups in seed_groups_by_dataset.values() for g in groups] + atomic_attacks.insert(0, self._build_baseline_atomic_attack(seed_groups=all_seed_groups)) + return atomic_attacks async def run_async(self) -> ScenarioResult: diff --git a/pyrit/scenario/scenarios/airt/cyber.py b/pyrit/scenario/scenarios/airt/cyber.py index 4f865507da..d29b81eecc 100644 --- a/pyrit/scenario/scenarios/airt/cyber.py +++ b/pyrit/scenario/scenarios/airt/cyber.py @@ -7,6 +7,7 @@ from typing import TYPE_CHECKING, ClassVar from pyrit.common import apply_defaults +from pyrit.common.deprecation import print_deprecation_message # Deprecated. Will be removed in 0.16.0. from pyrit.common.path import SCORER_SEED_PROMPT_PATH from pyrit.scenario.core.dataset_configuration import DatasetConfiguration from pyrit.scenario.core.scenario import Scenario @@ -108,8 +109,8 @@ def __init__( self, *, objective_scorer: TrueFalseScorer | None = None, - include_baseline: bool = True, scenario_result_id: str | None = None, + include_baseline: bool | None = None, # Deprecated. Will be removed in 0.16.0. ) -> None: """ Initialize the cyber harms scenario. @@ -117,9 +118,9 @@ def __init__( Args: objective_scorer (TrueFalseScorer | None): Objective scorer for malware detection. If not provided, defaults to a composite scorer using malware detection + refusal backstop. - include_baseline (bool): Whether to include a baseline atomic attack that sends all objectives - without modifications. Defaults to True. scenario_result_id (str | None): Optional ID of an existing scenario result to resume. + include_baseline (bool | None): **Deprecated.** Will be removed in 0.16.0. Pass + ``include_baseline`` to ``initialize_async`` instead. 
""" self._objective_scorer: TrueFalseScorer = ( objective_scorer if objective_scorer else self._get_default_objective_scorer() @@ -129,6 +130,15 @@ def __init__( version=self.VERSION, objective_scorer=self._objective_scorer, strategy_class=self.get_strategy_class(), - include_default_baseline=include_baseline, scenario_result_id=scenario_result_id, ) + + # Deprecated constructor-time baseline override. Will be removed in 0.16.0, along with + # the include_baseline kwarg above. + if include_baseline is not None: + print_deprecation_message( + old_item="Cyber(include_baseline=...)", + new_item="Cyber.initialize_async(include_baseline=...)", + removed_in="0.16.0", + ) + self._legacy_include_baseline = include_baseline diff --git a/pyrit/scenario/scenarios/airt/jailbreak.py b/pyrit/scenario/scenarios/airt/jailbreak.py index 62eee1b942..f69b55d017 100644 --- a/pyrit/scenario/scenarios/airt/jailbreak.py +++ b/pyrit/scenario/scenarios/airt/jailbreak.py @@ -5,6 +5,7 @@ from typing import Any, Optional, Union from pyrit.common import apply_defaults +from pyrit.common.deprecation import print_deprecation_message # Deprecated. Will be removed in 0.16.0. from pyrit.datasets import TextJailBreak from pyrit.executor.attack.core.attack_config import ( AttackAdversarialConfig, @@ -120,11 +121,11 @@ def __init__( self, *, objective_scorer: Optional[TrueFalseScorer] = None, - include_baseline: bool = False, scenario_result_id: Optional[str] = None, num_templates: Optional[int] = None, num_attempts: int = 1, jailbreak_names: list[str] | None = None, + include_baseline: bool | None = None, # Deprecated. Will be removed in 0.16.0. ) -> None: """ Initialize the jailbreak scenario. @@ -132,13 +133,13 @@ def __init__( Args: objective_scorer (Optional[TrueFalseScorer]): Scorer for detecting successful jailbreaks (non-refusal). If not provided, defaults to an inverted refusal scorer. 
- include_baseline (bool): Whether to include a baseline atomic attack that sends all - objectives without modifications. Defaults to True. scenario_result_id (Optional[str]): Optional ID of an existing scenario result to resume. num_templates (Optional[int]): Choose num_templates random jailbreaks rather than using all of them. num_attempts (Optional[int]): Number of times to try each jailbreak. jailbreak_names (Optional[List[str]]): List of jailbreak names from the template list under datasets. to use. + include_baseline (bool | None): **Deprecated.** Will be removed in 0.16.0. Pass + ``include_baseline`` to ``initialize_async`` instead. Raises: ValueError: If both jailbreak_names and num_templates are provided, as random selection @@ -183,10 +184,19 @@ def __init__( version=self.VERSION, strategy_class=JailbreakStrategy, objective_scorer=self._objective_scorer, - include_default_baseline=include_baseline, scenario_result_id=scenario_result_id, ) + # Deprecated constructor-time baseline override. Will be removed in 0.16.0, along with + # the include_baseline kwarg above. 
+ if include_baseline is not None: + print_deprecation_message( + old_item="Jailbreak(include_baseline=...)", + new_item="Jailbreak.initialize_async(include_baseline=...)", + removed_in="0.16.0", + ) + self._legacy_include_baseline = include_baseline + # Will be resolved in _get_atomic_attacks_async self._seed_groups: Optional[list[SeedAttackGroup]] = None @@ -309,4 +319,7 @@ async def _get_atomic_attacks_async(self) -> list[AtomicAttack]: ) atomic_attacks.append(atomic_attack) + if self._include_baseline: + atomic_attacks.insert(0, self._build_baseline_atomic_attack(seed_groups=self._seed_groups or [])) + return atomic_attacks diff --git a/pyrit/scenario/scenarios/airt/leakage.py b/pyrit/scenario/scenarios/airt/leakage.py index 85998ee795..2ccf54f768 100644 --- a/pyrit/scenario/scenarios/airt/leakage.py +++ b/pyrit/scenario/scenarios/airt/leakage.py @@ -160,7 +160,6 @@ def __init__( version=self.VERSION, strategy_class=self.get_strategy_class(), objective_scorer=objective_scorer, - include_default_baseline=True, scenario_result_id=scenario_result_id, ) diff --git a/pyrit/scenario/scenarios/airt/psychosocial.py b/pyrit/scenario/scenarios/airt/psychosocial.py index 860fe05828..8e7bb0bd5b 100644 --- a/pyrit/scenario/scenarios/airt/psychosocial.py +++ b/pyrit/scenario/scenarios/airt/psychosocial.py @@ -9,6 +9,7 @@ import yaml from pyrit.common import apply_defaults +from pyrit.common.deprecation import print_deprecation_message # Deprecated. Will be removed in 0.16.0. from pyrit.common.path import DATASETS_PATH from pyrit.executor.attack import ( AttackAdversarialConfig, @@ -214,6 +215,7 @@ def __init__( scenario_result_id: Optional[str] = None, subharm_configs: Optional[dict[str, SubharmConfig]] = None, max_turns: int = 5, + include_baseline: bool | None = None, # Deprecated. Will be removed in 0.16.0. ) -> None: """ Initialize the Psychosocial Harms Scenario. 
@@ -245,6 +247,8 @@ def __init__( max_turns (int): Maximum number of conversation turns for multi-turn attacks (CrescendoAttack). Defaults to 5. Increase for more gradual escalation, decrease for faster testing. + include_baseline (bool | None): **Deprecated.** Will be removed in 0.16.0. Pass + ``include_baseline`` to ``initialize_async`` instead. """ if objectives is not None: logger.warning( @@ -264,9 +268,18 @@ def __init__( strategy_class=PsychosocialStrategy, objective_scorer=self._objective_scorer, scenario_result_id=scenario_result_id, - include_default_baseline=False, ) + # Deprecated constructor-time baseline override. Will be removed in 0.16.0, along with + # the include_baseline kwarg above. + if include_baseline is not None: + print_deprecation_message( + old_item="Psychosocial(include_baseline=...)", + new_item="Psychosocial.initialize_async(include_baseline=...)", + removed_in="0.16.0", + ) + self._legacy_include_baseline = include_baseline + # Store deprecated objectives for later resolution in _resolve_seed_groups self._deprecated_objectives = objectives # Will be resolved in _get_atomic_attacks_async @@ -421,7 +434,7 @@ async def _get_atomic_attacks_async(self) -> list[AtomicAttack]: scoring_config = self._create_scoring_config(resolved.subharm) - return [ + atomic_attacks: list[AtomicAttack] = [ *self._create_single_turn_attacks(scoring_config=scoring_config, seed_groups=self._seed_groups), self._create_multi_turn_attack( scoring_config=scoring_config, @@ -430,6 +443,11 @@ async def _get_atomic_attacks_async(self) -> list[AtomicAttack]: ), ] + if self._include_baseline: + atomic_attacks.insert(0, self._build_baseline_atomic_attack(seed_groups=self._seed_groups)) + + return atomic_attacks + def _create_scoring_config(self, subharm: Optional[str]) -> AttackScoringConfig: subharm_config = self._subharm_configs.get(subharm) if subharm else None scorer = self._get_scorer(subharm=subharm) if subharm_config else self._objective_scorer diff --git 
a/pyrit/scenario/scenarios/airt/scam.py b/pyrit/scenario/scenarios/airt/scam.py index 3ff34be5a3..84b57cffb8 100644 --- a/pyrit/scenario/scenarios/airt/scam.py +++ b/pyrit/scenario/scenarios/airt/scam.py @@ -6,6 +6,7 @@ from typing import TYPE_CHECKING, Any, Optional from pyrit.common import Parameter, apply_defaults +from pyrit.common.deprecation import print_deprecation_message # Deprecated. Will be removed in 0.16.0. from pyrit.common.path import ( EXECUTOR_RED_TEAM_PATH, SCORER_SEED_PROMPT_PATH, @@ -152,8 +153,8 @@ def __init__( *, objective_scorer: Optional[TrueFalseScorer] = None, adversarial_chat: Optional[PromptTarget] = None, - include_baseline: bool = True, scenario_result_id: Optional[str] = None, + include_baseline: bool | None = None, # Deprecated. Will be removed in 0.16.0. ) -> None: """ Initialize the ScamScenario. @@ -163,11 +164,9 @@ def __init__( evaluation. adversarial_chat (Optional[PromptTarget]): Chat target used to rephrase the objective into the role-play context (in single-turn strategies). - include_baseline (bool): Whether to include a baseline atomic attack that sends all objectives - without modifications. Defaults to True. When True, a "baseline" attack is automatically - added as the first atomic attack, allowing comparison between unmodified prompts and - encoding-modified prompts. scenario_result_id (Optional[str]): Optional ID of an existing scenario result to resume. + include_baseline (bool | None): **Deprecated.** Will be removed in 0.16.0. Pass + ``include_baseline`` to ``initialize_async`` instead. """ if not objective_scorer: objective_scorer = self._get_default_objective_scorer() @@ -181,10 +180,19 @@ def __init__( version=self.VERSION, strategy_class=ScamStrategy, objective_scorer=objective_scorer, - include_default_baseline=include_baseline, scenario_result_id=scenario_result_id, ) + # Deprecated constructor-time baseline override. Will be removed in 0.16.0, along with + # the include_baseline kwarg above. 
+ if include_baseline is not None: + print_deprecation_message( + old_item="Scam(include_baseline=...)", + new_item="Scam.initialize_async(include_baseline=...)", + removed_in="0.16.0", + ) + self._legacy_include_baseline = include_baseline + # Will be resolved in _get_atomic_attacks_async self._seed_groups: Optional[list[SeedAttackGroup]] = None @@ -273,4 +281,9 @@ async def _get_atomic_attacks_async(self) -> list[AtomicAttack]: strategies = {s.value for s in self._scenario_strategies} - return [self._get_atomic_attack_from_strategy(strategy) for strategy in strategies] + atomic_attacks = [self._get_atomic_attack_from_strategy(strategy) for strategy in strategies] + + if self._include_baseline: + atomic_attacks.insert(0, self._build_baseline_atomic_attack(seed_groups=self._seed_groups or [])) + + return atomic_attacks diff --git a/pyrit/scenario/scenarios/benchmark/adversarial.py b/pyrit/scenario/scenarios/benchmark/adversarial.py index bdcbd7e0d5..d3f873f67e 100644 --- a/pyrit/scenario/scenarios/benchmark/adversarial.py +++ b/pyrit/scenario/scenarios/benchmark/adversarial.py @@ -14,7 +14,7 @@ from pyrit.registry.tag_query import TagQuery from pyrit.scenario.core.atomic_attack import AtomicAttack from pyrit.scenario.core.dataset_configuration import DatasetConfiguration -from pyrit.scenario.core.scenario import Scenario +from pyrit.scenario.core.scenario import BaselinePolicy, Scenario from pyrit.scenario.core.scenario_techniques import SCENARIO_TECHNIQUES if TYPE_CHECKING: @@ -34,6 +34,10 @@ class AdversarialBenchmark(Scenario): VERSION: int = 1 _cached_strategy_class: ClassVar[type[ScenarioStrategy] | None] = None + #: AdversarialBenchmark compares attack-success rates across adversarial models; a baseline + #: attack would be model-independent and contribute no signal to the comparison. 
+ BASELINE_POLICY: ClassVar[BaselinePolicy] = BaselinePolicy.Forbidden + @classmethod def get_strategy_class(cls) -> type[ScenarioStrategy]: """ @@ -118,7 +122,6 @@ def __init__( version=self.VERSION, objective_scorer=self._objective_scorer, strategy_class=self.get_strategy_class(), - include_default_baseline=False, scenario_result_id=scenario_result_id, ) diff --git a/pyrit/scenario/scenarios/foundry/red_team_agent.py b/pyrit/scenario/scenarios/foundry/red_team_agent.py index 2ba4e6667d..b9ce521fb4 100644 --- a/pyrit/scenario/scenarios/foundry/red_team_agent.py +++ b/pyrit/scenario/scenarios/foundry/red_team_agent.py @@ -16,6 +16,7 @@ from typing import TYPE_CHECKING, Any, Optional, TypeVar, cast from pyrit.common import REQUIRED_VALUE, apply_defaults +from pyrit.common.deprecation import print_deprecation_message # Deprecated. Will be removed in 0.16.0. from pyrit.datasets import TextJailBreak from pyrit.executor.attack import ( CrescendoAttack, @@ -245,8 +246,8 @@ def __init__( *, adversarial_chat: Optional[PromptTarget] = None, attack_scoring_config: Optional[AttackScoringConfig] = None, - include_baseline: bool = True, scenario_result_id: Optional[str] = None, + include_baseline: bool | None = None, # Deprecated. Will be removed in 0.16.0. ) -> None: """ Initialize a Foundry Scenario with the specified attack strategies. @@ -258,11 +259,9 @@ def __init__( attack_scoring_config (Optional[AttackScoringConfig]): Configuration for attack scoring, including the objective scorer and auxiliary scorers. If not provided, creates a default configuration with a composite scorer using Azure Content Filter and SelfAsk Refusal scorers. - include_baseline (bool): Whether to include a baseline atomic attack that sends all objectives - without modifications. Defaults to True. When True, a "baseline" attack is automatically - added as the first atomic attack, allowing comparison between unmodified prompts and - attack-modified prompts. 
scenario_result_id (Optional[str]): Optional ID of an existing scenario result to resume. + include_baseline (bool | None): **Deprecated.** Will be removed in 0.16.0. Pass + ``include_baseline`` to ``initialize_async`` instead. Raises: ValueError: If attack_strategies is empty or contains unsupported strategies. @@ -284,9 +283,19 @@ def __init__( version=self.VERSION, strategy_class=FoundryStrategy, objective_scorer=objective_scorer, - include_default_baseline=include_baseline, scenario_result_id=scenario_result_id, ) + + # Deprecated constructor-time baseline override. Will be removed in 0.16.0, along with + # the include_baseline kwarg above. + if include_baseline is not None: + print_deprecation_message( + old_item="RedTeamAgent(include_baseline=...)", + new_item="RedTeamAgent.initialize_async(include_baseline=...)", + removed_in="0.16.0", + ) + self._legacy_include_baseline = include_baseline + self._scenario_composites: list[FoundryComposite] = [] @apply_defaults @@ -301,6 +310,7 @@ async def initialize_async( max_concurrency: int = 10, max_retries: int = 0, memory_labels: Optional[dict[str, str]] = None, + include_baseline: bool | None = None, ) -> None: """ Initialize the scenario. @@ -316,6 +326,7 @@ async def initialize_async( max_concurrency (int): Maximum number of concurrent attack executions. Defaults to 10. max_retries (int): Maximum number of retries on failure. Defaults to 0. memory_labels (Optional[dict[str, str]]): Labels to attach to all memory entries. + include_baseline (bool | None): See ``Scenario.initialize_async``. """ # This override exists purely for type-widening: FoundryComposite is a dataclass, # not a ScenarioStrategy enum member, so the base class signature would reject it. 
@@ -327,6 +338,7 @@ async def initialize_async( max_concurrency=max_concurrency, max_retries=max_retries, memory_labels=memory_labels, + include_baseline=include_baseline, ) def _prepare_strategies( # type: ignore[ty:invalid-method-override] @@ -421,7 +433,12 @@ async def _get_atomic_attacks_async(self) -> list[AtomicAttack]: # Resolve seed groups now that initialize_async has been called self._seed_groups = self._resolve_seed_groups() - return [self._get_attack_from_strategy(composition) for composition in self._scenario_composites] + atomic_attacks = [self._get_attack_from_strategy(composition) for composition in self._scenario_composites] + + if self._include_baseline: + atomic_attacks.insert(0, self._build_baseline_atomic_attack(seed_groups=self._seed_groups)) + + return atomic_attacks def _get_attack_from_strategy(self, composite: FoundryComposite) -> AtomicAttack: """ diff --git a/pyrit/scenario/scenarios/garak/encoding.py b/pyrit/scenario/scenarios/garak/encoding.py index 531b11062e..c20ece87b4 100644 --- a/pyrit/scenario/scenarios/garak/encoding.py +++ b/pyrit/scenario/scenarios/garak/encoding.py @@ -7,6 +7,7 @@ from typing import Optional from pyrit.common import apply_defaults +from pyrit.common.deprecation import print_deprecation_message # Deprecated. Will be removed in 0.16.0. from pyrit.executor.attack.core.attack_config import ( AttackConverterConfig, AttackScoringConfig, @@ -173,8 +174,8 @@ def __init__( *, objective_scorer: Optional[TrueFalseScorer] = None, encoding_templates: Optional[Sequence[str]] = None, - include_baseline: bool = True, scenario_result_id: Optional[str] = None, + include_baseline: bool | None = None, # Deprecated. Will be removed in 0.16.0. ) -> None: """ Initialize the Encoding Scenario. @@ -185,11 +186,9 @@ def __init__( category. encoding_templates (Optional[Sequence[str]]): Templates used to construct the decoding prompts. Defaults to AskToDecodeConverter.garak_templates. 
- include_baseline (bool): Whether to include a baseline atomic attack that sends all objectives - without modifications. Defaults to True. When True, a "baseline" attack is automatically - added as the first atomic attack, allowing comparison between unmodified prompts and - encoding-modified prompts. scenario_result_id (Optional[str]): Optional ID of an existing scenario result to resume. + include_baseline (bool | None): **Deprecated.** Will be removed in 0.16.0. Pass + ``include_baseline`` to ``initialize_async`` instead. """ objective_scorer = objective_scorer or DecodingScorer(categories=["encoding_scenario"]) self._scorer_config = AttackScoringConfig(objective_scorer=objective_scorer) @@ -200,10 +199,19 @@ def __init__( version=self.VERSION, strategy_class=EncodingStrategy, objective_scorer=objective_scorer, - include_default_baseline=include_baseline, scenario_result_id=scenario_result_id, ) + # Deprecated constructor-time baseline override. Will be removed in 0.16.0, along with + # the include_baseline kwarg above. 
+ if include_baseline is not None: + print_deprecation_message( + old_item="Encoding(include_baseline=...)", + new_item="Encoding.initialize_async(include_baseline=...)", + removed_in="0.16.0", + ) + self._legacy_include_baseline = include_baseline + # Will be resolved in _get_atomic_attacks_async self._resolved_seed_groups: Optional[list[SeedAttackGroup]] = None @@ -232,7 +240,12 @@ async def _get_atomic_attacks_async(self) -> list[AtomicAttack]: # Resolve seed prompts from deprecated parameter or dataset config self._resolved_seed_groups = self._resolve_seed_groups() - return self._get_converter_attacks() + atomic_attacks = self._get_converter_attacks() + + if self._include_baseline: + atomic_attacks.insert(0, self._build_baseline_atomic_attack(seed_groups=self._resolved_seed_groups or [])) + + return atomic_attacks # These are the same as Garak encoding attacks def _get_converter_attacks(self) -> list[AtomicAttack]: diff --git a/tests/integration/datasets/test_seed_dataset_provider_integration.py b/tests/integration/datasets/test_seed_dataset_provider_integration.py index 457aadb7e6..85a8a80235 100644 --- a/tests/integration/datasets/test_seed_dataset_provider_integration.py +++ b/tests/integration/datasets/test_seed_dataset_provider_integration.py @@ -683,7 +683,6 @@ async def test_red_team_agent_initializes_with_harmbench(self, sqlite_instance): rta = RedTeamAgent( adversarial_chat=target, attack_scoring_config=AttackScoringConfig(objective_scorer=mock_scorer), - include_baseline=False, ) # This is the critical call — it loads seed groups from memory @@ -693,6 +692,7 @@ async def test_red_team_agent_initializes_with_harmbench(self, sqlite_instance): objective_target=target, max_concurrency=1, scenario_strategies=[FoundryStrategy.Base64], + include_baseline=False, ) # Verify the scenario got objectives from harmbench diff --git a/tests/unit/scenario/test_adversarial.py b/tests/unit/scenario/test_adversarial.py index e6b082cb0d..c4dea8f3c5 100644 --- 
a/tests/unit/scenario/test_adversarial.py +++ b/tests/unit/scenario/test_adversarial.py @@ -23,7 +23,7 @@ from pyrit.prompt_target import PromptTarget from pyrit.prompt_target.common.prompt_chat_target import PromptChatTarget from pyrit.registry.object_registries.attack_technique_registry import AttackTechniqueRegistry -from pyrit.scenario.core import AtomicAttack +from pyrit.scenario.core import AtomicAttack, BaselinePolicy from pyrit.scenario.core.dataset_configuration import DatasetConfiguration from pyrit.scenario.core.scenario_techniques import SCENARIO_TECHNIQUES from pyrit.scenario.scenarios.benchmark.adversarial import AdversarialBenchmark @@ -428,19 +428,42 @@ async def test_attacks_carry_seed_groups(self, mock_objective_target, single_adv for a in attacks: assert len(a.objectives) > 0 - @pytest.mark.asyncio async def test_baseline_excluded(self, mock_objective_target, single_adversarial_model): """AdversarialBenchmark must opt out of the parent's default baseline. - Verifies both the configuration toggle (``_include_baseline is False``) and - the observable property (no atomic attack is named ``"baseline"``). + Verifies both the class-level capability flag and the observable property + (no atomic attack is named ``"baseline"``). 
""" - scenario, attacks = await self._init_and_get_attacks( + scenario, _ = await self._init_and_get_attacks( mock_objective_target=mock_objective_target, adversarial_models=single_adversarial_model, ) - assert scenario._include_baseline is False - assert not any(a.atomic_attack_name == "baseline" for a in attacks) + assert type(scenario).BASELINE_POLICY is BaselinePolicy.Forbidden + assert not any(a.atomic_attack_name == "baseline" for a in scenario._atomic_attacks) + + async def test_baseline_explicit_true_raises(self, mock_objective_target, single_adversarial_model): + """Explicitly passing include_baseline=True to a forbidden scenario raises ValueError.""" + scenario = AdversarialBenchmark(adversarial_models=single_adversarial_model) + with pytest.raises(ValueError, match="does not support a default baseline"): + await scenario.initialize_async( + objective_target=mock_objective_target, + include_baseline=True, + ) + + async def test_baseline_explicit_false_succeeds(self, mock_objective_target, single_adversarial_model): + """Explicit include_baseline=False on a forbidden scenario is accepted (matches the default).""" + groups = {"harmbench": _make_seed_groups("harmbench")} + with ( + patch.object(DatasetConfiguration, "get_seed_attack_groups", return_value=groups), + patch("pyrit.scenario.core.scenario.Scenario._get_default_objective_scorer") as mock_scorer, + ): + mock_scorer.return_value = MagicMock(spec=TrueFalseScorer, get_identifier=lambda: _mock_id("scorer")) + scenario = AdversarialBenchmark(adversarial_models=single_adversarial_model) + await scenario.initialize_async( + objective_target=mock_objective_target, + include_baseline=False, + ) + assert not any(a.atomic_attack_name == "baseline" for a in scenario._atomic_attacks) # =========================================================================== diff --git a/tests/unit/scenario/test_baseline_deprecation.py b/tests/unit/scenario/test_baseline_deprecation.py new file mode 100644 index 
0000000000..5faf9a6d76 --- /dev/null +++ b/tests/unit/scenario/test_baseline_deprecation.py @@ -0,0 +1,202 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +""" +Deprecated. Will be removed in 0.16.0 along with the corresponding +``include_default_baseline`` / ``include_baseline`` constructor shims in +``Scenario`` and its subclasses (``Cyber``, ``Jailbreak``, ``Scam``, +``Psychosocial``, ``RedTeamAgent``, ``Encoding``). +""" + +import warnings +from typing import ClassVar +from unittest.mock import MagicMock, patch + +import pytest + +from pyrit.identifiers import ComponentIdentifier +from pyrit.scenario import DatasetConfiguration +from pyrit.scenario.core import BaselinePolicy, Scenario, ScenarioStrategy +from pyrit.score import TrueFalseScorer + +_TEST_SCORER_ID = ComponentIdentifier(class_name="MockScorer", class_module="tests.unit.scenarios") + + +class _LegacyStrategy(ScenarioStrategy): + TEST = ("test", {"concrete"}) + ALL = ("all", {"all"}) + + @classmethod + def get_aggregate_tags(cls) -> set[str]: + return {"all"} + + +class _LegacyScenario(Scenario): + """Minimal Scenario stand-in for exercising the deprecated baseline kwargs.""" + + BASELINE_POLICY: ClassVar[BaselinePolicy] = BaselinePolicy.Enabled + + def __init__(self, **kwargs): + kwargs.setdefault("strategy_class", _LegacyStrategy) + if "objective_scorer" not in kwargs: + mock_scorer = MagicMock(spec=TrueFalseScorer) + mock_scorer.get_identifier.return_value = _TEST_SCORER_ID + mock_scorer.get_scorer_metrics.return_value = None + kwargs["objective_scorer"] = mock_scorer + kwargs.setdefault("version", 1) + super().__init__(**kwargs) + + @classmethod + def get_strategy_class(cls): + return _LegacyStrategy + + @classmethod + def get_default_strategy(cls): + return _LegacyStrategy.ALL + + @classmethod + def default_dataset_config(cls) -> DatasetConfiguration: + return DatasetConfiguration() + + async def _get_atomic_attacks_async(self): + atomic_attacks = [] + if self._include_baseline: +
groups_by_dataset = self._dataset_config.get_seed_attack_groups() + all_seed_groups = [g for groups in groups_by_dataset.values() for g in groups] + atomic_attacks.append(self._build_baseline_atomic_attack(seed_groups=all_seed_groups)) + return atomic_attacks + + +@pytest.fixture +def mock_objective_target(): + target = MagicMock() + target.get_identifier.return_value = ComponentIdentifier(class_name="MockTarget", class_module="test") + return target + + +@pytest.mark.usefixtures("patch_central_database") +class TestScenarioBaseDeprecation: + """Cover the deprecated ``Scenario(include_default_baseline=...)`` base kwarg.""" + + def test_base_kwarg_emits_deprecation_warning(self): + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter("always") + scenario = _LegacyScenario(include_default_baseline=False) + + deprecations = [w for w in caught if issubclass(w.category, DeprecationWarning)] + assert len(deprecations) == 1 + msg = str(deprecations[0].message) + assert "include_default_baseline" in msg + assert "0.16.0" in msg + assert scenario._legacy_include_baseline is False + + def test_base_kwarg_omitted_emits_no_warning(self): + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter("always") + scenario = _LegacyScenario() + + assert not any(issubclass(w.category, DeprecationWarning) for w in caught) + assert scenario._legacy_include_baseline is None + + async def test_legacy_value_drives_initialize_when_runtime_kwarg_omitted(self, mock_objective_target): + """Constructor-time False suppresses the baseline that BASELINE_POLICY=Enabled would add.""" + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + scenario = _LegacyScenario(include_default_baseline=False) + + with patch.object(_LegacyScenario, "default_dataset_config", return_value=DatasetConfiguration()): + await scenario.initialize_async(objective_target=mock_objective_target) + + assert not any(a.atomic_attack_name == "baseline" 
for a in scenario._atomic_attacks) + + async def test_runtime_kwarg_wins_over_legacy_value(self, mock_objective_target): + """Explicit runtime include_baseline overrides any constructor-time legacy value.""" + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + scenario = _LegacyScenario(include_default_baseline=True) + + with patch.object(_LegacyScenario, "default_dataset_config", return_value=DatasetConfiguration()): + await scenario.initialize_async(objective_target=mock_objective_target, include_baseline=False) + + assert not any(a.atomic_attack_name == "baseline" for a in scenario._atomic_attacks) + + +class TestSubclassBaselineKwargDeprecation: + """Cover the deprecated ``include_baseline`` constructor kwarg on user-facing subclasses.""" + + @pytest.mark.parametrize( + "import_path, class_name, needs_adversarial_chat", + [ + ("pyrit.scenario.scenarios.airt.cyber", "Cyber", False), + ("pyrit.scenario.scenarios.airt.jailbreak", "Jailbreak", False), + ("pyrit.scenario.scenarios.airt.scam", "Scam", True), + ("pyrit.scenario.scenarios.garak.encoding", "Encoding", False), + ], + ) + def test_subclass_kwarg_emits_deprecation_warning( + self, import_path, class_name, needs_adversarial_chat, patch_central_database + ): + from pyrit.prompt_target import PromptTarget + from pyrit.score import TrueFalseScorer + + module = __import__(import_path, fromlist=[class_name]) + cls = getattr(module, class_name) + + # Spec'd against TrueFalseScorer so AttackScoringConfig validators accept it. 
+ mock_scorer = MagicMock(spec=TrueFalseScorer) + mock_scorer.get_identifier.return_value = _TEST_SCORER_ID + mock_scorer.get_scorer_metrics.return_value = None + + extra_kwargs = {} + if needs_adversarial_chat: + mock_target = MagicMock(spec=PromptTarget) + mock_target.get_identifier.return_value = ComponentIdentifier(class_name="MockTarget", class_module="test") + extra_kwargs["adversarial_chat"] = mock_target + + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter("always") + scenario = cls(objective_scorer=mock_scorer, include_baseline=False, **extra_kwargs) + + deprecations = [ + w for w in caught if issubclass(w.category, DeprecationWarning) and class_name in str(w.message) + ] + assert len(deprecations) >= 1, f"{class_name} did not emit a DeprecationWarning naming the class" + assert "0.16.0" in str(deprecations[0].message) + assert scenario._legacy_include_baseline is False + + +@pytest.mark.usefixtures("patch_central_database") +class TestLegacyAndRuntimePathsEquivalentUnderMaxDatasetSize: + """ADO 9012: the deprecated constructor path and the new initialize_async path must + produce the same baseline atomic attack under max_dataset_size.""" + + async def test_paths_produce_matching_objective_sets(self, mock_objective_target): + from pyrit.models import SeedGroup, SeedObjective + + seed_groups = [SeedGroup(seeds=[SeedObjective(value=f"obj{i}")]) for i in range(10)] + + # Both paths share the same patched sample, so each scenario's single + # resolution call returns ``stable_sample``. 
+ stable_sample = seed_groups[:3] + + with patch( + "pyrit.scenario.core.dataset_configuration.random.sample", + return_value=stable_sample, + ): + config_legacy = DatasetConfiguration(seed_groups=seed_groups, max_dataset_size=3) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + legacy = _LegacyScenario(include_default_baseline=True) + await legacy.initialize_async(objective_target=mock_objective_target, dataset_config=config_legacy) + + config_runtime = DatasetConfiguration(seed_groups=seed_groups, max_dataset_size=3) + runtime = _LegacyScenario() + await runtime.initialize_async( + objective_target=mock_objective_target, + dataset_config=config_runtime, + include_baseline=True, + ) + + assert legacy._atomic_attacks[0].atomic_attack_name == "baseline" + assert runtime._atomic_attacks[0].atomic_attack_name == "baseline" + assert set(legacy._atomic_attacks[0].objectives) == set(runtime._atomic_attacks[0].objectives) diff --git a/tests/unit/scenario/test_cyber.py b/tests/unit/scenario/test_cyber.py index e5833525bf..d519e8913f 100644 --- a/tests/unit/scenario/test_cyber.py +++ b/tests/unit/scenario/test_cyber.py @@ -211,7 +211,7 @@ async def _init_and_get_attacks( groups = seed_groups or {"malware": _make_seed_groups("malware")} with patch.object(DatasetConfiguration, "get_seed_attack_groups", return_value=groups): scenario = Cyber(objective_scorer=mock_objective_scorer) - init_kwargs = {"objective_target": mock_objective_target} + init_kwargs = {"objective_target": mock_objective_target, "include_baseline": False} if strategies: init_kwargs["scenario_strategies"] = strategies await scenario.initialize_async(**init_kwargs) diff --git a/tests/unit/scenario/test_encoding.py b/tests/unit/scenario/test_encoding.py index 0df8435a86..bb643ff6dc 100644 --- a/tests/unit/scenario/test_encoding.py +++ b/tests/unit/scenario/test_encoding.py @@ -399,3 +399,36 @@ def test_encoding_dataset_config_can_be_initialized_with_dataset_names(self): 
assert config._dataset_names == ["garak_slur_terms_en", "garak_web_html_js"] assert config.max_dataset_size == 5 + + +@pytest.mark.usefixtures("patch_central_database") +class TestEncodingBaselineUniformity: + """ADO 9012 regression: baseline shares objectives with strategies under max_dataset_size.""" + + async def test_one_resolution_call_baseline_matches_strategies(self, mock_objective_target, mock_objective_scorer): + from unittest.mock import patch + + from pyrit.models import SeedGroup, SeedObjective + + seed_groups = [SeedGroup(seeds=[SeedObjective(value=f"obj{i}")]) for i in range(10)] + config = DatasetConfiguration(seed_groups=seed_groups, max_dataset_size=3) + + first_sample = seed_groups[:3] + second_sample = seed_groups[5:8] + with patch( + "pyrit.scenario.core.dataset_configuration.random.sample", + side_effect=[first_sample, second_sample], + ) as mock_sample: + scenario = Encoding(objective_scorer=mock_objective_scorer) + await scenario.initialize_async( + objective_target=mock_objective_target, + scenario_strategies=[EncodingStrategy.ALL], + dataset_config=config, + include_baseline=True, + ) + + assert mock_sample.call_count == 1 + assert scenario._atomic_attacks[0].atomic_attack_name == "baseline" + baseline_objs = set(scenario._atomic_attacks[0].objectives) + for attack in scenario._atomic_attacks[1:]: + assert set(attack.objectives) == baseline_objs diff --git a/tests/unit/scenario/test_foundry.py b/tests/unit/scenario/test_foundry.py index d2b613f1b8..3a00a50ef2 100644 --- a/tests/unit/scenario/test_foundry.py +++ b/tests/unit/scenario/test_foundry.py @@ -522,7 +522,6 @@ async def test_scenario_composites_set_after_initialize( with patch.object(RedTeamAgent, "_resolve_seed_groups", return_value=mock_memory_seed_groups): scenario = RedTeamAgent( attack_scoring_config=AttackScoringConfig(objective_scorer=mock_objective_scorer), - include_baseline=False, ) # Before initialize_async, composites should be empty @@ -532,6 +531,7 @@ async def 
test_scenario_composites_set_after_initialize( objective_target=mock_objective_target, scenario_strategies=strategies, dataset_config=mock_dataset_config, + include_baseline=False, ) # After initialize_async, composites should be set @@ -578,12 +578,12 @@ async def test_initialize_with_foundry_composite_directly( with patch.object(RedTeamAgent, "_resolve_seed_groups", return_value=mock_memory_seed_groups): scenario = RedTeamAgent( attack_scoring_config=AttackScoringConfig(objective_scorer=mock_objective_scorer), - include_baseline=False, ) await scenario.initialize_async( objective_target=mock_objective_target, scenario_strategies=[composite], dataset_config=mock_dataset_config, + include_baseline=False, ) assert len(scenario._scenario_composites) == 1 @@ -601,12 +601,12 @@ async def test_initialize_with_mixed_composites_and_strategies( with patch.object(RedTeamAgent, "_resolve_seed_groups", return_value=mock_memory_seed_groups): scenario = RedTeamAgent( attack_scoring_config=AttackScoringConfig(objective_scorer=mock_objective_scorer), - include_baseline=False, ) await scenario.initialize_async( objective_target=mock_objective_target, scenario_strategies=[composite, FoundryStrategy.ROT13], dataset_config=mock_dataset_config, + include_baseline=False, ) assert len(scenario._scenario_composites) == 2 @@ -624,12 +624,12 @@ async def test_initialize_converts_scenario_composite_strategy_to_foundry_compos with patch.object(RedTeamAgent, "_resolve_seed_groups", return_value=mock_memory_seed_groups): scenario = RedTeamAgent( attack_scoring_config=AttackScoringConfig(objective_scorer=mock_objective_scorer), - include_baseline=False, ) await scenario.initialize_async( objective_target=mock_objective_target, scenario_strategies=[legacy], # type: ignore[arg-type] dataset_config=mock_dataset_config, + include_baseline=False, ) assert len(scenario._scenario_composites) == 1 @@ -647,12 +647,12 @@ async def test_initialize_converts_converter_first_composite_strategy( with 
patch.object(RedTeamAgent, "_resolve_seed_groups", return_value=mock_memory_seed_groups): scenario = RedTeamAgent( attack_scoring_config=AttackScoringConfig(objective_scorer=mock_objective_scorer), - include_baseline=False, ) await scenario.initialize_async( objective_target=mock_objective_target, scenario_strategies=[legacy], # type: ignore[arg-type] dataset_config=mock_dataset_config, + include_baseline=False, ) result = scenario._scenario_composites[0] @@ -669,14 +669,47 @@ async def test_initialize_converts_converter_only_composite_strategy( with patch.object(RedTeamAgent, "_resolve_seed_groups", return_value=mock_memory_seed_groups): scenario = RedTeamAgent( attack_scoring_config=AttackScoringConfig(objective_scorer=mock_objective_scorer), - include_baseline=False, ) await scenario.initialize_async( objective_target=mock_objective_target, scenario_strategies=[legacy], # type: ignore[arg-type] dataset_config=mock_dataset_config, + include_baseline=False, ) result = scenario._scenario_composites[0] assert result.attack is None assert set(result.converters) == {FoundryStrategy.Base64, FoundryStrategy.ROT13} + + +@pytest.mark.usefixtures(*FIXTURES) +class TestRedTeamAgentBaselineUniformity: + """ADO 9012 regression: baseline shares objectives with strategies under max_dataset_size.""" + + async def test_one_resolution_call_baseline_matches_strategies(self, mock_objective_target, mock_objective_scorer): + from pyrit.models import SeedGroup, SeedObjective + + seed_groups = [SeedGroup(seeds=[SeedObjective(value=f"obj{i}")]) for i in range(10)] + config = DatasetConfiguration(seed_groups=seed_groups, max_dataset_size=3) + + first_sample = seed_groups[:3] + second_sample = seed_groups[5:8] + with patch( + "pyrit.scenario.core.dataset_configuration.random.sample", + side_effect=[first_sample, second_sample], + ) as mock_sample: + scenario = RedTeamAgent( + attack_scoring_config=AttackScoringConfig(objective_scorer=mock_objective_scorer), + ) + await 
scenario.initialize_async( + objective_target=mock_objective_target, + scenario_strategies=[FoundryStrategy.Base64], + dataset_config=config, + include_baseline=True, + ) + + assert mock_sample.call_count == 1 + assert scenario._atomic_attacks[0].atomic_attack_name == "baseline" + baseline_objs = set(scenario._atomic_attacks[0].objectives) + for attack in scenario._atomic_attacks[1:]: + assert set(attack.objectives) == baseline_objs diff --git a/tests/unit/scenario/test_jailbreak.py b/tests/unit/scenario/test_jailbreak.py index e290084620..b30d386140 100644 --- a/tests/unit/scenario/test_jailbreak.py +++ b/tests/unit/scenario/test_jailbreak.py @@ -16,6 +16,7 @@ from pyrit.identifiers import ComponentIdentifier from pyrit.models import SeedGroup, SeedObjective from pyrit.prompt_target import PromptTarget +from pyrit.scenario.core import BaselinePolicy from pyrit.scenario.scenarios.airt.jailbreak import Jailbreak, JailbreakStrategy from pyrit.score.true_false.true_false_inverter_scorer import TrueFalseInverterScorer @@ -202,6 +203,31 @@ async def test_init_raises_exception_when_no_datasets_available(self, mock_objec with pytest.raises(ValueError, match="DatasetConfiguration has no seed_groups"): await scenario.initialize_async(objective_target=mock_objective_target) + def test_class_inherits_default_baseline_policy(self): + """Jailbreak inherits the base default (Enabled) — baseline included by default.""" + assert Jailbreak.BASELINE_POLICY is BaselinePolicy.Enabled + + async def test_default_initialize_includes_baseline( + self, mock_objective_target, mock_objective_scorer, mock_memory_seed_groups + ): + """initialize_async without include_baseline honors BASELINE_POLICY=Enabled.""" + with patch.object(Jailbreak, "_resolve_seed_groups", return_value=mock_memory_seed_groups): + scenario = Jailbreak(objective_scorer=mock_objective_scorer) + await scenario.initialize_async(objective_target=mock_objective_target) + assert scenario._atomic_attacks[0].atomic_attack_name 
== "baseline" + + async def test_explicit_include_baseline_false_omits_baseline( + self, mock_objective_target, mock_objective_scorer, mock_memory_seed_groups + ): + """Caller can opt out of baseline by passing include_baseline=False.""" + with patch.object(Jailbreak, "_resolve_seed_groups", return_value=mock_memory_seed_groups): + scenario = Jailbreak(objective_scorer=mock_objective_scorer) + await scenario.initialize_async( + objective_target=mock_objective_target, + include_baseline=False, + ) + assert not any(a.atomic_attack_name == "baseline" for a in scenario._atomic_attacks) + @pytest.mark.usefixtures(*FIXTURES) class TestJailbreakAttackGeneration: @@ -242,7 +268,9 @@ async def test_attack_generation_for_complex( scenario = Jailbreak(objective_scorer=mock_objective_scorer) await scenario.initialize_async( - objective_target=mock_objective_target, scenario_strategies=[complex_jailbreak_strategy] + objective_target=mock_objective_target, + scenario_strategies=[complex_jailbreak_strategy], + include_baseline=False, ) atomic_attacks = await scenario._get_atomic_attacks_async() for run in atomic_attacks: @@ -258,7 +286,9 @@ async def test_attack_generation_for_manyshot( scenario = Jailbreak(objective_scorer=mock_objective_scorer) await scenario.initialize_async( - objective_target=mock_objective_target, scenario_strategies=[manyshot_jailbreak_strategy] + objective_target=mock_objective_target, + scenario_strategies=[manyshot_jailbreak_strategy], + include_baseline=False, ) atomic_attacks = await scenario._get_atomic_attacks_async() for run in atomic_attacks: @@ -272,7 +302,9 @@ async def test_attack_generation_for_promptsending( scenario = Jailbreak(objective_scorer=mock_objective_scorer) await scenario.initialize_async( - objective_target=mock_objective_target, scenario_strategies=[promptsending_jailbreak_strategy] + objective_target=mock_objective_target, + scenario_strategies=[promptsending_jailbreak_strategy], + include_baseline=False, ) atomic_attacks = 
await scenario._get_atomic_attacks_async() for run in atomic_attacks: @@ -286,7 +318,9 @@ async def test_attack_generation_for_skeleton( scenario = Jailbreak(objective_scorer=mock_objective_scorer) await scenario.initialize_async( - objective_target=mock_objective_target, scenario_strategies=[skeleton_jailbreak_attack] + objective_target=mock_objective_target, + scenario_strategies=[skeleton_jailbreak_attack], + include_baseline=False, ) atomic_attacks = await scenario._get_atomic_attacks_async() for run in atomic_attacks: @@ -300,7 +334,9 @@ async def test_attack_generation_for_roleplay( scenario = Jailbreak(objective_scorer=mock_objective_scorer) await scenario.initialize_async( - objective_target=mock_objective_target, scenario_strategies=[roleplay_jailbreak_strategy] + objective_target=mock_objective_target, + scenario_strategies=[roleplay_jailbreak_strategy], + include_baseline=False, ) atomic_attacks = await scenario._get_atomic_attacks_async() for run in atomic_attacks: @@ -362,11 +398,11 @@ async def test_custom_num_attempts( """Test that n successfully tries each jailbreak template n-many times.""" with patch.object(Jailbreak, "_resolve_seed_groups", return_value=mock_memory_seed_groups): base_scenario = Jailbreak(objective_scorer=mock_objective_scorer) - await base_scenario.initialize_async(objective_target=mock_objective_target) + await base_scenario.initialize_async(objective_target=mock_objective_target, include_baseline=False) atomic_attacks_1 = await base_scenario._get_atomic_attacks_async() mult_scenario = Jailbreak(objective_scorer=mock_objective_scorer, num_attempts=mock_random_num_attempts) - await mult_scenario.initialize_async(objective_target=mock_objective_target) + await mult_scenario.initialize_async(objective_target=mock_objective_target, include_baseline=False) atomic_attacks_n = await mult_scenario._get_atomic_attacks_async() assert len(atomic_attacks_1) * mock_random_num_attempts == len(atomic_attacks_n) @@ -481,7 +517,9 @@ async def 
test_roleplay_attacks_share_adversarial_target( with patch.object(Jailbreak, "_resolve_seed_groups", return_value=mock_memory_seed_groups): scenario = Jailbreak(objective_scorer=mock_objective_scorer, num_templates=2) await scenario.initialize_async( - objective_target=mock_objective_target, scenario_strategies=[roleplay_jailbreak_strategy] + objective_target=mock_objective_target, + scenario_strategies=[roleplay_jailbreak_strategy], + include_baseline=False, ) atomic_attacks = await scenario._get_atomic_attacks_async() assert len(atomic_attacks) >= 2 @@ -489,3 +527,37 @@ async def test_roleplay_attacks_share_adversarial_target( # All role-play attacks should share the same adversarial target adversarial_targets = [run.attack_technique.attack._adversarial_chat for run in atomic_attacks] assert all(t is adversarial_targets[0] for t in adversarial_targets) + + +@pytest.mark.usefixtures(*FIXTURES) +class TestJailbreakBaselineUniformity: + """ADO 9012 regression: baseline shares objectives with strategies under max_dataset_size.""" + + async def test_one_resolution_call_baseline_matches_strategies( + self, mock_objective_target, mock_objective_scorer, simple_jailbreak_strategy + ): + from pyrit.models import SeedGroup, SeedObjective + from pyrit.scenario import DatasetConfiguration + + seed_groups = [SeedGroup(seeds=[SeedObjective(value=f"obj{i}")]) for i in range(10)] + config = DatasetConfiguration(seed_groups=seed_groups, max_dataset_size=3) + + first_sample = seed_groups[:3] + second_sample = seed_groups[5:8] + scenario = Jailbreak(objective_scorer=mock_objective_scorer, num_templates=1) + with patch( + "pyrit.scenario.core.dataset_configuration.random.sample", + side_effect=[first_sample, second_sample], + ) as mock_sample: + await scenario.initialize_async( + objective_target=mock_objective_target, + scenario_strategies=[simple_jailbreak_strategy], + dataset_config=config, + include_baseline=True, + ) + + assert mock_sample.call_count == 1 + assert 
scenario._atomic_attacks[0].atomic_attack_name == "baseline" + baseline_objs = set(scenario._atomic_attacks[0].objectives) + for attack in scenario._atomic_attacks[1:]: + assert set(attack.objectives) == baseline_objs diff --git a/tests/unit/scenario/test_leakage_scenario.py b/tests/unit/scenario/test_leakage_scenario.py index 5a83bb4565..9c6fa823ae 100644 --- a/tests/unit/scenario/test_leakage_scenario.py +++ b/tests/unit/scenario/test_leakage_scenario.py @@ -14,6 +14,7 @@ from pyrit.prompt_target import PromptTarget from pyrit.scenario import DatasetConfiguration from pyrit.scenario.airt import Leakage, LeakageStrategy +from pyrit.scenario.core import BaselinePolicy from pyrit.score import TrueFalseCompositeScorer @@ -102,10 +103,9 @@ def test_default_scorer_uses_leakage_yaml(self): scorer_path = DATASETS_PATH / "score" / "true_false_question" / "leakage.yaml" assert scorer_path.exists(), f"Expected leakage.yaml scorer at {scorer_path}" - def test_init_include_baseline_true(self, mock_objective_scorer): - """Test that include_baseline is always True.""" - scenario = Leakage(objective_scorer=mock_objective_scorer) - assert scenario._include_baseline is True + def test_init_supports_default_baseline(self): + """Leakage opts into the parent's default baseline.""" + assert Leakage.BASELINE_POLICY is BaselinePolicy.Enabled @pytest.mark.usefixtures(*FIXTURES) diff --git a/tests/unit/scenario/test_psychosocial_harms.py b/tests/unit/scenario/test_psychosocial_harms.py index a8e3325765..ce16363ade 100644 --- a/tests/unit/scenario/test_psychosocial_harms.py +++ b/tests/unit/scenario/test_psychosocial_harms.py @@ -393,3 +393,36 @@ def test_aggregate_tags(self): def test_strategy_values(self): """Test that strategy values are correct.""" assert PsychosocialStrategy.ALL.value == "all" + + +@pytest.mark.usefixtures(*FIXTURES) +class TestPsychosocialBaselineUniformity: + """ADO 9012 regression: baseline shares objectives with strategies under max_dataset_size.""" + + async def 
test_one_resolution_call_baseline_matches_strategies(self, mock_objective_target, mock_objective_scorer): + from pyrit.scenario import DatasetConfiguration + + seed_groups = [SeedGroup(seeds=[SeedObjective(value=f"obj{i}")]) for i in range(10)] + config = DatasetConfiguration(seed_groups=seed_groups, max_dataset_size=3) + + first_sample = seed_groups[:3] + second_sample = seed_groups[5:8] + with ( + patch.object(Psychosocial, "_extract_harm_category_filter", return_value=None), + patch( + "pyrit.scenario.core.dataset_configuration.random.sample", + side_effect=[first_sample, second_sample], + ) as mock_sample, + ): + scenario = Psychosocial(objective_scorer=mock_objective_scorer) + await scenario.initialize_async( + objective_target=mock_objective_target, + dataset_config=config, + include_baseline=True, + ) + + assert mock_sample.call_count == 1 + assert scenario._atomic_attacks[0].atomic_attack_name == "baseline" + baseline_objs = set(scenario._atomic_attacks[0].objectives) + for attack in scenario._atomic_attacks[1:]: + assert set(attack.objectives) == baseline_objs diff --git a/tests/unit/scenario/test_rapid_response.py b/tests/unit/scenario/test_rapid_response.py index f81093f80c..ecaef3d02c 100644 --- a/tests/unit/scenario/test_rapid_response.py +++ b/tests/unit/scenario/test_rapid_response.py @@ -245,7 +245,7 @@ async def _init_and_get_attacks( scenario = RapidResponse( objective_scorer=mock_objective_scorer, ) - init_kwargs = {"objective_target": mock_objective_target} + init_kwargs = {"objective_target": mock_objective_target, "include_baseline": False} if strategies: init_kwargs["scenario_strategies"] = strategies await scenario.initialize_async(**init_kwargs) @@ -392,6 +392,7 @@ async def test_unknown_technique_skipped_with_warning(self, mock_objective_targe await scenario.initialize_async( objective_target=mock_objective_target, scenario_strategies=[_strategy_class().ALL], + include_baseline=False, ) attacks = await scenario._get_atomic_attacks_async() 
# Only prompt_sending should have produced attacks diff --git a/tests/unit/scenario/test_scam.py b/tests/unit/scenario/test_scam.py index 80092bb98b..1fc5744fa4 100644 --- a/tests/unit/scenario/test_scam.py +++ b/tests/unit/scenario/test_scam.py @@ -219,6 +219,7 @@ async def test_attack_generation_for_singleturn_async( objective_target=mock_objective_target, scenario_strategies=[single_turn_strategy], dataset_config=mock_dataset_config, + include_baseline=False, ) atomic_attacks = await scenario._get_atomic_attacks_async() @@ -237,6 +238,7 @@ async def test_attack_generation_for_multiturn_async( objective_target=mock_objective_target, scenario_strategies=[multi_turn_strategy], dataset_config=mock_dataset_config, + include_baseline=False, ) atomic_attacks = await scenario._get_atomic_attacks_async() @@ -303,6 +305,7 @@ async def test_max_turns_default_used_when_unset_async( objective_target=mock_objective_target, scenario_strategies=[multi_turn_strategy], dataset_config=mock_dataset_config, + include_baseline=False, ) atomic_attacks = await scenario._get_atomic_attacks_async() @@ -321,6 +324,7 @@ async def test_max_turns_override_flows_into_attack_async( objective_target=mock_objective_target, scenario_strategies=[multi_turn_strategy], dataset_config=mock_dataset_config, + include_baseline=False, ) atomic_attacks = await scenario._get_atomic_attacks_async() @@ -400,3 +404,36 @@ async def test_no_target_duplication_async( assert objective_target != scorer_target assert objective_target != adversarial_target assert scorer_target != adversarial_target + + +@pytest.mark.usefixtures(*FIXTURES) +class TestScamBaselineUniformity: + """ADO 9012 regression: baseline shares objectives with strategies under max_dataset_size.""" + + async def test_one_resolution_call_baseline_matches_strategies( + self, mock_objective_target, mock_objective_scorer, single_turn_strategy + ): + from pyrit.models import SeedGroup, SeedObjective + + seed_groups = 
[SeedGroup(seeds=[SeedObjective(value=f"obj{i}")]) for i in range(10)] + config = DatasetConfiguration(seed_groups=seed_groups, max_dataset_size=3) + + first_sample = seed_groups[:3] + second_sample = seed_groups[5:8] + with patch( + "pyrit.scenario.core.dataset_configuration.random.sample", + side_effect=[first_sample, second_sample], + ) as mock_sample: + scenario = Scam(objective_scorer=mock_objective_scorer) + await scenario.initialize_async( + objective_target=mock_objective_target, + scenario_strategies=[single_turn_strategy], + dataset_config=config, + include_baseline=True, + ) + + assert mock_sample.call_count == 1 + assert scenario._atomic_attacks[0].atomic_attack_name == "baseline" + baseline_objs = set(scenario._atomic_attacks[0].objectives) + for attack in scenario._atomic_attacks[1:]: + assert set(attack.objectives) == baseline_objs diff --git a/tests/unit/scenario/test_scenario.py b/tests/unit/scenario/test_scenario.py index 261bf51d7f..e7042183d6 100644 --- a/tests/unit/scenario/test_scenario.py +++ b/tests/unit/scenario/test_scenario.py @@ -3,6 +3,7 @@ """Tests for the scenarios.Scenario class.""" +from typing import ClassVar from unittest.mock import AsyncMock, MagicMock, PropertyMock, patch import pytest @@ -12,7 +13,7 @@ from pyrit.memory import CentralMemory from pyrit.models import AttackOutcome, AttackResult from pyrit.scenario import DatasetConfiguration, ScenarioIdentifier, ScenarioResult -from pyrit.scenario.core import AtomicAttack, Scenario, ScenarioStrategy +from pyrit.scenario.core import AtomicAttack, BaselinePolicy, Scenario, ScenarioStrategy from pyrit.score import Scorer # Reusable test scorer identifier @@ -97,10 +98,11 @@ def sample_attack_results(): class ConcreteScenario(Scenario): """Concrete implementation of Scenario for testing.""" - def __init__(self, atomic_attacks_to_return=None, **kwargs): - # Default include_default_baseline=False for tests unless explicitly specified - kwargs.setdefault("include_default_baseline", 
False) + # Tests using this fixture should default to no baseline; set the class policy to Forbidden + # so we don't have to thread include_baseline=False through every initialize_async call. + BASELINE_POLICY: ClassVar[BaselinePolicy] = BaselinePolicy.Forbidden + def __init__(self, atomic_attacks_to_return=None, **kwargs): # Add required strategy_class if not provided class TestStrategy(ScenarioStrategy): @@ -700,7 +702,23 @@ def default_dataset_config(cls) -> DatasetConfiguration: return DatasetConfiguration() async def _get_atomic_attacks_async(self): - return self._atomic_attacks_to_return + atomic_attacks = list(self._atomic_attacks_to_return) + if self._include_baseline: + groups_by_dataset = self._dataset_config.get_seed_attack_groups() + all_seed_groups = [g for groups in groups_by_dataset.values() for g in groups] + atomic_attacks.insert(0, self._build_baseline_atomic_attack(seed_groups=all_seed_groups)) + return atomic_attacks + + +class _LegacyOverrideScenario(ConcreteScenarioWithTrueFalseScorer): + """Override that does NOT emit baseline — exercises the deprecation rescue path. + + Real user scenarios written before the structural fix may follow this pattern; + the rescue path warns and injects baseline so they keep working until 0.16.0. 
+ """ + + async def _get_atomic_attacks_async(self): + return list(self._atomic_attacks_to_return) @pytest.mark.usefixtures("patch_central_database") @@ -711,19 +729,20 @@ async def test_initialize_async_with_empty_strategies_and_baseline(self, mock_ob """Test that baseline is included when include_baseline=True, regardless of strategies.""" from pyrit.models import SeedAttackGroup, SeedObjective - # Create a scenario with include_default_baseline=True and TrueFalseScorer + # Create a scenario with TrueFalseScorer; baseline is included by default scenario = ConcreteScenarioWithTrueFalseScorer( name="Baseline Only Test", version=1, - include_default_baseline=True, ) # Create a mock dataset config with seed groups mock_dataset_config = MagicMock(spec=DatasetConfiguration) - mock_dataset_config.get_all_seed_attack_groups.return_value = [ - SeedAttackGroup(seeds=[SeedObjective(value="test objective 1")]), - SeedAttackGroup(seeds=[SeedObjective(value="test objective 2")]), - ] + mock_dataset_config.get_seed_attack_groups.return_value = { + "default": [ + SeedAttackGroup(seeds=[SeedObjective(value="test objective 1")]), + SeedAttackGroup(seeds=[SeedObjective(value="test objective 2")]), + ] + } # Initialize with None (default strategy) — [] also works, both expand defaults await scenario.initialize_async( @@ -740,18 +759,17 @@ async def test_baseline_only_execution_runs_successfully(self, mock_objective_ta """Test that baseline-only scenario can run successfully.""" from pyrit.models import SeedAttackGroup, SeedObjective - # Create a scenario with include_default_baseline=True and TrueFalseScorer + # Create a scenario with TrueFalseScorer; baseline is included by default scenario = ConcreteScenarioWithTrueFalseScorer( name="Baseline Only Test", version=1, - include_default_baseline=True, ) # Create a mock dataset config with seed groups mock_dataset_config = MagicMock(spec=DatasetConfiguration) - mock_dataset_config.get_all_seed_attack_groups.return_value = [ - 
SeedAttackGroup(seeds=[SeedObjective(value="test objective 1")]), - ] + mock_dataset_config.get_seed_attack_groups.return_value = { + "default": [SeedAttackGroup(seeds=[SeedObjective(value="test objective 1")])] + } # Initialize with None — [] also expands defaults now, both are equivalent await scenario.initialize_async( @@ -776,7 +794,6 @@ async def test_empty_strategies_without_baseline_allows_initialization(self, moc scenario = ConcreteScenario( name="No Baseline Test", version=1, - include_default_baseline=False, # No baseline ) mock_dataset_config = MagicMock(spec=DatasetConfiguration) @@ -799,7 +816,6 @@ async def test_standalone_baseline_uses_dataset_config_seeds(self, mock_objectiv scenario = ConcreteScenarioWithTrueFalseScorer( name="Baseline Seeds Test", version=1, - include_default_baseline=True, ) # Create specific seed groups to verify they're used @@ -810,7 +826,7 @@ async def test_standalone_baseline_uses_dataset_config_seeds(self, mock_objectiv ] mock_dataset_config = MagicMock(spec=DatasetConfiguration) - mock_dataset_config.get_all_seed_attack_groups.return_value = expected_seeds + mock_dataset_config.get_seed_attack_groups.return_value = {"default": expected_seeds} await scenario.initialize_async( objective_target=mock_objective_target, @@ -892,6 +908,159 @@ async def test_execute_scenario_raises_when_scenario_result_id_is_none(): await scenario._execute_scenario_async() +@pytest.mark.usefixtures("patch_central_database") +class TestScenarioBaselineUniformObjectives: + """ADO 9012 regression: baseline and strategy share objectives under max_dataset_size. + + The structural fix collapses to a single seed-group resolution call per scenario + run. Both the strategy atomic attacks and the baseline use the same sampled + population, so ``random.sample`` runs once and the two groups match. 
+ """ + + async def test_baseline_objectives_match_atomic_attacks_under_max_dataset_size( + self, + mock_objective_target, + ): + from pyrit.models import SeedGroup, SeedObjective + from pyrit.scenario.core.attack_technique import AttackTechnique + + seed_groups = [SeedGroup(seeds=[SeedObjective(value=f"obj{i}")]) for i in range(10)] + config = DatasetConfiguration(seed_groups=seed_groups, max_dataset_size=3) + + class StrategyScenario(ConcreteScenarioWithTrueFalseScorer): + async def _get_atomic_attacks_async(self): + groups_by_dataset = self._dataset_config.get_seed_attack_groups() + all_seed_groups = [g for groups in groups_by_dataset.values() for g in groups] + atomic_attacks = [ + AtomicAttack( + atomic_attack_name="strategy", + attack_technique=AttackTechnique(attack=MagicMock()), + seed_groups=all_seed_groups, + ) + ] + if self._include_baseline: + atomic_attacks.insert(0, self._build_baseline_atomic_attack(seed_groups=all_seed_groups)) + return atomic_attacks + + # Two distinct samples wired up. A buggy implementation with a second + # resolution call would consume both; the structural fix consumes one. 
+ first_sample = seed_groups[:3] + second_sample = seed_groups[5:8] + with patch( + "pyrit.scenario.core.dataset_configuration.random.sample", + side_effect=[first_sample, second_sample], + ) as mock_sample: + scenario = StrategyScenario(name="ADO 9012 regression", version=1) + await scenario.initialize_async( + objective_target=mock_objective_target, + scenario_strategies=None, + dataset_config=config, + ) + + assert mock_sample.call_count == 1 + + baseline, strategy = scenario._atomic_attacks + assert baseline.atomic_attack_name == "baseline" + assert strategy.atomic_attack_name == "strategy" + assert set(baseline.objectives) == set(strategy.objectives) + assert len(baseline.objectives) == 3 + + +@pytest.mark.usefixtures("patch_central_database") +class TestBuildBaselineAtomicAttack: + """Unit tests for Scenario._build_baseline_atomic_attack.""" + + def _seed_groups(self): + from pyrit.models import SeedAttackGroup, SeedObjective + + return [SeedAttackGroup(seeds=[SeedObjective(value="x")])] + + def test_returns_baseline_atomic_attack(self, mock_objective_target): + from pyrit.executor.attack.single_turn.prompt_sending import PromptSendingAttack + + seed_groups = self._seed_groups() + scenario = ConcreteScenarioWithTrueFalseScorer(name="T", version=1) + scenario._objective_target = mock_objective_target + + atomic = scenario._build_baseline_atomic_attack(seed_groups=seed_groups) + + assert atomic.atomic_attack_name == "baseline" + assert atomic.seed_groups == seed_groups + assert isinstance(atomic.attack_technique.attack, PromptSendingAttack) + + def test_raises_when_target_is_none(self): + scenario = ConcreteScenarioWithTrueFalseScorer(name="T", version=1) + # _objective_target is None pre-initialize_async + + with pytest.raises(ValueError, match="Objective target is required"): + scenario._build_baseline_atomic_attack(seed_groups=self._seed_groups()) + + def test_raises_when_scorer_is_none(self, mock_objective_target): + scenario = 
ConcreteScenarioWithTrueFalseScorer(name="T", version=1) + scenario._objective_target = mock_objective_target + scenario._objective_scorer = None # type: ignore[assignment] + + with pytest.raises(ValueError, match="Objective scorer is required"): + scenario._build_baseline_atomic_attack(seed_groups=self._seed_groups()) + + +@pytest.mark.usefixtures("patch_central_database") +class TestBaselineEmissionDeprecationRescue: + """Deprecation rescue (removed in 0.16.0): overrides that don't emit baseline get a + DeprecationWarning + auto-injected baseline so they keep working during the migration.""" + + @staticmethod + def _dataset_config(): + from pyrit.models import SeedGroup, SeedObjective + + return DatasetConfiguration( + seed_groups=[SeedGroup(seeds=[SeedObjective(value="x")])], + ) + + async def test_rescue_emits_warning_and_injects_baseline(self, mock_objective_target): + import warnings + + scenario = _LegacyOverrideScenario(name="LegacyOverride", version=1) + + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter("always") + await scenario.initialize_async( + objective_target=mock_objective_target, + dataset_config=self._dataset_config(), + include_baseline=True, + ) + + deprecations = [ + w + for w in caught + if issubclass(w.category, DeprecationWarning) and "_get_atomic_attacks_async" in str(w.message) + ] + assert len(deprecations) == 1, "rescue should emit exactly one DeprecationWarning naming the method" + assert "0.16.0" in str(deprecations[0].message) + assert scenario._atomic_attacks[0].atomic_attack_name == "baseline" + + async def test_well_behaved_override_does_not_trigger_rescue(self, mock_objective_target): + import warnings + + scenario = ConcreteScenarioWithTrueFalseScorer(name="GoodCitizen", version=1) + + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter("always") + await scenario.initialize_async( + objective_target=mock_objective_target, + dataset_config=self._dataset_config(), + 
include_baseline=True, + ) + + rescue_warnings = [ + w + for w in caught + if issubclass(w.category, DeprecationWarning) and "_get_atomic_attacks_async" in str(w.message) + ] + assert not rescue_warnings, "well-behaved override should not trigger the rescue path" + assert scenario._atomic_attacks[0].atomic_attack_name == "baseline" + + @pytest.mark.usefixtures("patch_central_database") class TestValidateStoredScenario: """Tests for Scenario._validate_stored_scenario.""" diff --git a/tests/unit/scenario/test_scenario_parameters.py b/tests/unit/scenario/test_scenario_parameters.py index ae6eaf0010..5289b61ee8 100644 --- a/tests/unit/scenario/test_scenario_parameters.py +++ b/tests/unit/scenario/test_scenario_parameters.py @@ -3,6 +3,7 @@ """Tests for Scenario custom parameter declaration, coercion, and validation (Stage 1b).""" +from typing import ClassVar from unittest.mock import MagicMock import pytest @@ -10,7 +11,7 @@ from pyrit.common import Parameter from pyrit.identifiers import ComponentIdentifier from pyrit.scenario import DatasetConfiguration -from pyrit.scenario.core import Scenario, ScenarioStrategy +from pyrit.scenario.core import BaselinePolicy, Scenario, ScenarioStrategy from pyrit.score import Scorer _TEST_SCORER_ID = ComponentIdentifier(class_name="MockScorer", class_module="tests.unit.scenarios") @@ -33,6 +34,9 @@ def get_aggregate_tags(cls) -> set[str]: return {"all"} class _ParamTestScenario(Scenario): + # No baseline in tests so atomic_attacks observations stay deterministic. 
+ BASELINE_POLICY: ClassVar[BaselinePolicy] = BaselinePolicy.Forbidden + @classmethod def get_strategy_class(cls): return _ParamTestStrategy @@ -60,7 +64,6 @@ async def _get_atomic_attacks_async(self): version=1, strategy_class=_ParamTestStrategy, objective_scorer=mock_scorer, - include_default_baseline=False, ) diff --git a/tests/unit/scenario/test_scenario_partial_results.py b/tests/unit/scenario/test_scenario_partial_results.py index 2cc7df714a..a18625dc8b 100644 --- a/tests/unit/scenario/test_scenario_partial_results.py +++ b/tests/unit/scenario/test_scenario_partial_results.py @@ -3,6 +3,7 @@ """Additional tests for Scenario retry with AttackExecutorResult functionality.""" +from typing import ClassVar from unittest.mock import MagicMock, PropertyMock import pytest @@ -12,7 +13,7 @@ from pyrit.memory import CentralMemory from pyrit.models import AttackOutcome, AttackResult from pyrit.scenario import DatasetConfiguration, ScenarioResult -from pyrit.scenario.core import AtomicAttack, Scenario, ScenarioStrategy +from pyrit.scenario.core import AtomicAttack, BaselinePolicy, Scenario, ScenarioStrategy def _mock_scorer_id(name: str = "MockScorer") -> ComponentIdentifier: @@ -73,10 +74,9 @@ def filter_objectives(*, remaining_objectives): class ConcreteScenario(Scenario): """Concrete implementation of Scenario for testing.""" - def __init__(self, *, atomic_attacks_to_return=None, objective_scorer=None, **kwargs): - # Default include_default_baseline=False for tests unless explicitly specified - kwargs.setdefault("include_default_baseline", False) + BASELINE_POLICY: ClassVar[BaselinePolicy] = BaselinePolicy.Forbidden + def __init__(self, *, atomic_attacks_to_return=None, objective_scorer=None, **kwargs): # Get strategy_class from kwargs or use default strategy_class = kwargs.pop("strategy_class", None) or self.get_strategy_class() diff --git a/tests/unit/scenario/test_scenario_retry.py b/tests/unit/scenario/test_scenario_retry.py index 2ff0555192..836503ff5f 100644 --- 
a/tests/unit/scenario/test_scenario_retry.py +++ b/tests/unit/scenario/test_scenario_retry.py @@ -3,6 +3,7 @@ """Tests for Scenario retry functionality.""" +from typing import ClassVar from unittest.mock import AsyncMock, MagicMock, PropertyMock import pytest @@ -12,7 +13,7 @@ from pyrit.memory import CentralMemory from pyrit.models import AttackOutcome, AttackResult from pyrit.scenario import DatasetConfiguration, ScenarioResult -from pyrit.scenario.core import AtomicAttack, Scenario, ScenarioStrategy +from pyrit.scenario.core import AtomicAttack, BaselinePolicy, Scenario, ScenarioStrategy # Test constants TEST_ATTACK_TYPE = "TestAttack" @@ -136,10 +137,9 @@ def create_mock_atomic_attack(name: str, objectives: list[str], run_async_mock: class ConcreteScenario(Scenario): """Concrete implementation of Scenario for testing.""" - def __init__(self, atomic_attacks_to_return=None, objective_scorer=None, **kwargs): - # Default include_default_baseline=False for tests unless explicitly specified - kwargs.setdefault("include_default_baseline", False) + BASELINE_POLICY: ClassVar[BaselinePolicy] = BaselinePolicy.Forbidden + def __init__(self, atomic_attacks_to_return=None, objective_scorer=None, **kwargs): # Get strategy_class from kwargs or use default strategy_class = kwargs.pop("strategy_class", None) or self.get_strategy_class()