Files
medical-mall/docs/sql/generate_db_doc_v2.py

385 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
解析医疗-consumer / 医疗-delivery 的 Schema、RLS、RPC生成数据库对接文档 v2
整合来源:
- complete_mall_database.sql核心商城表
- docs/sql/10_schema/(增量模块表)
- pages/user/test/USER_AUTH_SCHEMA.sqlak_users 定义)
- docs/sql/20_rls/RLS 策略)
- docs/sql/30_rpc/RPC 函数)
"""
import re
import os
from collections import defaultdict, OrderedDict
OUTPUT_PATH = r"D:\骅锋\医疗-consumer\docs\数据库对接文档.md"
SOURCES = [
("医疗-consumer/mall_sql/schemas/complete_mall_database_fixed.sql", "core"),
("医疗-consumer/docs/sql/all_schema_merged.sql", "ext"),
("医疗-consumer/pages/user/test/USER_AUTH_SCHEMA.sql", "auth"),
]
RLS_PATH = "医疗-consumer/docs/sql/all_rls_merged.sql"
RPC_PATH = "医疗-consumer/docs/sql/all_rpc_merged.sql"
# 定义各端涉及的表
CONSUMER_TABLES = {
"ak_users", "ml_user_profiles", "ml_user_addresses", "ml_shopping_cart",
"ml_products", "ml_product_skus", "ml_product_specs", "ml_product_reviews",
"ml_categories", "medical_mall_categories", "ml_product_labels",
"ml_product_member_prices", "ml_product_protections", "ml_product_templates",
"ak_shipping_templates", "ml_products_ext", "ml_brands", "ml_shops",
"ml_orders", "ml_order_items",
"ml_delivery_tasks", "ml_delivery_drivers", "ml_delivery_staff", "ml_delivery_stations",
"ml_system_configs", "ml_regions",
"ml_cms_content", "ml_cms_categories", "ml_cms_banners", "ml_articles", "ml_article_categories",
"ml_user_bill", "ml_user_recharge", "ml_extract", "ml_invoices",
"ml_user_coupons", "ml_coupon_templates", "ml_seckill", "ml_group_buy",
"ml_bargain", "ml_live", "ml_lottery", "ml_signin_logs", "ml_checkin_configs",
"ml_member_cards", "ml_recharge_rules",
"ak_user_groups", "ak_user_labels", "ak_user_levels",
"ak_distribution_agents", "ak_distribution_divisions", "ak_distribution_config",
"ak_distribution_level", "ak_promoter_relations", "ak_commission_logs",
"ak_distribution_agent_applications", "ak_distribution_division_applications",
"ak_diy_pages", "ak_roles", "ak_permissions", "ak_admin_roles", "ak_role_permissions",
"ml_kefu_accounts", "ml_kefu_sessions", "ml_kefu_messages",
"homecare_patients", "homecare_nurses", "homecare_services", "homecare_orders",
"homecare_schedules", "homecare_evaluations",
"ml_user_favorites", "ml_browse_history", "ml_search_history",
}
DELIVERY_TABLES = {
"ak_users",
"ml_delivery_tasks", "ml_delivery_drivers", "ml_delivery_staff", "ml_delivery_stations",
"ml_orders", "ml_order_items",
"homecare_patients", "homecare_nurses", "homecare_services", "homecare_orders",
"homecare_schedules", "homecare_evaluations",
"ml_system_configs",
}
def parse_sql_file(path, source_type):
"""提取 CREATE TABLE 语句"""
if not os.path.exists(path):
print(f"[WARN] 文件不存在: {path}")
return {}
# 尝试多种编码
content = None
for enc in ["utf-8", "gbk", "gb2312", "latin-1"]:
try:
with open(path, "r", encoding=enc) as f:
content = f.read()
break
except UnicodeDecodeError:
continue
if content is None:
with open(path, "r", encoding="utf-8", errors="ignore") as f:
content = f.read()
tables = {}
pattern = re.compile(
r"CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?(?:public\.)?(\w+)\s*\((.*?)\);",
re.DOTALL | re.IGNORECASE,
)
for match in pattern.finditer(content):
table_name = match.group(1)
body = match.group(2)
fields = []
constraints = []
lines = [line.strip() for line in body.split("\n") if line.strip()]
for line in lines:
line = line.rstrip(",")
if line.startswith("--") or line.startswith("/*") or line.startswith("*"):
continue
if re.match(r"^(CONSTRAINT|PRIMARY\s+KEY|FOREIGN\s+KEY|UNIQUE|CHECK)", line, re.I):
constraints.append(line)
continue
m = re.match(r"^(\w+)\s+([\w\[\]]+(?:\s*\([^)]+\))?)\s*(.*)$", line)
if m:
fields.append({
"name": m.group(1),
"type": m.group(2).strip(),
"rest": m.group(3).strip(),
})
tables[table_name] = {
"name": table_name,
"fields": fields,
"constraints": constraints,
"source": source_type,
}
return tables
def parse_rls(path):
with open(path, "r", encoding="utf-8", errors="ignore") as f:
content = f.read()
table_rls = defaultdict(lambda: {"enabled": False, "policies": []})
for m in re.finditer(
r"ALTER\s+TABLE\s+(?:public\.)?(\w+)\s+ENABLE\s+ROW\s+LEVEL\s+SECURITY",
content, re.I,
):
table_rls[m.group(1)]["enabled"] = True
for m in re.finditer(
r"CREATE\s+POLICY\s+(\w+)\s+ON\s+(?:public\.)?(\w+)\s+.*?;",
content, re.DOTALL | re.IGNORECASE,
):
table_rls[m.group(2)]["policies"].append({
"name": m.group(1),
"sql": m.group(0),
})
return table_rls
def parse_rpc(path):
with open(path, "r", encoding="utf-8", errors="ignore") as f:
content = f.read()
funcs = []
pattern = re.compile(
r"CREATE\s+OR\s+REPLACE\s+FUNCTION\s+(?:public\.)?(\w+)\s*\((.*?)\)\s*RETURNS\s+(.*?)(?:LANGUAGE\s+\w+|AS\s*\$\$|SECURITY\s+DEFINER)",
re.DOTALL | re.IGNORECASE,
)
for m in pattern.finditer(content):
funcs.append({
"name": m.group(1),
"args": re.sub(r'\s+', ' ', m.group(2).strip()),
"returns": re.sub(r'\s+', ' ', m.group(3).strip()),
})
return funcs
def get_table_belong(table_name):
belongs = []
if table_name in CONSUMER_TABLES:
belongs.append("consumer")
if table_name in DELIVERY_TABLES:
belongs.append("delivery")
if not belongs:
return "admin / 系统"
return " / ".join(belongs)
def module_of(table_name):
if table_name == "ak_users":
return "用户与权限"
if table_name in {"ml_user_profiles", "ml_user_addresses", "ml_shopping_cart", "ml_user_favorites", "ml_browse_history", "ml_search_history"}:
return "用户与权限"
if table_name in {"ak_roles", "ak_permissions", "ak_admin_roles", "ak_role_permissions"}:
return "用户与权限"
if table_name in {"ak_user_groups", "ak_user_labels", "ak_user_levels"}:
return "用户与权限"
if table_name.startswith("ml_product") or table_name.startswith("medical_mall") or table_name.startswith("ak_shipping") or table_name.startswith("ml_products"):
return "商品与类目"
if table_name in {"ml_categories", "ml_brands", "ml_shops"}:
return "商品与类目"
if table_name.startswith("ml_order"):
return "订单"
if table_name.startswith("ml_delivery") or table_name == "ml_delivery_drivers":
return "配送与物流"
if table_name.startswith("homecare"):
return "居家护理"
if table_name.startswith("ml_system"):
return "系统配置"
if table_name.startswith("ml_cms") or table_name.startswith("ml_articles") or table_name.startswith("ml_article") or table_name.startswith("ak_diy"):
return "内容与装修"
if table_name.startswith("ml_kefu"):
return "客服"
if table_name in {"ml_user_bill", "ml_user_recharge", "ml_extract", "ml_invoices"}:
return "财务"
if table_name.startswith("ml_coupon") or table_name.startswith("ml_seckill") or table_name.startswith("ml_group") or table_name.startswith("ml_bargain") or table_name.startswith("ml_live") or table_name.startswith("ml_lottery") or table_name.startswith("ml_signin") or table_name.startswith("ml_checkin") or table_name.startswith("ml_member") or table_name.startswith("ml_recharge"):
return "营销与促销"
if table_name.startswith("ak_distribution") or table_name.startswith("ak_promoter") or table_name.startswith("ak_commission"):
return "分销与推广"
return "其他"
def main():
os.chdir(r"D:\骅锋")
all_tables = OrderedDict()
# 按优先级解析auth -> core -> ext
for path, stype in SOURCES:
parsed = parse_sql_file(path, stype)
for name, info in parsed.items():
if name not in all_tables:
all_tables[name] = info
rls_data = parse_rls(RLS_PATH)
rpc_funcs = parse_rpc(RPC_PATH)
module_map = defaultdict(list)
for t in all_tables.values():
module_map[module_of(t["name"])].append(t)
# 排序模块
module_order = [
"用户与权限", "商品与类目", "订单", "配送与物流",
"居家护理", "营销与促销", "分销与推广", "财务",
"内容与装修", "客服", "系统配置", "其他"
]
lines = []
lines.append("# 医疗-consumer & 医疗-delivery 数据库对接文档")
lines.append("")
lines.append("> **生成日期**2026-06-01")
lines.append("> **适用范围**:医疗-consumer消费者端+ 医疗-delivery配送端")
lines.append("> **数据库**PostgreSQL (Supabase)")
lines.append("> **规范**:所有用户侧表已启用 RLS行级安全全局/管理后台查询请走 RPCSECURITY DEFINER")
lines.append("")
lines.append("## 目录")
lines.append("")
lines.append("- [通用规范说明](#通用规范说明)")
for mod in module_order:
if mod in module_map and module_map[mod]:
anchor = mod.replace(" ", "-").replace("", "")
lines.append(f"- [{mod}](#{anchor})")
lines.append("- [完整对接 SQL 汇总](#完整对接-sql-汇总)")
lines.append("- [关键 RPC 清单](#关键-rpc-清单)")
lines.append("")
lines.append("## 通用规范说明")
lines.append("")
lines.append("### 软删除标准")
lines.append("本项目所有业务表默认采用**软删除**,标准字段如下:")
lines.append("- `deleted_at` (timestamptz)删除时间NULL 表示未删除")
lines.append("- `deleted_by` (uuid):删除操作人")
lines.append("- `created_at` (timestamptz):创建时间,默认 `now()`")
lines.append("- `updated_at` (timestamptz):更新时间,默认 `now()`")
lines.append("")
lines.append("> **注意**RLS 策略默认过滤 `deleted_at IS NULL` 的行,查询时无需手动加条件,但管理后台统计需视情况处理。")
lines.append("")
lines.append("### 角色字段权威口径")
lines.append("- 统一用户主表:`public.ak_users`")
lines.append("- 角色唯一权威字段:`ak_users.role`,取值:`customer`(消费者)、`merchant`(商家)、`delivery`(配送员)、`admin`(管理员)、`analytics`(数据分析师)")
lines.append("- 商城用户扩展档案:`public.ml_user_profiles`,与 `ak_users` 1:1 关系(`user_id` UNIQUE")
lines.append("")
lines.append("### 状态机速查")
lines.append("| 状态域 | 字段名 | 关键取值 | 说明 |")
lines.append("|--------|--------|----------|------|")
lines.append("| 订单流程 | `order_status` | 1=待付款, 2=待发货, 3=待收货, 4=已完成, 5=已取消, 6=退款中, 7=已退款 | 主状态 |")
lines.append("| 支付状态 | `payment_status` | 1=未支付, 2=已支付, 3=部分退款, 4=全额退款 | 支付线 |")
lines.append("| 物流状态 | `shipping_status` | 1=未发货, 2=已发货, 3=运输中, 4=已送达 | 物流线 |")
lines.append("| 配送任务 | `status` | 1=待接单, 2=已接单, 3=取货中, 4=配送中, 5=已送达, 6=配送失败 | 配送端 |")
lines.append("")
for mod in module_order:
if mod not in module_map or not module_map[mod]:
continue
anchor = mod.replace(" ", "-").replace("", "")
lines.append(f"## {mod}")
lines.append("")
for t in module_map[mod]:
tname = t["name"]
belong = get_table_belong(tname)
lines.append(f"### {tname}")
lines.append(f"- **所属端**{belong}")
lines.append("")
lines.append("| 字段名 | 数据类型 | 约束/默认值 | 说明 |")
lines.append("|--------|----------|-------------|------|")
for f in t["fields"]:
rest = f["rest"].replace("|", "\|")
lines.append(f"| {f['name']} | {f['type']} | {rest} | |")
lines.append("")
if t["constraints"]:
lines.append("**约束**")
for c in t["constraints"]:
lines.append(f"- `{c}`")
lines.append("")
rls = rls_data.get(tname)
if rls and rls["policies"]:
lines.append("**RLS 策略**")
for p in rls["policies"]:
lines.append(f"- `{p['name']}`")
lines.append("")
elif rls and rls["enabled"] and not rls["policies"]:
lines.append("**RLS**:已启用,暂无显式策略(默认拒绝直接访问)")
lines.append("")
# 查找关联 RPC函数名包含表名关键词
keywords = [tname.replace("ml_", "").replace("ak_", "").replace("_", "")]
if tname.startswith("ml_"):
keywords.append(tname[3:].replace("_", ""))
related_rpc = []
for fn in rpc_funcs:
fn_flat = fn["name"].replace("_", "").lower()
for kw in keywords:
if kw.lower() in fn_flat:
related_rpc.append(fn)
break
if related_rpc:
lines.append("**相关 RPC**(最多展示 8 个):")
for fn in related_rpc[:8]:
args = fn["args"][:100] + "..." if len(fn["args"]) > 100 else fn["args"]
lines.append(f"- `{fn['name']}({args}) -> {fn['returns']}`")
lines.append("")
lines.append("---")
lines.append("")
# 汇总 SQL
lines.append("## 完整对接 SQL 汇总")
lines.append("")
lines.append("> 以下 SQL 按执行顺序排列Schema → RLS → RPC。请在新项目/新环境中按顺序执行。")
lines.append("")
lines.append("### 1) 核心商城 Schemacomplete_mall_database.sql")
lines.append("```sql")
with open(SOURCES[0][0], "r", encoding="utf-8", errors="ignore") as f:
lines.append(f.read())
lines.append("```")
lines.append("")
lines.append("### 2) 扩展模块 Schemadocs/sql/10_schema/")
lines.append("```sql")
with open(SOURCES[1][0], "r", encoding="utf-8", errors="ignore") as f:
lines.append(f.read())
lines.append("```")
lines.append("")
lines.append("### 3) RLS行级安全")
lines.append("```sql")
with open(RLS_PATH, "r", encoding="utf-8", errors="ignore") as f:
lines.append(f.read())
lines.append("```")
lines.append("")
lines.append("### 4) RPC关键函数完整见 30_rpc 目录)")
lines.append("```sql")
with open(RPC_PATH, "r", encoding="utf-8", errors="ignore") as f:
lines.append(f.read())
lines.append("```")
lines.append("")
# RPC 清单
lines.append("## 关键 RPC 清单")
lines.append("")
lines.append("| 函数名 | 参数 | 返回 | 用途 |")
lines.append("|--------|------|------|------|")
for fn in rpc_funcs:
args = fn["args"].replace("\n", " ")[:80]
ret = fn["returns"].replace("\n", " ")[:80]
lines.append(f"| {fn['name']} | {args} | {ret} | |")
lines.append("")
with open(OUTPUT_PATH, "w", encoding="utf-8") as f:
f.write("\n".join(lines))
print(f"文档已生成:{OUTPUT_PATH}")
print(f"共汇总表:{len(all_tables)}RPC 函数:{len(rpc_funcs)}")
if __name__ == "__main__":
main()