// Listing metadata from the source viewer: 332 lines, 12 KiB, JavaScript.
// Configuration bootstrap: populate process.env before anything below reads it.
// Sources, highest priority first:
//   1) Real environment variables
//   2) CONFIG_FILE / CONFIG_PATH (explicit)
//   3) Local file next to this script: webhook.config.json
//   4) server/.env / server/config.json via server/load-config.js
const fs = require('fs')
const path = require('path')

const localConfigPath = path.join(__dirname, 'webhook.config.json')

// Only fall back to the local file when no config location was chosen
// explicitly and the local file is actually present on disk.
if (!(process.env.CONFIG_FILE || process.env.CONFIG_PATH)) {
  if (fs.existsSync(localConfigPath)) {
    process.env.CONFIG_FILE = localConfigPath
  }
}

// Required for its side effects only; presumably this is what reads
// CONFIG_FILE and fills process.env (verify in server/load-config).
require('../../../../server/load-config')

const express = require('express')
const bodyParser = require('body-parser')

// HTTP client used for every Supabase call: the runtime's built-in fetch when
// there is one, otherwise the optional 'node-fetch' package.
const fetch = (() => {
  if (globalThis.fetch) {
    // Bound so it can be called as a bare function later on.
    return globalThis.fetch.bind(globalThis)
  }
  try {
    // Older Node versions ship without a global fetch.
    return require('node-fetch')
  } catch (e) {
    throw new Error("No fetch implementation found. Use Node.js 18+ or install 'node-fetch'.")
  }
})()
const crypto = require('crypto')

// A service-specific WEBHOOK_PORT is honoured first so this process does not
// clash with push-server when both share server/config.json.
const PORT = [process.env.WEBHOOK_PORT, process.env.PORT].find(Boolean) || 7201

// Supabase base URL with one trailing slash removed.
// NOTE(review): SUPA_URL_OVERRIDE is only consulted when SUPA_URL is unset,
// which is the opposite of what its name suggests — confirm this is intended.
const SUPA_URL = (() => {
  const configured = process.env.SUPA_URL || process.env.SUPA_URL_OVERRIDE || ''
  return configured.replace(/\/$/, '')
})()

// The service role key is preferred when present (server-side), to avoid RLS issues.
const SUPA_KEY = [process.env.SERVICE_ROLE_KEY, process.env.SUPA_KEY].find(Boolean) || ''

// Optional HMAC secret; an empty string means signatures are not checked.
const WEBHOOK_SECRET = process.env.WEBHOOK_SECRET ? process.env.WEBHOOK_SECRET : ''

/**
 * Send a request to the Supabase REST API under `${SUPA_URL}/rest/v1/`.
 *
 * Headers supplied by the caller are kept, except that `apikey` and `Accept`
 * are always set here. By default only the apikey header is sent (compatible
 * with self-hosted Supabase/Kong key-auth); `Authorization: Bearer` is added
 * only when the SUPA_USE_BEARER environment variable is exactly 'true'.
 *
 * @param {string} path - Resource path plus query string, appended after /rest/v1/.
 * @param {object} [opts] - Options forwarded to fetch (method, headers, body, ...).
 * @returns {*} Whatever the underlying fetch call returns.
 */
function supaFetch(path, opts = {}) {
  const headers = {
    ...(opts.headers || {}),
    apikey: SUPA_KEY,
    Accept: 'application/json'
  }
  if (process.env.SUPA_USE_BEARER === 'true') {
    headers.Authorization = `Bearer ${SUPA_KEY}`
  }
  return fetch(`${SUPA_URL}/rest/v1/${path}`, { ...opts, headers })
}

/**
 * Compute the expected webhook signature: hex HMAC-SHA256, keyed with
 * WEBHOOK_SECRET, over the body text immediately followed by the timestamp.
 *
 * @param {string} bodyText - Request body text.
 * @param {string} [ts] - Timestamp value; a falsy value contributes nothing.
 * @returns {string} Hex digest, or '' when no WEBHOOK_SECRET is configured.
 */
