Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions .pyrit_conf_example
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,20 @@ operation: op_trash_panda
# Applies only to the pyrit_backend server.
max_concurrent_scenario_runs: 3

# Custom Initializer Registration (REST API)
# -------------------------------------------
# When true, the REST API accepts POST /api/initializers to register custom
# initializer scripts and DELETE /api/initializers/{name} to remove custom
# initializers.
#
# ⚠️ WARNING: Enabling this allows arbitrary Python code execution on the
# server via the REST API. Only enable on trusted networks.
# The pyrit_backend default host is localhost, which limits exposure.
# If you bind to 0.0.0.0, ensure you are on a trusted network.
#
Comment thread
rlundeen2 marked this conversation as resolved.
# Default: false
allow_custom_initializers: false

# Silent Mode
# -----------
# If true, suppresses print statements during initialization.
Expand Down
2 changes: 2 additions & 0 deletions pyrit/backend/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
InitializerParameterSummary,
ListRegisteredInitializersResponse,
RegisteredInitializer,
RegisterInitializerRequest,
)
from pyrit.backend.models.scenarios import (
ListRegisteredScenariosResponse,
Expand Down Expand Up @@ -110,6 +111,7 @@
"InitializerParameterSummary",
"ListRegisteredInitializersResponse",
"RegisteredInitializer",
"RegisterInitializerRequest",
# Targets
"CreateTargetRequest",
"TargetCapabilitiesInfo",
Expand Down
16 changes: 13 additions & 3 deletions pyrit/backend/models/initializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,18 @@
before scenario execution. These models represent initializer metadata.
"""

from typing import Optional

from pydantic import BaseModel, Field

from pyrit.backend.models.common import PaginationInfo
from pyrit.identifiers.class_name_utils import REGISTRY_NAME_PATTERN


class InitializerParameterSummary(BaseModel):
"""Summary of an initializer-declared parameter."""

name: str = Field(..., description="Parameter name")
description: str = Field(..., description="Human-readable description of the parameter")
default: Optional[list[str]] = Field(None, description="Default value(s), or None if required")
default: list[str] | None = Field(None, description="Default value(s), or None if required")


class RegisteredInitializer(BaseModel):
Expand All @@ -42,3 +41,14 @@ class ListRegisteredInitializersResponse(BaseModel):

items: list[RegisteredInitializer] = Field(..., description="List of initializer summaries")
pagination: PaginationInfo = Field(..., description="Pagination metadata")


class RegisterInitializerRequest(BaseModel):
"""Request body for registering a custom initializer by uploading script content."""

name: str = Field(
...,
pattern=REGISTRY_NAME_PATTERN,
description="Registry name for the initializer (e.g., 'my_custom')",
)
script_content: str = Field(..., description="Python source code containing a PyRITInitializer subclass")
114 changes: 107 additions & 7 deletions pyrit/backend/routes/initializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,56 @@
"""
Initializer API routes.

Provides endpoints for listing available initializers and their metadata.
Provides endpoints for listing, registering, and removing initializers.

Route structure:
/api/initializers — list all initializers
/api/initializers/{name} — get single initializer detail
GET /api/initializers — list all initializers
GET /api/initializers/{name} — get single initializer detail
POST /api/initializers — register initializer from script
DELETE /api/initializers/{name} — unregister an initializer
"""

from typing import Optional

from fastapi import APIRouter, HTTPException, Query, status
from fastapi import APIRouter, HTTPException, Query, Request, status

from pyrit.backend.models.common import ProblemDetail
from pyrit.backend.models.initializers import (
ListRegisteredInitializersResponse,
RegisteredInitializer,
RegisterInitializerRequest,
)
from pyrit.backend.services.initializer_service import get_initializer_service

router = APIRouter(prefix="/initializers", tags=["initializers"])


def _check_custom_initializers_allowed(request: Request) -> None:
"""
Check that allow_custom_initializers is enabled on the server.

Args:
request: The incoming FastAPI request.

Raises:
HTTPException: 403 if custom initializer operations are not enabled.
"""
allowed = getattr(request.app.state, "allow_custom_initializers", False)
if not allowed:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=(
"Custom initializer operations are disabled. "
"Set allow_custom_initializers: true in .pyrit_conf to enable."
),
)


@router.get(
"",
response_model=ListRegisteredInitializersResponse,
)
async def list_initializers(
limit: int = Query(50, ge=1, le=200, description="Maximum items per page"),
cursor: Optional[str] = Query(None, description="Pagination cursor (initializer_name to start after)"),
cursor: str | None = Query(None, description="Pagination cursor (initializer_name to start after)"),
) -> ListRegisteredInitializersResponse:
"""
List all available initializers.
Expand Down Expand Up @@ -73,3 +95,81 @@ async def get_initializer(initializer_name: str) -> RegisteredInitializer:
)

