// notify-worker.js
// Long-running queue consumer: reads pending events from notify_queue and
// writes them into express_notifications.
// The push-server consumer then polls express_notifications and calls
// CLOUD_FUNC_URL to perform the actual delivery.
//
// Configuration loading priority:
// 1) system environment variables
// 2) CONFIG_FILE/CONFIG_PATH
// 3) notify-worker.config.json in the same directory
// 4) server/.env / server/config.json via server/load-config.js
const fs = require('fs')
const path = require('path')

// Point the config loader at the local config file only when the operator has
// not already selected one explicitly.
const localConfigPath = path.join(__dirname, 'notify-worker.config.json')
if (!process.env.CONFIG_FILE && !process.env.CONFIG_PATH && fs.existsSync(localConfigPath)) {
  process.env.CONFIG_FILE = localConfigPath
}

require('./load-config')

const crypto = require('crypto')

// Use the built-in fetch when the runtime provides one, otherwise fall back to
// the optional 'node-fetch' package.
const fetchImpl = (globalThis.fetch
  ? globalThis.fetch.bind(globalThis)
  : (() => {
    try {
      return require('node-fetch')
    } catch (e) {
      throw new Error("No fetch implementation found. Use Node.js 18+ or install 'node-fetch'.")
    }
  })())

/**
 * Parse a positive integer setting.
 *
 * Unset/empty values silently yield the fallback. Values that are present but
 * not a positive number (e.g. "abc", "0", "-5") log a warning and yield the
 * fallback, so a typo can neither turn the poll delay into a busy loop nor
 * produce an invalid `limit=` query parameter.
 *
 * @param {string} label - Setting name used in the warning message.
 * @param {string|undefined} rawValue - Raw value read from the environment.
 * @param {number} fallback - Value used when rawValue is missing or invalid.
 * @returns {number} A positive integer.
 */
function readPositiveInt(label, rawValue, fallback) {
  if (rawValue === undefined || rawValue === null || rawValue === '') return fallback
  const parsed = Math.floor(Number(rawValue))
  if (Number.isFinite(parsed) && parsed > 0) return parsed
  console.warn(`notify-worker: invalid ${label}=${JSON.stringify(rawValue)}, using default ${fallback}`)
  return fallback
}

// Strip every trailing slash so `${SUPA_URL}/rest/v1/...` never contains '//'.
const SUPA_URL = (process.env.SUPA_URL || process.env.SUPA_URL_OVERRIDE || '').replace(/\/+$/, '')
// Prefer service role key when present (server-side), to avoid RLS issues.
const SUPA_KEY = process.env.SERVICE_ROLE_KEY || process.env.SUPA_KEY || ''
// notify-worker needs to read ml_orders which is often protected by RLS.
// To avoid changing behavior of other services, allow a worker-specific toggle.
const SUPA_USE_BEARER = (
  process.env.NOTIFY_WORKER_SUPA_USE_BEARER ||
  process.env.WORKER_SUPA_USE_BEARER ||
  process.env.SUPA_USE_BEARER ||
  'false'
) === 'true'
const POLL_MS = readPositiveInt(
  'POLL_MS',
  process.env.NOTIFY_WORKER_POLL_MS || process.env.WORKER_POLL_MS,
  2000
)
const BATCH_SIZE = readPositiveInt(
  'BATCH_SIZE',
  process.env.NOTIFY_WORKER_BATCH_SIZE || process.env.WORKER_BATCH_SIZE,
  20
)
const RUN_ONCE = (process.env.RUN_ONCE === 'true' || process.env.RUN_ONCE === '1')

/**
 * Issue a request against the Supabase REST endpoint.
 *
 * The `apikey` and `Accept` headers always override same-named headers passed
 * by the caller.
 *
 * @param {string} restPath - Path and query string appended to `${SUPA_URL}/rest/v1/`.
 * @param {object} [opts] - Options forwarded to fetch (method, headers, body, ...).
 * @param {boolean} [useBearer=false] - Also send `Authorization: Bearer <SUPA_KEY>`.
 * @returns {Promise<Response>} The raw fetch response (not checked for `ok`).
 */
