消息推送

This commit is contained in:
not-like-juvenile
2026-03-10 16:39:50 +08:00
parent ee9fabd806
commit e67016a6f4
18 changed files with 1176 additions and 49 deletions

View File

@@ -0,0 +1,215 @@
# 物流 Webhook → 通知入库 → 推送到手机联调问题记录2026-03-10
本文记录本次将链路跑通时遇到的阻塞点、根因与修复方式,便于后续环境复现与排障。
目标链路:
1) webhook-receiver 接收物流回调并落库
2) DB/服务侧写入 `notify_queue`
3) `server/notify-worker.js` 消费 `notify_queue`,生成 `express_notifications`
4) `server/push-server.js` 的 consumer 轮询 `express_notifications`,调用 `CLOUD_FUNC_URL` 推送到设备 `cid`
---
## 1. 症状notify-worker 报 `order not found for waybill`
**现象**
- `notify_queue.process_status=skipped``failed`
- `last_error=order not found for waybill`
**根因**
- `public.ml_orders` 启用了 RLS。
- 后端通过 PostgREST 仅使用 `apikey`(或无法解码的 Bearer访问时`auth.uid()` 为空RLS policy 不成立,导致对 `ml_orders` 的查询返回 `200 []`(看起来像“无数据”,实为被行级安全过滤)。
**验证方法**
- 对比SQL Editor 里能查到订单,但经 `/rest/v1/ml_orders` 查不到。
**临时修复(用于快速跑通链路)**
```sql
ALTER TABLE public.ml_orders DISABLE ROW LEVEL SECURITY;
```
**恢复建议(生产化方向)**
- 不建议长期关闭 RLS。
- 建议让后端使用可被 PostgREST 正确解码/信任的 service_role JWT或通过 RPC/安全视图完成必要查询;避免依赖 `DISABLE RLS`
---
## 2. 症状:写入 `express_notifications` 时报 `42P10`
**现象**
- `notify_queue.process_status=failed`
- `last_error` 包含:
- `there is no unique or exclusion constraint matching the ON CONFLICT specification`
**根因**
- notify-worker / push-server 使用 PostgREST upsert
- `POST /rest/v1/express_notifications?on_conflict=message_id`
- 但数据库侧 `express_notifications(message_id)` 只有“部分唯一索引”(例如 `WHERE message_id IS NOT NULL`),无法匹配 `ON CONFLICT(message_id)`,触发 `42P10`
**修复(改为普通唯一索引)**
- 执行脚本:
- `pages/mall/delivery/doc/需求文档/20260310_fix_express_notifications_on_conflict_message_id.sql`
核心 SQL摘要
```sql
DROP INDEX IF EXISTS public.ux_express_notifications_message_id;
CREATE UNIQUE INDEX IF NOT EXISTS ux_express_notifications_message_id
ON public.express_notifications(message_id);
```
**修复后验证**
- `notify_queue` 最新记录不再失败;`express_notifications` 能正常生成/更新。
---
## 3. 症状push-server 启动失败 `EADDRINUSE 0.0.0.0:7301`
**现象**
- `node server/push-server.js` 直接退出
- 提示端口占用
**根因**
- 7301 端口已有 node 进程占用(重复启动或遗留进程)
**排查/处理Windows**
```powershell
netstat -ano | Select-String ":7301 "
Get-Process -Id <PID>
Stop-Process -Id <PID> -Force
```
---
## 4. 症状PGRST301JWT 无法解码)导致访问异常
**现象**
- PostgREST 返回 `PGRST301` 或“JWT cannot be decoded/verified”
**根因**
- 在自托管/网关配置不一致时,手动发送 `Authorization: Bearer <SUPA_KEY>` 可能触发 JWT 解码失败。
- 本仓库对 Supabase REST 调用默认只发 `apikey`,仅在显式 `SUPA_USE_BEARER=true` 时才附加 Bearer。
**处理建议**
- 保持 `SUPA_USE_BEARER=false`(默认)
- 使用可用的 `apikey/service_role key` 走 Kong key-auth
相关实现:
- `server/push-server.js``supaFetch` 逻辑
---
## 5. 症状:数据库显示 `send_status=success`,但手机没弹通知
**现象**
- `express_notifications.send_status=success`
- 但手机端没有系统通知/横幅
**常见原因**
1) 推送内容为空(部分通道/客户端不会展示空内容通知)
2) 客户端处于前台/透传模式:消息到达但不自动弹系统通知(需要客户端本地展示)
3) 手机系统通知权限/渠道被关闭、省电策略限制
**本次定位与修复**
- notify-worker 生成的 `express_notifications` 通常只有 `event_text_safe`可作为标题body/content 可能为空。
- push-server consumer 调云函数时原先传入的 `content` 可能为空,导致“云函数返回成功,但不展示”。
- 已在 `server/push-server.js` 增加兜底:当 body/content 为空时,使用 title/event_text_safe 作为 content。
验证方法(直接调用云函数对 CID 推送)
- 观察云函数返回中是否包含类似 `successed_online`;并对比手机是否实际展示。
---
## 6. 症状merchant 侧 `send_status=no-targets`
**现象**
- `express_notifications.aud=merchant` 的记录 `send_status=no-targets`
- `last_error=no active devices`
**根因**
- 设备绑定表 `push_devices` 仅存在 `user_id` 绑定,没有 `merchant_id` 绑定。
**处理建议**
- 让商家端也调用注册接口写入 `merchant_id` 维度的设备(或插入对应记录)。
- push-server consumer 对 merchant 的设备查询路径是:
- `push_devices?merchant_id=eq.<recipient_id>&is_active=eq.true`
---
## 7. 端到端自检清单(最短路径)
1) 触发 webhook 测试:
- `node pages/mall/delivery/webhook-server/test-send.js`
2) 查队列:
- `notify_queue` 最新应为 `process_status=queued``last_error=null`
3) 查通知:
- `express_notifications` 最新应生成 `aud=user``aud=merchant` 两条
- `aud=user` 通常应推进到 `send_status=success`
4) 查设备:
- `push_devices` 中对应 `user_id`/`merchant_id` 必须存在 `is_active=true``cid`
---
## 7.1 Windows 一键启动三服务(建议)
为了保证与联调时一致的效果,推荐用脚本一次性启动 3 个后台进程(并落日志):
1) 启动(会默认释放 7201/7301 端口占用):
- `powershell -ExecutionPolicy Bypass -File .\server\scripts\start-delivery-backend.ps1`
2) 触发与联调一致的 webhook 测试(打到本机 7201
- `node .\pages\mall\delivery\webhook-server\test-send.js`
3) 停止:
- `powershell -ExecutionPolicy Bypass -File .\server\scripts\stop-delivery-backend.ps1`
日志文件默认在仓库根目录:
- `webhook-receiver.log/.err.log`
- `notify-worker.log/.err.log`
- `push-server.log/.err.log`
PID 文件(用于停止/排查进程)在:
- `server/.runtime/delivery-backend.pids.json`
查看当前是否仍在运行(示例):
- `Get-Content .\server\.runtime\delivery-backend.pids.json`
- `Get-Process -Id <PID>`
注意:`test-send.js` 默认请求 `http://localhost:7201/webhook/express/status`,所以必须先启动 webhook-receiver。
如果 `test-send.js` 返回 `{ ok: false, message: 'waybill not found' }`
- 说明数据库里没有 `platform_express_waybills.tracking_no = 'TEST_YT_20260206_0007'` 的运单记录。
- 处理方式:先在 `platform_express_waybills` 补一条对应 tracking_no 的运单(并关联到你的 `ml_orders`),再重试发送。
---
## 8. 常用 SQL / 操作片段
### 8.1 重跑某条队列记录(把 failed/skipped 重置为可再次消费)
```sql
UPDATE public.notify_queue
SET processed_at = NULL,
process_status = NULL,
last_error = NULL
WHERE id = '<notify_queue.id>';
```
### 8.2 检查 message_id 是否存在重复(创建唯一索引前)
```sql
SELECT message_id, COUNT(*)
FROM public.express_notifications
WHERE message_id IS NOT NULL
GROUP BY message_id
HAVING COUNT(*) > 1;
```
### 8.3 查看 `push_devices` 是否有绑定
```sql
SELECT *
FROM public.push_devices
WHERE user_id = '<user_id>' OR merchant_id = '<merchant_id>'
ORDER BY updated_at DESC
LIMIT 20;
```

View File

