444 lines
17 KiB
Python
444 lines
17 KiB
Python
import sys
|
||
import cv2
|
||
import os
|
||
import time
|
||
from PyQt5.QtCore import QThread, pyqtSignal, QObject
|
||
import face_recognition
|
||
|
||
current_dir = os.path.dirname(__file__)
|
||
share_code_dir = os.path.abspath(os.path.join(current_dir, '..', '..'))
|
||
sys.path.append(share_code_dir)
|
||
|
||
from print_color import *
|
||
|
||
DEFAULT_VIDEO_SLEEP_MS = 20
|
||
|
||
# 图像处理线程
|
||
class ImageProcessingThread(QThread):
|
||
processed_image_signal = pyqtSignal(object)
|
||
|
||
def __init__(self):
|
||
super().__init__()
|
||
|
||
def run(self):
|
||
while True:
|
||
# 在这里添加图像处理代码
|
||
# 这里暂时只是将图像传递给下一个线程
|
||
time.sleep(0.01)
|
||
self.processed_image_signal.emit(None)
|
||
|
||
# 摄像头采集线程
|
||
class CameraThread(QThread):
|
||
image_signal = pyqtSignal(object)
|
||
|
||
def __init__(self, camera_url, circuit_id: int = 0):
|
||
super().__init__()
|
||
|
||
self.camera_url = camera_url # 摄像头url地址, 整数 或者 字符串
|
||
self.face_detection: bool = False
|
||
|
||
if isinstance(camera_url, int):
|
||
self.camera_url_str = str(camera_url)
|
||
else:
|
||
self.camera_url_str = camera_url
|
||
|
||
self.cap: cv2.VideoCapture = None
|
||
self.running: bool = True
|
||
|
||
self.fps = 0
|
||
|
||
self.cycle_limit = DEFAULT_VIDEO_SLEEP_MS
|
||
self.cycle_ms = 0
|
||
self.circuit_id = circuit_id # 摄像头对应的回路, 组合开关每一个回路都有对应的摄像头
|
||
self.is_signal_connect = False
|
||
self.is_emit = False
|
||
|
||
self.set_video_cycle_ms(2000)
|
||
|
||
def signal_connect(self, slot_func):
|
||
self.image_signal.connect(slot_func)
|
||
self.is_signal_connect = True
|
||
|
||
def signal_disconnect(self):
|
||
# 判断信号是否已连接
|
||
if self.is_signal_connect:
|
||
self.image_signal.disconnect()
|
||
self.is_signal_connect = False
|
||
|
||
def set_video_cycle_ms(self, cycle_ms: int = 10):
|
||
if self.cycle_limit != cycle_ms:
|
||
if cycle_ms <= DEFAULT_VIDEO_SLEEP_MS:
|
||
self.cycle_limit = DEFAULT_VIDEO_SLEEP_MS
|
||
else:
|
||
self.cycle_limit = cycle_ms
|
||
|
||
# 新建函数,改变函数延时方法
|
||
def change_camera_url(self, camera_url: str):
|
||
if self.circuit_id == camera_url:
|
||
self.set_video_cycle_ms(DEFAULT_VIDEO_SLEEP_MS)
|
||
else:
|
||
self.set_video_cycle_ms(1000)
|
||
|
||
def close(self):
|
||
if self.cap == None: # 初始化一直未完成
|
||
self.terminate()
|
||
self.running = False
|
||
|
||
def run(self):
|
||
process_count = 0
|
||
fps_time = 0
|
||
|
||
while self.running == True:
|
||
inform_msg = "cameral init start, url = " + self.camera_url_str
|
||
print_inform_msg(inform_msg)
|
||
|
||
try:
|
||
if isinstance(self.camera_url, int):
|
||
# 在 Windows 平台下,使用默认的 cv2.VideoCapture 接口
|
||
self.cap = cv2.VideoCapture(self.camera_url, cv2.CAP_DSHOW)
|
||
else:
|
||
# 在 Linux 平台下,使用默认的 cv2.VideoCapture 接口
|
||
self.cap = cv2.VideoCapture(self.camera_url)
|
||
|
||
if self.cap != None:
|
||
inform_msg = "cameral Init Success, url = " + self.camera_url_str
|
||
self.is_emit = True
|
||
print_inform_msg(inform_msg)
|
||
else:
|
||
inform_msg = "cameral connection timeout, url = " + self.camera_url_str
|
||
print_inform_msg(inform_msg)
|
||
|
||
except Exception as e:
|
||
self.cap = None
|
||
inform_msg = "cameral camera Init Fail, url = " + self.camera_url_str
|
||
print_error_msg(inform_msg)
|
||
|
||
base_time = time.time()
|
||
|
||
while self.running and self.cap != None:
|
||
# 延时20ms
|
||
time.sleep(DEFAULT_VIDEO_SLEEP_MS / 1000)
|
||
if self.cycle_ms + DEFAULT_VIDEO_SLEEP_MS < self.cycle_limit:
|
||
self.cycle_ms += DEFAULT_VIDEO_SLEEP_MS
|
||
self.cap.grab() # 抛弃多余的帧,保持最新帧
|
||
continue
|
||
else:
|
||
self.cycle_ms = 0
|
||
|
||
try:
|
||
ret, frame = self.cap.read()
|
||
cur_time = time.time()
|
||
|
||
execution_time = cur_time - base_time
|
||
base_time = cur_time
|
||
process_count += 1
|
||
|
||
fps_time += execution_time
|
||
|
||
if fps_time >= 1:
|
||
self.fps = process_count
|
||
process_count = 0
|
||
fps_time = 0
|
||
|
||
if execution_time < 5:
|
||
image_object: QObject = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
|
||
self.image_signal.emit(image_object)
|
||
else: # 时间差大于5秒, 表示网络可能中断过, 退出并重新连接
|
||
err_message = "cameral read timeout, url = " + self.camera_url_str
|
||
print_error_msg(err_message)
|
||
break
|
||
except Exception as e:
|
||
err_message = "cameral read timeout, url = " + self.camera_url_str
|
||
self.is_emit = False
|
||
print_error_msg(err_message)
|
||
if self.running == True:
|
||
time.sleep(2)
|
||
break
|
||
|
||
print_inform_msg("cameral connection End")
|
||
time.sleep(0.01)
|
||
|
||
if self.cap != None:
|
||
self.cap.release()
|
||
self.cap = None
|
||
# - * - coding:utf - 8 - * -
|
||
# import sys
|
||
# import cv2
|
||
# import os
|
||
# import time
|
||
# import subprocess
|
||
# import face_recognition
|
||
# import numpy as np
|
||
# from PIL import Image, ImageDraw, ImageFont
|
||
# from PyQt5.QtCore import QThread, pyqtSignal, QObject
|
||
# from print_color import *
|
||
# current_dir = os.path.dirname(__file__)
|
||
# share_code_dir = os.path.abspath(os.path.join(current_dir, '..', '..'))
|
||
# sys.path.append(share_code_dir)
|
||
|
||
# DEFAULT_VIDEO_SLEEP_MS = 20
|
||
|
||
# # 定义路径常量
|
||
# npy_file_path = os.path.abspath(os.path.join(os.path.dirname(__file__), 'face_data/face_data.npy'))
|
||
# image_folder_path = os.path.abspath(os.path.join(os.path.dirname(__file__), 'face_data'))
|
||
# font_path = os.path.abspath(os.path.join(os.path.dirname(__file__), 'font/hanzi.ttc'))
|
||
# def load_face_data():
|
||
# face_data = {}
|
||
# try:
|
||
# face_data = np.load(npy_file_path, allow_pickle=True).item()
|
||
# print_debug_msg(f"Loaded {len(face_data)} face encodings from {npy_file_path}.")
|
||
# except Exception as e:
|
||
# print_error_msg(f"Error loading face encodings: {e}")
|
||
|
||
# current_images = [f for f in os.listdir(image_folder_path) if f.endswith('.jpg')]
|
||
# current_image_set = set(os.path.splitext(f)[0] for f in current_images)
|
||
# known_image_set = set(face_data.keys())
|
||
|
||
# added_images = current_image_set - known_image_set
|
||
# removed_images = known_image_set - current_image_set
|
||
|
||
# for image_name in added_images:
|
||
# image_path = os.path.join(image_folder_path, f"{image_name}.jpg")
|
||
# try:
|
||
# image = face_recognition.load_image_file(image_path)
|
||
# face_encodings = face_recognition.face_encodings(image)
|
||
# if face_encodings:
|
||
# face_data[image_name] = face_encodings[0].tolist()
|
||
# print_debug_msg(f"Added encoding for {image_name}.")
|
||
# else:
|
||
# print_warning_msg(f"[WARNING] No face detected in {image_name}.jpg.")
|
||
# except Exception as e:
|
||
# print_error_msg(f"[ERROR] Error processing {image_name}.jpg: {e}")
|
||
|
||
# for removed_image in removed_images:
|
||
# del face_data[removed_image]
|
||
# print_debug_msg(f"Removed encoding for {removed_image}.")
|
||
|
||
# np.save(npy_file_path, face_data)
|
||
# print_debug_msg(f"Updated face data saved with {len(face_data)} entries.")
|
||
# return face_data
|
||
|
||
# class ImageProcessingThread(QThread):
|
||
# processed_image_signal = pyqtSignal(object)
|
||
|
||
# def __init__(self, frame=None):
|
||
# super().__init__()
|
||
# self.frame = frame # 添加 frame 参数
|
||
# self.face_encodings = []
|
||
# self.face_names = []
|
||
# self.face_locations = []
|
||
# self.face_data = load_face_data()
|
||
# self.total_image_name = list(self.face_data.keys())
|
||
# self.total_face_encoding = [np.array(self.face_data[name]) for name in self.total_image_name]
|
||
|
||
# def process_frame(self, frame):
|
||
# """实时处理帧,匹配已知人脸"""
|
||
# self.face_locations = face_recognition.face_locations(frame)
|
||
# face_encodings = face_recognition.face_encodings(frame, self.face_locations)
|
||
# names = []
|
||
# for face_encoding in face_encodings:
|
||
# name = "Unknown"
|
||
# face_distances = face_recognition.face_distance(self.total_face_encoding, face_encoding)
|
||
|
||
# if face_distances.size > 0:
|
||
# min_distance = np.min(face_distances)
|
||
# best_match_index = np.argmin(face_distances)
|
||
|
||
# if min_distance < self.dynamic_tolerance(): # 使用动态容忍度
|
||
# name = self.total_image_name[best_match_index]
|
||
|
||
# names.append(name)
|
||
|
||
# self.face_names = names
|
||
# return frame
|
||
|
||
# def dynamic_tolerance(self):
|
||
# # 动态调整容忍度,可以基于一些规则来调整,如帧内错误率、识别准确率等
|
||
# error_rate = self.calculate_error_rate()
|
||
# if error_rate > 0.1:
|
||
# return 0.5 # 高容忍度以减少错误
|
||
# else:
|
||
# return 0.4 # 适中容忍度
|
||
|
||
# def calculate_error_rate(self):
|
||
# # 计算识别错误率,作为动态调整容忍度的依据
|
||
# total_faces = len(self.total_face_encoding)
|
||
# unmatched_faces = sum(1 for name in self.face_names if name == "Unknown")
|
||
# return unmatched_faces / total_faces if total_faces > 0 else 0
|
||
|
||
# def set_frame(self, frame):
|
||
# self.frame = frame
|
||
|
||
# def run(self):
|
||
# while True:
|
||
# if self.frame is not None:
|
||
# self.process_frame(self.frame)
|
||
# self.processed_image_signal.emit(self.frame)
|
||
# time.sleep(0.01) # 控制帧率
|
||
|
||
|
||
# # 摄像头采集线程
|
||
# class CameraThread(QThread):
|
||
# image_signal = pyqtSignal(object)
|
||
|
||
# def __init__(self, camera_url, circuit_id: int = 0):
|
||
# super().__init__()
|
||
# self.camera_url = camera_url
|
||
# self.face_detection: bool = True
|
||
|
||
# if isinstance(camera_url, int):
|
||
# self.camera_url_str = str(camera_url)
|
||
# else:
|
||
# self.camera_url_str = camera_url
|
||
|
||
# self.cap: cv2.VideoCapture = None
|
||
# self.running: bool = True
|
||
|
||
# self.fps = 0
|
||
# self.frame_count = 0
|
||
# self.fps_time = 0
|
||
|
||
# self.cycle_limit = DEFAULT_VIDEO_SLEEP_MS
|
||
# self.cycle_ms = 0
|
||
# self.circuit_id = circuit_id
|
||
# self.is_signal_connect = False
|
||
# self.is_emit = False
|
||
|
||
# self.set_video_cycle_ms(2000)
|
||
|
||
# def signal_connect(self, slot_func):
|
||
# self.image_signal.connect(slot_func)
|
||
# self.is_signal_connect = True
|
||
|
||
# def signal_disconnect(self):
|
||
# if self.is_signal_connect:
|
||
# self.image_signal.disconnect()
|
||
# self.is_signal_connect = False
|
||
|
||
# def set_video_cycle_ms(self, cycle_ms: int = 10):
|
||
# if self.cycle_limit != cycle_ms:
|
||
# if cycle_ms <= DEFAULT_VIDEO_SLEEP_MS:
|
||
# self.cycle_limit = DEFAULT_VIDEO_SLEEP_MS
|
||
# else:
|
||
# self.cycle_limit = cycle_ms
|
||
|
||
# def change_camera_url(self, camera_url: str):
|
||
# if self.circuit_id == camera_url:
|
||
# self.set_video_cycle_ms(DEFAULT_VIDEO_SLEEP_MS)
|
||
# else:
|
||
# self.set_video_cycle_ms(1000)
|
||
|
||
# def close(self):
|
||
# if self.cap is None:
|
||
# self.terminate()
|
||
# self.running = False
|
||
|
||
# def run(self):
|
||
# process_count = 0
|
||
# fps_time = 0
|
||
# face_thread = ImageProcessingThread()
|
||
# face_thread.start()
|
||
|
||
# self.match_count = 0 # 初始化匹配帧数计数器
|
||
|
||
# while self.running:
|
||
# inform_msg = "Camera init start, URL = " + self.camera_url_str
|
||
# print_inform_msg(inform_msg)
|
||
|
||
# try:
|
||
# self.cap = cv2.VideoCapture(self.camera_url) # 强制使用 BGR 格式
|
||
# if self.cap.isOpened():
|
||
# inform_msg = "Camera Init Success, URL = " + self.camera_url_str
|
||
# self.is_emit = True
|
||
# print_inform_msg(inform_msg)
|
||
# else:
|
||
# inform_msg = "Camera connection timeout, URL = " + self.camera_url_str
|
||
# print_error_msg(inform_msg)
|
||
|
||
# except Exception as e:
|
||
# self.cap = None
|
||
# inform_msg = "Camera Init Fail, URL = " + self.camera_url_str
|
||
# print_error_msg(inform_msg)
|
||
|
||
# base_time = time.time()
|
||
|
||
# while self.running and self.cap:
|
||
# time.sleep(DEFAULT_VIDEO_SLEEP_MS / 1000)
|
||
|
||
# if self.cycle_ms + DEFAULT_VIDEO_SLEEP_MS < self.cycle_limit:
|
||
# self.cycle_ms += DEFAULT_VIDEO_SLEEP_MS
|
||
# self.cap.grab()
|
||
# continue
|
||
# else:
|
||
# self.cycle_ms = 0
|
||
|
||
# try:
|
||
# ret, frame = self.cap.read()
|
||
# if not ret:
|
||
# break
|
||
|
||
# face_thread.set_frame(frame)
|
||
|
||
# cur_time = time.time()
|
||
# execution_time = cur_time - base_time
|
||
# base_time = cur_time
|
||
# process_count += 1
|
||
|
||
# fps_time += execution_time
|
||
# if fps_time >= 1: # 每隔1秒重新计算 FPS
|
||
# self.fps = process_count
|
||
# process_count = 0
|
||
# fps_time = 0
|
||
|
||
# if execution_time < 5:
|
||
# frame: QObject = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
|
||
# for (top, right, bottom, left), name in zip(face_thread.face_locations, face_thread.face_names):
|
||
# if name != "Unknown": # 只绘制已匹配的结果
|
||
# self.match_count += 1 # 计数成功匹配的帧
|
||
# # cv2.rectangle(frame, (left, top), (right, bottom), (0, 255, 0), 2)
|
||
# # cv2.putText(frame, name, (left + 6, bottom - 6), cv2.FONT_HERSHEY_DUPLEX, 1.0, (255, 255, 255), 1)
|
||
|
||
# # 将OpenCV图像转换为PIL图像格式(注意要转换色彩空间,OpenCV是BGR,PIL是RGB)
|
||
# pil_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
|
||
# draw = ImageDraw.Draw(pil_image)
|
||
# # 这里选择合适的中文字体文件路径,比如系统自带的宋体(不同系统路径可能有差异),并设置字体大小
|
||
# font = ImageFont.truetype(font_path, 15)
|
||
# # 在指定位置绘制中文
|
||
# draw.text((left + 12, bottom + 12), name, font=font, fill=(255, 255, 255))
|
||
# # 再将PIL图像转换回OpenCV图像格式
|
||
# frame = cv2.cvtColor(np.array(pil_image), cv2.COLOR_RGB2BGR)
|
||
# cv2.rectangle(frame, (left, top), (right, bottom), (0, 255, 0), 2)
|
||
# else:
|
||
# name = "未在人脸库中,无操作权限"
|
||
# pil_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
|
||
# draw = ImageDraw.Draw(pil_image)
|
||
# #设置字体大小
|
||
# font = ImageFont.truetype(font_path, 15)
|
||
# # 在指定位置绘制中文
|
||
# draw.text((left + 12, bottom + 12), name, font=font, fill=(255, 255, 255))
|
||
# # 再将PIL图像转换回OpenCV图像格式
|
||
# frame = cv2.cvtColor(np.array(pil_image), cv2.COLOR_RGB2BGR)
|
||
# cv2.rectangle(frame, (left, top), (right, bottom), (255, 0, 0), 2)
|
||
# self.image_signal.emit(frame)
|
||
# else:
|
||
# err_message = "Camera read timeout, URL = " + self.camera_url_str
|
||
# print_error_msg(err_message)
|
||
# break
|
||
|
||
# except Exception as e:
|
||
# err_message = "Camera read exception, URL = " + self.camera_url_str
|
||
# print_error_msg(err_message)
|
||
# if self.running:
|
||
# time.sleep(2)
|
||
# break
|
||
|
||
# if self.cap:
|
||
# self.cap.release()
|
||
# self.cap = None
|
||
|
||
# print_inform_msg("Camera connection ended")
|
||
# time.sleep(0.01)
|
||
|
||
|
||
|