"""Bot that draws onto a 1000x1000 checkbox grid over Socket.IO.

A single listener task mirrors the remote grid into a local 1-bit image,
while any number of writer tasks toggle the checkboxes that differ from the
desired state described by a JSON settings file.
"""

import asyncio
import datetime
from base64 import b64decode
from contextlib import suppress
from json import load
from random import choice, randint
from time import time as time_now
from typing import Callable, NewType, Optional

import socketio
from aiohttp import ClientSession, ClientTimeout
from aiohttp_socks import ProxyConnector
from PIL import (
    Image,
    ImageChops,
    ImageDraw,
    ImageFilter,
    ImageFont,
    ImageSequence,
)
from socketio import AsyncClient, AsyncSimpleClient

# Maps a linear checkbox index (x + y * 1000) to the desired checked state.
PixelMap = NewType("PixelMap", dict[int, bool])
# (frames, seconds per frame)
Animation = NewType("Animation", tuple[list[PixelMap], float])
Font = ImageFont.FreeTypeFont | ImageFont.ImageFont

TIMEOUT = ClientTimeout(120)


class AsyncBotManager:
    """Shared state plus the listener/writer coroutines of the bot.

    Used as an async context manager: entering starts the listener task,
    leaving requests shutdown and waits for the listener to finish.
    """

    def __init__(self, base: str = "https://onemillioncheckboxes.com"):
        self.base = base
        # Local mirror of the remote grid, one bit per checkbox.
        self.canvas = Image.new("1", (1000, 1000))
        # Font cache keyed by (font name, size).
        self.fonts: dict[tuple[str, int], Font] = {
            ("default", 8): ImageFont.load_default(8)
        }
        # Desired state of every checkbox the bot manages.
        self.difference: PixelMap = PixelMap({})
        self._last_update = 0
        self._shutdown: bool = False
        # Indices the bot must never manage.
        self.avoid: set[int] = set()
        self.animations: list[Animation] = []
        self.animation_functions: list[Callable[[], PixelMap]] = []
        # Counters and bookkeeping for the periodic status printout.
        self._written_boxes = 0
        self._read_boxes = 0
        self._last_printout = time_now()
        self._n_printouts = 0
        # Writers that toggled at least one box since the last printout.
        self._active: set[int] = set()
        self.reader_event = asyncio.Event()
        self.ready_event = asyncio.Event()

    @staticmethod
    def get_text_image(
        text: str,
        font: ImageFont.ImageFont | ImageFont.FreeTypeFont,
        negative: bool = False,
    ) -> Image.Image:
        """Render *text* into a new "LA" image.

        The luminance band holds the glyphs; the alpha band is the glyph
        mask grown by a 5x5 max filter, which adds an outline around the
        text.

        Args:
            text: String to render.
            font: Pillow font used for measuring and drawing.
            negative: When true, the rendered image is inverted.

        Returns:
            A standalone copy of the rendered image; the caller owns it.
        """
        left, top, right, bottom = font.getbbox(text)
        with Image.new(
            "LA", (int(right - left) + 4, int(bottom - top) + 16), 0
        ) as im:
            draw = ImageDraw.Draw(im)
            draw.rectangle((0, 0, im.width, im.height), (0, 0))
            draw.text(
                (left + 2, top + 2), text, font=font, fill=255, anchor="lt"
            )
            alpha = im.convert("L").filter(ImageFilter.MaxFilter(5))
            im.putalpha(alpha)
            if negative:
                # NOTE(review): the inversion is applied to the RGBA image,
                # so the alpha band appears to be inverted as well - confirm
                # this is intended.
                im = ImageChops.invert(im.convert("RGBA")).convert("LA")
            return im.copy()

    def get_font(self, font_name: str, size: int = 8) -> Font:
        """Return the font for (*font_name*, *size*), loading it on a miss.

        The name "default" selects Pillow's built-in font; any other name is
        passed to ``ImageFont.truetype``.
        """
        if font := self.fonts.get((font_name, size)):
            return font
        if font_name == "default":
            font = ImageFont.load_default(size)
        else:
            font = ImageFont.truetype(font_name, size)
        self.fonts[font_name, size] = font
        return font

    def put_text(
        self,
        x: int,
        y: int,
        text: str,
        font: str,
        size: int = 8,
        negative: bool = False,
    ):
        """Render *text* and add it to the desired state at (*x*, *y*)."""
        self.put_image(
            x,
            y,
            self.get_text_image(text, self.get_font(font, size), negative),
        )

    def get_image_diff(self, ox: int, oy: int, im: Image.Image) -> PixelMap:
        """Convert an image placed at (*ox*, *oy*) into a pixel map.

        Each pixel is unpacked into (luminance, alpha), so *im* is expected
        to be in "LA" mode. Pixels with zero alpha, avoided indices and
        indices outside the grid are left out.

        Returns:
            Mapping of linear index to desired state (luminance > 0).
        """
        pixmap = PixelMap({})
        for y in range(im.height):
            for x in range(im.width):
                l, a = im.getpixel((x, y))  # type: ignore
                index = x + ox + (y + oy) * 1000
                # Out-of-range indices must never reach `difference`: the
                # listener and the writers look every index up on the canvas.
                if not 0 <= index < 1000000:
                    continue
                if a and index not in self.avoid:
                    pixmap[index] = l > 0
        return pixmap

    def put_image(self, ox: int, oy: int, im: Image.Image):
        """Add every non-transparent pixel of *im* to the desired state.

        Each pixel is unpacked into (luminance, alpha), so *im* is expected
        to be in "LA" mode. Bounds and avoid checks are done by `put_bit`.
        """
        for y in range(im.height):
            for x in range(im.width):
                l, a = im.getpixel((x, y))  # type: ignore
                index = x + ox + (y + oy) * 1000
                if a:
                    self.put_bit(index, l > 0)

    def put_pixel(self, x: int, y: int, val: bool):
        """Set the desired state at (*x*, *y*); off-grid points are ignored."""
        # Valid coordinates are 0..999 on both axes. The previous `> 1000`
        # test let x == 1000 wrap onto the next row and accepted negatives.
        if not (0 <= x < 1000 and 0 <= y < 1000):
            return
        self.put_bit(x + y * 1000, val)

    def put_bit(self, index: int, value: bool):
        """Set the desired state of one linear *index*.

        Avoided indices and indices outside ``0 <= index < 1000000`` are
        ignored.
        """
        if index not in self.avoid and 0 <= index < 1000000:
            self.difference[index] = value

    def add_animation(
        self, ox: int, oy: int, frames: list[Image.Image], spf: float = 1
    ) -> None:
        """Register an animation placed at (*ox*, *oy*).

        Args:
            ox: Horizontal offset of the frames on the grid.
            oy: Vertical offset of the frames on the grid.
            frames: Frames of identical size; each is converted to "LA".
            spf: Seconds each frame stays active.
        """
        animation = Animation(([], spf))
        # Sum of all frame alphas: a pixel that is opaque in any frame has
        # to be managed in every frame.
        alpha = Image.new("L", frames[0].size, 0)
        for frame in frames:
            frame_la = frame.convert("LA")
            frame_alpha = frame_la.getchannel("A")
            alpha = ImageChops.add(alpha, frame_alpha)
        for frame in frames:
            frame_la = frame.convert("LA")
            pixelmap = PixelMap({})
            for y in range(frame_la.height):
                for x in range(frame_la.width):
                    l, a = frame_la.getpixel((x, y))  # type: ignore
                    ga: int = alpha.getpixel((x, y))  # type: ignore
                    index = x + ox + (y + oy) * 1000
                    # Frames are merged into `difference` by the listener,
                    # so off-grid indices must be dropped here.
                    if not 0 <= index < 1000000:
                        continue
                    if (a > 128 or ga > 128) and index not in self.avoid:
                        # Transparent in this frame but opaque in another:
                        # the pixel is set to checked.
                        pixelmap[index] = l > 128 if a > 128 else True
            animation[0].append(pixelmap)
        self.animations.append(animation)

    def add_avoid_rect(self, sx: int, sy: int, w: int, h: int) -> None:
        """Mark the *w* x *h* rectangle at (*sx*, *sy*) as avoided."""
        for y in range(sy, sy + h):
            ox = y * 1000
            self.add_avoid_range(sx + ox, sx + w + ox)

    def add_avoid_range(self, start: int, stop: int, step: int = 1) -> None:
        """Mark ``range(start, stop, step)`` of linear indices as avoided."""
        self.avoid |= set(range(start, stop, step))

    def add_avoid_index(self, *indices: int) -> None:
        """Mark the given linear indices as avoided."""
        self.avoid |= set(indices)

    def get_difference_image(self) -> Image.Image:
        """Render the desired state as an "LA" image.

        Managed pixels are opaque, white when checked and black otherwise;
        unmanaged pixels stay transparent.
        """
        with Image.new("LA", (1000, 1000), 0) as im:
            for index, expected in self.difference.items():
                y, x = divmod(index, 1000)
                im.putpixel((x, y), (255 if expected else 0, 255))
            return im.copy()

    def get_avoid_image(self) -> Image.Image:
        """Render the avoid set as an RGB image: red avoided, green free."""
        with Image.new("RGB", (1000, 1000), (0, 255, 0)) as im:
            for index in self.avoid:
                y, x = divmod(index, 1000)
                im.putpixel((x, y), (255, 0, 0))
            return im.copy()

    async def listener(self) -> None:
        """Mirror the remote grid into `canvas` until shutdown.

        Fetches the initial state over HTTP, then applies Socket.IO events.
        It also prints throughput statistics every five seconds, advances
        animations, and requests shutdown when fewer than two writers were
        active after more than three printouts.

        Raises:
            Exception: Any failure is logged, flags shutdown and is
                re-raised.
        """
        try:
            async with ClientSession(timeout=TIMEOUT) as http:
                print("Getting initial state")
                async with http.get(f"{self.base}/api/initial-state") as req:
                    data = await req.json()
                    # One "=" is appended before decoding - presumably the
                    # server omits base64 padding; TODO confirm.
                    buffer = b64decode(data["full_state"].encode() + b"=")
                    self.canvas.paste(
                        Image.frombytes("1", (1000, 1000), buffer)
                    )
                    self._last_update = data["timestamp"]
                print("Initial state received")
                async with AsyncSimpleClient(http_session=http) as sio:
                    await sio.connect(f"{self.base}/socket.io")
                    while not self._shutdown:
                        try:
                            async with asyncio.timeout(20):
                                event, data = await sio.receive()
                        except TimeoutError:
                            print("Reading failed")
                            if not sio.connected:
                                print("Reconnecting")
                                await sio.connect(f"{self.base}/socket.io")
                            continue
                        self.reader_event.set()
                        # Writers wait on this before they start toggling.
                        self.ready_event.set()
                        if event == "full_state":
                            buffer = b64decode(
                                data["full_state"].encode() + b"="
                            )
                            image = Image.frombytes("1", (1000, 1000), buffer)
                            self.canvas.paste(image)
                            image.close()
                            self._last_update = data["timestamp"]
                        elif event == "batched_bit_toggles":
                            bits_on, bits_off, timestamp = data
                            if timestamp < self._last_update:
                                print("SKIPPING UPDATES: TOO OLD")
                            else:
                                self._last_update = timestamp
                                self._read_boxes += len(bits_on) + len(
                                    bits_off
                                )
                                for ndx in bits_on:
                                    y, x = divmod(ndx, 1000)
                                    self.canvas.putpixel((x, y), 255)
                                for ndx in bits_off:
                                    y, x = divmod(ndx, 1000)
                                    self.canvas.putpixel((x, y), 0)
                        else:
                            print("unknown event", event, data)
                        now = time_now()
                        if (now - self._last_printout) > 5:
                            outgoing = self._written_boxes / (
                                now - self._last_printout
                            )
                            incoming = self._read_boxes / (
                                now - self._last_printout
                            )
                            print()
                            print(
                                f"I/O: {incoming:7.2f}/s | {outgoing:7.2f}/s"
                            )
                            print(f"Alive workers: {len(self._active)}")
                            if len(self._active) < 2 and self._n_printouts > 3:
                                print("Too few workers, dying!")
                                self._shutdown = True
                                return
                            self._n_printouts += 1
                            n_correct, n_wrong = 0, 0
                            for index, expected in self.difference.items():
                                y, x = divmod(index, 1000)
                                actual = self.canvas.getpixel((x, y)) > 0  # type: ignore
                                if expected != actual:
                                    n_wrong += 1
                                else:
                                    n_correct += 1
                            print(
                                f"Invalid: {n_wrong:4d} Valid: {n_correct:4d}"
                            )
                            self._written_boxes = 0
                            self._read_boxes = 0
                            self._active.clear()
                            self._last_printout = now
                        # Select the frame that is current for wall-clock time.
                        for pixmaps, spf in self.animations:
                            frame_index = int(now / spf)
                            self.difference.update(
                                pixmaps[frame_index % len(pixmaps)]
                            )
                        for func in self.animation_functions:
                            self.difference.update(func())
        except Exception as e:
            print(f"Listener died: {e!r}")
            self._shutdown = True
            raise

    async def writer(
        self,
        bot_index: int,
        proxy_url: Optional[str] = None,
        delay: float = 0.25,
    ):
        """Toggle checkboxes that differ from the desired state.

        Args:
            bot_index: Identifier used in log output and activity tracking.
            proxy_url: Optional proxy URL handed to ``ProxyConnector``.
            delay: Seconds to sleep after each emitted toggle.

        Exceptions are logged and end the worker; they are not re-raised.
        """
        proxy = ProxyConnector.from_url(proxy_url) if proxy_url else None
        try:
            async with ClientSession(connector=proxy, timeout=TIMEOUT) as http:
                async with AsyncSimpleClient(http_session=http) as sio:
                    await sio.connect(f"{self.base}/socket.io")
                    print(f"Writer {bot_index} connected, waiting...")
                    await self.ready_event.wait()
                    print(f"Writer {bot_index} running")
                    offset = 0
                    while not self._shutdown:
                        # Keys are unique, so sorting the (index, state)
                        # pairs orders them by index.
                        diff = sorted(self.difference.items())
                        # An empty target would make `offset % len(diff)`
                        # divide by zero, so only probe when there is work.
                        if diff:
                            for _ in range(100):
                                index, expected = diff[offset % len(diff)]
                                offset += randint(0, 1000)
                                y, x = divmod(index, 1000)
                                current = self.canvas.getpixel((x, y)) > 0  # type: ignore
                                if current != expected:
                                    self._written_boxes += 1
                                    await sio.emit(
                                        "toggle_bit", {"index": index}
                                    )
                                    await asyncio.sleep(delay)
                                    self._active.add(bot_index)
                                    break
                        # Best-effort drain of one incoming event; this also
                        # yields to the event loop. Only `Exception` is
                        # suppressed so task cancellation still propagates.
                        with suppress(Exception):
                            await sio.receive(0.1)
                    print(f"Worker {bot_index} stopped: shutdown")
        except Exception as e:
            print(f"Worker {bot_index} died: {e!r}")

    async def __aenter__(self):
        """Start the listener task and return the manager."""
        self._listener_task = asyncio.create_task(self.listener())
        return self

    async def __aexit__(self, a, b, c):
        """Request shutdown and wait for the listener task to finish."""
        self._shutdown = True
        await self._listener_task


