"""Bot that draws text, images and animations on a 1000x1000 checkbox board.

A single listener task mirrors the remote board into a local 1-bit image,
while any number of writer tasks toggle the boxes whose state differs from
the desired picture described by a JSON settings file.
"""
import asyncio
from typing import Callable, NewType, Optional
from socketio import AsyncClient, AsyncSimpleClient
import socketio
from aiohttp import ClientSession, ClientTimeout
from aiohttp_socks import ProxyConnector
from PIL import (
    Image,
    ImageFont,
    ImageDraw,
    ImageFilter,
    ImageSequence,
    ImageChops,
)
from base64 import b64decode
from random import choice, randint
from json import load
from time import time as time_now
import datetime
from contextlib import suppress

# Maps a flat checkbox index (x + y * 1000) to the state we want it to have.
PixelMap = NewType("PixelMap", dict[int, bool])
# A list of per-frame pixel maps plus the number of seconds each frame lasts.
Animation = NewType("Animation", tuple[list[PixelMap], float])
Font = ImageFont.FreeTypeFont | ImageFont.ImageFont
TIMEOUT = ClientTimeout(120)

# Geometry of the board; every flat index must lie in [0, BOARD_BITS).
BOARD_WIDTH = 1000
BOARD_HEIGHT = 1000
BOARD_BITS = BOARD_WIDTH * BOARD_HEIGHT


