diff --git a/.env.example b/.env.example index e3bc577..8e65d8b 100644 --- a/.env.example +++ b/.env.example @@ -1,3 +1,8 @@ DATABASE_URL=postgresql://contexthub:contexthub@localhost:5432/contexthub +DB_BACKEND=postgres # "postgres" or "opengauss" API_KEY=changeme EMBEDDING_MODEL=text-embedding-3-small + +# Example openGauss config: +# DATABASE_URL=postgresql://contexthub:ContextHub%40123@localhost:15432/contexthub +# DB_BACKEND=opengauss diff --git a/README.md b/README.md index ce18143..137a8d1 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,40 @@ +## 为ContextHub新增openGauss后端 + +### 一、openGauss server配置 + +* 参考`docs/setup/opengauss-setup-guide-zh.md`新增server后端 +* CREATE DATABASE时需要设定DBCOMPATIBILITY = 'PG'模式, 避免空字符串被interpret成NULL。 + +### 二、Extension替换 + +* 原ContextHub项目依赖pgvector+pgcrypto两个插件,这两个库在opengauss都不支持 +* 解决方案:opengauss 7.0.0原生支持DataVec,可以替换pgvector;opengauss不支持pgcrypto,替代方案uuid-ossp在官方镜像中也缺失无法安装,最终手动自定义实现`uuid_generate_opengauss`函数替代`gen_random_uuid`,规避pgcrypto extension + +### 三、Python Driver兼容 + +* 原ContextHub使用asyncpg库与db后端交互,但是asyncpg不支持opengauss的vector数据格式 + * 报错信息为`message: unhandled standard data type 'vector' (OID 8305)`, 复现脚本为`opengauss/vector_asyncpg.py` + * gaussdb有一个自己维护的`async_gaussdb`库,但是一样不支持vector格式, 验证脚本为`opengauss/vector_async_gaussdb.py` +* 解决方案:新增db兼容层,PG后端仍然使用asyncpg,OpenGauss后端切换到psycopg3连接 + * psycopg3 的位置参数语法与asyncpg完全不同,一个是%s一个是$n,用正则匹配转换 + * 兼容层处理语法转换,对外封装暴露统一的fetch/fetchrow/fetchall/execute接口 + * 相关实现在`src/contexthub/db/repository.py` + +### 四、SQL Dialect转写 + +* 原ContextHub使用postgres方言的SQL,多种语言特性与opengauss不兼容 +* 例如,openGauss不支持PG的INSERT ON CONFLICT (需要重写为ON DUPLICATE KEY UPDATE), 且不能与RETURNING语句, ROW POLICY同时使用 +* 全项目约20+条需要重写, 详细情况可见于报告`opengauss-compatibility-report.md` + +### 整体完成度 + +* 步骤一、二、三进度100%,目前demo `opengauss/demo_e2e_opengauss.py` 可以成功执行前3个steps +* 步骤四进度50%, 正在处理demo的第四个step的SQL转写,具体可见`ContextHub/src/contexthub/services/skill_service.py`的FIXME + + +--- + 
diff --git a/alembic/env.py b/alembic/env.py index 01c87e1..bb913d4 100644 --- a/alembic/env.py +++ b/alembic/env.py @@ -7,6 +7,12 @@ from sqlalchemy import pool from sqlalchemy.ext.asyncio import async_engine_from_config +import os +if os.environ.get('DB_BACKEND', 'postgres').lower() == 'opengauss': + # Force SQLAlchemy to treat openGauss as PostgreSQL 9.2 to bypass the version string error + from sqlalchemy.dialects.postgresql.base import PGDialect + PGDialect._get_server_version_info = lambda *args, **kwargs: (9, 2, 0) + PROJECT_ROOT = Path(__file__).resolve().parents[1] SRC_DIR = PROJECT_ROOT / "src" if str(SRC_DIR) not in sys.path: diff --git a/alembic/versions/001_initial_schema.py b/alembic/versions/001_initial_schema.py index f73e50b..d71ff39 100644 --- a/alembic/versions/001_initial_schema.py +++ b/alembic/versions/001_initial_schema.py @@ -14,15 +14,35 @@ depends_on: Union[str, Sequence[str], None] = None +def _is_opengauss() -> bool: + import os + return os.environ.get("DB_BACKEND", "postgres").lower() == "opengauss" + + def upgrade() -> None: + opengauss = _is_opengauss() + uuid_default = "uuid_generate_opengauss()" if opengauss else "gen_random_uuid()" + # Extensions - op.execute("CREATE EXTENSION IF NOT EXISTS vector") - op.execute("CREATE EXTENSION IF NOT EXISTS pgcrypto") + if opengauss: + # openGauss 7.0+ has DataVec built-in; avoid pgcrypto and uuid-ossp with + # customized random uuid generation function + op.execute(""" + CREATE OR REPLACE FUNCTION uuid_generate_opengauss() + RETURNS uuid + LANGUAGE sql + AS $$ + SELECT md5(random()::text || clock_timestamp()::text)::uuid; + $$; + """) + else: + op.execute("CREATE EXTENSION IF NOT EXISTS vector") + op.execute("CREATE EXTENSION IF NOT EXISTS pgcrypto") # --- contexts --- - op.execute(""" + op.execute(f""" CREATE TABLE contexts ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + id UUID PRIMARY KEY DEFAULT {uuid_default}, uri TEXT NOT NULL, context_type TEXT NOT NULL CHECK (context_type IN ('table_schema', 'skill', 
'memory', 'resource')), scope TEXT NOT NULL CHECK (scope IN ('datalake', 'team', 'agent', 'user')), @@ -79,9 +99,9 @@ def upgrade() -> None: op.execute("CREATE INDEX idx_deps_dependent ON dependencies (dependent_id)") # --- change_events (no RLS) --- - op.execute(""" + op.execute(f""" CREATE TABLE change_events ( - event_id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + event_id UUID PRIMARY KEY DEFAULT {uuid_default}, timestamp TIMESTAMPTZ DEFAULT NOW(), context_id UUID NOT NULL REFERENCES contexts(id), account_id TEXT NOT NULL, @@ -126,9 +146,9 @@ def upgrade() -> None: """) # --- teams --- - op.execute(""" + op.execute(f""" CREATE TABLE teams ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + id UUID PRIMARY KEY DEFAULT {uuid_default}, path TEXT NOT NULL, parent_id UUID REFERENCES teams(id), display_name TEXT, @@ -247,7 +267,7 @@ def upgrade() -> None: op.execute("CREATE INDEX idx_qt_context ON query_templates (context_id)") # --- Seed data --- - op.execute(""" + op.execute(f""" INSERT INTO teams (id, path, parent_id, display_name, account_id) VALUES ('00000000-0000-0000-0000-000000000001', '', NULL, '全组织', 'acme'), ('00000000-0000-0000-0000-000000000002', 'engineering', '00000000-0000-0000-0000-000000000001', '工程部', 'acme'), diff --git a/docker-compose.opengauss.yml b/docker-compose.opengauss.yml new file mode 100644 index 0000000..22db61f --- /dev/null +++ b/docker-compose.opengauss.yml @@ -0,0 +1,13 @@ +services: + opengauss: + image: opengauss/opengauss-server:latest + privileged: true + ports: + - "15432:5432" + environment: + GS_PASSWORD: "Huawei@123" + volumes: + - ogdata:/var/lib/opengauss/data + +volumes: + ogdata: diff --git a/docs/setup/opengauss-setup-guide-zh.md b/docs/setup/opengauss-setup-guide-zh.md new file mode 100644 index 0000000..388682b --- /dev/null +++ b/docs/setup/opengauss-setup-guide-zh.md @@ -0,0 +1,97 @@ +# openGauss 部署指南 + +本文档介绍如何使用 openGauss 7.0+ 作为 ContextHub 的存储后端。 + +## 前置条件 + +- Docker 已安装 +- openGauss 7.0+ (内置 DataVec 向量能力) 
+ +## 1. 启动 openGauss 容器 + +```bash +# 使用项目提供的 compose 文件 +docker compose -f docker-compose.opengauss.yml up -d + +# 或者手动启动 +docker pull opengauss/opengauss-server:latest +docker run --name opengauss --privileged=true -d \ + -e GS_PASSWORD=Huawei@123 \ + -p 15432:5432 \ + opengauss/opengauss-server:latest +``` + +## 2. 初始化数据库 + +```bash +docker exec -it opengauss bash +su omm +gsql -d postgres -p 5432 +``` + +在 gsql 中执行: + +```sql +CREATE USER contexthub WITH PASSWORD 'ContextHub@123' SYSADMIN; +CREATE DATABASE contexthub OWNER contexthub DBCOMPATIBILITY = 'PG'; +``` + +> **注意:** +> - openGauss 7.0+ 内置 DataVec,无需创建 vector 扩展 +> - 使用自定义`uuid_generate_opengauss()` (`alembic/versions/001_initial_schema.py`),替代 PostgreSQL 的 `pgcrypto` + `gen_random_uuid()` +> - openGauss 密码有强度约束,须包含大小写字母、数字和特殊字符 +> - 使用 `SYSADMIN` 而非 `SUPERUSER` 关键字 +> - 使用DBCOMPATIBILITY = 'PG'模式,保证空字符串/NULL值处理等与postgres统一。 + +## 3. 配置 ContextHub + +编辑 `.env` 文件: + +```env +DATABASE_URL=postgresql://contexthub:ContextHub%40123@OPENGAUSS_HOST:15432/contexthub +DB_BACKEND=opengauss +``` + +其中 `OPENGAUSS_HOST` 替换为 openGauss 服务器的实际 IP 地址;密码中的 `@` 必须写成 `%40`。 + +## 4. 运行数据库迁移 + +```bash +DB_BACKEND=opengauss alembic upgrade head +``` + +> 迁移脚本会根据 `DB_BACKEND` 环境变量自动选择: +> - `opengauss`: 使用 `uuid_generate_opengauss()` 作为 UUID 默认值 +> - `postgres` (默认): 创建 `vector` + `pgcrypto` 扩展,使用 `gen_random_uuid()` + +## 5. 启动服务 + +```bash +DB_BACKEND=opengauss uvicorn contexthub.main:app --host 0.0.0.0 --port 8000 +``` + +## 6. 安装 Python 依赖说明 + +使用 openGauss 后端时,不需要安装 `pgvector` Python 包: + +```bash +# openGauss 后端 +pip install . 
+ +# PostgreSQL + pgvector 后端 +pip install ".[postgres]" +``` + +## 与 PostgreSQL 后端的差异 + +| 特性 | PostgreSQL 16 | openGauss 7.0+ | +|------|--------------|----------------| +| 向量扩展 | pgvector (需安装) | DataVec (内置) | +| UUID 函数 | `gen_random_uuid()` (pgcrypto) | `uuid_generate_opengauss()` (自定义) | +| 向量类型 `vector(N)` | 兼容 | 兼容 | +| 向量距离 `<=>` | 兼容 | 兼容 | +| HNSW 索引 | 兼容 | 兼容 | +| RLS | 兼容 | 兼容 | +| `pg_notify`/`LISTEN` | 兼容 | 不兼容 | +| asyncpg 驱动 | 兼容 | 部分兼容 (不支持 `vector` 类型,openGauss 后端改用 psycopg3) | +| 连接 URL 格式 | `postgresql://` | `postgresql://` | diff --git a/opengauss-compatibility-report.md b/opengauss-compatibility-report.md new file mode 100644 index 0000000..b0b1c1e --- /dev/null +++ b/opengauss-compatibility-report.md @@ -0,0 +1,599 @@ +# ContextHub: PostgreSQL 16 → openGauss 7.0.0 SQL 兼容性分析报告 + +> 分析时间: 2026-04-13 +> +> 参考文档: +> - https://github.com/opengauss-mirror +> - https://docs.opengauss.org/zh/docs/7.0.0-RC1 + +--- + +## 一、项目概述 + +ContextHub 是一个基于 **FastAPI + asyncpg + PostgreSQL 16** 的上下文管理 REST API。项目使用: + +- **asyncpg** 连接池执行原生参数化 SQL(无 ORM) +- **Alembic** 管理 DDL 迁移(通过 `op.execute()` 执行原生 SQL) +- **pgvector** 扩展做向量相似度检索 +- **pgcrypto** 扩展提供 `gen_random_uuid()` +- **Row Level Security (RLS)** 实现多租户隔离 +- **LISTEN/NOTIFY** (`pg_notify`) 实现事件传播唤醒 + +--- + +## 二、SQL 语句全量清单 + +以下按文件分类罗列仓库中所有实际执行的 SQL 语句。 + +### 2.1 Alembic 迁移(DDL 层) + +#### `alembic/versions/001_initial_schema.py` + +| # | SQL 语句摘要 | 类型 | +|---|---|---| +| 1 | `CREATE EXTENSION IF NOT EXISTS vector` | DDL | +| 2 | `CREATE EXTENSION IF NOT EXISTS pgcrypto` | DDL | +| 3 | `CREATE TABLE contexts (... UUID PRIMARY KEY DEFAULT gen_random_uuid(), ... 
vector(1536), TEXT[], TIMESTAMPTZ, JSONB ...)` | DDL | +| 4 | `ALTER TABLE contexts ENABLE ROW LEVEL SECURITY` | DDL | +| 5 | `ALTER TABLE contexts FORCE ROW LEVEL SECURITY` | DDL | +| 6 | `CREATE POLICY tenant_isolation ON contexts USING (account_id = current_setting('app.account_id'))` | DDL | +| 7 | `CREATE INDEX idx_contexts_scope ON contexts (scope, context_type)` | DDL | +| 8 | `CREATE INDEX idx_contexts_owner ON contexts (account_id, owner_space)` | DDL | +| 9 | `CREATE INDEX idx_contexts_status ON contexts (status) WHERE status != 'deleted'` | DDL(部分索引) | +| 10 | `CREATE INDEX idx_contexts_l0_embedding ON contexts USING hnsw (l0_embedding vector_cosine_ops) WITH (m=16, ef_construction=64)` | DDL(HNSW 向量索引) | +| 11 | `CREATE TABLE dependencies (... SERIAL PRIMARY KEY ...)` | DDL | +| 12 | `CREATE TABLE change_events (... UUID PRIMARY KEY DEFAULT gen_random_uuid(), JSONB, TIMESTAMPTZ ...)` | DDL | +| 13 | `CREATE INDEX ... WHERE delivery_status IN ('pending', 'retry')` | DDL(部分索引) | +| 14 | `CREATE INDEX ... WHERE delivery_status = 'processing'` | DDL(部分索引) | +| 15 | `CREATE OR REPLACE FUNCTION notify_change_event() RETURNS trigger AS $$ ... pg_notify ... $$ LANGUAGE plpgsql` | DDL(触发器函数) | +| 16 | `CREATE TRIGGER trg_change_events_notify AFTER INSERT ON change_events FOR EACH ROW EXECUTE FUNCTION notify_change_event()` | DDL(触发器) | +| 17 | `CREATE TABLE teams (... UUID PRIMARY KEY DEFAULT gen_random_uuid() ...)` | DDL | +| 18 | `CREATE POLICY tenant_isolation ON teams USING (...)` | DDL | +| 19 | `CREATE TABLE team_memberships (... PRIMARY KEY (agent_id, team_id) ...)` | DDL | +| 20 | `CREATE TABLE skill_versions (... PRIMARY KEY (skill_id, version) ...)` | DDL | +| 21 | `CREATE TABLE skill_subscriptions (... SERIAL PRIMARY KEY ...)` | DDL | +| 22 | `CREATE POLICY tenant_isolation ON skill_subscriptions USING (...)` | DDL | +| 23 | `CREATE TABLE table_metadata (... UUID PRIMARY KEY REFERENCES ..., JSONB ...)` | DDL | +| 24 | `CREATE TABLE lineage (... 
PRIMARY KEY (upstream_id, downstream_id) ...)` | DDL | +| 25 | `CREATE TABLE table_relationships (... JSONB NOT NULL, FLOAT ...)` | DDL | +| 26 | `CREATE TABLE query_templates (... SERIAL PRIMARY KEY ...)` | DDL | +| 27 | `INSERT INTO teams VALUES (UUID literal, ...)` — 种子数据 | DML | +| 28 | `INSERT INTO team_memberships VALUES (...)` — 种子数据 | DML | +| 29 | `DROP TABLE IF EXISTS ... CASCADE` — 降级 | DDL | +| 30 | `DROP FUNCTION IF EXISTS notify_change_event() CASCADE` — 降级 | DDL | + +#### `alembic/versions/002_force_row_level_security.py` + +| # | SQL 语句摘要 | 类型 | +|---|---|---| +| 31 | `ALTER TABLE contexts/teams/skill_subscriptions FORCE ROW LEVEL SECURITY` | DDL | +| 32 | `ALTER TABLE ... NO FORCE ROW LEVEL SECURITY` — 降级 | DDL | + +### 2.2 数据库连接层 + +#### `src/contexthub/db/repository.py` + +| # | SQL 语句 | 用途 | +|---|---|---| +| 33 | `SELECT set_config('app.account_id', $1, true)` | 设置事务级 GUC,实现 RLS 租户绑定 | + +### 2.3 应用服务层(DML) + +#### `src/contexthub/services/acl_service.py` + +| # | SQL 语句摘要 | 用途 | +|---|---|---| +| 34 | `SELECT scope, owner_space FROM contexts WHERE uri = $1 AND status != 'deleted'` | ACL 读/写检查 | +| 35 | `WITH RECURSIVE visible_teams AS (... UNION ALL ...) SELECT DISTINCT path FROM visible_teams` | 递归 CTE 获取可见团队 | +| 36 | `SELECT 1 FROM team_memberships tm JOIN teams t ON ... WHERE tm.agent_id = $1 AND t.path = $2 AND tm.access = 'read_write'` | 团队写权限检查 | + +#### `src/contexthub/store/context_store.py` + +| # | SQL 语句摘要 | 用途 | +|---|---|---| +| 37 | `SELECT {col} FROM contexts WHERE uri = $1 AND status != 'deleted'` | 读取上下文内容 | +| 38 | `UPDATE contexts SET last_accessed_at = NOW() WHERE uri = $1` | 更新访问时间 | +| 39 | `UPDATE contexts SET {col} = $1, ... version = version + 1 ... WHERE uri = $2 AND version = $3 ... 
RETURNING id, version` | 乐观锁写入 | +| 40 | `SELECT 1 FROM contexts WHERE uri = $1 AND status != 'deleted'` | 存在性检查 | +| 41 | `INSERT INTO change_events (context_id, account_id, change_type, actor) VALUES ($1, current_setting('app.account_id'), 'modified', $2)` | 变更事件记录 | +| 42 | `SELECT uri, scope, owner_space, status FROM contexts WHERE uri LIKE $1 AND status != 'deleted'` | 目录列表 | +| 43 | `SELECT id, uri, context_type, ... FROM contexts WHERE uri = $1 AND status != 'deleted'` | stat 查询 | + +#### `src/contexthub/services/context_service.py` + +| # | SQL 语句摘要 | 用途 | +|---|---|---| +| 44 | `INSERT INTO contexts (...) VALUES ($1,...,$9) RETURNING *` | 创建上下文 | +| 45 | `INSERT INTO change_events ... VALUES ($1, current_setting('app.account_id'), 'created', $2)` | 变更事件 | +| 46 | `UPDATE contexts SET {动态set子句} WHERE uri = $n AND version = $m AND status != 'deleted' RETURNING *` | 更新上下文 | +| 47 | `UPDATE contexts SET status = 'deleted', deleted_at = NOW(), ... WHERE uri = $1 AND version = $2 ... RETURNING id` | 软删除 | +| 48 | `SELECT id FROM contexts WHERE uri = $1 AND status != 'deleted'` | 依赖查询 | +| 49 | `SELECT d.dep_type, d.pinned_version, c1.uri, c2.uri FROM dependencies d JOIN contexts c1 ... JOIN contexts c2 ... WHERE d.dependent_id = $1 OR d.dependency_id = $1` | 依赖图查询 | + +#### `src/contexthub/services/memory_service.py` + +| # | SQL 语句摘要 | 用途 | +|---|---|---| +| 50 | `INSERT INTO contexts (...) VALUES ($1, 'memory', 'agent', ..., current_setting('app.account_id'), ...) RETURNING *` | 添加记忆 | +| 51 | `SELECT uri, l0_content, ... FROM contexts WHERE context_type = 'memory' AND scope IN ('agent','team') ... ORDER BY updated_at DESC` | 列出记忆 | +| 52 | `SELECT * FROM contexts WHERE uri = $1 AND status != 'deleted'` | 读取源记忆 | +| 53 | `INSERT INTO contexts (...) VALUES ($1, 'memory', 'team', ...) 
RETURNING *` | 促进记忆 | +| 54 | `INSERT INTO dependencies (dependent_id, dependency_id, dep_type) VALUES ($1, $2, 'derived_from')` | 依赖关系 | +| 55 | `INSERT INTO change_events (..., metadata) VALUES ($1, ..., $3)` | 变更事件(含元数据) | + +#### `src/contexthub/services/skill_service.py` + +| # | SQL 语句摘要 | 用途 | +|---|---|---| +| 56 | `SELECT id, context_type FROM contexts WHERE uri = $1 AND status != 'deleted'` | 技能检查 | +| 57 | `SELECT id FROM contexts WHERE id = $1 FOR UPDATE` | 行锁防并发发布 | +| 58 | `SELECT COALESCE(MAX(version), 0) FROM skill_versions WHERE skill_id = $1` | 获取最新版本号 | +| 59 | `INSERT INTO skill_versions (...) VALUES ($1, $2, $3, $4, $5, 'published', $6, NOW())` | 插入技能版本 | +| 60 | `UPDATE contexts SET l0_content=$1, l1_content=$2, l2_content=$3, version=$4 ... WHERE id=$5` | 更新技能头指针 | +| 61 | `INSERT INTO change_events (..., new_version, metadata) VALUES (...)` | 版本发布事件 | +| 62 | `SELECT ... FROM skill_versions WHERE skill_id=$1 AND status IN ('published','deprecated') ORDER BY version DESC` | 获取版本列表 | +| 63 | `INSERT INTO skill_subscriptions ... ON CONFLICT (agent_id, skill_id) DO UPDATE SET pinned_version = EXCLUDED.pinned_version RETURNING *` | **UPSERT 订阅** | +| 64 | `SELECT pinned_version FROM skill_subscriptions WHERE agent_id=$1 AND skill_id=$2` | 查询订阅 | +| 65 | `SELECT MAX(version) FROM skill_versions WHERE skill_id=$1 AND status='published'` | 最新发布版本 | +| 66 | `SELECT content, version, status FROM skill_versions WHERE ... 
AND status IN ('published','deprecated')` | 读取版本 | +| 67 | `SELECT l2_content, version FROM contexts WHERE id = $1` | 读取技能内容 | +| 68 | `SELECT 1 FROM skill_versions WHERE skill_id=$1 AND status='published' LIMIT 1` | 发布版本存在性 | + +#### `src/contexthub/services/retrieval_service.py` + +| # | SQL 语句摘要 | 用途 | +|---|---|---| +| 69 | `SELECT id, l2_content FROM contexts WHERE id IN ($1,$2,...)` | L2 按需加载 | +| 70 | `UPDATE contexts SET active_count = active_count + 1, last_accessed_at = NOW() WHERE id = ANY($1)` | 更新活跃计数 | + +#### `src/contexthub/retrieval/vector_strategy.py` + +| # | SQL 语句摘要 | 用途 | +|---|---|---| +| 71 | `SELECT ... 1 - (l0_embedding <=> $1::vector) AS cosine_similarity FROM contexts WHERE ... ORDER BY l0_embedding <=> $1::vector LIMIT $n` | **pgvector 余弦相似度检索** | + +#### `src/contexthub/retrieval/keyword_strategy.py` + +| # | SQL 语句摘要 | 用途 | +|---|---|---| +| 72 | `SELECT ... (CASE WHEN LOWER(COALESCE(...)) LIKE $n THEN 1 ELSE 0 END + ...)::float / {max} AS cosine_similarity FROM contexts WHERE ... ORDER BY ... DESC LIMIT $n` | 关键词回退检索 | + +#### `src/contexthub/services/indexer_service.py` + +| # | SQL 语句摘要 | 用途 | +|---|---|---| +| 73 | `UPDATE contexts SET l0_embedding = NULL WHERE id = $1` | 清除向量嵌入 | +| 74 | `SELECT id, l0_content FROM contexts WHERE l0_embedding IS NULL AND l0_content IS NOT NULL AND status IN ('active','stale') LIMIT $1` | 回填选择 | +| 75 | `UPDATE contexts SET l0_embedding = $1::vector WHERE id = $2` | 写入向量嵌入 | + +#### `src/contexthub/services/catalog_sync_service.py` + +| # | SQL 语句摘要 | 用途 | +|---|---|---| +| 76 | `SELECT created_at, updated_at FROM contexts WHERE id = $1` | 判断新建/更新 | +| 77 | `INSERT INTO contexts (...) VALUES (...) ON CONFLICT (account_id, uri) DO UPDATE SET ... RETURNING id, (xmax = 0) AS is_new` | **UPSERT + xmax 判断** | +| 78 | `SELECT ddl FROM table_metadata WHERE context_id = $1` | DDL 变更检测 | +| 79 | `INSERT INTO table_metadata (...) 
VALUES ($1,...,$6::jsonb,$7::jsonb, NOW()) ON CONFLICT (context_id) DO UPDATE SET ...` | **UPSERT 表元数据** | +| 80 | `INSERT INTO change_events (...) VALUES (...)` | 变更事件 | +| 81 | `UPDATE contexts SET version = version + 1 WHERE id = $1` | 版本递增 | +| 82 | `SELECT id FROM contexts WHERE uri = $1 AND account_id = $2` | 定位上下文 | +| 83 | `UPDATE contexts SET status = 'archived', archived_at = NOW() WHERE id = $1` | 归档 | +| 84 | `SELECT dependent_id FROM dependencies WHERE dependency_id = $1 AND dep_type = 'table_schema'` | 查询依赖者 | +| 85 | `UPDATE contexts SET status = 'stale', stale_at = NOW(), updated_at = NOW() WHERE id = $1 AND status NOT IN (...)` | 标记过期 | +| 86 | `INSERT INTO table_relationships (...) VALUES ($1,$2,$3,$4::jsonb) ON CONFLICT ... DO UPDATE SET ...` | **UPSERT 表关系** | +| 87 | `INSERT INTO lineage (...) VALUES ($1,$2,'fk',$3) ON CONFLICT ... DO NOTHING` | **UPSERT 血缘(DO NOTHING)** | +| 88 | `SELECT c.uri, ... FROM contexts c JOIN table_metadata tm ON ... WHERE ... ORDER BY tm.table_name` | 列出同步表 | +| 89 | `SELECT c.id, c.uri, ... FROM contexts c JOIN table_metadata tm ON ... WHERE ...` | 表详情 | +| 90 | `SELECT tr.join_type, tr.join_columns, ... CASE WHEN ... FROM table_relationships tr LEFT JOIN contexts ...` | 关系查询 | +| 91 | `SELECT sql_template, description, hit_count FROM query_templates WHERE context_id=$1 ORDER BY hit_count DESC LIMIT 5` | 查询模板 | +| 92 | `WITH RECURSIVE upstream_lineage AS (... ARRAY[...]::uuid[] ... NOT l.upstream_id = ANY(ul.path) ...) SELECT DISTINCT ON (c.uri) ... ORDER BY c.uri, ul.depth ASC` | **递归 CTE 血缘上游** | +| 93 | `WITH RECURSIVE downstream_lineage AS (...) SELECT DISTINCT ON (c.uri) ...` | **递归 CTE 血缘下游** | + +#### `src/contexthub/services/propagation_engine.py` + +| # | SQL 语句摘要 | 用途 | +|---|---|---| +| 94 | `UPDATE change_events SET delivery_status='retry', ... WHERE delivery_status='processing' AND claimed_at < NOW() - $1::interval` | 回收过期事件 | +| 95 | `UPDATE change_events SET delivery_status='processing', ... 
WHERE event_id IN (SELECT ... WHERE context_id = $1::uuid AND delivery_status IN ('pending','retry') AND next_retry_at <= NOW() ORDER BY timestamp ASC LIMIT $2) RETURNING *` | 领取事件(按 context) | +| 96 | `UPDATE change_events SET ... WHERE event_id IN (SELECT ... WHERE delivery_status IN ('pending','retry') ... LIMIT $1) RETURNING *` | 领取事件(全局) | +| 97 | `SELECT dependent_id, dep_type, pinned_version, created_at FROM dependencies WHERE dependency_id=$1 AND created_at <= $2 ORDER BY ...` | 查询依赖者 | +| 98 | `SELECT agent_id, pinned_version, created_at FROM skill_subscriptions WHERE skill_id=$1 AND created_at <= $2 ORDER BY ...` | 查询订阅者 | +| 99 | `UPDATE contexts SET status='stale', stale_at=NOW(), updated_at=NOW() WHERE id=$1 AND status NOT IN (...)` | 标记过期 | +| 100 | `INSERT INTO change_events (...) VALUES ($1,$2,'marked_stale','propagation_engine',$3)` | 变更事件 | +| 101 | `SELECT id, context_type, l0_content, l1_content, l2_content FROM contexts WHERE id = $1` | 加载源上下文 | +| 102 | `SELECT id, context_type, l2_content FROM contexts WHERE id = $1` | 加载依赖者 | +| 103 | `UPDATE contexts SET l0_content=$1, l1_content=$2, updated_at=NOW() WHERE id=$3` | 自动更新派生投影 | +| 104 | `UPDATE contexts SET l0_embedding = NULL WHERE id = $1` | 清除嵌入 | +| 105 | `UPDATE change_events SET delivery_status='processed', processed_at=NOW(), ... WHERE event_id=$1` | 完成事件 | +| 106 | `UPDATE change_events SET delivery_status='retry', ... next_retry_at = NOW() + make_interval(secs => LEAST(300, 5 * attempt_count)), ... WHERE event_id=$1` | **重试事件(make_interval)** | + +#### `src/contexthub/api/routers/datalake.py` + +| # | SQL 语句摘要 | 用途 | +|---|---|---| +| 107 | `SELECT c.id FROM contexts c JOIN table_metadata tm ON ... WHERE c.uri=$1 AND tm.catalog=$2 ...` | SQL 上下文定位 | +| 108 | `SELECT c.id, c.uri, ... (SELECT jsonb_agg(jsonb_build_object(...)) FROM table_relationships tr ...) AS joins, (SELECT jsonb_agg(jsonb_build_object(...)) FROM (...) 
qt) AS top_templates FROM contexts c JOIN table_metadata tm ON ... WHERE c.id = ANY($1::uuid[]) AND tm.catalog=$2 ORDER BY array_position($1::uuid[], c.id)` | **SQL 上下文组装(jsonb_agg, jsonb_build_object, array_position)** | + +#### `src/contexthub/api/routers/tools.py` & `contexts.py` + +| # | SQL 语句摘要 | 用途 | +|---|---|---| +| 109 | `SELECT id, context_type FROM contexts WHERE uri=$1 AND status!='deleted'` | 路由分发 | +| 110 | `UPDATE contexts SET last_accessed_at = NOW() WHERE uri = $1` | 访问时间更新 | +| 111 | `SELECT context_type FROM contexts WHERE uri=$1 AND status != 'deleted'` | 类型检查 | + +### 2.4 脚本与测试 + +| # | 文件 | SQL 语句 | 用途 | +|---|---|---|---| +| 112 | `scripts/demo_e2e.py` | `SET app.account_id = 'acme'` | 会话 GUC | +| 113 | `scripts/demo_e2e.py` | `INSERT INTO team_memberships ... ON CONFLICT DO NOTHING` | **UPSERT** | +| 114 | `tests/conftest.py` | `TRUNCATE contexts, ... CASCADE` | 测试清理 | + +--- + +## 三、openGauss 7.0.0 兼容性逐项分析 + +### 风险等级说明 + +- 🔴 **阻断 (BLOCKER)** — 语法不支持,必须改写 SQL 否则无法执行 +- 🟡 **需适配 (ADAPTATION)** — 功能支持但语法/扩展名不同,需修改代码 +- 🟢 **兼容 (COMPATIBLE)** — 直接可用,无需修改 + +--- + +### 3.1 🔴 阻断级问题 + +#### 3.1.1 `INSERT ... ON CONFLICT` 不支持 + +**影响范围**: #63, #77, #79, #86, #87, #113(共 6 处) + +**问题描述**: +openGauss 7.0.0 **不支持** PostgreSQL 的 `INSERT ... ON CONFLICT (columns) DO UPDATE SET ... / DO NOTHING` 语法。这是 PostgreSQL 9.5+ 引入的 upsert 语法,但 openGauss 至今未实现。 + +**涉及文件**: +- `src/contexthub/services/skill_service.py` — `INSERT INTO skill_subscriptions ... ON CONFLICT (agent_id, skill_id) DO UPDATE ... RETURNING *` +- `src/contexthub/services/catalog_sync_service.py` — 4 处 `ON CONFLICT ... 
DO UPDATE / DO NOTHING` +- `scripts/demo_e2e.py` — `ON CONFLICT DO NOTHING` + +**openGauss 替代方案**: + +openGauss 提供两种替代: + +**方案 A: `ON DUPLICATE KEY UPDATE`(推荐简单场景)** +```sql +-- PostgreSQL 原始 +INSERT INTO skill_subscriptions (agent_id, skill_id, pinned_version, account_id) +VALUES ($1, $2, $3, current_setting('app.account_id')) +ON CONFLICT (agent_id, skill_id) +DO UPDATE SET pinned_version = EXCLUDED.pinned_version +RETURNING *; + +-- openGauss 改写 +INSERT INTO skill_subscriptions (agent_id, skill_id, pinned_version, account_id) +VALUES ($1, $2, $3, current_setting('app.account_id')) +ON DUPLICATE KEY UPDATE pinned_version = EXCLUDED.pinned_version; +``` + +> ⚠️ **重要限制**: openGauss 的 `ON DUPLICATE KEY UPDATE` **不支持与 `RETURNING` 子句一起使用**。SQL #63 和 #77 同时使用了 `ON CONFLICT ... RETURNING *`,改写后需要拆分为两步操作(先 upsert,再 SELECT)。 + +**方案 B: `MERGE INTO`(推荐复杂场景)** +```sql +MERGE INTO skill_subscriptions t +USING (SELECT $1 AS agent_id, $2 AS skill_id, $3 AS pinned_version) s +ON (t.agent_id = s.agent_id AND t.skill_id = s.skill_id) +WHEN MATCHED THEN UPDATE SET pinned_version = s.pinned_version +WHEN NOT MATCHED THEN INSERT (agent_id, skill_id, pinned_version, account_id) + VALUES (s.agent_id, s.skill_id, s.pinned_version, current_setting('app.account_id')); +``` + +**对于 `ON CONFLICT DO NOTHING`**: 可改写为 `INSERT ... ON DUPLICATE KEY UPDATE NOTHING`。 + +--- + +#### 3.1.2 `ON CONFLICT ... RETURNING *` + `(xmax = 0) AS is_new` 组合不可用 + +**影响范围**: #77(1 处,但属于核心同步逻辑) + +**问题描述**: +`catalog_sync_service.py` 中的核心 upsert 语句同时使用了三个 openGauss 不兼容的特性: +1. `ON CONFLICT ... DO UPDATE SET ...` — 语法不支持 +2. `RETURNING *` — 与 upsert 组合不支持 +3. `(xmax = 0) AS is_new` — 利用 PostgreSQL 内部系统列判断是否是新插入行 + +**涉及代码**: +```sql +INSERT INTO contexts (...) VALUES (...) +ON CONFLICT (account_id, uri) DO UPDATE SET ... 
+RETURNING id, (xmax = 0) AS is_new +``` + +虽然 openGauss 支持 `xmax` 系统隐藏列,但由于 `ON CONFLICT` 和 `RETURNING` 的双重限制,此查询需要完全重构。 + +**改写建议**: +```sql +-- 先尝试 SELECT +SELECT id FROM contexts WHERE account_id = $4 AND uri = $1; +-- 如果不存在则 INSERT,如果存在则 UPDATE +-- 根据操作类型在应用层确定 is_new +``` + +--- + +#### 3.1.3 `CREATE EXTENSION IF NOT EXISTS vector` — pgvector 不存在 + +**影响范围**: #1(影响整个向量检索功能链:#10, #71, #73, #74, #75) + +**问题描述**: +openGauss **不包含 pgvector 扩展**。openGauss 提供的是自研的 **DataVec** 扩展,虽然提供了相同的 `vector` 数据类型和 `<=>` 余弦距离操作符,但扩展名和安装方式不同。 + +**改写方法**: +```sql +-- PostgreSQL +CREATE EXTENSION IF NOT EXISTS vector; + +-- openGauss +CREATE EXTENSION IF NOT EXISTS datavec; +``` + +DataVec 支持: +- ✅ `vector(1536)` 数据类型(最大 16000 维,索引最大 2000 维;1536 维在索引限制内) +- ✅ `<=>` 余弦距离操作符 +- ✅ HNSW 索引 + `vector_cosine_ops` 操作符类 +- ✅ `$1::vector` 类型转换 +- ✅ `WITH (m=16, ef_construction=64)` 索引参数 + +因此向量相关的 DML(#71, #73, #75 等)**在安装 DataVec 后可直接使用**,无需修改。 + +--- + +#### 3.1.4 `CREATE EXTENSION IF NOT EXISTS pgcrypto` — pgcrypto 不存在 + +**影响范围**: #2 + 所有使用 `gen_random_uuid()` 的表定义(#3, #12, #17 等) + +**问题描述**: +openGauss **不支持 pgcrypto 扩展**。`pgcrypto` 的控制文件在 openGauss 发行版中不包含,`CREATE EXTENSION pgcrypto` 会直接报错。 + +项目使用 `pgcrypto` 的唯一目的是提供 `gen_random_uuid()` 函数。 + +**改写方法**: +需确认 openGauss 是否内置 `gen_random_uuid()`。根据 openGauss 文档,`gen_random_uuid()` 从较新版本开始作为内置函数提供(无需扩展)。如果 7.0.0 版本中已内置,则只需删除 `CREATE EXTENSION IF NOT EXISTS pgcrypto` 语句即可。 + +若未内置,则需要使用 `uuid-ossp` 扩展或者 `sys_guid()` 函数替代: +```sql +-- 替代方案 1: 使用 uuid-ossp +CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; +-- 然后将 DEFAULT gen_random_uuid() 替换为 DEFAULT uuid_generate_v4() + +-- 替代方案 2: 使用 sys_guid() +-- 将列默认值改为 DEFAULT sys_guid() +``` + +--- + +#### 3.1.5 `CREATE POLICY` 语法差异 + +**影响范围**: #6, #18, #22(共 3 处) + +**问题描述**: +PostgreSQL 使用 `CREATE POLICY name ON table`,而 openGauss 使用 `CREATE ROW LEVEL SECURITY POLICY name ON table`。 + +**涉及代码**: +```sql +-- PostgreSQL 原始 +CREATE POLICY tenant_isolation ON contexts + USING (account_id = 
current_setting('app.account_id')) + +-- openGauss 必须改为 +CREATE ROW LEVEL SECURITY POLICY tenant_isolation ON contexts + USING (account_id = current_setting('app.account_id')) +``` + +三处 `CREATE POLICY` 都需要加上 `ROW LEVEL SECURITY` 关键词。 + +--- + +#### 3.1.6 `CREATE TRIGGER ... EXECUTE FUNCTION` 语法差异 + +**影响范围**: #16(1 处) + +**问题描述**: +PostgreSQL 11+ 推荐使用 `EXECUTE FUNCTION`,而 openGauss 的常规触发器只支持 `EXECUTE PROCEDURE` 语法。 + +**改写方法**: +```sql +-- PostgreSQL +CREATE TRIGGER trg_change_events_notify +AFTER INSERT ON change_events +FOR EACH ROW EXECUTE FUNCTION notify_change_event(); + +-- openGauss +CREATE TRIGGER trg_change_events_notify +AFTER INSERT ON change_events +FOR EACH ROW EXECUTE PROCEDURE notify_change_event(); +``` + +--- + +#### 3.1.7 `make_interval()` 函数不支持 + +**影响范围**: #106(1 处) + +**问题描述**: +openGauss 不支持 PostgreSQL 的 `make_interval(secs => ...)` 函数。 + +**涉及代码**: +```sql +next_retry_at = NOW() + make_interval(secs => LEAST(300, 5 * attempt_count)) +``` + +**改写方法**: +```sql +-- 使用 interval 乘法替代 +next_retry_at = NOW() + (LEAST(300, 5 * attempt_count) || ' seconds')::interval +-- 或 +next_retry_at = NOW() + (LEAST(300, 5 * attempt_count) * interval '1 second') +``` + +--- + +### 3.2 🟡 需适配问题 + +#### 3.2.1 `LISTEN/NOTIFY` 及 `pg_notify` 可用性不确定 + +**影响范围**: 传播引擎核心机制(#15 触发器函数中的 `pg_notify` + `PropagationEngine` 的 `LISTEN`) + +**问题描述**: +openGauss 的系统函数列表中包含 `pg_notify`(已在 7.0.0-RC1 文档中确认),但 `LISTEN` 语句的支持情况在官方文档中没有明确说明。openGauss 的函数文档声明"内置函数和操作符继承自开源 PG",暗示可能支持,但需要实际验证。 + +**风险**: +如果 `LISTEN` 不支持,则整个事件传播的实时唤醒机制将失效。但由于 `PropagationEngine` 本身有周期性唤醒(`_periodic_wakeup`)作为兜底,系统不会完全中断,只是实时性会降低为定时轮询。 + +**建议**: 在 openGauss 环境中实测 `LISTEN 'context_changed'` 和 `pg_notify('context_changed', ...)` 是否可用。 + +#### 3.2.2 `array_position()` 函数可用性不确定 + +**影响范围**: #108(1 处) + +**问题描述**: +`datalake.py` 中使用 `ORDER BY array_position($1::uuid[], c.id)` 来保持结果排序与输入数组一致。openGauss 的数组函数文档未明确列出 `array_position`,但其文档声明"函数继承自 PG"。 + +**改写方法(如不支持)**: +```sql +-- 使用子查询 + 生成序列替代 +ORDER BY 
(SELECT i FROM generate_subscripts($1::uuid[], 1) i WHERE ($1::uuid[])[i] = c.id LIMIT 1) +``` + +#### 3.2.3 `jsonb_agg` 聚合函数可用性不确定 + +**影响范围**: #108(1 处) + +**问题描述**: +`datalake.py` 的 SQL 上下文组装查询大量使用 `jsonb_agg(jsonb_build_object(...))` 子查询。`jsonb_build_object` 已确认支持,但 `jsonb_agg` 在 openGauss 文档中未明确提及。 + +**改写方法(如不支持)**: +可用 `json_agg` 替代后转 jsonb,或使用 `array_agg` + `array_to_json` 组合。 + +#### 3.2.4 asyncpg 驱动不兼容 + +**影响范围**: 整个数据访问层 + +**问题描述**: +标准 `asyncpg` 库是为 PostgreSQL 协议优化的,直接连接 openGauss 可能因认证协议差异(openGauss 使用 SHA256 认证)而失败。 + +**替代方案**: +华为提供了 **`async-gaussdb`** 库,这是 asyncpg 的 openGauss 适配分支,API 与 asyncpg 基本兼容,支持 SHA256 认证。 + +```bash +pip install async-gaussdb +``` + +代码中需要将 `import asyncpg` 替换为 `import async_gaussdb as asyncpg`(或按库的实际 API 调整)。 + +--- + +### 3.3 🟢 兼容项(无需修改) + +以下 SQL 特性在 openGauss 7.0.0 中**已确认兼容**: + +| 特性 | 涉及 SQL # | 说明 | +|---|---|---| +| `set_config()` / `current_setting()` | #33, #41, #45 等 | openGauss 完全支持 | +| `WITH RECURSIVE` 递归 CTE | #35, #92, #93 | openGauss 完全支持 | +| `UNION ALL` | #35, #92, #93 | 标准 SQL,完全支持 | +| `SELECT ... FOR UPDATE` 行锁 | #57 | openGauss 完全支持 | +| `UPDATE ... RETURNING` | #39, #44, #46, #47 等 | openGauss 支持(列存表除外) | +| `INSERT ... RETURNING` | #44, #50, #53 | openGauss 支持 | +| `DISTINCT ON (expression)` | #92, #93 | openGauss 完全支持 | +| `ALTER TABLE ... ENABLE/FORCE ROW LEVEL SECURITY` | #4, #5, #31, #32 | openGauss 完全支持 | +| `TIMESTAMPTZ` 数据类型 | 所有表 | openGauss 完全支持 | +| `JSONB` 数据类型与 `$n::jsonb` 转换 | #12, #23, #25, #79 | openGauss 完全支持 | +| `TEXT[]` 数组类型 | #3 (tags 字段) | openGauss 支持 | +| `= ANY($1)` 数组操作符 | #70, #92, #93 | openGauss 完全支持 | +| `SERIAL` 自增类型 | #11, #21, #26 | openGauss 完全支持 | +| `UUID` 数据类型 | 所有表 | openGauss 完全支持 | +| `COALESCE()`, `LEAST()`, `NOW()` | 多处 | openGauss 完全支持 | +| `LIKE` / `ILIKE` | #42, #72 | openGauss 完全支持 | +| `CASE WHEN ... THEN ... ELSE ... END` | #72, #90 | openGauss 完全支持 | +| 部分索引 (Partial Index) `CREATE INDEX ... 
WHERE ...` | #9, #13, #14 | openGauss 完全支持 | +| `PL/pgSQL` 函数语言 | #15 | openGauss 完全支持 | +| `TRUNCATE ... CASCADE` | #114 | openGauss 完全支持 | +| `DROP TABLE IF EXISTS ... CASCADE` | #29 | openGauss 完全支持 | +| `CHECK` 约束 | 多表 | openGauss 完全支持 | +| `UNIQUE` 约束 | 多表 | openGauss 完全支持 | +| `REFERENCES` 外键约束 | 多表 | openGauss 完全支持 | +| `$1::interval` 类型转换 | #94 | openGauss 完全支持 | +| `$1::uuid` 类型转换 | #95 | openGauss 完全支持 | +| `FLOAT` 数据类型 | #25 | openGauss 完全支持 | +| `HNSW` 索引(通过 DataVec) | #10 | openGauss DataVec 支持,语法一致 | +| `<=>` 余弦距离操作符(通过 DataVec) | #71 | openGauss DataVec 支持 | +| `vector_cosine_ops` 操作符类(通过 DataVec) | #10 | openGauss DataVec 支持 | + +--- + +## 四、改写工作量汇总 + +| 严重程度 | 数量 | 涉及文件数 | 描述 | +|---|---|---|---| +| 🔴 阻断 | 7 类问题 | ~8 个文件 | 必须改写否则无法运行 | +| 🟡 适配 | 4 类问题 | ~5 个文件 | 需验证或小幅调整 | +| 🟢 兼容 | ~90+ 条 SQL | — | 无需修改 | + +### 必须修改的文件清单 + +| 文件 | 修改项 | +|---|---| +| `alembic/versions/001_initial_schema.py` | `CREATE EXTENSION vector` → `datavec`; 删除 `pgcrypto`(或替换 UUID 方案); `CREATE POLICY` → `CREATE ROW LEVEL SECURITY POLICY`(3处); `EXECUTE FUNCTION` → `EXECUTE PROCEDURE`(1处) | +| `src/contexthub/services/skill_service.py` | `ON CONFLICT ... DO UPDATE ... RETURNING *` → `MERGE INTO` 或 `ON DUPLICATE KEY UPDATE` + 单独 SELECT | +| `src/contexthub/services/catalog_sync_service.py` | 4 处 `ON CONFLICT` → `MERGE INTO` 或 `ON DUPLICATE KEY UPDATE`; `RETURNING id, (xmax = 0) AS is_new` → 拆分为先查后改 | +| `src/contexthub/services/propagation_engine.py` | `make_interval(secs => ...)` → `(... 
\|\| ' seconds')::interval` | +| `src/contexthub/api/routers/datalake.py` | 若 `array_position` / `jsonb_agg` 不可用则需改写 | +| `src/contexthub/db/pool.py` / `repository.py` | `asyncpg` → `async-gaussdb` 驱动替换 | +| `scripts/demo_e2e.py` | `ON CONFLICT DO NOTHING` → `ON DUPLICATE KEY UPDATE NOTHING` | +| `pyproject.toml` / `requirements` | 依赖替换: `asyncpg` → `async-gaussdb`; `pgvector` python 包评估 | +| `docker-compose.yml` | `pgvector/pgvector:pg16` 镜像 → openGauss 7.0.0 镜像 | + +--- + +## 五、迁移建议 + +### 5.1 推荐使用 PG 兼容模式 + +创建 openGauss 数据库时建议使用 **PG 兼容模式**: +```sql +CREATE DATABASE contexthub DBCOMPATIBILITY = 'PG'; +``` +这将最大程度保留 PostgreSQL 语法兼容性(如 `LIMIT/OFFSET`、`||` 字符串拼接、空字符串处理等)。 + +### 5.2 迁移优先级 + +1. **P0 — 驱动层**: 将 `asyncpg` 替换为 `async-gaussdb`,这是最基础的连接层变更 +2. **P0 — 扩展替换**: `pgvector` → `datavec`,`pgcrypto` → 内置 UUID 或 `uuid-ossp` +3. **P0 — DDL 语法**: 修改 `CREATE POLICY` 和 `EXECUTE FUNCTION` 语法 +4. **P1 — UPSERT 改写**: 6 处 `ON CONFLICT` 改为 `MERGE INTO` 或 `ON DUPLICATE KEY UPDATE` +5. **P1 — 函数替换**: `make_interval` → interval 表达式 +6. **P2 — 验证**: `LISTEN/NOTIFY`、`array_position`、`jsonb_agg` 实测验证 +7. **P3 — 基础设施**: Docker 镜像、CI/CD 流水线调整 + +### 5.3 建议引入兼容层 + +考虑在 `ScopedRepo` 层(`db/repository.py`)引入 SQL 方言抽象层,使同一份业务代码可以同时支持 PostgreSQL 和 openGauss: + +```python +class SQLDialect: + def upsert(self, table, conflict_cols, update_cols, returning=None): + """根据数据库类型生成不同的 UPSERT 语法""" + ... +``` + +--- + +## 六、结论 + +将 ContextHub 从 PostgreSQL 16 迁移到 openGauss 7.0.0 是**可行的**,但需要处理 **7 类阻断级兼容性问题**。其中最大的工程量在于: + +1. **`ON CONFLICT` UPSERT 语法改写**(6 处,涉及 3 个核心服务文件,其中 2 处还组合了 `RETURNING`) +2. **扩展替换**(`pgvector` → `datavec`,`pgcrypto` → 内置方案) +3. 
**asyncpg 驱动替换**(→ `async-gaussdb`) + +大部分标准 SQL(SELECT/UPDATE/INSERT/DELETE、JOIN、递归 CTE、行锁、RLS 等)在 openGauss 7.0.0 PG 兼容模式下可以直接使用,整体兼容性较好。预计核心代码改动集中在约 **8 个源文件**中,业务逻辑本身无需变更。 diff --git a/opengauss/cleanup_demo_data.py b/opengauss/cleanup_demo_data.py new file mode 100644 index 0000000..047e3e9 --- /dev/null +++ b/opengauss/cleanup_demo_data.py @@ -0,0 +1,143 @@ +#!/usr/bin/env python3 +"""Clean data possibly inserted by demo_e2e_opengauss.py. + +Usage: + python cleanup_demo_data.py + +Environment: + DEMO_DB_DSN Optional asyncpg DSN. Defaults to local openGauss DSN used by demo. + DEMO_ACCOUNT Optional account id. Defaults to "acme". +""" + +from __future__ import annotations + +import asyncio +import os +import sys +from typing import Iterable +from uuid import UUID + +import asyncpg + + +DEFAULT_DSN = "postgresql://contexthub:ContextHub%40123@localhost:15432/contexthub" +DEFAULT_ACCOUNT = "acme" + +DEMO_SKILL_URI = "ctx://team/engineering/skills/sql-generator" +DEMO_MEMORY_CONTENT = ( + "The orders table uses user_id as FK to users. " + "Always JOIN on orders.user_id = users.id." 
+) + + +def _format_count(tag: str, count: str) -> str: + return f" - {tag}: {count}" + + +async def _collect_target_contexts(conn: asyncpg.Connection) -> list[asyncpg.Record]: + return await conn.fetch( + """ + SELECT id, uri + FROM contexts + WHERE account_id = current_setting('app.account_id') + AND ( + uri = $1 + OR ( + uri LIKE 'ctx://agent/query-agent/memories/mem-%' + AND l2_content = $2 + ) + OR ( + uri LIKE 'ctx://team/engineering/memories/shared_knowledge/mem-%' + AND l2_content = $2 + ) + OR uri LIKE 'ctx://datalake/mock/%' + ) + ORDER BY uri + """, + DEMO_SKILL_URI, + DEMO_MEMORY_CONTENT, + ) + + +async def _delete_by_context_ids(conn: asyncpg.Connection, context_ids: Iterable[UUID]) -> None: + ids = list(context_ids) + if not ids: + print(" - No matching contexts found, nothing to delete.") + return + + print(_format_count("target contexts", str(len(ids)))) + + statements: list[tuple[str, str]] = [ + ( + "query_templates", + "DELETE FROM query_templates WHERE context_id = ANY($1::uuid[])", + ), + ( + "table_relationships", + "DELETE FROM table_relationships WHERE table_id_a = ANY($1::uuid[]) OR table_id_b = ANY($1::uuid[])", + ), + ( + "lineage", + "DELETE FROM lineage WHERE upstream_id = ANY($1::uuid[]) OR downstream_id = ANY($1::uuid[])", + ), + ( + "table_metadata", + "DELETE FROM table_metadata WHERE context_id = ANY($1::uuid[])", + ), + ( + "skill_subscriptions", + "DELETE FROM skill_subscriptions WHERE skill_id = ANY($1::uuid[])", + ), + ( + "skill_versions", + "DELETE FROM skill_versions WHERE skill_id = ANY($1::uuid[])", + ), + ( + "dependencies", + "DELETE FROM dependencies WHERE dependent_id = ANY($1::uuid[]) OR dependency_id = ANY($1::uuid[])", + ), + ( + "change_events", + "DELETE FROM change_events WHERE context_id = ANY($1::uuid[])", + ), + ( + "contexts", + "DELETE FROM contexts WHERE id = ANY($1::uuid[])", + ), + ] + + for label, sql in statements: + result = await conn.execute(sql, ids) + print(_format_count(label, result)) + + +async 
def main() -> None: + dsn = os.environ.get("DEMO_DB_DSN", DEFAULT_DSN) + account = os.environ.get("DEMO_ACCOUNT", DEFAULT_ACCOUNT) + + print("Cleaning demo data for openGauss...") + print(f" - account: {account}") + print(f" - dsn: {dsn}") + + conn = await asyncpg.connect(dsn) + try: + async with conn.transaction(): + await conn.execute("SELECT set_config('app.account_id', $1, true)", account) + + targets = await _collect_target_contexts(conn) + for row in targets: + print(f" - match: {row['uri']}") + + await _delete_by_context_ids(conn, [row["id"] for row in targets]) + finally: + await conn.close() + + print("Cleanup complete.") + + +if __name__ == "__main__": + try: + asyncio.run(main()) + except Exception as exc: # pragma: no cover + print(f"Cleanup failed: {exc}", file=sys.stderr) + raise diff --git a/opengauss/connection_test.py b/opengauss/connection_test.py new file mode 100644 index 0000000..2f01515 --- /dev/null +++ b/opengauss/connection_test.py @@ -0,0 +1,75 @@ +import psycopg2 +from psycopg2 import OperationalError + +def test_opengauss_connection(): + conn = None + try: + # Connect using explicit parameters to safely handle the '@' in the password + print("Connecting to openGauss database...") + conn = psycopg2.connect( + host="localhost", # Replace with your actual host IP/domain + port="15432", + dbname="contexthub", + user="contexthub", + password="ContextHub@123" + ) + print("✅ Connection successful!") + + # Create a cursor to execute SQL commands + cur = conn.cursor() + + # --- WRITE TEST --- + print("\nTesting Write Operation...") + + # 1. Create a temporary test table + cur.execute(""" + CREATE TABLE IF NOT EXISTS python_test_table ( + id SERIAL PRIMARY KEY, + message VARCHAR(255) + ) + """) + + # 2. 
Insert data and return the ID + insert_query = "INSERT INTO python_test_table (message) VALUES (%s) RETURNING id;" + cur.execute(insert_query, ("Hello from Python to openGauss!",)) + + # Fetch the ID of the newly inserted row + inserted_id = cur.fetchone()[0] + + # Commit the transaction to save the write + conn.commit() + print(f"✅ Write successful! Inserted row with ID: {inserted_id}") + + # --- READ TEST --- + print("\nTesting Read Operation...") + + # Select the data we just inserted + select_query = "SELECT id, message FROM python_test_table WHERE id = %s;" + cur.execute(select_query, (inserted_id,)) + + record = cur.fetchone() + print(f"✅ Read successful! Fetched data: {record}") + + # --- CLEANUP --- + print("\nCleaning up test data...") + cur.execute("DROP TABLE python_test_table;") + conn.commit() + print("✅ Table dropped successfully!") + + except OperationalError as e: + print(f"❌ Connection failed: {e}") + except Exception as e: + print(f"❌ An error occurred during database operations: {e}") + # Rollback the transaction on error + if conn is not None: + conn.rollback() + finally: + # Always close the cursor and connection + if conn is not None: + cur.close() + conn.close() + print("\nDatabase connection closed.") + +if __name__ == "__main__": + test_opengauss_connection() + diff --git a/opengauss/demo_e2e_opengauss.py b/opengauss/demo_e2e_opengauss.py new file mode 100644 index 0000000..9bef4b2 --- /dev/null +++ b/opengauss/demo_e2e_opengauss.py @@ -0,0 +1,193 @@ +"""ContextHub MVP End-to-End Demo + +Prerequisites: + docker-compose up -d + alembic upgrade head + uvicorn contexthub.main:app --port 8000 + +Usage: + python demo_e2e.py +""" + +from __future__ import annotations + +import asyncio +import sys +import time + +import httpx + + +BASE_URL = "http://localhost:8000" +API_KEY = "changeme" +ACCOUNT = "acme" + + +def _headers(agent_id: str) -> dict: + return { + "X-API-Key": API_KEY, + "X-Account-Id": ACCOUNT, + "X-Agent-Id": agent_id, + } + + +def 
step(n: int, desc: str): + print(f"\n{'='*60}") + print(f" Step {n}: {desc}") + print(f"{'='*60}") + + +async def _ensure_team_membership(): + """Ensure query-agent is a direct member of engineering (seed data only has engineering/backend).""" + import asyncpg + conn = await asyncpg.connect("postgresql://contexthub:ContextHub%40123@localhost:15432/contexthub") + try: + await conn.execute("SET app.account_id = 'acme'") + await conn.execute(""" + INSERT INTO team_memberships (agent_id, team_id, role, access, is_primary) + VALUES ('query-agent', '00000000-0000-0000-0000-000000000002', 'member', 'read_write', FALSE) + ON DUPLICATE KEY UPDATE NOTHING + """) + finally: + await conn.close() + + +async def main(): + await _ensure_team_membership() + + async with httpx.AsyncClient(base_url=BASE_URL, timeout=30) as http: + # Health check + r = await http.get("/health") + if r.status_code != 200: + print("Server not reachable. Start it first.") + sys.exit(1) + print("Server healthy.") + + qa = _headers("query-agent") + aa = _headers("analysis-agent") + + # ── Step 1: query-agent writes private memory ── + step(1, "query-agent writes private memory") + r = await http.post("/api/v1/memories", json={ + "content": "The orders table uses user_id as FK to users. 
Always JOIN on orders.user_id = users.id.", + "tags": ["schema-note", "orders"], + }, headers=qa) + assert r.status_code == 201, f"Expected 201, got {r.status_code}: {r.text}" + mem = r.json() + mem_uri = mem["uri"] + print(f" Created: {mem_uri}") + + # ── Step 2: query-agent creates and publishes Skill v1 ── + step(2, "query-agent publishes sql-generator Skill v1") + # Create skill context first + r = await http.post("/api/v1/contexts", json={ + "uri": "ctx://team/engineering/skills/sql-generator", + "context_type": "skill", + "scope": "team", + "owner_space": "engineering", + "l2_content": "SELECT * FROM orders WHERE status = 'completed'", + }, headers=qa) + assert r.status_code == 201, f"Expected 201, got {r.status_code}: {r.text}" + skill_ctx = r.json() + print(f" Skill context: {skill_ctx['uri']}") + + r = await http.post("/api/v1/skills/versions", json={ + "skill_uri": "ctx://team/engineering/skills/sql-generator", + "content": "v1: Basic SQL generator for orders queries", + "changelog": "Initial release", + "is_breaking": False, + }, headers=qa) + assert r.status_code == 201, f"Expected 201, got {r.status_code}: {r.text}" + v1 = r.json() + print(f" Published v{v1['version']}") + + # ── Step 3: query-agent promotes memory to team ── + step(3, "query-agent promotes memory to team/engineering") + r = await http.post("/api/v1/memories/promote", json={ + "uri": mem_uri, + "target_team": "engineering", + }, headers=qa) + assert r.status_code == 201, f"Expected 201, got {r.status_code}: {r.text}" + promoted = r.json() + print(f" Promoted to: {promoted['uri']}") + + # ── Step 4: analysis-agent sees shared memory + subscribes to skill ── + step(4, "analysis-agent retrieves shared memory and subscribes to skill") + r = await http.get("/api/v1/memories", headers=aa) + assert r.status_code == 200 + memories = r.json() + shared = [m for m in memories if "shared_knowledge" in m["uri"]] + print(f" analysis-agent sees {len(shared)} shared memories") + assert len(shared) >= 
1, "Promoted memory not visible!" + + r = await http.post("/api/v1/skills/subscribe", json={ + "skill_uri": "ctx://team/engineering/skills/sql-generator", + "pinned_version": 1, + }, headers=aa) + assert r.status_code == 200, f"Subscribe failed: {r.text}" + print(f" Subscribed to sql-generator, pinned v1") + + # ── Step 5: query-agent publishes breaking Skill v2 ── + step(5, "query-agent publishes breaking Skill v2") + r = await http.post("/api/v1/skills/versions", json={ + "skill_uri": "ctx://team/engineering/skills/sql-generator", + "content": "v2: Rewritten SQL generator with CTE support", + "changelog": "Breaking: new output format", + "is_breaking": True, + }, headers=qa) + assert r.status_code == 201 + v2 = r.json() + print(f" Published v{v2['version']} (breaking)") + + # ── Step 6: Wait for propagation + verify ── + step(6, "Verify propagation: stale/advisory") + await asyncio.sleep(2) # Let propagation engine process + + # analysis-agent reads skill — should get v1 with advisory about v2 + r = await http.post("/api/v1/tools/read", json={ + "uri": "ctx://team/engineering/skills/sql-generator", + }, headers=aa) + assert r.status_code == 200 + read_result = r.json() + print(f" analysis-agent reads skill: version={read_result.get('version')}") + if read_result.get("advisory"): + print(f" Advisory: {read_result['advisory']}") + + # ── Step 7 (optional): Catalog sync + sql-context ── + step(7, "[Vertical] Catalog sync + sql-context query") + r = await http.post("/api/v1/datalake/sync", json={"catalog": "mock"}, headers=qa) + if r.status_code == 200: + sync = r.json() + print(f" Synced {sync['tables_synced']} tables ({sync['tables_created']} new)") + + r = await http.get("/api/v1/datalake/mock/prod", headers=qa) + if r.status_code == 200: + tables = r.json().get("tables", []) + print(f" Listed {len(tables)} tables in mock/prod") + + r = await http.post("/api/v1/search/sql-context", json={ + "query": "How many orders per user?", + "catalog": "mock", + "top_k": 3, + 
}, headers=qa) + if r.status_code == 200: + sql_ctx = r.json() + print(f" sql-context returned {sql_ctx['total_tables_found']} relevant tables") + else: + print(f" sql-context: {r.status_code} (search may need embeddings)") + else: + print(f" Sync returned {r.status_code}: {r.text}") + + # ── Summary ── + print(f"\n{'='*60}") + print(" MVP Demo Complete") + print(f"{'='*60}") + print(f" - Private memory created: {mem_uri}") + print(f" - Promoted to team: {promoted['uri']}") + print(f" - Skill v1 + v2 published, breaking propagation triggered") + print(f" - Cross-agent visibility verified") + print(f" - Catalog sync + sql-context demonstrated") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/opengauss/readme.md b/opengauss/readme.md new file mode 100644 index 0000000..c486d14 --- /dev/null +++ b/opengauss/readme.md @@ -0,0 +1,9 @@ +`connection_test.py` tests the connection and basic read/write against an openGauss database. + +Unsupported features: +* `verify_LISTEN_UNLISTEN_NOTIFY.py`: openGauss does not support LISTEN/UNLISTEN/NOTIFY. +* `vector_asyncpg.py`: asyncpg cannot decode the openGauss vector data type and fails with `message: unhandled standard data type 'vector' (OID 8305)` + +Demo: +* `demo_e2e_opengauss.py`: runs the demo against the openGauss backend. +* `cleanup_demo_data.py`: cleans up the data inserted by the demo script. diff --git a/opengauss/vector_async_gaussdb.py b/opengauss/vector_async_gaussdb.py new file mode 100644 index 0000000..afaf426 --- /dev/null +++ b/opengauss/vector_async_gaussdb.py @@ -0,0 +1,96 @@ +#!/usr/bin/env python3 +"""Minimal repro for asyncpg + openGauss vector result decoding. + +What this script demonstrates: +1. Queries that do NOT return the vector column succeed. +2. Queries that DO return the vector column fail in asyncpg on openGauss. + +Default DSN matches the repo's openGauss setup guide. 
Override with: + VECTOR_REPRO_DSN=postgresql://user:pass@host:port/db python scripts/repro_asyncpg_opengauss_vector.py +""" + +from __future__ import annotations + +import asyncio +import os +import sys + +import async_gaussdb as asyncpg + + +#DEFAULT_DSN = "postgresql://contexthub:ContextHub%40123@localhost:15432/contexthub" +DEFAULT_DSN = "gaussdb://contexthub:ContextHub%40123@localhost:15432/contexthub" +TEMP_TABLE = "tmp_vector_repro" + + +async def _run_query(conn: asyncpg.Connection, label: str, sql: str, method: str) -> None: + print(f"\n[{label}]") + print(f"SQL: {sql}") + try: + result = await getattr(conn, method)(sql) + print("status: OK") + print(f"result: {result}") + except Exception as exc: + print("status: ERROR") + print(f"type: {type(exc).__name__}") + print(f"message: {exc}") + + +async def main() -> int: + dsn = os.environ.get("VECTOR_REPRO_DSN", DEFAULT_DSN) + + print("Connecting with asyncpg...") + conn = await asyncpg.connect(dsn) + try: + await conn.execute(f"DROP TABLE IF EXISTS {TEMP_TABLE}") + await conn.execute( + f""" + CREATE TABLE {TEMP_TABLE} ( + id INT PRIMARY KEY, + note TEXT, + emb vector(3) + ) + """ + ) + await conn.execute( + f""" + INSERT INTO {TEMP_TABLE} (id, note, emb) + VALUES (1, 'vector row', '[1,2,3]') + """ + ) + + print("Inserted one row with a vector value.") + + await _run_query( + conn, + "Control case: query without vector column", + f"SELECT id, note FROM {TEMP_TABLE}", + "fetch", + ) + await _run_query( + conn, + "Failure case: query only the vector column", + f"SELECT emb FROM {TEMP_TABLE}", + "fetch", + ) + await _run_query( + conn, + "Failure case: SELECT * includes the vector column", + f"SELECT * FROM {TEMP_TABLE}", + "fetch", + ) + await _run_query( + conn, + "Failure case: fetchrow also fails when result contains vector", + f"SELECT * FROM {TEMP_TABLE}", + "fetchrow", + ) + finally: + await conn.execute(f"DROP TABLE IF EXISTS {TEMP_TABLE}") + await conn.close() + + return 0 + + +if __name__ == "__main__": 
+ raise SystemExit(asyncio.run(main())) diff --git a/opengauss/vector_asyncpg.py b/opengauss/vector_asyncpg.py new file mode 100644 index 0000000..3707263 --- /dev/null +++ b/opengauss/vector_asyncpg.py @@ -0,0 +1,95 @@ +#!/usr/bin/env python3 +"""Minimal repro for asyncpg + openGauss vector result decoding. + +What this script demonstrates: +1. Queries that do NOT return the vector column succeed. +2. Queries that DO return the vector column fail in asyncpg on openGauss. + +Default DSN matches the repo's openGauss setup guide. Override with: + VECTOR_REPRO_DSN=postgresql://user:pass@host:port/db python scripts/repro_asyncpg_opengauss_vector.py +""" + +from __future__ import annotations + +import asyncio +import os +import sys + +import asyncpg + + +DEFAULT_DSN = "postgresql://contexthub:ContextHub%40123@localhost:15432/contexthub" +TEMP_TABLE = "tmp_vector_repro" + + +async def _run_query(conn: asyncpg.Connection, label: str, sql: str, method: str) -> None: + print(f"\n[{label}]") + print(f"SQL: {sql}") + try: + result = await getattr(conn, method)(sql) + print("status: OK") + print(f"result: {result}") + except Exception as exc: + print("status: ERROR") + print(f"type: {type(exc).__name__}") + print(f"message: {exc}") + + +async def main() -> int: + dsn = os.environ.get("VECTOR_REPRO_DSN", DEFAULT_DSN) + + print("Connecting with asyncpg...") + conn = await asyncpg.connect(dsn) + try: + await conn.execute(f"DROP TABLE IF EXISTS {TEMP_TABLE}") + await conn.execute( + f""" + CREATE TABLE {TEMP_TABLE} ( + id INT PRIMARY KEY, + note TEXT, + emb vector(3) + ) + """ + ) + await conn.execute( + f""" + INSERT INTO {TEMP_TABLE} (id, note, emb) + VALUES (1, 'vector row', '[1,2,3]') + """ + ) + + print("Inserted one row with a vector value.") + + await _run_query( + conn, + "Control case: query without vector column", + f"SELECT id, note FROM {TEMP_TABLE}", + "fetch", + ) + await _run_query( + conn, + "Failure case: query only the vector column", + f"SELECT emb FROM 
{TEMP_TABLE}", + "fetch", + ) + await _run_query( + conn, + "Failure case: SELECT * includes the vector column", + f"SELECT * FROM {TEMP_TABLE}", + "fetch", + ) + await _run_query( + conn, + "Failure case: fetchrow also fails when result contains vector", + f"SELECT * FROM {TEMP_TABLE}", + "fetchrow", + ) + finally: + await conn.execute(f"DROP TABLE IF EXISTS {TEMP_TABLE}") + await conn.close() + + return 0 + + +if __name__ == "__main__": + raise SystemExit(asyncio.run(main())) diff --git a/opengauss/vector_psycopg2.py b/opengauss/vector_psycopg2.py new file mode 100644 index 0000000..150cb61 --- /dev/null +++ b/opengauss/vector_psycopg2.py @@ -0,0 +1,75 @@ +""" +test read/write to opengauss database with vector datatype +""" +import psycopg2 +from psycopg2 import OperationalError + +DEFAULT_DSN = "postgresql://contexthub:ContextHub%40123@localhost:15432/contexthub" + +def test_opengauss_connection(): + conn = None + try: + # Connect using explicit parameters to safely handle the '@' in the password + print("Connecting to openGauss database...") + conn = psycopg2.connect(DEFAULT_DSN) + print("✅ Connection successful!") + + # Create a cursor to execute SQL commands + cur = conn.cursor() + + # --- WRITE TEST --- + print("\nTesting Write Operation...") + + # 1. Create a temporary test table + cur.execute(""" + CREATE TABLE IF NOT EXISTS python_test_table ( + id SERIAL PRIMARY KEY, + message VARCHAR(255), + emb vector(3) + ) + """) + + # 2. Insert data and return the ID + insert_query = "INSERT INTO python_test_table (message, emb) VALUES ('hello', '[1,2,3]') RETURNING id;" + cur.execute(insert_query) + + # Fetch the ID of the newly inserted row + inserted_id = cur.fetchone()[0] + + # Commit the transaction to save the write + conn.commit() + print(f"✅ Write successful! 
Inserted row with ID: {inserted_id}") + + # --- READ TEST --- + print("\nTesting Read Operation...") + + # Select the data we just inserted + select_query = "SELECT * FROM python_test_table WHERE id = %s;" + cur.execute(select_query, (inserted_id,)) + + record = cur.fetchone() + print(f"✅ Read successful! Fetched data: {record}") + + # --- CLEANUP --- + print("\nCleaning up test data...") + cur.execute("DROP TABLE python_test_table;") + conn.commit() + print("✅ Table dropped successfully!") + + except OperationalError as e: + print(f"❌ Connection failed: {e}") + except Exception as e: + print(f"❌ An error occurred during database operations: {e}") + # Rollback the transaction on error + if conn is not None: + conn.rollback() + finally: + # Always close the cursor and connection + if conn is not None: + cur.close() + conn.close() + print("\nDatabase connection closed.") + +if __name__ == "__main__": + test_opengauss_connection() + diff --git a/opengauss/verify_LISTEN_UNLISTEN_NOTIFY.py b/opengauss/verify_LISTEN_UNLISTEN_NOTIFY.py new file mode 100644 index 0000000..d223706 --- /dev/null +++ b/opengauss/verify_LISTEN_UNLISTEN_NOTIFY.py @@ -0,0 +1,94 @@ +import asyncio +import time +import asyncpg + +DSN = "postgresql://contexthub:ContextHub%40123@localhost:15432/contexthub" +CHANNEL = f"listen_notify_test_{int(time.time())}" +PAYLOAD = "hello-from-notify" + + +def print_exc(prefix: str, exc: Exception) -> None: + print(f"{prefix}: FAIL") + print(f" type = {type(exc).__name__}") + print(f" sqlstate = {getattr(exc, 'sqlstate', None)}") + print(f" message = {exc}") + + +async def main(): + listener_conn = None + sender_conn = None + got_event = asyncio.Event() + received = [] + + def on_notify(connection, pid, channel, payload): + received.append( + { + "pid": pid, + "channel": channel, + "payload": payload, + } + ) + got_event.set() + + try: + print(f"Connecting to {DSN}") + listener_conn = await asyncpg.connect(DSN) + sender_conn = await asyncpg.connect(DSN) + 
print("CONNECT: OK\n") + + # 1) LISTEN + print(f"[1] Testing LISTEN on channel: {CHANNEL}") + listen_ok = False + try: + await listener_conn.add_listener(CHANNEL, on_notify) + listen_ok = True + print("LISTEN: OK") + except Exception as e: + print_exc("LISTEN", e) + print() + + # 2) NOTIFY + print(f"[2] Testing NOTIFY on channel: {CHANNEL}") + notify_ok = False + try: + sql = f"NOTIFY {CHANNEL}, '{PAYLOAD}'" + result = await sender_conn.execute(sql) + notify_ok = True + print(f"NOTIFY: OK ({result})") + except Exception as e: + print_exc("NOTIFY", e) + print() + + # 3) Delivery + print("[3] Testing notification delivery") + if listen_ok and notify_ok: + try: + await asyncio.wait_for(got_event.wait(), timeout=2.0) + print("DELIVERY: OK") + print(f" received = {received[0]}") + except asyncio.TimeoutError: + print("DELIVERY: FAIL") + print(" message = LISTEN and NOTIFY executed, but no notification was received within 2s") + else: + print("DELIVERY: SKIPPED") + print(" reason = LISTEN or NOTIFY already failed") + print() + + # 4) UNLISTEN + print("[4] Testing UNLISTEN *") + try: + result = await listener_conn.execute("UNLISTEN *") + print(f"UNLISTEN: OK ({result})") + except Exception as e: + print_exc("UNLISTEN", e) + print() + + finally: + if sender_conn is not None: + await sender_conn.close() + if listener_conn is not None: + await listener_conn.close() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/pyproject.toml b/pyproject.toml index a4c3259..6e82023 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,7 +10,7 @@ dependencies = [ "fastapi>=0.115", "uvicorn[standard]>=0.34", "asyncpg>=0.30", - "pgvector>=0.3", + "psycopg[binary]>=3.2", "alembic>=1.14", "pydantic>=2.10", "pydantic-settings>=2.7", @@ -18,6 +18,9 @@ dependencies = [ ] [project.optional-dependencies] +postgres = [ + "pgvector>=0.3", +] dev = [ "httpx>=0.28", "pytest>=8.0", diff --git a/src/contexthub/config.py b/src/contexthub/config.py index 8ea71d4..865607c 100644 --- 
a/src/contexthub/config.py +++ b/src/contexthub/config.py @@ -16,6 +16,7 @@ def _normalize_postgres_url(url: str) -> str: class Settings(BaseSettings): database_url: str = "postgresql://contexthub:contexthub@localhost:5432/contexthub" + db_backend: str = "postgres" # "postgres" or "opengauss" api_key: str = "changeme" embedding_model: str = "text-embedding-3-small" propagation_enabled: bool = True @@ -33,6 +34,10 @@ class Settings(BaseSettings): env_file_encoding="utf-8", ) + @property + def is_opengauss(self) -> bool: + return self.db_backend.lower() == "opengauss" + @property def asyncpg_database_url(self) -> str: return _normalize_postgres_url(self.database_url) diff --git a/src/contexthub/db/pool.py b/src/contexthub/db/pool.py index 5cf2123..528739b 100644 --- a/src/contexthub/db/pool.py +++ b/src/contexthub/db/pool.py @@ -3,9 +3,19 @@ from contexthub.config import Settings +class _OpenGaussConnection(asyncpg.Connection): + """Avoid UNLISTEN in pool reset queries for openGauss compatibility.""" + + def get_reset_query(self): + return "SELECT pg_advisory_unlock_all(); CLOSE ALL; RESET ALL;" + + async def create_pool(settings: Settings) -> asyncpg.Pool: - return await asyncpg.create_pool( - dsn=settings.asyncpg_database_url, - min_size=2, - max_size=10, - ) + kwargs = { + "dsn": settings.asyncpg_database_url, + "min_size": 2, + "max_size": 10, + } + if settings.is_opengauss: + kwargs["connection_class"] = _OpenGaussConnection + return await asyncpg.create_pool(**kwargs) diff --git a/src/contexthub/db/repository.py b/src/contexthub/db/repository.py index 8562347..e659e63 100644 --- a/src/contexthub/db/repository.py +++ b/src/contexthub/db/repository.py @@ -1,13 +1,34 @@ from contextlib import asynccontextmanager from collections.abc import AsyncIterator from typing import Any +import re import asyncpg +import psycopg +from psycopg.rows import dict_row +from contexthub.config import Settings -class ScopedRepo: - """Request-scoped 数据库执行器。所有 SQL 都必须通过它执行。""" 
+_DOLLAR_PARAM_PATTERN = re.compile(r"\$([1-9][0-9]*)") + + +def _rewrite_dollar_params(sql: str, args: tuple[Any, ...]) -> tuple[str, tuple[Any, ...]]: + """Convert $n placeholders to %s placeholders for psycopg.""" + indexes: list[int] = [] + + def _replace(match: re.Match[str]) -> str: + idx = int(match.group(1)) - 1 + indexes.append(idx) + return "%s" + + rewritten = _DOLLAR_PARAM_PATTERN.sub(_replace, sql) + if not indexes: + return sql, args + return rewritten, tuple(args[i] for i in indexes) + + +class _AsyncpgExecutor: def __init__(self, conn: asyncpg.Connection): self._conn = conn @@ -24,12 +45,98 @@ async def execute(self, sql: str, *args: Any) -> str: return await self._conn.execute(sql, *args) +class _OpenGaussExecutor: + def __init__(self, conn: psycopg.AsyncConnection[Any]): + self._conn = conn + + async def fetch(self, sql: str, *args: Any) -> list[dict[str, Any]]: + adapted_sql, adapted_args = _rewrite_dollar_params(sql, args) + async with self._conn.cursor(row_factory=dict_row) as cur: + if adapted_args: + await cur.execute(adapted_sql, adapted_args) + else: + await cur.execute(adapted_sql) + return await cur.fetchall() + + async def fetchrow(self, sql: str, *args: Any) -> dict[str, Any] | None: + adapted_sql, adapted_args = _rewrite_dollar_params(sql, args) + async with self._conn.cursor(row_factory=dict_row) as cur: + if adapted_args: + await cur.execute(adapted_sql, adapted_args) + else: + await cur.execute(adapted_sql) + row = await cur.fetchone() + return row + + async def fetchval(self, sql: str, *args: Any) -> Any: + adapted_sql, adapted_args = _rewrite_dollar_params(sql, args) + async with self._conn.cursor() as cur: + if adapted_args: + await cur.execute(adapted_sql, adapted_args) + else: + await cur.execute(adapted_sql) + row = await cur.fetchone() + return row[0] if row is not None else None + + async def execute(self, sql: str, *args: Any) -> str: + adapted_sql, adapted_args = _rewrite_dollar_params(sql, args) + async with 
self._conn.cursor() as cur: + if adapted_args: + await cur.execute(adapted_sql, adapted_args) + else: + await cur.execute(adapted_sql) + return cur.statusmessage or "" + + +class ScopedRepo: + """Request-scoped 数据库执行器。所有 SQL 都必须通过它执行。""" + + def __init__(self, conn: _AsyncpgExecutor | _OpenGaussExecutor): + self._conn = conn + + async def fetch(self, sql: str, *args: Any) -> list[Any]: + return await self._conn.fetch(sql, *args) + + async def fetchrow(self, sql: str, *args: Any) -> Any: + return await self._conn.fetchrow(sql, *args) + + async def fetchval(self, sql: str, *args: Any) -> Any: + return await self._conn.fetchval(sql, *args) + + async def execute(self, sql: str, *args: Any) -> str: + return await self._conn.execute(sql, *args) + + class PgRepository: - def __init__(self, pool: asyncpg.Pool): + def __init__(self, pool: asyncpg.Pool | None, settings: Settings | None = None): self._pool = pool + self._settings = settings or Settings() @asynccontextmanager async def session(self, account_id: str) -> AsyncIterator[ScopedRepo]: + if self._settings.is_opengauss: + conn = await psycopg.AsyncConnection.connect(self._settings.asyncpg_database_url) + try: + await conn.execute("BEGIN") + scoped_repo = ScopedRepo(_OpenGaussExecutor(conn)) + await scoped_repo.execute( + "SELECT set_config('app.account_id', $1, true)", + account_id, + ) + try: + yield scoped_repo + except Exception: + await conn.rollback() + raise + else: + await conn.commit() + finally: + await conn.close() + return + + if self._pool is None: + raise RuntimeError("Postgres backend requires an asyncpg pool") + async with self._pool.acquire() as conn: async with conn.transaction(): # asyncpg does not support bind parameters in SET statements. 
@@ -37,4 +144,4 @@ async def session(self, account_id: str) -> AsyncIterator[ScopedRepo]: "SELECT set_config('app.account_id', $1, true)", account_id, ) - yield ScopedRepo(conn) + yield ScopedRepo(_AsyncpgExecutor(conn)) diff --git a/src/contexthub/services/acl_service.py b/src/contexthub/services/acl_service.py index 7c85e1b..c528042 100644 --- a/src/contexthub/services/acl_service.py +++ b/src/contexthub/services/acl_service.py @@ -1,10 +1,11 @@ """MVP ACL: default visibility and write permission checks.""" +import os + from contexthub.db.repository import ScopedRepo from contexthub.models.context import Scope from contexthub.models.request import RequestContext - class ACLService: """MVP 默认可见性 / 默认写权限。""" diff --git a/src/contexthub/services/propagation_engine.py b/src/contexthub/services/propagation_engine.py index 3ae560b..66eaf72 100644 --- a/src/contexthub/services/propagation_engine.py +++ b/src/contexthub/services/propagation_engine.py @@ -61,7 +61,15 @@ async def start(self) -> None: try: # 1. 建立独立 LISTEN 连接 listen_conn = await asyncpg.connect(self._dsn) - await listen_conn.add_listener("context_changed", self._on_notify) + try: + await listen_conn.add_listener("context_changed", self._on_notify) + except asyncpg.exceptions.FeatureNotSupportedError: + logger.warning( + "LISTEN/NOTIFY is not supported by the current database; " + "falling back to periodic propagation polling only." + ) + await listen_conn.close() + listen_conn = None # 2. 
启动单条 drain loop + 周期唤醒 task drain_task = asyncio.create_task(self._drain_loop()) diff --git a/src/contexthub/services/skill_service.py b/src/contexthub/services/skill_service.py index 039c67f..64132a4 100644 --- a/src/contexthub/services/skill_service.py +++ b/src/contexthub/services/skill_service.py @@ -174,17 +174,34 @@ async def subscribe( raise BadRequestError(f"Version {pinned_version} does not exist") if ver["status"] != "published": raise BadRequestError(f"Version {pinned_version} is not published") - - row = await db.fetchrow( + #row = await db.fetchrow( + # """ + # INSERT INTO skill_subscriptions (agent_id, skill_id, pinned_version, account_id) + # VALUES ($1, $2, $3, current_setting('app.account_id')) + # ON CONFLICT (agent_id, skill_id) + # DO UPDATE SET pinned_version = EXCLUDED.pinned_version + # RETURNING * + # """, + # ctx.agent_id, skill_id, pinned_version, + #) + _ = await db.execute( """ INSERT INTO skill_subscriptions (agent_id, skill_id, pinned_version, account_id) VALUES ($1, $2, $3, current_setting('app.account_id')) - ON CONFLICT (agent_id, skill_id) - DO UPDATE SET pinned_version = EXCLUDED.pinned_version - RETURNING * + ON DUPLICATE KEY UPDATE pinned_version = EXCLUDED.pinned_version """, ctx.agent_id, skill_id, pinned_version, ) + row = await db.fetchrow( + """ + SELECT * FROM skill_subscriptions + WHERE agent_id = $1 AND skill_id = $2; + """, + ctx.agent_id, skill_id, + ) + # FIXME: ON DUPLICATE KEY UPDATE is incompatible with ROW level security policy in opengauss + # psycopg.errors.FeatureNotSupported: Row level security policy is not supported by INSERT ON DUPLICATE KEY UPDATE. 
@pytest.mark.asyncio
async def test_start_degrades_gracefully_when_listen_not_supported():
    """Unsupported LISTEN should fall back to polling instead of aborting startup."""
    engine = _make_engine()

    # Connection double whose add_listener fails the way a backend without
    # LISTEN/NOTIFY support does.
    listen_conn = MagicMock()
    listen_conn.add_listener = AsyncMock(
        side_effect=asyncpg.exceptions.FeatureNotSupportedError(
            "LISTEN statement is not yet supported."
        )
    )
    listen_conn.close = AsyncMock()

    with patch(
        "contexthub.services.propagation_engine.asyncpg.connect",
        AsyncMock(return_value=listen_conn),
    ):
        await engine.start()

    # Always stop the engine, even when an assertion fails, so the drain and
    # ticker tasks started by start() do not leak into later tests.
    try:
        listen_conn.close.assert_awaited_once()
        assert engine._listen_conn is None
        assert engine._drain_task is not None
        assert engine._ticker_task is not None
        assert engine._running is True
    finally:
        await engine.stop()