diff --git a/.env.example b/.env.example
index e3bc577..8e65d8b 100644
--- a/.env.example
+++ b/.env.example
@@ -1,3 +1,8 @@
DATABASE_URL=postgresql://contexthub:contexthub@localhost:5432/contexthub
+DB_BACKEND=postgres # "postgres" or "opengauss"
API_KEY=changeme
EMBEDDING_MODEL=text-embedding-3-small
+
+# Example openGauss config:
+# DATABASE_URL=postgresql://contexthub:ContextHub@123@localhost:15432/contexthub
+# DB_BACKEND=opengauss
diff --git a/README.md b/README.md
index ce18143..137a8d1 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,40 @@
+## 为ContextHub新增openGauss后端
+
+### 一、openGauss server配置
+
+* 参考`docs/setup/opengauss-setup-guide-zh.md`新增server后端
+* CREATE DATABASE时需要设定DBCOMPATIBILITY = 'PG'模式, 避免空字符串被interpret成NULL。
+
+### 二、Extension替换
+
+* 原ContextHub项目依赖pgvector+pgcrypto两个插件,这两个库在opengauss都不支持
+* 解决方案:opengauss 7.0.0原生支持DataVec,可以替换pgvector;opengauss不支持pgcrypto,替代方案uuid-ossp在官方镜像中也缺失无法安装,最终手动自定义实现`gen_random_uuid`函数,规避pgcrypto extension
+
+### 三、Python Driver兼容
+
+* 原ContextHub使用asyncpg库与db后端交互,但是asyncpg不支持opengauss的vector数据格式
+ * 报错信息为`message: unhandled standard data type 'vector' (OID 8305)`, 复现脚本为`opengauss/vector_asyncpg.py`
+ * gaussdb有一个自己维护的`async_gaussdb`库,但是一样不支持vector格式, 验证脚本为`opengauss/vector_async_gaussdb.py`
+* 解决方案:新增db兼容层,PG后端仍然使用asyncpg,OpenGauss后端切换到psycopg3连接
+ * psycopg3 的位置参数语法与asyncpg完全不同,一个是%s一个是$n,用正则匹配转换
+ * 兼容层处理语法转换,对外封装暴露统一的fetch/fetchrow/fetchall/execute接口
+ * 相关实现在`src/contexthub/db/repository.py`
+
+### 四、SQL Dialect转写
+
+* 原ContextHub使用postgres方言的SQL,多种语言特性与opengauss不兼容
+* 例如,openGauss不支持PG的INSERT ON CONFLICT (需要重写为ON DUPLICATE KEY UPDATE), 且不能与RETURNING语句, ROW POLICY同时使用
+* 全项目约20+条需要重写, 详细情况可见于报告`opengauss-compatibility-report.md`
+
+### 整体完成度
+
+* 步骤一、二、三进度100%,目前demo `opengauss/demo_e2e_opengauss.py` 可以成功执行前3个steps
+* 步骤四进度50%, 正在处理demo的第四个step的SQL转写,具体可见`ContextHub/src/contexthub/services/skill_service.py`的FIXME
+
+
+---
+

