"""Bot swarm that paints text, images and raw bit streams onto a shared
1000x1000 one-bit canvas through a socket.io ``toggle_bit`` API.

The desired picture is described in ``settings.json``; one listener task
mirrors the remote canvas locally and a pool of writer tasks toggles every
bit that differs from the desired state.
"""

import asyncio
from base64 import b64decode
from json import load
from random import choice, randint
from time import time as time_now
from typing import NewType, Optional

from aiohttp import ClientSession
from aiohttp_socks import ProxyConnector
from PIL import (
    Image,
    ImageFont,
    ImageDraw,
    ImageFilter,
    ImageSequence,
    ImageChops,
)
from socketio import AsyncClient, AsyncSimpleClient

# Side length of the square canvas and the total number of bits it holds.
CANVAS_SIZE = 1000
CANVAS_BITS = CANVAS_SIZE * CANVAS_SIZE

# Row widths used when packing pixels into the bit stream.
# NOTE(review): presumably the widths at which an external viewer renders the
# bit stream as 3-bit / 16-bit pixels -- confirm against that viewer.
RGB111_ROW_WIDTH = 577
RGB565_ROW_WIDTH = 250

# Maps a canvas bit index to the value that bit should have.
PixelMap = NewType("PixelMap", dict[int, bool])
# (frames, seconds per frame)
Animation = NewType("Animation", tuple[list[PixelMap], float])
Font = ImageFont.FreeTypeFont | ImageFont.ImageFont


