From e5a52990c5edfb01872ca461847a89a3efb05afd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E5=88=99=E6=96=87?= Date: Thu, 15 Jan 2026 15:36:17 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E6=96=87=E4=BB=B6=E8=87=B3?= =?UTF-8?q?=20src?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/tweet_data.py | 315 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 315 insertions(+) create mode 100644 src/tweet_data.py diff --git a/src/tweet_data.py b/src/tweet_data.py new file mode 100644 index 0000000..7d9237b --- /dev/null +++ b/src/tweet_data.py @@ -0,0 +1,315 @@ +"""文本数据清洗模块 + +针对 Tweets.csv 航空情感分析数据集的文本清洗。 +遵循「克制」原则,仅进行必要的预处理,保留文本语义信息。 + +清洗策略: +1. 文本标准化:统一小写(不进行词形还原/词干提取,保留原始语义) +2. 去除噪声:移除用户提及(@username)、URL链接、多余空格 +3. 保留语义:保留表情符号、标点符号(它们对情感分析有价值) +4. 最小化处理:不进行停用词删除(否定词如"not"、"don't"对情感很重要) +""" + +import re +from pathlib import Path + +import pandera.polars as pa +import polars as pl + + +# --- Pandera Schema 定义 --- + + +class RawTweetSchema(pa.DataFrameModel): + """原始推文数据 Schema(清洗前校验) + + 允许缺失值存在,用于验证数据读取后的基本结构。 + """ + tweet_id: int = pa.Field(nullable=False) + airline_sentiment: str = pa.Field(nullable=True) + airline_sentiment_confidence: float = pa.Field(ge=0, le=1, nullable=True) + negativereason: str = pa.Field(nullable=True) + negativereason_confidence: float = pa.Field(ge=0, le=1, nullable=True) + airline: str = pa.Field(nullable=True) + text: str = pa.Field(nullable=True) + tweet_coord: str = pa.Field(nullable=True) + tweet_created: str = pa.Field(nullable=True) + tweet_location: str = pa.Field(nullable=True) + user_timezone: str = pa.Field(nullable=True) + + class Config: + strict = False + coerce = True + + +class CleanTweetSchema(pa.DataFrameModel): + """清洗后推文数据 Schema(严格模式) + + 不允许缺失值,强制约束检查。 + """ + tweet_id: int = pa.Field(nullable=False) + airline_sentiment: str = pa.Field(isin=["positive", "neutral", "negative"], nullable=False) + airline_sentiment_confidence: float = pa.Field(ge=0, le=1, nullable=False) + negativereason: str = pa.Field(nullable=True) + negativereason_confidence: float = pa.Field(ge=0, le=1, nullable=True) + airline: str = pa.Field(isin=["Virgin America", "United", "Southwest", "Delta", "US Airways", "American"], nullable=False) + text_cleaned: str = pa.Field(nullable=False) + text_original: str = pa.Field(nullable=False) + + class Config: + strict = True + coerce = True + + +# --- 文本清洗函数 --- + + +def clean_text(text: str) -> str: + """文本清洗函数(克制策略) + + 清洗原则: + - 移除:用户提及(@username)、URL链接、多余空格 + - 保留:表情符号、标点符号、否定词、原始大小写(后续统一小写) + - 不做:词形还原、词干提取、停用词删除 + + Args: + text: 原始文本 + + Returns: + str: 清洗后的文本 + """ + if not text or not isinstance(text, str): + return "" + + # 1. 移除用户提及 (@username) + text = re.sub(r'@\w+', '', text) + + # 2. 移除 URL 链接 + text = re.sub(r'http\S+|www\S+', '', text) + + # 3. 移除多余空格和换行 + text = re.sub(r'\s+', ' ', text).strip() + + return text + + +def normalize_text(text: str) -> str: + """文本标准化 + + 统一小写,但不进行词形还原或词干提取。 + + Args: + text: 清洗后的文本 + + Returns: + str: 标准化后的文本 + """ + if not text or not isinstance(text, str): + return "" + + # 仅统一小写 + return text.lower() + + +# --- 数据加载与预处理 --- + + +def load_tweets(file_path: str | Path = "Tweets.csv") -> pl.DataFrame: + """加载原始推文数据 + + Args: + file_path: CSV 文件路径 + + Returns: + pl.DataFrame: 原始推文数据 + """ + df = pl.read_csv(file_path) + return df + + +def validate_raw_tweets(df: pl.DataFrame) -> pl.DataFrame: + """验证原始推文数据结构(清洗前) + + Args: + df: 原始 Polars DataFrame + + Returns: + pl.DataFrame: 验证通过的 DataFrame + + Raises: + pa.errors.SchemaError: 验证失败 + """ + return RawTweetSchema.validate(df) + + +def validate_clean_tweets(df: pl.DataFrame) -> pl.DataFrame: + """验证清洗后推文数据(严格模式) + + Args: + df: 清洗后的 Polars DataFrame + + Returns: + pl.DataFrame: 验证通过的 DataFrame + + Raises: + pa.errors.SchemaError: 验证失败 + """ + return CleanTweetSchema.validate(df) + + +def preprocess_tweets( + df: pl.DataFrame, + validate: bool = True, + min_confidence: float = 0.5 +) -> pl.DataFrame: + """推文数据预处理流水线 + + 处理步骤: + 1. 筛选:仅保留情感置信度 >= min_confidence 的样本 + 2. 文本清洗:应用 clean_text 和 normalize_text + 3. 删除缺失值:删除 text 为空的样本 + 4. 删除重复行:基于 tweet_id 去重 + 5. 可选:进行 Schema 校验 + + Args: + df: 原始 Polars DataFrame + validate: 是否进行清洗后 Schema 校验 + min_confidence: 最低情感置信度阈值 + + Returns: + pl.DataFrame: 清洗后的 DataFrame + """ + # 1. 筛选高置信度样本 + df_filtered = df.filter( + pl.col("airline_sentiment_confidence") >= min_confidence + ) + + # 2. 文本清洗和标准化 + df_clean = df_filtered.with_columns([ + pl.col("text").map_elements(clean_text, return_dtype=pl.String).alias("text_cleaned"), + pl.col("text").alias("text_original"), + ]) + + df_clean = df_clean.with_columns([ + pl.col("text_cleaned").map_elements(normalize_text, return_dtype=pl.String).alias("text_cleaned"), + ]) + + # 3. 删除缺失值(text_cleaned 为空或 airline_sentiment 为空) + df_clean = df_clean.filter( + (pl.col("text_cleaned").is_not_null()) & + (pl.col("text_cleaned") != "") & + (pl.col("airline_sentiment").is_not_null()) + ) + + # 4. 删除重复行(基于 tweet_id) + df_clean = df_clean.unique(subset=["tweet_id"], keep="first") + + # 5. 选择需要的列 + df_clean = df_clean.select([ + "tweet_id", + "airline_sentiment", + "airline_sentiment_confidence", + "negativereason", + "negativereason_confidence", + "airline", + "text_cleaned", + "text_original", + ]) + + # 6. 可选校验 + if validate: + df_clean = validate_clean_tweets(df_clean) + + return df_clean + + +def save_cleaned_tweets(df: pl.DataFrame, output_path: str | Path = "data/Tweets_cleaned.csv") -> None: + """保存清洗后的数据 + + Args: + df: 清洗后的 Polars DataFrame + output_path: 输出文件路径 + """ + output_path = Path(output_path) + output_path.parent.mkdir(parents=True, exist_ok=True) + df.write_csv(output_path) + print(f"清洗后的数据已保存至 {output_path}") + + +def load_cleaned_tweets(file_path: str | Path = "data/Tweets_cleaned.csv") -> pl.DataFrame: + """加载清洗后的推文数据 + + Args: + file_path: 清洗后的 CSV 文件路径 + + Returns: + pl.DataFrame: 清洗后的推文数据 + """ + df = pl.read_csv(file_path) + return df + + +# --- 数据统计与分析 --- + + +def print_data_summary(df: pl.DataFrame, title: str = "数据统计") -> None: + """打印数据摘要信息 + + Args: + df: Polars DataFrame + title: 标题 + """ + print(f"\n{'='*60}") + print(f"{title}") + print(f"{'='*60}") + print(f"样本总数: {len(df)}") + print(f"\n情感分布:") + print(df.group_by("airline_sentiment").agg( + pl.len().alias("count"), + (pl.len() / len(df) * 100).alias("percentage") + ).sort("count", descending=True)) + + print(f"\n航空公司分布:") + print(df.group_by("airline").agg( + pl.len().alias("count"), + (pl.len() / len(df) * 100).alias("percentage") + ).sort("count", descending=True)) + + print(f"\n文本长度统计:") + df_with_length = df.with_columns([ + pl.col("text_cleaned").str.len_chars().alias("text_length") + ]) + print(df_with_length.select([ + pl.col("text_length").min().alias("最小长度"), + pl.col("text_length").max().alias("最大长度"), + pl.col("text_length").mean().alias("平均长度"), + pl.col("text_length").median().alias("中位数长度"), + ])) + + +if __name__ == "__main__": + print(">>> 1. 加载原始数据") + df_raw = load_tweets("Tweets.csv") + print(f"原始数据样本数: {len(df_raw)}") + print(df_raw.head(3)) + + print("\n>>> 2. 验证原始数据") + df_validated = validate_raw_tweets(df_raw) + print("✅ 原始数据验证通过") + + print("\n>>> 3. 清洗数据") + df_clean = preprocess_tweets(df_validated, validate=True, min_confidence=0.5) + print(f"清洗后样本数: {len(df_clean)} (原始: {len(df_raw)})") + print("✅ 清洗后数据验证通过") + + print("\n>>> 4. 保存清洗后的数据") + save_cleaned_tweets(df_clean, "data/Tweets_cleaned.csv") + + print("\n>>> 5. 数据统计") + print_data_summary(df_clean, "清洗后数据统计") + + print("\n>>> 6. 清洗示例对比") + print("\n原始文本:") + print(df_clean.select("text_original").head(3).to_pandas()["text_original"].to_string(index=False)) + print("\n清洗后文本:") + print(df_clean.select("text_cleaned").head(3).to_pandas()["text_cleaned"].to_string(index=False))