@@ -53,9 +53,13 @@
- `platform_express_waybills`(运单摘要 current_status_*
2) **消息生成段**(通常是“事件处理器/任务/触发器”,不在这两个服务里)
- 监听/轮询 `platform_express_tracking_events`新增事件
- 按推送策略(关键状态、去噪、幂等)生成“消息中心记录/推送任务”
- 写入 `express_notifications`或调用 push-server 的 HTTP 接口让其写入
- 触发器把 `platform_express_tracking_events`关键状态事件写入队列表 `notify_queue`
- 常驻 `notify-worker` 消费 `notify_queue`按推送策略(关键状态、去噪、幂等)生成“消息中心记录/推送任务”
- 写入 `express_notifications`push-server consumer 会复用该表进行下发
相关实现(仓库内):
- 迁移脚本:`pages/mall/delivery/doc/需求文档/20260309_add_notify_queue_and_trigger.sql`
- 常驻 worker`server/notify-worker.js`
3) **推送下发段**Push Server + 云函数负责)
- push-server consumer 读取 `express_notifications` 的待处理记录
@@ -72,8 +76,10 @@ flowchart LR
WH --> EV[(platform_express_tracking_events)]
WH --> WB[(platform_express_waybills)]
EV --> EP[事件处理器/任务/触发器]
EP --> N[(express_notifications)]
EV --> TRG[DB Trigger]
TRG --> Q[(notify_queue)]
Q --> W[notify-worker]
W --> N[(express_notifications)]
App[App 客户端] -->|注册CID| PS[push-server :7301]
PS --> D[(push_devices)]
@@ -177,6 +183,14 @@ flowchart LR
- `public.express_notifications`
- 存储待发通知、状态、重试信息等(用于消息中心/推送队列)
字段语义(重要,避免联调误判):
- `status_code`:物流/业务状态(例如 `OUT_FOR_DELIVERY/DELIVERED/EXCEPTION`),由 webhook→tracking_events→notify-worker 生成。
- `send_status`:投递处理状态(`null`=待发送,`processing/retrying/success/failed/no-targets`),由 push-server consumer 读写。
迁移脚本:
- 新环节(入队+worker`pages/mall/delivery/doc/需求文档/20260309_add_notify_queue_and_trigger.sql`
- 字段拆分consumer 必需):`pages/mall/delivery/doc/需求文档/20260309_add_express_notifications_send_status.sql`
迁移脚本参考:`pages/mall/delivery/doc/需求文档/20260224_add_push_devices_and_notifications.sql`
### 4.4 关键配置env
@@ -191,6 +205,38 @@ flowchart LR
- `PUSH_TOKEN`:可选鉴权透传给云函数
- 重试配置:`MAX_RETRIES / RETRY_INITIAL_MS / RETRY_FACTOR / RETRY_MAX_MS`
### 4.4.1 云函数过期/URL 变更时,改哪里?
现象云函数链接过期或更换后push-server 下发会出现 HTTP 4xx/5xx 或网络错误(日志里通常能看到调用 `CLOUD_FUNC_URL` 失败),并导致 `express_notifications.send_status` 长期处于 `retrying/failed`
需要修改的配置键:
- `CLOUD_FUNC_URL`:云函数 HTTP invoke 地址push-server consumer 与 `/api/v1/push/send` 都依赖它)
- (如云函数鉴权也变化)`PUSH_TOKEN`
配置可能存在于以下位置(按优先级理解即可):
1) **系统环境变量**(临时验证/单次启动最方便)
2) **显式配置文件**:通过 `CONFIG_FILE/CONFIG_PATH` 指定的 `.json``.env`
3) **默认配置文件**`server/config.json``server/load-config.js` 会读取并注入 env
最常用改法(推荐):修改 `server/config.json` 里的 `CLOUD_FUNC_URL`,然后重启 push-server。
PowerShell 快速验证(不改文件,先确认新 URL 可用):
```powershell
$env:CLOUD_FUNC_URL='https://new-cloudfunc.example/invoke'
node server/push-server.js
```
改完必须做的动作:
- **重启 push-server**`CLOUD_FUNC_URL` 在进程启动时读取,改配置后不重启不会生效。
CI/自动化(如果你用了 smoke test
- GitHub Actions / CI 里若有云函数 smoke test通常还需要同步更新 Secrets例如 `CLOUD_FUNC_URL`)。
最小 smoke test确认云函数通不通
```powershell
curl.exe -sS -X POST "$env:CLOUD_FUNC_URL" -H "Content-Type: application/json" -d "{\"token\":\"$env:PUSH_TOKEN\",\"push_clientid\":\"<DEVICE_CID>\",\"title\":\"ping\",\"content\":\"pong\",\"payload\":{}}"
```
### 4.5 常见问题(定位方向)
- Supabase 报 `PGRST301` / 401通常是 Bearer/JWT_SECRET 不匹配导致,优先只用 `apikey``SUPA_USE_BEARER=false`)。
- `/api/v1/push/send` 返回 `push_clientid required`:云函数侧没有正确解析请求体(常见于 HTTP 触发器把 body 放在 `event.body`)。
@@ -199,6 +245,65 @@ flowchart LR
---
## 4.6 notify-worker消息生成入队
### 4.6.1 作用(它负责哪一段)
**一句话**:从 `notify_queue` 消费“需要生成消息的事件”,把它转换成 `express_notifications`(消息中心记录/推送任务),供 push-server consumer 后续下发。
它关注的是:
- **把事件变成消息**:把“物流事件”落成“可按 user/merchant 查询的消息记录”。
- **收件人映射**:根据运单 → 订单,解析出 `user_id` / `merchant_id`
- **幂等入队**:对同一事件避免重复生成(靠稳定的 `message_id` upsert
> 边界notify-worker **不负责推送下发**;真正调用 `CLOUD_FUNC_URL` 的是 push-server consumer。
### 4.6.2 位置与入口
- 代码:`server/notify-worker.js`
- 说明文档(更详细):`server/NOTIFY_WORKER_README.md`
### 4.6.3 核心行为(做了什么)
1) 拉取待处理队列:`notify_queue?processed_at=is.null`(按 `created_at` 升序,批量处理)。
2) 解析业务上下文:
-`platform_express_waybills``order_id/order_no, tracking_no, carrier`
-`ml_orders``user_id, merchant_id`
3) 为每个受众写消息:
-`user_id` 存在:写 `aud='user'`
-`merchant_id` 存在:写 `aud='merchant'`
- 写入方式upsert 到 `express_notifications``on_conflict=message_id`),避免重复。
4) 回写队列处理结果:把 `notify_queue.processed_at` 置上,并写 `process_status/last_error`queued/skipped/failed
### 4.6.4 数据依赖(读/写哪些表)
- 读取:`notify_queue``platform_express_waybills``ml_orders`
- 写入:`express_notifications`
关键字段口径:
- `notify_queue.dedupe_key`:应尽量稳定(同一事件重复到达也一致),否则会造成重复消息。
- `express_notifications.message_id`:脚本按 `aud|waybill_id|dedupe_key` 计算 hash 生成,用于幂等。
### 4.6.5 配置与运行(最小要点)
配置加载与 webhook/push-server 一致(复用 `server/load-config.js`),并支持同目录 `server/notify-worker.config.json`
常用环境变量:
- `SUPA_URL`(必需)
- `SERVICE_ROLE_KEY``SUPA_KEY`(必需,推荐 service_role
- `SUPA_USE_BEARER`(可选,默认 false
- `NOTIFY_WORKER_POLL_MS`(默认 2000
- `NOTIFY_WORKER_BATCH_SIZE`(默认 20
- `RUN_ONCE=true`(只跑一轮便退出,适合验证)
启动示例PowerShell
```powershell
node server/notify-worker.js
```
### 4.6.6 常见问题(定位方向)
- 队列一直堆积:看 `notify-worker.err.log`/控制台错误;重点检查 `SUPA_URL/SERVICE_ROLE_KEY` 是否正确、表/字段是否存在。
- 都是 skipped通常是运单找不到订单`platform_express_waybills``order_id/order_no``ml_orders` 查不到)。
- 重复消息:检查上游写入 `notify_queue.dedupe_key` 是否稳定(避免 `Date.now()` 这种随机值)。
- RLS/403优先使用 `SERVICE_ROLE_KEY`,并确保 PostgREST 有权限访问相关表。
---
## 5. 给联调/运维的“最小心智模型”
- Webhook-receiver 只负责:**接收** & **入库**`platform_express_*`)。
@@ -259,10 +364,72 @@ flowchart LR
---
## 5.2 联调/运维速记2026-03-10
下面是本仓库在“webhook → 入库 → 入队 → notify-worker 生成通知 → push-server consumer 调云函数推送到手机”端到端跑通时,最容易踩到的坑与对应结论(只写要点;细节见排障文档)。
更详细的排障与 SQL 清单:`docs/DELIVERY_E2E_TROUBLESHOOTING_20260310.md`
### 5.2.1 最常见的故障与根因(按出现频率)
1) **notify-worker 报 “order not found for waybill” 但数据库明明有订单**
- 典型根因:`ml_orders` 开了 RLSPostgREST 用 `apikey` 查询会返回空数组(看起来像“没数据”)。
- 处理建议:生产上用 `SERVICE_ROLE_KEY` 或服务端 JWT + 正确策略;联调阶段可以临时放开/调整策略以确认链路。
2) **写 `express_notifications` upsert 报 `42P10`ON CONFLICT 找不到唯一约束)**
- 典型根因表上是“部分唯一索引partial unique index而 PostgREST 的 `on_conflict=message_id` 只能匹配“普通 UNIQUE 约束/索引”。
- 处理建议:确保存在普通唯一索引:`UNIQUE(message_id)`(不要带 WHERE 条件)。
3) **push-server 启动失败:`EADDRINUSE` / 7301 端口被占用**
- 典型根因:旧进程未退出、或端口被其它程序占用。
- 处理建议:先释放端口再启动;一键启动脚本会自动尝试清理 7201/7301。
4) **日志显示云函数调用 success但手机不弹通知**
- 典型根因:下发的 `content/body` 为空或过短,在某些系统/机型上会被折叠为“不展示”。
- 处理建议:确保推送请求里至少有稳定的 `title` + `content`(本仓库已在 consumer 侧做了 content 兜底)。
5) **`send_status=no-targets`(找不到目标设备)**
- 典型根因:`push_devices` 里没有对应 `user_id/merchant_id` 的活跃 `cid`(未注册/已注销/绑定字段为空)。
- 处理建议:先用 App 触发注册接口写入设备表;若要商家端收推送,必须正确绑定 `merchant_id`
### 5.2.2 Windows 下一键启动/停止(后台常驻)
为方便“可运维 + 可自启动”,仓库提供了 PowerShell 脚本把 3 个常驻进程统一拉起/停掉:
- 启动(会尽量释放 7201/7301并把进程 PID 写入 runtime 文件):
```powershell
powershell -ExecutionPolicy Bypass -File .\server\scripts\start-delivery-backend.ps1
```
- 停止(按 PID 文件停止进程):
```powershell
powershell -ExecutionPolicy Bypass -File .\server\scripts\stop-delivery-backend.ps1
```
- PID 文件位置(便于运维查进程/做自启接入):
- `server/.runtime/delivery-backend.pids.json`
> 说明:脚本按自身路径定位仓库 root因此可以在任意工作目录执行并会按 `server/load-config.js` 的规则加载配置(最常见是 `server/config.json`)。`CLOUD_FUNC_URL` 等配置变更后需重启进程才会生效。
### 5.2.3 快速复现(和联调一致的 E2E
1) 先启动后台(见上节)。
2) 触发 webhook 测试请求:
```powershell
node .\pages\mall\delivery\webhook-server\test-send.js
```
3) 观察链路:
- webhook-receiver 日志是否收到请求;若返回 `{ ok:false, message:'waybill not found' }`,先补齐 `platform_express_waybills` 对应运单。
- notify-worker 是否把队列变成 `express_notifications`
- push-server consumer 是否把 `send_status` 推进到 `success`(或 `no-targets/retrying/failed`)。
---
## 6. 进一步阅读(从“总览”到“可落地”)
- webhook-receiver`pages/mall/delivery/webhook-server/README.md`
- push-server运行与变更记录`server/PUSH_SERVER_README.md`
- notify-worker消息生成入队`server/NOTIFY_WORKER_README.md`
- push-server消费者与云函数约定`server/README.md`
- 配置加载器:`server/load-config.js`
- 业务方案与隐私口径:

