"""Checkbox-toggling worker driven by bitmaps held in a shared-memory segment.

The shared-memory segment is read as four consecutive bitmaps of 125000 bytes
(1,000,000 bits) each, located at ``OFFSET_STATE``, ``OFFSET_AVOID``,
``OFFSET_CANVAS`` and ``OFFSET_MASK``.  Bits are addressed MSB-first: bit
``i`` lives in byte ``i >> 3`` under the mask ``0x80 >> (i & 7)``.
"""

import asyncio
import json
import random
import time
from multiprocessing.shared_memory import SharedMemory
from typing import NamedTuple, Optional

import aiohttp
import socketio
from aiohttp_socks import ProxyConnector

# Byte offsets of the four bitmaps inside the shared-memory segment.
OFFSET_STATE = 0
OFFSET_AVOID = 125000
OFFSET_CANVAS = 250000
OFFSET_MASK = 375000

# The checkbox field is scanned as a 1000 x 1000 grid (1,000,000 indices).
_GRID_SIDE = 1000
_N_CHECKBOXES = _GRID_SIDE * _GRID_SIDE

# (x, y) start offsets for the 16 scanning passes.  Every pass walks the grid
# with a stride of 4 in both directions; together the 16 offsets cover each
# (x mod 4, y mod 4) combination exactly once, so one full cycle visits every
# cell while spreading consecutive passes across the grid.
_SCAN_OFFSETS = (
    (0, 0), (2, 2), (2, 0), (0, 2),
    (1, 1), (3, 3), (3, 1), (1, 3),
    (1, 0), (3, 2), (3, 0), (1, 2),
    (0, 1), (2, 3), (2, 1), (0, 3),
)


class PixelState(NamedTuple):
    """The four per-checkbox flags, one for each shared-memory bitmap."""

    state: bool
    avoid: bool
    canvas: bool
    mask: bool