return initializer


@router.post(
"",
response_model=RegisteredInitializer,
status_code=status.HTTP_201_CREATED,
responses={
403: {"model": ProblemDetail, "description": "Custom initializer operations disabled"},
409: {"model": ProblemDetail, "description": "Initializer name already registered"},
},
)
async def register_initializer(
request: Request,
body: RegisterInitializerRequest,
) -> RegisteredInitializer:
"""
Register an initializer by uploading Python source code.
Comment thread
rlundeen2 marked this conversation as resolved.

The script must contain a concrete PyRITInitializer subclass.
Requires allow_custom_initializers to be enabled in pyrit_conf.

Args:
request: The incoming FastAPI request.
body: Request body with name and script_content.

Returns:
The newly registered initializer summary.
"""
_check_custom_initializers_allowed(request)
service = get_initializer_service()

try:
return await service.register_initializer_async(name=body.name, script_content=body.script_content)
except ValueError as e:
detail = str(e)
if "already registered" in detail:
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=detail) from None
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=detail) from None


@router.delete(
"/{initializer_name}",
status_code=status.HTTP_204_NO_CONTENT,
responses={
400: {"model": ProblemDetail, "description": "Cannot remove built-in initializer"},
403: {"model": ProblemDetail, "description": "Custom initializer operations disabled"},
404: {"model": ProblemDetail, "description": "Initializer not found"},
},
)
async def unregister_initializer(
request: Request,
initializer_name: str,
) -> None:
"""
Remove a custom initializer from the registry.

Built-in initializers cannot be removed. Requires
allow_custom_initializers to be enabled in pyrit_conf.

Args:
request: The incoming FastAPI request.
initializer_name: Registry name of the initializer to remove.
"""
_check_custom_initializers_allowed(request)
service = get_initializer_service()

try:
await service.unregister_initializer_async(initializer_name=initializer_name)
except ValueError as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e),
) from None
except KeyError:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Initializer '{initializer_name}' not found",
) from None
47 changes: 44 additions & 3 deletions pyrit/backend/services/initializer_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@
# Licensed under the MIT license.

"""
Initializer service for listing available initializers.
Initializer service for listing, registering, and removing initializers.

Provides read-only access to the InitializerRegistry, exposing initializer
Provides access to the InitializerRegistry, exposing initializer
metadata through the REST API.
"""

import logging
from functools import lru_cache

from pyrit.backend.models.common import PaginationInfo
Expand All @@ -18,6 +19,8 @@
)
from pyrit.registry import InitializerMetadata, InitializerRegistry

logger = logging.getLogger(__name__)


def _metadata_to_registered_initializer(metadata: InitializerMetadata) -> RegisteredInitializer:
"""
Expand Down Expand Up @@ -47,7 +50,7 @@ def _metadata_to_registered_initializer(metadata: InitializerMetadata) -> Regist

class InitializerService:
"""
Service for listing available initializers.
Service for listing, registering, and removing initializers.

Uses InitializerRegistry as the source of truth for initializer metadata.
"""
Expand Down Expand Up @@ -99,6 +102,44 @@ async def get_initializer_async(self, *, initializer_name: str) -> RegisteredIni
return _metadata_to_registered_initializer(metadata)
return None

async def register_initializer_async(
self,
*,
name: str,
script_content: str,
) -> RegisteredInitializer:
"""
Register an initializer from uploaded Python source code.

Args:
name: Registry name for the new initializer.
script_content: Python source code containing a PyRITInitializer subclass.

Returns:
The newly registered initializer summary.

