2023-08-24 15:29:57 +03:00
|
|
|
import struct
|
|
|
|
|
|
|
|
class DataInputStream:
    """Sequential reader of big-endian primitives from an in-memory buffer.

    The stream keeps a cursor into ``buffer``; every ``read_*`` call decodes
    a value at the cursor and advances past it. Reading past the end of the
    buffer raises ``EOFError``.
    """

    def __init__(self, buffer: bytes) -> None:
        """Wrap *buffer* and position the cursor at its first byte."""
        self._buffer = buffer
        self._cursor = 0

    def read_bytes(self, n: int) -> bytes:
        """Return the next *n* raw bytes and advance the cursor by *n*.

        Raises:
            ValueError: if *n* is negative.
            EOFError: if fewer than *n* bytes remain.
        """
        # A negative count would pass the overread check below, produce a
        # meaningless slice and move the cursor backwards, so reject it.
        if n < 0:
            raise ValueError('negative read size')
        if self._cursor + n > len(self._buffer):
            raise EOFError('stream overread')
        blob = self._buffer[self._cursor : self._cursor + n]
        self._cursor += n
        return blob

    def empty(self) -> bool:
        """Return ``True`` when no unread bytes remain in the buffer."""
        # Compare against the full length: with ``len - 1`` the stream was
        # reported empty while its last byte was still unread.
        return self._cursor >= len(self._buffer)

    def end(self) -> bytes:
        """Consume and return every remaining byte (``b''`` if none)."""
        return self.read_bytes(len(self._buffer) - self._cursor)

    def read_byte(self) -> int:
        """Return the next byte as an unsigned integer in ``0..255``.

        Raises:
            EOFError: if the cursor is already at the end of the buffer.
        """
        if self._cursor >= len(self._buffer):
            # Diagnostic: dump the whole buffer (ANSI-coloured) to stdout
            # before failing, to help locate the malformed input.
            print(f'\033[91mstream overread in {self._buffer}\033[0m')
            raise EOFError('stream overread')
        self._cursor += 1
        return self._buffer[self._cursor - 1]

    def read_boolean(self) -> bool:
        """Read one byte; any non-zero value is ``True``."""
        return self.read_byte() != 0

    def read_short(self) -> int:
        """Read a signed 16-bit big-endian integer."""
        return struct.unpack('>h', self.read_bytes(2))[0]

    def read_ushort(self) -> int:
        """Read an unsigned 16-bit big-endian integer."""
        return struct.unpack('>H', self.read_bytes(2))[0]

    def read_int(self) -> int:
        """Read a signed 32-bit big-endian integer."""
        return struct.unpack('>i', self.read_bytes(4))[0]

    def read_uint(self) -> int:
        """Read an unsigned 32-bit big-endian integer."""
        return struct.unpack('>I', self.read_bytes(4))[0]

    def read_long(self) -> int:
        """Read a signed 64-bit big-endian integer."""
        return struct.unpack('>q', self.read_bytes(8))[0]

    def read_ulong(self) -> int:
        """Read an unsigned 64-bit big-endian integer."""
        return struct.unpack('>Q', self.read_bytes(8))[0]

    def read_float(self) -> float:
        """Read a 32-bit big-endian IEEE 754 float."""
        return struct.unpack('>f', self.read_bytes(4))[0]

    def read_double(self) -> float:
        """Read a 64-bit big-endian IEEE 754 float."""
        return struct.unpack('>d', self.read_bytes(8))[0]

    def read_char(self) -> str:
        """Read an unsigned 16-bit code unit and return it as a 1-char string."""
        return chr(self.read_ushort())

    def read_varint(self, bits: int = 32) -> int:
        """Read a variable-length unsigned integer.

        Each byte contributes its low 7 bits, least-significant group first;
        a set high bit (``0x80``) means another byte follows.

        Args:
            bits: upper bound on the bit position a continuation may reach;
                32 allows at most 5 bytes, 64 at most 10.

        Returns:
            The accumulated non-negative value (no sign conversion is done).

        Raises:
            ValueError: if the encoding continues past *bits* bits.
            EOFError: if the buffer ends in the middle of the value.
        """
        value: int = 0
        position: int = 0
        while True:
            byte = self.read_byte()
            value |= (byte & 0x7F) << position
            if (byte & 0x80) == 0:
                break
            position += 7
            if position >= bits:
                raise ValueError('varint too big')
        return value

    def read_string(self) -> str:
        """Read a length-prefixed UTF-8 string.

        The prefix is an unsigned 16-bit big-endian byte count, followed by
        that many bytes of UTF-8 data.

        Raises:
            EOFError: if the buffer ends before the declared length.
            UnicodeDecodeError: if the payload is not valid UTF-8.
        """
        # The length must be read unsigned: a signed read turns payloads of
        # 32768..65535 bytes into negative sizes.
        size = self.read_ushort()
        return self.read_bytes(size).decode('utf-8')
|
|
|
|
|