"""Bot swarm that draws a target picture on a 1000x1000 checkbox board.

One listener task mirrors the live board state received over socket.io;
any number of writer tasks toggle cells whose live state differs from the
desired state stored in ``AsyncBotManager.difference``.
"""

import asyncio
from base64 import b64decode
from json import load
from random import choice
from typing import Optional

from aiohttp import ClientSession
from aiohttp_socks import ProxyConnector
from PIL import Image, ImageDraw, ImageFilter, ImageFont
from socketio import AsyncSimpleClient

# The board is addressed row-major by a flat index: index = x + y * BOARD_SIDE.
BOARD_SIDE = 1000
BOARD_CELLS = BOARD_SIDE * BOARD_SIDE

# Pixels per row used by the packed-colour element types in settings.json.
# NOTE(review): presumably these match an external viewer that reinterprets
# the bit stream as 577-wide 3-bit or 250-wide 16-bit pixels -- confirm.
RGB111_ROW_PIXELS = 577
RGB565_ROW_PIXELS = 250


class AsyncBotManager:
    """Holds the live board, the desired cell values and the bot coroutines.

    Use as an async context manager: entering starts the listener task,
    leaving stops it.
    """

    def __init__(self, base: str = "https://onemillioncheckboxes.com"):
        self.base = base
        # Local mirror of the remote board, one bit per cell.
        self.canvas = Image.new("1", (BOARD_SIDE, BOARD_SIDE))
        self.font = ImageFont.load_default(8)
        # Desired state: flat index -> whether the cell should be checked.
        self.difference: dict[int, bool] = {}
        self._last_update = 0
        self._shutdown: bool = False
        # Flat indices that must never be scheduled for a change.
        self.avoid: set[int] = set()
        self._listener_task: Optional[asyncio.Task] = None

    def put_text(self, x: int, y: int, text: str):
        """Render ``text`` with ``self.font`` and schedule it at (x, y).

        The glyphs are drawn with full luminance, then the alpha channel is
        replaced by the luminance dilated with a 3x3 max filter, so cells
        right next to a glyph are scheduled too (presumably as a cleared
        outline that keeps the text legible).
        """
        with Image.new("LA", (int(self.font.getlength(text) + 12), 16)) as im:
            draw = ImageDraw.Draw(im)
            draw.rectangle((0, 0, im.width, im.height), (0, 0))
            draw.text((6, 5), text, font=self.font, fill=(255, 0))
            alpha = im.convert("L").filter(ImageFilter.MaxFilter(3))
            im.putalpha(alpha)
            self.put_image(x, y, im)

    def put_image(self, ox: int, oy: int, im: Image.Image):
        """Schedule an "LA" image with its top-left corner at (ox, oy).

        Pixels with zero alpha are skipped; any non-zero luminance means
        "checked". Columns that fall outside the board are clipped instead
        of wrapping onto the neighbouring row.
        """
        for y in range(im.height):
            for x in range(im.width):
                if not 0 <= x + ox < BOARD_SIDE:
                    continue
                l, a = im.getpixel((x, y))  # type: ignore
                if a:
                    self.put_index(x + ox + (y + oy) * BOARD_SIDE, l > 0)

    def put_pixel(self, x: int, y: int, val: bool):
        """Schedule the cell at column ``x``, row ``y``; off-board x is ignored."""
        if 0 <= x < BOARD_SIDE:
            self.put_index(x + y * BOARD_SIDE, val)

    def put_index(self, index: int, value: bool):
        """Schedule a cell by flat index unless it is avoided or off the board."""
        # Off-board indices would later raise in getpixel/putpixel.
        if 0 <= index < BOARD_CELLS and index not in self.avoid:
            self.difference[index] = value

    def add_avoid_rect(self, sx: int, sy: int, w: int, h: int):
        """Mark the ``w`` x ``h`` rectangle whose corner is (sx, sy) as avoided."""
        for y in range(sy, sy + h):
            ox = y * BOARD_SIDE
            self.add_avoid_range(sx + ox, sx + w + ox)

    def add_avoid_range(self, start: int, stop: int, step: int = 1):
        """Mark ``range(start, stop, step)`` of flat indices as avoided."""
        self.avoid.update(range(start, stop, step))

    def add_avoid_index(self, *indices: int):
        """Mark the given flat indices as avoided."""
        self.avoid.update(indices)

    def get_difference_image(self) -> Image.Image:
        """Return an "LA" preview: scheduled cells are opaque, white if checked."""
        im = Image.new("LA", (BOARD_SIDE, BOARD_SIDE), 0)
        for index, expected in self.difference.items():
            y, x = divmod(index, BOARD_SIDE)
            im.putpixel((x, y), (255 if expected else 0, 255))
        return im

    def get_avoid_image(self) -> Image.Image:
        """Return an RGB preview: avoided cells red, everything else green."""
        im = Image.new("RGB", (BOARD_SIDE, BOARD_SIDE), (0, 255, 0))
        for index in self.avoid:
            # add_avoid_range accepts arbitrary bounds, so skip off-board ones.
            if 0 <= index < BOARD_CELLS:
                y, x = divmod(index, BOARD_SIDE)
                im.putpixel((x, y), (255, 0, 0))
        return im

    def _apply_full_state(self, data: dict):
        """Replace the local board with a ``full_state`` payload."""
        raw = data["full_state"].encode()
        # The payload was observed without base64 padding; restore whatever
        # amount is missing rather than assuming exactly one "=".
        raw += b"=" * (-len(raw) % 4)
        image = Image.frombytes("1", (BOARD_SIDE, BOARD_SIDE), b64decode(raw))
        self.canvas.paste(image)
        image.close()
        self._last_update = data["timestamp"]

    async def listener(self):
        """Keep ``self.canvas`` in sync with the server until shutdown.

        Fetches the initial state over HTTP, then applies ``full_state`` and
        ``batched_bit_toggles`` socket.io events as they arrive.
        """
        async with ClientSession() as http:
            async with http.get(f"{self.base}/api/initial-state") as req:
                self._apply_full_state(await req.json())
            async with AsyncSimpleClient(http_session=http) as sio:
                await sio.connect(f"{self.base}/socket.io")
                while not self._shutdown:
                    event, data = await sio.receive()
                    if event == "full_state":
                        self._apply_full_state(data)
                    elif event == "batched_bit_toggles":
                        bits_on, bits_off, timestamp = data
                        if timestamp < self._last_update:
                            # Older than what we already hold: applying it
                            # would roll the mirror back, so really skip it.
                            print("SKIPPING UPDATES: TOO OLD")
                            continue
                        self._last_update = timestamp
                        for ndx in bits_on:
                            y, x = divmod(ndx, BOARD_SIDE)
                            self.canvas.putpixel((x, y), 255)
                        for ndx in bits_off:
                            y, x = divmod(ndx, BOARD_SIDE)
                            self.canvas.putpixel((x, y), 0)
                    else:
                        print("unknown event", event, data)

    async def writer(
        self,
        bot_index: int,
        proxy_url: Optional[str] = None,
        delay: float = 0.25,
    ):
        """Run one bot that toggles cells differing from the desired state.

        Args:
            bot_index: Number used only to label log output.
            proxy_url: Optional proxy URL for this bot's connection.
            delay: Seconds to wait after each toggle that was sent.
        """
        proxy = ProxyConnector.from_url(proxy_url) if proxy_url else None
        async with ClientSession(connector=proxy) as http:
            async with AsyncSimpleClient(http_session=http) as sio:
                await sio.connect(f"{self.base}/socket.io")
                while not self._shutdown:
                    diff = list(self.difference.items())
                    if not diff:
                        # choice() raises on an empty sequence; just idle.
                        await asyncio.sleep(0.1)
                        continue
                    # Sample up to 100 random cells, act on the first mismatch.
                    for _ in range(100):
                        index, expected = choice(diff)
                        y, x = divmod(index, BOARD_SIDE)
                        current = self.canvas.getpixel((x, y)) > 0  # type: ignore
                        if current != expected:
                            print(f"[{bot_index:2d}] swap {x:3d} {y:3d}")
                            await sio.emit("toggle_bit", {"index": index})
                            await asyncio.sleep(delay)
                            break
                    await asyncio.sleep(0.1)

    async def __aenter__(self):
        self._listener_task = asyncio.create_task(self.listener())
        return self

    async def __aexit__(self, a, b, c):
        self._shutdown = True
        task = self._listener_task
        if task is None:
            return
        # The listener may be parked in receive() with no event coming, so
        # the flag alone cannot guarantee it ever returns: cancel it.
        task.cancel()
        await asyncio.wait([task])
        if not task.cancelled():
            # Surface a listener failure exactly as awaiting the task would.
            task.result()

    def copy(self):
        """Return a new manager with the same base URL and desired state.

        Only ``base`` and ``difference`` are carried over; the avoid set,
        font and canvas of the new instance start fresh.
        """
        clone = self.__class__(self.base)
        clone.difference = self.difference.copy()
        return clone


