232 lines
7.2 KiB
Python
232 lines
7.2 KiB
Python
from __future__ import annotations
|
||
|
||
import logging
|
||
from typing import Any, List, Optional
|
||
|
||
import litellm
|
||
|
||
from config import (
|
||
API_BASE,
|
||
API_KEY,
|
||
MODEL_ID,
|
||
AGENT_ROLES,
|
||
MAX_TOKENS,
|
||
TEMPERATURE,
|
||
MAX_RETRIES,
|
||
RETRY_INITIAL_DELAY,
|
||
RETRY_BACKOFF_FACTOR,
|
||
RETRY_MAX_DELAY,
|
||
)
|
||
from utils.retry import retry
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class Agent:
|
||
"""单个评审角色(Agent)。
|
||
|
||
说明:
|
||
- 每个 Agent 仅负责基于 system prompt + 用户输入生成一次回复。
|
||
- 不在 Agent 内部保存历史(历史由外部管理器控制)。
|
||
- 全部角色统一使用同一个 API Key(符合课程对 Key 管理的简化要求)。
|
||
"""
|
||
|
||
def __init__(self, role_name: str):
|
||
if role_name not in AGENT_ROLES:
|
||
raise ValueError(f"未知角色:{role_name}")
|
||
|
||
self.role_name = role_name
|
||
self.system_prompt = AGENT_ROLES.get(role_name, "")
|
||
self.model = MODEL_ID
|
||
self.api_base = API_BASE
|
||
self.api_key = API_KEY
|
||
|
||
def reset(self):
|
||
"""保留接口:当前版本 Agent 不维护历史。"""
|
||
return
|
||
|
||
@retry(
|
||
max_retries=MAX_RETRIES,
|
||
initial_delay=RETRY_INITIAL_DELAY,
|
||
backoff_factor=RETRY_BACKOFF_FACTOR,
|
||
max_delay=RETRY_MAX_DELAY,
|
||
retry_exceptions=(Exception,),
|
||
)
|
||
def _completion(self, messages: list[dict[str, str]], stream: bool = False) -> Any:
|
||
return litellm.completion(
|
||
model=self.model,
|
||
api_base=self.api_base,
|
||
api_key=self.api_key,
|
||
messages=messages,
|
||
max_tokens=MAX_TOKENS,
|
||
temperature=TEMPERATURE,
|
||
stream=stream,
|
||
)
|
||
|
||
def generate_response(
|
||
self,
|
||
prompt: str,
|
||
stream: bool = False,
|
||
external_history: Optional[List[dict[str, str]]] = None,
|
||
):
|
||
"""生成回答。
|
||
|
||
参数:
|
||
- prompt: 用户输入
|
||
- stream: 是否流式输出
|
||
- external_history: 预留的外部历史(本项目单轮评审默认不使用)
|
||
"""
|
||
|
||
messages: list[dict[str, str]] = [{"role": "system", "content": self.system_prompt}]
|
||
|
||
if external_history:
|
||
messages.extend(external_history)
|
||
|
||
messages.append({"role": "user", "content": prompt})
|
||
|
||
try:
|
||
response = self._completion(messages, stream=stream)
|
||
except Exception as e:
|
||
logger.exception("API 调用失败(已重试仍失败):%s", e)
|
||
if stream:
|
||
raise RuntimeError(f"API 调用失败:{e}") from e
|
||
return f"❌ API 调用失败:{e}"
|
||
|
||
if stream:
|
||
return response
|
||
|
||
try:
|
||
if not hasattr(response, "choices") or not response.choices:
|
||
raise ValueError("响应缺少 choices")
|
||
|
||
first = response.choices[0]
|
||
content = getattr(getattr(first, "message", None), "content", None)
|
||
if content is None:
|
||
content = getattr(first, "text", None)
|
||
|
||
if not content or not str(content).strip():
|
||
raise ValueError("响应内容为空")
|
||
|
||
if len(str(content).strip()) < 10:
|
||
raise ValueError("响应内容过短,疑似生成失败")
|
||
|
||
return str(content)
|
||
except Exception as e:
|
||
logger.exception("API 响应结构异常:%s", e)
|
||
return f"❌ API 响应结构异常:{e}"
|
||
|
||
|
||
class DebateManager:
|
||
"""评审管理器(保留类名以兼容现有 UI 调用)。
|
||
|
||
当前版本约束:
|
||
- 固定单轮评审:每个角色各生成一次“初始评审意见”。
|
||
- 不做多轮互动,不维护上下文摘要(减少运行时间与 API 调用次数)。
|
||
|
||
数据结构说明:
|
||
- debate_history: 仍沿用历史字段名,便于 UI 复用。
|
||
其中第一条为 init 信息,第二条为 round=1 的初始观点。
|
||
"""
|
||
|
||
def __init__(self):
|
||
self.agents: dict[str, Agent] = {}
|
||
self.debate_history: list[dict[str, Any]] = []
|
||
|
||
def add_agent(self, role_name: str) -> None:
|
||
if role_name not in self.agents:
|
||
self.agents[role_name] = Agent(role_name)
|
||
|
||
def remove_agent(self, role_name: str) -> None:
|
||
self.agents.pop(role_name, None)
|
||
|
||
def reset(self) -> None:
|
||
for agent in self.agents.values():
|
||
agent.reset()
|
||
self.debate_history = []
|
||
|
||
def start_debate(self, topic: str, rounds: int = 1):
|
||
"""开始评审(单轮)。
|
||
|
||
参数 rounds 为兼容 UI 保留,但当前固定只执行 1 轮。
|
||
"""
|
||
|
||
self.debate_history.append(
|
||
{
|
||
"round": "init",
|
||
"topic": topic,
|
||
"participants": list(self.agents.keys()),
|
||
}
|
||
)
|
||
|
||
opinions_round_1: dict[str, str] = {}
|
||
for role_name, agent in self.agents.items():
|
||
prompt = (
|
||
f"你是{agent.role_name}。请对以下方案给出初始评审意见。\n"
|
||
"要求:\n"
|
||
"- 用 5-8 条要点(列表)输出\n"
|
||
"- 每条尽量具体,避免空话\n"
|
||
"- 总字数尽量控制在 250~400 字\n\n"
|
||
f"方案内容:\n{topic}"
|
||
)
|
||
response = agent.generate_response(prompt, external_history=None)
|
||
opinions_round_1[role_name] = response
|
||
|
||
self.debate_history.append(
|
||
{
|
||
"round": 1,
|
||
"type": "initial_opinions",
|
||
"opinions": opinions_round_1,
|
||
}
|
||
)
|
||
|
||
return self.debate_history
|
||
|
||
def generate_decision_points(self) -> str:
|
||
"""从评审历史生成“可执行的决策摘要”。"""
|
||
|
||
if not self.debate_history:
|
||
return ""
|
||
|
||
review_context = "\n".join(
|
||
[
|
||
f"Round {round_data['round']} ({round_data.get('type', 'review')}):\n"
|
||
+ "\n".join(
|
||
[
|
||
f"{role}: {opinion}"
|
||
for role, opinion in round_data.get("opinions", {}).items()
|
||
]
|
||
)
|
||
for round_data in self.debate_history[1:]
|
||
]
|
||
)
|
||
|
||
template = """你是一个严格的方案评审秘书,请把评审内容整理成一份“可执行的决策摘要”。
|
||
|
||
【输出格式要求(必须严格遵守 Markdown 标题):】
|
||
|
||
# 一句话结论
|
||
用一句话给出结论(推荐/谨慎推进/不建议),并说明原因。
|
||
|
||
# 关键决策要点(Top 5)
|
||
用 1-5 编号,每条包含:问题 → 各方分歧/共识 → 建议决策。
|
||
|
||
# 主要风险(Top 3)
|
||
用 1-3 编号,每条包含:风险描述 → 影响 → 缓解方案。
|
||
|
||
# 下一步行动清单
|
||
用勾选列表(- [ ])列出 4-8 条下一步动作,尽量可执行、可验证。
|
||
|
||
# 需要进一步澄清的问题
|
||
列出 3-6 个必须向需求方追问的问题。
|
||
|
||
【要求】
|
||
- 文字精炼,避免空话;每条尽量具体。
|
||
- 不要复述全部评审,只提炼结论。
|
||
- 输出必须适合普通人快速阅读。
|
||
"""
|
||
|
||
prompt = f"{template}\n\n【评审内容】\n{review_context}"
|
||
|
||
decision_agent = Agent("business_analyst")
|
||
return decision_agent.generate_response(prompt)
|