function supaFetch(restPath, opts = {}, useBearer = false) {
  const url = `${SUPA_URL}/rest/v1/${restPath}`
  const headers = { ...(opts.headers || {}), apikey: SUPA_KEY, Accept: 'application/json' }
  if (useBearer) headers.Authorization = `Bearer ${SUPA_KEY}`

  if (process.env.DEBUG_SUPA) {
    // Only the first 10 characters of the key are printed.
    console.log(`[DEBUG] supaFetch: ${opts.method || 'GET'} ${url}`)
    console.log(`[DEBUG] Headers: apikey=${SUPA_KEY ? SUPA_KEY.substring(0, 10) : 'MISSING'}... Authorization=${headers.Authorization ? 'PRESENT' : 'NONE'}`)
  }

  return fetchImpl(url, { ...opts, headers })
}

/**
 * Throw a descriptive error when a response is not OK.
 *
 * @param {Response} resp - Response returned by supaFetch.
 * @param {string} action - Short description used as the error prefix.
 * @returns {Promise<void>}
 * @throws {Error} `<action> failed: HTTP <status> <body>` when `resp.ok` is false.
 */
async function ensureOk(resp, action) {
  if (resp.ok) return
  // Reading the body is best-effort; an unreadable body becomes ''.
  const txt = await resp.text().catch(() => '')
  throw new Error(`${action} failed: HTTP ${resp.status} ${txt}`)
}

/**
 * Return the first element of a row array, or null when there is none.
 *
 * @param {Array<object>|null|undefined} rows - Parsed JSON rows.
 * @returns {object|null}
 */
function firstRowOrNull(rows) {
  return rows && rows[0] ? rows[0] : null
}

/**
 * Hex-encoded SHA-256 digest of the stringified input.
 *
 * @param {*} input - Value to hash; converted with String().
 * @returns {string} 64-character lowercase hex digest.
 */
function sha256Hex(input) {
  return crypto.createHash('sha256').update(String(input)).digest('hex')
}

/**
 * Build a deterministic message id for one audience of one event.
 *
 * @param {{aud: string, waybill_id: *, dedupe_key: *}} parts
 * @returns {string} `evt_<aud>_<first 32 hex chars of sha256("aud|waybill_id|dedupe_key")>`.
 */
function buildMessageId({ aud, waybill_id, dedupe_key }) {
  const h = sha256Hex(`${aud}|${waybill_id}|${dedupe_key}`)
  return `evt_${aud}_${h.slice(0, 32)}`
}

/**
 * Fetch the oldest unprocessed notify_queue rows.
 *
 * @param {number} limit - Maximum number of rows to return.
 * @returns {Promise<Array<object>>} Rows with `processed_at IS NULL`, ordered by created_at ascending.
 * @throws {Error} When the request does not succeed.
 */
async function fetchPendingQueue(limit) {
  const q = `notify_queue?processed_at=is.null&order=created_at.asc&limit=${encodeURIComponent(String(limit))}`
  const resp = await supaFetch(q, { method: 'GET' }, false)
  await ensureOk(resp, 'fetch notify_queue')
  return await resp.json()
}

/**
 * Look up a waybill by id.
 *
 * @param {*} waybillId - Value matched against platform_express_waybills.id.
 * @returns {Promise<object|null>} The waybill row, or null when not found.
 * @throws {Error} When the request does not succeed.
 */
async function fetchWaybill(waybillId) {
  const resp = await supaFetch(
    `platform_express_waybills?id=eq.${encodeURIComponent(waybillId)}&select=id,order_id,order_no,carrier,tracking_no`,
    { method: 'GET' },
    false
  )
  await ensureOk(resp, 'fetch waybill')
  return firstRowOrNull(await resp.json())
}

/**
 * Look up one ml_orders row by a single equality filter.
 *
 * @param {string} column - Column to filter on ('id' or 'order_no').
 * @param {*} value - Value to match.
 * @returns {Promise<object|null>} The order row, or null when not found.
 * @throws {Error} When the request does not succeed.
 */
