隐私计算实战:我们如何用同态加密搭建多方征信系统
去年接了个"烫手山芋"项目——几家银行想联合做征信,共享黑名单数据,但谁都不愿意把自己的数据明文给别人。法务部门的要求更是苛刻:数据不出行、可用不可见、过程可审计。当时我心想,这不是要上天吗?
经过大半年的摸索和踩坑,我们最终用同态加密、模型保护和联盟链技术搭建了一套多方隐私计算平台。今天把这段经历分享出来,希望对做隐私计算的同学有所帮助。
一、同态加密计算:在密文上做运算的黑魔法
需求背景
先说说最核心的需求。三家银行(暂且叫A、B、C银行)各自有一批信贷黑名单数据,想要计算三个关键指标:
- 某用户在多少家银行有不良记录
- 用户的综合风险评分
- 行业整体的违约率统计
但是,谁都不能看到其他银行的原始数据。这就是同态加密要解决的问题。
技术选型的纠结
一开始我们对比了好几种方案:
| 方案 | 计算能力 | 性能 | 实现难度 | 安全性 | 最终选择 |
|---|---|---|---|---|---|
| 安全多方计算(MPC) | 通用 | 慢 | 高 | 高 | ❌ |
| 同态加密(HE) | 受限 | 很慢 | 极高 | 极高 | ✅ |
| 可信执行环境(TEE) | 通用 | 快 | 中 | 中 | 部分场景✅ |
| 差分隐私 | 统计 | 快 | 低 | 中 | 辅助使用 |
最后选择了同态加密为主,TEE为辅的方案。主要是因为监管对数据安全要求极高,宁愿慢一点也要绝对安全。
同态加密的实现细节
我们用的是微软的SEAL库,支持BFV和CKKS两种方案。先看一个简单的例子:
// 初始化同态加密环境
#include "seal/seal.h"
using namespace seal;
class HomomorphicComputer {
private:
EncryptionParameters parms;
SEALContext context;
KeyGenerator keygen;
PublicKey public_key;
SecretKey secret_key;
Encryptor encryptor;
Evaluator evaluator;
Decryptor decryptor;
public:
HomomorphicComputer() : parms(scheme_type::bfv) {
// 设置多项式模数度
size_t poly_modulus_degree = 4096;
parms.set_poly_modulus_degree(poly_modulus_degree);
// 设置系数模数
parms.set_coeff_modulus(CoeffModulus::BFVDefault(poly_modulus_degree));
// 设置明文模数
parms.set_plain_modulus(1024);
// 创建context
context = SEALContext(parms);
// 生成密钥
keygen = KeyGenerator(context);
keygen.create_public_key(public_key);
secret_key = keygen.secret_key();
// 初始化加密器、计算器、解密器
encryptor = Encryptor(context, public_key);
evaluator = Evaluator(context);
decryptor = Decryptor(context, secret_key);
}
// 加密整数
Ciphertext encrypt(int value) {
Plaintext plain(to_string(value));
Ciphertext encrypted;
encryptor.encrypt(plain, encrypted);
return encrypted;
}
// 同态加法
Ciphertext add(const Ciphertext &a, const Ciphertext &b) {
Ciphertext result;
evaluator.add(a, b, result);
return result;
}
// 同态乘法
Ciphertext multiply(const Ciphertext &a, const Ciphertext &b) {
Ciphertext result;
evaluator.multiply(a, b, result);
evaluator.relinearize_inplace(result, keygen.create_relin_keys());
return result;
}
};
但实际业务比这复杂得多。比如计算用户的综合风险评分:
# 风险评分计算(简化版)
class PrivacyRiskScoring:
def __init__(self, he_context):
self.he = he_context
self.weights = {
'overdue_count': 0.3,
'overdue_amount': 0.4,
'query_frequency': 0.2,
'account_age': 0.1
}
def compute_risk_score(self, encrypted_features):
"""在密文上计算风险评分"""
# 所有银行的数据都是加密的
bank_a_data = encrypted_features['bank_a']
bank_b_data = encrypted_features['bank_b']
bank_c_data = encrypted_features['bank_c']
# 同态计算:逾期次数总和
total_overdue = self.he.add(
self.he.add(bank_a_data['overdue_count'],
bank_b_data['overdue_count']),
bank_c_data['overdue_count']
)
# 同态计算:加权评分
score = self.he.multiply_plain(total_overdue,
int(self.weights['overdue_count'] * 1000))
# 注意:这里的score仍然是密文!
return score
性能优化的血泪史
刚开始跑的时候,计算1000个用户的风险评分要花2个小时!这谁能接受?于是开始了漫长的优化之旅:
| 优化措施 | 实施前 | 实施后 | 提升 | 代价 |
|---|---|---|---|---|
| 批处理(Batching) | 7200s | 1800s | 75% | 内存增加4倍 |
| 参数调优 | 1800s | 900s | 50% | 安全性略降 |
| GPU加速 | 900s | 180s | 80% | 需要GPU服务器 |
| 混合计算 | 180s | 45s | 75% | 架构复杂 |
最关键的优化是批处理,把多个数据打包在一起计算:
// 批处理优化
class BatchedHomomorphic {
BatchEncoder batch_encoder;
Ciphertext encrypt_batch(const vector<int64_t> &values) {
Plaintext plain;
batch_encoder.encode(values, plain);
Ciphertext encrypted;
encryptor.encrypt(plain, encrypted);
return encrypted;
}
// 一次计算多个用户的评分
Ciphertext compute_batch_scores(const vector<Ciphertext> &features) {
size_t slot_count = batch_encoder.slot_count();
// 并行计算slot_count个用户
// ...
}
};
二、模型逆向工程防护:保护我们的"独门秘方"
模型泄露的风险
在做风控模型时,我们遇到了另一个问题:每家银行都有自己的风控模型,这是核心竞争力。但在联合建模时,模型参数可能被其他参与方逆向推导出来。
我们做了个实验,仅通过1万次API调用,就能以85%的准确率复现出对方的决策树模型。这太可怕了!
防护方案设计
经过研究,我们设计了多层防护体系:
class ModelProtectionFramework:
def __init__(self, original_model):
self.model = original_model
self.noise_generator = DifferentialPrivacy(epsilon=0.1)
self.query_monitor = QueryMonitor()
self.watermark = ModelWatermark()
def protected_predict(self, input_data, user_id):
# 1. 查询频率检测
if self.query_monitor.is_suspicious(user_id):
return self._random_response()
# 2. 输入扰动
perturbed_input = self._add_input_noise(input_data)
# 3. 模型预测
raw_output = self.model.predict(perturbed_input)
# 4. 输出扰动
noisy_output = self.noise_generator.add_noise(raw_output)
# 5. 嵌入水印
watermarked_output = self.watermark.embed(noisy_output, user_id)
# 6. 记录查询
self.query_monitor.log_query(user_id, input_data, watermarked_output)
return watermarked_output
def _add_input_noise(self, data):
"""添加对抗性扰动,不影响正常预测但能干扰逆向"""
epsilon = 0.01
noise = np.random.laplace(0, epsilon, data.shape)
return data + noise
查询模式检测
最有意思的是查询模式检测。我们发现正常用户和恶意逆向的查询模式差异很大:
| 行为特征 | 正常用户 | 逆向工程攻击 | 检测阈值 |
|---|---|---|---|
| 查询频率 | 10-50次/天 | >1000次/天 | 100次/天 |
| 输入分布 | 符合业务分布 | 均匀/网格分布 | KL散度>0.5 |
| 时间模式 | 工作时间 | 24小时持续 | 夜间查询>30% |
| 相似查询 | <5% | >40% | 20% |
class QueryMonitor:
def __init__(self, window_size=3600): # 1小时窗口
self.query_history = defaultdict(list)
self.window_size = window_size
def is_suspicious(self, user_id):
history = self.query_history[user_id]
# 频率检测
recent_queries = self._get_recent_queries(history)
if len(recent_queries) > 100:
return True
# 分布检测
if self._check_distribution_anomaly(recent_queries):
return True
# 相似性检测
if self._check_similarity(recent_queries) > 0.4:
return True
return False
def _check_distribution_anomaly(self, queries):
"""检测输入分布是否异常"""
if len(queries) < 20:
return False
# 提取所有查询的特征
features = [q['features'] for q in queries]
# 计算特征的分布
hist, edges = np.histogramdd(features, bins=10)
hist = hist.flatten() + 1e-10
hist = hist / hist.sum()
# 均匀分布的熵
uniform = np.ones_like(hist) / len(hist)
# KL散度
kl_div = np.sum(hist * np.log(hist / uniform))
return kl_div < 0.5 # 太接近均匀分布
三、隐私计算联盟链:让一切都可审计
为什么需要联盟链
光有加密计算还不够,监管要求所有的计算过程都要可审计、可追溯。而且参与方之间需要一个去中心化的协作机制。于是我们搭建了一条隐私计算联盟链。
链上链下协同架构
┌─────────────────────────────────────────────────┐
│ 应用层 │
│ 风控决策系统 │ 征信查询API │ 监管审计接口 │
└─────────────────┬───────────────────────────────┘
│
┌─────────────────┴───────────────────────────────┐
│ 链上(联盟链) │
│ · 任务发布与认领 │
│ · 计算证明存储 │
│ · 结果哈希记录 │
│ · 激励结算 │
└─────────────────┬───────────────────────────────┘
│
┌─────────────────┴───────────────────────────────┐
│ 链下(隐私计算) │
│ · 同态加密计算 │
│ · 安全多方计算 │
│ · TEE可信计算 │
└─────────────────────────────────────────────────┘
智能合约设计
核心合约负责任务调度和结果验证:
pragma solidity ^0.8.0;
contract PrivacyComputingPlatform {
struct ComputeTask {
uint256 taskId;
address requester;
string taskType; // "risk_score", "blacklist_check", etc
bytes32 inputHash; // 输入数据的哈希
uint256 deadline;
uint256 reward;
TaskStatus status;
mapping(address => bytes32) participantCommitments;
mapping(address => bool) hasSubmitted;
bytes32 finalResult;
}
enum TaskStatus { Created, InProgress, Completed, Disputed }
mapping(uint256 => ComputeTask) public tasks;
mapping(address => bool) public authorizedNodes;
mapping(address => uint256) public nodeReputation;
event TaskCreated(uint256 taskId, address requester, string taskType);
event ResultSubmitted(uint256 taskId, address node, bytes32 resultHash);
event TaskCompleted(uint256 taskId, bytes32 finalResult);
modifier onlyAuthorized() {
require(authorizedNodes[msg.sender], "Not authorized");
_;
}
function createTask(
string memory taskType,
bytes32 inputHash,
uint256 deadline
) public payable returns (uint256) {
require(msg.value > 0, "Must provide reward");
uint256 taskId = uint256(keccak256(abi.encodePacked(
block.timestamp, msg.sender, inputHash
)));
ComputeTask storage task = tasks[taskId];
task.taskId = taskId;
task.requester = msg.sender;
task.taskType = taskType;
task.inputHash = inputHash;
task.deadline = deadline;
task.reward = msg.value;
task.status = TaskStatus.Created;
emit TaskCreated(taskId, msg.sender, taskType);
return taskId;
}
function submitResult(
uint256 taskId,
bytes32 resultHash,
bytes memory proof
) public onlyAuthorized {
ComputeTask storage task = tasks[taskId];
require(task.status == TaskStatus.InProgress, "Invalid status");
require(block.timestamp < task.deadline, "Past deadline");
require(!task.hasSubmitted[msg.sender], "Already submitted");
// 验证计算证明
require(verifyComputation(task, resultHash, proof), "Invalid proof");
task.participantCommitments[msg.sender] = resultHash;
task.hasSubmitted[msg.sender] = true;
emit ResultSubmitted(taskId, msg.sender, resultHash);
// 检查是否达到共识
if (checkConsensus(taskId)) {
finalizeTask(taskId);
}
}
function verifyComputation(
ComputeTask storage task,
bytes32 resultHash,
bytes memory proof
) private view returns (bool) {
// 这里简化了,实际要验证零知识证明
// 比如验证同态加密的计算正确性
return true;
}
}
实际运行效果
部署半年来的运行数据:
| 指标 | 数值 | 说明 |
|---|---|---|
| 总计算任务 | 128,459 | 日均700+ |
| 参与节点 | 12 | 3家银行×4个节点 |
| 平均响应时间 | 3.2秒 | 包含链上确认 |
| 争议率 | 0.02% | 26次争议 |
| 链上存储 | 18GB | 仅存储哈希和证明 |
| Gas成本 | ¥0.13/任务 | 使用联盟链,成本可控 |
踩坑心得
1. 同态加密的坑
最大的坑是参数选择。一开始为了安全,我们把多项式度设到8192,结果一个简单的乘法都要算10秒。后来请教了密码学专家,才知道4096对我们的场景已经足够安全了。
还有就是噪声增长问题。同态运算会累积噪声,运算链太长会导致解密失败:
# 错误示例:连续乘法导致噪声爆炸
result = encrypted_a
for i in range(10):
result = he.multiply(result, encrypted_b) # 噪声指数增长!
# 正确做法:重新设计算法,减少乘法深度
result = he.power(encrypted_a, 10) # 使用快速幂算法
2. 模型保护的平衡
保护过度会影响正常业务。我们一开始把查询限制设得太严,结果批量查询的正常业务都被拦截了。后来加入了白名单机制和业务场景识别。
3. 联盟链的共识
不要迷信复杂的共识算法。我们最开始想用PBFT,结果发现12个节点的场景下,简单的多数投票就够了,还更稳定。
未来展望
这套系统运行半年多了,基本稳定。下一步的计划:
- 性能提升:正在研究基于FPGA的同态加密加速,预计能提速10倍
- 功能扩展:准备支持更复杂的机器学习算法,比如联邦学习
- 标准化:正在和其他同业一起制定隐私计算的行业标准
隐私计算这个领域真的是既有挑战又有前景。虽然现在性能还不如明文计算,但是在数据要素流通的大背景下,"可用不可见"的需求会越来越多。
如果你也在做隐私计算,欢迎交流。特别是同态加密的工程优化,还有很多坑等着我们一起填呢!
DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐


所有评论(0)