"""Bot swarm that draws text and images onto a shared 1000x1000 checkbox grid.

A single listener mirrors the remote grid into a local 1-bit image, and any
number of writers repeatedly pick a random target cell and toggle it when the
mirrored state disagrees with the desired state.
"""
import asyncio
from base64 import b64decode
from json import load
from random import choice
from typing import Optional

from aiohttp import ClientSession
from aiohttp_socks import ProxyConnector
from PIL import Image, ImageDraw, ImageFilter, ImageFont
from socketio import AsyncSimpleClient
from socketio.exceptions import TimeoutError as SocketIOTimeoutError

# The mirrored grid is a square 1-bit image; cell ``index`` maps to
# ``divmod(index, CANVAS_SIDE)`` -> ``(y, x)``.
CANVAS_SIDE = 1000
CANVAS_BITS = CANVAS_SIDE * CANVAS_SIDE


class AsyncBotManager:
    """Mirror the remote grid and drive writer bots towards a target pattern.

    Use as an async context manager: entering starts the listener task and
    waits until the initial grid snapshot has been loaded, exiting stops it.

    Attributes:
        base: Base URL of the service.
        canvas: Local 1-bit mirror of the remote grid.
        font: Font used by :meth:`put_text`.
        difference: Desired state per cell index (``True`` = checked).
    """

    def __init__(self, base: str = "https://onemillioncheckboxes.com"):
        self.base = base
        self.canvas = Image.new("1", (CANVAS_SIDE, CANVAS_SIDE))
        self.font = ImageFont.load_default(8)
        self.difference: dict[int, bool] = {}
        self._last_update: int = 0
        self._shutdown: bool = False
        # Created in __aenter__; stays None when listener() is run directly.
        self._ready: Optional[asyncio.Event] = None
        self._listener_task: Optional[asyncio.Task] = None

    def put_text(self, x: int, y: int, text: str):
        """Render ``text`` with the current font and register it at (x, y).

        The glyphs are drawn as "on" cells; a one-pixel halo around them
        (a 3x3 max filter of the glyph mask) is registered as "off" cells so
        the text stays legible on a noisy grid.
        """
        with Image.new("LA", (int(self.font.getlength(text) + 12), 16)) as im:
            draw = ImageDraw.Draw(im)
            draw.rectangle((0, 0, im.width, im.height), (0, 0))
            draw.text((6, 5), text, font=self.font, fill=(255, 0))
            # Dilate the luminance to obtain the alpha mask: glyph + outline.
            alpha = im.convert("L").filter(ImageFilter.MaxFilter(3))
            im.putalpha(alpha)
            self.put_image(x, y, im)

    def put_image(self, ox: int, oy: int, im: Image.Image):
        """Register every non-transparent pixel of ``im`` as a target cell.

        ``im`` must yield ``(luminance, alpha)`` pixels (mode ``"LA"``). Pixels
        with zero alpha are ignored; the rest map to ``luminance > 0``. Pixels
        that would land outside the canvas are clipped instead of wrapping
        around onto the next row.
        """
        for y in range(im.height):
            cy = y + oy
            if not 0 <= cy < CANVAS_SIDE:
                continue
            for x in range(im.width):
                cx = x + ox
                if not 0 <= cx < CANVAS_SIDE:
                    continue
                luma, alpha = im.getpixel((x, y))  # type: ignore
                if alpha:
                    self.difference[cx + cy * CANVAS_SIDE] = luma > 0

    def _apply_full_state(self, data: dict) -> None:
        """Replace the mirrored canvas with a full-state snapshot payload."""
        # One "=" is appended before decoding; presumably the server strips
        # the base64 padding -- TODO confirm against the API.
        buffer = b64decode(data["full_state"].encode() + b"=")
        with Image.frombytes("1", (CANVAS_SIDE, CANVAS_SIDE), buffer) as snapshot:
            self.canvas.paste(snapshot)
        self._last_update = data["timestamp"]

    async def listener(self):
        """Keep ``self.canvas`` in sync with the remote grid until shutdown.

        Loads the initial snapshot over HTTP, signals readiness, then applies
        ``full_state`` and ``batched_bit_toggles`` socket events as they
        arrive. Batches older than the last snapshot are skipped.
        """
        async with ClientSession() as http:
            async with http.get(f"{self.base}/api/initial-state") as req:
                self._apply_full_state(await req.json())
            if self._ready is not None:
                self._ready.set()
            async with AsyncSimpleClient(http_session=http) as sio:
                await sio.connect(f"{self.base}/socket.io")
                while not self._shutdown:
                    # receive() returns [event, *args]; tolerate events that
                    # carry no argument or more than one.
                    event, *args = await sio.receive()
                    data = args[0] if args else None
                    if event == "full_state":
                        self._apply_full_state(data)
                    elif event == "batched_bit_toggles":
                        bits_on, bits_off, timestamp = data
                        if timestamp < self._last_update:
                            print("SKIPPING UPDATES: TOO OLD")
                            continue
                        for ndx in bits_on:
                            y, x = divmod(ndx, CANVAS_SIDE)
                            self.canvas.putpixel((x, y), 255)
                        for ndx in bits_off:
                            y, x = divmod(ndx, CANVAS_SIDE)
                            self.canvas.putpixel((x, y), 0)
                    else:
                        print("unknown event", event, data)

    async def writer(self, bot_index: int, proxy_url: Optional[str] = None):
        """Run one bot that toggles cells towards ``self.difference``.

        Args:
            bot_index: Identifier used only in log output.
            proxy_url: Optional proxy URL for this bot's connection.
        """
        proxy = ProxyConnector.from_url(proxy_url) if proxy_url else None
        async with ClientSession(connector=proxy) as http:
            async with AsyncSimpleClient(http_session=http) as sio:
                await sio.connect(f"{self.base}/socket.io")
                while not self._shutdown:
                    # Drain one incoming event so the client's input buffer
                    # does not grow; the short timeout also paces the loop.
                    try:
                        event, *args = await sio.receive(0.1)
                    except SocketIOTimeoutError:
                        pass  # nothing pending -- the expected case
                    except Exception as exc:
                        # Best effort: report instead of silently swallowing.
                        print(bot_index, "receive failed:", repr(exc))
                    else:
                        if event not in ("full_state", "batched_bit_toggles"):
                            print(bot_index, "!!!", event, *args)

                    if not self.difference:
                        continue  # no targets registered (yet)
                    index, target = choice(list(self.difference.items()))
                    if not 0 <= index < CANVAS_BITS:
                        continue  # target lies outside the grid
                    y, x = divmod(index, CANVAS_SIDE)
                    curr = self.canvas.getpixel((x, y)) > 0  # type: ignore
                    if curr != target:
                        print(bot_index, "swap", x, y)
                        await sio.emit("toggle_bit", {"index": index})
                        # Pause after each toggle to limit this bot's rate.
                        await asyncio.sleep(0.25)

    async def __aenter__(self):
        """Start the listener and wait for the initial snapshot.

        Raises:
            Exception: whatever the listener raised if it failed before the
                initial state could be loaded.
        """
        self._ready = asyncio.Event()
        self._listener_task = asyncio.create_task(self.listener())
        ready_waiter = asyncio.create_task(self._ready.wait())
        # Writers must not run against a blank mirror, so block until the
        # snapshot is in -- or until the listener dies trying.
        await asyncio.wait(
            {ready_waiter, self._listener_task},
            return_when=asyncio.FIRST_COMPLETED,
        )
        if not self._ready.is_set():
            ready_waiter.cancel()
            await self._listener_task  # re-raises the listener's error
            raise RuntimeError("listener stopped before the initial state loaded")
        return self

    async def __aexit__(self, a, b, c):
        """Signal shutdown and stop the listener task."""
        self._shutdown = True
        task = self._listener_task
        if task is None:
            return
        # The listener may be blocked in an untimed receive(); cancel it
        # rather than waiting for an event that might never arrive.
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass


