bta-proxy/bta_proxy/packets/base.py

69 lines
2.3 KiB
Python

from typing import Any, ClassVar, Type
from ..datainputstream import DataInputStream
class Packet:
REGISTRY: ClassVar[dict[int, Type['Packet']]] = {}
FIELDS: ClassVar[list[tuple[str, Any]]] = []
PACKET_ID: ClassVar[int] = 0x00
def __init__(self, **params):
for k, v in params.items():
setattr(self, k, v)
@classmethod
def read_from(cls, stream: DataInputStream) -> 'Packet':
fields: dict = {}
for key, datatype in cls.FIELDS:
fields[key] = cls.read_field(stream, datatype)
return cls(**fields)
@staticmethod
def read_field(stream: DataInputStream, datatype: Any):
match datatype:
case 'uint':
return stream.read_uint()
case 'int':
return stream.read_int()
case 'str':
return stream.read_string()
case 'str', length:
return stream.read_string()[:length]
case 'ulong':
return stream.read_ulong()
case 'long':
return stream.read_long()
case 'ushort':
return stream.read_ushort()
case 'short':
return stream.read_short()
case 'byte':
return stream.read_byte()
case 'float':
return stream.read_float()
case 'double':
return stream.read_double()
case 'bool':
return stream.read_boolean()
case 'bytes', length:
return stream.read_bytes(length)
case _:
raise ValueError(f'unknown type {datatype}')
def __init_subclass__(cls) -> None:
Packet.REGISTRY[cls.PACKET_ID] = cls
@classmethod
def parse_packet(cls, stream: DataInputStream) -> 'Packet':
packet_id: int = stream.read_byte()
if packet_id not in cls.REGISTRY:
stream._cursor -= 1
raise ValueError(f'invalid packet 0x{packet_id:02x}')
return cls.REGISTRY[packet_id].read_from(stream)
def __repr__(self):
pkt_name = self.REGISTRY[self.PACKET_ID].__name__
fields = []
for name, _ in self.FIELDS:
fields.append(f'{name}={getattr(self, name)!r}')
return f'<{pkt_name} {str.join(", ", fields)}'