完善服务模块缺少付款页的bug

This commit is contained in:
2026-06-02 11:35:31 +08:00
parent c3324d459a
commit 881262940c
35 changed files with 29069 additions and 557 deletions

View File

@@ -0,0 +1,384 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
解析医疗-consumer / 医疗-delivery 的 Schema、RLS、RPC生成数据库对接文档 v2
整合来源:
- complete_mall_database.sql核心商城表
- docs/sql/10_schema/(增量模块表)
- pages/user/test/USER_AUTH_SCHEMA.sqlak_users 定义)
- docs/sql/20_rls/RLS 策略)
- docs/sql/30_rpc/RPC 函数)
"""
import re
import os
from collections import defaultdict, OrderedDict
OUTPUT_PATH = r"D:\骅锋\医疗-consumer\docs\数据库对接文档.md"
SOURCES = [
("医疗-consumer/mall_sql/schemas/complete_mall_database_fixed.sql", "core"),
("医疗-consumer/docs/sql/all_schema_merged.sql", "ext"),
("医疗-consumer/pages/user/test/USER_AUTH_SCHEMA.sql", "auth"),
]
RLS_PATH = "医疗-consumer/docs/sql/all_rls_merged.sql"
RPC_PATH = "医疗-consumer/docs/sql/all_rpc_merged.sql"
# 定义各端涉及的表
CONSUMER_TABLES = {
"ak_users", "ml_user_profiles", "ml_user_addresses", "ml_shopping_cart",
"ml_products", "ml_product_skus", "ml_product_specs", "ml_product_reviews",
"ml_categories", "medical_mall_categories", "ml_product_labels",
"ml_product_member_prices", "ml_product_protections", "ml_product_templates",
"ak_shipping_templates", "ml_products_ext", "ml_brands", "ml_shops",
"ml_orders", "ml_order_items",
"ml_delivery_tasks", "ml_delivery_drivers", "ml_delivery_staff", "ml_delivery_stations",
"ml_system_configs", "ml_regions",
"ml_cms_content", "ml_cms_categories", "ml_cms_banners", "ml_articles", "ml_article_categories",
"ml_user_bill", "ml_user_recharge", "ml_extract", "ml_invoices",
"ml_user_coupons", "ml_coupon_templates", "ml_seckill", "ml_group_buy",
"ml_bargain", "ml_live", "ml_lottery", "ml_signin_logs", "ml_checkin_configs",
"ml_member_cards", "ml_recharge_rules",
"ak_user_groups", "ak_user_labels", "ak_user_levels",
"ak_distribution_agents", "ak_distribution_divisions", "ak_distribution_config",
"ak_distribution_level", "ak_promoter_relations", "ak_commission_logs",
"ak_distribution_agent_applications", "ak_distribution_division_applications",
"ak_diy_pages", "ak_roles", "ak_permissions", "ak_admin_roles", "ak_role_permissions",
"ml_kefu_accounts", "ml_kefu_sessions", "ml_kefu_messages",
"homecare_patients", "homecare_nurses", "homecare_services", "homecare_orders",
"homecare_schedules", "homecare_evaluations",
"ml_user_favorites", "ml_browse_history", "ml_search_history",
}
DELIVERY_TABLES = {
"ak_users",
"ml_delivery_tasks", "ml_delivery_drivers", "ml_delivery_staff", "ml_delivery_stations",
"ml_orders", "ml_order_items",
"homecare_patients", "homecare_nurses", "homecare_services", "homecare_orders",
"homecare_schedules", "homecare_evaluations",
"ml_system_configs",
}
def parse_sql_file(path, source_type):
"""提取 CREATE TABLE 语句"""
if not os.path.exists(path):
print(f"[WARN] 文件不存在: {path}")
return {}
# 尝试多种编码
content = None
for enc in ["utf-8", "gbk", "gb2312", "latin-1"]:
try:
with open(path, "r", encoding=enc) as f:
content = f.read()
break
except UnicodeDecodeError:
continue
if content is None:
with open(path, "r", encoding="utf-8", errors="ignore") as f:
content = f.read()
tables = {}
pattern = re.compile(
r"CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?(?:public\.)?(\w+)\s*\((.*?)\);",
re.DOTALL | re.IGNORECASE,
)
for match in pattern.finditer(content):
table_name = match.group(1)
body = match.group(2)
fields = []
constraints = []
lines = [line.strip() for line in body.split("\n") if line.strip()]
for line in lines:
line = line.rstrip(",")
if line.startswith("--") or line.startswith("/*") or line.startswith("*"):
continue
if re.match(r"^(CONSTRAINT|PRIMARY\s+KEY|FOREIGN\s+KEY|UNIQUE|CHECK)", line, re.I):
constraints.append(line)
continue
m = re.match(r"^(\w+)\s+([\w\[\]]+(?:\s*\([^)]+\))?)\s*(.*)$", line)
if m:
fields.append({
"name": m.group(1),
"type": m.group(2).strip(),
"rest": m.group(3).strip(),
})
tables[table_name] = {
"name": table_name,
"fields": fields,
"constraints": constraints,
"source": source_type,
}
return tables
def parse_rls(path):
with open(path, "r", encoding="utf-8", errors="ignore") as f:
content = f.read()
table_rls = defaultdict(lambda: {"enabled": False, "policies": []})
for m in re.finditer(
r"ALTER\s+TABLE\s+(?:public\.)?(\w+)\s+ENABLE\s+ROW\s+LEVEL\s+SECURITY",
content, re.I,
):
table_rls[m.group(1)]["enabled"] = True
for m in re.finditer(
r"CREATE\s+POLICY\s+(\w+)\s+ON\s+(?:public\.)?(\w+)\s+.*?;",
content, re.DOTALL | re.IGNORECASE,
):
table_rls[m.group(2)]["policies"].append({
"name": m.group(1),
"sql": m.group(0),
})
return table_rls
def parse_rpc(path):
with open(path, "r", encoding="utf-8", errors="ignore") as f:
content = f.read()
funcs = []
pattern = re.compile(
r"CREATE\s+OR\s+REPLACE\s+FUNCTION\s+(?:public\.)?(\w+)\s*\((.*?)\)\s*RETURNS\s+(.*?)(?:LANGUAGE\s+\w+|AS\s*\$\$|SECURITY\s+DEFINER)",
re.DOTALL | re.IGNORECASE,
)
for m in pattern.finditer(content):
funcs.append({
"name": m.group(1),
"args": re.sub(r'\s+', ' ', m.group(2).strip()),
"returns": re.sub(r'\s+', ' ', m.group(3).strip()),
})
return funcs
def get_table_belong(table_name):
belongs = []
if table_name in CONSUMER_TABLES:
belongs.append("consumer")
if table_name in DELIVERY_TABLES:
belongs.append("delivery")
if not belongs:
return "admin / 系统"
return " / ".join(belongs)
def module_of(table_name):
if table_name == "ak_users":
return "用户与权限"
if table_name in {"ml_user_profiles", "ml_user_addresses", "ml_shopping_cart", "ml_user_favorites", "ml_browse_history", "ml_search_history"}:
return "用户与权限"
if table_name in {"ak_roles", "ak_permissions", "ak_admin_roles", "ak_role_permissions"}:
return "用户与权限"
if table_name in {"ak_user_groups", "ak_user_labels", "ak_user_levels"}:
return "用户与权限"
if table_name.startswith("ml_product") or table_name.startswith("medical_mall") or table_name.startswith("ak_shipping") or table_name.startswith("ml_products"):
return "商品与类目"
if table_name in {"ml_categories", "ml_brands", "ml_shops"}:
return "商品与类目"
if table_name.startswith("ml_order"):
return "订单"
if table_name.startswith("ml_delivery") or table_name == "ml_delivery_drivers":
return "配送与物流"
if table_name.startswith("homecare"):
return "居家护理"
if table_name.startswith("ml_system"):
return "系统配置"
if table_name.startswith("ml_cms") or table_name.startswith("ml_articles") or table_name.startswith("ml_article") or table_name.startswith("ak_diy"):
return "内容与装修"
if table_name.startswith("ml_kefu"):
return "客服"
if table_name in {"ml_user_bill", "ml_user_recharge", "ml_extract", "ml_invoices"}:
return "财务"
if table_name.startswith("ml_coupon") or table_name.startswith("ml_seckill") or table_name.startswith("ml_group") or table_name.startswith("ml_bargain") or table_name.startswith("ml_live") or table_name.startswith("ml_lottery") or table_name.startswith("ml_signin") or table_name.startswith("ml_checkin") or table_name.startswith("ml_member") or table_name.startswith("ml_recharge"):
return "营销与促销"
if table_name.startswith("ak_distribution") or table_name.startswith("ak_promoter") or table_name.startswith("ak_commission"):
return "分销与推广"
return "其他"
def main():
os.chdir(r"D:\骅锋")
all_tables = OrderedDict()
# 按优先级解析auth -> core -> ext
for path, stype in SOURCES:
parsed = parse_sql_file(path, stype)
for name, info in parsed.items():
if name not in all_tables:
all_tables[name] = info
rls_data = parse_rls(RLS_PATH)
rpc_funcs = parse_rpc(RPC_PATH)
module_map = defaultdict(list)
for t in all_tables.values():
module_map[module_of(t["name"])].append(t)
# 排序模块
module_order = [
"用户与权限", "商品与类目", "订单", "配送与物流",
"居家护理", "营销与促销", "分销与推广", "财务",
"内容与装修", "客服", "系统配置", "其他"
]
lines = []
lines.append("# 医疗-consumer & 医疗-delivery 数据库对接文档")
lines.append("")
lines.append("> **生成日期**2026-06-01")
lines.append("> **适用范围**:医疗-consumer消费者端+ 医疗-delivery配送端")
lines.append("> **数据库**PostgreSQL (Supabase)")
lines.append("> **规范**:所有用户侧表已启用 RLS行级安全全局/管理后台查询请走 RPCSECURITY DEFINER")
lines.append("")
lines.append("## 目录")
lines.append("")
lines.append("- [通用规范说明](#通用规范说明)")
for mod in module_order:
if mod in module_map and module_map[mod]:
anchor = mod.replace(" ", "-").replace("", "")
lines.append(f"- [{mod}](#{anchor})")
lines.append("- [完整对接 SQL 汇总](#完整对接-sql-汇总)")
lines.append("- [关键 RPC 清单](#关键-rpc-清单)")
lines.append("")
lines.append("## 通用规范说明")
lines.append("")
lines.append("### 软删除标准")
lines.append("本项目所有业务表默认采用**软删除**,标准字段如下:")
lines.append("- `deleted_at` (timestamptz)删除时间NULL 表示未删除")
lines.append("- `deleted_by` (uuid):删除操作人")
lines.append("- `created_at` (timestamptz):创建时间,默认 `now()`")
lines.append("- `updated_at` (timestamptz):更新时间,默认 `now()`")
lines.append("")
lines.append("> **注意**RLS 策略默认过滤 `deleted_at IS NULL` 的行,查询时无需手动加条件,但管理后台统计需视情况处理。")
lines.append("")
lines.append("### 角色字段权威口径")
lines.append("- 统一用户主表:`public.ak_users`")
lines.append("- 角色唯一权威字段:`ak_users.role`,取值:`customer`(消费者)、`merchant`(商家)、`delivery`(配送员)、`admin`(管理员)、`analytics`(数据分析师)")
lines.append("- 商城用户扩展档案:`public.ml_user_profiles`,与 `ak_users` 1:1 关系(`user_id` UNIQUE")
lines.append("")
lines.append("### 状态机速查")
lines.append("| 状态域 | 字段名 | 关键取值 | 说明 |")
lines.append("|--------|--------|----------|------|")
lines.append("| 订单流程 | `order_status` | 1=待付款, 2=待发货, 3=待收货, 4=已完成, 5=已取消, 6=退款中, 7=已退款 | 主状态 |")
lines.append("| 支付状态 | `payment_status` | 1=未支付, 2=已支付, 3=部分退款, 4=全额退款 | 支付线 |")
lines.append("| 物流状态 | `shipping_status` | 1=未发货, 2=已发货, 3=运输中, 4=已送达 | 物流线 |")
lines.append("| 配送任务 | `status` | 1=待接单, 2=已接单, 3=取货中, 4=配送中, 5=已送达, 6=配送失败 | 配送端 |")
lines.append("")
for mod in module_order:
if mod not in module_map or not module_map[mod]:
continue
anchor = mod.replace(" ", "-").replace("", "")
lines.append(f"## {mod}")
lines.append("")
for t in module_map[mod]:
tname = t["name"]
belong = get_table_belong(tname)
lines.append(f"### {tname}")
lines.append(f"- **所属端**{belong}")
lines.append("")
lines.append("| 字段名 | 数据类型 | 约束/默认值 | 说明 |")
lines.append("|--------|----------|-------------|------|")
for f in t["fields"]:
rest = f["rest"].replace("|", "\|")
lines.append(f"| {f['name']} | {f['type']} | {rest} | |")
lines.append("")
if t["constraints"]:
lines.append("**约束**")
for c in t["constraints"]:
lines.append(f"- `{c}`")
lines.append("")
rls = rls_data.get(tname)
if rls and rls["policies"]:
lines.append("**RLS 策略**")
for p in rls["policies"]:
lines.append(f"- `{p['name']}`")
lines.append("")
elif rls and rls["enabled"] and not rls["policies"]:
lines.append("**RLS**:已启用,暂无显式策略(默认拒绝直接访问)")
lines.append("")
# 查找关联 RPC函数名包含表名关键词
keywords = [tname.replace("ml_", "").replace("ak_", "").replace("_", "")]
if tname.startswith("ml_"):
keywords.append(tname[3:].replace("_", ""))
related_rpc = []
for fn in rpc_funcs:
fn_flat = fn["name"].replace("_", "").lower()
for kw in keywords:
if kw.lower() in fn_flat:
related_rpc.append(fn)
break
if related_rpc:
lines.append("**相关 RPC**(最多展示 8 个):")
for fn in related_rpc[:8]:
args = fn["args"][:100] + "..." if len(fn["args"]) > 100 else fn["args"]
lines.append(f"- `{fn['name']}({args}) -> {fn['returns']}`")
lines.append("")
lines.append("---")
lines.append("")
# 汇总 SQL
lines.append("## 完整对接 SQL 汇总")
lines.append("")
lines.append("> 以下 SQL 按执行顺序排列Schema → RLS → RPC。请在新项目/新环境中按顺序执行。")
lines.append("")
lines.append("### 1) 核心商城 Schemacomplete_mall_database.sql")
lines.append("```sql")
with open(SOURCES[0][0], "r", encoding="utf-8", errors="ignore") as f:
lines.append(f.read())
lines.append("```")
lines.append("")
lines.append("### 2) 扩展模块 Schemadocs/sql/10_schema/")
lines.append("```sql")
with open(SOURCES[1][0], "r", encoding="utf-8", errors="ignore") as f:
lines.append(f.read())
lines.append("```")
lines.append("")
lines.append("### 3) RLS行级安全")
lines.append("```sql")
with open(RLS_PATH, "r", encoding="utf-8", errors="ignore") as f:
lines.append(f.read())
lines.append("```")
lines.append("")
lines.append("### 4) RPC关键函数完整见 30_rpc 目录)")
lines.append("```sql")
with open(RPC_PATH, "r", encoding="utf-8", errors="ignore") as f:
lines.append(f.read())
lines.append("```")
lines.append("")
# RPC 清单
lines.append("## 关键 RPC 清单")
lines.append("")
lines.append("| 函数名 | 参数 | 返回 | 用途 |")
lines.append("|--------|------|------|------|")
for fn in rpc_funcs:
args = fn["args"].replace("\n", " ")[:80]
ret = fn["returns"].replace("\n", " ")[:80]
lines.append(f"| {fn['name']} | {args} | {ret} | |")
lines.append("")
with open(OUTPUT_PATH, "w", encoding="utf-8") as f:
f.write("\n".join(lines))
print(f"文档已生成:{OUTPUT_PATH}")
print(f"共汇总表:{len(all_tables)}RPC 函数:{len(rpc_funcs)}")
if __name__ == "__main__":
main()