def _open_converted(path: str, mode: str) -> Image.Image:
    """Open an image file, convert it to ``mode`` and close the source file."""
    with Image.open(path) as source:
        return source.convert(mode)


def _put_rgb111(mgr: AsyncBotManager, ox: int, oy: int, im: Image.Image) -> None:
    """Register an RGB image as 3 bits per pixel on a 577-pixel-wide layout."""
    for y in range(im.height):
        for x in range(im.width):
            r, g, b = im.getpixel((x, y))  # type: ignore
            ndx_start = (x + ox) * 3 + (y + oy) * 577 * 3
            mgr.difference[ndx_start] = r > 128
            mgr.difference[ndx_start + 1] = g > 128
            mgr.difference[ndx_start + 2] = b > 128


def _put_rgb565(mgr: AsyncBotManager, ox: int, oy: int, im: Image.Image) -> None:
    """Register an RGB image as 16-bit RGB565 on a 250-pixel-wide layout.

    Bits are written least-significant first.
    """
    for y in range(im.height):
        for x in range(im.width):
            r, g, b = im.getpixel((x, y))  # type: ignore
            ndx_start: int = (x + ox + (y + oy) * 250) * 16
            # 5 bits red (11-15), 6 bits green (5-10), 5 bits blue (0-4).
            color = ((r >> 3) << 11) | ((g >> 2) << 5) | (b >> 3)
            for i in range(16):
                mgr.difference[ndx_start + i] = ((color >> i) & 1) > 0


async def amain():
    """Load ``settings.json``, register all elements and run the bots.

    Reads the keys ``font``, ``elements``, ``n_bots`` and (optionally)
    ``proxies``; bots are assigned proxies round-robin when any are given.
    """
    with open("settings.json", "r", encoding="utf-8") as fp:
        settings = load(fp)

    async with AsyncBotManager() as mgr:
        mgr.font = ImageFont.truetype(settings["font"], 8)
        for elem in settings["elements"]:
            kind = elem["type"]
            if kind == "text":
                mgr.put_text(elem["x"], elem["y"], elem["text"])
            elif kind == "image":
                with _open_converted(elem["path"], "LA") as im:
                    mgr.put_image(elem["x"], elem["y"], im)
            elif kind == "rgb111":
                with _open_converted(elem["path"], "RGB") as im:
                    _put_rgb111(mgr, elem["x"], elem["y"], im)
            elif kind == "rgb565":
                with _open_converted(elem["path"], "RGB") as im:
                    _put_rgb565(mgr, elem["x"], elem["y"], im)

        proxies = settings.get("proxies") or []
        results = await asyncio.gather(
            *[
                mgr.writer(i, proxies[i % len(proxies)] if proxies else None)
                for i in range(settings["n_bots"])
            ],
            return_exceptions=True,
        )
        # return_exceptions=True keeps one failing bot from stopping the
        # others; surface the failures instead of dropping them.
        for i, result in enumerate(results):
            if isinstance(result, BaseException):
                print(i, "writer failed:", repr(result))


if __name__ == "__main__":
    asyncio.run(amain())