response.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456
  1. # response.py/Open GoPro, Version 2.0 (C) Copyright 2021 GoPro, Inc. (http://gopro.com/OpenGoPro).
  2. # This copyright was auto-generated on Mon Apr 21 22:24:00 UTC 2025
  3. """GoPro response parsing"""
  4. import enum
  5. import logging
  6. from abc import ABC, abstractmethod
  7. from collections import defaultdict
  8. from typing import Any, Final, Generic, TypeVar
  9. import requests
  10. from open_gopro.domain.exceptions import ResponseParseError
  11. from open_gopro.domain.parser_interface import GlobalParsers, Parser
  12. from open_gopro.models import GoProBlePacketHeader, GoProResp
  13. from open_gopro.models.constants import (
  14. ActionId,
  15. CmdId,
  16. ErrorCode,
  17. FeatureId,
  18. GoProUUID,
  19. QueryCmdId,
  20. SettingId,
  21. StatusId,
  22. )
  23. from open_gopro.models.proto import EnumResultGeneric
  24. from open_gopro.models.types import CameraState, JsonDict, ProtobufId, ResponseType
  25. from open_gopro.network.ble import BleUUID
  26. from open_gopro.parsers.json import LambdaJsonParser
  27. CONT_MASK: Final = 0b10000000
  28. HDR_MASK: Final = 0b01100000
  29. GEN_LEN_MASK: Final = 0b00011111
  30. EXT_13_BYTE0_MASK: Final = 0b00011111
  31. logger = logging.getLogger(__name__)
  32. T = TypeVar("T")
  33. validResponseProtobufIds: Final[list[tuple[FeatureId, ActionId]]] = [
  34. (FeatureId.COMMAND, ActionId.SET_CAMERA_CONTROL_RSP),
  35. (FeatureId.COMMAND, ActionId.SET_LIVESTREAM_MODE_RSP),
  36. (FeatureId.COMMAND, ActionId.RESPONSE_PRESET_UPDATE_CUSTOM),
  37. (FeatureId.COMMAND, ActionId.RESPONSE_CLEAR_COHN_CERT),
  38. (FeatureId.COMMAND, ActionId.RESPONSE_CREATE_COHN_CERT),
  39. (FeatureId.COMMAND, ActionId.RESPONSE_COHN_SETTING),
  40. (FeatureId.COMMAND, ActionId.RELEASE_NETWORK_RSP),
  41. (FeatureId.NETWORK_MANAGEMENT, ActionId.SCAN_WIFI_NETWORKS_RSP),
  42. (FeatureId.NETWORK_MANAGEMENT, ActionId.NOTIF_START_SCAN),
  43. (FeatureId.NETWORK_MANAGEMENT, ActionId.GET_AP_ENTRIES_RSP),
  44. (FeatureId.NETWORK_MANAGEMENT, ActionId.REQUEST_WIFI_CONNECT_NEW_RSP),
  45. (FeatureId.NETWORK_MANAGEMENT, ActionId.REQUEST_WIFI_CONNECT_RSP),
  46. (FeatureId.NETWORK_MANAGEMENT, ActionId.NOTIF_PROVIS_STATE),
  47. (FeatureId.QUERY, ActionId.LIVESTREAM_STATUS_RSP),
  48. (FeatureId.QUERY, ActionId.LIVESTREAM_STATUS_NOTIF),
  49. (FeatureId.QUERY, ActionId.GET_PRESET_STATUS_RSP),
  50. (FeatureId.QUERY, ActionId.PRESET_MODIFIED_NOTIFICATION),
  51. (FeatureId.QUERY, ActionId.RESPONSE_GET_COHN_STATUS),
  52. (FeatureId.QUERY, ActionId.RESPONSE_GET_COHN_CERT),
  53. (FeatureId.QUERY, ActionId.INTERNAL_FF),
  54. (FeatureId.WIRELESS_MANAGEMENT, ActionId.SET_PAIRING_STATE_RSP),
  55. ]
  56. class RespBuilder(Generic[T], ABC):
  57. """Common Response Builder Interface"""
  58. class _State(enum.Enum):
  59. """Describes the state of building the response."""
  60. INITIALIZED = enum.auto()
  61. ACCUMULATED = enum.auto()
  62. PARSED = enum.auto()
  63. ERROR = enum.auto()
  64. def __init__(self) -> None:
  65. self._packet: T
  66. self._status: ErrorCode = ErrorCode.UNKNOWN
  67. self._state: RespBuilder._State = RespBuilder._State.INITIALIZED
  68. self._parser: Parser | None = None
  69. @abstractmethod
  70. def build(self) -> GoProResp[T]:
  71. """Build a response
  72. Returns:
  73. GoProResp[T]: built response
  74. """
  75. class HttpRespBuilder(RespBuilder[JsonDict]):
  76. """HTTP Response Builder
  77. This is not intended to be fool proof to use as the user must understand which fields are needed.
  78. Directors should be created if this needs to be simplified.
  79. """
  80. def __init__(self) -> None:
  81. super().__init__()
  82. self._endpoint: str
  83. self._response: JsonDict
  84. def set_response(self, response: JsonDict) -> None:
  85. """Store the JSON data. This is mandatory.
  86. Args:
  87. response (JsonDict): json data_
  88. """
  89. self._response = response
  90. def set_status(self, status: ErrorCode) -> None:
  91. """Store the status. This is mandatory.
  92. Args:
  93. status (ErrorCode): status of response
  94. """
  95. self._status = status
  96. def set_parser(self, parser: Parser) -> None:
  97. """Store a parser. This is optional.
  98. Args:
  99. parser (Parser): monolithic parser
  100. """
  101. self._parser = parser
  102. def set_endpoint(self, endpoint: str) -> None:
  103. """Store the endpoint. This is mandatory.
  104. Args:
  105. endpoint (str): endpoint of response.
  106. """
  107. self._endpoint = endpoint
  108. def build(self) -> GoProResp:
  109. """Build the GoPro response from the information accumulated about the HTTP response
  110. Returns:
  111. GoProResp: built response
  112. """
  113. # Is there a parser for this? Most of them do not have one yet.
  114. data = self._parser.parse(self._response) if self._parser else self._response
  115. return GoProResp(
  116. protocol=GoProResp.Protocol.HTTP,
  117. status=self._status,
  118. identifier=self._endpoint,
  119. data=data,
  120. )
  121. class RequestsHttpRespBuilderDirector:
  122. """An abstraction to help simplify using the HTTP Response Builder for requests
  123. Args:
  124. response (requests.models.Response): direct response from requests
  125. parser (Parser | None): parsers to use on the requests response
  126. """
  127. def __init__(self, response: requests.models.Response, parser: Parser | None) -> None:
  128. self.response = response
  129. self.parser = parser or Parser(json_parser=LambdaJsonParser(lambda data: data))
  130. def __call__(self) -> GoProResp:
  131. """Build the response
  132. Returns:
  133. GoProResp: built response
  134. """
  135. builder = HttpRespBuilder()
  136. builder.set_endpoint(self.response.url)
  137. builder.set_status(ErrorCode.SUCCESS if self.response.ok else ErrorCode.ERROR)
  138. builder.set_parser(self.parser)
  139. builder.set_response(self.response.json() if self.response.text else {})
  140. return builder.build()
  141. class BleRespBuilder(RespBuilder[bytearray]):
  142. """BLE Response Builder
  143. This is not intended to be fool proof to use as the user must understand which fields are needed.
  144. Directors should be created if this needs to be simplified.
  145. """
  146. def __init__(self) -> None:
  147. self._bytes_remaining = 0
  148. self._uuid: BleUUID
  149. self._identifier: ResponseType
  150. self._feature_id: FeatureId | None = None
  151. self._action_id: ActionId | None = None
  152. super().__init__()
  153. @property
  154. def is_response_protobuf(self) -> bool:
  155. """Is this a protobuf response?
  156. Returns:
  157. bool: True if protobuf, False otherwise
  158. """
  159. return isinstance(self._identifier, (ActionId, FeatureId))
  160. @classmethod
  161. def identify_response(cls, uuid: BleUUID, packet: bytearray) -> ResponseType:
  162. """Get the identifier based on what is currently known about the packet
  163. Args:
  164. uuid (BleUUID): UUID packet was received on
  165. packet (bytearray): raw bytes contained in packet
  166. Returns:
  167. ResponseType: identifier of this response
  168. """
  169. try:
  170. # If it's a protobuf command
  171. if (packet[0], packet[1]) in validResponseProtobufIds:
  172. return ProtobufId(FeatureId(packet[0]), ActionId(packet[1]))
  173. identifier = packet[0]
  174. # Otherwise it's a TLV command
  175. if uuid is GoProUUID.CQ_SETTINGS_RESP:
  176. return SettingId(identifier)
  177. if uuid is GoProUUID.CQ_QUERY_RESP:
  178. return QueryCmdId(identifier)
  179. if uuid in [GoProUUID.CQ_COMMAND_RESP, GoProUUID.CN_NET_MGMT_RESP]:
  180. return CmdId(identifier)
  181. return uuid
  182. except ValueError:
  183. # There is a special case where an unsupported protobuf message was sent. In this case, the only identifier
  184. # we have is the feature ID.
  185. return ProtobufId(FeatureId(packet[0]), None)
  186. def set_parser(self, parser: Parser) -> None:
  187. """Store a parser. This is optional.
  188. Args:
  189. parser (Parser): monolithic parser
  190. """
  191. self._parser = parser
  192. def set_packet(self, packet: bytes) -> None:
  193. """Store the complete data that comprises the response.
  194. This is mutually exclusive with accumulate. It is only for responses (such as direct UUID reads) that
  195. do not follow the packet fragmentation scheme.
  196. Args:
  197. packet (bytes): packet to store
  198. """
  199. self._packet = bytearray(packet)
  200. def accumulate(self, data: bytes) -> None:
  201. """Accumulate BLE byte data.
  202. This is mutually exclusive with accumulate. It should be used in any case where the response follows
  203. the packet fragmentation scheme.
  204. Args:
  205. data (bytes): byte level BLE data
  206. """
  207. buf = bytearray(data)
  208. if buf[0] & CONT_MASK:
  209. buf.pop(0)
  210. else:
  211. # This is a new packet so start with an empty byte array
  212. self._packet = bytearray([])
  213. hdr = GoProBlePacketHeader((buf[0] & HDR_MASK) >> 5)
  214. if hdr is GoProBlePacketHeader.GENERAL:
  215. self._bytes_remaining = buf[0] & GEN_LEN_MASK
  216. buf = buf[1:]
  217. elif hdr is GoProBlePacketHeader.EXT_13:
  218. self._bytes_remaining = ((buf[0] & EXT_13_BYTE0_MASK) << 8) + buf[1]
  219. buf = buf[2:]
  220. elif hdr is GoProBlePacketHeader.EXT_16:
  221. self._bytes_remaining = (buf[1] << 8) + buf[2]
  222. buf = buf[3:]
  223. # Append payload to buffer and update remaining / complete
  224. self._packet.extend(buf)
  225. self._bytes_remaining -= len(buf)
  226. if self._bytes_remaining < 0:
  227. logger.error("received too much data. parsing is in unknown state")
  228. elif self._bytes_remaining == 0:
  229. self._state = RespBuilder._State.ACCUMULATED
  230. def set_status(self, status: ErrorCode) -> None:
  231. """Store the status. This is sometimes optional.
  232. Args:
  233. status (ErrorCode): status
  234. """
  235. self._status = status
  236. def set_uuid(self, uuid: BleUUID) -> None:
  237. """Store the UUID. This is mandatory.
  238. Args:
  239. uuid (BleUUID): uuid
  240. """
  241. self._uuid = uuid
  242. @property
  243. def is_finished_accumulating(self) -> bool:
  244. """Has the response been completely received?
  245. Returns:
  246. bool: True if completely received, False if not
  247. """
  248. return self._state is not RespBuilder._State.INITIALIZED
  249. @property
  250. def _is_protobuf(self) -> bool:
  251. """Is this response a protobuf response
  252. Returns:
  253. bool: Yes if true, No otherwise
  254. """
  255. return isinstance(self._identifier, ProtobufId)
  256. @property
  257. def _is_direct_read(self) -> bool:
  258. """Is this response a direct read of a BLE characteristic
  259. Returns:
  260. bool: Yes if true, No otherwise
  261. """
  262. return isinstance(self._identifier, BleUUID)
  263. def build(self) -> GoProResp:
  264. """Parse the accumulated response (either from a BLE bytestream or an HTTP JSON dict).
  265. Raises:
  266. NotImplementedError: Parsing for this id is not yet supported
  267. ResponseParseError: Error when parsing data
  268. Returns:
  269. GoProResp: built response
  270. """
  271. try:
  272. self._identifier = self.identify_response(self._uuid, self._packet)
  273. buf = self._packet
  274. if not self._is_direct_read: # length byte
  275. buf.pop(0)
  276. if self._is_protobuf: # feature ID byte
  277. # This is a special case where we have a protobuf error response. It does not contain the Action ID.
  278. if self._identifier.action_id is None: # type: ignore
  279. return GoProResp(
  280. protocol=GoProResp.Protocol.BLE,
  281. status=ErrorCode(buf[0]),
  282. data=None,
  283. identifier=self._identifier,
  284. )
  285. buf.pop(0)
  286. parsed: Any = None
  287. query_type: type[StatusId] | type[SettingId] | StatusId | SettingId | None = None
  288. # Need to delineate QueryCmd responses between settings and status
  289. if not self._is_protobuf:
  290. if isinstance(self._identifier, (SettingId, StatusId)):
  291. query_type = self._identifier
  292. elif isinstance(self._identifier, QueryCmdId):
  293. if self._identifier in [
  294. QueryCmdId.GET_STATUS_VAL,
  295. QueryCmdId.REG_STATUS_VAL_UPDATE,
  296. QueryCmdId.UNREG_STATUS_VAL_UPDATE,
  297. QueryCmdId.STATUS_VAL_PUSH,
  298. ]:
  299. query_type = StatusId
  300. elif self._identifier is QueryCmdId.GET_SETTING_NAME:
  301. raise NotImplementedError
  302. else:
  303. query_type = SettingId
  304. # Query (setting get value, status get value, etc.)
  305. if query_type:
  306. camera_state: CameraState = defaultdict(list)
  307. self._status = ErrorCode(buf[0])
  308. buf = buf[1:]
  309. # Parse all parameters
  310. while len(buf) != 0:
  311. param_len = buf[1]
  312. try:
  313. param_id = query_type(buf[0]) # type: ignore
  314. except ValueError:
  315. # We don't handle this entity. Ensure to advance past the value.
  316. buf = buf[2 + param_len :]
  317. continue
  318. buf = buf[2:]
  319. # Special case where we register for a push notification for something that does not yet have a value
  320. if param_len == 0:
  321. camera_state[param_id] = []
  322. continue
  323. param_val = buf[:param_len]
  324. buf = buf[param_len:]
  325. # Add parsed value to response's data dict
  326. try:
  327. if not (parser := GlobalParsers.get_parser(param_id)):
  328. # We don't have defined params for all ID's yet. Just store raw bytes
  329. logger.warning(f"No parser defined for {param_id}")
  330. camera_state[param_id] = param_val.hex(":")
  331. continue
  332. # These can be more than 1 value so use a list
  333. if self._identifier in [
  334. QueryCmdId.GET_CAPABILITIES_VAL,
  335. QueryCmdId.REG_CAPABILITIES_UPDATE,
  336. QueryCmdId.SETTING_CAPABILITY_PUSH,
  337. ]:
  338. # Parse using parser from global map and append
  339. camera_state[param_id].append(parser.parse(param_val))
  340. else:
  341. # Parse using parser from map and set
  342. camera_state[param_id] = parser.parse(param_val)
  343. except ValueError:
  344. # This is the case where we receive a value that is not defined in our params.
  345. # This shouldn't happen and means the documentation needs to be updated. However, it
  346. # isn't functionally critical
  347. logger.warning(f"{param_id} does not contain a value {param_val}")
  348. camera_state[param_id] = param_val
  349. parsed = camera_state
  350. else: # Commands, Protobuf, and direct Reads
  351. if is_cmd := isinstance(self._identifier, CmdId):
  352. # All (non-protobuf) commands have a status
  353. self._status = ErrorCode(buf[0])
  354. buf = buf[1:]
  355. # Use parser if explicitly passed otherwise get global parser
  356. if not (parser := self._parser or GlobalParsers.get_parser(self._identifier)) and not is_cmd:
  357. error_msg = f"No parser exists for {self._identifier}"
  358. logger.error(error_msg)
  359. raise ResponseParseError(str(self._identifier), self._packet, msg=error_msg)
  360. # Parse payload if a parser was found.
  361. if parser:
  362. parsed = parser.parse(buf)
  363. # TODO make status checking an abstract method of a shared base class
  364. # Attempt to determine and / or extract status (we already got command status above)
  365. if self._is_direct_read and len(self._packet):
  366. # Assume success on direct reads if there was any data
  367. self._status = ErrorCode.SUCCESS
  368. # Check for result field in protobuf's
  369. elif self._is_protobuf and "result" in parsed:
  370. self._status = (
  371. ErrorCode.SUCCESS
  372. if parsed.get("result") == EnumResultGeneric.RESULT_SUCCESS
  373. else ErrorCode.ERROR
  374. )
  375. except Exception as e:
  376. self._state = RespBuilder._State.ERROR
  377. raise ResponseParseError(str(self._identifier), buf) from e
  378. # Recursively scrub away parsing artifacts
  379. self._state = RespBuilder._State.PARSED
  380. return GoProResp(protocol=GoProResp.Protocol.BLE, status=self._status, data=parsed, identifier=self._identifier)