// Push server (JavaScript) — file-viewer metadata: 572 lines, 26 KiB.
// Load the optional local configuration module before process.env is read below.
// NOTE(review): presumably ./load-config populates process.env — confirm against that module.
try {
  require('./load-config')
} catch (e) {
  // A missing config file is expected; any other failure (e.g. a syntax error
  // inside the config module) should not disappear silently.
  if (!e || e.code !== 'MODULE_NOT_FOUND') console.warn('load-config failed:', e && e.message)
}

const express = require('express')
const bodyParser = require('body-parser')
const cors = require('cors')
const fs = require('fs').promises
const path = require('path')
// Prefer the built-in fetch; otherwise fall back to the optional 'node-fetch' package.
const fetchImpl = (globalThis.fetch ? globalThis.fetch.bind(globalThis) : (() => {
  try {
    return require('node-fetch')
  } catch (e) {
    throw new Error("No fetch implementation found. Use Node.js 18+ or install 'node-fetch'.")
  }
})())
const { spawn } = require('child_process')

/**
 * Read the first usable integer from a list of environment variables.
 *
 * Unset or empty variables are skipped. A value that does not parse to an
 * integer >= `min` is reported and skipped, so a typo in the environment can
 * never leak NaN into timers or backoff arithmetic.
 *
 * @param {string[]} names - Environment variable names, in priority order.
 * @param {number} fallback - Value returned when no variable is usable.
 * @param {number} [min=0] - Smallest accepted value.
 * @returns {number} The parsed integer or `fallback`.
 */
function readIntEnv(names, fallback, min = 0) {
  for (const name of names) {
    const raw = process.env[name]
    if (raw == null || raw === '') continue
    const parsed = Number.parseInt(raw, 10)
    if (Number.isFinite(parsed) && parsed >= min) return parsed
    console.warn(`Ignoring invalid value for ${name}: ${JSON.stringify(raw)} (expected an integer >= ${min})`)
  }
  return fallback
}

// A service-specific port variable avoids a port clash when this server shares
// server/config.json with webhook-receiver.
const PORT = process.env.PUSH_SERVER_PORT || process.env.PORT || 7301
const DATA_DIR = path.join(__dirname, 'data')
const DEVICES_FILE = path.join(DATA_DIR, 'push_devices.json')
const SUPA_URL = process.env.SUPA_URL || ''
// Two variable names are supported: SUPA_KEY (legacy) and SERVICE_ROLE_KEY
// (common in self-hosted .env files).
// Prefer service role key when present (server-side), to avoid RLS issues.
const SUPA_KEY = process.env.SERVICE_ROLE_KEY || process.env.SUPA_KEY || ''
const SUPA_SCHEMA = process.env.SUPA_SCHEMA || 'public'

// Consumer configuration
const CONSUMER_ENABLED = (process.env.ENABLE_CONSUMER === 'true') || (process.env.CONSUMER_ENABLED === 'true')
const CONSUMER_POLL_MS = readIntEnv(['CONSUMER_POLL_MS', 'CONSUMER_POLL_INTERVAL_MS'], 2000, 1)
const CLOUD_FUNC_URL = process.env.CLOUD_FUNC_URL || ''
const PUSH_TOKEN = process.env.PUSH_TOKEN || ''
// Retry/backoff config
const MAX_RETRIES = readIntEnv(['MAX_RETRIES'], 5)
const RETRY_INITIAL_MS = readIntEnv(['RETRY_INITIAL_MS'], 5000)
const RETRY_FACTOR = readIntEnv(['RETRY_FACTOR'], 2, 1)
const RETRY_MAX_MS = readIntEnv(['RETRY_MAX_MS'], 3600000) // 1 hour

// Set once the "missing send_status column" migration hint has been logged.
let warnedMissingSendStatus = false

/**
 * Make sure the data directory and the device store file exist.
 *
 * The store is created with the exclusive `wx` flag, so an existing file is
 * never overwritten and there is no window between "check" and "create".
 * Any other failure is fatal: it is logged and the process exits with code 1.
 *
 * @returns {Promise<void>}
 */
