Files
medical-mall/server/消息推送文档/NOTIFY_WORKER_README.md
not-like-juvenile 4acbb8ced5 补充方案
2026-03-12 10:36:51 +08:00

153 lines
6.9 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# notify-worker物流事件 -> 消息入队)说明
面向读者:后端/运维/联调同学。
一句话:`notify-worker` 是常驻 worker用于从 `notify_queue` 消费“新物流事件通知”并生成upsert`express_notifications`,让后续的 `push-server` consumer 去实际下发推送。
---
## 1. 它解决什么问题?
- **解耦**Webhook 入库只保证事实表(`platform_express_*`)一致;消息生成用 worker 异步做,避免在 webhook 请求里做过多业务计算。
- **可追溯/可补偿**:队列表 `notify_queue` 和消息表 `express_notifications` 让每次处理结果可落库(成功/跳过/失败原因)。
- **幂等**:对同一事件(同一 `dedupe_key`)写入消息时,使用稳定的 `message_id` 进行 upsert避免重复生成消息。
在整体链路中的位置:
1) webhook-receiver 写入 `platform_express_tracking_events`
2) 事件被推入 `notify_queue`(由触发器/任务/上游逻辑实现)
3) **notify-worker 消费 `notify_queue` → 写入 `express_notifications`**
4) push-server consumer 轮询 `express_notifications` → 调用 `CLOUD_FUNC_URL` 下发
---
## 2. 用什么做的(技术栈/依赖)
- **运行时**Node.js建议 18+,以确保 `fetch` 可用;旧版本会回退到 `node-fetch`
- **通信方式**:直接调用 Supabase PostgREST REST API`$SUPA_URL/rest/v1/...`
- **鉴权**:默认仅发送 `apikey` header支持 worker 专用开关 `NOTIFY_WORKER_SUPA_USE_BEARER=true`(或 `WORKER_SUPA_USE_BEARER=true`)来附加 `Authorization: Bearer ...`(否则会回退到全局 `SUPA_USE_BEARER`)。
- **核心依赖**
- `crypto`:计算稳定的 `message_id`SHA-256
- `fetch` / `node-fetch`HTTP 调用 Supabase REST
它不是一个 HTTP Server不监听端口而是一个“循环轮询的后台进程”。
---
## 3. 输入/输出(读写哪些表)
### 3.1 输入:`notify_queue`
`notify-worker` 会拉取:
- `processed_at IS NULL` 的记录
-`created_at ASC` 排序
- 每次最多 `BATCH_SIZE`
并在处理完成后回写:
- `processed_at`(处理时间)
- `process_status``queued` / `skipped` / `failed`
- `last_error`(失败/跳过原因,截断保存)
> 说明:字段命名以当前脚本实现为准,你的表结构需要包含这些列。
### 3.2 查询依赖
为把“物流事件”映射到收件人user/merchant脚本会查询
- `platform_express_waybills`:取 `order_id/order_no, carrier, tracking_no`
- `ml_orders`:取 `user_id, merchant_id`
### 3.3 输出:`express_notifications`
对每个收件人写入 1 条通知(同一事件会写 user 与 merchant 两条,若都存在):
- `aud``user``merchant`
- `recipient_id`
- `order_id / waybill_id / tracking_no / carrier`
- `message_id`:稳定哈希生成(`aud|waybill_id|dedupe_key`
- `dedupe_key``waybill_id|aud|<queue.dedupe_key>`(截断到 256
- `event_text_safe`:当前实现直接复用队列行的 `event_text`
- `status_code / event_time / payload`
写入方式是 **upsert**`POST express_notifications?on_conflict=message_id`,并使用 `resolution=merge-duplicates`
注意:数据库侧必须有 **普通** 唯一索引/约束 `express_notifications(message_id)`;不要用 `WHERE message_id IS NOT NULL` 的部分唯一索引,否则会触发 `42P10`
若你已遇到该错误,执行修复脚本:`pages/mall/delivery/doc/需求文档/20260310_fix_express_notifications_on_conflict_message_id.sql`
---
## 4. 配置与启动
### 4.1 配置加载优先级
脚本会复用 `server/load-config.js`,加载顺序(只填充未设置的 env
1) 系统环境变量
2) `CONFIG_FILE` / `CONFIG_PATH` 指定的 `.env``.json`
3) 同目录 `server/notify-worker.config.json`
4) `server/.env` / `server/config.json` / `server/config.json.example`
### 4.2 关键环境变量
- `SUPA_URL`(必需)
- `SERVICE_ROLE_KEY``SUPA_KEY`(必需,推荐 service_role
- `NOTIFY_WORKER_SUPA_USE_BEARER`(可选,默认 false仅对 notify-worker 生效,若为 `true` 则发送 `Authorization: Bearer <SUPA_KEY>`
- `SUPA_USE_BEARER`(可选,默认 false全局开关会影响其它服务一般不建议在自托管 JWT_SECRET 不一致时开启)
RLS 开启且 Bearer/JWT 不可用时的替代方案(慎用/止血):
- `NOTIFY_WORKER_USE_RLS_SAFE_RPC`(可选,默认 false若为 `true`,则不再直查 `ml_orders`,改为调用 `rpc/notify_get_order_recipients`SECURITY DEFINER获取 `user_id/merchant_id`
- `NOTIFY_WORKER_RPC_TOKEN`(当上面为 true 时必需notify-worker 调用 RPC 时通过请求头 `x-notify-worker-token` 传递;数据库函数会校验该 token。
配套 SQL
- `pages/mall/delivery/doc/需求文档/db/绕过RLS的方案(慎用)/20260311_add_rpc_notify_get_order_recipients.sql`
worker 行为:
- `NOTIFY_WORKER_POLL_MS`(默认 2000
- `NOTIFY_WORKER_BATCH_SIZE`(默认 20
- `RUN_ONCE=true`(只跑一轮就退出,适合手动验证/CI
### 4.3 启动命令
在仓库根目录:
```powershell
# 常驻运行
node server/notify-worker.js
# 只跑一轮(用于验证)
$env:RUN_ONCE='true'
node server/notify-worker.js
```
如果你的配置放在 JSON
```powershell
$env:CONFIG_FILE=(Resolve-Path .\server\notify-worker.config.json)
node server/notify-worker.js
```
---
## 5. 管理者“一页摘要”(给非一线同学)
### 业务价值
- 让“物流事件入库”与“消息生成/推送”解耦,降低 webhook 链路复杂度和故障扩散。
- 通过队列表落库,可审计、可重放、可定位卡点。
### 边界
- notify-worker **只负责生成消息入队**`express_notifications`),不负责实际推送下发。
- 推送通道成功率与重试投递由 push-server consumer + 云函数负责。
### 关键依赖
- Supabase/Postgres 表:`notify_queue``platform_express_waybills``ml_orders``express_notifications`
- 上游必须保证:有机制把新事件推入 `notify_queue`(触发器/任务/服务均可)
### 主要风险与控制点
- **幂等/重复消息**:依赖 `dedupe_key` 的稳定性;上游写队列时应构造稳定的 dedupe_key不要用 Date.now 这种随机值)。
- **权限/密钥**:需要 `service_role` 读写多表,必须只放服务器环境变量。
- **数据合规**`event_text_safe` 应为清洗/脱敏后的文案;若上游传入的是原文,需在生成前做脱敏。
### 建议监控指标
- `notify_queue` 未处理数量(`processed_at IS NULL`)是否持续增长
- 每分钟处理量、`process_status=failed` 比例与 Top `last_error`
- 从队列入库到消息入队的延迟created_at → processed_at
---
## 6. 进一步阅读
- push-server投递消费者`server/PUSH_SERVER_README.md`
- 总览Webhook + worker + push`docs/DELIVERY_WEBHOOK_PUSH_SERVER_OVERVIEW.md`
- webhook-receiver事件入库`pages/mall/delivery/webhook-server/README.md`