def _apply_avoid(mgr: AsyncBotManager, avoid: dict):
    """Register one entry of the settings ``avoid`` list on ``mgr``."""
    kind = avoid["type"]
    if kind == "rect":
        mgr.add_avoid_rect(avoid["x"], avoid["y"], avoid["w"], avoid["h"])
    elif kind == "range":
        mgr.add_avoid_range(
            avoid["start"], avoid["stop"], avoid.get("step", 1)
        )
    elif kind == "image":
        with Image.open(avoid["path"]) as src, src.convert("LA") as im:
            if im.size != (BOARD_SIDE, BOARD_SIDE):
                raise ValueError(
                    f"avoid image {avoid['path']!r} must be "
                    f"{BOARD_SIDE}x{BOARD_SIDE}, got {im.width}x{im.height}"
                )
            for y in range(im.height):
                for x in range(im.width):
                    _, a = im.getpixel((x, y))  # type: ignore
                    if a > 128:
                        mgr.add_avoid_index(x + y * BOARD_SIDE)


def _apply_element(mgr: AsyncBotManager, elem: dict):
    """Schedule one entry of the settings ``elements`` list on ``mgr``."""
    kind = elem["type"]
    if kind == "text":
        mgr.put_text(elem["x"], elem["y"], elem["text"])
    elif kind == "image":
        with Image.open(elem["path"]) as src, src.convert("LA") as im:
            mgr.put_image(elem["x"], elem["y"], im)
    elif kind == "rgb111":
        # Three consecutive cells per pixel: red, green, blue.
        ox, oy = elem["x"], elem["y"]
        with Image.open(elem["path"]) as src, src.convert("RGBA") as im:
            for y in range(im.height):
                for x in range(im.width):
                    r, g, b, a = im.getpixel((x, y))  # type: ignore
                    if a < 128:
                        continue
                    ndx_start = (x + ox + (y + oy) * RGB111_ROW_PIXELS) * 3
                    mgr.put_index(ndx_start, r > 128)
                    mgr.put_index(ndx_start + 1, g > 128)
                    mgr.put_index(ndx_start + 2, b > 128)
    elif kind == "rgb565":
        # Sixteen consecutive cells per pixel, least significant bit first.
        ox, oy = elem["x"], elem["y"]
        with Image.open(elem["path"]) as src, src.convert("RGBA") as im:
            for y in range(im.height):
                for x in range(im.width):
                    r, g, b, a = im.getpixel((x, y))  # type: ignore
                    if a < 128:
                        continue
                    ndx_start = (x + ox + (y + oy) * RGB565_ROW_PIXELS) * 16
                    # RGB565 layout: red in bits 11-15, green 5-10, blue 0-4.
                    color = (r >> 3) << 11
                    color |= (g >> 2) << 5
                    color |= b >> 3
                    for i in range(16):
                        mgr.put_index(ndx_start + i, ((color >> i) & 1) > 0)
    elif kind == "rect":
        for y in range(elem["y"], elem["y"] + elem["h"]):
            for x in range(elem["x"], elem["x"] + elem["w"]):
                mgr.put_pixel(x, y, elem["fill"])


