125 lines
5.4 KiB
Markdown
125 lines
5.4 KiB
Markdown
**推送工作流说明(Push Queue + 后端消费者 + 云函数 invoke)**
|
||
|
||
概述
|
||
- 目的是将数据库中产生的待推送消息可靠地传递到云函数进行实际推送(云函数使用 `uniPush`)。
|
||
- 思路:业务写库 → 写入 `push_queue`(持久化队列)→ `push-server` 或消费者监听队列 → 构造 `event` 并调用云函数 invoke URL → 云函数从 `event` 中读取参数并发送推送。
|
||
|
||
优点
|
||
- 可靠:持久化队列可重试、审计与人工干预。
|
||
- 解耦:业务写库与实际推送解耦,降低失败传播风险。
|
||
- 可扩展:支持多 worker 并发消费与并发控制。
|
||
|
||
组件与职责
|
||
- `push_queue`(数据库表):保存待推送任务与状态;保证持久化与可重试。
|
||
- `push-server`:消费者或 HTTP 接口,负责从队列读取任务、锁定、调用云函数,并更新任务状态。
|
||
- 云函数(已上传并发布一次):例如 `uniCloud` 云函数 `testUnipush2`,接收 `event` 并调用 `uniPush.sendMessage` 完成推送。
|
||
|
||
Schema 示例(Postgres / Supabase)
|
||
```sql
|
||
CREATE TABLE push_queue (
|
||
id serial PRIMARY KEY,
|
||
push_clientid varchar NOT NULL,
|
||
title text,
|
||
content text,
|
||
payload jsonb DEFAULT '{}'::jsonb,
|
||
status varchar(16) DEFAULT 'pending', -- pending, processing, success, failed
|
||
retry_count int DEFAULT 0,
|
||
last_error text,
|
||
created_at timestamptz DEFAULT now(),
|
||
updated_at timestamptz DEFAULT now()
|
||
);
|
||
CREATE INDEX ON push_queue (status);
|
||
```
|
||
|
||
写入队列(业务侧)
|
||
- 在业务事务中,除了写业务表外,同时插入一条 `push_queue` 记录(保证一致性)。
|
||
- 如果业务不想同步写队列,也可由异步任务或触发器生成。
|
||
|
||
消费者(`push-server`)行为说明
|
||
- 1) 轮询或订阅:通过 Supabase Realtime、Postgres LISTEN/NOTIFY、或定时轮询读取 `status='pending'` 的记录。
|
||
- 2) 锁定任务:在读取后将该记录设为 `processing`(或用数据库事务/乐观锁),防止重复消费。
|
||
- 3) 构造 event:
|
||
{
|
||
token: <PUSH_TOKEN>,
|
||
push_clientid: <push_clientid>,
|
||
title: <title>,
|
||
content: <content>,
|
||
payload: <payload>
|
||
}
|
||
- 4) 调用云函数:POST JSON 到云函数 invoke URL(或使用云 SDK)。
|
||
- 5) 根据响应更新:
|
||
- 成功:`status='success'`,记录返回结果。
|
||
- 失败:增加 `retry_count`、写 `last_error`、按重试策略重试或标记 `failed`。
|
||
|
||
示例消费者伪代码(Node.js)
|
||
```javascript
|
||
const fetch = require('node-fetch');
|
||
const { Pool } = require('pg');
|
||
const pool = new Pool({ connectionString: process.env.DATABASE_URL });
|
||
|
||
async function consumeOnce() {
|
||
const client = await pool.connect();
|
||
try {
|
||
await client.query('BEGIN');
|
||
const res = await client.query("SELECT * FROM push_queue WHERE status='pending' ORDER BY created_at FOR UPDATE SKIP LOCKED LIMIT 1");
|
||
if (res.rowCount === 0) { await client.query('COMMIT'); return; }
|
||
const rec = res.rows[0];
|
||
await client.query("UPDATE push_queue SET status='processing', updated_at=now() WHERE id=$1", [rec.id]);
|
||
await client.query('COMMIT');
|
||
|
||
// 调用云函数
|
||
const body = {
|
||
token: process.env.PUSH_TOKEN,
|
||
push_clientid: rec.push_clientid,
|
||
title: rec.title,
|
||
content: rec.content,
|
||
payload: rec.payload
|
||
};
|
||
const r = await fetch(process.env.CLOUD_FUNC_URL, { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify(body) });
|
||
const txt = await r.text();
|
||
if (r.ok) {
|
||
await pool.query("UPDATE push_queue SET status='success', updated_at=now() WHERE id=$1", [rec.id]);
|
||
} else {
|
||
await pool.query("UPDATE push_queue SET status='pending', retry_count = retry_count + 1, last_error=$2, updated_at=now() WHERE id=$1", [rec.id, txt]);
|
||
}
|
||
} catch (e) {
|
||
await client.query('ROLLBACK');
|
||
console.error('consume error', e);
|
||
} finally { client.release(); }
|
||
}
|
||
|
||
setInterval(consumeOnce, 1000); // 或使用更复杂的 worker 池
|
||
```
|
||
|
||
云函数调用约定
|
||
- 云函数在 `index.js` 中接收 `event`,你的函数示例:
|
||
- 检查 `event.token` 是否与环境变量 `PUSH_TOKEN` 匹配(鉴权);
|
||
- 必要字段 `push_clientid` 缺失返回 400;
|
||
- 成功后调用 `uniPush.sendMessage(...)` 并返回 `{ errCode:0, errMsg:'success' }`。
|
||
|
||
安全与鉴权
|
||
- 在后端调用云函数时带上 `token`,不要把明文 token 写入仓库,使用环境变量或 CI secrets管理。
|
||
- `push-server` 的对外 API(若开放)应加鉴权(Bearer token/IP白名单等)。
|
||
|
||
重试与幂等
|
||
- 采用 `retry_count` + 指数退避策略;超过阈值标记 `failed` 并告警。
|
||
- 消费前将任务标记为 `processing`(或使用 `FOR UPDATE SKIP LOCKED`),防止多个 worker 重复处理。
|
||
|
||
部署注意
|
||
- 云函数只需上传/发布一次;后续调用使用 invoke URL。
|
||
- 将 `CLOUD_FUNC_URL`, `PUSH_TOKEN`, `DATABASE_URL` 等设置为运行环境的环境变量。
|
||
|
||
监控与日志
|
||
- 记录每次调用响应、失败原因、重试次数;在失败率异常时发送报警。
|
||
- 可把 `push_queue` 的任务历史导出用于审计。
|
||
|
||
示例 SQL + 流程总结见上方片段。
|
||
|
||
下一步建议
|
||
- 我可以为你:
|
||
1) 在 `push-server` 中实现上面的 PostgreSQL consumer(并提交代码);或
|
||
2) 实现基于 Supabase Realtime 的监听示例并集成;或
|
||
3) 添加一个业务 webhook API,业务方直接 POST 任务到 `push-server`,由 `push-server` 写队列并触发消费。
|
||
|
||
文件位置:server/PUSH_WORKFLOW.md
|