修改消息后台的启动和停止文件并整理文档

This commit is contained in:
not-like-juvenile
2026-03-11 16:42:33 +08:00
parent e67016a6f4
commit 9cc6dcc2a6
19 changed files with 327 additions and 375 deletions

View File

@@ -0,0 +1,200 @@
-- =====================================================================================
-- Add push device table and express_notifications table
-- 目的:保存每个用户/商家设备的推送 CID并记录由轨迹事件触发的通知消息
-- 创建日期2026-02-24
-- 注意:尽量与仓库中其它迁移风格保持一致;该脚本对已存在对象做 IF NOT EXISTS 检查。
-- =====================================================================================
BEGIN;
-- 依赖扩展(若已存在则忽略)
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
-- updated_at 自动维护函数(若仓库已有同名函数则不重复创建)
DO $do$
BEGIN
IF to_regprocedure('public.update_updated_at_column()') IS NULL THEN
CREATE OR REPLACE FUNCTION public.update_updated_at_column()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $func$
BEGIN
NEW.updated_at = NOW();
RETURN NEW;
END;
$func$;
END IF;
END $do$;
-- =====================================================
-- A. push_devices设备注册/绑定表
-- =====================================================
CREATE TABLE IF NOT EXISTS public.push_devices (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
-- 关联主体user / merchant使用具体字段便于查询与权限控制。
user_id UUID NULL,
merchant_id UUID NULL,
-- 设备推送客户端标识CID由推送 SDK 提供
cid VARCHAR(255) NOT NULL,
-- 平台/渠道android / ios / web / huawei / xiaomi
platform VARCHAR(32) NOT NULL DEFAULT 'android',
-- 应用标识(用于区分不同打包的 AppID / 环境)
appid VARCHAR(128) NOT NULL DEFAULT 'default',
-- 设备是否激活(登录/解绑/失效时置 false
is_active BOOLEAN NOT NULL DEFAULT TRUE,
-- 设备最后上报时间(用于清理失效设备)
last_seen_at TIMESTAMP WITH TIME ZONE NULL,
-- 注册来源(可选,用于审计)
registration_source VARCHAR(64) NULL,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);
-- updated_at 触发器
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_trigger
WHERE tgname = 'trigger_push_devices_updated_at'
) THEN
CREATE TRIGGER trigger_push_devices_updated_at
BEFORE UPDATE ON public.push_devices
FOR EACH ROW EXECUTE FUNCTION public.update_updated_at_column();
END IF;
END $$;
-- 唯一约束:同一 appid 下 cid 唯一
CREATE UNIQUE INDEX IF NOT EXISTS ux_push_devices_appid_cid
ON public.push_devices(appid, cid);
-- 索引:按 user_id/merchant_id 查询是常用路径
CREATE INDEX IF NOT EXISTS idx_push_devices_user_id ON public.push_devices(user_id);
CREATE INDEX IF NOT EXISTS idx_push_devices_merchant_id ON public.push_devices(merchant_id);
CREATE INDEX IF NOT EXISTS idx_push_devices_is_active ON public.push_devices(is_active);
-- =====================================================
-- B. express_notifications物流相关推送消息中心表
-- 用途:保存由轨迹事件触发的消息记录(用于消息中心、未读计数与幂等)
-- =====================================================
CREATE TABLE IF NOT EXISTS public.express_notifications (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
-- 接收范围user / merchant
aud VARCHAR(16) NOT NULL,
recipient_id UUID NOT NULL,
-- 业务关联(可选):订单/运单
order_id UUID NULL,
waybill_id UUID NULL,
-- 运单与承运方冗余字段,便于快速查询
tracking_no VARCHAR(64) NULL,
carrier VARCHAR(32) NULL,
-- 推送消息唯一 id可由服务端生成并返回给客户端
message_id VARCHAR(128) NULL,
-- 事件摘要(下发给客户端的安全/脱敏文案)
event_text_safe TEXT NULL,
status_code VARCHAR(32) NULL,
retry_count INTEGER NOT NULL DEFAULT 0,
last_error TEXT NULL,
next_attempt_at TIMESTAMP WITH TIME ZONE NULL,
event_time TIMESTAMP WITH TIME ZONE NULL,
-- 透传/审计用 payload不包含敏感字段raw_payload 请勿透传给客户端)
payload JSONB NULL,
-- 已读时间null 表示未读)
read_at TIMESTAMP WITH TIME ZONE NULL,
-- 幂等键(确保同一事件只生成一条消息)
dedupe_key VARCHAR(256) NULL,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);
-- updated_at 触发器
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_trigger
WHERE tgname = 'trigger_express_notifications_updated_at'
) THEN
CREATE TRIGGER trigger_express_notifications_updated_at
BEFORE UPDATE ON public.express_notifications
FOR EACH ROW EXECUTE FUNCTION public.update_updated_at_column();
END IF;
END $$;
-- 索引与约束
CREATE INDEX IF NOT EXISTS idx_express_notifications_aud_recipient ON public.express_notifications(aud, recipient_id);
CREATE INDEX IF NOT EXISTS idx_express_notifications_order_id ON public.express_notifications(order_id);
CREATE INDEX IF NOT EXISTS idx_express_notifications_waybill_id ON public.express_notifications(waybill_id);
CREATE INDEX IF NOT EXISTS idx_express_notifications_created_at ON public.express_notifications(created_at);
CREATE INDEX IF NOT EXISTS idx_express_notifications_read_at ON public.express_notifications(read_at);
-- 若使用 message_id 做幂等(外部系统/队列),则建立唯一索引
-- 注意这里不能使用“部分唯一索引WHERE message_id IS NOT NULL
-- 否则 PostgREST 的 upsert `?on_conflict=message_id` 会触发 42P10
-- "there is no unique or exclusion constraint matching the ON CONFLICT specification"
-- 普通 UNIQUE INDEX 仍允许多个 NULL符合历史兼容
CREATE UNIQUE INDEX IF NOT EXISTS ux_express_notifications_message_id
ON public.express_notifications(message_id);
-- =====================================================
-- C. 兼容性与外键(若目标表存在则添加外键约束;若不存在则保留字段)
-- 说明:为避免迁移失败,我们在这里尝试添加外键,但使用 DO $$ ... EXCEPTION 来忽略不存在表的情况。
-- =====================================================
DO $$
BEGIN
-- 尝试为 push_devices.user_id 添加外键(如果 ak_users 表存在)
IF EXISTS (SELECT 1 FROM pg_class c JOIN pg_namespace n ON c.relnamespace = n.oid WHERE c.relname = 'ak_users' AND n.nspname = 'public') THEN
BEGIN
ALTER TABLE public.push_devices
ADD CONSTRAINT fk_push_devices_user FOREIGN KEY (user_id) REFERENCES public.ak_users(id) ON DELETE SET NULL;
EXCEPTION WHEN duplicate_object THEN
-- 已存在约束则忽略
NULL;
END;
END IF;
-- 尝试为 express_notifications.order_id 添加外键(如果 ml_orders 表存在)
IF EXISTS (SELECT 1 FROM pg_class c JOIN pg_namespace n ON c.relnamespace = n.oid WHERE c.relname = 'ml_orders' AND n.nspname = 'public') THEN
BEGIN
ALTER TABLE public.express_notifications
ADD CONSTRAINT fk_express_notifications_order FOREIGN KEY (order_id) REFERENCES public.ml_orders(id) ON DELETE SET NULL;
EXCEPTION WHEN duplicate_object THEN
NULL;
END;
END IF;
-- 尝试为 express_notifications.waybill_id 添加外键(如果 platform_express_waybills 表存在)
IF EXISTS (SELECT 1 FROM pg_class c JOIN pg_namespace n ON c.relnamespace = n.oid WHERE c.relname = 'platform_express_waybills' AND n.nspname = 'public') THEN
BEGIN
ALTER TABLE public.express_notifications
ADD CONSTRAINT fk_express_notifications_waybill FOREIGN KEY (waybill_id) REFERENCES public.platform_express_waybills(id) ON DELETE SET NULL;
EXCEPTION WHEN duplicate_object THEN
NULL;
END;
END IF;
END $$;
COMMIT;
-- =====================================================================================
-- 使用说明(简要)
-- 1) `push_devices` 存储设备 CID 与所属主体user_id / merchant_id提供活跃检测与解绑能力。
-- 2) `express_notifications` 存储消息中心记录;生成通知时写入该表並异步向 `push_devices` 中匹配设备发送推送。
-- 3) 推荐在应用层实现:当用户登出或解绑设备时将 `push_devices.is_active` 置 false。
-- 4) 可按需要添加清理任务:清理 long-inactive 的 `push_devices`(例如 180 天未上报)。
-- =====================================================================================

