From 436b7b251f3dec9b1dd0c40188f2e302806dc90e Mon Sep 17 00:00:00 2001 From: not-like-juvenile <16056107+not-like-juvenile@user.noreply.gitee.com> Date: Mon, 9 Mar 2026 10:39:29 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B6=88=E6=81=AF=E6=8E=A8=E9=80=81=E5=90=8E?= =?UTF-8?q?=E5=8F=B0=E6=89=93=E9=80=9A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pages/mall/delivery/webhook-server/README.md | 91 +++++- .../mall/delivery/webhook-server/test-send.js | 2 +- .../webhook-server/webhook-receiver.js | 65 +++- .../webhook-server/webhook.config.json | 6 + .../webhook.config.json.example | 6 + server/PUSH_SERVER_README.md | 88 +++++- server/README.md | 117 ++++++- server/config.json | 13 + server/config.json.example | 13 + server/load-config.js | 68 +++++ server/push-server.js | 288 +++--------------- 11 files changed, 466 insertions(+), 291 deletions(-) create mode 100644 pages/mall/delivery/webhook-server/webhook.config.json create mode 100644 pages/mall/delivery/webhook-server/webhook.config.json.example create mode 100644 server/config.json create mode 100644 server/config.json.example create mode 100644 server/load-config.js diff --git a/pages/mall/delivery/webhook-server/README.md b/pages/mall/delivery/webhook-server/README.md index 4aae44f2..45940ddc 100644 --- a/pages/mall/delivery/webhook-server/README.md +++ b/pages/mall/delivery/webhook-server/README.md @@ -1,22 +1,41 @@ # Webhook 接收器 — 说明 -路径:`pages/mall/delivery/server/webhook-receiver.js` +路径:`pages/mall/delivery/webhook-server/webhook-receiver.js` 目的:接收承运方或 Mock Server 的 HTTP 回调(POST /webhook/express/status),将原始回文写入 `platform_express_event_raw`,并按项目现有映射更新 `platform_express_waybills` 与写入 `platform_express_tracking_events`。 环境变量(必须/可选): - `SUPA_URL`:Supabase REST 地址(示例 `http://192.168.1.62:18000`) - `SUPA_KEY`:Supabase service_role 或 anon key(用于 REST 写入) +- `SUPA_USE_BEARER`(可选):是否附加 `Authorization: Bearer `,默认 `false`。 + - 在一些自托管 Supabase/Kong(key-auth)环境中,**只需要** `apikey`;如果误加 Bearer 且 key 不是 JWT,可能出现 `PGRST301`("None of the keys was able to decode the JWT")。 - `WEBHOOK_SECRET`(可选):与第三方共享的 HMAC-SHA256 secret,用于校验 `X-Signature`(签名为 hex) - `PORT`(可选):接收器监听端口,默认 `7201` +配置方式(推荐用配置文件,避免与其他服务端口冲突): +- **同目录配置文件(推荐)**:在 `webhook-receiver.js` 同目录放置 `webhook.config.json`,启动时会自动读取。 +- **显式指定配置文件**:设置 `CONFIG_FILE` / `CONFIG_PATH` 指向你的 `.env` 或 `.json`。 +- **回退加载**:若未指定 `CONFIG_FILE` 且同目录无 `webhook.config.json`,会尝试读取 `server/.env` 与 `server/config.json`(由 `server/load-config.js` 负责)。 + +示例配置文件:仓库内提供 [server/webhook.config.json.example](server/webhook.config.json.example),你可以复制一份到本目录作为 `webhook.config.json` 使用。 + 启动(PowerShell): ```powershell -$env:SUPA_URL='http://192.168.1.62:18000' -$env:SUPA_KEY='your_service_role_key' -# 可选验签 -$env:WEBHOOK_SECRET='your-secret' -node pages/mall/delivery/server/webhook-receiver.js +node pages/mall/delivery/webhook-server/webhook-receiver.js +``` + +推荐:使用同目录配置文件启动(PowerShell): +```powershell +# 复制示例并填写真实 SUPA_KEY +Copy-Item .\server\webhook.config.json.example .\pages\mall\delivery\webhook-server\webhook.config.json + +node pages/mall/delivery/webhook-server/webhook-receiver.js +``` + +如果你使用显式 CONFIG_FILE: +```powershell +$env:CONFIG_FILE=(Resolve-Path .\pages\mall\delivery\webhook-server\webhook.config.json) +node pages/mall/delivery/webhook-server/webhook-receiver.js ``` 启动(Linux / macOS / WSL): @@ -24,7 +43,30 @@ node pages/mall/delivery/server/webhook-receiver.js export SUPA_URL='http://192.168.1.62:18000' export SUPA_KEY='your_service_role_key' export WEBHOOK_SECRET='your-secret' # optional -node pages/mall/delivery/server/webhook-receiver.js +node pages/mall/delivery/webhook-server/webhook-receiver.js +``` + +也可以用配置文件(更适合长期运行): +- `server/load-config.js` 会自动尝试加载:`server/.env`、`server/config.json`(以及 `CONFIG_FILE/CONFIG_PATH` 指定的文件),并把其中的键注入到 `process.env`。 +- 因为接收器已在启动时 `require` 了该加载器,所以你只要把 `SUPA_URL`、`SUPA_KEY` 写进上述文件之一即可。 + +如果你不想与 `server/config.json` 共用 `PORT`(避免端口冲突),建议为 webhook 单独准备一个配置文件,然后用 `CONFIG_FILE` 指定: + +```powershell +# 复制示例文件并填写真实 SUPA_KEY: +Copy-Item .\server\webhook.config.json.example .\server\webhook.config.json + +# 指定配置文件启动(不会影响 push-server 的配置): +$env:CONFIG_FILE=(Resolve-Path .\server\webhook.config.json) +node pages/mall/delivery/webhook-server/webhook-receiver.js +``` + +示例:在 `server/.env` 中写入: +```env +SUPA_URL=http://192.168.1.62:18000 +SUPA_KEY=your_service_role_key +WEBHOOK_SECRET=your-secret +PORT=7201 ``` 测试(curl 模拟第三方推送): @@ -42,6 +84,41 @@ curl -i -X POST http://localhost:7201/webhook/express/status \ -d "$BODY" ``` +健康检查: +- `GET http://localhost:7201/health`(端口以 `PORT` 为准) + +常见问题排查: +- 返回 `{ ok:false, message:'waybill not found' }`:说明 webhook 已收到请求,但在 `platform_express_waybills` 中找不到 `tracking_no`(或 `order_no`)匹配的记录。 +- 返回 `502 supabase unauthorized (check SUPA_KEY/SUPA_URL)`:说明当前 `SUPA_KEY` / `SUPA_URL` 无法通过 Supabase REST 鉴权(常见于 key 填错、已失效、URL 不对)。请换成 Supabase 控制台中的真实 `service_role` key,并重启接收器。 + +Windows 下保持“持续监听”(后台运行): +> 只要 Node 进程还在,webhook 就会持续监听;如果你关闭终端/窗口或按 `Ctrl+C`,进程结束就不会再监听。 + +```powershell +# 后台启动并把日志写到文件(推荐) +Start-Process node -ArgumentList 'pages/mall/delivery/webhook-server/webhook-receiver.js' \ + -WorkingDirectory (Get-Location) \ + -RedirectStandardOutput '.\webhook-receiver.log' \ + -RedirectStandardError '.\webhook-receiver.err.log' \ + -PassThru + +# 查看是否在监听(把 7201 换成你的 PORT) +netstat -ano | findstr :7201 + +# 查看健康检查 +Invoke-RestMethod -Uri 'http://localhost:7201/health' -Method GET | ConvertTo-Json +``` + +停止服务(按 PID 结束进程): +```powershell +# 先用 netstat 找到 LISTENING 的 PID,然后结束它 +Stop-Process -Id +``` + +依赖说明: +- 建议使用 Node.js 18+(例如你当前的 Node.js 22),已内置 `fetch`,无需安装 `node-fetch`。 +- 若使用更老的 Node 且没有 `fetch`,需要安装 `node-fetch`(并保持 CommonJS 兼容)。 + 预期:接口返回 200 JSON {ok:true}(若未找到对应运单会返回 {ok:false, message:'waybill not found'})。 验证写入(查看 Supabase): diff --git a/pages/mall/delivery/webhook-server/test-send.js b/pages/mall/delivery/webhook-server/test-send.js index d04b5ddd..fbd895b6 100644 --- a/pages/mall/delivery/webhook-server/test-send.js +++ b/pages/mall/delivery/webhook-server/test-send.js @@ -7,7 +7,7 @@ const URL = `http://localhost:${PORT}/webhook/express/status` const SECRET = process.env.WEBHOOK_SECRET || 'test_secret' const payload = { - tracking_no: 'LOCALTEST123', + tracking_no: 'TEST_YT_20260206_0007', status_code: 'DELIVERED', acceptTime: new Date().toISOString(), remark: 'local test event' diff --git a/pages/mall/delivery/webhook-server/webhook-receiver.js b/pages/mall/delivery/webhook-server/webhook-receiver.js index f9833a93..bd3f7bec 100644 --- a/pages/mall/delivery/webhook-server/webhook-receiver.js +++ b/pages/mall/delivery/webhook-server/webhook-receiver.js @@ -1,6 +1,29 @@ +// Load configuration into process.env. +// Priority: +// 1) Real environment variables +// 2) CONFIG_FILE/CONFIG_PATH (explicit) +// 3) Local file next to this script: webhook.config.json +// 4) server/.env / server/config.json via server/load-config.js +const fs = require('fs') +const path = require('path') + +const localConfigPath = path.join(__dirname, 'webhook.config.json') +if (!process.env.CONFIG_FILE && !process.env.CONFIG_PATH && fs.existsSync(localConfigPath)) { + process.env.CONFIG_FILE = localConfigPath +} + +require('../../../../server/load-config') + const express = require('express') const bodyParser = require('body-parser') -const fetch = require('node-fetch') +const fetch = (globalThis.fetch ? globalThis.fetch.bind(globalThis) : (() => { + try { + // Fallback for older Node versions where fetch is not available. + return require('node-fetch') + } catch (e) { + throw new Error("No fetch implementation found. Use Node.js 18+ or install 'node-fetch'.") + } +})()) const crypto = require('crypto') const PORT = process.env.PORT || 7201 @@ -10,11 +33,14 @@ const WEBHOOK_SECRET = process.env.WEBHOOK_SECRET || '' // optional HMAC secret function supaFetch(path, opts = {}) { const url = `${SUPA_URL}/rest/v1/${path}` + // Default to apikey only (compatible with self-hosted Supabase/Kong key-auth). + // Only attach Authorization: Bearer when explicitly enabled. const headers = Object.assign({}, opts.headers || {}, { apikey: SUPA_KEY, - Authorization: `Bearer ${SUPA_KEY}`, Accept: 'application/json' }) + const sendBearer = (process.env.SUPA_USE_BEARER === 'true') + if (sendBearer) headers.Authorization = `Bearer ${SUPA_KEY}` return fetch(url, Object.assign({}, opts, { headers })) } @@ -50,22 +76,30 @@ async function findWaybillId(tracking_no, order_no) { try { if (tracking_no) { const r = await supaFetch(`platform_express_waybills?tracking_no=eq.${encodeURIComponent(tracking_no)}`) - if (r.ok) { - const data = await r.json() - if (data && data.length > 0) return data[0].id + if (!r.ok) { + const txt = await r.text().catch(() => '') + const err = new Error(`Supabase query failed (tracking_no): HTTP ${r.status} ${txt}`) + err.status = r.status + throw err } + const data = await r.json() + if (data && data.length > 0) return data[0].id } if (order_no) { const r2 = await supaFetch(`platform_express_waybills?order_no=eq.${encodeURIComponent(order_no)}`) - if (r2.ok) { - const data2 = await r2.json() - if (data2 && data2.length > 0) return data2[0].id + if (!r2.ok) { + const txt2 = await r2.text().catch(() => '') + const err2 = new Error(`Supabase query failed (order_no): HTTP ${r2.status} ${txt2}`) + err2.status = r2.status + throw err2 } + const data2 = await r2.json() + if (data2 && data2.length > 0) return data2[0].id } return null } catch (e) { - console.warn('findWaybillId error', e) - return null + console.warn('findWaybillId error', e && e.message ? e.message : e) + throw e } } @@ -140,7 +174,16 @@ async function start() { const event_code = req.body && (req.body.infoContent || req.body.status_code || req.body.event_code) const event_text = req.body && (req.body.remark || req.body.event_text || '') - const waybillId = await findWaybillId(tracking_no, order_no) + let waybillId = null + try { + waybillId = await findWaybillId(tracking_no, order_no) + } catch (e) { + const status = e && e.status ? Number(e.status) : 0 + if (status === 401 || status === 403) { + return res.status(502).json({ ok: false, message: 'supabase unauthorized (check SUPA_KEY/SUPA_URL)' }) + } + return res.status(502).json({ ok: false, message: 'supabase query failed' }) + } if (!waybillId) { // Waybill not found — respond 200 but inform caller in body. return res.status(200).json({ ok: false, message: 'waybill not found' }) diff --git a/pages/mall/delivery/webhook-server/webhook.config.json b/pages/mall/delivery/webhook-server/webhook.config.json new file mode 100644 index 00000000..c0a6ef3a --- /dev/null +++ b/pages/mall/delivery/webhook-server/webhook.config.json @@ -0,0 +1,6 @@ +{ + "SUPA_URL": "http://192.168.1.62:18000", + "SUPA_KEY": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJyb2xlIjoic2VydmljZV9yb2xlIiwiaXNzIjoic3VwYWJhc2UtMSIsImlhdCI6MTc2OTY3NjQ5OCwiZXhwIjoxOTI3MzU2NDk4fQ.ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890", + "WEBHOOK_SECRET": "", + "PORT": "7201" +} diff --git a/pages/mall/delivery/webhook-server/webhook.config.json.example b/pages/mall/delivery/webhook-server/webhook.config.json.example new file mode 100644 index 00000000..62a36b21 --- /dev/null +++ b/pages/mall/delivery/webhook-server/webhook.config.json.example @@ -0,0 +1,6 @@ +{ + "SUPA_URL": "http://192.168.1.62:18000", + "SUPA_KEY": "PASTE_YOUR_SERVICE_ROLE_KEY_HERE", + "WEBHOOK_SECRET": "", + "PORT": "7201" +} diff --git a/server/PUSH_SERVER_README.md b/server/PUSH_SERVER_README.md index 6dcabde9..3d1d286d 100644 --- a/server/PUSH_SERVER_README.md +++ b/server/PUSH_SERVER_README.md @@ -1,20 +1,26 @@ **Push Server - 使用与变更说明** 简要说明 -- 该文档记录对 `server/push-server.js` 的修改、运行所需的环境变量、表结构依赖、以及如何把 Supabase 的 cid 与通知通过 dCloud UNI‑PUSH 下发的端到端操作步骤。 +- 该文档记录对 `server/push-server.js` 的修改、运行所需的环境变量、表结构依赖、以及如何把 Supabase 的 cid 与通知通过云函数(`CLOUD_FUNC_URL`)下发的端到端操作步骤。 变更要点(代码修改摘要) -- supaFetch: 默认仅发送 `apikey`;仅当 `SUPA_USE_BEARER=true` 或 `SUPA_KEY` 看起来像 JWT(包含两处 ".")时,才发送 `Authorization: Bearer`。避免把明文 service key 当作 JWT 发出导致 PostgREST 拒绝。 -- 新增 endpoint `/api/v1/notifications`:将通知写入 `express_notifications`,根据 `aud` 与 `recipient_id` 查询 `push_devices`,再发送推送(proxy 或 mock),并写回通知状态。 -- 新增 uni-push adapter `sendToUniPush(targets, notification, payload)`:当设置了 `UNI_PUSH_URL` 时,`/api/v1/push/send` 与 `/api/v1/notifications` 会调用该适配器优先发送到 UNI‑PUSH;否则若设置了 `PUSH_PROXY_URL` 则转发到该 URL。 +- supaFetch: 默认仅发送 `apikey`;仅当 `SUPA_USE_BEARER=true` 时才发送 `Authorization: Bearer`。用于避免自托管 Supabase/Kong 场景下因 JWT_SECRET 不一致触发 `PGRST301`。 +- 新增 endpoint `/api/v1/notifications`:将通知写入 `express_notifications`(排队);由消费者(轮询)读取待处理记录并 POST 到 `CLOUD_FUNC_URL`。 +- 仅云函数模式:`server/push-server.js` 已禁用 UNI‑PUSH/代理分支,发送只走 `CLOUD_FUNC_URL`。 + + 新增/修改的接口(简要) - GET `/health` — 健康检查。 - POST `/api/v1/push/register` — 注册/更新设备;会写本地 `server/data/push_devices.json`,并尝试 upsert 到 Supabase `push_devices` 表(如果配置了 SUPA_URL + SERVICE_ROLE_KEY)。 - POST `/api/v1/push/unregister` — 注销设备(本地并尝试同步 Supabase)。 - GET `/api/v1/push/devices` — 列出设备(优先从 Supabase 获取)。 -- POST `/api/v1/push/send` — 直接按 `cids` 或 `user_id` 发送推送;若 `UNI_PUSH_URL` 存在使用 adapter,否则若 `PUSH_PROXY_URL` 存在转发,默认 mock 返回。 -- POST `/api/v1/notifications` — 将通知写入 `express_notifications` 并基于 `aud`/`recipient_id` 拉取 `push_devices` 发推送,成功/失败状态写回 `express_notifications.status_code`。 +- POST `/api/v1/push/send` — 直接按 `cids` 或 `user_id` 发送推送(仅云函数:对每个 cid POST 到 `CLOUD_FUNC_URL`)。 +- POST `/api/v1/notifications` — 将通知写入 `express_notifications`(排队);实际下发由消费者完成。 依赖的数据库表(必须存在) - `public.push_devices`:用于存储设备 cid、user_id/merchant_id、is_active 等(见仓库迁移脚本 `20260224_add_push_devices_and_notifications.sql`)。 @@ -24,10 +30,14 @@ - SUPA_URL — Supabase REST(PostgREST)地址(内部建议 `http://rest:3000`)。 - SERVICE_ROLE_KEY 或 SUPA_KEY — 用作 `apikey` 向 PostgREST 请求(不要把明文放到 Authorization 除非该值确为 JWT)。 - SUPA_USE_BEARER — (可选) 若为 `true` 则强制发送 Authorization: Bearer 。 -- UNI_PUSH_URL — (可选) 若设置则使用内置 `sendToUniPush` adapter 直接调用 dCloud uni-push 接口。 -- UNI_PUSH_APPID / UNI_PUSH_SECRET — adapter 用于构造或鉴权(按你现有 uni-push 接口调整)。 -- PUSH_PROXY_URL / PUSH_PROXY_TOKEN — 若不使用 adapter,可把此设置为你现有的推送代理 URL 与 token,后端会将 {targets, notification, payload} 转发过去。 -- PUSH_PROXY_URL 优先级低于 UNI_PUSH_URL:若 UNI_PUSH_URL 存在,优先使用本地 adapter。 +- ENABLE_CONSUMER / CONSUMER_ENABLED — 启用消费者轮询(从 express_notifications 读取待处理记录)。 +- CONSUMER_POLL_MS — 轮询间隔(毫秒)。 +- CLOUD_FUNC_URL — 云函数外网调用 URL(每个目标 cid 会对该 URL 发 POST)。 +- PUSH_TOKEN — (可选) 云函数鉴权 token(会在 POST body 的 token 字段中透传)。 + + 运行与测试(本地示例) 1) 安装依赖并启动: @@ -57,6 +67,46 @@ curl -X POST http://localhost:7301/api/v1/notifications \ -d '{"aud":"user","recipient_id":"","notification":{"title":"测试","body":"uni-push 测试"}}' ``` +> 说明(仅云函数模式):`/api/v1/notifications` 仅写入 `express_notifications`(排队)。实际下发由消费者轮询后对 `CLOUD_FUNC_URL` 执行 POST。 + +端到端验证(推荐) +----------------- + +前置条件 +- `CLOUD_FUNC_URL` 可访问且返回 2xx(否则会被标记为 failed/retrying)。 +- 目标用户在 `push_devices` 中存在至少一个 `is_active=true` 的设备(否则会被标记为 no-targets)。 + +PowerShell 示例(Windows) +```powershell +# 1) 注册设备(写入 push_devices) +Invoke-RestMethod -Uri 'http://localhost:7301/api/v1/push/register' -Method POST -ContentType 'application/json' -Body (@{ + cid='CID_TEST_001' + user_id='a8e3a568-fc1f-4237-bcc5-5722e2fca0a3' + platform='android' +} | ConvertTo-Json) + +# 2) 写入通知(排队) +$body = @{ + aud='user' + recipient_id='a8e3a568-fc1f-4237-bcc5-5722e2fca0a3' + notification=@{ title='测试'; body='hello' } + payload=@{ order_id='123' } +} | ConvertTo-Json -Depth 6 +Invoke-RestMethod -Uri 'http://localhost:7301/api/v1/notifications' -Method POST -ContentType 'application/json' -Body $body + +# 3) 等待 2 秒(CONSUMER_POLL_MS 默认 2000),观察 push-server 控制台日志:应出现对 CLOUD_FUNC_URL 的 POST +``` + +如何确认处理结果 +- 在 Supabase 中查询最近记录: + +```sql +select id, message_id, status_code, retry_count, last_error, updated_at +from public.express_notifications +order by created_at desc +limit 10; +``` + 5) 直接按 cid 发(跳过 DB): ```bash curl -X POST http://localhost:7301/api/v1/push/send \ @@ -65,8 +115,9 @@ curl -X POST http://localhost:7301/api/v1/push/send \ ``` UNI‑PUSH 集成注意事项 -- adapter 当前构造 body 使用 `cidList` 与 `message:{title,content,payload}`。请根据你已经验证成功的 uni-push curl 请求体调整字段名与鉴权 header(可使用 `UNI_PUSH_APPID`、`UNI_PUSH_SECRET`、或 `PUSH_PROXY_TOKEN`)。 -- 建议:把你成功的 uni-push curl 发给我,我可以把 adapter 的 body/header 精确改成一致格式。 + 故障与排查要点 - 如果 Supabase 报 401 或 PGRST301:不要把明文 service key 作为 Bearer;使用 `apikey` header,或生成并使用与 `PGRST_JWT_SECRET` 匹配的 JWT。可通过 `docker inspect` 检查容器 env 中的 `PGRST_JWT_SECRET`。 @@ -138,7 +189,8 @@ node push-server.js ``` Push server listening on http://0.0.0.0:7301 -ENV: UNI_PUSH_URL= https://restapi.getui.com/v2/... +ENV: CLOUD_FUNC_URL configured? true +ENV: ENABLE_CONSUMER= true Auto-deploy: spawning node D:\...\server\tools\deploy-cloudfunc.js [auto-deploy stdout] 打包目录: D:\...\uniCloud-alipay\cloudfunctions\testUnipush2 [auto-deploy stdout] 打包完成 -> D:\...\server\dist\testUnipush2.zip (1324 bytes) @@ -286,11 +338,19 @@ Auto-deploy process exited with code=0 signal=null - 目的:从数据库 `express_notifications` 拉取待发送的消息(pending),解耦写入与下发流程,保证可重试、可审计与可运维。 - 实现位置:`server/push-server.js`(已新增 consumer 逻辑)。 - 启用:设置环境变量 `ENABLE_CONSUMER=true`(或 `CONSUMER_ENABLED=true`),可选配置轮询间隔 `CONSUMER_POLL_MS`(默认 2000 ms)。 + + - 轮询频率说明:默认 `CONSUMER_POLL_MS` 为 `2000`(即每 2000 毫秒,2 秒轮询一次)。若需调整频率,请设置环境变量 `CONSUMER_POLL_MS` 为毫秒数。例如将间隔改为 5 秒: + + ```powershell + $env:ENABLE_CONSUMER="true" + $env:CONSUMER_POLL_MS="5000" + node push-server.js + ``` - 关键环境变量:`SUPA_URL`、`SUPA_KEY`(Supabase REST)、`CLOUD_FUNC_URL`(云函数 invoke URL)、`PUSH_TOKEN`(云函数鉴权)。 - 行为摘要: - 轮询 `express_notifications`(status_code IS NULL)并选取记录; - 通过带过滤的 PATCH 抢占(将 `status_code` 设为 `processing`)以避免并发重复处理; - - 查询目标设备(`push_devices`),对每个 `cid` 构造 event 并 POST 到 `CLOUD_FUNC_URL`(若未配置则回退到 `UNI_PUSH_URL` 适配器); + - 查询目标设备(`push_devices`),对每个 `cid` 构造 event 并 POST 到 `CLOUD_FUNC_URL`;若未配置 `CLOUD_FUNC_URL` 则本次处理将失败并写入错误原因; - 根据调用结果回写 `express_notifications.status_code` 为 `success` / `failed` / `no-targets`。 - 限制与扩展点:当前 consumer 依赖 Supabase REST;尚未在 DB 中新增 `retry_count`/`last_error` 字段(建议在迁移中加入以支持指数退避与重试);为保证高可用建议配合 `FOR UPDATE SKIP LOCKED` 或 Supabase Realtime 优化并发策略。 diff --git a/server/README.md b/server/README.md index 6d650a63..c60d8a26 100644 --- a/server/README.md +++ b/server/README.md @@ -1,5 +1,101 @@ 如需我在本地使用你的 Supabase 凭证演示一次完整的注册→查询→发送流程,或将持久化切换为仅 Supabase(移除本地 JSON 回退),请直接告诉我你的选择并提供测试凭证或确认权限范围。 +## Cloud 函数 & 消费者(自动调用) + +- 说明:`server/push-server.js` 并非由 Supabase 在插入时直接触发云函数;当启用消费者(轮询)时,服务会定期读取 Supabase 表 `express_notifications` 的待处理记录,然后对配置的 `CLOUD_FUNC_URL` 发起 POST 请求。 +- 关键代码位置(仓库): + - 轮询待推送记录: [server/push-server.js](server/push-server.js#L271-L286) (`fetchPendingNotifications`) + - 调用云函数的实现: [server/push-server.js](server/push-server.js#L313-L323) (`invokeCloudFuncForCid`) + - 在处理记录時对每个 `cid` 调用云函数: [server/push-server.js](server/push-server.js#L360-L370) + - 启动消费者(定时轮询): [server/push-server.js](server/push-server.js#L417-L418) (`setInterval(consumerOnce, CONSUMER_POLL_MS)`) + - 写入通知的 HTTP 接口(会插入 `express_notifications`): [server/push-server.js](server/push-server.js#L594-L610) (`POST /api/v1/notifications`) + +- 所需/可选环境变量: + - 必要:`SUPA_URL`、`SUPA_KEY`(或 `SERVICE_ROLE_KEY`,用于读取/写入 Supabase) + - 启用消费者:`ENABLE_CONSUMER=true` 或 `CONSUMER_ENABLED=true` + - 云函数地址:`CLOUD_FUNC_URL`(每个目标 `cid` 会对该 URL 发 POST) + - 可选鉴权透传:`PUSH_TOKEN`(会在 POST body 的 `token` 字段中传递) + - 轮询与重试配置:`CONSUMER_POLL_MS`、`MAX_RETRIES`、`RETRY_INITIAL_MS`、`RETRY_FACTOR`、`RETRY_MAX_MS` + +- POST 请求体(发送到 `CLOUD_FUNC_URL`): + +```json +{ + "token": "(来自 env:PUSH_TOKEN 或 null)", + "push_clientid": "目标 cid", + "title": "通知标题", + "content": "通知内容", + "payload": { } +} +``` + +- 成功判定:云函数应返回 HTTP 2xx(服务会把非 2xx 或网络错误视为失败并按重试策略重试或标记失败)。 + +- 快速启用示例(PowerShell): + +```powershell +$env:SUPA_URL="https://your-supabase.example" +$env:SUPA_KEY="your-service-role-key" +$env:CLOUD_FUNC_URL="https://your-cloudfunc.example/handle" +$env:ENABLE_CONSUMER="true" +node server/push-server.js +``` + +- 本地测试:直接调用云函数验证可达性: + +```bash +curl -X POST https://your-cloudfunc.example/handle -H "Content-Type: application/json" \ + -d '{"token":"test","push_clientid":"CID123","title":"测试","content":"hello","payload":{}}' +``` + +- 通过完整链路测试(写入通知 -> 消费者轮询 -> 云函数 POST): + +> 注意:`POST /api/v1/notifications` 在“仅云函数模式”下只负责把通知写入 `express_notifications`(排队),不会在该请求内立即下发;实际下发由消费者轮询后对 `CLOUD_FUNC_URL` 执行 POST。 + +> 前置条件:目标用户/商户必须在 `push_devices` 表(或本地 `server/data/push_devices.json`)中存在至少一个 `is_active=true` 的设备,否则该条通知会被标记为 `no-targets`。 + +```bash +curl -X POST http://localhost:7301/api/v1/notifications \ + -H "Content-Type: application/json" \ + -d '{"aud":"user","recipient_id":123,"notification":{"title":"测试","body":"hello"},"payload":{}}' +``` + +- 端到端验证(推荐步骤,PowerShell): + +```powershell +# 1) 注册一个设备(cid + user_id),写入 push_devices +Invoke-RestMethod -Uri 'http://localhost:7301/api/v1/push/register' -Method POST -ContentType 'application/json' -Body (@{ + cid='CID_TEST_001'; + user_id='a8e3a568-fc1f-4237-bcc5-5722e2fca0a3'; + platform='android' +} | ConvertTo-Json) + +# 2) 写入一条通知到 express_notifications(排队) +$body = @{ + aud='user' + recipient_id='a8e3a568-fc1f-4237-bcc5-5722e2fca0a3' + notification=@{ title='测试'; body='hello' } + payload=@{ order_id='123' } +} | ConvertTo-Json -Depth 6 +Invoke-RestMethod -Uri 'http://localhost:7301/api/v1/notifications' -Method POST -ContentType 'application/json' -Body $body + +# 3) 等待 2 秒(CONSUMER_POLL_MS=2000),观察 push-server 控制台:应出现对 CLOUD_FUNC_URL 的 POST +``` + +- 如何确认是否已处理:在 Supabase 查询最近记录的 `status_code` / `last_error`: + +```sql +select id, message_id, status_code, retry_count, last_error, updated_at +from public.express_notifications +order by created_at desc +limit 10; +``` + +- 常见现象解释: + - push-server 日志里 `supaFetch response preview: []`:表示当前没有 pending/retrying 且到期的记录可处理(队列为空)。 + +- 注意:如果你需要“插入时立即触发云函数”的实时行为,可考虑将轮询改为 Supabase Realtime 订阅或使用 Supabase 的 Edge Function / webhook 触发器;我可以协助把轮询替换为实时订阅的示例实现。 + --- ## 故障排查记录(已执行) @@ -41,7 +137,7 @@ Invoke-RestMethod -Uri 'http://localhost:7301/api/v1/push/devices?user_id= 调用云函数”的链路,可把已知的 device(cid + user_id)写入本地 `server/data/push_devices.json`,并确保已配置 `CLOUD_FUNC_URL`,然后调用 `/api/v1/push/send`(会直接 POST 到云函数): ```powershell # 查看本地 devices 文件 @@ -52,7 +148,7 @@ $devs = Get-Content .\server\data\push_devices.json -Raw | ConvertFrom-Json $devs += @{ cid='d9aa69ec415...'; user_id='a8e3a568-fc1f-4237-bcc5-5722e2fca0a3'; platform='android'; created_at=(Get-Date).ToString('o'); updated_at=(Get-Date).ToString('o'); active=$true } $devs | ConvertTo-Json -Depth 5 | Set-Content .\server\data\push_devices.json -Encoding utf8 -# 然后测试发送(mock) +# 然后测试发送(会直接调用 CLOUD_FUNC_URL) Invoke-RestMethod -Uri 'http://localhost:7301/api/v1/push/send' -Method POST -ContentType 'application/json' -Body (@{ user_id='a8e3a568-fc1f-4237-bcc5-5722e2fca0a3'; notification=@{ title='测试'; body='hello' } } | ConvertTo-Json) ``` @@ -70,9 +166,15 @@ Invoke-RestMethod -Uri 'http://localhost:7301/api/v1/push/send' -Method POST -Co - 注册/更新设备:`POST /api/v1/push/register` { cid, user_id, platform } - 注销设备:`POST /api/v1/push/unregister` { cid | user_id } - 列出设备:`GET /api/v1/push/devices?user_id=...&active=true|false` -- 发送推送(模拟):`POST /api/v1/push/send` { cids:[], user_id, notification, payload } +- 发送推送(云函数模式):`POST /api/v1/push/send` { cids:[], user_id, notification, payload }(直接 POST 到 `CLOUD_FUNC_URL`) -如果你有真实的推送服务端 API,可以设置环境变量 `PUSH_PROXY_URL`(和可选的 `PUSH_PROXY_TOKEN`),服务器会将 `/api/v1/push/send` 请求代理到该 URL。 + 快速使用: @@ -135,7 +237,7 @@ CREATE UNIQUE INDEX IF NOT EXISTS ux_push_devices_appid_cid ON public.push_devic ```bash curl -X POST "${SUPA_URL}/rest/v1/push_devices?on_conflict=cid" \ -H "apikey: ${SUPA_KEY}" \ - -H "Authorization: Bearer ${SUPA_KEY}" \ + # 如需发送 Authorization: Bearer,请设置 SUPA_USE_BEARER=true(默认只发送 apikey) -H "Content-Type: application/json" \ -d '[{"cid":"CID_TEST_001","user_id":"","platform":"android","appid":"default","is_active":true}]' ``` @@ -158,7 +260,7 @@ curl -X POST "${SUPA_URL}/rest/v1/push_devices?on_conflict=cid" \ - `POST /api/v1/push/register`:注册或更新设备,body: `{ cid, user_id, platform, ... }` - `POST /api/v1/push/unregister`:注销设备,body: `{ cid | user_id }` - `GET /api/v1/push/devices`:列出设备,query: `user_id`, `active` -- `POST /api/v1/push/send`:发送推送(开发环境为 mock;可代理到 `PUSH_PROXY_URL`) +- `POST /api/v1/push/send`:发送推送(仅云函数:直接调用 `CLOUD_FUNC_URL`;未配置则返回错误) **数据与持久化** - 开发默认将设备保存在本地文件:`server/data/push_devices.json`(回退/离线使用)。 @@ -166,8 +268,9 @@ curl -X POST "${SUPA_URL}/rest/v1/push_devices?on_conflict=cid" \ **重要环境变量** - `PORT`:监听端口(默认 7301) -- `PUSH_PROXY_URL` / `PUSH_PROXY_TOKEN`:将 `/api/v1/push/send` 代理到真实推送服务 - `SUPA_URL` / `SUPA_KEY` / `SUPA_SCHEMA`:启用 Supabase 同步(`SUPA_KEY` 应为服务端 `service_role`,仅服务器端使用) +- `SUPA_USE_BEARER`:(可选)仅当为 `true` 时才发送 `Authorization: Bearer `;默认只发送 `apikey`。 +- `CLOUD_FUNC_URL` / `PUSH_TOKEN`:云函数调用地址 / (可选)鉴权 token **安全与部署注意** - 切勿将 `SUPA_KEY`(service_role)暴露给客户端;只能在后端使用。 diff --git a/server/config.json b/server/config.json new file mode 100644 index 00000000..4cadb1ad --- /dev/null +++ b/server/config.json @@ -0,0 +1,13 @@ +{ + "SUPA_URL": "http://192.168.1.62:18000", + "SUPA_KEY": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJyb2xlIjoic2VydmljZV9yb2xlIiwiaXNzIjoic3VwYWJhc2UtMSIsImlhdCI6MTc2OTY3NjQ5OCwiZXhwIjoxOTI3MzU2NDk4fQ.ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890", + "CLOUD_FUNC_URL": "https://env-00jy5x5oy9zd.dev-hz.cloudbasefunction.cn/test", + "ENABLE_CONSUMER": "true", + "CONSUMER_POLL_MS": "2000", + "PUSH_TOKEN": "", + "MAX_RETRIES": "5", + "RETRY_INITIAL_MS": "5000", + "RETRY_FACTOR": "2", + "RETRY_MAX_MS": "3600000", + "PORT": "7301" +} diff --git a/server/config.json.example b/server/config.json.example new file mode 100644 index 00000000..62b450db --- /dev/null +++ b/server/config.json.example @@ -0,0 +1,13 @@ +{ + "SUPA_URL": "https://your-supabase.example", + "SUPA_KEY": "your-service-role-key", + "CLOUD_FUNC_URL": "https://your-cloudfunc.example/handle", + "ENABLE_CONSUMER": "true", + "CONSUMER_POLL_MS": "2000", + "PUSH_TOKEN": "optional-push-token", + "MAX_RETRIES": "5", + "RETRY_INITIAL_MS": "5000", + "RETRY_FACTOR": "2", + "RETRY_MAX_MS": "3600000", + "PORT": "7301" +} diff --git a/server/load-config.js b/server/load-config.js new file mode 100644 index 00000000..a3aa75b3 --- /dev/null +++ b/server/load-config.js @@ -0,0 +1,68 @@ +const fs = require('fs') +const path = require('path') + +function loadJsonConfig(filePath) { + try { + const txt = fs.readFileSync(filePath, 'utf8') + const obj = JSON.parse(txt) + Object.keys(obj).forEach(k => { + if (process.env[k] === undefined) process.env[k] = String(obj[k]) + }) + console.log('Loaded config from', filePath) + return true + } catch (e) { + return false + } +} + +function loadDotEnv(filePath) { + try { + const txt = fs.readFileSync(filePath, 'utf8') + const lines = txt.split(/\r?\n/) + for (const line of lines) { + const l = line.trim() + if (!l || l.startsWith('#')) continue + const idx = l.indexOf('=') + if (idx === -1) continue + const key = l.slice(0, idx).trim() + let val = l.slice(idx + 1).trim() + if ((val.startsWith('"') && val.endsWith('"')) || (val.startsWith("'") && val.endsWith("'"))) { + val = val.slice(1, -1) + } + if (process.env[key] === undefined) process.env[key] = val + } + console.log('Loaded .env from', filePath) + return true + } catch (e) { + return false + } +} + +// Precedence (higher wins because we only fill undefined keys): +// 1) real environment variables +// 2) CONFIG_FILE / CONFIG_PATH (explicit) +// 3) .env (local secrets) +// 4) config.json (repo/local config) +// 5) config.json.example (defaults) +const explicitPath = process.env.CONFIG_FILE || process.env.CONFIG_PATH +const candidates = [ + explicitPath ? path.resolve(explicitPath) : null, + path.join(__dirname, '.env'), + path.join(__dirname, 'config.json'), + path.join(__dirname, 'config.json.example') +].filter(Boolean) + +let loadedAny = false +for (const p of candidates) { + if (!fs.existsSync(p)) continue + const ok = p.endsWith('.env') ? loadDotEnv(p) + : p.endsWith('.json') ? loadJsonConfig(p) + : false + if (ok) loadedAny = true +} + +if (!loadedAny) { + // no config found; silent +} + +module.exports = {} diff --git a/server/push-server.js b/server/push-server.js index 31a82e02..3678df09 100644 --- a/server/push-server.js +++ b/server/push-server.js @@ -1,10 +1,15 @@ +try { + require('./load-config') +} catch (e) { + // no-op if no config file +} + const express = require('express') const bodyParser = require('body-parser') const cors = require('cors') const fs = require('fs').promises const path = require('path') const fetch = require('node-fetch') -const crypto = require('crypto') const { spawn } = require('child_process') const PORT = process.env.PORT || 7301 @@ -55,13 +60,15 @@ async function writeDevices(devices) { async function supaFetch(path, opts = {}) { const url = `${SUPA_URL.replace(/\/$/, '')}/rest/v1/${path}` - // 默认只发送 apikey。只有在显式要求 Bearer(环境变量 SUPA_USE_BEARER=true) - // 或者看到 SUPA_KEY 看起来像 JWT(包含两处 ".")时,才添加 Authorization 头。 + // 默认只发送 apikey。 + // 只有在显式要求 Bearer(环境变量 SUPA_USE_BEARER=true)时,才添加 Authorization 头。 + // 注意:在一些自托管 Supabase/Kong 场景中,apikey 可用于 Kong 的 key-auth, + // 但 PostgREST 可能无法解码我们手动附加的 Bearer JWT(JWT_SECRET 不一致会触发 PGRST301)。 const headers = Object.assign({}, opts.headers || {}, { apikey: SUPA_KEY, Accept: 'application/json' }) - const sendBearer = (process.env.SUPA_USE_BEARER === 'true') || (SUPA_KEY && (SUPA_KEY.split('.').length === 3)) + const sendBearer = (process.env.SUPA_USE_BEARER === 'true') if (sendBearer) headers.Authorization = `Bearer ${SUPA_KEY}` try { console.log('supaFetch ->', url) @@ -87,146 +94,10 @@ async function supaFetch(path, opts = {}) { } } -// 适配并发送到 dCloud uni-push(若设置了 UNI_PUSH_URL) -async function sendToUniPush(targets, notification, payload) { - const uniUrlRaw = process.env.UNI_PUSH_URL || '' - if (!uniUrlRaw) throw new Error('UNI_PUSH_URL not configured') - const appid = process.env.UNI_PUSH_APPID || '5fSIfMap289ymbdnkfMJ29' - const appkey = process.env.UNI_PUSH_APPKEY || '33z3cfHI6Z9TpzyQXRWu01' - const secret = process.env.UNI_PUSH_SECRET || process.env.PUSH_PROXY_TOKEN || 'PkVpjStqwW9BourzryQHc7' - const authUrl = process.env.UNI_PUSH_AUTH_URL || 'https://restapi.getui.com/v2/5fSIfMap289ymbdnkfMJ29/auth' - - // 全局缓存 token - if (!global.__uniPushTokenCache) global.__uniPushTokenCache = {} - - async function obtainToken() { - if (authUrl) { - const cache = global.__uniPushTokenCache[authUrl] - if (cache && cache.expiresAt && Date.now() < cache.expiresAt - 5000) return cache.token - try { - // 为了解决 provider 对 timestamp/sign 规则不同的情况,先尝试当前 env 配置; - // 若返回 timestamp/sign 错误则自动按常见组合重试(秒/毫秒、md5/hmac、大小写、是否包含 secret) - const tried = new Set() - - const timestampUnits = ['ms', 's'] - const signAlgos = ['md5', 'hmac-sha256'] - const signCases = ['lower', 'upper'] - const includeSecretOptions = [true, false] - - // 首选使用环境变量指定的选项(若有) - const envUnit = process.env.UNI_PUSH_TIMESTAMP_UNIT || 'ms' - const envAlgo = (process.env.UNI_PUSH_SIGN_ALGO || 'md5').toLowerCase() - const envCase = (process.env.UNI_PUSH_SIGN_CASE || 'lower').toLowerCase() - const envInclude = (process.env.UNI_PUSH_INCLUDE_SECRET === 'true') - - const candidates = [] - // push env preferred first - candidates.push({ unit: envUnit, algo: envAlgo, signCase: envCase, includeSecret: envInclude }) - for (const u of timestampUnits) for (const a of signAlgos) for (const c of signCases) for (const inc of includeSecretOptions) { - const key = `${u}|${a}|${c}|${inc}` - if (candidates.find(x => `${x.unit}|${x.algo}|${x.signCase}|${x.includeSecret}` === key)) continue - candidates.push({ unit: u, algo: a, signCase: c, includeSecret: inc }) - } - - for (const cand of candidates) { - const { unit, algo, signCase, includeSecret } = cand - const stamp = (unit === 's') ? String(Math.floor(Date.now() / 1000)) : String(Date.now()) - const signAlgo = (algo || 'md5').toLowerCase() - let sign = null - try { - if (secret) { - if (signAlgo === 'md5') sign = crypto.createHash('md5').update(String(appkey || '') + stamp + String(secret)).digest('hex') - else if (signAlgo === 'hmac-sha256' || signAlgo === 'hmac256') sign = crypto.createHmac('sha256', String(secret)).update(String(appkey || '') + stamp).digest('hex') - else sign = crypto.createHash('md5').update(String(appkey || '') + stamp + String(secret)).digest('hex') - } - if (sign && (signCase === 'upper')) sign = sign.toUpperCase() - else if (sign) sign = sign.toLowerCase() - } catch (e) { - console.warn('生成 sign 失败,继续无 sign 请求', e) - } - - const b = {} - if (appid) { b.appId = appid; b.appid = appid } - if (appkey) { b.appKey = appkey; b.appkey = appkey } - b.timestamp = stamp - if (sign) b.sign = sign - if (includeSecret && secret) { b.appSecret = secret; b.appsecret = secret } - - const tryKey = JSON.stringify(b) - if (tried.has(tryKey)) continue - tried.add(tryKey) - - // Mask debug copy - try { - const dbg = Object.assign({}, b) - if (dbg.appSecret) dbg.appSecret = '***masked***' - console.log('uni-push auth attempt:', { unit, algo: signAlgo, signCase, includeSecret }) - console.log('uni-push auth body:', JSON.stringify(dbg)) - console.log('uni-push auth url:', authUrl) - } catch (e) { /* ignore */ } - - const resp = await fetch(authUrl, { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify(b) }) - let txtDbg = '' - try { txtDbg = await resp.clone().text().catch(() => '') } catch (e) { /* ignore */ } - if (txtDbg && txtDbg.length < 2000) console.log('uni-push auth response preview:', txtDbg) - else console.log('uni-push auth response status:', resp.status) - - let j = null - try { j = await resp.json().catch(() => null) } catch (e) { j = null } - const token = (j && (j.token || (j.data && j.data.token) || j.access_token)) || null - const expires = (j && (j.expires_in || (j.data && j.data.expires_in))) || 0 - if (token) { - global.__uniPushTokenCache[authUrl] = { token, expiresAt: expires ? (Date.now() + (expires * 1000)) : (Date.now() + 55 * 60 * 1000) } - console.log('uni-push auth succeeded with variant:', { unit, algo: signAlgo, signCase, includeSecret }) - return token - } - - // 若 provider 明确提示 timestamp/sign 错误,则继续尝试其他组合,否则可能无需重试 - const code = j && (j.code || (j.error && j.error.code)) - const msg = j && (j.msg || j.message || (j.error && j.error.message)) - if (code && !String(code).match(/20001|10001/)) { - // 非 timestamp/sign 类型错误,不再重试 - console.log('uni-push auth returned non-sign/timestamp error, stopping retries:', code, msg) - break - } - console.log('uni-push auth did not return token, response code/msg:', code, msg) - // 否则继续下一种变体 - } - } catch (e) { - console.warn('uni-push auth failed', e) - } - return null - } - - if (secret) return secret - return null - } - - const token = await obtainToken() - - // 支持 {appId} 占位符替换 - let uniUrl = uniUrlRaw - if (appid && uniUrl.includes('{appId}')) uniUrl = uniUrl.replace(/\{appId\}/g, appid) - - console.log('uni-push send: uniUrl=', uniUrl, 'token?', !!token, 'targetsLen=', targets.length) - - const body = { - appid: appid || undefined, - cidList: targets, - message: { - title: notification && notification.title, - content: notification && notification.body, - payload: payload || {} - } - } - - const headers = { 'Content-Type': 'application/json' } - if (token) headers.Authorization = `Bearer ${token}` - - const resp = await fetch(uniUrl, { method: 'POST', headers, body: JSON.stringify(body) }) - const text = await resp.text().catch(() => '') - try { if (text && text.length < 2000) console.log('uni-push send response preview:', text); else console.log('uni-push send response status:', resp.status) } catch (e) { /* ignore */ } - return { status: resp.status, body: text } +// Cloud-function-only: +// UNI-PUSH adapter has been disabled. Push delivery should be implemented inside the cloud function. +async function sendToUniPush() { + throw new Error('UNI-PUSH adapter disabled (cloud-function-only mode)') } async function getDevicesFromSupabase({ user_id, active } = {}) { @@ -272,10 +143,11 @@ async function start() { // 只在配置了 Supabase REST 时工作 if (!SUPA_URL || !SUPA_KEY) return [] try { - // 查询 status_code IS NULL (pending) 或 next_attempt_at <= now 的记录 + // 查询: + // - pending: status_code IS NULL + // - retrying: status_code = 'retrying' 且 (next_attempt_at IS NULL 或 next_attempt_at <= now) const now = new Date().toISOString() - // 使用 OR 过滤:status_code.is.null 或 next_attempt_at <= now - const q = `express_notifications?or=(status_code.is.null,next_attempt_at=lte.${encodeURIComponent(now)})&order=created_at.asc&limit=${limit}` + const q = `express_notifications?or=(status_code.is.null,and(status_code.eq.retrying,or(next_attempt_at.is.null,next_attempt_at.lte.${encodeURIComponent(now)})))&order=created_at.asc&limit=${limit}` const resp = await supaFetch(q, { method: 'GET' }) if (!resp.ok) return [] const data = await resp.json().catch(() => []) @@ -290,11 +162,13 @@ async function start() { try { const body = { status_code: 'processing', updated_at: new Date().toISOString() } // 仅当当前仍为 pending 或 retrying 且到期时,才抢占 - const path = `express_notifications?id=eq.${encodeURIComponent(id)}&or=(status_code.is.null,next_attempt_at=lte.${encodeURIComponent(new Date().toISOString())})` + const now = new Date().toISOString() + const path = `express_notifications?id=eq.${encodeURIComponent(id)}&or=(status_code.is.null,and(status_code.eq.retrying,or(next_attempt_at.is.null,next_attempt_at.lte.${encodeURIComponent(now)})))` const resp = await supaFetch(path, { method: 'PATCH', headers: { 'Content-Type': 'application/json', Prefer: 'return=representation' }, body: JSON.stringify(body) }) if (!resp.ok) return null const j = await resp.json().catch(() => null) - if (Array.isArray(j) && j.length > 0) return j[0] + if (Array.isArray(j)) return j.length > 0 ? j[0] : null + if (!j || !j.id) return null return j } catch (e) { console.warn('claimNotification error', e) @@ -304,6 +178,7 @@ async function start() { async function updateNotificationStatus(id, status, note) { try { + if (!id) return const body = { status_code: String(status), updated_at: new Date().toISOString() } if (note) body.event_text_safe = String(note).substring(0, 2000) await supaFetch(`express_notifications?id=eq.${encodeURIComponent(id)}`, { method: 'PATCH', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify(body) }) @@ -358,7 +233,7 @@ async function start() { return } - // If CLOUD_FUNC_URL is configured, POST per cid to cloud function; otherwise fallback to sendToUniPush if configured + // Cloud-function-only: always POST per cid to CLOUD_FUNC_URL let allOk = true let lastNote = '' if (CLOUD_FUNC_URL) { @@ -366,14 +241,9 @@ async function start() { for (const c of calls) { if (!c.ok) { allOk = false; lastNote = (c.error || JSON.stringify(c.body)).toString().substring(0, 1000); break } } - } else if (process.env.UNI_PUSH_URL) { - try { - const r = await sendToUniPush(targets, notification, payload) - if (!r || (r.status && r.status >= 400)) { allOk = false; lastNote = JSON.stringify(r && r.body).substring(0, 1000) } - } catch (e) { allOk = false; lastNote = String(e) } } else { allOk = false - lastNote = 'no CLOUD_FUNC_URL and no UNI_PUSH_URL configured' + lastNote = 'no CLOUD_FUNC_URL configured' } if (allOk) { @@ -544,50 +414,11 @@ async function start() { } } - // 推送优先策略: - // 1) 若配置了 UNI_PUSH_URL,优先使用本地适配器 sendToUniPush - // 2) 否则若配置了 PUSH_PROXY_URL,则转发到 proxy - // 3) 否则进入 mock - const proxyUrl = process.env.PUSH_PROXY_URL || '' - const proxyToken = process.env.PUSH_PROXY_TOKEN || '' - console.log('send handler decision: UNI_PUSH_URL=', process.env.UNI_PUSH_URL, 'PUSH_PROXY_URL=', proxyUrl) - console.log('send handler targets count:', targets.length) - if (process.env.UNI_PUSH_URL) { - try { - const result = await sendToUniPush(targets, notification, payload) - return res.json({ ok: true, proxied: true, status: result.status, response: result.body }) - } catch (e) { - console.warn('本地 uni-push 适配器推送失败:', e) - return res.status(500).json({ ok: false, error: String(e) }) - } - } else if (proxyUrl) { - try { - const resp = await fetch(proxyUrl, { - method: 'POST', - headers: Object.assign({ 'Content-Type': 'application/json' }, proxyToken ? { Authorization: `Bearer ${proxyToken}` } : {}), - body: JSON.stringify({ targets, notification, payload }) - }) - const data = await resp.text() - return res.json({ ok: true, proxied: true, status: resp.status, response: data }) - } catch (e) { - console.warn('代理推送失败:', e) - return res.status(500).json({ ok: false, error: String(e) }) - } - } - - // 否则仅记录并返回模拟结果 - console.log('Mock push to', targets.length, 'clients (no UNI_PUSH_URL and no PUSH_PROXY_URL matched)') - try { - console.log('notification:', notification ? JSON.stringify(notification, null, 2) : null) - } catch (e) { - console.log('notification:', notification) - } - try { - console.log('payload:', payload ? JSON.stringify(payload, null, 2) : '{}') - } catch (e) { - console.log('payload:', payload) - } - return res.json({ ok: true, mocked: true, sent: targets.length, payload: payload || {} }) + if (!CLOUD_FUNC_URL) return res.status(500).json({ ok: false, error: 'CLOUD_FUNC_URL not configured' }) + console.log('send handler (cloud function only) targets count:', targets.length) + const calls = await Promise.all((targets || []).map(cid => invokeCloudFuncForCid(CLOUD_FUNC_URL, PUSH_TOKEN, cid, notification && notification.title, notification && (notification.body || notification.content), payload))) + const allOk = calls.every(c => c && c.ok) + return res.status(allOk ? 200 : 502).json({ ok: allOk, sent: targets.length, results: calls }) }) // 创建通知并基于数据库记录推送(写入 express_notifications 并发送) @@ -635,53 +466,10 @@ async function start() { console.warn('fetch devices for notification failed:', e) } - // 进行推送(与 /push/send 相同逻辑:proxy 或 mock) - const proxyUrl = process.env.PUSH_PROXY_URL || '' - const proxyToken = process.env.PUSH_PROXY_TOKEN || '' - if (proxyUrl) { - try { - // 若配置了 UNI_PUSH_URL,优先使用本地 adapter;否则直接转发到 proxyUrl - if (process.env.UNI_PUSH_URL) { - try { - const result = await sendToUniPush(targets, notification, payload) - // 更新通知记录状态 - if (record && record.id) { - try { await supaFetch(`express_notifications?id=eq.${encodeURIComponent(record.id)}`, { method: 'PATCH', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ status_code: String(result.status), updated_at: new Date().toISOString() }) }) } catch (e) { /* ignore */ } - } - return res.json({ ok: true, proxied: true, status: result.status, response: result.body }) - } catch (e) { - console.warn('本地 uni-push 适配器推送失败:', e) - return res.status(500).json({ ok: false, error: String(e) }) - } - } - - const resp = await fetch(proxyUrl, { - method: 'POST', - headers: Object.assign({ 'Content-Type': 'application/json' }, proxyToken ? { Authorization: `Bearer ${proxyToken}` } : {}), - body: JSON.stringify({ targets, notification, payload }) - }) - const data = await resp.text() - // 更新通知记录状态 - if (record && record.id) { - try { await supaFetch(`express_notifications?id=eq.${encodeURIComponent(record.id)}`, { method: 'PATCH', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ status_code: String(resp.status), updated_at: new Date().toISOString() }) }) } catch (e) { /* ignore */ } - } - return res.json({ ok: true, proxied: true, status: resp.status, response: data }) - } catch (e) { - console.warn('代理推送失败:', e) - return res.status(500).json({ ok: false, error: String(e) }) - } - } - - // mock 模式 - console.log('Notification mock push to', targets.length, 'clients for', aud, recipient_id) - console.log('notification:', notification) - console.log('payload:', payload) - - if (record && record.id) { - try { await supaFetch(`express_notifications?id=eq.${encodeURIComponent(record.id)}`, { method: 'PATCH', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ status_code: 'mocked', updated_at: new Date().toISOString() }) }) } catch (e) { /* ignore */ } - } - - return res.json({ ok: true, mocked: true, sent: targets.length, notification_id: record && record.id }) + // Cloud-function-only: + // - This endpoint writes to express_notifications. + // - Actual push delivery is performed by the consumer that polls express_notifications. + return res.json({ ok: true, queued: true, notification_id: record && record.id, targets: targets.length }) } catch (e) { console.warn('create notification failed:', e) return res.status(500).json({ ok: false, error: String(e) }) @@ -690,10 +478,8 @@ async function start() { app.listen(PORT, '0.0.0.0', () => { console.log(`Push server listening on http://0.0.0.0:${PORT}`) - console.log('ENV: UNI_PUSH_URL=', process.env.UNI_PUSH_URL) - console.log('ENV: UNI_PUSH_AUTH_URL=', process.env.UNI_PUSH_AUTH_URL) - console.log('ENV: UNI_PUSH_APPID=', process.env.UNI_PUSH_APPID) - console.log('ENV: PUSH_PROXY_URL=', process.env.PUSH_PROXY_URL) + console.log('ENV: CLOUD_FUNC_URL configured?', !!process.env.CLOUD_FUNC_URL) + console.log('ENV: ENABLE_CONSUMER=', process.env.ENABLE_CONSUMER) // // 可选:在启动时自动运行部署脚本以打包并上传云函数 // try { // const AUTO_DEPLOY = (process.env.AUTO_DEPLOY_ON_START === 'true' || process.env.AUTO_DEPLOY_ON_START === '1')