2,732 changes: 2,732 additions & 0 deletions config_files/model_tests/base_210M.yaml

Large diffs are not rendered by default.

419 changes: 419 additions & 0 deletions config_files/model_tests/base_toadapt.yaml

Large diffs are not rendered by default.

2,728 changes: 2,728 additions & 0 deletions config_files/model_tests/leonardo/base_210M.yaml

Large diffs are not rendered by default.

158 changes: 158 additions & 0 deletions docs/components/downstream_evaluation.md
@@ -0,0 +1,158 @@
# Downstream Evaluation Pipeline

## Overview

The downstream evaluation pipeline in Modalities is a decoupled, three-stage callback system that executes at configurable step intervals during the training loop.

The order of execution inside `Trainer.train` is:
1. `checkpointing_callback`: Saves the PyTorch/FSDP checkpoint to disk.
2. `conversion_callback`: (Optional) Converts the PyTorch checkpoint to a Hugging Face (HF) checkpoint.
3. `downstream_evaluation_callback`: (Optional) Runs external evaluation tools (like OLMES) on the newly created HF checkpoint.

By keeping conversion and evaluation decoupled, you can configure just the converter, just the evaluator (if HF checkpoints are generated elsewhere), or both.
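The ordering and the optional stages can be sketched as follows. This is an illustrative stand-in, not the actual `Trainer.train` code: the helper `run_step_callbacks` and the `StepCallback` type are hypothetical names, and the assumption that each callback receives only the step count is ours.

```python
from typing import Callable, Optional

# Hypothetical callback type: each callback receives the number of completed training steps.
StepCallback = Callable[[int], None]


def run_step_callbacks(
    num_train_steps_done: int,
    checkpointing_callback: StepCallback,
    conversion_callback: Optional[StepCallback] = None,
    downstream_evaluation_callback: Optional[StepCallback] = None,
) -> None:
    """Invoke the three stages in the documented order, skipping unconfigured ones."""
    checkpointing_callback(num_train_steps_done)
    if conversion_callback is not None:
        conversion_callback(num_train_steps_done)
    if downstream_evaluation_callback is not None:
        downstream_evaluation_callback(num_train_steps_done)


# Evaluator-only setup: HF checkpoints are assumed to be produced elsewhere.
calls = []
run_step_callbacks(
    1000,
    checkpointing_callback=lambda step: calls.append(("checkpoint", step)),
    downstream_evaluation_callback=lambda step: calls.append(("evaluate", step)),
)
print(calls)
```

Leaving a callback as `None` models the "just the converter" or "just the evaluator" setups described above.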

---

## 1. Conversion Callback (`ModelConverter`)

**Location:** `src/modalities/conversion/model_converter.py` (Lines 10-67)

The `ModelConverter` is a thin wrapper that executes a shell command template via a subprocess.

### Behavior
- Triggered if `num_train_steps_done % eval_interval == 0`.
- Executes only on `global_rank == 0`. You can prefix the command with `CUDA_VISIBLE_DEVICES=X` to pin the launched conversion script to a specific GPU.
- Reads `last_checkpoint_info.json` from the checkpoint directory to determine the latest checkpoint path.
- Checks if the `{checkpoint_path}/hf_checkpoint` directory already exists. If it does, conversion is skipped.
- If it does not exist, it formats the `command_template` and runs it using `subprocess.run(cmd, shell=True, check=True)`.

### Placeholders
The `command_template` string can use the following placeholders:
- `{checkpoint_path}`: The path to the latest checkpoint directory (resolved at runtime).
- `{output_dir}`: Expands to `{checkpoint_path}/hf_checkpoint`.
- `{modalities_config}`: Path to the YAML config file found inside or next to the checkpoint directory.

### YAML Configuration
```yaml
model_converter:
  component_key: model_converter
  variant_key: default
  config:
    command_template: "python src/modalities/conversion/gpt2/convert_gpt2.py {modalities_config} {output_dir} --checkpoint_path {checkpoint_path}"
    checkpoint_dir: ${settings.paths.experiments_root_path}/${settings.experiment_id}
    global_rank: ${settings.cuda_env.global_rank}
    eval_interval: 1000
```

---

## 2. Downstream Evaluation Callback (`DownstreamEvaluator`)

**Location:** `src/modalities/evaluator.py` (Lines 210-335)

The `DownstreamEvaluator` checks for the existence of an HF checkpoint, launches an evaluation script via a subprocess, tracks active processes, and syncs OLMES metrics to the active W&B run.

### Behavior
- Triggered if `num_train_steps_done % eval_interval == 0`.
- Only executes on `global_rank == 0`.
- Reads `last_checkpoint_info.json` to find the latest checkpoint.
- Checks if `{checkpoint_path}/hf_checkpoint` exists. If it does NOT exist, evaluation is skipped with a warning (assuming conversion failed or was disabled).
- If the HF checkpoint exists, it formats the `olmes_command_template` and launches it asynchronously using `subprocess.Popen(cmd, shell=True)`.
- **Process Tracking**: Stores `(Popen, step, hf_model_dir)` tuples in `self.active_processes` (Lines 233, 258).
- **Graceful Exit**: `wait_for_evaluations()` (Lines 264-275) iterates over `active_processes`, calls `.wait()`, and syncs metrics after each evaluation completes.
- **W&B Metric Sync**: `_sync_metrics_to_wandb()` (Lines 277-315) parses `metrics-all.jsonl` from the OLMES output directory, extracts `primary_score` for each task alias, and logs them to the active `wandb.run` as `eval/{alias}` at the correct training step. Gracefully skips if W&B is disabled or not installed.

### Placeholders
The `olmes_command_template` string can use the following placeholders:
- `{hf_model_dir}`: The path to the `{checkpoint_path}/hf_checkpoint` directory.
- `{tasks}`: A space-separated string of the tasks provided in the config (Line 248).
- `{step}`: The current `num_train_steps_done`.
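To illustrate how the placeholders and the asynchronous launch fit together, here is a minimal sketch. `EvaluationLauncher` is a hypothetical stand-in for the process-tracking part of `DownstreamEvaluator`, and the `echo` template only serves as a harmless example command.

```python
import subprocess


class EvaluationLauncher:
    """Illustrative stand-in for the launch and tracking logic described above."""

    def __init__(self, command_template: str, tasks: list[str]):
        self.command_template = command_template
        self.tasks = tasks
        # Each entry is a (Popen, step, hf_model_dir) tuple.
        self.active_processes: list[tuple[subprocess.Popen, int, str]] = []

    def launch(self, hf_model_dir: str, step: int) -> str:
        cmd = self.command_template.format(
            hf_model_dir=hf_model_dir,
            tasks=" ".join(self.tasks),  # {tasks} is a space-separated string
            step=step,
        )
        # Popen returns immediately, so the caller is not blocked.
        process = subprocess.Popen(cmd, shell=True)
        self.active_processes.append((process, step, hf_model_dir))
        return cmd

    def wait_for_evaluations(self) -> list[tuple[int, int]]:
        """Block until every tracked process exits; return (step, return code) pairs."""
        results = [(step, process.wait()) for process, step, _ in self.active_processes]
        self.active_processes.clear()
        return results


launcher = EvaluationLauncher(
    "echo {hf_model_dir} '{tasks}' {step}",
    ["arc_challenge::olmes", "hellaswag::olmes"],
)
launcher.launch("/tmp/hf_checkpoint", step=100)
print(launcher.wait_for_evaluations())
```

Quoting `'{tasks}'` in the template, as the YAML example in this document does, keeps the space-separated task list together as a single shell argument.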

### HPC / SLURM Integration
For HPC environments (like Leonardo Booster), running OLMES directly from the trainer process can cause GPU out-of-memory (OOM) errors. You can decouple evaluation by creating a wrapper script (`scripts/evaluation/run_olmes_sbatch.sh`) that submits an independent SLURM job using `sbatch --wait`. Because `DownstreamEvaluator` launches the wrapper asynchronously via `subprocess.Popen`, the wrapper waits in the background on the training node until the SLURM job finishes, without blocking the training loop.

> [!IMPORTANT]
> **Nested SLURM Job Environment Isolation (`--export=NONE`)**
> When the nested evaluation job is submitted with `sbatch` from within a running SLURM training job, it inherits the parent job's environment variables (such as CUDA variables, `RANK`, `WORLD_SIZE`, `MASTER_ADDR`, etc.) by default. These leaked variables can cause the evaluation job to fail or behave incorrectly.
>
> To prevent environment leakage, you **must** include `#SBATCH --export=NONE` in the nested `sbatch` script header. This ensures the evaluation job starts with a clean, isolated environment.
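As a rough illustration of such a nested job script, the sketch below renders one from Python. Everything beyond the `#SBATCH --export=NONE` header is an assumption: the function name, the job name, the virtualenv path, and the `olmes` command line are placeholders and are not taken from `run_olmes_sbatch.sh`.

```python
import shlex


def render_olmes_sbatch_script(hf_model_dir: str, tasks: list[str], output_dir: str) -> str:
    """Render a nested SLURM batch script that starts from a clean environment."""
    task_args = " ".join(shlex.quote(task) for task in tasks)
    lines = [
        "#!/bin/bash",
        "#SBATCH --job-name=olmes_eval",
        # Keeps RANK, WORLD_SIZE, MASTER_ADDR, etc. of the training job out of this job.
        "#SBATCH --export=NONE",
        "",
        "# With a clean environment, the evaluation environment must be activated explicitly.",
        "source /path/to/olmes/venv/bin/activate",
        "",
        "# Assumed OLMES invocation; check the flags against the installed version.",
        f"olmes --model {shlex.quote(hf_model_dir)} --task {task_args} "
        f"--output-dir {shlex.quote(output_dir)}",
    ]
    return "\n".join(lines) + "\n"


print(render_olmes_sbatch_script("/path/to/hf_checkpoint", ["arc_challenge::olmes"], "/path/to/eval_out"))
```

A wrapper would write this text to a file and submit it with `sbatch --wait`, so that the wrapper process only exits once the evaluation job has finished.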

### YAML Configuration
```yaml
downstream_evaluator:
  component_key: downstream_evaluator
  variant_key: default
  config:
    tokenizer:
      instance_key: tokenizer
      pass_type: BY_REFERENCE
    tasks:
      - "arc_challenge::olmes"
      - "hellaswag::olmes"
    eval_interval: 100
    checkpoint_dir: ${settings.paths.experiments_root_path}/${settings.experiment_id}
    global_rank: ${settings.cuda_env.global_rank}
    olmes_command_template: "bash scripts/evaluation/run_olmes_sbatch.sh {hf_model_dir} '{tasks}' {step} 1024 1"
```

---

## System Integration Summary

For context on how these components are wired into the system, the following files handle the integration:

1. **`src/modalities/trainer.py`**
   - `conversion_callback` was added to the `train()` signature.
   - Pre-loop and in-loop execution order is explicitly set to: `checkpointing_callback` -> `conversion_callback` -> `downstream_evaluation_callback`.

2. **`src/modalities/gym.py`**
   - Threads `conversion_callback` through `Gym.run()` and passes it down to `self.trainer.train()`.

3. **`src/modalities/main.py` (Lines 227-249)**
   - Resolves `components.model_converter.convert` and `components.downstream_evaluator.evaluate`.
   - Passes them into `gym.run()`.
   - **Post-Training Wait** (Lines 244-249): At the very end of `run()`, explicitly calls `components.downstream_evaluator.wait_for_evaluations()` with prominent `print_rank_0` logging to ensure training does not exit until evaluations complete.

4. **`src/modalities/config/config.py`**
   - Defines Pydantic models `ModelConverterConfig` and `DownstreamEvaluatorConfig`.

5. **`src/modalities/config/instantiation_models.py`**
   - Adds `model_converter` and `downstream_evaluator` fields to `TrainingComponentsInstantiationModel`.

6. **`src/modalities/registry/components.py`**
   - Registers both classes in the `"default"` component registry.

7. **`src/modalities/conversion/gpt2/convert_gpt2.py` (Lines 105-112)**
   - Updated to support Hugging Face tokenizers (`pretrained_hf_tokenizer`) alongside SentencePiece. Detects tokenizer configs and saves `vocab.json` / `tokenizer.json` directly to the `hf_checkpoint` directory.

8. **`tests/test_downstream_evaluator.py`**
   - Contains tests that mock the `subprocess` calls and verify interval gating, rank gating, and directory existence logic.

---

## 3. Precaching Datasets (Offline Environments)

If your compute cluster nodes do not have internet access, you must precache the Hugging Face datasets that OLMES requires. We provide a generalized script `scripts/evaluation/precache_tasks.py` that you can run on a login node (or any environment with internet access).

### Usage

Activate your evaluation environment (virtualenv, conda, or your Singularity container shell) and set the `HF_DATASETS_CACHE` and `HF_HOME` variables to a location accessible by your compute nodes.

```bash
# 1. Activate your python environment (e.g. venv where olmes is installed)
source /path/to/olmes/venv/bin/activate
export PYTHONPATH=/path/to/olmes/venv/lib/python3.12/site-packages:$PYTHONPATH

# 2. Point Hugging Face to a shared scratch space or cache directory
export HF_DATASETS_CACHE="/path/to/shared/hf_cache"
export HF_HOME="/path/to/shared/hf_cache"
export HF_TOKEN="your_hf_access_token" # If needed for gated models/datasets

# 3. Define the tasks you need
export OLMES_TASKS="arc_challenge:rc::olmes:full hellaswag:rc::olmes:full gsm8k::olmes"

# 4. Run the precache script
python scripts/evaluation/precache_tasks.py --tasks $OLMES_TASKS
```

This script will resolve the tasks via OLMES and download all required datasets to your cache directory. When you run your training job via `sbatch`, ensure the compute nodes also set `HF_DATASETS_CACHE` and `HF_HOME` to the exact same shared directory.
62 changes: 62 additions & 0 deletions scripts/evaluation/precache_tasks.py
@@ -0,0 +1,62 @@
import argparse
import copy
import os

from datasets import load_dataset
from oe_eval.configs.tasks import TASK_CONFIGS
from oe_eval.launch import resolve_task_suite
from oe_eval.run_eval import load_task


def main():
    parser = argparse.ArgumentParser(description="Precache OLMES tasks and required HF datasets.")
    parser.add_argument(
        "--tasks",
        nargs="+",
        required=True,
        help="List of OLMES tasks to precache (e.g. arc_challenge:rc::olmes:full hellaswag:rc::olmes:full)"
    )
    args = parser.parse_args()

    hf_home = os.environ.get("HF_DATASETS_CACHE", os.environ.get("HF_HOME", "~/.cache/huggingface"))
    print(f"HF_DATASETS_CACHE is set to: {hf_home}")

    # ---- Part 1: OLMES tasks ----
    print("\n--- Caching OLMES tasks ---")
    all_tasks = []
    for t in args.tasks:
        try:
            all_tasks += resolve_task_suite(t, {})
        except Exception as e:
            print(f"!! could not resolve {t}: {e}")

    print(f"\nWill download {len(all_tasks)} tasks to {hf_home}")
    for task_name in all_tasks:
        if task_name not in TASK_CONFIGS:
            print(f"?? not in TASK_CONFIGS: {task_name}")
            continue
        try:
            cfg = copy.deepcopy(TASK_CONFIGS[task_name])
            task = load_task(cfg, ".")
            print(f"-> downloading {task_name}")
            task.download()
            print(f" done")
        except Exception as e:
            print(f"!! failed {task_name}: {e}")

    # ---- Part 2: pseudo-sources used by paloma_diagnostics.py ----
    print("\n--- Caching diagnostics pseudo-source datasets (gsm8k, trivia_qa) ---")
    specs = [
        ('gsm8k', 'main', 'test'),
        ('trivia_qa', 'rc.nocontext', 'validation'),
    ]
    for path, config, split in specs:
        try:
            print(f"-> downloading {path} ({config}, {split})")
            load_dataset(path, config, split=split)
            print(f" done")
        except Exception as e:
            print(f"!! failed {path}: {e}")


if __name__ == "__main__":
    main()
37 changes: 31 additions & 6 deletions src/modalities/batch.py
@@ -54,6 +54,26 @@ def __len__(self) -> int:
         return self.samples[key].shape[self.batch_dim]


+def _apply_to(val, device):
+    if isinstance(val, torch.Tensor):
+        return val.to(device)
+    elif isinstance(val, dict):
+        return {k: _apply_to(v, device) for k, v in val.items()}
+    elif isinstance(val, list):
+        return [_apply_to(v, device) for v in val]
+    return val
+
+
+def _apply_detach(val):
+    if isinstance(val, torch.Tensor):
+        return val.detach()
+    elif isinstance(val, dict):
+        return {k: _apply_detach(v) for k, v in val.items()}
+    elif isinstance(val, list):
+        return [_apply_detach(v) for v in val]
+    return val
+
+
 @dataclass
 class InferenceResultBatch(Batch, TorchDeviceMixin):
     """Stores targets and predictions of an entire batch."""
@@ -71,12 +91,12 @@ def device(self) -> torch.device:
         return self.targets[key].device

     def to(self, device: torch.device):
-        self.predictions = {k: v.to(device) for k, v in self.predictions.items()}
-        self.targets = {k: v.to(device) for k, v in self.targets.items()}
+        self.predictions = {k: _apply_to(v, device) for k, v in self.predictions.items()}
+        self.targets = {k: _apply_to(v, device) for k, v in self.targets.items()}

     def detach(self):
-        self.targets = {k: v.detach() for k, v in self.targets.items()}
-        self.predictions = {k: v.detach() for k, v in self.predictions.items()}
+        self.targets = {k: _apply_detach(v) for k, v in self.targets.items()}
+        self.predictions = {k: _apply_detach(v) for k, v in self.predictions.items()}

     def get_predictions(self, key: str) -> torch.Tensor:
         if key not in self.predictions:
@@ -89,8 +109,13 @@ def get_targets(self, key: str) -> torch.Tensor:
         return self.targets[key]

     def __len__(self) -> int:
-        key = list(self.predictions.keys())[0]
-        return self.predictions[key].shape[self.batch_dim]
+        for v in self.predictions.values():
+            if isinstance(v, torch.Tensor):
+                return v.shape[self.batch_dim]
+        for v in self.targets.values():
+            if isinstance(v, torch.Tensor):
+                return v.shape[self.batch_dim]
+        raise ValueError("No tensor found in predictions or targets to determine batch length")


 @dataclass
16 changes: 16 additions & 0 deletions src/modalities/config/config.py
@@ -520,6 +520,22 @@ class ParallelDegreeConfig(BaseModel):
     parallelism_methods: list[ParallelismDegrees]


+class ModelConverterConfig(BaseModel):
+    command_template: str
+    checkpoint_dir: Path
+    global_rank: Annotated[int, Field(strict=True, ge=0)]
+    eval_interval: Annotated[int, Field(strict=True, gt=0)]
+
+
+class DownstreamEvaluatorConfig(BaseModel):
+    tokenizer: PydanticTokenizerIFType
+    tasks: list[str]
+    eval_interval: Annotated[int, Field(strict=True, gt=0)]
+    checkpoint_dir: Path
+    global_rank: Annotated[int, Field(strict=True, ge=0)]
+    olmes_command_template: str
+
+
 # Recursive type representing arbitrary-depth YAML config structures.
 YAMLPrimitive = str | int | float | bool | None
 YAMLValue: TypeAlias = YAMLPrimitive | Path | list["YAMLValue"] | dict[str, "YAMLValue"]
4 changes: 4 additions & 0 deletions src/modalities/config/instantiation_models.py
@@ -22,6 +22,8 @@
     PydanticSteppableProfilerIFType,
     PydanticTextInferenceComponentType,
     PydanticTokenizerIFType,
+    PydanticDownstreamEvaluatorType,
+    PydanticModelConverterType,
 )
 from modalities.config.utils import parse_torch_device
 from modalities.dataloader.dataset import Dataset
@@ -192,6 +194,8 @@ def _check_last_step_checkpointed(self) -> "TrainingComponentsInstantiationModel
     mfu_calculator: PydanticMFUCalculatorABCType | None = None
     scheduled_pipeline: PydanticPipelineType | None = None
     device_mesh: PydanticDeviceMeshIFType | None = None
+    downstream_evaluator: Optional[PydanticDownstreamEvaluatorType] = None
+    model_converter: Optional[PydanticModelConverterType] = None
     model_raw: PydanticPytorchModuleType

     @model_validator(mode="after")
4 changes: 4 additions & 0 deletions src/modalities/config/pydantic_if_types.py
@@ -19,6 +19,8 @@
 from modalities.checkpointing.stateful.app_state import AppState
 from modalities.dataloader.collate_fns.collate_if import CollateFnIF
 from modalities.dataloader.dataloader import LLMDataLoader
+from modalities.conversion.model_converter import ModelConverter
+from modalities.evaluator import DownstreamEvaluator
 from modalities.inference.text.inference_component import TextInferenceComponent
 from modalities.logging_broker.subscriber import MessageSubscriberIF
 from modalities.loss_functions import Loss
@@ -98,3 +100,5 @@ def __get_pydantic_core_schema__(
     torch.utils.hooks.RemovableHandle, PydanticThirdPartyTypeIF(torch.utils.hooks.RemovableHandle)
 ]
 PydanticDebuggingType = Annotated[Debugging, PydanticThirdPartyTypeIF(Debugging)]
+PydanticDownstreamEvaluatorType = Annotated[DownstreamEvaluator, PydanticThirdPartyTypeIF(DownstreamEvaluator)]
+PydanticModelConverterType = Annotated[ModelConverter, PydanticThirdPartyTypeIF(ModelConverter)]
8 changes: 6 additions & 2 deletions src/modalities/conversion/gpt2/configuration_gpt2.py
@@ -181,10 +181,12 @@ def __init__(
         attention_dropout=0.0,
         mlp_bias=False,
         head_dim=None,
+        norm_type="layer_norm",
+        use_qk_norm=False,
+        qk_norm_dim=None,
         **kwargs,
     ):
-        if rms_norm_eps is not None:
-            raise ValueError("RMSNorm is not supported in GPT2 model.")
+        self.norm_type = norm_type
         self.vocab_size = vocab_size
         self.max_position_embeddings = max_position_embeddings
         self.hidden_size = hidden_size
@@ -211,6 +213,8 @@ def __init__(
         self.attention_dropout = attention_dropout
         self.mlp_bias = mlp_bias
         self.head_dim = head_dim if head_dim is not None else self.hidden_size // self.num_attention_heads
+        self.use_qk_norm = use_qk_norm
+        self.qk_norm_dim = qk_norm_dim
         # Validate the correctness of rotary position embeddings parameters
         # BC: if there is a 'type' field, copy it it to 'rope_type'.
         if self.rope_scaling is not None and "type" in self.rope_scaling: