Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions embodichain/gen_sim/action_agent_pipeline/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# ----------------------------------------------------------------------------
# Copyright (c) 2021-2026 DexForce Technology Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ----------------------------------------------------------------------------

from __future__ import annotations

"""Action-agent graph compilation and atomic-action runtime."""

__all__: list[str] = []
24 changes: 24 additions & 0 deletions embodichain/gen_sim/action_agent_pipeline/agents/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# ----------------------------------------------------------------------------
# Copyright (c) 2021-2026 DexForce Technology Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ----------------------------------------------------------------------------

from __future__ import annotations

__all__ = [
"agent_base",
"compile_agent",
"llm",
"task_agent",
]
94 changes: 94 additions & 0 deletions embodichain/gen_sim/action_agent_pipeline/agents/agent_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
# ----------------------------------------------------------------------------
# Copyright (c) 2021-2026 DexForce Technology Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ----------------------------------------------------------------------------

from __future__ import annotations

from abc import ABCMeta
import os

from embodichain.utils.utility import load_txt


def _resolve_prompt_path(file_name: str, config_dir: str | None = None) -> str:
# If absolute path, use directly
if os.path.isabs(file_name):
if os.path.exists(file_name):
return file_name
raise FileNotFoundError(f"Prompt file not found: {file_name}")

# Try config directory first (for task-specific prompts)
if config_dir:
config_path = os.path.join(config_dir, file_name)
if os.path.exists(config_path):
return config_path

# Try action_agent_pipeline/prompts directory for reusable prompts.
agents_prompts_dir = os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "prompts"
)
agents_path = os.path.join(agents_prompts_dir, file_name)
if os.path.exists(agents_path):
return agents_path

# If still not found, raise error with search paths
searched_paths = []
if config_dir:
searched_paths.append(f" - {config_dir}/{file_name}")
searched_paths.append(f" - {agents_prompts_dir}/{file_name}")

raise FileNotFoundError(
f"Prompt file not found: {file_name}\n"
f"Searched in:\n" + "\n".join(searched_paths)
)


class AgentBase(metaclass=ABCMeta):
def __init__(self, **kwargs) -> None:

assert (
"prompt_kwargs" in kwargs.keys()
), "Key prompt_kwargs must exist in config."

for key, value in kwargs.items():
setattr(self, key, value)

# Get config directory if provided
config_dir = kwargs.get("config_dir", None)
if config_dir:
config_dir = os.path.dirname(os.path.abspath(config_dir))

# Preload and store prompt contents inside self.prompt_kwargs
for key, val in self.prompt_kwargs.items():
if val["type"] == "text":
file_path = _resolve_prompt_path(val["name"], config_dir)
val["content"] = load_txt(file_path)
else:
raise ValueError(
f"Now only support `text` type but {val['type']} is given."
)

def generate(self, *args, **kwargs):
pass

def act(self, *args, **kwargs):
pass

def get_composed_observations(self, **kwargs):
ret = {}
for key, val in self.prompt_kwargs.items():
ret[key] = val["content"]
ret.update(kwargs)
return ret
119 changes: 119 additions & 0 deletions embodichain/gen_sim/action_agent_pipeline/agents/compile_agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
# ----------------------------------------------------------------------------
# Copyright (c) 2021-2026 DexForce Technology Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ----------------------------------------------------------------------------

from __future__ import annotations

import hashlib
import json
from pathlib import Path
from typing import Any

from embodichain.gen_sim.action_agent_pipeline.agents.agent_base import AgentBase
from embodichain.gen_sim.action_agent_pipeline.utils.llm_json import (
extract_json_object,
normalize_json_content,
)
from embodichain.data import database_agent_prompt_dir

__all__ = ["CompileAgent"]

COMPILED_GRAPH_SCHEMA_VERSION = "nominal_graph_v1"


class CompileAgent(AgentBase):
"""Compile and execute nominal atomic-action graph specs."""

query_prefix = "# "
query_suffix = "."
prompt_kwargs: dict[str, dict[str, Any]]