async function ensureDataDir() {
  try {
    await fs.mkdir(DATA_DIR, { recursive: true })
    try {
      await fs.writeFile(DEVICES_FILE, '[]', { encoding: 'utf8', flag: 'wx' })
    } catch (e) {
      // EEXIST simply means the store is already there; keep its contents.
      if (!e || e.code !== 'EEXIST') throw e
    }
  } catch (e) {
    console.error('无法创建数据目录:', e)
    process.exit(1)
  }
}

/**
 * Load the locally stored device list.
 *
 * This is a best-effort read: a missing, unreadable or malformed store yields
 * an empty list instead of an error. Content that is valid JSON but not an
 * array is also treated as empty, because callers use array methods on the
 * result.
 *
 * @returns {Promise<Array<object>>} Stored device records, possibly empty.
 */
async function readDevices() {
  try {
    const txt = await fs.readFile(DEVICES_FILE, 'utf8')
    const parsed = JSON.parse(txt || '[]')
    return Array.isArray(parsed) ? parsed : []
  } catch (e) {
    // A missing file is routine; anything else is logged so that a corrupted
    // store does not go unnoticed.
    if (!e || e.code !== 'ENOENT') console.warn('readDevices: falling back to empty list:', e && e.message)
    return []
  }
}

/**
 * Persist the device list to the local store as pretty-printed JSON.
 *
 * @param {Array<object>} devices - Complete device list; replaces the file contents.
 * @returns {Promise<void>} Rejects when the file cannot be written.
 */
async function writeDevices(devices) {
  await fs.writeFile(DEVICES_FILE, JSON.stringify(devices, null, 2), 'utf8')
}

/**
 * Issue a request against the Supabase REST endpoint (`<SUPA_URL>/rest/v1/<path>`).
 *
 * Only the `apikey` header is sent by default. An `Authorization: Bearer`
 * header is added solely when SUPA_USE_BEARER=true: in some self-hosted
 * Supabase/Kong setups the apikey satisfies Kong's key-auth, while PostgREST
 * may fail to decode a manually attached Bearer JWT (a JWT_SECRET mismatch
 * triggers PGRST301).
 *
 * The response body is only ever read from a clone for logging, so the
 * returned response is always still readable by the caller.
 *
 * @param {string} path - Table/query part appended after `/rest/v1/`.
 * @param {object} [opts] - fetch options; `opts.headers` are merged under the auth headers.
 * @returns {Promise<Response>} The raw response (non-2xx responses are returned, not thrown).
 * @throws Re-throws any network-level error raised by fetch after logging it.
 */
async function supaFetch(path, opts = {}) {
  const url = `${SUPA_URL.replace(/\/$/, '')}/rest/v1/${path}`
  const headers = Object.assign({}, opts.headers || {}, {
    apikey: SUPA_KEY,
    Accept: 'application/json'
  })
  if (process.env.SUPA_USE_BEARER === 'true') headers.Authorization = `Bearer ${SUPA_KEY}`

  try {
    console.log('supaFetch ->', url)
    const resp = await fetchImpl(url, Object.assign({}, opts, { headers }))
    if (!resp.ok) {
      let txt = ''
      // Read from a clone: callers read the error body themselves afterwards.
      try { txt = await resp.clone().text() } catch (e) { txt = `<unable to read body: ${e}>` }
      console.warn('supaFetch failed', resp.status, url, txt)
    } else {
      // Optional: peek at small responses for debugging.
      try {
        const maybeText = await resp.clone().text()
        if (maybeText && maybeText.length < 2000) console.log('supaFetch response preview:', maybeText)
      } catch (e) {
        // Preview is purely diagnostic; ignore failures.
      }
    }
    return resp
  } catch (e) {
    console.warn('supaFetch exception', e, url)
    throw e
  }
}

// Cloud-function-only:
// UNI-PUSH adapter has been disabled. Push delivery should be implemented inside the cloud function.
/**
 * Placeholder for the disabled UNI-PUSH adapter.
 *
 * @returns {Promise<never>} Always rejects.
 * @throws {Error} Always, stating that the adapter is disabled.
 */
async function sendToUniPush() {
  throw new Error('UNI-PUSH adapter disabled (cloud-function-only mode)')
}