View File

@@ -144,8 +144,12 @@ CREATE INDEX IF NOT EXISTS idx_express_notifications_created_at ON public.expres
CREATE INDEX IF NOT EXISTS idx_express_notifications_read_at ON public.express_notifications(read_at);
-- 若使用 message_id 做幂等(外部系统/队列),则建立唯一索引
-- 注意这里不能使用“部分唯一索引WHERE message_id IS NOT NULL
-- 否则 PostgREST 的 upsert `?on_conflict=message_id` 会触发 42P10
-- "there is no unique or exclusion constraint matching the ON CONFLICT specification"
-- 普通 UNIQUE INDEX 仍允许多个 NULL符合历史兼容
CREATE UNIQUE INDEX IF NOT EXISTS ux_express_notifications_message_id
ON public.express_notifications(message_id) WHERE message_id IS NOT NULL;
ON public.express_notifications(message_id);
-- =====================================================
-- C. 兼容性与外键(若目标表存在则添加外键约束;若不存在则保留字段)

View File

@@ -0,0 +1,26 @@
-- =====================================================================================
-- Add send_status to express_notifications
--
-- 目的区分“物流状态status_code”与“投递处理状态send_status”。
-- - status_code业务/物流状态SHIPPED/OUT_FOR_DELIVERY/...
-- - send_status投递状态null=待发送, processing, retrying, success, failed, no-targets
--
-- 创建日期2026-03-09
-- =====================================================================================
BEGIN;
ALTER TABLE public.express_notifications
ADD COLUMN IF NOT EXISTS send_status VARCHAR(32) NULL;
CREATE INDEX IF NOT EXISTS idx_express_notifications_send_status
ON public.express_notifications(send_status);
-- 兼容旧实现:历史上 push-server consumer 使用 status_code 存投递状态。
-- 迁移后 consumer 改读写 send_status为避免把旧的 success/failed 等记录当成 pending 再次推送,做一次安全回填。
UPDATE public.express_notifications
SET send_status = status_code
WHERE send_status IS NULL
AND status_code IN ('processing', 'retrying', 'success', 'failed', 'no-targets');
COMMIT;

View File

@@ -0,0 +1,102 @@
-- =====================================================================================
-- notify_queue + trigger: platform_express_tracking_events -> notify_queue
--
-- 目的:把“轨迹事件入库”和“消息生成/推送”解耦。
-- - Webhook/轮询/手工写入 tracking_events 后,由触发器把关键事件入队到 notify_queue。
-- - 常驻 worker 消费 notify_queue生成 express_notifications消息中心/推送任务)。
-- - push-server consumer 轮询 express_notifications 并调用 CLOUD_FUNC_URL 进行实际下发。
--
-- 创建日期2026-03-09
-- =====================================================================================
BEGIN;
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
-- 队列表:仅做“轻量入队”,避免触发器做外部 IO
CREATE TABLE IF NOT EXISTS public.notify_queue (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
waybill_id UUID NOT NULL,
carrier VARCHAR(32) NULL,
tracking_no VARCHAR(64) NULL,
event_id VARCHAR(128) NULL,
status_code VARCHAR(32) NOT NULL,
event_time TIMESTAMP WITH TIME ZONE NOT NULL,
event_text TEXT NULL,
source VARCHAR(16) NULL,
-- 与 platform_express_tracking_events 对齐的幂等键
dedupe_key VARCHAR(256) NOT NULL,
raw_payload JSONB NULL,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
processed_at TIMESTAMP WITH TIME ZONE NULL,
process_status VARCHAR(32) NULL,
last_error TEXT NULL,
CONSTRAINT uk_notify_queue_dedupe UNIQUE (waybill_id, dedupe_key)
);
CREATE INDEX IF NOT EXISTS idx_notify_queue_processed_at ON public.notify_queue(processed_at);
CREATE INDEX IF NOT EXISTS idx_notify_queue_created_at ON public.notify_queue(created_at);
CREATE INDEX IF NOT EXISTS idx_notify_queue_status_code ON public.notify_queue(status_code);
-- 触发器函数:入队关键状态事件
CREATE OR REPLACE FUNCTION public.notify_new_tracking_event()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $func$
DECLARE
should_enqueue BOOLEAN := FALSE;
BEGIN
-- 推送策略MVP只对关键状态入队
IF NEW.status_code IN ('SHIPPED','OUT_FOR_DELIVERY','READY_FOR_PICKUP','DELIVERED','EXCEPTION','RETURNED') THEN
should_enqueue := TRUE;
END IF;
IF should_enqueue THEN
INSERT INTO public.notify_queue(
waybill_id,
carrier,
tracking_no,
event_id,
status_code,
event_time,
event_text,
source,
dedupe_key,
raw_payload
) VALUES (
NEW.waybill_id,
NEW.carrier,
NEW.tracking_no,
NEW.event_id,
NEW.status_code,
NEW.event_time,
NEW.event_text,
NEW.source,
NEW.dedupe_key,
NEW.raw_payload
)
ON CONFLICT (waybill_id, dedupe_key) DO NOTHING;
END IF;
RETURN NEW;
END;
$func$;
-- 触发器tracking_events 写入后入队
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_trigger WHERE tgname = 'trigger_notify_new_tracking_event'
) THEN
CREATE TRIGGER trigger_notify_new_tracking_event
AFTER INSERT ON public.platform_express_tracking_events
FOR EACH ROW EXECUTE FUNCTION public.notify_new_tracking_event();
END IF;
END $$;
COMMIT;