View File

@@ -0,0 +1,26 @@
-- =====================================================================================
-- Add send_status to express_notifications
--
-- 目的区分“物流状态status_code”与“投递处理状态send_status”。
-- - status_code业务/物流状态SHIPPED/OUT_FOR_DELIVERY/...
-- - send_status投递状态null=待发送, processing, retrying, success, failed, no-targets
--
-- 创建日期2026-03-09
-- =====================================================================================
BEGIN;
ALTER TABLE public.express_notifications
ADD COLUMN IF NOT EXISTS send_status VARCHAR(32) NULL;
CREATE INDEX IF NOT EXISTS idx_express_notifications_send_status
ON public.express_notifications(send_status);
-- 兼容旧实现:历史上 push-server consumer 使用 status_code 存投递状态。
-- 迁移后 consumer 改读写 send_status为避免把旧的 success/failed 等记录当成 pending 再次推送,做一次安全回填。
UPDATE public.express_notifications
SET send_status = status_code
WHERE send_status IS NULL
AND status_code IN ('processing', 'retrying', 'success', 'failed', 'no-targets');
COMMIT;

View File

@@ -0,0 +1,102 @@
-- =====================================================================================
-- notify_queue + trigger: platform_express_tracking_events -> notify_queue
--
-- 目的:把“轨迹事件入库”和“消息生成/推送”解耦。
-- - Webhook/轮询/手工写入 tracking_events 后,由触发器把关键事件入队到 notify_queue。
-- - 常驻 worker 消费 notify_queue生成 express_notifications消息中心/推送任务)。
-- - push-server consumer 轮询 express_notifications 并调用 CLOUD_FUNC_URL 进行实际下发。
--
-- 创建日期2026-03-09
-- =====================================================================================
BEGIN;
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
-- 队列表:仅做“轻量入队”,避免触发器做外部 IO
CREATE TABLE IF NOT EXISTS public.notify_queue (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
waybill_id UUID NOT NULL,
carrier VARCHAR(32) NULL,
tracking_no VARCHAR(64) NULL,
event_id VARCHAR(128) NULL,
status_code VARCHAR(32) NOT NULL,
event_time TIMESTAMP WITH TIME ZONE NOT NULL,
event_text TEXT NULL,
source VARCHAR(16) NULL,
-- 与 platform_express_tracking_events 对齐的幂等键
dedupe_key VARCHAR(256) NOT NULL,
raw_payload JSONB NULL,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
processed_at TIMESTAMP WITH TIME ZONE NULL,
process_status VARCHAR(32) NULL,
last_error TEXT NULL,
CONSTRAINT uk_notify_queue_dedupe UNIQUE (waybill_id, dedupe_key)
);
CREATE INDEX IF NOT EXISTS idx_notify_queue_processed_at ON public.notify_queue(processed_at);
CREATE INDEX IF NOT EXISTS idx_notify_queue_created_at ON public.notify_queue(created_at);
CREATE INDEX IF NOT EXISTS idx_notify_queue_status_code ON public.notify_queue(status_code);
-- 触发器函数:入队关键状态事件
CREATE OR REPLACE FUNCTION public.notify_new_tracking_event()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $func$
DECLARE
should_enqueue BOOLEAN := FALSE;
BEGIN
-- 推送策略MVP只对关键状态入队
IF NEW.status_code IN ('SHIPPED','OUT_FOR_DELIVERY','READY_FOR_PICKUP','DELIVERED','EXCEPTION','RETURNED') THEN
should_enqueue := TRUE;
END IF;
IF should_enqueue THEN
INSERT INTO public.notify_queue(
waybill_id,
carrier,
tracking_no,
event_id,
status_code,
event_time,
event_text,
source,
dedupe_key,
raw_payload
) VALUES (
NEW.waybill_id,
NEW.carrier,
NEW.tracking_no,
NEW.event_id,
NEW.status_code,
NEW.event_time,
NEW.event_text,
NEW.source,
NEW.dedupe_key,
NEW.raw_payload
)
ON CONFLICT (waybill_id, dedupe_key) DO NOTHING;
END IF;
RETURN NEW;
END;
$func$;
-- 触发器tracking_events 写入后入队
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_trigger WHERE tgname = 'trigger_notify_new_tracking_event'
) THEN
CREATE TRIGGER trigger_notify_new_tracking_event
AFTER INSERT ON public.platform_express_tracking_events
FOR EACH ROW EXECUTE FUNCTION public.notify_new_tracking_event();
END IF;
END $$;
COMMIT;