diff --git a/alembic/env.py b/alembic/env.py
index 01c87e1..bb913d4 100644
--- a/alembic/env.py
+++ b/alembic/env.py
@@ -7,6 +7,12 @@
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import async_engine_from_config
+import os
+if os.environ['DB_BACKEND'] == 'opengauss':
+ # Force SQLAlchemy to treat openGauss as PostgreSQL 9.2 to bypass the version string error
+ from sqlalchemy.dialects.postgresql.base import PGDialect
+ PGDialect._get_server_version_info = lambda *args, **kwargs: (9, 2, 0)
+
PROJECT_ROOT = Path(__file__).resolve().parents[1]
SRC_DIR = PROJECT_ROOT / "src"
if str(SRC_DIR) not in sys.path:
diff --git a/alembic/versions/001_initial_schema.py b/alembic/versions/001_initial_schema.py
index f73e50b..d71ff39 100644
--- a/alembic/versions/001_initial_schema.py
+++ b/alembic/versions/001_initial_schema.py
@@ -14,15 +14,35 @@
depends_on: Union[str, Sequence[str], None] = None
+def _is_opengauss() -> bool:
+ import os
+ return os.environ.get("DB_BACKEND", "postgres").lower() == "opengauss"
+
+
def upgrade() -> None:
+ opengauss = _is_opengauss()
+ uuid_default = "uuid_generate_opengauss()" if opengauss else "gen_random_uuid()"
+
# Extensions
- op.execute("CREATE EXTENSION IF NOT EXISTS vector")
- op.execute("CREATE EXTENSION IF NOT EXISTS pgcrypto")
+ if opengauss:
+ # openGauss 7.0+ has DataVec built-in; avoid pgcrypto and uuid-ossp with
+ # customized random uuid generation function
+ op.execute("""
+ CREATE OR REPLACE FUNCTION uuid_generate_opengauss()
+ RETURNS uuid
+ LANGUAGE sql
+ AS $$
+ SELECT md5(random()::text || clock_timestamp()::text)::uuid;
+ $$;
+ """)
+ else:
+ op.execute("CREATE EXTENSION IF NOT EXISTS vector")
+ op.execute("CREATE EXTENSION IF NOT EXISTS pgcrypto")
# --- contexts ---
- op.execute("""
+ op.execute(f"""
CREATE TABLE contexts (
- id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+ id UUID PRIMARY KEY DEFAULT {uuid_default},
uri TEXT NOT NULL,
context_type TEXT NOT NULL CHECK (context_type IN ('table_schema', 'skill', 'memory', 'resource')),
scope TEXT NOT NULL CHECK (scope IN ('datalake', 'team', 'agent', 'user')),
@@ -79,9 +99,9 @@ def upgrade() -> None:
op.execute("CREATE INDEX idx_deps_dependent ON dependencies (dependent_id)")
# --- change_events (no RLS) ---
- op.execute("""
+ op.execute(f"""
CREATE TABLE change_events (
- event_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+ event_id UUID PRIMARY KEY DEFAULT {uuid_default},
timestamp TIMESTAMPTZ DEFAULT NOW(),
context_id UUID NOT NULL REFERENCES contexts(id),
account_id TEXT NOT NULL,
@@ -126,9 +146,9 @@ def upgrade() -> None:
""")
# --- teams ---
- op.execute("""
+ op.execute(f"""
CREATE TABLE teams (
- id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+ id UUID PRIMARY KEY DEFAULT {uuid_default},
path TEXT NOT NULL,
parent_id UUID REFERENCES teams(id),
display_name TEXT,
@@ -247,7 +267,7 @@ def upgrade() -> None:
op.execute("CREATE INDEX idx_qt_context ON query_templates (context_id)")
# --- Seed data ---
- op.execute("""
+ op.execute(f"""
INSERT INTO teams (id, path, parent_id, display_name, account_id) VALUES
('00000000-0000-0000-0000-000000000001', '', NULL, '全组织', 'acme'),
('00000000-0000-0000-0000-000000000002', 'engineering', '00000000-0000-0000-0000-000000000001', '工程部', 'acme'),
diff --git a/docker-compose.opengauss.yml b/docker-compose.opengauss.yml
new file mode 100644
index 0000000..22db61f
--- /dev/null
+++ b/docker-compose.opengauss.yml
@@ -0,0 +1,13 @@
+services:
+ opengauss:
+ image: opengauss/opengauss-server:latest
+ privileged: true
+ ports:
+ - "15432:5432"
+ environment:
+ GS_PASSWORD: "Huawei@123"
+ volumes:
+ - ogdata:/var/lib/opengauss/data
+
+volumes:
+ ogdata:
diff --git a/docs/setup/opengauss-setup-guide-zh.md b/docs/setup/opengauss-setup-guide-zh.md
new file mode 100644
index 0000000..388682b
--- /dev/null
+++ b/docs/setup/opengauss-setup-guide-zh.md
@@ -0,0 +1,97 @@
+# openGauss 部署指南
+
+本文档介绍如何使用 openGauss 7.0+ 作为 ContextHub 的存储后端。
+
+## 前置条件
+
+- Docker 已安装
+- openGauss 7.0+ (内置 DataVec 向量能力)
+
+## 1. 启动 openGauss 容器
+
+```bash
+# 使用项目提供的 compose 文件
+docker compose -f docker-compose.opengauss.yml up -d
+
+# 或者手动启动
+docker pull opengauss/opengauss-server:latest
+docker run --name opengauss --privileged=true -d \
+ -e GS_PASSWORD=Huawei@123 \
+ -p 15432:5432 \
+ opengauss/opengauss-server:latest
+```
+
+## 2. 初始化数据库
+
+```bash
+docker exec -it opengauss bash
+su omm
+gsql -d postgres -p 5432
+```
+
+在 gsql 中执行:
+
+```sql
+CREATE USER contexthub WITH PASSWORD 'ContextHub@123' SYSADMIN;
+CREATE DATABASE contexthub OWNER contexthub DBCOMPATIBILITY = 'PG';
+```
+
+> **注意:**
+> - openGauss 7.0+ 内置 DataVec,无需创建 vector 扩展
+> - 使用自定义`uuid_generate_v4()` (`alembic/versions/001_initial_schema.py`),替代 PostgreSQL 的 `pgcrypto` + `gen_random_uuid()`
+> - openGauss 密码有强度约束,须包含大小写字母、数字和特殊字符
+> - 使用 `SYSADMIN` 而非 `SUPERUSER` 关键字
+> - 使用DBCOMPATIBILITY = 'PG'模式,保证空字符串/NULL值处理等与postgres统一。
+
+## 3. 配置 ContextHub
+
+编辑 `.env` 文件:
+
+```env
+DATABASE_URL=postgresql://contexthub:ContextHub%40123@
:15432/contexthub
+DB_BACKEND=opengauss
+```
+
+其中 `` 替换为 openGauss 服务器的实际 IP 地址。
+
+## 4. 运行数据库迁移
+
+```bash
+DB_BACKEND=opengauss alembic upgrade head
+```
+
+> 迁移脚本会根据 `DB_BACKEND` 环境变量自动选择:
+> - `opengauss`: 使用 `uuid_generate_v4()` 作为 UUID 默认值
+> - `postgres` (默认): 创建 `vector` + `pgcrypto` 扩展,使用 `gen_random_uuid()`
+
+## 5. 启动服务
+
+```bash
+DB_BACKEND=opengauss uvicorn contexthub.main:app --host 0.0.0.0 --port 8000
+```
+
+## 6. 安装 Python 依赖说明
+
+使用 openGauss 后端时,不需要安装 `pgvector` Python 包:
+
+```bash
+# openGauss 后端
+pip install .
+
+# PostgreSQL + pgvector 后端
+pip install ".[postgres]"
+```
+
+## 与 PostgreSQL 后端的差异
+
+| 特性 | PostgreSQL 16 | openGauss 7.0+ |
+|------|--------------|----------------|
+| 向量扩展 | pgvector (需安装) | DataVec (内置) |
+| UUID 函数 | `gen_random_uuid()` (pgcrypto) | `uuid_generate_v4()` (自定义) |
+| 向量类型 `vector(N)` | 兼容 | 兼容 |
+| 向量距离 `<=>` | 兼容 | 兼容 |
+| HNSW 索引 | 兼容 | 兼容 |
+| RLS | 兼容 | 兼容 |
+| `pg_notify`/`LISTEN` | 兼容 | 不兼容 |
+| asyncpg 驱动 | 兼容 | 部分兼容 |
+| 连接 URL 格式 | `postgresql://` | `postgresql://` |
diff --git a/opengauss-compatibility-report.md b/opengauss-compatibility-report.md
new file mode 100644
index 0000000..b0b1c1e
--- /dev/null
+++ b/opengauss-compatibility-report.md
@@ -0,0 +1,599 @@
+# ContextHub: PostgreSQL 16 → openGauss 7.0.0 SQL 兼容性分析报告
+
+> 分析时间: 2026-04-13
+>
+> 参考文档:
+> - https://github.com/opengauss-mirror
+> - https://docs.opengauss.org/zh/docs/7.0.0-RC1
+
+---
+
+## 一、项目概述
+
+ContextHub 是一个基于 **FastAPI + asyncpg + PostgreSQL 16** 的上下文管理 REST API。项目使用:
+
+- **asyncpg** 连接池执行原生参数化 SQL(无 ORM)
+- **Alembic** 管理 DDL 迁移(通过 `op.execute()` 执行原生 SQL)
+- **pgvector** 扩展做向量相似度检索
+- **pgcrypto** 扩展提供 `gen_random_uuid()`
+- **Row Level Security (RLS)** 实现多租户隔离
+- **LISTEN/NOTIFY** (`pg_notify`) 实现事件传播唤醒
+
+---
+
+## 二、SQL 语句全量清单
+
+以下按文件分类罗列仓库中所有实际执行的 SQL 语句。
+
+### 2.1 Alembic 迁移(DDL 层)
+
+#### `alembic/versions/001_initial_schema.py`
+
+| # | SQL 语句摘要 | 类型 |
+|---|---|---|
+| 1 | `CREATE EXTENSION IF NOT EXISTS vector` | DDL |
+| 2 | `CREATE EXTENSION IF NOT EXISTS pgcrypto` | DDL |
+| 3 | `CREATE TABLE contexts (... UUID PRIMARY KEY DEFAULT gen_random_uuid(), ... vector(1536), TEXT[], TIMESTAMPTZ, JSONB ...)` | DDL |
+| 4 | `ALTER TABLE contexts ENABLE ROW LEVEL SECURITY` | DDL |
+| 5 | `ALTER TABLE contexts FORCE ROW LEVEL SECURITY` | DDL |
+| 6 | `CREATE POLICY tenant_isolation ON contexts USING (account_id = current_setting('app.account_id'))` | DDL |
+| 7 | `CREATE INDEX idx_contexts_scope ON contexts (scope, context_type)` | DDL |
+| 8 | `CREATE INDEX idx_contexts_owner ON contexts (account_id, owner_space)` | DDL |
+| 9 | `CREATE INDEX idx_contexts_status ON contexts (status) WHERE status != 'deleted'` | DDL(部分索引) |
+| 10 | `CREATE INDEX idx_contexts_l0_embedding ON contexts USING hnsw (l0_embedding vector_cosine_ops) WITH (m=16, ef_construction=64)` | DDL(HNSW 向量索引) |
+| 11 | `CREATE TABLE dependencies (... SERIAL PRIMARY KEY ...)` | DDL |
+| 12 | `CREATE TABLE change_events (... UUID PRIMARY KEY DEFAULT gen_random_uuid(), JSONB, TIMESTAMPTZ ...)` | DDL |
+| 13 | `CREATE INDEX ... WHERE delivery_status IN ('pending', 'retry')` | DDL(部分索引) |
+| 14 | `CREATE INDEX ... WHERE delivery_status = 'processing'` | DDL(部分索引) |
+| 15 | `CREATE OR REPLACE FUNCTION notify_change_event() RETURNS trigger AS $$ ... pg_notify ... $$ LANGUAGE plpgsql` | DDL(触发器函数) |
+| 16 | `CREATE TRIGGER trg_change_events_notify AFTER INSERT ON change_events FOR EACH ROW EXECUTE FUNCTION notify_change_event()` | DDL(触发器) |
+| 17 | `CREATE TABLE teams (... UUID PRIMARY KEY DEFAULT gen_random_uuid() ...)` | DDL |
+| 18 | `CREATE POLICY tenant_isolation ON teams USING (...)` | DDL |
+| 19 | `CREATE TABLE team_memberships (... PRIMARY KEY (agent_id, team_id) ...)` | DDL |
+| 20 | `CREATE TABLE skill_versions (... PRIMARY KEY (skill_id, version) ...)` | DDL |
+| 21 | `CREATE TABLE skill_subscriptions (... SERIAL PRIMARY KEY ...)` | DDL |
+| 22 | `CREATE POLICY tenant_isolation ON skill_subscriptions USING (...)` | DDL |
+| 23 | `CREATE TABLE table_metadata (... UUID PRIMARY KEY REFERENCES ..., JSONB ...)` | DDL |
+| 24 | `CREATE TABLE lineage (... PRIMARY KEY (upstream_id, downstream_id) ...)` | DDL |
+| 25 | `CREATE TABLE table_relationships (... JSONB NOT NULL, FLOAT ...)` | DDL |
+| 26 | `CREATE TABLE query_templates (... SERIAL PRIMARY KEY ...)` | DDL |
+| 27 | `INSERT INTO teams VALUES (UUID literal, ...)` — 种子数据 | DML |
+| 28 | `INSERT INTO team_memberships VALUES (...)` — 种子数据 | DML |
+| 29 | `DROP TABLE IF EXISTS ... CASCADE` — 降级 | DDL |
+| 30 | `DROP FUNCTION IF EXISTS notify_change_event() CASCADE` — 降级 | DDL |
+
+#### `alembic/versions/002_force_row_level_security.py`
+
+| # | SQL 语句摘要 | 类型 |
+|---|---|---|
+| 31 | `ALTER TABLE contexts/teams/skill_subscriptions FORCE ROW LEVEL SECURITY` | DDL |
+| 32 | `ALTER TABLE ... NO FORCE ROW LEVEL SECURITY` — 降级 | DDL |
+
+### 2.2 数据库连接层
+
+#### `src/contexthub/db/repository.py`
+
+| # | SQL 语句 | 用途 |
+|---|---|---|
+| 33 | `SELECT set_config('app.account_id', $1, true)` | 设置事务级 GUC,实现 RLS 租户绑定 |
+
+### 2.3 应用服务层(DML)
+
+#### `src/contexthub/services/acl_service.py`
+
+| # | SQL 语句摘要 | 用途 |
+|---|---|---|
+| 34 | `SELECT scope, owner_space FROM contexts WHERE uri = $1 AND status != 'deleted'` | ACL 读/写检查 |
+| 35 | `WITH RECURSIVE visible_teams AS (... UNION ALL ...) SELECT DISTINCT path FROM visible_teams` | 递归 CTE 获取可见团队 |
+| 36 | `SELECT 1 FROM team_memberships tm JOIN teams t ON ... WHERE tm.agent_id = $1 AND t.path = $2 AND tm.access = 'read_write'` | 团队写权限检查 |
+
+#### `src/contexthub/store/context_store.py`
+
+| # | SQL 语句摘要 | 用途 |
+|---|---|---|
+| 37 | `SELECT {col} FROM contexts WHERE uri = $1 AND status != 'deleted'` | 读取上下文内容 |
+| 38 | `UPDATE contexts SET last_accessed_at = NOW() WHERE uri = $1` | 更新访问时间 |
+| 39 | `UPDATE contexts SET {col} = $1, ... version = version + 1 ... WHERE uri = $2 AND version = $3 ... RETURNING id, version` | 乐观锁写入 |
+| 40 | `SELECT 1 FROM contexts WHERE uri = $1 AND status != 'deleted'` | 存在性检查 |
+| 41 | `INSERT INTO change_events (context_id, account_id, change_type, actor) VALUES ($1, current_setting('app.account_id'), 'modified', $2)` | 变更事件记录 |
+| 42 | `SELECT uri, scope, owner_space, status FROM contexts WHERE uri LIKE $1 AND status != 'deleted'` | 目录列表 |
+| 43 | `SELECT id, uri, context_type, ... FROM contexts WHERE uri = $1 AND status != 'deleted'` | stat 查询 |
+
+#### `src/contexthub/services/context_service.py`
+
+| # | SQL 语句摘要 | 用途 |
+|---|---|---|
+| 44 | `INSERT INTO contexts (...) VALUES ($1,...,$9) RETURNING *` | 创建上下文 |
+| 45 | `INSERT INTO change_events ... VALUES ($1, current_setting('app.account_id'), 'created', $2)` | 变更事件 |
+| 46 | `UPDATE contexts SET {动态set子句} WHERE uri = $n AND version = $m AND status != 'deleted' RETURNING *` | 更新上下文 |
+| 47 | `UPDATE contexts SET status = 'deleted', deleted_at = NOW(), ... WHERE uri = $1 AND version = $2 ... RETURNING id` | 软删除 |
+| 48 | `SELECT id FROM contexts WHERE uri = $1 AND status != 'deleted'` | 依赖查询 |
+| 49 | `SELECT d.dep_type, d.pinned_version, c1.uri, c2.uri FROM dependencies d JOIN contexts c1 ... JOIN contexts c2 ... WHERE d.dependent_id = $1 OR d.dependency_id = $1` | 依赖图查询 |
+
+#### `src/contexthub/services/memory_service.py`
+
+| # | SQL 语句摘要 | 用途 |
+|---|---|---|
+| 50 | `INSERT INTO contexts (...) VALUES ($1, 'memory', 'agent', ..., current_setting('app.account_id'), ...) RETURNING *` | 添加记忆 |
+| 51 | `SELECT uri, l0_content, ... FROM contexts WHERE context_type = 'memory' AND scope IN ('agent','team') ... ORDER BY updated_at DESC` | 列出记忆 |
+| 52 | `SELECT * FROM contexts WHERE uri = $1 AND status != 'deleted'` | 读取源记忆 |
+| 53 | `INSERT INTO contexts (...) VALUES ($1, 'memory', 'team', ...) RETURNING *` | 促进记忆 |
+| 54 | `INSERT INTO dependencies (dependent_id, dependency_id, dep_type) VALUES ($1, $2, 'derived_from')` | 依赖关系 |
+| 55 | `INSERT INTO change_events (..., metadata) VALUES ($1, ..., $3)` | 变更事件(含元数据) |
+
+#### `src/contexthub/services/skill_service.py`
+
+| # | SQL 语句摘要 | 用途 |
+|---|---|---|
+| 56 | `SELECT id, context_type FROM contexts WHERE uri = $1 AND status != 'deleted'` | 技能检查 |
+| 57 | `SELECT id FROM contexts WHERE id = $1 FOR UPDATE` | 行锁防并发发布 |
+| 58 | `SELECT COALESCE(MAX(version), 0) FROM skill_versions WHERE skill_id = $1` | 获取最新版本号 |
+| 59 | `INSERT INTO skill_versions (...) VALUES ($1, $2, $3, $4, $5, 'published', $6, NOW())` | 插入技能版本 |
+| 60 | `UPDATE contexts SET l0_content=$1, l1_content=$2, l2_content=$3, version=$4 ... WHERE id=$5` | 更新技能头指针 |
+| 61 | `INSERT INTO change_events (..., new_version, metadata) VALUES (...)` | 版本发布事件 |
+| 62 | `SELECT ... FROM skill_versions WHERE skill_id=$1 AND status IN ('published','deprecated') ORDER BY version DESC` | 获取版本列表 |
+| 63 | `INSERT INTO skill_subscriptions ... ON CONFLICT (agent_id, skill_id) DO UPDATE SET pinned_version = EXCLUDED.pinned_version RETURNING *` | **UPSERT 订阅** |
+| 64 | `SELECT pinned_version FROM skill_subscriptions WHERE agent_id=$1 AND skill_id=$2` | 查询订阅 |
+| 65 | `SELECT MAX(version) FROM skill_versions WHERE skill_id=$1 AND status='published'` | 最新发布版本 |
+| 66 | `SELECT content, version, status FROM skill_versions WHERE ... AND status IN ('published','deprecated')` | 读取版本 |
+| 67 | `SELECT l2_content, version FROM contexts WHERE id = $1` | 读取技能内容 |
+| 68 | `SELECT 1 FROM skill_versions WHERE skill_id=$1 AND status='published' LIMIT 1` | 发布版本存在性 |
+
+#### `src/contexthub/services/retrieval_service.py`
+
+| # | SQL 语句摘要 | 用途 |
+|---|---|---|
+| 69 | `SELECT id, l2_content FROM contexts WHERE id IN ($1,$2,...)` | L2 按需加载 |
+| 70 | `UPDATE contexts SET active_count = active_count + 1, last_accessed_at = NOW() WHERE id = ANY($1)` | 更新活跃计数 |
+
+#### `src/contexthub/retrieval/vector_strategy.py`
+
+| # | SQL 语句摘要 | 用途 |
+|---|---|---|
+| 71 | `SELECT ... 1 - (l0_embedding <=> $1::vector) AS cosine_similarity FROM contexts WHERE ... ORDER BY l0_embedding <=> $1::vector LIMIT $n` | **pgvector 余弦相似度检索** |
+
+#### `src/contexthub/retrieval/keyword_strategy.py`
+
+| # | SQL 语句摘要 | 用途 |
+|---|---|---|
+| 72 | `SELECT ... (CASE WHEN LOWER(COALESCE(...)) LIKE $n THEN 1 ELSE 0 END + ...)::float / {max} AS cosine_similarity FROM contexts WHERE ... ORDER BY ... DESC LIMIT $n` | 关键词回退检索 |
+
+#### `src/contexthub/services/indexer_service.py`
+
+| # | SQL 语句摘要 | 用途 |
+|---|---|---|
+| 73 | `UPDATE contexts SET l0_embedding = NULL WHERE id = $1` | 清除向量嵌入 |
+| 74 | `SELECT id, l0_content FROM contexts WHERE l0_embedding IS NULL AND l0_content IS NOT NULL AND status IN ('active','stale') LIMIT $1` | 回填选择 |
+| 75 | `UPDATE contexts SET l0_embedding = $1::vector WHERE id = $2` | 写入向量嵌入 |
+
+#### `src/contexthub/services/catalog_sync_service.py`
+
+| # | SQL 语句摘要 | 用途 |
+|---|---|---|
+| 76 | `SELECT created_at, updated_at FROM contexts WHERE id = $1` | 判断新建/更新 |
+| 77 | `INSERT INTO contexts (...) VALUES (...) ON CONFLICT (account_id, uri) DO UPDATE SET ... RETURNING id, (xmax = 0) AS is_new` | **UPSERT + xmax 判断** |
+| 78 | `SELECT ddl FROM table_metadata WHERE context_id = $1` | DDL 变更检测 |
+| 79 | `INSERT INTO table_metadata (...) VALUES ($1,...,$6::jsonb,$7::jsonb, NOW()) ON CONFLICT (context_id) DO UPDATE SET ...` | **UPSERT 表元数据** |
+| 80 | `INSERT INTO change_events (...) VALUES (...)` | 变更事件 |
+| 81 | `UPDATE contexts SET version = version + 1 WHERE id = $1` | 版本递增 |
+| 82 | `SELECT id FROM contexts WHERE uri = $1 AND account_id = $2` | 定位上下文 |
+| 83 | `UPDATE contexts SET status = 'archived', archived_at = NOW() WHERE id = $1` | 归档 |
+| 84 | `SELECT dependent_id FROM dependencies WHERE dependency_id = $1 AND dep_type = 'table_schema'` | 查询依赖者 |
+| 85 | `UPDATE contexts SET status = 'stale', stale_at = NOW(), updated_at = NOW() WHERE id = $1 AND status NOT IN (...)` | 标记过期 |
+| 86 | `INSERT INTO table_relationships (...) VALUES ($1,$2,$3,$4::jsonb) ON CONFLICT ... DO UPDATE SET ...` | **UPSERT 表关系** |
+| 87 | `INSERT INTO lineage (...) VALUES ($1,$2,'fk',$3) ON CONFLICT ... DO NOTHING` | **UPSERT 血缘(DO NOTHING)** |
+| 88 | `SELECT c.uri, ... FROM contexts c JOIN table_metadata tm ON ... WHERE ... ORDER BY tm.table_name` | 列出同步表 |
+| 89 | `SELECT c.id, c.uri, ... FROM contexts c JOIN table_metadata tm ON ... WHERE ...` | 表详情 |
+| 90 | `SELECT tr.join_type, tr.join_columns, ... CASE WHEN ... FROM table_relationships tr LEFT JOIN contexts ...` | 关系查询 |
+| 91 | `SELECT sql_template, description, hit_count FROM query_templates WHERE context_id=$1 ORDER BY hit_count DESC LIMIT 5` | 查询模板 |
+| 92 | `WITH RECURSIVE upstream_lineage AS (... ARRAY[...]::uuid[] ... NOT l.upstream_id = ANY(ul.path) ...) SELECT DISTINCT ON (c.uri) ... ORDER BY c.uri, ul.depth ASC` | **递归 CTE 血缘上游** |
+| 93 | `WITH RECURSIVE downstream_lineage AS (...) SELECT DISTINCT ON (c.uri) ...` | **递归 CTE 血缘下游** |
+
+#### `src/contexthub/services/propagation_engine.py`
+
+| # | SQL 语句摘要 | 用途 |
+|---|---|---|
+| 94 | `UPDATE change_events SET delivery_status='retry', ... WHERE delivery_status='processing' AND claimed_at < NOW() - $1::interval` | 回收过期事件 |
+| 95 | `UPDATE change_events SET delivery_status='processing', ... WHERE event_id IN (SELECT ... WHERE context_id = $1::uuid AND delivery_status IN ('pending','retry') AND next_retry_at <= NOW() ORDER BY timestamp ASC LIMIT $2) RETURNING *` | 领取事件(按 context) |
+| 96 | `UPDATE change_events SET ... WHERE event_id IN (SELECT ... WHERE delivery_status IN ('pending','retry') ... LIMIT $1) RETURNING *` | 领取事件(全局) |
+| 97 | `SELECT dependent_id, dep_type, pinned_version, created_at FROM dependencies WHERE dependency_id=$1 AND created_at <= $2 ORDER BY ...` | 查询依赖者 |
+| 98 | `SELECT agent_id, pinned_version, created_at FROM skill_subscriptions WHERE skill_id=$1 AND created_at <= $2 ORDER BY ...` | 查询订阅者 |
+| 99 | `UPDATE contexts SET status='stale', stale_at=NOW(), updated_at=NOW() WHERE id=$1 AND status NOT IN (...)` | 标记过期 |
+| 100 | `INSERT INTO change_events (...) VALUES ($1,$2,'marked_stale','propagation_engine',$3)` | 变更事件 |
+| 101 | `SELECT id, context_type, l0_content, l1_content, l2_content FROM contexts WHERE id = $1` | 加载源上下文 |
+| 102 | `SELECT id, context_type, l2_content FROM contexts WHERE id = $1` | 加载依赖者 |
+| 103 | `UPDATE contexts SET l0_content=$1, l1_content=$2, updated_at=NOW() WHERE id=$3` | 自动更新派生投影 |
+| 104 | `UPDATE contexts SET l0_embedding = NULL WHERE id = $1` | 清除嵌入 |
+| 105 | `UPDATE change_events SET delivery_status='processed', processed_at=NOW(), ... WHERE event_id=$1` | 完成事件 |
+| 106 | `UPDATE change_events SET delivery_status='retry', ... next_retry_at = NOW() + make_interval(secs => LEAST(300, 5 * attempt_count)), ... WHERE event_id=$1` | **重试事件(make_interval)** |
+
+#### `src/contexthub/api/routers/datalake.py`
+
+| # | SQL 语句摘要 | 用途 |
+|---|---|---|
+| 107 | `SELECT c.id FROM contexts c JOIN table_metadata tm ON ... WHERE c.uri=$1 AND tm.catalog=$2 ...` | SQL 上下文定位 |
+| 108 | `SELECT c.id, c.uri, ... (SELECT jsonb_agg(jsonb_build_object(...)) FROM table_relationships tr ...) AS joins, (SELECT jsonb_agg(jsonb_build_object(...)) FROM (...) qt) AS top_templates FROM contexts c JOIN table_metadata tm ON ... WHERE c.id = ANY($1::uuid[]) AND tm.catalog=$2 ORDER BY array_position($1::uuid[], c.id)` | **SQL 上下文组装(jsonb_agg, jsonb_build_object, array_position)** |
+
+#### `src/contexthub/api/routers/tools.py` & `contexts.py`
+
+| # | SQL 语句摘要 | 用途 |
+|---|---|---|
+| 109 | `SELECT id, context_type FROM contexts WHERE uri=$1 AND status!='deleted'` | 路由分发 |
+| 110 | `UPDATE contexts SET last_accessed_at = NOW() WHERE uri = $1` | 访问时间更新 |
+| 111 | `SELECT context_type FROM contexts WHERE uri=$1 AND status != 'deleted'` | 类型检查 |
+
+### 2.4 脚本与测试
+
+| # | 文件 | SQL 语句 | 用途 |
+|---|---|---|---|
+| 112 | `scripts/demo_e2e.py` | `SET app.account_id = 'acme'` | 会话 GUC |
+| 113 | `scripts/demo_e2e.py` | `INSERT INTO team_memberships ... ON CONFLICT DO NOTHING` | **UPSERT** |
+| 114 | `tests/conftest.py` | `TRUNCATE contexts, ... CASCADE` | 测试清理 |
+
+---
+
+## 三、openGauss 7.0.0 兼容性逐项分析
+
+### 风险等级说明
+
+- 🔴 **阻断 (BLOCKER)** — 语法不支持,必须改写 SQL 否则无法执行
+- 🟡 **需适配 (ADAPTATION)** — 功能支持但语法/扩展名不同,需修改代码
+- 🟢 **兼容 (COMPATIBLE)** — 直接可用,无需修改
+
+---
+
+### 3.1 🔴 阻断级问题
+
+#### 3.1.1 `INSERT ... ON CONFLICT` 不支持
+
+**影响范围**: #63, #77, #79, #86, #87, #113(共 6 处)
+
+**问题描述**:
+openGauss 7.0.0 **不支持** PostgreSQL 的 `INSERT ... ON CONFLICT (columns) DO UPDATE SET ... / DO NOTHING` 语法。这是 PostgreSQL 9.5+ 引入的 upsert 语法,但 openGauss 至今未实现。
+
+**涉及文件**:
+- `src/contexthub/services/skill_service.py` — `INSERT INTO skill_subscriptions ... ON CONFLICT (agent_id, skill_id) DO UPDATE ... RETURNING *`
+- `src/contexthub/services/catalog_sync_service.py` — 4 处 `ON CONFLICT ... DO UPDATE / DO NOTHING`
+- `scripts/demo_e2e.py` — `ON CONFLICT DO NOTHING`
+
+**openGauss 替代方案**:
+
+openGauss 提供两种替代:
+
+**方案 A: `ON DUPLICATE KEY UPDATE`(推荐简单场景)**
+```sql
+-- PostgreSQL 原始
+INSERT INTO skill_subscriptions (agent_id, skill_id, pinned_version, account_id)
+VALUES ($1, $2, $3, current_setting('app.account_id'))
+ON CONFLICT (agent_id, skill_id)
+DO UPDATE SET pinned_version = EXCLUDED.pinned_version
+RETURNING *;
+
+-- openGauss 改写
+INSERT INTO skill_subscriptions (agent_id, skill_id, pinned_version, account_id)
+VALUES ($1, $2, $3, current_setting('app.account_id'))
+ON DUPLICATE KEY UPDATE pinned_version = EXCLUDED.pinned_version;
+```
+
+> ⚠️ **重要限制**: openGauss 的 `ON DUPLICATE KEY UPDATE` **不支持与 `RETURNING` 子句一起使用**。SQL #63 和 #77 同时使用了 `ON CONFLICT ... RETURNING *`,改写后需要拆分为两步操作(先 upsert,再 SELECT)。
+
+**方案 B: `MERGE INTO`(推荐复杂场景)**
+```sql
+MERGE INTO skill_subscriptions t
+USING (SELECT $1 AS agent_id, $2 AS skill_id, $3 AS pinned_version) s
+ON (t.agent_id = s.agent_id AND t.skill_id = s.skill_id)
+WHEN MATCHED THEN UPDATE SET pinned_version = s.pinned_version
+WHEN NOT MATCHED THEN INSERT (agent_id, skill_id, pinned_version, account_id)
+ VALUES (s.agent_id, s.skill_id, s.pinned_version, current_setting('app.account_id'));
+```
+
+**对于 `ON CONFLICT DO NOTHING`**: 可改写为 `INSERT ... ON DUPLICATE KEY UPDATE NOTHING`。
+
+---
+
+#### 3.1.2 `ON CONFLICT ... RETURNING *` + `(xmax = 0) AS is_new` 组合不可用
+
+**影响范围**: #77(1 处,但属于核心同步逻辑)
+
+**问题描述**:
+`catalog_sync_service.py` 中的核心 upsert 语句同时使用了三个 openGauss 不兼容的特性:
+1. `ON CONFLICT ... DO UPDATE SET ...` — 语法不支持
+2. `RETURNING *` — 与 upsert 组合不支持
+3. `(xmax = 0) AS is_new` — 利用 PostgreSQL 内部系统列判断是否是新插入行
+
+**涉及代码**:
+```sql
+INSERT INTO contexts (...) VALUES (...)
+ON CONFLICT (account_id, uri) DO UPDATE SET ...
+RETURNING id, (xmax = 0) AS is_new
+```
+
+虽然 openGauss 支持 `xmax` 系统隐藏列,但由于 `ON CONFLICT` 和 `RETURNING` 的双重限制,此查询需要完全重构。
+
+**改写建议**:
+```sql
+-- 先尝试 SELECT
+SELECT id FROM contexts WHERE account_id = $4 AND uri = $1;
+-- 如果不存在则 INSERT,如果存在则 UPDATE
+-- 根据操作类型在应用层确定 is_new
+```
+
+---
+
+#### 3.1.3 `CREATE EXTENSION IF NOT EXISTS vector` — pgvector 不存在
+
+**影响范围**: #1(影响整个向量检索功能链:#10, #71, #73, #74, #75)
+
+**问题描述**:
+openGauss **不包含 pgvector 扩展**。openGauss 提供的是自研的 **DataVec** 扩展,虽然提供了相同的 `vector` 数据类型和 `<=>` 余弦距离操作符,但扩展名和安装方式不同。
+
+**改写方法**:
+```sql
+-- PostgreSQL
+CREATE EXTENSION IF NOT EXISTS vector;
+
+-- openGauss
+CREATE EXTENSION IF NOT EXISTS datavec;
+```
+
+DataVec 支持:
+- ✅ `vector(1536)` 数据类型(最大 16000 维,索引最大 2000 维;1536 维在索引限制内)
+- ✅ `<=>` 余弦距离操作符
+- ✅ HNSW 索引 + `vector_cosine_ops` 操作符类
+- ✅ `$1::vector` 类型转换
+- ✅ `WITH (m=16, ef_construction=64)` 索引参数
+
+因此向量相关的 DML(#71, #73, #75 等)**在安装 DataVec 后可直接使用**,无需修改。
+
+---
+
+#### 3.1.4 `CREATE EXTENSION IF NOT EXISTS pgcrypto` — pgcrypto 不存在
+
+**影响范围**: #2 + 所有使用 `gen_random_uuid()` 的表定义(#3, #12, #17 等)
+
+**问题描述**:
+openGauss **不支持 pgcrypto 扩展**。`pgcrypto` 的控制文件在 openGauss 发行版中不包含,`CREATE EXTENSION pgcrypto` 会直接报错。
+
+项目使用 `pgcrypto` 的唯一目的是提供 `gen_random_uuid()` 函数。
+
+**改写方法**:
+需确认 openGauss 是否内置 `gen_random_uuid()`。根据 openGauss 文档,`gen_random_uuid()` 从较新版本开始作为内置函数提供(无需扩展)。如果 7.0.0 版本中已内置,则只需删除 `CREATE EXTENSION IF NOT EXISTS pgcrypto` 语句即可。
+
+若未内置,则需要使用 `uuid-ossp` 扩展或者 `sys_guid()` 函数替代:
+```sql
+-- 替代方案 1: 使用 uuid-ossp
+CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
+-- 然后将 DEFAULT gen_random_uuid() 替换为 DEFAULT uuid_generate_v4()
+
+-- 替代方案 2: 使用 sys_guid()
+-- 将列默认值改为 DEFAULT sys_guid()
+```
+
+---
+
+#### 3.1.5 `CREATE POLICY` 语法差异
+
+**影响范围**: #6, #18, #22(共 3 处)
+
+**问题描述**:
+PostgreSQL 使用 `CREATE POLICY name ON table`,而 openGauss 使用 `CREATE ROW LEVEL SECURITY POLICY name ON table`。
+
+**涉及代码**:
+```sql
+-- PostgreSQL 原始
+CREATE POLICY tenant_isolation ON contexts
+ USING (account_id = current_setting('app.account_id'))
+
+-- openGauss 必须改为
+CREATE ROW LEVEL SECURITY POLICY tenant_isolation ON contexts
+ USING (account_id = current_setting('app.account_id'))
+```
+
+三处 `CREATE POLICY` 都需要加上 `ROW LEVEL SECURITY` 关键词。
+
+---
+
+#### 3.1.6 `CREATE TRIGGER ... EXECUTE FUNCTION` 语法差异
+
+**影响范围**: #16(1 处)
+
+**问题描述**:
+PostgreSQL 11+ 推荐使用 `EXECUTE FUNCTION`,而 openGauss 的常规触发器只支持 `EXECUTE PROCEDURE` 语法。
+
+**改写方法**:
+```sql
+-- PostgreSQL
+CREATE TRIGGER trg_change_events_notify
+AFTER INSERT ON change_events
+FOR EACH ROW EXECUTE FUNCTION notify_change_event();
+
+-- openGauss
+CREATE TRIGGER trg_change_events_notify
+AFTER INSERT ON change_events
+FOR EACH ROW EXECUTE PROCEDURE notify_change_event();
+```
+
+---
+
+#### 3.1.7 `make_interval()` 函数不支持
+
+**影响范围**: #106(1 处)
+
+**问题描述**:
+openGauss 不支持 PostgreSQL 的 `make_interval(secs => ...)` 函数。
+
+**涉及代码**:
+```sql
+next_retry_at = NOW() + make_interval(secs => LEAST(300, 5 * attempt_count))
+```
+
+**改写方法**:
+```sql
+-- 使用 interval 乘法替代
+next_retry_at = NOW() + (LEAST(300, 5 * attempt_count) || ' seconds')::interval
+-- 或
+next_retry_at = NOW() + (LEAST(300, 5 * attempt_count) * interval '1 second')
+```
+
+---
+
+### 3.2 🟡 需适配问题
+
+#### 3.2.1 `LISTEN/NOTIFY` 及 `pg_notify` 可用性不确定
+
+**影响范围**: 传播引擎核心机制(#15 触发器函数中的 `pg_notify` + `PropagationEngine` 的 `LISTEN`)
+
+**问题描述**:
+openGauss 的系统函数列表中包含 `pg_notify`(已在 7.0.0-RC1 文档中确认),但 `LISTEN` 语句的支持情况在官方文档中没有明确说明。openGauss 的函数文档声明"内置函数和操作符继承自开源 PG",暗示可能支持,但需要实际验证。
+
+**风险**:
+如果 `LISTEN` 不支持,则整个事件传播的实时唤醒机制将失效。但由于 `PropagationEngine` 本身有周期性唤醒(`_periodic_wakeup`)作为兜底,系统不会完全中断,只是实时性会降低为定时轮询。
+
+**建议**: 在 openGauss 环境中实测 `LISTEN 'context_changed'` 和 `pg_notify('context_changed', ...)` 是否可用。
+
+#### 3.2.2 `array_position()` 函数可用性不确定
+
+**影响范围**: #108(1 处)
+
+**问题描述**:
+`datalake.py` 中使用 `ORDER BY array_position($1::uuid[], c.id)` 来保持结果排序与输入数组一致。openGauss 的数组函数文档未明确列出 `array_position`,但其文档声明"函数继承自 PG"。
+
+**改写方法(如不支持)**:
+```sql
+-- 使用子查询 + 生成序列替代
+ORDER BY (SELECT i FROM generate_subscripts($1::uuid[], 1) i WHERE ($1::uuid[])[i] = c.id LIMIT 1)
+```
+
+#### 3.2.3 `jsonb_agg` 聚合函数可用性不确定
+
+**影响范围**: #108(1 处)
+
+**问题描述**:
+`datalake.py` 的 SQL 上下文组装查询大量使用 `jsonb_agg(jsonb_build_object(...))` 子查询。`jsonb_build_object` 已确认支持,但 `jsonb_agg` 在 openGauss 文档中未明确提及。
+
+**改写方法(如不支持)**:
+可用 `json_agg` 替代后转 jsonb,或使用 `array_agg` + `array_to_json` 组合。
+
+#### 3.2.4 asyncpg 驱动不兼容
+
+**影响范围**: 整个数据访问层
+
+**问题描述**:
+标准 `asyncpg` 库是为 PostgreSQL 协议优化的,直接连接 openGauss 可能因认证协议差异(openGauss 使用 SHA256 认证)而失败。
+
+**替代方案**:
+华为提供了 **`async-gaussdb`** 库,这是 asyncpg 的 openGauss 适配分支,API 与 asyncpg 基本兼容,支持 SHA256 认证。
+
+```bash
+pip install async-gaussdb
+```
+
+代码中需要将 `import asyncpg` 替换为 `import async_gaussdb as asyncpg`(或按库的实际 API 调整)。
+
+---
+
+### 3.3 🟢 兼容项(无需修改)
+
+以下 SQL 特性在 openGauss 7.0.0 中**已确认兼容**:
+
+| 特性 | 涉及 SQL # | 说明 |
+|---|---|---|
+| `set_config()` / `current_setting()` | #33, #41, #45 等 | openGauss 完全支持 |
+| `WITH RECURSIVE` 递归 CTE | #35, #92, #93 | openGauss 完全支持 |
+| `UNION ALL` | #35, #92, #93 | 标准 SQL,完全支持 |
+| `SELECT ... FOR UPDATE` 行锁 | #57 | openGauss 完全支持 |
+| `UPDATE ... RETURNING` | #39, #44, #46, #47 等 | openGauss 支持(列存表除外) |
+| `INSERT ... RETURNING` | #44, #50, #53 | openGauss 支持 |
+| `DISTINCT ON (expression)` | #92, #93 | openGauss 完全支持 |
+| `ALTER TABLE ... ENABLE/FORCE ROW LEVEL SECURITY` | #4, #5, #31, #32 | openGauss 完全支持 |
+| `TIMESTAMPTZ` 数据类型 | 所有表 | openGauss 完全支持 |
+| `JSONB` 数据类型与 `$n::jsonb` 转换 | #12, #23, #25, #79 | openGauss 完全支持 |
+| `TEXT[]` 数组类型 | #3 (tags 字段) | openGauss 支持 |
+| `= ANY($1)` 数组操作符 | #70, #92, #93 | openGauss 完全支持 |
+| `SERIAL` 自增类型 | #11, #21, #26 | openGauss 完全支持 |
+| `UUID` 数据类型 | 所有表 | openGauss 完全支持 |
+| `COALESCE()`, `LEAST()`, `NOW()` | 多处 | openGauss 完全支持 |
+| `LIKE` / `ILIKE` | #42, #72 | openGauss 完全支持 |
+| `CASE WHEN ... THEN ... ELSE ... END` | #72, #90 | openGauss 完全支持 |
+| 部分索引 (Partial Index) `CREATE INDEX ... WHERE ...` | #9, #13, #14 | openGauss 完全支持 |
+| `PL/pgSQL` 函数语言 | #15 | openGauss 完全支持 |
+| `TRUNCATE ... CASCADE` | #114 | openGauss 完全支持 |
+| `DROP TABLE IF EXISTS ... CASCADE` | #29 | openGauss 完全支持 |
+| `CHECK` 约束 | 多表 | openGauss 完全支持 |
+| `UNIQUE` 约束 | 多表 | openGauss 完全支持 |
+| `REFERENCES` 外键约束 | 多表 | openGauss 完全支持 |
+| `$1::interval` 类型转换 | #94 | openGauss 完全支持 |
+| `$1::uuid` 类型转换 | #95 | openGauss 完全支持 |
+| `FLOAT` 数据类型 | #25 | openGauss 完全支持 |
+| `HNSW` 索引(通过 DataVec) | #10 | openGauss DataVec 支持,语法一致 |
+| `<=>` 余弦距离操作符(通过 DataVec) | #71 | openGauss DataVec 支持 |
+| `vector_cosine_ops` 操作符类(通过 DataVec) | #10 | openGauss DataVec 支持 |
+
+---
+
+## 四、改写工作量汇总
+
+| 严重程度 | 数量 | 涉及文件数 | 描述 |
+|---|---|---|---|
+| 🔴 阻断 | 7 类问题 | ~8 个文件 | 必须改写否则无法运行 |
+| 🟡 适配 | 4 类问题 | ~5 个文件 | 需验证或小幅调整 |
+| 🟢 兼容 | ~90+ 条 SQL | — | 无需修改 |
+
+### 必须修改的文件清单
+
+| 文件 | 修改项 |
+|---|---|
+| `alembic/versions/001_initial_schema.py` | `CREATE EXTENSION vector` → `datavec`; 删除 `pgcrypto`(或替换 UUID 方案); `CREATE POLICY` → `CREATE ROW LEVEL SECURITY POLICY`(3处); `EXECUTE FUNCTION` → `EXECUTE PROCEDURE`(1处) |
+| `src/contexthub/services/skill_service.py` | `ON CONFLICT ... DO UPDATE ... RETURNING *` → `MERGE INTO` 或 `ON DUPLICATE KEY UPDATE` + 单独 SELECT |
+| `src/contexthub/services/catalog_sync_service.py` | 4 处 `ON CONFLICT` → `MERGE INTO` 或 `ON DUPLICATE KEY UPDATE`; `RETURNING id, (xmax = 0) AS is_new` → 拆分为先查后改 |
+| `src/contexthub/services/propagation_engine.py` | `make_interval(secs => ...)` → `(... \|\| ' seconds')::interval` |
+| `src/contexthub/api/routers/datalake.py` | 若 `array_position` / `jsonb_agg` 不可用则需改写 |
+| `src/contexthub/db/pool.py` / `repository.py` | `asyncpg` → `async-gaussdb` 驱动替换 |
+| `scripts/demo_e2e.py` | `ON CONFLICT DO NOTHING` → `ON DUPLICATE KEY UPDATE NOTHING` |
+| `pyproject.toml` / `requirements` | 依赖替换: `asyncpg` → `async-gaussdb`; `pgvector` python 包评估 |
+| `docker-compose.yml` | `pgvector/pgvector:pg16` 镜像 → openGauss 7.0.0 镜像 |
+
+---
+
+## 五、迁移建议
+
+### 5.1 推荐使用 PG 兼容模式
+
+创建 openGauss 数据库时建议使用 **PG 兼容模式**:
+```sql
+CREATE DATABASE contexthub DBCOMPATIBILITY = 'PG';
+```
+这将最大程度保留 PostgreSQL 语法兼容性(如 `LIMIT/OFFSET`、`||` 字符串拼接、空字符串处理等)。
+
+### 5.2 迁移优先级
+
+1. **P0 — 驱动层**: 将 `asyncpg` 替换为 `async-gaussdb`,这是最基础的连接层变更
+2. **P0 — 扩展替换**: `pgvector` → `datavec`,`pgcrypto` → 内置 UUID 或 `uuid-ossp`
+3. **P0 — DDL 语法**: 修改 `CREATE POLICY` 和 `EXECUTE FUNCTION` 语法
+4. **P1 — UPSERT 改写**: 6 处 `ON CONFLICT` 改为 `MERGE INTO` 或 `ON DUPLICATE KEY UPDATE`
+5. **P1 — 函数替换**: `make_interval` → interval 表达式
+6. **P2 — 验证**: `LISTEN/NOTIFY`、`array_position`、`jsonb_agg` 实测验证
+7. **P3 — 基础设施**: Docker 镜像、CI/CD 流水线调整
+
+### 5.3 建议引入兼容层
+
+考虑在 `ScopedRepo` 层(`db/repository.py`)引入 SQL 方言抽象层,使同一份业务代码可以同时支持 PostgreSQL 和 openGauss:
+
+```python
+class SQLDialect:
+ def upsert(self, table, conflict_cols, update_cols, returning=None):
+ """根据数据库类型生成不同的 UPSERT 语法"""
+ ...
+```
+
+---
+
+## 六、结论
+
+将 ContextHub 从 PostgreSQL 16 迁移到 openGauss 7.0.0 是**可行的**,但需要处理 **7 类阻断级兼容性问题**。其中最大的工程量在于:
+
+1. **`ON CONFLICT` UPSERT 语法改写**(6 处,涉及 3 个核心服务文件,其中 2 处还组合了 `RETURNING`)
+2. **扩展替换**(`pgvector` → `datavec`,`pgcrypto` → 内置方案)
+3. **asyncpg 驱动替换**(→ `async-gaussdb`)
+
+大部分标准 SQL(SELECT/UPDATE/INSERT/DELETE、JOIN、递归 CTE、行锁、RLS 等)在 openGauss 7.0.0 PG 兼容模式下可以直接使用,整体兼容性较好。预计核心代码改动集中在约 **8 个源文件**中,业务逻辑本身无需变更。
diff --git a/opengauss/cleanup_demo_data.py b/opengauss/cleanup_demo_data.py
new file mode 100644
index 0000000..047e3e9
--- /dev/null
+++ b/opengauss/cleanup_demo_data.py
@@ -0,0 +1,143 @@
+#!/usr/bin/env python3
+"""Clean data possibly inserted by demo_e2e_opengauss.py.
+
+Usage:
+ python clean_for_opengauss.py
+
+Environment:
+ DEMO_DB_DSN Optional asyncpg DSN. Defaults to local openGauss DSN used by demo.
+ DEMO_ACCOUNT Optional account id. Defaults to "acme".
+"""
+
+from __future__ import annotations
+
+import asyncio
+import os
+import sys
+from typing import Iterable
+from uuid import UUID
+
+import asyncpg
+
+
+DEFAULT_DSN = "postgresql://contexthub:ContextHub%40123@localhost:15432/contexthub"
+DEFAULT_ACCOUNT = "acme"
+
+DEMO_SKILL_URI = "ctx://team/engineering/skills/sql-generator"
+DEMO_MEMORY_CONTENT = (
+ "The orders table uses user_id as FK to users. "
+ "Always JOIN on orders.user_id = users.id."
+)
+
+
+def _format_count(tag: str, count: str) -> str:
+ return f" - {tag}: {count}"
+
+
+async def _collect_target_contexts(conn: asyncpg.Connection) -> list[asyncpg.Record]:
+ return await conn.fetch(
+ """
+ SELECT id, uri
+ FROM contexts
+ WHERE account_id = current_setting('app.account_id')
+ AND (
+ uri = $1
+ OR (
+ uri LIKE 'ctx://agent/query-agent/memories/mem-%'
+ AND l2_content = $2
+ )
+ OR (
+ uri LIKE 'ctx://team/engineering/memories/shared_knowledge/mem-%'
+ AND l2_content = $2
+ )
+ OR uri LIKE 'ctx://datalake/mock/%'
+ )
+ ORDER BY uri
+ """,
+ DEMO_SKILL_URI,
+ DEMO_MEMORY_CONTENT,
+ )
+
+
+async def _delete_by_context_ids(conn: asyncpg.Connection, context_ids: Iterable[UUID]) -> None:
+ ids = list(context_ids)
+ if not ids:
+ print(" - No matching contexts found, nothing to delete.")
+ return
+
+ print(_format_count("target contexts", str(len(ids))))
+
+ statements: list[tuple[str, str]] = [
+ (
+ "query_templates",
+ "DELETE FROM query_templates WHERE context_id = ANY($1::uuid[])",
+ ),
+ (
+ "table_relationships",
+ "DELETE FROM table_relationships WHERE table_id_a = ANY($1::uuid[]) OR table_id_b = ANY($1::uuid[])",
+ ),
+ (
+ "lineage",
+ "DELETE FROM lineage WHERE upstream_id = ANY($1::uuid[]) OR downstream_id = ANY($1::uuid[])",
+ ),
+ (
+ "table_metadata",
+ "DELETE FROM table_metadata WHERE context_id = ANY($1::uuid[])",
+ ),
+ (
+ "skill_subscriptions",
+ "DELETE FROM skill_subscriptions WHERE skill_id = ANY($1::uuid[])",
+ ),
+ (
+ "skill_versions",
+ "DELETE FROM skill_versions WHERE skill_id = ANY($1::uuid[])",
+ ),
+ (
+ "dependencies",
+ "DELETE FROM dependencies WHERE dependent_id = ANY($1::uuid[]) OR dependency_id = ANY($1::uuid[])",
+ ),
+ (
+ "change_events",
+ "DELETE FROM change_events WHERE context_id = ANY($1::uuid[])",
+ ),
+ (
+ "contexts",
+ "DELETE FROM contexts WHERE id = ANY($1::uuid[])",
+ ),
+ ]
+
+ for label, sql in statements:
+ result = await conn.execute(sql, ids)
+ print(_format_count(label, result))
+
+
+async def main() -> None:
+ dsn = os.environ.get("DEMO_DB_DSN", DEFAULT_DSN)
+ account = os.environ.get("DEMO_ACCOUNT", DEFAULT_ACCOUNT)
+
+ print("Cleaning demo data for openGauss...")
+ print(f" - account: {account}")
+ print(f" - dsn: {dsn}")
+
+ conn = await asyncpg.connect(dsn)
+ try:
+ async with conn.transaction():
+ await conn.execute("SELECT set_config('app.account_id', $1, true)", account)
+
+ targets = await _collect_target_contexts(conn)
+ for row in targets:
+ print(f" - match: {row['uri']}")
+
+ await _delete_by_context_ids(conn, [row["id"] for row in targets])
+ finally:
+ await conn.close()
+
+ print("Cleanup complete.")
+
+
+if __name__ == "__main__":
+ try:
+ asyncio.run(main())
+ except Exception as exc: # pragma: no cover
+ print(f"Cleanup failed: {exc}", file=sys.stderr)
+ raise
diff --git a/opengauss/connection_test.py b/opengauss/connection_test.py
new file mode 100644
index 0000000..2f01515
--- /dev/null
+++ b/opengauss/connection_test.py
@@ -0,0 +1,75 @@
+import psycopg2
+from psycopg2 import OperationalError
+
+def test_opengauss_connection():
+ conn = None
+ try:
+ # Connect using explicit parameters to safely handle the '@' in the password
+ print("Connecting to openGauss database...")
+ conn = psycopg2.connect(
+ host="localhost", # Replace with your actual host IP/domain
+ port="15432",
+ dbname="contexthub",
+ user="contexthub",
+ password="ContextHub@123"
+ )
+ print("✅ Connection successful!")
+
+ # Create a cursor to execute SQL commands
+ cur = conn.cursor()
+
+ # --- WRITE TEST ---
+ print("\nTesting Write Operation...")
+
+ # 1. Create a temporary test table
+ cur.execute("""
+ CREATE TABLE IF NOT EXISTS python_test_table (
+ id SERIAL PRIMARY KEY,
+ message VARCHAR(255)
+ )
+ """)
+
+ # 2. Insert data and return the ID
+ insert_query = "INSERT INTO python_test_table (message) VALUES (%s) RETURNING id;"
+ cur.execute(insert_query, ("Hello from Python to openGauss!",))
+
+ # Fetch the ID of the newly inserted row
+ inserted_id = cur.fetchone()[0]
+
+ # Commit the transaction to save the write
+ conn.commit()
+ print(f"✅ Write successful! Inserted row with ID: {inserted_id}")
+
+ # --- READ TEST ---
+ print("\nTesting Read Operation...")
+
+ # Select the data we just inserted
+ select_query = "SELECT id, message FROM python_test_table WHERE id = %s;"
+ cur.execute(select_query, (inserted_id,))
+
+ record = cur.fetchone()
+ print(f"✅ Read successful! Fetched data: {record}")
+
+ # --- CLEANUP ---
+ print("\nCleaning up test data...")
+ cur.execute("DROP TABLE python_test_table;")
+ conn.commit()
+ print("✅ Table dropped successfully!")
+
+ except OperationalError as e:
+ print(f"❌ Connection failed: {e}")
+ except Exception as e:
+ print(f"❌ An error occurred during database operations: {e}")
+ # Rollback the transaction on error
+ if conn is not None:
+ conn.rollback()
+ finally:
+ # Always close the cursor and connection
+ if conn is not None:
+ cur.close()
+ conn.close()
+ print("\nDatabase connection closed.")
+
+if __name__ == "__main__":
+ test_opengauss_connection()
+
diff --git a/opengauss/demo_e2e_opengauss.py b/opengauss/demo_e2e_opengauss.py
new file mode 100644
index 0000000..9bef4b2
--- /dev/null
+++ b/opengauss/demo_e2e_opengauss.py
@@ -0,0 +1,193 @@
+"""ContextHub MVP End-to-End Demo
+
+Prerequisites:
+ docker-compose up -d
+ alembic upgrade head
+ uvicorn contexthub.main:app --port 8000
+
+Usage:
+ python demo_e2e.py
+"""
+
+from __future__ import annotations
+
+import asyncio
+import sys
+import time
+
+import httpx
+
+
+BASE_URL = "http://localhost:8000"
+API_KEY = "changeme"
+ACCOUNT = "acme"
+
+
+def _headers(agent_id: str) -> dict:
+ return {
+ "X-API-Key": API_KEY,
+ "X-Account-Id": ACCOUNT,
+ "X-Agent-Id": agent_id,
+ }
+
+
+def step(n: int, desc: str):
+ print(f"\n{'='*60}")
+ print(f" Step {n}: {desc}")
+ print(f"{'='*60}")
+
+
+async def _ensure_team_membership():
+ """Ensure query-agent is a direct member of engineering (seed data only has engineering/backend)."""
+ import asyncpg
+ conn = await asyncpg.connect("postgresql://contexthub:ContextHub%40123@localhost:15432/contexthub")
+ try:
+ await conn.execute("SET app.account_id = 'acme'")
+ await conn.execute("""
+ INSERT INTO team_memberships (agent_id, team_id, role, access, is_primary)
+ VALUES ('query-agent', '00000000-0000-0000-0000-000000000002', 'member', 'read_write', FALSE)
+ ON DUPLICATE KEY UPDATE NOTHING
+ """)
+ finally:
+ await conn.close()
+
+
+async def main():
+ await _ensure_team_membership()
+
+ async with httpx.AsyncClient(base_url=BASE_URL, timeout=30) as http:
+ # Health check
+ r = await http.get("/health")
+ if r.status_code != 200:
+ print("Server not reachable. Start it first.")
+ sys.exit(1)
+ print("Server healthy.")
+
+ qa = _headers("query-agent")
+ aa = _headers("analysis-agent")
+
+ # ── Step 1: query-agent writes private memory ──
+ step(1, "query-agent writes private memory")
+ r = await http.post("/api/v1/memories", json={
+ "content": "The orders table uses user_id as FK to users. Always JOIN on orders.user_id = users.id.",
+ "tags": ["schema-note", "orders"],
+ }, headers=qa)
+ assert r.status_code == 201, f"Expected 201, got {r.status_code}: {r.text}"
+ mem = r.json()
+ mem_uri = mem["uri"]
+ print(f" Created: {mem_uri}")
+
+ # ── Step 2: query-agent creates and publishes Skill v1 ──
+ step(2, "query-agent publishes sql-generator Skill v1")
+ # Create skill context first
+ r = await http.post("/api/v1/contexts", json={
+ "uri": "ctx://team/engineering/skills/sql-generator",
+ "context_type": "skill",
+ "scope": "team",
+ "owner_space": "engineering",
+ "l2_content": "SELECT * FROM orders WHERE status = 'completed'",
+ }, headers=qa)
+ assert r.status_code == 201, f"Expected 201, got {r.status_code}: {r.text}"
+ skill_ctx = r.json()
+ print(f" Skill context: {skill_ctx['uri']}")
+
+ r = await http.post("/api/v1/skills/versions", json={
+ "skill_uri": "ctx://team/engineering/skills/sql-generator",
+ "content": "v1: Basic SQL generator for orders queries",
+ "changelog": "Initial release",
+ "is_breaking": False,
+ }, headers=qa)
+ assert r.status_code == 201, f"Expected 201, got {r.status_code}: {r.text}"
+ v1 = r.json()
+ print(f" Published v{v1['version']}")
+
+ # ── Step 3: query-agent promotes memory to team ──
+ step(3, "query-agent promotes memory to team/engineering")
+ r = await http.post("/api/v1/memories/promote", json={
+ "uri": mem_uri,
+ "target_team": "engineering",
+ }, headers=qa)
+ assert r.status_code == 201, f"Expected 201, got {r.status_code}: {r.text}"
+ promoted = r.json()
+ print(f" Promoted to: {promoted['uri']}")
+
+ # ── Step 4: analysis-agent sees shared memory + subscribes to skill ──
+ step(4, "analysis-agent retrieves shared memory and subscribes to skill")
+ r = await http.get("/api/v1/memories", headers=aa)
+ assert r.status_code == 200
+ memories = r.json()
+ shared = [m for m in memories if "shared_knowledge" in m["uri"]]
+ print(f" analysis-agent sees {len(shared)} shared memories")
+ assert len(shared) >= 1, "Promoted memory not visible!"
+
+ r = await http.post("/api/v1/skills/subscribe", json={
+ "skill_uri": "ctx://team/engineering/skills/sql-generator",
+ "pinned_version": 1,
+ }, headers=aa)
+ assert r.status_code == 200, f"Subscribe failed: {r.text}"
+ print(f" Subscribed to sql-generator, pinned v1")
+
+ # ── Step 5: query-agent publishes breaking Skill v2 ──
+ step(5, "query-agent publishes breaking Skill v2")
+ r = await http.post("/api/v1/skills/versions", json={
+ "skill_uri": "ctx://team/engineering/skills/sql-generator",
+ "content": "v2: Rewritten SQL generator with CTE support",
+ "changelog": "Breaking: new output format",
+ "is_breaking": True,
+ }, headers=qa)
+ assert r.status_code == 201
+ v2 = r.json()
+ print(f" Published v{v2['version']} (breaking)")
+
+ # ── Step 6: Wait for propagation + verify ──
+ step(6, "Verify propagation: stale/advisory")
+ await asyncio.sleep(2) # Let propagation engine process
+
+ # analysis-agent reads skill — should get v1 with advisory about v2
+ r = await http.post("/api/v1/tools/read", json={
+ "uri": "ctx://team/engineering/skills/sql-generator",
+ }, headers=aa)
+ assert r.status_code == 200
+ read_result = r.json()
+ print(f" analysis-agent reads skill: version={read_result.get('version')}")
+ if read_result.get("advisory"):
+ print(f" Advisory: {read_result['advisory']}")
+
+ # ── Step 7 (optional): Catalog sync + sql-context ──
+ step(7, "[Vertical] Catalog sync + sql-context query")
+ r = await http.post("/api/v1/datalake/sync", json={"catalog": "mock"}, headers=qa)
+ if r.status_code == 200:
+ sync = r.json()
+ print(f" Synced {sync['tables_synced']} tables ({sync['tables_created']} new)")
+
+ r = await http.get("/api/v1/datalake/mock/prod", headers=qa)
+ if r.status_code == 200:
+ tables = r.json().get("tables", [])
+ print(f" Listed {len(tables)} tables in mock/prod")
+
+ r = await http.post("/api/v1/search/sql-context", json={
+ "query": "How many orders per user?",
+ "catalog": "mock",
+ "top_k": 3,
+ }, headers=qa)
+ if r.status_code == 200:
+ sql_ctx = r.json()
+ print(f" sql-context returned {sql_ctx['total_tables_found']} relevant tables")
+ else:
+ print(f" sql-context: {r.status_code} (search may need embeddings)")
+ else:
+ print(f" Sync returned {r.status_code}: {r.text}")
+
+ # ── Summary ──
+ print(f"\n{'='*60}")
+ print(" MVP Demo Complete")
+ print(f"{'='*60}")
+ print(f" - Private memory created: {mem_uri}")
+ print(f" - Promoted to team: {promoted['uri']}")
+ print(f" - Skill v1 + v2 published, breaking propagation triggered")
+ print(f" - Cross-agent visibility verified")
+ print(f" - Catalog sync + sql-context demonstrated")
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/opengauss/readme.md b/opengauss/readme.md
new file mode 100644
index 0000000..c486d14
--- /dev/null
+++ b/opengauss/readme.md
@@ -0,0 +1,9 @@
+`connection_test.py` test connection and basic read/write to a opengauss database.
+
+Unsupported Features:
+* `verify_LISTEN_UNLISTEN_NOTIFY.py`: opengauss does not support LISTEN/UNLISTEN/NOTIFY.
+* `vector_asyncpg.py`: opengauss is incompatible with asyncpg w.r.t. vector dtype, `message: unhandled standard data type 'vector' (OID 8305)`
+
+demo:
+* `demo_e2e_opengauss.py`: run demo with opengauss backend.
+* `cleanup_demo_data.py`: cleanup memories inserted by demo script.
diff --git a/opengauss/vector_async_gaussdb.py b/opengauss/vector_async_gaussdb.py
new file mode 100644
index 0000000..afaf426
--- /dev/null
+++ b/opengauss/vector_async_gaussdb.py
@@ -0,0 +1,96 @@
+#!/usr/bin/env python3
+"""Minimal repro for asyncpg + openGauss vector result decoding.
+
+What this script demonstrates:
+1. Queries that do NOT return the vector column succeed.
+2. Queries that DO return the vector column fail in asyncpg on openGauss.
+
+Default DSN matches the repo's openGauss setup guide. Override with:
+ VECTOR_REPRO_DSN=postgresql://user:pass@host:port/db python scripts/repro_asyncpg_opengauss_vector.py
+"""
+
+from __future__ import annotations
+
+import asyncio
+import os
+import sys
+
+import async_gaussdb as asyncpg
+
+
+#DEFAULT_DSN = "postgresql://contexthub:ContextHub%40123@localhost:15432/contexthub"
+DEFAULT_DSN = "gaussdb://contexthub:ContextHub%40123@localhost:15432/contexthub"
+TEMP_TABLE = "tmp_vector_repro"
+
+
+async def _run_query(conn: asyncpg.Connection, label: str, sql: str, method: str) -> None:
+ print(f"\n[{label}]")
+ print(f"SQL: {sql}")
+ try:
+ result = await getattr(conn, method)(sql)
+ print("status: OK")
+ print(f"result: {result}")
+ except Exception as exc:
+ print("status: ERROR")
+ print(f"type: {type(exc).__name__}")
+ print(f"message: {exc}")
+
+
+async def main() -> int:
+ dsn = os.environ.get("VECTOR_REPRO_DSN", DEFAULT_DSN)
+
+ print("Connecting with asyncpg...")
+ conn = await asyncpg.connect(dsn)
+ try:
+ await conn.execute(f"DROP TABLE IF EXISTS {TEMP_TABLE}")
+ await conn.execute(
+ f"""
+ CREATE TABLE {TEMP_TABLE} (
+ id INT PRIMARY KEY,
+ note TEXT,
+ emb vector(3)
+ )
+ """
+ )
+ await conn.execute(
+ f"""
+ INSERT INTO {TEMP_TABLE} (id, note, emb)
+ VALUES (1, 'vector row', '[1,2,3]')
+ """
+ )
+
+ print("Inserted one row with a vector value.")
+
+ await _run_query(
+ conn,
+ "Control case: query without vector column",
+ f"SELECT id, note FROM {TEMP_TABLE}",
+ "fetch",
+ )
+ await _run_query(
+ conn,
+ "Failure case: query only the vector column",
+ f"SELECT emb FROM {TEMP_TABLE}",
+ "fetch",
+ )
+ await _run_query(
+ conn,
+ "Failure case: SELECT * includes the vector column",
+ f"SELECT * FROM {TEMP_TABLE}",
+ "fetch",
+ )
+ await _run_query(
+ conn,
+ "Failure case: fetchrow also fails when result contains vector",
+ f"SELECT * FROM {TEMP_TABLE}",
+ "fetchrow",
+ )
+ finally:
+ await conn.execute(f"DROP TABLE IF EXISTS {TEMP_TABLE}")
+ await conn.close()
+
+ return 0
+
+
+if __name__ == "__main__":
+ raise SystemExit(asyncio.run(main()))
diff --git a/opengauss/vector_asyncpg.py b/opengauss/vector_asyncpg.py
new file mode 100644
index 0000000..3707263
--- /dev/null
+++ b/opengauss/vector_asyncpg.py
@@ -0,0 +1,95 @@
+#!/usr/bin/env python3
+"""Minimal repro for asyncpg + openGauss vector result decoding.
+
+What this script demonstrates:
+1. Queries that do NOT return the vector column succeed.
+2. Queries that DO return the vector column fail in asyncpg on openGauss.
+
+Default DSN matches the repo's openGauss setup guide. Override with:
+ VECTOR_REPRO_DSN=postgresql://user:pass@host:port/db python scripts/repro_asyncpg_opengauss_vector.py
+"""
+
+from __future__ import annotations
+
+import asyncio
+import os
+import sys
+
+import asyncpg
+
+
+DEFAULT_DSN = "postgresql://contexthub:ContextHub%40123@localhost:15432/contexthub"
+TEMP_TABLE = "tmp_vector_repro"
+
+
+async def _run_query(conn: asyncpg.Connection, label: str, sql: str, method: str) -> None:
+ print(f"\n[{label}]")
+ print(f"SQL: {sql}")
+ try:
+ result = await getattr(conn, method)(sql)
+ print("status: OK")
+ print(f"result: {result}")
+ except Exception as exc:
+ print("status: ERROR")
+ print(f"type: {type(exc).__name__}")
+ print(f"message: {exc}")
+
+
+async def main() -> int:
+ dsn = os.environ.get("VECTOR_REPRO_DSN", DEFAULT_DSN)
+
+ print("Connecting with asyncpg...")
+ conn = await asyncpg.connect(dsn)
+ try:
+ await conn.execute(f"DROP TABLE IF EXISTS {TEMP_TABLE}")
+ await conn.execute(
+ f"""
+ CREATE TABLE {TEMP_TABLE} (
+ id INT PRIMARY KEY,
+ note TEXT,
+ emb vector(3)
+ )
+ """
+ )
+ await conn.execute(
+ f"""
+ INSERT INTO {TEMP_TABLE} (id, note, emb)
+ VALUES (1, 'vector row', '[1,2,3]')
+ """
+ )
+
+ print("Inserted one row with a vector value.")
+
+ await _run_query(
+ conn,
+ "Control case: query without vector column",
+ f"SELECT id, note FROM {TEMP_TABLE}",
+ "fetch",
+ )
+ await _run_query(
+ conn,
+ "Failure case: query only the vector column",
+ f"SELECT emb FROM {TEMP_TABLE}",
+ "fetch",
+ )
+ await _run_query(
+ conn,
+ "Failure case: SELECT * includes the vector column",
+ f"SELECT * FROM {TEMP_TABLE}",
+ "fetch",
+ )
+ await _run_query(
+ conn,
+ "Failure case: fetchrow also fails when result contains vector",
+ f"SELECT * FROM {TEMP_TABLE}",
+ "fetchrow",
+ )
+ finally:
+ await conn.execute(f"DROP TABLE IF EXISTS {TEMP_TABLE}")
+ await conn.close()
+
+ return 0
+
+
+if __name__ == "__main__":
+ raise SystemExit(asyncio.run(main()))
diff --git a/opengauss/vector_psycopg2.py b/opengauss/vector_psycopg2.py
new file mode 100644
index 0000000..150cb61
--- /dev/null
+++ b/opengauss/vector_psycopg2.py
@@ -0,0 +1,75 @@
+"""
+test read/write to opengauss database with vector datatype
+"""
+import psycopg2
+from psycopg2 import OperationalError
+
+DEFAULT_DSN = "postgresql://contexthub:ContextHub%40123@localhost:15432/contexthub"
+
+def test_opengauss_connection():
+ conn = None
+ try:
+ # Connect using explicit parameters to safely handle the '@' in the password
+ print("Connecting to openGauss database...")
+ conn = psycopg2.connect(DEFAULT_DSN)
+ print("✅ Connection successful!")
+
+ # Create a cursor to execute SQL commands
+ cur = conn.cursor()
+
+ # --- WRITE TEST ---
+ print("\nTesting Write Operation...")
+
+ # 1. Create a temporary test table
+ cur.execute("""
+ CREATE TABLE IF NOT EXISTS python_test_table (
+ id SERIAL PRIMARY KEY,
+ message VARCHAR(255),
+ emb vector(3)
+ )
+ """)
+
+ # 2. Insert data and return the ID
+ insert_query = "INSERT INTO python_test_table (message, emb) VALUES ('hello', '[1,2,3]') RETURNING id;"
+ cur.execute(insert_query)
+
+ # Fetch the ID of the newly inserted row
+ inserted_id = cur.fetchone()[0]
+
+ # Commit the transaction to save the write
+ conn.commit()
+ print(f"✅ Write successful! Inserted row with ID: {inserted_id}")
+
+ # --- READ TEST ---
+ print("\nTesting Read Operation...")
+
+ # Select the data we just inserted
+ select_query = "SELECT * FROM python_test_table WHERE id = %s;"
+ cur.execute(select_query, (inserted_id,))
+
+ record = cur.fetchone()
+ print(f"✅ Read successful! Fetched data: {record}")
+
+ # --- CLEANUP ---
+ print("\nCleaning up test data...")
+ cur.execute("DROP TABLE python_test_table;")
+ conn.commit()
+ print("✅ Table dropped successfully!")
+
+ except OperationalError as e:
+ print(f"❌ Connection failed: {e}")
+ except Exception as e:
+ print(f"❌ An error occurred during database operations: {e}")
+ # Rollback the transaction on error
+ if conn is not None:
+ conn.rollback()
+ finally:
+ # Always close the cursor and connection
+ if conn is not None:
+ cur.close()
+ conn.close()
+ print("\nDatabase connection closed.")
+
+if __name__ == "__main__":
+ test_opengauss_connection()
+
diff --git a/opengauss/verify_LISTEN_UNLISTEN_NOTIFY.py b/opengauss/verify_LISTEN_UNLISTEN_NOTIFY.py
new file mode 100644
index 0000000..d223706
--- /dev/null
+++ b/opengauss/verify_LISTEN_UNLISTEN_NOTIFY.py
@@ -0,0 +1,94 @@
+import asyncio
+import time
+import asyncpg
+
+DSN = "postgresql://contexthub:ContextHub%40123@localhost:15432/contexthub"
+CHANNEL = f"listen_notify_test_{int(time.time())}"
+PAYLOAD = "hello-from-notify"
+
+
+def print_exc(prefix: str, exc: Exception) -> None:
+ print(f"{prefix}: FAIL")
+ print(f" type = {type(exc).__name__}")
+ print(f" sqlstate = {getattr(exc, 'sqlstate', None)}")
+ print(f" message = {exc}")
+
+
+async def main():
+ listener_conn = None
+ sender_conn = None
+ got_event = asyncio.Event()
+ received = []
+
+ def on_notify(connection, pid, channel, payload):
+ received.append(
+ {
+ "pid": pid,
+ "channel": channel,
+ "payload": payload,
+ }
+ )
+ got_event.set()
+
+ try:
+ print(f"Connecting to {DSN}")
+ listener_conn = await asyncpg.connect(DSN)
+ sender_conn = await asyncpg.connect(DSN)
+ print("CONNECT: OK\n")
+
+ # 1) LISTEN
+ print(f"[1] Testing LISTEN on channel: {CHANNEL}")
+ listen_ok = False
+ try:
+ await listener_conn.add_listener(CHANNEL, on_notify)
+ listen_ok = True
+ print("LISTEN: OK")
+ except Exception as e:
+ print_exc("LISTEN", e)
+ print()
+
+ # 2) NOTIFY
+ print(f"[2] Testing NOTIFY on channel: {CHANNEL}")
+ notify_ok = False
+ try:
+ sql = f"NOTIFY {CHANNEL}, '{PAYLOAD}'"
+ result = await sender_conn.execute(sql)
+ notify_ok = True
+ print(f"NOTIFY: OK ({result})")
+ except Exception as e:
+ print_exc("NOTIFY", e)
+ print()
+
+ # 3) Delivery
+ print("[3] Testing notification delivery")
+ if listen_ok and notify_ok:
+ try:
+ await asyncio.wait_for(got_event.wait(), timeout=2.0)
+ print("DELIVERY: OK")
+ print(f" received = {received[0]}")
+ except asyncio.TimeoutError:
+ print("DELIVERY: FAIL")
+ print(" message = LISTEN and NOTIFY executed, but no notification was received within 2s")
+ else:
+ print("DELIVERY: SKIPPED")
+ print(" reason = LISTEN or NOTIFY already failed")
+ print()
+
+ # 4) UNLISTEN
+ print("[4] Testing UNLISTEN *")
+ try:
+ result = await listener_conn.execute("UNLISTEN *")
+ print(f"UNLISTEN: OK ({result})")
+ except Exception as e:
+ print_exc("UNLISTEN", e)
+ print()
+
+ finally:
+ if sender_conn is not None:
+ await sender_conn.close()
+ if listener_conn is not None:
+ await listener_conn.close()
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/pyproject.toml b/pyproject.toml
index a4c3259..6e82023 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -10,7 +10,7 @@ dependencies = [
"fastapi>=0.115",
"uvicorn[standard]>=0.34",
"asyncpg>=0.30",
- "pgvector>=0.3",
+ "psycopg[binary]>=3.2",
"alembic>=1.14",
"pydantic>=2.10",
"pydantic-settings>=2.7",
@@ -18,6 +18,9 @@ dependencies = [
]
[project.optional-dependencies]
+postgres = [
+ "pgvector>=0.3",
+]
dev = [
"httpx>=0.28",
"pytest>=8.0",
diff --git a/src/contexthub/config.py b/src/contexthub/config.py
index 8ea71d4..865607c 100644
--- a/src/contexthub/config.py
+++ b/src/contexthub/config.py
@@ -16,6 +16,7 @@ def _normalize_postgres_url(url: str) -> str:
class Settings(BaseSettings):
database_url: str = "postgresql://contexthub:contexthub@localhost:5432/contexthub"
+ db_backend: str = "postgres" # "postgres" or "opengauss"
api_key: str = "changeme"
embedding_model: str = "text-embedding-3-small"
propagation_enabled: bool = True
@@ -33,6 +34,10 @@ class Settings(BaseSettings):
env_file_encoding="utf-8",
)
+ @property
+ def is_opengauss(self) -> bool:
+ return self.db_backend.lower() == "opengauss"
+
@property
def asyncpg_database_url(self) -> str:
return _normalize_postgres_url(self.database_url)
diff --git a/src/contexthub/db/pool.py b/src/contexthub/db/pool.py
index 5cf2123..528739b 100644
--- a/src/contexthub/db/pool.py
+++ b/src/contexthub/db/pool.py
@@ -3,9 +3,19 @@
from contexthub.config import Settings
+class _OpenGaussConnection(asyncpg.Connection):
+ """Avoid UNLISTEN in pool reset queries for openGauss compatibility."""
+
+ def get_reset_query(self):
+ return "SELECT pg_advisory_unlock_all(); CLOSE ALL; RESET ALL;"
+
+
async def create_pool(settings: Settings) -> asyncpg.Pool:
- return await asyncpg.create_pool(
- dsn=settings.asyncpg_database_url,
- min_size=2,
- max_size=10,
- )
+ kwargs = {
+ "dsn": settings.asyncpg_database_url,
+ "min_size": 2,
+ "max_size": 10,
+ }
+ if settings.is_opengauss:
+ kwargs["connection_class"] = _OpenGaussConnection
+ return await asyncpg.create_pool(**kwargs)
diff --git a/src/contexthub/db/repository.py b/src/contexthub/db/repository.py
index 8562347..e659e63 100644
--- a/src/contexthub/db/repository.py
+++ b/src/contexthub/db/repository.py
@@ -1,13 +1,34 @@
from contextlib import asynccontextmanager
from collections.abc import AsyncIterator
from typing import Any
+import re
import asyncpg
+import psycopg
+from psycopg.rows import dict_row
+from contexthub.config import Settings
-class ScopedRepo:
- """Request-scoped 数据库执行器。所有 SQL 都必须通过它执行。"""
+_DOLLAR_PARAM_PATTERN = re.compile(r"\$([1-9][0-9]*)")
+
+
+def _rewrite_dollar_params(sql: str, args: tuple[Any, ...]) -> tuple[str, tuple[Any, ...]]:
+ """Convert $n placeholders to %s placeholders for psycopg."""
+ indexes: list[int] = []
+
+ def _replace(match: re.Match[str]) -> str:
+ idx = int(match.group(1)) - 1
+ indexes.append(idx)
+ return "%s"
+
+ rewritten = _DOLLAR_PARAM_PATTERN.sub(_replace, sql)
+ if not indexes:
+ return sql, args
+ return rewritten, tuple(args[i] for i in indexes)
+
+
+class _AsyncpgExecutor:
def __init__(self, conn: asyncpg.Connection):
self._conn = conn
@@ -24,12 +45,98 @@ async def execute(self, sql: str, *args: Any) -> str:
return await self._conn.execute(sql, *args)
+class _OpenGaussExecutor:
+ def __init__(self, conn: psycopg.AsyncConnection[Any]):
+ self._conn = conn
+
+ async def fetch(self, sql: str, *args: Any) -> list[dict[str, Any]]:
+ adapted_sql, adapted_args = _rewrite_dollar_params(sql, args)
+ async with self._conn.cursor(row_factory=dict_row) as cur:
+ if adapted_args:
+ await cur.execute(adapted_sql, adapted_args)
+ else:
+ await cur.execute(adapted_sql)
+ return await cur.fetchall()
+
+ async def fetchrow(self, sql: str, *args: Any) -> dict[str, Any] | None:
+ adapted_sql, adapted_args = _rewrite_dollar_params(sql, args)
+ async with self._conn.cursor(row_factory=dict_row) as cur:
+ if adapted_args:
+ await cur.execute(adapted_sql, adapted_args)
+ else:
+ await cur.execute(adapted_sql)
+ row = await cur.fetchone()
+ return row
+
+ async def fetchval(self, sql: str, *args: Any) -> Any:
+ adapted_sql, adapted_args = _rewrite_dollar_params(sql, args)
+ async with self._conn.cursor() as cur:
+ if adapted_args:
+ await cur.execute(adapted_sql, adapted_args)
+ else:
+ await cur.execute(adapted_sql)
+ row = await cur.fetchone()
+ return row[0] if row is not None else None
+
+ async def execute(self, sql: str, *args: Any) -> str:
+ adapted_sql, adapted_args = _rewrite_dollar_params(sql, args)
+ async with self._conn.cursor() as cur:
+ if adapted_args:
+ await cur.execute(adapted_sql, adapted_args)
+ else:
+ await cur.execute(adapted_sql)
+ return cur.statusmessage or ""
+
+
+class ScopedRepo:
+ """Request-scoped 数据库执行器。所有 SQL 都必须通过它执行。"""
+
+ def __init__(self, conn: _AsyncpgExecutor | _OpenGaussExecutor):
+ self._conn = conn
+
+ async def fetch(self, sql: str, *args: Any) -> list[Any]:
+ return await self._conn.fetch(sql, *args)
+
+ async def fetchrow(self, sql: str, *args: Any) -> Any:
+ return await self._conn.fetchrow(sql, *args)
+
+ async def fetchval(self, sql: str, *args: Any) -> Any:
+ return await self._conn.fetchval(sql, *args)
+
+ async def execute(self, sql: str, *args: Any) -> str:
+ return await self._conn.execute(sql, *args)
+
+
class PgRepository:
- def __init__(self, pool: asyncpg.Pool):
+ def __init__(self, pool: asyncpg.Pool | None, settings: Settings | None = None):
self._pool = pool
+ self._settings = settings or Settings()
@asynccontextmanager
async def session(self, account_id: str) -> AsyncIterator[ScopedRepo]:
+ if self._settings.is_opengauss:
+ conn = await psycopg.AsyncConnection.connect(self._settings.asyncpg_database_url)
+ try:
+ await conn.execute("BEGIN")
+ scoped_repo = ScopedRepo(_OpenGaussExecutor(conn))
+ await scoped_repo.execute(
+ "SELECT set_config('app.account_id', $1, true)",
+ account_id,
+ )
+ try:
+ yield scoped_repo
+ except Exception:
+ await conn.rollback()
+ raise
+ else:
+ await conn.commit()
+ finally:
+ await conn.close()
+ return
+
+ if self._pool is None:
+ raise RuntimeError("Postgres backend requires an asyncpg pool")
+
async with self._pool.acquire() as conn:
async with conn.transaction():
# asyncpg does not support bind parameters in SET statements.
@@ -37,4 +144,4 @@ async def session(self, account_id: str) -> AsyncIterator[ScopedRepo]:
"SELECT set_config('app.account_id', $1, true)",
account_id,
)
- yield ScopedRepo(conn)
+ yield ScopedRepo(_AsyncpgExecutor(conn))
diff --git a/src/contexthub/services/acl_service.py b/src/contexthub/services/acl_service.py
index 7c85e1b..c528042 100644
--- a/src/contexthub/services/acl_service.py
+++ b/src/contexthub/services/acl_service.py
@@ -1,10 +1,11 @@
"""MVP ACL: default visibility and write permission checks."""
+import os
+
from contexthub.db.repository import ScopedRepo
from contexthub.models.context import Scope
from contexthub.models.request import RequestContext
-
class ACLService:
"""MVP 默认可见性 / 默认写权限。"""
diff --git a/src/contexthub/services/propagation_engine.py b/src/contexthub/services/propagation_engine.py
index 3ae560b..66eaf72 100644
--- a/src/contexthub/services/propagation_engine.py
+++ b/src/contexthub/services/propagation_engine.py
@@ -61,7 +61,15 @@ async def start(self) -> None:
try:
# 1. 建立独立 LISTEN 连接
listen_conn = await asyncpg.connect(self._dsn)
- await listen_conn.add_listener("context_changed", self._on_notify)
+ try:
+ await listen_conn.add_listener("context_changed", self._on_notify)
+ except asyncpg.exceptions.FeatureNotSupportedError:
+ logger.warning(
+ "LISTEN/NOTIFY is not supported by the current database; "
+ "falling back to periodic propagation polling only."
+ )
+ await listen_conn.close()
+ listen_conn = None
# 2. 启动单条 drain loop + 周期唤醒 task
drain_task = asyncio.create_task(self._drain_loop())
diff --git a/src/contexthub/services/skill_service.py b/src/contexthub/services/skill_service.py
index 039c67f..64132a4 100644
--- a/src/contexthub/services/skill_service.py
+++ b/src/contexthub/services/skill_service.py
@@ -174,17 +174,34 @@ async def subscribe(
raise BadRequestError(f"Version {pinned_version} does not exist")
if ver["status"] != "published":
raise BadRequestError(f"Version {pinned_version} is not published")
-
- row = await db.fetchrow(
+ #row = await db.fetchrow(
+ # """
+ # INSERT INTO skill_subscriptions (agent_id, skill_id, pinned_version, account_id)
+ # VALUES ($1, $2, $3, current_setting('app.account_id'))
+ # ON CONFLICT (agent_id, skill_id)
+ # DO UPDATE SET pinned_version = EXCLUDED.pinned_version
+ # RETURNING *
+ # """,
+ # ctx.agent_id, skill_id, pinned_version,
+ #)
+ _ = await db.execute(
"""
INSERT INTO skill_subscriptions (agent_id, skill_id, pinned_version, account_id)
VALUES ($1, $2, $3, current_setting('app.account_id'))
- ON CONFLICT (agent_id, skill_id)
- DO UPDATE SET pinned_version = EXCLUDED.pinned_version
- RETURNING *
+ ON DUPLICATE KEY UPDATE pinned_version = EXCLUDED.pinned_version
""",
ctx.agent_id, skill_id, pinned_version,
)
+ row = await db.fetchrow(
+ """
+ SELECT * FROM skill_subscriptions
+ WHERE agent_id = $1 AND skill_id = $2;
+ """,
+ ctx.agent_id, skill_id,
+ )
+ # FIXME: ON DUPLICATE KEY UPDATE is incompatible with ROW level security policy in opengauss
+ # psycopg.errors.FeatureNotSupported: Row level security policy is not supported by INSERT ON DUPLICATE KEY UPDATE.
+ raise NotImplementedError
return SkillSubscription(
id=row["id"],
agent_id=row["agent_id"],
diff --git a/tests/test_propagation.py b/tests/test_propagation.py
index d44d7f5..1268001 100644
--- a/tests/test_propagation.py
+++ b/tests/test_propagation.py
@@ -1,6 +1,7 @@
"""Propagation engine tests: P-1 through P-8 + event-time correctness + single-instance concurrency."""
import asyncio
+import asyncpg
import json
import uuid
from contextlib import asynccontextmanager
@@ -872,6 +873,33 @@ async def test_start_failure_closes_listener_connection():
assert engine._running is False
+@pytest.mark.asyncio
+async def test_start_degrades_gracefully_when_listen_not_supported():
+ """Unsupported LISTEN should fall back to polling instead of aborting startup."""
+ engine = _make_engine()
+ listen_conn = MagicMock()
+ listen_conn.add_listener = AsyncMock(
+ side_effect=asyncpg.exceptions.FeatureNotSupportedError(
+ "LISTEN statement is not yet supported."
+ )
+ )
+ listen_conn.close = AsyncMock()
+
+ with patch(
+ "contexthub.services.propagation_engine.asyncpg.connect",
+ AsyncMock(return_value=listen_conn),
+ ):
+ await engine.start()
+
+ listen_conn.close.assert_awaited_once()
+ assert engine._listen_conn is None
+ assert engine._drain_task is not None
+ assert engine._ticker_task is not None
+ assert engine._running is True
+
+ await engine.stop()
+
+
@pytest.mark.asyncio
async def test_lifespan_cleans_up_when_propagation_start_fails():
"""Startup failure must still close pool and embedding client."""