去年接了个"烫手山芋"项目——几家银行想联合做征信,共享黑名单数据,但谁都不愿意把自己的数据明文给别人。法务部门的要求更是苛刻:数据不出行、可用不可见、过程可审计。当时我心想,这不是要上天吗?

经过大半年的摸索和踩坑,我们最终用同态加密、模型保护和联盟链技术搭建了一套多方隐私计算平台。今天把这段经历分享出来,希望对做隐私计算的同学有所帮助。

一、同态加密计算:在密文上做运算的黑魔法

需求背景

先说说最核心的需求。三家银行(暂且叫A、B、C银行)各自有一批信贷黑名单数据,想要计算三个关键指标:

  1. 某用户在多少家银行有不良记录
  2. 用户的综合风险评分
  3. 行业整体的违约率统计

但是,谁都不能看到其他银行的原始数据。这就是同态加密要解决的问题。

技术选型的纠结

一开始我们对比了好几种方案:

方案 计算能力 性能 实现难度 安全性 最终选择
安全多方计算(MPC) 通用
同态加密(HE) 受限 很慢 极高 极高
可信执行环境(TEE) 通用 部分场景✅
差分隐私 统计 辅助使用

最后选择了同态加密为主,TEE为辅的方案。主要是因为监管对数据安全要求极高,宁愿慢一点也要绝对安全。

同态加密的实现细节

我们用的是微软的SEAL库,支持BFV和CKKS两种方案。先看一个简单的例子:

// 初始化同态加密环境
#include "seal/seal.h"
using namespace seal;

class HomomorphicComputer {
private:
    EncryptionParameters parms;
    SEALContext context;
    KeyGenerator keygen;
    PublicKey public_key;
    SecretKey secret_key;
    Encryptor encryptor;
    Evaluator evaluator;
    Decryptor decryptor;
    
public:
    HomomorphicComputer() : parms(scheme_type::bfv) {
        // 设置多项式模数度
        size_t poly_modulus_degree = 4096;
        parms.set_poly_modulus_degree(poly_modulus_degree);
        
        // 设置系数模数
        parms.set_coeff_modulus(CoeffModulus::BFVDefault(poly_modulus_degree));
        
        // 设置明文模数
        parms.set_plain_modulus(1024);
        
        // 创建context
        context = SEALContext(parms);
        
        // 生成密钥
        keygen = KeyGenerator(context);
        keygen.create_public_key(public_key);
        secret_key = keygen.secret_key();
        
        // 初始化加密器、计算器、解密器
        encryptor = Encryptor(context, public_key);
        evaluator = Evaluator(context);
        decryptor = Decryptor(context, secret_key);
    }
    
    // 加密整数
    Ciphertext encrypt(int value) {
        Plaintext plain(to_string(value));
        Ciphertext encrypted;
        encryptor.encrypt(plain, encrypted);
        return encrypted;
    }
    
    // 同态加法
    Ciphertext add(const Ciphertext &a, const Ciphertext &b) {
        Ciphertext result;
        evaluator.add(a, b, result);
        return result;
    }
    
    // 同态乘法
    Ciphertext multiply(const Ciphertext &a, const Ciphertext &b) {
        Ciphertext result;
        evaluator.multiply(a, b, result);
        evaluator.relinearize_inplace(result, keygen.create_relin_keys());
        return result;
    }
};

但实际业务比这复杂得多。比如计算用户的综合风险评分:

# 风险评分计算(简化版)
class PrivacyRiskScoring:
    def __init__(self, he_context):
        self.he = he_context
        self.weights = {
            'overdue_count': 0.3,
            'overdue_amount': 0.4,
            'query_frequency': 0.2,
            'account_age': 0.1
        }
    
    def compute_risk_score(self, encrypted_features):
        """在密文上计算风险评分"""
        # 所有银行的数据都是加密的
        bank_a_data = encrypted_features['bank_a']
        bank_b_data = encrypted_features['bank_b']
        bank_c_data = encrypted_features['bank_c']
        
        # 同态计算:逾期次数总和
        total_overdue = self.he.add(
            self.he.add(bank_a_data['overdue_count'], 
                       bank_b_data['overdue_count']),
            bank_c_data['overdue_count']
        )
        
        # 同态计算:加权评分
        score = self.he.multiply_plain(total_overdue, 
                                     int(self.weights['overdue_count'] * 1000))
        
        # 注意:这里的score仍然是密文!
        return score

性能优化的血泪史

刚开始跑的时候,计算1000个用户的风险评分要花2个小时!这谁能接受?于是开始了漫长的优化之旅:

优化措施 实施前 实施后 提升 代价
批处理(Batching) 7200s 1800s 75% 内存增加4倍
参数调优 1800s 900s 50% 安全性略降
GPU加速 900s 180s 80% 需要GPU服务器
混合计算 180s 45s 75% 架构复杂