View File

@@ -0,0 +1,30 @@
-- =====================================================================================
-- Fix express_notifications upsert conflict target (message_id)
-- 目的:修复 notify-worker / push-server 在 upsert 时遇到的 42P10
-- "there is no unique or exclusion constraint matching the ON CONFLICT specification"
-- 原因历史迁移可能创建了部分唯一索引WHERE message_id IS NOT NULL
-- PostgREST 的 upsert `?on_conflict=message_id` 无法匹配该索引。
-- 方案:改为普通 UNIQUE INDEX (message_id)。Postgres UNIQUE 允许多条 NULL兼容旧数据。
-- 创建日期2026-03-10
-- =====================================================================================
BEGIN;
-- 1) 预检查:是否存在重复的非空 message_id若存在创建唯一索引会失败
-- 如有返回结果,请先人工去重后再继续执行后续语句。
-- 示例去重策略:保留最新 created_at其它行将 message_id 置为 NULL 或删除重复行。
--
-- SELECT message_id, COUNT(*)
-- FROM public.express_notifications
-- WHERE message_id IS NOT NULL
-- GROUP BY message_id
-- HAVING COUNT(*) > 1;
-- 2) 删除历史“部分唯一索引”(如果存在)
DROP INDEX IF EXISTS public.ux_express_notifications_message_id;
-- 3) 创建普通唯一索引,让 `on_conflict=message_id` 正常工作
CREATE UNIQUE INDEX IF NOT EXISTS ux_express_notifications_message_id
ON public.express_notifications(message_id);
COMMIT;

