G05-Sentiment-Analysis-of-A.../src/tweet_data.py
2026-01-15 23:25:47 +08:00

316 lines
9.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""文本数据清洗模块
针对 Tweets.csv 航空情感分析数据集的文本清洗。
遵循「克制」原则,仅进行必要的预处理,保留文本语义信息。
清洗策略:
1. 文本标准化:统一小写(不进行词形还原/词干提取,保留原始语义)
2. 去除噪声:移除用户提及(@username)、URL链接、多余空格
3. 保留语义:保留表情符号、标点符号(它们对情感分析有价值)
4. 最小化处理:不进行停用词删除(否定词如"not""don't"对情感很重要)
"""
import re
from pathlib import Path
import pandera.polars as pa
import polars as pl
# --- Pandera Schema 定义 ---
class RawTweetSchema(pa.DataFrameModel):
"""原始推文数据 Schema清洗前校验
允许缺失值存在,用于验证数据读取后的基本结构。
"""
tweet_id: int = pa.Field(nullable=False)
airline_sentiment: str = pa.Field(nullable=True)
airline_sentiment_confidence: float = pa.Field(ge=0, le=1, nullable=True)
negativereason: str = pa.Field(nullable=True)
negativereason_confidence: float = pa.Field(ge=0, le=1, nullable=True)
airline: str = pa.Field(nullable=True)
text: str = pa.Field(nullable=True)
tweet_coord: str = pa.Field(nullable=True)
tweet_created: str = pa.Field(nullable=True)
tweet_location: str = pa.Field(nullable=True)
user_timezone: str = pa.Field(nullable=True)
class Config:
strict = False
coerce = True
class CleanTweetSchema(pa.DataFrameModel):
"""清洗后推文数据 Schema严格模式
不允许缺失值,强制约束检查。
"""
tweet_id: int = pa.Field(nullable=False)
airline_sentiment: str = pa.Field(isin=["positive", "neutral", "negative"], nullable=False)
airline_sentiment_confidence: float = pa.Field(ge=0, le=1, nullable=False)
negativereason: str = pa.Field(nullable=True)
negativereason_confidence: float = pa.Field(ge=0, le=1, nullable=True)
airline: str = pa.Field(isin=["Virgin America", "United", "Southwest", "Delta", "US Airways", "American"], nullable=False)
text_cleaned: str = pa.Field(nullable=False)
text_original: str = pa.Field(nullable=False)
class Config:
strict = True
coerce = True
# --- 文本清洗函数 ---
def clean_text(text: str) -> str:
"""文本清洗函数(克制策略)
清洗原则:
- 移除:用户提及(@username)、URL链接、多余空格
- 保留:表情符号、标点符号、否定词、原始大小写(后续统一小写)
- 不做:词形还原、词干提取、停用词删除
Args:
text: 原始文本
Returns:
str: 清洗后的文本
"""
if not text or not isinstance(text, str):
return ""
# 1. 移除用户提及 (@username)
text = re.sub(r'@\w+', '', text)
# 2. 移除 URL 链接
text = re.sub(r'http\S+|www\S+', '', text)
# 3. 移除多余空格和换行
text = re.sub(r'\s+', ' ', text).strip()
return text
def normalize_text(text: str) -> str:
"""文本标准化
统一小写,但不进行词形还原或词干提取。
Args:
text: 清洗后的文本
Returns:
str: 标准化后的文本
"""
if not text or not isinstance(text, str):
return ""
# 仅统一小写
return text.lower()
# --- 数据加载与预处理 ---
def load_tweets(file_path: str | Path = "Tweets.csv") -> pl.DataFrame:
"""加载原始推文数据
Args:
file_path: CSV 文件路径
Returns:
pl.DataFrame: 原始推文数据
"""
df = pl.read_csv(file_path)
return df
def validate_raw_tweets(df: pl.DataFrame) -> pl.DataFrame:
"""验证原始推文数据结构(清洗前)
Args:
df: 原始 Polars DataFrame
Returns:
pl.DataFrame: 验证通过的 DataFrame
Raises:
pa.errors.SchemaError: 验证失败
"""
return RawTweetSchema.validate(df)
def validate_clean_tweets(df: pl.DataFrame) -> pl.DataFrame:
"""验证清洗后推文数据(严格模式)
Args:
df: 清洗后的 Polars DataFrame
Returns:
pl.DataFrame: 验证通过的 DataFrame
Raises:
pa.errors.SchemaError: 验证失败
"""
return CleanTweetSchema.validate(df)
def preprocess_tweets(
df: pl.DataFrame,
validate: bool = True,
min_confidence: float = 0.5
) -> pl.DataFrame:
"""推文数据预处理流水线
处理步骤:
1. 筛选:仅保留情感置信度 >= min_confidence 的样本
2. 文本清洗:应用 clean_text 和 normalize_text
3. 删除缺失值:删除 text 为空的样本
4. 删除重复行:基于 tweet_id 去重
5. 可选:进行 Schema 校验
Args:
df: 原始 Polars DataFrame
validate: 是否进行清洗后 Schema 校验
min_confidence: 最低情感置信度阈值
Returns:
pl.DataFrame: 清洗后的 DataFrame
"""
# 1. 筛选高置信度样本
df_filtered = df.filter(
pl.col("airline_sentiment_confidence") >= min_confidence
)
# 2. 文本清洗和标准化
df_clean = df_filtered.with_columns([
pl.col("text").map_elements(clean_text, return_dtype=pl.String).alias("text_cleaned"),
pl.col("text").alias("text_original"),
])
df_clean = df_clean.with_columns([
pl.col("text_cleaned").map_elements(normalize_text, return_dtype=pl.String).alias("text_cleaned"),
])
# 3. 删除缺失值text_cleaned 为空或 airline_sentiment 为空)
df_clean = df_clean.filter(
(pl.col("text_cleaned").is_not_null()) &
(pl.col("text_cleaned") != "") &
(pl.col("airline_sentiment").is_not_null())
)
# 4. 删除重复行(基于 tweet_id
df_clean = df_clean.unique(subset=["tweet_id"], keep="first")
# 5. 选择需要的列
df_clean = df_clean.select([
"tweet_id",
"airline_sentiment",
"airline_sentiment_confidence",
"negativereason",
"negativereason_confidence",
"airline",
"text_cleaned",
"text_original",
])
# 6. 可选校验
if validate:
df_clean = validate_clean_tweets(df_clean)
return df_clean
def save_cleaned_tweets(df: pl.DataFrame, output_path: str | Path = "data/Tweets_cleaned.csv") -> None:
"""保存清洗后的数据
Args:
df: 清洗后的 Polars DataFrame
output_path: 输出文件路径
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
df.write_csv(output_path)
print(f"清洗后的数据已保存至 {output_path}")
def load_cleaned_tweets(file_path: str | Path = "data/Tweets_cleaned.csv") -> pl.DataFrame:
"""加载清洗后的推文数据
Args:
file_path: 清洗后的 CSV 文件路径
Returns:
pl.DataFrame: 清洗后的推文数据
"""
df = pl.read_csv(file_path)
return df
# --- 数据统计与分析 ---
def print_data_summary(df: pl.DataFrame, title: str = "数据统计") -> None:
"""打印数据摘要信息
Args:
df: Polars DataFrame
title: 标题
"""
print(f"\n{'='*60}")
print(f"{title}")
print(f"{'='*60}")
print(f"样本总数: {len(df)}")
print(f"\n情感分布:")
print(df.group_by("airline_sentiment").agg(
pl.len().alias("count"),
(pl.len() / len(df) * 100).alias("percentage")
).sort("count", descending=True))
print(f"\n航空公司分布:")
print(df.group_by("airline").agg(
pl.len().alias("count"),
(pl.len() / len(df) * 100).alias("percentage")
).sort("count", descending=True))
print(f"\n文本长度统计:")
df_with_length = df.with_columns([
pl.col("text_cleaned").str.len_chars().alias("text_length")
])
print(df_with_length.select([
pl.col("text_length").min().alias("最小长度"),
pl.col("text_length").max().alias("最大长度"),
pl.col("text_length").mean().alias("平均长度"),
pl.col("text_length").median().alias("中位数长度"),
]))
if __name__ == "__main__":
print(">>> 1. 加载原始数据")
df_raw = load_tweets("Tweets.csv")
print(f"原始数据样本数: {len(df_raw)}")
print(df_raw.head(3))
print("\n>>> 2. 验证原始数据")
df_validated = validate_raw_tweets(df_raw)
print("✅ 原始数据验证通过")
print("\n>>> 3. 清洗数据")
df_clean = preprocess_tweets(df_validated, validate=True, min_confidence=0.5)
print(f"清洗后样本数: {len(df_clean)} (原始: {len(df_raw)})")
print("✅ 清洗后数据验证通过")
print("\n>>> 4. 保存清洗后的数据")
save_cleaned_tweets(df_clean, "data/Tweets_cleaned.csv")
print("\n>>> 5. 数据统计")
print_data_summary(df_clean, "清洗后数据统计")
print("\n>>> 6. 清洗示例对比")
print("\n原始文本:")
print(df_clean.select("text_original").head(3).to_pandas()["text_original"].to_string(index=False))
print("\n清洗后文本:")
print(df_clean.select("text_cleaned").head(3).to_pandas()["text_cleaned"].to_string(index=False))