class AsyncBotManager:
    """Holds the desired picture and runs the listener / writer coroutines.

    Use it as an async context manager: entering starts the listener task,
    leaving requests shutdown and waits for the listener to finish.
    """

    def __init__(self, base: str = "https://onemillioncheckboxes.com"):
        self.base = base
        # Local mirror of the remote board, one bit per checkbox.
        self.canvas = Image.new("1", (BOARD_WIDTH, BOARD_HEIGHT))
        # Font cache keyed by (font name, size).
        self.fonts: dict[tuple[str, int], Font] = {
            ("default", 8): ImageFont.load_default(8)
        }
        # Desired state for every box we care about.
        self.difference: PixelMap = PixelMap({})
        self._last_update = 0
        self._shutdown: bool = False
        # Indices that must never be written to.
        self.avoid: set[int] = set()
        self.animations: list[Animation] = []
        self.animation_functions: list[Callable[[], PixelMap]] = []
        # Counters for the periodic statistics printout.
        self._written_boxes = 0
        self._read_boxes = 0
        self._last_printout = time_now()
        self._n_printouts = 0
        # Writers that toggled at least one box since the last printout.
        self._active: set[int] = set()
        self.reader_event = asyncio.Event()
        self.ready_event = asyncio.Event()
        self._listener_task: Optional[asyncio.Task] = None

    @staticmethod
    def _on_board(index: int) -> bool:
        """Return True when *index* addresses a checkbox on the board."""
        return 0 <= index < BOARD_BITS

    @staticmethod
    def get_text_image(
        text: str, font: ImageFont.ImageFont | ImageFont.FreeTypeFont
    ) -> Image.Image:
        """Render *text* into a new "LA" image.

        Text pixels get luminance 255; the alpha channel is the luminance
        dilated with a 5x5 max filter, so the text is surrounded by an
        opaque band of luminance 0.
        """
        left, top, right, bottom = font.getbbox(text)
        size = (int(right - left) + 4, int(bottom - top) + 16)
        with Image.new("LA", size, 0) as im:
            draw = ImageDraw.Draw(im)
            draw.rectangle((0, 0, im.width, im.height), (0, 0))
            draw.text(
                (left + 2, top + 2), text, font=font, fill=(255, 0), anchor="lt"
            )
            alpha = im.convert("L").filter(ImageFilter.MaxFilter(5))
            im.putalpha(alpha)
            return im.copy()

    def get_font(self, font_name: str, size: int = 8):
        """Return the font for (*font_name*, *size*), loading it on first use.

        "default" selects Pillow's built-in font; any other name is passed to
        ``ImageFont.truetype``.
        """
        key = (font_name, size)
        cached = self.fonts.get(key)
        if cached:
            return cached
        if font_name == "default":
            font = ImageFont.load_default(size)
        else:
            font = ImageFont.truetype(font_name, size)
        self.fonts[key] = font
        return font

    def put_text(self, x: int, y: int, text: str, font: str, size: int = 8):
        """Draw *text* with its top-left corner at (*x*, *y*)."""
        with self.get_text_image(text, self.get_font(font, size)) as im:
            self.put_image(x, y, im)

    def get_image_diff(self, ox: int, oy: int, im: Image.Image) -> PixelMap:
        """Return the desired states for "LA" image *im* placed at (ox, oy).

        Transparent pixels, avoided indices and indices that fall outside the
        board are left out.  Nothing is stored on the manager.
        """
        pixmap = PixelMap({})
        for y in range(im.height):
            row_start = ox + (y + oy) * BOARD_WIDTH
            for x in range(im.width):
                lum, alpha = im.getpixel((x, y))  # type: ignore
                index = row_start + x
                # Off-board indices would later crash canvas lookups.
                if alpha and index not in self.avoid and self._on_board(index):
                    pixmap[index] = lum > 0
        return pixmap

    def put_image(self, ox: int, oy: int, im: Image.Image):
        """Add every non-transparent pixel of "LA" image *im* at (ox, oy)."""
        for y in range(im.height):
            row_start = ox + (y + oy) * BOARD_WIDTH
            for x in range(im.width):
                lum, alpha = im.getpixel((x, y))  # type: ignore
                if alpha:
                    self.put_bit(row_start + x, lum > 0)

    def put_pixel(self, x: int, y: int, val: bool):
        """Set the desired state at (*x*, *y*); off-board points are ignored."""
        # Valid coordinates are 0..999: x == 1000 would wrap onto the next row.
        if not (0 <= x < BOARD_WIDTH and 0 <= y < BOARD_HEIGHT):
            return
        self.put_bit(x + y * BOARD_WIDTH, val)

    def put_bit(self, index: int, value: bool):
        """Set the desired state of flat *index* unless avoided or off-board."""
        if index not in self.avoid and self._on_board(index):
            self.difference[index] = value

    def add_animation(
        self, ox: int, oy: int, frames: list[Image.Image], spf: float = 1
    ) -> None:
        """Register *frames* as an animation anchored at (ox, oy).

        A pixel takes part when it is opaque (alpha > 128) in the current
        frame or in the saturating sum of all frames' alpha.  Where the
        current frame is transparent the box is set to True.

        Frames may differ in size; they are padded with transparent pixels to
        the largest width and height.  An empty *frames* list is ignored.

        Raises:
            ValueError: if *spf* is zero (the listener divides by it).
        """
        if not frames:
            return
        if spf == 0:
            raise ValueError("spf must be non-zero")

        width = max(frame.width for frame in frames)
        height = max(frame.height for frame in frames)

        # Pad every frame to a common size so per-pixel lookups stay in range.
        padded: list[Image.Image] = []
        coverage = Image.new("L", (width, height), 0)
        for frame in frames:
            canvas = Image.new("LA", (width, height), (0, 0))
            with frame.convert("LA") as frame_la:
                canvas.paste(frame_la, (0, 0))
            padded.append(canvas)
            with canvas.getchannel("A") as frame_alpha:
                coverage = ImageChops.add(coverage, frame_alpha)

        pixelmaps: list[PixelMap] = []
        for canvas in padded:
            pixelmap = PixelMap({})
            for y in range(height):
                row_start = ox + (y + oy) * BOARD_WIDTH
                for x in range(width):
                    lum, alpha = canvas.getpixel((x, y))  # type: ignore
                    total: int = coverage.getpixel((x, y))  # type: ignore
                    index = row_start + x
                    if alpha <= 128 and total <= 128:
                        continue
                    if index in self.avoid or not self._on_board(index):
                        continue
                    pixelmap[index] = lum > 128 if alpha > 128 else True
            pixelmaps.append(pixelmap)
            canvas.close()
        coverage.close()

        self.animations.append(Animation((pixelmaps, spf)))

    def add_avoid_rect(self, sx: int, sy: int, w: int, h: int) -> None:
        """Protect the *w* x *h* rectangle whose top-left corner is (sx, sy)."""
        for y in range(sy, sy + h):
            row_start = y * BOARD_WIDTH
            self.add_avoid_range(sx + row_start, sx + w + row_start)

    def add_avoid_range(self, start: int, stop: int, step: int = 1) -> None:
        """Protect the flat indices ``range(start, stop, step)``."""
        self.avoid.update(range(start, stop, step))

    def add_avoid_index(self, *indices: int) -> None:
        """Protect the given flat indices."""
        self.avoid.update(indices)

    def get_difference_image(self) -> Image.Image:
        """Return an "LA" preview: opaque where a state is desired."""
        with Image.new("LA", (BOARD_WIDTH, BOARD_HEIGHT), 0) as im:
            for index, expected in self.difference.items():
                y, x = divmod(index, BOARD_WIDTH)
                im.putpixel((x, y), (255 if expected else 0, 255))
            return im.copy()

    def get_avoid_image(self) -> Image.Image:
        """Return an RGB preview: red where avoided, green elsewhere."""
        with Image.new("RGB", (BOARD_WIDTH, BOARD_HEIGHT), (0, 255, 0)) as im:
            for index in self.avoid:
                y, x = divmod(index, BOARD_WIDTH)
                im.putpixel((x, y), (255, 0, 0))
            return im.copy()

    def _load_full_state(self, data: dict) -> None:
        """Replace the local canvas with the snapshot carried by *data*."""
        # NOTE(review): one "=" is appended before decoding; presumably the
        # server omits the base64 padding -- confirm.
        buffer = b64decode(data["full_state"].encode() + b"=")
        with Image.frombytes("1", (BOARD_WIDTH, BOARD_HEIGHT), buffer) as image:
            self.canvas.paste(image)
        self._last_update = data["timestamp"]

    def _apply_toggles(self, bits_on, bits_off, timestamp) -> None:
        """Apply one batch of toggles unless it is older than our state."""
        if timestamp < self._last_update:
            print("SKIPPING UPDATES: TOO OLD")
            return
        self._last_update = timestamp
        self._read_boxes += len(bits_on) + len(bits_off)
        for value, indices in ((255, bits_on), (0, bits_off)):
            for ndx in indices:
                y, x = divmod(ndx, BOARD_WIDTH)
                self.canvas.putpixel((x, y), value)

    def _report_stats(self, now: float) -> bool:
        """Print throughput and correctness; return False to stop the bot."""
        elapsed = now - self._last_printout
        outgoing = self._written_boxes / elapsed
        incoming = self._read_boxes / elapsed
        print()
        print(f"I/O: {incoming:7.2f}/s | {outgoing:7.2f}/s")
        print(f"Alive workers: {len(self._active)}")
        if len(self._active) < 2 and self._n_printouts > 3:
            print("Too few workers, dying!")
            self._shutdown = True
            return False
        self._n_printouts += 1

        n_wrong = 0
        for index, expected in self.difference.items():
            y, x = divmod(index, BOARD_WIDTH)
            actual = self.canvas.getpixel((x, y)) > 0  # type: ignore
            if expected != actual:
                n_wrong += 1
        n_correct = len(self.difference) - n_wrong
        print(f"Invalid: {n_wrong:4d} Valid: {n_correct:4d}")

        self._written_boxes = 0
        self._read_boxes = 0
        self._active.clear()
        self._last_printout = now
        return True

    def _advance_animations(self, now: float) -> None:
        """Merge the current frame of every animation into the desired state."""
        for pixmaps, spf in self.animations:
            frame_index = int(now / spf)
            self.difference.update(pixmaps[frame_index % len(pixmaps)])
        for func in self.animation_functions:
            self.difference.update(func())

    async def listener(self) -> None:
        """Mirror the remote board into ``self.canvas`` until shutdown.

        Fetches the initial snapshot over HTTP, then consumes socket.io
        events.  After every event it refreshes the animations and, at most
        every five seconds, prints statistics.  Any exception marks the
        manager as shut down and is re-raised.
        """
        try:
            async with ClientSession(timeout=TIMEOUT) as http:
                print("Getting initial state")
                async with http.get(f"{self.base}/api/initial-state") as req:
                    self._load_full_state(await req.json())
                    print("Initial state received")

                async with AsyncSimpleClient(http_session=http) as sio:
                    await sio.connect(f"{self.base}/socket.io")
                    while not self._shutdown:
                        try:
                            async with asyncio.timeout(20):
                                event, data = await sio.receive()
                        except TimeoutError:
                            print("Reading failed")
                            if not sio.connected:
                                print("Reconnecting")
                                await sio.connect(f"{self.base}/socket.io")
                            continue

                        # The first event tells the writers they may start.
                        self.reader_event.set()
                        self.ready_event.set()

                        if event == "full_state":
                            self._load_full_state(data)
                        elif event == "batched_bit_toggles":
                            bits_on, bits_off, timestamp = data
                            self._apply_toggles(bits_on, bits_off, timestamp)
                        else:
                            print("unknown event", event, data)

                        now = time_now()
                        if now - self._last_printout > 5:
                            if not self._report_stats(now):
                                return
                        self._advance_animations(now)
        except Exception as e:
            print(f"Listener died: {e!r}")
            self._shutdown = True
            raise

    async def writer(
        self,
        bot_index: int,
        proxy_url: Optional[str] = None,
        delay: float = 0.25,
    ):
        """Toggle boxes that differ from the desired state until shutdown.

        Each round probes up to 100 pseudo-randomly chosen desired boxes and
        emits one ``toggle_bit`` for the first mismatch, then sleeps *delay*
        seconds.  Exceptions are printed rather than propagated.

        Args:
            bot_index: identifier used in log output and liveness tracking.
            proxy_url: optional proxy URL for this writer's connection.
            delay: seconds to wait after each emitted toggle.
        """
        proxy = ProxyConnector.from_url(proxy_url) if proxy_url else None
        try:
            async with ClientSession(connector=proxy, timeout=TIMEOUT) as http:
                async with AsyncSimpleClient(http_session=http) as sio:
                    await sio.connect(f"{self.base}/socket.io")
                    print(f"Writer {bot_index} connected, waiting...")
                    await self.ready_event.wait()
                    print(f"Writer {bot_index} running")
                    offset = 0
                    while not self._shutdown:
                        targets = sorted(self.difference.items())
                        if not targets:
                            # Nothing to draw yet; the modulo below would
                            # otherwise divide by zero.
                            await asyncio.sleep(delay)
                            continue
                        for _ in range(100):
                            index, expected = targets[offset % len(targets)]
                            offset += randint(0, 1000)
                            y, x = divmod(index, BOARD_WIDTH)
                            current = self.canvas.getpixel((x, y)) > 0  # type: ignore
                            if current != expected:
                                self._written_boxes += 1
                                await sio.emit("toggle_bit", {"index": index})
                                await asyncio.sleep(delay)
                                self._active.add(bot_index)
                                break
                        # Best-effort drain of the incoming queue.  Only
                        # Exception is suppressed so that task cancellation
                        # and KeyboardInterrupt still propagate.
                        with suppress(Exception):
                            await sio.receive(0.1)
                    print(f"Worker {bot_index} stopped: shutdown")
        except Exception as e:
            print(f"Worker {bot_index} died: {e!r}")

    async def __aenter__(self):
        """Start the listener task and return the manager."""
        self._listener_task = asyncio.create_task(self.listener())
        return self

    async def __aexit__(self, a, b, c):
        """Request shutdown and wait for the listener task to finish."""
        self._shutdown = True
        if self._listener_task is not None:
            await self._listener_task


