diff --git a/rfcs/PaddleMaterials/GDI-NN/20260322_gdinn.md b/rfcs/PaddleMaterials/GDI-NN/20260322_gdinn.md new file mode 100644 index 000000000..66c187f88 --- /dev/null +++ b/rfcs/PaddleMaterials/GDI-NN/20260322_gdinn.md @@ -0,0 +1,165 @@ +# GDI-NN 设计文档 + +| API名称 | 新增API名称 | +| ------------ | ----------------- | +| 提交作者 | 柳顺(megemini) | +| 提交时间 | 2026-03-22 | +| 版本号 | V1.0.0 | +| 依赖飞桨版本 | develop | +| 文件名 | 20260322_gdinn.md | + +# 一、概述 + +## 1、相关背景 + +GDI-NN (Gibbs-Duhem Informed Neural Network) 是一种基于物理约束的图神经网络模型,用于预测二元混合物中组分的活度系数(activity coefficient),该模型由Rittig等人提出,通过将Gibbs-Duhem方程作为正则化项引入损失函数,确保模型预测满足热力学一致性。 + +## 2、功能目标 + +本项目旨在将 GDI-NN 模型集成至 PaddleMaterials 框架,提供模型训练、评估和推理的完整 pipeline,实现与原始 PyTorch 版本的精度对齐。 + +## 3、意义 + +填补飞桨生态在热力学一致性分子性质预测领域的空白 + +# 二、飞桨现状 + +PaddleMaterials 目前不支持 GDI-NN 模型。 + +# 三、业内方案调研 + +[GDI-NN](https://git.rwth-aachen.de/avt-svt/public/GDI-NN) 此 REPO 提供了 PyTorch 实现,包括模型: + +- model/model_GNN.py,实现了 solvgnn_binary, solvgnn_xMLP_binary,gegnn_binary 等模型 +- model/model_MCM.py,实现了 MCM_multiMLP 模型 + +# 四、对比分析 + +**PyTorch原版**: + +基于 PyTorch 的实现,使用 DGL 进行图操作。 + +**飞桨实现**: + +基于 PaddlePaddle 的实现,使用 PGL 进行图操作。 + +# 五、设计思路与实现方案 + +```shell +ppmat/ +├── datasets/ +│ ├── __init__.py +│ ├── binary_activity_dataset.py # 二元活度系数数据集处理类 +│ └── collate_fn.py # 数据集处理函数 +├── losses/ +│ ├── __init__.py +│ └── gibbs_duhem_loss.py # Gibbs-Duhem 一致性损失函数 +├── models/ +│ ├─── gdinn/ +│ │ ├── utils/ +│ │ │ ├── atom_feat_encoding.py # 特征编码 +│ │ │ ├── graph_utils.py # 图操作工具 +│ │ │ ├── layers.py # 图卷积层 +│ │ │ ├── molecular_graph.py # 分子图构建工具 +│ │ ├── __init__.py +│ │ ├── gnn.py # SolvGNN 等模型,对应 GDI-NN 的 model_GNN.py +│ │ └── mcm.py # MCM_multiMLP 模型,对应 GDI-NN 的 model_MCM.py +│ └─── __init__.py +├── property_prediction/ +│ └── configs/ +│ └── gdinn/ +│ └── solvgnn_binary_gamma.yaml # 模型训练配置 +└── .pre-commit-config.yaml # 修改了 line 的限制为 120,默认的 88 对于 docstring 中的公式等限制太大 +``` + +其中: + +- `datasets/binary_activity_dataset.py` 与 `datasets/collate_fn.py` + +GDI-NN 
中的数据集处理类为 `util/generate_dataset_for_training.py`,这里主要参考了这个文件进行适配。 + +GDI-NN 中还有一个数据集处理类为 `util/generate_dataset.py`,但是,整个 repo 中都没有使用它。因此,这里只适配 `generate_dataset_for_training.py` 。 + +由于 PaddleMaterials 处理数据的方式不同,将 `empty_solvsys` 的生成放到了 `collate_fn.py` 中,根据每一个 batch 进行处理。 + +- `losses/gibbs_duhem_loss.py` + +GDI-NN 中将 loss 放在了 `train.py` 中,本项目将其单独抽取出来,放到 `losses/gibbs_duhem_loss.py` 作为类使用。 + +在模型 `gnn.py` 和 `mcm.py` 中,也都使用了 `losses/gibbs_duhem_loss.py` 的 `GibbsDuhemLoss` 类。 + +- `models/gdinn/utils/atom_feat_encoding.py` + +参考 GDI-NN 的 `util/atom_feat_encoding.py` 进行了部分适配。原项目这个文件中有很多类和方法,此次项目迁移只保留了实际用到的部分方法。 + +- `models/gdinn/utils/molecular_graph.py` + +参考 GDI-NN 的 `util/molecular_graph.py` 进行了部分适配。使用 PGL 代替 DGL 。 + +- `models/gdinn/utils/layers.py` 与 `models/gdinn/utils/graph_utils.py` + +`models/gdinn/utils/layers.py` 中抽取并迁移了部分 layer,`models/gdinn/utils/graph_utils.py` 中抽取了部分公用方法。 + +这里单独说明一下 `NNConv` 这个类:`paddle_geometric` 中已有这个类 (`jointContribution/mattergen/paddle_geometric/nn/conv/nn_conv.py`),但是,经过测试发现,`paddle_geometric` 这个包目前似乎还有一些问题,比如,在我本地的 paddle 版本为 `3.1.0` 的环境中,通过源码安装 `paddle_geometric` 后无法运行,经过一些修改与问题定位后,怀疑是 paddle 版本的兼容问题导致的。因此,这里选择单独创建无特殊依赖的 `NNConv` 类,而不是直接使用 `paddle_geometric` 中的 `NNConv` 类。 + +- `models/gdinn/gnn.py` 和 `models/gdinn/mcm.py` + +参考 GDI-NN 的 `model/model_GNN.py` 和 `model/model_MCM.py` 进行了适配。但是,并没有将 `model/model_GNN.py` 中的所有模型都做迁移,如: + +- solvgnn_onexMLP_binary +- solvgnn_onexMLP_share1layer_binary +- solvgnn_onexMLP_share2layer_binary + +这几个模型仅在 `model/model_GNN.py` 中出现,`train.py` 中并没有使用。`train.py` 中仅提供了: + +- SolvGNN +- SolvGNNxMLP +- GEGNN +- MCM_multiMLP + +的支持,本项目也仅迁移了这几个模型。 + +- `property_prediction/configs/gdinn/solvgnn_binary_gamma.yaml` + +参考 GDI-NN 的 `train.py` 中的用法进行了编写。 + +# 六、测试和验收的考量 + +- **单元测试**:本地对模型、类、方法做必要的单元测试 +- **精度对齐**:与 PyTorch 原版在相同数据集上进行精度对齐 +- **集成测试**:验证与 PaddleMaterials 训练 pipeline 的兼容性 + +**验收标准**: + +- 模型训练精度与 PyTorch 原版精度对齐 +- 模型推理精度与 PyTorch 原版精度对齐 +- 模型训练 pipeline 与 PaddleMaterials 训练 pipeline 兼容 + +# 
七、可行性分析和排期规划 + +## 可行性分析 + +- **技术可行性**: + + 使用 PaddlePaddle 代替 PyTorch 进行开发,并使用 PGL 代替 DGL 进行图操作,经验证可行。 + +- **数据可行性**: + + 使用 GDI-NN 中提供的数据 `data/output_binary_with_inf_all.csv` `data/solvent_list.csv` 进行测试,精度与 PyTorch 原版精度对齐。 + +## 排期规划 + +- 代码迁移,1 周 +- 集成测试,1 周 +- 精度测试,1 周 +- 其他事项,1 周 + +# 八、影响面 + +本项目将 GDI-NN 集成到 PaddleMaterials 中,利用 PGL 代替 DGL ,未引入其他依赖。 + +# 附件及参考资料 + +- https://github.com/PaddlePaddle/community/blob/master/hackathon/hackathon_10th/%E3%80%90Hackathon_10th%E3%80%91%E5%BC%80%E6%BA%90%E8%B4%A1%E7%8C%AE%E4%B8%AA%E4%BA%BA%E6%8C%91%E6%88%98%E8%B5%9B%E6%98%A5%E8%8A%82%E7%89%B9%E5%88%AB%E5%AD%A3%E2%80%94%E4%BB%BB%E5%8A%A1%E5%90%88%E9%9B%86.md#no6---no19-paddlemateirals%E6%A8%A1%E5%9E%8B%E5%A4%8D%E7%8E%B0 +- https://github.com/PaddlePaddle/PaddleMaterials/issues/194 +- https://git.rwth-aachen.de/avt-svt/public/GDI-NN \ No newline at end of file diff --git a/rfcs/PaddleMaterials/GDI-NN/test_gdinn/quick_test.py b/rfcs/PaddleMaterials/GDI-NN/test_gdinn/quick_test.py new file mode 100644 index 000000000..784b14db3 --- /dev/null +++ b/rfcs/PaddleMaterials/GDI-NN/test_gdinn/quick_test.py @@ -0,0 +1,1295 @@ +#!/usr/bin/env python +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. 
"""Quick smoke test for the GDI-NN port.

Runs data loading, a forward pass, a few optimisation steps and prediction
for every migrated model, printing a pass/fail summary at the end.
"""

import os
import sys
import traceback
from itertools import islice

import numpy as np
import paddle
import pandas as pd

