| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471 |
- # gopro.py/Open GoPro, Version 2.0 (C) Copyright 2021 GoPro, Inc. (http://gopro.com/OpenGoPro).
- # This copyright was auto-generated on Wed, Sep 1, 2021 5:05:47 PM
- """Implements top level interface to GoPro module."""
- from __future__ import annotations
- import asyncio
- import enum
- import json
- import logging
- import threading
- import traceback
- from abc import abstractmethod
- from typing import Any, Awaitable, Callable, Final, Generic, TypeVar
- import requests
- import wrapt
- from open_gopro.api import (
- BleCommands,
- BleSettings,
- BleStatuses,
- HttpCommands,
- HttpSettings,
- WiredApi,
- WirelessApi,
- )
- from open_gopro.domain.communicator_interface import (
- GoProHttp,
- HttpMessage,
- Message,
- MessageRules,
- )
- from open_gopro.domain.exceptions import GoProNotOpened, ResponseTimeout
- from open_gopro.models import GoProResp
- from open_gopro.models.constants import ErrorCode
- from open_gopro.models.types import JsonDict
- from open_gopro.parsers.response import RequestsHttpRespBuilderDirector
- from open_gopro.util import pretty_print
- from open_gopro.util.logger import Logger
- logger = logging.getLogger(__name__)
- GoPro = TypeVar("GoPro", bound="GoProBase")
- ApiType = TypeVar("ApiType", WiredApi, WirelessApi)
- MessageMethodType = Callable[[Any, bool], Awaitable[GoProResp]]
- class GoProMessageInterface(enum.Enum):
- """Enum to identify wireless interface"""
- HTTP = enum.auto()
- BLE = enum.auto()
- @wrapt.decorator
- def catch_thread_exception(wrapped: Callable, instance: GoProBase, args: Any, kwargs: Any) -> Callable | None:
- """Catch any exceptions from this method and pass them to the exception handler identifier by thread name
- Args:
- wrapped (Callable): method that this is wrapping
- instance (GoProBase): instance owner of method
- args (Any): positional args
- kwargs (Any): keyword args
- Returns:
- Callable | None: forwarded return of wrapped method or None if exception occurs
- """
- try:
- return wrapped(*args, **kwargs)
- except Exception as e: # pylint: disable=broad-exception-caught
- instance._handle_exception(threading.current_thread().name, {"exception": e})
- return None
- def ensure_opened(interface: tuple[GoProMessageInterface]) -> Callable:
- """Raise exception if relevant interface is not currently opened
- Args:
- interface (tuple[GoProMessageInterface]): wireless interface to verify
- Returns:
- Callable: Direct pass-through of callable after verification
- """
- @wrapt.decorator
- def wrapper(wrapped: Callable, instance: GoProBase, args: Any, kwargs: Any) -> Callable:
- if GoProMessageInterface.BLE in interface and not instance.is_ble_connected:
- raise GoProNotOpened("BLE not connected")
- if GoProMessageInterface.HTTP in interface and not instance.is_http_connected:
- raise GoProNotOpened("HTTP interface not connected")
- return wrapped(*args, **kwargs)
- return wrapper
- @wrapt.decorator
- async def enforce_message_rules(wrapped: MessageMethodType, instance: GoProBase, args: Any, kwargs: Any) -> GoProResp:
- """Decorator proxy to call the GoProBase's _enforce_message_rules method.
- Args:
- wrapped (MessageMethodType): Operation to enforce
- instance (GoProBase): GoProBase instance to use
- args (Any): positional arguments to wrapped
- kwargs (Any): keyword arguments to wrapped
- Returns:
- GoProResp: common response object
- """
- return await instance._enforce_message_rules(wrapped, *args, **kwargs)
- class GoProBase(GoProHttp, Generic[ApiType]):
- """The base class for communicating with all GoPro Clients"""
- HTTP_TIMEOUT: Final = 5
- HTTP_GET_RETRIES: Final = 5
- def __init__(self, **kwargs: Any) -> None:
- self._should_maintain_state = kwargs.get("maintain_state", True)
- self._exception_cb = kwargs.get("exception_cb", None)
- async def __aenter__(self: GoPro) -> GoPro:
- await self.open()
- return self
- async def __aexit__(self, *_: Any) -> None:
- await self.close()
- @abstractmethod
- async def open(self, timeout: int = 10, retries: int = 5) -> None:
- """Connect to the GoPro Client and prepare it for communication
- Args:
- timeout (int): time before considering connection a failure. Defaults to 10.
- retries (int): number of connection retries. Defaults to 5.
- """
- @abstractmethod
- async def close(self) -> None:
- """Gracefully close the GoPro Client connection"""
- @property
- @abstractmethod
- async def is_ready(self) -> bool:
- """Is gopro ready to receive commands
- Returns:
- bool: yes if ready, no otherwise
- """
- @property
- @abstractmethod
- def _requests_session(self) -> requests.Session:
- """The requests session used to communicate with the GoPro Client
- Returns:
- requests.Session: requests session
- """
- @property
- @abstractmethod
- def identifier(self) -> str:
- """Unique identifier for the connected GoPro Client
- Returns:
- str: identifier
- """
- @property
- def version(self) -> str:
- """The API version that the connected camera supports
- Only 2.0 is currently supported
- Returns:
- str: supported version
- """
- return self._api.version
- @property
- @abstractmethod
- def http_command(self) -> HttpCommands:
- """Used to access the Wifi commands
- Returns:
- HttpCommands: the commands
- """
- @property
- @abstractmethod
- def http_setting(self) -> HttpSettings:
- """Used to access the Wifi settings
- Returns:
- HttpSettings: the settings
- """
- @property
- @abstractmethod
- def ble_command(self) -> BleCommands:
- """Used to call the BLE commands
- Returns:
- BleCommands: the commands
- """
- @property
- @abstractmethod
- def ble_setting(self) -> BleSettings:
- """Used to access the BLE settings
- Returns:
- BleSettings: the settings
- """
- @property
- @abstractmethod
- def ble_status(self) -> BleStatuses:
- """Used to access the BLE statuses
- Returns:
- BleStatuses: the statuses
- """
- @property
- @abstractmethod
- def is_open(self) -> bool:
- """Is this client ready for communication?
- Returns:
- bool: True if yes, False if no
- """
- @property
- @abstractmethod
- def is_ble_connected(self) -> bool:
- """Are we connected via BLE to the GoPro device?
- Returns:
- bool: True if yes, False if no
- """
- @property
- @abstractmethod
- def is_http_connected(self) -> bool:
- """Are we connected via HTTP to the GoPro device?
- Returns:
- bool: True if yes, False if no
- """
- ##########################################################################################################
- # End Public API
- ##########################################################################################################
- @abstractmethod
- async def _enforce_message_rules(
- self, wrapped: Callable, message: Message, rules: MessageRules, **kwargs: Any
- ) -> GoProResp:
- """Rule Enforcer. Called by enforce_message_rules decorator.
- Args:
- wrapped (Callable): operation to enforce
- message (Message): message passed to operation
- rules (MessageRules): rules to enforce
- **kwargs (Any) : arguments passed to operation
- Returns:
- GoProResp: Operation response
- """
- def _handle_exception(self, source: Any, context: JsonDict) -> None:
- """Gather exceptions from module threads and send through callback if registered.
- Note that this function signature matches asyncio's exception callback requirement.
- Args:
- source (Any): Where did the exception come from?
- context (JsonDict): Access exception via context["exception"]
- """
- # context["message"] will always be there; but context["exception"] may not
- if exception := context.get("exception", False):
- logger.error(f"Received exception {exception} from {source}")
- logger.error(traceback.format_exc())
- if self._exception_cb:
- self._exception_cb(exception)
- else:
- logger.error(f"Caught unknown message: {context['message']} from {source}")
- class _InternalState(enum.IntFlag):
- """State used to manage whether the GoPro instance is ready or not."""
- READY = 0
- ENCODING = 1 << 0
- SYSTEM_BUSY = 1 << 1
- @property
- @abstractmethod
- def ip_address(self) -> str:
- """The IP address of the GoPro device
- Raises:
- GoProNotOpened: The GoPro IP address is not yet available
- Returns:
- str: IP address
- """
- @property
- @abstractmethod
- def _base_url(self) -> str:
- """Build the base endpoint for USB commands
- Returns:
- str: base endpoint with URL from serial number
- """
- @property
- @abstractmethod
- def _api(self) -> ApiType:
- """Unique identifier for the connected GoPro Client
- Returns:
- ApiType: identifier
- """
- @staticmethod
- def _ensure_opened(interface: tuple[GoProMessageInterface]) -> Callable:
- """Raise exception if relevant interface is not currently opened
- Args:
- interface (tuple[GoProMessageInterface]): wireless interface to verify
- Returns:
- Callable: Direct pass-through of callable after verification
- """
- return ensure_opened(interface)
- @staticmethod
- def _catch_thread_exception(*args: Any, **kwargs: Any) -> Callable | None:
- """Catch any exceptions from this method and pass them to the exception handler identifier by thread name
- Args:
- *args (Any): positional args
- **kwargs (Any): keyword args
- Returns:
- Callable | None: forwarded return of wrapped method or None if exception occurs
- """
- return catch_thread_exception(*args, **kwargs)
- def _build_http_request_args(self, message: HttpMessage) -> dict[str, Any]:
- """Helper method to build request kwargs from message
- Args:
- message (HttpMessage): message to build args from
- Returns:
- dict[str, Any]: built args
- """
- # Dynamically build get kwargs
- request_args: dict[str, Any] = {}
- if message._headers:
- request_args["headers"] = message._headers
- if message._certificate:
- request_args["verify"] = str(message._certificate)
- return request_args
- @enforce_message_rules
- async def _get_json(
- self,
- message: HttpMessage,
- *,
- timeout: int = HTTP_TIMEOUT,
- rules: MessageRules = MessageRules(),
- **kwargs: Any,
- ) -> GoProResp:
- url = self._base_url + message.build_url(**kwargs)
- logger.debug(f"Sending: {url}")
- logger.info(Logger.build_log_tx_str(pretty_print(message._as_dict(**kwargs))))
- for retry in range(1, GoProBase.HTTP_GET_RETRIES + 1):
- try:
- http_response = self._requests_session.get(
- url,
- timeout=timeout,
- **self._build_http_request_args(message),
- )
- logger.trace(f"received raw json: {json.dumps(http_response.json() if http_response.text else {}, indent=4)}") # type: ignore
- if not http_response.ok:
- logger.warning(f"Received non-success status {http_response.status_code}: {http_response.reason}")
- response = RequestsHttpRespBuilderDirector(http_response, message._parser)()
- break
- except requests.exceptions.ConnectionError as e:
- # This appears to only occur after initial connection after pairing
- logger.warning(repr(e))
- # Back off before retrying. TODO This appears to be needed on MacOS
- await asyncio.sleep(2)
- except Exception as e: # pylint: disable=broad-exception-caught
- logger.error(f"Unexpected error: {repr(e)}")
- logger.warning(f"Retrying #{retry} to send the command...")
- else:
- raise ResponseTimeout(GoProBase.HTTP_GET_RETRIES)
- logger.info(Logger.build_log_rx_str(pretty_print(response._as_dict())))
- return response
- @enforce_message_rules
- async def _get_stream(
- self,
- message: HttpMessage,
- *,
- timeout: int = HTTP_TIMEOUT,
- rules: MessageRules = MessageRules(),
- **kwargs: Any,
- ) -> GoProResp:
- url = self._base_url + message.build_url(path=kwargs["camera_file"])
- logger.debug(f"Sending: {url}")
- with self._requests_session.get(
- url,
- stream=True,
- timeout=timeout,
- **self._build_http_request_args(message),
- ) as request:
- request.raise_for_status()
- file = kwargs["local_file"]
- with open(file, "wb") as f:
- logger.debug(f"receiving stream to {file}...")
- for chunk in request.iter_content(chunk_size=8192):
- f.write(chunk)
- return GoProResp(protocol=GoProResp.Protocol.HTTP, status=ErrorCode.SUCCESS, data=file, identifier=url)
- @enforce_message_rules
- async def _put_json(
- self,
- message: HttpMessage,
- *,
- timeout: int = HTTP_TIMEOUT,
- rules: MessageRules = MessageRules(),
- **kwargs: Any,
- ) -> GoProResp:
- url = self._base_url + message.build_url(**kwargs)
- body = message.build_body(**kwargs)
- logger.debug(f"Sending: {url} with body: {json.dumps(body, indent=4)}")
- for retry in range(1, GoProBase.HTTP_GET_RETRIES + 1):
- try:
- http_response = self._requests_session.put(
- url,
- timeout=timeout,
- json=body,
- **self._build_http_request_args(message),
- )
- logger.trace(f"received raw json: {json.dumps(http_response.json() if http_response.text else {}, indent=4)}") # type: ignore
- if not http_response.ok:
- logger.warning(f"Received non-success status {http_response.status_code}: {http_response.reason}")
- response = RequestsHttpRespBuilderDirector(http_response, message._parser)()
- break
- except requests.exceptions.ConnectionError as e:
- # This appears to only occur after initial connection after pairing
- logger.warning(repr(e))
- # Back off before retrying. TODO This appears to be needed on MacOS
- await asyncio.sleep(2)
- except Exception as e: # pylint: disable=broad-exception-caught
- logger.error(f"Unexpected error: {repr(e)}")
- logger.warning(f"Retrying #{retry} to send the command...")
- else:
- raise ResponseTimeout(GoProBase.HTTP_GET_RETRIES)
- return response
|