async def amain():
    """Load settings.json, build the target picture and run the bots.

    Writes ``result.png`` (desired state) and ``avoid.png`` (avoid mask)
    before starting ``n_bots`` writers, spread round-robin over the
    configured proxies when there are any.
    """
    with open("settings.json", "r") as fp:
        settings = load(fp)
    async with AsyncBotManager() as mgr:
        mgr.font = ImageFont.truetype(settings["font"], 8)
        # Avoid areas first, so elements placed afterwards respect them.
        for avoid in settings["avoid"]:
            _apply_avoid(mgr, avoid)
        for elem in settings["elements"]:
            _apply_element(mgr, elem)
        mgr.get_difference_image().save("result.png")
        mgr.get_avoid_image().save("avoid.png")

        proxies = settings["proxies"]
        # Honour the configured delay with or without proxies; fall back to
        # the writer's own default when the key is absent.
        delay = settings.get("delay", 0.25)
        results = await asyncio.gather(
            *[
                mgr.writer(
                    i,
                    proxies[i % len(proxies)] if proxies else None,
                    delay,
                )
                for i in range(settings["n_bots"])
            ],
            return_exceptions=True,
        )
        # return_exceptions=True hides failures unless they are reported.
        for i, result in enumerate(results):
            if isinstance(result, BaseException):
                print(f"[{i:2d}] writer stopped: {result!r}")


if __name__ == "__main__":
    asyncio.run(amain())