function computeSignature(bodyText, ts) {
  if (!WEBHOOK_SECRET) return ''
  const signedText = bodyText + (ts || '')
  return crypto
    .createHmac('sha256', WEBHOOK_SECRET)
    .update(signedText)
    .digest('hex')
}

/**
 * Build a deterministic de-duplication key for a tracking event.
 *
 * Purpose: a webhook that is delivered more than once for the same event must
 * not be stored twice, which would otherwise cause duplicate queueing and
 * duplicate notifications. The key is derived only from the fields passed in;
 * callers should pass stable values and never volatile ones such as the
 * current time.
 *
 * @param {object} fields
 * @param {*} fields.tracking_no
 * @param {*} fields.carrier
 * @param {*} fields.status_code
 * @param {*} fields.event_time
 * @param {*} fields.event_code
 * @param {*} fields.event_text - Trimmed and cut to 200 characters before hashing.
 *   Non-string values (e.g. a numeric remark in the JSON payload) are converted
 *   with String() instead of throwing.
 * @returns {string} 'WH_' followed by the first 32 hex characters of a SHA-256 digest.
 */
function stableEventDedupeKey({ tracking_no, carrier, status_code, event_time, event_code, event_text }) {
  // String() guards against truthy non-string text, which has no .trim().
  const normalizedText = String(event_text || '').trim().slice(0, 200)

  // Property order is part of the hash input; do not reorder.
  const base = JSON.stringify({
    tracking_no: tracking_no || null,
    carrier: carrier || null,
    status_code: status_code || null,
    event_time: event_time || null,
    event_code: event_code || null,
    event_text: normalizedText
  })
  const hex = crypto.createHash('sha256').update(base).digest('hex')
  return 'WH_' + hex.slice(0, 32)
}

/**
 * Store the raw webhook payload in `platform_express_event_raw` (best-effort).
 *
 * Despite the name this issues a plain POST (insert) with no conflict target.
 * A row with extra metadata columns is tried first; if the API answers HTTP 400
 * and the error text indicates an unknown column (older database schema), the
 * insert is retried once with only the minimal columns.
 *
 * @param {*} payload - Parsed request body, stored in the `body` column.
 * @param {*} tracking_no - Tracking number; falsy values are stored as null.
 * @param {*} carrier - Carrier identifier; falsy values are stored as null.
 * @param {boolean} signature_valid - Result of the signature check.
 * @param {object|null} [meta] - Optional request metadata: client_id, signature,
 *   ts_header, request_id, remote_ip, headers, dedupe_key. Missing or falsy
 *   entries are stored as null.
 * @returns {Promise<*>} The response of the last insert attempt (its body may
 *   already have been read when the attempt failed), or null if an exception
 *   was thrown. Never rejects.
 */
async function upsertRaw(payload, tracking_no, carrier, signature_valid, meta = null) {
  try {
    const minimal = {
      carrier: carrier || null,
      tracking_no: tracking_no || null,
      body: payload,
      received_at: new Date().toISOString(),
      signature_valid: signature_valid
    }

    // Richer row for the upgraded schema.
    const details = meta || {}
    const extended = {
      ...minimal,
      source: 'webhook',
      client_id: details.client_id || null,
      signature: details.signature || null,
      ts_header: details.ts_header || null,
      request_id: details.request_id || null,
      remote_ip: details.remote_ip || null,
      headers: details.headers || null,
      dedupe_key: details.dedupe_key || null
    }

    const tryInsert = (row) => supaFetch('platform_express_event_raw', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(row)
    })

    let resp = await tryInsert(extended)
    if (resp && !resp.ok) {
      const txt = await resp.text().catch(() => '')
      // Unknown-column errors come in two shapes: the Postgres message
      // ("column ... does not exist") and PostgREST's schema-cache error
      // PGRST204 ("Could not find the '...' column of '...'").
      const missingColumn = /column .* does not exist|PGRST204|could not find the .* column/i.test(txt)
      if (resp.status === 400 && missingColumn) {
        resp = await tryInsert(minimal)
      }
    }
    return resp
  } catch (e) {
    console.warn('upsertRaw error', e)
    return null
  }
}

