From 6cce15b2cab848bf0bd362a1d7d03805a132f038 Mon Sep 17 00:00:00 2001 From: hkc Date: Tue, 9 Jul 2024 11:35:50 +0300 Subject: [PATCH] WIP: multiprocessed bot, added avoidance --- swarm/manager.py | 167 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 167 insertions(+) create mode 100644 swarm/manager.py diff --git a/swarm/manager.py b/swarm/manager.py new file mode 100644 index 0000000..4876f88 --- /dev/null +++ b/swarm/manager.py @@ -0,0 +1,167 @@ +from multiprocessing.shared_memory import SharedMemory +from typing import Optional +import asyncio +import socketio +import aiohttp +from aiohttp_socks import ProxyConnector +from PIL import ( + Image, + ImageFont, + ImageDraw, + ImageFilter, + ImageSequence, + ImageChops, +) +from enum import IntFlag +from base64 import b64decode +import signal +import os +import time + + +class PixelMask(IntFlag): + AVOID = 16 + MASK = 32 + FILL = 64 + CHECKED = 128 + + +class Manager: + def __init__(self): + self.shmem: Optional[SharedMemory] = None + self.shmem_name = "omcb-bot" + self.base = "https://onemillioncheckboxes.com" + self.last_update = 0 + + self.bits_toggled_on = 0 + self.bits_toggled_off = 0 + self.last_printout = 0 + + async def listener(self): + sio = socketio.AsyncClient() + sio.on("connect", self.on_connect) + sio.on("batched_bit_toggles", self.on_batched_bit_toggles) + sio.on("full_state", self.on_full_state) + + await sio.connect(f"{self.base}") + await sio.wait() + + def update_shmem(self, state: bytes): + if not self.shmem: + raise ValueError("shared memory is not initialized yet") + + buffer = bytearray(bytes(1000000)) + for i in range(1000000): + byte, bit = divmod(i, 8) + if state[byte] & (0x80 >> bit): + buffer[i] |= PixelMask.CHECKED + else: + buffer[i] &= ~PixelMask.CHECKED + self.shmem.buf[:] = buffer + + async def on_connect(self): + async with aiohttp.ClientSession() as http: + async with http.get(f"{self.base}/api/initial-state") as req: + data = await req.json() + buffer = b64decode(data["full_state"].encode() + b"=") + self.update_shmem(buffer) + self.last_update = data["timestamp"] + + async def on_full_state(self, data): + if not self.shmem: + raise ValueError("shared memory is not initialized yet") + buffer = b64decode(data["full_state"].encode() + b"=") + self.update_shmem(buffer) + self.last_update = data["timestamp"] + + async def on_batched_bit_toggles(self, data): + if not self.shmem: + raise ValueError("shared memory is not initialized yet") + bits_on, bits_off, timestamp = data + if timestamp < self.last_update: + print("old update, ignoring") + + self.last_update = timestamp + + self.bits_toggled_on += len(bits_on) + self.bits_toggled_off = len(bits_off) + + for bit in bits_on: + self.shmem.buf[bit] |= PixelMask.CHECKED + for bit in bits_off: + self.shmem.buf[bit] &= ~PixelMask.CHECKED + + since_last_printout = time.time() - self.last_printout + if since_last_printout >= 5: + self.last_printout = time.time() + print() + print(f"Toggled on: {self.bits_toggled_on / since_last_printout}/s") + print(f"Toggled off: {self.bits_toggled_off / since_last_printout}/s") + self.bits_toggled_on = self.bits_toggled_off = 0 + + def on_sigusr1(self, signum, frame): + if not self.shmem: + raise ValueError("shared memory is not initialized yet") + print("Caught SIGUSR1, dumping state") + buf = bytes(self.shmem.buf[:]) + with Image.new("RGB", (1000, 1000), 0) as im: + for i in range(1000000): + y, x = divmod(i, 1000) + im.putpixel((x, y), ( + 255 if buf[i] & PixelMask.FILL else 0, + 255 if buf[i] & PixelMask.MASK else 0, + 255 if buf[i] & PixelMask.CHECKED else 0 + )) + im.save("state.png") + + async def animator(self): + while True: + await asyncio.sleep(0.1) + + def add_avoid_range(self, rng: range): + if not self.shmem: + raise ValueError("shared memory is not initialized yet") + for ndx in rng: + self.shmem.buf[ndx] |= PixelMask.AVOID + + def add_avoid_rect(self, sx: int, sy: int, w: int, h: int): + for y in range(sy, sy + h): + ox = y * 1000 + self.add_avoid_range(range(sx + ox, sx + w + ox)) + + def add_avoid_index(self, index: int): + if not self.shmem: + raise ValueError("shared memory is not initialized yet") + assert 0 <= index < 1000000 + self.shmem.buf[index] |= PixelMask.AVOID + + def add_avoid_image(self, im: Image.Image): + assert im.width == 1000 + assert im.height == 1000 + for i in range(1000000): + y, x = divmod(i, 1000) + la = im.getpixel((x, y)) + assert isinstance(la, (tuple, list)) and len(la) == 2 + if la[1]: + self.add_avoid_index(i) + + async def __aenter__(self): + self.shmem = SharedMemory(self.shmem_name, True, 1000000) + return self + + async def __aexit__(self, a, b, c): + if self.shmem: + print("cleaning up shmem") + self.shmem.close() + self.shmem.unlink() + + +async def main(): + print(f"PID: {os.getpid()}") + async with Manager() as mgr: + signal.signal(signal.SIGUSR1, mgr.on_sigusr1) + await mgr.listener() + + +if __name__ == "__main__": + asyncio.run(main())