Files
medical-mall/server/notify-worker.js
2026-03-16 14:58:00 +08:00

281 lines
9.2 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// notify-worker.js
// Long-running queue consumer: reads pending events from notify_queue and writes them to express_notifications.
// The push-server consumer keeps polling express_notifications and calls CLOUD_FUNC_URL to perform the actual delivery.
//
// Configuration load priority:
// 1) System environment variables
// 2) CONFIG_FILE/CONFIG_PATH
// 3) notify-worker.config.json in the same directory
// 4) server/.env / server/config.json via server/load-config.js
const fs = require('fs')
const path = require('path')
// Point CONFIG_FILE at the config file sitting next to this script, but only
// when neither CONFIG_FILE nor CONFIG_PATH is already set and the file exists.
const localConfigPath = path.join(__dirname, 'notify-worker.config.json')
const hasExplicitConfig = Boolean(process.env.CONFIG_FILE || process.env.CONFIG_PATH)
if (!hasExplicitConfig && fs.existsSync(localConfigPath)) {
  process.env.CONFIG_FILE = localConfigPath
}
require('./load-config')
const crypto = require('crypto')
/**
 * Pick the fetch implementation used by this worker: the runtime's global
 * fetch (bound to globalThis) when it exists, otherwise the 'node-fetch'
 * package.
 *
 * @returns {Function} A fetch-compatible function.
 * @throws {Error} When there is no global fetch and 'node-fetch' cannot be loaded.
 */
function resolveFetchImpl() {
  if (globalThis.fetch) {
    return globalThis.fetch.bind(globalThis)
  }
  try {
    return require('node-fetch')
  } catch (e) {
    throw new Error("No fetch implementation found. Use Node.js 18+ or install 'node-fetch'.")
  }
}
const fetchImpl = resolveFetchImpl()
/**
 * Read a numeric setting from the first non-empty environment variable listed
 * in `names`.
 *
 * @param {string[]} names - Environment variable names, in priority order.
 * @param {number} fallback - Value used when no variable is set or the value is invalid.
 * @param {(value: number) => boolean} isValid - Accepts or rejects the parsed number.
 * @returns {number} The parsed value, or `fallback` (a warning is logged when a
 *   value was present but rejected).
 */
function readNumericEnv(names, fallback, isValid) {
  const name = names.find((key) => process.env[key])
  if (name === undefined) return fallback
  const raw = process.env[name]
  const parsed = Number(raw)
  if (isValid(parsed)) return parsed
  console.warn(`notify-worker: ignoring invalid ${name}=${JSON.stringify(raw)}, using default ${fallback}`)
  return fallback
}

// Trailing slashes are stripped so that `${SUPA_URL}/rest/v1/...` never contains "//".
const SUPA_URL = (process.env.SUPA_URL || process.env.SUPA_URL_OVERRIDE || '').replace(/\/+$/, '')
// Prefer service role key when present (server-side), to avoid RLS issues.
const SUPA_KEY = process.env.SERVICE_ROLE_KEY || process.env.SUPA_KEY || ''
// notify-worker needs to read ml_orders which is often protected by RLS.
// To avoid changing behavior of other services, allow a worker-specific toggle.
const SUPA_USE_BEARER = (
  process.env.NOTIFY_WORKER_SUPA_USE_BEARER ||
  process.env.WORKER_SUPA_USE_BEARER ||
  process.env.SUPA_USE_BEARER ||
  'false'
) === 'true'
// A non-numeric or negative delay would otherwise reach setTimeout() as NaN or a
// negative number and turn the polling loop into a busy loop.
const POLL_MS = readNumericEnv(
  ['NOTIFY_WORKER_POLL_MS', 'WORKER_POLL_MS'],
  2000,
  (ms) => Number.isFinite(ms) && ms >= 0
)
// The batch size is sent as the `limit` query parameter, so it must be a positive integer.
const BATCH_SIZE = readNumericEnv(
  ['NOTIFY_WORKER_BATCH_SIZE', 'WORKER_BATCH_SIZE'],
  20,
  (size) => Number.isInteger(size) && size > 0
)
const RUN_ONCE = (process.env.RUN_ONCE === 'true' || process.env.RUN_ONCE === '1')
/**
 * Issue a request against `${SUPA_URL}/rest/v1/<restPath>`.
 *
 * Caller-supplied headers are kept, except that `apikey` and `Accept` are
 * always set here; `Authorization: Bearer <SUPA_KEY>` is added only when
 * `useBearer` is truthy.
 *
 * @param {string} restPath - Path and query string appended after /rest/v1/.
 * @param {object} [opts] - Options forwarded to fetchImpl (method, headers, body, ...).
 * @param {boolean} [useBearer=false] - Whether to send SUPA_KEY as a bearer token.
 * @returns {*} Whatever fetchImpl returns for the request.
 */