async function fetchOrderBy(column, value) {
  const resp = await supaFetch(
    `ml_orders?${column}=eq.${encodeURIComponent(value)}&select=id,order_no,user_id,merchant_id`,
    { method: 'GET' },
    SUPA_USE_BEARER
  )
  await ensureOk(resp, `fetch order (${column})`)
  return firstRowOrNull(await resp.json())
}

/**
 * Look up an order by its id.
 *
 * @param {*} orderId
 * @returns {Promise<object|null>}
 * @throws {Error} When the request does not succeed.
 */
async function fetchOrderById(orderId) {
  return fetchOrderBy('id', orderId)
}

/**
 * Look up an order by its order number.
 *
 * @param {*} orderNo
 * @returns {Promise<object|null>}
 * @throws {Error} When the request does not succeed.
 */
async function fetchOrderByNo(orderNo) {
  return fetchOrderBy('order_no', orderNo)
}

/**
 * Insert a notification, merging into the existing row when message_id
 * already exists (on_conflict=message_id with resolution=merge-duplicates).
 *
 * @param {object} body - Row to write into express_notifications.
 * @returns {Promise<object|null>} The first returned row, the parsed body when
 *   it is not an array, or null when the body cannot be parsed as JSON.
 * @throws {Error} When the request does not succeed.
 */
async function upsertExpressNotification(body) {
  const resp = await supaFetch('express_notifications?on_conflict=message_id', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      Prefer: 'return=representation,resolution=merge-duplicates'
    },
    body: JSON.stringify(body)
  })
  await ensureOk(resp, 'insert express_notifications')
  const rows = await resp.json().catch(() => null)
  return Array.isArray(rows) ? rows[0] : rows
}

/**
 * Stamp a queue row as processed and apply the given status fields.
 *
 * @param {*} id - notify_queue.id of the row to update.
 * @param {object} patch - Extra columns to set (e.g. process_status, last_error).
 * @returns {Promise<void>}
 * @throws {Error} When the request does not succeed.
 */
async function markQueueProcessed(id, patch) {
  const resp = await supaFetch(`notify_queue?id=eq.${encodeURIComponent(id)}`, {
    method: 'PATCH',
    headers: { 'Content-Type': 'application/json' },
    // processed_at is applied last so the caller cannot override it.
    body: JSON.stringify({ ...patch, processed_at: new Date().toISOString() })
  })
  await ensureOk(resp, 'patch notify_queue')
}

/**
 * Turn one queue row into notifications for the order's user and merchant.
 *
 * Rows that cannot be resolved (no waybill id, waybill not found, order not
 * found, no recipient) are marked 'skipped' with the reason in last_error.
 * Otherwise one notification per recipient is upserted and the row is marked
 * 'queued'.
 *
 * @param {object} row - A notify_queue row.
 * @returns {Promise<void>}
 * @throws {Error} When any underlying request does not succeed.
 */