View File

@@ -0,0 +1,174 @@
-- =====================================================================================
-- 第三方快递轨迹(平台侧)表结构升级 (PostgreSQL + Supabase)
-- 用途:
-- - 引入第三方承运方运单与轨迹事件的统一入库模型
-- - 支撑商家端/用户端/平台后台的同源轨迹展示
-- 说明:
-- - 仅创建 platform 侧三张表platform_express_waybills / platform_express_tracking_events / platform_express_event_raw
-- - 不包含 mock_* 测试表
-- - 不涉及自营骑手表ml_delivery_*
-- =====================================================================================
BEGIN;
-- 依赖扩展(项目主库已使用 uuid-ossp这里重复声明是安全的
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE EXTENSION IF NOT EXISTS "btree_gin";
-- updated_at 维护函数:若主库已存在则不重复创建
DO $do$
BEGIN
IF to_regprocedure('public.update_updated_at_column()') IS NULL THEN
CREATE OR REPLACE FUNCTION public.update_updated_at_column()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $func$
BEGIN
NEW.updated_at = NOW();
RETURN NEW;
END;
$func$;
END IF;
END $do$;
-- =====================================================================
-- A. 平台侧platform统一轨迹模型入库与查询
-- =====================================================================
-- 运单主表:按 (carrier, tracking_no) 唯一
CREATE TABLE IF NOT EXISTS public.platform_express_waybills (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
-- 关联订单推荐关联主键order_no 作为展示/兜底)
order_id UUID NULL REFERENCES public.ml_orders(id) ON DELETE SET NULL,
order_no VARCHAR(64) NULL,
carrier VARCHAR(32) NOT NULL, -- YUNDA/YTO/ZTO/STO/KDN...
tracking_no VARCHAR(64) NOT NULL,
source VARCHAR(16) NOT NULL DEFAULT 'mock', -- mock/carrier/aggregator
current_status_code VARCHAR(32) NOT NULL DEFAULT 'SHIPPED',
current_status_text TEXT NULL,
eta TIMESTAMP WITH TIME ZONE NULL,
last_synced_at TIMESTAMP WITH TIME ZONE NULL,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
CONSTRAINT uk_platform_express_waybill UNIQUE (carrier, tracking_no)
);
-- updated_at 触发器
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_trigger
WHERE tgname = 'trigger_platform_express_waybills_updated_at'
) THEN
CREATE TRIGGER trigger_platform_express_waybills_updated_at
BEFORE UPDATE ON public.platform_express_waybills
FOR EACH ROW EXECUTE FUNCTION public.update_updated_at_column();
END IF;
END $$;
-- 索引(按订单查运单是主查询路径)
CREATE INDEX IF NOT EXISTS idx_platform_express_waybills_order_id
ON public.platform_express_waybills(order_id);
CREATE INDEX IF NOT EXISTS idx_platform_express_waybills_order_no
ON public.platform_express_waybills(order_no);
CREATE INDEX IF NOT EXISTS idx_platform_express_waybills_tracking_no
ON public.platform_express_waybills(tracking_no);
CREATE INDEX IF NOT EXISTS idx_platform_express_waybills_status
ON public.platform_express_waybills(current_status_code);
-- 轨迹事件表:前端时间线/告警/统计的主数据来源
CREATE TABLE IF NOT EXISTS public.platform_express_tracking_events (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
waybill_id UUID NOT NULL REFERENCES public.platform_express_waybills(id) ON DELETE CASCADE,
carrier VARCHAR(32) NOT NULL,
tracking_no VARCHAR(64) NOT NULL,
-- 第三方事件唯一标识(可能缺失)
event_id VARCHAR(128) NULL,
event_time TIMESTAMP WITH TIME ZONE NOT NULL,
event_code VARCHAR(64) NOT NULL,
event_text TEXT NOT NULL,
-- 平台统一状态
status_code VARCHAR(32) NOT NULL,
node_name VARCHAR(128) NULL,
location TEXT NULL,
description TEXT NULL,
evidence_urls JSONB NOT NULL DEFAULT '[]'::jsonb,
-- 原始回文(用于审计/排障)
raw_payload JSONB NULL,
-- 接收侧信息
received_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
source VARCHAR(16) NOT NULL DEFAULT 'webhook', -- webhook/poll/manual
-- 幂等去重键:优先 event_id缺失时用 tracking_no+event_code+event_time(+可选字段) 构造
dedupe_key VARCHAR(256) NOT NULL,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
CONSTRAINT uk_platform_express_event_dedupe UNIQUE (waybill_id, dedupe_key)
);
CREATE INDEX IF NOT EXISTS idx_platform_express_events_waybill_time
ON public.platform_express_tracking_events(waybill_id, event_time);
CREATE INDEX IF NOT EXISTS idx_platform_express_events_tracking_time
ON public.platform_express_tracking_events(tracking_no, event_time);
CREATE INDEX IF NOT EXISTS idx_platform_express_events_status
ON public.platform_express_tracking_events(status_code);
CREATE INDEX IF NOT EXISTS idx_platform_express_events_received_at
ON public.platform_express_tracking_events(received_at);
CREATE INDEX IF NOT EXISTS gin_platform_express_events_raw_payload
ON public.platform_express_tracking_events USING gin (raw_payload);
-- 原始接收表:记录每一次 webhook/轮询原文、验签结果与解析错误
CREATE TABLE IF NOT EXISTS public.platform_express_event_raw (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
received_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
source VARCHAR(16) NOT NULL DEFAULT 'webhook',
-- 请求侧标识
client_id VARCHAR(64) NULL,
carrier VARCHAR(32) NULL,
tracking_no VARCHAR(64) NULL,
-- 安全审计
signature_valid BOOLEAN NULL,
signature TEXT NULL,
ts_header TEXT NULL,
request_id VARCHAR(64) NULL,
remote_ip INET NULL,
headers JSONB NULL,
body JSONB NULL,
parse_error TEXT NULL,
-- 去重辅助(可选):用于识别完全重复的原始请求
dedupe_key VARCHAR(256) NULL
);
CREATE INDEX IF NOT EXISTS idx_platform_express_raw_received_at
ON public.platform_express_event_raw(received_at);
CREATE INDEX IF NOT EXISTS idx_platform_express_raw_tracking_no
ON public.platform_express_event_raw(tracking_no);
CREATE INDEX IF NOT EXISTS idx_platform_express_raw_signature_valid
ON public.platform_express_event_raw(signature_valid);
CREATE INDEX IF NOT EXISTS gin_platform_express_raw_body
ON public.platform_express_event_raw USING gin (body);
COMMIT;