代码库:
hermes-agent· 版本线索:pyproject.toml中version = "0.14.0"一句话导读:Hermes Agent 的核心不是某一个超长
run_agent.py,而是一个被逐步拆分出来的运行时内核:入口层负责把 CLI、TUI、ACP、Gateway 统一成 session;AIAgent只是门面;真正的决策循环在agent/conversation_loop.py;工具系统由 registry + toolsets + executor 驱动;state.db是 canonical store;上下文压缩通过 session lineage 保持长任务连续性。
引言:Hermes Agent 不是一个 CLI,而是一套 Agent Runtime
第一次打开 hermes-agent 代码库,很容易被它的表层功能误导:它有 hermes 命令、TUI、Telegram/Discord/Slack 等 messaging gateway、ACP 编辑器协议、cron、skills、memory、MCP、web dashboard,看起来像是把很多 AI 工具能力堆在一起。
但深入源码之后,会发现它真正稳定的抽象不是“聊天界面”,而是四层结构:
- 入口层:
hermes_cli/main.py、cli.py、tui_gateway/server.py、acp_adapter/session.py等,把不同平台的用户输入归一化为一次 Agent turn。 - 运行时内核:
run_agent.AIAgent保留兼容门面,初始化委托给agent/agent_init.py,主循环委托给agent/conversation_loop.py。 - 能力层:工具 registry、toolsets、model transports、memory、skills、context engine、checkpoint 等围绕主循环提供扩展能力。
- 状态层:
hermes_state.SessionDB以 SQLite 保存 session 与 message,压缩、恢复、搜索和跨平台连续会话都依赖它。
这篇文章不按功能列表介绍,而按“一个请求如何被执行”来拆:从命令入口,到 AIAgent 初始化,再到模型调用、工具调度、状态落盘、上下文压缩和外围协议接入。
一、入口层:所有路最终都通向 AIAgent
Hermes 的安装入口很直接:pyproject.toml 里定义了三个 console scripts。
[project.scripts]
hermes = "hermes_cli.main:main"
hermes-agent = "run_agent:main"
hermes-acp = "acp_adapter.entry:main"
[tool.setuptools]
py-modules = ["run_agent", "model_tools", "toolsets", "batch_runner", "trajectory_compressor", "toolset_distributions", "cli", "hermes_bootstrap", "hermes_constants", "hermes_state", "hermes_time", "hermes_logging", "utils"]
这说明项目至少保留了三类入口:
hermes:面向用户的统一 CLI,总入口在hermes_cli.main:main。hermes-agent:更底层的 agent runner,指向run_agent:main。hermes-acp:面向编辑器集成的 ACP server。
1. hermes_cli.main 做的是“总控台”工作
hermes_cli/main.py 的 main() 会构造顶层 parser,把 chat 子命令绑定到 cmd_chat,再继续注册大量管理命令:model、fallback、gateway、cron、sessions、dashboard 等。
def main():
"""Main entry point for hermes CLI."""
# Force UTF-8 stdio on Windows before anything prints. No-op elsewhere.
try:
from hermes_cli.stdio import configure_windows_stdio
configure_windows_stdio()
except Exception:
pass
# Sweep stale ``hermes.exe.old.*`` quarantine files left by previous
# ``hermes update`` runs on Windows. Silent no-op on non-Windows or when
# there's nothing to clean. See ``_quarantine_running_hermes_exe``.
try:
_cleanup_quarantined_exes()
except Exception:
pass
if _try_termux_fast_tui_launch():
return
from hermes_cli._parser import build_top_level_parser
parser, subparsers, chat_parser = build_top_level_parser()
chat_parser.set_defaults(func=cmd_chat)
cmd_chat() 才是聊天路径的关键桥。它先处理 --continue、--resume、首次配置检查、技能同步、--ignore-rules 等环境开关;然后根据 --tui 决定进入 TUI,还是调用经典 cli.py。
def cmd_chat(args):
"""Run interactive chat CLI."""
use_tui = getattr(args, "tui", False) or os.environ.get("HERMES_TUI") == "1"
# Resolve --continue into --resume with the latest session or by name
continue_val = getattr(args, "continue_last", None)
if continue_val and not getattr(args, "resume", None):
if isinstance(continue_val, str):
# -c "session name" — resolve by title or ID
resolved = _resolve_session_by_name_or_id(continue_val)
if resolved:
args.resume = resolved
else:
print(f"No session found matching '{continue_val}'.")
print("Use 'hermes sessions list' to see available sessions.")
sys.exit(1)
else:
# -c with no argument — continue the most recent session
source = "tui" if use_tui else "cli"
last_id = _resolve_last_session(source=source)
if not last_id and source == "tui":
last_id = _resolve_last_session(source="cli")
if last_id:
args.resume = last_id
else:
kind = "TUI" if use_tui else "CLI"
print(f"No previous {kind} session found to continue.")
sys.exit(1)
# ...省略 provider 首次配置、技能同步、环境变量处理...
if use_tui:
_launch_tui(
getattr(args, "resume", None),
tui_dev=getattr(args, "tui_dev", False),
model=getattr(args, "model", None),
provider=getattr(args, "provider", None),
toolsets=getattr(args, "toolsets", None),
skills=getattr(args, "skills", None),
verbose=getattr(args, "verbose", False),
quiet=getattr(args, "quiet", False),
query=getattr(args, "query", None),
image=getattr(args, "image", None),
worktree=getattr(args, "worktree", False),
checkpoints=getattr(args, "checkpoints", False),
pass_session_id=getattr(args, "pass_session_id", False),
max_turns=getattr(args, "max_turns", None),
accept_hooks=getattr(args, "accept_hooks", False),
)
# Import and run the CLI
from cli import main as cli_main
# Build kwargs from args
kwargs = {
"model": args.model,
"provider": getattr(args, "provider", None),
"toolsets": args.toolsets,
"skills": getattr(args, "skills", None),
"verbose": args.verbose,
"quiet": getattr(args, "quiet", False),
"query": args.query,
"image": getattr(args, "image", None),
"resume": getattr(args, "resume", None),
"worktree": getattr(args, "worktree", False),
"checkpoints": getattr(args, "checkpoints", False),
"pass_session_id": getattr(args, "pass_session_id", False),
"max_turns": getattr(args, "max_turns", None),
"ignore_rules": getattr(args, "ignore_rules", False),
"ignore_user_config": getattr(args, "ignore_user_config", False),
}
# Filter out None values
kwargs = {k: v for k, v in kwargs.items() if v is not None}
try:
cli_main(**kwargs)
这里有一个值得注意的设计:hermes_cli.main 并不直接跑 agent loop,而是负责启动前治理:profile、配置、session resolve、技能同步、TUI 分流、hook/plugin 发现。它像一个 shell,而不是 runtime。
2. cli.py 是传统交互控制器
cli.py 的职责是把用户输入组织成 conversation history,并在第一次需要模型时初始化 agent。它在恢复 session 时会从 SQLite 读取历史,再把历史传给 AIAgent.run_conversation()。
因此,入口层的本质可以概括为:
不同平台负责“如何收消息、如何显示消息、如何恢复上下文”;真正的任务执行统一交给
AIAgent。
二、AIAgent 已经从“大单体”退化为门面
从文件名看,run_agent.py 好像是核心;但现在它更像兼容层。AIAgent.__init__() 不自己做复杂初始化,而是直接 forward 到 agent.agent_init.init_agent()。
class AIAgent:
"""
AI Agent with tool calling capabilities.
This class manages the conversation flow, tool execution, and response handling
for AI models that support function calling.
"""
def __init__(
self,
base_url: str = None,
api_key: str = None,
provider: str = None,
api_mode: str = None,
acp_command: str = None,
acp_args: list[str] | None = None,
command: str = None,
args: list[str] | None = None,
model: str = "",
max_iterations: int = 90, # Default tool-calling iterations (shared with subagents)
tool_delay: float = 1.0,
enabled_toolsets: List[str] = None,
disabled_toolsets: List[str] = None,
save_trajectories: bool = False,
verbose_logging: bool = False,
quiet_mode: bool = False,
ephemeral_system_prompt: str = None,
# ...省略大量平台、回调、状态、checkpoint 参数...
pass_session_id: bool = False,
):
"""Forwarder — see ``agent.agent_init.init_agent``."""
from agent.agent_init import init_agent
init_agent(
self,
base_url=base_url,
api_key=api_key,
provider=provider,
api_mode=api_mode,
acp_command=acp_command,
acp_args=acp_args,
command=command,
args=args,
model=model,
max_iterations=max_iterations,
tool_delay=tool_delay,
enabled_toolsets=enabled_toolsets,
disabled_toolsets=disabled_toolsets,
save_trajectories=save_trajectories,
verbose_logging=verbose_logging,
quiet_mode=quiet_mode,
ephemeral_system_prompt=ephemeral_system_prompt,
# ...继续转发...
pass_session_id=pass_session_id,
)
主循环也是 forward:
def _execute_tool_calls_concurrent(self, assistant_message, messages: list, effective_task_id: str, api_call_count: int = 0) -> None:
"""Forwarder — see ``agent.tool_executor.execute_tool_calls_concurrent``."""
from agent.tool_executor import execute_tool_calls_concurrent
return execute_tool_calls_concurrent(self, assistant_message, messages, effective_task_id, api_call_count)
def _execute_tool_calls_sequential(self, assistant_message, messages: list, effective_task_id: str, api_call_count: int = 0) -> None:
"""Forwarder — see ``agent.tool_executor.execute_tool_calls_sequential``."""
from agent.tool_executor import execute_tool_calls_sequential
return execute_tool_calls_sequential(self, assistant_message, messages, effective_task_id, api_call_count)
def _handle_max_iterations(self, messages: list, api_call_count: int) -> str:
"""Forwarder — see ``agent.chat_completion_helpers.handle_max_iterations``."""
from agent.chat_completion_helpers import handle_max_iterations
return handle_max_iterations(self, messages, api_call_count)
def run_conversation(
self,
user_message: str,
system_message: str = None,
conversation_history: List[Dict[str, Any]] = None,
task_id: str = None,
stream_callback: Optional[callable] = None,
persist_user_message: Optional[str] = None,
) -> Dict[str, Any]:
"""Forwarder — see ``agent.conversation_loop.run_conversation``."""
from agent.conversation_loop import run_conversation
return run_conversation(self, user_message, system_message, conversation_history, task_id, stream_callback, persist_user_message)
这类门面设计通常有两个目的:
- 保持旧 API 稳定:外部测试、插件、脚本仍然可以 patch 或 import
run_agent.AIAgent。 - 把真实实现拆到可维护模块:初始化、conversation loop、tool executor、compression、runtime helpers 分别演进。
从架构演进角度看,这是一个典型的“从巨石文件向 runtime package 迁移”的中间态。
三、初始化:把模型、工具、状态、回调装进同一个运行时对象
agent/agent_init.py 的 init_agent() 参数非常长,这不是偶然:Hermes 的 runtime 必须同时服务 CLI、gateway、TUI、ACP、subagent、cron 等多种调用方。
def init_agent(
agent,
base_url: str = None,
api_key: str = None,
provider: str = None,
api_mode: str = None,
acp_command: str = None,
acp_args: list[str] | None = None,
command: str = None,
args: list[str] | None = None,
model: str = "",
max_iterations: int = 90, # Default tool-calling iterations (shared with subagents)
tool_delay: float = 1.0,
enabled_toolsets: List[str] = None,
disabled_toolsets: List[str] = None,
save_trajectories: bool = False,
verbose_logging: bool = False,
quiet_mode: bool = False,
ephemeral_system_prompt: str = None,
log_prefix_chars: int = 100,
log_prefix: str = "",
providers_allowed: List[str] = None,
providers_ignored: List[str] = None,
providers_order: List[str] = None,
provider_sort: str = None,
provider_require_parameters: bool = False,
provider_data_collection: str = None,
openrouter_min_coding_score: Optional[float] = None,
session_id: str = None,
tool_progress_callback: callable = None,
tool_start_callback: callable = None,
tool_complete_callback: callable = None,
thinking_callback: callable = None,
reasoning_callback: callable = None,
clarify_callback: callable = None,
step_callback: callable = None,
stream_delta_callback: callable = None,
interim_assistant_callback: callable = None,
tool_gen_callback: callable = None,
status_callback: callable = None,
max_tokens: int = None,
reasoning_config: Dict[str, Any] = None,
service_tier: str = None,
request_overrides: Dict[str, Any] = None,
prefill_messages: List[Dict[str, Any]] = None,
platform: str = None,
user_id: str = None,
user_name: str = None,
chat_id: str = None,
chat_name: str = None,
chat_type: str = None,
thread_id: str = None,
gateway_session_key: str = None,
skip_context_files: bool = False,
load_soul_identity: bool = False,
skip_memory: bool = False,
session_db=None,
parent_session_id: str = None,
iteration_budget: "IterationBudget" = None,
fallback_model: Dict[str, Any] = None,
credential_pool=None,
checkpoints_enabled: bool = False,
checkpoint_max_snapshots: int = 20,
checkpoint_max_total_size_mb: int = 500,
checkpoint_max_file_size_mb: int = 10,
pass_session_id: bool = False,
):
这里的关键不是参数多,而是它把五类运行时状态集中到了一个 agent 对象:
- 模型运行时:
provider、base_url、api_key、api_mode、fallback、credential pool。 - 工具能力面:enabled/disabled toolsets、tool schema、agent-level tools。
- 平台回调:tool progress、stream delta、thinking、reasoning、status、clarify、step。
- 会话状态:
session_id、session_db、parent session、gateway session key。 - 长任务治理:iteration budget、context compressor、memory manager、checkpoint manager。
换句话说,AIAgent 不是一个“模型客户端封装”,而是一次任务执行所需的完整 runtime context。
四、核心循环:模型和工具之间的状态机
真正的主循环在 agent/conversation_loop.py。它的 docstring 已经直接说明:这是从 run_agent.AIAgent 里抽出来的最大块逻辑,负责“一次用户 turn”。
1. turn 初始化:不是直接调模型
run_conversation() 开始后,首先确保 DB session 存在、设置当前模型运行时、绑定日志 session、设置 skill 写入来源、恢复 fallback 主运行时、清洗输入、创建 task_id、重置各种 retry 计数器。
def run_conversation(
agent,
user_message: str,
system_message: str = None,
conversation_history: List[Dict[str, Any]] = None,
task_id: str = None,
stream_callback: Optional[callable] = None,
persist_user_message: Optional[str] = None,
) -> Dict[str, Any]:
"""
Run a complete conversation with tool calling until completion.
"""
# Guard stdio against OSError from broken pipes (systemd/headless/daemon).
_install_safe_stdio()
agent._ensure_db_session()
# Tell auxiliary_client what the live main provider/model are for
# this turn. Used by tools whose behaviour depends on the active
# main model (e.g. vision_analyze's native fast path) so they see
# the CLI/gateway override instead of the stale config.yaml
# default. Idempotent — fine to call every turn.
try:
from agent.auxiliary_client import set_runtime_main
set_runtime_main(
getattr(agent, "provider", "") or "",
getattr(agent, "model", "") or "",
)
except Exception:
pass
# Tag all log records on this thread with the session ID so
# ``hermes logs --session <id>`` can filter a single conversation.
from hermes_logging import set_session_context
set_session_context(agent.session_id)
# Bind the skill write-origin ContextVar for this thread so tool
# handlers (e.g. skill_manage create) can tell whether they are
# running inside the background agent-improvement review fork vs.
# a foreground user-directed turn.
from tools.skill_provenance import set_current_write_origin
set_current_write_origin(getattr(agent, "_memory_write_origin", "assistant_tool"))
# If the previous turn activated fallback, restore the primary
# runtime so this turn gets a fresh attempt with the preferred model.
agent._restore_primary_runtime()
# Sanitize surrogate characters from user input.
if isinstance(user_message, str):
user_message = _sanitize_surrogates(user_message)
# Store stream callback for _interruptible_api_call to pick up
agent._stream_callback = stream_callback
agent._persist_user_message_idx = None
agent._persist_user_message_override = persist_user_message
# Generate unique task_id if not provided to isolate VMs between concurrent tasks
effective_task_id = task_id or str(uuid.uuid4())
这段代码暴露了 Hermes 主循环的一个特点:它把模型请求视为 turn 中的一个阶段,而不是唯一阶段。模型调用之前,runtime 已经完成了 session、日志、内存、技能、fallback、工具隔离等治理。
接着,循环会复制历史、恢复 todo 状态、重建 memory nudge 计数、把当前用户消息追加到 messages。
# Initialize conversation (copy to avoid mutating the caller's list)
messages = list(conversation_history) if conversation_history else []
# Hydrate todo store from conversation history (gateway creates a fresh
# AIAgent per message, so the in-memory store is empty -- we need to
# recover the todo state from the most recent todo tool response in history)
if conversation_history and not agent._todo_store.has_items():
agent._hydrate_todo_store(conversation_history)
# Hydrate per-session nudge counters from persisted history.
# Gateway creates a fresh AIAgent per inbound message ...
if conversation_history and agent._user_turn_count == 0:
prior_user_turns = sum(
1 for m in conversation_history if m.get("role") == "user"
)
if prior_user_turns > 0:
agent._user_turn_count = prior_user_turns
if agent._memory_nudge_interval > 0 and agent._turns_since_memory == 0:
agent._turns_since_memory = prior_user_turns % agent._memory_nudge_interval
# Track user turns for memory flush and periodic nudge logic
agent._user_turn_count += 1
# Preserve the original user message (no nudge injection).
original_user_message = persist_user_message if persist_user_message is not None else user_message
# Add user message
user_msg = {"role": "user", "content": user_message}
messages.append(user_msg)
current_turn_user_idx = len(messages) - 1
agent._persist_user_message_idx = current_turn_user_idx
if not agent.quiet_mode:
_print_preview = _summarize_user_message_for_log(user_message)
agent._safe_print(f"💬 Starting conversation: '{_print_preview[:60]}{'...' if len(_print_preview) > 60 else ''}'")
# ── System prompt (cached per session for prefix caching) ──
这解释了为什么 Hermes 能支持 gateway 的“冷启动恢复”:即使平台层为每条消息新建 AIAgent,它也能从持久化历史中恢复 todo/memory cadence 等隐式状态。
2. while loop:预算、打断、上下文构造、模型调用
主循环受两个条件约束:max_iterations 和 iteration_budget。此外还有 _budget_grace_call,允许预算耗尽后给模型一次收尾机会。
while (api_call_count < agent.max_iterations and agent.iteration_budget.remaining > 0) or agent._budget_grace_call:
# Reset per-turn checkpoint dedup so each iteration can take one snapshot
agent._checkpoint_mgr.new_turn()
# Check for interrupt request (e.g., user sent new message)
if agent._interrupt_requested:
interrupted = True
_turn_exit_reason = "interrupted_by_user"
if not agent.quiet_mode:
agent._safe_print("\n⚡ Breaking out of tool loop due to interrupt...")
break
api_call_count += 1
agent._api_call_count = api_call_count
agent._touch_activity(f"starting API call #{api_call_count}")
# Grace call: the budget is exhausted but we gave the model one
# more chance. Consume the grace flag so the loop exits after
# this iteration regardless of outcome.
if agent._budget_grace_call:
agent._budget_grace_call = False
elif not agent.iteration_budget.consume():
_turn_exit_reason = "budget_exhausted"
if not agent.quiet_mode:
agent._safe_print(f"\n⚠️ Iteration budget exhausted ({agent.iteration_budget.used}/{agent.iteration_budget.max_total} iterations used)")
break
这段逻辑体现了一个很实用的 Agent 控制原则:工具循环必须有明确停止条件。否则,模型可能在“调用工具 → 看到结果 → 再调用工具”的路径里无界循环。
模型调用前,Hermes 会把运行时注入上下文放进当前用户消息,而不是写入系统提示词。
api_messages = []
for idx, msg in enumerate(messages):
api_msg = msg.copy()
# Inject ephemeral context into the current turn's user message.
# Sources: memory manager prefetch + plugin pre_llm_call hooks
# with target="user_message" (the default). Both are
# API-call-time only — the original message in `messages` is
# never mutated, so nothing leaks into session persistence.
if idx == current_turn_user_idx and msg.get("role") == "user":
_injections = []
if _ext_prefetch_cache:
_fenced = build_memory_context_block(_ext_prefetch_cache)
if _fenced:
_injections.append(_fenced)
if _plugin_user_context:
_injections.append(_plugin_user_context)
if _injections:
_base = api_msg.get("content", "")
if isinstance(_base, str):
api_msg["content"] = _base + "\n\n" + "\n\n".join(_injections)
这个细节很关键:
- 系统提示词保持稳定,有利于 prompt cache。
- memory/plugin context 是 ephemeral 的,不污染 SQLite transcript。
- 当前 turn 的模型可见上下文和长期持久化历史被刻意分离。
3. 模型返回工具调用后,继续循环;否则结束
当模型返回 tool_calls,Hermes 会先把 assistant message 追加到 messages,然后执行工具,再把工具结果加入上下文,进入下一轮模型调用。
agent._post_tool_empty_retried = False
messages.append(assistant_msg)
agent._emit_interim_assistant_message(assistant_msg)
# Close any open streaming display ... before tool execution begins.
if agent.stream_delta_callback:
try:
agent.stream_delta_callback(None)
except Exception:
pass
agent._execute_tool_calls(assistant_message, messages, effective_task_id, api_call_count)
if agent._tool_guardrail_halt_decision is not None:
decision = agent._tool_guardrail_halt_decision
_turn_exit_reason = "guardrail_halt"
final_response = agent._toolguard_controlled_halt_response(decision)
agent._emit_status(
f"⚠️ Tool guardrail halted {decision.tool_name}: {decision.code}"
)
messages.append({"role": "assistant", "content": final_response})
break
# Refund the iteration if the ONLY tool(s) called were
# execute_code (programmatic tool calling).
_tc_names = {tc.function.name for tc in assistant_message.tool_calls}
if _tc_names == {"execute_code"}:
agent.iteration_budget.refund()
# Use real token counts from the API response to decide
# compression.
_compressor = agent.context_compressor
if _compressor.last_prompt_tokens > 0:
_real_tokens = _compressor.last_prompt_tokens
else:
_real_tokens = estimate_request_tokens_rough(
messages, tools=agent.tools or None
)
if agent.compression_enabled and _compressor.should_compress(_real_tokens):
agent._safe_print(" ⟳ compacting context…")
messages, active_system_prompt = agent._compress_context(
messages, system_message,
approx_tokens=agent.context_compressor.last_prompt_tokens,
task_id=effective_task_id,
)
# Compression created a new session — clear history so
# _flush_messages_to_session_db writes compressed messages
# to the new session.
conversation_history = None
# Save session log incrementally (so progress is visible even if interrupted)
agent._session_messages = messages
# Continue loop for next response
continue
这就是 Agent loop 的核心状态转移:
- 模型产生工具意图。
- runtime 执行工具。
- 工具结果作为
toolmessage 回填。 - 模型基于工具结果继续推理。
- 直到没有工具调用,或被预算/中断/guardrail 截停。
五、模型 Transport:多 Provider 被归一成一种响应形态
Hermes 支持 OpenAI-compatible、Anthropic Messages、Bedrock Converse、Codex Responses 等模式。主循环不直接关心这些协议差异,而是通过 agent/chat_completion_helpers.py 构造参数并执行可中断请求。
def interruptible_api_call(agent, api_kwargs: dict):
"""
Run the API call in a background thread so the main conversation loop
can detect interrupts without waiting for the full HTTP round-trip.
Each worker thread gets its own OpenAI client instance. Interrupts only
close that worker-local client, so retries and other requests never
inherit a closed transport.
"""
result = {"response": None, "error": None}
request_client_holder = {"client": None}
request_client_lock = threading.Lock()
def _call():
try:
if agent.api_mode == "codex_responses":
request_client = _set_request_client(
agent._create_request_openai_client(
reason="codex_stream_request",
api_kwargs=api_kwargs,
)
)
result["response"] = agent._run_codex_stream(
api_kwargs,
client=request_client,
on_first_delta=getattr(agent, "_codex_on_first_delta", None),
)
elif agent.api_mode == "anthropic_messages":
result["response"] = agent._anthropic_messages_create(api_kwargs)
elif agent.api_mode == "bedrock_converse":
# Bedrock uses boto3 directly — no OpenAI client needed.
# normalize_converse_response produces an OpenAI-compatible
# SimpleNamespace so the rest of the agent loop can treat
# bedrock responses like chat_completions responses.
from agent.bedrock_adapter import (
_get_bedrock_runtime_client,
invalidate_runtime_client,
is_stale_connection_error,
normalize_converse_response,
)
region = api_kwargs.pop("__bedrock_region__", "us-east-1")
api_kwargs.pop("__bedrock_converse__", None)
client = _get_bedrock_runtime_client(region)
try:
raw_response = client.converse(**api_kwargs)
except Exception as _bedrock_exc:
if is_stale_connection_error(_bedrock_exc):
invalidate_runtime_client(region)
raise
result["response"] = normalize_converse_response(raw_response)
else:
request_client = _set_request_client(
agent._create_request_openai_client(
reason="chat_completion_request",
api_kwargs=api_kwargs,
)
)
result["response"] = request_client.chat.completions.create(**api_kwargs)
except Exception as e:
result["error"] = e
这段代码的设计点有三个:
- 每次请求独立 client:中断时只关闭当前请求的连接,不污染后续 retry。
- 后台线程执行:主循环可以轮询中断和 stale timeout。
- 协议归一:Bedrock/Anthropic/Codex 最终都被适配成主循环能处理的 assistant message / usage / tool_calls 形态。
因此,Hermes 的 provider 支持不是在主循环里塞满 if/else,而是在 transport 层做协议变换。
六、工具系统:registry 负责注册,executor 负责调度
Hermes 的工具系统可以拆成三步:
- 工具模块 import 时注册到
tools.registry.registry。 model_tools.get_tool_definitions()根据 toolsets 生成模型可见 schema。- 模型返回 tool call 后,由
agent.tool_executor决定并发或顺序执行。
1. 注册:工具以 ToolEntry 进入 registry
def register(
self,
name: str,
toolset: str,
schema: dict,
handler: Callable,
check_fn: Callable = None,
requires_env: list = None,
is_async: bool = False,
description: str = "",
emoji: str = "",
max_result_size_chars: int | float | None = None,
dynamic_schema_overrides: Callable = None,
override: bool = False,
):
"""Register a tool. Called at module-import time by each tool file.
``override=True`` is an explicit opt-in for plugins that intend to
replace an existing built-in tool implementation ...
"""
with self._lock:
existing = self._tools.get(name)
if existing and existing.toolset != toolset:
# Allow MCP-to-MCP overwrites ...
both_mcp = (
existing.toolset.startswith("mcp-")
and toolset.startswith("mcp-")
)
if both_mcp:
logger.debug(
"Tool '%s': MCP toolset '%s' overwriting MCP toolset '%s'",
name, toolset, existing.toolset,
)
elif override:
logger.info(
"Tool '%s': toolset '%s' overriding existing toolset '%s' "
"(override=True opt-in)",
name, existing.toolset,
)
else:
logger.error(
"Tool registration REJECTED: '%s' (toolset '%s') would "
"shadow existing tool from toolset '%s'. Pass "
"override=True to register() if the replacement is "
"intentional, or deregister the existing tool first.",
name, toolset, existing.toolset,
)
return
self._tools[name] = ToolEntry(
name=name,
toolset=toolset,
schema=schema,
handler=handler,
check_fn=check_fn,
requires_env=requires_env or [],
is_async=is_async,
description=description or schema.get("description", ""),
emoji=emoji,
max_result_size_chars=max_result_size_chars,
dynamic_schema_overrides=dynamic_schema_overrides,
)
if check_fn and toolset not in self._toolset_checks:
self._toolset_checks[toolset] = check_fn
self._generation += 1
这里有两个很工程化的保护:
- 非 MCP 工具默认禁止意外覆盖,除非显式
override=True。 - registry 有
_generation计数,供 schema cache 失效使用。
2. 曝光:get_tool_definitions() 把 registry 转成模型 schema
def get_tool_definitions(
enabled_toolsets: List[str] = None,
disabled_toolsets: List[str] = None,
quiet_mode: bool = False,
) -> List[Dict[str, Any]]:
"""
Get tool definitions for model API calls with toolset-based filtering.
All tools must be part of a toolset to be accessible.
"""
# Fast path: memoized result when the caller doesn't need stdout prints.
# The cache key captures every argument-level input; the registry
# generation captures registry mutations (MCP refresh, plugin load).
if quiet_mode:
try:
from hermes_cli.config import get_config_path
cfg_path = get_config_path()
cfg_stat = cfg_path.stat()
cfg_fp = (cfg_stat.st_mtime_ns, cfg_stat.st_size)
except (FileNotFoundError, OSError, ImportError):
cfg_fp = None
cache_key = (
frozenset(enabled_toolsets) if enabled_toolsets is not None else None,
frozenset(disabled_toolsets) if disabled_toolsets else None,
registry._generation,
cfg_fp,
bool(os.environ.get("HERMES_KANBAN_TASK")),
)
cached = _tool_defs_cache.get(cache_key)
if cached is not None:
global _last_resolved_tool_names
_last_resolved_tool_names = [t["function"]["name"] for t in cached]
return list(cached)
result = _compute_tool_definitions(enabled_toolsets, disabled_toolsets, quiet_mode)
if quiet_mode:
_tool_defs_cache[cache_key] = result
return list(result)
return result
这个 cache 的 key 很有意思:除了 enabled/disabled toolsets,还包括 registry generation、配置文件 mtime/size、HERMES_KANBAN_TASK。这说明工具 schema 不只是静态函数签名,还会受动态配置和任务类型影响。
3. 调度:agent-level tools 和普通 registry tools 分流
不是所有工具都直接进 registry dispatch。todo、memory、session_search、delegate_task 这类工具需要访问当前 agent 的内存状态,所以在 agent loop 内拦截。
def invoke_tool(agent, function_name: str, function_args: dict, effective_task_id: str,
tool_call_id: Optional[str] = None, messages: list = None,
pre_tool_block_checked: bool = False) -> str:
"""Invoke a single tool and return the result string. No display logic.
Handles both agent-level tools (todo, memory, etc.) and registry-dispatched
tools. Used by the concurrent execution path; the sequential path retains
its own inline invocation for backward-compatible display handling.
"""
# Check plugin hooks for a block directive before executing anything.
block_message: Optional[str] = None
if not pre_tool_block_checked:
try:
from hermes_cli.plugins import get_pre_tool_call_block_message
block_message = get_pre_tool_call_block_message(
function_name, function_args, task_id=effective_task_id or "",
)
except Exception:
pass
if block_message is not None:
return json.dumps({"error": block_message}, ensure_ascii=False)
if function_name == "todo":
from tools.todo_tool import todo_tool as _todo_tool
return _todo_tool(
todos=function_args.get("todos"),
merge=function_args.get("merge", False),
store=agent._todo_store,
)
elif function_name == "session_search":
session_db = agent._get_session_db_for_recall()
if not session_db:
from hermes_state import format_session_db_unavailable
return json.dumps({"success": False, "error": format_session_db_unavailable()})
from tools.session_search_tool import session_search as _session_search
return _session_search(
query=function_args.get("query", ""),
role_filter=function_args.get("role_filter"),
limit=function_args.get("limit", 3),
session_id=function_args.get("session_id"),
around_message_id=function_args.get("around_message_id"),
window=function_args.get("window", 5),
sort=function_args.get("sort"),
db=session_db,
current_session_id=agent.session_id,
)
elif function_name == "memory":
target = function_args.get("target", "memory")
from tools.memory_tool import memory_tool as _memory_tool
result = _memory_tool(
action=function_args.get("action"),
target=target,
content=function_args.get("content"),
old_text=function_args.get("old_text"),
store=agent._memory_store,
)
# Bridge: notify external memory provider of built-in memory writes
if agent._memory_manager and function_args.get("action") in {"add", "replace"}:
try:
agent._memory_manager.on_memory_write(
function_args.get("action", ""),
target,
function_args.get("content", ""),
metadata=agent._build_memory_write_metadata(
task_id=effective_task_id,
tool_call_id=tool_call_id,
),
)
except Exception:
pass
return result
elif agent._memory_manager and agent._memory_manager.has_tool(function_name):
return agent._memory_manager.handle_tool_call(function_name, function_args)
elif function_name == "clarify":
from tools.clarify_tool import clarify_tool as _clarify_tool
return _clarify_tool(
question=function_args.get("question", ""),
choices=function_args.get("choices"),
callback=agent.clarify_callback,
)
elif function_name == "delegate_task":
return agent._dispatch_delegate_task(function_args)
else:
return _ra().handle_function_call(
function_name, function_args, effective_task_id,
tool_call_id=tool_call_id,
session_id=agent.session_id or "",
enabled_tools=list(agent.valid_tool_names) if agent.valid_tool_names else None,
skip_pre_tool_call_hook=True,
)
普通工具最终进入 model_tools.handle_function_call(),再进入 registry。
def handle_function_call(
function_name: str,
function_args: Dict[str, Any],
task_id: Optional[str] = None,
tool_call_id: Optional[str] = None,
session_id: Optional[str] = None,
user_task: Optional[str] = None,
enabled_tools: Optional[List[str]] = None,
skip_pre_tool_call_hook: bool = False,
) -> str:
"""
Main function call dispatcher that routes calls to the tool registry.
"""
# Coerce string arguments to their schema-declared types (e.g. "42"→42)
function_args = coerce_tool_args(function_name, function_args)
try:
if function_name in _AGENT_LOOP_TOOLS:
return json.dumps({"error": f"{function_name} must be handled by the agent loop"})
# Check plugin hooks for a block directive ...
if not skip_pre_tool_call_hook:
block_message: Optional[str] = None
try:
from hermes_cli.plugins import get_pre_tool_call_block_message
block_message = get_pre_tool_call_block_message(
function_name,
function_args,
task_id=task_id or "",
session_id=session_id or "",
tool_call_id=tool_call_id or "",
)
except Exception as _hook_err:
logger.debug("pre_tool_call hook error: %s", _hook_err)
if block_message is not None:
return json.dumps({"error": block_message}, ensure_ascii=False)
# ACP/Zed edit approval runs before any file mutation.
try:
from acp_adapter.edit_approval import maybe_require_edit_approval
edit_block_message = maybe_require_edit_approval(function_name, function_args)
if edit_block_message is not None:
return edit_block_message
except Exception as _edit_approval_err:
logger.debug("ACP edit approval guard error: %s", _edit_approval_err)
if function_name in {"write_file", "patch"}:
return json.dumps({"error": "Edit approval denied: approval guard failed"}, ensure_ascii=False)
_dispatch_start = time.monotonic()
if function_name == "execute_code":
sandbox_enabled = enabled_tools if enabled_tools is not None else _last_resolved_tool_names
result = registry.dispatch(
function_name, function_args,
task_id=task_id,
enabled_tools=sandbox_enabled,
)
else:
result = registry.dispatch(
function_name, function_args,
task_id=task_id,
user_task=user_task,
)
duration_ms = int((time.monotonic() - _dispatch_start) * 1000)
4. 并发算法:只并发安全工具,文件工具做路径冲突检测
Hermes 不盲目并发所有 tool calls。agent/tool_dispatch_helpers.py 用白名单和路径冲突判断决定是否可以走并发执行。
# Tools that must never run concurrently (interactive / user-facing).
# When any of these appear in a batch, we fall back to sequential execution.
_NEVER_PARALLEL_TOOLS = frozenset({"clarify"})
# Read-only tools with no shared mutable session state.
_PARALLEL_SAFE_TOOLS = frozenset({
"ha_get_state",
"ha_list_entities",
"ha_list_services",
"read_file",
"search_files",
"session_search",
"skill_view",
"skills_list",
"vision_analyze",
"web_extract",
"web_search",
})
# File tools can run concurrently when they target independent paths.
_PATH_SCOPED_TOOLS = frozenset({"read_file", "write_file", "patch"})
def _should_parallelize_tool_batch(tool_calls) -> bool:
"""Return True when a tool-call batch is safe to run concurrently."""
if len(tool_calls) <= 1:
return False
tool_names = [tc.function.name for tc in tool_calls]
if any(name in _NEVER_PARALLEL_TOOLS for name in tool_names):
return False
reserved_paths: list[Path] = []
for tool_call in tool_calls:
tool_name = tool_call.function.name
try:
function_args = json.loads(tool_call.function.arguments)
except Exception:
logging.debug(
"Could not parse args for %s — defaulting to sequential; raw=%s",
tool_name,
tool_call.function.arguments[:200],
)
return False
if not isinstance(function_args, dict):
logging.debug(
"Non-dict args for %s (%s) — defaulting to sequential",
tool_name,
type(function_args).__name__,
)
return False
if tool_name in _PATH_SCOPED_TOOLS:
scoped_path = _extract_parallel_scope_path(tool_name, function_args)
if scoped_path is None:
return False
if any(_paths_overlap(scoped_path, existing) for existing in reserved_paths):
return False
reserved_paths.append(scoped_path)
continue
if tool_name not in _PARALLEL_SAFE_TOOLS:
# Check if it's an MCP tool from a server that opted into parallel calls.
if not _is_mcp_tool_parallel_safe(tool_name):
return False
return True
并发执行器会保持结果顺序,避免 API 看到乱序 tool result。
def execute_tool_calls_concurrent(agent, assistant_message, messages: list, effective_task_id: str, api_call_count: int = 0) -> None:
"""Execute multiple tool calls concurrently using a thread pool.
Results are collected in the original tool-call order and appended to
messages so the API sees them in the expected sequence.
"""
tool_calls = assistant_message.tool_calls
num_tools = len(tool_calls)
# ── Pre-flight: interrupt check ──────────────────────────────────
if agent._interrupt_requested:
print(f"{agent.log_prefix}⚡ Interrupt: skipping {num_tools} tool call(s)")
for tc in tool_calls:
messages.append(make_tool_result_message(
tc.function.name,
f"[Tool execution cancelled — {tc.function.name} was skipped due to user interrupt]",
tc.id,
))
return
这个策略的底层原则是:并发不是性能优化而已,它必须保持 conversation protocol 的合法性。工具可以并发跑,但回填给模型时仍然要按原始 tool_call 顺序追加。
七、状态持久化:state.db 是 canonical store
Hermes 的 live session 并不依赖 JSON 文件。cli.py 的 /save 可以导出 JSON snapshot,但真正可恢复、可搜索、可跨平台续写的状态源是 ~/.hermes/state.db。
hermes_state.py 的 schema 说明了核心模型:sessions 保存会话元数据,messages 保存完整消息流。
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS schema_version (
version INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS sessions (
id TEXT PRIMARY KEY,
source TEXT NOT NULL,
user_id TEXT,
model TEXT,
model_config TEXT,
system_prompt TEXT,
parent_session_id TEXT,
started_at REAL NOT NULL,
ended_at REAL,
end_reason TEXT,
message_count INTEGER DEFAULT 0,
tool_call_count INTEGER DEFAULT 0,
input_tokens INTEGER DEFAULT 0,
output_tokens INTEGER DEFAULT 0,
cache_read_tokens INTEGER DEFAULT 0,
cache_write_tokens INTEGER DEFAULT 0,
reasoning_tokens INTEGER DEFAULT 0,
billing_provider TEXT,
billing_base_url TEXT,
billing_mode TEXT,
estimated_cost_usd REAL,
actual_cost_usd REAL,
cost_status TEXT,
cost_source TEXT,
pricing_version TEXT,
title TEXT,
api_call_count INTEGER DEFAULT 0,
handoff_state TEXT,
handoff_platform TEXT,
handoff_error TEXT,
FOREIGN KEY (parent_session_id) REFERENCES sessions(id)
);
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL REFERENCES sessions(id),
role TEXT NOT NULL,
content TEXT,
tool_call_id TEXT,
tool_calls TEXT,
tool_name TEXT,
timestamp REAL NOT NULL,
token_count INTEGER,
finish_reason TEXT,
reasoning TEXT,
reasoning_content TEXT,
reasoning_details TEXT,
codex_reasoning_items TEXT,
codex_message_items TEXT,
platform_message_id TEXT
);
CREATE TABLE IF NOT EXISTS state_meta (
key TEXT PRIMARY KEY,
value TEXT
);
CREATE INDEX IF NOT EXISTS idx_sessions_source ON sessions(source);
CREATE INDEX IF NOT EXISTS idx_sessions_parent ON sessions(parent_session_id);
CREATE INDEX IF NOT EXISTS idx_sessions_started ON sessions(started_at DESC);
CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id, timestamp);
"""
消息写入时,结构化字段会先 JSON 序列化,再在同一事务里更新 session 计数。
def append_message(
self,
session_id: str,
role: str,
content: str = None,
tool_name: str = None,
tool_calls: Any = None,
tool_call_id: str = None,
token_count: int = None,
finish_reason: str = None,
reasoning: str = None,
reasoning_content: str = None,
reasoning_details: Any = None,
codex_reasoning_items: Any = None,
codex_message_items: Any = None,
platform_message_id: str = None,
) -> int:
"""
Append a message to a session. Returns the message row ID.
Also increments the session's message_count (and tool_call_count
if role is 'tool' or tool_calls is present).
"""
# Serialize structured fields to JSON before entering the write txn
reasoning_details_json = (
json.dumps(reasoning_details)
if reasoning_details else None
)
codex_items_json = (
json.dumps(codex_reasoning_items)
if codex_reasoning_items else None
)
codex_message_items_json = (
json.dumps(codex_message_items)
if codex_message_items else None
)
tool_calls_json = json.dumps(tool_calls) if tool_calls else None
# Multimodal content (list of parts) must be JSON-encoded: sqlite3
# cannot bind list/dict parameters directly.
stored_content = self._encode_content(content)
# Pre-compute tool call count
num_tool_calls = 0
if tool_calls is not None:
num_tool_calls = len(tool_calls) if isinstance(tool_calls, list) else 1
def _do(conn):
cursor = conn.execute(
"""INSERT INTO messages (session_id, role, content, tool_call_id,
tool_calls, tool_name, timestamp, token_count, finish_reason,
reasoning, reasoning_content, reasoning_details, codex_reasoning_items,
codex_message_items, platform_message_id)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
session_id,
role,
stored_content,
tool_call_id,
tool_calls_json,
tool_name,
time.time(),
token_count,
finish_reason,
reasoning,
reasoning_content,
reasoning_details_json,
codex_items_json,
codex_message_items_json,
platform_message_id,
),
)
msg_id = cursor.lastrowid
# Update counters
if num_tool_calls > 0:
conn.execute(
"""UPDATE sessions SET message_count = message_count + 1,
tool_call_count = tool_call_count + ? WHERE id = ?""",
(num_tool_calls, session_id),
)
else:
conn.execute(
"UPDATE sessions SET message_count = message_count + 1 WHERE id = ?",
(session_id,),
)
return msg_id
return self._execute_write(_do)
这里的 schema 选择很务实:它不是把消息简单塞成一个 JSON blob,而是拆出 role、tool_call_id、tool_calls、finish_reason、reasoning、platform_message_id 等字段,方便:
- 恢复 OpenAI-style conversation。
- 跨平台消息去重或 redaction。
- session_search 做 FTS 检索。
- 压缩 session 形成父子链。
- dashboard/TUI 展示 token、cost、tool call count。
压缩后的 resume 为什么要走 lineage
上下文压缩会结束当前 session 并创建 child session。某些情况下,父 session 可能没有实际 messages;因此恢复时需要沿 parent_session_id 找到真正有消息的后代。
def resolve_resume_session_id(self, session_id: str) -> str:
"""Redirect a resume target to the descendant session that holds the messages.
Context compression ends the current session and forks a new child session
(linked via ``parent_session_id``). The flush cursor is reset, so the
child is where new messages actually land — the parent ends up with
``message_count = 0`` rows unless messages had already been flushed to
it before compression.
"""
if not session_id:
return session_id
with self._lock:
# If this session already has messages, nothing to redirect.
try:
row = self._conn.execute(
"SELECT 1 FROM messages WHERE session_id = ? LIMIT 1",
(session_id,),
).fetchone()
except Exception:
return session_id
if row is not None:
return session_id
# Walk descendants: at each step, pick the most-recently-started
# child session; stop once we find one with messages.
current = session_id
seen = {current}
for _ in range(32):
try:
child_row = self._conn.execute(
"SELECT id FROM sessions "
"WHERE parent_session_id = ? "
"ORDER BY started_at DESC, id DESC LIMIT 1",
(current,),
).fetchone()
except Exception:
return session_id
if child_row is None:
return session_id
child_id = child_row["id"] if hasattr(child_row, "keys") else child_row[0]
if not child_id or child_id in seen:
return session_id
seen.add(child_id)
try:
msg_row = self._conn.execute(
"SELECT 1 FROM messages WHERE session_id = ? LIMIT 1",
(child_id,),
).fetchone()
except Exception:
return session_id
if msg_row is not None:
return child_id
current = child_id
return session_id
这说明 Hermes 把“压缩”当作一次会话边界切分,而不是简单修改当前 messages。好处是压缩前后的状态可以被审计、搜索和恢复。
八、上下文压缩:不是截断,而是可恢复的 session split
Hermes 的上下文治理由 ContextEngine 抽象定义,默认实现是 ContextCompressor。
class ContextEngine(ABC):
"""Base class all context engines must implement."""
# -- Token state (read by run_agent.py for display/logging) ------------
# Engines MUST maintain these. run_agent.py reads them directly.
last_prompt_tokens: int = 0
last_completion_tokens: int = 0
last_total_tokens: int = 0
threshold_tokens: int = 0
context_length: int = 0
compression_count: int = 0
@abstractmethod
def update_from_response(self, usage: Dict[str, Any]) -> None:
"""Update tracked token usage from an API response.
Called after every LLM call with the usage dict from the response.
"""
@abstractmethod
def should_compress(self, prompt_tokens: int = None) -> bool:
"""Return True if compaction should fire this turn."""
@abstractmethod
def compress(
self,
messages: List[Dict[str, Any]],
current_tokens: int = None,
focus_topic: str = None,
) -> List[Dict[str, Any]]:
"""Compact the message list and return the new message list.
This is the main entry point. The engine receives the full message
list and returns a (possibly shorter) list that fits within the
context budget. The implementation is free to summarize, build a
DAG, or do anything else — as long as the returned list is a valid
OpenAI-format message sequence.
"""
默认压缩器的算法写在类注释里,已经非常清楚:
class ContextCompressor(ContextEngine):
"""Default context engine — compresses conversation context via lossy summarization.
Algorithm:
1. Prune old tool results (cheap, no LLM call)
2. Protect head messages (system prompt + first exchange)
3. Protect tail messages by token budget (most recent ~20K tokens)
4. Summarize middle turns with structured LLM prompt
5. On subsequent compactions, iteratively update the previous summary
"""
它的触发条件不是“消息数量”,而是 token threshold,并带 anti-thrashing 保护:如果连续压缩收益太低,就避免无限压缩循环。
def should_compress(self, prompt_tokens: int = None) -> bool:
"""Check if context exceeds the compression threshold.
Includes anti-thrashing protection: if the last two compressions
each saved less than 10%, skip compression to avoid infinite loops
where each pass removes only 1-2 messages.
"""
tokens = prompt_tokens if prompt_tokens is not None else self.last_prompt_tokens
if tokens < self.threshold_tokens:
return False
# Anti-thrashing: back off if recent compressions were ineffective
if self._ineffective_compression_count >= 2:
if not self.quiet_mode:
logger.warning(
"Compression skipped — last %d compressions saved <10%% each. "
"Consider /new to start a fresh session, or /compress <topic> "
"for focused compression.",
self._ineffective_compression_count,
)
return False
return True
压缩摘要本身不是一句“前文总结”,而是一个结构化 checkpoint,包含 Active Task、Goal、Constraints、Completed Actions、Active State、Blocked、Key Decisions、Relevant Files、Remaining Work 等字段。
def _generate_summary(self, turns_to_summarize: List[Dict[str, Any]], focus_topic: str = None) -> Optional[str]:
"""Generate a structured summary of conversation turns.
Uses a structured template (Goal, Progress, Decisions, Resolved/Pending
Questions, Files, Remaining Work) with explicit preamble telling the
summarizer not to answer questions. When a previous summary exists,
generates an iterative update instead of summarizing from scratch.
"""
now = time.monotonic()
if now < self._summary_failure_cooldown_until:
logger.debug(
"Skipping context summary during cooldown (%.0fs remaining)",
self._summary_failure_cooldown_until - now,
)
return None
summary_budget = self._compute_summary_budget(turns_to_summarize)
content_to_summarize = self._serialize_for_summary(turns_to_summarize)
# Preamble shared by both first-compaction and iterative-update prompts.
_summarizer_preamble = (
"You are a summarization agent creating a context checkpoint. "
"Treat the conversation turns below as source material for a "
"compact record of prior work. "
"Produce only the structured summary; do not add a greeting, "
"preamble, or prefix. "
"Write the summary in the same language the user was using in the "
"conversation — do not translate or switch to English. "
"NEVER include API keys, tokens, passwords, secrets, credentials, "
"or connection strings in the summary — replace any that appear "
"with [REDACTED]. Note that the user had credentials present, but "
"do not preserve their values."
)
# Shared structured template (used by both paths).
_template_sections = f"""## Active Task
[THE SINGLE MOST IMPORTANT FIELD. Copy the user's most recent request or
task assignment verbatim — the exact words they used. If multiple tasks
were requested and only some are done, list only the ones NOT yet completed.
## Goal
[What the user is trying to accomplish overall]
## Constraints & Preferences
[User preferences, coding style, constraints, important decisions]
## Completed Actions
[Numbered list of concrete actions taken — include tool used, target, and outcome.
Format each as: N. ACTION target — outcome [tool: name]
## Active State
[Current working state — include:
- Working directory and branch (if applicable)
- Modified/created files with brief note on each
- Test status (X/Y passing)
- Any running processes or servers
- Environment details that matter]
## In Progress
[Work currently underway — what was being done when compaction fired]
## Blocked
[Any blockers, errors, or issues not yet resolved. Include exact error messages.]
## Key Decisions
[Important technical decisions and WHY they were made]
## Resolved Questions
[Questions the user asked that were ALREADY answered — include the answer so it is not repeated]
## Pending User Asks
[Questions or requests from the user that have NOT yet been answered or fulfilled. If none, write "None."]
## Relevant Files
[Files read, modified, or created — with brief note on each]
## Remaining Work
[What remains to be done — framed as context, not instructions]
## Critical Context
[Any specific values, error messages, configuration details, or data that would be lost without explicit preservation. NEVER include API keys, tokens, passwords, or credentials — write [REDACTED] instead.]
Target ~{summary_budget} tokens. Be CONCRETE — include file paths, command outputs, error messages, line numbers, and specific values. Avoid vague descriptions like "made some changes" — say exactly what changed.
Write only the summary body. Do not include any preamble or prefix."""
if self._previous_summary:
# Iterative update: preserve existing info, add new progress
prompt = f"""{_summarizer_preamble}
You are updating a context compaction summary. A previous compaction produced the summary below. New conversation turns have occurred since then and need to be incorporated.
PREVIOUS SUMMARY:
{self._previous_summary}
NEW TURNS TO INCORPORATE:
{content_to_summarize}
Update the summary using this exact structure. PRESERVE all existing information that is still relevant. ADD new completed actions to the numbered list (continue numbering). Move items from "In Progress" to "Completed Actions" when done. Move answered questions to "Resolved Questions". Update "Active State" to reflect current state. Remove information only if it is clearly obsolete. CRITICAL: Update "## Active Task" to reflect the user's most recent unfulfilled request — this is the most important field for task continuity.
{_template_sections}"""
else:
# First compaction: summarize from scratch
prompt = f"""{_summarizer_preamble}
Create a structured checkpoint summary for the conversation after earlier turns are compacted. The summary should preserve enough detail for continuity without re-reading the original turns.
压缩驱动函数 compress_context() 则负责把压缩结果和 SQLite session lineage 绑定起来:如果压缩成功,结束旧 session、创建 child session、重建 system prompt、重置 flush cursor。
def compress_context(
agent: Any,
messages: list,
system_message: str,
*,
approx_tokens: Optional[int] = None,
task_id: str = "default",
focus_topic: Optional[str] = None,
force: bool = False,
) -> Tuple[list, str]:
"""Compress conversation context and split the session in SQLite.
Args:
agent: The owning :class:`AIAgent`.
messages: Current message history (will be summarised).
system_message: Current system prompt; rebuilt after compression.
approx_tokens: Pre-compression token estimate, logged for ops.
task_id: Tool task scope (used for clearing file-read dedup state).
focus_topic: Optional focus string for guided compression — the
summariser will prioritise preserving information related to
this topic. Inspired by Claude Code's ``/compact <focus>``.
force: If True, bypass any active summary-failure cooldown.
Returns:
``(compressed_messages, new_system_prompt)`` tuple. When
compression aborts (aux LLM failed to produce a usable summary),
returns the original messages unchanged and the existing system
prompt — the session is NOT rotated.
"""
# Lazy feasibility check — run the auxiliary-provider probe + context
# length lookup just-in-time on the first compression attempt instead of
# at AIAgent.__init__.
if not getattr(agent, "_compression_feasibility_checked", True):
try:
check_compression_model_feasibility(agent)
finally:
agent._compression_feasibility_checked = True
_pre_msg_count = len(messages)
logger.info(
"context compression started: session=%s messages=%d tokens=~%s model=%s focus=%r",
agent.session_id or "none", _pre_msg_count,
f"{approx_tokens:,}" if approx_tokens else "unknown", agent.model,
focus_topic,
)
agent._emit_status(
"🗜️ Compacting context — summarizing earlier conversation so I can continue..."
)
# Notify external memory provider before compression discards context
if agent._memory_manager:
try:
agent._memory_manager.on_pre_compress(messages)
except Exception:
pass
try:
compressed = agent.context_compressor.compress(messages, current_tokens=approx_tokens, focus_topic=focus_topic, force=force)
if agent._session_db:
try:
# Propagate title to the new session with auto-numbering
old_title = agent._session_db.get_session_title(agent.session_id)
# Trigger memory extraction on the old session before it rotates.
agent.commit_memory_session(messages)
agent._session_db.end_session(agent.session_id, "compression")
old_session_id = agent.session_id
agent.session_id = f"{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:6]}"
os.environ["HERMES_SESSION_ID"] = agent.session_id
try:
from gateway.session_context import _SESSION_ID
_SESSION_ID.set(agent.session_id)
except Exception:
pass
agent._session_db_created = False
agent._session_db.create_session(
session_id=agent.session_id,
source=agent.platform or os.environ.get("HERMES_SESSION_SOURCE", "cli"),
model=agent.model,
model_config=agent._session_init_model_config,
parent_session_id=old_session_id,
)
agent._session_db_created = True
# Auto-number the title for the continuation session
if old_title:
try:
new_title = agent._session_db.get_next_title_in_lineage(old_title)
agent._session_db.set_session_title(agent.session_id, new_title)
except (ValueError, Exception) as e:
logger.debug("Could not propagate title on compression: %s", e)
agent._session_db.update_system_prompt(agent.session_id, new_system_prompt)
# Reset flush cursor — new session starts with no messages written
agent._last_flushed_db_idx = 0
这套设计比“简单删掉最早消息”复杂,但换来了三个能力:
- 压缩前后的 session 有清晰边界,可恢复、可搜索、可审计。
- 摘要是结构化 checkpoint,适合长任务连续执行。
- 外部 memory/context engine 能感知压缩边界,更新自己的状态。
九、外围接入:ACP、TUI、Gateway 都是 Runtime Adapter
1. ACP:编辑器协议不污染 stdout
ACP session manager 的文件头直接说明:ACP session 会映射到 Hermes AIAgent,并持久化到共享 SessionDB。
"""ACP session manager — maps ACP sessions to Hermes AIAgent instances.
Sessions are persisted to the shared SessionDB (``~/.hermes/state.db``) so they
survive process restarts and appear in ``session_search``. When the editor
reconnects after idle/restart, the ``load_session`` / ``resume_session`` calls
find the persisted session in the database and restore the full conversation
history.
"""
ACP 还有一个非常具体的协议约束:stdout 要留给 JSON-RPC frame,所以普通输出必须走 stderr。
def _acp_stderr_print(*args, **kwargs) -> None:
"""Best-effort human-readable output sink for ACP stdio sessions.
ACP reserves stdout for JSON-RPC frames, so any incidental CLI/status output
from AIAgent must be redirected away from stdout. Route it to stderr instead.
"""
kwargs = dict(kwargs)
kwargs.setdefault("file", sys.stderr)
print(*args, **kwargs)
这类细节说明 ACP 不是“另一个 CLI”,而是协议 adapter:它要把 editor world 的 cwd、approval、stdio 约束转成 Hermes runtime 能理解的上下文。
2. TUI Gateway:session resume 和 manual compress 都在 server 层暴露
TUI gateway 的 session.resume 会查 SessionDB,恢复 history,创建 agent,再初始化 gateway session。
@method("session.resume")
def _(rid, params: dict) -> dict:
target = params.get("session_id", "")
if not target:
return _err(rid, 4006, "session_id required")
db = _get_db()
if db is None:
return _db_unavailable_error(rid, code=5000)
found = db.get_session(target)
if not found:
found = db.get_session_by_title(target)
if found:
target = found["id"]
else:
return _err(rid, 4007, "session not found")
sid = uuid.uuid4().hex[:8]
_enable_gateway_prompts()
try:
db.reopen_session(target)
history = db.get_messages_as_conversation(target)
display_history = db.get_messages_as_conversation(
target, include_ancestors=True
)
messages = _history_to_messages(display_history)
tokens = _set_session_context(target)
try:
agent = _make_agent(sid, target, session_id=target)
finally:
_clear_session_context(tokens)
_init_session(sid, target, agent, history, cols=int(params.get("cols", 80)))
except Exception as e:
return _err(rid, 5000, f"resume failed: {e}")
return _ok(
rid,
{
"session_id": sid,
"resumed": target,
"message_count": len(messages),
"messages": messages,
"info": _session_info(agent),
},
)
session.compress 则暴露了手动压缩能力,支持 focus topic,并在压缩后重新估算 token、同步 session key、向 UI 发 session.info。
@method("session.compress")
def _(rid, params: dict) -> dict:
session, err = _sess(params, rid)
if err:
return err
if session.get("running"):
return _err(
rid, 4009, "session busy — /interrupt the current turn before /compress"
)
sid = params.get("session_id", "")
focus_topic = str(params.get("focus_topic", "") or "").strip()
try:
from agent.manual_compression_feedback import summarize_manual_compression
from agent.model_metadata import estimate_request_tokens_rough
with session["history_lock"]:
before_messages = list(session.get("history", []))
history_version = int(session.get("history_version", 0))
before_count = len(before_messages)
_agent = session["agent"]
_sys_prompt = getattr(_agent, "_cached_system_prompt", "") or ""
_tools = getattr(_agent, "tools", None) or None
before_tokens = (
estimate_request_tokens_rough(
before_messages, system_prompt=_sys_prompt, tools=_tools
)
if before_count
else 0
)
if before_count >= 4:
focus_suffix = f', focus: "{focus_topic}"' if focus_topic else ""
_status_update(
sid,
"compressing",
f"⠋ compressing {before_count} messages "
f"(~{before_tokens:,} tok){focus_suffix}…",
)
try:
removed, usage = _compress_session_history(
session,
focus_topic,
approx_tokens=before_tokens,
before_messages=before_messages,
history_version=history_version,
)
with session["history_lock"]:
messages = list(session.get("history", []))
after_count = len(messages)
# Re-read system prompt + tools after compression — _compress_context
# may have rebuilt the system prompt (_cached_system_prompt=None).
_sys_prompt_after = (
getattr(_agent, "_cached_system_prompt", "") or _sys_prompt
)
_tools_after = getattr(_agent, "tools", None) or _tools
after_tokens = (
estimate_request_tokens_rough(
messages,
system_prompt=_sys_prompt_after,
tools=_tools_after,
)
if after_count
else 0
)
agent = session["agent"]
_sync_session_key_after_compress(sid, session)
summary = summarize_manual_compression(
before_messages, messages, before_tokens, after_tokens
)
info = _session_info(agent)
_emit("session.info", sid, info)
return _ok(
rid,
{
"status": "compressed",
"removed": removed,
"before_messages": before_count,
"after_messages": after_count,
"before_tokens": before_tokens,
"after_tokens": after_tokens,
"summary": summary,
"usage": usage,
"info": info,
"messages": messages,
},
)
finally:
_status_update(sid, "ready")
except Exception as e:
return _err(rid, 5005, str(e))
从这里可以看出,外围模块不是复制 Agent 逻辑,而是围绕 AIAgent + SessionDB 做协议映射、UI 状态同步和平台约束处理。
十、几个核心算法与设计取舍
1. 工具循环预算:防止 agent 自旋
max_iterations 和 IterationBudget 是第一道闸门。Hermes 甚至对 execute_code 做了 refund,因为它被视为低成本 RPC-style 调用。这说明 Hermes 在设计上区分“模型推理 turn”和“程序化工具调用”。
2. 工具并发:性能服从一致性
并发只发生在读工具、独立路径文件工具、显式允许并发的 MCP 工具上。交互工具、未知工具、路径重叠写工具会退回顺序执行。这种策略不追求最大并行度,而追求不破坏消息顺序和文件一致性。
3. Ephemeral context:可见但不持久化
memory prefetch、plugin pre_llm_call context 被注入当前 API 请求,但不写回 messages。这把“模型当下需要知道的东西”和“会话真实历史”分离开,避免 session DB 被临时上下文污染。
4. 压缩不是删历史,而是创建续写链
压缩触发后,Hermes 不只是替换当前内存数组;它会结束旧 session、创建 child session、设置 parent_session_id。这样 resume、session_search、TUI 展示都能理解压缩前后关系。
5. AIAgent 门面保兼容,模块化文件承载演进
run_agent.py 仍然暴露很多方法,但大量方法都是 forwarder。这让外部 API 不破,同时允许内部把 conversation_loop、tool_executor、context_compressor、agent_runtime_helpers 独立演进。
结语:Hermes Agent 的本质是一套“可持久化的工具调用操作系统”
如果只从 README 看,Hermes Agent 的卖点是多模型、多平台、技能、记忆、定时任务、远程运行。但从源码看,它的核心价值在更底层:它把 Agent 执行拆成了一套可组合的 runtime primitives。
- 入口层负责把不同用户界面统一成 session turn。
AIAgentruntime 负责把模型、工具、回调、状态、预算放进同一个执行上下文。- conversation loop 负责模型与工具的闭环状态机。
- tool registry/executor 负责工具 schema、权限、hook、并发和结果回填。
- SessionDB 负责长期连续性,而不是让内存里的
messages成为唯一真相。 - ContextEngine/Compressor 负责在长上下文任务里保留“可继续工作”的状态摘要。
- ACP/TUI/Gateway 则只是不同协议世界通向同一 runtime 的 adapter。
这套架构最值得借鉴的地方不是“功能多”,而是它对 Agent 工程化问题的分层处理:协议、模型、工具、状态、压缩、UI 都有自己的边界;主循环只维持核心状态机。对任何想构建生产级 AI Agent 的团队来说,Hermes Agent 提供了一个很好的源码样本:先把 Agent 当 runtime 设计,再谈具体能力。
「真诚赞赏,手留余香」
真诚赞赏,手留余香
使用微信扫描二维码完成支付