function supaFetch(restPath, opts = {}, useBearer = false) {
  const url = `${SUPA_URL}/rest/v1/${restPath}`

  const headers = {
    ...(opts.headers || {}),
    apikey: SUPA_KEY,
    Accept: 'application/json'
  }
  if (useBearer) {
    headers.Authorization = `Bearer ${SUPA_KEY}`
  }

  if (process.env.DEBUG_SUPA) {
    const verb = opts.method || 'GET'
    const keyPreview = SUPA_KEY ? SUPA_KEY.substring(0, 10) : 'MISSING'
    const authState = headers.Authorization ? 'PRESENT' : 'NONE'
    console.log(`[DEBUG] supaFetch: ${verb} ${url}`)
    console.log(`[DEBUG] Headers: apikey=${keyPreview}... Authorization=${authState}`)
  }

  return fetchImpl(url, { ...opts, headers })
}
/**
 * Compute the SHA-256 digest of a value, hex-encoded.
 *
 * @param {*} input - Value to hash; coerced with String() first.
 * @returns {string} Hex digest.
 */
function sha256Hex(input) {
  const hasher = crypto.createHash('sha256')
  hasher.update(String(input))
  return hasher.digest('hex')
}
/**
 * Derive a message id of the form `evt_<aud>_<32 hex chars>`, where the hex
 * part is the first 32 characters of sha256Hex("<aud>|<waybill_id>|<dedupe_key>").
 *
 * @param {{aud: *, waybill_id: *, dedupe_key: *}} parts - Values folded into the id.
 * @returns {string} The message id.
 */
function buildMessageId({ aud, waybill_id, dedupe_key }) {
  const fingerprint = `${aud}|${waybill_id}|${dedupe_key}`
  const digestPrefix = sha256Hex(fingerprint).slice(0, 32)
  return `evt_${aud}_${digestPrefix}`
}
/**
 * Load notify_queue rows whose processed_at is null, ordered by created_at
 * ascending.
 *
 * @param {number} limit - Maximum number of rows to request.
 * @returns {Promise<*>} The parsed JSON response body.
 * @throws {Error} When the response is not ok; the message carries the HTTP
 *   status and response text.
 */
async function fetchPendingQueue(limit) {
  const params = [
    'processed_at=is.null',
    'order=created_at.asc',
    `limit=${encodeURIComponent(String(limit))}`
  ]
  const response = await supaFetch(`notify_queue?${params.join('&')}`, { method: 'GET' }, false)

  if (!response.ok) {
    // Best effort: an unreadable error body becomes an empty string.
    const detail = await response.text().catch(() => '')
    throw new Error(`fetch notify_queue failed: HTTP ${response.status} ${detail}`)
  }

  const rows = await response.json()
  return rows
}
/**
 * Look up one platform_express_waybills row by id.
 *
 * @param {*} waybillId - Waybill id; URL-encoded into the query.
 * @returns {Promise<object|null>} The first returned row, or null when there is none.
 * @throws {Error} When the response is not ok.
 */
async function fetchWaybill(waybillId) {
  const columns = 'id,order_id,order_no,carrier,tracking_no'
  const query = `platform_express_waybills?id=eq.${encodeURIComponent(waybillId)}&select=${columns}`
  const response = await supaFetch(query, { method: 'GET' }, false)

  if (!response.ok) {
    const detail = await response.text().catch(() => '')
    throw new Error(`fetch waybill failed: HTTP ${response.status} ${detail}`)
  }

  const rows = await response.json()
  return (rows && rows[0]) || null
}
/**
 * Look up one ml_orders row by its id. SUPA_USE_BEARER is passed to supaFetch
 * as the bearer flag.
 *
 * @param {*} orderId - Order id; URL-encoded into the query.
 * @returns {Promise<object|null>} The first returned row, or null when there is none.
 * @throws {Error} When the response is not ok.
 */
