builders.py 35 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908
  1. # builders.py/Open GoPro, Version 2.0 (C) Copyright 2021 GoPro, Inc. (http://gopro.com/OpenGoPro).
  2. # This copyright was auto-generated on Tue Sep 7 21:35:53 UTC 2021
  3. """Common functionality across API versions to build commands, settings, and statuses"""
  4. from __future__ import annotations
  5. import enum
  6. import logging
  7. from collections.abc import Iterable
  8. from dataclasses import dataclass
  9. from pathlib import Path
  10. from typing import Any, Callable, Final, Generic, Protocol, TypeVar, Union
  11. import construct
  12. import wrapt
  13. from returns.result import ResultE
  14. from open_gopro.domain.communicator_interface import (
  15. BleMessage,
  16. BleMessages,
  17. GoProBle,
  18. GoProHttp,
  19. HttpMessage,
  20. HttpMessages,
  21. MessageRules,
  22. )
  23. from open_gopro.domain.enum import GoProIntEnum
  24. from open_gopro.domain.exceptions import GoProError
  25. from open_gopro.domain.gopro_observable import GoProCompositeObservable, GoProObservable
  26. from open_gopro.domain.parser_interface import (
  27. BytesBuilder,
  28. BytesParserBuilder,
  29. GlobalParsers,
  30. Parser,
  31. )
  32. from open_gopro.models import GoProResp
  33. from open_gopro.models.constants import (
  34. ActionId,
  35. CmdId,
  36. FeatureId,
  37. GoProUUID,
  38. QueryCmdId,
  39. SettingId,
  40. StatusId,
  41. )
  42. from open_gopro.models.types import CameraState, JsonDict, Protobuf, ProtobufId
  43. from open_gopro.network.ble import BleUUID
  44. from open_gopro.parsers.bytes import (
  45. ConstructByteParserBuilder,
  46. GoProEnumByteParserBuilder,
  47. ProtobufByteParser,
  48. )
  49. from open_gopro.util.logger import Logger
  50. logger = logging.getLogger(__name__)
  51. QueryParserType = Union[construct.Construct, type[GoProIntEnum], BytesParserBuilder]
  52. ######################################################## BLE #################################################
  53. T = TypeVar("T")
  54. class BleReadCommand(BleMessage):
  55. """A BLE command that reads data from a BleUUID
  56. Args:
  57. uuid (BleUUID): BleUUID to read from
  58. parser (Parser): the parser that will parse the received bytestream into a JSON dict
  59. """
  60. def __init__(self, uuid: BleUUID, parser: Parser) -> None:
  61. super().__init__(uuid=uuid, parser=parser, identifier=uuid)
  62. def _build_data(self, **kwargs: Any) -> bytearray:
  63. # Read commands do not have data
  64. raise NotImplementedError
  65. def __str__(self) -> str:
  66. return f"Read {self._uuid.name.lower().replace('_', ' ').title()}"
  67. def _as_dict(self, **kwargs: Any) -> JsonDict:
  68. """Return the attributes of the command as a dict
  69. Args:
  70. **kwargs (Any): additional entries for the dict
  71. Returns:
  72. JsonDict: command as dict
  73. """
  74. return {"id": self._uuid, **self._base_dict} | kwargs
  75. class BleWriteCommand(BleMessage):
  76. """A BLE command that writes to a BleUUID and retrieves responses by accumulating notifications
  77. Args:
  78. uuid (BleUUID): UUID to write to
  79. cmd (CmdId): command identifier
  80. param_builder (BytesBuilder | None): builds bytes from params. Defaults to None.
  81. parser (Parser | None): response parser to parse received bytes. Defaults to None.
  82. rules (MessageRules): rules this Message must obey. Defaults to MessageRules().
  83. """
  84. def __init__(
  85. self,
  86. uuid: BleUUID,
  87. cmd: CmdId,
  88. param_builder: BytesBuilder | None = None,
  89. parser: Parser | None = None,
  90. rules: MessageRules = MessageRules(),
  91. ) -> None:
  92. self.param_builder = param_builder
  93. self.cmd = cmd
  94. self.rules = rules
  95. super().__init__(uuid, cmd, parser)
  96. def _build_data(self, **kwargs: Any) -> bytearray:
  97. data = bytearray([self.cmd.value])
  98. params = bytearray()
  99. if self.param_builder:
  100. params.extend(self.param_builder.build(*kwargs.values()))
  101. else:
  102. for arg in kwargs.values():
  103. params.extend(arg.value if isinstance(arg, enum.Enum) else arg)
  104. if params:
  105. data.append(len(params))
  106. data.extend(params)
  107. return data
  108. def __str__(self) -> str:
  109. return self.cmd.name.lower().replace("_", " ").removeprefix("cmdid").title()
  110. def _as_dict(self, **kwargs: Any) -> JsonDict:
  111. """Return the attributes of the command as a dict
  112. Args:
  113. **kwargs (Any): additional entries for the dict
  114. Returns:
  115. JsonDict: command as dict
  116. """
  117. return {"id": self.cmd, **self._base_dict} | kwargs
  118. class RegisterUnregisterAll(BleWriteCommand):
  119. """Base class for register / unregister all commands
  120. This will loop over all of the elements (i.e. settings / statuses found from the element_set entry of the
  121. producer tuple parameter) and individually register / unregister (depending on the action parameter) each
  122. element in the set
  123. Args:
  124. uuid (BleUUID): UUID to write to
  125. cmd (CmdId): Command ID that is being sent
  126. update_set (type[SettingId] | type[StatusId]): what are registering / unregistering for?
  127. action (Action): whether to register or unregister
  128. parser (Parser | None): Optional response parser. Defaults to None.
  129. """
  130. class Action(enum.Enum):
  131. """Enum to differentiate between register actions"""
  132. REGISTER = enum.auto()
  133. UNREGISTER = enum.auto()
  134. def __init__(
  135. self,
  136. uuid: BleUUID,
  137. cmd: CmdId,
  138. update_set: type[SettingId] | type[StatusId],
  139. action: Action,
  140. parser: Parser | None = None,
  141. ) -> None:
  142. self.action = action
  143. self.update_set = update_set
  144. super().__init__(uuid=uuid, cmd=cmd, parser=parser)
  145. def _build_data(self, **kwargs: Any) -> bytearray:
  146. return bytearray([self.cmd.value])
  147. class BleProtoCommand(BleMessage):
  148. """A BLE command that is sent and received as using the Protobuf protocol
  149. Args:
  150. uuid (BleUUID): BleUUID to write to
  151. feature_id (FeatureId): Feature ID that is being executed
  152. action_id (ActionId): protobuf specific action ID that is being executed
  153. response_action_id (ActionId): the action ID that will be in the response to this command
  154. request_proto (type[Protobuf]): the action ID that will be in the response
  155. response_proto (type[Protobuf]): protobuf used to parse received bytestream
  156. parser (Parser | None): Optional response parser. Defaults to None.
  157. additional_matching_ids (set[ProtobufId | CmdId] | None): Other action ID's to share this parser. This is used, for
  158. example, if a notification shares the same ID as the synchronous response. Defaults to None. Defaults to None.
  159. """
  160. def __init__(
  161. self,
  162. uuid: BleUUID,
  163. feature_id: FeatureId,
  164. action_id: ActionId,
  165. response_action_id: ActionId,
  166. request_proto: type[Protobuf],
  167. response_proto: type[Protobuf],
  168. parser: Parser | None,
  169. additional_matching_ids: set[ProtobufId | CmdId] | None = None,
  170. ) -> None:
  171. p = parser or Parser()
  172. p.byte_json_adapter = ProtobufByteParser(response_proto)
  173. super().__init__(uuid=uuid, parser=p, identifier=ProtobufId(feature_id, response_action_id))
  174. self.feature_id = feature_id
  175. self.action_id = action_id
  176. self.response_action_id = response_action_id
  177. self.request_proto = request_proto
  178. self.response_proto = response_proto
  179. self.additional_matching_ids: set[ProtobufId | CmdId] = additional_matching_ids or set()
  180. assert self._parser
  181. for matching_id in [*self.additional_matching_ids, ProtobufId(feature_id, response_action_id)]:
  182. GlobalParsers.add(matching_id, self._parser)
  183. GlobalParsers.add_feature_action_id_mapping(self.feature_id, self.response_action_id)
  184. def _build_data(self, **kwargs: Any) -> bytearray:
  185. """Build the byte data to prepare for command sending
  186. Args:
  187. **kwargs (Any): arguments to command to use to build protobuf
  188. Returns:
  189. bytearray: built byte data
  190. """
  191. proto = self.request_proto()
  192. for attr_name, arg in kwargs.items():
  193. value = arg.value if issubclass(type(arg), enum.Enum) else arg
  194. attr = getattr(proto, attr_name)
  195. # Protobuf "repeatable" (i.e. iterable) fields can not be set directly and must be appended / extended
  196. if isinstance(attr, Iterable) and not isinstance(value, (str, bytes)):
  197. if isinstance(value, Iterable):
  198. for element in value:
  199. attr.append(element.value if isinstance(element, enum.Enum) else element) # type: ignore
  200. else:
  201. attr.append(value.value if isinstance(value, enum.Enum) else value) # type:ignore
  202. else:
  203. setattr(proto, attr_name, value)
  204. # Prepend headers and serialize
  205. return bytearray([self.feature_id.value, self.action_id.value, *proto.SerializeToString()])
  206. def __str__(self) -> str:
  207. return self.action_id.name.lower().replace("_", " ").removeprefix("actionid").title()
  208. def _as_dict(self, **kwargs: Any) -> JsonDict:
  209. """Return the attributes of the command as a dict
  210. Args:
  211. **kwargs (Any): additional entries for the dict
  212. Returns:
  213. JsonDict: command as dict
  214. """
  215. return {"id": self.action_id, "feature_id": self.feature_id, **self._base_dict} | kwargs
  216. def ble_write_command(
  217. uuid: BleUUID,
  218. cmd: CmdId,
  219. param_builder: BytesBuilder | None = None,
  220. parser: Parser | None = None,
  221. rules: MessageRules = MessageRules(),
  222. ) -> Callable:
  223. """Decorator to build and encapsulate a BleWriteCommand in a Callable
  224. Args:
  225. uuid (BleUUID): UUID to write to
  226. cmd (CmdId): command identifier
  227. param_builder (BytesBuilder | None): builds bytes from params. Defaults to None.
  228. parser (Parser | None): response parser to parse received bytes. Defaults to None.
  229. rules (MessageRules): rules this Message must obey. Defaults to MessageRules().
  230. Returns:
  231. Callable: built callable to perform operation
  232. """
  233. message = BleWriteCommand(uuid, cmd, param_builder, parser)
  234. @wrapt.decorator
  235. async def wrapper(wrapped: Callable, instance: BleMessages, _: Any, kwargs: Any) -> GoProResp:
  236. return await instance._communicator._send_ble_message(message, rules, **(await wrapped(**kwargs) or kwargs))
  237. return wrapper
  238. def ble_read_command(uuid: BleUUID, parser: Parser) -> Callable:
  239. """Decorator to build a BleReadCommand and wrapper to execute it
  240. Args:
  241. uuid (BleUUID): BleUUID to read from
  242. parser (Parser): the parser that will parse the received bytestream into a JSON dict
  243. Returns:
  244. Callable: Generated method to perform command
  245. """
  246. message = BleReadCommand(uuid, parser)
  247. @wrapt.decorator
  248. async def wrapper(wrapped: Callable, instance: BleMessages, _: Any, kwargs: Any) -> GoProResp:
  249. return await instance._communicator._read_ble_characteristic(message, **(await wrapped(**kwargs) or kwargs))
  250. return wrapper
  251. def ble_register_command(
  252. uuid: BleUUID,
  253. cmd: CmdId,
  254. update_set: type[SettingId] | type[StatusId],
  255. parser: Parser | None = None,
  256. ) -> Callable:
  257. """Decorator to build a RegisterUnregisterAll command and wrapper to execute it
  258. Args:
  259. uuid (BleUUID): UUID to write to
  260. cmd (CmdId): Command ID that is being sent
  261. update_set (type[SettingId] | type[StatusId]): set of ID's being registered for
  262. parser (Parser | None): Optional response parser. Defaults to None.
  263. Returns:
  264. Callable: Generated method to perform command
  265. """
  266. register_message = RegisterUnregisterAll(uuid, cmd, update_set, RegisterUnregisterAll.Action.REGISTER, parser)
  267. unregister_message = RegisterUnregisterAll(uuid, cmd, update_set, RegisterUnregisterAll.Action.UNREGISTER, parser)
  268. @wrapt.decorator
  269. async def wrapper(
  270. wrapped: Callable, instance: BleMessages, _: Any, kwargs: Any
  271. ) -> ResultE[GoProCompositeObservable]:
  272. internal_update_type = (
  273. GoProBle._CompositeRegisterType.ALL_STATUSES
  274. if update_set == StatusId
  275. else GoProBle._CompositeRegisterType.ALL_SETTINGS
  276. )
  277. try:
  278. return ResultE.from_value(
  279. await GoProCompositeObservable(
  280. gopro=instance._communicator,
  281. update=internal_update_type,
  282. register_command=instance._communicator._send_ble_message(
  283. register_message, **(await wrapped(**kwargs) or kwargs)
  284. ),
  285. unregister_command=instance._communicator._send_ble_message(
  286. unregister_message, **(await wrapped(**kwargs) or kwargs)
  287. ),
  288. ).start()
  289. )
  290. except GoProError as e:
  291. logger.error(f"Failed to register for {update_set} ==> {e}")
  292. return ResultE.from_failure(e)
  293. return wrapper
  294. def ble_proto_command(
  295. uuid: BleUUID,
  296. feature_id: FeatureId,
  297. action_id: ActionId,
  298. response_action_id: ActionId,
  299. request_proto: type[Protobuf],
  300. response_proto: type[Protobuf],
  301. parser: Parser | None = None,
  302. additional_matching_ids: set[ProtobufId | CmdId] | None = None,
  303. rules: MessageRules = MessageRules(),
  304. ) -> Callable:
  305. """Decorator to build a BLE Protobuf command and wrapper to execute it
  306. Args:
  307. uuid (BleUUID): BleUUID to write to
  308. feature_id (FeatureId): Feature ID that is being executed
  309. action_id (ActionId): protobuf specific action ID that is being executed
  310. response_action_id (ActionId): the action ID that will be in the response to this command
  311. request_proto (type[Protobuf]): the action ID that will be in the response
  312. response_proto (type[Protobuf]): protobuf used to parse received bytestream
  313. parser (Parser | None): Response parser to transform received Protobuf bytes. Defaults to None.
  314. additional_matching_ids (set[ProtobufId | CmdId] | None): Other action ID's to share this parser. This is used,
  315. for example, if a notification shares the same ID as the synchronous response. Defaults to None.
  316. rules (MessageRules): rules that describe sending / receiving the message. Defaults to MessageRules() (no rules).
  317. Returns:
  318. Callable: Generated method to perform command
  319. """
  320. message = BleProtoCommand(
  321. uuid,
  322. feature_id,
  323. action_id,
  324. response_action_id,
  325. request_proto,
  326. response_proto,
  327. parser,
  328. additional_matching_ids,
  329. )
  330. @wrapt.decorator
  331. async def wrapper(wrapped: Callable, instance: BleMessages, _: Any, kwargs: Any) -> GoProResp:
  332. return await instance._communicator._send_ble_message(message, rules, **(await wrapped(**kwargs) or kwargs))
  333. return wrapper
  334. @dataclass
  335. class BleAsyncResponse:
  336. """A BLE protobuf response that is not associated with any message.
  337. Attributes:
  338. feature_id (FeatureId): Feature ID that response corresponds to
  339. action_id (ActionId): Action ID that response corresponds to
  340. parser (Parser): response parser
  341. """
  342. feature_id: FeatureId
  343. action_id: ActionId
  344. parser: Parser
  345. def __str__(self) -> str:
  346. return self.action_id.name.lower().replace("_", " ").removeprefix("actionid").title()
  347. class BuilderProtocol(Protocol):
  348. """Protocol definition of data building methods"""
  349. def __call__(self, **kwargs: Any) -> bytearray: # noqa: D102
  350. ...
  351. class BleSettingFacade(Generic[T]):
  352. """Wrapper around BleSetting since a BleSetting's message definition changes based on how it is being operated on.
  353. Raises:
  354. TypeError: Parser builder is not a valid type
  355. Attributes:
  356. SETTER_UUID (Final[BleUUID]): UUID used to perform set operation
  357. READER_UUID (Final[BleUUID]): UUID used to perform read operation
  358. Args:
  359. communicator (GoProBle): BLE communicator that will operate on this object.
  360. identifier (SettingId): Setting Identifier
  361. parser_builder (QueryParserType): Parses responses from bytes and builds requests to bytes.
  362. """
  363. SETTER_UUID: Final[BleUUID] = GoProUUID.CQ_SETTINGS
  364. READER_UUID: Final[BleUUID] = GoProUUID.CQ_QUERY
  365. class BleSettingMessageBase(BleMessage):
  366. """Actual BLE Setting Message that is wrapped by the facade.
  367. Args:
  368. uuid (BleUUID): UUID to access this setting.
  369. identifier (SettingId | QueryCmdId): How responses to operations on this message will be identified.
  370. setting_id (SettingId): Setting identifier. May match identifier in some cases.
  371. builder (BuilderProtocol): Build request bytes from the current message.
  372. """
  373. def __init__(
  374. self,
  375. uuid: BleUUID,
  376. identifier: SettingId | QueryCmdId,
  377. setting_id: SettingId,
  378. builder: BuilderProtocol,
  379. ) -> None:
  380. self._build = builder
  381. self._setting_id = setting_id
  382. super().__init__(uuid, identifier, None) # type: ignore
  383. def _build_data(self, **kwargs: Any) -> bytearray:
  384. return self._build(**kwargs)
  385. def _as_dict(self, **kwargs: Any) -> JsonDict:
  386. d = {"id": self._identifier, "setting_id": self._setting_id, **self._base_dict} | kwargs
  387. return d
  388. def __init__(self, communicator: GoProBle, identifier: SettingId, parser_builder: QueryParserType) -> None:
  389. # TODO abstract this
  390. parser = Parser[CameraState]()
  391. if isinstance(parser_builder, construct.Construct):
  392. parser.byte_json_adapter = ConstructByteParserBuilder(parser_builder)
  393. elif isinstance(parser_builder, BytesParserBuilder):
  394. parser.byte_json_adapter = parser_builder
  395. elif issubclass(parser_builder, GoProIntEnum):
  396. parser.byte_json_adapter = GoProEnumByteParserBuilder(parser_builder)
  397. else:
  398. raise TypeError(f"Unexpected {parser_builder=}")
  399. GlobalParsers.add(identifier, parser)
  400. self._identifier = identifier
  401. self._builder = parser.byte_json_adapter
  402. self._communicator = communicator
  403. def _build_cmd(self, cmd: QueryCmdId) -> bytearray:
  404. """Build the data
  405. Args:
  406. cmd (QueryCmdId): query command
  407. Returns:
  408. bytearray: built data
  409. """
  410. return bytearray([cmd.value, int(self._identifier)])
  411. async def set(self, value: T) -> GoProResp[None]:
  412. """Set the value of the setting.
  413. Args:
  414. value (T): The argument to use to set the setting value.
  415. Returns:
  416. GoProResp[None]: Status of set
  417. """
  418. def _build_data(**kwargs: Any) -> bytearray:
  419. # Special case. Can't use _send_query
  420. data = bytearray([int(self._identifier)])
  421. try:
  422. param = self._builder.build(kwargs["value"])
  423. data.extend([len(param), *param])
  424. except IndexError:
  425. pass
  426. return data
  427. message = BleSettingFacade.BleSettingMessageBase(
  428. BleSettingFacade.SETTER_UUID,
  429. self._identifier,
  430. self._identifier,
  431. lambda **_: _build_data(value=value),
  432. )
  433. return await self._communicator._send_ble_message(message)
  434. async def get_value(self) -> GoProResp[T]:
  435. """Get the settings value.
  436. Returns:
  437. GoProResp[T]: settings value
  438. """
  439. message = BleSettingFacade.BleSettingMessageBase(
  440. BleSettingFacade.READER_UUID,
  441. QueryCmdId.GET_SETTING_VAL,
  442. self._identifier,
  443. lambda **_: self._build_cmd(QueryCmdId.GET_SETTING_VAL),
  444. )
  445. return await self._communicator._send_ble_message(message)
  446. async def get_name(self) -> GoProResp[str]:
  447. """Get the settings name.
  448. Raises:
  449. NotImplementedError: This isn't implemented on the camera
  450. Returns:
  451. GoProResp[str]: setting name as string
  452. """
  453. raise NotImplementedError("Not implemented on camera!")
  454. async def get_capabilities_values(self) -> GoProResp[list[T]]:
  455. """Get currently supported settings capabilities values.
  456. Returns:
  457. GoProResp[list[T]]: settings capabilities values
  458. """
  459. message = BleSettingFacade.BleSettingMessageBase(
  460. BleSettingFacade.READER_UUID,
  461. QueryCmdId.GET_CAPABILITIES_VAL,
  462. self._identifier,
  463. lambda **_: self._build_cmd(QueryCmdId.GET_CAPABILITIES_VAL),
  464. )
  465. return await self._communicator._send_ble_message(message)
  466. async def get_capabilities_names(self) -> GoProResp[list[str]]:
  467. """Get currently supported settings capabilities names.
  468. Raises:
  469. NotImplementedError: This isn't implemented on the camera
  470. Returns:
  471. GoProResp[list[str]]: list of capability names as strings
  472. """
  473. raise NotImplementedError("Not implemented on camera!")
  474. async def get_value_observable(self) -> ResultE[GoProObservable[T]]:
  475. """Receive an observable of asynchronously notified setting values.
  476. Returns:
  477. ResultE[GoProObservable[T]]: data observable if successful otherwise an error
  478. """
  479. register_message = BleSettingFacade.BleSettingMessageBase(
  480. BleSettingFacade.READER_UUID,
  481. QueryCmdId.REG_SETTING_VAL_UPDATE,
  482. self._identifier,
  483. lambda **_: self._build_cmd(QueryCmdId.REG_SETTING_VAL_UPDATE),
  484. )
  485. unregister_message = BleSettingFacade.BleSettingMessageBase(
  486. BleSettingFacade.READER_UUID,
  487. QueryCmdId.UNREG_SETTING_VAL_UPDATE,
  488. self._identifier,
  489. lambda **_: self._build_cmd(QueryCmdId.UNREG_SETTING_VAL_UPDATE),
  490. )
  491. return ResultE.from_value(
  492. await GoProObservable[T](
  493. gopro=self._communicator,
  494. update=self._identifier,
  495. register_command=self._communicator._send_ble_message(register_message),
  496. unregister_command=self._communicator._send_ble_message(unregister_message),
  497. ).start()
  498. )
  499. async def get_capabilities_observable(self) -> ResultE[GoProObservable[list[T]]]:
  500. """Receive an observable of asynchronously notified lists of setting value capabilities.
  501. Returns:
  502. ResultE[GoProObservable[list[T]]]: data observable if successful otherwise an error
  503. """
  504. register_message = BleSettingFacade.BleSettingMessageBase(
  505. BleSettingFacade.READER_UUID,
  506. QueryCmdId.REG_CAPABILITIES_UPDATE,
  507. self._identifier,
  508. lambda **_: self._build_cmd(QueryCmdId.REG_CAPABILITIES_UPDATE),
  509. )
  510. unregister_message = BleSettingFacade.BleSettingMessageBase(
  511. BleSettingFacade.READER_UUID,
  512. QueryCmdId.UNREG_CAPABILITIES_UPDATE,
  513. self._identifier,
  514. lambda **_: self._build_cmd(QueryCmdId.UNREG_CAPABILITIES_UPDATE),
  515. )
  516. return ResultE.from_value(
  517. await GoProObservable[list[T]](
  518. gopro=self._communicator,
  519. update=self._identifier,
  520. register_command=self._communicator._send_ble_message(register_message),
  521. unregister_command=self._communicator._send_ble_message(unregister_message),
  522. ).start()
  523. )
  524. def __str__(self) -> str:
  525. return str(self._identifier).lower().replace("_", " ").title()
  526. class BleStatusFacade(Generic[T]):
  527. """Wrapper around BleStatus since a BleStatus's message definition changes based on how it is being operated on.
  528. Attributes:
  529. UUID (Final[BleUUID]): attribute ID used to perform set operation
  530. Args:
  531. communicator (GoProBle): BLE communicator that will operate on this object.
  532. identifier (StatusId): Status identifier
  533. parser (QueryParserType): Parser responses from bytes
  534. Raises:
  535. TypeError: Attempted to pass an invalid parser type
  536. """
  537. UUID: Final[BleUUID] = GoProUUID.CQ_QUERY
  538. class BleStatusMessageBase(BleMessage):
  539. """An individual camera status that is interacted with via BLE.
  540. Args:
  541. uuid (BleUUID): UUID to access this status.
  542. identifier (StatusId | QueryCmdId): How responses to operations on this message will be identified.
  543. status_id (StatusId): Status identifier. May match identifier in some cases.
  544. builder (Callable[[Any], bytearray]): Build request bytes from the current message.
  545. """
  546. def __init__(
  547. self,
  548. uuid: BleUUID,
  549. identifier: StatusId | QueryCmdId,
  550. status_id: StatusId,
  551. builder: Callable[[Any], bytearray],
  552. ) -> None:
  553. self._build = builder
  554. self._status_id = status_id
  555. super().__init__(uuid, identifier, None) # type: ignore
  556. def _build_data(self, **kwargs: Any) -> bytearray:
  557. return self._build(self, **kwargs)
  558. def _as_dict(self, **kwargs: Any) -> JsonDict:
  559. return {"id": self._identifier, "status_id": self._status_id, **self._base_dict} | kwargs
  560. def __init__(self, communicator: GoProBle, identifier: StatusId, parser: QueryParserType) -> None:
  561. # TODO abstract this
  562. parser_builder = Parser[CameraState]()
  563. # Is it a protobuf enum?
  564. if isinstance(parser, construct.Construct):
  565. parser_builder.byte_json_adapter = ConstructByteParserBuilder(parser)
  566. elif isinstance(parser, BytesParserBuilder):
  567. parser_builder.byte_json_adapter = parser
  568. elif issubclass(parser, GoProIntEnum):
  569. parser_builder.byte_json_adapter = GoProEnumByteParserBuilder(parser)
  570. else:
  571. raise TypeError(f"Unexpected {parser_builder=}")
  572. GlobalParsers.add(identifier, parser_builder)
  573. self._communicator = communicator
  574. self._identifier = identifier
  575. def __str__(self) -> str:
  576. return str(self._identifier).lower().replace("_", " ").title()
  577. async def get_value(self) -> GoProResp[T]:
  578. """Get the current value of a status.
  579. Returns:
  580. GoProResp[T]: current status value
  581. """
  582. message = BleStatusFacade.BleStatusMessageBase(
  583. BleStatusFacade.UUID,
  584. QueryCmdId.GET_STATUS_VAL,
  585. self._identifier,
  586. lambda *args: self._build_cmd(QueryCmdId.GET_STATUS_VAL),
  587. )
  588. return await self._communicator._send_ble_message(message)
  589. async def get_value_observable(self) -> ResultE[GoProObservable[T]]:
  590. """Register for asynchronous notifications when a status changes.
  591. Returns:
  592. ResultE[GoProObservable[T]]: current status value
  593. """
  594. register_message = BleStatusFacade.BleStatusMessageBase(
  595. BleStatusFacade.UUID,
  596. QueryCmdId.REG_STATUS_VAL_UPDATE,
  597. self._identifier,
  598. lambda *args: self._build_cmd(QueryCmdId.REG_STATUS_VAL_UPDATE),
  599. )
  600. unregister_message = BleStatusFacade.BleStatusMessageBase(
  601. BleStatusFacade.UUID,
  602. QueryCmdId.UNREG_STATUS_VAL_UPDATE,
  603. self._identifier,
  604. lambda *args: self._build_cmd(QueryCmdId.UNREG_STATUS_VAL_UPDATE),
  605. )
  606. return ResultE.from_value(
  607. await GoProObservable[T](
  608. gopro=self._communicator,
  609. update=self._identifier,
  610. register_command=self._communicator._send_ble_message(register_message),
  611. unregister_command=self._communicator._send_ble_message(unregister_message),
  612. ).start()
  613. )
  614. def _build_cmd(self, cmd: QueryCmdId) -> bytearray:
  615. """Build the data for a given status command.
  616. Args:
  617. cmd (QueryCmdId): command to build data for
  618. Returns:
  619. bytearray: data to send over-the-air
  620. """
  621. return bytearray([cmd.value, int(self._identifier)])
  622. ######################################################## HTTP #################################################
  623. def http_get_json_command(
  624. endpoint: str,
  625. components: list[str] | None = None,
  626. arguments: list[str] | None = None,
  627. parser: Parser | None = None,
  628. identifier: str | None = None,
  629. rules: MessageRules = MessageRules(),
  630. ) -> Callable:
  631. """Decorator to build and encapsulate a an Http Message that performs a GET to return JSON.
  632. Args:
  633. endpoint (str): base endpoint
  634. components (list[str] | None): Additional path components (i.e. endpoint/{COMPONENT}). Defaults to None.
  635. arguments (list[str] | None): Any arguments to be appended after endpoint (i.e. endpoint?{ARGUMENT}). Defaults to None.
  636. parser (Parser | None): Parser to handle received JSON. Defaults to None.
  637. identifier (str | None): explicit message identifier. If None, will be generated from endpoint.
  638. rules (MessageRules): rules this Message must obey. Defaults to MessageRules().
  639. Returns:
  640. Callable: built callable to perform operation
  641. """
  642. message = HttpMessage(
  643. endpoint=endpoint, identifier=identifier, components=components, arguments=arguments, parser=parser
  644. )
  645. @wrapt.decorator
  646. async def wrapper(wrapped: Callable, instance: HttpMessages, _: Any, kwargs: Any) -> GoProResp:
  647. return await instance._communicator._get_json(message, rules=rules, **(await wrapped(**kwargs) or kwargs))
  648. return wrapper
  649. def http_get_binary_command(
  650. endpoint: str,
  651. components: list[str] | None = None,
  652. arguments: list[str] | None = None,
  653. parser: Parser | None = None,
  654. identifier: str | None = None,
  655. rules: MessageRules = MessageRules(),
  656. ) -> Callable:
  657. """Decorator to build and encapsulate a an Http Message that performs a GET to return a binary.
  658. Args:
  659. endpoint (str): base endpoint
  660. components (list[str] | None): Additional path components (i.e. endpoint/{COMPONENT}). Defaults to None.
  661. arguments (list[str] | None): Any arguments to be appended after endpoint (i.e. endpoint?{ARGUMENT}). Defaults to None.
  662. parser (Parser | None): Parser to handle received JSON. Defaults to None.
  663. identifier (str | None): explicit message identifier. If None, will be generated from endpoint.
  664. rules (MessageRules): rules this Message must obey. Defaults to MessageRules().
  665. Returns:
  666. Callable: built callable to perform operation
  667. """
  668. message = HttpMessage(
  669. endpoint=endpoint, identifier=identifier, components=components, arguments=arguments, parser=parser
  670. )
  671. @wrapt.decorator
  672. async def wrapper(wrapped: Callable, instance: HttpMessages, _: Any, kwargs: Any) -> GoProResp:
  673. kwargs = await wrapped(**kwargs) or kwargs
  674. # If no local file was passed, used the file name of the camera file
  675. kwargs["local_file"] = (
  676. kwargs.pop("local_file") if "local_file" in kwargs else Path(kwargs["camera_file"].split("/")[-1])
  677. )
  678. return await instance._communicator._get_stream(message, rules=rules, **kwargs)
  679. return wrapper
  680. def http_put_json_command(
  681. endpoint: str,
  682. components: list[str] | None = None,
  683. arguments: list[str] | None = None,
  684. body_args: list[str] | None = None,
  685. parser: Parser | None = None,
  686. identifier: str | None = None,
  687. rules: MessageRules = MessageRules(),
  688. ) -> Callable:
  689. """Decorator to build and encapsulate a an Http Message that performs a PUT to return JSON.
  690. Args:
  691. endpoint (str): base endpoint
  692. components (list[str] | None): Additional path components (i.e. endpoint/{COMPONENT}). Defaults to None.
  693. arguments (list[str] | None): Any arguments to be appended after endpoint (i.e. endpoint?{ARGUMENT}). Defaults to None.
  694. body_args (list[str] | None): Arguments to be added to the body JSON. Defaults to None.
  695. parser (Parser | None): Parser to handle received JSON. Defaults to None.
  696. identifier (str | None): explicit message identifier. If None, will be generated from endpoint.
  697. rules (MessageRules): rules this Message must obey. Defaults to MessageRules().
  698. Returns:
  699. Callable: built callable to perform operation
  700. """
  701. message = HttpMessage(
  702. endpoint=endpoint,
  703. identifier=identifier,
  704. body_args=body_args,
  705. arguments=arguments,
  706. components=components,
  707. parser=parser,
  708. )
  709. @wrapt.decorator
  710. async def wrapper(wrapped: Callable, instance: HttpMessages, _: Any, kwargs: Any) -> GoProResp:
  711. return await instance._communicator._put_json(message, rules=rules, **(await wrapped(**kwargs) or kwargs))
  712. return wrapper
  713. class HttpSetting(HttpMessage, Generic[T]):
  714. """An individual camera setting that is interacted with via Wifi."""
  715. def __init__(self, communicator: GoProHttp, identifier: SettingId) -> None:
  716. super().__init__("gopro/camera/setting?setting={setting}&option={option}", identifier)
  717. self._communicator = communicator
  718. # Note! It is assumed that BLE and HTTP settings are symmetric so we only add to the communicator's
  719. # parser in the BLE Setting.
  720. def __str__(self) -> str:
  721. return str(self._identifier).lower().replace("_", " ").title()
  722. def build_url(self, **kwargs: Any) -> str:
  723. """Build the endpoint from the current arguments
  724. Args:
  725. **kwargs (Any): run-time arguments
  726. Returns:
  727. str: built URL
  728. """
  729. assert not isinstance(self._identifier, ProtobufId) # needed to satisfy typing
  730. return self._endpoint.format(setting=int(self._identifier), option=int(kwargs["value"]))
  731. async def set(self, value: T) -> GoProResp:
  732. """Set the value of the setting.
  733. Args:
  734. value (T): value to set setting
  735. Returns:
  736. GoProResp: Status of set
  737. """
  738. response = await self._communicator._get_json(self, value=value)
  739. response.identifier = self._identifier
  740. logger.info(Logger.build_log_rx_str(response))
  741. return response