def _make_time_updater(
    mgr: AsyncBotManager,
    time_format: str,
    pos_x: int,
    pos_y: int,
    font: Font,
) -> Callable[[], PixelMap]:
    """Build a callable that renders the current UTC time as a pixel map.

    The values are bound per call, so several "time" elements each keep
    their own format, position and font.
    """

    def update() -> PixelMap:
        now = datetime.datetime.now(datetime.timezone.utc)
        txt = now.strftime(time_format)
        img = mgr.get_text_image(txt, font)
        pixmap = mgr.get_image_diff(pos_x, pos_y, img)
        img.close()
        return pixmap

    return update


async def amain(conf_path: str = "settings.json", *_) -> None:
    """Load the settings file, build the desired state and run the writers.

    The settings are read with the keys "avoid", "elements", "proxies",
    "delay" and "n_bots". The desired state and the avoid mask are saved as
    "result.png" and "avoid.png" before the writers start.

    Args:
        conf_path: Path to the JSON settings file.

    Raises:
        ValueError: If an avoid image is not 1000x1000.
    """
    with open(conf_path, "r") as fp:
        settings = load(fp)
    async with AsyncBotManager() as mgr:
        for avoid in settings["avoid"]:
            if avoid["type"] == "rect":
                mgr.add_avoid_rect(
                    avoid["x"], avoid["y"], avoid["w"], avoid["h"]
                )
                print("AVOID rectangle {w}x{h}+{x}+{y}".format(**avoid))
            elif avoid["type"] == "range":
                mgr.add_avoid_range(
                    avoid["start"], avoid["stop"], avoid.get("step", 1)
                )
                print("AVOID range {start}-{stop}".format(**avoid))
            elif avoid["type"] == "image":
                with Image.open(avoid["path"]).convert("LA") as im:
                    # Raised explicitly: an `assert` is stripped under -O
                    # and a wrongly sized mask would map to wrong indices.
                    if im.width != 1000 or im.height != 1000:
                        raise ValueError(
                            "avoid image must be 1000x1000, got "
                            f"{im.width}x{im.height}"
                        )
                    for y in range(im.height):
                        for x in range(im.width):
                            l, a = im.getpixel((x, y))  # type: ignore
                            if a > 128:
                                mgr.add_avoid_index(x + y * 1000)
                print("AVOID image", avoid["path"])
        for elem in settings["elements"]:
            if elem["type"] == "text":
                mgr.put_text(
                    elem["x"],
                    elem["y"],
                    elem["text"],
                    elem.get("font", "default"),
                    elem.get("size", 8),
                    elem.get("negative", False),
                )
                print("ADD text", elem)
            elif elem["type"] == "text_anim":
                frames = [
                    mgr.get_text_image(
                        text,
                        mgr.get_font(
                            elem.get("font", "default"),
                            elem.get("size", 8),
                        ),
                    )
                    for text in elem["lines"]
                ]
                mgr.add_animation(elem["x"], elem["y"], frames, elem["spf"])
                for frame in frames:
                    frame.close()
                print("ADD text animation", elem)
            elif elem["type"] == "image":
                with Image.open(elem["path"]).convert("LA") as im:
                    mgr.put_image(elem["x"], elem["y"], im)
                print("ADD image", elem)
            elif elem["type"] == "time":
                font = mgr.get_font(
                    elem.get("font", "default"), elem.get("size", 8)
                )
                # A closure defined inline would see only the last
                # element's values, so they are bound through a factory.
                mgr.animation_functions.append(
                    _make_time_updater(
                        mgr, elem["format"], elem["x"], elem["y"], font
                    )
                )
            elif elem["type"] == "tile":
                with Image.open(elem["path"]).convert("LA") as im:
                    for oy in range(elem.get("h", im.height)):
                        for ox in range(elem.get("w", im.width)):
                            # Wrap around the source image to repeat it.
                            l, a = im.getpixel((ox % im.width, oy % im.height))  # type: ignore
                            if a:
                                x, y = elem["x"] + ox, elem["y"] + oy
                                index = x + y * 1000
                                mgr.put_bit(index, l > 0)
                print("ADD tile", elem)
            elif elem["type"] == "animation":
                with Image.open(elem["path"]) as anim:
                    frames = [
                        frame.copy()
                        for frame in ImageSequence.Iterator(anim)
                    ]
                    mgr.add_animation(
                        elem["x"], elem["y"], frames, elem["spf"]
                    )
                    for frame in frames:
                        frame.close()
                print("ADD animation", elem)
            elif elem["type"] == "rgb111":
                ox, oy = elem["x"], elem["y"]
                with Image.open(elem["path"]).convert("RGBA") as im:
                    for y in range(im.height):
                        for x in range(im.width):
                            r, g, b, a = im.getpixel((x, y))  # type: ignore
                            if a < 128:
                                continue
                            # Three bits per pixel, rows of 577 pixels.
                            start_ndx = (x + ox + (y + oy) * 577) * 3
                            mgr.put_bit(start_ndx, r > 128)
                            mgr.put_bit(start_ndx + 1, g > 128)
                            mgr.put_bit(start_ndx + 2, b > 128)
            elif elem["type"] == "rgb565":
                ox, oy = elem["x"], elem["y"]
                with Image.open(elem["path"]).convert("RGBA") as im:
                    for y in range(im.height):
                        for x in range(im.width):
                            r, g, b, a = im.getpixel((x, y))  # type: ignore
                            if a < 128:
                                continue
                            # Sixteen bits per pixel, rows of 250 pixels.
                            offset: int = (x + ox + (y + oy) * 250) * 16
                            # pack rgb888 to rgb565
                            rgb: int = (r >> 3) << 11
                            rgb |= (g >> 2) << 5
                            rgb |= b >> 3
                            # write to a bitstream, most significant bit first
                            for i in range(16):
                                mgr.put_bit(
                                    offset + i, ((rgb << i) & 0x8000) > 0
                                )
            elif elem["type"] == "rect":
                for y in range(elem["y"], elem["y"] + elem["h"]):
                    for x in range(elem["x"], elem["x"] + elem["w"]):
                        mgr.put_pixel(x, y, elem["fill"])
            elif elem["type"] == "blob":
                with open(elem["path"], "rb") as fp:
                    offset = elem["offset"]
                    length = elem.get("length", 1000000)
                    written = 0
                    while char := fp.read(1):
                        byte = char[0]
                        # `>=` stops after exactly `length` bits; the
                        # previous `>` wrote one bit too many.
                        if written >= length:
                            break
                        for i in range(8):
                            if written >= length:
                                break
                            # Bits are emitted most significant first.
                            mgr.put_bit(offset, bool((byte >> (7 - i)) & 1))
                            written += 1
                            offset += 1
        mgr.get_difference_image().save("result.png")
        mgr.get_avoid_image().save("avoid.png")
        print("Starting writers...")
        if n_proxies := len(settings["proxies"]):
            # Proxies are assigned to the bots round-robin.
            res = await asyncio.gather(
                *[
                    mgr.writer(
                        i,
                        settings["proxies"][i % n_proxies],
                        settings["delay"],
                    )
                    for i in range(settings["n_bots"])
                ],
            )
        else:
            res = await asyncio.gather(
                *[
                    mgr.writer(i, None, settings["delay"])
                    for i in range(settings["n_bots"])
                ],
            )
        for ret in res:
            print("RETURN", repr(ret))
        print("Shutting down...")
        mgr._shutdown = True


if __name__ == "__main__":
    from sys import argv

    asyncio.run(amain(*argv[1:]), debug=True)