async function fetchOrderById(orderId) {
  const filter = `id=eq.${encodeURIComponent(orderId)}`
  const response = await supaFetch(
    `ml_orders?${filter}&select=id,order_no,user_id,merchant_id`,
    { method: 'GET' },
    SUPA_USE_BEARER
  )

  if (!response.ok) {
    const detail = await response.text().catch(() => '')
    throw new Error(`fetch order (id) failed: HTTP ${response.status} ${detail}`)
  }

  const rows = await response.json()
  if (rows && rows[0]) {
    return rows[0]
  }
  return null
}
/**
 * Look up one ml_orders row by its order_no. SUPA_USE_BEARER is passed to
 * supaFetch as the bearer flag.
 *
 * @param {*} orderNo - Order number; URL-encoded into the query.
 * @returns {Promise<object|null>} The first returned row, or null when there is none.
 * @throws {Error} When the response is not ok.
 */
async function fetchOrderByNo(orderNo) {
  const encodedNo = encodeURIComponent(orderNo)
  const query = `ml_orders?order_no=eq.${encodedNo}&select=id,order_no,user_id,merchant_id`
  const response = await supaFetch(query, { method: 'GET' }, SUPA_USE_BEARER)

  if (response.ok) {
    const rows = await response.json()
    return (rows && rows[0]) || null
  }

  const detail = await response.text().catch(() => '')
  throw new Error(`fetch order (order_no) failed: HTTP ${response.status} ${detail}`)
}
/**
 * POST one row to express_notifications with `on_conflict=message_id` and a
 * `Prefer` header asking for duplicates to be merged and the row returned.
 *
 * @param {object} body - Row to write; sent as JSON.
 * @returns {Promise<*>} First element when the response body is an array,
 *   otherwise the parsed body itself (null when it cannot be parsed).
 * @throws {Error} When the response is not ok.
 */
async function upsertExpressNotification(body) {
  const request = {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      Prefer: 'return=representation,resolution=merge-duplicates'
    },
    body: JSON.stringify(body)
  }
  const response = await supaFetch('express_notifications?on_conflict=message_id', request)

  if (!response.ok) {
    const detail = await response.text().catch(() => '')
    throw new Error(`insert express_notifications failed: HTTP ${response.status} ${detail}`)
  }

  // An unparsable body is tolerated and reported as null.
  const parsed = await response.json().catch(() => null)
  if (Array.isArray(parsed)) {
    return parsed[0]
  }
  return parsed
}
/**
 * PATCH a notify_queue row with the given fields and stamp processed_at with
 * the current time (ISO 8601). A processed_at supplied in `patch` is overwritten.
 *
 * @param {*} id - Queue row id; URL-encoded into the filter.
 * @param {object} patch - Fields to write alongside processed_at.
 * @returns {Promise<void>}
 * @throws {Error} When the response is not ok.
 */
async function markQueueProcessed(id, patch) {
  const update = { ...patch, processed_at: new Date().toISOString() }
  const target = `notify_queue?id=eq.${encodeURIComponent(id)}`

  const response = await supaFetch(target, {
    method: 'PATCH',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(update)
  })
  if (response.ok) return

  const detail = await response.text().catch(() => '')
  throw new Error(`patch notify_queue failed: HTTP ${response.status} ${detail}`)
}
/**
 * Turn one notify_queue row into express_notifications rows.
 *
 * Steps: load the waybill named by `row.waybill_id`, resolve its order (by
 * order_id first, then by order_no), and upsert one notification per audience
 * found on the order ('user' via user_id, 'merchant' via merchant_id). The
 * queue row is then marked with process_status 'queued'. When the waybill id,
 * the waybill, the order or every recipient is missing, the row is marked
 * 'skipped' with the reason in last_error and nothing is written.
 *
 * @param {object} row - A notify_queue row.
 * @returns {Promise<void>}
 * @throws {Error} Whatever the lookup, upsert or mark helpers throw.
 */