def _element_font(mgr: AsyncBotManager, elem: dict):
    """Return the font selected by *elem* (default font, size 8)."""
    return mgr.get_font(elem.get("font", "default"), elem.get("size", 8))


def _make_clock_updater(
    mgr: AsyncBotManager, time_format: str, pos_x: int, pos_y: int, font
) -> Callable[[], PixelMap]:
    """Build a callback that renders the current UTC time at (pos_x, pos_y).

    A factory is used so each clock keeps its own settings instead of
    sharing loop variables that are rebound for later elements.
    """

    def update() -> PixelMap:
        now = datetime.datetime.now(datetime.timezone.utc)
        with mgr.get_text_image(now.strftime(time_format), font) as img:
            return mgr.get_image_diff(pos_x, pos_y, img)

    return update


def _apply_avoid(mgr: AsyncBotManager, avoid: dict) -> None:
    """Register one entry of the settings' "avoid" list on *mgr*."""
    kind = avoid["type"]
    if kind == "rect":
        mgr.add_avoid_rect(avoid["x"], avoid["y"], avoid["w"], avoid["h"])
        print("AVOID rectangle {w}x{h}+{x}+{y}".format(**avoid))
    elif kind == "range":
        mgr.add_avoid_range(
            avoid["start"], avoid["stop"], avoid.get("step", 1)
        )
        print("AVOID range {start}-{stop}".format(**avoid))
    elif kind == "image":
        with Image.open(avoid["path"]) as src, src.convert("LA") as im:
            if im.size != (BOARD_WIDTH, BOARD_HEIGHT):
                raise ValueError(
                    f"avoid image {avoid['path']!r} must be "
                    f"{BOARD_WIDTH}x{BOARD_HEIGHT}, got {im.width}x{im.height}"
                )
            with im.getchannel("A") as alpha_channel:
                # One byte per pixel in row-major order, so the byte offset
                # is the flat board index.
                alpha = alpha_channel.tobytes()
        mgr.avoid.update(i for i, a in enumerate(alpha) if a > 128)
        print("AVOID image", avoid["path"])


