from enum import Enum
from inspect import signature
from threading import Event, Thread
from pitop.core import ImageFunctions
from pitop.core.mixins import Recreatable, Stateful
from pitop.pma.common import type_check
from .core import CameraTypes, FileSystemCamera, FrameHandler, UsbCamera
from .core.capture_actions import CaptureActions
[docs]
class Camera(Stateful, Recreatable):
    """Provides a variety of high-level functionality for using the PMA USB
    Camera, including capturing images and video, and processing image data
    from the camera.

    A daemon thread is started on construction; it keeps pulling frames from
    the underlying camera object and hands them to the registered capture
    actions. The instance can be used as a context manager, in which case the
    thread is stopped when the ``with`` block exits.

    :type index: int
    :param index:
        ID of the video capturing device to open.
        Passing `None` will cause the backend to autodetect the
        available video capture devices and attempt to use them.

    :type resolution: tuple
    :param resolution:
        Resolution forwarded to the USB camera backend. Defaults to (640, 480).

    :type camera_type: CameraTypes
    :param camera_type:
        Which backend to use. Either a :class:`CameraTypes` member or a raw
        value accepted by ``CameraTypes(...)``.

    :type path_to_images: str
    :param path_to_images:
        Path handed to the file system camera backend; only used when
        ``camera_type`` is ``CameraTypes.FILE_SYSTEM_CAMERA``.

    :type format: string
    :param format:
        Image format of the frames returned to the user. It is validated with
        ``ImageFunctions.image_format_check`` and stored lower-cased.

    :type flip_top_bottom: bool
    :param flip_top_bottom:
        Forwarded to the USB camera backend.

    :type flip_left_right: bool
    :param flip_left_right:
        Forwarded to the USB camera backend.

    :type rotate_angle: int
    :param rotate_angle:
        Forwarded to the USB camera backend.

    :type name: str
    :param name:
        Name of this component; stored on the instance and in the
        recreation config.
    """

    __VALID_FORMATS = ("opencv", "pil")

    def __init__(
        self,
        index=None,
        resolution=(640, 480),
        camera_type=CameraTypes.USB_CAMERA,
        path_to_images="",
        format="PIL",
        flip_top_bottom: bool = False,
        flip_left_right: bool = False,
        rotate_angle=0,
        name="camera",
    ):
        # Initialise private variables
        self._resolution = resolution
        self._format = None

        # Go through the property setter so the value is validated and
        # normalised to lower case.
        self.format = format

        # Optional user callback invoked by the worker thread on every frame
        self.on_frame = None

        # Internal
        self._index = index
        self._camera_type = CameraTypes(camera_type)
        self._path_to_images = path_to_images
        self._rotate_angle = rotate_angle
        self._flip_top_bottom = flip_top_bottom
        self._flip_left_right = flip_left_right

        if self._camera_type == CameraTypes.USB_CAMERA:
            self._camera = UsbCamera(
                index=self._index,
                resolution=self._resolution,
                flip_top_bottom=flip_top_bottom,
                flip_left_right=flip_left_right,
                rotate_angle=self._rotate_angle,
            )
        elif self._camera_type == CameraTypes.FILE_SYSTEM_CAMERA:
            self._camera = FileSystemCamera(self._path_to_images)

        # Everything the worker thread reads must exist before it is started.
        self._continue_processing = True
        self._frame_handler = FrameHandler()
        self._new_frame_event = Event()
        self._process_image_thread = Thread(
            target=self.__process_camera_output, daemon=True
        )
        self._process_image_thread.start()

        self.name = name
        Stateful.__init__(self)
        Recreatable.__init__(
            self,
            config_dict={
                "index": index,
                "resolution": resolution,
                # Store the plain value so the config does not hold an Enum
                "camera_type": (
                    camera_type.value if isinstance(camera_type, Enum) else camera_type
                ),
                "path_to_images": path_to_images,
                "format": format,
                "name": self.name,
                "flip_top_bottom": self._flip_top_bottom,
                "flip_left_right": self._flip_left_right,
                "rotate_angle": self._rotate_angle,
            },
        )

    @property
    def own_state(self):
        """State description of this component: a dict of zero-argument
        callables reporting whether the worker thread is alive and which
        capture actions are currently registered."""
        return {
            "running": lambda: self._process_image_thread.is_alive(),
            "capture_actions": lambda: self._frame_handler.current_actions(),
        }

    @classmethod
    def from_file_system(cls, path_to_images: str):
        """Alternative classmethod to create an instance of a :class:`Camera`
        object using a :data:`FileSystemCamera`"""
        return cls(
            camera_type=CameraTypes.FILE_SYSTEM_CAMERA, path_to_images=path_to_images
        )

    @property
    def format(self):
        """Image format used for frames handed back to the user (lower case)."""
        return self._format

    @format.setter
    def format(self, format_value):
        # Validation is delegated to ImageFunctions; the accepted value is
        # stored lower-cased.
        ImageFunctions.image_format_check(format_value)
        self._format = format_value.lower()

    @classmethod
    def from_usb(cls, index=None):
        """Alternative classmethod to create an instance of a :class:`Camera`
        object using a :data:`UsbCamera`"""
        return cls(camera_type=CameraTypes.USB_CAMERA, index=index)

    def __enter__(self):
        """Enter the runtime context; returns the camera itself so it can be
        bound with ``with Camera() as camera:``."""
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        """Stop the frame-processing thread and drop the camera reference."""
        # Ask the worker to stop and wait for it BEFORE releasing the camera:
        # the worker dereferences the camera on every iteration, so clearing
        # the reference while it is still running could make it call a method
        # on None.
        self._continue_processing = False
        if self._process_image_thread.is_alive():
            self._process_image_thread.join()
        self._camera = None

    @staticmethod
    def __warn_if_format_passed(format):
        """Print the deprecation notice when the legacy ``format`` argument
        was supplied; the value itself is ignored by the callers."""
        if format is not None:
            print(
                "'format' is no longer supported in this function. "
                "Please set the 'camera.format' property directly, and call this function without 'format' parameter."
            )

    def is_recording(self):
        """Returns True if recording mode is enabled."""
        return self._frame_handler.is_running_action(
            CaptureActions.CAPTURE_VIDEO_TO_FILE
        )

    def is_detecting_motion(self):
        """Returns True if motion detection mode is enabled."""
        return self._frame_handler.is_running_action(CaptureActions.DETECT_MOTION)

    @type_check
    def capture_image(self, output_file_name=""):
        """Capture a single frame image to file.

        .. note::
            If no :data:`output_file_name` argument is provided, images will be stored in `~/Camera`.

        :type output_file_name: str
        :param output_file_name:
            The filename into which to write the image.
        """
        self._frame_handler.register_action(
            CaptureActions.CAPTURE_SINGLE_FRAME, {"output_file_name": output_file_name}
        )

    @type_check
    def start_video_capture(self, output_file_name="", fps=20.0, resolution=None):
        """Begin capturing video from the camera.

        .. note::
            If no :data:`output_file_name` argument is provided, video will be stored in `~/Camera`.

        :type output_file_name: str
        :param output_file_name:
            The filename into which to write the video.

        :type fps: int or float
        :param fps:
            The framerate to use for the captured video. Defaults to 20.0 fps

        :type resolution: tuple
        :param resolution:
            The resolution to use for the captured video. Defaults to ``None``,
            which is forwarded as-is to the capture action.
        """
        args = {
            "output_file_name": output_file_name,
            "fps": fps,
            "resolution": resolution,
        }
        self._frame_handler.register_action(CaptureActions.CAPTURE_VIDEO_TO_FILE, args)

    def stop_video_capture(self):
        """Stop capturing video from the camera.

        Does nothing unless :class:`start_video_capture` has been
        called.
        """
        self._frame_handler.remove_action(CaptureActions.CAPTURE_VIDEO_TO_FILE)

    @type_check
    def start_detecting_motion(
        self, callback_on_motion, moving_object_minimum_area=300
    ):
        """Begin processing image data from the camera, attempting to detect
        motion. When motion is detected, call the function passed in.

        .. warning::
            The callback function can take either no arguments or only one, which will be used to provide the image back
            to the user when motion is detected.
            If a callback with another signature is received, the method will raise an exception.

        :type callback_on_motion: function
        :param callback_on_motion:
            A callback function that will be called when motion is detected.

        :type moving_object_minimum_area: int
        :param moving_object_minimum_area:
            The sensitivity of the motion detection, measured as the area of pixels changing between frames that constitutes motion.

        :raises ValueError:
            If the callback declares more than one parameter.
        """
        # Reject unusable callbacks before anything is registered
        callback_signature = signature(callback_on_motion)
        if len(callback_signature.parameters) > 1:
            raise ValueError(
                "Invalid callback signature: it should receive at most one argument."
            )

        args = {
            "callback_on_motion": callback_on_motion,
            "moving_object_minimum_area": moving_object_minimum_area,
        }
        self._frame_handler.register_action(CaptureActions.DETECT_MOTION, args)

    def stop_detecting_motion(self):
        """Stop running the motion detection processing.

        Does nothing unless :class:`start_detecting_motion` has been
        called.
        """
        self._frame_handler.remove_action(CaptureActions.DETECT_MOTION)

    @type_check
    def start_handling_frames(self, callback_on_frame, frame_interval=1, format=None):
        """Begin calling the passed callback with each new frame, allowing for
        custom processing.

        .. warning::
            The callback function must declare at least one parameter, which will be used to provide the image back
            to the user.
            If a callback without parameters is received, the method will raise an exception.

        :type callback_on_frame: function
        :param callback_on_frame:
            A callback function that will be called every :class:`frame_interval` camera frames.

        :type frame_interval: int
        :param frame_interval:
            The callback will run every frame_interval frames, decreasing the frame rate of processing. Defaults to 1.

        :type format: string
        :param format:
            DEPRECATED. Set 'camera.format' directly, and call this function without this parameter instead.

        :raises ValueError:
            If the callback declares no parameters.
        """
        self.__warn_if_format_passed(format)

        # Reject unusable callbacks before anything is registered
        callback_signature = signature(callback_on_frame)
        if len(callback_signature.parameters) == 0:
            raise ValueError(
                "Invalid callback signature: it should receive at least one argument."
            )

        args = {
            "callback_on_frame": callback_on_frame,
            "frame_interval": frame_interval,
            # The camera-wide format is used; the 'format' argument is ignored
            "format": self.format,
        }
        self._frame_handler.register_action(CaptureActions.HANDLE_FRAME, args)

    def stop_handling_frames(self):
        """Stops handling camera frames.

        Does nothing unless :class:`start_handling_frames` has been
        called.
        """
        self._frame_handler.remove_action(CaptureActions.HANDLE_FRAME)

    def __get_processed_current_frame(self):
        """Return the frame currently held by the frame handler, converted
        with ``ImageFunctions.convert`` when the camera format is "opencv"."""
        image = self._frame_handler.frame
        if self.format.lower() == "opencv":
            image = ImageFunctions.convert(image, format="opencv")
        return image

    def __process_camera_output(self):
        """Worker loop run in the background thread: fetch a frame, publish
        it, notify waiters and the ``on_frame`` callback, then run the
        registered capture actions. Ends when the camera is gone or closed, or
        when processing has been switched off."""
        while True:
            # Read the reference once per iteration so that a concurrent
            # ``self._camera = None`` cannot slip in between the check and
            # the calls below.
            camera = self._camera
            if not (
                camera
                and camera.is_opened()
                and self._continue_processing is True
            ):
                break

            self._frame_handler.frame = camera.get_frame()
            # Wake up any caller blocked in get_frame()
            self._new_frame_event.set()

            if callable(self.on_frame):
                self.on_frame(self.__get_processed_current_frame())

            try:
                self._frame_handler.process()
            except Exception as e:
                # A failing capture action must not stop frame acquisition
                print(f"Error in camera frame handler: {e}")

    def current_frame(self, format=None):
        """Returns the latest frame captured by the camera. This method is non-
        blocking and can return the same frame multiple times.

        By default the returned image is formatted as a :class:`PIL.Image.Image`.

        :type format: string
        :param format:
            DEPRECATED. Set 'camera.format' directly, and call this function without this parameter instead.
        """
        self.__warn_if_format_passed(format)
        return self.__get_processed_current_frame()

    def get_frame(self, format=None):
        """Returns the next frame captured by the camera. This method blocks
        until a new frame is available.

        :type format: string
        :param format:
            DEPRECATED. Set 'camera.format' directly, and call this function without this parameter instead.
        """
        self.__warn_if_format_passed(format)

        # Block until the worker thread signals a fresh frame, then re-arm
        # the event for the next call.
        self._new_frame_event.wait()
        self._new_frame_event.clear()
        return self.__get_processed_current_frame()