| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276 |
- # bytes.py/Open GoPro, Version 2.0 (C) Copyright 2021 GoPro, Inc. (http://gopro.com/OpenGoPro).
- # This copyright was auto-generated on Mon Apr 21 22:24:00 UTC 2025
- """Bytes parser / builders for data models"""
- from __future__ import annotations
- import datetime
- import logging
- from dataclasses import asdict
- from typing import Any, Generic, TypeVar
- import google.protobuf.json_format
- from construct import Construct, Flag, Int16sb, Int16ub
- from google.protobuf import descriptor
- from google.protobuf.json_format import MessageToDict as ProtobufToDict
- from open_gopro.domain.enum import GoProIntEnum, enum_factory
- from open_gopro.domain.parser_interface import (
- BytesBuilder,
- BytesParser,
- BytesParserBuilder,
- )
- from open_gopro.models.types import Protobuf
- from open_gopro.util import is_dataclass_instance, pretty_print, to_dict
- logger = logging.getLogger(__name__)
- ProtobufPrinter = google.protobuf.json_format._Printer # type: ignore # noqa
- original_field_to_json = ProtobufPrinter._FieldToJsonObject
- class ProtobufDictProxy(dict):
- """Proxy a dict to appear as an object by giving its keys attribute access"""
- def __init__(self, *args: Any, **kwargs: Any) -> None:
- super().__init__(*args, **kwargs)
- self.__dict__ = self
- def __str__(self) -> str:
- return pretty_print(self.__dict__)
- @classmethod
- def from_proto(cls, proto_dict: dict) -> ProtobufDictProxy:
- """Build a proxy from a dictionary attr-name to value
- Args:
- proto_dict (dict): dict to build from
- Returns:
- ProtobufDictProxy: built proxy
- """
- def recurse(obj: Any) -> Any:
- # Recursion Cases
- if isinstance(obj, list):
- return [recurse(item) for item in obj]
- if isinstance(obj, dict):
- nested_dict = {}
- for key, value in obj.items():
- nested_dict[key] = recurse(value)
- return ProtobufDictProxy(nested_dict)
- # Base Case
- return obj
- return ProtobufDictProxy(recurse(proto_dict))
- class GoProEnumByteParserBuilder(BytesParserBuilder):
- """Parse into a GoProEnum
- Args:
- target (type[GoProIntEnum]): enum type to parse into
- """
- def __init__(self, target: type[GoProIntEnum]) -> None:
- self._container = target
- def parse(self, data: bytes) -> GoProIntEnum:
- """Parse bytes into GoPro enum
- Args:
- data (bytes): bytes to parse
- Returns:
- GoProIntEnum: parsed enum
- """
- return self._container(data[0])
- def build(self, *args: Any, **_: Any) -> bytes:
- """Build bytes from GoPro Enum
- Args:
- *args (Any): enum to use for building
- **_ (Any): not used
- Returns:
- bytes: built bytes
- """
- return bytes([int(args[0])])
- class ProtobufByteParser(BytesParser):
- """Parse into a protobuf object
- The actual returned type is a proxy to a protobuf object but it's attributes can be accessed
- using the protobuf definition
- Args:
- proto (type[Protobuf]): protobuf definition to parse (a proxy) into
- """
- def __init__(self, proto: type[Protobuf]) -> None:
- class Closure(BytesParser[dict]):
- """Parse bytes into a dict using the protobuf"""
- protobuf = proto
- # pylint: disable=not-callable
- def parse(self, data: bytes) -> Any:
- response: Protobuf = self.protobuf().FromString(bytes(data))
- # TODO can wetranslate from Protobuf enums without relying on Protobuf internal implementation?
- # Monkey patch the field-to-json function to use our enum translation
- ProtobufPrinter._FieldToJsonObject = lambda self, field, value: (
- enum_factory(field.enum_type)(value)
- if field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_ENUM
- else original_field_to_json(self, field, value)
- )
- as_dict = ProtobufToDict(response, preserving_proto_field_name=True)
- # For any unset fields, use None
- for key in response.DESCRIPTOR.fields_by_name:
- if key not in as_dict:
- as_dict[key] = None
- # Proxy as an object
- return ProtobufDictProxy.from_proto(as_dict)
- self._proto_parser = Closure()
- def parse(self, data: bytes) -> dict:
- """Parse the bytes into a Protobuf Proxy
- Args:
- data (bytes): bytes to parse
- Returns:
- dict: protobuf proxy dict which provides attribute access
- """
- return self._proto_parser.parse(data)
- class DateTimeByteParserBuilder(BytesParser, BytesBuilder):
- """Handle local and non-local datetime parsing / building"""
- def build(self, obj: datetime.datetime, tzone: int | None = None, is_dst: bool | None = None) -> bytes:
- """Build bytestream from datetime and optional local arguments
- Args:
- obj (datetime.datetime): date and time
- tzone (int | None): timezone (as UTC offset). Defaults to None.
- is_dst (bool | None): is daylight savings time?. Defaults to None.
- Returns:
- bytes: bytestream built from datetime
- """
- byte_data = [*Int16ub.build(obj.year), obj.month, obj.day, obj.hour, obj.minute, obj.second]
- if tzone is not None and is_dst is not None:
- byte_data.extend([*Int16sb.build(tzone), *Flag.build(is_dst)])
- return bytes(byte_data)
- def parse(self, data: bytes) -> dict:
- """Parse bytestream into dict of datetime and potential timezone / dst
- Args:
- data (bytes): bytestream to parse
- Returns:
- dict: dict containing datetime
- """
- is_dst_tz = len(data) == 9
- buf = data[1:]
- year = Int16ub.parse(buf[0:2])
- dt = datetime.datetime(year, *[int(x) for x in buf[2:7]]) # type: ignore
- return (
- {"datetime": dt} if is_dst_tz else {"datetime": dt, "tzone": Int16sb.parse(buf[7:9]), "dst": bool(buf[9])}
- )
- class ConstructByteParserBuilder(BytesParserBuilder):
- """Parse bytes into a construct object
- Args:
- construct (Construct): construct definition
- """
- def __init__(self, construct: Construct) -> None:
- self._construct = self._construct_adapter_factory(construct)
- @classmethod
- def _construct_adapter_factory(cls, target: Construct) -> BytesParserBuilder:
- """Build a construct parser adapter from a construct
- Args:
- target (Construct): construct to use for parsing and building
- Returns:
- BytesParserBuilder: instance of generated class
- """
- class ParserBuilder(BytesParserBuilder):
- """Adapt the construct for our interface"""
- container = target
- def parse(self, data: bytes) -> Any:
- return self.container.parse(data)
- def build(self, *args: Any, **kwargs: Any) -> bytes:
- return self.container.build(*args, **kwargs)
- return ParserBuilder()
- def parse(self, data: bytes) -> Construct:
- """Parse bytes into construct container
- Args:
- data (bytes): bytes to parse
- Returns:
- Construct: construct container
- """
- return self._construct.parse(data)
- def build(self, obj: Construct) -> bytes:
- """Built bytes from filled out construct container
- Args:
- obj (Construct): construct container
- Returns:
- bytes: built bytes
- """
- return self._construct.build(obj)
- T = TypeVar("T")
- class ConstructDataclassByteParserBuilder(Generic[T], BytesParserBuilder[T]):
- """Helper class for byte building / parsing using a data class using Construct"""
- def __init__(self, construct: Construct, data_class: T, int_builder: Construct) -> None:
- self.construct = construct
- self.data_class = data_class
- self.int_builder = int_builder
- def parse(self, data: bytes) -> T: # noqa: D102
- return self.data_class(**to_dict(self.construct.parse(data))) # type: ignore
- def build(self, obj: Any) -> bytes: # noqa: D102
- if is_dataclass_instance(obj):
- return self.construct.build(asdict(obj))
- match obj:
- case int():
- return self.int_builder.build(obj)
- case _:
- raise TypeError(f"Can not build from type {type(obj)}")
- def __call__(self) -> ConstructDataclassByteParserBuilder:
- """Helper method to just return itself in order to be used similarly to other parsers that require instantiation
- Returns:
- ConstructDataclassByteParserBuilder: returns self
- """
- return self
|