6.9 KiB
notify-worker(物流事件 -> 消息入队)说明
面向读者:后端/运维/联调同学。
一句话:notify-worker 是常驻 worker,用于从 notify_queue 消费“新物流事件通知”,并生成(upsert)到 express_notifications,让后续的 push-server consumer 去实际下发推送。
1. 它解决什么问题?
- 解耦:Webhook 入库只保证事实表(
platform_express_*)一致;消息生成用 worker 异步做,避免在 webhook 请求里做过多业务计算。 - 可追溯/可补偿:队列表
notify_queue和消息表express_notifications让每次处理结果可落库(成功/跳过/失败原因)。 - 幂等:对同一事件(同一
dedupe_key)写入消息时,使用稳定的message_id进行 upsert,避免重复生成消息。
在整体链路中的位置:
- webhook-receiver 写入
platform_express_tracking_events - 事件被推入
notify_queue(由触发器/任务/上游逻辑实现) - notify-worker 消费
notify_queue→ 写入express_notifications - push-server consumer 轮询
express_notifications→ 调用CLOUD_FUNC_URL下发
2. 用什么做的(技术栈/依赖)
- 运行时:Node.js(建议 18+,以确保
fetch可用;旧版本会回退到node-fetch) - 通信方式:直接调用 Supabase PostgREST REST API(
$SUPA_URL/rest/v1/...) - 鉴权:默认仅发送
apikeyheader;支持 worker 专用开关NOTIFY_WORKER_SUPA_USE_BEARER=true(或WORKER_SUPA_USE_BEARER=true)来附加Authorization: Bearer ...(否则会回退到全局SUPA_USE_BEARER)。 - 核心依赖:
crypto:计算稳定的message_id(SHA-256)fetch/node-fetch:HTTP 调用 Supabase REST
它不是一个 HTTP Server(不监听端口),而是一个“循环轮询的后台进程”。
3. 输入/输出(读写哪些表)
3.1 输入:notify_queue
notify-worker 会拉取:
processed_at IS NULL的记录- 按
created_at ASC排序 - 每次最多
BATCH_SIZE条
并在处理完成后回写:
processed_at(处理时间)process_status:queued/skipped/failedlast_error(失败/跳过原因,截断保存)
说明:字段命名以当前脚本实现为准,你的表结构需要包含这些列。
3.2 查询依赖
为把“物流事件”映射到收件人(user/merchant),脚本会查询:
platform_express_waybills:取order_id/order_no, carrier, tracking_noml_orders:取user_id, merchant_id
3.3 输出:express_notifications
对每个收件人写入 1 条通知(同一事件会写 user 与 merchant 两条,若都存在):
aud:user或merchantrecipient_idorder_id / waybill_id / tracking_no / carriermessage_id:稳定哈希生成(aud|waybill_id|dedupe_key)dedupe_key:waybill_id|aud|<queue.dedupe_key>(截断到 256)event_text_safe:当前实现直接复用队列行的event_textstatus_code / event_time / payload
写入方式是 upsert:POST express_notifications?on_conflict=message_id,并使用 resolution=merge-duplicates。
注意:数据库侧必须有 普通 唯一索引/约束 express_notifications(message_id);不要用 WHERE message_id IS NOT NULL 的部分唯一索引,否则会触发 42P10。
若你已遇到该错误,执行修复脚本:pages/mall/delivery/doc/需求文档/20260310_fix_express_notifications_on_conflict_message_id.sql。
4. 配置与启动
4.1 配置加载优先级
脚本会复用 server/load-config.js,加载顺序(只填充未设置的 env):
- 系统环境变量
CONFIG_FILE/CONFIG_PATH指定的.env或.json- 同目录
server/notify-worker.config.json server/.env/server/config.json/server/config.json.example
4.2 关键环境变量
SUPA_URL(必需)SERVICE_ROLE_KEY或SUPA_KEY(必需,推荐 service_role)NOTIFY_WORKER_SUPA_USE_BEARER(可选,默认 false):仅对 notify-worker 生效,若为true则发送Authorization: Bearer <SUPA_KEY>SUPA_USE_BEARER(可选,默认 false):全局开关(会影响其它服务;一般不建议在自托管 JWT_SECRET 不一致时开启)
RLS 开启且 Bearer/JWT 不可用时的替代方案(慎用/止血):
NOTIFY_WORKER_USE_RLS_SAFE_RPC(可选,默认 false):若为true,则不再直查ml_orders,改为调用rpc/notify_get_order_recipients(SECURITY DEFINER)获取user_id/merchant_id。NOTIFY_WORKER_RPC_TOKEN(当上面为 true 时必需):notify-worker 调用 RPC 时通过请求头x-notify-worker-token传递;数据库函数会校验该 token。
配套 SQL:
pages/mall/delivery/doc/需求文档/db/绕过RLS的方案(慎用)/20260311_add_rpc_notify_get_order_recipients.sql
worker 行为:
NOTIFY_WORKER_POLL_MS(默认 2000)NOTIFY_WORKER_BATCH_SIZE(默认 20)RUN_ONCE=true(只跑一轮就退出,适合手动验证/CI)
4.3 启动命令
在仓库根目录:
# 常驻运行
node server/notify-worker.js
# 只跑一轮(用于验证)
$env:RUN_ONCE='true'
node server/notify-worker.js
如果你的配置放在 JSON:
$env:CONFIG_FILE=(Resolve-Path .\server\notify-worker.config.json)
node server/notify-worker.js
5. 管理者“一页摘要”(给非一线同学)
业务价值
- 让“物流事件入库”与“消息生成/推送”解耦,降低 webhook 链路复杂度和故障扩散。
- 通过队列表落库,可审计、可重放、可定位卡点。
边界
- notify-worker 只负责生成消息入队(
express_notifications),不负责实际推送下发。 - 推送通道成功率与重试投递由 push-server consumer + 云函数负责。
关键依赖
- Supabase/Postgres 表:
notify_queue、platform_express_waybills、ml_orders、express_notifications - 上游必须保证:有机制把新事件推入
notify_queue(触发器/任务/服务均可)
主要风险与控制点
- 幂等/重复消息:依赖
dedupe_key的稳定性;上游写队列时应构造稳定的 dedupe_key(不要用 Date.now 这种随机值)。 - 权限/密钥:需要
service_role读写多表,必须只放服务器环境变量。 - 数据合规:
event_text_safe应为清洗/脱敏后的文案;若上游传入的是原文,需在生成前做脱敏。
建议监控指标
notify_queue未处理数量(processed_at IS NULL)是否持续增长- 每分钟处理量、
process_status=failed比例与 Toplast_error - 从队列入库到消息入队的延迟(created_at → processed_at)
6. 进一步阅读
- push-server(投递消费者):
server/PUSH_SERVER_README.md - 总览(Webhook + worker + push):
docs/DELIVERY_WEBHOOK_PUSH_SERVER_OVERVIEW.md - webhook-receiver(事件入库):
pages/mall/delivery/webhook-server/README.md