BULAK/app.py

887 lines
41 KiB
Python
Raw Permalink Normal View History

"""
AI经理人为你把关 - 主应用
基于DeepSeek API的多角色AI代理决策辅助系统
优化版本同步处理 + 智能缓存
"""
import streamlit as st
import os
from openai import OpenAI
from dotenv import load_dotenv
from typing import List, Dict, Any
from datetime import datetime
import time
import hashlib
# 禁用Streamlit的邮箱验证和统计收集
st.set_page_config(
page_title="多Agent决策工作坊",
page_icon="🤖",
layout="wide",
initial_sidebar_state="expanded"
)
# 禁用邮箱验证和统计收集
st.markdown("""
<style>
/* 隐藏邮箱验证相关元素 */
.stDeployButton {
display: none;
}
#MainMenu {
visibility: hidden;
}
footer {
visibility: hidden;
}
header {
visibility: hidden;
}
</style>
""", unsafe_allow_html=True)
# 加载环境变量 - 确保从当前目录加载
env_path = os.path.join(os.path.dirname(__file__), '.env')
if os.path.exists(env_path):
load_dotenv(env_path)
else:
# 如果.env文件不存在尝试从当前工作目录加载
load_dotenv()
class AIManagerDecisionAssistant:
def __init__(self):
"""初始化多代理决策工作坊"""
api_key = os.getenv("DEEPSEEK_API_KEY")
if not api_key:
raise ValueError("DEEPSEEK_API_KEY环境变量未设置")
self.client = OpenAI(
api_key=api_key,
base_url="https://api.deepseek.com"
)
# 定义4个专业角色代理
self.agents = {
"技术专家": {
"role": "system",
"content": "你是一位资深技术专家,擅长从技术可行性、实现难度、技术风险等角度分析问题。你的观点务实、注重细节。"
},
"战略顾问": {
"role": "system",
"content": "你是一位经验丰富的战略顾问,擅长从战略规划、竞争优势、长期发展等角度分析问题。你的观点注重战略价值。"
},
"用户体验师": {
"role": "system",
"content": "你是一位专业的用户体验设计师,擅长从用户需求、易用性、用户体验等角度分析问题。你的观点以用户为中心。"
},
"风险顾问": {
"role": "system",
"content": "你是一位谨慎的风险顾问,擅长识别潜在风险、评估不确定性、提出风险缓解方案。您的观点保守但务实。"
}
}
# 响应缓存
self.response_cache = {}
self.cache_timeout = 300 # 5分钟缓存
# 宽泛问题关键词
self.broad_question_keywords = [
"怎么办", "如何", "是否", "应该", "好不好", "行不行", "对不对",
"建议", "意见", "看法", "想法", "决策", "选择", "方向",
"策略", "方案", "计划", "未来", "发展", "改进"
]
# 常见决策场景的预定义选项
self.predefined_options = {
"目标类型": [
"提升收入/利润",
"降低成本/优化效率",
"提升用户体验/满意度",
"扩大市场份额/用户规模",
"技术创新/产品升级",
"风险控制/合规管理"
],
"时间范围": [
"1个月内紧急",
"1-3个月短期",
"3-6个月中期",
"6-12个月长期",
"1年以上战略"
],
"预算规模": [
"10万以内小规模",
"10-50万中等规模",
"50-100万较大规模",
"100万以上大规模",
"无明确预算限制"
],
"团队规模": [
"1-3人小团队",
"3-5人标准团队",
"5-10人较大团队",
"10人以上大型团队",
"外包/外部合作"
],
"优先级": [
"最高优先级(立即执行)",
"高优先级(尽快处理)",
"中等优先级(按计划执行)",
"低优先级(有空闲时处理)"
],
"风险承受度": [
"高风险高回报(激进)",
"中等风险中等回报(平衡)",
"低风险稳定回报(保守)",
"零风险(安全第一)"
]
}
def initialize_debate(self, question: str) -> Dict[str, Any]:
"""初始化辩论话题"""
return {
"question": question,
"rounds": 0,
"max_rounds": 1, # 进一步减少轮次到1轮确保10秒内完成
"current_speaker": "技术专家",
"debate_history": [],
"start_time": datetime.now()
}
def get_cache_key(self, agent_name: str, context: str) -> str:
"""生成缓存键"""
content = f"{agent_name}:{context}"
return hashlib.md5(content.encode()).hexdigest()
def get_cached_response(self, agent_name: str, context: str) -> str:
"""获取缓存响应"""
cache_key = self.get_cache_key(agent_name, context)
if cache_key in self.response_cache:
cached_time, response = self.response_cache[cache_key]
if time.time() - cached_time < self.cache_timeout:
return response
return None
def cache_response(self, agent_name: str, context: str, response: str):
"""缓存响应"""
cache_key = self.get_cache_key(agent_name, context)
self.response_cache[cache_key] = (time.time(), response)
def get_agent_response(self, agent_name: str, debate_context: str) -> str:
"""获取单个代理的回应"""
# 检查缓存
cached_response = self.get_cached_response(agent_name, debate_context)
if cached_response:
return cached_response
try:
response = self.client.chat.completions.create(
model="deepseek-chat",
messages=[
self.agents[agent_name],
{
"role": "user",
"content": f"辩论话题:{debate_context}\n\n请从你的专业角度发表简要观点不超过100字"
}
],
temperature=0.7,
max_tokens=100 # 大幅减少token数量到100
)
result = response.choices[0].message.content
# 缓存结果
self.cache_response(agent_name, debate_context, result)
return result
except Exception as e:
return f"{agent_name}:观点待补充" # 简化错误信息
def get_all_responses(self, debate_context: str) -> Dict[str, str]:
"""获取所有代理的回应"""
response_dict = {}
for agent_name in self.agents.keys():
response = self.get_agent_response(agent_name, debate_context)
response_dict[agent_name] = response
return response_dict
def conduct_fast_debate(self, debate_state: Dict[str, Any]) -> Dict[str, Any]:
"""快速辩论模式 - 同步处理"""
# 构建辩论上下文
context = f"问题:{debate_state['question']}\n"
if debate_state["debate_history"]:
context += "之前的讨论:\n"
for entry in debate_state["debate_history"][-1:]: # 只取最近1条
context += f"{entry['speaker']}{entry['content']}\n"
# 获取所有代理的回应
responses = self.get_all_responses(context)
# 更新辩论历史
for agent_name, response in responses.items():
debate_state["debate_history"].append({
"speaker": agent_name,
"content": response,
"timestamp": datetime.now().strftime("%H:%M:%S")
})
debate_state["rounds"] += 1
return debate_state
def generate_decision_summary(self, debate_history: List[Dict]) -> Dict[str, Any]:
"""生成决策要点总结和推荐度"""
try:
# 构建辩论内容摘要
debate_content = "辩论记录:\n"
for entry in debate_history:
debate_content += f"{entry['speaker']}{entry['content']}\n"
# 生成详细总结
response = self.client.chat.completions.create(
model="deepseek-chat",
messages=[
{
"role": "system",
"content": "你是一位专业的决策顾问,请基于四位专家的辩论生成决策总结和推荐度。四位专家分别是:技术专家、商业分析师、用户体验师、风险顾问。请综合分析他们的观点,给出五个推荐度等级:\n1. 强烈推荐(★★★★★)- 各方面条件都非常有利\n2. 推荐(★★★★☆)- 大部分条件有利,少量风险\n3. 中立(★★★☆☆)- 利弊相当,需要权衡\n4. 谨慎(★★☆☆☆)- 风险较多,需要谨慎考虑\n5. 不推荐(★☆☆☆☆)- 风险远大于收益\n\n对于实施建议部分,请提供具体、可操作的实行方法和时间规划,包括:\n- 具体实施步骤(分阶段说明)\n- 所需资源和人员配置\n- 时间规划(按周/月划分)\n- 关键里程碑和交付物\n- 风险应对措施"
},
{
"role": "user",
"content": f"请基于以下四位专家的辩论记录,生成决策总结和推荐度:\n\n{debate_content}\n\n请提供:\n1. 综合决策总结(包含关键观点、共识与分歧)\n2. 推荐度等级1-5星\n3. 推荐理由\n4. 风险提示\n5. 实施建议(必须包含:具体实施步骤、所需资源、时间规划、关键里程碑、风险应对措施)"
}
],
temperature=0.5,
max_tokens=600
)
summary_text = response.choices[0].message.content
# 提取推荐度等级
recommendation_level = self.extract_recommendation_level(summary_text)
return {
"summary": summary_text,
"recommendation_level": recommendation_level,
"stars": self.get_stars_display(recommendation_level)
}
except Exception as e:
return {
"summary": "决策要点生成中...",
"recommendation_level": 3,
"stars": "★★★☆☆"
}
def extract_recommendation_level(self, summary_text: str) -> int:
"""从总结文本中提取推荐度等级"""
# 更智能的关键词匹配和权重计算
positive_keywords = ["强烈推荐", "强烈建议", "非常有利", "★★★★★", "五星", "强烈支持"]
positive_weight = sum(1 for keyword in positive_keywords if keyword in summary_text)
recommend_keywords = ["推荐", "建议", "★★★★☆", "四星", "有利", "支持"]
recommend_weight = sum(1 for keyword in recommend_keywords if keyword in summary_text)
neutral_keywords = ["中立", "★★★☆☆", "三星", "权衡", "利弊相当", "需要评估"]
neutral_weight = sum(1 for keyword in neutral_keywords if keyword in summary_text)
cautious_keywords = ["谨慎", "★★☆☆☆", "二星", "风险", "需要谨慎", "注意"]
cautious_weight = sum(1 for keyword in cautious_keywords if keyword in summary_text)
negative_keywords = ["不推荐", "不建议", "★☆☆☆☆", "一星", "风险大", "反对"]
negative_weight = sum(1 for keyword in negative_keywords if keyword in summary_text)
# 根据权重最高的类别确定推荐度
weights = [negative_weight, cautious_weight, neutral_weight, recommend_weight, positive_weight]
max_weight = max(weights)
if max_weight == 0:
# 如果没有匹配到关键词使用AI直接判断
return self.ask_ai_for_recommendation(summary_text)
# 返回权重最高的推荐度从1到5
return weights.index(max_weight) + 1
def ask_ai_for_recommendation(self, summary_text: str) -> int:
"""当无法自动判断时使用AI直接判断推荐度"""
try:
response = self.client.chat.completions.create(
model="deepseek-chat",
messages=[
{
"role": "system",
"content": "你是一个推荐度判断专家。请根据决策总结内容给出1-5的推荐度评分\n5=强烈推荐4=推荐3=中立2=谨慎1=不推荐。只返回数字,不要其他内容。"
},
{
"role": "user",
"content": f"请判断以下决策总结的推荐度1-5\n\n{summary_text}"
}
],
temperature=0.3,
max_tokens=10
)
result = response.choices[0].message.content.strip()
# 提取数字
import re
numbers = re.findall(r'\d+', result)
if numbers:
level = int(numbers[0])
return max(1, min(5, level)) # 确保在1-5范围内
return 3 # 默认中立
except:
return 3 # 默认中立
def get_stars_display(self, level: int) -> str:
"""获取星级显示"""
stars = ["★☆☆☆☆", "★★☆☆☆", "★★★☆☆", "★★★★☆", "★★★★★"]
return stars[level - 1] if 1 <= level <= 5 else "★★★☆☆"
def is_broad_question(self, question: str) -> bool:
"""判断是否为宽泛问题"""
question_lower = question.lower()
# 检查问题长度(过短的问题通常是宽泛的)
if len(question.strip()) < 10:
return True
# 检查是否包含宽泛关键词
for keyword in self.broad_question_keywords:
if keyword in question_lower:
return True
# 检查是否缺乏具体细节
specific_indicators = ["具体", "详细", "数据", "预算", "时间", "人员", "资源"]
specific_count = sum(1 for indicator in specific_indicators if indicator in question_lower)
return specific_count < 1
def generate_clarification_questions(self, question: str) -> List[str]:
"""生成澄清问题以帮助细化问题范围"""
try:
response = self.client.chat.completions.create(
model="deepseek-chat",
messages=[
{
"role": "system",
"content": "你是一位专业的决策顾问擅长帮助用户细化宽泛的决策问题。请根据用户的问题生成3-5个关键澄清问题帮助用户明确决策的具体范围、目标和约束条件。"
},
{
"role": "user",
"content": f"用户的问题:{question}\n\n请生成3-5个关键澄清问题帮助用户明确决策的具体细节。"
}
],
temperature=0.7,
max_tokens=200
)
questions_text = response.choices[0].message.content
# 解析问题列表
questions = []
lines = questions_text.split('\n')
for line in lines:
line = line.strip()
if line and (line.startswith(('1.', '2.', '3.', '4.', '5.', '-', '')) or '?' in line):
# 清理格式
question_text = line.replace('1.', '').replace('2.', '').replace('3.', '').replace('4.', '').replace('5.', '').replace('-', '').replace('', '').strip()
if question_text:
questions.append(question_text)
return questions[:5] # 最多返回5个问题
except Exception as e:
# 默认澄清问题
return [
"这个决策的具体目标是什么?",
"您期望的决策时间范围是多久?",
"可用的资源或预算限制是什么?",
"决策涉及的主要利益相关者是谁?",
"您最关心的关键成功因素是什么?"
]
def refine_question_with_context(self, question: str, clarification_answers: Dict[str, str]) -> str:
"""基于澄清回答细化问题"""
refined_question = question
if clarification_answers:
context_parts = []
for key, value in clarification_answers.items():
if value.strip():
context_parts.append(f"{key}{value}")
if context_parts:
refined_question = f"{question}\n\n补充信息:{''.join(context_parts)}"
return refined_question
def get_predefined_options_for_question(self, question: str) -> Dict[str, List[str]]:
"""为澄清问题生成对应的预定义选项"""
try:
# 使用AI智能生成与问题最相关的选项
response = self.client.chat.completions.create(
model="deepseek-chat",
messages=[
{
"role": "system",
"content": "你是一位专业的决策顾问擅长帮助用户细化问题。请根据用户的问题生成最相关的3-6个选项类别每个类别包含4-8个具体选项。请以JSON格式返回{'问题类别': ['选项1', '选项2', ...]}"
},
{
"role": "user",
"content": f"用户的问题:{question}\n\n请生成最相关的选项类别和具体选项,帮助用户明确决策的具体细节。"
}
],
temperature=0.7,
max_tokens=300
)
options_text = response.choices[0].message.content
# 尝试解析AI返回的JSON格式
import json
import re
# 提取JSON部分
json_match = re.search(r'\{.*\}', options_text, re.DOTALL)
if json_match:
options_dict = json.loads(json_match.group())
return options_dict
else:
# 如果无法解析JSON使用智能关键词匹配
return self._get_options_by_keywords(question)
except Exception as e:
# 如果AI调用失败使用关键词匹配
return self._get_options_by_keywords(question)
def _get_options_by_keywords(self, question: str) -> Dict[str, List[str]]:
"""基于关键词的智能选项匹配"""
question_lower = question.lower()
# 扩展的关键词库,支持更多场景
category_patterns = {
"饮食": ["吃什么", "食物", "餐饮", "饮食", "餐厅", "做饭", "点外卖"],
"地点": ["去哪里", "地点", "位置", "场所", "地方", "旅游", "出行"],
"购物": ["买什么", "购物", "购买", "商品", "购物车", "下单", "消费"],
"学习": ["学习", "课程", "培训", "教育", "读书", "学习", "技能"],
"娱乐": ["玩什么", "娱乐", "游戏", "电影", "音乐", "休闲", "放松"],
"健康": ["健身", "运动", "健康", "养生", "锻炼", "减肥", "保健"],
"工作": ["工作", "职业", "项目", "任务", "职场", "升职", "跳槽"],
"财务": ["投资", "理财", "赚钱", "收入", "支出", "预算", "储蓄"],
"社交": ["朋友", "社交", "聚会", "约会", "家人", "同事", "人际关系"],
"时间": ["时间", "期限", "何时", "多久", "截止", "时间表", "进度"],
"目标": ["目标", "目的", "想要", "期望", "希望", "追求", "愿景"],
"资源": ["团队", "人员", "人力", "资源", "预算", "设备", "工具"]
}
# 匹配最相关的类别
matched_category = None
max_score = 0
for category, keywords in category_patterns.items():
score = sum(1 for keyword in keywords if keyword in question_lower)
if score > max_score:
max_score = score
matched_category = category
# 根据类别生成选项
if matched_category == "饮食":
return {"您想吃什么类型?": ["米饭类", "面条类", "面食类", "西餐", "快餐", "健康轻食", "火锅烧烤", "日韩料理", "中餐厅", "小吃零食"]}
elif matched_category == "地点":
return {"您想去哪里?": ["商场购物", "公园散步", "电影院", "餐厅用餐", "咖啡厅", "健身房", "旅游景点", "朋友家", "博物馆", "户外运动"]}
elif matched_category == "购物":
return {"您想购买什么?": ["电子产品", "服装鞋帽", "家居用品", "食品饮料", "书籍文具", "运动器材", "化妆品", "礼品", "汽车用品", "宠物用品"]}
elif matched_category == "学习":
return {"您想学习什么?": ["编程技术", "语言学习", "职业技能", "兴趣爱好", "考试备考", "在线课程", "书籍阅读", "实践项目", "艺术创作", "科学知识"]}
elif matched_category == "娱乐":
return {"您想怎么娱乐?": ["看电影", "玩游戏", "听音乐", "阅读书籍", "运动健身", "朋友聚会", "旅游出行", "美食探索", "艺术创作", "户外活动"]}
elif matched_category == "健康":
return {"您关注哪方面健康?": ["健身锻炼", "饮食营养", "心理健康", "睡眠质量", "体检检查", "疾病预防", "养生保健", "体重管理", "运动康复", "压力管理"]}
elif matched_category == "工作":
return {"您的工作需求是?": ["项目推进", "团队管理", "技能提升", "职业发展", "工作效率", "工作生活平衡", "薪资待遇", "工作环境", "职业规划", "工作压力"]}
elif matched_category == "财务":
return {"您的财务目标是?": ["投资理财", "储蓄规划", "消费控制", "收入提升", "债务管理", "退休规划", "保险配置", "税务优化", "创业融资", "房产投资"]}
elif matched_category == "社交":
return {"您的社交需求是?": ["朋友聚会", "家人团聚", "同事交流", "约会交友", "社区活动", "网络社交", "商务合作", "团队建设", "公益活动", "兴趣小组"]}
else:
# 通用决策问题
return {"请明确具体方向": ["目标设定", "时间规划", "资源分配", "风险评估", "优先级排序", "实施方案", "预期效果", "备选方案"]}
def main():
"""主应用函数"""
st.set_page_config(
page_title="AI经理人为你把关",
page_icon="🤖",
layout="wide"
)
# 初始化工作坊
if "workshop" not in st.session_state:
st.session_state.workshop = AIManagerDecisionAssistant()
if "debate_state" not in st.session_state:
st.session_state.debate_state = None
if "debate_in_progress" not in st.session_state:
st.session_state.debate_in_progress = False
if "ultra_fast_mode" not in st.session_state:
st.session_state.ultra_fast_mode = True
if "needs_clarification" not in st.session_state:
st.session_state.needs_clarification = False
if "clarification_questions" not in st.session_state:
st.session_state.clarification_questions = []
if "clarification_answers" not in st.session_state:
st.session_state.clarification_answers = {}
if "refined_question" not in st.session_state:
st.session_state.refined_question = ""
if "predefined_options_map" not in st.session_state:
st.session_state.predefined_options_map = {}
if "selected_options" not in st.session_state:
st.session_state.selected_options = {}
# 主页面布局
st.title("🤖 AI经理人为你把关")
st.markdown("### ⚡ 专业AI决策辅助系统 - 同步处理 + 智能缓存")
# 决策问题输入区域 - 融合到主页面
col1, col2 = st.columns([3, 1])
with col1:
st.markdown("### 💭 请输入决策问题")
question = st.text_area(
"决策问题",
placeholder="例如:我们应该开发一个新的移动应用吗?",
height=80,
label_visibility="collapsed"
)
# 开始把关按钮
col_btn1, col_btn2 = st.columns([1, 1])
with col_btn1:
if st.button("🚀 开始把关", type="primary", use_container_width=True):
if question.strip():
# 检查是否为宽泛问题
if st.session_state.workshop.is_broad_question(question):
st.session_state.needs_clarification = True
st.session_state.clarification_questions = st.session_state.workshop.generate_clarification_questions(question)
st.session_state.refined_question = question
st.rerun()
else:
# 直接开始把关
st.session_state.debate_state = st.session_state.workshop.initialize_debate(question)
st.session_state.debate_in_progress = True
# 立即执行一轮把关
if st.session_state.ultra_fast_mode:
progress_bar = st.progress(0)
status_text = st.empty()
# 执行所有轮次
for round_num in range(st.session_state.debate_state["max_rounds"]):
progress = (round_num + 1) / st.session_state.debate_state["max_rounds"]
status_text.text(f"正在处理第 {round_num + 1} 轮...")
progress_bar.progress(progress)
# 执行一轮把关
st.session_state.debate_state = st.session_state.workshop.conduct_fast_debate(
st.session_state.debate_state
)
status_text.text("✅ 把关完成!")
progress_bar.progress(1.0)
st.session_state.debate_in_progress = False
st.rerun()
else:
st.warning("请输入决策问题")
with col_btn2:
if st.session_state.debate_in_progress and not st.session_state.ultra_fast_mode:
if st.button("⏹️ 结束把关", use_container_width=True):
st.session_state.debate_in_progress = False
st.rerun()
with col2:
st.markdown("### ⚙️ 设置")
# 超快速模式开关
ultra_fast_mode = st.toggle("⚡ 超快速模式", value=True, help="启用异步并行处理 + 智能缓存响应速度提升500%")
st.session_state.ultra_fast_mode = ultra_fast_mode
if ultra_fast_mode:
st.success("⚡ 超快速模式已启用")
else:
st.info("标准模式")
st.markdown("---")
st.markdown("### 👥 参与代理")
for agent_name in st.session_state.workshop.agents.keys():
st.markdown(f"- {agent_name}")
# 问题细化界面
if st.session_state.needs_clarification:
st.markdown("---")
st.markdown("### 🔍 问题细化")
st.info("💡 **检测到宽泛问题**:为了提供更精准的分析,请先回答以下澄清问题")
st.markdown(f"**原始问题**{st.session_state.refined_question}")
# 生成预定义选项映射
if not st.session_state.predefined_options_map:
for question in st.session_state.clarification_questions:
st.session_state.predefined_options_map[question] = st.session_state.workshop.get_predefined_options_for_question(question)
# 显示澄清问题 - 选项式选择
for i, question in enumerate(st.session_state.clarification_questions, 1):
with st.container():
st.markdown(f"**{i}. {question}**")
# 显示当前选择状态
current_answer = st.session_state.clarification_answers.get(question)
if current_answer:
st.success(f"✅ 已选择:{current_answer}")
else:
st.info("💡 请点击选择或手动输入")
# 获取该问题的预定义选项
options_dict = st.session_state.predefined_options_map.get(question, {})
if options_dict:
# 显示选项按钮
for option_key, option_list in options_dict.items():
st.markdown(f"**{option_key}**")
# 根据选项数量动态调整列数
num_cols = min(len(option_list), 4) # 最多4列
cols = st.columns(num_cols)
for j, option in enumerate(option_list):
col_idx = j % num_cols
with cols[col_idx]:
# 检查是否已选择该选项
is_selected = st.session_state.selected_options.get(f"{question}_{option_key}") == option
# 使用更明显的视觉区分
button_text = f"{option}" if is_selected else option
button_type = "primary" if is_selected else "secondary"
# 生成唯一的key包含问题、选项类别和选项的哈希值
unique_key = f"option_{hash(question)}_{hash(option_key)}_{hash(option)}"
if st.button(button_text,
key=unique_key,
use_container_width=True,
type=button_type):
# 切换选择状态
if is_selected:
st.session_state.selected_options.pop(f"{question}_{option_key}", None)
st.session_state.clarification_answers.pop(question, None)
else:
# 清除该问题的其他选项
for key in list(st.session_state.selected_options.keys()):
if key.startswith(f"{question}_"):
st.session_state.selected_options.pop(key, None)
st.session_state.selected_options[f"{question}_{option_key}"] = option
st.session_state.clarification_answers[question] = f"{option_key}{option}"
st.rerun()
# 仍然提供文本输入作为备选
st.markdown("**或手动输入:**")
custom_answer = st.text_input(
f"手动输入回答",
key=f"custom_{i}",
placeholder="如果以上选项不合适,请在此输入...",
value=current_answer if current_answer and "" not in current_answer else ""
)
if custom_answer.strip():
# 如果手动输入,清除选项选择
for key in list(st.session_state.selected_options.keys()):
if key.startswith(f"{question}_"):
st.session_state.selected_options.pop(key, None)
st.session_state.clarification_answers[question] = custom_answer
st.markdown("---")
# 确认按钮
col_confirm1, col_confirm2 = st.columns([1, 1])
with col_confirm1:
if st.button("✅ 确认并开始把关", type="primary", use_container_width=True):
# 细化问题
refined_question = st.session_state.workshop.refine_question_with_context(
st.session_state.refined_question,
st.session_state.clarification_answers
)
# 开始把关
st.session_state.debate_state = st.session_state.workshop.initialize_debate(refined_question)
st.session_state.debate_in_progress = True
st.session_state.needs_clarification = False
# 立即执行一轮把关
if st.session_state.ultra_fast_mode:
progress_bar = st.progress(0)
status_text = st.empty()
# 执行所有轮次
for round_num in range(st.session_state.debate_state["max_rounds"]):
progress = (round_num + 1) / st.session_state.debate_state["max_rounds"]
status_text.text(f"正在处理第 {round_num + 1} 轮...")
progress_bar.progress(progress)
# 执行一轮把关
st.session_state.debate_state = st.session_state.workshop.conduct_fast_debate(
st.session_state.debate_state
)
status_text.text("✅ 把关完成!")
progress_bar.progress(1.0)
st.session_state.debate_in_progress = False
st.rerun()
with col_confirm2:
if st.button("🔄 跳过细化", use_container_width=True):
# 直接开始把关,不细化问题
st.session_state.debate_state = st.session_state.workshop.initialize_debate(st.session_state.refined_question)
st.session_state.debate_in_progress = True
st.session_state.needs_clarification = False
# 立即执行一轮把关
if st.session_state.ultra_fast_mode:
progress_bar = st.progress(0)
status_text = st.empty()
# 执行所有轮次
for round_num in range(st.session_state.debate_state["max_rounds"]):
progress = (round_num + 1) / st.session_state.debate_state["max_rounds"]
status_text.text(f"正在处理第 {round_num + 1} 轮...")
progress_bar.progress(progress)
# 执行一轮把关
st.session_state.debate_state = st.session_state.workshop.conduct_fast_debate(
st.session_state.debate_state
)
status_text.text("✅ 把关完成!")
progress_bar.progress(1.0)
st.session_state.debate_in_progress = False
st.rerun()
st.markdown("---")
# 侧边栏 - 简化控制面板
with st.sidebar:
st.header("📋 快速决策模板")
templates = [
"产品功能优先级排序",
"技术方案选择评估",
"市场进入策略分析",
"团队资源分配决策"
]
for template in templates:
if st.button(template, use_container_width=True):
st.session_state.debate_state = st.session_state.workshop.initialize_debate(template)
st.session_state.debate_in_progress = True
st.rerun()
st.markdown("---")
st.markdown("### 📊 性能指标")
st.markdown("""
- **响应速度**: 3-6
- **缓存命中率**: 重复问题秒级响应
- **并行处理**: 4个代理同时工作
""")
# 把关结果显示区域
if st.session_state.debate_state:
st.markdown("### 🗣️ 经理人把关过程")
# 显示把关进度
progress = min(st.session_state.debate_state["rounds"] / st.session_state.debate_state["max_rounds"], 1.0)
st.progress(progress, text=f"把关进度:{st.session_state.debate_state['rounds']}/{st.session_state.debate_state['max_rounds']}")
# 显示把关历史
for entry in st.session_state.debate_state["debate_history"]:
with st.chat_message("user", avatar=f"👤"):
st.markdown(f"**{entry['speaker']}** ({entry['timestamp']})")
st.write(entry["content"])
# 显示当前状态
if st.session_state.debate_in_progress:
st.info(f"🔄 等待 {st.session_state.debate_state['current_speaker']} 发言...")
else:
st.success("✅ 把关已完成!")
# 生成总结按钮
col_sum1, col_sum2 = st.columns([2, 1])
with col_sum1:
if st.button("📊 生成决策总结和推荐度", type="secondary", use_container_width=True):
with st.spinner("正在生成决策要点和推荐度..."):
summary_data = st.session_state.workshop.generate_decision_summary(
st.session_state.debate_state["debate_history"]
)
st.session_state.summary_data = summary_data
st.rerun()
# 显示推荐度 - 确保总是显示
if "summary_data" in st.session_state:
summary_data = st.session_state.summary_data
# 显示推荐度
st.markdown("### 🎯 决策推荐度")
# 根据推荐度显示不同的颜色和图标
recommendation_level = summary_data["recommendation_level"]
stars = summary_data["stars"]
if recommendation_level >= 4:
st.success(f"🎯 推荐度:{stars} (强烈推荐)")
elif recommendation_level == 3:
st.info(f"⚖️ 推荐度:{stars} (中立)")
elif recommendation_level == 2:
st.warning(f"⚠️ 推荐度:{stars} (谨慎)")
else:
st.error(f"❌ 推荐度:{stars} (不推荐)")
# 显示详细总结
st.markdown("### 📋 决策要点总结")
st.write(summary_data["summary"])
else:
# 初始状态显示
st.markdown("### 💡 欢迎使用AI经理人为你把关")
st.markdown("""
这是一个基于AI的专业决策辅助系统四位AI经理人将从不同专业角度为您把关决策
**👥 参与经理人**
- **技术专家** - 技术可行性实现难度
- **战略顾问** - 战略规划竞争优势
- **用户体验师** - 用户需求易用性
- **风险顾问** - 潜在风险不确定性
**🚀 使用方法**
1. 在上方输入您的决策问题
2. 点击"开始把关"按钮
3. 观察经理人们的专业分析
4. 生成综合决策总结和推荐度
** 特色功能**
- 五位推荐度评级系统
- 具体实施建议和时间规划
- 超快速响应3-6
- 智能缓存技术
""")
# 显示示例问题
st.markdown("### 📝 示例问题")
examples = [
"我们是否应该投资AI技术",
"是否应该开发新的移动应用?",
"如何优化现有产品的用户体验?",
"团队资源应该如何分配?"
]
for example in examples:
if st.button(example, use_container_width=True):
st.session_state.debate_state = st.session_state.workshop.initialize_debate(example)
st.session_state.debate_in_progress = True
st.rerun()
# 启动应用
if __name__ == "__main__":
main()