View File

@@ -0,0 +1,30 @@
-- =====================================================================================
-- Fix express_notifications upsert conflict target (message_id)
-- 目的:修复 notify-worker / push-server 在 upsert 时遇到的 42P10
-- "there is no unique or exclusion constraint matching the ON CONFLICT specification"
-- 原因历史迁移可能创建了部分唯一索引WHERE message_id IS NOT NULL
-- PostgREST 的 upsert `?on_conflict=message_id` 无法匹配该索引。
-- 方案:改为普通 UNIQUE INDEX (message_id)。Postgres UNIQUE 允许多条 NULL兼容旧数据。
-- 创建日期2026-03-10
-- =====================================================================================
BEGIN;
-- 1) 预检查:是否存在重复的非空 message_id若存在创建唯一索引会失败
-- 如有返回结果,请先人工去重后再继续执行后续语句。
-- 示例去重策略:保留最新 created_at其它行将 message_id 置为 NULL 或删除重复行。
--
-- SELECT message_id, COUNT(*)
-- FROM public.express_notifications
-- WHERE message_id IS NOT NULL
-- GROUP BY message_id
-- HAVING COUNT(*) > 1;
-- 2) 删除历史“部分唯一索引”(如果存在)
DROP INDEX IF EXISTS public.ux_express_notifications_message_id;
-- 3) 创建普通唯一索引,让 `on_conflict=message_id` 正常工作
CREATE UNIQUE INDEX IF NOT EXISTS ux_express_notifications_message_id
ON public.express_notifications(message_id);
COMMIT;

View File

