上传文件至 src

This commit is contained in:
张则文 2026-01-15 16:51:24 +08:00
parent eda22ba4f0
commit e38df4c89b

315
src/tweet_data.py Normal file
View File

@ -0,0 +1,315 @@
"""文本数据清洗模块
针对 Tweets.csv 航空情感分析数据集的文本清洗
遵循克制原则仅进行必要的预处理保留文本语义信息
清洗策略
1. 文本标准化统一小写不进行词形还原/词干提取保留原始语义
2. 去除噪声移除用户提及(@username)URL链接多余空格
3. 保留语义保留表情符号标点符号它们对情感分析有价值
4. 最小化处理不进行停用词删除否定词如"not""don't"对情感很重要
"""
import re
from pathlib import Path
import pandera.polars as pa
import polars as pl
# --- Pandera Schema 定义 ---
class RawTweetSchema(pa.DataFrameModel):
"""原始推文数据 Schema清洗前校验
允许缺失值存在用于验证数据读取后的基本结构
"""
tweet_id: int = pa.Field(nullable=False)
airline_sentiment: str = pa.Field(nullable=True)
airline_sentiment_confidence: float = pa.Field(ge=0, le=1, nullable=True)
negativereason: str = pa.Field(nullable=True)
negativereason_confidence: float = pa.Field(ge=0, le=1, nullable=True)
airline: str = pa.Field(nullable=True)
text: str = pa.Field(nullable=True)
tweet_coord: str = pa.Field(nullable=True)
tweet_created: str = pa.Field(nullable=True)
tweet_location: str = pa.Field(nullable=True)
user_timezone: str = pa.Field(nullable=True)
class Config:
strict = False
coerce = True
class CleanTweetSchema(pa.DataFrameModel):
"""清洗后推文数据 Schema严格模式
不允许缺失值强制约束检查
"""
tweet_id: int = pa.Field(nullable=False)
airline_sentiment: str = pa.Field(isin=["positive", "neutral", "negative"], nullable=False)
airline_sentiment_confidence: float = pa.Field(ge=0, le=1, nullable=False)
negativereason: str = pa.Field(nullable=True)
negativereason_confidence: float = pa.Field(ge=0, le=1, nullable=True)
airline: str = pa.Field(isin=["Virgin America", "United", "Southwest", "Delta", "US Airways", "American"], nullable=False)
text_cleaned: str = pa.Field(nullable=False)
text_original: str = pa.Field(nullable=False)
class Config:
strict = True
coerce = True
# --- 文本清洗函数 ---
def clean_text(text: str) -> str:
"""文本清洗函数(克制策略)
清洗原则
- 移除用户提及(@username)URL链接多余空格
- 保留表情符号标点符号否定词原始大小写后续统一小写
- 不做词形还原词干提取停用词删除
Args:
text: 原始文本
Returns:
str: 清洗后的文本
"""
if not text or not isinstance(text, str):
return ""
# 1. 移除用户提及 (@username)
text = re.sub(r'@\w+', '', text)
# 2. 移除 URL 链接
text = re.sub(r'http\S+|www\S+', '', text)
# 3. 移除多余空格和换行
text = re.sub(r'\s+', ' ', text).strip()
return text
def normalize_text(text: str) -> str:
"""文本标准化
统一小写但不进行词形还原或词干提取
Args:
text: 清洗后的文本
Returns:
str: 标准化后的文本
"""
if not text or not isinstance(text, str):
return ""
# 仅统一小写
return text.lower()
# --- 数据加载与预处理 ---
def load_tweets(file_path: str | Path = "Tweets.csv") -> pl.DataFrame:
"""加载原始推文数据
Args:
file_path: CSV 文件路径
Returns:
pl.DataFrame: 原始推文数据
"""
df = pl.read_csv(file_path)
return df
def validate_raw_tweets(df: pl.DataFrame) -> pl.DataFrame:
"""验证原始推文数据结构(清洗前)
Args:
df: 原始 Polars DataFrame
Returns:
pl.DataFrame: 验证通过的 DataFrame
Raises:
pa.errors.SchemaError: 验证失败
"""
return RawTweetSchema.validate(df)
def validate_clean_tweets(df: pl.DataFrame) -> pl.DataFrame:
"""验证清洗后推文数据(严格模式)
Args:
df: 清洗后的 Polars DataFrame
Returns:
pl.DataFrame: 验证通过的 DataFrame
Raises:
pa.errors.SchemaError: 验证失败
"""
return CleanTweetSchema.validate(df)
def preprocess_tweets(
df: pl.DataFrame,
validate: bool = True,
min_confidence: float = 0.5
) -> pl.DataFrame:
"""推文数据预处理流水线
处理步骤
1. 筛选仅保留情感置信度 >= min_confidence 的样本
2. 文本清洗应用 clean_text normalize_text
3. 删除缺失值删除 text 为空的样本
4. 删除重复行基于 tweet_id 去重
5. 可选进行 Schema 校验
Args:
df: 原始 Polars DataFrame
validate: 是否进行清洗后 Schema 校验
min_confidence: 最低情感置信度阈值
Returns:
pl.DataFrame: 清洗后的 DataFrame
"""
# 1. 筛选高置信度样本
df_filtered = df.filter(
pl.col("airline_sentiment_confidence") >= min_confidence
)
# 2. 文本清洗和标准化
df_clean = df_filtered.with_columns([
pl.col("text").map_elements(clean_text, return_dtype=pl.String).alias("text_cleaned"),
pl.col("text").alias("text_original"),
])
df_clean = df_clean.with_columns([
pl.col("text_cleaned").map_elements(normalize_text, return_dtype=pl.String).alias("text_cleaned"),
])
# 3. 删除缺失值text_cleaned 为空或 airline_sentiment 为空)
df_clean = df_clean.filter(
(pl.col("text_cleaned").is_not_null()) &
(pl.col("text_cleaned") != "") &
(pl.col("airline_sentiment").is_not_null())
)
# 4. 删除重复行(基于 tweet_id
df_clean = df_clean.unique(subset=["tweet_id"], keep="first")
# 5. 选择需要的列
df_clean = df_clean.select([
"tweet_id",
"airline_sentiment",
"airline_sentiment_confidence",
"negativereason",
"negativereason_confidence",
"airline",
"text_cleaned",
"text_original",
])
# 6. 可选校验
if validate:
df_clean = validate_clean_tweets(df_clean)
return df_clean
def save_cleaned_tweets(df: pl.DataFrame, output_path: str | Path = "data/Tweets_cleaned.csv") -> None:
"""保存清洗后的数据
Args:
df: 清洗后的 Polars DataFrame
output_path: 输出文件路径
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
df.write_csv(output_path)
print(f"清洗后的数据已保存至 {output_path}")
def load_cleaned_tweets(file_path: str | Path = "data/Tweets_cleaned.csv") -> pl.DataFrame:
"""加载清洗后的推文数据
Args:
file_path: 清洗后的 CSV 文件路径
Returns:
pl.DataFrame: 清洗后的推文数据
"""
df = pl.read_csv(file_path)
return df
# --- 数据统计与分析 ---
def print_data_summary(df: pl.DataFrame, title: str = "数据统计") -> None:
"""打印数据摘要信息
Args:
df: Polars DataFrame
title: 标题
"""
print(f"\n{'='*60}")
print(f"{title}")
print(f"{'='*60}")
print(f"样本总数: {len(df)}")
print(f"\n情感分布:")
print(df.group_by("airline_sentiment").agg(
pl.len().alias("count"),
(pl.len() / len(df) * 100).alias("percentage")
).sort("count", descending=True))
print(f"\n航空公司分布:")
print(df.group_by("airline").agg(
pl.len().alias("count"),
(pl.len() / len(df) * 100).alias("percentage")
).sort("count", descending=True))
print(f"\n文本长度统计:")
df_with_length = df.with_columns([
pl.col("text_cleaned").str.len_chars().alias("text_length")
])
print(df_with_length.select([
pl.col("text_length").min().alias("最小长度"),
pl.col("text_length").max().alias("最大长度"),
pl.col("text_length").mean().alias("平均长度"),
pl.col("text_length").median().alias("中位数长度"),
]))
if __name__ == "__main__":
print(">>> 1. 加载原始数据")
df_raw = load_tweets("Tweets.csv")
print(f"原始数据样本数: {len(df_raw)}")
print(df_raw.head(3))
print("\n>>> 2. 验证原始数据")
df_validated = validate_raw_tweets(df_raw)
print("✅ 原始数据验证通过")
print("\n>>> 3. 清洗数据")
df_clean = preprocess_tweets(df_validated, validate=True, min_confidence=0.5)
print(f"清洗后样本数: {len(df_clean)} (原始: {len(df_raw)})")
print("✅ 清洗后数据验证通过")
print("\n>>> 4. 保存清洗后的数据")
save_cleaned_tweets(df_clean, "data/Tweets_cleaned.csv")
print("\n>>> 5. 数据统计")
print_data_summary(df_clean, "清洗后数据统计")
print("\n>>> 6. 清洗示例对比")
print("\n原始文本:")
print(df_clean.select("text_original").head(3).to_pandas()["text_original"].to_string(index=False))
print("\n清洗后文本:")
print(df_clean.select("text_cleaned").head(3).to_pandas()["text_cleaned"].to_string(index=False))