def _apply_packed_image(mgr: AsyncBotManager, elem: dict) -> None:
    """Write an image as a packed bitstream ("rgb111" or "rgb565")."""
    ox, oy = elem["x"], elem["y"]
    is_rgb111 = elem["type"] == "rgb111"
    with Image.open(elem["path"]) as src, src.convert("RGBA") as im:
        for y in range(im.height):
            for x in range(im.width):
                r, g, b, a = im.getpixel((x, y))  # type: ignore
                if a < 128:
                    continue
                if is_rgb111:
                    # Three bits per pixel on a 577-pixel-wide virtual grid.
                    start = (x + ox + (y + oy) * 577) * 3
                    mgr.put_bit(start, r > 128)
                    mgr.put_bit(start + 1, g > 128)
                    mgr.put_bit(start + 2, b > 128)
                else:
                    # Sixteen bits per pixel on a 250-pixel-wide virtual grid.
                    start = (x + ox + (y + oy) * 250) * 16
                    # Pack rgb888 into rgb565.
                    rgb = ((r >> 3) << 11) | ((g >> 2) << 5) | (b >> 3)
                    # Most significant bit first.
                    for i in range(16):
                        mgr.put_bit(start + i, ((rgb << i) & 0x8000) > 0)


def _apply_blob(mgr: AsyncBotManager, elem: dict) -> None:
    """Write at most "length" bits of a file, MSB first, from "offset"."""
    offset = elem["offset"]
    length = max(elem.get("length", BOARD_BITS), 0)
    with open(elem["path"], "rb") as fp:
        payload = fp.read((length + 7) // 8)
    for bit_no in range(min(length, len(payload) * 8)):
        byte = payload[bit_no >> 3]
        mgr.put_bit(offset + bit_no, bool((byte >> (7 - (bit_no & 7))) & 1))


def _apply_element(mgr: AsyncBotManager, elem: dict) -> None:
    """Register one entry of the settings' "elements" list on *mgr*.

    Unknown element types are ignored.
    """
    kind = elem["type"]
    if kind == "text":
        mgr.put_text(
            elem["x"],
            elem["y"],
            elem["text"],
            elem.get("font", "default"),
            elem.get("size", 8),
        )
        print("ADD text", elem)
    elif kind == "text_anim":
        font = _element_font(mgr, elem)
        frames = [mgr.get_text_image(text, font) for text in elem["lines"]]
        mgr.add_animation(elem["x"], elem["y"], frames, elem["spf"])
        for frame in frames:
            frame.close()
        print("ADD text animation", elem)
    elif kind == "image":
        with Image.open(elem["path"]) as src, src.convert("LA") as im:
            mgr.put_image(elem["x"], elem["y"], im)
        print("ADD image", elem)
    elif kind == "time":
        mgr.animation_functions.append(
            _make_clock_updater(
                mgr,
                elem["format"],
                elem["x"],
                elem["y"],
                _element_font(mgr, elem),
            )
        )
    elif kind == "tile":
        with Image.open(elem["path"]) as src, src.convert("LA") as im:
            # Repeat the image until the requested w x h area is covered.
            for oy in range(elem.get("h", im.height)):
                for ox in range(elem.get("w", im.width)):
                    lum, alpha = im.getpixel(  # type: ignore
                        (ox % im.width, oy % im.height)
                    )
                    if alpha:
                        x, y = elem["x"] + ox, elem["y"] + oy
                        mgr.put_bit(x + y * BOARD_WIDTH, lum > 0)
        print("ADD tile", elem)
    elif kind == "animation":
        with Image.open(elem["path"]) as anim:
            frames = [frame.copy() for frame in ImageSequence.Iterator(anim)]
        mgr.add_animation(elem["x"], elem["y"], frames, elem["spf"])
        for frame in frames:
            frame.close()
        print("ADD animation", elem)
    elif kind in ("rgb111", "rgb565"):
        _apply_packed_image(mgr, elem)
    elif kind == "rect":
        for y in range(elem["y"], elem["y"] + elem["h"]):
            for x in range(elem["x"], elem["x"] + elem["w"]):
                mgr.put_pixel(x, y, elem["fill"])
    elif kind == "blob":
        _apply_blob(mgr, elem)


async def amain(conf_path: str = "settings.json", *_) -> None:
    """Load the settings file, build the picture and run the writers.

    Saves previews to ``result.png`` and ``avoid.png`` before the writers
    start.  Extra positional arguments are ignored.
    """
    with open(conf_path, "r", encoding="utf-8") as fp:
        settings = load(fp)

    async with AsyncBotManager() as mgr:
        # Avoid regions come first so elements added later respect them.
        for avoid in settings.get("avoid", []):
            _apply_avoid(mgr, avoid)
        for elem in settings.get("elements", []):
            _apply_element(mgr, elem)

        mgr.get_difference_image().save("result.png")
        mgr.get_avoid_image().save("avoid.png")

        print("Starting writers...")
        proxies = settings.get("proxies") or []
        res = await asyncio.gather(
            *[
                mgr.writer(
                    i,
                    proxies[i % len(proxies)] if proxies else None,
                    settings["delay"],
                )
                for i in range(settings["n_bots"])
            ],
        )
        for ret in res:
            print("RETURN", repr(ret))
        print("Shutting down...")
        mgr._shutdown = True


if __name__ == "__main__":
    from sys import argv

    asyncio.run(amain(*argv[1:]), debug=True)