from asyncio.queues import Queue from logging import getLogger import struct logger = getLogger(__name__) class AsyncDataInputStream: def __init__(self, queue: Queue): self.queue = queue self.buffer = b'' self.last = b'' self.offset = 0 def peek_rest(self): return self.buffer def read_rest(self): out = self.buffer self.buffer = b'' return out async def read_bytes(self, n: int) -> bytes: logger.debug(f"trying to read {n} bytes") if len(self.buffer) < n: self.last = (await self.queue.get()) logger.debug(f"new packet from the queue {self.last!r}") if not self.last: raise EOFError('empty packet was received') self.buffer += self.last self.offset -= len(self.last) out, self.buffer = self.buffer[:n], self.buffer[n:] self.offset += n return out async def read(self) -> int: self.offset += 1 return (await self.read_bytes(1))[0] read_ubyte = read async def read_byte(self) -> int: return struct.unpack('b', await self.read_bytes(1))[0] async def read_boolean(self) -> bool: return (await self.read()) != 0 async def read_short(self) -> int: return struct.unpack('>h', await self.read_bytes(2))[0] async def read_ushort(self) -> int: return struct.unpack('>H', await self.read_bytes(2))[0] async def read_int(self) -> int: return struct.unpack('>i', await self.read_bytes(4))[0] async def read_uint(self) -> int: return struct.unpack('>I', await self.read_bytes(4))[0] async def read_long(self) -> int: return struct.unpack('>q', await self.read_bytes(8))[0] async def read_ulong(self) -> int: return struct.unpack('>Q', await self.read_bytes(8))[0] async def read_float(self) -> float: return struct.unpack('>f', await self.read_bytes(4))[0] async def read_double(self) -> float: return struct.unpack('>d', await self.read_bytes(8))[0] async def read_char(self) -> str: return chr(await self.read_ushort()) async def read_varint(self, bits: int = 32) -> int: value: int = 0 position: int = 0 while True: byte = await self.read() value |= (byte & 0x7F) << position if ((byte & 0x80) == 0): break position += 7 if position >= bits: raise ValueError('varint too big') return value async def read_string(self) -> str: last = self.last size = await self.read_short() try: return (await self.read_bytes(size)).decode('utf-8') except Exception as e: raise ValueError(f'failed reading string of size {size} in {last!r}') from e class SyncDataInputStream: def __init__(self, buffer: bytes): self._buffer = buffer self._cursor = 0 def read_bytes(self, n: int) -> bytes: if self._cursor + n > len(self._buffer): raise EOFError('stream overread') blob = self._buffer[self._cursor : self._cursor + n] self._cursor += n return blob def empty(self): return self._cursor >= len(self._buffer) - 1 def end(self) -> bytes: return self.read_bytes(len(self._buffer) - self._cursor) def read(self) -> int: if self._cursor >= len(self._buffer): raise EOFError(f'stream overread in {self._buffer!r} at {self._cursor}') self._cursor += 1 return self._buffer[self._cursor - 1] def read_boolean(self) -> bool: return self.read() != 0 def read_short(self) -> int: return struct.unpack('>h', self.read_bytes(2))[0] def read_ushort(self) -> int: return struct.unpack('>H', self.read_bytes(2))[0] def read_int(self) -> int: return struct.unpack('>i', self.read_bytes(4))[0] def read_uint(self) -> int: return struct.unpack('>I', self.read_bytes(4))[0] def read_long(self) -> int: return struct.unpack('>q', self.read_bytes(8))[0] def read_ulong(self) -> int: return struct.unpack('>Q', self.read_bytes(8))[0] def read_float(self) -> float: return struct.unpack('>f', self.read_bytes(4))[0] def read_double(self) -> float: return struct.unpack('>d', self.read_bytes(8))[0] def read_char(self) -> str: return chr(self.read_ushort()) def read_varint(self, bits: int = 32) -> int: value: int = 0 position: int = 0 while True: byte = self.read() value |= (byte & 0x7F) << position if ((byte & 0x80) == 0): break position += 7 if position >= bits: raise ValueError('varint too big') return value def read_string(self) -> str: size = self.read_short() return self.read_bytes(size).decode('utf-8')