class AsyncBotManager:
    """Holds the desired canvas state and runs the listener/writer tasks.

    Use as an async context manager: entering starts the listener task,
    leaving stops it.
    """

    def __init__(self, base: str = ""):
        """Create a manager talking to the server at URL prefix *base*."""
        self.base = base
        # Local mirror of the remote canvas, kept current by ``listener``.
        self.canvas = Image.new("1", (CANVAS_SIZE, CANVAS_SIZE))
        # Font cache keyed by (font name, size).
        self.fonts: dict[tuple[str, int], Font] = {
            ("default", 8): ImageFont.load_default(8)
        }
        # Bits we want to control and the value each should have.
        self.difference: PixelMap = PixelMap({})
        self._last_update = 0
        self._shutdown: bool = False
        # Bit indices that must never be written.
        self.avoid: set[int] = set()
        self.animations: list[Animation] = []
        # Counters for the periodic statistics printout.
        self._written_boxes = 0
        self._read_boxes = 0
        self._last_printout = time_now()
        self._active: set[int] = set()

    # ------------------------------------------------------------------
    # Drawing helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _index_of(x: int, y: int) -> Optional[int]:
        """Return the bit index of pixel (x, y), or None if off-canvas."""
        if 0 <= x < CANVAS_SIZE and 0 <= y < CANVAS_SIZE:
            return x + y * CANVAS_SIZE
        return None

    @staticmethod
    def get_text_image(
        text: str, font: ImageFont.ImageFont | ImageFont.FreeTypeFont
    ) -> Image.Image:
        """Render *text* as an "LA" image: white glyphs on an opaque outline.

        The luminance channel holds the glyphs; the alpha channel is the
        glyph shape dilated by two pixels, so the text gets a dark border.
        The image is the text bounding box plus a 2px margin on every side.
        """
        left, top, right, bottom = font.getbbox(text)
        size = (int(right - left) + 4, int(bottom - top) + 4)
        with Image.new("LA", size, 0) as im:
            draw = ImageDraw.Draw(im)
            # getbbox() is relative to the drawing origin, so shifting the
            # origin by (-left, -top) puts the glyph box at (0, 0); the +2
            # leaves room for the dilated outline.
            draw.text((2 - left, 2 - top), text, font=font, fill=(255, 0))
            alpha = im.convert("L").filter(ImageFilter.MaxFilter(5))
            im.putalpha(alpha)
            return im.copy()

    def get_font(self, font_name: str, size: int = 8) -> Font:
        """Return the (cached) font *font_name* at *size*.

        ``"default"`` selects Pillow's built-in font; anything else is passed
        to ``ImageFont.truetype``.
        """
        if font := self.fonts.get((font_name, size)):
            return font
        if font_name == "default":
            font = ImageFont.load_default(size)
        else:
            font = ImageFont.truetype(font_name, size)
        self.fonts[font_name, size] = font
        return font

    def put_text(
        self, x: int, y: int, text: str, font: str, size: int = 8
    ) -> None:
        """Queue *text* to be drawn with its top-left corner at (x, y)."""
        with self.get_text_image(text, self.get_font(font, size)) as im:
            self.put_image(x, y, im)

    def put_image(self, ox: int, oy: int, im: Image.Image) -> None:
        """Queue an "LA" image at offset (ox, oy).

        Pixels with zero alpha are skipped; pixels that fall outside the
        canvas are ignored instead of wrapping onto the next row.
        """
        for y in range(im.height):
            for x in range(im.width):
                l, a = im.getpixel((x, y))  # type: ignore
                if not a:
                    continue
                index = self._index_of(x + ox, y + oy)
                if index is not None:
                    self.put_bit(index, l > 0)

    def put_pixel(self, x: int, y: int, val: bool) -> None:
        """Queue a single pixel; off-canvas coordinates are ignored."""
        index = self._index_of(x, y)
        if index is not None:
            self.put_bit(index, val)

    def put_bit(self, index: int, value: bool) -> None:
        """Queue bit *index* to be *value* unless it is avoided or invalid."""
        if 0 <= index < CANVAS_BITS and index not in self.avoid:
            self.difference[index] = value

    def add_animation(
        self, ox: int, oy: int, frames: list[Image.Image], spf: float = 1
    ) -> None:
        """Register an animation at offset (ox, oy).

        Every pixel that is opaque in *any* frame belongs to the animation:
        in frames where it is transparent it is set to True, so leftovers of
        the previous frame are overwritten. Frames may differ in size.

        Raises:
            ValueError: if *frames* is empty or *spf* is not positive.
        """
        if not frames:
            raise ValueError("an animation needs at least one frame")
        if spf <= 0:
            raise ValueError("spf (seconds per frame) must be positive")

        # Pad every frame to a common size; ImageChops.add would otherwise
        # silently crop to the smallest frame.
        width = max(frame.width for frame in frames)
        height = max(frame.height for frame in frames)
        padded: list[Image.Image] = []
        coverage = Image.new("L", (width, height), 0)
        for frame in frames:
            canvas = Image.new("LA", (width, height), (0, 0))
            canvas.paste(frame.convert("LA"), (0, 0))
            padded.append(canvas)
            coverage = ImageChops.add(coverage, canvas.getchannel("A"))

        animation = Animation(([], spf))
        for canvas in padded:
            pixelmap = PixelMap({})
            for y in range(height):
                for x in range(width):
                    index = self._index_of(x + ox, y + oy)
                    if index is None or index in self.avoid:
                        continue
                    l, a = canvas.getpixel((x, y))  # type: ignore
                    if a > 128:
                        pixelmap[index] = l > 128
                    elif coverage.getpixel((x, y)) > 128:  # type: ignore
                        pixelmap[index] = True
            animation[0].append(pixelmap)
            canvas.close()
        self.animations.append(animation)

    # ------------------------------------------------------------------
    # Avoid list
    # ------------------------------------------------------------------

    def add_avoid_rect(self, sx: int, sy: int, w: int, h: int) -> None:
        """Protect the rectangle w x h at (sx, sy), clipped to the canvas."""
        x0 = max(sx, 0)
        x1 = min(sx + w, CANVAS_SIZE)
        for y in range(max(sy, 0), min(sy + h, CANVAS_SIZE)):
            row = y * CANVAS_SIZE
            self.add_avoid_range(x0 + row, x1 + row)

    def add_avoid_range(self, start: int, stop: int, step: int = 1) -> None:
        """Protect the bit indices ``range(start, stop, step)``."""
        self.avoid |= set(range(start, stop, step))

    def add_avoid_index(self, *indices: int) -> None:
        """Protect the given bit indices."""
        self.avoid |= set(indices)

    # ------------------------------------------------------------------
    # Debug images
    # ------------------------------------------------------------------

    def get_difference_image(self) -> Image.Image:
        """Return an "LA" image of the queued bits (transparent = untouched)."""
        im = Image.new("LA", (CANVAS_SIZE, CANVAS_SIZE), 0)
        for index, expected in self.difference.items():
            y, x = divmod(index, CANVAS_SIZE)
            im.putpixel((x, y), (255 if expected else 0, 255))
        return im

    def get_avoid_image(self) -> Image.Image:
        """Return an RGB image: red = avoided bit, green = writable bit."""
        im = Image.new("RGB", (CANVAS_SIZE, CANVAS_SIZE), (0, 255, 0))
        for index in self.avoid:
            # The avoid set may contain indices outside the canvas.
            if 0 <= index < CANVAS_BITS:
                y, x = divmod(index, CANVAS_SIZE)
                im.putpixel((x, y), (255, 0, 0))
        return im

    # ------------------------------------------------------------------
    # Listener
    # ------------------------------------------------------------------

    @staticmethod
    def _decode_state(encoded: str) -> Image.Image:
        """Decode a base64 canvas dump into a mode "1" image."""
        raw = encoded.encode()
        # The server omits base64 padding; restore whatever is missing.
        raw += b"=" * (-len(raw) % 4)
        return Image.frombytes(
            "1", (CANVAS_SIZE, CANVAS_SIZE), b64decode(raw)
        )

    def _apply_full_state(self, data: dict) -> None:
        """Replace the local canvas with a full-state message."""
        with self._decode_state(data["full_state"]) as image:
            self.canvas.paste(image)
        self._last_update = data["timestamp"]

    def _apply_toggles(
        self, bits_on: list[int], bits_off: list[int], timestamp
    ) -> None:
        """Apply one batch of bit changes unless it predates our state."""
        if timestamp < self._last_update:
            print("SKIPPING UPDATES: TOO OLD")
            return
        self._last_update = timestamp
        self._read_boxes += len(bits_on) + len(bits_off)
        for ndx in bits_on:
            y, x = divmod(ndx, CANVAS_SIZE)
            self.canvas.putpixel((x, y), 255)
        for ndx in bits_off:
            y, x = divmod(ndx, CANVAS_SIZE)
            self.canvas.putpixel((x, y), 0)

    def _report_stats(self, now: float) -> None:
        """Print throughput and correctness figures, then reset counters."""
        elapsed = now - self._last_printout
        outgoing = self._written_boxes / elapsed
        incoming = self._read_boxes / elapsed
        print()
        print(f"Incoming: {incoming:7.2f}/s Outgoing: {outgoing:7.2f}/s")
        print(f"Alive workers: {len(self._active)}")
        n_correct, n_wrong = 0, 0
        for index, expected in self.difference.items():
            y, x = divmod(index, CANVAS_SIZE)
            actual = self.canvas.getpixel((x, y)) > 0  # type: ignore
            if expected != actual:
                n_wrong += 1
            else:
                n_correct += 1
        print(f"Invalid: {n_wrong:4d} Valid: {n_correct:4d}")
        self._written_boxes = 0
        self._read_boxes = 0
        self._active.clear()
        self._last_printout = now

    def _advance_animations(self, now: float) -> None:
        """Queue the frame of every animation that is current at *now*."""
        for pixmaps, spf in self.animations:
            frame_index = int(now / spf)
            self.difference.update(pixmaps[frame_index % len(pixmaps)])

    async def listener(self) -> None:
        """Mirror the remote canvas into ``self.canvas`` until shutdown.

        Fetches the initial state over HTTP, then applies ``full_state`` and
        ``batched_bit_toggles`` socket.io events. After each event it prints
        statistics (at most every 10 seconds) and advances animations.
        Any failure sets the shutdown flag so the writers stop too.
        """
        try:
            async with ClientSession() as http:
                url = f"{self.base}/api/initial-state"
                async with http.get(url) as req:
                    data = await req.json()
                self._apply_full_state(data)
                async with AsyncSimpleClient(http_session=http) as sio:
                    await sio.connect(f"{self.base}/")
                    while not self._shutdown:
                        event, data = await sio.receive()
                        if event == "full_state":
                            self._apply_full_state(data)
                        elif event == "batched_bit_toggles":
                            bits_on, bits_off, timestamp = data
                            self._apply_toggles(bits_on, bits_off, timestamp)
                        else:
                            print("unknown event", event, data)
                        now = time_now()
                        if (now - self._last_printout) > 10:
                            self._report_stats(now)
                        self._advance_animations(now)
        except Exception as e:
            print(f"Listener died: {e!r}")
            self._shutdown = True
            raise

    # ------------------------------------------------------------------
    # Writers
    # ------------------------------------------------------------------

    async def writer(
        self,
        bot_index: int,
        proxy_url: Optional[str] = None,
        delay: float = 0.25,
    ):
        """Toggle wrong bits until shutdown.

        Args:
            bot_index: identifier used for the "alive workers" statistic.
            proxy_url: optional proxy URL for this bot's connection.
            delay: seconds to wait after each emitted toggle.
        """
        proxy = ProxyConnector.from_url(proxy_url) if proxy_url else None
        async with ClientSession(connector=proxy) as http:
            async with AsyncSimpleClient(http_session=http) as sio:
                await sio.connect(f"{self.base}/")
                # Random start and stride so bots do not all chase one bit.
                offset = randint(0, 1000000)
                keys: list[int] = []
                while not self._shutdown:
                    # Keys are only ever added, so a size change is the only
                    # time the cached key list can be stale. This avoids
                    # copying the whole dict on every pass.
                    if len(keys) != len(self.difference):
                        keys = list(self.difference)
                    if keys:
                        # Probe up to 100 bits per pass; fix the first wrong
                        # one.
                        for _ in range(100):
                            index = keys[offset % len(keys)]
                            offset += randint(0, 100)
                            expected = self.difference.get(index)
                            if expected is None:
                                continue
                            y, x = divmod(index, CANVAS_SIZE)
                            current = (
                                self.canvas.getpixel((x, y)) > 0  # type: ignore
                            )
                            if current != expected:
                                self._written_boxes += 1
                                await sio.emit("toggle_bit", {"index": index})
                                await asyncio.sleep(delay)
                                break
                    self._active.add(bot_index)
                    await asyncio.sleep(0.01)

    # ------------------------------------------------------------------
    # Context manager
    # ------------------------------------------------------------------

    async def __aenter__(self):
        """Start the listener task."""
        self._listener_task = asyncio.create_task(self.listener())
        return self

    async def __aexit__(self, a, b, c):
        """Stop the listener; re-raises the error of a crashed listener."""
        self._shutdown = True
        # The listener may be blocked waiting for the next event, so the
        # flag alone is not enough to stop it.
        if not self._listener_task.done():
            self._listener_task.cancel()
        try:
            await self._listener_task
        except asyncio.CancelledError:
            pass


