aquastat/picamera/mmalobj.py
2023-02-01 21:37:42 +00:00

3737 lines
147 KiB
Python

# vim: set et sw=4 sts=4 fileencoding=utf-8:
#
# Python header conversion
# Copyright (c) 2013-2017 Dave Jones <dave@waveform.org.uk>
#
# Original headers
# Copyright (c) 2012, Broadcom Europe Ltd
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
from __future__ import (
unicode_literals,
print_function,
division,
absolute_import,
)
# Make Py2's str equivalent to Py3's
str = type('')
import io
import ctypes as ct
import warnings
import weakref
from threading import Thread, Event
from collections import namedtuple
from fractions import Fraction
from itertools import cycle
from functools import reduce
from operator import mul
from . import bcm_host, mmal
from .streams import BufferIO
from .exc import (
mmal_check,
PiCameraValueError,
PiCameraRuntimeError,
PiCameraMMALError,
PiCameraPortDisabled,
PiCameraDeprecated,
)
# Old firmwares confuse the RGB24 and BGR24 encodings. This flag tracks whether
# the order needs fixing (it is set during MMALCamera.__init__).
FIX_RGB_BGR_ORDER = None
# Mapping of parameters to the C-structure they expect / return. If a parameter
# does not appear in this mapping, it cannot be queried / set with the
# MMALControlPort.params attribute.
PARAM_TYPES = {
mmal.MMAL_PARAMETER_ALGORITHM_CONTROL: mmal.MMAL_PARAMETER_ALGORITHM_CONTROL_T,
mmal.MMAL_PARAMETER_ANNOTATE: None, # adjusted by MMALCamera.annotate_rev
mmal.MMAL_PARAMETER_ANTISHAKE: mmal.MMAL_PARAMETER_BOOLEAN_T,
mmal.MMAL_PARAMETER_AUDIO_LATENCY_TARGET: mmal.MMAL_PARAMETER_AUDIO_LATENCY_TARGET_T,
mmal.MMAL_PARAMETER_AWB_MODE: mmal.MMAL_PARAMETER_AWBMODE_T,
mmal.MMAL_PARAMETER_BRIGHTNESS: mmal.MMAL_PARAMETER_RATIONAL_T,
mmal.MMAL_PARAMETER_BUFFER_FLAG_FILTER: mmal.MMAL_PARAMETER_UINT32_T,
mmal.MMAL_PARAMETER_BUFFER_REQUIREMENTS: mmal.MMAL_PARAMETER_BUFFER_REQUIREMENTS_T,
mmal.MMAL_PARAMETER_CAMERA_BURST_CAPTURE: mmal.MMAL_PARAMETER_BOOLEAN_T,
mmal.MMAL_PARAMETER_CAMERA_CLOCKING_MODE: mmal.MMAL_PARAMETER_CAMERA_CLOCKING_MODE_T,
mmal.MMAL_PARAMETER_CAMERA_CONFIG: mmal.MMAL_PARAMETER_CAMERA_CONFIG_T,
mmal.MMAL_PARAMETER_CAMERA_CUSTOM_SENSOR_CONFIG: mmal.MMAL_PARAMETER_UINT32_T,
mmal.MMAL_PARAMETER_CAMERA_INFO: None, # adjusted by MMALCameraInfo.info_rev
mmal.MMAL_PARAMETER_CAMERA_INTERFACE: mmal.MMAL_PARAMETER_CAMERA_INTERFACE_T,
mmal.MMAL_PARAMETER_CAMERA_ISP_BLOCK_OVERRIDE: mmal.MMAL_PARAMETER_UINT32_T,
mmal.MMAL_PARAMETER_CAMERA_MIN_ISO: mmal.MMAL_PARAMETER_UINT32_T,
mmal.MMAL_PARAMETER_CAMERA_NUM: mmal.MMAL_PARAMETER_INT32_T,
mmal.MMAL_PARAMETER_CAMERA_RX_CONFIG: mmal.MMAL_PARAMETER_CAMERA_RX_CONFIG_T,
mmal.MMAL_PARAMETER_CAMERA_RX_TIMING: mmal.MMAL_PARAMETER_CAMERA_RX_TIMING_T,
mmal.MMAL_PARAMETER_CAMERA_SETTINGS: mmal.MMAL_PARAMETER_CAMERA_SETTINGS_T,
mmal.MMAL_PARAMETER_CAMERA_USE_CASE: mmal.MMAL_PARAMETER_CAMERA_USE_CASE_T,
mmal.MMAL_PARAMETER_CAPTURE_EXPOSURE_COMP: mmal.MMAL_PARAMETER_INT32_T,
mmal.MMAL_PARAMETER_CAPTURE: mmal.MMAL_PARAMETER_BOOLEAN_T,
mmal.MMAL_PARAMETER_CAPTURE_MODE: mmal.MMAL_PARAMETER_CAPTUREMODE_T,
mmal.MMAL_PARAMETER_CAPTURE_STATS_PASS: mmal.MMAL_PARAMETER_BOOLEAN_T,
mmal.MMAL_PARAMETER_CAPTURE_STATUS: mmal.MMAL_PARAMETER_CAPTURE_STATUS_T,
mmal.MMAL_PARAMETER_CHANGE_EVENT_REQUEST: mmal.MMAL_PARAMETER_CHANGE_EVENT_REQUEST_T,
mmal.MMAL_PARAMETER_CLOCK_ACTIVE: mmal.MMAL_PARAMETER_BOOLEAN_T,
mmal.MMAL_PARAMETER_CLOCK_DISCONT_THRESHOLD: mmal.MMAL_PARAMETER_CLOCK_DISCONT_THRESHOLD_T,
mmal.MMAL_PARAMETER_CLOCK_ENABLE_BUFFER_INFO: mmal.MMAL_PARAMETER_BOOLEAN_T,
mmal.MMAL_PARAMETER_CLOCK_FRAME_RATE: mmal.MMAL_PARAMETER_RATIONAL_T,
mmal.MMAL_PARAMETER_CLOCK_LATENCY: mmal.MMAL_PARAMETER_CLOCK_LATENCY_T,
mmal.MMAL_PARAMETER_CLOCK_REQUEST_THRESHOLD: mmal.MMAL_PARAMETER_CLOCK_REQUEST_THRESHOLD_T,
mmal.MMAL_PARAMETER_CLOCK_SCALE: mmal.MMAL_PARAMETER_RATIONAL_T,
mmal.MMAL_PARAMETER_CLOCK_TIME: mmal.MMAL_PARAMETER_INT64_T,
mmal.MMAL_PARAMETER_CLOCK_UPDATE_THRESHOLD: mmal.MMAL_PARAMETER_CLOCK_UPDATE_THRESHOLD_T,
mmal.MMAL_PARAMETER_COLOUR_EFFECT: mmal.MMAL_PARAMETER_COLOURFX_T,
mmal.MMAL_PARAMETER_CONTRAST: mmal.MMAL_PARAMETER_RATIONAL_T,
mmal.MMAL_PARAMETER_CORE_STATISTICS: mmal.MMAL_PARAMETER_CORE_STATISTICS_T,
mmal.MMAL_PARAMETER_CUSTOM_AWB_GAINS: mmal.MMAL_PARAMETER_AWB_GAINS_T,
mmal.MMAL_PARAMETER_DISPLAYREGION: mmal.MMAL_DISPLAYREGION_T,
mmal.MMAL_PARAMETER_DPF_CONFIG: mmal.MMAL_PARAMETER_UINT32_T,
mmal.MMAL_PARAMETER_DYNAMIC_RANGE_COMPRESSION: mmal.MMAL_PARAMETER_DRC_T,
mmal.MMAL_PARAMETER_ENABLE_RAW_CAPTURE: mmal.MMAL_PARAMETER_BOOLEAN_T,
mmal.MMAL_PARAMETER_EXIF_DISABLE: mmal.MMAL_PARAMETER_BOOLEAN_T,
mmal.MMAL_PARAMETER_EXIF: mmal.MMAL_PARAMETER_EXIF_T,
mmal.MMAL_PARAMETER_EXP_METERING_MODE: mmal.MMAL_PARAMETER_EXPOSUREMETERINGMODE_T,
mmal.MMAL_PARAMETER_EXPOSURE_COMP: mmal.MMAL_PARAMETER_INT32_T,
mmal.MMAL_PARAMETER_EXPOSURE_MODE: mmal.MMAL_PARAMETER_EXPOSUREMODE_T,
mmal.MMAL_PARAMETER_EXTRA_BUFFERS: mmal.MMAL_PARAMETER_UINT32_T,
mmal.MMAL_PARAMETER_FIELD_OF_VIEW: mmal.MMAL_PARAMETER_FIELD_OF_VIEW_T,
mmal.MMAL_PARAMETER_FLASH: mmal.MMAL_PARAMETER_FLASH_T,
mmal.MMAL_PARAMETER_FLASH_REQUIRED: mmal.MMAL_PARAMETER_BOOLEAN_T,
mmal.MMAL_PARAMETER_FLASH_SELECT: mmal.MMAL_PARAMETER_FLASH_SELECT_T,
mmal.MMAL_PARAMETER_FLICKER_AVOID: mmal.MMAL_PARAMETER_FLICKERAVOID_T,
mmal.MMAL_PARAMETER_FOCUS: mmal.MMAL_PARAMETER_FOCUS_T,
mmal.MMAL_PARAMETER_FOCUS_REGIONS: mmal.MMAL_PARAMETER_FOCUS_REGIONS_T,
mmal.MMAL_PARAMETER_FOCUS_STATUS: mmal.MMAL_PARAMETER_FOCUS_STATUS_T,
mmal.MMAL_PARAMETER_FPS_RANGE: mmal.MMAL_PARAMETER_FPS_RANGE_T,
mmal.MMAL_PARAMETER_FRAME_RATE: mmal.MMAL_PARAMETER_RATIONAL_T, # actually mmal.MMAL_PARAMETER_FRAME_RATE_T but this only contains a rational anyway...
mmal.MMAL_PARAMETER_IMAGE_EFFECT: mmal.MMAL_PARAMETER_IMAGEFX_T,
mmal.MMAL_PARAMETER_IMAGE_EFFECT_PARAMETERS: mmal.MMAL_PARAMETER_IMAGEFX_PARAMETERS_T,
mmal.MMAL_PARAMETER_INPUT_CROP: mmal.MMAL_PARAMETER_INPUT_CROP_T,
mmal.MMAL_PARAMETER_INTRAPERIOD: mmal.MMAL_PARAMETER_UINT32_T,
mmal.MMAL_PARAMETER_ISO: mmal.MMAL_PARAMETER_UINT32_T,
mmal.MMAL_PARAMETER_JPEG_ATTACH_LOG: mmal.MMAL_PARAMETER_BOOLEAN_T,
mmal.MMAL_PARAMETER_JPEG_Q_FACTOR: mmal.MMAL_PARAMETER_UINT32_T,
mmal.MMAL_PARAMETER_JPEG_RESTART_INTERVAL: mmal.MMAL_PARAMETER_UINT32_T,
mmal.MMAL_PARAMETER_LOCKSTEP_ENABLE: mmal.MMAL_PARAMETER_BOOLEAN_T,
mmal.MMAL_PARAMETER_LOGGING: mmal.MMAL_PARAMETER_LOGGING_T,
mmal.MMAL_PARAMETER_MB_ROWS_PER_SLICE: mmal.MMAL_PARAMETER_UINT32_T,
mmal.MMAL_PARAMETER_MEM_USAGE: mmal.MMAL_PARAMETER_MEM_USAGE_T,
mmal.MMAL_PARAMETER_MINIMISE_FRAGMENTATION: mmal.MMAL_PARAMETER_BOOLEAN_T,
mmal.MMAL_PARAMETER_MIRROR: mmal.MMAL_PARAMETER_UINT32_T, # actually mmal.MMAL_PARAMETER_MIRROR_T but this just contains a uint32
mmal.MMAL_PARAMETER_NALUNITFORMAT: mmal.MMAL_PARAMETER_VIDEO_NALUNITFORMAT_T,
mmal.MMAL_PARAMETER_NO_IMAGE_PADDING: mmal.MMAL_PARAMETER_BOOLEAN_T,
mmal.MMAL_PARAMETER_POWERMON_ENABLE: mmal.MMAL_PARAMETER_BOOLEAN_T,
mmal.MMAL_PARAMETER_PRIVACY_INDICATOR: mmal.MMAL_PARAMETER_PRIVACY_INDICATOR_T,
mmal.MMAL_PARAMETER_PROFILE: mmal.MMAL_PARAMETER_VIDEO_PROFILE_T,
mmal.MMAL_PARAMETER_RATECONTROL: mmal.MMAL_PARAMETER_VIDEO_RATECONTROL_T,
mmal.MMAL_PARAMETER_REDEYE: mmal.MMAL_PARAMETER_REDEYE_T,
mmal.MMAL_PARAMETER_ROTATION: mmal.MMAL_PARAMETER_INT32_T,
mmal.MMAL_PARAMETER_SATURATION: mmal.MMAL_PARAMETER_RATIONAL_T,
mmal.MMAL_PARAMETER_SEEK: mmal.MMAL_PARAMETER_SEEK_T,
mmal.MMAL_PARAMETER_SENSOR_INFORMATION: mmal.MMAL_PARAMETER_SENSOR_INFORMATION_T,
mmal.MMAL_PARAMETER_SHARPNESS: mmal.MMAL_PARAMETER_RATIONAL_T,
mmal.MMAL_PARAMETER_SHUTTER_SPEED: mmal.MMAL_PARAMETER_UINT32_T,
mmal.MMAL_PARAMETER_STATISTICS: mmal.MMAL_PARAMETER_STATISTICS_T,
mmal.MMAL_PARAMETER_STEREOSCOPIC_MODE: mmal.MMAL_PARAMETER_STEREOSCOPIC_MODE_T,
mmal.MMAL_PARAMETER_STILLS_DENOISE: mmal.MMAL_PARAMETER_BOOLEAN_T,
mmal.MMAL_PARAMETER_SUPPORTED_ENCODINGS: mmal.MMAL_PARAMETER_ENCODING_T,
mmal.MMAL_PARAMETER_SUPPORTED_PROFILES: mmal.MMAL_PARAMETER_VIDEO_PROFILE_T,
mmal.MMAL_PARAMETER_SW_SATURATION_DISABLE: mmal.MMAL_PARAMETER_BOOLEAN_T,
mmal.MMAL_PARAMETER_SW_SHARPEN_DISABLE: mmal.MMAL_PARAMETER_BOOLEAN_T,
mmal.MMAL_PARAMETER_SYSTEM_TIME: mmal.MMAL_PARAMETER_UINT64_T,
mmal.MMAL_PARAMETER_THUMBNAIL_CONFIGURATION: mmal.MMAL_PARAMETER_THUMBNAIL_CONFIG_T,
mmal.MMAL_PARAMETER_URI: mmal.MMAL_PARAMETER_URI_T,
mmal.MMAL_PARAMETER_USE_STC: mmal.MMAL_PARAMETER_CAMERA_STC_MODE_T,
mmal.MMAL_PARAMETER_VIDEO_ALIGN_HORIZ: mmal.MMAL_PARAMETER_UINT32_T,
mmal.MMAL_PARAMETER_VIDEO_ALIGN_VERT: mmal.MMAL_PARAMETER_UINT32_T,
mmal.MMAL_PARAMETER_VIDEO_BIT_RATE: mmal.MMAL_PARAMETER_UINT32_T,
mmal.MMAL_PARAMETER_VIDEO_DENOISE: mmal.MMAL_PARAMETER_BOOLEAN_T,
mmal.MMAL_PARAMETER_VIDEO_DROPPABLE_PFRAMES: mmal.MMAL_PARAMETER_BOOLEAN_T,
mmal.MMAL_PARAMETER_VIDEO_EEDE_ENABLE: mmal.MMAL_PARAMETER_VIDEO_EEDE_ENABLE_T,
mmal.MMAL_PARAMETER_VIDEO_EEDE_LOSSRATE: mmal.MMAL_PARAMETER_VIDEO_EEDE_LOSSRATE_T,
mmal.MMAL_PARAMETER_VIDEO_ENCODE_FRAME_LIMIT_BITS: mmal.MMAL_PARAMETER_UINT32_T,
mmal.MMAL_PARAMETER_VIDEO_ENCODE_INITIAL_QUANT: mmal.MMAL_PARAMETER_UINT32_T,
mmal.MMAL_PARAMETER_VIDEO_ENCODE_INLINE_HEADER: mmal.MMAL_PARAMETER_BOOLEAN_T,
mmal.MMAL_PARAMETER_VIDEO_ENCODE_INLINE_VECTORS: mmal.MMAL_PARAMETER_BOOLEAN_T,
mmal.MMAL_PARAMETER_VIDEO_ENCODE_MAX_QUANT: mmal.MMAL_PARAMETER_UINT32_T,
mmal.MMAL_PARAMETER_VIDEO_ENCODE_MIN_QUANT: mmal.MMAL_PARAMETER_UINT32_T,
mmal.MMAL_PARAMETER_VIDEO_ENCODE_PEAK_RATE: mmal.MMAL_PARAMETER_UINT32_T,
mmal.MMAL_PARAMETER_VIDEO_ENCODE_QP_P: mmal.MMAL_PARAMETER_UINT32_T,
mmal.MMAL_PARAMETER_VIDEO_ENCODE_RC_MODEL: mmal.MMAL_PARAMETER_VIDEO_ENCODE_RC_MODEL_T,
mmal.MMAL_PARAMETER_VIDEO_ENCODE_RC_SLICE_DQUANT: mmal.MMAL_PARAMETER_UINT32_T,
mmal.MMAL_PARAMETER_VIDEO_ENCODE_SEI_ENABLE: mmal.MMAL_PARAMETER_BOOLEAN_T,
mmal.MMAL_PARAMETER_VIDEO_ENCODE_SPS_TIMING: mmal.MMAL_PARAMETER_BOOLEAN_T,
mmal.MMAL_PARAMETER_VIDEO_FRAME_RATE: mmal.MMAL_PARAMETER_RATIONAL_T, # actually mmal.MMAL_PARAMETER_FRAME_RATE_T but this only contains a rational anyway...
mmal.MMAL_PARAMETER_VIDEO_IMMUTABLE_INPUT: mmal.MMAL_PARAMETER_BOOLEAN_T,
mmal.MMAL_PARAMETER_VIDEO_INTERLACE_TYPE: mmal.MMAL_PARAMETER_VIDEO_INTERLACE_TYPE_T,
mmal.MMAL_PARAMETER_VIDEO_INTERPOLATE_TIMESTAMPS: mmal.MMAL_PARAMETER_BOOLEAN_T,
mmal.MMAL_PARAMETER_VIDEO_INTRA_REFRESH: mmal.MMAL_PARAMETER_VIDEO_INTRA_REFRESH_T,
mmal.MMAL_PARAMETER_VIDEO_LEVEL_EXTENSION: mmal.MMAL_PARAMETER_VIDEO_LEVEL_EXTENSION_T,
mmal.MMAL_PARAMETER_VIDEO_MAX_NUM_CALLBACKS: mmal.MMAL_PARAMETER_UINT32_T,
mmal.MMAL_PARAMETER_VIDEO_RENDER_STATS: mmal.MMAL_PARAMETER_VIDEO_RENDER_STATS_T,
mmal.MMAL_PARAMETER_VIDEO_REQUEST_I_FRAME: mmal.MMAL_PARAMETER_BOOLEAN_T,
mmal.MMAL_PARAMETER_VIDEO_STABILISATION: mmal.MMAL_PARAMETER_BOOLEAN_T,
mmal.MMAL_PARAMETER_ZERO_COPY: mmal.MMAL_PARAMETER_BOOLEAN_T,
mmal.MMAL_PARAMETER_ZERO_SHUTTER_LAG: mmal.MMAL_PARAMETER_ZEROSHUTTERLAG_T,
mmal.MMAL_PARAMETER_ZOOM: mmal.MMAL_PARAMETER_SCALEFACTOR_T,
}
class PiCameraFraction(Fraction):
"""
Extends :class:`~fractions.Fraction` to act as a (numerator, denominator)
tuple when required.
"""
def __len__(self):
warnings.warn(
PiCameraDeprecated(
'Accessing framerate as a tuple is deprecated; this value is '
'now a Fraction, so you can query the numerator and '
'denominator properties directly, convert to an int or float, '
'or perform arithmetic operations and comparisons directly'))
return 2
def __getitem__(self, index):
warnings.warn(
PiCameraDeprecated(
'Accessing framerate as a tuple is deprecated; this value is '
'now a Fraction, so you can query the numerator and '
'denominator properties directly, convert to an int or float, '
'or perform arithmetic operations and comparisons directly'))
if index == 0:
return self.numerator
elif index == 1:
return self.denominator
else:
raise IndexError('invalid index %d' % index)
def __contains__(self, value):
return value in (self.numerator, self.denominator)
class PiResolution(namedtuple('PiResolution', ('width', 'height'))):
"""
A :func:`~collections.namedtuple` derivative which represents a resolution
with a :attr:`width` and :attr:`height`.
.. attribute:: width
The width of the resolution in pixels
.. attribute:: height
The height of the resolution in pixels
.. versionadded:: 1.11
"""
__slots__ = () # workaround python issue #24931
def pad(self, width=32, height=16):
"""
Returns the resolution padded up to the nearest multiple of *width*
and *height* which default to 32 and 16 respectively (the camera's
native block size for most operations). For example:
.. code-block:: pycon
>>> PiResolution(1920, 1080).pad()
PiResolution(width=1920, height=1088)
>>> PiResolution(100, 100).pad(16, 16)
PiResolution(width=128, height=112)
>>> PiResolution(100, 100).pad(16, 16)
PiResolution(width=112, height=112)
"""
return PiResolution(
width=((self.width + (width - 1)) // width) * width,
height=((self.height + (height - 1)) // height) * height,
)
def transpose(self):
"""
Returns the resolution with the width and height transposed. For
example:
.. code-block:: pycon
>>> PiResolution(1920, 1080).transpose()
PiResolution(width=1080, height=1920)
"""
return PiResolution(self.height, self.width)
def __str__(self):
return '%dx%d' % (self.width, self.height)
class PiFramerateRange(namedtuple('PiFramerateRange', ('low', 'high'))):
"""
This class is a :func:`~collections.namedtuple` derivative used to store
the low and high limits of a range of framerates. It is recommended that
you access the information stored by this class by attribute rather than
position (for example: ``camera.framerate_range.low`` rather than
``camera.framerate_range[0]``).
.. attribute:: low
The lowest framerate that the camera is permitted to use (inclusive).
When the :attr:`~picamera.PiCamera.framerate_range` attribute is
queried, this value will always be returned as a
:class:`~fractions.Fraction`.
.. attribute:: high
The highest framerate that the camera is permitted to use (inclusive).
When the :attr:`~picamera.PiCamera.framerate_range` attribute is
queried, this value will always be returned as a
:class:`~fractions.Fraction`.
.. versionadded:: 1.13
"""
__slots__ = () # workaround python issue #24931
def __new__(cls, low, high):
return super(PiFramerateRange, cls).__new__(cls, to_fraction(low),
to_fraction(high))
def __str__(self):
return '%s..%s' % (self.low, self.high)
class PiSensorMode(namedtuple('PiSensorMode', ('resolution', 'framerates',
'video', 'still', 'full_fov'))):
"""
This class is a :func:`~collections.namedtuple` derivative used to store
the attributes describing a camera sensor mode.
.. attribute:: resolution
A :class:`PiResolution` specifying the size of frames output by the
camera in this mode.
.. attribute:: framerates
A :class:`PiFramerateRange` specifying the minimum and maximum
framerates supported by this sensor mode. Typically the low value is
exclusive and high value inclusive.
.. attribute:: video
A :class:`bool` indicating whether or not the mode is capable of
recording video. Currently this is always ``True``.
.. attribute:: still
A :class:`bool` indicating whether the mode can be used for still
captures (cases where a capture method is called with
``use_video_port`` set to ``False``).
.. attribute:: full_fov
A :class:`bool` indicating whether the full width of the sensor
area is used to capture frames. This can be ``True`` even when the
resolution is less than the camera's maximum resolution due to binning
and skipping. See :ref:`camera_modes` for a diagram of the available
fields of view.
"""
__slots__ = () # workaround python issue #24931
def __new__(cls, resolution, framerates, video=True, still=False,
full_fov=True):
return super(PiSensorMode, cls).__new__(
cls,
resolution
if isinstance(resolution, PiResolution) else
to_resolution(resolution),
framerates
if isinstance(framerates, PiFramerateRange) else
PiFramerateRange(*framerates),
video, still, full_fov)
def open_stream(stream, output=True, buffering=65536):
"""
This is the core of picamera's IO-semantics. It returns a tuple of a
file-like object and a bool indicating whether the stream requires closing
once the caller is finished with it.
* If *stream* is a string, it is opened as a file object (with mode 'wb' if
*output* is ``True``, and the specified amount of *bufffering*). In this
case the function returns ``(stream, True)``.
* If *stream* is a stream with a ``write`` method, it is returned as
``(stream, False)``.
* Otherwise *stream* is assumed to be a writeable buffer and is wrapped
with :class:`BufferIO`. The function returns ``(stream, True)``.
"""
if isinstance(stream, bytes):
stream = stream.decode('ascii')
opened = isinstance(stream, str)
if opened:
stream = io.open(stream, 'wb' if output else 'rb', buffering)
else:
try:
if output:
stream.write
else:
stream.read
except AttributeError:
# Assume the stream is actually a buffer
opened = True
stream = BufferIO(stream)
if output and not stream.writable:
raise IOError('writeable buffer required for output')
return (stream, opened)
def close_stream(stream, opened):
"""
If *opened* is ``True``, then the ``close`` method of *stream* will be
called. Otherwise, the function will attempt to call the ``flush`` method
on *stream* (if one exists). This function essentially takes the output
of :func:`open_stream` and finalizes the result.
"""
if opened:
stream.close()
else:
try:
stream.flush()
except AttributeError:
pass
def to_resolution(value):
"""
Converts *value* which may be a (width, height) tuple or a string
containing a representation of a resolution (e.g. "1024x768" or "1080p") to
a (width, height) tuple.
"""
if isinstance(value, bytes):
value = value.decode('utf-8')
if isinstance(value, str):
try:
# A selection from https://en.wikipedia.org/wiki/Graphics_display_resolution
# Feel free to suggest additions
w, h = {
'VGA': (640, 480),
'SVGA': (800, 600),
'XGA': (1024, 768),
'SXGA': (1280, 1024),
'UXGA': (1600, 1200),
'HD': (1280, 720),
'FHD': (1920, 1080),
'1080P': (1920, 1080),
'720P': (1280, 720),
}[value.strip().upper()]
except KeyError:
w, h = (int(i.strip()) for i in value.upper().split('X', 1))
else:
try:
w, h = value
except (TypeError, ValueError):
raise PiCameraValueError("Invalid resolution tuple: %r" % value)
return PiResolution(w, h)
def to_fraction(value, den_limit=65536):
"""
Converts *value*, which can be any numeric type, an MMAL_RATIONAL_T, or a
(numerator, denominator) tuple to a :class:`~fractions.Fraction` limiting
the denominator to the range 0 < n <= *den_limit* (which defaults to
65536).
"""
try:
# int, long, or fraction
n, d = value.numerator, value.denominator
except AttributeError:
try:
# float
n, d = value.as_integer_ratio()
except AttributeError:
try:
n, d = value.num, value.den
except AttributeError:
try:
# tuple
n, d = value
warnings.warn(
PiCameraDeprecated(
"Setting framerate or gains as a tuple is "
"deprecated; please use one of Python's many "
"numeric classes like int, float, Decimal, or "
"Fraction instead"))
except (TypeError, ValueError):
# try and convert anything else to a Fraction directly
value = Fraction(value)
n, d = value.numerator, value.denominator
# Ensure denominator is reasonable
if d == 0:
raise PiCameraValueError("Denominator cannot be 0")
elif d > den_limit:
return Fraction(n, d).limit_denominator(den_limit)
else:
return Fraction(n, d)
def to_rational(value):
"""
Converts *value* (which can be anything accepted by :func:`to_fraction`) to
an MMAL_RATIONAL_T structure.
"""
value = to_fraction(value)
return mmal.MMAL_RATIONAL_T(value.numerator, value.denominator)
def buffer_bytes(buf):
"""
Given an object which implements the :ref:`buffer protocol
<bufferobjects>`, this function returns the size of the object in bytes.
The object can be multi-dimensional or include items larger than byte-size.
"""
if not isinstance(buf, memoryview):
buf = memoryview(buf)
return buf.itemsize * reduce(mul, buf.shape)
def debug_pipeline(port):
"""
Given an :class:`MMALVideoPort` *port*, this traces all objects in the
pipeline feeding it (including components and connections) and yields each
object in turn. Hence the generator typically yields something like:
* :class:`MMALVideoPort` (the specified output port)
* :class:`MMALEncoder` (the encoder which owns the output port)
* :class:`MMALVideoPort` (the encoder's input port)
* :class:`MMALConnection` (the connection between the splitter and encoder)
* :class:`MMALVideoPort` (the splitter's output port)
* :class:`MMALSplitter` (the splitter on the camera's video port)
* :class:`MMALVideoPort` (the splitter's input port)
* :class:`MMALConnection` (the connection between the splitter and camera)
* :class:`MMALVideoPort` (the camera's video port)
* :class:`MMALCamera` (the camera component)
"""
def find_port(addr):
for obj in MMALObject.REGISTRY:
if isinstance(obj, MMALControlPort):
if ct.addressof(obj._port[0]) == addr:
return obj
raise IndexError('unable to locate port with address %x' % addr)
def find_component(addr):
for obj in MMALObject.REGISTRY:
if isinstance(obj, MMALBaseComponent) and obj._component is not None:
if ct.addressof(obj._component[0]) == addr:
return obj
raise IndexError('unable to locate component with address %x' % addr)
assert isinstance(port, (MMALControlPort, MMALPythonPort))
while True:
if port.type == mmal.MMAL_PORT_TYPE_OUTPUT:
yield port
if isinstance(port, MMALPythonPort):
comp = port._owner()
else:
comp = find_component(ct.addressof(port._port[0].component[0]))
yield comp
if not isinstance(comp, (MMALComponent, MMALPythonComponent)):
break
if comp.connection is None:
break
if isinstance(comp.connection, MMALPythonConnection):
port = comp.connection._target
else:
port = find_port(ct.addressof(comp.connection._connection[0].in_[0]))
yield port
yield comp.connection
if isinstance(comp.connection, MMALPythonConnection):
port = comp.connection._source
else:
port = find_port(ct.addressof(comp.connection._connection[0].out[0]))
def print_pipeline(port):
"""
Prints a human readable representation of the pipeline feeding the
specified :class:`MMALVideoPort` *port*.
"""
rows = [[], [], [], [], [], []]
under_comp = False
for obj in reversed(list(debug_pipeline(port))):
if isinstance(obj, (MMALBaseComponent, MMALPythonBaseComponent)):
rows[0].append(obj.name)
under_comp = True
elif isinstance(obj, MMALVideoPort):
rows[0].append('[%d]' % obj._port[0].index)
if under_comp:
rows[1].append('encoding')
if obj.format == mmal.MMAL_ENCODING_OPAQUE:
rows[1].append(obj.opaque_subformat)
else:
rows[1].append(mmal.FOURCC_str(obj._port[0].format[0].encoding))
if under_comp:
rows[2].append('buf')
rows[2].append('%dx%d' % (obj._port[0].buffer_num, obj._port[0].buffer_size))
if under_comp:
rows[3].append('bitrate')
rows[3].append('%dbps' % (obj._port[0].format[0].bitrate,))
if under_comp:
rows[4].append('frame')
rows[4].append('%dx%d@%sfps' % (
obj._port[0].format[0].es[0].video.width,
obj._port[0].format[0].es[0].video.height,
obj.framerate))
if under_comp:
rows[5].append('colorspc')
under_comp = False
rows[5].append(mmal.FOURCC_str(obj._port[0].format[0].es[0].video.color_space))
elif isinstance(obj, MMALPythonPort):
rows[0].append('[%d]' % obj._index)
if under_comp:
rows[1].append('encoding')
if obj.format == mmal.MMAL_ENCODING_OPAQUE:
rows[1].append(obj.opaque_subformat)
else:
rows[1].append(mmal.FOURCC_str(obj._format[0].encoding))
if under_comp:
rows[2].append('buf')
rows[2].append('%dx%d' % (obj.buffer_count, obj.buffer_size))
if under_comp:
rows[3].append('bitrate')
rows[3].append('%dbps' % (obj._format[0].bitrate,))
if under_comp:
rows[4].append('frame')
under_comp = False
rows[4].append('%dx%d@%sfps' % (
obj._format[0].es[0].video.width,
obj._format[0].es[0].video.height,
obj.framerate))
if under_comp:
rows[5].append('colorspc')
rows[5].append('???')
elif isinstance(obj, (MMALConnection, MMALPythonConnection)):
rows[0].append('')
rows[1].append('')
rows[2].append('-->')
rows[3].append('')
rows[4].append('')
rows[5].append('')
if under_comp:
rows[1].append('encoding')
rows[2].append('buf')
rows[3].append('bitrate')
rows[4].append('frame')
rows[5].append('colorspc')
cols = list(zip(*rows))
max_lens = [max(len(s) for s in col) + 2 for col in cols]
rows = [
''.join('{0:{align}{width}s}'.format(s, align=align, width=max_len)
for s, max_len, align in zip(row, max_lens, cycle('^<^>')))
for row in rows
]
for row in rows:
print(row)
class MMALObject(object):
"""
Represents an object wrapper around an MMAL object (component, port,
connection, etc). This base class maintains a registry of all MMAL objects
currently alive (via weakrefs) which permits object lookup by name and
listing all used MMAL objects.
"""
__slots__ = ('__weakref__',)
REGISTRY = weakref.WeakSet()
def __init__(self):
super(MMALObject, self).__init__()
MMALObject.REGISTRY.add(self)
class MMALBaseComponent(MMALObject):
"""
Represents a generic MMAL component. Class attributes are read to determine
the component type, and the OPAQUE sub-formats of each connectable port.
"""
__slots__ = ('_component', '_control', '_inputs', '_outputs')
component_type = b'none'
opaque_input_subformats = ()
opaque_output_subformats = ()
def __init__(self):
super(MMALBaseComponent, self).__init__()
self._component = ct.POINTER(mmal.MMAL_COMPONENT_T)()
mmal_check(
mmal.mmal_component_create(self.component_type, self._component),
prefix="Failed to create MMAL component %s" % self.component_type)
if self._component[0].input_num != len(self.opaque_input_subformats):
raise PiCameraRuntimeError(
'Expected %d inputs but found %d on component %s' % (
len(self.opaque_input_subformats),
self._component[0].input_num,
self.component_type))
if self._component[0].output_num != len(self.opaque_output_subformats):
raise PiCameraRuntimeError(
'Expected %d outputs but found %d on component %s' % (
len(self.opaque_output_subformats),
self._component[0].output_num,
self.component_type))
self._control = MMALControlPort(self._component[0].control)
port_class = {
mmal.MMAL_ES_TYPE_UNKNOWN: MMALPort,
mmal.MMAL_ES_TYPE_CONTROL: MMALControlPort,
mmal.MMAL_ES_TYPE_VIDEO: MMALVideoPort,
mmal.MMAL_ES_TYPE_AUDIO: MMALAudioPort,
mmal.MMAL_ES_TYPE_SUBPICTURE: MMALSubPicturePort,
}
self._inputs = tuple(
port_class[self._component[0].input[n][0].format[0].type](
self._component[0].input[n], opaque_subformat)
for n, opaque_subformat in enumerate(self.opaque_input_subformats))
self._outputs = tuple(
port_class[self._component[0].output[n][0].format[0].type](
self._component[0].output[n], opaque_subformat)
for n, opaque_subformat in enumerate(self.opaque_output_subformats))
def close(self):
"""
Close the component and release all its resources. After this is
called, most methods will raise exceptions if called.
"""
if self._component is not None:
# ensure we free any pools associated with input/output ports
for output in self.outputs:
output.disable()
for input in self.inputs:
input.disable()
mmal.mmal_component_destroy(self._component)
self._component = None
self._inputs = ()
self._outputs = ()
self._control = None
@property
def name(self):
return self._component[0].name.decode('ascii')
@property
def control(self):
"""
The :class:`MMALControlPort` control port of the component which can be
used to configure most aspects of the component's behaviour.
"""
return self._control
@property
def inputs(self):
"""
A sequence of :class:`MMALPort` objects representing the inputs
of the component.
"""
return self._inputs
@property
def outputs(self):
"""
A sequence of :class:`MMALPort` objects representing the outputs
of the component.
"""
return self._outputs
@property
def enabled(self):
"""
Returns ``True`` if the component is currently enabled. Use
:meth:`enable` and :meth:`disable` to control the component's state.
"""
return bool(self._component[0].is_enabled)
def enable(self):
"""
Enable the component. When a component is enabled it will process data
sent to its input port(s), sending the results to buffers on its output
port(s). Components may be implicitly enabled by connections.
"""
mmal_check(
mmal.mmal_component_enable(self._component),
prefix="Failed to enable component")
def disable(self):
"""
Disables the component.
"""
mmal_check(
mmal.mmal_component_disable(self._component),
prefix="Failed to disable component")
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, exc_tb):
self.close()
def __repr__(self):
if self._component is not None:
return '<%s "%s": %d inputs %d outputs>' % (
self.__class__.__name__, self.name,
len(self.inputs), len(self.outputs))
else:
return '<%s closed>' % self.__class__.__name__
class MMALControlPort(MMALObject):
"""
Represents an MMAL port with properties to configure the port's parameters.
"""
__slots__ = ('_port', '_params', '_wrapper')
def __init__(self, port):
super(MMALControlPort, self).__init__()
self._port = port
self._params = MMALPortParams(port)
self._wrapper = None
@property
def index(self):
"""
Returns an integer indicating the port's position within its owning
list (inputs, outputs, etc.)
"""
return self._port[0].index
@property
def enabled(self):
"""
Returns a :class:`bool` indicating whether the port is currently
enabled. Unlike other classes, this is a read-only property. Use
:meth:`enable` and :meth:`disable` to modify the value.
"""
return bool(self._port[0].is_enabled)
def enable(self, callback=None):
"""
Enable the port with the specified callback function (this must be
``None`` for connected ports, and a callable for disconnected ports).
The callback function must accept two parameters which will be this
:class:`MMALControlPort` (or descendent) and an :class:`MMALBuffer`
instance. Any return value will be ignored.
"""
def wrapper(port, buf):
buf = MMALBuffer(buf)
try:
callback(self, buf)
finally:
buf.release()
if callback:
self._wrapper = mmal.MMAL_PORT_BH_CB_T(wrapper)
else:
self._wrapper = ct.cast(None, mmal.MMAL_PORT_BH_CB_T)
mmal_check(
mmal.mmal_port_enable(self._port, self._wrapper),
prefix="Unable to enable port %s" % self.name)
def disable(self):
"""
Disable the port.
"""
# NOTE: The test here only exists to avoid spamming the console; when
# disabling an already disabled port MMAL dumps errors to stderr. If
# this test isn't here closing a camera results in half a dozen lines
# of ignored errors
if self.enabled:
try:
mmal_check(
mmal.mmal_port_disable(self._port),
prefix="Unable to disable port %s" % self.name)
except PiCameraMMALError as e:
# Ignore the error if we're disabling an already disabled port
if not (e.status == mmal.MMAL_EINVAL and not self.enabled):
raise e
self._wrapper = None
@property
def name(self):
result = self._port[0].name.decode('ascii')
if result.endswith(')'):
try:
# strip (format) from port names as it doesn't really belong
# there (it doesn't identify the port in any way) and makes
# matching some of the correctional cases a pain
return result[:result.rindex('(')]
except ValueError:
return result
else:
return result
@property
def type(self):
"""
The type of the port. One of:
* MMAL_PORT_TYPE_OUTPUT
* MMAL_PORT_TYPE_INPUT
* MMAL_PORT_TYPE_CONTROL
* MMAL_PORT_TYPE_CLOCK
"""
return self._port[0].type
@property
def capabilities(self):
"""
The capabilities of the port. A bitfield of the following:
* MMAL_PORT_CAPABILITY_PASSTHROUGH
* MMAL_PORT_CAPABILITY_ALLOCATION
* MMAL_PORT_CAPABILITY_SUPPORTS_EVENT_FORMAT_CHANGE
"""
return self._port[0].capabilities
@property
def params(self):
"""
The configurable parameters for the port. This is presented as a
mutable mapping of parameter numbers to values, implemented by the
:class:`MMALPortParams` class.
"""
return self._params
def __repr__(self):
if self._port is not None:
return '<MMALControlPort "%s">' % self.name
else:
return '<MMALControlPort closed>'
class MMALPort(MMALControlPort):
"""
Represents an MMAL port with properties to configure and update the port's
format. This is the base class of :class:`MMALVideoPort`,
:class:`MMALAudioPort`, and :class:`MMALSubPicturePort`.
"""
__slots__ = ('_opaque_subformat', '_pool', '_stopped', '_connection')
# A mapping of corrected definitions of supported_formats for ports with
# particular names. Older firmwares either raised EINVAL, ENOSYS, or just
# reported the wrong things for various ports; these lists are derived from
# querying newer firmwares or in some cases guessing sensible defaults
# (for ports where even the newer firmwares get stuff wrong).
_supported_formats_patch = {
'vc.ril.camera:out:2': [
mmal.MMAL_ENCODING_I420,
mmal.MMAL_ENCODING_NV12,
mmal.MMAL_ENCODING_I422,
mmal.MMAL_ENCODING_YUYV,
mmal.MMAL_ENCODING_YVYU,
mmal.MMAL_ENCODING_VYUY,
mmal.MMAL_ENCODING_UYVY,
mmal.MMAL_ENCODING_BGR24,
mmal.MMAL_ENCODING_BGRA,
mmal.MMAL_ENCODING_RGB16,
mmal.MMAL_ENCODING_YV12,
mmal.MMAL_ENCODING_NV21,
mmal.MMAL_ENCODING_RGB24,
mmal.MMAL_ENCODING_RGBA,
],
'vc.ril.image_encode:in:0': [
mmal.MMAL_ENCODING_RGB16,
mmal.MMAL_ENCODING_RGB24,
mmal.MMAL_ENCODING_RGBA,
mmal.MMAL_ENCODING_BGRA,
mmal.MMAL_ENCODING_I420,
mmal.MMAL_ENCODING_I422,
mmal.MMAL_ENCODING_NV12,
mmal.MMAL_ENCODING_YUYV,
mmal.MMAL_ENCODING_YVYU,
mmal.MMAL_ENCODING_VYUY,
],
'vc.ril.image_encode:out:0': [
mmal.MMAL_ENCODING_JPEG,
mmal.MMAL_ENCODING_GIF,
mmal.MMAL_ENCODING_PNG,
mmal.MMAL_ENCODING_BMP,
mmal.MMAL_ENCODING_PPM,
mmal.MMAL_ENCODING_TGA,
],
'vc.ril.resize:in:0': [
mmal.MMAL_ENCODING_RGBA,
mmal.MMAL_ENCODING_BGRA,
mmal.MMAL_ENCODING_RGB16,
mmal.MMAL_ENCODING_I420,
# several invalid encodings (lowercase versions of the priors)
# appear here in modern firmwares but since they don't map to any
# constants they're excluded
mmal.MMAL_ENCODING_I420_SLICE,
],
'vc.ril.resize:out:0': [
mmal.MMAL_ENCODING_RGBA,
mmal.MMAL_ENCODING_BGRA,
mmal.MMAL_ENCODING_RGB16,
mmal.MMAL_ENCODING_I420,
# same invalid encodings as above here
mmal.MMAL_ENCODING_I420_SLICE,
],
'vc.ril.isp:in:0': [
mmal.MMAL_ENCODING_BAYER_SBGGR8,
mmal.MMAL_ENCODING_BAYER_SBGGR10DPCM8,
mmal.MMAL_ENCODING_BAYER_SBGGR10P,
mmal.MMAL_ENCODING_BAYER_SBGGR12P,
mmal.MMAL_ENCODING_YUYV,
mmal.MMAL_ENCODING_YVYU,
mmal.MMAL_ENCODING_VYUY,
mmal.MMAL_ENCODING_UYVY,
mmal.MMAL_ENCODING_I420,
mmal.MMAL_ENCODING_YV12,
mmal.MMAL_ENCODING_I422,
mmal.MMAL_ENCODING_RGB24,
mmal.MMAL_ENCODING_BGR24,
mmal.MMAL_ENCODING_RGBA,
mmal.MMAL_ENCODING_BGRA,
mmal.MMAL_ENCODING_RGB16,
mmal.MMAL_ENCODING_YUVUV128,
mmal.MMAL_ENCODING_NV12,
mmal.MMAL_ENCODING_NV21,
],
'vc.ril.isp:out:0': [
mmal.MMAL_ENCODING_YUYV,
mmal.MMAL_ENCODING_YVYU,
mmal.MMAL_ENCODING_VYUY,
mmal.MMAL_ENCODING_UYVY,
mmal.MMAL_ENCODING_I420,
mmal.MMAL_ENCODING_YV12,
mmal.MMAL_ENCODING_I422,
mmal.MMAL_ENCODING_RGB24,
mmal.MMAL_ENCODING_BGR24,
mmal.MMAL_ENCODING_RGBA,
mmal.MMAL_ENCODING_BGRA,
mmal.MMAL_ENCODING_RGB16,
mmal.MMAL_ENCODING_YUVUV128,
mmal.MMAL_ENCODING_NV12,
mmal.MMAL_ENCODING_NV21,
],
'vc.null_sink:in:0': [
mmal.MMAL_ENCODING_I420,
mmal.MMAL_ENCODING_RGB24,
mmal.MMAL_ENCODING_BGR24,
mmal.MMAL_ENCODING_RGBA,
mmal.MMAL_ENCODING_BGRA,
],
}
def __init__(self, port, opaque_subformat='OPQV'):
super(MMALPort, self).__init__(port)
self.opaque_subformat = opaque_subformat
self._pool = None
self._stopped = True
self._connection = None
def __repr__(self):
if self._port is not None:
return '<MMALPort "%s": format=MMAL_FOURCC(%r) buffers=%dx%d>' % (
self.name, mmal.FOURCC_str(self.format),
self.buffer_count, self.buffer_size)
else:
return '<MMALPort closed>'
def _get_opaque_subformat(self):
return self._opaque_subformat
def _set_opaque_subformat(self, value):
self._opaque_subformat = value
opaque_subformat = property(
_get_opaque_subformat, _set_opaque_subformat, doc="""\
Retrieves or sets the opaque sub-format that the port speaks. While
most formats (I420, RGBA, etc.) mean one thing, the opaque format is
special; different ports produce different sorts of data when
configured for OPQV format. This property stores a string which
uniquely identifies what the associated port means for OPQV format.
If the port does not support opaque format at all, set this property to
``None``.
:class:`MMALConnection` uses this information when negotiating formats
for a connection between two ports.
""")
def _get_format(self):
result = self._port[0].format[0].encoding
if FIX_RGB_BGR_ORDER:
return {
mmal.MMAL_ENCODING_RGB24: mmal.MMAL_ENCODING_BGR24,
mmal.MMAL_ENCODING_BGR24: mmal.MMAL_ENCODING_RGB24,
}.get(result, result)
else:
return result
def _set_format(self, value):
if FIX_RGB_BGR_ORDER:
value = {
mmal.MMAL_ENCODING_RGB24: mmal.MMAL_ENCODING_BGR24,
mmal.MMAL_ENCODING_BGR24: mmal.MMAL_ENCODING_RGB24,
}.get(value, value)
self._port[0].format[0].encoding = value
if value == mmal.MMAL_ENCODING_OPAQUE:
self._port[0].format[0].encoding_variant = mmal.MMAL_ENCODING_I420
format = property(_get_format, _set_format, doc="""\
Retrieves or sets the encoding format of the port. Setting this
attribute implicitly sets the encoding variant to a sensible value
(I420 in the case of OPAQUE).
After setting this attribute, call :meth:`commit` to make the changes
effective.
""")
@property
def supported_formats(self):
"""
Retrieves a sequence of supported encodings on this port.
"""
try:
mp = self.params[mmal.MMAL_PARAMETER_SUPPORTED_ENCODINGS]
except PiCameraMMALError as e:
if e.status in (mmal.MMAL_EINVAL, mmal.MMAL_ENOSYS):
# Workaround: old firmwares raise EINVAL or ENOSYS when various
# ports are queried for supported formats. The following is the
# correct sequence for old firmwares (note: swapped RGB24 and
# BGR24 order in still port) ... probably (vc.ril.camera:out:2
# is definitely right, the rest are largely guessed based on
# queries of later firmwares)
try:
return MMALPort._supported_formats_patch[self.name]
except KeyError:
raise e
else:
raise
else:
result = [
v for v in mp.encoding if v != 0
][:mp.hdr.size // ct.sizeof(ct.c_uint32)]
# Workaround: Fix incorrect result on MMALImageEncoder.outputs[0]
# from modern firmwares
if self.name == 'vc.ril.image_encode:out:0' and result == [
mmal.MMAL_ENCODING_MP2V, mmal.MMAL_ENCODING_MP2V,
mmal.MMAL_ENCODING_H264, mmal.MMAL_ENCODING_H264,
mmal.MMAL_ENCODING_VP7, mmal.MMAL_ENCODING_VP7,
mmal.MMAL_ENCODING_VP6, mmal.MMAL_ENCODING_VP6]:
return MMALPort._supported_formats_patch[self.name]
else:
return result
def _get_bitrate(self):
return self._port[0].format[0].bitrate
def _set_bitrate(self, value):
self._port[0].format[0].bitrate = value
bitrate = property(_get_bitrate, _set_bitrate, doc="""\
Retrieves or sets the bitrate limit for the port's format.
""")
def copy_from(self, source):
"""
Copies the port's :attr:`format` from the *source*
:class:`MMALControlPort`.
"""
if isinstance(source, MMALPythonPort):
mmal.mmal_format_copy(self._port[0].format, source._format)
else:
mmal.mmal_format_copy(self._port[0].format, source._port[0].format)
def commit(self):
"""
Commits the port's configuration and automatically updates the number
and size of associated buffers according to the recommendations of the
MMAL library. This is typically called after adjusting the port's
format and/or associated settings (like width and height for video
ports).
"""
mmal_check(
mmal.mmal_port_format_commit(self._port),
prefix="Format couldn't be set on port %s" % self.name)
# Workaround: Unfortunately, there is an upstream issue with the
# buffer_num_recommended which means it can't currently be used (see
# discussion in raspberrypi/userland#167). There's another upstream
# issue with buffer_num_min which means we need to guard against 0
# values...
self._port[0].buffer_num = max(1, self._port[0].buffer_num_min)
self._port[0].buffer_size = (
self._port[0].buffer_size_recommended
if self._port[0].buffer_size_recommended > 0 else
self._port[0].buffer_size_min)
@property
def pool(self):
"""
Returns the :class:`MMALPool` associated with the buffer, if any.
"""
return self._pool
def get_buffer(self, block=True, timeout=None):
"""
Returns a :class:`MMALBuffer` from the associated :attr:`pool`. *block*
and *timeout* act as they do in the corresponding
:meth:`MMALPool.get_buffer`.
"""
if not self.enabled:
raise PiCameraPortDisabled(
'cannot get buffer from disabled port %s' % self.name)
return self.pool.get_buffer(block, timeout)
def send_buffer(self, buf):
"""
Send :class:`MMALBuffer` *buf* to the port.
"""
if (
self.type == mmal.MMAL_PORT_TYPE_INPUT and
isinstance(self._connection, MMALPythonConnection) and
self._connection._callback is not None):
try:
modified_buf = self._connection._callback(self._connection, buf)
except:
buf.release()
raise
else:
if modified_buf is None:
buf.release()
return
else:
buf = modified_buf
try:
mmal_check(
mmal.mmal_port_send_buffer(self._port, buf._buf),
prefix="cannot send buffer to port %s" % self.name)
except PiCameraMMALError as e:
# If port is disabled, convert exception for convenience
if e.status == mmal.MMAL_EINVAL and not self.enabled:
raise PiCameraPortDisabled(
'cannot send buffer to disabled port %s' % self.name)
else:
raise
def flush(self):
"""
Flush the port.
"""
mmal_check(
mmal.mmal_port_flush(self._port),
prefix="Unable to flush port %s" % self.name)
def _get_buffer_count(self):
return self._port[0].buffer_num
def _set_buffer_count(self, value):
if value < 1:
raise PiCameraMMALError(mmal.MMAL_EINVAL, 'buffer count <1')
self._port[0].buffer_num = value
buffer_count = property(_get_buffer_count, _set_buffer_count, doc="""\
The number of buffers allocated (or to be allocated) to the port.
The ``mmalobj`` layer automatically configures this based on
recommendations from the MMAL library.
""")
def _get_buffer_size(self):
return self._port[0].buffer_size
def _set_buffer_size(self, value):
if value < 0:
raise PiCameraMMALError(mmal.MMAL_EINVAL, 'buffer size <0')
self._port[0].buffer_size = value
buffer_size = property(_get_buffer_size, _set_buffer_size, doc="""\
The size of buffers allocated (or to be allocated) to the port. The
size of buffers is typically dictated by the port's format. The
``mmalobj`` layer automatically configures this based on
recommendations from the MMAL library.
""")
def enable(self, callback=None):
"""
Enable the port with the specified callback function (this must be
``None`` for connected ports, and a callable for disconnected ports).
The callback function must accept two parameters which will be this
:class:`MMALControlPort` (or descendent) and an :class:`MMALBuffer`
instance. The callback should return ``True`` when processing is
complete and no further calls are expected (e.g. at frame-end for an
image encoder), and ``False`` otherwise.
"""
def wrapper(port, buf):
buf = MMALBuffer(buf)
try:
if not self._stopped and callback(self, buf):
self._stopped = True
finally:
buf.release()
try:
self._pool.send_buffer(block=False)
except PiCameraPortDisabled:
# The port was disabled, no point trying again
pass
# Workaround: There is a bug in the MJPEG encoder that causes a
# deadlock if the FIFO is full on shutdown. Increasing the encoder
# buffer size makes this less likely to happen. See
# raspberrypi/userland#208. Connecting the encoder component resets the
# output port's buffer size, hence why we correct this here, just
# before enabling the port.
if self._port[0].format[0].encoding == mmal.MMAL_ENCODING_MJPEG:
self._port[0].buffer_size = max(512 * 1024, self._port[0].buffer_size_recommended)
if callback:
assert self._stopped
assert self._pool is None
self._stopped = False
self._pool = MMALPortPool(self)
try:
self._wrapper = mmal.MMAL_PORT_BH_CB_T(wrapper)
mmal_check(
mmal.mmal_port_enable(self._port, self._wrapper),
prefix="Unable to enable port %s" % self.name)
# If this port is an output port, send it all the buffers
# in the pool. If it's an input port, don't bother: the user
# will presumably want to feed buffers to it manually
if self._port[0].type == mmal.MMAL_PORT_TYPE_OUTPUT:
self._pool.send_all_buffers(block=False)
except:
self._pool.close()
self._pool = None
self._stopped = True
raise
else:
super(MMALPort, self).enable()
def disable(self):
"""
Disable the port.
"""
self._stopped = True
super(MMALPort, self).disable()
if self._pool is not None:
self._pool.close()
self._pool = None
@property
def connection(self):
"""
If this port is connected to another, this property holds the
:class:`MMALConnection` or :class:`MMALPythonConnection` object which
represents that connection. If this port is not connected, this
property is ``None``.
"""
return self._connection
def connect(self, other, **options):
"""
Connect this port to the *other* :class:`MMALPort` (or
:class:`MMALPythonPort`). The type and configuration of the connection
will be automatically selected.
Various connection *options* can be specified as keyword arguments.
These will be passed onto the :class:`MMALConnection` or
:class:`MMALPythonConnection` constructor that is called (see those
classes for an explanation of the available options).
"""
# Always construct connections from the output end
if self.type != mmal.MMAL_PORT_TYPE_OUTPUT:
return other.connect(self, **options)
if other.type != mmal.MMAL_PORT_TYPE_INPUT:
raise PiCameraValueError(
'A connection can only be established between an output and '
'an input port')
if isinstance(other, MMALPythonPort):
return MMALPythonConnection(self, other, **options)
else:
return MMALConnection(self, other, **options)
def disconnect(self):
"""
Destroy the connection between this port and another port.
"""
if self.connection is not None:
self.connection.close()
class MMALVideoPort(MMALPort):
"""
Represents an MMAL port used to pass video data.
"""
__slots__ = ()
def __repr__(self):
if self._port is not None:
return (
'<MMALVideoPort "%s": format=MMAL_FOURCC("%s") buffers=%dx%d '
'frames=%s@%sfps colorspace=MMAL_FOURCC("%s")>' % (
self.name, mmal.FOURCC_str(self.format),
self._port[0].buffer_num, self._port[0].buffer_size,
self.framesize, self.framerate,
mmal.FOURCC_str(self.colorspace)))
else:
return '<MMALVideoPort closed>'
def _get_framesize(self):
return PiResolution(
self._port[0].format[0].es[0].video.crop.width,
self._port[0].format[0].es[0].video.crop.height,
)
def _set_framesize(self, value):
value = to_resolution(value)
video = self._port[0].format[0].es[0].video
video.width = bcm_host.VCOS_ALIGN_UP(value.width, 32)
video.height = bcm_host.VCOS_ALIGN_UP(value.height, 16)
video.crop.width = value.width
video.crop.height = value.height
framesize = property(_get_framesize, _set_framesize, doc="""\
Retrieves or sets the size of the port's video frames as a (width,
height) tuple. This attribute implicitly handles scaling the given
size up to the block size of the camera (32x16).
After setting this attribute, call :meth:`~MMALPort.commit` to make the
changes effective.
""")
def _get_framerate(self):
video = self._port[0].format[0].es[0].video
try:
return Fraction(
video.frame_rate.num,
video.frame_rate.den)
except ZeroDivisionError:
assert video.frame_rate.num == 0
return Fraction(0, 1)
def _set_framerate(self, value):
value = to_fraction(value)
video = self._port[0].format[0].es[0].video
video.frame_rate.num = value.numerator
video.frame_rate.den = value.denominator
framerate = property(_get_framerate, _set_framerate, doc="""\
Retrieves or sets the framerate of the port's video frames in fps.
After setting this attribute, call :meth:`~MMALPort.commit` to make the
changes effective.
""")
def _get_colorspace(self):
return self._port[0].format[0].es[0].video.color_space
def _set_colorspace(self, value):
self._port[0].format[0].es[0].video.color_space = value
colorspace = property(_get_colorspace, _set_colorspace, doc="""\
Retrieves or sets the color-space of the port's frames.
After setting this attribute, call :meth:`~MMALPort.commit` to make the
changes effective.
""")
class MMALAudioPort(MMALPort):
"""
Represents an MMAL port used to pass audio data.
"""
__slots__ = ()
def __repr__(self):
if self._port is not None:
return '<MMALAudioPort "%s": format=MMAL_FOURCC(%r) buffers=%dx%d>' % (
self.name, mmal.FOURCC_str(self.format),
self._port[0].buffer_num, self._port[0].buffer_size)
else:
return '<MMALAudioPort closed>'
class MMALSubPicturePort(MMALPort):
"""
Represents an MMAL port used to pass sub-picture (caption) data.
"""
__slots__ = ()
def __repr__(self):
if self._port is not None:
return '<MMALSubPicturePort "%s": format=MMAL_FOURCC(%r) buffers=%dx%d>' % (
self.name, mmal.FOURCC_str(self.format),
self._port[0].buffer_num, self._port[0].buffer_size)
else:
return '<MMALSubPicturePort closed>'
class MMALPortParams(object):
"""
Represents the parameters of an MMAL port. This class implements the
:attr:`MMALControlPort.params` attribute.
Internally, the class understands how to convert certain structures to more
common Python data-types. For example, parameters that expect an
MMAL_RATIONAL_T type will return and accept Python's
:class:`~fractions.Fraction` class (or any other numeric types), while
parameters that expect an MMAL_BOOL_T type will treat anything as a truthy
value. Parameters that expect the MMAL_PARAMETER_STRING_T structure will be
treated as plain strings, and likewise MMAL_PARAMETER_INT32_T and similar
structures will be treated as plain ints.
Parameters that expect more complex structures will return and expect
those structures verbatim.
"""
__slots__ = ('_port',)
def __init__(self, port):
super(MMALPortParams, self).__init__()
self._port = port
def __getitem__(self, key):
dtype = PARAM_TYPES[key]
# Use the short-cut functions where possible (teeny bit faster if we
# get some C to do the structure wrapping for us)
func = {
mmal.MMAL_PARAMETER_RATIONAL_T: mmal.mmal_port_parameter_get_rational,
mmal.MMAL_PARAMETER_BOOLEAN_T: mmal.mmal_port_parameter_get_boolean,
mmal.MMAL_PARAMETER_INT32_T: mmal.mmal_port_parameter_get_int32,
mmal.MMAL_PARAMETER_INT64_T: mmal.mmal_port_parameter_get_int64,
mmal.MMAL_PARAMETER_UINT32_T: mmal.mmal_port_parameter_get_uint32,
mmal.MMAL_PARAMETER_UINT64_T: mmal.mmal_port_parameter_get_uint64,
}.get(dtype, mmal.mmal_port_parameter_get)
conv = {
mmal.MMAL_PARAMETER_RATIONAL_T: lambda v: Fraction(v.num, v.den),
mmal.MMAL_PARAMETER_BOOLEAN_T: lambda v: v.value != mmal.MMAL_FALSE,
mmal.MMAL_PARAMETER_INT32_T: lambda v: v.value,
mmal.MMAL_PARAMETER_INT64_T: lambda v: v.value,
mmal.MMAL_PARAMETER_UINT32_T: lambda v: v.value,
mmal.MMAL_PARAMETER_UINT64_T: lambda v: v.value,
mmal.MMAL_PARAMETER_STRING_T: lambda v: v.str.decode('ascii'),
}.get(dtype, lambda v: v)
if func == mmal.mmal_port_parameter_get:
result = dtype(
mmal.MMAL_PARAMETER_HEADER_T(key, ct.sizeof(dtype))
)
mmal_check(
func(self._port, result.hdr),
prefix="Failed to get parameter %d" % key)
else:
dtype = {
mmal.MMAL_PARAMETER_RATIONAL_T: mmal.MMAL_RATIONAL_T,
mmal.MMAL_PARAMETER_BOOLEAN_T: mmal.MMAL_BOOL_T,
mmal.MMAL_PARAMETER_INT32_T: ct.c_int32,
mmal.MMAL_PARAMETER_INT64_T: ct.c_int64,
mmal.MMAL_PARAMETER_UINT32_T: ct.c_uint32,
mmal.MMAL_PARAMETER_UINT64_T: ct.c_uint64,
}[dtype]
result = dtype()
mmal_check(
func(self._port, key, result),
prefix="Failed to get parameter %d" % key)
return conv(result)
def __setitem__(self, key, value):
dtype = PARAM_TYPES[key]
func = {
mmal.MMAL_PARAMETER_RATIONAL_T: mmal.mmal_port_parameter_set_rational,
mmal.MMAL_PARAMETER_BOOLEAN_T: mmal.mmal_port_parameter_set_boolean,
mmal.MMAL_PARAMETER_INT32_T: mmal.mmal_port_parameter_set_int32,
mmal.MMAL_PARAMETER_INT64_T: mmal.mmal_port_parameter_set_int64,
mmal.MMAL_PARAMETER_UINT32_T: mmal.mmal_port_parameter_set_uint32,
mmal.MMAL_PARAMETER_UINT64_T: mmal.mmal_port_parameter_set_uint64,
mmal.MMAL_PARAMETER_STRING_T: mmal.mmal_port_parameter_set_string,
}.get(dtype, mmal.mmal_port_parameter_set)
conv = {
mmal.MMAL_PARAMETER_RATIONAL_T: lambda v: to_rational(v),
mmal.MMAL_PARAMETER_BOOLEAN_T: lambda v: mmal.MMAL_TRUE if v else mmal.MMAL_FALSE,
mmal.MMAL_PARAMETER_STRING_T: lambda v: v.encode('ascii'),
}.get(dtype, lambda v: v)
if func == mmal.mmal_port_parameter_set:
mp = conv(value)
assert mp.hdr.id == key
assert mp.hdr.size >= ct.sizeof(dtype)
mmal_check(
func(self._port, mp.hdr),
prefix="Failed to set parameter %d to %r" % (key, value))
else:
mmal_check(
func(self._port, key, conv(value)),
prefix="Failed to set parameter %d to %r" % (key, value))
class MMALBuffer(object):
"""
Represents an MMAL buffer header. This is usually constructed from the
buffer header pointer and is largely supplied to make working with
the buffer's data a bit simpler. Using the buffer as a context manager
implicitly locks the buffer's memory and returns the :mod:`ctypes`
buffer object itself::
def callback(port, buf):
with buf as data:
# data is a ctypes uint8 array with size entries
print(len(data))
Alternatively you can use the :attr:`data` property directly, which returns
and modifies the buffer's data as a :class:`bytes` object (note this is
generally slower than using the buffer object unless you are simply
replacing the entire buffer)::
def callback(port, buf):
# the buffer contents as a byte-string
print(buf.data)
"""
__slots__ = ('_buf',)
def __init__(self, buf):
super(MMALBuffer, self).__init__()
self._buf = buf
def _get_command(self):
return self._buf[0].cmd
def _set_command(self, value):
self._buf[0].cmd = value
command = property(_get_command, _set_command, doc="""\
The command set in the buffer's meta-data. This is usually 0 for
buffers returned by an encoder; typically this is only used by buffers
sent to the callback of a control port.
""")
def _get_flags(self):
return self._buf[0].flags
def _set_flags(self, value):
self._buf[0].flags = value
flags = property(_get_flags, _set_flags, doc="""\
The flags set in the buffer's meta-data, returned as a bitmapped
integer. Typical flags include:
* ``MMAL_BUFFER_HEADER_FLAG_EOS`` -- end of stream
* ``MMAL_BUFFER_HEADER_FLAG_FRAME_START`` -- start of frame data
* ``MMAL_BUFFER_HEADER_FLAG_FRAME_END`` -- end of frame data
* ``MMAL_BUFFER_HEADER_FLAG_KEYFRAME`` -- frame is a key-frame
* ``MMAL_BUFFER_HEADER_FLAG_FRAME`` -- frame data
* ``MMAL_BUFFER_HEADER_FLAG_CODECSIDEINFO`` -- motion estimatation data
""")
def _get_pts(self):
return self._buf[0].pts
def _set_pts(self, value):
self._buf[0].pts = value
pts = property(_get_pts, _set_pts, doc="""\
The presentation timestamp (PTS) of the buffer, as an integer number
of microseconds or ``MMAL_TIME_UNKNOWN``.
""")
def _get_dts(self):
return self._buf[0].dts
def _set_dts(self, value):
self._buf[0].dts = value
dts = property(_get_dts, _set_dts, doc="""\
The decoding timestamp (DTS) of the buffer, as an integer number of
microseconds or ``MMAL_TIME_UNKNOWN``.
""")
@property
def size(self):
"""
Returns the length of the buffer's data area in bytes. This will be
greater than or equal to :attr:`length` and is fixed in value.
"""
return self._buf[0].alloc_size
def _get_offset(self):
return self._buf[0].offset
def _set_offset(self, value):
assert 0 <= value <= self.size
self._buf[0].offset = value
self.length = min(self.size - self.offset, self.length)
offset = property(_get_offset, _set_offset, doc="""\
The offset from the start of the buffer at which the data actually
begins. Defaults to 0. If this is set to a value which would force the
current :attr:`length` off the end of the buffer's :attr:`size`, then
:attr:`length` will be decreased automatically.
""")
def _get_length(self):
return self._buf[0].length
def _set_length(self, value):
assert 0 <= value <= self.size - self.offset
self._buf[0].length = value
length = property(_get_length, _set_length, doc="""\
The length of data held in the buffer. Must be less than or equal to
the allocated size of data held in :attr:`size` minus the data
:attr:`offset`. This attribute can be used to effectively blank the
buffer by setting it to zero.
""")
def _get_data(self):
with self as buf:
return ct.string_at(
ct.byref(buf, self._buf[0].offset),
self._buf[0].length)
def _set_data(self, value):
value_len = buffer_bytes(value)
if value_len:
if value_len > self.size:
raise PiCameraValueError(
'data is too large for buffer (%d > %d)' % (
value_len, self.size))
bp = ct.c_uint8 * value_len
try:
sp = bp.from_buffer(value)
except TypeError:
sp = bp.from_buffer_copy(value)
with self as buf:
ct.memmove(buf, sp, value_len)
self._buf[0].offset = 0
self._buf[0].length = value_len
data = property(_get_data, _set_data, doc="""\
The data held in the buffer as a :class:`bytes` string. You can set
this attribute to modify the data in the buffer. Acceptable values
are anything that supports the buffer protocol, and which contains
:attr:`size` bytes or less. Setting this attribute implicitly modifies
the :attr:`length` attribute to the length of the specified value and
sets :attr:`offset` to zero.
.. note::
Accessing a buffer's data via this attribute is relatively slow
(as it copies the buffer's data to/from Python objects). See the
:class:`MMALBuffer` documentation for details of a faster (but
more complex) method.
""")
def replicate(self, source):
"""
Replicates the *source* :class:`MMALBuffer`. This copies all fields
from the *source* buffer, including the internal :attr:`data` pointer.
In other words, after replication this buffer and the *source* buffer
will share the same block of memory for *data*.
The *source* buffer will also be referenced internally by this buffer
and will only be recycled once this buffer is released.
.. note::
This is fundamentally different to the operation of the
:meth:`copy_from` method. It is much faster, but imposes the burden
that two buffers now share data (the *source* cannot be released
until the replicant has been released).
"""
mmal_check(
mmal.mmal_buffer_header_replicate(self._buf, source._buf),
prefix='unable to replicate buffer')
def copy_from(self, source):
"""
Copies all fields (including data) from the *source*
:class:`MMALBuffer`. This buffer must have sufficient :attr:`size` to
store :attr:`length` bytes from the *source* buffer. This method
implicitly sets :attr:`offset` to zero, and :attr:`length` to the
number of bytes copied.
.. note::
This is fundamentally different to the operation of the
:meth:`replicate` method. It is much slower, but afterward the
copied buffer is entirely independent of the *source*.
"""
assert self.size >= source.length
source_len = source._buf[0].length
if source_len:
with self as target_buf, source as source_buf:
ct.memmove(target_buf, ct.byref(source_buf, source.offset), source_len)
self._buf[0].offset = 0
self._buf[0].length = source_len
self.copy_meta(source)
def copy_meta(self, source):
"""
Copy meta-data from the *source* :class:`MMALBuffer`; specifically this
copies all buffer fields with the exception of :attr:`data`,
:attr:`length` and :attr:`offset`.
"""
self._buf[0].cmd = source._buf[0].cmd
self._buf[0].flags = source._buf[0].flags
self._buf[0].dts = source._buf[0].dts
self._buf[0].pts = source._buf[0].pts
self._buf[0].type[0] = source._buf[0].type[0]
def acquire(self):
"""
Acquire a reference to the buffer. This will prevent the buffer from
being recycled until :meth:`release` is called. This method can be
called multiple times in which case an equivalent number of calls
to :meth:`release` must be made before the buffer will actually be
released.
"""
mmal.mmal_buffer_header_acquire(self._buf)
def release(self):
"""
Release a reference to the buffer. This is the opposing call to
:meth:`acquire`. Once all references have been released, the buffer
will be recycled.
"""
mmal.mmal_buffer_header_release(self._buf)
def reset(self):
"""
Resets all buffer header fields to default values.
"""
mmal.mmal_buffer_header_reset(self._buf)
def __enter__(self):
mmal_check(
mmal.mmal_buffer_header_mem_lock(self._buf),
prefix='unable to lock buffer header memory')
return ct.cast(
self._buf[0].data,
ct.POINTER(ct.c_uint8 * self._buf[0].alloc_size)).contents
def __exit__(self, *exc):
mmal.mmal_buffer_header_mem_unlock(self._buf)
return False
def __repr__(self):
if self._buf is not None:
return '<MMALBuffer object: flags=%s command=%s length=%d>' % (
''.join((
'S' if self.flags & mmal.MMAL_BUFFER_HEADER_FLAG_FRAME_START else '_',
'E' if self.flags & mmal.MMAL_BUFFER_HEADER_FLAG_FRAME_END else '_',
'K' if self.flags & mmal.MMAL_BUFFER_HEADER_FLAG_KEYFRAME else '_',
'C' if self.flags & mmal.MMAL_BUFFER_HEADER_FLAG_CONFIG else '_',
'M' if self.flags & mmal.MMAL_BUFFER_HEADER_FLAG_CODECSIDEINFO else '_',
'X' if self.flags & mmal.MMAL_BUFFER_HEADER_FLAG_EOS else '_',
)), {
0: 'none',
mmal.MMAL_EVENT_ERROR: 'error',
mmal.MMAL_EVENT_FORMAT_CHANGED: 'format-change',
mmal.MMAL_EVENT_PARAMETER_CHANGED: 'param-change',
mmal.MMAL_EVENT_EOS: 'end-of-stream',
}[self.command], self.length)
else:
return '<MMALBuffer object: ???>'
class MMALQueue(object):
"""
Represents an MMAL buffer queue. Buffers can be added to the queue with the
:meth:`put` method, and retrieved from the queue (with optional wait
timeout) with the :meth:`get` method.
"""
__slots__ = ('_queue', '_created')
def __init__(self, queue):
self._created = False
self._queue = queue
@classmethod
def create(cls):
self = cls(mmal.mmal_queue_create())
self._created = True
return self
def close(self):
if self._created:
mmal_queue_destroy(self._queue)
self._queue = None
def __len__(self):
return mmal.mmal_queue_length(self._queue)
def get(self, block=True, timeout=None):
"""
Get the next buffer from the queue. If *block* is ``True`` (the default)
and *timeout* is ``None`` (the default) then the method will block
until a buffer is available. Otherwise *timeout* is the maximum time to
wait (in seconds) for a buffer to become available. If a buffer is not
available before the timeout expires, the method returns ``None``.
Likewise, if *block* is ``False`` and no buffer is immediately
available then ``None`` is returned.
"""
if block and timeout is None:
buf = mmal.mmal_queue_wait(self._queue)
elif block and timeout is not None:
buf = mmal.mmal_queue_timedwait(self._queue, int(timeout * 1000))
else:
buf = mmal.mmal_queue_get(self._queue)
if buf:
return MMALBuffer(buf)
def put(self, buf):
"""
Place :class:`MMALBuffer` *buf* at the back of the queue.
"""
mmal.mmal_queue_put(self._queue, buf._buf)
def put_back(self, buf):
"""
Place :class:`MMALBuffer` *buf* at the front of the queue. This is
used when a buffer was removed from the queue but needs to be put
back at the front where it was originally taken from.
"""
mmal.mmal_queue_put_back(self._queue, buf._buf)
class MMALPool(object):
"""
Represents an MMAL pool containing :class:`MMALBuffer` objects. All active
ports are associated with a pool of buffers, and a queue. Instances can be
treated as a sequence of :class:`MMALBuffer` objects but this is only
recommended for debugging purposes; otherwise, use the :meth:`get_buffer`,
:meth:`send_buffer`, and :meth:`send_all_buffers` methods which work with
the encapsulated :class:`MMALQueue`.
"""
__slots__ = ('_pool', '_queue')
def __init__(self, pool):
self._pool = pool
super(MMALPool, self).__init__()
self._queue = MMALQueue(pool[0].queue)
def __len__(self):
return self._pool[0].headers_num
def __getitem__(self, index):
return MMALBuffer(self._pool[0].header[index])
@property
def queue(self):
"""
The :class:`MMALQueue` associated with the pool.
"""
return self._queue
def close(self):
if self._pool is not None:
mmal.mmal_pool_destroy(self._pool)
self._pool = None
def resize(self, new_count, new_size):
"""
Resizes the pool to contain *new_count* buffers with *new_size* bytes
allocated to each buffer.
*new_count* must be 1 or more (you cannot resize a pool to contain
no headers). However, *new_size* can be 0 which causes all payload
buffers to be released.
.. warning::
If the pool is associated with a port, the port must be disabled
when resizing the pool.
"""
mmal_check(
mmal.mmal_pool_resize(self._pool, new_count, new_size),
prefix='unable to resize pool')
def get_buffer(self, block=True, timeout=None):
"""
Get the next buffer from the pool's queue. See :meth:`MMALQueue.get`
for the meaning of the parameters.
"""
return self._queue.get(block, timeout)
def send_buffer(self, port, block=True, timeout=None):
"""
Get a buffer from the pool's queue and send it to *port*. *block* and
*timeout* act as they do in :meth:`get_buffer`. If no buffer is
available (for the values of *block* and *timeout*,
:exc:`~picamera.PiCameraMMALError` is raised).
"""
buf = self.get_buffer(block, timeout)
if buf is None:
raise PiCameraMMALError(mmal.MMAL_EAGAIN, 'no buffers available')
port.send_buffer(buf)
def send_all_buffers(self, port, block=True, timeout=None):
"""
Send all buffers from the queue to *port*. *block* and *timeout* act as
they do in :meth:`get_buffer`. If no buffer is available (for the
values of *block* and *timeout*, :exc:`~picamera.PiCameraMMALError` is
raised).
"""
for i in range(len(self._queue)):
self.send_buffer(port, block, timeout)
class MMALPortPool(MMALPool):
"""
Construct an MMAL pool for the number and size of buffers required by
the :class:`MMALPort` *port*.
"""
__slots__ = ('_port',)
def __init__(self, port):
pool = mmal.mmal_port_pool_create(
port._port, port._port[0].buffer_num, port._port[0].buffer_size)
if not pool:
raise PiCameraMMALError(
mmal.MMAL_ENOSPC,
'failed to create buffer header pool for port %s' % port.name)
super(MMALPortPool, self).__init__(pool)
self._port = port
def close(self):
if self._pool is not None:
mmal.mmal_port_pool_destroy(self._port._port, self._pool)
self._port = None
self._pool = None
super(MMALPortPool, self).close()
@property
def port(self):
return self._port
def send_buffer(self, port=None, block=True, timeout=None):
"""
Get a buffer from the pool and send it to *port* (or the port the pool
is associated with by default). *block* and *timeout* act as they do in
:meth:`MMALPool.get_buffer`.
"""
if port is None:
port = self._port
super(MMALPortPool, self).send_buffer(port, block, timeout)
def send_all_buffers(self, port=None, block=True, timeout=None):
"""
Send all buffers from the pool to *port* (or the port the pool is
associated with by default). *block* and *timeout* act as they do in
:meth:`MMALPool.get_buffer`.
"""
if port is None:
port = self._port
super(MMALPortPool, self).send_all_buffers(port, block, timeout)
class MMALBaseConnection(MMALObject):
"""
Abstract base class for :class:`MMALConnection` and
:class:`MMALPythonConnection`. Handles weakrefs to the source and
target ports, and format negotiation. All other connection details are
handled by the descendent classes.
"""
__slots__ = ('_source', '_target')
default_formats = ()
compatible_opaque_formats = {
('OPQV-single', 'OPQV-single'),
('OPQV-dual', 'OPQV-dual'),
('OPQV-strips', 'OPQV-strips'),
('OPQV-dual', 'OPQV-single'),
('OPQV-single', 'OPQV-dual'), # recent firmwares permit this
}
def __init__(
self, source, target, formats=default_formats):
super(MMALBaseConnection, self).__init__()
if not isinstance(source, (MMALPort, MMALPythonPort)):
raise PiCameraValueError('source is not a port')
if not isinstance(target, (MMALPort, MMALPythonPort)):
raise PiCameraValueError('target is not a port')
if source.type != mmal.MMAL_PORT_TYPE_OUTPUT:
raise PiCameraValueError('source is not an output port')
if target.type != mmal.MMAL_PORT_TYPE_INPUT:
raise PiCameraValueError('target is not an input port')
if source.connection is not None:
raise PiCameraValueError('source port is already connected')
if target.connection is not None:
raise PiCameraValueError('target port is already connected')
if formats is None:
formats = ()
self._source = source
self._target = target
try:
iter(formats)
except TypeError:
formats = (formats,)
self._negotiate_format(formats)
source._connection = self
target._connection = self
# Descendents continue with connection implementation...
def close(self):
if self._source is not None:
self._source._connection = None
self._source = None
if self._target is not None:
self._target._connection = None
self._target = None
def _negotiate_format(self, formats):
def copy_format():
self._source.commit()
self._target.copy_from(self._source)
self._target.commit()
def max_buffers():
self._source.buffer_count = self._target.buffer_count = max(
self._source.buffer_count, self._target.buffer_count)
self._source.buffer_size = self._target.buffer_size = max(
self._source.buffer_size, self._target.buffer_size)
# Filter out formats that aren't supported on both source and target
# ports. This is a little tricky as ports that support OPAQUE never
# claim they do (so we have to assume it's mutually supported)
mutually_supported = (
set(self._source.supported_formats) &
set(self._target.supported_formats)
) | {mmal.MMAL_ENCODING_OPAQUE}
formats = [f for f in formats if f in mutually_supported]
if formats:
# If there are any formats left to try, perform the negotiation
# with the filtered list. Again, there's some special casing to
# deal with the incompatible OPAQUE sub-formats
for f in formats:
if f == mmal.MMAL_ENCODING_OPAQUE:
if (self._source.opaque_subformat,
self._target.opaque_subformat) in self.compatible_opaque_formats:
self._source.format = mmal.MMAL_ENCODING_OPAQUE
else:
continue
else:
self._source.format = f
try:
copy_format()
except PiCameraMMALError as e:
if e.status != mmal.MMAL_EINVAL:
raise
continue
else:
max_buffers()
return
raise PiCameraMMALError(
mmal.MMAL_EINVAL, 'failed to negotiate port format')
else:
# If no formats are available to try (either from filtering or
# because none were given), assume the source port is set up
# properly. Just copy the format to the target and hope the caller
# knows what they're doing
try:
copy_format()
except PiCameraMMALError as e:
if e.status != mmal.MMAL_EINVAL:
raise
raise PiCameraMMALError(
mmal.MMAL_EINVAL, 'failed to copy source format to target port')
else:
max_buffers()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, exc_tb):
self.close()
@property
def source(self):
"""
The source :class:`MMALPort` or :class:`MMALPythonPort` of the
connection.
"""
return self._source
@property
def target(self):
"""
The target :class:`MMALPort` or :class:`MMALPythonPort` of the
connection.
"""
return self._target
class MMALConnection(MMALBaseConnection):
"""
Represents an MMAL internal connection between two components. The
constructor accepts arguments providing the *source* :class:`MMALPort` and
*target* :class:`MMALPort`.
The *formats* parameter specifies an iterable of formats (in preference
order) that the connection may attempt when negotiating formats between
the two ports. If this is ``None``, or an empty iterable, no negotiation
will take place and the source port's format will simply be copied to the
target port. Otherwise, the iterable will be worked through in order until
a format acceptable to both ports is discovered.
.. note::
The default *formats* list starts with OPAQUE; the class understands
the different OPAQUE sub-formats (see :ref:`mmal` for more information)
and will only select OPAQUE if compatible sub-formats can be used on
both ports.
The *callback* parameter can optionally specify a callable which will be
executed for each buffer that traverses the connection (providing an
opportunity to manipulate or drop that buffer). If specified, it must be a
callable which accepts two parameters: the :class:`MMALConnection` object
sending the data, and the :class:`MMALBuffer` object containing data. The
callable may optionally manipulate the :class:`MMALBuffer` and return it
to permit it to continue traversing the connection, or return ``None``
in which case the buffer will be released.
.. note::
There is a significant performance penalty for specifying a
callback between MMAL components as it requires buffers to be
copied from the GPU's memory to the CPU's memory and back again.
.. data:: default_formats
:annotation: = (MMAL_ENCODING_OPAQUE, MMAL_ENCODING_I420, MMAL_ENCODING_RGB24, MMAL_ENCODING_BGR24, MMAL_ENCODING_RGBA, MMAL_ENCODING_BGRA)
Class attribute defining the default formats used to negotiate
connections between MMAL components.
"""
__slots__ = ('_connection', '_callback', '_wrapper')
default_formats = (
mmal.MMAL_ENCODING_OPAQUE,
mmal.MMAL_ENCODING_I420,
mmal.MMAL_ENCODING_RGB24,
mmal.MMAL_ENCODING_BGR24,
mmal.MMAL_ENCODING_RGBA,
mmal.MMAL_ENCODING_BGRA,
)
def __init__(
self, source, target, formats=default_formats, callback=None):
if not isinstance(source, MMALPort):
raise PiCameraValueError('source is not an MMAL port')
if not isinstance(target, MMALPort):
raise PiCameraValueError('target is not an MMAL port')
super(MMALConnection, self).__init__(source, target, formats)
self._connection = ct.POINTER(mmal.MMAL_CONNECTION_T)()
self._callback = callback
flags = mmal.MMAL_CONNECTION_FLAG_ALLOCATION_ON_INPUT
if callback is None:
flags |= mmal.MMAL_CONNECTION_FLAG_TUNNELLING
try:
mmal_check(
mmal.mmal_connection_create(
self._connection, source._port, target._port, flags),
prefix="Failed to create connection")
except:
self._connection = None
raise
def close(self):
if self._connection is not None:
mmal.mmal_connection_destroy(self._connection)
self._connection = None
self._wrapper = None
super(MMALConnection, self).close()
@property
def enabled(self):
"""
Returns ``True`` if the connection is enabled. Use :meth:`enable`
and :meth:`disable` to control the state of the connection.
"""
return bool(self._connection[0].is_enabled)
def enable(self):
"""
Enable the connection. When a connection is enabled, data is
continually transferred from the output port of the source to the input
port of the target component.
"""
def wrapper(connection):
buf = mmal.mmal_queue_get(connection[0].queue)
if buf:
buf = MMALBuffer(buf)
try:
modified_buf = self._callback(self, buf)
except:
buf.release()
raise
else:
if modified_buf is not None:
try:
self._target.send_buffer(modified_buf)
except PiCameraPortDisabled:
# Target port disabled; ignore the error
pass
else:
buf.release()
return
buf = mmal.mmal_queue_get(connection[0].pool[0].queue)
if buf:
buf = MMALBuffer(buf)
try:
self._source.send_buffer(buf)
except PiCameraPortDisabled:
# Source port has been disabled; ignore the error
pass
if self._callback is not None:
self._wrapper = mmal.MMAL_CONNECTION_CALLBACK_T(wrapper)
self._connection[0].callback = self._wrapper
self._source.params[mmal.MMAL_PARAMETER_ZERO_COPY] = True
self._target.params[mmal.MMAL_PARAMETER_ZERO_COPY] = True
mmal_check(
mmal.mmal_connection_enable(self._connection),
prefix="Failed to enable connection")
if self._callback is not None:
MMALPool(self._connection[0].pool).send_all_buffers(self._source)
def disable(self):
"""
Disables the connection.
"""
mmal_check(
mmal.mmal_connection_disable(self._connection),
prefix="Failed to disable connection")
self._wrapper = None
@property
def name(self):
return self._connection[0].name.decode('ascii')
def __repr__(self):
if self._connection is not None:
return '<MMALConnection "%s">' % self.name
else:
return '<MMALConnection closed>'
class MMALRawCamera(MMALBaseComponent):
"""
The MMAL "raw camera" component.
Don't use this! If you insist on using this anyway, read the forum post
about `raw sensor access`_ first.
.. raw sensor access: https://www.raspberrypi.org/forums/viewtopic.php?f=43&t=109137
"""
__slots__ = ()
component_type = mmal.MMAL_COMPONENT_RAW_CAMERA
opaque_input_subformats = ()
opaque_output_subformats = ('OPQV-single',)
class MMALCamera(MMALBaseComponent):
"""
Represents the MMAL camera component. This component has 0 input ports and
3 output ports. The intended use of the output ports (which in turn
determines the behaviour of those ports) is as follows:
* Port 0 is intended for preview renderers
* Port 1 is intended for video recording
* Port 2 is intended for still image capture
Use the ``MMAL_PARAMETER_CAMERA_CONFIG`` parameter on the control port to
obtain and manipulate the camera's configuration.
"""
__slots__ = ()
component_type = mmal.MMAL_COMPONENT_DEFAULT_CAMERA
opaque_output_subformats = ('OPQV-single', 'OPQV-dual', 'OPQV-strips')
annotate_structs = (
mmal.MMAL_PARAMETER_CAMERA_ANNOTATE_T,
mmal.MMAL_PARAMETER_CAMERA_ANNOTATE_V2_T,
mmal.MMAL_PARAMETER_CAMERA_ANNOTATE_V3_T,
)
def __init__(self):
global FIX_RGB_BGR_ORDER
super(MMALCamera, self).__init__()
if PARAM_TYPES[mmal.MMAL_PARAMETER_ANNOTATE] is None:
found = False
# try largest struct to smallest as later firmwares still happily
# accept earlier revision structures
# XXX do old firmwares reject too-large structs?
for struct in reversed(MMALCamera.annotate_structs):
try:
PARAM_TYPES[mmal.MMAL_PARAMETER_ANNOTATE] = struct
self.control.params[mmal.MMAL_PARAMETER_ANNOTATE]
except PiCameraMMALError:
pass
else:
found = True
break
if not found:
PARAM_TYPES[mmal.MMAL_PARAMETER_ANNOTATE] = None
raise PiCameraMMALError(
mmal.MMAL_EINVAL, "unknown camera annotation structure revision")
if FIX_RGB_BGR_ORDER is None:
# old firmware lists BGR24 before RGB24 in supported_formats
for f in self.outputs[1].supported_formats:
if f == mmal.MMAL_ENCODING_BGR24:
FIX_RGB_BGR_ORDER = True
break
elif f == mmal.MMAL_ENCODING_RGB24:
FIX_RGB_BGR_ORDER = False
break
def _get_annotate_rev(self):
try:
return MMALCamera.annotate_structs.index(PARAM_TYPES[mmal.MMAL_PARAMETER_ANNOTATE]) + 1
except IndexError:
raise PiCameraMMALError(
mmal.MMAL_EINVAL, "unknown camera annotation structure revision")
def _set_annotate_rev(self, value):
try:
PARAM_TYPES[mmal.MMAL_PARAMETER_ANNOTATE] = MMALCamera.annotate_structs[value - 1]
except IndexError:
raise PiCameraMMALError(
mmal.MMAL_EINVAL, "invalid camera annotation structure revision")
annotate_rev = property(_get_annotate_rev, _set_annotate_rev, doc="""\
The annotation capabilities of the firmware have evolved over time and
several structures are available for querying and setting video
annotations. By default the :class:`MMALCamera` class will pick the
latest annotation structure supported by the current firmware but you
can select older revisions with :attr:`annotate_rev` for other purposes
(e.g. testing).
""")
class MMALCameraInfo(MMALBaseComponent):
"""
Represents the MMAL camera-info component. Query the
``MMAL_PARAMETER_CAMERA_INFO`` parameter on the control port to obtain
information about the connected camera module.
"""
__slots__ = ()
component_type = mmal.MMAL_COMPONENT_DEFAULT_CAMERA_INFO
info_structs = (
mmal.MMAL_PARAMETER_CAMERA_INFO_T,
mmal.MMAL_PARAMETER_CAMERA_INFO_V2_T,
)
def __init__(self):
super(MMALCameraInfo, self).__init__()
if PARAM_TYPES[mmal.MMAL_PARAMETER_CAMERA_INFO] is None:
found = False
# try smallest structure to largest as later firmwares reject
# older structures
for struct in MMALCameraInfo.info_structs:
try:
PARAM_TYPES[mmal.MMAL_PARAMETER_CAMERA_INFO] = struct
self.control.params[mmal.MMAL_PARAMETER_CAMERA_INFO]
except PiCameraMMALError:
pass
else:
found = True
break
if not found:
PARAM_TYPES[mmal.MMAL_PARAMETER_CAMERA_INFO] = None
raise PiCameraMMALError(
mmal.MMAL_EINVAL, "unknown camera info structure revision")
def _get_info_rev(self):
try:
return MMALCameraInfo.info_structs.index(PARAM_TYPES[mmal.MMAL_PARAMETER_CAMERA_INFO]) + 1
except IndexError:
raise PiCameraMMALError(
mmal.MMAL_EINVAL, "unknown camera info structure revision")
def _set_info_rev(self, value):
try:
PARAM_TYPES[mmal.MMAL_PARAMETER_CAMERA_INFO] = MMALCameraInfo.info_structs[value - 1]
except IndexError:
raise PiCameraMMALError(
mmal.MMAL_EINVAL, "invalid camera info structure revision")
info_rev = property(_get_info_rev, _set_info_rev, doc="""\
The camera information capabilities of the firmware have evolved over
time and several structures are available for querying camera
information. When initialized, :class:`MMALCameraInfo` will attempt
to discover which structure is in use by the extant firmware. This
property can be used to discover the structure version and to modify
the version in use for other purposes (e.g. testing).
""")
class MMALComponent(MMALBaseComponent):
"""
Represents an MMAL component that acts as a filter of some sort, with a
single input that connects to an upstream source port. This is an asbtract
base class.
"""
__slots__ = ()
def __init__(self):
super(MMALComponent, self).__init__()
assert len(self.opaque_input_subformats) == 1
def close(self):
self.disconnect()
super(MMALComponent, self).close()
def enable(self):
super(MMALComponent, self).enable()
if self.connection is not None:
self.connection.enable()
def disable(self):
if self.connection is not None:
self.connection.disable()
super(MMALComponent, self).disable()
def connect(self, source, **options):
"""
Connects the input port of this component to the specified *source*
:class:`MMALPort` or :class:`MMALPythonPort`. Alternatively, as a
convenience (primarily intended for command line experimentation; don't
use this in scripts), *source* can be another component in which case
the first unconnected output port will be selected as *source*.
Keyword arguments will be passed along to the connection constructor.
See :class:`MMALConnection` and :class:`MMALPythonConnection` for
further information.
"""
if isinstance(source, (MMALPort, MMALPythonPort)):
return self.inputs[0].connect(source)
else:
for port in source.outputs:
if not port.connection:
return self.inputs[0].connect(port, **options)
raise PiCameraMMALError(
mmal.MMAL_EINVAL, 'no free output ports on %r' % source)
def disconnect(self):
"""
Destroy the connection between this component's input port and the
upstream component.
"""
self.inputs[0].disconnect()
@property
def connection(self):
"""
The :class:`MMALConnection` or :class:`MMALPythonConnection` object
linking this component to the upstream component.
"""
return self.inputs[0].connection
class MMALSplitter(MMALComponent):
"""
Represents the MMAL splitter component. This component has 1 input port
and 4 output ports which all generate duplicates of buffers passed to the
input port.
"""
__slots__ = ()
component_type = mmal.MMAL_COMPONENT_DEFAULT_VIDEO_SPLITTER
opaque_input_subformats = ('OPQV-single',)
opaque_output_subformats = ('OPQV-single',) * 4
class MMALISPResizer(MMALComponent):
"""
Represents the MMAL ISP resizer component. This component has 1 input port
and 1 output port, and supports resizing via the VideoCore ISP, along with
conversion of numerous formats into numerous other formats (e.g. OPAQUE to
RGB, etc). This is more efficient than :class:`MMALResizer` but is only
available on later firmware versions.
"""
__slots__ = ()
component_type = mmal.MMAL_COMPONENT_DEFAULT_ISP
opaque_input_subformats = ('OPQV-single',)
opaque_output_subformats = (None,)
class MMALResizer(MMALComponent):
"""
Represents the MMAL VPU resizer component. This component has 1 input port
and 1 output port. This supports resizing via the VPU. This is not as
efficient as :class:`MMALISPResizer` but is available on all firmware
verions. The output port can (and usually should) have a different frame
size to the input port.
"""
__slots__ = ()
component_type = mmal.MMAL_COMPONENT_DEFAULT_RESIZER
opaque_input_subformats = (None,)
opaque_output_subformats = (None,)
class MMALEncoder(MMALComponent):
"""
Represents a generic MMAL encoder. This is an abstract base class.
"""
__slots__ = ()
class MMALVideoEncoder(MMALEncoder):
"""
Represents the MMAL video encoder component. This component has 1 input
port and 1 output port. The output port is usually configured with
``MMAL_ENCODING_H264`` or ``MMAL_ENCODING_MJPEG``.
"""
__slots__ = ()
component_type = mmal.MMAL_COMPONENT_DEFAULT_VIDEO_ENCODER
opaque_input_subformats = ('OPQV-dual',)
opaque_output_subformats = (None,)
class MMALImageEncoder(MMALEncoder):
"""
Represents the MMAL image encoder component. This component has 1 input
port and 1 output port. The output port is typically configured with
``MMAL_ENCODING_JPEG`` but can also use ``MMAL_ENCODING_PNG``,
``MMAL_ENCODING_GIF``, etc.
"""
__slots__ = ()
component_type = mmal.MMAL_COMPONENT_DEFAULT_IMAGE_ENCODER
opaque_input_subformats = ('OPQV-strips',)
opaque_output_subformats = (None,)
class MMALDecoder(MMALComponent):
"""
Represents a generic MMAL decoder. This is an abstract base class.
"""
__slots__ = ()
class MMALVideoDecoder(MMALDecoder):
"""
Represents the MMAL video decoder component. This component has 1 input
port and 1 output port. The input port is usually configured with
``MMAL_ENCODING_H264`` or ``MMAL_ENCODING_MJPEG``.
"""
__slots__ = ()
component_type = mmal.MMAL_COMPONENT_DEFAULT_VIDEO_DECODER
opaque_input_subformats = (None,)
opaque_output_subformats = ('OPQV-single',)
class MMALImageDecoder(MMALDecoder):
"""
Represents the MMAL iamge decoder component. This component has 1 input
port and 1 output port. The input port is usually configured with
``MMAL_ENCODING_JPEG``.
"""
__slots__ = ()
component_type = mmal.MMAL_COMPONENT_DEFAULT_IMAGE_DECODER
opaque_input_subformats = (None,)
opaque_output_subformats = ('OPQV-single',)
class MMALRenderer(MMALComponent):
"""
Represents the MMAL renderer component. This component has 1 input port and
0 output ports. It is used to implement the camera preview and overlays.
"""
__slots__ = ()
component_type = mmal.MMAL_COMPONENT_DEFAULT_VIDEO_RENDERER
opaque_input_subformats = ('OPQV-single',)
class MMALNullSink(MMALComponent):
"""
Represents the MMAL null-sink component. This component has 1 input port
and 0 output ports. It is used to keep the preview port "alive" (and thus
calculating white-balance and exposure) when the camera preview is not
required.
"""
__slots__ = ()
component_type = mmal.MMAL_COMPONENT_DEFAULT_NULL_SINK
opaque_input_subformats = ('OPQV-single',)
class MMALPythonPort(MMALObject):
"""
Implements ports for Python-based MMAL components.
"""
__slots__ = (
'_buffer_count',
'_buffer_size',
'_connection',
'_enabled',
'_owner',
'_pool',
'_type',
'_index',
'_supported_formats',
'_format',
'_callback',
)
_FORMAT_BPP = {
'I420': 1.5,
'RGB3': 3,
'RGBA': 4,
'BGR3': 3,
'BGRA': 4,
}
def __init__(self, owner, port_type, index):
self._buffer_count = 2
self._buffer_size = 0
self._connection = None
self._enabled = False
self._owner = weakref.ref(owner)
self._pool = None
self._callback = None
self._type = port_type
self._index = index
self._supported_formats = {
mmal.MMAL_ENCODING_I420,
mmal.MMAL_ENCODING_RGB24,
mmal.MMAL_ENCODING_BGR24,
mmal.MMAL_ENCODING_RGBA,
mmal.MMAL_ENCODING_BGRA,
}
self._format = ct.pointer(mmal.MMAL_ES_FORMAT_T(
type=mmal.MMAL_ES_TYPE_VIDEO,
encoding=mmal.MMAL_ENCODING_I420,
es=ct.pointer(mmal.MMAL_ES_SPECIFIC_FORMAT_T())))
def close(self):
self.disconnect()
self.disable()
self._format = None
def __repr__(self):
return '<MMALPythonPort "%s": format=MMAL_FOURCC(%r) buffers=%dx%d frames=%s@%sfps>' % (
self.name, mmal.FOURCC_str(self.format), self.buffer_count,
self.buffer_size, self.framesize, self.framerate)
def _get_bitrate(self):
return self._format[0].bitrate
def _set_bitrate(self, value):
self._format[0].bitrate = value
bitrate = property(_get_bitrate, _set_bitrate, doc="""\
Retrieves or sets the bitrate limit for the port's format.
""")
def _get_supported_formats(self):
return self._supported_formats
def _set_supported_formats(self, value):
try:
value = {f for f in value}
except TypeError:
value = {value}
if not value:
raise PiCameraMMALError(
mmal.MMAL_EINVAL, "port must have at least one valid format")
self._supported_formats = value
supported_formats = property(_get_supported_formats, _set_supported_formats, doc="""\
Retrieves or sets the set of valid formats for this port. The set must
always contain at least one valid format. A single format can be
specified; it will be converted implicitly to a singleton set.
If the current port :attr:`format` is not a member of the new set, no
error is raised. An error will be raised when :meth:`commit` is next
called if :attr:`format` is still not a member of the set.
""")
def _get_format(self):
return self._format[0].encoding
def _set_format(self, value):
self._format[0].encoding = value
format = property(_get_format, _set_format, doc="""\
Retrieves or sets the encoding format of the port. Setting this
attribute implicitly sets the encoding variant to a sensible value
(I420 in the case of OPAQUE).
""")
def _get_framesize(self):
return PiResolution(
self._format[0].es[0].video.crop.width,
self._format[0].es[0].video.crop.height,
)
def _set_framesize(self, value):
value = to_resolution(value)
video = self._format[0].es[0].video
video.width = bcm_host.VCOS_ALIGN_UP(value.width, 32)
video.height = bcm_host.VCOS_ALIGN_UP(value.height, 16)
video.crop.width = value.width
video.crop.height = value.height
framesize = property(_get_framesize, _set_framesize, doc="""\
Retrieves or sets the size of the source's video frames as a (width,
height) tuple. This attribute implicitly handles scaling the given
size up to the block size of the camera (32x16).
""")
def _get_framerate(self):
video = self._format[0].es[0].video
try:
return Fraction(
video.frame_rate.num,
video.frame_rate.den)
except ZeroDivisionError:
return Fraction(0, 1)
def _set_framerate(self, value):
value = to_fraction(value)
video = self._format[0].es[0].video
video.frame_rate.num = value.numerator
video.frame_rate.den = value.denominator
framerate = property(_get_framerate, _set_framerate, doc="""\
Retrieves or sets the framerate of the port's video frames in fps.
""")
@property
def pool(self):
"""
Returns the :class:`MMALPool` associated with the buffer, if any.
"""
return self._pool
@property
def opaque_subformat(self):
return None
def _get_buffer_count(self):
return self._buffer_count
def _set_buffer_count(self, value):
if value < 1:
raise PiCameraMMALError(mmal.MMAL_EINVAL, 'buffer count <1')
self._buffer_count = int(value)
buffer_count = property(_get_buffer_count, _set_buffer_count, doc="""\
The number of buffers allocated (or to be allocated) to the port. The
default is 2 but more may be required in the case of long pipelines
with replicated buffers.
""")
def _get_buffer_size(self):
return self._buffer_size
def _set_buffer_size(self, value):
if value < 0:
raise PiCameraMMALError(mmal.MMAL_EINVAL, 'buffer size <0')
self._buffer_size = value
buffer_size = property(_get_buffer_size, _set_buffer_size, doc="""\
The size of buffers allocated (or to be allocated) to the port. The
size of buffers defaults to a value dictated by the port's format.
""")
def copy_from(self, source):
"""
Copies the port's :attr:`format` from the *source*
:class:`MMALControlPort`.
"""
if isinstance(source, MMALPythonPort):
mmal.mmal_format_copy(self._format, source._format)
else:
mmal.mmal_format_copy(self._format, source._port[0].format)
def commit(self):
"""
Commits the port's configuration and automatically updates the number
and size of associated buffers. This is typically called after
adjusting the port's format and/or associated settings (like width and
height for video ports).
"""
if self.format not in self.supported_formats:
raise PiCameraMMALError(
mmal.MMAL_EINVAL, 'invalid format for port %r' % self)
self._buffer_count = 2
video = self._format[0].es[0].video
try:
self._buffer_size = int(
MMALPythonPort._FORMAT_BPP[str(self.format)]
* video.width
* video.height)
except KeyError:
# If it's an unknown / encoded format just leave the buffer size
# alone and hope the owning component knows what to set
pass
self._owner()._commit_port(self)
@property
def enabled(self):
"""
Returns a :class:`bool` indicating whether the port is currently
enabled. Unlike other classes, this is a read-only property. Use
:meth:`enable` and :meth:`disable` to modify the value.
"""
return self._enabled
def enable(self, callback=None):
"""
Enable the port with the specified callback function (this must be
``None`` for connected ports, and a callable for disconnected ports).
The callback function must accept two parameters which will be this
:class:`MMALControlPort` (or descendent) and an :class:`MMALBuffer`
instance. Any return value will be ignored.
"""
if self._connection is not None:
if callback is not None:
raise PiCameraMMALError(
mmal.MMAL_EINVAL,
'connected ports must be enabled without callback')
else:
if callback is None:
raise PiCameraMMALError(
mmal.MMAL_EINVAL,
'unconnected ports must be enabled with callback')
if self.type == mmal.MMAL_PORT_TYPE_INPUT or self._connection is None:
self._pool = MMALPythonPortPool(self)
self._callback = callback
self._enabled = True
def disable(self):
"""
Disable the port.
"""
self._enabled = False
if self._pool is not None:
# Release any unprocessed buffers from the owner's queue before
# we destroy them all
while True:
buf = self._owner()._queue.get(False)
if buf:
buf.release()
else:
break
self._pool.close()
self._pool = None
self._callback = None
def get_buffer(self, block=True, timeout=None):
"""
Returns a :class:`MMALBuffer` from the associated :attr:`pool`. *block*
and *timeout* act as they do in the corresponding
:meth:`MMALPool.get_buffer`.
"""
if not self._enabled:
raise PiCameraPortDisabled(
'cannot get buffer from disabled port %s' % self.name)
if self._pool is not None:
# Unconnected port or input port case; retrieve buffer from the
# allocated pool
return self._pool.get_buffer(block, timeout)
else:
# Connected output port case; get a buffer from the target input
# port (in this case the port is just a thin proxy for the
# corresponding input port)
assert self.type == mmal.MMAL_PORT_TYPE_OUTPUT
return self._connection.target.get_buffer(block, timeout)
def send_buffer(self, buf):
"""
Send :class:`MMALBuffer` *buf* to the port.
"""
# NOTE: The MMALPythonConnection callback must occur *before* the test
# for the port being enabled; it's meant to be the connection making
# the callback prior to the buffer getting to the port after all
if (
self.type == mmal.MMAL_PORT_TYPE_INPUT and
self._connection._callback is not None):
try:
modified_buf = self._connection._callback(self._connection, buf)
except:
buf.release()
raise
else:
if modified_buf is None:
buf.release()
else:
buf = modified_buf
if not self._enabled:
raise PiCameraPortDisabled(
'cannot send buffer to disabled port %s' % self.name)
if self._callback is not None:
# but what about output ports?
try:
# XXX Return value? If it's an input port we should ignore it,
self._callback(self, buf)
except:
buf.release()
raise
if self._type == mmal.MMAL_PORT_TYPE_INPUT:
# Input port case; queue the buffer for processing on the
# owning component
self._owner()._queue.put(buf)
elif self._connection is None:
# Unconnected output port case; release the buffer back to the
# pool
buf.release()
else:
# Connected output port case; forward the buffer to the
# connected component's input port
# XXX If it's a format-change event?
self._connection.target.send_buffer(buf)
@property
def name(self):
return '%s:%s:%d' % (self._owner().name, {
mmal.MMAL_PORT_TYPE_OUTPUT: 'out',
mmal.MMAL_PORT_TYPE_INPUT: 'in',
mmal.MMAL_PORT_TYPE_CONTROL: 'control',
mmal.MMAL_PORT_TYPE_CLOCK: 'clock',
}[self.type], self._index)
@property
def type(self):
"""
The type of the port. One of:
* MMAL_PORT_TYPE_OUTPUT
* MMAL_PORT_TYPE_INPUT
* MMAL_PORT_TYPE_CONTROL
* MMAL_PORT_TYPE_CLOCK
"""
return self._type
@property
def capabilities(self):
"""
The capabilities of the port. A bitfield of the following:
* MMAL_PORT_CAPABILITY_PASSTHROUGH
* MMAL_PORT_CAPABILITY_ALLOCATION
* MMAL_PORT_CAPABILITY_SUPPORTS_EVENT_FORMAT_CHANGE
"""
return mmal.MMAL_PORT_CAPABILITY_SUPPORTS_EVENT_FORMAT_CHANGE
@property
def index(self):
"""
Returns an integer indicating the port's position within its owning
list (inputs, outputs, etc.)
"""
return self._index
@property
def connection(self):
"""
If this port is connected to another, this property holds the
:class:`MMALConnection` or :class:`MMALPythonConnection` object which
represents that connection. If this port is not connected, this
property is ``None``.
"""
return self._connection
def connect(self, other, **options):
"""
Connect this port to the *other* :class:`MMALPort` (or
:class:`MMALPythonPort`). The type and configuration of the connection
will be automatically selected.
Various connection options can be specified as keyword arguments. These
will be passed onto the :class:`MMALConnection` or
:class:`MMALPythonConnection` constructor that is called (see those
classes for an explanation of the available options).
"""
# Always construct connections from the output end
if self.type != mmal.MMAL_PORT_TYPE_OUTPUT:
return other.connect(self, **options)
if other.type != mmal.MMAL_PORT_TYPE_INPUT:
raise PiCameraValueError(
'A connection can only be established between an output and '
'an input port')
return MMALPythonConnection(self, other, **options)
def disconnect(self):
"""
Destroy the connection between this port and another port.
"""
if self.connection is not None:
self.connection.close()
class MMALPythonPortPool(MMALPool):
"""
Creates a pool of buffer headers for an :class:`MMALPythonPort`. This is
only used when a fake port is used without a corresponding
:class:`MMALPythonConnection`.
"""
__slots__ = ('_port',)
def __init__(self, port):
super(MMALPythonPortPool, self).__init__(
mmal.mmal_pool_create(port.buffer_count, port.buffer_size))
self._port = port
@property
def port(self):
return self._port
def send_buffer(self, port=None, block=True, timeout=None):
"""
Get a buffer from the pool and send it to *port* (or the port the pool
is associated with by default). *block* and *timeout* act as they do in
:meth:`MMALPool.get_buffer`.
"""
if port is None:
port = self._port
super(MMALPythonPortPool, self).send_buffer(port, block, timeout)
def send_all_buffers(self, port=None, block=True, timeout=None):
"""
Send all buffers from the pool to *port* (or the port the pool is
associated with by default). *block* and *timeout* act as they do in
:meth:`MMALPool.get_buffer`.
"""
if port is None:
port = self._port
super(MMALPythonPortPool, self).send_all_buffers(port, block, timeout)
class MMALPythonBaseComponent(MMALObject):
"""
Base class for Python-implemented MMAL components. This class provides the
:meth:`_commit_port` method used by descendents to control their ports'
behaviour, and the :attr:`enabled` property. However, it is unlikely that
users will want to sub-class this directly. See
:class:`MMALPythonComponent` for a more useful starting point.
"""
__slots__ = ('_inputs', '_outputs', '_enabled',)
def __init__(self):
super(MMALPythonBaseComponent, self).__init__()
self._enabled = False
self._inputs = ()
self._outputs = ()
# TODO Control port?
def close(self):
"""
Close the component and release all its resources. After this is
called, most methods will raise exceptions if called.
"""
self.disable()
@property
def enabled(self):
"""
Returns ``True`` if the component is currently enabled. Use
:meth:`enable` and :meth:`disable` to control the component's state.
"""
return self._enabled
def enable(self):
"""
Enable the component. When a component is enabled it will process data
sent to its input port(s), sending the results to buffers on its output
port(s). Components may be implicitly enabled by connections.
"""
self._enabled = True
def disable(self):
"""
Disables the component.
"""
self._enabled = False
@property
def control(self):
"""
The :class:`MMALControlPort` control port of the component which can be
used to configure most aspects of the component's behaviour.
"""
return None
@property
def inputs(self):
"""
A sequence of :class:`MMALPort` objects representing the inputs
of the component.
"""
return self._inputs
@property
def outputs(self):
"""
A sequence of :class:`MMALPort` objects representing the outputs
of the component.
"""
return self._outputs
def _commit_port(self, port):
"""
Called by ports when their format is committed. Descendents may
override this to reconfigure output ports when input ports are
committed, or to raise errors if the new port configuration is
unacceptable.
.. warning::
This method must *not* reconfigure input ports when called; however
it can reconfigure *output* ports when input ports are committed.
"""
pass
def __repr__(self):
if self._outputs:
return '<%s "%s": %d inputs %d outputs>' % (
self.__class__.__name__, self.name,
len(self.inputs), len(self.outputs))
else:
return '<%s closed>' % self.__class__.__name__
class MMALPythonSource(MMALPythonBaseComponent):
"""
Provides a source for other :class:`MMALComponent` instances. The
specified *input* is read in chunks the size of the configured output
buffer(s) until the input is exhausted. The :meth:`wait` method can be
used to block until this occurs. If the output buffer is configured to
use a full-frame unencoded format (like I420 or RGB), frame-end flags will
be automatically generated by the source. When the input is exhausted an
empty buffer with the End Of Stream (EOS) flag will be sent.
The component provides all picamera's usual IO-handling characteristics; if
*input* is a string, a file with that name will be opened as the input and
closed implicitly when the component is closed. Otherwise, the input will
not be closed implicitly (the component did not open it, so the assumption
is that closing *input* is the caller's responsibility). If *input* is an
object with a ``read`` method it is assumed to be a file-like object and is
used as is. Otherwise, *input* is assumed to be a readable object
supporting the buffer protocol (which is wrapped in a :class:`BufferIO`
stream).
"""
__slots__ = ('_stream', '_opened', '_thread')
def __init__(self, input):
super(MMALPythonSource, self).__init__()
self._inputs = ()
self._outputs = (MMALPythonPort(self, mmal.MMAL_PORT_TYPE_OUTPUT, 0),)
self._stream, self._opened = open_stream(input, output=False)
self._thread = None
def close(self):
super(MMALPythonSource, self).close()
if self._outputs:
self._outputs[0].close()
self._outputs = ()
if self._stream:
close_stream(self._stream, self._opened)
self._stream = None
def enable(self):
super(MMALPythonSource, self).enable()
self._thread = Thread(target=self._send_run)
self._thread.daemon = True
self._thread.start()
def disable(self):
super(MMALPythonSource, self).disable()
if self._thread:
self._thread.join()
self._thread = None
def wait(self, timeout=None):
"""
Wait for the source to send all bytes from the specified input. If
*timeout* is specified, it is the number of seconds to wait for
completion. The method returns ``True`` if the source completed within
the specified timeout and ``False`` otherwise.
"""
if not self.enabled:
raise PiCameraMMALError(
mmal.MMAL_EINVAL, 'cannot wait on disabled component')
self._thread.join(timeout)
return not self._thread.is_alive()
def _send_run(self):
# Calculate the size of a frame if possible (i.e. when the output
# format is an unencoded full frame format). If it's an unknown /
# encoded format, we've no idea what the framesize is (this would
# presumably require decoding the stream) so leave framesize as None.
video = self._outputs[0]._format[0].es[0].video
try:
framesize = (
MMALPythonPort._FORMAT_BPP[str(self._outputs[0].format)]
* video.width
* video.height)
except KeyError:
framesize = None
frameleft = framesize
while self.enabled:
buf = self._outputs[0].get_buffer(timeout=0.1)
if buf:
try:
if frameleft is None:
send = buf.size
else:
send = min(frameleft, buf.size)
with buf as data:
if send == buf.size:
try:
# readinto() is by far the fastest method of
# getting data into the buffer
buf.length = self._stream.readinto(data)
except AttributeError:
# if there's no readinto() method, fallback on
# read() and the data setter (memmove)
buf.data = self._stream.read(buf.size)
else:
buf.data = self._stream.read(send)
if frameleft is not None:
frameleft -= buf.length
if not frameleft:
buf.flags |= mmal.MMAL_BUFFER_HEADER_FLAG_FRAME_END
frameleft = framesize
if not buf.length:
buf.flags |= mmal.MMAL_BUFFER_HEADER_FLAG_EOS
break
finally:
self._outputs[0].send_buffer(buf)
@property
def name(self):
return 'py.source'
class MMALPythonComponent(MMALPythonBaseComponent):
"""
Provides a Python-based MMAL component with a *name*, a single input and
the specified number of *outputs* (default 1). The :meth:`connect` and
:meth:`disconnect` methods can be used to establish or break a connection
from the input port to an upstream component.
Typically descendents will override the :meth:`_handle_frame` method to
respond to buffers sent to the input port, and will set
:attr:`MMALPythonPort.supported_formats` in the constructor to define the
formats that the component will work with.
"""
__slots__ = ('_name', '_thread', '_queue', '_error')
def __init__(self, name='py.component', outputs=1):
super(MMALPythonComponent, self).__init__()
self._name = name
self._thread = None
self._error = None
self._queue = MMALQueue.create()
self._inputs = (MMALPythonPort(self, mmal.MMAL_PORT_TYPE_INPUT, 0),)
self._outputs = tuple(
MMALPythonPort(self, mmal.MMAL_PORT_TYPE_OUTPUT, n)
for n in range(outputs)
)
def close(self):
super(MMALPythonComponent, self).close()
self.disconnect()
if self._inputs:
self._inputs[0].close()
self._inputs = ()
for output in self._outputs:
output.disable()
self._outputs = ()
self._queue.close()
self._queue = None
def connect(self, source, **options):
"""
Connects the input port of this component to the specified *source*
:class:`MMALPort` or :class:`MMALPythonPort`. Alternatively, as a
convenience (primarily intended for command line experimentation; don't
use this in scripts), *source* can be another component in which case
the first unconnected output port will be selected as *source*.
Keyword arguments will be passed along to the connection constructor.
See :class:`MMALConnection` and :class:`MMALPythonConnection` for
further information.
"""
if isinstance(source, (MMALPort, MMALPythonPort)):
return self.inputs[0].connect(source)
else:
for port in source.outputs:
if not port.connection:
return self.inputs[0].connect(port, **options)
raise PiCameraMMALError(
mmal.MMAL_EINVAL, 'no free output ports on %r' % source)
def disconnect(self):
"""
Destroy the connection between this component's input port and the
upstream component.
"""
self.inputs[0].disconnect()
@property
def connection(self):
"""
The :class:`MMALConnection` or :class:`MMALPythonConnection` object
linking this component to the upstream component.
"""
return self.inputs[0].connection
@property
def name(self):
return self._name
def _commit_port(self, port):
"""
Overridden to to copy the input port's configuration to the output
port(s), and to ensure that the output port(s)' format(s) match
the input port's format.
"""
super(MMALPythonComponent, self)._commit_port(port)
if port.type == mmal.MMAL_PORT_TYPE_INPUT:
for output in self.outputs:
output.copy_from(port)
elif port.type == mmal.MMAL_PORT_TYPE_OUTPUT:
if port.format != self.inputs[0].format:
raise PiCameraMMALError(mmal.MMAL_EINVAL, 'output format mismatch')
def enable(self):
super(MMALPythonComponent, self).enable()
if not self._thread:
self._thread = Thread(target=self._thread_run)
self._thread.daemon = True
self._thread.start()
def disable(self):
super(MMALPythonComponent, self).disable()
if self._thread:
self._thread.join()
self._thread = None
if self._error:
raise self._error
def _thread_run(self):
try:
while self._enabled:
buf = self._queue.get(timeout=0.1)
if buf:
try:
handler = {
0: self._handle_frame,
mmal.MMAL_EVENT_PARAMETER_CHANGED: self._handle_parameter_changed,
mmal.MMAL_EVENT_FORMAT_CHANGED: self._handle_format_changed,
mmal.MMAL_EVENT_ERROR: self._handle_error,
mmal.MMAL_EVENT_EOS: self._handle_end_of_stream,
}[buf.command]
if handler(self.inputs[0], buf):
self._enabled = False
finally:
buf.release()
except Exception as e:
self._error = e
self._enabled = False
def _handle_frame(self, port, buf):
"""
Handles frame data buffers (where :attr:`MMALBuffer.command` is set to
0).
Typically, if the component has output ports, the method is expected to
fetch a buffer from the output port(s), write data into them, and send
them back to their respective ports.
Return values are as for normal event handlers (``True`` when no more
buffers are expected, ``False`` otherwise).
"""
return False
def _handle_format_changed(self, port, buf):
"""
Handles format change events passed to the component (where
:attr:`MMALBuffer.command` is set to MMAL_EVENT_FORMAT_CHANGED).
The default implementation re-configures the input port of the
component and emits the event on all output ports for downstream
processing. Override this method if you wish to do something else in
response to format change events.
The *port* parameter is the port into which the event arrived, and
*buf* contains the event itself (a MMAL_EVENT_FORMAT_CHANGED_T
structure). Use ``mmal_event_format_changed_get`` on the buffer's data
to extract the event.
"""
with buf as data:
event = mmal.mmal_event_format_changed_get(buf._buf)
if port.connection:
# Handle format change on the source output port, if any. We
# don't check the output port capabilities because it was the
# port that emitted the format change in the first case so it'd
# be odd if it didn't support them (or the format requested)!
output = port.connection._source
output.disable()
if isinstance(output, MMALPythonPort):
mmal.mmal_format_copy(output._format, event[0].format)
else:
mmal.mmal_format_copy(output._port[0].format, event[0].format)
output.commit()
output.buffer_count = (
event[0].buffer_num_recommended
if event[0].buffer_num_recommended > 0 else
event[0].buffer_num_min)
output.buffer_size = (
event[0].buffer_size_recommended
if event[0].buffer_size_recommended > 0 else
event[0].buffer_size_min)
if isinstance(output, MMALPythonPort):
output.enable()
else:
output.enable(port.connection._transfer)
# Now deal with the format change on this input port (this is only
# called from _thread_run so port must be an input port)
try:
if not (port.capabilities & mmal.MMAL_PORT_CAPABILITY_SUPPORTS_EVENT_FORMAT_CHANGE):
raise PiCameraMMALError(
mmal.MMAL_EINVAL,
'port %s does not support event change' % self.name)
mmal.mmal_format_copy(port._format, event[0].format)
self._commit_port(port)
port.pool.resize(
event[0].buffer_num_recommended
if event[0].buffer_num_recommended > 0 else
event[0].buffer_num_min,
event[0].buffer_size_recommended
if event[0].buffer_size_recommended > 0 else
event[0].buffer_size_min)
port.buffer_count = len(port.pool)
port.buffer_size = port.pool[0].size
except:
# If this port can't handle the format change, or if anything goes
# wrong (like the owning component doesn't like the new format)
# stop the pipeline (from here at least)
if port.connection:
port.connection.disable()
raise
# Chain the format-change onward so everything downstream sees it.
# NOTE: the callback isn't given the format-change because there's no
# image data in it
for output in self.outputs:
out_buf = output.get_buffer()
out_buf.copy_from(buf)
output.send_buffer(out_buf)
return False
def _handle_parameter_changed(self, port, buf):
"""
Handles parameter change events passed to the component (where
:attr:`MMALBuffer.command` is set to MMAL_EVENT_PARAMETER_CHANGED).
The default implementation does nothing but return ``False``
(indicating that processing should continue). Override this in
descendents to respond to parameter changes.
The *port* parameter is the port into which the event arrived, and
*buf* contains the event itself (a MMAL_EVENT_PARAMETER_CHANGED_T
structure).
"""
return False
def _handle_error(self, port, buf):
"""
Handles error notifications passed to the component (where
:attr:`MMALBuffer.command` is set to MMAL_EVENT_ERROR).
The default implementation does nothing but return ``True`` (indicating
that processing should halt). Override this in descendents to respond
to error events.
The *port* parameter is the port into which the event arrived.
"""
return True
def _handle_end_of_stream(self, port, buf):
"""
Handles end-of-stream notifications passed to the component (where
:attr:`MMALBuffer.command` is set to MMAL_EVENT_EOS).
The default implementation does nothing but return ``True`` (indicating
that processing should halt). Override this in descendents to respond
to the end of stream.
The *port* parameter is the port into which the event arrived.
"""
return True
class MMALPythonTarget(MMALPythonComponent):
"""
Provides a simple component that writes all received buffers to the
specified *output* until a frame with the *done* flag is seen (defaults to
MMAL_BUFFER_HEADER_FLAG_EOS indicating End Of Stream).
The component provides all picamera's usual IO-handling characteristics; if
*output* is a string, a file with that name will be opened as the output
and closed implicitly when the component is closed. Otherwise, the output
will not be closed implicitly (the component did not open it, so the
assumption is that closing *output* is the caller's responsibility). If
*output* is an object with a ``write`` method it is assumed to be a
file-like object and is used as is. Otherwise, *output* is assumed to be a
writeable object supporting the buffer protocol (which is wrapped in a
:class:`BufferIO` stream).
"""
__slots__ = ('_opened', '_stream', '_done', '_event')
def __init__(self, output, done=mmal.MMAL_BUFFER_HEADER_FLAG_EOS):
super(MMALPythonTarget, self).__init__(name='py.target', outputs=0)
self._stream, self._opened = open_stream(output)
self._done = done
self._event = Event()
# Accept all the formats picamera generally produces (user can add
# other esoteric stuff if they need to)
self.inputs[0].supported_formats = {
mmal.MMAL_ENCODING_MJPEG,
mmal.MMAL_ENCODING_H264,
mmal.MMAL_ENCODING_JPEG,
mmal.MMAL_ENCODING_GIF,
mmal.MMAL_ENCODING_PNG,
mmal.MMAL_ENCODING_BMP,
mmal.MMAL_ENCODING_I420,
mmal.MMAL_ENCODING_RGB24,
mmal.MMAL_ENCODING_BGR24,
mmal.MMAL_ENCODING_RGBA,
mmal.MMAL_ENCODING_BGRA,
}
def close(self):
super(MMALPythonTarget, self).close()
close_stream(self._stream, self._opened)
def enable(self):
self._event.clear()
super(MMALPythonTarget, self).enable()
def wait(self, timeout=None):
"""
Wait for the output to be "complete" as defined by the constructor's
*done* parameter. If *timeout* is specified it is the number of seconds
to wait for completion. The method returns ``True`` if the target
completed within the specified timeout and ``False`` otherwise.
"""
return self._event.wait(timeout)
def _handle_frame(self, port, buf):
self._stream.write(buf.data)
if buf.flags & self._done:
self._event.set()
return True
return False
class MMALPythonConnection(MMALBaseConnection):
"""
Represents a connection between an :class:`MMALPythonBaseComponent` and a
:class:`MMALBaseComponent` or another :class:`MMALPythonBaseComponent`.
The constructor accepts arguments providing the *source* :class:`MMALPort`
(or :class:`MMALPythonPort`) and *target* :class:`MMALPort` (or
:class:`MMALPythonPort`).
The *formats* parameter specifies an iterable of formats (in preference
order) that the connection may attempt when negotiating formats between
the two ports. If this is ``None``, or an empty iterable, no negotiation
will take place and the source port's format will simply be copied to the
target port. Otherwise, the iterable will be worked through in order until
a format acceptable to both ports is discovered.
The *callback* parameter can optionally specify a callable which will be
executed for each buffer that traverses the connection (providing an
opportunity to manipulate or drop that buffer). If specified, it must be a
callable which accepts two parameters: the :class:`MMALPythonConnection`
object sending the data, and the :class:`MMALBuffer` object containing
data. The callable may optionally manipulate the :class:`MMALBuffer` and
return it to permit it to continue traversing the connection, or return
``None`` in which case the buffer will be released.
.. data:: default_formats
:annotation: = (MMAL_ENCODING_I420, MMAL_ENCODING_RGB24, MMAL_ENCODING_BGR24, MMAL_ENCODING_RGBA, MMAL_ENCODING_BGRA)
Class attribute defining the default formats used to negotiate
connections between Python and and MMAL components, in preference
order. Note that OPAQUE is not present in contrast with the default
formats in :class:`MMALConnection`.
"""
__slots__ = ('_enabled', '_callback')
default_formats = (
mmal.MMAL_ENCODING_I420,
mmal.MMAL_ENCODING_RGB24,
mmal.MMAL_ENCODING_BGR24,
mmal.MMAL_ENCODING_RGBA,
mmal.MMAL_ENCODING_BGRA,
)
def __init__(
self, source, target, formats=default_formats, callback=None):
if not (
isinstance(source, MMALPythonPort) or
isinstance(target, MMALPythonPort)
):
raise PiCameraValueError('use a real MMAL connection')
super(MMALPythonConnection, self).__init__(source, target, formats)
self._enabled = False
self._callback = callback
def close(self):
self.disable()
super(MMALPythonConnection, self).close()
@property
def enabled(self):
"""
Returns ``True`` if the connection is enabled. Use :meth:`enable`
and :meth:`disable` to control the state of the connection.
"""
return self._enabled
def enable(self):
"""
Enable the connection. When a connection is enabled, data is
continually transferred from the output port of the source to the input
port of the target component.
"""
if not self._enabled:
self._enabled = True
if isinstance(self._target, MMALPythonPort):
# Connected python input ports require no callback
self._target.enable()
else:
# Connected MMAL input ports don't know they're connected so
# provide a dummy callback
self._target.params[mmal.MMAL_PARAMETER_ZERO_COPY] = True
self._target.enable(lambda port, buf: True)
if isinstance(self._source, MMALPythonPort):
# Connected python output ports are nothing more than thin
# proxies for the target input port; no callback required
self._source.enable()
else:
# Connected MMAL output ports are made to transfer their
# data to the Python input port
self._source.params[mmal.MMAL_PARAMETER_ZERO_COPY] = True
self._source.enable(self._transfer)
def disable(self):
"""
Disables the connection.
"""
self._enabled = False
self._source.disable()
self._target.disable()
def _transfer(self, port, buf):
while self._enabled:
try:
dest = self._target.get_buffer(timeout=0.01)
except PiCameraPortDisabled:
dest = None
if dest:
dest.copy_from(buf)
try:
self._target.send_buffer(dest)
except PiCameraPortDisabled:
pass
return False
@property
def name(self):
return '%s/%s' % (self._source.name, self._target.name)
def __repr__(self):
try:
return '<MMALPythonConnection "%s">' % self.name
except NameError:
return '<MMALPythonConnection closed>'