Face Recognition on the RK3588
Preface: I recently needed to deploy a face-recognition password manager on an edge device. The pipeline runs detection and recognition on a face image to produce a feature vector, sends it to a server for comparison, looks up the matching account and password in a database based on the comparison result, and returns them to the edge device to perform the login. Running the detection and recognition weights on the CPU is both slow and power-hungry, so we chose a development board with an NPU.
This is the first post in the series; it covers how to build the server side and how to store the feature vectors and face IDs.
Server Side
1. Dataset Creation
You could capture faces with a camera, encode them on the fly, and save the result directly, but my work is mainly on the edge device and the database is only there for verification, so I first save the images locally and then call a function to encode them and save the encodings as a .npy file.
Create a face_dataset folder. The images are named according to the rule shown below.
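For example, the folder might look like this (file names taken from the examples in encoding.py below; the part before the underscore is the person's ID, and the part after it is an index):

face_dataset/
├── tsy_1.jpg
├── gy_2.jpg
├── shy_3.jpg
└── zh_4.jpg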

2. ONNX Models
Place the previously trained ONNX weight files for face detection and recognition in the designated directory.

The backbone networks and frameworks chosen for the two models are as follows. The two weight files under the model directory:
1. model/RetinaFace_mobile320.onnx — RetinaFace for face detection, with MobileNetV1×0.25 as the backbone
2. model/facenet_mobilenet.onnx — FaceNet for face feature encoding, with MobileNetV1 as the backbone
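To confirm the weights are in place and see what each network expects, you can inspect them with onnxruntime. This is just a sanity-check sketch; the printed shapes should be consistent with the 320×320 detector input and 160×160 encoder input used in the code below:

import onnxruntime as ort

# Print each model's input/output names and shapes to verify the expected layout.
for path in ["model/RetinaFace_mobile320.onnx", "model/facenet_mobilenet.onnx"]:
    sess = ort.InferenceSession(path)
    print(path)
    for inp in sess.get_inputs():
        print("  input :", inp.name, inp.shape)
    for out in sess.get_outputs():
        print("  output:", out.name, out.shape)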
3. Database
Create a model_data folder to store the face feature vectors and names.
4. Code
a. Dataset encoding script: encoding.py
'''
This script encodes the face images in the dataset and saves the results.
Dataset file-name format, e.g.:
tsy_1.jpg
gy_2.jpg
shy_3.jpg
zh_4.jpg
'''
import os
from tqdm import tqdm
import retinaface
import rec
import cv2
import numpy as np

# Collect image paths and derive each person's name from the part before "_".
list_dir = os.listdir("face_dataset")
image_paths = []
names = []
for name in list_dir:
    image_paths.append("face_dataset/" + name)
    names.append(name.split("_")[0])

def encode_face_dataset(image_paths, names):
    embedder_ret = []
    for index, path in enumerate(tqdm(image_paths)):
        image = cv2.imread(path)
        # Detect faces; ret is a list of dicts holding the aligned face crops.
        x1, y1, x2, y2, ret = retinaface.get_faces(image)
        if ret is None:
            return None
        for face in ret:
            # Encode each aligned face into a feature vector.
            embedding = rec.get_feat(face['face'])
            embedder_ret.append(embedding)
    # Save all feature vectors and the corresponding names.
    np.save("model_data/face_encoding.npy", embedder_ret)
    np.save("model_data/names.npy", names)
    return embedder_ret

encode_face_dataset(image_paths, names)
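A quick way to check what was written (a sanity-check sketch; the 128-dimensional embedding size is the one mentioned in the server section, and one face per dataset image is assumed):

import numpy as np

encodings = np.load("model_data/face_encoding.npy")
names = np.load("model_data/names.npy")
# Expect one row per dataset image, e.g. (4, 128) for the four example files.
print(encodings.shape)
print(names)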
b. Face alignment: face_align.py
'''
This code rectifies a face using the facial landmark coordinates produced by
the RetinaFace network, then feeds the aligned crop to FaceNet for encoding.
'''
import cv2
import numpy as np

def affineMatrix(nose, leftEyeCenter, rightEyeCenter, scale=2.5):
    # Build a rotation + translation matrix that levels the eyes and centers
    # the crop on the nose; the crop width is scale * eye distance.
    nose = np.array(nose, dtype=np.float32)
    left_eye = np.array(leftEyeCenter, dtype=np.float32)
    right_eye = np.array(rightEyeCenter, dtype=np.float32)
    eye_width = right_eye - left_eye
    angle = np.arctan2(eye_width[1], eye_width[0])
    center = nose
    alpha = np.cos(angle)
    beta = np.sin(angle)
    w = np.sqrt(np.sum(eye_width**2)) * scale
    m = [[alpha, beta, -alpha * center[0] - beta * center[1] + w * 0.5],
         [-beta, alpha, beta * center[0] - alpha * center[1] + w * 0.5]]
    return np.array(m), (int(w), int(w))

def align(img, nose, leftEyeCenter, rightEyeCenter, target_size=(160, 160)):
    mat, size = affineMatrix(nose, leftEyeCenter, rightEyeCenter)
    img = cv2.warpAffine(img, mat, size)
    img = letterbox_image(img, target_size)
    img = img.astype(np.float32) / 255.0
    return img

def letterbox_image(image, size):
    # Resize while keeping the aspect ratio, padding the borders with gray (128).
    ih, iw, _ = np.shape(image)
    w, h = size
    scale = min(w/iw, h/ih)
    nw = int(iw*scale)
    nh = int(ih*scale)
    image = cv2.resize(image, (nw, nh))
    new_image = np.ones([size[1], size[0], 3]) * 128
    new_image[(h-nh)//2:nh+(h-nh)//2, (w-nw)//2:nw+(w-nw)//2] = image
    return new_image
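A minimal usage sketch with made-up landmark coordinates (the eye and nose points below are hypothetical; in practice they come from get_faces in retinaface.py):

import cv2
import face_align

img = cv2.imread("face_dataset/tsy_1.jpg")
# Hypothetical landmark positions, for illustration only.
aligned = face_align.align(img, nose=(120, 140), leftEyeCenter=(95, 110), rightEyeCenter=(150, 110))
print(aligned.shape, aligned.dtype)  # (160, 160, 3) float32, values in [0, 1]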
c. Core code: get_embedding.py
'''
This is the central code of the face recognition pipeline:
x1, y1, x2, y2, ret = get_faces(image)   # detection
embedding = rec.get_feat(face['face'])   # recognition
'''
from retinaface import get_faces
import rec

def get_embeddings(image):
    x1, y1, x2, y2, ret = get_faces(image)
    # If there is not exactly one face, hand the raw detections back to the caller.
    if len(x1) != 1:
        return x1, y1, x2, y2, ret
    if ret is None:
        return None
    embedder_ret = []
    for face in ret:
        embedding = rec.get_feat(face['face'])
        embedder_ret.append(embedding)
    return x1, y1, x2, y2, embedder_ret
d. Face recognition: rec.py
'''
Face feature encoding.
'''
import cv2
import onnxruntime as ort
import numpy as np

facenet = 'model/facenet_mobilenet.onnx'
facenet = ort.InferenceSession(facenet)

def get_feat(img):
    # BGR -> RGB, then NHWC -> NCHW for the ONNX model.
    img = img[..., ::-1]
    blob = np.expand_dims(img, axis=0)
    blob = np.transpose(blob, (0, 3, 1, 2))
    input_name = facenet.get_inputs()[0].name
    inputs = {input_name: blob}
    outputs = facenet.run(None, inputs)
    net_out = outputs[0][0]
    return net_out
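Given an aligned 160×160 crop from face_align.align, get_feat returns the embedding. A short sketch continuing the alignment example above (the (128,) shape is an expectation based on the 128-dimensional vectors mentioned in the server section):

import cv2
import face_align
import rec

img = cv2.imread("face_dataset/tsy_1.jpg")
# Hypothetical landmarks, as in the alignment example above.
aligned = face_align.align(img, nose=(120, 140), leftEyeCenter=(95, 110), rightEyeCenter=(150, 110))
feat = rec.get_feat(aligned)
print(feat.shape)  # expected: (128,)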
e. Face detection: retinaface.py
'''
Face detection.
'''
import numpy as np
import cv2
from math import ceil
from itertools import product
import face_align
import time
import onnxruntime as ort

retinaface = 'model/RetinaFace_mobile320.onnx'
net = ort.InferenceSession(retinaface)

def letterbox_resize(image, size, bg_color):
    # Resize with preserved aspect ratio, padding the borders with bg_color.
    if isinstance(image, str):
        image = cv2.imread(image)
    target_width, target_height = size
    image_height, image_width, _ = image.shape
    aspect_ratio = min(target_width / image_width, target_height / image_height)
    new_width = int(image_width * aspect_ratio)
    new_height = int(image_height * aspect_ratio)
    image = cv2.resize(image, (new_width, new_height), interpolation=cv2.INTER_AREA)
    result_image = np.ones((target_height, target_width, 3), dtype=np.uint8) * bg_color
    offset_x = (target_width - new_width) // 2
    offset_y = (target_height - new_height) // 2
    result_image[offset_y:offset_y + new_height, offset_x:offset_x + new_width] = image
    return result_image, aspect_ratio, offset_x, offset_y

def PriorBox(image_size):
    # Generate RetinaFace anchors (cx, cy, w, h), normalized to [0, 1].
    anchors = []
    min_sizes = [[16, 32], [64, 128], [256, 512]]
    steps = [8, 16, 32]
    feature_maps = [[ceil(image_size[0] / step), ceil(image_size[1] / step)] for step in steps]
    for k, f in enumerate(feature_maps):
        min_sizes_ = min_sizes[k]
        for i, j in product(range(f[0]), range(f[1])):
            for min_size in min_sizes_:
                s_kx = min_size / image_size[1]
                s_ky = min_size / image_size[0]
                dense_cx = [x * steps[k] / image_size[1] for x in [j + 0.5]]
                dense_cy = [y * steps[k] / image_size[0] for y in [i + 0.5]]
                for cy, cx in product(dense_cy, dense_cx):
                    anchors += [cx, cy, s_kx, s_ky]
    output = np.array(anchors).reshape(-1, 4)
    return output

def box_decode(loc, priors):
    # Decode box regression offsets against the priors into (x1, y1, x2, y2).
    variances = [0.1, 0.2]
    boxes = np.concatenate((
        priors[:, :2] + loc[:, :2] * variances[0] * priors[:, 2:],
        priors[:, 2:] * np.exp(loc[:, 2:] * variances[1])), axis=1)
    boxes[:, :2] -= boxes[:, 2:] / 2
    boxes[:, 2:] += boxes[:, :2]
    return boxes

def decode_landm(pre, priors):
    # Decode the five facial landmark offsets against the priors.
    variances = [0.1, 0.2]
    landmarks = np.concatenate((
        priors[:, :2] + pre[:, :2] * variances[0] * priors[:, 2:],
        priors[:, :2] + pre[:, 2:4] * variances[0] * priors[:, 2:],
        priors[:, :2] + pre[:, 4:6] * variances[0] * priors[:, 2:],
        priors[:, :2] + pre[:, 6:8] * variances[0] * priors[:, 2:],
        priors[:, :2] + pre[:, 8:10] * variances[0] * priors[:, 2:]
    ), axis=1)
    return landmarks

def nms(dets, thresh):
    # Standard non-maximum suppression on (x1, y1, x2, y2, score) rows.
    x1 = dets[:, 0]
    y1 = dets[:, 1]
    x2 = dets[:, 2]
    y2 = dets[:, 3]
    scores = dets[:, 4]
    areas = (x2 - x1 + 1) * (y2 - y1 + 1)
    order = scores.argsort()[::-1]
    keep = []
    while order.size > 0:
        i = order[0]
        keep.append(i)
        xx1 = np.maximum(x1[i], x1[order[1:]])
        yy1 = np.maximum(y1[i], y1[order[1:]])
        xx2 = np.minimum(x2[i], x2[order[1:]])
        yy2 = np.minimum(y2[i], y2[order[1:]])
        w = np.maximum(0.0, xx2 - xx1 + 1)
        h = np.maximum(0.0, yy2 - yy1 + 1)
        inter = w * h
        ovr = inter / (areas[i] + areas[order[1:]] - inter)
        inds = np.where(ovr <= thresh)[0]
        order = order[inds + 1]
    return keep

def get_faces(img):
    img_height, img_width, _ = img.shape
    model_height, model_width = (320, 320)
    letterbox_img, aspect_ratio, offset_x, offset_y = letterbox_resize(img, (model_height, model_width), 114)  # letterbox resize
    infer_img = letterbox_img[..., ::-1]  # BGR -> RGB
    i = np.expand_dims(infer_img, 0)
    start_time = time.time()
    i = i.astype('float32')
    i = np.transpose(i, (0, 3, 1, 2))  # NHWC -> NCHW
    input_name = net.get_inputs()[0].name  # get the model's input name
    outputs = net.run(None, {input_name: i})
    if outputs is None:
        return None
    loc, conf, landmarks = outputs
    priors = PriorBox(image_size=(model_height, model_width))
    boxes = box_decode(loc.squeeze(0), priors)
    scale = np.array([model_width, model_height,
                      model_width, model_height])
    boxes = boxes * scale // 1
    # Undo the letterbox transform to map boxes back onto the original image.
    boxes[..., 0::2] = np.clip((boxes[..., 0::2] - offset_x) / aspect_ratio, 0, img_width)
    boxes[..., 1::2] = np.clip((boxes[..., 1::2] - offset_y) / aspect_ratio, 0, img_height)
    scores = conf.squeeze(0)[:, 1]
    landmarks = decode_landm(landmarks.squeeze(0), priors)
    scale_landmarks = np.array([model_width, model_height, model_width, model_height,
                                model_width, model_height, model_width, model_height,
                                model_width, model_height])
    landmarks = landmarks * scale_landmarks // 1
    landmarks[..., 0::2] = np.clip((landmarks[..., 0::2] - offset_x) / aspect_ratio, 0, img_width)
    landmarks[..., 1::2] = np.clip((landmarks[..., 1::2] - offset_y) / aspect_ratio, 0, img_height)
    # Filter by confidence, sort by score, then run NMS.
    inds = np.where(scores > 0.02)[0]
    boxes = boxes[inds]
    landmarks = landmarks[inds]
    scores = scores[inds]
    order = scores.argsort()[::-1]
    boxes = boxes[order]
    landmarks = landmarks[order]
    scores = scores[order]
    dets = np.hstack((boxes, scores[:, np.newaxis])).astype(np.float32, copy=False)
    keep = nms(dets, 0.5)
    dets = dets[keep, :]
    landmarks = landmarks[keep]
    dets = np.concatenate((dets, landmarks), axis=1)
    ret = []
    x1_list, y1_list, x2_list, y2_list = [], [], [], []
    for data in dets:
        if float(data[4]) < 0.6:
            continue
        x1 = int(data[0])
        y1 = int(data[1])
        x2 = int(data[2])
        y2 = int(data[3])
        x3 = int(data[5])   # left eye
        y3 = int(data[6])
        x4 = int(data[7])   # right eye
        y4 = int(data[8])
        x5 = int(data[9])   # nose
        y5 = int(data[10])
        leftEyeCenter = np.array([x3, y3])
        rightEyeCenter = np.array([x4, y4])
        nose = np.array([x5, y5])
        face_aligned = face_align.align(img, nose, leftEyeCenter, rightEyeCenter)
        faces = {'face': face_aligned, 'score': data[4], 'point1': x1, 'point2': y1 + 12}
        ret.append(faces)
        x1_list.append(x1)
        y1_list.append(y1)
        x2_list.append(x2)
        y2_list.append(y2)
    return x1_list, y1_list, x2_list, y2_list, ret
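A minimal usage sketch of the detector on a single image (the file path is hypothetical):

import cv2
import retinaface

img = cv2.imread("face_dataset/tsy_1.jpg")
x1, y1, x2, y2, ret = retinaface.get_faces(img)
for box in zip(x1, y1, x2, y2):
    print("face box:", box)
for face in ret:
    # Each entry holds the aligned 160x160 crop and the detection score.
    print(face['score'], face['face'].shape)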
f. Local test script: test.py
'''
Local test code.
Similar to the code in server.py, except the feature vectors are produced
locally and compared by Euclidean distance against the local .npy files.
'''
import get_embedding
import cv2
import time
import numpy as np

facenet_threshold = 0.9

def face_distance(face_encodings, face_to_compare):
    # Euclidean distance between the probe encoding and each known encoding.
    if len(face_encodings) == 0:
        return np.empty((0))
    return np.linalg.norm(face_encodings - face_to_compare, axis=1)

def compare_faces(known_face_encodings, face_encoding_to_check, tolerance=1):
    dis = face_distance(known_face_encodings, face_encoding_to_check)
    return list(dis <= tolerance), dis

cap = cv2.VideoCapture(0)
if not cap.isOpened():
    print("Cannot open the camera")
    exit()
print("Press 'q' to quit")
prev_time = time.time()
fps = 0
while True:
    ret, frame = cap.read()
    if not ret:
        print("Cannot read a frame from the camera")
        break
    frame = cv2.flip(frame, 1)
    old_image = frame.copy()
    current_time = time.time()
    elapsed_time = current_time - prev_time
    fps = 1 / elapsed_time if elapsed_time > 0 else 0
    prev_time = current_time
    try:
        x1, y1, x2, y2, embedder_ret = get_embedding.get_embeddings(old_image)
        known_face_encodings = np.load("model_data/face_encoding.npy")
        known_face_names = np.load("model_data/names.npy")
        if len(x1) > 0:
            if len(x1) > 1:
                # More than one face: just draw the boxes.
                for i in range(len(x1)):
                    cv2.rectangle(old_image, (x1[i], y1[i]), (x2[i], y2[i]), (255, 0, 0), 2)
            else:
                # Exactly one face: draw it and compare against the database.
                cv2.rectangle(old_image, (x1[0], y1[0]), (x2[0], y2[0]), (255, 0, 0), 2)
                face_names = []
                for face_encoding in embedder_ret:
                    matches, face_distances = compare_faces(known_face_encodings, face_encoding,
                                                            tolerance=facenet_threshold)
                    name = "Unknown"
                    best_match_index = np.argmin(face_distances)
                    if matches[best_match_index]:
                        name = known_face_names[best_match_index]
                    print(name)
                    face_names.append(name)
        else:
            print("No face detected")
    except Exception as e:
        print(f"Failed to get face features: {e}")
    cv2.putText(old_image, f'FPS: {fps:.2f}', (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2, cv2.LINE_AA)
    cv2.imshow("Camera", old_image)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
cap.release()
cv2.destroyAllWindows()
At this point, saving the face feature encodings and comparing faces against the database are both basically working.
Server code (usable only once a client is connected)
'''
This is the first version of my server-side code. It communicates with the
RK3588, receiving the 128-dimensional feature vector from the dev board over
a socket, comparing it against the local database
(model_data/face_encoding.npy, model_data/names.npy), finding the ID that
matches the feature vector, and returning it to the board to be printed and
displayed.
'''
import socket
import numpy as np
import pickle
import struct
from sklearn.metrics.pairwise import euclidean_distances

known_face_encodings = np.load("model_data/face_encoding.npy")
known_face_names = np.load("model_data/names.npy")
HOST = 'xxx.xxx.xx.x'
PORT = 12345
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind((HOST, PORT))
server_socket.listen(5)
print("Server is waiting for connections...")
while True:
    client_socket, client_address = server_socket.accept()
    print(f"Client connected: {client_address}")
    try:
        while True:
            # Read the 4-byte big-endian length prefix.
            data_length = b""
            while len(data_length) < 4:
                packet = client_socket.recv(4 - len(data_length))
                if not packet:
                    raise Exception("Client connection closed or receive error")
                data_length += packet
            length = struct.unpack("!I", data_length)[0]
            print(f"Received data length: {length}")
            # Read exactly `length` bytes of the pickled feature vector.
            face_encoding_data = b""
            while len(face_encoding_data) < length:
                packet = client_socket.recv(length - len(face_encoding_data))
                if not packet:
                    raise Exception("Client connection closed or receive error")
                face_encoding_data += packet
            try:
                face_encoding = pickle.loads(face_encoding_data)
                print(f"Received face feature of length: {len(face_encoding)}")
            except Exception as e:
                print(f"Deserialization failed: {e}")
                continue
            # Compare against every known encoding and pick the closest one.
            distances = euclidean_distances([face_encoding], known_face_encodings)
            print(distances)
            best_match_index = np.argmin(distances)
            match_distance = distances[0][best_match_index]
            facenet_threshold = 0.75
            if match_distance <= facenet_threshold:
                name = known_face_names[best_match_index]
            else:
                name = "Unknown"
            print(f"Match result: {name}")
            serialized_name = pickle.dumps(name)
            client_socket.sendall(serialized_name)
    except Exception as e:
        print(f"Error occurred: {e}")
    finally:
        client_socket.close()
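For reference, here is a minimal sketch of a test client speaking the wire protocol this server expects: a 4-byte big-endian length prefix followed by the pickled feature vector, with a pickled name as the reply. This is a hypothetical test client with a dummy random vector, not the actual RK3588 board code (that comes in a later post):

import socket
import struct
import pickle
import numpy as np

# Hypothetical address; use the server's actual HOST/PORT.
sock = socket.create_connection(("xxx.xxx.xx.x", 12345))
# A dummy 128-dimensional vector standing in for a real embedding.
feature = np.random.rand(128).astype(np.float32)
payload = pickle.dumps(feature)
# 4-byte big-endian length prefix, then the pickled vector.
sock.sendall(struct.pack("!I", len(payload)) + payload)
name = pickle.loads(sock.recv(4096))
print("Matched ID:", name)
sock.close()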
If you have any questions, contact me via QQ: 3281428136