onemillioncheckboxes/swarm/manager.py

168 lines
5.2 KiB
Python

from multiprocessing.shared_memory import SharedMemory
from typing import Optional
import asyncio
import socketio
import aiohttp
from aiohttp_socks import ProxyConnector
from PIL import (
Image,
ImageFont,
ImageDraw,
ImageFilter,
ImageSequence,
ImageChops,
)
from enum import IntFlag
from base64 import b64decode
import signal
import os
import time
class PixelMask(IntFlag):
    """Bit flags stored in the per-checkbox byte of the shared-memory board.

    Only the upper four bits of each byte are assigned here.  In this file
    ``Manager`` sets ``AVOID`` and ``CHECKED`` and reads ``FILL``, ``MASK`` and
    ``CHECKED`` when dumping the board to an image.
    """

    # Checkbox index marked via Manager.add_avoid_*.
    AVOID = 1 << 4
    # NOTE(review): MASK and FILL are never written in this file -- presumably
    # they are set by other processes attached to the same segment; confirm.
    MASK = 1 << 5
    FILL = 1 << 6
    # Mirrors the checked/unchecked state reported by the server.
    CHECKED = 1 << 7
class Manager:
    """Mirror the onemillioncheckboxes.com board into a shared-memory segment.

    The segment holds one byte per checkbox (``SIZE`` bytes, row-major with
    ``WIDTH`` checkboxes per row).  Each byte is a combination of
    ``PixelMask`` flags; this class keeps the ``CHECKED`` bit in sync with the
    server and lets callers mark checkboxes with ``AVOID``.

    Use it as an async context manager: the segment is created on entry and
    closed and unlinked on exit.
    """

    WIDTH = 1000
    HEIGHT = 1000
    SIZE = WIDTH * HEIGHT

    def __init__(self):
        self.shmem: Optional[SharedMemory] = None
        self.shmem_name = "omcb-bot"
        self.base = "https://onemillioncheckboxes.com"
        # Timestamp of the newest server update that has been applied.
        self.last_update = 0
        # Toggle counters since the last rate printout.
        self.bits_toggled_on = 0
        self.bits_toggled_off = 0
        # Start the rate window now; starting at 0 would make the first
        # printout divide by the whole clock value instead of the window.
        self.last_printout = time.monotonic()

    def _require_shmem(self) -> SharedMemory:
        """Return the shared-memory segment or raise if it is not created yet."""
        if not self.shmem:
            raise ValueError("shared memory is not initialized yet")
        return self.shmem

    def _check_index(self, index: int) -> None:
        """Raise ``IndexError`` unless *index* addresses a checkbox on the board."""
        if not 0 <= index < self.SIZE:
            raise IndexError(
                f"checkbox index {index} is outside 0..{self.SIZE - 1}"
            )

    @staticmethod
    def _decode_full_state(data) -> bytes:
        """Decode the base64 ``full_state`` field of a server payload.

        One ``=`` is appended before decoding.
        NOTE(review): presumably the server strips the base64 padding -- confirm.
        """
        return b64decode(data["full_state"].encode() + b"=")

    def _apply_full_state(self, data) -> None:
        """Load a full-state payload into shared memory and record its timestamp."""
        self.update_shmem(self._decode_full_state(data))
        self.last_update = data["timestamp"]

    async def listener(self):
        """Connect to the Socket.IO endpoint and dispatch events until it closes."""
        sio = socketio.AsyncClient()
        sio.on("connect", self.on_connect)
        sio.on("batched_bit_toggles", self.on_batched_bit_toggles)
        sio.on("full_state", self.on_full_state)
        await sio.connect(self.base)
        await sio.wait()

    def update_shmem(self, state: bytes):
        """Replace the ``CHECKED`` bit of every checkbox from a packed bitmap.

        Args:
            state: packed board, one bit per checkbox, most significant bit
                first; must hold at least ``SIZE // 8`` bytes.

        Raises:
            ValueError: if shared memory is not initialized or *state* is
                too short.

        All other flag bits already stored in shared memory (``AVOID``,
        ``MASK``, ``FILL`` and the low four bits) are preserved.
        """
        shmem = self._require_shmem()
        packed_len = self.SIZE // 8
        if len(state) < packed_len:
            raise ValueError(
                f"full state holds {len(state)} bytes, expected at least {packed_len}"
            )

        checked = int(PixelMask.CHECKED)
        # Plain int mask: avoids depending on how IntFlag implements ``~``.
        keep = 0xFF & ~checked

        # Lookup table: packed byte -> its eight per-checkbox flag bytes.
        expand = [
            bytes(checked if packed & (0x80 >> bit) else 0 for bit in range(8))
            for packed in range(256)
        ]
        incoming = b"".join(expand[packed] for packed in state[:packed_len])

        # Slice explicitly: the segment may be larger than requested on
        # platforms that round the size up to a page boundary.
        current = bytes(shmem.buf[: self.SIZE])
        shmem.buf[: self.SIZE] = bytes(
            (old & keep) | new for old, new in zip(current, incoming)
        )

    async def on_connect(self):
        """Socket.IO ``connect`` handler: fetch the initial board over HTTP."""
        async with aiohttp.ClientSession() as http:
            async with http.get(f"{self.base}/api/initial-state") as resp:
                # Fail with the HTTP error instead of a confusing decode error.
                resp.raise_for_status()
                data = await resp.json()
        self._apply_full_state(data)

    async def on_full_state(self, data):
        """Socket.IO ``full_state`` handler: reload the whole board from *data*."""
        self._require_shmem()
        self._apply_full_state(data)

    async def on_batched_bit_toggles(self, data):
        """Socket.IO ``batched_bit_toggles`` handler.

        Args:
            data: a ``(bits_on, bits_off, timestamp)`` triple; the first two
                are sequences of checkbox indices.

        Batches older than the last applied update are dropped.  Every five
        seconds the on/off toggle rates are printed and the counters reset.
        """
        shmem = self._require_shmem()
        bits_on, bits_off, timestamp = data
        if timestamp < self.last_update:
            print("old update, ignoring")
            # Actually ignore it: applying a stale batch would undo newer state.
            return
        self.last_update = timestamp

        self.bits_toggled_on += len(bits_on)
        self.bits_toggled_off += len(bits_off)

        checked = int(PixelMask.CHECKED)
        keep = 0xFF & ~checked
        buf = shmem.buf
        for bit in bits_on:
            buf[bit] |= checked
        for bit in bits_off:
            buf[bit] &= keep

        now = time.monotonic()
        elapsed = now - self.last_printout
        if elapsed >= 5:
            self.last_printout = now
            print()
            print(f"Toggled on: {self.bits_toggled_on / elapsed}/s")
            print(f"Toggled off: {self.bits_toggled_off / elapsed}/s")
            self.bits_toggled_on = self.bits_toggled_off = 0

    def on_sigusr1(self, signum, frame):
        """Signal handler: dump the board to ``state.png``.

        Red marks ``FILL``, green marks ``MASK`` and blue marks ``CHECKED``;
        each channel is 255 where the flag is set and 0 otherwise.
        """
        shmem = self._require_shmem()
        print("Caught SIGUSR1, dumping state")
        snapshot = bytes(shmem.buf[: self.SIZE])
        size = (self.WIDTH, self.HEIGHT)
        channels = []
        for flag in (PixelMask.FILL, PixelMask.MASK, PixelMask.CHECKED):
            # One translate pass per channel instead of a million putpixel
            # calls keeps the time spent inside the signal handler short.
            bit = int(flag)
            table = bytes(255 if value & bit else 0 for value in range(256))
            channels.append(Image.frombytes("L", size, snapshot.translate(table)))
        Image.merge("RGB", channels).save("state.png")

    async def animator(self):
        """Placeholder loop: currently only sleeps in 0.1 s steps forever."""
        while True:
            await asyncio.sleep(0.1)

    def add_avoid_range(self, rng: range):
        """Set ``AVOID`` on every checkbox index in *rng*.

        Raises:
            ValueError: if shared memory is not initialized.
            IndexError: if an index lies outside the board (negative indices
                are rejected rather than wrapping around).
        """
        shmem = self._require_shmem()
        avoid = int(PixelMask.AVOID)
        for ndx in rng:
            self._check_index(ndx)
            shmem.buf[ndx] |= avoid

    def add_avoid_rect(self, sx: int, sy: int, w: int, h: int):
        """Set ``AVOID`` on the *w* x *h* rectangle whose top-left is (*sx*, *sy*).

        No clipping is done: a row running past the right edge continues
        into the next row.
        """
        for y in range(sy, sy + h):
            row_start = y * self.WIDTH + sx
            self.add_avoid_range(range(row_start, row_start + w))

    def add_avoid_index(self, index: int):
        """Set ``AVOID`` on a single checkbox.

        Raises:
            ValueError: if shared memory is not initialized.
            IndexError: if *index* lies outside the board.
        """
        shmem = self._require_shmem()
        self._check_index(index)
        shmem.buf[index] |= int(PixelMask.AVOID)

    def add_avoid_image(self, im: "Image.Image"):
        """Set ``AVOID`` wherever the second band of *im* is non-zero.

        Args:
            im: a ``WIDTH`` x ``HEIGHT`` image whose pixels are two-element
                tuples (e.g. luminance + alpha).

        Raises:
            ValueError: if the image has the wrong size or pixel layout, or
                shared memory is not initialized.
        """
        if (im.width, im.height) != (self.WIDTH, self.HEIGHT):
            raise ValueError(
                f"image must be {self.WIDTH}x{self.HEIGHT}, got {im.width}x{im.height}"
            )
        for i in range(self.SIZE):
            y, x = divmod(i, self.WIDTH)
            la = im.getpixel((x, y))
            if not (isinstance(la, (tuple, list)) and len(la) == 2):
                raise ValueError("image pixels must have exactly two bands")
            if la[1]:
                self.add_avoid_index(i)

    async def __aenter__(self):
        """Create the shared-memory segment and return the manager."""
        self.shmem = SharedMemory(
            name=self.shmem_name, create=True, size=self.SIZE
        )
        return self

    async def __aexit__(self, a, b, c):
        """Close and unlink the shared-memory segment if it was created."""
        if self.shmem:
            print("cleaning up shmem")
            try:
                self.shmem.close()
            finally:
                # Unlink even if close() fails, otherwise the named segment
                # leaks and the next create=True run cannot start.
                self.shmem.unlink()
async def main():
    """Run the manager until the Socket.IO connection ends.

    Prints the process id, then keeps the shared-memory segment alive for as
    long as the listener runs.  While it runs, SIGUSR1 triggers
    ``Manager.on_sigusr1``.
    """
    pid = os.getpid()
    print(f"PID: {pid}")
    manager = Manager()
    async with manager as active:
        signal.signal(signal.SIGUSR1, active.on_sigusr1)
        await active.listener()


if __name__ == "__main__":
    asyncio.run(main())