最关键的优化是批处理,把多个数据打包在一起计算:

// 批处理优化
class BatchedHomomorphic {
    BatchEncoder batch_encoder;
    
    Ciphertext encrypt_batch(const vector<int64_t> &values) {
        Plaintext plain;
        batch_encoder.encode(values, plain);
        
        Ciphertext encrypted;
        encryptor.encrypt(plain, encrypted);
        return encrypted;
    }
    
    // 一次计算多个用户的评分
    Ciphertext compute_batch_scores(const vector<Ciphertext> &features) {
        size_t slot_count = batch_encoder.slot_count();
        // 并行计算slot_count个用户
        // ...
    }
};

二、模型逆向工程防护:保护我们的"独门秘方"

模型泄露的风险

在做风控模型时,我们遇到了另一个问题:每家银行都有自己的风控模型,这是核心竞争力。但在联合建模时,模型参数可能被其他参与方逆向推导出来。

我们做了个实验,仅通过1万次API调用,就能以85%的准确率复现出对方的决策树模型。这太可怕了!

防护方案设计

经过研究,我们设计了多层防护体系:

class ModelProtectionFramework:
    def __init__(self, original_model):
        self.model = original_model
        self.noise_generator = DifferentialPrivacy(epsilon=0.1)
        self.query_monitor = QueryMonitor()
        self.watermark = ModelWatermark()
        
    def protected_predict(self, input_data, user_id):
        # 1. 查询频率检测
        if self.query_monitor.is_suspicious(user_id):
            return self._random_response()
        
        # 2. 输入扰动
        perturbed_input = self._add_input_noise(input_data)
        
        # 3. 模型预测
        raw_output = self.model.predict(perturbed_input)
        
        # 4. 输出扰动
        noisy_output = self.noise_generator.add_noise(raw_output)
        
        # 5. 嵌入水印
        watermarked_output = self.watermark.embed(noisy_output, user_id)
        
        # 6. 记录查询
        self.query_monitor.log_query(user_id, input_data, watermarked_output)
        
        return watermarked_output
    
    def _add_input_noise(self, data):
        """添加对抗性扰动,不影响正常预测但能干扰逆向"""
        epsilon = 0.01
        noise = np.random.laplace(0, epsilon, data.shape)
        return data + noise

查询模式检测

最有意思的是查询模式检测。我们发现正常用户和恶意逆向的查询模式差异很大:

行为特征 正常用户 逆向工程攻击 检测阈值
查询频率 10-50次/天 >1000次/天 100次/天
输入分布 符合业务分布 均匀/网格分布 KL散度>0.5
时间模式 工作时间 24小时持续 夜间查询>30%
相似查询 <5% >40% 20%
class QueryMonitor:
    def __init__(self, window_size=3600):  # 1小时窗口
        self.query_history = defaultdict(list)
        self.window_size = window_size
        
    def is_suspicious(self, user_id):
        history = self.query_history[user_id]
        
        # 频率检测
        recent_queries = self._get_recent_queries(history)
        if len(recent_queries) > 100:
            return True
        
        # 分布检测
        if self._check_distribution_anomaly(recent_queries):
            return True
        
        # 相似性检测
        if self._check_similarity(recent_queries) > 0.4:
            return True
            
        return False
    
    def _check_distribution_anomaly(self, queries):
        """检测输入分布是否异常"""
        if len(queries) < 20:
            return False
            
        # 提取所有查询的特征
        features = [q['features'] for q in queries]
        
        # 计算特征的分布
        hist, edges = np.histogramdd(features, bins=10)
        hist = hist.flatten() + 1e-10
        hist = hist / hist.sum()
        
        # 均匀分布的熵
        uniform = np.ones_like(hist) / len(hist)
        
        # KL散度
        kl_div = np.sum(hist * np.log(hist / uniform))
        
        return kl_div < 0.5  # 太接近均匀分布

三、隐私计算联盟链:让一切都可审计

为什么需要联盟链

光有加密计算还不够,监管要求所有的计算过程都要可审计、可追溯。而且参与方之间需要一个去中心化的协作机制。于是我们搭建了一条隐私计算联盟链。

链上链下协同架构

┌─────────────────────────────────────────────────┐
│                    应用层                        │
│  风控决策系统 │ 征信查询API │ 监管审计接口      │
└─────────────────┬───────────────────────────────┘
                  │
┌─────────────────┴───────────────────────────────┐
│                 链上(联盟链)                     │
│  · 任务发布与认领                               │
│  · 计算证明存储                                 │
│  · 结果哈希记录                                 │
│  · 激励结算                                     │
└─────────────────┬───────────────────────────────┘
                  │
┌─────────────────┴───────────────────────────────┐
│                 链下(隐私计算)                   │
│  · 同态加密计算                                 │
│  · 安全多方计算                                 │
│  · TEE可信计算                                  │
└─────────────────────────────────────────────────┘