/**
 * Look up a waybill row in `platform_express_waybills`.
 *
 * The tracking number is tried first; the order number is tried only when the
 * tracking number is falsy or matched no rows. Falsy identifiers are skipped.
 *
 * @param {*} tracking_no - Tracking number to match with `eq`.
 * @param {*} order_no - Order number to match with `eq`.
 * @returns {Promise<object|null>} The first matching row, or null when nothing matched.
 * @throws {Error} When a query returns a non-OK response; the error carries the
 *   HTTP status in `.status`. Any error is logged and then rethrown.
 */
async function findWaybill(tracking_no, order_no) {
  const lookups = [
    ['tracking_no', tracking_no],
    ['order_no', order_no]
  ]
  try {
    for (const [column, value] of lookups) {
      if (!value) continue

      const reply = await supaFetch(`platform_express_waybills?${column}=eq.${encodeURIComponent(value)}&select=id,carrier,tracking_no,order_no,order_id`)
      if (!reply.ok) {
        const detail = await reply.text().catch(() => '')
        const failure = new Error(`Supabase query failed (${column}): HTTP ${reply.status} ${detail}`)
        failure.status = reply.status
        throw failure
      }

      const rows = await reply.json()
      if (rows && rows.length > 0) return rows[0]
    }
    return null
  } catch (e) {
    console.warn('findWaybill error', e && e.message ? e.message : e)
    throw e
  }
}

/**
 * Translate an incoming status/event code into one of the internal status codes.
 *
 * Matching is case-insensitive. Known carrier codes are mapped, internal codes
 * pass through unchanged, and anything else (including a falsy input) yields
 * 'IN_TRANSIT'.
 *
 * @param {*} inStatus - Incoming code; converted with String() before matching.
 * @returns {string} One of ORDER_PLACED, SHIPPED, IN_TRANSIT, OUT_FOR_DELIVERY,
 *   READY_FOR_PICKUP, DELIVERED, EXCEPTION, RETURNED.
 */
function mapStatus(inStatus) {
  if (!inStatus) return 'IN_TRANSIT'

  const code = String(inStatus).toUpperCase()
  switch (code) {
    case 'GOT':
    case 'SEND':
    case 'TRANSIT':
      return 'IN_TRANSIT'
    case 'SENT':
      return 'OUT_FOR_DELIVERY'
    case 'PICKUP':
      return 'READY_FOR_PICKUP'
    case 'SIGNED':
    case 'DELIVERED':
      return 'DELIVERED'
    case 'FAILED':
    case 'EXCEPTION':
      return 'EXCEPTION'
    case 'RETURNED':
      return 'RETURNED'
    // Remaining internal codes are accepted as-is.
    case 'ORDER_PLACED':
    case 'SHIPPED':
    case 'IN_TRANSIT':
    case 'OUT_FOR_DELIVERY':
    case 'READY_FOR_PICKUP':
      return code
    default:
      return 'IN_TRANSIT'
  }
}

/**
 * Write the latest status onto a waybill row (best-effort).
 *
 * Sends a PATCH to `platform_express_waybills` filtered by id, setting
 * current_status_code, current_status_text and last_synced_at (current time).
 * Failures never propagate: a thrown error or a non-OK HTTP response is only
 * logged, so a failed update does not abort the caller.
 *
 * @param {*} id - Waybill id used in the `id=eq.` filter.
 * @param {string} status_code - New internal status code.
 * @param {string} text - Human-readable status text.
 * @returns {Promise<void>}
 */
async function updateWaybill(id, status_code, text) {
  try {
    const now = new Date().toISOString()
    const resp = await supaFetch(`platform_express_waybills?id=eq.${encodeURIComponent(id)}`, {
      method: 'PATCH',
      headers: { 'Content-Type': 'application/json', Prefer: 'return=representation' },
      body: JSON.stringify({ current_status_code: status_code, current_status_text: text, last_synced_at: now })
    })
    // fetch resolves on 4xx/5xx too, so a rejected update must be detected here.
    if (resp && !resp.ok) {
      const detail = await resp.text().catch(() => '')
      console.warn('updateWaybill failed:', `HTTP ${resp.status}`, detail)
    }
  } catch (e) {
    console.warn('updateWaybill error', e)
  }
}

