"""Webcam capture for a PyQt5 application.

``WebcamManager`` owns the OpenCV capture device and a ``QTimer`` that polls
it; ``WebcamControl`` moves a manager into a ``QThread`` and relays
connect/disconnect commands and connection status through Qt signals.
"""

from PyQt5.QtCore import QTimer, QThread, QObject, pyqtSignal
from PyQt5.QtWidgets import QApplication
from PyQt5.QtGui import QImage, QPixmap
import cv2

# from src.webcam_manager import WebcamManager


class WebcamControl(QObject):
    """Facade that runs a ``WebcamManager`` inside its own ``QThread``.

    Signals:
        command_signal(bool): forwarded to ``WebcamManager.command_connection``
            (``True`` = connect, ``False`` = disconnect).
        status_signal(bool): re-emits the manager's ``connection_status``.
        shutdown_event(): emitted at the end of ``shutdown()``.
    """

    command_signal = pyqtSignal(bool)
    status_signal = pyqtSignal(bool)
    shutdown_event = pyqtSignal()

    def __init__(self):
        super().__init__()

        self.webcammanager = WebcamManager()
        self.webcammanager_thread = QThread()

        # The timer has no parent, so it does not follow the manager
        # automatically and has to be moved explicitly.
        self.webcammanager.moveToThread(self.webcammanager_thread)
        self.webcammanager.video_timer.moveToThread(self.webcammanager_thread)

        # Connections are made only after moveToThread() so that they are
        # set up against the manager's final thread affinity.
        self.command_signal.connect(self.webcammanager.command_connection)
        self.webcammanager.connection_status.connect(self.handle_connection)

        # QThread.finished is emitted from the worker thread itself, so this
        # releases the camera and stops the timer in the thread that owns
        # them instead of reaching across threads from shutdown().
        self.webcammanager_thread.finished.connect(
            self.webcammanager.disconnect
        )

    def start(self):
        """Start the worker thread (and with it the thread's event loop)."""
        self.webcammanager_thread.start()

    def shutdown(self):
        """Disconnect the webcam, stop the worker thread, emit ``shutdown_event``.

        When the worker thread is running it is asked to quit and joined; the
        manager's ``disconnect()`` then runs inside that thread via the
        ``finished`` connection made in ``__init__``. Any resulting
        ``status_signal(False)`` is therefore delivered through the event
        loop and may arrive after ``shutdown_event``.
        """
        worker = self.webcammanager_thread
        if worker.isRunning():
            worker.quit()
            # Block until the thread (including the cleanup hooked to
            # ``finished``) is done, so the QThread is never destroyed
            # while it is still running.
            worker.wait()
        else:
            # No worker thread is running, so a direct call cannot race
            # with the polling loop.
            self.webcammanager.disconnect()
        self.shutdown_event.emit()

    def command_connect(self, connect):
        """Request a connect (``True``) or disconnect (``False``)."""
        self.command_signal.emit(connect)

    def handle_connection(self, connected):
        """Relay the manager's connection status through ``status_signal``."""
        self.status_signal.emit(connected)


class WebcamManager(QObject):
    """Poll a webcam with OpenCV and publish frames as ``QPixmap`` objects.

    Signals:
        connection_status(bool): emitted after every connect attempt and
            after a disconnect, carrying the current ``connected`` flag.
        recording_status(bool): declared but never emitted in this module.
        video(QPixmap): emitted once per successfully captured frame.

    NOTE(review): ``disconnect`` shadows ``QObject.disconnect``; the name is
    kept because callers depend on it.
    """

    connection_status = pyqtSignal(bool)
    recording_status = pyqtSignal(bool)
    video = pyqtSignal(QPixmap)

    video_timer = None

    def __init__(self, camera_index=0):
        """Create a disconnected manager.

        Args:
            camera_index: device index handed to ``cv2.VideoCapture``.
                Defaults to 0, the value that used to be hard-coded.
        """
        QObject.__init__(self)
        self.connected = False
        self.camera_index = camera_index
        # Resolution *requested* from the driver; the frames actually
        # delivered may differ, so stream_loop() never relies on these.
        self.video_width = 1280
        self.video_height = 720
        self.capture = None
        self.monitor_loop_running = None

        # Set up the video stream loop timer. The timeout connection is made
        # on the first connect() rather than here, which keeps it at the same
        # point in the object's life as before (after any moveToThread()).
        self.video_timer = QTimer()
        self._timer_connected = False

    def command_connection(self, connect):
        """Control connect/disconnect: truthy connects, falsy disconnects."""
        if connect:
            self.connect()
        else:
            self.disconnect()

    def connect(self):
        """Open the webcam and start polling it.

        Does nothing when already connected. Otherwise ``connection_status``
        is emitted with the outcome of the attempt (``False`` when the device
        could not be opened).
        """
        if self.connected:
            return

        # Connect the timer exactly once; connecting again on every
        # reconnect would invoke stream_loop() several times per tick.
        if not self._timer_connected:
            self.video_timer.timeout.connect(self.stream_loop)
            self._timer_connected = True

        capture = cv2.VideoCapture(self.camera_index)
        if capture.isOpened():
            capture.set(cv2.CAP_PROP_FRAME_WIDTH, self.video_width)
            capture.set(cv2.CAP_PROP_FRAME_HEIGHT, self.video_height)
            self.capture = capture
            self.monitor_loop_running = True
            self.video_timer.start(33)  # delay for approx. 30 fps.
            self.connected = True
        else:
            # Do not keep a handle to a device that failed to open.
            capture.release()

        self.connection_status.emit(self.connected)

    def disconnect(self):
        """Stop polling, release the webcam and emit ``connection_status``.

        Does nothing when not connected.
        """
        if not self.connected:
            return

        self.connected = False
        self.monitor_loop_running = False
        self.video_timer.stop()
        if self.capture is not None:
            self.capture.release()
            self.capture = None
        self.connection_status.emit(self.connected)

    def stream_loop(self):
        """Timer slot: grab one frame and emit it, or disconnect on failure."""
        if not self.monitor_loop_running or self.capture is None:
            return

        ret, frame = self.capture.read()
        if not ret:
            self.disconnect()
            return

        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        # Use the geometry of the frame that was actually delivered: the
        # driver is free to ignore the requested 1280x720.
        height, width, channels = frame.shape
        bytes_per_line = channels * width
        image = QImage(
            frame.data, width, height, bytes_per_line, QImage.Format_RGB888
        )
        # QPixmap.fromImage() copies the pixels, so the pixmap does not
        # depend on ``frame`` staying alive.
        # NOTE(review): Qt documents QPixmap as a GUI-thread class; emitting
        # QImage would be safer from a worker thread, but the signal
        # signature is part of the public interface - confirm before changing.
        self.video.emit(QPixmap.fromImage(image))