B2. 预测模型与智能决策 | Prediction Models & Intelligent Decision
路径: Path B: 技术人 · 模块: B2 最后更新: 2026-03-12 难度: 中级 → 进阶 前提: B1 数据管道基础(pandas、数据清洗)、Python 基础 预计时间: 每天 1 小时,2-3 周
flowchart LR
B1["B1 数据管道"]
B1 --> B2
B2[" B2 预测模型<br/>(当前)"]:::current
B2 --> B3
B3["B3 RAG 知识库"]
B3 --> B4
B4["B4 Agent 工作流"]
B4 --> B5
B5["B5 本地模型部署"]
classDef current fill:#ff9900,stroke:#333,color:#fff,font-weight:bold
本模块章节导航
本模块你将构建
销量预测模型 + Review 主题分析系统。
完成本模块后,你将能够:
- 用 Prophet 对 SKU 做 30/60/90 天销量预测,输出预测值和置信区间
- 理解时间序列预测的核心原理(趋势 + 季节性 + 噪声分解)
- 处理电商预测的特殊挑战:大促尖峰、新品冷启动、竞品影响
- 用 AutoGluon 零配置建模,自动选择最优算法
- 用 BERTopic 从 Review 文本中自动发现主题和情感趋势
- 将预测结果转化为补货决策(连接 A5 库存模块)
- 用 MAPE/MAE/RMSE 评估模型质量,用回测验证预测可靠性
1. 预测方法论
相关阅读: A5 库存与供应链 销量预测在库存补货决策中的应用详见 A5。 · D3 跨平台 AI 协同策略 跨平台需求预测详见 D3
1.1 时间序列预测的第一性原理
任何时间序列数据都可以分解为三个成分:
观测值 = 趋势(Trend) + 季节性(Seasonality) + 噪声(Residual)
| 成分 | 含义 | 电商场景举例 |
|---|---|---|
| 趋势 | 长期上升或下降方向 | 新品上市后销量逐月增长;老品进入衰退期 |
| 季节性 | 固定周期的重复模式 | 每周一销量最高(周末下单周一到货);Q4 旺季 |
| 噪声 | 无法解释的随机波动 | 某天突然被网红推荐,销量暴涨一天后恢复 |
加法模型 vs 乘法模型:
- 加法模型:
y = trend + seasonality + noise季节性波动幅度固定(如每年 Q4 多卖 1000 件) - 乘法模型:
y = trend × seasonality × noise季节性波动幅度随趋势变化(如每年 Q4 多卖 30%)
电商场景通常用乘法模型更准确,因为销量基数越大,季节性波动的绝对值也越大。
分解可视化:
import pandas as pd
from statsmodels.tsa.seasonal import seasonal_decompose
import matplotlib.pyplot as plt
import matplotlib
matplotlib.rcParams["font.sans-serif"] = ["PingFang SC", "Heiti TC", "Arial"]
matplotlib.rcParams["axes.unicode_minus"] = False
def decompose_sales(df: pd.DataFrame, date_col: str = "date", value_col: str = "units"):
"""
将销量时间序列分解为趋势、季节性、噪声三个成分。
Args:
df: 包含日期和销量的 DataFrame
date_col: 日期列名
value_col: 销量列名
"""
ts = df.set_index(date_col)[value_col]
ts = ts.asfreq("D").fillna(method="ffill") # 补齐缺失日期
# 乘法分解,周期=7(周季节性)
result = seasonal_decompose(ts, model="multiplicative", period=7)
fig, axes = plt.subplots(4, 1, figsize=(12, 8), sharex=True)
result.observed.plot(ax=axes[0], title="原始数据 (Observed)")
result.trend.plot(ax=axes[1], title="趋势 (Trend)")
result.seasonal.plot(ax=axes[2], title="季节性 (Seasonality)")
result.resid.plot(ax=axes[3], title="噪声 (Residual)")
plt.tight_layout()
plt.savefig("output/decomposition.png", dpi=150)
plt.show()
return result
# 使用示例
# df = pd.read_csv("data/daily_sales.csv")
# result = decompose_sales(df, date_col="date", value_col="units")
关键洞察:如果分解后的“噪声“成分仍然有明显的模式(比如每逢大促就有巨大残差),说明你的模型缺少对大促事件的建模。这正是 Prophet 的
holidays参数要解决的问题。
1.2 电商预测的特殊挑战
电商销量预测比传统零售更难,因为有几个独特的干扰因素:
| 挑战 | 表现 | 影响 | 应对策略 |
|---|---|---|---|
| 大促尖峰 | Prime Day/BFCM 销量暴涨 5-20 倍 | 模型被极端值带偏 | 将大促作为特殊事件建模(Prophet holidays) |
| 新品冷启动 | 新 ASIN 无历史数据 | 无法用时间序列预测 | 用类似品的销量曲线做类比预测 |
| 竞品影响 | 竞品降价/断货导致自己销量突变 | 外部因素无法从自身数据中学到 | 添加外部回归变量(竞品价格、BSR) |
| 广告依赖 | 停广告后销量断崖下跌 | 自然销量和广告销量混在一起 | 分离自然流量和广告流量分别预测 |
| 库存约束 | 断货期间销量为 0(非真实需求) | 历史数据中的 0 不代表“没有需求“ | 断货期数据需要特殊处理或剔除 |
| 季节性叠加 | 周季节性 + 月季节性 + 年季节性同时存在 | 单一周期模型不够 | Prophet 支持多重季节性自动建模 |
断货数据处理(关键!):
def handle_stockout(df: pd.DataFrame, units_col: str = "units") -> pd.DataFrame:
"""
处理断货期间的零销量数据。
断货期间销量为 0 不代表需求为 0。
策略:用断货前后的平均销量填充,避免模型学到"某些天需求为 0"。
"""
df = df.copy()
# 标记连续零销量(可能是断货)
df["is_zero"] = df[units_col] == 0
df["zero_streak"] = (
df["is_zero"]
.groupby((~df["is_zero"]).cumsum())
.cumsum()
)
# 连续 3 天以上零销量视为断货(而非真实零需求)
stockout_mask = df["zero_streak"] >= 3
if stockout_mask.any():
# 用前后各 7 天的非零均值填充
rolling_mean = (
df[~stockout_mask][units_col]
.rolling(window=7, min_periods=1)
.mean()
)
df.loc[stockout_mask, units_col] = rolling_mean.reindex(
df.index
).ffill().bfill()
stockout_days = stockout_mask.sum()
print(f" 检测到 {stockout_days} 天疑似断货,已用滚动均值填充")
df = df.drop(columns=["is_zero", "zero_streak"])
return df
1.3 何时用简单规则 vs 何时用 ML 模型
不是所有预测都需要机器学习。选择合适的方法比选择最复杂的方法更重要:
| 场景 | 推荐方法 | 理由 |
|---|---|---|
| 稳定老品,历史 >1 年 | Prophet / 指数平滑 | 数据充足,时间序列模型效果好 |
| 新品上市 <3 个月 | 类比法 + 人工调整 | 数据不足,ML 模型会过拟合 |
| 大促备货 | 去年同期 × 增长系数 + 人工调整 | 大促是非常规事件,历史样本太少 |
| 多 SKU 批量预测 | AutoGluon / LightGBM | 自动化程度高,适合批量处理 |
| 需要解释性 | Prophet | 可分解为趋势+季节性,业务人员能理解 |
| 追求精度 | 集成方法(多模型加权) | 单一模型有偏差,集成可以互补 |
决策框架:
历史数据 > 1 年?
是 → 销量稳定?
是 → Prophet(简单高效)
否 → Prophet + 外部变量 / AutoGluon
否 → 历史数据 > 3 个月?
是 → Prophet(短周期)/ 移动平均
否 → 类比法(找相似品历史数据)
2. 工具全景
| 工具 | 类型 | 难度 | 最佳场景 | 安装 |
|---|---|---|---|---|
| Prophet | 时间序列 | 入门 | 单 SKU 销量预测,最易上手 | pip install prophet |
| Darts | 时间序列 | 中级 | 需要对比多种模型 | pip install darts |
| AutoGluon | 自动化 ML | 入门 | 零 ML 知识批量建模 | pip install autogluon.timeseries |
| BERTopic | NLP 主题建模 | 中级 | Review 文本主题发现 | pip install bertopic |
| OR-Tools | 运筹优化 | 进阶 | 补货策略优化 | pip install ortools |
| scikit-learn | 通用 ML | 中级 | 特征工程、回归、分类 | pip install scikit-learn |
选择建议:
- 刚入门 → 从 Prophet 开始,一个下午就能出结果
- 想要自动化 → AutoGluon,零配置自动选最优模型
- 需要 Review 分析 → BERTopic,自动发现评论主题
- 需要优化补货量 → OR-Tools,数学规划求最优解
3. 代码实战
3.1 Prophet 销量预测(完整流程)
Prophet 是 Meta 开源的时间序列预测库,专为业务预测设计。它的核心优势:
- 自动处理缺失值和异常值
- 内置节假日效应
- 可解释的分解结果(趋势 + 季节性 + 节假日)
- 对非专业人员友好
完整代码:数据准备 → 训练 → 预测 → 评估 → 可视化
import pandas as pd
import numpy as np
from prophet import Prophet
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
import matplotlib
matplotlib.rcParams["font.sans-serif"] = ["PingFang SC", "Heiti TC", "Arial"]
matplotlib.rcParams["axes.unicode_minus"] = False
# ============================================================
# Step 1: 数据准备
# ============================================================
def prepare_prophet_data(
df: pd.DataFrame,
date_col: str = "date",
value_col: str = "units"
) -> pd.DataFrame:
"""
将业务数据转换为 Prophet 要求的格式。
Prophet 要求两列:
- ds: 日期列(datetime 类型)
- y: 目标值列(数值类型)
Args:
df: 原始数据
date_col: 日期列名
value_col: 目标值列名
Returns:
Prophet 格式的 DataFrame
"""
prophet_df = df[[date_col, value_col]].copy()
prophet_df.columns = ["ds", "y"]
# 确保日期格式正确
prophet_df["ds"] = pd.to_datetime(prophet_df["ds"])
prophet_df["y"] = pd.to_numeric(prophet_df["y"], errors="coerce")
# 按日期排序并去重(同一天多条记录则求和)
prophet_df = prophet_df.groupby("ds")["y"].sum().reset_index()
prophet_df = prophet_df.sort_values("ds").reset_index(drop=True)
# 补齐缺失日期(用 0 填充,后续可用断货处理逻辑替换)
date_range = pd.date_range(
start=prophet_df["ds"].min(),
end=prophet_df["ds"].max(),
freq="D"
)
prophet_df = (
prophet_df
.set_index("ds")
.reindex(date_range)
.fillna(0)
.reset_index()
.rename(columns={"index": "ds"})
)
print(f" 数据准备完成: {len(prophet_df)} 天")
print(f" 日期范围: {prophet_df['ds'].min().date()} → {prophet_df['ds'].max().date()}")
print(f" 日均销量: {prophet_df['y'].mean():.1f}")
return prophet_df
# ============================================================
# Step 2: 训练模型
# ============================================================
def train_prophet(
df: pd.DataFrame,
yearly: bool = True,
weekly: bool = True,
daily: bool = False,
changepoint_prior: float = 0.05
) -> Prophet:
"""
训练 Prophet 模型。
Args:
df: Prophet 格式数据(ds, y 两列)
yearly: 是否启用年季节性
weekly: 是否启用周季节性
daily: 是否启用日季节性(通常不需要)
changepoint_prior: 趋势变化点灵敏度
- 值越大,模型越容易捕捉趋势变化(但可能过拟合)
- 值越小,趋势越平滑(但可能欠拟合)
- 默认 0.05,电商场景建议 0.1-0.3(变化较快)
Returns:
训练好的 Prophet 模型
"""
model = Prophet(
yearly_seasonality=yearly,
weekly_seasonality=weekly,
daily_seasonality=daily,
changepoint_prior_scale=changepoint_prior,
interval_width=0.8, # 80% 置信区间
)
model.fit(df)
print(" 模型训练完成")
return model
# ============================================================
# Step 3: 生成预测
# ============================================================
def make_forecast(
model: Prophet,
periods: int = 90,
freq: str = "D"
) -> pd.DataFrame:
"""
生成未来 N 天的预测。
Args:
model: 训练好的 Prophet 模型
periods: 预测天数
freq: 频率(D=天, W=周, M=月)
Returns:
预测结果 DataFrame,包含:
- ds: 日期
- yhat: 预测值
- yhat_lower: 预测下界
- yhat_upper: 预测上界
- trend: 趋势成分
- weekly: 周季节性成分
- yearly: 年季节性成分
"""
future = model.make_future_dataframe(periods=periods, freq=freq)
forecast = model.predict(future)
# 预测值不能为负(销量最小为 0)
forecast["yhat"] = forecast["yhat"].clip(lower=0)
forecast["yhat_lower"] = forecast["yhat_lower"].clip(lower=0)
print(f" 预测完成: 未来 {periods} 天")
print(f" 预测均值: {forecast['yhat'].tail(periods).mean():.1f}")
print(f" 预测区间: [{forecast['yhat_lower'].tail(periods).mean():.1f}, "
f"{forecast['yhat_upper'].tail(periods).mean():.1f}]")
return forecast
# ============================================================
# Step 4: 可视化
# ============================================================
def plot_forecast(
model: Prophet,
forecast: pd.DataFrame,
actual_df: pd.DataFrame = None,
title: str = "SKU 销量预测"
):
"""
绘制预测结果图。
Args:
model: Prophet 模型
forecast: 预测结果
actual_df: 实际数据(用于对比)
title: 图表标题
"""
fig, axes = plt.subplots(2, 1, figsize=(14, 10))
# 图 1: 预测 vs 实际
ax1 = axes[0]
ax1.plot(forecast["ds"], forecast["yhat"], color="#1a73e8", label="预测值")
ax1.fill_between(
forecast["ds"],
forecast["yhat_lower"],
forecast["yhat_upper"],
alpha=0.2, color="#1a73e8", label="80% 置信区间"
)
if actual_df is not None:
ax1.scatter(
actual_df["ds"], actual_df["y"],
color="#333", s=10, alpha=0.5, label="实际值"
)
ax1.set_title(title, fontsize=14, fontweight="bold")
ax1.set_ylabel("销量 (Units)")
ax1.legend()
ax1.grid(True, alpha=0.3)
# 图 2: 成分分解
ax2 = axes[1]
ax2.plot(forecast["ds"], forecast["trend"], label="趋势", color="#e8710a")
if "weekly" in forecast.columns:
ax2_twin = ax2.twinx()
weekly_data = forecast.drop_duplicates(subset=["ds"]).tail(90)
ax2_twin.plot(
weekly_data["ds"], weekly_data["weekly"],
label="周季节性", color="#0d652d", alpha=0.7
)
ax2_twin.set_ylabel("周季节性")
ax2.set_title("趋势分解", fontsize=14, fontweight="bold")
ax2.set_ylabel("趋势")
ax2.legend(loc="upper left")
ax2.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig("output/forecast.png", dpi=150, bbox_inches="tight")
plt.show()
print(" 图表已保存: output/forecast.png")
# ============================================================
# 完整使用示例
# ============================================================
# # 1. 加载数据
# raw_df = pd.read_csv("data/daily_sales.csv")
# prophet_df = prepare_prophet_data(raw_df, date_col="date", value_col="units")
#
# # 2. 处理断货数据
# prophet_df = handle_stockout(prophet_df, units_col="y")
#
# # 3. 训练模型
# model = train_prophet(prophet_df, changepoint_prior=0.1)
#
# # 4. 预测未来 90 天
# forecast = make_forecast(model, periods=90)
#
# # 5. 可视化
# plot_forecast(model, forecast, actual_df=prophet_df, title="ASIN-B0XXXXX 销量预测")
changepoint_prior_scale 调参指南:这是 Prophet 最重要的超参数。电商数据变化快,建议从 0.1 开始尝试。如果预测曲线太平滑(跟不上趋势变化),调大到 0.2-0.3;如果预测曲线太波动(过拟合噪声),调小到 0.01-0.05。
3.2 Prophet 进阶:节假日效应与外部变量
基础 Prophet 模型忽略了电商最重要的因素:大促事件。添加节假日效应可以显著提升预测精度。
添加电商大促事件:
def create_ecommerce_holidays(years: list[int]) -> pd.DataFrame:
"""
创建电商大促日历。
Prophet 的 holidays 参数接受一个 DataFrame,包含:
- holiday: 事件名称
- ds: 事件日期
- lower_window: 事件前影响天数(负数)
- upper_window: 事件后影响天数
"""
holidays = []
for year in years:
# Prime Day(通常 7 月中旬,持续 2 天)
holidays.append({
"holiday": "prime_day",
"ds": f"{year}-07-12",
"lower_window": -3, # 提前 3 天开始有影响(预热期)
"upper_window": 2, # 结束后 2 天仍有余波
})
# Black Friday(11 月第四个周五)
# 简化处理:固定 11-24 附近
holidays.append({
"holiday": "black_friday",
"ds": f"{year}-11-24",
"lower_window": -7, # BFCM 周提前一周开始
"upper_window": 3, # Cyber Monday 后几天
})
# Cyber Monday
holidays.append({
"holiday": "cyber_monday",
"ds": f"{year}-11-27",
"lower_window": 0,
"upper_window": 1,
})
# 双十一(对中国卖家有影响)
holidays.append({
"holiday": "singles_day",
"ds": f"{year}-11-11",
"lower_window": -3,
"upper_window": 1,
})
# 圣诞节前购物季
holidays.append({
"holiday": "christmas_shopping",
"ds": f"{year}-12-15",
"lower_window": -5,
"upper_window": 10,
})
# 新年后低谷期
holidays.append({
"holiday": "post_newyear_dip",
"ds": f"{year}-01-05",
"lower_window": -5,
"upper_window": 10,
})
return pd.DataFrame(holidays)
def train_prophet_with_holidays(
df: pd.DataFrame,
holidays: pd.DataFrame = None,
changepoint_prior: float = 0.1
) -> Prophet:
"""
训练带节假日效应的 Prophet 模型。
"""
if holidays is None:
years = list(range(
df["ds"].dt.year.min(),
df["ds"].dt.year.max() + 2 # 包含预测年份
))
holidays = create_ecommerce_holidays(years)
model = Prophet(
yearly_seasonality=True,
weekly_seasonality=True,
daily_seasonality=False,
changepoint_prior_scale=changepoint_prior,
holidays=holidays,
holidays_prior_scale=10.0, # 节假日效应的灵敏度
interval_width=0.8,
)
model.fit(df)
print(f" 模型训练完成(含 {len(holidays)} 个节假日事件)")
return model
添加外部回归变量(广告花费、竞品价格):
def train_prophet_with_regressors(
df: pd.DataFrame,
regressor_cols: list[str] = None
) -> Prophet:
"""
训练带外部回归变量的 Prophet 模型。
外部变量可以是:
- ad_spend: 广告花费(广告投入越多,销量越高)
- competitor_price: 竞品价格(竞品涨价,自己销量可能上升)
- bsr_rank: BSR 排名(排名越高,曝光越多)
- coupon_active: 是否有优惠券(0/1)
注意:预测时也需要提供未来的外部变量值!
"""
years = list(range(
df["ds"].dt.year.min(),
df["ds"].dt.year.max() + 2
))
holidays = create_ecommerce_holidays(years)
model = Prophet(
yearly_seasonality=True,
weekly_seasonality=True,
changepoint_prior_scale=0.1,
holidays=holidays,
interval_width=0.8,
)
# 添加外部回归变量
regressor_cols = regressor_cols or []
for col in regressor_cols:
if col in df.columns:
model.add_regressor(col, standardize=True)
print(f" 添加回归变量: {col}")
model.fit(df)
print(" 模型训练完成(含外部变量)")
return model
def forecast_with_regressors(
model: Prophet,
periods: int = 90,
future_regressors: pd.DataFrame = None
) -> pd.DataFrame:
"""
带外部变量的预测。
Args:
model: 训练好的模型
periods: 预测天数
future_regressors: 未来的外部变量值
如果不提供,用历史均值填充(不推荐,会降低精度)
"""
future = model.make_future_dataframe(periods=periods)
# 合并未来的外部变量
if future_regressors is not None:
future = future.merge(future_regressors, on="ds", how="left")
# 对缺失的外部变量用历史均值填充
for col in future.columns:
if col not in ["ds"] and future[col].isna().any():
fill_value = future[col].dropna().mean()
future[col] = future[col].fillna(fill_value)
print(f" {col} 有缺失值,用均值 {fill_value:.2f} 填充")
forecast = model.predict(future)
forecast["yhat"] = forecast["yhat"].clip(lower=0)
forecast["yhat_lower"] = forecast["yhat_lower"].clip(lower=0)
return forecast
# 使用示例
# df = prepare_prophet_data(raw_df)
# df["ad_spend"] = ad_data["spend"] # 合并广告花费数据
# df["competitor_price"] = competitor_data["price"] # 合并竞品价格
#
# model = train_prophet_with_regressors(df, regressor_cols=["ad_spend", "competitor_price"])
#
# # 预测时需要提供未来的广告预算和竞品价格预估
# future_regs = pd.DataFrame({
# "ds": pd.date_range("2025-04-01", periods=90, freq="D"),
# "ad_spend": [500] * 90, # 假设未来每天广告预算 $500
# "competitor_price": [29.99] * 90, # 假设竞品价格不变
# })
# forecast = forecast_with_regressors(model, periods=90, future_regressors=future_regs)
外部变量的陷阱:预测时你需要提供未来的外部变量值。如果你不知道未来的广告预算,那添加
ad_spend作为回归变量反而会降低预测精度。只添加你能合理预估未来值的变量。
3.3 AutoGluon 自动化预测(零配置建模)
AutoGluon 是 Amazon 开源的自动化机器学习框架。它的时间序列模块可以自动尝试多种模型(Prophet、ETS、DeepAR、Theta 等),选择最优的一个。适合不想手动调参的场景。
from autogluon.timeseries import TimeSeriesDataFrame, TimeSeriesPredictor
def autogluon_forecast(
df: pd.DataFrame,
date_col: str = "date",
value_col: str = "units",
item_col: str = "asin",
prediction_length: int = 30,
time_limit: int = 300
) -> pd.DataFrame:
"""
用 AutoGluon 自动化预测多个 SKU 的销量。
AutoGluon 的优势:
- 零配置:不需要选模型、调参数
- 多 SKU:一次训练,同时预测所有 SKU
- 自动集成:自动尝试多种模型并集成最优结果
Args:
df: 包含日期、销量、SKU 标识的 DataFrame
date_col: 日期列名
value_col: 目标值列名
item_col: SKU 标识列名
prediction_length: 预测天数
time_limit: 训练时间限制(秒)
Returns:
预测结果 DataFrame
"""
# 1. 转换为 AutoGluon 格式
ag_df = df.rename(columns={
date_col: "timestamp",
value_col: "target",
item_col: "item_id"
})
ag_df["timestamp"] = pd.to_datetime(ag_df["timestamp"])
ts_df = TimeSeriesDataFrame.from_data_frame(
ag_df,
id_column="item_id",
timestamp_column="timestamp"
)
print(f" 数据: {ts_df.num_items} 个 SKU, "
f"{len(ts_df)} 条记录")
# 2. 训练(AutoGluon 自动选择最优模型)
predictor = TimeSeriesPredictor(
prediction_length=prediction_length,
target="target",
eval_metric="MAPE", # 用 MAPE 作为评估指标
)
predictor.fit(
train_data=ts_df,
time_limit=time_limit, # 限制训练时间
presets="medium_quality", # fast / medium / high / best
)
# 3. 查看模型排行榜
leaderboard = predictor.leaderboard(ts_df)
print("\n 模型排行榜:")
print(leaderboard[["model", "score_val"]].to_string(index=False))
# 4. 生成预测
predictions = predictor.predict(ts_df)
print(f"\n 预测完成: {ts_df.num_items} 个 SKU × {prediction_length} 天")
return predictions
# 使用示例
# df = pd.read_csv("data/daily_sales_all_skus.csv")
# predictions = autogluon_forecast(
# df,
# date_col="date",
# value_col="units",
# item_col="asin",
# prediction_length=30,
# time_limit=600 # 10 分钟
# )
#
# # 查看某个 SKU 的预测
# sku_pred = predictions.loc["B0XXXXX"]
# print(sku_pred)
AutoGluon vs Prophet 怎么选?
- 单个 SKU 深度分析 → Prophet(可解释性强,可以添加节假日和外部变量)
- 批量预测 100+ SKU → AutoGluon(自动化程度高,一次搞定)
- 不确定用什么 → 先用 AutoGluon 跑一遍基线,再用 Prophet 对重点 SKU 精调
3.4 BERTopic Review 主题分析
BERTopic 可以从大量 Review 文本中自动发现主题,帮助你理解客户在说什么。这比手动读 Review 高效 100 倍。
from bertopic import BERTopic
from sentence_transformers import SentenceTransformer
import pandas as pd
def analyze_review_topics(
reviews: list[str],
language: str = "english",
nr_topics: int = "auto",
min_topic_size: int = 10
) -> tuple:
"""
用 BERTopic 从 Review 文本中自动发现主题。
工作原理:
1. 用 Sentence-BERT 将每条 Review 转换为向量
2. 用 UMAP 降维
3. 用 HDBSCAN 聚类
4. 用 c-TF-IDF 提取每个主题的关键词
Args:
reviews: Review 文本列表
language: 语言 ("english" 或 "chinese")
nr_topics: 主题数量("auto" 自动确定)
min_topic_size: 最小主题大小(Review 数少于此值的主题会被合并)
Returns:
(topic_model, topics, probs)
- topic_model: 训练好的 BERTopic 模型
- topics: 每条 Review 的主题编号
- probs: 每条 Review 属于各主题的概率
"""
# 选择嵌入模型
if language == "chinese":
embedding_model = SentenceTransformer(
"paraphrase-multilingual-MiniLM-L12-v2"
)
else:
embedding_model = SentenceTransformer(
"all-MiniLM-L6-v2"
)
# 创建 BERTopic 模型
topic_model = BERTopic(
embedding_model=embedding_model,
nr_topics=nr_topics,
min_topic_size=min_topic_size,
language=language,
verbose=True
)
# 训练
topics, probs = topic_model.fit_transform(reviews)
# 输出主题概览
topic_info = topic_model.get_topic_info()
print("\n 发现的主题:")
for _, row in topic_info.head(10).iterrows():
if row["Topic"] != -1: # -1 是离群点
print(f" 主题 {row['Topic']}: {row['Name']} "
f"({row['Count']} 条 Review)")
return topic_model, topics, probs
def get_topic_summary(
topic_model: BERTopic,
reviews: list[str],
topics: list[int],
ratings: list[int] = None
) -> pd.DataFrame:
"""
生成主题摘要报告。
Args:
topic_model: 训练好的模型
reviews: Review 文本
topics: 主题编号
ratings: 评分(1-5),用于分析每个主题的情感倾向
Returns:
主题摘要 DataFrame
"""
summary_data = []
topic_info = topic_model.get_topic_info()
for _, row in topic_info.iterrows():
topic_id = row["Topic"]
if topic_id == -1:
continue
# 获取该主题的关键词
keywords = topic_model.get_topic(topic_id)
keyword_str = ", ".join([w for w, _ in keywords[:5]])
# 获取该主题的 Review 索引
topic_mask = [t == topic_id for t in topics]
topic_reviews = [r for r, m in zip(reviews, topic_mask) if m]
entry = {
"topic_id": topic_id,
"keywords": keyword_str,
"review_count": len(topic_reviews),
"sample_review": topic_reviews[0][:200] if topic_reviews else "",
}
# 如果有评分数据,计算该主题的平均评分
if ratings:
topic_ratings = [r for r, m in zip(ratings, topic_mask) if m]
entry["avg_rating"] = round(sum(topic_ratings) / len(topic_ratings), 2) if topic_ratings else None
entry["negative_pct"] = round(
sum(1 for r in topic_ratings if r <= 2) / len(topic_ratings) * 100, 1
) if topic_ratings else None
summary_data.append(entry)
summary = pd.DataFrame(summary_data)
if "avg_rating" in summary.columns:
summary = summary.sort_values("avg_rating", ascending=True)
return summary
# 使用示例
# reviews_df = pd.read_csv("data/reviews.csv")
# reviews = reviews_df["review_text"].tolist()
# ratings = reviews_df["rating"].tolist()
#
# topic_model, topics, probs = analyze_review_topics(reviews, language="english")
#
# # 生成主题摘要
# summary = get_topic_summary(topic_model, reviews, topics, ratings)
# print(summary.to_string(index=False))
#
# # 可视化主题分布
# fig = topic_model.visualize_topics()
# fig.write_html("output/review_topics.html")
#
# # 查看主题随时间的变化(需要日期数据)
# timestamps = reviews_df["date"].tolist()
# topics_over_time = topic_model.topics_over_time(reviews, topics, timestamps)
# fig = topic_model.visualize_topics_over_time(topics_over_time)
# fig.write_html("output/topics_over_time.html")
BERTopic 的实际价值:假设你有 5000 条竞品 Review,手动读完需要几天。BERTopic 5 分钟就能告诉你:主题 1 是“电池续航差“(平均评分 2.1),主题 2 是“画质好“(平均评分 4.5),主题 3 是“App 难用“(平均评分 1.8)。这直接告诉你产品改进方向。
3.5 预测结果转化为补货决策
预测本身不是目的,补货决策才是。这一节把预测结果连接到 A5 库存模块 的补货逻辑。
def forecast_to_reorder(
forecast: pd.DataFrame,
current_stock: int,
lead_time_days: int = 30,
safety_stock_days: int = 14,
moq: int = 100
) -> dict:
"""
将预测结果转化为补货建议。
Args:
forecast: Prophet 预测结果
current_stock: 当前库存数量
lead_time_days: 供应商交货周期(天)
safety_stock_days: 安全库存天数
moq: 最小起订量
Returns:
补货建议字典
"""
# 取未来预测数据
future_data = forecast[forecast["ds"] > pd.Timestamp.now()]
if future_data.empty:
return {"error": "无未来预测数据"}
# 计算日均预测销量(用上界做保守估计)
daily_forecast = future_data["yhat"].mean()
daily_upper = future_data["yhat_upper"].mean()
# 安全库存 = 安全天数 × 日均销量上界
safety_stock = int(safety_stock_days * daily_upper)
# Lead Time 期间的预期消耗
lt_consumption = int(lead_time_days * daily_forecast)
# 再订货点 = Lead Time 消耗 + 安全库存
reorder_point = lt_consumption + safety_stock
# 当前库存可支撑天数
days_of_stock = int(current_stock / daily_forecast) if daily_forecast > 0 else 999
# 建议订货量 = 90 天预测需求 - 当前库存 + 安全库存
forecast_90d = int(future_data["yhat"].head(90).sum())
suggested_qty = max(forecast_90d - current_stock + safety_stock, 0)
# 向上取整到 MOQ 的倍数
if suggested_qty > 0:
suggested_qty = max(
((suggested_qty + moq - 1) // moq) * moq,
moq
)
# 紧急程度判断
if current_stock <= reorder_point * 0.5:
urgency = " 紧急补货"
elif current_stock <= reorder_point:
urgency = " 建议补货"
else:
urgency = " 库存充足"
result = {
"urgency": urgency,
"current_stock": current_stock,
"days_of_stock": days_of_stock,
"daily_forecast": round(daily_forecast, 1),
"safety_stock": safety_stock,
"reorder_point": reorder_point,
"suggested_qty": suggested_qty,
"forecast_90d": forecast_90d,
"lead_time_days": lead_time_days,
}
print(f"\n 补货建议:")
print(f" 状态: {urgency}")
print(f" 当前库存: {current_stock} 件(可支撑 {days_of_stock} 天)")
print(f" 日均预测: {daily_forecast:.1f} 件/天")
print(f" 安全库存: {safety_stock} 件")
print(f" 再订货点: {reorder_point} 件")
print(f" 建议订货: {suggested_qty} 件(MOQ={moq})")
return result
# 使用示例
# reorder = forecast_to_reorder(
# forecast=forecast,
# current_stock=500,
# lead_time_days=30,
# safety_stock_days=14,
# moq=200
# )
4. 模型评估
4.1 评估指标
| 指标 | 公式 | 含义 | 适合场景 |
|---|---|---|---|
| MAE | mean(abs(actual - predicted)) | 平均绝对误差 | 通用,对异常值不敏感 |
| RMSE | sqrt(mean((actual - predicted)²)) | 均方根误差 | 惩罚大误差,适合不容忍大偏差的场景 |
| MAPE | mean(abs((actual - predicted) / actual)) × 100% | 平均绝对百分比误差 | 跨 SKU 对比(不受销量基数影响) |
| WAPE | sum(abs(actual - predicted)) / sum(actual) × 100% | 加权绝对百分比误差 | 避免 MAPE 在低销量时爆炸 |
电商场景推荐用 WAPE:MAPE 在实际销量接近 0 时会趋向无穷大(除以接近 0 的数),而 WAPE 用总销量做分母,更稳定。
def evaluate_forecast(
actual: pd.Series,
predicted: pd.Series
) -> dict:
"""
计算预测评估指标。
Args:
actual: 实际值
predicted: 预测值
Returns:
评估指标字典
"""
actual = actual.values
predicted = predicted.values
mae = np.mean(np.abs(actual - predicted))
rmse = np.sqrt(np.mean((actual - predicted) ** 2))
# MAPE(过滤掉实际值为 0 的天)
nonzero_mask = actual > 0
if nonzero_mask.any():
mape = np.mean(
np.abs((actual[nonzero_mask] - predicted[nonzero_mask])
/ actual[nonzero_mask])
) * 100
else:
mape = float("inf")
# WAPE(更稳健)
wape = np.sum(np.abs(actual - predicted)) / np.sum(actual) * 100 if np.sum(actual) > 0 else float("inf")
metrics = {
"MAE": round(mae, 2),
"RMSE": round(rmse, 2),
"MAPE": round(mape, 2),
"WAPE": round(wape, 2),
}
print(" 评估结果:")
for k, v in metrics.items():
unit = "%" if k in ("MAPE", "WAPE") else "units"
print(f" {k}: {v} {unit}")
return metrics
MAPE 参考基准(电商场景):
| MAPE | 评价 | 说明 |
|---|---|---|
| < 15% | 优秀 | 稳定老品,数据充足 |
| 15-25% | 良好 | 大多数 SKU 的合理水平 |
| 25-40% | 可接受 | 新品或波动较大的品类 |
| > 40% | 需改进 | 检查数据质量或模型配置 |
4.2 回测(Backtesting)
回测是验证预测模型最可靠的方法:用历史数据模拟“如果当时用这个模型预测,结果会怎样“。
def backtest_prophet(
df: pd.DataFrame,
initial_days: int = 180,
horizon_days: int = 30,
period_days: int = 30
) -> pd.DataFrame:
"""
Prophet 回测:滚动窗口验证。
原理:
1. 用前 initial_days 天数据训练
2. 预测未来 horizon_days 天
3. 与实际值对比
4. 窗口向前滑动 period_days 天,重复
Args:
df: Prophet 格式数据
initial_days: 初始训练数据天数
horizon_days: 每次预测的天数
period_days: 窗口滑动步长
Returns:
回测结果 DataFrame
"""
from prophet.diagnostics import cross_validation, performance_metrics
model = Prophet(
yearly_seasonality=True,
weekly_seasonality=True,
changepoint_prior_scale=0.1,
interval_width=0.8,
)
model.fit(df)
# 交叉验证
cv_results = cross_validation(
model,
initial=f"{initial_days} days",
period=f"{period_days} days",
horizon=f"{horizon_days} days"
)
# 计算性能指标
perf = performance_metrics(cv_results)
print(" 回测结果:")
print(f" MAE: {perf['mae'].mean():.2f}")
print(f" RMSE: {perf['rmse'].mean():.2f}")
print(f" MAPE: {perf['mape'].mean() * 100:.2f}%")
return cv_results, perf
# 使用示例
# cv_results, perf = backtest_prophet(prophet_df, initial_days=180, horizon_days=30)
#
# # 可视化回测结果
# from prophet.plot import plot_cross_validation_metric
# fig = plot_cross_validation_metric(cv_results, metric="mape")
# plt.savefig("output/backtest_mape.png", dpi=150)
4.3 预测区间的使用
预测值是一个点估计,但业务决策需要考虑不确定性。Prophet 的预测区间告诉你“真实值大概率落在这个范围内“。
| 决策场景 | 用哪个值 | 理由 |
|---|---|---|
| 补货量计算 | yhat_upper(上界) | 宁可多备一点,断货损失 > 库存成本 |
| 销售目标设定 | yhat(中位数) | 目标应该是最可能的结果 |
| 悲观情景分析 | yhat_lower(下界) | 评估最差情况下的现金流 |
| 仓储空间规划 | yhat_upper(上界) | 确保有足够空间存放 |
5. 实战项目:构建 SKU 销量预测系统
5.1 项目架构
sales-forecaster/
config.py # 配置(数据路径、模型参数)
requirements.txt # 依赖
data/ # 数据目录
raw/ # 原始销量数据
processed/ # 清洗后的数据
models/ # 模型存储
prophet/ # Prophet 模型文件
src/
data_prep.py # 数据准备(清洗、断货处理)
prophet_model.py # Prophet 训练和预测
autogluon_model.py # AutoGluon 批量预测
review_analysis.py # BERTopic Review 分析
evaluator.py # 模型评估和回测
reorder.py # 补货决策
output/ # 输出
forecasts/ # 预测结果 CSV
reports/ # HTML 报告
plots/ # 图表
run_forecast.py # 主入口:单 SKU 预测
run_batch_forecast.py # 批量预测入口
README.md
5.2 主入口脚本
# run_forecast.py 单 SKU 销量预测
import argparse
import pandas as pd
from pathlib import Path
from src.data_prep import prepare_prophet_data, handle_stockout
from src.prophet_model import (
train_prophet_with_holidays,
make_forecast,
plot_forecast
)
from src.evaluator import evaluate_forecast, backtest_prophet
from src.reorder import forecast_to_reorder
def run(
data_path: str,
asin: str = None,
forecast_days: int = 90,
current_stock: int = None,
lead_time: int = 30
):
"""
完整预测流程:数据准备 → 训练 → 预测 → 评估 → 补货建议
"""
print(f" 开始预测流程")
# 1. 加载数据
df = pd.read_csv(data_path)
if asin and "asin" in df.columns:
df = df[df["asin"] == asin]
print(f" SKU: {asin}")
# 2. 数据准备
prophet_df = prepare_prophet_data(df, date_col="date", value_col="units")
prophet_df = handle_stockout(prophet_df, units_col="y")
# 3. 训练(带节假日效应)
model = train_prophet_with_holidays(prophet_df, changepoint_prior=0.1)
# 4. 预测
forecast = make_forecast(model, periods=forecast_days)
# 5. 评估(用最后 30 天做验证)
if len(prophet_df) > 30:
train_df = prophet_df.iloc[:-30]
test_df = prophet_df.iloc[-30:]
eval_model = train_prophet_with_holidays(train_df)
eval_forecast = make_forecast(eval_model, periods=30)
eval_pred = eval_forecast.tail(30)["yhat"].values
eval_actual = test_df["y"].values
metrics = evaluate_forecast(
pd.Series(eval_actual), pd.Series(eval_pred)
)
# 6. 可视化
output_dir = Path("output")
output_dir.mkdir(exist_ok=True)
plot_forecast(
model, forecast, actual_df=prophet_df,
title=f"{'ASIN ' + asin if asin else 'SKU'} 销量预测 ({forecast_days}天)"
)
# 7. 保存预测结果
forecast_output = forecast[["ds", "yhat", "yhat_lower", "yhat_upper"]].tail(forecast_days)
forecast_output.to_csv(
output_dir / f"forecast_{asin or 'sku'}_{forecast_days}d.csv",
index=False
)
# 8. 补货建议
if current_stock is not None:
reorder = forecast_to_reorder(
forecast,
current_stock=current_stock,
lead_time_days=lead_time
)
print(f"\n 预测完成!结果已保存到 output/")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="SKU 销量预测")
parser.add_argument("--data", required=True, help="销量数据 CSV 路径")
parser.add_argument("--asin", help="ASIN 编号")
parser.add_argument("--days", type=int, default=90, help="预测天数")
parser.add_argument("--stock", type=int, help="当前库存")
parser.add_argument("--lead-time", type=int, default=30, help="交货周期(天)")
args = parser.parse_args()
run(
data_path=args.data,
asin=args.asin,
forecast_days=args.days,
current_stock=args.stock,
lead_time=args.lead_time
)
# 运行示例
python3 run_forecast.py --data data/daily_sales.csv --asin B0XXXXX --days 90
python3 run_forecast.py --data data/daily_sales.csv --asin B0XXXXX --stock 500 --lead-time 30
6. 常见陷阱
| 陷阱 | 表现 | 解决方案 |
|---|---|---|
| 过拟合 | 训练集 MAPE 很低,测试集 MAPE 很高 | 降低 changepoint_prior_scale,用回测验证 |
| 数据泄露 | 用未来数据训练模型(如用全年数据预测 Q3) | 严格按时间切分训练/测试集,用 cross_validation |
| 忽略断货 | 断货期间的 0 被当作真实需求 | 用 handle_stockout 函数处理 |
| 忽略外部因素 | 竞品降价导致销量暴涨,模型无法解释 | 添加外部回归变量(竞品价格、广告花费) |
| 预测值为负 | Prophet 可能输出负数预测 | 用 .clip(lower=0) 截断 |
| 季节性错配 | 用周数据训练却期望日级预测 | 确保训练数据和预测频率一致 |
| 大促过拟合 | 模型把大促当作常规模式 | 用 holidays 参数显式建模大促事件 |
| 新品无数据 | 新 ASIN 没有历史销量 | 用类似品的销量曲线做类比,或用 AutoGluon 的迁移学习 |
7. 学习资源
7.1 免费课程与文档
| 资源 | 平台 | 时长 | 适合谁 | 链接 |
|---|---|---|---|---|
| Prophet 官方教程 | Meta | 2h | 时间序列入门 | facebook.github.io/prophet |
| Kaggle: Time Series | Kaggle | 5h | 时间序列基础 | kaggle.com/learn/time-series |
| Kaggle: Intro to ML | Kaggle | 4h | ML 零基础 | kaggle.com/learn/intro-to-machine-learning |
| Google ML Crash Course | 15h | 系统学习 ML | developers.google.com/machine-learning/crash-course | |
| AutoGluon 时间序列教程 | Amazon | 2h | 自动化预测 | auto.gluon.ai |
| BERTopic 文档 | GitHub | 3h | Review 主题分析 | maartengr.github.io/BERTopic |
| Darts 文档 | Unit8 | 3h | 多模型对比 | unit8co.github.io/darts |
7.2 推荐 GitHub 仓库
| 仓库 | Star | 用途 |
|---|---|---|
| Prophet | 18k+ | 时间序列预测核心库 |
| AutoGluon | 8k+ | 自动化 ML 框架 |
| BERTopic | 6k+ | 主题建模 |
| Darts | 8k+ | 时间序列工具箱 |
| OR-Tools | 11k+ | 运筹优化 |
Content rephrased for compliance with licensing restrictions. Sources cited inline.
9. 完成标志
- 用 Prophet 对一个真实 SKU 做 90 天销量预测,输出预测值和置信区间
- 添加电商大促节假日效应(Prime Day、BFCM),对比有无节假日的预测精度差异
- 用回测(backtesting)验证模型,MAPE < 30%
- 用 AutoGluon 对 10+ SKU 做批量预测,查看模型排行榜
- 用 BERTopic 分析一组 Review 文本,发现至少 3 个有意义的主题
- 将预测结果转化为补货建议(计算再订货点和建议订货量)
完成以上所有项目后,你已经掌握了电商预测建模的核心技能。接下来进入 B3 RAG 知识库,学习如何构建基于 RAG 的智能问答系统。
附录
附录 A:模型选择决策树
你的预测需求是什么?
单个 SKU 深度预测
有 1 年以上历史数据?
是 → Prophet + 节假日 + 外部变量
否 → Prophet 基础版 / 移动平均
需要解释为什么这样预测?
是 → Prophet(可分解为趋势+季节性)
否 → AutoGluon(自动选最优模型)
批量预测 100+ SKU
有 GPU?
是 → AutoGluon (high_quality preset)
否 → AutoGluon (medium_quality preset)
需要快速出结果?
是 → AutoGluon (fast_training preset)
Review 文本分析
发现主题 → BERTopic
情感分类 → scikit-learn + TF-IDF / BERT
关键词提取 → BERTopic 的 c-TF-IDF
补货优化
简单规则 → 预测值 + 安全库存公式
多约束优化 → OR-Tools(考虑 MOQ、仓容、资金)
附录 B:代码速查表
# === Prophet 基础 ===
from prophet import Prophet
df = pd.DataFrame({"ds": dates, "y": values}) # 必须是 ds 和 y
model = Prophet()
model.fit(df)
future = model.make_future_dataframe(periods=90)
forecast = model.predict(future)
model.plot(forecast) # 预测图
model.plot_components(forecast) # 成分分解图
# === Prophet 节假日 ===
holidays = pd.DataFrame({
"holiday": ["prime_day"], "ds": ["2025-07-12"],
"lower_window": [-3], "upper_window": [2]
})
model = Prophet(holidays=holidays)
# === Prophet 外部变量 ===
model = Prophet()
model.add_regressor("ad_spend")
model.fit(df) # df 必须包含 ad_spend 列
future["ad_spend"] = 500 # 预测时也要提供
# === Prophet 回测 ===
from prophet.diagnostics import cross_validation, performance_metrics
cv = cross_validation(model, initial="180 days", period="30 days", horizon="30 days")
perf = performance_metrics(cv)
# === AutoGluon ===
from autogluon.timeseries import TimeSeriesDataFrame, TimeSeriesPredictor
ts = TimeSeriesDataFrame.from_data_frame(df, id_column="item_id", timestamp_column="timestamp")
predictor = TimeSeriesPredictor(prediction_length=30, target="target")
predictor.fit(ts, time_limit=300)
predictions = predictor.predict(ts)
# === BERTopic ===
from bertopic import BERTopic
model = BERTopic(language="english", min_topic_size=10)
topics, probs = model.fit_transform(documents)
model.get_topic_info() # 主题概览
model.visualize_topics() # 主题可视化
# === 评估指标 ===
mae = np.mean(np.abs(actual - predicted))
rmse = np.sqrt(np.mean((actual - predicted) ** 2))
mape = np.mean(np.abs((actual - predicted) / actual)) * 100
wape = np.sum(np.abs(actual - predicted)) / np.sum(actual) * 100
附录 C:依赖安装
# 基础预测
pip install prophet pandas numpy matplotlib
# AutoGluon(较大,建议单独安装)
pip install autogluon.timeseries
# Review 分析
pip install bertopic sentence-transformers
# 运筹优化
pip install ortools
# 全部安装
pip install prophet pandas numpy matplotlib \
autogluon.timeseries \
bertopic sentence-transformers \
ortools scikit-learn \
statsmodels
Prophet 在某些系统上安装可能遇到问题(依赖 pystan/cmdstanpy)。如果安装失败,参考 Prophet 安装指南 或使用 Google Colab(预装了大部分依赖)。