@@ -10,7 +10,8 @@
- `SUPA_USE_BEARER`(可选):是否附加 `Authorization: Bearer <SUPA_KEY>`,默认 `false`
- 在一些自托管 Supabase/Kongkey-auth环境中**只需要** `apikey`;如果误加 Bearer 且 key 不是 JWT可能出现 `PGRST301`"None of the keys was able to decode the JWT")。
- `WEBHOOK_SECRET`(可选):与第三方共享的 HMAC-SHA256 secret用于校验 `X-Signature`(签名为 hex
- `PORT`(可选):接收器监听端口,默认 `7201`
- `WEBHOOK_PORT`(可选):接收器监听端口,默认 `7201`(推荐用这个,便于与 push-server 共享同一份 `server/config.json`
- `PORT`(可选):接收器监听端口(兼容旧用法;若共享 `server/config.json` 且其中 `PORT=7301`,会导致端口冲突)
配置方式(推荐用配置文件,避免与其他服务端口冲突):
- **同目录配置文件(推荐)**:在 `webhook-receiver.js` 同目录放置 `webhook.config.json`,启动时会自动读取。

View File

@@ -26,9 +26,11 @@ const fetch = (globalThis.fetch ? globalThis.fetch.bind(globalThis) : (() => {
})())
const crypto = require('crypto')
const PORT = process.env.PORT || 7201
// 支持服务专用端口,避免与 push-server 共用 server/config.json 时发生端口冲突
const PORT = process.env.WEBHOOK_PORT || process.env.PORT || 7201
const SUPA_URL = (process.env.SUPA_URL || process.env.SUPA_URL_OVERRIDE || '').replace(/\/$/, '')
const SUPA_KEY = process.env.SUPA_KEY || process.env.SERVICE_ROLE_KEY || ''
// Prefer service role key when present (server-side), to avoid RLS issues.
const SUPA_KEY = process.env.SERVICE_ROLE_KEY || process.env.SUPA_KEY || ''
const WEBHOOK_SECRET = process.env.WEBHOOK_SECRET || '' // optional HMAC secret
function supaFetch(path, opts = {}) {
@@ -51,6 +53,21 @@ function computeSignature(bodyText, ts) {
return h.digest('hex')
}
function stableEventDedupeKey({ tracking_no, carrier, status_code, event_time, event_code, event_text }) {
// 目的:让 webhook 重复回调(同一事件)不会重复入库,从而避免重复入队/重复通知。
// 注意:优先使用稳定字段组合;不要使用 Date.now() 这类易变字段。
const base = JSON.stringify({
tracking_no: tracking_no || null,
carrier: carrier || null,
status_code: status_code || null,
event_time: event_time || null,
event_code: event_code || null,
event_text: (event_text || '').trim().slice(0, 200)
})
const hex = crypto.createHash('sha256').update(base).digest('hex')
return 'WH_' + hex.slice(0, 32)
}
async function upsertRaw(payload, tracking_no, carrier, signature_valid) {
try {
const body = {
@@ -72,10 +89,10 @@ async function upsertRaw(payload, tracking_no, carrier, signature_valid) {
}
}
async function findWaybillId(tracking_no, order_no) {
async function findWaybill(tracking_no, order_no) {
try {
if (tracking_no) {
const r = await supaFetch(`platform_express_waybills?tracking_no=eq.${encodeURIComponent(tracking_no)}`)
const r = await supaFetch(`platform_express_waybills?tracking_no=eq.${encodeURIComponent(tracking_no)}&select=id,carrier,tracking_no,order_no,order_id`)
if (!r.ok) {
const txt = await r.text().catch(() => '')
const err = new Error(`Supabase query failed (tracking_no): HTTP ${r.status} ${txt}`)
@@ -83,10 +100,10 @@ async function findWaybillId(tracking_no, order_no) {
throw err
}
const data = await r.json()
if (data && data.length > 0) return data[0].id
if (data && data.length > 0) return data[0]
}
if (order_no) {
const r2 = await supaFetch(`platform_express_waybills?order_no=eq.${encodeURIComponent(order_no)}`)
const r2 = await supaFetch(`platform_express_waybills?order_no=eq.${encodeURIComponent(order_no)}&select=id,carrier,tracking_no,order_no,order_id`)
if (!r2.ok) {
const txt2 = await r2.text().catch(() => '')
const err2 = new Error(`Supabase query failed (order_no): HTTP ${r2.status} ${txt2}`)
@@ -94,11 +111,11 @@ async function findWaybillId(tracking_no, order_no) {
throw err2
}
const data2 = await r2.json()
if (data2 && data2.length > 0) return data2[0].id
if (data2 && data2.length > 0) return data2[0]
}
return null
} catch (e) {
console.warn('findWaybillId error', e && e.message ? e.message : e)
console.warn('findWaybill error', e && e.message ? e.message : e)
throw e
}
}
@@ -135,12 +152,23 @@ async function updateWaybill(id, status_code, text) {
async function insertEvent(event) {
try {
await supaFetch('platform_express_tracking_events', {
// 幂等插入:基于 (waybill_id, dedupe_key) 的唯一约束忽略重复。
const resp = await supaFetch('platform_express_tracking_events?on_conflict=waybill_id,dedupe_key', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
headers: { 'Content-Type': 'application/json', Prefer: 'resolution=ignore-duplicates' },
body: JSON.stringify(event)
})
} catch (e) { console.warn('insertEvent error', e) }
if (!resp.ok) {
const txt = await resp.text().catch(() => '')
console.warn('insertEvent failed:', `HTTP ${resp.status}`, txt)
return { ok: false, status: resp.status, body: txt }
}
return { ok: true }
} catch (e) {
console.warn('insertEvent error', e)
return { ok: false, status: 0, body: (e && e.message) ? e.message : String(e) }
}
}
async function start() {
@@ -164,19 +192,24 @@ async function start() {
sigValid = calc === String(sig)
}
// persist raw
await upsertRaw(req.body || {}, req.body && (req.body.mailNo || req.body.tracking_no), req.body && req.body.carrier, sigValid)
// persist raw (best-effort)
await upsertRaw(
req.body || {},
req.body && (req.body.mailNo || req.body.tracking_no),
req.body && (req.body.carrier || req.body.company),
sigValid
)
// find waybill
const tracking_no = req.body && (req.body.mailNo || req.body.tracking_no)
const order_no = req.body && (req.body.txLogisticId || req.body.order_no)
const carrier = req.body && req.body.carrier ? req.body.carrier : (req.body && req.body.company || null)
const carrierIn = req.body && (req.body.carrier || req.body.company || null)
const event_code = req.body && (req.body.infoContent || req.body.status_code || req.body.event_code)
const event_text = req.body && (req.body.remark || req.body.event_text || '')
let waybillId = null
let waybill = null
try {
waybillId = await findWaybillId(tracking_no, order_no)
waybill = await findWaybill(tracking_no, order_no)
} catch (e) {
const status = e && e.status ? Number(e.status) : 0
if (status === 401 || status === 403) {
@@ -184,11 +217,14 @@ async function start() {
}
return res.status(502).json({ ok: false, message: 'supabase query failed' })
}
if (!waybillId) {
if (!waybill || !waybill.id) {
// Waybill not found — respond 200 but inform caller in body.
return res.status(200).json({ ok: false, message: 'waybill not found' })
}
const waybillId = waybill.id
const carrier = carrierIn || waybill.carrier || null
const status_code = mapStatus(event_code)
// update waybill
@@ -205,18 +241,26 @@ async function start() {
}
// insert event
const received_at = new Date().toISOString()
const dedupe_key = stableEventDedupeKey({ tracking_no, carrier, status_code, event_time, event_code, event_text })
const eventPayload = {
waybill_id: waybillId,
carrier: carrier,
tracking_no: tracking_no || null,
tracking_no: tracking_no || waybill.tracking_no || null,
received_at,
source: 'webhook',
event_id: dedupe_key,
event_time: event_time,
event_code: event_code || 'UNKNOWN',
event_text: event_text || '',
status_code: status_code,
raw_payload: req.body || {},
dedupe_key: 'WEBHOOK_' + Date.now()
dedupe_key
}
const ins = await insertEvent(eventPayload)
if (!ins || ins.ok !== true) {
return res.status(200).json({ ok: false, message: 'insert tracking_event failed (see webhook-receiver logs)' })
}
await insertEvent(eventPayload)
return res.json({ ok: true })
})

View File

@@ -0,0 +1,5 @@
{
"push-server": 27380,
"webhook-receiver": 13140,
"notify-worker": 37804
}

View File

@@ -0,0 +1,144 @@
# notify-worker物流事件 -> 消息入队)说明
面向读者:后端/运维/联调同学。
一句话:`notify-worker` 是常驻 worker用于从 `notify_queue` 消费“新物流事件通知”并生成upsert`express_notifications`,让后续的 `push-server` consumer 去实际下发推送。
---
## 1. 它解决什么问题?
- **解耦**Webhook 入库只保证事实表(`platform_express_*`)一致;消息生成用 worker 异步做,避免在 webhook 请求里做过多业务计算。
- **可追溯/可补偿**:队列表 `notify_queue` 和消息表 `express_notifications` 让每次处理结果可落库(成功/跳过/失败原因)。
- **幂等**:对同一事件(同一 `dedupe_key`)写入消息时,使用稳定的 `message_id` 进行 upsert避免重复生成消息。
在整体链路中的位置:
1) webhook-receiver 写入 `platform_express_tracking_events`
2) 事件被推入 `notify_queue`(由触发器/任务/上游逻辑实现)
3) **notify-worker 消费 `notify_queue` → 写入 `express_notifications`**
4) push-server consumer 轮询 `express_notifications` → 调用 `CLOUD_FUNC_URL` 下发
---
## 2. 用什么做的(技术栈/依赖)
- **运行时**Node.js建议 18+,以确保 `fetch` 可用;旧版本会回退到 `node-fetch`
- **通信方式**:直接调用 Supabase PostgREST REST API`$SUPA_URL/rest/v1/...`
- **鉴权**:默认仅发送 `apikey` header仅当 `SUPA_USE_BEARER=true` 才附加 `Authorization: Bearer ...`
- **核心依赖**
- `crypto`:计算稳定的 `message_id`SHA-256
- `fetch` / `node-fetch`HTTP 调用 Supabase REST
它不是一个 HTTP Server不监听端口而是一个“循环轮询的后台进程”。
---
## 3. 输入/输出(读写哪些表)
### 3.1 输入:`notify_queue`
`notify-worker` 会拉取:
- `processed_at IS NULL` 的记录
-`created_at ASC` 排序
- 每次最多 `BATCH_SIZE`
并在处理完成后回写:
- `processed_at`(处理时间)
- `process_status``queued` / `skipped` / `failed`
- `last_error`(失败/跳过原因,截断保存)
> 说明:字段命名以当前脚本实现为准,你的表结构需要包含这些列。
### 3.2 查询依赖
为把“物流事件”映射到收件人user/merchant脚本会查询
- `platform_express_waybills`:取 `order_id/order_no, carrier, tracking_no`
- `ml_orders`:取 `user_id, merchant_id`
### 3.3 输出:`express_notifications`
对每个收件人写入 1 条通知(同一事件会写 user 与 merchant 两条,若都存在):
- `aud``user``merchant`
- `recipient_id`
- `order_id / waybill_id / tracking_no / carrier`
- `message_id`:稳定哈希生成(`aud|waybill_id|dedupe_key`
- `dedupe_key``waybill_id|aud|<queue.dedupe_key>`(截断到 256
- `event_text_safe`:当前实现直接复用队列行的 `event_text`
- `status_code / event_time / payload`
写入方式是 **upsert**`POST express_notifications?on_conflict=message_id`,并使用 `resolution=merge-duplicates`
注意:数据库侧必须有 **普通** 唯一索引/约束 `express_notifications(message_id)`;不要用 `WHERE message_id IS NOT NULL` 的部分唯一索引,否则会触发 `42P10`
若你已遇到该错误,执行修复脚本:`pages/mall/delivery/doc/需求文档/20260310_fix_express_notifications_on_conflict_message_id.sql`
---
## 4. 配置与启动
### 4.1 配置加载优先级
脚本会复用 `server/load-config.js`,加载顺序(只填充未设置的 env
1) 系统环境变量
2) `CONFIG_FILE` / `CONFIG_PATH` 指定的 `.env``.json`
3) 同目录 `server/notify-worker.config.json`
4) `server/.env` / `server/config.json` / `server/config.json.example`
### 4.2 关键环境变量
- `SUPA_URL`(必需)
- `SERVICE_ROLE_KEY``SUPA_KEY`(必需,推荐 service_role
- `SUPA_USE_BEARER`(可选,默认 false
worker 行为:
- `NOTIFY_WORKER_POLL_MS`(默认 2000
- `NOTIFY_WORKER_BATCH_SIZE`(默认 20
- `RUN_ONCE=true`(只跑一轮就退出,适合手动验证/CI
### 4.3 启动命令
在仓库根目录:
```powershell
# 常驻运行
node server/notify-worker.js
# 只跑一轮(用于验证)
$env:RUN_ONCE='true'
node server/notify-worker.js
```
如果你的配置放在 JSON
```powershell
$env:CONFIG_FILE=(Resolve-Path .\server\notify-worker.config.json)
node server/notify-worker.js
```
---
## 5. 管理者“一页摘要”(给非一线同学)
### 业务价值
- 让“物流事件入库”与“消息生成/推送”解耦,降低 webhook 链路复杂度和故障扩散。
- 通过队列表落库,可审计、可重放、可定位卡点。
### 边界
- notify-worker **只负责生成消息入队**`express_notifications`),不负责实际推送下发。
- 推送通道成功率与重试投递由 push-server consumer + 云函数负责。
### 关键依赖
- Supabase/Postgres 表:`notify_queue``platform_express_waybills``ml_orders``express_notifications`
- 上游必须保证:有机制把新事件推入 `notify_queue`(触发器/任务/服务均可)
### 主要风险与控制点
- **幂等/重复消息**:依赖 `dedupe_key` 的稳定性;上游写队列时应构造稳定的 dedupe_key不要用 Date.now 这种随机值)。
- **权限/密钥**:需要 `service_role` 读写多表,必须只放服务器环境变量。
- **数据合规**`event_text_safe` 应为清洗/脱敏后的文案;若上游传入的是原文,需在生成前做脱敏。
### 建议监控指标
- `notify_queue` 未处理数量(`processed_at IS NULL`)是否持续增长
- 每分钟处理量、`process_status=failed` 比例与 Top `last_error`
- 从队列入库到消息入队的延迟created_at → processed_at
---
## 6. 进一步阅读
- push-server投递消费者`server/PUSH_SERVER_README.md`
- 总览Webhook + worker + push`docs/DELIVERY_WEBHOOK_PUSH_SERVER_OVERVIEW.md`
- webhook-receiver事件入库`pages/mall/delivery/webhook-server/README.md`

View File

@@ -3,6 +3,10 @@
简要说明
- 该文档记录对 `server/push-server.js` 的修改、运行所需的环境变量、表结构依赖、以及如何把 Supabase 的 cid 与通知通过云函数(`CLOUD_FUNC_URL`)下发的端到端操作步骤。
组件补充(新增)
- `server/notify-worker.js`:常驻 worker`notify_queue` 生成/写入 `express_notifications`(消息入队)。
- 说明文档:`server/NOTIFY_WORKER_README.md`(包含技术栈/依赖、配置项、监控与管理者摘要)。
变更要点(代码修改摘要)
- supaFetch: 默认仅发送 `apikey`;仅当 `SUPA_USE_BEARER=true` 时才发送 `Authorization: Bearer`。用于避免自托管 Supabase/Kong 场景下因 JWT_SECRET 不一致触发 `PGRST301`
- 新增 endpoint `/api/v1/notifications`:将通知写入 `express_notifications`(排队);由消费者(轮询)读取待处理记录并 POST 到 `CLOUD_FUNC_URL`
@@ -26,6 +30,20 @@
- `public.push_devices`:用于存储设备 cid、user_id/merchant_id、is_active 等(见仓库迁移脚本 `20260224_add_push_devices_and_notifications.sql`)。
- `public.express_notifications`:用于保存通知记录与状态(见迁移脚本)。
字段语义(从 2026-03-09 起,重要变更)
- `status_code`:物流/业务状态(如 `OUT_FOR_DELIVERY/DELIVERED/...`),由 `notify-worker` 写入。
- `send_status`:投递处理状态(`null`=待发送,`processing/retrying/success/failed/no-targets`),由 push-server consumer 读写。
为什么需要这个变更:引入 `notify-worker` 后,`status_code` 不再适合作为“投递状态”,否则 consumer 会捞不到记录(或把物流状态误当投递状态)。
迁移脚本(必需执行一次):
- `pages/mall/delivery/doc/需求文档/20260309_add_express_notifications_send_status.sql`
消息生成(可选但推荐)
- `public.notify_queue`:轨迹事件入队表(由 DB trigger 写入)。
- 迁移脚本:`pages/mall/delivery/doc/需求文档/20260309_add_notify_queue_and_trigger.sql`(创建 `notify_queue` + `AFTER INSERT` trigger
- 常驻消费者:`server/notify-worker.js`(从 `notify_queue` 生成 `express_notifications`push-server consumer 再负责下发)。
关键环境变量(示例与说明)
- SUPA_URL — Supabase RESTPostgREST地址内部建议 `http://rest:3000`)。
- SERVICE_ROLE_KEY 或 SUPA_KEY — 用作 `apikey` 向 PostgREST 请求(不要把明文放到 Authorization 除非该值确为 JWT
@@ -34,6 +52,7 @@
- CONSUMER_POLL_MS — 轮询间隔(毫秒)。
- CLOUD_FUNC_URL — 云函数外网调用 URL每个目标 cid 会对该 URL 发 POST
- PUSH_TOKEN — (可选) 云函数鉴权 token会在 POST body 的 token 字段中透传)。
- PUSH_SERVER_PORT — (可选) push-server 监听端口,默认 7301推荐用这个便于与 webhook-receiver 共享同一份 `server/config.json`)。
<!--
UNI_PUSH_URL / UNI_PUSH_APPID / UNI_PUSH_SECRET / PUSH_PROXY_URL / PUSH_PROXY_TOKEN仅在非云函数模式下使用当前已不再使用。
@@ -101,7 +120,7 @@ Invoke-RestMethod -Uri 'http://localhost:7301/api/v1/notifications' -Method POST
- 在 Supabase 中查询最近记录:
```sql
select id, message_id, status_code, retry_count, last_error, updated_at
select id, message_id, send_status, status_code, retry_count, last_error, updated_at
from public.express_notifications
order by created_at desc
limit 10;

View File

@@ -84,10 +84,13 @@ Invoke-RestMethod -Uri 'http://localhost:7301/api/v1/notifications' -Method POST
# 3) 等待 2 秒CONSUMER_POLL_MS=2000观察 push-server 控制台:应出现对 CLOUD_FUNC_URL 的 POST
```
- 如何确认是否已处理:在 Supabase 查询最近记录的 `status_code` / `last_error`
-- 如何确认是否已处理:在 Supabase 查询最近记录的 `send_status` / `last_error`
> 说明:`status_code` 是物流/业务状态(由 notify-worker 写入consumer 的投递状态使用 `send_status`。
> 若你的数据库还没有该字段,请先执行迁移:`pages/mall/delivery/doc/需求文档/20260309_add_express_notifications_send_status.sql`。
```sql
select id, message_id, status_code, retry_count, last_error, updated_at
select id, message_id, send_status, status_code, retry_count, last_error, updated_at
from public.express_notifications
order by created_at desc
limit 10;

View File

@@ -9,5 +9,6 @@
"RETRY_INITIAL_MS": "5000",
"RETRY_FACTOR": "2",
"RETRY_MAX_MS": "3600000",
"PORT": "7301"
"PUSH_SERVER_PORT": "7301",
"WEBHOOK_PORT": "7201"
}

255
server/notify-worker.js Normal file
View File

@@ -0,0 +1,255 @@
// notify-worker.js
// 常驻队列消费者:从 notify_queue 读取待处理事件,写入 express_notifications。
// push-server consumer 将继续轮询 express_notifications 并调用 CLOUD_FUNC_URL 实际下发。
//
// 配置加载优先级:
// 1) 系统环境变量
// 2) CONFIG_FILE/CONFIG_PATH
// 3) 同目录 notify-worker.config.json
// 4) server/.env / server/config.json via server/load-config.js
const fs = require('fs')
const path = require('path')
const localConfigPath = path.join(__dirname, 'notify-worker.config.json')
if (!process.env.CONFIG_FILE && !process.env.CONFIG_PATH && fs.existsSync(localConfigPath)) {
process.env.CONFIG_FILE = localConfigPath
}
require('./load-config')
const crypto = require('crypto')
const fetchImpl = (globalThis.fetch ? globalThis.fetch.bind(globalThis) : (() => {
try {
return require('node-fetch')
} catch (e) {
throw new Error("No fetch implementation found. Use Node.js 18+ or install 'node-fetch'.")
}
})())
const SUPA_URL = (process.env.SUPA_URL || process.env.SUPA_URL_OVERRIDE || '').replace(/\/$/, '')
// Prefer service role key when present (server-side), to avoid RLS issues.
const SUPA_KEY = process.env.SERVICE_ROLE_KEY || process.env.SUPA_KEY || ''
const SUPA_USE_BEARER = (process.env.SUPA_USE_BEARER === 'true')
const POLL_MS = Number(process.env.NOTIFY_WORKER_POLL_MS || process.env.WORKER_POLL_MS || 2000)
const BATCH_SIZE = Number(process.env.NOTIFY_WORKER_BATCH_SIZE || process.env.WORKER_BATCH_SIZE || 20)
const RUN_ONCE = (process.env.RUN_ONCE === 'true' || process.env.RUN_ONCE === '1')
function supaFetch(restPath, opts = {}) {
const url = `${SUPA_URL}/rest/v1/${restPath}`
const headers = Object.assign({}, opts.headers || {}, {
apikey: SUPA_KEY,
Accept: 'application/json'
})
if (SUPA_USE_BEARER) headers.Authorization = `Bearer ${SUPA_KEY}`
return fetchImpl(url, Object.assign({}, opts, { headers }))
}
function sha256Hex(input) {
return crypto.createHash('sha256').update(String(input)).digest('hex')
}
function buildMessageId({ aud, waybill_id, dedupe_key }) {
const h = sha256Hex(`${aud}|${waybill_id}|${dedupe_key}`)
return `evt_${aud}_${h.slice(0, 32)}`
}
async function fetchPendingQueue(limit) {
const q = `notify_queue?processed_at=is.null&order=created_at.asc&limit=${encodeURIComponent(String(limit))}`
const resp = await supaFetch(q, { method: 'GET' })
if (!resp.ok) {
const txt = await resp.text().catch(() => '')
throw new Error(`fetch notify_queue failed: HTTP ${resp.status} ${txt}`)
}
return await resp.json()
}
async function fetchWaybill(waybillId) {
const resp = await supaFetch(`platform_express_waybills?id=eq.${encodeURIComponent(waybillId)}&select=id,order_id,order_no,carrier,tracking_no`, { method: 'GET' })
if (!resp.ok) {
const txt = await resp.text().catch(() => '')
throw new Error(`fetch waybill failed: HTTP ${resp.status} ${txt}`)
}
const rows = await resp.json()
return rows && rows[0] ? rows[0] : null
}
async function fetchOrderById(orderId) {
const resp = await supaFetch(`ml_orders?id=eq.${encodeURIComponent(orderId)}&select=id,order_no,user_id,merchant_id`, { method: 'GET' })
if (!resp.ok) {
const txt = await resp.text().catch(() => '')
throw new Error(`fetch order (id) failed: HTTP ${resp.status} ${txt}`)
}
const rows = await resp.json()
return rows && rows[0] ? rows[0] : null
}
async function fetchOrderByNo(orderNo) {
const resp = await supaFetch(`ml_orders?order_no=eq.${encodeURIComponent(orderNo)}&select=id,order_no,user_id,merchant_id`, { method: 'GET' })
if (!resp.ok) {
const txt = await resp.text().catch(() => '')
throw new Error(`fetch order (order_no) failed: HTTP ${resp.status} ${txt}`)
}
const rows = await resp.json()
return rows && rows[0] ? rows[0] : null
}
async function upsertExpressNotification(body) {
const resp = await supaFetch('express_notifications?on_conflict=message_id', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Prefer: 'return=representation,resolution=merge-duplicates'
},
body: JSON.stringify(body)
})
if (!resp.ok) {
const txt = await resp.text().catch(() => '')
throw new Error(`insert express_notifications failed: HTTP ${resp.status} ${txt}`)
}
const rows = await resp.json().catch(() => null)
return Array.isArray(rows) ? rows[0] : rows
}
async function markQueueProcessed(id, patch) {
const resp = await supaFetch(`notify_queue?id=eq.${encodeURIComponent(id)}`, {
method: 'PATCH',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(Object.assign({}, patch, { processed_at: new Date().toISOString() }))
})
if (!resp.ok) {
const txt = await resp.text().catch(() => '')
throw new Error(`patch notify_queue failed: HTTP ${resp.status} ${txt}`)
}
}
async function processOne(row) {
const waybillId = row.waybill_id
if (!waybillId) {
await markQueueProcessed(row.id, { process_status: 'skipped', last_error: 'missing waybill_id' })
return
}
const waybill = await fetchWaybill(waybillId)
if (!waybill) {
await markQueueProcessed(row.id, { process_status: 'skipped', last_error: 'waybill not found' })
return
}
let order = null
if (waybill.order_id) order = await fetchOrderById(waybill.order_id)
if (!order && waybill.order_no) order = await fetchOrderByNo(waybill.order_no)
if (!order) {
await markQueueProcessed(row.id, { process_status: 'skipped', last_error: 'order not found for waybill' })
return
}
const recipients = []
if (order.user_id) recipients.push({ aud: 'user', recipient_id: order.user_id })
if (order.merchant_id) recipients.push({ aud: 'merchant', recipient_id: order.merchant_id })
if (recipients.length === 0) {
await markQueueProcessed(row.id, { process_status: 'skipped', last_error: 'no recipient_id resolved from order' })
return
}
const payload = {
biz: 'express',
order_no: order.order_no || waybill.order_no || null,
order_id: order.id || waybill.order_id || null,
waybill_id: waybill.id,
carrier: waybill.carrier || row.carrier || null,
tracking_no: waybill.tracking_no || row.tracking_no || null,
status_code: row.status_code,
event_time: row.event_time,
event_id: row.event_id || null
}
// 为每个受众写入一条通知message_id 按 aud 区分,确保幂等)
for (const r of recipients) {
const message_id = buildMessageId({ aud: r.aud, waybill_id: waybill.id, dedupe_key: row.dedupe_key })
const dedupe_key = `${waybill.id}|${r.aud}|${row.dedupe_key}`.slice(0, 256)
await upsertExpressNotification({
aud: r.aud,
recipient_id: r.recipient_id,
order_id: order.id || null,
waybill_id: waybill.id,
tracking_no: waybill.tracking_no || null,
carrier: waybill.carrier || null,
message_id,
event_text_safe: row.event_text || null,
status_code: row.status_code,
event_time: row.event_time || null,
payload,
dedupe_key
})
}
await markQueueProcessed(row.id, { process_status: 'queued', last_error: null })
}
async function loopOnce() {
const rows = await fetchPendingQueue(BATCH_SIZE)
if (!rows || rows.length === 0) return { processed: 0 }
let processed = 0
for (const row of rows) {
try {
await processOne(row)
processed += 1
} catch (e) {
const msg = (e && e.message) ? e.message : String(e)
try {
await markQueueProcessed(row.id, { process_status: 'failed', last_error: msg.substring(0, 2000) })
} catch (patchErr) {
console.warn('failed to mark queue row failed:', patchErr)
}
console.warn('processOne failed:', msg)
processed += 1
}
}
return { processed }
}
async function main() {
if (!SUPA_URL || !SUPA_KEY) {
console.error('SUPA_URL and SUPA_KEY must be set in env')
process.exit(1)
}
console.log('notify-worker starting...')
console.log('ENV: SUPA_URL=', SUPA_URL)
console.log('ENV: SUPA_USE_BEARER=', SUPA_USE_BEARER)
console.log('ENV: POLL_MS=', POLL_MS, 'BATCH_SIZE=', BATCH_SIZE, 'RUN_ONCE=', RUN_ONCE)
if (RUN_ONCE) {
const r = await loopOnce()
console.log('notify-worker done (run once):', r)
return
}
// 常驻轮询
// eslint-disable-next-line no-constant-condition
while (true) {
try {
const r = await loopOnce()
if (r.processed > 0) console.log('notify-worker processed:', r.processed)
} catch (e) {
console.warn('notify-worker loop error:', e && e.message ? e.message : e)
}
await new Promise(resolve => setTimeout(resolve, POLL_MS))
}
}
main().catch(e => {
console.error('notify-worker fatal:', e)
process.exit(1)
})

View File

@@ -4,7 +4,8 @@
"description": "Local push backend for development (register devices, mock send)",
"main": "push-server.js",
"scripts": {
"start": "node push-server.js"
"start": "node push-server.js",
"worker": "node notify-worker.js"
},
"dependencies": {
"archiver": "^7.0.1",

View File

@@ -12,12 +12,14 @@ const path = require('path')
const fetch = require('node-fetch')
const { spawn } = require('child_process')
const PORT = process.env.PORT || 7301
// 支持服务专用端口,避免与 webhook-receiver 共用 server/config.json 时发生端口冲突
const PORT = process.env.PUSH_SERVER_PORT || process.env.PORT || 7301
const DATA_DIR = path.join(__dirname, 'data')
const DEVICES_FILE = path.join(DATA_DIR, 'push_devices.json')
const SUPA_URL = process.env.SUPA_URL || ''
// 支持两种环境变量名SUPA_KEY和 SERVICE_ROLE_KEY常用于自托管 .env
const SUPA_KEY = process.env.SUPA_KEY || process.env.SERVICE_ROLE_KEY || ''
// Prefer service role key when present (server-side), to avoid RLS issues.
const SUPA_KEY = process.env.SERVICE_ROLE_KEY || process.env.SUPA_KEY || ''
const SUPA_SCHEMA = process.env.SUPA_SCHEMA || 'public'
// Consumer 配置
@@ -31,6 +33,8 @@ const RETRY_INITIAL_MS = parseInt(process.env.RETRY_INITIAL_MS || '5000', 10)
const RETRY_FACTOR = parseInt(process.env.RETRY_FACTOR || '2', 10)
const RETRY_MAX_MS = parseInt(process.env.RETRY_MAX_MS || '3600000', 10) // 1 hour
let warnedMissingSendStatus = false
async function ensureDataDir() {
try {
await fs.mkdir(DATA_DIR, { recursive: true })
@@ -144,12 +148,18 @@ async function start() {
if (!SUPA_URL || !SUPA_KEY) return []
try {
// 查询:
// - pending: status_code IS NULL
// - retrying: status_code = 'retrying' 且 (next_attempt_at IS NULL 或 next_attempt_at <= now)
// - pending: send_status IS NULL
// - retrying: send_status = 'retrying' 且 (next_attempt_at IS NULL 或 next_attempt_at <= now)
const now = new Date().toISOString()
const q = `express_notifications?or=(status_code.is.null,and(status_code.eq.retrying,or(next_attempt_at.is.null,next_attempt_at.lte.${encodeURIComponent(now)})))&order=created_at.asc&limit=${limit}`
const q = `express_notifications?or=(send_status.is.null,and(send_status.eq.retrying,or(next_attempt_at.is.null,next_attempt_at.lte.${encodeURIComponent(now)})))&order=created_at.asc&limit=${limit}`
const resp = await supaFetch(q, { method: 'GET' })
if (!resp.ok) return []
if (!resp.ok) {
if (!warnedMissingSendStatus && resp.status === 400) {
warnedMissingSendStatus = true
console.warn('Consumer query failed. If this mentions missing column "send_status", run migration: pages/mall/delivery/doc/需求文档/20260309_add_express_notifications_send_status.sql')
}
return []
}
const data = await resp.json().catch(() => [])
return Array.isArray(data) ? data : []
} catch (e) {
@@ -160,19 +170,23 @@ async function start() {
async function claimNotification(id) {
try {
const body = { status_code: 'processing', updated_at: new Date().toISOString() }
const body = { send_status: 'processing', updated_at: new Date().toISOString() }
// 仅当当前仍为 pending 或 retrying 且到期时,才抢占。
// 为了避免在 PATCH 中使用复杂 or= 逻辑树导致匹配失败,这里拆成两次尝试。
const now = new Date().toISOString()
const attempts = [
// pending: status_code IS NULL
`express_notifications?id=eq.${encodeURIComponent(id)}&status_code=is.null`,
// retrying: status_code='retrying' and next_attempt_at is null or <= now
`express_notifications?id=eq.${encodeURIComponent(id)}&status_code=eq.retrying&or=(next_attempt_at.is.null,next_attempt_at.lte.${encodeURIComponent(now)})`
// pending: send_status IS NULL
`express_notifications?id=eq.${encodeURIComponent(id)}&send_status=is.null`,
// retrying: send_status='retrying' and next_attempt_at is null or <= now
`express_notifications?id=eq.${encodeURIComponent(id)}&send_status=eq.retrying&or=(next_attempt_at.is.null,next_attempt_at.lte.${encodeURIComponent(now)})`
]
for (const path of attempts) {
const resp = await supaFetch(path, { method: 'PATCH', headers: { 'Content-Type': 'application/json', Prefer: 'return=representation' }, body: JSON.stringify(body) })
if (!resp.ok && !warnedMissingSendStatus && resp.status === 400) {
warnedMissingSendStatus = true
console.warn('Consumer claim failed. If this mentions missing column "send_status", run migration: pages/mall/delivery/doc/需求文档/20260309_add_express_notifications_send_status.sql')
}
if (!resp.ok) continue
const j = await resp.json().catch(() => null)
if (Array.isArray(j)) {
@@ -193,9 +207,13 @@ async function start() {
async function updateNotificationStatus(id, status, note) {
try {
if (!id) return
const body = { status_code: String(status), updated_at: new Date().toISOString() }
if (note) body.event_text_safe = String(note).substring(0, 2000)
await supaFetch(`express_notifications?id=eq.${encodeURIComponent(id)}`, { method: 'PATCH', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify(body) })
const body = { send_status: String(status), updated_at: new Date().toISOString() }
if (note) body.last_error = String(note).substring(0, 2000)
const resp = await supaFetch(`express_notifications?id=eq.${encodeURIComponent(id)}`, { method: 'PATCH', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify(body) })
if (!resp.ok && !warnedMissingSendStatus && resp.status === 400) {
warnedMissingSendStatus = true
console.warn('Consumer status update failed. If this mentions missing column "send_status", run migration: pages/mall/delivery/doc/需求文档/20260309_add_express_notifications_send_status.sql')
}
} catch (e) { console.warn('updateNotificationStatus error', e) }
}
@@ -233,6 +251,12 @@ async function start() {
const notification = claimed.payload && claimed.payload.notification ? claimed.payload.notification : (claimed.event_text_safe ? { title: claimed.event_text_safe, body: '' } : {})
const payload = claimed.payload || {}
// Some push providers (and client notification renderers) may not display a system notification
// when content/body is empty, even if the push API returns success.
const titleText = (notification && (notification.title || notification.event_text || notification.text)) || claimed.event_text_safe || ''
const bodyText = (notification && (notification.body || notification.content)) || ''
const contentText = bodyText || titleText || ''
// build targets similar to /api/v1/notifications
let targets = []
try {
@@ -261,7 +285,7 @@ async function start() {
let allOk = true
let lastNote = ''
if (CLOUD_FUNC_URL) {
const calls = await Promise.all(targets.map(cid => invokeCloudFuncForCid(CLOUD_FUNC_URL, PUSH_TOKEN, cid, notification && notification.title, notification && notification.body, payload)))
const calls = await Promise.all(targets.map(cid => invokeCloudFuncForCid(CLOUD_FUNC_URL, PUSH_TOKEN, cid, titleText, contentText, payload)))
for (const c of calls) {
if (!c.ok) {
allOk = false
@@ -285,7 +309,7 @@ async function start() {
if (nextRetry >= MAX_RETRIES) {
// mark failed
try {
await supaFetch(`express_notifications?id=eq.${encodeURIComponent(claimed.id)}`, { method: 'PATCH', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ status_code: 'failed', retry_count: nextRetry, last_error: String(lastNote).substring(0,2000), updated_at: new Date().toISOString() }) })
await supaFetch(`express_notifications?id=eq.${encodeURIComponent(claimed.id)}`, { method: 'PATCH', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ send_status: 'failed', retry_count: nextRetry, last_error: String(lastNote).substring(0,2000), updated_at: new Date().toISOString() }) })
} catch (e) { console.warn('mark failed error', e) }
} else {
// compute exponential backoff
@@ -293,7 +317,7 @@ async function start() {
if (delay > RETRY_MAX_MS) delay = RETRY_MAX_MS
const nextAt = new Date(Date.now() + delay).toISOString()
try {
await supaFetch(`express_notifications?id=eq.${encodeURIComponent(claimed.id)}`, { method: 'PATCH', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ status_code: 'retrying', retry_count: nextRetry, last_error: String(lastNote).substring(0,2000), next_attempt_at: nextAt, updated_at: new Date().toISOString() }) })
await supaFetch(`express_notifications?id=eq.${encodeURIComponent(claimed.id)}`, { method: 'PATCH', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ send_status: 'retrying', retry_count: nextRetry, last_error: String(lastNote).substring(0,2000), next_attempt_at: nextAt, updated_at: new Date().toISOString() }) })
} catch (e) { console.warn('schedule retry error', e) }
}
}

View File

@@ -0,0 +1,59 @@
param(
[switch]$NoKill
)
$ErrorActionPreference = 'Stop'
function Ensure-Dir($p) {
if (-not (Test-Path $p)) { New-Item -ItemType Directory -Path $p -Force | Out-Null }
}
function Stop-ListeningPort($port) {
$lines = netstat -ano | Select-String (":$port ")
foreach ($l in $lines) {
$m = [regex]::Match($l.ToString(), "\sLISTENING\s+(\d+)$")
if ($m.Success) {
$procId = [int]$m.Groups[1].Value
try {
Stop-Process -Id $procId -Force -ErrorAction Stop
Write-Host "Stopped PID $procId on port $port"
} catch {
Write-Host "Failed to stop PID $procId on port $($port): $($_.Exception.Message)"
}
}
}
}
# Repo root is two levels up from this script: server\scripts\*.ps1
$root = Resolve-Path (Join-Path $PSScriptRoot "..\..")
$configPath = Resolve-Path (Join-Path $root "server\config.json")
if (-not $NoKill) {
Stop-ListeningPort 7201
Stop-ListeningPort 7301
}
Ensure-Dir (Join-Path $root "server\scripts")
$env:CONFIG_FILE = $configPath.Path
$services = @(
@{ Name = 'webhook-receiver'; Script = (Join-Path $root 'pages\mall\delivery\webhook-server\webhook-receiver.js'); Out = (Join-Path $root 'webhook-receiver.log'); Err = (Join-Path $root 'webhook-receiver.err.log') },
@{ Name = 'notify-worker'; Script = (Join-Path $root 'server\notify-worker.js'); Out = (Join-Path $root 'notify-worker.log'); Err = (Join-Path $root 'notify-worker.err.log') },
@{ Name = 'push-server'; Script = (Join-Path $root 'server\push-server.js'); Out = (Join-Path $root 'push-server.log'); Err = (Join-Path $root 'push-server.err.log') }
)
$pids = @{}
foreach ($s in $services) {
if (-not (Test-Path $s.Script)) { throw "Missing script: $($s.Script)" }
Write-Host "Starting $($s.Name)..."
$p = Start-Process -FilePath "node" -ArgumentList @($s.Script) -WorkingDirectory $root.Path -WindowStyle Hidden -RedirectStandardOutput $s.Out -RedirectStandardError $s.Err -PassThru
$pids[$s.Name] = $p.Id
Write-Host " PID=$($p.Id) out=$($s.Out) err=$($s.Err)"
}
Ensure-Dir (Join-Path $root 'server\.runtime')
$pidsFile = Join-Path $root 'server\.runtime\delivery-backend.pids.json'
$pids | ConvertTo-Json -Depth 5 | Set-Content -Path $pidsFile -Encoding UTF8
Write-Host "Saved pids: $pidsFile"
Write-Host "Done. Use test sender: node .\pages\mall\delivery\webhook-server\test-send.js"

View File

@@ -0,0 +1,27 @@
$ErrorActionPreference = 'SilentlyContinue'
$root = Resolve-Path (Join-Path $PSScriptRoot "..\..")
$pidsFile = Join-Path $root 'server\.runtime\delivery-backend.pids.json'
if (Test-Path $pidsFile) {
try {
$pids = Get-Content $pidsFile -Raw | ConvertFrom-Json
foreach ($name in $pids.PSObject.Properties.Name) {
$pid = [int]$pids.$name
if ($pid -gt 0) {
try {
Stop-Process -Id $pid -Force
Write-Host "Stopped $name PID=$pid"
} catch {
Write-Host "Failed to stop $name PID=$pid"
}
}
}
} catch {
Write-Host "Failed to parse $pidsFile"
}
} else {
Write-Host "No pid file found: $pidsFile"
}
Write-Host "If ports still occupied, run: netstat -ano | Select-String ':7201 ' / ':7301 '"