from asyncio.queues import Queue
import struct


class AsyncDataInputStream:
    """Decode big-endian primitives from byte chunks delivered on a queue.

    Every item taken from the queue is appended to an internal buffer;
    the ``read_*`` coroutines consume bytes from the front of that buffer
    and wait on the queue whenever more data is needed.
    """

    def __init__(self, queue: Queue):
        """Wrap *queue*, whose items are expected to be bytes-like chunks."""
        self._queue = queue
        # Bytes already received from the queue but not yet consumed.
        self._last: bytes = b''

    async def read_bytes(self, n: int) -> bytes:
        """Return exactly *n* bytes, waiting on the queue as often as needed.

        Bytes received beyond *n* stay buffered for subsequent reads.
        """
        # A single chunk may be shorter than the request, so keep pulling
        # chunks until enough bytes are buffered.
        while len(self._last) < n:
            self._last += await self._queue.get()
        out, self._last = self._last[:n], self._last[n:]
        return out

    async def _unpack(self, fmt: str):
        """Read ``struct.calcsize(fmt)`` bytes and return the single value."""
        raw = await self.read_bytes(struct.calcsize(fmt))
        return struct.unpack(fmt, raw)[0]

    async def read(self) -> int:
        """Read one byte and return it as an int in ``range(256)``."""
        return (await self.read_bytes(1))[0]

    async def read_boolean(self) -> bool:
        """Read one byte; any non-zero value is ``True``."""
        return (await self.read()) != 0

    async def read_short(self) -> int:
        """Read a signed 16-bit big-endian integer."""
        return await self._unpack('>h')

    async def read_ushort(self) -> int:
        """Read an unsigned 16-bit big-endian integer."""
        return await self._unpack('>H')

    async def read_int(self) -> int:
        """Read a signed 32-bit big-endian integer."""
        return await self._unpack('>i')

    async def read_uint(self) -> int:
        """Read an unsigned 32-bit big-endian integer."""
        return await self._unpack('>I')

    async def read_long(self) -> int:
        """Read a signed 64-bit big-endian integer."""
        return await self._unpack('>q')

    async def read_ulong(self) -> int:
        """Read an unsigned 64-bit big-endian integer."""
        return await self._unpack('>Q')

    async def read_float(self) -> float:
        """Read a 32-bit big-endian IEEE 754 float."""
        return await self._unpack('>f')

    async def read_double(self) -> float:
        """Read a 64-bit big-endian IEEE 754 float."""
        return await self._unpack('>d')

    async def read_char(self) -> str:
        """Read an unsigned 16-bit code unit and return it as a character."""
        return chr(await self.read_ushort())

    async def read_varint(self, bits: int = 32) -> int:
        """Read a little-endian base-128 varint.

        Each byte contributes its low 7 bits; a set high bit means another
        byte follows. The accumulated value is returned without any sign
        conversion.

        Raises:
            ValueError: if a continuation bit is still set once the shift
                position has reached *bits*.
        """
        value: int = 0
        position: int = 0
        while True:
            byte = await self.read()
            value |= (byte & 0x7F) << position
            if not byte & 0x80:
                return value
            position += 7
            if position >= bits:
                raise ValueError('varint too big')

    async def read_string(self) -> str:
        """Read a length-prefixed UTF-8 string.

        The prefix is a 16-bit big-endian byte count. It is read as
        unsigned because a negative length is never meaningful and would
        otherwise mis-slice the buffer.

        Raises:
            UnicodeDecodeError: if the payload is not valid UTF-8.
        """
        size = await self.read_ushort()
        return (await self.read_bytes(size)).decode('utf-8')