const express = require('express') const bodyParser = require('body-parser') const cors = require('cors') const fs = require('fs').promises const path = require('path') const fetch = require('node-fetch') const crypto = require('crypto') const PORT = process.env.PORT || 7301 const DATA_DIR = path.join(__dirname, 'data') const DEVICES_FILE = path.join(DATA_DIR, 'push_devices.json') const SUPA_URL = process.env.SUPA_URL || '' // 支持两种环境变量名:SUPA_KEY(旧)和 SERVICE_ROLE_KEY(常用于自托管 .env) const SUPA_KEY = process.env.SUPA_KEY || process.env.SERVICE_ROLE_KEY || '' const SUPA_SCHEMA = process.env.SUPA_SCHEMA || 'public' async function ensureDataDir() { try { await fs.mkdir(DATA_DIR, { recursive: true }) try { await fs.access(DEVICES_FILE) } catch (e) { await fs.writeFile(DEVICES_FILE, '[]', 'utf8') } } catch (e) { console.error('无法创建数据目录:', e) process.exit(1) } } async function readDevices() { try { const txt = await fs.readFile(DEVICES_FILE, 'utf8') return JSON.parse(txt || '[]') } catch (e) { return [] } } async function writeDevices(devices) { await fs.writeFile(DEVICES_FILE, JSON.stringify(devices, null, 2), 'utf8') } async function supaFetch(path, opts = {}) { const url = `${SUPA_URL.replace(/\/$/, '')}/rest/v1/${path}` // 默认只发送 apikey。只有在显式要求 Bearer(环境变量 SUPA_USE_BEARER=true) // 或者看到 SUPA_KEY 看起来像 JWT(包含两处 ".")时,才添加 Authorization 头。 const headers = Object.assign({}, opts.headers || {}, { apikey: SUPA_KEY, Accept: 'application/json' }) const sendBearer = (process.env.SUPA_USE_BEARER === 'true') || (SUPA_KEY && (SUPA_KEY.split('.').length === 3)) if (sendBearer) headers.Authorization = `Bearer ${SUPA_KEY}` try { console.log('supaFetch ->', url) const resp = await fetch(url, Object.assign({}, opts, { headers })) if (!resp.ok) { let txt = '' try { txt = await resp.text() } catch (e) { txt = `` } console.warn('supaFetch failed', resp.status, url, txt) } else { // optional: peek at small JSON responses for debugging try { const clone = resp.clone() const maybeText = await clone.text() if (maybeText && maybeText.length < 2000) console.log('supaFetch response preview:', maybeText) } catch (e) { // ignore preview errors } } return resp } catch (e) { console.warn('supaFetch exception', e, url) throw e } } // 适配并发送到 dCloud uni-push(若设置了 UNI_PUSH_URL) async function sendToUniPush(targets, notification, payload) { const uniUrlRaw = process.env.UNI_PUSH_URL || '' if (!uniUrlRaw) throw new Error('UNI_PUSH_URL not configured') const appid = process.env.UNI_PUSH_APPID || '5fSIfMap289ymbdnkfMJ29' const appkey = process.env.UNI_PUSH_APPKEY || '33z3cfHI6Z9TpzyQXRWu01' const secret = process.env.UNI_PUSH_SECRET || process.env.PUSH_PROXY_TOKEN || 'PkVpjStqwW9BourzryQHc7' const authUrl = process.env.UNI_PUSH_AUTH_URL || 'https://restapi.getui.com/v2/5fSIfMap289ymbdnkfMJ29/auth' // 全局缓存 token if (!global.__uniPushTokenCache) global.__uniPushTokenCache = {} async function obtainToken() { if (authUrl) { const cache = global.__uniPushTokenCache[authUrl] if (cache && cache.expiresAt && Date.now() < cache.expiresAt - 5000) return cache.token try { // 为了解决 provider 对 timestamp/sign 规则不同的情况,先尝试当前 env 配置; // 若返回 timestamp/sign 错误则自动按常见组合重试(秒/毫秒、md5/hmac、大小写、是否包含 secret) const tried = new Set() const timestampUnits = ['ms', 's'] const signAlgos = ['md5', 'hmac-sha256'] const signCases = ['lower', 'upper'] const includeSecretOptions = [true, false] // 首选使用环境变量指定的选项(若有) const envUnit = process.env.UNI_PUSH_TIMESTAMP_UNIT || 'ms' const envAlgo = (process.env.UNI_PUSH_SIGN_ALGO || 'md5').toLowerCase() const envCase = (process.env.UNI_PUSH_SIGN_CASE || 'lower').toLowerCase() const envInclude = (process.env.UNI_PUSH_INCLUDE_SECRET === 'true') const candidates = [] // push env preferred first candidates.push({ unit: envUnit, algo: envAlgo, signCase: envCase, includeSecret: envInclude }) for (const u of timestampUnits) for (const a of signAlgos) for (const c of signCases) for (const inc of includeSecretOptions) { const key = `${u}|${a}|${c}|${inc}` if (candidates.find(x => `${x.unit}|${x.algo}|${x.signCase}|${x.includeSecret}` === key)) continue candidates.push({ unit: u, algo: a, signCase: c, includeSecret: inc }) } for (const cand of candidates) { const { unit, algo, signCase, includeSecret } = cand const stamp = (unit === 's') ? String(Math.floor(Date.now() / 1000)) : String(Date.now()) const signAlgo = (algo || 'md5').toLowerCase() let sign = null try { if (secret) { if (signAlgo === 'md5') sign = crypto.createHash('md5').update(String(appkey || '') + stamp + String(secret)).digest('hex') else if (signAlgo === 'hmac-sha256' || signAlgo === 'hmac256') sign = crypto.createHmac('sha256', String(secret)).update(String(appkey || '') + stamp).digest('hex') else sign = crypto.createHash('md5').update(String(appkey || '') + stamp + String(secret)).digest('hex') } if (sign && (signCase === 'upper')) sign = sign.toUpperCase() else if (sign) sign = sign.toLowerCase() } catch (e) { console.warn('生成 sign 失败,继续无 sign 请求', e) } const b = {} if (appid) { b.appId = appid; b.appid = appid } if (appkey) { b.appKey = appkey; b.appkey = appkey } b.timestamp = stamp if (sign) b.sign = sign if (includeSecret && secret) { b.appSecret = secret; b.appsecret = secret } const tryKey = JSON.stringify(b) if (tried.has(tryKey)) continue tried.add(tryKey) // Mask debug copy try { const dbg = Object.assign({}, b) if (dbg.appSecret) dbg.appSecret = '***masked***' console.log('uni-push auth attempt:', { unit, algo: signAlgo, signCase, includeSecret }) console.log('uni-push auth body:', JSON.stringify(dbg)) console.log('uni-push auth url:', authUrl) } catch (e) { /* ignore */ } const resp = await fetch(authUrl, { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify(b) }) let txtDbg = '' try { txtDbg = await resp.clone().text().catch(() => '') } catch (e) { /* ignore */ } if (txtDbg && txtDbg.length < 2000) console.log('uni-push auth response preview:', txtDbg) else console.log('uni-push auth response status:', resp.status) let j = null try { j = await resp.json().catch(() => null) } catch (e) { j = null } const token = (j && (j.token || (j.data && j.data.token) || j.access_token)) || null const expires = (j && (j.expires_in || (j.data && j.data.expires_in))) || 0 if (token) { global.__uniPushTokenCache[authUrl] = { token, expiresAt: expires ? (Date.now() + (expires * 1000)) : (Date.now() + 55 * 60 * 1000) } console.log('uni-push auth succeeded with variant:', { unit, algo: signAlgo, signCase, includeSecret }) return token } // 若 provider 明确提示 timestamp/sign 错误,则继续尝试其他组合,否则可能无需重试 const code = j && (j.code || (j.error && j.error.code)) const msg = j && (j.msg || j.message || (j.error && j.error.message)) if (code && !String(code).match(/20001|10001/)) { // 非 timestamp/sign 类型错误,不再重试 console.log('uni-push auth returned non-sign/timestamp error, stopping retries:', code, msg) break } console.log('uni-push auth did not return token, response code/msg:', code, msg) // 否则继续下一种变体 } } catch (e) { console.warn('uni-push auth failed', e) } return null } if (secret) return secret return null } const token = await obtainToken() // 支持 {appId} 占位符替换 let uniUrl = uniUrlRaw if (appid && uniUrl.includes('{appId}')) uniUrl = uniUrl.replace(/\{appId\}/g, appid) console.log('uni-push send: uniUrl=', uniUrl, 'token?', !!token, 'targetsLen=', targets.length) const body = { appid: appid || undefined, cidList: targets, message: { title: notification && notification.title, content: notification && notification.body, payload: payload || {} } } const headers = { 'Content-Type': 'application/json' } if (token) headers.Authorization = `Bearer ${token}` const resp = await fetch(uniUrl, { method: 'POST', headers, body: JSON.stringify(body) }) const text = await resp.text().catch(() => '') try { if (text && text.length < 2000) console.log('uni-push send response preview:', text); else console.log('uni-push send response status:', resp.status) } catch (e) { /* ignore */ } return { status: resp.status, body: text } } async function getDevicesFromSupabase({ user_id, active } = {}) { let q = 'push_devices' const params = [] if (user_id) params.push(`user_id=eq.${encodeURIComponent(user_id)}`) if (active != null) params.push(`is_active=eq.${encodeURIComponent(active ? 'true' : 'false')}`) const suffix = params.length ? `?${params.join('&')}` : '' const resp = await supaFetch(`${q}${suffix}`, { method: 'GET' }) if (!resp.ok) { const txt = await resp.text() throw new Error(`supabase get devices failed ${resp.status} ${txt}`) } const data = await resp.json() return data } async function start() { await ensureDataDir() const app = express() app.use(cors()) app.use(bodyParser.json()) app.get('/health', (req, res) => res.json({ ok: true })) // 注册或更新设备 app.post('/api/v1/push/register', async (req, res) => { const { cid, user_id, platform } = req.body || {} if (!cid) return res.status(400).json({ error: 'cid required' }) const devices = await readDevices() let found = devices.find(d => d.cid === cid) const now = new Date().toISOString() if (found) { found.user_id = user_id ?? found.user_id found.platform = platform ?? found.platform found.updated_at = now found.active = true } else { found = { cid, user_id: user_id ?? null, platform: platform ?? null, created_at: now, updated_at: now, active: true } devices.push(found) } await writeDevices(devices) // 如果配置了 Supabase,则尝试写入 supabase 表(异步记录错误) if (SUPA_URL && SUPA_KEY) { (async () => { try { const body = { cid: found.cid, user_id: found.user_id, platform: found.platform, appid: found.appid || 'default', is_active: true, last_seen_at: found.updated_at } const resp = await supaFetch(`push_devices?on_conflict=cid`, { method: 'POST', headers: { 'Content-Type': 'application/json', Prefer: 'return=representation' }, body: JSON.stringify(body) }) if (!resp.ok) { const txt = await resp.text() console.warn('Supabase upsert push_device failed:', resp.status, txt) } } catch (e) { console.warn('Supabase upsert exception:', e) } })() } return res.json({ ok: true, cid }) }) // 注销设备(可选移除或置为 inactive) app.post('/api/v1/push/unregister', async (req, res) => { const { cid, user_id } = req.body || {} if (!cid && !user_id) return res.status(400).json({ error: 'cid or user_id required' }) let devices = await readDevices() if (cid) { devices = devices.map(d => d.cid === cid ? Object.assign({}, d, { active: false, updated_at: new Date().toISOString() }) : d) } else if (user_id) { devices = devices.map(d => d.user_id === user_id ? Object.assign({}, d, { active: false, updated_at: new Date().toISOString() }) : d) } await writeDevices(devices) // 同步到 Supabase(如果可用) if (SUPA_URL && SUPA_KEY) { (async () => { try { const now = new Date().toISOString() if (cid) { await supaFetch(`push_devices?cid=eq.${encodeURIComponent(cid)}`, { method: 'PATCH', headers: { 'Content-Type': 'application/json', Prefer: 'return=representation' }, body: JSON.stringify({ is_active: false, updated_at: now }) }) } else if (user_id) { await supaFetch(`push_devices?user_id=eq.${encodeURIComponent(user_id)}`, { method: 'PATCH', headers: { 'Content-Type': 'application/json', Prefer: 'return=representation' }, body: JSON.stringify({ is_active: false, updated_at: now }) }) } } catch (e) { console.warn('Supabase unregister exception:', e) } })() } return res.json({ ok: true }) }) // 列出设备 app.get('/api/v1/push/devices', async (req, res) => { const { user_id, active } = req.query try { if (SUPA_URL && SUPA_KEY) { const devices = await getDevicesFromSupabase({ user_id, active: active == null ? undefined : (active === 'true') }) return res.json({ ok: true, total: devices.length, data: devices }) } let devices = await readDevices() if (user_id) devices = devices.filter(d => String(d.user_id) === String(user_id)) if (active != null) devices = devices.filter(d => String(!!d.active) === String(active === 'true')) res.json({ ok: true, total: devices.length, data: devices }) } catch (e) { console.warn('list devices failed:', e) res.status(500).json({ ok: false, error: String(e) }) } }) // 发送推送(mock 或代理到真实 provider,当环境变量 PUSH_PROXY_URL 设置时会代理请求) app.post('/api/v1/push/send', async (req, res) => { const { cids, user_id, notification, payload } = req.body || {} if ((!cids || cids.length === 0) && !user_id) return res.status(400).json({ error: 'cids or user_id required' }) let targets = [] if (cids && cids.length > 0) targets = cids else if (user_id) { if (SUPA_URL && SUPA_KEY) { try { const devices = await getDevicesFromSupabase({ user_id, active: true }) targets = devices.map(d => d.cid) } catch (e) { console.warn('Supabase query devices failed, falling back to local:', e) const devices = await readDevices() targets = devices.filter(d => String(d.user_id) === String(user_id) && d.active).map(d => d.cid) } } else { const devices = await readDevices() targets = devices.filter(d => String(d.user_id) === String(user_id) && d.active).map(d => d.cid) } } // 推送优先策略: // 1) 若配置了 UNI_PUSH_URL,优先使用本地适配器 sendToUniPush // 2) 否则若配置了 PUSH_PROXY_URL,则转发到 proxy // 3) 否则进入 mock const proxyUrl = process.env.PUSH_PROXY_URL || '' const proxyToken = process.env.PUSH_PROXY_TOKEN || '' console.log('send handler decision: UNI_PUSH_URL=', process.env.UNI_PUSH_URL, 'PUSH_PROXY_URL=', proxyUrl) console.log('send handler targets count:', targets.length) if (process.env.UNI_PUSH_URL) { try { const result = await sendToUniPush(targets, notification, payload) return res.json({ ok: true, proxied: true, status: result.status, response: result.body }) } catch (e) { console.warn('本地 uni-push 适配器推送失败:', e) return res.status(500).json({ ok: false, error: String(e) }) } } else if (proxyUrl) { try { const resp = await fetch(proxyUrl, { method: 'POST', headers: Object.assign({ 'Content-Type': 'application/json' }, proxyToken ? { Authorization: `Bearer ${proxyToken}` } : {}), body: JSON.stringify({ targets, notification, payload }) }) const data = await resp.text() return res.json({ ok: true, proxied: true, status: resp.status, response: data }) } catch (e) { console.warn('代理推送失败:', e) return res.status(500).json({ ok: false, error: String(e) }) } } // 否则仅记录并返回模拟结果 console.log('Mock push to', targets.length, 'clients (no UNI_PUSH_URL and no PUSH_PROXY_URL matched)') try { console.log('notification:', notification ? JSON.stringify(notification, null, 2) : null) } catch (e) { console.log('notification:', notification) } try { console.log('payload:', payload ? JSON.stringify(payload, null, 2) : '{}') } catch (e) { console.log('payload:', payload) } return res.json({ ok: true, mocked: true, sent: targets.length, payload: payload || {} }) }) // 创建通知并基于数据库记录推送(写入 express_notifications 并发送) app.post('/api/v1/notifications', async (req, res) => { const { aud, recipient_id, notification, payload, dedupe_key, message_id, order_id, waybill_id } = req.body || {} if (!aud || !recipient_id) return res.status(400).json({ error: 'aud and recipient_id required' }) try { // 写入 express_notifications 表并返回记录(使用 return=representation) const body = Object.assign({}, { aud, recipient_id, event_text_safe: (notification && notification.title) || null, payload: payload || null, dedupe_key: dedupe_key || null, message_id: message_id || null, order_id: order_id || null, waybill_id: waybill_id || null, event_time: new Date().toISOString() }) const insertResp = await supaFetch(`express_notifications?on_conflict=message_id`, { method: 'POST', headers: { 'Content-Type': 'application/json', Prefer: 'return=representation' }, body: JSON.stringify(body) }) if (!insertResp.ok) { const txt = await insertResp.text() return res.status(500).json({ ok: false, error: `insert notification failed ${insertResp.status} ${txt}` }) } const inserted = await insertResp.json() const record = Array.isArray(inserted) ? inserted[0] : inserted // 根据 aud 拉取目标设备 let targets = [] try { if (SUPA_URL && SUPA_KEY) { if (aud === 'user') { const devices = await getDevicesFromSupabase({ user_id: recipient_id, active: true }) targets = devices.map(d => d.cid) } else if (aud === 'merchant') { // 使用 merchant_id 字段查询 const resp = await supaFetch(`push_devices?merchant_id=eq.${encodeURIComponent(recipient_id)}&is_active=eq.true`, { method: 'GET' }) if (resp.ok) { const devices = await resp.json() targets = devices.map(d => d.cid) } } } else { // 本地文件回退 const devices = await readDevices() if (aud === 'user') targets = devices.filter(d => String(d.user_id) === String(recipient_id) && d.active).map(d => d.cid) else if (aud === 'merchant') targets = devices.filter(d => String(d.merchant_id) === String(recipient_id) && d.active).map(d => d.cid) } } catch (e) { console.warn('fetch devices for notification failed:', e) } // 进行推送(与 /push/send 相同逻辑:proxy 或 mock) const proxyUrl = process.env.PUSH_PROXY_URL || '' const proxyToken = process.env.PUSH_PROXY_TOKEN || '' if (proxyUrl) { try { // 若配置了 UNI_PUSH_URL,优先使用本地 adapter;否则直接转发到 proxyUrl if (process.env.UNI_PUSH_URL) { try { const result = await sendToUniPush(targets, notification, payload) // 更新通知记录状态 if (record && record.id) { try { await supaFetch(`express_notifications?id=eq.${encodeURIComponent(record.id)}`, { method: 'PATCH', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ status_code: String(result.status), updated_at: new Date().toISOString() }) }) } catch (e) { /* ignore */ } } return res.json({ ok: true, proxied: true, status: result.status, response: result.body }) } catch (e) { console.warn('本地 uni-push 适配器推送失败:', e) return res.status(500).json({ ok: false, error: String(e) }) } } const resp = await fetch(proxyUrl, { method: 'POST', headers: Object.assign({ 'Content-Type': 'application/json' }, proxyToken ? { Authorization: `Bearer ${proxyToken}` } : {}), body: JSON.stringify({ targets, notification, payload }) }) const data = await resp.text() // 更新通知记录状态 if (record && record.id) { try { await supaFetch(`express_notifications?id=eq.${encodeURIComponent(record.id)}`, { method: 'PATCH', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ status_code: String(resp.status), updated_at: new Date().toISOString() }) }) } catch (e) { /* ignore */ } } return res.json({ ok: true, proxied: true, status: resp.status, response: data }) } catch (e) { console.warn('代理推送失败:', e) return res.status(500).json({ ok: false, error: String(e) }) } } // mock 模式 console.log('Notification mock push to', targets.length, 'clients for', aud, recipient_id) console.log('notification:', notification) console.log('payload:', payload) if (record && record.id) { try { await supaFetch(`express_notifications?id=eq.${encodeURIComponent(record.id)}`, { method: 'PATCH', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ status_code: 'mocked', updated_at: new Date().toISOString() }) }) } catch (e) { /* ignore */ } } return res.json({ ok: true, mocked: true, sent: targets.length, notification_id: record && record.id }) } catch (e) { console.warn('create notification failed:', e) return res.status(500).json({ ok: false, error: String(e) }) } }) app.listen(PORT, '0.0.0.0', () => { console.log(`Push server listening on http://0.0.0.0:${PORT}`) console.log('ENV: UNI_PUSH_URL=', process.env.UNI_PUSH_URL) console.log('ENV: UNI_PUSH_AUTH_URL=', process.env.UNI_PUSH_AUTH_URL) console.log('ENV: UNI_PUSH_APPID=', process.env.UNI_PUSH_APPID) console.log('ENV: PUSH_PROXY_URL=', process.env.PUSH_PROXY_URL) }) } start().catch(e => { console.error('启动失败:', e) process.exit(1) })