Raises:
ValueError: If the script is invalid or contains no initializer class.
"""
self._registry.register_from_content(name=name, script_content=script_content)

initializer = await self.get_initializer_async(initializer_name=name)
if not initializer:
raise ValueError(f"Initializer '{name}' was registered but metadata could not be retrieved.")
return initializer

async def unregister_initializer_async(self, *, initializer_name: str) -> None:
"""
Remove a custom initializer from the registry.

Built-in initializers cannot be removed.

Args:
initializer_name: The registry name to remove.
"""
self._registry.unregister_and_cleanup(initializer_name)
logger.info(f"Unregistered initializer: {initializer_name}")

@staticmethod
def _paginate(
*,
Expand Down
2 changes: 2 additions & 0 deletions pyrit/cli/frontend_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ def __init__(
self._operator = config.operator
self._operation = config.operation
self._max_concurrent_scenario_runs = config.max_concurrent_scenario_runs
self._allow_custom_initializers = config.allow_custom_initializers

# Lazy-loaded registries
self._scenario_registry: Optional[ScenarioRegistry] = None
Expand Down Expand Up @@ -223,6 +224,7 @@ def with_overrides(
derived._operator = self._operator
derived._operation = self._operation
derived._max_concurrent_scenario_runs = self._max_concurrent_scenario_runs
derived._allow_custom_initializers = self._allow_custom_initializers
derived._scenario_config = self._scenario_config

# Apply overrides or inherit
Expand Down
9 changes: 9 additions & 0 deletions pyrit/cli/pyrit_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,17 @@ async def initialize_and_run_async(*, parsed_args: Namespace) -> int:
default_labels["operation"] = context._operation
app.state.default_labels = default_labels
app.state.max_concurrent_scenario_runs = context._max_concurrent_scenario_runs
app.state.allow_custom_initializers = context._allow_custom_initializers

display_host = parsed_args.host
if context._allow_custom_initializers:
print("⚠️ WARNING: Custom initializer registration is ENABLED (allow_custom_initializers: true).")
print(" This allows arbitrary Python code execution via the REST API.")
if parsed_args.host == "0.0.0.0":
print(" 🚨 Server is bound to 0.0.0.0 — accessible from the NETWORK. Use only on trusted networks!")
else:
print(f" Server is bound to {display_host}.")

print(f"🚀 Starting PyRIT backend on http://{display_host}:{parsed_args.port}")
print(f" API Docs: http://{display_host}:{parsed_args.port}/docs")
if parsed_args.host == "0.0.0.0":
Expand Down
4 changes: 4 additions & 0 deletions pyrit/identifiers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
build_seed_identifier,
)
from pyrit.identifiers.class_name_utils import (
REGISTRY_NAME_PATTERN,
class_name_to_snake_case,
snake_case_to_class_name,
validate_registry_name,
)
from pyrit.identifiers.component_identifier import ComponentIdentifier, Identifiable, config_hash
from pyrit.identifiers.evaluation_identifier import (
Expand All @@ -31,8 +33,10 @@
"compute_eval_hash",
"EvaluationIdentifier",
"Identifiable",
"REGISTRY_NAME_PATTERN",
"ScorerEvaluationIdentifier",
"snake_case_to_class_name",
"validate_registry_name",
"config_hash",
"IdentifierFilter",
"IdentifierType",
Expand Down
25 changes: 25 additions & 0 deletions pyrit/identifiers/class_name_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,31 @@

import re

# Valid registry names: lowercase letter followed by up to 63 lowercase
# letters, digits, or underscores. This matches the output of
# class_name_to_snake_case and is safe for use as filesystem components.
REGISTRY_NAME_PATTERN = r"^[a-z][a-z0-9_]{0,63}$"

_REGISTRY_NAME_RE = re.compile(REGISTRY_NAME_PATTERN)


def validate_registry_name(name: str) -> None:
"""
Validate that *name* is a legal registry name.

Args:
name: The name to validate.

Raises:
ValueError: If *name* does not match the required pattern.
"""
if not _REGISTRY_NAME_RE.match(name):
raise ValueError(
f"Invalid registry name '{name}'. "
f"Names must match {REGISTRY_NAME_PATTERN} "
"(lowercase ASCII, digits, underscores; 1-64 chars; must start with a letter)."
)


def class_name_to_snake_case(class_name: str, *, suffix: str = "") -> str:
"""
Expand Down
Loading
Loading