# Extend the import path relative to this script.
# NOTE(review): this adds "<script dir>/ppmat" itself while the imports below
# are "from ppmat ..."; confirm the directory layout this is meant to support.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'ppmat'))


# ============================================================================
# Path configuration
# ============================================================================
class Config:
    """Directories, file names and split ratios used by the quick test."""

    # Data directories
    DATASET_DIR = './test_gdinn/dataset'
    OUTPUT_DIR = './test_gdinn/data/gdinn'

    # Raw input files
    SOLVENT_LIST_FILE = 'solvent_list.csv'
    BINARY_DATA_FILE = 'output_binary_with_inf_all.csv'

    # Generated files for the graph models
    TRAIN_BINARY_FILE = 'train_binary.csv'
    VAL_BINARY_FILE = 'val_binary.csv'
    TEST_BINARY_FILE = 'test_binary.csv'
    SOLVENT_LIST_OUTPUT = 'solvent_list.csv'

    # Generated files for the MCM model
    TRAIN_MCM_FILE = 'train_mcm.csv'
    VAL_MCM_FILE = 'val_mcm.csv'
    TEST_MCM_FILE = 'test_mcm.csv'

    # Split ratios; whatever is left after train + validation is the test split.
    TRAIN_RATIO = 0.8
    VAL_RATIO = 0.1

    @property
    def solvent_list_path(self):
        return os.path.join(self.DATASET_DIR, self.SOLVENT_LIST_FILE)

    @property
    def binary_data_path(self):
        return os.path.join(self.DATASET_DIR, self.BINARY_DATA_FILE)

    @property
    def output_dir(self):
        # Reading this property creates the directory as a side effect.
        os.makedirs(self.OUTPUT_DIR, exist_ok=True)
        return self.OUTPUT_DIR

    @property
    def train_binary_path(self):
        return os.path.join(self.OUTPUT_DIR, self.TRAIN_BINARY_FILE)

    @property
    def val_binary_path(self):
        return os.path.join(self.OUTPUT_DIR, self.VAL_BINARY_FILE)

    @property
    def test_binary_path(self):
        return os.path.join(self.OUTPUT_DIR, self.TEST_BINARY_FILE)

    @property
    def solvent_list_output_path(self):
        return os.path.join(self.OUTPUT_DIR, self.SOLVENT_LIST_OUTPUT)

    @property
    def train_mcm_path(self):
        return os.path.join(self.OUTPUT_DIR, self.TRAIN_MCM_FILE)

    @property
    def val_mcm_path(self):
        return os.path.join(self.OUTPUT_DIR, self.VAL_MCM_FILE)

    @property
    def test_mcm_path(self):
        return os.path.join(self.OUTPUT_DIR, self.TEST_MCM_FILE)


# Global configuration instance
config = Config()

# Value returned by create_test_data_mcm() and passed as ``solvent_id_max``.
# The generated solvent ids are 0..4.
_MCM_SOLVENT_ID_MAX = 5

_BATCH_SIZE = 32


# ============================================================================
# Shared helpers
# ============================================================================
def _print_banner(title, leading_blank=True):
    """Print *title* framed by two 80-character rules."""
    rule = "=" * 80
    print(("\n" if leading_blank else "") + rule)
    print(title)
    print(rule)


def _report_failure(label, error):
    """Print the failure line and traceback for the active exception; return False."""
    print(f"\n✗ {label}: {error}")
    traceback.print_exc()
    return False


def _load_dataset(path):
    """Build a BinaryActivityDataset for the CSV at *path* (GDI-NN format)."""
    from ppmat.datasets import BinaryActivityDataset

    return BinaryActivityDataset(
        path=path,
        solvent_list_path=config.solvent_list_output_path,
        add_self_loop=True,
        preload_graphs=False,
    )


def _make_loader(dataset, shuffle, drop_last=True):
    """Wrap *dataset* in a single-process DataLoader with the project collator."""
    from paddle.io import BatchSampler, DataLoader
    from ppmat.datasets.collate_fn import BinaryActivityCollator

    sampler = BatchSampler(
        dataset=dataset,
        batch_size=_BATCH_SIZE,
        shuffle=shuffle,
        drop_last=drop_last,
    )
    return DataLoader(
        dataset=dataset,
        batch_sampler=sampler,
        num_workers=0,
        collate_fn=BinaryActivityCollator(),
    )


def _make_graph_model(class_name, **overrides):
    """Instantiate ``ppmat.models.<class_name>`` with the quick-test defaults."""
    import ppmat.models as models

    kwargs = {
        'in_dim': 74,  # Match GDI-NN's feature dimension
        'hidden_dim': 64,
        'n_classes': 1,
        'num_step_message_passing': 1,
        'pinn_lambda': 1.0,
    }
    kwargs.update(overrides)
    return getattr(models, class_name)(**kwargs)


def _make_mcm_model(solvent_id_max):
    """Instantiate MCM_MultiMLP with the quick-test hyper-parameters."""
    from ppmat.models.gdinn.mcm import MCM_MultiMLP

    return MCM_MultiMLP(
        solvent_id_max=solvent_id_max,
        dim_hidden_channels=64,
        dropout_hidden=0.05,
        dropout_interaction=0.03,
        mlp_num_hid_layers=1,
        pinn_lambda=1.0,
    )


def _make_optimizer(model):
    """Adam over all parameters of *model* with learning rate 1e-3."""
    return paddle.optimizer.Adam(
        parameters=model.parameters(),
        learning_rate=0.001,
    )


def _count_parameters(model):
    """Total number of scalar parameters in *model*."""
    return sum(p.numel().item() for p in model.parameters())


def _loss_scalar(loss_dict, key):
    """Return ``loss_dict[key]`` as a Python number, or 0.0 when the key is absent.

    The original code called ``.item()`` on the ``dict.get`` default (a plain
    int), which raises AttributeError whenever a model omits the key.
    """
    value = loss_dict.get(key, 0)
    item = getattr(value, 'item', None)
    return item() if callable(item) else float(value)


def _print_forward_summary(output, show_total_loss=True, show_g_e=False):
    """Print losses and prediction shapes of one forward-pass *output*."""
    loss_dict = output['loss_dict']
    pred_dict = output['pred_dict']

    print("✓ 前向传播成功")
    print(f" loss_dict keys: {list(loss_dict.keys())}")
    print(f" pred_dict keys: {list(pred_dict.keys())}")
    if show_total_loss:
        print(f" loss: {loss_dict['loss'].item():.4f}")
    print(f" pred_loss: {_loss_scalar(loss_dict, 'pred_loss'):.4f}")
    print(f" gd_loss: {_loss_scalar(loss_dict, 'gd_loss'):.4f}")
    print(f" gamma1 shape: {pred_dict['gamma1'].shape}")
    print(f" gamma2 shape: {pred_dict['gamma2'].shape}")
    if show_g_e and 'G_E' in pred_dict:
        print(f" G_E shape: {pred_dict['G_E'].shape}")
        print(f" G_E mean: {pred_dict['G_E'].mean().item():.4f}")


def _apply_gradient_step(model, optimizer, loss):
    """Backpropagate *loss*, clip the gradient norm to 1.0 and update."""
    loss.backward()
    paddle.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
    optimizer.step()
    optimizer.clear_grad()


def _mcm_batch(df, start, stop):
    """Turn rows ``start:stop`` of an MCM dataframe into a model input dict."""
    rows = df.iloc[start:stop]
    return {
        'solv1_id': paddle.to_tensor(rows['solv1_id'].values, dtype='int64'),
        'solv2_id': paddle.to_tensor(rows['solv2_id'].values, dtype='int64'),
        'x1': paddle.to_tensor(rows['x(1)'].values, dtype='float32'),
        'gamma1': paddle.to_tensor(rows['ln_gamma_1'].values, dtype='float32').unsqueeze(-1),
        'gamma2': paddle.to_tensor(rows['ln_gamma_2'].values, dtype='float32').unsqueeze(-1),
    }


def _composition_grid(batch_size):
    """Column of x1 values in [0.1, 0.9] that autograd is allowed to track."""
    x1 = paddle.linspace(0.1, 0.9, batch_size).unsqueeze(-1)
    x1.stop_gradient = False
    return x1


def _run_forward_test(class_name, title, created_msg, passed_msg, failed_label,
                      model_kwargs=None, show_g_e=False):
    """Shared body of the graph-model forward tests: one batch, printed summary."""
    _print_banner(title)

    try:
        model = _make_graph_model(class_name, **(model_kwargs or {}))
        print(created_msg)
        print(f" 参数数量: {_count_parameters(model)}")

        dataloader = _make_loader(_load_dataset(config.train_binary_path), shuffle=False)

        for batch_idx, batch in enumerate(islice(dataloader, 1)):
            print(f"\n测试 Batch {batch_idx + 1}...")
            _print_forward_summary(model(batch), show_g_e=show_g_e)

        print(passed_msg)
        return True

    except Exception as e:
        return _report_failure(failed_label, e)


def _run_training_test(class_name, title, created_msg, passed_msg, failed_label,
                       model_kwargs=None, show_g_e=False, build_default_loss=False):
    """Shared body of the graph-model training tests: three optimisation steps."""
    _print_banner(title)

    try:
        model = _make_graph_model(class_name, **(model_kwargs or {}))
        if build_default_loss:
            # Only checks that the loss can be built with default arguments;
            # the model computes its own loss during the forward pass.
            from ppmat.losses import GibbsDuhemLoss
            GibbsDuhemLoss()
        print(created_msg)

        dataloader = _make_loader(_load_dataset(config.train_binary_path), shuffle=True)

        optimizer = _make_optimizer(model)
        print("✓ 优化器创建成功")

        model.train()

        for batch_idx, batch in enumerate(islice(dataloader, 3)):
            output = model(batch)
            loss = output['loss_dict']['loss']
            _apply_gradient_step(model, optimizer, loss)

            line = f" Step {batch_idx + 1}: Loss = {loss.item():.4f}"
            if show_g_e:
                line += f", G_E = {output['pred_dict']['G_E'].mean().item():.4f}"
            print(line)

        print(passed_msg)
        return True

    except Exception as e:
        return _report_failure(failed_label, e)