async function processOne(row) {
  // Every early exit records why the row was skipped.
  const skip = (reason) => markQueueProcessed(row.id, { process_status: 'skipped', last_error: reason })

  const waybillId = row.waybill_id
  if (!waybillId) {
    await skip('missing waybill_id')
    return
  }

  const waybill = await fetchWaybill(waybillId)
  if (!waybill) {
    await skip('waybill not found')
    return
  }

  // Resolve the order by id first and fall back to the order number.
  let order = null
  if (waybill.order_id) order = await fetchOrderById(waybill.order_id)
  if (!order && waybill.order_no) order = await fetchOrderByNo(waybill.order_no)
  if (!order) {
    await skip('order not found for waybill')
    return
  }

  const recipients = []
  if (order.user_id) recipients.push({ aud: 'user', recipient_id: order.user_id })
  if (order.merchant_id) recipients.push({ aud: 'merchant', recipient_id: order.merchant_id })
  if (recipients.length === 0) {
    await skip('no recipient_id resolved from order')
    return
  }

  const payload = {
    biz: 'express',
    order_no: order.order_no || waybill.order_no || null,
    order_id: order.id || waybill.order_id || null,
    waybill_id: waybill.id,
    carrier: waybill.carrier || row.carrier || null,
    tracking_no: waybill.tracking_no || row.tracking_no || null,
    status_code: row.status_code,
    event_time: row.event_time,
    event_id: row.event_id || null
  }

  // A missing dedupe_key would be stringified as "undefined"/"null", so every
  // such event for one waybill would share a message_id and overwrite the
  // previous notification on upsert. Fall back to the event id, then to the
  // queue row id, so distinct events stay distinct.
  let eventKey = row.dedupe_key
  if (eventKey == null || eventKey === '') {
    const fallback = [row.event_id, row.id].find((v) => v != null && v !== '')
    if (fallback !== undefined) eventKey = fallback
  }

  // Write one notification per audience; message_id differs per aud so the
  // upsert stays idempotent.
  for (const r of recipients) {
    const message_id = buildMessageId({ aud: r.aud, waybill_id: waybill.id, dedupe_key: eventKey })
    const dedupe_key = `${waybill.id}|${r.aud}|${eventKey}`.slice(0, 256)
    await upsertExpressNotification({
      aud: r.aud,
      recipient_id: r.recipient_id,
      order_id: order.id || null,
      waybill_id: waybill.id,
      tracking_no: waybill.tracking_no || null,
      carrier: waybill.carrier || null,
      message_id,
      event_text_safe: row.event_text || null,
      status_code: row.status_code,
      event_time: row.event_time || null,
      payload,
      dedupe_key
    })
  }

  await markQueueProcessed(row.id, { process_status: 'queued', last_error: null })
}
/**
 * Fetch one batch (up to BATCH_SIZE rows) of pending queue rows and run
 * processOne on each, one after another.
 *
 * A row whose processing throws is marked 'failed' with the error message
 * (cut to 2000 characters) and still counts as processed; a failure to write
 * that mark is only logged.
 *
 * @returns {Promise<{processed: number}>} Number of rows handled in this pass.
 * @throws {Error} When fetching the batch itself fails.
 */
async function loopOnce() {
  const pending = await fetchPendingQueue(BATCH_SIZE)
  if (!pending || pending.length === 0) {
    return { processed: 0 }
  }

  let handled = 0
  for (const item of pending) {
    try {
      await processOne(item)
    } catch (err) {
      const reason = (err && err.message) ? err.message : String(err)
      try {
        await markQueueProcessed(item.id, { process_status: 'failed', last_error: reason.substring(0, 2000) })
      } catch (patchErr) {
        console.warn('failed to mark queue row failed:', patchErr)
      }
      console.warn('processOne failed:', reason)
    }
    // Successful and failed rows are both counted.
    handled += 1
  }
  return { processed: handled }
}
/**
 * Worker entry point.
 *
 * Exits the process with code 1 when SUPA_URL or SUPA_KEY is empty. Otherwise
 * logs the effective settings and either runs a single pass (RUN_ONCE) or
 * polls forever, waiting POLL_MS milliseconds between passes. Errors from a
 * pass in polling mode are logged and do not stop the loop.
 *
 * @returns {Promise<void>} Resolves only in RUN_ONCE mode.
 */
async function main() {
  if (!(SUPA_URL && SUPA_KEY)) {
    console.error('SUPA_URL and SUPA_KEY must be set in env')
    process.exit(1)
  }

  console.log('notify-worker starting...')
  console.log('ENV: SUPA_URL=', SUPA_URL)
  console.log('ENV: SUPA_USE_BEARER=', SUPA_USE_BEARER)
  console.log('ENV: POLL_MS=', POLL_MS, 'BATCH_SIZE=', BATCH_SIZE, 'RUN_ONCE=', RUN_ONCE)

  if (RUN_ONCE) {
    const summary = await loopOnce()
    console.log('notify-worker done (run once):', summary)
    return
  }

  const pause = () => new Promise((wake) => setTimeout(wake, POLL_MS))

  // Resident polling loop.
  for (;;) {
    try {
      const summary = await loopOnce()
      if (summary.processed > 0) console.log('notify-worker processed:', summary.processed)
    } catch (e) {
      console.warn('notify-worker loop error:', e && e.message ? e.message : e)
    }
    await pause()
  }
}
// Start the worker; any rejection escaping main() is logged and the process
// exits with code 1.
function reportFatal(err) {
  console.error('notify-worker fatal:', err)
  process.exit(1)
}
main().catch(reportFatal)