| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894 |
- # gopro_wireless.py/Open GoPro, Version 2.0 (C) Copyright 2021 GoPro, Inc. (http://gopro.com/OpenGoPro).
- # This copyright was auto-generated on Wed, Sep 1, 2021 5:05:47 PM
- """Implements top level interface to Wireless GoPros."""
- from __future__ import annotations
- import asyncio
- import enum
- import logging
- import queue
- import traceback
- from collections import defaultdict
- from copy import deepcopy
- from pathlib import Path
- from types import TracebackType
- from typing import Any, Callable, Final
- import requests
- from tinydb import TinyDB
- import open_gopro.features
- from open_gopro.api import (
- BleCommands,
- BleSettings,
- BleStatuses,
- HttpCommands,
- HttpSettings,
- WirelessApi,
- )
- from open_gopro.domain.communicator_interface import (
- BleMessage,
- GoProBle,
- GoProWirelessInterface,
- HttpMessage,
- Message,
- MessageRules,
- )
- from open_gopro.domain.exceptions import (
- ConnectFailed,
- ConnectionTerminated,
- GoProNotOpened,
- InterfaceConfigFailure,
- InvalidOpenGoProVersion,
- ResponseTimeout,
- )
- # These are imported this way for monkeypatching in pytest
- from open_gopro.domain.gopro_observable import GoProObservable
- from open_gopro.gopro_base import (
- GoProBase,
- GoProMessageInterface,
- enforce_message_rules,
- )
- from open_gopro.models import GoProResp
- from open_gopro.models.constants import GoProUUID, StatusId
- from open_gopro.models.constants.settings import SettingId
- from open_gopro.models.types import ProtobufId, ResponseType, UpdateCb, UpdateType
- from open_gopro.network.ble import BleakWrapperController, BleUUID
- from open_gopro.network.ble.controller import BLEController
- from open_gopro.network.wifi import WifiCli
- from open_gopro.network.wifi.controller import WifiController
- from open_gopro.network.wifi.requests_session import create_less_strict_requests_session
- from open_gopro.parsers.response import BleRespBuilder
- from open_gopro.util import SnapshotQueue, get_current_dst_aware_time, pretty_print
- from open_gopro.util.logger import Logger
- logger = logging.getLogger(__name__)
- class _ReadyLock:
- """Camera ready state lock manager"""
- class _LockOwner(enum.Enum):
- """Current owner of the communication lock"""
- RULE_ENFORCER = enum.auto()
- STATE_MANAGER = enum.auto()
- def __init__(self) -> None:
- self.lock = asyncio.Lock()
- self.owner: _ReadyLock._LockOwner | None = None
- async def __aenter__(self) -> _ReadyLock:
- """Acquire lock with clear ownership tracking"""
- await self.lock.acquire()
- return self
- async def __aexit__(self, exc_type: BaseException, exc_val: Any, exc_tb: TracebackType) -> None:
- """Release lock and clear ownership"""
- if self.lock.locked():
- self.lock.release()
- self.owner = None
- async def acquire(self, owner: _LockOwner) -> None:
- """Acquire lock with specified owner
- Args:
- owner (_LockOwner): Owner attempting to acquire lock
- """
- logger.trace(f"{owner.name} acquiring lock") # type: ignore
- await self.lock.acquire()
- self.owner = owner
- logger.trace(f"{owner.name} acquired lock") # type: ignore
- def release(self) -> None:
- """Release lock if locked"""
- if self.lock.locked():
- logger.trace(f"{self.owner.name} releasing lock") # type: ignore
- self.lock.release()
- self.owner = None
- class WirelessGoPro(GoProBase[WirelessApi], GoProWirelessInterface):
- """The top-level BLE and Wifi interface to a Wireless GoPro device.
- See the `Open GoPro SDK <https://gopro.github.io/OpenGoPro/python_sdk>`_ for complete documentation.
- This will handle, for BLE:
- - discovering target GoPro device
- - establishing the connection
- - discovering GATT characteristics
- - enabling notifications
- - discovering Open GoPro version
- - setting the date, time, timezone, and DST
- - transferring data
- This will handle, for Wifi:
- - finding SSID and password
- - establishing Wifi connection
- - transferring data
- This will handle, for COHN:
- - connecting to Access Point and provisioning COHN
- - maintaining the COHN credential database
- - appending COHN headers to HTTP requests
- It will also do some state management, etc:
- - ensuring camera is ready / not encoding before transferring data
- - sending keep alive signal periodically
- - tracking COHN state
- If no target arg is passed in, the first discovered BLE GoPro device will be connected to.
- It can be used via context manager:
- >>> async with WirelessGoPro() as gopro:
- >>> # Send some messages now
- Or without:
- >>> gopro = WirelessGoPro()
- >>> await gopro.open()
- >>> # Send some messages now
- Attributes:
- WRITE_TIMEOUT (Final[int]): BLE Write Timeout in seconds. Not configurable.
- Args:
- target (str | None): The trailing digits of the target GoPro's serial number to search for.
- Defaults to None which will connect to the first discovered GoPro.
- host_wifi_interface (str | None): used to specify the wifi interface the local machine will use to connect
- to the GoPro. Defaults to None in which case the first discovered interface will be used. This is only
- needed if you have multiple wifi interfaces on your machine.
- host_sudo_password (str | None): User password for sudo. Defaults to None in which case you will
- be prompted if a password is needed which should only happen on Nix systems. This is only needed for
- Nix systems where the user does not have passwordless sudo access to the wifi interface.
- cohn_db (Path): Path to COHN Database. Defaults to Path("cohn_db.json").
- interfaces (set[WirelessGoPro.Interface] | None): Wireless interfaces for which to attempt to
- establish communication channels. Defaults to None in which case both BLE and WiFi will be used.
- **kwargs (Any): additional parameters for internal use / testing
- Raises:
- ValueError: Invalid combination of arguments.
- InterfaceConfigFailure: In order to communicate via Wifi, there must be an available
- Wifi Interface. By default during initialization, the Wifi driver will attempt to automatically
- discover such an interface. If it does not find any, it will raise this exception. Note that
- the interface can also be specified manually with the 'wifi_interface' argument.
- """
- WRITE_TIMEOUT: Final[int] = 5
- class Interface(enum.Enum):
- """GoPro Wireless Interface selection"""
- BLE = enum.auto() #: Bluetooth Low Energy
- WIFI_AP = enum.auto() #: Wifi Access Point
- COHN = enum.auto() #: Camera on the Home Network (WIFI_STA mode).
- def __init__(
- self,
- target: str | None = None,
- host_wifi_interface: str | None = None,
- host_sudo_password: str | None = None,
- cohn_db: Path = Path("cohn_db.json"),
- interfaces: set[WirelessGoPro.Interface] | None = None,
- **kwargs: Any,
- ) -> None:
- GoProBase.__init__(self, **kwargs)
- # Store initialization information
- interfaces = interfaces or {WirelessGoPro.Interface.BLE, WirelessGoPro.Interface.WIFI_AP}
- self._should_enable_wifi = WirelessGoPro.Interface.WIFI_AP in interfaces
- self._should_enable_ble = WirelessGoPro.Interface.BLE in interfaces
- self._should_enable_cohn = WirelessGoPro.Interface.COHN in interfaces
- self._cohn_credentials = kwargs.get("cohn_credentials")
- self._is_cohn_configured = False
- # Valid parameter selections
- if self._should_enable_wifi and self._should_enable_cohn:
- raise ValueError("Can not have simultaneous COHN and Wifi Access Point connections")
- if self._should_enable_wifi and not self._should_enable_ble:
- raise ValueError("Can not have Wifi Access Point connection without BLE")
- self._identifier = target
- ble_adapter: type[BLEController] = kwargs.get("ble_adapter", BleakWrapperController)
- wifi_adapter: type[WifiController] = kwargs.get("wifi_adapter", WifiCli)
- # Set up API delegate
- self._wireless_api = WirelessApi(self)
- self._keep_alive_interval: int = kwargs.get("keep_alive_interval", 3)
- try:
- # Initialize GoPro Communication Client
- GoProWirelessInterface.__init__(
- self,
- ble_controller=ble_adapter(self._handle_exception),
- wifi_controller=wifi_adapter(host_wifi_interface, password=host_sudo_password),
- disconnected_cb=self._disconnect_handler,
- notification_cb=self._notification_handler,
- target=target,
- )
- except InterfaceConfigFailure as e:
- logger.error(
- "Could not find a suitable Wifi Interface. If there is an available Wifi interface, try passing it manually with the 'wifi_interface' argument."
- )
- raise e
- # Feature delegates
- self.cohn = open_gopro.features.CohnFeature(TinyDB(cohn_db, indent=4))
- self.access_point = open_gopro.features.AccessPointFeature()
- self.streaming = open_gopro.features.StreamFeature()
- # Builders for currently accumulating synchronous responses, indexed by GoProUUID. This assumes there
- # can only be one active response per BleUUID
- self._active_builders: dict[BleUUID, BleRespBuilder] = {}
- # Responses that we are waiting for.
- self._sync_resp_wait_q: SnapshotQueue[ResponseType] = SnapshotQueue()
- # Synchronous response that has been parsed and are ready for their sender to receive as the response.
- self._sync_resp_ready_q: SnapshotQueue[GoProResp] = SnapshotQueue()
- self._listeners: dict[UpdateType | GoProBle._CompositeRegisterType, set[UpdateCb]] = defaultdict(set)
- # To be set up when opening in async context
- self._loop: asyncio.AbstractEventLoop
- self._open = False
- self._is_ble_connected = False
- self._ble_disconnect_event: asyncio.Event
- if self._should_maintain_state:
- self._status_tasks: list[asyncio.Task] = []
- self._state_acquire_lock_tasks: list[asyncio.Task] = []
- self._ready_lock: _ReadyLock
- self._keep_alive_task: asyncio.Task
- self._encoding: bool
- self._busy: bool
- self._encoding_started: asyncio.Event
- @property
- def identifier(self) -> str:
- """Get a unique identifier for this instance.
- The identifier is the last 4 digits of the camera. That is, the same string that is used to
- scan for the camera for BLE.
- Raises:
- GoProNotOpened: Client is not opened yet so no identifier is available
- Returns:
- str: last 4 digits if available, else None
- """
- if not self._identifier:
- raise GoProNotOpened("Identifier not yet set")
- return self._identifier
- @property
- def is_ble_connected(self) -> bool:
- """Are we connected via BLE to the GoPro device?
- Returns:
- bool: True if yes, False if no
- """
- # We can't rely on the BLE Client since it can be connected but not ready
- return self._is_ble_connected
- @property
- def is_http_connected(self) -> bool:
- """Are we connected via HTTP to the GoPro device?
- That is, are we connected to the camera's access point or via COHN?
- Returns:
- bool: True if yes, False if no
- """
- return self._is_cohn_configured or self._wifi.is_connected
- @property
- def ble_command(self) -> BleCommands:
- """Used to call the BLE commands
- Returns:
- BleCommands: the commands
- """
- return self._api.ble_command
- @property
- def ble_setting(self) -> BleSettings:
- """Used to access the BLE settings
- Returns:
- BleSettings: the settings
- """
- return self._api.ble_setting
- @property
- def ble_status(self) -> BleStatuses:
- """Used to access the BLE statuses
- Returns:
- BleStatuses: the statuses
- """
- return self._api.ble_status
- @property
- def http_command(self) -> HttpCommands:
- """Used to access the Wifi commands
- Returns:
- HttpCommands: the commands
- """
- return self._api.http_command
- @property
- def http_setting(self) -> HttpSettings:
- """Used to access the Wifi settings
- Returns:
- HttpSettings: the settings
- """
- return self._api.http_setting
- async def open(self, timeout: int = 15, retries: int = 5) -> None:
- """Perform all initialization commands for ble and wifi
- For BLE: scan and find device, establish connection, discover characteristics, configure queries
- start maintenance, and get Open GoPro version..
- For Wifi: discover SSID and password, enable and connect. Or disable if not using.
- Raises:
- Exception: Any exceptions during opening are propagated through
- InvalidOpenGoProVersion: Only 2.0 is supported
- InterfaceConfigFailure: Requested connection(s) failed to establish
- Args:
- timeout (int): How long to wait for each connection before timing out. Defaults to 10.
- retries (int): How many connection attempts before considering connection failed. Defaults to 5.
- """
- # Set up concurrency
- self._loop = asyncio.get_running_loop()
- self._ble_disconnect_event = asyncio.Event()
- # If we are to perform BLE housekeeping
- if self._should_maintain_state:
- self._ready_lock = _ReadyLock()
- self._keep_alive_task = asyncio.create_task(self._periodic_keep_alive())
- self._encoding = True
- self._busy = True
- self._encoding_started = asyncio.Event()
- await self.cohn.open(gopro=self, loop=self._loop, cohn_credentials=self._cohn_credentials)
- await self.access_point.open(self._loop, self)
- await self.streaming.open(self._loop, self)
- RETRIES = 5
- for retry in range(RETRIES):
- try:
- if self._should_enable_ble:
- await self._open_ble(timeout, retries)
- # TODO need to handle sending these if BLE does not exist
- await self.ble_command.set_third_party_client_info()
- # Set current dst-aware time. Don't assert on success since some old cameras don't support this command.
- dt, tz_offset, is_dst = get_current_dst_aware_time()
- await self.ble_command.set_date_time_tz_dst(date_time=dt, tz_offset=tz_offset, is_dst=is_dst)
- # Find and configure API version
- version = (await self.ble_command.get_open_gopro_api_version()).data
- if version != self.version:
- raise InvalidOpenGoProVersion(version)
- logger.info(f"Using Open GoPro API version {version}")
- await self.cohn.wait_until_ready()
- # Establish Wifi / COHN connection if desired
- if self._should_enable_wifi:
- await self._open_wifi(timeout, retries)
- elif self._should_enable_cohn:
- # TODO DNS scan?
- if await self.cohn.is_configured:
- self._is_cohn_configured = True
- else:
- logger.warning("COHN needs to be configured.")
- # We need at least one connection to continue
- if not self.is_ble_connected and not self.is_http_connected:
- raise InterfaceConfigFailure("No connections were established.")
- if not self.is_ble_connected and self._should_maintain_state:
- logger.warning("Can not maintain state without BLE")
- self._should_maintain_state = False
- self._open = True
- return
- except Exception as e: # pylint: disable=broad-exception-caught
- logger.error(f"Error while opening: {repr(e)}")
- traceback.print_exc()
- await self.close()
- if retry >= RETRIES - 1:
- raise e
- async def close(self) -> None:
- """Safely stop the GoPro instance.
- This will disconnect BLE and WiFI if applicable.
- If not using the context manager, it is mandatory to call this before exiting the program in order to
- prevent reconnection issues because the OS has never disconnected from the previous session.
- """
- try:
- await self._close_wifi()
- await self._close_ble()
- for feature in [self.cohn, self.access_point, self.streaming]:
- await feature.close()
- except AttributeError:
- # This is possible if the GoPro was never opened
- pass
- except Exception as e: # pylint: disable=broad-exception-caught
- logger.warning(f"Error while closing features: {repr(e)}")
- self._open = False
- def register_update(self, callback: UpdateCb, update: UpdateType) -> None:
- """Register for callbacks when an update occurs
- Args:
- callback (UpdateCb): callback to be notified in
- update (UpdateType): update to register for
- """
- return self._register_update(callback, update)
- def _register_update(self, callback: UpdateCb, update: GoProBle._CompositeRegisterType | UpdateType) -> None:
- """Common register method for both public UpdateType and "protected" internal register type
- Args:
- callback (UpdateCb): callback to register
- update (GoProBle._CompositeRegisterType | UpdateType): update type to register for
- """
- self._listeners[update].add(callback)
- def unregister_update(self, callback: UpdateCb, update: UpdateType | None = None) -> None:
- """Unregister for asynchronous update(s)
- Args:
- callback (UpdateCb): callback to stop receiving update(s) on
- update (UpdateType | None): updates to unsubscribe for. Defaults to None (all
- updates that use this callback will be unsubscribed).
- """
- return self._unregister_update(callback, update)
- def _unregister_update(
- self, callback: UpdateCb, update: GoProBle._CompositeRegisterType | UpdateType | None = None
- ) -> None:
- """Common unregister method for both public UpdateType and "protected" internal register type
- Args:
- callback (UpdateCb): callback to unregister
- update (GoProBle._CompositeRegisterType | UpdateType | None): Update type to unregister for. Defaults to
- None which will unregister the callback for all update types.
- """
- if update:
- try:
- self._listeners.get(update, set()).remove(callback)
- except KeyError:
- # This is possible if, for example, the register occurred with register and the unregister is now an
- # individual setting / status
- return
- else:
- # If update was not specified, remove all uses of callback
- for key in dict(self._listeners).keys():
- try:
- self._listeners[key].remove(callback)
- except KeyError:
- continue
- @property
- def is_open(self) -> bool:
- """Is this client ready for communication?
- Returns:
- bool: True if yes, False if no
- """
- return self._open
- @property
- async def is_ready(self) -> bool:
- """Is gopro ready to receive commands
- Returns:
- bool: yes if ready, no otherwise
- """
- return not (self._busy or self._encoding)
- ##########################################################################################################
- #### Abstracted commands
- @GoProBase._ensure_opened((GoProMessageInterface.BLE,))
- async def keep_alive(self) -> bool:
- """Send a heartbeat to prevent the BLE connection from dropping.
- This is sent automatically by the GoPro instance if its `maintain_ble` argument is not False.
- Returns:
- bool: True if it succeeded,. False otherwise
- """
- return (await self.ble_setting.led.set(66)).ok # type: ignore
- ##########################################################################################################
- # End Public API
- ##########################################################################################################
- async def _enforce_message_rules(
- self, wrapped: Callable, message: Message, rules: MessageRules = MessageRules(), **kwargs: Any
- ) -> GoProResp:
- """Enforce rules around message sending"""
- if self._should_maintain_state and self.is_open and not rules.is_fastpass(**kwargs):
- logger.trace("Rule enforcer acquiring lock") # type: ignore
- async with self._ready_lock as lock:
- lock.owner = _ReadyLock._LockOwner.RULE_ENFORCER
- logger.trace("Rule enforcer acquired lock") # type: ignore
- response = await wrapped(message, **kwargs)
- logger.trace("Rule enforcer released lock") # type: ignore
- else:
- response = await wrapped(message, **kwargs)
- # Handle post-response actions
- if self._should_maintain_state and rules.should_wait_for_encoding_start(**kwargs):
- await self._encoding_started.wait()
- self._encoding_started.clear()
- return response
- async def _notify_listeners(self, update: UpdateType, value: Any) -> None:
- """Notify all registered listeners of this update
- Args:
- update (UpdateType): update to notify
- value (Any): value to notify
- """
- listeners: set[UpdateCb] = set()
- # check individual updates
- for listener in self._listeners.get(update, []):
- listeners.add(listener)
- # Now check our internal composite updates
- match update:
- case StatusId():
- for listener in self._listeners.get(GoProBle._CompositeRegisterType.ALL_STATUSES, []):
- listeners.add(listener)
- case SettingId():
- for listener in self._listeners.get(GoProBle._CompositeRegisterType.ALL_SETTINGS, []):
- listeners.add(listener)
- for listener in listeners:
- await listener(update, value)
- async def _periodic_keep_alive(self) -> None:
- """Task to periodically send the keep alive message via BLE."""
- while True:
- if self.is_ble_connected:
- if not await self.keep_alive():
- logger.error("Failed to send keep alive")
- await asyncio.sleep(self._keep_alive_interval)
- async def _open_ble(self, timeout: int = 10, retries: int = 5) -> None:
- """Connect the instance to a device via BLE.
- Raises:
- InterfaceConfigFailure: failed to get identifier from BLE client
- Args:
- timeout (int): Time in seconds before considering establishment failed. Defaults to 10 seconds.
- retries (int): How many tries to reconnect after failures. Defaults to 5.
- """
- # Establish connection, pair, etc.
- await self._ble.open(timeout, retries)
- self._is_ble_connected = True
- await self.ble_command.set_pairing_complete()
- if not self._ble.identifier:
- raise InterfaceConfigFailure("Failed to get identifier from BLE client")
- self._identifier = self._ble.identifier[-4:]
- # Start state maintenance
- if self._should_maintain_state:
- logger.trace("State manager initially acquiring lock") # type: ignore
- await self._ready_lock.acquire(_ReadyLock._LockOwner.STATE_MANAGER)
- logger.trace("State manager initially acquired lock") # type: ignore
- self._ble_disconnect_event.clear()
- async def _handle_encoding(observable: GoProObservable) -> None:
- async for encoding_status in observable.observe(debug_id=StatusId.ENCODING.name):
- asyncio.create_task(self._update_internal_state(StatusId.ENCODING, encoding_status))
- async def _handle_busy(observable: GoProObservable) -> None:
- async for busy_status in observable.observe(debug_id=StatusId.BUSY.name):
- asyncio.create_task(self._update_internal_state(StatusId.BUSY, busy_status))
- self._status_tasks.append(
- asyncio.create_task(_handle_encoding((await self.ble_status.encoding.get_value_observable()).unwrap()))
- )
- self._status_tasks.append(
- asyncio.create_task(_handle_busy((await self.ble_status.busy.get_value_observable()).unwrap()))
- )
- logger.info("BLE is ready!")
- async def _update_internal_state(self, update: UpdateType, value: int) -> None:
- """Update internal state based on camera status changes"""
- # Clean up pending tasks
- logger.trace(f"Received internal state update {update}: {value}") # type: ignore
- for task in self._state_acquire_lock_tasks:
- task.cancel()
- self._state_acquire_lock_tasks.clear()
- # Update state variables
- previous_ready = await self.is_ready
- encoding_started = False
- if update == StatusId.ENCODING:
- encoding_started = not self._encoding and bool(value)
- self._encoding = bool(value)
- elif update == StatusId.BUSY:
- self._busy = bool(value)
- current_ready = await self.is_ready
- logger.trace(f"Current state: {self._encoding=}, {self._busy=}, {current_ready=}") # type: ignore
- # Handle lock state transitions based on camera readiness
- if self._ready_lock.owner == _ReadyLock._LockOwner.STATE_MANAGER:
- if current_ready and not previous_ready:
- # Camera became ready, release lock
- self._ready_lock.release()
- elif not current_ready and self._ready_lock.owner != _ReadyLock._LockOwner.STATE_MANAGER:
- # Camera became busy, acquire lock
- try:
- task = asyncio.create_task(self._ready_lock.acquire(_ReadyLock._LockOwner.STATE_MANAGER))
- self._state_acquire_lock_tasks.append(task)
- await task
- except asyncio.CancelledError:
- pass
- # Notify encoding started if applicable
- if encoding_started and self.is_open:
- self._encoding_started.set()
- async def _route_response(self, response: GoProResp) -> None:
- """After parsing response, route it to any stakeholders (such as registered listeners)
- Args:
- response (GoProResp): parsed response to route
- """
- original_response = deepcopy(response)
- # We only support queries for either one ID or all ID's. If this is an individual query, extract the value
- # for cleaner response data
- if response._is_query and not response._is_push and len(response.data) == 1:
- response.data = list(response.data.values())[0]
- # Check if this is the awaited synchronous response (id matches). Note! these have to come in order.
- if ((head := await self._sync_resp_wait_q.peek_front()) == response.identifier) or (
- # There is a special case for an unsupported protobuf error response. It does not contain the feature ID so we
- # have to match it by Feature ID / UUID. Feature ID's are not used across UUID's so we will only check for
- # matching Feature ID(s)
- isinstance(head, ProtobufId)
- and ProtobufId(head.feature_id, None)
- == response.identifier # Feature IDs match and response identifier does not contain Action ID
- ):
- logger.info(Logger.build_log_rx_str(original_response, asynchronous=False))
- # Dequeue it and put this on the ready queue
- await self._sync_resp_wait_q.get()
- await self._sync_resp_ready_q.put(response)
- # If this wasn't the awaited synchronous response...
- else:
- logger.info(Logger.build_log_rx_str(original_response, asynchronous=True))
- if response._is_push:
- for update_id, value in response.data.items():
- await self._notify_listeners(update_id, value)
- elif isinstance(response.identifier, ProtobufId):
- await self._notify_listeners(response.identifier, response.data)
- def _notification_handler(self, handle: int, data: bytearray) -> None:
- """Receive notifications from the BLE controller.
- Args:
- handle (int): Attribute handle that notification was received on.
- data (bytearray): Bytestream that was received.
- """
- async def _async_notification_handler() -> None:
- # Responses we don't care about. For now, just the BLE-spec defined battery characteristic
- if (uuid := self._ble.gatt_db.handle2uuid(handle)) == GoProUUID.BATT_LEVEL:
- return
- logger.debug(f'Received response on BleUUID [{uuid}]: {data.hex(":")}')
- # Add to response dict if not already there
- if uuid not in self._active_builders:
- builder = BleRespBuilder()
- builder.set_uuid(uuid)
- self._active_builders[uuid] = builder
- # Accumulate the packet
- self._active_builders[uuid].accumulate(data)
- if (builder := self._active_builders[uuid]).is_finished_accumulating:
- # Clear active response from response dict
- del self._active_builders[uuid]
- await self._route_response(builder.build())
- asyncio.run_coroutine_threadsafe(_async_notification_handler(), self._loop)
- async def _close_ble(self) -> None:
- """Terminate BLE connection if it is connected"""
- if self._should_maintain_state:
- for task in [*self._status_tasks, *self._state_acquire_lock_tasks, self._keep_alive_task]:
- task.cancel()
- try:
- await task
- except asyncio.CancelledError:
- pass # This exception is expected when cancelling a task
- if self.is_ble_connected and self._ble is not None:
- await self._ble.close()
- await self._ble_disconnect_event.wait()
- def _disconnect_handler(self, _: Any) -> None:
- """Disconnect callback from BLE controller
- Raises:
- ConnectionTerminated: We entered this callback in an unexpected state.
- """
- self._is_ble_connected = False
- if self._ble_disconnect_event.is_set():
- raise ConnectionTerminated("BLE connection terminated unexpectedly.")
- self._ble_disconnect_event.set()
- @GoProBase._ensure_opened((GoProMessageInterface.BLE,))
- @enforce_message_rules
- async def _send_ble_message(
- self, message: BleMessage, rules: MessageRules = MessageRules(), **kwargs: Any
- ) -> GoProResp:
- # Store information on the response we are expecting
- await self._sync_resp_wait_q.put(message._identifier)
- logger.info(Logger.build_log_tx_str(pretty_print(message._as_dict(**kwargs))))
- # Fragment data and write it
- for packet in self._fragment(message._build_data(**kwargs)):
- logger.debug(f"Writing to [{message._uuid.name}] UUID: {packet.hex(':')}")
- await self._ble.write(message._uuid, packet)
- # Wait to be notified that response was received
- try:
- response = await asyncio.wait_for(self._sync_resp_ready_q.get(), WirelessGoPro.WRITE_TIMEOUT)
- except asyncio.TimeoutError as e:
- logger.error(
- f"Response timeout of {WirelessGoPro.WRITE_TIMEOUT} seconds when sending {message._identifier}!"
- )
- raise ResponseTimeout(WirelessGoPro.WRITE_TIMEOUT) from e
- except queue.Empty as e:
- logger.error(
- f"Response timeout of {WirelessGoPro.WRITE_TIMEOUT} seconds when sending {message._identifier}!"
- )
- raise ResponseTimeout(WirelessGoPro.WRITE_TIMEOUT) from e
- # Check status
- if not response.ok:
- logger.warning(f"Received non-success status: {response.status}")
- return response
- @GoProBase._ensure_opened((GoProMessageInterface.BLE,))
- @enforce_message_rules
- async def _read_ble_characteristic(
- self, message: BleMessage, rules: MessageRules = MessageRules(), **kwargs: Any
- ) -> GoProResp:
- received_data = await self._ble.read(message._uuid)
- logger.debug(f"Reading from {message._uuid.name}")
- builder = BleRespBuilder()
- builder.set_uuid(message._uuid)
- builder.set_packet(received_data)
- return builder.build()
- def _handle_cohn(self, message: HttpMessage) -> HttpMessage:
- """Prepend COHN headers if COHN is provisioned
- Args:
- message (HttpMessage): HTTP message to append headers to
- Returns:
- HttpMessage: potentially modified HTTP message
- """
- try:
- if self._should_enable_cohn and self.cohn.credentials:
- message._headers["Authorization"] = self.cohn.credentials.auth_token
- message._certificate = self.cohn.credentials.certificate_as_path
- return message
- except GoProNotOpened:
- return message
- async def _get_json(self, message: HttpMessage, *args: Any, **kwargs: Any) -> GoProResp:
- message = self._handle_cohn(message)
- return await super()._get_json(*args, message=message, **kwargs)
- async def _get_stream(self, message: HttpMessage, *args: Any, **kwargs: Any) -> GoProResp:
- message = self._handle_cohn(message)
- return await super()._get_stream(*args, message=message, **kwargs)
- async def _put_json(self, message: HttpMessage, *args: Any, **kwargs: Any) -> GoProResp:
- message = self._handle_cohn(message)
- return await super()._put_json(*args, message=message, **kwargs)
- @GoProBase._ensure_opened((GoProMessageInterface.BLE,))
- async def _open_wifi(self, timeout: int = 30, retries: int = 5) -> None:
- """Connect to a GoPro device via Wifi.
- Args:
- timeout (int): Time before considering establishment failed. Defaults to 10 seconds.
- retries (int): How many tries to reconnect after failures. Defaults to 5.
- Raises:
- ConnectFailed: Was not able to establish the Wifi Connection
- """
- logger.info("Discovering Wifi AP info and enabling via BLE")
- password = (await self.ble_command.get_wifi_password()).data
- ssid = (await self.ble_command.get_wifi_ssid()).data
- for retry in range(1, retries):
- try:
- assert (await self.ble_command.enable_wifi_ap(enable=True)).ok
- async def _wait_for_camera_wifi_ready() -> None:
- logger.debug("Waiting for camera wifi ready status")
- while not (await self.ble_status.ap_mode.get_value()).data:
- await asyncio.sleep(0.200)
- await asyncio.wait_for(_wait_for_camera_wifi_ready(), 5)
- await self._wifi.open(ssid, password, timeout, 1)
- break
- except ConnectFailed:
- logger.warning(f"Wifi connection failed. Retrying #{retry}")
- # In case camera Wifi is in strange disable, reset it
- assert (await self.ble_command.enable_wifi_ap(enable=False)).ok
- else:
- raise ConnectFailed("Wifi Connection failed", timeout, retries)
- async def _close_wifi(self) -> None:
- """Terminate the Wifi connection."""
- if hasattr(self, "_wifi"): # Corner case where instantiation fails before superclass is initialized
- await self._wifi.close()
- @property
- def ip_address(self) -> str: # noqa: D102
- return self.cohn.credentials.ip_address if self._should_enable_cohn and self.cohn.credentials else "10.5.5.9"
- @property
- def _base_url(self) -> str:
- return (
- f"https://{self.ip_address}/"
- if self._should_enable_cohn and self.cohn.credentials
- else f"http://{self.ip_address}:8080/"
- )
- @property
- def _requests_session(self) -> requests.Session:
- return (
- create_less_strict_requests_session(self.cohn.credentials.certificate_as_path)
- if self._should_enable_cohn and self.cohn.credentials
- else requests.Session()
- )
- @property
- def _api(self) -> WirelessApi:
- return self._wireless_api
|