消息推送后台打通

This commit is contained in:
not-like-juvenile
2026-03-09 10:39:29 +08:00
parent 427010f7db
commit 436b7b251f
11 changed files with 466 additions and 291 deletions

View File

@@ -1,10 +1,15 @@
try {
require('./load-config')
} catch (e) {
// no-op if no config file
}
const express = require('express')
const bodyParser = require('body-parser')
const cors = require('cors')
const fs = require('fs').promises
const path = require('path')
const fetch = require('node-fetch')
const crypto = require('crypto')
const { spawn } = require('child_process')
const PORT = process.env.PORT || 7301
@@ -55,13 +60,15 @@ async function writeDevices(devices) {
async function supaFetch(path, opts = {}) {
const url = `${SUPA_URL.replace(/\/$/, '')}/rest/v1/${path}`
// 默认只发送 apikey。只有在显式要求 Bearer环境变量 SUPA_USE_BEARER=true
// 或者看到 SUPA_KEY 看起来像 JWT包含两处 ".")时,才添加 Authorization 头。
// 默认只发送 apikey。
// 只有在显式要求 Bearer环境变量 SUPA_USE_BEARER=true)时,才添加 Authorization 头。
// 注意:在一些自托管 Supabase/Kong 场景中apikey 可用于 Kong 的 key-auth
// 但 PostgREST 可能无法解码我们手动附加的 Bearer JWTJWT_SECRET 不一致会触发 PGRST301
const headers = Object.assign({}, opts.headers || {}, {
apikey: SUPA_KEY,
Accept: 'application/json'
})
const sendBearer = (process.env.SUPA_USE_BEARER === 'true') || (SUPA_KEY && (SUPA_KEY.split('.').length === 3))
const sendBearer = (process.env.SUPA_USE_BEARER === 'true')
if (sendBearer) headers.Authorization = `Bearer ${SUPA_KEY}`
try {
console.log('supaFetch ->', url)
@@ -87,146 +94,10 @@ async function supaFetch(path, opts = {}) {
}
}
// 适配并发送到 dCloud uni-push若设置了 UNI_PUSH_URL
async function sendToUniPush(targets, notification, payload) {
const uniUrlRaw = process.env.UNI_PUSH_URL || ''
if (!uniUrlRaw) throw new Error('UNI_PUSH_URL not configured')
const appid = process.env.UNI_PUSH_APPID || '5fSIfMap289ymbdnkfMJ29'
const appkey = process.env.UNI_PUSH_APPKEY || '33z3cfHI6Z9TpzyQXRWu01'
const secret = process.env.UNI_PUSH_SECRET || process.env.PUSH_PROXY_TOKEN || 'PkVpjStqwW9BourzryQHc7'
const authUrl = process.env.UNI_PUSH_AUTH_URL || 'https://restapi.getui.com/v2/5fSIfMap289ymbdnkfMJ29/auth'
// 全局缓存 token
if (!global.__uniPushTokenCache) global.__uniPushTokenCache = {}
async function obtainToken() {
if (authUrl) {
const cache = global.__uniPushTokenCache[authUrl]
if (cache && cache.expiresAt && Date.now() < cache.expiresAt - 5000) return cache.token
try {
// 为了解决 provider 对 timestamp/sign 规则不同的情况,先尝试当前 env 配置;
// 若返回 timestamp/sign 错误则自动按常见组合重试(秒/毫秒、md5/hmac、大小写、是否包含 secret
const tried = new Set()
const timestampUnits = ['ms', 's']
const signAlgos = ['md5', 'hmac-sha256']
const signCases = ['lower', 'upper']
const includeSecretOptions = [true, false]
// 首选使用环境变量指定的选项(若有)
const envUnit = process.env.UNI_PUSH_TIMESTAMP_UNIT || 'ms'
const envAlgo = (process.env.UNI_PUSH_SIGN_ALGO || 'md5').toLowerCase()
const envCase = (process.env.UNI_PUSH_SIGN_CASE || 'lower').toLowerCase()
const envInclude = (process.env.UNI_PUSH_INCLUDE_SECRET === 'true')
const candidates = []
// push env preferred first
candidates.push({ unit: envUnit, algo: envAlgo, signCase: envCase, includeSecret: envInclude })
for (const u of timestampUnits) for (const a of signAlgos) for (const c of signCases) for (const inc of includeSecretOptions) {
const key = `${u}|${a}|${c}|${inc}`
if (candidates.find(x => `${x.unit}|${x.algo}|${x.signCase}|${x.includeSecret}` === key)) continue
candidates.push({ unit: u, algo: a, signCase: c, includeSecret: inc })
}
for (const cand of candidates) {
const { unit, algo, signCase, includeSecret } = cand
const stamp = (unit === 's') ? String(Math.floor(Date.now() / 1000)) : String(Date.now())
const signAlgo = (algo || 'md5').toLowerCase()
let sign = null
try {
if (secret) {
if (signAlgo === 'md5') sign = crypto.createHash('md5').update(String(appkey || '') + stamp + String(secret)).digest('hex')
else if (signAlgo === 'hmac-sha256' || signAlgo === 'hmac256') sign = crypto.createHmac('sha256', String(secret)).update(String(appkey || '') + stamp).digest('hex')
else sign = crypto.createHash('md5').update(String(appkey || '') + stamp + String(secret)).digest('hex')
}
if (sign && (signCase === 'upper')) sign = sign.toUpperCase()
else if (sign) sign = sign.toLowerCase()
} catch (e) {
console.warn('生成 sign 失败,继续无 sign 请求', e)
}
const b = {}
if (appid) { b.appId = appid; b.appid = appid }
if (appkey) { b.appKey = appkey; b.appkey = appkey }
b.timestamp = stamp
if (sign) b.sign = sign
if (includeSecret && secret) { b.appSecret = secret; b.appsecret = secret }
const tryKey = JSON.stringify(b)
if (tried.has(tryKey)) continue
tried.add(tryKey)
// Mask debug copy
try {
const dbg = Object.assign({}, b)
if (dbg.appSecret) dbg.appSecret = '***masked***'
console.log('uni-push auth attempt:', { unit, algo: signAlgo, signCase, includeSecret })
console.log('uni-push auth body:', JSON.stringify(dbg))
console.log('uni-push auth url:', authUrl)
} catch (e) { /* ignore */ }
const resp = await fetch(authUrl, { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify(b) })
let txtDbg = ''
try { txtDbg = await resp.clone().text().catch(() => '') } catch (e) { /* ignore */ }
if (txtDbg && txtDbg.length < 2000) console.log('uni-push auth response preview:', txtDbg)
else console.log('uni-push auth response status:', resp.status)
let j = null
try { j = await resp.json().catch(() => null) } catch (e) { j = null }
const token = (j && (j.token || (j.data && j.data.token) || j.access_token)) || null
const expires = (j && (j.expires_in || (j.data && j.data.expires_in))) || 0
if (token) {
global.__uniPushTokenCache[authUrl] = { token, expiresAt: expires ? (Date.now() + (expires * 1000)) : (Date.now() + 55 * 60 * 1000) }
console.log('uni-push auth succeeded with variant:', { unit, algo: signAlgo, signCase, includeSecret })
return token
}
// 若 provider 明确提示 timestamp/sign 错误,则继续尝试其他组合,否则可能无需重试
const code = j && (j.code || (j.error && j.error.code))
const msg = j && (j.msg || j.message || (j.error && j.error.message))
if (code && !String(code).match(/20001|10001/)) {
// 非 timestamp/sign 类型错误,不再重试
console.log('uni-push auth returned non-sign/timestamp error, stopping retries:', code, msg)
break
}
console.log('uni-push auth did not return token, response code/msg:', code, msg)
// 否则继续下一种变体
}
} catch (e) {
console.warn('uni-push auth failed', e)
}
return null
}
if (secret) return secret
return null
}
const token = await obtainToken()
// 支持 {appId} 占位符替换
let uniUrl = uniUrlRaw
if (appid && uniUrl.includes('{appId}')) uniUrl = uniUrl.replace(/\{appId\}/g, appid)
console.log('uni-push send: uniUrl=', uniUrl, 'token?', !!token, 'targetsLen=', targets.length)
const body = {
appid: appid || undefined,
cidList: targets,
message: {
title: notification && notification.title,
content: notification && notification.body,
payload: payload || {}
}
}
const headers = { 'Content-Type': 'application/json' }
if (token) headers.Authorization = `Bearer ${token}`
const resp = await fetch(uniUrl, { method: 'POST', headers, body: JSON.stringify(body) })
const text = await resp.text().catch(() => '')
try { if (text && text.length < 2000) console.log('uni-push send response preview:', text); else console.log('uni-push send response status:', resp.status) } catch (e) { /* ignore */ }
return { status: resp.status, body: text }
// Cloud-function-only:
// UNI-PUSH adapter has been disabled. Push delivery should be implemented inside the cloud function.
async function sendToUniPush() {
throw new Error('UNI-PUSH adapter disabled (cloud-function-only mode)')
}
async function getDevicesFromSupabase({ user_id, active } = {}) {
@@ -272,10 +143,11 @@ async function start() {
// 只在配置了 Supabase REST 时工作
if (!SUPA_URL || !SUPA_KEY) return []
try {
// 查询 status_code IS NULL (pending) 或 next_attempt_at <= now 的记录
// 查询
// - pending: status_code IS NULL
// - retrying: status_code = 'retrying' 且 (next_attempt_at IS NULL 或 next_attempt_at <= now)
const now = new Date().toISOString()
// 使用 OR 过滤status_code.is.null 或 next_attempt_at <= now
const q = `express_notifications?or=(status_code.is.null,next_attempt_at=lte.${encodeURIComponent(now)})&order=created_at.asc&limit=${limit}`
const q = `express_notifications?or=(status_code.is.null,and(status_code.eq.retrying,or(next_attempt_at.is.null,next_attempt_at.lte.${encodeURIComponent(now)})))&order=created_at.asc&limit=${limit}`
const resp = await supaFetch(q, { method: 'GET' })
if (!resp.ok) return []
const data = await resp.json().catch(() => [])
@@ -290,11 +162,13 @@ async function start() {
try {
const body = { status_code: 'processing', updated_at: new Date().toISOString() }
// 仅当当前仍为 pending 或 retrying 且到期时,才抢占
const path = `express_notifications?id=eq.${encodeURIComponent(id)}&or=(status_code.is.null,next_attempt_at=lte.${encodeURIComponent(new Date().toISOString())})`
const now = new Date().toISOString()
const path = `express_notifications?id=eq.${encodeURIComponent(id)}&or=(status_code.is.null,and(status_code.eq.retrying,or(next_attempt_at.is.null,next_attempt_at.lte.${encodeURIComponent(now)})))`
const resp = await supaFetch(path, { method: 'PATCH', headers: { 'Content-Type': 'application/json', Prefer: 'return=representation' }, body: JSON.stringify(body) })
if (!resp.ok) return null
const j = await resp.json().catch(() => null)
if (Array.isArray(j) && j.length > 0) return j[0]
if (Array.isArray(j)) return j.length > 0 ? j[0] : null
if (!j || !j.id) return null
return j
} catch (e) {
console.warn('claimNotification error', e)
@@ -304,6 +178,7 @@ async function start() {
async function updateNotificationStatus(id, status, note) {
try {
if (!id) return
const body = { status_code: String(status), updated_at: new Date().toISOString() }
if (note) body.event_text_safe = String(note).substring(0, 2000)
await supaFetch(`express_notifications?id=eq.${encodeURIComponent(id)}`, { method: 'PATCH', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify(body) })
@@ -358,7 +233,7 @@ async function start() {
return
}
// If CLOUD_FUNC_URL is configured, POST per cid to cloud function; otherwise fallback to sendToUniPush if configured
// Cloud-function-only: always POST per cid to CLOUD_FUNC_URL
let allOk = true
let lastNote = ''
if (CLOUD_FUNC_URL) {
@@ -366,14 +241,9 @@ async function start() {
for (const c of calls) {
if (!c.ok) { allOk = false; lastNote = (c.error || JSON.stringify(c.body)).toString().substring(0, 1000); break }
}
} else if (process.env.UNI_PUSH_URL) {
try {
const r = await sendToUniPush(targets, notification, payload)
if (!r || (r.status && r.status >= 400)) { allOk = false; lastNote = JSON.stringify(r && r.body).substring(0, 1000) }
} catch (e) { allOk = false; lastNote = String(e) }
} else {
allOk = false
lastNote = 'no CLOUD_FUNC_URL and no UNI_PUSH_URL configured'
lastNote = 'no CLOUD_FUNC_URL configured'
}
if (allOk) {
@@ -544,50 +414,11 @@ async function start() {
}
}
// 推送优先策略:
// 1) 若配置了 UNI_PUSH_URL优先使用本地适配器 sendToUniPush
// 2) 否则若配置了 PUSH_PROXY_URL则转发到 proxy
// 3) 否则进入 mock
const proxyUrl = process.env.PUSH_PROXY_URL || ''
const proxyToken = process.env.PUSH_PROXY_TOKEN || ''
console.log('send handler decision: UNI_PUSH_URL=', process.env.UNI_PUSH_URL, 'PUSH_PROXY_URL=', proxyUrl)
console.log('send handler targets count:', targets.length)
if (process.env.UNI_PUSH_URL) {
try {
const result = await sendToUniPush(targets, notification, payload)
return res.json({ ok: true, proxied: true, status: result.status, response: result.body })
} catch (e) {
console.warn('本地 uni-push 适配器推送失败:', e)
return res.status(500).json({ ok: false, error: String(e) })
}
} else if (proxyUrl) {
try {
const resp = await fetch(proxyUrl, {
method: 'POST',
headers: Object.assign({ 'Content-Type': 'application/json' }, proxyToken ? { Authorization: `Bearer ${proxyToken}` } : {}),
body: JSON.stringify({ targets, notification, payload })
})
const data = await resp.text()
return res.json({ ok: true, proxied: true, status: resp.status, response: data })
} catch (e) {
console.warn('代理推送失败:', e)
return res.status(500).json({ ok: false, error: String(e) })
}
}
// 否则仅记录并返回模拟结果
console.log('Mock push to', targets.length, 'clients (no UNI_PUSH_URL and no PUSH_PROXY_URL matched)')
try {
console.log('notification:', notification ? JSON.stringify(notification, null, 2) : null)
} catch (e) {
console.log('notification:', notification)
}
try {
console.log('payload:', payload ? JSON.stringify(payload, null, 2) : '{}')
} catch (e) {
console.log('payload:', payload)
}
return res.json({ ok: true, mocked: true, sent: targets.length, payload: payload || {} })
if (!CLOUD_FUNC_URL) return res.status(500).json({ ok: false, error: 'CLOUD_FUNC_URL not configured' })
console.log('send handler (cloud function only) targets count:', targets.length)
const calls = await Promise.all((targets || []).map(cid => invokeCloudFuncForCid(CLOUD_FUNC_URL, PUSH_TOKEN, cid, notification && notification.title, notification && (notification.body || notification.content), payload)))
const allOk = calls.every(c => c && c.ok)
return res.status(allOk ? 200 : 502).json({ ok: allOk, sent: targets.length, results: calls })
})
// 创建通知并基于数据库记录推送(写入 express_notifications 并发送)
@@ -635,53 +466,10 @@ async function start() {
console.warn('fetch devices for notification failed:', e)
}
// 进行推送(与 /push/send 相同逻辑proxy 或 mock
const proxyUrl = process.env.PUSH_PROXY_URL || ''
const proxyToken = process.env.PUSH_PROXY_TOKEN || ''
if (proxyUrl) {
try {
// 若配置了 UNI_PUSH_URL优先使用本地 adapter否则直接转发到 proxyUrl
if (process.env.UNI_PUSH_URL) {
try {
const result = await sendToUniPush(targets, notification, payload)
// 更新通知记录状态
if (record && record.id) {
try { await supaFetch(`express_notifications?id=eq.${encodeURIComponent(record.id)}`, { method: 'PATCH', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ status_code: String(result.status), updated_at: new Date().toISOString() }) }) } catch (e) { /* ignore */ }
}
return res.json({ ok: true, proxied: true, status: result.status, response: result.body })
} catch (e) {
console.warn('本地 uni-push 适配器推送失败:', e)
return res.status(500).json({ ok: false, error: String(e) })
}
}
const resp = await fetch(proxyUrl, {
method: 'POST',
headers: Object.assign({ 'Content-Type': 'application/json' }, proxyToken ? { Authorization: `Bearer ${proxyToken}` } : {}),
body: JSON.stringify({ targets, notification, payload })
})
const data = await resp.text()
// 更新通知记录状态
if (record && record.id) {
try { await supaFetch(`express_notifications?id=eq.${encodeURIComponent(record.id)}`, { method: 'PATCH', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ status_code: String(resp.status), updated_at: new Date().toISOString() }) }) } catch (e) { /* ignore */ }
}
return res.json({ ok: true, proxied: true, status: resp.status, response: data })
} catch (e) {
console.warn('代理推送失败:', e)
return res.status(500).json({ ok: false, error: String(e) })
}
}
// mock 模式
console.log('Notification mock push to', targets.length, 'clients for', aud, recipient_id)
console.log('notification:', notification)
console.log('payload:', payload)
if (record && record.id) {
try { await supaFetch(`express_notifications?id=eq.${encodeURIComponent(record.id)}`, { method: 'PATCH', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ status_code: 'mocked', updated_at: new Date().toISOString() }) }) } catch (e) { /* ignore */ }
}
return res.json({ ok: true, mocked: true, sent: targets.length, notification_id: record && record.id })
// Cloud-function-only:
// - This endpoint writes to express_notifications.
// - Actual push delivery is performed by the consumer that polls express_notifications.
return res.json({ ok: true, queued: true, notification_id: record && record.id, targets: targets.length })
} catch (e) {
console.warn('create notification failed:', e)
return res.status(500).json({ ok: false, error: String(e) })
@@ -690,10 +478,8 @@ async function start() {
app.listen(PORT, '0.0.0.0', () => {
console.log(`Push server listening on http://0.0.0.0:${PORT}`)
console.log('ENV: UNI_PUSH_URL=', process.env.UNI_PUSH_URL)
console.log('ENV: UNI_PUSH_AUTH_URL=', process.env.UNI_PUSH_AUTH_URL)
console.log('ENV: UNI_PUSH_APPID=', process.env.UNI_PUSH_APPID)
console.log('ENV: PUSH_PROXY_URL=', process.env.PUSH_PROXY_URL)
console.log('ENV: CLOUD_FUNC_URL configured?', !!process.env.CLOUD_FUNC_URL)
console.log('ENV: ENABLE_CONSUMER=', process.env.ENABLE_CONSUMER)
// // 可选:在启动时自动运行部署脚本以打包并上传云函数
// try {
// const AUTO_DEPLOY = (process.env.AUTO_DEPLOY_ON_START === 'true' || process.env.AUTO_DEPLOY_ON_START === '1')