import asyncio from typing import Optional from PIL import Image from base64 import b64decode from aiohttp import ClientSession from aiohttp_socks import ProxyConnector from socketio import AsyncSimpleClient from json import load from itertools import cycle from random import random class Manager: def __init__(self, base: str = "https://onemillioncheckboxes.com"): self.base = base self.state = Image.new("1", (1000, 1000)) self.canvas = Image.new("LA", (1000, 1000), 0) self._queue: asyncio.Queue[int] = asyncio.Queue(131072) self._mask: set[int] = set() self._dirty = False self._shutdown = False self._last_update = 0 async def calculate_changes(self): for oy in [0, 4, 2, 6, 1, 3, 5, 7]: for y in range(oy, self.canvas.height, 8): for x in range(self.canvas.width): l, a = self.canvas.getpixel((x, y)) # type: ignore c = self.state.getpixel((x, y)) if a > 128: self._mask.add(x + y * 1000) if c != l and not self._queue.full(): await self._queue.put(x + y * 1000) async def update_state(self, data: str): buffer = b64decode(data.encode()) with Image.frombytes("1", (1000, 1000), buffer) as image: self.state.paste(image) async def updater(self): while not self._shutdown: if self._dirty: self._dirty = False await self.calculate_changes() await asyncio.sleep(1) async def reader(self): async with ClientSession() as http: async with http.get(f"{self.base}/api/initial-state") as req: data = await req.json() await self.update_state(data["full_state"] + "=") self._last_update = data["timestamp"] self._dirty = True async with AsyncSimpleClient(http_session=http) as sio: await sio.connect(f"{self.base}/socket.io") while not self._shutdown: try: async with asyncio.timeout(10): event, data = await sio.receive() except TimeoutError: print("Timeout getting an event") continue if event == "full_state": await self.update_state(data["full_state"] + "=") self._last_update = data["timestamp"] self._dirty = True print("full state") elif event == "batched_bit_toggles": bits_on, bits_off, timestamp = data if timestamp < self._last_update: print("updates too old, skipping") continue print("toggled", len(bits_on), len(bits_off)) for bit in bits_on: y, x = divmod(bit, 1000) self.state.putpixel((x, y), 255) if bit in self._mask and not self._queue.full(): await self._queue.put(bit) for bit in bits_off: y, x = divmod(bit, 1000) self.state.putpixel((x, y), 0) if bit in self._mask and not self._queue.full(): await self._queue.put(bit) else: print("unknown event", event, data) async def writer(self, proxy: Optional[str] = None): connector = ProxyConnector.from_url(proxy) if proxy else None async with ClientSession(connector=connector) as http: async with AsyncSimpleClient(http_session=http) as sio: await sio.connect(f"{self.base}/socket.io") await asyncio.sleep(2 + random() * 2) while not self._shutdown and not self._queue.empty(): try: async with asyncio.timeout(1): index = await self._queue.get() except TimeoutError: print("Timed out getting an item") continue try: async with asyncio.timeout(1): y, x = divmod(index, 1000) current = self.state.getpixel((x, y)) expected: tuple[int, int] = self.canvas.getpixel((x, y)) # type: ignore except TimeoutError: print("Timed out verifying state") continue if current != expected[0] and expected[1]: try: async with asyncio.timeout(2): await sio.emit("toggle_bit", {"index": index}) await asyncio.sleep(0.25 + random() * 0.125) except TimeoutError: print("Timed out setting, retrying later") if not self._queue.full(): await self._queue.put(index) print("Writer exited") async def main(): with open("proxies.json", "r") as fp: proxies = load(fp)["proxies"] mgr = Manager() with Image.open("./funnies/autism.png").convert("L") as im: mgr.canvas.paste(im, ((1000 - im.width) // 2, (1000 - im.height) // 2)) reader_task = asyncio.create_task(mgr.reader(), name="Reader") updater_task = asyncio.create_task(mgr.updater(), name="Updater") tasks: list[asyncio.Task] = [] for i, proxy in zip(range(len(proxies) * 6), cycle(proxies)): tasks.append(asyncio.create_task(mgr.writer(proxy), name=f"Writer {i}")) pending = tasks while True: done, pending = await asyncio.wait(pending) if not pending: break for task in done: print("DONE", task) await reader_task await updater_task if __name__ == "__main__": asyncio.run(main())