bytes.py 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. # bytes.py/Open GoPro, Version 2.0 (C) Copyright 2021 GoPro, Inc. (http://gopro.com/OpenGoPro).
  2. # This copyright was auto-generated on Mon Apr 21 22:24:00 UTC 2025
  3. """Bytes parser / builders for data models"""
  4. from __future__ import annotations
  5. import datetime
  6. import logging
  7. from dataclasses import asdict
  8. from typing import Any, Generic, TypeVar
  9. import google.protobuf.json_format
  10. from construct import Construct, Flag, Int16sb, Int16ub
  11. from google.protobuf import descriptor
  12. from google.protobuf.json_format import MessageToDict as ProtobufToDict
  13. from open_gopro.domain.enum import GoProIntEnum, enum_factory
  14. from open_gopro.domain.parser_interface import (
  15. BytesBuilder,
  16. BytesParser,
  17. BytesParserBuilder,
  18. )
  19. from open_gopro.models.types import Protobuf
  20. from open_gopro.util import is_dataclass_instance, pretty_print, to_dict
  21. logger = logging.getLogger(__name__)
  22. ProtobufPrinter = google.protobuf.json_format._Printer # type: ignore # noqa
  23. original_field_to_json = ProtobufPrinter._FieldToJsonObject
  24. class ProtobufDictProxy(dict):
  25. """Proxy a dict to appear as an object by giving its keys attribute access"""
  26. def __init__(self, *args: Any, **kwargs: Any) -> None:
  27. super().__init__(*args, **kwargs)
  28. self.__dict__ = self
  29. def __str__(self) -> str:
  30. return pretty_print(self.__dict__)
  31. @classmethod
  32. def from_proto(cls, proto_dict: dict) -> ProtobufDictProxy:
  33. """Build a proxy from a dictionary attr-name to value
  34. Args:
  35. proto_dict (dict): dict to build from
  36. Returns:
  37. ProtobufDictProxy: built proxy
  38. """
  39. def recurse(obj: Any) -> Any:
  40. # Recursion Cases
  41. if isinstance(obj, list):
  42. return [recurse(item) for item in obj]
  43. if isinstance(obj, dict):
  44. nested_dict = {}
  45. for key, value in obj.items():
  46. nested_dict[key] = recurse(value)
  47. return ProtobufDictProxy(nested_dict)
  48. # Base Case
  49. return obj
  50. return ProtobufDictProxy(recurse(proto_dict))
  51. class GoProEnumByteParserBuilder(BytesParserBuilder):
  52. """Parse into a GoProEnum
  53. Args:
  54. target (type[GoProIntEnum]): enum type to parse into
  55. """
  56. def __init__(self, target: type[GoProIntEnum]) -> None:
  57. self._container = target
  58. def parse(self, data: bytes) -> GoProIntEnum:
  59. """Parse bytes into GoPro enum
  60. Args:
  61. data (bytes): bytes to parse
  62. Returns:
  63. GoProIntEnum: parsed enum
  64. """
  65. return self._container(data[0])
  66. def build(self, *args: Any, **_: Any) -> bytes:
  67. """Build bytes from GoPro Enum
  68. Args:
  69. *args (Any): enum to use for building
  70. **_ (Any): not used
  71. Returns:
  72. bytes: built bytes
  73. """
  74. return bytes([int(args[0])])
  75. class ProtobufByteParser(BytesParser):
  76. """Parse into a protobuf object
  77. The actual returned type is a proxy to a protobuf object but it's attributes can be accessed
  78. using the protobuf definition
  79. Args:
  80. proto (type[Protobuf]): protobuf definition to parse (a proxy) into
  81. """
  82. def __init__(self, proto: type[Protobuf]) -> None:
  83. class Closure(BytesParser[dict]):
  84. """Parse bytes into a dict using the protobuf"""
  85. protobuf = proto
  86. # pylint: disable=not-callable
  87. def parse(self, data: bytes) -> Any:
  88. response: Protobuf = self.protobuf().FromString(bytes(data))
  89. # TODO can wetranslate from Protobuf enums without relying on Protobuf internal implementation?
  90. # Monkey patch the field-to-json function to use our enum translation
  91. ProtobufPrinter._FieldToJsonObject = lambda self, field, value: (
  92. enum_factory(field.enum_type)(value)
  93. if field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_ENUM
  94. else original_field_to_json(self, field, value)
  95. )
  96. as_dict = ProtobufToDict(response, preserving_proto_field_name=True)
  97. # For any unset fields, use None
  98. for key in response.DESCRIPTOR.fields_by_name:
  99. if key not in as_dict:
  100. as_dict[key] = None
  101. # Proxy as an object
  102. return ProtobufDictProxy.from_proto(as_dict)
  103. self._proto_parser = Closure()
  104. def parse(self, data: bytes) -> dict:
  105. """Parse the bytes into a Protobuf Proxy
  106. Args:
  107. data (bytes): bytes to parse
  108. Returns:
  109. dict: protobuf proxy dict which provides attribute access
  110. """
  111. return self._proto_parser.parse(data)
  112. class DateTimeByteParserBuilder(BytesParser, BytesBuilder):
  113. """Handle local and non-local datetime parsing / building"""
  114. def build(self, obj: datetime.datetime, tzone: int | None = None, is_dst: bool | None = None) -> bytes:
  115. """Build bytestream from datetime and optional local arguments
  116. Args:
  117. obj (datetime.datetime): date and time
  118. tzone (int | None): timezone (as UTC offset). Defaults to None.
  119. is_dst (bool | None): is daylight savings time?. Defaults to None.
  120. Returns:
  121. bytes: bytestream built from datetime
  122. """
  123. byte_data = [*Int16ub.build(obj.year), obj.month, obj.day, obj.hour, obj.minute, obj.second]
  124. if tzone is not None and is_dst is not None:
  125. byte_data.extend([*Int16sb.build(tzone), *Flag.build(is_dst)])
  126. return bytes(byte_data)
  127. def parse(self, data: bytes) -> dict:
  128. """Parse bytestream into dict of datetime and potential timezone / dst
  129. Args:
  130. data (bytes): bytestream to parse
  131. Returns:
  132. dict: dict containing datetime
  133. """
  134. is_dst_tz = len(data) == 9
  135. buf = data[1:]
  136. year = Int16ub.parse(buf[0:2])
  137. dt = datetime.datetime(year, *[int(x) for x in buf[2:7]]) # type: ignore
  138. return (
  139. {"datetime": dt} if is_dst_tz else {"datetime": dt, "tzone": Int16sb.parse(buf[7:9]), "dst": bool(buf[9])}
  140. )
  141. class ConstructByteParserBuilder(BytesParserBuilder):
  142. """Parse bytes into a construct object
  143. Args:
  144. construct (Construct): construct definition
  145. """
  146. def __init__(self, construct: Construct) -> None:
  147. self._construct = self._construct_adapter_factory(construct)
  148. @classmethod
  149. def _construct_adapter_factory(cls, target: Construct) -> BytesParserBuilder:
  150. """Build a construct parser adapter from a construct
  151. Args:
  152. target (Construct): construct to use for parsing and building
  153. Returns:
  154. BytesParserBuilder: instance of generated class
  155. """
  156. class ParserBuilder(BytesParserBuilder):
  157. """Adapt the construct for our interface"""
  158. container = target
  159. def parse(self, data: bytes) -> Any:
  160. return self.container.parse(data)
  161. def build(self, *args: Any, **kwargs: Any) -> bytes:
  162. return self.container.build(*args, **kwargs)
  163. return ParserBuilder()
  164. def parse(self, data: bytes) -> Construct:
  165. """Parse bytes into construct container
  166. Args:
  167. data (bytes): bytes to parse
  168. Returns:
  169. Construct: construct container
  170. """
  171. return self._construct.parse(data)
  172. def build(self, obj: Construct) -> bytes:
  173. """Built bytes from filled out construct container
  174. Args:
  175. obj (Construct): construct container
  176. Returns:
  177. bytes: built bytes
  178. """
  179. return self._construct.build(obj)
  180. T = TypeVar("T")
  181. class ConstructDataclassByteParserBuilder(Generic[T], BytesParserBuilder[T]):
  182. """Helper class for byte building / parsing using a data class using Construct"""
  183. def __init__(self, construct: Construct, data_class: T, int_builder: Construct) -> None:
  184. self.construct = construct
  185. self.data_class = data_class
  186. self.int_builder = int_builder
  187. def parse(self, data: bytes) -> T: # noqa: D102
  188. return self.data_class(**to_dict(self.construct.parse(data))) # type: ignore
  189. def build(self, obj: Any) -> bytes: # noqa: D102
  190. if is_dataclass_instance(obj):
  191. return self.construct.build(asdict(obj))
  192. match obj:
  193. case int():
  194. return self.int_builder.build(obj)
  195. case _:
  196. raise TypeError(f"Can not build from type {type(obj)}")
  197. def __call__(self) -> ConstructDataclassByteParserBuilder:
  198. """Helper method to just return itself in order to be used similarly to other parsers that require instantiation
  199. Returns:
  200. ConstructDataclassByteParserBuilder: returns self
  201. """
  202. return self