/**
 * Query the Supabase `push_devices` table.
 *
 * @param {object} [filter]
 * @param {string|number} [filter.user_id] - When truthy, restrict to this user.
 * @param {boolean} [filter.active] - When not null/undefined, restrict by `is_active`.
 * @returns {Promise<*>} The parsed JSON response body.
 * @throws {Error} When Supabase answers with a non-2xx status; the message
 *   carries the status and, when it can still be read, the response body.
 */
async function getDevicesFromSupabase({ user_id, active } = {}) {
  const params = []
  if (user_id) params.push(`user_id=eq.${encodeURIComponent(user_id)}`)
  if (active != null) params.push(`is_active=eq.${active ? 'true' : 'false'}`)
  const suffix = params.length ? `?${params.join('&')}` : ''

  const resp = await supaFetch(`push_devices${suffix}`, { method: 'GET' })
  if (!resp.ok) {
    // The body may be unreadable (e.g. already consumed upstream); never let
    // that hide the actual HTTP status.
    const txt = await resp.text().catch(() => '')
    throw new Error(`supabase get devices failed ${resp.status} ${txt}`)
  }
  return resp.json()
}

async function start() {
|
||
await ensureDataDir()
|
||
const app = express()
|
||
app.use(cors())
|
||
app.use(bodyParser.json())
|
||
|
||
// 部署路由与简单鉴权(若设置了 DEPLOY_BEARER 或 DEPLOY_TOKEN 环境变量)
|
||
try {
|
||
const deployRouter = require('./routes/deploy')
|
||
const DEPLOY_BEARER = process.env.DEPLOY_BEARER || process.env.DEPLOY_TOKEN || process.env.ADMIN_DEPLOY_TOKEN || ''
|
||
app.use('/api/v1', (req, res, next) => {
|
||
if (req.path === '/deploy-cloudfunc' && DEPLOY_BEARER) {
|
||
const auth = req.headers.authorization || ''
|
||
if (auth !== `Bearer ${DEPLOY_BEARER}`) return res.status(401).json({ ok: false, error: 'unauthorized' })
|
||
}
|
||
next()
|
||
}, deployRouter)
|
||
} catch (e) {
|
||
console.warn('deploy route not mounted (missing file?):', e && e.message)
|
||
}
|
||
|
||
app.get('/health', (req, res) => res.json({ ok: true }))
|
||
|
||
// ---- Consumer: 从 express_notifications 拉取 pending 记录并调用云函数 ----
|
||
async function fetchPendingNotifications(limit = 5) {
|
||
// 只在配置了 Supabase REST 时工作
|
||
if (!SUPA_URL || !SUPA_KEY) return []
|
||
try {
|
||
// 查询:
|
||
// - pending: send_status IS NULL
|
||
// - retrying: send_status = 'retrying' 且 (next_attempt_at IS NULL 或 next_attempt_at <= now)
|
||
const now = new Date().toISOString()
|
||
const q = `express_notifications?or=(send_status.is.null,and(send_status.eq.retrying,or(next_attempt_at.is.null,next_attempt_at.lte.${encodeURIComponent(now)})))&order=created_at.asc&limit=${limit}`
|
||
const resp = await supaFetch(q, { method: 'GET' })
|
||
if (!resp.ok) {
|
||
if (!warnedMissingSendStatus && resp.status === 400) {
|
||
warnedMissingSendStatus = true
|
||
console.warn('Consumer query failed. If this mentions missing column "send_status", run migration: pages/mall/delivery/doc/需求文档/20260309_add_express_notifications_send_status.sql')
|
||
}
|
||
return []
|
||
}
|
||
const data = await resp.json().catch(() => [])
|
||
return Array.isArray(data) ? data : []
|
||
} catch (e) {
|
||
console.warn('fetchPendingNotifications error', e)
|
||
return []
|
||
}
|
||
}
|
||
|
||
async function claimNotification(id) {
|
||
try {
|
||
const body = { send_status: 'processing', updated_at: new Date().toISOString() }
|
||
// 仅当当前仍为 pending 或 retrying 且到期时,才抢占。
|
||
// 为了避免在 PATCH 中使用复杂 or= 逻辑树导致匹配失败,这里拆成两次尝试。
|
||
const now = new Date().toISOString()
|
||
const attempts = [
|
||
// pending: send_status IS NULL
|
||
`express_notifications?id=eq.${encodeURIComponent(id)}&send_status=is.null`,
|
||
// retrying: send_status='retrying' and next_attempt_at is null or <= now
|
||
`express_notifications?id=eq.${encodeURIComponent(id)}&send_status=eq.retrying&or=(next_attempt_at.is.null,next_attempt_at.lte.${encodeURIComponent(now)})`
|
||
]
|
||
|
||
for (const path of attempts) {
|
||
const resp = await supaFetch(path, { method: 'PATCH', headers: { 'Content-Type': 'application/json', Prefer: 'return=representation' }, body: JSON.stringify(body) })
|
||
if (!resp.ok && !warnedMissingSendStatus && resp.status === 400) {
|
||
warnedMissingSendStatus = true
|
||
console.warn('Consumer claim failed. If this mentions missing column "send_status", run migration: pages/mall/delivery/doc/需求文档/20260309_add_express_notifications_send_status.sql')
|
||
}
|
||
if (!resp.ok) continue
|
||
const j = await resp.json().catch(() => null)
|
||
if (Array.isArray(j)) {
|
||
if (j.length > 0) return j[0]
|
||
// debug hint: matched 0 rows
|
||
try { console.log('claimNotification: updated 0 rows for', path) } catch (e) {}
|
||
continue
|
||
}
|
||
if (j && j.id) return j
|
||
}
|
||
return null
|
||
} catch (e) {
|
||
console.warn('claimNotification error', e)
|
||
return null
|
||
}
|
||
}
|
||
|
||
async function updateNotificationStatus(id, status, note) {
|
||
try {
|
||
if (!id) return
|
||
const body = { send_status: String(status), updated_at: new Date().toISOString() }
|
||
if (note) body.last_error = String(note).substring(0, 2000)
|
||
const resp = await supaFetch(`express_notifications?id=eq.${encodeURIComponent(id)}`, { method: 'PATCH', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify(body) })
|
||
if (!resp.ok && !warnedMissingSendStatus && resp.status === 400) {
|
||
warnedMissingSendStatus = true
|
||
console.warn('Consumer status update failed. If this mentions missing column "send_status", run migration: pages/mall/delivery/doc/需求文档/20260309_add_express_notifications_send_status.sql')
|
||
}
|
||
} catch (e) { console.warn('updateNotificationStatus error', e) }
|
||
}
|
||
|
||
async function invokeCloudFuncForCid(funcUrl, token, pushCid, title, content, payload) {
|
||
try {
|
||
const body = { token: token || PUSH_TOKEN || null, push_clientid: pushCid, title: title || '', content: content || '', payload: payload || {} }
|
||
const resp = await fetchImpl(funcUrl, { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify(body) })
|
||
const txt = await resp.text().catch(() => '')
|
||
let json
|
||
try { json = JSON.parse(txt) } catch (e) { json = { statusText: txt } }
|
||
// Treat explicit business-level failures as errors even when HTTP is 2xx.
|
||
// Many cloud functions return `{ ok: false, ... }` with HTTP 200.
|
||
const businessOk = (() => {
|
||
if (!json || typeof json !== 'object') return true
|
||
if (json.ok === false) return false
|
||
// uniCloud/uni-push demo often returns { errCode: 0|..., errMsg: '...' }
|
||
if (typeof json.errCode === 'number') return json.errCode === 0
|
||
if (typeof json.errCode === 'string' && json.errCode.trim() !== '') return json.errCode === '0'
|
||
return true
|
||
})()
|
||
return { ok: resp.ok && businessOk, httpOk: resp.ok, status: resp.status, body: json }
|
||
} catch (e) {
|
||
return { ok: false, error: String(e) }
|
||
}
|
||
}
|
||
|
||
async function processNotificationRecord(rec) {
|
||
if (!rec || !rec.id) return
|
||
// claim the record (set processing) to avoid races
|
||
const claimed = await claimNotification(rec.id)
|
||
if (!claimed) return // someone else claimed or failed
|
||
|
||
const aud = claimed.aud
|
||
const recipient_id = claimed.recipient_id
|
||
const notification = claimed.payload && claimed.payload.notification ? claimed.payload.notification : (claimed.event_text_safe ? { title: claimed.event_text_safe, body: '' } : {})
|
||
const payload = claimed.payload || {}
|
||
|
||
// Some push providers (and client notification renderers) may not display a system notification
|
||
// when content/body is empty, even if the push API returns success.
|
||
const titleText = (notification && (notification.title || notification.event_text || notification.text)) || claimed.event_text_safe || ''
|
||
const bodyText = (notification && (notification.body || notification.content)) || ''
|
||
const contentText = bodyText || titleText || ''
|
||
|
||
// build targets similar to /api/v1/notifications
|
||
let targets = []
|
||
try {
|
||
if (SUPA_URL && SUPA_KEY) {
|
||
if (aud === 'user') {
|
||
const devices = await getDevicesFromSupabase({ user_id: recipient_id, active: true })
|
||
targets = devices.map(d => d.cid)
|
||
} else if (aud === 'merchant') {
|
||
const resp = await supaFetch(`push_devices?merchant_id=eq.${encodeURIComponent(recipient_id)}&is_active=eq.true`, { method: 'GET' })
|
||
if (resp.ok) {
|
||
const devs = await resp.json().catch(() => [])
|
||
targets = devs.map(d => d.cid)
|
||
}
|
||
}
|
||
}
|
||
} catch (e) {
|
||
console.warn('processNotificationRecord fetch devices failed', e)
|
||
}
|
||
|
||
if (!targets || targets.length === 0) {
|
||
await updateNotificationStatus(claimed.id, 'no-targets', 'no active devices')
|
||
return
|
||
}
|
||
|
||
// Cloud-function-only: always POST per cid to CLOUD_FUNC_URL
|
||
let allOk = true
|
||
let lastNote = ''
|
||
if (CLOUD_FUNC_URL) {
|
||
const calls = await Promise.all(targets.map(cid => invokeCloudFuncForCid(CLOUD_FUNC_URL, PUSH_TOKEN, cid, titleText, contentText, payload)))
|
||
for (const c of calls) {
|
||
if (!c.ok) {
|
||
allOk = false
|
||
const err = c.error ? String(c.error) : ''
|
||
const bodyStr = c.body ? JSON.stringify(c.body) : ''
|
||
lastNote = `cloudfunc failed: status=${c.status || ''} httpOk=${c.httpOk === true} ${err} ${bodyStr}`.trim().substring(0, 1000)
|
||
break
|
||
}
|
||
}
|
||
} else {
|
||
allOk = false
|
||
lastNote = 'no CLOUD_FUNC_URL configured'
|
||
}
|
||
|
||
if (allOk) {
|
||
await updateNotificationStatus(claimed.id, 'success', null)
|
||
} else {
|
||
// failure -> increment retry_count, set last_error and next_attempt_at or mark failed
|
||
const currentRetry = parseInt(claimed.retry_count || 0, 10)
|
||
const nextRetry = currentRetry + 1
|
||
if (nextRetry >= MAX_RETRIES) {
|
||
// mark failed
|
||
try {
|
||
await supaFetch(`express_notifications?id=eq.${encodeURIComponent(claimed.id)}`, { method: 'PATCH', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ send_status: 'failed', retry_count: nextRetry, last_error: String(lastNote).substring(0,2000), updated_at: new Date().toISOString() }) })
|
||
} catch (e) { console.warn('mark failed error', e) }
|
||
} else {
|
||
// compute exponential backoff
|
||
let delay = RETRY_INITIAL_MS * Math.pow(RETRY_FACTOR, currentRetry)
|
||
if (delay > RETRY_MAX_MS) delay = RETRY_MAX_MS
|
||
const nextAt = new Date(Date.now() + delay).toISOString()
|
||
try {
|
||
await supaFetch(`express_notifications?id=eq.${encodeURIComponent(claimed.id)}`, { method: 'PATCH', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ send_status: 'retrying', retry_count: nextRetry, last_error: String(lastNote).substring(0,2000), next_attempt_at: nextAt, updated_at: new Date().toISOString() }) })
|
||
} catch (e) { console.warn('schedule retry error', e) }
|
||
}
|
||
}
|
||
}
|
||
|
||
async function consumerOnce() {
|
||
try {
|
||
const pending = await fetchPendingNotifications(5)
|
||
if (!pending || pending.length === 0) return
|
||
for (const rec of pending) {
|
||
try { await processNotificationRecord(rec) } catch (e) { console.warn('consumer process error', e) }
|
||
}
|
||
} catch (e) {
|
||
console.warn('consumerOnce error', e)
|
||
}
|
||
}
|
||
|
||
if (CONSUMER_ENABLED) {
|
||
if (!SUPA_URL || !SUPA_KEY) console.warn('Consumer enabled but SUPA_URL/SUPA_KEY not configured; consumer will not run against Supabase.')
|
||
else console.log('Notification consumer enabled, poll interval(ms)=', CONSUMER_POLL_MS)
|
||
setInterval(consumerOnce, CONSUMER_POLL_MS)
|
||
}
|
||
// ---- end consumer ----
|
||
|
||
// 注册或更新设备
|
||
app.post('/api/v1/push/register', async (req, res) => {
|
||
const { cid, user_id, platform } = req.body || {}
|
||
if (!cid) return res.status(400).json({ error: 'cid required' })
|
||
const devices = await readDevices()
|
||
let found = devices.find(d => d.cid === cid)
|
||
const now = new Date().toISOString()
|
||
if (found) {
|
||
found.user_id = user_id ?? found.user_id
|
||
found.platform = platform ?? found.platform
|
||
found.updated_at = now
|
||
found.active = true
|
||
} else {
|
||
found = { cid, user_id: user_id ?? null, platform: platform ?? null, created_at: now, updated_at: now, active: true }
|
||
devices.push(found)
|
||
}
|
||
await writeDevices(devices)
|
||
// 如果配置了 Supabase,则尝试写入 supabase 表(异步记录错误)
|
||
if (SUPA_URL && SUPA_KEY) {
|
||
(async () => {
|
||
try {
|
||
const body = {
|
||
cid: found.cid,
|
||
user_id: found.user_id,
|
||
platform: found.platform,
|
||
appid: found.appid || 'default',
|
||
is_active: true,
|
||
last_seen_at: found.updated_at
|
||
}
|
||
const resp = await supaFetch(`push_devices?on_conflict=cid`, {
|
||
method: 'POST',
|
||
headers: { 'Content-Type': 'application/json', Prefer: 'return=representation' },
|
||
body: JSON.stringify(body)
|
||
})
|
||
if (!resp.ok) {
|
||
const txt = await resp.text()
|
||
console.warn('Supabase upsert push_device failed:', resp.status, txt)
|
||
}
|
||
} catch (e) {
|
||
console.warn('Supabase upsert exception:', e)
|
||
}
|
||
})()
|
||
}
|
||
|
||
return res.json({ ok: true, cid })
|
||
})
|
||
|
||
// 注销设备(可选移除或置为 inactive)
|
||
app.post('/api/v1/push/unregister', async (req, res) => {
|
||
const { cid, user_id } = req.body || {}
|
||
if (!cid && !user_id) return res.status(400).json({ error: 'cid or user_id required' })
|
||
let devices = await readDevices()
|
||
if (cid) {
|
||
devices = devices.map(d => d.cid === cid ? Object.assign({}, d, { active: false, updated_at: new Date().toISOString() }) : d)
|
||
} else if (user_id) {
|
||
devices = devices.map(d => d.user_id === user_id ? Object.assign({}, d, { active: false, updated_at: new Date().toISOString() }) : d)
|
||
}
|
||
await writeDevices(devices)
|
||
// 同步到 Supabase(如果可用)
|
||
if (SUPA_URL && SUPA_KEY) {
|
||
(async () => {
|
||
try {
|
||
const now = new Date().toISOString()
|
||
if (cid) {
|
||
await supaFetch(`push_devices?cid=eq.${encodeURIComponent(cid)}`, {
|
||
method: 'PATCH',
|
||
headers: { 'Content-Type': 'application/json', Prefer: 'return=representation' },
|
||
body: JSON.stringify({ is_active: false, updated_at: now })
|
||
})
|
||
} else if (user_id) {
|
||
await supaFetch(`push_devices?user_id=eq.${encodeURIComponent(user_id)}`, {
|
||
method: 'PATCH',
|
||
headers: { 'Content-Type': 'application/json', Prefer: 'return=representation' },
|
||
body: JSON.stringify({ is_active: false, updated_at: now })
|
||
})
|
||
}
|
||
} catch (e) {
|
||
console.warn('Supabase unregister exception:', e)
|
||
}
|
||
})()
|
||
}
|
||
|
||
return res.json({ ok: true })
|
||
})
|
||
|
||
// 列出设备
|
||
app.get('/api/v1/push/devices', async (req, res) => {
|
||
const { user_id, active } = req.query
|
||
try {
|
||
if (SUPA_URL && SUPA_KEY) {
|
||
const devices = await getDevicesFromSupabase({ user_id, active: active == null ? undefined : (active === 'true') })
|
||
return res.json({ ok: true, total: devices.length, data: devices })
|
||
}
|
||
let devices = await readDevices()
|
||
if (user_id) devices = devices.filter(d => String(d.user_id) === String(user_id))
|
||
if (active != null) devices = devices.filter(d => String(!!d.active) === String(active === 'true'))
|
||
res.json({ ok: true, total: devices.length, data: devices })
|
||
} catch (e) {
|
||
console.warn('list devices failed:', e)
|
||
res.status(500).json({ ok: false, error: String(e) })
|
||
}
|
||
})
|
||
|
||
// 发送推送(mock 或代理到真实 provider,当环境变量 PUSH_PROXY_URL 设置时会代理请求)
|
||
app.post('/api/v1/push/send', async (req, res) => {
|
||
const { cids, user_id, notification, payload } = req.body || {}
|
||
if ((!cids || cids.length === 0) && !user_id) return res.status(400).json({ error: 'cids or user_id required' })
|
||
|
||
let targets = []
|
||
if (cids && cids.length > 0) targets = cids
|
||
else if (user_id) {
|
||
if (SUPA_URL && SUPA_KEY) {
|
||
try {
|
||
const devices = await getDevicesFromSupabase({ user_id, active: true })
|
||
targets = devices.map(d => d.cid)
|
||
} catch (e) {
|
||
console.warn('Supabase query devices failed, falling back to local:', e)
|
||
const devices = await readDevices()
|
||
targets = devices.filter(d => String(d.user_id) === String(user_id) && d.active).map(d => d.cid)
|
||
}
|
||
} else {
|
||
const devices = await readDevices()
|
||
targets = devices.filter(d => String(d.user_id) === String(user_id) && d.active).map(d => d.cid)
|
||
}
|
||
}
|
||
|
||
if (!CLOUD_FUNC_URL) return res.status(500).json({ ok: false, error: 'CLOUD_FUNC_URL not configured' })
|
||
console.log('send handler (cloud function only) targets count:', targets.length)
|
||
const calls = await Promise.all((targets || []).map(cid => invokeCloudFuncForCid(CLOUD_FUNC_URL, PUSH_TOKEN, cid, notification && notification.title, notification && (notification.body || notification.content), payload)))
|
||
const allOk = calls.every(c => c && c.ok)
|
||
return res.status(allOk ? 200 : 502).json({ ok: allOk, sent: targets.length, results: calls })
|
||
})
|
||
|
||
// 创建通知并基于数据库记录推送(写入 express_notifications 并发送)
|
||
app.post('/api/v1/notifications', async (req, res) => {
|
||
const { aud, recipient_id, notification, payload, dedupe_key, message_id, order_id, waybill_id } = req.body || {}
|
||
if (!aud || !recipient_id) return res.status(400).json({ error: 'aud and recipient_id required' })
|
||
|
||
try {
|
||
// 写入 express_notifications 表并返回记录(使用 return=representation)
|
||
const body = Object.assign({}, { aud, recipient_id, event_text_safe: (notification && notification.title) || null, payload: payload || null, dedupe_key: dedupe_key || null, message_id: message_id || null, order_id: order_id || null, waybill_id: waybill_id || null, event_time: new Date().toISOString() })
|
||
const insertResp = await supaFetch(`express_notifications?on_conflict=message_id`, {
|
||
method: 'POST',
|
||
headers: { 'Content-Type': 'application/json', Prefer: 'return=representation' },
|
||
body: JSON.stringify(body)
|
||
})
|
||
if (!insertResp.ok) {
|
||
const txt = await insertResp.text()
|
||
return res.status(500).json({ ok: false, error: `insert notification failed ${insertResp.status} ${txt}` })
|
||
}
|
||
const inserted = await insertResp.json()
|
||
const record = Array.isArray(inserted) ? inserted[0] : inserted
|
||
|
||
// 根据 aud 拉取目标设备
|
||
let targets = []
|
||
try {
|
||
if (SUPA_URL && SUPA_KEY) {
|
||
if (aud === 'user') {
|
||
const devices = await getDevicesFromSupabase({ user_id: recipient_id, active: true })
|
||
targets = devices.map(d => d.cid)
|
||
} else if (aud === 'merchant') {
|
||
// 使用 merchant_id 字段查询
|
||
const resp = await supaFetch(`push_devices?merchant_id=eq.${encodeURIComponent(recipient_id)}&is_active=eq.true`, { method: 'GET' })
|
||
if (resp.ok) {
|
||
const devices = await resp.json()
|
||
targets = devices.map(d => d.cid)
|
||
}
|
||
}
|
||
} else {
|
||
// 本地文件回退
|
||
const devices = await readDevices()
|
||
if (aud === 'user') targets = devices.filter(d => String(d.user_id) === String(recipient_id) && d.active).map(d => d.cid)
|
||
else if (aud === 'merchant') targets = devices.filter(d => String(d.merchant_id) === String(recipient_id) && d.active).map(d => d.cid)
|
||
}
|
||
} catch (e) {
|
||
console.warn('fetch devices for notification failed:', e)
|
||
}
|
||
|
||
// Cloud-function-only:
|
||
// - This endpoint writes to express_notifications.
|
||
// - Actual push delivery is performed by the consumer that polls express_notifications.
|
||
return res.json({ ok: true, queued: true, notification_id: record && record.id, targets: targets.length })
|
||
} catch (e) {
|
||
console.warn('create notification failed:', e)
|
||
return res.status(500).json({ ok: false, error: String(e) })
|
||
}
|
||
})
|
||
|
||
app.listen(PORT, '0.0.0.0', () => {
|
||
console.log(`Push server listening on http://0.0.0.0:${PORT}`)
|
||
console.log('ENV: CLOUD_FUNC_URL configured?', !!process.env.CLOUD_FUNC_URL)
|
||
console.log('ENV: ENABLE_CONSUMER=', process.env.ENABLE_CONSUMER)
|
||
// // 可选:在启动时自动运行部署脚本以打包并上传云函数
|
||
// try {
|
||
// const AUTO_DEPLOY = (process.env.AUTO_DEPLOY_ON_START === 'true' || process.env.AUTO_DEPLOY_ON_START === '1')
|
||
// const DEPLOY_BEARER = process.env.DEPLOY_BEARER || process.env.DEPLOY_TOKEN || process.env.ADMIN_DEPLOY_TOKEN || ''
|
||
// if (AUTO_DEPLOY) {
|
||
// if (!DEPLOY_BEARER) {
|
||
// console.warn('AUTO_DEPLOY_ON_START enabled but no DEPLOY_BEARER found; skipping auto deploy for safety.')
|
||
// } else {
|
||
// const argsEnv = process.env.AUTO_DEPLOY_ARGS ? process.env.AUTO_DEPLOY_ARGS.split(' ') : []
|
||
// const deployScript = path.join(__dirname, 'tools', 'deploy-cloudfunc.js')
|
||
// const nodeArgs = [deployScript].concat(argsEnv)
|
||
// console.log('Auto-deploy: spawning node', nodeArgs.join(' '))
|
||
// const child = spawn(process.execPath, nodeArgs, { cwd: __dirname, env: Object.assign({}, process.env), stdio: ['ignore', 'pipe', 'pipe'] })
|
||
// child.stdout.on('data', d => console.log('[auto-deploy stdout]', d.toString().trim()))
|
||
// child.stderr.on('data', d => console.error('[auto-deploy stderr]', d.toString().trim()))
|
||
// child.on('exit', (code, signal) => console.log(`Auto-deploy process exited with code=${code} signal=${signal}`))
|
||
// child.on('error', e => console.error('Auto-deploy spawn error', e))
|
||
// }
|
||
// }
|
||
// } catch (e) {
|
||
// console.warn('auto-deploy runtime error', e)
|
||
// }
|
||
})
|
||
}
|
||
|
||
// Entry point: any failure while starting the server is fatal.
start().catch(e => {
  console.error('启动失败:', e)
  process.exit(1)
})