Source code for unifi_video.api

from __future__ import print_function, unicode_literals

    from urllib.parse import urljoin, urlparse, urlunparse
except ImportError:
    from urlparse import urljoin, urlparse, urlunparse

    from urllib.request import urlopen, Request
    from urllib.error import HTTPError
except ImportError:
    from urllib2 import urlopen, Request, HTTPError

import json

from .camera import UnifiVideoCamera
from .recording import UnifiVideoRecording
from .collections import UnifiVideoCollection
from .utils import parse_gmt_offset, dt_resolvable_to_ms

from distutils.version import LooseVersion

except NameError:
    unicode = str

endpoints = {
    'login': 'login',
    'cameras': 'camera',
    'recordings': lambda x: 'recording?idsOnly=false&' \
        'sortBy=startTime&sort=desc&limit={}'.format(x) \
        if x is not None else 'recording',
    'recording': lambda x: 'recording/{}'.format(
        x._id if isinstance(x, UnifiVideoRecording) else x),
    'bootstrap': 'bootstrap',
    'delete_all': 'recording?deleteRecordings&confirmed=true',

class UnifiVideoVersionError(ValueError):
    """Unsupported UniFi Video version"""

    def __init__(self, message=None):
        if not message:
            message = 'Unsupported UniFi Video version'
        super(UnifiVideoVersionError, self).__init__(message)

class UnifiVideoHTTPError(ValueError):
    """HTTP error with message from UniFi Video server"""

    def __init__(self, code=None, message=None, caused_by=None):
        msg = 'HTTP {} from UniFi Video.'.format(code)
        if message:
            msg += ' Error: {}.'.format(message)
        if caused_by:
            msg += ' Caused by: {}'.format(caused_by)
        super(UnifiVideoHTTPError, self).__init__(msg)

[docs]class UnifiVideoAPI(object): """Encapsulates a single UniFi Video server. Arguments: api_key (str): UniFi Video API key username (str): UniFi Video account username password (str): UniFi Video account pasword addr (str): UniFi Video host address port (int): UniFi Video host port schema (str): Protocol schema to use. Valid values: `http`, `https` verify_cert (bool): Whether to verify UniFi Video's TLS cert when connecting over HTTPS check_ufv_version (bool): Set to ``False`` to use with untested UniFi Video versions utc_offset_sec (int or NoneType): UniFi Video server's UTC offset in seconds. Note: At minimum, you have to - provide either an API key or a username:password pair - set the host address and port to wherever your UniFi Video is listening at Attributes: _data (dict): UniFi Video "bootstrap" JSON as a dict base_url (str): API base URL api_key (str or NoneType): API key (from input params) username (str or NoneType): Username (from input params) password (str or NoneType): Password (from input params) name (str or NoneType): UniFi Video server name version (str or NoneType): UniFi Video version jsession_av (str or NoneType): UniFi Video session ID cameras (:class:`UnifiVideoCollection`): Collection of :class:`` objects. Includes all cameras that the associated UniFi Video instance is aware of active_cameras (:class:`UnifiVideoCollection`): Like :attr:`UnifiVideoAPI.cameras` but only includes cameras that are both connected and managed by the UniFi Video instance. managed_cameras (:class:`UnifiVideoCollection`): Includes all cameras that are managed by the UniFi Video instance, whether they're online or not. recordings (:class:`UnifiVideoCollection`): Collection of :class:`~unifi_video.recording.UnifiVideoRecording` objects """ _supported_ufv_versions = [] _supported_ufv_version_ranges = [ ['3.9.12', '3.10.13'], ] def __init__(self, api_key=None, username=None, password=None, addr='localhost', port=7080, schema='http', verify_cert=True, check_ufv_version=True, utc_offset_sec=None): if not verify_cert and schema == 'https': import ssl self._ssl_context = ssl._create_unverified_context() if not api_key and not (username and password): raise ValueError('To init {}, provide either API key ' \ 'or username password pair'.format(type(self).__name__)) self.api_key = api_key self.login_attempts = 0 self.jsession_av = None self.username = username self.password = password self.utc_offset = utc_offset_sec self.base_url = '{}://{}:{}/api/2.0/'.format(schema, addr, port) self._version_stickler = check_ufv_version self._load_data(self.get(endpoints['bootstrap'])) self.cameras = UnifiVideoCollection(UnifiVideoCamera) self.active_cameras = UnifiVideoCollection(UnifiVideoCamera) self.managed_cameras = UnifiVideoCollection(UnifiVideoCamera) self.recordings = UnifiVideoCollection(UnifiVideoRecording) self.refresh_cameras() self.refresh_recordings() # /bootstrap: data[0].settings.systemSettings.gmtOffset first appeared # in version 3.10.2. For earlier versions, try to determine the offset # by comparing what is reported by attached cameras. If all cameras # report the same offset, use that offset for the server as well if self.utc_offset is None: camera_utc_offsets = set([ c.utc_offset for c in self.active_cameras if c.utc_offset is not None]) if len(camera_utc_offsets) == 1: self.utc_offset = camera_utc_offsets.pop() def _load_data(self, data): if not isinstance(data, dict): raise ValueError('Server responded with unknown bootstrap data') self._data = data.get('data', [{}]) = self._data[0].get('nvrName', None) self.version = self._data[0].get('systemInfo', {}).get('version', None) self._is_supported = False if self.utc_offset is None: system_settings = \ self._data[0].get('settings', {}).get('systemSettings', {}) \ or self._data[0].get('servers', [{}])[0]\ .get('systemSettings', {}) try: self.utc_offset = parse_gmt_offset( system_settings.get('gmtOffset', '')) except (TypeError, ValueError): pass if self.version in UnifiVideoAPI._supported_ufv_versions: self._is_supported = True else: v_actual = LooseVersion(self.version) for curr_version in UnifiVideoAPI._supported_ufv_version_ranges: v_low = LooseVersion(curr_version[0]) v_high = LooseVersion(curr_version[1]) try: if v_actual >= v_low and v_actual <= v_high: self._is_supported = True break except TypeError as e: break if self._version_stickler and not self._is_supported: raise UnifiVideoVersionError() def _ensure_headers(self, req): req.add_header('Content-Type', 'application/json') if self.jsession_av: req.add_header('Cookie', 'JSESSIONID_AV={}'\ .format(self.jsession_av)) def _build_req(self, url, data=None, method=None): url = urljoin(self.base_url, url) if self.api_key: _s, _nloc, _path, _params, _q, _f = urlparse(url) _q = '{}&apiKey={}'.format(_q, self.api_key) if len(_q) \ else 'apiKey={}'.format(self.api_key) url = urlunparse((_s, _nloc, _path, _params, _q, _f)) req = Request(url, bytes(json.dumps(data).encode('utf8'))) \ if data else Request(url) self._ensure_headers(req) if method: req.get_method = lambda: method return req def _parse_cookies(self, res, return_existing=False): if 'Set-Cookie' not in res.headers: return False cookies = res.headers['Set-Cookie'].split(',') for cookie in cookies: for part in cookie.split(';'): if 'JSESSIONID_AV' in part: self.jsession_av = part\ .replace('JSESSIONID_AV=', '').strip() return True def _urlopen(self, req): if hasattr(self, '_ssl_context'): return urlopen(req, context=self._ssl_context) else: return urlopen(req) def _get_response_content(self, res, raw=False): try: if res.headers['Content-Type'] == 'application/json': return json.loads('utf8')) raise KeyError except KeyError: upstream_filename = None if 'Content-Disposition' in res.headers: for part in res.headers['Content-Disposition'].split(';'): part = part.strip() if part.startswith('filename='): upstream_filename = part.split('filename=').pop() if isinstance(raw, str) or isinstance(raw, unicode): filename = raw if len(raw) else upstream_filename with open(filename, 'wb') as f: while True: chunk = if not chunk: break f.write(chunk) f.truncate() return True elif isinstance(raw, bool): return else: try: return'utf8') except UnicodeDecodeError: return def _handle_http_401(self, url, raw): if self.api_key: raise ValueError('Invalid API key') elif self.login(): return self.get(url, raw)
[docs] def get(self, url, raw=False, url_params={}): """Send GET request. Arguments: url (str): API endpoint (relative to the API base URL) raw (str or bool, optional): Set `str` filename if you want to save the response to a file. Set to ``True`` if you want the to return raw response data. url_params (dict, optional): URL parameters as a dict. Gets turned into query string and appended to ``url`` Returns: Response JSON (as `dict`) when `Content-Type` response header is `application/json` ``True`` if ``raw`` is `str` (filename) and a file was successfully written to Raw response body (as `bytes`) if the `raw` input param is of type `bool` ``False`` on HTTP 4xx - 5xx :rtype: NoneType, bool, dict, bytes """ if url_params: url = '{}?{}'.format( url, UnifiVideoAPI.params_to_query_str(url_params)) req = self._build_req(url) try: res = self._urlopen(req) self._parse_cookies(res) return self._get_response_content(res, raw) except HTTPError as err: if err.code == 401 and self.login_attempts == 0: return self._handle_http_401(url, raw) elif err.code == 400 and hasattr(err, 'headers') \ and 'application/json' in err.headers.get( 'content-type', ''): err_body = json.loads('utf8')) if isinstance(err_body, dict) and err_body.get('rc') == 'error': raise UnifiVideoHTTPError( code=err.code, message=err_body.get('message'), caused_by=err_body.get('causedBy')) return False
[docs] def post(self, url, data=None, raw=False, _method=None): """Send POST request. Args: url (str): API endpoint (relative to the API base URL) data (dict or NoneType): Request body raw (str or bool): Filename (`str`) if you want the response saved to a file, ``True`` (`bool`) if you want the response body as return value Returns: See :func:`~unifi_video.api.get`. """ if data: req = self._build_req(url, data, _method) else: req = self._build_req(url, method=_method) try: res = self._urlopen(req) self._parse_cookies(res) return self._get_response_content(res, raw) except HTTPError as err: if err.code == 401 and url != 'login' and self.login_attempts == 0: return self._handle_http_401(url, raw) return False
[docs] def put(self, url, data=None, raw=False): """Send PUT request. Thin wrapper around :func:``; the same parameter/return semantics apply here. """ return, data, raw, 'PUT')
[docs] def delete(self, url, data=None, raw=False): """Send DELETE request. Thin wrapper around :func:``; the same parameter/return semantics apply here. """ return, data, raw, 'DELETE')
def login(self): self.login_attempts = 1 res_data =['login'], { 'username': self.username, 'password': self.password}) if res_data: self.login_attempts = 0 return True else: return False
[docs] def refresh_cameras(self): '''GET cameras from the server and update camera collections Touches :attr:`UnifiVideoAPI.cameras`, :attr:`UnifiVideoAPI.active_cameras`, and :attr:`UnifiVideoAPI.managed_cameras` ''' collections = { 'cameras': { 'new_ids': set(), 'accepts': lambda _: True, }, 'active_cameras': { 'new_ids': set(), 'accepts': lambda cam: cam.managed and cam.connected, }, 'managed_cameras': { 'new_ids': set(), 'accepts': lambda cam: cam.managed, }, } # Suspicion: the type check provides zero value and exists simply due # to some momentary lapse in coherence at the time it was originally # written. Leaving it be, on the off chance that there was a good # reason for it. Unable to investigate atm. cameras = self.get(endpoints['cameras']) if not isinstance(cameras, dict): return for camera in ( UnifiVideoCamera(self, c) for c in cameras.get('data', [])): for cname, collection in collections.items(): if collection['accepts'](camera): getattr(self, cname).add(camera) collection['new_ids'].add(camera._id) for cname in collections.keys(): for camera_id in list(getattr(self, cname).keys()): if camera_id not in collections[cname]['new_ids']: del getattr(self, cname)[camera_id]
[docs] def refresh_recordings(self, limit=300): """GET recordings from the server and update ``self.recordings``. :param int limit: Limit the number of recording items to fetch (``0`` for no limit). """ for recording in self.get_recordings( rec_type='all', order='desc', limit=limit): self.recordings.add(recording)
[docs] def get_camera(self, search_term, managed_only=False): '''Get camera by its ObjectID, name or overlay text Arguments: search_term (str): String to test against :attr:``, :attr:`~unifi_video.UnifiVideoCamera._id`, and :attr:`~unifi_video.UnifiVideoCamera.overlay_text`. managed_only (bool): Whether to search unmanaged cameras as well. Returns: :class:`` or `NoneType` depending on whether or not `search_term` was matched to a camera. Tip: Do not attempt to find an unmanaged camera by it's overlay text; UniFi Video provides limited detail for unmanaged cameras. ''' search_term = search_term.lower() for camera in self.cameras: is_match = camera._id == search_term or \ == search_term or \ camera.overlay_text.lower() == search_term if is_match and (not managed_only or camera.managed): return camera
[docs] def get_recordings(self, rec_type='all', camera=None, start_time=None, end_time=None, limit=0, order='desc', req_each=False): '''Fetch recording listing Args: rec_type (str, optional): Type of recordings to fetch: *all*, *motion* or *fulltime* camera (:class:`` or str or \ list of :class:`` or \ list of str, optional): Camera or cameras whose recordings to fetch start_time (datetime or str or int, optional): Recording start time. (See :meth:`~unifi_video.utils.dt_resolvable_to_ms`.) end_time (datetime or str or int, optional): Recording end time. (See :meth:`~unifi_video.utils.dt_resolvable_to_ms`.) order (str, optional): Sort order: *desc* or *asc*. Recordings are sorted by their start time. limit (int, optional): Limit the number of recordings req_each (bool, optional): Whether to save bandwidth on the initial request and to fetch each recordings' details individually or to ask for each recordings' details to be included in the one and only initial request. ``True`` can potentially save you in total bytes transferred but will cost you in the number of HTTP requests made. Returns: Iterable[:class:`~unifi_video.recording.UnifiVideoRecording`] Note: You can use naive :class:`~datetime.datetime` objects or strings when you want to mark time in the UniFi Server's local time. Otherwise, use Unix timestamps (*int*, **in seconds**) or timezone aware :class:`~datetime.datetime` objects. ''' rec_types = { 'motion': ('motionRecording',), 'fulltime': ('fullTimeRecording',), 'all': ('motionRecording', 'fullTimeRecording'), } url_params = { 'sortBy': 'startTime', 'order': order, 'idsOnly': req_each, 'limit': limit if limit else None, 'cause': rec_types[rec_type], 'startTime': dt_resolvable_to_ms( start_time, utc_offset=self.utc_offset, resolution=1000) if start_time else None, 'endTime': dt_resolvable_to_ms( end_time, utc_offset=self.utc_offset, resolution=1000) if end_time else None, 'cameras': camera if isinstance(camera, (list, tuple)) else [camera], } if req_each: return ( UnifiVideoRecording( self, self.get(endpoints['recording'](rec_id))['data'][0]) for rec_id in self.get( endpoints['recordings'](None), url_params=url_params)['data'] ) else: return ( UnifiVideoRecording(self, rec) for rec in self.get( endpoints['recordings'](None), url_params=url_params)['data'] )
[docs] def delete_all_recordings(self): """ Delete all existing recordings """ return self.delete(endpoints['delete_all'])
def __str__(self): return '{}: {}'.format(type(self).__name__, { 'name':, 'version': self.version, 'supported_version': self._is_supported })
[docs] @staticmethod def params_to_query_str(params_dict): '''Build query string from dict of URL parameters Arguments: params_dict (dict): URL parameters Returns: str: Query string ''' str_conversions = { str: lambda x: x, unicode: lambda x: x, int: lambda x: '{}'.format(x), float: lambda x: '{}'.format(x), bool: lambda x: 'true' if x else 'false', UnifiVideoCamera: lambda x: x._id, } params = [] for k, v in ((k, v) for k, v in params_dict.items() if v is not None): if isinstance(v, (list, tuple)): for lv in (x for x in v if x is not None): params.append('{}[]={}'.format( k, str_conversions[type(lv)](lv))) else: params.append('{}={}'.format(k, str_conversions[type(v)](v))) return '&'.join(params)
__all__ = ['UnifiVideoAPI']