153 lines
6.9 KiB
Markdown
153 lines
6.9 KiB
Markdown
# notify-worker(物流事件 -> 消息入队)说明
|
||
|
||
面向读者:后端/运维/联调同学。
|
||
|
||
一句话:`notify-worker` 是常驻 worker,用于从 `notify_queue` 消费“新物流事件通知”,并生成(upsert)到 `express_notifications`,让后续的 `push-server` consumer 去实际下发推送。
|
||
|
||
---
|
||
|
||
## 1. 它解决什么问题?
|
||
|
||
- **解耦**:Webhook 入库只保证事实表(`platform_express_*`)一致;消息生成用 worker 异步做,避免在 webhook 请求里做过多业务计算。
|
||
- **可追溯/可补偿**:队列表 `notify_queue` 和消息表 `express_notifications` 让每次处理结果可落库(成功/跳过/失败原因)。
|
||
- **幂等**:对同一事件(同一 `dedupe_key`)写入消息时,使用稳定的 `message_id` 进行 upsert,避免重复生成消息。
|
||
|
||
在整体链路中的位置:
|
||
|
||
1) webhook-receiver 写入 `platform_express_tracking_events`
|
||
2) 事件被推入 `notify_queue`(由触发器/任务/上游逻辑实现)
|
||
3) **notify-worker 消费 `notify_queue` → 写入 `express_notifications`**
|
||
4) push-server consumer 轮询 `express_notifications` → 调用 `CLOUD_FUNC_URL` 下发
|
||
|
||
---
|
||
|
||
## 2. 用什么做的(技术栈/依赖)
|
||
|
||
- **运行时**:Node.js(建议 18+,以确保 `fetch` 可用;旧版本会回退到 `node-fetch`)
|
||
- **通信方式**:直接调用 Supabase PostgREST REST API(`$SUPA_URL/rest/v1/...`)
|
||
- **鉴权**:默认仅发送 `apikey` header;支持 worker 专用开关 `NOTIFY_WORKER_SUPA_USE_BEARER=true`(或 `WORKER_SUPA_USE_BEARER=true`)来附加 `Authorization: Bearer ...`(否则会回退到全局 `SUPA_USE_BEARER`)。
|
||
- **核心依赖**:
|
||
- `crypto`:计算稳定的 `message_id`(SHA-256)
|
||
- `fetch` / `node-fetch`:HTTP 调用 Supabase REST
|
||
|
||
它不是一个 HTTP Server(不监听端口),而是一个“循环轮询的后台进程”。
|
||
|
||
---
|
||
|
||
## 3. 输入/输出(读写哪些表)
|
||
|
||
### 3.1 输入:`notify_queue`
|
||
`notify-worker` 会拉取:
|
||
- `processed_at IS NULL` 的记录
|
||
- 按 `created_at ASC` 排序
|
||
- 每次最多 `BATCH_SIZE` 条
|
||
|
||
并在处理完成后回写:
|
||
- `processed_at`(处理时间)
|
||
- `process_status`:`queued` / `skipped` / `failed`
|
||
- `last_error`(失败/跳过原因,截断保存)
|
||
|
||
> 说明:字段命名以当前脚本实现为准,你的表结构需要包含这些列。
|
||
|
||
### 3.2 查询依赖
|
||
为把“物流事件”映射到收件人(user/merchant),脚本会查询:
|
||
- `platform_express_waybills`:取 `order_id/order_no, carrier, tracking_no`
|
||
- `ml_orders`:取 `user_id, merchant_id`
|
||
|
||
### 3.3 输出:`express_notifications`
|
||
对每个收件人写入 1 条通知(同一事件会写 user 与 merchant 两条,若都存在):
|
||
- `aud`:`user` 或 `merchant`
|
||
- `recipient_id`
|
||
- `order_id / waybill_id / tracking_no / carrier`
|
||
- `message_id`:稳定哈希生成(`aud|waybill_id|dedupe_key`)
|
||
- `dedupe_key`:`waybill_id|aud|<queue.dedupe_key>`(截断到 256)
|
||
- `event_text_safe`:当前实现直接复用队列行的 `event_text`
|
||
- `status_code / event_time / payload`
|
||
|
||
写入方式是 **upsert**:`POST express_notifications?on_conflict=message_id`,并使用 `resolution=merge-duplicates`。
|
||
|
||
注意:数据库侧必须有 **普通** 唯一索引/约束 `express_notifications(message_id)`;不要用 `WHERE message_id IS NOT NULL` 的部分唯一索引,否则会触发 `42P10`。
|
||
若你已遇到该错误,执行修复脚本:`pages/mall/delivery/doc/需求文档/20260310_fix_express_notifications_on_conflict_message_id.sql`。
|
||
|
||
---
|
||
|
||
## 4. 配置与启动
|
||
|
||
### 4.1 配置加载优先级
|
||
脚本会复用 `server/load-config.js`,加载顺序(只填充未设置的 env):
|
||
1) 系统环境变量
|
||
2) `CONFIG_FILE` / `CONFIG_PATH` 指定的 `.env` 或 `.json`
|
||
3) 同目录 `server/notify-worker.config.json`
|
||
4) `server/.env` / `server/config.json` / `server/config.json.example`
|
||
|
||
### 4.2 关键环境变量
|
||
- `SUPA_URL`(必需)
|
||
- `SERVICE_ROLE_KEY` 或 `SUPA_KEY`(必需,推荐 service_role)
|
||
- `NOTIFY_WORKER_SUPA_USE_BEARER`(可选,默认 false):仅对 notify-worker 生效,若为 `true` 则发送 `Authorization: Bearer <SUPA_KEY>`
|
||
- `SUPA_USE_BEARER`(可选,默认 false):全局开关(会影响其它服务;一般不建议在自托管 JWT_SECRET 不一致时开启)
|
||
|
||
RLS 开启且 Bearer/JWT 不可用时的替代方案(慎用/止血):
|
||
- `NOTIFY_WORKER_USE_RLS_SAFE_RPC`(可选,默认 false):若为 `true`,则不再直查 `ml_orders`,改为调用 `rpc/notify_get_order_recipients`(SECURITY DEFINER)获取 `user_id/merchant_id`。
|
||
- `NOTIFY_WORKER_RPC_TOKEN`(当上面为 true 时必需):notify-worker 调用 RPC 时通过请求头 `x-notify-worker-token` 传递;数据库函数会校验该 token。
|
||
|
||
配套 SQL:
|
||
- `pages/mall/delivery/doc/需求文档/db/绕过RLS的方案(慎用)/20260311_add_rpc_notify_get_order_recipients.sql`
|
||
|
||
worker 行为:
|
||
- `NOTIFY_WORKER_POLL_MS`(默认 2000)
|
||
- `NOTIFY_WORKER_BATCH_SIZE`(默认 20)
|
||
- `RUN_ONCE=true`(只跑一轮就退出,适合手动验证/CI)
|
||
|
||
### 4.3 启动命令
|
||
在仓库根目录:
|
||
|
||
```powershell
|
||
# 常驻运行
|
||
node server/notify-worker.js
|
||
|
||
# 只跑一轮(用于验证)
|
||
$env:RUN_ONCE='true'
|
||
node server/notify-worker.js
|
||
```
|
||
|
||
如果你的配置放在 JSON:
|
||
|
||
```powershell
|
||
$env:CONFIG_FILE=(Resolve-Path .\server\notify-worker.config.json)
|
||
node server/notify-worker.js
|
||
```
|
||
|
||
---
|
||
|
||
## 5. 管理者“一页摘要”(给非一线同学)
|
||
|
||
### 业务价值
|
||
- 让“物流事件入库”与“消息生成/推送”解耦,降低 webhook 链路复杂度和故障扩散。
|
||
- 通过队列表落库,可审计、可重放、可定位卡点。
|
||
|
||
### 边界
|
||
- notify-worker **只负责生成消息入队**(`express_notifications`),不负责实际推送下发。
|
||
- 推送通道成功率与重试投递由 push-server consumer + 云函数负责。
|
||
|
||
### 关键依赖
|
||
- Supabase/Postgres 表:`notify_queue`、`platform_express_waybills`、`ml_orders`、`express_notifications`
|
||
- 上游必须保证:有机制把新事件推入 `notify_queue`(触发器/任务/服务均可)
|
||
|
||
### 主要风险与控制点
|
||
- **幂等/重复消息**:依赖 `dedupe_key` 的稳定性;上游写队列时应构造稳定的 dedupe_key(不要用 Date.now 这种随机值)。
|
||
- **权限/密钥**:需要 `service_role` 读写多表,必须只放服务器环境变量。
|
||
- **数据合规**:`event_text_safe` 应为清洗/脱敏后的文案;若上游传入的是原文,需在生成前做脱敏。
|
||
|
||
### 建议监控指标
|
||
- `notify_queue` 未处理数量(`processed_at IS NULL`)是否持续增长
|
||
- 每分钟处理量、`process_status=failed` 比例与 Top `last_error`
|
||
- 从队列入库到消息入队的延迟(created_at → processed_at)
|
||
|
||
---
|
||
|
||
## 6. 进一步阅读
|
||
|
||
- push-server(投递消费者):`server/PUSH_SERVER_README.md`
|
||
- 总览(Webhook + worker + push):`docs/DELIVERY_WEBHOOK_PUSH_SERVER_OVERVIEW.md`
|
||
- webhook-receiver(事件入库):`pages/mall/delivery/webhook-server/README.md`
|