async function processOne(row) {
  const waybillId = row.waybill_id
  if (!waybillId) {
    await markQueueProcessed(row.id, { process_status: 'skipped', last_error: 'missing waybill_id' })
    return
  }

  const waybill = await fetchWaybill(waybillId)
  if (!waybill) {
    await markQueueProcessed(row.id, { process_status: 'skipped', last_error: 'waybill not found' })
    return
  }

  // Resolve the order by id first, then fall back to the order number.
  let order = null
  if (waybill.order_id) order = await fetchOrderById(waybill.order_id)
  if (!order && waybill.order_no) order = await fetchOrderByNo(waybill.order_no)
  if (!order) {
    await markQueueProcessed(row.id, { process_status: 'skipped', last_error: 'order not found for waybill' })
    return
  }

  const recipients = []
  if (order.user_id) recipients.push({ aud: 'user', recipient_id: order.user_id })
  if (order.merchant_id) recipients.push({ aud: 'merchant', recipient_id: order.merchant_id })
  if (recipients.length === 0) {
    await markQueueProcessed(row.id, { process_status: 'skipped', last_error: 'no recipient_id resolved from order' })
    return
  }

  const payload = {
    biz: 'express',
    order_no: order.order_no || waybill.order_no || null,
    order_id: order.id || waybill.order_id || null,
    waybill_id: waybill.id,
    carrier: waybill.carrier || row.carrier || null,
    tracking_no: waybill.tracking_no || row.tracking_no || null,
    status_code: row.status_code,
    event_time: row.event_time,
    event_id: row.event_id || null
  }

  // Write one notification per audience (message_id is distinguished by aud
  // to ensure idempotency).
  // NOTE(review): a missing row.dedupe_key is stringified as "undefined", so
  // all such events for the same waybill/audience would share one message_id —
  // confirm the queue always populates dedupe_key.
  for (const r of recipients) {
    const message_id = buildMessageId({ aud: r.aud, waybill_id: waybill.id, dedupe_key: row.dedupe_key })
    const dedupe_key = `${waybill.id}|${r.aud}|${row.dedupe_key}`.slice(0, 256)
    await upsertExpressNotification({
      aud: r.aud,
      recipient_id: r.recipient_id,
      order_id: order.id || null,
      waybill_id: waybill.id,
      tracking_no: waybill.tracking_no || null,
      carrier: waybill.carrier || null,
      message_id,
      event_text_safe: row.event_text || null,
      status_code: row.status_code,
      event_time: row.event_time || null,
      payload,
      dedupe_key
    })
  }

  await markQueueProcessed(row.id, { process_status: 'queued', last_error: null })
}

/**
 * Process one batch of pending queue rows sequentially.
 *
 * A row whose processing throws is marked 'failed' (best-effort) and still
 * counted, so one bad row never blocks the rest of the batch.
 *
 * @returns {Promise<{processed: number}>} Number of rows attempted.
 * @throws {Error} When the pending rows cannot be fetched.
 */
async function loopOnce() {
  const rows = await fetchPendingQueue(BATCH_SIZE)
  if (!rows || rows.length === 0) return { processed: 0 }

  let processed = 0
  for (const row of rows) {
    try {
      await processOne(row)
    } catch (e) {
      const msg = (e && e.message) ? e.message : String(e)
      try {
        await markQueueProcessed(row.id, { process_status: 'failed', last_error: msg.substring(0, 2000) })
      } catch (patchErr) {
        console.warn('failed to mark queue row failed:', patchErr)
      }
      console.warn('processOne failed:', msg)
    }
    // Counted whether the row succeeded or failed.
    processed += 1
  }
  return { processed }
}

/**
 * Entry point: validate configuration, then run a single batch (RUN_ONCE) or
 * poll forever with POLL_MS between batches.
 *
 * @returns {Promise<void>}
 */
async function main() {
  if (!SUPA_URL || !SUPA_KEY) {
    console.error('SUPA_URL and SUPA_KEY must be set in env')
    process.exit(1)
  }

  console.log('notify-worker starting...')
  console.log('ENV: SUPA_URL=', SUPA_URL)
  console.log('ENV: SUPA_USE_BEARER=', SUPA_USE_BEARER)
  console.log('ENV: POLL_MS=', POLL_MS, 'BATCH_SIZE=', BATCH_SIZE, 'RUN_ONCE=', RUN_ONCE)

  if (RUN_ONCE) {
    const r = await loopOnce()
    console.log('notify-worker done (run once):', r)
    return
  }

  // Long-running polling loop; a failed iteration is logged and retried after
  // the usual delay.
  // eslint-disable-next-line no-constant-condition
  while (true) {
    try {
      const r = await loopOnce()
      if (r.processed > 0) console.log('notify-worker processed:', r.processed)
    } catch (e) {
      console.warn('notify-worker loop error:', e && e.message ? e.message : e)
    }
    await new Promise(resolve => setTimeout(resolve, POLL_MS))
  }
}

main().catch(e => {
  console.error('notify-worker fatal:', e)
  process.exit(1)
})