def _load_image(path: str, mode: str) -> Image.Image:
    """Open *path*, convert it to *mode* and release the file handle."""
    with Image.open(path) as source:
        return source.convert(mode)


def _apply_avoid(mgr: AsyncBotManager, avoid: dict) -> None:
    """Register one entry of the ``avoid`` list from the settings file."""
    kind = avoid["type"]
    if kind == "rect":
        mgr.add_avoid_rect(avoid["x"], avoid["y"], avoid["w"], avoid["h"])
        print("AVOID rectangle {w}x{h}+{x}+{y}".format(**avoid))
    elif kind == "range":
        mgr.add_avoid_range(
            avoid["start"], avoid["stop"], avoid.get("step", 1)
        )
        print("AVOID range {start}-{stop}".format(**avoid))
    elif kind == "image":
        with _load_image(avoid["path"], "LA") as im:
            if im.size != (CANVAS_SIZE, CANVAS_SIZE):
                raise ValueError(
                    f"avoid image {avoid['path']!r} must be "
                    f"{CANVAS_SIZE}x{CANVAS_SIZE}, got {im.width}x{im.height}"
                )
            # Every mostly-opaque pixel marks a protected bit.
            protected = set()
            for y in range(im.height):
                for x in range(im.width):
                    _, a = im.getpixel((x, y))  # type: ignore
                    if a > 128:
                        protected.add(x + y * CANVAS_SIZE)
        mgr.add_avoid_index(*protected)
        print("AVOID image", avoid["path"])
    else:
        print("SKIP unknown avoid type", kind)


