| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908 |
- # builders.py/Open GoPro, Version 2.0 (C) Copyright 2021 GoPro, Inc. (http://gopro.com/OpenGoPro).
- # This copyright was auto-generated on Tue Sep 7 21:35:53 UTC 2021
- """Common functionality across API versions to build commands, settings, and statuses"""
- from __future__ import annotations
- import enum
- import logging
- from collections.abc import Iterable
- from dataclasses import dataclass
- from pathlib import Path
- from typing import Any, Callable, Final, Generic, Protocol, TypeVar, Union
- import construct
- import wrapt
- from returns.result import ResultE
- from open_gopro.domain.communicator_interface import (
- BleMessage,
- BleMessages,
- GoProBle,
- GoProHttp,
- HttpMessage,
- HttpMessages,
- MessageRules,
- )
- from open_gopro.domain.enum import GoProIntEnum
- from open_gopro.domain.exceptions import GoProError
- from open_gopro.domain.gopro_observable import GoProCompositeObservable, GoProObservable
- from open_gopro.domain.parser_interface import (
- BytesBuilder,
- BytesParserBuilder,
- GlobalParsers,
- Parser,
- )
- from open_gopro.models import GoProResp
- from open_gopro.models.constants import (
- ActionId,
- CmdId,
- FeatureId,
- GoProUUID,
- QueryCmdId,
- SettingId,
- StatusId,
- )
- from open_gopro.models.types import CameraState, JsonDict, Protobuf, ProtobufId
- from open_gopro.network.ble import BleUUID
- from open_gopro.parsers.bytes import (
- ConstructByteParserBuilder,
- GoProEnumByteParserBuilder,
- ProtobufByteParser,
- )
- from open_gopro.util.logger import Logger
- logger = logging.getLogger(__name__)
- QueryParserType = Union[construct.Construct, type[GoProIntEnum], BytesParserBuilder]
- ######################################################## BLE #################################################
- T = TypeVar("T")
- class BleReadCommand(BleMessage):
- """A BLE command that reads data from a BleUUID
- Args:
- uuid (BleUUID): BleUUID to read from
- parser (Parser): the parser that will parse the received bytestream into a JSON dict
- """
- def __init__(self, uuid: BleUUID, parser: Parser) -> None:
- super().__init__(uuid=uuid, parser=parser, identifier=uuid)
- def _build_data(self, **kwargs: Any) -> bytearray:
- # Read commands do not have data
- raise NotImplementedError
- def __str__(self) -> str:
- return f"Read {self._uuid.name.lower().replace('_', ' ').title()}"
- def _as_dict(self, **kwargs: Any) -> JsonDict:
- """Return the attributes of the command as a dict
- Args:
- **kwargs (Any): additional entries for the dict
- Returns:
- JsonDict: command as dict
- """
- return {"id": self._uuid, **self._base_dict} | kwargs
- class BleWriteCommand(BleMessage):
- """A BLE command that writes to a BleUUID and retrieves responses by accumulating notifications
- Args:
- uuid (BleUUID): UUID to write to
- cmd (CmdId): command identifier
- param_builder (BytesBuilder | None): builds bytes from params. Defaults to None.
- parser (Parser | None): response parser to parse received bytes. Defaults to None.
- rules (MessageRules): rules this Message must obey. Defaults to MessageRules().
- """
- def __init__(
- self,
- uuid: BleUUID,
- cmd: CmdId,
- param_builder: BytesBuilder | None = None,
- parser: Parser | None = None,
- rules: MessageRules = MessageRules(),
- ) -> None:
- self.param_builder = param_builder
- self.cmd = cmd
- self.rules = rules
- super().__init__(uuid, cmd, parser)
- def _build_data(self, **kwargs: Any) -> bytearray:
- data = bytearray([self.cmd.value])
- params = bytearray()
- if self.param_builder:
- params.extend(self.param_builder.build(*kwargs.values()))
- else:
- for arg in kwargs.values():
- params.extend(arg.value if isinstance(arg, enum.Enum) else arg)
- if params:
- data.append(len(params))
- data.extend(params)
- return data
- def __str__(self) -> str:
- return self.cmd.name.lower().replace("_", " ").removeprefix("cmdid").title()
- def _as_dict(self, **kwargs: Any) -> JsonDict:
- """Return the attributes of the command as a dict
- Args:
- **kwargs (Any): additional entries for the dict
- Returns:
- JsonDict: command as dict
- """
- return {"id": self.cmd, **self._base_dict} | kwargs
- class RegisterUnregisterAll(BleWriteCommand):
- """Base class for register / unregister all commands
- This will loop over all of the elements (i.e. settings / statuses found from the element_set entry of the
- producer tuple parameter) and individually register / unregister (depending on the action parameter) each
- element in the set
- Args:
- uuid (BleUUID): UUID to write to
- cmd (CmdId): Command ID that is being sent
- update_set (type[SettingId] | type[StatusId]): what are registering / unregistering for?
- action (Action): whether to register or unregister
- parser (Parser | None): Optional response parser. Defaults to None.
- """
- class Action(enum.Enum):
- """Enum to differentiate between register actions"""
- REGISTER = enum.auto()
- UNREGISTER = enum.auto()
- def __init__(
- self,
- uuid: BleUUID,
- cmd: CmdId,
- update_set: type[SettingId] | type[StatusId],
- action: Action,
- parser: Parser | None = None,
- ) -> None:
- self.action = action
- self.update_set = update_set
- super().__init__(uuid=uuid, cmd=cmd, parser=parser)
- def _build_data(self, **kwargs: Any) -> bytearray:
- return bytearray([self.cmd.value])
- class BleProtoCommand(BleMessage):
- """A BLE command that is sent and received as using the Protobuf protocol
- Args:
- uuid (BleUUID): BleUUID to write to
- feature_id (FeatureId): Feature ID that is being executed
- action_id (ActionId): protobuf specific action ID that is being executed
- response_action_id (ActionId): the action ID that will be in the response to this command
- request_proto (type[Protobuf]): the action ID that will be in the response
- response_proto (type[Protobuf]): protobuf used to parse received bytestream
- parser (Parser | None): Optional response parser. Defaults to None.
- additional_matching_ids (set[ProtobufId | CmdId] | None): Other action ID's to share this parser. This is used, for
- example, if a notification shares the same ID as the synchronous response. Defaults to None. Defaults to None.
- """
- def __init__(
- self,
- uuid: BleUUID,
- feature_id: FeatureId,
- action_id: ActionId,
- response_action_id: ActionId,
- request_proto: type[Protobuf],
- response_proto: type[Protobuf],
- parser: Parser | None,
- additional_matching_ids: set[ProtobufId | CmdId] | None = None,
- ) -> None:
- p = parser or Parser()
- p.byte_json_adapter = ProtobufByteParser(response_proto)
- super().__init__(uuid=uuid, parser=p, identifier=ProtobufId(feature_id, response_action_id))
- self.feature_id = feature_id
- self.action_id = action_id
- self.response_action_id = response_action_id
- self.request_proto = request_proto
- self.response_proto = response_proto
- self.additional_matching_ids: set[ProtobufId | CmdId] = additional_matching_ids or set()
- assert self._parser
- for matching_id in [*self.additional_matching_ids, ProtobufId(feature_id, response_action_id)]:
- GlobalParsers.add(matching_id, self._parser)
- GlobalParsers.add_feature_action_id_mapping(self.feature_id, self.response_action_id)
- def _build_data(self, **kwargs: Any) -> bytearray:
- """Build the byte data to prepare for command sending
- Args:
- **kwargs (Any): arguments to command to use to build protobuf
- Returns:
- bytearray: built byte data
- """
- proto = self.request_proto()
- for attr_name, arg in kwargs.items():
- value = arg.value if issubclass(type(arg), enum.Enum) else arg
- attr = getattr(proto, attr_name)
- # Protobuf "repeatable" (i.e. iterable) fields can not be set directly and must be appended / extended
- if isinstance(attr, Iterable) and not isinstance(value, (str, bytes)):
- if isinstance(value, Iterable):
- for element in value:
- attr.append(element.value if isinstance(element, enum.Enum) else element) # type: ignore
- else:
- attr.append(value.value if isinstance(value, enum.Enum) else value) # type:ignore
- else:
- setattr(proto, attr_name, value)
- # Prepend headers and serialize
- return bytearray([self.feature_id.value, self.action_id.value, *proto.SerializeToString()])
- def __str__(self) -> str:
- return self.action_id.name.lower().replace("_", " ").removeprefix("actionid").title()
- def _as_dict(self, **kwargs: Any) -> JsonDict:
- """Return the attributes of the command as a dict
- Args:
- **kwargs (Any): additional entries for the dict
- Returns:
- JsonDict: command as dict
- """
- return {"id": self.action_id, "feature_id": self.feature_id, **self._base_dict} | kwargs
- def ble_write_command(
- uuid: BleUUID,
- cmd: CmdId,
- param_builder: BytesBuilder | None = None,
- parser: Parser | None = None,
- rules: MessageRules = MessageRules(),
- ) -> Callable:
- """Decorator to build and encapsulate a BleWriteCommand in a Callable
- Args:
- uuid (BleUUID): UUID to write to
- cmd (CmdId): command identifier
- param_builder (BytesBuilder | None): builds bytes from params. Defaults to None.
- parser (Parser | None): response parser to parse received bytes. Defaults to None.
- rules (MessageRules): rules this Message must obey. Defaults to MessageRules().
- Returns:
- Callable: built callable to perform operation
- """
- message = BleWriteCommand(uuid, cmd, param_builder, parser)
- @wrapt.decorator
- async def wrapper(wrapped: Callable, instance: BleMessages, _: Any, kwargs: Any) -> GoProResp:
- return await instance._communicator._send_ble_message(message, rules, **(await wrapped(**kwargs) or kwargs))
- return wrapper
- def ble_read_command(uuid: BleUUID, parser: Parser) -> Callable:
- """Decorator to build a BleReadCommand and wrapper to execute it
- Args:
- uuid (BleUUID): BleUUID to read from
- parser (Parser): the parser that will parse the received bytestream into a JSON dict
- Returns:
- Callable: Generated method to perform command
- """
- message = BleReadCommand(uuid, parser)
- @wrapt.decorator
- async def wrapper(wrapped: Callable, instance: BleMessages, _: Any, kwargs: Any) -> GoProResp:
- return await instance._communicator._read_ble_characteristic(message, **(await wrapped(**kwargs) or kwargs))
- return wrapper
- def ble_register_command(
- uuid: BleUUID,
- cmd: CmdId,
- update_set: type[SettingId] | type[StatusId],
- parser: Parser | None = None,
- ) -> Callable:
- """Decorator to build a RegisterUnregisterAll command and wrapper to execute it
- Args:
- uuid (BleUUID): UUID to write to
- cmd (CmdId): Command ID that is being sent
- update_set (type[SettingId] | type[StatusId]): set of ID's being registered for
- parser (Parser | None): Optional response parser. Defaults to None.
- Returns:
- Callable: Generated method to perform command
- """
- register_message = RegisterUnregisterAll(uuid, cmd, update_set, RegisterUnregisterAll.Action.REGISTER, parser)
- unregister_message = RegisterUnregisterAll(uuid, cmd, update_set, RegisterUnregisterAll.Action.UNREGISTER, parser)
- @wrapt.decorator
- async def wrapper(
- wrapped: Callable, instance: BleMessages, _: Any, kwargs: Any
- ) -> ResultE[GoProCompositeObservable]:
- internal_update_type = (
- GoProBle._CompositeRegisterType.ALL_STATUSES
- if update_set == StatusId
- else GoProBle._CompositeRegisterType.ALL_SETTINGS
- )
- try:
- return ResultE.from_value(
- await GoProCompositeObservable(
- gopro=instance._communicator,
- update=internal_update_type,
- register_command=instance._communicator._send_ble_message(
- register_message, **(await wrapped(**kwargs) or kwargs)
- ),
- unregister_command=instance._communicator._send_ble_message(
- unregister_message, **(await wrapped(**kwargs) or kwargs)
- ),
- ).start()
- )
- except GoProError as e:
- logger.error(f"Failed to register for {update_set} ==> {e}")
- return ResultE.from_failure(e)
- return wrapper
- def ble_proto_command(
- uuid: BleUUID,
- feature_id: FeatureId,
- action_id: ActionId,
- response_action_id: ActionId,
- request_proto: type[Protobuf],
- response_proto: type[Protobuf],
- parser: Parser | None = None,
- additional_matching_ids: set[ProtobufId | CmdId] | None = None,
- rules: MessageRules = MessageRules(),
- ) -> Callable:
- """Decorator to build a BLE Protobuf command and wrapper to execute it
- Args:
- uuid (BleUUID): BleUUID to write to
- feature_id (FeatureId): Feature ID that is being executed
- action_id (ActionId): protobuf specific action ID that is being executed
- response_action_id (ActionId): the action ID that will be in the response to this command
- request_proto (type[Protobuf]): the action ID that will be in the response
- response_proto (type[Protobuf]): protobuf used to parse received bytestream
- parser (Parser | None): Response parser to transform received Protobuf bytes. Defaults to None.
- additional_matching_ids (set[ProtobufId | CmdId] | None): Other action ID's to share this parser. This is used,
- for example, if a notification shares the same ID as the synchronous response. Defaults to None.
- rules (MessageRules): rules that describe sending / receiving the message. Defaults to MessageRules() (no rules).
- Returns:
- Callable: Generated method to perform command
- """
- message = BleProtoCommand(
- uuid,
- feature_id,
- action_id,
- response_action_id,
- request_proto,
- response_proto,
- parser,
- additional_matching_ids,
- )
- @wrapt.decorator
- async def wrapper(wrapped: Callable, instance: BleMessages, _: Any, kwargs: Any) -> GoProResp:
- return await instance._communicator._send_ble_message(message, rules, **(await wrapped(**kwargs) or kwargs))
- return wrapper
- @dataclass
- class BleAsyncResponse:
- """A BLE protobuf response that is not associated with any message.
- Attributes:
- feature_id (FeatureId): Feature ID that response corresponds to
- action_id (ActionId): Action ID that response corresponds to
- parser (Parser): response parser
- """
- feature_id: FeatureId
- action_id: ActionId
- parser: Parser
- def __str__(self) -> str:
- return self.action_id.name.lower().replace("_", " ").removeprefix("actionid").title()
- class BuilderProtocol(Protocol):
- """Protocol definition of data building methods"""
- def __call__(self, **kwargs: Any) -> bytearray: # noqa: D102
- ...
- class BleSettingFacade(Generic[T]):
- """Wrapper around BleSetting since a BleSetting's message definition changes based on how it is being operated on.
- Raises:
- TypeError: Parser builder is not a valid type
- Attributes:
- SETTER_UUID (Final[BleUUID]): UUID used to perform set operation
- READER_UUID (Final[BleUUID]): UUID used to perform read operation
- Args:
- communicator (GoProBle): BLE communicator that will operate on this object.
- identifier (SettingId): Setting Identifier
- parser_builder (QueryParserType): Parses responses from bytes and builds requests to bytes.
- """
- SETTER_UUID: Final[BleUUID] = GoProUUID.CQ_SETTINGS
- READER_UUID: Final[BleUUID] = GoProUUID.CQ_QUERY
- class BleSettingMessageBase(BleMessage):
- """Actual BLE Setting Message that is wrapped by the facade.
- Args:
- uuid (BleUUID): UUID to access this setting.
- identifier (SettingId | QueryCmdId): How responses to operations on this message will be identified.
- setting_id (SettingId): Setting identifier. May match identifier in some cases.
- builder (BuilderProtocol): Build request bytes from the current message.
- """
- def __init__(
- self,
- uuid: BleUUID,
- identifier: SettingId | QueryCmdId,
- setting_id: SettingId,
- builder: BuilderProtocol,
- ) -> None:
- self._build = builder
- self._setting_id = setting_id
- super().__init__(uuid, identifier, None) # type: ignore
- def _build_data(self, **kwargs: Any) -> bytearray:
- return self._build(**kwargs)
- def _as_dict(self, **kwargs: Any) -> JsonDict:
- d = {"id": self._identifier, "setting_id": self._setting_id, **self._base_dict} | kwargs
- return d
- def __init__(self, communicator: GoProBle, identifier: SettingId, parser_builder: QueryParserType) -> None:
- # TODO abstract this
- parser = Parser[CameraState]()
- if isinstance(parser_builder, construct.Construct):
- parser.byte_json_adapter = ConstructByteParserBuilder(parser_builder)
- elif isinstance(parser_builder, BytesParserBuilder):
- parser.byte_json_adapter = parser_builder
- elif issubclass(parser_builder, GoProIntEnum):
- parser.byte_json_adapter = GoProEnumByteParserBuilder(parser_builder)
- else:
- raise TypeError(f"Unexpected {parser_builder=}")
- GlobalParsers.add(identifier, parser)
- self._identifier = identifier
- self._builder = parser.byte_json_adapter
- self._communicator = communicator
- def _build_cmd(self, cmd: QueryCmdId) -> bytearray:
- """Build the data
- Args:
- cmd (QueryCmdId): query command
- Returns:
- bytearray: built data
- """
- return bytearray([cmd.value, int(self._identifier)])
- async def set(self, value: T) -> GoProResp[None]:
- """Set the value of the setting.
- Args:
- value (T): The argument to use to set the setting value.
- Returns:
- GoProResp[None]: Status of set
- """
- def _build_data(**kwargs: Any) -> bytearray:
- # Special case. Can't use _send_query
- data = bytearray([int(self._identifier)])
- try:
- param = self._builder.build(kwargs["value"])
- data.extend([len(param), *param])
- except IndexError:
- pass
- return data
- message = BleSettingFacade.BleSettingMessageBase(
- BleSettingFacade.SETTER_UUID,
- self._identifier,
- self._identifier,
- lambda **_: _build_data(value=value),
- )
- return await self._communicator._send_ble_message(message)
- async def get_value(self) -> GoProResp[T]:
- """Get the settings value.
- Returns:
- GoProResp[T]: settings value
- """
- message = BleSettingFacade.BleSettingMessageBase(
- BleSettingFacade.READER_UUID,
- QueryCmdId.GET_SETTING_VAL,
- self._identifier,
- lambda **_: self._build_cmd(QueryCmdId.GET_SETTING_VAL),
- )
- return await self._communicator._send_ble_message(message)
- async def get_name(self) -> GoProResp[str]:
- """Get the settings name.
- Raises:
- NotImplementedError: This isn't implemented on the camera
- Returns:
- GoProResp[str]: setting name as string
- """
- raise NotImplementedError("Not implemented on camera!")
- async def get_capabilities_values(self) -> GoProResp[list[T]]:
- """Get currently supported settings capabilities values.
- Returns:
- GoProResp[list[T]]: settings capabilities values
- """
- message = BleSettingFacade.BleSettingMessageBase(
- BleSettingFacade.READER_UUID,
- QueryCmdId.GET_CAPABILITIES_VAL,
- self._identifier,
- lambda **_: self._build_cmd(QueryCmdId.GET_CAPABILITIES_VAL),
- )
- return await self._communicator._send_ble_message(message)
- async def get_capabilities_names(self) -> GoProResp[list[str]]:
- """Get currently supported settings capabilities names.
- Raises:
- NotImplementedError: This isn't implemented on the camera
- Returns:
- GoProResp[list[str]]: list of capability names as strings
- """
- raise NotImplementedError("Not implemented on camera!")
- async def get_value_observable(self) -> ResultE[GoProObservable[T]]:
- """Receive an observable of asynchronously notified setting values.
- Returns:
- ResultE[GoProObservable[T]]: data observable if successful otherwise an error
- """
- register_message = BleSettingFacade.BleSettingMessageBase(
- BleSettingFacade.READER_UUID,
- QueryCmdId.REG_SETTING_VAL_UPDATE,
- self._identifier,
- lambda **_: self._build_cmd(QueryCmdId.REG_SETTING_VAL_UPDATE),
- )
- unregister_message = BleSettingFacade.BleSettingMessageBase(
- BleSettingFacade.READER_UUID,
- QueryCmdId.UNREG_SETTING_VAL_UPDATE,
- self._identifier,
- lambda **_: self._build_cmd(QueryCmdId.UNREG_SETTING_VAL_UPDATE),
- )
- return ResultE.from_value(
- await GoProObservable[T](
- gopro=self._communicator,
- update=self._identifier,
- register_command=self._communicator._send_ble_message(register_message),
- unregister_command=self._communicator._send_ble_message(unregister_message),
- ).start()
- )
- async def get_capabilities_observable(self) -> ResultE[GoProObservable[list[T]]]:
- """Receive an observable of asynchronously notified lists of setting value capabilities.
- Returns:
- ResultE[GoProObservable[list[T]]]: data observable if successful otherwise an error
- """
- register_message = BleSettingFacade.BleSettingMessageBase(
- BleSettingFacade.READER_UUID,
- QueryCmdId.REG_CAPABILITIES_UPDATE,
- self._identifier,
- lambda **_: self._build_cmd(QueryCmdId.REG_CAPABILITIES_UPDATE),
- )
- unregister_message = BleSettingFacade.BleSettingMessageBase(
- BleSettingFacade.READER_UUID,
- QueryCmdId.UNREG_CAPABILITIES_UPDATE,
- self._identifier,
- lambda **_: self._build_cmd(QueryCmdId.UNREG_CAPABILITIES_UPDATE),
- )
- return ResultE.from_value(
- await GoProObservable[list[T]](
- gopro=self._communicator,
- update=self._identifier,
- register_command=self._communicator._send_ble_message(register_message),
- unregister_command=self._communicator._send_ble_message(unregister_message),
- ).start()
- )
- def __str__(self) -> str:
- return str(self._identifier).lower().replace("_", " ").title()
- class BleStatusFacade(Generic[T]):
- """Wrapper around BleStatus since a BleStatus's message definition changes based on how it is being operated on.
- Attributes:
- UUID (Final[BleUUID]): attribute ID used to perform set operation
- Args:
- communicator (GoProBle): BLE communicator that will operate on this object.
- identifier (StatusId): Status identifier
- parser (QueryParserType): Parser responses from bytes
- Raises:
- TypeError: Attempted to pass an invalid parser type
- """
- UUID: Final[BleUUID] = GoProUUID.CQ_QUERY
- class BleStatusMessageBase(BleMessage):
- """An individual camera status that is interacted with via BLE.
- Args:
- uuid (BleUUID): UUID to access this status.
- identifier (StatusId | QueryCmdId): How responses to operations on this message will be identified.
- status_id (StatusId): Status identifier. May match identifier in some cases.
- builder (Callable[[Any], bytearray]): Build request bytes from the current message.
- """
- def __init__(
- self,
- uuid: BleUUID,
- identifier: StatusId | QueryCmdId,
- status_id: StatusId,
- builder: Callable[[Any], bytearray],
- ) -> None:
- self._build = builder
- self._status_id = status_id
- super().__init__(uuid, identifier, None) # type: ignore
- def _build_data(self, **kwargs: Any) -> bytearray:
- return self._build(self, **kwargs)
- def _as_dict(self, **kwargs: Any) -> JsonDict:
- return {"id": self._identifier, "status_id": self._status_id, **self._base_dict} | kwargs
- def __init__(self, communicator: GoProBle, identifier: StatusId, parser: QueryParserType) -> None:
- # TODO abstract this
- parser_builder = Parser[CameraState]()
- # Is it a protobuf enum?
- if isinstance(parser, construct.Construct):
- parser_builder.byte_json_adapter = ConstructByteParserBuilder(parser)
- elif isinstance(parser, BytesParserBuilder):
- parser_builder.byte_json_adapter = parser
- elif issubclass(parser, GoProIntEnum):
- parser_builder.byte_json_adapter = GoProEnumByteParserBuilder(parser)
- else:
- raise TypeError(f"Unexpected {parser_builder=}")
- GlobalParsers.add(identifier, parser_builder)
- self._communicator = communicator
- self._identifier = identifier
- def __str__(self) -> str:
- return str(self._identifier).lower().replace("_", " ").title()
- async def get_value(self) -> GoProResp[T]:
- """Get the current value of a status.
- Returns:
- GoProResp[T]: current status value
- """
- message = BleStatusFacade.BleStatusMessageBase(
- BleStatusFacade.UUID,
- QueryCmdId.GET_STATUS_VAL,
- self._identifier,
- lambda *args: self._build_cmd(QueryCmdId.GET_STATUS_VAL),
- )
- return await self._communicator._send_ble_message(message)
- async def get_value_observable(self) -> ResultE[GoProObservable[T]]:
- """Register for asynchronous notifications when a status changes.
- Returns:
- ResultE[GoProObservable[T]]: current status value
- """
- register_message = BleStatusFacade.BleStatusMessageBase(
- BleStatusFacade.UUID,
- QueryCmdId.REG_STATUS_VAL_UPDATE,
- self._identifier,
- lambda *args: self._build_cmd(QueryCmdId.REG_STATUS_VAL_UPDATE),
- )
- unregister_message = BleStatusFacade.BleStatusMessageBase(
- BleStatusFacade.UUID,
- QueryCmdId.UNREG_STATUS_VAL_UPDATE,
- self._identifier,
- lambda *args: self._build_cmd(QueryCmdId.UNREG_STATUS_VAL_UPDATE),
- )
- return ResultE.from_value(
- await GoProObservable[T](
- gopro=self._communicator,
- update=self._identifier,
- register_command=self._communicator._send_ble_message(register_message),
- unregister_command=self._communicator._send_ble_message(unregister_message),
- ).start()
- )
- def _build_cmd(self, cmd: QueryCmdId) -> bytearray:
- """Build the data for a given status command.
- Args:
- cmd (QueryCmdId): command to build data for
- Returns:
- bytearray: data to send over-the-air
- """
- return bytearray([cmd.value, int(self._identifier)])
- ######################################################## HTTP #################################################
- def http_get_json_command(
- endpoint: str,
- components: list[str] | None = None,
- arguments: list[str] | None = None,
- parser: Parser | None = None,
- identifier: str | None = None,
- rules: MessageRules = MessageRules(),
- ) -> Callable:
- """Decorator to build and encapsulate a an Http Message that performs a GET to return JSON.
- Args:
- endpoint (str): base endpoint
- components (list[str] | None): Additional path components (i.e. endpoint/{COMPONENT}). Defaults to None.
- arguments (list[str] | None): Any arguments to be appended after endpoint (i.e. endpoint?{ARGUMENT}). Defaults to None.
- parser (Parser | None): Parser to handle received JSON. Defaults to None.
- identifier (str | None): explicit message identifier. If None, will be generated from endpoint.
- rules (MessageRules): rules this Message must obey. Defaults to MessageRules().
- Returns:
- Callable: built callable to perform operation
- """
- message = HttpMessage(
- endpoint=endpoint, identifier=identifier, components=components, arguments=arguments, parser=parser
- )
- @wrapt.decorator
- async def wrapper(wrapped: Callable, instance: HttpMessages, _: Any, kwargs: Any) -> GoProResp:
- return await instance._communicator._get_json(message, rules=rules, **(await wrapped(**kwargs) or kwargs))
- return wrapper
- def http_get_binary_command(
- endpoint: str,
- components: list[str] | None = None,
- arguments: list[str] | None = None,
- parser: Parser | None = None,
- identifier: str | None = None,
- rules: MessageRules = MessageRules(),
- ) -> Callable:
- """Decorator to build and encapsulate a an Http Message that performs a GET to return a binary.
- Args:
- endpoint (str): base endpoint
- components (list[str] | None): Additional path components (i.e. endpoint/{COMPONENT}). Defaults to None.
- arguments (list[str] | None): Any arguments to be appended after endpoint (i.e. endpoint?{ARGUMENT}). Defaults to None.
- parser (Parser | None): Parser to handle received JSON. Defaults to None.
- identifier (str | None): explicit message identifier. If None, will be generated from endpoint.
- rules (MessageRules): rules this Message must obey. Defaults to MessageRules().
- Returns:
- Callable: built callable to perform operation
- """
- message = HttpMessage(
- endpoint=endpoint, identifier=identifier, components=components, arguments=arguments, parser=parser
- )
- @wrapt.decorator
- async def wrapper(wrapped: Callable, instance: HttpMessages, _: Any, kwargs: Any) -> GoProResp:
- kwargs = await wrapped(**kwargs) or kwargs
- # If no local file was passed, used the file name of the camera file
- kwargs["local_file"] = (
- kwargs.pop("local_file") if "local_file" in kwargs else Path(kwargs["camera_file"].split("/")[-1])
- )
- return await instance._communicator._get_stream(message, rules=rules, **kwargs)
- return wrapper
- def http_put_json_command(
- endpoint: str,
- components: list[str] | None = None,
- arguments: list[str] | None = None,
- body_args: list[str] | None = None,
- parser: Parser | None = None,
- identifier: str | None = None,
- rules: MessageRules = MessageRules(),
- ) -> Callable:
- """Decorator to build and encapsulate a an Http Message that performs a PUT to return JSON.
- Args:
- endpoint (str): base endpoint
- components (list[str] | None): Additional path components (i.e. endpoint/{COMPONENT}). Defaults to None.
- arguments (list[str] | None): Any arguments to be appended after endpoint (i.e. endpoint?{ARGUMENT}). Defaults to None.
- body_args (list[str] | None): Arguments to be added to the body JSON. Defaults to None.
- parser (Parser | None): Parser to handle received JSON. Defaults to None.
- identifier (str | None): explicit message identifier. If None, will be generated from endpoint.
- rules (MessageRules): rules this Message must obey. Defaults to MessageRules().
- Returns:
- Callable: built callable to perform operation
- """
- message = HttpMessage(
- endpoint=endpoint,
- identifier=identifier,
- body_args=body_args,
- arguments=arguments,
- components=components,
- parser=parser,
- )
- @wrapt.decorator
- async def wrapper(wrapped: Callable, instance: HttpMessages, _: Any, kwargs: Any) -> GoProResp:
- return await instance._communicator._put_json(message, rules=rules, **(await wrapped(**kwargs) or kwargs))
- return wrapper
- class HttpSetting(HttpMessage, Generic[T]):
- """An individual camera setting that is interacted with via Wifi."""
- def __init__(self, communicator: GoProHttp, identifier: SettingId) -> None:
- super().__init__("gopro/camera/setting?setting={setting}&option={option}", identifier)
- self._communicator = communicator
- # Note! It is assumed that BLE and HTTP settings are symmetric so we only add to the communicator's
- # parser in the BLE Setting.
- def __str__(self) -> str:
- return str(self._identifier).lower().replace("_", " ").title()
- def build_url(self, **kwargs: Any) -> str:
- """Build the endpoint from the current arguments
- Args:
- **kwargs (Any): run-time arguments
- Returns:
- str: built URL
- """
- assert not isinstance(self._identifier, ProtobufId) # needed to satisfy typing
- return self._endpoint.format(setting=int(self._identifier), option=int(kwargs["value"]))
- async def set(self, value: T) -> GoProResp:
- """Set the value of the setting.
- Args:
- value (T): value to set setting
- Returns:
- GoProResp: Status of set
- """
- response = await self._communicator._get_json(self, value=value)
- response.identifier = self._identifier
- logger.info(Logger.build_log_rx_str(response))
- return response
|