Development environment

  • Course platform: Mo

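1 Implementation of the TCM Syndrome Differentiation System

System prompt design

To make the large language model produce standardized, reliable output that is consistent with clinical standards for traditional Chinese medicine (TCM) syndrome differentiation, a structured and tightly constrained system prompt was designed on the basis of TCM syndrome differentiation theory and an existing TCM syndrome database for chronic lymphocytic leukemia (CLL). The prompt fixes the output scope, the output format, and the diagnostic criteria, which limits free-form generation and makes the results easier to use in a clinical setting.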

system_prompt = """
你是一位经验丰富的中医专家,请根据患者症状描述,结合慢性淋巴细胞白血病(CLL)的中医辨证规律,判断以下两项:

1. **证型**:仅限以下五种之一,依据症状、舌象、脉象综合判断:
   - 痰湿内蕴
   - 脾虚痰湿
   - 气阴两虚
   - 痰瘀互结
   - 痰湿内蕴兼气虚发热

2. **治法**:必须与所选证型严格对应,采用标准中医治法表述。

辨证要点参考:
- **痰湿内蕴**:痰涎多、口中黏、胸闷腹胀、舌胖齿痕、苔白或淡黄润、脉弦或滑。
- **脾虚痰湿**:乏力、气短、纳差、脾大、淋巴结肿大、舌胖齿痕、脉细弱或滑细。
- **气阴两虚**:口干、手足心热、腰膝酸软、失眠、舌红有裂纹、苔少或黄白相兼、脉细数。
- **痰瘀互结**:淋巴结质硬成团、固定不移、舌暗或有瘀点、脉滑。
- **痰湿内蕴兼气虚发热**:具备痰湿主症,兼见口干口苦、脉滑数等郁热或虚热征象。

请严格依据上述标准,输出仅包含“证型”和“治法”的JSON,格式为:
{"证型": "", "治法": ""}
不得添加解释、推测或额外字段。
"""
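
Because the prompt restricts the reply to a two-field JSON object with one of five syndrome types, replies can be checked mechanically before they are scored. The following is a minimal sketch of such a check, not part of the original system: the helper name parse_diagnosis is hypothetical, and the treatment strings in the usage example are illustrative values only.

```python
import json

# The five syndrome types permitted by the system prompt above.
ALLOWED_SYNDROMES = {
    "痰湿内蕴",
    "脾虚痰湿",
    "气阴两虚",
    "痰瘀互结",
    "痰湿内蕴兼气虚发热",
}
REQUIRED_KEYS = {"证型", "治法"}


def parse_diagnosis(raw_reply):
    """Return the parsed dict if the reply obeys the prompt's contract, else None.

    Hypothetical helper for illustration; it assumes raw_reply is a str.
    """
    try:
        result = json.loads(raw_reply.strip())
    except json.JSONDecodeError:
        return None
    # Exactly the two required fields, no extras.
    if not isinstance(result, dict) or set(result) != REQUIRED_KEYS:
        return None
    syndrome, treatment = result["证型"], result["治法"]
    if not isinstance(syndrome, str) or syndrome not in ALLOWED_SYNDROMES:
        return None
    if not isinstance(treatment, str) or not treatment.strip():
        return None
    return result


# Illustrative replies (the treatment wording is an example, not a test-set value).
print(parse_diagnosis('{"证型": "脾虚痰湿", "治法": "健脾化痰"}'))
print(parse_diagnosis('{"证型": "肝郁气滞", "治法": "疏肝理气"}'))
```

A reply that fails this check could be counted as wrong or retried, depending on how the evaluation is set up.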

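Preliminary multi-model test results

Several mainstream large language models were tested with the same prompt and the same test set to compare their syndrome differentiation accuracy.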

Model name              Test score (%)
qwen3-32b               89.24
ernie-4.5-turbo-128k    89.79
qwen3-8b                90.38
ernie-4.5-21b-a3b       89.08
ernie-4.5-0.3b          83.40
kimi-k2-instruct        91.24

The preliminary results show that kimi-k2-instruct performs best on the current test set, with an accuracy of 91.24%.

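Model selection and experimental results

After weighing model performance, inference efficiency, and platform compatibility, kimi-k2-instruct was chosen as the inference model for this system.

The final evaluation result is shown in the figure below:

[Figure: final test result of the TCM syndrome differentiation system]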

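2 Garbage Classification

This project applies transfer learning with a pretrained MobileNetV2 to a garbage classification dataset of 26 classes with 100 images per class.

First attempt: MobileNetV2 transfer learning

The original code used MindSpore 2.3.1 without GPU support, so training was slow. The whole training pipeline was therefore ported to PyTorch, and the following key problems were fixed: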

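  • In the original configuration, weight_decay=3 was far too large and the model failed to converge (the test score stayed in the low tens);
  • After changing it to weight_decay=0.0001, test accuracy rose above 80%;
  • With suitable values for epochs and lr_max, a cosine-annealing learning-rate schedule, and full fine-tuning, the final accuracy reached 94.62% (123/130).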
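
One way to see why weight_decay=3 is destructive: with plain SGD, the L2 penalty multiplies each weight by (1 - lr * weight_decay) at every step, independent of the data. The sketch below isolates that factor under simplifying assumptions that are mine, not the report's: the data gradient is zero, momentum is ignored, the learning rate is held constant at the lr_max value 0.008 from the Config in this report, and the 1000-step horizon is arbitrary.

```python
def shrink_factor(lr, weight_decay, steps):
    """Fraction of a weight's magnitude left after `steps` plain-SGD updates
    when the data gradient is zero, so only the L2 penalty acts."""
    return (1.0 - lr * weight_decay) ** steps


steps = 1000  # assumed horizon, for illustration only
lr = 0.008    # lr_max from the Config in this report, held constant here

large = shrink_factor(lr, 3.0, steps)     # the original, overly large setting
small = shrink_factor(lr, 0.0001, steps)  # the corrected setting

print(f"weight_decay=3      leaves {large:.3e} of each weight")
print(f"weight_decay=0.0001 leaves {small:.6f} of each weight")
```

Under these assumptions the large setting drives the pretrained weights toward zero within a few hundred steps, while the corrected setting leaves them almost untouched, which is consistent with the convergence failure described above.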

Core PyTorch training code for MobileNetV2

# dataset.py: data loaders
"""
Create train or eval dataset using PyTorch.
"""
import os
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import torch

def create_dataset(config, training=True, buffer_size=None, repeat=1):
    """
    Create a train or eval dataset using PyTorch.

    Args:
        config (object): Config object with attributes:
            - dataset_path (str): root path to dataset (should contain 'train' and 'val' subdirs)
            - image_height (int): target image height
            - image_width (int): target image width
            - batch_size (int): training batch size
            - eval_batch_size (int): evaluation batch size
            - class_index (dict, optional): not directly used in PyTorch ImageFolder (ignored)
        training (bool): if True, create training dataset with augmentations; else, eval dataset
        buffer_size (int): ignored in PyTorch (shuffle is handled by DataLoader)
        repeat (int): number of dataset repetitions (handled via DataLoader or custom sampler if needed)

    Returns:
        torch.utils.data.DataLoader: dataset loader
    """
    data_path = os.path.join(config.dataset_path, 'train' if training else 'val')

    # Common normalization (note: PyTorch uses [0,1] or [0,255] depending on transform; here we assume ToTensor() gives [0,1])
    normalize = transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    )

    if training:
        # Training transforms: RandomResizedCrop + HorizontalFlip + ColorJitter
        train_transform = transforms.Compose([
            transforms.RandomResizedCrop(
                (config.image_height, config.image_width),
                scale=(0.08, 1.0),
                ratio=(0.75, 1.333)
            ),
            transforms.RandomHorizontalFlip(p=0.5),
            transforms.ColorJitter(brightness=0.4, contrast=0.4, saturation=0.4),
            transforms.ToTensor(),
            normalize
        ])

        dataset = datasets.ImageFolder(root=data_path, transform=train_transform)
        dataloader = DataLoader(
            dataset,
            batch_size=config.batch_size,
            shuffle=True,
            drop_last=True,
            num_workers=4,
            pin_memory=True
        )

        # Handle repeat by repeating the dataloader in training loop if needed (PyTorch doesn't have .repeat())
        # Here we just return the loader; repeat logic should be handled externally if necessary.
        return dataloader

    else:
        # Eval transforms: Resize to larger size (width / 0.875), then center crop to image_width
        # Note: original code uses resize_width for both height & width in eval
        eval_size = int(config.image_width / 0.875)
        eval_transform = transforms.Compose([
            transforms.Resize(eval_size),
            transforms.CenterCrop(config.image_width),
            transforms.ToTensor(),
            normalize
        ])

        dataset = datasets.ImageFolder(root=data_path, transform=eval_transform)
        dataloader = DataLoader(
            dataset,
            batch_size=config.eval_batch_size,
            shuffle=False,
            drop_last=False,  # keep every evaluation sample, including a final partial batch
            num_workers=4,
            pin_memory=True
        )

        return dataloader

# model_v2.py: MobileNetV2 implementation
from torch import nn
import torch

def _make_divisible(ch, divisor=8, min_ch=None):
    """
    This function is taken from the original tf repo.
    It ensures that all layers have a channel number that is divisible by 8
    It can be seen here:
    https://github.com/tensorflow/models/blob/master/research/slim/nets/mobilenet/mobilenet.py
    """
    if min_ch is None:
        min_ch = divisor
    new_ch = max(min_ch, int(ch + divisor / 2) // divisor * divisor)
    # Make sure that round down does not go down by more than 10%.
    if new_ch < 0.9 * ch:
        new_ch += divisor
    return new_ch


class ConvBNReLU(nn.Sequential):
    def __init__(self, in_channel, out_channel, kernel_size=3, stride=1, groups=1):
        padding = (kernel_size - 1) // 2
        super(ConvBNReLU, self).__init__(
            nn.Conv2d(in_channel, out_channel, kernel_size, stride, padding, groups=groups, bias=False),
            nn.BatchNorm2d(out_channel),
            nn.ReLU6(inplace=True)
        )


class InvertedResidual(nn.Module):
    def __init__(self, in_channel, out_channel, stride, expand_ratio):
        super(InvertedResidual, self).__init__()
        hidden_channel = in_channel * expand_ratio
        self.use_shortcut = stride == 1 and in_channel == out_channel

        layers = []
        if expand_ratio != 1:
            # 1x1 pointwise conv
            layers.append(ConvBNReLU(in_channel, hidden_channel, kernel_size=1))
        layers.extend([
            # 3x3 depthwise conv
            ConvBNReLU(hidden_channel, hidden_channel, stride=stride, groups=hidden_channel),
            # 1x1 pointwise conv(linear)
            nn.Conv2d(hidden_channel, out_channel, kernel_size=1, bias=False),
            nn.BatchNorm2d(out_channel),
        ])

        self.conv = nn.Sequential(*layers)

    def forward(self, x):
        if self.use_shortcut:
            return x + self.conv(x)
        else:
            return self.conv(x)


class MobileNetV2(nn.Module):
    def __init__(self, num_classes=1000, alpha=1.0, round_nearest=8):
        super(MobileNetV2, self).__init__()
        block = InvertedResidual
        input_channel = _make_divisible(32 * alpha, round_nearest)
        last_channel = _make_divisible(1280 * alpha, round_nearest)

        inverted_residual_setting = [
            # t, c, n, s
            [1, 16, 1, 1],
            [6, 24, 2, 2],
            [6, 32, 3, 2],
            [6, 64, 4, 2],
            [6, 96, 3, 1],
            [6, 160, 3, 2],
            [6, 320, 1, 1],
        ]

        features = []
        # conv1 layer
        features.append(ConvBNReLU(3, input_channel, stride=2))
        # building inverted residual blocks
        for t, c, n, s in inverted_residual_setting:
            output_channel = _make_divisible(c * alpha, round_nearest)
            for i in range(n):
                stride = s if i == 0 else 1
                features.append(block(input_channel, output_channel, stride, expand_ratio=t))
                input_channel = output_channel
        # building last several layers
        features.append(ConvBNReLU(input_channel, last_channel, 1))
        # combine feature layers
        self.features = nn.Sequential(*features)

        # building classifier
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.classifier = nn.Sequential(
            nn.Dropout(0.2),
            nn.Linear(last_channel, num_classes)
        )

        # weight initialization
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out')
                if m.bias is not None:
                    nn.init.zeros_(m.bias)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.ones_(m.weight)
                nn.init.zeros_(m.bias)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                nn.init.zeros_(m.bias)

    def forward(self, x):
        x = self.features(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return x

# train_main.py: main training logic
import os
import math
import time
import torch
import torch.nn as nn
from torch.optim import SGD
from torch.optim.lr_scheduler import LambdaLR

# Local modules
from dataset import create_dataset
from model_v2 import MobileNetV2

# Garbage dataset labels: folder name -> class index, and class index -> class name.
index = {'00_00': 0, '00_01': 1, '00_02': 2, '00_03': 3, '00_04': 4, '00_05': 5, '00_06': 6, '00_07': 7,
         '00_08': 8, '00_09': 9, '01_00': 10, '01_01': 11, '01_02': 12, '01_03': 13, '01_04': 14,
         '01_05': 15, '01_06': 16, '01_07': 17, '02_00': 18, '02_01': 19, '02_02': 20, '02_03': 21,
         '03_00': 22, '03_01': 23, '03_02': 24, '03_03': 25}
inverted = {0: 'Plastic Bottle', 1: 'Hats', 2: 'Newspaper', 3: 'Cans', 4: 'Glassware', 5: 'Glass Bottle',
            6: 'Cardboard', 7: 'Basketball',
            8: 'Paper', 9: 'Metalware', 10: 'Disposable Chopsticks', 11: 'Lighter', 12: 'Broom', 13: 'Old Mirror',
            14: 'Toothbrush',
            15: 'Dirty Cloth', 16: 'Seashell', 17: 'Ceramic Bowl', 18: 'Paint bucket', 19: 'Battery',
            20: 'Fluorescent lamp', 21: 'Tablet capsules',
            22: 'Orange Peel', 23: 'Vegetable Leaf', 24: 'Eggshell', 25: 'Banana Peel'}


# Configuration parameters (a plain class standing in for an EasyDict-style config)
class Config:
    def __init__(self):
        # Absolute path of the directory containing this script
        current_script_dir = os.path.dirname(os.path.abspath(__file__))  # /home/jovyan/work/src_pytorch
        project_root = os.path.dirname(current_script_dir)               # /home/jovyan/work

        self.num_classes = 26
        self.image_height = 224
        self.image_width = 224
        self.batch_size = 24
        self.eval_batch_size = 10
        self.epochs = 50
        self.lr_max = 0.008
        self.momentum = 0.8
        self.weight_decay = 0.0001
        self.dataset_path = os.path.join(project_root, "datasets", "5fbdf571c06d3433df85ac65-momodel", "garbage_26x100")
        self.pretrained_ckpt = os.path.join(current_script_dir, "mobilenet_v2-b0353104.pth")
        self.save_model_path = os.path.join(project_root, "results/torch/")


config = Config()


def build_lr_lambda(total_steps, lr_max, warmup_steps=0, decay_type='cosine'):
    def lr_lambda(step):
        if step < warmup_steps:
            return (step + 1) / warmup_steps
        else:
            if decay_type == 'cosine':
                progress = (step - warmup_steps) / (total_steps - warmup_steps)
                return 0.5 * (1 + math.cos(math.pi * progress))
            else:
                return 1.0

    return lr_lambda


def main():
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Training on {'GPU' if torch.cuda.is_available() else 'CPU'}")

    data_path = os.path.join(config.dataset_path, 'train')

    # Create the data loader
    train_loader = create_dataset(config, training=True)

    # Build the model
    model = MobileNetV2(num_classes=config.num_classes, alpha=1.0)

    # Load the ImageNet-pretrained weights
    if os.path.exists(config.pretrained_ckpt):
        pretrained_dict = torch.load(config.pretrained_ckpt, map_location='cpu')
        model_dict = model.state_dict()
        # Skip the classifier layer because num_classes differs
        pretrained_dict = {k: v for k, v in pretrained_dict.items() if k in model_dict and 'classifier.1' not in k}
        model_dict.update(pretrained_dict)
        model.load_state_dict(model_dict)
        print("Loaded pre-trained MobileNetV2 weights (excluding classifier).")

    model.to(device)

    # Freeze the backbone (optional, depending on the use case)
    # for param in model.features.parameters():
    #     param.requires_grad = False

    # Loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = SGD(
        model.parameters(),
        lr=config.lr_max,
        momentum=config.momentum,
        weight_decay=config.weight_decay
    )

    # Learning-rate scheduler
    total_steps = len(train_loader) * config.epochs
    lr_lambda = build_lr_lambda(total_steps, config.lr_max, warmup_steps=0, decay_type='cosine')
    scheduler = LambdaLR(optimizer, lr_lambda)

    # Training loop
    model.train()
    for epoch in range(config.epochs):
        epoch_start = time.time()
        running_loss = 0.0
        for batch_idx, (inputs, targets) in enumerate(train_loader):
            inputs, targets = inputs.to(device), targets.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()
            scheduler.step()

            running_loss += loss.item()

        epoch_loss = running_loss / len(train_loader)
        epoch_time = time.time() - epoch_start
        print(f"Epoch [{epoch + 1}/{config.epochs}], Time: {epoch_time:.2f}s, Avg Loss: {epoch_loss:.4f}")

    # Save the fine-tuned model under config.save_model_path
    os.makedirs(config.save_model_path, exist_ok=True)
    timestamp = time.strftime("%m%d_%H%M", time.localtime())  # e.g., 1226_1430
    save_path = os.path.join(config.save_model_path, f"{timestamp}.pth")
    torch.save(model.state_dict(), save_path)
    print(f"Model saved to {save_path}")


if __name__ == "__main__":
    main()

Note: model_v2 is adapted from a Bilibili tutorial [1].

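Improved approach: EfficientNet-B0

Because MobileNetV2 is a relatively old architecture, EfficientNet-B0 was chosen after consulting a SOTA image classification leaderboard [2] and the ImageNet Leaderboard [3] and weighing accuracy against parameter count.

Core PyTorch training code for EfficientNet-B0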

import os
import math
import time
import torch
import torch.nn as nn
from torch.optim import SGD
from torch.optim.lr_scheduler import LambdaLR
from torchvision.models import efficientnet_b0  # torchvision's EfficientNet-B0

# Local module (used only for data loading)
from dataset import create_dataset

# Garbage dataset labels (unchanged)
index = {'00_00': 0, '00_01': 1, '00_02': 2, '00_03': 3, '00_04': 4, '00_05': 5, '00_06': 6, '00_07': 7,
         '00_08': 8, '00_09': 9, '01_00': 10, '01_01': 11, '01_02': 12, '01_03': 13, '01_04': 14,
         '01_05': 15, '01_06': 16, '01_07': 17, '02_00': 18, '02_01': 19, '02_02': 20, '02_03': 21,
         '03_00': 22, '03_01': 23, '03_02': 24, '03_03': 25}
inverted = {0: 'Plastic Bottle', 1: 'Hats', 2: 'Newspaper', 3: 'Cans', 4: 'Glassware', 5: 'Glass Bottle',
            6: 'Cardboard', 7: 'Basketball',
            8: 'Paper', 9: 'Metalware', 10: 'Disposable Chopsticks', 11: 'Lighter', 12: 'Broom', 13: 'Old Mirror',
            14: 'Toothbrush',
            15: 'Dirty Cloth', 16: 'Seashell', 17: 'Ceramic Bowl', 18: 'Paint bucket', 19: 'Battery',
            20: 'Fluorescent lamp', 21: 'Tablet capsules',
            22: 'Orange Peel', 23: 'Vegetable Leaf', 24: 'Eggshell', 25: 'Banana Peel'}


class Config:
    def __init__(self):
        current_script_dir = os.path.dirname(os.path.abspath(__file__))
        project_root = os.path.dirname(current_script_dir)

        self.num_classes = 26
        self.image_height = 224
        self.image_width = 224
        self.batch_size = 64
        self.eval_batch_size = 10
        self.epochs = 100
        self.lr_max = 0.003
        self.momentum = 0.9
        self.weight_decay = 0.0001
        self.dataset_path = os.path.join(project_root, "datasets", "5fbdf571c06d3433df85ac65-momodel", "garbage_26x100")
        self.pretrained_ckpt = None  # EfficientNet uses torchvision's pretrained weights, so no local .pth is needed
        self.save_model_path = os.path.join(project_root, "results/torch/")


config = Config()


def build_lr_lambda(total_steps, lr_max, warmup_steps=0, decay_type='cosine'):
    def lr_lambda(step):
        if step < warmup_steps:
            return (step + 1) / warmup_steps
        else:
            if decay_type == 'cosine':
                progress = (step - warmup_steps) / (total_steps - warmup_steps)
                return 0.5 * (1 + math.cos(math.pi * progress))
            else:
                return 1.0
    return lr_lambda


def main():
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Training on {'GPU' if torch.cuda.is_available() else 'CPU'}")

    train_loader = create_dataset(config, training=True)

    # Build the EfficientNet-B0 model
    model = efficientnet_b0(weights='IMAGENET1K_V1')  # downloads the ImageNet-pretrained weights
    # Replace the classifier head (originally 1000 classes)
    model.classifier[1] = nn.Linear(model.classifier[1].in_features, config.num_classes)

    model.to(device)

    # Optional: freeze the backbone (not frozen here; the whole model is fine-tuned)
    # for param in model.features.parameters():
    #     param.requires_grad = False

    criterion = nn.CrossEntropyLoss()
    optimizer = SGD(
        model.parameters(),
        lr=config.lr_max,
        momentum=config.momentum,
        weight_decay=config.weight_decay
    )

    total_steps = len(train_loader) * config.epochs
    # Add warmup (5 epochs here)
    warmup_steps = len(train_loader) * 5
    lr_lambda = build_lr_lambda(total_steps, config.lr_max, warmup_steps=warmup_steps, decay_type='cosine')
    scheduler = LambdaLR(optimizer, lr_lambda)

    timestamp = time.strftime("%m%d_%H%M", time.localtime())
    os.makedirs(config.save_model_path, exist_ok=True)

    model.train()
    for epoch in range(config.epochs):
        epoch_start = time.time()
        running_loss = 0.0
        for batch_idx, (inputs, targets) in enumerate(train_loader):
            inputs, targets = inputs.to(device), targets.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()
            scheduler.step()
            running_loss += loss.item()

        epoch_loss = running_loss / len(train_loader)
        epoch_time = time.time() - epoch_start
        print(f"Epoch [{epoch + 1}/{config.epochs}], Time: {epoch_time:.2f}s, Avg Loss: {epoch_loss:.4f}")

        # Save a checkpoint after every epoch
        save_path = os.path.join(config.save_model_path, f"{timestamp}_{epoch + 1:03d}.pth")
        torch.save(model.state_dict(), save_path)
        print(f"Model saved to {save_path}")


if __name__ == "__main__":
    main()

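Improvements

  • Use torchvision.models.efficientnet_b0(weights='IMAGENET1K_V1') to load the ImageNet-pretrained weights automatically;
  • Replace the classification head with a 26-class layer;
  • Add a 5-epoch learning-rate warmup;
  • Adjust batch_size, epochs, and lr_max.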
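
To make the warmup concrete, the sketch below evaluates the same multiplier formula as build_lr_lambda at a few steps. The value steps_per_epoch = 40 is an assumption chosen for illustration (the real value depends on the size of the training split); epochs=100, the 5 warmup epochs, and lr_max=0.003 come from the configuration above.

```python
import math


def lr_multiplier(step, total_steps, warmup_steps):
    """Linear warmup followed by cosine decay, mirroring build_lr_lambda."""
    if step < warmup_steps:
        return (step + 1) / warmup_steps
    progress = (step - warmup_steps) / (total_steps - warmup_steps)
    return 0.5 * (1 + math.cos(math.pi * progress))


steps_per_epoch = 40  # assumed for illustration; depends on the training split
total_steps = steps_per_epoch * 100   # epochs = 100
warmup_steps = steps_per_epoch * 5    # 5 warmup epochs
lr_max = 0.003

# First step, end of warmup, midpoint of the cosine phase, last step.
midpoint = warmup_steps + (total_steps - warmup_steps) // 2
for step in (0, warmup_steps - 1, midpoint, total_steps - 1):
    lr = lr_max * lr_multiplier(step, total_steps, warmup_steps)
    print(f"step {step:4d}: lr = {lr:.6f}")
```

The learning rate starts near zero, reaches lr_max at the end of warmup, falls to half of lr_max at the midpoint of the cosine phase, and approaches zero at the final step.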

Training and test results

The checkpoint from epoch 80 (1227_1824_080.pth) reached a test accuracy of 97.69% (127/130).

# main.py: inference code for testing
import os
import torch
import torch.nn as nn
from torchvision import transforms
from torchvision.models import efficientnet_b0  # torchvision's EfficientNet-B0
from PIL import Image
import numpy as np

# Garbage class names (order must match training)
CLASS_NAMES = [
    'Plastic Bottle', 'Hats', 'Newspaper', 'Cans', 'Glassware', 'Glass Bottle',
    'Cardboard', 'Basketball', 'Paper', 'Metalware', 'Disposable Chopsticks',
    'Lighter', 'Broom', 'Old Mirror', 'Toothbrush', 'Dirty Cloth',
    'Seashell', 'Ceramic Bowl', 'Paint bucket', 'Battery', 'Fluorescent lamp',
    'Tablet capsules', 'Orange Peel', 'Vegetable Leaf', 'Eggshell', 'Banana Peel'
]

NUM_CLASSES = len(CLASS_NAMES)

def predict(image):
    """
    Load the fine-tuned EfficientNet-B0 model and classify the input image.
    :param image: PIL Image or np.ndarray (H, W, C), dtype=uint8
    :return: str, the predicted class name (one of the 26 classes)
    """
    # Convert to a PIL Image
    if isinstance(image, np.ndarray):
        image = Image.fromarray(image.astype('uint8'), 'RGB')
    elif not isinstance(image, Image.Image):
        raise TypeError("Input must be a PIL Image or numpy.ndarray")

    # Preprocessing (consistent with training: EfficientNet uses the same ImageNet preprocessing as MobileNet)
    eval_size = int(224 / 0.875)  # 256
    transform = transforms.Compose([
        transforms.Resize(eval_size),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

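    # Build the EfficientNet-B0 architecture
    model = efficientnet_b0(weights=None)  # no pretrained weights; the fine-tuned weights are loaded below
    # Replace the classifier head with a 26-class layer
    model.classifier[1] = nn.Linear(model.classifier[1].in_features, NUM_CLASSES)

    # Load the fine-tuned weights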
    model_path = "./results/torch/1227_1824_080.pth"
    if not os.path.exists(model_path):
        raise FileNotFoundError(f"Model file not found: {model_path}")
    
    state_dict = torch.load(model_path, map_location=device, weights_only=True)
    model.load_state_dict(state_dict)
    model.to(device)
    model.eval()

    # Inference
    input_tensor = transform(image).unsqueeze(0).to(device)
    with torch.no_grad():
        output = model(input_tensor)
        pred_idx = torch.argmax(output, dim=1).item()

    return CLASS_NAMES[pred_idx]

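Note: no further optimization was attempted here, but performance could likely be improved with a stronger model (e.g. EfficientNet-B6 or ConvNeXt), richer data augmentation (AutoAugment, Mixup), and finer hyperparameter tuning.

[Figure: final garbage classification test result]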

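3 Financial Anomaly Detection

This experiment applies graph neural networks (GNNs) to the task of identifying fraudulent transactions.

A three-layer graph neural network based on SAGEConv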

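This experiment implements a GraphSAGE model with torch_geometric.nn.SAGEConv. GraphSAGE updates each node's representation by aggregating information from its neighbors, which makes it suitable for large-scale graph data.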
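
The aggregation idea can be shown on a toy graph. The sketch below is a deliberately simplified, pure-Python version of one mean-aggregation layer: features are scalars, and the two weights w_self and w_neigh are fixed numbers standing in for the learned weight matrices of a real layer. The function name and the toy graph are illustrative and do not come from the experiment.

```python
def mean_aggregate(features, neighbors, w_self, w_neigh):
    """One GraphSAGE-style update with mean aggregation on scalar features.

    features:  dict mapping node -> float
    neighbors: dict mapping node -> list of nodes whose features it receives
    """
    updated = {}
    for node, h in features.items():
        nbrs = neighbors.get(node, [])
        # Mean of the neighbor features; an isolated node aggregates to 0.
        nbr_mean = sum(features[n] for n in nbrs) / len(nbrs) if nbrs else 0.0
        # Combine the node's own feature with the aggregated neighborhood.
        updated[node] = w_self * h + w_neigh * nbr_mean
    return updated


# Toy graph: node 0 hears from 1 and 2, node 1 hears from 0, node 2 is isolated.
features = {0: 1.0, 1: 3.0, 2: 5.0}
neighbors = {0: [1, 2], 1: [0], 2: []}
out = mean_aggregate(features, neighbors, w_self=0.5, w_neigh=2.0)
print(out)
```

Stacking three such layers, as the model in this section does, lets each node's output depend on nodes up to three hops away.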

The model structure is as follows:

class SAGE(nn.Module):
    def __init__(self, in_feats, h_feats, num_classes):
        super(SAGE, self).__init__()
        self.conv1 = SAGEConv(in_feats, h_feats)
        self.conv2 = SAGEConv(h_feats, h_feats)
        self.conv3 = SAGEConv(h_feats, num_classes)
        self.dropout = nn.Dropout(0.5)

    def forward(self, x, edge_index):
        h = self.conv1(x, edge_index)
        h = F.relu(self.dropout(h))
        h = self.conv2(h, edge_index)
        h = F.relu(self.dropout(h))
        h = self.conv3(h, edge_index)
        return h

    def reset(self):
        self.conv1.reset_parameters()
        self.conv2.reset_parameters()
        self.conv3.reset_parameters()

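  • Three layers: each layer is a SAGEConv, with the hidden dimension set to 128;
  • Activation and regularization: the first two layers are followed by Dropout(0.5) and ReLU to reduce overfitting;
  • Output layer: the last layer outputs num_classes=2 logits directly (no softmax), which are passed to the loss function;
  • Full-graph inference: after training, a forward pass is run over all nodes of the graph and the softmax probabilities are saved (see the end of the main script):
    y_pred_all = torch.softmax(out, dim=1).cpu()
    torch.save(y_pred_all, pred_save_dir)

    This lets the predict function look up a node's prediction directly by node ID.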

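Hyperparameter tuning and convergence monitoring

The following strategies were used to monitor the model during training:

  • Optimizer: Adam with an initial learning rate of lr=0.01;
  • Learning-rate schedule: ReduceLROnPlateau (patience=6, factor=0.5), which halves the learning rate when the validation loss stops decreasing for more than 6 consecutive epochs;
  • Evaluation metrics: the training loss and the train/validation AUC are printed every 5 epochs.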
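
The plateau rule can be simulated without PyTorch. The sketch below is a simplified model of the scheduler's bookkeeping, assuming mode='min' and ignoring the improvement threshold and cooldown options of the real ReduceLROnPlateau; the function name and the loss sequence are illustrative. With patience=6, six stagnant epochs are tolerated and the reduction happens on the seventh.

```python
def plateau_schedule(valid_losses, lr=0.01, factor=0.5, patience=6):
    """Return the learning rate in effect after each epoch's validation loss.

    Simplified plateau logic: no improvement threshold, no cooldown.
    """
    best = float("inf")
    bad_epochs = 0
    history = []
    for loss in valid_losses:
        if loss < best:
            best = loss
            bad_epochs = 0
        else:
            bad_epochs += 1
        # Reduce only once the stagnation exceeds the patience.
        if bad_epochs > patience:
            lr *= factor
            bad_epochs = 0
        history.append(lr)
    return history


# One improving epoch followed by seven epochs with no improvement.
history = plateau_schedule([1.0] + [1.0] * 7)
print(history)
```

In this run the rate stays at 0.01 through the sixth stagnant epoch and drops to 0.005 on the seventh.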

The core of the training loop is as follows:

for epoch in range(100):
    model.train()
    out = model(features, edge_index)
    loss = loss_fn(out[data.train_mask], labels[data.train_mask])

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    # Validation: rerun the forward pass in eval mode so dropout is disabled
    model.eval()
    with torch.no_grad():
        out = model(features, edge_index)
        losses = {}
        eval_results = {}
        for key in ['train', 'valid']:
            node_id = split_idx[key]
            y_pred = torch.softmax(out[node_id], dim=1)  # [N, 2]
            losses[key] = loss_fn(out[node_id], labels[node_id]).item()
            eval_results[key] = evaluator.eval(labels[node_id], y_pred)['auc']

        scheduler.step(losses['valid'])

        train_eval, valid_eval = eval_results['train'], eval_results['valid']
        train_loss, valid_loss = losses['train'], losses['valid']

        if valid_loss < min_valid_loss:
            min_valid_loss = valid_loss
            torch.save(model.state_dict(), save_path)

        if epoch % 5 == 0:
            print(f'Epoch: {epoch:02d}, '
                  f'Loss: {loss:.4f}, '
                  f'Train AUC: {100 * train_eval:.3f}, '
                  f'Valid AUC: {100 * valid_eval:.3f}')

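Convergence can be judged by whether the validation AUC keeps rising while the validation loss keeps falling; a training AUC far above the validation AUC suggests overfitting.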
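
For readers unfamiliar with the metric, AUC is the probability that a randomly chosen positive sample receives a higher score than a randomly chosen negative one, with ties counted as one half. The sketch below computes it by direct pair counting. It is an illustration of the definition only: the Evaluator class used in this report is not shown, so this is not its implementation, and the quadratic loop would be too slow for a graph of this size.

```python
def roc_auc(labels, scores):
    """AUC by pair counting: P(score of a positive > score of a negative)."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    if not pos or not neg:
        raise ValueError("AUC needs at least one positive and one negative sample")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5  # ties count as half a win
    return wins / (len(pos) * len(neg))


# Four samples: the positive scored 0.35 is ranked below the negative scored 0.4.
labels = [0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8]
print(roc_auc(labels, scores))
```

Because AUC depends only on the ranking of scores, it is not inflated by the large share of normal samples, which is why it is a common choice for imbalanced fraud data.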

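Weighted cross-entropy loss for class imbalance

Dataset statistics show that fraud accounts for only 1.2655% of the samples, a typical highly imbalanced classification problem. With plain cross-entropy, the model leans toward predicting the majority class (normal), so it identifies fraud samples very poorly.

Class weights are therefore introduced into the loss function: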

# Class weights (inversely proportional to class frequency)
weight = torch.tensor([1.0, len(data.y[data.y == 0]) / len(data.y[data.y == 1])], dtype=torch.float)
weight = weight.to(device)
loss_fn = nn.CrossEntropyLoss(weight=weight)
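
A small worked example shows the size and effect of this weight. It assumes that the 1.2655% figure is the fraud share among the nodes labeled 0 or 1, in which case the fraud weight computed above is roughly 78. The pure-Python loss below follows the weighted-mean form (sum of weighted losses divided by the sum of weights) that nn.CrossEntropyLoss uses with its default reduction; the probabilities in the example are made up.

```python
import math


def weighted_cross_entropy(probs, labels, class_weights):
    """Weighted mean of -log p[y], normalized by the sum of the sample weights."""
    total = 0.0
    weight_sum = 0.0
    for p, y in zip(probs, labels):
        w = class_weights[y]
        total += -w * math.log(p[y])
        weight_sum += w
    return total / weight_sum


fraud_ratio = 0.012655  # assumed to be measured over nodes labeled 0 or 1
fraud_weight = (1 - fraud_ratio) / fraud_ratio  # normal count / fraud count

# A model that always outputs "90% normal", scored on one normal and one fraud node.
probs = [[0.9, 0.1], [0.9, 0.1]]
labels = [0, 1]
plain = weighted_cross_entropy(probs, labels, [1.0, 1.0])
weighted = weighted_cross_entropy(probs, labels, [1.0, fraud_weight])
print(f"fraud weight: {fraud_weight:.2f}")
print(f"plain loss: {plain:.4f}, weighted loss: {weighted:.4f}")
```

With the weight applied, the missed fraud node dominates the loss, so a model that always predicts the majority class is penalized much more heavily.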

Core code and results

# train.py
import os
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch_geometric.transforms as T
from torch_geometric.nn import SAGEConv
from utils.dgraphfin import DGraphFin
from utils.utils import prepare_folder
from utils.evaluator import Evaluator

# --------------------------
# Paths and timestamp
# --------------------------
path = './datasets/632d74d4e2843a53167ee9a1-momodel/'
timestamp = time.strftime("%m%d-%H%M", time.localtime())
save_dir = f'./results/{timestamp}.pt'
pred_save_dir = f'./results/{timestamp}pred.pt'
os.makedirs('./results', exist_ok=True)

# --------------------------
# 设备设置
# --------------------------
device = 0
device = f'cuda:{device}' if torch.cuda.is_available() else 'cpu'
device = torch.device(device)

# --------------------------
# Data loading and preprocessing
# --------------------------
dataset_name = 'DGraph'
dataset = DGraphFin(
    root=path,
    name=dataset_name,
    transform=T.ToSparseTensor(remove_edge_index=False)
)
data = dataset[0]

# Predict only class 0 (normal) and class 1 (fraud)
nlabels = 2

# Standardize node features
x = data.x
x = (x - x.mean(0)) / x.std(0)
data.x = x

# Make sure the labels are 1-D
if data.y.dim() == 2:
    data.y = data.y.squeeze(1)

# Split masks
split_idx = {
    'train': data.train_mask,
    'valid': data.valid_mask,
    'test': data.test_mask
}

print(data)
print(f"x shape: {data.x.shape}")
print(f"y shape: {data.y.shape}")
print(f"adj_t type: {type(data.adj_t)}")

# --------------------------
# Model definition
# --------------------------
class SAGE(nn.Module):
    def __init__(self, in_feats, h_feats, num_classes):
        super(SAGE, self).__init__()
        self.conv1 = SAGEConv(in_feats, h_feats)
        self.conv2 = SAGEConv(h_feats, h_feats)
        self.conv3 = SAGEConv(h_feats, num_classes)
        self.dropout = nn.Dropout(0.5)

    def forward(self, x, edge_index):
        h = self.conv1(x, edge_index)
        h = F.relu(self.dropout(h))
        h = self.conv2(h, edge_index)
        h = F.relu(self.dropout(h))
        h = self.conv3(h, edge_index)
        return h

    def reset(self):
        self.conv1.reset_parameters()
        self.conv2.reset_parameters()
        self.conv3.reset_parameters()

# --------------------------
# Training function
# --------------------------
def train(data, model, save_path):
    model.reset()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode='min', patience=6, factor=0.5, verbose=True
    )
    # Class weights (inversely proportional to class frequency)
    weight = torch.tensor([1.0, len(data.y[data.y == 0]) / len(data.y[data.y == 1])], dtype=torch.float)
    weight = weight.to(device)
    
    loss_fn = nn.CrossEntropyLoss(weight=weight)
    min_valid_loss = float('inf')

    features = data.x.to(device)
    labels = data.y.to(device)
    edge_index = data.edge_index.to(device)
    evaluator = Evaluator('auc')

    for epoch in range(100):
        model.train()
        out = model(features, edge_index)
        loss = loss_fn(out[data.train_mask], labels[data.train_mask])

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Validation: rerun the forward pass in eval mode so dropout is disabled
        model.eval()
        with torch.no_grad():
            out = model(features, edge_index)
            losses = {}
            eval_results = {}
            for key in ['train', 'valid']:
                node_id = split_idx[key]
                y_pred = torch.softmax(out[node_id], dim=1)  # [N, 2]
                losses[key] = loss_fn(out[node_id], labels[node_id]).item()
                eval_results[key] = evaluator.eval(labels[node_id], y_pred)['auc']

            scheduler.step(losses['valid'])

            train_eval, valid_eval = eval_results['train'], eval_results['valid']
            train_loss, valid_loss = losses['train'], losses['valid']

            if valid_loss < min_valid_loss:
                min_valid_loss = valid_loss
                torch.save(model.state_dict(), save_path)

            if epoch % 5 == 0:
                print(f'Epoch: {epoch:02d}, '
                      f'Loss: {loss:.4f}, '
                      f'Train AUC: {100 * train_eval:.3f}, '
                      f'Valid AUC: {100 * valid_eval:.3f}')

# --------------------------
# Run training and save the predictions
# --------------------------
if __name__ == "__main__":
    model = SAGE(in_feats=data.x.size(-1), h_feats=128, num_classes=nlabels).to(device)
    train(data, model, save_dir)

    # Predict and save results for all nodes (used later by predict)
    model.load_state_dict(torch.load(save_dir, map_location=device))
    model.eval()
    with torch.no_grad():
        out = model(data.x.to(device), data.edge_index.to(device))
        y_pred_all = torch.softmax(out, dim=1).cpu()  # [N, 2] probabilities
        torch.save(y_pred_all, pred_save_dir)
        
    print(f"Model saved to {save_dir}")
    print(f"Prediction saved to {pred_save_dir}")


# main.py
import torch

# Load the precomputed predictions here
pred = torch.load('./results/1228-0037pred.pt', map_location=torch.device('cpu'))
def predict(data,node_id):
    y_pred = pred[node_id]              # look up the result by node index
    return y_pred

The final evaluation result is shown in the figure below:

[Figure: final result of financial anomaly detection]