def __init__(self, **kwargs) -> None:
for key, value in kwargs.items():
setattr(self, key, value)
self.prompt_kwargs = kwargs.get("prompt_kwargs", {})

def generate(self, **kwargs):
log_dir = kwargs.get(
"log_dir", Path(database_agent_prompt_dir) / self.task_name
)
file_path = Path(log_dir) / "agent_compiled_graph.json"
task_graph = extract_json_object(kwargs["task_graph"])
task_graph_hash = _stable_json_hash(task_graph)

if not kwargs.get("regenerate", False) and file_path.exists():
existing_bundle = extract_json_object(file_path.read_text(encoding="utf-8"))
metadata = existing_bundle.get("metadata", {})
if (
metadata.get("schema_version") == COMPILED_GRAPH_SCHEMA_VERSION
and metadata.get("task_graph_hash") == task_graph_hash
):
print(f"Compiled graph artifact already exists at {file_path}.")
return file_path, kwargs, None

content = normalize_json_content(
{
"task_graph": task_graph,
"metadata": {
"schema_version": COMPILED_GRAPH_SCHEMA_VERSION,
"task_graph_hash": task_graph_hash,
},
}
)

file_path.parent.mkdir(parents=True, exist_ok=True)
file_path.write_text(content, encoding="utf-8")
print(f"Compiled graph artifact saved to {file_path}")
return file_path, kwargs, content

def act(self, graph_file_path, **kwargs):
graph_file_path = Path(graph_file_path)
if graph_file_path.suffix != ".json":
raise ValueError("CompileAgent executes compiled graph JSON artifacts.")

from embodichain.gen_sim.action_agent_pipeline.runtime.graph_compiler import (
compile_agent_graph_from_file,
)

runtime_kwargs = _runtime_kwargs(kwargs, getattr(self, "prompt_kwargs", {}))
graph = compile_agent_graph_from_file(graph_file_path)
result = graph.run(**runtime_kwargs)
print("Compiled agent graph executed successfully.")
return result
Comment on lines +90 to +94

def get_composed_observations(self, **kwargs):
return dict(kwargs)


def _stable_json_hash(content: dict[str, Any]) -> str:
payload = json.dumps(
content, ensure_ascii=False, sort_keys=True, separators=(",", ":")
)
return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def _runtime_kwargs(
kwargs: dict[str, Any],
prompt_kwargs: dict[str, dict[str, Any]],
) -> dict[str, Any]:
prompt_only_keys = set(prompt_kwargs)
prompt_only_keys.update(
{
"task_graph",
"observations",
"regenerate",
}
)
return {key: value for key, value in kwargs.items() if key not in prompt_only_keys}
71 changes: 71 additions & 0 deletions embodichain/gen_sim/action_agent_pipeline/agents/llm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# ----------------------------------------------------------------------------
# Copyright (c) 2021-2026 DexForce Technology Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ----------------------------------------------------------------------------

from __future__ import annotations

from embodichain.gen_sim.action_agent_pipeline.utils.mllm import create_chat_openai

__all__ = ["create_llm", "task_llm"]


# ------------------------------------------------------------------------------
# LLM factory
# ------------------------------------------------------------------------------


def create_llm(*, temperature=0.0, model=None, usage_stage=None):
return create_chat_openai(
temperature=temperature,
model=model,
usage_stage=usage_stage,
)


# ------------------------------------------------------------------------------
# LLM instances
# ------------------------------------------------------------------------------


# Initialize LLM instances, but handle errors gracefully for documentation builds
def _create_llm_safe(*, temperature=0.0, model=None, usage_stage=None):
try:
return create_llm(
temperature=temperature,
model=model,
usage_stage=usage_stage,
)
except Exception:
return None


task_llm = _create_llm_safe(
temperature=0.0,
usage_stage="action_agent.task_graph",
)

if __name__ == "__main__":

def call_llm(prompt, temperature=0.0, model=None):
llm = create_llm(
temperature=temperature,
model=model,
usage_stage="action_agent.debug",
)
response = llm.invoke(prompt)
return response.content

response = call_llm(prompt="Which model you are?", temperature=0.0)
print(response)
Loading
Loading