/**
 * Insert one tracking event into `platform_express_tracking_events`.
 *
 * The insert is idempotent: the request names (waybill_id, dedupe_key) as the
 * conflict target and asks the API to ignore duplicates.
 *
 * @param {object} event - Row to insert; serialized as the JSON request body.
 * @returns {Promise<{ok: boolean, status?: number, body?: string}>}
 *   `{ ok: true }` on an OK response; otherwise `ok: false` with the HTTP
 *   status and response text, or status 0 and the error message when an
 *   exception was thrown. Never rejects.
 */
async function insertEvent(event) {
  try {
    const endpoint = 'platform_express_tracking_events?on_conflict=waybill_id,dedupe_key'
    const reply = await supaFetch(endpoint, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json', Prefer: 'resolution=ignore-duplicates' },
      body: JSON.stringify(event)
    })

    if (reply.ok) return { ok: true }

    const detail = await reply.text().catch(() => '')
    console.warn('insertEvent failed:', `HTTP ${reply.status}`, detail)
    return { ok: false, status: reply.status, body: detail }
  } catch (err) {
    console.warn('insertEvent error', err)
    const reason = (err && err.message) ? err.message : String(err)
    return { ok: false, status: 0, body: reason }
  }
}

/**
 * Compare the expected and received signature strings in constant time, so a
 * caller cannot learn how many leading characters were correct from timing.
 */
function signaturesMatch(expected, received) {
  const expectedBuf = Buffer.from(String(expected), 'utf8')
  const receivedBuf = Buffer.from(String(received), 'utf8')
  // timingSafeEqual throws on a length mismatch, so rule that out first.
  if (expectedBuf.length !== receivedBuf.length) return false
  return crypto.timingSafeEqual(expectedBuf, receivedBuf)
}

/**
 * Parse the payload's `acceptTime` into an ISO-8601 string.
 * A single space separator ("YYYY-MM-DD HH:mm:ss") is replaced with 'T' first.
 * NOTE(review): values without a zone offset are interpreted by `Date` in the
 * server's local time zone — confirm this matches what carriers send.
 *
 * @returns {string|null} ISO string, or null when absent or unparseable.
 */
function parseAcceptTime(acceptTime) {
  if (!acceptTime) return null
  const raw = String(acceptTime)
  const isoLike = raw.indexOf('T') > -1 ? raw : raw.replace(' ', 'T')
  const parsed = new Date(isoLike)
  return Number.isNaN(parsed.getTime()) ? null : parsed.toISOString()
}

/**
 * Create the Express app and start listening on PORT (all interfaces).
 *
 * Routes:
 *   POST /webhook/express/status — carrier status callback. Steps: check the
 *     HMAC signature (only when WEBHOOK_SECRET is set), store the raw payload,
 *     find the waybill, update its status, then insert a tracking event.
 *   GET  /health — liveness probe, always `{ ok: true }`.
 *
 * Exits the process with code 1 when SUPA_URL or SUPA_KEY is missing.
 *
 * @returns {Promise<void>}
 */
