# Source code for the UniFi Video camera module.

from __future__ import print_function, unicode_literals
from functools import wraps
from copy import deepcopy
from datetime import datetime

import time

from .single import UnifiVideoSingle
from . import utils

# Builders for the relative API paths used by UnifiVideoCamera. Each value is
# a callable that takes the camera ID (plus endpoint-specific arguments) and
# returns the path string handed to the API client.
endpoints = {
    # Write camera settings (used with a PUT request).
    'save': lambda x: 'camera/{}'.format(x),
    # Read camera settings (used with a GET request).
    'data': lambda x: 'camera/{}'.format(x),
    # Fresh snapshot; the width parameter is appended only when truthy.
    'snapshot': lambda c, w: 'snapshot/camera/{}?force=true{}'.format(
        c, '&width={}'.format(w) if w else ''),
    # Recording download for camera ``x`` between ``s`` and ``e``.
    'recording_span': lambda x, s, e: 'video/camera?' \
        'startTime={}&endTime={}&cameras[]={}'.format(s, e, x),
}

# Supported camera models, checked during UnifiVideoCamera initialization.
# Some model specifications include list of supported features. These are
# not currently checked against and are included only to account for
# some potential future use.
# The structure is constructed from bits gleaned from the frontend JS
# served by UniFi Video.
models = {
    'UVC': {},

    'UVC G3': {
        'features': [

    'UVC G3 Dome': {
        'features': [

    'UVC Dome': {},

    'UVC Pro': {
        'pro': True,
        'features': [

    'UVC G3 Pro': {
        'pro': True,
        'features': [

    'UVC G3 Flex': {
        'features': [

    'UVC Micro': {
        'features': [

    'UVC G3 Micro': {
        'features': [

    'Vision Pro': {
        'features': [

    'airCam': {},

    'airCam Dome': {},

    'airCam Mini': {},

    'UVC G4 Bullet': {
        'features': [

    'UVC G4 Pro': {
        'features': [

# Image settings supported by every camera model, as
# ``[setting name, minimum value, maximum value]`` triples. A getter/setter
# method is generated on UnifiVideoCamera for each entry.
common_isp_actionables = [
    ['brightness', 0, 100],
    ['contrast', 0, 100],
    ['hue', 0, 100],
    ['saturation', 0, 100],
    ['denoise', 0, 100],
    ['sharpness', 0, 100],
]

def determine_img_actionables(fw_platform, camera_model):
    """Return the names of the image settings a camera supports.

    Arguments:
        fw_platform (str or NoneType): Firmware platform of the camera
        camera_model (str or NoneType): Camera model name

    Returns:
        list: Setting names. Always contains the names listed in
        ``common_isp_actionables``; platform and model specific names
        are appended after them.
    """
    actionables = [actionable[0] for actionable in common_isp_actionables]

    # The two setting families are mutually exclusive: GEN1 firmware gets
    # gamma/aeModeGen1, every other platform gets wdr/aeMode.
    if fw_platform == 'GEN1':
        actionables.extend(['gamma', 'aeModeGen1'])
    else:
        actionables.extend(['wdr', 'aeMode'])

    if camera_model in ('UVC Pro', 'UVC G3 Pro'):
        actionables.extend(['irLedModePro', 'zoom', 'focus'])

    return actionables

def isp_actionable(floor=0, ceiling=100, name=None):
    """Build a decorator that guards and clamps an image setting method.

    The decorated method is called as ``method(camera)`` to read the
    setting and as ``method(camera, val)`` to change it.

    Arguments:
        floor: Smallest accepted value; smaller inputs are raised to it
        ceiling: Largest accepted value; larger inputs are lowered to it
        name (str, optional): Setting name to look up in the camera's
            ``_isp_actionables``. Defaults to the decorated function's
            ``__name__``.

    Returns:
        A decorator. The wrapper it produces raises
        :class:`CameraModelError` when the setting is not listed in
        ``camera._isp_actionables``.
    """
    def decfn(fn):
        # wraps() keeps the decorated function's __name__ and __doc__ so the
        # generated camera methods remain introspectable and documented.
        @wraps(fn)
        def wrapper(camera, val=None):
            fn_name = name or fn.__name__
            if fn_name not in camera._isp_actionables:
                raise CameraModelError('This camera model ({}) has no ' \
                    'support for {} control'.format(camera.model, fn_name))
            # No value given: act as a getter.
            if val is None:
                return fn(camera)
            # Clamp out-of-range input instead of rejecting it.
            if val > ceiling:
                val = ceiling
            elif val < floor:
                val = floor
            return fn(camera, val)
        return wrapper
    return decfn

def add_actionable(actionable):
    """Attach a getter/setter method for one image setting to
    :class:`UnifiVideoCamera`.

    Arguments:
        actionable (list): ``[name, floor, ceiling]`` triple. ``name`` is
            both the key in the camera's ``ispSettings`` and the name of
            the generated method; ``floor`` and ``ceiling`` bound the
            values the method accepts.
    """
    name, floor, ceiling = actionable

    def fn(self, value=None):
        if value is None:
            return self._data['ispSettings'].get(name, -1)
        self._data['ispSettings'][name] = value
        # Push the change to the server and reload the camera data.
        self.update(True)
        # update() replaces self._data, so read the setting back from the
        # fresh data to learn whether the server accepted the new value.
        return self._data['ispSettings'].get(name) == value

    fn.__name__ = str(name)
    fn.__doc__ = """Control image {name}

    Arguments:
        value (int or NoneType): New {name} value
            (min: ``{floor}``, max: ``{ceiling}``)

    Returns:
        bool or int: ``True`` or ``False``, depending on whether new
        value was successfully registered. Current {name} value when
        called without input value.

    """.format(name=name, floor=floor, ceiling=ceiling)

    setattr(UnifiVideoCamera, name, isp_actionable(floor, ceiling)(fn))

class CameraModelError(ValueError):
    """Unsupported camera model"""

    def __init__(self, message=None):
        """Create the error.

        Arguments:
            message (str, optional): Error message. A generic
                "Unsupported camera model" message is used when omitted
                or empty.
        """
        if not message:
            message = 'Unsupported camera model'
        super(CameraModelError, self).__init__(message)
class UnifiVideoCamera(UnifiVideoSingle):
    """Represents a single camera connected to a UniFi Video server
    (:class:`~unifi_video.api.UnifiVideoAPI`).

    Attributes:
        name (str or NoneType): Camera name
        model (str or NoneType): Camera model
        platform (str or NoneType): Firmware platform
        overlay_text (str): Custom text overlaid over the image
        mac_addr (str): Camera MAC address
        utc_offset: UTC offset as returned by
            :func:`~unifi_video.utils.parse_gmt_offset` for the camera's
            timezone setting, or ``None`` if it could not be parsed
        utc_h_offset (number or NoneType): UTC offset in hours
            (``utc_offset / 3600``)
        state (str): Camera state
        managed (bool): Whether camera is managed by the UniFi Video
            instance
        provisioned (bool): Whether camera is provisioned
        managed_by_others (bool): Whether camera is managed by some other
            UniFi Video instance
        disconnect_reason (str): Reason for most recent disconnect
        connected (bool): Whether camera is connected (ie, not
            disconnected or in process of rebooting or being upgraded)
        last_recording_id (str): MongoDB ObjectID of latest recording
        last_recording_start_time (int): Unix timestamp (in ms): start
            time of latest recording
        last_seen (int): Unix timestamp (in ms). Meaning depends on the
            value of :attr:`UnifiVideoCamera.state`:

            - *CONNECTED*: timestamp for when the camera came online
            - *DISCONNECTED*: timestamp for when the camera went offline
        last_seen_ndt (datetime or NoneType):
            :attr:`UnifiVideoCamera.last_seen` as naive
            :class:`datetime` object
        _id (str): Camera ID (MongoDB ObjectID as hex string)
        _data (dict): Complete camera JSON from UniFi Video server
        _isp_actionables (list): List of supported image settings

    Warning:
        Attributes having to do with camera state reflect the state as it
        was during object instantiation.

    Warning:
        :attr:`UnifiVideoCamera.last_seen` changes were observed on UniFi
        Video v3.10.13. No attempt has been made to verify
        :attr:`UnifiVideoCamera.last_seen` acts the same way across all
        supported UniFi Video versions.
    """

    def _load_data(self, data):
        """Populate the instance attributes from a camera JSON dict.

        Arguments:
            data (dict): Camera JSON as provided by the UniFi Video server

        Raises:
            CameraModelError: If the model is missing or is not listed in
                ``models``.
        """
        self.model = data.get('model', None)

        if not self.model or self.model not in models:
            raise CameraModelError

        self._data = data
        self._id = data['_id']
        self.name = data.get('name', None)
        self.platform = data.get('platform', None)
        self.overlay_text = data.get('osdSettings', {}).get('tag', None)
        self.mac_addr = utils.format_mac_addr(data.get('mac', 'ffffffffffff'))
        self._isp_actionables = determine_img_actionables(self.platform,
            self.model)
        self.state = data.get('state', '')
        self.managed = data.get('managed', None)
        self.provisioned = data.get('provisioned', None)
        self.managed_by_others = data.get('managedByOthers', None)
        # ``or`` also maps an explicit null from the server to the default.
        self.disconnect_reason = data.get('disconnectReason') or ''
        self.connected = self.state == 'CONNECTED'
        self.last_recording_id = data.get('lastRecordingId', '') or ''
        self.last_recording_start_time = \
            data.get('lastRecordingStartTime', 0) or 0
        self.last_seen = data.get('lastSeen', 0)
        # last_seen is in milliseconds; fromtimestamp() expects seconds.
        self.last_seen_ndt = datetime.fromtimestamp(self.last_seen / 1000) \
            if self.last_seen else None

        try:
            self.utc_offset = utils.parse_gmt_offset(
                (data.get('deviceSettings') or {}).get('timezone', ''))
            self.utc_h_offset = self.utc_offset / 3600
        except (TypeError, ValueError):
            self.utc_offset = None
            self.utc_h_offset = None

    def _simple_isp_actionable(self, setting_name, value):
        """Get or set a single ``ispSettings`` value.

        Arguments:
            setting_name (str): Key in the camera's ``ispSettings``
            value: New value, or ``None`` to read the current one

        Returns:
            The current value (``-1`` if the key is absent) when ``value``
            is ``None``. Otherwise ``True`` or ``False``, depending on
            whether the data reloaded after saving holds the new value.
        """
        if value is None:
            return self._data['ispSettings'].get(setting_name, -1)
        self._data['ispSettings'][setting_name] = value
        self.update(True)
        # update() replaces self._data; verify against the reloaded data
        # rather than the dict that was just modified locally.
        reloaded = self._data.get('ispSettings') or {}
        return reloaded.get(setting_name) == value

    def _toggable_osd_actionable(self, setting_name, enabled, ints=False):
        """Get or set an on/off ``osdSettings`` value.

        Arguments:
            setting_name (str): Key in the camera's ``osdSettings``
            enabled (bool or NoneType): New state, or ``None`` to read the
                current one
            ints (bool): Store the state as ``int`` instead of as given

        Returns:
            bool: Current state when ``enabled`` is ``None``. Otherwise
            whether the data reloaded after saving matches ``enabled``.
        """
        if enabled is None:
            return bool(self._data['osdSettings'][setting_name])
        self._data['osdSettings'][setting_name] = \
            int(enabled) if ints else enabled
        self.update(True)
        # Verify against the reloaded data (update() replaces self._data).
        reloaded = self._data.get('osdSettings') or {}
        return reloaded.get(setting_name) == enabled

    def update(self, save=False):
        """Update settings from remote UniFi Video server (``self._api``).
        Call with ``True`` to write local settings to remote before updating.

        :param bool save: Whether to push settings to the camera
        """
        if save:
            response = self._api.put(endpoints['save'](self._id), self._data)
        else:
            response = self._api.get(endpoints['data'](self._id))
        self._load_data(self._extract_data(response))

    def snapshot(self, filename=None, width=0):
        """Take and download snapshot.

        :param filename: Filename to save the snapshot to. Defaults to
            ``snapshot-<camera id>-<unix timestamp>.jpg``.
        :type filename: str or None
        :param width: Image width in pixels
        :type width: int
        :return: Whatever ``self._api.get`` returns for the request
        """
        if not filename:
            filename = 'snapshot-{}-{}.jpg'.format(
                self._id, int(time.time()))
        return self._api.get(
            endpoints['snapshot'](self._id, int(width)), filename)

    def recording_between(self, start_time, end_time, filename=None):
        '''Download a recording of the camera's footage from an arbitrary
        timespan, between ``start_time`` and ``end_time``.

        Arguments:
            start_time (datetime or str or int): Recording start time.
                (See :meth:`~unifi_video.utils.dt_resolvable_to_ms`.)
            end_time (datetime or str or int): Recording end time.
                (See :meth:`~unifi_video.utils.dt_resolvable_to_ms`.)
            filename (str, optional): Filename to save the recording to
                (a ZIP file). Will use whatever the server provides if
                left out.

        Tip:
            Widen the time span by a few seconds at each end. UniFi Video
            often falls a little short of the exact start and end times.
        '''
        start_time = utils.dt_resolvable_to_ms(
            start_time, utc_offset=self._api.utc_offset, resolution=1000)
        end_time = utils.dt_resolvable_to_ms(
            end_time, utc_offset=self._api.utc_offset, resolution=1000)

        return self._api.get(
            endpoints['recording_span'](self._id, start_time, end_time),
            filename if filename else '')

    @isp_actionable(0, 3, name='wdr')
    def dynamic_range(self, wdr=None):
        """Control image WDR (dynamic range). Input should be either
        `None` or an `int` between ``0`` and ``3``.

        :param wdr: New WDR value
        :type wdr: int or None
        :return: If value provided: `True` or `False`, depending on
            whether new value was registered. If no value provided:
            current WDR value.
        :rtype: `bool` or `int`
        """
        return self._simple_isp_actionable('wdr', wdr)

    def ir_leds(self, led_state=None):
        """Control IR leds.

        :param led_state: New led state (``auto``, ``on``, or ``off``).
        :type led_state: str or None
        :return: `True` or `False` depending on successful value change.
            Current led state if called with no args (`None` if the
            stored mode/level combination is not a recognised state).
        :rtype: `bool` or `str`
        :raises ValueError: If ``led_state`` is not one of the known states
        """
        isp = self._data['ispSettings']

        if not led_state:
            mode = isp['irLedMode']
            level = isp['irLedLevel']
            if mode == 'auto':
                return mode
            if mode == 'manual' and level == 0:
                return 'off'
            if mode == 'manual' and level == 215:
                return 'on'
            return None

        # Resolve the target before touching any state so that an invalid
        # argument leaves the local settings unmodified.
        if led_state == 'auto':
            mode, level = 'auto', 215
        elif led_state == 'on':
            mode, level = 'manual', 215
        elif led_state == 'off':
            mode, level = 'manual', 0
        else:
            raise ValueError('Unknown led_state: {}'.format(led_state))

        isp['irLedMode'] = mode
        isp['irLedLevel'] = level
        self.update(True)

        # Verify against the reloaded data (update() replaces self._data).
        reloaded = self._data.get('ispSettings') or {}
        return reloaded.get('irLedMode') == mode and \
            reloaded.get('irLedLevel') == level

    def onscreen_text(self, text=None):
        """Set or get on-screen text.

        :param text: New on-screen text. Leading and trailing whitespace
            is stripped.
        :type text: str or None
        :return: `True` for successful value change, `False` for failed
            attempt, current `str` value if called without ``text``.
        :rtype: `bool` or `str`
        """
        if not text:
            return self.overlay_text

        # Compare what was actually sent (the stripped text), not the raw
        # argument.
        text = text.strip()
        osd = self._data['osdSettings']
        osd['overrideMessage'] = True
        osd['tag'] = text
        self.update(True)

        # overlay_text is refreshed from the reloaded data by update().
        return self.overlay_text == text

    def onscreen_timestamp(self, enabled=None):
        """Set or get on-screen timestamp state.

        :param enabled: New state
        :type enabled: bool or None
        :return: `True` for successful state change, `False` for failed
            attempt. Either of the two for current state (when called
            without the ``enabled`` arg)
        :rtype: `bool`
        """
        return self._toggable_osd_actionable('enableDate', enabled, True)

    def onscreen_watermark(self, enabled=None):
        """Enable or disable on-screen watermark. Call without args to get
        current setting.

        :param enabled: Enable or disable
        :type enabled: bool or None
        :return: `True` for successful change, `False` for failed attempt.
            One or the other for calls without args.
        :rtype: `bool`
        """
        return self._toggable_osd_actionable('enableLogo', enabled, True)

    def set_recording_settings(self, recording_mode=None,
            pre_padding_secs=None, post_padding_secs=None):
        """Set recording mode and pre/post padding.

        Possible recording modes:
            - ``disable``: don't record
            - ``fulltime``: record at all times
            - ``motion``: record when motion detected

        :param str recording_mode: See above
        :param int pre_padding_secs: Number of seconds to include
            pre-motion footage of
        :param int post_padding_secs: Number of seconds to include
            post-motion footage of
        :return: Whether the settings reloaded after saving match the
            requested ones
        :rtype: `bool`
        :raises ValueError: If ``recording_mode`` is not a known mode
        """
        rec_settings = self._data['recordingSettings']

        if recording_mode:
            if recording_mode == 'disable':
                rec_settings['fullTimeRecordEnabled'] = False
                rec_settings['motionRecordEnabled'] = False
            elif recording_mode == 'fulltime':
                rec_settings['fullTimeRecordEnabled'] = True
                rec_settings['motionRecordEnabled'] = True
            elif recording_mode == 'motion':
                rec_settings['fullTimeRecordEnabled'] = False
                rec_settings['motionRecordEnabled'] = True
            else:
                raise ValueError('Unknow recording mode "{}"'.format(
                    recording_mode))

        if pre_padding_secs is not None:
            rec_settings['prePaddingSecs'] = pre_padding_secs

        if post_padding_secs is not None:
            rec_settings['postPaddingSecs'] = post_padding_secs

        # Snapshot the requested state; update() replaces self._data.
        verify = deepcopy(rec_settings)
        self.update(True)
        return verify == self._data['recordingSettings']

    def get_recording_settings(self, all=False):
        """Get camera's recording settings

        Arguments:
            all (bool): Whether to show all available settings. The
                default is to only show the settings that are controllable
                by calling :meth:`set_recording_settings`.

        Returns:
            dict: Recording settings. Controllable settings missing from
            the camera data are reported as ``None``.
        """
        all_settings = self._data.get('recordingSettings', {})
        if all:
            return all_settings

        controllable = [
            'motionRecordEnabled',
            'fullTimeRecordEnabled',
            'prePaddingSecs',
            'postPaddingSecs',
        ]
        return {k: all_settings.get(k, None) for k in controllable}

    def __str__(self):
        """Return the class name followed by name, model and platform."""
        _filter = ['name', 'model', 'platform']
        return '{}: {}'.format(
            type(self).__name__,
            {k: v for k, v in self.__dict__.items() if k in _filter})
# Define methods for controlling the isp actionables that are common to all
# camera models. Other actionables -- those not common to all models --
# are controlled with methods defined in UnifiVideoCamera body.
#
# Note: "isp actionables" is what the Ubiquiti provided frontend JS calls
# contrast, brightness, saturation, etc.
for actionable in common_isp_actionables:
    add_actionable(actionable)