class WorkerManager:
    """Scan the shared bitmaps for checkboxes to flip and dispatch the toggles.

    Use as an async context manager: entering attaches to the shared-memory
    segment named ``shmem_name``; leaving closes this process's handle to it.
    """

    def __init__(self, shmem_name: str = "omcb-bot"):
        self.shmem_name = shmem_name
        self.base = "https://onemillioncheckboxes.com"
        # Pause (seconds) a writer takes after sending ``batch_size`` toggles.
        self.delay = 0.25
        self.batch_size = 7
        # Bounded queue of checkbox indices waiting to be toggled.
        self.queue: asyncio.Queue[int] = asyncio.Queue(64)
        # Toggles sent since the last status printout.
        self.n_toggles = 0
        # Active writer loops, identified by (bot_index, connection count).
        self.workers: set[tuple[int, int]] = set()
        self._restarts: dict[int, int] = {}
        # Miss counters shown by ``status_display``.  Only ``miss_state`` is
        # updated anywhere in this class.
        self.miss_avoid = 0
        self.miss_mask = 0
        self.miss_state = 0

    async def queue_manager(self):
        """Continuously scan the bitmaps and enqueue indices needing a toggle.

        An index is enqueued when its avoid bit is clear, its mask bit is set
        and its canvas bit differs from its state bit.  The scan starts at a
        random offset into the index space and never returns.
        """
        offset = random.randint(0, 1000000)
        buf = self.shmem.buf
        while True:
            for start_x, start_y in _SCAN_OFFSETS:
                for y in range(start_y, _GRID_SIDE, 4):
                    for x in range(start_x, _GRID_SIDE, 4):
                        index = (x + y * _GRID_SIDE + offset) % _N_CHECKBOXES
                        byte, bit = index >> 3, index & 7
                        mask = 0x80 >> bit
                        if buf[OFFSET_AVOID + byte] & mask:
                            continue
                        if (buf[OFFSET_MASK + byte] & mask) == 0:
                            continue
                        if (buf[OFFSET_CANVAS + byte] & mask) != (
                            buf[OFFSET_STATE + byte] & mask
                        ):
                            # Blocks while the queue is full, which throttles
                            # the scan to the writers' pace.
                            await self.queue.put(index)
                    # Yield after every row so writers and the status display
                    # keep running even when the scan finds nothing to do.
                    await asyncio.sleep(0.01)

    async def writer(self, bot_index: int, proxy: Optional[str] = None):
        """Run one socket.io client that sends queued toggles to the server.

        Args:
            bot_index: Identifier of this writer; used as the key for its
                restart counter and in its entry in ``self.workers``.
            proxy: Optional proxy URL passed to ``ProxyConnector.from_url``;
                when falsy the HTTP session uses its default connector.
        """
        connector = ProxyConnector.from_url(proxy) if proxy else None
        async with aiohttp.ClientSession(connector=connector) as http:
            sio = socketio.AsyncClient(http_session=http)
            self._restarts[bot_index] = 0

            async def writer_itself():
                """Handle a "connect" event: drain the queue while connected."""
                # NOTE(review): an earlier invocation of this handler that is
                # still blocked in ``queue.get()`` will resume its loop if the
                # client has reconnected in the meantime - confirm whether
                # overlapping loops per client are intended.
                while not sio.connected:
                    await asyncio.sleep(0.1)
                self._restarts[bot_index] += 1
                cookie = bot_index, self._restarts[bot_index]
                self.workers.add(cookie)
                # await sio.emit("unsubscribe")
                try:
                    batch = 0
                    while sio.connected:
                        index = await self.queue.get()
                        try:
                            byte, bit = index >> 3, index & 7
                            mask = 0x80 >> bit
                            # Re-check: the bitmaps may have changed while the
                            # index was waiting in the queue.
                            if (self.shmem.buf[OFFSET_CANVAS + byte] & mask) == (
                                self.shmem.buf[OFFSET_STATE + byte] & mask
                            ):
                                self.miss_state += 1
                                continue
                            await sio.emit("toggle_bit", {"index": index})
                            # Record the flip locally so the scanner does not
                            # enqueue the same index again.
                            self.shmem.buf[OFFSET_STATE + byte] ^= mask
                            self.n_toggles += 1
                        finally:
                            # Balance every get(), including stale items, so
                            # the queue's unfinished-task count stays accurate.
                            self.queue.task_done()
                        batch += 1
                        if batch >= self.batch_size:
                            await asyncio.sleep(self.delay)
                            batch = 0
                finally:
                    self.workers.remove(cookie)

            sio.on("connect", writer_itself)
            sio.on("unsubscribed", print)
            # Swap only the leading scheme (http -> ws, https -> wss).
            await sio.connect(self.base.replace("http", "ws", 1))
            await sio.wait()

    async def status_display(self):
        """Print worker, queue and throughput statistics about once a second."""
        last_printout = time.monotonic()
        while True:
            await asyncio.sleep(1)
            diff = time.monotonic() - last_printout
            print()
            print(f"Workers: {len(self.workers)} {self.workers}")
            print(f"Queue size: {self.queue.qsize()}/{self.queue.maxsize}")
            print(f"Toggles: {self.n_toggles / diff:.2f}/s EST: {self.batch_size * len(self.workers) / self.delay}")
            print(f"Misses: A:{self.miss_avoid} M:{self.miss_mask} S:{self.miss_state}")
            print(f"Q: {self.queue}")
            self.n_toggles = 0
            last_printout = time.monotonic()

    async def __aenter__(self):
        """Attach to the existing shared-memory segment and return ``self``."""
        self.shmem = SharedMemory(self.shmem_name)
        return self

    async def __aexit__(self, a, b, c):
        """Close this process's handle to the shared-memory segment.

        The segment is only closed, not unlinked: it was attached to rather
        than created here.
        """
        self.shmem.close()


async def main(config_path: str = "worker.json", *_):
    """Load the JSON config and run the scanner, status display and writers.

    Recognised config keys: ``n_bots`` (default 1), ``shmem`` (default
    ``"omcb-bot"``), ``delay``, ``batch`` and ``proxy`` (a list of proxy URLs
    assigned to the writers round-robin).  Extra positional arguments are
    ignored.
    """
    with open(config_path, "r", encoding="utf-8") as fp:
        config = json.load(fp)

    n_bots = config.get("n_bots", 1)
    async with WorkerManager(config.get("shmem", "omcb-bot")) as mgr:
        mgr.delay = config.get("delay", mgr.delay)
        mgr.batch_size = config.get("batch", mgr.batch_size)

        proxies = config.get("proxy", [])
        if proxies:
            workers = [
                mgr.writer(i, proxies[i % len(proxies)]) for i in range(n_bots)
            ]
        else:
            workers = [mgr.writer(i) for i in range(n_bots)]

        await asyncio.gather(
            mgr.queue_manager(),
            mgr.status_display(),
            *workers,
        )


if __name__ == "__main__":
    from sys import argv

    asyncio.run(main(*argv[1:]))