# notify-worker(物流事件 -> 消息入队)说明 面向读者:后端/运维/联调同学。 一句话:`notify-worker` 是常驻 worker,用于从 `notify_queue` 消费“新物流事件通知”,并生成(upsert)到 `express_notifications`,让后续的 `push-server` consumer 去实际下发推送。 --- ## 1. 它解决什么问题? - **解耦**:Webhook 入库只保证事实表(`platform_express_*`)一致;消息生成用 worker 异步做,避免在 webhook 请求里做过多业务计算。 - **可追溯/可补偿**:队列表 `notify_queue` 和消息表 `express_notifications` 让每次处理结果可落库(成功/跳过/失败原因)。 - **幂等**:对同一事件(同一 `dedupe_key`)写入消息时,使用稳定的 `message_id` 进行 upsert,避免重复生成消息。 在整体链路中的位置: 1) webhook-receiver 写入 `platform_express_tracking_events` 2) 事件被推入 `notify_queue`(由触发器/任务/上游逻辑实现) 3) **notify-worker 消费 `notify_queue` → 写入 `express_notifications`** 4) push-server consumer 轮询 `express_notifications` → 调用 `CLOUD_FUNC_URL` 下发 --- ## 2. 用什么做的(技术栈/依赖) - **运行时**:Node.js(建议 18+,以确保 `fetch` 可用;旧版本会回退到 `node-fetch`) - **通信方式**:直接调用 Supabase PostgREST REST API(`$SUPA_URL/rest/v1/...`) - **鉴权**:默认仅发送 `apikey` header;支持 worker 专用开关 `NOTIFY_WORKER_SUPA_USE_BEARER=true`(或 `WORKER_SUPA_USE_BEARER=true`)来附加 `Authorization: Bearer ...`(否则会回退到全局 `SUPA_USE_BEARER`)。 - **核心依赖**: - `crypto`:计算稳定的 `message_id`(SHA-256) - `fetch` / `node-fetch`:HTTP 调用 Supabase REST 它不是一个 HTTP Server(不监听端口),而是一个“循环轮询的后台进程”。 --- ## 3. 输入/输出(读写哪些表) ### 3.1 输入:`notify_queue` `notify-worker` 会拉取: - `processed_at IS NULL` 的记录 - 按 `created_at ASC` 排序 - 每次最多 `BATCH_SIZE` 条 并在处理完成后回写: - `processed_at`(处理时间) - `process_status`:`queued` / `skipped` / `failed` - `last_error`(失败/跳过原因,截断保存) > 说明:字段命名以当前脚本实现为准,你的表结构需要包含这些列。 ### 3.2 查询依赖 为把“物流事件”映射到收件人(user/merchant),脚本会查询: - `platform_express_waybills`:取 `order_id/order_no, carrier, tracking_no` - `ml_orders`:取 `user_id, merchant_id` ### 3.3 输出:`express_notifications` 对每个收件人写入 1 条通知(同一事件会写 user 与 merchant 两条,若都存在): - `aud`:`user` 或 `merchant` - `recipient_id` - `order_id / waybill_id / tracking_no / carrier` - `message_id`:稳定哈希生成(`aud|waybill_id|dedupe_key`) - `dedupe_key`:`waybill_id|aud|`(截断到 256) - `event_text_safe`:当前实现直接复用队列行的 `event_text` - `status_code / event_time / payload` 写入方式是 **upsert**:`POST express_notifications?on_conflict=message_id`,并使用 `resolution=merge-duplicates`。 注意:数据库侧必须有 **普通** 唯一索引/约束 `express_notifications(message_id)`;不要用 `WHERE message_id IS NOT NULL` 的部分唯一索引,否则会触发 `42P10`。 若你已遇到该错误,执行修复脚本:`pages/mall/delivery/doc/需求文档/20260310_fix_express_notifications_on_conflict_message_id.sql`。 --- ## 4. 配置与启动 ### 4.1 配置加载优先级 脚本会复用 `server/load-config.js`,加载顺序(只填充未设置的 env): 1) 系统环境变量 2) `CONFIG_FILE` / `CONFIG_PATH` 指定的 `.env` 或 `.json` 3) 同目录 `server/notify-worker.config.json` 4) `server/.env` / `server/config.json` / `server/config.json.example` ### 4.2 关键环境变量 - `SUPA_URL`(必需) - `SERVICE_ROLE_KEY` 或 `SUPA_KEY`(必需,推荐 service_role) - `NOTIFY_WORKER_SUPA_USE_BEARER`(可选,默认 false):仅对 notify-worker 生效,若为 `true` 则发送 `Authorization: Bearer ` - `SUPA_USE_BEARER`(可选,默认 false):全局开关(会影响其它服务;一般不建议在自托管 JWT_SECRET 不一致时开启) RLS 开启且 Bearer/JWT 不可用时的替代方案(慎用/止血): - `NOTIFY_WORKER_USE_RLS_SAFE_RPC`(可选,默认 false):若为 `true`,则不再直查 `ml_orders`,改为调用 `rpc/notify_get_order_recipients`(SECURITY DEFINER)获取 `user_id/merchant_id`。 - `NOTIFY_WORKER_RPC_TOKEN`(当上面为 true 时必需):notify-worker 调用 RPC 时通过请求头 `x-notify-worker-token` 传递;数据库函数会校验该 token。 配套 SQL: - `pages/mall/delivery/doc/需求文档/db/绕过RLS的方案(慎用)/20260311_add_rpc_notify_get_order_recipients.sql` worker 行为: - `NOTIFY_WORKER_POLL_MS`(默认 2000) - `NOTIFY_WORKER_BATCH_SIZE`(默认 20) - `RUN_ONCE=true`(只跑一轮就退出,适合手动验证/CI) ### 4.3 启动命令 在仓库根目录: ```powershell # 常驻运行 node server/notify-worker.js # 只跑一轮(用于验证) $env:RUN_ONCE='true' node server/notify-worker.js ``` 如果你的配置放在 JSON: ```powershell $env:CONFIG_FILE=(Resolve-Path .\server\notify-worker.config.json) node server/notify-worker.js ``` --- ## 5. 管理者“一页摘要”(给非一线同学) ### 业务价值 - 让“物流事件入库”与“消息生成/推送”解耦,降低 webhook 链路复杂度和故障扩散。 - 通过队列表落库,可审计、可重放、可定位卡点。 ### 边界 - notify-worker **只负责生成消息入队**(`express_notifications`),不负责实际推送下发。 - 推送通道成功率与重试投递由 push-server consumer + 云函数负责。 ### 关键依赖 - Supabase/Postgres 表:`notify_queue`、`platform_express_waybills`、`ml_orders`、`express_notifications` - 上游必须保证:有机制把新事件推入 `notify_queue`(触发器/任务/服务均可) ### 主要风险与控制点 - **幂等/重复消息**:依赖 `dedupe_key` 的稳定性;上游写队列时应构造稳定的 dedupe_key(不要用 Date.now 这种随机值)。 - **权限/密钥**:需要 `service_role` 读写多表,必须只放服务器环境变量。 - **数据合规**:`event_text_safe` 应为清洗/脱敏后的文案;若上游传入的是原文,需在生成前做脱敏。 ### 建议监控指标 - `notify_queue` 未处理数量(`processed_at IS NULL`)是否持续增长 - 每分钟处理量、`process_status=failed` 比例与 Top `last_error` - 从队列入库到消息入队的延迟(created_at → processed_at) --- ## 6. 进一步阅读 - push-server(投递消费者):`server/PUSH_SERVER_README.md` - 总览(Webhook + worker + push):`docs/DELIVERY_WEBHOOK_PUSH_SERVER_OVERVIEW.md` - webhook-receiver(事件入库):`pages/mall/delivery/webhook-server/README.md`