# ============================================================================
# Test data
# ============================================================================
def create_test_data():
    """Copy the solvent list and split the first 5000 GDI-NN rows into train/val/test CSVs."""
    print("准备测试数据(使用 GDI-NN 格式)...")

    solvent_list_path = config.solvent_list_path
    output_binary_path = config.binary_data_path
    os.makedirs(config.OUTPUT_DIR, exist_ok=True)

    # The solvent list is used as is.
    print(f"读取溶剂列表: {solvent_list_path}")
    solvent_df = pd.read_csv(solvent_list_path)
    solvent_df.to_csv(config.solvent_list_output_path, index=False)
    print(f"✓ 溶剂数量: {len(solvent_df)}")

    print(f"读取数据: {output_binary_path}")
    df = pd.read_csv(output_binary_path)
    print(f"✓ 数据量: {len(df)}")

    # GDI-NN format: solv1_gamma / solv2_gamma hold ln(gamma).
    # Keep at most the first 5000 rows so the test stays quick.
    df = df.head(5000)

    n = len(df)
    train_end = int(n * config.TRAIN_RATIO)
    val_end = train_end + int(n * config.VAL_RATIO)

    train_df = df.iloc[:train_end]
    val_df = df.iloc[train_end:val_end]
    test_df = df.iloc[val_end:]

    train_df.to_csv(config.train_binary_path, index=False)
    val_df.to_csv(config.val_binary_path, index=False)
    test_df.to_csv(config.test_binary_path, index=False)

    print(f"✓ 训练集: {len(train_df)} 样本")
    print(f"✓ 验证集: {len(val_df)} 样本")
    print(f"✓ 测试集: {len(test_df)} 样本")
    print(f"✓ 数据保存在: {config.OUTPUT_DIR}/")


def create_test_data_mcm():
    """Write synthetic MCM CSVs keyed by solvent id (not SMILES).

    Returns:
        The value to pass as ``solvent_id_max`` when building the MCM model.
    """
    print("创建 MCM 测试数据...")

    # (solv1_id, solv2_id, temp, x1, gamma1, gamma2)
    solvent_pairs = [
        (0, 1, 298.15, 0.5, 1.2, 0.8),
        (1, 2, 298.15, 0.5, 1.1, 0.9),
        (0, 2, 298.15, 0.5, 1.3, 0.7),
        (2, 3, 298.15, 0.5, 1.15, 0.85),
        (3, 4, 298.15, 0.5, 1.25, 0.75),
    ]

    data = []
    for solv1_id, solv2_id, temp, x1, gamma1, gamma2 in solvent_pairs:
        for _ in range(100):  # 100 noisy samples per pair
            x1_var = np.clip(x1 + np.random.normal(0, 0.1), 0.01, 0.99)
            x2 = 1.0 - x1_var

            # Crude activity-coefficient simulation: 10% multiplicative noise.
            gamma1_var = gamma1 * (1 + 0.1 * np.random.randn())
            gamma2_var = gamma2 * (1 + 0.1 * np.random.randn())

            # Stored as ln(gamma); abs() guards against a negative noisy value.
            ln_gamma1_var = np.log(abs(gamma1_var))
            ln_gamma2_var = np.log(abs(gamma2_var))

            data.append({
                'solv1_id': solv1_id,
                'solv2_id': solv2_id,
                'temperature (K)': temp + np.random.normal(0, 5),
                'x(1)': x1_var,
                'x(2)': x2,
                'ln_gamma_1': ln_gamma1_var,
                'ln_gamma_2': ln_gamma2_var
            })

    os.makedirs(config.OUTPUT_DIR, exist_ok=True)

    df = pd.DataFrame(data)

    # Rows are ordered by pair, so validation and test contain only the last pair.
    train_df = df.iloc[:400]
    val_df = df.iloc[400:450]
    test_df = df.iloc[450:]

    train_df.to_csv(config.train_mcm_path, index=False)
    val_df.to_csv(config.val_mcm_path, index=False)
    test_df.to_csv(config.test_mcm_path, index=False)

    print(f"✓ MCM 训练集: {len(train_df)} 样本")
    print(f"✓ MCM 验证集: {len(val_df)} 样本")
    print(f"✓ MCM 测试集: {len(test_df)} 样本")
    print(f"✓ 数据保存在: {config.OUTPUT_DIR}/")

    return _MCM_SOLVENT_ID_MAX


# ============================================================================
# Tests
# ============================================================================
def test_data_loading():
    """Build the training dataset and loader and print the first two batches."""
    _print_banner("测试数据加载")

    try:
        dataset = _load_dataset(config.train_binary_path)
        print("✓ 数据集创建成功")
        print(f" 样本数量: {len(dataset)}")

        dataloader = _make_loader(dataset, shuffle=True)
        print("✓ 数据加载器创建成功")
        print(f" Batch数量: {len(dataloader)}")

        for batch_idx, batch in enumerate(islice(dataloader, 2)):
            print(f"\nBatch {batch_idx + 1}:")
            print(f" g1 nodes: {batch['g1'].num_nodes}")
            print(f" g1 edges: {batch['g1'].num_edges}")
            print(f" g2 nodes: {batch['g2'].num_nodes}")
            print(f" g2 edges: {batch['g2'].num_edges}")
            print(f" empty_solvsys nodes: {batch['empty_solvsys'].num_nodes}")
            print(f" empty_solvsys edges: {batch['empty_solvsys'].num_edges}")
            print(f" x1 shape: {batch['x1'].shape}")
            print(f" x2 shape: {batch['x2'].shape}")
            print(f" gamma1 shape: {batch['gamma1'].shape}")
            print(f" gamma2 shape: {batch['gamma2'].shape}")

        print("\n✓ 数据加载测试通过")
        return True

    except Exception as e:
        return _report_failure("数据加载测试失败", e)


def test_model_forward():
    """Forward pass of SolvGNN on one training batch."""
    return _run_forward_test(
        'SolvGNN',
        title="测试模型前向传播",
        created_msg="✓ 模型创建成功",
        passed_msg="\n✓ 模型前向传播测试通过",
        failed_label="模型前向传播测试失败",
    )


def test_training_step():
    """Three optimisation steps of SolvGNN."""
    return _run_training_test(
        'SolvGNN',
        title="测试训练步骤",
        created_msg="✓ 模型和损失函数创建成功",
        passed_msg="\n✓ 训练步骤测试通过",
        failed_label="训练步骤测试失败",
        build_default_loss=True,
    )


def test_solvgnn_xmlp_forward():
    """Forward pass of SolvGNNxMLP on one training batch."""
    return _run_forward_test(
        'SolvGNNxMLP',
        title="测试 SolvGNNxMLP 模型前向传播",
        created_msg="✓ SolvGNNxMLP 模型创建成功",
        passed_msg="\n✓ SolvGNNxMLP 模型前向传播测试通过",
        failed_label="SolvGNNxMLP 模型前向传播测试失败",
        model_kwargs={'mlp_num_hid_layers': 2},
    )


def test_solvgnn_xmlp_training():
    """Three optimisation steps of SolvGNNxMLP."""
    return _run_training_test(
        'SolvGNNxMLP',
        title="测试 SolvGNNxMLP 模型训练步骤",
        created_msg="✓ SolvGNNxMLP 模型创建成功",
        passed_msg="\n✓ SolvGNNxMLP 训练步骤测试通过",
        failed_label="SolvGNNxMLP 训练步骤测试失败",
        model_kwargs={'mlp_num_hid_layers': 2},
    )


def test_gegnn_forward():
    """Forward pass of GEGNN on one training batch, including G_E when present."""
    return _run_forward_test(
        'GEGNN',
        title="测试 GEGNN 模型前向传播",
        created_msg="✓ GEGNN 模型创建成功",
        passed_msg="\n✓ GEGNN 模型前向传播测试通过",
        failed_label="GEGNN 模型前向传播测试失败",
        show_g_e=True,
    )


def test_gegnn_training():
    """Three optimisation steps of GEGNN, printing the mean G_E per step."""
    return _run_training_test(
        'GEGNN',
        title="测试 GEGNN 模型训练步骤",
        created_msg="✓ GEGNN 模型创建成功",
        passed_msg="\n✓ GEGNN 训练步骤测试通过",
        failed_label="GEGNN 训练步骤测试失败",
        show_g_e=True,
    )


def test_mcm_forward():
    """Generate the MCM data and run one forward pass of MCM_MultiMLP."""
    _print_banner("测试 MCM 模型前向传播")

    try:
        max_solvent_id = create_test_data_mcm()

        model = _make_mcm_model(max_solvent_id)
        print("✓ MCM 模型创建成功")
        print(f" 参数数量: {_count_parameters(model)}")

        df = pd.read_csv(config.train_mcm_path)
        batch_data = _mcm_batch(df, 0, _BATCH_SIZE)

        print("\n测试前向传播...")
        output = model(batch_data)

        # The total-loss line is not part of this test's report.
        _print_forward_summary(output, show_total_loss=False)

        print("\n✓ MCM 模型前向传播测试通过")
        return True

    except Exception as e:
        return _report_failure("MCM 模型前向传播测试失败", e)


