from pydantic import BaseModel, Field import joblib import json import pandas as pd import numpy as np import logging import os import random # 配置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # ========================================== # 1. 定义 Pydantic 数据模型 # ========================================== class CustomerFeatures(BaseModel): """客户特征输入模型""" age: int = Field(ge=18, le=120, description="客户年龄") job: str = Field(description="职业") marital: str = Field(description="婚姻状况") education: str = Field(description="教育程度") default: str = Field(pattern="^(yes|no)$", description="是否有违约记录") balance: int = Field(description="账户余额") housing: str = Field(pattern="^(yes|no)$", description="是否有住房贷款") loan: str = Field(pattern="^(yes|no)$", description="是否有个人贷款") contact: str = Field(description="联系方式") day: int = Field(ge=1, le=31, description="最后联系日") month: str = Field(description="最后联系月份") campaign: int = Field(ge=1, description="本次活动联系次数") pdays: int = Field(description="距离上次联系天数 (-1代表未联系)") previous: int = Field(ge=0, description="活动前联系次数") poutcome: str = Field(description="上次活动结果") # 注意:我们不包含 duration,因为它是事后变量 class Decision(BaseModel): """Agent 输出的结构化决策""" risk_score: float = Field(ge=0, le=1, description="预测购买概率 (0-1)") customer_segment: str = Field(description="客户分群 (如: 高价值/潜在/沉睡)") decision: str = Field(description="建议策略 (如: 立即致电/邮件触达/放弃)") actions: list[str] = Field(description="可执行动作清单") rationale: str = Field(description="决策依据 (结合模型预测与业务规则)") # ========================================== # 2. 定义 Agent 类 # ========================================== class MarketingAgent: def __init__(self, model_path="models/model_artifacts.pkl"): self.model_path = model_path self.artifacts = None self._load_model() def _load_model(self): if os.path.exists(self.model_path): self.artifacts = joblib.load(self.model_path) logger.info(f"Agent 已加载模型: {self.model_path}") else: logger.warning(f"模型文件不存在: {self.model_path},Agent 将无法进行预测") def predict_risk(self, features: CustomerFeatures) -> dict: """ Tool 1: 调用 ML 模型预测购买概率 """ if not self.artifacts: return {"score": 0.0, "reason": "Model not loaded"} # 转换输入为 DataFrame data = features.model_dump() df = pd.DataFrame([data]) # 预处理 (使用训练时保存的 encoder) # 注意:这里需要严格复现训练时的预处理逻辑 # 训练时我们做了 Label Encoding for col, le in self.artifacts['encoders'].items(): if col in df.columns: # 处理未知类别 try: df[col] = le.transform(df[col].astype(str)) except: # 遇到未知类别,这里简单处理为 0 (或者 mode) logger.warning(f"Unknown category in {col}") df[col] = 0 # 确保列顺序一致 # 我们训练时用了 X (df.drop(target)) # 这里需要筛选出 numeric_cols + categorical_cols # 简单起见,我们假设 feature names 保存了顺序 feature_names = self.artifacts['features'] # 补齐可能缺失的列 for col in feature_names: if col not in df.columns: df[col] = 0 X_input = df[feature_names] # 预测 model = self.artifacts['lgb_model'] # 优先使用 LightGBM prob = model.predict_proba(X_input)[0][1] return { "score": float(prob), "top_features": ["balance", "poutcome"] # 这里简化,实际可用 SHAP } def get_strategy(self, score: float) -> dict: """ Tool 2: 规则引擎/检索工具 """ if score > 0.6: return { "segment": "高意向 VIP", "action_type": "人工介入", "templates": ["尊贵的客户,鉴于您...", "专属理财经理一对一服务"] } elif score > 0.3: return { "segment": "潜在客户", "action_type": "自动化营销", "templates": ["你好,近期理财活动...", "点击领取加息券"] } else: return { "segment": "低意向群体", "action_type": "静默/邮件", "templates": ["月度财经摘要"] } def run(self, features: CustomerFeatures) -> Decision: """ Agent 主流程 """ logger.info(f"Agent 正在处理客户: {features.job}, {features.age}岁") # 1. 感知 (调用 ML 工具) pred_result = self.predict_risk(features) score = pred_result["score"] # 2. 规划 (调用 策略工具) strategy = self.get_strategy(score) # 3. 决策 (模拟 LLM 整合) # 在真实场景中,这里构建 Prompt 发送给 DeepSeek # 这里我们用 Python 逻辑模拟 LLM 的结构化输出能力 decision = Decision( risk_score=round(score, 4), customer_segment=strategy["segment"], decision=f"建议采取 {strategy['action_type']}", actions=[f"使用话术: {t}" for t in strategy["templates"]], rationale=f"模型预测概率为 {score:.1%},属于{strategy['segment']}。该群体对{strategy['action_type']}转化率较高。" ) return decision if __name__ == "__main__": # 测试 Agent agent = MarketingAgent() # 构造一个测试用例 test_customer = CustomerFeatures( age=35, job="management", marital="married", education="tertiary", default="no", balance=2000, housing="yes", loan="no", contact="cellular", day=15, month="may", campaign=1, pdays=-1, previous=0, poutcome="unknown" ) result = agent.run(test_customer) print("\n=== Agent Decision ===") print(result.model_dump_json(indent=2))