async function start() {
  if (!SUPA_URL || !SUPA_KEY) {
    console.error('SUPA_URL and SUPA_KEY must be set in env')
    process.exit(1)
  }

  const app = express()

  // Keep the exact request text: the signature is computed over it.
  app.use(bodyParser.json({
    limit: '1mb',
    verify: (req, res, buf) => {
      try {
        req.rawBody = buf ? buf.toString('utf8') : ''
      } catch (e) {
        req.rawBody = ''
      }
    }
  }))

  app.post('/webhook/express/status', async (req, res) => {
    // Without this try/catch, an exception in the async handler would escape
    // as a rejected promise and the request would get no response.
    try {
      const body = req.body || {}
      const ts = req.headers['x-timestamp'] || ''
      const sig = req.headers['x-signature'] || ''
      const cid = req.headers['x-client-id'] || ''
      const bodyText = (req.rawBody && typeof req.rawBody === 'string' && req.rawBody.length > 0)
        ? req.rawBody
        : JSON.stringify(body)

      // With no secret configured every request counts as valid.
      const sigValid = WEBHOOK_SECRET
        ? signaturesMatch(computeSignature(bodyText, ts), sig)
        : true

      // Optional strict mode: reject invalid signature when secret is configured.
      // Otherwise the result is only recorded with the raw payload.
      if (WEBHOOK_SECRET && !sigValid && process.env.WEBHOOK_REJECT_INVALID_SIGNATURE === 'true') {
        return res.status(401).json({ ok: false, message: 'invalid signature' })
      }

      const tracking_no = body.mailNo || body.tracking_no
      const order_no = body.txLogisticId || body.order_no
      const carrierIn = body.carrier || body.company || null
      const event_code = body.infoContent || body.status_code || body.event_code
      const event_text = body.remark || body.event_text || ''

      // Persist the raw request first (best-effort; upsertRaw never rejects).
      const rawHash = crypto.createHash('sha256')
        .update(String(bodyText || '') + '|' + String(ts || ''))
        .digest('hex')
      await upsertRaw(body, tracking_no, carrierIn, sigValid, {
        client_id: String(cid || '') || null,
        signature: String(sig || '') || null,
        ts_header: String(ts || '') || null,
        remote_ip: (req.ip || (req.socket && req.socket.remoteAddress) || null),
        request_id: null,
        headers: req.headers || null,
        dedupe_key: 'RAW_' + rawHash.slice(0, 32)
      })

      let waybill = null
      try {
        waybill = await findWaybill(tracking_no, order_no)
      } catch (e) {
        const status = e && e.status ? Number(e.status) : 0
        if (status === 401 || status === 403) {
          return res.status(502).json({ ok: false, message: 'supabase unauthorized (check SUPA_KEY/SUPA_URL)' })
        }
        return res.status(502).json({ ok: false, message: 'supabase query failed' })
      }
      if (!waybill || !waybill.id) {
        // Waybill not found — respond 200 but inform caller in body.
        return res.status(200).json({ ok: false, message: 'waybill not found' })
      }

      const waybillId = waybill.id
      const carrier = carrierIn || waybill.carrier || null
      const status_code = mapStatus(event_code)

      await updateWaybill(waybillId, status_code, event_text)

      // The stored event time falls back to "now", but the dedupe key must
      // not: a time that changes on every delivery would make each repeated
      // callback look like a new event. Payloads without a usable acceptTime
      // therefore hash with event_time = null.
      const received_at = new Date().toISOString()
      const payloadEventTime = parseAcceptTime(body.acceptTime)
      const event_time = payloadEventTime || received_at
      const dedupe_key = stableEventDedupeKey({
        tracking_no,
        carrier,
        status_code,
        event_time: payloadEventTime,
        event_code,
        event_text
      })

      const ins = await insertEvent({
        waybill_id: waybillId,
        carrier: carrier,
        tracking_no: tracking_no || waybill.tracking_no || null,
        received_at,
        source: 'webhook',
        event_id: dedupe_key,
        event_time: event_time,
        event_code: event_code || 'UNKNOWN',
        event_text: event_text || '',
        status_code: status_code,
        raw_payload: body,
        dedupe_key
      })
      if (!ins || ins.ok !== true) {
        return res.status(200).json({ ok: false, message: 'insert tracking_event failed (see webhook-receiver logs)' })
      }

      return res.json({ ok: true })
    } catch (e) {
      console.error('webhook handler error', e)
      if (!res.headersSent) {
        return res.status(500).json({ ok: false, message: 'internal error' })
      }
    }
  })

  app.get('/health', (req, res) => res.json({ ok: true }))

  app.listen(PORT, '0.0.0.0', () => console.log(`Webhook receiver listening on http://0.0.0.0:${PORT}`))
}

// Entry point: a failure during startup is logged and ends the process with exit code 1.
start().catch((err) => {
  console.error('start failed', err)
  process.exit(1)
})