def test_mcm_training():
    """Run up to three optimisation steps of MCM_MultiMLP.

    The training CSV is generated on demand, so this test no longer depends
    on test_mcm_forward() having run first.
    """
    _print_banner("测试 MCM 模型训练步骤")

    try:
        if os.path.exists(config.train_mcm_path):
            max_solvent_id = _MCM_SOLVENT_ID_MAX
        else:
            max_solvent_id = create_test_data_mcm()

        model = _make_mcm_model(max_solvent_id)
        print("✓ MCM 模型创建成功")

        df = pd.read_csv(config.train_mcm_path)

        optimizer = _make_optimizer(model)
        print("✓ 优化器创建成功")

        model.train()

        num_batches = min(3, len(df) // _BATCH_SIZE)
        for batch_idx in range(num_batches):
            start_idx = batch_idx * _BATCH_SIZE
            batch_data = _mcm_batch(df, start_idx, start_idx + _BATCH_SIZE)

            output = model(batch_data)
            loss = output['loss_dict']['loss']
            _apply_gradient_step(model, optimizer, loss)

            print(f" Step {batch_idx + 1}: Loss = {loss.item():.4f}")

        print("\n✓ MCM 训练步骤测试通过")
        return True

    except Exception as e:
        return _report_failure("MCM 训练步骤测试失败", e)


def test_gibbs_duhem_loss():
    """Exercise GibbsDuhemLoss on analytic inputs and print the resulting values.

    The values are printed, not asserted; the test only fails on an exception.
    """
    _print_banner("测试 GibbsDuhemLoss 损失函数")

    try:
        from ppmat.losses import GibbsDuhemLoss

        criterion = GibbsDuhemLoss(lambda_gd=1.0, loss_type='mse')
        print("✓ GibbsDuhemLoss 创建成功")
        print(f" lambda_gd: {criterion.lambda_gd}")
        print(f" loss_type: {criterion.loss_type}")

        batch_size = 10

        # Test 1: ln_gamma1 = A * x2^2, ln_gamma2 = A * x1^2, for which
        # x1 * d(ln_gamma1)/dx1 + x2 * d(ln_gamma2)/dx1 = 0 analytically.
        print("\n测试 1: 合成数据测试")
        x1 = _composition_grid(batch_size)
        A = 2.0
        x2 = 1 - x1
        loss = criterion(A * x2 * x2, A * x1 * x1, x1)
        print(f" 满足约束的损失: {loss.item():.6f}")

        # Test 2: linear functions of x1. The inputs must be computed from
        # x1_test so that they are connected to it in the autograd graph.
        print("\n测试 2: 不满足约束的情况")
        x1_test = _composition_grid(batch_size)
        loss_bad = criterion(x1_test * 3.0, x1_test * 2.0, x1_test)
        print(f" 不满足约束的损失: {loss_bad.item():.6f}")

        # Test 3: every supported loss type on the analytic pair from test 1.
        print("\n测试 3: 测试不同损失类型")
        for loss_type in ['mse', 'mae', 'huber']:
            criterion_type = GibbsDuhemLoss(lambda_gd=1.0, loss_type=loss_type)

            x1_type = _composition_grid(batch_size)
            x2_type = 1 - x1_type

            loss_type_val = criterion_type(
                2.0 * x2_type * x2_type,
                2.0 * x1_type * x1_type,
                x1_type,
            )
            print(f" {loss_type} 损失: {loss_type_val.item():.6f}")

        # Gradient check: the loss must be differentiable w.r.t. a trainable
        # parameter, which needs create_graph=True inside the loss.
        print("\n测试 5: 梯度计算测试")
        criterion_grad = GibbsDuhemLoss(lambda_gd=1.0, create_graph=True)

        x1_grad = _composition_grid(batch_size)
        A_param = paddle.create_parameter(
            shape=[1],
            dtype='float32',
            default_initializer=paddle.nn.initializer.Constant(2.0),
        )

        x2_grad = 1 - x1_grad
        loss_grad = criterion_grad(
            A_param * x2_grad * x2_grad,
            A_param * x1_grad * x1_grad,
            x1_grad,
        )
        loss_grad.backward()

        print(f" 损失值: {loss_grad.item():.6f}")
        print(f" A_param 梯度: {A_param.grad.item():.6f}")

        print("\n✓ GibbsDuhemLoss 测试通过")
        return True

    except Exception as e:
        return _report_failure("GibbsDuhemLoss 测试失败", e)


def test_gibbs_duhem_loss_with_model():
    """Train SolvGNN for three steps with an external GibbsDuhemLoss term."""
    _print_banner("测试 GibbsDuhemLoss 与模型集成")

    try:
        from ppmat.losses import GibbsDuhemLoss

        # pinn_lambda=0.0 switches off the model's internal GD loss.
        model = _make_graph_model('SolvGNN', pinn_lambda=0.0)
        criterion_gd = GibbsDuhemLoss(lambda_gd=1.0, loss_type='mse')
        print("✓ 模型和 GibbsDuhemLoss 创建成功")

        dataloader = _make_loader(_load_dataset(config.train_binary_path), shuffle=False)

        optimizer = _make_optimizer(model)
        print("✓ 优化器创建成功")

        model.train()

        for batch_idx, batch in enumerate(islice(dataloader, 3)):
            print(f"\n测试 Batch {batch_idx + 1}...")

            # The differentiable copy of x1 must be the tensor the model
            # actually consumes; otherwise the predictions are not connected
            # to it and the GD loss has nothing to differentiate against.
            # NOTE(review): assumes the collated batch supports item
            # assignment and the model reads batch['x1'] -- confirm.
            x1_gd = batch['x1'].clone()
            x1_gd.stop_gradient = False
            batch['x1'] = x1_gd

            output = model(batch)
            gamma1_pred = output['pred_dict']['gamma1']
            gamma2_pred = output['pred_dict']['gamma2']

            pred_loss = (
                paddle.nn.functional.mse_loss(gamma1_pred, batch['gamma1'])
                + paddle.nn.functional.mse_loss(gamma2_pred, batch['gamma2'])
            )
            gd_loss = criterion_gd(gamma1_pred, gamma2_pred, x1_gd)
            total_loss = pred_loss + gd_loss

            _apply_gradient_step(model, optimizer, total_loss)

            print("✓ 训练步骤成功")
            print(f" pred_loss: {pred_loss.item():.4f}")
            print(f" gd_loss (使用 criterion_gd): {gd_loss.item():.4f}")
            print(f" total_loss: {total_loss.item():.4f}")
            print(f" gamma1 shape: {gamma1_pred.shape}")
            print(f" gamma2 shape: {gamma2_pred.shape}")

        print("\n✓ GibbsDuhemLoss 与模型集成测试通过")
        return True

    except Exception as e:
        return _report_failure("GibbsDuhemLoss 与模型集成测试失败", e)


def test_prediction():
    """Predict on the test split with an untrained SolvGNN and print the MAE."""
    _print_banner("测试预测")

    try:
        model = _make_graph_model('SolvGNN')

        dataloader = _make_loader(
            _load_dataset(config.test_binary_path),
            shuffle=False,
            drop_last=False,
        )

        model.eval()

        columns = {'gamma1_pred': [], 'gamma2_pred': [], 'gamma1_target': [], 'gamma2_target': []}
        with paddle.no_grad():
            for batch in dataloader:
                pred_dict = model(batch)['pred_dict']
                # Flatten to 1-D so each sample is a plain scalar; float() on a
                # 1-element ndarray is deprecated since NumPy 1.25.
                columns['gamma1_pred'].append(pred_dict['gamma1'].numpy().reshape(-1))
                columns['gamma2_pred'].append(pred_dict['gamma2'].numpy().reshape(-1))
                columns['gamma1_target'].append(batch['gamma1'].numpy().reshape(-1))
                columns['gamma2_target'].append(batch['gamma2'].numpy().reshape(-1))

        merged = {
            name: np.concatenate(chunks) if chunks else np.empty(0, dtype='float32')
            for name, chunks in columns.items()
        }
        g1_pred, g1_true = merged['gamma1_pred'], merged['gamma1_target']
        g2_pred, g2_true = merged['gamma2_pred'], merged['gamma2_target']

        print("✓ 预测完成")
        print(f" 预测样本数: {len(g1_pred)}")

        # Mean over samples of the two absolute errors, averaged over both targets.
        mae = np.mean(np.abs(g1_pred - g1_true) + np.abs(g2_pred - g2_true)) / 2
        print(f" 平均绝对误差 (MAE): {mae:.4f}")

        print("\n前5个预测结果:")
        for i in range(min(5, len(g1_pred))):
            p1, t1 = float(g1_pred[i]), float(g1_true[i])
            p2, t2 = float(g2_pred[i]), float(g2_true[i])
            print(f" {i+1}. gamma1: pred={p1:.4f}, target={t1:.4f}, "
                  f"error={abs(p1 - t1):.4f}")
            print(f" gamma2: pred={p2:.4f}, target={t2:.4f}, "
                  f"error={abs(p2 - t2):.4f}")

        print("\n✓ 预测测试通过")
        return True

    except Exception as e:
        return _report_failure("预测测试失败", e)


def main():
    """Prepare the data, run every test in order and return a process exit code."""
    _print_banner("GDI-NN 快速测试", leading_blank=False)

    create_test_data()

    # Order matters: test_mcm_forward writes the MCM CSVs.
    test_plan = [
        ('数据加载', test_data_loading),
        ('SolvGNN前向传播', test_model_forward),
        ('SolvGNN训练步骤', test_training_step),
        ('SolvGNNxMLP前向传播', test_solvgnn_xmlp_forward),
        ('SolvGNNxMLP训练步骤', test_solvgnn_xmlp_training),
        ('GEGNN前向传播', test_gegnn_forward),
        ('GEGNN训练步骤', test_gegnn_training),
        ('MCM前向传播', test_mcm_forward),
        ('MCM训练步骤', test_mcm_training),
        ('GibbsDuhemLoss', test_gibbs_duhem_loss),
        ('GibbsDuhemLoss与模型集成', test_gibbs_duhem_loss_with_model),
        ('预测', test_prediction),
    ]
    results = {}
    for test_name, run_test in test_plan:
        results[test_name] = run_test()

    _print_banner("测试总结")

    passed = sum(results.values())
    total = len(results)

    for test_name, result in results.items():
        status = "✓ 通过" if result else "✗ 失败"
        print(f"{test_name}: {status}")

    print(f"\n总计: {passed}/{total} 测试通过")
    print("=" * 80)

    if passed != total:
        print("✗ 部分测试失败,请检查错误信息")
        return 1

    print("✓ 所有测试通过!")
    print("\n现在可以使用以下命令进行完整训练:")
    print(" python train_gdinn.py \\")
    print(" --model_type SolvGNN \\")
    print(" --batch_size 32 \\")
    print(" --epochs 2 \\")
    print(" --hidden_dim 64 \\")
    print(" --lr 1e-3 \\")
    print(" --pinn_lambda 1.0")
    print("\n训练完成后,可以使用以下命令进行预测:")
    print(" python predict_gdinn.py \\")
    print(" --model_type SolvGNN \\")
    print(" --batch_size 32 \\")
    print(" --hidden_dim 64 \\")
    print(" --checkpoint ./checkpoints/best_model.pdparams")
    return 0


if __name__ == "__main__":
    sys.exit(main())

# ---------------------------------------------------------------------------
# End of quick_test.py. The original patch continues with the next file; its
# header is kept verbatim below so that no content of the patch is dropped:
#
# diff --git a/rfcs/PaddleMaterials/GDI-NN/test_gdinn/test_alignment.py b/rfcs/PaddleMaterials/GDI-NN/test_gdinn/test_alignment.py
# new file mode 100644
# index 000000000..502b66017
# --- /dev/null
# +++ b/rfcs/PaddleMaterials/GDI-NN/test_gdinn/test_alignment.py
# @@ -0,0 +1,831 @@
# +#!/usr/bin/env python
# +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
"""
GDI-NN precision-alignment test script.

Verifies that the PaddleMaterials gdinn implementation is structurally
consistent with the original GDI-NN (PyTorch) implementation:
- ppmat/models/gdinn/gnn.py <-> model/model_GNN.py
- ppmat/models/gdinn/mcm.py <-> model/model_MCM.py

Consistency is checked by copying the PyTorch weights into the Paddle model
and comparing the outputs both models produce for the same inputs.
"""

import os
import sys
import numpy as np
import pandas as pd
import paddle
import torch
import dgl

# Run Paddle on the GPU
paddle.set_device('gpu:0')

# Run PyTorch on the GPU (the GDI-NN code hard-codes .cuda() internally)
torch.cuda.set_device(0)

# ============================================================================
# Path configuration (modelled on quick_test.py)
# ============================================================================
# Checkout of the original GDI-NN repository. Defined once and overridable via
# the GDI_NN_DIR environment variable so the script is not tied to a single
# machine; the default is the path the script was originally written against.
GDI_NN_DIR = os.environ.get(
    'GDI_NN_DIR', '/home/shun/workspace/Projects/github/GDI-NN')

# Make both the parent directory of this script and the GDI-NN checkout
# importable (the GDI-NN checkout ends up first on sys.path).
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
sys.path.insert(0, GDI_NN_DIR)


class Config:
    """Static settings for the alignment tests (paths, tolerance, model sizes)."""

    # Data directories
    DATASET_DIR = './test_gdinn/dataset'
    OUTPUT_DIR = './test_gdinn/data/alignment_test'

    # Data files (relative to DATASET_DIR)
    SOLVENT_LIST_FILE = 'solvent_list.csv'
    BINARY_DATA_FILE = 'output_binary_with_inf_all.csv'

    # Maximum absolute difference tolerated between Paddle and PyTorch outputs
    FORWARD_TOLERANCE = 1e-3

    # Test parameters
    BATCH_SIZE = 8
    HIDDEN_DIM = 64
    IN_DIM = 74  # Match GDI-NN's feature dimension
    NUM_CLASSES = 1

    @property
    def solvent_list_path(self):
        """Path of the solvent list CSV inside DATASET_DIR."""
        return os.path.join(self.DATASET_DIR, self.SOLVENT_LIST_FILE)

    @property
    def binary_data_path(self):
        """Path of the binary-mixture data CSV inside DATASET_DIR."""
        return os.path.join(self.DATASET_DIR, self.BINARY_DATA_FILE)

    @property
    def output_dir(self):
        """Return OUTPUT_DIR, creating the directory first if it is missing."""
        os.makedirs(self.OUTPUT_DIR, exist_ok=True)
        return self.OUTPUT_DIR


config = Config()


def set_random_seed(seed=42):
    """Seed NumPy, Paddle and PyTorch so that runs are repeatable."""
    np.random.seed(seed)
    paddle.seed(seed)
    torch.manual_seed(seed)


def torch_to_paddle_param(torch_param, transpose=False):
    """Convert a PyTorch parameter into a Paddle tensor.

    Args:
        torch_param: PyTorch tensor/parameter to convert.
        transpose: If True and the value is 2-D, transpose it
            (PyTorch Linear stores [out, in], Paddle Linear stores [in, out]).

    Returns:
        A Paddle tensor holding a copy of the (optionally transposed) values.
    """
    np_val = torch_param.detach().cpu().numpy()
    if transpose and np_val.ndim == 2:
        np_val = np_val.T  # PyTorch Linear: [out, in] -> Paddle Linear: [in, out]
    return paddle.to_tensor(np_val)


def _copy_weight_and_bias(torch_layer, paddle_layer, transpose=False):
    """Copy `weight` then `bias` from a PyTorch layer into a Paddle layer."""
    paddle_layer.weight.set_value(
        torch_to_paddle_param(torch_layer.weight, transpose=transpose))
    paddle_layer.bias.set_value(torch_to_paddle_param(torch_layer.bias))


def sync_gnn_weights(torch_model, paddle_model):
    """Copy the weights of the PyTorch GNN model into the Paddle GNN model.

    Args:
        torch_model: Source PyTorch model (exposes conv1, conv2, global_conv1
            and classify1..3).
        paddle_model: Destination Paddle model (exposes conv1, conv2,
            global_conv and classify1..3); modified in place.
    """
    # GraphConv: weight [in, out], bias [out] -- same layout, no transpose
    _copy_weight_and_bias(torch_model.conv1, paddle_model.conv1)
    _copy_weight_and_bias(torch_model.conv2, paddle_model.conv2)

    # MPNNConv / MPNNconv (attribute is named differently in the two models)
    tc = torch_model.global_conv1
    pc = paddle_model.global_conv

    # project_node_feats: Sequential(Linear, Activation)
    _copy_weight_and_bias(
        tc.project_node_feats[0], pc.project_node_feats[0], transpose=True)

    # gnn_layer.edge_func: Sequential(Linear, Activation, Linear)
    _copy_weight_and_bias(
        tc.gnn_layer.edge_func[0], pc.gnn_layer.edge_func[0], transpose=True)
    _copy_weight_and_bias(
        tc.gnn_layer.edge_func[2], pc.gnn_layer.edge_func[2], transpose=True)

    # gnn_layer.bias
    pc.gnn_layer.bias.set_value(torch_to_paddle_param(tc.gnn_layer.bias))

    # GRU (PyTorch) -> GRUCell (Paddle): same weight layout [3*hidden, input/hidden]
    pc.gru_cell.weight_ih.set_value(torch_to_paddle_param(tc.gru.weight_ih_l0))
    pc.gru_cell.weight_hh.set_value(torch_to_paddle_param(tc.gru.weight_hh_l0))
    pc.gru_cell.bias_ih.set_value(torch_to_paddle_param(tc.gru.bias_ih_l0))
    pc.gru_cell.bias_hh.set_value(torch_to_paddle_param(tc.gru.bias_hh_l0))

    # Classifier Linear layers: transpose weights
    _copy_weight_and_bias(
        torch_model.classify1, paddle_model.classify1, transpose=True)
    _copy_weight_and_bias(
        torch_model.classify2, paddle_model.classify2, transpose=True)
    _copy_weight_and_bias(
        torch_model.classify3, paddle_model.classify3, transpose=True)


def sync_mcm_weights(torch_model, paddle_model):
    """Copy the weights of the PyTorch MCM model into the Paddle MCM model.

    Args:
        torch_model: Source PyTorch model (exposes solvent_emb and layers_end).
        paddle_model: Destination Paddle model; modified in place.
    """
    # Embedding: solvent_emb[0][0] -> solvent_emb.embedding
    torch_emb_seq = torch_model.solvent_emb[0]
    paddle_model.solvent_emb.embedding.weight.set_value(
        torch_to_paddle_param(torch_emb_seq[0].weight))

    # Linear layers: solvent_emb[0][3,6,9] -> solvent_emb.linear1,2,3
    for attr_name, seq_idx in (('linear1', 3), ('linear2', 6), ('linear3', 9)):
        _copy_weight_and_bias(
            torch_emb_seq[seq_idx],
            getattr(paddle_model.solvent_emb, attr_name),
            transpose=True)

    # layers_end: two Sequential branches
    for branch_idx in range(2):
        torch_branch = torch_model.layers_end[branch_idx]
        paddle_branch = paddle_model.layers_end[branch_idx]
        # Copy every layer pair that carries a weight on both sides
        for t_layer, p_layer in zip(torch_branch, paddle_branch):
            if not (hasattr(t_layer, 'weight') and hasattr(p_layer, 'weight')):
                continue
            p_layer.weight.set_value(
                torch_to_paddle_param(t_layer.weight, transpose=True))
            # A layer may carry a weight but no bias; only copy the bias when
            # both sides actually have one instead of crashing on None.
            t_bias = getattr(t_layer, 'bias', None)
            p_bias = getattr(p_layer, 'bias', None)
            if t_bias is not None and p_bias is not None:
                p_bias.set_value(torch_to_paddle_param(t_bias))

def _to_numpy(value):
    """Return `value` as a NumPy array, using `.numpy()` when it is available."""
    return value.numpy() if hasattr(value, 'numpy') else np.asarray(value)


def prepare_data():
    """Load the real test data.

    Returns:
        Tuple `(df, solvent_df)`: the first 500 rows of the binary-mixture CSV
        and the full solvent list CSV, both as pandas DataFrames.
    """
    print("准备测试数据...")

    # Read the data
    solvent_df = pd.read_csv(config.solvent_list_path)
    df = pd.read_csv(config.binary_data_path)

    # Only the first 500 rows are used for testing
    df = df.head(500)

    print(f"✓ 加载数据: {len(df)} 样本, {len(solvent_df)} 溶剂")

    return df, solvent_df


# ============================================================================
# GNN model precision-alignment test
# ============================================================================

def test_gnn_alignment():
    """Check that the Paddle and PyTorch GNN models agree on one batch.

    Note: per the original author, the dataset computes the HB features
    automatically (same behaviour as the original GDI-NN).

    Returns:
        Tuple `(passed, message)`. `passed` is True when the largest absolute
        difference of both outputs is below `config.FORWARD_TOLERANCE`.
    """
    print("\n" + "=" * 80)
    print("测试 GNN 精度对齐 (Paddle vs PyTorch)")
    print("=" * 80)

    try:
        # Paddle model and dataset
        from ppmat.models.gdinn.gnn import SolvGNN
        from ppmat.datasets import BinaryActivityDataset
        from paddle.io import DataLoader, BatchSampler
        from ppmat.datasets.collate_fn import BinaryActivityCollator

        # PyTorch model
        sys.path.insert(0, GDI_NN_DIR)
        from model.model_GNN import solvgnn_binary

        # Paddle dataset (HB features are computed by the dataset)
        paddle_dataset = BinaryActivityDataset(
            path=config.binary_data_path,
            solvent_list_path=config.solvent_list_path,
            add_self_loop=True,
            preload_graphs=False
        )

        # Sampler
        sampler = BatchSampler(
            dataset=paddle_dataset,
            batch_size=config.BATCH_SIZE,
            shuffle=False,
            drop_last=False
        )

        collator = BinaryActivityCollator()
        paddle_loader = DataLoader(
            dataset=paddle_dataset,
            batch_sampler=sampler,
            num_workers=0,
            collate_fn=collator
        )

        # Paddle model
        paddle_model = SolvGNN(
            in_dim=config.IN_DIM,
            hidden_dim=config.HIDDEN_DIM,
            n_classes=config.NUM_CLASSES,
            num_step_message_passing=1,
            pinn_lambda=0.0
        )
        paddle_model.eval()

        # PyTorch model, moved to the GPU
        torch_model = solvgnn_binary(
            in_dim=config.IN_DIM,
            hidden_dim=config.HIDDEN_DIM,
            n_classes=config.NUM_CLASSES,
            mlp_dropout_rate=0,
            mlp_activation="relu",
            mpnn_activation="relu",
            mlp_num_hid_layers=2
        )
        torch_model = torch_model.cuda()
        torch_model.eval()

        # Sync weights: PyTorch -> Paddle
        sync_gnn_weights(torch_model, paddle_model)

        # Only the first batch is compared. An empty loader is reported as a
        # failure instead of silently returning None (which main() cannot unpack).
        paddle_batch = next(iter(paddle_loader), None)
        if paddle_batch is None:
            print("✗ 测试失败: 数据集为空,无法获取 batch")
            return False, "数据集为空"

        # Collect the PyTorch-side inputs before the Paddle forward pass
        # (the original author notes the forward pass may modify graph features)
        g1_paddle = paddle_batch['g1']
        g2_paddle = paddle_batch['g2']
        x1_np = paddle_batch['x1'].numpy()

        # HB features (required by GDI-NN)
        inter_hb_np = paddle_batch['inter_hb'].numpy().flatten()
        intra_hb1_np = paddle_batch['intra_hb1'].numpy().flatten()
        intra_hb2_np = paddle_batch['intra_hb2'].numpy().flatten()

        # Convert the Paddle graphs to DGL graphs
        g1_dgl = paddle_graph_to_dgl(g1_paddle)
        g2_dgl = paddle_graph_to_dgl(g2_paddle)

        # Build the empty solvsys graph for the number of graphs actually in
        # this batch; it can be smaller than BATCH_SIZE for a short dataset.
        actual_batch_size = int(g1_paddle.num_graph)
        empty_solvsys = create_empty_solvsys(actual_batch_size)

        # PyTorch forward pass (solv1_x must be a 1-D tensor)
        torch_batch = {
            'g1': g1_dgl,
            'g2': g2_dgl,
            'solv1_x': torch.from_numpy(x1_np).flatten(),
            'inter_hb': torch.from_numpy(inter_hb_np),
            'intra_hb1': torch.from_numpy(intra_hb1_np),
            'intra_hb2': torch.from_numpy(intra_hb2_np),
        }

        with torch.no_grad():
            torch_output = torch_model(torch_batch, empty_solvsys, gamma_grad=False)

        torch_gamma1 = torch_output[:, 0].cpu().numpy()
        torch_gamma2 = torch_output[:, 1].cpu().numpy()

        # Paddle forward pass
        with paddle.no_grad():
            paddle_output = paddle_model(paddle_batch)

        paddle_pred = paddle_output['pred_dict']
        # Compare on ln_gamma (the PyTorch model outputs ln_gamma)
        paddle_gamma1 = paddle_pred['ln_gamma1'].numpy()
        paddle_gamma2 = paddle_pred['ln_gamma2'].numpy()

        # Compare
        diff_gamma1 = np.abs(paddle_gamma1.flatten() - torch_gamma1)
        diff_gamma2 = np.abs(paddle_gamma2.flatten() - torch_gamma2)

        max_diff_gamma1 = np.max(diff_gamma1)
        max_diff_gamma2 = np.max(diff_gamma2)
        mean_diff_gamma1 = np.mean(diff_gamma1)
        mean_diff_gamma2 = np.mean(diff_gamma2)

        print(f" gamma1 最大差异: {max_diff_gamma1:.6f}")
        print(f" gamma1 平均差异: {mean_diff_gamma1:.6f}")
        print(f" gamma2 最大差异: {max_diff_gamma2:.6f}")
        print(f" gamma2 平均差异: {mean_diff_gamma2:.6f}")

        passed = max(max_diff_gamma1, max_diff_gamma2) < config.FORWARD_TOLERANCE

        if passed:
            return True, "精度对齐"
        return False, f"gamma1 max diff: {max_diff_gamma1:.6f}, gamma2 max diff: {max_diff_gamma2:.6f}"

    except ImportError as e:
        print(f"✗ 导入失败: {e}")
        return False, f"导入失败: {e}"
    except Exception as e:
        print(f"✗ 测试失败: {e}")
        import traceback
        traceback.print_exc()
        return False, str(e)


def paddle_graph_to_dgl(paddle_g):
    """Convert a batched Paddle (pgl) graph into a batched DGL graph on cuda:0.

    Args:
        paddle_g: Batched pgl graph exposing `edges`, `num_nodes`, `node_feat`,
            `graph_node_id`, `graph_edge_id` and `num_graph`.

    Returns:
        A DGL graph on "cuda:0" with node feature 'h' and per-graph node/edge
        counts set, so DGL treats it as a batch.
    """
    # pgl.Graph.edges is a tensor of shape [num_edges, 2];
    # it may be a NumPy array or a Paddle tensor
    edges_tensor = paddle_g.edges
    src = _to_numpy(edges_tensor[:, 0])
    dst = _to_numpy(edges_tensor[:, 1])
    num_nodes = int(paddle_g.num_nodes)

    # Node features (pgl keeps them in the node_feat dict)
    if paddle_g.node_feat and 'h' in paddle_g.node_feat:
        node_feats = _to_numpy(paddle_g.node_feat['h'])
    elif paddle_g.node_feat and 'feat' in paddle_g.node_feat:
        node_feats = _to_numpy(paddle_g.node_feat['feat'])
    else:
        # Best-effort fallback kept from the original code; make it visible
        # because random features make any output comparison meaningless.
        print("⚠ 图中缺少节点特征 'h'/'feat',使用随机特征代替")
        node_feats = np.random.randn(num_nodes, config.IN_DIM).astype(np.float32)

    # Build the DGL graph and move it to the GPU before attaching features
    g = dgl.graph((src, dst), num_nodes=num_nodes)
    g = g.to("cuda:0")
    g.ndata['h'] = torch.from_numpy(node_feats).cuda()

    # Batch information: pgl uses graph_node_id / graph_edge_id to record which
    # graph each node / edge belongs to; count the members of every graph.
    num_graphs = int(paddle_g.num_graph)

    def _segment_sizes(segment_ids):
        # One bincount pass instead of one full scan per graph.
        ids = _to_numpy(segment_ids).reshape(-1).astype(np.int64)
        counts = np.bincount(ids, minlength=num_graphs)[:num_graphs]
        # DGL documents these setters as taking a tensor; keep it on the
        # graph's device and in the graph's index dtype.
        return torch.as_tensor(counts, dtype=g.idtype, device=g.device)

    g.set_batch_num_nodes(_segment_sizes(paddle_g.graph_node_id))
    g.set_batch_num_edges(_segment_sizes(paddle_g.graph_edge_id))

    return g


def create_empty_solvsys(batch_size):
    """Create the solvent-system graph on the GPU, mirroring generate_solvsys.

    Original code:
        solvsys.add_nodes(n_solv * batch_size)
        src = arange(batch_size), dst = arange(batch_size, 2*batch_size)
        add_edges(cat(src, dst), cat(dst, src))  # bidirectional
        add_edges(arange(2*batch), arange(2*batch))  # self-loops

    Args:
        batch_size: Number of mixtures in the batch.

    Returns:
        A DGL graph on "cuda:0" with `2 * batch_size` nodes.
    """
    n_solv = 2
    num_nodes = n_solv * batch_size

    src_range = torch.arange(batch_size)
    dst_range = torch.arange(batch_size, num_nodes)
    all_range = torch.arange(num_nodes)

    # Bidirectional edges + self-loops
    edge_src = torch.cat([src_range, dst_range, all_range])
    edge_dst = torch.cat([dst_range, src_range, all_range])

    g = dgl.graph((edge_src, edge_dst), num_nodes=num_nodes)
    return g.to("cuda:0")


# ============================================================================
# MCM model precision-alignment test
# ============================================================================

def test_mcm_alignment():
    """Check that the Paddle and PyTorch MCM models agree on one batch.

    Returns:
        Tuple `(passed, message)`. `passed` is True when the largest absolute
        difference of both ln_gamma outputs is below `config.FORWARD_TOLERANCE`.
    """
    print("\n" + "=" * 80)
    print("测试 MCM 精度对齐 (Paddle vs PyTorch)")
    print("=" * 80)

    try:
        # Prepare data
        df, solvent_df = prepare_data()

        # Paddle MCM model
        from ppmat.models.gdinn.mcm import MCM_MultiMLP

        # PyTorch MCM model
        sys.path.insert(0, GDI_NN_DIR)
        from model.model_MCM import MCM_multiMLP

        # Create the Paddle MCM model
        solvent_id_max = len(solvent_df)

        paddle_model = MCM_MultiMLP(
            solvent_id_max=solvent_id_max,
            dim_hidden_channels=config.HIDDEN_DIM,
            dropout_hidden=0.0,
            dropout_interaction=0.0,
            mlp_num_hid_layers=1,
            pinn_lambda=0.0
        )
        paddle_model.eval()

        # Create the PyTorch MCM model and move it to the GPU
        torch_model = MCM_multiMLP(
            solvent_id_max=solvent_id_max,
            dim_hidden_channels=config.HIDDEN_DIM,
            dropout_hidden=0.0,
            dropout_interaction=0.0,
            mlp_num_hid_layers=1
        )
        torch_model = torch_model.cuda()
        torch_model.eval()

        # Sync weights: PyTorch -> Paddle
        sync_mcm_weights(torch_model, paddle_model)

        # Test inputs are taken from the real data
        batch_size = min(config.BATCH_SIZE, len(df))

        solv1_smiles = df['solv1'].values[:batch_size]
        solv2_smiles = df['solv2'].values[:batch_size]

        # SMILES -> index mapping. setdefault keeps the first occurrence, which
        # is what list.index returned; unknown SMILES still map to 0.
        smiles_to_id = {}
        for idx, smiles in enumerate(solvent_df['smiles_can'].tolist()):
            smiles_to_id.setdefault(smiles, idx)
        solv1_id = [smiles_to_id.get(s, 0) for s in solv1_smiles]
        solv2_id = [smiles_to_id.get(s, 0) for s in solv2_smiles]

        x1 = df['solv1_x'].values[:batch_size].astype(np.float32)
        gamma1 = df['solv1_gamma'].values[:batch_size].astype(np.float32)
        gamma2 = df['solv2_gamma'].values[:batch_size].astype(np.float32)

        # Paddle forward pass
        paddle_batch = {
            'solv1_id': paddle.to_tensor(solv1_id, dtype='int64'),
            'solv2_id': paddle.to_tensor(solv2_id, dtype='int64'),
            'x1': paddle.to_tensor(x1, dtype='float32'),
            'gamma1': paddle.to_tensor(gamma1, dtype='float32').reshape([-1, 1]),
            'gamma2': paddle.to_tensor(gamma2, dtype='float32').reshape([-1, 1]),
        }

        with paddle.no_grad():
            paddle_output = paddle_model(paddle_batch)

        paddle_pred = paddle_output['pred_dict']
        paddle_ln_gamma1 = paddle_pred['ln_gamma1'].numpy()
        paddle_ln_gamma2 = paddle_pred['ln_gamma2'].numpy()

        # PyTorch forward pass (solv1_x must be a 1-D tensor)
        torch_batch = {
            'solv1_id': torch.tensor(solv1_id, dtype=torch.int64),
            'solv2_id': torch.tensor(solv2_id, dtype=torch.int64),
            'solv1_x': torch.tensor(x1, dtype=torch.float32).flatten(),
            'gamma1': torch.tensor(gamma1, dtype=torch.float32).reshape([-1, 1]),
            'gamma2': torch.tensor(gamma2, dtype=torch.float32).reshape([-1, 1]),
        }

        with torch.no_grad():
            torch_output = torch_model(torch_batch, None, gamma_grad=False)

        torch_ln_gamma1 = torch_output[:, 0].cpu().numpy()
        torch_ln_gamma2 = torch_output[:, 1].cpu().numpy()

        # Compare
        diff_ln_gamma1 = np.abs(paddle_ln_gamma1.flatten() - torch_ln_gamma1)
        diff_ln_gamma2 = np.abs(paddle_ln_gamma2.flatten() - torch_ln_gamma2)

        max_diff_gamma1 = np.max(diff_ln_gamma1)
        max_diff_gamma2 = np.max(diff_ln_gamma2)
        mean_diff_gamma1 = np.mean(diff_ln_gamma1)
        mean_diff_gamma2 = np.mean(diff_ln_gamma2)

        print(f" ln_gamma1 最大差异: {max_diff_gamma1:.6f}")
        print(f" ln_gamma1 平均差异: {mean_diff_gamma1:.6f}")
        print(f" ln_gamma2 最大差异: {max_diff_gamma2:.6f}")
        print(f" ln_gamma2 平均差异: {mean_diff_gamma2:.6f}")

        passed = max(max_diff_gamma1, max_diff_gamma2) < config.FORWARD_TOLERANCE

        if passed:
            return True, "精度对齐"
        return False, f"ln_gamma1 max diff: {max_diff_gamma1:.6f}, ln_gamma2 max diff: {max_diff_gamma2:.6f}"

    except ImportError as e:
        print(f"✗ 导入失败: {e}")
        return False, f"导入失败: {e}"
    except Exception as e:
        print(f"✗ 测试失败: {e}")
        import traceback
        traceback.print_exc()
        return False, str(e)


# ============================================================================
# Graph utils precision-alignment test
# ============================================================================

def test_mean_nodes_alignment():
    """Check that the Paddle `mean_nodes` helper matches `dgl.mean_nodes`.

    Builds the same random graphs and node features for both libraries,
    batches them, and compares the per-graph node means.

    Returns:
        Tuple `(passed, message)`. `passed` is True when the shapes match and
        the largest absolute difference is below 1e-6.
    """
    print("\n" + "=" * 80)
    print("测试 mean_nodes 精度对齐 (Paddle vs DGL)")
    print("=" * 80)

    try:
        import pgl
        from ppmat.models.gdinn.utils.graph_utils import mean_nodes

        # Test parameters
        feat_dim = 64
        num_nodes_list = [10, 15, 8, 12]  # every graph has a different node count

        # Seed the random generators
        set_random_seed(42)

        # Build the Paddle graphs
        paddle_graphs = []
        for num_nodes in num_nodes_list:
            # Random edges
            num_edges = num_nodes * 3
            src = paddle.randint(0, num_nodes, [num_edges])
            dst = paddle.randint(0, num_nodes, [num_edges])

            # Random node features
            node_feat = paddle.randn([num_nodes, feat_dim])

            # Create the pgl.Graph
            edges = list(zip(src.tolist(), dst.tolist()))
            g = pgl.Graph(
                num_nodes=num_nodes,
                edges=edges,
                node_feat={'h': node_feat}
            )
            paddle_graphs.append(g)

        # Batch the Paddle graphs
        paddle_batched = pgl.Graph.batch(paddle_graphs)

        # Paddle mean_nodes
        paddle_result = mean_nodes(paddle_batched, 'h')

        print(f" Paddle mean_nodes 输出形状: {paddle_result.shape}")

        # Build the matching DGL graphs from the same edges and features
        dgl_graphs = []
        for paddle_graph, num_nodes in zip(paddle_graphs, num_nodes_list):
            # pgl.Graph.edges is a tensor of shape [num_edges, 2]
            edges_tensor = paddle_graph.edges
            src_np = _to_numpy(edges_tensor[:, 0])
            dst_np = _to_numpy(edges_tensor[:, 1])

            g = dgl.graph((src_np, dst_np), num_nodes=num_nodes)

            node_feat_np = _to_numpy(paddle_graph.node_feat['h'])
            g.ndata['h'] = torch.from_numpy(node_feat_np)

            dgl_graphs.append(g)

        # Batch the DGL graphs
        dgl_batched = dgl.batch(dgl_graphs)

        # DGL mean_nodes
        dgl_result = dgl.mean_nodes(dgl_batched, 'h')

        print(f" DGL mean_nodes 输出形状: {dgl_result.shape}")

        # Validate the shapes before subtracting, so a mismatch is reported as
        # such instead of broadcasting or raising inside the arithmetic
        # (Paddle uses list, DGL uses torch.Size).
        if list(paddle_result.shape) != list(dgl_result.shape):
            message = f"形状不匹配: Paddle {paddle_result.shape} vs DGL {dgl_result.shape}"
            print(f"✗ 测试失败: {message}")
            return False, message

        # Convert to NumPy for the comparison
        paddle_np = paddle_result.numpy()
        dgl_np = dgl_result.cpu().numpy()

        # Differences
        diff = np.abs(paddle_np - dgl_np)
        max_diff = np.max(diff)
        mean_diff = np.mean(diff)

        print(f" 最大差异: {max_diff:.10f}")
        print(f" 平均差异: {mean_diff:.10f}")

        # Precision check
        tolerance = 1e-6
        passed = max_diff < tolerance

        if passed:
            print(f" ✓ 精度对齐 (差异 < {tolerance})")
            return True, "精度对齐"
        print(f" ✗ 精度未对齐 (差异 {max_diff:.10f} >= {tolerance})")
        return False, f"最大差异: {max_diff:.10f}"

    except ImportError as e:
        print(f"✗ 导入失败: {e}")
        return False, f"导入失败: {e}"
    except Exception as e:
        print(f"✗ 测试失败: {e}")
        import traceback
        traceback.print_exc()
        return False, str(e)


# ============================================================================
# HB feature test
# ============================================================================

def test_hb_features():
    """Check that the hydrogen-bond features match the original GDI-NN formulas.

    Original GDI-NN implementation (generate_dataset_for_training.py):
    - solvent_data[solvent_id] = [graph, hba, hbd, min(hba, hbd)]
    - intra_hb1 = solv1[3]  # min(hba, hbd)
    - intra_hb2 = solv2[3]  # min(hba, hbd)
    - inter_hb = min(solv1[1], solv2[2]) + min(solv1[2], solv2[1])
               = min(hba1, hbd2) + min(hbd1, hba2)

    Returns:
        Tuple `(passed, message)`.
    """
    print("\n" + "=" * 80)
    print("测试氢键特征计算 (与原始 GDI-NN 对齐)")
    print("=" * 80)

    try:
        from ppmat.datasets import BinaryActivityDataset
        from rdkit import Chem
        from rdkit.Chem import rdMolDescriptors

        # Create the dataset
        dataset = BinaryActivityDataset(
            path=config.binary_data_path,
            solvent_list_path=config.solvent_list_path,
            add_self_loop=True,
            preload_graphs=False
        )

        # Take one sample
        sample = dataset[0]

        # The HB feature fields must be present
        assert 'intra_hb1' in sample, "缺少 intra_hb1 特征"
        assert 'intra_hb2' in sample, "缺少 intra_hb2 特征"
        assert 'inter_hb' in sample, "缺少 inter_hb 特征"

        print(f"✓ HB 特征字段存在")

        # Recompute the HB features from the SMILES in the solvent list
        solvent_df = pd.read_csv(config.solvent_list_path)

        # Solvent IDs and SMILES of the sample
        solv1_id = sample['solv1_id']
        solv2_id = sample['solv2_id']

        solv1_row = solvent_df[solvent_df['solvent_id'] == solv1_id].iloc[0]
        solv2_row = solvent_df[solvent_df['solvent_id'] == solv2_id].iloc[0]

        smiles1 = solv1_row['smiles_can']
        smiles2 = solv2_row['smiles_can']

        # HBA and HBD via RDKit (same as the original GDI-NN)
        mol1 = Chem.MolFromSmiles(smiles1)
        mol2 = Chem.MolFromSmiles(smiles2)

        hba1 = rdMolDescriptors.CalcNumHBA(mol1)
        hbd1 = rdMolDescriptors.CalcNumHBD(mol1)
        hba2 = rdMolDescriptors.CalcNumHBA(mol2)
        hbd2 = rdMolDescriptors.CalcNumHBD(mol2)

        # Expected values (original GDI-NN formulas)
        expected_intra_hb1 = min(hba1, hbd1)
        expected_intra_hb2 = min(hba2, hbd2)
        expected_inter_hb = min(hba1, hbd2) + min(hbd1, hba2)

        # Actual values
        actual_intra_hb1 = sample['intra_hb1']
        actual_intra_hb2 = sample['intra_hb2']
        actual_inter_hb = sample['inter_hb']

        print(f"\n溶剂 1: {solv1_id}")
        print(f" SMILES: {smiles1}")
        print(f" HBA: {hba1}, HBD: {hbd1}")
        print(f" intra_hb1: expected={expected_intra_hb1}, actual={actual_intra_hb1}")

        print(f"\n溶剂 2: {solv2_id}")
        print(f" SMILES: {smiles2}")
        print(f" HBA: {hba2}, HBD: {hbd2}")
        print(f" intra_hb2: expected={expected_intra_hb2}, actual={actual_intra_hb2}")

        print(f"\n交互氢键:")
        print(f" inter_hb = min({hba1}, {hbd2}) + min({hbd1}, {hba2})")
        print(f" = {min(hba1, hbd2)} + {min(hbd1, hba2)}")
        print(f" = {expected_inter_hb}")
        print(f" actual: {actual_inter_hb}")

        # The values must match
        assert actual_intra_hb1 == expected_intra_hb1, \
            f"intra_hb1 不匹配: expected={expected_intra_hb1}, actual={actual_intra_hb1}"
        assert actual_intra_hb2 == expected_intra_hb2, \
            f"intra_hb2 不匹配: expected={expected_intra_hb2}, actual={actual_intra_hb2}"
        assert actual_inter_hb == expected_inter_hb, \
            f"inter_hb 不匹配: expected={expected_inter_hb}, actual={actual_inter_hb}"

        print(f"\n✓ HB 特征计算与原始 GDI-NN 一致")

        # Solvent cache check
        print(f"\n测试溶剂缓存机制...")

        # The dataset must expose the solvent_data cache
        assert hasattr(dataset, 'solvent_data'), "数据集缺少 solvent_data 属性"

        # Cache entry format: [graph, hba, hbd, intra_hb]
        if solv1_id in dataset.solvent_data:
            cached = dataset.solvent_data[solv1_id]
            assert len(cached) == 4, f"缓存格式错误: 期望 4 个元素,实际 {len(cached)} 个"
            assert cached[1] == hba1, f"缓存 HBA 不匹配"
            assert cached[2] == hbd1, f"缓存 HBD 不匹配"
            assert cached[3] == min(hba1, hbd1), f"缓存 intra_hb 不匹配"
            print(f" ✓ 溶剂缓存格式正确: [graph, hba={cached[1]}, hbd={cached[2]}, intra_hb={cached[3]}]")

        return True, "HB 特征测试通过"

    except AssertionError as e:
        print(f"\n✗ 断言失败: {e}")
        return False, str(e)
    except Exception as e:
        print(f"\n✗ 测试失败: {e}")
        import traceback
        traceback.print_exc()
        return False, str(e)


# ============================================================================
# Entry point
# ============================================================================

def main():
    """Run every alignment test, print a summary and return the exit code.

    Returns:
        0 when all tests passed, 1 otherwise.
    """
    print("=" * 80)
    print("GDI-NN 精度对齐测试 (使用真实数据)")
    print("=" * 80)

    # Seed the random generators
    set_random_seed(42)

    # Create the output directory
    os.makedirs(config.OUTPUT_DIR, exist_ok=True)

    # Run the tests; each returns a (passed, message) tuple
    results = {}

    # mean_nodes precision alignment
    results['mean_nodes_精度对齐'] = test_mean_nodes_alignment()

    # HB features (dataset implementation vs original GDI-NN)
    results['HB_特征计算'] = test_hb_features()

    # GNN precision alignment
    results['GNN_精度对齐'] = test_gnn_alignment()

    # MCM precision alignment
    results['MCM_精度对齐'] = test_mcm_alignment()

    # Summary
    print("\n" + "=" * 80)
    print("测试总结")
    print("=" * 80)

    passed = 0
    failed = 0

    for test_name, (result, message) in results.items():
        status = "✓ 通过" if result else "✗ 失败"
        print(f"{test_name}: {status} ({message})")
        if result:
            passed += 1
        else:
            failed += 1

    print(f"\n总计: {passed}/{len(results)} 测试通过")
    print("=" * 80)

    if failed == 0:
        print("✓ 所有测试通过!")
        return 0
    print(f"✗ {failed} 个测试失败")
    return 1


if __name__ == "__main__":
    sys.exit(main())