云服务推送
This commit is contained in:
@@ -5,6 +5,7 @@ const fs = require('fs').promises
|
||||
const path = require('path')
|
||||
const fetch = require('node-fetch')
|
||||
const crypto = require('crypto')
|
||||
const { spawn } = require('child_process')
|
||||
|
||||
const PORT = process.env.PORT || 7301
|
||||
const DATA_DIR = path.join(__dirname, 'data')
|
||||
@@ -14,6 +15,17 @@ const SUPA_URL = process.env.SUPA_URL || ''
|
||||
const SUPA_KEY = process.env.SUPA_KEY || process.env.SERVICE_ROLE_KEY || ''
|
||||
const SUPA_SCHEMA = process.env.SUPA_SCHEMA || 'public'
|
||||
|
||||
// Consumer 配置
|
||||
const CONSUMER_ENABLED = (process.env.ENABLE_CONSUMER === 'true') || (process.env.CONSUMER_ENABLED === 'true')
|
||||
const CONSUMER_POLL_MS = parseInt(process.env.CONSUMER_POLL_MS || process.env.CONSUMER_POLL_INTERVAL_MS || '2000', 10)
|
||||
const CLOUD_FUNC_URL = process.env.CLOUD_FUNC_URL || ''
|
||||
const PUSH_TOKEN = process.env.PUSH_TOKEN || ''
|
||||
// Retry/backoff config
|
||||
const MAX_RETRIES = parseInt(process.env.MAX_RETRIES || '5', 10)
|
||||
const RETRY_INITIAL_MS = parseInt(process.env.RETRY_INITIAL_MS || '5000', 10)
|
||||
const RETRY_FACTOR = parseInt(process.env.RETRY_FACTOR || '2', 10)
|
||||
const RETRY_MAX_MS = parseInt(process.env.RETRY_MAX_MS || '3600000', 10) // 1 hour
|
||||
|
||||
async function ensureDataDir() {
|
||||
try {
|
||||
await fs.mkdir(DATA_DIR, { recursive: true })
|
||||
@@ -238,8 +250,174 @@ async function start() {
|
||||
app.use(cors())
|
||||
app.use(bodyParser.json())
|
||||
|
||||
// 部署路由与简单鉴权(若设置了 DEPLOY_BEARER 或 DEPLOY_TOKEN 环境变量)
|
||||
try {
|
||||
const deployRouter = require('./routes/deploy')
|
||||
const DEPLOY_BEARER = process.env.DEPLOY_BEARER || process.env.DEPLOY_TOKEN || process.env.ADMIN_DEPLOY_TOKEN || ''
|
||||
app.use('/api/v1', (req, res, next) => {
|
||||
if (req.path === '/deploy-cloudfunc' && DEPLOY_BEARER) {
|
||||
const auth = req.headers.authorization || ''
|
||||
if (auth !== `Bearer ${DEPLOY_BEARER}`) return res.status(401).json({ ok: false, error: 'unauthorized' })
|
||||
}
|
||||
next()
|
||||
}, deployRouter)
|
||||
} catch (e) {
|
||||
console.warn('deploy route not mounted (missing file?):', e && e.message)
|
||||
}
|
||||
|
||||
app.get('/health', (req, res) => res.json({ ok: true }))
|
||||
|
||||
// ---- Consumer: 从 express_notifications 拉取 pending 记录并调用云函数 ----
|
||||
async function fetchPendingNotifications(limit = 5) {
|
||||
// 只在配置了 Supabase REST 时工作
|
||||
if (!SUPA_URL || !SUPA_KEY) return []
|
||||
try {
|
||||
// 查询 status_code IS NULL (pending) 或 next_attempt_at <= now 的记录
|
||||
const now = new Date().toISOString()
|
||||
// 使用 OR 过滤:status_code.is.null 或 next_attempt_at <= now
|
||||
const q = `express_notifications?or=(status_code.is.null,next_attempt_at=lte.${encodeURIComponent(now)})&order=created_at.asc&limit=${limit}`
|
||||
const resp = await supaFetch(q, { method: 'GET' })
|
||||
if (!resp.ok) return []
|
||||
const data = await resp.json().catch(() => [])
|
||||
return Array.isArray(data) ? data : []
|
||||
} catch (e) {
|
||||
console.warn('fetchPendingNotifications error', e)
|
||||
return []
|
||||
}
|
||||
}
|
||||
|
||||
async function claimNotification(id) {
|
||||
try {
|
||||
const body = { status_code: 'processing', updated_at: new Date().toISOString() }
|
||||
// 仅当当前仍为 pending 或 retrying 且到期时,才抢占
|
||||
const path = `express_notifications?id=eq.${encodeURIComponent(id)}&or=(status_code.is.null,next_attempt_at=lte.${encodeURIComponent(new Date().toISOString())})`
|
||||
const resp = await supaFetch(path, { method: 'PATCH', headers: { 'Content-Type': 'application/json', Prefer: 'return=representation' }, body: JSON.stringify(body) })
|
||||
if (!resp.ok) return null
|
||||
const j = await resp.json().catch(() => null)
|
||||
if (Array.isArray(j) && j.length > 0) return j[0]
|
||||
return j
|
||||
} catch (e) {
|
||||
console.warn('claimNotification error', e)
|
||||
return null
|
||||
}
|
||||
}
|
||||
|
||||
async function updateNotificationStatus(id, status, note) {
|
||||
try {
|
||||
const body = { status_code: String(status), updated_at: new Date().toISOString() }
|
||||
if (note) body.event_text_safe = String(note).substring(0, 2000)
|
||||
await supaFetch(`express_notifications?id=eq.${encodeURIComponent(id)}`, { method: 'PATCH', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify(body) })
|
||||
} catch (e) { console.warn('updateNotificationStatus error', e) }
|
||||
}
|
||||
|
||||
async function invokeCloudFuncForCid(funcUrl, token, pushCid, title, content, payload) {
|
||||
try {
|
||||
const body = { token: token || PUSH_TOKEN || null, push_clientid: pushCid, title: title || '', content: content || '', payload: payload || {} }
|
||||
const resp = await fetch(funcUrl, { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify(body) })
|
||||
const txt = await resp.text().catch(() => '')
|
||||
let json
|
||||
try { json = JSON.parse(txt) } catch (e) { json = { statusText: txt } }
|
||||
return { ok: resp.ok, status: resp.status, body: json }
|
||||
} catch (e) {
|
||||
return { ok: false, error: String(e) }
|
||||
}
|
||||
}
|
||||
|
||||
async function processNotificationRecord(rec) {
|
||||
if (!rec || !rec.id) return
|
||||
// claim the record (set processing) to avoid races
|
||||
const claimed = await claimNotification(rec.id)
|
||||
if (!claimed) return // someone else claimed or failed
|
||||
|
||||
const aud = claimed.aud
|
||||
const recipient_id = claimed.recipient_id
|
||||
const notification = claimed.payload && claimed.payload.notification ? claimed.payload.notification : (claimed.event_text_safe ? { title: claimed.event_text_safe, body: '' } : {})
|
||||
const payload = claimed.payload || {}
|
||||
|
||||
// build targets similar to /api/v1/notifications
|
||||
let targets = []
|
||||
try {
|
||||
if (SUPA_URL && SUPA_KEY) {
|
||||
if (aud === 'user') {
|
||||
const devices = await getDevicesFromSupabase({ user_id: recipient_id, active: true })
|
||||
targets = devices.map(d => d.cid)
|
||||
} else if (aud === 'merchant') {
|
||||
const resp = await supaFetch(`push_devices?merchant_id=eq.${encodeURIComponent(recipient_id)}&is_active=eq.true`, { method: 'GET' })
|
||||
if (resp.ok) {
|
||||
const devs = await resp.json().catch(() => [])
|
||||
targets = devs.map(d => d.cid)
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (e) {
|
||||
console.warn('processNotificationRecord fetch devices failed', e)
|
||||
}
|
||||
|
||||
if (!targets || targets.length === 0) {
|
||||
await updateNotificationStatus(claimed.id, 'no-targets', 'no active devices')
|
||||
return
|
||||
}
|
||||
|
||||
// If CLOUD_FUNC_URL is configured, POST per cid to cloud function; otherwise fallback to sendToUniPush if configured
|
||||
let allOk = true
|
||||
let lastNote = ''
|
||||
if (CLOUD_FUNC_URL) {
|
||||
const calls = await Promise.all(targets.map(cid => invokeCloudFuncForCid(CLOUD_FUNC_URL, PUSH_TOKEN, cid, notification && notification.title, notification && notification.body, payload)))
|
||||
for (const c of calls) {
|
||||
if (!c.ok) { allOk = false; lastNote = (c.error || JSON.stringify(c.body)).toString().substring(0, 1000); break }
|
||||
}
|
||||
} else if (process.env.UNI_PUSH_URL) {
|
||||
try {
|
||||
const r = await sendToUniPush(targets, notification, payload)
|
||||
if (!r || (r.status && r.status >= 400)) { allOk = false; lastNote = JSON.stringify(r && r.body).substring(0, 1000) }
|
||||
} catch (e) { allOk = false; lastNote = String(e) }
|
||||
} else {
|
||||
allOk = false
|
||||
lastNote = 'no CLOUD_FUNC_URL and no UNI_PUSH_URL configured'
|
||||
}
|
||||
|
||||
if (allOk) {
|
||||
await updateNotificationStatus(claimed.id, 'success', null)
|
||||
} else {
|
||||
// failure -> increment retry_count, set last_error and next_attempt_at or mark failed
|
||||
const currentRetry = parseInt(claimed.retry_count || 0, 10)
|
||||
const nextRetry = currentRetry + 1
|
||||
if (nextRetry >= MAX_RETRIES) {
|
||||
// mark failed
|
||||
try {
|
||||
await supaFetch(`express_notifications?id=eq.${encodeURIComponent(claimed.id)}`, { method: 'PATCH', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ status_code: 'failed', retry_count: nextRetry, last_error: String(lastNote).substring(0,2000), updated_at: new Date().toISOString() }) })
|
||||
} catch (e) { console.warn('mark failed error', e) }
|
||||
} else {
|
||||
// compute exponential backoff
|
||||
let delay = RETRY_INITIAL_MS * Math.pow(RETRY_FACTOR, currentRetry)
|
||||
if (delay > RETRY_MAX_MS) delay = RETRY_MAX_MS
|
||||
const nextAt = new Date(Date.now() + delay).toISOString()
|
||||
try {
|
||||
await supaFetch(`express_notifications?id=eq.${encodeURIComponent(claimed.id)}`, { method: 'PATCH', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ status_code: 'retrying', retry_count: nextRetry, last_error: String(lastNote).substring(0,2000), next_attempt_at: nextAt, updated_at: new Date().toISOString() }) })
|
||||
} catch (e) { console.warn('schedule retry error', e) }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async function consumerOnce() {
|
||||
try {
|
||||
const pending = await fetchPendingNotifications(5)
|
||||
if (!pending || pending.length === 0) return
|
||||
for (const rec of pending) {
|
||||
try { await processNotificationRecord(rec) } catch (e) { console.warn('consumer process error', e) }
|
||||
}
|
||||
} catch (e) {
|
||||
console.warn('consumerOnce error', e)
|
||||
}
|
||||
}
|
||||
|
||||
if (CONSUMER_ENABLED) {
|
||||
if (!SUPA_URL || !SUPA_KEY) console.warn('Consumer enabled but SUPA_URL/SUPA_KEY not configured; consumer will not run against Supabase.')
|
||||
else console.log('Notification consumer enabled, poll interval(ms)=', CONSUMER_POLL_MS)
|
||||
setInterval(consumerOnce, CONSUMER_POLL_MS)
|
||||
}
|
||||
// ---- end consumer ----
|
||||
|
||||
// 注册或更新设备
|
||||
app.post('/api/v1/push/register', async (req, res) => {
|
||||
const { cid, user_id, platform } = req.body || {}
|
||||
@@ -516,6 +694,28 @@ async function start() {
|
||||
console.log('ENV: UNI_PUSH_AUTH_URL=', process.env.UNI_PUSH_AUTH_URL)
|
||||
console.log('ENV: UNI_PUSH_APPID=', process.env.UNI_PUSH_APPID)
|
||||
console.log('ENV: PUSH_PROXY_URL=', process.env.PUSH_PROXY_URL)
|
||||
// // 可选:在启动时自动运行部署脚本以打包并上传云函数
|
||||
// try {
|
||||
// const AUTO_DEPLOY = (process.env.AUTO_DEPLOY_ON_START === 'true' || process.env.AUTO_DEPLOY_ON_START === '1')
|
||||
// const DEPLOY_BEARER = process.env.DEPLOY_BEARER || process.env.DEPLOY_TOKEN || process.env.ADMIN_DEPLOY_TOKEN || ''
|
||||
// if (AUTO_DEPLOY) {
|
||||
// if (!DEPLOY_BEARER) {
|
||||
// console.warn('AUTO_DEPLOY_ON_START enabled but no DEPLOY_BEARER found; skipping auto deploy for safety.')
|
||||
// } else {
|
||||
// const argsEnv = process.env.AUTO_DEPLOY_ARGS ? process.env.AUTO_DEPLOY_ARGS.split(' ') : []
|
||||
// const deployScript = path.join(__dirname, 'tools', 'deploy-cloudfunc.js')
|
||||
// const nodeArgs = [deployScript].concat(argsEnv)
|
||||
// console.log('Auto-deploy: spawning node', nodeArgs.join(' '))
|
||||
// const child = spawn(process.execPath, nodeArgs, { cwd: __dirname, env: Object.assign({}, process.env), stdio: ['ignore', 'pipe', 'pipe'] })
|
||||
// child.stdout.on('data', d => console.log('[auto-deploy stdout]', d.toString().trim()))
|
||||
// child.stderr.on('data', d => console.error('[auto-deploy stderr]', d.toString().trim()))
|
||||
// child.on('exit', (code, signal) => console.log(`Auto-deploy process exited with code=${code} signal=${signal}`))
|
||||
// child.on('error', e => console.error('Auto-deploy spawn error', e))
|
||||
// }
|
||||
// }
|
||||
// } catch (e) {
|
||||
// console.warn('auto-deploy runtime error', e)
|
||||
// }
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user