def _apply_element(mgr: AsyncBotManager, elem: dict) -> None:
    """Queue one entry of the ``elements`` list from the settings file."""
    kind = elem["type"]
    if kind == "text":
        mgr.put_text(
            elem["x"],
            elem["y"],
            elem["text"],
            elem.get("font", "default"),
            elem.get("size", 8),
        )
        print("ADD text", elem["x"], elem["y"], elem["text"])
    elif kind == "text_anim":
        font = mgr.get_font(elem.get("font", "default"), elem.get("size", 8))
        frames = [mgr.get_text_image(text, font) for text in elem["lines"]]
        try:
            mgr.add_animation(elem["x"], elem["y"], frames, elem["spf"])
        finally:
            for frame in frames:
                frame.close()
    elif kind == "image":
        with _load_image(elem["path"], "LA") as im:
            mgr.put_image(elem["x"], elem["y"], im)
    elif kind == "animation":
        with Image.open(elem["path"]) as anim:
            frames = [frame.copy() for frame in ImageSequence.Iterator(anim)]
        try:
            mgr.add_animation(elem["x"], elem["y"], frames, elem["spf"])
        finally:
            for frame in frames:
                frame.close()
    elif kind == "rgb111":
        # One bit per colour channel, three consecutive bits per pixel.
        ox, oy = elem["x"], elem["y"]
        with _load_image(elem["path"], "RGBA") as im:
            for y in range(im.height):
                for x in range(im.width):
                    r, g, b, a = im.getpixel((x, y))  # type: ignore
                    if a < 128:
                        continue
                    start_ndx = (x + ox + (y + oy) * RGB111_ROW_WIDTH) * 3
                    mgr.put_bit(start_ndx, r > 128)
                    mgr.put_bit(start_ndx + 1, g > 128)
                    mgr.put_bit(start_ndx + 2, b > 128)
    elif kind == "rgb565":
        ox, oy = elem["x"], elem["y"]
        with _load_image(elem["path"], "RGBA") as im:
            for y in range(im.height):
                for x in range(im.width):
                    r, g, b, a = im.getpixel((x, y))  # type: ignore
                    if a < 128:
                        continue
                    offset = (x + ox + (y + oy) * RGB565_ROW_WIDTH) * 16
                    # Pack rgb888 into rgb565.
                    rgb = ((r >> 3) << 11) | ((g >> 2) << 5) | (b >> 3)
                    # Write the 16 bits most-significant first.
                    for i in range(16):
                        mgr.put_bit(offset + i, ((rgb << i) & 0x8000) > 0)
    elif kind == "rect":
        for y in range(elem["y"], elem["y"] + elem["h"]):
            for x in range(elem["x"], elem["x"] + elem["w"]):
                mgr.put_pixel(x, y, elem["fill"])
    elif kind == "blob":
        # Raw file contents, MSB first; "length" is a limit in bits.
        offset = elem["offset"]
        length = max(0, elem.get("length", CANVAS_BITS))
        with open(elem["path"], "rb") as fp:
            data = fp.read((length + 7) // 8)
        for bit_no in range(min(length, len(data) * 8)):
            byte = data[bit_no >> 3]
            mgr.put_bit(
                offset + bit_no, bool((byte >> (7 - (bit_no & 7))) & 1)
            )
    else:
        print("SKIP unknown element type", kind)


async def amain() -> None:
    """Load ``settings.json``, build the target picture and run the bots.

    Writes ``result.png`` (queued bits) and ``avoid.png`` (protected bits)
    for inspection before the writers start.
    """
    with open("settings.json", "r") as fp:
        settings = load(fp)

    # "base" is an optional settings key for the server URL prefix; when it
    # is absent the manager's default is used.
    async with AsyncBotManager(settings.get("base", "")) as mgr:
        # Avoid regions first: put_bit() consults them while drawing.
        for avoid in settings["avoid"]:
            _apply_avoid(mgr, avoid)
        for elem in settings["elements"]:
            _apply_element(mgr, elem)

        mgr.get_difference_image().save("result.png")
        mgr.get_avoid_image().save("avoid.png")

        print("Starting writers...")
        proxies = settings["proxies"]
        # The configured delay applies whether or not proxies are used.
        delay = settings.get("delay", 0.25)
        res = await asyncio.gather(
            *[
                mgr.writer(
                    i,
                    proxies[i % len(proxies)] if proxies else None,
                    delay,
                )
                for i in range(settings["n_bots"])
            ],
            return_exceptions=True,
        )
        for ret in res:
            print(ret)


if __name__ == "__main__":
    asyncio.run(amain())