智能合约设计

核心合约负责任务调度和结果验证:

pragma solidity ^0.8.0;

contract PrivacyComputingPlatform {
    struct ComputeTask {
        uint256 taskId;
        address requester;
        string taskType;  // "risk_score", "blacklist_check", etc
        bytes32 inputHash;  // 输入数据的哈希
        uint256 deadline;
        uint256 reward;
        TaskStatus status;
        mapping(address => bytes32) participantCommitments;
        mapping(address => bool) hasSubmitted;
        bytes32 finalResult;
    }
    
    enum TaskStatus { Created, InProgress, Completed, Disputed }
    
    mapping(uint256 => ComputeTask) public tasks;
    mapping(address => bool) public authorizedNodes;
    mapping(address => uint256) public nodeReputation;
    
    event TaskCreated(uint256 taskId, address requester, string taskType);
    event ResultSubmitted(uint256 taskId, address node, bytes32 resultHash);
    event TaskCompleted(uint256 taskId, bytes32 finalResult);
    
    modifier onlyAuthorized() {
        require(authorizedNodes[msg.sender], "Not authorized");
        _;
    }
    
    function createTask(
        string memory taskType,
        bytes32 inputHash,
        uint256 deadline
    ) public payable returns (uint256) {
        require(msg.value > 0, "Must provide reward");
        
        uint256 taskId = uint256(keccak256(abi.encodePacked(
            block.timestamp, msg.sender, inputHash
        )));
        
        ComputeTask storage task = tasks[taskId];
        task.taskId = taskId;
        task.requester = msg.sender;
        task.taskType = taskType;
        task.inputHash = inputHash;
        task.deadline = deadline;
        task.reward = msg.value;
        task.status = TaskStatus.Created;
        
        emit TaskCreated(taskId, msg.sender, taskType);
        return taskId;
    }
    
    function submitResult(
        uint256 taskId,
        bytes32 resultHash,
        bytes memory proof
    ) public onlyAuthorized {
        ComputeTask storage task = tasks[taskId];
        require(task.status == TaskStatus.InProgress, "Invalid status");
        require(block.timestamp < task.deadline, "Past deadline");
        require(!task.hasSubmitted[msg.sender], "Already submitted");
        
        // 验证计算证明
        require(verifyComputation(task, resultHash, proof), "Invalid proof");
        
        task.participantCommitments[msg.sender] = resultHash;
        task.hasSubmitted[msg.sender] = true;
        
        emit ResultSubmitted(taskId, msg.sender, resultHash);
        
        // 检查是否达到共识
        if (checkConsensus(taskId)) {
            finalizeTask(taskId);
        }
    }
    
    function verifyComputation(
        ComputeTask storage task,
        bytes32 resultHash,
        bytes memory proof
    ) private view returns (bool) {
        // 这里简化了,实际要验证零知识证明
        // 比如验证同态加密的计算正确性
        return true;
    }
}

实际运行效果

部署半年来的运行数据:

指标 数值 说明
总计算任务 128,459 日均700+
参与节点 12 3家银行×4个节点
平均响应时间 3.2秒 包含链上确认
争议率 0.02% 26次争议
链上存储 18GB 仅存储哈希和证明
Gas成本 ¥0.13/任务 使用联盟链,成本可控

踩坑心得

1. 同态加密的坑

最大的坑是参数选择。一开始为了安全,我们把多项式度设到8192,结果一个简单的乘法都要算10秒。后来请教了密码学专家,才知道4096对我们的场景已经足够安全了。

还有就是噪声增长问题。同态运算会累积噪声,运算链太长会导致解密失败:

# 错误示例:连续乘法导致噪声爆炸
result = encrypted_a
for i in range(10):
    result = he.multiply(result, encrypted_b)  # 噪声指数增长!

# 正确做法:重新设计算法,减少乘法深度
result = he.power(encrypted_a, 10)  # 使用快速幂算法

2. 模型保护的平衡

保护过度会影响正常业务。我们一开始把查询限制设得太严,结果批量查询的正常业务都被拦截了。后来加入了白名单机制和业务场景识别。

3. 联盟链的共识

不要迷信复杂的共识算法。我们最开始想用PBFT,结果发现12个节点的场景下,简单的多数投票就够了,还更稳定。

未来展望

这套系统运行半年多了,基本稳定。下一步的计划:

  1. 性能提升:正在研究基于FPGA的同态加密加速,预计能提速10倍
  2. 功能扩展:准备支持更复杂的机器学习算法,比如联邦学习
  3. 标准化:正在和其他同业一起制定隐私计算的行业标准

隐私计算这个领域真的是既有挑战又有前景。虽然现在性能还不如明文计算,但是在数据要素流通的大背景下,"可用不可见"的需求会越来越多。

如果你也在做隐私计算,欢迎交流。特别是同态加密的工程优化,还有很多坑等着我们一起填呢!

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