"""DeepSeek API 驱动的智能情感分析 Agent 使用 DeepSeek API 实现实时、智能的推文情感分析、解释和处置方案生成。 """ import os from typing import Optional, Dict, Any import httpx from pydantic import BaseModel, Field class DeepSeekClient: """DeepSeek API 客户端""" def __init__(self, api_key: Optional[str] = None, base_url: Optional[str] = None): self.api_key = api_key or os.getenv("DEEPSEEK_API_KEY") self.base_url = base_url or os.getenv("DEEPSEEK_BASE_URL", "https://api.deepseek.com") if not self.api_key: raise ValueError("DeepSeek API Key 未配置,请设置 DEEPSEEK_API_KEY 环境变量") async def chat_completion(self, messages: list, model: str = "deepseek-chat", temperature: float = 0.7) -> str: """调用 DeepSeek API 进行对话补全""" headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json" } data = { "model": model, "messages": messages, "temperature": temperature, "stream": False } async with httpx.AsyncClient(timeout=30.0) as client: response = await client.post( f"{self.base_url}/chat/completions", headers=headers, json=data ) if response.status_code == 200: result = response.json() return result["choices"][0]["message"]["content"] else: raise Exception(f"DeepSeek API 调用失败: {response.status_code} - {response.text}") class SentimentAnalysisResult(BaseModel): """情感分析结果""" sentiment: str = Field(description="情感类别: negative/neutral/positive") confidence: float = Field(description="置信度 (0-1)") reasoning: str = Field(description="情感判断的推理过程") key_factors: list[str] = Field(description="影响情感判断的关键因素") class DisposalPlan(BaseModel): """处置方案""" priority: str = Field(description="处理优先级: high/medium/low") action_type: str = Field(description="行动类型: response/investigate/monitor/ignore") suggested_response: Optional[str] = Field(description="建议回复内容", default=None) follow_up_actions: list[str] = Field(description="后续行动建议") reasoning: str = Field(description="处置方案制定的理由") class TweetAnalysisResult(BaseModel): """推文分析完整结果""" tweet_text: str = Field(description="原始推文文本") airline: str = Field(description="航空公司") sentiment_analysis: SentimentAnalysisResult = Field(description="情感分析结果") disposal_plan: DisposalPlan = Field(description="处置方案") class DeepSeekTweetAgent: """基于 DeepSeek API 的智能推文分析 Agent""" def __init__(self, api_key: Optional[str] = None, base_url: Optional[str] = None): self.client = DeepSeekClient(api_key, base_url) async def analyze_sentiment(self, text: str, airline: str) -> SentimentAnalysisResult: """使用 DeepSeek API 进行情感分析""" prompt = f""" 请分析以下航空推文的情感倾向,并给出详细的分析过程: 推文内容:"{text}" 航空公司:{airline} 请按照以下格式输出分析结果: 1. 情感类别:negative/neutral/positive 2. 置信度:0-1之间的数值 3. 关键因素:列出影响情感判断的关键因素(最多5个) 4. 推理过程:详细说明情感判断的推理过程 请确保分析准确、客观,并考虑航空行业的特殊性。 """ messages = [ {"role": "system", "content": "你是一位专业的航空行业情感分析专家,擅长分析推文中的情感倾向和潜在问题。"}, {"role": "user", "content": prompt} ] response = await self.client.chat_completion(messages, temperature=0.3) # 解析响应并构建结果 return self._parse_sentiment_response(response, text, airline) async def generate_disposal_plan(self, text: str, airline: str, sentiment_result: SentimentAnalysisResult) -> DisposalPlan: """生成处置方案""" prompt = f""" 基于以下推文分析和情感判断结果,为航空公司制定一个合理的处置方案: 推文内容:"{text}" 航空公司:{airline} 情感分析结果: - 情感类别:{sentiment_result.sentiment} - 置信度:{sentiment_result.confidence} - 关键因素:{', '.join(sentiment_result.key_factors)} - 推理过程:{sentiment_result.reasoning} 请按照以下格式输出处置方案: 1. 优先级:high/medium/low 2. 行动类型:response/investigate/monitor/ignore 3. 建议回复:如果行动类型是response,提供具体的回复建议 4. 后续行动:列出2-4个具体的后续行动建议 5. 制定理由:说明为什么制定这样的处置方案 请确保处置方案符合航空行业的服务标准和客户关系管理最佳实践。 """ messages = [ {"role": "system", "content": "你是一位航空公司的客户服务专家,擅长制定合理的客户反馈处置方案。"}, {"role": "user", "content": prompt} ] response = await self.client.chat_completion(messages, temperature=0.5) return self._parse_disposal_response(response) async def analyze_tweet(self, text: str, airline: str) -> TweetAnalysisResult: """完整的推文分析流程""" # 1. 情感分析 sentiment_result = await self.analyze_sentiment(text, airline) # 2. 生成处置方案 disposal_plan = await self.generate_disposal_plan(text, airline, sentiment_result) # 3. 返回完整结果 return TweetAnalysisResult( tweet_text=text, airline=airline, sentiment_analysis=sentiment_result, disposal_plan=disposal_plan ) def _parse_sentiment_response(self, response: str, text: str, airline: str) -> SentimentAnalysisResult: """解析情感分析响应""" # 简化解析逻辑,实际应用中可以使用更复杂的解析 lines = response.strip().split('\n') sentiment = "neutral" confidence = 0.5 key_factors = [] reasoning = "" for line in lines: line = line.strip() if line.startswith("情感类别:"): sentiment = line.replace("情感类别:", "").strip().lower() elif line.startswith("置信度:"): try: confidence = float(line.replace("置信度:", "").strip()) except: confidence = 0.5 elif line.startswith("关键因素:"): factors = line.replace("关键因素:", "").strip() key_factors = [f.strip() for f in factors.split(',') if f.strip()] elif line.startswith("推理过程:"): reasoning = line.replace("推理过程:", "").strip() # 如果解析失败,使用默认值 if not reasoning: reasoning = f"基于推文内容和航空行业特点进行综合分析,判断情感倾向为{sentiment}。" return SentimentAnalysisResult( sentiment=sentiment, confidence=confidence, reasoning=reasoning, key_factors=key_factors ) def _parse_disposal_response(self, response: str) -> DisposalPlan: """解析处置方案响应""" lines = response.strip().split('\n') priority = "medium" action_type = "monitor" suggested_response = None follow_up_actions = [] reasoning = "" for line in lines: line = line.strip() if line.startswith("优先级:"): priority = line.replace("优先级:", "").strip().lower() elif line.startswith("行动类型:"): action_type = line.replace("行动类型:", "").strip().lower() elif line.startswith("建议回复:"): suggested_response = line.replace("建议回复:", "").strip() elif line.startswith("后续行动:"): actions = line.replace("后续行动:", "").strip() follow_up_actions = [a.strip() for a in actions.split(',') if a.strip()] elif line.startswith("制定理由:"): reasoning = line.replace("制定理由:", "").strip() if not reasoning: reasoning = "基于情感分析结果和航空行业最佳实践制定此处置方案。" return DisposalPlan( priority=priority, action_type=action_type, suggested_response=suggested_response, follow_up_actions=follow_up_actions, reasoning=reasoning ) # 同步版本的包装函数(为了兼容现有接口) import asyncio def analyze_tweet_deepseek(text: str, airline: str) -> TweetAnalysisResult: """同步版本的推文分析函数""" agent = DeepSeekTweetAgent() return asyncio.run(agent.analyze_tweet(text, airline))